在工作流的编排中,有些时候会同时运行多个任务或子进程,默认的方式是串行运行,但是为了提高性能,我们会希望能并行运行。我也进行了一些测试,发现这个并行运行还不是这么简单的。

举个例子,我们现在定义一个流程,这个流程很简单,就是调用一个Http接口。然后再定义另外一个流程,在这个流程里面引用刚才定义的子流程,然后运行这个子流程多次。我们希望实现的效果是这个子流程的运行能够并行。

首先我们先简单定义一个Http接口,在这个接口接收到Reqest之后,简单的休眠10秒之后再返回。这个接口很简单,代码就不附上了。

然后我们定义一个简单的流程,里面包含一个Service Task,是调用这个Http接口的。这个流程的BPMN内容如下:

<?xml version="1.0" encoding="UTF-8"?>
<bpmn2:definitions xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:bpmn2="http://www.omg.org/spec/BPMN/20100524/MODEL" xmlns:bpmndi="http://www.omg.org/spec/BPMN/20100524/DI" xmlns:dc="http://www.omg.org/spec/DD/20100524/DC" xmlns:di="http://www.omg.org/spec/DD/20100524/DI" xmlns:camunda="http://camunda.org/schema/1.0/bpmn" id="sample-diagram" targetNamespace="http://bpmn.io/schema/bpmn" xsi:schemaLocation="http://www.omg.org/spec/BPMN/20100524/MODEL BPMN20.xsd">
  <bpmn2:process id="Process_12345" name="Test_12345" isExecutable="true">
    <bpmn2:startEvent id="StartEvent_1">
      <bpmn2:outgoing>Flow_1lqf9ii</bpmn2:outgoing>
    </bpmn2:startEvent>
    <bpmn2:sequenceFlow id="Flow_1lqf9ii" sourceRef="StartEvent_1" targetRef="Activity_1jsutqa" />
    <bpmn2:endEvent id="Event_03vs1kv" camunda:asyncBefore="true">
      <bpmn2:incoming>Flow_179keqc</bpmn2:incoming>
    </bpmn2:endEvent>
    <bpmn2:sequenceFlow id="Flow_179keqc" sourceRef="Activity_1jsutqa" targetRef="Event_03vs1kv" />
    <bpmn2:serviceTask id="Activity_1jsutqa" name="调用HTTP接口" camunda:resultVariable="newNEType">
      <bpmn2:extensionElements>
        <camunda:connector>
          <camunda:inputOutput>
            <camunda:inputParameter name="url">http://127.0.0.1:7777/getDeviceList</camunda:inputParameter>
            <camunda:inputParameter name="method">POST</camunda:inputParameter>
            <camunda:inputParameter name="headers">
              <camunda:map>
                <camunda:entry key="Accept">application/json</camunda:entry>
                <camunda:entry key="Content-Type">application/json</camunda:entry>
              </camunda:map>
            </camunda:inputParameter>
            <camunda:inputParameter name="payload">{"neType": "${execution.getVariable("neType")}",
"test": "${execution.getVariable("test")}"}</camunda:inputParameter>
          </camunda:inputOutput>
          <camunda:connectorId>http-connector</camunda:connectorId>
        </camunda:connector>
      </bpmn2:extensionElements>
      <bpmn2:incoming>Flow_1lqf9ii</bpmn2:incoming>
      <bpmn2:outgoing>Flow_179keqc</bpmn2:outgoing>
    </bpmn2:serviceTask>
  </bpmn2:process>
  <bpmndi:BPMNDiagram id="BPMNDiagram_1">
    <bpmndi:BPMNPlane id="BPMNPlane_1" bpmnElement="Process_12345">
      <bpmndi:BPMNShape id="_BPMNShape_StartEvent_2" bpmnElement="StartEvent_1">
        <dc:Bounds x="192" y="102" width="36" height="36" />
      </bpmndi:BPMNShape>
      <bpmndi:BPMNShape id="Event_03vs1kv_di" bpmnElement="Event_03vs1kv">
        <dc:Bounds x="432" y="102" width="36" height="36" />
      </bpmndi:BPMNShape>
      <bpmndi:BPMNShape id="Activity_1fzq8lj_di" bpmnElement="Activity_1jsutqa">
        <dc:Bounds x="280" y="80" width="100" height="80" />
        <bpmndi:BPMNLabel />
      </bpmndi:BPMNShape>
      <bpmndi:BPMNEdge id="Flow_1lqf9ii_di" bpmnElement="Flow_1lqf9ii">
        <di:waypoint x="228" y="120" />
        <di:waypoint x="280" y="120" />
      </bpmndi:BPMNEdge>
      <bpmndi:BPMNEdge id="Flow_179keqc_di" bpmnElement="Flow_179keqc">
        <di:waypoint x="380" y="120" />
        <di:waypoint x="432" y="120" />
      </bpmndi:BPMNEdge>
    </bpmndi:BPMNPlane>
  </bpmndi:BPMNDiagram>
