• Parallel.Invoke 方法:并行执行一组委托。
  • Parallel.For 方法:执行与C# for循环等价的并行方法。
  • Parallel.ForEach 方法:执行与C#foreach循环等价的并行方法。
  • 这三个方法都会阻塞线程直到所有工作完成为止。和PLINQ一样,在出现未处理异常之后,其他的工作线程将会在其当前迭代完成之后停止,并将异常包装为AggregateException抛出给调用者。

    1.Parallel.Invoke方法

    Parallel.Invoke 方法并行执行一组Action委托,然后等待它们完成。该方法最简单的定义方式如下:

    public static void Invoke (params Action[] actions);
    

    和PLINQ一样,Parallel.*方法是针对计算密集型任务而不是I/O密集型任务进行优化的。但是,我们可以使用一次下载两个网页的方式来简单展示Parallel.Invoke的用法:

    Parallel.Invoke (
        () => new WebClient().DownloadFile ("http://www.linqpad.net", "lp.html"),
        () => new WebClient().DownloadFile ("http://www.jaoo.dk", "jaoo.html"));
    

    从表面看来,Parallel.Invoke就像是创建了两个绑定到线程的Task对象,然后等待它们执行结束的快捷操作。但是它们存在一个重要区别:如果将一百万个委托传递给Parallel.Invoke方法,它仍然能够有效工作。这是因为该方法会将大量的元素划分为若干批次,并将其分派给底层的Task,而不会单纯为每一个委托创建一个独立的Task。
    所有的Parallel方法都需要自行整理结果,这意味着要时刻注意线程安全性。例如,以下代码就不是线程安全的:

    var data = new List<string>();
    Parallel.Invoke (
        () => data.Add(new WebClient().DownloadFile ("http://www.linqpad.net", "lp.html")),
        () => data.Add(new WebClient().DownloadFile ("http://www.jaoo.dk", "jaoo.html")));
    

    2.Parallel.For方法和Parallel.ForEach方法

    Parallel.For和Parallel.ForEach分别等价于C#中的for和foreach循环,但是每一次迭代都是并行而非顺序执行的。以下给出了这两个方法最简单的声明:

    public static ParallelLoopResult For(
        int fromInclusive, int toExclusive, Action<int, ParallelLoopState> body);
    public static ParallelLoopResult ForEach<TSource>(
        OrderablePartitioner<TSource> source, Action<TSource, ParallelLoopState, long> body);
    

    对于for循环:

    // 顺序
    for(int i=0; i<100; i++)
        Foo(i);
    // 并行话代码
    Parallel.For(0, 100, i=> Foo(i));
    // 简洁写法
    Parallel.For(0, 100,Foo);
    

    而对于foreach循环:

    Parallel.ForEach("hello,world",Foo);
    

    外层循环与内层循环

    Parallel.For和Parallel.ForEach通常在外层循环比内层循环效果更好,因为前者通常为并行化提供了更大的工作块,而这可以降低管理开销。通常,没必要同时将内层和外层循环并行化。

    包含索引的Parallel.ForEach方法

    有些情况下,循环迭代中的索引用处很大。对于顺序执行的foreach循环,我们很容易获得索引:

    int i = 0;
    foreach(char c in "hello,world")
        Console.WriteLine(c.ToString() + i++);
    

    但是在并行上下文中,递增一个共享变量的值不是线程安全的。因此必须使用以下版本的ForEach语句:

    Parallel.ForEach ("Hello, world", (c, state, i) =>
        Console.WriteLine (c.ToString() + i);
    

    为了在实际环境中使用它,我们仍以前面使用PLINQ编写的拼写检查器为例。以下代码将加载一个字典和一个包含一百万个测试单词的数组:

    string wordLookupFile = Path.Combine(Path.GetTempPath(), "WordLookup.txt");
    if (!File.Exists(wordLookupFile))
        new WebClient().DownloadFile(
            "http://www.albahari.com/ispell/allwords.txt", wordLookupFile);
    var wordLookup = new HashSet<string>(
        File.ReadAllLines(wordLookupFile),
        StringComparer.InvariantCultureIgnoreCase);
    var random = new Random();
    string[] wordList = wordLookup.ToArray();
    string[] wordsToTest = Enumerable.Range(0, 1000000)
        .Select(i => wordList[random.Next(0, wordList.Length)])
        .ToArray();
    wordsToTest[12345] = "woozsh";     
    wordsToTest[23456] = "wubsie";     
    var misspellings = new ConcurrentBag<Tuple<int, string>>();
    Parallel.ForEach(wordsToTest, (word, state, i) =>
        if (!wordLookup.Contains(word))
            misspellings.Add(Tuple.Create((int)i, word));
    

    需要注意的是,必须将结果整理到一个线程安全的集合中,这是一个缺点(与PLINQ相比)。而这种方式胜过PLINQ的地方是索引化ForEach比索引化Select查询运算符的执行效率高。

    ParallelLoopState:提前跳出循环

    由于并行的For或ForEach的循环体是一个委托,因此无法使用break语句提前结束循环。但是可以调用ParallelLoopState对象的Break方法或Stop方法来跳出或者结束循环:

    public class ParallelLoopState
        public void Break();
        public void Stop();
        public bool IsExceptional { get; }
        public bool IsStopped { get; }
        public long? LowestBreakIteration { get; }
        public bool ShouldExitCurrentIteration { get; }
    

    获得ParallelLoopState很容易:所有的For和ForEach都有以Action<TSource, ParallelLoopState>为循环体的重载。因此,如果需要并行化如下代码:

    foreach (char c in "hello,world")
        if (c == ',')
            break;
            Console.Write(c);
    // 并行化
    Parallel.ForEach("hello,world", (c, loopState) =>
        if (c == ',')
            loopState.Break();
            Console.Write(c);
    

    使用本地值进行优化

    Parallel.For和Parallel.ForEach方法均提供了一组接受TLocal泛型类型参数的重载。这些重载方法可帮助优化密集迭代循环过程中的结果整理工作。其中最简单的形式如下:

    public static ParallelLoopResult For<TLocal>(
        int fromInclusive,
        int toExclusive,
        Func <TLocal> localInit,
        Func <int, ParallelLoopState, TLocal, TLocal> body,
        Action <TLocal> localFinally);
    

    这些重载都非常复杂,在实际中也很少用到,幸运的是其大部分应用场景都可以通过PLINQ解决。

    大部分问题和以下例子类似:假设要对1~10 000 000之间的数字的平方根求和。计算一千万个平方根这种工作很容易并行化,但是对它们的值求和却有点麻烦。因为我们需要锁定并更新最终结果:

    object locker = new object();
    double total = 0;
    Parallel.For (1, 10000000, i => { lock (locker) total += Math.Sqrt (i); });
    

    并行化的效果大部分都被一千万个锁操作和因此带来的阻塞抵消了。
    实际上,并不需要一千万次的锁操作。
    考虑这样一种情况,假设有一组志愿者需要收集大量的垃圾。如果所有志愿者都共用一个垃圾桶,那么移动和竞争将显著降低收集效率。显然,更加有效的方式是让每一个志愿者都有仅属于自己的(本地)垃圾桶,只是偶尔将自己的垃圾倒入主垃圾桶。
    拥有TLocal的For和ForEach重载就是按照上述方式工作的。志愿者就是内部工作线程,本地值就是本地垃圾桶。为了保证Parallel类型的工作,还需要提供两个额外的委托,分别负责:

  • 初始化一个新的本地值。
  • 将本地聚合值和主要结果值进行合并。
  • 此外,循环体委托的返回值也不是void,而是本地值的聚合结果。以下是重构之后的代码:

    object locker = new object();
    double grandTotal = 0;
    Parallel.For(1, 10000000, 
        () => 0.0,                      //初始化本地值
        (i, state, localTotal) =>       // 主体委托
            localTotal + Math.Sqrt(i),  //返回新的本地总数
        localTotal =>
            lock (locker) 
                grandTotal += localTotal; // 将本地数添加到主总数中
    

    虽然我们仍需要锁定,但是这个锁定过程只会在本地结果和最终结果合并时发生,因此整个过程会更加高效。

    如前所述,这种场景使用PLINQ往往会更加有效。上述例子可以直接用PLINQ进行并行化:

    var total = ParallelEnumerable.Range(1, 10000000)
        .Sum(i => Math.Sqrt(i));