bug #I8YDGE 修复无法在 isAccess 方法中正确获取到 currChainId 问题

This commit is contained in:
luoyi 2024-02-27 15:03:23 +08:00
parent 778cd5787e
commit 174bf83062
8 changed files with 28 additions and 19 deletions

View File

@ -33,13 +33,15 @@ public class ForCondition extends LoopCondition {
throw new NoForNodeException(errorInfo);
}
// 提前设置 chainId避免无法在 isAccess 方法中获取到
forNode.setCurrChainId(this.getCurrChainId());
// 先去判断isAccess方法如果isAccess方法都返回false整个FOR表达式不执行
if (!this.getForNode().isAccess(slotIndex)) {
if (!forNode.isAccess(slotIndex)) {
return;
}
// 执行forCount组件
forNode.setCurrChainId(this.getCurrChainId());
forNode.execute(slotIndex);
// 获得循环次数

View File

@ -3,7 +3,8 @@ package com.yomahub.liteflow.flow.element.condition;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import com.yomahub.liteflow.enums.ConditionTypeEnum;
import com.yomahub.liteflow.exception.*;
import com.yomahub.liteflow.exception.IfTargetCannotBePreOrFinallyException;
import com.yomahub.liteflow.exception.NoIfTrueNodeException;
import com.yomahub.liteflow.flow.element.Condition;
import com.yomahub.liteflow.flow.element.Executable;
import com.yomahub.liteflow.slot.DataBus;
@ -21,13 +22,15 @@ public class IfCondition extends Condition {
public void executeCondition(Integer slotIndex) throws Exception {
Executable ifItem = this.getIfItem();
// 提前设置 chainId避免无法在 isAccess 方法中获取到
ifItem.setCurrChainId(this.getCurrChainId());
// 先去判断isAccess方法如果isAccess方法都返回false整个IF表达式不执行
if (!ifItem.isAccess(slotIndex)) {
return;
}
// 先执行IF节点
ifItem.setCurrChainId(this.getCurrChainId());
ifItem.execute(slotIndex);
// 拿到If执行过的结果

View File

@ -29,13 +29,15 @@ public class IteratorCondition extends LoopCondition {
throw new NoIteratorNodeException(errorInfo);
}
// 提前设置 chainId避免无法在 isAccess 方法中获取到
iteratorNode.setCurrChainId(this.getCurrChainId());
// 先去判断isAccess方法如果isAccess方法都返回false整个ITERATOR表达式不执行
if (!iteratorNode.isAccess(slotIndex)) {
return;
}
// 执行Iterator组件
iteratorNode.setCurrChainId(this.getCurrChainId());
iteratorNode.execute(slotIndex);
Iterator<?> it = iteratorNode.getItemResultMetaValue(slotIndex);

View File

@ -1,13 +1,10 @@
package com.yomahub.liteflow.flow.element.condition;
import cn.hutool.core.collection.ListUtil;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import com.yomahub.liteflow.enums.ConditionTypeEnum;
import com.yomahub.liteflow.enums.NodeTypeEnum;
import com.yomahub.liteflow.exception.NoSwitchTargetNodeException;
import com.yomahub.liteflow.exception.SwitchTargetCannotBePreOrFinallyException;
import com.yomahub.liteflow.exception.SwitchTypeErrorException;
import com.yomahub.liteflow.flow.element.Condition;
import com.yomahub.liteflow.flow.element.Executable;
import com.yomahub.liteflow.flow.element.Node;
@ -35,13 +32,15 @@ public class SwitchCondition extends Condition {
// 获取target List
List<Executable> targetList = this.getTargetList();
// 提前设置 chainId避免无法在 isAccess 方法中获取到
switchNode.setCurrChainId(this.getCurrChainId());
// 先去判断isAccess方法如果isAccess方法都返回false整个SWITCH表达式不执行
if (!switchNode.isAccess(slotIndex)) {
return;
}
// 先执行switch节点
switchNode.setCurrChainId(this.getCurrChainId());
switchNode.execute(slotIndex);
// 拿到switch节点的结果

View File

@ -3,7 +3,6 @@ package com.yomahub.liteflow.flow.element.condition;
import cn.hutool.core.util.ObjectUtil;
import com.yomahub.liteflow.enums.ConditionTypeEnum;
import com.yomahub.liteflow.flow.element.Executable;
import com.yomahub.liteflow.flow.element.Node;
import com.yomahub.liteflow.flow.parallel.LoopFutureObj;
import com.yomahub.liteflow.thread.ExecutorHelper;
@ -24,6 +23,9 @@ public class WhileCondition extends LoopCondition {
public void executeCondition(Integer slotIndex) throws Exception {
Executable whileItem = this.getWhileItem();
// 提前设置 chainId避免无法在 isAccess 方法中获取到
whileItem.setCurrChainId(this.getCurrChainId());
// 先去判断isAccess方法如果isAccess方法都返回false整个WHILE表达式不执行
if (!whileItem.isAccess(slotIndex)) {
return;
@ -84,7 +86,6 @@ public class WhileCondition extends LoopCondition {
private boolean getWhileResult(Integer slotIndex, int loopIndex) throws Exception {
Executable whileItem = this.getWhileItem();
// 执行while组件
whileItem.setCurrChainId(this.getCurrChainId());
setLoopIndex(whileItem, loopIndex);
whileItem.execute(slotIndex);

View File

@ -33,7 +33,7 @@ public class AllOfParallelExecutor extends ParallelStrategyExecutor {
// allOf 这个场景中不需要过滤
@Override
protected Stream<Executable> filterAccess(Stream<Executable> stream, Integer slotIndex) {
protected Stream<Executable> filterAccess(Stream<Executable> stream, Integer slotIndex, String currentChainId) {
return stream;
}

View File

@ -11,7 +11,6 @@ import com.yomahub.liteflow.flow.element.condition.FinallyCondition;
import com.yomahub.liteflow.flow.element.condition.PreCondition;
import com.yomahub.liteflow.flow.element.condition.WhenCondition;
import com.yomahub.liteflow.flow.parallel.CompletableFutureExpand;
import com.yomahub.liteflow.flow.parallel.CompletableFutureTimeout;
import com.yomahub.liteflow.flow.parallel.ParallelSupplier;
import com.yomahub.liteflow.flow.parallel.WhenFutureObj;
import com.yomahub.liteflow.log.LFLog;
@ -89,20 +88,23 @@ public abstract class ParallelStrategyExecutor {
* 过滤 WHEN 待执行任务
* @param executableList 所有任务列表
* @param slotIndex
* @param currentChainId 当前执行的 chainId
* @return
*/
protected Stream<Executable> filterWhenTaskList(List<Executable> executableList, Integer slotIndex) {
protected Stream<Executable> filterWhenTaskList(List<Executable> executableList, Integer slotIndex, String currentChainId) {
// 1.先进行过滤前置和后置组件过滤掉因为在 EL Chain 处理的时候已经提出来了
// 2.过滤 isAccess false 的情况因为不过滤这个的话如果加上了 any那么 isAccess false 那就是最快的了
Stream<Executable> stream = executableList.stream()
.filter(executable -> !(executable instanceof PreCondition) && !(executable instanceof FinallyCondition));
return filterAccess(stream, slotIndex);
return filterAccess(stream, slotIndex, currentChainId);
}
// 过滤 isAccess 的方法默认实现同时为避免同一个 node isAccess 方法重复执行 node 设置 isAccess 方法执行结果
protected Stream<Executable> filterAccess(Stream<Executable> stream, Integer slotIndex) {
protected Stream<Executable> filterAccess(Stream<Executable> stream, Integer slotIndex, String currentChainId) {
return stream.filter(executable -> {
try {
// 提前设置 chainId避免无法在 isAccess 方法中获取到
executable.setCurrChainId(currentChainId);
boolean access = executable.isAccess(slotIndex);
if (executable instanceof Node) {
((Node) executable).setAccessResult(access);
@ -150,14 +152,14 @@ public abstract class ParallelStrategyExecutor {
String currChainName = whenCondition.getCurrChainId();
// 设置 whenCondition 参数
setWhenConditionParams(whenCondition);
this.setWhenConditionParams(whenCondition);
// 获取 WHEN 所需线程池
ExecutorService parallelExecutor = getWhenExecutorService(whenCondition);
// 这里主要是做了封装 CompletableFuture 对象 lambda 表达式做了很多事情这句代码要仔细理清
// 根据 condition.getNodeList() 的集合进行流处理 map 进行把 executable 对象转换成 List<CompletableFuture<WhenFutureObj>>
List<CompletableFuture<WhenFutureObj>> completableFutureList = filterWhenTaskList(whenCondition.getExecutableList(), slotIndex)
List<CompletableFuture<WhenFutureObj>> completableFutureList = filterWhenTaskList(whenCondition.getExecutableList(), slotIndex, currChainName)
.map(executable -> wrappedFutureObj(executable, parallelExecutor, whenCondition, currChainName, slotIndex))
.collect(Collectors.toList());

View File

@ -41,7 +41,7 @@ public class SpecifyParallelExecutor extends ParallelStrategyExecutor {
List<CompletableFuture<WhenFutureObj>> allTaskList = new ArrayList<>();
// 遍历 when 所有 node进行筛选及处理
filterWhenTaskList(whenCondition.getExecutableList(), slotIndex)
filterWhenTaskList(whenCondition.getExecutableList(), slotIndex, currChainName)
.forEach(executable -> {
// 处理 task封装成 CompletableFuture 对象
CompletableFuture<WhenFutureObj> completableFutureTask = wrappedFutureObj(executable, parallelExecutor, whenCondition, currChainName, slotIndex);