</bpmn2:definitions>

然后我们再定义另外一个流程,这个流程里面包含一个Call Activity,这个Call Activity引用了刚才我们定义的流程。为了能让这个Call Activity能运行多次,我们需要定义一个inputCollection的变量,例如我在工作流里面定义一个Script Task,然后通过JavaScript来创建一个InputCollection:

var ArrayList = Java.type("java.util.ArrayList")
var neTypes = new ArrayList();
neTypes.add('AMF');
neTypes.add('SMF');
neTypes.add('UPF');
neTypes.add('ENUM');
neTypes.add('SPG');
neTypes.add('PCC');
neTypes;

然后创建一个Call Activity,在配置里面选择类型是并行运行多实例,即三条竖线的小图标,然后在Multi-Instance属性里面的Collection,设置为${neTypes},即刚才Script Task输出的变量。在Element Variable里面输入neType,这样当工作流运行时,就会遍历Collection里面的元素,把其赋值为neType,然后传给Call Activity执行。另外,在Multi-Instance里面的Asynchronous before选项也要勾上,但是Exclusive选项不要勾选。这样当流程执行到Call Activity之前,会暂停,然后创建线程任务来异步运行Call Activity。如果不这样设置,那么即使我们设置Call Activity是并行运行,但是仍然是串行执行。如下图所示:

 以下是这个流程定义的BPMN代码:

<?xml version="1.0" encoding="UTF-8"?>
<bpmn2:definitions xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:bpmn2="http://www.omg.org/spec/BPMN/20100524/MODEL" xmlns:bpmndi="http://www.omg.org/spec/BPMN/20100524/DI" xmlns:dc="http://www.omg.org/spec/DD/20100524/DC" xmlns:di="http://www.omg.org/spec/DD/20100524/DI" xmlns:camunda="http://camunda.org/schema/1.0/bpmn" id="sample-diagram" targetNamespace="http://bpmn.io/schema/bpmn" xsi:schemaLocation="http://www.omg.org/spec/BPMN/20100524/MODEL BPMN20.xsd">
  <bpmn2:process id="Process_12345" name="Test_12345" isExecutable="true">
    <bpmn2:startEvent id="StartEvent_1">
      <bpmn2:outgoing>Flow_1lqf9ii</bpmn2:outgoing>
    </bpmn2:startEvent>
    <bpmn2:sequenceFlow id="Flow_1lqf9ii" sourceRef="StartEvent_1" targetRef="Activity_1jsutqa" />
    <bpmn2:endEvent id="Event_03vs1kv" camunda:asyncBefore="true">
      <bpmn2:incoming>Flow_179keqc</bpmn2:incoming>
    </bpmn2:endEvent>
    <bpmn2:sequenceFlow id="Flow_179keqc" sourceRef="Activity_1jsutqa" targetRef="Event_03vs1kv" />
    <bpmn2:serviceTask id="Activity_1jsutqa" name="调用HTTP接口" camunda:resultVariable="newNEType">
      <bpmn2:extensionElements>
        <camunda:connector>
          <camunda:inputOutput>
            <camunda:inputParameter name="url">http://127.0.0.1:7777/getDeviceList</camunda:inputParameter>
            <camunda:inputParameter name="method">POST</camunda:inputParameter>
            <camunda:inputParameter name="headers">
              <camunda:map>
                <camunda:entry key="Accept">application/json</camunda:entry>
                <camunda:entry key="Content-Type">application/json</camunda:entry>
              </camunda:map>
            </camunda:inputParameter>
            <camunda:inputParameter name="payload">{"neType": "${execution.getVariable("neType")}",
"test": "${execution.getVariable("test")}"}</camunda:inputParameter>
          </camunda:inputOutput>
          <camunda:connectorId>http-connector</camunda:connectorId>
        </camunda:connector>
      </bpmn2:extensionElements>
      <bpmn2:incoming>Flow_1lqf9ii</bpmn2:incoming>
      <bpmn2:outgoing>Flow_179keqc</bpmn2:outgoing>
    </bpmn2:serviceTask>
  </bpmn2:process>
  <bpmndi:BPMNDiagram id="BPMNDiagram_1">
    <bpmndi:BPMNPlane id="BPMNPlane_1" bpmnElement="Process_12345">
      <bpmndi:BPMNShape id="_BPMNShape_StartEvent_2" bpmnElement="StartEvent_1">
        <dc:Bounds x="192" y="102" width="36" height="36" />
      </bpmndi:BPMNShape>
      <bpmndi:BPMNShape id="Event_03vs1kv_di" bpmnElement="Event_03vs1kv">
        <dc:Bounds x="432" y="102" width="36" height="36" />
      </bpmndi:BPMNShape>
      <bpmndi:BPMNShape id="Activity_1fzq8lj_di" bpmnElement="Activity_1jsutqa">
        <dc:Bounds x="280" y="80" width="100" height="80" />
        <bpmndi:BPMNLabel />
      </bpmndi:BPMNShape>
      <bpmndi:BPMNEdge id="Flow_1lqf9ii_di" bpmnElement="Flow_1lqf9ii">
        <di:waypoint x="228" y="120" />
        <di:waypoint x="280" y="120" />
      </bpmndi:BPMNEdge>
      <bpmndi:BPMNEdge id="Flow_179keqc_di" bpmnElement="Flow_179keqc">
        <di:waypoint x="380" y="120" />
        <di:waypoint x="432" y="120" />
      </bpmndi:BPMNEdge>
    </bpmndi:BPMNPlane>
  </bpmndi:BPMNDiagram>
</bpmn2:definitions>

还有一点需要注意的,那就是在Call Activity引用的流程里面,在end event那里也需要设置Asynchronous before和Exclusive,不然的话Call Activity运行的次数会和我们预计的不同,会更多一些。按照网上的解释,由于可能存在多个线程同时运行这个子进程,但是Camunda只允许同一个进程中只有一个线程是激活的,因此只有1个线程是成功运行,其他的线程将重试运行,因此我们就会观察到有多于我们预计的运行次数。在End envent设置Async before可以阻止线程重复运行。

运行流程,从HTTP接口打印的调用日志,可以看到子流程确实是并行运行的。

在实际项目中,我还发现了一个奇怪的问题,就是我在Call Activity并行运行多个子进程,但是其中一个子进程会一直运行而不结束,其他子进程则没有这个问题。当时想了好久也没找到有啥问题,后来观察了一下这个子进程,其运行时间较长,因为要处理的数据比其他子进程要多,大概运行一次要5分多钟的时间。我又查看了一下官网上的配置描述,发现其中在任务执行里面有一个配置参数是lock-time-in-millis,默认配置刚好是5分钟,因此很有可能是因为超过5分钟这个任务还没结束,因此Camunda又启动了另外一个任务来重复执行,因此造成的现象就是好像这个任务一直在运行。找到问题之后,我把这个配置改为15分钟,即解决问题。当然按照官网的解释,另一种更好的解决办法是,当有任务需要运行长时间时,最好是改为external task的方式交由外部执行,当外部执行完毕之后发送消息给任务即可。

