!129 支持etcd分离chain以及脚本的存储结构

Merge pull request !129 from zendwang/dev
This commit is contained in:
铂赛东 2022-11-10 07:12:04 +00:00 committed by Gitee
commit 39c94e77d0
No known key found for this signature in database
GPG Key ID: 173E9B9CA92EEF8F
8 changed files with 374 additions and 77 deletions

View File

@ -5,16 +5,21 @@ import io.etcd.jetcd.ByteSequence;
import io.etcd.jetcd.Client;
import io.etcd.jetcd.KeyValue;
import io.etcd.jetcd.Watch;
import io.etcd.jetcd.options.GetOption;
import io.etcd.jetcd.options.WatchOption;
import io.etcd.jetcd.watch.WatchEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import static java.nio.charset.StandardCharsets.UTF_8;
/**
* Etcd 客户端封装类.
@ -49,7 +54,7 @@ public class EtcdClient {
public String get(final String key) {
List<KeyValue> keyValues = null;
try {
keyValues = client.getKVClient().get(ByteSequence.from(key, StandardCharsets.UTF_8)).get().getKvs();
keyValues = client.getKVClient().get(bytesOf(key)).get().getKvs();
} catch (InterruptedException | ExecutionException e) {
LOG.error(e.getMessage(), e);
}
@ -58,7 +63,7 @@ public class EtcdClient {
return null;
}
return keyValues.iterator().next().getValue().toString(StandardCharsets.UTF_8);
return keyValues.iterator().next().getValue().toString(UTF_8);
}
/**
@ -69,8 +74,8 @@ public class EtcdClient {
*/
public KeyValue put(final String key, final String value) {
KeyValue prevKv = null;
ByteSequence keyByteSequence = ByteSequence.from(key, StandardCharsets.UTF_8);
ByteSequence valueByteSequence = ByteSequence.from(value, StandardCharsets.UTF_8);
ByteSequence keyByteSequence = bytesOf(key);
ByteSequence valueByteSequence = bytesOf(value);
try {
prevKv = client.getKVClient().put(keyByteSequence, valueByteSequence).get().getPrevKv();
} catch (InterruptedException | ExecutionException e) {
@ -79,6 +84,44 @@ public class EtcdClient {
return prevKv;
}
/**
* get node sub nodes.
*
* @param prefix node prefix.
* @param separator separator char
* @return sub nodes
* @throws ExecutionException the exception
* @throws InterruptedException the exception
*/
public List<String> getChildrenKeys(final String prefix, final String separator) throws ExecutionException, InterruptedException {
ByteSequence prefixByteSequence = bytesOf(prefix);
GetOption getOption = GetOption.newBuilder()
.withPrefix(prefixByteSequence)
.withSortField(GetOption.SortTarget.KEY)
.withSortOrder(GetOption.SortOrder.ASCEND)
.build();
List<KeyValue> keyValues = client.getKVClient()
.get(prefixByteSequence, getOption)
.get()
.getKvs();
return keyValues.stream()
.map(e -> getSubNodeKeyName(prefix, e.getKey().toString(UTF_8), separator))
.distinct()
.filter(e -> Objects.nonNull(e))
.collect(Collectors.toList());
}
private String getSubNodeKeyName(final String prefix, final String fullPath, final String separator) {
if (prefix.length() > fullPath.length()) {
return null;
}
String pathWithoutPrefix = fullPath.substring(prefix.length());
return pathWithoutPrefix.contains(separator) ? pathWithoutPrefix.substring(1) : pathWithoutPrefix;
}
/**
* subscribe data change.
*
@ -90,16 +133,44 @@ public class EtcdClient {
final BiConsumer<String, String> updateHandler,
final Consumer<String> deleteHandler) {
Watch.Listener listener = watch(updateHandler, deleteHandler);
Watch.Watcher watch = client.getWatchClient().watch(ByteSequence.from(key, StandardCharsets.UTF_8), listener);
Watch.Watcher watch = client.getWatchClient().watch(bytesOf(key), listener);
watchCache.put(key, watch);
}
/**
* subscribe sub node change.
*
* @param key param node name.
* @param updateHandler sub node handler of update
* @param deleteHandler sub node delete of delete
*/
public void watchChildChange(final String key,
final BiConsumer<String, String> updateHandler,
final Consumer<String> deleteHandler) {
Watch.Listener listener = watch(updateHandler, deleteHandler);
WatchOption option = WatchOption.newBuilder()
.withPrefix(bytesOf(key))
.build();
Watch.Watcher watch = client.getWatchClient().watch(bytesOf(key), option, listener);
watchCache.put(key, watch);
}
/**
* bytesOf string.
* @param val val.
* @return bytes val.
*/
public ByteSequence bytesOf(final String val) {
return ByteSequence.from(val, UTF_8);
}
private Watch.Listener watch(final BiConsumer<String, String> updateHandler,
final Consumer<String> deleteHandler) {
return Watch.listener(response -> {
for (WatchEvent event : response.getEvents()) {
String path = event.getKeyValue().getKey().toString(StandardCharsets.UTF_8);
String value = event.getKeyValue().getValue().toString(StandardCharsets.UTF_8);
String path = event.getKeyValue().getKey().toString(UTF_8);
String value = event.getKeyValue().getValue().toString(UTF_8);
switch (event.getEventType()) {
case PUT:
updateHandler.accept(path, value);

View File

@ -39,11 +39,11 @@ public class EtcdXmlELParser extends ClassXmlFlowELParser {
throw new EtcdException("rule-source-ext-data is empty");
}
if (StrUtil.isBlank(etcdParserVO.getNodePath())){
etcdParserVO.setNodePath("/lite-flow/flow");
if (StrUtil.isBlank(etcdParserVO.getChainPath())){
throw new EtcdException("You must configure the chainPath property");
}
if (StrUtil.isBlank(etcdParserVO.getConnectStr())){
throw new EtcdException("Etcd connect string is empty");
if (StrUtil.isBlank(etcdParserVO.getEndpoints())){
throw new EtcdException("etcd endpoints is empty");
}
etcdParserHelper = new EtcdParserHelper(etcdParserVO);
@ -54,17 +54,20 @@ public class EtcdXmlELParser extends ClassXmlFlowELParser {
@Override
public String parseCustom() {
Consumer<String> parseConsumer = t -> {
try {
parse(t);
} catch (Exception e) {
throw new RuntimeException(e);
}
};
try {
String content = etcdParserHelper.getContent();
etcdParserHelper.checkContent(content);
etcdParserHelper.listen(parseConsumer);
Consumer<String> listenerConsumer = t -> {
try {
parse(t);
} catch (Exception e) {
throw new RuntimeException(e);
}
};
etcdParserHelper.listen(listenerConsumer);
return content;
} catch (Exception e){
throw new EtcdException(e.getMessage());

View File

@ -1,17 +1,27 @@
package com.yomahub.liteflow.parser.etcd.util;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.util.CharsetUtil;
import cn.hutool.core.util.ReUtil;
import cn.hutool.core.util.StrUtil;
import com.yomahub.liteflow.exception.ParseException;
import com.yomahub.liteflow.parser.el.XmlFlowELParser;
import com.yomahub.liteflow.builder.LiteFlowNodeBuilder;
import com.yomahub.liteflow.builder.el.LiteFlowChainELBuilder;
import com.yomahub.liteflow.enums.NodeTypeEnum;
import com.yomahub.liteflow.flow.FlowBus;
import com.yomahub.liteflow.parser.etcd.EtcdClient;
import com.yomahub.liteflow.parser.etcd.exception.EtcdException;
import com.yomahub.liteflow.parser.etcd.vo.EtcdParserVO;
import com.yomahub.liteflow.spi.holder.ContextAwareHolder;
import io.etcd.jetcd.ByteSequence;
import io.etcd.jetcd.Client;
import io.etcd.jetcd.ClientBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.function.Consumer;
/**
@ -22,22 +32,38 @@ public class EtcdParserHelper {
private static final Logger LOG = LoggerFactory.getLogger(EtcdParserHelper.class);
private final String CHAIN_XML_PATTERN = "<chain name=\"{}\">{}</chain>";
private final String NODE_XML_PATTERN = "<nodes>{}</nodes>";
private final String NODE_ITEM_XML_PATTERN = "<node id=\"{}\" name=\"{}\" type=\"{}\"><![CDATA[{}]]></node>";
private final String XML_PATTERN = "<?xml version=\"1.0\" encoding=\"UTF-8\"?><flow>{}{}</flow>";
private static final String SEPARATOR = "/";
private final EtcdParserVO etcdParserVO;
private EtcdClient etcdClient;
private EtcdClient client;
public EtcdParserHelper(EtcdParserVO etcdParserVO) {
this.etcdParserVO = etcdParserVO;
try{
try{
this.etcdClient = ContextAwareHolder.loadContextAware().getBean(EtcdClient.class);
this.client = ContextAwareHolder.loadContextAware().getBean(EtcdClient.class);
}catch (Exception ignored){}
if (this.etcdClient == null) {
Client client = Client.builder()
.endpoints(etcdParserVO.getConnectStr().split(","))
.build();
this.etcdClient = new EtcdClient(client);
if (this.client == null) {
ClientBuilder clientBuilder = Client.builder()
.endpoints(etcdParserVO.getEndpoints().split(","));
if (StrUtil.isNotBlank(etcdParserVO.getNamespace())) {
clientBuilder.namespace(ByteSequence.from(etcdParserVO.getNamespace(), CharsetUtil.CHARSET_UTF_8));
}
if (StrUtil.isAllNotBlank(etcdParserVO.getUser(), etcdParserVO.getPassword())) {
clientBuilder.user(ByteSequence.from(etcdParserVO.getUser(), CharsetUtil.CHARSET_UTF_8));
clientBuilder.password(ByteSequence.from(etcdParserVO.getPassword(), CharsetUtil.CHARSET_UTF_8));
}
this.client = new EtcdClient(clientBuilder.build());
}
}catch (Exception e){
throw new EtcdException(e.getMessage());
@ -46,29 +72,154 @@ public class EtcdParserHelper {
public String getContent(){
try{
return this.etcdClient.get(etcdParserVO.getNodePath());
//检查chainPath路径下有没有子节点
List<String> chainNameList = client.getChildrenKeys(etcdParserVO.getChainPath(), SEPARATOR);
if (CollectionUtil.isEmpty(chainNameList)){
throw new EtcdException(StrUtil.format("There are no chains in path [{}]", etcdParserVO.getChainPath()));
}
//获取chainPath路径下的所有子节点内容List
List<String> chainItemContentList = new ArrayList<>();
for (String chainName : chainNameList){
String chainData = client.get(StrUtil.format("{}/{}", etcdParserVO.getChainPath(), chainName));
if (StrUtil.isNotBlank(chainData)) {
chainItemContentList.add(StrUtil.format(CHAIN_XML_PATTERN, chainName, chainData));
}
}
//合并成所有chain的xml内容
String chainAllContent = CollUtil.join(chainItemContentList, StrUtil.EMPTY);
//检查是否有脚本内容如果有进行脚本内容的获取
String scriptAllContent = StrUtil.EMPTY;
if (hasScript()){
List<String> scriptNodeValueList = client.getChildrenKeys(etcdParserVO.getScriptPath(), SEPARATOR);
List<String> scriptItemContentList = new ArrayList<>();
for (String scriptNodeValue: scriptNodeValueList){
NodeSimpleVO nodeSimpleVO = convert(scriptNodeValue);
if (Objects.isNull(nodeSimpleVO)){
throw new EtcdException(StrUtil.format("The name of the etcd node is invalid:{}", scriptNodeValue));
}
String scriptData = client.get(StrUtil.format("{}/{}", etcdParserVO.getScriptPath(), scriptNodeValue));
scriptItemContentList.add(
StrUtil.format(NODE_ITEM_XML_PATTERN,
nodeSimpleVO.getNodeId(),
nodeSimpleVO.getName(),
nodeSimpleVO.getType(),
scriptData)
);
}
scriptAllContent = StrUtil.format(NODE_XML_PATTERN, CollUtil.join(scriptItemContentList, StrUtil.EMPTY));
}
return StrUtil.format(XML_PATTERN, scriptAllContent, chainAllContent);
}catch (Exception e){
throw new EtcdException(e.getMessage());
}
}
/**
* 检查 content 是否合法
*/
public void checkContent(String content) {
if (StrUtil.isBlank(content)) {
String error = MessageFormat.format("the node[{0}] value is empty", etcdParserVO.getNodePath());
throw new ParseException(error);
public boolean hasScript(){
//没有配置scriptPath
if (StrUtil.isBlank(etcdParserVO.getScriptPath())){
return false;
}
try{
//存在这个节点但是子节点不存在
List<String> chainNameList = client.getChildrenKeys(etcdParserVO.getScriptPath(), SEPARATOR);
if (CollUtil.isEmpty(chainNameList)){
return false;
}
return true;
}catch (Exception e){
return false;
}
}
/**
* 监听 etcd 节点
*/
public void listen(Consumer<String> parseConsumer) {
this.etcdClient.watchDataChange(this.etcdParserVO.getNodePath(), (updatePath, updateValue) -> {
LOG.info("starting load flow config....");
parseConsumer.accept(updateValue);
}, null);
this.client.watchChildChange(this.etcdParserVO.getChainPath(), (updatePath, updateValue) -> {
LOG.info("starting reload flow config... update path={} value={},", updatePath, updateValue);
String chainName = updatePath.replace(this.etcdParserVO.getChainPath() + SEPARATOR, "");
LiteFlowChainELBuilder.createChain().setChainName(chainName).setEL(updateValue).build();
}, (deletePath) -> {
LOG.info("starting reload flow config... delete path={}", deletePath);
String chainName = deletePath.replace(this.etcdParserVO.getChainPath() + SEPARATOR, "");
FlowBus.removeChain(chainName);
});
this.client.watchChildChange(this.etcdParserVO.getScriptPath(), (updatePath, updateValue) -> {
LOG.info("starting reload flow config... update path={} value={}", updatePath, updateValue);
String scriptNodeValue = updatePath.replace(this.etcdParserVO.getScriptPath() + SEPARATOR, "");;
NodeSimpleVO nodeSimpleVO = convert(scriptNodeValue);
LiteFlowNodeBuilder.createScriptNode().setId(nodeSimpleVO.getNodeId())
.setType(NodeTypeEnum.getEnumByCode(nodeSimpleVO.type))
.setName(nodeSimpleVO.getName())
.setScript(updateValue).build();
}, (deletePath) -> {
LOG.info("starting reload flow config... delete path={}", deletePath);
String scriptNodeValue = deletePath.replace(this.etcdParserVO.getScriptPath() + SEPARATOR, "");;
NodeSimpleVO nodeSimpleVO = convert(scriptNodeValue);
FlowBus.getNodeMap().remove(nodeSimpleVO.getNodeId());
});
}
public NodeSimpleVO convert(String str){
//不需要去理解这串正则就是一个匹配冒号的
//一定得是a:b或是a:b:c...这种完整类型的字符串的
List<String> matchItemList = ReUtil.findAllGroup0("(?<=[^:]:)[^:]+|[^:]+(?=:[^:])", str);
if (CollUtil.isEmpty(matchItemList)){
return null;
}
NodeSimpleVO nodeSimpleVO = new NodeSimpleVO();
if (matchItemList.size() > 1){
nodeSimpleVO.setNodeId(matchItemList.get(0));
nodeSimpleVO.setType(matchItemList.get(1));
}
if (matchItemList.size() > 2){
nodeSimpleVO.setName(matchItemList.get(2));
}
return nodeSimpleVO;
}
private static class NodeSimpleVO{
private String nodeId;
private String type;
private String name="";
public String getNodeId() {
return nodeId;
}
public void setNodeId(String nodeId) {
this.nodeId = nodeId;
}
public String getType() {
return type;
}
public void setType(String type) {
this.type = type;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
}
}

View File

@ -7,23 +7,63 @@ package com.yomahub.liteflow.parser.etcd.vo;
*/
public class EtcdParserVO {
private String connectStr;
private String endpoints;
private String nodePath;
private String user;
public String getConnectStr() {
return connectStr;
private String password;
private String namespace;
private String chainPath;
private String scriptPath;
public String getEndpoints() {
return endpoints;
}
public void setConnectStr(String connectStr) {
this.connectStr = connectStr;
public void setEndpoints(String endpoints) {
this.endpoints = endpoints;
}
public String getNodePath() {
return nodePath;
public String getUser() {
return user;
}
public void setNodePath(String nodePath) {
this.nodePath = nodePath;
public void setUser(String user) {
this.user = user;
}
public String getPassword() {
return password;
}
public void setPassword(String password) {
this.password = password;
}
public String getNamespace() {
return namespace;
}
public void setNamespace(String namespace) {
this.namespace = namespace;
}
public String getChainPath() {
return chainPath;
}
public void setChainPath(String chainPath) {
this.chainPath = chainPath;
}
public String getScriptPath() {
return scriptPath;
}
public void setScriptPath(String scriptPath) {
this.scriptPath = scriptPath;
}
}

View File

@ -26,6 +26,12 @@
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.yomahub</groupId>
<artifactId>liteflow-script-groovy</artifactId>
<version>${revision}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>

View File

@ -1,33 +1,24 @@
package com.yomahub.liteflow.test.etcd;
import cn.hutool.core.util.ReflectUtil;
import com.google.common.collect.Lists;
import com.yomahub.liteflow.core.FlowExecutor;
import com.yomahub.liteflow.enums.FlowParserTypeEnum;
import com.yomahub.liteflow.flow.FlowBus;
import com.yomahub.liteflow.flow.LiteflowResponse;
import com.yomahub.liteflow.parser.etcd.EtcdClient;
import com.yomahub.liteflow.parser.etcd.EtcdXmlELParser;
import com.yomahub.liteflow.parser.etcd.util.EtcdParserHelper;
import com.yomahub.liteflow.spi.holder.ContextAwareHolder;
import com.yomahub.liteflow.slot.DefaultContext;
import com.yomahub.liteflow.test.BaseTest;
import org.junit.*;
import org.junit.runner.RunWith;
import org.mockito.Answers;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.mock.mockito.MockBean;
import org.springframework.boot.test.mock.mockito.MockitoTestExecutionListener;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.test.context.TestExecutionListeners;
import org.springframework.test.context.TestPropertySource;
import org.springframework.test.context.junit4.SpringRunner;
import javax.annotation.Resource;
import java.util.function.Consumer;
import static org.mockito.ArgumentMatchers.any;
import java.util.List;
import static org.mockito.Mockito.*;
/**
@ -46,6 +37,13 @@ public class EtcdWithXmlELSpringbootTest extends BaseTest {
@Resource
private FlowExecutor flowExecutor;
private static final String SEPARATOR = "/";
private static final String CHAIN_PATH = "/liteflow/chain";
private static final String SCRIPT_PATH = "/liteflow/script";
@Before
public void setUp(){
MockitoAnnotations.initMocks(this);
@ -58,29 +56,48 @@ public class EtcdWithXmlELSpringbootTest extends BaseTest {
@Test
public void testEtcdNodeWithXml1() throws Exception {
String flowXml = "<?xml version=\"1.0\" encoding=\"UTF-8\"?><flow><chain name=\"chain1\">THEN(a, b, c);</chain></flow>";
when(etcdClient.get(anyString())).thenReturn(flowXml);
List<String> chainNameList = Lists.newArrayList("chain1");
List<String> scriptNodeValueList = Lists.newArrayList("s1:script:脚本s1");
when(etcdClient.getChildrenKeys(anyString(), anyString())).thenReturn(chainNameList).thenReturn(scriptNodeValueList);
String chain1Data = "THEN(a, b, c, s1);";
String scriptNodeValue = "defaultContext.setData(\"test\",\"hello\");";
when(etcdClient.get(anyString())).thenReturn(chain1Data).thenReturn(scriptNodeValue);
LiteflowResponse response = flowExecutor.execute2Resp("chain1", "arg");
DefaultContext context = response.getFirstContextBean();
Assert.assertTrue(response.isSuccess());
Assert.assertEquals("a==>b==>c", response.getExecuteStepStr());
Assert.assertEquals("a==>b==>c==>s1[脚本s1]", response.getExecuteStepStr());
Assert.assertEquals("hello", context.getData("test"));
}
@Test
public void testEtcdNodeWithXml2() throws Exception {
String flowXml = "<?xml version=\"1.0\" encoding=\"UTF-8\"?><flow><chain name=\"chain1\">THEN(a, b, c);</chain></flow>";
String changedFlowXml = "<?xml version=\"1.0\" encoding=\"UTF-8\"?><flow><chain name=\"chain1\">THEN(a, c);</chain></flow>";
when(etcdClient.get(anyString())).thenReturn(flowXml).thenReturn(changedFlowXml);
List<String> chainNameList = Lists.newArrayList("chain1");
List<String> scriptNodeValueList = Lists.newArrayList("s1:script:脚本s1");
when(etcdClient.getChildrenKeys(CHAIN_PATH, SEPARATOR)).thenReturn(chainNameList);
when(etcdClient.getChildrenKeys(SCRIPT_PATH, SEPARATOR)).thenReturn(scriptNodeValueList);
String chain1Data = "THEN(a, b, c, s1);";
String chain1ChangedData = "THEN(a, b, s1);";
String scriptNodeValue = "defaultContext.setData(\"test\",\"hello\");";
String scriptNodeChangedValue = "defaultContext.setData(\"test\",\"hello world\");";
when(etcdClient.get(CHAIN_PATH + SEPARATOR + "chain1")).thenReturn(chain1Data).thenReturn(chain1ChangedData);
when(etcdClient.get(SCRIPT_PATH + SEPARATOR + "s1:script:脚本s1")).thenReturn(scriptNodeValue).thenReturn(scriptNodeChangedValue);
LiteflowResponse response = flowExecutor.execute2Resp("chain1", "arg");
DefaultContext context = response.getFirstContextBean();
Assert.assertTrue(response.isSuccess());
Assert.assertEquals("a==>b==>c", response.getExecuteStepStr());
Assert.assertEquals("a==>b==>c==>s1[脚本s1]", response.getExecuteStepStr());
Assert.assertEquals("hello", context.getData("test"));
// 手动触发一次 模拟节点数据变更
FlowBus.refreshFlowMetaData(FlowParserTypeEnum.TYPE_EL_XML,changedFlowXml);
flowExecutor.reloadRule();
LiteflowResponse response2 = flowExecutor.execute2Resp("chain1", "arg");
DefaultContext context2 = response2.getFirstContextBean();
Assert.assertTrue(response2.isSuccess());
Assert.assertEquals("a==>c", response2.getExecuteStepStr());
Assert.assertEquals("a==>b==>s1[脚本s1]", response2.getExecuteStepStr());
Assert.assertEquals("hello world", context2.getData("test"));
}
}

View File

@ -1,2 +1,6 @@
liteflow.rule-source-ext-data={"connectStr":"http://localhost:2379,http://localhost:3379,http://localhost:4379"}
liteflow.rule-source-ext-data={\
"endpoints":"http://127.0.0.1:2379,http://127.0.0.1:3379,http://127.0.0.1:4379",\
"chainPath": "/liteflow/chain",\
"scriptPath": "/liteflow/script"\
}
liteflow.parse-on-start=false

View File

@ -1 +1,6 @@
liteflow.rule-source-ext-data={"connectStr":"http://localhost:2379"}
liteflow.rule-source-ext-data={\
"endpoints":"http://127.0.0.1:2379",\
"chainPath": "/liteflow/chain",\
"scriptPath": "/liteflow/script"\
}
liteflow.parse-on-start=false