C#生产者/消费者
声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow
原文地址: http://stackoverflow.com/questions/1656404/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me):
StackOverFlow
C# producer/consumer
提问by lboregard
i've recently come across a producer/consumer pattern c# implementation. it's very simple and (for me at least) very elegant.
我最近遇到了一个生产者/消费者模式 c# 实现。它非常简单并且(至少对我而言)非常优雅。
it seems to have been devised around 2006, so i was wondering if this implementation is
- safe
- still applicable
它似乎是在 2006 年左右设计的,所以我想知道这个实现是否
- 安全
- 仍然适用
Code is below (original code was referenced at http://bytes.com/topic/net/answers/575276-producer-consumer#post2251375)
代码如下(原始代码参考http://bytes.com/topic/net/answers/575276-producer-consumer#post2251375)
using System;
using System.Collections;
using System.Threading;
public class Test
{
static ProducerConsumer queue;
static void Main()
{
queue = new ProducerConsumer();
new Thread(new ThreadStart(ConsumerJob)).Start();
Random rng = new Random(0);
for (int i=0; i < 10; i++)
{
Console.WriteLine ("Producing {0}", i);
queue.Produce(i);
Thread.Sleep(rng.Next(1000));
}
}
static void ConsumerJob()
{
// Make sure we get a different random seed from the
// first thread
Random rng = new Random(1);
// We happen to know we've only got 10
// items to receive
for (int i=0; i < 10; i++)
{
object o = queue.Consume();
Console.WriteLine ("\t\t\t\tConsuming {0}", o);
Thread.Sleep(rng.Next(1000));
}
}
}
public class ProducerConsumer
{
readonly object listLock = new object();
Queue queue = new Queue();
public void Produce(object o)
{
lock (listLock)
{
queue.Enqueue(o);
// We always need to pulse, even if the queue wasn't
// empty before. Otherwise, if we add several items
// in quick succession, we may only pulse once, waking
// a single thread up, even if there are multiple threads
// waiting for items.
Monitor.Pulse(listLock);
}
}
public object Consume()
{
lock (listLock)
{
// If the queue is empty, wait for an item to be added
// Note that this is a while loop, as we may be pulsed
// but not wake up before another thread has come in and
// consumed the newly added object. In that case, we'll
// have to wait for another pulse.
while (queue.Count==0)
{
// This releases listLock, only reacquiring it
// after being woken up by a call to Pulse
Monitor.Wait(listLock);
}
return queue.Dequeue();
}
}
}
回答by Jon Skeet
The code is older than that - I wrote it some time before .NET 2.0 came out. The conceptof a producer/consumer queue is wayolder than that though :)
代码比那更旧 - 我在 .NET 2.0 出现之前写了一段时间。该概念生产者/消费者队列的方式相比,虽然旧的:)
Yes, that code is safe as far as I'm aware - but it has some deficiencies:
是的,就我所知,该代码是安全的 - 但它有一些缺陷:
- It's non-generic. A modern version would certainly be generic.
- It has no way of stopping the queue. One simple way of stopping the queue (so that all the consumer threads retire) is to have a "stop work" token which can be put into the queue. You then add as many tokens as you have threads. Alternatively, you have a separate flag to indicate that you want to stop. (This allows the other threads to stop before finishing all the current work in the queue.)
- If the jobs are very small, consuming a single job at a time may not be the most efficient thing to do.
- 它是非通用的。现代版本肯定是通用的。
- 它无法停止队列。停止队列(以便所有消费者线程退出)的一种简单方法是拥有一个可以放入队列的“停止工作”令牌。然后,您添加与线程一样多的令牌。或者,您有一个单独的标志来指示您要停止。(这允许其他线程在完成队列中的所有当前工作之前停止。)
- 如果作业非常小,一次消耗一个作业可能不是最有效的做法。
The ideas behind the code are more important than the code itself, to be honest.
老实说,代码背后的想法比代码本身更重要。
回答by dashton
You could do something like the following code snippet. It's generic and has a method for enqueue-ing nulls (or whatever flag you'd like to use) to tell the worker threads to exit.
您可以执行类似于以下代码片段的操作。它是通用的,并且有一种方法可以将空值(或您想使用的任何标志)排入队列,以告诉工作线程退出。
The code is taken from here: http://www.albahari.com/threading/part4.aspx#_Wait_and_Pulse
代码取自这里:http: //www.albahari.com/threading/part4.aspx#_Wait_and_Pulse
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
namespace ConsoleApplication1
{
public class TaskQueue<T> : IDisposable where T : class
{
object locker = new object();
Thread[] workers;
Queue<T> taskQ = new Queue<T>();
public TaskQueue(int workerCount)
{
workers = new Thread[workerCount];
// Create and start a separate thread for each worker
for (int i = 0; i < workerCount; i++)
(workers[i] = new Thread(Consume)).Start();
}
public void Dispose()
{
// Enqueue one null task per worker to make each exit.
foreach (Thread worker in workers) EnqueueTask(null);
foreach (Thread worker in workers) worker.Join();
}
public void EnqueueTask(T task)
{
lock (locker)
{
taskQ.Enqueue(task);
Monitor.PulseAll(locker);
}
}
void Consume()
{
while (true)
{
T task;
lock (locker)
{
while (taskQ.Count == 0) Monitor.Wait(locker);
task = taskQ.Dequeue();
}
if (task == null) return; // This signals our exit
Console.Write(task);
Thread.Sleep(1000); // Simulate time-consuming task
}
}
}
}
回答by DiogoNeves
Warning:If you read the comments, you'll understand my answer is wrong :)
警告:如果您阅读评论,您就会明白我的回答是错误的:)
There's a possible deadlockin your code.
您的代码中可能存在死锁。
Imagine the following case, for clarity, I used a single-thread approach but should be easy to convert to multi-thread with sleep:
想象一下以下情况,为了清楚起见,我使用了单线程方法,但应该很容易转换为带睡眠的多线程:
// We create some actions...
object locker = new object();
Action action1 = () => {
lock (locker)
{
System.Threading.Monitor.Wait(locker);
Console.WriteLine("This is action1");
}
};
Action action2 = () => {
lock (locker)
{
System.Threading.Monitor.Wait(locker);
Console.WriteLine("This is action2");
}
};
// ... (stuff happens, etc.)
// Imagine both actions were running
// and there's 0 items in the queue
// And now the producer kicks in...
lock (locker)
{
// This would add a job to the queue
Console.WriteLine("Pulse now!");
System.Threading.Monitor.Pulse(locker);
}
// ... (more stuff)
// and the actions finish now!
Console.WriteLine("Consume action!");
action1(); // Oops... they're locked...
action2();
Please do let me know if this doesn't make any sense.
如果这没有任何意义,请告诉我。
If this is confirmed, then the answer to your question is, "no, it isn't safe" ;) I hope this helps.
如果这一点得到确认,那么您的问题的答案是,“不,它不安全”;) 我希望这会有所帮助。
回答by kicsit
Back in the day I learned how Monitor.Wait/Pulse works (and a lot about threads in general) from the above piece of code and the article seriesit is from. So as Jon says, it has a lot of value to it and is indeed safe and applicable.
回到那天,我从上面的代码片段和它来自的文章系列中了解到 Monitor.Wait/Pulse 是如何工作的(以及很多关于线程的一般知识)。因此,正如 Jon 所说,它具有很多价值,并且确实安全且适用。
However, as of .NET 4, there is a producer-consumer queue implementation in the framework. I only just found it myself but up to this point it does everything I need.
但是,从 .NET 4 开始,框架中有一个生产者-消费者队列实现。我只是自己找到它,但到目前为止,它可以满足我的一切需求。
回答by Narottam Goyal
public class ProducerConsumerProblem
{
private int n;
object obj = new object();
public ProducerConsumerProblem(int n)
{
this.n = n;
}
public void Producer()
{
for (int i = 0; i < n; i++)
{
lock (obj)
{
Console.Write("Producer =>");
System.Threading.Monitor.Pulse(obj);
System.Threading.Thread.Sleep(1);
System.Threading.Monitor.Wait(obj);
}
}
}
public void Consumer()
{
lock (obj)
{
for (int i = 0; i < n; i++)
{
System.Threading.Monitor.Wait(obj, 10);
Console.Write("<= Consumer");
System.Threading.Monitor.Pulse(obj);
Console.WriteLine();
}
}
}
}
public class Program
{
static void Main(string[] args)
{
ProducerConsumerProblem f = new ProducerConsumerProblem(10);
System.Threading.Thread t1 = new System.Threading.Thread(() => f.Producer());
System.Threading.Thread t2 = new System.Threading.Thread(() => f.Consumer());
t1.IsBackground = true;
t2.IsBackground = true;
t1.Start();
t2.Start();
Console.ReadLine();
}
}
output
输出
Producer =><= Consumer
Producer =><= Consumer
Producer =><= Consumer
Producer =><= Consumer
Producer =><= Consumer
Producer =><= Consumer
Producer =><= Consumer
Producer =><= Consumer
Producer =><= Consumer
Producer =><= Consumer