mirror of https://github.com/apache/iotdb
Feat/updated cli (#13194)
This commit is contained in:
parent
8f54b80812
commit
38c1d591ac
Binary file not shown.
After Width: | Height: | Size: 6.6 KiB |
|
@ -22,6 +22,7 @@ package org.apache.iotdb.confignode.service;
|
|||
import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
|
||||
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
|
||||
import org.apache.iotdb.common.rpc.thrift.TSStatus;
|
||||
import org.apache.iotdb.commons.ServerCommandLine;
|
||||
import org.apache.iotdb.commons.client.ClientManagerMetrics;
|
||||
import org.apache.iotdb.commons.concurrent.ThreadModule;
|
||||
import org.apache.iotdb.commons.concurrent.ThreadName;
|
||||
|
@ -29,7 +30,10 @@ import org.apache.iotdb.commons.concurrent.ThreadPoolMetrics;
|
|||
import org.apache.iotdb.commons.conf.CommonConfig;
|
||||
import org.apache.iotdb.commons.conf.CommonDescriptor;
|
||||
import org.apache.iotdb.commons.conf.IoTDBConstant;
|
||||
import org.apache.iotdb.commons.exception.BadNodeUrlException;
|
||||
import org.apache.iotdb.commons.exception.ConfigurationException;
|
||||
import org.apache.iotdb.commons.exception.IllegalPathException;
|
||||
import org.apache.iotdb.commons.exception.IoTDBException;
|
||||
import org.apache.iotdb.commons.exception.StartupException;
|
||||
import org.apache.iotdb.commons.service.JMXService;
|
||||
import org.apache.iotdb.commons.service.RegisterManager;
|
||||
|
@ -43,6 +47,8 @@ import org.apache.iotdb.confignode.client.sync.SyncConfigNodeClientPool;
|
|||
import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
|
||||
import org.apache.iotdb.confignode.conf.ConfigNodeConstant;
|
||||
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
|
||||
import org.apache.iotdb.confignode.conf.ConfigNodeRemoveCheck;
|
||||
import org.apache.iotdb.confignode.conf.ConfigNodeStartupCheck;
|
||||
import org.apache.iotdb.confignode.conf.SystemPropertiesUtils;
|
||||
import org.apache.iotdb.confignode.manager.ConfigManager;
|
||||
import org.apache.iotdb.confignode.manager.consensus.ConsensusManager;
|
||||
|
@ -74,7 +80,7 @@ import java.util.Arrays;
|
|||
import java.util.List;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
public class ConfigNode implements ConfigNodeMBean {
|
||||
public class ConfigNode extends ServerCommandLine implements ConfigNodeMBean {
|
||||
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(ConfigNode.class);
|
||||
|
||||
|
@ -101,11 +107,13 @@ public class ConfigNode implements ConfigNodeMBean {
|
|||
|
||||
protected ConfigManager configManager;
|
||||
|
||||
protected ConfigNode() {
|
||||
public ConfigNode() {
|
||||
super("ConfigNode");
|
||||
// We do not init anything here, so that we can re-initialize the instance in IT.
|
||||
ConfigNodeHolder.instance = this;
|
||||
}
|
||||
|
||||
public static void main(String[] args) {
|
||||
public static void main(String[] args) throws Exception {
|
||||
LOGGER.info(
|
||||
"{} environment variables: {}",
|
||||
ConfigNodeConstant.GLOBAL_NAME,
|
||||
|
@ -114,7 +122,60 @@ public class ConfigNode implements ConfigNodeMBean {
|
|||
"{} default charset is: {}",
|
||||
ConfigNodeConstant.GLOBAL_NAME,
|
||||
Charset.defaultCharset().displayName());
|
||||
new ConfigNodeCommandLine().doMain(args);
|
||||
ConfigNode configNode = new ConfigNode();
|
||||
int returnCode = configNode.run(args);
|
||||
if (returnCode != 0) {
|
||||
System.exit(returnCode);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void start() throws IoTDBException {
|
||||
try {
|
||||
// Do ConfigNode startup checks
|
||||
LOGGER.info("Starting IoTDB {}", IoTDBConstant.VERSION_WITH_BUILD);
|
||||
ConfigNodeStartupCheck checks = new ConfigNodeStartupCheck(IoTDBConstant.CN_ROLE);
|
||||
checks.startUpCheck();
|
||||
} catch (StartupException | ConfigurationException | IOException e) {
|
||||
LOGGER.error("Meet error when doing start checking", e);
|
||||
throw new IoTDBException("Error starting", -1);
|
||||
}
|
||||
active();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void remove(Long nodeId) throws IoTDBException {
|
||||
// If the nodeId was null, this is a shorthand for removing the current dataNode.
|
||||
// In this case we need to find our nodeId.
|
||||
if (nodeId == null) {
|
||||
nodeId = (long) CONF.getConfigNodeId();
|
||||
}
|
||||
|
||||
try {
|
||||
LOGGER.info("Starting to remove ConfigNode with node-id {}", nodeId);
|
||||
|
||||
try {
|
||||
TConfigNodeLocation removeConfigNodeLocation =
|
||||
ConfigNodeRemoveCheck.getInstance().removeCheck(Long.toString(nodeId));
|
||||
if (removeConfigNodeLocation == null) {
|
||||
LOGGER.error(
|
||||
"The ConfigNode to be removed is not in the cluster, or the input format is incorrect.");
|
||||
return;
|
||||
}
|
||||
|
||||
ConfigNodeRemoveCheck.getInstance().removeConfigNode(removeConfigNodeLocation);
|
||||
} catch (BadNodeUrlException e) {
|
||||
LOGGER.warn("No ConfigNodes need to be removed.", e);
|
||||
return;
|
||||
}
|
||||
|
||||
LOGGER.info(
|
||||
"ConfigNode: {} is removed. If the confignode data directory is no longer needed, you can delete it manually.",
|
||||
nodeId);
|
||||
} catch (IOException e) {
|
||||
LOGGER.error("Meet error when doing remove ConfigNode", e);
|
||||
throw new IoTDBException("Error removing", -1);
|
||||
}
|
||||
}
|
||||
|
||||
public void active() {
|
||||
|
@ -465,7 +526,7 @@ public class ConfigNode implements ConfigNodeMBean {
|
|||
|
||||
private static class ConfigNodeHolder {
|
||||
|
||||
private static ConfigNode instance = new ConfigNode();
|
||||
private static ConfigNode instance;
|
||||
|
||||
private ConfigNodeHolder() {
|
||||
// Empty constructor
|
||||
|
|
|
@ -1,128 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.apache.iotdb.confignode.service;
|
||||
|
||||
import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
|
||||
import org.apache.iotdb.commons.ServerCommandLine;
|
||||
import org.apache.iotdb.commons.conf.IoTDBConstant;
|
||||
import org.apache.iotdb.commons.exception.BadNodeUrlException;
|
||||
import org.apache.iotdb.commons.exception.ConfigurationException;
|
||||
import org.apache.iotdb.commons.exception.StartupException;
|
||||
import org.apache.iotdb.confignode.conf.ConfigNodeRemoveCheck;
|
||||
import org.apache.iotdb.confignode.conf.ConfigNodeStartupCheck;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import static org.apache.iotdb.confignode.conf.ConfigNodeConstant.REMOVE_CONFIGNODE_USAGE;
|
||||
|
||||
public class ConfigNodeCommandLine extends ServerCommandLine {
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(ConfigNodeCommandLine.class);
|
||||
|
||||
// Start ConfigNode
|
||||
private static final String MODE_START = "-s";
|
||||
// Remove ConfigNode
|
||||
private static final String MODE_REMOVE = "-r";
|
||||
|
||||
private static final String USAGE =
|
||||
"Usage: <-s|-r> "
|
||||
+ "[-D{} <configure folder>] \n"
|
||||
+ "-s: Start the ConfigNode and join to the cluster\n"
|
||||
+ "-r: Remove the ConfigNode out of the cluster\n";
|
||||
|
||||
@Override
|
||||
protected String getUsage() {
|
||||
return USAGE;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected int run(String[] args) {
|
||||
String mode;
|
||||
if (args.length < 1) {
|
||||
mode = MODE_START;
|
||||
LOGGER.warn(
|
||||
"ConfigNode does not specify a startup mode. The default startup mode {} will be used",
|
||||
MODE_START);
|
||||
} else {
|
||||
mode = args[0];
|
||||
}
|
||||
|
||||
LOGGER.info("Running mode {}", mode);
|
||||
if (MODE_START.equals(mode)) {
|
||||
try {
|
||||
// Do ConfigNode startup checks
|
||||
LOGGER.info("Starting IoTDB {}", IoTDBConstant.VERSION_WITH_BUILD);
|
||||
ConfigNodeStartupCheck checks = new ConfigNodeStartupCheck(IoTDBConstant.CN_ROLE);
|
||||
checks.startUpCheck();
|
||||
} catch (StartupException | ConfigurationException | IOException e) {
|
||||
LOGGER.error("Meet error when doing start checking", e);
|
||||
return -1;
|
||||
}
|
||||
activeConfigNodeInstance();
|
||||
} else if (MODE_REMOVE.equals(mode)) {
|
||||
// remove ConfigNode
|
||||
try {
|
||||
doRemoveConfigNode(args);
|
||||
} catch (IOException e) {
|
||||
LOGGER.error("Meet error when doing remove ConfigNode", e);
|
||||
return -1;
|
||||
}
|
||||
} else {
|
||||
LOGGER.error("Unsupported startup mode: {}", mode);
|
||||
return -1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
protected void activeConfigNodeInstance() {
|
||||
ConfigNode.getInstance().active();
|
||||
}
|
||||
|
||||
protected void doRemoveConfigNode(String[] args) throws IOException {
|
||||
|
||||
if (args.length != 2) {
|
||||
LOGGER.info(REMOVE_CONFIGNODE_USAGE);
|
||||
return;
|
||||
}
|
||||
|
||||
LOGGER.info("Starting to remove ConfigNode, parameter: {}, {}", args[0], args[1]);
|
||||
|
||||
try {
|
||||
TConfigNodeLocation removeConfigNodeLocation =
|
||||
ConfigNodeRemoveCheck.getInstance().removeCheck(args[1]);
|
||||
if (removeConfigNodeLocation == null) {
|
||||
LOGGER.error(
|
||||
"The ConfigNode to be removed is not in the cluster, or the input format is incorrect.");
|
||||
return;
|
||||
}
|
||||
|
||||
ConfigNodeRemoveCheck.getInstance().removeConfigNode(removeConfigNodeLocation);
|
||||
} catch (BadNodeUrlException e) {
|
||||
LOGGER.warn("No ConfigNodes need to be removed.", e);
|
||||
return;
|
||||
}
|
||||
|
||||
LOGGER.info(
|
||||
"ConfigNode: {} is removed. If the confignode data directory is no longer needed, you can delete it manually.",
|
||||
args[1]);
|
||||
}
|
||||
}
|
|
@ -67,7 +67,7 @@ public class CreateCQProcedureTest {
|
|||
Mockito.when(cqManager.getExecutor()).thenReturn(executor);
|
||||
ConfigManager configManager = Mockito.mock(ConfigManager.class);
|
||||
Mockito.when(configManager.getCQManager()).thenReturn(cqManager);
|
||||
ConfigNode configNode = ConfigNode.getInstance();
|
||||
ConfigNode configNode = new ConfigNode();
|
||||
configNode.setConfigManager(configManager);
|
||||
|
||||
try {
|
||||
|
|
|
@ -25,8 +25,6 @@ IF "%~1"=="--help" (
|
|||
echo Usage:
|
||||
echo Remove the DataNode with datanode_id
|
||||
echo ./sbin/remove-datanode.bat [datanode_id]
|
||||
echo Remove the DataNode with address:port
|
||||
echo ./sbin/remove-datanode.bat [dn_rpc_address:dn_rpc_port]
|
||||
EXIT /B 0
|
||||
)
|
||||
|
||||
|
|
|
@ -24,8 +24,6 @@ if [ "$#" -eq 1 ] && [ "$1" == "--help" ]; then
|
|||
echo "Usage:"
|
||||
echo "Remove the DataNode with datanode_id"
|
||||
echo "./sbin/remove-datanode.sh [datanode_id]"
|
||||
echo "Remove the DataNode with address:port"
|
||||
echo "./sbin/remove-datanode.sh [dn_rpc_address:dn_rpc_port]"
|
||||
exit 0
|
||||
fi
|
||||
|
||||
|
|
|
@ -26,12 +26,14 @@ import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
|
|||
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
|
||||
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
|
||||
import org.apache.iotdb.common.rpc.thrift.TNodeResource;
|
||||
import org.apache.iotdb.commons.ServerCommandLine;
|
||||
import org.apache.iotdb.commons.client.exception.ClientManagerException;
|
||||
import org.apache.iotdb.commons.concurrent.IoTDBDefaultThreadExceptionHandler;
|
||||
import org.apache.iotdb.commons.conf.CommonDescriptor;
|
||||
import org.apache.iotdb.commons.conf.IoTDBConstant;
|
||||
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
|
||||
import org.apache.iotdb.commons.exception.IllegalPathException;
|
||||
import org.apache.iotdb.commons.exception.IoTDBException;
|
||||
import org.apache.iotdb.commons.exception.StartupException;
|
||||
import org.apache.iotdb.commons.pipe.config.PipeConfig;
|
||||
import org.apache.iotdb.commons.pipe.plugin.meta.PipePluginMeta;
|
||||
|
@ -50,6 +52,8 @@ import org.apache.iotdb.commons.utils.FileUtils;
|
|||
import org.apache.iotdb.commons.utils.PathUtils;
|
||||
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterReq;
|
||||
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterResp;
|
||||
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRemoveReq;
|
||||
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRemoveResp;
|
||||
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRestartReq;
|
||||
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRestartResp;
|
||||
import org.apache.iotdb.confignode.rpc.thrift.TGetJarInListReq;
|
||||
|
@ -120,13 +124,14 @@ import java.util.ArrayList;
|
|||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static org.apache.iotdb.commons.conf.IoTDBConstant.DEFAULT_CLUSTER_NAME;
|
||||
import static org.apache.iotdb.db.conf.IoTDBStartCheck.PROPERTIES_FILE_NAME;
|
||||
|
||||
public class DataNode implements DataNodeMBean {
|
||||
public class DataNode extends ServerCommandLine implements DataNodeMBean {
|
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(DataNode.class);
|
||||
private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
|
||||
|
@ -162,8 +167,10 @@ public class DataNode implements DataNodeMBean {
|
|||
private boolean schemaRegionConsensusStarted = false;
|
||||
private boolean dataRegionConsensusStarted = false;
|
||||
|
||||
private DataNode() {
|
||||
public DataNode() {
|
||||
super("DataNode");
|
||||
// We do not init anything here, so that we can re-initialize the instance in IT.
|
||||
DataNodeHolder.INSTANCE = this;
|
||||
}
|
||||
|
||||
// TODO: This needs removal of statics ...
|
||||
|
@ -185,11 +192,16 @@ public class DataNode implements DataNodeMBean {
|
|||
public static void main(String[] args) {
|
||||
logger.info("IoTDB-DataNode environment variables: {}", IoTDBConfig.getEnvironmentVariables());
|
||||
logger.info("IoTDB-DataNode default charset is: {}", Charset.defaultCharset().displayName());
|
||||
new DataNodeServerCommandLine().doMain(args);
|
||||
DataNode dataNode = new DataNode();
|
||||
int returnCode = dataNode.run(args);
|
||||
if (returnCode != 0) {
|
||||
System.exit(returnCode);
|
||||
}
|
||||
}
|
||||
|
||||
protected void doAddNode() {
|
||||
boolean isFirstStart = false;
|
||||
@Override
|
||||
protected void start() {
|
||||
boolean isFirstStart;
|
||||
try {
|
||||
// Check if this DataNode is start for the first time and do other pre-checks
|
||||
isFirstStart = prepareDataNode();
|
||||
|
@ -242,6 +254,56 @@ public class DataNode implements DataNodeMBean {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void remove(Long nodeId) throws IoTDBException {
|
||||
// If the nodeId was null, this is a shorthand for removing the current dataNode.
|
||||
// In this case we need to find our nodeId.
|
||||
if (nodeId == null) {
|
||||
nodeId = (long) config.getDataNodeId();
|
||||
}
|
||||
|
||||
logger.info("Starting to remove DataNode with node-id {} from cluster", nodeId);
|
||||
|
||||
// Load ConfigNodeList from system.properties file
|
||||
ConfigNodeInfo.getInstance().loadConfigNodeList();
|
||||
|
||||
int removeNodeId = nodeId.intValue();
|
||||
try (ConfigNodeClient configNodeClient =
|
||||
ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
|
||||
// Find a datanode location with the given node id.
|
||||
Optional<TDataNodeLocation> dataNodeLocationOpt =
|
||||
configNodeClient
|
||||
.getDataNodeConfiguration(-1)
|
||||
.getDataNodeConfigurationMap()
|
||||
.values()
|
||||
.stream()
|
||||
.map(TDataNodeConfiguration::getLocation)
|
||||
.filter(location -> location.getDataNodeId() == removeNodeId)
|
||||
.findFirst();
|
||||
if (!dataNodeLocationOpt.isPresent()) {
|
||||
throw new IoTDBException("Invalid node-id", -1);
|
||||
}
|
||||
TDataNodeLocation dataNodeLocation = dataNodeLocationOpt.get();
|
||||
|
||||
logger.info("Start to remove datanode, removed datanode endpoint: {}", dataNodeLocation);
|
||||
TDataNodeRemoveReq removeReq =
|
||||
new TDataNodeRemoveReq(Collections.singletonList(dataNodeLocation));
|
||||
TDataNodeRemoveResp removeResp = configNodeClient.removeDataNode(removeReq);
|
||||
logger.info("Remove result {} ", removeResp);
|
||||
if (removeResp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
|
||||
throw new IoTDBException(
|
||||
removeResp.getStatus().toString(), removeResp.getStatus().getCode());
|
||||
}
|
||||
logger.info(
|
||||
"Submit remove-datanode request successfully, but the process may fail. "
|
||||
+ "more details are shown in the logs of confignode-leader and removed-datanode, "
|
||||
+ "and after the process of removing datanode ends successfully, "
|
||||
+ "you are supposed to delete directory and data of the removed-datanode manually");
|
||||
} catch (TException | ClientManagerException e) {
|
||||
throw new IoTDBException("Failed removing datanode", e, -1);
|
||||
}
|
||||
}
|
||||
|
||||
/** Prepare cluster IoTDB-DataNode */
|
||||
private boolean prepareDataNode() throws StartupException, IOException {
|
||||
long startTime = System.currentTimeMillis();
|
||||
|
@ -632,7 +694,7 @@ public class DataNode implements DataNodeMBean {
|
|||
// Get resources for trigger,udf,pipe...
|
||||
prepareResources();
|
||||
|
||||
Runtime.getRuntime().addShutdownHook(new IoTDBShutdownHook());
|
||||
Runtime.getRuntime().addShutdownHook(new IoTDBShutdownHook(generateDataNodeLocation()));
|
||||
setUncaughtExceptionHandler();
|
||||
|
||||
logger.info("Recover the schema...");
|
||||
|
@ -1093,7 +1155,7 @@ public class DataNode implements DataNodeMBean {
|
|||
|
||||
private static class DataNodeHolder {
|
||||
|
||||
private static final DataNode INSTANCE = new DataNode();
|
||||
private static DataNode INSTANCE;
|
||||
|
||||
private DataNodeHolder() {
|
||||
// Empty constructor
|
||||
|
|
|
@ -1,224 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.apache.iotdb.db.service;
|
||||
|
||||
import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
|
||||
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
|
||||
import org.apache.iotdb.commons.ServerCommandLine;
|
||||
import org.apache.iotdb.commons.client.IClientManager;
|
||||
import org.apache.iotdb.commons.client.exception.ClientManagerException;
|
||||
import org.apache.iotdb.commons.consensus.ConfigRegionId;
|
||||
import org.apache.iotdb.commons.exception.BadNodeUrlException;
|
||||
import org.apache.iotdb.commons.exception.IoTDBException;
|
||||
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRemoveReq;
|
||||
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRemoveResp;
|
||||
import org.apache.iotdb.db.protocol.client.ConfigNodeClient;
|
||||
import org.apache.iotdb.db.protocol.client.ConfigNodeClientManager;
|
||||
import org.apache.iotdb.db.protocol.client.ConfigNodeInfo;
|
||||
import org.apache.iotdb.rpc.TSStatusCode;
|
||||
|
||||
import org.apache.commons.lang3.math.NumberUtils;
|
||||
import org.apache.thrift.TException;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
public class DataNodeServerCommandLine extends ServerCommandLine {
|
||||
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(DataNodeServerCommandLine.class);
|
||||
|
||||
// join an established cluster
|
||||
public static final String MODE_START = "-s";
|
||||
// send a request to remove a node, more arguments: ip-of-removed-node
|
||||
// metaport-of-removed-node
|
||||
public static final String MODE_REMOVE = "-r";
|
||||
|
||||
private final ConfigNodeInfo configNodeInfo;
|
||||
private final IClientManager<ConfigRegionId, ConfigNodeClient> configNodeClientManager;
|
||||
private final DataNode dataNode;
|
||||
|
||||
private static final String USAGE =
|
||||
"Usage: <-s|-r> "
|
||||
+ "[-D{} <configure folder>] \n"
|
||||
+ "-s: start the node to the cluster\n"
|
||||
+ "-r: remove the node out of the cluster\n";
|
||||
|
||||
/** Default constructor using the singletons for initializing the relationship. */
|
||||
public DataNodeServerCommandLine() {
|
||||
configNodeInfo = ConfigNodeInfo.getInstance();
|
||||
configNodeClientManager = ConfigNodeClientManager.getInstance();
|
||||
dataNode = DataNode.getInstance();
|
||||
}
|
||||
|
||||
/**
|
||||
* Additional constructor allowing injection of custom instances (mainly for testing)
|
||||
*
|
||||
* @param configNodeInfo config node info
|
||||
* @param configNodeClientManager config node client manager
|
||||
* @param dataNode data node
|
||||
*/
|
||||
public DataNodeServerCommandLine(
|
||||
ConfigNodeInfo configNodeInfo,
|
||||
IClientManager<ConfigRegionId, ConfigNodeClient> configNodeClientManager,
|
||||
DataNode dataNode) {
|
||||
this.configNodeInfo = configNodeInfo;
|
||||
this.configNodeClientManager = configNodeClientManager;
|
||||
this.dataNode = dataNode;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String getUsage() {
|
||||
return USAGE;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected int run(String[] args) throws Exception {
|
||||
if (args.length < 1) {
|
||||
usage(null);
|
||||
return -1;
|
||||
}
|
||||
|
||||
String mode = args[0];
|
||||
LOGGER.info("Running mode {}", mode);
|
||||
|
||||
// Start IoTDB kernel first, then start the cluster module
|
||||
if (MODE_START.equals(mode)) {
|
||||
dataNode.doAddNode();
|
||||
} else if (MODE_REMOVE.equals(mode)) {
|
||||
return doRemoveDataNode(args);
|
||||
} else {
|
||||
LOGGER.error("Unrecognized mode {}", mode);
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* remove data-nodes from cluster
|
||||
*
|
||||
* @param args id or ip:rpc_port for removed datanode
|
||||
*/
|
||||
private int doRemoveDataNode(String[] args)
|
||||
throws BadNodeUrlException, TException, IoTDBException, ClientManagerException {
|
||||
|
||||
if (args.length != 2) {
|
||||
LOGGER.info("Usage: <node-id>/<ip>:<rpc-port>");
|
||||
return -1;
|
||||
}
|
||||
|
||||
// REMARK: Don't need null or empty-checks for args[0] or args[1], as if they were
|
||||
// empty, the JVM would have not received them.
|
||||
|
||||
LOGGER.info("Starting to remove DataNode from cluster, parameter: {}, {}", args[0], args[1]);
|
||||
|
||||
// Load ConfigNodeList from system.properties file
|
||||
configNodeInfo.loadConfigNodeList();
|
||||
|
||||
List<TDataNodeLocation> dataNodeLocations = buildDataNodeLocations(args[1]);
|
||||
if (dataNodeLocations.isEmpty()) {
|
||||
throw new BadNodeUrlException("No DataNode to remove");
|
||||
}
|
||||
LOGGER.info("Start to remove datanode, removed datanode endpoints: {}", dataNodeLocations);
|
||||
TDataNodeRemoveReq removeReq = new TDataNodeRemoveReq(dataNodeLocations);
|
||||
try (ConfigNodeClient configNodeClient =
|
||||
configNodeClientManager.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
|
||||
TDataNodeRemoveResp removeResp = configNodeClient.removeDataNode(removeReq);
|
||||
LOGGER.info("Remove result {} ", removeResp);
|
||||
if (removeResp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
|
||||
throw new IoTDBException(
|
||||
removeResp.getStatus().toString(), removeResp.getStatus().getCode());
|
||||
}
|
||||
LOGGER.info(
|
||||
"Submit remove-datanode request successfully, but the process may fail. "
|
||||
+ "more details are shown in the logs of confignode-leader and removed-datanode, "
|
||||
+ "and after the process of removing datanode ends successfully, "
|
||||
+ "you are supposed to delete directory and data of the removed-datanode manually");
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* fetch all datanode info from ConfigNode, then compare with input 'args'
|
||||
*
|
||||
* @param args datanode id or ip:rpc_port
|
||||
* @return TDataNodeLocation list
|
||||
*/
|
||||
private List<TDataNodeLocation> buildDataNodeLocations(String args) {
|
||||
List<TDataNodeLocation> dataNodeLocations = new ArrayList<>();
|
||||
|
||||
// Now support only single datanode deletion
|
||||
if (args.split(",").length > 1) {
|
||||
throw new IllegalArgumentException("Currently only removing single nodes is supported.");
|
||||
}
|
||||
|
||||
// Below supports multiple datanode deletion, split by ',', and is reserved for extension
|
||||
List<NodeCoordinate> nodeCoordinates = parseCoordinates(args);
|
||||
try (ConfigNodeClient client =
|
||||
configNodeClientManager.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
|
||||
dataNodeLocations =
|
||||
client.getDataNodeConfiguration(-1).getDataNodeConfigurationMap().values().stream()
|
||||
.map(TDataNodeConfiguration::getLocation)
|
||||
.filter(
|
||||
location ->
|
||||
nodeCoordinates.stream()
|
||||
.anyMatch(nodeCoordinate -> nodeCoordinate.matches(location)))
|
||||
.collect(Collectors.toList());
|
||||
} catch (TException | ClientManagerException e) {
|
||||
LOGGER.error("Get data node locations failed", e);
|
||||
}
|
||||
|
||||
return dataNodeLocations;
|
||||
}
|
||||
|
||||
protected List<NodeCoordinate> parseCoordinates(String coordinatesString) {
|
||||
// Multiple nodeIds are separated by ","
|
||||
String[] nodeIdStrings = coordinatesString.split(",");
|
||||
List<NodeCoordinate> nodeIdCoordinates = new ArrayList<>(nodeIdStrings.length);
|
||||
for (String nodeId : nodeIdStrings) {
|
||||
// In the other case, we expect it to be a numeric value referring to the node-id
|
||||
if (NumberUtils.isCreatable(nodeId)) {
|
||||
nodeIdCoordinates.add(new NodeCoordinateNodeId(Integer.parseInt(nodeId)));
|
||||
} else {
|
||||
LOGGER.error("Invalid format. Expected a numeric node id, but got: {}", nodeId);
|
||||
}
|
||||
}
|
||||
return nodeIdCoordinates;
|
||||
}
|
||||
|
||||
protected interface NodeCoordinate {
|
||||
// Returns true if the given location matches this coordinate
|
||||
boolean matches(TDataNodeLocation location);
|
||||
}
|
||||
|
||||
/** Implementation of a NodeCoordinate that uses the node id to match. */
|
||||
protected static class NodeCoordinateNodeId implements NodeCoordinate {
|
||||
private final int nodeId;
|
||||
|
||||
public NodeCoordinateNodeId(int nodeId) {
|
||||
this.nodeId = nodeId;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean matches(TDataNodeLocation location) {
|
||||
return location.isSetDataNodeId() && location.dataNodeId == nodeId;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -19,6 +19,7 @@
|
|||
|
||||
package org.apache.iotdb.db.service;
|
||||
|
||||
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
|
||||
import org.apache.iotdb.commons.client.exception.ClientManagerException;
|
||||
import org.apache.iotdb.commons.cluster.NodeStatus;
|
||||
import org.apache.iotdb.commons.concurrent.ThreadName;
|
||||
|
@ -49,8 +50,11 @@ public class IoTDBShutdownHook extends Thread {
|
|||
|
||||
private static final Logger logger = LoggerFactory.getLogger(IoTDBShutdownHook.class);
|
||||
|
||||
public IoTDBShutdownHook() {
|
||||
private final TDataNodeLocation nodeLocation;
|
||||
|
||||
public IoTDBShutdownHook(TDataNodeLocation nodeLocation) {
|
||||
super(ThreadName.IOTDB_SHUTDOWN_HOOK.getName());
|
||||
this.nodeLocation = nodeLocation;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -120,7 +124,7 @@ public class IoTDBShutdownHook extends Thread {
|
|||
try (ConfigNodeClient client =
|
||||
ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
|
||||
isReportSuccess =
|
||||
client.reportDataNodeShutdown(DataNode.generateDataNodeLocation()).getCode()
|
||||
client.reportDataNodeShutdown(nodeLocation).getCode()
|
||||
== TSStatusCode.SUCCESS_STATUS.getStatusCode();
|
||||
|
||||
// Actually stop all services started by the DataNode.
|
||||
|
|
|
@ -24,16 +24,16 @@ public class DaemonTest {
|
|||
|
||||
@Test
|
||||
public void testPid() {
|
||||
DataNode ioTDB = DataNode.getInstance();
|
||||
DataNode dataNode = new DataNode();
|
||||
// no pid set, so there is nothing happens
|
||||
ioTDB.processPid();
|
||||
dataNode.processPid();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSetPid() {
|
||||
DataNode ioTDB = DataNode.getInstance();
|
||||
DataNode dataNode = new DataNode();
|
||||
System.setProperty("iotdb-pidfile", "./iotdb.pid");
|
||||
// no pid set, so there is nothing happens
|
||||
ioTDB.processPid();
|
||||
dataNode.processPid();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,218 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.apache.iotdb.db.service;
|
||||
|
||||
import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
|
||||
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
|
||||
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
|
||||
import org.apache.iotdb.common.rpc.thrift.TNodeResource;
|
||||
import org.apache.iotdb.common.rpc.thrift.TSStatus;
|
||||
import org.apache.iotdb.commons.client.IClientManager;
|
||||
import org.apache.iotdb.commons.consensus.ConfigRegionId;
|
||||
import org.apache.iotdb.commons.exception.BadNodeUrlException;
|
||||
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeConfigurationResp;
|
||||
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRemoveReq;
|
||||
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRemoveResp;
|
||||
import org.apache.iotdb.db.protocol.client.ConfigNodeClient;
|
||||
import org.apache.iotdb.db.protocol.client.ConfigNodeInfo;
|
||||
import org.apache.iotdb.rpc.TSStatusCode;
|
||||
|
||||
import junit.framework.TestCase;
|
||||
import org.junit.Assert;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
|
||||
public class DataNodeServerCommandLineTest extends TestCase {
|
||||
|
||||
// List of well known locations for this test
|
||||
protected static final TDataNodeLocation LOCATION_1 =
|
||||
new TDataNodeLocation(1, new TEndPoint("1.2.3.4", 6667), null, null, null, null);
|
||||
protected static final TDataNodeLocation LOCATION_2 =
|
||||
new TDataNodeLocation(2, new TEndPoint("1.2.3.5", 6667), null, null, null, null);
|
||||
protected static final TDataNodeLocation LOCATION_3 =
|
||||
new TDataNodeLocation(3, new TEndPoint("1.2.3.6", 6667), null, null, null, null);
|
||||
// An invalid location
|
||||
protected static final TDataNodeLocation INVALID_LOCATION =
|
||||
new TDataNodeLocation(23, new TEndPoint("4.3.2.1", 815), null, null, null, null);
|
||||
|
||||
/**
|
||||
* In this test we pass an empty args list to the command. This is expected to fail.
|
||||
*
|
||||
* @throws Exception nothing should go wrong here.
|
||||
*/
|
||||
public void testNoArgs() throws Exception {
|
||||
// No need to initialize these mocks with anything sensible, as they should never be used.
|
||||
ConfigNodeInfo configNodeInfo = null;
|
||||
IClientManager<ConfigRegionId, ConfigNodeClient> configNodeClientManager = null;
|
||||
DataNode dataNode = null;
|
||||
DataNodeServerCommandLine sut =
|
||||
new DataNodeServerCommandLine(configNodeInfo, configNodeClientManager, dataNode);
|
||||
|
||||
int returnCode = sut.run(new String[0]);
|
||||
|
||||
// We expect an error code of -1.
|
||||
Assert.assertEquals(-1, returnCode);
|
||||
}
|
||||
|
||||
/**
|
||||
* In this test we pass too many arguments to the command. This should also fail with an error
|
||||
* code.
|
||||
*
|
||||
* @throws Exception nothing should go wrong here.
|
||||
*/
|
||||
public void testTooManyArgs() throws Exception {
|
||||
// No need to initialize these mocks with anything sensible, as they should never be used.
|
||||
ConfigNodeInfo configNodeInfo = null;
|
||||
IClientManager<ConfigRegionId, ConfigNodeClient> configNodeClientManager = null;
|
||||
DataNode dataNode = null;
|
||||
DataNodeServerCommandLine sut =
|
||||
new DataNodeServerCommandLine(configNodeInfo, configNodeClientManager, dataNode);
|
||||
|
||||
int returnCode = sut.run(new String[] {"-r", "2", "-s"});
|
||||
|
||||
// We expect an error code of -1.
|
||||
Assert.assertEquals(-1, returnCode);
|
||||
}
|
||||
|
||||
/**
|
||||
* In this test case we provide the coordinates for the data-node that we want to delete by
|
||||
* providing the node-id of that node.
|
||||
*
|
||||
* @throws Exception nothing should go wrong here.
|
||||
*/
|
||||
public void testSingleDataNodeRemoveById() throws Exception {
|
||||
ConfigNodeInfo configNodeInfo = Mockito.mock(ConfigNodeInfo.class);
|
||||
IClientManager<ConfigRegionId, ConfigNodeClient> configNodeClientManager =
|
||||
Mockito.mock(IClientManager.class);
|
||||
ConfigNodeClient client = Mockito.mock(ConfigNodeClient.class);
|
||||
Mockito.when(configNodeClientManager.borrowClient(Mockito.any(ConfigRegionId.class)))
|
||||
.thenReturn(client);
|
||||
// This is the result of the getDataNodeConfiguration, which contains the list of known data
|
||||
// nodes.
|
||||
TDataNodeConfigurationResp tDataNodeConfigurationResp = new TDataNodeConfigurationResp();
|
||||
tDataNodeConfigurationResp.putToDataNodeConfigurationMap(
|
||||
1, new TDataNodeConfiguration(LOCATION_1, new TNodeResource()));
|
||||
tDataNodeConfigurationResp.putToDataNodeConfigurationMap(
|
||||
2, new TDataNodeConfiguration(LOCATION_2, new TNodeResource()));
|
||||
tDataNodeConfigurationResp.putToDataNodeConfigurationMap(
|
||||
3, new TDataNodeConfiguration(LOCATION_3, new TNodeResource()));
|
||||
Mockito.when(client.getDataNodeConfiguration(Mockito.anyInt()))
|
||||
.thenReturn(tDataNodeConfigurationResp);
|
||||
// Only return something sensible, if exactly this location is asked to be deleted.
|
||||
Mockito.when(
|
||||
client.removeDataNode(new TDataNodeRemoveReq(Collections.singletonList(LOCATION_2))))
|
||||
.thenReturn(
|
||||
new TDataNodeRemoveResp(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode())));
|
||||
DataNode dataNode = Mockito.mock(DataNode.class);
|
||||
DataNodeServerCommandLine sut =
|
||||
new DataNodeServerCommandLine(configNodeInfo, configNodeClientManager, dataNode);
|
||||
|
||||
int returnCode = sut.run(new String[] {"-r", "2"});
|
||||
|
||||
// Check the overall return code was ok.
|
||||
Assert.assertEquals(0, returnCode);
|
||||
// Check that the config node client was actually called with a request to remove the
|
||||
// node we want it to remove
|
||||
Mockito.verify(client, Mockito.times(1))
|
||||
.removeDataNode(new TDataNodeRemoveReq(Collections.singletonList(LOCATION_2)));
|
||||
}
|
||||
|
||||
/**
|
||||
* In this test case we provide the coordinates for the data-node that we want to delete by
|
||||
* providing the node-id of that node. However, the coordinates are invalid and therefore the
|
||||
* deletion fails with an error.
|
||||
*
|
||||
* @throws Exception nothing should go wrong here.
|
||||
*/
|
||||
public void testSingleDataNodeRemoveByIdWithInvalidCoordinates() throws Exception {
|
||||
ConfigNodeInfo configNodeInfo = Mockito.mock(ConfigNodeInfo.class);
|
||||
IClientManager<ConfigRegionId, ConfigNodeClient> configNodeClientManager =
|
||||
Mockito.mock(IClientManager.class);
|
||||
ConfigNodeClient client = Mockito.mock(ConfigNodeClient.class);
|
||||
Mockito.when(configNodeClientManager.borrowClient(Mockito.any(ConfigRegionId.class)))
|
||||
.thenReturn(client);
|
||||
// This is the result of the getDataNodeConfiguration, which contains the list of known data
|
||||
// nodes.
|
||||
TDataNodeConfigurationResp tDataNodeConfigurationResp = new TDataNodeConfigurationResp();
|
||||
tDataNodeConfigurationResp.putToDataNodeConfigurationMap(
|
||||
1, new TDataNodeConfiguration(LOCATION_1, new TNodeResource()));
|
||||
tDataNodeConfigurationResp.putToDataNodeConfigurationMap(
|
||||
2, new TDataNodeConfiguration(LOCATION_2, new TNodeResource()));
|
||||
tDataNodeConfigurationResp.putToDataNodeConfigurationMap(
|
||||
3, new TDataNodeConfiguration(LOCATION_3, new TNodeResource()));
|
||||
Mockito.when(client.getDataNodeConfiguration(Mockito.anyInt()))
|
||||
.thenReturn(tDataNodeConfigurationResp);
|
||||
DataNode dataNode = Mockito.mock(DataNode.class);
|
||||
DataNodeServerCommandLine sut =
|
||||
new DataNodeServerCommandLine(configNodeInfo, configNodeClientManager, dataNode);
|
||||
|
||||
try {
|
||||
sut.run(new String[] {"-r", "23"});
|
||||
Assert.fail("This call should have failed");
|
||||
} catch (Exception e) {
|
||||
// This is actually what we expected
|
||||
Assert.assertTrue(e instanceof BadNodeUrlException);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* In this test case we provide the coordinates for the data-node that we want to delete by
|
||||
* providing the node-id of that node. NOTE: The test was prepared to test deletion of multiple
|
||||
* nodes, however currently we don't support this.
|
||||
*
|
||||
* @throws Exception nothing should go wrong here.
|
||||
*/
|
||||
public void testMultipleDataNodeRemoveById() throws Exception {
|
||||
ConfigNodeInfo configNodeInfo = Mockito.mock(ConfigNodeInfo.class);
|
||||
IClientManager<ConfigRegionId, ConfigNodeClient> configNodeClientManager =
|
||||
Mockito.mock(IClientManager.class);
|
||||
ConfigNodeClient client = Mockito.mock(ConfigNodeClient.class);
|
||||
Mockito.when(configNodeClientManager.borrowClient(Mockito.any(ConfigRegionId.class)))
|
||||
.thenReturn(client);
|
||||
// This is the result of the getDataNodeConfiguration, which contains the list of known data
|
||||
// nodes.
|
||||
TDataNodeConfigurationResp tDataNodeConfigurationResp = new TDataNodeConfigurationResp();
|
||||
tDataNodeConfigurationResp.putToDataNodeConfigurationMap(
|
||||
1, new TDataNodeConfiguration(LOCATION_1, new TNodeResource()));
|
||||
tDataNodeConfigurationResp.putToDataNodeConfigurationMap(
|
||||
2, new TDataNodeConfiguration(LOCATION_2, new TNodeResource()));
|
||||
tDataNodeConfigurationResp.putToDataNodeConfigurationMap(
|
||||
3, new TDataNodeConfiguration(LOCATION_3, new TNodeResource()));
|
||||
Mockito.when(client.getDataNodeConfiguration(Mockito.anyInt()))
|
||||
.thenReturn(tDataNodeConfigurationResp);
|
||||
// Only return something sensible, if exactly the locations we want are asked to be deleted.
|
||||
Mockito.when(
|
||||
client.removeDataNode(new TDataNodeRemoveReq(Arrays.asList(LOCATION_1, LOCATION_2))))
|
||||
.thenReturn(
|
||||
new TDataNodeRemoveResp(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode())));
|
||||
DataNode dataNode = Mockito.mock(DataNode.class);
|
||||
DataNodeServerCommandLine sut =
|
||||
new DataNodeServerCommandLine(configNodeInfo, configNodeClientManager, dataNode);
|
||||
|
||||
try {
|
||||
sut.run(new String[] {"-r", "1,2"});
|
||||
Assert.fail("This call should have failed");
|
||||
} catch (Exception e) {
|
||||
// This is actually what we expected
|
||||
Assert.assertTrue(e instanceof IllegalArgumentException);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -119,6 +119,10 @@
|
|||
<groupId>org.apache.commons</groupId>
|
||||
<artifactId>commons-lang3</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>commons-cli</groupId>
|
||||
<artifactId>commons-cli</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.google.guava</groupId>
|
||||
<artifactId>guava</artifactId>
|
||||
|
|
|
@ -18,50 +18,97 @@
|
|||
*/
|
||||
package org.apache.iotdb.commons;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.apache.iotdb.commons.exception.IoTDBException;
|
||||
|
||||
import org.apache.commons.cli.CommandLine;
|
||||
import org.apache.commons.cli.CommandLineParser;
|
||||
import org.apache.commons.cli.DefaultParser;
|
||||
import org.apache.commons.cli.HelpFormatter;
|
||||
import org.apache.commons.cli.Option;
|
||||
import org.apache.commons.cli.OptionGroup;
|
||||
import org.apache.commons.cli.Options;
|
||||
import org.apache.commons.cli.ParseException;
|
||||
|
||||
import java.io.PrintWriter;
|
||||
|
||||
public abstract class ServerCommandLine {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(ServerCommandLine.class);
|
||||
|
||||
/**
|
||||
* Implementing subclasses should return a usage string to print out
|
||||
*
|
||||
* @return usage
|
||||
*/
|
||||
protected abstract String getUsage();
|
||||
private static final Option OPTION_START =
|
||||
Option.builder("s").longOpt("start").desc("start a new node").build();
|
||||
private static final Option OPTION_REMOVE =
|
||||
Option.builder("r")
|
||||
.longOpt("remove")
|
||||
.desc(
|
||||
"remove a node (with the given nodeId or the node started on the current machine, if omitted)")
|
||||
.hasArg()
|
||||
.type(Number.class)
|
||||
.argName("nodeId")
|
||||
.optionalArg(true)
|
||||
.build();
|
||||
|
||||
/**
|
||||
* run command
|
||||
*
|
||||
* @param args system args
|
||||
* @return return 0 if exec success
|
||||
*/
|
||||
protected abstract int run(String[] args) throws Exception;
|
||||
private final String cliName;
|
||||
private final PrintWriter output;
|
||||
private final Options options;
|
||||
|
||||
protected void usage(String message) {
|
||||
if (message != null) {
|
||||
System.err.println(message);
|
||||
System.err.println();
|
||||
}
|
||||
|
||||
System.err.println(getUsage());
|
||||
public ServerCommandLine(String cliName) {
|
||||
this(cliName, new PrintWriter(System.out));
|
||||
}
|
||||
|
||||
/**
|
||||
* Parse and run the given command line.
|
||||
*
|
||||
* @param args system args
|
||||
*/
|
||||
public void doMain(String[] args) {
|
||||
public ServerCommandLine(String cliName, PrintWriter output) {
|
||||
this.cliName = cliName;
|
||||
this.output = output;
|
||||
OptionGroup commands = new OptionGroup();
|
||||
commands.addOption(OPTION_START);
|
||||
commands.addOption(OPTION_REMOVE);
|
||||
// Require one option of the group.
|
||||
commands.setRequired(true);
|
||||
options = new Options();
|
||||
options.addOptionGroup(commands);
|
||||
}
|
||||
|
||||
public int run(String[] args) {
|
||||
CommandLineParser parser = new DefaultParser();
|
||||
try {
|
||||
int result = run(args);
|
||||
if (result != 0) {
|
||||
System.exit(result);
|
||||
CommandLine cmd = parser.parse(options, args);
|
||||
// When starting there is no additional argument.
|
||||
if (cmd.hasOption(OPTION_START)) {
|
||||
start();
|
||||
}
|
||||
} catch (Exception e) {
|
||||
LOG.error("Failed to execute system command", e);
|
||||
System.exit(-1);
|
||||
// As we only support start and remove and one has to be selected,
|
||||
// no need to check if OPTION_REMOVE is set.
|
||||
else {
|
||||
Number nodeId = (Number) cmd.getParsedOptionValue(OPTION_REMOVE);
|
||||
if (nodeId != null) {
|
||||
remove(nodeId.longValue());
|
||||
} else {
|
||||
remove(null);
|
||||
}
|
||||
}
|
||||
// Make sure we exit with the 0 error code
|
||||
return 0;
|
||||
} catch (ParseException e) {
|
||||
output.println(e.getMessage());
|
||||
HelpFormatter formatter = new HelpFormatter();
|
||||
formatter.printHelp(
|
||||
output,
|
||||
formatter.getWidth(),
|
||||
cliName,
|
||||
null,
|
||||
options,
|
||||
formatter.getLeftPadding(),
|
||||
formatter.getDescPadding(),
|
||||
null,
|
||||
false);
|
||||
// Forward a generic error code to the calling process
|
||||
return 1;
|
||||
} catch (IoTDBException e) {
|
||||
output.println("An error occurred while running the command: " + e.getMessage());
|
||||
// Forward the exit code from the exception to the calling process
|
||||
return e.getErrorCode();
|
||||
}
|
||||
}
|
||||
|
||||
protected abstract void start() throws IoTDBException;
|
||||
|
||||
protected abstract void remove(Long nodeId) throws IoTDBException;
|
||||
}
|
||||
|
|
|
@ -0,0 +1,337 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.apache.iotdb.commons;
|
||||
|
||||
import org.apache.iotdb.commons.exception.IoTDBException;
|
||||
|
||||
import junit.framework.TestCase;
|
||||
import org.junit.Assert;
|
||||
|
||||
import java.io.PrintWriter;
|
||||
import java.io.StringWriter;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
public class ServerCommandLineTest extends TestCase {
|
||||
|
||||
/**
|
||||
* In this test, the commandline is called without any args. In this case the usage should be
|
||||
* output and nothing should be done.
|
||||
*/
|
||||
public void testNoArgs() {
|
||||
AtomicBoolean startCalled = new AtomicBoolean(false);
|
||||
AtomicBoolean stopCalled = new AtomicBoolean(false);
|
||||
StringWriter out = new StringWriter();
|
||||
PrintWriter writer = new PrintWriter(out);
|
||||
ServerCommandLine commandLine =
|
||||
new ServerCommandLine("test-cli", writer) {
|
||||
@Override
|
||||
protected void start() {
|
||||
startCalled.set(true);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void remove(Long nodeId) {
|
||||
stopCalled.set(true);
|
||||
}
|
||||
};
|
||||
int returnCode = commandLine.run(new String[0]);
|
||||
|
||||
Assert.assertEquals(1, returnCode);
|
||||
String consoleOutput = out.toString();
|
||||
Assert.assertTrue(consoleOutput.contains("Missing required option"));
|
||||
// No callbacks should have been called.
|
||||
Assert.assertFalse(startCalled.get());
|
||||
Assert.assertFalse(stopCalled.get());
|
||||
}
|
||||
|
||||
/**
|
||||
* In this test, the commandline is called with an invalid arg. In this case the usage should be
|
||||
* output and nothing should be done.
|
||||
*/
|
||||
public void testInvalidArgs() {
|
||||
AtomicBoolean startCalled = new AtomicBoolean(false);
|
||||
AtomicBoolean stopCalled = new AtomicBoolean(false);
|
||||
StringWriter out = new StringWriter();
|
||||
PrintWriter writer = new PrintWriter(out);
|
||||
ServerCommandLine commandLine =
|
||||
new ServerCommandLine("test-cli", writer) {
|
||||
@Override
|
||||
protected void start() {
|
||||
startCalled.set(true);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void remove(Long nodeId) {
|
||||
stopCalled.set(true);
|
||||
}
|
||||
};
|
||||
int returnCode = commandLine.run(new String[] {"-z"});
|
||||
|
||||
Assert.assertEquals(1, returnCode);
|
||||
String consoleOutput = out.toString();
|
||||
Assert.assertTrue(consoleOutput.contains("Unrecognized option"));
|
||||
// No callbacks should have been called.
|
||||
Assert.assertFalse(startCalled.get());
|
||||
Assert.assertFalse(stopCalled.get());
|
||||
}
|
||||
|
||||
/**
|
||||
* In this test, the commandline is called with the start option. The start method should be
|
||||
* called.
|
||||
*/
|
||||
public void testStartArg() {
|
||||
AtomicBoolean startCalled = new AtomicBoolean(false);
|
||||
AtomicBoolean stopCalled = new AtomicBoolean(false);
|
||||
StringWriter out = new StringWriter();
|
||||
PrintWriter writer = new PrintWriter(out);
|
||||
ServerCommandLine commandLine =
|
||||
new ServerCommandLine("test-cli", writer) {
|
||||
@Override
|
||||
protected void start() {
|
||||
startCalled.set(true);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void remove(Long nodeId) {
|
||||
stopCalled.set(true);
|
||||
}
|
||||
};
|
||||
int returnCode = commandLine.run(new String[] {"-s"});
|
||||
|
||||
Assert.assertEquals(0, returnCode);
|
||||
// Nothing should have been output on the console.
|
||||
String consoleOutput = out.toString();
|
||||
Assert.assertTrue(consoleOutput.isEmpty());
|
||||
// Only the start method should have been called.
|
||||
Assert.assertTrue(startCalled.get());
|
||||
Assert.assertFalse(stopCalled.get());
|
||||
}
|
||||
|
||||
/**
|
||||
* In this test, the commandline is called with the remove option, but without an additional
|
||||
* attribute for providing the node id. The stop method should be called and "null" should be
|
||||
* provided as node id.
|
||||
*/
|
||||
public void testRemoveArg() {
|
||||
AtomicBoolean startCalled = new AtomicBoolean(false);
|
||||
AtomicBoolean stopCalled = new AtomicBoolean(false);
|
||||
AtomicLong stopNodeId = new AtomicLong(-1);
|
||||
StringWriter out = new StringWriter();
|
||||
PrintWriter writer = new PrintWriter(out);
|
||||
ServerCommandLine commandLine =
|
||||
new ServerCommandLine("test-cli", writer) {
|
||||
@Override
|
||||
protected void start() {
|
||||
startCalled.set(true);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void remove(Long nodeId) {
|
||||
stopCalled.set(true);
|
||||
if (nodeId != null) {
|
||||
stopNodeId.set(nodeId);
|
||||
}
|
||||
}
|
||||
};
|
||||
int returnCode = commandLine.run(new String[] {"-r"});
|
||||
|
||||
Assert.assertEquals(0, returnCode);
|
||||
// Nothing should have been output on the console.
|
||||
String consoleOutput = out.toString();
|
||||
Assert.assertTrue(consoleOutput.isEmpty());
|
||||
// Only the start method should have been called.
|
||||
Assert.assertFalse(startCalled.get());
|
||||
Assert.assertTrue(stopCalled.get());
|
||||
Assert.assertEquals(-1, stopNodeId.get());
|
||||
}
|
||||
|
||||
/**
|
||||
* In this test, the commandline is called with the remove option, with an additional attribute
|
||||
* for providing the node id. The stop method should be called and the id should be passed to the
|
||||
* remove callback.
|
||||
*/
|
||||
public void testRemoveWithNodeIdArg() {
|
||||
AtomicBoolean startCalled = new AtomicBoolean(false);
|
||||
AtomicBoolean stopCalled = new AtomicBoolean(false);
|
||||
AtomicLong stopNodeId = new AtomicLong(-1);
|
||||
StringWriter out = new StringWriter();
|
||||
PrintWriter writer = new PrintWriter(out);
|
||||
ServerCommandLine commandLine =
|
||||
new ServerCommandLine("test-cli", writer) {
|
||||
@Override
|
||||
protected void start() {
|
||||
startCalled.set(true);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void remove(Long nodeId) {
|
||||
stopCalled.set(true);
|
||||
if (nodeId != null) {
|
||||
stopNodeId.set(nodeId);
|
||||
}
|
||||
}
|
||||
};
|
||||
int returnCode = commandLine.run(new String[] {"-r", "42"});
|
||||
|
||||
Assert.assertEquals(0, returnCode);
|
||||
// Nothing should have been output on the console.
|
||||
String consoleOutput = out.toString();
|
||||
Assert.assertTrue(consoleOutput.isEmpty());
|
||||
// Only the start method should have been called.
|
||||
Assert.assertFalse(startCalled.get());
|
||||
Assert.assertTrue(stopCalled.get());
|
||||
Assert.assertEquals(42L, stopNodeId.get());
|
||||
}
|
||||
|
||||
/**
|
||||
* In this test, the commandline is called with the remove option, with an additional attribute
|
||||
* for providing the node id. However, the attribute is not an integer value, therefore an error
|
||||
* should be thrown.
|
||||
*/
|
||||
public void testRemoveWithInvalidNodeIdArg() {
|
||||
AtomicBoolean startCalled = new AtomicBoolean(false);
|
||||
AtomicBoolean stopCalled = new AtomicBoolean(false);
|
||||
AtomicLong stopNodeId = new AtomicLong(-1);
|
||||
StringWriter out = new StringWriter();
|
||||
PrintWriter writer = new PrintWriter(out);
|
||||
ServerCommandLine commandLine =
|
||||
new ServerCommandLine("test-cli", writer) {
|
||||
@Override
|
||||
protected void start() {
|
||||
startCalled.set(true);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void remove(Long nodeId) {
|
||||
stopCalled.set(true);
|
||||
if (nodeId != null) {
|
||||
stopNodeId.set(nodeId);
|
||||
}
|
||||
}
|
||||
};
|
||||
int returnCode = commandLine.run(new String[] {"-r", "text"});
|
||||
|
||||
Assert.assertEquals(1, returnCode);
|
||||
// Nothing should have been output on the console.
|
||||
String consoleOutput = out.toString();
|
||||
Assert.assertTrue(consoleOutput.contains("For input string"));
|
||||
// No callbacks should have been called.
|
||||
Assert.assertFalse(startCalled.get());
|
||||
Assert.assertFalse(stopCalled.get());
|
||||
}
|
||||
|
||||
/**
|
||||
* In this test, the commandline is called with the both the start and stop option. This should
|
||||
* result in an error report and no callback should be called.
|
||||
*/
|
||||
public void testCallWithMultipleActions() {
|
||||
AtomicBoolean startCalled = new AtomicBoolean(false);
|
||||
AtomicBoolean stopCalled = new AtomicBoolean(false);
|
||||
AtomicLong stopNodeId = new AtomicLong(-1);
|
||||
StringWriter out = new StringWriter();
|
||||
PrintWriter writer = new PrintWriter(out);
|
||||
ServerCommandLine commandLine =
|
||||
new ServerCommandLine("test-cli", writer) {
|
||||
@Override
|
||||
protected void start() {
|
||||
startCalled.set(true);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void remove(Long nodeId) {
|
||||
stopCalled.set(true);
|
||||
if (nodeId != null) {
|
||||
stopNodeId.set(nodeId);
|
||||
}
|
||||
}
|
||||
};
|
||||
int returnCode = commandLine.run(new String[] {"-r", "42", "-s"});
|
||||
|
||||
Assert.assertEquals(1, returnCode);
|
||||
// Nothing should have been output on the console.
|
||||
String consoleOutput = out.toString();
|
||||
Assert.assertTrue(
|
||||
consoleOutput.contains("but an option from this group has already been selected"));
|
||||
// No callbacks should have been called.
|
||||
Assert.assertFalse(startCalled.get());
|
||||
Assert.assertFalse(stopCalled.get());
|
||||
}
|
||||
|
||||
/**
|
||||
* In this test, the commandline is called with the start option. The start method should be
|
||||
* called, however there an exception is thrown, which should be caught.
|
||||
*/
|
||||
public void testStartWithErrorArg() {
|
||||
AtomicBoolean stopCalled = new AtomicBoolean(false);
|
||||
StringWriter out = new StringWriter();
|
||||
PrintWriter writer = new PrintWriter(out);
|
||||
ServerCommandLine commandLine =
|
||||
new ServerCommandLine("test-cli", writer) {
|
||||
@Override
|
||||
protected void start() throws IoTDBException {
|
||||
throw new IoTDBException("Error", 23);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void remove(Long nodeId) {
|
||||
stopCalled.set(true);
|
||||
}
|
||||
};
|
||||
int returnCode = commandLine.run(new String[] {"-s"});
|
||||
|
||||
Assert.assertEquals(23, returnCode);
|
||||
// Nothing should have been output on the console.
|
||||
String consoleOutput = out.toString();
|
||||
Assert.assertTrue(consoleOutput.contains("An error occurred while running the command"));
|
||||
// Only the start method should have been called.
|
||||
Assert.assertFalse(stopCalled.get());
|
||||
}
|
||||
|
||||
/**
|
||||
* In this test, the commandline is called with the remove option, with an additional attribute
|
||||
* for providing the node id. The stop method should be called and the id should be passed to the
|
||||
* remove callback, however there an exception is thrown, which should be caught.
|
||||
*/
|
||||
public void testRemoveWithNodeIdWithErrorArg() {
|
||||
AtomicBoolean startCalled = new AtomicBoolean(false);
|
||||
StringWriter out = new StringWriter();
|
||||
PrintWriter writer = new PrintWriter(out);
|
||||
ServerCommandLine commandLine =
|
||||
new ServerCommandLine("test-cli", writer) {
|
||||
@Override
|
||||
protected void start() {
|
||||
startCalled.set(true);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void remove(Long nodeId) throws IoTDBException {
|
||||
throw new IoTDBException("Error", 23);
|
||||
}
|
||||
};
|
||||
int returnCode = commandLine.run(new String[] {"-r", "42"});
|
||||
|
||||
Assert.assertEquals(23, returnCode);
|
||||
// Nothing should have been output on the console.
|
||||
String consoleOutput = out.toString();
|
||||
Assert.assertTrue(consoleOutput.contains("An error occurred while running the command"));
|
||||
// Only the start method should have been called.
|
||||
Assert.assertFalse(startCalled.get());
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue