!269 enhancement #I61D1N解析增加 enable 逻辑

Merge pull request !269 from 与或非/issues/I61D1N-v2
This commit is contained in:
铂赛东 2024-03-24 08:59:33 +00:00 committed by Gitee
commit 8ba6748c62
No known key found for this signature in database
GPG Key ID: 173E9B9CA92EEF8F
25 changed files with 1197 additions and 944 deletions

View File

@ -135,7 +135,9 @@ public class LiteFlowNodeBuilder {
}
public LiteFlowNodeBuilder setLanguage(String language) {
this.node.setLanguage(language);
if (StrUtil.isNotBlank(language)){
this.node.setLanguage(language);
}
return this;
}

View File

@ -28,6 +28,8 @@ public interface ChainConstant {
String NAME = "name";
String ENABLE = "enable";
String LANGUAGE = "language";
String VALUE = "value";

View File

@ -59,6 +59,10 @@ public class NodeConvertHelper {
nodeSimpleVO.setLanguage(matchItemList.get(3));
}
if (matchItemList.size() > 4) {
nodeSimpleVO.setEnable(Boolean.TRUE.toString().equalsIgnoreCase(matchItemList.get(4)));
}
return nodeSimpleVO;
}
@ -73,6 +77,10 @@ public class NodeConvertHelper {
private String language;
private Boolean enable = Boolean.TRUE;
private String script;
public String getNodeId() {
return nodeId;
}
@ -104,5 +112,21 @@ public class NodeConvertHelper {
public void setLanguage(String language) {
this.language = language;
}
public Boolean getEnable() {
return enable;
}
public void setEnable(Boolean enable) {
this.enable = enable;
}
public String getScript() {
return script;
}
public void setScript(String script) {
this.script = script;
}
}
}

View File

