至于任務調度這個基礎功能,重要性不言而喻,大多數業務系統都會用到,市面上有很多成熟的三方庫,比如Quartz、Hangfire、Coravel。
這里我們不討論三方庫如何使用,而是從0開始自己制作一個簡易的任務調度;如果只需要分鐘級別的粒度,基本夠用。
技術棧用到了:BackgroundService 和 NCrontab 庫。
第一步我們定義一個簡單的任務約定,不干別的,就是一個執行方法:
/// <summary>
/// Contract for a schedulable task: a single asynchronous entry point.
/// </summary>
public interface IScheduleTask
{
    /// <summary>
    /// Runs the task.
    /// </summary>
    /// <returns>A task that represents the execution.</returns>
    Task ExecuteAsync();
}
/// <summary>
/// Base class for schedule tasks whose default execution does nothing.
/// </summary>
public abstract class ScheduleTask : IScheduleTask
{
    /// <summary>
    /// Default no-op implementation; returns an already completed task.
    /// </summary>
    /// <returns><see cref="Task.CompletedTask"/>.</returns>
    public virtual Task ExecuteAsync() => Task.CompletedTask;
}
第二步定義特性,用于標注任務執行周期等信息的metadata:
/// <summary>
/// Declares the schedule metadata of a task class. May be applied several times
/// to the same class and is not inherited by derived classes.
/// </summary>
/// <param name="cron">The cron expression that drives the schedule.</param>
[AttributeUsage(AttributeTargets.Class, AllowMultiple = true, Inherited = false)]
public class ScheduleTaskAttribute(string cron) : Attribute
{
    /// <summary>
    /// Cron expression in the five-field format <c>* * * * *</c>
    /// (see https://en.wikipedia.org/wiki/Cron). The smallest unit is one minute.
    /// </summary>
    public string Cron { get; set; } = cron;

    /// <summary>
    /// Optional human-readable description of the schedule.
    /// </summary>
    public string? Description { get; set; }

    /// <summary>
    /// Whether the task is executed without being awaited. Defaults to <c>false</c>,
    /// in which case the task blocks the tasks that follow it.
    /// </summary>
    public bool IsAsync { get; set; }

    /// <summary>
    /// Whether the task starts right at initialization. Defaults to <c>false</c>.
    /// </summary>
    public bool IsStartOnInit { get; set; }
}
第三步我們定義一個調度器約定,不干別的,就是判斷當前的任務是否可以執行:
/// <summary>
/// Contract for a scheduler: decides whether a task may run.
/// </summary>
public interface IScheduler
{
    /// <summary>
    /// Determines whether the task described by the metadata may run now.
    /// </summary>
    /// <param name="scheduleMetadata">The schedule metadata of the task.</param>
    /// <param name="referenceTime">The time the decision is made against.</param>
    /// <returns><c>true</c> if the task may run; otherwise <c>false</c>.</returns>
    bool CanRun(ScheduleTaskAttribute scheduleMetadata, DateTime referenceTime);
}
好了,基礎步驟就完成了。如果我們需要實現配置級別的任務調度或者動態的任務調度,那我們再抽象一個Store:
/// <summary>
/// Schedule metadata supplied by a store: the task type plus the same settings
/// that <c>ScheduleTaskAttribute</c> carries.
/// </summary>
/// <param name="scheduleTaskType">The type of the task to run.</param>
/// <param name="cron">The cron expression that drives the schedule.</param>
public class ScheduleTaskMetadata(Type scheduleTaskType, string cron)
{
    /// <summary>The type of the task to run.</summary>
    public Type ScheduleTaskType { get; set; } = scheduleTaskType;

    /// <summary>The cron expression that drives the schedule.</summary>
    public string Cron { get; set; } = cron;

    /// <summary>Optional human-readable description of the schedule.</summary>
    public string? Description { get; set; }

    /// <summary>Whether the task is executed without being awaited. Defaults to <c>false</c>.</summary>
    public bool IsAsync { get; set; }

