プログラミング言語.NET プロセス内キュー Channel の入門と応用

プログラミング言語.NET プロセス内キュー Channel の入門と応用

Channel は Microsoft が .NET Core 3.0 以降で導入した新しいコレクション型で、System.Threading.Channels 名前空間にあります。非同期API、高性能、スレッドセーフなどの特徴を持っています。

最終更新 2023/12/23 20:12
素履独行
読了目安 9 分
カテゴリ
.NET
タグ
.NET C# セキュリティ

最近、ブロガーは FakeRPCWebSocket プロトコルのサポートを追加しました。これにより、全二重通信の特性を活かして、1つの接続要求内で複数のデータを送信できるようになります。FakeRPC の現時点での最大の欠点は、TCP/IP プロトコルではなく HTTP プロトコル上に構築されていることです。したがって、WebSocket プロトコルを検討するのは、主に JSON-RPC の実現可能性を検証し、今後サポートする予定の TCP/IP プロトコルへの布石とするためです。お気づきでないかもしれませんが、これらの概念は複雑に絡み合っています。しかし、各 RPC 呼び出しを一連のメッセージとして捉えれば、このやや古めかしい RPC というものをより深く理解できるのではないでしょうか。FakeRPC を作成する過程で、私は .NET の新しいデータ構造 Channel を使用してメッセージの転送を実装しました。サーバー側を例にとると、各 RPC リクエストは CallInvoker で処理された後、RPC レスポンスの結果がすぐにクライアントに返されるわけではなく、バックグラウンドスレッドが Channel からメッセージを取り出してからクライアントに返送されます。では、なぜブロガーは遠回りな方法を選んだのでしょうか?この記事でその答えをお伝えできればと思います。

Channel 入門

Channel は、マイクロソフトが .NET Core 3.0 以降で導入した新しいコレクション型です。この型は System.Threading.Channels 名前空間に属し、非同期 API、高性能、スレッドセーフなどの特徴を持っています。現在、Channel の主な応用シナリオはプロデューサー・コンシューマーモデルです。下図のように、プロデューサーがキューにデータを書き込み、コンシューマーがキューからデータを読み取ります。これを基に、プロデューサーやコンシューマーの数を増やすことで、このモデルをさらに拡張できます。普段使用している RabbitMQKafka も、特定の領域におけるプロデューサー・コンシューマーモデルの応用と見なすことができ、広義の読み書き分離の考え方を読み取ることもできます。

プロデューサー・コンシューマーモデル概略図プロデューサー・コンシューマーモデル概略図

ロマン・ロランはかつて言いました、世の中にはただ一つの真の英雄主義しかない。それは、人生の真実を見抜いた上で、なお人生を愛することだ。今、この概略図を見ながら何かを考え、次のようなやり方を思いつくかもしれません。

class Producer<T>
{
    private readonly Queue<T> _queue;
    public Producer(Queue<T> queue) { _queue = queue; }
}

class Consumer<T>
{
    private readonly Queue<T> _queue;
    public Consumer(Queue<T> queue) { _queue = queue; }
}

この考え方は理論上は問題ないと認めますが、実際に運用しようとすると問題点が山積みです。たとえば、プロデューサーは書き込みだけ、コンシューマーは読み取りだけを担当すべきですが、キューを直接渡してしまうと、その責務の純粋性を維持するのは困難です。さらに、キューを使用する過程で、プロデューサーはキューが「満杯」になる心配をし、コンシューマーはキューが「空」になる悩みを抱えます。また、複数のプロデューサー、複数のコンシューマー、マルチスレッド/ロックなどの要素を考慮すると、これは決して簡単な問題ではありません。この問題を解決するために、マイクロソフトは BlockingCollectionBufferBlock という2つのデータ構造を追加しました。ここでは前者を例に、典型的なプロデューサー・コンシューマーモデルを示します。

var bc = new BlockingCollection<int>();

// プロデューサー
var producer = Task.Run(() => {
    for (var i = 0; i < Count; i++) {
        bc.Add(i);
        Console.WriteLine("Producer Write Item: {0}", i);
    }
    bc.CompleteAdding();
});

// コンシューマー
var consumer = Task.Run(() => {
    while (!bc.IsCompleted) {
        if (bc.TryTake(out var item)) {
            Console.WriteLine("Consumer Read Item: {0}", item);
        }
    }
});

await Task.WhenAll(producer, consumer);

