MediatR源码详解

MediatR源码详解(就差手摸手了)

一、项目介绍

    MediatR是.NET中一种简单的实现进程内的消息传递机制的类库。支持同步或者异步的形式进行请求响应,命令,查询。通知和事件的传递

    github地址:https://github.com/jbogard/MediatR

    作者:jbogard

二、源码分析

2.1、消息传递之单点传递:

      所谓的单点传递,就是在消息传递中,存在一个发消息的,还存在一个接收消息的,就像生活中,我们聊QQ/微信就属于消息传递,而我们现在要聊的就是消息传递中的单点传递也就是私聊,一对一的进行传递。

2.1.1 IRequest 接口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

/// <summary>
/// Marker interface to represent a request with a void response
/// </summary>
public interface IRequest : IRequest<Unit> { }

/// <summary>
/// Marker interface to represent a request with a response
/// </summary>
/// <typeparam name="TResponse">Response type</typeparam>
public interface IRequest<out TResponse> : IBaseRequest { }

/// <summary>
/// Allows for generic type constraints of objects implementing IRequest or IRequest{TResponse}
/// </summary>
public interface IBaseRequest { }

&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;IRequest相关接口,在消息传递中,是用于一对一的消息传递参数请求所定义相关参数规范的接口,可以理解为定义消息传递的参数格式所需要的接口 我们可以看到,这里有三个相关接口,最底层的是IBaseRequest接口,然后是一个IRequest<TRequest>接口,最后是IRequest接口,首先讲一下三个接口,IBaseRequest接口,其实就是最基础的接口,起到了一个类型约束的作用,所有的参数类型,都必须继承自IBaseRequst接口,而IRequestt<TRequest>接口呢,则是定义了在消息传递的逻辑处理中,所需要返回的返回值的类型,这样就起到了,消息的传递以及响应的作用,发出消息也定义了这个消息所需要的响应是什么。最后一个IRequest接口,继承于IRequest<Unit>接口,这是什么意思呢,这就很厉害了,它是用Unit结构,来当作返回类型,从而实现了无返回值的消息参数类型。

2.1.2 IRequestHandler接口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80

/// <summary>
/// Defines a handler for a request
/// </summary>
/// <typeparam name="TRequest">The type of request being handled</typeparam>
/// <typeparam name="TResponse">The type of response from the handler</typeparam>
public interface IRequestHandler<in TRequest, TResponse> where TRequest : IRequest<TResponse>
{
/// <summary>
/// Handles a request
/// </summary>
/// <param name="request">The request</param>
/// <param name="cancellationToken">Cancellation token</param>
/// <returns>Response from the request</returns>
Task<TResponse> Handle(TRequest request, CancellationToken cancellationToken);
}

/// <summary>
/// Defines a handler for a request with a void (<see cref="Unit" />) response.
/// You do not need to register this interface explicitly with a container as it inherits from the base <see cref="IRequestHandler{TRequest, TResponse}" /> interface.
/// </summary>
/// <typeparam name="TRequest">The type of request being handled</typeparam>
public interface IRequestHandler<in TRequest> : IRequestHandler<TRequest, Unit> where TRequest : IRequest<Unit>
{
}

/// <summary>
/// Wrapper class for a handler that asynchronously handles a request and does not return a response
/// </summary>
/// <typeparam name="TRequest">The type of request being handled</typeparam>
public abstract class AsyncRequestHandler<TRequest> : IRequestHandler<TRequest> where TRequest : IRequest
{
async Task<Unit> IRequestHandler<TRequest, Unit>.Handle(TRequest request, CancellationToken cancellationToken)
{
await Handle(request, cancellationToken).ConfigureAwait(false);
return Unit.Value;
}

/// <summary>
/// Override in a derived class for the handler logic
/// </summary>
/// <param name="request">Request</param>
/// <param name="cancellationToken"></param>
/// <returns>Response</returns>
protected abstract Task Handle(TRequest request, CancellationToken cancellationToken);
}

/// <summary>
/// Wrapper class for a handler that synchronously handles a request and returns a response
/// </summary>
/// <typeparam name="TRequest">The type of request being handled</typeparam>
/// <typeparam name="TResponse">The type of response from the handler</typeparam>
public abstract class RequestHandler<TRequest, TResponse> : IRequestHandler<TRequest, TResponse> where TRequest : IRequest<TResponse>
{
Task<TResponse> IRequestHandler<TRequest, TResponse>.Handle(TRequest request, CancellationToken cancellationToken)
=> Task.FromResult(Handle(request));

/// <summary>
/// Override in a derived class for the handler logic
/// </summary>
/// <param name="request">Request</param>
/// <returns>Response</returns>
protected abstract TResponse Handle(TRequest request);
}