    /// <summary>Whether the task starts right at initialization. Defaults to <c>false</c>.</summary>
    public bool IsStartOnInit { get; set; }
}
/// <summary>
/// Contract for a source of schedule metadata.
/// </summary>
public interface IScheduleMetadataStore
{
    /// <summary>
    /// Gets every <see cref="ScheduleTaskMetadata"/> known to this store.
    /// </summary>
    /// <returns>A task whose result is the sequence of metadata entries.</returns>
    Task<IEnumerable<ScheduleTaskMetadata>> GetAllAsync();
}
實現一個Configuration級別的Store:
/// <summary>
/// <see cref="IScheduleMetadataStore"/> that reads its entries from the children of the
/// <c>BiwenQuickApi:Schedules</c> configuration section.
/// </summary>
/// <param name="configuration">The configuration to read the schedules from.</param>
internal class ConfigurationScheduleMetadataStore(IConfiguration configuration) : IScheduleMetadataStore
{
    const string Key = "BiwenQuickApi:Schedules";

    /// <summary>
    /// Reads and validates every schedule entry of the configuration section.
    /// </summary>
    /// <returns>A completed task holding the parsed entries (empty when the section has no children).</returns>
    /// <exception cref="ArgumentException">
    /// An entry has no <c>ScheduleType</c> or <c>Cron</c> value, or its type cannot be resolved.
    /// </exception>
    /// <exception cref="FormatException">A boolean flag of an entry is not a valid boolean.</exception>
    public Task<IEnumerable<ScheduleTaskMetadata>> GetAllAsync()
    {
        // Materialize once: a deferred query would re-read and re-parse the configuration
        // on every enumeration, and would surface errors only while being enumerated.
        ScheduleTaskMetadata[] metadatas = configuration.GetSection(Key)
            .GetChildren()
            .Select(ToMetadata)
            .ToArray();

        return Task.FromResult<IEnumerable<ScheduleTaskMetadata>>(metadatas);
    }

    // Converts one configuration entry into metadata, rejecting incomplete entries.
    private static ScheduleTaskMetadata ToMetadata(IConfigurationSection section)
    {
        var typeName = section[nameof(ConfigurationScheduleOption.ScheduleType)];
        if (string.IsNullOrWhiteSpace(typeName))
        {
            throw new ArgumentException($"{section.Path}: {nameof(ConfigurationScheduleOption.ScheduleType)} is required!");
        }

        var type = Type.GetType(typeName);
        if (type is null)
        {
            throw new ArgumentException($"Type {typeName} not found!");
        }

        var cron = section[nameof(ConfigurationScheduleOption.Cron)];
        if (string.IsNullOrWhiteSpace(cron))
        {
            throw new ArgumentException($"{section.Path}: {nameof(ConfigurationScheduleOption.Cron)} is required!");
        }

        return new ScheduleTaskMetadata(type, cron)
        {
            Description = section[nameof(ConfigurationScheduleOption.Description)],
            IsAsync = ReadFlag(section, nameof(ConfigurationScheduleOption.IsAsync)),
            IsStartOnInit = ReadFlag(section, nameof(ConfigurationScheduleOption.IsStartOnInit)),
        };
    }

    // A missing or empty flag means false; any other value must parse as a boolean.
    private static bool ReadFlag(IConfigurationSection section, string name)
    {
        var raw = section[name];
        return !string.IsNullOrEmpty(raw) && bool.Parse(raw);
    }
}
然后呢,我們可能需要對任務調度的事件做一些操作或者日志存儲,比如失敗了該干嘛、完成了回調其他后續業務等。我們再來定義一下具體的事件IEvent,具體可以參考我上一篇文章:
https://www.cnblogs.com/vipwan/p/18184088
/// <summary>
/// Base class of the events raised around the execution of a schedule task.
/// </summary>
/// <param name="scheduleTask">The task the event refers to.</param>
/// <param name="eventTime">The time the event was triggered.</param>
public abstract class ScheduleTaskEvent(IScheduleTask scheduleTask, DateTime eventTime) : IEvent
{
    /// <summary>
    /// The task.
    /// </summary>
    public IScheduleTask ScheduleTask { get; set; } = scheduleTask;

    /// <summary>
    /// The trigger time.
    /// </summary>
    public DateTime EventTime { get; set; } = eventTime;
}
/// <summary>
/// Raised when a task has finished executing.
/// </summary>
/// <param name="scheduleTask">The task the event refers to.</param>
/// <param name="eventTime">The time the event was triggered.</param>
/// <param name="endTime">The time the execution ended.</param>
public sealed class TaskSuccessedEvent(IScheduleTask scheduleTask, DateTime eventTime, DateTime endTime)
    : ScheduleTaskEvent(scheduleTask, eventTime)
{
    /// <summary>
    /// The time the execution ended.
    /// </summary>
    public DateTime EndTime { get; set; } = endTime;
}
/// <summary>
/// Raised when a task starts executing.
/// </summary>
/// <param name="scheduleTask">The task the event refers to.</param>
/// <param name="eventTime">The time the event was triggered.</param>
public sealed class TaskStartedEvent(IScheduleTask scheduleTask, DateTime eventTime)
    : ScheduleTaskEvent(scheduleTask, eventTime);
