荐 Twitter Storm Stream Grouping编写自定义分组实现

2019-03-02 23:57|来源: 网路

自定义Grouping测试

Storm是支持自定义分组的,本篇文章就是探究Storm如何编写一个自定义分组器,以及对Storm分组器如何分组数据的理解。

这是我写的一个自定义分组,总是把数据分到第一个Task:

public class MyFirstStreamGrouping implements CustomStreamGrouping {
    private static Logger log = LoggerFactory.getLogger(MyFirstStreamGrouping.class);

    private List<Integer> tasks;

    @Override
    public void prepare(WorkerTopologyContext context, GlobalStreamId stream,
        List<Integer> targetTasks) {
        this.tasks = targetTasks;
        log.info(tasks.toString());
    }   
    @Override
    public List<Integer> chooseTasks(int taskId, List<Object> values) {
        log.info(values.toString());
        return Arrays.asList(tasks.get(0));
    }
}

从上面的代码可以看出,该自定义分组会把数据归并到第一个TaskArrays.asList(tasks.get(0));,也就是数据到达后总是被派发到第一组。

测试代码:

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("words", new TestWordSpout(), 2); 
//自定义分组,
builder.setBolt("exclaim1", new DefaultStringBolt(), 3)
        .customGrouping("words", new MyFirstStreamGrouping());

和之前的测试用例一样,Spout总是发送new String[] {“nathan”, “mike”, “jackson”, “golda”, “bertels”}列表的字符串。我们运行验证一下:

11878 [Thread-25-exclaim1] INFO  cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: jackson
11943 [Thread-41-words] INFO  cn.pointways.dstorm.grouping.MyFirstStreamGrouping - [nathan]
11944 [Thread-25-exclaim1] INFO  cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: nathan
11979 [Thread-29-words] INFO  cn.pointways.dstorm.grouping.MyFirstStreamGrouping - [mike]
11980 [Thread-25-exclaim1] INFO  cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: mike
12045 [Thread-41-words] INFO  cn.pointways.dstorm.grouping.MyFirstStreamGrouping - [jackson]
12045 [Thread-25-exclaim1] INFO  cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: jackson
12080 [Thread-29-words] INFO  cn.pointways.dstorm.grouping.MyFirstStreamGrouping - [jackson]
12081 [Thread-25-exclaim1] INFO  cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: jackson
12145 [Thread-41-words] INFO  cn.pointways.dstorm.grouping.MyFirstStreamGrouping - [mike]
12146 [Thread-25-exclaim1] INFO  cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: mike

从这个运行日志我们可以看出,数据总是派发到一个Blot:Thread-25-exclaim1。因为我时本地测试,Thread-25-exclaim1是线程名。而派发的线程是数据多个线程的。因此该测试符合预期,即总是发送到一个Task,并且这个Task也是第一个。

理解自定义分组实现

自己实现一个自定义分组难吗?其实如果你理解了Hadoop的Partitioner,Storm的CustomStreamGrouping和它也是一样的道理。

Hadoop MapReduce的Map完成后会把Map的中间结果写入磁盘,在写磁盘前,线程首先根据数据最终要传送到的Reducer把数据划分成相应的分区,然后不同的分区进入不同的Reduce。我们先来看看Hadoop是怎样把数据怎样分组的,这是Partitioner唯一一个方法:

public class Partitioner<K, V> {
    @Override
    public int getPartition(K key, V value, int numReduceTasks) {
        return 0;
    }
}

上面的代码中:Map输出的数据都会经过getPartition()方法,用来确定下一步的分组。numReduceTasks是一个Job的Reduce数量,而返回值就是确定该条数据进入哪个Reduce。返回值必须大于等于0,小于numReduceTasks,否则就会报错。返回0就意味着这条数据进入第一个Reduce。对于随机分组来说,这个方法可以这么实现:

public int getPartition(K key, V value, int numReduceTasks) {
    return hash(key) % numReduceTasks;
}

其实Hadoop 默认的Hash分组策略也正是这么实现的。这样好处是,数据在整个集群基本上是负载平衡的。

搞通了Hadoop的Partitioner,我们来看看Storm的CustomStreamGrouping。

这是CustomStreamGrouping类的源码:

public interface CustomStreamGrouping extends Serializable {

   void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks);

   List<Integer> chooseTasks(int taskId, List<Object> values); 
}

一模一样的道理,targetTasks就是Storm运行时告诉你,当前有几个目标Task可以选择,每一个都给编上了数字编号。而 chooseTasks(int taskId, List values); 就是让你选择,你的这条数据values,是要哪几个目标Task处理?

如上文文章开头的自定义分组器实现的代码,我选择的总是让第一个Task来处理数据, return Arrays.asList(tasks.get(0)); 。和Hadoop不同的是,Storm允许一条数据被多个Task处理,因此返回值是List .就是让你来在提供的 'List targetTasks' Task中选择任意的几个(必须至少是一个)Task来处理数据。

