mirror of https://github.com/apache/dubbo.git
Add error code 2-2, 6-1, 6-2, 0-1. (#10391)
* Replace logger implementation of dubbo-remoting-nettyX module. * Replace logger implementation of dubbo-remoting-nettyX module. * Add error code 2-2 (in AbstractDirectory), 6-1 and 6-2 (in Netty implementation). * Swap the position of modifiers. * Adding error code 0-1, and expanding the blocking-queue- selecting logic. * Adding message of error code 0-1. * Change created blocking queue's variable name in FixedThreadPool.
This commit is contained in:
parent
5482f3b68d
commit
a36c21c277
|
@ -20,7 +20,7 @@ import org.apache.dubbo.common.URL;
|
||||||
import org.apache.dubbo.common.Version;
|
import org.apache.dubbo.common.Version;
|
||||||
import org.apache.dubbo.common.config.Configuration;
|
import org.apache.dubbo.common.config.Configuration;
|
||||||
import org.apache.dubbo.common.config.ConfigurationUtils;
|
import org.apache.dubbo.common.config.ConfigurationUtils;
|
||||||
import org.apache.dubbo.common.logger.Logger;
|
import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
|
||||||
import org.apache.dubbo.common.logger.LoggerFactory;
|
import org.apache.dubbo.common.logger.LoggerFactory;
|
||||||
import org.apache.dubbo.common.threadpool.manager.FrameworkExecutorRepository;
|
import org.apache.dubbo.common.threadpool.manager.FrameworkExecutorRepository;
|
||||||
import org.apache.dubbo.common.utils.ConcurrentHashSet;
|
import org.apache.dubbo.common.utils.ConcurrentHashSet;
|
||||||
|
@ -71,7 +71,7 @@ import static org.apache.dubbo.rpc.cluster.Constants.REFER_KEY;
|
||||||
public abstract class AbstractDirectory<T> implements Directory<T> {
|
public abstract class AbstractDirectory<T> implements Directory<T> {
|
||||||
|
|
||||||
// logger
|
// logger
|
||||||
private static final Logger logger = LoggerFactory.getLogger(AbstractDirectory.class);
|
private static final ErrorTypeAwareLogger logger = LoggerFactory.getErrorTypeAwareLogger(AbstractDirectory.class);
|
||||||
|
|
||||||
private final URL url;
|
private final URL url;
|
||||||
|
|
||||||
|
@ -193,7 +193,10 @@ public abstract class AbstractDirectory<T> implements Directory<T> {
|
||||||
|
|
||||||
List<Invoker<T>> routedResult = doList(availableInvokers, invocation);
|
List<Invoker<T>> routedResult = doList(availableInvokers, invocation);
|
||||||
if (routedResult.isEmpty()) {
|
if (routedResult.isEmpty()) {
|
||||||
logger.warn("No provider available after connectivity filter for the service " + getConsumerUrl().getServiceKey()
|
// 2-2 - No provider available.
|
||||||
|
|
||||||
|
logger.warn("2-2", "provider server or registry center crashed", "",
|
||||||
|
"No provider available after connectivity filter for the service " + getConsumerUrl().getServiceKey()
|
||||||
+ " All validInvokers' size: " + validInvokers.size()
|
+ " All validInvokers' size: " + validInvokers.size()
|
||||||
+ " All routed invokers' size: " + routedResult.size()
|
+ " All routed invokers' size: " + routedResult.size()
|
||||||
+ " All invokers' size: " + invokers.size()
|
+ " All invokers' size: " + invokers.size()
|
||||||
|
|
|
@ -14,10 +14,11 @@
|
||||||
* See the License for the specific language governing permissions and
|
* See the License for the specific language governing permissions and
|
||||||
* limitations under the License.
|
* limitations under the License.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
package org.apache.dubbo.common.threadpool.support;
|
package org.apache.dubbo.common.threadpool.support;
|
||||||
|
|
||||||
import org.apache.dubbo.common.URL;
|
import org.apache.dubbo.common.URL;
|
||||||
import org.apache.dubbo.common.logger.Logger;
|
import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
|
||||||
import org.apache.dubbo.common.logger.LoggerFactory;
|
import org.apache.dubbo.common.logger.LoggerFactory;
|
||||||
import org.apache.dubbo.common.threadpool.event.ThreadPoolExhaustedEvent;
|
import org.apache.dubbo.common.threadpool.event.ThreadPoolExhaustedEvent;
|
||||||
import org.apache.dubbo.common.threadpool.event.ThreadPoolExhaustedListener;
|
import org.apache.dubbo.common.threadpool.event.ThreadPoolExhaustedListener;
|
||||||
|
@ -47,7 +48,7 @@ import static org.apache.dubbo.common.constants.CommonConstants.OS_WIN_PREFIX;
|
||||||
*/
|
*/
|
||||||
public class AbortPolicyWithReport extends ThreadPoolExecutor.AbortPolicy {
|
public class AbortPolicyWithReport extends ThreadPoolExecutor.AbortPolicy {
|
||||||
|
|
||||||
protected static final Logger logger = LoggerFactory.getLogger(AbortPolicyWithReport.class);
|
protected static final ErrorTypeAwareLogger logger = LoggerFactory.getErrorTypeAwareLogger(AbortPolicyWithReport.class);
|
||||||
|
|
||||||
private final String threadName;
|
private final String threadName;
|
||||||
|
|
||||||
|
@ -82,9 +83,13 @@ public class AbortPolicyWithReport extends ThreadPoolExecutor.AbortPolicy {
|
||||||
e.getLargestPoolSize(),
|
e.getLargestPoolSize(),
|
||||||
e.getTaskCount(), e.getCompletedTaskCount(), e.isShutdown(), e.isTerminated(), e.isTerminating(),
|
e.getTaskCount(), e.getCompletedTaskCount(), e.isShutdown(), e.isTerminated(), e.isTerminating(),
|
||||||
url.getProtocol(), url.getIp(), url.getPort());
|
url.getProtocol(), url.getIp(), url.getPort());
|
||||||
logger.warn(msg);
|
|
||||||
|
// 0-1 - Thread pool is EXHAUSTED!
|
||||||
|
logger.warn("0-1", "too much client requesting provider", "", msg);
|
||||||
|
|
||||||
dumpJStack();
|
dumpJStack();
|
||||||
dispatchThreadPoolExhaustedEvent(msg);
|
dispatchThreadPoolExhaustedEvent(msg);
|
||||||
|
|
||||||
throw new RejectedExecutionException(msg);
|
throw new RejectedExecutionException(msg);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -51,7 +51,7 @@ public class EagerThreadPool implements ThreadPool {
|
||||||
int alive = url.getParameter(ALIVE_KEY, DEFAULT_ALIVE);
|
int alive = url.getParameter(ALIVE_KEY, DEFAULT_ALIVE);
|
||||||
|
|
||||||
// init queue and executor
|
// init queue and executor
|
||||||
TaskQueue<Runnable> taskQueue = new TaskQueue<Runnable>(queues <= 0 ? 1 : queues);
|
TaskQueue<Runnable> taskQueue = new TaskQueue<>(queues <= 0 ? 1 : queues);
|
||||||
EagerThreadPoolExecutor executor = new EagerThreadPoolExecutor(cores,
|
EagerThreadPoolExecutor executor = new EagerThreadPoolExecutor(cores,
|
||||||
threads,
|
threads,
|
||||||
alive,
|
alive,
|
||||||
|
|
|
@ -22,6 +22,7 @@ import org.apache.dubbo.common.threadpool.MemorySafeLinkedBlockingQueue;
|
||||||
import org.apache.dubbo.common.threadpool.ThreadPool;
|
import org.apache.dubbo.common.threadpool.ThreadPool;
|
||||||
import org.apache.dubbo.common.threadpool.support.AbortPolicyWithReport;
|
import org.apache.dubbo.common.threadpool.support.AbortPolicyWithReport;
|
||||||
|
|
||||||
|
import java.util.concurrent.BlockingQueue;
|
||||||
import java.util.concurrent.Executor;
|
import java.util.concurrent.Executor;
|
||||||
import java.util.concurrent.LinkedBlockingQueue;
|
import java.util.concurrent.LinkedBlockingQueue;
|
||||||
import java.util.concurrent.SynchronousQueue;
|
import java.util.concurrent.SynchronousQueue;
|
||||||
|
@ -47,10 +48,18 @@ public class FixedThreadPool implements ThreadPool {
|
||||||
String name = url.getParameter(THREAD_NAME_KEY, (String) url.getAttribute(THREAD_NAME_KEY, DEFAULT_THREAD_NAME));
|
String name = url.getParameter(THREAD_NAME_KEY, (String) url.getAttribute(THREAD_NAME_KEY, DEFAULT_THREAD_NAME));
|
||||||
int threads = url.getParameter(THREADS_KEY, DEFAULT_THREADS);
|
int threads = url.getParameter(THREADS_KEY, DEFAULT_THREADS);
|
||||||
int queues = url.getParameter(QUEUES_KEY, DEFAULT_QUEUES);
|
int queues = url.getParameter(QUEUES_KEY, DEFAULT_QUEUES);
|
||||||
return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS,
|
|
||||||
queues == 0 ? new SynchronousQueue<Runnable>() :
|
BlockingQueue<Runnable> blockingQueue;
|
||||||
(queues < 0 ? new MemorySafeLinkedBlockingQueue<Runnable>()
|
|
||||||
: new LinkedBlockingQueue<Runnable>(queues)),
|
if (queues == 0) {
|
||||||
|
blockingQueue = new SynchronousQueue<>();
|
||||||
|
} else if (queues < 0) {
|
||||||
|
blockingQueue = new MemorySafeLinkedBlockingQueue<>();
|
||||||
|
} else {
|
||||||
|
blockingQueue = new LinkedBlockingQueue<>(queues);
|
||||||
|
}
|
||||||
|
|
||||||
|
return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS, blockingQueue,
|
||||||
new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
|
new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -18,7 +18,7 @@ package org.apache.dubbo.remoting.transport.netty;
|
||||||
|
|
||||||
import org.apache.dubbo.common.URL;
|
import org.apache.dubbo.common.URL;
|
||||||
import org.apache.dubbo.common.Version;
|
import org.apache.dubbo.common.Version;
|
||||||
import org.apache.dubbo.common.logger.Logger;
|
import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
|
||||||
import org.apache.dubbo.common.logger.LoggerFactory;
|
import org.apache.dubbo.common.logger.LoggerFactory;
|
||||||
import org.apache.dubbo.common.utils.NamedThreadFactory;
|
import org.apache.dubbo.common.utils.NamedThreadFactory;
|
||||||
import org.apache.dubbo.common.utils.NetUtils;
|
import org.apache.dubbo.common.utils.NetUtils;
|
||||||
|
@ -44,7 +44,7 @@ import java.util.concurrent.TimeUnit;
|
||||||
*/
|
*/
|
||||||
public class NettyClient extends AbstractClient {
|
public class NettyClient extends AbstractClient {
|
||||||
|
|
||||||
private static final Logger logger = LoggerFactory.getLogger(NettyClient.class);
|
private static final ErrorTypeAwareLogger logger = LoggerFactory.getErrorTypeAwareLogger(NettyClient.class);
|
||||||
|
|
||||||
// ChannelFactory's closure has a DirectMemory leak, using static to avoid
|
// ChannelFactory's closure has a DirectMemory leak, using static to avoid
|
||||||
// https://issues.jboss.org/browse/NETTY-424
|
// https://issues.jboss.org/browse/NETTY-424
|
||||||
|
@ -121,13 +121,26 @@ public class NettyClient extends AbstractClient {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else if (future.getCause() != null) {
|
} else if (future.getCause() != null) {
|
||||||
throw new RemotingException(this, "client(url: " + getUrl() + ") failed to connect to server "
|
Throwable cause = future.getCause();
|
||||||
+ getRemoteAddress() + ", error message is:" + future.getCause().getMessage(), future.getCause());
|
|
||||||
|
RemotingException remotingException = new RemotingException(this, "client(url: " + getUrl() + ") failed to connect to server "
|
||||||
|
+ getRemoteAddress() + ", error message is:" + cause.getMessage(), cause);
|
||||||
|
|
||||||
|
// 6-1 - Failed to connect to provider server by other reason.
|
||||||
|
logger.error("6-1", "network disconnected", "", "Failed to connect to provider server by other reason.", cause);
|
||||||
|
|
||||||
|
throw remotingException;
|
||||||
} else {
|
} else {
|
||||||
throw new RemotingException(this, "client(url: " + getUrl() + ") failed to connect to server "
|
|
||||||
|
RemotingException remotingException = new RemotingException(this, "client(url: " + getUrl() + ") failed to connect to server "
|
||||||
+ getRemoteAddress() + " client-side timeout "
|
+ getRemoteAddress() + " client-side timeout "
|
||||||
+ getConnectTimeout() + "ms (elapsed: " + (System.currentTimeMillis() - start) + "ms) from netty client "
|
+ getConnectTimeout() + "ms (elapsed: " + (System.currentTimeMillis() - start) + "ms) from netty client "
|
||||||
+ NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion());
|
+ NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion());
|
||||||
|
|
||||||
|
// 6-2 - Client-side timeout.
|
||||||
|
logger.error("6-2", "provider crash", "", "Client-side timeout.", remotingException);
|
||||||
|
|
||||||
|
throw remotingException;
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
if (!isConnected()) {
|
if (!isConnected()) {
|
||||||
|
|
|
@ -16,11 +16,10 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.dubbo.remoting.transport.netty4;
|
package org.apache.dubbo.remoting.transport.netty4;
|
||||||
|
|
||||||
import io.netty.util.concurrent.EventExecutorGroup;
|
|
||||||
import org.apache.dubbo.common.URL;
|
import org.apache.dubbo.common.URL;
|
||||||
import org.apache.dubbo.common.Version;
|
import org.apache.dubbo.common.Version;
|
||||||
import org.apache.dubbo.common.config.ConfigurationUtils;
|
import org.apache.dubbo.common.config.ConfigurationUtils;
|
||||||
import org.apache.dubbo.common.logger.Logger;
|
import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
|
||||||
import org.apache.dubbo.common.logger.LoggerFactory;
|
import org.apache.dubbo.common.logger.LoggerFactory;
|
||||||
import org.apache.dubbo.common.resource.GlobalResourceInitializer;
|
import org.apache.dubbo.common.resource.GlobalResourceInitializer;
|
||||||
import org.apache.dubbo.common.utils.NetUtils;
|
import org.apache.dubbo.common.utils.NetUtils;
|
||||||
|
@ -42,6 +41,7 @@ import io.netty.channel.EventLoopGroup;
|
||||||
import io.netty.channel.socket.SocketChannel;
|
import io.netty.channel.socket.SocketChannel;
|
||||||
import io.netty.handler.proxy.Socks5ProxyHandler;
|
import io.netty.handler.proxy.Socks5ProxyHandler;
|
||||||
import io.netty.handler.timeout.IdleStateHandler;
|
import io.netty.handler.timeout.IdleStateHandler;
|
||||||
|
import io.netty.util.concurrent.EventExecutorGroup;
|
||||||
|
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
|
|
||||||
|
@ -62,7 +62,7 @@ public class NettyClient extends AbstractClient {
|
||||||
|
|
||||||
private static final String DEFAULT_SOCKS_PROXY_PORT = "1080";
|
private static final String DEFAULT_SOCKS_PROXY_PORT = "1080";
|
||||||
|
|
||||||
private static final Logger logger = LoggerFactory.getLogger(NettyClient.class);
|
private static final ErrorTypeAwareLogger logger = LoggerFactory.getErrorTypeAwareLogger(NettyClient.class);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* netty client bootstrap
|
* netty client bootstrap
|
||||||
|
@ -186,13 +186,32 @@ public class NettyClient extends AbstractClient {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else if (future.cause() != null) {
|
} else if (future.cause() != null) {
|
||||||
throw new RemotingException(this, "client(url: " + getUrl() + ") failed to connect to server "
|
|
||||||
+ getRemoteAddress() + ", error message is:" + future.cause().getMessage(), future.cause());
|
Throwable cause = future.cause();
|
||||||
|
|
||||||
|
// 6-1 Failed to connect to provider server by other reason.
|
||||||
|
|
||||||
|
RemotingException remotingException = new RemotingException(this, "client(url: " + getUrl() + ") failed to connect to server "
|
||||||
|
+ getRemoteAddress() + ", error message is:" + cause.getMessage(), cause);
|
||||||
|
|
||||||
|
logger.error("6-1", "network disconnected", "",
|
||||||
|
"Failed to connect to provider server by other reason.", cause);
|
||||||
|
|
||||||
|
throw remotingException;
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
throw new RemotingException(this, "client(url: " + getUrl() + ") failed to connect to server "
|
|
||||||
|
// 6-2 Client-side timeout
|
||||||
|
|
||||||
|
RemotingException remotingException = new RemotingException(this, "client(url: " + getUrl() + ") failed to connect to server "
|
||||||
+ getRemoteAddress() + " client-side timeout "
|
+ getRemoteAddress() + " client-side timeout "
|
||||||
+ getConnectTimeout() + "ms (elapsed: " + (System.currentTimeMillis() - start) + "ms) from netty client "
|
+ getConnectTimeout() + "ms (elapsed: " + (System.currentTimeMillis() - start) + "ms) from netty client "
|
||||||
+ NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion());
|
+ NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion());
|
||||||
|
|
||||||
|
logger.error("6-2", "provider crash", "",
|
||||||
|
"Client-side timeout.", remotingException);
|
||||||
|
|
||||||
|
throw remotingException;
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
// just add new valid channel to NettyChannel's cache
|
// just add new valid channel to NettyChannel's cache
|
||||||
|
|
|
@ -31,7 +31,7 @@ public class Fastjson2CreatorManager implements ScopeClassLoaderListener<Framewo
|
||||||
/**
|
/**
|
||||||
* An empty classLoader used when classLoader is system classLoader. Prevent the NPE.
|
* An empty classLoader used when classLoader is system classLoader. Prevent the NPE.
|
||||||
*/
|
*/
|
||||||
private final static ClassLoader SYSTEM_CLASSLOADER_KEY = new ClassLoader() {};
|
private static final ClassLoader SYSTEM_CLASSLOADER_KEY = new ClassLoader() {};
|
||||||
|
|
||||||
private final Map<ClassLoader, ObjectReaderCreatorASM> readerMap = new ConcurrentHashMap<>();
|
private final Map<ClassLoader, ObjectReaderCreatorASM> readerMap = new ConcurrentHashMap<>();
|
||||||
private final Map<ClassLoader, ObjectWriterCreatorASM> writerMap = new ConcurrentHashMap<>();
|
private final Map<ClassLoader, ObjectWriterCreatorASM> writerMap = new ConcurrentHashMap<>();
|
||||||
|
|
|
@ -27,7 +27,7 @@ import java.lang.reflect.Type;
|
||||||
* Java object input implementation
|
* Java object input implementation
|
||||||
*/
|
*/
|
||||||
public class JavaObjectInput extends NativeJavaObjectInput {
|
public class JavaObjectInput extends NativeJavaObjectInput {
|
||||||
public final static int MAX_BYTE_ARRAY_LENGTH = 8 * 1024 * 1024;
|
public static final int MAX_BYTE_ARRAY_LENGTH = 8 * 1024 * 1024;
|
||||||
|
|
||||||
public JavaObjectInput(InputStream is) throws IOException {
|
public JavaObjectInput(InputStream is) throws IOException {
|
||||||
super(new ObjectInputStream(is));
|
super(new ObjectInputStream(is));
|
||||||
|
|
|
@ -39,7 +39,7 @@ import static org.apache.dubbo.common.serialize.Constants.JAVA_SERIALIZATION_ID;
|
||||||
*/
|
*/
|
||||||
public class JavaSerialization implements Serialization {
|
public class JavaSerialization implements Serialization {
|
||||||
private static final Logger logger = LoggerFactory.getLogger(JavaSerialization.class);
|
private static final Logger logger = LoggerFactory.getLogger(JavaSerialization.class);
|
||||||
private final static AtomicBoolean warn = new AtomicBoolean(false);
|
private static final AtomicBoolean warn = new AtomicBoolean(false);
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public byte getContentTypeId() {
|
public byte getContentTypeId() {
|
||||||
|
|
|
@ -41,7 +41,7 @@ import static org.apache.dubbo.common.serialize.Constants.NATIVE_JAVA_SERIALIZAT
|
||||||
*/
|
*/
|
||||||
public class NativeJavaSerialization implements Serialization {
|
public class NativeJavaSerialization implements Serialization {
|
||||||
private static final Logger logger = LoggerFactory.getLogger(JavaSerialization.class);
|
private static final Logger logger = LoggerFactory.getLogger(JavaSerialization.class);
|
||||||
private final static AtomicBoolean warn = new AtomicBoolean(false);
|
private static final AtomicBoolean warn = new AtomicBoolean(false);
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public byte getContentTypeId() {
|
public byte getContentTypeId() {
|
||||||
|
|
Loading…
Reference in New Issue