/// <summary>
/// Raised when a task failed.
/// </summary>
/// <param name="scheduleTask">The task the event refers to.</param>
/// <param name="eventTime">The time the event was triggered.</param>
/// <param name="exception">The exception describing the failure.</param>
public sealed class TaskFailedEvent(IScheduleTask scheduleTask, DateTime eventTime, Exception exception)
    : ScheduleTaskEvent(scheduleTask, eventTime)
{
    /// <summary>
    /// The exception information.
    /// </summary>
    public Exception Exception { get; private set; } = exception;
}
接下來我們再實現基于NCrontab的簡易調度器,這個調度器主要是解析Cron表達式,判斷傳入時間是否可以執行ScheduleTask,具體的代碼:
/// <summary>
/// Simple <see cref="IScheduler"/> that parses the cron expression of a schedule with
/// NCrontab and remembers, per schedule, the time it is next due.
/// </summary>
internal class SampleNCrontabScheduler : IScheduler
{
    /// <summary>
    /// The next due time of every schedule seen so far.
    /// </summary>
    // NOTE(review): entries are keyed by the attribute's own equality. Presumably that is
    // System.Attribute's value-based Equals, in which case two different tasks with identical
    // metadata would share one entry — confirm.
    private static readonly ConcurrentDictionary<ScheduleTaskAttribute, DateTime> NextDueTimes = new();

    /// <summary>
    /// Determines whether the schedule is due at <paramref name="referenceTime"/>.
    /// </summary>
    /// <param name="scheduleMetadata">The schedule to evaluate.</param>
    /// <param name="referenceTime">The time to evaluate the schedule against.</param>
    /// <returns>
    /// On the first call for a schedule: the value of <c>IsStartOnInit</c>. Afterwards:
    /// <c>true</c> when the stored due time has been reached and this call is the one that
    /// advanced it; otherwise <c>false</c>.
    /// </returns>
    public bool CanRun(ScheduleTaskAttribute scheduleMetadata, DateTime referenceTime)
    {
        if (!NextDueTimes.TryGetValue(scheduleMetadata, out var dueTime))
        {
            // First sighting: remember when the schedule is due next.
            if (NextDueTimes.TryAdd(scheduleMetadata, NextOccurrence(scheduleMetadata, referenceTime)))
            {
                // Run right away only when the schedule asks to start on initialization.
                return scheduleMetadata.IsStartOnInit;
            }

            // Another caller registered the schedule in the meantime; evaluate its due time.
            if (!NextDueTimes.TryGetValue(scheduleMetadata, out dueTime))
            {
                return false;
            }
        }

        // Compare against the caller-supplied time rather than the wall clock.
        if (referenceTime < dueTime)
        {
            return false;
        }

        // Advance the due time. Only the caller that wins the update reports "can run",
        // so concurrent callers cannot both start the same occurrence.
        return NextDueTimes.TryUpdate(scheduleMetadata, NextOccurrence(scheduleMetadata, referenceTime), dueTime);
    }

    // Computes the occurrence that follows the given time for the schedule's cron expression.
    private static DateTime NextOccurrence(ScheduleTaskAttribute scheduleMetadata, DateTime referenceTime)
        => CrontabSchedule.Parse(scheduleMetadata.Cron).GetNextOccurrence(referenceTime);
}
然后就是核心的BackgroundService了,這里我用的IdleTime心跳來實現,粒度為分鐘。當然內部也可以封裝Timer等實現更復雜、精度更高的調度,這里就不展開講了,代碼如下:
/// <summary>
/// Hosted service that polls on a fixed interval and runs every schedule task that the
/// registered <see cref="IScheduler"/> reports as due. Tasks come from two sources:
/// registered <see cref="IScheduleTask"/> services annotated with
/// <see cref="ScheduleTaskAttribute"/>, and the entries of every registered
/// <see cref="IScheduleMetadataStore"/>.
/// </summary>
internal class ScheduleBackgroundService : BackgroundService
{
#if DEBUG
    // Poll every 20s in DEBUG builds to make testing easier.
    private static readonly TimeSpan _pollingTime = TimeSpan.FromSeconds(20);
#else
    // Poll every 60s otherwise to keep the polling overhead low.
    private static readonly TimeSpan _pollingTime = TimeSpan.FromSeconds(60);
#endif