これで、プロデューサー・コンシューマーモデルを実装する難易度はほぼゼロになったことに気付くでしょう。同時に、BlockingCollection<T>BufferBlock<T> はどちらもスレッドセーフなコレクションであり、マルチスレッド環境でも安心して使用できます。過去の経験を振り返ると、スレッドシグナルを使ってスレッド同期を行う必要があるたびに、バグの境界線を慎重に歩かなければなりませんでした。では、既に BlockingCollection<T>BufferBlock<T> のようなデータ構造があるのに、なぜ Channel が必要なのでしょうか?ごく普通のプログラマーとして、数え切れないほどのバグが時間とともに消えていき、かつて頭を悩ませた問題も絶えず新たな答えで更新されています。

BlockingCollection、BufferBlock、Channel のパフォーマンス比較BlockingCollection、BufferBlock、Channel のパフォーマンス比較

図のように、10000件のデータを読み書きするシナリオで3つのデータ構造のパフォーマンスをテストしました。明らかに Channel のパフォーマンスが最も優れています。これが理由にならないでしょうか?パフォーマンスほど興奮させるものは他にありません。あなたのPCのグラフィックカードがアサシン クリードの「神話三部作」を楽しめず、ローカルでStable Diffusionを動かすことさえ夢のまた夢であるなら、このわずかなパフォーマンス最適化こそ、ムーアの法則が無効になると予言される時代に残された数少ない匠の技であることを認めざるを得ません。

// 有限容量の Channel を作成
var boundedChannel = Channel.CreateBounded<int>(100);

// 無限容量の Channel を作成
var unboundedChannel = Channel.CreateUnbounded<string>();

さて、Channel についてどこから話し始めましょうか?Channel の作成は非常に簡単で、有限容量の Channel を作成する場合を除けば特に難しいことはありません。最初に提起した問題を覚えていますか?プロデューサー・コンシューマーモデルでは、容量に制限のある固定キューでは、必然的にキューが「満杯」になる状況が発生します。その場合、何らかの戦略やメカニズムを策定してモデル全体を補完する必要があります。この問題に対する Channel の解決策は BoundedChannelFullMode です。

var boundedChannel = Channel.CreateBounded<string>(
    new BoundedChannelOptions(100) {
        FullMode = BoundedChannelFullMode.Wait
});

これは列挙型で、実際には WaitDropNewestDropOldestDropWrite の4つの値を持ち、デフォルトは Wait です。各値の意味は以下のとおりです。

  • Wait:キューが満杯の場合、データ書き込み時に false を返し、キュー内に空きができるまで待機します。
  • DropNewest:最新のデータを削除します。つまり、キューの末尾から要素を削除します。
  • DropOldest:最も古いデータを削除します。つまり、キューの先頭から要素を削除します。
  • DropWrite:データの書き込みは可能ですが、データはすぐに破棄されます。

キューが「満杯」または「空」の問題に加えて、マルチスレッド環境におけるプロデューサー・コンシューマーモデルの問題も考慮しました。幸いなことに、Channel はマルチスレッドをネイティブでサポートしており、ChannelOptionsSingleWriterSingleReader を使用して、Channel が単一のコンシューマーまたはプロデューサーであるかどうかを指定できます。デフォルトでは両方とも false です。

var boundedChannel = Channel.CreateBounded<string>(
    new BoundedChannelOptions(100) {
        SingleWriter = true,
        SingleReader = false,
        FullMode = BoundedChannelFullMode.Wait
});

例えば、上記のコードスニペットを使用すると、単一プロデューサー・複数コンシューマーの Channel を作成できます。Channel にとって最も重要な2つのメンバーは WriterReader です。前者はプロデューサーに対応し、型は ChannelWriter<T>、後者はコンシューマーに対応し、型は ChannelReader<T> です。今回は、真の意味での読み書き分離を実現しました。

// プロデューサーがデータを生成
channel.Writer.TryWrite("大漠孤烟直、長河落日円。");

// コンシューマーがデータを消費
// モード1:一度に1つずつ読む
while (await channel.Reader.WaitToReadAsync())
{
    while (channel.Reader.TryRead(out var item))
    {
        // ここに具体的な処理ロジックを記述
    }
}

// モード2:一度にすべて読み出す
while (await channel.Reader.WaitToReadAsync())
{
    await foreach (var item in channel.Reader.ReadAllAsync())
    {
        // ここに具体的な処理ロジックを記述
    }
}

この話題にまだ物足りなさを感じるかもしれませんが、残念ながらこれが Channel の最も核となる使い方です。いかがでしょうか、非常にシンプルだと感じませんか?これはマイクロソフトの一貫したスタイル、つまり「複雑なものをシンプルで使いやすくする」という方針に沿っています。Channel の詳細についてはここでは触れませんので、公式ドキュメントをご参照ください。MSDN と MDN は私がこれまでに見た中で最もよく書かれたドキュメントだと断言します。

Channel の応用

さて、Channel について基本的なイメージがつかめたところで、具体的なシナリオでの応用を見ていきましょう。この記事の冒頭で紹介した FakeRPC に話を戻します。WebSocket プロトコルのサポートを追加する際に、最初に直面した問題は、メソッド呼び出しを WebSocket 通信にどのようにマッピングするかでした。通常の HTTP プロトコルはリクエスト・レスポンスモデルに従うため、メソッド呼び出しと自然に関連付けることができます。しかし、全二重通信の WebSocket プロトコルに視点を切り替えると、この問題は突然興味深いものになります。ご存知のように、WebSocket の場合、最初の接続は通常の HTTP プロトコルですが、接続が確立されると HTTP プロトコルは不要になります。この時点で、双方のやり取りは双方向になります。最終的に、FakeRPC は以下のスキームで WebSocket プロトコルをサポートすることにしました。

FakeRPC が WebSocket プロトコルをサポートする方法FakeRPC が WebSocket プロトコルをサポートする方法

このスキームでは、CallInvoker がリクエスト処理を実際に担当する中核コンポーネントです。クライアント側では、主にリクエストのメソッドとパラメータを FakeRpcRequest に組み立て、ClientWebSocket インスタンスの SendAsync() メソッドを呼び出してサーバーにメッセージを送信します。さらに、サーバーからメッセージを受信する必要もあります。各メッセージには Id が含まれているため、どのメッセージが自分への応答であるかを簡単に識別できます。これに基づき、ブロガーはバックグラウンドスレッドを使用して Channel からメッセージを読み取ります。これにより、メッセージの送信と受信は実際には異なる2つのスレッドで動作します。サーバー側でもメッセージの処理は似ていますが、違いは、サーバーが Channel からメッセージを読み取るのはクライアントに送信するためであり、クライアントが Channel からメッセージを読み取るのは結果をプロキシクラスに渡すためです。以下のコードは、上記で言及したクライアント実装の一部を示しています。

// クライアントがメッセージを送信
private async Task SendMessage(FakeRpcRequest request) {
    var payload = await _messageSerializer.SerializeAsync<FakeRpcRequest>(request);
    await _webSocket.SendAsync(new ArraySegment<byte>(payload), WebSocketMessageType.Binary, true, CancellationToken.None);
    OnMessageSent?.Invoke(_webSocket, request);
}

// クライアントが Channel からメッセージを読み取り
private async Task ReadMessagesFromQueue() {
    try {
        while (await _messagesToReadQueue.Reader.WaitToReadAsync()) {
            while (_messagesToReadQueue.Reader.TryRead(out var message)) {
                try {
                    var response = await _messageSerializer.DeserializeAsync<FakeRpcResponse>(message.Array);
                    OnMessageReceived?.Invoke(_webSocket, response);
                } catch (Exception e) {
                    _logger.LogError(e, $"Failed to send message due to {e.Message}");
                }
            }
        }
    }
    catch (TaskCanceledException) { }
    catch (OperationCanceledException) { }
    catch (Exception e) {
        _logger.LogError(e, $"Restart listen message queue due to {e.Message}");
        ListenMessageQueue();
    }
}

もちろん、ClientWebSocket インスタンスがメッセージを受信すると、実際にはメッセージを Channel に書き込みます。ある意味では、CallInvoker はプロデューサーとコンシューマーの両方の役割を同時に担い、プロデューサーとコンシューマーは2つの異なるスレッドで動作します。

var bytes = stream.ToArray();
var response = await _messageSerializer.DeserializeAsync<FakeRpcResponse>(bytes);
_logger?.LogInformation("Send response to {0}/{1}, payload:{3}", request.ServiceName, request.MethodName, response.Result);
_messagesToReadQueue.Writer.TryWrite(new ArraySegment<byte>(bytes));

これにより、動的プロキシを利用して、RPC インターフェースを非常に簡単に呼び出すことができ、WebSocket プロトコル上で長い接続として動作します。

var _clientFactory = serviceProvider.GetService<FakeRpcClientFactory>();

// GreetService を呼び出し
var greetProxy = _clientFactory.Create<IGreetService>(new Uri("ws://localhost:5000"), FakeRpcTransportProtocols.WebSocket, FakeRpcMediaTypes.Default);
var reply = await greetProxy.SayHello(new HelloRequest() { Name = "张三" });
reply = await greetProxy.SayWho();

// ICalculatorService を呼び出し
var calculatorProxy = _clientFactory.Create<ICalculatorService>(new Uri("ws://localhost:5000"), FakeRpcTransportProtocols.WebSocket, FakeRpcMediaTypes.Default);
var result = calculatorProxy.Random();