/// <summary>
/// Wrapper class for a handler that synchronously handles a request does not return a response
/// </summary>
/// <typeparam name="TRequest">The type of request being handled</typeparam>
public abstract class RequestHandler<TRequest> : IRequestHandler<TRequest> where TRequest : IRequest
{
Task<Unit> IRequestHandler<TRequest, Unit>.Handle(TRequest request, CancellationToken cancellationToken)
{
Handle(request);
return Unit.Task;
}

protected abstract void Handle(TRequest request);
}

&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;既然说到了微信聊天,我们肯定知道聊天最起码得是俩人的事,特别是私聊,对面必须的有一个消息接收者,没有接收者我们发出去的就不是消息,而是寂寞。而IRequestHandler接口的主要作用是接收消息参数进行响应的逻辑处理,在这个接口下面,还有三个抽象类,采用的是包装器模式,对这个接口进行额外的功能实现,
可以看出,这三个抽象类实现了有响应,无响应这两种情况的同步以及异步不同模式下的处理方式(就像Boss直聘里面经常出现的:消息未读、已读不回、还有已读明天回),而方法中的抽象方法更是为了在不同情况下,所继承的实现类根据具体业务来实现具体逻辑使用。

2.2、消息传递之消息通知

&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;通知,这个就有趣了,通知可以是单独通知一个人,也可以通知一群人,在经历2020年初疫情之后,大家肯定对网上很火某地村长拿大喇叭喊话通知这段视频记忆犹新吧,从这里我们就可以看出消息的通知,可以是针对一到N个人进行通知的,而且通知后不需要每个人进行回复响应,想要接收到通知以后有条不紊的做就完了。所以接下来说的就是项目中通知的实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39

/// <summary>
/// Marker interface to represent a notification
/// </summary>
public interface INotification { }

/// <summary>
/// Defines a handler for a notification
/// </summary>
/// <typeparam name="TNotification">The type of notification being handled</typeparam>
public interface INotificationHandler<in TNotification> where TNotification : INotification
{
/// <summary>
/// Handles a notification
/// </summary>
/// <param name="notification">The notification</param>
/// <param name="cancellationToken">Cancellation token</param>
Task Handle(TNotification notification, CancellationToken cancellationToken);
}

/// <summary>
/// Wrapper class for a synchronous notification handler
/// </summary>
/// <typeparam name="TNotification">The notification type</typeparam>
public abstract class NotificationHandler<TNotification> : INotificationHandler<TNotification> where TNotification : INotification
{
Task INotificationHandler<TNotification>.Handle(TNotification notification, CancellationToken cancellationToken)
{
Handle(notification);
return Unit.Task;
}

/// <summary>
/// Override in a derived class for the handler logic
/// </summary>
/// <param name="notification">Notification</param>
protected abstract void Handle(TNotification notification);
}

&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;通知,既然是一对多的,那么什么人可以收到通知呢?我们从上面代码可以看到通知只有一个简单的INotification接口,他的作用也仅仅是起到个类型约束的作用,所有发送的通知类都必须继承此接口,但是如何去接收指定类型通知呢?或者换句话说,我怎么知道这个通知是不是通知给我的呢?我们先把这个问题放在这里,后面会讲到。

2.3、消息处理管道

&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;消息处理管道,管道我们都知道,就不再赘述了,就是你脑子里想到的那个东西,不管你想到的是啥,反正作用差不多,那么消息处理管道是干什么的,我们先看源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26

/// <summary>
/// Represents an async continuation for the next task to execute in the pipeline
/// </summary>
/// <typeparam name="TResponse">Response type</typeparam>
/// <returns>Awaitable task returning a <typeparamref name="TResponse"/></returns>
public delegate Task<TResponse> RequestHandlerDelegate<TResponse>();

