知识点
相关文章
更多最近更新
更多荐 Twitter Storm Stream Grouping编写自定义分组实现
2019-03-02 23:57|来源: 网路
自定义Grouping测试
Storm是支持自定义分组的,本篇文章就是探究Storm如何编写一个自定义分组器,以及对Storm分组器如何分组数据的理解。
这是我写的一个自定义分组,总是把数据分到第一个Task:
public class MyFirstStreamGrouping implements CustomStreamGrouping { private static Logger log = LoggerFactory.getLogger(MyFirstStreamGrouping.class); private List<Integer> tasks; @Override public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) { this.tasks = targetTasks; log.info(tasks.toString()); } @Override public List<Integer> chooseTasks(int taskId, List<Object> values) { log.info(values.toString()); return Arrays.asList(tasks.get(0)); } }
从上面的代码可以看出,该自定义分组会把数据归并到第一个TaskArrays.asList(tasks.get(0));
,也就是数据到达后总是被派发到第一组。
测试代码:
TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("words", new TestWordSpout(), 2); //自定义分组, builder.setBolt("exclaim1", new DefaultStringBolt(), 3) .customGrouping("words", new MyFirstStreamGrouping());
和之前的测试用例一样,Spout总是发送new String[] {“nathan”, “mike”, “jackson”, “golda”, “bertels”}
列表的字符串。我们运行验证一下:
11878 [Thread-25-exclaim1] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: jackson 11943 [Thread-41-words] INFO cn.pointways.dstorm.grouping.MyFirstStreamGrouping - [nathan] 11944 [Thread-25-exclaim1] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: nathan 11979 [Thread-29-words] INFO cn.pointways.dstorm.grouping.MyFirstStreamGrouping - [mike] 11980 [Thread-25-exclaim1] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: mike 12045 [Thread-41-words] INFO cn.pointways.dstorm.grouping.MyFirstStreamGrouping - [jackson] 12045 [Thread-25-exclaim1] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: jackson 12080 [Thread-29-words] INFO cn.pointways.dstorm.grouping.MyFirstStreamGrouping - [jackson] 12081 [Thread-25-exclaim1] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: jackson 12145 [Thread-41-words] INFO cn.pointways.dstorm.grouping.MyFirstStreamGrouping - [mike] 12146 [Thread-25-exclaim1] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: mike
从这个运行日志我们可以看出,数据总是派发到一个Blot:Thread-25-exclaim1。因为我时本地测试,Thread-25-exclaim1是线程名。而派发的线程是数据多个线程的。因此该测试符合预期,即总是发送到一个Task,并且这个Task也是第一个。
理解自定义分组实现
自己实现一个自定义分组难吗?其实如果你理解了Hadoop的Partitioner,Storm的CustomStreamGrouping和它也是一样的道理。
Hadoop MapReduce的Map完成后会把Map的中间结果写入磁盘,在写磁盘前,线程首先根据数据最终要传送到的Reducer把数据划分成相应的分区,然后不同的分区进入不同的Reduce。我们先来看看Hadoop是怎样把数据怎样分组的,这是Partitioner唯一一个方法:
public class Partitioner<K, V> { @Override public int getPartition(K key, V value, int numReduceTasks) { return 0; } }
上面的代码中:Map输出的数据都会经过getPartition()方法,用来确定下一步的分组。numReduceTasks是一个Job的Reduce数量,而返回值就是确定该条数据进入哪个Reduce。返回值必须大于等于0,小于numReduceTasks,否则就会报错。返回0就意味着这条数据进入第一个Reduce。对于随机分组来说,这个方法可以这么实现:
public int getPartition(K key, V value, int numReduceTasks) { return hash(key) % numReduceTasks; }
其实Hadoop 默认的Hash分组策略也正是这么实现的。这样好处是,数据在整个集群基本上是负载平衡的。
搞通了Hadoop的Partitioner,我们来看看Storm的CustomStreamGrouping。
这是CustomStreamGrouping类的源码:
public interface CustomStreamGrouping extends Serializable { void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks); List<Integer> chooseTasks(int taskId, List<Object> values); }
一模一样的道理,targetTasks就是Storm运行时告诉你,当前有几个目标Task可以选择,每一个都给编上了数字编号。而 chooseTasks(int taskId, List
转自:http://my.oschina.net/zhzhenqin/blog/223304
相关问答
更多-
注释掉process.exit(0); 在console.log(data)之前,它应该工作(我测试过,它为我做了)。 这导致您的脚本在控制台上显示data之前退出。 Comment out process.exit(0); before console.log(data) and it should work (I tested and it did for me). That is causing your script to exit before data is shown on the conso ...
-
好好利用风暴?(Good use of storm?)[2021-12-04]
在风暴谷歌集团找到答案。 似乎DRCP拓扑将发出一个元组,其中包含由DRCP spout作为流接收的参数,然后在处理完成时指示回(使用称为请求ID的唯一标识)。 在同一个线程中,hadoop可能最适合这些情况,除非数据不够大,并且可以一直处理。 Found the answer in the storm google group. Seems that DRCP topologies will emit a tuple with parameters that is received by DRCP spo ... -
不 ,您不能为生成的@param提供默认类型。 只要可以从函数声明中检测到,PhpStorm就会提供正确的类型。 在你的例子中,它是不可能的(它可以是任何东西)。 而且我个人没有看到默认情况下它需要成为字符串(或任何其他类型)的真正原因。 未知/缺失类型告诉我,我需要完成编辑此PHPDoc块,以便它反映实际/正确的参数类型。 已经输入了默认类型,这样的检查是不可能的(如果类型应该是某个类(例如Person)或数组,那该怎么办?) PS种类相关(至少对实际主题): 这是可编辑PHPDoc模板的功能请求票证(可 ...
-
在拓扑运行时无法更改拓扑结构。 您需要kill拓扑并在之后重新部署新版本。 拓扑运行时可以更改的唯一参数是并行度。 有关更多详细信息,请参见此处: https : //storm.apache.org/releases/1.0.0/Understanding-the-parallelism-of-a-Storm-topology.html It is not possible to change the structure of a topology while it is running. You nee ...
-
Storm Fields分组示例(Storm Fields grouping example)[2022-02-14]
您需要将json属性从json对象中拉出,并将两个值(json对象和String groupId)作为双值元组传递。 当您将流声明为拓扑规范逻辑的一部分时,您将为第二个字段指定名称“groupId”,并且事情应该正常工作。 如果您不想修改Kafka喷口,则需要有一个中间螺栓,其唯一目的是将groupId从json对象中分离出来。 中间螺栓还可以使用定向流(emitDirect()方法),将目标放在json对象中的groupId上。 这就是为什么我不重复使用Kafka喷口的一个原因 - 除了盲目地将数据写入流 ... -
您使用的代码是针对旧的twitter api版本1.Twitter最近推出了新版本的api 1.1并停止使用旧版本。 新的twitter api要求您对每个请求进行身份验证,并且您可以在Twitter开发人员站点中阅读的身份验证机制中进行一些更改。 至于脚本中的上述错误,是因为你试图解析JSON.parse(chunk); 块是html,你得到那些错误。 尝试更新最新版本的twitter api的代码。 谢谢 The code you are using is for old twitter api ver ...
-
Storm拓扑中的可选流(Optional stream in Storm topology)[2022-10-05]
错误螺栓可能比您怀疑的要慢,导致error_stream上的备份,这反过来导致备份到您的第一个螺栓,最终导致元组开始超时。 当元组超时时,它会被喷口重新发送。 尝试: 增加超时配置(topology.message.timeout.secs), 限制来自喷口(topology.max.spout.pending)和/或的飞行元组数量 增加螺栓的平行度计数 It's possible that the error bolt is slower than you suspect, causing a backu ... -
在Apache Storm bolt中使用Apache Camel ProducerTemplate(Using Apache Camel ProducerTemplate in Apache Storm bolt)[2022-09-09]
您的问题的根本原因是您正在向您的风暴配置添加ProducerTemplate,并且它正在抛出异常,因为它不可序列化。 如果那是你自己的类,你可以改变代码使其工作,但由于这是一个Camel类,我会推荐一种不同的方法。 WebSocketBolt:将您的producerTemplate私有成员更改为transient: private transient ProducerTemplate producerTemplate; 这样就不会尝试序列化(将其置于conf中也会遇到同样的问题)。 WebSocketBol ... -
我从来没有听说过DSMS一词,但是看一下维基百科上的描述,我认为Storm绝对可以说是DSMS。 来自维基百科: 它类似于数据库管理系统(DBMS)[...]但是,与DBMS相比,DSMS执行连续查询,该查询不仅执行一次,而且是永久安装的。 这听起来就像Storm一样。 但请注意,在Storm的情况下,它通常与DBMS结合使用。 例如,Storm可以提供One-time queries , unlimited secondary storage等,维基百科说这与DSMS结合时缺乏DSMS。 I had ne ...
-
一旦所有下游任务都响应了元组,就意味着它们已经成功处理了消息,并且如果关闭则无需重放。 如果要将任何状态保留在内存中,则应将其存储在持久性存储中。 当由于消息而导致状态发生变化时,应该激活消息。 Once all the downstream tasks have acked the tuple, it means that they have successfully processed the message and it need not be replayed if there is a shut ...