.NET中的多线程使用Task
ask可以说是ThreadPool的升级版,在线程任务调度,并行编程中都有很大的作用。
创建并且初始化Task
使用lambda表达式创建Task
Task.Factory.StartNew(() => Console.WriteLine("Hello from a task!"));
var task = new Task(() => Console.Write("Hello"));
task.Start();
用默认参数的委托创建Task
using System;
using System.Threading.Tasks;
namespace MultiThread
{
class ThreadTest
{
static void Main()
{
var task = Task.Factory.StartNew(state => Greet("Hello"), "Greeting");
Console.WriteLine(task.AsyncState); // Greeting
task.Wait();
}
static void Greet(string message) { Console.Write(message); }
}
}
这种方式的一个优点是,task.AsyncState作为一个内置的属性,可以在不同线程中获取参数的状态。
System.Threading.Tasks.TaskCreateOptions
创建Task的时候,我们可以指定创建Task的一些相关选项。在.Net 4.0中,有如下选项:
LongRunning
用来表示这个Task是长期运行的,这个参数更适合block线程。LongRunning线程一般回收的周期会比较长,因此CLR可能不会把它放到线程池中进行管理。
PreferFairness
表示让Task尽量以公平的方式运行,避免出现某些线程运行过快或者过慢的情况。
AttachedToParent
表示创建的Task是当前线程所在Task的子任务。这一个用途也很常见。
下面的代码是创建子任务的示例:
using System;
using System.Threading;
using System.Threading.Tasks;
namespace MultiThread
{
class ThreadTest
{
public static void Main(string[] args)
{
Task parent = Task.Factory.StartNew(() =>
{
Console.WriteLine("I am a parent");
Task.Factory.StartNew(() => // Detached task
{
Console.WriteLine("I am detached");
});
Task.Factory.StartNew(() => // Child task
{
Console.WriteLine("I am a child");
}, TaskCreationOptions.AttachedToParent);
});
parent.Wait();
Console.ReadLine();
}
}
}
如果你等待你一个任务结束,你必须同时等待任务里面的子任务结束。这一点很重要,尤其是你在使用Continue的时候。(后面会介绍)
等待Task
在ThreadPool内置的方法中无法实现的等待,在Task中可以很简单的实现了:
using System;
using System.Threading;
using System.Threading.Tasks;
namespace MultiThread
{
class ThreadTest
{
static void Main()
{
var t1 = Task.Run(() => Go(null));
var t2 = Task.Run(() => Go(123));
Task.WaitAll(t1, t2);//等待所有Task结束
//Task.WaitAny(t1, t2);//等待任意Task结束
}
static void Go(object data) // data will be null with the first call.
{
Thread.Sleep(5000);
Console.WriteLine("Hello from the thread pool! " + data);
}
}
}
注意:
当你调用一个Wait方法时,当前的线程会被阻塞,直到Task返回。但是如果Task还没有被执行,这个时候系统可能会用当前的线程来执行调用Task,而不是新建一个,这样就不需要重新创建一个线程,并且阻塞当前线程。这种做法节省了创建新线程的开销,也避免了一些线程的切换。但是也有缺点,当前线程如果和被调用的Task同时想要获得一个lock,就会导致死锁。
Task异常处理
当等待一个Task完成的时候(调用Wait或者或者访问Result属性的时候),Task任务中没有处理的异常会被封装成AggregateException重新抛出,InnerExceptions属性封装了各个Task没有处理的异常。
using System;
using System.Threading.Tasks;
namespace MultiThreadTest
{
class Program
{
static void Main(string[] args)
{
int x = 0;
Task
try
{
Console.WriteLine(calc.Result);
}
catch (AggregateException aex)
{
Console.Write(aex.InnerException.Message); // Attempted to divide by 0
}
}
}
}
对于有父子关系的Task,子任务未处理的异常会逐层传递到父Task,并且最后包装在AggregateException中。
using System;
using System.Threading.Tasks;
namespace MultiThreadTest
{
class Program
{
static void Main(string[] args)
{
TaskCreationOptions atp = TaskCreationOptions.AttachedToParent;
var parent = Task.Factory.StartNew(() =>
{
Task.Factory.StartNew(() => // Child
{
Task.Factory.StartNew(() => { throw null; }, atp); // Grandchild
}, atp);
});
// The following call throws a NullReferenceException (wrapped
// in nested AggregateExceptions):
parent.Wait();
}
}
}
取消Task
如果想要支持取消任务,那么在创建Task的时候,需要传入一个CancellationTokenSouce
示例代码:
using System;
using System.Threading;
using System.Threading.Tasks;
namespace MultiThreadTest
{
class Program
{
static void Main(string[] args)
{
var cancelSource = new CancellationTokenSource();
CancellationToken token = cancelSource.Token;
Task task = Task.Factory.StartNew(() =>
{
// Do some stuff...
token.ThrowIfCancellationRequested(); // Check for cancellation request
// Do some stuff...
}, token);
cancelSource.Cancel();
try
{
task.Wait();
}
catch (AggregateException ex)
{
if (ex.InnerException is OperationCanceledException)
Console.Write("Task canceled!");
}
Console.ReadLine();
}
}
}
任务的连续执行
Continuations
任务调度也是常见的需求,Task支持一个任务结束之后执行另一个任务。
Task task1 = Task.Factory.StartNew(() => Console.Write("antecedant.."));
Task task2 = task1.ContinueWith(task =>Console.Write("..continuation"));
Continuations 和Task
Task也有带返回值的重载,示例代码如下:
Task.Factory.StartNew
.ContinueWith(ant => ant.Result * 2)
.ContinueWith(ant => Math.Sqrt(ant.Result))
.ContinueWith(ant => Console.WriteLine(ant.Result)); // output 4
子任务
前面提到了,当你等待一个任务的时候,同时需要等待它的子任务完成。
下面代码演示了带子任务的Task:
using System;
using System.Threading.Tasks;
using System.Threading;
namespace MultiThreadTest
{
class Program
{
public static void Main(string[] args)
{
Task
{
int[] results = new int[3];
Task t1 = new Task(() => { Thread.Sleep(3000); results[0] = 0; }, TaskCreationOptions.AttachedToParent);
Task t2 = new Task(() => { Thread.Sleep(3000); results[1] = 1; }, TaskCreationOptions.AttachedToParent);
Task t3 = new Task(() => { Thread.Sleep(3000); results[2] = 2; }, TaskCreationOptions.AttachedToParent);
t1.Start();
t2.Start();
t3.Start();
return results;
});
Task finalTask = parentTask.ContinueWith(parent =>
{
foreach (int result in parent.Result)
{
Console.WriteLine(result);
}
});
finalTask.Wait();
Console.ReadLine();
}
}
}
这段代码的输出结果是: 1,2,3
FinalTask会等待所有子Task结束后再执行。
TaskFactory
关于TaskFactory,上面的例子中我们使用了System.Threading.Tasks .Task.Factory属性来快速的创建Task。当然你也可以自己创建TaskFactory,你可以指定自己的TaskCreationOptions,TaskContinuationOptions来使得通过你的Factory创建的Task默认行为不同。
.Net中有一些默认的创建Task的方式,由于TaskFactory创建Task的默认行为不同可能会导致一些不容易发现的问题。
如在.NET 4.5中,Task加入了一个Run的静态方法:
Task.Run(someAction);
如果你用这个方法代替上面例子中的Task.Factory.StartNew,就无法得到正确的结果。原因是Task.Run创建Task的行为默认是默认是拒绝添加子任务的。上面的代码等价于:
Task.Factory.StartNew(someAction, CancellationToken.None, TaskCreationOptions.DenyChildAttach, TaskScheduler.Default);
你也可以创建具有自己默认行为的TaskFactory。
无论ThreadPool也好,或者Task,微软都是在想进办法来实现线程的重用,来节省不停的创建销毁线程带来的开销。线程池内部的实现可能在不同版本中有不同的机制。如果可能的话,使用线程池来管理线程仍然是建议的选择。
我们主要介绍了一下Task的基本用法,在我们编程过程中,有一些使用Task来提升程序性能的场景往往是很相似的,微软为了简化编程,在System.Threading.Tasks.Parallel中封装了一系列的并行类,内部也是通过Task来实现的。
Parallel的For,Foreach,Invoke 方法
在编程过程中,我们经常会用到循环语句:
for (int i = 0; i < 10; i++)
{
DoSomeWork(i);
}
如果循环过程中的工作可以是并行的话,那么我们可以用如下语句:
Parallel.For(0, 10, i => DoSomeWork(i));
我们也经常会使用Foreach来遍历某个集合:
foreach (var item in collection)
{
DoSomeWork(item);
}
如果我们用一个线程池来执行里面的任务,那么我们可以写成:
Parallel.ForEach(collection, item => DoSomeWork(item));
最后,如果你想并行的执行几个不同的方法,你可以:
Parallel.Invoke(Method1, Method2, Method3);
如果你看下后台的实现,你会发现基本都是基于Task的线程池,当然你也可以通过手动创建一个Task集合,然后等待所有的任务结束来实现同样的功能。上面的Parallel.For和Parallel.Forach方法并不以为这你可以寻找你代码里面所有用到For和Foreach方法,并且替代他们,因为每一个任务都会分配一个委托,并且在线程池里执行,如果委托里面的任务是线程不安全的,你可能还需要lock来保证线程安全,使用lock本身就会造成性能上的损耗。如果每一个任务都是需要长时间执行并且线程安全的,Parallel会给你带来不错的性能提升。对于短任务,或者线程不安全的任务,你需要权衡下,你是否真的需要使用Parallel。
NET 多线程 使用锁进行同步
通过锁来实现同步
排它锁主要用来保证,在一段时间内,只有一个线程可以访问某一段代码。两种主要类型的排它锁是lock和Mutex。Lock和Mutex相比构造起来更方便,运行的也更快。但是Mutex可以在同一个机器上的不同进程使用。
Monitor.Enter和Monitor.Exit
C#中的lock关键字,实际上是Monitor.Enter,Monitor.Exist的一个简写。在.NET 1.0,2.0,3.0 版本的c#中,lock会被编译成如下代码:
Monitor.Enter(_locker);
try
{
if (_val2 != 0) Console.WriteLine(_val1 / _val2);
_val2 = 0;
}
finally { Monitor.Exit(_locker); }
如果你没有调用Monitor.Enter而直接调用Monitor.Exit会引发异常。
LockTaken版本:
想象一下上面这段代码,如果再Monitor.Enter之后,try之前,线程出现了异常(比如被终止),在这种情况下,finally中的Exit方法就永远不会被执行,也就导致了这个锁不会被释放。为了避免这种情况,CLR 4.0的设计者重载了Monitor.Enter方法:
public static void Enter (object obj, ref bool lockTaken);
如果当前线程由于某些异常导致锁没有被获取到,lockTake值会为false,因此在CLR 4.0中,lock会被解释成如下代码:
bool lockTaken = false;
try
{
Monitor.Enter(_locker, ref lockTaken);
// Do your stuff...
}
finally { if (lockTaken) Monitor.Exit(_locker); }
TryEnter
Monitor也提供了了一个TryEnter方法,允许你设置一个超时时间,避免当前线程长时间获取不到锁而一直等待。
选择正确的同步对象
你需要选择一个对所有线程都可见的对象进行lock(obj)来确保程序能够按照你的意图执行。如果比不了解C#语言中的某些特性,lock可能不会按照你 期望来执行。
由于字符串的驻留机制,lock("string")不是一个好的选择
Lock一个值类型不是一个好的选择
Lock(typeof(..))不是一个好的选择,因为System.Type的特性
什么时候使用lock
一个基本的规则,你需要对任意的写操作,或者可修改的字段进行lock。即使是一个赋值操作,或者累加操作,你也不能假设他是线程安全的。
例如下面代码不是线程安全的:
class ThreadUnsafe
{
static int _x;
static void Increment() { _x++; }
static void Assign() { _x = 123; }
}
你需要这样写:
class ThreadSafe
{
static readonly object _locker = new object();
static int _x;
static void Increment() { lock (_locker) _x++; }
static void Assign() { lock (_locker) _x = 123; }
}
如果你看过一些BCL类库里面的实现,你可以能会发现,某些情况下会使用InterLocked类,而不是lock,我们会在后面介绍。
关于嵌套锁或者reentrant
你在阅读一些文档的时候,有的文档可能会说lock或者Monitor.Enter是reentrant(可重入的),那么我们如何理解reentrant呢?
想象下以下代码:
lock (locker)
lock (locker)
lock (locker)
{
// Do something...
}
或者是:
Monitor.Enter(locker); Monitor.Enter(locker); Monitor.Enter(locker);
// Do something...
Monitor.Exit(locker); Monitor.Exit(locker); Monitor.Exit(locker);
这种情况下,只有在最后一个exit执行后,或者执行了相应次数的Exit后,locker才是可获取的状态。
Mutex
Mutex像c#中的lock一样,但是在不同的进程中仍然可以使用。换句话说,Mutex是一个计算机级别的锁。因此获取这样一个锁要比Monitor慢很多。
示例代码:
using System;
using System.Threading.Tasks;
using System.Threading;
namespace MultiThreadTest
{
class OneAtATimePlease
{
static void Main()
{
// Naming a Mutex makes it available computer-wide. Use a name that's
// unique to your company and application (e.g., include your URL).
using (var mutex = new Mutex(false, "oreilly.com OneAtATimeDemo"))
{
// Wait a few seconds if contended, in case another instance
// of the program is still in the process of shutting down.
if (!mutex.WaitOne(TimeSpan.FromSeconds(3), false))
{
Console.WriteLine("Another app instance is running. Bye!");
return;
}
RunProgram();
}
}
static void RunProgram()
{
Console.WriteLine("Running. Press Enter to exit");
Console.ReadLine();
}
}
}
Semaphore
Monitor和Mutex都是排他锁,Semaphore我们常用的另外一种非排他的锁。
我们用它来实现这样一个例子:一个酒吧,最多能容纳3人,如果客满则需要等待,有客人离开,等待的人随时可以进来。
示例代码:
using System;
using System.Threading;
class TheClub // No door lists!
{
static Semaphore _sem = new Semaphore(3, 3); // Capacity of 3
static void Main()
{
for (int i = 1; i <= 5; i++) new Thread(Enter).Start(i);
Console.ReadLine();
}
static void Enter(object id)
{
Console.WriteLine(id + " wants to enter");
_sem.WaitOne();
Console.WriteLine(id + " is in!"); // Only three threads
Thread.Sleep(1000 * (int)id); // can be here at
Console.WriteLine(id + " is leaving"); // a time.
_sem.Release();
}
}
使用Semaphore需要调用者来控制访问资源,调用WaitOne来获取资源,通过Release来释放资源。开发者有责任确保资源能够正确释放。
Semaphore在限制同步访问的时候非常有用,它不会像Monitor或者Mutex那样当一个线程访问某些资源时,其它所有线程都需要等,而是设置一个缓冲区,允许最多多少个线程同时进行访问。
Semaphore也可以像Mutex一样,跨进程进行同步。