/// <summary>
/// Pipeline behavior to surround the inner handler.
/// Implementations add additional behavior and await the next delegate.
/// </summary>
/// <typeparam name="TRequest">Request type</typeparam>
/// <typeparam name="TResponse">Response type</typeparam>
public interface IPipelineBehavior<in TRequest, TResponse> where TRequest : notnull
{
/// <summary>
/// Pipeline handler. Perform any additional behavior and await the <paramref name="next"/> delegate as necessary
/// </summary>
/// <param name="request">Incoming request</param>
/// <param name="cancellationToken">Cancellation token</param>
/// <param name="next">Awaitable delegate for the next action in the pipeline. Eventually this delegate represents the handler.</param>
/// <returns>Awaitable task returning the <typeparamref name="TResponse"/></returns>
Task<TResponse> Handle(TRequest request, CancellationToken cancellationToken, RequestHandlerDelegate<TResponse> next);
}

&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;所谓请求处理管道,即在传递传递过程对消息进行额外处理的,举个不恰当的例子,大家都见过流水线,一批批产品放在流水线上,然后过一个地方就会被进行相应的处理,比如可口可乐吧,先是装瓶、然后盖盖、然后包装、喷日期、这系列下来最后才成为一个可以出场的产品,大家可以把请求处理管道理解从请求到接收请求处理的过程看作一个流水线,然后管道处理程序就像是装瓶喷日期等操作,虽然不恰当,但是我个人觉得,很形象,在管道中我们可以先校验信息的安全性,以及参数的是否符合规范,熟悉.NetCore的应该可以立马想起来.NET Core中的请求管道和中间件的构建,先构建管道,然后再消息传递过程中去执行管道。

在IPipelineBehavior接口下还实现了有两个类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55

/// <summary>
/// Behavior for executing all <see cref="IRequestPreProcessor{TRequest}"/> instances before handling a request
/// </summary>
/// <typeparam name="TRequest"></typeparam>
/// <typeparam name="TResponse"></typeparam>
public class RequestPreProcessorBehavior<TRequest, TResponse> : IPipelineBehavior<TRequest, TResponse>
where TRequest : notnull
{
private readonly IEnumerable<IRequestPreProcessor<TRequest>> _preProcessors;

public RequestPreProcessorBehavior(IEnumerable<IRequestPreProcessor<TRequest>> preProcessors)
{
_preProcessors = preProcessors;
}

public async Task<TResponse> Handle(TRequest request, CancellationToken cancellationToken, RequestHandlerDelegate<TResponse> next)
{
foreach (var processor in _preProcessors)
{
await processor.Process(request, cancellationToken).ConfigureAwait(false);
}

return await next().ConfigureAwait(false);
}
}

/// <summary>
/// Behavior for executing all <see cref="IRequestPostProcessor{TRequest,TResponse}"/> instances after handling the request
/// </summary>
/// <typeparam name="TRequest">Request type</typeparam>
/// <typeparam name="TResponse">Response type</typeparam>
public class RequestPostProcessorBehavior<TRequest, TResponse> : IPipelineBehavior<TRequest, TResponse>
where TRequest : notnull
{
private readonly IEnumerable<IRequestPostProcessor<TRequest, TResponse>> _postProcessors;

public RequestPostProcessorBehavior(IEnumerable<IRequestPostProcessor<TRequest, TResponse>> postProcessors)
{
_postProcessors = postProcessors;
}

public async Task<TResponse> Handle(TRequest request, CancellationToken cancellationToken, RequestHandlerDelegate<TResponse> next)
{
var response = await next().ConfigureAwait(false);

foreach (var processor in _postProcessors)
{
await processor.Process(request, response, cancellationToken).ConfigureAwait(false);
}

return response;
}
}

这两个类主要是供大家在项目中使用的时候去继承的,一个是用于请求前处理消息,一个是用于请求后处理消息。不过大家也可以直接去继承IPipelineBehavior接口去实现我们自己的逻辑。

话到这里有个问题,我们如何保证其调用的先后顺序呢?我们还是先放放,继续向下

2.3、核心功能实现:

&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 在此之前我们把必要的接口都过了一遍了,而他们那么多全都是为了实现一个功能,消息的发送与通知。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38

