C# Parallel.ForEach 是否限制活动线程的数量?

声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow 原文地址: http://stackoverflow.com/questions/1114317/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me): StackOverFlow

提示:将鼠标放在中文语句上可以显示对应的英文。显示中英文
时间:2020-08-06 08:33:58  来源:igfitidea点击:

Does Parallel.ForEach limit the number of active threads?

c#.netc#-4.0parallel-processing

提问by Jader Dias

Given this code:

鉴于此代码:

var arrayStrings = new string[1000];
Parallel.ForEach<string>(arrayStrings, someString =>
{
    DoSomething(someString);
});

Will all 1000 threads spawn almost simultaneously?

所有 1000 个线程会几乎同时产生吗?

采纳答案by Jon Skeet

No, it won't start 1000 threads - yes, it will limit how many threads are used. Parallel Extensions uses an appropriate number of cores, based on how many you physically have andhow many are already busy. It allocates work for each core and then uses a technique called work stealingto let each thread process its own queue efficiently and only need to do any expensive cross-thread access when it really needs to.

不,它不会启动 1000 个线程 - 是的,它会限制使用的线程数。Parallel Extensions 使用适当数量的内核,具体取决于您实际拥有的内核数量以及已经忙碌的内核数量。它为每个核心分配工作,然后使用一种称为工作窃取的技术,让每个线程有效地处理自己的队列,并且只需要在真正需要时进行任何昂贵的跨线程访问。

Have a look at the PFX Team Blogfor loadsof information about how it allocates work and all kinds of other topics.

看一看在PFX团队博客负载有关如何分配工作的信息和各种其他主题。

Note that in some cases you can specify the degree of parallelism you want, too.

请注意,在某些情况下,您也可以指定所需的并行度。

回答by Colin Mackay

It works out an optimal number of threads based on the number of processors/cores. They will not all spawn at once.

它根据处理器/内核的数量计算出最佳线程数。它们不会一次全部产卵。

回答by Kevin Hakanson

See Does Parallel.For use one Task per iteration?for an idea of a "mental model" to use. However the author does state that "At the end of the day, it's important to remember that implementation details may change at any time."

请参阅Parallel.For 每次迭代使用一个任务吗?使用“心智模型”的想法。然而,作者确实声明“在一天结束时,重要的是要记住实现细节可能随时更改。”

回答by Microsoft Developer

On a single core machine... Parallel.ForEach partitions (chunks) of the collection it's working on between a number of threads, but that number is calculated based on an algorithm that takes into account and appears to continually monitor the work done by the threads it's allocating to the ForEach. So if the body part of the ForEach calls out to long running IO-bound/blocking functions which would leave the thread waiting around, the algorithm will spawn up more threads and repartition the collection between them. If the threads complete quickly and don't block on IO threads for example, such as simply calculating some numbers, the algorithm will ramp up (or indeed down) the number of threads to a point where the algorithm considers optimum for throughput (average completion time of each iteration).

在单核机器上......它在多个线程之间处理的集合的 Parallel.ForEach 分区(块),但该数字是根据一种算法计算得出的,该算法考虑并似乎持续监控由它分配给 ForEach 的线程。因此,如果 ForEach 的主体部分调用长时间运行的 IO 绑定/阻塞函数,这将使线程等待,该算法将产生更多线程并在它们之间重新分配集合。如果线程很快完成并且不会阻塞在 IO 线程上,例如简单地计算一些数字,该算法将增加(或实际上减少)线程数到算法认为吞吐量最佳的点(每次迭代的平均完成时间)

Basically the thread pool behind all the various Parallel library functions, will work out an optimum number of threads to use. The number of physical processor cores forms only part of the equation. There is NOT a simple one to one relationship between the number of cores and the number of threads spawned.

基本上所有各种并行库函数背后的线程池将计算出要使用的最佳线程数。物理处理器内核的数量只是等式的一部分。内核数和产生的线程数之间没有简单的一对一关系。

I don't find the documentation around the cancellation and handling of synchronizing threads very helpful. Hopefully MS can supply better examples in MSDN.

我没有发现有关取消和处理同步线程的文档很有帮助。希望 MS 可以在 MSDN 中提供更好的示例。

Don't forget, the body code must be written to run on multiple threads, along with all the usual thread safety considerations, the framework does not abstract that factor... yet.

不要忘记,主体代码必须编写为在多个线程上运行,以及所有通常的线程安全考虑因素,框架还没有抽象那个因素......

回答by Timothy Gonzalez

Great question. In your example, the level of parallelization is pretty low even on a quad core processor, but with some waiting the level of parallelization can get quite high.

很好的问题。在您的示例中,即使在四核处理器上,并行化级别也非常低,但是经过一些等待,并行化级别可能会变得非常高。

