知识点
相关文章
更多最近更新
更多Storm中Spout使用注意事项小结
2019-03-02 23:44|来源: 网路
Storm中Spout用于读取并向计算拓扑中发送数据源,最近在调试一个topology时遇到了系统qps低,处理速度达不到要求的问题,经过排查后发现是由于对Spout的使用模式不当导致的多线程同步等待。这里罗列几点个人觉得编写Spout代码时需要特别注意的地方:
1. 最常用的模式是使用一个线程安全的queue,如BlockingQueue,spout主线程从queue中读取数据;另外的一个或多个线程负责从数据源(如各种消息中间件、db等)读取数据并放入queue中。
2. 如果不关心数据是否丢失(例如数据统计分析的典型场景),不要启用ack机制。
3. Spout的nextTuple和ack方法是在同一个线程中被执行的(可能最初觉得这块不会成为瓶颈,为了简单实现起见就单线程了,jstorm应该是已经改成了多线程),因此不能在nextTuple或ack方法里block住当前线程,这样将直接影响spout的处理速度,很关键。
4. Spout的nextTuple发送数据时,不能阻塞当前线程(见上一条),比如从queue中取数据时,使用poll接口而不是take,且poll方法尽量不要传参阻塞固定时间,如果queue中没有数据则直接返回;如果有多条待发送的数据,则一次调用nextTuple时遍历全部发出去。
5. Spout从0.8.1之后在调用nextTuple方法时,如果没有emit tuple,那么默认需要休眠1ms,这个具体的策略是可配置的,因此可以根据自己的具体场景,进行设置,以达到合理利用cpu资源。
转自:http://www.cnblogs.com/panfeng412/p/storm-spout-common-issues
相关问答
更多-
电梯机房安全操作注意事项[2022-09-12]
电梯安全乘坐须知 为保证乘客的人身安全和电梯设备的正常,请遵照以下规定正确使用电梯。 一、禁止携带易燃、易爆或带腐蚀性的危险品乘坐电梯。 二、乘坐电梯时请勿在轿厢内左右摇晃。 三、禁止在轿厢内吸烟以免引起火灾。 -
R-Storm包中的ack()函数存在错误。 现在已修复。 如果你需要,可以从回购中抽出一个git。 https://github.com/allenday/R-Storm/blob/master/Storm/R/Storm.R 此帖子之后的任何软件包更新都应该有修复。 There was an bug in ack() function in R-Storm package. This is fixed now. Do a git pull from repo if you need this. http ...
-
Storm-jms Spout收集Avro消息并向下传输?(Storm-jms Spout collecting Avro messages and sending down stream?)[2022-07-02]
现有的HDFS Bolt不支持编写avro文件,我们需要通过进行以下更改来克服这个问题。 在此示例代码中,我使用从我的spout获取JMS消息并将这些JMS字节消息转换为AVRO并将它们发送到HDFS。 此代码可以作为修改AbstractHdfsBolt中的方法的示例。 public void execute(Tuple tuple) { try { long length = bytesMessage.getBody ... -
来自Apache Storm Trident和Kafka集成的Spout错误(Spout Error from Apache Storm Trident and Kafka Integration)[2022-05-01]
你的堆栈跟踪表明你正在访问https://issues.apache.org/jira/browse/STORM-3046 。 Your stack trace indicates that you are hitting https://issues.apache.org/jira/browse/STORM-3046. -
是。 您实际上可以在元组的基础上启用/禁用acking。 在collector.emit(...)您可以提供message-id(启用acking)或省略message-id(禁用acking)。 对于其余的拓扑结构,您不需要再关心这种差异。 如果你使用一个没有ID的元组作为锚点或者确认它,Storm只会忽略这些调用并继续。 Yes. You can actually enable/disable acking on a tuple basis. On collector.emit(...) you can ...
-
我已经浏览了ptgoetz的Storm JMS示例,并提出了一个解决方案,可以直接将主题数据提供给spout。 需要在jms-activemq.xml中指定主题
同一个Storm spout是否并行接收acked / failed消息(Does same Storm spout receive acked/failed message in parallel)[2022-03-17]
如果我正确解释Storm的保证消息处理文档,那么发出元组的Spout将始终收到ack / fail调用: “请注意,元组将被创建它的完全相同的Spout任务激活或失败。因此,如果Spout在整个集群中执行尽可能多的任务,则不会通过不同于任务的任务执行或失败元组。创造了它。“ If I'm interpreting Storm's guaranteed message processing documentation correctly, then the Spout that emitted the tup ...后来我发现上述行为的原因是因为注册的序列化器过滤掉了来自卡夫卡队列的所有消息,因此没有消息从喷口发射到螺栓。 这也解释了为什么kafka主题消费群体的抵消逐渐增加。 I later figured out that the reason for the above behaviour was because the serializer registered was filtering out all of the messages from the kafka queue thereby no messa ...创建一个每隔X秒发出一次元组的Apache Storm spout(Creating an Apache Storm spout which emits tuples every X seconds)[2023-09-17]
您可以实现自己的MQTT喷口。 举个例子来看看MongoSpout 。 重要的部分是nextTuple方法。 调用此方法时,Storm会请求Spout向输出收集器发出元组。 此方法应该是非阻塞的,因此如果Spout没有要发出的元组,则此方法应该返回。 nextTuple,ack和fail都是在spout任务中的单个线程中的紧密循环中调用的。 当没有元组发出时,让nextTuple睡眠很短的时间(如一毫秒)是有礼貌的,这样就不会浪费太多的CPU。 您不能一次等待指定的时间,但是您可以实现nextTuple以便 ...