c#使用socket實現分布式事件總線,不依賴第三方mq

c#使用socket實現分布式事件總線,不依賴第三方mq

codewf.eventbus.socket 是一個輕量級的、基於socket的分布式事件總線系統,旨在簡化分布式架構中的事件通信。它允許進程之間通過發布/訂閱模式進行通信,無需依賴外部消息隊列服務。

最后更新 2024/7/28 上午10:25
沙漠尽头的狼
预计阅读 4 分钟
分类
.NET
标签
.NET C# 架構設計 Distributed EventBus

使用 socket 實現的分布式事件總線,支持 cqrs,不依賴第三方 mq。

CodeWF.EventBus.Socket 是一个轻量级的、基于 Socket 的分布式事件总线系统,旨在简化分布式架构中的事件通信。它允许进程之间通过发布/订阅模式进行通信,无需依赖外部消息队列服务。

Command

Command

Query

Query

特性

  • 輕量級:不依賴任何外部 mq 服務,減少了系統複雜性和依賴。

  • 高性能:基於 socket 的直接通信,提供低延遲、高吞吐量的消息傳遞。

  • 靈活性:支持自定義事件類型和消息處理器,易於集成到現有系統中。

  • 可擴展性:支持多客戶端連接,適用於分布式系統環境。

通信協議

通过 TCP 协议进行数据交互,协议包结构如下:

0.0.8@2x

安裝

通过NuGet包管理器安装CodeWF.EventBus.Socket

Install-Package CodeWF.EventBus.Socket

服務端使用

運行事件服務

在服务端代码中,创建并启动EventServer实例以监听客户端连接和事件:

using CodeWF.EventBus.Socket;

// 创建事件服务器实例
IEventServer eventServer = new EventServer();

// 启动事件服务器,监听指定IP和端口
eventServer.Start("127.0.0.1", 9100);

停止事件服務

当不再需要事件服务时,调用Stop方法以优雅地关闭服务器:

eventServer.Stop();

客戶端使用

連接事件服務

在客户端代码中,创建EventClient实例并连接到事件服务器:

using CodeWF.EventBus.Socket;

// 创建事件客户端实例
IEventClient eventClient = new EventClient();

// 连接到事件服务器,使用eventClient.ConnectStatus检查连接状态
eventClient.Connect("127.0.0.1", 9100));

訂閱事件

訂閱特定類型的事件,並指定事件處理函數:

eventClient.Subscribe<NewEmailCommand>("event.email.new", ReceiveNewEmailCommand);

private void ReceiveNewEmail(NewEmailCommand command)
{
    // 处理新邮件通知
    Console.WriteLine($"收到新邮件,主题是{message.Subject}");
}

發布命令(command)

發布事件到指定的主題,供已訂閱的客戶端處理:

// 发布新邮件通知事件
eventClient.Publish("event.email.new", new NewEmailCommand { Subject = "恭喜您中GitHub一等奖", Content = "我们很开心,您在2024年7月...", SendTime = new DateTime(2024, 7, 27) });

查詢(query)

查詢指定主題,需要有接收查詢端訂閱相同的主題(即生產者),收到請求後,再以相同的主題發布查詢結果:

eventClient.Subscribe<EmailQuery>("event.email.query", ReceiveEmailQuery);

private void ReceiveEmailQuery(EmailQuery query)
{
    // 执行查询请求,准备查询结果
    var response = new EmailQueryResponse { Emails = EmailManager.QueryEmail(request.Subject) };

    // 以相同的主题,发布查询结果
    if (_eventClient!.Publish("event.email.query", response,
        out var errorMessage))
    {
        Logger.Info($"Response query result: {response}");
    }
    else
    {
        Logger.Error($"Response query failed: {errorMessage}");
    }
}

其他端可使用相同的主題查詢(即消費者):

var response = _eventClient!.Query<EmailQuery, EmailQueryResponse>("event.email.query",
    new EmailQuery() { Subject = "Account" },
    out var errorMessage);
if (string.IsNullOrWhiteSpace(errorMessage) && response != null)
{
    Logger.Info($"Query event.email.query, result: {response}");
}
else
{
    Logger.Error(
        $"Query event.email.query failed: [{errorMessage}]");
}

取消訂閱事件

不再需要接收某類事件時,可以取消訂閱:

eventClient.Unsubscribe<NewEmailNotification>("event.email.new", ReceiveNewEmail);

斷開事件服務

完成事件处理或需要断开与服务器的连接时,调用Disconnect方法:

eventClient.Disconnect();
Console.WriteLine("断开与事件服务的连接");

注意事項

  • 確保服務端和客戶端使用的地址和埠號一致,並且埠未被其他服務占用。
  • 在生產環境中,服務端應配置為監聽公共 ip 地址或適當的網絡接口。
  • 考慮到網絡異常和服務重啟等情況,客戶端可能需要實現重連邏輯。
  • 根据实际需求,可以扩展EventServerEventClient类以支持更复杂的功能,如消息加密、认证授权等。
Keep Exploring

延伸阅读

更多文章
同分类 / 同标签 2026/2/7

aot使用經驗總結

從項目創建伊始,就應養成良好的習慣,即只要添加了新功能或使用了較新的語法,就及時進行 aot 發布測試。

继续阅读