Merge branch 'feature-rule-plugin-etcd' into dev

# Conflicts:
#	liteflow-rule-plugin/pom.xml
#	liteflow-testcase-el/pom.xml
This commit is contained in:
zendwang 2022-09-20 09:09:32 +08:00
commit e383cb7b73
20 changed files with 590 additions and 2 deletions

View File

@ -0,0 +1,27 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>liteflow-rule-plugin</artifactId>
<groupId>com.yomahub</groupId>
<version>${revision}</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>liteflow-rule-etcd</artifactId>
<dependencies>
<dependency>
<groupId>com.yomahub</groupId>
<artifactId>liteflow-core</artifactId>
<version>${revision}</version>
<optional>true</optional>
</dependency>
<dependency>
<groupId>io.etcd</groupId>
<artifactId>jetcd-core</artifactId>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,127 @@
package com.yomahub.liteflow.parser.etcd;
import cn.hutool.core.collection.CollUtil;
import io.etcd.jetcd.ByteSequence;
import io.etcd.jetcd.Client;
import io.etcd.jetcd.KeyValue;
import io.etcd.jetcd.Watch;
import io.etcd.jetcd.watch.WatchEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
/**
* Etcd 客户端封装类.
* @author zendwang
* @since 2.9.0
*/
public class EtcdClient {
private static final Logger LOG = LoggerFactory.getLogger(EtcdClient.class);
private Client client;
private final ConcurrentHashMap<String, Watch.Watcher> watchCache = new ConcurrentHashMap<>();
public EtcdClient(final Client client) {
this.client = client;
}
/**
* close client.
*/
public void close() {
this.client.close();
}
/**
* get node value.
*
* @param key node name
* @return string
*/
public String get(final String key) {
List<KeyValue> keyValues = null;
try {
keyValues = client.getKVClient().get(ByteSequence.from(key, StandardCharsets.UTF_8)).get().getKvs();
} catch (InterruptedException | ExecutionException e) {
LOG.error(e.getMessage(), e);
}
if (CollUtil.isEmpty(keyValues)) {
return null;
}
return keyValues.iterator().next().getValue().toString(StandardCharsets.UTF_8);
}
/**
* put a key-value pair into etcd.
* @param key node name
* @param value node value
* @return
*/
public KeyValue put(final String key, final String value) {
KeyValue prevKv = null;
ByteSequence keyByteSequence = ByteSequence.from(key, StandardCharsets.UTF_8);
ByteSequence valueByteSequence = ByteSequence.from(value, StandardCharsets.UTF_8);
try {
prevKv = client.getKVClient().put(keyByteSequence, valueByteSequence).get().getPrevKv();
} catch (InterruptedException | ExecutionException e) {
LOG.error(e.getMessage(), e);
}
return prevKv;
}
/**
* subscribe data change.
*
* @param key node name
* @param updateHandler node value handler of update
* @param deleteHandler node value handler of delete
*/
public void watchDataChange(final String key,
final BiConsumer<String, String> updateHandler,
final Consumer<String> deleteHandler) {
Watch.Listener listener = watch(updateHandler, deleteHandler);
Watch.Watcher watch = client.getWatchClient().watch(ByteSequence.from(key, StandardCharsets.UTF_8), listener);
watchCache.put(key, watch);
}
private Watch.Listener watch(final BiConsumer<String, String> updateHandler,
final Consumer<String> deleteHandler) {
return Watch.listener(response -> {
for (WatchEvent event : response.getEvents()) {
String path = event.getKeyValue().getKey().toString(StandardCharsets.UTF_8);
String value = event.getKeyValue().getValue().toString(StandardCharsets.UTF_8);
switch (event.getEventType()) {
case PUT:
updateHandler.accept(path, value);
continue;
case DELETE:
deleteHandler.accept(path);
continue;
default:
}
}
});
}
/**
* cancel subscribe.
*
* @param key node name
*/
public void watchClose(final String key) {
if (watchCache.containsKey(key)) {
watchCache.get(key).close();
watchCache.remove(key);
}
}
}

View File

