ProcessRuntime
自动注入ProcessRuntime
@Autowired
private ProcessRuntime processRuntime;
来让们看看这个ProcessRuntime 是做什么の
public interface ProcessRuntime {
ProcessRuntimeConfiguration configuration();
ProcessDefinition processDefinition(String processDefinitionId);
Page processDefinitions(Pageable pageable);
Page processDefinitions(Pageable pageable,
GetProcessDefinitionsPayload payload);
ProcessInstance start(StartProcessPayload payload);
Page processInstances(Pageable pageable);
Page processInstances(Pageable pageable,
GetProcessInstancesPayload payload);
ProcessInstance processInstance(String processInstanceId);
ProcessInstance suspend(SuspendProcessPayload payload);
ProcessInstance resume(ResumeProcessPayload payload);
ProcessInstance delete(DeleteProcessPayload payload);
void signal(SignalPayload payload);
...
}
和TaskRuntime接口类似
processDefinition(流程定义)
Page processDefinitionPage = processRuntime
.processDefinitions(Pageable.of(0, 10));
logger.info("> Available Process definitions: " +
processDefinitionPage.getTotalItems());
for (ProcessDefinition pd : processDefinitionPage.getContent()) {
logger.info("\t > Process definition: " + pd);
}
Process Definitions需要放在/src/main/resources/processes/中
我们正在使用Spring Scheduling功能来每秒钟启动一个进程,从数组中提取随机值到进程:
@Scheduled(initialDelay = 1000, fixedDelay = 1000)
public void processText() {
securityUtil.logInAs("system");
String content = pickRandomString();
SimpleDateFormat formatter = new SimpleDateFormat("dd-MM-yy HH:mm:ss");
logger.info("> Processing content: " + content
+ " at " + formatter.format(new Date()));
ProcessInstance processInstance = processRuntime
.start(ProcessPayloadBuilder
.start()
.withProcessDefinitionKey("categorizeProcess")
.withProcessInstanceName("Processing Content: " + content)
.withVariable("content", content)
.build());
logger.info(">>> Created Process Instance: " + processInstance);
}
与前面一样,我们使用ProcessPayloadBuilder以流畅的方式参数化希望启动哪个进程以及使用哪个进程变量。
现在,如果我们回顾一下流程定义,您将发现3个服务任务。为了提供这些服务任务的实现,你需要定义Connectors:
@Bean
public Connector processTextConnector() {
return integrationContext -> {
Map inBoundVariables = integrationContext.getInBoundVariables();
String contentToProcess = (String) inBoundVariables.get("content")
// Logic Here to decide if content is approved or not
if (contentToProcess.contains("activiti")) {
logger.info("> Approving content: " + contentToProcess);
integrationContext.addOutBoundVariable("approved",true);
} else {
logger.info("> Discarding content: " + contentToProcess);
integrationContext.addOutBoundVariable("approved",false);
}
return integrationContext;
};
}
这些连接器使用Bean名称自动连接到ProcessRuntime,在本例中为“processTextConnector”。这个bean名是从我们的流程定义中的serviceTask元素的实现属性中获取的
<bpmn:serviceTask id="Task_1ylvdew" name="Process Content" implementation="processTextConnector">
这个新的连接器接口是JavaDelegates的自然演变,新版本的Activiti Core将尝试通过将你的javadagates包装在连接器实现中来重用它们
public interface Connector extends Function<IntegrationContext, IntegrationContext> {
}
函数式接口
连接器接收带有流程变量的IntegrationContext,并返回带有需要映射回流程变量的结果的修改的IntegrationContext。
在前面的示例中,连接器实现接收一个“内容”变量,并根据内容处理逻辑添加一个“批准的”变量。
在这些连接器中,您可能包括系统到系统的调用,例如REST调用和基于消息的交互。这些交互会变得越来越复杂,因此我们将在未来的教程中看到这些连接器是如何做到的
例子
@SpringBootApplication
@EnableScheduling
public class DemoApplication implements CommandLineRunner {
private Logger logger = LoggerFactory.getLogger(DemoApplication.class);
@Autowired
private ProcessRuntime processRuntime;
@Autowired
private SecurityUtil securityUtil;
public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}
@Override
public void run(String... args) {
securityUtil.logInAs("system");
Page<ProcessDefinition> processDefinitionPage = processRuntime.processDefinitions(Pageable.of(0, 10));
logger.info("> Available Process definitions: " + processDefinitionPage.getTotalItems());
for (ProcessDefinition pd : processDefinitionPage.getContent()) {
logger.info("\t > Process definition: " + pd);
}
}
@Scheduled(initialDelay = 1000, fixedDelay = 1000)
public void processText() {
securityUtil.logInAs("system");
String content = pickRandomString();
SimpleDateFormat formatter = new SimpleDateFormat("dd-MM-yy HH:mm:ss");
logger.info("> Processing content: " + content + " at " + formatter.format(new Date()));
ProcessInstance processInstance = processRuntime.start(ProcessPayloadBuilder
.start()
.withProcessDefinitionKey("categorizeProcess")
.withName("Processing Content: " + content)
.withVariable("content", content)
.build());
logger.info(">>> Created Process Instance: " + processInstance);
}
@Bean
public Connector processTextConnector() {
return integrationContext -> {
Map<String, Object> inBoundVariables = integrationContext.getInBoundVariables();
String contentToProcess = (String) inBoundVariables.get("content");
// Logic Here to decide if content is approved or not
if (contentToProcess.contains("activiti")) {
logger.info("> Approving content: " + contentToProcess);
integrationContext.addOutBoundVariable("approved",
true);
} else {
logger.info("> Discarding content: " + contentToProcess);
integrationContext.addOutBoundVariable("approved",
false);
}
return integrationContext;
};
}
@Bean
public Connector tagTextConnector() {
return integrationContext -> {
String contentToTag = (String) integrationContext.getInBoundVariables().get("content");
contentToTag += " :) ";
integrationContext.addOutBoundVariable("content",
contentToTag);
logger.info("Final Content: " + contentToTag);
return integrationContext;
};
}
@Bean
public Connector discardTextConnector() {
return integrationContext -> {
String contentToDiscard = (String) integrationContext.getInBoundVariables().get("content");
contentToDiscard += " :( ";
integrationContext.addOutBoundVariable("content",
contentToDiscard);
logger.info("Final Content: " + contentToDiscard);
return integrationContext;
};
}
@Bean
public ProcessRuntimeEventListener<ProcessCompletedEvent> processCompletedListener() {
return processCompleted -> logger.info(">>> Process Completed: '"
+ processCompleted.getEntity().getName() +
"' We can send a notification to the initiator: " + processCompleted.getEntity().getInitiator());
}
private String pickRandomString() {
String[] texts = {"hello from london", "Hi there from activiti!", "all good news over here.", "I've tweeted about activiti today.",
"other boring projects.", "activiti cloud - Cloud Native Java BPM"};
return texts[new Random().nextInt(texts.length)];
}
}
categorize-content.bpmn20.xml(processDefinition(流程定义))
<?xml version="1.0" encoding="UTF-8"?>
<bpmn:definitions xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:bpmn="http://www.omg.org/spec/BPMN/20100524/MODEL" xmlns:bpmndi="http://www.omg.org/spec/BPMN/20100524/DI" xmlns:dc="http://www.omg.org/spec/DD/20100524/DC" xmlns:di="http://www.omg.org/spec/DD/20100524/DI" id="Definitions_0v7t65f" targetNamespace="http://bpmn.io/schema/bpmn">
<bpmn:process id="categorizeProcess" name="categorizeProcess" isExecutable="true">
<bpmn:startEvent id="StartEvent_1">
<bpmn:outgoing>SequenceFlow_09xowo4</bpmn:outgoing>
</bpmn:startEvent>
<bpmn:sequenceFlow id="SequenceFlow_09xowo4" sourceRef="StartEvent_1" targetRef="Task_1ylvdew" />
<bpmn:exclusiveGateway id="ExclusiveGateway_0c36qc6" name="Content Accepted?">
<bpmn:incoming>SequenceFlow_1jzbgkj</bpmn:incoming>
<bpmn:outgoing>SequenceFlow_0tsc63v</bpmn:outgoing>
<bpmn:outgoing>SequenceFlow_049fuit</bpmn:outgoing>
</bpmn:exclusiveGateway>
<bpmn:sequenceFlow id="SequenceFlow_1jzbgkj" sourceRef="Task_1ylvdew" targetRef="ExclusiveGateway_0c36qc6" />
<bpmn:sequenceFlow id="SequenceFlow_0tsc63v" name="yes" sourceRef="ExclusiveGateway_0c36qc6" targetRef="Task_0snvh02">
<bpmn:conditionExpression xsi:type="bpmn:tFormalExpression">${approved == true}</bpmn:conditionExpression>
</bpmn:sequenceFlow>
<bpmn:sequenceFlow id="SequenceFlow_049fuit" name="no" sourceRef="ExclusiveGateway_0c36qc6" targetRef="Task_1asxw87">
<bpmn:conditionExpression xsi:type="bpmn:tFormalExpression">${approved == false}</bpmn:conditionExpression>
</bpmn:sequenceFlow>
<bpmn:sequenceFlow id="SequenceFlow_0upfncf" sourceRef="Task_1asxw87" targetRef="EndEvent_13bsqqd" />
<bpmn:sequenceFlow id="SequenceFlow_1nn2llw" sourceRef="Task_0snvh02" targetRef="EndEvent_1ogwwp9" />
<bpmn:serviceTask id="Task_1ylvdew" name="Process Content" implementation="processTextConnector">
<bpmn:incoming>SequenceFlow_09xowo4</bpmn:incoming>
<bpmn:outgoing>SequenceFlow_1jzbgkj</bpmn:outgoing>
</bpmn:serviceTask>
<bpmn:serviceTask id="Task_0snvh02" name="Tag categorized Content" implementation="tagTextConnector">
<bpmn:incoming>SequenceFlow_0tsc63v</bpmn:incoming>
<bpmn:outgoing>SequenceFlow_1nn2llw</bpmn:outgoing>
</bpmn:serviceTask>
<bpmn:serviceTask id="Task_1asxw87" name="Discard and Notify user" implementation="discardTextConnector">
<bpmn:incoming>SequenceFlow_049fuit</bpmn:incoming>
<bpmn:outgoing>SequenceFlow_0upfncf</bpmn:outgoing>
</bpmn:serviceTask>
<bpmn:endEvent id="EndEvent_13bsqqd">
<bpmn:incoming>SequenceFlow_0upfncf</bpmn:incoming>
</bpmn:endEvent>
<bpmn:endEvent id="EndEvent_1ogwwp9">
<bpmn:incoming>SequenceFlow_1nn2llw</bpmn:incoming>
</bpmn:endEvent>
</bpmn:process>
<bpmndi:BPMNDiagram id="BPMNDiagram_1">
<bpmndi:BPMNPlane id="BPMNPlane_1" bpmnElement="categorizeProcess">
<bpmndi:BPMNShape id="_BPMNShape_StartEvent_2" bpmnElement="StartEvent_1">
<dc:Bounds x="173" y="102" width="36" height="36" />
</bpmndi:BPMNShape>
<bpmndi:BPMNEdge id="SequenceFlow_09xowo4_di" bpmnElement="SequenceFlow_09xowo4">
<di:waypoint x="209" y="120" />
<di:waypoint x="259" y="120" />
</bpmndi:BPMNEdge>
<bpmndi:BPMNShape id="ExclusiveGateway_0c36qc6_di" bpmnElement="ExclusiveGateway_0c36qc6" isMarkerVisible="true">
<dc:Bounds x="409" y="95" width="50" height="50" />
<bpmndi:BPMNLabel>
<dc:Bounds x="409" y="65" width="52" height="27" />
</bpmndi:BPMNLabel>
</bpmndi:BPMNShape>
<bpmndi:BPMNEdge id="SequenceFlow_1jzbgkj_di" bpmnElement="SequenceFlow_1jzbgkj">
<di:waypoint x="359" y="120" />
<di:waypoint x="409" y="120" />
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge id="SequenceFlow_0tsc63v_di" bpmnElement="SequenceFlow_0tsc63v">
<di:waypoint x="459" y="120" />
<di:waypoint x="509" y="120" />
<bpmndi:BPMNLabel>
<dc:Bounds x="475" y="102" width="18" height="14" />
</bpmndi:BPMNLabel>
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge id="SequenceFlow_049fuit_di" bpmnElement="SequenceFlow_049fuit">
<di:waypoint x="434" y="145" />
<di:waypoint x="434" y="230" />
<di:waypoint x="509" y="230" />
<bpmndi:BPMNLabel>
<dc:Bounds x="443" y="185" width="13" height="14" />
</bpmndi:BPMNLabel>
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge id="SequenceFlow_0upfncf_di" bpmnElement="SequenceFlow_0upfncf">
<di:waypoint x="609" y="230" />
<di:waypoint x="659" y="230" />
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge id="SequenceFlow_1nn2llw_di" bpmnElement="SequenceFlow_1nn2llw">
<di:waypoint x="609" y="120" />
<di:waypoint x="659" y="120" />
</bpmndi:BPMNEdge>
<bpmndi:BPMNShape id="ServiceTask_1vlvxl9_di" bpmnElement="Task_1ylvdew">
<dc:Bounds x="259" y="80" width="100" height="80" />
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="ServiceTask_0z16f74_di" bpmnElement="Task_0snvh02">
<dc:Bounds x="509" y="80" width="100" height="80" />
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="ServiceTask_14mct68_di" bpmnElement="Task_1asxw87">
<dc:Bounds x="509" y="190" width="100" height="80" />
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="EndEvent_1azfkz7_di" bpmnElement="EndEvent_13bsqqd">
<dc:Bounds x="659" y="212" width="36" height="36" />
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="EndEvent_18mdes3_di" bpmnElement="EndEvent_1ogwwp9">
<dc:Bounds x="659" y="102" width="36" height="36" />
</bpmndi:BPMNShape>
</bpmndi:BPMNPlane>
</bpmndi:BPMNDiagram>
</bpmn:definitions>
来让我们看着这个流程的定义
流程图如下所示
再来看下代码
@SpringBootApplication
@EnableScheduling
public class DemoApplication implements CommandLineRunner {
private Logger logger = LoggerFactory.getLogger(DemoApplication.class);
@Autowired
private ProcessRuntime processRuntime;
@Autowired
private SecurityUtil securityUtil;
public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}
@Override
public void run(String... args) {
securityUtil.logInAs("system");
Page<ProcessDefinition> processDefinitionPage = processRuntime.processDefinitions(Pageable.of(0, 10));
logger.info("> Available Process definitions: " + processDefinitionPage.getTotalItems());
for (ProcessDefinition pd : processDefinitionPage.getContent()) {
logger.info("\t > Process definition: " + pd);
}
}
@Scheduled(initialDelay = 1000, fixedDelay = 1000)
public void processText() {
securityUtil.logInAs("system");
String content = pickRandomString();
SimpleDateFormat formatter = new SimpleDateFormat("dd-MM-yy HH:mm:ss");
logger.info("> Processing content: " + content + " at " + formatter.format(new Date()));
ProcessInstance processInstance = processRuntime.start(ProcessPayloadBuilder
.start()
.withProcessDefinitionKey("categorizeProcess")
.withName("Processing Content: " + content)
.withVariable("content", content)
.build());
logger.info(">>> Created Process Instance: " + processInstance);
}
@Bean
public Connector processTextConnector() {
return integrationContext -> {
Map<String, Object> inBoundVariables = integrationContext.getInBoundVariables();
String contentToProcess = (String) inBoundVariables.get("content");
// Logic Here to decide if content is approved or not
if (contentToProcess.contains("activiti")) {
logger.info("> Approving content: " + contentToProcess);
integrationContext.addOutBoundVariable("approved",
true);
} else {
logger.info("> Discarding content: " + contentToProcess);
integrationContext.addOutBoundVariable("approved",
false);
}
return integrationContext;
};
}
@Bean
public Connector tagTextConnector() {
return integrationContext -> {
String contentToTag = (String) integrationContext.getInBoundVariables().get("content");
contentToTag += " :) ";
integrationContext.addOutBoundVariable("content",
contentToTag);
logger.info("Final Content: " + contentToTag);
return integrationContext;
};
}
@Bean
public Connector discardTextConnector() {
return integrationContext -> {
String contentToDiscard = (String) integrationContext.getInBoundVariables().get("content");
contentToDiscard += " :( ";
integrationContext.addOutBoundVariable("content",
contentToDiscard);
logger.info("Final Content: " + contentToDiscard);
return integrationContext;
};
}
@Bean
public ProcessRuntimeEventListener<ProcessCompletedEvent> processCompletedListener() {
return processCompleted -> logger.info(">>> Process Completed: '"
+ processCompleted.getEntity().getName() +
"' We can send a notification to the initiator: " + processCompleted.getEntity().getInitiator());
}
private String pickRandomString() {
String[] texts = {"hello from london", "Hi there from activiti!", "all good news over here.", "I've tweeted about activiti today.",
"other boring projects.", "activiti cloud - Cloud Native Java BPM"};
return texts[new Random().nextInt(texts.length)];
}
}
@Bean
public ProcessRuntimeEventListener<ProcessCompletedEvent> processCompletedListener() {
return processCompleted -> logger.info(">>> Process Completed: '"
+ processCompleted.getEntity().getName() +
"' We can send a notification to the initiator: " + processCompleted.getEntity().getInitiator());
}
这个是增加一个流程执行完毕的监听。当一个流程执行完毕则会回调上面的lambda表达式(就是->后面的)
Q.E.D.