feature #I7I3LL 增加maxWaitSeconds关键字,允许对EL中的每一个组件进行超时控制

This commit is contained in:
Dale Lee 2023-07-07 09:49:57 +08:00
parent 8cce43db12
commit 9f5f8959fb
15 changed files with 546 additions and 9 deletions

View File

@ -78,6 +78,7 @@ public class LiteFlowChainELBuilder {
EXPRESS_RUNNER.addFunctionAndClassMethod(ChainConstant.DO, Object.class, new DoOperator());
EXPRESS_RUNNER.addFunctionAndClassMethod(ChainConstant.BREAK, Object.class, new BreakOperator());
EXPRESS_RUNNER.addFunctionAndClassMethod(ChainConstant.DATA, Object.class, new DataOperator());
EXPRESS_RUNNER.addFunctionAndClassMethod(ChainConstant.MAX_WAIT_SECONDS, Object.class, new MaxWaitSecondsOperator());
}
public static LiteFlowChainELBuilder createChain() {

View File

@ -0,0 +1,89 @@
package com.yomahub.liteflow.builder.el.operator;
import cn.hutool.core.collection.CollUtil;
import com.ql.util.express.exception.QLException;
import com.yomahub.liteflow.builder.el.operator.base.BaseOperator;
import com.yomahub.liteflow.builder.el.operator.base.OperatorHelper;
import com.yomahub.liteflow.flow.element.Condition;
import com.yomahub.liteflow.flow.element.Executable;
import com.yomahub.liteflow.flow.element.condition.ConditionKey;
import com.yomahub.liteflow.flow.element.condition.FinallyCondition;
import com.yomahub.liteflow.flow.element.condition.ThenCondition;
import com.yomahub.liteflow.flow.element.condition.WhenCondition;
import java.util.List;
import java.util.concurrent.TimeUnit;
/**
* EL 规则中的 maxWaitSeconds 的操作符
*
* @author DaleLee
* @since 2.11.0
*/
public class MaxWaitSecondsOperator extends BaseOperator<Condition> {
@Override
public Condition build(Object[] objects) throws Exception {
OperatorHelper.checkObjectSizeEqTwo(objects);
Executable executable = OperatorHelper.convert(objects[0], Executable.class);
// 获取传入的时间参数
Integer maxWaitSeconds = OperatorHelper.convert(objects[1], Integer.class);
if (executable instanceof WhenCondition) {
// WhenCondition直接设置等待时间
WhenCondition whenCondition = OperatorHelper.convert(executable, WhenCondition.class);
whenCondition.setMaxWaitTime(maxWaitSeconds);
whenCondition.setMaxWaitTimeUnit(TimeUnit.SECONDS);
return whenCondition;
} else if (executable instanceof FinallyCondition) {
// FINALLY报错
String errorMsg = "The caller cannot be FinallyCondition item";
throw new QLException(errorMsg);
} else if (containsFinally(executable)) {
// 处理 THEN 中的 FINALLY
ThenCondition thenCondition = OperatorHelper.convert(executable, ThenCondition.class);
return handleFinally(thenCondition, maxWaitSeconds);
} else {
// 其他情况 WHEN 包装
return wrappedByWhen(executable, maxWaitSeconds);
}
}
// 将一个 Executable 包装为带有单独超时控制的 WhenCondition
private WhenCondition wrappedByWhen(Executable executable, Integer maxWaitSeconds) {
WhenCondition whenCondition = new WhenCondition();
whenCondition.addExecutable(executable);
whenCondition.setMaxWaitTime(maxWaitSeconds);
whenCondition.setMaxWaitTimeUnit(TimeUnit.SECONDS);
return whenCondition;
}
// 判断 THEN 中是否含有 FINALLY 组件
private boolean containsFinally(Executable executable) {
return executable instanceof ThenCondition
&& CollUtil.isNotEmpty(((ThenCondition) executable).getFinallyConditionList());
}
// FINALLY 排除在超时控制之外
private ThenCondition handleFinally(ThenCondition thenCondition, Integer maxWaitSeconds) {
// 进行如下转换
// THEN(PRE(a),b,FINALLY(c))
// => THEN(
// WHEN(THEN(PRE(a),b)),
// FINALLY(c))
// 定义外层 THEN
ThenCondition outerThenCondition = new ThenCondition();
// FINALLY 转移到外层 THEN
List<Executable> finallyList = thenCondition.getExecutableList(ConditionKey.FINALLY_KEY);
finallyList.forEach(executable
-> outerThenCondition
.addFinallyCondition((FinallyCondition) executable));
finallyList.clear();
// 包装内部 THEN
WhenCondition whenCondition = wrappedByWhen(thenCondition, maxWaitSeconds);
outerThenCondition.addExecutable(whenCondition);
return outerThenCondition;
}
}

View File

@ -81,4 +81,6 @@ public interface ChainConstant {
String NOT = "NOT";
String MAX_WAIT_SECONDS = "maxWaitSeconds";
}

View File

@ -52,6 +52,12 @@ public class WhenCondition extends Condition {
// when单独的线程池名称
private String threadExecutorClass;
// 异步线程最的等待时间
private Integer maxWaitTime;
// 等待时间单位
private TimeUnit maxWaitTimeUnit;
@Override
public void executeCondition(Integer slotIndex) throws Exception {
executeAsyncCondition(slotIndex);
@ -86,15 +92,20 @@ public class WhenCondition extends Condition {
// 3.根据condition.getNodeList()的集合进行流处理用map进行把executable对象转换成List<CompletableFuture<WhenFutureObj>>
// 4.在转的过程中套入CompletableFutureTimeout方法进行超时判断如果超时则用WhenFutureObj.timeOut返回超时的对象
// 5.第2个参数是主要的本体CompletableFuture传入了ParallelSupplier和线程池对象
Integer whenMaxWaitTime;
TimeUnit whenMaxWaitTimeUnit;
if (ObjectUtil.isNull(this.getMaxWaitTime())) {
if (ObjectUtil.isNotNull(liteflowConfig.getWhenMaxWaitSeconds())) {
// 获取全局异步线程最长等待秒数
this.setMaxWaitTime(liteflowConfig.getWhenMaxWaitSeconds());
this.setMaxWaitTimeUnit(TimeUnit.SECONDS);
} else {
// 获取全局异步线程最的等待时间
this.setMaxWaitTime(liteflowConfig.getWhenMaxWaitTime());
}
}
if (ObjectUtil.isNotNull(liteflowConfig.getWhenMaxWaitSeconds())){
whenMaxWaitTime = liteflowConfig.getWhenMaxWaitSeconds();
whenMaxWaitTimeUnit = TimeUnit.SECONDS;
}else{
whenMaxWaitTime = liteflowConfig.getWhenMaxWaitTime();
whenMaxWaitTimeUnit = liteflowConfig.getWhenMaxWaitTimeUnit();
if (ObjectUtil.isNull(this.getMaxWaitTimeUnit())) {
// 获取全局异步线程最的等待时间单位
this.setMaxWaitTimeUnit(liteflowConfig.getWhenMaxWaitTimeUnit());
}
List<CompletableFuture<WhenFutureObj>> completableFutureList = this.getExecutableList()
@ -112,7 +123,7 @@ public class WhenCondition extends Condition {
WhenFutureObj.timeOut(executable.getId()),
CompletableFuture.supplyAsync(new ParallelSupplier(executable, currChainName, slotIndex),
parallelExecutor),
whenMaxWaitTime, whenMaxWaitTimeUnit))
this.getMaxWaitTime(), this.getMaxWaitTimeUnit()))
.collect(Collectors.toList());
CompletableFuture<?> resultCompletableFuture;
@ -221,4 +232,19 @@ public class WhenCondition extends Condition {
this.threadExecutorClass = threadExecutorClass;
}
public Integer getMaxWaitTime() {
return maxWaitTime;
}
public void setMaxWaitTime(Integer maxWaitTime) {
this.maxWaitTime = maxWaitTime;
}
public TimeUnit getMaxWaitTimeUnit() {
return maxWaitTimeUnit;
}
public void setMaxWaitTimeUnit(TimeUnit maxWaitTimeUnit) {
this.maxWaitTimeUnit = maxWaitTimeUnit;
}
}

View File

@ -0,0 +1,184 @@
package com.yomahub.liteflow.test.maxWaitSeconds;
import com.yomahub.liteflow.builder.LiteFlowNodeBuilder;
import com.yomahub.liteflow.builder.el.LiteFlowChainELBuilder;
import com.yomahub.liteflow.core.FlowExecutor;
import com.yomahub.liteflow.enums.NodeTypeEnum;
import com.yomahub.liteflow.exception.WhenTimeoutException;
import com.yomahub.liteflow.flow.LiteflowResponse;
import com.yomahub.liteflow.slot.DefaultContext;
import com.yomahub.liteflow.test.BaseTest;
import com.yomahub.liteflow.test.validateRule.cmp.ACmp;
import com.yomahub.liteflow.test.validateRule.cmp.BCmp;
import com.yomahub.liteflow.test.validateRule.cmp.CCmp;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.test.context.TestPropertySource;
import org.springframework.test.context.junit4.SpringRunner;
import javax.annotation.Resource;
/**
* springboot环境下超时控制测试
*
* @author DaleLee
* @since 2.11.0
*/
@RunWith(SpringRunner.class)
@TestPropertySource(value = "classpath:/maxWaitSeconds/application.properties")
@SpringBootTest(classes = MaxWaitSecondsELSpringBootTest.class)
@EnableAutoConfiguration
@ComponentScan({ "com.yomahub.liteflow.test.maxWaitSeconds.cmp" })
public class MaxWaitSecondsELSpringBootTest extends BaseTest {
@Resource
private FlowExecutor flowExecutor;
public static final String CONTENT_KEY = "testKey";
// 测试 THEN 的超时情况
@Test
public void testThen1() {
assertTimeout("then1");
}
// 测试 THEN 的非超时情况
@Test
public void testThen2() {
assertNotTimeout("then2");
}
// 测试 When 的超时情况
@Test
public void testWhen1() {
assertTimeout("when1");
}
// 测试 WHEN 的非超时情况
@Test
public void testWhen2() {
assertNotTimeout("when2");
}
// 测试 FOR 的超时情况
@Test
public void testFor1() {
assertTimeout("for1");
}
// 测试 FOR 的非超时情况
@Test
public void testFor2() {
assertNotTimeout("for2");
}
// 测试 WHILE 的超时情况
@Test
public void testWhile1() {
assertTimeout("while1");
}
// 测试 WHILE 的非超时情况
@Test
public void testWhile2() {
assertNotTimeout("while2");
}
// 测试 ITERATOR 的超时情况
@Test
public void testIterator1() {
assertTimeout("iterator1");
}
// 测试 ITERATOR 的非超时情况
@Test
public void testIterator2() {
assertNotTimeout("iterator2");
}
// 测试 SWITCH 的超时情况
@Test
public void testSwitch1() {
assertTimeout("switch1");
}
// 测试 SWITCH 的非超时情况
@Test
public void testSwitch2() {
assertNotTimeout("switch2");
}
// 测试 IF 的超时情况
@Test
public void testIf1() {
assertTimeout("if1");
}
// 测试 SWITCH 的非超时情况
@Test
public void testIf2() {
assertNotTimeout("if2");
}
// 测试单个组件的超时情况
@Test
public void testComponent1() {
assertTimeout("component1");
}
// 测试单个组件的非超时情况
@Test
public void testComponent2() {
assertNotTimeout("component2");
}
// 测试 FINALLY虽然超时 FINALLY 仍会执行
@Test
public void testFinally1() {
LiteflowResponse response = flowExecutor.execute2Resp("finally", "arg");
Assert.assertFalse(response.isSuccess());
Assert.assertEquals(WhenTimeoutException.class, response.getCause().getClass());
// FINALLY 执行时在默认数据上下文中放入了 CONTENT_KEY
DefaultContext contextBean = response.getFirstContextBean();
Assert.assertTrue(contextBean.hasData(CONTENT_KEY));
}
// 测试 maxWaitSeconds 关键字不能作用于 Finally
@Test
public void testFinally2() {
LiteFlowNodeBuilder.createNode()
.setId("a")
.setName("组件A")
.setType(NodeTypeEnum.COMMON)
.setClazz(ACmp.class)
.build();
LiteFlowNodeBuilder.createNode()
.setId("b")
.setName("组件B")
.setType(NodeTypeEnum.COMMON)
.setClazz(BCmp.class)
.build();
LiteFlowNodeBuilder.createNode()
.setId("c")
.setName("组件C")
.setType(NodeTypeEnum.COMMON)
.setClazz(CCmp.class)
.build();
Assert.assertFalse(LiteFlowChainELBuilder.validate("THEN(a, b, FINALLY(c).maxWaitSeconds(10))"));
}
private void assertTimeout(String chainId) {
LiteflowResponse response = flowExecutor.execute2Resp(chainId, "arg");
Assert.assertFalse(response.isSuccess());
Assert.assertEquals(WhenTimeoutException.class, response.getCause().getClass());
}
private void assertNotTimeout(String chainId) {
LiteflowResponse response = flowExecutor.execute2Resp(chainId, "arg");
Assert.assertTrue(response.isSuccess());
}
}

View File

@ -0,0 +1,20 @@
package com.yomahub.liteflow.test.maxWaitSeconds.cmp;
import com.yomahub.liteflow.annotation.LiteflowComponent;
import com.yomahub.liteflow.core.NodeComponent;
import java.util.Random;
@LiteflowComponent("a")
public class ACmp extends NodeComponent {
@Override
public void process() {
try {
Thread.sleep(1000);
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("ACmp executed!");
}
}

View File

@ -0,0 +1,18 @@
package com.yomahub.liteflow.test.maxWaitSeconds.cmp;
import com.yomahub.liteflow.annotation.LiteflowComponent;
import com.yomahub.liteflow.core.NodeComponent;
@LiteflowComponent("b")
public class BCmp extends NodeComponent {
@Override
public void process() {
try {
Thread.sleep(2000);
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("BCmp executed!");
}
}

View File

@ -0,0 +1,18 @@
package com.yomahub.liteflow.test.maxWaitSeconds.cmp;
import com.yomahub.liteflow.annotation.LiteflowComponent;
import com.yomahub.liteflow.core.NodeComponent;
@LiteflowComponent("c")
public class CCmp extends NodeComponent {
@Override
public void process() {
try {
Thread.sleep(5000);
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("CCmp executed!");
}
}

View File

@ -0,0 +1,24 @@
package com.yomahub.liteflow.test.maxWaitSeconds.cmp;
import com.yomahub.liteflow.annotation.LiteflowComponent;
import com.yomahub.liteflow.core.NodeComponent;
import com.yomahub.liteflow.slot.DefaultContext;
import static com.yomahub.liteflow.test.maxWaitSeconds.MaxWaitSecondsELSpringBootTest.CONTENT_KEY;
@LiteflowComponent("d")
public class DCmp extends NodeComponent {
@Override
public void process() {
try {
Thread.sleep(500);
DefaultContext contextBean = this.getFirstContextBean();
contextBean.setData(CONTENT_KEY,"value");
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("DCmp executed!");
}
}

View File

@ -0,0 +1,12 @@
package com.yomahub.liteflow.test.maxWaitSeconds.cmp;
import com.yomahub.liteflow.annotation.LiteflowComponent;
import com.yomahub.liteflow.core.NodeIfComponent;
@LiteflowComponent("f")
public class FCmp extends NodeIfComponent {
@Override
public boolean processIf() throws Exception {
return true;
}
}

View File

@ -0,0 +1,13 @@
package com.yomahub.liteflow.test.maxWaitSeconds.cmp;
import com.yomahub.liteflow.annotation.LiteflowComponent;
import com.yomahub.liteflow.core.NodeSwitchComponent;
@LiteflowComponent("s")
public class SCmp extends NodeSwitchComponent {
@Override
public String processSwitch() throws Exception {
return "b";
}
}

View File

@ -0,0 +1,14 @@
package com.yomahub.liteflow.test.maxWaitSeconds.cmp;
import com.yomahub.liteflow.annotation.LiteflowComponent;
import com.yomahub.liteflow.core.NodeWhileComponent;
@LiteflowComponent("w")
public class WCmp extends NodeWhileComponent {
private int count = 0;
@Override
public boolean processWhile() throws Exception {
count++;
return count <= 2;
}
}

View File

@ -0,0 +1,17 @@
package com.yomahub.liteflow.test.maxWaitSeconds.cmp;
import cn.hutool.core.collection.ListUtil;
import com.yomahub.liteflow.annotation.LiteflowComponent;
import com.yomahub.liteflow.core.NodeIteratorComponent;
import java.util.Iterator;
import java.util.List;
@LiteflowComponent("x")
public class XCmp extends NodeIteratorComponent {
@Override
public Iterator<?> processIterator() throws Exception {
List<String> list = ListUtil.toList("one", "two");
return list.iterator();
}
}

View File

@ -0,0 +1 @@
liteflow.rule-source=maxWaitSeconds/flow.el.xml

View File

@ -0,0 +1,98 @@
<?xml version="1.0" encoding="UTF-8"?>
<flow>
<!--
a=>1s b=>2s c=>5s
-->
<!-- 串行编排测试 -->
<chain name="then1">
<!-- 超时 -->
THEN(a,b).maxWaitSeconds(2);
</chain>
<chain name="then2">
<!-- 不超时 -->
THEN(a,b).maxWaitSeconds(5);
</chain>
<!-- 并行编排测试 -->
<chain name="when1">
<!-- 超时 -->
WHEN(a,c).maxWaitSeconds(3);
</chain>
<chain name="when2">
<!-- 不超时 -->
WHEN(a,b).maxWaitSeconds(3);
</chain>
<!-- 循环编排测试 -->
<chain name="for1">
<!-- 超时 -->
FOR(2).DO(a).maxWaitSeconds(1);
</chain>
<chain name="for2">
<!-- 不超时 -->
FOR(2).DO(a).maxWaitSeconds(3);
</chain>
<!-- w 循环两次 -->
<chain name="while1">
<!-- 超时 -->
WHILE(w).DO(a).maxWaitSeconds(1);
</chain>
<chain name="while2">
<!-- 不超时 -->
WHILE(w).DO(a).maxWaitSeconds(3);
</chain>
<!-- x 迭代两次 -->
<chain name="iterator1">
<!-- 超时 -->
ITERATOR(x).DO(a).maxWaitSeconds(1);
</chain>
<chain name="iterator2">
<!-- 不超时 -->
ITERATOR(x).DO(a).maxWaitSeconds(3);
</chain>
<!-- 选择编排测试 -->
<!-- s 选择 b 组件 -->
<chain name="switch1">
<!-- 超时 -->
SWITCH(s).TO(a, b).maxWaitSeconds(1);
</chain>
<chain name="switch2">
<!-- 不超时 -->
SWITCH(s).TO(a, b).maxWaitSeconds(3);
</chain>
<!-- 条件编排测试 -->
<!-- f 返回 true -->
<chain name="if1">
<!-- 超时 -->
IF(f, b, c).maxWaitSeconds(1);
</chain>
<chain name="if2">
<!-- 不超时 -->
IF(f, b, c).maxWaitSeconds(3);
</chain>
<!-- 测试单个组件 -->
<chain name="component1">
<!-- 超时 -->
WHEN(
a.maxWaitSeconds(2),
c.maxWaitSeconds(3)
);
</chain>
<chain name="component2">
<!-- 不超时 -->
WHEN(
a.maxWaitSeconds(2),
b.maxWaitSeconds(3)
);
</chain>
<!-- 测试 FINALLY -->
<chain name="finally">
<!-- 超时,但 FINALLY 执行 -->
THEN(PRE(a), b, FINALLY(d)).maxWaitSeconds(2);
</chain>
</flow>