Twitter Storm 序列化

2019-03-02 23:38|来源: 网路

序列化

这篇文章是关于序列化方法在storm 0.6.0版及之前版本中是如何工作的。0.6.0版之前,storm使用一种不同的序列化方法,参见 Serialization (prior to 0.6.0)

元组可由任何一种类型的对象组成。由于storm是一个分布式系统,当对象在任务之间传递时,它需要知道如何序列化和反序列化这些对象。
Storm使用 Kryo进行序列化。Kryo是一个灵活快速的序列化库,产生小的序列化。
默认情况下,storm能序列化原始类型:String、字节数组、ArrayList、HashMap、HashSet及Clojure的集合类型。如果你想在元组中使用另外的类型,你需要注册一个自定义序列化装置。
 
动态类型
元组中字段没有声明类型。你放置对象到字段中,storm动态地计算出序列化。我们获到序列化接口之前,让我们花点时间理解为什么storm的元组是动态类型。
如果添加静态类型到元组字段将使Storm API非常复杂。例如,Hadoop,它的key和value是静态类型,但需要非常多的注解。使用Hadoop API是一个负担,这样子做到类型安全是不值得的。动态类型简单易用。
此外,不可能用合理的方式静态化storm元组的类型。假如一个bolt订阅多个数据流,这些数据流中的字段可能使用不同的数据类型。当一个bolt在execute方法中接收一个元组后,这个元组可以来自于任意一个数据流,因此元组的数据类型可以是任意数据类型的组合。这里也许你可以使用一些反射技巧,为一个bolt订阅的不同数据流中的元组声明不同的方法,但storm使用简单直接的方式实现动态类型。
最后,使用动态类型的另一个原因是因为允许动态类型语言以简单的方式使用storm,像Clojure和Ruby。
 
自定义序列化
如上所述,Storm使用 Kryo进行序列化。为了实现自定义序列化,你需要注册新的序列化装置和Kryo,强烈推荐你看看 Kryo的主页,了解它如何处理自定义序列化。
通过拓扑配置的“topology.kryo.register”属性添加自定义序列化。它需要一个注册清单,其中的每个注册都可以采用以下两种形式之一:
1. 你要注册的类名。在这种情况下,storm使用Kryo的“FieldsSerializer”来序列化这个类。对这个类来说,这不一定是最优的,更多细节参见Kryo文档。
2. 你要注册的类名和一个 com.esotericsoftware.kryo.Serializer接口的实现。
让我们看一个例子:
          
  
  
  1. topology.kryo.register: 
  2.   - com.mycompany.CustomType1 
  3.   - com.mycompany.CustomType2: com.mycompany.serializer.CustomType2Serializer 
  4.   - com.mycompany.CustomType3 
com.mycompany.CustomType1和com.mycompany.CustomType3的序列化使用FieldsSerializer。但com.mycompany.CustomType2的序列化使用com.mycompany.serializer.CustomType2Serializer。
Storm使用拓扑配置注册序列化装置提供了帮助。Config类的registerSerialization方法把注册的序列化装置添加到配置。
这里有一个称之为Config.TOPOLOGY_SKIP_MISSING_KRYO_REGISTRATIONS的高级配置。如果你设置它为真,storm将忽略任何已注册的序列化装置,就算classpath中没有它们的代码可用;否则,当storm未找到一个序列化装置时,将抛出异常。如果你在一个集群中运行多个拓扑,每个拓扑使用不同的序列化方式,但你想在storm.yaml文件中对这些拓扑声明各自的序列化方式,这个配置就非常有用。
 
Java序列化
如果storm遇到一个未注册序列化装置的类型,它将使用java序列化。如果对象不能用java序列化,storm将抛出异常。
注意,Java序列化是非常昂贵的,不管是CPU的花费,还是被序列化后对象所占的空间。在生产环境运行拓扑,强烈建议你注册自定义序列化装置。
通过设置Config.TOPOLOGY_FALL_BACK_ON_JAVA_SERIALIZATION为假,你可以关闭这个行为,回退到使用java序列化。
 
 
 

 


转自:http://chenlx.blog.51cto.com/4096635/749165

相关问答

