Handle commit failure of table procedures

Caideyipi 2024-09-24 16:57:30 +08:00 committed by GitHub
parent 737c4ccea9
commit 5b22e7b010
25 changed files with 531 additions and 106 deletions

View File

@@ -161,6 +161,7 @@ public enum ConfigPhysicalPlanType {
AddTableColumn((short) 853),
SetTableProperties((short) 854),
ShowTable((short) 855),
FetchTable((short) 856),
/** Deprecated types for sync, restored them for upgrade. */
@Deprecated

View File

@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.confignode.consensus.request.read.table;
import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanType;
import org.apache.iotdb.confignode.consensus.request.read.ConfigPhysicalReadPlan;
import java.util.Map;
import java.util.Set;
public class FetchTablePlan extends ConfigPhysicalReadPlan {
private final Map<String, Set<String>> fetchTableMap;
public FetchTablePlan(final Map<String, Set<String>> fetchTableMap) {
super(ConfigPhysicalPlanType.FetchTable);
this.fetchTableMap = fetchTableMap;
}
public Map<String, Set<String>> getFetchTableMap() {
return fetchTableMap;
}
}
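
For orientation, the plan is just a wrapper around the database-to-table-names request map; a minimal construction sketch (the surrounding wiring is hypothetical, the real consensus call appears later in this commit):

import org.apache.iotdb.confignode.consensus.request.read.table.FetchTablePlan;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

final class FetchTablePlanSketch {
  public static void main(final String[] args) {
    // One entry per database, each holding the table names the DataNode wants refreshed
    final Map<String, Set<String>> fetchTableMap = new HashMap<>();
    fetchTableMap.put("db1", Collections.singleton("table1"));
    final FetchTablePlan plan = new FetchTablePlan(fetchTableMap);
    // The plan is dispatched on ConfigPhysicalPlanType.FetchTable,
    // e.g. consensusManager.read(plan) as in ClusterSchemaManager below.
    System.out.println(plan.getFetchTableMap()); // {db1=[table1]}
  }
}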

View File

@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.confignode.consensus.response.table;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.schema.table.TsTable;
import org.apache.iotdb.commons.schema.table.TsTableInternalRPCUtil;
import org.apache.iotdb.confignode.rpc.thrift.TFetchTableResp;
import org.apache.iotdb.consensus.common.DataSet;
import java.util.Map;
public class FetchTableResp implements DataSet {
private final TSStatus status;
private final Map<String, Map<String, TsTable>> fetchTableMap;
public FetchTableResp(
final TSStatus status, final Map<String, Map<String, TsTable>> fetchTableMap) {
this.status = status;
this.fetchTableMap = fetchTableMap;
}
public TFetchTableResp convertToTFetchTableResp() {
return new TFetchTableResp(status)
.setTableInfoMap(TsTableInternalRPCUtil.serializeTableFetchResult(fetchTableMap));
}
}

View File

@@ -166,6 +166,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TDropPipePluginReq;
import org.apache.iotdb.confignode.rpc.thrift.TDropPipeReq;
import org.apache.iotdb.confignode.rpc.thrift.TDropTopicReq;
import org.apache.iotdb.confignode.rpc.thrift.TDropTriggerReq;
import org.apache.iotdb.confignode.rpc.thrift.TFetchTableResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetAllPipeInfoResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetAllSubscriptionInfoResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetAllTemplatesResp;
@@ -2582,6 +2583,28 @@ public class ConfigManager implements IManager {
: new TShowTableResp(status);
}
@Override
public TFetchTableResp fetchTables(final Map<String, Set<String>> fetchTableMap) {
final TSStatus status = confirmLeader();
return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
? clusterSchemaManager.fetchTables(
fetchTableMap.entrySet().stream()
.filter(
entry -> {
entry
.getValue()
.removeIf(
table ->
procedureManager
.checkDuplicateTableTask(
entry.getKey(), null, table, null, null)
.getRight());
return !entry.getValue().isEmpty();
})
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)))
: new TFetchTableResp(status);
}
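
The filter above mutates each request entry in place: any table that currently has a conflicting table procedure (the right side of the checkDuplicateTableTask pair) is removed before the fetch, and databases whose table sets become empty are dropped entirely. A standalone sketch of the same idiom, with a hypothetical hasInFlightProcedure predicate standing in for procedureManager.checkDuplicateTableTask(...).getRight():

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

final class FetchFilterSketch {
  // Hypothetical stand-in: true means a create/alter/drop procedure is still running
  private static boolean hasInFlightProcedure(final String database, final String table) {
    return "t2".equals(table);
  }

  public static void main(final String[] args) {
    final Map<String, Set<String>> request = new HashMap<>();
    request.put("db1", new HashSet<>(Arrays.asList("t1", "t2")));
    request.put("db2", new HashSet<>(Collections.singletonList("t2")));

    final Map<String, Set<String>> filtered =
        request.entrySet().stream()
            .filter(
                entry -> {
                  entry.getValue().removeIf(table -> hasInFlightProcedure(entry.getKey(), table));
                  return !entry.getValue().isEmpty(); // drop databases with nothing left to fetch
                })
            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

    System.out.println(filtered); // {db1=[t1]} -- db2 is dropped entirely
  }
}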
@Override
public DataSet registerAINode(TAINodeRegisterReq req) {
TSStatus status = confirmLeader();

View File

@@ -91,6 +91,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TDropPipePluginReq;
import org.apache.iotdb.confignode.rpc.thrift.TDropPipeReq;
import org.apache.iotdb.confignode.rpc.thrift.TDropTopicReq;
import org.apache.iotdb.confignode.rpc.thrift.TDropTriggerReq;
import org.apache.iotdb.confignode.rpc.thrift.TFetchTableResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetAllPipeInfoResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetAllSubscriptionInfoResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetAllTemplatesResp;
@@ -148,6 +149,7 @@ import org.apache.iotdb.rpc.TSStatusCode;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
* A subset of services provided by {@link ConfigManager}. For use internally only, passed to
@@ -833,4 +835,6 @@
TSStatus alterTable(final TAlterTableReq req);
TShowTableResp showTables(final String database);
TFetchTableResp fetchTables(final Map<String, Set<String>> fetchTableMap);
}

View File

@@ -231,7 +231,7 @@ public class ProcedureManager {
while (executor.isRunning()
&& System.currentTimeMillis() - startCheckTimeForProcedures < PROCEDURE_WAIT_TIME_OUT) {
final Pair<Long, Boolean> procedureIdDuplicatePair =
awaitDuplicateTableTask(
checkDuplicateTableTask(
database, null, null, null, ProcedureType.CREATE_TABLE_PROCEDURE);
hasOverlappedTask = procedureIdDuplicatePair.getRight();
@@ -1362,7 +1362,7 @@
long procedureId;
synchronized (this) {
final Pair<Long, Boolean> procedureIdDuplicatePair =
awaitDuplicateTableTask(database, table, tableName, queryId, thisType);
checkDuplicateTableTask(database, table, tableName, queryId, thisType);
procedureId = procedureIdDuplicatePair.getLeft();
if (procedureId == -1) {
@@ -1375,16 +1375,12 @@
}
}
final List<TSStatus> procedureStatus = new ArrayList<>();
final boolean isSucceed =
waitingProcedureFinished(Collections.singletonList(procedureId), procedureStatus);
if (isSucceed) {
return StatusUtils.OK;
} else {
return procedureStatus.get(0);
}
return waitingProcedureFinished(Collections.singletonList(procedureId), procedureStatus)
? StatusUtils.OK
: procedureStatus.get(0);
}
private Pair<Long, Boolean> awaitDuplicateTableTask(
public Pair<Long, Boolean> checkDuplicateTableTask(
final String database,
final TsTable table,
final String tableName,

View File

@@ -39,6 +39,7 @@ import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import org.apache.iotdb.confignode.consensus.request.read.database.CountDatabasePlan;
import org.apache.iotdb.confignode.consensus.request.read.database.GetDatabasePlan;
import org.apache.iotdb.confignode.consensus.request.read.table.FetchTablePlan;
import org.apache.iotdb.confignode.consensus.request.read.table.ShowTablePlan;
import org.apache.iotdb.confignode.consensus.request.read.template.GetAllSchemaTemplatePlan;
import org.apache.iotdb.confignode.consensus.request.read.template.GetAllTemplateSetInfoPlan;
@@ -63,6 +64,7 @@ import org.apache.iotdb.confignode.consensus.request.write.template.UnsetSchemaT
import org.apache.iotdb.confignode.consensus.response.database.CountDatabaseResp;
import org.apache.iotdb.confignode.consensus.response.database.DatabaseSchemaResp;
import org.apache.iotdb.confignode.consensus.response.partition.PathInfoResp;
import org.apache.iotdb.confignode.consensus.response.table.FetchTableResp;
import org.apache.iotdb.confignode.consensus.response.table.ShowTableResp;
import org.apache.iotdb.confignode.consensus.response.template.AllTemplateSetInfoResp;
import org.apache.iotdb.confignode.consensus.response.template.TemplateInfoResp;
@@ -76,6 +78,7 @@ import org.apache.iotdb.confignode.manager.partition.PartitionMetrics;
import org.apache.iotdb.confignode.persistence.schema.ClusterSchemaInfo;
import org.apache.iotdb.confignode.rpc.thrift.TDatabaseInfo;
import org.apache.iotdb.confignode.rpc.thrift.TDatabaseSchema;
import org.apache.iotdb.confignode.rpc.thrift.TFetchTableResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetAllTemplatesResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetPathsSetTemplatesResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetTemplateResp;
@@ -1073,6 +1076,19 @@ public class ClusterSchemaManager {
}
}
public TFetchTableResp fetchTables(final Map<String, Set<String>> fetchTableMap) {
try {
return ((FetchTableResp)
configManager.getConsensusManager().read(new FetchTablePlan(fetchTableMap)))
.convertToTFetchTableResp();
} catch (final ConsensusException e) {
LOGGER.warn("Failed in the read API executing the consensus layer due to: ", e);
final TSStatus res = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
res.setMessage(e.getMessage());
return new TFetchTableResp(res);
}
}
public byte[] getAllTableInfoForDataNodeActivation() {
return TsTableInternalRPCUtil.serializeTableInitializationInfo(
clusterSchemaInfo.getAllUsingTables(), clusterSchemaInfo.getAllPreCreateTables());

View File

@@ -45,6 +45,7 @@ import org.apache.iotdb.confignode.consensus.request.read.partition.GetTimeSlotL
import org.apache.iotdb.confignode.consensus.request.read.pipe.plugin.GetPipePluginJarPlan;
import org.apache.iotdb.confignode.consensus.request.read.region.GetRegionIdPlan;
import org.apache.iotdb.confignode.consensus.request.read.region.GetRegionInfoListPlan;
import org.apache.iotdb.confignode.consensus.request.read.table.FetchTablePlan;
import org.apache.iotdb.confignode.consensus.request.read.table.ShowTablePlan;
import org.apache.iotdb.confignode.consensus.request.read.template.CheckTemplateSettablePlan;
import org.apache.iotdb.confignode.consensus.request.read.template.GetPathsSetTemplatePlan;
@@ -311,6 +312,8 @@ public class ConfigPlanExecutor {
return clusterSchemaInfo.getTemplateSetInfo((GetTemplateSetInfoPlan) req);
case ShowTable:
return clusterSchemaInfo.showTables((ShowTablePlan) req);
case FetchTable:
return clusterSchemaInfo.fetchTables((FetchTablePlan) req);
case GetTriggerTable:
return triggerInfo.getTriggerTable((GetTriggerTablePlan) req);
case GetTriggerLocation:

View File

@@ -35,6 +35,7 @@ import org.apache.iotdb.commons.utils.StatusUtils;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.confignode.consensus.request.read.database.CountDatabasePlan;
import org.apache.iotdb.confignode.consensus.request.read.database.GetDatabasePlan;
import org.apache.iotdb.confignode.consensus.request.read.table.FetchTablePlan;
import org.apache.iotdb.confignode.consensus.request.read.table.ShowTablePlan;
import org.apache.iotdb.confignode.consensus.request.read.template.CheckTemplateSettablePlan;
import org.apache.iotdb.confignode.consensus.request.read.template.GetPathsSetTemplatePlan;
@@ -63,6 +64,7 @@ import org.apache.iotdb.confignode.consensus.request.write.template.UnsetSchemaT
import org.apache.iotdb.confignode.consensus.response.database.CountDatabaseResp;
import org.apache.iotdb.confignode.consensus.response.database.DatabaseSchemaResp;
import org.apache.iotdb.confignode.consensus.response.partition.PathInfoResp;
import org.apache.iotdb.confignode.consensus.response.table.FetchTableResp;
import org.apache.iotdb.confignode.consensus.response.table.ShowTableResp;
import org.apache.iotdb.confignode.consensus.response.template.AllTemplateSetInfoResp;
import org.apache.iotdb.confignode.consensus.response.template.TemplateInfoResp;
@@ -1102,6 +1104,27 @@ public class ClusterSchemaInfo implements SnapshotProcessor {
}
}
public FetchTableResp fetchTables(final FetchTablePlan plan) {
databaseReadWriteLock.readLock().lock();
try {
final Map<String, Map<String, TsTable>> result = new HashMap<>();
for (final Map.Entry<String, Set<String>> database2Tables :
plan.getFetchTableMap().entrySet()) {
result.put(
database2Tables.getKey(),
mTree.getSpecificTablesUnderSpecificDatabase(
getQualifiedDatabasePartialPath(database2Tables.getKey()),
database2Tables.getValue()));
}
return new FetchTableResp(StatusUtils.OK, result);
} catch (final MetadataException e) {
return new FetchTableResp(
RpcUtils.getStatus(e.getErrorCode(), e.getMessage()), Collections.emptyMap());
} finally {
databaseReadWriteLock.readLock().unlock();
}
}
public Map<String, List<TsTable>> getAllUsingTables() {
databaseReadWriteLock.readLock().lock();
try {

View File

@@ -685,6 +685,23 @@ public class ConfigMTree {
.collect(Collectors.toList());
}
public Map<String, TsTable> getSpecificTablesUnderSpecificDatabase(
final PartialPath databasePath, final Set<String> tables) throws MetadataException {
final IConfigMNode databaseNode = getDatabaseNodeByDatabasePath(databasePath).getAsMNode();
final Map<String, TsTable> result = new HashMap<>();
tables.forEach(
table -> {
final IConfigMNode child = databaseNode.getChildren().get(table);
if (child instanceof ConfigTableNode
&& ((ConfigTableNode) child).getStatus().equals(TableNodeStatus.USING)) {
result.put(table, ((ConfigTableNode) child).getTable());
} else {
result.put(table, null);
}
});
return result;
}
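
Note that a requested table that is absent, or present but not yet in USING status, is reported as a null value rather than an omitted key, so the caller can tell "asked for but unavailable" apart from "never asked for". An illustration of the resulting map shape (names invented):

import java.util.HashMap;
import java.util.Map;

final class FetchResultShapeSketch {
  public static void main(final String[] args) {
    // Values stand in for TsTable instances in the real method
    final Map<String, Object> result = new HashMap<>();
    result.put("t_using", new Object()); // found, status == USING
    result.put("t_precreate", null); // node exists but is not USING yet
    result.put("t_missing", null); // no such child under the database
    System.out.println(result.keySet()); // every requested name appears as a key
  }
}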
public Map<String, List<TsTable>> getAllUsingTables() {
return getAllDatabasePaths().stream()
.collect(

View File

@@ -167,7 +167,6 @@ public class AddTableColumnProcedure
database,
table.getTableName(),
failedResults);
// TODO: Handle commit failure
}
}

View File

@@ -297,7 +297,6 @@ public class CreateTableProcedure
database,
table.getTableName(),
failedResults);
// TODO: Handle commit failure
}
}

View File

@@ -181,7 +181,6 @@ public class SetTablePropertiesProcedure
database,
table.getTableName(),
failedResults);
// TODO: Handle commit failure
}
}

View File

@@ -133,6 +133,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TDropPipePluginReq;
import org.apache.iotdb.confignode.rpc.thrift.TDropPipeReq;
import org.apache.iotdb.confignode.rpc.thrift.TDropTopicReq;
import org.apache.iotdb.confignode.rpc.thrift.TDropTriggerReq;
import org.apache.iotdb.confignode.rpc.thrift.TFetchTableResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetAllPipeInfoResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetAllSubscriptionInfoResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetAllTemplatesResp;
@@ -214,6 +215,7 @@ import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
/** ConfigNodeRPCServer exposes the interface that interacts with the DataNode */
@@ -1304,4 +1306,9 @@ public class ConfigNodeRPCServiceProcessor implements IConfigNodeRPCService.Ifac
public TShowTableResp showTables(final String database) {
return configManager.showTables(database);
}
@Override
public TFetchTableResp fetchTables(final Map<String, Set<String>> fetchTableMap) {
return configManager.fetchTables(fetchTableMap);
}
}

View File

@@ -1045,6 +1045,8 @@ public class IoTDBConfig {
/** Policy of DataNodeSchemaCache eviction */
private String dataNodeSchemaCacheEvictionPolicy = "FIFO";
private int dataNodeTableCacheSemaphorePermitNum = 5;
private String readConsistencyLevel = "strong";
/** Maximum execution time of a DriverTask */
@@ -3466,6 +3468,14 @@
this.dataNodeSchemaCacheEvictionPolicy = dataNodeSchemaCacheEvictionPolicy;
}
public int getDataNodeTableCacheSemaphorePermitNum() {
return dataNodeTableCacheSemaphorePermitNum;
}
public void setDataNodeTableCacheSemaphorePermitNum(int dataNodeTableCacheSemaphorePermitNum) {
this.dataNodeTableCacheSemaphorePermitNum = dataNodeTableCacheSemaphorePermitNum;
}
public String getReadConsistencyLevel() {
return readConsistencyLevel;
}

View File

@@ -1073,6 +1073,12 @@ public class IoTDBDescriptor {
properties.getProperty(
"datanode_schema_cache_eviction_policy", conf.getDataNodeSchemaCacheEvictionPolicy()));
conf.setDataNodeTableCacheSemaphorePermitNum(
Integer.parseInt(
properties.getProperty(
"datanode_table_cache_semaphore_permit_num",
String.valueOf(conf.getDataNodeTableCacheSemaphorePermitNum()))));
loadIoTConsensusProps(properties);
loadIoTConsensusV2Props(properties);
}
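
Operationally, the new knob caps how many concurrent fetch-table calls a DataNode may issue to the ConfigNode on cache misses. A hedged configuration example (the key is taken from the loader above; the default of 5 comes from IoTDBConfig):

# Upper bound on concurrent DataNodeTableCache fetches to the ConfigNode (assumed default: 5)
datanode_table_cache_semaphore_permit_num=10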

View File

@@ -96,6 +96,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TDropPipePluginReq;
import org.apache.iotdb.confignode.rpc.thrift.TDropPipeReq;
import org.apache.iotdb.confignode.rpc.thrift.TDropTopicReq;
import org.apache.iotdb.confignode.rpc.thrift.TDropTriggerReq;
import org.apache.iotdb.confignode.rpc.thrift.TFetchTableResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetAllPipeInfoResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetAllSubscriptionInfoResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetAllTemplatesResp;
@@ -180,6 +181,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Predicate;
public class ConfigNodeClient implements IConfigNodeRPCService.Iface, ThriftClient, AutoCloseable {
@@ -1278,6 +1280,13 @@ public class ConfigNodeClient implements IConfigNodeRPCService.Iface, ThriftClie
() -> client.showTables(database), resp -> !updateConfigNodeLeader(resp.status));
}
@Override
public TFetchTableResp fetchTables(final Map<String, Set<String>> fetchTableMap)
throws TException {
return executeRemoteCallWithRetry(
() -> client.fetchTables(fetchTableMap), resp -> !updateConfigNodeLeader(resp.status));
}
public static class Factory extends ThriftClientFactory<ConfigRegionId, ConfigNodeClient> {
public Factory(

View File

@@ -86,6 +86,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TDropPipePluginReq;
import org.apache.iotdb.confignode.rpc.thrift.TDropPipeReq;
import org.apache.iotdb.confignode.rpc.thrift.TDropTopicReq;
import org.apache.iotdb.confignode.rpc.thrift.TDropTriggerReq;
import org.apache.iotdb.confignode.rpc.thrift.TFetchTableResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetDatabaseReq;
import org.apache.iotdb.confignode.rpc.thrift.TGetPipePluginTableResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetRegionIdReq;
@@ -285,6 +286,7 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.stream.Collectors;
@@ -3197,6 +3199,22 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor {
return future;
}
@Override
public TFetchTableResp fetchTables(final Map<String, Set<String>> fetchTableMap) {
try (final ConfigNodeClient configNodeClient =
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
final TFetchTableResp fetchTableResp = configNodeClient.fetchTables(fetchTableMap);
if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != fetchTableResp.getStatus().getCode()) {
LOGGER.warn("Failed to fetchTables, status is {}.", fetchTableResp);
}
return fetchTableResp;
} catch (final Exception e) {
return new TFetchTableResp(
new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode())
.setMessage(e.toString()));
}
}
@Override
public SettableFuture<ConfigTaskResult> alterTableAddColumn(
final String database,
@@ -3305,14 +3323,14 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor {
return future;
}
public void handlePipeConfigClientExit(String clientId) {
public void handlePipeConfigClientExit(final String clientId) {
try (final ConfigNodeClient configNodeClient =
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
final TSStatus status = configNodeClient.handlePipeConfigClientExit(clientId);
if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != status.getCode()) {
LOGGER.warn("Failed to handlePipeConfigClientExit, status is {}.", status);
}
} catch (Exception e) {
} catch (final Exception e) {
LOGGER.warn("Failed to handlePipeConfigClientExit.", e);
}
}

View File

@@ -26,6 +26,7 @@ import org.apache.iotdb.commons.cluster.NodeStatus;
import org.apache.iotdb.commons.schema.table.TsTable;
import org.apache.iotdb.commons.schema.table.column.TsTableColumnSchema;
import org.apache.iotdb.confignode.rpc.thrift.TDatabaseSchema;
import org.apache.iotdb.confignode.rpc.thrift.TFetchTableResp;
import org.apache.iotdb.confignode.rpc.thrift.TSpaceQuotaResp;
import org.apache.iotdb.confignode.rpc.thrift.TThrottleQuotaResp;
import org.apache.iotdb.db.protocol.session.IClientSession;
@@ -95,6 +96,7 @@ import com.google.common.util.concurrent.SettableFuture;
import java.util.List;
import java.util.Map;
import java.util.Set;
public interface IConfigTaskExecutor {
@@ -297,6 +299,8 @@ public interface IConfigTaskExecutor {
SettableFuture<ConfigTaskResult> showTables(final String database);
TFetchTableResp fetchTables(final Map<String, Set<String>> fetchTableMap);
SettableFuture<ConfigTaskResult> alterTableAddColumn(
final String database,
final String tableName,

View File

@@ -52,6 +52,7 @@ import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
@@ -80,19 +81,25 @@ public class TableHeaderSchemaValidator {
}
public Optional<TableSchema> validateTableHeaderSchema(
String database, TableSchema tableSchema, MPPQueryContext context, boolean allowCreateTable) {
final String database,
final TableSchema tableSchema,
final MPPQueryContext context,
final boolean allowCreateTable) {
// The schema cache R/W and the fetch operation must be locked together so that the cache-clean
// operation executed by deleting timeSeries takes effect.
DataNodeSchemaLockManager.getInstance()
.takeReadLock(context, SchemaLockType.VALIDATE_VS_DELETION);
List<ColumnSchema> inputColumnList = tableSchema.getColumns();
final List<ColumnSchema> inputColumnList = tableSchema.getColumns();
if (inputColumnList == null || inputColumnList.isEmpty()) {
throw new IllegalArgumentException(
"No column other than Time present, please check the request");
}
TsTable table = DataNodeTableCache.getInstance().getTable(database, tableSchema.getTableName());
List<ColumnSchema> missingColumnList = new ArrayList<>();
// Get the table directly if it exists, because we do not want "addColumn" to affect
// the original writes
TsTable table =
DataNodeTableCache.getInstance().getTableInWrite(database, tableSchema.getTableName());
final List<ColumnSchema> missingColumnList = new ArrayList<>();
// First-round validation: check the existing schema
if (table == null) {
@@ -112,25 +119,36 @@
}
}
for (ColumnSchema columnSchema : inputColumnList) {
boolean refreshed = false;
for (final ColumnSchema columnSchema : inputColumnList) {
TsTableColumnSchema existingColumn = table.getColumnSchema(columnSchema.getName());
if (existingColumn == null) {
// check arguments for column auto creation
if (columnSchema.getColumnCategory() == null) {
throw new SemanticException(
String.format(
"Unknown column category for %s. Cannot auto create column.",
columnSchema.getName()),
TSStatusCode.COLUMN_NOT_EXISTS.getStatusCode());
if (Objects.isNull(existingColumn)) {
if (!refreshed) {
// Refresh because new columns may have been added but failed to commit
// Allow refreshing only once to avoid too many failed columns in the SQL when column
// procedures are running
refreshed = true;
table = DataNodeTableCache.getInstance().getTable(database, tableSchema.getTableName());
existingColumn = table.getColumnSchema(columnSchema.getName());
}
if (columnSchema.getType() == null) {
throw new SemanticException(
String.format(
"Unknown column data type for %s. Cannot auto create column.",
columnSchema.getName()),
TSStatusCode.COLUMN_NOT_EXISTS.getStatusCode());
if (Objects.isNull(existingColumn)) {
// check arguments for column auto creation
if (columnSchema.getColumnCategory() == null) {
throw new SemanticException(
String.format(
"Unknown column category for %s. Cannot auto create column.",
columnSchema.getName()),
TSStatusCode.COLUMN_NOT_EXISTS.getStatusCode());
}
if (columnSchema.getType() == null) {
throw new SemanticException(
String.format(
"Unknown column data type for %s. Cannot auto create column.",
columnSchema.getName()),
TSStatusCode.COLUMN_NOT_EXISTS.getStatusCode());
}
missingColumnList.add(columnSchema);
}
missingColumnList.add(columnSchema);
} else {
// leave measurement columns' dataType checking to the caller, then the caller can decide
// whether to do partial insert
@@ -145,7 +163,7 @@
}
}
List<ColumnSchema> resultColumnList = new ArrayList<>();
final List<ColumnSchema> resultColumnList = new ArrayList<>();
if (!missingColumnList.isEmpty()
&& IoTDBDescriptor.getInstance().getConfig().isAutoCreateSchemaEnabled()) {
// TODO table metadata: authority check for table alter
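
The validator's new flow, stripped of IoTDB specifics: on the first column missing from the cached table it refreshes the table once (possibly paying one remote fetch), re-checks, and only then falls back to auto-creation; later misses reuse the refreshed snapshot. A minimal sketch of that pattern, with hypothetical lookup and refresh helpers:

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class RefreshOnceSketch {
  private static final Map<String, String> CACHE = new HashMap<>();

  // Hypothetical stand-in for DataNodeTableCache.getTable(...), which may hit the configNode
  private static void refreshFromRemote() {
    CACHE.put("temperature", "DOUBLE"); // a column committed remotely but not yet seen locally
  }

  public static void main(final String[] args) {
    CACHE.put("time", "TIMESTAMP");
    final List<String> inputColumns = Arrays.asList("time", "temperature", "humidity");
    boolean refreshed = false;
    for (final String column : inputColumns) {
      String existing = CACHE.get(column);
      if (existing == null && !refreshed) {
        refreshed = true; // at most one remote refresh per validation pass
        refreshFromRemote();
        existing = CACHE.get(column);
      }
      System.out.println(column + " -> " + (existing == null ? "auto-create or reject" : existing));
    }
  }
}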

View File

@@ -23,16 +23,23 @@ import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.schema.table.TsTable;
import org.apache.iotdb.commons.schema.table.TsTableInternalRPCUtil;
import org.apache.iotdb.commons.utils.PathUtils;
import org.apache.iotdb.confignode.rpc.thrift.TFetchTableResp;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.queryengine.plan.execution.config.executor.ClusterConfigTaskExecutor;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.tsfile.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import javax.annotation.Nonnull;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -47,9 +54,13 @@ public class DataNodeTableCache implements ITableCache {
private final Map<String, Map<String, TsTable>> databaseTableMap = new ConcurrentHashMap<>();
private final Map<String, Map<String, TsTable>> preUpdateTableMap = new ConcurrentHashMap<>();
private final Map<String, Map<String, Pair<TsTable, Long>>> preUpdateTableMap =
new ConcurrentHashMap<>();
private final ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
private final Semaphore fetchTableSemaphore =
new Semaphore(
IoTDBDescriptor.getInstance().getConfig().getDataNodeTableCacheSemaphorePermitNum());
private DataNodeTableCache() {
// Do nothing
@@ -76,38 +87,52 @@
TsTableInternalRPCUtil.deserializeTableInitializationInfo(tableInitializationBytes);
final Map<String, List<TsTable>> usingMap = tableInfo.left;
final Map<String, List<TsTable>> preCreateMap = tableInfo.right;
saveUpdatedTableInfo(usingMap, databaseTableMap);
saveUpdatedTableInfo(preCreateMap, preUpdateTableMap);
usingMap.forEach(
(key, value) ->
databaseTableMap.put(
PathUtils.unQualifyDatabaseName(key),
value.stream()
.collect(
Collectors.toMap(
TsTable::getTableName,
Function.identity(),
(v1, v2) -> v2,
ConcurrentHashMap::new))));
preCreateMap.forEach(
(key, value) ->
preUpdateTableMap.put(
PathUtils.unQualifyDatabaseName(key),
value.stream()
.collect(
Collectors.toMap(
TsTable::getTableName,
table -> new Pair<>(table, 0L),
(v1, v2) -> v2,
ConcurrentHashMap::new))));
LOGGER.info("Init DataNodeTableCache successfully");
} finally {
readWriteLock.writeLock().unlock();
}
}
private void saveUpdatedTableInfo(
final Map<String, List<TsTable>> tableMap,
final Map<String, Map<String, TsTable>> localTableMap) {
tableMap.forEach(
(key, value) ->
localTableMap.put(
key,
value.stream()
.collect(
Collectors.toMap(
TsTable::getTableName,
Function.identity(),
(v1, v2) -> v2,
ConcurrentHashMap::new))));
}
@Override
public void preUpdateTable(String database, final TsTable table) {
database = PathUtils.qualifyDatabaseName(database);
database = PathUtils.unQualifyDatabaseName(database);
readWriteLock.writeLock().lock();
try {
preUpdateTableMap
.computeIfAbsent(database, k -> new ConcurrentHashMap<>())
.put(table.getTableName(), table);
.compute(
table.getTableName(),
(k, v) -> {
if (Objects.isNull(v)) {
return new Pair<>(table, 0L);
} else {
v.setLeft(table);
v.setRight(v.getRight() + 1);
return v;
}
});
LOGGER.info("Pre-update table {}.{} successfully", database, table);
} finally {
readWriteLock.writeLock().unlock();
@@ -116,7 +141,7 @@
@Override
public void rollbackUpdateTable(String database, final String tableName) {
database = PathUtils.qualifyDatabaseName(database);
database = PathUtils.unQualifyDatabaseName(database);
readWriteLock.writeLock().lock();
try {
removeTableFromPreUpdateMap(database, tableName);
@@ -133,24 +158,19 @@
if (v == null) {
throw new IllegalStateException();
}
v.remove(tableName);
if (v.isEmpty()) {
return null;
} else {
return v;
}
v.get(tableName).setLeft(null);
return v;
});
}
@Override
public void commitUpdateTable(String database, final String tableName) {
database = PathUtils.qualifyDatabaseName(database);
database = PathUtils.unQualifyDatabaseName(database);
readWriteLock.writeLock().lock();
try {
final TsTable table = preUpdateTableMap.get(database).get(tableName);
databaseTableMap
.computeIfAbsent(database, k -> new ConcurrentHashMap<>())
.put(tableName, table);
.put(tableName, preUpdateTableMap.get(database).get(tableName).getLeft());
removeTableFromPreUpdateMap(database, tableName);
LOGGER.info("Commit-update table {}.{} successfully", database, tableName);
} finally {
@@ -160,7 +180,7 @@
@Override
public void invalid(String database) {
database = PathUtils.qualifyDatabaseName(database);
database = PathUtils.unQualifyDatabaseName(database);
readWriteLock.writeLock().lock();
try {
databaseTableMap.remove(database);
@@ -170,30 +190,137 @@
}
}
public TsTable getTableInWrite(final String database, final String tableName) {
final TsTable result = getTableInCache(database, tableName);
return Objects.nonNull(result) ? result : getTable(database, tableName);
}
/**
* The following logic handles the cases where the configNode has failed to clear some tables in
* {@link #preUpdateTableMap}, due to a failed "commit" or a failed rollback of a "pre-update".
*/
public TsTable getTable(String database, final String tableName) {
database = PathUtils.qualifyDatabaseName(database);
database = PathUtils.unQualifyDatabaseName(database);
final Map<String, Map<String, Long>> preUpdateTables =
mayGetTableInPreUpdateMap(database, tableName);
if (Objects.nonNull(preUpdateTables)) {
updateTable(getTablesInConfigNode(preUpdateTables), preUpdateTables);
}
return getTableInCache(database, tableName);
}
private Map<String, Map<String, Long>> mayGetTableInPreUpdateMap(
final String database, final String tableName) {
readWriteLock.readLock().lock();
try {
if (databaseTableMap.containsKey(database)) {
return databaseTableMap.get(database).get(tableName);
}
return null;
return preUpdateTableMap.containsKey(database)
&& preUpdateTableMap.get(database).containsKey(tableName)
? preUpdateTableMap.entrySet().stream()
.collect(
Collectors.toMap(
Map.Entry::getKey,
entry ->
entry.getValue().entrySet().stream()
.collect(
Collectors.toMap(
Map.Entry::getKey,
innerEntry -> innerEntry.getValue().getRight()))))
: null;
} finally {
readWriteLock.readLock().unlock();
}
}
public Optional<List<TsTable>> getTables(String database) {
database = PathUtils.qualifyDatabaseName(database);
private Map<String, Map<String, TsTable>> getTablesInConfigNode(
final Map<String, Map<String, Long>> tableInput) {
Map<String, Map<String, TsTable>> result = Collections.emptyMap();
try {
fetchTableSemaphore.acquire();
final TFetchTableResp resp =
ClusterConfigTaskExecutor.getInstance()
.fetchTables(
tableInput.entrySet().stream()
.collect(
Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue().keySet())));
if (TSStatusCode.SUCCESS_STATUS.getStatusCode() == resp.getStatus().getCode()) {
result = TsTableInternalRPCUtil.deserializeTsTableFetchResult(resp.getTableInfoMap());
}
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
LOGGER.warn(
"Interrupted while acquiring the semaphore to fetch tables from the configNode, ignored.");
// The permit was never acquired on this path, so skip the release below
return result;
} catch (final Exception e) {
fetchTableSemaphore.release();
throw e;
}
fetchTableSemaphore.release();
return result;
}
private void updateTable(
final Map<String, Map<String, TsTable>> fetchedTables,
final Map<String, Map<String, Long>> previousVersions) {
readWriteLock.writeLock().lock();
try {
fetchedTables.forEach(
(database, tableInfoMap) -> {
if (preUpdateTableMap.containsKey(database)) {
tableInfoMap.forEach(
(tableName, tsTable) -> {
final Pair<TsTable, Long> existingPair =
preUpdateTableMap.get(database).get(tableName);
if (Objects.isNull(existingPair)
|| !Objects.equals(
existingPair.getRight(),
previousVersions.get(database).get(tableName))) {
return;
}
existingPair.setLeft(null);
if (Objects.nonNull(tsTable)) {
databaseTableMap
.computeIfAbsent(database, k -> new ConcurrentHashMap<>())
.put(tableName, tsTable);
} else if (databaseTableMap.containsKey(database)) {
databaseTableMap.get(database).remove(tableName);
}
});
}
});
} finally {
readWriteLock.writeLock().unlock();
}
}
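
The pattern at work here is a version-stamped reconciliation: preUpdateTable bumps a per-table counter, getTable snapshots those counters before the remote fetch, and updateTable applies a fetched result only while the counter is unchanged, so a concurrent pre-update silently invalidates the in-flight fetch instead of being overwritten. A condensed, self-contained sketch of the same idea (Pair replaced by a tiny holder; all names hypothetical):

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class VersionedCacheSketch {
  static final class Versioned<T> {
    T value;
    long version;

    Versioned(final T value) {
      this.value = value;
    }
  }

  private final Map<String, Versioned<String>> pending = new ConcurrentHashMap<>();
  private final Map<String, String> committed = new ConcurrentHashMap<>();

  // Mirrors preUpdateTable: record the tentative schema and bump the version
  void preUpdate(final String table, final String schema) {
    pending.compute(
        table,
        (k, v) -> {
          if (v == null) {
            return new Versioned<>(schema);
          }
          v.value = schema;
          v.version++; // any fetch snapshotted against the old version is now stale
          return v;
        });
  }

  // Mirrors updateTable: apply a fetched schema only if no pre-update raced with the fetch
  void applyFetched(final String table, final long snapshotVersion, final String fetched) {
    final Versioned<String> v = pending.get(table);
    if (v == null || v.version != snapshotVersion) {
      return; // stale fetch result, drop it
    }
    if (fetched != null) {
      committed.put(table, fetched);
    } else {
      committed.remove(table); // the configNode says the table no longer exists
    }
  }
}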
private TsTable getTableInCache(final String database, final String tableName) {
readWriteLock.readLock().lock();
try {
final Map<String, TsTable> tableMap = databaseTableMap.get(database);
return tableMap != null ? Optional.of(new ArrayList<>(tableMap.values())) : Optional.empty();
return databaseTableMap.containsKey(database)
? databaseTableMap.get(database).get(tableName)
: null;
} finally {
readWriteLock.readLock().unlock();
}
}
// The database name shall not start with "root"
public String tryGetInternColumnName(
final @Nonnull String database,
final @Nonnull String tableName,
final @Nonnull String columnName) {
if (columnName.isEmpty()) {
return columnName;
}
try {
return databaseTableMap
.get(database)
.get(tableName)
.getColumnSchema(columnName)
.getColumnName();
} catch (final Exception e) {
return columnName;
}
}
/** Check whether the given path overlaps with any existing table. */
public Pair<String, String> checkTableCreateAndPreCreateOnGivenPath(final PartialPath path) {
readWriteLock.writeLock().lock();
@@ -210,9 +337,9 @@
}
private Pair<String, String> checkTableExistenceOnGivenPath(
final String path, final Map<String, Map<String, TsTable>> tableMap) {
final String path, final Map<String, ? extends Map<String, ?>> tableMap) {
final int dbStartIndex = PATH_ROOT.length() + 1;
for (final Map.Entry<String, Map<String, TsTable>> dbEntry : tableMap.entrySet()) {
for (final Map.Entry<String, ? extends Map<String, ?>> dbEntry : tableMap.entrySet()) {
final String database = dbEntry.getKey();
if (!(path.startsWith(database, dbStartIndex)
&& path.charAt(dbStartIndex + database.length()) == PATH_SEPARATOR)) {

View File

@@ -189,10 +189,10 @@ public class TsTable {
return stream.toByteArray();
}
public void serialize(OutputStream stream) throws IOException {
public void serialize(final OutputStream stream) throws IOException {
ReadWriteIOUtils.write(tableName, stream);
ReadWriteIOUtils.write(columnSchemaMap.size(), stream);
for (TsTableColumnSchema columnSchema : columnSchemaMap.values()) {
for (final TsTableColumnSchema columnSchema : columnSchemaMap.values()) {
TsTableColumnSchemaUtil.serialize(columnSchema, stream);
}
ReadWriteIOUtils.write(props, stream);

View File

@@ -30,35 +30,36 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
public class TsTableInternalRPCUtil {
private TsTableInternalRPCUtil() {
// do nothing
// Do nothing
}
public static byte[] serializeBatchTsTable(Map<String, List<TsTable>> tableMap) {
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
public static byte[] serializeBatchTsTable(final Map<String, List<TsTable>> tableMap) {
final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
try {
ReadWriteIOUtils.write(tableMap.size(), outputStream);
for (Map.Entry<String, List<TsTable>> entry : tableMap.entrySet()) {
for (final Map.Entry<String, List<TsTable>> entry : tableMap.entrySet()) {
ReadWriteIOUtils.write(entry.getKey(), outputStream);
ReadWriteIOUtils.write(entry.getValue().size(), outputStream);
for (TsTable table : entry.getValue()) {
for (final TsTable table : entry.getValue()) {
table.serialize(outputStream);
}
}
} catch (IOException ignored) {
// won't happen
} catch (final IOException ignored) {
// ByteArrayOutputStream won't throw IOException
}
return outputStream.toByteArray();
}
public static Map<String, List<TsTable>> deserializeBatchTsTable(byte[] bytes) {
InputStream inputStream = new ByteArrayInputStream(bytes);
Map<String, List<TsTable>> result = new HashMap<>();
public static Map<String, List<TsTable>> deserializeBatchTsTable(final byte[] bytes) {
final InputStream inputStream = new ByteArrayInputStream(bytes);
final Map<String, List<TsTable>> result = new HashMap<>();
try {
int dbNum = ReadWriteIOUtils.readInt(inputStream);
final int dbNum = ReadWriteIOUtils.readInt(inputStream);
String database;
int tableNum;
List<TsTable> tableList;
@@ -71,8 +72,8 @@
}
result.put(database, tableList);
}
} catch (IOException ignored) {
// won't happen
} catch (final IOException ignored) {
// ByteArrayInputStream won't throw IOException
}
return result;
}
@@ -89,29 +90,76 @@
}
public static Pair<String, TsTable> deserializeSingleTsTable(final byte[] bytes) {
InputStream inputStream = new ByteArrayInputStream(bytes);
final InputStream inputStream = new ByteArrayInputStream(bytes);
try {
String database = ReadWriteIOUtils.readString(inputStream);
TsTable table = TsTable.deserialize(inputStream);
return new Pair<>(database, table);
} catch (IOException ignored) {
return new Pair<>(ReadWriteIOUtils.readString(inputStream), TsTable.deserialize(inputStream));
} catch (final IOException ignored) {
// ByteArrayInputStream won't throw IOException
}
throw new IllegalStateException();
}
public static byte[] serializeTableInitializationInfo(
Map<String, List<TsTable>> usingTableMap, Map<String, List<TsTable>> preCreateTableMap) {
byte[] usingBytes = serializeBatchTsTable(usingTableMap);
byte[] preCreateBytes = serializeBatchTsTable(preCreateTableMap);
byte[] result = new byte[usingBytes.length + preCreateBytes.length];
final Map<String, List<TsTable>> usingTableMap,
final Map<String, List<TsTable>> preCreateTableMap) {
final byte[] usingBytes = serializeBatchTsTable(usingTableMap);
final byte[] preCreateBytes = serializeBatchTsTable(preCreateTableMap);
final byte[] result = new byte[usingBytes.length + preCreateBytes.length];
System.arraycopy(usingBytes, 0, result, 0, usingBytes.length);
System.arraycopy(preCreateBytes, 0, result, usingBytes.length, preCreateBytes.length);
return result;
}
public static Pair<Map<String, List<TsTable>>, Map<String, List<TsTable>>>
deserializeTableInitializationInfo(byte[] bytes) {
deserializeTableInitializationInfo(final byte[] bytes) {
return new Pair<>(deserializeBatchTsTable(bytes), deserializeBatchTsTable(bytes));
}
public static byte[] serializeTableFetchResult(
final Map<String, Map<String, TsTable>> fetchTableMap) {
final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
try {
ReadWriteIOUtils.write(fetchTableMap.size(), outputStream);
for (final Map.Entry<String, Map<String, TsTable>> entry : fetchTableMap.entrySet()) {
ReadWriteIOUtils.write(entry.getKey(), outputStream);
ReadWriteIOUtils.write(entry.getValue().size(), outputStream);
for (final Map.Entry<String, TsTable> tableEntry : entry.getValue().entrySet()) {
ReadWriteIOUtils.write(tableEntry.getKey(), outputStream);
ReadWriteIOUtils.write(Objects.nonNull(tableEntry.getValue()), outputStream);
if (Objects.nonNull(tableEntry.getValue())) {
tableEntry.getValue().serialize(outputStream);
}
}
}
} catch (final IOException ignored) {
// ByteArrayOutputStream won't throw IOException
}
return outputStream.toByteArray();
}
public static Map<String, Map<String, TsTable>> deserializeTsTableFetchResult(
final byte[] bytes) {
final InputStream inputStream = new ByteArrayInputStream(bytes);
final Map<String, Map<String, TsTable>> result = new HashMap<>();
try {
final int dbNum = ReadWriteIOUtils.readInt(inputStream);
String database;
int tableNum;
Map<String, TsTable> tableMap;
for (int i = 0; i < dbNum; i++) {
database = ReadWriteIOUtils.readString(inputStream);
tableNum = ReadWriteIOUtils.readInt(inputStream);
tableMap = new HashMap<>(tableNum);
for (int j = 0; j < tableNum; j++) {
tableMap.put(
ReadWriteIOUtils.readString(inputStream),
ReadWriteIOUtils.readBool(inputStream) ? TsTable.deserialize(inputStream) : null);
}
result.put(database, tableMap);
}
} catch (final IOException ignored) {
// ByteArrayInputStream won't throw IOException
}
return result;
}
}
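
The fetch-result codec writes, per database, a table count followed by (name, presence flag, optional table body) triples, so the null entries produced by getSpecificTablesUnderSpecificDatabase survive the RPC. A hedged round-trip check using only the utilities added in this commit (no TsTable needs to be constructed; a null entry suffices):

import org.apache.iotdb.commons.schema.table.TsTable;
import org.apache.iotdb.commons.schema.table.TsTableInternalRPCUtil;

import java.util.HashMap;
import java.util.Map;

final class FetchCodecRoundTrip {
  public static void main(final String[] args) {
    final Map<String, Map<String, TsTable>> input = new HashMap<>();
    final Map<String, TsTable> tables = new HashMap<>();
    tables.put("missing_table", null); // "requested but unavailable" becomes a false flag
    input.put("db1", tables);

    final byte[] bytes = TsTableInternalRPCUtil.serializeTableFetchResult(input);
    final Map<String, Map<String, TsTable>> output =
        TsTableInternalRPCUtil.deserializeTsTableFetchResult(bytes);

    System.out.println(output.get("db1").containsKey("missing_table")); // true
    System.out.println(output.get("db1").get("missing_table")); // null, as encoded
  }
}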

View File

@@ -213,4 +213,11 @@
}
return databaseName;
}
public static String unQualifyDatabaseName(String databaseName) {
if (databaseName != null && databaseName.startsWith("root.")) {
databaseName = databaseName.substring(5);
}
return databaseName;
}
}
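
unQualifyDatabaseName is the inverse of the existing qualifyDatabaseName (the magic 5 is "root.".length()), and the DataNodeTableCache now keys everything by the unqualified form. A quick usage sketch:

import org.apache.iotdb.commons.utils.PathUtils;

final class UnQualifyExample {
  public static void main(final String[] args) {
    System.out.println(PathUtils.unQualifyDatabaseName("root.db1")); // db1
    System.out.println(PathUtils.unQualifyDatabaseName("db1")); // db1, already unqualified
    System.out.println(PathUtils.unQualifyDatabaseName(null)); // null passes through
  }
}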

View File

@@ -1044,6 +1044,11 @@ struct TShowTableResp {
2: optional list<TTableInfo> tableInfoList
}
struct TFetchTableResp {
1: required common.TSStatus status
2: optional binary tableInfoMap
}
struct TTableInfo {
1: required string tableName
// TTL is stored as string in table props
@@ -1785,5 +1790,7 @@
common.TSStatus alterTable(TAlterTableReq req)
TShowTableResp showTables(string database)
TFetchTableResp fetchTables(map<string, set<string>> fetchTableMap)
}