狠狠色丁香婷婷综合尤物/久久精品综合一区二区三区/中国有色金属学报/国产日韩欧美在线观看 - 国产一区二区三区四区五区tv

LOGO OA教程 ERP教程 模切知識交流 PMS教程 CRM教程 開發(fā)文檔 其他文檔  
 
網(wǎng)站管理員

NETCore中實現(xiàn)一個輕量無負(fù)擔(dān)的極簡任務(wù)調(diào)度ScheduleTask

freeflydom
2024年5月25日 11:54 本文熱度 1141

至于任務(wù)調(diào)度這個基礎(chǔ)功能,重要性不言而喻,大多數(shù)業(yè)務(wù)系統(tǒng)都會用到,世面上有很多成熟的三方庫比如Quartz,Hangfire,Coravel
這里我們不討論三方的庫如何使用 而是從0開始自己制作一個簡易的任務(wù)調(diào)度,如果只是到分鐘級別的粒度基本夠用

技術(shù)棧用到了:BackgroundServiceNCrontab

第一步我們定義一個簡單的任務(wù)約定,不干別的就是一個執(zhí)行方法:

    public interface IScheduleTask

    {

        Task ExecuteAsync();

    }

    public abstract class ScheduleTask : IScheduleTask

    {

        public virtual Task ExecuteAsync()

        {

            return Task.CompletedTask;

        }

    }


第二步定義特性標(biāo)注任務(wù)執(zhí)行周期等信的metadata

    [AttributeUsage(AttributeTargets.Class, AllowMultiple = true, Inherited = false)]

    public class ScheduleTaskAttribute(string cron) : Attribute

    {

        /// <summary>

        /// 支持的cron表達(dá)式格式 * * * * *:https://en.wikipedia.org/wiki/Cron

        /// 最小單位為分鐘

        /// </summary>

        public string Cron { get; set; } = cron;

        public string? Description { get; set; }

        /// <summary>

        /// 是否異步執(zhí)行.默認(rèn)false會阻塞接下來的同類任務(wù)

        /// </summary>

        public bool IsAsync { get; set; } = false;

        /// <summary>

        /// 是否初始化即啟動,默認(rèn)false

        /// </summary>

        public bool IsStartOnInit { get; set; } = false;

    }

   

第三步我們定義一個調(diào)度器約定,不干別的就是判斷當(dāng)前的任務(wù)是否可以執(zhí)行:

    public interface IScheduler

    {

        /// <summary>

        /// 判斷當(dāng)前的任務(wù)是否可以執(zhí)行

        /// </summary>

        bool CanRun(ScheduleTaskAttribute scheduleMetadata, DateTime referenceTime);

    }

好了,基礎(chǔ)步驟就完成了,如果我們需要實現(xiàn)配置級別的任務(wù)調(diào)度或者動態(tài)的任務(wù)調(diào)度 那我們再抽象一個Store:

    public class ScheduleTaskMetadata(Type scheduleTaskType, string cron)

    {

        public Type ScheduleTaskType { get; set; } = scheduleTaskType;

        public string Cron { get; set; } = cron;

        public string? Description { get; set; }

        public bool IsAsync { get; set; } = false;

        public bool IsStartOnInit { get; set; } = false;

    }

    public interface IScheduleMetadataStore

    {

        /// <summary>

        /// 獲取所有ScheduleTaskMetadata

        /// </summary>

        Task<IEnumerable<ScheduleTaskMetadata>> GetAllAsync();

    }

