[IOTDB-6337] Refine the count calculation in RegionScan framework

This commit is contained in:
YangCaiyin 2024-06-06 11:00:07 +08:00 committed by GitHub
parent 351ab3075b
commit 4c6e110e28
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 133 additions and 52 deletions

View File

@ -285,6 +285,28 @@ public class IoTDBActiveRegionScanIT {
basicShowActiveDeviceTest(sql, SHOW_DEVICES_COLUMN_NAMES, retArray);
}
@Test
public void showActiveDeviceEmptyTest() {
String sql = "show devices root.empty where time < 50";
String[] retArray = new String[] {};
basicShowActiveDeviceTest(sql, SHOW_DEVICES_COLUMN_NAMES, retArray);
sql = "count devices root.empty where time < 50";
long value = 0;
basicCountActiveDeviceTest(sql, COUNT_DEVICES_COLUMN_NAMES, value);
}
@Test
public void showActiveTimeseriesEmptyTest() {
String sql = "show timeseries root.empty where time < 50";
String[] retArray = new String[] {};
basicShowActiveDeviceTest(sql, SHOW_TIMESERIES_COLUMN_NAMES, retArray);
sql = "count timeseries root.empty where time < 50";
long value = 0;
basicCountActiveDeviceTest(sql, COUNT_TIMESERIES_COLUMN_NAMES, value);
}
@Test
public void showActiveTimeseriesTest() {
String sql = "show timeseries where time = 4";
@ -461,7 +483,6 @@ public class IoTDBActiveRegionScanIT {
try (ResultSet resultSet = statement.executeQuery(sql)) {
ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
Map<String, Integer> map = new HashMap<>();
assertEquals(1, resultSetMetaData.getColumnCount());
assertEquals(columnName, resultSetMetaData.getColumnName(1));
int cnt = 0;

View File

@ -117,26 +117,37 @@ public class ActiveRegionScanMergeOperator extends AbstractConsumeAllOperator {
}
}
TimeColumnBuilder timeColumnBuilder = tsBlockBuilder.getTimeColumnBuilder();
ColumnBuilder[] valueColumnBuilders = tsBlockBuilder.getValueColumnBuilders();
int curTsBlockRowIndex;
for (int i = 0; i < inputOperatorsCount; i++) {
if (inputTsBlocks[i] == null) {
continue;
if (!needMergeBeforeCount) {
for (int i = 0; i < inputOperatorsCount; i++) {
if (inputTsBlocks[i] == null) {
continue;
}
for (int row = 0; row < maxRowCanBuild; row++) {
long childCount = inputTsBlocks[i].getValueColumns()[0].getLong(inputIndex[i] + row);
count += childCount;
inputIndex[i] += maxRowCanBuild;
}
}
curTsBlockRowIndex = inputIndex[i];
for (int row = 0; row < maxRowCanBuild; row++) {
String id =
inputTsBlocks[i].getValueColumns()[0].getBinary(curTsBlockRowIndex + row).toString();
if (!outputCount || needMergeBeforeCount) {
} else {
TimeColumnBuilder timeColumnBuilder = tsBlockBuilder.getTimeColumnBuilder();
ColumnBuilder[] valueColumnBuilders = tsBlockBuilder.getValueColumnBuilders();
int curTsBlockRowIndex;
for (int i = 0; i < inputOperatorsCount; i++) {
if (inputTsBlocks[i] == null) {
continue;
}
curTsBlockRowIndex = inputIndex[i];
for (int row = 0; row < maxRowCanBuild; row++) {
String id =
inputTsBlocks[i].getValueColumns()[0].getBinary(curTsBlockRowIndex + row).toString();
if (deduplicatedSet.contains(id)) {
continue;
}
deduplicatedSet.add(id);
buildOneRow(i, curTsBlockRowIndex + row, timeColumnBuilder, valueColumnBuilders);
}
buildOneRow(i, curTsBlockRowIndex + row, timeColumnBuilder, valueColumnBuilders);
inputIndex[i] += maxRowCanBuild;
}
inputIndex[i] += maxRowCanBuild;
}
return outputCount ? returnResultIfNoMoreData() : tsBlockBuilder.build();
}

View File

@ -37,6 +37,9 @@ public abstract class AbstractRegionScanDataSourceOperator extends AbstractSourc
protected boolean finished = false;
protected boolean outputCount;
protected long count = 0;
protected AbstractRegionScanForActiveDataUtil regionScanUtil;
protected TsBlockBuilder resultTsBlockBuilder;
@ -97,16 +100,28 @@ public abstract class AbstractRegionScanDataSourceOperator extends AbstractSourc
} while (System.nanoTime() - start < maxRuntime && !resultTsBlockBuilder.isFull());
finished =
resultTsBlockBuilder.isEmpty()
(resultTsBlockBuilder.isEmpty())
&& ((!regionScanUtil.hasMoreData() && regionScanUtil.isCurrentTsFileFinished())
|| isAllDataChecked());
return !finished;
boolean hasCachedCountValue = buildCountResultIfNeed();
return !finished || hasCachedCountValue;
} catch (IOException e) {
throw new IOException("Error occurs when scanning active time series.", e);
}
}
private boolean buildCountResultIfNeed() {
if (!outputCount || !finished || count == -1) {
return false;
}
resultTsBlockBuilder.getTimeColumnBuilder().writeLong(-1);
resultTsBlockBuilder.getValueColumnBuilders()[0].writeLong(count);
resultTsBlockBuilder.declarePosition();
count = -1;
return true;
}
@Override
public void close() throws Exception {
// do nothing

View File

@ -51,7 +51,9 @@ public class ActiveDeviceRegionScanOperator extends AbstractRegionScanDataSource
OperatorContext operatorContext,
PlanNodeId sourceId,
Map<IDeviceID, Boolean> deviceToAlignedMap,
Filter timeFilter) {
Filter timeFilter,
boolean outputCount) {
this.outputCount = outputCount;
this.sourceId = sourceId;
this.operatorContext = operatorContext;
this.deviceToAlignedMap = deviceToAlignedMap;
@ -70,26 +72,36 @@ public class ActiveDeviceRegionScanOperator extends AbstractRegionScanDataSource
@Override
protected void updateActiveData() {
TimeColumnBuilder timeColumnBuilder = resultTsBlockBuilder.getTimeColumnBuilder();
ColumnBuilder[] columnBuilders = resultTsBlockBuilder.getValueColumnBuilders();
List<IDeviceID> activeDevices =
((RegionScanForActiveDeviceUtil) regionScanUtil).getActiveDevices();
for (IDeviceID deviceID : activeDevices) {
timeColumnBuilder.writeLong(-1);
columnBuilders[0].writeBinary(new Binary(deviceID.getBytes()));
columnBuilders[1].writeBinary(
new Binary(
String.valueOf(deviceToAlignedMap.get(deviceID)), TSFileConfig.STRING_CHARSET));
columnBuilders[2].appendNull();
columnBuilders[3].appendNull();
resultTsBlockBuilder.declarePosition();
deviceToAlignedMap.remove(deviceID);
if (this.outputCount) {
count += activeDevices.size();
activeDevices.forEach(deviceToAlignedMap.keySet()::remove);
} else {
TimeColumnBuilder timeColumnBuilder = resultTsBlockBuilder.getTimeColumnBuilder();
ColumnBuilder[] columnBuilders = resultTsBlockBuilder.getValueColumnBuilders();
for (IDeviceID deviceID : activeDevices) {
timeColumnBuilder.writeLong(-1);
columnBuilders[0].writeBinary(new Binary(deviceID.getBytes()));
columnBuilders[1].writeBinary(
new Binary(
String.valueOf(deviceToAlignedMap.get(deviceID)), TSFileConfig.STRING_CHARSET));
columnBuilders[2].appendNull();
columnBuilders[3].appendNull();
resultTsBlockBuilder.declarePosition();
deviceToAlignedMap.remove(deviceID);
}
}
}
@Override
protected List<TSDataType> getResultDataTypes() {
if (outputCount) {
return ColumnHeaderConstant.countDevicesColumnHeaders.stream()
.map(ColumnHeader::getColumnType)
.collect(Collectors.toList());
}
return ColumnHeaderConstant.showDevicesColumnHeaders.stream()
.map(ColumnHeader::getColumnType)
.collect(Collectors.toList());

View File

@ -54,7 +54,9 @@ public class ActiveTimeSeriesRegionScanOperator extends AbstractRegionScanDataSo
OperatorContext operatorContext,
PlanNodeId sourceId,
Map<IDeviceID, Map<String, TimeseriesSchemaInfo>> timeSeriesToSchemasInfo,
Filter timeFilter) {
Filter timeFilter,
boolean isOutputCount) {
this.outputCount = isOutputCount;
this.operatorContext = operatorContext;
this.sourceId = sourceId;
this.timeSeriesToSchemasInfo = timeSeriesToSchemasInfo;
@ -95,6 +97,16 @@ public class ActiveTimeSeriesRegionScanOperator extends AbstractRegionScanDataSo
Map<IDeviceID, List<String>> activeTimeSeries =
((RegionScanForActiveTimeSeriesUtil) regionScanUtil).getActiveTimeSeries();
if (outputCount) {
for (Map.Entry<IDeviceID, List<String>> entry : activeTimeSeries.entrySet()) {
List<String> timeSeriesList = entry.getValue();
count += timeSeriesList.size();
removeTimeseriesListFromDevice(entry.getKey(), timeSeriesList);
}
return;
}
for (Map.Entry<IDeviceID, List<String>> entry : activeTimeSeries.entrySet()) {
IDeviceID deviceID = entry.getKey();
String deviceStr = ((PlainDeviceID) deviceID).toStringID();
@ -117,11 +129,18 @@ public class ActiveTimeSeriesRegionScanOperator extends AbstractRegionScanDataSo
checkAndAppend(schemaInfo.getDeadbandParameters(), columnBuilders[9]); // DeadbandParameters
columnBuilders[10].writeBinary(VIEW_TYPE); // ViewType
resultTsBlockBuilder.declarePosition();
timeSeriesInfo.remove(timeSeries);
}
if (timeSeriesInfo.isEmpty()) {
timeSeriesToSchemasInfo.remove(deviceID);
}
removeTimeseriesListFromDevice(deviceID, timeSeriesList);
}
}
private void removeTimeseriesListFromDevice(IDeviceID deviceID, List<String> timeSeriesList) {
Map<String, TimeseriesSchemaInfo> timeSeriesInfo = timeSeriesToSchemasInfo.get(deviceID);
for (String timeSeries : timeSeriesList) {
timeSeriesInfo.remove(timeSeries);
}
if (timeSeriesInfo.isEmpty()) {
timeSeriesToSchemasInfo.remove(deviceID);
}
}
@ -131,6 +150,11 @@ public class ActiveTimeSeriesRegionScanOperator extends AbstractRegionScanDataSo
@Override
protected List<TSDataType> getResultDataTypes() {
if (outputCount) {
return ColumnHeaderConstant.countTimeSeriesColumnHeaders.stream()
.map(ColumnHeader::getColumnType)
.collect(Collectors.toList());
}
return ColumnHeaderConstant.showTimeSeriesColumnHeaders.stream()
.map(ColumnHeader::getColumnType)
.collect(Collectors.toList());

View File

@ -2928,6 +2928,7 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
analyzeTimeseriesRegionScan(
showTimeSeriesStatement.getTimeCondition(), patternTree, analysis, context);
if (!hasSchema) {
analysis.setRespDatasetHeader(DatasetHeaderFactory.getShowTimeSeriesHeader());
return analysis;
}
} catch (IllegalPathException e) {
@ -3007,7 +3008,7 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
schemaTree.removeLogicalView();
}
private boolean analyzeDeviceRegionScan(
private void analyzeDeviceRegionScan(
WhereCondition timeCondition,
PathPatternTree patternTree,
Analysis analysis,
@ -3019,7 +3020,7 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
ISchemaTree schemaTree = schemaFetcher.fetchSchemaInDeviceLevel(patternTree, context);
if (schemaTree.isEmpty()) {
analysis.setFinishQueryAfterAnalyze(true);
return false;
return;
}
// fetch Data partition
@ -3037,7 +3038,6 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
schemaTree,
context);
analysis.setDataPartitionInfo(dataPartition);
return true;
}
@Override
@ -3051,12 +3051,8 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
showDevicesStatement.getPathPattern().concatNode(IoTDBConstant.ONE_LEVEL_PATH_WILDCARD));
if (showDevicesStatement.hasTimeCondition()) {
boolean hasSchema =
analyzeDeviceRegionScan(
showDevicesStatement.getTimeCondition(), patternTree, analysis, context);
if (!hasSchema) {
return analysis;
}
analyzeDeviceRegionScan(
showDevicesStatement.getTimeCondition(), patternTree, analysis, context);
} else {
SchemaPartition schemaPartitionInfo = partitionFetcher.getSchemaPartition(patternTree);
analysis.setSchemaPartitionInfo(schemaPartitionInfo);
@ -3117,12 +3113,8 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
patternTree.appendPathPattern(
countDevicesStatement.getPathPattern().concatNode(IoTDBConstant.ONE_LEVEL_PATH_WILDCARD));
if (countDevicesStatement.hasTimeCondition()) {
boolean hasSchema =
analyzeDeviceRegionScan(
countDevicesStatement.getTimeCondition(), patternTree, analysis, context);
if (!hasSchema) {
return analysis;
}
analyzeDeviceRegionScan(
countDevicesStatement.getTimeCondition(), patternTree, analysis, context);
} else {
SchemaPartition schemaPartitionInfo = partitionFetcher.getSchemaPartition(patternTree);
analysis.setSchemaPartitionInfo(schemaPartitionInfo);
@ -3147,6 +3139,7 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
analyzeTimeseriesRegionScan(
countTimeSeriesStatement.getTimeCondition(), patternTree, analysis, context);
if (!hasSchema) {
analysis.setRespDatasetHeader(DatasetHeaderFactory.getCountTimeSeriesHeader());
return analysis;
}
} catch (IllegalPathException e) {

View File

@ -3540,7 +3540,7 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
}
ActiveDeviceRegionScanOperator regionScanOperator =
new ActiveDeviceRegionScanOperator(
operatorContext, node.getPlanNodeId(), deviceIDToAligned, filter);
operatorContext, node.getPlanNodeId(), deviceIDToAligned, filter, node.isOutputCount());
DataDriverContext dataDriverContext = (DataDriverContext) context.getDriverContext();
dataDriverContext.addSourceOperator(regionScanOperator);
@ -3573,7 +3573,11 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
}
ActiveTimeSeriesRegionScanOperator regionScanOperator =
new ActiveTimeSeriesRegionScanOperator(
operatorContext, node.getPlanNodeId(), timeseriesToSchemaInfo, filter);
operatorContext,
node.getPlanNodeId(),
timeseriesToSchemaInfo,
filter,
node.isOutputCount());
dataDriverContext.addSourceOperator(regionScanOperator);
dataDriverContext.setQueryDataSourceType(QueryDataSourceType.TIME_SERIES_REGION_SCAN);

View File

@ -774,6 +774,7 @@ public class SourceRewriter extends BaseSourceRewriter<DistributionPlanContext>
RegionScanNode regionScanNode = (RegionScanNode) node.clone();
regionScanNode.setPlanNodeId(context.queryContext.getQueryId().genPlanNodeId());
regionScanNode.setRegionReplicaSet(dataRegion);
regionScanNode.setOutputCount(node.isOutputCount());
regionScanNode.clearPath();
return regionScanNode;
})