Disruptor快速入门
开源项目
知识点
相关文章
更多最近更新
更多构建Disruptor实例-生产消费模型完成整个入门示例
2019-05-20 23:30|来源: 网路
1、初始化Disruptor,构建Disruptor只要需要以下几个参数
1 eventFactory: 消息(event)工厂对象
2 ringBufferSize: 容器的长度
3 executor: 线程池(建议使用自定义线程池) RejectedExecutionHandler
4 ProducerType: 单生产者 还是 多生产者
5 waitStrategy: 等待策略
示例代码:
Disruptor<OrderEvent> disruptor = new Disruptor<>(orderEventFactory, ringBufferSize, executor, ProducerType.SINGLE, new BlockingWaitStrategy());
2、初始化好Disruptor之后,通过该对象的handleEventsWith添加消费者的监听
3、然后启动Disruptor实例
4、往RingBuffer中生产数据,完成生产消费模型
具体代码如下:
package com656463.quickstart; import java.nio.ByteBuffer; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import com.lmax.disruptor.BlockingWaitStrategy; import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.dsl.ProducerType; public class Main { public static void main(String[] args) { // 参数准备工作 OrderEventFactory orderEventFactory = new OrderEventFactory(); int ringBufferSize = 4; ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); /** * 1 eventFactory: 消息(event)工厂对象 * 2 ringBufferSize: 容器的长度 * 3 executor: 线程池(建议使用自定义线程池) RejectedExecutionHandler * 4 ProducerType: 单生产者 还是 多生产者 * 5 waitStrategy: 等待策略 */ //1. 实例化disruptor对象 Disruptor<OrderEvent> disruptor = new Disruptor<>(orderEventFactory, ringBufferSize, executor, ProducerType.SINGLE, new BlockingWaitStrategy()); //2. 添加消费者的监听 (构建disruptor 与 消费者的一个关联关系) disruptor.handleEventsWith(new OrderEventHandler()); //3. 启动disruptor disruptor.start(); //4. 获取实际存储数据的容器: RingBuffer RingBuffer<OrderEvent> ringBuffer = disruptor.getRingBuffer(); OrderEventProducer producer = new OrderEventProducer(ringBuffer); ByteBuffer bb = ByteBuffer.allocate(8); for (long i = 0; i < 5; i++) { bb.putLong(0, i); producer.sendData(bb); } disruptor.shutdown(); executor.shutdown(); } }
整个Disruptor的入门程序完成,接下来深入理解Disruptor核心API
相关问答
更多-
生产者消费者程序问题![2023-08-10]
不是断了,是程序停止了! -
从LinkedBlockingQueue迁移到LMAX的Disruptor(Migrating from LinkedBlockingQueue to LMAX' Disruptor)[2022-11-17]
Mentaqueue提供了一个单一的生产者单一消费者队列基于相同的想法 - http://mentaqueue.soliveirajr.com/Page.mtw ,你可以检查代码,但我从来没有使用它自己。 开箱即用的Disruptor提供了两种技术 - 我不会进入代码,但可以根据需要进行操作。 它允许对事件处理程序进行排序,并且可以对其进行配置,以便每个处理程序将并行处理所有请求; 每个请求由每个处理程序处理。 一个Worker Pool实现,它允许一个工作线程池来处理一个请求; 每个请求将从线程池中处理一 ... -
通用.Net生产者/消费者(Generic .Net Producer/Consumer)[2023-12-05]
微软CCR包含你所需要的大部分内容。 以下是一些代码示例和使用说明。 Microsoft CCR contains much of what you need. Here are some code samples and usage notes. -
如果您在多线程环境中使用队列,我建议使用ConcurrentLinkedQueue 。 这将为您管理所有同步。 If you are using a queue inside of a multithreaded environment, I suggest a ConcurrentLinkedQueue. This will manage all the synchronization for you.
-
首先,不要使用Timer和TimerTask 。 使用ExecutorService进行多线程处理。 并在Singleton类中使用Eager Initialization。 或者双重检查null的锁定,以使Singleton真正单身。 FTPClientPolling.java public class FTPClientPolling { private static FTPClientPolling instance = new FTPClientPolling(); privat ...
-
如果我理解你的算法,它应该是: main.Add(key, part[key]) If I understand your algorithm, it should be: main.Add(key, part[key])
-
我该如何修复这个“不完全同步”的消费者生产者示例(How can I fix this “not quite synchronized” consumer producer example)[2022-04-11]
您的消费者必须await 持有锁 (作为方法状态的javadoc )。 另外,你不应该使用tryLock ,你应该只使用lock 。 如果锁定获取失败,如果要执行其他操作,则只使用tryLock 。 在您的情况下,如果锁定获取失败,您只需尝试再次获取它。 Your consumer must await while holding the lock (as the javadocs for the method state). also, you shouldn't be using tryLock, yo ... -
主要区别在于Disruptor设计用于同一个过程。 为什么? 出于性能原因(简答)。 更长的答案是,如果你不小心使用JMS接口的额外开销,套接字连接,锁定和多线程将有更高的开销,使Disruptor相形见绌。 快速JMS服务每秒可处理超过20,000条消息,但破坏程序的设计目的是处理2000万条消息的速率。 要实现这一点,这意味着您无法执行JMS假定的某些事情。 (往上看) The main difference is that the Disruptor is designed to work in th ...
-
LMAX Disruptor事件中的类字段是否需要易变?(Do class fields within an LMAX Disruptor event need to be volatile?)[2024-03-07]
彼得的评论提供了一个很好的线索,事实上,是的,有涉及的记忆围栏。 您可以在Sequence类中看到putOrderedLong()compareAndSet()等等。 每个都强制执行内存排序。 有关更多详细信息,请参阅源代码 。 Peter's comment gives a good clue, and in fact, yes there are memory fences involved. You can see a putOrderedLong() compareAndSet() and so o ... -
一般来说,使用WorkerPool允许多个池化工作线程在单个消费者上工作,如果您具有独立且可能具有可变持续时间的任务(例如:一些短任务,一些更长),这是很好的。 另一个选择是让多个独立工作者对事件进行并行处理,但每个工作者只处理模N个工作者(例如2个线程,一个线程处理奇数,一个线程处理甚至事件ID)。 如果您具有一致的持续时间处理任务,并且允许批处理也能非常有效地工作,则此方法很有用。 另一件需要考虑的事情是消费者可以进行“批处理”,这在审计中尤其有用。 如果您的消费者有10个事件在等待,而不是独立地向审计 ...