storm-0.8.2源码分析之topology启动

2019-03-02 23:56|来源: 网路

topology启动

         一个topology的启动包括了三个步骤

         1)创建TopologyBuilder,设置输入源,输出源

         2)获取config

         3)提交topology(这里不考虑LocalCluster本地模式)

 

         以storm.starter的ExclamationTopology为例:

public static void main(String[] args)throws Exception {

       TopologyBuilder builder = new TopologyBuilder();

       

       builder.setSpout("word", new TestWordSpout(), 10);       

       builder.setBolt("exclaim1", new ExclamationBolt(), 3)

               .shuffleGrouping("word");

       builder.setBolt("exclaim2", new ExclamationBolt(), 2)

               .shuffleGrouping("exclaim1");

               

       Config conf = new Config();

       conf.setDebug(true);

       

       if(args!=null && args.length > 0) {

           conf.setNumWorkers(3);

           

           StormSubmitter.submitTopology(args[0], conf, builder.createTopology());

       } else {

       

           LocalCluster cluster = new LocalCluster();

           cluster.submitTopology("test", conf,builder.createTopology());

           Utils.sleep(10000);

           cluster.killTopology("test");

           cluster.shutdown();   

       }

}

TopologyBuilder

   TopologyBuilder builder = newTopologyBuilder();

       

       builder.setSpout("word", new TestWordSpout(), 10);       

       builder.setBolt("exclaim1", new ExclamationBolt(), 3)

               .shuffleGrouping("word");

       builder.setBolt("exclaim2", new ExclamationBolt(), 2)

               .shuffleGrouping("exclaim1");

         每个topology都是先创建了一个TopologyBuilder对象,之后才可以进行设置相关属性。

         先来看看setSpout函数:

   public SpoutDeclarer setSpout(String id, IRichSpout spout, Numberparallelism_hint) {

       validateUnusedId(id);

       initCommon(id, spout, parallelism_hint);

       _spouts.put(id, spout);

       return new SpoutGetter(id);

    }

         这个函数的处理的流程是,先检查下componentId  是否已经被使用过。要是被使用过则抛出IllegalArgumentException 异常。

         然后开始初始化ComponentCommon,这个结构是storm.thrift中定义的(thrift会生成对应的java类),定义如下:

struct ComponentCommon {

   1: required map<GlobalStreamId, Grouping> inputs;

   2: required map<string, StreamInfo> streams; //key is stream id

   3: optional i32 parallelism_hint; //how many threads across the clustershould be dedicated to this component

   // component specific configuration

   4: optional string json_conf;

  }

初始化ComponentCommon对象,最后会记录到TopologyBuilder 的成员变量Map<String, ComponentCommon> _commons。其中key是componentId,在这里就是"word"。

然后在TopologyBuilder 的成员变量Map<String, IRichSpout> _spouts,记录下spout的记录。其中key也是componentId,在这里是"word"。

 

         builder.setBolt("exclaim1",new ExclamationBolt(), 3)这部分和setSpout基本相似,只是最后记bolt的时候,是记录在TopologyBuilder 的成员变量Map<String, IRichBolt> _bolts,其中key是componentId,在这里就是"exclaim1"。之后,.shuffleGrouping("word")这部分,是调用setBolt返回的,BoltDeclarer中的shuffleGrouping。

         最终将会调用到grouping,其中streamId在这里没有指定,会使用"default"来替代。

public BoltDeclarer shuffleGrouping(StringcomponentId) {

   return shuffleGrouping(componentId, Utils.DEFAULT_STREAM_ID);

}

 

public BoltDeclarer shuffleGrouping(StringcomponentId, String streamId) {

   return grouping(componentId, streamId, Grouping.shuffle(newNullStruct()));

}

在这里grouping最后一个参数是生成了Grouping对象,并填充shuffle为NullStruct,其中Grouping是在storm.thrift定义的一个联合体,thrift会生成对应的java代码,内部定义了很多种grouping的方式。

                                                                                                                                      