@ -35,394 +35,436 @@ import static com.yomahub.liteflow.common.ChainConstant.*;
*/
public class ParserHelper {
/**
* 私有化构造器
*/
private ParserHelper() {
}
/**
* 私有化构造器
*/
private ParserHelper() {
}
/**
* 构建 node
* @param nodePropBean 构建 node 的中间属性
*/
public static void buildNode(NodePropBean nodePropBean) {
String id = nodePropBean.getId();
String name = nodePropBean.getName();
String clazz = nodePropBean.getClazz();
String script = nodePropBean.getScript();
String type = nodePropBean.getType();
String file = nodePropBean.getFile();
String language = nodePropBean.getLanguage();
/**
* 构建 node
* @param nodePropBean 构建 node 的中间属性
*/
public static void buildNode(NodePropBean nodePropBean) {
String id = nodePropBean.getId();
String name = nodePropBean.getName();
String clazz = nodePropBean.getClazz();
String script = nodePropBean.getScript();
String type = nodePropBean.getType();
String file = nodePropBean.getFile();
String language = nodePropBean.getLanguage();
// clazz有值的基本都不是脚本节点
// 脚本节点都必须配置type
// 非脚本节点的先尝试自动推断类型
if (StrUtil.isNotBlank(clazz)) {
try {
// 先尝试从继承的类型中推断
Class<?> c = Class.forName(clazz);
NodeTypeEnum nodeType = NodeTypeEnum.guessType(c);
if (nodeType != null) {
type = nodeType.getCode();
}
}
catch (Exception e) {
throw new NodeClassNotFoundException(StrUtil.format("cannot find the node[{}]", clazz));
}
}
// clazz有值的基本都不是脚本节点
// 脚本节点都必须配置type
// 非脚本节点的先尝试自动推断类型
if (StrUtil.isNotBlank(clazz)) {
try {
// 先尝试从继承的类型中推断
Class<?> c = Class.forName(clazz);
NodeTypeEnum nodeType = NodeTypeEnum.guessType(c);
if (nodeType != null) {
type = nodeType.getCode();
}
}
catch (Exception e) {
throw new NodeClassNotFoundException(StrUtil.format("cannot find the node[{}]", clazz));
}
}
// 因为脚本节点是必须设置type的所以到这里type就全都有了所以进行二次检查
if (StrUtil.isBlank(type)) {
throw new NodeTypeCanNotGuessException(StrUtil.format("cannot guess the type of node[{}]", clazz));
}
// 因为脚本节点是必须设置type的所以到这里type就全都有了所以进行二次检查
if (StrUtil.isBlank(type)) {
throw new NodeTypeCanNotGuessException(StrUtil.format("cannot guess the type of node[{}]", clazz));
}
// 检查nodeType是不是规定的类型
NodeTypeEnum nodeTypeEnum = NodeTypeEnum.getEnumByCode(type);
if (ObjectUtil.isNull(nodeTypeEnum)) {
throw new NodeTypeNotSupportException(StrUtil.format("type [{}] is not support", type));
}
// 检查nodeType是不是规定的类型
NodeTypeEnum nodeTypeEnum = NodeTypeEnum.getEnumByCode(type);
if (ObjectUtil.isNull(nodeTypeEnum)) {
throw new NodeTypeNotSupportException(StrUtil.format("type [{}] is not support", type));
}
// 进行node的build过程
LiteFlowNodeBuilder.createNode()
.setId(id)
.setName(name)
.setClazz(clazz)
.setType(nodeTypeEnum)
.setScript(script)
.setFile(file)
.setLanguage(language)
.build();
}
// 进行node的build过程
LiteFlowNodeBuilder.createNode()
.setId(id)
.setName(name)
.setClazz(clazz)
.setType(nodeTypeEnum)
.setScript(script)
.setFile(file)
.setLanguage(language)
.build();
}
/**
* xml 形式的主要解析过程
* @param documentList documentList
*/
/**
* xml 形式的主要解析过程
* @param documentList documentList
*/
public static void parseNodeDocument(List<Document> documentList) {
for (Document document : documentList) {
Element rootElement = document.getRootElement();
Element nodesElement = rootElement.element(NODES);
// 当存在<nodes>节点定义时解析node节点
if (ObjectUtil.isNotNull(nodesElement)) {
List<Element> nodeList = nodesElement.elements(NODE);
String id, name, clazz, type, script, file, language;
for (Element e : nodeList) {
id = e.attributeValue(ID);
name = e.attributeValue(NAME);
clazz = e.attributeValue(_CLASS);
type = e.attributeValue(TYPE);
script = e.getText();
file = e.attributeValue(FILE);
language = e.attributeValue(LANGUAGE);
/**
* xml 形式的主要解析过程
* @param documentList documentList
*/
/**
* xml 形式的主要解析过程
* @param documentList documentList
*/
public static void parseNodeDocument(List<Document> documentList) {
for (Document document : documentList) {
Element rootElement = document.getRootElement();
Element nodesElement = rootElement.element(NODES);
// 当存在<nodes>节点定义时解析node节点
if (ObjectUtil.isNotNull(nodesElement)) {
List<Element> nodeList = nodesElement.elements(NODE);
String id, name, clazz, type, script, file, language;
for (Element e : nodeList) {
id = e.attributeValue(ID);
name = e.attributeValue(NAME);
clazz = e.attributeValue(_CLASS);
type = e.attributeValue(TYPE);
script = e.getText();
file = e.attributeValue(FILE);
language = e.attributeValue(LANGUAGE);
// 构建 node
NodePropBean nodePropBean = new NodePropBean().setId(id)
.setName(name)
.setClazz(clazz)
.setScript(script)
.setType(type)
.setFile(file)
.setLanguage(language);
if (!getEnableByElement(e)) {
continue;
}
ParserHelper.buildNode(nodePropBean);
}
}
}
}
// 构建 node
NodePropBean nodePropBean = new NodePropBean().setId(id)
.setName(name)
.setClazz(clazz)
.setScript(script)
.setType(type)
.setFile(file)
.setLanguage(language);
public static void parseChainDocument(List<Document> documentList, Set<String> chainIdSet,
Consumer<Element> parseOneChainConsumer) {
//用于存放抽象chain的map
Map<String,Element> abstratChainMap = new HashMap<>();
//用于存放已经解析过的实现chain
Set<Element> implChainSet = new HashSet<>();
// 先在元数据里放上chain
// 先放有一个好处可以在parse的时候先映射到FlowBus的chainMap然后再去解析
// 这样就不用去像之前的版本那样回归调用
// 同时也解决了不能循环依赖的问题
documentList.forEach(document -> {
// 解析chain节点
List<Element> chainList = document.getRootElement().elements(CHAIN);
ParserHelper.buildNode(nodePropBean);
}
}
}
}
// 先在元数据里放上chain
chainList.forEach(e -> {
// 校验加载的 chainName 是否有重复的
// TODO 这里是否有个问题当混合格式加载的时候2个同名的Chain在不同的文件里就不行了
String chainId = Optional.ofNullable(e.attributeValue(ID)).orElse(e.attributeValue(NAME));
// 检查 chainName
checkChainId(chainId, e.getText());
if (!chainIdSet.add(chainId)) {
throw new ChainDuplicateException(StrUtil.format("[chain name duplicate] chainName={}", chainId));
}
public static void parseChainDocument(List<Document> documentList, Set<String> chainIdSet,
Consumer<Element> parseOneChainConsumer) {
//用于存放抽象chain的map
Map<String,Element> abstratChainMap = new HashMap<>();
//用于存放已经解析过的实现chain
Set<Element> implChainSet = new HashSet<>();
// 先在元数据里放上chain
// 先放有一个好处可以在parse的时候先映射到FlowBus的chainMap然后再去解析
// 这样就不用去像之前的版本那样回归调用
// 同时也解决了不能循环依赖的问题
documentList.forEach(document -> {
// 解析chain节点
List<Element> chainList = document.getRootElement().elements(CHAIN);
FlowBus.addChain(chainId);
if(ElRegexUtil.isAbstractChain(e.getText())){
abstratChainMap.put(chainId,e);
//如果是抽象chain则向其中添加一个AbstractCondition,用于标记这个chain为抽象chain
Chain chain = FlowBus.getChain(chainId);
chain.getConditionList().add(new AbstractCondition());
}
});
});
// 清空
chainIdSet.clear();
// 先在元数据里放上chain
for (Element e : chainList) {
// 校验加载的 chainName 是否有重复的
// TODO 这里是否有个问题当混合格式加载的时候2个同名的Chain在不同的文件里就不行了
String chainId = Optional.ofNullable(e.attributeValue(ID)).orElse(e.attributeValue(NAME));
// 检查 chainName
checkChainId(chainId, e.getText());
if (!chainIdSet.add(chainId)) {
throw new ChainDuplicateException(StrUtil.format("[chain name duplicate] chainName={}", chainId));
}
// 如果是禁用就不解析了
if (!getEnableByElement(e)) {
continue;
}
// 解析每一个chain
for (Document document : documentList) {
Element rootElement = document.getRootElement();
List<Element> chainList = rootElement.elements(CHAIN);
for(Element chain:chainList){
//首先需要对继承自抽象Chain的chain进行字符串替换
parseImplChain(abstratChainMap, implChainSet, chain);
//如果一个chain不为抽象chain则进行解析
String chainName = Optional.ofNullable(chain.attributeValue(ID)).orElse(chain.attributeValue(NAME));
if(!abstratChainMap.containsKey(chainName)){
parseOneChainConsumer.accept(chain);
}
}
}
}
FlowBus.addChain(chainId);
if(ElRegexUtil.isAbstractChain(e.getText())){
abstratChainMap.put(chainId,e);
//如果是抽象chain则向其中添加一个AbstractCondition,用于标记这个chain为抽象chain
Chain chain = FlowBus.getChain(chainId);
chain.getConditionList().add(new AbstractCondition());
}
};
});
// 清空
chainIdSet.clear();
public static void parseNodeJson(List<JsonNode> flowJsonObjectList) {
for (JsonNode flowJsonNode : flowJsonObjectList) {
// 当存在<nodes>节点定义时解析node节点
if (flowJsonNode.get(FLOW).has(NODES)) {
Iterator<JsonNode> nodeIterator = flowJsonNode.get(FLOW).get(NODES).get(NODE).elements();
String id, name, clazz, script, type, file;
while ((nodeIterator.hasNext())) {
JsonNode nodeObject = nodeIterator.next();
id = nodeObject.get(ID).textValue();
name = nodeObject.hasNonNull(NAME) ? nodeObject.get(NAME).textValue() : "";
clazz = nodeObject.hasNonNull(_CLASS) ? nodeObject.get(_CLASS).textValue() : "";
;
type = nodeObject.hasNonNull(TYPE) ? nodeObject.get(TYPE).textValue() : null;
script = nodeObject.hasNonNull(VALUE) ? nodeObject.get(VALUE).textValue() : "";
file = nodeObject.hasNonNull(FILE) ? nodeObject.get(FILE).textValue() : "";
// 解析每一个chain
for (Document document : documentList) {
Element rootElement = document.getRootElement();
List<Element> chainList = rootElement.elements(CHAIN);
for(Element chain:chainList){
// 如果是禁用就不解析了
if (!getEnableByElement(chain)) {
continue;
}
// 构建 node
NodePropBean nodePropBean = new NodePropBean().setId(id)
.setName(name)
.setClazz(clazz)
.setScript(script)
.setType(type)
.setFile(file);
//首先需要对继承自抽象Chain的chain进行字符串替换
parseImplChain(abstratChainMap, implChainSet, chain);
//如果一个chain不为抽象chain则进行解析
String chainName = Optional.ofNullable(chain.attributeValue(ID)).orElse(chain.attributeValue(NAME));
if(!abstratChainMap.containsKey(chainName)){
parseOneChainConsumer.accept(chain);
}
}
}
}
ParserHelper.buildNode(nodePropBean);
}
}
}
}
public static void parseNodeJson(List<JsonNode> flowJsonObjectList) {
for (JsonNode flowJsonNode : flowJsonObjectList) {
// 当存在<nodes>节点定义时解析node节点
if (flowJsonNode.get(FLOW).has(NODES)) {
Iterator<JsonNode> nodeIterator = flowJsonNode.get(FLOW).get(NODES).get(NODE).elements();
String id, name, clazz, script, type, file;
while ((nodeIterator.hasNext())) {
JsonNode nodeObject = nodeIterator.next();
id = nodeObject.get(ID).textValue();
name = nodeObject.hasNonNull(NAME) ? nodeObject.get(NAME).textValue() : "";
clazz = nodeObject.hasNonNull(_CLASS) ? nodeObject.get(_CLASS).textValue() : "";
type = nodeObject.hasNonNull(TYPE) ? nodeObject.get(TYPE).textValue() : null;
script = nodeObject.hasNonNull(VALUE) ? nodeObject.get(VALUE).textValue() : "";
file = nodeObject.hasNonNull(FILE) ? nodeObject.get(FILE).textValue() : "";
public static void parseChainJson(List<JsonNode> flowJsonObjectList, Set<String> chainIdSet,
Consumer<JsonNode> parseOneChainConsumer) {
//用于存放抽象chain的map
Map<String,JsonNode> abstratChainMap = new HashMap<>();
//用于存放已经解析过的实现chain
Set<JsonNode> implChainSet = new HashSet<>();
// 先在元数据里放上chain
// 先放有一个好处可以在parse的时候先映射到FlowBus的chainMap然后再去解析
// 这样就不用去像之前的版本那样回归调用
// 同时也解决了不能循环依赖的问题
flowJsonObjectList.forEach(jsonObject -> {
// 解析chain节点
Iterator<JsonNode> iterator = jsonObject.get(FLOW).get(CHAIN).elements();
// 先在元数据里放上chain
while (iterator.hasNext()) {
JsonNode innerJsonObject = iterator.next();
// 校验加载的 chainName 是否有重复的
// TODO 这里是否有个问题当混合格式加载的时候2个同名的Chain在不同的文件里就不行了
JsonNode chainNameJsonNode = Optional.ofNullable(innerJsonObject.get(ID))
.orElse(innerJsonObject.get(NAME));
String chainId = Optional.ofNullable(chainNameJsonNode).map(JsonNode::textValue).orElse(null);
// 检查 chainName
checkChainId(chainId, innerJsonObject.toString());
if (!chainIdSet.add(chainId)) {
throw new ChainDuplicateException(String.format("[chain id duplicate] chainId=%s", chainId));
}
// 如果是禁用的就不编译了
if (!getEnableByJsonNode(nodeObject)) {
continue;
}
FlowBus.addChain(chainId);
if(ElRegexUtil.isAbstractChain(innerJsonObject.get(VALUE).textValue())){
abstratChainMap.put(chainId,innerJsonObject);
//如果是抽象chain则向其中添加一个AbstractCondition,用于标记这个chain为抽象chain
Chain chain = FlowBus.getChain(chainId);
chain.getConditionList().add(new AbstractCondition());
}
}
});
// 清空
chainIdSet.clear();
// 构建 node
NodePropBean nodePropBean = new NodePropBean().setId(id)
.setName(name)
.setClazz(clazz)
.setScript(script)
.setType(type)
.setFile(file);
for (JsonNode flowJsonNode : flowJsonObjectList) {
// 解析每一个chain
Iterator<JsonNode> chainIterator = flowJsonNode.get(FLOW).get(CHAIN).elements();
while (chainIterator.hasNext()) {
JsonNode chainNode = chainIterator.next();
//首先需要对继承自抽象Chain的chain进行字符串替换
parseImplChain(abstratChainMap, implChainSet, chainNode);
//如果一个chain不为抽象chain则进行解析
JsonNode chainNameJsonNode = Optional.ofNullable(chainNode.get(ID)).orElse(chainNode.get(NAME));
String chainId = Optional.ofNullable(chainNameJsonNode).map(JsonNode::textValue).orElse(null);
if(!abstratChainMap.containsKey(chainId)){
parseOneChainConsumer.accept(chainNode);
}
}
}
}
ParserHelper.buildNode(nodePropBean);
}
}
}
}
/**
* 解析一个chain的过程
* @param chainNode chain 节点
*/
public static void parseOneChainEl(JsonNode chainNode) {
// 构建chainBuilder
String chainId = Optional.ofNullable(chainNode.get(ID)).orElse(chainNode.get(NAME)).textValue();
public static void parseChainJson(List<JsonNode> flowJsonObjectList, Set<String> chainIdSet,
Consumer<JsonNode> parseOneChainConsumer) {
//用于存放抽象chain的map
Map<String,JsonNode> abstratChainMap = new HashMap<>();
//用于存放已经解析过的实现chain
Set<JsonNode> implChainSet = new HashSet<>();
// 先在元数据里放上chain
// 先放有一个好处可以在parse的时候先映射到FlowBus的chainMap然后再去解析
// 这样就不用去像之前的版本那样回归调用
// 同时也解决了不能循环依赖的问题
flowJsonObjectList.forEach(jsonObject -> {
// 解析chain节点
Iterator<JsonNode> iterator = jsonObject.get(FLOW).get(CHAIN).elements();
// 先在元数据里放上chain
while (iterator.hasNext()) {
JsonNode innerJsonObject = iterator.next();
// 校验加载的 chainName 是否有重复的
// TODO 这里是否有个问题当混合格式加载的时候2个同名的Chain在不同的文件里就不行了
JsonNode chainNameJsonNode = Optional.ofNullable(innerJsonObject.get(ID))
.orElse(innerJsonObject.get(NAME));
String chainId = Optional.ofNullable(chainNameJsonNode).map(JsonNode::textValue).orElse(null);
// 检查 chainName
checkChainId(chainId, innerJsonObject.toString());
if (!chainIdSet.add(chainId)) {
throw new ChainDuplicateException(String.format("[chain id duplicate] chainId=%s", chainId));
}
JsonNode routeJsonNode = chainNode.get(ROUTE);
// 如果是禁用就不解析了
if (!getEnableByJsonNode(innerJsonObject)) {
continue;
}
LiteFlowChainELBuilder builder = LiteFlowChainELBuilder.createChain().setChainId(chainId);
FlowBus.addChain(chainId);
if(ElRegexUtil.isAbstractChain(innerJsonObject.get(VALUE).textValue())){
abstratChainMap.put(chainId,innerJsonObject);
//如果是抽象chain则向其中添加一个AbstractCondition,用于标记这个chain为抽象chain
Chain chain = FlowBus.getChain(chainId);
chain.getConditionList().add(new AbstractCondition());
}
}
});
// 清空
chainIdSet.clear();
// 如果有route这个标签说明是决策表chain
// 决策表链路必须有route和body这两个标签
if (routeJsonNode != null){
builder.setRoute(routeJsonNode.textValue());
for (JsonNode flowJsonNode : flowJsonObjectList) {
// 解析每一个chain
Iterator<JsonNode> chainIterator = flowJsonNode.get(FLOW).get(CHAIN).elements();
while (chainIterator.hasNext()) {
JsonNode chainNode = chainIterator.next();
// 如果是禁用就不解析了
if (!getEnableByJsonNode(chainNode)) {
continue;
}
JsonNode bodyJsonNode = chainNode.get(BODY);
if (bodyJsonNode == null){
String errMsg = StrUtil.format("If you have defined the field route, then you must define the field body in chain[{}]", chainId);
throw new FlowSystemException(errMsg);
}
builder.setEL(bodyJsonNode.textValue());
}else{
builder.setEL(chainNode.get(VALUE).textValue());
}
//首先需要对继承自抽象Chain的chain进行字符串替换
parseImplChain(abstratChainMap, implChainSet, chainNode);
//如果一个chain不为抽象chain则进行解析
JsonNode chainNameJsonNode = Optional.ofNullable(chainNode.get(ID)).orElse(chainNode.get(NAME));
String chainId = Optional.ofNullable(chainNameJsonNode).map(JsonNode::textValue).orElse(null);
if(!abstratChainMap.containsKey(chainId)){
parseOneChainConsumer.accept(chainNode);
}
}
}
}
builder.build();
}
/**
* 解析一个chain的过程
* @param chainNode chain 节点
*/
public static void parseOneChainEl(JsonNode chainNode) {
// 构建chainBuilder
String chainId = Optional.ofNullable(chainNode.get(ID)).orElse(chainNode.get(NAME)).textValue();
/**
* 解析一个chain的过程
* @param e chain 节点
*/
public static void parseOneChainEl(Element e) {
// 构建chainBuilder
String chainId = Optional.ofNullable(e.attributeValue(ID)).orElse(e.attributeValue(NAME));
JsonNode routeJsonNode = chainNode.get(ROUTE);
Element routeElement = e.element(ROUTE);
LiteFlowChainELBuilder builder = LiteFlowChainELBuilder.createChain().setChainId(chainId);
LiteFlowChainELBuilder builder = LiteFlowChainELBuilder.createChain().setChainId(chainId);
// 如果有route这个标签说明是决策表chain
// 决策表链路必须有route和body这两个标签
if (routeJsonNode != null){
builder.setRoute(routeJsonNode.textValue());
// 如果有route这个标签说明是决策表chain
// 决策表链路必须有route和body这两个标签
if (routeElement != null){
builder.setRoute(ElRegexUtil.removeComments(routeElement.getText()));
JsonNode bodyJsonNode = chainNode.get(BODY);
if (bodyJsonNode == null){
String errMsg = StrUtil.format("If you have defined the field route, then you must define the field body in chain[{}]", chainId);
throw new FlowSystemException(errMsg);
}
builder.setEL(bodyJsonNode.textValue());
}else{
builder.setEL(chainNode.get(VALUE).textValue());
}
Element bodyElement = e.element(BODY);
if (bodyElement == null){
String errMsg = StrUtil.format("If you have defined the tag <route>, then you must define the tag <body> in chain[{}]", chainId);
throw new FlowSystemException(errMsg);
}
builder.setEL(ElRegexUtil.removeComments(bodyElement.getText()));
}else{
builder.setEL(ElRegexUtil.removeComments(e.getText()));
}
builder.build();
}
builder.build();
}
/**
* 解析一个chain的过程
* @param e chain 节点
*/
public static void parseOneChainEl(Element e) {
// 构建chainBuilder
String chainId = Optional.ofNullable(e.attributeValue(ID)).orElse(e.attributeValue(NAME));
/**
* 检查 chainId
* @param chainId chainId
* @param elData elData
*/
private static void checkChainId(String chainId, String elData) {
if (StrUtil.isBlank(chainId)) {
throw new ParseException("missing chain id in expression \r\n" + elData);
}
}
Element routeElement = e.element(ROUTE);
/**
* 解析一个带继承关系的Chain,xml格式
* @param chain 实现Chain
* @param abstratChainMap 所有的抽象Chain
* @param implChainSet 已经解析过的实现Chain
*/
private static void parseImplChain(Map<String, Element> abstratChainMap, Set<Element> implChainSet, Element chain) {
if(ObjectUtil.isNotNull(chain.attributeValue(EXTENDS))){
String baseChainId = chain.attributeValue(EXTENDS);
Element baseChain = abstratChainMap.get(baseChainId);
if(baseChain!=null) {
internalParseImplChain(baseChain,chain,abstratChainMap,implChainSet);
}else{
throw new ChainNotFoundException(StrUtil.format("[abstract chain not found] chainName={}", baseChainId));
}
}
}
LiteFlowChainELBuilder builder = LiteFlowChainELBuilder.createChain().setChainId(chainId);
/**
* 解析一个带继承关系的Chain,json格式
* @param chainNode 实现Chain
* @param abstratChainMap 所有的抽象Chain
* @param implChainSet 已经解析过的实现Chain
*/
private static void parseImplChain(Map<String, JsonNode> abstratChainMap, Set<JsonNode> implChainSet, JsonNode chainNode) {
if(chainNode.hasNonNull(EXTENDS)){
String baseChainId = chainNode.get(EXTENDS).textValue();
JsonNode baseChain= abstratChainMap.get(baseChainId);
if(baseChain!=null) {
internalParseImplChain(baseChain,chainNode,abstratChainMap,implChainSet);
}else{
throw new ChainNotFoundException(StrUtil.format("[abstract chain not found] chainName={}", baseChainId));
}
}
}
// 如果有route这个标签说明是决策表chain
// 决策表链路必须有route和body这两个标签
if (routeElement != null){
builder.setRoute(ElRegexUtil.removeComments(routeElement.getText()));
/**
* 解析一个继承自baseChain的implChain,xml格式
* @param baseChain 父Chain
* @param implChain 实现Chain
* @param abstractChainMap 所有的抽象Chain
* @param implChainSet 已经解析过的实现Chain
*/
private static void internalParseImplChain(JsonNode baseChain,JsonNode implChain,Map<String,JsonNode> abstractChainMap,Set<JsonNode> implChainSet) {
//如果已经解析过了就不再解析
if(implChainSet.contains(implChain)) return;
//如果baseChainId也是继承自其他的chain需要递归解析
parseImplChain(abstractChainMap, implChainSet, baseChain);
//否则根据baseChainId解析implChainId
String implChainEl = implChain.get(VALUE).textValue();
String baseChainEl = baseChain.get(VALUE).textValue();
//替换baseChainId中的implChainId
// 使用正则表达式匹配占位符并替换
String parsedEl = ElRegexUtil.replaceAbstractChain(baseChainEl,implChainEl);
ObjectNode objectNode = (ObjectNode) implChain;
objectNode.put(VALUE,parsedEl);
implChainSet.add(implChain);
}
Element bodyElement = e.element(BODY);
if (bodyElement == null){
String errMsg = StrUtil.format("If you have defined the tag <route>, then you must define the tag <body> in chain[{}]", chainId);
throw new FlowSystemException(errMsg);
}
builder.setEL(ElRegexUtil.removeComments(bodyElement.getText()));
}else{
builder.setEL(ElRegexUtil.removeComments(e.getText()));
}
/**
* 解析一个继承自baseChain的implChain,json格式
* @param baseChain 父Chain
* @param implChain 实现Chain
* @param abstractChainMap 所有的抽象Chain
* @param implChainSet 已经解析过的实现Chain
*/
private static void internalParseImplChain(Element baseChain,Element implChain,Map<String,Element> abstractChainMap,Set<Element> implChainSet) {
//如果已经解析过了就不再解析
if(implChainSet.contains(implChain)) return;
//如果baseChainId也是继承自其他的chain需要递归解析
parseImplChain(abstractChainMap, implChainSet, baseChain);
//否则根据baseChainId解析implChainId
String implChainEl = implChain.getText();
String baseChainEl = baseChain.getText();
//替换baseChainId中的implChainId
// 使用正则表达式匹配占位符并替换
String parsedEl = ElRegexUtil.replaceAbstractChain(baseChainEl,implChainEl);
implChain.setText(parsedEl);
implChainSet.add(implChain);
}
builder.build();
}
/**
* 检查 chainId
* @param chainId chainId
* @param elData elData
*/
private static void checkChainId(String chainId, String elData) {
if (StrUtil.isBlank(chainId)) {
throw new ParseException("missing chain id in expression \r\n" + elData);
}
}
/**
* 解析一个带继承关系的Chain,xml格式
* @param chain 实现Chain
* @param abstratChainMap 所有的抽象Chain
* @param implChainSet 已经解析过的实现Chain
*/
private static void parseImplChain(Map<String, Element> abstratChainMap, Set<Element> implChainSet, Element chain) {
if(ObjectUtil.isNotNull(chain.attributeValue(EXTENDS))){
String baseChainId = chain.attributeValue(EXTENDS);
Element baseChain = abstratChainMap.get(baseChainId);
if(baseChain!=null) {
internalParseImplChain(baseChain,chain,abstratChainMap,implChainSet);
}else{
throw new ChainNotFoundException(StrUtil.format("[abstract chain not found] chainName={}", baseChainId));
}
}
}
/**
* 解析一个带继承关系的Chain,json格式
* @param chainNode 实现Chain
* @param abstratChainMap 所有的抽象Chain
* @param implChainSet 已经解析过的实现Chain
*/
private static void parseImplChain(Map<String, JsonNode> abstratChainMap, Set<JsonNode> implChainSet, JsonNode chainNode) {
if(chainNode.hasNonNull(EXTENDS)){
String baseChainId = chainNode.get(EXTENDS).textValue();
JsonNode baseChain= abstratChainMap.get(baseChainId);
if(baseChain!=null) {
internalParseImplChain(baseChain,chainNode,abstratChainMap,implChainSet);
}else{
throw new ChainNotFoundException(StrUtil.format("[abstract chain not found] chainName={}", baseChainId));
}
}
}
/**
* 解析一个继承自baseChain的implChain,xml格式
* @param baseChain 父Chain
* @param implChain 实现Chain
* @param abstractChainMap 所有的抽象Chain
* @param implChainSet 已经解析过的实现Chain
*/
private static void internalParseImplChain(JsonNode baseChain,JsonNode implChain,Map<String,JsonNode> abstractChainMap,Set<JsonNode> implChainSet) {
//如果已经解析过了就不再解析
if(implChainSet.contains(implChain)) return;
//如果baseChainId也是继承自其他的chain需要递归解析
parseImplChain(abstractChainMap, implChainSet, baseChain);
//否则根据baseChainId解析implChainId
String implChainEl = implChain.get(VALUE).textValue();
String baseChainEl = baseChain.get(VALUE).textValue();
//替换baseChainId中的implChainId
// 使用正则表达式匹配占位符并替换
String parsedEl = ElRegexUtil.replaceAbstractChain(baseChainEl,implChainEl);
ObjectNode objectNode = (ObjectNode) implChain;
objectNode.put(VALUE,parsedEl);
implChainSet.add(implChain);
}
/**
* 解析一个继承自baseChain的implChain,json格式
* @param baseChain 父Chain
* @param implChain 实现Chain
* @param abstractChainMap 所有的抽象Chain
* @param implChainSet 已经解析过的实现Chain
*/
private static void internalParseImplChain(Element baseChain,Element implChain,Map<String,Element> abstractChainMap,Set<Element> implChainSet) {
//如果已经解析过了就不再解析
if(implChainSet.contains(implChain)) return;
//如果baseChainId也是继承自其他的chain需要递归解析
parseImplChain(abstractChainMap, implChainSet, baseChain);
//否则根据baseChainId解析implChainId
String implChainEl = implChain.getText();
String baseChainEl = baseChain.getText();
//替换baseChainId中的implChainId
// 使用正则表达式匹配占位符并替换
String parsedEl = ElRegexUtil.replaceAbstractChain(baseChainEl,implChainEl);
implChain.setText(parsedEl);
implChainSet.add(implChain);
}
private static Boolean getEnableByElement(Element element) {
String enableStr = element.attributeValue(ENABLE);
if (StrUtil.isBlank(enableStr)) {
return true;
}
return Boolean.TRUE.toString().equalsIgnoreCase(enableStr);
}
private static Boolean getEnableByJsonNode(JsonNode nodeObject) {
String enableStr = nodeObject.hasNonNull(ENABLE) ? nodeObject.get(ENABLE).toString() : "";
if (StrUtil.isBlank(enableStr)) {
return true;
}
return Boolean.TRUE.toString().equalsIgnoreCase(enableStr);
}
}