@ -0,0 +1,65 @@
package com.yomahub.liteflow.parser.etcd;
import cn.hutool.core.util.StrUtil;
import com.yomahub.liteflow.parser.el.ClassXmlFlowELParser;
import com.yomahub.liteflow.parser.etcd.exception.EtcdException;
import com.yomahub.liteflow.parser.etcd.util.EtcdParserHelper;
import com.yomahub.liteflow.parser.etcd.vo.EtcdParserVO;
import com.yomahub.liteflow.property.LiteflowConfig;
import com.yomahub.liteflow.property.LiteflowConfigGetter;
import com.yomahub.liteflow.util.JsonUtil;
import java.util.function.Consumer;
/**
* Etcd解析器实现只支持EL形式的XML不支持其他的形式
* @author zendwang
* @since 2.9.0
*/
public class EtcdXmlELParser extends ClassXmlFlowELParser {
private final EtcdParserHelper etcdParserHelper;
public EtcdXmlELParser() {
LiteflowConfig liteflowConfig = LiteflowConfigGetter.get();
if (StrUtil.isBlank(liteflowConfig.getRuleSourceExtData())){
throw new EtcdException("rule-source-ext-data is empty");
}
try{
EtcdParserVO etcdParserVO = JsonUtil.parseObject(liteflowConfig.getRuleSourceExtData(), EtcdParserVO.class);
assert etcdParserVO != null;
if (StrUtil.isBlank(etcdParserVO.getNodePath())){
etcdParserVO.setNodePath("/lite-flow/flow");
}
if (StrUtil.isBlank(etcdParserVO.getConnectStr())){
throw new EtcdException("Etcd connect string is empty");
}
etcdParserHelper = new EtcdParserHelper(etcdParserVO);
}catch (Exception e){
throw new EtcdException(e.getMessage());
}
}
@Override
public String parseCustom() {
Consumer<String> parseConsumer = t -> {
try {
parse(t);
} catch (Exception e) {
throw new RuntimeException(e);
}
};
try {
String content = etcdParserHelper.getContent();
etcdParserHelper.checkContent(content);
etcdParserHelper.listen(parseConsumer);
return content;
} catch (Exception e){
throw new EtcdException(e.getMessage());
}
}
}

View File

@ -0,0 +1,28 @@
package com.yomahub.liteflow.parser.etcd.exception;
/**
* Etcd解析异常
* @author zendwang
* @since 2.9.0
*/
public class EtcdException extends RuntimeException {
private static final long serialVersionUID = 1L;
/** 异常信息 */
private String message;
public EtcdException(String message) {
this.message = message;
}
@Override
public String getMessage() {
return message;
}
public void setMessage(String message) {
this.message = message;
}
}

View File

@ -0,0 +1,72 @@
package com.yomahub.liteflow.parser.etcd.util;
import cn.hutool.core.util.StrUtil;
import com.yomahub.liteflow.exception.ParseException;
import com.yomahub.liteflow.parser.el.XmlFlowELParser;
import com.yomahub.liteflow.parser.etcd.EtcdClient;
import com.yomahub.liteflow.parser.etcd.exception.EtcdException;
import com.yomahub.liteflow.parser.etcd.vo.EtcdParserVO;
import com.yomahub.liteflow.spi.holder.ContextAwareHolder;
import io.etcd.jetcd.Client;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.text.MessageFormat;
import java.util.function.Consumer;
/**
* @author zendwang
* @since 2.9.0
*/
public class EtcdParserHelper {
private static final Logger LOG = LoggerFactory.getLogger(EtcdParserHelper.class);
private final EtcdParserVO etcdParserVO;
private EtcdClient etcdClient;
public EtcdParserHelper(EtcdParserVO etcdParserVO) {
this.etcdParserVO = etcdParserVO;
try{
this.etcdClient = ContextAwareHolder.loadContextAware().getBean(EtcdClient.class);
if (this.etcdClient == null) {
Client client = Client.builder()
.endpoints(etcdParserVO.getConnectStr().split(","))
.build();
this.etcdClient = new EtcdClient(client);
}
}catch (Exception e){
throw new EtcdException(e.getMessage());
}
}
public String getContent(){
try{
return this.etcdClient.get(etcdParserVO.getNodePath());
}catch (Exception e){
throw new EtcdException(e.getMessage());
}
}
/**
* 检查 content 是否合法
*/
public void checkContent(String content) {
if (StrUtil.isBlank(content)) {
String error = MessageFormat.format("the node[{0}] value is empty", etcdParserVO.getNodePath());
throw new ParseException(error);
}
}
/**
* 监听 etcd 节点
*/
public void listen(Consumer<String> parseConsumer) {
this.etcdClient.watchDataChange(this.etcdParserVO.getNodePath(), (updatePath, updateValue) -> {
LOG.info("starting load flow config....");
parseConsumer.accept(updateValue);
}, null);
}
}

View File

