本方案将 定时调度、分布式锁 与 消息队列 有机结合,构建一个轻量、可靠、可扩展的分布式定时任务调度服务。 核心思路:

@Scheduled 定义触发时机适用场景:固定 Cron 表达式的周期性任务(如订单超时取消、报表生成、数据同步),且希望避免引入重型调度中心。
spring-boot-starter-amqp)rocketmq-spring-boot-starter) —— 本方案以 RabbitMQ 为主common-security、common-apidoc、schedule-api 等<!-- 启用 Redis 锁提供者(取消注释并确保版本) -->
<dependency>
<groupId>net.javacrumbs.shedlock</groupId>
<artifactId>shedlock-provider-redis-spring</artifactId>
<version>5.10.2</version> <!-- 与 shedlock-spring 版本一致 -->
</dependency>
若选择 JDBC 锁,则无需额外引入(已含 shedlock-provider-jdbc-template),但需配置数据源。
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ 实例1 │ │ 实例2 │ │ 实例N │
│ @Scheduled │ │ @Scheduled │ │ @Scheduled │
│ + ShedLock │ │ + ShedLock │ │ + ShedLock │
└──────┬──────┘ └──────┬──────┘ └──────┬──────┘
│ 锁竞争(Redis) │ │
└──────────────────────┼────────────────────┘
│ 胜出者发送消息
▼
┌─────────────────┐
│ RabbitMQ │
│ 交换机/队列 │
└────────┬────────┘
│ 消费
▼
┌─────────────────┐
│ 业务消费者服务 │
│ (独立微服务) │
│ 真正执行任务 │
└─────────────────┘
schedule-service):仅负责触发消息,轻量快速,锁持有时间 < 1s| 模块 | 说明 |
|---|---|
schedule-api | 对外暴露的 API(如任务手动触发接口、状态查询) |
schedule-service | 本模块,包含调度配置、消息发送、ShedLock 配置 |
| 业务消费者 | 在其他微服务中实现,使用 @RabbitListener |
schedule-service.yaml (存放于 Nacos)spring:
rabbitmq:
host: ${RABBITMQ_HOST:10.0.0.10}
port: 5672
username: admin
password: ${RABBITMQ_PASSWORD}
publisher-confirm-type: correlated # 发送端确认
publisher-returns: true
listener:
simple:
concurrency: 2
max-concurrency: 8
prefetch: 1
acknowledge-mode: manual # 手动确认
retry:
enabled: true
max-attempts: 3
initial-interval: 1000ms
multiplier: 2.0
redis:
host: ${REDIS_HOST:10.0.0.20}
port: 6379
password: ${REDIS_PASSWORD}
database: 0
timeout: 3000ms
lettuce:
pool:
max-active: 8
max-idle: 4scheduling:
pool-size: 10 # 定时任务线程池大小
tasks:
order-cancel:
enabled: true # 动态开关
spring:
cloud:
nacos:
discovery:
server-addr: ${NACOS_SERVER:127.0.0.1:8848}
config:
server-addr: ${NACOS_SERVER:127.0.0.1:8848}
file-extension: yaml
shared-configs:
- data-id: schedule-service.yaml
group: DEFAULT_GROUP
refresh: true
application:
name: schedule-service
@Configuration
@EnableScheduling
@EnableSchedulerLock(defaultLockAtMostFor = "PT1M") // 全局默认锁最大持有1分钟
public class SchedulerConfig { @Bean
public LockProvider lockProvider(RedisConnectionFactory connectionFactory) {
return new RedisLockProvider(connectionFactory, "schedule-service");
}
}
锁参数说明:
lockAtMostFor:锁的强制释放时间,必须大于任务最长执行时间(此处发送消息几乎瞬间,30s足够)lockAtLeastFor:锁最短持有时间,防止同一任务因时钟漂移瞬间重复执行(建议5~15s)JDBC 备选(如需使用):
@Bean
public LockProvider lockProvider(DataSource dataSource) {
return new JdbcTemplateLockProvider(
JdbcTemplateLockProvider.Configuration.builder()
.withJdbcTemplate(new JdbcTemplate(dataSource))
.usingDbTime()
.build()
);
}
并执行建表 SQL(见运维手册)。
避免默认单线程导致任务阻塞。
@Configuration
public class SchedulingThreadPoolConfig implements SchedulingConfigurer { @Value("${scheduling.pool-size:10}")
private int poolSize; @Override
public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
taskRegistrar.setScheduler(taskExecutor());
} @Bean(destroyMethod = "shutdown")
public ScheduledExecutorService taskExecutor() {
return Executors.newScheduledThreadPool(poolSize,
new ThreadFactoryBuilder().setNameFormat("sched-pool-%d").build());
}
}
@Configuration
public class RabbitMQDeclareConfig { public static final String TASK_EXCHANGE = "scheduled.task.exchange";
public static final String TASK_QUEUE = "scheduled.task.queue";
public static final String TASK_ROUTING_KEY = "task.execute"; // 死信
public static final String DLX_EXCHANGE = "scheduled.task.dlx.exchange";
public static final String DLX_QUEUE = "scheduled.task.dlx.queue";
public static final String DLX_ROUTING_KEY = "task.execute.dlx"; @Bean
public DirectExchange taskExchange() {
return new DirectExchange(TASK_EXCHANGE, true, false);
} @Bean
public Queue taskQueue() {
return QueueBuilder.durable(TASK_QUEUE)
.deadLetterExchange(DLX_EXCHANGE)
.deadLetterRoutingKey(DLX_ROUTING_KEY)
.build();
} @Bean
public Binding taskBinding() {
return BindingBuilder.bind(taskQueue()).to(taskExchange()).with(TASK_ROUTING_KEY);
} @Bean
public DirectExchange dlxExchange() {
return new DirectExchange(DLX_EXCHANGE);
} @Bean
public Queue dlxQueue() {
return new Queue(DLX_QUEUE);
} @Bean
public Binding dlxBinding() {
return BindingBuilder.bind(dlxQueue()).to(dlxExchange()).with(DLX_ROUTING_KEY);
}
}
@Configuration
public class RabbitMQMessageConfig { @Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
}
任务指令对象:
@Data
@AllArgsConstructor
@NoArgsConstructor
public class TaskCommand implements Serializable {
private String taskType; // 任务类型标识,如 "ORDER_CANCEL"
private String traceId; // 链路追踪ID
private Map<String, Object> params; // 可选参数
}
触发器示例:
@Component
@Slf4j
@ConditionalOnProperty(name = "scheduling.tasks.order-cancel.enabled", havingValue = "true")
public class OrderCancelTrigger { private final RabbitTemplate rabbitTemplate; public OrderCancelTrigger(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
} @Scheduled(cron = "${scheduling.tasks.order-cancel.cron:0 0/5 * * * ?}")
@SchedulerLock(name = "OrderCancelTask",
lockAtMostFor = "PT30S",
lockAtLeastFor = "PT10S")
public void triggerOrderCancel() {
String traceId = MDC.get("traceId");
TaskCommand cmd = new TaskCommand("ORDER_CANCEL", traceId, null);
rabbitTemplate.convertAndSend(
RabbitMQDeclareConfig.TASK_EXCHANGE,
RabbitMQDeclareConfig.TASK_ROUTING_KEY,
cmd,
message -> {
message.getMessageProperties().setMessageId(UUID.randomUUID().toString());
return message;
}
);
log.info("Sent task command: {}", cmd.getTaskType());
}
}
要点:
@ConditionalOnProperty 实现 Nacos 动态开关,无需重启@RefreshScope 刷新(需在类上加 @RefreshScope)@Component
@Slf4j
public class OrderCancelConsumer { @Autowired
private OrderService orderService; @RabbitListener(queues = RabbitMQDeclareConfig.TASK_QUEUE)
public void handleTask(TaskCommand command, Channel channel,
@Header(AmqpHeaders.DELIVERY_TAG) long tag) {
try {
if ("ORDER_CANCEL".equals(command.getTaskType())) {
orderService.cancelExpiredOrders();
}
channel.basicAck(tag, false);
} catch (Exception e) {
log.error("Task execution failed: {}", command, e);
// 根据异常类型决定是否重新入队
try {
channel.basicNack(tag, false, true); // 重试
} catch (IOException io) {
// 记录错误,触发告警
}
}
}
}
注意:业务服务必须实现幂等(如基于状态机 + 乐观锁)。
虽然触发器是内部调用,我们仍可使用 Sentinel 对 RabbitTemplate 的发送进行包装,避免突发流量冲垮 RabbitMQ。
自定义 Sentinel 资源:
@Scheduled(...)
@SchedulerLock(...)
public void triggerOrderCancel() {
Entry entry = null;
try {
entry = SphU.entry("send_task_command");
// 发送消息
} catch (BlockException e) {
log.warn("Flow control triggered, task command send blocked");
} finally {
if (entry != null) entry.exit();
}
}
更优雅的方式是对 RabbitTemplate.convertAndSend 使用 Sentinel 的 @SentinelResource 或定义切面。
# sentinel-degrade-rules
[
{
"resource": "send_task_command",
"count": 20,
"grade": 1,
"timeWindow": 10
}
]
当每秒发送超过 20 条时熔断 10 秒,防止 RabbitMQ 过载。
利用 Spring Cloud Alibaba 的配置刷新能力,关键任务可通过 Nacos 实时启停,无需重新部署。
@ConditionalOnProperty@RefreshScopescheduling.tasks.xxx.enabled 即可也可将 Cron 表达式外置:
@Scheduled(cron = "${order.cancel.cron}")
并在 Nacos 中更新,配合 @RefreshScope 会重新初始化任务调度(需要 ScheduledTaskRegistrar 重新注册,稍复杂,推荐使用 XXL-JOB 等动态平台)。
@Bean
public MeterRegistryCustomizer<MeterRegistry> metricsCommonTags() {
return registry -> registry.config().commonTags("application", "schedule-service");
}// 定时任务执行埋点(AOP)
@Around("@annotation(scheduled)")
public Object monitorSchedule(ProceedingJoinPoint pjp, Scheduled scheduled) {
Timer.Sample sample = Timer.start(meterRegistry);
try {
Object result = pjp.proceed();
sample.stop(Timer.builder("scheduled.task.execution")
.tag("task", pjp.getSignature().getName())
.tag("status", "success")
.register(meterRegistry));
return result;
} catch (Exception e) {
sample.stop(Timer.builder("scheduled.task.execution")
.tag("task", pjp.getSignature().getName())
.tag("status", "error")
.register(meterRegistry));
throw e;
}
}
scheduled.task.execution 的 status=error 计数 > 0,触发钉钉/邮件通知scheduled.task.dlx.queue 有消息堆积,立即告警在 TaskCommand 中传递 traceId,使用 SkyWalking 或 Sleuth + Zipkin,串联调度服务和消费服务。
server:
shutdown: graceful
spring:
lifecycle:
timeout-per-shutdown-phase: 30s
此设置让正在发送的消息有足够时间完成 RabbitMQ 交互,并等待 @RabbitListener 中正在处理的消息完成(消费者端)。
deliveryMode=2 持久化publisher-confirm 开启,发送端确保消息投递CREATE TABLE shedlock (
name VARCHAR(64) NOT NULL,
lock_until TIMESTAMP NOT NULL,
locked_at TIMESTAMP NOT NULL,
locked_by VARCHAR(255) NOT NULL,
PRIMARY KEY (name)
);
KEYS schedule-service* 可查看当前持有的锁scheduled.task.queue 的消息堆积、消费者数量scheduled.task.dlx.queue 消息数应为 0,若存在则说明有消费失败的消息,需排查| 问题 | 可能原因 | 解决方法 |
|---|---|---|
| 任务重复执行 | ShedLock 未生效(配置错误、Redis连接失败) | 检查 @EnableSchedulerLock、LockProvider Bean 是否正常,检查日志 |
| 消息堆积 | 消费者处理太慢或宕机 | 增加消费者实例数,检查消费者逻辑是否有死循环 |
| 定时任务不触发 | 开关关闭、Cron 表达式错误、线程池满 | 检查 Nacos 配置、@ConditionalOnProperty,查看线程堆栈 |
| 锁超时释放导致重复发消息 | lockAtMostFor 设置太小,发送消息超时 | 调大锁时长,优化 RabbitMQ 连接超时配置 |
@SpringBootApplication
@EnableDiscoveryClient
@RefreshScope
public class ScheduleServiceApplication {
public static void main(String[] args) {
SpringApplication.run(ScheduleServiceApplication.class, args);
}
}
@Component
public class ShedLockHealthIndicator implements HealthIndicator {
@Autowired
private LockProvider lockProvider;
@Override
public Health health() {
// 简单检查锁提供者是否可用
Optional<SimpleLock> lock = lockProvider.lock(new LockConfiguration("health-check", Instant.now(), "health"));
if (lock.isPresent()) {
lock.get().unlock();
return Health.up().build();
}
return Health.down().withDetail("shedlock", "LockProvider unavailable").build();
}
}
@RestControllerAdvice
public class GlobalExceptionHandler {
// 针对定时任务触发的 HTTP 接口(如手动触发)
}
这份方案将 Spring Schedule + ShedLock + RabbitMQ 无缝融入现有的 Spring Cloud Alibaba 体系,充分利用 Nacos 动态配置、Sentinel 流控以及 RabbitMQ 的可靠投递能力,构建了一个 高可用、可观测、易运维 的分布式定时任务调度服务。
它足够轻量,却能稳稳支撑核心业务场景。当未来需求增长至需要动态 Cron、任务分片时,可平滑迁移至 XXL-JOB 或 PowerJob,当前消息驱动架构为此类演进提供了天然基础。