View File

@ -0,0 +1,108 @@
package com.yomahub.liteflow.util;
import cn.hutool.core.lang.Pair;
import cn.hutool.core.text.StrPool;
import cn.hutool.core.util.StrUtil;
import com.yomahub.liteflow.parser.helper.NodeConvertHelper;
/**
* 插件通用工具类
*
* @author gaibu
*/
public class RuleParsePluginUtil {
private static final String CHAIN_XML_PATTERN = "<chain id=\"{}\" enable=\"{}\">{}</chain>";
private static final String NODE_ITEM_XML_PATTERN = "<node id=\"{}\" name=\"{}\" type=\"{}\" enable=\"{}\"><![CDATA[{}]]></node>";
private static final String NODE_ITEM_WITH_LANGUAGE_XML_PATTERN = "<node id=\"{}\" name=\"{}\" type=\"{}\" language=\"{}\" enable=\"{}\"><![CDATA[{}]]></node>";
public static ChainDto parseChainKey(String chainKey) {
String[] chainProp = chainKey.split(StrPool.COLON);
if (chainProp.length == 2) {
return new ChainDto(chainProp[0], chainProp[1]);
}
return new ChainDto(chainKey);
}
public static String toScriptXml(NodeConvertHelper.NodeSimpleVO simpleVO) {
if (StrUtil.isNotBlank(simpleVO.getLanguage())) {
return StrUtil.format(NODE_ITEM_WITH_LANGUAGE_XML_PATTERN,
simpleVO.getNodeId(),
simpleVO.getName(),
simpleVO.getType(),
simpleVO.getLanguage(),
simpleVO.getEnable(),
simpleVO.getScript()
);
} else {
return StrUtil.format(NODE_ITEM_XML_PATTERN,
simpleVO.getNodeId(),
simpleVO.getName(),
simpleVO.getType(),
simpleVO.getEnable(),
simpleVO.getScript()
);
}
}
public static Pair<Boolean/*启停*/, String/*id*/> parseIdKey(String idKey) {
String[] idProp = idKey.split(StrPool.COLON);
if (idProp.length == 2) {
String id = idProp[0];
String enableStr = idProp[1];
return new Pair<>(Boolean.TRUE.toString().equalsIgnoreCase(enableStr), id);
}
return new Pair<>(Boolean.TRUE, idKey);
}
public static class ChainDto {
/**
* chain id
*/
private String id;
/**
* chain 启用状态默认启用
*/
private String enable = Boolean.TRUE.toString();
public boolean isDisable() {
return !isEnable();
}
public boolean isEnable() {
return Boolean.TRUE.toString().equalsIgnoreCase(enable);
}
public ChainDto(String chainId) {
ChainDto chainDto = new ChainDto(chainId, null);
this.enable = chainDto.getEnable();
this.id = chainDto.getId();
}
public ChainDto(String chainId, String enable) {
this.id = chainId;
if (StrUtil.isBlank(enable)) {
this.enable = Boolean.TRUE.toString();
return;
}
if (Boolean.TRUE.toString().equalsIgnoreCase(enable)) {
this.enable = Boolean.TRUE.toString();
return;
}
this.enable = Boolean.FALSE.toString();
}
public String getId() {
return id;
}
public String getEnable() {
return enable;
}
public String toElXml(String elContent) {
return StrUtil.format(CHAIN_XML_PATTERN, id, enable, elContent);
}
}
}

View File

