diff --git a/liteflow-core/pom.xml b/liteflow-core/pom.xml index 7ea088f6..9911e129 100644 --- a/liteflow-core/pom.xml +++ b/liteflow-core/pom.xml @@ -9,7 +9,7 @@ com.thebeastshop liteflow - 2.1.1 + 2.1.3-SNAPSHOT diff --git a/liteflow-core/src/main/java/com/thebeastshop/liteflow/core/FlowExecutor.java b/liteflow-core/src/main/java/com/thebeastshop/liteflow/core/FlowExecutor.java index 85f83d51..f015dcf3 100644 --- a/liteflow-core/src/main/java/com/thebeastshop/liteflow/core/FlowExecutor.java +++ b/liteflow-core/src/main/java/com/thebeastshop/liteflow/core/FlowExecutor.java @@ -40,13 +40,13 @@ import com.thebeastshop.liteflow.parser.ZookeeperXmlFlowParser; import com.thebeastshop.liteflow.util.LOGOPrinter; public class FlowExecutor { - + private static final Logger LOG = LoggerFactory.getLogger(FlowExecutor.class); - + private List rulePath; - + private String zkNode; - + public void init() { XmlFlowParser parser = null; for(String path : rulePath){ @@ -71,88 +71,88 @@ public class FlowExecutor { } } } - + private boolean isZKConfig(String path) { Pattern p = Pattern.compile("[\\w\\d][\\w\\d\\.]+\\:(\\d)+(\\,[\\w\\d][\\w\\d\\.]+\\:(\\d)+)*"); Matcher m = p.matcher(path); return m.find(); } - + private boolean isLocalConfig(String path) { Pattern p = Pattern.compile("^[\\w\\/]+(\\/\\w+)*\\.xml$"); Matcher m = p.matcher(path); return m.find(); } - + private boolean isClassConfig(String path) { Pattern p = Pattern.compile("^\\w+(\\.\\w+)*$"); Matcher m = p.matcher(path); return m.find(); } - + public void reloadRule(){ init(); } - public T execute(String chainId,Object param){ + public T execute(String chainId,Object param) throws Exception{ return execute(chainId, param, DefaultSlot.class,null,false); } - - public T execute(String chainId,Object param,Class slotClazz){ + + public T execute(String chainId,Object param,Class slotClazz) throws Exception{ return execute(chainId, param, slotClazz,null,false); } - - public void invoke(String chainId,Object param,Class slotClazz,Integer slotIndex){ + + public void invoke(String chainId,Object param,Class slotClazz,Integer slotIndex) throws Exception{ execute(chainId, param, slotClazz,slotIndex,true); } - - public T execute(String chainId,Object param,Class slotClazz,Integer slotIndex,boolean isInnerChain){ + + public T execute(String chainId,Object param,Class slotClazz,Integer slotIndex,boolean isInnerChain) throws Exception{ Slot slot = null; try{ if(FlowBus.needInit()) { init(); } - + Chain chain = FlowBus.getChain(chainId); - + if(chain == null){ String errorMsg = MessageFormat.format("couldn't find chain with the id[{0}]", chainId); throw new ChainNotFoundException(errorMsg); } - + if(!isInnerChain && slotIndex == null) { slotIndex = DataBus.offerSlot(slotClazz); LOG.info("slot[{}] offered",slotIndex); } - + if(slotIndex == -1){ throw new NoAvailableSlotException("there is no available slot"); } - + slot = DataBus.getSlot(slotIndex); if(slot == null) { throw new NoAvailableSlotException("the slot is not exist"); } - + if(StringUtils.isBlank(slot.getRequestId())) { slot.generateRequestId(); LOG.info("requestId[{}] has generated",slot.getRequestId()); } - + if(!isInnerChain) { slot.setRequestData(param); slot.setChainName(chainId); }else { slot.setChainReqData(chainId, param); } - + List conditionList = chain.getConditionList(); - + List nodeList = null; NodeComponent component = null; for(Condition condition : conditionList){ nodeList = condition.getNodeList(); - + if(condition instanceof ThenCondition){ for(Node node : nodeList){ component = node.getInstance(); @@ -176,6 +176,9 @@ public class FlowExecutor { LOG.error(errorMsg,t); throw t; } + }finally { + component.removeSlotIndex(); + component.removeIsEnd(); } } }else if(condition instanceof WhenCondition){ @@ -190,7 +193,7 @@ public class FlowExecutor { }catch(Exception e){ String errorMsg = MessageFormat.format("[{0}]executor cause error", slot.getRequestId()); LOG.error(errorMsg,e); - throw new FlowSystemException(errorMsg); + throw e; }finally{ if(!isInnerChain) { slot.printStep(); @@ -198,24 +201,24 @@ public class FlowExecutor { } } } - + private class WhenConditionThread extends Thread{ - + private Node node; - + private Integer slotIndex; - + private String requestId; - + private CountDownLatch latch; - + public WhenConditionThread(Node node,Integer slotIndex,String requestId,CountDownLatch latch){ this.node = node; this.slotIndex = slotIndex; this.requestId = requestId; this.latch = latch; } - + @Override public void run() { try{ @@ -240,7 +243,7 @@ public class FlowExecutor { public void setRulePath(List rulePath) { this.rulePath = rulePath; } - + public String getZkNode() { return zkNode; } diff --git a/liteflow-core/src/main/java/com/thebeastshop/liteflow/core/NodeComponent.java b/liteflow-core/src/main/java/com/thebeastshop/liteflow/core/NodeComponent.java index a13dae2c..a2d127bf 100644 --- a/liteflow-core/src/main/java/com/thebeastshop/liteflow/core/NodeComponent.java +++ b/liteflow-core/src/main/java/com/thebeastshop/liteflow/core/NodeComponent.java @@ -31,6 +31,8 @@ public abstract class NodeComponent { private String nodeId; + private InheritableThreadLocal isEndTL = new InheritableThreadLocal<>(); + public void execute() throws Exception{ Slot slot = this.getSlot(); LOG.info("[{}]:[O]start component[{}] execution",slot.getRequestId(),this.getClass().getSimpleName()); @@ -88,7 +90,23 @@ public abstract class NodeComponent { * 是否结束整个流程(不往下继续执行) */ protected boolean isEnd() { - return false; + Boolean isEnd = isEndTL.get(); + if(isEnd == null){ + return false; + }else{ + return isEndTL.get(); + } + } + + /** + * 设置是否结束整个流程 + */ + protected void setIsEnd(boolean isEnd){ + this.isEndTL.set(isEnd); + } + + protected void removeIsEnd(){ + this.isEndTL.remove(); } public NodeComponent setSlotIndex(Integer slotIndex) { @@ -100,6 +118,10 @@ public abstract class NodeComponent { return this.slotIndexTL.get(); } + public void removeSlotIndex(){ + this.slotIndexTL.remove(); + } + public T getSlot(){ return DataBus.getSlot(this.slotIndexTL.get()); } diff --git a/liteflow-core/src/test/java/com/thebeastshop/liteflow/test/component/HComponent.java b/liteflow-core/src/test/java/com/thebeastshop/liteflow/test/component/HComponent.java index a58eb444..0b928b31 100644 --- a/liteflow-core/src/test/java/com/thebeastshop/liteflow/test/component/HComponent.java +++ b/liteflow-core/src/test/java/com/thebeastshop/liteflow/test/component/HComponent.java @@ -22,11 +22,16 @@ public class HComponent extends NodeComponent { @Resource private FlowExecutor flowExecutor; - + @Override public void process() { System.out.println("Hcomponent executed!"); - flowExecutor.invoke("strategy1",3, DefaultSlot.class, this.getSlotIndex()); + try{ + flowExecutor.invoke("strategy1",3, DefaultSlot.class, this.getSlotIndex()); + }catch (Exception e){ + e.printStackTrace(); + } + } - + } diff --git a/liteflow-core/src/test/java/com/thebeastshop/liteflow/test/component/M3Component.java b/liteflow-core/src/test/java/com/thebeastshop/liteflow/test/component/M3Component.java index 13926b7c..58ab3e06 100644 --- a/liteflow-core/src/test/java/com/thebeastshop/liteflow/test/component/M3Component.java +++ b/liteflow-core/src/test/java/com/thebeastshop/liteflow/test/component/M3Component.java @@ -22,11 +22,16 @@ public class M3Component extends NodeComponent { @Resource private FlowExecutor flowExecutor; - + @Override public void process() { System.out.println("m3 component executed!"); - flowExecutor.invoke("strategy2",10, DefaultSlot.class, this.getSlotIndex()); + try{ + flowExecutor.invoke("strategy2",10, DefaultSlot.class, this.getSlotIndex()); + }catch (Exception e){ + e.printStackTrace(); + } + } - + } diff --git a/liteflow-spring-boot-starter/pom.xml b/liteflow-spring-boot-starter/pom.xml index cb93e0a4..3ef2b984 100644 --- a/liteflow-spring-boot-starter/pom.xml +++ b/liteflow-spring-boot-starter/pom.xml @@ -5,7 +5,7 @@ liteflow com.thebeastshop - 2.1.1 + 2.1.3-SNAPSHOT 4.0.0 diff --git a/liteflow-test/pom.xml b/liteflow-test/pom.xml index 714c9d5a..4e65985b 100644 --- a/liteflow-test/pom.xml +++ b/liteflow-test/pom.xml @@ -26,7 +26,7 @@ com.thebeastshop liteflow-spring-boot-starter - 2.1.1 + 2.1.3-SNAPSHOT diff --git a/liteflow-test/src/main/java/com/thebeastshop/flowtest/components/HComponent.java b/liteflow-test/src/main/java/com/thebeastshop/flowtest/components/HComponent.java index d06d5b80..5db50e91 100644 --- a/liteflow-test/src/main/java/com/thebeastshop/flowtest/components/HComponent.java +++ b/liteflow-test/src/main/java/com/thebeastshop/flowtest/components/HComponent.java @@ -25,7 +25,10 @@ public class HComponent extends NodeComponent { @Override public void process() { System.out.println("Hcomponent executed!"); - flowExecutor.invoke("strategy1",3, DefaultSlot.class, this.getSlotIndex()); + try{ + flowExecutor.invoke("strategy1",3, DefaultSlot.class, this.getSlotIndex()); + }catch (Exception e){ + e.printStackTrace(); + } } - } diff --git a/liteflow-test/src/main/java/com/thebeastshop/flowtest/components/M3Component.java b/liteflow-test/src/main/java/com/thebeastshop/flowtest/components/M3Component.java index 7f2b902a..a742fe0f 100644 --- a/liteflow-test/src/main/java/com/thebeastshop/flowtest/components/M3Component.java +++ b/liteflow-test/src/main/java/com/thebeastshop/flowtest/components/M3Component.java @@ -25,7 +25,10 @@ public class M3Component extends NodeComponent { @Override public void process() { System.out.println("m3 component executed!"); - flowExecutor.invoke("strategy2",10, DefaultSlot.class, this.getSlotIndex()); + try{ + flowExecutor.invoke("strategy2",10, DefaultSlot.class, this.getSlotIndex()); + }catch (Exception e){ + e.printStackTrace(); + } } - } diff --git a/pom.xml b/pom.xml index 00fda161..c6087416 100644 --- a/pom.xml +++ b/pom.xml @@ -5,7 +5,7 @@ liteflow pom 4.0.0 - 2.1.1 + 2.1.3-SNAPSHOT UTF-8