實現(xiàn)一個Configuration級別的Store

    internal class ConfigurationScheduleMetadataStore(IConfiguration configuration) : IScheduleMetadataStore

    {

        const string Key = "BiwenQuickApi:Schedules";


        public Task<IEnumerable<ScheduleTaskMetadata>> GetAllAsync()

        {

            var options = configuration.GetSection(Key).GetChildren();


            if (options?.Any() is true)

            {

                var metadatas = options.Select(x =>

                {

                    var type = Type.GetType(x[nameof(ConfigurationScheduleOption.ScheduleType)]!);

                    if (type is null)

                        throw new ArgumentException($"Type {x[nameof(ConfigurationScheduleOption.ScheduleType)]} not found!");


                    return new ScheduleTaskMetadata(type, x[nameof(ConfigurationScheduleOption.Cron)]!)

                    {

                        Description = x[nameof(ConfigurationScheduleOption.Description)],

                        IsAsync = string.IsNullOrEmpty(x[nameof(ConfigurationScheduleOption.IsAsync)]) ? false : bool.Parse(x[nameof(ConfigurationScheduleOption.IsAsync)]!),

                        IsStartOnInit = string.IsNullOrEmpty(x[nameof(ConfigurationScheduleOption.IsStartOnInit)]) ? false : bool.Parse(x[nameof(ConfigurationScheduleOption.IsStartOnInit)]!),

                    };

                });

                return Task.FromResult(metadatas);

            }

            return Task.FromResult(Enumerable.Empty<ScheduleTaskMetadata>());

        }

    }


然后呢,我們可能需要多任務(wù)調(diào)度的事件做一些操作或者日志存儲.比如失敗了該干嘛,完成了回調(diào)其他后續(xù)業(yè)務(wù)等.我們再來定義一下具體的事件IEvent,具體可以參考我上一篇文章:
https://www.cnblogs.com/vipwan/p/18184088

    public abstract class ScheduleTaskEvent(IScheduleTask scheduleTask, DateTime eventTime) : IEvent

    {

        /// <summary>

        /// 任務(wù)

        /// </summary>

        public IScheduleTask ScheduleTask { get; set; } = scheduleTask;

        /// <summary>

        /// 觸發(fā)時間

        /// </summary>

        public DateTime EventTime { get; set; } = eventTime;

    }

    /// <summary>

    /// 執(zhí)行完成

    /// </summary>

    public sealed class TaskSuccessedEvent(IScheduleTask scheduleTask, DateTime eventTime, DateTime endTime) : ScheduleTaskEvent(scheduleTask, eventTime)

    {

        /// <summary>

        /// 執(zhí)行結(jié)束的時間

        /// </summary>

        public DateTime EndTime { get; set; } = endTime;

    }

    /// <summary>

    /// 執(zhí)行開始

    /// </summary>

    public sealed class TaskStartedEvent(IScheduleTask scheduleTask, DateTime eventTime) : ScheduleTaskEvent(scheduleTask, eventTime);

    /// <summary>

    /// 執(zhí)行失敗

    /// </summary>

    public sealed class TaskFailedEvent(IScheduleTask scheduleTask, DateTime eventTime, Exception exception) : ScheduleTaskEvent(scheduleTask, eventTime)

    {

        /// <summary>

        /// 異常信息

        /// </summary>

        public Exception Exception { get; private set; } = exception;

    }

接下來我們再實現(xiàn)基于NCrontab的簡易調(diào)度器,這個調(diào)度器主要是解析Cron表達(dá)式判斷傳入時間是否可以執(zhí)行ScheduleTask,具體的代碼:

    internal class SampleNCrontabScheduler : IScheduler

    {

        /// <summary>

        /// 暫存上次執(zhí)行時間

        /// </summary>

        private static ConcurrentDictionary<ScheduleTaskAttribute, DateTime> LastRunTimes = new();


        public bool CanRun(ScheduleTaskAttribute scheduleMetadata, DateTime referenceTime)

        {

            var now = DateTime.Now;

            var haveExcuteTime = LastRunTimes.TryGetValue(scheduleMetadata, out var time);

            if (!haveExcuteTime)

            {

                var nextStartTime = CrontabSchedule.Parse(scheduleMetadata.Cron).GetNextOccurrence(referenceTime);

                LastRunTimes.TryAdd(scheduleMetadata, nextStartTime);


                //如果不是初始化啟動,則不執(zhí)行

                if (!scheduleMetadata.IsStartOnInit)

                    return false;

            }

            if (now >= time)

            {

                var nextStartTime = CrontabSchedule.Parse(scheduleMetadata.Cron).GetNextOccurrence(referenceTime);

                //更新下次執(zhí)行時間

                LastRunTimes.TryUpdate(scheduleMetadata, nextStartTime, time);

                return true;

            }

            return false;

        }

    }

