diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/CompletableFutureExpand.java b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/CompletableFutureExpand.java new file mode 100644 index 00000000..37ba6dbb --- /dev/null +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/CompletableFutureExpand.java @@ -0,0 +1,87 @@ +package com.yomahub.liteflow.flow.parallel; + +import java.util.concurrent.*; +import java.util.function.BiConsumer; + +public class CompletableFutureExpand { + + /** + * 如果在给定超时之前未完成,则异常完成此 CompletableFuture 并抛出 {@link TimeoutException} 。 + * + * @param timeout 在出现 TimeoutException 异常完成之前等待多长时间,以 {@code unit} 为单位 + * @param unit 一个 {@link TimeUnit},结合 {@code timeout} 参数,表示给定粒度单位的持续时间 + * @return 入参的 CompletableFuture + */ + public static CompletableFuture completeOnTimeout(CompletableFuture future, long timeout, TimeUnit unit, T timeoutDefaultObj) { + if (future.isDone()) { + return future; + } + + return future.whenComplete(new Canceller(Delayer.delay(new Timeout<>(future, timeoutDefaultObj), timeout, unit))); + } + + /** + * 超时时异常完成的操作 + */ + static final class Timeout implements Runnable { + final CompletableFuture future; + + final T timeoutDefaultObj; + + Timeout(CompletableFuture future, T timeoutDefaultObj) { + this.future = future; + this.timeoutDefaultObj = timeoutDefaultObj; + } + + public void run() { + if (null != future && !future.isDone()) { + future.complete(timeoutDefaultObj); + } + } + } + + /** + * 取消不需要的超时的操作 + */ + static final class Canceller implements BiConsumer { + final Future future; + + Canceller(Future future) { + this.future = future; + } + + public void accept(Object ignore, Throwable ex) { + if (null == ex && null != future && !future.isDone()) { + future.cancel(false); + } + } + } + + /** + * 单例延迟调度器,仅用于启动和取消任务,一个线程就足够 + */ + static final class Delayer { + + static final ScheduledThreadPoolExecutor delayer; + + static ScheduledFuture delay(Runnable command, long delay, TimeUnit unit) { + return delayer.schedule(command, delay, unit); + } + + static final class DaemonThreadFactory implements ThreadFactory { + + @Override + public Thread newThread(Runnable r) { + Thread t = new Thread(r); + t.setDaemon(true); + t.setName("CompletableFutureExpandUtilsDelayScheduler"); + return t; + } + } + + static { + delayer = new ScheduledThreadPoolExecutor(1, new DaemonThreadFactory()); + delayer.setRemoveOnCancelPolicy(true); + } + } +} diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/CompletableFutureTimeout.java b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/CompletableFutureTimeout.java index e73ec8c7..54e645eb 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/CompletableFutureTimeout.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/CompletableFutureTimeout.java @@ -63,8 +63,7 @@ public class CompletableFutureTimeout { } // 哪个先完成 就apply哪一个结果 这是一个关键的API,exceptionally出现异常后返回默认值 - public static CompletableFuture completeOnTimeout(T t, CompletableFuture future, long timeout, - TimeUnit unit) { + public static CompletableFuture completeOnTimeout(CompletableFuture future, long timeout, TimeUnit unit, T t) { final CompletableFuture timeoutFuture = timeoutAfter(timeout, unit); return future.applyToEither(timeoutFuture, Function.identity()).exceptionally((throwable) -> t); } diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/ParallelStrategyExecutor.java b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/ParallelStrategyExecutor.java index 27a8f489..c0cabd24 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/ParallelStrategyExecutor.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/ParallelStrategyExecutor.java @@ -10,6 +10,7 @@ import com.yomahub.liteflow.flow.element.Node; import com.yomahub.liteflow.flow.element.condition.FinallyCondition; import com.yomahub.liteflow.flow.element.condition.PreCondition; import com.yomahub.liteflow.flow.element.condition.WhenCondition; +import com.yomahub.liteflow.flow.parallel.CompletableFutureExpand; import com.yomahub.liteflow.flow.parallel.CompletableFutureTimeout; import com.yomahub.liteflow.flow.parallel.ParallelSupplier; import com.yomahub.liteflow.flow.parallel.WhenFutureObj; @@ -53,11 +54,11 @@ public abstract class ParallelStrategyExecutor { WhenCondition whenCondition, String currChainName, Integer slotIndex) { // 套入 CompletableFutureTimeout 方法进行超时判断,如果超时则用 WhenFutureObj.timeOut 返回超时的对象 // 第 2 个参数是主要的本体 CompletableFuture,传入了 ParallelSupplier 和线程池对象 - return CompletableFutureTimeout.completeOnTimeout( - WhenFutureObj.timeOut(executable.getId()), + return CompletableFutureExpand.completeOnTimeout( CompletableFuture.supplyAsync(new ParallelSupplier(executable, currChainName, slotIndex), parallelExecutor), whenCondition.getMaxWaitTime(), - whenCondition.getMaxWaitTimeUnit()); + whenCondition.getMaxWaitTimeUnit(), + WhenFutureObj.timeOut(executable.getId())); } /** @@ -167,11 +168,11 @@ public abstract class ParallelStrategyExecutor { * 任务结果处理 * @param whenCondition 并行组件对象 * @param slotIndex 当前 slot 的 index - * @param whenAllTaskList 并行组件中所有任务列表 + * @param whenAllFutureList 并行组件中所有任务列表 * @param specifyTask 指定预先完成的任务,详见 {@link ParallelStrategyEnum} * @throws Exception */ - protected void handleTaskResult(WhenCondition whenCondition, Integer slotIndex, List> whenAllTaskList, + protected void handleTaskResult(WhenCondition whenCondition, Integer slotIndex, List> whenAllFutureList, CompletableFuture specifyTask) throws Exception { Slot slot = DataBus.getSlot(slotIndex); @@ -193,17 +194,23 @@ public abstract class ParallelStrategyExecutor { // 如果 any 为 true,那么这里拿到的是第一个完成的任务 // 如果为 must,那么这里获取到的就是指定的任务 // 这里过滤和转换一起用 lambda 做了 - List allCompletableWhenFutureObjList = whenAllTaskList.stream().filter(f -> { + List allCompletableWhenFutureObjList = whenAllFutureList.stream().filter(f -> { // 过滤出已经完成的,没完成的就直接终止 if (f.isDone()) { return true; } else { + //事实上CompletableFuture并不能cancel掉底层的线程 f.cancel(true); return false; } }).map(f -> { try { - return f.get(); + WhenFutureObj whenFutureObj = f.get(); + if (whenFutureObj.isTimeout()){ + //事实上CompletableFuture并不能cancel掉底层的线程 + f.cancel(true); + } + return whenFutureObj; } catch (InterruptedException | ExecutionException e) { interrupted[0] = true; return null; diff --git a/liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/java/com/yomahub/liteflow/test/whenTimeOut/cmp/BCmp.java b/liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/java/com/yomahub/liteflow/test/whenTimeOut/cmp/BCmp.java index f5778fcd..9c3cb6bd 100644 --- a/liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/java/com/yomahub/liteflow/test/whenTimeOut/cmp/BCmp.java +++ b/liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/java/com/yomahub/liteflow/test/whenTimeOut/cmp/BCmp.java @@ -16,7 +16,7 @@ public class BCmp extends NodeComponent { @Override public void process() { try { - Thread.sleep(4000); + Thread.sleep(2000); } catch (Exception ignored) { diff --git a/liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/java/com/yomahub/liteflow/test/whenTimeOut/cmp/CCmp.java b/liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/java/com/yomahub/liteflow/test/whenTimeOut/cmp/CCmp.java index e50c9689..abdb1f12 100644 --- a/liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/java/com/yomahub/liteflow/test/whenTimeOut/cmp/CCmp.java +++ b/liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/java/com/yomahub/liteflow/test/whenTimeOut/cmp/CCmp.java @@ -14,14 +14,11 @@ import org.springframework.stereotype.Component; public class CCmp extends NodeComponent { @Override - public void process() { - try { - Thread.sleep(3500); + public void process() throws Exception{ + for (int i = 0; i < 10; i++) { + System.out.println("executing cmp c"); + Thread.sleep(500); } - catch (Exception ignored) { - - } - System.out.println("CCmp executed!"); } }