enhancement #I7XAIB WHEN 增加 must 语法,调整 WhenCondition 逻辑
This commit is contained in:
parent
178d22f210
commit
4e616f9da6
|
@ -15,8 +15,8 @@ import com.yomahub.liteflow.exception.ELParseException;
|
|||
import com.yomahub.liteflow.exception.FlowSystemException;
|
||||
import com.yomahub.liteflow.flow.FlowBus;
|
||||
import com.yomahub.liteflow.flow.element.Chain;
|
||||
import com.yomahub.liteflow.flow.element.Node;
|
||||
import com.yomahub.liteflow.flow.element.Condition;
|
||||
import com.yomahub.liteflow.flow.element.Node;
|
||||
import com.yomahub.liteflow.log.LFLog;
|
||||
import com.yomahub.liteflow.log.LFLoggerManager;
|
||||
|
||||
|
@ -72,6 +72,7 @@ public class LiteFlowChainELBuilder {
|
|||
EXPRESS_RUNNER.addFunctionAndClassMethod(ChainConstant.DEFAULT, Object.class, new DefaultOperator());
|
||||
EXPRESS_RUNNER.addFunctionAndClassMethod(ChainConstant.TAG, Object.class, new TagOperator());
|
||||
EXPRESS_RUNNER.addFunctionAndClassMethod(ChainConstant.ANY, Object.class, new AnyOperator());
|
||||
EXPRESS_RUNNER.addFunctionAndClassMethod(ChainConstant.MUST, Object.class, new MustOperator());
|
||||
EXPRESS_RUNNER.addFunctionAndClassMethod(ChainConstant.ID, Object.class, new IdOperator());
|
||||
EXPRESS_RUNNER.addFunctionAndClassMethod(ChainConstant.IGNORE_ERROR, Object.class, new IgnoreErrorOperator());
|
||||
EXPRESS_RUNNER.addFunctionAndClassMethod(ChainConstant.THREAD_POOL, Object.class, new ThreadPoolOperator());
|
||||
|
|
|
@ -2,6 +2,7 @@ package com.yomahub.liteflow.builder.el.operator;
|
|||
|
||||
import com.yomahub.liteflow.builder.el.operator.base.BaseOperator;
|
||||
import com.yomahub.liteflow.builder.el.operator.base.OperatorHelper;
|
||||
import com.yomahub.liteflow.enums.ParallelStrategyEnum;
|
||||
import com.yomahub.liteflow.flow.element.condition.WhenCondition;
|
||||
|
||||
/**
|
||||
|
@ -19,7 +20,7 @@ public class AnyOperator extends BaseOperator<WhenCondition> {
|
|||
WhenCondition whenCondition = OperatorHelper.convert(objects[0], WhenCondition.class);
|
||||
|
||||
Boolean any = OperatorHelper.convert(objects[1], Boolean.class);
|
||||
whenCondition.setAny(any);
|
||||
whenCondition.setParallelStrategy(any ? ParallelStrategyEnum.ANY : ParallelStrategyEnum.ALL);
|
||||
return whenCondition;
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,28 @@
|
|||
package com.yomahub.liteflow.builder.el.operator;
|
||||
|
||||
import com.yomahub.liteflow.builder.el.operator.base.BaseOperator;
|
||||
import com.yomahub.liteflow.builder.el.operator.base.OperatorHelper;
|
||||
import com.yomahub.liteflow.enums.ParallelStrategyEnum;
|
||||
import com.yomahub.liteflow.flow.element.condition.WhenCondition;
|
||||
|
||||
/**
|
||||
* EL 规则中的 must 的操作符
|
||||
*
|
||||
* @author luo yi
|
||||
* @since 2.11.0
|
||||
*/
|
||||
public class MustOperator extends BaseOperator<WhenCondition> {
|
||||
|
||||
@Override
|
||||
public WhenCondition build(Object[] objects) throws Exception {
|
||||
OperatorHelper.checkObjectSizeEqTwo(objects);
|
||||
|
||||
WhenCondition whenCondition = OperatorHelper.convert(objects[0], WhenCondition.class);
|
||||
|
||||
String specifyId = OperatorHelper.convert(objects[1], String.class);
|
||||
whenCondition.setSpecifyId(specifyId);
|
||||
whenCondition.setParallelStrategy(ParallelStrategyEnum.SPECIFY);
|
||||
return whenCondition;
|
||||
}
|
||||
|
||||
}
|
|
@ -30,6 +30,8 @@ public interface ChainConstant {
|
|||
|
||||
String ANY = "any";
|
||||
|
||||
String MUST = "must";
|
||||
|
||||
String TYPE = "type";
|
||||
|
||||
String THEN = "THEN";
|
||||
|
|
|
@ -0,0 +1,57 @@
|
|||
package com.yomahub.liteflow.enums;
|
||||
|
||||
import com.yomahub.liteflow.flow.parallel.strategy.AllOfParallelExecutor;
|
||||
import com.yomahub.liteflow.flow.parallel.strategy.AnyOfParallelExecutor;
|
||||
import com.yomahub.liteflow.flow.parallel.strategy.ParallelStrategyExecutor;
|
||||
import com.yomahub.liteflow.flow.parallel.strategy.SpecifyParallelExecutor;
|
||||
|
||||
/**
|
||||
* 并行策略枚举类
|
||||
*
|
||||
* @author luo yi
|
||||
* @since 2.11.0
|
||||
*/
|
||||
public enum ParallelStrategyEnum {
|
||||
|
||||
ANY("anyOf", "完成任一任务", AnyOfParallelExecutor.class),
|
||||
|
||||
ALL("allOf", "完成全部任务", AllOfParallelExecutor.class),
|
||||
|
||||
SPECIFY("must", "完成指定任务", SpecifyParallelExecutor.class);
|
||||
|
||||
private String strategyType;
|
||||
|
||||
private String description;
|
||||
|
||||
private Class<? extends ParallelStrategyExecutor> clazz;
|
||||
|
||||
ParallelStrategyEnum(String strategyType, String description, Class<? extends ParallelStrategyExecutor> clazz) {
|
||||
this.strategyType = strategyType;
|
||||
this.description = description;
|
||||
this.clazz = clazz;
|
||||
}
|
||||
|
||||
public String getStrategyType() {
|
||||
return strategyType;
|
||||
}
|
||||
|
||||
public void setStrategyType(String strategyType) {
|
||||
this.strategyType = strategyType;
|
||||
}
|
||||
|
||||
public String getDescription() {
|
||||
return description;
|
||||
}
|
||||
|
||||
public void setDescription(String description) {
|
||||
this.description = description;
|
||||
}
|
||||
|
||||
public Class<? extends ParallelStrategyExecutor> getClazz() {
|
||||
return clazz;
|
||||
}
|
||||
|
||||
public void setClazz(Class<? extends ParallelStrategyExecutor> clazz) {
|
||||
this.clazz = clazz;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,29 @@
|
|||
package com.yomahub.liteflow.exception;
|
||||
|
||||
/**
|
||||
* 并行策略执行器创建异常
|
||||
*
|
||||
* @author luo yi
|
||||
* @since 2.11.0
|
||||
*/
|
||||
public class ParallelExecutorCreateException extends RuntimeException {
|
||||
|
||||
private static final long serialVersionUID = 1L;
|
||||
|
||||
/** 异常信息 */
|
||||
private String message;
|
||||
|
||||
public ParallelExecutorCreateException(String message) {
|
||||
this.message = message;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getMessage() {
|
||||
return message;
|
||||
}
|
||||
|
||||
public void setMessage(String message) {
|
||||
this.message = message;
|
||||
}
|
||||
|
||||
}
|
|
@ -7,28 +7,16 @@
|
|||
*/
|
||||
package com.yomahub.liteflow.flow.element.condition;
|
||||
|
||||
import cn.hutool.core.util.ObjectUtil;
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import com.yomahub.liteflow.common.LocalDefaultFlowConstant;
|
||||
import com.yomahub.liteflow.enums.ConditionTypeEnum;
|
||||
import com.yomahub.liteflow.exception.WhenExecuteException;
|
||||
import com.yomahub.liteflow.enums.ParallelStrategyEnum;
|
||||
import com.yomahub.liteflow.flow.element.Condition;
|
||||
import com.yomahub.liteflow.flow.parallel.CompletableFutureTimeout;
|
||||
import com.yomahub.liteflow.flow.parallel.ParallelSupplier;
|
||||
import com.yomahub.liteflow.flow.parallel.WhenFutureObj;
|
||||
import com.yomahub.liteflow.flow.parallel.strategy.ParallelStrategyExecutor;
|
||||
import com.yomahub.liteflow.flow.parallel.strategy.ParallelStrategyHelper;
|
||||
import com.yomahub.liteflow.log.LFLog;
|
||||
import com.yomahub.liteflow.log.LFLoggerManager;
|
||||
import com.yomahub.liteflow.property.LiteflowConfig;
|
||||
import com.yomahub.liteflow.property.LiteflowConfigGetter;
|
||||
import com.yomahub.liteflow.slot.DataBus;
|
||||
import com.yomahub.liteflow.slot.Slot;
|
||||
import com.yomahub.liteflow.thread.ExecutorHelper;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* 并行器
|
||||
|
@ -46,8 +34,11 @@ public class WhenCondition extends Condition {
|
|||
// 此属性已弃用
|
||||
private String group = LocalDefaultFlowConstant.DEFAULT;
|
||||
|
||||
// 只在when类型下有效,为true的话说明在多个并行节点下,任意一个成功,整个when就成功
|
||||
private boolean any = false;
|
||||
// 当前 When 对应并行策略,默认为 ALL
|
||||
private ParallelStrategyEnum parallelStrategy;
|
||||
|
||||
// 只有 must 条件下,才会赋值 specifyId
|
||||
private String specifyId;
|
||||
|
||||
// when单独的线程池名称
|
||||
private String threadExecutorClass;
|
||||
|
@ -71,133 +62,10 @@ public class WhenCondition extends Condition {
|
|||
// 使用线程池执行when并发流程
|
||||
// 这块涉及到挺多的多线程逻辑,所以注释比较详细,看到这里的童鞋可以仔细阅读
|
||||
private void executeAsyncCondition(Integer slotIndex) throws Exception {
|
||||
Slot slot = DataBus.getSlot(slotIndex);
|
||||
|
||||
String currChainName = this.getCurrChainId();
|
||||
|
||||
// 此方法其实只会初始化一次Executor,不会每次都会初始化。Executor是唯一的
|
||||
ExecutorService parallelExecutor = ExecutorHelper.loadInstance()
|
||||
.buildWhenExecutor(this.getThreadExecutorClass());
|
||||
|
||||
// 获得liteflow的参数
|
||||
LiteflowConfig liteflowConfig = LiteflowConfigGetter.get();
|
||||
|
||||
// 定义是否中断参数
|
||||
// 这里为什么要定义成数组呢,因为后面lambda要用到,根据final不能修改引用的原则,这里用了数组对象
|
||||
final boolean[] interrupted = { false };
|
||||
|
||||
// 这里主要是做了封装CompletableFuture对象,用lumbda表达式做了很多事情,这句代码要仔细理清
|
||||
// 1.先进行过滤,前置和后置组件过滤掉,因为在EL Chain处理的时候已经提出来了
|
||||
// 2.过滤isAccess为false的情况,因为不过滤这个的话,如果加上了any,那么isAccess为false那就是最快的了
|
||||
// 3.根据condition.getNodeList()的集合进行流处理,用map进行把executable对象转换成List<CompletableFuture<WhenFutureObj>>
|
||||
// 4.在转的过程中,套入CompletableFutureTimeout方法进行超时判断,如果超时则用WhenFutureObj.timeOut返回超时的对象
|
||||
// 5.第2个参数是主要的本体CompletableFuture,传入了ParallelSupplier和线程池对象
|
||||
if (ObjectUtil.isNull(this.getMaxWaitTime())) {
|
||||
if (ObjectUtil.isNotNull(liteflowConfig.getWhenMaxWaitSeconds())) {
|
||||
// 获取全局异步线程最长等待秒数
|
||||
this.setMaxWaitTime(liteflowConfig.getWhenMaxWaitSeconds());
|
||||
this.setMaxWaitTimeUnit(TimeUnit.SECONDS);
|
||||
} else {
|
||||
// 获取全局异步线程最⻓的等待时间
|
||||
this.setMaxWaitTime(liteflowConfig.getWhenMaxWaitTime());
|
||||
}
|
||||
}
|
||||
|
||||
if (ObjectUtil.isNull(this.getMaxWaitTimeUnit())) {
|
||||
// 获取全局异步线程最⻓的等待时间单位
|
||||
this.setMaxWaitTimeUnit(liteflowConfig.getWhenMaxWaitTimeUnit());
|
||||
}
|
||||
|
||||
List<CompletableFuture<WhenFutureObj>> completableFutureList = this.getExecutableList()
|
||||
.stream()
|
||||
.filter(executable -> !(executable instanceof PreCondition) && !(executable instanceof FinallyCondition))
|
||||
.filter(executable -> {
|
||||
try {
|
||||
return executable.isAccess(slotIndex);
|
||||
} catch (Exception e) {
|
||||
LOG.error("there was an error when executing the when component isAccess", e);
|
||||
return false;
|
||||
}
|
||||
})
|
||||
.map(executable -> CompletableFutureTimeout.completeOnTimeout(
|
||||
WhenFutureObj.timeOut(executable.getId()),
|
||||
CompletableFuture.supplyAsync(new ParallelSupplier(executable, currChainName, slotIndex),
|
||||
parallelExecutor),
|
||||
this.getMaxWaitTime(), this.getMaxWaitTimeUnit()))
|
||||
.collect(Collectors.toList());
|
||||
|
||||
CompletableFuture<?> resultCompletableFuture;
|
||||
|
||||
// 这里判断执行方式
|
||||
// 如果any为false,说明这些异步任务全部执行好或者超时,才返回
|
||||
// 如果any为true,说明这些异步任务只要任意一个执行完成,就返回
|
||||
if (this.isAny()) {
|
||||
// 把这些CompletableFuture通过anyOf合成一个CompletableFuture
|
||||
resultCompletableFuture = CompletableFuture
|
||||
.anyOf(completableFutureList.toArray(new CompletableFuture[] {}));
|
||||
} else {
|
||||
// 把这些CompletableFuture通过allOf合成一个CompletableFuture
|
||||
resultCompletableFuture = CompletableFuture
|
||||
.allOf(completableFutureList.toArray(new CompletableFuture[] {}));
|
||||
}
|
||||
|
||||
try {
|
||||
// 进行执行,这句执行完后,就意味着所有的任务要么执行完毕,要么超时返回
|
||||
resultCompletableFuture.get();
|
||||
} catch (InterruptedException | ExecutionException e) {
|
||||
LOG.error("there was an error when executing the CompletableFuture", e);
|
||||
interrupted[0] = true;
|
||||
}
|
||||
|
||||
// 拿到已经完成的CompletableFuture
|
||||
// 如果any为false,那么所有任务都已经完成
|
||||
// 如果any为true,那么这里拿到的是第一个完成的任务
|
||||
// 这里过滤和转换一起用lumbda做了
|
||||
List<WhenFutureObj> allCompletableWhenFutureObjList = completableFutureList.stream().filter(f -> {
|
||||
// 过滤出已经完成的,没完成的就直接终止
|
||||
if (f.isDone()) {
|
||||
return true;
|
||||
} else {
|
||||
f.cancel(true);
|
||||
return false;
|
||||
}
|
||||
}).map(f -> {
|
||||
try {
|
||||
return f.get();
|
||||
} catch (InterruptedException | ExecutionException e) {
|
||||
interrupted[0] = true;
|
||||
return null;
|
||||
}
|
||||
}).collect(Collectors.toList());
|
||||
|
||||
// 判断超时,上面已经拿到了所有已经完成的CompletableFuture
|
||||
// 那我们只要过滤出超时的CompletableFuture
|
||||
List<WhenFutureObj> timeOutWhenFutureObjList = allCompletableWhenFutureObjList.stream()
|
||||
.filter(WhenFutureObj::isTimeout)
|
||||
.collect(Collectors.toList());
|
||||
|
||||
// 输出超时信息
|
||||
timeOutWhenFutureObjList.forEach(whenFutureObj -> LOG.warn(
|
||||
"executing thread has reached max-wait-seconds, thread canceled.Execute-item: [{}]", whenFutureObj.getExecutorName()));
|
||||
|
||||
// 当配置了ignoreError = false,出现interrupted或者!f.get()的情况,将抛出WhenExecuteException
|
||||
if (!this.isIgnoreError()) {
|
||||
if (interrupted[0]) {
|
||||
throw new WhenExecuteException(StrUtil
|
||||
.format("requestId [{}] when execute interrupted. errorResume [false].", slot.getRequestId()));
|
||||
}
|
||||
|
||||
// 循环判断CompletableFuture的返回值,如果异步执行失败,则抛出相应的业务异常
|
||||
for (WhenFutureObj whenFutureObj : allCompletableWhenFutureObjList) {
|
||||
if (!whenFutureObj.isSuccess()) {
|
||||
LOG.info(StrUtil.format("when-executor[{}] execute failed. errorResume [false].", whenFutureObj.getExecutorName()));
|
||||
throw whenFutureObj.getEx();
|
||||
}
|
||||
}
|
||||
} else if (interrupted[0]) {
|
||||
// 这里由于配置了ignoreError,所以只打印warn日志
|
||||
LOG.warn("executing when condition timeout , but ignore with errorResume.");
|
||||
}
|
||||
// 获取并发执行策略
|
||||
ParallelStrategyExecutor parallelStrategyExecutor = ParallelStrategyHelper.loadInstance().buildParallelExecutor(this.getParallelStrategy());
|
||||
// 执行逻辑
|
||||
parallelStrategyExecutor.execute(this, slotIndex);
|
||||
}
|
||||
|
||||
public boolean isIgnoreError() {
|
||||
|
@ -216,12 +84,20 @@ public class WhenCondition extends Condition {
|
|||
this.group = group;
|
||||
}
|
||||
|
||||
public boolean isAny() {
|
||||
return any;
|
||||
public ParallelStrategyEnum getParallelStrategy() {
|
||||
return parallelStrategy;
|
||||
}
|
||||
|
||||
public void setAny(boolean any) {
|
||||
this.any = any;
|
||||
public void setParallelStrategy(ParallelStrategyEnum parallelStrategy) {
|
||||
this.parallelStrategy = parallelStrategy;
|
||||
}
|
||||
|
||||
public String getSpecifyId() {
|
||||
return specifyId;
|
||||
}
|
||||
|
||||
public void setSpecifyId(String specifyId) {
|
||||
this.specifyId = specifyId;
|
||||
}
|
||||
|
||||
public String getThreadExecutorClass() {
|
||||
|
|
|
@ -0,0 +1,31 @@
|
|||
package com.yomahub.liteflow.flow.parallel.strategy;
|
||||
|
||||
import com.yomahub.liteflow.flow.element.condition.WhenCondition;
|
||||
import com.yomahub.liteflow.flow.parallel.WhenFutureObj;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
|
||||
/**
|
||||
* 完成全部任务
|
||||
*
|
||||
* @author luo yi
|
||||
* @since 2.11.0
|
||||
*/
|
||||
public class AllOfParallelExecutor extends ParallelStrategyExecutor {
|
||||
|
||||
@Override
|
||||
public void execute(WhenCondition whenCondition, Integer slotIndex) throws Exception {
|
||||
|
||||
// 获取所有 CompletableFuture
|
||||
List<CompletableFuture<WhenFutureObj>> completableFutureList = this.getCompletableFutureList(whenCondition, slotIndex);
|
||||
|
||||
// 把这些 CompletableFuture 通过 anyOf 合成一个 CompletableFuture
|
||||
CompletableFuture<?> resultCompletableFuture = CompletableFuture.allOf(completableFutureList.toArray(new CompletableFuture[] {}));
|
||||
|
||||
// 结果处理
|
||||
this.handleResult(whenCondition, slotIndex, completableFutureList, resultCompletableFuture);
|
||||
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,31 @@
|
|||
package com.yomahub.liteflow.flow.parallel.strategy;
|
||||
|
||||
import com.yomahub.liteflow.flow.element.condition.WhenCondition;
|
||||
import com.yomahub.liteflow.flow.parallel.WhenFutureObj;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
|
||||
/**
|
||||
* 完成任一任务
|
||||
*
|
||||
* @author luo yi
|
||||
* @since 2.11.0
|
||||
*/
|
||||
public class AnyOfParallelExecutor extends ParallelStrategyExecutor {
|
||||
|
||||
@Override
|
||||
public void execute(WhenCondition whenCondition, Integer slotIndex) throws Exception {
|
||||
|
||||
// 获取所有 CompletableFuture
|
||||
List<CompletableFuture<WhenFutureObj>> completableFutureList = this.getCompletableFutureList(whenCondition, slotIndex);
|
||||
|
||||
// 把这些 CompletableFuture 通过 anyOf 合成一个 CompletableFuture
|
||||
CompletableFuture<?> resultCompletableFuture = CompletableFuture.anyOf(completableFutureList.toArray(new CompletableFuture[] {}));
|
||||
|
||||
// 结果处理
|
||||
this.handleResult(whenCondition, slotIndex, completableFutureList, resultCompletableFuture);
|
||||
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,194 @@
|
|||
package com.yomahub.liteflow.flow.parallel.strategy;
|
||||
|
||||
import cn.hutool.core.util.ObjectUtil;
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import com.yomahub.liteflow.exception.WhenExecuteException;
|
||||
import com.yomahub.liteflow.flow.element.Executable;
|
||||
import com.yomahub.liteflow.flow.element.condition.FinallyCondition;
|
||||
import com.yomahub.liteflow.flow.element.condition.PreCondition;
|
||||
import com.yomahub.liteflow.flow.element.condition.WhenCondition;
|
||||
import com.yomahub.liteflow.flow.parallel.CompletableFutureTimeout;
|
||||
import com.yomahub.liteflow.flow.parallel.ParallelSupplier;
|
||||
import com.yomahub.liteflow.flow.parallel.WhenFutureObj;
|
||||
import com.yomahub.liteflow.log.LFLog;
|
||||
import com.yomahub.liteflow.log.LFLoggerManager;
|
||||
import com.yomahub.liteflow.property.LiteflowConfig;
|
||||
import com.yomahub.liteflow.property.LiteflowConfigGetter;
|
||||
import com.yomahub.liteflow.slot.DataBus;
|
||||
import com.yomahub.liteflow.slot.Slot;
|
||||
import com.yomahub.liteflow.thread.ExecutorHelper;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* 并发策略执行器抽象类
|
||||
*
|
||||
* @author luo yi
|
||||
* @since 2.11.0
|
||||
*/
|
||||
public abstract class ParallelStrategyExecutor {
|
||||
|
||||
protected final LFLog LOG = LFLoggerManager.getLogger(this.getClass());
|
||||
|
||||
/**
|
||||
* 封装 CompletableFuture 对象
|
||||
* @param executable
|
||||
* @param parallelExecutor
|
||||
* @param whenCondition
|
||||
* @param currChainName
|
||||
* @param slotIndex
|
||||
* @return
|
||||
*/
|
||||
protected CompletableFuture<WhenFutureObj> wrappedFutureObj(Executable executable, ExecutorService parallelExecutor,
|
||||
WhenCondition whenCondition, String currChainName, Integer slotIndex) {
|
||||
return CompletableFutureTimeout.completeOnTimeout(
|
||||
WhenFutureObj.timeOut(executable.getId()),
|
||||
CompletableFuture.supplyAsync(new ParallelSupplier(executable, currChainName, slotIndex), parallelExecutor),
|
||||
whenCondition.getMaxWaitTime(),
|
||||
whenCondition.getMaxWaitTimeUnit());
|
||||
}
|
||||
|
||||
/**
|
||||
* 设置 WhenCondition 参数
|
||||
* @param whenCondition
|
||||
*/
|
||||
protected void setWhenConditionParams(WhenCondition whenCondition) {
|
||||
// 获得liteflow的参数
|
||||
LiteflowConfig liteflowConfig = LiteflowConfigGetter.get();
|
||||
if (ObjectUtil.isNull(whenCondition.getMaxWaitTime())) {
|
||||
if (ObjectUtil.isNotNull(liteflowConfig.getWhenMaxWaitSeconds())) {
|
||||
// 获取全局异步线程最长等待秒数
|
||||
whenCondition.setMaxWaitTime(liteflowConfig.getWhenMaxWaitSeconds());
|
||||
whenCondition.setMaxWaitTimeUnit(TimeUnit.SECONDS);
|
||||
} else {
|
||||
// 获取全局异步线程最⻓的等待时间
|
||||
whenCondition.setMaxWaitTime(liteflowConfig.getWhenMaxWaitTime());
|
||||
}
|
||||
}
|
||||
|
||||
if (ObjectUtil.isNull(whenCondition.getMaxWaitTimeUnit())) {
|
||||
// 获取全局异步线程最⻓的等待时间单位
|
||||
whenCondition.setMaxWaitTimeUnit(liteflowConfig.getWhenMaxWaitTimeUnit());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取所有任务
|
||||
* @param whenCondition
|
||||
* @param slotIndex
|
||||
* @return
|
||||
*/
|
||||
protected List<CompletableFuture<WhenFutureObj>> getCompletableFutureList(WhenCondition whenCondition, Integer slotIndex) {
|
||||
String currChainName = whenCondition.getCurrChainId();
|
||||
|
||||
// 此方法其实只会初始化一次Executor,不会每次都会初始化。Executor是唯一的
|
||||
ExecutorService parallelExecutor = ExecutorHelper.loadInstance().buildWhenExecutor(whenCondition.getThreadExecutorClass());
|
||||
|
||||
// 设置参数
|
||||
setWhenConditionParams(whenCondition);
|
||||
|
||||
// 这里主要是做了封装CompletableFuture对象,用lumbda表达式做了很多事情,这句代码要仔细理清
|
||||
// 1.先进行过滤,前置和后置组件过滤掉,因为在EL Chain处理的时候已经提出来了
|
||||
// 2.过滤isAccess为false的情况,因为不过滤这个的话,如果加上了any,那么isAccess为false那就是最快的了
|
||||
// 3.根据condition.getNodeList()的集合进行流处理,用map进行把executable对象转换成List<CompletableFuture<WhenFutureObj>>
|
||||
// 4.在转的过程中,套入CompletableFutureTimeout方法进行超时判断,如果超时则用WhenFutureObj.timeOut返回超时的对象
|
||||
// 5.第2个参数是主要的本体CompletableFuture,传入了ParallelSupplier和线程池对象
|
||||
List<CompletableFuture<WhenFutureObj>> completableFutureList = whenCondition.getExecutableList()
|
||||
.stream()
|
||||
.filter(executable -> !(executable instanceof PreCondition) && !(executable instanceof FinallyCondition))
|
||||
.filter(executable -> {
|
||||
try {
|
||||
return executable.isAccess(slotIndex);
|
||||
} catch (Exception e) {
|
||||
LOG.error("there was an error when executing the when component isAccess", e);
|
||||
return false;
|
||||
}
|
||||
})
|
||||
.map(executable -> wrappedFutureObj(executable, parallelExecutor, whenCondition, currChainName, slotIndex))
|
||||
.collect(Collectors.toList());
|
||||
|
||||
return completableFutureList;
|
||||
}
|
||||
|
||||
/**
|
||||
* 结果处理
|
||||
* @param whenCondition
|
||||
* @param slotIndex
|
||||
* @param completableFutureList
|
||||
* @param resultCompletableFuture
|
||||
* @throws Exception
|
||||
*/
|
||||
protected void handleResult(WhenCondition whenCondition, Integer slotIndex, List<CompletableFuture<WhenFutureObj>> completableFutureList, CompletableFuture<?> resultCompletableFuture) throws Exception {
|
||||
Slot slot = DataBus.getSlot(slotIndex);
|
||||
|
||||
// 定义是否中断参数
|
||||
// 这里为什么要定义成数组呢,因为后面lambda要用到,根据final不能修改引用的原则,这里用了数组对象
|
||||
final boolean[] interrupted = { false };
|
||||
|
||||
try {
|
||||
// 进行执行,这句执行完后,就意味着所有的任务要么执行完毕,要么超时返回
|
||||
resultCompletableFuture.get();
|
||||
} catch (InterruptedException | ExecutionException e) {
|
||||
LOG.error("there was an error when executing the CompletableFuture", e);
|
||||
interrupted[0] = true;
|
||||
}
|
||||
|
||||
// 拿到已经完成的CompletableFuture
|
||||
// 如果any为false,那么所有任务都已经完成
|
||||
// 如果any为true,那么这里拿到的是第一个完成的任务
|
||||
// 这里过滤和转换一起用lumbda做了
|
||||
List<WhenFutureObj> allCompletableWhenFutureObjList = completableFutureList.stream().filter(f -> {
|
||||
// 过滤出已经完成的,没完成的就直接终止
|
||||
if (f.isDone()) {
|
||||
return true;
|
||||
} else {
|
||||
f.cancel(true);
|
||||
return false;
|
||||
}
|
||||
}).map(f -> {
|
||||
try {
|
||||
return f.get();
|
||||
} catch (InterruptedException | ExecutionException e) {
|
||||
interrupted[0] = true;
|
||||
return null;
|
||||
}
|
||||
}).collect(Collectors.toList());
|
||||
|
||||
// 判断超时,上面已经拿到了所有已经完成的CompletableFuture
|
||||
// 那我们只要过滤出超时的CompletableFuture
|
||||
List<WhenFutureObj> timeOutWhenFutureObjList = allCompletableWhenFutureObjList.stream()
|
||||
.filter(WhenFutureObj::isTimeout)
|
||||
.collect(Collectors.toList());
|
||||
|
||||
// 输出超时信息
|
||||
timeOutWhenFutureObjList.forEach(whenFutureObj -> LOG.warn(
|
||||
"executing thread has reached max-wait-seconds, thread canceled.Execute-item: [{}]", whenFutureObj.getExecutorName()));
|
||||
|
||||
// 当配置了ignoreError = false,出现interrupted或者!f.get()的情况,将抛出WhenExecuteException
|
||||
if (!whenCondition.isIgnoreError()) {
|
||||
if (interrupted[0]) {
|
||||
throw new WhenExecuteException(StrUtil
|
||||
.format("requestId [{}] when execute interrupted. errorResume [false].", slot.getRequestId()));
|
||||
}
|
||||
|
||||
// 循环判断CompletableFuture的返回值,如果异步执行失败,则抛出相应的业务异常
|
||||
for (WhenFutureObj whenFutureObj : allCompletableWhenFutureObjList) {
|
||||
if (!whenFutureObj.isSuccess()) {
|
||||
LOG.info(StrUtil.format("when-executor[{}] execute failed. errorResume [false].", whenFutureObj.getExecutorName()));
|
||||
throw whenFutureObj.getEx();
|
||||
}
|
||||
}
|
||||
} else if (interrupted[0]) {
|
||||
// 这里由于配置了ignoreError,所以只打印warn日志
|
||||
LOG.warn("executing when condition timeout , but ignore with errorResume.");
|
||||
}
|
||||
}
|
||||
|
||||
public abstract void execute(WhenCondition whenCondition, Integer slotIndex) throws Exception;
|
||||
|
||||
}
|
|
@ -0,0 +1,79 @@
|
|||
package com.yomahub.liteflow.flow.parallel.strategy;
|
||||
|
||||
import cn.hutool.core.map.MapUtil;
|
||||
import cn.hutool.core.util.ObjUtil;
|
||||
import com.yomahub.liteflow.enums.ParallelStrategyEnum;
|
||||
import com.yomahub.liteflow.exception.ParallelExecutorCreateException;
|
||||
import com.yomahub.liteflow.log.LFLog;
|
||||
import com.yomahub.liteflow.log.LFLoggerManager;
|
||||
import com.yomahub.liteflow.spi.holder.ContextAwareHolder;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* WHEN 并发策略辅助
|
||||
*
|
||||
* @author luo yi
|
||||
* @since 2.11.0
|
||||
*/
|
||||
public class ParallelStrategyHelper {
|
||||
|
||||
private final LFLog LOG = LFLoggerManager.getLogger(ParallelStrategyHelper.class);
|
||||
|
||||
/**
|
||||
* 此处使用Map缓存线程池信息 key - 线程池构建者的Class全类名 value - 线程池对象
|
||||
*/
|
||||
private final Map<ParallelStrategyEnum, ParallelStrategyExecutor> strategyExecutorMap;
|
||||
|
||||
private ParallelStrategyHelper() {
|
||||
strategyExecutorMap = MapUtil.newConcurrentHashMap();
|
||||
}
|
||||
|
||||
/**
|
||||
* 使用静态内部类实现单例模式
|
||||
*/
|
||||
private static class Holder {
|
||||
|
||||
static final ParallelStrategyHelper INSTANCE = new ParallelStrategyHelper();
|
||||
|
||||
}
|
||||
|
||||
public static ParallelStrategyHelper loadInstance() {
|
||||
return ParallelStrategyHelper.Holder.INSTANCE;
|
||||
}
|
||||
|
||||
private ParallelStrategyExecutor getParallelStrategyExecutor(ParallelStrategyEnum parallelStrategyEnum) {
|
||||
try {
|
||||
ParallelStrategyExecutor strategyExecutor = strategyExecutorMap.get(parallelStrategyEnum);
|
||||
if (ObjUtil.isNotNull(strategyExecutor)) return strategyExecutor;
|
||||
|
||||
Class<ParallelStrategyExecutor> executorClass = (Class<ParallelStrategyExecutor>) Class.forName(parallelStrategyEnum.getClazz().getName());
|
||||
strategyExecutor = ContextAwareHolder.loadContextAware().registerBean(executorClass);
|
||||
strategyExecutorMap.put(parallelStrategyEnum, strategyExecutor);
|
||||
return strategyExecutor;
|
||||
} catch (Exception e) {
|
||||
LOG.error(e.getMessage());
|
||||
throw new ParallelExecutorCreateException(e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
public ParallelStrategyExecutor buildParallelExecutor(ParallelStrategyEnum parallelStrategyEnum) {
|
||||
if (ObjUtil.isNull(parallelStrategyEnum)) return buildParallelExecutor();
|
||||
return getParallelStrategyExecutor(parallelStrategyEnum);
|
||||
}
|
||||
|
||||
/**
|
||||
* 默认需完成所有任务
|
||||
* @return
|
||||
*/
|
||||
public ParallelStrategyExecutor buildParallelExecutor() {
|
||||
return buildParallelExecutor(ParallelStrategyEnum.ALL);
|
||||
}
|
||||
|
||||
public void clearStrategyExecutorMap() {
|
||||
if (MapUtil.isNotEmpty(strategyExecutorMap)) {
|
||||
strategyExecutorMap.clear();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,78 @@
|
|||
package com.yomahub.liteflow.flow.parallel.strategy;
|
||||
|
||||
import cn.hutool.core.collection.CollUtil;
|
||||
import cn.hutool.core.util.ObjUtil;
|
||||
import com.yomahub.liteflow.flow.element.Executable;
|
||||
import com.yomahub.liteflow.flow.element.condition.FinallyCondition;
|
||||
import com.yomahub.liteflow.flow.element.condition.PreCondition;
|
||||
import com.yomahub.liteflow.flow.element.condition.WhenCondition;
|
||||
import com.yomahub.liteflow.flow.parallel.WhenFutureObj;
|
||||
import com.yomahub.liteflow.thread.ExecutorHelper;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* 完成指定任务执行器,使用 ID 进行比较
|
||||
*
|
||||
* @author luo yi
|
||||
* @since 2.11.0
|
||||
*/
|
||||
public class SpecifyParallelExecutor extends ParallelStrategyExecutor {
|
||||
|
||||
@Override
|
||||
public void execute(WhenCondition whenCondition, Integer slotIndex) throws Exception {
|
||||
|
||||
String currChainName = whenCondition.getCurrChainId();
|
||||
|
||||
this.setWhenConditionParams(whenCondition);
|
||||
|
||||
// 此方法其实只会初始化一次Executor,不会每次都会初始化。Executor是唯一的
|
||||
ExecutorService parallelExecutor = ExecutorHelper.loadInstance().buildWhenExecutor(whenCondition.getThreadExecutorClass());
|
||||
|
||||
// 过滤指定 ID 的任务,且该任务只会有一个或者没有
|
||||
Map<Boolean, List<Executable>> specifyExecutableMap = whenCondition.getExecutableList()
|
||||
.stream()
|
||||
.filter(executable -> !(executable instanceof PreCondition) && !(executable instanceof FinallyCondition))
|
||||
.filter(executable -> {
|
||||
try {
|
||||
return executable.isAccess(slotIndex);
|
||||
} catch (Exception e) {
|
||||
LOG.error("there was an error when executing the when component isAccess", e);
|
||||
return false;
|
||||
}
|
||||
})
|
||||
.collect(Collectors.partitioningBy(executable -> whenCondition.getSpecifyId().equals(executable.getId())));
|
||||
|
||||
CompletableFuture<?> resultCompletableFuture = null;
|
||||
|
||||
// 处理非指定 task
|
||||
List<CompletableFuture<WhenFutureObj>> completableFutureList = specifyExecutableMap.get(Boolean.FALSE)
|
||||
.stream()
|
||||
.map(executable -> wrappedFutureObj(executable, parallelExecutor, whenCondition, currChainName, slotIndex))
|
||||
.collect(Collectors.toList());
|
||||
|
||||
if (specifyExecutableMap.containsKey(Boolean.TRUE) && CollUtil.isNotEmpty(specifyExecutableMap.get(Boolean.TRUE))) {
|
||||
// 存在 must 指定的 task
|
||||
CompletableFuture<WhenFutureObj> specifyCompletableFuture = wrappedFutureObj(specifyExecutableMap.get(Boolean.TRUE).get(0), parallelExecutor, whenCondition, currChainName, slotIndex);
|
||||
// 组合所有任务
|
||||
completableFutureList.add(specifyCompletableFuture);
|
||||
// 设置结果 future
|
||||
resultCompletableFuture = specifyCompletableFuture;
|
||||
}
|
||||
|
||||
if (ObjUtil.isNull(resultCompletableFuture)) {
|
||||
LOG.warn("The specified task[{}] was not found, waiting for all tasks to complete by default.", whenCondition.getSpecifyId());
|
||||
// 不存在指定任务,则所有任务都执行
|
||||
resultCompletableFuture = CompletableFuture.allOf(completableFutureList.toArray(new CompletableFuture[] {}));
|
||||
}
|
||||
|
||||
// 结果处理
|
||||
this.handleResult(whenCondition, slotIndex, completableFutureList, resultCompletableFuture);
|
||||
|
||||
}
|
||||
|
||||
}
|
|
@ -139,4 +139,40 @@ public class AsyncNodeELSpringbootTest extends BaseTest {
|
|||
Assertions.assertTrue(context.getData("check").toString().startsWith("habc"));
|
||||
}
|
||||
|
||||
// 测试 must 关键字
|
||||
@Test
|
||||
public void testAsyncFlow9() throws Exception {
|
||||
LiteflowResponse response = flowExecutor.execute2Resp("chain9", "it's a base request");
|
||||
DefaultContext context = response.getFirstContextBean();
|
||||
Assertions.assertTrue(response.isSuccess());
|
||||
Assertions.assertTrue(context.getData("check").toString().startsWith("habc"));
|
||||
}
|
||||
|
||||
// 测试 must 与 ignoreError 关键字,不忽略异常
|
||||
@Test
|
||||
public void testAsyncFlow10() throws Exception {
|
||||
LiteflowResponse response = flowExecutor.execute2Resp("chain10", "it's a base request");
|
||||
DefaultContext context = response.getFirstContextBean();
|
||||
Assertions.assertTrue(context.getData("check").toString().startsWith("kg"));
|
||||
Assertions.assertFalse(response.isSuccess());
|
||||
}
|
||||
|
||||
// 测试 must 与 ignoreError 关键字,忽略异常
|
||||
@Test
|
||||
public void testAsyncFlow11() throws Exception {
|
||||
LiteflowResponse response = flowExecutor.execute2Resp("chain11", "it's a base request");
|
||||
DefaultContext context = response.getFirstContextBean();
|
||||
Assertions.assertTrue(response.isSuccess());
|
||||
Assertions.assertTrue(context.getData("check").toString().startsWith("kgdabc"));
|
||||
}
|
||||
|
||||
// 测试 must 、 ignoreError 、 id 关键字
|
||||
@Test
|
||||
public void testAsyncFlow12() throws Exception {
|
||||
LiteflowResponse response = flowExecutor.execute2Resp("chain12", "it's a base request");
|
||||
DefaultContext context = response.getFirstContextBean();
|
||||
Assertions.assertTrue(response.isSuccess());
|
||||
Assertions.assertTrue(context.getData("check").toString().startsWith("akbc"));
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -0,0 +1,27 @@
|
|||
package com.yomahub.liteflow.test.asyncNode.cmp;
|
||||
|
||||
import com.yomahub.liteflow.core.NodeComponent;
|
||||
import com.yomahub.liteflow.slot.DefaultContext;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
@Component("k")
|
||||
public class KCmp extends NodeComponent {
|
||||
|
||||
@Override
|
||||
public void process() throws Exception {
|
||||
Thread.sleep(200);
|
||||
DefaultContext context = this.getFirstContextBean();
|
||||
synchronized (NodeComponent.class) {
|
||||
if (context.hasData("check")) {
|
||||
String str = context.getData("check");
|
||||
str += this.getNodeId();
|
||||
context.setData("check", str);
|
||||
}
|
||||
else {
|
||||
context.setData("check", this.getNodeId());
|
||||
}
|
||||
}
|
||||
System.out.println("Kcomp executed!");
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,15 @@
|
|||
package com.yomahub.liteflow.test.asyncNode.cmp;
|
||||
|
||||
import com.yomahub.liteflow.core.NodeComponent;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
@Component("l")
|
||||
public class LCmp extends NodeComponent {
|
||||
|
||||
@Override
|
||||
public void process() throws Exception {
|
||||
System.out.println("Lcomp executed! Throw exception");
|
||||
int i = 1/0;
|
||||
}
|
||||
|
||||
}
|
|
@ -46,4 +46,20 @@
|
|||
THEN(WHEN(d, g, h).any(true), THEN(a, b, c));
|
||||
</chain>
|
||||
|
||||
<chain name="chain9">
|
||||
THEN(WHEN(d, g, h).must("h"), THEN(a, b, c));
|
||||
</chain>
|
||||
|
||||
<chain name="chain10">
|
||||
THEN(WHEN(d, g, k, l).must("g").ignoreError(false), THEN(a, b, c));
|
||||
</chain>
|
||||
|
||||
<chain name="chain11">
|
||||
THEN(WHEN(d, g, k, l).ignoreError(true).must("d"), THEN(a, b, c));
|
||||
</chain>
|
||||
|
||||
<chain name="chain12">
|
||||
THEN(WHEN(d, g, l, a, THEN(k, b).id("z")).ignoreError(true).must("z"), c);
|
||||
</chain>
|
||||
|
||||
</flow>
|
Loading…
Reference in New Issue