ThreadForge 源码解读二:一个 Task 从 submit 到完成 内部运行机制深度剖析

作者:袖梨 2026-06-14

上一篇( juejin.cn/post/763703…) 讲的是边界:ThreadScope 负责把一批并发任务收进同一个生命周期。

ThreadForge 源码解读二:一个 Task 从 submit 到完成,内部到底发生了什么?

这一篇往里再进一步,直接来看执行链路,我们带着问题来看:

1. 为什么要有 Task<T>

CompletableFuture 本身已经非常强大了,但业务代码处理并发阶段时,往往还需要这些东西:

  • 明确的任务名称
  • 一眼就能看明白的状态机
  • 实际执行线程的记录
  • 稳定的取消入口
  • 生命周期绑定
  • 更贴近业务的等待和异常语义

所以 ThreadForge 额外包装了一层 Task<T> 做对外暴露。

Task 的核心字段在 src/main/java/io/threadforge/Task.java

private final long id;
private final String name;
private final CompletableFuture<T> future;
private final AtomicReference<State> state;
private final AtomicReference<Thread> runnerThread;

状态定义如下:

public enum State {
    PENDING,
    RUNNING,
    SUCCESS,
    FAILED,
    CANCELLED
}

可以先这样来理解 :

2. Task 状态流转:从 PENDING 到终态

5 个状态,5 个状态切换方法,路径很少,清晰明了:

状态切换的入口都在 Task 上,包级可见,外部只能读:

// 包级 — 由 ThreadScope 在正确时机调用
boolean markRunning(Thread runner)
void markSuccess()
void markFailed()
void markCancelled()
void interruptRunner()

先看 markRunning(...)

boolean marked = state.compareAndSet(State.PENDING, State.RUNNING);
if (marked) {
    runnerThread.set(runner);
}

主要做了两件事:

  1. CAS 从 PENDING 切到 RUNNING——只有一个线程能成功
  2. 成功后把执行线程记下来——后面取消时知道该中断谁

如果 CAS 失败(比如任务在排队的间隙被 cancel 了),方法返回 false,调用方跳过执行。这就是 cancel 和 markRunning 之间的竞态保护:cancel 先改状态为 CANCELLED,markRunning 的 CAS 就会失败,任务不会开始跑。

其余三个终态方法结构对称——清掉 runner 引用,设置状态:

void markSuccess()    { runnerThread.set(null); state.set(State.SUCCESS); }
void markFailed()     { runnerThread.set(null); state.set(State.FAILED); }
void markCancelled()  { runnerThread.set(null); state.set(State.CANCELLED); }

终态后 runnerThread 清理成 null,避免外界持有一个已终止线程的句柄。

3. cancel:三步走,协作式终止

Task.cancel() 实现很简洁:

public boolean cancel() {
    state.set(State.CANCELLED);
    Thread runner = runnerThread.get();
    if (runner != null) {
        runner.interrupt();
    }
    return future.cancel(true);
}

其实就三步,保持顺序:

  1. 先设状态为 CANCELLED——这一步配合上一节讲的 markRunning CAS,抢占式阻止任务启动
  2. 再中断 runner——如果任务已经在跑,发中断信号;如果还没拿到 runner,跳过
  3. 最后取消底层 future——future.cancel(true) 会尝试中断 future 内部的执行线程

注意为什么是「协作式」终止:Thread.interrupt() 只是发信号,不是强行杀线程。任务代码、阻塞点、中断处理逻辑要一起配合才能停下来。ThreadForge 靠三层协作:

  • Thread.interrupt() —— 唤醒阻塞中的线程
  • CancellationToken.throwIfCancelled() —— 任务代码主动检查取消点
  • 任务业务逻辑对中断的响应 —— 捕获 InterruptedException 后做清理并退出

单独哪一层都不够,三层叠加才算是可靠。

4. submit 的完整时序:从参数校验到进入调度器

ThreadScope 有十几个 submit(...) 重载,全部收敛到一个私有方法。去掉参数校验和并发许可申请后,核心构造逻辑如下:

final CompletableFuture<T> future = new CompletableFuture<T>();
final Task<T> task = new Task<T>(id, name, future);
final TaskInfo info = new TaskInfo(scopeId, id, name, Instant.now(), scheduler.name());
final ExecutionContextCarrier executionContext = ExecutionContextCarrier.capture();
final ScheduledTask timeoutTask = scheduleTaskTimeout(task, info, taskTimeout);
tasks.add(task);
future.whenComplete((value, throwable) -> {
    tasks.remove(task);
    if (timeoutTask != null) {
        timeoutTask.cancel();
    }
});

然后交给调度器:

scheduler.executor().execute(Scheduler.prioritized(
    executionContext.wrapRunnable(() -> runTask(task, info, callable, taskRetryPolicy,
        permitAcquired ? semaphore : null)),
    taskPriority,
    id
));

如果调度器拒绝执行(RejectedExecutionException),框架会释放并发许可、标记任务失败并触发 hook。

