feature #I7HJFX 为循环Condition添加异步任务执行逻辑

This commit is contained in:
zy 2023-07-02 13:12:26 +08:00
parent 90eca86ca4
commit 25a99a97d1
5 changed files with 339 additions and 125 deletions

View File

@ -6,10 +6,17 @@ import com.yomahub.liteflow.enums.ConditionTypeEnum;
import com.yomahub.liteflow.exception.NoForNodeException;
import com.yomahub.liteflow.flow.element.Executable;
import com.yomahub.liteflow.flow.element.Node;
import com.yomahub.liteflow.flow.parallel.LoopFutureObj;
import com.yomahub.liteflow.slot.DataBus;
import com.yomahub.liteflow.slot.Slot;
import com.yomahub.liteflow.thread.ExecutorHelper;
import com.yomahub.liteflow.util.LiteFlowProxyUtil;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
/**
* 循环次数Condition
*
@ -18,67 +25,100 @@ import com.yomahub.liteflow.util.LiteFlowProxyUtil;
*/
public class ForCondition extends LoopCondition {
@Override
public void executeCondition(Integer slotIndex) throws Exception {
Slot slot = DataBus.getSlot(slotIndex);
Node forNode = this.getForNode();
if (ObjectUtil.isNull(forNode)) {
String errorInfo = StrUtil.format("[{}]:no for-node found", slot.getRequestId());
throw new NoForNodeException(errorInfo);
}
@Override
public void executeCondition(Integer slotIndex) throws Exception {
Slot slot = DataBus.getSlot(slotIndex);
Node forNode = this.getForNode();
if (ObjectUtil.isNull(forNode)) {
String errorInfo = StrUtil.format("[{}]:no for-node found", slot.getRequestId());
throw new NoForNodeException(errorInfo);
}
// 先去判断isAccess方法如果isAccess方法都返回false整个FOR表达式不执行
if (!this.getForNode().isAccess(slotIndex)) {
return;
}
// 先去判断isAccess方法如果isAccess方法都返回false整个FOR表达式不执行
if (!this.getForNode().isAccess(slotIndex)) {
return;
}
// 执行forCount组件
forNode.setCurrChainId(this.getCurrChainId());
forNode.execute(slotIndex);
// 执行forCount组件
forNode.setCurrChainId(this.getCurrChainId());
forNode.execute(slotIndex);
// 获得循环次数
int forCount = forNode.getItemResultMetaValue(slotIndex);
// 获得循环次数
int forCount = forNode.getItemResultMetaValue(slotIndex);
// 获得要循环的可执行对象
Executable executableItem = this.getDoExecutor();
// 获得要循环的可执行对象
Executable executableItem = this.getDoExecutor();
// 获取Break节点
Executable breakItem = this.getBreakItem();
// 获取Break节点
Executable breakItem = this.getBreakItem();
try{
// 循环执行
for (int i = 0; i < forCount; i++) {
executableItem.setCurrChainId(this.getCurrChainId());
// 设置循环index
setLoopIndex(executableItem, i);
executableItem.execute(slotIndex);
// 如果break组件不为空则去执行
if (ObjectUtil.isNotNull(breakItem)) {
breakItem.setCurrChainId(this.getCurrChainId());
setLoopIndex(breakItem, i);
breakItem.execute(slotIndex);
boolean isBreak = breakItem.getItemResultMetaValue(slotIndex);
if (isBreak) {
break;
}
}
}
}finally {
removeLoopIndex(executableItem);
}
}
try {
if (!isParallel()) {
//串行循环执行
for (int i = 0; i < forCount; i++) {
executableItem.setCurrChainId(this.getCurrChainId());
// 设置循环index
setLoopIndex(executableItem, i);
executableItem.execute(slotIndex);
// 如果break组件不为空则去执行
if (ObjectUtil.isNotNull(breakItem)) {
breakItem.setCurrChainId(this.getCurrChainId());
setLoopIndex(breakItem, i);
breakItem.execute(slotIndex);
boolean isBreak = breakItem.getItemResultMetaValue(slotIndex);
if (isBreak) {
break;
}
}
}
}else{
//并行循环执行
//存储所有的并行执行子项的CompletableFuture
List<CompletableFuture<LoopFutureObj>> futureList = new ArrayList<>();
//获取并行循环的线程池
ExecutorService parallelExecutor = ExecutorHelper.loadInstance().buildLoopParallelExecutor();
for (int i = 0; i < forCount; i++){
//提交异步任务
CompletableFuture<LoopFutureObj> future =
CompletableFuture.supplyAsync(new LoopParallelSupplier(executableItem, this.getCurrChainId(), slotIndex, i), parallelExecutor);
futureList.add(future);
if (ObjectUtil.isNotNull(breakItem)) {
breakItem.setCurrChainId(this.getCurrChainId());
setLoopIndex(breakItem, i);
breakItem.execute(slotIndex);
boolean isBreak = breakItem.getItemResultMetaValue(slotIndex);
if (isBreak) {
break;
}
}
}
//等待所有的异步执行完毕
CompletableFuture<?> resultCompletableFuture = CompletableFuture.allOf(futureList.toArray(new CompletableFuture[]{}));
resultCompletableFuture.join();
//获取所有的执行结果,如果有失败的那么需要抛出异常
for (CompletableFuture<LoopFutureObj> future : futureList) {
LoopFutureObj loopFutureObj = future.get();
if (!loopFutureObj.isSuccess()) {
throw loopFutureObj.getEx();
}
}
}
} finally {
removeLoopIndex(executableItem);
}
}
@Override
public ConditionTypeEnum getConditionType() {
return ConditionTypeEnum.TYPE_FOR;
}
@Override
public ConditionTypeEnum getConditionType() {
return ConditionTypeEnum.TYPE_FOR;
}
public Node getForNode() {
return (Node) this.getExecutableOne(ConditionKey.FOR_KEY);
}
public Node getForNode() {
return (Node) this.getExecutableOne(ConditionKey.FOR_KEY);
}
public void setForNode(Node forNode) {
this.addExecutable(ConditionKey.FOR_KEY, forNode);
}
public void setForNode(Node forNode) {
this.addExecutable(ConditionKey.FOR_KEY, forNode);
}
}

