上一篇( juejin.cn/post/763703…) 讲的是边界:ThreadScope 负责把一批并发任务收进同一个生命周期。

这一篇往里再进一步,直接来看执行链路,我们带着问题来看:
Task<T>CompletableFuture 本身已经非常强大了,但业务代码处理并发阶段时,往往还需要这些东西:
所以 ThreadForge 额外包装了一层 Task<T> 做对外暴露。
Task 的核心字段在 src/main/java/io/threadforge/Task.java:
private final long id;
private final String name;
private final CompletableFuture<T> future;
private final AtomicReference<State> state;
private final AtomicReference<Thread> runnerThread;
状态定义如下:
public enum State {
PENDING,
RUNNING,
SUCCESS,
FAILED,
CANCELLED
}
可以先这样来理解 :
5 个状态,5 个状态切换方法,路径很少,清晰明了:
状态切换的入口都在 Task 上,包级可见,外部只能读:
// 包级 — 由 ThreadScope 在正确时机调用
boolean markRunning(Thread runner)
void markSuccess()
void markFailed()
void markCancelled()
void interruptRunner()
先看 markRunning(...):
boolean marked = state.compareAndSet(State.PENDING, State.RUNNING);
if (marked) {
runnerThread.set(runner);
}
主要做了两件事:
PENDING 切到 RUNNING——只有一个线程能成功如果 CAS 失败(比如任务在排队的间隙被 cancel 了),方法返回 false,调用方跳过执行。这就是 cancel 和 markRunning 之间的竞态保护:cancel 先改状态为 CANCELLED,markRunning 的 CAS 就会失败,任务不会开始跑。
其余三个终态方法结构对称——清掉 runner 引用,设置状态:
void markSuccess() { runnerThread.set(null); state.set(State.SUCCESS); }
void markFailed() { runnerThread.set(null); state.set(State.FAILED); }
void markCancelled() { runnerThread.set(null); state.set(State.CANCELLED); }
终态后 runnerThread 清理成 null,避免外界持有一个已终止线程的句柄。
Task.cancel() 实现很简洁:
public boolean cancel() {
state.set(State.CANCELLED);
Thread runner = runnerThread.get();
if (runner != null) {
runner.interrupt();
}
return future.cancel(true);
}
其实就三步,保持顺序:
CANCELLED——这一步配合上一节讲的 markRunning CAS,抢占式阻止任务启动future.cancel(true) 会尝试中断 future 内部的执行线程注意为什么是「协作式」终止:Thread.interrupt() 只是发信号,不是强行杀线程。任务代码、阻塞点、中断处理逻辑要一起配合才能停下来。ThreadForge 靠三层协作:
Thread.interrupt() —— 唤醒阻塞中的线程CancellationToken.throwIfCancelled() —— 任务代码主动检查取消点InterruptedException 后做清理并退出单独哪一层都不够,三层叠加才算是可靠。
ThreadScope 有十几个 submit(...) 重载,全部收敛到一个私有方法。去掉参数校验和并发许可申请后,核心构造逻辑如下:
final CompletableFuture<T> future = new CompletableFuture<T>();
final Task<T> task = new Task<T>(id, name, future);
final TaskInfo info = new TaskInfo(scopeId, id, name, Instant.now(), scheduler.name());
final ExecutionContextCarrier executionContext = ExecutionContextCarrier.capture();
final ScheduledTask timeoutTask = scheduleTaskTimeout(task, info, taskTimeout);
tasks.add(task);
future.whenComplete((value, throwable) -> {
tasks.remove(task);
if (timeoutTask != null) {
timeoutTask.cancel();
}
});
然后交给调度器:
scheduler.executor().execute(Scheduler.prioritized(
executionContext.wrapRunnable(() -> runTask(task, info, callable, taskRetryPolicy,
permitAcquired ? semaphore : null)),
taskPriority,
id
));
如果调度器拒绝执行(RejectedExecutionException),框架会释放并发许可、标记任务失败并触发 hook。
逐行看:
CompletableFuture — 任务结果的载体Task — 带状态机和元信息的句柄TaskInfo — 观测用的快照数据Context 和 OpenTelemetry 双份tasks — scope 后续可以捞出所有任务whenComplete 回调 — 任务无论成功、失败还是取消,结束时自动从 tasks 队列移除,同时取消关联的 timeout 任务上半段提交,下半段执行,最后按成功、失败、取消三条路径收尾。
runTask(...):真正执行任务的内核runTask(...) 是整条链路的执行引擎。完整方法约 40 行,精简后主结构如下:
long started = System.nanoTime();
CompletableFuture<T> future = task.toCompletableFuture();try {
// ① 启动前双重取消检查
if (task.isCancelled() || token.isCancelled()) {
completeTaskCancelled(task, future, new CancelledException("..."), info, started);
return;
} // ② 状态机: PENDING → RUNNING
if (!task.markRunning(Thread.currentThread())) {
return;
} // ③ 触发观测
safeHookStart(info);
token.throwIfCancelled(); // ④ 真正执行(含重试)
T value = RetryExecutor.execute(callable, retryPolicy, token);
if (future.complete(value)) {
task.markSuccess();
safeHookSuccess(info, elapsedNanos(started));
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
completeTaskCancelled(task, future, new CancelledException("..."), info, started);
} catch (CancelledException e) {
completeTaskCancelled(task, future, e, info, started);
} catch (Throwable t) {
completeTaskFailure(task, future, t, info, started);
} finally {
if (acquiredSemaphore != null) {
acquiredSemaphore.release(); // 释放并发许可
}
}
异常走了三条分叉:
InterruptedException → 重置线程中断标志后走取消路径CancelledException → 直接走取消路径Throwable → 走失败路径(completeTaskFailure 内部通过 future.completeExceptionally(...) 把异常写入 future,再调用 task.markFailed() 和 hook)失败、取消、重试、并发许可——全收在这一个方法里。
private final ExecutorService executor;
private final boolean ownsExecutor;
private final String name;
private final boolean virtualThreadMode;
四个字段,四个职责:
executor — 任务交给谁执行ownsExecutor — scope 关闭时要不要连带关掉执行器name — 日志和观测里的标识virtualThreadMode — 是否跑在虚拟线程上Scheduler.detect()public static Scheduler detect() {
if (isVirtualThreadSupported()) {
return virtualThreads();
}
return commonPool();
}
isVirtualThreadSupported() 通过反射探测 Executors.newVirtualThreadPerTaskExecutor(),virtualThreads() 内部 DCL 延迟创建。反射找不到方法或调用失败,自动回退到 commonPool()。调用方一行不用改。
Scheduler.fixed(int size)int queueCapacity = Math.max(256, size * 100);
new ThreadPoolExecutor(
size, size, 60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(queueCapacity),
new NamedThreadFactory("threadforge-fixed"),
new ThreadPoolExecutor.CallerRunsPolicy()
);
executor.allowCoreThreadTimeOut(true);
queueCapacity 至少 256,随线程数线性放大CallerRunsPolicy:队列满时提交线程自己执行,形成自然反压allowCoreThreadTimeOut(true):空闲超过 60 秒的核心线程也回收,避免高峰过后空转Scheduler.priority(int size)同样固定线程池,但队列换成无界的 PriorityBlockingQueue:
new ThreadPoolExecutor(
size, size, 60L, TimeUnit.SECONDS,
new PriorityBlockingQueue<Runnable>(),
new NamedThreadFactory("threadforge-priority"),
new ThreadPoolExecutor.CallerRunsPolicy()
);
队列无界,CallerRunsPolicy 一般不会被触发。优先级靠 PrioritizedRunnable.compareTo(...) 实现:
int byPriority = Integer.compare(this.taskPriority.rank(), other.taskPriority.rank());
if (byPriority != 0) return byPriority;
return Long.compare(this.sequence, other.sequence);
先按 TaskPriority 排,同优先级按提交顺序(FIFO)。
无论哪种方式创建,最后都从 executor.execute(...) 出去。
线程切换后,提交线程里的 Context 和 OpenTelemetry 上下文就断了——两者都基于线程局部变量。ExecutionContextCarrier 专门解决这个问题:
// 提交时捕获
static ExecutionContextCarrier capture() {
return new ExecutionContextCarrier(
Context.capture(),
OpenTelemetryBridge.currentContext()
);
}// 执行时安装,结束后恢复
Context.Snapshot previous = Context.install(contextSnapshot);
Object scope = OpenTelemetryBridge.makeCurrent(otelParentContext);
try {
return callable.call();
} finally {
OpenTelemetryBridge.closeScope(scope);
Context.restore(previous);
}
传播了两类上下文:
Context —— ThreadForge 自己的线程局部变量RetryExecutor 的核心循环是这样的:
while (true) {
token.throwIfCancelled();
try {
return callable.call();
} catch (InterruptedException e) {
throw e; // 中断直接透传,不重试
} catch (CancelledException e) {
throw e; // 取消直接透传,不重试
} catch (Throwable failure) {
if (!retryPolicy.allowsRetry(attempt, failure)) {
// 不再重试,把之前所有失败附加到 suppressed
if (previousFailures != null) {
for (Throwable prev : previousFailures) {
if (prev != failure) failure.addSuppressed(prev);
}
}
throw failure;
}
previousFailures.add(failure);
sleepBeforeRetry(retryPolicy.nextDelay(attempt, failure), token);
attempt++;
}
}
sleepBeforeRetry(...) 实现:
long remainingMillis = delay.toMillis();
while (remainingMillis > 0L) {
token.throwIfCancelled();
long chunk = Math.min(remainingMillis, 100L);
Thread.sleep(chunk);
remainingMillis -= chunk;
}
如果最终还是失败,前面每次失败的异常会累加到最终异常的 suppressed 里——排查时能看到完整的失败历史,不是只有最后一次的错误信息。
分片 sleep 是关键:假设重试要等 5 秒,整段睡过去,中途收到取消信号就会反应慢半拍。分成 100ms 的小段,每段醒来检查一次取消,scope 一喊停就能更快响应。
withDeadline(Duration)
配置整个 ThreadScope 的截止时间。到达后:
deadlineTriggered = true,token.cancel()ScopeTimeoutExceptionscope.submit("rpc-a", callable, Duration.ofMillis(200));
给单个任务设置独立预算。内部注册一个超时任务:
if (task.toCompletableFuture().completeExceptionally(timeoutException)) {
task.markFailed();
task.interruptRunner();
safeHookFailure(info, timeoutException, timeout.toNanos());
}
只影响当前任务:标记失败、中断执行线程、触发 hook。不影响 scope 内其他任务。
| 类型 | 作用范围 | 触发后影响 | 适合场景 |
|---|---|---|---|
| scope deadline | 整个 ThreadScope | 取消整个作用域 | 一个请求、一个批处理的总预算 |
| task timeout | 单个 Task | 当前任务失败并中断 runner | 某个 RPC、IO 的独立预算 |
一个管全局,一个管局部。
ThreadHook 是 interface,4 个方法全是 default——只覆写需要的:
default void onStart(TaskInfo info) {}
default void onSuccess(TaskInfo info, Duration duration) {}
default void onFailure(TaskInfo info, Throwable error, Duration duration) {}
default void onCancel(TaskInfo info, Duration duration) {}
通过 andThen(...) 组合多个 hook,每个回调内部独立 catch 异常——前一个抛异常不阻止后续执行:
ThreadHook combined = hook1.andThen(hook2).andThen(hook3);
元信息来自 TaskInfo:
new TaskInfo(scopeId, id, name, Instant.now(), scheduler.name())
内置指标由 ScopeMetrics 维护,LongAdder + AtomicLong,竞争开销低:
started / succeeded / failed / cancelled
totalDurationNanos / maxDurationNanos
观测和执行事件发生在同一时刻:
safeHookStart(info); metrics.recordStart();
// ... 执行 ...
safeHookSuccess(...) metrics.recordTerminal(Task.State.SUCCESS, ...)
任务走到哪个生命周期节点,hook 和 metrics 就跟到哪里。
最近换工作,刚进了一个新团队。上家的节奏比较正常,现在几乎每天加班赶进度。
说不清忙和闲哪个更好。
职位又往上走了一步,压力也跟着翻了一倍。
向上对齐、向下兜底,还是技术最纯粹。没有汇报,没有上下级,简单而美好。
这一篇是 ThreadForge 源码解读的执行链路部分,下一篇继续,解读高阶编排怎么在一个 scope 里协同工作。 也算是给整个系列收官。
项目地址:github.com/wuuJiawei/T…,欢迎提 issue,也欢迎 star。