ProcessRuntime

自动注入ProcessRuntime

@Autowired
private ProcessRuntime processRuntime;

来让们看看这个ProcessRuntime 是做什么の

public interface ProcessRuntime {
  ProcessRuntimeConfiguration configuration();
  ProcessDefinition processDefinition(String processDefinitionId);
  Page processDefinitions(Pageable pageable);
  Page processDefinitions(Pageable pageable,
              GetProcessDefinitionsPayload payload);
  ProcessInstance start(StartProcessPayload payload);
  Page processInstances(Pageable pageable);
  Page processInstances(Pageable pageable,
              GetProcessInstancesPayload payload);
  ProcessInstance processInstance(String processInstanceId);
  ProcessInstance suspend(SuspendProcessPayload payload);
  ProcessInstance resume(ResumeProcessPayload payload);
  ProcessInstance delete(DeleteProcessPayload payload);
  void signal(SignalPayload payload);
  ...
}

和TaskRuntime接口类似

processDefinition(流程定义)

Page processDefinitionPage = processRuntime
                                .processDefinitions(Pageable.of(0, 10));
logger.info("> Available Process definitions: " +
                  processDefinitionPage.getTotalItems());
for (ProcessDefinition pd : processDefinitionPage.getContent()) {
  logger.info("\t > Process definition: " + pd);
}

Process Definitions需要放在/src/main/resources/processes/中

我们正在使用Spring Scheduling功能来每秒钟启动一个进程,从数组中提取随机值到进程:

@Scheduled(initialDelay = 1000, fixedDelay = 1000)
public void processText() {
  securityUtil.logInAs("system");
  String content = pickRandomString();
  SimpleDateFormat formatter = new SimpleDateFormat("dd-MM-yy HH:mm:ss");
  logger.info("> Processing content: " + content
                    + " at " + formatter.format(new Date()));
  ProcessInstance processInstance = processRuntime
                  .start(ProcessPayloadBuilder
                       .start()
                       .withProcessDefinitionKey("categorizeProcess")
                       .withProcessInstanceName("Processing Content: " + content)
                       .withVariable("content", content)
                       .build());
  logger.info(">>> Created Process Instance: " + processInstance);
}

与前面一样,我们使用ProcessPayloadBuilder以流畅的方式参数化希望启动哪个进程以及使用哪个进程变量。
现在,如果我们回顾一下流程定义,您将发现3个服务任务。为了提供这些服务任务的实现,你需要定义Connectors:

@Bean
public Connector processTextConnector() {
  return integrationContext -> {
      Map inBoundVariables = integrationContext.getInBoundVariables();
      String contentToProcess = (String) inBoundVariables.get("content")
     // Logic Here to decide if content is approved or not
     if (contentToProcess.contains("activiti")) {
        logger.info("> Approving content: " + contentToProcess);
        integrationContext.addOutBoundVariable("approved",true);
     } else {
        logger.info("> Discarding content: " + contentToProcess);
        integrationContext.addOutBoundVariable("approved",false);
     }
    return integrationContext;
  };
}

这些连接器使用Bean名称自动连接到ProcessRuntime,在本例中为“processTextConnector”。这个bean名是从我们的流程定义中的serviceTask元素的实现属性中获取的

<bpmn:serviceTask id="Task_1ylvdew" name="Process Content" implementation="processTextConnector">

这个新的连接器接口是JavaDelegates的自然演变,新版本的Activiti Core将尝试通过将你的javadagates包装在连接器实现中来重用它们

public interface Connector extends Function<IntegrationContext, IntegrationContext> {
}

函数式接口

连接器接收带有流程变量的IntegrationContext,并返回带有需要映射回流程变量的结果的修改的IntegrationContext。
在前面的示例中,连接器实现接收一个“内容”变量,并根据内容处理逻辑添加一个“批准的”变量。
在这些连接器中,您可能包括系统到系统的调用,例如REST调用和基于消息的交互。这些交互会变得越来越复杂,因此我们将在未来的教程中看到这些连接器是如何做到的

