CodeWF.EventBus: Lightweight event bus for smoother communication

CodeWF.EventBus: Lightweight event bus for smoother communication

CodeWF.EventBus, a flexible event bus library that enables decoupling communication between modules. Supports multiple. NET project types, such as WPF, WinForms, ASP.NET Core, etc. Adopt concise design to easily implement command publishing and subscribing, request and response. Ensure that incidents are properly handled through orderly incident handling. Streamline your code and improve system maintainability.

最后更新 6/20/2024 1:02 PM
沙漠尽头的狼
预计阅读 14 分钟
分类
.NET
标签
.NET C# ASP.NET Core WPF Winform

1. preface

事件总线,即EventBus,是一种解耦模块间通讯的强大工具。在 CodeWF.EventBus 中,我们得以轻松实现CQRS模式,并通过清晰、简洁的接口进行事件订阅与发布。接下来,我们将详细探讨如何使用这个库来处理事件。

CQRS, whose full name is Command Query Responsibility Segregation, is a software architecture pattern that aims to improve system performance, scalability and responsiveness by separating command (write operations) and query (read operations) responsibilities in the system.

CodeWF.EventBus 适用于进程内事件传递(无其他外部依赖),与 MediatR 功能类似。MediatR库侧重于ASP.NET Core设计,且其功能更加强大,CodeWF.EventBus库优势:

  1. Small and flexible, designed to be used in various template projects such as WPF, Winform, Avalonia UI, ASP.NET Core, etc.
  2. 支持使用了任何 IOC 容器的项目。
  3. 参考MASA Framework增强事件处理能力,支持一个类定义多个事件处理方法:

2. instructions for use

2.1. Register Event Bus

2.1.1. MS. DI Container

主要是ASP.NET Core程序,比如 MVC、Razor Pages、Blazor Server 等模板程序,请搜索 NuGet 包CodeWF.AspNetCore.EventBus并安装最新版,安装完成后,在Program中添加如下代码:

// ....

// 1、注册事件总线,将标注`EventHandler`特性方法的类采用单例方式注入IOC容器
builder.Services.AddEventBus();

var app = builder.Build();

// ...

// 2、将上面已经注入IOC容器的类取出、关联处理方法到事件总线管理
app.UseEventBus();

// ...
  • AddEventBus方法会扫描传入的程序集列表,将标注Event特性的类下又标注EventHandler特性方法的类采用单例方式注入 IOC 容器。
  • UseEventBus方法会将上一步注入的类通过 IOC 容器获取到实例,将实例的事件处理方法注册到事件管理队列中去,待收到事件发布时,会从事件管理队列中查找事件处理方法并调用,达到事件通知的功能。

2.1.2. DryIOC Container

如果使用的DryIoc容器,比如 WPF /Avalonia UI中使用了 Prism 框架的DryIoc容器,请搜索 NuGet 包CodeWF.DryIoc.EventBus并安装最新版,安装完成后,在RegisterTypes方法中添加如下代码:

protected override void RegisterTypes(IContainerRegistry containerRegistry)
{
    IContainer? container = containerRegistry.GetContainer();

    // ...

    // Register EventBus
    containerRegistry.AddEventBus();

    // ...

    // Use EventBus
    container.UseEventBus();
}

2.1.3. Any IOC container

如果使用了其他IOC容器的项目,请搜索 NuGet 包CodeWF.IOC.EventBus并安装最新版,安装完成后,根据 IOC 容器注册单例、获取服务的 API 不同,做相应修改即可。

上面ASP.NET Core示例注册事件总线可改为:

// ....

// 1、注册事件总线,将标注`EventHandler`特性方法的类采用单例方式注入IOC容器
EventBusExtensions.AddEventBus(
    (t1, t2) => builder.Services.AddSingleton(t1, t2),
    t => builder.Services.AddSingleton(t),
    Assembly.GetExecutingAssembly());

var app = builder.Build();

// ...

// 2、将上面已经注入IOC容器的类取出、关联处理方法到事件总线管理
EventBusExtensions.UseEventBus(t => app.Services.GetRequiredService(t), Assembly.GetExecutingAssembly());

