知识点
相关文章
更多最近更新
更多Storm-kafka【接口】- kafkaUtils
2019-03-02 23:57|来源: 网路
本章主题:
为了实现Storm-kafka : 将Storm之中一些常用的方法进行封装
实现细节如下:
1
public static Iterable<List<Object>> generateTuples( KafkaConfig kafkaConfig, Message msg) { Iterable<List<Object>> tups; ByteBuffer payload = msg.payload(); ByteBuffer key = msg.key(); if (key != null && kafkaConfig.scheme instanceof KeyValueSchemeAsMultiScheme) { tups = ((KeyValueSchemeAsMultiScheme) kafkaConfig.scheme) .deserializeKeyAndValue(Utils.toByteArray(key), Utils.toByteArray(payload)); } else { tups = kafkaConfig.scheme.deserialize(Utils.toByteArray(payload)); } return tups; }
1 :方法内部 Iterable<List<Object>> 类型的tups对象
2 : Message对象是kafka的Message。 包括了Key,以及palyload负载
3 : 在生成元组的时候,需要拿到kafkaConfig.scheme 当中的Scheme,一旦Scheme instanceof
KeyValueSchemeAsMultiScheme,将强制性转换为 KeyValueSchemeAsMultiScheme,并且调用deserializeKeyAndValue 方法将其反序列化 List<Object>
细节上,让我们查看一下 KeyValueSchemeAsMultiScheme 对象
package com.mixbox.storm.kafka; import backtype.storm.spout.SchemeAsMultiScheme; import java.util.Arrays; import java.util.List; @SuppressWarnings("serial") public class KeyValueSchemeAsMultiScheme extends SchemeAsMultiScheme{ public KeyValueSchemeAsMultiScheme(KeyValueScheme scheme) { super(scheme); } //把一个key value 进行反序列化 public Iterable<List<Object>> deserializeKeyAndValue(final byte[] key, final byte[] value) { List<Object> o = ((KeyValueScheme)scheme).deserializeKeyAndValue(key, value); if(o == null) return null; else return Arrays.asList(o); } }
首先, KeyValueSchemeAsMultiScheme 继承自SchemeAsMultiScheme,SchemeAsMultiScheme是Storm自身所带有的一个class实现,以上的2个class 都实现了Storm之中【MultiScheme】
转自:http://my.oschina.net/u/1791874/blog/299338
相关问答
更多-
我这边的应用是这样的: ①采集程序:使用avro方式将自定义对象序列化成字节流存入Kafka ②spark streaming:获取Kafka中的字节流,使用avro反序列化为自定义对象
-
我这边的应用是这样的: ①采集程序:使用avro方式将 自定义对象 序列化成 字节流存入Kafka ②spark streaming:获取Kafka中的字节流,使用avro反序列化为自定义对象
-
Storm-kafka 0.8 plus,我可以从最新的偏移量中读取吗?(Storm-kafka 0.8 plus, Can I read from the latest offset?)[2022-09-19]
试试kafkaConfig.forceStartOffsetTime(-1) 。 -1为最新的Kafka偏移量, -2为最早的可用偏移量。 编辑: 此外,您可以强制喷口开始使用相同的选项从任何所需的偏移量消耗 - 仅传递数值偏移量作为唯一参数。 忽略forceStartOffsetTime的“ Time ”,参数名称有点混乱。 卡夫卡的偏移量是数字,与任何时间概念都没有关系。 -1只是一种告诉卡夫卡鲸鱼嘴从卡夫卡本身收集最新偏移量的特殊方式(最早可用偏移量为-2 )。 Try kafkaConfig.for ... -
我发现你做错了几件事。 你需要传递Map[TopicAndPartition, Long] ,而目前你有一个Tuple2[TopicAndPartition, Long] 。 所以你需要: val fromOffsets: Map[TopicAndPartition, Long] = Map(TopicAndPartition(metrics_rs.getString(1), metrics_rs.getInt(2)) -> metrics_r ...
-
pyspark无法找到KafkaUtils.createDirectStream(pyspark is unable to find KafkaUtils.createDirectStream)[2023-06-09]
您正在使用Spark 1.3.0,并且已在Spark 1.4.0中引入了createDirectStream Python版本。 Spark 1.3仅提供Scala和Java实现。 如果要使用直接流,则必须更新Spark安装。 You're using Spark 1.3.0 and Python version of createDirectStream has been introduced in Spark 1.4.0. Spark 1.3 provides only Scala and Java i ... -
这是Kafka-Spark集成页面中提到的内容。 val kafkaStream = KafkaUtils.createStream(streamingContext,[ZK quorum],[consumer group id],[要使用的Kafka分区的每个主题数]) KafkaUtils.createStream将创建一个接收器并使用Kafka主题。 选项“要使用的Kafka分区的每个主题数”表示此接收器将并行读取多少个分区。 例如,你有一个名为“Topic1”的主题,有2个分区,你提供了选项'Top ...
-
在KafkaUtils010 SparkStreaming中的MessageHandler(MessageHandler in KafkaUtils010 SparkStreaming)[2023-01-25]
当你在0.10中使用createDirectStream ,你会得到一个ConsumerRecord 。 该记录具有topic价值。 您可以创建主题和值的元组: val stream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String]( streamingContext, PreferConsistent, Subscribe[String, ... -
我将config中的“topology.spout.max.batch.size”值更新为大约64 * 1024值,然后风暴处理变得很快。 I updated the "topology.spout.max.batch.size" value in config to about 64*1024 value and then storm processing became fast.
-
KafkaUtils.createDirectStream到一个String对象Spark(KafkaUtils.createDirectStream to a String object Spark)[2022-07-23]
您正在寻找的API是: DStream.foreachRDD(func) 它将函数func应用于从流中生成的每个RDD。 所以,对于你的用例,我可能会写下面的代码: lines.foreachRDD(rdd => { val data = rdd.collect().mkString("\n") println(data) }) 请注意,由于此代码在驱动程序进程上运行,因此您必须确保它具有足够的资源来处理给定文件。 通常,应该使用此API将每个RDD中的数据推送到外部系统,例如将RDD保存到文件 ... -
我还是不知道造成这个问题的原因。 但基本上我们没有正确关闭风暴,动物园管理员和卡夫卡。 这导致风暴拓扑失败,我们不得不拆除整个集群并重新构建它。 更新到风暴0.10.0有助于解决其他一些问题。 I still dont know what caused this issue. But basically we did not shut down storm, zookeeper and kafka properly. This resulted in storm topologies failing, we ...