feature #I61D1N 规则层面增加一个enable的选项,可以禁用规则
This commit is contained in:
parent
488bc126c4
commit
ff9c786a3b
|
@ -24,6 +24,7 @@ import org.slf4j.LoggerFactory;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
import java.util.function.Function;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
import static com.ctrip.framework.apollo.enums.PropertyChangeType.DELETED;
|
import static com.ctrip.framework.apollo.enums.PropertyChangeType.DELETED;
|
||||||
|
@ -91,7 +92,6 @@ public class ApolloParseHelper {
|
||||||
|
|
||||||
List<String> scriptItemContentList = scriptNamespaces.stream()
|
List<String> scriptItemContentList = scriptNamespaces.stream()
|
||||||
.map(item -> convert(item, scriptConfig.getProperty(item, StrUtil.EMPTY)))
|
.map(item -> convert(item, scriptConfig.getProperty(item, StrUtil.EMPTY)))
|
||||||
.filter(Objects::nonNull)
|
|
||||||
.map(RuleParsePluginUtil::toScriptXml)
|
.map(RuleParsePluginUtil::toScriptXml)
|
||||||
.collect(Collectors.toList());
|
.collect(Collectors.toList());
|
||||||
|
|
||||||
|
@ -115,7 +115,7 @@ public class ApolloParseHelper {
|
||||||
String newValue = configChange.getNewValue();
|
String newValue = configChange.getNewValue();
|
||||||
PropertyChangeType changeType = configChange.getChangeType();
|
PropertyChangeType changeType = configChange.getChangeType();
|
||||||
Pair<Boolean/*启停*/, String/*id*/> pair = RuleParsePluginUtil.parseIdKey(changeKey);
|
Pair<Boolean/*启停*/, String/*id*/> pair = RuleParsePluginUtil.parseIdKey(changeKey);
|
||||||
String id = pair.getValue();
|
String chainId = pair.getValue();
|
||||||
switch (changeType) {
|
switch (changeType) {
|
||||||
case ADDED:
|
case ADDED:
|
||||||
case MODIFIED:
|
case MODIFIED:
|
||||||
|
@ -123,16 +123,16 @@ public class ApolloParseHelper {
|
||||||
newValue);
|
newValue);
|
||||||
// 如果是启用,就正常更新
|
// 如果是启用,就正常更新
|
||||||
if (pair.getKey()) {
|
if (pair.getKey()) {
|
||||||
LiteFlowChainELBuilder.createChain().setChainId(id).setEL(newValue).build();
|
LiteFlowChainELBuilder.createChain().setChainId(chainId).setEL(newValue).build();
|
||||||
}
|
}
|
||||||
// 如果是禁用,就删除
|
// 如果是禁用,就删除
|
||||||
else {
|
else {
|
||||||
FlowBus.removeChain(id);
|
FlowBus.removeChain(chainId);
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
case DELETED:
|
case DELETED:
|
||||||
LOG.info("starting reload flow config... delete key={}", changeKey);
|
LOG.info("starting reload flow config... delete key={}", changeKey);
|
||||||
FlowBus.removeChain(id);
|
FlowBus.removeChain(chainId);
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
|
@ -148,11 +148,6 @@ public class ApolloParseHelper {
|
||||||
newValue = null;
|
newValue = null;
|
||||||
}
|
}
|
||||||
NodeConvertHelper.NodeSimpleVO nodeSimpleVO = convert(changeKey, newValue);
|
NodeConvertHelper.NodeSimpleVO nodeSimpleVO = convert(changeKey, newValue);
|
||||||
if (Objects.isNull(nodeSimpleVO)) {
|
|
||||||
// key不符合规范的时候,直接忽略
|
|
||||||
LOG.error("key={} is not a valid node config, ignore it", changeKey);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
switch (changeType) {
|
switch (changeType) {
|
||||||
case ADDED:
|
case ADDED:
|
||||||
case MODIFIED:
|
case MODIFIED:
|
||||||
|
@ -171,12 +166,12 @@ public class ApolloParseHelper {
|
||||||
}
|
}
|
||||||
// 禁用就删除
|
// 禁用就删除
|
||||||
else {
|
else {
|
||||||
FlowBus.getNodeMap().remove(nodeSimpleVO.getNodeId());
|
FlowBus.unloadScriptNode(nodeSimpleVO.getNodeId());
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
case DELETED:
|
case DELETED:
|
||||||
LOG.info("starting reload flow config... delete key={}", changeKey);
|
LOG.info("starting reload flow config... delete key={}", changeKey);
|
||||||
FlowBus.getNodeMap().remove(nodeSimpleVO.getNodeId());
|
FlowBus.unloadScriptNode(nodeSimpleVO.getNodeId());
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
|
|
|
@ -141,20 +141,19 @@ public class EtcdParserHelper {
|
||||||
LOG.info("starting reload flow config... update path={} value={},", updatePath, updateValue);
|
LOG.info("starting reload flow config... update path={} value={},", updatePath, updateValue);
|
||||||
String changeKey = FileNameUtil.getName(updatePath);
|
String changeKey = FileNameUtil.getName(updatePath);
|
||||||
Pair<Boolean/*启停*/, String/*id*/> pair = RuleParsePluginUtil.parseIdKey(changeKey);
|
Pair<Boolean/*启停*/, String/*id*/> pair = RuleParsePluginUtil.parseIdKey(changeKey);
|
||||||
Boolean enable = pair.getKey();
|
String chainId = pair.getValue();
|
||||||
String id = pair.getValue();
|
|
||||||
// 如果是启用,就正常更新
|
// 如果是启用,就正常更新
|
||||||
if (pair.getKey()) {
|
if (pair.getKey()) {
|
||||||
LiteFlowChainELBuilder.createChain().setChainId(id).setEL(updateValue).build();
|
LiteFlowChainELBuilder.createChain().setChainId(chainId).setEL(updateValue).build();
|
||||||
}
|
}
|
||||||
// 如果是禁用,就删除
|
// 如果是禁用,就删除
|
||||||
else {
|
else {
|
||||||
FlowBus.removeChain(id);
|
FlowBus.removeChain(chainId);
|
||||||
}
|
}
|
||||||
}, (deletePath) -> {
|
}, (deletePath) -> {
|
||||||
LOG.info("starting reload flow config... delete path={}", deletePath);
|
LOG.info("starting reload flow config... delete path={}", deletePath);
|
||||||
String chainName = FileNameUtil.getName(deletePath);
|
String chainKey = FileNameUtil.getName(deletePath);
|
||||||
Pair<Boolean/*启停*/, String/*id*/> pair = RuleParsePluginUtil.parseIdKey(chainName);
|
Pair<Boolean/*启停*/, String/*id*/> pair = RuleParsePluginUtil.parseIdKey(chainKey);
|
||||||
FlowBus.removeChain(pair.getValue());
|
FlowBus.removeChain(pair.getValue());
|
||||||
});
|
});
|
||||||
|
|
||||||
|
@ -175,13 +174,13 @@ public class EtcdParserHelper {
|
||||||
}
|
}
|
||||||
// 禁用就删除
|
// 禁用就删除
|
||||||
else {
|
else {
|
||||||
FlowBus.getNodeMap().remove(nodeSimpleVO.getNodeId());
|
FlowBus.unloadScriptNode(nodeSimpleVO.getNodeId());
|
||||||
}
|
}
|
||||||
}, (deletePath) -> {
|
}, (deletePath) -> {
|
||||||
LOG.info("starting reload flow config... delete path={}", deletePath);
|
LOG.info("starting reload flow config... delete path={}", deletePath);
|
||||||
String scriptNodeValue = FileNameUtil.getName(deletePath);
|
String scriptNodeValue = FileNameUtil.getName(deletePath);
|
||||||
NodeConvertHelper.NodeSimpleVO nodeSimpleVO = NodeConvertHelper.convert(scriptNodeValue);
|
NodeConvertHelper.NodeSimpleVO nodeSimpleVO = NodeConvertHelper.convert(scriptNodeValue);
|
||||||
FlowBus.getNodeMap().remove(nodeSimpleVO.getNodeId());
|
FlowBus.unloadScriptNode(nodeSimpleVO.getNodeId());
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,13 +1,18 @@
|
||||||
package com.yomahub.liteflow.parser.redis.mode;
|
package com.yomahub.liteflow.parser.redis.mode;
|
||||||
|
|
||||||
|
import cn.hutool.core.lang.Pair;
|
||||||
import cn.hutool.core.text.StrFormatter;
|
import cn.hutool.core.text.StrFormatter;
|
||||||
|
import cn.hutool.core.util.BooleanUtil;
|
||||||
import cn.hutool.core.util.StrUtil;
|
import cn.hutool.core.util.StrUtil;
|
||||||
import com.yomahub.liteflow.builder.LiteFlowNodeBuilder;
|
import com.yomahub.liteflow.builder.LiteFlowNodeBuilder;
|
||||||
|
import com.yomahub.liteflow.builder.el.LiteFlowChainELBuilder;
|
||||||
import com.yomahub.liteflow.enums.NodeTypeEnum;
|
import com.yomahub.liteflow.enums.NodeTypeEnum;
|
||||||
|
import com.yomahub.liteflow.flow.FlowBus;
|
||||||
import com.yomahub.liteflow.log.LFLog;
|
import com.yomahub.liteflow.log.LFLog;
|
||||||
import com.yomahub.liteflow.log.LFLoggerManager;
|
import com.yomahub.liteflow.log.LFLoggerManager;
|
||||||
import com.yomahub.liteflow.parser.helper.NodeConvertHelper;
|
import com.yomahub.liteflow.parser.helper.NodeConvertHelper;
|
||||||
import com.yomahub.liteflow.parser.redis.vo.RedisParserVO;
|
import com.yomahub.liteflow.parser.redis.vo.RedisParserVO;
|
||||||
|
import com.yomahub.liteflow.util.RuleParsePluginUtil;
|
||||||
import org.redisson.config.Config;
|
import org.redisson.config.Config;
|
||||||
import org.redisson.config.SentinelServersConfig;
|
import org.redisson.config.SentinelServersConfig;
|
||||||
|
|
||||||
|
@ -15,6 +20,7 @@ import org.redisson.config.SentinelServersConfig;
|
||||||
* Redis 解析器通用接口
|
* Redis 解析器通用接口
|
||||||
*
|
*
|
||||||
* @author hxinyu
|
* @author hxinyu
|
||||||
|
* @author Bryan.Zhang
|
||||||
* @since 2.11.0
|
* @since 2.11.0
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
@ -105,29 +111,48 @@ public interface RedisParserHelper {
|
||||||
/**
|
/**
|
||||||
* script节点的修改/添加
|
* script节点的修改/添加
|
||||||
*
|
*
|
||||||
* @param scriptFieldValue 新的script名
|
* @param scriptKeyValue 新的script名
|
||||||
* @param newValue 新的script值
|
* @param newValue 新的script值
|
||||||
*/
|
*/
|
||||||
static void changeScriptNode(String scriptFieldValue, String newValue) {
|
static boolean changeScriptNode(String scriptKeyValue, String newValue) {
|
||||||
NodeConvertHelper.NodeSimpleVO nodeSimpleVO = NodeConvertHelper.convert(scriptFieldValue);
|
NodeConvertHelper.NodeSimpleVO nodeSimpleVO = NodeConvertHelper.convert(scriptKeyValue);
|
||||||
// 有语言类型
|
|
||||||
if (StrUtil.isNotBlank(nodeSimpleVO.getLanguage())) {
|
if (BooleanUtil.isTrue(nodeSimpleVO.getEnable())){
|
||||||
LiteFlowNodeBuilder.createScriptNode()
|
// 有语言类型
|
||||||
.setId(nodeSimpleVO.getNodeId())
|
if (StrUtil.isNotBlank(nodeSimpleVO.getLanguage())) {
|
||||||
.setType(NodeTypeEnum.getEnumByCode(nodeSimpleVO.getType()))
|
LiteFlowNodeBuilder.createScriptNode()
|
||||||
.setName(nodeSimpleVO.getName())
|
.setId(nodeSimpleVO.getNodeId())
|
||||||
.setScript(newValue)
|
.setType(NodeTypeEnum.getEnumByCode(nodeSimpleVO.getType()))
|
||||||
.setLanguage(nodeSimpleVO.getLanguage())
|
.setName(nodeSimpleVO.getName())
|
||||||
.build();
|
.setScript(newValue)
|
||||||
|
.setLanguage(nodeSimpleVO.getLanguage())
|
||||||
|
.build();
|
||||||
|
}
|
||||||
|
// 没有语言类型
|
||||||
|
else {
|
||||||
|
LiteFlowNodeBuilder.createScriptNode()
|
||||||
|
.setId(nodeSimpleVO.getNodeId())
|
||||||
|
.setType(NodeTypeEnum.getEnumByCode(nodeSimpleVO.getType()))
|
||||||
|
.setName(nodeSimpleVO.getName())
|
||||||
|
.setScript(newValue)
|
||||||
|
.build();
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}else{
|
||||||
|
FlowBus.unloadScriptNode(nodeSimpleVO.getNodeId());
|
||||||
|
return false;
|
||||||
}
|
}
|
||||||
// 没有语言类型
|
}
|
||||||
|
|
||||||
|
static void changeChain(String chainId, String value) {
|
||||||
|
Pair<Boolean/*启停*/, String/*id*/> pair = RuleParsePluginUtil.parseIdKey(chainId);
|
||||||
|
// 如果是启用,就正常更新
|
||||||
|
if (BooleanUtil.isTrue(pair.getKey())) {
|
||||||
|
LiteFlowChainELBuilder.createChain().setChainId(chainId).setEL(value).build();
|
||||||
|
}
|
||||||
|
// 如果是禁用,就删除
|
||||||
else {
|
else {
|
||||||
LiteFlowNodeBuilder.createScriptNode()
|
FlowBus.removeChain(chainId);
|
||||||
.setId(nodeSimpleVO.getNodeId())
|
|
||||||
.setType(NodeTypeEnum.getEnumByCode(nodeSimpleVO.getType()))
|
|
||||||
.setName(nodeSimpleVO.getName())
|
|
||||||
.setScript(newValue)
|
|
||||||
.build();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,5 +1,6 @@
|
||||||
package com.yomahub.liteflow.parser.redis.mode.polling;
|
package com.yomahub.liteflow.parser.redis.mode.polling;
|
||||||
|
|
||||||
|
import cn.hutool.core.util.BooleanUtil;
|
||||||
import cn.hutool.core.util.StrUtil;
|
import cn.hutool.core.util.StrUtil;
|
||||||
import cn.hutool.crypto.digest.DigestUtil;
|
import cn.hutool.crypto.digest.DigestUtil;
|
||||||
import com.yomahub.liteflow.flow.FlowBus;
|
import com.yomahub.liteflow.flow.FlowBus;
|
||||||
|
@ -70,7 +71,7 @@ public class ScriptPollingTask implements Runnable {
|
||||||
if (StrUtil.equals(newSHA, "nil")) {
|
if (StrUtil.equals(newSHA, "nil")) {
|
||||||
//新SHA值为nil, 即未获取到该script,表示该script已被删除
|
//新SHA值为nil, 即未获取到该script,表示该script已被删除
|
||||||
NodeConvertHelper.NodeSimpleVO nodeSimpleVO = NodeConvertHelper.convert(scriptFieldValue);
|
NodeConvertHelper.NodeSimpleVO nodeSimpleVO = NodeConvertHelper.convert(scriptFieldValue);
|
||||||
FlowBus.getNodeMap().remove(nodeSimpleVO.getNodeId());
|
FlowBus.unloadScriptNode(nodeSimpleVO.getNodeId());
|
||||||
LOG.info("starting reload flow config... delete key={}", scriptFieldValue);
|
LOG.info("starting reload flow config... delete key={}", scriptFieldValue);
|
||||||
|
|
||||||
//添加到待删除的list 后续统一从SHAMap中移除
|
//添加到待删除的list 后续统一从SHAMap中移除
|
||||||
|
@ -80,17 +81,11 @@ public class ScriptPollingTask implements Runnable {
|
||||||
//SHA值发生变化,表示该script的值已被修改,重新拉取变化的script
|
//SHA值发生变化,表示该script的值已被修改,重新拉取变化的script
|
||||||
String scriptData = scriptClient.hget(scriptKey, scriptFieldValue);
|
String scriptData = scriptClient.hget(scriptKey, scriptFieldValue);
|
||||||
|
|
||||||
NodeConvertHelper.NodeSimpleVO nodeSimpleVO = NodeConvertHelper.convert(scriptFieldValue);
|
boolean changeSuccess = RedisParserHelper.changeScriptNode(scriptFieldValue, scriptData);
|
||||||
nodeSimpleVO.setScript(scriptData);
|
|
||||||
if (nodeSimpleVO.getEnable()) {
|
|
||||||
RedisParserHelper.changeScriptNode(scriptFieldValue, scriptData);
|
|
||||||
LOG.info("starting reload flow config... update key={} new value={},", scriptFieldValue, scriptData);
|
|
||||||
|
|
||||||
//修改SHAMap
|
if (BooleanUtil.isTrue(changeSuccess)){
|
||||||
scriptSHAMap.put(scriptFieldValue, newSHA);
|
scriptSHAMap.put(scriptFieldValue, newSHA);
|
||||||
} else {
|
}else{
|
||||||
FlowBus.getNodeMap().remove(nodeSimpleVO.getNodeId());
|
|
||||||
LOG.info("starting reload flow config... delete key={}", scriptFieldValue);
|
|
||||||
needDelete.add(scriptFieldValue);
|
needDelete.add(scriptFieldValue);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -114,13 +109,13 @@ public class ScriptPollingTask implements Runnable {
|
||||||
if (!scriptSHAMap.containsKey(scriptFieldValue)) {
|
if (!scriptSHAMap.containsKey(scriptFieldValue)) {
|
||||||
//将新script添加到LiteFlowChainELBuilder和SHAMap
|
//将新script添加到LiteFlowChainELBuilder和SHAMap
|
||||||
String scriptData = scriptClient.hget(scriptKey, scriptFieldValue);
|
String scriptData = scriptClient.hget(scriptKey, scriptFieldValue);
|
||||||
NodeConvertHelper.NodeSimpleVO nodeSimpleVO = NodeConvertHelper.convert(scriptFieldValue);
|
|
||||||
if (nodeSimpleVO.getEnable()) {
|
boolean isAddSuccess = RedisParserHelper.changeScriptNode(scriptFieldValue, scriptData);
|
||||||
RedisParserHelper.changeScriptNode(scriptFieldValue, scriptData);
|
|
||||||
|
if (BooleanUtil.isTrue(isAddSuccess)){
|
||||||
LOG.info("starting reload flow config... create key={} new value={},", scriptFieldValue, scriptData);
|
LOG.info("starting reload flow config... create key={} new value={},", scriptFieldValue, scriptData);
|
||||||
scriptSHAMap.put(scriptFieldValue, DigestUtil.sha1Hex(scriptData));
|
scriptSHAMap.put(scriptFieldValue, DigestUtil.sha1Hex(scriptData));
|
||||||
} else {
|
}else{
|
||||||
FlowBus.getNodeMap().remove(nodeSimpleVO.getNodeId());
|
|
||||||
LOG.info("starting reload flow config... delete key={}", scriptFieldValue);
|
LOG.info("starting reload flow config... delete key={}", scriptFieldValue);
|
||||||
needDelete.add(scriptFieldValue);
|
needDelete.add(scriptFieldValue);
|
||||||
}
|
}
|
||||||
|
|
|
@ -2,6 +2,7 @@ package com.yomahub.liteflow.parser.redis.mode.subscribe;
|
||||||
|
|
||||||
import cn.hutool.core.collection.CollUtil;
|
import cn.hutool.core.collection.CollUtil;
|
||||||
import cn.hutool.core.lang.Pair;
|
import cn.hutool.core.lang.Pair;
|
||||||
|
import cn.hutool.core.util.BooleanUtil;
|
||||||
import cn.hutool.core.util.ObjectUtil;
|
import cn.hutool.core.util.ObjectUtil;
|
||||||
import cn.hutool.core.util.StrUtil;
|
import cn.hutool.core.util.StrUtil;
|
||||||
import com.yomahub.liteflow.builder.LiteFlowNodeBuilder;
|
import com.yomahub.liteflow.builder.LiteFlowNodeBuilder;
|
||||||
|
@ -19,6 +20,7 @@ import com.yomahub.liteflow.util.RuleParsePluginUtil;
|
||||||
import org.redisson.Redisson;
|
import org.redisson.Redisson;
|
||||||
import org.redisson.api.map.event.EntryCreatedListener;
|
import org.redisson.api.map.event.EntryCreatedListener;
|
||||||
import org.redisson.api.map.event.EntryRemovedListener;
|
import org.redisson.api.map.event.EntryRemovedListener;
|
||||||
|
import org.redisson.api.map.event.EntryUpdatedListener;
|
||||||
import org.redisson.config.Config;
|
import org.redisson.config.Config;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
@ -149,25 +151,23 @@ public class RedisParserSubscribeMode implements RedisParserHelper {
|
||||||
public void listenRedis() {
|
public void listenRedis() {
|
||||||
//监听 chain
|
//监听 chain
|
||||||
String chainKey = redisParserVO.getChainKey();
|
String chainKey = redisParserVO.getChainKey();
|
||||||
EntryCreatedListener<String, String> chainModifyFunc = event -> {
|
|
||||||
LOG.info("starting modify flow config... create key={} value={},", event.getKey(), event.getValue());
|
|
||||||
String chainName = event.getKey();
|
|
||||||
String value = event.getValue();
|
|
||||||
Pair<Boolean/*启停*/, String/*id*/> pair = RuleParsePluginUtil.parseIdKey(chainName);
|
|
||||||
String id = pair.getValue();
|
|
||||||
// 如果是启用,就正常更新
|
|
||||||
if (pair.getKey()) {
|
|
||||||
LiteFlowChainELBuilder.createChain().setChainId(id).setEL(value).build();
|
|
||||||
}
|
|
||||||
// 如果是禁用,就删除
|
|
||||||
else {
|
|
||||||
FlowBus.removeChain(id);
|
|
||||||
}
|
|
||||||
};
|
|
||||||
//添加新 chain
|
//添加新 chain
|
||||||
chainClient.addListener(chainKey, chainModifyFunc);
|
chainClient.addListener(chainKey, (EntryCreatedListener<String, String>) event -> {
|
||||||
|
LOG.info("starting modify flow config... create key={} value={},", event.getKey(), event.getValue());
|
||||||
|
String chainId = event.getKey();
|
||||||
|
String value = event.getValue();
|
||||||
|
RedisParserHelper.changeChain(chainId, value);
|
||||||
|
});
|
||||||
|
|
||||||
//修改 chain
|
//修改 chain
|
||||||
chainClient.addListener(chainKey, chainModifyFunc);
|
chainClient.addListener(chainKey, (EntryUpdatedListener<String, String>) event -> {
|
||||||
|
LOG.info("starting modify flow config... create key={} value={},", event.getKey(), event.getValue());
|
||||||
|
String chainId = event.getKey();
|
||||||
|
String value = event.getValue();
|
||||||
|
RedisParserHelper.changeChain(chainId, value);
|
||||||
|
});
|
||||||
|
|
||||||
//删除 chain
|
//删除 chain
|
||||||
chainClient.addListener(chainKey, (EntryRemovedListener<String, String>) event -> {
|
chainClient.addListener(chainKey, (EntryRemovedListener<String, String>) event -> {
|
||||||
LOG.info("starting reload flow config... delete key={}", event.getKey());
|
LOG.info("starting reload flow config... delete key={}", event.getKey());
|
||||||
|
@ -176,36 +176,24 @@ public class RedisParserSubscribeMode implements RedisParserHelper {
|
||||||
});
|
});
|
||||||
|
|
||||||
//监听 script
|
//监听 script
|
||||||
EntryCreatedListener<String, String> scriptModifyFunc = event -> {
|
|
||||||
LOG.info("starting modify flow config... create key={} value={},", event.getKey(), event.getValue());
|
|
||||||
NodeConvertHelper.NodeSimpleVO nodeSimpleVO = NodeConvertHelper.convert(event.getKey());
|
|
||||||
nodeSimpleVO.setScript(event.getValue());
|
|
||||||
// 启用就正常更新
|
|
||||||
if (nodeSimpleVO.getEnable()) {
|
|
||||||
LiteFlowNodeBuilder.createScriptNode()
|
|
||||||
.setId(nodeSimpleVO.getNodeId())
|
|
||||||
.setType(NodeTypeEnum.getEnumByCode(nodeSimpleVO.getType()))
|
|
||||||
.setName(nodeSimpleVO.getName())
|
|
||||||
.setScript(nodeSimpleVO.getScript())
|
|
||||||
.setLanguage(nodeSimpleVO.getLanguage())
|
|
||||||
.build();
|
|
||||||
}
|
|
||||||
// 禁用就删除
|
|
||||||
else {
|
|
||||||
FlowBus.getNodeMap().remove(nodeSimpleVO.getNodeId());
|
|
||||||
}
|
|
||||||
};
|
|
||||||
if (ObjectUtil.isNotNull(scriptClient) && ObjectUtil.isNotNull(redisParserVO.getScriptDataBase())) {
|
if (ObjectUtil.isNotNull(scriptClient) && ObjectUtil.isNotNull(redisParserVO.getScriptDataBase())) {
|
||||||
String scriptKey = redisParserVO.getScriptKey();
|
String scriptKey = redisParserVO.getScriptKey();
|
||||||
|
|
||||||
//添加 script
|
//添加 script
|
||||||
scriptClient.addListener(scriptKey, scriptModifyFunc);
|
scriptClient.addListener(scriptKey, (EntryCreatedListener<String, String>) event -> {
|
||||||
|
LOG.info("starting reload flow config... create key={} value={},", event.getKey(), event.getValue());
|
||||||
|
RedisParserHelper.changeScriptNode(event.getKey(), event.getValue());
|
||||||
|
});
|
||||||
//修改 script
|
//修改 script
|
||||||
scriptClient.addListener(scriptKey, scriptModifyFunc);
|
scriptClient.addListener(scriptKey, (EntryUpdatedListener<String, String>) event -> {
|
||||||
|
LOG.info("starting reload flow config... update key={} new value={},", event.getKey(), event.getValue());
|
||||||
|
RedisParserHelper.changeScriptNode(event.getKey(), event.getValue());
|
||||||
|
});
|
||||||
//删除 script
|
//删除 script
|
||||||
scriptClient.addListener(scriptKey, (EntryRemovedListener<String, String>) event -> {
|
scriptClient.addListener(scriptKey, (EntryRemovedListener<String, String>) event -> {
|
||||||
LOG.info("starting reload flow config... delete key={}", event.getKey());
|
LOG.info("starting reload flow config... delete key={}", event.getKey());
|
||||||
NodeConvertHelper.NodeSimpleVO nodeSimpleVO = NodeConvertHelper.convert(event.getKey());
|
NodeConvertHelper.NodeSimpleVO nodeSimpleVO = NodeConvertHelper.convert(event.getKey());
|
||||||
FlowBus.getNodeMap().remove(nodeSimpleVO.getNodeId());
|
FlowBus.unloadScriptNode(nodeSimpleVO.getNodeId());
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -38,7 +38,7 @@ public class ScriptReadPollTask extends AbstractSqlReadPollTask {
|
||||||
NodeConvertHelper.NodeSimpleVO scriptVO = NodeConvertHelper.convert(id);
|
NodeConvertHelper.NodeSimpleVO scriptVO = NodeConvertHelper.convert(id);
|
||||||
|
|
||||||
// 删除script
|
// 删除script
|
||||||
FlowBus.getNodeMap().remove(scriptVO.getNodeId());
|
FlowBus.unloadScriptNode(scriptVO.getNodeId());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -15,6 +15,7 @@ import com.yomahub.liteflow.parser.zk.vo.ZkParserVO;
|
||||||
import com.yomahub.liteflow.util.RuleParsePluginUtil;
|
import com.yomahub.liteflow.util.RuleParsePluginUtil;
|
||||||
import org.apache.curator.framework.CuratorFramework;
|
import org.apache.curator.framework.CuratorFramework;
|
||||||
import org.apache.curator.framework.CuratorFrameworkFactory;
|
import org.apache.curator.framework.CuratorFrameworkFactory;
|
||||||
|
import org.apache.curator.framework.recipes.cache.ChildData;
|
||||||
import org.apache.curator.framework.recipes.cache.CuratorCache;
|
import org.apache.curator.framework.recipes.cache.CuratorCache;
|
||||||
import org.apache.curator.framework.recipes.cache.CuratorCacheListener;
|
import org.apache.curator.framework.recipes.cache.CuratorCacheListener;
|
||||||
import org.apache.curator.retry.RetryNTimes;
|
import org.apache.curator.retry.RetryNTimes;
|
||||||
|
@ -129,8 +130,9 @@ public class ZkParserHelper {
|
||||||
CuratorCache cache1 = CuratorCache.build(client, zkParserVO.getChainPath());
|
CuratorCache cache1 = CuratorCache.build(client, zkParserVO.getChainPath());
|
||||||
cache1.start();
|
cache1.start();
|
||||||
cache1.listenable().addListener((type, oldData, data) -> {
|
cache1.listenable().addListener((type, oldData, data) -> {
|
||||||
String path = data.getPath();
|
ChildData currChildData = data == null? oldData : data;
|
||||||
String value = new String(data.getData());
|
String path = currChildData.getPath();
|
||||||
|
String value = new String(currChildData.getData());
|
||||||
if (StrUtil.isBlank(value)) {
|
if (StrUtil.isBlank(value)) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -161,8 +163,9 @@ public class ZkParserHelper {
|
||||||
CuratorCache cache2 = CuratorCache.build(client, zkParserVO.getScriptPath());
|
CuratorCache cache2 = CuratorCache.build(client, zkParserVO.getScriptPath());
|
||||||
cache2.start();
|
cache2.start();
|
||||||
cache2.listenable().addListener((type, oldData, data) -> {
|
cache2.listenable().addListener((type, oldData, data) -> {
|
||||||
String path = data.getPath();
|
ChildData currChildData = data == null? oldData : data;
|
||||||
String value = new String(data.getData());
|
String path = currChildData.getPath();
|
||||||
|
String value = new String(currChildData.getData());
|
||||||
if (StrUtil.isBlank(value)) {
|
if (StrUtil.isBlank(value)) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -184,13 +187,13 @@ public class ZkParserHelper {
|
||||||
}
|
}
|
||||||
// 禁用就删除
|
// 禁用就删除
|
||||||
else {
|
else {
|
||||||
FlowBus.getNodeMap().remove(nodeSimpleVO.getNodeId());
|
FlowBus.unloadScriptNode(nodeSimpleVO.getNodeId());
|
||||||
}
|
}
|
||||||
} else if (CuratorCacheListener.Type.NODE_DELETED.equals(type)) {
|
} else if (CuratorCacheListener.Type.NODE_DELETED.equals(type)) {
|
||||||
LOG.info("starting reload flow config... delete path={}", path);
|
LOG.info("starting reload flow config... delete path={}", path);
|
||||||
String scriptNodeValue = FileNameUtil.getName(path);
|
String scriptNodeValue = FileNameUtil.getName(path);
|
||||||
NodeConvertHelper.NodeSimpleVO nodeSimpleVO = NodeConvertHelper.convert(scriptNodeValue);
|
NodeConvertHelper.NodeSimpleVO nodeSimpleVO = NodeConvertHelper.convert(scriptNodeValue);
|
||||||
FlowBus.getNodeMap().remove(nodeSimpleVO.getNodeId());
|
FlowBus.unloadScriptNode(nodeSimpleVO.getNodeId());
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue