fix:bug && add junit test

This commit is contained in:
houxinyu 2023-07-24 21:01:02 +08:00
parent c0e4d922f3
commit eb1bec2078
15 changed files with 428 additions and 29 deletions

View File

@ -8,9 +8,7 @@ import com.yomahub.liteflow.log.LFLog;
import com.yomahub.liteflow.parser.redis.vo.RedisParserVO;
import redis.clients.jedis.Jedis;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.*;
/**
* 用于轮询chain的定时任务
@ -52,6 +50,7 @@ public class ChainPollingTask {
//修改chainNum为最新chain数量
chainNum = Integer.parseInt(keyNum);
List<String> needDelete = new ArrayList<>();
//遍历Map,判断各个chain的value有无变化修改变化了值的chain和被删除的chain
for(Map.Entry<String, String> entry: chainSHAMap.entrySet()) {
String chainId = entry.getKey();
@ -63,14 +62,15 @@ public class ChainPollingTask {
FlowBus.removeChain(chainId);
LOG.info("starting reload flow config... delete key={}", chainId);
//修改SHAMap
chainSHAMap.remove(chainId);
//添加到待删除的list 后续统一从SHAMap中移除
//不在这里直接移除是为了避免先删除导致chainSHAMap并没有完全遍历完 chain删除不全
needDelete.add(chainId);
}
else if (!StrUtil.equals(newSHA, oldSHA)) {
//SHA值发生变化,表示该chain的值已被修改,重新拉取变化的chain
String chainData = chainJedis.hget(chainKey, chainId);
LiteFlowChainELBuilder.createChain().setChainId(chainId).setEL(chainData).build();
LOG.info("starting poll flow config... update key={} new value={},", chainId, chainData);
LOG.info("starting reload flow config... update key={} new value={},", chainId, chainData);
//修改SHAMap
chainSHAMap.put(chainId, newSHA);
@ -78,6 +78,11 @@ public class ChainPollingTask {
//SHA值无变化,表示该chain未改变
}
//统一从SHAMap中移除要删除的chain
for (String chainId : needDelete) {
chainSHAMap.remove(chainId);
}
//处理新添加chain和chainId被修改的情况
if (chainNum > chainSHAMap.size()) {
//如果封装的SHAMap数量比最新chain总数少, 说明有两种情况
@ -91,7 +96,7 @@ public class ChainPollingTask {
//将新chainId添加到LiteFlowChainELBuilder和SHAMap
String chainData = chainJedis.hget(chainKey, chainId);
LiteFlowChainELBuilder.createChain().setChainId(chainId).setEL(chainData).build();
LOG.info("starting poll flow config... update key={} new value={},", chainId, chainData);
LOG.info("starting reload flow config... create key={} new value={},", chainId, chainData);
chainSHAMap.put(chainId, DigestUtil.sha1Hex(chainData));
}
}

View File

@ -41,15 +41,9 @@ public class RedisParserPollingMode implements RedisParserHelper {
//scriptKey中value的SHA1加密值 用于轮询时确定value是否变化
private Map<String, String> scriptSHAMap = new HashMap<>();
//定时任务线程池参数配置
//定时任务线程池核心线程数
private static final int CORE_POOL_SIZE = 1;
private static final int MAX_POOL_SIZE = 1;
private static final int QUEUE_CAPACITY = 5;
private static final Long KEEP_ALIVE_TIME = 1L;
//计算hash中field数量的lua脚本
private final String luaOfKey = "local keys = redis.call(\"hkeys\", KEYS[1]);\n" +
"return #keys;\n";
@ -193,12 +187,10 @@ public class RedisParserPollingMode implements RedisParserHelper {
String valueLuaOfChain = chainJedis.scriptLoad(luaOfValue);
//定时任务线程池
ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
ScheduledThreadPoolExecutor pollExecutor = new ScheduledThreadPoolExecutor(
CORE_POOL_SIZE,
new ThreadPoolExecutor.DiscardOldestPolicy());
//添加轮询chain的定时任务
ChainPollingTask chainTask = new ChainPollingTask(redisParserVO, chainJedis, chainNum, chainSHAMap, LOG);
pollExecutor.scheduleAtFixedRate(chainTask.pollChainTask(keyLuaOfChain, valueLuaOfChain),

View File

@ -8,6 +8,8 @@ import com.yomahub.liteflow.parser.redis.mode.RedisParserHelper;
import com.yomahub.liteflow.parser.redis.vo.RedisParserVO;
import redis.clients.jedis.Jedis;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
@ -51,6 +53,7 @@ public class ScriptPollingTask {
//修改scriptNum为最新script数量
scriptNum = Integer.parseInt(keyNum);
List<String> needDelete = new ArrayList<>();
//遍历Map,判断各个script的value有无变化修改变化了值的script和被删除的script
for (Map.Entry<String, String> entry : scriptSHAMap.entrySet()) {
String scriptFieldValue = entry.getKey();
@ -63,9 +66,11 @@ public class ScriptPollingTask {
FlowBus.getNodeMap().remove(nodeSimpleVO.getNodeId());
LOG.info("starting reload flow config... delete key={}", scriptFieldValue);
//修改SHAMap
scriptSHAMap.remove(scriptFieldValue);
} else if (!StrUtil.equals(newSHA, oldSHA)) {
//添加到待删除的list 后续统一从SHAMap中移除
//不在这里直接移除是为了避免先删除导致scriptSHAMap并没有完全遍历完 script删除不全
needDelete.add(scriptFieldValue);
}
else if (!StrUtil.equals(newSHA, oldSHA)) {
//SHA值发生变化,表示该script的值已被修改,重新拉取变化的script
String scriptData = scriptJedis.hget(scriptKey, scriptFieldValue);
RedisParserHelper.changeScriptNode(scriptFieldValue, scriptData);
@ -77,6 +82,11 @@ public class ScriptPollingTask {
//SHA值无变化,表示该script未改变
}
//统一从SHAMap中移除要删除的script
for (String scriptFieldValue : needDelete) {
scriptSHAMap.remove(scriptFieldValue);
}
//处理新添加script和script名被修改的情况
if (scriptNum > scriptSHAMap.size()) {
//如果封装的SHAMap数量比最新script总数少, 说明有两种情况
@ -90,7 +100,7 @@ public class ScriptPollingTask {
//将新script添加到LiteFlowChainELBuilder和SHAMap
String scriptData = scriptJedis.hget(scriptKey, scriptFieldValue);
RedisParserHelper.changeScriptNode(scriptFieldValue, scriptData);
LOG.info("starting reload flow config... update key={} new value={},", scriptFieldValue, scriptData);
LOG.info("starting reload flow config... create key={} new value={},", scriptFieldValue, scriptData);
scriptSHAMap.put(scriptFieldValue, DigestUtil.sha1Hex(scriptData));
}
}

View File

@ -42,7 +42,7 @@ public class RedisParserVO {
return host;
}
public void setHost(String url) {
public void setHost(String host) {
this.host = host;
}

View File

@ -0,0 +1 @@
com.yomahub.liteflow.parser.spi.redis.RedisParserClassNameSpi

View File

@ -13,7 +13,36 @@
<artifactId>liteflow-testcase-el-redis-springboot</artifactId>
<dependencies>
<dependency>
<groupId>com.yomahub</groupId>
<artifactId>liteflow-spring-boot-starter</artifactId>
<version>${revision}</version>
</dependency>
<dependency>
<groupId>com.yomahub</groupId>
<artifactId>liteflow-rule-redis</artifactId>
<version>${revision}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
<dependency>
<groupId>com.yomahub</groupId>
<artifactId>liteflow-script-groovy</artifactId>
<version>${revision}</version>
</dependency>
<dependency>
<groupId>com.yomahub</groupId>
<artifactId>liteflow-script-graaljs</artifactId>
<version>${revision}</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@ -1,4 +1,22 @@
package com.yomahub.liteflow.test;
import com.yomahub.liteflow.core.FlowInitHook;
import com.yomahub.liteflow.flow.FlowBus;
import com.yomahub.liteflow.property.LiteflowConfigGetter;
import com.yomahub.liteflow.spi.holder.SpiFactoryCleaner;
import com.yomahub.liteflow.spring.ComponentScanner;
import com.yomahub.liteflow.thread.ExecutorHelper;
import org.junit.jupiter.api.AfterAll;
public class BaseTest {
@AfterAll
public static void cleanScanCache() {
ComponentScanner.cleanCache();
FlowBus.cleanCache();
ExecutorHelper.loadInstance().clearExecutorServiceMap();
SpiFactoryCleaner.clean();
LiteflowConfigGetter.clean();
FlowInitHook.cleanHook();
}
}

View File

@ -0,0 +1,122 @@
package com.yomahub.liteflow.test.redis;
import com.yomahub.liteflow.core.FlowExecutor;
import com.yomahub.liteflow.flow.LiteflowResponse;
import com.yomahub.liteflow.parser.redis.vo.RedisParserVO;
import com.yomahub.liteflow.property.LiteflowConfig;
import com.yomahub.liteflow.property.LiteflowConfigGetter;
import com.yomahub.liteflow.util.JsonUtil;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.redisson.api.RMapCache;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.test.context.TestPropertySource;
import org.springframework.test.context.junit.jupiter.SpringExtension;
import redis.clients.jedis.Jedis;
import javax.annotation.Resource;
/**
* springboot环境下的redis配置源轮询拉取模式功能测试
*
* @author hxinyu
* @since 2.11.0
*/
@ExtendWith(SpringExtension.class)
@TestPropertySource(value = "classpath:/redis/application-poll-xml.properties")
@SpringBootTest(classes = RedisWithXmlELPollSpringbootTest.class)
@EnableAutoConfiguration
@ComponentScan({"com.yomahub.liteflow.test.redis.cmp"})
public class RedisWithXmlELPollSpringbootTest {
private static Jedis jedis;
@Resource
private FlowExecutor flowExecutor;
@BeforeAll
public static void setUpBeforeClass() {
jedis = new Jedis("localhost", 6379);
jedis.select(1);
jedis.hset("pollScriptKey", "s1:script:脚本s1:groovy", "defaultContext.setData(\"test1\",\"hello s1\");");
jedis.hset("pollScriptKey", "s2:script:脚本s2:js", "defaultContext.setData(\"test2\",\"hello s2\");");
jedis.hset("pollScriptKey", "s3:script:脚本s3", "defaultContext.setData(\"test3\",\"hello s3\");");
jedis.hset("pollChainKey", "chain1", "THEN(a, b, c);");
jedis.hset("pollChainKey", "chain2", "THEN(a, b, c, s3);");
jedis.hset("pollChainKey", "chain3", "THEN(a, b, c, s1, s2);");
}
@Test
public void testPollWithXml() throws InterruptedException {
LiteflowResponse response = flowExecutor.execute2Resp("chain1", "arg");
Assertions.assertTrue(response.isSuccess());
Assertions.assertEquals("a==>b==>c", response.getExecuteStepStr());
//修改redis中规则
changeXMLData();
//重新加载规则 定时任务1分钟后开始第一次轮询 所以这里休眠1分钟
Thread.sleep(65000);
Assertions.assertEquals("a==>c==>b", flowExecutor.execute2Resp("chain1", "arg").getExecuteStepStr());
//删除redis中规则
deleteXMLData();
//重新加载规则
Thread.sleep(5000);
response = flowExecutor.execute2Resp("chain1", "arg");
Assertions.assertTrue(!response.isSuccess());
response = flowExecutor.execute2Resp("chain2", "arg");
Assertions.assertTrue(!response.isSuccess());
//添加redis中规则
addXMLData();
//重新加载规则
Thread.sleep(5000);
Assertions.assertEquals("b==>c", flowExecutor.execute2Resp("chain4", "arg").getExecuteStepStr());
}
/**
* 修改redisson中的chain
*/
public void changeXMLData() {
LiteflowConfig liteflowConfig = LiteflowConfigGetter.get();
RedisParserVO redisParserVO = JsonUtil.parseObject(liteflowConfig.getRuleSourceExtData(), RedisParserVO.class);
jedis.hset(redisParserVO.getChainKey(), "chain1", "THEN(a, c, b);");
}
/**
* 删除redisson中的chain
*/
public void deleteXMLData() {
LiteflowConfig liteflowConfig = LiteflowConfigGetter.get();
RedisParserVO redisParserVO = JsonUtil.parseObject(liteflowConfig.getRuleSourceExtData(), RedisParserVO.class);
jedis.hdel(redisParserVO.getChainKey(), "chain1");
jedis.hdel(redisParserVO.getChainKey(), "chain2");
}
/**
* 新增redisson中的chain
*/
public void addXMLData() {
LiteflowConfig liteflowConfig = LiteflowConfigGetter.get();
RedisParserVO redisParserVO = JsonUtil.parseObject(liteflowConfig.getRuleSourceExtData(), RedisParserVO.class);
jedis.hset(redisParserVO.getChainKey(), "chain4", "THEN(b, c);");
}
/**
* 修改redisson中的脚本
*/
public void changeScriptData() {
LiteflowConfig liteflowConfig = LiteflowConfigGetter.get();
RedisParserVO redisParserVO = JsonUtil.parseObject(liteflowConfig.getRuleSourceExtData(), RedisParserVO.class);
jedis.hset(redisParserVO.getScriptKey(), "s1:script:脚本s1:groovy", "defaultContext.setData(\"test1\",\"hello s1 version2\");");
}
}

View File

@ -1,4 +0,0 @@
package com.yomahub.liteflow.test.redis;
public class RedisWithXmlELSpringbootTest {
}

View File

@ -0,0 +1,155 @@
package com.yomahub.liteflow.test.redis;
import com.yomahub.liteflow.core.FlowExecutor;
import com.yomahub.liteflow.flow.LiteflowResponse;
import com.yomahub.liteflow.parser.redis.vo.RedisParserVO;
import com.yomahub.liteflow.property.LiteflowConfig;
import com.yomahub.liteflow.property.LiteflowConfigGetter;
import com.yomahub.liteflow.slot.DefaultContext;
import com.yomahub.liteflow.util.JsonUtil;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.redisson.Redisson;
import org.redisson.api.RMapCache;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.test.context.TestPropertySource;
import org.springframework.test.context.junit.jupiter.SpringExtension;
import javax.annotation.Resource;
import java.util.Set;
/**
* springboot环境下的redis配置源订阅模式功能测试
*
* @author hxinyu
* @since 2.11.0
*/
@ExtendWith(SpringExtension.class)
@TestPropertySource(value = "classpath:/redis/application-sub-xml.properties")
@SpringBootTest(classes = RedisWithXmlELSubscribeSpringbootTest.class)
@EnableAutoConfiguration
@ComponentScan({"com.yomahub.liteflow.test.redis.cmp"})
public class RedisWithXmlELSubscribeSpringbootTest {
private static RedissonClient redissonClient;
@Resource
private FlowExecutor flowExecutor;
@BeforeAll
public static void setUpBeforeClass() {
Config config = new Config();
config.useSingleServer().setAddress("redis://127.0.0.1:6379").setDatabase(1);
redissonClient = Redisson.create(config);
RMapCache<String, String> chainKey = redissonClient.getMapCache("chainKey");
RMapCache<String, String> scriptKey = redissonClient.getMapCache("scriptKey");
scriptKey.put("s1:script:脚本s1:groovy", "defaultContext.setData(\"test1\",\"hello s1\");");
scriptKey.put("s2:script:脚本s2:js", "defaultContext.setData(\"test2\",\"hello s2\");");
scriptKey.put("s3:script:脚本s3", "defaultContext.setData(\"test3\",\"hello s3\");");
chainKey.put("chain1", "THEN(a, b, c);");
chainKey.put("chain2", "THEN(a, b, c, s3);");
chainKey.put("chain3", "THEN(a, b, c, s1, s2);");
}
@Test
public void testSubWithXml() throws InterruptedException {
LiteflowResponse response = flowExecutor.execute2Resp("chain1", "arg");
Assertions.assertTrue(response.isSuccess());
Assertions.assertEquals("a==>b==>c", response.getExecuteStepStr());
//修改redis中规则
changeXMLData();
//重新加载规则
Thread.sleep(50);
Assertions.assertEquals("a==>c==>b", flowExecutor.execute2Resp("chain1", "arg").getExecuteStepStr());
//删除redis中规则
deleteXMLData();
//重新加载规则
Thread.sleep(50);
response = flowExecutor.execute2Resp("chain1", "arg");
Assertions.assertTrue(!response.isSuccess());
//添加redis中规则
addXMLData();
//重新加载规则
Thread.sleep(50);
Assertions.assertEquals("b==>c", flowExecutor.execute2Resp("chain4", "arg").getExecuteStepStr());
}
@Test
public void testSubWithScriptXml() throws InterruptedException {
LiteflowResponse response = flowExecutor.execute2Resp("chain3", "arg");
DefaultContext context = response.getFirstContextBean();
Assertions.assertTrue(response.isSuccess());
Assertions.assertEquals("hello s1", context.getData("test1"));
Assertions.assertEquals("a==>b==>c==>s1[脚本s1]==>s2[脚本s2]", response.getExecuteStepStrWithoutTime());
//添加和删除脚本
addAndDeleteScriptData();
//修改redis脚本
changeScriptData();
Thread.sleep(50);
context = flowExecutor.execute2Resp("chain3", "arg").getFirstContextBean();
Assertions.assertEquals("hello s1 version2", context.getData("test1"));
}
/**
* 修改redisson中的chain
*/
public void changeXMLData() {
LiteflowConfig liteflowConfig = LiteflowConfigGetter.get();
RedisParserVO redisParserVO = JsonUtil.parseObject(liteflowConfig.getRuleSourceExtData(), RedisParserVO.class);
RMapCache<String, String> chainKey = redissonClient.getMapCache(redisParserVO.getChainKey());
chainKey.put("chain1", "THEN(a, c, b);");
}
/**
* 删除redisson中的chain
*/
public void deleteXMLData() {
LiteflowConfig liteflowConfig = LiteflowConfigGetter.get();
RedisParserVO redisParserVO = JsonUtil.parseObject(liteflowConfig.getRuleSourceExtData(), RedisParserVO.class);
RMapCache<String, String> chainKey = redissonClient.getMapCache(redisParserVO.getChainKey());
chainKey.remove("chain1");
chainKey.remove("chain4");
}
/**
* 新增redisson中的chain
*/
public void addXMLData() {
LiteflowConfig liteflowConfig = LiteflowConfigGetter.get();
RedisParserVO redisParserVO = JsonUtil.parseObject(liteflowConfig.getRuleSourceExtData(), RedisParserVO.class);
RMapCache<String, String> chainKey = redissonClient.getMapCache(redisParserVO.getChainKey());
chainKey.put("chain4","THEN(b, c);");
}
/**
* 修改redisson中的脚本
*/
public void changeScriptData() {
LiteflowConfig liteflowConfig = LiteflowConfigGetter.get();
RedisParserVO redisParserVO = JsonUtil.parseObject(liteflowConfig.getRuleSourceExtData(), RedisParserVO.class);
RMapCache<String, String> scriptKey = redissonClient.getMapCache(redisParserVO.getScriptKey());
scriptKey.put("s1:script:脚本s1:groovy", "defaultContext.setData(\"test1\",\"hello s1 version2\");");
}
/**
* 新增和删除redisson中的chain
*/
public void addAndDeleteScriptData() {
LiteflowConfig liteflowConfig = LiteflowConfigGetter.get();
RedisParserVO redisParserVO = JsonUtil.parseObject(liteflowConfig.getRuleSourceExtData(), RedisParserVO.class);
RMapCache<String, String> scriptKey = redissonClient.getMapCache(redisParserVO.getScriptKey());
scriptKey.remove("s4:script:脚本s4");
scriptKey.remove("s5:script:脚本s5:groovy");
scriptKey.put("s5:script:脚本s5:groovy", "defaultContext.setData(\"test1\",\"hello s5\");");
}
}

