C#使用Socket實現分散式事件匯流排,不依賴第三方MQ

C#使用Socket實現分散式事件匯流排,不依賴第三方MQ

CodeWF.EventBus.Socket 是一個輕量級的、基於Socket的分散式事件匯流排系統,旨在簡化分散式架構中的事件通訊。它允許處理序之間透過發佈/訂閱模式進行通訊,無需依賴外部訊息佇列服務。

最後更新 2024/7/28 上午10:25
沙漠尽头的狼
預計閱讀 4 分鐘
分類
.NET
標籤
.NET C# 架構設計 Distributed EventBus

使用 Socket 實作的分散式事件匯流排,支援 CQRS,不依賴第三方 MQ。

CodeWF.EventBus.Socket 是一個輕量級的、基於 Socket 的分散式事件匯流排系統,旨在簡化分散式架構中的事件通訊。它允許行程之間透過發布/訂閱模式進行通訊,無需依賴外部訊息佇列服務。

Command

Command

Query

Query

特性

  • 輕量級:不依賴任何外部 MQ 服務,減少了系統複雜性和依賴。

  • 高效能:基於 Socket 的直接通訊,提供低延遲、高吞吐量的訊息傳遞。

  • 靈活性:支援自訂事件類型和訊息處理器,易於整合到現有系統中。

  • 可擴充性:支援多客戶端連線,適用於分散式系統環境。

通訊協定

透過 TCP 協定進行資料互動,協定封包結構如下:

0.0.8@2x

安裝

透過NuGet封裝管理員安裝CodeWF.EventBus.Socket

Install-Package CodeWF.EventBus.Socket

伺服器端使用

執行事件服務

在伺服器端程式碼中,建立並啟動EventServer執行個體以監聽用戶端連線和事件:

using CodeWF.EventBus.Socket;

// 建立事件伺服器執行個體
IEventServer eventServer = new EventServer();

// 啟動事件伺服器,監聽指定IP和埠
eventServer.Start("127.0.0.1", 9100);

停止事件服務

當不再需要事件服務時,呼叫Stop方法以優雅地關閉伺服器:

eventServer.Stop();

用戶端使用

連線事件服務

在用戶端程式碼中,建立EventClient執行個體並連線到事件伺服器:

using CodeWF.EventBus.Socket;

// 建立事件用戶端執行個體
IEventClient eventClient = new EventClient();

// 連線到事件伺服器,使用eventClient.ConnectStatus檢查連線狀態
eventClient.Connect("127.0.0.1", 9100));

訂閱事件

訂閱特定類型的事件,並指定事件處理函式:

eventClient.Subscribe<NewEmailCommand>("event.email.new", ReceiveNewEmailCommand);

private void ReceiveNewEmail(NewEmailCommand command)
{
    // 處理新郵件通知
    Console.WriteLine($"收到新郵件,主旨是{message.Subject}");
}

發布命令(Command)

發布事件到指定的主題,供已訂閱的用戶端處理:

// 發布新郵件通知事件
eventClient.Publish("event.email.new", new NewEmailCommand { Subject = "恭喜您中GitHub一等獎", Content = "我們很開心,您在2024年7月...", SendTime = new DateTime(2024, 7, 27) });

查詢(Query)

查詢指定主題,需要有接收查詢端訂閱相同的主題(即生產者),收到請求後,再以相同的主題發布查詢結果:

eventClient.Subscribe<EmailQuery>("event.email.query", ReceiveEmailQuery);

private void ReceiveEmailQuery(EmailQuery query)
{
    // 執行查詢請求,準備查詢結果
    var response = new EmailQueryResponse { Emails = EmailManager.QueryEmail(request.Subject) };

    // 以相同的主題,發布查詢結果
    if (_eventClient!.Publish("event.email.query", response,
        out var errorMessage))
    {
        Logger.Info($"Response query result: {response}");
    }
    else
    {
        Logger.Error($"Response query failed: {errorMessage}");
    }
}

其他端可使用相同的主題查詢(即消費者):

var response = _eventClient!.Query<EmailQuery, EmailQueryResponse>("event.email.query",
    new EmailQuery() { Subject = "Account" },
    out var errorMessage);
if (string.IsNullOrWhiteSpace(errorMessage) && response != null)
{
    Logger.Info($"Query event.email.query, result: {response}");
}
else
{
    Logger.Error(
        $"Query event.email.query failed: [{errorMessage}]");
}

取消訂閱事件

不再需要接收某類事件時,可以取消訂閱:

eventClient.Unsubscribe<NewEmailNotification>("event.email.new", ReceiveNewEmail);

中斷事件服務

完成事件處理或需要中斷與伺服器的連線時,呼叫Disconnect方法:

eventClient.Disconnect();
Console.WriteLine("中斷與事件服務的連線");

注意事項

  • 確保伺服器端和用戶端使用的位址和埠號一致,並且埠未被其他服務佔用。
  • 在生產環境中,伺服器端應設定為監聽公用 IP 位址或適當的網路介面。
  • 考慮到網路異常和服務重啟等情況,用戶端可能需要實作重連邏輯。
  • 根據實際需求,可以擴充EventServerEventClient類別以支援更複雜的功能,如訊息加密、認證授權等。
繼續探索

延伸閱讀

更多文章
同分類 / 同標籤 2026/2/7

AOT使用經驗總結

從專案建立伊始,就應養成良好的習慣,即只要添加了新功能或使用了較新的語法,就及時進行 AOT 發布測試。

繼續閱讀