Storm cluster mode, distributed bolt/worker load sharing

Hi, I have a high-volume Storm analysis task. I want to spread many bolts/workers across different nodes/machines so that every machine shares the load. How should I write the bolts, workers and topology so that they can communicate with each other? In the code below, I submit the topology from one machine. How do I write the bolt/worker/config on the other machines so that the topology is aware of their bolts/workers? I assume I can't submit the topology on one machine and then submit the same topology on the other machines. Any hints on Storm worker load sharing?

import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;
import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class StormClusterMain {
    private static final String SPOUTNAME="KafkaSpout"; 
    private static final String ANALYSISBOLT = "ClusterAnalysisWorker";
    private static final String CLIENTID = "ClusterStorm";
    private static final String TOPOLOGYNAME = "ClusterTopology";

    private static class AppAnalysisBolt extends BaseRichBolt {
        private static final long serialVersionUID = -6885792881303198646L;
        private static final String collectionName="clusterusers";
        private OutputCollector _collector;
        private AtomicInteger index = new AtomicInteger(0); 
        private static final Logger boltLogger = LoggerFactory.getLogger(AppAnalysisBolt.class); 

        public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
            _collector = collector;
        }

        public void execute(Tuple tuple) {  
            boltLogger.error("Message received:"+tuple.getString(0));
            _collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
            _collector.ack(tuple);
        }

        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
        }


    }

    public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException {

        String zookeepers = null;
        String topicName = null;
        if (args.length == 2) {
            zookeepers = args[0];
            topicName = args[1];
        } else {
            System.out.println("You need to have two arguments: kafka zookeeper:port and topic name");
            System.out.println("Usage :.xxx");
            System.exit(-1);
        }

        SpoutConfig spoutConfig = new SpoutConfig(new ZkHosts(zookeepers),
                topicName,
                "", // zookeeper root path for offset storing
                CLIENTID);
        spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
        KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout(SPOUTNAME, kafkaSpout, 1);
        // No parallelism hint is given, so this bolt runs as a single executor (thread)
        builder.setBolt(ANALYSISBOLT, new AppAnalysisBolt())
                .shuffleGrouping(SPOUTNAME);

        // Configuration
        Config conf = new Config();
        conf.setDebug(false);
        // Number of worker processes (JVMs) to spawn around the cluster for this topology
        conf.setNumWorkers(3);
        StormSubmitter.submitTopologyWithProgressBar(TOPOLOGYNAME, conf, builder.createTopology());
    }
}

Updated: 2022-02-10 16:02

Best answer

You can't, with the structure you've created. I'd suggest a more conventional structure that puts the methods on the prototype:

// The constructor function
var TeslaModelS = function() {
  this.numWheels = 4;
  this.manufacturer = 'Tesla';
  this.make = 'Model S';

  this.pressGasPedal();
};

TeslaModelS.prototype.pressGasPedal = function() {
  console.log(this.numWheels);
};

TeslaModelS.prototype.pressBrakePedal = function() {
};

var i = new TeslaModelS();
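The asker's original code isn't shown on this page, so the sketch below assumes the "structure you've created" was a single Drive object shared through the prototype. It illustrates why that can't work: when a method is called as `car.Drive.pressGasPedal()`, `this` is the Drive object rather than the car, so `this.numWheels` is undefined. With the prototype-method structure, `this` is the instance. `BrokenModelS` and `FixedModelS` are hypothetical names for illustration.

```javascript
// ASSUMED reconstruction of the problematic structure (hypothetical):
// one Drive object shared by every instance through the prototype.
function BrokenModelS() {
  this.numWheels = 4;
}
BrokenModelS.prototype.Drive = {
  pressGasPedal: function () {
    // Called as car.Drive.pressGasPedal(), so `this` is the Drive object,
    // which has no numWheels property.
    return this.numWheels;
  }
};

// Prototype-method version: called as car.pressGasPedal(),
// so `this` is the car instance.
function FixedModelS() {
  this.numWheels = 4;
}
FixedModelS.prototype.pressGasPedal = function () {
  return this.numWheels;
};

const broken = new BrokenModelS();
const fixed = new FixedModelS();
console.log(broken.Drive.pressGasPedal()); // undefined
console.log(fixed.pressGasPedal());        // 4
```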

But if you really want that Drive object, you'll need to create bound versions of those functions on a separate Drive object for each instance:

// The functions you'll bind each time
var Drive = {
  go: function() {
    console.log(this.numWheels);
  },

  stop: function() {}
};

// The constructor function
var TeslaModelS = function() {
  this.numWheels = 4;
  this.manufacturer = 'Tesla';
  this.make = 'Model S';

  // Create a Drive object for this instance with the functions bound
  // to `this`
  this.Drive = {
    pressGasPedal: Drive.go.bind(this),
    pressBrakePedal: Drive.stop.bind(this)
  };

  this.Drive.pressGasPedal();
};

var i = new TeslaModelS();
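A short usage sketch of the bound-Drive pattern above, with the methods returning values instead of logging so the effect is easy to check. `Car` and the three-wheeled second instance are hypothetical. Each instance's Drive functions stay tied to that instance, even when a function is pulled off the object and called on its own.

```javascript
// Method definitions shared as plain functions, bound per instance.
const Drive = {
  go: function () {
    return this.numWheels;
  },
  stop: function () {}
};

// Hypothetical constructor following the same pattern as TeslaModelS.
function Car(numWheels) {
  this.numWheels = numWheels;
  // Each instance gets its own Drive object whose functions are bound to it.
  this.Drive = {
    pressGasPedal: Drive.go.bind(this),
    pressBrakePedal: Drive.stop.bind(this)
  };
}

const modelS = new Car(4);
const trike = new Car(3); // hypothetical second instance to show isolation

console.log(modelS.Drive.pressGasPedal()); // 4
console.log(trike.Drive.pressGasPedal());  // 3

// A bound function keeps its `this` even when detached from the object.
const gas = modelS.Drive.pressGasPedal;
console.log(gas()); // 4

// Every instance carries its own bound copies.
console.log(modelS.Drive.pressGasPedal === trike.Drive.pressGasPedal); // false
```

The trade-off: the bound version allocates new function objects for every instance, while prototype methods are a single set of functions shared by all instances.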
