使用 Socket 實作的分散式事件匯流排,支援 CQRS,不依賴第三方 MQ。
CodeWF.EventBus.Socket 是一個輕量級的、基於 Socket 的分散式事件匯流排系統,旨在簡化分散式架構中的事件通訊。它允許行程之間透過發布/訂閱模式進行通訊,無需依賴外部訊息佇列服務。
Command

Query

特性
輕量級:不依賴任何外部 MQ 服務,減少了系統複雜性和依賴。
高效能:基於 Socket 的直接通訊,提供低延遲、高吞吐量的訊息傳遞。
靈活性:支援自訂事件類型和訊息處理器,易於整合到現有系統中。
可擴充性:支援多客戶端連線,適用於分散式系統環境。
通訊協定
透過 TCP 協定進行資料互動,協定封包結構如下:

安裝
透過NuGet封裝管理員安裝CodeWF.EventBus.Socket:
Install-Package CodeWF.EventBus.Socket
伺服器端使用
執行事件服務
在伺服器端程式碼中,建立並啟動EventServer執行個體以監聽用戶端連線和事件:
using CodeWF.EventBus.Socket;
// 建立事件伺服器執行個體
IEventServer eventServer = new EventServer();
// 啟動事件伺服器,監聽指定IP和埠
eventServer.Start("127.0.0.1", 9100);
停止事件服務
當不再需要事件服務時,呼叫Stop方法以優雅地關閉伺服器:
eventServer.Stop();
用戶端使用
連線事件服務
在用戶端程式碼中,建立EventClient執行個體並連線到事件伺服器:
using CodeWF.EventBus.Socket;
// 建立事件用戶端執行個體
IEventClient eventClient = new EventClient();
// 連線到事件伺服器,使用eventClient.ConnectStatus檢查連線狀態
eventClient.Connect("127.0.0.1", 9100));
訂閱事件
訂閱特定類型的事件,並指定事件處理函式:
eventClient.Subscribe<NewEmailCommand>("event.email.new", ReceiveNewEmailCommand);
private void ReceiveNewEmail(NewEmailCommand command)
{
// 處理新郵件通知
Console.WriteLine($"收到新郵件,主旨是{message.Subject}");
}
發布命令(Command)
發布事件到指定的主題,供已訂閱的用戶端處理:
// 發布新郵件通知事件
eventClient.Publish("event.email.new", new NewEmailCommand { Subject = "恭喜您中GitHub一等獎", Content = "我們很開心,您在2024年7月...", SendTime = new DateTime(2024, 7, 27) });
查詢(Query)
查詢指定主題,需要有接收查詢端訂閱相同的主題(即生產者),收到請求後,再以相同的主題發布查詢結果:
eventClient.Subscribe<EmailQuery>("event.email.query", ReceiveEmailQuery);
private void ReceiveEmailQuery(EmailQuery query)
{
// 執行查詢請求,準備查詢結果
var response = new EmailQueryResponse { Emails = EmailManager.QueryEmail(request.Subject) };
// 以相同的主題,發布查詢結果
if (_eventClient!.Publish("event.email.query", response,
out var errorMessage))
{
Logger.Info($"Response query result: {response}");
}
else
{
Logger.Error($"Response query failed: {errorMessage}");
}
}
其他端可使用相同的主題查詢(即消費者):
var response = _eventClient!.Query<EmailQuery, EmailQueryResponse>("event.email.query",
new EmailQuery() { Subject = "Account" },
out var errorMessage);
if (string.IsNullOrWhiteSpace(errorMessage) && response != null)
{
Logger.Info($"Query event.email.query, result: {response}");
}
else
{
Logger.Error(
$"Query event.email.query failed: [{errorMessage}]");
}
取消訂閱事件
不再需要接收某類事件時,可以取消訂閱:
eventClient.Unsubscribe<NewEmailNotification>("event.email.new", ReceiveNewEmail);
中斷事件服務
完成事件處理或需要中斷與伺服器的連線時,呼叫Disconnect方法:
eventClient.Disconnect();
Console.WriteLine("中斷與事件服務的連線");
注意事項
- 確保伺服器端和用戶端使用的位址和埠號一致,並且埠未被其他服務佔用。
- 在生產環境中,伺服器端應設定為監聽公用 IP 位址或適當的網路介面。
- 考慮到網路異常和服務重啟等情況,用戶端可能需要實作重連邏輯。
- 根據實際需求,可以擴充
EventServer和EventClient類別以支援更複雜的功能,如訊息加密、認證授權等。