// TaskDispatchBackgroundService.cs
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Rcs.Application.Shared;
using Rcs.Domain.Entities;
using Rcs.Domain.Repositories;
using TaskStatus = Rcs.Domain.Entities.TaskStatus;
namespace Rcs.Infrastructure.Services
{
/// <summary>
/// Background task-dispatch service: on a fixed interval it takes pending tasks and assigns
/// them to idle robots located on the map of each task's start node.
/// </summary>
/// <remarks>
/// Repositories are resolved from a DI scope created per dispatch pass rather than injected
/// directly; the service only holds the root <see cref="IServiceProvider"/>.
/// Author: zzy
/// </remarks>
public class TaskDispatchBackgroundService : BackgroundService, ITaskDispatchService
{
    /// <summary>Upper bound on how many pending tasks are examined in one dispatch pass.</summary>
    private const int MaxPendingTasksPerCycle = 10;

    private readonly ILogger<TaskDispatchBackgroundService> _logger;
    private readonly IServiceProvider _serviceProvider;

    /// <summary>Pause between two consecutive dispatch passes.</summary>
    private readonly TimeSpan _dispatchInterval = TimeSpan.FromSeconds(5);

    /// <summary>
    /// Creates the service.
    /// </summary>
    /// <param name="logger">Logger used for dispatch diagnostics.</param>
    /// <param name="serviceProvider">Provider from which per-pass scopes are created.</param>
    public TaskDispatchBackgroundService(
        ILogger<TaskDispatchBackgroundService> logger,
        IServiceProvider serviceProvider)
    {
        _logger = logger;
        _serviceProvider = serviceProvider;
    }

    /// <summary>
    /// Hosted-service entry point: runs <see cref="DispatchAsync"/> in a loop, waiting
    /// <see cref="_dispatchInterval"/> between passes, until <paramref name="stoppingToken"/> is signalled.
    /// </summary>
    /// <param name="stoppingToken">Signalled when the service is asked to stop.</param>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        _logger.LogInformation("[任务调度] 后台任务调度服务已启动");

        try
        {
            while (!stoppingToken.IsCancellationRequested)
            {
                try
                {
                    await DispatchAsync(stoppingToken);
                }
                catch (Exception ex) when (ex is not OperationCanceledException || !stoppingToken.IsCancellationRequested)
                {
                    // A failed pass must not kill the loop; log it and retry after the interval.
                    // Cancellation caused by shutdown is deliberately NOT treated as a failure.
                    _logger.LogError(ex, "[任务调度] 调度过程发生异常");
                }

                await Task.Delay(_dispatchInterval, stoppingToken);
            }
        }
        catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
        {
            // Normal shutdown: the dispatch pass or the delay was interrupted by the stopping token.
        }

        _logger.LogInformation("[任务调度] 后台任务调度服务已停止");
    }

    /// <summary>
    /// Runs a single dispatch pass.
    /// </summary>
    /// <param name="cancellationToken">Token forwarded to every repository call.</param>
    /// <returns>
    /// A result with <c>Success = true</c> and the number of tasks that were assigned in this pass
    /// (zero when there was nothing to dispatch or no suitable robot was found).
    /// </returns>
    public async Task<TaskDispatchResult> DispatchAsync(CancellationToken cancellationToken = default)
    {
        using var scope = _serviceProvider.CreateScope();
        var services = scope.ServiceProvider;
        var taskRepo = services.GetRequiredService<IRobotTaskRepository>();
        var robotRepo = services.GetRequiredService<IRobotRepository>();
        var templateRepo = services.GetRequiredService<ITaskTemplateRepository>();

        // 1. Pending tasks ordered by priority ascending (smaller value = higher priority),
        //    then by creation time ascending; only the first MaxPendingTasksPerCycle are handled.
        var pendingTasks = (await taskRepo.GetByStatusAsync(TaskStatus.Pending, cancellationToken))
            .OrderBy(t => t.Priority)
            .ThenBy(t => t.CreatedAt)
            .Take(MaxPendingTasksPerCycle)
            .ToList();

        if (pendingTasks.Count == 0)
        {
            return new TaskDispatchResult { Success = true, AssignedCount = 0, Message = "无待调度任务" };
        }

        var assignedCount = 0;
        foreach (var task in pendingTasks)
        {
            // Pick an idle robot on the map of the task's start node. The selection strategy
            // lives in the helper and is expected to be refined there later.
            // NOTE(review): nothing in this loop marks the chosen robot as busy. Unless
            // GetIdleRobotsAsync already reflects the assignment made below, several tasks in
            // the same pass may receive the same robot — confirm against the repository.
            var idleRobot = await FindIdleRobotForTaskAsync(task, robotRepo, cancellationToken);
            if (idleRobot is null)
            {
                continue;
            }

            // Resolve the task template for this robot; may be null, in which case the task
            // is still assigned but without a template.
            var template = await GetTemplateForRobotAsync(idleRobot, templateRepo, cancellationToken);

            // Assign the task.
            task.RobotId = idleRobot.RobotId;
            task.TaskTemplateId = template?.TemplateId;
            task.Status = TaskStatus.Assigned;
            task.UpdatedAt = DateTime.Now;
            await taskRepo.UpdateAsync(task, cancellationToken);

            _logger.LogInformation("[任务调度] 任务 {TaskCode} 已分配给机器人 {RobotCode},模板: {TemplateCode}",
                task.TaskCode, idleRobot.RobotCode, template?.TemplateCode ?? "无");
            assignedCount++;
        }

        await taskRepo.SaveChangesAsync(cancellationToken);

        return new TaskDispatchResult
        {
            Success = true,
            AssignedCount = assignedCount,
            Message = $"本次调度完成,已分配 {assignedCount} 个任务"
        };
    }

