Channel 是干什么的#
The System.Threading.Channels namespace provides a set of synchronization data structures for passing data between producers and consumers asynchronously. The library targets .NET Standard and works on all .NET implementations.
Channels are an implementation of the producer/consumer conceptual programming model.
以上是微軟官方的解釋 channels。用中文說的話就是這個類提供了在生產者跟消費者之間異步傳統數據的能力,簡單來說可以認為是一個內存消息隊列。
示例 1#
下面是一個簡單的示例,說明如何使用 Channel 類來創建一個生產者-消費者模型:
static async Task Main(string[] args)
{
var channel = Channel.CreateUnbounded<int>();
var producer = Task.Run(async () =>
{
for (int i = 0; i < 10; i++)
{
await channel.Writer.WriteAsync(i);
await Task.Delay(1000); // 模擬生產者需要一些時間來生成數據
}
channel.Writer.Complete();
});
var consumer = Task.Run(async () =>
{
await foreach (var item in channel.Reader.ReadAllAsync())
{
Console.WriteLine($"消費者接收到: {item}");
}
});
await Task.WhenAll(producer, consumer);
}
在這個例子中,我們創建了一個無界的通道,然后創建了兩個任務,一個是生產者,一個是消費者。生產者每秒生成一個數字,然后寫入通道。消費者從通道中讀取數據并打印出來。當生產者完成寫入后,它會調用 channel.Writer.Complete() 來通知消費者沒有更多的數據可以讀取。
示例 2#
你可以使用 Channel.CreateBounded(capacity) 方法來創建一個有界的通道,其中 capacity 參數指定了通道的容量。當通道滿時,嘗試寫入的操作將會阻塞,直到有空間可用。
static async Task Main(string[] args)
{
var channel = Channel.CreateBounded<int>(5); // 創建一個容量為5的有界通道
var producer = Task.Run(async () =>
{
for (int i = 0; i < 10; i++)
{
await channel.Writer.WriteAsync(i);
Console.WriteLine($"生產者生成了: {i}");
await Task.Delay(1000); // 模擬生產者需要一些時間來生成數據
}
channel.Writer.Complete();
});
var consumer = Task.Run(async () =>
{
await foreach (var item in channel.Reader.ReadAllAsync())
{
Console.WriteLine($"消費者接收到: {item}");
await Task.Delay(2000); // 模擬消費者需要一些時間來處理數據
}
});
await Task.WhenAll(producer, consumer);
}
在這個例子中,我們創建了一個容量為5的有界通道。生產者每秒生成一個數字,然后寫入通道。消費者從通道中讀取數據并打印出來,但消費者處理數據的速度比生產者慢,所以當通道滿時,生產者的 WriteAsync 操作將會阻塞,直到消費者讀取了一些數據,使得通道有空間可用。
示例 3#
下面是一個示例,展示了如何在多個生產者和消費者之間共享一個通道:
static async Task Main(string[] args)
{
var channel = Channel.CreateUnbounded<int>();
// 創建兩個生產者
var producer1 = Produce(channel.Writer, id: 1);
var producer2 = Produce(channel.Writer, id: 2);
// 創建兩個消費者
var consumer1 = Consume(channel.Reader, id: 1);
var consumer2 = Consume(channel.Reader, id: 2);
// 等待所有生產者和消費者完成
await Task.WhenAll(producer1, producer2, consumer1, consumer2);
}
static async Task Produce(ChannelWriter<int> writer, int id)
{
for (int i = 0; i < 10; i++)
{
await writer.WriteAsync(i);
Console.WriteLine($"生產者{id}生成了: {i}");
await Task.Delay(1000); // 模擬生產者需要一些時間來生成數據
}
writer.Complete();
}
static async Task Consume(ChannelReader<int> reader, int id)
{
await foreach (var item in reader.ReadAllAsync())
{
Console.WriteLine($"消費者{id}接收到: {item}");
await Task.Delay(2000); // 模擬消費者需要一些時間來處理數據
}
}
在這個例子中,我們創建了兩個生產者和兩個消費者,它們都共享同一個通道。這是一個非常重要使用模式。因為當我們使用消息隊列的時候往往會有多個生產者跟多個消費者。我們可以通過控制生產者生產的速度來控制推入隊列的數據量。我們還可以通過控制消費者的數量來控制消費數據的速度,從而來調節系統的流量,達到消峰填谷的作用。
Channel 類是 .NET CORE 3.0 后新加入的類。為我們提供了便利的生產者/消費者模式實現方案。相當于是一個進程內的內存隊列,而且它沒有持久化,純內存操作,性能是非常非常高的。當我們面對真正的高并發的時候可以為我們的系統提供吞吐量。當然代價是內存跟放棄一些實時性。
作者:Agile.Zhou
出處:https://www.cnblogs.com/kklldog/p/18201013/channel-in-net
版權:本作品采用「署名-非商業性使用-相同方式共享 4.0 國際」許可協議進行許可。
該文章在 2024/5/27 8:50:42 編輯過