private BoltDeclarer grouping(StringcomponentId, String streamId, Grouping grouping)

{

    _commons.get(_boltId).put_to_inputs(new GlobalStreamId(componentId,streamId), grouping);

    return this;

}

 

grouing函数是将之前记录在_commons中的,bolt的componentId对应的ComponentCommon的键值对,取出来设置ComponentCommon中的inputs的值。以第一个setBolt为例,就是取出"exclaim1"这个componentId对应的ComponentCommon,将里面的inputs设置为,这个输入是从哪里来的,也就是"word"这个componentId,streamId为"default"的这个spout流作为第一个bolt的输入源。

 

获取Config

         Config比较简单,内部定义了很多key的记录,并且这个Config是从Map派生过来的,因此Config就是一个Map,在之后插入记录,就插入自身当中就可以。

         在这个例子中有两个set函数的调用。

         conf.setDebug(true);就是在Map中插入一条记录("topology.debug" -> "true"),用来标记是打开debug模式的。

         conf.setNumWorkers(3);同样在Map中插入一条记录("topology.workers" -> 3),用来增加一条配置项,标记worker数为3个。

 

 

提交Topology

         在看看最后一部分,如何生成topology和提交的:

         StormSubmitter.submitTopology(args[0],conf, builder.createTopology())。

        

生成StormTopology

         builder.createTopology()会利用从之前设置的TopologyBuilder对象去生成一个topology。

public StormTopology createTopology() {

   Map<String, Bolt> boltSpecs = new HashMap<String, Bolt>();

   Map<String, SpoutSpec> spoutSpecs = new HashMap<String,SpoutSpec>();

   for(String boltId: _bolts.keySet()) {

       IRichBolt bolt = _bolts.get(boltId);

       ComponentCommon common = getComponentCommon(boltId, bolt);

       boltSpecs.put(boltId,\

                   newBolt(ComponentObject.serialized_java(Utils.serialize(bolt)), common));

    }

   for(String spoutId: _spouts.keySet()) {

       IRichSpout spout = _spouts.get(spoutId);

       ComponentCommon common = getComponentCommon(spoutId, spout);

       spoutSpecs.put(spoutId, \

                   newSpoutSpec(ComponentObject.serialized_java(Utils.serialize(spout)), common));

       

    }

   return new StormTopology(spoutSpecs,

                             boltSpecs,

                             newHashMap<String, StateSpoutSpec>());

}

 

创建一个topology的流程是:

1)首先,所有bolt取出来(setBolt设置),每个对应一个Bolt插入到BoltSpec。具体来说是将所有在setBolt阶段记录的componentId->IRichBolt记录取出来,从_commons记录的componentId->ComponentCommon中查找到对应的ComponentCommon,通过深拷贝生成一个新的ComponentCommon并设置相关值并返回。插入一条关于bolt的记录到boltSpecs中,其中key是在之前setBolt传入的componentId,value是一个Bolt对象。注意,这个Bolt不是setBolt时传递进来的那个Bolt(传递进来的是一个IRichBolt),而是storm.thrift中定义的Bolt,在Bolt内部有会有两个值,这两个值的类型分别是ComponentObject和ComponentCommon,其中这个ComponentCommon返回的那个。

boltSpecs可以理解为内部有很多条componentId->(序列化后的bolt+ComponentCommon)。

2)然后生成所有的spout对应SpoutSpec的记录插入到spoutSpecs。具体过程基本一样,只是spoutSpecs中的value是storm.thrift中定义的SpoutSpec。但从结构来看Bolt和SpoutSpec内部都是有两个成员变量,分别是 ComponentObject bolt_object和ComponentCommoncommon。

3)最后是生成一个StormTopology,并返回。StormTopology也是storm.thrift中定义的一个struct结构,定义如下:

struct StormTopology {

