C#でSocketを使用した分散イベントバスの実装、サードパーティMQに依存しない

C#でSocketを使用した分散イベントバスの実装、サードパーティMQに依存しない

CodeWF.EventBus.Socket は、軽量でSocketベースの分散イベントバスシステムであり、分散アーキテクチャにおけるイベント通信を簡素化することを目的としています。プロセス間でパブリッシュ/サブスクライブパターンを使用して通信でき、外部のメッセージキューサービスに依存する必要はありません。

最終更新 2024/07/28 10:25
沙漠尽头的狼
読了目安 3 分
カテゴリ
.NET
タグ
.NET C# アーキテクチャ設計 Distributed EventBus

Socket を使用した分散イベントバス。CQRS をサポートし、サードパーティの MQ に依存しません。

CodeWF.EventBus.Socket は、Socket ベースの軽量な分散イベントバスシステムで、分散アーキテクチャにおけるイベント通信を簡素化することを目的としています。プロセス間でパブリッシュ/サブスクライブパターンを利用した通信を可能にし、外部メッセージキューサービスに依存しません。

Command

Command

Query

Query

特徴

  • 軽量:外部 MQ サービスに依存しないため、システムの複雑さと依存関係を軽減します。

  • 高性能:Socket ベースの直接通信により、低レイテンシ、高スループットのメッセージ配信を実現します。

  • 柔軟性:カスタムイベントタイプとメッセージハンドラをサポートし、既存システムへの統合が容易です。

  • 拡張性:マルチクライアント接続をサポートし、分散システム環境に適しています。

通信プロトコル

TCP プロトコルを介してデータをやり取りします。プロトコルパケット構造は以下の通りです。

0.0.8@2x

インストール

NuGet パッケージマネージャーを使用して CodeWF.EventBus.Socket をインストールします。

Install-Package CodeWF.EventBus.Socket

サーバー側の使用方法

イベントサービスの起動

サーバー側コードでは、EventServer インスタンスを作成して起動し、クライアント接続とイベントを待ち受けます。

using CodeWF.EventBus.Socket;

// イベントサーバーインスタンスを作成
IEventServer eventServer = new EventServer();

// イベントサーバーを起動し、指定されたIPとポートで待ち受け
eventServer.Start("127.0.0.1", 9100);

イベントサービスの停止

イベントサービスが不要になった場合は、Stop メソッドを呼び出してサーバーを適切に停止します。

eventServer.Stop();

クライアント側の使用方法

イベントサービスへの接続

クライアントコードでは、EventClient インスタンスを作成し、イベントサーバーに接続します。

using CodeWF.EventBus.Socket;

// イベントクライアントインスタンスを作成
IEventClient eventClient = new EventClient();

// イベントサーバーに接続。eventClient.ConnectStatus で接続状態を確認可能
eventClient.Connect("127.0.0.1", 9100));

イベントのサブスクライブ

特定のイベントタイプをサブスクライブし、イベントハンドラを指定します。

eventClient.Subscribe<NewEmailCommand>("event.email.new", ReceiveNewEmailCommand);

private void ReceiveNewEmail(NewEmailCommand command)
{
    // 新しいメール通知を処理
    Console.WriteLine($"新しいメールを受信しました。件名は {message.Subject} です。");
}

コマンドのパブリッシュ

指定されたトピックにイベントを発行し、サブスクライブしているクライアントが処理します。

// 新しいメール通知イベントを発行
eventClient.Publish("event.email.new", new NewEmailCommand { Subject = "GitHub 1等賞おめでとうございます", Content = "2024年7月に...", SendTime = new DateTime(2024, 7, 27) });

クエリ (Query)

指定されたトピックをクエリします。受信側(プロデューサー)が同じトピックをサブスクライブしている必要があります。リクエストを受け取ったら、同じトピックでクエリ結果を発行します。

eventClient.Subscribe<EmailQuery>("event.email.query", ReceiveEmailQuery);

private void ReceiveEmailQuery(EmailQuery query)
{
    // クエリリクエストを実行し、結果を準備
    var response = new EmailQueryResponse { Emails = EmailManager.QueryEmail(request.Subject) };

    // 同じトピックでクエリ結果を発行
    if (_eventClient!.Publish("event.email.query", response,
        out var errorMessage))
    {
        Logger.Info($"クエリ結果を応答: {response}");
    }
    else
    {
        Logger.Error($"クエリ応答に失敗: {errorMessage}");
    }
}

他のエンドポイントでは、同じトピックを使用してクエリ(コンシューマー)を実行できます。

var response = _eventClient!.Query<EmailQuery, EmailQueryResponse>("event.email.query",
    new EmailQuery() { Subject = "Account" },
    out var errorMessage);
if (string.IsNullOrWhiteSpace(errorMessage) && response != null)
{
    Logger.Info($"イベント email.query をクエリ、結果: {response}");
}
else
{
    Logger.Error(
        $"イベント email.query のクエリに失敗: [{errorMessage}]");
}

イベントのサブスクライブ解除

特定のイベントの受信が不要になった場合は、サブスクライブを解除します。

eventClient.Unsubscribe<NewEmailNotification>("event.email.new", ReceiveNewEmail);

イベントサービスからの切断

イベント処理が完了した場合やサーバーとの接続を切断する必要がある場合は、Disconnect メソッドを呼び出します。

eventClient.Disconnect();
Console.WriteLine("イベントサービスとの接続を切断しました");

注意事項

  • サーバー側とクライアント側で使用するアドレスとポート番号を一致させ、ポートが他のサービスによって占有されていないことを確認してください。
  • 本番環境では、サーバーはパブリック IP アドレスまたは適切なネットワークインターフェイスで待ち受けるように設定する必要があります。
  • ネットワーク障害やサーバー再起動などの状況を考慮して、クライアントに再接続ロジックを実装する必要がある場合があります。
  • 実際の要件に応じて、EventServer および EventClient クラスを拡張し、メッセージ暗号化や認証認可などのより複雑な機能をサポートすることができます。
さらに探索

関連読書

その他の記事
同じカテゴリ / 同じタグ 2026/04/22

各OSバージョンの.NETサポート状況(250707更新)

仮想マシンとテストマシンを使用して、各OSバージョンの.NETサポート状況を確認します。OSインストール後、対応するランタイムをインストールし、Stardustエージェントを実行できることを確認します(合格条件)。

続きを読む
同じカテゴリ / 同じタグ 2026/02/07

AOTの使用経験のまとめ

プロジェクト作成当初から、新機能を追加したり新しい構文を使用したりした場合には、すぐにAOT公開テストを実施するという良い習慣を身につけるべきです。

続きを読む