enhancement #I5ZLH6 支持zk分离chain以及脚本的存储结构

This commit is contained in:
everywhere.z 2022-11-05 11:01:32 +08:00
parent 657a557d7e
commit 3ec3cfd346
5 changed files with 181 additions and 51 deletions

View File

@ -29,7 +29,9 @@ public class JDBCHelper {
private static final String SCRIPT_SQL_PATTERN = "SELECT {},{},{},{} FROM {} WHERE {}=?";
private static final String CHAIN_XML_PATTERN = "<chain name=\"{}\">{}</chain>";
private static final String NODE_XML_PATTERN = "<nodes><node id=\"{}\" name=\"{}\" type=\"{}\"><![CDATA[{}]]></node></nodes>";
private static final String NODE_XML_PATTERN = "<nodes>{}</nodes>";
private static final String NODE_ITEM_XML_PATTERN = "<node id=\"{}\" name=\"{}\" type=\"{}\"><![CDATA[{}]]></node>";
private static final String XML_PATTERN = "<?xml version=\"1.0\" encoding=\"UTF-8\"?><flow>{}{}</flow>";
private static final Integer FETCH_SIZE_MAX = 1000;
@ -178,7 +180,7 @@ public class JDBCHelper {
throw new ELSQLException(StrUtil.format("The type value[{}] is not a script type", type));
}
result.add(StrUtil.format(NODE_XML_PATTERN, id, name, type, data));
result.add(StrUtil.format(NODE_ITEM_XML_PATTERN, id, name, type, data));
}
} catch (Exception e) {
throw new ELSQLException(e.getMessage());
@ -186,7 +188,7 @@ public class JDBCHelper {
// 关闭连接
close(conn, stmt, rs);
}
return CollUtil.join(result, StrUtil.EMPTY);
return StrUtil.format(NODE_XML_PATTERN, CollUtil.join(result, StrUtil.EMPTY));
}
/**

View File

@ -17,8 +17,8 @@
<groupId>com.yomahub</groupId>
<artifactId>liteflow-core</artifactId>
<version>${revision}</version>
<optional>true</optional>
<scope>provided</scope>
<!--<optional>true</optional>
<scope>provided</scope>-->
</dependency>
<dependency>

View File

@ -25,13 +25,6 @@ public class ZkXmlELParser extends ClassXmlFlowELParser {
private final ZkParserHelper zkParserHelper;
public ZkXmlELParser() {
Consumer<String> parseConsumer = t -> {
try {
parse(t);
} catch (Exception e) {
throw new RuntimeException(e);
}
};
LiteflowConfig liteflowConfig = LiteflowConfigGetter.get();
try{
@ -46,14 +39,14 @@ public class ZkXmlELParser extends ClassXmlFlowELParser {
throw new ZkException("rule-source-ext-data is empty");
}
if (StrUtil.isBlank(zkParserVO.getNodePath())){
zkParserVO.setNodePath("/lite-flow/flow");
if (StrUtil.isBlank(zkParserVO.getChainPath())){
throw new ZkException("You must configure the chainPath property");
}
if (StrUtil.isBlank(zkParserVO.getConnectStr())){
throw new ZkException("zk connect string is empty");
}
zkParserHelper = new ZkParserHelper(zkParserVO, parseConsumer);
zkParserHelper = new ZkParserHelper(zkParserVO);
}catch (Exception e){
throw new ZkException(e.getMessage());
}
@ -64,9 +57,14 @@ public class ZkXmlELParser extends ClassXmlFlowELParser {
try{
String content = zkParserHelper.getContent();
zkParserHelper.checkContent(content);
zkParserHelper.listenZkNode();
Consumer<String> listenerConsumer = s -> {
try{
parse(s);
}catch (Exception e){
throw new ZkException(e.getMessage());
}
};
zkParserHelper.listenZkNode(listenerConsumer);
return content;
}catch (Exception e){

View File

@ -1,20 +1,24 @@
package com.yomahub.liteflow.parser.zk.util;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.util.ReUtil;
import cn.hutool.core.util.StrUtil;
import com.yomahub.liteflow.exception.ParseException;
import com.yomahub.liteflow.core.FlowExecutorHolder;
import com.yomahub.liteflow.parser.zk.exception.ZkException;
import com.yomahub.liteflow.parser.zk.vo.ZkParserVO;
import com.yomahub.liteflow.util.JsonUtil;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.CuratorCache;
import org.apache.curator.framework.recipes.cache.CuratorCacheListener;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.retry.RetryNTimes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.function.Consumer;
public class ZkParserHelper {
@ -22,13 +26,18 @@ public class ZkParserHelper {
private static final Logger LOG = LoggerFactory.getLogger(ZkParserHelper.class);
private final ZkParserVO zkParserVO;
private final Consumer<String> parseConsumer;
private final CuratorFramework client;
public ZkParserHelper(ZkParserVO zkParserVO, Consumer<String> parseConsumer) {
private final String CHAIN_XML_PATTERN = "<chain name=\"{}\">{}</chain>";
private final String NODE_XML_PATTERN = "<nodes>{}</nodes>";
private final String NODE_ITEM_XML_PATTERN = "<node id=\"{}\" name=\"{}\" type=\"{}\"><![CDATA[{}]]></node>";
private final String XML_PATTERN = "<?xml version=\"1.0\" encoding=\"UTF-8\"?><flow>{}{}</flow>";
public ZkParserHelper(ZkParserVO zkParserVO) {
this.zkParserVO = zkParserVO;
this.parseConsumer = parseConsumer;
try{
CuratorFramework client = CuratorFrameworkFactory.newClient(
@ -37,46 +46,157 @@ public class ZkParserHelper {
);
client.start();
if (client.checkExists().forPath(zkParserVO.getNodePath()) == null) {
client.create().creatingParentsIfNeeded().forPath(zkParserVO.getNodePath(), "".getBytes());
}
this.client = client;
}catch (Exception e){
throw new ZkException(e.getMessage());
}
}
public String getContent(){
try{
return new String(client.getData().forPath(zkParserVO.getNodePath()));
//检查zk上有没有chainPath节点
if (client.checkExists().forPath(zkParserVO.getChainPath()) == null) {
throw new ZkException(StrUtil.format("zk node[{}] is not exist", zkParserVO.getChainPath()));
}
//检查chainPath路径下有没有子节点
List<String> chainNameList = client.getChildren().forPath(zkParserVO.getChainPath());
if (CollectionUtil.isEmpty(chainNameList)){
throw new ZkException(StrUtil.format("There are no chains in path [{}]", zkParserVO.getChainPath()));
}
//获取chainPath路径下的所有子节点内容List
List<String> chainItemContentList = new ArrayList<>();
for (String chainName : chainNameList){
String chainData = new String(client.getData().forPath(StrUtil.format("{}/{}", zkParserVO.getChainPath(), chainName)));
chainItemContentList.add(StrUtil.format(CHAIN_XML_PATTERN, chainName, chainData));
}
//合并成所有chain的xml内容
String chainAllContent = CollUtil.join(chainItemContentList, StrUtil.EMPTY);
//检查是否有脚本内容如果有进行脚本内容的获取
String scriptAllContent = StrUtil.EMPTY;
if (hasScript()){
List<String> scriptNodeValueList = client.getChildren().forPath(zkParserVO.getScriptPath());
List<String> scriptItemContentList = new ArrayList<>();
for (String scriptNodeValue: scriptNodeValueList){
NodeSimpleVO nodeSimpleVO = convert(scriptNodeValue);
if (Objects.isNull(nodeSimpleVO)){
throw new ZkException(StrUtil.format("The name of the zk node is invalid:{}", scriptNodeValue));
}
String scriptData = new String(
client.getData().forPath(StrUtil.format("{}/{}", zkParserVO.getScriptPath(), scriptNodeValue))
);
scriptItemContentList.add(
StrUtil.format(NODE_ITEM_XML_PATTERN,
nodeSimpleVO.getNodeId(),
nodeSimpleVO.getName(),
nodeSimpleVO.getType(),
scriptData)
);
}
scriptAllContent = StrUtil.format(NODE_XML_PATTERN, CollUtil.join(scriptItemContentList, StrUtil.EMPTY));
}
return StrUtil.format(XML_PATTERN, scriptAllContent, chainAllContent);
}catch (Exception e){
throw new ZkException(e.getMessage());
}
}
/**
* 检查 content 是否合法
*/
public void checkContent(String content) {
if (StrUtil.isBlank(content)) {
String error = MessageFormat.format("the node[{0}] value is empty", zkParserVO.getNodePath());
throw new ParseException(error);
public boolean hasScript(){
//没有配置scriptPath
if (StrUtil.isBlank(zkParserVO.getScriptPath())){
return false;
}
try{
//配置了但是不存在这个节点
if (client.checkExists().forPath(zkParserVO.getScriptPath()) == null){
return false;
}
//存在这个节点但是子节点不存在
List<String> chainNameList = client.getChildren().forPath(zkParserVO.getScriptPath());
if (CollUtil.isEmpty(chainNameList)){
return false;
}
return true;
}catch (Exception e){
return false;
}
}
/**
* 监听 zk 节点
*/
public void listenZkNode() {
CuratorCache cache = CuratorCache.build(client, zkParserVO.getNodePath());
public void listenZkNode(Consumer<String> listenerConsumer) {
//监听chain
CuratorCache cache1 = CuratorCache.build(client, zkParserVO.getChainPath());
cache1.start();
cache1.listenable().addListener((type, oldData, data) -> listenerConsumer.accept(getContent()));
cache.start();
//监听script
CuratorCache cache2 = CuratorCache.build(client, zkParserVO.getScriptPath());
cache2.start();
cache2.listenable().addListener((type, oldData, data) -> listenerConsumer.accept(getContent()));
}
cache.listenable().addListener((type, oldData, data) -> {
String content1 = new String(data.getData());
LOG.info("stating load flow config....");
parseConsumer.accept(content1);
});
public NodeSimpleVO convert(String str){
//不需要去理解这串正则就是一个匹配冒号的
//一定得是a:b或是a:b:c...这种完整类型的字符串的
List<String> matchItemList = ReUtil.findAllGroup0("(?<=[^:]:)[^:]+|[^:]+(?=:[^:])", str);
if (CollUtil.isEmpty(matchItemList)){
return null;
}
NodeSimpleVO nodeSimpleVO = new NodeSimpleVO();
if (matchItemList.size() > 1){
nodeSimpleVO.setNodeId(matchItemList.get(0));
nodeSimpleVO.setType(matchItemList.get(1));
}
if (matchItemList.size() > 2){
nodeSimpleVO.setName(matchItemList.get(2));
}
return nodeSimpleVO;
}
private static class NodeSimpleVO{
private String nodeId;
private String type;
private String name="";
public String getNodeId() {
return nodeId;
}
public void setNodeId(String nodeId) {
this.nodeId = nodeId;
}
public String getType() {
return type;
}
public void setType(String type) {
this.type = type;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
}
}

View File

@ -9,7 +9,9 @@ public class ZkParserVO {
private String connectStr;
private String nodePath;
private String chainPath;
private String scriptPath;
public String getConnectStr() {
return connectStr;
@ -19,11 +21,19 @@ public class ZkParserVO {
this.connectStr = connectStr;
}
public String getNodePath() {
return nodePath;
public String getChainPath() {
return chainPath;
}
public void setNodePath(String nodePath) {
this.nodePath = nodePath;
public void setChainPath(String chainPath) {
this.chainPath = chainPath;
}
public String getScriptPath() {
return scriptPath;
}
public void setScriptPath(String scriptPath) {
this.scriptPath = scriptPath;
}
}