Storm常见模式——求TOP N

2019-03-02 23:44|来源: 网路

Storm的另一种常见模式是对流式数据进行所谓“streaming top N”的计算,它的特点是持续的在内存中按照某个统计指标(如出现次数)计算TOP N,然后每隔一定时间间隔输出实时计算后的TOP N结果。

流式数据的TOP N计算的应用场景很多,例如计算twitter上最近一段时间内的热门话题、热门点击图片等等。

下面结合Storm-Starter中的例子,介绍一种可以很容易进行扩展的实现方法:首先,在多台机器上并行的运行多个Bolt,每个Bolt负责一部分数据的TOP N计算,然后再有一个全局的Bolt来合并这些机器上计算出来的TOP N结果,合并后得到最终全局的TOP N结果。

该部分示例代码的入口是RollingTopWords类,用于计算文档中出现次数最多的N个单词。首先看一下这个Topology结构:

Topology构建的代码如下:

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("word", new TestWordSpout(), 5);
        builder.setBolt("count", new RollingCountObjects(60, 10), 4)
                 .fieldsGrouping("word", new Fields("word"));
        builder.setBolt("rank", new RankObjects(TOP_N), 4)
                 .fieldsGrouping("count", new Fields("obj"));
        builder.setBolt("merge", new MergeObjects(TOP_N))
                 .globalGrouping("rank");

1)首先,TestWordSpout()Topology的数据源Spout,持续随机生成单词发出去,产生数据流“word”,输出Fields“word”,核心代码如下:

    public void nextTuple() {
        Utils.sleep(100);
        final String[] words = new String[] {"nathan", "mike", "jackson", "golda", "bertels"};
        final Random rand = new Random();
        final String word = words[rand.nextInt(words.length)];
        _collector.emit(new Values(word));
  }
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
  }

2)接下来,“word”流入RollingCountObjects这个Bolt中进行word count计算,为了保证同一个word的数据被发送到同一个Bolt中进行处理,按照“word”字段进行field grouping;在RollingCountObjects中会计算各个word的出现次数,然后产生“count”流,输出“obj”“count”两个Field,核心代码如下

    public void execute(Tuple tuple) {

        Object obj = tuple.getValue(0);
        int bucket = currentBucket(_numBuckets);
        synchronized(_objectCounts) {
            long[] curr = _objectCounts.get(obj);
            if(curr==null) {
                curr = new long[_numBuckets];
                _objectCounts.put(obj, curr);
            }
            curr[bucket]++;
            _collector.emit(new Values(obj, totalObjects(obj)));
            _collector.ack(tuple);
        }
    }
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("obj", "count"));
    }

3)然后,RankObjects这个Bolt按照“count”流的“obj”字段进行field grouping;在Bolt内维护TOP N个有序的单词,如果超过TOP N个单词,则将排在最后的单词踢掉,同时每个一定时间(2秒)产生“rank”流,输出“list”字段,输出TOP N计算结果到下一级数据流“merge”流,核心代码如下:

    public void execute(Tuple tuple, BasicOutputCollector collector) {
        Object tag = tuple.getValue(0);
        Integer existingIndex = _find(tag);
        if (null != existingIndex) {
            _rankings.set(existingIndex, tuple.getValues());
        } else {
            _rankings.add(tuple.getValues());
        }
        Collections.sort(_rankings, new Comparator<List>() {
            public int compare(List o1, List o2) {
                return _compare(o1, o2);
            }
        });
        if (_rankings.size() > _count) {
            _rankings.remove(_count);
        }
        long currentTime = System.currentTimeMillis();
        if(_lastTime==null || currentTime >= _lastTime + 2000) {
            collector.emit(new Values(new ArrayList(_rankings)));
            _lastTime = currentTime;
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("list"));
    }

4)最后,MergeObjects这个Bolt按照“rank”流的进行全局的grouping,即所有上一级Bolt产生的“rank”流都流到这个“merge”流进行;MergeObjects的计算逻辑和RankObjects类似,只是将各个RankObjectsBolt合并后计算得到最终全局的TOP N结果,核心代码如下:

    public void execute(Tuple tuple, BasicOutputCollector collector) {
        List<List> merging = (List) tuple.getValue(0);
        for(List pair : merging) {
            Integer existingIndex = _find(pair.get(0));
            if (null != existingIndex) {
                _rankings.set(existingIndex, pair);
            } else {
                _rankings.add(pair);
            }

            Collections.sort(_rankings, new Comparator<List>() {
                public int compare(List o1, List o2) {
                    return _compare(o1, o2);
                }
            });

            if (_rankings.size() > _count) {
                _rankings.subList(_count, _rankings.size()).clear();
            }
        }

        long currentTime = System.currentTimeMillis();
        if(_lastTime==null || currentTime >= _lastTime + 2000) {
            collector.emit(new Values(new ArrayList(_rankings)));
            LOG.info("Rankings: " + _rankings);
            _lastTime = currentTime;
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("list"));
    }

关于上述例子的几点说明:

(1) 为什么要有RankObjectsMergeObjects两级的Bolt来计算呢?

其实,计算TOP N的一个最简单的思路是直接使用一个Bolt(通过类似于RankObjects的类实现)来做全局的求TOP N操作。

但是,这种方式的明显缺点在于受限于单台机器的处理能力。

