荐 Twitter Storm, 数据流分组策略,fieldsGrouping

2019-03-02 23:57|来源: 网路

Storm Grouping

  1. shuffleGrouping

    将流分组定义为混排。这种混排分组意味着来自Spout的输入将混排,或随机分发给此Bolt中的任务。shuffle grouping对各个task的tuple分配的比较均匀。

  2. fieldsGrouping

    这种grouping机制保证相同field值的tuple会去同一个task,这对于WordCount来说非常关键,如果同一个单词不去同一个task,那么统计出来的单词次数就不对了。

  3. All grouping

    广播发送, 对于每一个tuple将会复制到每一个bolt中处理。

  4. Global grouping

    Stream中的所有的tuple都会发送给同一个bolt任务处理,所有的tuple将会发送给拥有最小task_id的bolt任务处理。

  5. None grouping

    不关注并行处理负载均衡策略时使用该方式,目前等同于shuffle grouping,另外storm将会把bolt任务和他的上游提供数据的任务安排在同一个线程下。

  6. Direct grouping

    由tuple的发射单元直接决定tuple将发射给那个bolt,一般情况下是由接收tuple的bolt决定接收哪个bolt发射的Tuple。这是一种比较特别的分组方法,用这种分组意味着消息的发送者指定由消息接收者的哪个task处理这个消息。 只有被声明为Direct Stream的消息流可以声明这种分组方法。而且这种消息tuple必须使用emitDirect方法来发射。消息处理者可以通过TopologyContext来获取处理它的消息的taskid (OutputCollector.emit方法也会返回taskid)

fieldsGrouping

上面的资料我摘抄自:http://xumingming.sinaapp.com/127/twitter-storm%E5%A6%82%E4%BD%95%E4%BF%9D%E8%AF%81%E6%B6%88%E6%81%AF%E4%B8%8D%E4%B8%A2%E5%A4%B1/

如果你了解Storm,我想你能明白其中的大多数Grouping。这里的Grouping策略我想着重介绍一下fieldsGrouping,也最难理解的。

fieldsGrouping是按照数据中字段Field的值分组的。下面是我的测试代码:

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("words", new TestWordSpout(), 2); 
builder.setBolt("exclaim2", new DefaultStringBolt(), 5)
        .fieldsGrouping("words", new Fields("word"));

测试的例子Spout是Storm自带的例子,Blot代码如下:

public void execute(Tuple tuple) {
    log.info("rev a message: " + tuple.getString(0));
    collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
    collector.ack(tuple);
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("word"));
}

Storm自带的例子Spout能随机的返回new String[] {“nathan”, “mike”, “jackson”, “golda”, “bertels”};列表中的几个字符串。这也是测试FieldGroup的好例子。

按照我最早做Storm开始前的理解,既然是按照Field分组,那么是所有相同的Field值得数据都会到达一个Blot的。我测试很多次,其结果并不是这样,一个Blot会收到多个不同的值。我没有仔细探究Storm这样分组有什么特别的地方,以至于自己对Storm的学习停滞了很长时间。

Storm能保证所有相同Field值的数据到达的是相同的Blot,但是不保证一个Blot只处理一个值域。

也就是说,所有值是nathan能到达到一个Blot,但是到达同一个Blot的值可能有多个,如"nathan”, “mike"的数据都到达。

理解到这点上,fieldsGrouping就算是理解了。

下面是测试日志:

9144 [Thread-35-exclaim2] INFO  cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: bertels
9234 [Thread-35-exclaim2] INFO  cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: mike
9245 [Thread-33-exclaim2] INFO  cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: nathan
9335 [Thread-26-exclaim2] INFO  cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: golda
9346 [Thread-26-exclaim2] INFO  cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: golda
9437 [Thread-35-exclaim2] INFO  cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: jackson
9447 [Thread-35-exclaim2] INFO  cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: mike
9537 [Thread-26-exclaim2] INFO  cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: golda
9548 [Thread-35-exclaim2] INFO  cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: jackson
9639 [Thread-33-exclaim2] INFO  cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: nathan
9649 [Thread-35-exclaim2] INFO  cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: jackson
9740 [Thread-33-exclaim2] INFO  cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: nathan
9749 [Thread-35-exclaim2] INFO  cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: jackson
9841 [Thread-35-exclaim2] INFO  cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: bertels
9850 [Thread-26-exclaim2] INFO  cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: golda

由上面的日志可以看出,golda这个值的数据,的确归并到一个Blot处理的。线程编号:Thread-26-exclaim2。 其它值也都是相同值都是在一个线程内被处理的。


转自:http://my.oschina.net/zhzhenqin/blog/221946

相关问答