例子

@SpringBootApplication
@EnableScheduling
public class DemoApplication implements CommandLineRunner {

    private Logger logger = LoggerFactory.getLogger(DemoApplication.class);

    @Autowired
    private ProcessRuntime processRuntime;

    @Autowired
    private SecurityUtil securityUtil;

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);

    }

    @Override
    public void run(String... args) {
        securityUtil.logInAs("system");

        Page<ProcessDefinition> processDefinitionPage = processRuntime.processDefinitions(Pageable.of(0, 10));
        logger.info("> Available Process definitions: " + processDefinitionPage.getTotalItems());
        for (ProcessDefinition pd : processDefinitionPage.getContent()) {
            logger.info("\t > Process definition: " + pd);
        }
    }

    @Scheduled(initialDelay = 1000, fixedDelay = 1000)
    public void processText() {

        securityUtil.logInAs("system");

        String content = pickRandomString();

        SimpleDateFormat formatter = new SimpleDateFormat("dd-MM-yy HH:mm:ss");

        logger.info("> Processing content: " + content + " at " + formatter.format(new Date()));

        ProcessInstance processInstance = processRuntime.start(ProcessPayloadBuilder
                .start()
                .withProcessDefinitionKey("categorizeProcess")
                .withName("Processing Content: " + content)
                .withVariable("content", content)
                .build());
        logger.info(">>> Created Process Instance: " + processInstance);


    }

    @Bean
    public Connector processTextConnector() {
        return integrationContext -> {
            Map<String, Object> inBoundVariables = integrationContext.getInBoundVariables();
            String contentToProcess = (String) inBoundVariables.get("content");
            // Logic Here to decide if content is approved or not
            if (contentToProcess.contains("activiti")) {
                logger.info("> Approving content: " + contentToProcess);
                integrationContext.addOutBoundVariable("approved",
                        true);
            } else {
                logger.info("> Discarding content: " + contentToProcess);
                integrationContext.addOutBoundVariable("approved",
                        false);
            }
            return integrationContext;
        };
    }

    @Bean
    public Connector tagTextConnector() {
        return integrationContext -> {
            String contentToTag = (String) integrationContext.getInBoundVariables().get("content");
            contentToTag += " :) ";
            integrationContext.addOutBoundVariable("content",
                    contentToTag);
            logger.info("Final Content: " + contentToTag);
            return integrationContext;
        };
    }

    @Bean
    public Connector discardTextConnector() {
        return integrationContext -> {
            String contentToDiscard = (String) integrationContext.getInBoundVariables().get("content");
            contentToDiscard += " :( ";
            integrationContext.addOutBoundVariable("content",
                    contentToDiscard);
            logger.info("Final Content: " + contentToDiscard);
            return integrationContext;
        };
    }

    @Bean
    public ProcessRuntimeEventListener<ProcessCompletedEvent> processCompletedListener() {
        return processCompleted -> logger.info(">>> Process Completed: '"
                + processCompleted.getEntity().getName() +
                "' We can send a notification to the initiator: " + processCompleted.getEntity().getInitiator());
    }

    private String pickRandomString() {
        String[] texts = {"hello from london", "Hi there from activiti!", "all good news over here.", "I've tweeted about activiti today.",
                "other boring projects.", "activiti cloud - Cloud Native Java BPM"};
        return texts[new Random().nextInt(texts.length)];
    }

}

categorize-content.bpmn20.xml(processDefinition(流程定义))