public interface IMediator
{
/// <summary>
/// Asynchronously send a request to a single handler
/// </summary>
/// <typeparam name="TResponse">Response type</typeparam>
/// <param name="request">Request object</param>
/// <param name="cancellationToken">Optional cancellation token</param>
/// <returns>A task that represents the send operation. The task result contains the handler response</returns>
Task<TResponse> Send<TResponse>(IRequest<TResponse> request, CancellationToken cancellationToken = default);

/// <summary>
/// Asynchronously send an object request to a single handler via dynamic dispatch
/// </summary>
/// <param name="request">Request object</param>
/// <param name="cancellationToken">Optional cancellation token</param>
/// <returns>A task that represents the send operation. The task result contains the type erased handler response</returns>
Task<object?> Send(object request, CancellationToken cancellationToken = default);

/// <summary>
/// Asynchronously send a notification to multiple handlers
/// </summary>
/// <param name="notification">Notification object</param>
/// <param name="cancellationToken">Optional cancellation token</param>
/// <returns>A task that represents the publish operation.</returns>
Task Publish(object notification, CancellationToken cancellationToken = default);

/// <summary>
/// Asynchronously send a notification to multiple handlers
/// </summary>
/// <param name="notification">Notification object</param>
/// <param name="cancellationToken">Optional cancellation token</param>
/// <returns>A task that represents the publish operation.</returns>
Task Publish<TNotification>(TNotification notification, CancellationToken cancellationToken = default)
where TNotification : INotification;
}

&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;上面的接口,就是整个项目最核心的接口,IMediator接口,很明确的表自己的作用,以及地位,接口里面包含四个方法:

  • Send<TResponse>() //异步发送一个请求到单个处理程序
  • Send() //异步发送一个动态调用类型的请求到单个处理程序
  • Publish() //异步发送动态调用类型的请求到多个处理程序,并且不需要返回值(多个也没法给返回值)
  • Publish<TNotification>() //异步发送泛型的请求到多个处理程序,并且不需要返回值(多个也没法给返回值)

&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;这四个方法就是撑起我们整个类库的核心功能,就是消息传递,并且支持同步及异步的方式进行请求。

在项目的源码中,还有已经实现的MediatR类,但是开发者可以根据自己的业务以及自己的水平,自己去写一个继承接口的实现类,并且实现功能,在这里呢,我们先看人家是怎么写的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105

public class Mediator : IMediator
{
private readonly ServiceFactory _serviceFactory;
private static readonly ConcurrentDictionary<Type, object> _requestHandlers = new ConcurrentDictionary<Type, object>();
private static readonly ConcurrentDictionary<Type, NotificationHandlerWrapper> _notificationHandlers = new ConcurrentDictionary<Type, NotificationHandlerWrapper>();

/// <summary>
/// Initializes a new instance of the <see cref="Mediator"/> class.
/// </summary>
/// <param name="serviceFactory">The single instance factory.</param>
public Mediator(ServiceFactory serviceFactory)
{
_serviceFactory = serviceFactory;
}

public Task<TResponse> Send<TResponse>(IRequest<TResponse> request, CancellationToken cancellationToken = default)
{
if (request == null)
{
throw new ArgumentNullException(nameof(request));
}

var requestType = request.GetType();

var handler = (RequestHandlerWrapper<TResponse>)_requestHandlers.GetOrAdd(requestType,
t => Activator.CreateInstance(typeof(RequestHandlerWrapperImpl<,>).MakeGenericType(requestType, typeof(TResponse))));

return handler.Handle(request, cancellationToken, _serviceFactory);
}

public Task<object?> Send(object request, CancellationToken cancellationToken = default)
{
if (request == null)
{
throw new ArgumentNullException(nameof(request));
}
var requestType = request.GetType();
var requestInterfaceType = requestType
.GetInterfaces()
.FirstOrDefault(i => i.IsGenericType && i.GetGenericTypeDefinition() == typeof(IRequest<>));
var isValidRequest = requestInterfaceType != null;

if (!isValidRequest)
{
throw new ArgumentException($"{nameof(request)} does not implement ${nameof(IRequest)}");
}

var responseType = requestInterfaceType!.GetGenericArguments()[0];
var handler = _requestHandlers.GetOrAdd(requestType,
t => Activator.CreateInstance(typeof(RequestHandlerWrapperImpl<,>).MakeGenericType(requestType, responseType)));

// call via dynamic dispatch to avoid calling through reflection for performance reasons
return ((RequestHandlerBase) handler).Handle(request, cancellationToken, _serviceFactory);
}

public Task Publish<TNotification>(TNotification notification, CancellationToken cancellationToken = default) where TNotification : INotification
{
if (notification == null)
{
throw new ArgumentNullException(nameof(notification));
}

return PublishNotification(notification, cancellationToken);
}

public Task Publish(object notification, CancellationToken cancellationToken = default)
{
if (notification == null)
{
throw new ArgumentNullException(nameof(notification));
}
if (notification is INotification instance)
{
return PublishNotification(instance, cancellationToken);
}

throw new ArgumentException($"{nameof(notification)} does not implement ${nameof(INotification)}");
}

/// <summary>
/// Override in a derived class to control how the tasks are awaited. By default the implementation is a foreach and await of each handler
/// </summary>
/// <param name="allHandlers">Enumerable of tasks representing invoking each notification handler</param>
/// <param name="notification">The notification being published</param>
/// <param name="cancellationToken">The cancellation token</param>
/// <returns>A task representing invoking all handlers</returns>
protected virtual async Task PublishCore(IEnumerable<Func<INotification, CancellationToken, Task>> allHandlers, INotification notification, CancellationToken cancellationToken)
{
foreach (var handler in allHandlers)
{
await handler(notification, cancellationToken).ConfigureAwait(false);
}
}

private Task PublishNotification(INotification notification, CancellationToken cancellationToken = default)
{
var notificationType = notification.GetType();
var handler = _notificationHandlers.GetOrAdd(notificationType,
t => (NotificationHandlerWrapper)Activator.CreateInstance(typeof(NotificationHandlerWrapperImpl<>).MakeGenericType(notificationType)));

return handler.Handle(notification, cancellationToken, _serviceFactory, PublishCore);
}
}