// ...

支持任意IOC容器原理就在AddEventBusUseEventBus方法:

using CodeWF.EventBus;
using System.Reflection;

namespace CodeWF.IOC.EventBus
{
    public static class EventBusExtensions
    {
        public static void AddEventBus(Action<Type, Type> addSingleton1,
            Action<Type> addSingleton2, params Assembly[] assemblies)
        {
            addSingleton1(typeof(IEventBus), typeof(CodeWF.EventBus.EventBus));

            var allAssemblies = assemblies.Concat(new[] { Assembly.GetCallingAssembly() }).ToArray();

            CodeWF.EventBus.EventBusExtensions.HandleEventObject(type => addSingleton2(type),
                BindingFlags.Instance | BindingFlags.Public | BindingFlags.NonPublic,
                allAssemblies);
        }

        public static void UseEventBus(Func<Type, object> resolveAction, params Assembly[] assemblies)
        {
            if (!(resolveAction(typeof(IEventBus)) is IEventBus messenger))
            {
                throw new InvalidOperationException("Please call AddEventBus before calling UseEventBus");
            }

            var allAssemblies = assemblies.Concat(new[] { Assembly.GetCallingAssembly() }).ToArray();

            CodeWF.EventBus.EventBusExtensions.HandleEventObject(
                type => messenger.Subscribe(resolveAction(type)),
                BindingFlags.Instance | BindingFlags.Public | BindingFlags.NonPublic, allAssemblies);

            CodeWF.EventBus.EventBusExtensions.HandleEventObject(type => messenger.Subscribe(type),
                BindingFlags.Static | BindingFlags.Public | BindingFlags.NonPublic, allAssemblies);
        }
    }
}
using System;
using System.Linq;
using System.Reflection;

namespace CodeWF.EventBus
{
    public static class EventBusExtensions
    {
        public static void HandleEventObject(Action<Type> handleRecipient, BindingFlags findHandlerMethodBindingFlags,
            Assembly[] assemblies)
        {
            foreach (var assembly in assemblies)
            {
                var types = assembly.GetTypes()
                    .Where(t => t.IsClass
                                && !t.IsAbstract
                                && t.GetCustomAttributes<EventAttribute>().Any()
                                && t.GetMethods(findHandlerMethodBindingFlags)
                                    .Any(m =>
                                        m.GetCustomAttributes<EventHandlerAttribute>().Any()));

                foreach (var type in types)
                {
                    handleRecipient(type);
                }
            }
        }
    }
}

2.1.4. No IOC containers used

The default WPF, Winform, Avalonia UI, and console programs do not introduce any IOC containers by default. We do not need event service registration operations for such projects.

我们搜索 NuGet 包CodeWF.EventBus并安装最新版,安装完成后功能使用上和使用IOC容器一致,只是欠缺IOC注入自动订阅功能,具体差别请继续往下看。

2.2. defined event

Here we use CQRS to complete the business logic of our program, and in CQRS mode our queries are separated from other business operations. Those who don't know about CQRS can check out this article: learn.microsoft.com/zh-cn/azure/architecture/patterns/cqrs

2.2.1. Define Command

在CQRS模式中,命令代表写操作。定义命令类,这些类继承自Command

public class CreateProductCommand : Command
{
    public string Name { get; set; }
    public decimal Price { get; set; }
}
public class CreateProductSuccessCommand : Command
{
    public string Name { get; set; }
    public decimal Price { get; set; }
}
public class DeleteProductCommand : Command
{
    public Guid ProductId { get; set; }
}

2.2.2. Define Query

在CQRS模式中,查询代表读操作。查询需要等待得到回应,适用于请求/响应。使用查询,调用方只需要关心我需要使用ProductQueryProductsQuery,而不必操心我需要IProductServiceICategoryService等服务获取查询结果。

定义查询类,继承自Query<T>:

