Storm-kafka【接口实现】-1 DynamicBrokersReader

2019-03-02 23:58|来源: 网路


阅读前提:您可能需要对kafka有基本的认识,并且和idaokafka-storm之中的关系

本章主题: 实现一个对于kafkaBroker 动态读取的Class - DynamicBrokersReader

本章精要: 1 关注kafka在Storm之上的信息注册

                  2 关注微观的逻辑过程


DynamicBrokersReader

package com.mixbox.storm.kafka;

import backtype.storm.Config;
import backtype.storm.utils.Utils;

import com.mixbox.storm.kafka.trident.GlobalPartitionInformation;
import com.netflix.curator.framework.CuratorFramework;
import com.netflix.curator.framework.CuratorFrameworkFactory;
import com.netflix.curator.retry.RetryNTimes;
import org.json.simple.JSONValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.List;
import java.util.Map;

/**
 * 动态的Broker读 我们维护了有一个与zk之间的连接,提供了获取指定topic的每一个partition正在活动着的leader所对应的broker
 * 这样你有能力知道,当前的这些topic,哪一些broker是活动的 * @author Yin Shuai
 */

public class DynamicBrokersReader {

	public static final Logger LOG = LoggerFactory
			.getLogger(DynamicBrokersReader.class);

	// 对于Client CuratorFrameWork的封装
	private CuratorFramework _curator;

	// 在Zk上注册的位置
	private String _zkPath;

	// 指定的_topic
	private String _topic;

	public DynamicBrokersReader(Map conf, String zkStr, String zkPath,
			String topic) {
		_zkPath = zkPath;
		_topic = topic;
		try {
			_curator = CuratorFrameworkFactory
					.newClient(
							zkStr,
							Utils.getInt(conf
									.get(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT)),
							15000,
							new RetryNTimes(
									Utils.getInt(conf
											.get(Config.STORM_ZOOKEEPER_RETRY_TIMES)),
									Utils.getInt(conf
											.get(Config.STORM_ZOOKEEPER_RETRY_INTERVAL))));
		} catch (IOException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
		_curator.start();
	}

	public DynamicBrokersReader(String zkPath) {
		this._zkPath = zkPath;
	}

	/**
	 * 确定指定topic下,每一个partition的leader,所对应的 主机和端口, 并将它们存入到全部分区信息中
	 * 
	 */
	public GlobalPartitionInformation getBrokerInfo() {
		GlobalPartitionInformation globalPartitionInformation = new GlobalPartitionInformation();
		try {

			// 拿到当前的分区数目
			int numPartitionsForTopic = getNumPartitions();

			/**
			 * /brokers/ids
			 */
			String brokerInfoPath = brokerPath();

			// 默认的我们的分区数目就只有 0, 1 两个
			for (int partition = 0; partition < numPartitionsForTopic; partition++) {

				// 这里请主要参考分区和领导者的关系
				int leader = getLeaderFor(partition);

				// 拿到领导者以后的zookeeper路径
				String leaderPath = brokerInfoPath + "/" + leader;

				try {

					byte[] brokerData = _curator.getData().forPath(leaderPath);

					/**
					 * 在这里, 我们拿到的brokerData为:
					 * {"jmx_port":-1,"timestamp":"1403076810435"
					 * ,"host":"192.168.50.207","version":1,"port":9092} 注意
					 * 这里是字节数组开始转json
					 */
					Broker hp = getBrokerHost(brokerData);

					/**
					 * 记录好 每一个分区 partition 所对应的 Broker
					 */
					globalPartitionInformation.addPartition(partition, hp);

				} catch (org.apache.zookeeper.KeeperException.NoNodeException e) {
					LOG.error("Node {} does not exist ", leaderPath);
				}
			}
		} catch (Exception e) {
			throw new RuntimeException(e);
		}
		LOG.info("Read partition info from zookeeper: "
				+ globalPartitionInformation);
		return globalPartitionInformation;
	}

	/**
	 * @return 拿到指定topic下的分区数目
	 */
	private int getNumPartitions() {
		try {
			String topicBrokersPath = partitionPath();
			List<String> children = _curator.getChildren().forPath(
					topicBrokersPath);
			return children.size();
		} catch (Exception e) {
			throw new RuntimeException(e);
		}
	}

	/**
	 * @return 拿到的topic在zookeeper注册的分区地址
	 *         brokers/topics/storm-sentence/partitions
	 */
	public String partitionPath() {
		return _zkPath + "/topics/" + _topic + "/partitions";
	}

	/**
	 *  持有的是Broker节点的id号码,这个id号是在配置的过程中为每一个Broker分配的
	 * @return   /brokers/ids
	 */
	public String brokerPath() {
		return _zkPath + "/ids";
	}

	/**
	 * get /brokers/topics/distributedTopic/partitions/1/state {
	 * "controller_epoch":4, "isr":[ 1, 0 ], "leader":1, "leader_epoch":1,
	 * "version":1 }
	 * 
	 * 说明一下,在kafka之中,每一个分区都会有一个Leader,有0个或者多个的followers, 一个leader会处理这个分区的所有请求
	 * @param partition
	 * @return
	 */
	private int getLeaderFor(long partition) {
		try {
			String topicBrokersPath = partitionPath();
			byte[] hostPortData = _curator.getData().forPath(
					topicBrokersPath + "/" + partition + "/state");
			@SuppressWarnings("unchecked")
			Map<Object, Object> value = (Map<Object, Object>) JSONValue
					.parse(new String(hostPortData, "UTF-8"));
			Integer leader = ((Number) value.get("leader")).intValue();
			return leader;
		} catch (Exception e) {
			throw new RuntimeException(e);
		}
	}

	public void close() {
		_curator.close();
	}

	/**
	 * [zk: localhost:2181(CONNECTED) 56] get /brokers/ids/0 {
	 * "host":"localhost", "jmx_port":9999, "port":9092, "version":1 }
	 * 
	 * 
	 * @param contents
	 * @return
	 */
	private Broker getBrokerHost(byte[] contents) {
		try {
			@SuppressWarnings("unchecked")
			Map<Object, Object> value = (Map<Object, Object>) JSONValue
					.parse(new String(contents, "UTF-8"));
			String host = (String) value.get("host");
			Integer port = ((Long) value.get("port")).intValue();
			return new Broker(host, port);
		} catch (UnsupportedEncodingException e) {
			throw new RuntimeException(e);
		}
	}

}

对于以上代码须知:

1 : 我们持有了一个ZkPath , 在Storm-kafka的class之中我们默认的是/brokers 


2 : _topic ,  目前我们是针对的是Topic, 也就是说我们的partition,leader都是针对于单个Topic的


3:   


    

1 int numPartitionsForTopic = getNumPartitions();