(2) 如何保证计算结果的正确性?

首先通过field grouping将同一个word的计算放到同一个Bolt上处理;最后有一个全局的global grouping汇总得到TOP N

这样可以做到最大可能并行性,同时也能保证计算结果的正确。

(3) 如果当前计算资源无法满足计算TOP N,该怎么办?

这个问题本质上就是系统的可扩展性问题,基本的解决方法就是尽可能做到在多个机器上的并行计算过程,针对上面的Topology结构:

a) 可以通过增加每一级处理单元Bolt的数量,减少每个Bolt处理的数据规模;

b) 可以通过增加一级或多级Bolt处理单元,减少最终汇总处理的数据规模。

本文参考代码见:https://github.com/nathanmarz/storm-starter


转自:http://www.cnblogs.com/panfeng412/archive/2012/06/16/storm-common-patterns-of-streaming-top-n

相关问答

更多
  • 这是之前用C 写过的. #include #include main(){int f[10000],a[10000],i,j,k,t,n;while(scanf("%d",&n)!=EOF){memset(f,0,sizeof(f));f[0]=1;for(i=1;i =0;i--){if(f[i]!=0){k=i;break;}}for(i=k;i>=0;i--){printf("%d",f[i]);}printf("\n");}} 等下补上一个用Java 写的 //表示一个数字,使用 科学计数法。如50 ...
  • #include   long fac(unsigned n)   {   if (n == 0 || n == 1)   {   return 1;   }   else   {   return (n * fac(n - 1));   }   }   int main()   {   unsigned ui;   printf("输入计算的 阶乘:");   scanf("%u", &ui);   printf("%u的阶乘为:%u\n", ui, fac(ui));   return 0;   }
  • 如果[]的表达式返回false,则执行or ||之后的操作 (并且exit 0 )。 否则,它将短路并且将评估下一个语句。 If the expression in [] returns false, do the thing after the or || (and exit 0). Otherwise, it will short circuit and the next statement will be evaluated.
  • 您需要将范围运算符放在分组构造之后。 (?:\d[\s-]*){16} 说明 : (?: # group, but do not capture (16 times): \d # digits (0-9) [\s-]* # any character of: whitespace, '-' (0 or more times) ){16} # end of grouping 注意: | 在一个字符类中不是一个交替操作符,它按字面意思匹配字符。 You ...
  • 在我看来,在比特串中找到最右边的1。 看看以下两种情况: 这个数字很奇怪。 在这种情况下(n-1)是没有设置最后一位的相同编号。 反过来,你会得到一个0的掩码,其中原始数字为1,除了最后一个点。 例如:n = 01101,(n-1)= 01100,〜(n-1)= 10011,n&〜(n-1)= 01101&10011 = 00001 这个数字是偶数。 然后,与情况1相同,除了在AND操作期间,最后1的右边的所有位都是0。 例子:n = 01100,(n-1)= 01011,〜(n-1)= 10100,n&( ...
  • 可以根据需要在n和2 k之间来回切换,因为你假设n = 2 k 。 但是,这并不意味着您拥有的内容是正确的。 请记住,当n减小时,n log n的值也会不断减小,因此声明不是这种情况 线i = W(2 ki )+ i * n lg n 是真的。 让我们再一次使用迭代方法: W(n)= W(n / 2)+ n log n =(W(n / 4)+(n / 2)log(n / 2))+ n log n = W(n / 4)+(n / 2)(log n-1)+ n log n = W(n / 4)+ n log n ...
  • 当使用double数学时,日志结果或商可能不完全是数学结果,但1(或2)下一个可以表示double 。 计算log()只会返回log(0)的精确数学结果,所有其他数学结果都是非理性的。 一切都是理性的。 这可能会导致像29.999999 ...这样的答案,其保存为int为29。 推荐使用整数数学来代替 int mylog2(unsigned long x) { int y = 0; #if (ULONG_MAX>>16 > 1lu<<16) if (x >= 1lu<<32) { x >>= ...
  • 您可以编写一个使用列表初始化的Comparator类。 然后,您可以使用列表和比较器调用Collections.sort()。 代码可能如下所示: import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; import java.util.HashSet; import java.util.List; import java.util.Set; public class FrequencyC ...
  • 我想我找到了答案。 在群集中,可能会运行更多拓扑。 设A , B , C , D为4个拓扑,在同一个集群中运行。 这是我的情况。 当您启动拓扑时,他们将为每个拓扑分配连续的数字,但是每个群集 (这是我的错误)。 因此我们从: A-1-... B-2-... C-3-... D-4-... 如果你重新启动C ,你就可以了 C-5... 那么C-4在哪里? 它根本不存在,因为D已经占用了4 。 因此,从n yo n+2跳过是正常的。 您可能会发现n+1分配给另一个拓扑。 (QED) I guess I've ...
  • 正如Jürgen所说,你需要使用set-buffer-file-coding-system 。 你可以说 (set-buffer-file-coding-system 'unix) 并将其粘贴到find-file-hook的函数中,以便为您打开的所有缓冲区设置它。 或者,您可以将其放在write-file-hook列表中,以便在将文件转储到磁盘之前正确设置文件编码系统。 有了更简单的方法,如果您使用的是Emacs的GUI版本,则可以单击左侧的模式行中的第3个字符。 这是在eol格式之间切换。 As Jür ...