public class ProductQuery : Query<ProductItemDto>
{
    public Guid ProductId { get; set; }
    public override ProductItemDto Result { get; set; }
}
public class ProductsQuery : Query<List<ProductItemDto>>
{
    public string Name { get; set; }
    public override List<ProductItemDto> Result { get; set; }
}

Query<T>中T表示查询响应结果类型,在XXXQuery中使用Result属性表示查询发布后得到的结果。

Query继承自Command,带Result属性:

public abstract class Query<TResult> : Command
{
    public abstract TResult Result { get; set; }
}

2.3. Subscribe to events

2.3.1. automatically subscribe to

自动订阅只能在使用了IOC容器的程序中使用,比如ASP.NET Core程序。

Generally, event handlers are encapsulated separately into a class, and the code is as follows:

[Event]
public class CommandAndQueryHandler(IEventBus eventBus, IProductService productService)
{
    [EventHandler]
    private async Task ReceiveCreateProductCommandAsync(CreateProductCommand command)
    {
        var isAddSuccess = await productService.AddProductAsync(new CreateProductRequest()
            { Name = command.Name, Price = command.Price });
        if (isAddSuccess)
        {
            await eventBus.PublishAsync(new CreateProductSuccessCommand()
                { Name = command.Name, Price = command.Price });
        }
        else
        {
            Console.WriteLine("Create product fail");
        }
    }

    [EventHandler(Order = 2)]
    private async Task ReceiveCreateProductSuccessCommandSendEmailAsync(CreateProductSuccessCommand command)
    {
        Console.WriteLine($"Now send email notify create product success, name is = {command.Name}");
        await Task.CompletedTask;
    }

    [EventHandler(Order = 1)]
    private async Task ReceiveCreateProductSuccessCommandSendSmsAsync(CreateProductSuccessCommand command)
    {
        Console.WriteLine($"Now send sms notify create product success, name is = {command.Name}");
        await Task.CompletedTask;
    }

    [EventHandler(Order = 3)]
    private void ReceiveCreateProductSuccessCommandCallPhone(CreateProductSuccessCommand command)
    {
        Console.WriteLine($"Now call phone notify create product success, name is = {command.Name}");
    }

    [EventHandler]
    private async Task ReceiveDeleteProductCommandAsync(DeleteProductCommand command)
    {
        var isRemoveSuccess = await productService.RemoveProductAsync(command.ProductId);
        Console.WriteLine(isRemoveSuccess ? "Remote product success" : "Remote product fail");
    }

    [EventHandler]
    private async Task ReceiveProductQueryAsync(ProductQuery query)
    {
        var product = await productService.QueryProductAsync(query.ProductId);
        query.Result = product;
    }

    [EventHandler]
    private async Task ReceiveAutoProductsQueryAsync(ProductsQuery query)
    {
        var products = await productService.QueryProductsAsync(query.Name);
        query.Result = products;
    }

    [EventHandler]
    private static async Task ReceiveAutoProductsQueryAsync2(ProductsQuery query)
    {
        Console.WriteLine("Test auto subscribe static method");
    }
}
  • CommandAndQueryHandler添加了Event特性,在 IOC 容器注入时标识为可以作为单例注入。
  • 标注了EventHandler特性的方法拥有处理事件的能力,该方法只能有一个事件类型参数;如果方法支持异步,也只支持Task返回值,不能加泛型声明(加了无效);支持静态事件处理方法。

使用 IOC 容器的程序会自动将标注Event特性的类作为单例注入容器,事件总线收到事件通知时自动查找标注EventHandler特性的方法进行调用,达到事件通知的功能。

2.3.2. Manual subscription

对于未标注Event特性的类,可手动注册事件处理程序,如下是未使用 IOC容器时手动注册示例(核心是EventBus.Default使用):

internal class CommandAndQueryHandler
{
    internal void ManuSubscribe()
    {
        EventBus.Default.Subscribe<DeleteProductCommand>(ReceiveDeleteProductCommandAsync);
        EventBus.Default.Subscribe<ProductQuery>(ReceiveProductQueryAsync);
        EventBus.Default.Subscribe<ProductsQuery>(ReceiveAutoProductsQueryAsync2);
    }

