问题产生
当异步操作需要将结果传递给指定线程执行,需要一个中介来实现。
列如UI程序需要ui线程执行异步结果显示;单线程模型服务器需要由主线程执行异步结果操作逻辑。
基础概念
工作单元(Task)应列入上下文(Context),而不是某个特定线程(Thread);
微软为开发者提供了SynchronizationContext,在线程与线程通讯中充当传输者的角色;
不是每个线程都附加SynchronizationContext这个对象,对于WinForm程序,WindowsFormsSynchronizationContext将所有异步操作回归UI线程执行;
每一个运行的Task都有相应的TaskScheduler来为其进行调度,而SynchronizationContext是作为TaskScheduler的一个一般抽象,它的出现主要是针对具有UI线程的应用。所以,在不同的应用中,都有各自的SynchronizationContext的实现,比如WinForm、WPF等。
当await一个异步操作时,就会捕获这个SynchronizationContext或者TaskScheduler,具体逻辑如下面代码所示:如果当前SynchronizationContext不为空则就是它,否则,查看TaskScheduler是否符合条件。
object scheduler = SynchronizationContext.Current;
if (scheduler is null && TaskScheduler.Current != TaskScheduler.Default)
{
scheduler = TaskScheduler.Current;
}
Default SynchronizationContext
// Decompiled with JetBrains decompiler
// Type: System.Threading.SynchronizationContext
// Assembly: System.Private.CoreLib, Version=6.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e
// MVID: 52CD8444-76EC-4091-8389-09409447BE01
// Assembly location: C:\Program Files\dotnet\shared\Microsoft.NETCore.App\6.0.8\System.Private.CoreLib.dll
#nullable enable
namespace System.Threading
{
public class SynchronizationContext
{
private bool _requireWaitNotification;
#nullable disable
private static int InvokeWaitMethodHelper(
SynchronizationContext syncContext,
IntPtr[] waitHandles,
bool waitAll,
int millisecondsTimeout)
{
return syncContext.Wait(waitHandles, waitAll, millisecondsTimeout);
}
#nullable enable
public static SynchronizationContext? Current => Thread.CurrentThread._synchronizationContext;
protected void SetWaitNotificationRequired() => this._requireWaitNotification = true;
public bool IsWaitNotificationRequired() => this._requireWaitNotification;
public virtual void Send(SendOrPostCallback d, object? state) => d(state);
public virtual void Post(SendOrPostCallback d, object? state) => ThreadPool.QueueUserWorkItem<(SendOrPostCallback, object)>((Action<(SendOrPostCallback, object)>) (s => s.d(s.state)), (d, state), false);
public virtual void OperationStarted()
{
}
public virtual void OperationCompleted()
{
}
[CLSCompliant(false)]
public virtual int Wait(IntPtr[] waitHandles, bool waitAll, int millisecondsTimeout) => SynchronizationContext.WaitHelper(waitHandles, waitAll, millisecondsTimeout);
[CLSCompliant(false)]
protected static int WaitHelper(IntPtr[] waitHandles, bool waitAll, int millisecondsTimeout)
{
if (waitHandles == null)
throw new ArgumentNullException(nameof (waitHandles));
return WaitHandle.WaitMultipleIgnoringSyncContext((Span<IntPtr>) waitHandles, waitAll, millisecondsTimeout);
}
public static void SetSynchronizationContext(SynchronizationContext? syncContext) => Thread.CurrentThread._synchronizationContext = syncContext;
public virtual SynchronizationContext CreateCopy() => new SynchronizationContext();
}
}
注意到Post方法,默认的是调用ThreadPool
自定义同步上下文
继承SynchronizationContext,实现异步方法回归主线程执行:
using System;
using System.Collections.Concurrent;
using System.Threading;
namespace ET
{
public class ThreadSynchronizationContext : SynchronizationContext
{
// 线程同步队列,发送接收socket回调都放到该队列,由poll线程统一执行
private readonly ConcurrentQueue<Action> queue = new ConcurrentQueue<Action>();
private Action a;
public void Update()
{
while (true)
{
if (!this.queue.TryDequeue(out a))
{
return;
}
try
{
a();
}
catch (Exception e)
{
Log.Error(e);
}
}
}
public override void Post(SendOrPostCallback callback, object state)
{
this.Post(() => callback(state));
}
public void Post(Action action)
{
this.queue.Enqueue(action);
}
}
}
通过重写Post方法,将异步执行完毕调用封装成委托,存入队列,由主线程调用
private readonly ThreadSynchronizationContext threadSynchronizationContext = new ThreadSynchronizationContext();
创建自己的上下文,并设置到SynchronizationContext
SynchronizationContext.SetSynchronizationContext(this.threadSynchronizationContext);
这样所有await async执行完异步方法后,通过传递的上下文threadSynchronizationContext,向队列中传入回调,由主线程执行
关于上下文在异步方法中的传递
异步调用遇到await关键字时,自动将当前线程的上下文传递给异步方法。但你可以使用ConfigureAwait来改变这一行为
using System;
using System.Collections.Concurrent;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
namespace Test2
{
public class MySynchronizationContext : SynchronizationContext
{
// 线程同步队列,发送接收socket回调都放到该队列,由poll线程统一执行
private readonly ConcurrentQueue<Action> queue = new ConcurrentQueue<Action>();
private Action a;
public void Update()
{
while (true)
{
if (!this.queue.TryDequeue(out a))
{
return;
}
try
{
a();
}
catch (Exception e)
{
Console.WriteLine(e);
}
}
}
public override void Post(SendOrPostCallback callback, object state)
{
this.Post(() => callback(state));
}
public void Post(Action action)
{
this.queue.Enqueue(action);
}
}
class Program
{
private static int loopCount = 0;
private static MySynchronizationContext ctx;
static void Main(string[] args)
{
ctx = new MySynchronizationContext();
SynchronizationContext.SetSynchronizationContext(ctx);
Console.WriteLine($"主线程: {Thread.CurrentThread.ManagedThreadId}");
Crontine();
while (true)
{
ctx.Update();
Thread.Sleep(1);
++loopCount;
if (loopCount % 10000 == 0)
{
Console.WriteLine($"loop count: {loopCount}");
}
}
}
private static async void Crontine()
{
await WaitTimeAsync(0);
Console.WriteLine($"当前线程: {Thread.CurrentThread.ManagedThreadId}, WaitTimeAsync finsih loopCount的值是: {loopCount}");
await WaitTimeAsync(0);
Console.WriteLine($"当前线程: {Thread.CurrentThread.ManagedThreadId}, WaitTimeAsync finsih loopCount的值是: {loopCount}");
await WaitTimeAsync(0);
Console.WriteLine($"当前线程: {Thread.CurrentThread.ManagedThreadId}, WaitTimeAsync finsih loopCount的值是: {loopCount}");
await WaitTimeAsync(0).ConfigureAwait(false);
Console.WriteLine("ConfigureAwait(false)之后...");
Console.WriteLine($"当前线程: {Thread.CurrentThread.ManagedThreadId}, WaitTimeAsync finsih loopCount的值是: {loopCount}");
await WaitTimeAsync(0);
Console.WriteLine($"当前线程: {Thread.CurrentThread.ManagedThreadId}, WaitTimeAsync finsih loopCount的值是: {loopCount}");
await WaitTimeAsync(0);
Console.WriteLine($"当前线程: {Thread.CurrentThread.ManagedThreadId}, WaitTimeAsync finsih loopCount的值是: {loopCount}");
}
private static Task WaitTimeAsync(int waitTime)
{
TaskCompletionSource<bool> tcs = new TaskCompletionSource<bool>();
Thread thread = new Thread(()=>WaitTime(waitTime, tcs));
thread.Start();
return tcs.Task;
}
/// <summary>
/// 在另外的线程等待
/// </summary>
private static void WaitTime(int waitTime, TaskCompletionSource<bool> tcs)
{
Thread.Sleep(waitTime);
// 将tcs扔回主线程执行
//ctx.Post(o=>tcs.SetResult(true), null);
tcs.SetResult(true);
}
}
}
结果:
主线程: 1
当前线程: 1, WaitTimeAsync finsih loopCount的值是: 0
当前线程: 1, WaitTimeAsync finsih loopCount的值是: 1
当前线程: 1, WaitTimeAsync finsih loopCount的值是: 2
ConfigureAwait(false)之后...
当前线程: 8, WaitTimeAsync finsih loopCount的值是: 2
当前线程: 10, WaitTimeAsync finsih loopCount的值是: 3
当前线程: 11, WaitTimeAsync finsih loopCount的值是: 4
默认情况下,当前同步上下文在await关键字处被捕获,await后面的执行代码会列入到该同步上下文中执行,即调用上下文的Post方法
对于异步方法
static async Task<int> fun()
{
await AsyncFun();
Run();
}
执行代码类似于
Task t = AsyncFun();
var currentContext = SynchronizationContext.Current;
if (null == currentContext)
{
t.ContinueWith((te) => { Run(); }, TaskScheduler.Current);
}
else
{
t.ContinueWith((te) => { Run(); }, TaskScheduler.FromCurrentSynchronizationContext());
}
函数会先获取SynchronizationContext.Current对象,这个对象默认情况下,在控制台程序中为null,在GUI程序中不为null
ConfigureAwait 提供了一种途径避免 SynchronizationContext 捕获
而TaskScheduler.Current默认使用线程池来执行异步回调
参考
实现自己的异步方法
C#中的ExecutionContext 和 SynchronizationContext
SynchronizationContext(同步上下文)综述