C# uses Socket to implement a distributed event bus without relying on a third-party MQ

CodeWF.EventBus.Socket is a lightweight, Socket-based distributed event bus system designed to simplify event communication in a distributed architecture. It allows communication between processes through a publish/subscribe model without relying on external message queuing services.

Last updated: 7/28/2024 10:25 AM
Author: 沙漠尽头的狼
Estimated reading time: 4 minutes
Category: .NET
Tags: .NET, C#, architecture design, Distributed, EventBus

**A distributed event bus implemented with Socket that supports CQRS and does not rely on a third-party MQ.**

[Figure: Command]

[Figure: Query]

Features

  • **Lightweight**: Does not rely on any external MQ service, which reduces system complexity and dependencies.

  • **High performance**: Direct Socket-based communication provides low-latency, high-throughput message delivery.

  • **Flexibility**: Supports custom event types and message handlers, making it easy to integrate into existing systems.

  • **Extensibility**: Supports multiple client connections and is suitable for distributed system environments.

Communication protocol

Data is exchanged over TCP. The packet structure is as follows:

[Figure: protocol packet structure]

Installation

Install CodeWF.EventBus.Socket through the NuGet package manager:

Install-Package CodeWF.EventBus.Socket

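Server usage

Starting the event service

In the server code, create and start an EventServer instance to listen for client connections and events:

using CodeWF.EventBus.Socket;

// Create the event server instance
IEventServer eventServer = new EventServer();

// Start the event server, listening on the specified IP address and port
eventServer.Start("127.0.0.1", 9100);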

Stopping the event service

When the event service is no longer needed, call the Stop method to shut the server down gracefully:

eventServer.Stop();

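Client usage

Connecting to the event service

In the client code, create an EventClient instance and connect it to the event server:

using CodeWF.EventBus.Socket;

// Create the event client instance
IEventClient eventClient = new EventClient();

// Connect to the event server; use eventClient.ConnectStatus to check the connection state
eventClient.Connect("127.0.0.1", 9100);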

Subscribing to events

Subscribe to a specific event type and specify its handler:

eventClient.Subscribe<NewEmailCommand>("event.email.new", ReceiveNewEmail);

private void ReceiveNewEmail(NewEmailCommand command)
{
    // Handle the new email notification
    Console.WriteLine($"New email received, subject: {command.Subject}");
}

Command

Publish events to the specified topic for processing by subscribed clients:

// Publish a new email notification event
eventClient.Publish("event.email.new", new NewEmailCommand { Subject = "Congratulations on winning the GitHub first prize", Content = "We are delighted that in July 2024 you...", SendTime = new DateTime(2024, 7, 27) });
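The NewEmailCommand type itself is not shown in this article. A minimal sketch, assuming it is a plain class: the property names come from the Publish call above, while the property types are inferred from the assigned values and are not confirmed by the library. Whether the library needs anything beyond a plain class (for example serialization attributes) is also not stated here.

```csharp
using System;

// Hypothetical message type. Property names follow the Publish call above;
// the property types are assumptions inferred from the assigned values.
public class NewEmailCommand
{
    public string Subject { get; set; } = "";
    public string Content { get; set; } = "";
    public DateTime SendTime { get; set; }
}
```

Publisher and subscriber processes would both need to agree on this shape, so a shared class library is one reasonable place to keep it.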

Query

To serve queries on a topic, the endpoint that answers them (the producer) subscribes to that topic. When it receives a query, it publishes the result to the same topic:

eventClient.Subscribe<EmailQuery>("event.email.query", ReceiveEmailQuery);

private void ReceiveEmailQuery(EmailQuery query)
{
    // Run the query and prepare the result
    var response = new EmailQueryResponse { Emails = EmailManager.QueryEmail(query.Subject) };

    // Publish the query result to the same topic
    if (_eventClient!.Publish("event.email.query", response,
        out var errorMessage))
    {
        Logger.Info($"Response query result: {response}");
    }
    else
    {
        Logger.Error($"Response query failed: {errorMessage}");
    }
}
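EmailQuery, EmailQueryResponse and EmailManager are used in the handler above but are not defined in this article. The sketch below is one possible shape, under stated assumptions: the result is a list of subject lines, and EmailManager is an in-memory stand-in. None of these definitions come from the library.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical query type; the Subject property name follows the handler above.
public class EmailQuery
{
    public string Subject { get; set; } = "";
}

// Hypothetical response type.
public class EmailQueryResponse
{
    // Assumption: each matching email is represented by its subject line only.
    public List<string> Emails { get; set; } = new List<string>();
}

// Hypothetical in-memory stand-in for the EmailManager used by the handler.
public static class EmailManager
{
    private static readonly List<string> Subjects = new List<string>
    {
        "Account activation",
        "Weekly report",
        "Account password reset"
    };

    // Returns every stored subject that contains the requested text, ignoring case.
    public static List<string> QueryEmail(string subject)
    {
        return Subjects
            .Where(s => s.IndexOf(subject, StringComparison.OrdinalIgnoreCase) >= 0)
            .ToList();
    }
}
```

With this stand-in, a query whose Subject is "Account" matches the two account-related entries and skips the weekly report.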

Other endpoints (the consumers) can send a query on the same topic:

var response = _eventClient!.Query<EmailQuery, EmailQueryResponse>("event.email.query",
    new EmailQuery() { Subject = "Account" },
    out var errorMessage);
if (string.IsNullOrWhiteSpace(errorMessage) && response != null)
{
    Logger.Info($"Query event.email.query, result: {response}");
}
else
{
    Logger.Error(
        $"Query event.email.query failed: [{errorMessage}]");
}

Unsubscribe from events

You can unsubscribe when you no longer need to receive certain types of events:

eventClient.Unsubscribe<NewEmailCommand>("event.email.new", ReceiveNewEmail);

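Disconnecting from the event service

When event processing is finished, or the connection to the server needs to be closed, call the Disconnect method:

eventClient.Disconnect();
Console.WriteLine("Disconnected from the event service");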

Notes

  • Make sure the server and client use the same address and port number, and that the port is not occupied by another service.
  • In a production environment, the server should be configured to listen on a public IP address or an appropriate network interface.
  • To cope with network failures and service restarts, the client may need to implement reconnect logic.
  • Depending on your requirements, the EventServer and EventClient classes can be extended to support more complex features such as message encryption, authentication, and authorization.
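
The reconnect logic mentioned in the notes could be sketched as a small retry helper with exponential backoff. This is an illustration and not part of CodeWF.EventBus.Socket: the Reconnect class, the WithBackoff method and the tryConnect delegate are hypothetical names, and the caller decides what counts as a successful connection.

```csharp
using System;
using System.Threading;

// Hypothetical helper, not part of the library.
public static class Reconnect
{
    // Calls tryConnect until it returns true or maxAttempts is reached,
    // doubling the wait between attempts up to maxDelay.
    public static bool WithBackoff(Func<bool> tryConnect, int maxAttempts,
        TimeSpan initialDelay, TimeSpan maxDelay)
    {
        var delay = initialDelay;
        for (var attempt = 1; attempt <= maxAttempts; attempt++)
        {
            if (tryConnect())
            {
                return true;
            }

            // Sleep only when another attempt will follow.
            if (attempt < maxAttempts)
            {
                Thread.Sleep(delay);
                var doubled = TimeSpan.FromTicks(delay.Ticks * 2);
                delay = doubled < maxDelay ? doubled : maxDelay;
            }
        }

        return false;
    }
}
```

The tryConnect delegate is supplied by the caller. One option is to call Connect and then inspect ConnectStatus, whose exact type is not shown in this article, so that wiring is left out of the sketch.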