更多
  • 你好湮魂,从多个角度全面讲解Storm实时数据处理技术和最佳实践,为快速掌握并灵活应用Storm提供实用指南   从实际问题出发,系统介绍Storm的基本应用、多语言特性、完整业务系统实现和产品交付的最佳实践方法;从产品持续交付角度,分析并实践集成、测试和交付的所有步骤   《大数据技术丛书:Storm实时数据处理》涵盖搭建基于Storm的开发环境和测试实时系统的许多实用方法与实战用例,以及如何应用交付最佳实践来将系统部署至云端。
  • 回答你的问题有点难,因为Zookeeper,Maven和Nginx都做了很多不同的事情,但我会尽我所能。 Zookeeper是运行Storm的一部分。 它会跟踪通过Storm拓扑运行的当前事务。 Maven是JVM生态系统中常见的构建管理工具。 你需要这个来构建东西。 在某些情况下,Nginx可能会使用Storm拓扑,但对于所有用例来说肯定不是必需的或共同的。 因此,您可能需要项目中的所有三个,您肯定需要Zookeeper,并且几乎肯定会以某种方式与Maven交互,即使您在其他地方使用Ant或Leineng ...
  • 你的理解是正确的。 bolt的每个任务都只会打开自己与数据库服务器的连接。 Your understanding is correct. Each task of a bolt will just open its own connection to the database server.
  • 免责声明:我写了您在上述问题中引用的文章 。 但是,我对“任务”的概念有些困惑。 任务是组件的运行实例(spout还是螺栓)? 执行者有多个任务实际上是说执行者多次执行相同的组件,我是否正确? 是的,是的。 此外,在一般的并行性意义上,Storm将为一个喷口或螺栓生成一个专用的线程(执行器),但是由具有多个任务的执行器(线程)对并行性的贡献是什么? 每个执行器运行多个任务不会增加并行级别 - 执行程序总是有一个线程用于其所有任务,这意味着任务在执行程序上连续运行。 正如我在文章中写道,请注意: 在拓扑开始后 ...
  • 可能有多个问题。 while循环中没有中断 - 无限循环。 你可以调用f.readline()两次。 您可能打算在每次select后仅调用一次。 为避免阻塞,请在select后使用data = os.read(f.fileno(), 1024) 。 我不知道是否可以阻止nextTuple()直到子进程退出。 如果你所做的只是读取子过程中的行,那么你不需要select : def iter_lines(*args, DEVNULL=open(os.devnull, 'r+')): p = Popen( ...
  • 关于加缪,是的,启动工作的调度员应该工作。 他们在LinkedIn上使用的是Azkaban,你也可以看一下。 如果在另一个完成之前启动,则会读取一些数据量两次。 由于第二个作业将从第一个作业使用的相同偏移开始读取。 关于加缪与S3,目前我不认为这已经到位。 Kafka actually retains events for a configurable period of time -- events are not purged immediately upon consumption like othe ...
  • 您需要将json属性从json对象中拉出,并将两个值(json对象和String groupId)作为双值元组传递。 当您将流声明为拓扑规范逻辑的一部分时,您将为第二个字段指定名称“groupId”,并且事情应该正常工作。 如果您不想修改Kafka喷口,则需要有一个中间螺栓,其唯一目的是将groupId从json对象中分离出来。 中间螺栓还可以使用定向流(emitDirect()方法),将目标放在json对象中的groupId上。 这就是为什么我不重复使用Kafka喷口的一个原因 - 除了盲目地将数据写入流 ...
  • 好吧,没有办法像我想要的那样快速压缩。 但是我找到了解决办法,如果有人需要,我可以在这里分享。 这个问题不仅与Storm相关,而且是一个更一般的Hadoop问题。 我的所有数据都是使用HdfsBolt写入的: RecordFormat format = new DelimitedRecordFormat().withFieldDelimiter("|"); //Synchronize data buffer with the filesystem every 1000 tuples ...
  • 我从来没有听说过DSMS一词,但是看一下维基百科上的描述,我认为Storm绝对可以说是DSMS。 来自维基百科: 它类似于数据库管理系统(DBMS)[...]但是,与DBMS相比,DSMS执行连续查询,该查询不仅执行一次,而且是永久安装的。 这听起来就像Storm一样。 但请注意,在Storm的情况下,它通常与DBMS结合使用。 例如,Storm可以提供One-time queries , unlimited secondary storage等,维基百科说这与DSMS结合时缺乏DSMS。 I had ne ...
  • 我不知道你正在使用的平台,但在C ++ 10ms是永恒的 。 我认为你正在使用错误的工具来完成工作。 使用C ++,提供一些本地查询应该不到一微秒。 触摸多个内存位置和/或必须等待磁盘或网络I / O的非本地查询别无选择,只能花费更多时间。 在这种情况下,并行性是你最好的朋友。 你必须找到瓶颈。 是I / O吗? 是CPU吗? 是内存带宽吗? 是内存访问时间吗? 在找到瓶颈之后,您可以改进它,异步它和/或乘以(=并行化)它。 I don't know the platform you're using, b ...