-
Notifications
You must be signed in to change notification settings - Fork 35
Expand file tree
/
Copy pathAsyncParallelForeach.cs
More file actions
70 lines (68 loc) · 2.53 KB
/
AsyncParallelForeach.cs
File metadata and controls
70 lines (68 loc) · 2.53 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
namespace ViennaNET.Utils
{
/// <summary>
/// Асинхронный параллельный ForEach
/// </summary>
public static class AsyncParallelForeach
{
/// <summary>
/// Метод, выполняющий асинхронный параллельный Select
/// </summary>
/// <param name="source">Массив исходных данных</param>
/// <param name="action">
/// Метод для выполнения по параметрам <typeparamref name="V" />, возвращающий Task
/// <typeparamref name="T" />
/// </param>
/// <param name="threadsCount">Количество потоков</param>
/// <returns>List объектов типа <typeparamref name="T" /></returns>
public static async Task<List<T>> SelectAsync<T, V>(IEnumerable<V> source, Func<V, Task<T>> action,
int threadsCount)
{
var result = new ConcurrentBag<T>();
var tasks = new ConcurrentQueue<Task<T>>(source.Select(d => action(d)));
var threads = Enumerable.Range(1, threadsCount)
.Select(async p =>
{
while (tasks.TryDequeue(out var item))
{
if (item.IsCompleted)
{
result.Add(item.Result);
}
else
{
result.Add(await item);
}
}
});
await Task.WhenAll(threads);
return result.ToList();
}
/// <summary>
/// Метод, выполняющий асинхронный параллельный Foreach
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="source">Массив исходных данных для запроса</param>
/// <param name="action">Метод для выполнения запроса по параметрам <typeparamref name="T" />, возвращающий Task</param>
/// <param name="threadsCount">Количество потоков</param>
/// <returns></returns>
public static async Task ForEachAsync<T>(IEnumerable<T> source, Func<T, Task> action, int threadsCount)
{
var tasks = new ConcurrentQueue<Task>(source.Select(d => action(d)));
var threads = Enumerable.Range(1, threadsCount)
.Select(async p =>
{
while (tasks.TryDequeue(out var item))
{
await item;
}
});
await Task.WhenAll(threads);
}
}
}