1. preface
事件总线,即EventBus,是一种解耦模块间通讯的强大工具。在 CodeWF.EventBus 中,我们得以轻松实现CQRS模式,并通过清晰、简洁的接口进行事件订阅与发布。接下来,我们将详细探讨如何使用这个库来处理事件。
CQRS, whose full name is Command Query Responsibility Segregation, is a software architecture pattern that aims to improve system performance, scalability and responsiveness by separating command (write operations) and query (read operations) responsibilities in the system.
CodeWF.EventBus 适用于进程内事件传递(无其他外部依赖),与 MediatR 功能类似。MediatR库侧重于ASP.NET Core设计,且其功能更加强大,CodeWF.EventBus库优势:
- Small and flexible, designed to be used in various template projects such as WPF, Winform, Avalonia UI, ASP.NET Core, etc.
- 支持使用了任何
IOC容器的项目。 - 参考MASA Framework增强事件处理能力,支持一个类定义多个事件处理方法:
2. instructions for use
2.1. Register Event Bus
2.1.1. MS. DI Container
主要是ASP.NET Core程序,比如 MVC、Razor Pages、Blazor Server 等模板程序,请搜索 NuGet 包CodeWF.AspNetCore.EventBus并安装最新版,安装完成后,在Program中添加如下代码:
// ....
// 1、注册事件总线,将标注`EventHandler`特性方法的类采用单例方式注入IOC容器
builder.Services.AddEventBus();
var app = builder.Build();
// ...
// 2、将上面已经注入IOC容器的类取出、关联处理方法到事件总线管理
app.UseEventBus();
// ...
AddEventBus方法会扫描传入的程序集列表,将标注Event特性的类下又标注EventHandler特性方法的类采用单例方式注入 IOC 容器。UseEventBus方法会将上一步注入的类通过 IOC 容器获取到实例,将实例的事件处理方法注册到事件管理队列中去,待收到事件发布时,会从事件管理队列中查找事件处理方法并调用,达到事件通知的功能。
2.1.2. DryIOC Container
如果使用的DryIoc容器,比如 WPF /Avalonia UI中使用了 Prism 框架的DryIoc容器,请搜索 NuGet 包CodeWF.DryIoc.EventBus并安装最新版,安装完成后,在RegisterTypes方法中添加如下代码:
protected override void RegisterTypes(IContainerRegistry containerRegistry)
{
IContainer? container = containerRegistry.GetContainer();
// ...
// Register EventBus
containerRegistry.AddEventBus();
// ...
// Use EventBus
container.UseEventBus();
}
2.1.3. Any IOC container
如果使用了其他IOC容器的项目,请搜索 NuGet 包CodeWF.IOC.EventBus并安装最新版,安装完成后,根据 IOC 容器注册单例、获取服务的 API 不同,做相应修改即可。
上面ASP.NET Core示例注册事件总线可改为:
// ....
// 1、注册事件总线,将标注`EventHandler`特性方法的类采用单例方式注入IOC容器
EventBusExtensions.AddEventBus(
(t1, t2) => builder.Services.AddSingleton(t1, t2),
t => builder.Services.AddSingleton(t),
Assembly.GetExecutingAssembly());
var app = builder.Build();
// ...
// 2、将上面已经注入IOC容器的类取出、关联处理方法到事件总线管理
EventBusExtensions.UseEventBus(t => app.Services.GetRequiredService(t), Assembly.GetExecutingAssembly());
// ...
支持任意IOC容器原理就在AddEventBus和UseEventBus方法:
using CodeWF.EventBus;
using System.Reflection;
namespace CodeWF.IOC.EventBus
{
public static class EventBusExtensions
{
public static void AddEventBus(Action<Type, Type> addSingleton1,
Action<Type> addSingleton2, params Assembly[] assemblies)
{
addSingleton1(typeof(IEventBus), typeof(CodeWF.EventBus.EventBus));
var allAssemblies = assemblies.Concat(new[] { Assembly.GetCallingAssembly() }).ToArray();
CodeWF.EventBus.EventBusExtensions.HandleEventObject(type => addSingleton2(type),
BindingFlags.Instance | BindingFlags.Public | BindingFlags.NonPublic,
allAssemblies);
}
public static void UseEventBus(Func<Type, object> resolveAction, params Assembly[] assemblies)
{
if (!(resolveAction(typeof(IEventBus)) is IEventBus messenger))
{
throw new InvalidOperationException("Please call AddEventBus before calling UseEventBus");
}
var allAssemblies = assemblies.Concat(new[] { Assembly.GetCallingAssembly() }).ToArray();
CodeWF.EventBus.EventBusExtensions.HandleEventObject(
type => messenger.Subscribe(resolveAction(type)),
BindingFlags.Instance | BindingFlags.Public | BindingFlags.NonPublic, allAssemblies);
CodeWF.EventBus.EventBusExtensions.HandleEventObject(type => messenger.Subscribe(type),
BindingFlags.Static | BindingFlags.Public | BindingFlags.NonPublic, allAssemblies);
}
}
}
using System;
using System.Linq;
using System.Reflection;
namespace CodeWF.EventBus
{
public static class EventBusExtensions
{
public static void HandleEventObject(Action<Type> handleRecipient, BindingFlags findHandlerMethodBindingFlags,
Assembly[] assemblies)
{
foreach (var assembly in assemblies)
{
var types = assembly.GetTypes()
.Where(t => t.IsClass
&& !t.IsAbstract
&& t.GetCustomAttributes<EventAttribute>().Any()
&& t.GetMethods(findHandlerMethodBindingFlags)
.Any(m =>
m.GetCustomAttributes<EventHandlerAttribute>().Any()));
foreach (var type in types)
{
handleRecipient(type);
}
}
}
}
}
2.1.4. No IOC containers used
The default WPF, Winform, Avalonia UI, and console programs do not introduce any IOC containers by default. We do not need event service registration operations for such projects.
我们搜索 NuGet 包CodeWF.EventBus并安装最新版,安装完成后功能使用上和使用IOC容器一致,只是欠缺IOC注入自动订阅功能,具体差别请继续往下看。
2.2. defined event
Here we use CQRS to complete the business logic of our program, and in CQRS mode our queries are separated from other business operations. Those who don't know about CQRS can check out this article: learn.microsoft.com/zh-cn/azure/architecture/patterns/cqrs
2.2.1. Define Command
在CQRS模式中,命令代表写操作。定义命令类,这些类继承自Command类
public class CreateProductCommand : Command
{
public string Name { get; set; }
public decimal Price { get; set; }
}
public class CreateProductSuccessCommand : Command
{
public string Name { get; set; }
public decimal Price { get; set; }
}
public class DeleteProductCommand : Command
{
public Guid ProductId { get; set; }
}
2.2.2. Define Query
在CQRS模式中,查询代表读操作。查询需要等待得到回应,适用于请求/响应。使用查询,调用方只需要关心我需要使用ProductQuery、ProductsQuery,而不必操心我需要IProductService、ICategoryService等服务获取查询结果。
定义查询类,继承自Query<T>:
public class ProductQuery : Query<ProductItemDto>
{
public Guid ProductId { get; set; }
public override ProductItemDto Result { get; set; }
}
public class ProductsQuery : Query<List<ProductItemDto>>
{
public string Name { get; set; }
public override List<ProductItemDto> Result { get; set; }
}
Query<T>中T表示查询响应结果类型,在XXXQuery中使用Result属性表示查询发布后得到的结果。
Query继承自Command,带Result属性:
public abstract class Query<TResult> : Command
{
public abstract TResult Result { get; set; }
}
2.3. Subscribe to events
2.3.1. automatically subscribe to
自动订阅只能在使用了IOC容器的程序中使用,比如ASP.NET Core程序。
Generally, event handlers are encapsulated separately into a class, and the code is as follows:
[Event]
public class CommandAndQueryHandler(IEventBus eventBus, IProductService productService)
{
[EventHandler]
private async Task ReceiveCreateProductCommandAsync(CreateProductCommand command)
{
var isAddSuccess = await productService.AddProductAsync(new CreateProductRequest()
{ Name = command.Name, Price = command.Price });
if (isAddSuccess)
{
await eventBus.PublishAsync(new CreateProductSuccessCommand()
{ Name = command.Name, Price = command.Price });
}
else
{
Console.WriteLine("Create product fail");
}
}
[EventHandler(Order = 2)]
private async Task ReceiveCreateProductSuccessCommandSendEmailAsync(CreateProductSuccessCommand command)
{
Console.WriteLine($"Now send email notify create product success, name is = {command.Name}");
await Task.CompletedTask;
}
[EventHandler(Order = 1)]
private async Task ReceiveCreateProductSuccessCommandSendSmsAsync(CreateProductSuccessCommand command)
{
Console.WriteLine($"Now send sms notify create product success, name is = {command.Name}");
await Task.CompletedTask;
}
[EventHandler(Order = 3)]
private void ReceiveCreateProductSuccessCommandCallPhone(CreateProductSuccessCommand command)
{
Console.WriteLine($"Now call phone notify create product success, name is = {command.Name}");
}
[EventHandler]
private async Task ReceiveDeleteProductCommandAsync(DeleteProductCommand command)
{
var isRemoveSuccess = await productService.RemoveProductAsync(command.ProductId);
Console.WriteLine(isRemoveSuccess ? "Remote product success" : "Remote product fail");
}
[EventHandler]
private async Task ReceiveProductQueryAsync(ProductQuery query)
{
var product = await productService.QueryProductAsync(query.ProductId);
query.Result = product;
}
[EventHandler]
private async Task ReceiveAutoProductsQueryAsync(ProductsQuery query)
{
var products = await productService.QueryProductsAsync(query.Name);
query.Result = products;
}
[EventHandler]
private static async Task ReceiveAutoProductsQueryAsync2(ProductsQuery query)
{
Console.WriteLine("Test auto subscribe static method");
}
}
- 类
CommandAndQueryHandler添加了Event特性,在IOC容器注入时标识为可以作为单例注入。 - 标注了
EventHandler特性的方法拥有处理事件的能力,该方法只能有一个事件类型参数;如果方法支持异步,也只支持Task返回值,不能加泛型声明(加了无效);支持静态事件处理方法。
使用 IOC 容器的程序会自动将标注Event特性的类作为单例注入容器,事件总线收到事件通知时自动查找标注EventHandler特性的方法进行调用,达到事件通知的功能。
2.3.2. Manual subscription
对于未标注Event特性的类,可手动注册事件处理程序,如下是未使用 IOC容器时手动注册示例(核心是EventBus.Default使用):
internal class CommandAndQueryHandler
{
internal void ManuSubscribe()
{
EventBus.Default.Subscribe<DeleteProductCommand>(ReceiveDeleteProductCommandAsync);
EventBus.Default.Subscribe<ProductQuery>(ReceiveProductQueryAsync);
EventBus.Default.Subscribe<ProductsQuery>(ReceiveAutoProductsQueryAsync2);
}
public async Task ReceiveDeleteProductCommandAsync(DeleteProductCommand command)
{
}
public async Task ReceiveProductQueryAsync(ProductQuery query)
{
}
private static async Task ReceiveAutoProductsQueryAsync2(ProductsQuery query)
{
}
}
The above registration processing methods are sometimes too verbose and can be simplified:
internal class CommandAndQueryHandler
{
internal CommandAndQueryHandler()
{
EventBus.Default.Subscribe(this);
}
[EventHandler(Order = 2)]
public async Task ReceiveCreateProductSuccessCommandSendEmailAsync(CreateProductSuccessCommand command)
{
}
// ...省略N多事件处理方法,EventBus.Default.Subscribe(this)方法可以自动绑定
}
使用了 IOC容器,可以注入IEventBus服务替换EventBus.Default使用,下如示例代码:
public class EventBusTestViewModel : ViewModelBase
{
private readonly IEventBus _eventBus;
public MessageTestViewModel(IEventBus eventBus)
{
_eventBus = eventBus;
_eventBus.Subscribe(this);
}
[EventHandler]
public async Task ReceiveDeleteProductCommandAsync(DeleteProductCommand command)
{
var isRemoveSuccess = await productService.RemoveProductAsync(command.ProductId);
Console.WriteLine(isRemoveSuccess ? "Remote product success" : "Remote product fail");
}
}
EventBus是IEventBus接口的默认实现,EventBus.Default是单例引用,所有两者使用任选其一。IOC注入时默认将IEventBus和EventBus作为单例注入,所以与两者等价。
手动订阅可以在 WPF 的 XxxViewModel 中使用(上面代码即是),也可以在 IOC 其他生命周期的服务中使用:
public class TimeService : ITimeService
{
private readonly IEventBus _eventBus;
public TimeService(IEventBus eventBus)
{
_eventBus = eventBus;
_eventBus.Subscribe(this);
}
[EventHandler]
public async Task ReceiveDeleteProductCommandAsync(DeleteProductCommand command)
{
var isRemoveSuccess = await productService.RemoveProductAsync(command.ProductId);
Console.WriteLine(isRemoveSuccess ? "Remote product success" : "Remote product fail");
}
}
Manual registration can be used when single injection is not possible or required to supplement special circumstances.
2.4. publish events
发布命令(Command)与发布查询(Query)使用相同的接口,通过IEventBus或EventBus.Default的Publish和PublishAsync方法发布:
_messenger.Publish(this, new DeleteProductCommand { ProductId = id });
var query = new ProductQuery { ProductId = id };
await _messenger.PublishAsync(this, query);
Console.WriteLine($"查询产品ID为{id}的产品结果是:{query.Result}");
在B/S控制器的Action使用发布:
[ApiController]
[Route("[controller]")]
public class EventController : ControllerBase
{
private readonly ILogger<EventController> _logger;
private readonly IEventBus _eventBus;
public EventController(ILogger<EventController> logger, IEventBus eventBus)
{
_logger = logger;
_eventBus = eventBus;
}
[HttpPost("/add")]
public async Task AddAsync([FromBody] CreateProductRequest request)
{
await _eventBus.PublishAsync(new CreateProductCommand { Name = request.Name, Price = request.Price });
}
[HttpDelete("/delete")]
public async Task DeleteAsync([FromQuery] Guid id)
{
await _eventBus.PublishAsync(new DeleteProductCommand { ProductId = id });
}
[HttpGet("/get")]
public async Task<ProductItemDto> GetAsync([FromQuery] Guid id)
{
var query = new ProductQuery { ProductId = id };
await _eventBus.PublishAsync(query);
return query.Result;
}
[HttpGet("/list")]
public async Task<List<ProductItemDto>> ListAsync([FromQuery] string? name)
{
var query = new ProductsQuery { Name = name };
await _eventBus.PublishAsync(query);
return query.Result;
}
}
在WPF/Avalonia UI的XXXViewModel中使用:
public class EventBusTestViewModel : ViewModelBase
{
private readonly IEventBus _eventBus;
public MessageTestViewModel(IEventBus eventBus)
{
_eventBus = eventBus;
}
public async Task ExecuteEventBusAsync()
{
await _eventBus.PublishAsync(this, new TestMessage(nameof(MessageTestViewModel), TestClass.CurrentTime()));
}
}
2.5. Unsubscribe from events
In practice, you may need to ensure that you unsubscribe at the right time (such as when a service is destroyed) to avoid memory leaks:
- 注销指定处理程序:
Messenger.Default.Unsubscribe<CreateProductMessage>(this, ReceiveManuCreateProductMessage) - 注销指定类的所有处理程序:
Messenger.Default.Unsubscribe(this)
3. Core interface description
public interface IEventBus
{
void Subscribe<T>() where T : class;
void Subscribe(Type type);
void Subscribe(object recipient);
void Subscribe<TCommand>(Action<TCommand> action) where TCommand : Command;
void Subscribe<TCommand>(Func<TCommand, Task> asyncAction) where TCommand : Command;
void Unsubscribe<T>() where T : class;
void Unsubscribe(object recipient);
void Unsubscribe<TCommand>(Action<TCommand> action) where TCommand : Command;
void Unsubscribe<TCommand>(Func<TCommand, Task> asyncAction) where TCommand : Command;
void Publish<TCommand>(TCommand command) where TCommand : Command;
Task PublishAsync<TCommand>(TCommand command) where TCommand : Command;
}
Subscribe<T>():订阅类中静态事件处理方法Subscribe(Type type):订阅指定类类型中静态事件处理方法Subscribe(object recipient):订阅指定实例的成员事件处理方法Subscribe<TCommand>(Action<TCommand> action):订阅普通事件处理方法,包括静态事件处理方法Subscribe<TCommand>(Func<TCommand, Task> asyncAction):订阅异步事件处理方法,包括静态异步事件处理方法Unsubscribe<T>():注销类中静态事件处理方法Unsubscribe(object recipient):注销指定实例的成员事件处理方法Unsubscribe<TCommand>(Action<TCommand> action):注销普通事件处理方法,包括静态事件处理方法Unsubscribe<TCommand>(Func<TCommand, Task> asyncAction):注销异步事件处理方法,包括静态异步事件处理方法Publish<TCommand>(TCommand command):同步发布命令(Command)或查询(Query)PublishAsync<TCommand>(TCommand command):异步发布命令(Command)或查询(Query)
4. summary
CodeWF.EventBus提供了一个小巧灵活的事件总线实现,支持CQRS模式,并适用于各种项目模板,如 Avalonia UI、WPF、WinForms、ASP.NET Core 等。通过简单的订阅和发布操作,你可以轻松实现模块间的解耦和通讯。通过有序的事件处理,确保事件得到妥善处理。
事件总线具体实现请查看CodeWF.EventBus源码: https://github.com/dotnet9/CodeWF.EventBus ,具体使用可参考:
- 单元测试:CodeWF.EventBus.Tests
- AvaloniaUI + Prism:CodeWF.EventBus
- Web API:WebAPIDemo 、CodeWF
Develop reference open source projects:
希望本文的指南能帮助你更好地使用CodeWF.EventBus来处理你的应用程序中的事件。