 //ids must be unique across maps

  //#workers to use is in conf

  1:required map<string, SpoutSpec> spouts;

  2:required map<string, Bolt> bolts;

  3:required map<string, StateSpoutSpec> state_spouts;

}

因此StormTopology对象的构造函数只是简单的将前面生成的boltSpecs,spoutSpecs以及一个新的_stateSpouts对象,赋给StormTopology内部对应的变量。

 

         通过上述步骤就生成了一个StormTopology。

         另外还需要单独将getComponentCommon拿出来说一说,上面说的只是一个大概含义。

先看看这个函数

 private ComponentCommongetComponentCommon(String id, IComponent component) {

   ComponentCommon ret = new ComponentCommon(_commons.get(id));

   

   OutputFieldsGetter getter = new OutputFieldsGetter();

   component.declareOutputFields(getter);

   ret.set_streams(getter.getFieldsDeclaration());

   return ret;       

}

         第一句就是之前说的,从_commons记录中取一个componentId对应的ComponentCommon,并赋给新的ComponentCommon,这里的赋值是通过deepcopy完成的。

         后面的,就和用户编写topology有关系了。OutputFieldsGetter是实现了OutputFieldsDeclarer接口,内部有declare接口供编写bolt或者spout时调用,用来申明stream信息,这些信息都会记录在内部的Map<String, StreamInfo> _field,如果没有在申明是指定streamId,那么streamId都会采用默认的"default"作为streamId。

         bolt和spout最终都会实现IComponent的declareOutputFields接口。在这个接口内部会申明下这个流有什么字段之类的信息。以ExclamationTopology例子中的ExclamationBolt为例,在declareOutputFields内部会去申明这个流有一个字段叫做word,declareOutputFields的参数就是上面说的OutputFieldsGetter实例。

public static class ExclamationBolt extendsBaseRichBolt {

         ...

   public void declareOutputFields(OutputFieldsDeclarer declarer) {

       declarer.declare(new Fields("word"));

    }

}

         再取到OutputFieldsGetter中记录stream对应信息的_fields,赋值给ComponentCommon中的变量stream。这样,ComponentCommon的记录信息就完整了(bolt会记录inputs信息,而spout不会记录)。

        

submitTopology

提交到集群中,会调用StormSubmitter中的静态方法:

public static void submitTopology(Stringname, Map stormConf, StormTopology topology) throws AlreadyAliveException,InvalidTopologyException {

       submitTopology(name, stormConf, topology, null);

}

参数name表示要运行的topology的名字。分布式模式时,name就是jar包中的入口类的名字,本地模式时name可以任意一个可以用来表示topology的名字。

内部将会调用真正提交topology的函数。

 

public static void submitTopology(Stringname, Map stormConf, StormTopology topology, SubmitOptions opts) throwsAlreadyAliveException, InvalidTopologyException {

       if(!Utils.isValidConf(stormConf)) {

           throw new IllegalArgumentException("Storm conf is not valid. Mustbe                                           json-serializable");

       }

       stormConf = new HashMap(stormConf);

       stormConf.putAll(Utils.readCommandLineOpts());

       Map conf = Utils.readStormConfig();

       conf.putAll(stormConf);

       try {

           String serConf = JSONValue.toJSONString(stormConf);

           if(localNimbus!=null) {

                LOG.info("Submittingtopology " + name + " in local mode");

               localNimbus.submitTopology(name, null, serConf, topology);

           } else {

                NimbusClient client =NimbusClient.getConfiguredClient(conf);

                if(topologyNameExists(conf,name)) {

                    throw newRuntimeException("Topology with name `" + name + "`                                                    alreadyexists on cluster");

                }

                submitJar(conf);

                try {

                   LOG.info("Submitting topology" +  name + " in distributedmode with                                            conf" + serConf);

                    if(opts!=null) {

                       client.getClient().submitTopologyWithOpts(name, submittedJar,                                                       serConf,topology, opts);                   

                    } else {

                        // this is forbackwards compatibility

                       client.getClient().submitTopology(name, submittedJar, serConf,                                                        topology);                                           

                    }

                }catch(InvalidTopologyException e) {

                    LOG.warn("Topologysubmission exception", e);

                    throw e;

                } catch(AlreadyAliveExceptione) {

                    LOG.warn("Topologyalready alive exception", e);

                    throw e;

                } finally {

                    client.close();

                }

           }

           LOG.info("Finished submitting topology: " +  name);

       } catch(TException e) {

           throw new RuntimeException(e);

       }

    }

 

