协调多个并发队列(Coordinating multiple concurrent queues)
我目前有一个并发队列实现,它使用
BlockingQueue
作为数据存储。 我现在需要引入具有更高优先级的第二类对象,使我朝向原始队列的饥饿/优先级队列。 所以我们正在处理类型为A的对象,而B类是从多个线程生成的。 B类型的任何对象都应该在类型A的对象之前处理,但除了必须保持FIFO顺序之外。 因此,如果插入{1A,1B,2A,3A,2B},则顺序应为{1B,2B,1A,2A,3A}我尝试了一个
PriorityBlockingQueue
将类型B推到前面,但我无法维持FIFO要求(相同类型的项之间没有自然顺序)。我的下一个想法是使用两个并发队列。 在协调两个队列之间的访问时,我正在寻找常见的问题或考虑因素。 理想情况下,我想做这样的事情:
public void add(A a) { aQueue.add(a); } public void add(B b) { bQueue.add(b); } private void consume() { if(!bQueue.isEmpty()) process(bQueue.poll()); else if(!aQueue.isEmpty()) process(aQueue.poll()); }
如果两个队列都是
ConcurrentLinkedQueue
(或在此处插入更合适的结构),我是否需要任何同步或锁定? 注意我有很多生产者,但只有一个消费者(单线程ThreadPoolExecutor
)。编辑:如果一个B在isEmpty()检查后进入,则可以处理A并在下一个consume()调用时处理它。
I currently have a concurrent queue implementation that uses a
BlockingQueue
as the data store. I now need to introduce a second type of object that has a higher priority, leading me towards a starvation/priority queue for the original queue. So we're working with objects of type A and type B being produced from multiple threads. Any objects of type B should be processed before those of type A, but other than that FIFO order MUST be maintained. So if { 1A, 1B, 2A, 3A, 2B } are inserted the order should be {1B, 2B, 1A, 2A, 3A}I tried a single
PriorityBlockingQueue
to push type Bs to the front, but I couldn't maintain the FIFO requirement (there's no natural order between items of the same type).My next thought is to use two concurrent queues. I'm looking for common gotchas or considerations when coordinating access between the two queues. Ideally, I'd want to do something like this:
public void add(A a) { aQueue.add(a); } public void add(B b) { bQueue.add(b); } private void consume() { if(!bQueue.isEmpty()) process(bQueue.poll()); else if(!aQueue.isEmpty()) process(aQueue.poll()); }
Would I need any synchronization or locks if both queues are
ConcurrentLinkedQueue
(or insert more appropriate structure here)? Note I have many producers, but only one consumer (single threadedThreadPoolExecutor
).EDIT: If a B comes in after the isEmpty() check, it's ok to process an A and handle it on the next consume() call.
原文:https://stackoverflow.com/questions/4617356