@ -2,8 +2,8 @@ package com.yomahub.liteflow.parser.apollo.util;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.lang.Pair;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.ReUtil;
import cn.hutool.core.util.StrUtil;
import com.ctrip.framework.apollo.Config;
import com.ctrip.framework.apollo.ConfigService;
@ -17,6 +17,7 @@ import com.yomahub.liteflow.parser.apollo.exception.ApolloException;
import com.yomahub.liteflow.parser.apollo.vo.ApolloParserConfigVO;
import com.yomahub.liteflow.parser.helper.NodeConvertHelper;
import com.yomahub.liteflow.spi.holder.ContextAwareHolder;
import com.yomahub.liteflow.util.RuleParsePluginUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -35,12 +36,8 @@ public class ApolloParseHelper {
private static final Logger LOG = LoggerFactory.getLogger(ApolloParseHelper.class);
private final String CHAIN_XML_PATTERN = "<chain name=\"{}\">{}</chain>";
private final String NODE_XML_PATTERN = "<nodes>{}</nodes>";
private final String NODE_ITEM_XML_PATTERN = "<node id=\"{}\" name=\"{}\" type=\"{}\"><![CDATA[{}]]></node>";
private final String XML_PATTERN = "<?xml version=\"1.0\" encoding=\"UTF-8\"?><flow>{}{}</flow>";
private final ApolloParserConfigVO apolloParserConfigVO;
@ -81,7 +78,7 @@ public class ApolloParseHelper {
// 1. handle chain
Set<String> propertyNames = chainConfig.getPropertyNames();
List<String> chainItemContentList = propertyNames.stream()
.map(item -> StrUtil.format(CHAIN_XML_PATTERN, item, chainConfig.getProperty(item, StrUtil.EMPTY)))
.map(item -> RuleParsePluginUtil.parseChainKey(item).toElXml(chainConfig.getProperty(item, StrUtil.EMPTY)))
.collect(Collectors.toList());
// merge all chain content
String chainAllContent = CollUtil.join(chainItemContentList, StrUtil.EMPTY);
@ -95,8 +92,7 @@ public class ApolloParseHelper {
List<String> scriptItemContentList = scriptNamespaces.stream()
.map(item -> convert(item, scriptConfig.getProperty(item, StrUtil.EMPTY)))
.filter(Objects::nonNull)
.map(item -> StrUtil.format(NODE_ITEM_XML_PATTERN, item.getNodeId(), item.getName(), item.getType(),
item.getScript()))
.map(RuleParsePluginUtil::toScriptXml)
.collect(Collectors.toList());
scriptAllContent = StrUtil.format(NODE_XML_PATTERN,
@ -118,16 +114,25 @@ public class ApolloParseHelper {
ConfigChange configChange = changeEvent.getChange(changeKey);
String newValue = configChange.getNewValue();
PropertyChangeType changeType = configChange.getChangeType();
Pair<Boolean/*启停*/, String/*id*/> pair = RuleParsePluginUtil.parseIdKey(changeKey);
String id = pair.getValue();
switch (changeType) {
case ADDED:
case MODIFIED:
LOG.info("starting reload flow config... {} key={} value={},", changeType.name(), changeKey,
newValue);
LiteFlowChainELBuilder.createChain().setChainId(changeKey).setEL(newValue).build();
// 如果是启用就正常更新
if (pair.getKey()) {
LiteFlowChainELBuilder.createChain().setChainId(id).setEL(newValue).build();
}
// 如果是禁用就删除
else {
FlowBus.removeChain(id);
}
break;
case DELETED:
LOG.info("starting reload flow config... delete key={}", changeKey);
FlowBus.removeChain(changeKey);
FlowBus.removeChain(id);
break;
default:
}
@ -142,7 +147,7 @@ public class ApolloParseHelper {
if (DELETED.equals(changeType)) {
newValue = null;
}
NodeSimpleVO nodeSimpleVO = convert(changeKey, newValue);
NodeConvertHelper.NodeSimpleVO nodeSimpleVO = convert(changeKey, newValue);
if (Objects.isNull(nodeSimpleVO)) {
// key不符合规范的时候直接忽略
LOG.error("key={} is not a valid node config, ignore it", changeKey);
@ -154,12 +159,20 @@ public class ApolloParseHelper {
LOG.info("starting reload flow config... {} key={} value={},", changeType.name(), changeKey,
newValue);
LiteFlowNodeBuilder.createScriptNode()
.setId(nodeSimpleVO.getNodeId())
.setType(NodeTypeEnum.getEnumByCode(nodeSimpleVO.getType()))
.setName(nodeSimpleVO.getName())
.setScript(nodeSimpleVO.getScript())
.build();
// 启用就正常更新
if (nodeSimpleVO.getEnable()) {
LiteFlowNodeBuilder.createScriptNode()
.setId(nodeSimpleVO.getNodeId())
.setType(NodeTypeEnum.getEnumByCode(nodeSimpleVO.getType()))
.setName(nodeSimpleVO.getName())
.setScript(nodeSimpleVO.getScript())
.setLanguage(nodeSimpleVO.getLanguage())
.build();
}
// 禁用就删除
else {
FlowBus.getNodeMap().remove(nodeSimpleVO.getNodeId());
}
break;
case DELETED:
LOG.info("starting reload flow config... delete key={}", changeKey);
@ -171,72 +184,13 @@ public class ApolloParseHelper {
}
}
private NodeSimpleVO convert(String key, String value) {
// 不需要去理解这串正则就是一个匹配冒号的
// 一定得是a:b或是a:b:c...这种完整类型的字符串的
List<String> matchItemList = ReUtil.findAllGroup0("(?<=[^:]:)[^:]+|[^:]+(?=:[^:])", key);
if (CollUtil.isEmpty(matchItemList)) {
return null;
}
NodeSimpleVO nodeSimpleVO = new NodeSimpleVO();
if (matchItemList.size() > 1) {
nodeSimpleVO.setNodeId(matchItemList.get(0));
nodeSimpleVO.setType(matchItemList.get(1));
}
if (matchItemList.size() > 2) {
nodeSimpleVO.setName(matchItemList.get(2));
}
private NodeConvertHelper.NodeSimpleVO convert(String key, String value) {
NodeConvertHelper.NodeSimpleVO nodeSimpleVO = NodeConvertHelper.convert(key);
// set script
nodeSimpleVO.setScript(value);
return nodeSimpleVO;
}
private static class NodeSimpleVO {
private String nodeId;
private String type;
private String name = StrUtil.EMPTY;
private String script;
public String getNodeId() {
return nodeId;
}
public void setNodeId(String nodeId) {
this.nodeId = nodeId;
}
public String getType() {
return type;
}
public void setType(String type) {
this.type = type;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getScript() {
return script;
}
public void setScript(String script) {
this.script = script;
}
}
}

View File

@ -1,10 +1,9 @@
package com.yomahub.liteflow.parser.etcd.util;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.io.file.FileNameUtil;
import cn.hutool.core.lang.Pair;
import cn.hutool.core.util.CharsetUtil;
import cn.hutool.core.util.ReUtil;
import cn.hutool.core.util.StrUtil;
import com.yomahub.liteflow.builder.LiteFlowNodeBuilder;
import com.yomahub.liteflow.builder.el.LiteFlowChainELBuilder;
@ -13,7 +12,9 @@ import com.yomahub.liteflow.flow.FlowBus;
import com.yomahub.liteflow.parser.etcd.EtcdClient;
import com.yomahub.liteflow.parser.etcd.exception.EtcdException;
import com.yomahub.liteflow.parser.etcd.vo.EtcdParserVO;
import com.yomahub.liteflow.parser.helper.NodeConvertHelper;
import com.yomahub.liteflow.spi.holder.ContextAwareHolder;
import com.yomahub.liteflow.util.RuleParsePluginUtil;
import io.etcd.jetcd.ByteSequence;
import io.etcd.jetcd.Client;
import io.etcd.jetcd.ClientBuilder;
@ -31,200 +32,159 @@ import java.util.stream.Collectors;
*/
public class EtcdParserHelper {
private static final Logger LOG = LoggerFactory.getLogger(EtcdParserHelper.class);
private static final Logger LOG = LoggerFactory.getLogger(EtcdParserHelper.class);
private final String CHAIN_XML_PATTERN = "<chain name=\"{}\">{}</chain>";
private final String NODE_XML_PATTERN = "<nodes>{}</nodes>";
private final String NODE_XML_PATTERN = "<nodes>{}</nodes>";
private final String XML_PATTERN = "<?xml version=\"1.0\" encoding=\"UTF-8\"?><flow>{}{}</flow>";
private final String NODE_ITEM_XML_PATTERN = "<node id=\"{}\" name=\"{}\" type=\"{}\"><![CDATA[{}]]></node>";
private static final String SEPARATOR = "/";
private final String XML_PATTERN = "<?xml version=\"1.0\" encoding=\"UTF-8\"?><flow>{}{}</flow>";
private final EtcdParserVO etcdParserVO;
private static final String SEPARATOR = "/";
private EtcdClient client;
private final EtcdParserVO etcdParserVO;
public EtcdParserHelper(EtcdParserVO etcdParserVO) {
this.etcdParserVO = etcdParserVO;
private EtcdClient client;
try {
try {
this.client = ContextAwareHolder.loadContextAware().getBean(EtcdClient.class);
} catch (Exception ignored) {
}
if (this.client == null) {
ClientBuilder clientBuilder = Client.builder().endpoints(etcdParserVO.getEndpoints().split(","));
if (StrUtil.isNotBlank(etcdParserVO.getNamespace())) {
clientBuilder.namespace(ByteSequence.from(etcdParserVO.getNamespace(), CharsetUtil.CHARSET_UTF_8));
}
if (StrUtil.isAllNotBlank(etcdParserVO.getUser(), etcdParserVO.getPassword())) {
clientBuilder.user(ByteSequence.from(etcdParserVO.getUser(), CharsetUtil.CHARSET_UTF_8));
clientBuilder.password(ByteSequence.from(etcdParserVO.getPassword(), CharsetUtil.CHARSET_UTF_8));
}
this.client = new EtcdClient(clientBuilder.build());
}
} catch (Exception e) {
throw new EtcdException(e.getMessage());
}
}
public EtcdParserHelper(EtcdParserVO etcdParserVO) {
this.etcdParserVO = etcdParserVO;
public String getContent() {
try {
// 检查chainPath路径下有没有子节点
List<String> chainNameList = client.getChildrenKeys(etcdParserVO.getChainPath(), SEPARATOR);
try {
try {
this.client = ContextAwareHolder.loadContextAware().getBean(EtcdClient.class);
}
catch (Exception ignored) {
}
if (this.client == null) {
ClientBuilder clientBuilder = Client.builder().endpoints(etcdParserVO.getEndpoints().split(","));
if (StrUtil.isNotBlank(etcdParserVO.getNamespace())) {
clientBuilder.namespace(ByteSequence.from(etcdParserVO.getNamespace(), CharsetUtil.CHARSET_UTF_8));
}
if (StrUtil.isAllNotBlank(etcdParserVO.getUser(), etcdParserVO.getPassword())) {
clientBuilder.user(ByteSequence.from(etcdParserVO.getUser(), CharsetUtil.CHARSET_UTF_8));
clientBuilder.password(ByteSequence.from(etcdParserVO.getPassword(), CharsetUtil.CHARSET_UTF_8));
}
this.client = new EtcdClient(clientBuilder.build());
}
}
catch (Exception e) {
throw new EtcdException(e.getMessage());
}
}
// 获取chainPath路径下的所有子节点内容List
List<String> chainItemContentList = new ArrayList<>();
for (String chainName : chainNameList) {
RuleParsePluginUtil.ChainDto chainDto = RuleParsePluginUtil.parseChainKey(chainName);
String chainData = client.get(StrUtil.format("{}/{}", etcdParserVO.getChainPath(), chainName));
if (StrUtil.isNotBlank(chainData)) {
chainItemContentList.add(chainDto.toElXml(chainData));
}
}
// 合并成所有chain的xml内容
String chainAllContent = CollUtil.join(chainItemContentList, StrUtil.EMPTY);
public String getContent() {
try {
// 检查chainPath路径下有没有子节点
List<String> chainNameList = client.getChildrenKeys(etcdParserVO.getChainPath(), SEPARATOR);
// 检查是否有脚本内容如果有进行脚本内容的获取
String scriptAllContent = StrUtil.EMPTY;
if (hasScript()) {
List<String> scriptNodeValueList = client.getChildrenKeys(etcdParserVO.getScriptPath(), SEPARATOR)
.stream()
.filter(StrUtil::isNotBlank)
.collect(Collectors.toList());
// 获取chainPath路径下的所有子节点内容List
List<String> chainItemContentList = new ArrayList<>();
for (String chainName : chainNameList) {
String chainData = client.get(StrUtil.format("{}/{}", etcdParserVO.getChainPath(), chainName));
if (StrUtil.isNotBlank(chainData)) {
chainItemContentList.add(StrUtil.format(CHAIN_XML_PATTERN, chainName, chainData));
}
}
// 合并成所有chain的xml内容
String chainAllContent = CollUtil.join(chainItemContentList, StrUtil.EMPTY);
List<String> scriptItemContentList = new ArrayList<>();
for (String scriptNodeValue : scriptNodeValueList) {
NodeConvertHelper.NodeSimpleVO nodeSimpleVO = NodeConvertHelper.convert(scriptNodeValue);
if (Objects.isNull(nodeSimpleVO)) {
throw new EtcdException(
StrUtil.format("The name of the etcd node is invalid:{}", scriptNodeValue));
}
String scriptData = client
.get(StrUtil.format("{}/{}", etcdParserVO.getScriptPath(), scriptNodeValue));
// 检查是否有脚本内容如果有进行脚本内容的获取
String scriptAllContent = StrUtil.EMPTY;
if (hasScript()) {
List<String> scriptNodeValueList = client.getChildrenKeys(etcdParserVO.getScriptPath(), SEPARATOR)
.stream()
.filter(StrUtil::isNotBlank)
.collect(Collectors.toList());
nodeSimpleVO.setScript(scriptData);
scriptItemContentList.add(RuleParsePluginUtil.toScriptXml(nodeSimpleVO));
}
List<String> scriptItemContentList = new ArrayList<>();
for (String scriptNodeValue : scriptNodeValueList) {
NodeSimpleVO nodeSimpleVO = convert(scriptNodeValue);
if (Objects.isNull(nodeSimpleVO)) {
throw new EtcdException(
StrUtil.format("The name of the etcd node is invalid:{}", scriptNodeValue));
}
String scriptData = client
.get(StrUtil.format("{}/{}", etcdParserVO.getScriptPath(), scriptNodeValue));
scriptItemContentList.add(StrUtil.format(NODE_ITEM_XML_PATTERN, nodeSimpleVO.getNodeId(),
nodeSimpleVO.getName(), nodeSimpleVO.getType(), scriptData));
}
scriptAllContent = StrUtil.format(NODE_XML_PATTERN,
CollUtil.join(scriptItemContentList, StrUtil.EMPTY));
}
scriptAllContent = StrUtil.format(NODE_XML_PATTERN,
CollUtil.join(scriptItemContentList, StrUtil.EMPTY));
}
return StrUtil.format(XML_PATTERN, scriptAllContent, chainAllContent);
} catch (Exception e) {
throw new EtcdException(e.getMessage());
}
}
return StrUtil.format(XML_PATTERN, scriptAllContent, chainAllContent);
}
catch (Exception e) {
throw new EtcdException(e.getMessage());
}
}
public boolean hasScript() {
// 没有配置scriptPath
if (StrUtil.isBlank(etcdParserVO.getScriptPath())) {
return false;
}
public boolean hasScript() {
// 没有配置scriptPath
if (StrUtil.isBlank(etcdParserVO.getScriptPath())) {
return false;
}
try {
// 存在这个节点但是子节点不存在
List<String> chainNameList = client.getChildrenKeys(etcdParserVO.getScriptPath(), SEPARATOR);
return !CollUtil.isEmpty(chainNameList);
} catch (Exception e) {
return false;
}
}
try {
// 存在这个节点但是子节点不存在
List<String> chainNameList = client.getChildrenKeys(etcdParserVO.getScriptPath(), SEPARATOR);
return !CollUtil.isEmpty(chainNameList);
}
catch (Exception e) {
return false;
}
}
/**
* 监听 etcd 节点
*/
public void listen() {
this.client.watchChildChange(this.etcdParserVO.getChainPath(), (updatePath, updateValue) -> {
LOG.info("starting reload flow config... update path={} value={},", updatePath, updateValue);
String changeKey = FileNameUtil.getName(updatePath);
Pair<Boolean/*启停*/, String/*id*/> pair = RuleParsePluginUtil.parseIdKey(changeKey);
Boolean enable = pair.getKey();
String id = pair.getValue();
// 如果是启用就正常更新
if (pair.getKey()) {
LiteFlowChainELBuilder.createChain().setChainId(id).setEL(updateValue).build();
}
// 如果是禁用就删除
else {
FlowBus.removeChain(id);
}
}, (deletePath) -> {
LOG.info("starting reload flow config... delete path={}", deletePath);
String chainName = FileNameUtil.getName(deletePath);
Pair<Boolean/*启停*/, String/*id*/> pair = RuleParsePluginUtil.parseIdKey(chainName);
FlowBus.removeChain(pair.getValue());
});
/**
* 监听 etcd 节点
*/
public void listen() {
this.client.watchChildChange(this.etcdParserVO.getChainPath(), (updatePath, updateValue) -> {
LOG.info("starting reload flow config... update path={} value={},", updatePath, updateValue);
String chainName = FileNameUtil.getName(updatePath);
LiteFlowChainELBuilder.createChain().setChainId(chainName).setEL(updateValue).build();
}, (deletePath) -> {
LOG.info("starting reload flow config... delete path={}", deletePath);
String chainName = FileNameUtil.getName(deletePath);
FlowBus.removeChain(chainName);
});
if (StrUtil.isNotBlank(this.etcdParserVO.getScriptPath())) {
this.client.watchChildChange(this.etcdParserVO.getScriptPath(), (updatePath, updateValue) -> {
LOG.info("starting reload flow config... update path={} value={}", updatePath, updateValue);
String scriptNodeValue = FileNameUtil.getName(updatePath);
NodeConvertHelper.NodeSimpleVO nodeSimpleVO = NodeConvertHelper.convert(scriptNodeValue);
// 启用就正常更新
if (nodeSimpleVO.getEnable()) {
LiteFlowNodeBuilder.createScriptNode()
.setId(nodeSimpleVO.getNodeId())
.setType(NodeTypeEnum.getEnumByCode(nodeSimpleVO.getType()))
.setName(nodeSimpleVO.getName())
.setScript(nodeSimpleVO.getScript())
.setLanguage(nodeSimpleVO.getLanguage())
.build();
}
// 禁用就删除
else {
FlowBus.getNodeMap().remove(nodeSimpleVO.getNodeId());
}
}, (deletePath) -> {
LOG.info("starting reload flow config... delete path={}", deletePath);
String scriptNodeValue = FileNameUtil.getName(deletePath);
NodeConvertHelper.NodeSimpleVO nodeSimpleVO = NodeConvertHelper.convert(scriptNodeValue);
FlowBus.getNodeMap().remove(nodeSimpleVO.getNodeId());
});
}
}
if (StrUtil.isNotBlank(this.etcdParserVO.getScriptPath())) {
this.client.watchChildChange(this.etcdParserVO.getScriptPath(), (updatePath, updateValue) -> {
LOG.info("starting reload flow config... update path={} value={}", updatePath, updateValue);
String scriptNodeValue = FileNameUtil.getName(updatePath);
NodeSimpleVO nodeSimpleVO = convert(scriptNodeValue);
LiteFlowNodeBuilder.createScriptNode()
.setId(nodeSimpleVO.getNodeId())
.setType(NodeTypeEnum.getEnumByCode(nodeSimpleVO.type))
.setName(nodeSimpleVO.getName())
.setScript(updateValue)
.build();
}, (deletePath) -> {
LOG.info("starting reload flow config... delete path={}", deletePath);
String scriptNodeValue = FileNameUtil.getName(deletePath);
NodeSimpleVO nodeSimpleVO = convert(scriptNodeValue);
FlowBus.getNodeMap().remove(nodeSimpleVO.getNodeId());
});
}
}
public NodeSimpleVO convert(String str) {
// 不需要去理解这串正则就是一个匹配冒号的
// 一定得是a:b或是a:b:c...这种完整类型的字符串的
List<String> matchItemList = ReUtil.findAllGroup0("(?<=[^:]:)[^:]+|[^:]+(?=:[^:])", str);
if (CollUtil.isEmpty(matchItemList)) {
return null;
}
NodeSimpleVO nodeSimpleVO = new NodeSimpleVO();
if (matchItemList.size() > 1) {
nodeSimpleVO.setNodeId(matchItemList.get(0));
nodeSimpleVO.setType(matchItemList.get(1));
}
if (matchItemList.size() > 2) {
nodeSimpleVO.setName(matchItemList.get(2));
}
return nodeSimpleVO;
}
private static class NodeSimpleVO {
private String nodeId;
private String type;
private String name = "";
public String getNodeId() {
return nodeId;
}
public void setNodeId(String nodeId) {
this.nodeId = nodeId;
}
public String getType() {
return type;
}
public void setType(String type) {
this.type = type;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
}
}

View File

@ -1,5 +1,6 @@
package com.yomahub.liteflow.parser.redis.mode.polling;
import cn.hutool.core.lang.Pair;
import cn.hutool.core.util.StrUtil;
import cn.hutool.crypto.digest.DigestUtil;
import com.yomahub.liteflow.builder.el.LiteFlowChainELBuilder;
@ -8,6 +9,7 @@ import com.yomahub.liteflow.log.LFLog;
import com.yomahub.liteflow.log.LFLoggerManager;
import com.yomahub.liteflow.parser.redis.mode.RClient;
import com.yomahub.liteflow.parser.redis.vo.RedisParserVO;
import com.yomahub.liteflow.util.RuleParsePluginUtil;
import java.util.*;
@ -61,11 +63,19 @@ public class ChainPollingTask implements Runnable {
for (Map.Entry<String, String> entry : chainSHAMap.entrySet()) {
String chainId = entry.getKey();
String oldSHA = entry.getValue();
Pair<Boolean/*启停*/, String/*id*/> pair = RuleParsePluginUtil.parseIdKey(chainId);
// 如果是停用就直接进删除
if (pair.getKey()){
FlowBus.removeChain(pair.getValue());
needDelete.add(chainId);
continue;
}
//在redis服务端通过Lua脚本计算SHA值
String newSHA = chainClient.evalSha(valueLua, chainKey, chainId);
if (StrUtil.equals(newSHA, "nil")) {
//新SHA值为nil, 即未获取到该chain,表示该chain已被删除
FlowBus.removeChain(chainId);
FlowBus.removeChain(pair.getValue());
LOG.info("starting reload flow config... delete key={}", chainId);
//添加到待删除的list 后续统一从SHAMap中移除
@ -75,7 +85,7 @@ public class ChainPollingTask implements Runnable {
else if (!StrUtil.equals(newSHA, oldSHA)) {
//SHA值发生变化,表示该chain的值已被修改,重新拉取变化的chain
String chainData = chainClient.hget(chainKey, chainId);
LiteFlowChainELBuilder.createChain().setChainId(chainId).setEL(chainData).build();
LiteFlowChainELBuilder.createChain().setChainId(pair.getValue()).setEL(chainData).build();
LOG.info("starting reload flow config... update key={} new value={},", chainId, chainData);
//修改SHAMap
@ -98,12 +108,17 @@ public class ChainPollingTask implements Runnable {
//在此处重新拉取所有chainId集合,补充添加新chain
Set<String> newChainSet = chainClient.hkeys(chainKey);
for (String chainId : newChainSet) {
Pair<Boolean/*启停*/, String/*id*/> pair = RuleParsePluginUtil.parseIdKey(chainId);
if (!chainSHAMap.containsKey(chainId)) {
//将新chainId添加到LiteFlowChainELBuilder和SHAMap
String chainData = chainClient.hget(chainKey, chainId);
LiteFlowChainELBuilder.createChain().setChainId(chainId).setEL(chainData).build();
LOG.info("starting reload flow config... create key={} new value={},", chainId, chainData);
chainSHAMap.put(chainId, DigestUtil.sha1Hex(chainData));
// 如果是启用才装配
if (pair.getKey()){
LiteFlowChainELBuilder.createChain().setChainId(pair.getValue()).setEL(chainData).build();
LOG.info("starting reload flow config... create key={} new value={},", chainId, chainData);
chainSHAMap.put(chainId, DigestUtil.sha1Hex(chainData));
}
}
}
}

View File

@ -1,7 +1,6 @@
package com.yomahub.liteflow.parser.redis.mode.polling;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.thread.NamedThreadFactory;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
@ -13,11 +12,15 @@ import com.yomahub.liteflow.parser.redis.mode.RedisMode;
import com.yomahub.liteflow.parser.redis.mode.RedisParserHelper;
import com.yomahub.liteflow.parser.redis.vo.RedisParserVO;
import com.yomahub.liteflow.spi.holder.ContextAwareHolder;
import com.yomahub.liteflow.util.RuleParsePluginUtil;
import org.redisson.Redisson;
import org.redisson.config.Config;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* Redis 轮询机制实现类
@ -126,8 +129,9 @@ public class RedisParserPollingMode implements RedisParserHelper {
List<String> chainItemContentList = new ArrayList<>();
for (String chainName : chainNameSet) {
String chainData = chainClient.hget(chainKey, chainName);
RuleParsePluginUtil.ChainDto chainDto = RuleParsePluginUtil.parseChainKey(chainName);
if (StrUtil.isNotBlank(chainData)) {
chainItemContentList.add(StrUtil.format(CHAIN_XML_PATTERN, chainName, chainData));
chainItemContentList.add(chainDto.toElXml(chainData));
}else{
continue;
}
@ -154,19 +158,10 @@ public class RedisParserPollingMode implements RedisParserHelper {
StrUtil.format("The name of the redis field [{}] in scriptKey [{}] is invalid",
scriptFieldValue, scriptKey));
}
String scriptData = scriptClient.hget(scriptKey, scriptFieldValue);
// 有语言类型
if (StrUtil.isNotBlank(nodeSimpleVO.getLanguage())) {
scriptItemContentList.add(StrUtil.format(NODE_ITEM_WITH_LANGUAGE_XML_PATTERN,
nodeSimpleVO.getNodeId(), nodeSimpleVO.getName(), nodeSimpleVO.getType(),
nodeSimpleVO.getLanguage(), scriptData));
}
// 没有语言类型
else {
scriptItemContentList.add(StrUtil.format(NODE_ITEM_XML_PATTERN, nodeSimpleVO.getNodeId(),
nodeSimpleVO.getName(), nodeSimpleVO.getType(), scriptData));
}
String scriptData = scriptClient.hget(scriptKey, scriptFieldValue);
nodeSimpleVO.setScript(scriptData);
scriptItemContentList.add(RuleParsePluginUtil.toScriptXml(nodeSimpleVO));
//计算scriptData的SHA值
String scriptSHA = DigestUtil.sha1Hex(scriptData);

View File

@ -76,15 +76,23 @@ public class ScriptPollingTask implements Runnable {
//添加到待删除的list 后续统一从SHAMap中移除
//不在这里直接移除是为了避免先删除导致scriptSHAMap并没有完全遍历完 script删除不全
needDelete.add(scriptFieldValue);
}
else if (!StrUtil.equals(newSHA, oldSHA)) {
} else if (!StrUtil.equals(newSHA, oldSHA)) {
//SHA值发生变化,表示该script的值已被修改,重新拉取变化的script
String scriptData = scriptClient.hget(scriptKey, scriptFieldValue);
RedisParserHelper.changeScriptNode(scriptFieldValue, scriptData);
LOG.info("starting reload flow config... update key={} new value={},", scriptFieldValue, scriptData);
//修改SHAMap
scriptSHAMap.put(scriptFieldValue, newSHA);
NodeConvertHelper.NodeSimpleVO nodeSimpleVO = NodeConvertHelper.convert(scriptFieldValue);
nodeSimpleVO.setScript(scriptData);
if (nodeSimpleVO.getEnable()) {
RedisParserHelper.changeScriptNode(scriptFieldValue, scriptData);
LOG.info("starting reload flow config... update key={} new value={},", scriptFieldValue, scriptData);
//修改SHAMap
scriptSHAMap.put(scriptFieldValue, newSHA);
} else {
FlowBus.getNodeMap().remove(nodeSimpleVO.getNodeId());
LOG.info("starting reload flow config... delete key={}", scriptFieldValue);
needDelete.add(scriptFieldValue);
}
}
//SHA值无变化,表示该script未改变
}
@ -106,9 +114,16 @@ public class ScriptPollingTask implements Runnable {
if (!scriptSHAMap.containsKey(scriptFieldValue)) {
//将新script添加到LiteFlowChainELBuilder和SHAMap
String scriptData = scriptClient.hget(scriptKey, scriptFieldValue);
RedisParserHelper.changeScriptNode(scriptFieldValue, scriptData);
LOG.info("starting reload flow config... create key={} new value={},", scriptFieldValue, scriptData);
scriptSHAMap.put(scriptFieldValue, DigestUtil.sha1Hex(scriptData));
NodeConvertHelper.NodeSimpleVO nodeSimpleVO = NodeConvertHelper.convert(scriptFieldValue);
if (nodeSimpleVO.getEnable()) {
RedisParserHelper.changeScriptNode(scriptFieldValue, scriptData);
LOG.info("starting reload flow config... create key={} new value={},", scriptFieldValue, scriptData);
scriptSHAMap.put(scriptFieldValue, DigestUtil.sha1Hex(scriptData));
} else {
FlowBus.getNodeMap().remove(nodeSimpleVO.getNodeId());
LOG.info("starting reload flow config... delete key={}", scriptFieldValue);
needDelete.add(scriptFieldValue);
}
}
}
}

View File

@ -1,10 +1,12 @@
package com.yomahub.liteflow.parser.redis.mode.subscribe;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.lang.Pair;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import com.yomahub.liteflow.builder.LiteFlowNodeBuilder;
import com.yomahub.liteflow.builder.el.LiteFlowChainELBuilder;
import com.yomahub.liteflow.enums.NodeTypeEnum;
import com.yomahub.liteflow.flow.FlowBus;
import com.yomahub.liteflow.parser.helper.NodeConvertHelper;
import com.yomahub.liteflow.parser.redis.exception.RedisException;
@ -13,10 +15,10 @@ import com.yomahub.liteflow.parser.redis.mode.RedisMode;
import com.yomahub.liteflow.parser.redis.mode.RedisParserHelper;
import com.yomahub.liteflow.parser.redis.vo.RedisParserVO;
import com.yomahub.liteflow.spi.holder.ContextAwareHolder;
import com.yomahub.liteflow.util.RuleParsePluginUtil;
import org.redisson.Redisson;
import org.redisson.api.map.event.EntryCreatedListener;
import org.redisson.api.map.event.EntryRemovedListener;
import org.redisson.api.map.event.EntryUpdatedListener;
import org.redisson.config.Config;
import java.util.ArrayList;
@ -28,7 +30,7 @@ import java.util.Map;
* 使用 Redisson客户端 RMapCache存储结构
*
* @author hxinyu
* @since 2.11.0
* @since 2.11.0
*/
public class RedisParserSubscribeMode implements RedisParserHelper {
@ -46,14 +48,13 @@ public class RedisParserSubscribeMode implements RedisParserHelper {
try {
this.chainClient = ContextAwareHolder.loadContextAware().getBean("chainClient");
this.scriptClient = ContextAwareHolder.loadContextAware().getBean("scriptClient");
}
catch (Exception ignored) {
} catch (Exception ignored) {
}
if (ObjectUtil.isNull(chainClient)) {
RedisMode redisMode = redisParserVO.getRedisMode();
Config config;
//Redis单点模式
if (redisMode.equals(RedisMode.SINGLE)){
if (redisMode.equals(RedisMode.SINGLE)) {
config = getSingleRedissonConfig(redisParserVO, redisParserVO.getChainDataBase());
this.chainClient = new RClient(Redisson.create(config));
//如果有脚本数据
@ -74,8 +75,7 @@ public class RedisParserSubscribeMode implements RedisParserHelper {
}
}
}
}
catch (Exception e) {
} catch (Exception e) {
throw new RedisException(e.getMessage());
}
@ -91,8 +91,9 @@ public class RedisParserSubscribeMode implements RedisParserHelper {
for (Map.Entry<String, String> entry : chainMap.entrySet()) {
String chainId = entry.getKey();
String chainData = entry.getValue();
RuleParsePluginUtil.ChainDto chainDto = RuleParsePluginUtil.parseChainKey(chainId);
if (StrUtil.isNotBlank(chainData)) {
chainItemContentList.add(StrUtil.format(CHAIN_XML_PATTERN, chainId, chainData));
chainItemContentList.add(chainDto.toElXml(chainData));
}
}
// 合并成所有chain的xml内容
@ -112,17 +113,9 @@ public class RedisParserSubscribeMode implements RedisParserHelper {
StrUtil.format("The name of the redis field [{}] in scriptKey [{}] is invalid",
scriptFieldValue, redisParserVO.getScriptKey()));
}
// 有语言类型
if (StrUtil.isNotBlank(nodeSimpleVO.getLanguage())) {
scriptItemContentList.add(StrUtil.format(NODE_ITEM_WITH_LANGUAGE_XML_PATTERN,
nodeSimpleVO.getNodeId(), nodeSimpleVO.getName(), nodeSimpleVO.getType(),
nodeSimpleVO.getLanguage(), scriptData));
}
// 没有语言类型
else {
scriptItemContentList.add(StrUtil.format(NODE_ITEM_XML_PATTERN, nodeSimpleVO.getNodeId(),
nodeSimpleVO.getName(), nodeSimpleVO.getType(), scriptData));
}
nodeSimpleVO.setScript(scriptData);
scriptItemContentList.add(RuleParsePluginUtil.toScriptXml(nodeSimpleVO));
}
scriptAllContent = StrUtil.format(NODE_XML_PATTERN,
@ -130,8 +123,7 @@ public class RedisParserSubscribeMode implements RedisParserHelper {
}
return StrUtil.format(XML_PATTERN, scriptAllContent, chainAllContent);
}
catch (Exception e) {
} catch (Exception e) {
throw new RedisException(e.getMessage());
}
}
@ -145,8 +137,7 @@ public class RedisParserSubscribeMode implements RedisParserHelper {
// 存在这个节点但是子节点不存在
Map<String, String> scriptMap = scriptClient.getMap(redisParserVO.getScriptKey());
return !CollUtil.isEmpty(scriptMap);
}
catch (Exception e) {
} catch (Exception e) {
return false;
}
}
@ -158,35 +149,58 @@ public class RedisParserSubscribeMode implements RedisParserHelper {
public void listenRedis() {
//监听 chain
String chainKey = redisParserVO.getChainKey();
EntryCreatedListener<String, String> chainModifyFunc = event -> {
LOG.info("starting modify flow config... create key={} value={},", event.getKey(), event.getValue());
String chainName = event.getKey();
String value = event.getValue();
Pair<Boolean/*启停*/, String/*id*/> pair = RuleParsePluginUtil.parseIdKey(chainName);
String id = pair.getValue();
// 如果是启用就正常更新
if (pair.getKey()) {
LiteFlowChainELBuilder.createChain().setChainId(id).setEL(value).build();
}
// 如果是禁用就删除
else {
FlowBus.removeChain(id);
}
};
//添加新 chain
chainClient.addListener(chainKey, (EntryCreatedListener<String, String>) event -> {
LOG.info("starting reload flow config... create key={} value={},", event.getKey(), event.getValue());
LiteFlowChainELBuilder.createChain().setChainId(event.getKey()).setEL(event.getValue()).build();
});
chainClient.addListener(chainKey, chainModifyFunc);
//修改 chain
chainClient.addListener(chainKey, (EntryUpdatedListener<String, String>) event -> {
LOG.info("starting reload flow config... update key={} new value={},", event.getKey(), event.getValue());
LiteFlowChainELBuilder.createChain().setChainId(event.getKey()).setEL(event.getValue()).build();
});
chainClient.addListener(chainKey, chainModifyFunc);
//删除 chain
chainClient.addListener(chainKey, (EntryRemovedListener<String, String>) event -> {
LOG.info("starting reload flow config... delete key={}", event.getKey());
FlowBus.removeChain(event.getKey());
Pair<Boolean/*启停*/, String/*id*/> pair = RuleParsePluginUtil.parseIdKey(event.getKey());
FlowBus.removeChain(pair.getValue());
});
//监听 script
EntryCreatedListener<String, String> scriptModifyFunc = event -> {
LOG.info("starting modify flow config... create key={} value={},", event.getKey(), event.getValue());
NodeConvertHelper.NodeSimpleVO nodeSimpleVO = NodeConvertHelper.convert(event.getKey());
nodeSimpleVO.setScript(event.getValue());
// 启用就正常更新
if (nodeSimpleVO.getEnable()) {
LiteFlowNodeBuilder.createScriptNode()
.setId(nodeSimpleVO.getNodeId())
.setType(NodeTypeEnum.getEnumByCode(nodeSimpleVO.getType()))
.setName(nodeSimpleVO.getName())
.setScript(nodeSimpleVO.getScript())
.setLanguage(nodeSimpleVO.getLanguage())
.build();
}
// 禁用就删除
else {
FlowBus.getNodeMap().remove(nodeSimpleVO.getNodeId());
}
};
if (ObjectUtil.isNotNull(scriptClient) && ObjectUtil.isNotNull(redisParserVO.getScriptDataBase())) {
String scriptKey = redisParserVO.getScriptKey();
//添加 script
scriptClient.addListener(scriptKey, (EntryCreatedListener<String, String>) event -> {
LOG.info("starting reload flow config... create key={} value={},", event.getKey(), event.getValue());
RedisParserHelper.changeScriptNode(event.getKey(), event.getValue());
});
scriptClient.addListener(scriptKey, scriptModifyFunc);
//修改 script
scriptClient.addListener(scriptKey, (EntryUpdatedListener<String, String>) event -> {
LOG.info("starting reload flow config... update key={} new value={},", event.getKey(), event.getValue());
RedisParserHelper.changeScriptNode(event.getKey(), event.getValue());
});
scriptClient.addListener(scriptKey, scriptModifyFunc);
//删除 script
scriptClient.addListener(scriptKey, (EntryRemovedListener<String, String>) event -> {
LOG.info("starting reload flow config... delete key={}", event.getKey());

View File

@ -49,16 +49,20 @@ public abstract class AbstractSqlRead implements SqlRead {
// 设置游标拉取数量
stmt.setFetchSize(SqlReadConstant.FETCH_SIZE_MAX);
stmt.setString(1, config.getApplicationName());
ParameterMetaData parameterMetaData = stmt.getParameterMetaData();
if (parameterMetaData.getParameterCount() == 2) {
stmt.setBoolean(2, true);
}
rs = stmt.executeQuery();
while (rs.next()) {
String xml = buildXmlElement(rs);
String uniqueKey = buildXmlElementUniqueKey(rs);
if (hasEnableFiled()){
boolean enable = getEnableFiledValue(rs);
// 如果停用直接跳过
if (!enable){
continue;
}
}
result.put(uniqueKey, xml);
}
} catch (Exception e) {
@ -71,6 +75,16 @@ public abstract class AbstractSqlRead implements SqlRead {
return result;
}
/**
* 是否包含启停字段
*/
public abstract boolean hasEnableFiled();
/**
* 获取启停字段对应的字段值
*/
public abstract boolean getEnableFiledValue(ResultSet rs) throws SQLException;
public abstract String buildQuerySql();
public abstract String buildXmlElement(ResultSet rs) throws SQLException;

View File

@ -23,6 +23,20 @@ public class ChainRead extends AbstractSqlRead {
super(config);
}
@Override
public boolean hasEnableFiled() {
String chainEnableField = super.config.getChainEnableField();
return StrUtil.isNotBlank(chainEnableField);
}
@Override
public boolean getEnableFiledValue(ResultSet rs) throws SQLException {
String chainEnableField = super.config.getChainEnableField();
byte enable = rs.getByte(chainEnableField);
return enable == 1;
}
@Override
public String buildQuerySql() {
String chainTableName = super.config.getChainTableName();
@ -30,7 +44,7 @@ public class ChainRead extends AbstractSqlRead {
String chainNameField = super.config.getChainNameField();
String chainApplicationNameField = super.config.getChainApplicationNameField();
String applicationName = super.config.getApplicationName();
String chainEnableField = super.config.getChainEnableField();
if (StrUtil.isBlank(chainTableName)) {
throw new ELSQLException("You did not define the chainTableName property");
@ -43,10 +57,6 @@ public class ChainRead extends AbstractSqlRead {
String sqlCmd = StrUtil.format(SqlReadConstant.SQL_PATTERN, chainNameField, elDataField, chainTableName,
chainApplicationNameField);
if (StrUtil.isNotBlank(chainEnableField)){
sqlCmd = StrUtil.format("{} {}", sqlCmd, StrUtil.format(SqlReadConstant.SQL_ENABLE_PATTERN, chainEnableField));
}
return sqlCmd;
}

View File

@ -30,6 +30,20 @@ public class ScriptRead extends AbstractSqlRead {
super(config);
}
@Override
public boolean hasEnableFiled() {
String scriptEnableField = super.config.getScriptEnableField();
return StrUtil.isNotBlank(scriptEnableField);
}
@Override
public boolean getEnableFiledValue(ResultSet rs) throws SQLException {
String scriptEnableField = super.config.getScriptEnableField();
byte enable = rs.getByte(scriptEnableField);
return enable == 1;
}
@Override
public String buildQuerySql() {
String scriptLanguageField = super.config.getScriptLanguageField();
@ -40,7 +54,7 @@ public class ScriptRead extends AbstractSqlRead {
String scriptTypeField = super.config.getScriptTypeField();
String scriptApplicationNameField = super.config.getScriptApplicationNameField();
String applicationName = super.config.getApplicationName();
String scriptEnableField = super.config.getScriptEnableField();
if (StrUtil.isBlank(applicationName) || StrUtil.isBlank(scriptApplicationNameField)) {
throw new ELSQLException("You did not define the applicationName or scriptApplicationNameField property");
@ -73,10 +87,6 @@ public class ScriptRead extends AbstractSqlRead {
);
}
if (StrUtil.isNotBlank(scriptEnableField)){
sqlCmd = StrUtil.format("{} {}", sqlCmd, StrUtil.format(SqlReadConstant.SQL_ENABLE_PATTERN, scriptEnableField));
}
return sqlCmd;
}

View File

@ -1,9 +1,9 @@
package com.yomahub.liteflow.parser.zk.util;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.collection.ListUtil;
import cn.hutool.core.io.file.FileNameUtil;
import cn.hutool.core.lang.Pair;
import cn.hutool.core.util.StrUtil;
import com.yomahub.liteflow.builder.LiteFlowNodeBuilder;
import com.yomahub.liteflow.builder.el.LiteFlowChainELBuilder;
@ -12,6 +12,7 @@ import com.yomahub.liteflow.flow.FlowBus;
import com.yomahub.liteflow.parser.helper.NodeConvertHelper;
import com.yomahub.liteflow.parser.zk.exception.ZkException;
import com.yomahub.liteflow.parser.zk.vo.ZkParserVO;
import com.yomahub.liteflow.util.RuleParsePluginUtil;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.CuratorCache;
@ -26,186 +27,172 @@ import java.util.Objects;
public class ZkParserHelper {
private static final Logger LOG = LoggerFactory.getLogger(ZkParserHelper.class);
private static final Logger LOG = LoggerFactory.getLogger(ZkParserHelper.class);
private final ZkParserVO zkParserVO;
private final ZkParserVO zkParserVO;
private final CuratorFramework client;
private final CuratorFramework client;
private final String CHAIN_XML_PATTERN = "<chain name=\"{}\">{}</chain>";
private final String NODE_XML_PATTERN = "<nodes>{}</nodes>";
private final String NODE_XML_PATTERN = "<nodes>{}</nodes>";
private final String XML_PATTERN = "<?xml version=\"1.0\" encoding=\"UTF-8\"?><flow>{}{}</flow>";
private final String NODE_ITEM_XML_PATTERN = "<node id=\"{}\" name=\"{}\" type=\"{}\"><![CDATA[{}]]></node>";
public ZkParserHelper(ZkParserVO zkParserVO) {
this.zkParserVO = zkParserVO;
private final String NODE_ITEM_XML_WITH_LANGUAGE_PATTERN = "<node id=\"{}\" name=\"{}\" type=\"{}\" language=\"{}\"><![CDATA[{}]]></node>";
try {
CuratorFramework client = CuratorFrameworkFactory.newClient(zkParserVO.getConnectStr(),
new RetryNTimes(10, 5000));
client.start();
private final String XML_PATTERN = "<?xml version=\"1.0\" encoding=\"UTF-8\"?><flow>{}{}</flow>";
this.client = client;
} catch (Exception e) {
throw new ZkException(e.getMessage());
}
}
public ZkParserHelper(ZkParserVO zkParserVO) {
this.zkParserVO = zkParserVO;
public String getContent() {
try {
// 检查zk上有没有chainPath节点
if (client.checkExists().forPath(zkParserVO.getChainPath()) == null) {
throw new ZkException(StrUtil.format("zk node[{}] is not exist", zkParserVO.getChainPath()));
}
try {
CuratorFramework client = CuratorFrameworkFactory.newClient(zkParserVO.getConnectStr(),
new RetryNTimes(10, 5000));
client.start();
// 检查chainPath路径下有没有子节点
List<String> chainNameList = client.getChildren().forPath(zkParserVO.getChainPath());
// 获取chainPath路径下的所有子节点内容List
List<String> chainItemContentList = new ArrayList<>();
for (String chainName : chainNameList) {
RuleParsePluginUtil.ChainDto chainDto = RuleParsePluginUtil.parseChainKey(chainName);
String chainData = new String(
client.getData().forPath(StrUtil.format("{}/{}", zkParserVO.getChainPath(), chainName)));
if (StrUtil.isNotBlank(chainData)) {
chainItemContentList.add(chainDto.toElXml(chainData));
}
}
// 合并成所有chain的xml内容
String chainAllContent = CollUtil.join(chainItemContentList, StrUtil.EMPTY);
this.client = client;
}
catch (Exception e) {
throw new ZkException(e.getMessage());
}
}
// 检查是否有脚本内容如果有进行脚本内容的获取
String scriptAllContent = StrUtil.EMPTY;
if (hasScript()) {
List<String> scriptNodeValueList = client.getChildren().forPath(zkParserVO.getScriptPath());
public String getContent() {
try {
// 检查zk上有没有chainPath节点
if (client.checkExists().forPath(zkParserVO.getChainPath()) == null) {
throw new ZkException(StrUtil.format("zk node[{}] is not exist", zkParserVO.getChainPath()));
}
List<String> scriptItemContentList = new ArrayList<>();
for (String scriptNodeValue : scriptNodeValueList) {
NodeConvertHelper.NodeSimpleVO nodeSimpleVO = NodeConvertHelper.convert(scriptNodeValue);
if (Objects.isNull(nodeSimpleVO)) {
throw new ZkException(StrUtil.format("The name of the zk node is invalid:{}", scriptNodeValue));
}
String scriptData = new String(client.getData()
.forPath(StrUtil.format("{}/{}", zkParserVO.getScriptPath(), scriptNodeValue)));
// 检查chainPath路径下有没有子节点
List<String> chainNameList = client.getChildren().forPath(zkParserVO.getChainPath());
// 获取chainPath路径下的所有子节点内容List
List<String> chainItemContentList = new ArrayList<>();
for (String chainName : chainNameList) {
String chainData = new String(
client.getData().forPath(StrUtil.format("{}/{}", zkParserVO.getChainPath(), chainName)));
if (StrUtil.isBlank(chainData)){
continue;
}
chainItemContentList.add(StrUtil.format(CHAIN_XML_PATTERN, chainName, chainData));
}
// 合并成所有chain的xml内容
String chainAllContent = CollUtil.join(chainItemContentList, StrUtil.EMPTY);
nodeSimpleVO.setScript(scriptData);
scriptItemContentList.add(RuleParsePluginUtil.toScriptXml(nodeSimpleVO));
}
// 检查是否有脚本内容如果有进行脚本内容的获取
String scriptAllContent = StrUtil.EMPTY;
if (hasScript()) {
List<String> scriptNodeValueList = client.getChildren().forPath(zkParserVO.getScriptPath());
scriptAllContent = StrUtil.format(NODE_XML_PATTERN,
CollUtil.join(scriptItemContentList, StrUtil.EMPTY));
}
List<String> scriptItemContentList = new ArrayList<>();
for (String scriptNodeValue : scriptNodeValueList) {
NodeConvertHelper.NodeSimpleVO nodeSimpleVO = NodeConvertHelper.convert(scriptNodeValue);
if (Objects.isNull(nodeSimpleVO)) {
throw new ZkException(StrUtil.format("The name of the zk node is invalid:{}", scriptNodeValue));
}
String scriptData = new String(client.getData()
.forPath(StrUtil.format("{}/{}", zkParserVO.getScriptPath(), scriptNodeValue)));
return StrUtil.format(XML_PATTERN, scriptAllContent, chainAllContent);
} catch (Exception e) {
throw new ZkException(e.getMessage());
}
}
// 有语言类型
if (StrUtil.isNotBlank(nodeSimpleVO.getLanguage())) {
scriptItemContentList.add(StrUtil.format(NODE_ITEM_XML_WITH_LANGUAGE_PATTERN,
nodeSimpleVO.getNodeId(), nodeSimpleVO.getName(), nodeSimpleVO.getType(),
nodeSimpleVO.getLanguage(), scriptData));
}
// 没有语言类型
else {
scriptItemContentList.add(StrUtil.format(NODE_ITEM_XML_PATTERN, nodeSimpleVO.getNodeId(),
nodeSimpleVO.getName(), nodeSimpleVO.getType(), scriptData));
}
}
public boolean hasScript() {
// 没有配置scriptPath
if (StrUtil.isBlank(zkParserVO.getScriptPath())) {
return false;
}
scriptAllContent = StrUtil.format(NODE_XML_PATTERN,
CollUtil.join(scriptItemContentList, StrUtil.EMPTY));
}
try {
// 配置了但是不存在这个节点
if (client.checkExists().forPath(zkParserVO.getScriptPath()) == null) {
return false;
}
return StrUtil.format(XML_PATTERN, scriptAllContent, chainAllContent);
}
catch (Exception e) {
throw new ZkException(e.getMessage());
}
}
// 存在这个节点但是子节点不存在
List<String> chainNameList = client.getChildren().forPath(zkParserVO.getScriptPath());
return !CollUtil.isEmpty(chainNameList);
} catch (Exception e) {
return false;
}
}
public boolean hasScript() {
// 没有配置scriptPath
if (StrUtil.isBlank(zkParserVO.getScriptPath())) {
return false;
}
/**
* 监听 zk 节点
*/
public void listenZkNode() {
// 监听chain
CuratorCache cache1 = CuratorCache.build(client, zkParserVO.getChainPath());
cache1.start();
cache1.listenable().addListener((type, oldData, data) -> {
String path = data.getPath();
String value = new String(data.getData());
if (StrUtil.isBlank(value)) {
return;
}
if (ListUtil.toList(CuratorCacheListener.Type.NODE_CREATED, CuratorCacheListener.Type.NODE_CHANGED)
.contains(type)) {
LOG.info("starting reload flow config... {} path={} value={},", type.name(), path, value);
String chainName = FileNameUtil.getName(path);
Pair<Boolean/*启停*/, String/*id*/> pair = RuleParsePluginUtil.parseIdKey(chainName);
String id = pair.getValue();
// 如果是启用就正常更新
if (pair.getKey()) {
LiteFlowChainELBuilder.createChain().setChainId(id).setEL(value).build();
}
// 如果是禁用就删除
else {
FlowBus.removeChain(id);
}
} else if (CuratorCacheListener.Type.NODE_DELETED.equals(type)) {
LOG.info("starting reload flow config... delete path={}", path);
String chainName = FileNameUtil.getName(path);
Pair<Boolean/*启停*/, String/*id*/> pair = RuleParsePluginUtil.parseIdKey(chainName);
FlowBus.removeChain(pair.getValue());
}
});
try {
// 配置了但是不存在这个节点
if (client.checkExists().forPath(zkParserVO.getScriptPath()) == null) {
return false;
}
if (StrUtil.isNotBlank(zkParserVO.getScriptPath())) {
// 监听script
CuratorCache cache2 = CuratorCache.build(client, zkParserVO.getScriptPath());
cache2.start();
cache2.listenable().addListener((type, oldData, data) -> {
String path = data.getPath();
String value = new String(data.getData());
if (StrUtil.isBlank(value)) {
return;
}
if (ListUtil.toList(CuratorCacheListener.Type.NODE_CREATED, CuratorCacheListener.Type.NODE_CHANGED)
.contains(type)) {
LOG.info("starting reload flow config... {} path={} value={},", type.name(), path, value);
String scriptNodeValue = FileNameUtil.getName(path);
NodeConvertHelper.NodeSimpleVO nodeSimpleVO = NodeConvertHelper.convert(scriptNodeValue);
// 存在这个节点但是子节点不存在
List<String> chainNameList = client.getChildren().forPath(zkParserVO.getScriptPath());
return !CollUtil.isEmpty(chainNameList);
}
catch (Exception e) {
return false;
}
}
/**
* 监听 zk 节点
*/
public void listenZkNode() {
// 监听chain
CuratorCache cache1 = CuratorCache.build(client, zkParserVO.getChainPath());
cache1.start();
cache1.listenable().addListener((type, oldData, data) -> {
String path = data.getPath();
String value = new String(data.getData());
if (StrUtil.isBlank(value)) {
return;
}
if (ListUtil.toList(CuratorCacheListener.Type.NODE_CREATED, CuratorCacheListener.Type.NODE_CHANGED)
.contains(type)) {
LOG.info("starting reload flow config... {} path={} value={},", type.name(), path, value);
String chainName = FileNameUtil.getName(path);
LiteFlowChainELBuilder.createChain().setChainId(chainName).setEL(value).build();
}
else if (CuratorCacheListener.Type.NODE_DELETED.equals(type)) {
LOG.info("starting reload flow config... delete path={}", path);
String chainName = FileNameUtil.getName(path);
FlowBus.removeChain(chainName);
}
});
if (StrUtil.isNotBlank(zkParserVO.getScriptPath())) {
// 监听script
CuratorCache cache2 = CuratorCache.build(client, zkParserVO.getScriptPath());
cache2.start();
cache2.listenable().addListener((type, oldData, data) -> {
String path = data.getPath();
String value = new String(data.getData());
if (StrUtil.isBlank(value)) {
return;
}
if (ListUtil.toList(CuratorCacheListener.Type.NODE_CREATED, CuratorCacheListener.Type.NODE_CHANGED)
.contains(type)) {
LOG.info("starting reload flow config... {} path={} value={},", type.name(), path, value);
String scriptNodeValue = FileNameUtil.getName(path);
NodeConvertHelper.NodeSimpleVO nodeSimpleVO = NodeConvertHelper.convert(scriptNodeValue);
// 有语言类型
if (StrUtil.isNotBlank(nodeSimpleVO.getLanguage())) {
LiteFlowNodeBuilder.createScriptNode()
.setId(nodeSimpleVO.getNodeId())
.setType(NodeTypeEnum.getEnumByCode(nodeSimpleVO.getType()))
.setName(nodeSimpleVO.getName())
.setScript(value)
.setLanguage(nodeSimpleVO.getLanguage())
.build();
}
// 没有语言类型
else {
LiteFlowNodeBuilder.createScriptNode()
.setId(nodeSimpleVO.getNodeId())
.setType(NodeTypeEnum.getEnumByCode(nodeSimpleVO.getType()))
.setName(nodeSimpleVO.getName())
.setScript(value)
.build();
}
}
else if (CuratorCacheListener.Type.NODE_DELETED.equals(type)) {
LOG.info("starting reload flow config... delete path={}", path);
String scriptNodeValue = FileNameUtil.getName(path);
NodeConvertHelper.NodeSimpleVO nodeSimpleVO = NodeConvertHelper.convert(scriptNodeValue);
FlowBus.getNodeMap().remove(nodeSimpleVO.getNodeId());
}
});
}
}
// 启用就正常更新
if (nodeSimpleVO.getEnable()) {
LiteFlowNodeBuilder.createScriptNode()
.setId(nodeSimpleVO.getNodeId())
.setType(NodeTypeEnum.getEnumByCode(nodeSimpleVO.getType()))
.setName(nodeSimpleVO.getName())
.setScript(nodeSimpleVO.getScript())
.setLanguage(nodeSimpleVO.getLanguage())
.build();
}
// 禁用就删除
else {
FlowBus.getNodeMap().remove(nodeSimpleVO.getNodeId());
}
} else if (CuratorCacheListener.Type.NODE_DELETED.equals(type)) {
LOG.info("starting reload flow config... delete path={}", path);
String scriptNodeValue = FileNameUtil.getName(path);
NodeConvertHelper.NodeSimpleVO nodeSimpleVO = NodeConvertHelper.convert(scriptNodeValue);
FlowBus.getNodeMap().remove(nodeSimpleVO.getNodeId());
}
});
}
}
}

View File

@ -3,10 +3,10 @@ package com.yomahub.liteflow.test.apollo;
import com.ctrip.framework.apollo.Config;
import com.google.common.collect.Sets;
import com.yomahub.liteflow.core.FlowExecutor;
import com.yomahub.liteflow.exception.ChainNotFoundException;
import com.yomahub.liteflow.flow.FlowBus;
import com.yomahub.liteflow.flow.LiteflowResponse;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
@ -16,13 +16,12 @@ import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.mock.mockito.MockBean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.test.context.TestPropertySource;
import org.springframework.test.context.event.annotation.AfterTestClass;
import org.springframework.test.context.event.annotation.BeforeTestClass;
import org.springframework.test.context.junit.jupiter.SpringExtension;
import javax.annotation.Resource;
import java.util.Set;
import static org.mockito.Mockito.*;
import static org.mockito.Mockito.when;
/**
* @Description:
@ -33,35 +32,44 @@ import static org.mockito.Mockito.*;
@TestPropertySource(value = "classpath:/apollo/application-xml.properties")
@SpringBootTest(classes = ApolloWithXmlELSpringbootTest.class)
@EnableAutoConfiguration
@ComponentScan({ "com.yomahub.liteflow.test.apollo.cmp" })
@ComponentScan({"com.yomahub.liteflow.test.apollo.cmp"})
public class ApolloWithXmlELSpringbootTest {
@MockBean(name = "chainConfig")
private Config chainConfig;
@MockBean(name = "chainConfig")
private Config chainConfig;
@MockBean(name = "scriptConfig")
private Config scriptConfig;
@MockBean(name = "scriptConfig")
private Config scriptConfig;
@Resource
private FlowExecutor flowExecutor;
@Resource
private FlowExecutor flowExecutor;
@BeforeEach
public void setUp() {
MockitoAnnotations.initMocks(this);
}
@Test
public void testApolloWithXml1() {
Set<String> chainNameList = Sets.newHashSet("chain1");
Set<String> scriptNodeValueList = Sets.newHashSet("s1:script:脚本s1");
when(chainConfig.getPropertyNames()).thenReturn(chainNameList);
when(scriptConfig.getPropertyNames()).thenReturn(scriptNodeValueList);
@BeforeEach
public void setUp() {
MockitoAnnotations.initMocks(this);
}
String chain1Data = "THEN(a, b, c, s1);";
String scriptNodeValue = "defaultContext.setData(\"test\",\"hello\");";
when(chainConfig.getProperty(anyString(), anyString())).thenReturn(chain1Data);
when(scriptConfig.getProperty(anyString(), anyString())).thenReturn(scriptNodeValue);
@Test
public void testApolloWithXml1() {
Set<String> chainNameList = Sets.newHashSet("chain1", "chain2:false");
Set<String> scriptNodeValueList = Sets.newHashSet("s1:script:脚本s1", "s2:script:脚本s1:groovy:false");
when(chainConfig.getPropertyNames()).thenReturn(chainNameList);
when(scriptConfig.getPropertyNames()).thenReturn(scriptNodeValueList);
LiteflowResponse response = flowExecutor.execute2Resp("chain1", "arg");
Assertions.assertEquals("a==>b==>c==>s1[脚本s1]", response.getExecuteStepStrWithoutTime());
}
when(chainConfig.getProperty("chain1", "")).thenReturn("THEN(a, b, c, s1);");
when(chainConfig.getProperty("chain2:false", "")).thenReturn("THEN(a, b, c, s1);");
when(scriptConfig.getProperty("s1:script:脚本s1", "")).thenReturn("defaultContext.setData(\"test\",\"hello\");");
when(scriptConfig.getProperty("s2:script:脚本s1:groovy:false", "")).thenReturn("defaultContext.setData(\"test\",\"hello\");");
LiteflowResponse response = flowExecutor.execute2Resp("chain1", "arg");
Assertions.assertEquals("a==>b==>c==>s1[脚本s1]", response.getExecuteStepStrWithoutTime());
// 测试 chain 停用
Assertions.assertThrows(ChainNotFoundException.class, () -> {
throw flowExecutor.execute2Resp("chain2", "arg").getCause();
});
// 测试 script 停用
Assertions.assertTrue(!FlowBus.getNodeMap().containsKey("s2"));
}
}

View File

@ -2,6 +2,7 @@ package com.yomahub.liteflow.test.etcd;
import com.google.common.collect.Lists;
import com.yomahub.liteflow.core.FlowExecutor;
import com.yomahub.liteflow.exception.ChainNotFoundException;
import com.yomahub.liteflow.flow.FlowBus;
import com.yomahub.liteflow.flow.LiteflowResponse;
import com.yomahub.liteflow.parser.etcd.EtcdClient;
@ -16,8 +17,10 @@ import org.springframework.boot.test.mock.mockito.MockBean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.test.context.TestPropertySource;
import org.springframework.test.context.junit.jupiter.SpringExtension;
import javax.annotation.Resource;
import java.util.List;
import static org.mockito.Mockito.*;
/**
@ -27,79 +30,88 @@ import static org.mockito.Mockito.*;
@TestPropertySource(value = "classpath:/etcd/application-xml-cluster.properties")
@SpringBootTest(classes = EtcdWithXmlELSpringbootTest.class)
@EnableAutoConfiguration
@ComponentScan({ "com.yomahub.liteflow.test.etcd.cmp" })
@ComponentScan({"com.yomahub.liteflow.test.etcd.cmp"})
public class EtcdWithXmlELSpringbootTest extends BaseTest {
@MockBean
private EtcdClient etcdClient;
@MockBean
private EtcdClient etcdClient;
@Resource
private FlowExecutor flowExecutor;
@Resource
private FlowExecutor flowExecutor;
private static final String SEPARATOR = "/";
private static final String SEPARATOR = "/";
private static final String CHAIN_PATH = "/liteflow/chain";
private static final String CHAIN_PATH = "/liteflow/chain";
private static final String SCRIPT_PATH = "/liteflow/script";
private static final String SCRIPT_PATH = "/liteflow/script";
@BeforeEach
public void setUp() {
MockitoAnnotations.initMocks(this);
}
@BeforeEach
public void setUp() {
MockitoAnnotations.initMocks(this);
}
@AfterEach
public void after() {
FlowBus.cleanCache();
FlowBus.clearStat();
}
@AfterEach
public void after() {
FlowBus.cleanCache();
FlowBus.clearStat();
}
@Test
public void testEtcdNodeWithXml1() throws Exception {
List<String> chainNameList = Lists.newArrayList("chain1");
List<String> scriptNodeValueList = Lists.newArrayList("s1:script:脚本s1");
when(etcdClient.getChildrenKeys(anyString(), anyString())).thenReturn(chainNameList)
.thenReturn(scriptNodeValueList);
@Test
public void testEtcdNodeWithXml1() throws Exception {
List<String> chainNameList = Lists.newArrayList("chain1", "chain2:false");
List<String> scriptNodeValueList = Lists.newArrayList("s1:script:脚本s1", "s2:script:脚本s1:groovy:false");
when(etcdClient.getChildrenKeys(CHAIN_PATH, SEPARATOR)).thenReturn(chainNameList);
when(etcdClient.getChildrenKeys(SCRIPT_PATH, SEPARATOR)).thenReturn(scriptNodeValueList);
String chain1Data = "THEN(a, b, c, s1);";
String scriptNodeValue = "defaultContext.setData(\"test\",\"hello\");";
when(etcdClient.get(anyString())).thenReturn(chain1Data).thenReturn(scriptNodeValue);
when(etcdClient.get(CHAIN_PATH + "/chain1")).thenReturn("THEN(a, b, c, s1);");
when(etcdClient.get(CHAIN_PATH + "/chain2:false")).thenReturn("THEN(a, b, c, s1);");
when(etcdClient.get(SCRIPT_PATH + "/s1:script:脚本s1")).thenReturn("defaultContext.setData(\"test\",\"hello\");");
when(etcdClient.get(SCRIPT_PATH + "/s2:script:脚本s1:groovy:false")).thenReturn("defaultContext.setData(\"test\",\"hello\");");
LiteflowResponse response = flowExecutor.execute2Resp("chain1", "arg");
DefaultContext context = response.getFirstContextBean();
Assertions.assertTrue(response.isSuccess());
Assertions.assertEquals("a==>b==>c==>s1[脚本s1]", response.getExecuteStepStr());
Assertions.assertEquals("hello", context.getData("test"));
}
LiteflowResponse response = flowExecutor.execute2Resp("chain1", "arg");
DefaultContext context = response.getFirstContextBean();
Assertions.assertTrue(response.isSuccess());
Assertions.assertEquals("a==>b==>c==>s1[脚本s1]", response.getExecuteStepStr());
Assertions.assertEquals("hello", context.getData("test"));
@Test
public void testEtcdNodeWithXml2() throws Exception {
List<String> chainNameList = Lists.newArrayList("chain1");
List<String> scriptNodeValueList = Lists.newArrayList("s1:script:脚本s1");
when(etcdClient.getChildrenKeys(CHAIN_PATH, SEPARATOR)).thenReturn(chainNameList);
when(etcdClient.getChildrenKeys(SCRIPT_PATH, SEPARATOR)).thenReturn(scriptNodeValueList);
// 测试 chain 停用
Assertions.assertThrows(ChainNotFoundException.class, () -> {
throw flowExecutor.execute2Resp("chain2", "arg").getCause();
});
String chain1Data = "THEN(a, b, c, s1);";
String chain1ChangedData = "THEN(a, b, s1);";
String scriptNodeValue = "defaultContext.setData(\"test\",\"hello\");";
String scriptNodeChangedValue = "defaultContext.setData(\"test\",\"hello world\");";
when(etcdClient.get(CHAIN_PATH + SEPARATOR + "chain1")).thenReturn(chain1Data).thenReturn(chain1ChangedData);
when(etcdClient.get(SCRIPT_PATH + SEPARATOR + "s1:script:脚本s1")).thenReturn(scriptNodeValue)
.thenReturn(scriptNodeChangedValue);
// 测试 script 停用
Assertions.assertTrue(!FlowBus.getNodeMap().containsKey("s2"));
}
LiteflowResponse response = flowExecutor.execute2Resp("chain1", "arg");
DefaultContext context = response.getFirstContextBean();
Assertions.assertTrue(response.isSuccess());
Assertions.assertEquals("a==>b==>c==>s1[脚本s1]", response.getExecuteStepStr());
Assertions.assertEquals("hello", context.getData("test"));
@Test
public void testEtcdNodeWithXml2() throws Exception {
List<String> chainNameList = Lists.newArrayList("chain1");
List<String> scriptNodeValueList = Lists.newArrayList("s1:script:脚本s1");
when(etcdClient.getChildrenKeys(CHAIN_PATH, SEPARATOR)).thenReturn(chainNameList);
when(etcdClient.getChildrenKeys(SCRIPT_PATH, SEPARATOR)).thenReturn(scriptNodeValueList);
flowExecutor.reloadRule();
String chain1Data = "THEN(a, b, c, s1);";
String chain1ChangedData = "THEN(a, b, s1);";
String scriptNodeValue = "defaultContext.setData(\"test\",\"hello\");";
String scriptNodeChangedValue = "defaultContext.setData(\"test\",\"hello world\");";
when(etcdClient.get(CHAIN_PATH + SEPARATOR + "chain1")).thenReturn(chain1Data).thenReturn(chain1ChangedData);
when(etcdClient.get(SCRIPT_PATH + SEPARATOR + "s1:script:脚本s1")).thenReturn(scriptNodeValue)
.thenReturn(scriptNodeChangedValue);
LiteflowResponse response2 = flowExecutor.execute2Resp("chain1", "arg");
DefaultContext context2 = response2.getFirstContextBean();
Assertions.assertTrue(response2.isSuccess());
Assertions.assertEquals("a==>b==>s1[脚本s1]", response2.getExecuteStepStr());
Assertions.assertEquals("hello world", context2.getData("test"));
LiteflowResponse response = flowExecutor.execute2Resp("chain1", "arg");
DefaultContext context = response.getFirstContextBean();
Assertions.assertTrue(response.isSuccess());
Assertions.assertEquals("a==>b==>c==>s1[脚本s1]", response.getExecuteStepStr());
Assertions.assertEquals("hello", context.getData("test"));
}
flowExecutor.reloadRule();
LiteflowResponse response2 = flowExecutor.execute2Resp("chain1", "arg");
DefaultContext context2 = response2.getFirstContextBean();
Assertions.assertTrue(response2.isSuccess());
Assertions.assertEquals("a==>b==>s1[脚本s1]", response2.getExecuteStepStr());
Assertions.assertEquals("hello world", context2.getData("test"));
}
}

View File

@ -2,6 +2,7 @@ package com.yomahub.liteflow.test.parser;
import com.yomahub.liteflow.core.FlowExecutor;
import com.yomahub.liteflow.core.FlowExecutorHolder;
import com.yomahub.liteflow.exception.ChainNotFoundException;
import com.yomahub.liteflow.flow.LiteflowResponse;
import com.yomahub.liteflow.property.LiteflowConfig;
import com.yomahub.liteflow.test.BaseTest;
@ -33,4 +34,11 @@ public class JsonParserTest extends BaseTest {
Assertions.assertTrue(response.isSuccess());
}
@Test
public void testJsonDisableParser() {
Assertions.assertThrows(ChainNotFoundException.class,()->{
throw flowExecutor.execute2Resp("chain3", "arg").getCause();
});
}
}

View File

@ -2,6 +2,7 @@ package com.yomahub.liteflow.test.parser;
import com.yomahub.liteflow.core.FlowExecutor;
import com.yomahub.liteflow.core.FlowExecutorHolder;
import com.yomahub.liteflow.exception.ChainNotFoundException;
import com.yomahub.liteflow.flow.LiteflowResponse;
import com.yomahub.liteflow.property.LiteflowConfig;
import com.yomahub.liteflow.test.BaseTest;
@ -33,4 +34,11 @@ public class XmlParserTest extends BaseTest {
Assertions.assertTrue(response.isSuccess());
}
@Test
public void testXmlDisableParser() {
Assertions.assertThrows(ChainNotFoundException.class,()->{
throw flowExecutor.execute2Resp("chain3", "arg").getCause();
});
}
}

View File

@ -2,6 +2,7 @@ package com.yomahub.liteflow.test.parser;
import com.yomahub.liteflow.core.FlowExecutor;
import com.yomahub.liteflow.core.FlowExecutorHolder;
import com.yomahub.liteflow.exception.ChainNotFoundException;
import com.yomahub.liteflow.flow.LiteflowResponse;
import com.yomahub.liteflow.property.LiteflowConfig;
import com.yomahub.liteflow.test.BaseTest;
@ -33,4 +34,10 @@ public class YmlParserTest extends BaseTest {
Assertions.assertTrue(response.isSuccess());
}
@Test
public void testYmlDisableParser() {
Assertions.assertThrows(ChainNotFoundException.class,()->{
throw flowExecutor.execute2Resp("chain3", "arg").getCause();
});
}
}

View File

@ -40,6 +40,11 @@
{
"name": "chain1",
"value": "THEN(a,c,WHEN(b,d,SWITCH(e).to(f,g)), chain2);"
},
{
"name": "chain3",
"enable": "false",
"value": "THEN(a,c,WHEN(b,d,SWITCH(e).to(f,g)), chain2);"
}
]
}

View File

@ -17,4 +17,7 @@
<chain name="chain2">
THEN(c, g, f);
</chain>
<chain name="chain3" enable="false">
THEN(c, g, f);
</chain>
</flow>

View File

@ -20,3 +20,6 @@ flow:
value: "THEN(a, c, WHEN(b, d, SWITCH(e).to(f, g)), chain2);"
- name: chain2
value: "THEN(c, g, f);"
- name: chain3
value: "THEN(c, g, f);"
enable: false

View File

@ -2,6 +2,8 @@ package com.yomahub.liteflow.test.redis;
import cn.hutool.crypto.digest.DigestUtil;
import com.yomahub.liteflow.core.FlowExecutor;
import com.yomahub.liteflow.exception.ChainNotFoundException;
import com.yomahub.liteflow.flow.FlowBus;
import com.yomahub.liteflow.flow.LiteflowResponse;
import com.yomahub.liteflow.log.LFLog;
import com.yomahub.liteflow.log.LFLoggerManager;
@ -9,7 +11,9 @@ import com.yomahub.liteflow.parser.redis.mode.RClient;
import com.yomahub.liteflow.parser.redis.mode.polling.RedisParserPollingMode;
import com.yomahub.liteflow.slot.DefaultContext;
import com.yomahub.liteflow.test.BaseTest;
import org.junit.jupiter.api.*;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.test.context.SpringBootTest;
@ -17,13 +21,15 @@ import org.springframework.boot.test.mock.mockito.MockBean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.test.context.TestPropertySource;
import org.springframework.test.context.junit.jupiter.SpringExtension;
import javax.annotation.Resource;
import java.lang.reflect.Field;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import static org.mockito.ArgumentMatchers.*;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.when;
/**
@ -163,4 +169,27 @@ public class RedisWithXmlELPollSpringbootTest extends BaseTest {
Assertions.assertTrue(response.isSuccess());
Assertions.assertEquals("hello world", context.getData("test11"));
}
@Test
public void testDisablePollWithXml() throws InterruptedException {
Set<String> chainNameSet = new HashSet<>();
chainNameSet.add("chain1122:false");
String chainValue = "THEN(a, b, c);";
when(chainClient.hkeys("pollChainKey")).thenReturn(chainNameSet);
when(chainClient.hget("pollChainKey", "chain1122:true")).thenReturn(chainValue);
Set<String> scriptFieldSet = new HashSet<>();
scriptFieldSet.add("s4:script:脚本s3:groovy:false");
when(scriptClient.hkeys("pollScriptKey")).thenReturn(scriptFieldSet);
when(scriptClient.hget("pollScriptKey", "s4:script:脚本s3:groovy:true")).thenReturn("defaultContext.setData(\"test\",\"hello\");");
// 测试 chain 停用
Assertions.assertThrows(ChainNotFoundException.class, () -> {
throw flowExecutor.execute2Resp("chain1122", "arg").getCause();
});
// 测试 script 停用
Assertions.assertTrue(!FlowBus.getNodeMap().containsKey("s4"));
}
}

View File

@ -1,6 +1,8 @@
package com.yomahub.liteflow.test.zookeeper;
import com.yomahub.liteflow.core.FlowExecutor;
import com.yomahub.liteflow.exception.ChainNotFoundException;
import com.yomahub.liteflow.flow.FlowBus;
import com.yomahub.liteflow.flow.LiteflowResponse;
import com.yomahub.liteflow.slot.DefaultContext;
import com.yomahub.liteflow.test.BaseTest;
@ -66,6 +68,10 @@ public class ZkNodeWithXmlELSpringbootTest extends BaseTest {
zkClient.createPersistent(chain2Path, true);
zkClient.writeData(chain2Path, "THEN(a, b, c, s3);");
String chain3Path = ZK_CHAIN_PATH + "/chain3:false";
zkClient.createPersistent(chain3Path, true);
zkClient.writeData(chain3Path, "THEN(a, b, c, s3);");
String script1Path = ZK_SCRIPT_PATH + "/s1:script:脚本s1:groovy";
zkClient.createPersistent(script1Path, true);
zkClient.writeData(script1Path, "defaultContext.setData(\"test\",\"hello\");");
@ -77,6 +83,10 @@ public class ZkNodeWithXmlELSpringbootTest extends BaseTest {
String script3Path = ZK_SCRIPT_PATH + "/s3:script:脚本s3";
zkClient.createPersistent(script3Path, true);
zkClient.writeData(script3Path, "defaultContext.setData(\"test\",\"hello\");");
String script4Path = ZK_SCRIPT_PATH + "/s4:script:脚本s3:groovy:false";
zkClient.createPersistent(script4Path, true);
zkClient.writeData(script4Path, "defaultContext.setData(\"test\",\"hello\");");
}
@Test
@ -94,6 +104,14 @@ public class ZkNodeWithXmlELSpringbootTest extends BaseTest {
DefaultContext context = response.getFirstContextBean();
Assertions.assertTrue(response.isSuccess());
Assertions.assertEquals("hello", context.getData("test"));
// 测试 chain 停用
Assertions.assertThrows(ChainNotFoundException.class, () -> {
throw flowExecutor.execute2Resp("chain3", "arg").getCause();
});
// 测试 script 停用
Assertions.assertTrue(!FlowBus.getNodeMap().containsKey("s4"));
}
@AfterAll