View File

@ -6,83 +6,116 @@ import com.yomahub.liteflow.enums.ConditionTypeEnum;
import com.yomahub.liteflow.exception.NoIteratorNodeException;
import com.yomahub.liteflow.flow.element.Executable;
import com.yomahub.liteflow.flow.element.Node;
import com.yomahub.liteflow.flow.parallel.LoopFutureObj;
import com.yomahub.liteflow.slot.DataBus;
import com.yomahub.liteflow.slot.Slot;
import com.yomahub.liteflow.thread.ExecutorHelper;
import com.yomahub.liteflow.util.LiteFlowProxyUtil;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
public class IteratorCondition extends LoopCondition {
@Override
public void executeCondition(Integer slotIndex) throws Exception {
Slot slot = DataBus.getSlot(slotIndex);
Node iteratorNode = this.getIteratorNode();
@Override
public void executeCondition(Integer slotIndex) throws Exception {
Slot slot = DataBus.getSlot(slotIndex);
Node iteratorNode = this.getIteratorNode();
if (ObjectUtil.isNull(iteratorNode)) {
String errorInfo = StrUtil.format("[{}]:no iterator-node found", slot.getRequestId());
throw new NoIteratorNodeException(errorInfo);
}
if (ObjectUtil.isNull(iteratorNode)) {
String errorInfo = StrUtil.format("[{}]:no iterator-node found", slot.getRequestId());
throw new NoIteratorNodeException(errorInfo);
}
// 先去判断isAccess方法如果isAccess方法都返回false整个ITERATOR表达式不执行
if (!iteratorNode.isAccess(slotIndex)) {
return;
}
// 先去判断isAccess方法如果isAccess方法都返回false整个ITERATOR表达式不执行
if (!iteratorNode.isAccess(slotIndex)) {
return;
}
// 执行Iterator组件
iteratorNode.setCurrChainId(this.getCurrChainId());
iteratorNode.execute(slotIndex);
// 执行Iterator组件
iteratorNode.setCurrChainId(this.getCurrChainId());
iteratorNode.execute(slotIndex);
Iterator<?> it = iteratorNode.getItemResultMetaValue(slotIndex);
Iterator<?> it = iteratorNode.getItemResultMetaValue(slotIndex);
// 获得要循环的可执行对象
Executable executableItem = this.getDoExecutor();
// 获得要循环的可执行对象
Executable executableItem = this.getDoExecutor();
// 获取Break节点
Executable breakItem = this.getBreakItem();
// 获取Break节点
Executable breakItem = this.getBreakItem();
try{
int index = 0;
while (it.hasNext()) {
Object itObj = it.next();
try {
int index = 0;
if (!this.isParallel()) {
//原本的串行循环执行
while (it.hasNext()) {
Object itObj = it.next();
executableItem.setCurrChainId(this.getCurrChainId());
// 设置循环index
setLoopIndex(executableItem, index);
// 设置循环迭代器对象
setCurrLoopObject(executableItem, itObj);
// 执行可执行对象
executableItem.execute(slotIndex);
// 如果break组件不为空则去执行
if (ObjectUtil.isNotNull(breakItem)) {
breakItem.setCurrChainId(this.getCurrChainId());
setLoopIndex(breakItem, index);
setCurrLoopObject(breakItem, itObj);
breakItem.execute(slotIndex);
boolean isBreak = breakItem.getItemResultMetaValue(slotIndex);
if (isBreak) {
break;
}
}
index++;
}
}finally{
removeLoopIndex(executableItem);
removeCurrLoopObject(executableItem);
}
}
executableItem.setCurrChainId(this.getCurrChainId());
// 设置循环index
setLoopIndex(executableItem, index);
// 设置循环迭代器对象
setCurrLoopObject(executableItem, itObj);
// 执行可执行对象
executableItem.execute(slotIndex);
// 如果break组件不为空则去执行
if (ObjectUtil.isNotNull(breakItem)) {
breakItem.setCurrChainId(this.getCurrChainId());
setLoopIndex(breakItem, index);
setCurrLoopObject(breakItem, itObj);
breakItem.execute(slotIndex);
boolean isBreak = breakItem.getItemResultMetaValue(slotIndex);
if (isBreak) {
break;
}
}
index++;
}
} else {
//并行循环执行
//存储所有的并行执行子项的CompletableFuture
List<CompletableFuture<LoopFutureObj>> futureList = new ArrayList<>();
//获取并行循环的线程池
ExecutorService parallelExecutor = ExecutorHelper.loadInstance().buildLoopParallelExecutor();
while (it.hasNext()) {
Object itObj = it.next();
//提交异步任务
CompletableFuture<LoopFutureObj> future =
CompletableFuture.supplyAsync(new LoopParallelSupplier(executableItem, this.getCurrChainId(), slotIndex, index, itObj), parallelExecutor);
futureList.add(future);
//break判断
if (ObjectUtil.isNotNull(breakItem)) {
breakItem.setCurrChainId(this.getCurrChainId());
setLoopIndex(breakItem, index);
breakItem.execute(slotIndex);
boolean isBreak = breakItem.getItemResultMetaValue(slotIndex);
if (isBreak) {
break;
}
}
index++;
}
}
} finally {
removeLoopIndex(executableItem);
removeCurrLoopObject(executableItem);
}
}
@Override
public ConditionTypeEnum getConditionType() {
return ConditionTypeEnum.TYPE_ITERATOR;
}
@Override
public ConditionTypeEnum getConditionType() {
return ConditionTypeEnum.TYPE_ITERATOR;
}
public Node getIteratorNode() {
return (Node) this.getExecutableOne(ConditionKey.ITERATOR_KEY);
}
public Node getIteratorNode() {
return (Node) this.getExecutableOne(ConditionKey.ITERATOR_KEY);
}
public void setIteratorNode(Node iteratorNode) {
this.addExecutable(ConditionKey.ITERATOR_KEY, iteratorNode);
}
public void setIteratorNode(Node iteratorNode) {
this.addExecutable(ConditionKey.ITERATOR_KEY, iteratorNode);
}
}

