getting start with storm 翻译 第八章 part-2

2019-03-02 23:46|来源: 网路

转载请注明出处:http://blog.csdn.net/lonelytrooper/article/details/12435641

The Bolts

首先我们看一下该topology中的标准bolts:

public class UserSplitterBoltimplementsIBasicBolt{

private static final longserialVersionUID=1L;

@Override

public voiddeclareOutputFields(OutputFieldsDeclarerdeclarer) {

declarer.declareStream("users",newFields("txid","tweet_id","user"));

}

@Override

public Map<String,Object>getComponentConfiguration() {

return null;

}

@Override

publicvoidprepare(Map stormConf,TopologyContext context) {

}

@Override

publicvoidexecute(Tuple input,BasicOutputCollector collector) {

String tweet =input.getStringByField("tweet");

String tweetId =input.getStringByField("tweet_id");

StringTokenizer strTok =newStringTokenizer(tweet," ");

TransactionAttempt tx = (TransactionAttempt)input.getValueByField("txid");

HashSet<String>users= newHashSet<String>();

while(strTok.hasMoreTokens()) {

String user =strTok.nextToken();

// Ensure this is an actual user, and that it's not repeated in the tweet

if(user.startsWith("@") && !users.contains(user)) {

collector.emit("users",newValues(tx,tweetId,user));

users.add(user);

}

}

}

@Override

publicvoidcleanup() {

}

}

正如本章前边提到的,UserSplitterBolt接收元组,解析tweets的文本,并发送@后边的单词或者Twitter用户。HashtagSplitterBolt以一种非常简单的方式工作。

public class HashtagSplitterBoltimplementsIBasicBolt{

private static final longserialVersionUID=1L;

@Override

public voiddeclareOutputFields(OutputFieldsDeclarerdeclarer) {

declarer.declareStream("hashtags",newFields("txid","tweet_id","hashtag"));

}

@Override

public Map<String,Object>getComponentConfiguration() {

return null;

}

@Override

public voidprepare(Map stormConf,TopologyContext context) {

}

@Override

public voidexecute(Tuple input,BasicOutputCollector collector) {

String tweet =input.getStringByField("tweet");

String tweetId =input.getStringByField("tweet_id");

StringTokenizer strTok =newStringTokenizer(tweet," ");

TransactionAttempt tx = (TransactionAttempt)input.getValueByField("txid");

HashSet<String>words= newHashSet<String>();

while(strTok.hasMoreTokens()) {

String word =strTok.nextToken();

if(word.startsWith("#") && !words.contains(word)) {

collector.emit("hashtags",newValues(tx,tweetId,word));

words.add(word);

}

}

}

@Override

publicvoidcleanup() {

}

}

我们现在看下在UserHashtagJoinBolt中发生了什么。首先要注意到的最重要的事情是它是一个BaseBatchBolt。这意味着会对接收到的元组执行execute方法但不会发送任何新的元组。逐步的,当批次结束的时候,Storm会调用finishBatch方法。

public voidexecute(Tuple tuple) {

String source =tuple.getSourceStreamId();

String tweetId =tuple.getStringByField("tweet_id");

if("hashtags".equals(source)) {

String hashtag =tuple.getStringByField("hashtag");

add(tweetHashtags,tweetId,hashtag);

} else if("users".equals(source)) {

String user =tuple.getStringByField("user");

add(userTweets,user,tweetId);

}

}

因为你需要将一条tweet中所有的标签与该tweet中提到的用户关联起来并且计数他们出现的次数,你需要对前边bolt的两条流做连接。对整个批次都这样处理,一旦完成了,finishBatch方法会被调用。

@Override