    // Minimum pause of 10s between two polling rounds.
    private static readonly TimeSpan _minIdleTime = TimeSpan.FromSeconds(10);

    private readonly ILogger<ScheduleBackgroundService> _logger;
    private readonly IServiceProvider _serviceProvider;

    /// <summary>
    /// Creates the service.
    /// </summary>
    /// <param name="logger">Logger used to report failed polling rounds.</param>
    /// <param name="serviceProvider">Root provider used to create a scope per polling round.</param>
    public ScheduleBackgroundService(ILogger<ScheduleBackgroundService> logger, IServiceProvider serviceProvider)
    {
        _logger = logger;
        _serviceProvider = serviceProvider;
    }

    /// <summary>
    /// Runs polling rounds until <paramref name="stoppingToken"/> is cancelled.
    /// A failing round is logged and does not stop the loop.
    /// </summary>
    /// <param name="stoppingToken">Signals that the host is shutting down.</param>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            // Started before the round so the interval includes the time the round takes.
            var pollingDelay = Task.Delay(_pollingTime, stoppingToken);
            try
            {
                await RunAsync(stoppingToken);
            }
            catch (Exception ex)
            {
                // Pass the exception itself so the stack trace is kept and the message
                // is never interpreted as a logging template.
                _logger.LogError(ex, "Schedule polling round failed.");
            }
            await WaitAsync(pollingDelay, stoppingToken);
        }
    }

    /// <summary>
    /// Executes one polling round inside a fresh service scope.
    /// </summary>
    /// <param name="stoppingToken">Signals that the host is shutting down.</param>
    private async Task RunAsync(CancellationToken stoppingToken)
    {
        using var scope = _serviceProvider.CreateScope();
        var tasks = scope.ServiceProvider.GetServices<IScheduleTask>();
        if (tasks is null || !tasks.Any())
        {
            // Nothing is registered; note that the stores below are not consulted either.
            return;
        }

        // The scheduler.
        var scheduler = scope.ServiceProvider.GetRequiredService<IScheduler>();

        // Tasks scheduled through attributes.
        foreach (var task in tasks)
        {
            if (stoppingToken.IsCancellationRequested)
            {
                break;
            }

            foreach (var metadata in task.GetType().GetCustomAttributes<ScheduleTaskAttribute>())
            {
                await DoTaskAsync(scheduler, task, metadata);
            }
        }

        // Schedules provided by the stores. All stores are started together and awaited, so
        // the scope stays alive until every store has been processed.
        var stores = _serviceProvider.GetServices<IScheduleMetadataStore>().ToArray();
        await Task.WhenAll(stores.Select(store => RunStoreAsync(store, scope.ServiceProvider, scheduler, stoppingToken)));
    }

    // Runs the due schedules of a single store. Failures are logged per store so that one
    // broken store does not affect the others.
    private async Task RunStoreAsync(
        IScheduleMetadataStore store,
        IServiceProvider services,
        IScheduler scheduler,
        CancellationToken stoppingToken)
    {
        try
        {
            if (stoppingToken.IsCancellationRequested)
            {
                return;
            }

            var metadatas = await store.GetAllAsync();
            if (metadatas is null)
            {
                return;
            }

            foreach (var metadata in metadatas)
            {
                if (stoppingToken.IsCancellationRequested)
                {
                    return;
                }

                var attr = new ScheduleTaskAttribute(metadata.Cron)
                {
                    Description = metadata.Description,
                    IsAsync = metadata.IsAsync,
                    IsStartOnInit = metadata.IsStartOnInit,
                };

                if (services.GetRequiredService(metadata.ScheduleTaskType) is not IScheduleTask task)
                {
                    // Skip only this entry; the remaining entries of the store still run.
                    _logger.LogWarning(
                        "Schedule type {ScheduleTaskType} does not implement IScheduleTask and was skipped.",
                        metadata.ScheduleTaskType);
                    continue;
                }

                await DoTaskAsync(scheduler, task, attr);
            }
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Failed to run the schedules of store {StoreType}.", store.GetType());
        }
    }

    // Runs the task if the scheduler reports it as due, publishing the started event first.
    private static async Task DoTaskAsync(IScheduler scheduler, IScheduleTask task, ScheduleTaskAttribute metadata)
    {
        if (!scheduler.CanRun(metadata, DateTime.Now))
        {
            return;
        }

        var eventTime = DateTime.Now;
        // Notify that the task starts.
        _ = new TaskStartedEvent(task, eventTime).PublishAsync(default);

        var execution = ExecuteAndPublishAsync(task, eventTime);
        if (!metadata.IsAsync)
        {
            // Blocking mode: the next task waits for this one.
            await execution;
        }
        // Otherwise the task keeps running in the background.
        // NOTE(review): such a task is not awaited here and may still be running when the
        // scope of the polling round is disposed — confirm the tasks tolerate that.
    }

    // Awaits the task and publishes the event matching its real outcome, so that
    // non-awaited tasks report success or failure only once they have actually finished.
    private static async Task ExecuteAndPublishAsync(IScheduleTask task, DateTime eventTime)
    {
        try
        {
            await task.ExecuteAsync();
            // Execution finished.
            _ = new TaskSuccessedEvent(task, eventTime, DateTime.Now).PublishAsync(default);
        }
        catch (Exception ex)
        {
            _ = new TaskFailedEvent(task, DateTime.Now, ex).PublishAsync(default);
        }
    }

    /// <summary>
    /// Waits for the minimum idle time and then for the rest of the polling interval.
    /// Cancellation ends the wait silently.
    /// </summary>
    /// <param name="pollingDelay">The delay started at the beginning of the round.</param>
    /// <param name="stoppingToken">Signals that the host is shutting down.</param>
    private static async Task WaitAsync(Task pollingDelay, CancellationToken stoppingToken)
    {
        try
        {
            await Task.Delay(_minIdleTime, stoppingToken);
            await pollingDelay;
        }
        catch (OperationCanceledException)
        {
            // Expected on shutdown; the caller's loop condition ends the service.
        }
    }
}
最后收尾階段,我們老規矩擴展一下IServiceCollection:
/// <summary>
/// Registers every type in <c>ScheduleTasks</c> (declared outside this method) both as
/// itself and as <see cref="IScheduleTask"/>, then the default scheduler, the
/// configuration-backed metadata store and the background service.
/// </summary>
/// <param name="services">The service collection to add the registrations to.</param>
/// <returns>The service collection, to allow chaining.</returns>
internal static IServiceCollection AddScheduleTask(this IServiceCollection services)
{
    foreach (var taskType in ScheduleTasks)
    {
        services.AddTransient(taskType);
        services.AddTransient(typeof(IScheduleTask), taskType);
    }

    // The scheduler.
    services.AddScheduler<SampleNCrontabScheduler>();

    // The configuration-file store.
    services.AddScheduleMetadataStore<ConfigurationScheduleMetadataStore>();

    // The BackgroundService.
    services.AddHostedService<ScheduleBackgroundService>();

    return services;
}
/// <summary>
/// Registers <typeparamref name="T"/> as a singleton <see cref="IScheduler"/>.
/// </summary>
/// <typeparam name="T">The scheduler implementation.</typeparam>
/// <param name="services">The service collection to add the registration to.</param>
/// <returns>The service collection, to allow chaining.</returns>
public static IServiceCollection AddScheduler<T>(this IServiceCollection services)
    where T : class, IScheduler
{
    services.AddSingleton<IScheduler, T>();

    return services;
}
/// <summary>
/// Registers <typeparamref name="T"/> as a singleton <see cref="IScheduleMetadataStore"/>.
/// </summary>
/// <typeparam name="T">The store implementation.</typeparam>
/// <param name="services">The service collection to add the registration to.</param>
/// <returns>The service collection, to allow chaining.</returns>
public static IServiceCollection AddScheduleMetadataStore<T>(this IServiceCollection services)
    where T : class, IScheduleMetadataStore
{
    services.AddSingleton<IScheduleMetadataStore, T>();

    return services;
}
老規矩,我們來測試一下:
// Scheduled through attributes:
/// <summary>
/// Demo task carrying two schedules; waits five seconds and then writes a log entry.
/// </summary>
/// <param name="logger">Logger the task writes to.</param>
[ScheduleTask(Constants.CronEveryMinute)] // once per minute
[ScheduleTask("0/3 * * * *")] // once every 3 minutes
public class KeepAlive(ILogger<KeepAlive> logger) : IScheduleTask
{
    /// <summary>
    /// Waits five seconds, then logs a message.
    /// </summary>
    public async Task ExecuteAsync()
    {
        // Takes 5s to run.
        var duration = TimeSpan.FromSeconds(5);
        await Task.Delay(duration);

        logger.LogInformation("keep alive!");
    }
}
/// <summary>
/// Demo task that only writes a log entry.
/// </summary>
/// <param name="logger">Logger the task writes to.</param>
public class DemoConfigTask(ILogger<DemoConfigTask> logger) : IScheduleTask
{
    /// <summary>
    /// Logs a message and completes synchronously.
    /// </summary>
    /// <returns><see cref="Task.CompletedTask"/>.</returns>
    public Task ExecuteAsync()
    {
        logger.LogInformation("Demo Config Schedule Done!");

        return Task.CompletedTask;
    }
}
通過配置文件的方式配置Store:
{
"BiwenQuickApi": {
"Schedules": [
{
"ScheduleType": "Biwen.QuickApi.DemoWeb.Schedules.DemoConfigTask,Biwen.QuickApi.DemoWeb",
"Cron": "0/5 * * * *",
"Description": "Every 5 mins",
"IsAsync": true,
"IsStartOnInit": false
},
{
"ScheduleType": "Biwen.QuickApi.DemoWeb.Schedules.DemoConfigTask,Biwen.QuickApi.DemoWeb",
"Cron": "0/10 * * * *",
"Description": "Every 10 mins",
"IsAsync": false,
"IsStartOnInit": true
}
]
}
}
我們還可以實現自己的Store,這里以放到內存為例;如果有興趣,你可以自行開發一個面板管理:
/// <summary>
/// Demo <see cref="IScheduleMetadataStore"/> that returns a fixed, in-memory list.
/// </summary>
public class DemoStore : IScheduleMetadataStore
{
    /// <summary>
    /// Returns a single schedule entry for <c>DemoTask</c>.
    /// </summary>
    /// <returns>A completed task holding the entries.</returns>
    public Task<IEnumerable<ScheduleTaskMetadata>> GetAllAsync()
    {
        // Stands in for loading ScheduleTaskMetadata from a database or configuration file.
        IEnumerable<ScheduleTaskMetadata> entries =
        [
            new ScheduleTaskMetadata(typeof(DemoTask), Constants.CronEveryNMinutes(2))
            {
                Description = "測試的Schedule"
            },
        ];

        return Task.FromResult(entries);
    }
}
// Then register this store:
builder.Services.AddScheduleMetadataStore<DemoStore>();
所有的一切都大功告成,最后我們來跑一下Demo,成功了:
當然,這里是按自己的固定思維設計的一個簡約版,還存在一些不足,歡迎板磚輕拍指正!
2024/05/16更新:
提供同一時間單一運行中的任務實現
/// <summary>
/// Simulates a task of which only one instance may exist at a time: it is scheduled
/// every minute but takes two minutes to run.
/// </summary>
/// <param name="logger">Logger the task writes to.</param>
[ScheduleTask(Constants.CronEveryMinute, IsStartOnInit = true)]
public class OnlyOneTask(ILogger<OnlyOneTask> logger) : OnlyOneRunningScheduleTask
{
    /// <summary>
    /// Logs a warning that the run was aborted because the same task is already executing.
    /// </summary>
    /// <returns><see cref="Task.CompletedTask"/>.</returns>
    public override Task OnAbort()
    {
        logger.LogWarning($"[{DateTime.Now}]任務(wù)被打斷.因為有一個相同的任務(wù)正在執(zhí)行!");

        return Task.CompletedTask;
    }

    /// <summary>
    /// Waits two minutes, then logs the start and end time of the run.
    /// </summary>
    public override async Task ExecuteAsync()
    {
        var now = DateTime.Now;

        // Simulates a task that takes 2 minutes.
        var duration = TimeSpan.FromMinutes(2);
        await Task.Delay(duration);

        logger.LogInformation($"[{now}] ~ {DateTime.Now} 執(zhí)行一個耗時兩分鐘的任務(wù)!");
    }
}
源代碼我發布到了GitHub,歡迎star! https://github.com/vipwan/Biwen.QuickApi
https://github.com/vipwan/Biwen.QuickApi/tree/master/Biwen.QuickApi/Scheduling
轉自 https://www.cnblogs.com/vipwan/p/18194062/biwen-quickapi-scheduletask 作者:萬雅虎
該文章在 2024/5/25 11:54:07 編輯過