然后就是核心的BackgroundService了,這里我用的IdleTime心跳來實現(xiàn),粒度分鐘,當(dāng)然內(nèi)部也可以封裝Timer等實現(xiàn)更復(fù)雜精度更高的調(diào)度,這里就不展開講了,代碼如下:


    internal class ScheduleBackgroundService : BackgroundService

    {

        private static readonly TimeSpan _pollingTime

#if DEBUG

          //輪詢20s 測試環(huán)境下,方便測試。

          = TimeSpan.FromSeconds(20);

#endif

#if !DEBUG

         //輪詢60s 正式環(huán)境下,考慮性能輪詢時間延長到60s

         = TimeSpan.FromSeconds(60);

#endif

        //心跳10s.

        private static readonly TimeSpan _minIdleTime = TimeSpan.FromSeconds(10);

        private readonly ILogger<ScheduleBackgroundService> _logger;

        private readonly IServiceProvider _serviceProvider;

        public ScheduleBackgroundService(ILogger<ScheduleBackgroundService> logger, IServiceProvider serviceProvider)

        {

            _logger = logger;

            _serviceProvider = serviceProvider;

        }

        protected override async Task ExecuteAsync(CancellationToken stoppingToken)

        {

            while (!stoppingToken.IsCancellationRequested)

            {

                var pollingDelay = Task.Delay(_pollingTime, stoppingToken);

                try

                {

                    await RunAsync(stoppingToken);

                }

                catch (Exception ex)

                {

                    //todo:

                    _logger.LogError(ex.Message);

                }

                await WaitAsync(pollingDelay, stoppingToken);

            }

        }

        private async Task RunAsync(CancellationToken stoppingToken)

        {

            using var scope = _serviceProvider.CreateScope();

            var tasks = scope.ServiceProvider.GetServices<IScheduleTask>();

            if (tasks is null || !tasks.Any())

            {

                return;

            }

            //調(diào)度器

            var scheduler = scope.ServiceProvider.GetRequiredService<IScheduler>();

            async Task DoTaskAsync(IScheduleTask task, ScheduleTaskAttribute metadata)

            {

                if (scheduler.CanRun(metadata, DateTime.Now))

                {

                    var eventTime = DateTime.Now;

                    //通知啟動

                    _ = new TaskStartedEvent(task, eventTime).PublishAsync(default);

                    try

                    {

                        if (metadata.IsAsync)

                        {

                            //異步執(zhí)行

                            _ = task.ExecuteAsync();

                        }

                        else

                        {

                            //同步執(zhí)行

                            await task.ExecuteAsync();

                        }

                        //執(zhí)行完成

                        _ = new TaskSuccessedEvent(task, eventTime, DateTime.Now).PublishAsync(default);

                    }

                    catch (Exception ex)

                    {

                        _ = new TaskFailedEvent(task, DateTime.Now, ex).PublishAsync(default);

                    }

                }

            };

            //注解中的task

            foreach (var task in tasks)

            {

                if (stoppingToken.IsCancellationRequested)

                {

                    break;

                }

                //標(biāo)注的metadatas

                var metadatas = task.GetType().GetCustomAttributes<ScheduleTaskAttribute>();


                if (!metadatas.Any())

                {

                    continue;

                }

                foreach (var metadata in metadatas)

                {

                    await DoTaskAsync(task, metadata);

                }

            }

            //store中的scheduler

            var stores = _serviceProvider.GetServices<IScheduleMetadataStore>().ToArray();


            //并行執(zhí)行,提高性能

            Parallel.ForEach(stores, async store =>

            {

                if (stoppingToken.IsCancellationRequested)

                {

                    return;

                }

                var metadatas = await store.GetAllAsync();

                if (metadatas is null || !metadatas.Any())

                {

                    return;

                }

                foreach (var metadata in metadatas)

                {

                    var attr = new ScheduleTaskAttribute(metadata.Cron)

                    {

                        Description = metadata.Description,

                        IsAsync = metadata.IsAsync,

                        IsStartOnInit = metadata.IsStartOnInit,

                    };


                    var task = scope.ServiceProvider.GetRequiredService(metadata.ScheduleTaskType) as IScheduleTask;

                    if (task is null)

                    {

                        return;

                    }

                    await DoTaskAsync(task, attr);

                }

            });

        }


        private static async Task WaitAsync(Task pollingDelay, CancellationToken stoppingToken)

        {

            try

            {

                await Task.Delay(_minIdleTime, stoppingToken);

                await pollingDelay;

            }

            catch (OperationCanceledException)

            {

            }

        }

    }