@ -0,0 +1,29 @@
package com.yomahub.liteflow.parser.etcd.vo;
/**
* 用于解析RuleSourceExtData的vo类用于etcd模式中
* @author zendwang
* @since 2.9.0
*/
public class EtcdParserVO {
private String connectStr;
private String nodePath;
public String getConnectStr() {
return connectStr;
}
public void setConnectStr(String connectStr) {
this.connectStr = connectStr;
}
public String getNodePath() {
return nodePath;
}
public void setNodePath(String nodePath) {
this.nodePath = nodePath;
}
}

View File

@ -0,0 +1,18 @@
package com.yomahub.liteflow.parser.spi.etcd;
import com.yomahub.liteflow.parser.etcd.EtcdXmlELParser;
import com.yomahub.liteflow.parser.spi.ParserClassNameSpi;
/**
* Etcd解析器SPI实现
* @author zendwang
* @since 2.9.0
*/
public class EtcdParserClassNameSpi implements ParserClassNameSpi {
@Override
public String getSpiClassName() {
return EtcdXmlELParser.class.getName();
}
}

View File

@ -0,0 +1 @@
com.yomahub.liteflow.parser.spi.etcd.EtcdParserClassNameSpi

View File

@ -13,6 +13,7 @@
<modules>
<module>liteflow-rule-zk</module>
<module>liteflow-rule-sql</module>
<module>liteflow-rule-etcd</module>
</modules>
<artifactId>liteflow-rule-plugin</artifactId>

View File

@ -0,0 +1,57 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>liteflow-testcase-el</artifactId>
<groupId>com.yomahub</groupId>
<version>${revision}</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>liteflow-testcase-el-etcd-springboot</artifactId>
<dependencies>
<dependency>
<groupId>com.yomahub</groupId>
<artifactId>liteflow-spring-boot-starter</artifactId>
<version>${revision}</version>
</dependency>
<dependency>
<groupId>com.yomahub</groupId>
<artifactId>liteflow-rule-etcd</artifactId>
<version>${revision}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
<dependency>
<groupId>org.aspectj</groupId>
<artifactId>aspectjweaver</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<version>${springboot.version}</version>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-deploy-plugin</artifactId>
<version>2.8.2</version>
<configuration>
<skip>true</skip>
</configuration>
</plugin>
</plugins>
</build>
</project>

View File

@ -0,0 +1,20 @@
package com.yomahub.liteflow.test;
import com.yomahub.liteflow.flow.FlowBus;
import com.yomahub.liteflow.property.LiteflowConfigGetter;
import com.yomahub.liteflow.spi.holder.SpiFactoryCleaner;
import com.yomahub.liteflow.spring.ComponentScanner;
import com.yomahub.liteflow.thread.ExecutorHelper;
import org.junit.AfterClass;
public class BaseTest {
@AfterClass
public static void cleanScanCache(){
ComponentScanner.cleanCache();
FlowBus.cleanCache();
ExecutorHelper.loadInstance().clearExecutorServiceMap();
SpiFactoryCleaner.clean();
LiteflowConfigGetter.clean();
}
}

View File

@ -0,0 +1,66 @@
package com.yomahub.liteflow.test.etcd;
import cn.hutool.core.util.ReflectUtil;
import com.yomahub.liteflow.core.FlowExecutor;
import com.yomahub.liteflow.flow.LiteflowResponse;
import com.yomahub.liteflow.parser.etcd.EtcdClient;
import com.yomahub.liteflow.parser.etcd.EtcdXmlELParser;
import com.yomahub.liteflow.parser.etcd.util.EtcdParserHelper;
import com.yomahub.liteflow.spi.holder.ContextAwareHolder;
import com.yomahub.liteflow.test.BaseTest;
import org.junit.*;
import org.junit.runner.RunWith;
import org.mockito.Answers;
import org.mockito.MockitoAnnotations;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.mock.mockito.MockBean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.test.context.TestPropertySource;
import org.springframework.test.context.junit4.SpringRunner;
import javax.annotation.Resource;
import java.util.function.Consumer;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.*;
/**
* springboot环境下的etcd 规则解析器 测试
*/
@RunWith(SpringRunner.class)
@TestPropertySource(value = "classpath:/etcd/application-xml-cluster.properties")
@SpringBootTest(classes = EtcdWithXmlELSpringbootTest.class)
@EnableAutoConfiguration
@ComponentScan({"com.yomahub.liteflow.test.etcd.cmp"})
public class EtcdWithXmlELSpringbootTest extends BaseTest {
@MockBean(answer= Answers.RETURNS_MOCKS)
private EtcdClient etcdClient;
@Resource
private FlowExecutor flowExecutor;
@Before
public void setUp() {
MockitoAnnotations.initMocks(this);
String flowXml = "<?xml version=\"1.0\" encoding=\"UTF-8\"?><flow><chain name=\"chain1\">THEN(a, b, c);</chain></flow>";
String changedFlowXml = "<?xml version=\"1.0\" encoding=\"UTF-8\"?><flow><chain name=\"chain1\">THEN(a, c);</chain></flow>";
when(etcdClient.get(any())).thenReturn(flowXml).thenReturn(changedFlowXml);
}
@Test
public void testEtcdNodeWithXml() throws Exception {
LiteflowResponse response = flowExecutor.execute2Resp("chain1", "arg");
Assert.assertTrue(response.isSuccess());
Assert.assertTrue("a==>b==>c".equals(response.getExecuteStepStr()));
// 手动触发一次 模拟节点数据变更
EtcdXmlELParser parser = ContextAwareHolder.loadContextAware().getBean(EtcdXmlELParser.class);
parser.parse(etcdClient.get("/lite-flow/flow"));
LiteflowResponse response2 = flowExecutor.execute2Resp("chain1", "arg");
Assert.assertTrue(response2.isSuccess());
Assert.assertTrue("a==>c".equals(response2.getExecuteStepStr()));
}
}

