为异步超时提供新的工具类

This commit is contained in:
everywhere.z 2024-02-12 16:47:42 +08:00
parent 2d9c357d8f
commit dd776b8c62
5 changed files with 107 additions and 17 deletions

View File

@ -0,0 +1,87 @@
package com.yomahub.liteflow.flow.parallel;
import java.util.concurrent.*;
import java.util.function.BiConsumer;
public class CompletableFutureExpand {
/**
* 如果在给定超时之前未完成则异常完成此 CompletableFuture 并抛出 {@link TimeoutException}
*
* @param timeout 在出现 TimeoutException 异常完成之前等待多长时间 {@code unit} 为单位
* @param unit 一个 {@link TimeUnit}结合 {@code timeout} 参数表示给定粒度单位的持续时间
* @return 入参的 CompletableFuture
*/
public static <T> CompletableFuture<T> completeOnTimeout(CompletableFuture<T> future, long timeout, TimeUnit unit, T timeoutDefaultObj) {
if (future.isDone()) {
return future;
}
return future.whenComplete(new Canceller(Delayer.delay(new Timeout<>(future, timeoutDefaultObj), timeout, unit)));
}
/**
* 超时时异常完成的操作
*/
static final class Timeout<T> implements Runnable {
final CompletableFuture<T> future;
final T timeoutDefaultObj;
Timeout(CompletableFuture<T> future, T timeoutDefaultObj) {
this.future = future;
this.timeoutDefaultObj = timeoutDefaultObj;
}
public void run() {
if (null != future && !future.isDone()) {
future.complete(timeoutDefaultObj);
}
}
}
/**
* 取消不需要的超时的操作
*/
static final class Canceller implements BiConsumer<Object, Throwable> {
final Future<?> future;
Canceller(Future<?> future) {
this.future = future;
}
public void accept(Object ignore, Throwable ex) {
if (null == ex && null != future && !future.isDone()) {
future.cancel(false);
}
}
}
/**
* 单例延迟调度器仅用于启动和取消任务一个线程就足够
*/
static final class Delayer {
static final ScheduledThreadPoolExecutor delayer;
static ScheduledFuture<?> delay(Runnable command, long delay, TimeUnit unit) {
return delayer.schedule(command, delay, unit);
}
static final class DaemonThreadFactory implements ThreadFactory {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setDaemon(true);
t.setName("CompletableFutureExpandUtilsDelayScheduler");
return t;
}
}
static {
delayer = new ScheduledThreadPoolExecutor(1, new DaemonThreadFactory());
delayer.setRemoveOnCancelPolicy(true);
}
}
}

View File

@ -63,8 +63,7 @@ public class CompletableFutureTimeout {
}
// 哪个先完成 就apply哪一个结果 这是一个关键的API,exceptionally出现异常后返回默认值
public static <T> CompletableFuture<T> completeOnTimeout(T t, CompletableFuture<T> future, long timeout,
TimeUnit unit) {
public static <T> CompletableFuture<T> completeOnTimeout(CompletableFuture<T> future, long timeout, TimeUnit unit, T t) {
final CompletableFuture<T> timeoutFuture = timeoutAfter(timeout, unit);
return future.applyToEither(timeoutFuture, Function.identity()).exceptionally((throwable) -> t);
}

View File

@ -10,6 +10,7 @@ import com.yomahub.liteflow.flow.element.Node;
import com.yomahub.liteflow.flow.element.condition.FinallyCondition;
import com.yomahub.liteflow.flow.element.condition.PreCondition;
import com.yomahub.liteflow.flow.element.condition.WhenCondition;
import com.yomahub.liteflow.flow.parallel.CompletableFutureExpand;
import com.yomahub.liteflow.flow.parallel.CompletableFutureTimeout;
import com.yomahub.liteflow.flow.parallel.ParallelSupplier;
import com.yomahub.liteflow.flow.parallel.WhenFutureObj;
@ -53,11 +54,11 @@ public abstract class ParallelStrategyExecutor {
WhenCondition whenCondition, String currChainName, Integer slotIndex) {
// 套入 CompletableFutureTimeout 方法进行超时判断如果超时则用 WhenFutureObj.timeOut 返回超时的对象
// 2 个参数是主要的本体 CompletableFuture传入了 ParallelSupplier 和线程池对象
return CompletableFutureTimeout.completeOnTimeout(
WhenFutureObj.timeOut(executable.getId()),
return CompletableFutureExpand.completeOnTimeout(
CompletableFuture.supplyAsync(new ParallelSupplier(executable, currChainName, slotIndex), parallelExecutor),
whenCondition.getMaxWaitTime(),
whenCondition.getMaxWaitTimeUnit());
whenCondition.getMaxWaitTimeUnit(),
WhenFutureObj.timeOut(executable.getId()));
}
/**
@ -167,11 +168,11 @@ public abstract class ParallelStrategyExecutor {
* 任务结果处理
* @param whenCondition 并行组件对象
* @param slotIndex 当前 slot index
* @param whenAllTaskList 并行组件中所有任务列表
* @param whenAllFutureList 并行组件中所有任务列表
* @param specifyTask 指定预先完成的任务详见 {@link ParallelStrategyEnum}
* @throws Exception
*/
protected void handleTaskResult(WhenCondition whenCondition, Integer slotIndex, List<CompletableFuture<WhenFutureObj>> whenAllTaskList,
protected void handleTaskResult(WhenCondition whenCondition, Integer slotIndex, List<CompletableFuture<WhenFutureObj>> whenAllFutureList,
CompletableFuture<?> specifyTask) throws Exception {
Slot slot = DataBus.getSlot(slotIndex);
@ -193,17 +194,23 @@ public abstract class ParallelStrategyExecutor {
// 如果 any true那么这里拿到的是第一个完成的任务
// 如果为 must那么这里获取到的就是指定的任务
// 这里过滤和转换一起用 lambda 做了
List<WhenFutureObj> allCompletableWhenFutureObjList = whenAllTaskList.stream().filter(f -> {
List<WhenFutureObj> allCompletableWhenFutureObjList = whenAllFutureList.stream().filter(f -> {
// 过滤出已经完成的没完成的就直接终止
if (f.isDone()) {
return true;
} else {
//事实上CompletableFuture并不能cancel掉底层的线程
f.cancel(true);
return false;
}
}).map(f -> {
try {
return f.get();
WhenFutureObj whenFutureObj = f.get();
if (whenFutureObj.isTimeout()){
//事实上CompletableFuture并不能cancel掉底层的线程
f.cancel(true);
}
return whenFutureObj;
} catch (InterruptedException | ExecutionException e) {
interrupted[0] = true;
return null;

View File

@ -16,7 +16,7 @@ public class BCmp extends NodeComponent {
@Override
public void process() {
try {
Thread.sleep(4000);
Thread.sleep(2000);
}
catch (Exception ignored) {

View File

@ -14,14 +14,11 @@ import org.springframework.stereotype.Component;
public class CCmp extends NodeComponent {
@Override
public void process() {
try {
Thread.sleep(3500);
public void process() throws Exception{
for (int i = 0; i < 10; i++) {
System.out.println("executing cmp c");
Thread.sleep(500);
}
catch (Exception ignored) {
}
System.out.println("CCmp executed!");
}
}