最后收尾階段我們老規(guī)矩擴展一下IServiceCollection:

        internal static IServiceCollection AddScheduleTask(this IServiceCollection services)

        {

            foreach (var task in ScheduleTasks)

            {

                services.AddTransient(task);

                services.AddTransient(typeof(IScheduleTask), task);

            }

            //調(diào)度器

            services.AddScheduler<SampleNCrontabScheduler>();

            //配置文件Store:

services.AddScheduleMetadataStore<ConfigurationScheduleMetadataStore>();

            //BackgroundService

           services.AddHostedService<ScheduleBackgroundService>();

            return services;

        }

        /// <summary>

        /// 注冊調(diào)度器AddScheduler

        /// </summary>

        public static IServiceCollection AddScheduler<T>(this IServiceCollection services) where T : class, IScheduler

        {

            services.AddSingleton<IScheduler, T>();

            return services;

        }


        /// <summary>

        /// 注冊ScheduleMetadataStore

        /// </summary>

        public static IServiceCollection AddScheduleMetadataStore<T>(this IServiceCollection services) where T : class, IScheduleMetadataStore

        {

            services.AddSingleton<IScheduleMetadataStore, T>();

            return services;

        }

老規(guī)矩我們來測試一下:

    //通過特性標(biāo)注的方式執(zhí)行:

    [ScheduleTask(Constants.CronEveryMinute)] //每分鐘一次

    [ScheduleTask("0/3 * * * *")]//每3分鐘執(zhí)行一次

    public class KeepAlive(ILogger<KeepAlive> logger) : IScheduleTask

    {

        public async Task ExecuteAsync()

        {

            //執(zhí)行5s

            await Task.Delay(TimeSpan.FromSeconds(5));

            logger.LogInformation("keep alive!");

        }

    }

public class DemoConfigTask(ILogger<DemoConfigTask> logger) : IScheduleTask

    {

        public Task ExecuteAsync()

        {

            logger.LogInformation("Demo Config Schedule Done!");

            return Task.CompletedTask;

        }

    }

通過配置文件的方式配置Store:

{

  "BiwenQuickApi": {

    "Schedules": [

      {

        "ScheduleType": "Biwen.QuickApi.DemoWeb.Schedules.DemoConfigTask,Biwen.QuickApi.DemoWeb",

        "Cron": "0/5 * * * *",

        "Description": "Every 5 mins",

        "IsAsync": true,

        "IsStartOnInit": false

      },

      {

        "ScheduleType": "Biwen.QuickApi.DemoWeb.Schedules.DemoConfigTask,Biwen.QuickApi.DemoWeb",

        "Cron": "0/10 * * * *",

        "Description": "Every 10 mins",

        "IsAsync": false,

        "IsStartOnInit": true

      }

    ]

  }

}

