Skip to content

Latest commit

 

History

History
468 lines (321 loc) · 21.8 KB

README_zh_CN.md

File metadata and controls

468 lines (321 loc) · 21.8 KB

License version Java CI with Maven EN doc

简介

  • 本项目定义了一个通用的工作流描述规范(FlowSpec)
  • 我们开发了基于FlowSpec的迁移工具(MigrationX)来迁移不同工作流调度系统的工作流模型
  • 我们可以利用这个工具来开发其他调度系统的工作流模型转换工具

FlowSpec字段参考

CycleWorkflow

CycleWorkflow用于描述周期性的工作流,包含任务节点和依赖关系

字段列表

字段名 字段类型 是否必填 字段说明
version string 版本号
kind string CycleWorkflow
metadata Metadata 定义工作流的扩展元信息
spec CycleWorkflowSpec CycleWorkflow的定义

ManualWorkflow

ManualWorkflow用于描述手动触发的工作流,包含任务节点和依赖关系

字段列表

字段名 字段类型 是否必填 字段说明
version string 版本信息
kind string ManualWorkflow
metadata Metadata 定义工作流的扩展元信息
spec ManualWorkflowSpec ManualWorkflow的定义

CycleWorkflowSpec

CycleWorkflowSpec是周期调度工作流CycleWorkflow的定义描述

字段列表

Field Name 字段类型 是否必填 字段说明
nodes Array<Node> 周期任务节点列表
variables Array<Variable> 变量定义列表
scripts Array<Script> 脚本定义列表
triggers Array<Trigger> 周期调度定义列表
artifacts Array<Artifact> 产出物定义列表
runtimeResources Array<RuntimeResource> 运行时资源定义列表
fileResources Array<FileResource> 文件资源列表
functions Array<Function> 函数定义列表
flow Array<Flow> 定义节点之间的依赖关系

ManualWorkflowSpec

ManualWorkflowSpec是手动触发工作流ManualWorkflow的定义描述

字段列表

Field Name 字段类型 是否必填 字段说明
nodes Array<Node> 周期任务节点列表
variables Array<Variable> 变量定义列表
scripts Array<Script> 脚本定义列表
artifacts Array<Artifact> 产出物定义列表
runtimeResources Array<RuntimeResource> 运行时资源定义列表
fileResources Array<FileResource> 文件资源列表
functions Array<Function> 函数定义列表
flow Array<Flow> 定义节点之间的依赖关系

Metadata

定义工作流的扩展元信息

字段列表

Field Name 字段类型 是否必填 字段说明
owner string 责任人
description string 字段说明

Node

工作流节点的定义描述

字段列表

字段名 字段类型 是否必填 字段说明
id string 节点在Spec中的唯一ID
name string 节点名
script Script 节点所需的脚本定义或者引用
functions Array<Function> 节点所需的函数定义或者引用
fileResources Array<FileResource> 节点所需的文件资源定义或者引用
inputs Inputs 节点的输入. 可以是:TableArtifact, NodeArtifcat, Variable
outputs Outputs 节点输出,可以是:TableArtifact, NodeArtifcat, Variable
runtimeResource RuntimeResource 节点的运行时资源定义或引用
recurrence string recurrence定义周期调度节点的调度状态, 具体值参考枚举类型: Recurrence
priority integer 定义节点的优先级, 数值越大优先级越高
timeout integer 定义节点的超时时间,运行超过指定时间后,节点将会被kill掉,单位为秒
instanceMode string 节点的实例化模式,具体值参考枚举类型:InstanceMode
rerunMode string 节点重试策略,具体值参考枚举类型:RerunMode

Flow

flow部分定义了工作流节点之间的依赖关系。

字段列表

字段名 字段类型 是否必填 字段说明
nodeId string 节点的唯一标识符,全局唯一,用于标识工作流中的节点
depends Array<FlowDepend> 依赖的节点列表

FlowDepend