正在去做提交topoloy这个动作的流程是:

1)校验传进来的配置合法性,并读取default.yaml和storm.yaml。这个过程在分析nimbus启读取配置时有分析过,这里就不再累赘了。

2)获取Config.NIMBUS_HOST和Config.NIMBUS_THRIFT_PORT值,创建NimbusClient。在内部是封装了访问Nimbus这个rpc server(基于thrift)的rpc client,在NimbusClient构造时,就创建了rpcclient并建立与rpc server的连接。

3)检测现在准备提交的topology是否和集群中正在运行的topology名字有冲突。检测过程很简单,通过rpc调用从Nimbus服务器上取下集群的信息。集群的信息里面会记录supervisor的统计信息,nimbus更新时间,topology的统计信息。在topology的统计信息中会记录topology的名字。因此只要遍历比较下就能知道名字是否会冲突。

在storm.thrift中定义的集群信息结构如下:

struct ClusterSummary {

  1:required list<SupervisorSummary> supervisors;

  2:required i32 nimbus_uptime_secs;

  3:required list<TopologySummary> topologies;

}

4) submitJar 上传jar文件。使用bin/stormjar xxx.jar xxx.class arg1 args2运行topology的时候,

bin/storm这个python脚本中,最后会使用"java-Dstorm.jar=xxx.jar ..."命名运行jar中指定的xxx.class中的main函数。submitJar首先会从环境变量中读取到storm.jar这个变量的值,如果没有设置过,则抛出异常。否则,会创建连接到Nimbus的rpc client,调用beginFileUpload通知要上传文件,Nimbus会返回一个上传的路径,之后分段读取jar文件,调用uploadChunk上传到nimbus所告知的那个路径,jar文件数据都上传完毕调用finishFileUpload告知nimbus,对那个路径的文件已上传完毕。其中beginFileUpload,uploadChunk,finishFileUpload都是storm.thrift定义的service Nimbus中的方法,其中Nimbus.Iface是在 Nimbus.clj被实现了。

5)成功上传jar文件后,会用第二步中创建的rpcclient调用Nimbus上的submitTopology方法,这个方法也是在storm.thrft中service Nimbus。调用这个Nimbus上的这个方法可以理解为通知Nimbus去运行这个topology。通知的时候,会带上name,这个name就是jar的入口类的名字。

 

         这样经过上述5步后,一个编写有topology任务的jar文件就上传提交到nimbus,并可以由nimbus将任务分发给supervisors去执行这个topology。


转自:http://blog.csdn.net/chlaws/article/details/10562035

相关问答

