From 1ca5861e40c687fbc11335069c54bde23d1817a1 Mon Sep 17 00:00:00 2001 From: "everywhere.z" Date: Sat, 14 Oct 2023 16:32:09 +0800 Subject: [PATCH] =?UTF-8?q?feature=20#I883LB=20when=E7=BA=BF=E7=A8=8B?= =?UTF-8?q?=E6=B1=A0=E9=9A=94=E7=A6=BB=E6=94=AF=E6=8C=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../strategy/ParallelStrategyExecutor.java | 14 +++++++-- .../liteflow/property/LiteflowConfig.java | 16 ++++++++++ .../liteflow/thread/ExecutorBuilder.java | 4 +-- .../liteflow/thread/ExecutorHelper.java | 29 +++++++++++++++++-- .../LiteFlowDefaultMainExecutorBuilder.java | 2 +- .../liteflow/springboot/LiteflowProperty.java | 11 +++++++ .../LiteflowPropertyAutoConfiguration.java | 9 +++--- ...itional-spring-configuration-metadata.json | 7 +++++ .../META-INF/liteflow-default.properties | 1 + .../AsyncNodeThreadPoolELSpringbootTest.java | 1 + .../liteflow/test/asyncNode/cmp/FCmp.java | 1 - .../asyncNode/application2.properties | 5 ++-- .../src/test/resources/asyncNode/flow2.xml | 2 +- 13 files changed, 87 insertions(+), 15 deletions(-) diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/ParallelStrategyExecutor.java b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/ParallelStrategyExecutor.java index 95adb04e..cf3ae9d4 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/ParallelStrategyExecutor.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/ParallelStrategyExecutor.java @@ -1,5 +1,6 @@ package com.yomahub.liteflow.flow.parallel.strategy; +import cn.hutool.core.util.BooleanUtil; import cn.hutool.core.util.ObjectUtil; import cn.hutool.core.util.StrUtil; import com.yomahub.liteflow.enums.ParallelStrategyEnum; @@ -90,8 +91,17 @@ public abstract class ParallelStrategyExecutor { String currChainName = whenCondition.getCurrChainId(); - // 此方法其实只会初始化一次 Executor,不会每次都会初始化。Executor是唯一的 - ExecutorService parallelExecutor = ExecutorHelper.loadInstance().buildWhenExecutor(whenCondition.getThreadExecutorClass()); + LiteflowConfig liteflowConfig = LiteflowConfigGetter.get(); + + // 如果设置了线程池隔离,则每个when都会有对应的线程池,这是为了避免多层嵌套时如果线程池数量不够时出现单个线程池死锁。用线程池隔离的方式会更加好 + // 如果when没有超多层的嵌套,还是用默认的比较好。 + // 默认设置不隔离。也就是说,默认情况是一个线程池类一个实例,如果什么都不配置,那也就是在when的情况下,全局一个线程池。 + ExecutorService parallelExecutor; + if (BooleanUtil.isTrue(liteflowConfig.getWhenThreadPoolIsolate())){ + parallelExecutor = ExecutorHelper.loadInstance().buildWhenExecutorWithHash(whenCondition.getThreadExecutorClass(), String.valueOf(whenCondition.hashCode())); + }else{ + parallelExecutor = ExecutorHelper.loadInstance().buildWhenExecutor(whenCondition.getThreadExecutorClass()); + } // 设置 whenCondition 参数 setWhenConditionParams(whenCondition); diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/property/LiteflowConfig.java b/liteflow-core/src/main/java/com/yomahub/liteflow/property/LiteflowConfig.java index c0cbf039..086c301d 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/property/LiteflowConfig.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/property/LiteflowConfig.java @@ -50,6 +50,9 @@ public class LiteflowConfig { private TimeUnit whenMaxWaitTimeUnit; + // 异步线程池是否隔离 + private Boolean whenThreadPoolIsolate; + // 是否打印监控log private Boolean enableLog; @@ -458,4 +461,17 @@ public class LiteflowConfig { public void setFallbackCmpEnable(Boolean fallbackCmpEnable) { this.fallbackCmpEnable = fallbackCmpEnable; } + + public Boolean getWhenThreadPoolIsolate() { + if (ObjectUtil.isNull(whenThreadPoolIsolate)) { + return Boolean.FALSE; + } + else { + return whenThreadPoolIsolate; + } + } + + public void setWhenThreadPoolIsolate(Boolean whenThreadPoolIsolate) { + this.whenThreadPoolIsolate = whenThreadPoolIsolate; + } } diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/thread/ExecutorBuilder.java b/liteflow-core/src/main/java/com/yomahub/liteflow/thread/ExecutorBuilder.java index 0fa98c01..0204dea4 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/thread/ExecutorBuilder.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/thread/ExecutorBuilder.java @@ -18,8 +18,8 @@ public interface ExecutorBuilder { // 构建默认的线程池对象 default ExecutorService buildDefaultExecutor(int corePoolSize, int maximumPoolSize, int queueCapacity, String threadName) { - return TtlExecutors.getTtlExecutorService(new ThreadPoolExecutor(corePoolSize, maximumPoolSize, 0L, - TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(queueCapacity), new ThreadFactory() { + return TtlExecutors.getTtlExecutorService(new ThreadPoolExecutor(corePoolSize, maximumPoolSize, 60, + TimeUnit.SECONDS, new ArrayBlockingQueue<>(queueCapacity), new ThreadFactory() { private final AtomicLong number = new AtomicLong(); @Override diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/thread/ExecutorHelper.java b/liteflow-core/src/main/java/com/yomahub/liteflow/thread/ExecutorHelper.java index da487919..bdaa120d 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/thread/ExecutorHelper.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/thread/ExecutorHelper.java @@ -100,6 +100,20 @@ public class ExecutorHelper { return getExecutorService(clazz); } + // 构建when线程池 - clazz和condition的hash值共同作为缓存key + public ExecutorService buildWhenExecutorWithHash(String conditionHash) { + LiteflowConfig liteflowConfig = LiteflowConfigGetter.get(); + return buildWhenExecutorWithHash(liteflowConfig.getThreadExecutorClass(), conditionHash); + } + + // 构建when线程池 - clazz和condition的hash值共同作为缓存key + public ExecutorService buildWhenExecutorWithHash(String clazz, String conditionHash) { + if (StrUtil.isBlank(clazz)) { + return buildWhenExecutorWithHash(conditionHash); + } + return getExecutorService(clazz, conditionHash); + } + // 构建默认的FlowExecutor线程池,用于execute2Future方法 public ExecutorService buildMainExecutor() { LiteflowConfig liteflowConfig = LiteflowConfigGetter.get(); @@ -119,12 +133,23 @@ public class ExecutorHelper { return getExecutorService(liteflowConfig.getParallelLoopExecutorClass()); } + private ExecutorService getExecutorService(String clazz){ + return getExecutorService(clazz, null); + } + /** * 根据线程执行构建者Class类名获取ExecutorService实例 */ - private ExecutorService getExecutorService(String clazz) { + private ExecutorService getExecutorService(String clazz, String conditionHash) { try { - ExecutorService executorServiceFromCache = executorServiceMap.get(clazz); + String key; + if (StrUtil.isBlank(conditionHash)){ + key = clazz; + }else{ + key = StrUtil.format("{}_{}", clazz, conditionHash); + } + + ExecutorService executorServiceFromCache = executorServiceMap.get(key); if (ObjectUtil.isNotNull(executorServiceFromCache)) { return executorServiceFromCache; } diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/thread/LiteFlowDefaultMainExecutorBuilder.java b/liteflow-core/src/main/java/com/yomahub/liteflow/thread/LiteFlowDefaultMainExecutorBuilder.java index 76a79d0a..d38b95ba 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/thread/LiteFlowDefaultMainExecutorBuilder.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/thread/LiteFlowDefaultMainExecutorBuilder.java @@ -20,7 +20,7 @@ public class LiteFlowDefaultMainExecutorBuilder implements ExecutorBuilder { if (ObjectUtil.isNull(liteflowConfig)) { liteflowConfig = new LiteflowConfig(); } - return buildDefaultExecutor(liteflowConfig.getMainExecutorWorks(), liteflowConfig.getMainExecutorWorks(), 200, + return buildDefaultExecutor(liteflowConfig.getMainExecutorWorks(), liteflowConfig.getMainExecutorWorks()*2, 200, "main-thread-"); } diff --git a/liteflow-spring-boot-starter/src/main/java/com/yomahub/liteflow/springboot/LiteflowProperty.java b/liteflow-spring-boot-starter/src/main/java/com/yomahub/liteflow/springboot/LiteflowProperty.java index 2ac3b5db..1557f472 100644 --- a/liteflow-spring-boot-starter/src/main/java/com/yomahub/liteflow/springboot/LiteflowProperty.java +++ b/liteflow-spring-boot-starter/src/main/java/com/yomahub/liteflow/springboot/LiteflowProperty.java @@ -51,6 +51,9 @@ public class LiteflowProperty { // 异步线程池最大队列数量 private int whenQueueLimit; + // 异步线程池是否隔离 + private boolean whenThreadPoolIsolate; + // 是否在启动时解析规则文件 // 这个参数主要给编码式注册元数据的场景用的,结合FlowBus.addNode一起用 private boolean parseOnStart; @@ -289,4 +292,12 @@ public class LiteflowProperty { public void setFallbackCmpEnable(Boolean fallbackCmpEnable) { this.fallbackCmpEnable = fallbackCmpEnable; } + + public boolean isWhenThreadPoolIsolate() { + return whenThreadPoolIsolate; + } + + public void setWhenThreadPoolIsolate(boolean whenThreadPoolIsolate) { + this.whenThreadPoolIsolate = whenThreadPoolIsolate; + } } diff --git a/liteflow-spring-boot-starter/src/main/java/com/yomahub/liteflow/springboot/config/LiteflowPropertyAutoConfiguration.java b/liteflow-spring-boot-starter/src/main/java/com/yomahub/liteflow/springboot/config/LiteflowPropertyAutoConfiguration.java index 71c98bb1..9bfc7bef 100644 --- a/liteflow-spring-boot-starter/src/main/java/com/yomahub/liteflow/springboot/config/LiteflowPropertyAutoConfiguration.java +++ b/liteflow-spring-boot-starter/src/main/java/com/yomahub/liteflow/springboot/config/LiteflowPropertyAutoConfiguration.java @@ -30,12 +30,9 @@ public class LiteflowPropertyAutoConfiguration { liteflowConfig.setWhenMaxWaitSeconds(property.getWhenMaxWaitSeconds()); liteflowConfig.setWhenMaxWaitTime(property.getWhenMaxWaitTime()); liteflowConfig.setWhenMaxWaitTimeUnit(property.getWhenMaxWaitTimeUnit()); - liteflowConfig.setEnableLog(liteflowMonitorProperty.isEnableLog()); - liteflowConfig.setQueueLimit(liteflowMonitorProperty.getQueueLimit()); - liteflowConfig.setDelay(liteflowMonitorProperty.getDelay()); - liteflowConfig.setPeriod(liteflowMonitorProperty.getPeriod()); liteflowConfig.setWhenMaxWorkers(property.getWhenMaxWorkers()); liteflowConfig.setWhenQueueLimit(property.getWhenQueueLimit()); + liteflowConfig.setWhenThreadPoolIsolate(property.isWhenThreadPoolIsolate()); liteflowConfig.setParseOnStart(property.isParseOnStart()); liteflowConfig.setEnable(property.isEnable()); liteflowConfig.setSupportMultipleType(property.isSupportMultipleType()); @@ -51,6 +48,10 @@ public class LiteflowPropertyAutoConfiguration { liteflowConfig.setParallelQueueLimit(property.getParallelQueueLimit()); liteflowConfig.setParallelLoopExecutorClass(property.getParallelLoopExecutorClass()); liteflowConfig.setFallbackCmpEnable(property.isFallbackCmpEnable()); + liteflowConfig.setEnableLog(liteflowMonitorProperty.isEnableLog()); + liteflowConfig.setQueueLimit(liteflowMonitorProperty.getQueueLimit()); + liteflowConfig.setDelay(liteflowMonitorProperty.getDelay()); + liteflowConfig.setPeriod(liteflowMonitorProperty.getPeriod()); return liteflowConfig; } diff --git a/liteflow-spring-boot-starter/src/main/resources/META-INF/additional-spring-configuration-metadata.json b/liteflow-spring-boot-starter/src/main/resources/META-INF/additional-spring-configuration-metadata.json index a2fd62a4..16606752 100644 --- a/liteflow-spring-boot-starter/src/main/resources/META-INF/additional-spring-configuration-metadata.json +++ b/liteflow-spring-boot-starter/src/main/resources/META-INF/additional-spring-configuration-metadata.json @@ -103,6 +103,13 @@ "sourceType": "com.yomahub.liteflow.springboot.LiteflowProperty", "defaultValue": 512 }, + { + "name": "liteflow.when-thread-pool-isolate", + "type": "java.lang.Boolean", + "description": "set whether the asynchronous thread pool is isolated.", + "sourceType": "com.yomahub.liteflow.springboot.LiteflowProperty", + "defaultValue": false + }, { "name": "liteflow.parse-on-start", "type": "java.lang.Boolean", diff --git a/liteflow-spring-boot-starter/src/main/resources/META-INF/liteflow-default.properties b/liteflow-spring-boot-starter/src/main/resources/META-INF/liteflow-default.properties index 56b111b6..d1a74903 100644 --- a/liteflow-spring-boot-starter/src/main/resources/META-INF/liteflow-default.properties +++ b/liteflow-spring-boot-starter/src/main/resources/META-INF/liteflow-default.properties @@ -9,6 +9,7 @@ liteflow.when-max-wait-time=15000 liteflow.when-max-wait-time-unit=MILLISECONDS liteflow.when-max-workers=16 liteflow.when-queue-limit=512 +liteflow.when-thread-pool-isolate=false liteflow.parse-on-start=true liteflow.retry-count=0 liteflow.support-multiple-type=false diff --git a/liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/java/com/yomahub/liteflow/test/asyncNode/AsyncNodeThreadPoolELSpringbootTest.java b/liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/java/com/yomahub/liteflow/test/asyncNode/AsyncNodeThreadPoolELSpringbootTest.java index f4cc56ed..53167278 100644 --- a/liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/java/com/yomahub/liteflow/test/asyncNode/AsyncNodeThreadPoolELSpringbootTest.java +++ b/liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/java/com/yomahub/liteflow/test/asyncNode/AsyncNodeThreadPoolELSpringbootTest.java @@ -20,6 +20,7 @@ public class AsyncNodeThreadPoolELSpringbootTest { @Resource private FlowExecutor flowExecutor; + // 测试当when嵌套层数大于最大线程个数时,并开启线程池隔离机制的正确性 @Test public void testAsyncFlow1() { LiteflowResponse response = flowExecutor.execute2Resp("chain1", "it's a base request"); diff --git a/liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/java/com/yomahub/liteflow/test/asyncNode/cmp/FCmp.java b/liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/java/com/yomahub/liteflow/test/asyncNode/cmp/FCmp.java index 63707249..c6ee31ba 100644 --- a/liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/java/com/yomahub/liteflow/test/asyncNode/cmp/FCmp.java +++ b/liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/java/com/yomahub/liteflow/test/asyncNode/cmp/FCmp.java @@ -8,7 +8,6 @@ public class FCmp extends NodeComponent { @Override public void process() throws Exception { - Thread.sleep(1500); System.out.println("Fcomp executed!"); } diff --git a/liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/resources/asyncNode/application2.properties b/liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/resources/asyncNode/application2.properties index a317b257..99a064ee 100644 --- a/liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/resources/asyncNode/application2.properties +++ b/liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/resources/asyncNode/application2.properties @@ -1,4 +1,5 @@ liteflow.rule-source=asyncNode/flow2.xml liteflow.when-max-workers=4 -liteflow.when-max-wait-time=20 -liteflow.when-max-wait-time-unit=SECONDS \ No newline at end of file +liteflow.when-max-wait-time=3600000 +liteflow.when-max-wait-time-unit=MILLISECONDS +liteflow.when-thread-pool-isolate=true \ No newline at end of file diff --git a/liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/resources/asyncNode/flow2.xml b/liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/resources/asyncNode/flow2.xml index 3b124976..1829691f 100644 --- a/liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/resources/asyncNode/flow2.xml +++ b/liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/resources/asyncNode/flow2.xml @@ -2,6 +2,6 @@ - WHEN(f, f, WHEN(f, WHEN(f, f, f, f, WHEN(f, f, f, f, f, f, f))), f, f); + WHEN(f, WHEN(f, WHEN(f, WHEN(f, WHEN(f, WHEN(f, WHEN(f))))))); \ No newline at end of file