我們還可以實現(xiàn)自己的Store,這里以放到內(nèi)存為例,如果有興趣 你可以可以自行開發(fā)一個面板管理:

    public class DemoStore : IScheduleMetadataStore

    {

        public Task<IEnumerable<ScheduleTaskMetadata>> GetAllAsync()

        {

            //模擬從數(shù)據(jù)庫或配置文件中獲取ScheduleTaskMetadata

            IEnumerable<ScheduleTaskMetadata> metadatas =

                [

                    new ScheduleTaskMetadata(typeof(DemoTask),Constants.CronEveryNMinutes(2))

                    {

                        Description="測試的Schedule"

                    },

                ];

            return Task.FromResult(metadatas);

        }

    }

//然后注冊這個Store:

builder.Services.AddScheduleMetadataStore<DemoStore>();

所有的一切都大功告成,最后我們來跑一下Demo,成功了:

 

當(dāng)然這里是自己的固定思維設(shè)計的一個簡約版,還存在一些不足,歡迎板磚輕拍指正!

2024/05/16更新:
提供同一時間單一運行中的任務(wù)實現(xiàn)

/// <summary>

/// 模擬一個只能同時存在一個的任務(wù).一分鐘執(zhí)行一次,但是耗時兩分鐘.

/// </summary>

/// <param name="logger"></param>

[ScheduleTask(Constants.CronEveryMinute, IsStartOnInit = true)]

    public class OnlyOneTask(ILogger<OnlyOneTask> logger) : OnlyOneRunningScheduleTask

    {

        public override Task OnAbort()

        {

            logger.LogWarning($"[{DateTime.Now}]任務(wù)被打斷.因為有一個相同的任務(wù)正在執(zhí)行!");

            return Task.CompletedTask;

        }


        public override async Task ExecuteAsync()

        {

            var now = DateTime.Now;

            //模擬一個耗時2分鐘的任務(wù)

            await Task.Delay(TimeSpan.FromMinutes(2));

            logger.LogInformation($"[{now}] ~ {DateTime.Now} 執(zhí)行一個耗時兩分鐘的任務(wù)!");

        }

    }


源代碼我發(fā)布到了GitHub,歡迎star! https://github.com/vipwan/Biwen.QuickApi
https://github.com/vipwan/Biwen.QuickApi/tree/master/Biwen.QuickApi/Scheduling


轉(zhuǎn)自https://www.cnblogs.com/vipwan/p/18194062/biwen-quickapi-scheduletask 作者:萬雅虎


該文章在 2024/5/25 11:54:07 編輯過
關(guān)鍵字查詢
相關(guān)文章
正在查詢...
點晴ERP是一款針對中小制造業(yè)的專業(yè)生產(chǎn)管理軟件系統(tǒng),系統(tǒng)成熟度和易用性得到了國內(nèi)大量中小企業(yè)的青睞。
點晴PMS碼頭管理系統(tǒng)主要針對港口碼頭集裝箱與散貨日常運作、調(diào)度、堆場、車隊、財務(wù)費用、相關(guān)報表等業(yè)務(wù)管理,結(jié)合碼頭的業(yè)務(wù)特點,圍繞調(diào)度、堆場作業(yè)而開發(fā)的。集技術(shù)的先進性、管理的有效性于一體,是物流碼頭及其他港口類企業(yè)的高效ERP管理信息系統(tǒng)。
點晴WMS倉儲管理系統(tǒng)提供了貨物產(chǎn)品管理,銷售管理,采購管理,倉儲管理,倉庫管理,保質(zhì)期管理,貨位管理,庫位管理,生產(chǎn)管理,WMS管理系統(tǒng),標(biāo)簽打印,條形碼,二維碼管理,批號管理軟件。
點晴免費OA是一款軟件和通用服務(wù)都免費,不限功能、不限時間、不限用戶的免費OA協(xié)同辦公管理系統(tǒng)。
Copyright 2010-2025 ClickSun All Rights Reserved