注目すべき点は、転送プロトコルだけでなくメッセージプロトコルも抽象化しており、以前と同じように自由に MessagePackProtobuf を使用できることです。Channel が本当にパフォーマンス向上に役立つことを証明するために、FakeRPC のベンチマークテスト結果を以下に示します。

FakeRPC の異なる通信プロトコル、メッセージプロトコルのパフォーマンス比較FakeRPC の異なる通信プロトコル、メッセージプロトコルのパフォーマンス比較

HTTP プロトコルでも WebSocket プロトコルでも、MessagePack は常に優れたパフォーマンスを発揮しています。これにより、TCP/IP プロトコルでもこの伝説を継続できるかどうか期待が高まります。数日前、TCP/IP プロトコル用のバイナリメッセージ定義を完了しました。シリアライズとデシリアライズが IMessageSerializer インターフェースに抽象化されて以来、より多くのメッセージプロトコルをサポートする機会が増えています。Ubisoft が新作『アサシン クリード ミラージュ』を正式発表したことで、FakeRPC という名前に対する私の理解は、アサシンブラザーフッドの理念、すなわち「Nothing is true(すべては虚構である)」へとつながっています。なぜなら、ある意味では、RPC は複雑で曲がりくねった中間プロセスを隠蔽し、ローカルメソッドを呼び出すかのようにリモートメソッドを呼び出せるという錯覚を生み出すものだからです。バイナリメッセージプロトコルを設計しているときでさえ、私は結局 HTTP プロトコルを再発明しているだけだと気づきました。では、これに意味はあるのでしょうか?もちろんあります!人生、楽しければそれでいいのです!

var buffer = new BufferBlock<int>();

// Producer
async static Task Producer(IEnumerable<int> values) {
    foreach (var value in values) {
        await buffer.SendAsync(value);
    }

    buffer.Complete();
}

// Consumer
async static Task Consumer(Action<int> process) {
    while (await buffer.OutputAvailableAsync()) {
        process?.Invoke(await buffer.ReceiveAsync());
    }
}

var range = Enumerable.Range(0, 100);
await Task.WhenAll(Producer(range), Consumer(n => Console.WriteLine(n)));

BufferBlock は Microsoft の TPL DataFlow の重要なコンポーネントの1つです。その基本思想は、データフローが次々とデータブロックで構成され、1つのブロックが処理を完了すると次のブロックにリンクされるというものです。各ブロックは、1つ以上のソースからのデータをメッセージとして受信し、バッファリングします。ブロックが情報を受信すると、入力に反応し、同時にそのブロックの出力は次のブロックに渡されます。要するに、BufferBlock の他にも ActionBlockTransformBlockBroadcastBlock などのブロックがあります。ここで BufferBlock に言及する最大の理由は、それがプロデューサー・コンシューマーモデルを採用しており、BlockingCollectionBufferBlockChannel が .NET の異なる段階を表しているからです。そして、それぞれの段階のあなたを思い返すと、これはきっと感慨深い話になるでしょう!この考え方を応用して、Channel の新しい使い方を見つけました。

// GetFiles
Task<Channel<string>> GetFiles(string root) {
    var filePathChannel = Channel.CreateUnbounded<string>();
    var directoryInfo = new DirectoryInfo(root);

    foreach (var file in directoryInfo.EnumerateFileSystemInfos()) {
        filePathChannel.Writer.TryWrite(file.FullName);
    }

    filePathChannel.Writer.Complete();
    return Task.FromResult(filePathChannel);
}

// Analyse
async Task<Channel<string>[]> Analyse(Channel<string> rootChannel) {
    var counterChannel = Channel.CreateUnbounded<string>();
    var errorsChannel = Channel.CreateUnbounded<string>();

    while (await rootChannel.Reader.WaitToReadAsync()) {
        await foreach (var filePath in rootChannel.Reader.ReadAllAsync()) {
            var fileInfo = new FileInfo(filePath);
            if (fileInfo.Extension == ".md") {
                var totalWords = File.ReadAllText(filePath).Length;
                counterChannel.Writer.TryWrite($"文章 [{fileInfo.Name}] 共 {totalWords} 个字符.");
            } else {
                errorsChannel.Writer.TryWrite($"路径 [{filePath}] 是文件夹或者格式不正确.");
            }
        }
    }

    counterChannel.Writer.Complete();
    errorsChannel.Writer.Complete();

    return new Channel<string>[] { counterChannel, errorsChannel };
}

