狠狠色丁香婷婷综合尤物/久久精品综合一区二区三区/中国有色金属学报/国产日韩欧美在线观看 - 国产一区二区三区四区五区tv

LOGO OA教程 ERP教程 模切知識(shí)交流 PMS教程 CRM教程 開發(fā)文檔 其他文檔  
 
網(wǎng)站管理員

.NET 高性能緩沖隊(duì)列實(shí)現(xiàn) BufferQueue

freeflydom
2024年8月5日 9:33 本文熱度 1052

前言

BufferQueue 是一個(gè)用 .NET 編寫的高性能的緩沖隊(duì)列實(shí)現(xiàn),支持多線程并發(fā)操作。

項(xiàng)目地址:https://github.com/eventhorizon-cli/BufferQueue

項(xiàng)目是從 mocha 項(xiàng)目中獨(dú)立出來的一個(gè)組件,經(jīng)過修改以提供更通用的緩沖隊(duì)列功能。

目前支持的緩沖區(qū)類型為內(nèi)存緩沖區(qū),后續(xù)會(huì)考慮支持更多類型的緩沖區(qū)。

適用場景

生產(chǎn)者和消費(fèi)者之間的速度不一致,需要并發(fā)批量處理數(shù)據(jù)的場景。

因?yàn)槟壳爸挥袃?nèi)存版本,不適用于不允許數(shù)據(jù)丟失的業(yè)務(wù)場景。

功能說明

支持創(chuàng)建多個(gè) Topic,每個(gè) Topic 可以有多種數(shù)據(jù)類型。每一對(duì) Topic 和數(shù)據(jù)類型對(duì)應(yīng)一個(gè)獨(dú)立的緩沖區(qū)。

 

支持創(chuàng)建多個(gè) Consumer Group,每個(gè) Consumer Group 的消費(fèi)進(jìn)度都是獨(dú)立的。支持多個(gè) Consumer Group 并發(fā)消費(fèi)同一個(gè) Topic。

支持同一個(gè) Consumer Group 創(chuàng)建多個(gè) Consumer,以負(fù)載均衡的方式消費(fèi)數(shù)據(jù)。

支持?jǐn)?shù)據(jù)的批量消費(fèi),可以一次性獲取多條數(shù)據(jù)。

支持 pull 模式和 push 模式兩種消費(fèi)模式。

pull 模式下和 push 模式下都支持 auto commit 和 manual commit 兩種提交方式。auto commit 模式下,消費(fèi)者在收到數(shù)據(jù)后自動(dòng)提交消費(fèi)進(jìn)度,如果消費(fèi)失敗不會(huì)重試。manual commit 模式下,消費(fèi)者需要手動(dòng)提交消費(fèi)進(jìn)度,如果消費(fèi)失敗只要不提交進(jìn)度就可以重試。

需要注意的是,當(dāng)前版本出于簡化實(shí)現(xiàn)的考慮,暫不支持消費(fèi)者的動(dòng)態(tài)擴(kuò)容和縮容,需要在創(chuàng)建消費(fèi)者時(shí)指定消費(fèi)者數(shù)量。

使用示例

安裝 Nuget 包:

dotnet add package BufferQueue

項(xiàng)目基于 Microsoft.Extensions.DependencyInjection,使用時(shí)需要先注冊(cè)服務(wù)。

BufferQueue 支持兩種消費(fèi)模式:pull 模式和 push 模式。


builder.Services.AddBufferQueue(options =>

{

    options.UseMemory(bufferOptions =>

        {

            // 每一對(duì) Topic 和數(shù)據(jù)類型對(duì)應(yīng)一個(gè)獨(dú)立的緩沖區(qū),可以設(shè)置 partitionNumber

            bufferOptions.AddTopic<Foo>("topic-foo1", partitionNumber: 6);

            bufferOptions.AddTopic<Foo>("topic-foo2", partitionNumber: 4);

            bufferOptions.AddTopic<Bar>("topic-bar", partitionNumber: 8);

        })

        // 添加 push 模式的消費(fèi)者

        // 掃描指定程序集中的標(biāo)記了 BufferPushCustomerAttribute 的類,

        // 注冊(cè)為 push 模式的消費(fèi)者

        .AddPushCustomers(typeof(Program).Assembly);

});


// 在 HostedService 中使用 pull模式 消費(fèi)數(shù)據(jù)

builder.Services.AddHostedService<Foo1PullConsumerHostService>();

pull 模式的消費(fèi)者示例:

public class Foo1PullConsumerHostService(

    IBufferQueue bufferQueue,

    ILogger<Foo1PullConsumerHostService> logger) : IHostedService

