首页 \ 问答 \ 阻塞队列和多线程消费者,如何知道何时停止(Blocking queue and multi-threaded consumer, how to know when to stop)

阻塞队列和多线程消费者,如何知道何时停止(Blocking queue and multi-threaded consumer, how to know when to stop)

我有一个线程生成器创建一些任务对象,然后被添加到一个ArrayBlockingQueue (这是固定的大小)。

我也启动一个多线程的消费者。 这是构建为固定线程池( Executors.newFixedThreadPool(threadCount); ))。 然后,我将一些ConsumerWorker意图提交到此threadPool,每个ConsumerWorker都对上述ArrayBlockingQueue实例进行了修复。

每个这样的Worker将在队列中执行一个take()并处理该任务。

我的问题是,当没有更多的工作要做的时候,让工人知道什么是最好的方法。 换句话说,我如何告诉工作人员,生产者已经完成添加到队列,从这一点上,当他看到队列为空时,每个工作人员都应该停止。

我现在所做的是一个设置,我的生产者被初始化为一个回调,当他完成工作(向队列中添加东西)时被触发。 我还列出了我创建并提交到ThreadPool的所有ConsumerWorkers。 当制片人回调告诉我生产者完成后,我可以告诉每个工人。 在这一点上,他们应该只是继续检查队列是否不为空,当它变为空时,它们应该停止,从而允许我正常地关闭ExecutorService线程池。 这是这样的