       针对与一个Topic,首先我们要取当前的分区数,一般的情况,我们在kafka之中默认的分区数为2

2 String brokerInfoPath = brokerPath();

       拿到 /brokers/ids 的分区号

3:	for (int partition = 0; partition < numPartitionsForTopic; partition++) {

        依次的遍历每一个分区

   

4:int leader = getLeaderFor(partition);String leaderPath = brokerInfoPath + "/" + leader;byte[] brokerData = _curator.getData().forPath(leaderPath);

       再通过分区拿到领导者,以及领导者的路径,最后拿到领导者的数据:

       我们举一个小例子

        * 在这里, 我们拿到的brokerData为:

* {"jmx_port":-1,"timestamp":"1403076810435"

* ,"host":"192.168.50.207","version":1,"port":9092} 

         

4:Broker hp = getBrokerHost(brokerData);

          拿到某一个Topic自己的分区在kafka所对应的Broker,并且其封装到 globalPartitionInformation


  

5 globalPartitionInformation.addPartition(partition, hp);

       GlobalPartitionInformaton底层维护了一个HashMap


     总结: 简单的来说:DynamicBrokersReader 针对某一个Topic维护了  每一个分区 partition 所对应的 Broker


    对于DynamicBrokersReader将被谁调用,如何进行下一步的封装,请看

    Storm-kafka【接口实现】-2 ZkBrokerReader


    


转自:http://my.oschina.net/u/1791874/blog/293778

相关问答

更多
  • 你好湮魂,从多个角度全面讲解Storm实时数据处理技术和最佳实践,为快速掌握并灵活应用Storm提供实用指南   从实际问题出发,系统介绍Storm的基本应用、多语言特性、完整业务系统实现和产品交付的最佳实践方法;从产品持续交付角度,分析并实践集成、测试和交付的所有步骤   《大数据技术丛书:Storm实时数据处理》涵盖搭建基于Storm的开发环境和测试实时系统的许多实用方法与实战用例,以及如何应用交付最佳实践来将系统部署至云端。
  • 试试kafkaConfig.forceStartOffsetTime(-1) 。 -1为最新的Kafka偏移量, -2为最早的可用偏移量。 编辑: 此外,您可以强制喷口开始使用相同的选项从任何所需的偏移量消耗 - 仅传递数值偏移量作为唯一参数。 忽略forceStartOffsetTime的“ Time ”,参数名称有点混乱。 卡夫卡的偏移量是数字,与任何时间概念都没有关系。 -1只是一种告诉卡夫卡鲸鱼嘴从卡夫卡本身收集最新偏移量的特殊方式(最早可用偏移量为-2 )。 Try kafkaConfig.for ...
  • 尝试添加环境变量: CHROOT=/kafka_0.9 Zookeeper还允许您添加一个“chroot”路径,该路径将使该群集的所有kafka数据显示在特定路径下。 这是在同一个zookeeper集群上设置多个Kafka集群或其他应用程序的一种方法。 要做到这一点,以hostname1:port1,hostname2:port2,hostname3:port3 / chroot / path的形式给出一个连接字符串,将所有这个集群的数据放在路径/ chroot / path下。 请注意,您必须在启动代理之 ...
  • 只要你独立运行它们,我认为不应该有任何问题。 As long as you are running them independently I don't think there should be any issues.
  • 我相信Flux可以处理调用静态工厂方法。 - id: "startingOffsetTime" className: "kafka.api.OffsetRequest" factory: "LatestTime" 然后在你的SpoutConfig定义中使用它 properties: - name: "startOffsetTime" ref: "startingOffsetTime" 我没有测试过这个,但我认为它应该可以工作。 调用静态工厂方法的能力在https://issues.a ...
  • 我将config中的“topology.spout.max.batch.size”值更新为大约64 * 1024值,然后风暴处理变得很快。 I updated the "topology.spout.max.batch.size" value in config to about 64*1024 value and then storm processing became fast.
  • 问题出在提交代码中 - 如果在没有拓扑名称的情况下运行storm jar则提交拓扑的模板代码将创建LocalCluster的实例,并且本地集群不捕获状态,因此不会重放。 所以 $ storm jar myjar.jar storm.myorg.MyTopology topologyname 将在我的单节点开发集群上启动它 $ storm jar myjar.jar storm.myorg.MyTopology 将在LocalCluster的实例上启动它 The problem were in the s ...
  • 我最终通过使用maven repo中预编译的storm-kafka版本并在拓扑中添加过滤器螺栓而不是在spout本身中进行过滤来解决这个问题。 从而消除了对storm-core和storm-kafka本地编译的jar文件的需求。 这不是一个“解决方案”,但它是解决问题的一种方法。 I eventually worked around this problem by using a pre-compiled version of storm-kafka from a maven repo and adding ...
  • HDP 2.5中的Apache Storm实际上与正式的Apache Storm 1.0.1不同,因为它从下一个版本中提取了更多补丁(尚未发布)。 https://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.5.0/bk_release-notes/content/patch_storm.html “卡夫卡的偏移滞后”功能引入了Storm 1.1.0(尚未发布)并且需要相关的storm-kafka版本(因此也从Storm 1.1.0开始)。 您需要将storm- ...
  • 我可以指出几个起点,从这些起点到完全功能的东西会涉及到一些工作。 一种选择是使用docker镜像在Cloud Foundry(例如Pivotal Web Services)上部署kafka群集。 Spotify有Dockerized kafka和kafka-proxy (包括Zookeeper)。 要记住的一件事是PWS目前不支持具有持久性的应用程序(尽管这项工作正在开始 ),所以如果你现在就开始使用这条路线,那么在应用程序推出时你会丢失kafka中的数据。 看看Spotify的回购,看起来Docker图像 ...