{

    private readonly CancellationTokenSource _cancellationTokenSource = new();


    public Task StartAsync(CancellationToken cancellationToken)

    {

        var token = CancellationTokenSource

            .CreateLinkedTokenSource(cancellationToken, _cancellationTokenSource.Token)

            .Token;


        var consumers = bufferQueue.CreatePullConsumers<Foo>(

            new BufferPullConsumerOptions

            {

                TopicName = "topic-foo1", GroupName = "group-foo1", AutoCommit = true, BatchSize = 100,

            }, consumerNumber: 4);


        foreach (var consumer in consumers)

        {

            _ = ConsumeAsync(consumer, token);

        }


        return Task.CompletedTask;

    }


    public Task StopAsync(CancellationToken cancellationToken)

    {

        _cancellationTokenSource.Cancel();

        return Task.CompletedTask;

    }


    private async Task ConsumeAsync(IBufferPullConsumer<Foo> consumer, CancellationToken cancellationToken)

    {

        await foreach (var buffer in consumer.ConsumeAsync(cancellationToken))

        {

            foreach (var foo in buffer)

            {

                // Process the foo

                logger.LogInformation("Foo1PullConsumerHostService.ConsumeAsync: {Foo}", foo);

            }

        }

    }

}

push 模式的消費(fèi)者示例:

通過 BufferPushCustomer 特性注冊(cè) push 模式的消費(fèi)者。

push consumer 會(huì)被注冊(cè)到 DI 容器中,可以通過構(gòu)造函數(shù)注入其他服務(wù),可以通過設(shè)置 ServiceLifetime 來控制 consumer 的生命周期。

BufferPushCustomerAttribute 中的 concurrency 參數(shù)用于設(shè)置 push consumer 的消費(fèi)并發(fā)數(shù),對(duì)應(yīng) pull consumer 的 consumerNumber。


[BufferPushCustomer(

    topicName: "topic-foo2",

    groupName: "group-foo2",

    batchSize: 100,

    serviceLifetime: ServiceLifetime.Singleton,

    concurrency: 2)]

public class Foo2PushConsumer(ILogger<Foo2PushConsumer> logger) : IBufferAutoCommitPushConsumer<Foo>

{

    public Task ConsumeAsync(IEnumerable<Foo> buffer, CancellationToken cancellationToken)

    {

        foreach (var foo in buffer)

        {

            logger.LogInformation("Foo2PushConsumer.ConsumeAsync: {Foo}", foo);

        }


        return Task.CompletedTask;

    }

}

[BufferPushCustomer(

    "topic-bar",

    "group-bar",

    100,

    ServiceLifetime.Scoped,

    2)]

public class BarPushConsumer(ILogger<BarPushConsumer> logger) : IBufferManualCommitPushConsumer<Bar>

{

    public async Task ConsumeAsync(IEnumerable<Bar> buffer, IBufferConsumerCommitter committer,

        CancellationToken cancellationToken)

    {

        foreach (var bar in buffer)

        {

            logger.LogInformation("BarPushConsumer.ConsumeAsync: {Bar}", bar);

        }


        var commitTask = committer.CommitAsync();

        if (!commitTask.IsCompletedSuccessfully)

        {

            await commitTask.AsTask();

        }

    }

}

Producer 示例:

通過 IBufferQueue 獲取到指定的 Producer,然后調(diào)用 ProduceAsync 方法發(fā)送數(shù)據(jù)。

[ApiController]

[Route("/api/[controller]")]

public class TestController(IBufferQueue bufferQueue) : ControllerBase

{

    [HttpPost("foo1")]

    public async Task<IActionResult> PostFoo1([FromBody] Foo foo)

    {

        var producer = bufferQueue.GetProducer<Foo>("topic-foo1");

        await producer.ProduceAsync(foo);

        return Ok();

    }


    [HttpPost("foo2")]

    public async Task<IActionResult> PostFoo2([FromBody] Foo foo)

    {

        var producer = bufferQueue.GetProducer<Foo>("topic-foo2");

        await producer.ProduceAsync(foo);

        return Ok();

    }


    [HttpPost("bar")]

    public async Task<IActionResult> PostBar([FromBody] Bar bar)

    {

        var producer = bufferQueue.GetProducer<Bar>("topic-bar");

        await producer.ProduceAsync(bar);

        return Ok();

    }

}

BufferQueue 內(nèi)部設(shè)計(jì)概述

Topic 的隔離

BufferQueue 有以下的特性:

同一個(gè)數(shù)據(jù)類型 下的 不同 Topic 的 BufferQueue 互不干擾。

同一個(gè) Topic 下的 不同數(shù)據(jù)類型 的 BufferQueue 互不干擾。

 

這個(gè)特性是通過以下兩層接口設(shè)計(jì)實(shí)現(xiàn)的:

IBufferQueue:根據(jù) TopicName 和 類型參數(shù) T 將請(qǐng)求轉(zhuǎn)發(fā)給具體的 IBufferQueue<T> 實(shí)現(xiàn)(借助 KeyedService 實(shí)現(xiàn)),其中參數(shù) T 代表 Buffer 所承載的數(shù)據(jù)實(shí)體的類型。

IBufferQueue<T>:具體的 BufferQueue 實(shí)現(xiàn),負(fù)責(zé)管理 Topic 下的數(shù)據(jù)。屬于 Buffer 模塊的內(nèi)部實(shí)現(xiàn),不對(duì)外暴露。

 

