.NET中的多线程 使用锁进行同步 及 Task

作者:袖梨 2022-06-25

.NET中的多线程使用Task


ask可以说是ThreadPool的升级版,在线程任务调度,并行编程中都有很大的作用。


创建并且初始化Task


使用lambda表达式创建Task

Task.Factory.StartNew(() => Console.WriteLine("Hello from a task!"));
 
var task = new Task(() => Console.Write("Hello"));
task.Start();

  
用默认参数的委托创建Task
    
using System;
using System.Threading.Tasks;
 
namespace MultiThread
{
    class ThreadTest
    {
        static void Main()
        {
            var task = Task.Factory.StartNew(state => Greet("Hello"), "Greeting");
            Console.WriteLine(task.AsyncState);   // Greeting
            task.Wait();
        }
 
        static void Greet(string message) { Console.Write(message); }
 
    }
}

  

这种方式的一个优点是,task.AsyncState作为一个内置的属性,可以在不同线程中获取参数的状态。


System.Threading.Tasks.TaskCreateOptions

创建Task的时候,我们可以指定创建Task的一些相关选项。在.Net 4.0中,有如下选项:
LongRunning

用来表示这个Task是长期运行的,这个参数更适合block线程。LongRunning线程一般回收的周期会比较长,因此CLR可能不会把它放到线程池中进行管理。
PreferFairness

表示让Task尽量以公平的方式运行,避免出现某些线程运行过快或者过慢的情况。
AttachedToParent

表示创建的Task是当前线程所在Task的子任务。这一个用途也很常见。

下面的代码是创建子任务的示例:

using System;
using System.Threading;
using System.Threading.Tasks;
 
namespace MultiThread
{
    class ThreadTest
    {
        public static void Main(string[] args)
        {
            Task parent = Task.Factory.StartNew(() =>
            {
                Console.WriteLine("I am a parent");
 
                Task.Factory.StartNew(() =>        // Detached task
                {
                    Console.WriteLine("I am detached");
                });
 
                Task.Factory.StartNew(() =>        // Child task
                {
                    Console.WriteLine("I am a child");
                }, TaskCreationOptions.AttachedToParent);
            });
 
            parent.Wait();
 
            Console.ReadLine();
        }
 
    }
}

  

如果你等待你一个任务结束,你必须同时等待任务里面的子任务结束。这一点很重要,尤其是你在使用Continue的时候。(后面会介绍)


等待Task

在ThreadPool内置的方法中无法实现的等待,在Task中可以很简单的实现了:
    
using System;
using System.Threading;
using System.Threading.Tasks;
 
namespace MultiThread
{
    class ThreadTest
    {
        static void Main()
        {
            var t1 = Task.Run(() => Go(null));
            var t2 = Task.Run(() => Go(123));
            Task.WaitAll(t1, t2);//等待所有Task结束
            //Task.WaitAny(t1, t2);//等待任意Task结束
        }
 
        static void Go(object data)   // data will be null with the first call.
        {
            Thread.Sleep(5000);
            Console.WriteLine("Hello from the thread pool! " + data);
        }
    }
}

  

注意:

当你调用一个Wait方法时,当前的线程会被阻塞,直到Task返回。但是如果Task还没有被执行,这个时候系统可能会用当前的线程来执行调用Task,而不是新建一个,这样就不需要重新创建一个线程,并且阻塞当前线程。这种做法节省了创建新线程的开销,也避免了一些线程的切换。但是也有缺点,当前线程如果和被调用的Task同时想要获得一个lock,就会导致死锁。


Task异常处理

当等待一个Task完成的时候(调用Wait或者或者访问Result属性的时候),Task任务中没有处理的异常会被封装成AggregateException重新抛出,InnerExceptions属性封装了各个Task没有处理的异常。

using System;
using System.Threading.Tasks;
 
namespace MultiThreadTest
{
    class Program
    {
        static void Main(string[] args)
        {
            int x = 0;
            Taskcalc = Task.Factory.StartNew(() => 7 / x);
            try
            {
                Console.WriteLine(calc.Result);
            }
            catch (AggregateException aex)
            {
                Console.Write(aex.InnerException.Message);  // Attempted to divide by 0
            }
        }
    }
}

  

