一、任务编排工作流

任务编排是什么意思呢,顾名思义就是可以把"任务"这个原子单位按照自己的方式进行编排,任务之间可能互相依赖。复杂一点的编排之后就能形成一个 workflow 工作流了。我们希望这个工作流按照我们编排的方式去执行每个原子 task 任务。如下图所示,我们希望先并发运行 Task A 和 Task C,Task A 执行完后串行运行 Task B,在并发等待 Task B 和 C 都结束后运行 Task D,这样就完成了一个典型的任务编排工作流。

二、DAG 有向无环图

首先我们了解图这个数据结构,每个元素称为顶点 vertex,顶点之间的连线称为边 edge。像我们画的这种带箭头关系的称为有向图,箭头关系之间能形成一个环的成为有环图,反之称为无环图。显然运用在我们任务编排工作流上,最合适的是 DAG 有向无环图。

我们在代码里怎么存储图呢,有两种数据结构:邻接矩阵和邻接表。

下图表示一个有向图的邻接矩阵,例如 x->y 的边,只需将 Array[x][y]标识为 1 即可。

此外我们也可以使用邻接表来存储,这种存储方式较好地弥补了邻接矩阵浪费空间的缺点,但相对来说邻接矩阵能更快地判断连通性。

一般在代码实现上,我们会选择邻接矩阵,这样我们在判断两点之间是否有边更方便点。

三、一个任务编排框架

了解了 DAG 的基本知识后我们可以来简单实现一下。

了解JUC包的可能快速想到CompletableFuture,这个类对于多个并发线程有复杂关系耦合的场景是很适用的,如果是一次性任务,那么使用CompletableFuture完全没有问题。但是作为框架或者平台来说,我们还需要考虑存储节点状态、重试执行等逻辑,对于这些CompletableFuture是不能满足的。

我们需要更完整地考虑与设计这个框架。首先是存储结构,我们的 Dag 表示一整个图,Node 表示各个顶点,每个顶点有其 parents 和 children:

//Dag
public final class DefaultDag<T, R> implements Dag<T, R> {
	private Map<T, Node<T, R>> nodes = new HashMap<T, Node<T, R>>();
//Node
public final class Node<T, R> {
	 * incoming dependencies for this node
    private Set<Node<T, R>> parents = new LinkedHashSet<Node<T, R>>();
     * outgoing dependencies for this node
    private Set<Node<T, R>> children = new LinkedHashSet<Node<T, R>>();

画两个顶点,以及为这两个顶点连边操作如下:

public void addDependency(final T evalFirstNode, final T evalLaterNode) {
	Node<T, R> firstNode = createNode(evalFirstNode);
	Node<T, R> afterNode = createNode(evalLaterNode);
	addEdges(firstNode, afterNode);
private Node<T, R> createNode(final T value) {
	Node<T, R> node = new Node<T, R>(value);
	return node;
private void addEdges(final Node<T, R> firstNode, final Node<T, R> afterNode) {
	if (!firstNode.equals(afterNode)) {
		firstNode.getChildren().add(afterNode);
		afterNode.getParents().add(firstNode);

到现在我们其实已经把基础数据结构写好了,但我们作为一个任务编排框架最终是需要线程去执行的,我们把它和线程池一起给包装一下。

//任务编排线程池
public class DefaultDexecutor <T, R> {
    //执行线程,和2种重试线程
	private final ExecutorService<T, R> executionEngine;
	private final ExecutorService immediatelyRetryExecutor;
	private final ScheduledExecutorService scheduledRetryExecutor;
    //执行状态
	private final ExecutorState<T, R> state;
//执行状态
public class DefaultExecutorState<T, R> {
    //底层图数据结构
	private final Dag<T, R> graph;
    //已完成
	private final Collection<Node<T, R>> processedNodes;
    //未完成
	private final Collection<Node<T, R>> unProcessedNodes;
    //错误task
	private final Collection<ExecutionResult<T, R>> erroredTasks;
    //执行结果
	private final Collection<ExecutionResult<T, R>> executionResults;

可以看到我们的线程包括执行线程池,2 种重试线程池。我们使用 ExecutorState 来保存一些整个任务工作流执行过程中的一些状态记录,包括已完成和未完成的 task,每个 task 执行的结果等。同时它也依赖我们底层的图数据结构 DAG。

接下来我们要做的事其实很简单,就是 BFS 这整个 DAG 数据结构,然后提交到线程池中去执行就可以了,过程中注意一些节点状态的保持,结果的保存即可。

还是以上图为例,值得说的一点是在 Task D 这个点需要有一个并发等待的操作,即 Task D 需要依赖 Task B 和 Task C 执行结束后再往下执行。这里有很多办法,我选择了共享变量的方式来完成并发等待。遍历工作流中被递归的方法的伪代码如下:

private void doProcessNodes(final Set<Node<T, R>> nodes) {
		for (Node<T, R> node : nodes) {
        //共享变量 并发等待
        if (!processedNodes.contains(node) && processedNodes.containsAll(node.getParents())) {
            Task<T, R> task = newTask(node);
            this.executionEngine.submit(task);
            ExecutionResult<T, R> executionResult = this.executionEngine.processResult();
            if (executionResult.isSuccess()) {
	    	state.markProcessingDone(processedNode);
            //继续执行孩子节点
	    doExecute(processedNode.getChildren());

这样我们基本完成了这个任务编排框架的工作,现在我们可以如下来进行示例图中的任务编排以及执行:

DefaultExecutor<String, String> executor = newTaskExecutor();
executor.addDependency("A", "B");
executor.addDependency("B", "D");
executor.addDependency("C", "D");
executor.execute();

四、任务编排平台化

好了现在我们已经有一款任务编排框架了,但很多时候我们想要可视化、平台化,让使用者更加无脑。

框架与平台最大的区别在哪里?是可拖拽的可视化输入么?我觉得这个的复杂度更多在前端。而对于后端平台来讲,与框架最大的区别是数据的持久化。

对于 DAG 的顶点来说,我们需要将每个节点 Task 的信息给持久化到关系数据库中,包括 Task 的状态、输出结果等。而对于 DAG 的边来说,我们也得用数据库来存储各 Task 之间的方向关系。此外,在遍历执行 DAG 的整个过程中的中间状态数据,我们也得搬运到数据库中。

首先我们可以设计一个 workflow 表,来表示一个工作流。接着我们设计一个 task 表,来表示一个执行单元。task 表主要字段如下,这里主要是 task_parents 的设计,它是一个 string,存储 parents 的 taskId,多个由分隔符分隔。

task_id
workflow_id
task_name
task_status
result
task_parents


依赖是上图这个例子,对比框架来说,我们首先得将其存储到数据库中去,最终可能得到如下数据:

task_id  workflow_id  task_name  task_status  result  task_parents
  1          1           A           0                    -1
  2          1           B           0                    1
  3          1           C           0                    -1
  4          1           D           0                    2,3

可以看到,这样也能很好地存储 DAG 数据,和框架中代码的输入方式差别并不是很大。

接下来我们要做的是遍历执行整个 workflow,这边和框架的差别也不大。首先我们可以利用select * from task where workflow_id = 1 and task_parents = -1来获取初始化节点 Task A 和 Task C,将其提交到我们的线程池中。

接着对应框架代码中的doExecute(processedNode.getChildren());,我们使用select * from task where task_parents like %3%,就可以得到 Task C 的孩子节点 Task D,这里使用了模糊查询是因为我们的 task_parents 可能是由多个父亲的 taskId 与分隔号组合而成的字符串。查询到孩子节点后,继续提交到线程池即可。

别忘了我们在 Task D 这边还有一个并发等待的操作,对应框架代码中的if (!processedNodes.contains(node) && processedNodes.containsAll(node.getParents()))。这边我们只要判断select count(1) from task where task_id in (2,3) and status != 1的个数为 0 即可,即保证 parents task 全部成功。

另外值得注意的是 task 的重试。在框架中,失败 task 的重试可以是立即使用当前线程重试或者放到一个定时线程池中去重试。而在平台上,我们的重试基本上来自于用户在界面上的点击,即主线程。

至此,我们已经将任务编排框架的功能基本平台化了。作为一个任务编排平台,可拖拽编排的可视化输入、整个工作流状态的可视化展示、任务的可人工重试都是其优点。

五、基于有向无环图(DAG)的任务调度Demo

文章: https://blog.csdn.net/dbqb007/article/details/89042984

创建功能分支( git checkout -b my-new-feature ) 提交更改( git commit -am 'Add some feature' ) 推送到分支( git push origin my-new-feature ) 创建新的拉取请求 Apache DolphinScheduler(目前处在孵化阶段)是一个分布式、去中心化、易扩展的可视化DAG工作流任务调度系统,其致力于解决数据处理流程中错综复杂的依赖关系,使调度系统在数据处理流程中开箱即用。 DolphinScheduler是今年(2019年)中国易观公司开源的一个调度系统,在今年美国时间2019年8月29号,易观开源的分布式任务调度引擎DolphinScheduler(原EasyScheduler)正式通过顶级开源组织Apache基金会的投票决议,根据Apache基金会
AI之风盛行,为什么这么说?记得去年参加公司的校园招聘时,在面试的50人中95%的人学校都有人工智能的课程,如python、matlab等,但很少的人从事过具体应用实践,更不必说工程化场景了,由此看出AI的热度。人工智能前景的确很不错,市场也确实很有刚需,但真正要掌握好,真不是照着教科书打个helloworld那么容易。所以说,为了降低门槛,一直想做个工具,实现从数据准备、特征工程、模型训练、到评估的整个过程,也就是常说由多任务组成的pipeline。 在一个流程系统中,任务间往往存在复杂的依赖关系,为保
用于编排执行的作业处理器和任务框架。 该项目是开放的以供发表意见/建议/建议 作业处理器是一种类似于工作流来表示流程的方式。 作业处理器中的所有内容均来自TaskNode。 总体轮廓是创建一个请求以启动该过程。 然后,流程创建一个执行上下文,该上下文在流程的生命周期内从一个节点流向另一个节点。 通过生成错误或通过请求取消可以停止流。 如果产生错误,则将执行流的补偿部分或流的分支。 每个节点都有一个后继节点。 设置后继者后,将在当前节点执行完毕后立即执行。 如果没有后继者,则这标志着流程的结束。 流程的组成部分是 处理器,它是CompensatedTaskNode的一种特殊类型,是任何过程的起点。 CompensatedTaskNode 有一个开始节点,这是常规流程的开始 有一个Compensation节点,如果在正常流程的执行过程中发生错误,则将执行该节点 正常流程完
有向无环图任务调度算法 DAG ----- 图论中,如果有向图中任意一个顶点从该点出发经过任意边都无法回到该点,那么这个图称为有向无环图。 DAG常常用于多个任务的调度,各个任务之间可能存在依赖关系。 所以一个工程划分为不同的任务, 有开始任务和结束任务,通过合理规划执行小任务的顺序,确保总工期最短。
基于hypervolume方法的云计算DAG任务调度方法研究主要是通过对DAG模型任务调度优化的方法进行探讨,以提高云计算系统的性能和效率。DAG任务调度是指在云计算环境中,将任务根据其依赖关系组织成DAG模型,然后通过合适的调度算法来合理分配资源,实现任务的顺利执行。 其中,基于hypervolume方法的任务调度主要是依据多目标优化的思想,通过衡量任务调度解的性能来评估其在多个目标下的表现。这种方法可以在同时考虑任务执行时间和资源利用率等多个指标的前提下,找到最优的调度方案。 通过对云计算环境中DAG任务调度的研究,可以更好地平衡系统的负载,提高资源利用率,并且减少任务执行时间。通过对不同的调度算法进行比较和分析,可以发现适用于不同场景的调度策略,从而优化系统的性能。 在实际应用中,基于hypervolume方法的DAG任务调度可以更好地适应不同的环境和需求,提高云计算系统的可靠性和灵活性。因此,这一研究方向对于推动云计算技术的发展和应用具有重要的意义。同时,通过对任务调度方法的研究和改进,可以为云计算系统的性能优化和资源管理提供有效的支持。