Partition 的設(shè)計(jì)

為了保證消費(fèi)速度,BufferQueue 將數(shù)據(jù)劃分為多個(gè) Partition,每個(gè) Partition 都是一個(gè)獨(dú)立的隊(duì)列,每個(gè) Partition 都有一個(gè)對(duì)應(yīng)的消費(fèi)者線程。

Producer 以輪詢的方式往每個(gè) Partition 中寫入數(shù)據(jù)。

Consumer 最多不允許超過 Partition 的數(shù)量,Partition 按平均分配到組內(nèi)每個(gè) Customer 上。

當(dāng)一個(gè) Consumer 被分配了多個(gè) Partition 時(shí),以輪訓(xùn)的方式進(jìn)行消費(fèi)。

每個(gè) Partition 上會(huì)記錄不同消費(fèi)組的消費(fèi)進(jìn)度,不同組之間的消費(fèi)進(jìn)度互不干擾。

 

對(duì)并發(fā)的支持

Producer 支持并發(fā)寫入。

Consumer 消費(fèi)時(shí)是綁定 Partition 的,為保證能正確管理 Partition 的消費(fèi)進(jìn)度,Consumer 不支持并發(fā)消費(fèi)。

如果要增加消費(fèi)速度,需創(chuàng)建多個(gè) Consumer。

Partition 的動(dòng)態(tài)擴(kuò)容

Partition 的基本組成單元是 Segment,Segment 代表保存數(shù)據(jù)的數(shù)組,多個(gè) Segment 通過鏈表的形式組合成一個(gè) Partition。

當(dāng)一個(gè) Segment 寫滿后,通過在其后面追加一個(gè) Segment 實(shí)現(xiàn)擴(kuò)容。

Segment 中用于保存數(shù)據(jù)的數(shù)組的每一個(gè)元素稱為 Slot,每個(gè) Slot 都有一個(gè)Partition 內(nèi)唯一的自增 Offset。

 

Segment 的回收機(jī)制

每次在 Partition 中新增 Segment 時(shí),會(huì)從頭判斷此前的 Segment 是否已經(jīng)被所有消費(fèi)組消費(fèi)完,回收最后一個(gè)消費(fèi)完的 Segment 作為新的 Segment 追加到 Partition 末尾使用。

 

Benchmark

測試環(huán)境:Apple M2 Max 64GB

寫入性能測試

與 BlockingCollection 對(duì)比并發(fā),并發(fā)線程數(shù)為 CPU 邏輯核心數(shù) 12, partitionNumber 為 1 和 12。

測試結(jié)果

 

在并發(fā)寫入時(shí),BufferQueue 的寫入性能明顯優(yōu)于 BlockingCollection。

消費(fèi)性能測試

pull 模式 consumer 與 BlockingCollection 對(duì)比并發(fā)讀取性能,并發(fā)線程數(shù)為 CPU 邏輯核心數(shù) 12,partitionNumber 為 12。

測試結(jié)果

 

在批量消費(fèi)時(shí),隨著批量大小的增加,BufferQueue 的消費(fèi)性能優(yōu)勢更加明顯。


轉(zhuǎn)自https://www.cnblogs.com/eventhorizon/p/18331018


該文章在 2024/8/5 9:51:53 編輯過
關(guān)鍵字查詢
相關(guān)文章
正在查詢...
點(diǎn)晴ERP是一款針對(duì)中小制造業(yè)的專業(yè)生產(chǎn)管理軟件系統(tǒng),系統(tǒng)成熟度和易用性得到了國內(nèi)大量中小企業(yè)的青睞。
點(diǎn)晴PMS碼頭管理系統(tǒng)主要針對(duì)港口碼頭集裝箱與散貨日常運(yùn)作、調(diào)度、堆場、車隊(duì)、財(cái)務(wù)費(fèi)用、相關(guān)報(bào)表等業(yè)務(wù)管理,結(jié)合碼頭的業(yè)務(wù)特點(diǎn),圍繞調(diào)度、堆場作業(yè)而開發(fā)的。集技術(shù)的先進(jìn)性、管理的有效性于一體,是物流碼頭及其他港口類企業(yè)的高效ERP管理信息系統(tǒng)。
點(diǎn)晴WMS倉儲(chǔ)管理系統(tǒng)提供了貨物產(chǎn)品管理,銷售管理,采購管理,倉儲(chǔ)管理,倉庫管理,保質(zhì)期管理,貨位管理,庫位管理,生產(chǎn)管理,WMS管理系統(tǒng),標(biāo)簽打印,條形碼,二維碼管理,批號(hào)管理軟件。
點(diǎn)晴免費(fèi)OA是一款軟件和通用服務(wù)都免費(fèi),不限功能、不限時(shí)間、不限用戶的免費(fèi)OA協(xié)同辦公管理系統(tǒng)。
Copyright 2010-2025 ClickSun All Rights Reserved