diff --git a/liteflow-rule-plugin/liteflow-rule-redis/pom.xml b/liteflow-rule-plugin/liteflow-rule-redis/pom.xml new file mode 100644 index 00000000..a5e98349 --- /dev/null +++ b/liteflow-rule-plugin/liteflow-rule-redis/pom.xml @@ -0,0 +1,37 @@ + + + + liteflow-rule-plugin + com.yomahub + ${revision} + ../pom.xml + + 4.0.0 + + liteflow-rule-redis + + + + com.yomahub + liteflow-core + ${revision} + true + provided + + + + org.redisson + redisson + ${redisson.version} + + + + cn.hutool + hutool-crypto + ${hutool-crypto.version} + + + + \ No newline at end of file diff --git a/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/RedisXmlELParser.java b/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/RedisXmlELParser.java new file mode 100644 index 00000000..547b291c --- /dev/null +++ b/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/RedisXmlELParser.java @@ -0,0 +1,109 @@ +package com.yomahub.liteflow.parser.redis; + +import cn.hutool.core.bean.BeanUtil; +import cn.hutool.core.bean.copier.CopyOptions; +import cn.hutool.core.map.MapUtil; +import cn.hutool.core.text.StrFormatter; +import cn.hutool.core.util.ObjectUtil; +import cn.hutool.core.util.StrUtil; +import com.yomahub.liteflow.core.FlowInitHook; +import com.yomahub.liteflow.parser.el.ClassXmlFlowELParser; +import com.yomahub.liteflow.parser.redis.exception.RedisException; +import com.yomahub.liteflow.parser.redis.mode.polling.RedisParserPollingMode; +import com.yomahub.liteflow.parser.redis.mode.subscribe.RedisParserSubscribeMode; +import com.yomahub.liteflow.parser.redis.mode.RedisParserHelper; +import com.yomahub.liteflow.parser.redis.mode.RedisParserMode; +import com.yomahub.liteflow.parser.redis.vo.RedisParserVO; +import com.yomahub.liteflow.property.LiteflowConfig; +import com.yomahub.liteflow.property.LiteflowConfigGetter; +import com.yomahub.liteflow.util.JsonUtil; + +import java.util.Objects; + +/** + * Redis解析器实现,只支持EL形式的XML,不支持其他的形式 + * + * @author hxinyu + * @since 2.11.0 + */ + +public class RedisXmlELParser extends ClassXmlFlowELParser { + + private final RedisParserHelper redisParserHelper; + + private static final String ERROR_COMMON_MSG = "ruleSourceExtData or map is empty"; + + private static final String ERROR_MSG_PATTERN = "ruleSourceExtData {} is blank"; + + public RedisXmlELParser() { + LiteflowConfig liteflowConfig = LiteflowConfigGetter.get(); + + try { + RedisParserVO redisParserVO = null; + if (MapUtil.isNotEmpty((liteflowConfig.getRuleSourceExtDataMap()))) { + redisParserVO = BeanUtil.toBean(liteflowConfig.getRuleSourceExtDataMap(), + RedisParserVO.class, CopyOptions.create()); + } + else if (StrUtil.isNotBlank(liteflowConfig.getRuleSourceExtData())) { + redisParserVO = JsonUtil.parseObject(liteflowConfig.getRuleSourceExtData(), RedisParserVO.class); + } + if (Objects.isNull(redisParserVO)) { + throw new RedisException(ERROR_COMMON_MSG); + } + + //检查配置文件 + checkParserVO(redisParserVO); + + //选择订阅机制 or 轮询机制 + RedisParserMode mode = redisParserVO.getMode(); + switch (mode) { + case SUB: + case SUBSCRIBE: + redisParserHelper = new RedisParserSubscribeMode(redisParserVO); + break; + case POLL: + default: + redisParserHelper = new RedisParserPollingMode(redisParserVO); + break; + } + + } + catch (RedisException redisException) { + throw redisException; + } + catch (Exception e) { + throw new RedisException(e.getMessage()); + } + } + + @Override + public String parseCustom() { + try { + String content = redisParserHelper.getContent(); + FlowInitHook.addHook(() -> { + redisParserHelper.listenRedis(); + return true; + }); + return content; + + } + catch (Exception e) { + throw new RedisException(e.getMessage()); + } + } + + private void checkParserVO(RedisParserVO redisParserVO) { + if (StrUtil.isBlank(redisParserVO.getHost())) { + throw new RedisException(StrFormatter.format(ERROR_MSG_PATTERN, "host")); + } + if (ObjectUtil.isNull(redisParserVO.getPort())) { + throw new RedisException(StrFormatter.format(ERROR_MSG_PATTERN, "port")); + } + if (ObjectUtil.isNull(redisParserVO.getChainDataBase())) { + throw new RedisException(StrFormatter.format(ERROR_MSG_PATTERN, "chainDataBase")); + } + if (StrUtil.isBlank(redisParserVO.getChainKey())) { + throw new RedisException(StrFormatter.format(ERROR_MSG_PATTERN, "chainKey")); + } + } +} diff --git a/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/exception/RedisException.java b/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/exception/RedisException.java new file mode 100644 index 00000000..1b6ee435 --- /dev/null +++ b/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/exception/RedisException.java @@ -0,0 +1,23 @@ +package com.yomahub.liteflow.parser.redis.exception; + +/** + * Redis解析异常 + * + * @author hxinyu + * @since 2.11.0 + */ + +public class RedisException extends RuntimeException{ + + private String message; + + public RedisException(String message) { + super(); + this.message = message; + } + + @Override + public String getMessage() { + return message; + } +} diff --git a/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/mode/RClient.java b/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/mode/RClient.java new file mode 100644 index 00000000..cfa4a1bf --- /dev/null +++ b/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/mode/RClient.java @@ -0,0 +1,109 @@ +package com.yomahub.liteflow.parser.redis.mode; + +import cn.hutool.core.collection.CollectionUtil; +import org.redisson.api.RMap; +import org.redisson.api.RMapCache; +import org.redisson.api.RScript; +import org.redisson.api.RedissonClient; +import org.redisson.api.map.event.MapEntryListener; +import org.redisson.client.codec.StringCodec; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +/** + * Redisson 客户端封装类. + * + * @author hxinyu + * @since 2.11.0 + */ +public class RClient { + + private final RedissonClient redissonClient; + + private Map map = new HashMap<>(); + + public RClient(RedissonClient redissonClient) { + this.redissonClient = redissonClient; + } + + + /** + * get hashmap of the key + * + * @param key hash name + * @return hashmap + */ + public Map getMap(String key) { + RMapCache mapCache = redissonClient.getMapCache(key); + Set mapFieldSet = mapCache.keySet(); + if (CollectionUtil.isEmpty(mapFieldSet)) { + return map; + } + for (String field : mapFieldSet) { + String value = mapCache.get(field); + map.put(field, value); + } + return map; + } + + + /** + * add listener of the key + * + * @param key hash name + * @param listener listener + * @return listener id + */ + public int addListener(String key, MapEntryListener listener) { + RMapCache mapCache = redissonClient.getMapCache(key); + return mapCache.addListener(listener); + } + + /** + * get all keys of hash + * + * @param key hash name + * @return keySet + */ + public Set hkeys(String key) { + RMap map = redissonClient.getMap(key, new StringCodec()); + return map.readAllKeySet(); + } + + /** + * gey value of the key + * + * @param key hash name + * @param field hash field + * @return hash value + */ + public String hget(String key, String field) { + RMap map = redissonClient.getMap(key, new StringCodec()); + return map.get(field); + } + + /** + * Loads Lua script into Redis scripts cache and returns its SHA-1 digest + * @param luaScript script + * @return shaDigest + */ + public String scriptLoad(String luaScript) { + RScript script = redissonClient.getScript(new StringCodec()); + return script.scriptLoad(luaScript); + } + + /** + * Executes Lua script stored in Redis scripts cache by SHA-1 digest + * @param shaDigest script cache by SHA-1 + * @param args script args + * @return string + */ + public String evalSha(String shaDigest, String... args){ + RScript script = redissonClient.getScript(new StringCodec()); + return script.evalSha(RScript.Mode.READ_ONLY, shaDigest, RScript.ReturnType.VALUE, + Arrays.asList(args)).toString(); + } +} diff --git a/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/mode/RedisMode.java b/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/mode/RedisMode.java new file mode 100644 index 00000000..ce561042 --- /dev/null +++ b/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/mode/RedisMode.java @@ -0,0 +1,27 @@ +package com.yomahub.liteflow.parser.redis.mode; + +/** + * 用于定义Redis模式的枚举类 + * + * single单点模式, sentinel哨兵模式 + * 不支持集群模式配置 + * + * @author hxinyu + * @since 2.11.0 + */ +public enum RedisMode { + + SINGLE("single"), + + SENTINEL("sentinel"); + + private String mode; + + RedisMode(String mode) { + this.mode = mode; + } + + public String getMode() { + return mode; + } +} diff --git a/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/mode/RedisParserHelper.java b/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/mode/RedisParserHelper.java new file mode 100644 index 00000000..eb60d2cd --- /dev/null +++ b/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/mode/RedisParserHelper.java @@ -0,0 +1,205 @@ +package com.yomahub.liteflow.parser.redis.mode; + +import cn.hutool.core.collection.CollUtil; +import cn.hutool.core.text.StrFormatter; +import cn.hutool.core.util.ReUtil; +import cn.hutool.core.util.StrUtil; +import com.yomahub.liteflow.builder.LiteFlowNodeBuilder; +import com.yomahub.liteflow.enums.NodeTypeEnum; +import com.yomahub.liteflow.log.LFLog; +import com.yomahub.liteflow.log.LFLoggerManager; +import com.yomahub.liteflow.parser.redis.vo.RedisParserVO; +import org.redisson.config.Config; +import org.redisson.config.SentinelServersConfig; + +import java.util.List; + +/** + * Redis 解析器通用接口 + * + * @author hxinyu + * @since 2.11.0 + */ + +public interface RedisParserHelper { + + LFLog LOG = LFLoggerManager.getLogger(RedisParserHelper.class); + + String SINGLE_REDIS_URL_PATTERN = "redis://{}:{}"; + + String SENTINEL_REDIS_URL_PATTERN = "redis://{}"; + + String CHAIN_XML_PATTERN = "{}"; + + String NODE_XML_PATTERN = "{}"; + + String NODE_ITEM_XML_PATTERN = ""; + + String NODE_ITEM_WITH_LANGUAGE_XML_PATTERN = ""; + + String XML_PATTERN = "{}{}"; + + String getContent(); + + void listenRedis(); + + + /** + * 获取Redisson客户端的Config配置通用方法(单点模式) + * @param redisParserVO redisParserVO + * @param dataBase redis连接的数据库号 + * @return redisson config + */ + default Config getSingleRedissonConfig(RedisParserVO redisParserVO, Integer dataBase) { + Config config = new Config(); + String redisAddress = StrFormatter.format(SINGLE_REDIS_URL_PATTERN, redisParserVO.getHost(), redisParserVO.getPort()); + //如果配置了用户名和密码 + if (StrUtil.isNotBlank(redisParserVO.getUsername()) && StrUtil.isNotBlank(redisParserVO.getPassword())) { + config.useSingleServer().setAddress(redisAddress) + .setUsername(redisParserVO.getUsername()) + .setPassword(redisParserVO.getPassword()) + .setDatabase(dataBase); + } + //如果配置了密码 + else if (StrUtil.isNotBlank(redisParserVO.getPassword())) { + config.useSingleServer().setAddress(redisAddress) + .setPassword(redisParserVO.getPassword()) + .setDatabase(dataBase); + } + //没有配置密码 + else { + config.useSingleServer().setAddress(redisAddress) + .setDatabase(dataBase); + } + return config; + } + + /** + * 获取Redisson客户端的Config配置通用方法(哨兵模式) + * @param redisParserVO redisParserVO + * @param dataBase redis连接的数据库号 + * @return redisson Config + */ + default Config getSentinelRedissonConfig(RedisParserVO redisParserVO, Integer dataBase) { + Config config = new Config(); + SentinelServersConfig sentinelConfig = config.useSentinelServers() + .setMasterName(redisParserVO.getMasterName()); + redisParserVO.getSentinelAddress().forEach(address -> { + sentinelConfig.addSentinelAddress(StrFormatter.format(SENTINEL_REDIS_URL_PATTERN, address)); + }); + //如果配置了用户名和密码 + if(StrUtil.isNotBlank(redisParserVO.getUsername()) && StrUtil.isNotBlank(redisParserVO.getPassword())) { + sentinelConfig.setUsername(redisParserVO.getUsername()) + .setPassword(redisParserVO.getPassword()) + .setDatabase(dataBase); + } + //如果配置了密码 + else if(StrUtil.isNotBlank(redisParserVO.getPassword())) { + sentinelConfig.setPassword(redisParserVO.getPassword()) + .setDatabase(dataBase); + } + //没有配置密码 + else { + sentinelConfig.setDatabase(dataBase); + } + return config; + } + + /** + * script节点的修改/添加 + * + * @param scriptFieldValue 新的script名 + * @param newValue 新的script值 + */ + static void changeScriptNode(String scriptFieldValue, String newValue) { + NodeSimpleVO nodeSimpleVO = convert(scriptFieldValue); + // 有语言类型 + if (StrUtil.isNotBlank(nodeSimpleVO.getLanguage())) { + LiteFlowNodeBuilder.createScriptNode() + .setId(nodeSimpleVO.getNodeId()) + .setType(NodeTypeEnum.getEnumByCode(nodeSimpleVO.getType())) + .setName(nodeSimpleVO.getName()) + .setScript(newValue) + .setLanguage(nodeSimpleVO.getLanguage()) + .build(); + } + // 没有语言类型 + else { + LiteFlowNodeBuilder.createScriptNode() + .setId(nodeSimpleVO.getNodeId()) + .setType(NodeTypeEnum.getEnumByCode(nodeSimpleVO.getType())) + .setName(nodeSimpleVO.getName()) + .setScript(newValue) + .build(); + } + } + + static NodeSimpleVO convert(String str) { + // 不需要去理解这串正则,就是一个匹配冒号的 + // 一定得是a:b,或是a:b:c...这种完整类型的字符串的 + List matchItemList = ReUtil.findAllGroup0("(?<=[^:]:)[^:]+|[^:]+(?=:[^:])", str); + if (CollUtil.isEmpty(matchItemList)) { + return null; + } + + NodeSimpleVO nodeSimpleVO = new NodeSimpleVO(); + if (matchItemList.size() > 1) { + nodeSimpleVO.setNodeId(matchItemList.get(0)); + nodeSimpleVO.setType(matchItemList.get(1)); + } + + if (matchItemList.size() > 2) { + nodeSimpleVO.setName(matchItemList.get(2)); + } + + if (matchItemList.size() > 3) { + nodeSimpleVO.setLanguage(matchItemList.get(3)); + } + + return nodeSimpleVO; + } + + class NodeSimpleVO { + + private String nodeId; + + private String type; + + private String name = StrUtil.EMPTY; + + private String language; + + public String getNodeId() { + return nodeId; + } + + public void setNodeId(String nodeId) { + this.nodeId = nodeId; + } + + public String getType() { + return type; + } + + public void setType(String type) { + this.type = type; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public String getLanguage() { + return language; + } + + public void setLanguage(String language) { + this.language = language; + } + + } +} diff --git a/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/mode/RedisParserMode.java b/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/mode/RedisParserMode.java new file mode 100644 index 00000000..258229c6 --- /dev/null +++ b/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/mode/RedisParserMode.java @@ -0,0 +1,26 @@ +package com.yomahub.liteflow.parser.redis.mode; + +/** + * 用于定义Redis规则存储和监听方式的枚举类 + * + * poll轮询拉取模式, sub监听模式 + * @author hxinyu + * @since 2.11.0 + */ +public enum RedisParserMode { + + //poll为轮询模式,subscribe/sub为订阅模式,默认为poll + POLL("poll"), + SUB("subscribe"), + SUBSCRIBE("subscribe"); + + private String mode; + + RedisParserMode(String mode) { + this.mode = mode; + } + + public String getMode() { + return mode; + } +} diff --git a/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/mode/polling/ChainPollingTask.java b/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/mode/polling/ChainPollingTask.java new file mode 100644 index 00000000..6b66d030 --- /dev/null +++ b/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/mode/polling/ChainPollingTask.java @@ -0,0 +1,114 @@ +package com.yomahub.liteflow.parser.redis.mode.polling; + +import cn.hutool.core.util.StrUtil; +import cn.hutool.crypto.digest.DigestUtil; +import com.yomahub.liteflow.builder.el.LiteFlowChainELBuilder; +import com.yomahub.liteflow.flow.FlowBus; +import com.yomahub.liteflow.log.LFLog; +import com.yomahub.liteflow.log.LFLoggerManager; +import com.yomahub.liteflow.parser.redis.mode.RClient; +import com.yomahub.liteflow.parser.redis.vo.RedisParserVO; + +import java.util.*; + +/** + * 用于轮询chain的定时任务 + * + * @author hxinyu + * @since 2.11.0 + */ +public class ChainPollingTask implements Runnable { + + private RedisParserVO redisParserVO; + + private RClient chainClient; + + private Integer chainNum; + + private Map chainSHAMap; + + private String keyLua; + + private String valueLua; + + LFLog LOG = LFLoggerManager.getLogger(ChainPollingTask.class); + + public ChainPollingTask(RedisParserVO redisParserVO, RClient chainClient, Integer chainNum, Map chainSHAMap, String keyLua, String valueLua) { + this.redisParserVO = redisParserVO; + this.chainClient = chainClient; + this.chainNum = chainNum; + this.chainSHAMap = chainSHAMap; + this.keyLua = keyLua; + this.valueLua = valueLua; + } + + /** + * 用于返回chain轮询任务 + * 先根据hash中value的SHA值修改变化的和被删除的chain + * 再根据hash中field数量的变化拉取新增的chain + */ + @Override + public void run() { + try { + String chainKey = redisParserVO.getChainKey(); + //Lua获取chainKey中最新的chain数量 + String keyNum = chainClient.evalSha(keyLua, chainKey); + //修改chainNum为最新chain数量 + chainNum = Integer.parseInt(keyNum); + + List needDelete = new ArrayList<>(); + //遍历Map,判断各个chain的value有无变化:修改变化了值的chain和被删除的chain + for (Map.Entry entry : chainSHAMap.entrySet()) { + String chainId = entry.getKey(); + String oldSHA = entry.getValue(); + //在redis服务端通过Lua脚本计算SHA值 + String newSHA = chainClient.evalSha(valueLua, chainKey, chainId); + if (StrUtil.equals(newSHA, "nil")) { + //新SHA值为nil, 即未获取到该chain,表示该chain已被删除 + FlowBus.removeChain(chainId); + LOG.info("starting reload flow config... delete key={}", chainId); + + //添加到待删除的list 后续统一从SHAMap中移除 + //不在这里直接移除是为了避免先删除导致chainSHAMap并没有完全遍历完 chain删除不全 + needDelete.add(chainId); + } + else if (!StrUtil.equals(newSHA, oldSHA)) { + //SHA值发生变化,表示该chain的值已被修改,重新拉取变化的chain + String chainData = chainClient.hget(chainKey, chainId); + LiteFlowChainELBuilder.createChain().setChainId(chainId).setEL(chainData).build(); + LOG.info("starting reload flow config... update key={} new value={},", chainId, chainData); + + //修改SHAMap + chainSHAMap.put(chainId, newSHA); + } + //SHA值无变化,表示该chain未改变 + } + + //统一从SHAMap中移除要删除的chain + for (String chainId : needDelete) { + chainSHAMap.remove(chainId); + } + + //处理新添加chain和chainId被修改的情况 + if (chainNum > chainSHAMap.size()) { + //如果封装的SHAMap数量比最新chain总数少, 说明有两种情况: + // 1、添加了新chain + // 2、修改了chainId:因为遍历到旧的id时会取到nil,SHAMap会把原来的chainId删掉,但没有机会添加新的chainId + // 3、上述两者结合 + //在此处重新拉取所有chainId集合,补充添加新chain + Set newChainSet = chainClient.hkeys(chainKey); + for (String chainId : newChainSet) { + if (!chainSHAMap.containsKey(chainId)) { + //将新chainId添加到LiteFlowChainELBuilder和SHAMap + String chainData = chainClient.hget(chainKey, chainId); + LiteFlowChainELBuilder.createChain().setChainId(chainId).setEL(chainData).build(); + LOG.info("starting reload flow config... create key={} new value={},", chainId, chainData); + chainSHAMap.put(chainId, DigestUtil.sha1Hex(chainData)); + } + } + } + } catch (Exception e) { + LOG.error("[Exception during chain polling] " + e.getMessage(), e); + } + } +} diff --git a/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/mode/polling/RedisParserPollingMode.java b/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/mode/polling/RedisParserPollingMode.java new file mode 100644 index 00000000..231b9907 --- /dev/null +++ b/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/mode/polling/RedisParserPollingMode.java @@ -0,0 +1,231 @@ +package com.yomahub.liteflow.parser.redis.mode.polling; + +import cn.hutool.core.collection.CollUtil; +import cn.hutool.core.collection.CollectionUtil; +import cn.hutool.core.thread.NamedThreadFactory; +import cn.hutool.core.util.ObjectUtil; +import cn.hutool.core.util.StrUtil; +import cn.hutool.crypto.digest.DigestUtil; +import com.yomahub.liteflow.parser.redis.exception.RedisException; +import com.yomahub.liteflow.parser.redis.mode.RClient; +import com.yomahub.liteflow.parser.redis.mode.RedisMode; +import com.yomahub.liteflow.parser.redis.mode.RedisParserHelper; +import com.yomahub.liteflow.parser.redis.vo.RedisParserVO; +import com.yomahub.liteflow.spi.holder.ContextAwareHolder; +import org.redisson.Redisson; +import org.redisson.config.Config; + +import java.util.*; +import java.util.concurrent.*; + +/** + * Redis 轮询机制实现类 + * + * @author hxinyu + * @since 2.11.0 + */ + +public class RedisParserPollingMode implements RedisParserHelper { + + private final RedisParserVO redisParserVO; + + private RClient chainClient; + + private RClient scriptClient; + + //chainKey中chain总数 + private Integer chainNum = 0; + + //scriptKey中script总数 + private Integer scriptNum = 0; + + //chainKey中value的SHA1加密值 用于轮询时确定value是否变化 + private Map chainSHAMap = new HashMap<>(); + + //scriptKey中value的SHA1加密值 用于轮询时确定value是否变化 + private Map scriptSHAMap = new HashMap<>(); + + //定时任务线程池核心线程数 + private static final int CORE_POOL_SIZE = 2; + + //定时任务线程池 + private ScheduledThreadPoolExecutor pollExecutor; + + //计算hash中field数量的lua脚本 + private final String luaOfKey = "local keys = redis.call(\"hkeys\", KEYS[1]);\n" + + "return #keys;\n"; + + //计算hash中value的SHA值的lua脚本 + private final String luaOfValue = "local key = KEYS[1];\n" + + "local field = KEYS[2];\n" + + "local value, err = redis.call(\"hget\", key, field);\n" + + "if value == false or value == nil then\n" + + " return \"nil\";\n" + + "end\n" + + "local sha1 = redis.sha1hex(value);\n" + + "return sha1;"; + + public RedisParserPollingMode(RedisParserVO redisParserVO) { + this.redisParserVO = redisParserVO; + + try{ + try{ + this.chainClient = ContextAwareHolder.loadContextAware().getBean("chainClient"); + this.scriptClient = ContextAwareHolder.loadContextAware().getBean("scriptClient"); + } + catch (Exception ignored) { + } + if (ObjectUtil.isNull(chainClient)) { + RedisMode redisMode = redisParserVO.getRedisMode(); + Config config; + //Redis单点模式 + if (redisMode.equals(RedisMode.SINGLE)){ + config = getSingleRedissonConfig(redisParserVO, redisParserVO.getChainDataBase()); + this.chainClient = new RClient(Redisson.create(config)); + //如果有脚本数据 + if (ObjectUtil.isNotNull(redisParserVO.getScriptDataBase())) { + config = getSingleRedissonConfig(redisParserVO, redisParserVO.getScriptDataBase()); + this.scriptClient = new RClient(Redisson.create(config)); + } + } + + //Redis哨兵模式 + else if (redisMode.equals(RedisMode.SENTINEL)) { + config = getSentinelRedissonConfig(redisParserVO, redisParserVO.getChainDataBase()); + this.chainClient = new RClient(Redisson.create(config)); + //如果有脚本数据 + if (ObjectUtil.isNotNull(redisParserVO.getScriptDataBase())) { + config = getSentinelRedissonConfig(redisParserVO, redisParserVO.getScriptDataBase()); + this.scriptClient = new RClient(Redisson.create(config)); + } + } + } + //创建定时任务线程池 + if (ObjectUtil.isNull(pollExecutor)) { + ThreadFactory namedThreadFactory = new NamedThreadFactory("Redis-Polling-", false); + pollExecutor = new ScheduledThreadPoolExecutor( + CORE_POOL_SIZE, + namedThreadFactory, + new ThreadPoolExecutor.DiscardOldestPolicy()); + } + } + catch (Exception e) { + throw new RedisException(e.getMessage()); + } + } + + @Override + public String getContent() { + try { + // 检查chainKey下有没有子节点 + String chainKey = redisParserVO.getChainKey(); + Set chainNameSet = chainClient.hkeys(chainKey); + if (CollectionUtil.isEmpty(chainNameSet)) { + throw new RedisException(StrUtil.format("There are no chains in key [{}]", chainKey)); + } + chainNum = chainNameSet.size(); + // 获取chainKey下的所有子节点内容List + List chainItemContentList = new ArrayList<>(); + for (String chainName : chainNameSet) { + String chainData = chainClient.hget(chainKey, chainName); + if (StrUtil.isNotBlank(chainData)) { + chainItemContentList.add(StrUtil.format(CHAIN_XML_PATTERN, chainName, chainData)); + } + + //计算该chainData的SHA值 + String chainSHA = DigestUtil.sha1Hex(chainData); + chainSHAMap.put(chainName, chainSHA); + } + // 合并成所有chain的xml内容 + String chainAllContent = CollUtil.join(chainItemContentList, StrUtil.EMPTY); + + // 检查是否有脚本内容,如果有,进行脚本内容的获取 + String scriptAllContent = StrUtil.EMPTY; + if (hasScript()) { + String scriptKey = redisParserVO.getScriptKey(); + Set scriptFieldSet = scriptClient.hkeys(scriptKey); + scriptNum = scriptFieldSet.size(); + + List scriptItemContentList = new ArrayList<>(); + for (String scriptFieldValue : scriptFieldSet) { + NodeSimpleVO nodeSimpleVO = RedisParserHelper.convert(scriptFieldValue); + if (ObjectUtil.isNull(nodeSimpleVO)) { + throw new RedisException( + StrUtil.format("The name of the redis field [{}] in scriptKey [{}] is invalid", + scriptFieldValue, scriptKey)); + } + String scriptData = scriptClient.hget(scriptKey, scriptFieldValue); + + // 有语言类型 + if (StrUtil.isNotBlank(nodeSimpleVO.getLanguage())) { + scriptItemContentList.add(StrUtil.format(NODE_ITEM_WITH_LANGUAGE_XML_PATTERN, + nodeSimpleVO.getNodeId(), nodeSimpleVO.getName(), nodeSimpleVO.getType(), + nodeSimpleVO.getLanguage(), scriptData)); + } + // 没有语言类型 + else { + scriptItemContentList.add(StrUtil.format(NODE_ITEM_XML_PATTERN, nodeSimpleVO.getNodeId(), + nodeSimpleVO.getName(), nodeSimpleVO.getType(), scriptData)); + } + + //计算scriptData的SHA值 + String scriptSHA = DigestUtil.sha1Hex(scriptData); + scriptSHAMap.put(scriptFieldValue, scriptSHA); + } + + scriptAllContent = StrUtil.format(NODE_XML_PATTERN, + CollUtil.join(scriptItemContentList, StrUtil.EMPTY)); + } + + return StrUtil.format(XML_PATTERN, scriptAllContent, chainAllContent); + } + catch (Exception e) { + throw new RedisException(e.getMessage()); + } + } + + public boolean hasScript() { + if (ObjectUtil.isNull(scriptClient) || ObjectUtil.isNull(redisParserVO.getScriptDataBase())) { + return false; + } + try{ + String scriptKey = redisParserVO.getScriptKey(); + if (StrUtil.isBlank(scriptKey)) { + return false; + } + Set scriptKeySet = scriptClient.hkeys(scriptKey); + return !CollUtil.isEmpty(scriptKeySet); + } + catch (Exception e) { + return false; + } + } + + /** + * 定时轮询拉取Redis中变化的数据 + */ + @Override + public void listenRedis() { + //将lua脚本添加到chainJedis脚本缓存 + String keyLuaOfChain = chainClient.scriptLoad(luaOfKey); + String valueLuaOfChain = chainClient.scriptLoad(luaOfValue); + + //添加轮询chain的定时任务 + ChainPollingTask chainTask = new ChainPollingTask(redisParserVO, chainClient, chainNum, chainSHAMap, keyLuaOfChain, valueLuaOfChain); + pollExecutor.scheduleAtFixedRate(chainTask, redisParserVO.getPollingStartTime().longValue(), + redisParserVO.getPollingInterval().longValue(), TimeUnit.SECONDS); + + //如果有脚本 + if (ObjectUtil.isNotNull(scriptClient) && ObjectUtil.isNotNull(redisParserVO.getScriptDataBase()) + && StrUtil.isNotBlank(redisParserVO.getScriptKey())) { + //将lua脚本添加到scriptJedis脚本缓存 + String keyLuaOfScript = scriptClient.scriptLoad(luaOfKey); + String valueLuaOfScript = scriptClient.scriptLoad(luaOfValue); + + //添加轮询script的定时任务 + ScriptPollingTask scriptTask = new ScriptPollingTask(redisParserVO, scriptClient, scriptNum, scriptSHAMap, keyLuaOfScript, valueLuaOfScript); + pollExecutor.scheduleAtFixedRate(scriptTask, redisParserVO.getPollingStartTime().longValue(), + redisParserVO.getPollingInterval().longValue(), TimeUnit.SECONDS); + } + } +} diff --git a/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/mode/polling/ScriptPollingTask.java b/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/mode/polling/ScriptPollingTask.java new file mode 100644 index 00000000..a23965c3 --- /dev/null +++ b/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/mode/polling/ScriptPollingTask.java @@ -0,0 +1,118 @@ +package com.yomahub.liteflow.parser.redis.mode.polling; + +import cn.hutool.core.util.StrUtil; +import cn.hutool.crypto.digest.DigestUtil; +import com.yomahub.liteflow.flow.FlowBus; +import com.yomahub.liteflow.log.LFLog; +import com.yomahub.liteflow.log.LFLoggerManager; +import com.yomahub.liteflow.parser.redis.mode.RClient; +import com.yomahub.liteflow.parser.redis.mode.RedisParserHelper; +import com.yomahub.liteflow.parser.redis.vo.RedisParserVO; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Set; + +/** + * 用于轮询script的定时任务 + * + * @author hxinyu + * @since 2.11.0 + */ +public class ScriptPollingTask implements Runnable { + + private RedisParserVO redisParserVO; + + private RClient scriptClient; + + private Integer scriptNum; + + private Map scriptSHAMap; + + private String keyLua; + + private String valueLua; + + LFLog LOG = LFLoggerManager.getLogger(ScriptPollingTask.class); + + public ScriptPollingTask(RedisParserVO redisParserVO, RClient scriptClient, Integer scriptNum, Map scriptSHAMap, String keyLua, String valueLua) { + this.redisParserVO = redisParserVO; + this.scriptClient = scriptClient; + this.scriptNum = scriptNum; + this.scriptSHAMap = scriptSHAMap; + this.keyLua = keyLua; + this.valueLua = valueLua; + } + + /** + * 用于返回script轮询任务 + * 首先根据hash中field数量的变化拉取新增的script + * 再根据hash中value的SHA值修改变化的和被删除的script + */ + @Override + public void run() { + try { + String scriptKey = redisParserVO.getScriptKey(); + //Lua获取scriptKey中最新的script数量 + String keyNum = scriptClient.evalSha(keyLua, scriptKey); + //修改scriptNum为最新script数量 + scriptNum = Integer.parseInt(keyNum); + + List needDelete = new ArrayList<>(); + //遍历Map,判断各个script的value有无变化:修改变化了值的script和被删除的script + for (Map.Entry entry : scriptSHAMap.entrySet()) { + String scriptFieldValue = entry.getKey(); + String oldSHA = entry.getValue(); + //在redis服务端通过Lua脚本计算SHA值 + String newSHA = scriptClient.evalSha(valueLua, scriptKey, scriptFieldValue); + if (StrUtil.equals(newSHA, "nil")) { + //新SHA值为nil, 即未获取到该script,表示该script已被删除 + RedisParserHelper.NodeSimpleVO nodeSimpleVO = RedisParserHelper.convert(scriptFieldValue); + FlowBus.getNodeMap().remove(nodeSimpleVO.getNodeId()); + LOG.info("starting reload flow config... delete key={}", scriptFieldValue); + + //添加到待删除的list 后续统一从SHAMap中移除 + //不在这里直接移除是为了避免先删除导致scriptSHAMap并没有完全遍历完 script删除不全 + needDelete.add(scriptFieldValue); + } + else if (!StrUtil.equals(newSHA, oldSHA)) { + //SHA值发生变化,表示该script的值已被修改,重新拉取变化的script + String scriptData = scriptClient.hget(scriptKey, scriptFieldValue); + RedisParserHelper.changeScriptNode(scriptFieldValue, scriptData); + LOG.info("starting reload flow config... update key={} new value={},", scriptFieldValue, scriptData); + + //修改SHAMap + scriptSHAMap.put(scriptFieldValue, newSHA); + } + //SHA值无变化,表示该script未改变 + } + + //统一从SHAMap中移除要删除的script + for (String scriptFieldValue : needDelete) { + scriptSHAMap.remove(scriptFieldValue); + } + + //处理新添加script和script名被修改的情况 + if (scriptNum > scriptSHAMap.size()) { + //如果封装的SHAMap数量比最新script总数少, 说明有两种情况: + // 1、添加了新script + // 2、修改了script名:因为遍历到旧的id时会取到nil,SHAMap会把原来的script删掉,但没有机会添加新的script + // 3、上述两者结合 + //在此处重新拉取所有script名集合,补充添加新script + Set newScriptSet = scriptClient.hkeys(scriptKey); + for (String scriptFieldValue : newScriptSet) { + if (!scriptSHAMap.containsKey(scriptFieldValue)) { + //将新script添加到LiteFlowChainELBuilder和SHAMap + String scriptData = scriptClient.hget(scriptKey, scriptFieldValue); + RedisParserHelper.changeScriptNode(scriptFieldValue, scriptData); + LOG.info("starting reload flow config... create key={} new value={},", scriptFieldValue, scriptData); + scriptSHAMap.put(scriptFieldValue, DigestUtil.sha1Hex(scriptData)); + } + } + } + } catch (Exception e) { + LOG.error("[Exception during script polling] " + e.getMessage(), e); + } + } +} \ No newline at end of file diff --git a/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/mode/subscribe/RedisParserSubscribeMode.java b/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/mode/subscribe/RedisParserSubscribeMode.java new file mode 100644 index 00000000..b115baec --- /dev/null +++ b/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/mode/subscribe/RedisParserSubscribeMode.java @@ -0,0 +1,201 @@ +package com.yomahub.liteflow.parser.redis.mode.subscribe; + +import cn.hutool.core.collection.CollUtil; +import cn.hutool.core.collection.CollectionUtil; +import cn.hutool.core.util.ObjectUtil; +import cn.hutool.core.util.StrUtil; +import com.yomahub.liteflow.builder.el.LiteFlowChainELBuilder; +import com.yomahub.liteflow.flow.FlowBus; +import com.yomahub.liteflow.parser.redis.exception.RedisException; +import com.yomahub.liteflow.parser.redis.mode.RClient; +import com.yomahub.liteflow.parser.redis.mode.RedisMode; +import com.yomahub.liteflow.parser.redis.mode.RedisParserHelper; +import com.yomahub.liteflow.parser.redis.vo.RedisParserVO; +import com.yomahub.liteflow.spi.holder.ContextAwareHolder; +import org.redisson.Redisson; +import org.redisson.api.map.event.EntryCreatedListener; +import org.redisson.api.map.event.EntryRemovedListener; +import org.redisson.api.map.event.EntryUpdatedListener; +import org.redisson.config.Config; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +/** + * Redis Pub/Sub机制实现类 + * 使用 Redisson客户端 RMapCache存储结构 + * + * @author hxinyu + * @since 2.11.0 + */ + +public class RedisParserSubscribeMode implements RedisParserHelper { + + private final RedisParserVO redisParserVO; + + private RClient chainClient; + + private RClient scriptClient; + + public RedisParserSubscribeMode(RedisParserVO redisParserVO) { + this.redisParserVO = redisParserVO; + + try { + try { + this.chainClient = ContextAwareHolder.loadContextAware().getBean("chainClient"); + this.scriptClient = ContextAwareHolder.loadContextAware().getBean("scriptClient"); + } + catch (Exception ignored) { + } + if (ObjectUtil.isNull(chainClient)) { + RedisMode redisMode = redisParserVO.getRedisMode(); + Config config; + //Redis单点模式 + if (redisMode.equals(RedisMode.SINGLE)){ + config = getSingleRedissonConfig(redisParserVO, redisParserVO.getChainDataBase()); + this.chainClient = new RClient(Redisson.create(config)); + //如果有脚本数据 + if (ObjectUtil.isNotNull(redisParserVO.getScriptDataBase())) { + config = getSingleRedissonConfig(redisParserVO, redisParserVO.getScriptDataBase()); + this.scriptClient = new RClient(Redisson.create(config)); + } + } + + //Redis哨兵模式 + else if (redisMode.equals(RedisMode.SENTINEL)) { + config = getSentinelRedissonConfig(redisParserVO, redisParserVO.getChainDataBase()); + this.chainClient = new RClient(Redisson.create(config)); + //如果有脚本数据 + if (ObjectUtil.isNotNull(redisParserVO.getScriptDataBase())) { + config = getSentinelRedissonConfig(redisParserVO, redisParserVO.getScriptDataBase()); + this.scriptClient = new RClient(Redisson.create(config)); + } + } + } + } + catch (Exception e) { + throw new RedisException(e.getMessage()); + } + + } + + @Override + public String getContent() { + try { + // 检查chainKey下有没有子节点 + Map chainMap = chainClient.getMap(redisParserVO.getChainKey()); + if (CollectionUtil.isEmpty(chainMap)) { + throw new RedisException(StrUtil.format("There are no chains in key [{}]", + redisParserVO.getChainKey())); + } + // 获取chainKey下的所有子节点内容List + List chainItemContentList = new ArrayList<>(); + for (Map.Entry entry : chainMap.entrySet()) { + String chainId = entry.getKey(); + String chainData = entry.getValue(); + if (StrUtil.isNotBlank(chainData)) { + chainItemContentList.add(StrUtil.format(CHAIN_XML_PATTERN, chainId, chainData)); + } + } + // 合并成所有chain的xml内容 + String chainAllContent = CollUtil.join(chainItemContentList, StrUtil.EMPTY); + + // 检查是否有脚本内容,如果有,进行脚本内容的获取 + String scriptAllContent = StrUtil.EMPTY; + if (hasScript()) { + Map scriptMap = scriptClient.getMap(redisParserVO.getScriptKey()); + List scriptItemContentList = new ArrayList<>(); + for (Map.Entry entry : scriptMap.entrySet()) { + String scriptFieldValue = entry.getKey(); + String scriptData = entry.getValue(); + NodeSimpleVO nodeSimpleVO = RedisParserHelper.convert(scriptFieldValue); + if (ObjectUtil.isNull(nodeSimpleVO)) { + throw new RedisException( + StrUtil.format("The name of the redis field [{}] in scriptKey [{}] is invalid", + scriptFieldValue, redisParserVO.getScriptKey())); + } + // 有语言类型 + if (StrUtil.isNotBlank(nodeSimpleVO.getLanguage())) { + scriptItemContentList.add(StrUtil.format(NODE_ITEM_WITH_LANGUAGE_XML_PATTERN, + nodeSimpleVO.getNodeId(), nodeSimpleVO.getName(), nodeSimpleVO.getType(), + nodeSimpleVO.getLanguage(), scriptData)); + } + // 没有语言类型 + else { + scriptItemContentList.add(StrUtil.format(NODE_ITEM_XML_PATTERN, nodeSimpleVO.getNodeId(), + nodeSimpleVO.getName(), nodeSimpleVO.getType(), scriptData)); + } + } + + scriptAllContent = StrUtil.format(NODE_XML_PATTERN, + CollUtil.join(scriptItemContentList, StrUtil.EMPTY)); + } + + return StrUtil.format(XML_PATTERN, scriptAllContent, chainAllContent); + } + catch (Exception e) { + throw new RedisException(e.getMessage()); + } + } + + public boolean hasScript() { + // 没有scriptClient或没有配置scriptDataBase + if (ObjectUtil.isNull(scriptClient) || ObjectUtil.isNull(redisParserVO.getScriptDataBase())) { + return false; + } + try { + // 存在这个节点,但是子节点不存在 + Map scriptMap = scriptClient.getMap(redisParserVO.getScriptKey()); + return !CollUtil.isEmpty(scriptMap); + } + catch (Exception e) { + return false; + } + } + + /** + * 监听 redis key + */ + @Override + public void listenRedis() { + //监听 chain + String chainKey = redisParserVO.getChainKey(); + //添加新 chain + chainClient.addListener(chainKey, (EntryCreatedListener) event -> { + LOG.info("starting reload flow config... create key={} value={},", event.getKey(), event.getValue()); + LiteFlowChainELBuilder.createChain().setChainId(event.getKey()).setEL(event.getValue()).build(); + }); + //修改 chain + chainClient.addListener(chainKey, (EntryUpdatedListener) event -> { + LOG.info("starting reload flow config... update key={} new value={},", event.getKey(), event.getValue()); + LiteFlowChainELBuilder.createChain().setChainId(event.getKey()).setEL(event.getValue()).build(); + }); + //删除 chain + chainClient.addListener(chainKey, (EntryRemovedListener) event -> { + LOG.info("starting reload flow config... delete key={}", event.getKey()); + FlowBus.removeChain(event.getKey()); + }); + + //监听 script + if (ObjectUtil.isNotNull(scriptClient) && ObjectUtil.isNotNull(redisParserVO.getScriptDataBase())) { + String scriptKey = redisParserVO.getScriptKey(); + //添加 script + scriptClient.addListener(scriptKey, (EntryCreatedListener) event -> { + LOG.info("starting reload flow config... create key={} value={},", event.getKey(), event.getValue()); + RedisParserHelper.changeScriptNode(event.getKey(), event.getValue()); + }); + //修改 script + scriptClient.addListener(scriptKey, (EntryUpdatedListener) event -> { + LOG.info("starting reload flow config... update key={} new value={},", event.getKey(), event.getValue()); + RedisParserHelper.changeScriptNode(event.getKey(), event.getValue()); + }); + //删除 script + scriptClient.addListener(scriptKey, (EntryRemovedListener) event -> { + LOG.info("starting reload flow config... delete key={}", event.getKey()); + NodeSimpleVO nodeSimpleVO = RedisParserHelper.convert(event.getKey()); + FlowBus.getNodeMap().remove(nodeSimpleVO.getNodeId()); + }); + } + } +} diff --git a/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/vo/RedisParserVO.java b/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/vo/RedisParserVO.java new file mode 100644 index 00000000..7aeaecd7 --- /dev/null +++ b/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/vo/RedisParserVO.java @@ -0,0 +1,204 @@ +package com.yomahub.liteflow.parser.redis.vo; + +import com.yomahub.liteflow.parser.redis.mode.RedisMode; +import com.yomahub.liteflow.parser.redis.mode.RedisParserMode; + +import java.util.List; + +/** + * 用于解析RuleSourceExtData的vo类, 用于Redis模式中 + * + * @author hxinyu + * @since 2.11.0 + */ + +public class RedisParserVO { + + /*Redis配置模式 单点/哨兵, 默认为单点模式*/ + private RedisMode redisMode = RedisMode.SINGLE; + + /*单点模式 连接地址*/ + private String host; + + /*单点模式 端口号*/ + private Integer port; + + /*哨兵模式 主节点名*/ + private String masterName; + + /*哨兵模式 哨兵节点连接地址 ip:port, 可配置多个*/ + private List sentinelAddress; + + /*用户名 需要Redis 6.0及以上*/ + private String username; + + /*密码*/ + private String password; + + /*监听机制 轮询为poll 订阅为subscribe 默认为poll*/ + private RedisParserMode mode = RedisParserMode.POLL; + + /*轮询时间间隔(s) 默认60s 若选择订阅机制可不配置*/ + private Integer pollingInterval = 60; + + /*规则配置后首次轮询的起始时间 默认为60s 若选择订阅机制可不配置*/ + private Integer pollingStartTime = 60; + + /*chain表配置的数据库号*/ + private Integer chainDataBase; + + /*chain配置的键名*/ + private String chainKey; + + /*脚本表配置的数据库号 若没有脚本数据可不配置*/ + private Integer scriptDataBase; + + /*脚本配置的键名 若没有脚本数据可不配置*/ + private String scriptKey; + + public void setRedisMode(String redisMode) { + redisMode = redisMode.toUpperCase(); + try{ + RedisMode m = RedisMode.valueOf(redisMode); + this.redisMode = m; + } + catch (Exception ignored) { + //转换出错默认为单点模式 + } + } + + public RedisMode getRedisMode() { + return redisMode; + } + + public String getHost() { + return host; + } + + public void setHost(String host) { + this.host = host; + } + + public Integer getPort() { + return port; + } + + public void setPort(Integer port) { + this.port = port; + } + + public String getMasterName() { + return masterName; + } + + public void setMasterName(String masterName) { + this.masterName = masterName; + } + + public List getSentinelAddress() { + return sentinelAddress; + } + + public void setSentinelAddress(List sentinelAddress) { + this.sentinelAddress = sentinelAddress; + } + + public String getUsername() { + return username; + } + + public void setUsername(String username) { + this.username = username; + } + + public String getPassword() { + return password; + } + + public void setPassword(String password) { + this.password = password; + } + + public RedisParserMode getMode() { + return mode; + } + + public void setMode(String mode) { + mode = mode.toUpperCase(); + try{ + RedisParserMode m = RedisParserMode.valueOf(mode); + this.mode = m; + } + catch (Exception ignored) { + //枚举类转换出错默认为轮询方式 + } + } + + public Integer getPollingStartTime() { + return pollingStartTime; + } + + public void setPollingStartTime(Integer pollingStartTime) { + this.pollingStartTime = pollingStartTime; + } + + public Integer getPollingInterval() { + return pollingInterval; + } + + public void setPollingInterval(Integer pollingInterval) { + this.pollingInterval = pollingInterval; + } + + public Integer getChainDataBase() { + return chainDataBase; + } + + public void setChainDataBase(Integer chainDataBase) { + this.chainDataBase = chainDataBase; + } + + public String getChainKey() { + return chainKey; + } + + public void setChainKey(String chainKey) { + this.chainKey = chainKey; + } + + public Integer getScriptDataBase() { + return scriptDataBase; + } + + public void setScriptDataBase(Integer scriptDataBase) { + this.scriptDataBase = scriptDataBase; + } + + public String getScriptKey() { + return scriptKey; + } + + public void setScriptKey(String scriptKey) { + this.scriptKey = scriptKey; + } + + @Override + public String toString() { + return "RedisParserVO{" + + "redisMode=" + redisMode + + ", host='" + host + '\'' + + ", port=" + port + + ", masterName=" + masterName + + ", sentinelAddress=" + sentinelAddress + + ", username='" + username + '\'' + + ", password='" + password + '\'' + + ", mode=" + mode + + ", pollingInterval=" + pollingInterval + + ", pollingStartTime=" + pollingStartTime + + ", chainDataBase=" + chainDataBase + + ", chainKey='" + chainKey + '\'' + + ", scriptDataBase=" + scriptDataBase + + ", scriptKey='" + scriptKey + '\'' + + '}'; + } +} diff --git a/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/spi/redis/RedisParserClassNameSpi.java b/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/spi/redis/RedisParserClassNameSpi.java new file mode 100644 index 00000000..51aeabfd --- /dev/null +++ b/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/spi/redis/RedisParserClassNameSpi.java @@ -0,0 +1,18 @@ +package com.yomahub.liteflow.parser.spi.redis; + +import com.yomahub.liteflow.parser.redis.RedisXmlELParser; +import com.yomahub.liteflow.parser.spi.ParserClassNameSpi; + +/** + * Redis 解析器 SPI 实现 + * + * @author hxinyu + * @since 2.11.0 + */ +public class RedisParserClassNameSpi implements ParserClassNameSpi { + + @Override + public String getSpiClassName() { + return RedisXmlELParser.class.getName(); + } +} diff --git a/liteflow-rule-plugin/liteflow-rule-redis/src/main/resources/META-INF/services/com.yomahub.liteflow.parser.spi.ParserClassNameSpi b/liteflow-rule-plugin/liteflow-rule-redis/src/main/resources/META-INF/services/com.yomahub.liteflow.parser.spi.ParserClassNameSpi new file mode 100644 index 00000000..3e341b98 --- /dev/null +++ b/liteflow-rule-plugin/liteflow-rule-redis/src/main/resources/META-INF/services/com.yomahub.liteflow.parser.spi.ParserClassNameSpi @@ -0,0 +1 @@ +com.yomahub.liteflow.parser.spi.redis.RedisParserClassNameSpi \ No newline at end of file diff --git a/liteflow-rule-plugin/pom.xml b/liteflow-rule-plugin/pom.xml index 02de13a6..f915ebb7 100644 --- a/liteflow-rule-plugin/pom.xml +++ b/liteflow-rule-plugin/pom.xml @@ -16,6 +16,7 @@ liteflow-rule-nacos liteflow-rule-etcd liteflow-rule-apollo + liteflow-rule-redis liteflow-rule-plugin diff --git a/liteflow-testcase-el/liteflow-testcase-el-redis-springboot/pom.xml b/liteflow-testcase-el/liteflow-testcase-el-redis-springboot/pom.xml new file mode 100644 index 00000000..d64a3121 --- /dev/null +++ b/liteflow-testcase-el/liteflow-testcase-el-redis-springboot/pom.xml @@ -0,0 +1,50 @@ + + + + liteflow-testcase-el + com.yomahub + ${revision} + ../pom.xml + + 4.0.0 + + liteflow-testcase-el-redis-springboot + + + + com.yomahub + liteflow-spring-boot-starter + ${revision} + + + + com.yomahub + liteflow-rule-redis + ${revision} + test + + + + org.springframework.boot + spring-boot-starter-test + + + + com.yomahub + liteflow-script-groovy + ${revision} + test + + + + com.yomahub + liteflow-script-graaljs + ${revision} + test + + + + + \ No newline at end of file diff --git a/liteflow-testcase-el/liteflow-testcase-el-redis-springboot/src/test/java/com/yomahub/liteflow/test/BaseTest.java b/liteflow-testcase-el/liteflow-testcase-el-redis-springboot/src/test/java/com/yomahub/liteflow/test/BaseTest.java new file mode 100644 index 00000000..e282c0fd --- /dev/null +++ b/liteflow-testcase-el/liteflow-testcase-el-redis-springboot/src/test/java/com/yomahub/liteflow/test/BaseTest.java @@ -0,0 +1,22 @@ +package com.yomahub.liteflow.test; + +import com.yomahub.liteflow.core.FlowInitHook; +import com.yomahub.liteflow.flow.FlowBus; +import com.yomahub.liteflow.property.LiteflowConfigGetter; +import com.yomahub.liteflow.spi.holder.SpiFactoryCleaner; +import com.yomahub.liteflow.spring.ComponentScanner; +import com.yomahub.liteflow.thread.ExecutorHelper; +import org.junit.jupiter.api.AfterAll; + +public class BaseTest { + + @AfterAll + public static void cleanScanCache() { + ComponentScanner.cleanCache(); + FlowBus.cleanCache(); + ExecutorHelper.loadInstance().clearExecutorServiceMap(); + SpiFactoryCleaner.clean(); + LiteflowConfigGetter.clean(); + FlowInitHook.cleanHook(); + } +} diff --git a/liteflow-testcase-el/liteflow-testcase-el-redis-springboot/src/test/java/com/yomahub/liteflow/test/redis/RedisSubscribeTestCondition.java b/liteflow-testcase-el/liteflow-testcase-el-redis-springboot/src/test/java/com/yomahub/liteflow/test/redis/RedisSubscribeTestCondition.java new file mode 100644 index 00000000..82a307ef --- /dev/null +++ b/liteflow-testcase-el/liteflow-testcase-el-redis-springboot/src/test/java/com/yomahub/liteflow/test/redis/RedisSubscribeTestCondition.java @@ -0,0 +1,29 @@ +package com.yomahub.liteflow.test.redis; + +import org.redisson.Redisson; +import org.redisson.api.RedissonClient; +import org.redisson.api.redisnode.RedisNodes; +import org.redisson.api.redisnode.RedisSingle; +import org.redisson.config.Config; + +/** + * 判断本地是否启动Redis + * + * @author hxinyu + * @since 2.11.0 + */ +public class RedisSubscribeTestCondition { + + /* 若6379端口未启动Redis则返回true */ + public static boolean notStartRedis() { + try{ + Config config = new Config(); + config.useSingleServer().setAddress("redis://127.0.0.1:6379"); + RedissonClient redissonClient = Redisson.create(config); + RedisSingle redisNode = redissonClient.getRedisNodes(RedisNodes.SINGLE); + return !redisNode.pingAll(); + } catch (Exception e) { + return true; + } + } +} diff --git a/liteflow-testcase-el/liteflow-testcase-el-redis-springboot/src/test/java/com/yomahub/liteflow/test/redis/RedisWithXmlELPollSpringbootTest.java b/liteflow-testcase-el/liteflow-testcase-el-redis-springboot/src/test/java/com/yomahub/liteflow/test/redis/RedisWithXmlELPollSpringbootTest.java new file mode 100644 index 00000000..05749ee9 --- /dev/null +++ b/liteflow-testcase-el/liteflow-testcase-el-redis-springboot/src/test/java/com/yomahub/liteflow/test/redis/RedisWithXmlELPollSpringbootTest.java @@ -0,0 +1,172 @@ +package com.yomahub.liteflow.test.redis; + +import cn.hutool.crypto.digest.DigestUtil; +import com.yomahub.liteflow.core.FlowExecutor; +import com.yomahub.liteflow.core.FlowInitHook; +import com.yomahub.liteflow.flow.FlowBus; +import com.yomahub.liteflow.flow.LiteflowResponse; +import com.yomahub.liteflow.parser.redis.mode.RClient; +import com.yomahub.liteflow.property.LiteflowConfigGetter; +import com.yomahub.liteflow.slot.DefaultContext; +import com.yomahub.liteflow.spi.holder.SpiFactoryCleaner; +import com.yomahub.liteflow.spring.ComponentScanner; +import com.yomahub.liteflow.test.BaseTest; +import com.yomahub.liteflow.thread.ExecutorHelper; +import org.junit.jupiter.api.*; +import org.junit.jupiter.api.extension.ExtendWith; +import org.springframework.boot.autoconfigure.EnableAutoConfiguration; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.mock.mockito.MockBean; +import org.springframework.context.annotation.ComponentScan; +import org.springframework.test.context.TestPropertySource; +import org.springframework.test.context.junit.jupiter.SpringExtension; +import javax.annotation.Resource; +import java.util.HashSet; +import java.util.Set; + +import static org.mockito.ArgumentMatchers.*; +import static org.mockito.Mockito.when; + +/** + * springboot环境下的redis配置源轮询拉取模式功能测试 + * + * @author hxinyu + * @since 2.11.0 + */ +@ExtendWith(SpringExtension.class) +@TestPropertySource(value = "classpath:/redis/application-poll-xml.properties") +@SpringBootTest(classes = RedisWithXmlELPollSpringbootTest.class) +@EnableAutoConfiguration +@ComponentScan({"com.yomahub.liteflow.test.redis.cmp"}) +public class RedisWithXmlELPollSpringbootTest extends BaseTest { + + @MockBean(name = "chainClient") + private static RClient chainClient; + + @MockBean(name = "scriptClient") + private static RClient scriptClient; + + @Resource + private FlowExecutor flowExecutor; + + //计算hash中field数量的lua脚本 + private final String luaOfKey = "local keys = redis.call(\"hkeys\", KEYS[1]);\n" + + "return #keys;\n"; + + //计算hash中value的SHA值的lua脚本 + private final String luaOfValue = "local key = KEYS[1];\n" + + "local field = KEYS[2];\n" + + "local value, err = redis.call(\"hget\", key, field);\n" + + "if value == false or value == nil then\n" + + " return \"nil\";\n" + + "end\n" + + "local sha1 = redis.sha1hex(value);\n" + + "return sha1;"; + + + @AfterEach + public void after() { + FlowBus.cleanCache(); + FlowInitHook.cleanHook(); + ExecutorHelper.loadInstance().clearExecutorServiceMap(); + SpiFactoryCleaner.clean(); + } + + /** + * 测试chain + */ + @Test + public void testPollWithXml() throws InterruptedException { + Set chainNameSet = new HashSet<>(); + chainNameSet.add("chain11"); + String chainValue = "THEN(a, b, c);"; + //SHA值用于测试修改chain的轮询刷新功能 + String chainSHA = DigestUtil.sha1Hex(chainValue); + + //修改chain并更新SHA值 + String changeChainValue = "THEN(a, c);"; + String changeChainSHA = DigestUtil.sha1Hex(changeChainValue); + when(chainClient.hkeys("pollChainKey")).thenReturn(chainNameSet); + when(chainClient.hget("pollChainKey", "chain11")).thenReturn(chainValue).thenReturn(changeChainValue); + when(chainClient.scriptLoad(luaOfKey)).thenReturn("keysha"); + when(chainClient.scriptLoad(luaOfValue)).thenReturn("valuesha"); + when(chainClient.evalSha(eq("keysha"), anyString())).thenReturn("1"); + when(chainClient.evalSha(eq("valuesha"), anyString(), anyString())).thenReturn(chainSHA).thenReturn(changeChainSHA); + //这里其实并没有script数据 预设数据只是为了不产生NumberFormatException + when(scriptClient.scriptLoad(luaOfKey)).thenReturn("keysha"); + when(scriptClient.scriptLoad(luaOfValue)).thenReturn("valuesha"); + when(scriptClient.evalSha(eq("keysha"), anyString())).thenReturn("0"); + when(scriptClient.evalSha(eq("valuesha"), anyString(), anyString())).thenReturn(""); + + //测试修改前的chain + LiteflowResponse response = flowExecutor.execute2Resp("chain11", "arg"); + Assertions.assertTrue(response.isSuccess()); + Assertions.assertEquals("a==>b==>c", response.getExecuteStepStr()); + + Thread.sleep(4000); + + //测试修改后的chain + response = flowExecutor.execute2Resp("chain11", "arg"); + Assertions.assertTrue(response.isSuccess()); + Assertions.assertEquals("a==>c", response.getExecuteStepStr()); + } + + /** + * 测试script + */ + @Test + public void testPollWithScript() throws InterruptedException { + Set chainNameSet = new HashSet<>(); + chainNameSet.add("chain22"); + String chainValue = "THEN(s11, s22, s33, a, b);"; + when(chainClient.hkeys("pollChainKey")).thenReturn(chainNameSet); + when(chainClient.hget("pollChainKey", "chain22")).thenReturn(chainValue); + when(chainClient.scriptLoad(luaOfKey)).thenReturn("keysha"); + when(chainClient.scriptLoad(luaOfValue)).thenReturn("valuesha"); + when(chainClient.evalSha(eq("keysha"), anyString())).thenReturn("1"); + when(chainClient.evalSha(eq("valuesha"), anyString(), anyString())).thenReturn(""); + + Set scriptFieldSet = new HashSet<>(); + scriptFieldSet.add("s11:script:脚本s11:groovy"); + scriptFieldSet.add("s22:script:脚本s22:js"); + scriptFieldSet.add("s33:script:脚本s33"); + String s11 = "defaultContext.setData(\"test11\",\"hello s11\");"; + String s22 = "defaultContext.setData(\"test22\",\"hello s22\");"; + String s33 = "defaultContext.setData(\"test33\",\"hello s33\");"; + //SHA值用于测试修改script的轮询刷新功能 + String s11SHA = DigestUtil.sha1Hex(s11); + String s22SHA = DigestUtil.sha1Hex(s22); + String s33SHA = DigestUtil.sha1Hex(s33); + //修改script值并更新SHA值 + String changeS11 = "defaultContext.setData(\"test11\",\"hello world\");"; + String changeS11SHA = DigestUtil.sha1Hex(changeS11); + + when(scriptClient.hkeys("pollScriptKey")).thenReturn(scriptFieldSet); + when(scriptClient.hget("pollScriptKey", "s11:script:脚本s11:groovy")).thenReturn(s11).thenReturn(changeS11); + when(scriptClient.hget("pollScriptKey", "s22:script:脚本s22:js")).thenReturn(s22); + when(scriptClient.hget("pollScriptKey", "s33:script:脚本s33")).thenReturn(s33); + //分别模拟三个script的evalsha指纹值计算的返回值, 其中s11脚本修改 指纹值变化 + when(scriptClient.scriptLoad(luaOfKey)).thenReturn("keysha"); + when(scriptClient.scriptLoad(luaOfValue)).thenReturn("valuesha"); + when(scriptClient.evalSha(eq("keysha"), anyString())).thenReturn("3"); + when(scriptClient.evalSha("valuesha", "pollScriptKey", "s11:script:脚本s11:groovy")).thenReturn(s11SHA).thenReturn(changeS11SHA); + when(scriptClient.evalSha("valuesha", "pollScriptKey", "s22:script:脚本s22:js")).thenReturn(s22SHA); + when(scriptClient.evalSha("valuesha", "pollScriptKey", "s33:script:脚本s33")).thenReturn(s33SHA); + + //测试修改前的script + LiteflowResponse response = flowExecutor.execute2Resp("chain22", "arg"); + DefaultContext context = response.getFirstContextBean(); + Assertions.assertTrue(response.isSuccess()); + Assertions.assertEquals("hello s11", context.getData("test11")); + Assertions.assertEquals("hello s22", context.getData("test22")); + Assertions.assertEquals("s11[脚本s11]==>s22[脚本s22]==>s33[脚本s33]==>a==>b", response.getExecuteStepStrWithoutTime()); + + Thread.sleep(4000); + + //测试修改后的script + response = flowExecutor.execute2Resp("chain22", "arg"); + context = response.getFirstContextBean(); + Assertions.assertTrue(response.isSuccess()); + Assertions.assertEquals("hello world", context.getData("test11")); + } +} diff --git a/liteflow-testcase-el/liteflow-testcase-el-redis-springboot/src/test/java/com/yomahub/liteflow/test/redis/RedisWithXmlELSubscribeSpringbootTest.java b/liteflow-testcase-el/liteflow-testcase-el-redis-springboot/src/test/java/com/yomahub/liteflow/test/redis/RedisWithXmlELSubscribeSpringbootTest.java new file mode 100644 index 00000000..02575d0e --- /dev/null +++ b/liteflow-testcase-el/liteflow-testcase-el-redis-springboot/src/test/java/com/yomahub/liteflow/test/redis/RedisWithXmlELSubscribeSpringbootTest.java @@ -0,0 +1,196 @@ +package com.yomahub.liteflow.test.redis; + +import cn.hutool.core.util.ObjectUtil; +import com.yomahub.liteflow.core.FlowExecutor; +import com.yomahub.liteflow.flow.LiteflowResponse; +import com.yomahub.liteflow.parser.redis.vo.RedisParserVO; +import com.yomahub.liteflow.property.LiteflowConfig; +import com.yomahub.liteflow.property.LiteflowConfigGetter; +import com.yomahub.liteflow.slot.DefaultContext; +import com.yomahub.liteflow.test.BaseTest; +import com.yomahub.liteflow.util.JsonUtil; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.redisson.Redisson; +import org.redisson.api.RMapCache; +import org.redisson.api.RedissonClient; +import org.redisson.config.Config; +import org.springframework.boot.autoconfigure.EnableAutoConfiguration; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.context.annotation.ComponentScan; +import org.springframework.test.context.TestPropertySource; +import org.springframework.test.context.junit.jupiter.DisabledIf; +import org.springframework.test.context.junit.jupiter.DisabledIfCondition; +import org.springframework.test.context.junit.jupiter.SpringExtension; + +import javax.annotation.Resource; + +/** + * springboot环境下的redis配置源订阅模式功能测试 + * + * 由于Redisson中RMapCache的监听器功能无法mock测试 + * 故Sub模式测试用例需本地启动Redis服务 连接地址: 127.0.0.1:6379 + * 若本地该端口号未启动Redis 则自动忽略本类中测试用例 + * + * 测试用例会在1号database中添加测试数据 chainKey:testChainKey; scriptKey:testScriptKey + * 测试完成后清除测试数据 + * + * @author hxinyu + * @since 2.11.0 + */ +@ExtendWith({SpringExtension.class, DisabledIfCondition.class}) +@TestPropertySource(value = "classpath:/redis/application-sub-xml.properties") +@SpringBootTest(classes = RedisWithXmlELSubscribeSpringbootTest.class) +@EnableAutoConfiguration +@ComponentScan({"com.yomahub.liteflow.test.redis.cmp"}) +@DisabledIf("#{T(com.yomahub.liteflow.test.redis.RedisSubscribeTestCondition).notStartRedis()}") +public class RedisWithXmlELSubscribeSpringbootTest extends BaseTest { + + private static RedissonClient redissonClient; + + @Resource + private FlowExecutor flowExecutor; + + @BeforeAll + public static void setUpBeforeClass() { + Config config = new Config(); + config.useSingleServer().setAddress("redis://127.0.0.1:6379").setDatabase(1); + redissonClient = Redisson.create(config); + RMapCache chainKey = redissonClient.getMapCache("testChainKey"); + RMapCache scriptKey = redissonClient.getMapCache("testScriptKey"); + scriptKey.put("s1:script:脚本s1:groovy", "defaultContext.setData(\"test1\",\"hello s1\");"); + scriptKey.put("s2:script:脚本s2:js", "defaultContext.setData(\"test2\",\"hello s2\");"); + scriptKey.put("s3:script:脚本s3", "defaultContext.setData(\"test3\",\"hello s3\");"); + chainKey.put("chain1", "THEN(a, b, c);"); + chainKey.put("chain2", "THEN(a, b, c, s3);"); + chainKey.put("chain3", "THEN(a, b, c, s1, s2);"); + } + + @AfterAll + public static void after(){ + testCleanData(); + } + + /** + * 测试chain + */ + @Test + public void testSubWithXml() throws InterruptedException { + LiteflowResponse response = flowExecutor.execute2Resp("chain1", "arg"); + Assertions.assertTrue(response.isSuccess()); + Assertions.assertEquals("a==>b==>c", response.getExecuteStepStr()); + + //修改redis中规则 + changeXMLData(); + //重新加载规则 + Thread.sleep(100); + Assertions.assertEquals("a==>c==>b", flowExecutor.execute2Resp("chain1", "arg").getExecuteStepStr()); + + //删除redis中规则 + deleteXMLData(); + //重新加载规则 + Thread.sleep(100); + response = flowExecutor.execute2Resp("chain1", "arg"); + Assertions.assertTrue(!response.isSuccess()); + + //添加redis中规则 + addXMLData(); + //重新加载规则 + Thread.sleep(100); + Assertions.assertEquals("b==>c", flowExecutor.execute2Resp("chain4", "arg").getExecuteStepStr()); + } + + /** + * 测试script + */ + @Test + public void testSubWithScriptXml() throws InterruptedException { + LiteflowResponse response = flowExecutor.execute2Resp("chain3", "arg"); + DefaultContext context = response.getFirstContextBean(); + Assertions.assertTrue(response.isSuccess()); + Assertions.assertEquals("hello s1", context.getData("test1")); + Assertions.assertEquals("a==>b==>c==>s1[脚本s1]==>s2[脚本s2]", response.getExecuteStepStrWithoutTime()); + + //添加和删除脚本 + addAndDeleteScriptData(); + //修改redis脚本 + changeScriptData(); + Thread.sleep(100); + context = flowExecutor.execute2Resp("chain3", "arg").getFirstContextBean(); + Assertions.assertEquals("hello s1 version2", context.getData("test1")); + context = flowExecutor.execute2Resp("chain2", "arg").getFirstContextBean(); + Assertions.assertEquals("hello s3 version2", context.getData("test2")); + } + + /** + * 修改redisson中的chain + */ + public void changeXMLData() { + LiteflowConfig liteflowConfig = LiteflowConfigGetter.get(); + RedisParserVO redisParserVO = JsonUtil.parseObject(liteflowConfig.getRuleSourceExtData(), RedisParserVO.class); + RMapCache chainKey = redissonClient.getMapCache(redisParserVO.getChainKey()); + chainKey.put("chain1", "THEN(a, c, b);"); + } + + /** + * 删除redisson中的chain + */ + public void deleteXMLData() { + LiteflowConfig liteflowConfig = LiteflowConfigGetter.get(); + RedisParserVO redisParserVO = JsonUtil.parseObject(liteflowConfig.getRuleSourceExtData(), RedisParserVO.class); + RMapCache chainKey = redissonClient.getMapCache(redisParserVO.getChainKey()); + chainKey.remove("chain1"); + chainKey.remove("chain4"); + } + + /** + * 新增redisson中的chain + */ + public void addXMLData() { + LiteflowConfig liteflowConfig = LiteflowConfigGetter.get(); + RedisParserVO redisParserVO = JsonUtil.parseObject(liteflowConfig.getRuleSourceExtData(), RedisParserVO.class); + RMapCache chainKey = redissonClient.getMapCache(redisParserVO.getChainKey()); + chainKey.put("chain4","THEN(b, c);"); + } + + /** + * 修改redisson中的脚本 + */ + public void changeScriptData() { + LiteflowConfig liteflowConfig = LiteflowConfigGetter.get(); + RedisParserVO redisParserVO = JsonUtil.parseObject(liteflowConfig.getRuleSourceExtData(), RedisParserVO.class); + RMapCache scriptKey = redissonClient.getMapCache(redisParserVO.getScriptKey()); + scriptKey.put("s1:script:脚本s1:groovy", "defaultContext.setData(\"test1\",\"hello s1 version2\");"); + scriptKey.put("s3:script:脚本s3", "defaultContext.setData(\"test2\",\"hello s3 version2\");"); + } + + /** + * 新增和删除redisson中的chain + */ + public void addAndDeleteScriptData() { + LiteflowConfig liteflowConfig = LiteflowConfigGetter.get(); + RedisParserVO redisParserVO = JsonUtil.parseObject(liteflowConfig.getRuleSourceExtData(), RedisParserVO.class); + RMapCache scriptKey = redissonClient.getMapCache(redisParserVO.getScriptKey()); + scriptKey.remove("s3:script:脚本s3"); + scriptKey.put("s5:script:脚本s5:groovy", "defaultContext.setData(\"test1\",\"hello s5\");"); + } + + //redis内规则数据数据清空 + public static void testCleanData(){ + if(ObjectUtil.isNotNull(redissonClient)){ + RMapCache chainKey = redissonClient.getMapCache("testChainKey"); + RMapCache scriptKey = redissonClient.getMapCache("testScriptKey"); + for (String key : chainKey.keySet()) { + chainKey.remove(key); + } + for (String key : scriptKey.keySet()) { + scriptKey.remove(key); + } + chainKey.delete(); + scriptKey.delete(); + } + } +} diff --git a/liteflow-testcase-el/liteflow-testcase-el-redis-springboot/src/test/java/com/yomahub/liteflow/test/redis/cmp/ACmp.java b/liteflow-testcase-el/liteflow-testcase-el-redis-springboot/src/test/java/com/yomahub/liteflow/test/redis/cmp/ACmp.java new file mode 100644 index 00000000..34aae4c0 --- /dev/null +++ b/liteflow-testcase-el/liteflow-testcase-el-redis-springboot/src/test/java/com/yomahub/liteflow/test/redis/cmp/ACmp.java @@ -0,0 +1,21 @@ +/** + *

Title: liteflow

+ *

Description: 轻量级的组件式流程框架

+ * @author Bryan.Zhang + * @email weenyc31@163.com + * @Date 2020/4/1 + */ +package com.yomahub.liteflow.test.redis.cmp; + +import com.yomahub.liteflow.core.NodeComponent; +import org.springframework.stereotype.Component; + +@Component("a") +public class ACmp extends NodeComponent { + + @Override + public void process() { + System.out.println("ACmp executed!"); + } + +} diff --git a/liteflow-testcase-el/liteflow-testcase-el-redis-springboot/src/test/java/com/yomahub/liteflow/test/redis/cmp/BCmp.java b/liteflow-testcase-el/liteflow-testcase-el-redis-springboot/src/test/java/com/yomahub/liteflow/test/redis/cmp/BCmp.java new file mode 100644 index 00000000..86ea7f13 --- /dev/null +++ b/liteflow-testcase-el/liteflow-testcase-el-redis-springboot/src/test/java/com/yomahub/liteflow/test/redis/cmp/BCmp.java @@ -0,0 +1,21 @@ +/** + *

Title: liteflow

+ *

Description: 轻量级的组件式流程框架

+ * @author Bryan.Zhang + * @email weenyc31@163.com + * @Date 2020/4/1 + */ +package com.yomahub.liteflow.test.redis.cmp; + +import com.yomahub.liteflow.core.NodeComponent; +import org.springframework.stereotype.Component; + +@Component("b") +public class BCmp extends NodeComponent { + + @Override + public void process() { + System.out.println("BCmp executed!"); + } + +} diff --git a/liteflow-testcase-el/liteflow-testcase-el-redis-springboot/src/test/java/com/yomahub/liteflow/test/redis/cmp/CCmp.java b/liteflow-testcase-el/liteflow-testcase-el-redis-springboot/src/test/java/com/yomahub/liteflow/test/redis/cmp/CCmp.java new file mode 100644 index 00000000..983980a0 --- /dev/null +++ b/liteflow-testcase-el/liteflow-testcase-el-redis-springboot/src/test/java/com/yomahub/liteflow/test/redis/cmp/CCmp.java @@ -0,0 +1,21 @@ +/** + *

Title: liteflow

+ *

Description: 轻量级的组件式流程框架

+ * @author Bryan.Zhang + * @email weenyc31@163.com + * @Date 2020/4/1 + */ +package com.yomahub.liteflow.test.redis.cmp; + +import com.yomahub.liteflow.core.NodeComponent; +import org.springframework.stereotype.Component; + +@Component("c") +public class CCmp extends NodeComponent { + + @Override + public void process() { + System.out.println("CCmp executed!"); + } + +} \ No newline at end of file diff --git a/liteflow-testcase-el/liteflow-testcase-el-redis-springboot/src/test/resources/redis/application-poll-xml.properties b/liteflow-testcase-el/liteflow-testcase-el-redis-springboot/src/test/resources/redis/application-poll-xml.properties new file mode 100644 index 00000000..aea34f3c --- /dev/null +++ b/liteflow-testcase-el/liteflow-testcase-el-redis-springboot/src/test/resources/redis/application-poll-xml.properties @@ -0,0 +1,11 @@ +liteflow.rule-source-ext-data={\ + "host":"localhost",\ + "port":6379,\ + "pollingInterval":2,\ + "pollingStartTime":2,\ + "chainDataBase":1,\ + "chainKey":"pollChainKey",\ + "scriptDataBase":1,\ + "scriptKey":"pollScriptKey"\ + } +liteflow.parse-on-start=false \ No newline at end of file diff --git a/liteflow-testcase-el/liteflow-testcase-el-redis-springboot/src/test/resources/redis/application-sub-xml.properties b/liteflow-testcase-el/liteflow-testcase-el-redis-springboot/src/test/resources/redis/application-sub-xml.properties new file mode 100644 index 00000000..6d841fea --- /dev/null +++ b/liteflow-testcase-el/liteflow-testcase-el-redis-springboot/src/test/resources/redis/application-sub-xml.properties @@ -0,0 +1,10 @@ +liteflow.rule-source-ext-data={\ + "host":"localhost",\ + "port":6379,\ + "mode":"sub",\ + "chainDataBase":1,\ + "chainKey":"testChainKey",\ + "scriptDataBase":1,\ + "scriptKey":"testScriptKey"\ + } +liteflow.parse-on-start=false \ No newline at end of file diff --git a/liteflow-testcase-el/pom.xml b/liteflow-testcase-el/pom.xml index 8f23e9e9..e824294a 100644 --- a/liteflow-testcase-el/pom.xml +++ b/liteflow-testcase-el/pom.xml @@ -30,6 +30,7 @@ liteflow-testcase-el-nacos-springboot liteflow-testcase-el-etcd-springboot liteflow-testcase-el-apollo-springboot + liteflow-testcase-el-redis-springboot liteflow-testcase-el-script-python-springboot liteflow-testcase-el-script-lua-springboot liteflow-testcase-el-script-multi-language-springboot diff --git a/pom.xml b/pom.xml index c0b2652e..3c877f57 100644 --- a/pom.xml +++ b/pom.xml @@ -1,4 +1,4 @@ - + 4.0.0 @@ -75,6 +75,8 @@ 5.3.3 2.11.0 1.3.5 + 3.21.0 + 5.8.18 3.1.9