Twitter Storm 分布式RPC

2019-03-02 23:38|来源: 网路

分布式RPC

分布式RPC(DRPC)的真正目的是使用storm实时并行计算极端功能。Storm拓扑需要一个输入流作为函数参数,以一个输出流的形式发射每个函数调用的结果。
 
DRPC没有多少storm特性,因为它是从storm的原始流,spouts,bolts,拓扑来表达一个模式。DRPC没有单独打包,但它如此有用,以至于和storm捆绑在一起。
 
概述
分布式RPC通过“DRPC server”协调。DRPC服务器协调接收一个RPC请求,发送请求到storm拓扑,从storm拓扑接收结果,发送结果回等待的客户端。从一个客户端的角度来看,一个分布式RPC调用就像是一个常规的RPC调用。例如,一个客户端如何为带“http://twitter.com”参数的“reach”功能计算结果。
         
 
 
  1. DRPCClient client = new DRPCClient("drpc-host"3772); 
  2. String result = client.execute("reach""http://twitter.com"); 
分布式RPC工作流程如下:

客户端发送功能名称及功能所需参数到DRPC服务器去执行。图中的拓扑实现了此功能,它使用DRPCSpout从DRPC服务器接收功能调用流。每个功能调用通过DRPC服务器使用唯一ID标记,随后拓扑计算结果,在拓扑的最后,一个称之为“ReturnResults”的bolt连接到DRPC服务器,把结果交给这个功能调用(根据功能调用ID),DRPC服务器根据ID找到等待中的客户端,为等待中的客户端消除阻塞,并发送结果给客户端。

LinearDRPCTopologyBuilder
Storm有一个称之为LinearDRPCTopologyBuilder的拓扑Builder几乎自动完成DRPC所需的所有相关步骤。包括:
1.设置spout
2.返回结果给DRPC服务器
3.为bolt提供对一组元组的有限聚合功能
让我们看一个简单的例子。这是一个DRPC拓扑的实现,在输入参数的尾部追加“!”并返回:
          
  
  
  1. public static class ExclaimBolt implements IBasicBolt { 
  2.     public void prepare(Map conf, TopologyContext context) { 
  3.     } 
  4.  
  5.     public void execute(Tuple tuple, BasicOutputCollector collector) { 
  6.         String input = tuple.getString(1); 
  7.         collector.emit(new Values(tuple.getValue(0), input + "!")); 
  8.     } 
  9.  
  10.     public void cleanup() { 
  11.     } 
  12.  
  13.     public void declareOutputFields(OutputFieldsDeclarer declarer) { 
  14.         declarer.declare(new Fields("id""result")); 
  15.     } 
  16.  
  17.  
  18. public static void main(String[] args) throws Exception { 
  19.     LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("exclamation"); 
  20.     builder.addBolt(new ExclaimBolt(), 3); 
  21.     // ... 

如你所见,代码非常少。当创建LinearDRPCTopologyBuilder时,你把这个拓扑的DRPC功能名称告诉storm。一个DRPC服务器可以协调许多功能,功能名称用于区别不同的功能,首先声明的bolt将接收一个输入的2-tuples,第一个字段是请求ID,第二个字段是请求参数。LinearDRPCTopologyBuilder认为最后的bolt会发射一个输出流,该输出流包含[id, result]格式的2-tuples。最后,所有拓扑中间过程产生的元组(tuple)都包含请求id作为其第一个字段。

在这个例子中,ExclaimBolt只是简单地在元组的第二个字段尾部追加“!”字符。LinearDRPCTopologyBuilder处理其余的协调工作,包括连接DRPC服务器,发送最终结果。
 
本地模式DRPC
DRPC可以运行在本地模式。这是如何在本地模式运行上述例子:
           
   
   
  1. LocalDRPC drpc = new LocalDRPC(); 
  2. LocalCluster cluster = new LocalCluster(); 
  3.  
  4. cluster.submitTopology("drpc-demo", conf, builder.createLocalTopology(drpc)); 
  5.  
  6. System.out.println("Results for 'hello':" + drpc.execute("exclamation""hello")); 
  7.  
  8. cluster.shutdown(); 
  9. drpc.shutdown(); 
首先你创建一个LocalDRPC对象。这个对象在进程内模拟一个DRPC服务器,就像在进程内模拟一个storm集群一样。然后你创建本地集群,在本地模式运行这个拓扑。创建本地拓扑和远程拓扑,LinearDRPCTopologyBuilder有不同的方法。在本地模式,LocalDRPC未绑定任何端口,拓扑也需要知道与哪个对象通讯,这是为什么createLocaclTopology方法需要接受LocalDRPC对象作为输入参数的原因。
 
载入拓扑后,你可以用LocalDRPC的execute方法执行DRPC调用。
 
远程模式DRPC
在实际的集群使用DRPC也很简单。有三个步骤:
1.      启动DRPC服务器
2.      配置DRPC服务器位置
3.      提交DRPC拓扑到storm集群
使用storm脚本启动DRPC服务器,和启动nimbus和ui一样:
          
  
  
  1. bin/storm drpc 
接下来,配置你的storm集群,让集群知道DRPC服务器的位置,这样DRPCSpout就知道从哪里读取功能调用。可以通过修改storm.yaml配置文件或拓扑配置完成配置DRPC服务器位置。修改storm.yaml配置文件如下所示:
          
  
  
  1. drpc.servers: 
  2.   - "drpc1.foo.com" 
  3.   - "drpc2.foo.com" 
最后,使用StormSubmitter启动DRPC拓扑,就像启动其它拓扑一样。在远程模式运行上述例子,代码如下所示:
           
   
   
  1. StormSubmitter.submitTopology("exclamation-drpc", conf, builder.createRemoteTopology()); 
createRemoteTopology方法用于在storm集群创建拓扑。
 
一个更完整的例子
这个exclaimation DRPC例子只是一个用来说明DRPC概念的玩具。让我们看一个更完整的例子,该例子是一个真正需要storm集群的并行计算的DRPC功能。我们将要看的例子是对twitter网站上的一个URL的接触用户进行统计。
一个URL的接触用户数是在twitter网站上接触一个URL的用户数,你需要以下4步:
1. 获取tweeted the URL的全部用户
2. 获取这些用户的全部追随者
3. 使追随者集合中的用户唯一
4. 统计唯一的用户数
一个单独的reach计算在计算期间涉及到数千数据库访问和数千万追随者记录。它是一个真正的耗时计算。正如你将要看到的,在storm上实现这个功能非常简单。在一台机器上,reach计算花费数分钟,在storm集群,最难计算reach的URL也只需数秒。
Storm-starter项目 这里定义了一个reach样例,reach拓扑定义如下所示:
          
  
  
  1. LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("reach"); 
  2. builder.addBolt(new GetTweeters(), 3); 
  3. builder.addBolt(new GetFollowers(), 12
  4.         .shuffleGrouping(); 
  5. builder.addBolt(new PartialUniquer(), 6
  6.         .fieldsGrouping(new Fields("id""follower")); 
  7. builder.addBolt(new CountAggregator(), 2
  8.         .fieldsGrouping(new Fields("id")); 
这个拓扑以4个步骤的形式执行:

1. GetTweeters获取tweeted the URL的用户。它转换一个[id, url]形式的输入流到[id, tweeter]形式的输出流。每个url元组将映射到多个tweeter元组。

2. GetFollowers获取这些tweeter的追随者。它转换一个[id, tweeter]形式的输入流到[id, follower]形式的输出流。跨所有任务,当某人追随多个tweeter,这些tweeter又tweeted相同的URL时,这可能会得到重复的追随者。

3. PartialUniquer按追随者ID对追随者数据流进行分组。同一的追随者去到同一的任务,因此每个PartialUniquer任务都接收到独立的相互独立的追随者集合。 PartialUniquer一旦收到请求ID用于它的所有追随者元组,它就发射追随者子集的唯一总数。
4. 最后,CountAggregator从每个PartialUniquer任务接收计数并对它们求和。
让我们来看看 PartialUniquer
            
    
    
  1. public static class PartialUniquer implements IRichBolt, FinishedCallback { 
  2.     OutputCollector _collector; 
  3.     Map<Object, Set<String>> _sets = new HashMap<Object, Set<String>>(); 
  4.      
  5.     public void prepare(Map conf, TopologyContext context, OutputCollector collector) { 
  6.         _collector = collector; 
  7.     } 
  8.  
  9.     public void execute(Tuple tuple) { 
  10.         Object id = tuple.getValue(0); 
  11.         Set<String> curr = _sets.get(id); 
  12.         if(curr==null) { 
  13.             curr = new HashSet<String>(); 
  14.             _sets.put(id, curr); 
  15.         } 
  16.         curr.add(tuple.getString(1)); 
  17.         _collector.ack(tuple); 
  18.     } 
  19.  
  20.     public void cleanup() { 
  21.     } 
  22.  
  23.     public void finishedId(Object id) { 
  24.         Set<String> curr = _sets.remove(id); 
  25.         int count; 
  26.         if(curr!=null) { 
  27.             count = curr.size(); 
  28.         } else { 
  29.             count = 0
  30.         } 
  31.         _collector.emit(new Values(id, count)); 
  32.     } 
  33.  
  34.     public void declareOutputFields(OutputFieldsDeclarer declarer) { 
  35.         declarer.declare(new Fields("id""partial-count")); 
  36.     } 
当PartialUniquer在exectue方法中接收一个follower元组时,它用一个内部HashMap添加它到与请求ID对应的集合。
PartialUniquer也实现了FinishedCallback接口,它告诉LinearDRPCTopologyBuilder,对于任意给定的请求ID,当它已收到所有指向它的元组时,请通知它。这个回调是finishedId方法。在这个回调中,PartialUniquer发射单一的元组,元组包含它的追随者子集的唯一总数。 

在底层,CoordinatedBolt用于检测一个bolt何时收到该请求ID的所有元组。CoordinatedBolt使用direct stream管理协调。

其它的拓扑应该是不言自明。如你所见,reach计算的每一单步都是并行执行的,而且定义一个DRPC拓扑也非常简单。
Non-Linear DRPC拓扑
LinearDRPCTopologyBuilder仅处理“线性的”DRPC拓扑,计算以一连串步骤的形式表达(像reach)。不难想象某些功能将需要更复杂的拓扑结构,这些拓扑带有带分支和合并bolt。目前,要做到这一点,你需要直接使用CoordinateBolt。务必在邮件列表中谈谈你的非线性DRPC拓扑用例,写下DRPC拓扑更普遍的抽象结构。
LinearDRPCTopologyBuilder如何工作?

DRPCSpout发射[args, return-info],return-info是DRPC服务器的主机和端口,还有DRPC服务器生成的ID。

拓扑组成部分:
  • DRPCSpout
  • PrepareRequest(生成一个请求ID,创建一个返回信息流,一个参数流)
  • CoordinatedBolt包装器和直接分组
  • JoinResult(同返回信息一起连接结果)
  • ReturnResult(连接DRPC服务器并返回结果)
  • LinearDRPCTopologyBuilder是一个构建在Storm原语之上的高层次抽象的好例子。
高级
  • 同时编排处理多个请的KeyedFairBolt
  • 如何直接使用CoordinateBolt
 
 
 
 

转自:http://chenlx.blog.51cto.com/4096635/748348

相关问答

更多
  • 一、DFS为何物? DFS 即微软分布式文件系统的简称,系统管理员可以利用它来有效的整合网络资源,并把这些资源以单一的层次结构呈现给网络用户。管理员利用它可以把资源发布成一 个树形结构,这样大大简化了为用户进行资源配置和对资源管理的工作量。我们可以在不同的机器上调整和移动文件,这不会影响到用户的访问。 二、为什么要使用DES? 1、DFS使用了现有网络中的Share权限,管理员不必进行新的配置 2、通过一个DFS树形结构用户就可以访问多个网络资源,而不用再把远程驱动器映射到本地共享资源中。 3、DFS可以配 ...
  • 分布式系统(distributed system)是建立在网络之上的软件系统。正是因为软件的特性,所以分布式系统具有高度的内聚性和透明性。因此,网络和分布式系统之间的区别更多的在于高层软件(特别是操作系统),而不是硬件。内聚性是指每一个数据库分布节点高度自治,有本地的数据库管理系统。透明性是指每一个数据库分布节点对用户的应用来说都是透明的,看不出是本地还是远程。在分布式数据库系统中,用户感觉不到数据是分布的,即用户不须知道关系是否分割、有无复本、数据存于哪个站点以及事务在哪个站点上执行等。 故名思义,分布式 ...
  • 免责声明:我写了您在上述问题中引用的文章 。 但是,我对“任务”的概念有些困惑。 任务是组件的运行实例(spout还是螺栓)? 执行者有多个任务实际上是说执行者多次执行相同的组件,我是否正确? 是的,是的。 此外,在一般的并行性意义上,Storm将为一个喷口或螺栓生成一个专用的线程(执行器),但是由具有多个任务的执行器(线程)对并行性的贡献是什么? 每个执行器运行多个任务不会增加并行级别 - 执行程序总是有一个线程用于其所有任务,这意味着任务在执行程序上连续运行。 正如我在文章中写道,请注意: 在拓扑开始后 ...
  • 您可以使用Apache Kafka作为分布式和强大的队列,可以处理大量数据,并使您能够将邮件从一个端点传递到另一个端点。 风暴不是队列。 它是一种具有分布式实时处理能力的系统,意味着可以对实时数据进行并行执行各种操作。 这些工具的共同流程(据我所知)如下: 实时系统 - > Kafka - > Storm - > NoSql - > BI(可选) 所以你有你的实时应用程序处理大量的数据,发送到卡夫卡队列。 风暴从卡夫卡拉取数据并应用一些必要的操纵。 在这一点上,您通常希望从这些数据中获得一些好处,因此您可以 ...
  • 你可以使用fieldsGrouping 。 您可以声明一个字段,通过该字段对元组进行分组(在您的情况下为id )。 我只是假设您的输入流是具有id和body字段的JSON对象 {"id":"1234","body":"some body"} 还假设您的拓扑结构有一个喷口,两个螺栓,即BoltA和BoltB。 在BoltB中,覆盖declareOutputFields方法并填写详细信息。 public void declareOutputFields(OutputFieldsDeclarer declare ...
  • 我最终通过使用maven repo中预编译的storm-kafka版本并在拓扑中添加过滤器螺栓而不是在spout本身中进行过滤来解决这个问题。 从而消除了对storm-core和storm-kafka本地编译的jar文件的需求。 这不是一个“解决方案”,但它是解决问题的一种方法。 I eventually worked around this problem by using a pre-compiled version of storm-kafka from a maven repo and adding ...
  • 我从来没有听说过DSMS一词,但是看一下维基百科上的描述,我认为Storm绝对可以说是DSMS。 来自维基百科: 它类似于数据库管理系统(DBMS)[...]但是,与DBMS相比,DSMS执行连续查询,该查询不仅执行一次,而且是永久安装的。 这听起来就像Storm一样。 但请注意,在Storm的情况下,它通常与DBMS结合使用。 例如,Storm可以提供One-time queries , unlimited secondary storage等,维基百科说这与DSMS结合时缺乏DSMS。 I had ne ...
  • 只需按照群集设置指南操作: https://storm.apache.org/documentation/Setting-up-a-Storm-cluster.html 对于伪分布式设置,请在单台计算机中运行所有守护程序(ZK,Nimbus和一个单独的主管)。 Just follow the cluster setup guide: https://storm.apache.org/documentation/Setting-up-a-Storm-cluster.html For pseudo distri ...
  • 错误是由于事实 - BlockingQueue未在输出收集器中初始化; The error is due to the fact -- The BlockingQueue was not initilaized in the output collector;
  • 除非出现问题,否则你已经完成了。 当您向Storm提交拓扑时,Nimbus服务会通过遍布整个群集的Supervisor进程查看群集上的负载。 然后,Nimbus为拓扑运行提供一定数量的资源。 这些资源通常遍布集群中的各个Supervisor节点,并且它们将并行处理元组。 Nimbus偶尔会重新审视这些决策并更改哪些节点处理什么以尝试保持群集中的负载平衡。 作为用户,您永远不应该注意到这个过程。 假设您的Storm集群设置正确,您唯一需要做的就是提交拓扑。 Storm为您处理整个多节点并行处理事务。 也就是说 ...