批量发货场景下,用户选择多条待发货单据一键发货,系统需要:

数据库加状态字段虽然也能做,但频繁读写、并发查询性能差。Redis 作为内存缓存,天然适合这种"短生命周期、高频读写"的临时状态管理。
用户触发批量发货 ↓遍历每一条待发货单据 ↓检查 Redis key 是否存在(是否正在处理中) ├── 存在 → 跳过,返回"正在处理中" └── 不存在 → 写入 Redis key(标记占位) → 执行发货逻辑 ↓ 发货成功 → 删除 key 或设短过期 发货失败 → 删除 key,返回错误
| 操作 | Redis 命令 | 说明 |
|---|---|---|
| 标记正在处理 | SET key value EX ttl NX | NX 保证互斥,EX 防死锁 |
| 检查是否处理中 | GET key | 非空则跳过 |
| 处理完成清除 | DEL key | 释放标记 |
| 批量检查 | MGET key1 key2 ... | 一次查多个单据状态 |
import org.springframework.data.redis.core.StringRedisTemplate;import org.springframework.stereotype.Service;import java.util.ArrayList;import java.util.List;import java.util.concurrent.TimeUnit;@Servicepublic class BatchTaskService { private final StringRedisTemplate redisTemplate; // key 前缀 private static final String PROCESSING_KEY_PREFIX = "batch:task:processing:"; // 标记过期时间(防止异常未清除导致永久阻塞) private static final long EXPIRE_SECONDS = 300; public BatchTaskService(StringRedisTemplate redisTemplate) { this.redisTemplate = redisTemplate; } /** * 批量处理任务. * * @param taskIds 待处理的任务ID列表 * @return 处理结果 */ public BatchResult processBatch(List<String> taskIds) { List<String> successList = new ArrayList<>(); List<String> skippedList = new ArrayList<>(); List<String> failedList = new ArrayList<>(); for (String taskId : taskIds) { String redisKey = PROCESSING_KEY_PREFIX + taskId; // 尝试标记为处理中(SET NX EX:不存在才设置 + 过期时间) Boolean acquired = redisTemplate.opsForValue() .setIfAbsent(redisKey, "processing", EXPIRE_SECONDS, TimeUnit.SECONDS); if (Boolean.FALSE.equals(acquired)) { // key 已存在,说明正在被其他线程/实例处理 skippedList.add(taskId); continue; } try { // 执行业务逻辑 doProcess(taskId); successList.add(taskId); } catch (Exception e) { failedList.add(taskId); } finally { // 无论成功失败都清除标记,允许后续重试 redisTemplate.delete(redisKey); } } return new BatchResult(successList, skippedList, failedList); } /** * 查询哪些任务正在处理中(前端轮询用). */ public List<String> listProcessingTasks(List<String> taskIds) { List<String> keys = taskIds.stream() .map(id -> PROCESSING_KEY_PREFIX + id) .collect(Collectors.toList()); List<String> values = redisTemplate.opsForValue().multiGet(keys); List<String> processingIds = new ArrayList<>(); for (int i = 0; i < taskIds.size(); i++) { if (values != null && values.get(i) != null) { processingIds.add(taskIds.get(i)); } } return processingIds; } private void doProcess(String taskId) { // 具体业务逻辑... }}如果前端需要展示"已处理 3/10"这样的进度,可以用 Redis Hash:
/** * 批量任务进度跟踪. */@Servicepublic class BatchProgressService { private final StringRedisTemplate redisTemplate; private static final String PROGRESS_KEY_PREFIX = "batch:progress:"; public BatchProgressService(StringRedisTemplate redisTemplate) { this.redisTemplate = redisTemplate; } /** 初始化批次进度. */ public String initBatch(List<String> taskIds) { String batchId = UUID.randomUUID().toString(); String progressKey = PROGRESS_KEY_PREFIX + batchId; // Hash 结构:field=taskId, value=状态 Map<String, String> statusMap = taskIds.stream() .collect(Collectors.toMap(id -> id, id -> "PENDING")); redisTemplate.opsForHash().putAll(progressKey, statusMap); redisTemplate.expire(progressKey, 1, TimeUnit.HOURS); return batchId; } /** 更新单个任务状态. */ public void updateStatus(String batchId, String taskId, String status) { String progressKey = PROGRESS_KEY_PREFIX + batchId; redisTemplate.opsForHash().put(progressKey, taskId, status); } /** 查询批次进度. */ public Map<String, String> getProgress(String batchId) { String progressKey = PROGRESS_KEY_PREFIX + batchId; Map<Object, Object> entries = redisTemplate.opsForHash().entries(progressKey); return entries.entrySet().stream() .collect(Collectors.toMap( e -> e.getKey().toString(), e -> e.getValue().toString() )); }}| 考量点 | 说明 |
|---|---|
| 过期时间 | 必须设置,防止应用崩溃后 key 永久残留阻塞后续操作 |
| 幂等性 | 加锁失败时不报错而是跳过,配合重试机制实现最终一致 |
| key 设计 | 业务前缀:模块:唯一标识,如 batch:delivery:SO202401010001 |
| 清除时机 | 成功/失败都删除(允许重试)或成功后保留短时间(防重复成功回调) |
| 集群环境 | 确保所有实例连同一个 Redis 集群,key 可见性一致 |
自动发货场景下:
Redis List 结构完美匹配:左进右出、支持批量读取、设置过期自动清理。
自动发货失败 ↓格式化错误信息(单号 + 客户码 + requestId + 失败原因) ↓RPUSH 写入 Redis List ↓设置 key 1天过期(EXPIRE) ↓定时任务每分钟 LPOP/LRANGE 取出 → 推送钉钉/企微
| 操作 | Redis 命令 | 说明 |
|---|---|---|
| 追加错误 | RPUSH key message | List 尾部追加,O(1) |
| 批量读取 | LRANGE key 0 -1 | 取所有消息 |
| 逐条消费 | LPOP key | 从头部弹出一条 |
| 批量消费 | LPOP key count (Redis 6.2+) | 弹出 N 条 |
| 设置过期 | EXPIRE key seconds | 自动清理过期数据 |
| 查看队列长度 | LLEN key | 监控积压量 |
import org.springframework.data.redis.core.StringRedisTemplate;import org.springframework.stereotype.Service;import java.time.LocalDateTime;import java.time.format.DateTimeFormatter;import java.util.List;import java.util.concurrent.TimeUnit;/** * 异步任务错误信息暂存服务. * 用于暂存自动任务的失败信息,供定时任务消费后推送告警. */@Servicepublic class ErrorMessageBufferService { private final StringRedisTemplate redisTemplate; // Redis key private static final String ERROR_BUFFER_KEY = "error:buffer:auto-task"; // 过期时间(防止无人消费时无限增长) private static final long EXPIRE_HOURS = 24; // 单次消费最大条数 private static final int BATCH_SIZE = 50; public ErrorMessageBufferService(StringRedisTemplate redisTemplate) { this.redisTemplate = redisTemplate; } /** * 写入错误信息. * * @param taskId 任务标识 * @param errorMsg 错误原因 * @param context 上下文信息(如 requestId) */ public void recordError(String taskId, String errorMsg, String context) { String timestamp = LocalDateTime.now() .format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")); // 格式化消息内容 String message = String.format("[%s] 任务:%s, 上下文:%s, 失败原因:%s", timestamp, taskId, context, errorMsg); try { // 追加到 List 尾部 redisTemplate.opsForList().rightPush(ERROR_BUFFER_KEY, message); // 刷新过期时间(每次写入都续期,保证最后一条写入后24小时过期) redisTemplate.expire(ERROR_BUFFER_KEY, EXPIRE_HOURS, TimeUnit.HOURS); } catch (Exception e) { // Redis 操作失败不影响主流程,仅记录日志 log.warn("错误信息写入Redis失败, taskId={}", taskId, e); } } /** * 批量消费错误信息(定时任务调用). * * @return 本次消费的错误消息列表 */ public List<String> consumeErrors() { // 先查长度,取 min(length, BATCH_SIZE) 条 Long length = redisTemplate.opsForList().size(ERROR_BUFFER_KEY); if (length == null || length == 0) { return List.of(); } int count = (int) Math.min(length, BATCH_SIZE); // LRANGE 读取 + LTRIM 删除(两步操作,实际可用 Lua 保证原子性) List<String> messages = redisTemplate.opsForList().range(ERROR_BUFFER_KEY, 0, count - 1); redisTemplate.opsForList().trim(ERROR_BUFFER_KEY, count, -1); return messages; } /** * 查看当前积压的错误数量(监控用). */ public long getPendingCount() { Long size = redisTemplate.opsForList().size(ERROR_BUFFER_KEY); return size != null ? size : 0; }}import org.springframework.scheduling.annotation.Scheduled;import org.springframework.stereotype.Component;import java.util.List;/** * 定时消费错误缓存并推送告警. */@Componentpublic class ErrorAlertJob { private final ErrorMessageBufferService errorBufferService; private final AlertService alertService; public ErrorAlertJob(ErrorMessageBufferService errorBufferService, AlertService alertService) { this.errorBufferService = errorBufferService; this.alertService = alertService; } /** * 每分钟执行一次,消费错误消息并批量推送. */ @Scheduled(fixedDelay = 60000) public void consumeAndAlert() { List<String> errors = errorBufferService.consumeErrors(); if (errors.isEmpty()) { return; } // 拼接为一条告警消息 StringBuilder content = new StringBuilder(); content.append("⚠️ 自动任务失败告警(").append(errors.size()).append("条)nn"); for (String error : errors) { content.append("• ").append(error).append("n"); } // 推送到钉钉/企微/飞书 alertService.sendToWebhook(content.toString()); }}LRANGE + LTRIM 两步操作在并发消费时可能丢数据或重复消费,用 Lua 脚本保证原子性:
/** * 原子性批量弹出 List 元素. */public List<String> atomicBatchPop(String key, int count) { String luaScript = "local result = redis.call('lrange', KEYS[1], 0, ARGV[1] - 1) " + "if #result > 0 then " + " redis.call('ltrim', KEYS[1], #result, -1) " + "end " + "return result"; DefaultRedisScript<List> script = new DefaultRedisScript<>(luaScript, List.class); List<String> result = redisTemplate.execute( script, Collections.singletonList(key), String.valueOf(count)); return result != null ? result : List.of();}项目中通过数据库/Redis 存储消息模板,运行时动态填充参数:
/** * 模板化错误消息. */public class ErrorMessageTemplate { /** * 根据模板和参数生成最终消息. * * @param template 模板,如 "任务{1}失败,客户{2},原因:{3}" * @param args 参数数组 * @return 填充后的消息 */ public static String format(String template, String[] args) { String result = template; for (int i = 0; i < args.length; i++) { result = result.replace("{" + (i + 1) + "}", args[i]); } return result; }}// 使用String template = configService.getTemplate("auto-delivery-error");String message = ErrorMessageTemplate.format(template, new String[]{ orderCode, sellerCode, requestId, exception.getMessage()});errorBufferService.recordError(orderCode, message, requestId);| 维度 | 批量状态缓存 | 错误信息暂存 |
|---|---|---|
| 数据结构 | String(每个任务一个 key) | List(所有错误共用一个 key) |
| 生命周期 | 任务处理完即删除 | 定时消费后删除,或自然过期 |
| 并发模式 | 写少读多(前端轮询) | 写多读少(定时消费) |
| 一致性要求 | 强(标记必须准确) | 弱(少量丢失可接受) |
| 失败影响 | Redis 写失败可能导致重复处理 | Redis 写失败仅丢失告警,不影响主流程 |
| 典型 key 模式 | batch:delivery:{orderCode} | error:buffer:auto-delivery |
| 问题 | 解决方案 |
|---|---|
| Redis 宕机时标记丢失 | 状态缓存:降级为数据库 SELECT FOR UPDATE;错误暂存:降级为本地日志 |
| List 无限增长 | 设置 EXPIRE + 消费任务监控积压量告警 |
| 并发消费重复 | 单消费者模式,或用 Lua 原子弹出 |
| 消息格式变更 | 模板配置化,修改模板不需要改代码和重启 |
| 跨机房同步 | 错误暂存场景容忍延迟,可用异步复制;状态缓存需要同机房 Redis |