FlowDepend定义了工作流节点之间的依赖关系。

字段名 字段类型 是否必填 字段说明
nodeId string 节点的唯一标识
type string 节点依赖的类型,具体值参考枚举类型:DependencyType

Variable

Variable定义了工作流的变量。Variable可以被在工作流节点中引用

字段列表

字段名 字段类型 是否必填 字段说明
id string 唯一标识
name string 变量名
scope string 变量作用域, 具体值参考:VariableScope
type string 变量类型,具体值参考: VariableType
value string 变量值

Script

Script定义了脚本资源。Script可以被在工作流节点、资源或函数中引用

字段列表

字段名 字段类型 是否必填 字段说明
id string 唯一标识
path string 脚本路径
language string 脚本语言
runtime Runtime 脚本运行时环境定义
parameters Array<Variable> 脚本参数名及表达式定义列表

Trigger

Trigger定义了工作流的触发器。Trigger决定工作流节点的周期调度运行的开始时间

字段列表

字段名 字段类型 是否必填 字段说明
id string 唯一标识
type string 触发器类型, 可选值: Scheduler, Manual
cron string 周期调度触发器的定时表达式
startTime string 周期调度的起始生效时间,节点只在startTimeendTime时间段内执行周期调度运行
endTime string 周期调度的结束生效时间,节点只在startTimeendTime时间段内执行周期调度运行
timezone string 周期调度时间的时区

Artifact

Artifact的类型可以是NodeOutput, Table, Variable. 都可以作为工作流节点的输入或者输出

Table

字段名 字段类型 是否必填 字段说明
guid string 产出表唯一标识符

NodeOutput

字段名 字段类型 是否必填 字段说明
output string 节点调度输出标识

RuntimeResource

RuntimeResource定义了节点的运行时资源,如:调度资源组,YARN集群等

字段列表

字段名 字段类型 是否必填 字段说明
id string 唯一标识符
resourceGroup string 资源组唯一标识

FileResource

FileResource定义了文件资源,如:jar, python, text file, archive files等。FileResource可以被节点、函数引用

字段列表

字段名 字段类型 是否必填 字段说明
id string 唯一标识
name string 资源名称
script Script 资源文件

Function

定义工作流节点使用的UDF

字段列表

字段名称 字段类型 是否必填 字段说明
id string 唯一标识
name string 函数名称
script Script 使用的脚本
fileResources Array<FileResource> 使用的资源列表

Runtime

Runtime定义脚本的运行时环境,如:命令,运行时引擎,镜像等

字段列表

字段名 字段类型 是否必填 字段说明
engine string 运行时引擎
command string 运行时环境的命令标识

Outputs

输出定义了工作流节点的产出信息,Outputs可以被下游工作流节点消费

字段列表

字段名 字段类型 是否必填 字段说明
tables Array<Table> 节点产出表的列表
variables Array<Variable> 节点产出变量的列表
nodeOutputs Array<NodeOutput> 节点的预定义输出的列表

Inputs

