Is this thread safe?

```csharp
private static bool close_thread_running = false;

public static void StartBrowserCleaning()
{
    lock (close_thread_running)
    {
        if (close_thread_running)
            return;
        close_thread_running = true;
    }

    Thread thread = new Thread(new ThreadStart(delegate()
    {
        while (true)
        {
            lock (close_thread_running)
            {
                if (!close_thread_running)
                    break;
            }
            CleanBrowsers();
            Thread.Sleep(5000);
        }
    }));
    thread.Start();
}

public static void StopBrowserCleaning()
{
    lock (close_thread_running)
    {
        close_thread_running = false;
    }
}
```
Original: https://stackoverflow.com/questions/1419889
Accepted answer
Your solution is not a worker goroutine pool in any sense: your code does not limit concurrent goroutines, and it does not "reuse" goroutines (it always starts a new one when a new job is received).
Producer-consumer pattern
As posted at Bruteforce MD5 Password cracker, you can make use of the producer-consumer pattern. You could have a designated producer goroutine that would generate the jobs (things to do / calculate), and send them on a jobs channel. You could have a fixed pool of consumer goroutines (e.g. 5 of them) which would loop over the channel on which jobs are delivered, and each would execute / complete the received jobs.
The producer goroutine could simply close the `jobs` channel when all jobs have been generated and sent, properly signalling consumers that no more jobs will be coming. The `for ... range` construct on a channel handles the "close" event and terminates properly. Note that all jobs sent before closing the channel will still be delivered.

This would result in a clean design with a fixed (but arbitrary) number of goroutines, and it would always utilize 100% CPU (if the number of goroutines is greater than the number of CPU cores). It also has the advantage that it can be "throttled" with the proper selection of the channel capacity (buffered channel) and the number of consumer goroutines.
Note that having a designated producer goroutine is not mandatory in this model. You could have multiple goroutines producing jobs too, but then you must also synchronize them so that the `jobs` channel is only closed when all producer goroutines are done producing jobs; otherwise, attempting to send another job on the `jobs` channel after it has already been closed results in a runtime panic. Usually producing jobs is cheap, and they can be produced at a much quicker rate than they can be executed, so this model of producing them in 1 goroutine while many consume / execute them works well in practice.

Handling results:
If jobs have results, you may choose to have a designated results channel on which results can be delivered ("sent back"), or you may choose to handle the results in the consumer when the job is completed / finished. The latter may even be implemented by having a "callback" function that handles the results. The important question is whether results can be processed independently, or whether they need to be merged (e.g. a map-reduce framework) or aggregated.
If you go with a `results` channel, you also need a goroutine that receives values from it, preventing the consumers from getting blocked (which would occur if the buffer of `results` filled up).

With a `results` channel

Instead of sending simple `string` values as jobs and results, I would create a wrapper type which can hold any additional info, so it is much more flexible:

```go
type Job struct {
	Id     int
	Work   string
	Result string
}
```
Note that the `Job` struct also wraps the result, so when we send back the result, it also contains the original `Job` as the context, which is often very useful. Also note that it is profitable to send just pointers (`*Job`) on the channels instead of `Job` values: there is no need to make "countless" copies of `Job`s, and the size of the `Job` struct value becomes irrelevant.

Here is how this producer-consumer setup could look:
I would use 2 `sync.WaitGroup` values; their roles will follow below:

```go
var wg, wg2 sync.WaitGroup
```
The producer is responsible for generating the jobs to be executed:
```go
func produce(jobs chan<- *Job) {
	// Generate jobs:
	id := 0
	for c := 'a'; c <= 'z'; c++ {
		id++
		jobs <- &Job{Id: id, Work: fmt.Sprintf("%c", c)}
	}
	close(jobs)
}
```
When done (no more jobs), the `jobs` channel is closed, which signals consumers that no more jobs will arrive.

Note that `produce()` sees the `jobs` channel as send-only, because that is all the producer needs to do with it: send jobs on it (besides closing it, which is also permitted on a send-only channel). An accidental receive in the producer would be a compile-time error (detected early, at compile time).

The consumer's responsibility is to receive jobs for as long as jobs can be received, and execute them:
```go
func consume(id int, jobs <-chan *Job, results chan<- *Job) {
	defer wg.Done()
	for job := range jobs {
		sleepMs := rand.Intn(1000)
		fmt.Printf("worker #%d received: '%s', sleep %dms\n", id, job.Work, sleepMs)
		time.Sleep(time.Duration(sleepMs) * time.Millisecond)
		job.Result = job.Work + fmt.Sprintf("-%dms", sleepMs)
		results <- job
	}
}
```
Note that `consume()` sees the `jobs` channel as receive-only; the consumer only needs to receive from it. Similarly, the `results` channel is send-only for the consumer.