<?xml version="1.0" encoding="UTF-8"?>
<bpmn:definitions xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:bpmn="http://www.omg.org/spec/BPMN/20100524/MODEL" xmlns:bpmndi="http://www.omg.org/spec/BPMN/20100524/DI" xmlns:dc="http://www.omg.org/spec/DD/20100524/DC" xmlns:di="http://www.omg.org/spec/DD/20100524/DI" id="Definitions_0v7t65f" targetNamespace="http://bpmn.io/schema/bpmn">
  <bpmn:process id="categorizeProcess" name="categorizeProcess" isExecutable="true">
    <bpmn:startEvent id="StartEvent_1">
      <bpmn:outgoing>SequenceFlow_09xowo4</bpmn:outgoing>
    </bpmn:startEvent>
    <bpmn:sequenceFlow id="SequenceFlow_09xowo4" sourceRef="StartEvent_1" targetRef="Task_1ylvdew" />
    <bpmn:exclusiveGateway id="ExclusiveGateway_0c36qc6" name="Content Accepted?">
      <bpmn:incoming>SequenceFlow_1jzbgkj</bpmn:incoming>
      <bpmn:outgoing>SequenceFlow_0tsc63v</bpmn:outgoing>
      <bpmn:outgoing>SequenceFlow_049fuit</bpmn:outgoing>
    </bpmn:exclusiveGateway>
    <bpmn:sequenceFlow id="SequenceFlow_1jzbgkj" sourceRef="Task_1ylvdew" targetRef="ExclusiveGateway_0c36qc6" />
    <bpmn:sequenceFlow id="SequenceFlow_0tsc63v" name="yes" sourceRef="ExclusiveGateway_0c36qc6" targetRef="Task_0snvh02">
      <bpmn:conditionExpression xsi:type="bpmn:tFormalExpression">${approved == true}</bpmn:conditionExpression>
    </bpmn:sequenceFlow>
    <bpmn:sequenceFlow id="SequenceFlow_049fuit" name="no" sourceRef="ExclusiveGateway_0c36qc6" targetRef="Task_1asxw87">
      <bpmn:conditionExpression xsi:type="bpmn:tFormalExpression">${approved == false}</bpmn:conditionExpression>
    </bpmn:sequenceFlow>
    <bpmn:sequenceFlow id="SequenceFlow_0upfncf" sourceRef="Task_1asxw87" targetRef="EndEvent_13bsqqd" />
    <bpmn:sequenceFlow id="SequenceFlow_1nn2llw" sourceRef="Task_0snvh02" targetRef="EndEvent_1ogwwp9" />
    <bpmn:serviceTask id="Task_1ylvdew" name="Process Content" implementation="processTextConnector">
      <bpmn:incoming>SequenceFlow_09xowo4</bpmn:incoming>
      <bpmn:outgoing>SequenceFlow_1jzbgkj</bpmn:outgoing>
    </bpmn:serviceTask>
    <bpmn:serviceTask id="Task_0snvh02" name="Tag categorized Content" implementation="tagTextConnector">
      <bpmn:incoming>SequenceFlow_0tsc63v</bpmn:incoming>
      <bpmn:outgoing>SequenceFlow_1nn2llw</bpmn:outgoing>
    </bpmn:serviceTask>
    <bpmn:serviceTask id="Task_1asxw87" name="Discard and Notify user" implementation="discardTextConnector">
      <bpmn:incoming>SequenceFlow_049fuit</bpmn:incoming>
      <bpmn:outgoing>SequenceFlow_0upfncf</bpmn:outgoing>
    </bpmn:serviceTask>
    <bpmn:endEvent id="EndEvent_13bsqqd">
      <bpmn:incoming>SequenceFlow_0upfncf</bpmn:incoming>
    </bpmn:endEvent>
    <bpmn:endEvent id="EndEvent_1ogwwp9">
      <bpmn:incoming>SequenceFlow_1nn2llw</bpmn:incoming>
    </bpmn:endEvent>
  </bpmn:process>
  <bpmndi:BPMNDiagram id="BPMNDiagram_1">
    <bpmndi:BPMNPlane id="BPMNPlane_1" bpmnElement="categorizeProcess">
      <bpmndi:BPMNShape id="_BPMNShape_StartEvent_2" bpmnElement="StartEvent_1">
        <dc:Bounds x="173" y="102" width="36" height="36" />
      </bpmndi:BPMNShape>
      <bpmndi:BPMNEdge id="SequenceFlow_09xowo4_di" bpmnElement="SequenceFlow_09xowo4">
        <di:waypoint x="209" y="120" />
        <di:waypoint x="259" y="120" />
      </bpmndi:BPMNEdge>
      <bpmndi:BPMNShape id="ExclusiveGateway_0c36qc6_di" bpmnElement="ExclusiveGateway_0c36qc6" isMarkerVisible="true">
        <dc:Bounds x="409" y="95" width="50" height="50" />
        <bpmndi:BPMNLabel>
          <dc:Bounds x="409" y="65" width="52" height="27" />
        </bpmndi:BPMNLabel>
      </bpmndi:BPMNShape>
      <bpmndi:BPMNEdge id="SequenceFlow_1jzbgkj_di" bpmnElement="SequenceFlow_1jzbgkj">
        <di:waypoint x="359" y="120" />
        <di:waypoint x="409" y="120" />
      </bpmndi:BPMNEdge>
      <bpmndi:BPMNEdge id="SequenceFlow_0tsc63v_di" bpmnElement="SequenceFlow_0tsc63v">
        <di:waypoint x="459" y="120" />
        <di:waypoint x="509" y="120" />
        <bpmndi:BPMNLabel>
          <dc:Bounds x="475" y="102" width="18" height="14" />
        </bpmndi:BPMNLabel>
      </bpmndi:BPMNEdge>
      <bpmndi:BPMNEdge id="SequenceFlow_049fuit_di" bpmnElement="SequenceFlow_049fuit">
        <di:waypoint x="434" y="145" />
        <di:waypoint x="434" y="230" />
        <di:waypoint x="509" y="230" />
        <bpmndi:BPMNLabel>
          <dc:Bounds x="443" y="185" width="13" height="14" />
        </bpmndi:BPMNLabel>
      </bpmndi:BPMNEdge>
      <bpmndi:BPMNEdge id="SequenceFlow_0upfncf_di" bpmnElement="SequenceFlow_0upfncf">
        <di:waypoint x="609" y="230" />
        <di:waypoint x="659" y="230" />
      </bpmndi:BPMNEdge>
      <bpmndi:BPMNEdge id="SequenceFlow_1nn2llw_di" bpmnElement="SequenceFlow_1nn2llw">
        <di:waypoint x="609" y="120" />
        <di:waypoint x="659" y="120" />
      </bpmndi:BPMNEdge>
      <bpmndi:BPMNShape id="ServiceTask_1vlvxl9_di" bpmnElement="Task_1ylvdew">
        <dc:Bounds x="259" y="80" width="100" height="80" />
      </bpmndi:BPMNShape>
      <bpmndi:BPMNShape id="ServiceTask_0z16f74_di" bpmnElement="Task_0snvh02">
        <dc:Bounds x="509" y="80" width="100" height="80" />
      </bpmndi:BPMNShape>
      <bpmndi:BPMNShape id="ServiceTask_14mct68_di" bpmnElement="Task_1asxw87">
        <dc:Bounds x="509" y="190" width="100" height="80" />
      </bpmndi:BPMNShape>
      <bpmndi:BPMNShape id="EndEvent_1azfkz7_di" bpmnElement="EndEvent_13bsqqd">
        <dc:Bounds x="659" y="212" width="36" height="36" />
      </bpmndi:BPMNShape>
      <bpmndi:BPMNShape id="EndEvent_18mdes3_di" bpmnElement="EndEvent_1ogwwp9">
        <dc:Bounds x="659" y="102" width="36" height="36" />
      </bpmndi:BPMNShape>
    </bpmndi:BPMNPlane>
  </bpmndi:BPMNDiagram>