// Merge
async Task<Channel<string>> Merge(params Channel<string>[] channels) {
    var mergeTasks = new List<Task>();
    var outputChannel = Channel.CreateUnbounded<string>();

    foreach (var channel in channels) {
        var thisChannel = channel;
        var mergeTask = Task.Run(async () => {
            while (await thisChannel.Reader.WaitToReadAsync()) {
                await foreach (var item in thisChannel.Reader.ReadAllAsync()) {
                    outputChannel.Writer.TryWrite(item);
                }
            }
        });

        mergeTasks.Add(mergeTask);
    }

    await Task.WhenAll(mergeTasks);
    outputChannel.Writer.Complete();

    return outputChannel;
}

// Run
var filePathChannel = await GetFiles(@"/hugo-blog/content/posts/");
var analysedChannels = await Analyse(filePathChannel);
var mergedChannel = await Merge(analysedChannels);
while (await mergedChannel.Reader.WaitToReadAsync()) {
    await foreach (var item in mergedChannel.Reader.ReadAllAsync()) {
        Console.WriteLine(item);
    }
}

さて、これら3つのメソッドは何をしているのでしょうか?個人的には、これは上で述べたデータフローそのものだと思います。まず、GetFiles() メソッドで指定ディレクトリ内のファイル情報を取得します。次に、これらの情報を Analyse() メソッドで処理します。ここでは、markdown 形式のファイルの文字数をカウントし、markdown 形式以外のファイルやサブディレクトリをふるい落とします。最後に、Merge() 関数で前のステップの結果を集約して出力します。図で表すと、以下のようなフローになります。

Channel を利用したデータフローパターンChannel を利用したデータフローパターン

ある意味では、これは「分割統治」戦略です。つまり、大きなタスクを複数の小さなタスクに分解し、それらの小さなタスクの結果を結合するというものです。何年も前、並列プログラミングに関する本で同じようなコード片を見たことがあります。その頃、私はすでに Google の MapReduce について少し聞いており、その後 Parallel にも触れました。そして、もし Map()Reduce() の2つの関数がリモートサーバー上で動作するとしたら、このプロセスは RPC と見なすことができ、リモートサーバー上で動作するこれらの関数は並列に何らかの計算を実行していることになります。そのプロセスは並列計算と見なせます。そして、これらの並列計算が世界中のスケーラブルな計算リソースを使用する場合、そのプロセスはまさにクラウドコンピューティングです。このように、執筆というプロセスはなかなか面白いものだと思いませんか?

本記事のまとめ

何年も前、Wesley 先輩がログをデータベースに書き込む方法について話していました。当時、私たちはまだ Elasticsearch という言葉を聞いたことがありませんでした。そのため、当時私たちが考えた解決策は、BlockingCollection を使ってブロッキングキューを作ることでした。つまり、NLogLog4Net からログを取得した後、それらのログをすべて BlockingCollection に入れ、その後データベースや何らかの出力先に書き込むことを検討するというものです。その後、私は NLogTargetSerilogSink に触れるようになり、これらがどのように動作するのかを大体理解しました。これらのログコンポーネントはログを異なる場所に出力することもサポートしています。ある意味、この認識の変化は、過去の未熟な思考が時代遅れになったことを示しているのかもしれません。しかし、まさにその未熟な思考こそが、物事に対する認識を絶えず更新するよう促してくれます。言い換えれば、私たちの人生は無数の過去から成り立っており、かつてどんなに情けなく、落ち込み、やる気を失っていても、それらは全力で「私は確かにこの世界に存在した」と教えてくれます。今日、より使いやすい Channel が登場したからといって、過去の思考が無意味だったわけではありません。少なくともプロデューサー・コンシューマーモデルについて話すとき、私は Wesley 先輩や BlockingCollection を思い出します。これはおそらく、記憶力が良すぎる私に対する時間の応答のようなものです。過去を懐かしむことはできても、すべてが最初のままであることを期待してはいけません。この世界で、あなたは自分にできることだけをすればいいのです。

さらに探索

関連読書

その他の記事
同じカテゴリ / 同じタグ 2026/04/22

各OSバージョンの.NETサポート状況(250707更新)

仮想マシンとテストマシンを使用して、各OSバージョンの.NETサポート状況を確認します。OSインストール後、対応するランタイムをインストールし、Stardustエージェントを実行できることを確認します(合格条件)。

続きを読む
同じカテゴリ / 同じタグ 2026/02/07

AOTの使用経験のまとめ

プロジェクト作成当初から、新機能を追加したり新しい構文を使用したりした場合には、すぐにAOT公開テストを実施するという良い習慣を身につけるべきです。

続きを読む