// Max concurrency: 5
[Test]
public void Memory_Operations()
{
    ConcurrentBag<int> monitor = new ConcurrentBag<int>();
    ConcurrentBag<int> monitorOut = new ConcurrentBag<int>();
    var arrayStrings = new string[1000];
    Parallel.ForEach<string>(arrayStrings, someString =>
    {
        monitor.Add(monitor.Count);
        monitor.TryTake(out int result);
        monitorOut.Add(result);
    });

    Console.WriteLine("Max concurrency: " + monitorOut.OrderByDescending(x => x).First());
}

Now look what happens when a waiting operation is added to simulate an HTTP request.

现在看看当添加等待操作来模拟 HTTP 请求时会发生什么。

// Max concurrency: 34
[Test]
public void Waiting_Operations()
{
    ConcurrentBag<int> monitor = new ConcurrentBag<int>();
    ConcurrentBag<int> monitorOut = new ConcurrentBag<int>();
    var arrayStrings = new string[1000];
    Parallel.ForEach<string>(arrayStrings, someString =>
    {
        monitor.Add(monitor.Count);

        System.Threading.Thread.Sleep(1000);

        monitor.TryTake(out int result);
        monitorOut.Add(result);
    });

    Console.WriteLine("Max concurrency: " + monitorOut.OrderByDescending(x => x).First());
}

I haven't made any changes yet and the level of concurrency/parallelization has jumped drammatically. Concurrency can have its limit increased with ParallelOptions.MaxDegreeOfParallelism.

我还没有做任何改变,并发/并行化的水平已经戏剧性地跃升了。并发的限制可以随着 增加ParallelOptions.MaxDegreeOfParallelism

// Max concurrency: 43
[Test]
public void Test()
{
    ConcurrentBag<int> monitor = new ConcurrentBag<int>();
    ConcurrentBag<int> monitorOut = new ConcurrentBag<int>();
    var arrayStrings = new string[1000];
    var options = new ParallelOptions {MaxDegreeOfParallelism = int.MaxValue};
    Parallel.ForEach<string>(arrayStrings, options, someString =>
    {
        monitor.Add(monitor.Count);

        System.Threading.Thread.Sleep(1000);

        monitor.TryTake(out int result);
        monitorOut.Add(result);
    });

    Console.WriteLine("Max concurrency: " + monitorOut.OrderByDescending(x => x).First());
}

// Max concurrency: 391
[Test]
public void Test()
{
    ConcurrentBag<int> monitor = new ConcurrentBag<int>();
    ConcurrentBag<int> monitorOut = new ConcurrentBag<int>();
    var arrayStrings = new string[1000];
    var options = new ParallelOptions {MaxDegreeOfParallelism = int.MaxValue};
    Parallel.ForEach<string>(arrayStrings, options, someString =>
    {
        monitor.Add(monitor.Count);

        System.Threading.Thread.Sleep(100000);

        monitor.TryTake(out int result);
        monitorOut.Add(result);
    });

    Console.WriteLine("Max concurrency: " + monitorOut.OrderByDescending(x => x).First());
}

I reccommend setting ParallelOptions.MaxDegreeOfParallelism. It will not necessarily increase the number of threads in use, but it will ensure you only start a sane number of threads, which seems to be your concern.

我建议设置ParallelOptions.MaxDegreeOfParallelism。它不一定会增加正在使用的线程数,但它会确保您只启动合理数量的线程,这似乎是您关心的问题。

Lastly to answer your question, no you will not get all threads to start at once. Use Parallel.Invoke if you are looking to invoke in parallel perfectly e.g. testing race conditions.

最后回答您的问题,不,您不会立即启动所有线程。如果您希望完美地并行调用,例如测试竞争条件,请使用 Parallel.Invoke。

// 636462943623363344
// 636462943623363344
// 636462943623363344
// 636462943623363344
// 636462943623363344
// 636462943623368346
// 636462943623368346
// 636462943623373351
// 636462943623393364
// 636462943623393364
[Test]
public void Test()
{
    ConcurrentBag<string> monitor = new ConcurrentBag<string>();
    ConcurrentBag<string> monitorOut = new ConcurrentBag<string>();
    var arrayStrings = new string[1000];
    var options = new ParallelOptions {MaxDegreeOfParallelism = int.MaxValue};
    Parallel.ForEach<string>(arrayStrings, options, someString =>
    {
        monitor.Add(DateTime.UtcNow.Ticks.ToString());
        monitor.TryTake(out string result);
        monitorOut.Add(result);
    });

    var startTimes = monitorOut.OrderBy(x => x.ToString()).ToList();
    Console.WriteLine(string.Join(Environment.NewLine, startTimes.Take(10)));
}