    public async Task ReceiveDeleteProductCommandAsync(DeleteProductCommand command)
    {
    }

    public async Task ReceiveProductQueryAsync(ProductQuery query)
    {
    }

    private static async Task ReceiveAutoProductsQueryAsync2(ProductsQuery query)
    {
    }
}

The above registration processing methods are sometimes too verbose and can be simplified:

internal class CommandAndQueryHandler
{
    internal CommandAndQueryHandler()
    {
        EventBus.Default.Subscribe(this);
    }

    [EventHandler(Order = 2)]
    public async Task ReceiveCreateProductSuccessCommandSendEmailAsync(CreateProductSuccessCommand command)
    {
    }

    // ...省略N多事件处理方法,EventBus.Default.Subscribe(this)方法可以自动绑定
}

使用了 IOC容器,可以注入IEventBus服务替换EventBus.Default使用,下如示例代码:

public class EventBusTestViewModel : ViewModelBase
{
    private readonly IEventBus _eventBus;

    public MessageTestViewModel(IEventBus eventBus)
    {
        _eventBus = eventBus;
        _eventBus.Subscribe(this);
    }
    
    [EventHandler]
    public async Task ReceiveDeleteProductCommandAsync(DeleteProductCommand command)
    {
        var isRemoveSuccess = await productService.RemoveProductAsync(command.ProductId);
        Console.WriteLine(isRemoveSuccess ? "Remote product success" : "Remote product fail");
    }
}

EventBusIEventBus接口的默认实现,EventBus.Default是单例引用,所有两者使用任选其一。IOC注入时默认将IEventBusEventBus作为单例注入,所以与两者等价。

手动订阅可以在 WPF 的 XxxViewModel 中使用(上面代码即是),也可以在 IOC 其他生命周期的服务中使用:

public class TimeService : ITimeService
{
    private readonly IEventBus _eventBus;

    public TimeService(IEventBus eventBus)
    {
        _eventBus = eventBus;
        _eventBus.Subscribe(this);
    }
    
	[EventHandler]
    public async Task ReceiveDeleteProductCommandAsync(DeleteProductCommand command)
    {
        var isRemoveSuccess = await productService.RemoveProductAsync(command.ProductId);
        Console.WriteLine(isRemoveSuccess ? "Remote product success" : "Remote product fail");
    }
}

Manual registration can be used when single injection is not possible or required to supplement special circumstances.

2.4. publish events

发布命令(Command)与发布查询(Query)使用相同的接口,通过IEventBusEventBus.DefaultPublishPublishAsync方法发布:

_messenger.Publish(this, new DeleteProductCommand { ProductId = id });
var query = new ProductQuery { ProductId = id };
await _messenger.PublishAsync(this, query);
Console.WriteLine($"查询产品ID为{id}的产品结果是:{query.Result}");

B/S控制器的Action使用发布:

[ApiController]
[Route("[controller]")]
public class EventController : ControllerBase
{
    private readonly ILogger<EventController> _logger;
    private readonly IEventBus _eventBus;

    public EventController(ILogger<EventController> logger, IEventBus eventBus)
    {
        _logger = logger;
        _eventBus = eventBus;
    }

    [HttpPost("/add")]
    public async Task AddAsync([FromBody] CreateProductRequest request)
    {
        await _eventBus.PublishAsync(new CreateProductCommand { Name = request.Name, Price = request.Price });
    }

    [HttpDelete("/delete")]
    public async Task DeleteAsync([FromQuery] Guid id)
    {
        await _eventBus.PublishAsync(new DeleteProductCommand { ProductId = id });
    }

    [HttpGet("/get")]
    public async Task<ProductItemDto> GetAsync([FromQuery] Guid id)
    {
        var query = new ProductQuery { ProductId = id };
        await _eventBus.PublishAsync(query);
        return query.Result;
    }

    [HttpGet("/list")]
    public async Task<List<ProductItemDto>> ListAsync([FromQuery] string? name)
    {
        var query = new ProductsQuery { Name = name };
        await _eventBus.PublishAsync(query);
        return query.Result;
    }
}

WPF/Avalonia UIXXXViewModel中使用:

public class EventBusTestViewModel : ViewModelBase
{
    private readonly IEventBus _eventBus;

    public MessageTestViewModel(IEventBus eventBus)
    {
        _eventBus = eventBus;
    }

    public async Task ExecuteEventBusAsync()
    {
        await _eventBus.PublishAsync(this, new TestMessage(nameof(MessageTestViewModel), TestClass.CurrentTime()));
    }
}

2.5. Unsubscribe from events

In practice, you may need to ensure that you unsubscribe at the right time (such as when a service is destroyed) to avoid memory leaks:

  1. 注销指定处理程序:Messenger.Default.Unsubscribe<CreateProductMessage>(this, ReceiveManuCreateProductMessage)
  2. 注销指定类的所有处理程序:Messenger.Default.Unsubscribe(this)

3. Core interface description

public interface IEventBus
{
    void Subscribe<T>() where T : class;
    void Subscribe(Type type);
    void Subscribe(object recipient);
    void Subscribe<TCommand>(Action<TCommand> action) where TCommand : Command;
    void Subscribe<TCommand>(Func<TCommand, Task> asyncAction) where TCommand : Command;
    void Unsubscribe<T>() where T : class;
    void Unsubscribe(object recipient);
    void Unsubscribe<TCommand>(Action<TCommand> action) where TCommand : Command;
    void Unsubscribe<TCommand>(Func<TCommand, Task> asyncAction) where TCommand : Command;
    void Publish<TCommand>(TCommand command) where TCommand : Command;
    Task PublishAsync<TCommand>(TCommand command) where TCommand : Command;
}
  • Subscribe<T>():订阅类中静态事件处理方法
  • Subscribe(Type type):订阅指定类类型中静态事件处理方法
  • Subscribe(object recipient):订阅指定实例的成员事件处理方法
  • Subscribe<TCommand>(Action<TCommand> action):订阅普通事件处理方法,包括静态事件处理方法
  • Subscribe<TCommand>(Func<TCommand, Task> asyncAction):订阅异步事件处理方法,包括静态异步事件处理方法
  • Unsubscribe<T>():注销类中静态事件处理方法
  • Unsubscribe(object recipient):注销指定实例的成员事件处理方法
  • Unsubscribe<TCommand>(Action<TCommand> action):注销普通事件处理方法,包括静态事件处理方法
  • Unsubscribe<TCommand>(Func<TCommand, Task> asyncAction):注销异步事件处理方法,包括静态异步事件处理方法
  • Publish<TCommand>(TCommand command):同步发布命令(Command)或查询(Query)
  • PublishAsync<TCommand>(TCommand command):异步发布命令(Command)或查询(Query)

4. summary

CodeWF.EventBus提供了一个小巧灵活的事件总线实现,支持CQRS模式,并适用于各种项目模板,如 Avalonia UI、WPF、WinForms、ASP.NET Core 等。通过简单的订阅和发布操作,你可以轻松实现模块间的解耦和通讯。通过有序的事件处理,确保事件得到妥善处理。

事件总线具体实现请查看CodeWF.EventBus源码: https://github.com/dotnet9/CodeWF.EventBus ,具体使用可参考:

  1. 单元测试:CodeWF.EventBus.Tests
  2. AvaloniaUI + Prism:CodeWF.EventBus
  3. Web API:WebAPIDemoCodeWF

Develop reference open source projects:

  1. Messenger | MvvmCross
  2. Prism.Events
  3. MediatR
  4. MASA Framework

希望本文的指南能帮助你更好地使用CodeWF.EventBus来处理你的应用程序中的事件。

Keep Exploring

延伸阅读

更多文章
同分类 / 同标签 5/27/2025

WPF achieves a danger warning effect

When the program we write is released, the user is doing some dangerous operations. Our software should give some reminder effects, such as red on the edge of the border, similar to the alarm reminder effect like Gaode Map

继续阅读