View File

@ -1,4 +1,21 @@
/**
* <p>Title: liteflow</p>
* <p>Description: 轻量级的组件式流程框架</p>
* @author Bryan.Zhang
* @email weenyc31@163.com
* @Date 2020/4/1
*/
package com.yomahub.liteflow.test.redis.cmp;
public class ACmp {
import com.yomahub.liteflow.core.NodeComponent;
import org.springframework.stereotype.Component;
@Component("a")
public class ACmp extends NodeComponent {
@Override
public void process() {
System.out.println("ACmp executed!");
}
}

View File

@ -1,4 +1,21 @@
/**
* <p>Title: liteflow</p>
* <p>Description: 轻量级的组件式流程框架</p>
* @author Bryan.Zhang
* @email weenyc31@163.com
* @Date 2020/4/1
*/
package com.yomahub.liteflow.test.redis.cmp;
public class BCmp {
import com.yomahub.liteflow.core.NodeComponent;
import org.springframework.stereotype.Component;
@Component("b")
public class BCmp extends NodeComponent {
@Override
public void process() {
System.out.println("BCmp executed!");
}
}

View File

@ -1,4 +1,21 @@
/**
* <p>Title: liteflow</p>
* <p>Description: 轻量级的组件式流程框架</p>
* @author Bryan.Zhang
* @email weenyc31@163.com
* @Date 2020/4/1
*/
package com.yomahub.liteflow.test.redis.cmp;
public class CCmp {
}
import com.yomahub.liteflow.core.NodeComponent;
import org.springframework.stereotype.Component;
@Component("c")
public class CCmp extends NodeComponent {
@Override
public void process() {
System.out.println("CCmp executed!");
}
}

View File

@ -0,0 +1,10 @@
liteflow.rule-source-ext-data={\
"host":"localhost",\
"port":6379,\
"pollingInterval":1,\
"chainDataBase":1,\
"chainKey":"pollChainKey",\
"scriptDataBase":1,\
"scriptKey":"pollScriptKey"\
}
liteflow.parse-on-start=false

View File

@ -0,0 +1,10 @@
liteflow.rule-source-ext-data={\
"host":"localhost",\
"port":6379,\
"mode":"SUB",\
"chainDataBase":1,\
"chainKey":"chainKey",\
"scriptDataBase":1,\
"scriptKey":"scriptKey"\
}
liteflow.parse-on-start=false