</bpmn:definitions>

来让我们看着这个流程的定义

2021-05-04_201606

流程图如下所示

2021-05-04_202905

再来看下代码

@SpringBootApplication
@EnableScheduling
public class DemoApplication implements CommandLineRunner {

    private Logger logger = LoggerFactory.getLogger(DemoApplication.class);

    @Autowired
    private ProcessRuntime processRuntime;

    @Autowired
    private SecurityUtil securityUtil;

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);

    }

    @Override
    public void run(String... args) {
        securityUtil.logInAs("system");

        Page<ProcessDefinition> processDefinitionPage = processRuntime.processDefinitions(Pageable.of(0, 10));
        logger.info("> Available Process definitions: " + processDefinitionPage.getTotalItems());
        for (ProcessDefinition pd : processDefinitionPage.getContent()) {
            logger.info("\t > Process definition: " + pd);
        }

    }

    @Scheduled(initialDelay = 1000, fixedDelay = 1000)
    public void processText() {

        securityUtil.logInAs("system");

        String content = pickRandomString();

        SimpleDateFormat formatter = new SimpleDateFormat("dd-MM-yy HH:mm:ss");

        logger.info("> Processing content: " + content + " at " + formatter.format(new Date()));

        ProcessInstance processInstance = processRuntime.start(ProcessPayloadBuilder
                .start()
                .withProcessDefinitionKey("categorizeProcess")
                .withName("Processing Content: " + content)
                .withVariable("content", content)
                .build());
        logger.info(">>> Created Process Instance: " + processInstance);


    }

    @Bean
    public Connector processTextConnector() {
        return integrationContext -> {
            Map<String, Object> inBoundVariables = integrationContext.getInBoundVariables();
            String contentToProcess = (String) inBoundVariables.get("content");
            // Logic Here to decide if content is approved or not
            if (contentToProcess.contains("activiti")) {
                logger.info("> Approving content: " + contentToProcess);
                integrationContext.addOutBoundVariable("approved",
                        true);
            } else {
                logger.info("> Discarding content: " + contentToProcess);
                integrationContext.addOutBoundVariable("approved",
                        false);
            }
            return integrationContext;
        };
    }

    @Bean
    public Connector tagTextConnector() {
        return integrationContext -> {
            String contentToTag = (String) integrationContext.getInBoundVariables().get("content");
            contentToTag += " :) ";
            integrationContext.addOutBoundVariable("content",
                    contentToTag);
            logger.info("Final Content: " + contentToTag);
            return integrationContext;
        };
    }

    @Bean
    public Connector discardTextConnector() {
        return integrationContext -> {
            String contentToDiscard = (String) integrationContext.getInBoundVariables().get("content");
            contentToDiscard += " :( ";
            integrationContext.addOutBoundVariable("content",
                    contentToDiscard);
            logger.info("Final Content: " + contentToDiscard);
            return integrationContext;
        };
    }

    @Bean
    public ProcessRuntimeEventListener<ProcessCompletedEvent> processCompletedListener() {
        return processCompleted -> logger.info(">>> Process Completed: '"
                + processCompleted.getEntity().getName() +
                "' We can send a notification to the initiator: " + processCompleted.getEntity().getInitiator());
    }

    private String pickRandomString() {
        String[] texts = {"hello from london", "Hi there from activiti!", "all good news over here.", "I've tweeted about activiti today.",
                "other boring projects.", "activiti cloud - Cloud Native Java BPM"};
        return texts[new Random().nextInt(texts.length)];
    }

}

2021-05-04_203318

2021-05-04_203901

   @Bean
    public ProcessRuntimeEventListener<ProcessCompletedEvent> processCompletedListener() {
        return processCompleted -> logger.info(">>> Process Completed: '"
                + processCompleted.getEntity().getName() +
                "' We can send a notification to the initiator: " + processCompleted.getEntity().getInitiator());
    }

这个是增加一个流程执行完毕的监听。当一个流程执行完毕则会回调上面的lambda表达式(就是->后面的)

Q.E.D.


Very lazy chimp