对于有父子关系的Task,子任务未处理的异常会逐层传递到父Task,并且最后包装在AggregateException中。

using System;
using System.Threading.Tasks;
 
namespace MultiThreadTest
{
    class Program
    {
        static void Main(string[] args)
        {
            TaskCreationOptions atp = TaskCreationOptions.AttachedToParent;
            var parent = Task.Factory.StartNew(() =>
            {
                Task.Factory.StartNew(() =>   // Child
                {
                    Task.Factory.StartNew(() => { throw null; }, atp);   // Grandchild
                }, atp);
            });
 
            // The following call throws a NullReferenceException (wrapped
            // in nested AggregateExceptions):
            parent.Wait();
        }
    }
}

  
取消Task

如果想要支持取消任务,那么在创建Task的时候,需要传入一个CancellationTokenSouce

示例代码:

using System;
using System.Threading;
using System.Threading.Tasks;
 
namespace MultiThreadTest
{
    class Program
    {
        static void Main(string[] args)
        {
            var cancelSource = new CancellationTokenSource();
            CancellationToken token = cancelSource.Token;
 
            Task task = Task.Factory.StartNew(() =>
            {
                // Do some stuff...
                token.ThrowIfCancellationRequested();  // Check for cancellation request
                // Do some stuff...
            }, token);
            cancelSource.Cancel();
 
            try
            {
                task.Wait();
            }
            catch (AggregateException ex)
            {
                if (ex.InnerException is OperationCanceledException)
                    Console.Write("Task canceled!");
            }
 
            Console.ReadLine();
        }
    }
}

  
任务的连续执行


Continuations

任务调度也是常见的需求,Task支持一个任务结束之后执行另一个任务。

Task task1 = Task.Factory.StartNew(() => Console.Write("antecedant.."));
Task task2 = task1.ContinueWith(task =>Console.Write("..continuation"));

  
Continuations 和Task

Task也有带返回值的重载,示例代码如下:

Task.Factory.StartNew(() => 8)
    .ContinueWith(ant => ant.Result * 2)
    .ContinueWith(ant => Math.Sqrt(ant.Result))
    .ContinueWith(ant => Console.WriteLine(ant.Result));   // output 4

  
子任务

前面提到了,当你等待一个任务的时候,同时需要等待它的子任务完成。

下面代码演示了带子任务的Task:

using System;
using System.Threading.Tasks;
using System.Threading;
 
namespace MultiThreadTest
{
    class Program
    {
        public static void Main(string[] args)
        {
            TaskparentTask = Task.Factory.StartNew(() =>
            {
                int[] results = new int[3];
 
                Task t1 = new Task(() => { Thread.Sleep(3000); results[0] = 0; }, TaskCreationOptions.AttachedToParent);
                Task t2 = new Task(() => { Thread.Sleep(3000); results[1] = 1; }, TaskCreationOptions.AttachedToParent);
                Task t3 = new Task(() => { Thread.Sleep(3000); results[2] = 2; }, TaskCreationOptions.AttachedToParent);
 
                t1.Start();
                t2.Start();
                t3.Start();
 
                return results;
            });
 
            Task finalTask = parentTask.ContinueWith(parent =>
            {
                foreach (int result in parent.Result)
                {
                    Console.WriteLine(result);
                }
            });
 
            finalTask.Wait();
            Console.ReadLine();
        }
    }
}


这段代码的输出结果是: 1,2,3

FinalTask会等待所有子Task结束后再执行。


TaskFactory

关于TaskFactory,上面的例子中我们使用了System.Threading.Tasks .Task.Factory属性来快速的创建Task。当然你也可以自己创建TaskFactory,你可以指定自己的TaskCreationOptions,TaskContinuationOptions来使得通过你的Factory创建的Task默认行为不同。

.Net中有一些默认的创建Task的方式,由于TaskFactory创建Task的默认行为不同可能会导致一些不容易发现的问题。

如在.NET 4.5中,Task加入了一个Run的静态方法:

Task.Run(someAction);

