feature #I7I3LL 解决冲突

This commit is contained in:
Dale Lee 2023-07-25 11:24:43 +08:00
commit 8b536ad4a5
163 changed files with 4582 additions and 1080 deletions

View File

@ -11,7 +11,7 @@ LiteFlow是一个轻量且强大的国产规则引擎框架可用于复杂的
LiteFlow于2020年正式开源2021年获得开源中国年度最受欢迎开源软件殊荣。于2022年获得Gitee最有价值开源项目(GVP)荣誉。是一个正处在高速发展中的开源项目。
LiteFlow是一个由社区驱动的项目我们非常重视社区建设拥有一个2500多人的使用者社区在使用中碰到任何问题或者建议都可以在社区中反应。
LiteFlow是一个由社区驱动的项目我们非常重视社区建设拥有一个3000多人的使用者社区在使用中碰到任何问题或者建议都可以在社区中反应。
你在官网中可以找到加入社区的方式!
@ -59,7 +59,7 @@ LiteFlow期待你的了解
**微信公众号**
由于社区群超过200人需要邀请入群。关注公众号后点击`个人微信`加我,我可以拉你入群
社区群需要邀请入群。关注公众号后点击`个人微信`加我,我可以拉你入群
![offIical-wx](static/img/offical-wx.jpg)

View File

@ -18,10 +18,16 @@ public @interface LiteflowMethod {
/**
* 节点ID用于区分节点 默认为空 则按照Spring模式下BeanName为准
* @return
* @return nodeId
*/
String nodeId() default "";
/**
* 节点Name
* @return nodeName
*/
String nodeName() default "";
/**
* CMP类型定义
* @return AnnotationNodeTypeEnum

View File

@ -79,6 +79,7 @@ public class LiteFlowChainELBuilder {
EXPRESS_RUNNER.addFunctionAndClassMethod(ChainConstant.BREAK, Object.class, new BreakOperator());
EXPRESS_RUNNER.addFunctionAndClassMethod(ChainConstant.DATA, Object.class, new DataOperator());
EXPRESS_RUNNER.addFunctionAndClassMethod(ChainConstant.MAX_WAIT_SECONDS, Object.class, new MaxWaitSecondsOperator());
EXPRESS_RUNNER.addFunctionAndClassMethod(ChainConstant.PARALLEL, Object.class, new ParallelOperator());
}
public static LiteFlowChainELBuilder createChain() {

View File

@ -0,0 +1,25 @@
package com.yomahub.liteflow.builder.el.operator;
import com.yomahub.liteflow.builder.el.operator.base.BaseOperator;
import com.yomahub.liteflow.builder.el.operator.base.OperatorHelper;
import com.yomahub.liteflow.flow.element.condition.LoopCondition;
/**
* EL规则中的parallel的操作符
*
* @author zhhhhy
* @since 2.11.0
*/
public class ParallelOperator extends BaseOperator<LoopCondition> {
@Override
public LoopCondition build(Object[] objects) throws Exception {
OperatorHelper.checkObjectSizeEqTwo(objects);
LoopCondition loopCondition = OperatorHelper.convert(objects[0], LoopCondition.class);
Boolean parallel = OperatorHelper.convert(objects[1], Boolean.class);
loopCondition.setParallel(parallel);
return loopCondition;
}
}

View File

@ -6,6 +6,7 @@ package com.yomahub.liteflow.common;
* @author tangkc
*/
public interface ChainConstant {
String PARALLEL = "parallel";
String CHAIN = "chain";

View File

@ -126,7 +126,7 @@ public abstract class NodeComponent {
stopWatch.stop();
final long timeSpent = stopWatch.getTotalTimeMillis();
LOG.debug("component[{}] finished in {} milliseconds", this.getDisplayName(), timeSpent);
LOG.info("component[{}] finished in {} milliseconds", this.getDisplayName(), timeSpent);
// 往CmpStep中放入时间消耗信息
cmpStep.setTimeSpent(timeSpent);

View File

@ -10,8 +10,10 @@ import com.yomahub.liteflow.enums.LiteFlowMethodEnum;
import com.yomahub.liteflow.enums.NodeTypeEnum;
import com.yomahub.liteflow.exception.ComponentMethodDefineErrorException;
import com.yomahub.liteflow.exception.LiteFlowException;
import com.yomahub.liteflow.exception.ProxyException;
import com.yomahub.liteflow.log.LFLog;
import com.yomahub.liteflow.log.LFLoggerManager;
import com.yomahub.liteflow.spi.holder.LiteflowComponentSupportHolder;
import com.yomahub.liteflow.util.LiteFlowProxyUtil;
import com.yomahub.liteflow.util.SerialsUtil;
import net.bytebuddy.ByteBuddy;
@ -27,6 +29,7 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import java.util.stream.Collectors;
/**
@ -93,8 +96,23 @@ public class ComponentProxy {
boolean legal = classes.size() == 1;
if (!legal) {
throw new LiteFlowException("The cmpClass of the same nodeId must be the same,you declared nodeId:"
+ activeNodeId + ",cmpClass:" + classes);
+ activeNodeId + ",cmpClass:" + clazz);
}
String activeNodeName;
if (isMethodCreate){
// 获取process上的LiteflowMethod
LiteflowMethod mainliteflowMethod = methodList.stream().filter(liteflowMethod -> liteflowMethod.value().isMainMethod()).findFirst().orElse(null);
if (mainliteflowMethod == null){
String errMsg = StrUtil.format("you have not defined @LiteFlowMethod on the processXXX method in class {}", clazz.getName());
throw new LiteFlowException(errMsg);
}
activeNodeName = mainliteflowMethod.nodeName();
}else{
activeNodeName = LiteflowComponentSupportHolder.loadLiteflowComponentSupport().getCmpName(bean);
}
// 当前节点实际LiteflowRetry注解
AtomicReference<LiteflowRetry> liteflowRetryAtomicReference = new AtomicReference<>(null);
// 相同nodeId只能有一个LiteflowRetry定义方法,且必须再Process方法上
@ -150,10 +168,12 @@ public class ComponentProxy {
NodeComponent nodeComponent = (NodeComponent) instance;
// 重设nodeId
nodeComponent.setNodeId(activeNodeId);
// 重设nodeName
nodeComponent.setName(activeNodeName);
return nodeComponent;
}
catch (Exception e) {
throw new LiteFlowException(e);
throw new ProxyException(e);
}
}).collect(Collectors.toList());
}

View File

@ -2,14 +2,19 @@ package com.yomahub.liteflow.enums;
public enum LiteFlowMethodEnum {
PROCESS("process", true), PROCESS_SWITCH("processSwitch", true), PROCESS_IF("processIf", true),
PROCESS_FOR("processFor", true), PROCESS_WHILE("processWhile", true), PROCESS_BREAK("processBreak", true),
PROCESS("process", true),
PROCESS_SWITCH("processSwitch", true),
PROCESS_IF("processIf", true),
PROCESS_FOR("processFor", true),
PROCESS_WHILE("processWhile", true),
PROCESS_BREAK("processBreak", true),
PROCESS_ITERATOR("processIterator", true),
IS_ACCESS("isAccess", false),
IS_END("isEnd", false), IS_CONTINUE_ON_ERROR("isContinueOnError", false),
IS_END("isEnd", false),
IS_CONTINUE_ON_ERROR("isContinueOnError", false),
GET_NODE_EXECUTOR_CLASS("getNodeExecutorClass", false),
@ -19,7 +24,10 @@ public enum LiteFlowMethodEnum {
BEFORE_PROCESS("beforeProcess", false),
AFTER_PROCESS("afterProcess", false);
AFTER_PROCESS("afterProcess", false),
GET_DISPLAY_NAME("getDisplayName", false)
;
private String methodName;

View File

@ -0,0 +1,31 @@
package com.yomahub.liteflow.exception;
/**
* @author Bryan.Zhang
*/
public class ProxyException extends RuntimeException {
private static final long serialVersionUID = 1L;
/** 异常信息 */
private String message;
public ProxyException(String message) {
this.message = message;
}
public ProxyException(Throwable cause) {
super(cause);
}
@Override
public String getMessage() {
return message;
}
public void setMessage(String message) {
this.message = message;
}
}

View File

@ -93,7 +93,7 @@ public class FlowBus {
}
nodeMap.put(nodeId,
new Node(ComponentInitializer.loadInstance().initComponent(nodeComponent, type, null, nodeId)));
new Node(ComponentInitializer.loadInstance().initComponent(nodeComponent, type, nodeComponent.getName(), nodeId)));
}
/**

View File

@ -1,7 +1,6 @@
package com.yomahub.liteflow.flow.element.condition;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.BooleanUtil;
import cn.hutool.core.util.StrUtil;
import com.yomahub.liteflow.enums.ConditionTypeEnum;
import com.yomahub.liteflow.exception.AndOrConditionException;
@ -11,7 +10,9 @@ import com.yomahub.liteflow.log.LFLog;
import com.yomahub.liteflow.log.LFLoggerManager;
import com.yomahub.liteflow.slot.DataBus;
import com.yomahub.liteflow.slot.Slot;
import java.util.List;
import java.util.function.Predicate;
public class AndOrCondition extends Condition {
@ -23,21 +24,10 @@ public class AndOrCondition extends Condition {
public void executeCondition(Integer slotIndex) throws Exception {
List<Executable> itemList = this.getItem();
if (CollUtil.isEmpty(itemList)){
throw new AndOrConditionException("boolean item list is null");
}
boolean[] booleanArray = new boolean[itemList.size()];
for (int i = 0; i < itemList.size(); i++) {
Executable item = itemList.get(i);
item.setCurrChainId(this.getCurrChainId());
item.execute(slotIndex);
booleanArray[i] = item.getItemResultMetaValue(slotIndex);
LOG.info("the result of boolean component [{}] is [{}]", item.getId(), booleanArray[i]);
}
BooleanConditionTypeEnum booleanConditionType = this.getBooleanConditionType();
Slot slot = DataBus.getSlot(slotIndex);
@ -45,16 +35,37 @@ public class AndOrCondition extends Condition {
String resultKey = StrUtil.format("{}_{}",this.getClass().getName(),this.hashCode());
switch (booleanConditionType) {
case AND:
slot.setAndOrResult(resultKey, BooleanUtil.and(booleanArray));
slot.setAndOrResult(resultKey, itemList.stream().allMatch(new AndOrConditionPredicate(slotIndex)));
break;
case OR:
slot.setAndOrResult(resultKey, BooleanUtil.or(booleanArray));
slot.setAndOrResult(resultKey, itemList.stream().anyMatch(new AndOrConditionPredicate(slotIndex)));
break;
default:
throw new AndOrConditionException("condition type must be 'AND' or 'OR'");
}
}
private class AndOrConditionPredicate implements Predicate<Executable> {
private final Integer slotIndex;
public AndOrConditionPredicate(Integer slotIndex) {
this.slotIndex = slotIndex;
}
@Override
public boolean test(Executable condition) {
try {
condition.setCurrChainId(getCurrChainId());
condition.execute(slotIndex);
return condition.getItemResultMetaValue(slotIndex);
} catch (Exception e) {
throw new AndOrConditionException(e.getMessage());
}
}
}
@Override
@SuppressWarnings("unchecked")

View File

@ -6,10 +6,17 @@ import com.yomahub.liteflow.enums.ConditionTypeEnum;
import com.yomahub.liteflow.exception.NoForNodeException;
import com.yomahub.liteflow.flow.element.Executable;
import com.yomahub.liteflow.flow.element.Node;
import com.yomahub.liteflow.flow.parallel.LoopFutureObj;
import com.yomahub.liteflow.slot.DataBus;
import com.yomahub.liteflow.slot.Slot;
import com.yomahub.liteflow.thread.ExecutorHelper;
import com.yomahub.liteflow.util.LiteFlowProxyUtil;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
/**
* 循环次数Condition
*
@ -18,67 +25,92 @@ import com.yomahub.liteflow.util.LiteFlowProxyUtil;
*/
public class ForCondition extends LoopCondition {
@Override
public void executeCondition(Integer slotIndex) throws Exception {
Slot slot = DataBus.getSlot(slotIndex);
Node forNode = this.getForNode();
if (ObjectUtil.isNull(forNode)) {
String errorInfo = StrUtil.format("[{}]:no for-node found", slot.getRequestId());
throw new NoForNodeException(errorInfo);
}
@Override
public void executeCondition(Integer slotIndex) throws Exception {
Slot slot = DataBus.getSlot(slotIndex);
Node forNode = this.getForNode();
if (ObjectUtil.isNull(forNode)) {
String errorInfo = StrUtil.format("[{}]:no for-node found", slot.getRequestId());
throw new NoForNodeException(errorInfo);
}
// 先去判断isAccess方法如果isAccess方法都返回false整个FOR表达式不执行
if (!this.getForNode().isAccess(slotIndex)) {
return;
}
// 先去判断isAccess方法如果isAccess方法都返回false整个FOR表达式不执行
if (!this.getForNode().isAccess(slotIndex)) {
return;
}
// 执行forCount组件
forNode.setCurrChainId(this.getCurrChainId());
forNode.execute(slotIndex);
// 执行forCount组件
forNode.setCurrChainId(this.getCurrChainId());
forNode.execute(slotIndex);
// 获得循环次数
int forCount = forNode.getItemResultMetaValue(slotIndex);
// 获得循环次数
int forCount = forNode.getItemResultMetaValue(slotIndex);
// 获得要循环的可执行对象
Executable executableItem = this.getDoExecutor();
// 获得要循环的可执行对象
Executable executableItem = this.getDoExecutor();
// 获取Break节点
Executable breakItem = this.getBreakItem();
// 获取Break节点
Executable breakItem = this.getBreakItem();
try{
// 循环执行
for (int i = 0; i < forCount; i++) {
executableItem.setCurrChainId(this.getCurrChainId());
// 设置循环index
setLoopIndex(executableItem, i);
executableItem.execute(slotIndex);
// 如果break组件不为空则去执行
if (ObjectUtil.isNotNull(breakItem)) {
breakItem.setCurrChainId(this.getCurrChainId());
setLoopIndex(breakItem, i);
breakItem.execute(slotIndex);
boolean isBreak = breakItem.getItemResultMetaValue(slotIndex);
if (isBreak) {
break;
}
}
}
}finally {
removeLoopIndex(executableItem);
}
}
try {
if (!isParallel()) {
//串行循环执行
for (int i = 0; i < forCount; i++) {
executableItem.setCurrChainId(this.getCurrChainId());
// 设置循环index
setLoopIndex(executableItem, i);
executableItem.execute(slotIndex);
// 如果break组件不为空则去执行
if (ObjectUtil.isNotNull(breakItem)) {
breakItem.setCurrChainId(this.getCurrChainId());
setLoopIndex(breakItem, i);
breakItem.execute(slotIndex);
boolean isBreak = breakItem.getItemResultMetaValue(slotIndex);
if (isBreak) {
break;
}
}
}
}else{
//并行循环执行
//存储所有的并行执行子项的CompletableFuture
List<CompletableFuture<LoopFutureObj>> futureList = new ArrayList<>();
//获取并行循环的线程池
ExecutorService parallelExecutor = ExecutorHelper.loadInstance().buildLoopParallelExecutor();
for (int i = 0; i < forCount; i++){
//提交异步任务
CompletableFuture<LoopFutureObj> future =
CompletableFuture.supplyAsync(new LoopParallelSupplier(executableItem, this.getCurrChainId(), slotIndex, i), parallelExecutor);
futureList.add(future);
if (ObjectUtil.isNotNull(breakItem)) {
breakItem.setCurrChainId(this.getCurrChainId());
setLoopIndex(breakItem, i);
breakItem.execute(slotIndex);
boolean isBreak = breakItem.getItemResultMetaValue(slotIndex);
if (isBreak) {
break;
}
}
}
//等待所有的异步执行完毕
handleFutureList(futureList);
}
} finally {
removeLoopIndex(executableItem);
}
}
@Override
public ConditionTypeEnum getConditionType() {
return ConditionTypeEnum.TYPE_FOR;
}
@Override
public ConditionTypeEnum getConditionType() {
return ConditionTypeEnum.TYPE_FOR;
}
public Node getForNode() {
return (Node) this.getExecutableOne(ConditionKey.FOR_KEY);
}
public Node getForNode() {
return (Node) this.getExecutableOne(ConditionKey.FOR_KEY);
}
public void setForNode(Node forNode) {
this.addExecutable(ConditionKey.FOR_KEY, forNode);
}
public void setForNode(Node forNode) {
this.addExecutable(ConditionKey.FOR_KEY, forNode);
}
}

View File

@ -6,83 +6,118 @@ import com.yomahub.liteflow.enums.ConditionTypeEnum;
import com.yomahub.liteflow.exception.NoIteratorNodeException;
import com.yomahub.liteflow.flow.element.Executable;
import com.yomahub.liteflow.flow.element.Node;
import com.yomahub.liteflow.flow.parallel.LoopFutureObj;
import com.yomahub.liteflow.slot.DataBus;
import com.yomahub.liteflow.slot.Slot;
import com.yomahub.liteflow.thread.ExecutorHelper;
import com.yomahub.liteflow.util.LiteFlowProxyUtil;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
public class IteratorCondition extends LoopCondition {
@Override
public void executeCondition(Integer slotIndex) throws Exception {
Slot slot = DataBus.getSlot(slotIndex);
Node iteratorNode = this.getIteratorNode();
@Override
public void executeCondition(Integer slotIndex) throws Exception {
Slot slot = DataBus.getSlot(slotIndex);
Node iteratorNode = this.getIteratorNode();
if (ObjectUtil.isNull(iteratorNode)) {
String errorInfo = StrUtil.format("[{}]:no iterator-node found", slot.getRequestId());
throw new NoIteratorNodeException(errorInfo);
}
if (ObjectUtil.isNull(iteratorNode)) {
String errorInfo = StrUtil.format("[{}]:no iterator-node found", slot.getRequestId());
throw new NoIteratorNodeException(errorInfo);
}
// 先去判断isAccess方法如果isAccess方法都返回false整个ITERATOR表达式不执行
if (!iteratorNode.isAccess(slotIndex)) {
return;
}
// 先去判断isAccess方法如果isAccess方法都返回false整个ITERATOR表达式不执行
if (!iteratorNode.isAccess(slotIndex)) {
return;
}
// 执行Iterator组件
iteratorNode.setCurrChainId(this.getCurrChainId());
iteratorNode.execute(slotIndex);
// 执行Iterator组件
iteratorNode.setCurrChainId(this.getCurrChainId());
iteratorNode.execute(slotIndex);
Iterator<?> it = iteratorNode.getItemResultMetaValue(slotIndex);
Iterator<?> it = iteratorNode.getItemResultMetaValue(slotIndex);
// 获得要循环的可执行对象
Executable executableItem = this.getDoExecutor();
// 获得要循环的可执行对象
Executable executableItem = this.getDoExecutor();
// 获取Break节点
Executable breakItem = this.getBreakItem();
// 获取Break节点
Executable breakItem = this.getBreakItem();
try{
int index = 0;
while (it.hasNext()) {
Object itObj = it.next();
try {
int index = 0;
if (!this.isParallel()) {
//原本的串行循环执行
while (it.hasNext()) {
Object itObj = it.next();
executableItem.setCurrChainId(this.getCurrChainId());
// 设置循环index
setLoopIndex(executableItem, index);
// 设置循环迭代器对象
setCurrLoopObject(executableItem, itObj);
// 执行可执行对象
executableItem.execute(slotIndex);
// 如果break组件不为空则去执行
if (ObjectUtil.isNotNull(breakItem)) {
breakItem.setCurrChainId(this.getCurrChainId());
setLoopIndex(breakItem, index);
setCurrLoopObject(breakItem, itObj);
breakItem.execute(slotIndex);
boolean isBreak = breakItem.getItemResultMetaValue(slotIndex);
if (isBreak) {
break;
}
}
index++;
}
}finally{
removeLoopIndex(executableItem);
removeCurrLoopObject(executableItem);
}
}
executableItem.setCurrChainId(this.getCurrChainId());
// 设置循环index
setLoopIndex(executableItem, index);
// 设置循环迭代器对象
setCurrLoopObject(executableItem, itObj);
// 执行可执行对象
executableItem.execute(slotIndex);
// 如果break组件不为空则去执行
if (ObjectUtil.isNotNull(breakItem)) {
breakItem.setCurrChainId(this.getCurrChainId());
setLoopIndex(breakItem, index);
setCurrLoopObject(breakItem, itObj);
breakItem.execute(slotIndex);
boolean isBreak = breakItem.getItemResultMetaValue(slotIndex);
if (isBreak) {
break;
}
}
index++;
}
} else {
//并行循环执行
//存储所有的并行执行子项的CompletableFuture
List<CompletableFuture<LoopFutureObj>> futureList = new ArrayList<>();
//获取并行循环的线程池
ExecutorService parallelExecutor = ExecutorHelper.loadInstance().buildLoopParallelExecutor();
while (it.hasNext()) {
Object itObj = it.next();
//提交异步任务
CompletableFuture<LoopFutureObj> future =
CompletableFuture.supplyAsync(new LoopParallelSupplier(executableItem, this.getCurrChainId(), slotIndex, index, itObj), parallelExecutor);
futureList.add(future);
//break判断
if (ObjectUtil.isNotNull(breakItem)) {
breakItem.setCurrChainId(this.getCurrChainId());
setLoopIndex(breakItem, index);
breakItem.execute(slotIndex);
boolean isBreak = breakItem.getItemResultMetaValue(slotIndex);
if (isBreak) {
break;
}
}
index++;
}
//等待所有的异步执行完毕
handleFutureList(futureList);
}
} finally {
removeLoopIndex(executableItem);
removeCurrLoopObject(executableItem);
}
}
@Override
public ConditionTypeEnum getConditionType() {
return ConditionTypeEnum.TYPE_ITERATOR;
}
@Override
public ConditionTypeEnum getConditionType() {
return ConditionTypeEnum.TYPE_ITERATOR;
}
public Node getIteratorNode() {
return (Node) this.getExecutableOne(ConditionKey.ITERATOR_KEY);
}
public Node getIteratorNode() {
return (Node) this.getExecutableOne(ConditionKey.ITERATOR_KEY);
}
public void setIteratorNode(Node iteratorNode) {
this.addExecutable(ConditionKey.ITERATOR_KEY, iteratorNode);
}
public void setIteratorNode(Node iteratorNode) {
this.addExecutable(ConditionKey.ITERATOR_KEY, iteratorNode);
}
}

View File

@ -4,6 +4,11 @@ import com.yomahub.liteflow.flow.element.Chain;
import com.yomahub.liteflow.flow.element.Condition;
import com.yomahub.liteflow.flow.element.Executable;
import com.yomahub.liteflow.flow.element.Node;
import com.yomahub.liteflow.flow.parallel.LoopFutureObj;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
/**
* 循环Condition的抽象类 主要继承对象有ForCondition和WhileCondition
@ -12,73 +17,131 @@ import com.yomahub.liteflow.flow.element.Node;
* @since 2.9.0
*/
public abstract class LoopCondition extends Condition {
//判断循环是否并行执行默认为false
private boolean parallel = false;
protected Executable getBreakItem() {
return this.getExecutableOne(ConditionKey.BREAK_KEY);
}
protected Executable getBreakItem() {
return this.getExecutableOne(ConditionKey.BREAK_KEY);
}
public void setBreakItem(Executable breakNode) {
this.addExecutable(ConditionKey.BREAK_KEY, breakNode);
}
public void setBreakItem(Executable breakNode) {
this.addExecutable(ConditionKey.BREAK_KEY, breakNode);
}
protected Executable getDoExecutor() {
return this.getExecutableOne(ConditionKey.DO_KEY);
}
protected Executable getDoExecutor() {
return this.getExecutableOne(ConditionKey.DO_KEY);
}
public void setDoExecutor(Executable executable) {
this.addExecutable(ConditionKey.DO_KEY, executable);
}
public void setDoExecutor(Executable executable) {
this.addExecutable(ConditionKey.DO_KEY, executable);
}
protected void setLoopIndex(Executable executableItem, int index) {
if (executableItem instanceof Chain) {
((Chain) executableItem).getConditionList().forEach(condition -> setLoopIndex(condition, index));
}
else if (executableItem instanceof Condition) {
((Condition) executableItem).getExecutableGroup()
.forEach((key, value) -> value.forEach(executable -> setLoopIndex(executable, index)));
}
else if (executableItem instanceof Node) {
((Node) executableItem).setLoopIndex(index);
}
}
protected void setLoopIndex(Executable executableItem, int index) {
if (executableItem instanceof Chain) {
((Chain) executableItem).getConditionList().forEach(condition -> setLoopIndex(condition, index));
} else if (executableItem instanceof Condition) {
((Condition) executableItem).getExecutableGroup()
.forEach((key, value) -> value.forEach(executable -> setLoopIndex(executable, index)));
} else if (executableItem instanceof Node) {
((Node) executableItem).setLoopIndex(index);
}
}
protected void setCurrLoopObject(Executable executableItem, Object obj) {
if (executableItem instanceof Chain) {
((Chain) executableItem).getConditionList().forEach(condition -> setCurrLoopObject(condition, obj));
}
else if (executableItem instanceof Condition) {
((Condition) executableItem).getExecutableGroup()
.forEach((key, value) -> value.forEach(executable -> setCurrLoopObject(executable, obj)));
}
else if (executableItem instanceof Node) {
((Node) executableItem).setCurrLoopObject(obj);
}
}
protected void setCurrLoopObject(Executable executableItem, Object obj) {
if (executableItem instanceof Chain) {
((Chain) executableItem).getConditionList().forEach(condition -> setCurrLoopObject(condition, obj));
} else if (executableItem instanceof Condition) {
((Condition) executableItem).getExecutableGroup()
.forEach((key, value) -> value.forEach(executable -> setCurrLoopObject(executable, obj)));
} else if (executableItem instanceof Node) {
((Node) executableItem).setCurrLoopObject(obj);
}
}
protected void removeLoopIndex(Executable executableItem){
if (executableItem instanceof Chain) {
((Chain) executableItem).getConditionList().forEach(this::removeLoopIndex);
}
else if (executableItem instanceof Condition) {
((Condition) executableItem).getExecutableGroup()
.forEach((key, value) -> value.forEach(this::removeLoopIndex));
}
else if (executableItem instanceof Node) {
((Node) executableItem).removeLoopIndex();
}
}
protected void removeLoopIndex(Executable executableItem) {
if (executableItem instanceof Chain) {
((Chain) executableItem).getConditionList().forEach(this::removeLoopIndex);
} else if (executableItem instanceof Condition) {
((Condition) executableItem).getExecutableGroup()
.forEach((key, value) -> value.forEach(this::removeLoopIndex));
} else if (executableItem instanceof Node) {
((Node) executableItem).removeLoopIndex();
}
}
protected void removeCurrLoopObject(Executable executableItem){
if (executableItem instanceof Chain) {
((Chain) executableItem).getConditionList().forEach(this::removeCurrLoopObject);
}
else if (executableItem instanceof Condition) {
((Condition) executableItem).getExecutableGroup()
.forEach((key, value) -> value.forEach(this::removeCurrLoopObject));
}
else if (executableItem instanceof Node) {
((Node) executableItem).removeCurrLoopObject();
}
}
protected void removeCurrLoopObject(Executable executableItem) {
if (executableItem instanceof Chain) {
((Chain) executableItem).getConditionList().forEach(this::removeCurrLoopObject);
} else if (executableItem instanceof Condition) {
((Condition) executableItem).getExecutableGroup()
.forEach((key, value) -> value.forEach(this::removeCurrLoopObject));
} else if (executableItem instanceof Node) {
((Node) executableItem).removeCurrLoopObject();
}
}
public boolean isParallel() {
return parallel;
}
public void setParallel(boolean parallel) {
this.parallel = parallel;
}
//循环并行执行的futureList处理
protected void handleFutureList(List<CompletableFuture<LoopFutureObj>> futureList)throws Exception{
CompletableFuture<?> resultCompletableFuture = CompletableFuture.allOf(futureList.toArray(new CompletableFuture[]{}));
resultCompletableFuture.get();
//获取所有的执行结果,如果有失败的那么需要抛出异常
for (CompletableFuture<LoopFutureObj> future : futureList) {
LoopFutureObj loopFutureObj = future.get();
if (!loopFutureObj.isSuccess()) {
throw loopFutureObj.getEx();
}
}
}
// 循环并行执行的Supplier封装
public class LoopParallelSupplier implements Supplier<LoopFutureObj> {
private final Executable executableItem;
private final String currChainId;
private final Integer slotIndex;
private final Integer loopIndex;
private final Object itObj;
public LoopParallelSupplier(Executable executableItem, String currChainId, Integer slotIndex, Integer loopIndex) {
this.executableItem = executableItem;
this.currChainId = currChainId;
this.slotIndex = slotIndex;
this.loopIndex = loopIndex;
this.itObj = null;
}
public LoopParallelSupplier(Executable executableItem, String currChainId, Integer slotIndex, Integer loopIndex, Object itObj) {
this.executableItem = executableItem;
this.currChainId = currChainId;
this.slotIndex = slotIndex;
this.loopIndex = loopIndex;
this.itObj = itObj;
}
@Override
public LoopFutureObj get() {
try {
executableItem.setCurrChainId(this.currChainId);
// 设置循环index
setLoopIndex(executableItem, loopIndex);
//IteratorCondition的情况下需要设置当前循环对象
if(itObj != null){
setCurrLoopObject(executableItem, itObj);
}
executableItem.execute(slotIndex);
return LoopFutureObj.success(executableItem.getId());
} catch (Exception e) {
return LoopFutureObj.fail(executableItem.getId(), e);
}
}
}
}

View File

@ -4,6 +4,13 @@ import cn.hutool.core.util.ObjectUtil;
import com.yomahub.liteflow.enums.ConditionTypeEnum;
import com.yomahub.liteflow.flow.element.Executable;
import com.yomahub.liteflow.flow.element.Node;
import com.yomahub.liteflow.flow.parallel.LoopFutureObj;
import com.yomahub.liteflow.thread.ExecutorHelper;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
/**
* 循环条件Condition
@ -30,21 +37,47 @@ public class WhileCondition extends LoopCondition {
// 循环执行
int index = 0;
while (getWhileResult(slotIndex)) {
executableItem.setCurrChainId(this.getCurrChainId());
setLoopIndex(executableItem, index);
executableItem.execute(slotIndex);
// 如果break组件不为空则去执行
if (ObjectUtil.isNotNull(breakItem)) {
breakItem.setCurrChainId(this.getCurrChainId());
setLoopIndex(breakItem, index);
breakItem.execute(slotIndex);
boolean isBreak = breakItem.getItemResultMetaValue(slotIndex);
if (isBreak) {
break;
if(!this.isParallel()){
//串行循环
while (getWhileResult(slotIndex)) {
executableItem.setCurrChainId(this.getCurrChainId());
setLoopIndex(executableItem, index);
executableItem.execute(slotIndex);
// 如果break组件不为空则去执行
if (ObjectUtil.isNotNull(breakItem)) {
breakItem.setCurrChainId(this.getCurrChainId());
setLoopIndex(breakItem, index);
breakItem.execute(slotIndex);
boolean isBreak = breakItem.getItemResultMetaValue(slotIndex);
if (isBreak) {
break;
}
}
index++;
}
index++;
}else{
//并行循环逻辑
List<CompletableFuture<LoopFutureObj>> futureList = new ArrayList<>();
//获取并行循环的线程池
ExecutorService parallelExecutor = ExecutorHelper.loadInstance().buildLoopParallelExecutor();
while (getWhileResult(slotIndex)){
CompletableFuture<LoopFutureObj> future =
CompletableFuture.supplyAsync(new LoopParallelSupplier(executableItem, this.getCurrChainId(), slotIndex, index), parallelExecutor);
futureList.add(future);
//break判断
if (ObjectUtil.isNotNull(breakItem)) {
breakItem.setCurrChainId(this.getCurrChainId());
setLoopIndex(breakItem, index);
breakItem.execute(slotIndex);
boolean isBreak = breakItem.getItemResultMetaValue(slotIndex);
if (isBreak) {
break;
}
}
index++;
}
//等待所有的异步执行完毕
handleFutureList(futureList);
}
}

View File

@ -0,0 +1,56 @@
package com.yomahub.liteflow.flow.parallel;
/**
* 并行循环各个并行子项的执行结果对象
* 如果该子项执行成功则success为trueex为null
* 否则success为falseex为程序抛出异常
*
* @author zhhhhy
* @since 2.11.0
*/
public class LoopFutureObj {
private String executorName;
private boolean success;
private Exception ex;
public static LoopFutureObj success(String executorName) {
LoopFutureObj result = new LoopFutureObj();
result.setSuccess(true);
result.setExecutorName(executorName);
return result;
}
public static LoopFutureObj fail(String executorName, Exception ex) {
LoopFutureObj result = new LoopFutureObj();
result.setSuccess(false);
result.setExecutorName(executorName);
result.setEx(ex);
return result;
}
public Exception getEx() {
return ex;
}
public String getExecutorName() {
return executorName;
}
public boolean isSuccess() {
return success;
}
public void setEx(Exception ex) {
this.ex = ex;
}
public void setExecutorName(String executorName) {
this.executorName = executorName;
}
public void setSuccess(boolean success) {
this.success = success;
}
}

View File

@ -63,13 +63,21 @@ public class MonitorFile {
@Override
public void onFileChange(File file) {
LOG.info("file modify,filePath={}", file.getAbsolutePath());
FlowExecutorHolder.loadInstance().reloadRule();
this.reloadRule();
}
@Override
public void onFileDelete(File file) {
LOG.info("file delete,filePath={}", file.getAbsolutePath());
FlowExecutorHolder.loadInstance().reloadRule();
this.reloadRule();
}
private void reloadRule() {
try {
FlowExecutorHolder.loadInstance().reloadRule();
} catch (Exception e) {
LOG.error("reload rule error", e);
}
}
});
// 创建文件变化监听器