    /// <summary>
    /// Finds an idle robot located on the map that contains the task's start node.
    /// </summary>
    /// <param name="task">Task to find a robot for.</param>
    /// <param name="robotRepo">Repository used to list idle robots.</param>
    /// <param name="cancellationToken">Token forwarded to the repository calls.</param>
    /// <returns>
    /// The first matching robot, or <c>null</c> when the task has no start-node information
    /// or no idle robot on that map qualifies.
    /// </returns>
    private async Task<Robot?> FindIdleRobotForTaskAsync(
        RobotTask task,
        IRobotRepository robotRepo,
        CancellationToken cancellationToken)
    {
        // The task details are loaded through a separate, short-lived scope. It is disposed
        // when this method returns so that the scoped services it created are released
        // instead of accumulating once per task per dispatch pass.
        using var detailScope = _serviceProvider.CreateScope();
        var taskWithDetails = await detailScope.ServiceProvider
            .GetRequiredService<IRobotTaskRepository>()
            .GetByIdWithDetailsAsync(task.TaskId, cancellationToken);

        var startNode = taskWithDetails?.BeginLocation?.MapNode;
        if (startNode == null)
        {
            _logger.LogWarning("[任务调度] 任务 {TaskCode} 无起点节点信息", task.TaskCode);
            return null;
        }

        // Map ID of the task's start node.
        var mapId = startNode.MapId;

        // Among the idle robots, take the first one that is on that map, online, active,
        // not driving and not paused.
        var idleRobots = await robotRepo.GetIdleRobotsAsync(cancellationToken);
        return idleRobots.FirstOrDefault(r =>
            r.CurrentMapCodeId == mapId
            && r is { Online: OnlineStatus.Online, Active: true, Driving: false, Paused: false });
    }

    /// <summary>
    /// Resolves the task template to use for a robot.
    /// </summary>
    /// <param name="robot">Robot whose type and manufacturer select the template.</param>
    /// <param name="templateRepo">Repository used to look up templates.</param>
    /// <param name="cancellationToken">Token forwarded to the repository calls.</param>
    /// <returns>
    /// The default template for the robot's type and manufacturer when one exists; otherwise the
    /// first enabled template for the robot's type; otherwise <c>null</c>.
    /// </returns>
    private async Task<TaskTemplate?> GetTemplateForRobotAsync(
        Robot robot,
        ITaskTemplateRepository templateRepo,
        CancellationToken cancellationToken)
    {
        // Prefer the default template for this robot type and manufacturer.
        var defaultTemplate = await templateRepo.GetDefaultTemplateAsync(
            robot.RobotType,
            robot.RobotManufacturer,
            cancellationToken: cancellationToken);
        if (defaultTemplate != null)
        {
            return defaultTemplate;
        }

        // No default template: fall back to any enabled template for this robot type.
        var candidates = await templateRepo.GetByRobotTypeAsync(robot.RobotType, cancellationToken);
        return candidates.FirstOrDefault(t => t.IsEnabled);
    }
}
}