View File

@ -4,6 +4,9 @@ import com.yomahub.liteflow.flow.element.Chain;
import com.yomahub.liteflow.flow.element.Condition;
import com.yomahub.liteflow.flow.element.Executable;
import com.yomahub.liteflow.flow.element.Node;
import com.yomahub.liteflow.flow.parallel.LoopFutureObj;
import java.util.function.Supplier;
/**
* 循环Condition的抽象类 主要继承对象有ForCondition和WhileCondition
@ -83,4 +86,47 @@ public abstract class LoopCondition extends Condition {
this.parallel = parallel;
}
// 循环并行执行的Supplier封装
public class LoopParallelSupplier implements Supplier<LoopFutureObj> {
private final Executable executableItem;
private final String currChainId;
private final Integer slotIndex;
private final Integer loopIndex;
private final Object itObj;
public LoopParallelSupplier(Executable executableItem, String currChainId, Integer slotIndex, Integer loopIndex) {
this.executableItem = executableItem;
this.currChainId = currChainId;
this.slotIndex = slotIndex;
this.loopIndex = loopIndex;
this.itObj = null;
}
public LoopParallelSupplier(Executable executableItem, String currChainId, Integer slotIndex, Integer loopIndex, Object itObj) {
this.executableItem = executableItem;
this.currChainId = currChainId;
this.slotIndex = slotIndex;
this.loopIndex = loopIndex;
this.itObj = itObj;
}
@Override
public LoopFutureObj get() {
try {
executableItem.setCurrChainId(this.currChainId);
// 设置循环index
setLoopIndex(executableItem, loopIndex);
//IteratorCondition的情况下需要设置当前循环对象
if(itObj != null){
setCurrLoopObject(executableItem, itObj);
}
executableItem.execute(slotIndex);
return LoopFutureObj.success(executableItem.getId());
} catch (Exception e) {
return LoopFutureObj.fail(executableItem.getId(), e);
}
}
}
}

View File

@ -4,6 +4,13 @@ import cn.hutool.core.util.ObjectUtil;
import com.yomahub.liteflow.enums.ConditionTypeEnum;
import com.yomahub.liteflow.flow.element.Executable;
import com.yomahub.liteflow.flow.element.Node;
import com.yomahub.liteflow.flow.parallel.LoopFutureObj;
import com.yomahub.liteflow.thread.ExecutorHelper;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
/**
* 循环条件Condition
@ -30,21 +37,55 @@ public class WhileCondition extends LoopCondition {
// 循环执行
int index = 0;
while (getWhileResult(slotIndex)) {
executableItem.setCurrChainId(this.getCurrChainId());
setLoopIndex(executableItem, index);
executableItem.execute(slotIndex);
// 如果break组件不为空则去执行
if (ObjectUtil.isNotNull(breakItem)) {
breakItem.setCurrChainId(this.getCurrChainId());
setLoopIndex(breakItem, index);
breakItem.execute(slotIndex);
boolean isBreak = breakItem.getItemResultMetaValue(slotIndex);
if (isBreak) {
break;
if(!this.isParallel()){
//串行循环
while (getWhileResult(slotIndex)) {
executableItem.setCurrChainId(this.getCurrChainId());
setLoopIndex(executableItem, index);
executableItem.execute(slotIndex);
// 如果break组件不为空则去执行
if (ObjectUtil.isNotNull(breakItem)) {
breakItem.setCurrChainId(this.getCurrChainId());
setLoopIndex(breakItem, index);
breakItem.execute(slotIndex);
boolean isBreak = breakItem.getItemResultMetaValue(slotIndex);
if (isBreak) {
break;
}
}
index++;
}
}else{
//并行循环逻辑
List<CompletableFuture<LoopFutureObj>> futureList = new ArrayList<>();
//获取并行循环的线程池
ExecutorService parallelExecutor = ExecutorHelper.loadInstance().buildLoopParallelExecutor();
while (getWhileResult(slotIndex)){
CompletableFuture<LoopFutureObj> future =
CompletableFuture.supplyAsync(new LoopParallelSupplier(executableItem, this.getCurrChainId(), slotIndex, index), parallelExecutor);
futureList.add(future);
//break判断
if (ObjectUtil.isNotNull(breakItem)) {
breakItem.setCurrChainId(this.getCurrChainId());
setLoopIndex(breakItem, index);
breakItem.execute(slotIndex);
boolean isBreak = breakItem.getItemResultMetaValue(slotIndex);
if (isBreak) {
break;
}
}
index++;
}
//等待所有的异步执行完毕
CompletableFuture<?> resultCompletableFuture = CompletableFuture.allOf(futureList.toArray(new CompletableFuture[]{}));
resultCompletableFuture.join();
//获取所有的执行结果,如果有失败的那么需要抛出异常
for (CompletableFuture<LoopFutureObj> future : futureList) {
LoopFutureObj loopFutureObj = future.get();
if (!loopFutureObj.isSuccess()) {
throw loopFutureObj.getEx();
}
}
index++;
}
}

View File

@ -0,0 +1,54 @@
package com.yomahub.liteflow.flow.parallel;
/**
* 并行循环各子项的执行结果对象
*
* @author zhhhhy
* @since 2.10.5
*/
public class LoopFutureObj {
private String executorName;
private boolean success;
private Exception ex;
public static LoopFutureObj success(String executorName) {
LoopFutureObj result = new LoopFutureObj();
result.setSuccess(true);
result.setExecutorName(executorName);
return result;
}
public static LoopFutureObj fail(String executorName, Exception ex) {
LoopFutureObj result = new LoopFutureObj();
result.setSuccess(false);
result.setExecutorName(executorName);
result.setEx(ex);
return result;
}
public Exception getEx() {
return ex;
}
public String getExecutorName() {
return executorName;
}
public boolean isSuccess() {
return success;
}
public void setEx(Exception ex) {
this.ex = ex;
}
public void setExecutorName(String executorName) {
this.executorName = executorName;
}
public void setSuccess(boolean success) {
this.success = success;
}
}