如果你用这个方法代替上面例子中的Task.Factory.StartNew,就无法得到正确的结果。原因是Task.Run创建Task的行为默认是默认是拒绝添加子任务的。上面的代码等价于:

Task.Factory.StartNew(someAction, CancellationToken.None, TaskCreationOptions.DenyChildAttach, TaskScheduler.Default);

你也可以创建具有自己默认行为的TaskFactory。


无论ThreadPool也好,或者Task,微软都是在想进办法来实现线程的重用,来节省不停的创建销毁线程带来的开销。线程池内部的实现可能在不同版本中有不同的机制。如果可能的话,使用线程池来管理线程仍然是建议的选择。

 
我们主要介绍了一下Task的基本用法,在我们编程过程中,有一些使用Task来提升程序性能的场景往往是很相似的,微软为了简化编程,在System.Threading.Tasks.Parallel中封装了一系列的并行类,内部也是通过Task来实现的。


Parallel的For,Foreach,Invoke 方法


在编程过程中,我们经常会用到循环语句:

for (int i = 0; i < 10; i++)
{
    DoSomeWork(i);
}


如果循环过程中的工作可以是并行的话,那么我们可以用如下语句:

Parallel.For(0, 10, i => DoSomeWork(i));


我们也经常会使用Foreach来遍历某个集合:

foreach (var item in collection)
{
    DoSomeWork(item);
}


如果我们用一个线程池来执行里面的任务,那么我们可以写成:

Parallel.ForEach(collection, item => DoSomeWork(item));

最后,如果你想并行的执行几个不同的方法,你可以:

Parallel.Invoke(Method1, Method2, Method3);

  

如果你看下后台的实现,你会发现基本都是基于Task的线程池,当然你也可以通过手动创建一个Task集合,然后等待所有的任务结束来实现同样的功能。上面的Parallel.For和Parallel.Forach方法并不以为这你可以寻找你代码里面所有用到For和Foreach方法,并且替代他们,因为每一个任务都会分配一个委托,并且在线程池里执行,如果委托里面的任务是线程不安全的,你可能还需要lock来保证线程安全,使用lock本身就会造成性能上的损耗。如果每一个任务都是需要长时间执行并且线程安全的,Parallel会给你带来不错的性能提升。对于短任务,或者线程不安全的任务,你需要权衡下,你是否真的需要使用Parallel。


NET 多线程 使用锁进行同步


通过锁来实现同步


排它锁主要用来保证,在一段时间内,只有一个线程可以访问某一段代码。两种主要类型的排它锁是lock和Mutex。Lock和Mutex相比构造起来更方便,运行的也更快。但是Mutex可以在同一个机器上的不同进程使用。


Monitor.Enter和Monitor.Exit

C#中的lock关键字,实际上是Monitor.Enter,Monitor.Exist的一个简写。在.NET 1.0,2.0,3.0 版本的c#中,lock会被编译成如下代码:
    
Monitor.Enter(_locker);
try
{
    if (_val2 != 0) Console.WriteLine(_val1 / _val2);
    _val2 = 0;
}
finally { Monitor.Exit(_locker); }

如果你没有调用Monitor.Enter而直接调用Monitor.Exit会引发异常。
LockTaken版本:

想象一下上面这段代码,如果再Monitor.Enter之后,try之前,线程出现了异常(比如被终止),在这种情况下,finally中的Exit方法就永远不会被执行,也就导致了这个锁不会被释放。为了避免这种情况,CLR 4.0的设计者重载了Monitor.Enter方法:

public static void Enter (object obj, ref bool lockTaken);

如果当前线程由于某些异常导致锁没有被获取到,lockTake值会为false,因此在CLR 4.0中,lock会被解释成如下代码:

bool lockTaken = false;
try
{
 
    Monitor.Enter(_locker, ref lockTaken);
 
    // Do your stuff...
}
 
finally { if (lockTaken) Monitor.Exit(_locker); }


TryEnter

Monitor也提供了了一个TryEnter方法,允许你设置一个超时时间,避免当前线程长时间获取不到锁而一直等待。


选择正确的同步对象