更多
  • 关于内核的书不在多,而是要精,强烈推荐 《Linux内核设计与实现》,英文名Linux Kernel Development,机械工业出版社,¥35, 美国Robert Love著,陈莉君译。 此书是当今首屈一指的入门最佳图书。作者是为2.6内核加入了抢占的人,对调度部分非常精通,而调度是整个系统的核心,因此本书是很权威的。对没怎么深入内核的人来说,这是强烈推荐的一本书。
  • 一些想法以及我迄今为止进行类似实验的经验(在Sprint期间在Spike中完成): 根据我的经验(我可能是错的),随着需求的增加,您不会真正旋转更多的螺栓,而是调整拓扑中每个螺栓的平行度配置。 拓扑不会通过增加更多的螺栓来缩放,而是通过增加平行度来扩展螺栓的瓶颈。 以示例字数问题为例: builder.setBolt(4, new MyBolt(), 12) .shuffleGrouping(1) .shuffleGrouping(2) .fieldsGrouping(3, new ...
  • 在拓扑运行时无法更改拓扑结构。 您需要kill拓扑并在之后重新部署新版本。 拓扑运行时可以更改的唯一参数是并行度。 有关更多详细信息,请参见此处: https : //storm.apache.org/releases/1.0.0/Understanding-the-parallelism-of-a-Storm-topology.html It is not possible to change the structure of a topology while it is running. You nee ...
  • 增加supervisor.slots.ports的端口数量。 现在它只有4个端口(6700到6703)意味着只有4个工作人员可以在该监控机器上运行。 如果您有4个拓扑,每个拓扑有10个工作线程,那么您必须添加40个端口(意味着从6700到6739)。 increase the number of ports in supervisor.slots.ports. now it has only 4 ports (6700 to 6703) means only 4 workers will run on th ...
  • 我想我会回答我自己的问题。 基于我在github.com和javadocs中看到的...我相信pyleus不支持1.0.1。 javadocs表明,在以前的版本中,IRichBolt处于backtype.storm.topology但现在它存在于org.apache.storm.topology 。 谁知道还有什么是不相容的。 所以我想现在,运行一个旧版本的Storm(我只看到0.9.4兼容的引用,所以也许0.9.6也可以)。 我确实看到有一个关于0.10.0的开放拉取请求,所以我想在支持1.0.1之前可能 ...
  • 这里的例外很清楚。 如果您只是查看了java.io.NotSerializableException的文档,您会看到正在打印的消息是不可序列化的类。 要修复,只需让您的拓扑类实现Serializable : public class ProvaTopology implements Serializable { ... } 这是必需的,以便Storm可以序列化您的拓扑并将其发送到Nimbus执行。 由于您的Bolts和Spout扩展或实现了Storm提供的类或接口,因此您不必担心将它们标记为可序列 ...
  • 我建议在设计螺栓时遵循Unix的理念:“做一件事,做得好”。 螺栓'A'过滤数据。 Bolt'B'将螺栓'A'的输出转换为DBObject并将其保存到MongoDB。 这样,您可以保持拓扑简单。 每个螺栓都有简单而专注的责任。 如果出现故障,您确切知道问题所在。 当然,你可以告诉Storm每个螺栓需要多少并行化。 我认为你不需要任何特殊的抽象(三叉戟或交易)。 常规的Storm拓扑为您提供良好的服务。 顺便说一句,Storm文档很棒! I would suggest to follow the philos ...
  • 我想我找到了答案。 在群集中,可能会运行更多拓扑。 设A , B , C , D为4个拓扑,在同一个集群中运行。 这是我的情况。 当您启动拓扑时,他们将为每个拓扑分配连续的数字,但是每个群集 (这是我的错误)。 因此我们从: A-1-... B-2-... C-3-... D-4-... 如果你重新启动C ,你就可以了 C-5... 那么C-4在哪里? 它根本不存在,因为D已经占用了4 。 因此,从n yo n+2跳过是正常的。 您可能会发现n+1分配给另一个拓扑。 (QED) I guess I've ...
  • 遗憾的是,无法通过REST API提供拓扑提交。 我们在Storm 0.10测试版中添加了它,但发现安全问题并决定摆脱它。 Unfortunately, topology submission is not available via REST API. We added it at Storm 0.10 beta but found security issue and decided to get rid of it.
  • 我终于来了! 我只是简单了,我创建了一个非常简单的redis bolt,它发布了内容,我监视了redis数据库,它正在运行。 我的工作螺栓: public class RedisBolt implements IRichBolt { protected String channel = "Somriures"; // protected String configChannel; protected OutputCollector collector ...