在工作流中,通常需要执行多个任务,这些任务可能是并行执行的,也可能是串行执行的。使用并行网关,可以将一个流程分成多个分支,每个分支可以独立地执行它们的任务。需要注意的是,在使用并行网关时,必须确保每个分支的任务都可以独立执行,并且它们之间没有任何依赖关系。此外,使用并行网关可以简化工作流的设计和维护,因为它可以将流程划分为更小的部分,每个部分都可以独立设计和测试。Camunda中的并行网关(Parallel Gateway)用于并行执行多个任务,这些任务可以是相互独立的,也可以是相互依赖的。 在 Camunda 中,子流程可以使用嵌入式子流程(Embedded Subprocess)或调用式子流程(Call Activity)来实现。嵌入式子流程是一种直接嵌入到主流程中的子流程,可以在主流程的任何位置执行;而调用式子流程是一种通过调用另一个流程定义来执行子流程的方式,可以使用输入输出参数来传递数据。 <bpmn:process id="async" isExecutable="true" camunda:jobPriority="50"> 第一个节点优先级不定义、第二个节点定义为30 <bpmn:userTask id="Activity_1nicj02" name="【总经理】审批" camunda:asyncBefore="true" camunda:asyncAfter="true" camunda:priorit 选择之后,就可以让“并行会签预审批”任务同时由多个人处理~ 参考《基于camunda如何实现会签:camunda会签流程配置与原理解析》 大神写的文章,里面对并行用户任务 的主要参数配置进行了说明: loop cardinality:循环基数。可选项。可以直接填整数,表示会签的人数。 Collection:集合。可选项。会签人数的集合,通常为list,和loop cardinality二选一。 Element v 最近在用Camunda流程引擎做业务开发时,认识到了Camunda Modeler 流程工具有条件事件、定时事件这2个东西。那它们长什么样子,在流程中可以用来做什么,以及怎么用的? 1、先来看看它们长什么样子吧!! 在生产环境中应用Camunda工作流,通常都需要配置多个工作流引擎实例,以满足负载分担,容灾备份等需求。这里我将用nacos+nginx,来实现多个工作流引擎的服务注册和负载分担。 在多个部门进行并行阅办的过程中,只要登记单的状态还不是已归档状态,第二个节点的领导,还可以随时增加阅办的部门(从技术的角度来说,只要子流程节点还是活动状态,就可以随时增加子流程活动节点的实例数)。在一些业务场景中,允许管理员或某个角色对运行中的流程进行节点跳转(或成为调度)操作,也就是可以将流程的任意一个节点设置为一个活动节点(为了保证流程允许的完整性同时会取消当前所有的活动节点)。结合上面的需求和已经画好的流程图,综合来看已经很清楚实现了,并行子流程应对 “多节点为一组” 独立并行流转的场景最为合适。 Camunda 对资源要求不高。通常不会达到 Camunda 可扩展性的限制。。普通开发人员笔记本(Intel i5 4 Core @2.5 Ghz,8 GB RAM,SSD HD)上运行基准测试,该笔记本每秒启动大约 100 到 500 个进程实例。性能测试:建议通过 JMeter 等负载工具在接近生产环境的环境中进行负载测试。 目前Camunda提供了5种不同类型的网关:Parallel Gateway(并行网关),Complex Gateway(复杂网关) ,Exclusive Gateway(独占网关),Inclusive Gateway(包容网关),Event-based Gateway(事件网关)。本文主要介绍Parallel Gateway(并行网关)。一、并行网关使用场景并行网关适用于,当多个条件任务同时执行完毕时,流程才能继续向下进行的场景。