publicvoidfinishBatch() {

for(String user:userTweets.keySet()) {

Set<String>tweets= getUserTweets(user);

HashMap<String,Integer>hashtagsCounter =new HashMap<String,Integer>();

for(String tweet:tweets) {

Set<String>hashtags= getTweetHashtags(tweet);

if(hashtags!=null) {

for(String hashtag:hashtags) {

Integer count =hashtagsCounter.get(hashtag);

if(count==null)

count =0;

count ++;

hashtagsCounter.put(hashtag,count);

}

}

}

for (String hashtag:hashtagsCounter.keySet()) {

int count=hashtagsCounter.get(hashtag);

collector.emit(newValues(id,user,hashtag, count));

}

}

}

在该方法中,对每一个用户--标签以及它出现的次数,生成并发射一个元组。

你可以在GitHub看到完整的可下载的代码。

提交者 Bolts

正如你已经知道的,在topology中批量的元组被协调器和发射器发送。这些批量的元组被并行的处理,并没有特定的顺序。

coordinator bolts或者是实现了ICommitter接口的特殊批量bolts,或者它在TransactionalTopologyBuilder中被用setCommiterBolt方法设置过。它与常规的批量bolts的主要不同在于当该批次准备好被提交时会执行提交者 bolts的finishBatch方法。这在所有前边的事务被成功的提交后会发生。另外,finishBatch方法被顺序的执行。所以,当事务ID为1的批次和事务ID为2的批次在topology中被并行的处理时,正在处理事务ID为2的批次的提交者bolt的finishBatch方法只有在事务ID为1的批次的finishBatch方法结束并且没有任何错误的情况下才会被执行。

该类的实现如下:

public class RedisCommiterCommiterBoltextendsBaseTransactionalBolt

implements ICommitter{

public static final String LAST_COMMITED_TRANSACTION_FIELD="LAST_COMMIT";

TransactionAttempt id;

BatchOutputCollector collector;

Jedis jedis;

@Override

public voidprepare(Map conf,TopologyContext context,

BatchOutputCollector collector,TransactionAttempt id) {

this.id=id;

this.collector=collector;

this.jedis=newJedis("localhost");

}

HashMap<String,Long>hashtags = new HashMap<String,Long>();

HashMap<String,Long>users = newHashMap<String,Long>();

HashMap<String,Long>usersHashtags =new HashMap<String,Long>();

private voidcount(HashMap<String,Long>map, String key,intcount) {

Long value =map.get(key);

if(value==null)

value = (long)0;

value +=count;

map.put(key,value);

}

@Override

public voidexecute(Tuple tuple) {

String origin =tuple.getSourceComponent();

if("users-splitter".equals(origin)) {

String user =tuple.getStringByField("user");

count(users,user,1);

} else if("hashtag-splitter".equals(origin)) {

String hashtag =tuple.getStringByField("hashtag");

count(hashtags,hashtag,1);

} else if("user-hashtag-merger".equals(origin)) {

String hashtag =tuple.getStringByField("hashtag");

String user =tuple.getStringByField("user");

String key =user+ ":" + hashtag;

Integer count =tuple.getIntegerByField("count");

count(usersHashtags,key,count);

}

}

@Override

publicvoidfinishBatch() {

String lastCommitedTransaction = jedis.get(LAST_COMMITED_TRANSACTION_FIELD);

String currentTransaction =""+id.getTransactionId();

if(currentTransaction.equals(lastCommitedTransaction))

return;

Transaction multi =jedis.multi();

multi.set(LAST_COMMITED_TRANSACTION_FIELD,currentTransaction);

Set<String>keys= hashtags.keySet();

for(String hashtag:keys) {

Long count =hashtags.get(hashtag);

multi.hincrBy("hashtags",hashtag,count);

}

keys =users.keySet();

for(String user:keys) {

Long count = users.get(user);

multi.hincrBy("users",user,count);

}

keys =usersHashtags.keySet();

for(String key:keys) {

Long count =usersHashtags.get(key);

multi.hincrBy("users_hashtags",key,count);

}

multi.exec();

}

@Override

publicvoiddeclareOutputFields(OutputFieldsDeclarer declarer) {

}

}

