** Distributed event bus implemented using Socket, supports CQRS, and does not rely on third-party MQ. **
CodeWF.EventBus.Socket 是一个轻量级的、基于 Socket 的分布式事件总线系统,旨在简化分布式架构中的事件通信。它允许进程之间通过发布/订阅模式进行通信,无需依赖外部消息队列服务。
Command

Query

characteristics
** Lightweight **: Does not rely on any external MQ services, reducing system complexity and dependence.
** High performance **: Socket based direct communication provides low-latency, high-throughput message delivery.
** Flexibility **: Supports custom event types and message processors, making it easy to integrate into existing systems.
** Extensibility **: Supports multi-client connections and is suitable for distributed system environments.
communication protocol
通过 TCP 协议进行数据交互,协议包结构如下:

installation
通过NuGet包管理器安装CodeWF.EventBus.Socket:
Install-Package CodeWF.EventBus.Socket
Server use
Running event services
在服务端代码中,创建并启动EventServer实例以监听客户端连接和事件:
using CodeWF.EventBus.Socket;
// 创建事件服务器实例
IEventServer eventServer = new EventServer();
// 启动事件服务器,监听指定IP和端口
eventServer.Start("127.0.0.1", 9100);
Stop event services
当不再需要事件服务时,调用Stop方法以优雅地关闭服务器:
eventServer.Stop();
client uses
Connecting event services
在客户端代码中,创建EventClient实例并连接到事件服务器:
using CodeWF.EventBus.Socket;
// 创建事件客户端实例
IEventClient eventClient = new EventClient();
// 连接到事件服务器,使用eventClient.ConnectStatus检查连接状态
eventClient.Connect("127.0.0.1", 9100));
subscription event
Subscribe to specific types of events and specify event handlers:
eventClient.Subscribe<NewEmailCommand>("event.email.new", ReceiveNewEmailCommand);
private void ReceiveNewEmail(NewEmailCommand command)
{
// 处理新邮件通知
Console.WriteLine($"收到新邮件,主题是{message.Subject}");
}
Command
Publish events to the specified topic for processing by subscribed clients:
// 发布新邮件通知事件
eventClient.Publish("event.email.new", new NewEmailCommand { Subject = "恭喜您中GitHub一等奖", Content = "我们很开心,您在2024年7月...", SendTime = new DateTime(2024, 7, 27) });
Query
To query the specified topic, the receiving query terminal needs to subscribe to the same topic (i.e. producer). After receiving the request, the query result will be published in the same topic:
eventClient.Subscribe<EmailQuery>("event.email.query", ReceiveEmailQuery);
private void ReceiveEmailQuery(EmailQuery query)
{
// 执行查询请求,准备查询结果
var response = new EmailQueryResponse { Emails = EmailManager.QueryEmail(request.Subject) };
// 以相同的主题,发布查询结果
if (_eventClient!.Publish("event.email.query", response,
out var errorMessage))
{
Logger.Info($"Response query result: {response}");
}
else
{
Logger.Error($"Response query failed: {errorMessage}");
}
}
Other terminals can use the same subject query (i.e. consumers):
var response = _eventClient!.Query<EmailQuery, EmailQueryResponse>("event.email.query",
new EmailQuery() { Subject = "Account" },
out var errorMessage);
if (string.IsNullOrWhiteSpace(errorMessage) && response != null)
{
Logger.Info($"Query event.email.query, result: {response}");
}
else
{
Logger.Error(
$"Query event.email.query failed: [{errorMessage}]");
}
Unsubscribe from events
You can unsubscribe when you no longer need to receive certain types of events:
eventClient.Unsubscribe<NewEmailNotification>("event.email.new", ReceiveNewEmail);
Disconnect event services
完成事件处理或需要断开与服务器的连接时,调用Disconnect方法:
eventClient.Disconnect();
Console.WriteLine("断开与事件服务的连接");
precautions
- Ensure that the addresses and port numbers used by the server and client are consistent, and that the ports are not occupied by other services.
- In a production environment, servers should be configured to listen on public IP addresses or appropriate network interfaces.
- Considering network exceptions and service restarts, the client may need to implement reconnect logic.
- 根据实际需求,可以扩展
EventServer和EventClient类以支持更复杂的功能,如消息加密、认证授权等。