View File

@ -140,6 +140,8 @@ public class ParserHelper {
// 校验加载的 chainName 是否有重复的
// TODO 这里是否有个问题当混合格式加载的时候2个同名的Chain在不同的文件里就不行了
String chainName = Optional.ofNullable(e.attributeValue(ID)).orElse(e.attributeValue(NAME));
// 检查 chainName
checkChainId(chainName, e.getText());
if (!chainNameSet.add(chainName)) {
throw new ChainDuplicateException(String.format("[chain name duplicate] chainName=%s", chainName));
}
@ -202,9 +204,11 @@ public class ParserHelper {
JsonNode innerJsonObject = iterator.next();
// 校验加载的 chainName 是否有重复的
// TODO 这里是否有个问题当混合格式加载的时候2个同名的Chain在不同的文件里就不行了
String chainName = Optional.ofNullable(innerJsonObject.get(ID))
.orElse(innerJsonObject.get(NAME))
.textValue();
JsonNode chainNameJsonNode = Optional.ofNullable(innerJsonObject.get(ID))
.orElse(innerJsonObject.get(NAME));
String chainName = Optional.ofNullable(chainNameJsonNode).map(JsonNode::textValue).orElse(null);
// 检查 chainName
checkChainId(chainName, innerJsonObject.toPrettyString());
if (!chainNameSet.add(chainName)) {
throw new ChainDuplicateException(String.format("[chain name duplicate] chainName=%s", chainName));
}
@ -250,6 +254,17 @@ public class ParserHelper {
chainELBuilder.setEL(el).build();
}
/**
* 检查 chainId
* @param chainId chainId
* @param elData elData
*/
private static void checkChainId(String chainId, String elData) {
if (StrUtil.isBlank(chainId)) {
throw new ParseException("missing chain id in expression \r\n" + elData);
}
}
private static class RegexUtil {
// java 注释的正则表达式

View File

@ -103,6 +103,15 @@ public class LiteflowConfig {
// 规则文件/脚本文件变更监听
private Boolean enableMonitorFile = Boolean.FALSE;
//并行循环线程池所用class路径
private String parallelLoopExecutorClass;
//使用默认并行循环线程池时最大线程数
private Integer parallelMaxWorkers;
//使用默认并行循环线程池时最大队列数
private Integer parallelQueueLimit;
public Boolean getEnableMonitorFile() {
return enableMonitorFile;
}
@ -409,4 +418,40 @@ public class LiteflowConfig {
public void setWhenMaxWaitTimeUnit(TimeUnit whenMaxWaitTimeUnit) {
this.whenMaxWaitTimeUnit = whenMaxWaitTimeUnit;
}
public Integer getParallelMaxWorkers() {
if(ObjectUtil.isNull(parallelMaxWorkers)){
return 16;
}else{
return parallelMaxWorkers;
}
}
public void setParallelMaxWorkers(Integer parallelMaxWorkers) {
this.parallelMaxWorkers = parallelMaxWorkers;
}
public Integer getParallelQueueLimit() {
if(ObjectUtil.isNull(parallelQueueLimit)){
return 512;
}else{
return parallelQueueLimit;
}
}
public void setParallelQueueLimit(Integer parallelQueueLimit) {
this.parallelQueueLimit = parallelQueueLimit;
}
public String getParallelLoopExecutorClass() {
if (StrUtil.isBlank(parallelLoopExecutorClass)) {
return "com.yomahub.liteflow.thread.LiteFlowDefaultParallelLoopExecutorBuilder";
}
else {
return parallelLoopExecutorClass;
}
}
public void setParallelLoopExecutorClass(String parallelLoopExecutorClass) {
this.parallelLoopExecutorClass = parallelLoopExecutorClass;
}
}

View File

@ -3,6 +3,7 @@ package com.yomahub.liteflow.script.proxy;
import cn.hutool.core.collection.ListUtil;
import cn.hutool.core.util.*;
import com.yomahub.liteflow.exception.LiteFlowException;
import com.yomahub.liteflow.exception.ProxyException;
import com.yomahub.liteflow.exception.ScriptBeanMethodInvokeException;
import com.yomahub.liteflow.log.LFLog;
import com.yomahub.liteflow.log.LFLoggerManager;
@ -10,8 +11,18 @@ import com.yomahub.liteflow.script.annotation.ScriptBean;
import com.yomahub.liteflow.util.LiteFlowProxyUtil;
import com.yomahub.liteflow.util.SerialsUtil;
import net.bytebuddy.ByteBuddy;
import net.bytebuddy.description.method.ParameterDescription;
import net.bytebuddy.description.modifier.Visibility;
import net.bytebuddy.dynamic.DynamicType;
import net.bytebuddy.dynamic.scaffold.subclass.ConstructorStrategy;
import net.bytebuddy.implementation.FixedValue;
import net.bytebuddy.implementation.InvocationHandlerAdapter;
import net.bytebuddy.implementation.MethodCall;
import net.bytebuddy.implementation.MethodDelegation;
import net.bytebuddy.implementation.bytecode.constant.DefaultValue;
import net.bytebuddy.matcher.ElementMatchers;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.util.Arrays;
@ -56,19 +67,18 @@ public class ScriptBeanProxy {
}
try {
return new ByteBuddy().subclass(orignalClass)
.name(StrUtil.format("{}.ByteBuddy${}", ClassUtil.getPackage(orignalClass),
SerialsUtil.generateShortUUID()))
Class<?> c = new ByteBuddy().subclass(orignalClass).name(StrUtil.format("{}.ByteBuddy${}", ClassUtil.getPackage(orignalClass),SerialsUtil.generateShortUUID()))
.method(ElementMatchers.any())
.intercept(InvocationHandlerAdapter.of(new AopInvocationHandler(bean, methodNameList)))
.annotateType(orignalClass.getAnnotations())
.make()
.load(ScriptBeanProxy.class.getClassLoader())
.getLoaded()
.newInstance();
.getLoaded();
return ReflectUtil.newInstanceIfPossible(c);
}
catch (Exception e) {
throw new LiteFlowException(e);
throw new ProxyException(e);
}
}

View File

@ -1,5 +1,7 @@
package com.yomahub.liteflow.spi;
import java.util.Map;
/**
* 环境容器SPI接口
*
@ -8,18 +10,27 @@ package com.yomahub.liteflow.spi;
*/
public interface ContextAware extends SpiPriority {
<T> T getBean(String name);
<T> T getBean(String name);
<T> T getBean(Class<T> clazz);
<T> T getBean(Class<T> clazz);
<T> T registerBean(String beanName, Class<T> clazz);
<T> T registerBean(String beanName, Class<T> clazz);
<T> T registerBean(Class<T> clazz);
<T> T registerBean(Class<T> clazz);
<T> T registerBean(String beanName, Object bean);
<T> T registerBean(String beanName, Object bean);
<T> T registerOrGet(String beanName, Class<T> clazz);
<T> T registerOrGet(String beanName, Class<T> clazz);
boolean hasBean(String beanName);
/**
* 获取指定类型对应的所有Bean包括子类
*
* @param <T> Bean类型
* @param type 接口null表示获取所有bean
* @return 类型对应的beankey是bean注册的namevalue是Bean
*/
<T> Map<String, T> getBeansOfType(Class<T> type);
boolean hasBean(String beanName);
}

View File

@ -10,6 +10,6 @@ import com.yomahub.liteflow.core.NodeComponent;
*/
public interface LiteflowComponentSupport extends SpiPriority {
String getCmpName(NodeComponent nodeComponent);
String getCmpName(Object nodeComponent);
}

View File

@ -3,6 +3,8 @@ package com.yomahub.liteflow.spi.local;
import cn.hutool.core.util.ReflectUtil;
import com.yomahub.liteflow.spi.ContextAware;
import java.util.Map;
/**
* 非Spring环境容器实现 其实非Spring没有环境容器所以这是个空实现
*
@ -11,44 +13,49 @@ import com.yomahub.liteflow.spi.ContextAware;
*/
public class LocalContextAware implements ContextAware {
@Override
public <T> T getBean(String name) {
return null;
}
@Override
public <T> T getBean(String name) {
return null;
}
@Override
public <T> T getBean(Class<T> clazz) {
return null;
}
@Override
public <T> T getBean(Class<T> clazz) {
return null;
}
@Override
public <T> T registerBean(String beanName, Class<T> clazz) {
return ReflectUtil.newInstance(clazz);
}
@Override
public <T> T registerBean(String beanName, Class<T> clazz) {
return ReflectUtil.newInstance(clazz);
}
@Override
public <T> T registerBean(Class<T> clazz) {
return registerBean(null, clazz);
}
@Override
public <T> T registerBean(Class<T> clazz) {
return registerBean(null, clazz);
}
@Override
public <T> T registerBean(String beanName, Object bean) {
return (T) bean;
}
@Override
public <T> T registerBean(String beanName, Object bean) {
return (T) bean;
}
@Override
public <T> T registerOrGet(String beanName, Class<T> clazz) {
return registerBean(beanName, clazz);
}
@Override
public <T> T registerOrGet(String beanName, Class<T> clazz) {
return registerBean(beanName, clazz);
}
@Override
public boolean hasBean(String beanName) {
return false;
}
@Override
public <T> Map<String, T> getBeansOfType(Class<T> type) {
return null;
}
@Override
public int priority() {
return 2;
}
@Override
public boolean hasBean(String beanName) {
return false;
}
@Override
public int priority() {
return 2;
}
}

View File

@ -12,7 +12,7 @@ import com.yomahub.liteflow.spi.LiteflowComponentSupport;
public class LocalLiteflowComponentSupport implements LiteflowComponentSupport {
@Override
public String getCmpName(NodeComponent nodeComponent) {
public String getCmpName(Object nodeComponent) {
return null;
}

View File

@ -113,6 +113,12 @@ public class ExecutorHelper {
return getExecutorService(clazz);
}
//构造并行循环的线程池
public ExecutorService buildLoopParallelExecutor(){
LiteflowConfig liteflowConfig = LiteflowConfigGetter.get();
return getExecutorService(liteflowConfig.getParallelLoopExecutorClass());
}
/**
* 根据线程执行构建者Class类名获取ExecutorService实例
*/

View File

@ -0,0 +1,27 @@
package com.yomahub.liteflow.thread;
import cn.hutool.core.util.ObjectUtil;
import com.yomahub.liteflow.property.LiteflowConfig;
import com.yomahub.liteflow.property.LiteflowConfigGetter;
import java.util.concurrent.ExecutorService;
/**
* LiteFlow默认的并行循环执行器实现
*
* @author zhhhhy
* @since 2.11.0
*/
public class LiteFlowDefaultParallelLoopExecutorBuilder implements ExecutorBuilder {
@Override
public ExecutorService buildExecutor() {
LiteflowConfig liteflowConfig = LiteflowConfigGetter.get();
// 只有在非spring的场景下liteflowConfig才会为null
if (ObjectUtil.isNull(liteflowConfig)) {
liteflowConfig = new LiteflowConfig();
}
return buildDefaultExecutor(liteflowConfig.getParallelMaxWorkers(), liteflowConfig.getParallelMaxWorkers(),
liteflowConfig.getParallelQueueLimit(), "loop-thead-");
}
}

View File

@ -71,6 +71,9 @@ public class SQLXmlELParser extends ClassXmlFlowELParser {
* @param sqlParserVO sqlParserVO
*/
private void checkParserVO(SQLParserVO sqlParserVO) {
if (sqlParserVO.isDefaultDataSource()) {
return;
}
if (StrUtil.isEmpty(sqlParserVO.getUrl())) {
throw new ELSQLException(StrFormatter.format(ERROR_MSG_PATTERN, "url"));
}

View File

@ -9,7 +9,6 @@ import com.yomahub.liteflow.parser.sql.exception.ELSQLException;
import com.yomahub.liteflow.parser.sql.vo.SQLParserVO;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
@ -25,331 +24,279 @@ import java.util.Objects;
*/
public class JDBCHelper {
private static final String SQL_PATTERN = "SELECT {},{} FROM {} WHERE {}=?";
private static final String SQL_PATTERN = "SELECT {},{} FROM {} WHERE {}=?";
private static final String SCRIPT_SQL_CHECK_PATTERN = "SELECT 1 FROM {} WHERE {}=?";
private static final String SCRIPT_SQL_CHECK_PATTERN = "SELECT 1 FROM {} WHERE {}=?";
private static final String SCRIPT_SQL_PATTERN = "SELECT {},{},{},{} FROM {} WHERE {}=?";
private static final String SCRIPT_SQL_PATTERN = "SELECT {},{},{},{} FROM {} WHERE {}=?";
private static final String SCRIPT_WITH_LANGUAG_SQL_PATTERN = "SELECT {},{},{},{},{} FROM {} WHERE {}=?";
private static final String SCRIPT_WITH_LANGUAG_SQL_PATTERN = "SELECT {},{},{},{},{} FROM {} WHERE {}=?";
private static final String CHAIN_XML_PATTERN = "<chain name=\"{}\">{}</chain>";
private static final String CHAIN_XML_PATTERN = "<chain name=\"{}\">{}</chain>";
private static final String NODE_XML_PATTERN = "<nodes>{}</nodes>";
private static final String NODE_XML_PATTERN = "<nodes>{}</nodes>";
private static final String NODE_ITEM_XML_PATTERN = "<node id=\"{}\" name=\"{}\" type=\"{}\"><![CDATA[{}]]></node>";
private static final String NODE_ITEM_XML_PATTERN = "<node id=\"{}\" name=\"{}\" type=\"{}\"><![CDATA[{}]]></node>";
private static final String NODE_ITEM_WITH_LANGUAGE_XML_PATTERN = "<node id=\"{}\" name=\"{}\" type=\"{}\" language=\"{}\"><![CDATA[{}]]></node>";
private static final String NODE_ITEM_WITH_LANGUAGE_XML_PATTERN = "<node id=\"{}\" name=\"{}\" type=\"{}\" language=\"{}\"><![CDATA[{}]]></node>";
private static final String XML_PATTERN = "<?xml version=\"1.0\" encoding=\"UTF-8\"?><flow>{}{}</flow>";
private static final String XML_PATTERN = "<?xml version=\"1.0\" encoding=\"UTF-8\"?><flow>{}{}</flow>";
private static final Integer FETCH_SIZE_MAX = 1000;
private static final Integer FETCH_SIZE_MAX = 1000;
private SQLParserVO sqlParserVO;
private SQLParserVO sqlParserVO;
private static JDBCHelper INSTANCE;
private static JDBCHelper INSTANCE;
/**
* 初始化 INSTANCE
*/
public static void init(SQLParserVO sqlParserVO) {
try {
INSTANCE = new JDBCHelper();
Class.forName(sqlParserVO.getDriverClassName());
INSTANCE.setSqlParserVO(sqlParserVO);
}
catch (ClassNotFoundException e) {
throw new ELSQLException(e.getMessage());
}
}
/**
* 初始化 INSTANCE
*/
public static void init(SQLParserVO sqlParserVO) {
try {
INSTANCE = new JDBCHelper();
if (StrUtil.isNotBlank(sqlParserVO.getDriverClassName())) {
Class.forName(sqlParserVO.getDriverClassName());
}
INSTANCE.setSqlParserVO(sqlParserVO);
} catch (ClassNotFoundException e) {
throw new ELSQLException(e.getMessage());
}
}
/**
* 获取 INSTANCE
*/
public static JDBCHelper getInstance() {
return INSTANCE;
}
/**
* 获取 INSTANCE
*/
public static JDBCHelper getInstance() {
return INSTANCE;
}
/**
* 获取链接
*/
public Connection getConn() {
Connection connection;
try {
connection = DriverManager.getConnection(sqlParserVO.getUrl(), sqlParserVO.getUsername(),
sqlParserVO.getPassword());
}
catch (SQLException e) {
throw new ELSQLException(e.getMessage());
}
return connection;
}
/**
* 获取 ElData 数据内容
*/
public String getContent() {
Connection conn = null;
PreparedStatement stmt = null;
ResultSet rs = null;
/**
* 获取 ElData 数据内容
*/
public String getContent() {
Connection conn = null;
PreparedStatement stmt = null;
ResultSet rs = null;
String chainTableName = sqlParserVO.getChainTableName();
String elDataField = sqlParserVO.getElDataField();
String chainNameField = sqlParserVO.getChainNameField();
String chainApplicationNameField = sqlParserVO.getChainApplicationNameField();
String applicationName = sqlParserVO.getApplicationName();
String chainTableName = sqlParserVO.getChainTableName();
String elDataField = sqlParserVO.getElDataField();
String chainNameField = sqlParserVO.getChainNameField();
String chainApplicationNameField = sqlParserVO.getChainApplicationNameField();
String applicationName = sqlParserVO.getApplicationName();
if (StrUtil.isBlank(chainTableName)) {
throw new ELSQLException("You did not define the chainTableName property");
}
if (StrUtil.isBlank(chainTableName)) {
throw new ELSQLException("You did not define the chainTableName property");
}
if (StrUtil.isBlank(applicationName) || StrUtil.isBlank(chainApplicationNameField)) {
throw new ELSQLException("You did not define the applicationName or chainApplicationNameField property");
}
if (StrUtil.isBlank(applicationName) || StrUtil.isBlank(chainApplicationNameField)) {
throw new ELSQLException("You did not define the applicationName or chainApplicationNameField property");
}
String sqlCmd = StrUtil.format(SQL_PATTERN, chainNameField, elDataField, chainTableName,
chainApplicationNameField);
String sqlCmd = StrUtil.format(SQL_PATTERN, chainNameField, elDataField, chainTableName,
chainApplicationNameField);
List<String> result = new ArrayList<>();
try {
conn = LiteFlowJdbcUtil.getConn(sqlParserVO);
stmt = conn.prepareStatement(sqlCmd, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
// 设置游标拉取数量
stmt.setFetchSize(FETCH_SIZE_MAX);
stmt.setString(1, applicationName);
rs = stmt.executeQuery();
List<String> result = new ArrayList<>();
try {
conn = getConn();
stmt = conn.prepareStatement(sqlCmd, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
// 设置游标拉取数量
stmt.setFetchSize(FETCH_SIZE_MAX);
stmt.setString(1, applicationName);
rs = stmt.executeQuery();
while (rs.next()) {
String elData = getStringFromResultSet(rs, elDataField);
String chainName = getStringFromResultSet(rs, chainNameField);
while (rs.next()) {
String elData = getStringFromResultSet(rs, elDataField);
String chainName = getStringFromResultSet(rs, chainNameField);
result.add(StrUtil.format(CHAIN_XML_PATTERN, XmlUtil.escape(chainName), elData));
}
} catch (Exception e) {
throw new ELSQLException(e.getMessage());
} finally {
// 关闭连接
LiteFlowJdbcUtil.close(conn, stmt, rs);
}
result.add(StrUtil.format(CHAIN_XML_PATTERN, XmlUtil.escape(chainName), elData));
}
}
catch (Exception e) {
throw new ELSQLException(e.getMessage());
}
finally {
// 关闭连接
close(conn, stmt, rs);
}
String chainsContent = CollUtil.join(result, StrUtil.EMPTY);
String chainsContent = CollUtil.join(result, StrUtil.EMPTY);
String nodesContent;
if (hasScriptData()) {
nodesContent = getScriptNodes();
} else {
nodesContent = StrUtil.EMPTY;
}
String nodesContent;
if (hasScriptData()) {
nodesContent = getScriptNodes();
}
else {
nodesContent = StrUtil.EMPTY;
}
return StrUtil.format(XML_PATTERN, nodesContent, chainsContent);
}
return StrUtil.format(XML_PATTERN, nodesContent, chainsContent);
}
/**
* 获取脚本节点内容
*/
public String getScriptNodes() {
String scriptLanguageField = sqlParserVO.getScriptLanguageField();
if (StrUtil.isNotBlank(scriptLanguageField)) {
return getScriptNodesWithLanguage();
}
public String getScriptNodes() {
String scriptLanguageField = sqlParserVO.getScriptLanguageField();
if (StrUtil.isNotBlank(scriptLanguageField)) {
return getScriptNodesWithLanguage();
}
List<String> result = new ArrayList<>();
Connection conn = null;
PreparedStatement stmt = null;
ResultSet rs = null;
List<String> result = new ArrayList<>();
Connection conn = null;
PreparedStatement stmt = null;
ResultSet rs = null;
String scriptTableName = sqlParserVO.getScriptTableName();
String scriptIdField = sqlParserVO.getScriptIdField();
String scriptDataField = sqlParserVO.getScriptDataField();
String scriptNameField = sqlParserVO.getScriptNameField();
String scriptTypeField = sqlParserVO.getScriptTypeField();
String scriptApplicationNameField = sqlParserVO.getScriptApplicationNameField();
String applicationName = sqlParserVO.getApplicationName();
String scriptTableName = sqlParserVO.getScriptTableName();
String scriptIdField = sqlParserVO.getScriptIdField();
String scriptDataField = sqlParserVO.getScriptDataField();
String scriptNameField = sqlParserVO.getScriptNameField();
String scriptTypeField = sqlParserVO.getScriptTypeField();
String scriptApplicationNameField = sqlParserVO.getScriptApplicationNameField();
String applicationName = sqlParserVO.getApplicationName();
if (StrUtil.isBlank(applicationName) || StrUtil.isBlank(scriptApplicationNameField)) {
throw new ELSQLException("You did not define the applicationName or scriptApplicationNameField property");
}
if (StrUtil.isBlank(applicationName) || StrUtil.isBlank(scriptApplicationNameField)) {
throw new ELSQLException("You did not define the applicationName or scriptApplicationNameField property");
}
String sqlCmd = StrUtil.format(SCRIPT_SQL_PATTERN, scriptIdField, scriptDataField, scriptNameField,
scriptTypeField, scriptTableName, scriptApplicationNameField);
try {
conn = LiteFlowJdbcUtil.getConn(sqlParserVO);
stmt = conn.prepareStatement(sqlCmd, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
// 设置游标拉取数量
stmt.setFetchSize(FETCH_SIZE_MAX);
stmt.setString(1, applicationName);
rs = stmt.executeQuery();
String sqlCmd = StrUtil.format(SCRIPT_SQL_PATTERN, scriptIdField, scriptDataField, scriptNameField,
scriptTypeField, scriptTableName, scriptApplicationNameField);
try {
conn = getConn();
stmt = conn.prepareStatement(sqlCmd, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
// 设置游标拉取数量
stmt.setFetchSize(FETCH_SIZE_MAX);
stmt.setString(1, applicationName);
rs = stmt.executeQuery();
while (rs.next()) {
String id = getStringFromResultSet(rs, scriptIdField);
String data = getStringFromResultSet(rs, scriptDataField);
String name = getStringFromResultSet(rs, scriptNameField);
String type = getStringFromResultSet(rs, scriptTypeField);
while (rs.next()) {
String id = getStringFromResultSet(rs, scriptIdField);
String data = getStringFromResultSet(rs, scriptDataField);
String name = getStringFromResultSet(rs, scriptNameField);
String type = getStringFromResultSet(rs, scriptTypeField);
NodeTypeEnum nodeTypeEnum = NodeTypeEnum.getEnumByCode(type);
if (Objects.isNull(nodeTypeEnum)) {
throw new ELSQLException(StrUtil.format("Invalid type value[{}]", type));
}
NodeTypeEnum nodeTypeEnum = NodeTypeEnum.getEnumByCode(type);
if (Objects.isNull(nodeTypeEnum)) {
throw new ELSQLException(StrUtil.format("Invalid type value[{}]", type));
}
if (!nodeTypeEnum.isScript()) {
throw new ELSQLException(StrUtil.format("The type value[{}] is not a script type", type));
}
if (!nodeTypeEnum.isScript()) {
throw new ELSQLException(StrUtil.format("The type value[{}] is not a script type", type));
}
result.add(StrUtil.format(NODE_ITEM_XML_PATTERN, XmlUtil.escape(id), XmlUtil.escape(name), type, data));
}
} catch (Exception e) {
throw new ELSQLException(e.getMessage());
} finally {
// 关闭连接
LiteFlowJdbcUtil.close(conn, stmt, rs);
}
return StrUtil.format(NODE_XML_PATTERN, CollUtil.join(result, StrUtil.EMPTY));
}
result.add(StrUtil.format(NODE_ITEM_XML_PATTERN, XmlUtil.escape(id), XmlUtil.escape(name), type, data));
}
}
catch (Exception e) {
throw new ELSQLException(e.getMessage());
}
finally {
// 关闭连接
close(conn, stmt, rs);
}
return StrUtil.format(NODE_XML_PATTERN, CollUtil.join(result, StrUtil.EMPTY));
}
/**
* 获取脚本节点带语言
*
* @return
*/
public String getScriptNodesWithLanguage() {
public String getScriptNodesWithLanguage() {
List<String> result = new ArrayList<>();
Connection conn = null;
PreparedStatement stmt = null;
ResultSet rs = null;
List<String> result = new ArrayList<>();
Connection conn = null;
PreparedStatement stmt = null;
ResultSet rs = null;
String scriptTableName = sqlParserVO.getScriptTableName();
String scriptIdField = sqlParserVO.getScriptIdField();
String scriptDataField = sqlParserVO.getScriptDataField();
String scriptNameField = sqlParserVO.getScriptNameField();
String scriptTypeField = sqlParserVO.getScriptTypeField();
String scriptApplicationNameField = sqlParserVO.getScriptApplicationNameField();
String applicationName = sqlParserVO.getApplicationName();
String scriptLanguageField = sqlParserVO.getScriptLanguageField();
String scriptTableName = sqlParserVO.getScriptTableName();
String scriptIdField = sqlParserVO.getScriptIdField();
String scriptDataField = sqlParserVO.getScriptDataField();
String scriptNameField = sqlParserVO.getScriptNameField();
String scriptTypeField = sqlParserVO.getScriptTypeField();
String scriptApplicationNameField = sqlParserVO.getScriptApplicationNameField();
String applicationName = sqlParserVO.getApplicationName();
String scriptLanguageField = sqlParserVO.getScriptLanguageField();
if (StrUtil.isBlank(applicationName) || StrUtil.isBlank(scriptApplicationNameField)) {
throw new ELSQLException("You did not define the applicationName or scriptApplicationNameField property");
}
if (StrUtil.isBlank(applicationName) || StrUtil.isBlank(scriptApplicationNameField)) {
throw new ELSQLException("You did not define the applicationName or scriptApplicationNameField property");
}
String sqlCmd = StrUtil.format(SCRIPT_WITH_LANGUAG_SQL_PATTERN, scriptIdField, scriptDataField, scriptNameField,
scriptTypeField, scriptLanguageField, scriptTableName, scriptApplicationNameField);
try {
conn = LiteFlowJdbcUtil.getConn(sqlParserVO);
stmt = conn.prepareStatement(sqlCmd, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
// 设置游标拉取数量
stmt.setFetchSize(FETCH_SIZE_MAX);
stmt.setString(1, applicationName);
rs = stmt.executeQuery();
String sqlCmd = StrUtil.format(SCRIPT_WITH_LANGUAG_SQL_PATTERN, scriptIdField, scriptDataField, scriptNameField,
scriptTypeField, scriptLanguageField, scriptTableName, scriptApplicationNameField);
try {
conn = getConn();
stmt = conn.prepareStatement(sqlCmd, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
// 设置游标拉取数量
stmt.setFetchSize(FETCH_SIZE_MAX);
stmt.setString(1, applicationName);
rs = stmt.executeQuery();
while (rs.next()) {
String id = getStringFromResultSet(rs, scriptIdField);
String data = getStringFromResultSet(rs, scriptDataField);
String name = getStringFromResultSet(rs, scriptNameField);
String type = getStringFromResultSet(rs, scriptTypeField);
String language = getStringFromResultSet(rs, scriptLanguageField);
while (rs.next()) {
String id = getStringFromResultSet(rs, scriptIdField);
String data = getStringFromResultSet(rs, scriptDataField);
String name = getStringFromResultSet(rs, scriptNameField);
String type = getStringFromResultSet(rs, scriptTypeField);
String language = getStringFromResultSet(rs, scriptLanguageField);
NodeTypeEnum nodeTypeEnum = NodeTypeEnum.getEnumByCode(type);
if (Objects.isNull(nodeTypeEnum)) {
throw new ELSQLException(StrUtil.format("Invalid type value[{}]", type));
}
NodeTypeEnum nodeTypeEnum = NodeTypeEnum.getEnumByCode(type);
if (Objects.isNull(nodeTypeEnum)) {
throw new ELSQLException(StrUtil.format("Invalid type value[{}]", type));
}
if (!nodeTypeEnum.isScript()) {
throw new ELSQLException(StrUtil.format("The type value[{}] is not a script type", type));
}
if (!nodeTypeEnum.isScript()) {
throw new ELSQLException(StrUtil.format("The type value[{}] is not a script type", type));
}
if (!ScriptTypeEnum.checkScriptType(language)) {
throw new ELSQLException(StrUtil.format("The language value[{}] is error", language));
}
if (!ScriptTypeEnum.checkScriptType(language)) {
throw new ELSQLException(StrUtil.format("The language value[{}] is error", language));
}
result.add(StrUtil.format(NODE_ITEM_WITH_LANGUAGE_XML_PATTERN, XmlUtil.escape(id), XmlUtil.escape(name),
type, language, data));
}
} catch (Exception e) {
throw new ELSQLException(e.getMessage());
} finally {
// 关闭连接
LiteFlowJdbcUtil.close(conn, stmt, rs);
}
return StrUtil.format(NODE_XML_PATTERN, CollUtil.join(result, StrUtil.EMPTY));
}
result.add(StrUtil.format(NODE_ITEM_WITH_LANGUAGE_XML_PATTERN, XmlUtil.escape(id), XmlUtil.escape(name),
type, language, data));
}
}
catch (Exception e) {
throw new ELSQLException(e.getMessage());
}
finally {
// 关闭连接
close(conn, stmt, rs);
}
return StrUtil.format(NODE_XML_PATTERN, CollUtil.join(result, StrUtil.EMPTY));
}
private boolean hasScriptData() {
if (StrUtil.isBlank(sqlParserVO.getScriptTableName())) {
return false;
}
/**
* 关闭连接
* @param conn conn
* @param stmt stmt
* @param rs rs
*/
private void close(Connection conn, PreparedStatement stmt, ResultSet rs) {
// 关闭连接
if (conn != null) {
try {
conn.close();
}
catch (SQLException e) {
throw new ELSQLException(e.getMessage());
}
}
// 关闭 statement
if (stmt != null) {
try {
stmt.close();
}
catch (SQLException e) {
throw new ELSQLException(e.getMessage());
}
}
// 关闭结果集
if (rs != null) {
try {
rs.close();
}
catch (SQLException e) {
throw new ELSQLException(e.getMessage());
}
}
}
Connection conn = null;
PreparedStatement stmt = null;
ResultSet rs = null;
String sqlCmd = StrUtil.format(SCRIPT_SQL_CHECK_PATTERN, sqlParserVO.getScriptTableName(),
sqlParserVO.getScriptApplicationNameField());
try {
conn = LiteFlowJdbcUtil.getConn(sqlParserVO);
stmt = conn.prepareStatement(sqlCmd, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
stmt.setFetchSize(1);
stmt.setString(1, sqlParserVO.getApplicationName());
rs = stmt.executeQuery();
return rs.next();
} catch (Exception e) {
return false;
} finally {
// 关闭连接
LiteFlowJdbcUtil.close(conn, stmt, rs);
}
}
private boolean hasScriptData() {
if (StrUtil.isBlank(sqlParserVO.getScriptTableName())) {
return false;
}
private String getStringFromResultSet(ResultSet rs, String field) throws SQLException {
String data = rs.getString(field);
if (StrUtil.isBlank(data)) {
throw new ELSQLException(StrUtil.format("exist {} field value is empty", field));
}
return data;
}
Connection conn = null;
PreparedStatement stmt = null;
ResultSet rs = null;
String sqlCmd = StrUtil.format(SCRIPT_SQL_CHECK_PATTERN, sqlParserVO.getScriptTableName(),
sqlParserVO.getScriptApplicationNameField());
try {
conn = getConn();
stmt = conn.prepareStatement(sqlCmd, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
stmt.setFetchSize(1);
stmt.setString(1, sqlParserVO.getApplicationName());
rs = stmt.executeQuery();
return rs.next();
}
catch (Exception e) {
return false;
}
finally {
// 关闭连接
close(conn, stmt, rs);
}
}
private SQLParserVO getSqlParserVO() {
return sqlParserVO;
}
// #region get set method
private String getStringFromResultSet(ResultSet rs, String field) throws SQLException {
String data = rs.getString(field);
if (StrUtil.isBlank(data)) {
throw new ELSQLException(StrUtil.format("exist {} field value is empty", field));
}
return data;
}
private SQLParserVO getSqlParserVO() {
return sqlParserVO;
}
private void setSqlParserVO(SQLParserVO sqlParserVO) {
this.sqlParserVO = sqlParserVO;
}
private void setSqlParserVO(SQLParserVO sqlParserVO) {
this.sqlParserVO = sqlParserVO;
}
}

View File

@ -0,0 +1,134 @@
package com.yomahub.liteflow.parser.sql.util;
import cn.hutool.core.util.StrUtil;
import com.yomahub.liteflow.parser.sql.exception.ELSQLException;
import com.yomahub.liteflow.parser.sql.vo.SQLParserVO;
import com.yomahub.liteflow.spi.holder.ContextAwareHolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.sql.DataSource;
import java.sql.*;
import java.util.Map;
public class LiteFlowJdbcUtil {
private static final Logger LOG = LoggerFactory.getLogger(LiteFlowJdbcUtil.class);
private static final String CHECK_SQL_PATTERN = "SELECT {},{} FROM {} WHERE {}='{}'";
/**
* 获取链接
* 此方法会根据配置判读使用指定数据源还是IOC容器中已有的数据源
*
* @param sqlParserVO
* @return
*/
public static Connection getConn(SQLParserVO sqlParserVO) {
Connection connection = null;
String url = sqlParserVO.getUrl();
String username = sqlParserVO.getUsername();
String password = sqlParserVO.getPassword();
try {
// 如果不配置 jdbc 连接相关配置代表使用项目数据源
if (sqlParserVO.isDefaultDataSource()) {
String executeSql = buildCheckSql(sqlParserVO);
Map<String, DataSource> dataSourceMap = ContextAwareHolder.loadContextAware().getBeansOfType(DataSource.class);
// 遍历数据源多数据源场景下判断哪个数据源有 liteflow 配置
for (Map.Entry<String, DataSource> entry : dataSourceMap.entrySet()) {
String dataSourceName = entry.getKey();
DataSource dataSource = entry.getValue();
if (checkConnectionCanExecuteSql(dataSource.getConnection(), executeSql)) {
connection = dataSource.getConnection();
LOG.info("use dataSourceName[{}],has found liteflow config", dataSourceName);
} else {
LOG.info("check dataSourceName[{}],but not has liteflow config", dataSourceName);
}
}
if (connection == null) {
throw new ELSQLException("can not found liteflow config in dataSourceName " + dataSourceMap.keySet());
}
}
// 如果配置 jdbc 连接相关配置,代表使用指定链接信息
else {
connection = DriverManager.getConnection(url, username, password);
}
} catch (Exception e) {
throw new ELSQLException(e.getMessage());
}
return connection;
}
/**
* 判断连接是否可以执行指定 sql
*
* @param conn 连接
* @param sql 执行 sql
*/
public static boolean checkConnectionCanExecuteSql(Connection conn, String sql) {
PreparedStatement stmt = null;
ResultSet rs = null;
try {
stmt = conn.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
stmt.setFetchSize(1);
rs = stmt.executeQuery();
return rs.next();
} catch (Exception e) {
return false;
} finally {
// 关闭连接
close(conn, stmt, rs);
}
}
/**
* 关闭
*
* @param conn conn
* @param conn conn
* @param rs rs
*/
public static void close(Connection conn, PreparedStatement stmt, ResultSet rs) {
// 关闭连接
if (conn != null) {
try {
conn.close();
} catch (SQLException e) {
throw new ELSQLException(e.getMessage());
}
}
// 关闭 statement
if (stmt != null) {
try {
stmt.close();
} catch (SQLException e) {
throw new ELSQLException(e.getMessage());
}
}
// 关闭结果集
if (rs != null) {
try {
rs.close();
} catch (SQLException e) {
throw new ELSQLException(e.getMessage());
}
}
}
/**
* 构建检查 sql
*
* @param sqlParserVO
* @return
*/
private static String buildCheckSql(SQLParserVO sqlParserVO) {
String chainTableName = sqlParserVO.getChainTableName();
String elDataField = sqlParserVO.getElDataField();
String chainNameField = sqlParserVO.getChainNameField();
String chainApplicationNameField = sqlParserVO.getChainApplicationNameField();
String applicationName = sqlParserVO.getApplicationName();
return StrUtil.format(CHECK_SQL_PATTERN, chainNameField, elDataField, chainTableName, chainApplicationNameField, applicationName);
}
}

View File

@ -1,5 +1,7 @@
package com.yomahub.liteflow.parser.sql.vo;
import cn.hutool.core.util.StrUtil;
/**
* 用于解析 RuleSourceExtData VO 用于 sql 模式中
*
@ -8,212 +10,219 @@ package com.yomahub.liteflow.parser.sql.vo;
*/
public class SQLParserVO {
/**
* 连接地址
*/
private String url;
/**
* 连接地址
*/
private String url;
/**
* 驱动
*/
private String driverClassName;
/**
* 驱动
*/
private String driverClassName;
/**
* 账号名
*/
private String username;
/**
* 账号名
*/
private String username;
/**
* 密码
*/
private String password;
/**
* 密码
*/
private String password;
/**
* 应用名
*/
private String applicationName;
/**
* 应用名
*/
private String applicationName;
/**
* chain表名
*/
private String chainTableName;
/**
* chain表名
*/
private String chainTableName;
/**
* chain表里的应用名字段
*/
private String chainApplicationNameField = "application_name";
/**
* chain表里的应用名字段
*/
private String chainApplicationNameField = "application_name";
/**
* chainName
*/
private String chainNameField = "chain_name";
/**
* chainName
*/
private String chainNameField = "chain_name";
/**
* el 表达式相关数据
*/
private String elDataField = "el_data";
/**
* el 表达式相关数据
*/
private String elDataField = "el_data";
/**
* 脚本 node 表名
*/
private String scriptTableName;
/**
* 脚本 node 表名
*/
private String scriptTableName;
/**
* script表里的应用名字段
*/
private String scriptApplicationNameField = "application_name";
/**
* script表里的应用名字段
*/
private String scriptApplicationNameField = "application_name";
/**
* 脚本 node id 字段
*/
private String scriptIdField = "script_id";
/**
* 脚本 node id 字段
*/
private String scriptIdField = "script_id";
/**
* 脚本 node name 字段
*/
private String scriptNameField = "script_name";
/**
* 脚本 node name 字段
*/
private String scriptNameField = "script_name";
/**
* 脚本 node data 字段
*/
private String scriptDataField = "script_data";
/**
* 脚本 node data 字段
*/
private String scriptDataField = "script_data";
/**
* 脚本 node type 字段
*/
private String scriptTypeField = "script_type";
/**
* 脚本 node type 字段
*/
private String scriptTypeField = "script_type";
/**
* 脚本 node language 字段
*/
private String scriptLanguageField;
/**
* 脚本 node language 字段
*/
private String scriptLanguageField;
public String getUrl() {
return url;
}
public String getUrl() {
return url;
}
public void setUrl(String url) {
this.url = url;
}
public void setUrl(String url) {
this.url = url;
}
public String getDriverClassName() {
return driverClassName;
}
public String getDriverClassName() {
return driverClassName;
}
public void setDriverClassName(String driverClassName) {
this.driverClassName = driverClassName;
}
public void setDriverClassName(String driverClassName) {
this.driverClassName = driverClassName;
}
public String getUsername() {
return username;
}
public String getUsername() {
return username;
}
public void setUsername(String username) {
this.username = username;
}
public void setUsername(String username) {
this.username = username;
}
public String getPassword() {
return password;
}
public String getPassword() {
return password;
}
public void setPassword(String password) {
this.password = password;
}
public void setPassword(String password) {
this.password = password;
}
public String getApplicationName() {
return applicationName;
}
public String getApplicationName() {
return applicationName;
}
public void setApplicationName(String applicationName) {
this.applicationName = applicationName;
}
public void setApplicationName(String applicationName) {
this.applicationName = applicationName;
}
public String getChainTableName() {
return chainTableName;
}
public String getChainTableName() {
return chainTableName;
}
public void setChainTableName(String chainTableName) {
this.chainTableName = chainTableName;
}
public void setChainTableName(String chainTableName) {
this.chainTableName = chainTableName;
}
public String getChainApplicationNameField() {
return chainApplicationNameField;
}
public String getChainApplicationNameField() {
return chainApplicationNameField;
}
public void setChainApplicationNameField(String chainApplicationNameField) {
this.chainApplicationNameField = chainApplicationNameField;
}
public void setChainApplicationNameField(String chainApplicationNameField) {
this.chainApplicationNameField = chainApplicationNameField;
}
public String getChainNameField() {
return chainNameField;
}
public String getChainNameField() {
return chainNameField;
}
public void setChainNameField(String chainNameField) {
this.chainNameField = chainNameField;
}
public void setChainNameField(String chainNameField) {
this.chainNameField = chainNameField;
}
public String getElDataField() {
return elDataField;
}
public String getElDataField() {
return elDataField;
}
public void setElDataField(String elDataField) {
this.elDataField = elDataField;
}
public void setElDataField(String elDataField) {
this.elDataField = elDataField;
}
public String getScriptTableName() {
return scriptTableName;
}
public String getScriptTableName() {
return scriptTableName;
}
public void setScriptTableName(String scriptTableName) {
this.scriptTableName = scriptTableName;
}
public void setScriptTableName(String scriptTableName) {
this.scriptTableName = scriptTableName;
}
public String getScriptApplicationNameField() {
return scriptApplicationNameField;
}
public String getScriptApplicationNameField() {
return scriptApplicationNameField;
}
public void setScriptApplicationNameField(String scriptApplicationNameField) {
this.scriptApplicationNameField = scriptApplicationNameField;
}
public void setScriptApplicationNameField(String scriptApplicationNameField) {
this.scriptApplicationNameField = scriptApplicationNameField;
}
public String getScriptIdField() {
return scriptIdField;
}
public String getScriptIdField() {
return scriptIdField;
}
public void setScriptIdField(String scriptIdField) {
this.scriptIdField = scriptIdField;
}
public void setScriptIdField(String scriptIdField) {
this.scriptIdField = scriptIdField;
}
public String getScriptNameField() {
return scriptNameField;
}
public String getScriptNameField() {
return scriptNameField;
}
public void setScriptNameField(String scriptNameField) {
this.scriptNameField = scriptNameField;
}
public void setScriptNameField(String scriptNameField) {
this.scriptNameField = scriptNameField;
}
public String getScriptDataField() {
return scriptDataField;
}
public String getScriptDataField() {
return scriptDataField;
}
public void setScriptDataField(String scriptDataField) {
this.scriptDataField = scriptDataField;
}
public void setScriptDataField(String scriptDataField) {
this.scriptDataField = scriptDataField;
}
public String getScriptTypeField() {
return scriptTypeField;
}
public String getScriptTypeField() {
return scriptTypeField;
}
public void setScriptTypeField(String scriptTypeField) {
this.scriptTypeField = scriptTypeField;
}
public void setScriptTypeField(String scriptTypeField) {
this.scriptTypeField = scriptTypeField;
}
public String getScriptLanguageField() {
return scriptLanguageField;
}
public String getScriptLanguageField() {
return scriptLanguageField;
}
public void setScriptLanguageField(String scriptLanguageField) {
this.scriptLanguageField = scriptLanguageField;
}
public void setScriptLanguageField(String scriptLanguageField) {
this.scriptLanguageField = scriptLanguageField;
}
/**
* 判断配资是否使用 IOC 已有数据源
*/
public boolean isDefaultDataSource() {
return StrUtil.isBlank(url) && StrUtil.isBlank(username) && StrUtil.isBlank(password) && StrUtil.isBlank(driverClassName);
}
}

View File

@ -47,6 +47,9 @@ public class LiteflowAutoConfiguration {
liteflowConfig.setMainExecutorClass(property.getMainExecutorClass());
liteflowConfig.setPrintExecutionLog(property.isPrintExecutionLog());
liteflowConfig.setSubstituteCmpClass(property.getSubstituteCmpClass());
liteflowConfig.setParallelMaxWorkers(property.getParallelMaxWorkers());
liteflowConfig.setParallelQueueLimit(property.getParallelQueueLimit());
liteflowConfig.setParallelLoopExecutorClass(property.getParallelLoopExecutorClass());
return liteflowConfig;
}

View File

@ -70,6 +70,15 @@ public class LiteflowProperty {
// 替补组件的class路径
private String substituteCmpClass;
//并行循环线程池类路径
private String parallelLoopExecutorClass;
//使用默认并行循环线程池时最大线程数
private Integer parallelMaxWorkers;
//使用默认并行循环线程池时最大队列数
private Integer parallelQueueLimit;
public boolean isEnable() {
return enable;
}
@ -219,4 +228,27 @@ public class LiteflowProperty {
this.ruleSourceExtData = ruleSourceExtData;
}
public String getParallelLoopExecutorClass() {
return parallelLoopExecutorClass;
}
public void setParallelLoopExecutorClass(String parallelLoopExecutorClass) {
this.parallelLoopExecutorClass = parallelLoopExecutorClass;
}
public Integer getParallelMaxWorkers() {
return parallelMaxWorkers;
}
public void setParallelMaxWorkers(Integer parallelMaxWorkers) {
this.parallelMaxWorkers = parallelMaxWorkers;
}
public Integer getParallelQueueLimit() {
return parallelQueueLimit;
}
public void setParallelQueueLimit(Integer parallelQueueLimit) {
this.parallelQueueLimit = parallelQueueLimit;
}
}

View File

@ -1,10 +1,15 @@
package com.yomahub.liteflow.spi.solon;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.ObjectUtil;
import com.yomahub.liteflow.spi.ContextAware;
import org.noear.solon.Solon;
import org.noear.solon.core.BeanWrap;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* 基于代码形式的 Solon 上下文工具类
*
@ -12,73 +17,76 @@ import org.noear.solon.core.BeanWrap;
*/
public class SolonContextAware implements ContextAware {
@Override
public <T> T getBean(String name) {
try {
return Solon.context().getBean(name);
}
catch (Exception e) {
return null;
}
}
@Override
public <T> T getBean(String name) {
try {
return Solon.context().getBean(name);
} catch (Exception e) {
return null;
}
}
@Override
public <T> T getBean(Class<T> clazz) {
try {
return Solon.context().getBean(clazz);
}
catch (Exception e) {
return null;
}
}
@Override
public <T> T getBean(Class<T> clazz) {
try {
return Solon.context().getBean(clazz);
} catch (Exception e) {
return null;
}
}
private <T> T getBean(String beanName, Class<T> clazz) {
try {
return Solon.context().getBean(beanName);
}
catch (Exception e) {
return null;
}
}
private <T> T getBean(String beanName, Class<T> clazz) {
try {
return Solon.context().getBean(beanName);
} catch (Exception e) {
return null;
}
}
@Override
public <T> T registerBean(String beanName, Class<T> c) {
BeanWrap beanWrap = new BeanWrap(Solon.context(), c, null, beanName);
Solon.context().putWrap(beanName, beanWrap);
@Override
public <T> T registerBean(String beanName, Class<T> c) {
BeanWrap beanWrap = new BeanWrap(Solon.context(), c, null, beanName);
Solon.context().putWrap(beanName, beanWrap);
return beanWrap.get();
}
return beanWrap.get();
}
@Override
public <T> T registerBean(Class<T> c) {
return registerBean(c.getName(), c);
}
@Override
public <T> T registerBean(Class<T> c) {
return registerBean(c.getName(), c);
}
@Override
public <T> T registerBean(String beanName, Object bean) {
BeanWrap beanWrap = new BeanWrap(Solon.context(), bean.getClass(), bean, beanName);
Solon.context().putWrap(beanName, beanWrap);
@Override
public <T> T registerBean(String beanName, Object bean) {
BeanWrap beanWrap = new BeanWrap(Solon.context(), bean.getClass(), bean, beanName);
Solon.context().putWrap(beanName, beanWrap);
return beanWrap.get();
}
return beanWrap.get();
}
@Override
public <T> T registerOrGet(String beanName, Class<T> clazz) {
T t = getBean(beanName, clazz);
if (ObjectUtil.isNull(t)) {
t = registerBean(beanName, clazz);
}
return t;
}
@Override
public <T> T registerOrGet(String beanName, Class<T> clazz) {
T t = getBean(beanName, clazz);
if (ObjectUtil.isNull(t)) {
t = registerBean(beanName, clazz);
}
return t;
}
@Override
public boolean hasBean(String beanName) {
return Solon.context().hasWrap(beanName);
}
@Override
public <T> Map<String, T> getBeansOfType(Class<T> type) {
List<BeanWrap> wrapsOfType = Solon.context().getWrapsOfType(type);
return CollUtil.toMap(wrapsOfType, new HashMap<String, T>(), BeanWrap::name, BeanWrap::get);
}
@Override
public int priority() {
return 1;
}
@Override
public boolean hasBean(String beanName) {
return Solon.context().hasWrap(beanName);
}
@Override
public int priority() {
return 1;
}
}

View File

@ -14,7 +14,7 @@ import com.yomahub.liteflow.spi.LiteflowComponentSupport;
public class SolonLiteflowComponentSupport implements LiteflowComponentSupport {
@Override
public String getCmpName(NodeComponent nodeComponent) {
public String getCmpName(Object nodeComponent) {
// 判断NodeComponent是否是标识了@LiteflowComponent的标注
// 如果标注了那么要从中取到name字段
LiteflowComponent liteflowComponent = nodeComponent.getClass().getAnnotation(LiteflowComponent.class);

View File

@ -80,6 +80,14 @@ public class LiteflowProperty {
// 规则文件/脚本文件变更监听
private Boolean enableMonitorFile;
private String parallelLoopExecutorClass;
//使用默认并行循环线程池时最大线程数
private Integer parallelMaxWorkers;
//使用默认并行循环线程池时最大队列数
private Integer parallelQueueLimit;
public Boolean getEnableMonitorFile() {
return enableMonitorFile;
}
@ -257,4 +265,28 @@ public class LiteflowProperty {
public void setWhenMaxWaitTimeUnit(TimeUnit whenMaxWaitTimeUnit) {
this.whenMaxWaitTimeUnit = whenMaxWaitTimeUnit;
}
public String getParallelLoopExecutorClass() {
return parallelLoopExecutorClass;
}
public void setParallelLoopExecutorClass(String parallelLoopExecutorClass) {
this.parallelLoopExecutorClass = parallelLoopExecutorClass;
}
public Integer getParallelMaxWorkers() {
return parallelMaxWorkers;
}
public void setParallelMaxWorkers(Integer parallelMaxWorkers) {
this.parallelMaxWorkers = parallelMaxWorkers;
}
public Integer getParallelQueueLimit() {
return parallelQueueLimit;
}
public void setParallelQueueLimit(Integer parallelQueueLimit) {
this.parallelQueueLimit = parallelQueueLimit;
}
}

View File

@ -48,6 +48,9 @@ public class LiteflowPropertyAutoConfiguration {
liteflowConfig.setPrintExecutionLog(property.isPrintExecutionLog());
liteflowConfig.setSubstituteCmpClass(property.getSubstituteCmpClass());
liteflowConfig.setEnableMonitorFile(property.getEnableMonitorFile());
liteflowConfig.setParallelMaxWorkers(property.getParallelMaxWorkers());
liteflowConfig.setParallelQueueLimit(property.getParallelQueueLimit());
liteflowConfig.setParallelLoopExecutorClass(property.getParallelLoopExecutorClass());
return liteflowConfig;
}

View File

@ -1,10 +1,8 @@
package com.yomahub.liteflow.spi.spring;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.ReflectUtil;
import com.yomahub.liteflow.spi.ContextAware;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.AutowireCapableBeanFactory;
import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.beans.factory.support.DefaultListableBeanFactory;
import org.springframework.beans.factory.support.GenericBeanDefinition;
@ -12,6 +10,8 @@ import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ConfigurableApplicationContext;
import java.util.Map;
/**
* 基于代码形式的spring上下文工具类
*
@ -19,83 +19,87 @@ import org.springframework.context.ConfigurableApplicationContext;
*/
public class SpringAware implements ApplicationContextAware, ContextAware {
private static ApplicationContext applicationContext = null;
private static ApplicationContext applicationContext = null;
public SpringAware() {
}
public SpringAware() {
}
@Override
public void setApplicationContext(ApplicationContext ac) throws BeansException {
applicationContext = ac;
}
@Override
public void setApplicationContext(ApplicationContext ac) throws BeansException {
applicationContext = ac;
}
public static ApplicationContext getApplicationContext() {
return applicationContext;
}
public static ApplicationContext getApplicationContext() {
return applicationContext;
}
@Override
public <T> T getBean(String name) {
T t = (T) applicationContext.getBean(name);
return t;
}
@Override
public <T> T getBean(String name) {
T t = (T) applicationContext.getBean(name);
return t;
}
@Override
public <T> T getBean(Class<T> clazz) {
T t = applicationContext.getBean(clazz);
return t;
}
@Override
public <T> Map<String, T> getBeansOfType(Class<T> type) {
return applicationContext.getBeansOfType(type);
}
private <T> T getBean(String beanName, Class<T> clazz) {
T t = applicationContext.getBean(beanName, clazz);
return t;
}
@Override
public <T> T getBean(Class<T> clazz) {
T t = applicationContext.getBean(clazz);
return t;
}
@Override
public <T> T registerBean(String beanName, Class<T> c) {
DefaultListableBeanFactory beanFactory = (DefaultListableBeanFactory) applicationContext
.getAutowireCapableBeanFactory();
BeanDefinition beanDefinition = new GenericBeanDefinition();
beanDefinition.setBeanClassName(c.getName());
beanFactory.setAllowBeanDefinitionOverriding(true);
beanFactory.registerBeanDefinition(beanName, beanDefinition);
return getBean(beanName);
}
private <T> T getBean(String beanName, Class<T> clazz) {
T t = applicationContext.getBean(beanName, clazz);
return t;
}
@Override
public <T> T registerBean(Class<T> c) {
return registerBean(c.getName(), c);
}
@Override
public <T> T registerBean(String beanName, Class<T> c) {
DefaultListableBeanFactory beanFactory = (DefaultListableBeanFactory) applicationContext
.getAutowireCapableBeanFactory();
BeanDefinition beanDefinition = new GenericBeanDefinition();
beanDefinition.setBeanClassName(c.getName());
beanFactory.setAllowBeanDefinitionOverriding(true);
beanFactory.registerBeanDefinition(beanName, beanDefinition);
return getBean(beanName);
}
@Override
public <T> T registerBean(String beanName, Object bean) {
ConfigurableApplicationContext configurableApplicationContext = (ConfigurableApplicationContext) applicationContext;
DefaultListableBeanFactory defaultListableBeanFactory = (DefaultListableBeanFactory) configurableApplicationContext
.getAutowireCapableBeanFactory();
defaultListableBeanFactory.registerSingleton(beanName, bean);
return (T) configurableApplicationContext.getBean(beanName);
}
@Override
public <T> T registerBean(Class<T> c) {
return registerBean(c.getName(), c);
}
@Override
public <T> T registerOrGet(String beanName, Class<T> clazz) {
if (ObjectUtil.isNull(applicationContext)) {
return null;
}
try {
return getBean(beanName, clazz);
}
catch (Exception e) {
return registerBean(beanName, clazz);
}
}
@Override
public <T> T registerBean(String beanName, Object bean) {
ConfigurableApplicationContext configurableApplicationContext = (ConfigurableApplicationContext) applicationContext;
DefaultListableBeanFactory defaultListableBeanFactory = (DefaultListableBeanFactory) configurableApplicationContext
.getAutowireCapableBeanFactory();
defaultListableBeanFactory.registerSingleton(beanName, bean);
return (T) configurableApplicationContext.getBean(beanName);
}
@Override
public boolean hasBean(String beanName) {
return applicationContext.containsBean(beanName);
}
@Override
public <T> T registerOrGet(String beanName, Class<T> clazz) {
if (ObjectUtil.isNull(applicationContext)) {
return null;
}
try {
return getBean(beanName, clazz);
} catch (Exception e) {
return registerBean(beanName, clazz);
}
}
@Override
public int priority() {
return 1;
}
@Override
public boolean hasBean(String beanName) {
return applicationContext.containsBean(beanName);
}
@Override
public int priority() {
return 1;
}
}

View File

@ -15,7 +15,7 @@ import com.yomahub.liteflow.spi.LiteflowComponentSupport;
public class SpringLiteflowComponentSupport implements LiteflowComponentSupport {
@Override
public String getCmpName(NodeComponent nodeComponent) {
public String getCmpName(Object nodeComponent) {
// 判断NodeComponent是否是标识了@LiteflowComponent的标注
// 如果标注了那么要从中取到name字段
LiteflowComponent liteflowComponent = nodeComponent.getClass().getAnnotation(LiteflowComponent.class);

View File

@ -10,7 +10,7 @@ import javax.annotation.Resource;
@LiteflowComponent
public class CmpConfig {
@LiteflowMethod(value = LiteFlowMethodEnum.PROCESS, nodeId = "a")
@LiteflowMethod(value = LiteFlowMethodEnum.PROCESS, nodeId = "a", nodeName = "A组件")
public void processA(NodeComponent bindCmp) {
System.out.println("ACmp executed!");
}

View File

@ -1,44 +0,0 @@
package com.yomahub.liteflow.test.monitorFile;
import cn.hutool.core.io.FileUtil;
import cn.hutool.core.io.resource.ClassPathResource;
import cn.hutool.core.util.CharsetUtil;
import com.yomahub.liteflow.core.FlowExecutor;
import com.yomahub.liteflow.flow.LiteflowResponse;
import com.yomahub.liteflow.test.BaseTest;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.springframework.test.context.junit.jupiter.SpringExtension;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.test.context.TestPropertySource;
import org.springframework.test.context.junit4.SpringRunner;
import javax.annotation.Resource;
import java.io.File;
@ExtendWith(SpringExtension.class)
@TestPropertySource(value = "classpath:/monitorFile/application.properties")
@SpringBootTest(classes = MonitorFileELDeclMultiSpringbootTest.class)
@EnableAutoConfiguration
@ComponentScan({ "com.yomahub.liteflow.test.monitorFile.cmp" })
public class MonitorFileELDeclMultiSpringbootTest extends BaseTest {
@Resource
private FlowExecutor flowExecutor;
@Test
public void testMonitor() throws Exception {
String absolutePath = new ClassPathResource("classpath:/monitorFile/flow.el.xml").getAbsolutePath();
String content = FileUtil.readUtf8String(absolutePath);
String newContent = content.replace("THEN(a, b, c);", "THEN(a, c, b);");
FileUtil.writeString(newContent, new File(absolutePath), CharsetUtil.CHARSET_UTF_8);
Thread.sleep(3000);
LiteflowResponse response = flowExecutor.execute2Resp("chain1", "arg");
Assertions.assertEquals("a==>c==>b", response.getExecuteStepStr());
}
}

View File

@ -1,28 +0,0 @@
package com.yomahub.liteflow.test.monitorFile.cmp;
import com.yomahub.liteflow.annotation.LiteflowComponent;
import com.yomahub.liteflow.annotation.LiteflowMethod;
import com.yomahub.liteflow.core.NodeComponent;
import com.yomahub.liteflow.enums.LiteFlowMethodEnum;
import java.util.Random;
@LiteflowComponent
public class CmpConfig {
@LiteflowMethod(value = LiteFlowMethodEnum.PROCESS, nodeId = "a")
public void processA(NodeComponent bindCmp) {
System.out.println("ACmp executed!");
}
@LiteflowMethod(value = LiteFlowMethodEnum.PROCESS, nodeId = "b")
public void processB(NodeComponent bindCmp) {
System.out.println("BCmp executed!");
}
@LiteflowMethod(value = LiteFlowMethodEnum.PROCESS, nodeId = "c")
public void process(NodeComponent bindCmp) {
System.out.println("BCmp executed!");
}
}

View File

@ -0,0 +1,14 @@
package com.yomahub.liteflow.test.parallelLoop;
import com.yomahub.liteflow.exception.LiteFlowException;
/**
* 用户自定义带状态码的异常
*/
public class CustomStatefulException extends LiteFlowException {
public CustomStatefulException(String code, String message) {
super(code, message);
}
}

View File

@ -0,0 +1,23 @@
package com.yomahub.liteflow.test.parallelLoop;
import cn.hutool.core.util.ObjectUtil;
import com.yomahub.liteflow.property.LiteflowConfig;
import com.yomahub.liteflow.property.LiteflowConfigGetter;
import com.yomahub.liteflow.thread.ExecutorBuilder;
import java.util.concurrent.ExecutorService;
public class CustomThreadExecutor implements ExecutorBuilder {
@Override
public ExecutorService buildExecutor() {
LiteflowConfig liteflowConfig = LiteflowConfigGetter.get();
// 只有在非spring的场景下liteflowConfig才会为null
if (ObjectUtil.isNull(liteflowConfig)) {
liteflowConfig = new LiteflowConfig();
}
return buildDefaultExecutor(liteflowConfig.getParallelMaxWorkers(), liteflowConfig.getParallelMaxWorkers(),
liteflowConfig.getParallelQueueLimit(), "customer-loop-thead-");
}
}

View File

@ -0,0 +1,119 @@
package com.yomahub.liteflow.test.parallelLoop;
import cn.hutool.core.collection.ListUtil;
import com.yomahub.liteflow.core.FlowExecutor;
import com.yomahub.liteflow.exception.LiteFlowException;
import com.yomahub.liteflow.flow.LiteflowResponse;
import com.yomahub.liteflow.slot.DefaultContext;
import com.yomahub.liteflow.test.BaseTest;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.test.context.TestPropertySource;
import org.springframework.test.context.junit.jupiter.SpringExtension;
import javax.annotation.Resource;
import java.util.List;
import java.util.regex.Pattern;
/**
* springboot环境EL异步循环测试
*
* @author zhhhhy
* @since 2.11.0
*/
@ExtendWith(SpringExtension.class)
@TestPropertySource(value = "classpath:/parallelLoop/application.properties")
@SpringBootTest(classes = ParallelLoopELDeclMultiSpringbootTest.class)
@EnableAutoConfiguration
@ComponentScan({ "com.yomahub.liteflow.test.parallelLoop.cmp" })
public class ParallelLoopELDeclMultiSpringbootTest extends BaseTest {
@Resource
private FlowExecutor flowExecutor;
//测试并行FOR循环循环次数直接在el中定义
@Test
public void testParallelLoop1() throws Exception {
LiteflowResponse response = flowExecutor.execute2Resp("chain1", "arg");
Assertions.assertTrue(response.isSuccess());
}
//测试并行FOR循环循环次数由For组件定义
@Test
public void testParallelLoop2() throws Exception {
LiteflowResponse response = flowExecutor.execute2Resp("chain2", "arg");
Assertions.assertTrue(response.isSuccess());
}
//测试并行FOR循环中的BREAK组件能够正常发挥作用
@Test
public void testParallelLoop3() throws Exception {
LiteflowResponse response = flowExecutor.execute2Resp("chain3", "arg");
Assertions.assertTrue(response.isSuccess());
}
//测试并行FOR循环中主线程是否会正常等待所有并行子项完成后再继续执行
@Test
public void testParallelLoop4() throws Exception {
LiteflowResponse response = flowExecutor.execute2Resp("chain4", "arg");
Assertions.assertTrue(response.isSuccess());
}
@Test
//测试并行FOR循环中某个并行子项抛出异常
public void testParallelLoop5() throws Exception {
LiteflowResponse response = flowExecutor.execute2Resp("chain5", "arg");
Assertions.assertFalse(response.isSuccess());
Assertions.assertEquals("300", response.getCode());
Assertions.assertNotNull(response.getCause());
Assertions.assertTrue(response.getCause() instanceof LiteFlowException);
Assertions.assertNotNull(response.getSlot());
}
//并行的条件循环
@Test
public void testParallelLoop6() throws Exception {
LiteflowResponse response = flowExecutor.execute2Resp("chain6", "arg");
Assertions.assertTrue(response.isSuccess());
}
//并行的迭代循环
@Test
public void testParallelLoop7() throws Exception {
List<String> list = ListUtil.toList("1", "2", "3", "4", "5");
LiteflowResponse response = flowExecutor.execute2Resp("chain7", list);
Assertions.assertTrue(response.isSuccess());
}
//测试并行FOR循环中的index
@Test
public void testParallelLoop8() throws Exception {
LiteflowResponse response = flowExecutor.execute2Resp("chain8", "arg");
DefaultContext context = response.getFirstContextBean();
Assertions.assertTrue(response.isSuccess());
String regex = "(?!.*(.).*\\1)[0-4]{5}"; //匹配不包含重复数字的0-4的5位数字
Pattern pattern = Pattern.compile(regex);
//e1,e2,e3分别并行执行5次因此单个循环的顺序可以是任意的
Assertions.assertTrue(pattern.matcher(context.getData("loop_e1")).matches());
Assertions.assertTrue(pattern.matcher(context.getData("loop_e2")).matches());
Assertions.assertTrue(pattern.matcher(context.getData("loop_e3")).matches());
}
//测试自定义线程池配置是否生效
@Test
public void testParallelLoop9() throws Exception {
LiteflowResponse response = flowExecutor.execute2Resp("chain9", "arg");
DefaultContext context = response.getFirstContextBean();
Assertions.assertTrue(response.isSuccess());
Assertions.assertTrue(context.getData("threadName").toString().startsWith("customer-loop-thead"));
}
}

View File

@ -0,0 +1,120 @@
package com.yomahub.liteflow.test.parallelLoop.cmp;
import cn.hutool.core.util.StrUtil;
import com.yomahub.liteflow.annotation.LiteflowComponent;
import com.yomahub.liteflow.annotation.LiteflowMethod;
import com.yomahub.liteflow.core.NodeComponent;
import com.yomahub.liteflow.enums.LiteFlowMethodEnum;
import com.yomahub.liteflow.enums.NodeTypeEnum;
import com.yomahub.liteflow.slot.DefaultContext;
import com.yomahub.liteflow.test.exception.CustomStatefulException;
import java.util.Iterator;
import java.util.List;
@LiteflowComponent
public class CmpConfig {
@LiteflowMethod(value = LiteFlowMethodEnum.PROCESS, nodeId = "a")
public void processA(NodeComponent bindCmp) {
System.out.println("ACmp executed!");
}
@LiteflowMethod(value = LiteFlowMethodEnum.PROCESS, nodeId = "b")
public void processB(NodeComponent bindCmp) {
System.out.println("BCmp executed!");
}
@LiteflowMethod(value = LiteFlowMethodEnum.PROCESS, nodeId = "c")
public void processC(NodeComponent bindCmp) {
System.out.println("CCmp executed!");
}
@LiteflowMethod(value = LiteFlowMethodEnum.PROCESS, nodeId = "d")
public void processD(NodeComponent bindCmp) {
DefaultContext context = bindCmp.getFirstContextBean();
String key = "test";
if (context.hasData(key)) {
int count = context.getData(key);
context.setData(key, ++count);
} else {
context.setData(key, 1);
}
}
@LiteflowMethod(value = LiteFlowMethodEnum.PROCESS, nodeId = "e")
public void processE(NodeComponent bindCmp) {
synchronized (this){
DefaultContext context = bindCmp.getFirstContextBean();
String key = StrUtil.format("{}_{}", "loop", bindCmp.getTag());
if (context.hasData(key)) {
String loopStr = context.getData(key);
String loopStrReturn = StrUtil.format("{}{}", loopStr, bindCmp.getLoopIndex());
context.setData(key, loopStrReturn);
} else {
context.setData(key, bindCmp.getLoopIndex().toString());
}
}
}
@LiteflowMethod(value = LiteFlowMethodEnum.PROCESS, nodeId = "f")
public void processF(NodeComponent bindCmp){
try {
System.out.println("FCmp start to sleep 5s");
Thread.sleep(5000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("FCmp executed!");
}
@LiteflowMethod(value = LiteFlowMethodEnum.PROCESS, nodeId = "g")
public void processG(NodeComponent bindCmp){
if(bindCmp.getLoopIndex()==1){
throw new CustomStatefulException("300", "chain execute custom stateful execption");
}
System.out.println("GCmp executed!");
}
@LiteflowMethod(value = LiteFlowMethodEnum.PROCESS, nodeId = "h")
public void processH(NodeComponent bindCmp){
DefaultContext context = bindCmp.getFirstContextBean();
context.setData("threadName", Thread.currentThread().getName());
System.out.println("HCmp executed!");
}
@LiteflowMethod(value = LiteFlowMethodEnum.PROCESS_ITERATOR, nodeId = "it", nodeType = NodeTypeEnum.ITERATOR)
public Iterator<?> processIT(NodeComponent bindCmp) {
List<String> list = bindCmp.getRequestData();
return list.iterator();
}
@LiteflowMethod(value = LiteFlowMethodEnum.PROCESS_FOR, nodeId = "x", nodeType = NodeTypeEnum.FOR)
public int processX(NodeComponent bindCmp) {
return 3;
}
@LiteflowMethod(value = LiteFlowMethodEnum.PROCESS_BREAK, nodeId = "y", nodeType = NodeTypeEnum.BREAK)
public boolean processY(NodeComponent bindCmp) {
DefaultContext context = bindCmp.getFirstContextBean();
int count = 0;
if(context.hasData("test")) {
count = context.getData("test");
}
return count > 3;
}
@LiteflowMethod(value = LiteFlowMethodEnum.PROCESS_WHILE, nodeId = "z", nodeType = NodeTypeEnum.WHILE)
public boolean processZ(NodeComponent bindCmp) {
DefaultContext context = bindCmp.getFirstContextBean();
String key = "test";
if (context.hasData(key)) {
int count = context.getData("test");
return count < 5;
}
else {
return true;
}
}
}

View File

@ -1,2 +0,0 @@
liteflow.rule-source=monitorFile/flow.el.xml
liteflow.enable-monitor-file=true

View File

@ -1,7 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<flow>
<chain name="chain1">
THEN(a, b, c);
</chain>
</flow>

View File

@ -0,0 +1,4 @@
liteflow.rule-source=parallelLoop/flow.xml
liteflow.parallel-max-workers = 10
liteflow.parallel-queue-limit = 1024
liteflow.parallel-loop-executor-class =com.yomahub.liteflow.test.parallelLoop.CustomThreadExecutor

View File

@ -0,0 +1,46 @@
<?xml version="1.0" encoding="UTF-8"?>
<flow>
<chain name="chain1">
FOR(2).DO(THEN(a,b,c));
</chain>
<chain name="chain2">
FOR(x).parallel(true).DO(THEN(a,b,c));
</chain>
<chain name="chain3">
FOR(100).parallel(true).DO(THEN(a,b,d)).BREAK(y);
</chain>
<chain name="chain4">
FOR(x).parallel(true).DO(THEN(a,b,f));
</chain>
<chain name="chain5">
FOR(x).parallel(true).DO(THEN(a,b,g));
</chain>
<chain name="chain6">
WHILE(z).parallel(true).DO(THEN(a,d));
</chain>
<chain name="chain7">
ITERATOR(it).parallel(true).DO(THEN(a,b));
</chain>
<chain name="chain8">
FOR(5).parallel(true).DO(
WHEN(
THEN(a,e.tag("e1")),
THEN(c,e.tag("e2")),
THEN(b,e.tag("e3"))
)
);
</chain>
<chain name="chain9">
FOR(x).parallel(true).DO(THEN(a,b,h));
</chain>
</flow>

View File

@ -1,43 +0,0 @@
package com.yomahub.liteflow.test.monitorFile;
import cn.hutool.core.io.FileUtil;
import cn.hutool.core.io.resource.ClassPathResource;
import cn.hutool.core.util.CharsetUtil;
import com.yomahub.liteflow.core.FlowExecutor;
import com.yomahub.liteflow.flow.LiteflowResponse;
import com.yomahub.liteflow.test.BaseTest;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.springframework.test.context.junit.jupiter.SpringExtension;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.test.context.TestPropertySource;
import org.springframework.test.context.junit4.SpringRunner;
import javax.annotation.Resource;
import java.io.File;
@ExtendWith(SpringExtension.class)
@TestPropertySource(value = "classpath:/monitorFile/application.properties")
@SpringBootTest(classes = MonitorFileELDeclSpringbootTest.class)
@EnableAutoConfiguration
@ComponentScan({ "com.yomahub.liteflow.test.monitorFile.cmp" })
public class MonitorFileELDeclSpringbootTest extends BaseTest {
@Resource
private FlowExecutor flowExecutor;
@Test
public void testMonitor() throws Exception {
String absolutePath = new ClassPathResource("classpath:/monitorFile/flow.el.xml").getAbsolutePath();
String content = FileUtil.readUtf8String(absolutePath);
String newContent = content.replace("THEN(a, b, c);", "THEN(a, c, b);");
FileUtil.writeString(newContent, new File(absolutePath), CharsetUtil.CHARSET_UTF_8);
Thread.sleep(3000);
LiteflowResponse response = flowExecutor.execute2Resp("chain1", "arg");
Assertions.assertEquals("a==>c==>b", response.getExecuteStepStr());
}
}

View File

@ -0,0 +1,14 @@
package com.yomahub.liteflow.test.parallelLoop;
import com.yomahub.liteflow.exception.LiteFlowException;
/**
* 用户自定义带状态码的异常
*/
public class CustomStatefulException extends LiteFlowException {
public CustomStatefulException(String code, String message) {
super(code, message);
}
}

View File

@ -0,0 +1,23 @@
package com.yomahub.liteflow.test.parallelLoop;
import cn.hutool.core.util.ObjectUtil;
import com.yomahub.liteflow.property.LiteflowConfig;
import com.yomahub.liteflow.property.LiteflowConfigGetter;
import com.yomahub.liteflow.thread.ExecutorBuilder;
import java.util.concurrent.ExecutorService;
public class CustomThreadExecutor implements ExecutorBuilder {
@Override
public ExecutorService buildExecutor() {
LiteflowConfig liteflowConfig = LiteflowConfigGetter.get();
// 只有在非spring的场景下liteflowConfig才会为null
if (ObjectUtil.isNull(liteflowConfig)) {
liteflowConfig = new LiteflowConfig();
}
return buildDefaultExecutor(liteflowConfig.getParallelMaxWorkers(), liteflowConfig.getParallelMaxWorkers(),
liteflowConfig.getParallelQueueLimit(), "customer-loop-thead-");
}
}

View File

@ -0,0 +1,118 @@
package com.yomahub.liteflow.test.parallelLoop;
import cn.hutool.core.collection.ListUtil;
import com.yomahub.liteflow.core.FlowExecutor;
import com.yomahub.liteflow.exception.LiteFlowException;
import com.yomahub.liteflow.flow.LiteflowResponse;
import com.yomahub.liteflow.slot.DefaultContext;
import com.yomahub.liteflow.test.BaseTest;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.test.context.TestPropertySource;
import org.springframework.test.context.junit.jupiter.SpringExtension;
import javax.annotation.Resource;
import java.util.List;
import java.util.regex.Pattern;
/**
* springboot环境EL异步循环测试
*
* @author zhhhhy
* @since 2.11.0
*/
@ExtendWith(SpringExtension.class)
@TestPropertySource(value = "classpath:/parallelLoop/application.properties")
@SpringBootTest(classes = ParallelLoopELDeclSpringbootTest.class)
@EnableAutoConfiguration
@ComponentScan({ "com.yomahub.liteflow.test.parallelLoop.cmp" })
public class ParallelLoopELDeclSpringbootTest extends BaseTest {
@Resource
private FlowExecutor flowExecutor;
//测试并行FOR循环循环次数直接在el中定义
@Test
public void testParallelLoop1() throws Exception {
LiteflowResponse response = flowExecutor.execute2Resp("chain1", "arg");
Assertions.assertTrue(response.isSuccess());
}
//测试并行FOR循环循环次数由For组件定义
@Test
public void testParallelLoop2() throws Exception {
LiteflowResponse response = flowExecutor.execute2Resp("chain2", "arg");
Assertions.assertTrue(response.isSuccess());
}
//测试并行FOR循环中的BREAK组件能够正常发挥作用
@Test
public void testParallelLoop3() throws Exception {
LiteflowResponse response = flowExecutor.execute2Resp("chain3", "arg");
Assertions.assertTrue(response.isSuccess());
}
//测试并行FOR循环中主线程是否会正常等待所有并行子项完成后再继续执行
@Test
public void testParallelLoop4() throws Exception {
LiteflowResponse response = flowExecutor.execute2Resp("chain4", "arg");
Assertions.assertTrue(response.isSuccess());
}
@Test
//测试并行FOR循环中某个并行子项抛出异常
public void testParallelLoop5() throws Exception {
LiteflowResponse response = flowExecutor.execute2Resp("chain5", "arg");
Assertions.assertFalse(response.isSuccess());
Assertions.assertEquals("300", response.getCode());
Assertions.assertNotNull(response.getCause());
Assertions.assertTrue(response.getCause() instanceof LiteFlowException);
Assertions.assertNotNull(response.getSlot());
}
//并行的条件循环
@Test
public void testParallelLoop6() throws Exception {
LiteflowResponse response = flowExecutor.execute2Resp("chain6", "arg");
Assertions.assertTrue(response.isSuccess());
}
//并行的迭代循环
@Test
public void testParallelLoop7() throws Exception {
List<String> list = ListUtil.toList("1", "2", "3", "4", "5");
LiteflowResponse response = flowExecutor.execute2Resp("chain7", list);
Assertions.assertTrue(response.isSuccess());
}
//测试并行FOR循环中的index
@Test
public void testParallelLoop8() throws Exception {
LiteflowResponse response = flowExecutor.execute2Resp("chain8", "arg");
DefaultContext context = response.getFirstContextBean();
Assertions.assertTrue(response.isSuccess());
String regex = "(?!.*(.).*\\1)[0-4]{5}"; //匹配不包含重复数字的0-4的5位数字
Pattern pattern = Pattern.compile(regex);
//e1,e2,e3分别并行执行5次因此单个循环的顺序可以是任意的
Assertions.assertTrue(pattern.matcher(context.getData("loop_e1")).matches());
Assertions.assertTrue(pattern.matcher(context.getData("loop_e2")).matches());
Assertions.assertTrue(pattern.matcher(context.getData("loop_e3")).matches());
}
//测试自定义线程池配置是否生效
@Test
public void testParallelLoop9() throws Exception {
LiteflowResponse response = flowExecutor.execute2Resp("chain9", "arg");
DefaultContext context = response.getFirstContextBean();
Assertions.assertTrue(response.isSuccess());
Assertions.assertTrue(context.getData("threadName").toString().startsWith("customer-loop-thead"));
}
}

View File

@ -5,15 +5,13 @@
* @email weenyc31@163.com
* @Date 2020/4/1
*/
package com.yomahub.liteflow.test.monitorFile.cmp;
package com.yomahub.liteflow.test.parallelLoop.cmp;
import com.yomahub.liteflow.annotation.LiteflowMethod;
import com.yomahub.liteflow.core.NodeComponent;
import com.yomahub.liteflow.enums.LiteFlowMethodEnum;
import org.springframework.stereotype.Component;
import java.util.Random;
@Component("a")
public class ACmp {

View File

@ -5,15 +5,13 @@
* @email weenyc31@163.com
* @Date 2020/4/1
*/
package com.yomahub.liteflow.test.monitorFile.cmp;
package com.yomahub.liteflow.test.parallelLoop.cmp;
import com.yomahub.liteflow.annotation.LiteflowMethod;
import com.yomahub.liteflow.core.NodeComponent;
import com.yomahub.liteflow.enums.LiteFlowMethodEnum;
import org.springframework.stereotype.Component;
import java.util.Random;
@Component("b")
public class BCmp {

View File

@ -5,15 +5,13 @@
* @email weenyc31@163.com
* @Date 2020/4/1
*/
package com.yomahub.liteflow.test.monitorFile.cmp;
package com.yomahub.liteflow.test.parallelLoop.cmp;
import com.yomahub.liteflow.annotation.LiteflowMethod;
import com.yomahub.liteflow.core.NodeComponent;
import com.yomahub.liteflow.enums.LiteFlowMethodEnum;
import org.springframework.stereotype.Component;
import java.util.Random;
@Component("c")
public class CCmp {

View File

@ -0,0 +1,32 @@
/**
* <p>Title: liteflow</p>
* <p>Description: 轻量级的组件式流程框架</p>
* @author Bryan.Zhang
* @email weenyc31@163.com
* @Date 2020/4/1
*/
package com.yomahub.liteflow.test.parallelLoop.cmp;
import com.yomahub.liteflow.annotation.LiteflowMethod;
import com.yomahub.liteflow.core.NodeComponent;
import com.yomahub.liteflow.enums.LiteFlowMethodEnum;
import com.yomahub.liteflow.slot.DefaultContext;
import org.springframework.stereotype.Component;
@Component("d")
public class DCmp {
@LiteflowMethod(LiteFlowMethodEnum.PROCESS)
public void process(NodeComponent bindCmp) {
DefaultContext context = bindCmp.getFirstContextBean();
String key = "test";
if (context.hasData(key)) {
int count = context.getData(key);
context.setData(key, ++count);
}
else {
context.setData(key, 1);
}
}
}

View File

@ -0,0 +1,35 @@
/**
* <p>Title: liteflow</p>
* <p>Description: 轻量级的组件式流程框架</p>
* @author Bryan.Zhang
* @email weenyc31@163.com
* @Date 2020/4/1
*/
package com.yomahub.liteflow.test.parallelLoop.cmp;
import cn.hutool.core.util.StrUtil;
import com.yomahub.liteflow.annotation.LiteflowMethod;
import com.yomahub.liteflow.core.NodeComponent;
import com.yomahub.liteflow.enums.LiteFlowMethodEnum;
import com.yomahub.liteflow.slot.DefaultContext;
import org.springframework.stereotype.Component;
@Component("e")
public class ECmp {
//注意与串行的ECmp相比,并行的ECmp的process方法必须保证线程安全
@LiteflowMethod(LiteFlowMethodEnum.PROCESS)
public synchronized void process(NodeComponent bindCmp) {
DefaultContext context = bindCmp.getFirstContextBean();
String key = StrUtil.format("{}_{}", "loop", bindCmp.getTag());
if (context.hasData(key)) {
String loopStr = context.getData(key);
String loopStrReturn = StrUtil.format("{}{}", loopStr, bindCmp.getLoopIndex());
context.setData(key, loopStrReturn);
}
else {
context.setData(key, bindCmp.getLoopIndex().toString());
}
}
}

View File

@ -0,0 +1,22 @@
package com.yomahub.liteflow.test.parallelLoop.cmp;
import com.yomahub.liteflow.annotation.LiteflowMethod;
import com.yomahub.liteflow.core.NodeComponent;
import com.yomahub.liteflow.enums.LiteFlowMethodEnum;
import org.springframework.stereotype.Component;
@Component("f")
public class FCmp {
@LiteflowMethod(LiteFlowMethodEnum.PROCESS)
public void process(NodeComponent bindCmp) {
try {
System.out.println("FCmp start to sleep 5s");
Thread.sleep(5000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("FCmp executed!");
}
}

View File

@ -0,0 +1,20 @@
package com.yomahub.liteflow.test.parallelLoop.cmp;
import com.yomahub.liteflow.annotation.LiteflowMethod;
import com.yomahub.liteflow.core.NodeComponent;
import com.yomahub.liteflow.enums.LiteFlowMethodEnum;
import com.yomahub.liteflow.test.exception.CustomStatefulException;
import org.springframework.stereotype.Component;
@Component("g")
public class GCmp {
@LiteflowMethod(LiteFlowMethodEnum.PROCESS)
public void process(NodeComponent bindCmp) {
if(bindCmp.getLoopIndex()==1){
throw new CustomStatefulException("300", "chain execute custom stateful execption");
}
System.out.println("GCmp executed!");
}
}

View File

@ -0,0 +1,21 @@
package com.yomahub.liteflow.test.parallelLoop.cmp;
import com.yomahub.liteflow.annotation.LiteflowMethod;
import com.yomahub.liteflow.core.NodeComponent;
import com.yomahub.liteflow.enums.LiteFlowMethodEnum;
import com.yomahub.liteflow.slot.DefaultContext;
import com.yomahub.liteflow.test.exception.CustomStatefulException;
import org.springframework.stereotype.Component;
@Component("h")
public class HCmp {
@LiteflowMethod(LiteFlowMethodEnum.PROCESS)
public void process(NodeComponent bindCmp) {
DefaultContext context = bindCmp.getFirstContextBean();
context.setData("threadName", Thread.currentThread().getName());
System.out.println("HCmp executed!");
}
}

View File

@ -0,0 +1,24 @@
package com.yomahub.liteflow.test.parallelLoop.cmp;
import com.yomahub.liteflow.annotation.LiteflowCmpDefine;
import com.yomahub.liteflow.annotation.LiteflowMethod;
import com.yomahub.liteflow.core.NodeComponent;
import com.yomahub.liteflow.core.NodeIteratorComponent;
import com.yomahub.liteflow.enums.LiteFlowMethodEnum;
import com.yomahub.liteflow.enums.NodeTypeEnum;
import org.springframework.stereotype.Component;
import java.util.Iterator;
import java.util.List;
@Component("it")
@LiteflowCmpDefine(NodeTypeEnum.ITERATOR)
public class ITCmp {
@LiteflowMethod(LiteFlowMethodEnum.PROCESS_ITERATOR)
public Iterator<?> processIterator(NodeComponent bindCmp) throws Exception {
List<String> list = bindCmp.getRequestData();
return list.iterator();
}
}

View File

@ -0,0 +1,17 @@
package com.yomahub.liteflow.test.parallelLoop.cmp;
import com.yomahub.liteflow.annotation.LiteflowComponent;
import com.yomahub.liteflow.annotation.LiteflowMethod;
import com.yomahub.liteflow.core.NodeComponent;
import com.yomahub.liteflow.enums.LiteFlowMethodEnum;
import com.yomahub.liteflow.enums.NodeTypeEnum;
@LiteflowComponent("x")
public class XCmp {
@LiteflowMethod(value = LiteFlowMethodEnum.PROCESS_FOR, nodeType = NodeTypeEnum.FOR)
public int processFor(NodeComponent bindCmp) throws Exception {
return 3;
}
}

View File

@ -0,0 +1,23 @@
package com.yomahub.liteflow.test.parallelLoop.cmp;
import com.yomahub.liteflow.annotation.LiteflowComponent;
import com.yomahub.liteflow.annotation.LiteflowMethod;
import com.yomahub.liteflow.core.NodeComponent;
import com.yomahub.liteflow.enums.LiteFlowMethodEnum;
import com.yomahub.liteflow.enums.NodeTypeEnum;
import com.yomahub.liteflow.slot.DefaultContext;
@LiteflowComponent("y")
public class YCmp {
@LiteflowMethod(value = LiteFlowMethodEnum.PROCESS_BREAK, nodeType = NodeTypeEnum.BREAK)
public boolean processBreak(NodeComponent bindCmp) throws Exception {
DefaultContext context = bindCmp.getFirstContextBean();
int count = 0;
if(context.hasData("test")) {
count = context.getData("test");
}
return count > 3;
}
}

View File

@ -0,0 +1,26 @@
package com.yomahub.liteflow.test.parallelLoop.cmp;
import com.yomahub.liteflow.annotation.LiteflowComponent;
import com.yomahub.liteflow.annotation.LiteflowMethod;
import com.yomahub.liteflow.core.NodeComponent;
import com.yomahub.liteflow.enums.LiteFlowMethodEnum;
import com.yomahub.liteflow.enums.NodeTypeEnum;
import com.yomahub.liteflow.slot.DefaultContext;
@LiteflowComponent("z")
public class ZCmp {
@LiteflowMethod(value = LiteFlowMethodEnum.PROCESS_WHILE, nodeType = NodeTypeEnum.WHILE)
public boolean processWhile(NodeComponent bindCmp) throws Exception {
DefaultContext context = bindCmp.getFirstContextBean();
String key = "test";
if (context.hasData(key)) {
int count = context.getData("test");
return count < 5;
}
else {
return true;
}
}
}

View File

@ -1,2 +0,0 @@
liteflow.rule-source=monitorFile/flow.el.xml
liteflow.enable-monitor-file=true

View File

@ -1,7 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<flow>
<chain name="chain1">
THEN(a, b, c);
</chain>
</flow>

View File

@ -0,0 +1,4 @@
liteflow.rule-source=parallelLoop/flow.xml
liteflow.parallel-max-workers = 10
liteflow.parallel-queue-limit = 1024
liteflow.parallel-loop-executor-class =com.yomahub.liteflow.test.parallelLoop.CustomThreadExecutor

View File

@ -0,0 +1,46 @@
<?xml version="1.0" encoding="UTF-8"?>
<flow>
<chain name="chain1">
FOR(2).parallel(true).DO(THEN(a,b,c));
</chain>
<chain name="chain2">
FOR(x).parallel(true).DO(THEN(a,b,c));
</chain>
<chain name="chain3">
FOR(100).parallel(true).DO(THEN(a,b,d)).BREAK(y);
</chain>
<chain name="chain4">
FOR(x).parallel(true).DO(THEN(a,b,f));
</chain>
<chain name="chain5">
FOR(x).parallel(true).DO(THEN(a,b,g));
</chain>
<chain name="chain6">
WHILE(z).parallel(true).DO(THEN(a,d));
</chain>
<chain name="chain7">
ITERATOR(it).parallel(true).DO(THEN(a,b));
</chain>
<chain name="chain8">
FOR(5).parallel(true).DO(
WHEN(
THEN(a,e.tag("e1")),
THEN(c,e.tag("e2")),
THEN(b,e.tag("e3"))
)
);
</chain>
<chain name="chain9">
FOR(x).parallel(true).DO(THEN(a,b,h));
</chain>
</flow>

View File

@ -1,40 +0,0 @@
package com.yomahub.liteflow.test.monitorFile;
import cn.hutool.core.io.FileUtil;
import cn.hutool.core.io.resource.ClassPathResource;
import cn.hutool.core.util.CharsetUtil;
import com.yomahub.liteflow.core.FlowExecutor;
import com.yomahub.liteflow.core.FlowExecutorHolder;
import com.yomahub.liteflow.flow.LiteflowResponse;
import com.yomahub.liteflow.property.LiteflowConfig;
import com.yomahub.liteflow.test.BaseTest;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import java.io.File;
public class LiteflowMonitorFileTest extends BaseTest {
private static FlowExecutor flowExecutor;
@BeforeAll
public static void init() {
LiteflowConfig config = new LiteflowConfig();
config.setRuleSource("monitorFile/flow.el.xml");
config.setEnableMonitorFile(true);
flowExecutor = FlowExecutorHolder.loadInstance(config);
}
@Test
public void testMonitor() throws InterruptedException {
String absolutePath = new ClassPathResource("classpath:/monitorFile/flow.el.xml").getAbsolutePath();
String content = FileUtil.readUtf8String(absolutePath);
String newContent = content.replace("THEN(a, b, c);", "THEN(a, c, b);");
FileUtil.writeString(newContent, new File(absolutePath), CharsetUtil.CHARSET_UTF_8);
Thread.sleep(3000);
LiteflowResponse response = flowExecutor.execute2Resp("chain1", "arg");
Assertions.assertEquals("a==>c==>b", response.getExecuteStepStr());
}
}

View File

@ -0,0 +1,14 @@
package com.yomahub.liteflow.test.parallelLoop;
import com.yomahub.liteflow.exception.LiteFlowException;
/**
* 用户自定义带状态码的异常
*/
public class CustomStatefulException extends LiteFlowException {
public CustomStatefulException(String code, String message) {
super(code, message);
}
}

View File

@ -0,0 +1,23 @@
package com.yomahub.liteflow.test.parallelLoop;
import cn.hutool.core.util.ObjectUtil;
import com.yomahub.liteflow.property.LiteflowConfig;
import com.yomahub.liteflow.property.LiteflowConfigGetter;
import com.yomahub.liteflow.thread.ExecutorBuilder;
import java.util.concurrent.ExecutorService;
public class CustomThreadExecutor implements ExecutorBuilder {
@Override
public ExecutorService buildExecutor() {
LiteflowConfig liteflowConfig = LiteflowConfigGetter.get();
// 只有在非spring的场景下liteflowConfig才会为null
if (ObjectUtil.isNull(liteflowConfig)) {
liteflowConfig = new LiteflowConfig();
}
return buildDefaultExecutor(liteflowConfig.getParallelMaxWorkers(), liteflowConfig.getParallelMaxWorkers(),
liteflowConfig.getParallelQueueLimit(), "customer-loop-thead-");
}
}

View File

@ -0,0 +1,120 @@
package com.yomahub.liteflow.test.parallelLoop;
import cn.hutool.core.collection.ListUtil;
import com.yomahub.liteflow.core.FlowExecutor;
import com.yomahub.liteflow.core.FlowExecutorHolder;
import com.yomahub.liteflow.exception.LiteFlowException;
import com.yomahub.liteflow.flow.LiteflowResponse;
import com.yomahub.liteflow.property.LiteflowConfig;
import com.yomahub.liteflow.slot.DefaultContext;
import com.yomahub.liteflow.test.BaseTest;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import java.util.List;
import java.util.regex.Pattern;
/**
* nospring环境EL异步循环测试
*
* @author zhhhhy
* @since 2.11.0
*/
public class ParallelLoopTest extends BaseTest {
private static FlowExecutor flowExecutor;
@BeforeAll
public static void init() {
LiteflowConfig config = new LiteflowConfig();
config.setRuleSource("parallelLoop/flow.xml");
config.setParallelMaxWorkers(10);
config.setParallelQueueLimit(1024);
config.setParallelLoopExecutorClass("com.yomahub.liteflow.test.parallelLoop.CustomThreadExecutor");
flowExecutor = FlowExecutorHolder.loadInstance(config);
}
//测试并行FOR循环循环次数直接在el中定义
@Test
public void testParallelLoop1() throws Exception {
LiteflowResponse response = flowExecutor.execute2Resp("chain1", "arg");
Assertions.assertTrue(response.isSuccess());
}
//测试并行FOR循环循环次数由For组件定义
@Test
public void testParallelLoop2() throws Exception {
LiteflowResponse response = flowExecutor.execute2Resp("chain2", "arg");
Assertions.assertTrue(response.isSuccess());
}
//测试并行FOR循环中的BREAK组件能够正常发挥作用
@Test
public void testParallelLoop3() throws Exception {
LiteflowResponse response = flowExecutor.execute2Resp("chain3", "arg");
Assertions.assertTrue(response.isSuccess());
}
//测试并行FOR循环中主线程是否会正常等待所有并行子项完成后再继续执行
@Test
public void testParallelLoop4() throws Exception {
LiteflowResponse response = flowExecutor.execute2Resp("chain4", "arg");
Assertions.assertTrue(response.isSuccess());
}
@Test
//测试并行FOR循环中某个并行子项抛出异常
public void testParallelLoop5() throws Exception {
LiteflowResponse response = flowExecutor.execute2Resp("chain5", "arg");
Assertions.assertFalse(response.isSuccess());
Assertions.assertEquals("300", response.getCode());
Assertions.assertNotNull(response.getCause());
Assertions.assertTrue(response.getCause() instanceof LiteFlowException);
Assertions.assertNotNull(response.getSlot());
}
//并行的条件循环
@Test
public void testParallelLoop6() throws Exception {
LiteflowResponse response = flowExecutor.execute2Resp("chain6", "arg");
Assertions.assertTrue(response.isSuccess());
}
//并行的迭代循环
@Test
public void testParallelLoop7() throws Exception {
List<String> list = ListUtil.toList("1", "2", "3", "4", "5");
LiteflowResponse response = flowExecutor.execute2Resp("chain7", list);
Assertions.assertTrue(response.isSuccess());
}
//测试并行FOR循环中的index
@Test
public void testParallelLoop8() throws Exception {
LiteflowResponse response = flowExecutor.execute2Resp("chain8", "arg");
DefaultContext context = response.getFirstContextBean();
Assertions.assertTrue(response.isSuccess());
String regex = "(?!.*(.).*\\1)[0-4]{5}"; //匹配不包含重复数字的0-4的5位数字
Pattern pattern = Pattern.compile(regex);
//e1,e2,e3分别并行执行5次因此单个循环的顺序可以是任意的
Assertions.assertTrue(pattern.matcher(context.getData("loop_e1")).matches());
Assertions.assertTrue(pattern.matcher(context.getData("loop_e2")).matches());
Assertions.assertTrue(pattern.matcher(context.getData("loop_e3")).matches());
}
//测试自定义线程池配置是否生效
@Test
public void testParallelLoop9() throws Exception {
LiteflowResponse response = flowExecutor.execute2Resp("chain9", "arg");
DefaultContext context = response.getFirstContextBean();
Assertions.assertTrue(response.isSuccess());
Assertions.assertTrue(context.getData("threadName").toString().startsWith("customer-loop-thead"));
}
}

View File

@ -5,11 +5,11 @@
* @email weenyc31@163.com
* @Date 2020/4/1
*/
package com.yomahub.liteflow.test.monitorFile.cmp;
package com.yomahub.liteflow.test.parallelLoop.cmp;
import com.yomahub.liteflow.core.NodeComponent;
public class ACmp extends NodeComponent {
public class ACmp extends NodeComponent{
@Override
public void process() {

View File

@ -5,11 +5,11 @@
* @email weenyc31@163.com
* @Date 2020/4/1
*/
package com.yomahub.liteflow.test.monitorFile.cmp;
package com.yomahub.liteflow.test.parallelLoop.cmp;
import com.yomahub.liteflow.core.NodeComponent;
public class BCmp extends NodeComponent {
public class BCmp extends NodeComponent{
@Override
public void process() {

View File

@ -5,11 +5,11 @@
* @email weenyc31@163.com
* @Date 2020/4/1
*/
package com.yomahub.liteflow.test.monitorFile.cmp;
package com.yomahub.liteflow.test.parallelLoop.cmp;
import com.yomahub.liteflow.core.NodeComponent;
public class CCmp extends NodeComponent {
public class CCmp extends NodeComponent{
@Override
public void process() {

View File

@ -0,0 +1,28 @@
/**
* <p>Title: liteflow</p>
* <p>Description: 轻量级的组件式流程框架</p>
* @author Bryan.Zhang
* @email weenyc31@163.com
* @Date 2020/4/1
*/
package com.yomahub.liteflow.test.parallelLoop.cmp;
import com.yomahub.liteflow.core.NodeComponent;
import com.yomahub.liteflow.slot.DefaultContext;
public class DCmp extends NodeComponent{
@Override
public void process() {
DefaultContext context = this.getFirstContextBean();
String key = "test";
if (context.hasData(key)) {
int count = context.getData(key);
context.setData(key, ++count);
}
else {
context.setData(key, 1);
}
}
}

View File

@ -0,0 +1,31 @@
/**
* <p>Title: liteflow</p>
* <p>Description: 轻量级的组件式流程框架</p>
* @author Bryan.Zhang
* @email weenyc31@163.com
* @Date 2020/4/1
*/
package com.yomahub.liteflow.test.parallelLoop.cmp;
import cn.hutool.core.util.StrUtil;
import com.yomahub.liteflow.core.NodeComponent;
import com.yomahub.liteflow.slot.DefaultContext;
public class ECmp extends NodeComponent{
//注意与串行的ECmp相比,并行的ECmp的process方法必须保证线程安全
@Override
public synchronized void process() {
DefaultContext context = this.getFirstContextBean();
String key = StrUtil.format("{}_{}", "loop", this.getTag());
if (context.hasData(key)) {
String loopStr = context.getData(key);
String loopStrReturn = StrUtil.format("{}{}", loopStr, this.getLoopIndex());
context.setData(key, loopStrReturn);
}
else {
context.setData(key, this.getLoopIndex().toString());
}
}
}

View File

@ -0,0 +1,18 @@
package com.yomahub.liteflow.test.parallelLoop.cmp;
import com.yomahub.liteflow.core.NodeComponent;
public class FCmp extends NodeComponent{
@Override
public void process() {
try {
System.out.println("FCmp start to sleep 5s");
Thread.sleep(5000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("FCmp executed!");
}
}

View File

@ -0,0 +1,16 @@
package com.yomahub.liteflow.test.parallelLoop.cmp;
import com.yomahub.liteflow.core.NodeComponent;
import com.yomahub.liteflow.test.exception.CustomStatefulException;
public class GCmp extends NodeComponent{
@Override
public void process() {
if(this.getLoopIndex()==1){
throw new CustomStatefulException("300", "chain execute custom stateful execption");
}
System.out.println("GCmp executed!");
}
}

View File

@ -0,0 +1,16 @@
package com.yomahub.liteflow.test.parallelLoop.cmp;
import com.yomahub.liteflow.core.NodeComponent;
import com.yomahub.liteflow.slot.DefaultContext;
public class HCmp extends NodeComponent{
@Override
public void process() {
DefaultContext context = this.getFirstContextBean();
context.setData("threadName", Thread.currentThread().getName());
System.out.println("HCmp executed!");
}
}

View File

@ -0,0 +1,15 @@
package com.yomahub.liteflow.test.parallelLoop.cmp;
import com.yomahub.liteflow.core.NodeIteratorComponent;
import java.util.Iterator;
import java.util.List;
public class ITCmp extends NodeIteratorComponent {
@Override
public Iterator<?> processIterator() throws Exception {
List<String> list = this.getRequestData();
return list.iterator();
}
}

View File

@ -0,0 +1,12 @@
package com.yomahub.liteflow.test.parallelLoop.cmp;
import com.yomahub.liteflow.core.NodeForComponent;
public class XCmp extends NodeForComponent {
@Override
public int processFor() throws Exception {
return 3;
}
}

View File

@ -0,0 +1,17 @@
package com.yomahub.liteflow.test.parallelLoop.cmp;
import com.yomahub.liteflow.core.NodeBreakComponent;
import com.yomahub.liteflow.slot.DefaultContext;
public class YCmp extends NodeBreakComponent {
@Override
public boolean processBreak() throws Exception {
DefaultContext context = this.getFirstContextBean();
int count = 0;
if(context.hasData("test")) {
count = context.getData("test");
}
return count > 3;
}
}

View File

@ -0,0 +1,19 @@
package com.yomahub.liteflow.test.parallelLoop.cmp;
import com.yomahub.liteflow.core.NodeWhileComponent;
import com.yomahub.liteflow.slot.DefaultContext;
public class ZCmp extends NodeWhileComponent {
@Override
public boolean processWhile() throws Exception {
DefaultContext context = this.getFirstContextBean();
String key = "test";
if (context.hasData(key)) {
int count = context.getData("test");
return count < 5;
} else {
return true;
}
}
}

View File

@ -0,0 +1,61 @@
<?xml version="1.0" encoding="UTF-8"?>
<flow>
<nodes>
<node id="a" class="com.yomahub.liteflow.test.parallelLoop.cmp.ACmp"/>
<node id="b" class="com.yomahub.liteflow.test.parallelLoop.cmp.BCmp"/>
<node id="c" class="com.yomahub.liteflow.test.parallelLoop.cmp.CCmp"/>
<node id="d" class="com.yomahub.liteflow.test.parallelLoop.cmp.DCmp"/>
<node id="e" class="com.yomahub.liteflow.test.parallelLoop.cmp.ECmp"/>
<node id="f" class="com.yomahub.liteflow.test.parallelLoop.cmp.FCmp"/>
<node id="g" class="com.yomahub.liteflow.test.parallelLoop.cmp.GCmp"/>
<node id="h" class="com.yomahub.liteflow.test.parallelLoop.cmp.HCmp"/>
<node id="it" class="com.yomahub.liteflow.test.parallelLoop.cmp.ITCmp"/>
<node id="x" class="com.yomahub.liteflow.test.parallelLoop.cmp.XCmp"/>
<node id="y" class="com.yomahub.liteflow.test.parallelLoop.cmp.YCmp"/>
<node id="z" class="com.yomahub.liteflow.test.parallelLoop.cmp.ZCmp"/>
</nodes>
<chain name="chain1">
FOR(2).DO(THEN(a,b,c));
</chain>
<chain name="chain2">
FOR(x).parallel(true).DO(THEN(a,b,c));
</chain>
<chain name="chain3">
FOR(100).parallel(true).DO(THEN(a,b,d)).BREAK(y);
</chain>
<chain name="chain4">
FOR(x).parallel(true).DO(THEN(a,b,f));
</chain>
<chain name="chain5">
FOR(x).parallel(true).DO(THEN(a,b,g));
</chain>
<chain name="chain6">
WHILE(z).parallel(true).DO(THEN(a,d));
</chain>
<chain name="chain7">
ITERATOR(it).parallel(true).DO(THEN(a,b));
</chain>
<chain name="chain8">
FOR(5).parallel(true).DO(
WHEN(
THEN(a,e.tag("e1")),
THEN(c,e.tag("e2")),
THEN(b,e.tag("e3"))
)
);
</chain>
<chain name="chain9">
FOR(x).parallel(true).DO(THEN(a,b,h));
</chain>
</flow>

View File

@ -92,4 +92,13 @@ public class LiteFlowScriptScriptbeanGroovyELTest extends BaseTest {
Assertions.assertEquals("hello", context.get("demo"));
}
//测试用构造方法的方式注入bean的场景
@Test
public void testScriptBean8() throws Exception {
LiteflowResponse response = flowExecutor.execute2Resp("chain8", "arg");
Assertions.assertTrue(response.isSuccess());
DefaultContext context = response.getFirstContextBean();
Assertions.assertEquals("hello,jordan", context.getData("demo"));
}
}

View File

@ -0,0 +1,26 @@
package com.yomahub.liteflow.test.script.groovy.scriptbean.bean;
import com.yomahub.liteflow.script.annotation.ScriptBean;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
@Component
@ScriptBean("demo5")
public class DemoBean5 {
private final DemoBean2 demoBean2;
public DemoBean5(DemoBean2 demoBean2) {
this.demoBean2 = demoBean2;
}
public String getDemoStr1() {
return "hello";
}
public String getDemoStr2(String name) {
return demoBean2.getDemoStr2(name);
}
}

View File

@ -50,6 +50,13 @@
abcCx.put("demo", str)
]]>
</node>
<node id="f" type="script" language="groovy">
<![CDATA[
def str = demo5.getDemoStr2("jordan")
defaultContext.setData("demo", str)
]]>
</node>
</nodes>
<chain name="chain1">
@ -79,4 +86,8 @@
<chain name="chain7">
THEN(a,b,c,s5);
</chain>
<chain name="chain8">
THEN(a,b,c,f);
</chain>
</flow>

View File

@ -1,36 +0,0 @@
package com.yomahub.liteflow.test.monitorFile;
import cn.hutool.core.io.FileUtil;
import cn.hutool.core.io.resource.ClassPathResource;
import cn.hutool.core.util.CharsetUtil;
import com.yomahub.liteflow.core.FlowExecutor;
import com.yomahub.liteflow.flow.LiteflowResponse;
import com.yomahub.liteflow.test.BaseTest;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.noear.solon.annotation.Inject;
import org.noear.solon.test.SolonJUnit5Extension;
import org.noear.solon.test.annotation.TestPropertySource;
import java.io.File;
@ExtendWith(SolonJUnit5Extension.class)
@TestPropertySource("classpath:/monitorFile/application.properties")
public class MonitorFileSpringbootTest extends BaseTest {
@Inject
private FlowExecutor flowExecutor;
@Test
public void testMonitor() throws Exception {
String absolutePath = new ClassPathResource("classpath:/monitorFile/flow.xml").getAbsolutePath();
String content = FileUtil.readUtf8String(absolutePath);
String newContent = content.replace("THEN(a, b, c);", "THEN(a, c, b);");
FileUtil.writeString(newContent, new File(absolutePath), CharsetUtil.CHARSET_UTF_8);
Thread.sleep(3000);
LiteflowResponse response = flowExecutor.execute2Resp("chain1", "arg");
Assertions.assertEquals("a==>c==>b", response.getExecuteStepStr());
}
}

View File

@ -0,0 +1,14 @@
package com.yomahub.liteflow.test.parallelLoop;
import com.yomahub.liteflow.exception.LiteFlowException;
/**
* 用户自定义带状态码的异常
*/
public class CustomStatefulException extends LiteFlowException {
public CustomStatefulException(String code, String message) {
super(code, message);
}
}

View File

@ -0,0 +1,23 @@
package com.yomahub.liteflow.test.parallelLoop;
import cn.hutool.core.util.ObjectUtil;
import com.yomahub.liteflow.property.LiteflowConfig;
import com.yomahub.liteflow.property.LiteflowConfigGetter;
import com.yomahub.liteflow.thread.ExecutorBuilder;
import java.util.concurrent.ExecutorService;
public class CustomThreadExecutor implements ExecutorBuilder {
@Override
public ExecutorService buildExecutor() {
LiteflowConfig liteflowConfig = LiteflowConfigGetter.get();
// 只有在非spring的场景下liteflowConfig才会为null
if (ObjectUtil.isNull(liteflowConfig)) {
liteflowConfig = new LiteflowConfig();
}
return buildDefaultExecutor(liteflowConfig.getParallelMaxWorkers(), liteflowConfig.getParallelMaxWorkers(),
liteflowConfig.getParallelQueueLimit(), "customer-loop-thead-");
}
}

View File

@ -0,0 +1,113 @@
package com.yomahub.liteflow.test.parallelLoop;
import cn.hutool.core.collection.ListUtil;
import com.yomahub.liteflow.core.FlowExecutor;
import com.yomahub.liteflow.exception.LiteFlowException;
import com.yomahub.liteflow.flow.LiteflowResponse;
import com.yomahub.liteflow.slot.DefaultContext;
import com.yomahub.liteflow.test.BaseTest;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.noear.solon.annotation.Inject;
import org.noear.solon.test.SolonJUnit5Extension;
import org.noear.solon.test.annotation.TestPropertySource;
import javax.annotation.Resource;
import java.util.List;
import java.util.regex.Pattern;
/**
* springboot环境EL异步循环测试
*
* @author zhhhhy
* @since 2.11.0
*/
@ExtendWith(SolonJUnit5Extension.class)
@TestPropertySource("classpath:/parallelLoop/application.properties")
public class ParallelLoopELSpringbootTest extends BaseTest {
@Inject
private FlowExecutor flowExecutor;
//测试并行FOR循环循环次数直接在el中定义
@Test
public void testParallelLoop1() throws Exception {
LiteflowResponse response = flowExecutor.execute2Resp("chain1", "arg");
Assertions.assertTrue(response.isSuccess());
}
//测试并行FOR循环循环次数由For组件定义
@Test
public void testParallelLoop2() throws Exception {
LiteflowResponse response = flowExecutor.execute2Resp("chain2", "arg");
Assertions.assertTrue(response.isSuccess());
}
//测试并行FOR循环中的BREAK组件能够正常发挥作用
@Test
public void testParallelLoop3() throws Exception {
LiteflowResponse response = flowExecutor.execute2Resp("chain3", "arg");
Assertions.assertTrue(response.isSuccess());
}
//测试并行FOR循环中主线程是否会正常等待所有并行子项完成后再继续执行
@Test
public void testParallelLoop4() throws Exception {
LiteflowResponse response = flowExecutor.execute2Resp("chain4", "arg");
Assertions.assertTrue(response.isSuccess());
}
@Test
//测试并行FOR循环中某个并行子项抛出异常
public void testParallelLoop5() throws Exception {
LiteflowResponse response = flowExecutor.execute2Resp("chain5", "arg");
Assertions.assertFalse(response.isSuccess());
Assertions.assertEquals("300", response.getCode());
Assertions.assertNotNull(response.getCause());
Assertions.assertTrue(response.getCause() instanceof LiteFlowException);
Assertions.assertNotNull(response.getSlot());
}
//并行的条件循环
@Test
public void testParallelLoop6() throws Exception {
LiteflowResponse response = flowExecutor.execute2Resp("chain6", "arg");
Assertions.assertTrue(response.isSuccess());
}
//并行的迭代循环
@Test
public void testParallelLoop7() throws Exception {
List<String> list = ListUtil.toList("1", "2", "3", "4", "5");
LiteflowResponse response = flowExecutor.execute2Resp("chain7", list);
Assertions.assertTrue(response.isSuccess());
}
//测试并行FOR循环中的index
@Test
public void testParallelLoop8() throws Exception {
LiteflowResponse response = flowExecutor.execute2Resp("chain8", "arg");
DefaultContext context = response.getFirstContextBean();
Assertions.assertTrue(response.isSuccess());
String regex = "(?!.*(.).*\\1)[0-4]{5}"; //匹配不包含重复数字的0-4的5位数字
Pattern pattern = Pattern.compile(regex);
//e1,e2,e3分别并行执行5次因此单个循环的顺序可以是任意的
Assertions.assertTrue(pattern.matcher(context.getData("loop_e1")).matches());
Assertions.assertTrue(pattern.matcher(context.getData("loop_e2")).matches());
Assertions.assertTrue(pattern.matcher(context.getData("loop_e3")).matches());
}
//测试自定义线程池配置是否生效
@Test
public void testParallelLoop9() throws Exception {
LiteflowResponse response = flowExecutor.execute2Resp("chain9", "arg");
DefaultContext context = response.getFirstContextBean();
Assertions.assertTrue(response.isSuccess());
Assertions.assertTrue(context.getData("threadName").toString().startsWith("customer-loop-thead"));
}
}

View File

@ -5,15 +5,13 @@
* @email weenyc31@163.com
* @Date 2020/4/1
*/
package com.yomahub.liteflow.test.monitorFile.cmp;
package com.yomahub.liteflow.test.parallelLoop.cmp;
import com.yomahub.liteflow.core.NodeComponent;
import org.noear.solon.annotation.Component;
import java.util.Random;
@Component("a")
public class ACmp extends NodeComponent {
public class ACmp extends NodeComponent{
@Override
public void process() {

View File

@ -5,15 +5,13 @@
* @email weenyc31@163.com
* @Date 2020/4/1
*/
package com.yomahub.liteflow.test.monitorFile.cmp;
package com.yomahub.liteflow.test.parallelLoop.cmp;
import com.yomahub.liteflow.core.NodeComponent;
import org.noear.solon.annotation.Component;
import java.util.Random;
@Component("b")
public class BCmp extends NodeComponent {
public class BCmp extends NodeComponent{
@Override
public void process() {

View File

@ -5,15 +5,13 @@
* @email weenyc31@163.com
* @Date 2020/4/1
*/
package com.yomahub.liteflow.test.monitorFile.cmp;
package com.yomahub.liteflow.test.parallelLoop.cmp;
import com.yomahub.liteflow.core.NodeComponent;
import org.noear.solon.annotation.Component;
import java.util.Random;
@Component("c")
public class CCmp extends NodeComponent {
public class CCmp extends NodeComponent{
@Override
public void process() {

View File

@ -0,0 +1,30 @@
/**
* <p>Title: liteflow</p>
* <p>Description: 轻量级的组件式流程框架</p>
* @author Bryan.Zhang
* @email weenyc31@163.com
* @Date 2020/4/1
*/
package com.yomahub.liteflow.test.parallelLoop.cmp;
import com.yomahub.liteflow.core.NodeComponent;
import com.yomahub.liteflow.slot.DefaultContext;
import org.noear.solon.annotation.Component;
@Component("d")
public class DCmp extends NodeComponent{
@Override
public void process() {
DefaultContext context = this.getFirstContextBean();
String key = "test";
if (context.hasData(key)) {
int count = context.getData(key);
context.setData(key, ++count);
}
else {
context.setData(key, 1);
}
}
}

View File

@ -0,0 +1,33 @@
/**
* <p>Title: liteflow</p>
* <p>Description: 轻量级的组件式流程框架</p>
* @author Bryan.Zhang
* @email weenyc31@163.com
* @Date 2020/4/1
*/
package com.yomahub.liteflow.test.parallelLoop.cmp;
import cn.hutool.core.util.StrUtil;
import com.yomahub.liteflow.core.NodeComponent;
import com.yomahub.liteflow.slot.DefaultContext;
import org.noear.solon.annotation.Component;
@Component("e")
public class ECmp extends NodeComponent{
//注意与串行的ECmp相比,并行的ECmp的process方法必须保证线程安全
@Override
public synchronized void process() {
DefaultContext context = this.getFirstContextBean();
String key = StrUtil.format("{}_{}", "loop", this.getTag());
if (context.hasData(key)) {
String loopStr = context.getData(key);
String loopStrReturn = StrUtil.format("{}{}", loopStr, this.getLoopIndex());
context.setData(key, loopStrReturn);
}
else {
context.setData(key, this.getLoopIndex().toString());
}
}
}

View File

@ -0,0 +1,20 @@
package com.yomahub.liteflow.test.parallelLoop.cmp;
import com.yomahub.liteflow.core.NodeComponent;
import org.noear.solon.annotation.Component;
@Component("f")
public class FCmp extends NodeComponent{
@Override
public void process() {
try {
System.out.println("FCmp start to sleep 5s");
Thread.sleep(5000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("FCmp executed!");
}
}

View File

@ -0,0 +1,18 @@
package com.yomahub.liteflow.test.parallelLoop.cmp;
import com.yomahub.liteflow.core.NodeComponent;
import com.yomahub.liteflow.test.exception.CustomStatefulException;
import org.noear.solon.annotation.Component;
@Component("g")
public class GCmp extends NodeComponent{
@Override
public void process() {
if(this.getLoopIndex()==1){
throw new CustomStatefulException("300", "chain execute custom stateful execption");
}
System.out.println("GCmp executed!");
}
}

Some files were not shown because too many files have changed in this diff Show More