《编写高质量C#的建议》学习笔记 第六部分 异步、多线程、任务和并行
第六部分 异步、多线程、任务和并行
异步、多线程、任务和并行
异步、并行、任务中的异常处理
第六部分 异步、多线程、任务和并行
异步、多线程、任务和并行
计算密集型工作,采用多线程。 IO密集型工作,采用异步机制。
所谓线程同步就是多个线程在某个对象上执行等待(锁定该对象),直到该对象被解除锁定。值类型不能被锁定,引用类型上的等待机制分为两类:
1 |
|
锁定使用关键字lock和类型Monitor。两者没有实质区别,前者其实是后者的语法糖。
信号同步机制中涉及的类型都继承自抽象类WaitHandle,这些类型有EventWaitHandle(类型化为AutoResetEvent、ManualResetEvent)、Semaphore以及Mutex。
它们底层的原理是一致的,维护的都是一个系统内核句柄。简单的区别如下:
1 |
|
以上两者提供的都是单应用程序域内的线程同步功能,Mutex提供的是跨应用程序域阻塞和解除阻塞线程的能力。
使用信号机制提供线程同步的简单例子:
// 创建一个同步类型对象,设置默认阻滞状态是false。任何在它上面进行等待的线程都将被阻滞。
AutoResetEvent autoResetEvent = new AutoResetEvent(false);
private void buttonStartAThread_Click(object sender, EventArgs e)
{
Thread tWork = new Thread(() =>
{
label1.Text = " 线程启动..." + Environment.NewLine;
label1.Text += " 开始处理一些实际的工作" + Environment.NewLine;
// 省略工作代码
label1.Text += " 我开始等待别的线程给我信号,才愿意继续下去 " + Environment.NewLine;
autoResetEvent.WaitOne(); // 这就是等待的那个线程
label1.Text += "我继续做一些工作,然后结束了!";
// 省略工作代码
});
tWork.IsBackground = true;
tWork.Start();
}
private void buttonSet_Click(object sender, EventArgs e)
{
//给在autoResetEvent 上等待的线程一个信号
autoResetEvent.Set();
}
AutoResetEvent和ManualResetEvent的区别是:前者在发送信号完毕后(即调用Set方法),会自动将自己的阻滞状态设置为false,而后者需要进行手动设定。
如果是两个工作线程都阻滞,直到收到主线程的信号再继续工作:
以上就是开两个tWork,都等待接收Set,使用AutoResetEvent就会导致只有一个工作线程继续工作,因为AutoResetEvent发送信号完毕后就在内核中自动将自己的状态设置回false了,所以另一个工作线程根本没有收到信号。换成ManualResetEvent就可以了。
另外再模拟一个网络通信的心跳检测:
线程tClient模拟客户端,主线程模拟服务器端,每3秒检测是否收到服务器的心跳数据,如果没有心跳数据,则显示网络连接断开。
AutoResetEvent autoResetEvent = new AutoResetEvent(false);
private void buttonStartAThread_Click(object sender, EventArgs e)
{
Thread tClient = new Thread(() =>
{
while (true)
{
// 等3秒,3秒没有信号,显示断开
// 有信号,则显示更新
bool re = autoResetEvent.WaitOne(3000);
if(re)
{
label1.Text = String.Format("时间:{0},{1}",DateTime.Now.ToString(), "保持连接状态");
}
else
{
label1.Text = String.Format("时间:{0},{1}",DateTime.Now.ToString(), "断开,需要重启");
}
}
});
tClient.IsBackground = true;
tClient.Start();
}
private void buttonSet_Click(object sender, EventArgs e)
{
// 模拟发送心跳数据
autoResetEvent.Set();
}
线程同步的另一种编码方式是使用线程锁。锁住一个资源,使得应用程序在此刻只有一个线程访问该资源。
选择锁对象(也叫同步对象),需要注意以下几点:
-
同步对象在需要同步的多个线程中是可见的同一个对象。 // 不建议使用lock(this),如果两个对象的实例分别执行了锁定的代码,实际锁定的也就会是两个对象,不能达到同步的目的。 // 编写多线程代码时,遵循这样的一个原则:类型的静态方法应当保证线程安全,非静态方法不需要实现线程安全。
-
在非静态方法中,静态变量不应作为同步对象。
-
值类型对象不能作为同步对象。 // 值类型在传递到另一个线程的时候,会创建一个副本,相当于每个线程锁定的也是两个对象。
-
避免将字符串作为同步对象。 // 没必要且危险 // 如果有两个变量被分配了相同内容的字符串,这两个引用会被指向同一块内存。所以,如果有两个地方同时使用了lock(“abc”), // 那么实际锁定的是同一个对象,会导致整个应用程序被阻滞。
-
降低同步对象的可见性。 // 将同步对象藏起来,只开放给自己或自己的子类(情况也不多)就够了 // 可见范围最广的一种同步对象时typeof(SampleClass)。typeof方法所返回的结果(也就是类型的type)是SampleClass的所有 // 实例所共有的,即:所有实例的type都指向typeof方法的结果。这样一来,如果lock(typeof(SampleClass)),当前应用程序中 // 所有SampleClass的实例线程将会全部被同步。完全没必要,且这样的同步对象太开放了。
线程分为前台前程和后台线程,即每个线程都有一个IsBackground属性。两者在表现形式上的唯一区别是:如果前台线程不退出,应用程序的进程就会一直存在,必须所有的前台线程全部退出,应用程序才算退出。而后台进程则没有这方面的限制,如果应用程序退出,后台线程也会一并退出。
查看如下代码:
Thread t = new Thread(() =>
{
Console.WriteLine("线程开始工作……");
// 省略工作代码
Console.ReadKey();
Console.WriteLine("线程结束");
});
// 注意,默认就为false
t.IsBackground = false;
t,Start();
Console.WriteLine("主线程完毕");
用Thread创建的线程默认是前台线程,也就是IsBackground属性默认是false。以上代码需要等到工作结束(敲入一个按键)应用程序才会结束,而如果设置IsBackground为true,应用程序则会立刻结束。
演示代码用的是Thread,线程池中的线程默认都是后台线程。基于两者的区别,实际编码中应该更多使用后台线程。只有在非常关键的工作中,如线程正在执行事务或者占有的某些非托管资源需要释放时,才使用前台线程。
C#中,线程之间的调度占有一定的时间和空间开销,并且不实时。
如果想将0到9分别传给10个不同的线程,如下代码:
static int _id = 0;
static void Main()
{
for (int i = 0; i < 10; i++, _id++)
{
Thread t = new Thread(() =>
{
Console.WriteLine(String.Format("{0}:{1}", Thread.CurrentThread.Name, _id));
});
t.Name = String.Format("Thread{0}", i);
t.IsBackground = true;
t.Start();
}
Console.ReadLine();
}
可能输出如下:
1 |
|
首先,线程没有按顺序启动,代码中,前面Start的那个线程也许迟于Start的那个线程执行;
其次,传入线程内部的ID值,不再是for循环执行中当前的ID值。
需要改为同步代码:
static int _id = 0;
static void Main()
{
for (int i = 0; i < 10; i++, _id++)
{
NewMethod1(i, _id);
}
Console.ReadLine();
}
private static void NewMethod1(int i, int realTimeID)
{
Thread t = new Thread(() =>
{
Console.WriteLine(String.Format("{0}:{1}", Thread.CurrentThread.Name, realTimeID));
});
t.Name = String.Format("Thread{0}", i);
t.IsBackground = true;
t.Start();
}
输出为:
1 |
|
只是不会立即启动,但传入线程的ID值都正确。
C#线程有五个优先级:Highest、AboveNormal、Normal、BelowNormal和Lowest。
Windows系统是基于优先级的抢占式调度系统。系统中优先运行优先级较高且就绪状态的线程。
使用Thread和ThreadPool新起的线程,默认优先级都是Normal。
如果是非常关键的线程,可以通过 t.Priority = ThreadPriority.Highest; 来提升线程的优先级。但是一般不建议这么做。这些关键线程应当具有运行时间短,能即刻进入等待状态等特征。
正确停止线程的标准的取消模式:协作式取消。
如下是一个最基础的协作式取消的示例:
CancellationTokenSource cts = new CancellationTokenSource();
Thread t = new Thread(() =>
{
while (true)
{
if(cts.Token.IsCancellationRequested)
{
Console.WriteLine(" 线程被终止! ");
break;
}
Console.WriteLine(DateTime.Now.ToString());
Thread.Sleep(1000);
}
});
t.Start(); // 以1000ms的频率边工作便检查是否传入cancel信号,如果有信号,则退出
Console.ReadLine();
cts.Cancel(); // 通知工作线程退出
/// 正确停止线程的机制中,真正起作用的应该是线程本身。
CancelllationToken还有一个Register方法,负责传递一个Action委托,在线程停止的时候被回调,使用方法如下:
cts.Token.Register(() =>
{
Console.WriteLine("工作线程被终止了。");
});
应避免线程数量过多:
1 |
|
错误创建过多线程的典型例子是:为每个Socket连接建立一个线程去管理。每个连接一个线程,意味着在32位系统的服务器不能同时管理超过1000台的客户机。CLR为每个进程分配的内存会超过1MB。约1000个进程,加上.NET进程启动本身所占用的一些内存,即刻就耗尽了系统能分配给进程的最大可用地址空间2GB。即便应用程序在设计之初的需求设计书中说明,生产环境中客户端数目不会超过500台,在管理这500台客户端时进行线程上下文切换,也会损耗相当多的CPU时间。这类I/O密集型场合应该使用异步去完成。
过多的线程还会带来另一个问题:新起的线程可能需要等待相当长的时间才会真正运行。除了启动问题外,线程之间的切换也存在同样的问题。
所以,不要滥用线程,尤其不要滥用过多的线程。即使真的需要线程也应该考虑使用线程池技术。比如I/O密集型场合,应该使用异步来完成。异步会在后台使用线程池进行管理。1000台客户端在使用了异步技术后,实际只要几个线程就能完成所有的管理工作(具体取决于“心跳频率”)。
线程能极大地提升用户体验度,但是线程的开销也是很大的。
线程的空间开销来自:
- 线程内核对象(Thread Kernel Object)。每个线程都会创建一个这样的对象,它主要包含线程上下文信息,在32位系统中,它所占用的内存在700字节左右。
- 线程环境块(Thread Environment Block)。TEB包括线程的异常处理链,32位系统中占用4KB内存。
- 用户模式栈(User Mode Stack),即线程栈。线程栈用于保存方法的参数、局部变量和返回值。每个线程栈占用1024KB的内存。要用完内存很简单,写个不能结束的递归方法,让方法参数和返回值不停地消耗内存,很快就会发生OutOfMemoryException。
- 内核模式栈(Kernel Mode Stack)。当调用操作系统的内核模式函数时,系统会将函数参数从用户模式栈复制到内核模式栈。在32位系统中,内核模式栈会占用12KB内存。
线程的时间开销来自:
- 线程创建的时候,系统相继初始化以上这些内存空间。
- 接着CLR会调用所有加载DLL的DLLMain方法,并传递连接标志(线程终止的时候,也会调用DLL的DLLMain方法,并传递分离标志)。
- 线程上下文切换。一个系统中会加载很多的进程,而一个进程又包含若干个线程。但是一个CPU在任何时候只能有一个线程在执行。为了让每个线程看上去都在运行,系统会不断地切换“线程上下文”:每个线程大概得到几十毫秒的执行时间片,然后就会切换到下一个线程了。这个过程大概又分为以下5个步骤:
- 步骤1 进入内核模式。
- 步骤2 将上下文信息(主要是一些CPU寄存器信息)保存到正在执行的线程内核对象上。
- 步骤3 系统获取一个Spinlock,并确定下一个要执行的线程,然后释放Spinlock。如果下一个线程不在同一个进程内,则需要进行虚拟地址交换。
- 步骤4 从将被执行的线程内核对象上载入上下文信息。
- 步骤5 离开内核模式。
创建和销毁一个线程由于要进行如此多的工作,代价昂贵。应该使用ThreadPool或BackgroundWorker代替Thread。
线程池替开发人员管理工作线程。当一项工作完毕时,CLR不会销毁这个线程,而是保留一段时间,看是否有别的工作需要这个线程。至于何时销毁或新起线程,由CLR根据自身算法决定。所以多线程编码时,不应想到:
Thread t = new Thread(() =>
{
// 工作代码
});
t.Start();
应该首先想到依赖线程池:
ThreadPool.QueueUserWorkItem((objState) =>
{
// 工作代码
}, null);
重点关注业务的实现,而不是线程的性能测试。
BackgroundWorker是在内部使用了线程池技术,同时,在Winform或WPF编程中,它还给工作线程和UI线程提供了交互的能力。
Thread和ThreadPool默认都没有这种交互能力。BackgroundWorker通过事件提供了这种能力。这种能力包括:报告进度、支持完成回调、取消任务、暂停任务等。简单示例如下:
private BackgroundWorker worker;
private void startAsyncButton_Click(System.Object sender, System.EventArgs e)
{
worker.DoWork += new DoWorkEventHandler(worker_DoWork);
worker.ProgressChanged += new ProgressChangedEventHandler(worker_ProgressChanged);
worker.RunWorkerAsync();
}
private void worker_DoWork(object sender, DoWorkEventArgs e)
{
BackgroundWorker worker = sender as BackgroundWorker;
for (int i = 0; i < 10; i++)
{
worker.ReportProgress(i);
Thread.Sleep(100);
}
}
private void worker_ProgressChanged(object sender, ProgressChangedEventArgs e)
{
this.label1.Text = e.ProgressPercentage.ToString();
}
ThreadPool在使用上是存在一定的不方便的:
- 不支持线程的取消、完成、失败通知等交互性操作
- 不支持线程执行的先后次序
而Task在线程池的基础上进行了优化。
简单的任务示例如下:
static void Main(String[] args)
{
Task t = new Task(() =>
{
Console.WriteLine("任务开始工作……");
// 模拟工作过程
Thread.Sleep(5000);
});
t.Start();
t.ContinueWith((task) =>
{
Console.WriteLine("任务完成,完成时候的状态为:");
Console.WriteLine("IsCanceled={0}\tIsCompleted={1}\tIsFaulted={2}", task.IsCanceled, task.IsCompleted, task.IsFaulted);
});
Console.ReadKey();
}
任务没有提供回调事件来通知完成,而是通过启用一个新任务的方式来完成类似的功能,在新任务中获取原任务的结果值。
任务完成时的状态有:IsCanceled 因为被取消而完成 IsCompleted 成功完成 IsFaulted 因为发生异常而完成下面是个稍微复杂的例子,同时支持完成通知、取消。获取任务返回值等功能:
static void Main(String[] args)
{
CancellationTokenSource cts = new CancellationTokenSource();
Task<int> t = new Task<int>(() => Add(cts.Token), cts.Token);
t.Start();
t.ContinueWith(TaskEnded);
// 等待按任意键取消任务
Console.ReadKey();
cts.Cancel();
Console.ReadKey();
}
static void TaskEnded(Task<int> task)
{
Console.WriteLine("任务完成,完成时候的状态为:");
Console.WriteLine("IsCanceled={0}\tIsCompleted={1}\tIsFaulted={2}", task.IsCanceled, task.IsCompleted, task.IsFaulted);
Console.WriteLine("任务的返回值为:{0}", task.Result);
}
static int Add(CancellationToken ct)
{
Console.WriteLine("任务开始……");
int result = 0;
while (!ct.IsCancellationRequested)
{
result++;
Thread.Sleep(1000);
}
return result;
}
Task还支持任务工厂的概念。任务工厂支持多个任务之间共享相同的状态,如取消类型CancellationTokenSource就是可以被共享的。
通过使用任务工厂,可以同时取消一组任务:
static void Main(String[] args)
{
CancellationTokenSource cts = new CancellationTokenSource();
// 等待按任意键取消任务
TaskFactory taskFactory = new TaskFactory();
Task[] tasks = new Task[]
{
taskFactory.StartNew(() => Add(cts.Token)),
taskFactory.StartNew(() => Add(cts.Token)),
taskFactory.StartNew(() => Add(cts.Token))
};
// CancellationToken.None只是TasksEnded不能被取消
taskFactory.ContinueWithAll(tasks, TasksEnded, CancellationToken.None);
Console.ReadKey();
cts.Cancel();
Console.ReadKey();
}
static void TasksEnded(Task[] tasks)
{
Console.WriteLine("所有任务已完成!");
}
Task进一步优化了后台线程池的调度,加快了线程的处理速度。如果使用多线程,理应更多地使用Task。
Parallel可以简化在同步状态下的Task操作。主要有3个方法:For、ForEach、Invoke。
因为是同步状态下,所以运行Parallel中的For、ForEach方法时,调用者线程(示例中的主线程)是被阻滞的。
Parallel虽然将任务交给Task处理,即交给线程池处理,不过调用者会一直等到线程池中的相关工作全部完成。
表示并行的静态类Parallel甚至只提供了Invoke方法,而没有同时提供一个BeginInvoke方法,也从一定程度上说明了这个问题。
使用Task时,最常用Start方法(Task也提供了RunSynchronously),它不会阻滞调用者线程。如下所示:
static void Main()
{
Task t = new Task(() =>
{
while (true)
{
}
});
t.Start();
Console.WriteLine("主线程即将结束"); // 输出这句
Console.ReadKey();
}
如果使用Parallel执行相近功能,主线程会被阻滞:
static void Main()
{
// 这里也可以使用Invoke方法
Parallel.For(0, 1, (i) =>
{
while (true)
{
}
});
Console.WriteLine("主线程即将结束"); // 永远不会输出
Console.ReadKey();
}
并行编程意味着运行时在后台将任务分配到尽量多的CPU上,虽然它在后台使用Task管理,但不意味这它等同于异步。
For主要用于处理针对数组元素的并行操作:
static void Main(String[] args)
{
int[] nums = new int[] {1, 2, 3, 4};
Parallel.For(0, nums.Length, (i) =>
{
Console.WriteLine("针对数组索引{0}对应的那个元素{1}的一些工作代码……", i, nums[i]);
});
Console.ReadKey();
}
注意
工作代码不会按照数组的索引次序进行遍历。因为遍历是并行的,不是顺序的。
如果我们的输出必须是同步的,或者说必须是顺序输出的,则不应使用Parallel的方式。
ForEach方法主要用于处理泛型集合元素的并行操作:
static void Main(string[] args)
{
List<int> nums = new List<int> {1, 2, 3, 4 };
Parallel.ForEach(nums, (item) =>
{
Console.WriteLine("针对集合元素{0}的一些工作代码……", item);
});
Console.ReadKey();
}
Invoke方法简化了启动一组并行操作,隐式启动的就是Task。该方法接收Params Action[]参数:
static void Main(string[] args)
{
Parallel.Invoke(() =>
{
Console.WriteLine("任务1……");
},
() =>
{
Console.WriteLine("任务2……");
},
() =>
{
Console.WriteLine("任务3……");
});
});
Console.ReadKey();
}
还可以使用PLINQ,使得传统的单线程的LINQ支持并行计算:
static void Main(String[] args)
{
List<int> intList = new List<int>() { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 };
var query = from p in intList select p;
Console.WriteLine("以下是LINQ顺序输出:");
foreach (int item in query)
{
Console.WriteLine(item.ToString()); // 按照索引顺序输出
}
Console.WriteLine("以下是PLINQ并行输出:");
var queryParallel = from p in intList.AsParallel() select p;
foreach (int item in queryParallel)
{
Console.WriteLine(item.ToString()); // 无序输出
}
// 还有一种方式处理:
// 但如果要将并行输出后的结果排序,ForAll会忽略掉查询的AsOrdered请求。
// var queryParallel = from p in intList.AsParallel().AsOrdered() select p;
// AsOrdered方法可以对并行计算后的队列进行重新组合,以便保持顺序,但在ForAll方法中,输出仍无序。
queryParallel.ForAll((item) =>
{
Console.WriteLine(item.ToString());
});
}
谨慎使用并行的情况
并行多带来的后台任务及任务的管理,都会带来一定的开销,如果一项工作本来就能很快完成,或者说循环体很小,那么并行的速度也许会比非并行要慢。
某些本身就需要同步运行的场合,或者需要较长时间锁定共享资源的场合。
在对整型数据进行同步操作时,可以使用静态类Interlocked的Add方法,这就极大避免了由于进行院子操作长时间锁定某个共享资源所带来的同步性能损耗。
static void Main(string[] args)
{
int[] nums = new int[] { 1, 2, 3, 4 };
int total = 0;
Parallel.For<int>(0, nums.Length, () =>
{
return 1;
},(i, loopState, subtotal) =>
{
subtotal += nums[i];
return subtotal;
},
(x) => Interlocked.Add(ref total, x)
);
Console.WriteLine("total={0}", total);
Console.ReadKey();
}
理论上,针对total的加法操作,需要使用一个同步锁,否则就无法避免一次tornread(即两次mov操作所导致的字段内存地址边界对齐问题)。
FCL通过提供Interlocked类型解决了这个问题,FCL用来解决简单类型的原子性操作还提供了volatile关键字。
FCL现有的原子性操作我们同步整型数据的时候,带来了性能上的提高。但是,一些场合下,不得不考虑因为同步锁带来的损耗。
static void Main(string[] args)
{
SampleClass sample = new SampleClass();
Parallel.For(0, 10000000, (i) =>
{
sample.SimpleAdd();
});
Console.WriteLine(sample.SomeCount);
}
class SampleClass
{
public long SomeCount { get; private set; }
public void SimpleAdd()
{
SomeCount++;
}
}
为了保证输出正确,必须为并行中的方法体加锁(假设SampleClass是外部提供的API,无权进行源码修改在其内部加锁):
object syncObj = new object();
Parallel.For(0, 10000000, (i) =>
{
lock(syncObj)
{
sample.SimpleAdd();
}
});
虽然输出正确了,但是由于锁的存在,系统的开销增加,同步带来的线程上下文切换,牺牲了CPU时间与空间性能。还不如不用并行。
异步、并行、任务中的异常处理
Task中的异常处理:
1 |
|
简单示例如下:
static void Main(string[] args)
{
Task t = new Task(() =>
{
throw new Exception("任务并行编码中产生的未知异常");
});
t.Start();
try
{
// 若有Result,可求Result
// 采用Wait方法不可取是因为主任务也许会执行较长时间,会阻塞调用者。
t.Wait();
}
catch (AggregateException e)
{
foreach(var item in e.InnerExceptions)
{
Console.WriteLine("异常类型:{0}{1}来自:{2}{3}异常内容:{4}", item.GetType(), Environment.NewLine, item.Source, Environment.NewLine, item.Message);
}
}
Console.WriteLine("主线程马上结束");
Console.ReadKey();
}
输出:
1 |
|
虽然运行Wait、WaitAny、WaitAll等方法,或者求Result属性能得到任务的异常信息,但会阻滞当前线程。
不能因为一个异常就故意等待,可以考虑任务并行库中Task类型的一个功能:新起一个后续任务,解决等待问题:
static void Main()
{
Task t = new Task(() =>
{
throw new Exception("任务并行编码中产生的未知异常");
});
t.Start();
Task tEnd = t.ContinueWith((task) =>
{
foreach (Exception item in task.Exception.InnerExceptions)
{
Console.WriteLine("异常类型:{0}{1}来自:{2}{3}异常内容:{4}", item.GetType(), Environment.NewLine, item.Source, Environment.NewLine, item.Message);
}
}, TaskContinuationOptions.OnlyOnFaulted);
Console.WriteLine("主线程马上结束");
Console.ReadKey();
}
输出为:
1 |
|
虽然解决了主线程等待问题,但异常处理没有回到主线程中,还在线程池中。
在某些场合,比如对于业务逻辑上特定异常的处理,需要采取这种方式,也鼓励这种用法。但更多时候需要进一步将异常处理封装到主线程。
可采用类似Wait方法达到目的。
static void Main(string[] args)
{
Task t = new Task(() =>
{
throw new InvalidOperationException("任务并行编码中产生的未知异常");
});
t.Start();
Task tEnd = t.ContinueWith((task) =>
{
throw task.Exception;
}, TaskContinuationOptions.OnlyOnFaulted);
try
{
tEnd.Wait();
}
catch (AggregateException err)
{
foreach (var item in err.InnerExceptions)
{
Console.WriteLine("异常类型:{0}{1}来自:{2}{3}异常内容:{4}", item.GetType(), Environment.NewLine, item.Source, Environment.NewLine, item.Message);
}
}
Console.WriteLine("主线程马上结束");
Console.ReadKey();
}
输出为:
1 |
|
对线程调用Wait方法或者求Result不是最好的,因为它会阻滞主线程,并且CLR在后台会新起线程池线程来完成额外的工作。如果要包装异常到主线程,另外一个方法就是使用事件通知的方式。