增加结束整个流程的选项

修复一些问题
This commit is contained in:
bryan.zhang 2020-01-29 23:41:18 +08:00
parent 325106d180
commit b0c81e87b2
10 changed files with 90 additions and 49 deletions

View File

@ -9,7 +9,7 @@
<parent>
<groupId>com.thebeastshop</groupId>
<artifactId>liteflow</artifactId>
<version>2.1.1</version>
<version>2.1.3-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -40,13 +40,13 @@ import com.thebeastshop.liteflow.parser.ZookeeperXmlFlowParser;
import com.thebeastshop.liteflow.util.LOGOPrinter;
public class FlowExecutor {
private static final Logger LOG = LoggerFactory.getLogger(FlowExecutor.class);
private List<String> rulePath;
private String zkNode;
public void init() {
XmlFlowParser parser = null;
for(String path : rulePath){
@ -71,88 +71,88 @@ public class FlowExecutor {
}
}
}
private boolean isZKConfig(String path) {
Pattern p = Pattern.compile("[\\w\\d][\\w\\d\\.]+\\:(\\d)+(\\,[\\w\\d][\\w\\d\\.]+\\:(\\d)+)*");
Matcher m = p.matcher(path);
return m.find();
}
private boolean isLocalConfig(String path) {
Pattern p = Pattern.compile("^[\\w\\/]+(\\/\\w+)*\\.xml$");
Matcher m = p.matcher(path);
return m.find();
}
private boolean isClassConfig(String path) {
Pattern p = Pattern.compile("^\\w+(\\.\\w+)*$");
Matcher m = p.matcher(path);
return m.find();
}
public void reloadRule(){
init();
}
public <T extends Slot> T execute(String chainId,Object param){
public <T extends Slot> T execute(String chainId,Object param) throws Exception{
return execute(chainId, param, DefaultSlot.class,null,false);
}
public <T extends Slot> T execute(String chainId,Object param,Class<? extends Slot> slotClazz){
public <T extends Slot> T execute(String chainId,Object param,Class<? extends Slot> slotClazz) throws Exception{
return execute(chainId, param, slotClazz,null,false);
}
public void invoke(String chainId,Object param,Class<? extends Slot> slotClazz,Integer slotIndex){
public void invoke(String chainId,Object param,Class<? extends Slot> slotClazz,Integer slotIndex) throws Exception{
execute(chainId, param, slotClazz,slotIndex,true);
}
public <T extends Slot> T execute(String chainId,Object param,Class<? extends Slot> slotClazz,Integer slotIndex,boolean isInnerChain){
public <T extends Slot> T execute(String chainId,Object param,Class<? extends Slot> slotClazz,Integer slotIndex,boolean isInnerChain) throws Exception{
Slot slot = null;
try{
if(FlowBus.needInit()) {
init();
}
Chain chain = FlowBus.getChain(chainId);
if(chain == null){
String errorMsg = MessageFormat.format("couldn't find chain with the id[{0}]", chainId);
throw new ChainNotFoundException(errorMsg);
}
if(!isInnerChain && slotIndex == null) {
slotIndex = DataBus.offerSlot(slotClazz);
LOG.info("slot[{}] offered",slotIndex);
}
if(slotIndex == -1){
throw new NoAvailableSlotException("there is no available slot");
}
slot = DataBus.getSlot(slotIndex);
if(slot == null) {
throw new NoAvailableSlotException("the slot is not exist");
}
if(StringUtils.isBlank(slot.getRequestId())) {
slot.generateRequestId();
LOG.info("requestId[{}] has generated",slot.getRequestId());
}
if(!isInnerChain) {
slot.setRequestData(param);
slot.setChainName(chainId);
}else {
slot.setChainReqData(chainId, param);
}
List<Condition> conditionList = chain.getConditionList();
List<Node> nodeList = null;
NodeComponent component = null;
for(Condition condition : conditionList){
nodeList = condition.getNodeList();
if(condition instanceof ThenCondition){
for(Node node : nodeList){
component = node.getInstance();
@ -176,6 +176,9 @@ public class FlowExecutor {
LOG.error(errorMsg,t);
throw t;
}
}finally {
component.removeSlotIndex();
component.removeIsEnd();
}
}
}else if(condition instanceof WhenCondition){
@ -190,7 +193,7 @@ public class FlowExecutor {
}catch(Exception e){
String errorMsg = MessageFormat.format("[{0}]executor cause error", slot.getRequestId());
LOG.error(errorMsg,e);
throw new FlowSystemException(errorMsg);
throw e;
}finally{
if(!isInnerChain) {
slot.printStep();
@ -198,24 +201,24 @@ public class FlowExecutor {
}
}
}
private class WhenConditionThread extends Thread{
private Node node;
private Integer slotIndex;
private String requestId;
private CountDownLatch latch;
public WhenConditionThread(Node node,Integer slotIndex,String requestId,CountDownLatch latch){
this.node = node;
this.slotIndex = slotIndex;
this.requestId = requestId;
this.latch = latch;
}
@Override
public void run() {
try{
@ -240,7 +243,7 @@ public class FlowExecutor {
public void setRulePath(List<String> rulePath) {
this.rulePath = rulePath;
}
public String getZkNode() {
return zkNode;
}

View File

@ -31,6 +31,8 @@ public abstract class NodeComponent {
private String nodeId;
private InheritableThreadLocal<Boolean> isEndTL = new InheritableThreadLocal<>();
public void execute() throws Exception{
Slot slot = this.getSlot();
LOG.info("[{}]:[O]start component[{}] execution",slot.getRequestId(),this.getClass().getSimpleName());
@ -88,7 +90,23 @@ public abstract class NodeComponent {
* 是否结束整个流程(不往下继续执行)
*/
protected boolean isEnd() {
return false;
Boolean isEnd = isEndTL.get();
if(isEnd == null){
return false;
}else{
return isEndTL.get();
}
}
/**
* 设置是否结束整个流程
*/
protected void setIsEnd(boolean isEnd){
this.isEndTL.set(isEnd);
}
protected void removeIsEnd(){
this.isEndTL.remove();
}
public NodeComponent setSlotIndex(Integer slotIndex) {
@ -100,6 +118,10 @@ public abstract class NodeComponent {
return this.slotIndexTL.get();
}
public void removeSlotIndex(){
this.slotIndexTL.remove();
}
public <T extends Slot> T getSlot(){
return DataBus.getSlot(this.slotIndexTL.get());
}

View File

@ -22,11 +22,16 @@ public class HComponent extends NodeComponent {
@Resource
private FlowExecutor flowExecutor;
@Override
public void process() {
System.out.println("Hcomponent executed!");
flowExecutor.invoke("strategy1",3, DefaultSlot.class, this.getSlotIndex());
try{
flowExecutor.invoke("strategy1",3, DefaultSlot.class, this.getSlotIndex());
}catch (Exception e){
e.printStackTrace();
}
}
}

View File

@ -22,11 +22,16 @@ public class M3Component extends NodeComponent {
@Resource
private FlowExecutor flowExecutor;
@Override
public void process() {
System.out.println("m3 component executed!");
flowExecutor.invoke("strategy2",10, DefaultSlot.class, this.getSlotIndex());
try{
flowExecutor.invoke("strategy2",10, DefaultSlot.class, this.getSlotIndex());
}catch (Exception e){
e.printStackTrace();
}
}
}

View File

@ -5,7 +5,7 @@
<parent>
<artifactId>liteflow</artifactId>
<groupId>com.thebeastshop</groupId>
<version>2.1.1</version>
<version>2.1.3-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

View File

@ -26,7 +26,7 @@
<dependency>
<groupId>com.thebeastshop</groupId>
<artifactId>liteflow-spring-boot-starter</artifactId>
<version>2.1.1</version>
<version>2.1.3-SNAPSHOT</version>
</dependency>
<dependency>

View File

@ -25,7 +25,10 @@ public class HComponent extends NodeComponent {
@Override
public void process() {
System.out.println("Hcomponent executed!");
flowExecutor.invoke("strategy1",3, DefaultSlot.class, this.getSlotIndex());
try{
flowExecutor.invoke("strategy1",3, DefaultSlot.class, this.getSlotIndex());
}catch (Exception e){
e.printStackTrace();
}
}
}

View File

@ -25,7 +25,10 @@ public class M3Component extends NodeComponent {
@Override
public void process() {
System.out.println("m3 component executed!");
flowExecutor.invoke("strategy2",10, DefaultSlot.class, this.getSlotIndex());
try{
flowExecutor.invoke("strategy2",10, DefaultSlot.class, this.getSlotIndex());
}catch (Exception e){
e.printStackTrace();
}
}
}

View File

@ -5,7 +5,7 @@
<artifactId>liteflow</artifactId>
<packaging>pom</packaging>
<modelVersion>4.0.0</modelVersion>
<version>2.1.1</version>
<version>2.1.3-SNAPSHOT</version>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>