feature #I7YYLE 实现组件降级

This commit is contained in:
Dale Lee 2023-09-24 15:58:04 +08:00
parent 510e3d77c9
commit cdbc5aca7d
13 changed files with 751 additions and 386 deletions

View File

@ -0,0 +1,27 @@
package com.yomahub.liteflow.annotation;
import com.yomahub.liteflow.enums.NodeTypeEnum;
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Inherited;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* 降级组件
* @author DaleLee
*/
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
public @interface FallbackCmp {
/**
* 节点类型
* @return NodeTypeEnum
*/
NodeTypeEnum type() default NodeTypeEnum.COMMON;
}

View File

@ -2,10 +2,16 @@ package com.yomahub.liteflow.builder.el.operator;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import com.ql.util.express.ArraySwap;
import com.ql.util.express.IExpressContext;
import com.ql.util.express.InstructionSetContext;
import com.ql.util.express.OperateData;
import com.ql.util.express.exception.QLException;
import com.yomahub.liteflow.builder.el.operator.base.BaseOperator;
import com.yomahub.liteflow.builder.el.operator.base.OperatorHelper;
import com.yomahub.liteflow.core.NodeComponent;
import com.yomahub.liteflow.flow.FlowBus;
import com.yomahub.liteflow.flow.element.FallbackNodeProxy;
import com.yomahub.liteflow.flow.element.Node;
import com.yomahub.liteflow.property.LiteflowConfig;
import com.yomahub.liteflow.property.LiteflowConfigGetter;
@ -20,38 +26,16 @@ public class NodeOperator extends BaseOperator<Node> {
@Override
public Node build(Object[] objects) throws Exception {
OperatorHelper.checkObjectSizeEqOne(objects);
OperatorHelper.checkObjectSizeEqOne(objects);
String nodeId = OperatorHelper.convert(objects[0], String.class);
if (FlowBus.containNode(nodeId)) {
// 找到对应节点
return FlowBus.getNode(nodeId);
}
else {
LiteflowConfig liteflowConfig = LiteflowConfigGetter.get();
if (StrUtil.isNotBlank(liteflowConfig.getSubstituteCmpClass())) {
Node substituteNode = FlowBus.getNodeMap()
.values()
.stream()
.filter(node -> node.getInstance()
.getClass()
.getName()
.equals(liteflowConfig.getSubstituteCmpClass()))
.findFirst()
.orElse(null);
if (ObjectUtil.isNotNull(substituteNode)) {
return substituteNode;
}
else {
String error = StrUtil.format("This node[{}] cannot be found", nodeId);
throw new QLException(error);
}
}
else {
String error = StrUtil.format("This node[{}] cannot be found, or you can configure an substitute node",
nodeId);
throw new QLException(error);
}
} else {
// 生成代理节点
return new FallbackNodeProxy(nodeId);
}
}

View File

@ -0,0 +1,30 @@
package com.yomahub.liteflow.exception;
/**
* 没有找到降级组件异常
*
* @author DaleLee
*/
public class FallbackCmpNotFoundException extends RuntimeException {
private static final long serialVersionUID = 1L;
/**
* 异常信息
*/
private String message;
public FallbackCmpNotFoundException(String message) {
this.message = message;
}
@Override
public String getMessage() {
return message;
}
public void setMessage(String message) {
this.message = message;
}
}

View File

@ -6,11 +6,14 @@
* @email weenyc31@163.com
* @Date 2020/4/1
*/
package com.yomahub.liteflow.flow;
import cn.hutool.core.collection.ListUtil;
import cn.hutool.core.map.MapUtil;
import cn.hutool.core.util.StrUtil;
import com.yomahub.liteflow.annotation.FallbackCmp;
import com.yomahub.liteflow.annotation.util.AnnoUtil;
import com.yomahub.liteflow.core.*;
import com.yomahub.liteflow.enums.FlowParserTypeEnum;
import com.yomahub.liteflow.enums.NodeTypeEnum;
@ -31,6 +34,7 @@ import com.yomahub.liteflow.spi.holder.ContextAwareHolder;
import com.yomahub.liteflow.spi.local.LocalContextAware;
import com.yomahub.liteflow.util.CopyOnWriteHashMap;
import com.yomahub.liteflow.util.LiteFlowProxyUtil;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@ -50,6 +54,8 @@ public class FlowBus {
private static final Map<String, Node> nodeMap = new CopyOnWriteHashMap<>();
private static final Map<NodeTypeEnum, Node> fallbackNodeMap = new CopyOnWriteHashMap<>();
private FlowBus() {
}
@ -83,7 +89,7 @@ public class FlowBus {
/**
* 添加已托管的节点SpringSolon 管理的节点
* */
*/
public static void addManagedNode(String nodeId, NodeComponent nodeComponent) {
// 根据class来猜测类型
NodeTypeEnum type = NodeTypeEnum.guessType(nodeComponent.getClass());
@ -92,12 +98,15 @@ public class FlowBus {
throw new NullNodeTypeException(StrUtil.format("node type is null for node[{}]", nodeId));
}
nodeMap.put(nodeId,
new Node(ComponentInitializer.loadInstance().initComponent(nodeComponent, type, nodeComponent.getName(), nodeId)));
Node node = new Node(ComponentInitializer.loadInstance()
.initComponent(nodeComponent, type, nodeComponent.getName(), nodeId));
nodeMap.put(nodeId, node);
addFallbackNode(node);
}
/**
* 添加 node
*
* @param nodeId 节点id
* @param name 节点名称
* @param type 节点类型
@ -109,6 +118,7 @@ public class FlowBus {
/**
* 添加 node
*
* @param nodeId 节点id
* @param name 节点名称
* @param nodeType 节点类型
@ -118,8 +128,7 @@ public class FlowBus {
Class<?> cmpClazz;
try {
cmpClazz = Class.forName(cmpClazzStr);
}
catch (Exception e) {
} catch (Exception e) {
throw new ComponentCannotRegisterException(e.getMessage());
}
addNode(nodeId, name, nodeType, cmpClazz, null, null);
@ -127,6 +136,7 @@ public class FlowBus {
/**
* 添加脚本 node
*
* @param nodeId 节点id
* @param name 节点名称
* @param nodeType 节点类型
@ -154,18 +164,16 @@ public class FlowBus {
Object bean = ContextAwareHolder.loadContextAware().registerBean(nodeId, cmpClazz);
if (LocalContextAware.class.isAssignableFrom(contextAware.getClass())) {
cmpInstances = LiteFlowProxyUtil.proxy2NodeComponent(bean, nodeId);
}
else {
} else {
cmpInstances = ListUtil.toList((NodeComponent) bean);
}
}
else {
} else {
// 以node方式配置本质上是为了适配无spring的环境如果有spring环境其实不用这么配置
// 这里的逻辑是判断是否能从spring上下文中取到如果没有spring则就是new instance了
// 如果是script类型的节点因为class只有一个所以也不能注册进spring上下文注册的时候需要new Instance
if (!type.isScript()) {
cmpInstances = ListUtil
.toList((NodeComponent) ContextAwareHolder.loadContextAware().registerOrGet(nodeId, cmpClazz));
cmpInstances = ListUtil.toList(
(NodeComponent) ContextAwareHolder.loadContextAware().registerOrGet(nodeId, cmpClazz));
}
// 去除null元素
cmpInstances.remove(null);
@ -176,8 +184,7 @@ public class FlowBus {
}
}
// 进行初始化component
cmpInstances = cmpInstances.stream()
.map(cmpInstance -> ComponentInitializer.loadInstance()
cmpInstances = cmpInstances.stream().map(cmpInstance -> ComponentInitializer.loadInstance()
.initComponent(cmpInstance, type, name,
cmpInstance.getNodeId() == null ? nodeId : cmpInstance.getNodeId()))
.collect(Collectors.toList());
@ -194,8 +201,7 @@ public class FlowBus {
node.setScript(script);
node.setLanguage(language);
((ScriptComponent) cmpInstance).loadScript(script, language);
}
else {
} else {
String errorMsg = StrUtil.format("script for node[{}] is empty", nodeId);
throw new ScriptLoadException(errorMsg);
}
@ -203,10 +209,10 @@ public class FlowBus {
String activeNodeId = StrUtil.isEmpty(cmpInstance.getNodeId()) ? nodeId : cmpInstance.getNodeId();
nodeMap.put(activeNodeId, node);
addFallbackNode(node);
}
}
catch (Exception e) {
} catch (Exception e) {
String error = StrUtil.format("component[{}] register error",
StrUtil.isEmpty(name) ? nodeId : StrUtil.format("{}({})", nodeId, name));
LOG.error(e.getMessage());
@ -226,9 +232,14 @@ public class FlowBus {
return chainMap;
}
public static Node getFallBackNode(NodeTypeEnum nodeType) {
return fallbackNodeMap.get(nodeType);
}
public static void cleanCache() {
chainMap.clear();
nodeMap.clear();
fallbackNodeMap.clear();
cleanScriptCache();
}
@ -236,19 +247,16 @@ public class FlowBus {
// 如果引入了脚本组件SPI则还需要清理脚本的缓存
try {
ScriptExecutorFactory.loadInstance().cleanScriptCache();
}
catch (ScriptSpiException ignored) {
} catch (ScriptSpiException ignored) {
}
}
public static void refreshFlowMetaData(FlowParserTypeEnum type, String content) throws Exception {
if (type.equals(FlowParserTypeEnum.TYPE_EL_XML)) {
new LocalXmlFlowELParser().parse(content);
}
else if (type.equals(FlowParserTypeEnum.TYPE_EL_JSON)) {
} else if (type.equals(FlowParserTypeEnum.TYPE_EL_JSON)) {
new LocalJsonFlowELParser().parse(content);
}
else if (type.equals(FlowParserTypeEnum.TYPE_EL_YML)) {
} else if (type.equals(FlowParserTypeEnum.TYPE_EL_YML)) {
new LocalYmlFlowELParser().parse(content);
}
}
@ -257,8 +265,7 @@ public class FlowBus {
if (containChain(chainId)) {
chainMap.remove(chainId);
return true;
}
else {
} else {
String errMsg = StrUtil.format("cannot find the chain[{}]", chainId);
LOG.error(errMsg);
return false;
@ -269,4 +276,18 @@ public class FlowBus {
Arrays.stream(chainIds).forEach(FlowBus::removeChain);
}
private static void addFallbackNode(Node node) {
NodeComponent nodeComponent = node.getInstance();
FallbackCmp fallbackCmp = AnnoUtil.getAnnotation(nodeComponent.getClass(), FallbackCmp.class);
if (fallbackCmp == null) {
return;
}
NodeTypeEnum nodeType = node.getType();
if (nodeType == null) {
nodeType = fallbackCmp.type();
}
fallbackNodeMap.put(nodeType, node);
}
}

View File

@ -1,10 +1,12 @@
/**
* <p>Title: liteflow</p>
* <p>Description: 轻量级的组件式流程框架</p>
*
* @author Bryan.Zhang
* @email weenyc31@163.com
* @Date 2020/4/1
*/
package com.yomahub.liteflow.flow.element;
import cn.hutool.core.collection.CollUtil;
@ -12,7 +14,6 @@ import cn.hutool.core.collection.ListUtil;
import cn.hutool.core.util.ObjectUtil;
import com.yomahub.liteflow.enums.ExecuteTypeEnum;
import com.yomahub.liteflow.exception.ChainEndException;
import com.yomahub.liteflow.flow.element.Executable;
import com.yomahub.liteflow.enums.ConditionTypeEnum;
import com.yomahub.liteflow.flow.element.condition.ConditionKey;
import com.yomahub.liteflow.slot.DataBus;
@ -28,7 +29,7 @@ import java.util.Map;
*
* @author Bryan.Zhang
*/
public abstract class Condition implements Executable{
public abstract class Condition implements Executable {
private String id;
@ -46,25 +47,27 @@ public abstract class Condition implements Executable{
@Override
public void execute(Integer slotIndex) throws Exception {
// 当前 Condition 入栈
Slot slot = DataBus.getSlot(slotIndex);
try {
slot.pushCondition(this);
executeCondition(slotIndex);
}
catch (ChainEndException e) {
} catch (ChainEndException e) {
// 这里单独catch ChainEndException是因为ChainEndException是用户自己setIsEnd抛出的异常
// 是属于正常逻辑所以会在FlowExecutor中判断这里不作为异常处理
throw e;
}
catch (Exception e) {
Slot slot = DataBus.getSlot(slotIndex);
} catch (Exception e) {
String chainId = this.getCurrChainId();
// 这里事先取到exception set到slot里为了方便finally取到exception
if (slot.isSubChain(chainId)) {
slot.setSubException(chainId, e);
}
else {
} else {
slot.setException(e);
}
throw e;
} finally {
// 当前 Condition 出栈
slot.popCondition();
}
}
@ -91,8 +94,7 @@ public abstract class Condition implements Executable{
List<Executable> list = getExecutableList(groupKey);
if (CollUtil.isEmpty(list)) {
return null;
}
else {
} else {
return list.get(0);
}
}
@ -112,8 +114,7 @@ public abstract class Condition implements Executable{
List<Executable> executableList = this.executableGroup.get(groupKey);
if (CollUtil.isEmpty(executableList)) {
this.executableGroup.put(groupKey, ListUtil.toList(executable));
}
else {
} else {
this.executableGroup.get(groupKey).add(executable);
}
}

View File

@ -0,0 +1,178 @@
package com.yomahub.liteflow.flow.element;
import cn.hutool.core.text.StrFormatter;
import cn.hutool.core.util.ObjectUtil;
import com.yomahub.liteflow.enums.ConditionTypeEnum;
import com.yomahub.liteflow.enums.NodeTypeEnum;
import com.yomahub.liteflow.exception.FallbackCmpNotFoundException;
import com.yomahub.liteflow.exception.FlowSystemException;
import com.yomahub.liteflow.flow.FlowBus;
import com.yomahub.liteflow.flow.element.condition.ConditionKey;
import com.yomahub.liteflow.flow.element.condition.ForCondition;
import com.yomahub.liteflow.flow.element.condition.IfCondition;
import com.yomahub.liteflow.flow.element.condition.IteratorCondition;
import com.yomahub.liteflow.flow.element.condition.LoopCondition;
import com.yomahub.liteflow.flow.element.condition.SwitchCondition;
import com.yomahub.liteflow.flow.element.condition.WhileCondition;
import com.yomahub.liteflow.slot.DataBus;
import com.yomahub.liteflow.slot.Slot;
public class FallbackNodeProxy extends Node {
private String originalNodeId;
private Node fallbackNode;`
public FallbackNodeProxy() {
}
public FallbackNodeProxy(String originalNodeId) {
this.originalNodeId = originalNodeId;
}
@Override
public void execute(Integer slotIndex) throws Exception {
loadFallBackNode(slotIndex);
this.fallbackNode.setCurrChainId(this.getCurrChainId());
this.fallbackNode.execute(slotIndex);
}
private void loadFallBackNode(Integer slotIndex) throws Exception {
if (ObjectUtil.isNotNull(this.fallbackNode)) {
// 已经加载过了
return;
}
Slot slot = DataBus.getSlot(slotIndex);
Condition curCondition = slot.getCurrentCondition();
if (ObjectUtil.isNotNull(curCondition)) {
throw new FlowSystemException("The current executing condition could not be found.");
}
Node node = findFallbackNode(curCondition);
if (ObjectUtil.isNull(node)) {
throw new FallbackCmpNotFoundException(
StrFormatter.format("No fallback component found for \"{}\" in {}.",
this.originalNodeId, this.getCurrChainId()));
}
// 使用 node 的副本
this.fallbackNode = node.copy();
}
private Node findFallbackNode(Condition condition) {
ConditionTypeEnum conditionType = condition.getConditionType();
switch (conditionType) {
case TYPE_THEN:
case TYPE_WHEN:
case TYPE_PRE:
case TYPE_FINALLY:
case TYPE_CATCH:
return FlowBus.getFallBackNode(NodeTypeEnum.COMMON);
case TYPE_IF:
return findNodeInIf((IfCondition) condition);
case TYPE_SWITCH:
return findNodeInSwitch((SwitchCondition) condition);
case TYPE_FOR:
return findNodeInFor((ForCondition) condition);
case TYPE_WHILE:
return findNodeInWhile((WhileCondition) condition);
case TYPE_ITERATOR:
return findNodeInIterator((IteratorCondition) condition);
case TYPE_NOT_OPT:
case TYPE_AND_OR_OPT:
return FlowBus.getFallBackNode(NodeTypeEnum.IF);
default:
return null;
}
}
private Node findNodeInIf(IfCondition ifCondition) {
Executable ifItem = ifCondition.getIfItem();
if (ifItem == this) {
// 需要条件组件
return FlowBus.getFallBackNode(NodeTypeEnum.IF);
}
// 需要普通组件
return FlowBus.getFallBackNode(NodeTypeEnum.COMMON);
}
private Node findNodeInSwitch(SwitchCondition switchCondition) {
Node switchNode = switchCondition.getSwitchNode();
if (switchNode == this) {
return FlowBus.getFallBackNode(NodeTypeEnum.SWITCH);
}
return FlowBus.getFallBackNode(NodeTypeEnum.COMMON);
}
private Node findNodeInFor(ForCondition forCondition) {
Node forNode = forCondition.getForNode();
if (forNode == this) {
return FlowBus.getFallBackNode(NodeTypeEnum.FOR);
}
return findNodeInLoop(forCondition);
}
private Node findNodeInWhile(WhileCondition whileCondition) {
Executable whileItem = whileCondition.getWhileItem();
if (whileItem == this) {
return FlowBus.getFallBackNode(NodeTypeEnum.WHILE);
}
Executable breakItem = whileCondition.getExecutableOne(ConditionKey.BREAK_KEY);
if (breakItem == this) {
return FlowBus.getFallBackNode(NodeTypeEnum.BREAK);
}
return findNodeInLoop(whileCondition);
}
private Node findNodeInLoop(LoopCondition loopCondition) {
Executable breakItem = loopCondition.getExecutableOne(ConditionKey.BREAK_KEY);
if (breakItem == this) {
return FlowBus.getFallBackNode(NodeTypeEnum.BREAK);
}
return FlowBus.getFallBackNode(NodeTypeEnum.COMMON);
}
private Node findNodeInIterator(IteratorCondition iteratorCondition) {
Node iteratorNode = iteratorCondition.getIteratorNode();
if (iteratorNode == this) {
return FlowBus.getFallBackNode(NodeTypeEnum.ITERATOR);
}
return FlowBus.getFallBackNode(NodeTypeEnum.COMMON);
}
@Override
public <T> T getItemResultMetaValue(Integer slotIndex) {
return this.fallbackNode.getItemResultMetaValue(slotIndex);
}
@Override
public boolean isAccess(Integer slotIndex) throws Exception {
// WHEN 可能会先访问这个方法所以在这里就要加载降级节点
loadFallBackNode(slotIndex);
return this.fallbackNode.isAccess(slotIndex);
}
@Override
public String getId() {
return this.fallbackNode.getId();
}
@Override
public Node copy() {
// 代理节点不复制
return this;
}
public String getOriginalNodeId() {
return originalNodeId;
}
public void setOriginalNodeId(String originalNodeId) {
this.originalNodeId = originalNodeId;
}
}

View File

@ -13,6 +13,7 @@ import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import com.yomahub.liteflow.exception.NoSuchContextBeanException;
import com.yomahub.liteflow.exception.NullParamException;
import com.yomahub.liteflow.flow.element.Condition;
import com.yomahub.liteflow.flow.entity.CmpStep;
import com.yomahub.liteflow.flow.id.IdGeneratorHolder;
import com.yomahub.liteflow.log.LFLog;
@ -26,7 +27,6 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;
/**
* Slot的抽象类实现
@ -89,6 +89,8 @@ public class Slot {
private List<Object> contextBeanList;
private final Deque<Condition> conditionStack = new ConcurrentLinkedDeque<>();
public Slot() {
}
@ -288,6 +290,18 @@ public class Slot {
return getThreadMetaData(ITERATOR_PREFIX + key);
}
public Condition getCurrentCondition() {
return this.conditionStack.peek();
}
public void pushCondition(Condition condition) {
this.conditionStack.push(condition);
}
public void popCondition() {
this.conditionStack.pop();
}
/**
* @deprecated 请使用 {@link #setChainId(String)}
*/

View File

@ -0,0 +1,30 @@
package com.yomahub.liteflow.test.fallback;
import com.yomahub.liteflow.core.FlowExecutor;
import com.yomahub.liteflow.test.execute2Future.Executor2FutureELSpringbootTest;
import org.junit.jupiter.api.Test;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.test.context.TestPropertySource;
import javax.annotation.Resource;
@TestPropertySource(value = "classpath:/fallback/application.properties")
@SpringBootTest(classes = FallbackSpringbootTest.class)
@EnableAutoConfiguration
@ComponentScan({ "com.yomahub.liteflow.test.fallback.cmp" })
public class FallbackSpringbootTest {
@Resource
private FlowExecutor flowExecutor;
@Test
public void test1() {
flowExecutor.execute2Resp("chain1");
}
@Test
public void test2() {
flowExecutor.execute2Resp("chain2");
}
}

View File

@ -0,0 +1,21 @@
/**
* <p>Title: liteflow</p>
* <p>Description: 轻量级的组件式流程框架</p>
* @author Bryan.Zhang
* @email weenyc31@163.com
* @Date 2020/4/1
*/
package com.yomahub.liteflow.test.fallback.cmp;
import com.yomahub.liteflow.annotation.LiteflowComponent;
import com.yomahub.liteflow.core.NodeComponent;
@LiteflowComponent("a")
public class ACmp extends NodeComponent {
@Override
public void process() {
System.out.println("ACmp executed!");
}
}

View File

@ -0,0 +1,24 @@
/**
* <p>Title: liteflow</p>
* <p>Description: 轻量级的组件式流程框架</p>
* @author Bryan.Zhang
* @email weenyc31@163.com
* @Date 2020/4/1
*/
package com.yomahub.liteflow.test.fallback.cmp;
import com.yomahub.liteflow.annotation.LiteflowComponent;
import com.yomahub.liteflow.core.NodeComponent;
import com.yomahub.liteflow.test.customNodes.domain.DemoDomain;
import javax.annotation.Resource;
@LiteflowComponent("b")
public class BCmp extends NodeComponent {
@Override
public void process() {
System.out.println("BCmp executed!");
}
}

View File

@ -0,0 +1,23 @@
/**
* <p>Title: liteflow</p>
* <p>Description: 轻量级的组件式流程框架</p>
* @author Bryan.Zhang
* @email weenyc31@163.com
* @Date 2020/4/1
*/
package com.yomahub.liteflow.test.fallback.cmp;
import com.yomahub.liteflow.annotation.FallbackCmp;
import com.yomahub.liteflow.core.NodeComponent;
import org.springframework.stereotype.Component;
@Component("c")
@FallbackCmp
public class CCmp extends NodeComponent {
@Override
public void process() {
System.out.println("CCmp executed!");
}
}

View File

@ -0,0 +1 @@
liteflow.rule-source=fallback/flow.el.xml

View File

@ -0,0 +1,11 @@
<?xml version="1.0" encoding="UTF-8"?>
<flow>
<chain name="chain1">
THEN(a, node("d"));
</chain>
<chain name="chain2">
THEN(a, WHEN(b,node("d")));
</chain>
</flow>