Storm中的数据并行性(Data parallelism in Storm)
我已阅读有关Apache风暴的信息,并做了一些基本的教程。 我有以下拓扑结构,我想用storm实现,但不知道如何处理数据分发。 业务要求是实时评估客户组合。 简化形式包括:1)接受市场价格的实时蒸汽(货币,商品等......)2)每个价格刻度计算每个头寸的当前利润并将其转换为客户账户货币3)分析总p / l和每个客户的所有头寸的数量,并在需要时生成信号4)在客户层面,计算必须是连续的和原子/序列化的。 即所有头寸必须按其进入系统的顺序进行评估,即使客户有100个头寸,也必须根据相同的价格计算总数。 5)分析按符号/客户类型/国家/等等汇总的系统中所有职位的数量/趋势,并使其在某种仪表板中可用。
所有订单都在rdbms中执行并存储。 我的主要问题是如何在不同节点上分布数千个位于Storm螺栓上的位置,每个节点都处理它自己的部分。 使用Modulo足以对客户进行分区,但是我如何为每个螺栓实例提供id,以便它们每个都只处理它自己的同等部分客户? Storm中有没有开箱即用的东西? 另一个问题是如何有效地进行上述聚合?
I have read about the Apache storm and did some basic tutorials. I have following topology in mind that I would like to implement with storm, but not sure how to handle the data distribution. Business requirement is to evaluate customers portfolio in realtime. In simplified form it involves: 1) Accept live steam of market prices (currencies, commodities, etc...) 2) For every price tick calculate current profit of every position and convert it to customer account currency 3) Analyze total p/l and volume of all positions per customer and generate signals if required 4) At customer level calculation must be sequential and atomic/serialized. I.e. all positions must be evaluated with every tick in the order it entered the system and totals must be calculated based on the same price even if customer has 100s of positions. 5) Analyze volumes / trends of all positions in system aggregated by symbol/customer type/country /etc... and make them available in some kind of a dashboard.
All orders are executed and stored in rdbms. My major question is how to distribute 100s of thousands of positions across Storm bolts on different nodes that every node handles it's own part. Using Modulo is good enough for partitioning the customers, but how can I provide id to every instance of bolt so each of them handles it's own equal part of customers only? Is there something out of the box in Storm to do that? Another question is how to do above aggregations efficiently?
原文:https://stackoverflow.com/questions/30130926
最满意答案
要执行此功能,您必须使用插件加载器。 使用插件加载器加载类。
这是一个加载器类,用于在/ library / Xyz / Fld1 / Fld2 / Fld3 /目录中加载自定义类
这是一个代码示例。
<?php class Xyz_Core { /** * File name Core.php inside Xyz directory * * Loader for parsers * * @var Zend_Loader_PluginLoader */ protected $_pluginLoader; /** * Gets the plugin loader * * @return Zend_Loader_PluginLoader */ public function getPluginLoader() { if( null === $this->_pluginLoader ) { $this->_pluginLoader = new Zend_Loader_PluginLoader(array( 'Xyz_Fld1_Fld2_Fld3_' => 'XYZ/fld1/fld2/fld3/' )); } return $this->_pluginLoader; } /** * Get a helper * * @param string $name */ public function getHelper($name) { $name = $this->_normalizeHelperName($name); if( !isset($this->_helpers[$name]) ) { $helper = $this->getPluginLoader()->load($name); $this->_helpers[$name] = new $helper; } return $this->_helpers[$name]; } /** * Normalize helper name * * @param string $name * @return string */ protected function _normalizeHelperName($name) { $name = preg_replace('/[^A-Za-z0-9]/', '', $name); //$name = strtolower($name); $name = ucfirst($name); return $name; } } $api = new Xyz_Core(); /* * To load object of class Example.php */ $obj = $api->getHelper('Example'); /* * Or To load include the file only of class Example.php */ $class = $api->getPluginLoader()->load('Example'); $obj = new $class($param1, $param2, $etc);
To do the functionality you have to use plugin loader. Using plugin loader you load classes.
This is a loader class to load custom classes inside your /library/Xyz/Fld1/Fld2/Fld3/ directory
Here is a code sample.
<?php class Xyz_Core { /** * File name Core.php inside Xyz directory * * Loader for parsers * * @var Zend_Loader_PluginLoader */ protected $_pluginLoader; /** * Gets the plugin loader * * @return Zend_Loader_PluginLoader */ public function getPluginLoader() { if( null === $this->_pluginLoader ) { $this->_pluginLoader = new Zend_Loader_PluginLoader(array( 'Xyz_Fld1_Fld2_Fld3_' => 'XYZ/fld1/fld2/fld3/' )); } return $this->_pluginLoader; } /** * Get a helper * * @param string $name */ public function getHelper($name) { $name = $this->_normalizeHelperName($name); if( !isset($this->_helpers[$name]) ) { $helper = $this->getPluginLoader()->load($name); $this->_helpers[$name] = new $helper; } return $this->_helpers[$name]; } /** * Normalize helper name * * @param string $name * @return string */ protected function _normalizeHelperName($name) { $name = preg_replace('/[^A-Za-z0-9]/', '', $name); //$name = strtolower($name); $name = ucfirst($name); return $name; } } $api = new Xyz_Core(); /* * To load object of class Example.php */ $obj = $api->getHelper('Example'); /* * Or To load include the file only of class Example.php */ $class = $api->getPluginLoader()->load('Example'); $obj = new $class($param1, $param2, $etc);
相关问答
更多-
要将验证器选项注入到您的构造函数方法中,您应该在module.config.php文件中注册您的验证器: array( 'factories' => array( ImageValidator:: ...
-
您的使用声明应如下: use Zend\Validator\Identical; 您正在use语句而不是名称空间导入类。 Your use statement should be as follows: use Zend\Validator\Identical; You are importing classes with the use statement and not namespaces.
-
有几种方法可以做到这一点: 您可以在application.ini文件中添加命名空间: [production] autoloaderNamespaces[] = "Dtd_" 或者,在你的引导下: protected function _initAutoloader() { $autoloader = Zend_Loader_Autoloader::getInstance(); $autoloader->registerNamespace("Dtd_"); } There are a ...
-
Zend Validator位置(Zend Validator location)[2022-04-12]
我能够解决我的问题 addElementPrefixPath('Application_Validate', '../application/validate', 'validate'); I was able to solve my problem using addElementPrefixPath('Application_Validate', ... -
要执行此功能,您必须使用插件加载器。 使用插件加载器加载类。 这是一个加载器类,用于在/ library / Xyz / Fld1 / Fld2 / Fld3 /目录中加载自定义类 这是一个代码示例。好的,我想出了问题所在。 而不是返回Zend \ Mvc \ Router \ RouteMatch我应该返回Zend \ Mvc \ Router \ Http \ RouteMatch 这解决了我的问题 Ok, i figured out what the problem was. Instead of returning Zend\Mvc\Router\RouteMatch I should return Zend\Mvc\Router\Http\RouteMatch This fixed my pr ...那么,这个问题在评论中得到了排序:)把这个问题变成一个答案,你可以接受关闭这个问题。 您不能从PHP脚本访问该路径,查看错误消息(/ usr / share不在允许的目录列表中)。 改变包含路径(我猜)/ var/home/library/Zend/应该解决它。 PLESK和CentOS显然存在一个问题,那就是无法从虚拟主机访问PHP的包含路径! 我认为这归结于open_basedir设置,该设置禁止虚拟主机访问外部任何内容,这不是操作系统特定的。 无论如何,很高兴它被分类! Well, this ques ...你有一个错误:在评估while(argv[5][count+count2] != 0);之前你应该零count while(argv[5][count+count2] != 0); 。 如果你不这样做,你将访问argv数组越界,每个构建可能会有所不同。 ... count2 += count; printf("\nAfter Count: %3d, Count2: %3d, Addthem: %3d, Next: %02X", count, count2, count+count2 ...最后,我决定在我的Controller中手动解析传入的SOAP请求: // TestController.php class TestSoapServer extends Zend_Soap_Server { // Handle the request and generate suitable response public function handle($request = null) { if (null === $request) { ...您需要配置StandardAutoloader以加载库类。 最简单的方法是修改Application模块的Module::getAutoloaderConfig()方法,使其看起来像这样: public function getAutoloaderConfig() { return array( 'Zend\Loader\StandardAutoloader' => array( 'namespaces' => array( _ ...
相关文章
更多- Storm入门
- Storm重要概念
- Storm Topology的并发度
- storm
- Storm Config
- storm
- Yet Another STORM
- Storm配置项详解
- Storm配置项详解
- Storm编程概述
最新问答
更多- 获取MVC 4使用的DisplayMode后缀(Get the DisplayMode Suffix being used by MVC 4)
- 如何通过引用返回对象?(How is returning an object by reference possible?)
- 矩阵如何存储在内存中?(How are matrices stored in memory?)
- 每个请求的Java新会话?(Java New Session For Each Request?)
- css:浮动div中重叠的标题h1(css: overlapping headlines h1 in floated divs)
- 无论图像如何,Caffe预测同一类(Caffe predicts same class regardless of image)
- xcode语法颜色编码解释?(xcode syntax color coding explained?)
- 在Access 2010 Runtime中使用Office 2000校对工具(Use Office 2000 proofing tools in Access 2010 Runtime)
- 从单独的Web主机将图像传输到服务器上(Getting images onto server from separate web host)
- 从旧版本复制文件并保留它们(旧/新版本)(Copy a file from old revision and keep both of them (old / new revision))
- 西安哪有PLC可控制编程的培训
- 在Entity Framework中选择基类(Select base class in Entity Framework)
- 在Android中出现错误“数据集和渲染器应该不为null,并且应该具有相同数量的系列”(Error “Dataset and renderer should be not null and should have the same number of series” in Android)
- 电脑二级VF有什么用
- Datamapper Ruby如何添加Hook方法(Datamapper Ruby How to add Hook Method)
- 金华英语角.
- 手机软件如何制作
- 用于Android webview中图像保存的上下文菜单(Context Menu for Image Saving in an Android webview)
- 注意:未定义的偏移量:PHP(Notice: Undefined offset: PHP)
- 如何读R中的大数据集[复制](How to read large dataset in R [duplicate])
- Unity 5 Heighmap与地形宽度/地形长度的分辨率关系?(Unity 5 Heighmap Resolution relationship to terrain width / terrain length?)
- 如何通知PipedOutputStream线程写入最后一个字节的PipedInputStream线程?(How to notify PipedInputStream thread that PipedOutputStream thread has written last byte?)
- python的访问器方法有哪些
- DeviceNetworkInformation:哪个是哪个?(DeviceNetworkInformation: Which is which?)
- 在Ruby中对组合进行排序(Sorting a combination in Ruby)
- 网站开发的流程?
- 使用Zend Framework 2中的JOIN sql检索数据(Retrieve data using JOIN sql in Zend Framework 2)
- 条带格式类型格式模式编号无法正常工作(Stripes format type format pattern number not working properly)
- 透明度错误IE11(Transparency bug IE11)
- linux的基本操作命令。。。