你需要选择一个对所有线程都可见的对象进行lock(obj)来确保程序能够按照你的意图执行。如果比不了解C#语言中的某些特性,lock可能不会按照你 期望来执行。

    由于字符串的驻留机制,lock("string")不是一个好的选择
    Lock一个值类型不是一个好的选择
    Lock(typeof(..))不是一个好的选择,因为System.Type的特性

什么时候使用lock

一个基本的规则,你需要对任意的写操作,或者可修改的字段进行lock。即使是一个赋值操作,或者累加操作,你也不能假设他是线程安全的。

例如下面代码不是线程安全的:

class ThreadUnsafe
{
    static int _x;
    static void Increment() { _x++; }
    static void Assign() { _x = 123; }
}

 

你需要这样写:

class ThreadSafe
{
    static readonly object _locker = new object();
    static int _x;
 
    static void Increment() { lock (_locker) _x++; }
    static void Assign() { lock (_locker) _x = 123; }
}

如果你看过一些BCL类库里面的实现,你可以能会发现,某些情况下会使用InterLocked类,而不是lock,我们会在后面介绍。


关于嵌套锁或者reentrant

你在阅读一些文档的时候,有的文档可能会说lock或者Monitor.Enter是reentrant(可重入的),那么我们如何理解reentrant呢?

想象下以下代码:

lock (locker)
    lock (locker)
        lock (locker)
        {
            // Do something...
        }

 

或者是:
    
Monitor.Enter(locker); Monitor.Enter(locker); Monitor.Enter(locker);
// Do something...
Monitor.Exit(locker); Monitor.Exit(locker); Monitor.Exit(locker);

这种情况下,只有在最后一个exit执行后,或者执行了相应次数的Exit后,locker才是可获取的状态。
Mutex

Mutex像c#中的lock一样,但是在不同的进程中仍然可以使用。换句话说,Mutex是一个计算机级别的锁。因此获取这样一个锁要比Monitor慢很多。

示例代码:

using System;
using System.Threading.Tasks;
using System.Threading;
 
namespace MultiThreadTest
{
    class OneAtATimePlease
    {
        static void Main()
        {
            // Naming a Mutex makes it available computer-wide. Use a name that's
            // unique to your company and application (e.g., include your URL).
 
            using (var mutex = new Mutex(false, "oreilly.com OneAtATimeDemo"))
            {
                // Wait a few seconds if contended, in case another instance
                // of the program is still in the process of shutting down.
 
                if (!mutex.WaitOne(TimeSpan.FromSeconds(3), false))
                {
                    Console.WriteLine("Another app instance is running. Bye!");
                    return;
                }
                RunProgram();
            }
        }
 
        static void RunProgram()
        {
            Console.WriteLine("Running. Press Enter to exit");
            Console.ReadLine();
        }
    }
}


Semaphore

Monitor和Mutex都是排他锁,Semaphore我们常用的另外一种非排他的锁。

我们用它来实现这样一个例子:一个酒吧,最多能容纳3人,如果客满则需要等待,有客人离开,等待的人随时可以进来。

示例代码:

using System;
using System.Threading;
 
class TheClub      // No door lists!
{
    static Semaphore _sem = new Semaphore(3, 3);    // Capacity of 3
 
    static void Main()
    {
        for (int i = 1; i <= 5; i++) new Thread(Enter).Start(i);
 
        Console.ReadLine();
    }
 
    static void Enter(object id)
    {
        Console.WriteLine(id + " wants to enter");
        _sem.WaitOne();
        Console.WriteLine(id + " is in!");           // Only three threads
        Thread.Sleep(1000 * (int)id);               // can be here at
        Console.WriteLine(id + " is leaving");       // a time.
        _sem.Release();
    }
}

 

使用Semaphore需要调用者来控制访问资源,调用WaitOne来获取资源,通过Release来释放资源。开发者有责任确保资源能够正确释放。

Semaphore在限制同步访问的时候非常有用,它不会像Monitor或者Mutex那样当一个线程访问某些资源时,其它所有线程都需要等,而是设置一个缓冲区,允许最多多少个线程同时进行访问。

Semaphore也可以像Mutex一样,跨进程进行同步。

相关文章

精彩推荐