public class ConsumerWorker implements Runnable{

private BlockingQueue<Produced> inputQueue;
private volatile boolean isRunning = true;

public ConsumerWorker(BlockingQueue<Produced> inputQueue) {
    this.inputQueue = inputQueue;
}

@Override
public void run() {
    //worker loop keeps taking en element from the queue as long as the producer is still running or as 
    //long as the queue is not empty:
    while(isRunning || !inputQueue.isEmpty()) {
        System.out.println("Consumer "+Thread.currentThread().getName()+" START");
        try {
            Object queueElement = inputQueue.take();
            //process queueElement
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

//this is used to signal from the main thread that he producer has finished adding stuff to the queue
public void setRunning(boolean isRunning) {
    this.isRunning = isRunning;
}

}

这里的问题是,我有一个明显的竞争条件,有时生产者将完成,发出信号,消费者工作人员将在消费队列中的所有内容之前停止。

我的问题是什么是最好的同步方式,以便它一切正常? 我应该同步其中检查生产者是否正在运行的整个部分加上如果队列为空加一个块(在队列对象上)从队列中取出一些东西? 我应该在ConsumerWorker实例上同步更新isRunning boolean吗? 任何其他建议?

更新,这是我终止使用的工作实施:

public class ConsumerWorker implements Runnable{

private BlockingQueue<Produced> inputQueue;

private final static Produced POISON = new Produced(-1); 

public ConsumerWorker(BlockingQueue<Produced> inputQueue) {
    this.inputQueue = inputQueue;
}

@Override
public void run() {
    //worker loop keeps taking en element from the queue as long as the producer is still running or as 
    //long as the queue is not empty:
    while(true) {
        System.out.println("Consumer "+Thread.currentThread().getName()+" START");
        try {
            Produced queueElement = inputQueue.take();
            Thread.sleep(new Random().nextInt(100));
            if(queueElement==POISON) {
                break;
            }
            //process queueElement
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println("Consumer "+Thread.currentThread().getName()+" END");
    }
}

//this is used to signal from the main thread that he producer has finished adding stuff to the queue
public void stopRunning() {
    try {
        inputQueue.put(POISON);
    } catch (InterruptedException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }
}

}

这受到JohnVint下面的回答的启发,只有一些细微的修改。

===由于@ vendhan的评论更新。

谢谢你的暗示。 你是对的,这个问题中的第一个代码片段(除其他问题之外)其中while(isRunning || !inputQueue.isEmpty())并不真正有意义。

在我实际的最终实现中,我做的更接近你建议替换“||” (或)与“&&”(和),意思是每个工人(消费者)现在只检查他从列表中获得的元素是否是药丸,如果这样停止(理论上我们可以说工人有要运行,队列不能为空)。


I have a single thread producer which creates some task objects which are then added into an ArrayBlockingQueue (which is of fixed size).

I also start a multi-threaded consumer. This is build as a fixed thread pool (Executors.newFixedThreadPool(threadCount);). I then submit some ConsumerWorker intances to this threadPool, each ConsumerWorker having a refference to the above mentioned ArrayBlockingQueue instance.

Each such Worker will do a take() on the queue and deal with the task.

My issue is, what's the best way to have a Worker know when there won't be any more work to be done. In other words, how do I tell the Workers that the producer has finished adding to the queue, and from this point on, each worker should stop when he sees that the Queue is empty.

What I've got now is a setup where my Producer is initialized with a callback which is triggered when he finishes it's job (of adding stuff to the queue). I also keep a list of all the ConsumerWorkers I've created and submitted to the ThreadPool. When the Producer Callback tells me that the producer is done, I can tell this to each of the workers. At this point they should simply keep checking if the queue is not empty, and when it becomes empty they should stop, thus allowing me to gracefully shutDown the ExecutorService thread pool. It's something like this

public class ConsumerWorker implements Runnable{

private BlockingQueue<Produced> inputQueue;
private volatile boolean isRunning = true;

public ConsumerWorker(BlockingQueue<Produced> inputQueue) {
    this.inputQueue = inputQueue;
}

@Override
public void run() {
    //worker loop keeps taking en element from the queue as long as the producer is still running or as 
    //long as the queue is not empty:
    while(isRunning || !inputQueue.isEmpty()) {
        System.out.println("Consumer "+Thread.currentThread().getName()+" START");
        try {
            Object queueElement = inputQueue.take();
            //process queueElement
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

//this is used to signal from the main thread that he producer has finished adding stuff to the queue
public void setRunning(boolean isRunning) {
    this.isRunning = isRunning;
}

}

The problem here is that I have an obvious race condition where sometimes the producer will finish, signal it, and the ConsumerWorkers will stop BEFORE consuming everything in the queue.

My question is what's the best way to synchronize this so that it all works ok? Should I synchronize the whole part where it checks if the producer is running plus if the queue is empty plus take something from the queue in one block (on the queue object)? Should I just synchronize the update of the isRunning boolean on the ConsumerWorker instance? Any other suggestion?

UPDATE, HERE'S THE WORKING IMPLEMENTATION THAT I'VE ENDED UP USING:

public class ConsumerWorker implements Runnable{

private BlockingQueue<Produced> inputQueue;

private final static Produced POISON = new Produced(-1); 

public ConsumerWorker(BlockingQueue<Produced> inputQueue) {
    this.inputQueue = inputQueue;
}

@Override
public void run() {
    //worker loop keeps taking en element from the queue as long as the producer is still running or as 
    //long as the queue is not empty:
    while(true) {
        System.out.println("Consumer "+Thread.currentThread().getName()+" START");
        try {
            Produced queueElement = inputQueue.take();
            Thread.sleep(new Random().nextInt(100));
            if(queueElement==POISON) {
                break;
            }
            //process queueElement
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println("Consumer "+Thread.currentThread().getName()+" END");
    }
}

//this is used to signal from the main thread that he producer has finished adding stuff to the queue
public void stopRunning() {
    try {
        inputQueue.put(POISON);
    } catch (InterruptedException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }
}

}

This was inspired heavily by JohnVint's answer below, with only some minor modifications.

=== Update due to @vendhan's comment.

Thank you for your obeservation. You are right, the first snippet of code in this question has (amongst other issues) the one where the while(isRunning || !inputQueue.isEmpty()) doesn't really make sense.

In my actual final implementation of this, I do something which is closer to your suggestion of replacing "||" (or) with "&&" (and), in the sense that each worker (consumer) now only checks if the element he's got from the list is a poison pill, and if so stops (so theoretically we can say that the worker has to be running AND the queue must not be empty).


原文:https://stackoverflow.com/questions/8974638
更新时间:2023-06-04 13:06

最满意答案

我的伎俩是我有ssh冲突。

我的Windows路径上安装了Git,其中包括ssh。 cwrsync也安装ssh。

诀窍是制作批处理文件来设置正确的路径:

rsync.bat

@echo off
SETLOCAL
SET CWRSYNCHOME=c:\commands\cwrsync
SET HOME=c:\Users\Petah\
SET CWOLDPATH=%PATH%
SET PATH=%CWRSYNCHOME%\bin;%PATH%
%~dp0\cwrsync\bin\rsync.exe %*

在Windows上,您可以键入where ssh以检查这是否是一个问题。 你会得到这样的东西:

where ssh
C:\Program Files (x86)\Git\bin\ssh.exe
C:\Program Files\cwRsync\ssh.exe

i get the solution. i've using cygwin and this is the problem the rsync command for Windows work only in windows shell and works in the windows powershell.

A few times it has happened the same error between two linux boxes. and appears to be by incompatible versions of rsync

相关问答

更多
  • 我的伎俩是我有ssh冲突。 我的Windows路径上安装了Git,其中包括ssh。 cwrsync也安装ssh。 诀窍是制作批处理文件来设置正确的路径: rsync.bat @echo off SETLOCAL SET CWRSYNCHOME=c:\commands\cwrsync SET HOME=c:\Users\Petah\ SET CWOLDPATH=%PATH% SET PATH=%CWRSYNCHOME%\bin;%PATH% %~dp0\cwrsync\bin\rsync.exe %* 在W ...
  • 去除: _request.Content.Headers.ContentEncoding.Add("gzip"); _request.Content.Headers.ContentEncoding.Add("deflate"); 因为你没有发送gzipped内容 另外,我建议你看一下使用WireShak实际发送的代码 - 因为我确信HttpRequestMessage已经附加了你需要的大部分头文件。 All I needed to do was to forbid automatical request ...
  • 我找到了谷歌的解决方案,当使用rsync方法然后将ssh帐户添加到Vagrantfile内容: config.ssh.username = "vagrant" config.ssh.password = "vagrant" config.vm.synced_folder ".", "/var/www", type: "rsync", rsync__exclude: ".git/" 并从文件C:\ HashiCorp \ Vagrant \ embedded \ gems \ gems \ vagrant- ...
  • 哎呀,刚才我已经清除并且现在正在工作,命令中有一个错误。 rsync -av -P -e'ssh -p 1111'text.txt abc@123.45.67.890:/ home / abc oops there was a mistake in the command just now i had cleared and it working now. rsync -av -P -e 'ssh -p 1111' text.txt abc@123.45.67.890:/home/abc
  • 问题出在CentOS上安装了SElinux,由于某些原因阻塞了rss的ssh。 这是/ var / log / messages中的一行,表示ssh被阻止: Jun 12 13:45:59 myserver kernel: type=1400 audit(1434109559.911:33346): avc: denied { execute } for pid=11862 comm="rsync" name="ssh" dev=dm-1 ino=11931741 scontext=unconfine ...
  • 问题出在我的.bash_profile上。 我删除它似乎工作。 奇怪的是,当我把它放回去时,似乎仍然有效 The problem lies in my .bash_profile. I removed it and it seemed to work. Odd thing is when I put it back it seems to still work
  • 我曾经做过很多这样的事情。 刚刚进行了测试,提出了一些建议。 拼出你的整个用户@主机模式 首先运行ssh连接没有rsync,您可能需要先批准指纹 您似乎没有传递标志来保护扩展属性,这可能会在OS X上产生损坏的文件。如果您不需要资源分支,那么您可以,但大多数情况下您确实需要它们。 我的测试用例: $ rsync -Pav ~/Desktop/ me@remote.example.com:~/rsyc-test 在这种情况下,〜/ Desktop中的所有文件都被复制到我的家庭目录中的远程主机。 由于目录'r ...
  • 问题是一个冲突的git安装。 Git包含它自己的ssh,它的路径在cygwin路径之前,所以rsync使用git的ssh。 解决方法是交换git / cygwin bin路径的顺序,以便rsync使用正确版本的ssh。 The problem was a conflicting git installation. Git included it's own ssh and it's path was before cygwin path, so rsync was using git's ssh. The ...
  • 这可能是一个时间问题。 我建议使用AppleScript mount volume命令并等待磁盘出现在/Volumes 您需要将server.local替换为NAS的服务器名称。 set diskName to "NASDrive" try mount volume "smb://server.local/" & diskName repeat until diskName is in (do shell script "ls /Volumes") delay 0.2 ...
  • 请检查以下步骤 1.)FreeSSHd作为服务运行。 2.)SSH服务正在FreeSSHd控制台上运行。 3.)如果没有启动,请尝试在SSH选项卡下更改SSH端口(例如,在我的情况下为430)。 希望这可以帮助。 Please check following steps 1.) FreeSSHd is running as a service. 2.) SSH Service is running on the FreeSSHd console . 3.) If it not starting try ch ...

相关文章

更多

最新问答

更多
  • 获取MVC 4使用的DisplayMode后缀(Get the DisplayMode Suffix being used by MVC 4)
  • 如何通过引用返回对象?(How is returning an object by reference possible?)
  • 矩阵如何存储在内存中?(How are matrices stored in memory?)
  • 每个请求的Java新会话?(Java New Session For Each Request?)
  • css:浮动div中重叠的标题h1(css: overlapping headlines h1 in floated divs)
  • 无论图像如何,Caffe预测同一类(Caffe predicts same class regardless of image)
  • xcode语法颜色编码解释?(xcode syntax color coding explained?)
  • 在Access 2010 Runtime中使用Office 2000校对工具(Use Office 2000 proofing tools in Access 2010 Runtime)
  • 从单独的Web主机将图像传输到服务器上(Getting images onto server from separate web host)
  • 从旧版本复制文件并保留它们(旧/新版本)(Copy a file from old revision and keep both of them (old / new revision))
  • 西安哪有PLC可控制编程的培训
  • 在Entity Framework中选择基类(Select base class in Entity Framework)
  • 在Android中出现错误“数据集和渲染器应该不为null,并且应该具有相同数量的系列”(Error “Dataset and renderer should be not null and should have the same number of series” in Android)
  • 电脑二级VF有什么用
  • Datamapper Ruby如何添加Hook方法(Datamapper Ruby How to add Hook Method)
  • 金华英语角.
  • 手机软件如何制作
  • 用于Android webview中图像保存的上下文菜单(Context Menu for Image Saving in an Android webview)
  • 注意:未定义的偏移量:PHP(Notice: Undefined offset: PHP)
  • 如何读R中的大数据集[复制](How to read large dataset in R [duplicate])
  • Unity 5 Heighmap与地形宽度/地形长度的分辨率关系?(Unity 5 Heighmap Resolution relationship to terrain width / terrain length?)
  • 如何通知PipedOutputStream线程写入最后一个字节的PipedInputStream线程?(How to notify PipedInputStream thread that PipedOutputStream thread has written last byte?)
  • python的访问器方法有哪些
  • DeviceNetworkInformation:哪个是哪个?(DeviceNetworkInformation: Which is which?)
  • 在Ruby中对组合进行排序(Sorting a combination in Ruby)
  • 网站开发的流程?
  • 使用Zend Framework 2中的JOIN sql检索数据(Retrieve data using JOIN sql in Zend Framework 2)
  • 条带格式类型格式模式编号无法正常工作(Stripes format type format pattern number not working properly)
  • 透明度错误IE11(Transparency bug IE11)
  • linux的基本操作命令。。。