DolphinScheduler 的插件体系基于 Java SPI(Service Provider Interface) 机制,配合 Google AutoService 自动生成注册文件,实现了零侵入的插件化扩展
dolphinscheduler-spi ← 核心接口层(SPI基础设施)
dolphinscheduler-datasource-plugin ← 数据源插件层
dolphinscheduler-task-plugin ← Task插件层
dolphinscheduler-worker ← 插件消费层(Worker执行任务)
PrioritySPI (接口)
└── getIdentify(): SPIIdentify ← 插件唯一标识 + 优先级
└── compareTo(Integer) ← 优先级比较
PrioritySPIFactory 是插件发现的核心引擎
通过 Java 标准 ServiceLoader 扫描 classpath
for (T t : ServiceLoader.load(spiClass)) {
if (map.containsKey(t.getIdentify().getName())) {
resolveConflict(t); // 同名插件按优先级决策,优先级相同则抛异常
} else {
map.put(t.getIdentify().getName(), t);
}
}
插件注册方式:每个插件模块使用 @AutoService 注解,编译时自动在 META-INF/services/ 下生成 SPI 配置文件,无需手动维护
DataSourceChannelFactory (SPI入口)
└── getName() ← 插件唯一名称,如 "MYSQL"
└── create() ← 创建 DataSourceChannelDataSourceChannel (通道)
└── createAdHocDataSourceClient() ← 创建临时连接客户端
└── createPooledDataSourceClient() ← 创建连接池客户端DataSourceClient (基础接口)
└── getConnection(): ConnectionPooledDataSourceClient extends DataSourceClient
└── createDataSourcePool() ← 创建 HikariCP 连接池
MySQLDataSourceChannelFactory ← @AutoService 注册
└── create() → MySQLDataSourceChannel
└── createPooledDataSourceClient() → MySQLPooledDataSourceClient
└── extends BasePooledDataSourceClient
└── createDataSourcePool() → HikariDataSource
├── setDriverClassName()
├── setJdbcUrl()
├── setUsername() / setPassword()
├── setMinimumIdle() / setMaximumPoolSize()
└── setConnectionTestQuery()
TaskChannelFactory (SPI入口) extends UiChannelFactory, PrioritySPI
└── getName() ← 插件类型名,如 "SHELL"
└── create() ← 创建 TaskChannel
└── getParams() ← 返回 UI 配置参数(前端渲染用)TaskChannel (通道)
└── createTask(TaskExecutionContext) → AbstractTask
└── parseParameters(ParametersNode) → AbstractParameters
└── getResources(parameters) → ResourceParametersHelper
└── cancelApplication(boolean)AbstractTask (任务执行基类)
└── init() ← 初始化
└── handle(callback) ← 执行(抽象)
└── cancel() ← 取消(抽象)
└── getExitStatus() ← 根据 exitCode 返回状态
ShellTaskChannelFactory ← @AutoService 注册
└── getName() → "SHELL"
└── getParams() → [nodeName, runFlag, ...] ← 前端UI参数
└── create() → ShellTaskChannel
└── createTask(ctx) → ShellTask
└── handle(callback)
└── ShellCommandExecutor.run(shellActuatorBuilder)
└── 执行 Shell 脚本进程
