Parallel.Invoke
方法:并行执行一组委托。
Parallel.For
方法:执行与C# for循环等价的并行方法。
Parallel.ForEach
方法:执行与C#foreach循环等价的并行方法。
这三个方法都会阻塞线程直到所有工作完成为止。和PLINQ一样,在出现未处理异常之后,其他的工作线程将会在其当前迭代完成之后停止,并将异常包装为AggregateException抛出给调用者。
1.Parallel.Invoke方法
Parallel.Invoke
方法并行执行一组Action委托,然后等待它们完成。该方法最简单的定义方式如下:
public static void Invoke (params Action[] actions);
和PLINQ一样,Parallel.*方法是针对计算密集型任务而不是I/O密集型任务进行优化的。但是,我们可以使用一次下载两个网页的方式来简单展示Parallel.Invoke的用法:
Parallel.Invoke (
() => new WebClient().DownloadFile ("http://www.linqpad.net", "lp.html"),
() => new WebClient().DownloadFile ("http://www.jaoo.dk", "jaoo.html"));
从表面看来,Parallel.Invoke就像是创建了两个绑定到线程的Task对象,然后等待它们执行结束的快捷操作。但是它们存在一个重要区别:如果将一百万个委托传递给Parallel.Invoke方法,它仍然能够有效工作。这是因为该方法会将大量的元素划分为若干批次,并将其分派给底层的Task,而不会单纯为每一个委托创建一个独立的Task。
所有的Parallel方法都需要自行整理结果,这意味着要时刻注意线程安全性。例如,以下代码就不是线程安全的:
var data = new List<string>();
Parallel.Invoke (
() => data.Add(new WebClient().DownloadFile ("http://www.linqpad.net", "lp.html")),
() => data.Add(new WebClient().DownloadFile ("http://www.jaoo.dk", "jaoo.html")));
2.Parallel.For方法和Parallel.ForEach方法
Parallel.For和Parallel.ForEach分别等价于C#中的for和foreach循环,但是每一次迭代都是并行而非顺序执行的。以下给出了这两个方法最简单的声明:
public static ParallelLoopResult For(
int fromInclusive, int toExclusive, Action<int, ParallelLoopState> body);
public static ParallelLoopResult ForEach<TSource>(
OrderablePartitioner<TSource> source, Action<TSource, ParallelLoopState, long> body);
对于for循环:
// 顺序
for(int i=0; i<100; i++)
Foo(i);
// 并行话代码
Parallel.For(0, 100, i=> Foo(i));
// 简洁写法
Parallel.For(0, 100,Foo);
而对于foreach循环:
Parallel.ForEach("hello,world",Foo);
外层循环与内层循环
Parallel.For和Parallel.ForEach通常在外层循环比内层循环效果更好,因为前者通常为并行化提供了更大的工作块,而这可以降低管理开销。通常,没必要同时将内层和外层循环并行化。
包含索引的Parallel.ForEach方法
有些情况下,循环迭代中的索引用处很大。对于顺序执行的foreach循环,我们很容易获得索引:
int i = 0;
foreach(char c in "hello,world")
Console.WriteLine(c.ToString() + i++);
但是在并行上下文中,递增一个共享变量的值不是线程安全的。因此必须使用以下版本的ForEach语句:
Parallel.ForEach ("Hello, world", (c, state, i) =>
Console.WriteLine (c.ToString() + i);
为了在实际环境中使用它,我们仍以前面使用PLINQ编写的拼写检查器为例。以下代码将加载一个字典和一个包含一百万个测试单词的数组:
string wordLookupFile = Path.Combine(Path.GetTempPath(), "WordLookup.txt");
if (!File.Exists(wordLookupFile))
new WebClient().DownloadFile(
"http://www.albahari.com/ispell/allwords.txt", wordLookupFile);
var wordLookup = new HashSet<string>(
File.ReadAllLines(wordLookupFile),
StringComparer.InvariantCultureIgnoreCase);
var random = new Random();
string[] wordList = wordLookup.ToArray();
string[] wordsToTest = Enumerable.Range(0, 1000000)
.Select(i => wordList[random.Next(0, wordList.Length)])
.ToArray();
wordsToTest[12345] = "woozsh";
wordsToTest[23456] = "wubsie";
var misspellings = new ConcurrentBag<Tuple<int, string>>();
Parallel.ForEach(wordsToTest, (word, state, i) =>
if (!wordLookup.Contains(word))
misspellings.Add(Tuple.Create((int)i, word));
需要注意的是,必须将结果整理到一个线程安全的集合中,这是一个缺点(与PLINQ相比)。而这种方式胜过PLINQ的地方是索引化ForEach比索引化Select查询运算符的执行效率高。
ParallelLoopState:提前跳出循环
由于并行的For或ForEach的循环体是一个委托,因此无法使用break语句提前结束循环。但是可以调用ParallelLoopState对象的Break方法或Stop方法来跳出或者结束循环:
public class ParallelLoopState
public void Break();
public void Stop();
public bool IsExceptional { get; }
public bool IsStopped { get; }
public long? LowestBreakIteration { get; }
public bool ShouldExitCurrentIteration { get; }
获得ParallelLoopState很容易:所有的For和ForEach都有以Action<TSource, ParallelLoopState>
为循环体的重载。因此,如果需要并行化如下代码:
foreach (char c in "hello,world")
if (c == ',')
break;
Console.Write(c);
// 并行化
Parallel.ForEach("hello,world", (c, loopState) =>
if (c == ',')
loopState.Break();
Console.Write(c);
使用本地值进行优化
Parallel.For和Parallel.ForEach方法均提供了一组接受TLocal泛型类型参数的重载。这些重载方法可帮助优化密集迭代循环过程中的结果整理工作。其中最简单的形式如下:
public static ParallelLoopResult For<TLocal>(
int fromInclusive,
int toExclusive,
Func <TLocal> localInit,
Func <int, ParallelLoopState, TLocal, TLocal> body,
Action <TLocal> localFinally);
这些重载都非常复杂,在实际中也很少用到,幸运的是其大部分应用场景都可以通过PLINQ解决。
大部分问题和以下例子类似:假设要对1~10 000 000之间的数字的平方根求和。计算一千万个平方根这种工作很容易并行化,但是对它们的值求和却有点麻烦。因为我们需要锁定并更新最终结果:
object locker = new object();
double total = 0;
Parallel.For (1, 10000000, i => { lock (locker) total += Math.Sqrt (i); });
并行化的效果大部分都被一千万个锁操作和因此带来的阻塞抵消了。
实际上,并不需要一千万次的锁操作。
考虑这样一种情况,假设有一组志愿者需要收集大量的垃圾。如果所有志愿者都共用一个垃圾桶,那么移动和竞争将显著降低收集效率。显然,更加有效的方式是让每一个志愿者都有仅属于自己的(本地)垃圾桶,只是偶尔将自己的垃圾倒入主垃圾桶。
拥有TLocal的For和ForEach重载就是按照上述方式工作的。志愿者就是内部工作线程,本地值就是本地垃圾桶。为了保证Parallel类型的工作,还需要提供两个额外的委托,分别负责:
初始化一个新的本地值。
将本地聚合值和主要结果值进行合并。
此外,循环体委托的返回值也不是void,而是本地值的聚合结果。以下是重构之后的代码:
object locker = new object();
double grandTotal = 0;
Parallel.For(1, 10000000,
() => 0.0, //初始化本地值
(i, state, localTotal) => // 主体委托
localTotal + Math.Sqrt(i), //返回新的本地总数
localTotal =>
lock (locker)
grandTotal += localTotal; // 将本地数添加到主总数中
虽然我们仍需要锁定,但是这个锁定过程只会在本地结果和最终结果合并时发生,因此整个过程会更加高效。
如前所述,这种场景使用PLINQ往往会更加有效。上述例子可以直接用PLINQ进行并行化:
var total = ParallelEnumerable.Range(1, 10000000)
.Sum(i => Math.Sqrt(i));