逐行看:

  • CompletableFuture — 任务结果的载体
  • Task — 带状态机和元信息的句柄
  • TaskInfo — 观测用的快照数据
  • 捕获上下文 — Context 和 OpenTelemetry 双份
  • 注册任务级 timeout — 可选的独立时间预算
  • 登记到 tasks — scope 后续可以捞出所有任务
  • 注册 whenComplete 回调 — 任务无论成功、失败还是取消,结束时自动从 tasks 队列移除,同时取消关联的 timeout 任务
  • 包成带优先级的 runnable,交给调度器

上半段提交,下半段执行,最后按成功、失败、取消三条路径收尾。

5. runTask(...):真正执行任务的内核

runTask(...) 是整条链路的执行引擎。完整方法约 40 行,精简后主结构如下:

long started = System.nanoTime();
CompletableFuture<T> future = task.toCompletableFuture();try {
    // ① 启动前双重取消检查
    if (task.isCancelled() || token.isCancelled()) {
        completeTaskCancelled(task, future, new CancelledException("..."), info, started);
        return;
    }    // ② 状态机: PENDING → RUNNING
    if (!task.markRunning(Thread.currentThread())) {
        return;
    }    // ③ 触发观测
    safeHookStart(info);
    token.throwIfCancelled();    // ④ 真正执行(含重试)
    T value = RetryExecutor.execute(callable, retryPolicy, token);
    if (future.complete(value)) {
        task.markSuccess();
        safeHookSuccess(info, elapsedNanos(started));
    }
} catch (InterruptedException e) {
    Thread.currentThread().interrupt();
    completeTaskCancelled(task, future, new CancelledException("..."), info, started);
} catch (CancelledException e) {
    completeTaskCancelled(task, future, e, info, started);
} catch (Throwable t) {
    completeTaskFailure(task, future, t, info, started);
} finally {
    if (acquiredSemaphore != null) {
        acquiredSemaphore.release(); // 释放并发许可
    }
}

异常走了三条分叉:

  • InterruptedException → 重置线程中断标志后走取消路径
  • CancelledException → 直接走取消路径
  • 其他 Throwable → 走失败路径(completeTaskFailure 内部通过 future.completeExceptionally(...) 把异常写入 future,再调用 task.markFailed() 和 hook)

失败、取消、重试、并发许可——全收在这一个方法里。

6. Scheduler:执行策略和所有权分离

private final ExecutorService executor;
private final boolean ownsExecutor;
private final String name;
private final boolean virtualThreadMode;

四个字段,四个职责:

  • executor — 任务交给谁执行
  • ownsExecutor — scope 关闭时要不要连带关掉执行器
  • name — 日志和观测里的标识
  • virtualThreadMode — 是否跑在虚拟线程上

6.1 Scheduler.detect()

public static Scheduler detect() {
    if (isVirtualThreadSupported()) {
        return virtualThreads();
    }
    return commonPool();
}

isVirtualThreadSupported() 通过反射探测 Executors.newVirtualThreadPerTaskExecutor()virtualThreads() 内部 DCL 延迟创建。反射找不到方法或调用失败,自动回退到 commonPool()。调用方一行不用改。

6.2 Scheduler.fixed(int size)

int queueCapacity = Math.max(256, size * 100);
new ThreadPoolExecutor(
    size, size, 60L, TimeUnit.SECONDS,
    new LinkedBlockingQueue<Runnable>(queueCapacity),
    new NamedThreadFactory("threadforge-fixed"),
    new ThreadPoolExecutor.CallerRunsPolicy()
);
executor.allowCoreThreadTimeOut(true);
  • core = max,固定线程数;线程打满后任务入有界队列
  • queueCapacity 至少 256,随线程数线性放大
  • CallerRunsPolicy:队列满时提交线程自己执行,形成自然反压
  • allowCoreThreadTimeOut(true):空闲超过 60 秒的核心线程也回收,避免高峰过后空转

6.3 Scheduler.priority(int size)

同样固定线程池,但队列换成无界的 PriorityBlockingQueue

new ThreadPoolExecutor(
    size, size, 60L, TimeUnit.SECONDS,
    new PriorityBlockingQueue<Runnable>(),
    new NamedThreadFactory("threadforge-priority"),
    new ThreadPoolExecutor.CallerRunsPolicy()
);

队列无界,CallerRunsPolicy 一般不会被触发。优先级靠 PrioritizedRunnable.compareTo(...) 实现:

int byPriority = Integer.compare(this.taskPriority.rank(), other.taskPriority.rank());
if (byPriority != 0) return byPriority;
return Long.compare(this.sequence, other.sequence);

先按 TaskPriority 排,同优先级按提交顺序(FIFO)。

无论哪种方式创建,最后都从 executor.execute(...) 出去。

7. ExecutionContextCarrier:线程切换时,上下文怎么跟着走

线程切换后,提交线程里的 Context 和 OpenTelemetry 上下文就断了——两者都基于线程局部变量。ExecutionContextCarrier 专门解决这个问题:

// 提交时捕获
static ExecutionContextCarrier capture() {
    return new ExecutionContextCarrier(
        Context.capture(),
        OpenTelemetryBridge.currentContext()
    );
}// 执行时安装,结束后恢复
Context.Snapshot previous = Context.install(contextSnapshot);
Object scope = OpenTelemetryBridge.makeCurrent(otelParentContext);
try {
    return callable.call();
} finally {
    OpenTelemetryBridge.closeScope(scope);
    Context.restore(previous);
}

传播了两类上下文:

  1. Context —— ThreadForge 自己的线程局部变量
  2. OpenTelemetry —— 外部 Tracing 系统的当前 span

8. RetryExecutor:重试为什么必须感知取消

RetryExecutor 的核心循环是这样的:

while (true) {
    token.throwIfCancelled();
    try {
        return callable.call();
    } catch (InterruptedException e) {
        throw e;                    // 中断直接透传,不重试
    } catch (CancelledException e) {
        throw e;                    // 取消直接透传,不重试
    } catch (Throwable failure) {
        if (!retryPolicy.allowsRetry(attempt, failure)) {
            // 不再重试,把之前所有失败附加到 suppressed
            if (previousFailures != null) {
                for (Throwable prev : previousFailures) {
                    if (prev != failure) failure.addSuppressed(prev);
                }
            }
            throw failure;
        }
        previousFailures.add(failure);
        sleepBeforeRetry(retryPolicy.nextDelay(attempt, failure), token);
        attempt++;
    }
}
  1. 中断和取消直接透传——不是业务失败,不能重试,框架必须立即响应
  2. 每次循环先检查 token——上次执行成功后,下一轮也要看 scope 是否已取消
  3. sleep 分片——每 100ms 醒来检查一次取消

sleepBeforeRetry(...) 实现:

long remainingMillis = delay.toMillis();
while (remainingMillis > 0L) {
    token.throwIfCancelled();
    long chunk = Math.min(remainingMillis, 100L);
    Thread.sleep(chunk);
    remainingMillis -= chunk;
}

如果最终还是失败,前面每次失败的异常会累加到最终异常的 suppressed 里——排查时能看到完整的失败历史,不是只有最后一次的错误信息。

分片 sleep 是关键:假设重试要等 5 秒,整段睡过去,中途收到取消信号就会反应慢半拍。分成 100ms 的小段,每段醒来检查一次取消,scope 一喊停就能更快响应。

9. 两层时间预算:scope deadline + task timeout

9.1 scope 级 deadline

withDeadline(Duration)

配置整个 ThreadScope 的截止时间。到达后:

  • deadlineTriggered = truetoken.cancel()
  • 作用域内所有任务收到取消信号
  • 后续等待抛出 ScopeTimeoutException

9.2 task 级 timeout

scope.submit("rpc-a", callable, Duration.ofMillis(200));

给单个任务设置独立预算。内部注册一个超时任务:

if (task.toCompletableFuture().completeExceptionally(timeoutException)) {
    task.markFailed();
    task.interruptRunner();
    safeHookFailure(info, timeoutException, timeout.toNanos());
}

只影响当前任务:标记失败、中断执行线程、触发 hook。不影响 scope 内其他任务。

类型作用范围触发后影响适合场景
scope deadline整个 ThreadScope取消整个作用域一个请求、一个批处理的总预算
task timeout单个 Task当前任务失败并中断 runner某个 RPC、IO 的独立预算

一个管全局,一个管局部。

10. Hook 和 Metrics:观测内嵌在执行链路里

ThreadHook 是 interface,4 个方法全是 default——只覆写需要的:

default void onStart(TaskInfo info) {}
default void onSuccess(TaskInfo info, Duration duration) {}
default void onFailure(TaskInfo info, Throwable error, Duration duration) {}
default void onCancel(TaskInfo info, Duration duration) {}

通过 andThen(...) 组合多个 hook,每个回调内部独立 catch 异常——前一个抛异常不阻止后续执行:

ThreadHook combined = hook1.andThen(hook2).andThen(hook3);

元信息来自 TaskInfo

new TaskInfo(scopeId, id, name, Instant.now(), scheduler.name())

内置指标由 ScopeMetrics 维护,LongAdder + AtomicLong,竞争开销低:

started / succeeded / failed / cancelled
totalDurationNanos / maxDurationNanos

观测和执行事件发生在同一时刻:

safeHookStart(info);   metrics.recordStart();
// ... 执行 ...
safeHookSuccess(...)   metrics.recordTerminal(Task.State.SUCCESS, ...)

任务走到哪个生命周期节点,hook 和 metrics 就跟到哪里。

写在最后

最近换工作,刚进了一个新团队。上家的节奏比较正常,现在几乎每天加班赶进度。

说不清忙和闲哪个更好。

职位又往上走了一步,压力也跟着翻了一倍。

向上对齐、向下兜底,还是技术最纯粹。没有汇报,没有上下级,简单而美好。

这一篇是 ThreadForge 源码解读的执行链路部分,下一篇继续,解读高阶编排怎么在一个 scope 里协同工作。 也算是给整个系列收官。

项目地址:github.com/wuuJiawei/T…,欢迎提 issue,也欢迎 star。

相关文章

精彩推荐