enhancement #I883LB 调整 When 相关逻辑,提取公共方法,补充遗漏线程池获取

This commit is contained in:
luoyi 2023-10-15 16:34:10 +08:00
parent 1ca5861e40
commit 71ca602ea6
2 changed files with 55 additions and 43 deletions

View File

@ -26,6 +26,7 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
* 并发策略执行器抽象类
@ -81,6 +82,51 @@ public abstract class ParallelStrategyExecutor {
}
}
/**
* 过滤 WHEN 待执行任务
* @param executableList 所有任务列表
* @param slotIndex
* @return
*/
protected Stream<Executable> filterWhenTaskList(List<Executable> executableList, Integer slotIndex) {
// 1.先进行过滤前置和后置组件过滤掉因为在 EL Chain 处理的时候已经提出来了
// 2.过滤 isAccess false 的情况因为不过滤这个的话如果加上了 any那么 isAccess false 那就是最快的了
return executableList.stream()
.filter(executable -> !(executable instanceof PreCondition) && !(executable instanceof FinallyCondition))
.filter(executable -> {
try {
return executable.isAccess(slotIndex);
} catch (Exception e) {
LOG.error("there was an error when executing the when component isAccess", e);
return false;
}
});
}
/**
* 获取 WHEN 所需线程池
* @param whenCondition
* @return
*/
protected ExecutorService getWhenExecutorService(WhenCondition whenCondition) {
LiteflowConfig liteflowConfig = LiteflowConfigGetter.get();
// 如果设置了线程池隔离则每个 when 都会有对应的线程池这是为了避免多层嵌套时如果线程池数量不够时出现单个线程池死锁用线程池隔离的方式会更加好
// 如果 when 没有超多层的嵌套还是用默认的比较好
// 默认设置不隔离也就是说默认情况是一个线程池类一个实例如果什么都不配置那也就是在 when 的情况下全局一个线程池
ExecutorService parallelExecutor;
if (BooleanUtil.isTrue(liteflowConfig.getWhenThreadPoolIsolate())) {
parallelExecutor = ExecutorHelper.loadInstance().buildWhenExecutorWithHash(whenCondition.getThreadExecutorClass(), String.valueOf(whenCondition.hashCode()));
} else {
parallelExecutor = ExecutorHelper.loadInstance().buildWhenExecutor(whenCondition.getThreadExecutorClass());
}
return parallelExecutor;
}
/**
* 获取所有任务 CompletableFuture 集合
* @param whenCondition
@ -91,36 +137,15 @@ public abstract class ParallelStrategyExecutor {
String currChainName = whenCondition.getCurrChainId();
LiteflowConfig liteflowConfig = LiteflowConfigGetter.get();
// 如果设置了线程池隔离则每个when都会有对应的线程池这是为了避免多层嵌套时如果线程池数量不够时出现单个线程池死锁用线程池隔离的方式会更加好
// 如果when没有超多层的嵌套还是用默认的比较好
// 默认设置不隔离也就是说默认情况是一个线程池类一个实例如果什么都不配置那也就是在when的情况下全局一个线程池
ExecutorService parallelExecutor;
if (BooleanUtil.isTrue(liteflowConfig.getWhenThreadPoolIsolate())){
parallelExecutor = ExecutorHelper.loadInstance().buildWhenExecutorWithHash(whenCondition.getThreadExecutorClass(), String.valueOf(whenCondition.hashCode()));
}else{
parallelExecutor = ExecutorHelper.loadInstance().buildWhenExecutor(whenCondition.getThreadExecutorClass());
}
// 设置 whenCondition 参数
setWhenConditionParams(whenCondition);
// 这里主要是做了封装 CompletableFuture 对象 lumbda 表达式做了很多事情这句代码要仔细理清
// 1.先进行过滤前置和后置组件过滤掉因为在 EL Chain 处理的时候已经提出来了
// 2.过滤 isAccess false 的情况因为不过滤这个的话如果加上了 any那么 isAccess false 那就是最快的了
// 3.根据 condition.getNodeList() 的集合进行流处理 map 进行把 executable 对象转换成 List<CompletableFuture<WhenFutureObj>>
List<CompletableFuture<WhenFutureObj>> completableFutureList = whenCondition.getExecutableList()
.stream()
.filter(executable -> !(executable instanceof PreCondition) && !(executable instanceof FinallyCondition))
.filter(executable -> {
try {
return executable.isAccess(slotIndex);
} catch (Exception e) {
LOG.error("there was an error when executing the when component isAccess", e);
return false;
}
})
// 获取 WHEN 所需线程池
ExecutorService parallelExecutor = getWhenExecutorService(whenCondition);
// 这里主要是做了封装 CompletableFuture 对象 lambda 表达式做了很多事情这句代码要仔细理清
// 根据 condition.getNodeList() 的集合进行流处理 map 进行把 executable 对象转换成 List<CompletableFuture<WhenFutureObj>>
List<CompletableFuture<WhenFutureObj>> completableFutureList = filterWhenTaskList(whenCondition.getExecutableList(), slotIndex)
.map(executable -> wrappedFutureObj(executable, parallelExecutor, whenCondition, currChainName, slotIndex))
.collect(Collectors.toList());

View File

@ -1,11 +1,8 @@
package com.yomahub.liteflow.flow.parallel.strategy;
import cn.hutool.core.collection.CollUtil;
import com.yomahub.liteflow.flow.element.condition.FinallyCondition;
import com.yomahub.liteflow.flow.element.condition.PreCondition;
import com.yomahub.liteflow.flow.element.condition.WhenCondition;
import com.yomahub.liteflow.flow.parallel.WhenFutureObj;
import com.yomahub.liteflow.thread.ExecutorHelper;
import java.util.*;
import java.util.concurrent.CompletableFuture;
@ -27,8 +24,8 @@ public class SpecifyParallelExecutor extends ParallelStrategyExecutor {
// 设置 whenCondition 参数
this.setWhenConditionParams(whenCondition);
// 此方法其实只会初始化一次Executor不会每次都会初始化Executor 是唯一的
ExecutorService parallelExecutor = ExecutorHelper.loadInstance().buildWhenExecutor(whenCondition.getThreadExecutorClass());
// 获取 WHEN 所需线程池
ExecutorService parallelExecutor = getWhenExecutorService(whenCondition);
// 指定完成的任务
CompletableFuture<?> specifyTask;
@ -43,17 +40,7 @@ public class SpecifyParallelExecutor extends ParallelStrategyExecutor {
List<CompletableFuture<WhenFutureObj>> allTaskList = new ArrayList<>();
// 遍历 when 所有 node进行筛选及处理
whenCondition.getExecutableList()
.stream()
.filter(executable -> !(executable instanceof PreCondition) && !(executable instanceof FinallyCondition))
.filter(executable -> {
try {
return executable.isAccess(slotIndex);
} catch (Exception e) {
LOG.error("there was an error when executing the when component isAccess", e);
return false;
}
})
filterWhenTaskList(whenCondition.getExecutableList(), slotIndex)
.forEach(executable -> {
// 处理 task封装成 CompletableFuture 对象
CompletableFuture<WhenFutureObj> completableFutureTask = wrappedFutureObj(executable, parallelExecutor, whenCondition, currChainName, slotIndex);