Inputs定义节点输入依赖,可以是上游节点Outputs定义的NodeArtifact, TableArtifact, Variable`

字段列表

字段名 字段类型 是否必填 字段说明
tables Array<Table> tables 节点使用的上游产出表列表
variables Array<Variable> variables 节点依赖的变量Variable列表
nodeOutputs Array<NodeOutput> nodeOutputs 节点使用的上游节点的预定义输出列表

Enumerations

枚举说明

VariableScope

变量作用域

枚举值 字段说明
NodeParameter NodeParameter 表示只有节点内使用的参数
NodeContext NodeContext 表示节点的上下文中的参数,可以被下游节点使用
Workflow Workflow 表示工作流级别的参数,可以被工作流所有节点使用
Workspace Workspace 表示工作空间级别的参数,可以被工作空间所有节点使用
Tenant Tenant 表示租户级别参数,可以被租户的所有节点使用

VariableType

变量类型

枚举值 字段说明
System System 表示系统变量,例如: $yyyymmdd
Constant Constant 表示常量

DependencyType

依赖类型

枚举值 字段说明
Normal Normal 表示节点依赖的是同周期的对应的节点示例
CrossCycleDependsOnSelf CrossCycleDependsOnSelf 表示节点当前周期的实例依赖的是上一周期的自己的实例
CrossCycleDependsOnChildren CrossCycleDependsOnChildren 表示节点当前周期的实例依赖的是上一周期自己的所有子节点实例
CrossCycleDependsOnOtherNode CrossCycleDependsOnOtherNode 表示节点当前周期的实例依赖的是上一周期的指定节点的实例

Recurrence

周期调度状态类型

枚举值 字段说明
Normal Normal表示节点会被正常按照指定调度周期调度运行
Skip Skip表示节点会被正常按照指定调度周期实例化,但是代码不会被运行生效,任务状态直接设置为成功
Pause Pause表示节点会被正产按照指定调度周期实例化,但是状态直接设置为失败

RerunMode

重跑策略

枚举值 字段说明
Allowed Allowed任何情况下都允许重跑
Denied Denied 任何情况下都不允许重跑
FailureAllowed FailureAllowed 失败的情况下允许重跑

InstanceMode

实例化模式

枚举值 字段说明
T+1 T+1表示节点的配置修改将会在T+1天后生效
Immediately Immediately 表示节点配置修改将立即生效

FlowSpec 示例

真实案例

EMR/CDH 示例

不使用变量引用方式的定义示例

使用变量引用方式的定义

简单示例

分支节点

归并节点

for-each/do-while 节点

手动工作流

EMR节点

资源定义

函数定义

参数节点

迁移助手导入包示例

  • 包的目录结构和目前DataWorks数据开发的目录树结构保持一致
  • *.sql, *.sh, *.hql 是用户的脚本文件
  • *.flow 脚本文件对应的节点调度配置文件
➜  project_c_dw tree
.
└── 业务流程
    ├── project_c_dag_3zq3ei4d6
    │   └── ClickHouse
    │       └── 数据开发
    │           ├── clickhouse_sql_1.flow
    │           └── clickhouse_sql_1.sql
    └── project_c_demo_workflow_1
        ├── EMR
        │   └── 数据开发
        │       ├── demo_hive_sql_1.flow
        │       ├── demo_hive_sql_1.hql
        │       ├── demo_pg_sql_1.flow
        │       ├── demo_pg_sql_1.hql
        │       ├── demo_shell_1.flow
        │       ├── demo_shell_1.sh
        │       ├── demo_sql_1.flow
        │       └── demo_sql_1.hql
        └── 通用
            ├── dep_ck_1
            └── dep_ck_1.flow

DataWorks Migration Package FlowSpec example

FlowSpec 工具

MigrationX 是一款基于 FlowSpec 规范的工作流迁移工具,它可以将 Dolphinscheduler 工作流转换为 DataWorks FlowSpec。

  • MigrationX
    • Dolphinscheduler 一键迁移到 Dataworks DataStudio
    • Dolphinscheduler 工作流一键导出
    • 将Dolphinscheduler工作流转换为 DataWorks FlowSpec
    • 将FlowSpec包通过OpenAPI导入DataWorks DataStudio

整体架构

image

领域模型

定义不同工作流引擎的领域模型

Reader

实现不同工作流系统的工作流导出

Transformer

实现不同工作流系统之间的模型转换

Writer

实现工作流系统的工作流导入

用法

用法

Modules

  • migrationx-common,通用模块
  • migrationx-domain,工作流引擎的模型定义
  • migrationx-reader,工作流引擎的导出实现
  • migrationx-transformer,工作流引擎模型的转换实现
  • migrationx-writer,工作流引擎的导入实现

开发指南

开发指南

贡献

  • 阿里云-DataWorks-开发分析建模团队