&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;我们注意到,在类的构造函数中,它注入了一个ServiceFactory,这个ServiceFactory是个什么东西,我们来看一下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22

/// <summary>
/// Factory method used to resolve all services. For multiple instances, it will resolve against <see cref="IEnumerable{T}" />
/// </summary>
/// <param name="serviceType">Type of service to resolve</param>
/// <returns>An instance of type <paramref name="serviceType" /></returns>
public delegate object ServiceFactory(Type serviceType);

public static class ServiceFactoryExtensions
{
public static T GetInstance<T>(this ServiceFactory factory)=> (T) factory(typeof(T));

public static IEnumerable<T> GetInstances<T>(this ServiceFactory factory)
{
Type type = typeof(IEnumerable<T>);

var a = (IEnumerable<T>) factory.Invoke(typeof(IEnumerable<T>));

return a;
}
}

&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;ServiceFactory是干什么用的,这次我们看注释,我给翻译了一下:用于解析所有服务的工厂方法。对于多个实例,它将解析为IEnumerable,欸!这就很有意思了,他的意思是什么呢,他是一个工厂,通过IOC也就是依赖注入,当你使用的时候,指定一个类型,他将在IOC容器里面直接给你解析出一个或者多个实例出来,是不是很牛逼,反正我觉得很牛逼,当初这玩意我整整研究了一天才研究明白!搞清楚ServiceFactory的用法以后,我们开始一步步理清这个项目是如何实现功能的,首先我们从Send()开始,我们可以看到,当参数类传入进来以后,首先根据GetType方法得到了参数类型,然后通过反射实例化出了一个继承于RequestHandlerWrapperImpl<,>类型的实例,那么这个RequestHandlerWrapperImpl<,>又是干啥的呢?我们进去看一下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57

internal abstract class RequestHandlerBase
{
public abstract Task<object?> Handle(object request, CancellationToken cancellationToken,ServiceFactory serviceFactory);

protected static THandler GetHandler<THandler>(ServiceFactory factory)
{
THandler handler;

try
{
handler = factory.GetInstance<THandler>();
}
catch (Exception e)
{
throw new InvalidOperationException($"Error constructing handler for request of type {typeof(THandler)}. Register your handlers with the container. See the samples in GitHub for examples.", e);
}

if (handler == null)
{
throw new InvalidOperationException($"Handler was not found for request of type {typeof(THandler)}. Register your handlers with the container. See the samples in GitHub for examples.");
}

return handler;
}
}

internal abstract class RequestHandlerWrapper<TResponse> : RequestHandlerBase
{
public abstract Task<TResponse> Handle(IRequest<TResponse> request, CancellationToken cancellationToken,ServiceFactory serviceFactory);
}