由此,Storm的自定义分组策略也就不那么麻烦了吧?


转自:http://my.oschina.net/zhzhenqin/blog/223304

相关问答

更多
  • 注释掉process.exit(0); 在console.log(data)之前,它应该工作(我测试过,它为我做了)。 这导致您的脚本在控制台上显示data之前退出。 Comment out process.exit(0); before console.log(data) and it should work (I tested and it did for me). That is causing your script to exit before data is shown on the conso ...
  • 在风暴谷歌集团找到答案。 似乎DRCP拓扑将发出一个元组,其中包含由DRCP spout作为流接收的参数,然后在处理完成时指示回(使用称为请求ID的唯一标识)。 在同一个线程中,hadoop可能最适合这些情况,除非数据不够大,并且可以一直处理。 Found the answer in the storm google group. Seems that DRCP topologies will emit a tuple with parameters that is received by DRCP spo ...
  • 不 ,您不能为生成的@param提供默认类型。 只要可以从函数声明中检测到,PhpStorm就会提供正确的类型。 在你的例子中,它是不可能的(它可以是任何东西)。 而且我个人没有看到默认情况下它需要成为字符串(或任何其他类型)的真正原因。 未知/缺失类型告诉我,我需要完成编辑此PHPDoc块,以便它反映实际/正确的参数类型。 已经输入了默认类型,这样的检查是不可能的(如果类型应该是某个类(例如Person)或数组,那该怎么办?) PS种类相关(至少对实际主题): 这是可编辑PHPDoc模板的功能请求票证(可 ...
  • 在拓扑运行时无法更改拓扑结构。 您需要kill拓扑并在之后重新部署新版本。 拓扑运行时可以更改的唯一参数是并行度。 有关更多详细信息,请参见此处: https : //storm.apache.org/releases/1.0.0/Understanding-the-parallelism-of-a-Storm-topology.html It is not possible to change the structure of a topology while it is running. You nee ...
  • 您需要将json属性从json对象中拉出,并将两个值(json对象和String groupId)作为双值元组传递。 当您将流声明为拓扑规范逻辑的一部分时,您将为第二个字段指定名称“groupId”,并且事情应该正常工作。 如果您不想修改Kafka喷口,则需要有一个中间螺栓,其唯一目的是将groupId从json对象中分离出来。 中间螺栓还可以使用定向流(emitDirect()方法),将目标放在json对象中的groupId上。 这就是为什么我不重复使用Kafka喷口的一个原因 - 除了盲目地将数据写入流 ...
  • 您使用的代码是针对旧的twitter api版本1.Twitter最近推出了新版本的api 1.1并停止使用旧版本。 新的twitter api要求您对每个请求进行身份验证,并且您可以在Twitter开发人员站点中阅读的身份验证机制中进行一些更改。 至于脚本中的上述错误,是因为你试图解析JSON.parse(chunk); 块是html,你得到那些错误。 尝试更新最新版本的twitter api的代码。 谢谢 The code you are using is for old twitter api ver ...
  • 错误螺栓可能比您怀疑的要慢,导致error_stream上的备份,这反过来导致备份到您的第一个螺栓,最终导致元组开始超时。 当元组超时时,它会被喷口重新发送。 尝试: 增加超时配置(topology.message.timeout.secs), 限制来自喷口(topology.max.spout.pending)和/或的飞行元组数量 增加螺栓的平行度计数 It's possible that the error bolt is slower than you suspect, causing a backu ...
  • 您的问题的根本原因是您正在向您的风暴配置添加ProducerTemplate,并且它正在抛出异常,因为它不可序列化。 如果那是你自己的类,你可以改变代码使其工作,但由于这是一个Camel类,我会推荐一种不同的方法。 WebSocketBolt:将您的producerTemplate私有成员更改为transient: private transient ProducerTemplate producerTemplate; 这样就不会尝试序列化(将其置于conf中也会遇到同样的问题)。 WebSocketBol ...
  • 我从来没有听说过DSMS一词,但是看一下维基百科上的描述,我认为Storm绝对可以说是DSMS。 来自维基百科: 它类似于数据库管理系统(DBMS)[...]但是,与DBMS相比,DSMS执行连续查询,该查询不仅执行一次,而且是永久安装的。 这听起来就像Storm一样。 但请注意,在Storm的情况下,它通常与DBMS结合使用。 例如,Storm可以提供One-time queries , unlimited secondary storage等,维基百科说这与DSMS结合时缺乏DSMS。 I had ne ...
  • 一旦所有下游任务都响应了元组,就意味着它们已经成功处理了消息,并且如果关闭则无需重放。 如果要将任何状态保留在内存中,则应将其存储在持久性存储中。 当由于消息而导致状态发生变化时,应该激活消息。 Once all the downstream tasks have acked the tuple, it means that they have successfully processed the message and it need not be replayed if there is a shut ...