storm starter学习(二) - 流聚合

2019-03-02 23:57|来源: 网路

    SingleJoinExample示例说明了storm中流聚合的应用,将具有相同tuple属性的数据流整合成一个新的数据流。来看一下Topology。先定义两个数据源genderSpout和ageSpout,Fields分别为("id", "gender")、("id", "age"),最终聚合后的数据流按id进行分组,输出为("gender", "age")。具体Topology如下:


// 定义数据源
FeederSpout genderSpout = new FeederSpout(new Fields("id", "gender"));
FeederSpout ageSpout = new FeederSpout(new Fields("id", "age"));

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("gender", genderSpout);
builder.setSpout("age", ageSpout);
builder.setBolt("join", new SingleJoinBolt(new Fields("gender", "age"))).fieldsGrouping("gender", new Fields("id"))
        .fieldsGrouping("age", new Fields("id")); //数据流聚合
    拓扑中流聚合的主要功能在SingleJoinBolt下,先来看一下prepare方法。



public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
    _fieldLocations = new HashMap<String, GlobalStreamId>();
    _collector = collector;
    int timeout = ((Number) conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)).intValue();
    _pending = new TimeCacheMap<List<Object>, Map<GlobalStreamId, Tuple>>(timeout, new ExpireCallback());
    _numSources = context.getThisSources().size();
    Set<String> idFields = null;
    for (GlobalStreamId source : context.getThisSources().keySet()) {
      Fields fields = context.getComponentOutputFields(source.get_componentId(), source.get_streamId());
      Set<String> setFields = new HashSet<String>(fields.toList());
      if (idFields == null)
        idFields = setFields;
      else
        idFields.retainAll(setFields);

      for (String outfield : _outFields) {
        for (String sourcefield : fields) {
          if (outfield.equals(sourcefield)) {
            _fieldLocations.put(outfield, source);
          }
        }
      }
    }
    _idFields = new Fields(new ArrayList<String>(idFields));

    if (_fieldLocations.size() != _outFields.size()) {
      throw new RuntimeException("Cannot find all outfields among sources");
    }
  }
    首先在处理开始的地方,使用了TimeCacheMap。使用它的目的是,由于bolt在接收两个数据源的流数据时,同一id的两个数据流很可能不会在一个时间点内同时发出,因此需要对数据流先进行缓存,直到所有id相同的数据源都收到后再进行聚合处理,做完聚合处理后再将对应的tuple信息从缓存中删除。在处理过程中,有可能会出现某些id的tuple丢失,导致缓存中对应该id的其他tuples一直缓存在内存中,解决此问题的方法是设置timeout时间,定期清理过期tuples发送fail信息给spout。超时时间的大小设置需要结合具体应用的进行判断,尽量保证相同id的tuples会在较短的时间间隔内发送给bolt,避免重复timeout事件的放生。


    TimeCacheMap中ExpireCallback方法如下:


private class ExpireCallback implements TimeCacheMap.ExpiredCallback<List<Object>, Map<GlobalStreamId, Tuple>> {
    @Override
    public void expire(List<Object> id, Map<GlobalStreamId, Tuple> tuples) {
      for (Tuple tuple : tuples.values()) {
        _collector.fail(tuple);
      }
    }
}
    接下来在prepare中遍历TopologyContext中不同数据源,得到所有数据源(genderSpout和ageSpout),使用retainAll方法提取Set中公共的Filed信息,保存到变量_idFields中(id),将_outFileds中字段所在数据源记录下来,保存到_fieldLocations,以便在聚合时获取对应的字段值。


    excute方法是执行最后的流聚合功能,代码如下:


public void execute(Tuple tuple) {
    List<Object> id = tuple.select(_idFields);
    GlobalStreamId streamId = new GlobalStreamId(tuple.getSourceComponent(), tuple.getSourceStreamId());
    if (!_pending.containsKey(id)) {
      _pending.put(id, new HashMap<GlobalStreamId, Tuple>());
    }
    Map<GlobalStreamId, Tuple> parts = _pending.get(id);
    if (parts.containsKey(streamId))
      throw new RuntimeException("Received same side of single join twice");
    parts.put(streamId, tuple);
    if (parts.size() == _numSources) {
      _pending.remove(id);
      List<Object> joinResult = new ArrayList<Object>();
      for (String outField : _outFields) {
        GlobalStreamId loc = _fieldLocations.get(outField);
        joinResult.add(parts.get(loc).getValueByField(outField));
      }
      _collector.emit(new ArrayList<Tuple>(parts.values()), joinResult);

      for (Tuple part : parts.values()) {
        _collector.ack(part);
      }
    }
}
    从tuple中取出id Fields信息和GlobalStreamId,判断当前id是否在_pending中存在,如不存在将当前数据放入到 _pending中。然后根据id来获取parts中对应的信息,如存在相同流id信息时,抛出异常:接收到来自同一Spout中id一致的tuple信息。不存在则放入到parts里。


    如果parts已经包含了聚合数据源的个数_numSources时,本例中为2,表示相同id从genderSpout和ageSpout中发出的tuple都已经收到,可以进行聚合处理。从_pending队列中移除这条记录,然后开始构造聚合后的结果字段:依次遍历_outFields中各个字段,从_fieldLocations中取到这些outFiled字段对应的GlobalStreamId,紧接着从parts中取出GlobalStreamId对应的outFiled,放入聚合后的结果中。ack所有tuple。否则,继续等待两spout的流数据,直到缓存的数据源个数达到聚合要求。

    最后,声明聚合后的输出字段,见declareOutputFields:


public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(_outFields);
}
    总结:


    在此示例中使用一种叫做TimeCacheMap的数据结构,用于在内存中保存近期活跃的对象,它的实现非常地高效,而且可以自动删除过期不再活跃的对象。


转自:http://my.oschina.net/u/262605/blog/299938

相关问答

更多
  • 在风暴谷歌集团找到答案。 似乎DRCP拓扑将发出一个元组,其中包含由DRCP spout作为流接收的参数,然后在处理完成时指示回(使用称为请求ID的唯一标识)。 在同一个线程中,hadoop可能最适合这些情况,除非数据不够大,并且可以一直处理。 Found the answer in the storm google group. Seems that DRCP topologies will emit a tuple with parameters that is received by DRCP spo ...
  • 在拓扑运行时无法更改拓扑结构。 您需要kill拓扑并在之后重新部署新版本。 拓扑运行时可以更改的唯一参数是并行度。 有关更多详细信息,请参见此处: https : //storm.apache.org/releases/1.0.0/Understanding-the-parallelism-of-a-Storm-topology.html It is not possible to change the structure of a topology while it is running. You nee ...
  • 回答你的问题有点难,因为Zookeeper,Maven和Nginx都做了很多不同的事情,但我会尽我所能。 Zookeeper是运行Storm的一部分。 它会跟踪通过Storm拓扑运行的当前事务。 Maven是JVM生态系统中常见的构建管理工具。 你需要这个来构建东西。 在某些情况下,Nginx可能会使用Storm拓扑,但对于所有用例来说肯定不是必需的或共同的。 因此,您可能需要项目中的所有三个,您肯定需要Zookeeper,并且几乎肯定会以某种方式与Maven交互,即使您在其他地方使用Ant或Leineng ...
  • 免责声明:我写了您在上述问题中引用的文章 。 但是,我对“任务”的概念有些困惑。 任务是组件的运行实例(spout还是螺栓)? 执行者有多个任务实际上是说执行者多次执行相同的组件,我是否正确? 是的,是的。 此外,在一般的并行性意义上,Storm将为一个喷口或螺栓生成一个专用的线程(执行器),但是由具有多个任务的执行器(线程)对并行性的贡献是什么? 每个执行器运行多个任务不会增加并行级别 - 执行程序总是有一个线程用于其所有任务,这意味着任务在执行程序上连续运行。 正如我在文章中写道,请注意: 在拓扑开始后 ...
  • 关于加缪,是的,启动工作的调度员应该工作。 他们在LinkedIn上使用的是Azkaban,你也可以看一下。 如果在另一个完成之前启动,则会读取一些数据量两次。 由于第二个作业将从第一个作业使用的相同偏移开始读取。 关于加缪与S3,目前我不认为这已经到位。 Kafka actually retains events for a configurable period of time -- events are not purged immediately upon consumption like othe ...
  • 我遇到了同样的问题,并从storm-user邮件列表档案中找到了答案,请参阅 http://mail-archives.apache.org/mod_mbox/storm-user/201404.mbox/%3C72091F29-8109-4A1C-A79D-DDD2D871737B%40petrolink.com%3E I encountered the same problem, and found the answer from storm-user mailing list archives, re ...
  • Logstash不是集群流处理系统。 它只是一个基于JVM的进程。 最新版本支持磁盘缓冲,但没有与Spark或Storm几乎相同的交付保证。 看看http://storm.apache.org/releases/1.0.3/Guaranteeing-message-processing.html 是但不确定为什么首先使用Elastic存储数据。 为什么不HDFS-> SparkML->弹性? 这里要考虑的主要是管理模型,培训和测试。 Logstash is not cluster stream proces ...
  • 好吧,没有办法像我想要的那样快速压缩。 但是我找到了解决办法,如果有人需要,我可以在这里分享。 这个问题不仅与Storm相关,而且是一个更一般的Hadoop问题。 我的所有数据都是使用HdfsBolt写入的: RecordFormat format = new DelimitedRecordFormat().withFieldDelimiter("|"); //Synchronize data buffer with the filesystem every 1000 tuples ...
  • 这通常是由风暴客户端和风暴启动器版本不一致造成的。 请尝试按照以下步骤操作该示例。 从http://storm.apache.org/downloads.html下载最新版本 在这个例子中,我们将使用版本1.1.1 将其解压缩到一个文件夹,我们称之为${STORM_HOME} cd到${STORM_HOME}/examples/storm-starter 执行mvn package -DskipTests=true 这应该在目标文件夹中构建风暴启动器jar ${STORM_HOME}/examples/st ...
  • 通过maven运行暴风雨拓扑不是要走的路。 您应该在命令行上使用bin/storm jar myJarFile.jar向集群提交拓扑(这也适用于本地模式)。 文件./target/original-storm-starter-0.11.0-SNAPSHOT.jar和./target/storm-starter-0.11.0-SNAPSHOT.jar是标准的maven工件,不能用于向集群提交拓扑。 您可以使用maven-jar-plugin (我建议您开始使用 - 您可能还需要使用maven-dependen ...