[Storm-kafka] Interfaces: PartitionManager, the Partition Manager
2019-03-02 23:58
Background assumed: a basic familiarity with Java inner classes.
Goal: understand how Kafka partitions are managed inside the Storm-kafka spout.
Theme: a detailed walkthrough of everything PartitionManager does.
package com.mixbox.storm.kafka;

import backtype.storm.Config;
import backtype.storm.metric.api.CombinedMetric;
import backtype.storm.metric.api.CountMetric;
import backtype.storm.metric.api.MeanReducer;
import backtype.storm.metric.api.ReducedMetric;
import backtype.storm.spout.SpoutOutputCollector;
import com.google.common.collect.ImmutableMap;
import com.mixbox.storm.kafka.KafkaSpout.EmitState;
import com.mixbox.storm.kafka.KafkaSpout.MessageAndRealOffset;
import com.mixbox.storm.kafka.trident.MaxMetric;
import kafka.api.OffsetRequest;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.javaapi.message.ByteBufferMessageSet;
import kafka.message.MessageAndOffset;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.*;

/**
 * The partition manager.
 *
 * @author Yin Shuai
 */
public class PartitionManager {

    public static final Logger LOG = LoggerFactory.getLogger(PartitionManager.class);

    private final CombinedMetric _fetchAPILatencyMax;
    private final ReducedMetric _fetchAPILatencyMean;
    private final CountMetric _fetchAPICallCount;
    private final CountMetric _fetchAPIMessageCount;

    /**
     * A Kafka message id, wrapping a partition and an offset.
     *
     * @author Yin Shuai
     */
    static class KafkaMessageId {
        public Partition partition;
        public long offset;

        public KafkaMessageId(Partition partition, long offset) {
            this.partition = partition;
            this.offset = offset;
        }
    }

    // The offset emitted up to
    Long _emittedToOffset;

    // Offsets emitted but not yet acknowledged
    SortedSet<Long> _pending = new TreeSet<Long>();

    // The offset committed up to
    Long _committedTo;

    // Messages waiting to be emitted
    LinkedList<MessageAndRealOffset> _waitingToEmit = new LinkedList<MessageAndRealOffset>();

    // The partition being managed
    Partition _partition;

    // The spout configuration
    SpoutConfig _spoutConfig;

    // The topology instance id
    String _topologyInstanceId;

    // The low-level Kafka consumer
    SimpleConsumer _consumer;

    // Dynamic partition connections
    DynamicPartitionConnections _connections;

    // ZkState, which maintains state in ZooKeeper
    ZkState _state;

    // The Storm configuration
    Map _stormConf;

    @SuppressWarnings("unchecked")
    public PartitionManager(DynamicPartitionConnections connections,
            String topologyInstanceId, ZkState state, Map stormConf,
            SpoutConfig spoutConfig, Partition id) {
        _partition = id;
        _connections = connections;
        _spoutConfig = spoutConfig;
        _topologyInstanceId = topologyInstanceId;
        _consumer = connections.register(id.host, id.partition);
        _state = state;
        _stormConf = stormConf;

        String jsonTopologyId = null;
        Long jsonOffset = null;
        String path = committedPath();
        try {
            Map<Object, Object> json = _state.readJSON(path);
            LOG.info("Read partition information from: " + path + " --> " + json);
            if (json != null) {
                jsonTopologyId = (String) ((Map<Object, Object>) json.get("topology")).get("id");
                jsonOffset = (Long) json.get("offset");
            }
        } catch (Throwable e) {
            LOG.warn("Error reading and/or parsing at ZkNode: " + path, e);
        }

        if (jsonTopologyId == null || jsonOffset == null) { // failed to parse JSON?
            _committedTo = KafkaUtils.getOffset(_consumer, spoutConfig.topic,
                    id.partition, spoutConfig);
            LOG.info("No partition information found, using configuration to determine offset");
        } else if (!topologyInstanceId.equals(jsonTopologyId) && spoutConfig.forceFromStart) {
            _committedTo = KafkaUtils.getOffset(_consumer, spoutConfig.topic,
                    id.partition, spoutConfig.startOffsetTime);
            LOG.info("Topology change detected and reset from start forced, using configuration to determine offset");
        } else {
            _committedTo = jsonOffset;
            LOG.info("Read last commit offset from zookeeper: " + _committedTo
                    + "; old topology_id: " + jsonTopologyId
                    + " - new topology_id: " + topologyInstanceId);
        }

        LOG.info("Starting " + _partition + " from offset " + _committedTo);
        _emittedToOffset = _committedTo;

        _fetchAPILatencyMax = new CombinedMetric(new MaxMetric());
        _fetchAPILatencyMean = new ReducedMetric(new MeanReducer());
        _fetchAPICallCount = new CountMetric();
        _fetchAPIMessageCount = new CountMetric();
    }

    public Map getMetricsDataMap() {
        Map ret = new HashMap();
        ret.put(_partition + "/fetchAPILatencyMax", _fetchAPILatencyMax.getValueAndReset());
        ret.put(_partition + "/fetchAPILatencyMean", _fetchAPILatencyMean.getValueAndReset());
        ret.put(_partition + "/fetchAPICallCount", _fetchAPICallCount.getValueAndReset());
        ret.put(_partition + "/fetchAPIMessageCount", _fetchAPIMessageCount.getValueAndReset());
        return ret;
    }

    // Returns NO_EMITTED once it has reached the end of the current batch
    public EmitState next(SpoutOutputCollector collector) {
        // Nothing is waiting to be emitted, so fill the buffer first
        if (_waitingToEmit.isEmpty()) {
            fill();
        }
        while (true) {
            // Retrieve and remove the head of the list
            MessageAndRealOffset toEmit = _waitingToEmit.pollFirst();
            // Nothing left to emit
            if (toEmit == null) {
                return EmitState.NO_EMITTED;
            }
            // Turn the raw Kafka message into output tuples
            Iterable<List<Object>> tups = KafkaUtils.generateTuples(_spoutConfig, toEmit.msg);
            if (tups != null) {
                for (List<Object> tup : tups) {
                    collector.emit(tup, new KafkaMessageId(_partition, toEmit.offset));
                }
                break;
            } else {
                ack(toEmit.offset);
            }
        }
        if (!_waitingToEmit.isEmpty()) {
            return EmitState.EMITTED_MORE_LEFT;
        } else {
            return EmitState.EMITTED_END;
        }
    }

    /**
     * Fill the internal buffer. This is what really decides which data
     * gets emitted.
     */
    private void fill() {
        long start = System.nanoTime();
        // Fetch a MessageSet starting at _emittedToOffset
        ByteBufferMessageSet msgs = KafkaUtils.fetchMessages(_spoutConfig,
                _consumer, _partition, _emittedToOffset);
        long end = System.nanoTime();
        long millis = (end - start) / 1000000;
        _fetchAPILatencyMax.update(millis);
        _fetchAPILatencyMean.update(millis);
        _fetchAPICallCount.incr();
        int numMessages = countMessages(msgs);
        _fetchAPIMessageCount.incrBy(numMessages);

        if (numMessages > 0) {
            LOG.info("Fetched " + numMessages + " messages from: " + _partition);
        }
        for (MessageAndOffset msg : msgs) {
            _pending.add(_emittedToOffset);
            _waitingToEmit.add(new MessageAndRealOffset(msg.message(), _emittedToOffset));
            _emittedToOffset = msg.nextOffset();
        }
        if (numMessages > 0) {
            LOG.info("Added " + numMessages + " messages from: " + _partition + " to internal buffers");
        }
    }

    private int countMessages(ByteBufferMessageSet messageSet) {
        int counter = 0;
        for (MessageAndOffset messageAndOffset : messageSet) {
            counter = counter + 1;
        }
        return counter;
    }

    public void ack(Long offset) {
        _pending.remove(offset);
    }

    public void fail(Long offset) {
        // TODO: should it use in-memory ack set to skip anything that's been
        // acked but not committed??? things might get crazy with lots of timeouts
        if (_emittedToOffset > offset) {
            _emittedToOffset = offset;
            _pending.tailSet(offset).clear();
        }
    }

    public void commit() {
        // The most recently completed offset
        long lastCompletedOffset = lastCompletedOffset();
        // Write the latest fully processed offset for this partition and
        // topology to ZooKeeper
        if (lastCompletedOffset != lastCommittedOffset()) {
            LOG.info("Writing last completed offset (" + lastCompletedOffset
                    + ") to ZK for " + _partition + " for topology: " + _topologyInstanceId);
            Map<Object, Object> data = ImmutableMap
                    .builder()
                    .put("topology", ImmutableMap.of("id", _topologyInstanceId,
                            "name", _stormConf.get(Config.TOPOLOGY_NAME)))
                    .put("offset", lastCompletedOffset)
                    .put("partition", _partition.partition)
                    .put("broker", ImmutableMap.of("host", _partition.host.host,
                            "port", _partition.host.port))
                    .put("topic", _spoutConfig.topic).build();
            // Write the JSON directly
            _state.writeJSON(committedPath(), data);
            _committedTo = lastCompletedOffset;
            LOG.info("Wrote last completed offset (" + lastCompletedOffset
                    + ") to ZK for " + _partition + " for topology: " + _topologyInstanceId);
        } else {
            LOG.info("No new offset for " + _partition + " for topology: " + _topologyInstanceId);
        }
    }

    // The ZooKeeper path where offsets are committed
    private String committedPath() {
        return "/" + _spoutConfig.zkRoot + "/" + _spoutConfig.id + "/" + _partition.getId();
    }

    // Query the latest offset of this partition
    public long queryPartitionOffsetLatestTime() {
        return KafkaUtils.getOffset(_consumer, _spoutConfig.topic,
                _partition.partition, OffsetRequest.LatestTime());
    }

    // The last committed offset
    public long lastCommittedOffset() {
        return _committedTo;
    }

    public long lastCompletedOffset() {
        if (_pending.isEmpty()) {
            return _emittedToOffset;
        } else {
            return _pending.first();
        }
    }

    // The partition managed by this manager
    public Partition getPartition() {
        return _partition;
    }

    public void close() {
        _connections.unregister(_partition.host, _partition.partition);
    }
}
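For reference, commit() above persists its state as a JSON document at committedPath(), i.e. /{zkRoot}/{id}/{partitionId}. Judging purely from the keys assembled in commit(), a committed znode should look roughly like the following (all concrete values here are made up for illustration):

{
  "topology": {"id": "62a9e754-0f28-41c6-9ffa-8bc3a1718246", "name": "word-count"},
  "offset": 4672,
  "partition": 1,
  "broker": {"host": "kafka01.example.com", "port": 9092},
  "topic": "sentences"
}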
1. PartitionManager defines a static inner class, KafkaMessageId, which bundles a partition together with an offset:
static class KafkaMessageId {
    public Partition partition;
    public long offset;

    public KafkaMessageId(Partition partition, long offset) {
        this.partition = partition;
        this.offset = offset;
    }
}
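KafkaSpout itself is not listed in this article, but the whole point of KafkaMessageId is that it rides along with every emitted tuple (see collector.emit(...) in next()) and comes back in the spout's ack/fail callbacks. Below is only a sketch of that round trip; the _managers map is a hypothetical stand-in for however the real spout resolves the owning PartitionManager:

package com.mixbox.storm.kafka;

import java.util.HashMap;
import java.util.Map;

// Sketch only, not the real KafkaSpout: shows how acks and fails can be
// routed back to the owning PartitionManager through KafkaMessageId.
class AckRoutingSketch {

    // Hypothetical registry of managers, keyed by partition
    private final Map<Partition, PartitionManager> _managers =
            new HashMap<Partition, PartitionManager>();

    public void ack(Object msgId) {
        PartitionManager.KafkaMessageId id = (PartitionManager.KafkaMessageId) msgId;
        PartitionManager pm = _managers.get(id.partition);
        if (pm != null) {
            pm.ack(id.offset); // drops the offset from _pending
        }
    }

    public void fail(Object msgId) {
        PartitionManager.KafkaMessageId id = (PartitionManager.KafkaMessageId) msgId;
        PartitionManager pm = _managers.get(id.partition);
        if (pm != null) {
            pm.fail(id.offset); // rewinds _emittedToOffset so the message is re-fetched
        }
    }
}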
2. PartitionManager also holds the following instance variables:
2.1 _pending: offsets that have been emitted but not yet acknowledged
2.2 _committedTo: the offset committed (to ZooKeeper) so far
2.3 _waitingToEmit: the messages waiting to be emitted
2.4 _partition: the concrete partition being managed
Note that _waitingToEmit is a LinkedList<MessageAndRealOffset>; a sketch of how these fields cooperate follows.
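Together these fields give the spout its at-least-once bookkeeping: fill() adds every fetched offset to _pending, ack() removes it, fail() rewinds _emittedToOffset, and commit() only ever advances to lastCompletedOffset(), which is the smallest offset still pending. The following self-contained snippet (plain Java, no Storm or Kafka dependencies) illustrates that invariant:

import java.util.SortedSet;
import java.util.TreeSet;

// Standalone illustration of PartitionManager's offset bookkeeping: the
// committable offset can never move past an offset that was not acked.
public class OffsetBookkeepingSketch {
    public static void main(String[] args) {
        SortedSet<Long> pending = new TreeSet<Long>();
        long emittedToOffset = 100;

        // fill(): offsets 100..104 are fetched and queued for emission
        for (long o = 100; o < 105; o++) {
            pending.add(o);
            emittedToOffset = o + 1; // msg.nextOffset()
        }

        // acks arrive out of order; offset 101 is still outstanding
        pending.remove(100L);
        pending.remove(102L);
        pending.remove(103L);

        // lastCompletedOffset(): smallest pending offset, else emittedToOffset
        long lastCompleted = pending.isEmpty() ? emittedToOffset : pending.first();
        System.out.println(lastCompleted); // prints 101, not 104
    }
}

Because the commit point is the minimum of _pending, a single slow or failed tuple holds the whole partition back, and that is precisely what makes replay after a crash safe.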
3. A PartitionManager is constructed with the following parameters:
topologyInstanceId
DynamicPartitionConnections
ZkState
Map (the Storm configuration)
SpoutConfig
Partition
Note that the SimpleConsumer object is not passed in directly: the constructor obtains it by registering the partition with DynamicPartitionConnections via its register method.
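Putting point 3 together, here is a hedged sketch of how one manager per assigned partition could be wired up. Where connections, state, stormConf and spoutConfig come from is assumed rather than shown here (in the real spout they are created in open()), and the names ManagerWiringSketch and buildManagers are hypothetical:

package com.mixbox.storm.kafka;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;

// Sketch only: builds one PartitionManager per assigned partition. The
// partition list would come from the spout's partition coordinator.
class ManagerWiringSketch {

    static List<PartitionManager> buildManagers(
            DynamicPartitionConnections connections, ZkState state,
            Map stormConf, SpoutConfig spoutConfig, List<Partition> partitions) {
        // A fresh id per spout instance; the constructor compares it with the
        // topology id stored in ZooKeeper to honor forceFromStart
        String topologyInstanceId = UUID.randomUUID().toString();
        List<PartitionManager> managers = new ArrayList<PartitionManager>();
        for (Partition p : partitions) {
            managers.add(new PartitionManager(connections, topologyInstanceId,
                    state, stormConf, spoutConfig, p));
        }
        return managers;
    }
}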
Reposted from: http://my.oschina.net/u/1791874/blog/299337