这些都很直观,但是在finishBatch方法中有一个非常重要的细节。

...

multi.set(LAST_COMMITED_TRANSACTION_FIELD,currentTransaction);

...

这里你正在存储上一个被提交的事务ID到数据库。你为什么要那样做?记住当一个事务失败时,如果有必要的话Storm将重放它足够多次。如果你不确定你已经处理过该事务,那么你可以高估,这样整个topology的事务性含义都没意义了。所以记住:存储上一个被提交的事务ID并且提交前核对它。


转自:http://www.cnblogs.com/keanuyaoo/p/3357883

相关问答

更多
  • 请问你是想把你输入的数字保存到文本区的,,, 而大于1000个字符后,,,你无法确定是否需要保存到文本区吗? 对于以上, 我的理解是,你完全可以设定文本框最大字符长度。。
  • charge up原意充电、提价,此句与a storm意思是索取很多,即大量借款
  • Hi: 下列还有三个短语相同意思: cook up a storm烹饪上露一手 dance up a storm舞姿翩翩 talk up a storm 侃侃而谈 [MAINLY US INFORMAL:主要用于美国,非正式用语] to do something with a lot of energy and often skill:[以非凡的能力和技能做某事] Rob was in the kitchen cooking up a storm. Rob在厨房里大显身手. (from Cambridge ...
  • 当下雨的时候,风暴都到了
  • 你可以使用fieldsGrouping 。 您可以声明一个字段,通过该字段对元组进行分组(在您的情况下为id )。 我只是假设您的输入流是具有id和body字段的JSON对象 {"id":"1234","body":"some body"} 还假设您的拓扑结构有一个喷口,两个螺栓,即BoltA和BoltB。 在BoltB中,覆盖declareOutputFields方法并填写详细信息。 public void declareOutputFields(OutputFieldsDeclarer declare ...
  • 当你注意到脚本是一个bash文件。 该功能(显示在Storm UI中日志末端偏移喷口多远)目前无法在Windows上运行。 这有点不方便,但它不应妨碍您的拓扑正常工作。 如果您想知道喷口的日志偏移距离有多远,可以在其中一台Kafka机器上使用kafka-consumer-groups.bat脚本。 如果您使用storm-kafka-client喷口,喷口滞后也是该组件提供的指标的一部分。 我希望我们可以在某些时候摆脱storm-kafka-monitor (它有一些其他问题https://issues.ap ...
  • 让我们说你的风暴在/ home / your / download / storm解压缩 我将其定义为env变量 STORM_HOME="/home/your/download/storm" 因此您可以通过以下方式导出此变量 export STORM_HOME 不要忘记导入路径: export PATH=$PATH:$STORM_HOME/bin 然后你可以在其中使用storm命令。 Let us say your storm is unpacked at /home/your/download/storm ...
  • 我终于来了! 我只是简单了,我创建了一个非常简单的redis bolt,它发布了内容,我监视了redis数据库,它正在运行。 我的工作螺栓: public class RedisBolt implements IRichBolt { protected String channel = "Somriures"; // protected String configChannel; protected OutputCollector collector ...
  • 在系统变量的路径中使用PROGRA~1而不是Program文件 。 例如: C:\Program Files\Java\jdk1.6.0_45 ==> C:\PROGRA~1\Java\jdk1.6.0_45 Use PROGRA~1 instead of Program files in System variables' path. For example: C:\Program Files\Java\jdk1.6.0_45 ==> C:\PROGRA~1\Java\jdk1.6.0_45
  • 看起来像带有bin文件夹的“Storm.cmd”具有错误的主管路径类路径。 在cmd文件中,路径显示为:CLASS = org.apache.storm.daemon.supervisor 而将它与python脚本进行比较时,路径为:klass =“org.apache.storm.daemon.supervisor.Supervisor” 我们将storm.cmd更新为与python中的相同,然后它工作了。 希望有所帮助 It looks like "Storm.cmd" withing the bin ...