知识点
相关文章
更多最近更新
更多storm starter学习(二) - 流聚合
2019-03-02 23:57|来源: 网路
SingleJoinExample示例说明了storm中流聚合的应用,将具有相同tuple属性的数据流整合成一个新的数据流。来看一下Topology。先定义两个数据源genderSpout和ageSpout,Fields分别为("id", "gender")、("id", "age"),最终聚合后的数据流按id进行分组,输出为("gender", "age")。具体Topology如下:
// 定义数据源 FeederSpout genderSpout = new FeederSpout(new Fields("id", "gender")); FeederSpout ageSpout = new FeederSpout(new Fields("id", "age")); TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("gender", genderSpout); builder.setSpout("age", ageSpout); builder.setBolt("join", new SingleJoinBolt(new Fields("gender", "age"))).fieldsGrouping("gender", new Fields("id")) .fieldsGrouping("age", new Fields("id")); //数据流聚合拓扑中流聚合的主要功能在SingleJoinBolt下,先来看一下prepare方法。
public void prepare(Map conf, TopologyContext context, OutputCollector collector) { _fieldLocations = new HashMap<String, GlobalStreamId>(); _collector = collector; int timeout = ((Number) conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)).intValue(); _pending = new TimeCacheMap<List<Object>, Map<GlobalStreamId, Tuple>>(timeout, new ExpireCallback()); _numSources = context.getThisSources().size(); Set<String> idFields = null; for (GlobalStreamId source : context.getThisSources().keySet()) { Fields fields = context.getComponentOutputFields(source.get_componentId(), source.get_streamId()); Set<String> setFields = new HashSet<String>(fields.toList()); if (idFields == null) idFields = setFields; else idFields.retainAll(setFields); for (String outfield : _outFields) { for (String sourcefield : fields) { if (outfield.equals(sourcefield)) { _fieldLocations.put(outfield, source); } } } } _idFields = new Fields(new ArrayList<String>(idFields)); if (_fieldLocations.size() != _outFields.size()) { throw new RuntimeException("Cannot find all outfields among sources"); } }首先在处理开始的地方,使用了TimeCacheMap。使用它的目的是,由于bolt在接收两个数据源的流数据时,同一id的两个数据流很可能不会在一个时间点内同时发出,因此需要对数据流先进行缓存,直到所有id相同的数据源都收到后再进行聚合处理,做完聚合处理后再将对应的tuple信息从缓存中删除。在处理过程中,有可能会出现某些id的tuple丢失,导致缓存中对应该id的其他tuples一直缓存在内存中,解决此问题的方法是设置timeout时间,定期清理过期tuples发送fail信息给spout。超时时间的大小设置需要结合具体应用的进行判断,尽量保证相同id的tuples会在较短的时间间隔内发送给bolt,避免重复timeout事件的放生。
TimeCacheMap中ExpireCallback方法如下:
private class ExpireCallback implements TimeCacheMap.ExpiredCallback<List<Object>, Map<GlobalStreamId, Tuple>> { @Override public void expire(List<Object> id, Map<GlobalStreamId, Tuple> tuples) { for (Tuple tuple : tuples.values()) { _collector.fail(tuple); } } }接下来在prepare中遍历TopologyContext中不同数据源,得到所有数据源(genderSpout和ageSpout),使用retainAll方法提取Set中公共的Filed信息,保存到变量_idFields中(id),将_outFileds中字段所在数据源记录下来,保存到_fieldLocations,以便在聚合时获取对应的字段值。
excute方法是执行最后的流聚合功能,代码如下:
public void execute(Tuple tuple) { List<Object> id = tuple.select(_idFields); GlobalStreamId streamId = new GlobalStreamId(tuple.getSourceComponent(), tuple.getSourceStreamId()); if (!_pending.containsKey(id)) { _pending.put(id, new HashMap<GlobalStreamId, Tuple>()); } Map<GlobalStreamId, Tuple> parts = _pending.get(id); if (parts.containsKey(streamId)) throw new RuntimeException("Received same side of single join twice"); parts.put(streamId, tuple); if (parts.size() == _numSources) { _pending.remove(id); List<Object> joinResult = new ArrayList<Object>(); for (String outField : _outFields) { GlobalStreamId loc = _fieldLocations.get(outField); joinResult.add(parts.get(loc).getValueByField(outField)); } _collector.emit(new ArrayList<Tuple>(parts.values()), joinResult); for (Tuple part : parts.values()) { _collector.ack(part); } } }从tuple中取出id Fields信息和GlobalStreamId,判断当前id是否在_pending中存在,如不存在将当前数据放入到 _pending中。然后根据id来获取parts中对应的信息,如存在相同流id信息时,抛出异常:接收到来自同一Spout中id一致的tuple信息。不存在则放入到parts里。
如果parts已经包含了聚合数据源的个数_numSources时,本例中为2,表示相同id从genderSpout和ageSpout中发出的tuple都已经收到,可以进行聚合处理。从_pending队列中移除这条记录,然后开始构造聚合后的结果字段:依次遍历_outFields中各个字段,从_fieldLocations中取到这些outFiled字段对应的GlobalStreamId,紧接着从parts中取出GlobalStreamId对应的outFiled,放入聚合后的结果中。ack所有tuple。否则,继续等待两spout的流数据,直到缓存的数据源个数达到聚合要求。
最后,声明聚合后的输出字段,见declareOutputFields:
public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(_outFields); }总结:
在此示例中使用一种叫做TimeCacheMap的数据结构,用于在内存中保存近期活跃的对象,它的实现非常地高效,而且可以自动删除过期不再活跃的对象。
转自:http://my.oschina.net/u/262605/blog/299938
相关问答
更多-
好好利用风暴?(Good use of storm?)[2021-12-04]
在风暴谷歌集团找到答案。 似乎DRCP拓扑将发出一个元组,其中包含由DRCP spout作为流接收的参数,然后在处理完成时指示回(使用称为请求ID的唯一标识)。 在同一个线程中,hadoop可能最适合这些情况,除非数据不够大,并且可以一直处理。 Found the answer in the storm google group. Seems that DRCP topologies will emit a tuple with parameters that is received by DRCP spo ... -
在拓扑运行时无法更改拓扑结构。 您需要kill拓扑并在之后重新部署新版本。 拓扑运行时可以更改的唯一参数是并行度。 有关更多详细信息,请参见此处: https : //storm.apache.org/releases/1.0.0/Understanding-the-parallelism-of-a-Storm-topology.html It is not possible to change the structure of a topology while it is running. You nee ...
-
回答你的问题有点难,因为Zookeeper,Maven和Nginx都做了很多不同的事情,但我会尽我所能。 Zookeeper是运行Storm的一部分。 它会跟踪通过Storm拓扑运行的当前事务。 Maven是JVM生态系统中常见的构建管理工具。 你需要这个来构建东西。 在某些情况下,Nginx可能会使用Storm拓扑,但对于所有用例来说肯定不是必需的或共同的。 因此,您可能需要项目中的所有三个,您肯定需要Zookeeper,并且几乎肯定会以某种方式与Maven交互,即使您在其他地方使用Ant或Leineng ...
-
免责声明:我写了您在上述问题中引用的文章 。 但是,我对“任务”的概念有些困惑。 任务是组件的运行实例(spout还是螺栓)? 执行者有多个任务实际上是说执行者多次执行相同的组件,我是否正确? 是的,是的。 此外,在一般的并行性意义上,Storm将为一个喷口或螺栓生成一个专用的线程(执行器),但是由具有多个任务的执行器(线程)对并行性的贡献是什么? 每个执行器运行多个任务不会增加并行级别 - 执行程序总是有一个线程用于其所有任务,这意味着任务在执行程序上连续运行。 正如我在文章中写道,请注意: 在拓扑开始后 ...
-
关于加缪,是的,启动工作的调度员应该工作。 他们在LinkedIn上使用的是Azkaban,你也可以看一下。 如果在另一个完成之前启动,则会读取一些数据量两次。 由于第二个作业将从第一个作业使用的相同偏移开始读取。 关于加缪与S3,目前我不认为这已经到位。 Kafka actually retains events for a configurable period of time -- events are not purged immediately upon consumption like othe ...
-
建立风暴启动者:得到错误“POM for storm-core missing”(building storm-starter: getting error “POM for storm-core missing”)[2023-03-01]
我遇到了同样的问题,并从storm-user邮件列表档案中找到了答案,请参阅 http://mail-archives.apache.org/mod_mbox/storm-user/201404.mbox/%3C72091F29-8109-4A1C-A79D-DDD2D871737B%40petrolink.com%3E I encountered the same problem, and found the answer from storm-user mailing list archives, re ... -
Logstash不是集群流处理系统。 它只是一个基于JVM的进程。 最新版本支持磁盘缓冲,但没有与Spark或Storm几乎相同的交付保证。 看看http://storm.apache.org/releases/1.0.3/Guaranteeing-message-processing.html 是但不确定为什么首先使用Elastic存储数据。 为什么不HDFS-> SparkML->弹性? 这里要考虑的主要是管理模型,培训和测试。 Logstash is not cluster stream proces ...
-
好吧,没有办法像我想要的那样快速压缩。 但是我找到了解决办法,如果有人需要,我可以在这里分享。 这个问题不仅与Storm相关,而且是一个更一般的Hadoop问题。 我的所有数据都是使用HdfsBolt写入的: RecordFormat format = new DelimitedRecordFormat().withFieldDelimiter("|"); //Synchronize data buffer with the filesystem every 1000 tuples ...
-
这通常是由风暴客户端和风暴启动器版本不一致造成的。 请尝试按照以下步骤操作该示例。 从http://storm.apache.org/downloads.html下载最新版本 在这个例子中,我们将使用版本1.1.1 将其解压缩到一个文件夹,我们称之为${STORM_HOME} cd到${STORM_HOME}/examples/storm-starter 执行mvn package -DskipTests=true 这应该在目标文件夹中构建风暴启动器jar ${STORM_HOME}/examples/st ...
-
Apache Storm - 风暴启动器上的ClassNotFoundException(Apache Storm - ClassNotFoundException on storm-starter)[2023-04-25]
通过maven运行暴风雨拓扑不是要走的路。 您应该在命令行上使用bin/storm jar myJarFile.jar向集群提交拓扑(这也适用于本地模式)。 文件./target/original-storm-starter-0.11.0-SNAPSHOT.jar和./target/storm-starter-0.11.0-SNAPSHOT.jar是标准的maven工件,不能用于向集群提交拓扑。 您可以使用maven-jar-plugin (我建议您开始使用 - 您可能还需要使用maven-dependen ...