View File

@ -0,0 +1,20 @@
/**
* <p>Title: liteflow</p>
* <p>Description: 轻量级的组件式流程框架</p>
* @author Bryan.Zhang
* @email weenyc31@163.com
* @Date 2020/4/1
*/
package com.yomahub.liteflow.test.etcd.cmp;
import com.yomahub.liteflow.core.NodeComponent;
import org.springframework.stereotype.Component;
@Component("a")
public class ACmp extends NodeComponent {
@Override
public void process() {
System.out.println("ACmp executed!");
}
}

View File

@ -0,0 +1,21 @@
/**
* <p>Title: liteflow</p>
* <p>Description: 轻量级的组件式流程框架</p>
* @author Bryan.Zhang
* @email weenyc31@163.com
* @Date 2020/4/1
*/
package com.yomahub.liteflow.test.etcd.cmp;
import com.yomahub.liteflow.core.NodeComponent;
import org.springframework.stereotype.Component;
@Component("b")
public class BCmp extends NodeComponent {
@Override
public void process() {
System.out.println("BCmp executed!");
}
}

View File

@ -0,0 +1,21 @@
/**
* <p>Title: liteflow</p>
* <p>Description: 轻量级的组件式流程框架</p>
* @author Bryan.Zhang
* @email weenyc31@163.com
* @Date 2020/4/1
*/
package com.yomahub.liteflow.test.etcd.cmp;
import com.yomahub.liteflow.core.NodeComponent;
import org.springframework.stereotype.Component;
@Component("c")
public class CCmp extends NodeComponent {
@Override
public void process() {
System.out.println("CCmp executed!");
}
}

View File

@ -0,0 +1,2 @@
liteflow.rule-source-ext-data={"connectStr":"http://localhost:2379,http://localhost:3379,http://localhost:4379"}
liteflow.parse-on-start=false

View File

@ -0,0 +1 @@
liteflow.rule-source-ext-data={"connectStr":"http://localhost:2379"}

View File

@ -0,0 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<flow>
<chain name="chain1">
THEN(a, b, c);
</chain>
</flow>

View File

@ -24,5 +24,6 @@
<module>liteflow-testcase-el-script-qlexpress-springboot</module>
<module>liteflow-testcase-el-zk-springboot</module>
<module>liteflow-testcase-el-sql-springboot</module>
<module>liteflow-testcase-el-etcd-springboot</module>
</modules>
</project>

View File

@ -56,6 +56,7 @@
<transmittable-thread-local.version>2.12.3</transmittable-thread-local.version>
<curator-test.version>5.1.0</curator-test.version>
<zkclient.version>0.10</zkclient.version>
<jetcd.version>0.5.0</jetcd.version>
<qlexpress.version>3.3.0</qlexpress.version>
<groovy.version>3.0.8</groovy.version>
<bytebuddy.version>1.11.13</bytebuddy.version>
@ -167,7 +168,11 @@
<artifactId>zkclient</artifactId>
<version>${zkclient.version}</version>
</dependency>
<dependency>
<groupId>io.etcd</groupId>
<artifactId>jetcd-core</artifactId>
<version>${jetcd.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>QLExpress</artifactId>
@ -324,7 +329,7 @@
<module>liteflow-spring</module>
<module>liteflow-testcase-old</module>
<module>liteflow-testcase-el</module>
</modules>
</modules>
<distributionManagement>
<snapshotRepository>