1. 前言
事件匯流排,即 EventBus,是一種解耦模組間通訊的強大工具。在 CodeWF.EventBus 中,我們得以輕鬆實現 CQRS 模式,並透過清晰、簡潔的介面進行事件訂閱與發佈。接下來,我們將詳細探討如何使用這個函式庫來處理事件。
CQRS,全稱 Command Query Responsibility Segregation,是一種軟體架構模式,旨在透過將系統中的命令(寫操作)和查詢(讀操作)職責進行分離,來提高系統的效能、可擴充性與回應性。
CodeWF.EventBus 適用於行程內事件傳遞(無其他外部依賴),與 MediatR 功能類似。MediatR 函式庫側重於 ASP.NET Core 設計,且其功能更加強大,CodeWF.EventBus 函式庫優勢:
- 小巧靈活,設計可在各種範本專案中使用,如 WPF、Winform、Avalonia UI、ASP.NET Core 等。
- 支援使用了任何
IOC容器的專案。 - 參考 MASA Framework 增強事件處理能力,支援一個類別定義多個事件處理方法:
2. 使用說明
2.1. 註冊事件匯流排
2.1.1. MS.DI 容器
主要是 ASP.NET Core 程式,例如 MVC、Razor Pages、Blazor Server 等範本程式,請搜尋 NuGet 套件 CodeWF.AspNetCore.EventBus 並安裝最新版,安裝完成後,在 Program 中加入下列程式碼:
// ....
// 1、註冊事件匯流排,將標註 `EventHandler` 特性方法的類別採用單例方式注入 IOC 容器
builder.Services.AddEventBus();
var app = builder.Build();
// ...
// 2、將上面已經注入 IOC 容器的類別取出、關聯處理方法到事件匯流排管理
app.UseEventBus();
// ...
AddEventBus方法會掃描傳入的組件清單,將標註Event特性的類別下又標註EventHandler特性方法的類別採用單例方式注入 IOC 容器。UseEventBus方法會將上一步注入的類別透過 IOC 容器取得實例,將實例的事件處理方法註冊到事件管理佇列中去,待收到事件發佈時,會從事件管理佇列中尋找事件處理方法並呼叫,達到事件通知的功能。
2.1.2. DryIOC 容器
如果使用的 DryIoc 容器,例如 WPF / Avalonia UI 中使用了 Prism 框架的 DryIoc 容器,請搜尋 NuGet 套件 CodeWF.DryIoc.EventBus 並安裝最新版,安裝完成後,在 RegisterTypes 方法中加入下列程式碼:
protected override void RegisterTypes(IContainerRegistry containerRegistry)
{
IContainer? container = containerRegistry.GetContainer();
// ...
// Register EventBus
containerRegistry.AddEventBus();
// ...
// Use EventBus
container.UseEventBus();
}
2.1.3. 任意 IOC 容器
如果使用了其他 IOC 容器的專案,請搜尋 NuGet 套件 CodeWF.IOC.EventBus 並安裝最新版,安裝完成後,根據 IOC 容器註冊單例、取得服務的 API 不同,做相應修改即可。
上面 ASP.NET Core 範例註冊事件匯流排可改為:
// ....
// 1、註冊事件匯流排,將標註 `EventHandler` 特性方法的類別採用單例方式注入 IOC 容器
EventBusExtensions.AddEventBus(
(t1, t2) => builder.Services.AddSingleton(t1, t2),
t => builder.Services.AddSingleton(t),
Assembly.GetExecutingAssembly());
var app = builder.Build();
// ...
// 2、將上面已經注入 IOC 容器的類別取出、關聯處理方法到事件匯流排管理
EventBusExtensions.UseEventBus(t => app.Services.GetRequiredService(t), Assembly.GetExecutingAssembly());
// ...
支援任意 IOC 容器原理就在 AddEventBus 和 UseEventBus 方法:
using CodeWF.EventBus;
using System.Reflection;
namespace CodeWF.IOC.EventBus
{
public static class EventBusExtensions
{
public static void AddEventBus(Action<Type, Type> addSingleton1,
Action<Type> addSingleton2, params Assembly[] assemblies)
{
addSingleton1(typeof(IEventBus), typeof(CodeWF.EventBus.EventBus));
var allAssemblies = assemblies.Concat(new[] { Assembly.GetCallingAssembly() }).ToArray();
CodeWF.EventBus.EventBusExtensions.HandleEventObject(type => addSingleton2(type),
BindingFlags.Instance | BindingFlags.Public | BindingFlags.NonPublic,
allAssemblies);
}
public static void UseEventBus(Func<Type, object> resolveAction, params Assembly[] assemblies)
{
if (!(resolveAction(typeof(IEventBus)) is IEventBus messenger))
{
throw new InvalidOperationException("Please call AddEventBus before calling UseEventBus");
}
var allAssemblies = assemblies.Concat(new[] { Assembly.GetCallingAssembly() }).ToArray();
CodeWF.EventBus.EventBusExtensions.HandleEventObject(
type => messenger.Subscribe(resolveAction(type)),
BindingFlags.Instance | BindingFlags.Public | BindingFlags.NonPublic, allAssemblies);
CodeWF.EventBus.EventBusExtensions.HandleEventObject(type => messenger.Subscribe(type),
BindingFlags.Static | BindingFlags.Public | BindingFlags.NonPublic, allAssemblies);
}
}
}
using System;
using System.Linq;
using System.Reflection;
namespace CodeWF.EventBus
{
public static class EventBusExtensions
{
public static void HandleEventObject(Action<Type> handleRecipient, BindingFlags findHandlerMethodBindingFlags,
Assembly[] assemblies)
{
foreach (var assembly in assemblies)
{
var types = assembly.GetTypes()
.Where(t => t.IsClass
&& !t.IsAbstract
&& t.GetCustomAttributes<EventAttribute>().Any()
&& t.GetMethods(findHandlerMethodBindingFlags)
.Any(m =>
m.GetCustomAttributes<EventHandlerAttribute>().Any()));
foreach (var type in types)
{
handleRecipient(type);
}
}
}
}
}
2.1.4. 未使用任何 IOC 容器
預設的 WPF、Winform、Avalonia UI、控制台程式預設未引入任何 IOC 容器,這類專案我們可以不需要事件服務註冊操作。
我們搜尋 NuGet 套件 CodeWF.EventBus 並安裝最新版,安裝完成後功能使用上和使用 IOC 容器一致,只是欠缺 IOC 注入自動訂閱功能,具體差別請繼續往下看。
2.2. 定義事件
在這裡我們使用 CQRS 來完成我們程式業務邏輯,在 CQRS 模式中我們的查詢和其他業務操作是分開的。不瞭解 CQRS 的可以看看這篇文章:https://learn.microsoft.com/zh-tw/azure/architecture/patterns/cqrs
2.2.1. 定義命令(Command)
在 CQRS 模式中,命令代表寫操作。定義命令類別,這些類別繼承自 Command 類別
public class CreateProductCommand : Command
{
public string Name { get; set; }
public decimal Price { get; set; }
}
public class CreateProductSuccessCommand : Command
{
public string Name { get; set; }
public decimal Price { get; set; }
}
public class DeleteProductCommand : Command
{
public Guid ProductId { get; set; }
}
2.2.2. 定義查詢(Query)
在 CQRS 模式中,查詢代表讀操作。查詢需要等待得到回應,適用於請求/回應。使用查詢,呼叫方只需要關心我需要使用 ProductQuery、ProductsQuery,而不必操心我需要 IProductService、ICategoryService 等服務取得查詢結果。
定義查詢類別,繼承自 Query<T>:
public class ProductQuery : Query<ProductItemDto>
{
public Guid ProductId { get; set; }
public override ProductItemDto Result { get; set; }
}
public class ProductsQuery : Query<List<ProductItemDto>>
{
public string Name { get; set; }
public override List<ProductItemDto> Result { get; set; }
}
Query<T> 中 T 表示查詢回應結果類型,在 XXXQuery 中使用 Result 屬性表示查詢發佈後得到的結果。
Query 繼承自 Command,帶 Result 屬性:
public abstract class Query<TResult> : Command
{
public abstract TResult Result { get; set; }
}
2.3. 訂閱事件(Subscribe)
2.3.1. 自動訂閱
自動訂閱 只能在使用了 IOC 容器的程式中使用,例如 ASP.NET Core 程式。
一般將事件處理程式單獨封裝到一個類別中,程式碼如下:
[Event]
public class CommandAndQueryHandler(IEventBus eventBus, IProductService productService)
{
[EventHandler]
private async Task ReceiveCreateProductCommandAsync(CreateProductCommand command)
{
var isAddSuccess = await productService.AddProductAsync(new CreateProductRequest()
{ Name = command.Name, Price = command.Price });
if (isAddSuccess)
{
await eventBus.PublishAsync(new CreateProductSuccessCommand()
{ Name = command.Name, Price = command.Price });
}
else
{
Console.WriteLine("Create product fail");
}
}
[EventHandler(Order = 2)]
private async Task ReceiveCreateProductSuccessCommandSendEmailAsync(CreateProductSuccessCommand command)
{
Console.WriteLine($"Now send email notify create product success, name is = {command.Name}");
await Task.CompletedTask;
}
[EventHandler(Order = 1)]
private async Task ReceiveCreateProductSuccessCommandSendSmsAsync(CreateProductSuccessCommand command)
{
Console.WriteLine($"Now send sms notify create product success, name is = {command.Name}");
await Task.CompletedTask;
}
[EventHandler(Order = 3)]
private void ReceiveCreateProductSuccessCommandCallPhone(CreateProductSuccessCommand command)
{
Console.WriteLine($"Now call phone notify create product success, name is = {command.Name}");
}
[EventHandler]
private async Task ReceiveDeleteProductCommandAsync(DeleteProductCommand command)
{
var isRemoveSuccess = await productService.RemoveProductAsync(command.ProductId);
Console.WriteLine(isRemoveSuccess ? "Remote product success" : "Remote product fail");
}
[EventHandler]
private async Task ReceiveProductQueryAsync(ProductQuery query)
{
var product = await productService.QueryProductAsync(query.ProductId);
query.Result = product;
}
[EventHandler]
private async Task ReceiveAutoProductsQueryAsync(ProductsQuery query)
{
var products = await productService.QueryProductsAsync(query.Name);
query.Result = products;
}
[EventHandler]
private static async Task ReceiveAutoProductsQueryAsync2(ProductsQuery query)
{
Console.WriteLine("Test auto subscribe static method");
}
}
- 類別
CommandAndQueryHandler加入了Event特性,在IOC容器注入時標識為可以作為單例注入。 - 標註了
EventHandler特性的方法擁有處理事件的能力,該方法只能有一個事件類型參數;如果方法支援非同步,也只支援Task回傳值,不能加泛型宣告(加了無效);支援靜態事件處理方法。
使用 IOC 容器的程式會自動將標註 Event 特性的類別作為單例注入容器,事件匯流排收到事件通知時自動尋找標註 EventHandler 特性的方法進行呼叫,達到事件通知的功能。
2.3.2. 手動訂閱
對於未標註 Event 特性的類別,可手動註冊事件處理程式,如下是未使用 IOC 容器時手動註冊範例(核心是 EventBus.Default 使用):
internal class CommandAndQueryHandler
{
internal void ManuSubscribe()
{
EventBus.Default.Subscribe<DeleteProductCommand>(ReceiveDeleteProductCommandAsync);
EventBus.Default.Subscribe<ProductQuery>(ReceiveProductQueryAsync);
EventBus.Default.Subscribe<ProductsQuery>(ReceiveAutoProductsQueryAsync2);
}
public async Task ReceiveDeleteProductCommandAsync(DeleteProductCommand command)
{
}
public async Task ReceiveProductQueryAsync(ProductQuery query)
{
}
private static async Task ReceiveAutoProductsQueryAsync2(ProductsQuery query)
{
}
}
上面挨個註冊處理方法有時會過於囉嗦,可以簡化:
internal class CommandAndQueryHandler
{
internal CommandAndQueryHandler()
{
EventBus.Default.Subscribe(this);
}
[EventHandler(Order = 2)]
public async Task ReceiveCreateProductSuccessCommandSendEmailAsync(CreateProductSuccessCommand command)
{
}
// ...省略N多事件處理方法,EventBus.Default.Subscribe(this)方法可以自動綁定
}
使用了 IOC 容器,可以注入 IEventBus 服務替換 EventBus.Default 使用,如下範例程式碼:
public class EventBusTestViewModel : ViewModelBase
{
private readonly IEventBus _eventBus;
public MessageTestViewModel(IEventBus eventBus)
{
_eventBus = eventBus;
_eventBus.Subscribe(this);
}
[EventHandler]
public async Task ReceiveDeleteProductCommandAsync(DeleteProductCommand command)
{
var isRemoveSuccess = await productService.RemoveProductAsync(command.ProductId);
Console.WriteLine(isRemoveSuccess ? "Remote product success" : "Remote product fail");
}
}
EventBus 是 IEventBus 介面的預設實作,EventBus.Default 是單例參考,兩者使用任選其一。IOC 注入時預設將 IEventBus 和 EventBus 作為單例注入,所以與兩者等價。
手動訂閱可以在 WPF 的 XxxViewModel 中使用(上面程式碼即是),也可以在 IOC 其他生命週期的服務中使用:
public class TimeService : ITimeService
{
private readonly IEventBus _eventBus;
public TimeService(IEventBus eventBus)
{
_eventBus = eventBus;
_eventBus.Subscribe(this);
}
[EventHandler]
public async Task ReceiveDeleteProductCommandAsync(DeleteProductCommand command)
{
var isRemoveSuccess = await productService.RemoveProductAsync(command.ProductId);
Console.WriteLine(isRemoveSuccess ? "Remote product success" : "Remote product fail");
}
}
手動註冊可運用在無法或不需要單例注入的情況使用,補充特殊情況。
2.4. 發佈事件
發佈命令(Command)與發佈查詢(Query)使用相同的介面,透過 IEventBus 或 EventBus.Default 的 Publish 和 PublishAsync 方法發佈:
_messenger.Publish(this, new DeleteProductCommand { ProductId = id });
var query = new ProductQuery { ProductId = id };
await _messenger.PublishAsync(this, query);
Console.WriteLine($"查詢產品ID為{id}的產品結果是:{query.Result}");
在 B/S 控制器的 Action 使用發佈:
[ApiController]
[Route("[controller]")]
public class EventController : ControllerBase
{
private readonly ILogger<EventController> _logger;
private readonly IEventBus _eventBus;
public EventController(ILogger<EventController> logger, IEventBus eventBus)
{
_logger = logger;
_eventBus = eventBus;
}
[HttpPost("/add")]
public async Task AddAsync([FromBody] CreateProductRequest request)
{
await _eventBus.PublishAsync(new CreateProductCommand { Name = request.Name, Price = request.Price });
}
[HttpDelete("/delete")]
public async Task DeleteAsync([FromQuery] Guid id)
{
await _eventBus.PublishAsync(new DeleteProductCommand { ProductId = id });
}
[HttpGet("/get")]
public async Task<ProductItemDto> GetAsync([FromQuery] Guid id)
{
var query = new ProductQuery { ProductId = id };
await _eventBus.PublishAsync(query);
return query.Result;
}
[HttpGet("/list")]
public async Task<List<ProductItemDto>> ListAsync([FromQuery] string? name)
{
var query = new ProductsQuery { Name = name };
await _eventBus.PublishAsync(query);
return query.Result;
}
}
在 WPF/Avalonia UI 的 XXXViewModel 中使用:
public class EventBusTestViewModel : ViewModelBase
{
private readonly IEventBus _eventBus;
public MessageTestViewModel(IEventBus eventBus)
{
_eventBus = eventBus;
}
public async Task ExecuteEventBusAsync()
{
await _eventBus.PublishAsync(this, new TestMessage(nameof(MessageTestViewModel), TestClass.CurrentTime()));
}
}
2.5. 取消訂閱事件
在實際應用中,你可能需要確保在適當的時機(如服務銷毀時)取消訂閱,以避免記憶體洩漏:
- 登出指定處理程式:
Messenger.Default.Unsubscribe<CreateProductMessage>(this, ReceiveManuCreateProductMessage) - 登出指定類別的所有處理程式:
Messenger.Default.Unsubscribe(this)
3. 核心介面說明
public interface IEventBus
{
void Subscribe<T>() where T : class;
void Subscribe(Type type);
void Subscribe(object recipient);
void Subscribe<TCommand>(Action<TCommand> action) where TCommand : Command;
void Subscribe<TCommand>(Func<TCommand, Task> asyncAction) where TCommand : Command;
void Unsubscribe<T>() where T : class;
void Unsubscribe(object recipient);
void Unsubscribe<TCommand>(Action<TCommand> action) where TCommand : Command;
void Unsubscribe<TCommand>(Func<TCommand, Task> asyncAction) where TCommand : Command;
void Publish<TCommand>(TCommand command) where TCommand : Command;
Task PublishAsync<TCommand>(TCommand command) where TCommand : Command;
}
Subscribe<T>():訂閱類別中靜態事件處理方法Subscribe(Type type):訂閱指定類別類型中靜態事件處理方法Subscribe(object recipient):訂閱指定實例的成員事件處理方法Subscribe<TCommand>(Action<TCommand> action):訂閱普通事件處理方法,包括靜態事件處理方法Subscribe<TCommand>(Func<TCommand, Task> asyncAction):訂閱非同步事件處理方法,包括靜態非同步事件處理方法Unsubscribe<T>():登出類別中靜態事件處理方法Unsubscribe(object recipient):登出指定實例的成員事件處理方法Unsubscribe<TCommand>(Action<TCommand> action):登出普通事件處理方法,包括靜態事件處理方法Unsubscribe<TCommand>(Func<TCommand, Task> asyncAction):登出非同步事件處理方法,包括靜態非同步事件處理方法Publish<TCommand>(TCommand command):同步發佈命令(Command)或查詢(Query)PublishAsync<TCommand>(TCommand command):非同步發佈命令(Command)或查詢(Query)
4. 總結
CodeWF.EventBus 提供了一個小巧靈活的事件匯流排實作,支援 CQRS 模式,並適用於各種專案範本,如 Avalonia UI、WPF、WinForms、ASP.NET Core 等。透過簡單的訂閱和發佈操作,你可以輕鬆實現模組間的解耦與通訊。透過有序的事件處理,確保事件得到妥善處理。
事件匯流排具體實作請查看 CodeWF.EventBus 原始碼: https://github.com/dotnet9/CodeWF.EventBus ,具體使用可參考:
- 單元測試:CodeWF.EventBus.Tests
- AvaloniaUI + Prism:CodeWF.EventBus
- Web API:WebAPIDemo 、CodeWF
開發參考開源專案:
希望本文的指南能幫助你更好地使用 CodeWF.EventBus 來處理你的應用程式中的事件。