Also note that the `results` channel cannot be closed here, as there are multiple consumer goroutines: only the first attempt to close it would succeed, and further ones would result in a runtime panic! The `results` channel can (and must) be closed after all consumer goroutines have ended, because only then can we be sure that no further values (results) will be sent on it.

We have results which need to be analyzed:
```go
func analyze(results <-chan *Job) {
	defer wg2.Done()
	for job := range results {
		fmt.Printf("result: %s\n", job.Result)
	}
}
```
As you can see, this also receives results for as long as they may come (until the `results` channel is closed). The `results` channel for the analyzer is receive-only.

Please note the use of channel types: whenever it is sufficient, use a unidirectional channel type to detect and prevent errors early, at compile time. Only use a bidirectional channel type if you really need both directions.
And this is how all of this is glued together:
```go
func main() {
	jobs := make(chan *Job, 100)    // Buffered channel
	results := make(chan *Job, 100) // Buffered channel

	// Start consumers:
	for i := 0; i < 5; i++ { // 5 consumers
		wg.Add(1)
		go consume(i, jobs, results)
	}

	// Start producing:
	go produce(jobs)

	// Start analyzing:
	wg2.Add(1)
	go analyze(results)

	wg.Wait() // Wait for all consumers to finish processing jobs

	// All jobs are processed, no more values will be sent on results:
	close(results)

	wg2.Wait() // Wait for the analyzer to analyze all results
}
```
Here is an example output. As you can see, results are coming in and getting analyzed before all the jobs would even be enqueued:
```text
worker #4 received: 'e', sleep 81ms
worker #0 received: 'a', sleep 887ms
worker #1 received: 'b', sleep 847ms
worker #2 received: 'c', sleep 59ms
worker #3 received: 'd', sleep 81ms
worker #2 received: 'f', sleep 318ms
result: c-59ms
worker #4 received: 'g', sleep 425ms
result: e-81ms
worker #3 received: 'h', sleep 540ms
result: d-81ms
worker #2 received: 'i', sleep 456ms
result: f-318ms
worker #4 received: 'j', sleep 300ms
result: g-425ms
worker #3 received: 'k', sleep 694ms
result: h-540ms
worker #4 received: 'l', sleep 511ms
result: j-300ms
worker #2 received: 'm', sleep 162ms
result: i-456ms
worker #1 received: 'n', sleep 89ms
result: b-847ms
worker #0 received: 'o', sleep 728ms
result: a-887ms
worker #1 received: 'p', sleep 274ms
result: n-89ms
worker #2 received: 'q', sleep 211ms
result: m-162ms
worker #2 received: 'r', sleep 445ms
result: q-211ms
worker #1 received: 's', sleep 237ms
result: p-274ms
worker #3 received: 't', sleep 106ms
result: k-694ms
worker #4 received: 'u', sleep 495ms
result: l-511ms
worker #3 received: 'v', sleep 466ms
result: t-106ms
worker #1 received: 'w', sleep 528ms
result: s-237ms
worker #0 received: 'x', sleep 258ms
result: o-728ms
worker #2 received: 'y', sleep 47ms
result: r-445ms
worker #2 received: 'z', sleep 947ms
result: y-47ms
result: u-495ms
result: x-258ms
result: v-466ms
result: w-528ms
result: z-947ms
```
Try the complete application on the Go Playground.
Without a `results` channel

The code simplifies significantly if we don't use a `results` channel and the consumer goroutines handle the result right away (print it, in our case). In this case we don't need 2 `sync.WaitGroup` values (the 2nd was only needed to wait for the analyzer to complete).

Without a `results` channel, the complete solution looks like this:

```go
var wg sync.WaitGroup

type Job struct {
	Id   int
	Work string
}

func produce(jobs chan<- *Job) {
	// Generate jobs:
	id := 0
	for c := 'a'; c <= 'z'; c++ {
		id++
		jobs <- &Job{Id: id, Work: fmt.Sprintf("%c", c)}
	}
	close(jobs)
}

func consume(id int, jobs <-chan *Job) {
	defer wg.Done()
	for job := range jobs {
		sleepMs := rand.Intn(1000)
		fmt.Printf("worker #%d received: '%s', sleep %dms\n", id, job.Work, sleepMs)
		time.Sleep(time.Duration(sleepMs) * time.Millisecond)
		fmt.Printf("result: %s\n", job.Work+fmt.Sprintf("-%dms", sleepMs))
	}
}

func main() {
	jobs := make(chan *Job, 100) // Buffered channel

	// Start consumers:
	for i := 0; i < 5; i++ { // 5 consumers
		wg.Add(1)
		go consume(i, jobs)
	}

	// Start producing:
	go produce(jobs)

	wg.Wait() // Wait for all consumers to finish processing jobs
}
```
The output is "like" that of the `results` channel variant (but of course the execution / completion order is random). Try this variant on the Go Playground.