阻塞队列和多线程消费者,如何知道何时停止(Blocking queue and multi-threaded consumer, how to know when to stop)
我有一个线程生成器创建一些任务对象,然后被添加到一个
ArrayBlockingQueue
(这是固定的大小)。我也启动一个多线程的消费者。 这是构建为固定线程池(
Executors.newFixedThreadPool(threadCount);
))。 然后,我将一些ConsumerWorker意图提交到此threadPool,每个ConsumerWorker都对上述ArrayBlockingQueue实例进行了修复。每个这样的Worker将在队列中执行一个
take()
并处理该任务。我的问题是,当没有更多的工作要做的时候,让工人知道什么是最好的方法。 换句话说,我如何告诉工作人员,生产者已经完成添加到队列,从这一点上,当他看到队列为空时,每个工作人员都应该停止。
我现在所做的是一个设置,我的生产者被初始化为一个回调,当他完成工作(向队列中添加东西)时被触发。 我还列出了我创建并提交到ThreadPool的所有ConsumerWorkers。 当制片人回调告诉我生产者完成后,我可以告诉每个工人。 在这一点上,他们应该只是继续检查队列是否不为空,当它变为空时,它们应该停止,从而允许我正常地关闭ExecutorService线程池。 这是这样的
public class ConsumerWorker implements Runnable{ private BlockingQueue<Produced> inputQueue; private volatile boolean isRunning = true; public ConsumerWorker(BlockingQueue<Produced> inputQueue) { this.inputQueue = inputQueue; } @Override public void run() { //worker loop keeps taking en element from the queue as long as the producer is still running or as //long as the queue is not empty: while(isRunning || !inputQueue.isEmpty()) { System.out.println("Consumer "+Thread.currentThread().getName()+" START"); try { Object queueElement = inputQueue.take(); //process queueElement } catch (Exception e) { e.printStackTrace(); } } } //this is used to signal from the main thread that he producer has finished adding stuff to the queue public void setRunning(boolean isRunning) { this.isRunning = isRunning; }
}
这里的问题是,我有一个明显的竞争条件,有时生产者将完成,发出信号,消费者工作人员将在消费队列中的所有内容之前停止。
我的问题是什么是最好的同步方式,以便它一切正常? 我应该同步其中检查生产者是否正在运行的整个部分加上如果队列为空加一个块(在队列对象上)从队列中取出一些东西? 我应该在ConsumerWorker实例上同步更新
isRunning
boolean吗? 任何其他建议?更新,这是我终止使用的工作实施:
public class ConsumerWorker implements Runnable{ private BlockingQueue<Produced> inputQueue; private final static Produced POISON = new Produced(-1); public ConsumerWorker(BlockingQueue<Produced> inputQueue) { this.inputQueue = inputQueue; } @Override public void run() { //worker loop keeps taking en element from the queue as long as the producer is still running or as //long as the queue is not empty: while(true) { System.out.println("Consumer "+Thread.currentThread().getName()+" START"); try { Produced queueElement = inputQueue.take(); Thread.sleep(new Random().nextInt(100)); if(queueElement==POISON) { break; } //process queueElement } catch (Exception e) { e.printStackTrace(); } System.out.println("Consumer "+Thread.currentThread().getName()+" END"); } } //this is used to signal from the main thread that he producer has finished adding stuff to the queue public void stopRunning() { try { inputQueue.put(POISON); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } }
}
这受到JohnVint下面的回答的启发,只有一些细微的修改。
===由于@ vendhan的评论更新。
谢谢你的暗示。 你是对的,这个问题中的第一个代码片段(除其他问题之外)其中
while(isRunning || !inputQueue.isEmpty())
并不真正有意义。在我实际的最终实现中,我做的更接近你建议替换“||” (或)与“&&”(和),意思是每个工人(消费者)现在只检查他从列表中获得的元素是否是药丸,如果这样停止(理论上我们可以说工人有要运行,队列不能为空)。
I have a single thread producer which creates some task objects which are then added into an
ArrayBlockingQueue
(which is of fixed size).I also start a multi-threaded consumer. This is build as a fixed thread pool (
Executors.newFixedThreadPool(threadCount);
). I then submit some ConsumerWorker intances to this threadPool, each ConsumerWorker having a refference to the above mentioned ArrayBlockingQueue instance.Each such Worker will do a
take()
on the queue and deal with the task.My issue is, what's the best way to have a Worker know when there won't be any more work to be done. In other words, how do I tell the Workers that the producer has finished adding to the queue, and from this point on, each worker should stop when he sees that the Queue is empty.
What I've got now is a setup where my Producer is initialized with a callback which is triggered when he finishes it's job (of adding stuff to the queue). I also keep a list of all the ConsumerWorkers I've created and submitted to the ThreadPool. When the Producer Callback tells me that the producer is done, I can tell this to each of the workers. At this point they should simply keep checking if the queue is not empty, and when it becomes empty they should stop, thus allowing me to gracefully shutDown the ExecutorService thread pool. It's something like this
public class ConsumerWorker implements Runnable{ private BlockingQueue<Produced> inputQueue; private volatile boolean isRunning = true; public ConsumerWorker(BlockingQueue<Produced> inputQueue) { this.inputQueue = inputQueue; } @Override public void run() { //worker loop keeps taking en element from the queue as long as the producer is still running or as //long as the queue is not empty: while(isRunning || !inputQueue.isEmpty()) { System.out.println("Consumer "+Thread.currentThread().getName()+" START"); try { Object queueElement = inputQueue.take(); //process queueElement } catch (Exception e) { e.printStackTrace(); } } } //this is used to signal from the main thread that he producer has finished adding stuff to the queue public void setRunning(boolean isRunning) { this.isRunning = isRunning; }
}
The problem here is that I have an obvious race condition where sometimes the producer will finish, signal it, and the ConsumerWorkers will stop BEFORE consuming everything in the queue.
My question is what's the best way to synchronize this so that it all works ok? Should I synchronize the whole part where it checks if the producer is running plus if the queue is empty plus take something from the queue in one block (on the queue object)? Should I just synchronize the update of the
isRunning
boolean on the ConsumerWorker instance? Any other suggestion?UPDATE, HERE'S THE WORKING IMPLEMENTATION THAT I'VE ENDED UP USING:
public class ConsumerWorker implements Runnable{ private BlockingQueue<Produced> inputQueue; private final static Produced POISON = new Produced(-1); public ConsumerWorker(BlockingQueue<Produced> inputQueue) { this.inputQueue = inputQueue; } @Override public void run() { //worker loop keeps taking en element from the queue as long as the producer is still running or as //long as the queue is not empty: while(true) { System.out.println("Consumer "+Thread.currentThread().getName()+" START"); try { Produced queueElement = inputQueue.take(); Thread.sleep(new Random().nextInt(100)); if(queueElement==POISON) { break; } //process queueElement } catch (Exception e) { e.printStackTrace(); } System.out.println("Consumer "+Thread.currentThread().getName()+" END"); } } //this is used to signal from the main thread that he producer has finished adding stuff to the queue public void stopRunning() { try { inputQueue.put(POISON); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } }
}
This was inspired heavily by JohnVint's answer below, with only some minor modifications.
=== Update due to @vendhan's comment.
Thank you for your obeservation. You are right, the first snippet of code in this question has (amongst other issues) the one where the
while(isRunning || !inputQueue.isEmpty())
doesn't really make sense.In my actual final implementation of this, I do something which is closer to your suggestion of replacing "||" (or) with "&&" (and), in the sense that each worker (consumer) now only checks if the element he's got from the list is a poison pill, and if so stops (so theoretically we can say that the worker has to be running AND the queue must not be empty).
原文:https://stackoverflow.com/questions/8974638
最满意答案
我的伎俩是我有
ssh
冲突。我的Windows路径上安装了Git,其中包括ssh。 cwrsync也安装ssh。
诀窍是制作批处理文件来设置正确的路径:
rsync.bat
@echo off SETLOCAL SET CWRSYNCHOME=c:\commands\cwrsync SET HOME=c:\Users\Petah\ SET CWOLDPATH=%PATH% SET PATH=%CWRSYNCHOME%\bin;%PATH% %~dp0\cwrsync\bin\rsync.exe %*
在Windows上,您可以键入
where ssh
以检查这是否是一个问题。 你会得到这样的东西:where ssh C:\Program Files (x86)\Git\bin\ssh.exe C:\Program Files\cwRsync\ssh.exe
i get the solution. i've using cygwin and this is the problem the rsync command for Windows work only in windows shell and works in the windows powershell.
A few times it has happened the same error between two linux boxes. and appears to be by incompatible versions of rsync
相关问答
更多-
为什么这个rsync连接在Windows上意外关闭?(Why is this rsync connection unexpectedly closed on Windows? [closed])[2022-05-25]
我的伎俩是我有ssh冲突。 我的Windows路径上安装了Git,其中包括ssh。 cwrsync也安装ssh。 诀窍是制作批处理文件来设置正确的路径: rsync.bat @echo off SETLOCAL SET CWRSYNCHOME=c:\commands\cwrsync SET HOME=c:\Users\Petah\ SET CWOLDPATH=%PATH% SET PATH=%CWRSYNCHOME%\bin;%PATH% %~dp0\cwrsync\bin\rsync.exe %* 在W ... -
去除: _request.Content.Headers.ContentEncoding.Add("gzip"); _request.Content.Headers.ContentEncoding.Add("deflate"); 因为你没有发送gzipped内容 另外,我建议你看一下使用WireShak实际发送的代码 - 因为我确信HttpRequestMessage已经附加了你需要的大部分头文件。 All I needed to do was to forbid automatical request ...
-
我找到了谷歌的解决方案,当使用rsync方法然后将ssh帐户添加到Vagrantfile内容: config.ssh.username = "vagrant" config.ssh.password = "vagrant" config.vm.synced_folder ".", "/var/www", type: "rsync", rsync__exclude: ".git/" 并从文件C:\ HashiCorp \ Vagrant \ embedded \ gems \ gems \ vagrant- ...
-
哎呀,刚才我已经清除并且现在正在工作,命令中有一个错误。 rsync -av -P -e'ssh -p 1111'text.txt abc@123.45.67.890:/ home / abc oops there was a mistake in the command just now i had cleared and it working now. rsync -av -P -e 'ssh -p 1111' text.txt abc@123.45.67.890:/home/abc
-
问题出在CentOS上安装了SElinux,由于某些原因阻塞了rss的ssh。 这是/ var / log / messages中的一行,表示ssh被阻止: Jun 12 13:45:59 myserver kernel: type=1400 audit(1434109559.911:33346): avc: denied { execute } for pid=11862 comm="rsync" name="ssh" dev=dm-1 ino=11931741 scontext=unconfine ...
-
问题出在我的.bash_profile上。 我删除它似乎工作。 奇怪的是,当我把它放回去时,似乎仍然有效 The problem lies in my .bash_profile. I removed it and it seemed to work. Odd thing is when I put it back it seems to still work
-
我曾经做过很多这样的事情。 刚刚进行了测试,提出了一些建议。 拼出你的整个用户@主机模式 首先运行ssh连接没有rsync,您可能需要先批准指纹 您似乎没有传递标志来保护扩展属性,这可能会在OS X上产生损坏的文件。如果您不需要资源分支,那么您可以,但大多数情况下您确实需要它们。 我的测试用例: $ rsync -Pav ~/Desktop/ me@remote.example.com:~/rsyc-test 在这种情况下,〜/ Desktop中的所有文件都被复制到我的家庭目录中的远程主机。 由于目录'r ...
-
cygwin rsync协议错误(cygwin rsync protocol error)[2023-07-26]
问题是一个冲突的git安装。 Git包含它自己的ssh,它的路径在cygwin路径之前,所以rsync使用git的ssh。 解决方法是交换git / cygwin bin路径的顺序,以便rsync使用正确版本的ssh。 The problem was a conflicting git installation. Git included it's own ssh and it's path was before cygwin path, so rsync was using git's ssh. The ... -
这可能是一个时间问题。 我建议使用AppleScript mount volume命令并等待磁盘出现在/Volumes 您需要将server.local替换为NAS的服务器名称。 set diskName to "NASDrive" try mount volume "smb://server.local/" & diskName repeat until diskName is in (do shell script "ls /Volumes") delay 0.2 ...
-
Windows 2012上sshd连接意外关闭[关闭](sshd connections unexpectedly close on windows 2012 [closed])[2022-03-30]
请检查以下步骤 1.)FreeSSHd作为服务运行。 2.)SSH服务正在FreeSSHd控制台上运行。 3.)如果没有启动,请尝试在SSH选项卡下更改SSH端口(例如,在我的情况下为430)。 希望这可以帮助。 Please check following steps 1.) FreeSSHd is running as a service. 2.) SSH Service is running on the FreeSSHd console . 3.) If it not starting try ch ...