DtpExecutorProps add autoCreate field, and support reset adapter tp prop with global config, and support dynamic update more fields for ExecutorWrapper

This commit is contained in:
yanhom 2024-09-22 21:47:44 +08:00
parent 94b1b53a44
commit eefda154b5
18 changed files with 427 additions and 181 deletions

View File

@ -19,6 +19,7 @@ package org.dromara.dynamictp.adapter.dubbo.apache;
import org.apache.dubbo.common.threadpool.support.eager.EagerThreadPoolExecutor;
import org.apache.dubbo.common.threadpool.support.eager.TaskQueue;
import org.dromara.dynamictp.common.util.ExecutorUtil;
import org.dromara.dynamictp.core.aware.AwareManager;
import org.dromara.dynamictp.core.aware.RejectHandlerAware;
import org.dromara.dynamictp.core.aware.TaskEnhanceAware;
@ -44,7 +45,7 @@ public class EagerThreadPoolExecutorProxy extends EagerThreadPoolExecutor implem
/**
* Reject handler type.
*/
private final String rejectHandlerType;
private String rejectHandlerType;
public EagerThreadPoolExecutorProxy(EagerThreadPoolExecutor executor) {
super(executor.getCorePoolSize(), executor.getMaximumPoolSize(),
@ -65,14 +66,15 @@ public class EagerThreadPoolExecutorProxy extends EagerThreadPoolExecutor implem
@Override
protected void beforeExecute(Thread t, Runnable r) {
super.beforeExecute(t, r);
AwareManager.beforeExecute(this, t, r);
super.beforeExecute(t, r);
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
AwareManager.afterExecute(this, r, t);
ExecutorUtil.tryExecAfterExecute(r, t);
}
@Override
@ -89,4 +91,9 @@ public class EagerThreadPoolExecutorProxy extends EagerThreadPoolExecutor implem
public String getRejectHandlerType() {
return rejectHandlerType;
}
@Override
public void setRejectHandlerType(String rejectHandlerType) {
this.rejectHandlerType = rejectHandlerType;
}
}

View File

@ -18,6 +18,7 @@
package org.dromara.dynamictp.adapter.motan;
import com.weibo.api.motan.transport.netty.StandardThreadExecutor;
import org.dromara.dynamictp.common.util.ExecutorUtil;
import org.dromara.dynamictp.common.util.ReflectionUtil;
import org.dromara.dynamictp.core.aware.AwareManager;
import org.dromara.dynamictp.core.aware.RejectHandlerAware;
@ -42,7 +43,7 @@ public class StandardThreadExecutorProxy extends StandardThreadExecutor implemen
*/
private List<TaskWrapper> taskWrappers;
private final String rejectHandlerType;
private String rejectHandlerType;
public StandardThreadExecutorProxy(StandardThreadExecutor executor) {
super(executor.getCorePoolSize(), executor.getMaximumPoolSize(),
@ -70,14 +71,15 @@ public class StandardThreadExecutorProxy extends StandardThreadExecutor implemen
@Override
protected void beforeExecute(Thread t, Runnable r) {
super.beforeExecute(t, r);
AwareManager.beforeExecute(this, t, r);
super.beforeExecute(t, r);
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
AwareManager.afterExecute(this, r, t);
ExecutorUtil.tryExecAfterExecute(r, t);
}
@Override
@ -94,4 +96,9 @@ public class StandardThreadExecutorProxy extends StandardThreadExecutor implemen
public String getRejectHandlerType() {
return rejectHandlerType;
}
@Override
public void setRejectHandlerType(String rejectHandlerType) {
this.rejectHandlerType = rejectHandlerType;
}
}

View File

@ -50,31 +50,13 @@ public class DtpExecutorProps extends TpExecutorProps {
*/
private boolean fair = false;
/**
* Thread name prefix.
*/
private String threadNamePrefix = "dtp";
/**
* Whether to wait for scheduled tasks to complete on shutdown,
* not interrupting running tasks and executing all tasks in the queue.
*/
private boolean waitForTasksToCompleteOnShutdown = true;
/**
* The maximum number of seconds that this executor is supposed to block
* on shutdown in order to wait for remaining tasks to complete their execution
* before the rest of the container continues to shut down.
*/
private int awaitTerminationSeconds = 3;
/**
* If enhance reject.
*/
private boolean rejectEnhanced = true;
/**
* Plugin names.
*/
private Set<String> pluginNames;
/**
* If false, will not auto create dtpExecutor, default is true.
*/
private boolean autoCreate = true;
}

View File

@ -46,9 +46,9 @@ public class TpExecutorProps {
private String threadPoolAliasName;
/**
* If false, will not auto create dtpExecutor, default is true.
* Thread name prefix.
*/
private boolean autoCreateDtp = true;
private String threadNamePrefix = "dtp";
/**
* CoreSize of ThreadPool.
@ -87,6 +87,11 @@ public class TpExecutorProps {
*/
private String rejectedHandlerType = RejectedTypeEnum.ABORT_POLICY.getName();
/**
* If enhance reject.
*/
private boolean rejectEnhanced = true;
/**
* If allow core thread timeout.
*/
@ -127,6 +132,19 @@ public class TpExecutorProps {
*/
private long queueTimeout = 0;
/**
* Whether to wait for scheduled tasks to complete on shutdown,
* not interrupting running tasks and executing all tasks in the queue.
*/
private boolean waitForTasksToCompleteOnShutdown = true;
/**
* The maximum number of seconds that this executor is supposed to block
* on shutdown in order to wait for remaining tasks to complete their execution
* before the rest of the container continues to shut down.
*/
private int awaitTerminationSeconds = 3;
/**
* Task wrapper names.
*/

View File

@ -0,0 +1,177 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.dromara.dynamictp.common.util;
import cn.hutool.core.util.ReflectUtil;
import lombok.val;
import org.apache.commons.collections4.CollectionUtils;
import org.dromara.dynamictp.common.entity.DtpExecutorProps;
import org.dromara.dynamictp.common.entity.TpExecutorProps;
import org.dromara.dynamictp.common.properties.DtpProperties;
import org.springframework.core.env.Environment;
import java.lang.reflect.Field;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import static org.dromara.dynamictp.common.constant.DynamicTpConst.AWARE_NAMES;
import static org.dromara.dynamictp.common.constant.DynamicTpConst.EXECUTORS_CONFIG_PREFIX;
import static org.dromara.dynamictp.common.constant.DynamicTpConst.GLOBAL_CONFIG_PREFIX;
import static org.dromara.dynamictp.common.constant.DynamicTpConst.MAIN_PROPERTIES_PREFIX;
import static org.dromara.dynamictp.common.constant.DynamicTpConst.NOTIFY_ITEMS;
import static org.dromara.dynamictp.common.constant.DynamicTpConst.PLATFORM_IDS;
/**
* DtpPropertiesBinderUtil related
*
* @author yanhom
* @since 1.1.0
*/
@SuppressWarnings("unchecked")
public final class DtpPropertiesBinderUtil {
private DtpPropertiesBinderUtil() {
}
/**
* Assign global environment variable to property
*
* @param source environment
* @param dtpProperties dtpProperties
*/
public static void tryResetWithGlobalConfig(Object source, DtpProperties dtpProperties) {
if (Objects.isNull(dtpProperties.getGlobalExecutorProps())) {
return;
}
if (CollectionUtils.isNotEmpty(dtpProperties.getExecutors())) {
tryResetCusExecutors(dtpProperties, source);
}
tryResetAdapterExecutors(dtpProperties, source);
}
private static void tryResetCusExecutors(DtpProperties dtpProperties, Object source) {
val dtpPropsFields = ReflectionUtil.getAllFields(DtpExecutorProps.class);
int[] idx = {0};
dtpProperties.getExecutors().forEach(executor -> {
dtpPropsFields.forEach(field -> {
String executorFieldKey = EXECUTORS_CONFIG_PREFIX + idx[0] + "]." + field.getName();
setBasicField(source, field, executor, executorFieldKey);
});
setListField(dtpProperties, executor);
val globalExecutorProps = dtpProperties.getGlobalExecutorProps();
if (CollectionUtils.isEmpty(executor.getPluginNames()) &&
CollectionUtils.isNotEmpty(globalExecutorProps.getPluginNames())) {
executor.setPluginNames(globalExecutorProps.getPluginNames());
}
idx[0]++;
});
}
private static void tryResetAdapterExecutors(DtpProperties dtpProperties, Object source) {
val dtpPropertiesFields = ReflectionUtil.getAllFields(DtpProperties.class);
val tpExecutorPropFields = ReflectionUtil.getAllFields(TpExecutorProps.class);
dtpPropertiesFields.forEach(dtpPropertiesField -> {
val targetObj = ReflectUtil.getFieldValue(dtpProperties, dtpPropertiesField);
if (Objects.isNull(targetObj)) {
return;
}
if (dtpPropertiesField.getType().isAssignableFrom(TpExecutorProps.class)) {
tpExecutorPropFields.forEach(tpField -> setBasicField(source, tpField, dtpPropertiesField.getName(), targetObj));
setListField(dtpProperties, targetObj);
} else if (dtpPropertiesField.getGenericType() instanceof ParameterizedType) {
ParameterizedType paramType = (ParameterizedType) dtpPropertiesField.getGenericType();
Type[] argTypes = paramType.getActualTypeArguments();
if (argTypes.length == 1 && argTypes[0].equals(TpExecutorProps.class)) {
List<TpExecutorProps> tpExecutorProps = (List<TpExecutorProps>) targetObj;
if (CollectionUtils.isEmpty(tpExecutorProps)) {
return;
}
int[] idx = {0};
tpExecutorProps.forEach(tpProp -> {
tpExecutorPropFields.forEach(tpField -> setBasicField(source, tpField, dtpPropertiesField.getName(), tpProp, idx));
setListField(dtpProperties, tpProp);
idx[0]++;
});
}
}
});
}
private static Object getProperty(String key, Object environment) {
if (environment instanceof Environment) {
Environment env = (Environment) environment;
return env.getProperty(key);
} else if (environment instanceof Map) {
Map<?, Object> properties = (Map<?, Object>) environment;
return properties.get(key);
}
return null;
}
private static void setBasicField(Object source, Field tpPropField, String targetObjName, Object targetObj, int[] idx) {
String executorFieldKey = MAIN_PROPERTIES_PREFIX + "." + targetObjName + "[" + idx[0] + "]." + tpPropField.getName();
setBasicField(source, tpPropField, targetObj, executorFieldKey);
}
private static void setBasicField(Object source, Field tpPropField, String targetObjName, Object targetObj) {
String executorFieldKey = MAIN_PROPERTIES_PREFIX + "." + targetObjName + "." + tpPropField.getName();
setBasicField(source, tpPropField, targetObj, executorFieldKey);
}
private static void setBasicField(Object source, Field tpPropField, Object targetObj, String executorFieldKey) {
Object executorFieldVal = getProperty(executorFieldKey, source);
if (Objects.nonNull(executorFieldVal)) {
return;
}
Object globalFieldVal = getProperty(GLOBAL_CONFIG_PREFIX + tpPropField.getName(), source);
if (Objects.isNull(globalFieldVal)) {
return;
}
ReflectUtil.setFieldValue(targetObj, tpPropField.getName(), globalFieldVal);
}
private static void setListField(DtpProperties dtpProperties, Object fieldVal) {
val globalExecutorProps = dtpProperties.getGlobalExecutorProps();
val taskWrappers = (Collection<?>) ReflectUtil.getFieldValue(fieldVal, "taskWrapperNames");
if (CollectionUtils.isEmpty(taskWrappers) &&
CollectionUtils.isNotEmpty(globalExecutorProps.getTaskWrapperNames())) {
ReflectUtil.setFieldValue(fieldVal, "taskWrapperNames", globalExecutorProps.getTaskWrapperNames());
}
val platformIds = (List<?>) ReflectUtil.getFieldValue(fieldVal, PLATFORM_IDS);
if (CollectionUtils.isEmpty(platformIds) &&
CollectionUtils.isNotEmpty(globalExecutorProps.getPlatformIds())) {
ReflectUtil.setFieldValue(fieldVal, PLATFORM_IDS, globalExecutorProps.getPlatformIds());
}
val notifyItems = (List<?>) ReflectUtil.getFieldValue(fieldVal, NOTIFY_ITEMS);
if (CollectionUtils.isEmpty(notifyItems) &&
CollectionUtils.isNotEmpty(globalExecutorProps.getNotifyItems())) {
ReflectUtil.setFieldValue(fieldVal, NOTIFY_ITEMS, globalExecutorProps.getNotifyItems());
}
val awareNames = (List<?>) ReflectUtil.getFieldValue(fieldVal, AWARE_NAMES);
if (CollectionUtils.isEmpty(awareNames) &&
CollectionUtils.isNotEmpty(globalExecutorProps.getAwareNames())) {
ReflectUtil.setFieldValue(fieldVal, AWARE_NAMES, globalExecutorProps.getAwareNames());
}
}
}

View File

@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.dromara.dynamictp.common.util;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.MDC;
import java.util.Objects;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import static org.dromara.dynamictp.common.constant.DynamicTpConst.TRACE_ID;
/**
* ExecutorUtil related
*
* @author yanhom
* @since 1.1.9
*/
@Slf4j
public final class ExecutorUtil {
private ExecutorUtil() {
}
public static void tryExecAfterExecute(Runnable r, Throwable t) {
tryPrintError(r, t);
tryClearContext();
}
private static void tryPrintError(Runnable r, Throwable t) {
if (Objects.nonNull(t)) {
log.error("DynamicTp execute, thread {} throw exception, traceId {}",
Thread.currentThread(), MDC.get(TRACE_ID), t);
return;
}
if (r instanceof FutureTask) {
try {
Future<?> future = (Future<?>) r;
future.get();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (Exception e) {
log.error("DynamicTp execute, thread {} throw exception, traceId {}",
Thread.currentThread(), MDC.get(TRACE_ID), e);
}
}
}
public static void tryClearContext() {
MDC.remove(TRACE_ID);
}
}

View File

@ -36,6 +36,7 @@ import org.dromara.dynamictp.common.util.StreamUtil;
import org.dromara.dynamictp.core.aware.AwareManager;
import org.dromara.dynamictp.core.converter.ExecutorConverter;
import org.dromara.dynamictp.core.executor.DtpExecutor;
import org.dromara.dynamictp.core.executor.NamedThreadFactory;
import org.dromara.dynamictp.core.notifier.manager.NoticeManager;
import org.dromara.dynamictp.core.notifier.manager.NotifyHelper;
import org.dromara.dynamictp.core.reject.RejectHandlerGetter;
@ -45,6 +46,7 @@ import org.dromara.dynamictp.core.support.task.wrapper.TaskWrapper;
import org.dromara.dynamictp.core.support.task.wrapper.TaskWrappers;
import org.springframework.context.event.ContextRefreshedEvent;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@ -244,9 +246,8 @@ public class DtpRegistry extends OnceApplicationContextEventListener {
String currentRejectHandlerType = executor.getRejectHandlerType();
if (!Objects.equals(currentRejectHandlerType, props.getRejectedHandlerType())) {
val rejectHandler = RejectHandlerGetter.buildRejectedHandler(props.getRejectedHandlerType());
executor.setRejectedExecutionHandler(rejectHandler);
executorWrapper.setRejectHandler(rejectHandler);
}
executorWrapper.setPreStartAllCoreThreads(props.isPreStartAllCoreThreads());
List<TaskWrapper> taskWrappers = TaskWrappers.getInstance().getByNames(props.getTaskWrapperNames());
executorWrapper.setTaskWrappers(taskWrappers);
@ -255,6 +256,7 @@ public class DtpRegistry extends OnceApplicationContextEventListener {
NotifyHelper.updateNotifyInfo(executorWrapper, props, dtpProperties.getPlatforms());
// update aware related
AwareManager.refresh(executorWrapper, props);
updateWrapper(executorWrapper, props);
}
private static void doRefreshDtp(ExecutorWrapper executorWrapper, DtpExecutorProps props) {
@ -263,14 +265,29 @@ public class DtpRegistry extends OnceApplicationContextEventListener {
if (StringUtils.isNotBlank(props.getThreadPoolAliasName())) {
executor.setThreadPoolAliasName(props.getThreadPoolAliasName());
}
executor.setPreStartAllCoreThreads(props.isPreStartAllCoreThreads());
if (executor.getThreadFactory() instanceof NamedThreadFactory) {
String prefix = ((NamedThreadFactory) executor.getThreadFactory()).getNamePrefix();
if (!Objects.equals(prefix, props.getThreadNamePrefix())) {
((NamedThreadFactory) executor.getThreadFactory()).setNamePrefix(props.getThreadNamePrefix());
}
}
// update reject handler
executor.setRejectEnhanced(props.isRejectEnhanced());
if (!Objects.equals(executor.getRejectHandlerType(), props.getRejectedHandlerType())) {
executor.setRejectHandler(RejectHandlerGetter.buildRejectedHandler(props.getRejectedHandlerType()));
}
// update timeout related
executor.setRunTimeout(props.getRunTimeout());
executor.setQueueTimeout(props.getQueueTimeout());
executor.setTryInterrupt(props.isTryInterrupt());
// update shutdown related
executor.setWaitForTasksToCompleteOnShutdown(props.isWaitForTasksToCompleteOnShutdown());
executor.setAwaitTerminationSeconds(props.getAwaitTerminationSeconds());
executor.setPreStartAllCoreThreads(props.isPreStartAllCoreThreads());
List<TaskWrapper> taskWrappers = TaskWrappers.getInstance().getByNames(props.getTaskWrapperNames());
executor.setTaskWrappers(taskWrappers);
@ -278,14 +295,18 @@ public class DtpRegistry extends OnceApplicationContextEventListener {
NotifyHelper.updateNotifyInfo(executor, props, dtpProperties.getPlatforms());
// update aware related
AwareManager.refresh(executorWrapper, props);
updateWrapper(executorWrapper, executor);
updateWrapper(executorWrapper, props);
}
private static void updateWrapper(ExecutorWrapper executorWrapper, DtpExecutor executor) {
executorWrapper.setThreadPoolAliasName(executor.getThreadPoolAliasName());
executorWrapper.setNotifyItems(executor.getNotifyItems());
executorWrapper.setPlatformIds(executor.getPlatformIds());
executorWrapper.setNotifyEnabled(executor.isNotifyEnabled());
private static void updateWrapper(ExecutorWrapper executorWrapper, DtpExecutorProps props) {
executorWrapper.setThreadPoolAliasName(props.getThreadPoolAliasName());
executorWrapper.setNotifyItems(props.getNotifyItems());
executorWrapper.setPlatformIds(props.getPlatformIds());
executorWrapper.setNotifyEnabled(props.isNotifyEnabled());
executorWrapper.setPreStartAllCoreThreads(props.isPreStartAllCoreThreads());
executorWrapper.setRejectEnhanced(props.isRejectEnhanced());
executorWrapper.setWaitForTasksToCompleteOnShutdown(props.isWaitForTasksToCompleteOnShutdown());
executorWrapper.setAwaitTerminationSeconds(props.getAwaitTerminationSeconds());
}
/**
@ -336,17 +357,17 @@ public class DtpRegistry extends OnceApplicationContextEventListener {
@Override
protected void onContextRefreshedEvent(ContextRefreshedEvent event) {
val executors = Optional.ofNullable(dtpProperties.getExecutors()).orElse(Collections.emptyList());
Set<String> remoteExecutors = Collections.emptySet();
if (CollectionUtils.isNotEmpty(executors)) {
remoteExecutors = executors.stream()
.map(DtpExecutorProps::getThreadPoolName)
.collect(Collectors.toSet());
}
val registeredExecutors = Sets.newHashSet(EXECUTOR_REGISTRY.keySet());
Collection<String> remoteExecutors = Collections.emptySet();
if (CollectionUtils.isNotEmpty(executors)) {
remoteExecutors = CollectionUtils.intersection(executors.stream()
.map(DtpExecutorProps::getThreadPoolName)
.collect(Collectors.toSet()), registeredExecutors);
}
val localExecutors = CollectionUtils.subtract(registeredExecutors, remoteExecutors);
// refresh just for non-dtp executors
val nonDtpExecutors = executors.stream().filter(e -> !e.isAutoCreateDtp()).collect(toList());
val nonDtpExecutors = executors.stream().filter(e -> !e.isAutoCreate()).collect(toList());
if (CollectionUtils.isNotEmpty(nonDtpExecutors)) {
nonDtpExecutors.forEach(DtpRegistry::refresh);
}

View File

@ -31,4 +31,13 @@ public interface RejectHandlerAware extends DtpAware {
* @return reject handler type
*/
String getRejectHandlerType();
/**
* Set reject handler type.
*
* @param rejectHandlerType reject handler type
*/
default void setRejectHandlerType(String rejectHandlerType) {
}
}

View File

@ -23,6 +23,7 @@ import lombok.extern.slf4j.Slf4j;
import lombok.val;
import org.dromara.dynamictp.common.em.NotifyItemEnum;
import org.dromara.dynamictp.common.entity.NotifyItem;
import org.dromara.dynamictp.common.util.ExecutorUtil;
import org.dromara.dynamictp.core.aware.AwareManager;
import org.dromara.dynamictp.core.aware.TaskEnhanceAware;
import org.dromara.dynamictp.core.notifier.manager.NotifyHelper;
@ -30,22 +31,16 @@ import org.dromara.dynamictp.core.reject.RejectHandlerGetter;
import org.dromara.dynamictp.core.spring.SpringExecutor;
import org.dromara.dynamictp.core.support.ExecutorAdapter;
import org.dromara.dynamictp.core.support.task.wrapper.TaskWrapper;
import org.slf4j.MDC;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import static org.dromara.dynamictp.common.constant.DynamicTpConst.TRACE_ID;
/**
* Dynamic ThreadPoolExecutor, extending ThreadPoolExecutor, implements some new features
*
@ -197,16 +192,15 @@ public class DtpExecutor extends ThreadPoolExecutor
@Override
protected void beforeExecute(Thread t, Runnable r) {
super.beforeExecute(t, r);
AwareManager.beforeExecute(this, t, r);
super.beforeExecute(t, r);
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
AwareManager.afterExecute(this, r, t);
tryPrintError(r, t);
clearContext();
ExecutorUtil.tryExecAfterExecute(r, t);
}
@Override
@ -243,29 +237,6 @@ public class DtpExecutor extends ThreadPoolExecutor
setRejectedExecutionHandler(RejectHandlerGetter.getProxy(handler));
}
private void tryPrintError(Runnable r, Throwable t) {
if (Objects.nonNull(t)) {
log.error("DynamicTp execute, thread {} throw exception, traceId {}",
Thread.currentThread(), MDC.get(TRACE_ID), t);
return;
}
if (r instanceof FutureTask) {
try {
Future<?> future = (Future<?>) r;
future.get();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (Exception e) {
log.error("DynamicTp execute, thread {} throw exception, traceId {}",
Thread.currentThread(), MDC.get(TRACE_ID), e);
}
}
}
private void clearContext() {
MDC.remove(TRACE_ID);
}
public String getThreadPoolName() {
return threadPoolName;
}
@ -405,4 +376,9 @@ public class DtpExecutor extends ThreadPoolExecutor
public void setAllowCoreThreadTimeOut(boolean allowCoreThreadTimeOut) {
allowCoreThreadTimeOut(allowCoreThreadTimeOut);
}
@Override
public void preStartAllCoreThreads() {
super.prestartAllCoreThreads();
}
}

View File

@ -31,9 +31,9 @@ import java.util.concurrent.atomic.AtomicInteger;
@Slf4j
public class NamedThreadFactory implements ThreadFactory {
private final ThreadGroup group;
private String namePrefix;
private final String namePrefix;
private final ThreadGroup group;
/**
* is daemon thread.
@ -78,4 +78,8 @@ public class NamedThreadFactory implements ThreadFactory {
public String getNamePrefix() {
return namePrefix;
}
public void setNamePrefix(String namePrefix) {
this.namePrefix = namePrefix;
}
}

View File

@ -84,12 +84,12 @@ public class DtpBeanDefinitionRegistrar implements ImportBeanDefinitionRegistrar
BinderHelper.bindDtpProperties(environment, dtpProperties);
val executors = dtpProperties.getExecutors();
if (CollectionUtils.isEmpty(executors)) {
log.warn("DynamicTp registrar, no executors are configured.");
log.info("DynamicTp registrar, no executors are configured.");
return;
}
executors.forEach(e -> {
if (!e.isAutoCreateDtp()) {
if (!e.isAutoCreate()) {
return;
}
Class<?> executorTypeClass = ExecutorType.getClass(e.getExecutorType());

View File

@ -18,7 +18,6 @@
package org.dromara.dynamictp.core.support;
import lombok.extern.slf4j.Slf4j;
import org.dromara.dynamictp.core.executor.DtpExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.Objects;
@ -55,23 +54,15 @@ public class DtpLifecycleSupport {
* @param executorWrapper executor wrapper
*/
public static void destroy(ExecutorWrapper executorWrapper) {
if (executorWrapper.isDtpExecutor()) {
destroy((DtpExecutor) executorWrapper.getExecutor());
} else if (executorWrapper.isThreadPoolExecutor()) {
internalShutdown(((ThreadPoolExecutorAdapter) executorWrapper.getExecutor()).getOriginal(),
if (executorWrapper.isExecutorService()) {
ExecutorService executorService = (ExecutorService) executorWrapper.getExecutor().getOriginal();
internalShutdown(executorService,
executorWrapper.getThreadPoolName(),
true,
0);
executorWrapper.isWaitForTasksToCompleteOnShutdown(),
executorWrapper.getAwaitTerminationSeconds());
}
}
public static void destroy(DtpExecutor executor) {
internalShutdown(executor,
executor.getThreadPoolName(),
executor.isWaitForTasksToCompleteOnShutdown(),
executor.getAwaitTerminationSeconds());
}
public static void shutdownGracefulAsync(ExecutorService executor,
String threadPoolName,
int timeout) {

View File

@ -22,16 +22,20 @@ import lombok.Data;
import org.dromara.dynamictp.common.em.NotifyItemEnum;
import org.dromara.dynamictp.common.entity.NotifyItem;
import org.dromara.dynamictp.core.aware.AwareManager;
import org.dromara.dynamictp.core.aware.RejectHandlerAware;
import org.dromara.dynamictp.core.aware.TaskEnhanceAware;
import org.dromara.dynamictp.core.executor.DtpExecutor;
import org.dromara.dynamictp.core.notifier.capture.CapturedExecutor;
import org.dromara.dynamictp.core.notifier.manager.AlarmManager;
import org.dromara.dynamictp.core.reject.RejectHandlerGetter;
import org.dromara.dynamictp.core.support.task.wrapper.TaskWrapper;
import org.springframework.beans.BeanUtils;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
/**
@ -79,15 +83,33 @@ public class ExecutorWrapper {
private boolean preStartAllCoreThreads;
/**
* Thread pool stat provider
* If enhance reject.
*/
private ThreadPoolStatProvider threadPoolStatProvider;
private boolean rejectEnhanced = true;
/**
* Aware names
*/
private Set<String> awareNames = Sets.newHashSet();
/**
* Whether to wait for scheduled tasks to complete on shutdown,
* not interrupting running tasks and executing all tasks in the queue.
*/
protected boolean waitForTasksToCompleteOnShutdown = false;
/**
* The maximum number of seconds that this executor is supposed to block
* on shutdown in order to wait for remaining tasks to complete their execution
* before the rest of the container continues to shut down.
*/
protected int awaitTerminationSeconds = 0;
/**
* Thread pool stat provider
*/
private ThreadPoolStatProvider threadPoolStatProvider;
private ExecutorWrapper() {
}
@ -97,14 +119,17 @@ public class ExecutorWrapper {
* @param executor the DtpExecutor
*/
public ExecutorWrapper(DtpExecutor executor) {
this.executor = executor;
this.threadPoolName = executor.getThreadPoolName();
this.threadPoolAliasName = executor.getThreadPoolAliasName();
this.executor = executor;
this.notifyItems = executor.getNotifyItems();
this.notifyEnabled = executor.isNotifyEnabled();
this.platformIds = executor.getPlatformIds();
this.awareNames = executor.getAwareNames();
this.preStartAllCoreThreads = executor.isPreStartAllCoreThreads();
this.rejectEnhanced = executor.isRejectEnhanced();
this.waitForTasksToCompleteOnShutdown = executor.isWaitForTasksToCompleteOnShutdown();
this.awaitTerminationSeconds = executor.getAwaitTerminationSeconds();
this.threadPoolStatProvider = ThreadPoolStatProvider.of(this);
}
@ -155,8 +180,7 @@ public class ExecutorWrapper {
*/
public void initialize() {
if (isDtpExecutor()) {
DtpExecutor dtpExecutor = (DtpExecutor) getExecutor();
dtpExecutor.initialize();
((DtpExecutor) getExecutor()).initialize();
AwareManager.register(this);
} else if (isThreadPoolExecutor()) {
AwareManager.register(this);
@ -175,6 +199,10 @@ public class ExecutorWrapper {
return this.executor instanceof DtpExecutor;
}
public boolean isExecutorService() {
return this.executor.getOriginal() instanceof ExecutorService;
}
/**
* whether is ThreadPoolExecutor
*
@ -194,4 +222,16 @@ public class ExecutorWrapper {
((TaskEnhanceAware) executor.getOriginal()).setTaskWrappers(taskWrappers);
}
}
public void setRejectHandler(RejectedExecutionHandler handler) {
String rejectHandlerType = handler.getClass().getSimpleName();
if (executor.getOriginal() instanceof RejectHandlerAware) {
((RejectHandlerAware) executor.getOriginal()).setRejectHandlerType(rejectHandlerType);
}
if (isRejectEnhanced()) {
executor.setRejectedExecutionHandler(RejectHandlerGetter.getProxy(handler));
} else {
executor.setRejectedExecutionHandler(handler);
}
}
}

View File

@ -17,11 +17,13 @@
package org.dromara.dynamictp.core.support;
import org.dromara.dynamictp.common.util.ExecutorUtil;
import org.dromara.dynamictp.core.aware.AwareManager;
import org.dromara.dynamictp.core.aware.RejectHandlerAware;
import org.dromara.dynamictp.core.aware.TaskEnhanceAware;
import org.dromara.dynamictp.core.reject.RejectHandlerGetter;
import org.dromara.dynamictp.core.support.task.wrapper.TaskWrapper;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledFuture;
@ -45,7 +47,7 @@ public class ScheduledThreadPoolExecutorProxy extends ScheduledThreadPoolExecuto
/**
* Reject handler type.
*/
private final String rejectHandlerType;
private String rejectHandlerType;
public ScheduledThreadPoolExecutorProxy(ScheduledThreadPoolExecutor executor) {
super(executor.getCorePoolSize(), executor.getThreadFactory());
@ -89,14 +91,15 @@ public class ScheduledThreadPoolExecutorProxy extends ScheduledThreadPoolExecuto
@Override
protected void beforeExecute(Thread t, Runnable r) {
super.beforeExecute(t, r);
AwareManager.beforeExecute(this, t, r);
super.beforeExecute(t, r);
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
AwareManager.afterExecute(this, r, t);
ExecutorUtil.tryExecAfterExecute(r, t);
}
@Override
@ -104,6 +107,11 @@ public class ScheduledThreadPoolExecutorProxy extends ScheduledThreadPoolExecuto
return rejectHandlerType;
}
@Override
public void setRejectHandlerType(String rejectHandlerType) {
this.rejectHandlerType = rejectHandlerType;
}
@Override
public List<TaskWrapper> getTaskWrappers() {
return taskWrappers;

View File

@ -17,6 +17,7 @@
package org.dromara.dynamictp.core.support;
import org.dromara.dynamictp.common.util.ExecutorUtil;
import org.dromara.dynamictp.core.aware.AwareManager;
import org.dromara.dynamictp.core.aware.RejectHandlerAware;
import org.dromara.dynamictp.core.aware.TaskEnhanceAware;
@ -43,7 +44,7 @@ public class ThreadPoolExecutorProxy extends ThreadPoolExecutor implements TaskE
/**
* Reject handler type.
*/
private final String rejectHandlerType;
private String rejectHandlerType;
public ThreadPoolExecutorProxy(ThreadPoolExecutor executor) {
super(executor.getCorePoolSize(), executor.getMaximumPoolSize(),
@ -63,14 +64,15 @@ public class ThreadPoolExecutorProxy extends ThreadPoolExecutor implements TaskE
@Override
protected void beforeExecute(Thread t, Runnable r) {
super.beforeExecute(t, r);
AwareManager.beforeExecute(this, t, r);
super.beforeExecute(t, r);
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
AwareManager.afterExecute(this, r, t);
ExecutorUtil.tryExecAfterExecute(r, t);
}
@Override
@ -87,4 +89,9 @@ public class ThreadPoolExecutorProxy extends ThreadPoolExecutor implements TaskE
public String getRejectHandlerType() {
return rejectHandlerType;
}
@Override
public void setRejectHandlerType(String rejectHandlerType) {
this.rejectHandlerType = rejectHandlerType;
}
}

View File

@ -18,6 +18,7 @@
package org.dromara.dynamictp.core.support.task.runnable;
import lombok.extern.slf4j.Slf4j;
import org.dromara.dynamictp.common.util.ExecutorUtil;
import org.dromara.dynamictp.core.aware.AwareManager;
import java.util.Objects;
import java.util.concurrent.Executor;
@ -58,6 +59,7 @@ public class EnhancedRunnable implements Runnable {
throw e;
} finally {
AwareManager.afterExecute(executor, runnable, t);
ExecutorUtil.tryExecAfterExecute(runnable, t);
}
}
}

View File

@ -20,6 +20,7 @@ package org.dromara.dynamictp.starter.adapter.webserver.tomcat;
import lombok.extern.slf4j.Slf4j;
import org.apache.tomcat.util.threads.TaskQueue;
import org.apache.tomcat.util.threads.ThreadPoolExecutor;
import org.dromara.dynamictp.common.util.ExecutorUtil;
import org.dromara.dynamictp.common.util.ReflectionUtil;
import org.dromara.dynamictp.core.aware.AwareManager;
import org.dromara.dynamictp.core.aware.RejectHandlerAware;
@ -86,14 +87,15 @@ public class TomcatExecutorProxy extends ThreadPoolExecutor implements TaskEnhan
@Override
protected void beforeExecute(Thread t, Runnable r) {
super.beforeExecute(t, r);
AwareManager.beforeExecute(this, t, r);
super.beforeExecute(t, r);
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
AwareManager.afterExecute(this, r, t);
ExecutorUtil.tryExecAfterExecute(r, t);
}
@Override

View File

@ -17,14 +17,10 @@
package org.dromara.dynamictp.starter.common.binder;
import cn.hutool.core.util.ReflectUtil;
import lombok.extern.slf4j.Slf4j;
import lombok.val;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.dromara.dynamictp.common.entity.DtpExecutorProps;
import org.dromara.dynamictp.common.properties.DtpProperties;
import org.dromara.dynamictp.common.util.ReflectionUtil;
import org.dromara.dynamictp.common.util.DtpPropertiesBinderUtil;
import org.dromara.dynamictp.core.spring.PropertiesBinder;
import org.springframework.beans.MutablePropertyValues;
import org.springframework.beans.PropertyValues;
@ -39,10 +35,7 @@ import org.springframework.core.env.PropertyResolver;
import java.lang.reflect.Constructor;
import java.lang.reflect.Method;
import java.util.Map;
import java.util.Objects;
import static org.dromara.dynamictp.common.constant.DynamicTpConst.EXECUTORS_CONFIG_PREFIX;
import static org.dromara.dynamictp.common.constant.DynamicTpConst.GLOBAL_CONFIG_PREFIX;
import static org.dromara.dynamictp.common.constant.DynamicTpConst.MAIN_PROPERTIES_PREFIX;
/**
@ -52,7 +45,6 @@ import static org.dromara.dynamictp.common.constant.DynamicTpConst.MAIN_PROPERTI
* @since 1.0.3
**/
@Slf4j
@SuppressWarnings("all")
public class SpringBootPropertiesBinder implements PropertiesBinder {
@Override
@ -81,7 +73,7 @@ public class SpringBootPropertiesBinder implements PropertiesBinder {
@Override
public void afterBind(Object source, DtpProperties dtpProperties) {
tryResetWithGlobalConfig(source, dtpProperties);
DtpPropertiesBinderUtil.tryResetWithGlobalConfig(source, dtpProperties);
}
private void doBindIn2X(Map<?, Object> properties, DtpProperties dtpProperties) {
@ -132,70 +124,5 @@ public class SpringBootPropertiesBinder implements PropertiesBinder {
throw new RuntimeException(e);
}
}
/**
* Assign global environment variable to property
*
* @param environment
* @param dtpProperties
*/
private void tryResetWithGlobalConfig(Object source, DtpProperties dtpProperties) {
if (Objects.isNull(dtpProperties.getGlobalExecutorProps()) ||
CollectionUtils.isEmpty(dtpProperties.getExecutors())) {
return;
}
val fields = ReflectionUtil.getAllFields(DtpExecutorProps.class);
if (CollectionUtils.isEmpty(fields)) {
return;
}
final int[] executorIndex = {0};
dtpProperties.getExecutors().forEach(executor -> {
fields.forEach(field -> {
Object executorFieldVal = getProperty(EXECUTORS_CONFIG_PREFIX + executorIndex[0] + "]." + field.getName(), source);
if (Objects.nonNull(executorFieldVal)) {
return;
}
Object globalFieldVal = getProperty(GLOBAL_CONFIG_PREFIX + field.getName(), source);
if (Objects.isNull(globalFieldVal)) {
return;
}
ReflectUtil.setFieldValue(executor, field, globalFieldVal);
});
val globalExecutorProps = dtpProperties.getGlobalExecutorProps();
if (CollectionUtils.isEmpty(executor.getTaskWrapperNames()) &&
CollectionUtils.isNotEmpty(globalExecutorProps.getTaskWrapperNames())) {
executor.setTaskWrapperNames(globalExecutorProps.getTaskWrapperNames());
}
if (CollectionUtils.isEmpty(executor.getPlatformIds()) &&
CollectionUtils.isNotEmpty(globalExecutorProps.getPlatformIds())) {
executor.setPlatformIds(globalExecutorProps.getPlatformIds());
}
if (CollectionUtils.isEmpty(executor.getNotifyItems()) &&
CollectionUtils.isNotEmpty(globalExecutorProps.getNotifyItems())) {
executor.setNotifyItems(globalExecutorProps.getNotifyItems());
}
if (CollectionUtils.isEmpty(executor.getAwareNames()) &&
CollectionUtils.isNotEmpty(globalExecutorProps.getAwareNames())) {
executor.setAwareNames(globalExecutorProps.getAwareNames());
}
if (CollectionUtils.isEmpty(executor.getPluginNames()) &&
CollectionUtils.isNotEmpty(globalExecutorProps.getPluginNames())) {
executor.setPluginNames(globalExecutorProps.getPluginNames());
}
executorIndex[0]++;
});
}
private Object getProperty(String key, Object environment) {
if (environment instanceof Environment) {
Environment env = (Environment) environment;
return env.getProperty(key);
} else if (environment instanceof Map) {
Map<?, Object> properties = (Map<?, Object>) environment;
return properties.get(key);
}
return null;
}
}