更多
  • 免责声明:我写了您在上述问题中引用的文章 。 但是,我对“任务”的概念有些困惑。 任务是组件的运行实例(spout还是螺栓)? 执行者有多个任务实际上是说执行者多次执行相同的组件,我是否正确? 是的,是的。 此外,在一般的并行性意义上,Storm将为一个喷口或螺栓生成一个专用的线程(执行器),但是由具有多个任务的执行器(线程)对并行性的贡献是什么? 每个执行器运行多个任务不会增加并行级别 - 执行程序总是有一个线程用于其所有任务,这意味着任务在执行程序上连续运行。 正如我在文章中写道,请注意: 在拓扑开始后 ...
  • 使用将在编译时使用的XmlRoot属性来装饰您的根实体。 [XmlRoot(Namespace = "www.contoso.com", ElementName = "MyGroupName", DataType = "string", IsNullable=true)] 或者在运行时序列化时指定root属性。 XmlRootAttribute xRoot = new XmlRootAttribute(); xRoot.ElementName = "user"; // xRoot.Namespace = ...
  • 你可以使用fieldsGrouping 。 您可以声明一个字段,通过该字段对元组进行分组(在您的情况下为id )。 我只是假设您的输入流是具有id和body字段的JSON对象 {"id":"1234","body":"some body"} 还假设您的拓扑结构有一个喷口,两个螺栓,即BoltA和BoltB。 在BoltB中,覆盖declareOutputFields方法并填写详细信息。 public void declareOutputFields(OutputFieldsDeclarer declare ...
  • 你应该调用[tableView reloadData]; 在填充数组后,在请求完成处理程序中。 检查您是否收到任何数据。 数组是否填充了字典对象? 但是,伙计,严肃地说,你需要阅读一些关于编码的好书,你的代码真的缺乏对你正在做的事情的理解。 请删除[tableView reloadData]; 来自- ...cellForRowAtIndexPath:...方法。 You should call [tableView reloadData]; in your request completion handl ...
  • 好吧,没有办法像我想要的那样快速压缩。 但是我找到了解决办法,如果有人需要,我可以在这里分享。 这个问题不仅与Storm相关,而且是一个更一般的Hadoop问题。 我的所有数据都是使用HdfsBolt写入的: RecordFormat format = new DelimitedRecordFormat().withFieldDelimiter("|"); //Synchronize data buffer with the filesystem every 1000 tuples ...
  • 您的问题的根本原因是您正在向您的风暴配置添加ProducerTemplate,并且它正在抛出异常,因为它不可序列化。 如果那是你自己的类,你可以改变代码使其工作,但由于这是一个Camel类,我会推荐一种不同的方法。 WebSocketBolt:将您的producerTemplate私有成员更改为transient: private transient ProducerTemplate producerTemplate; 这样就不会尝试序列化(将其置于conf中也会遇到同样的问题)。 WebSocketBol ...
  • 我最终通过使用maven repo中预编译的storm-kafka版本并在拓扑中添加过滤器螺栓而不是在spout本身中进行过滤来解决这个问题。 从而消除了对storm-core和storm-kafka本地编译的jar文件的需求。 这不是一个“解决方案”,但它是解决问题的一种方法。 I eventually worked around this problem by using a pre-compiled version of storm-kafka from a maven repo and adding ...
  • 我从来没有听说过DSMS一词,但是看一下维基百科上的描述,我认为Storm绝对可以说是DSMS。 来自维基百科: 它类似于数据库管理系统(DBMS)[...]但是,与DBMS相比,DSMS执行连续查询,该查询不仅执行一次,而且是永久安装的。 这听起来就像Storm一样。 但请注意,在Storm的情况下,它通常与DBMS结合使用。 例如,Storm可以提供One-time queries , unlimited secondary storage等,维基百科说这与DSMS结合时缺乏DSMS。 I had ne ...
  • 我不知道你正在使用的平台,但在C ++ 10ms是永恒的 。 我认为你正在使用错误的工具来完成工作。 使用C ++,提供一些本地查询应该不到一微秒。 触摸多个内存位置和/或必须等待磁盘或网络I / O的非本地查询别无选择,只能花费更多时间。 在这种情况下,并行性是你最好的朋友。 你必须找到瓶颈。 是I / O吗? 是CPU吗? 是内存带宽吗? 是内存访问时间吗? 在找到瓶颈之后,您可以改进它,异步它和/或乘以(=并行化)它。 I don't know the platform you're using, b ...
  • 以您打算稍后使用它的格式将数据存储在数据库中是最好的,这样您只需操作一次。 在完整性方面,mysql_real_escape_string和serialize / unserialize函数将确保您存储的内容是您获得的内容。 要存储数据以便检索并假设您拥有PHP变量$ data中的数据(无论何种格式),您可以执行类似的操作。