internal class RequestHandlerWrapperImpl<TRequest, TResponse> : RequestHandlerWrapper<TResponse> where TRequest : IRequest<TResponse>
{
public override Task<object?> Handle(object request, CancellationToken cancellationToken, ServiceFactory serviceFactory)
{
return Handle((IRequest<TResponse>)request, cancellationToken, serviceFactory).ContinueWith(t =>
{
if (t.IsFaulted)
{
ExceptionDispatchInfo.Capture(t.Exception.InnerException).Throw();
}
return (object?)t.Result;
}, cancellationToken);
}

public override Task<TResponse> Handle(IRequest<TResponse> request, CancellationToken cancellationToken,ServiceFactory serviceFactory)
{
Task<TResponse> Handler() => GetHandler<IRequestHandler<TRequest, TResponse>>(serviceFactory).Handle((TRequest) request, cancellationToken);

return serviceFactory
.GetInstances<IPipelineBehavior<TRequest, TResponse>>()
.Reverse()
.Aggregate((RequestHandlerDelegate<TResponse>) Handler, (next, pipeline) => () => pipeline.Handle((TRequest)request, cancellationToken, next))();
}
}

&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;又是个典型的包装器模式,话不多说直接看实现,我们可以看到RequestHandlerWrapperImpl<,>类继承于RequestHandlerWrapper<TRespose>类,是包装器的实现类,其中类里面包含一个Handle方法,将参数以及ServiceFactory传入进来以后首先解析出继承管道的所有类,欸!!!这个时候我们回顾一下我们的问题,管道的执行顺序,嘿嘿最骚的来了,
我们看他下面的Reverse()这个方法,是将IEnumerable类型的元素反过来排序,这个时候你就问了,这跟顺序有啥关系?我来告诉你!!NetCore的IOC容器,不管是内置的还是AutoFac,进行注册的时候,服务进入容器的顺序就像压栈一样,先进的被压在下面,后进的在上面,调用顺序也是先进后出的顺序。所以当我们根据依赖注入反射出所有管道实现类以后,反转排序一下,是不是就跟你注册时候写的顺序一毛一样了???这个时候下面的Aggregate()方法,就会按照管道顺序,一个一个的执行,可以看到在Aggregate()方法,还传入了一个请求处理的委托类型Handler,这个委托类型就是管道最后所执行的我们传入的参数的处理类型,看一下Haddler的实现方法,首先是通过GetHandler<THandler>()方法(这个方法在RequestHandlerBase那个抽象类里面,不记得的往上翻)通过IOC获取出对应的实例出来,然后调用实例的Handle()方法进行处理。一对一消息传递的所有的流程全部走完,还有人问通知模式下如何实现的?来我告诉你。。。看源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34

internal abstract class NotificationHandlerWrapper
{
public abstract Task Handle(INotification notification, CancellationToken cancellationToken, ServiceFactory serviceFactory,Func<IEnumerable<Func<INotification, CancellationToken, Task>>, INotification, CancellationToken, Task> publish);
}

internal class NotificationHandlerWrapperImpl<TNotification> : NotificationHandlerWrapper where TNotification : INotification
{
public override Task Handle(INotification notification, CancellationToken cancellationToken, ServiceFactory serviceFactory,Func<IEnumerable<Func<INotification, CancellationToken, Task>>, INotification, CancellationToken, Task> publish)
{
var handlers = serviceFactory.GetInstances<INotificationHandler<TNotification>>()
.Select(x => new Func<INotification, CancellationToken, Task>((theNotification, theToken) => x.Handle((TNotification)theNotification, theToken)));

return publish(handlers, notification, cancellationToken);
}
}

/// <summary>
/// Override in a derived class to control how the tasks are awaited. By default the implementation is a foreach and await of each handler
/// </summary>
/// <param name="allHandlers">Enumerable of tasks representing invoking each notification handler</param>
/// <param name="notification">The notification being published</param>
/// <param name="cancellationToken">The cancellation token</param>
/// <returns>A task representing invoking all handlers</returns>
protected virtual async Task PublishCore(IEnumerable<Func<INotification, CancellationToken, Task>> allHandlers, INotification notification, CancellationToken cancellationToken)
{
foreach (var handler in allHandlers)
{
await handler(notification, cancellationToken).ConfigureAwait(false);
}
}



&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;看到了吧,除了没有走管道,几乎一模一样,除了最后用了个publish();这个publish()就是将查出来的实现类循环执行一遍。。没了!!

&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;好了,至此整个项目完全梳理完了,估计仔仔细细看一遍,不说能帮你完全了解吧,咋说也能在你看源码的时候帮你一点。。。

在这里插入图片描述
在这里插入图片描述


MediatR源码详解
http://blog.renzhichu.top/2021/08/23/事件总线/MediatR源码详解/
作者
多拉多拉曼
发布于
2021年8月23日
许可协议