
Storm Fields grouping example



I am using the Kafka spout with Storm: Kafka sends/emits JSON strings into Storm, and inside the topology I want to distribute the load to a couple of workers based on a key/field in the JSON. How can I do that? In my case, it is the groupid field in the JSON string.

For example, I have JSON like this:

{groupid: 1234, userid: 145, comments:"I want to distribute all this group 1234  to one worker", size:50,type:"group json"}
{groupid: 1235, userid: 134, comments:"I want to distribute all this group 1234 to another worker", size:90,type:"group json"}
{groupid: 1234, userid: 158, comments:"I want to be sent to same worker as group 1234", size:50,type:"group json"}   
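The distribution asked for here is exactly what Storm's fieldsGrouping provides: tuples are routed to a task by hashing the grouping field, so equal groupid values always land on the same worker. A minimal sketch of that idea follows; it is an illustration of the hash-mod-tasks principle, not Storm's exact internal algorithm:

```java
import java.util.Arrays;

public class FieldsGroupingSketch {
    // Pick a task index from the grouping field's value. Equal values
    // always map to the same task index, which is the guarantee that
    // fieldsGrouping gives.
    static int taskFor(Object fieldValue, int numTasks) {
        return Math.floorMod(Arrays.hashCode(new Object[]{fieldValue}), numTasks);
    }

    public static void main(String[] args) {
        int tasks = 2;
        // groupid 1234 appears twice in the sample data and must go to
        // the same task both times.
        System.out.println(taskFor(1234L, tasks) == taskFor(1234L, tasks)); // true
        System.out.println("task for 1234: " + taskFor(1234L, tasks));
        System.out.println("task for 1235: " + taskFor(1235L, tasks));
    }
}
```

Different groupid values may still share a task (hash collisions modulo the task count); the guarantee is only that the same value never splits across tasks.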

Storm 0.9.4 is used.

My source code is as follows:

import java.util.List;
import java.util.Map;

import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;


public class KafkaBoltMain {
    private static final String SPOUTNAME = "TopicSpout";
    private static final String ANALYSISBOLT = "AnalysisWorker";
    private static final String CLIENTID = "Storm";
    private static final String TOPOLOGYNAME = "LocalTopology";

    private static class AppAnalysisBolt extends BaseRichBolt {
        private static final long serialVersionUID = -6885792881303198646L;
        private OutputCollector _collector;
        private long groupid = -1L;
        private String log = "test";

        public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
            _collector = collector;
        }

        public void execute(Tuple tuple) {
            List<Object> objs = tuple.getValues();
            int i = 0;
            for (Object obj : objs) {
                System.out.println(i + "th object's value is: " + obj.toString());
                i++;
            }

//          _collector.emit(new Values(groupid, log));
            _collector.ack(tuple);
        }

        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("groupid", "log"));
        }
    }

    public static void main(String[] args) {
        String zookeepers = null;
        String topicName = null;
        if (args.length == 2) {
            zookeepers = args[0];
            topicName = args[1];
        } else if (args.length == 1 && args[0].equalsIgnoreCase("help")) {
            System.out.println("xxxx");
            System.exit(0);
        } else {
            System.out.println("You need to have two arguments: kafka zookeeper:port and topic name");
            System.out.println("xxxx");
            System.exit(-1);
        }

        SpoutConfig spoutConfig = new SpoutConfig(new ZkHosts(zookeepers),
                topicName,
                "", // zookeeper root path for offset storing
                CLIENTID);
        spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
        KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout(SPOUTNAME, kafkaSpout, 1);
        builder.setBolt(ANALYSISBOLT, new AppAnalysisBolt(), 2)
            .fieldsGrouping(SPOUTNAME, new Fields("groupid"));

        // Configuration
        Config conf = new Config();
        conf.setDebug(false);
        // Topology run
        conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology(TOPOLOGYNAME, conf, builder.createTopology());
    }
}

But when I submit the topology, it gives the following error:

12794 [main] WARN  backtype.storm.daemon.nimbus - Topology submission exception. (topology name='LocalTopology') #<InvalidTopologyException InvalidTopologyException(msg:Component:
 [AnalysisWorker] subscribes from stream: [default] of component [TopicSpout] with non-existent fields: #{"groupid"})>
12800 [main] ERROR org.apache.storm.zookeeper.server.NIOServerCnxnFactory - Thread Thread[main,5,main] died
backtype.storm.generated.InvalidTopologyException: null

Why is there a non-existent fields warning message? Any hints?
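The error message itself points at the cause: fieldsGrouping validates against the fields declared by the component it subscribes to, and a KafkaSpout configured with StringScheme declares only a single raw-string field (named "str" in storm-kafka), not "groupid". A common pattern is an intermediate bolt that consumes the raw string via shuffleGrouping, extracts groupid, and re-emits it as a declared field that the downstream bolt can group on. The helper below is a sketch of that extraction step; the class name is hypothetical, and the regex is a simplified stand-in for a real JSON parser, good enough only for the question's {groupid: 1234, ...} samples:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper for an intermediate parsing bolt: the spout's
// StringScheme declares one raw-string field, so a bolt between the
// spout and AppAnalysisBolt must extract groupid and re-emit it as a
// declared field before fieldsGrouping(new Fields("groupid")) can work.
public class GroupIdExtractor {
    // Simplified stand-in for a real JSON parser (assumption: keys are
    // unquoted as in the question's samples).
    private static final Pattern GROUPID = Pattern.compile("groupid\\s*:\\s*(\\d+)");

    // Returns the groupid value, or -1 if none is present.
    public static long extractGroupId(String json) {
        Matcher m = GROUPID.matcher(json);
        return m.find() ? Long.parseLong(m.group(1)) : -1L;
    }

    public static void main(String[] args) {
        String json = "{groupid: 1234, userid: 145, size:50,type:\"group json\"}";
        System.out.println(extractGroupId(json)); // prints 1234
    }
}
```

Inside such a bolt's execute(), the extracted value would be emitted with something like collector.emit(new Values(groupid, rawJson)) after declaring new Fields("groupid", "json"), and the topology wiring would become spout → parse bolt (shuffleGrouping) → analysis bolt (fieldsGrouping on "groupid").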


Source: https://stackoverflow.com/questions/29998310
Updated: 2022-02-14 20:02

Accepted answer



There's perldoc perlstyle in the standard documentation.

Regarding aesthetics of code layout, about the only thing Larry cares strongly about is that the closing curly bracket of a multi-line BLOCK should line up with the keyword that started the construct. Beyond that, he has other preferences that aren't so strong.

Larry has his reasons for each of these things, but he doesn't claim that everyone else's mind works the same as his does.

If you install the Perl::Tidy module, it includes the program/tool perltidy, which will reformat your code to conform to Larry Wall's preferences as expressed in perlstyle. With the command-line parameter -pbp it will instead conform to the style preferences Damian Conway expressed in his book Perl Best Practices.
