This commit is contained in:
qiaojialingithub 2017-11-08 15:35:16 +08:00
commit b86de9b7de
8 changed files with 105 additions and 72 deletions

View File

@ -92,6 +92,6 @@ public abstract class AggregateFunction {
throws IOException, ProcessorException;
public abstract void calcGroupByAggregationWithoutFilter(long partitionStart, long intervalStart, long intervalEnd,
public abstract void calcGroupByAggregationWithoutFilter(long partitionStart, long partitionEnd, long intervalStart, long intervalEnd,
DynamicOneColumnData data, boolean firstPartitionFlag);
}

View File

@ -14,7 +14,7 @@ import cn.edu.tsinghua.tsfile.timeseries.read.query.DynamicOneColumnData;
public class CountAggrFunc extends AggregateFunction {
public CountAggrFunc() {
super(AggregationConstant.COUNT, TSDataType.INT64);
super(AggregationConstant.COUNT, TSDataType.INT64, true);
}
@Override
@ -89,30 +89,39 @@ public class CountAggrFunc extends AggregateFunction {
}
@Override
public void calcGroupByAggregationWithoutFilter(long partitionStart, long intervalStart, long intervalEnd,
public void calcGroupByAggregationWithoutFilter(long partitionStart, long partitionEnd, long intervalStart, long intervalEnd,
DynamicOneColumnData data, boolean firstPartitionFlag) {
if (firstPartitionFlag) {
if (result.data.emptyTimeLength == 0) {
result.data.putEmptyTime(partitionStart);
} else if( (result.data.getEmptyTime(result.data.emptyTimeLength-1) != partitionStart)
&& (result.data.timeLength == 0 ||
(result.data.timeLength > 0 && result.data.getTime(result.data.timeLength-1) != partitionStart))){
result.data.putEmptyTime(partitionStart);
}
long valueSum = 0;
while (data.curIdx < data.timeLength) {
if (data.getTime(data.curIdx) > intervalEnd) {
return;
} else if (data.getTime(data.curIdx) < intervalStart) {
data.curIdx += 1;
} else if (data.getTime(data.curIdx) >= intervalStart && data.getTime(data.curIdx) <= intervalEnd) {
valueSum += 1;
long time = data.getTime(data.curIdx);
if (time > intervalEnd || time > partitionEnd) {
break;
} else if (time < intervalStart || time < partitionStart) {
data.curIdx ++;
} else if (time >= intervalStart && time <= intervalEnd && time >= partitionStart && time <= partitionEnd) {
valueSum ++;
data.curIdx ++;
}
}
if (result.data.emptyTimeLength > 0 && result.data.getEmptyTime(result.data.emptyTimeLength - 1) == partitionStart) {
result.data.removeLastEmptyTime();
result.data.putTime(partitionStart);
result.data.putLong(valueSum);
} else {
long preSum = result.data.getLong(result.data.valueLength - 1);
result.data.setLong(result.data.valueLength - 1, preSum + valueSum);
if (valueSum > 0) {
if (result.data.emptyTimeLength > 0 && result.data.getEmptyTime(result.data.emptyTimeLength - 1) == partitionStart) {
result.data.removeLastEmptyTime();
result.data.putTime(partitionStart);
result.data.putLong(valueSum);
} else {
long preSum = result.data.getLong(result.data.valueLength - 1);
result.data.setLong(result.data.valueLength - 1, preSum + valueSum);
}
}
}
}

View File

@ -86,7 +86,7 @@ public class MaxTimeAggrFunc extends AggregateFunction {
}
@Override
public void calcGroupByAggregationWithoutFilter(long partitionStart, long intervalStart, long intervalEnd,
public void calcGroupByAggregationWithoutFilter(long partitionStart, long partitionEnd, long intervalStart, long intervalEnd,
DynamicOneColumnData data, boolean firstPartitionFlag) {
if (partitionStart != result.data.getTime(result.data.timeLength-1) && partitionStart != 0) {
result.data.putTime(partitionStart);

View File

@ -92,7 +92,7 @@ public class MaxValueAggrFunc extends AggregateFunction {
}
@Override
public void calcGroupByAggregationWithoutFilter(long partitionStart, long intervalStart, long intervalEnd, DynamicOneColumnData data,
public void calcGroupByAggregationWithoutFilter(long partitionStart, long partitionEnd, long intervalStart, long intervalEnd, DynamicOneColumnData data,
boolean firstPartitionFlag) {
}

View File

@ -86,7 +86,7 @@ public class MinTimeAggrFunc extends AggregateFunction {
}
@Override
public void calcGroupByAggregationWithoutFilter(long partitionStart, long intervalStart, long intervalEnd, DynamicOneColumnData data,
public void calcGroupByAggregationWithoutFilter(long partitionStart, long partitionEnd, long intervalStart, long intervalEnd, DynamicOneColumnData data,
boolean firstPartitionFlag) {
}

View File

@ -93,7 +93,7 @@ public class MinValueAggrFunc extends AggregateFunction {
}
@Override
public void calcGroupByAggregationWithoutFilter(long partitionStart, long intervalStart, long intervalEnd, DynamicOneColumnData data, boolean firstPartitionFlag) {
public void calcGroupByAggregationWithoutFilter(long partitionStart, long partitionEnd, long intervalStart, long intervalEnd, DynamicOneColumnData data, boolean firstPartitionFlag) {
}

View File

@ -111,9 +111,10 @@ public class OverflowQueryEngine {
public QueryDataSet groupBy(List<Pair<Path, String>> aggres, List<FilterStructure> filterStructures,
long unit, long origin, List<Pair<Long, Long>> intervals, int fetchSize)
throws ProcessorException, PathErrorException, IOException {
// if (testFloag.get() != null && testFloag.get() == true) {
// return new QueryDataSet();
// }
if (testFloag.get() != null && testFloag.get() == true) {
return new QueryDataSet();
}
testFloag.set(true);
// QueryDataSet testQueryDataSet = new QueryDataSet();
// DynamicOneColumnData data1 = new DynamicOneColumnData(TSDataType.INT32, true, true);
// for (int i = 1;i <= 10;i ++) {
@ -136,6 +137,7 @@ public class OverflowQueryEngine {
//
// testFloag.set(true);
// return testQueryDataSet;
GroupByEngine groupByEngine = new GroupByEngine();
return null;

View File

@ -17,6 +17,7 @@ import cn.edu.tsinghua.tsfile.common.utils.Pair;
import cn.edu.tsinghua.tsfile.file.metadata.enums.TSDataType;
import cn.edu.tsinghua.tsfile.timeseries.filter.definition.FilterExpression;
import cn.edu.tsinghua.tsfile.timeseries.filter.definition.SingleSeriesFilterExpression;
import cn.edu.tsinghua.tsfile.timeseries.filter.definition.operators.NoFilter;
import cn.edu.tsinghua.tsfile.timeseries.filter.utils.LongInterval;
import cn.edu.tsinghua.tsfile.timeseries.filter.verifier.FilterVerifier;
import cn.edu.tsinghua.tsfile.timeseries.read.query.BatchReadRecordGenerator;
@ -36,6 +37,7 @@ public class GroupByEngine {
// ThreadLocal<>
private int formNumber = -1;
private List<Path> queryPaths = new ArrayList<>();
private ThreadLocal<Boolean> threadLocal = new ThreadLocal<>();
public QueryDataSet groupBy(List<Pair<Path, String>> aggres, List<FilterStructure> filterStructures,
long unit, long origin, FilterExpression intervals, int fetchSize)
@ -46,7 +48,7 @@ public class GroupByEngine {
}
List<Pair<Path, AggregateFunction>> aggregations = new ArrayList<>();
for (Pair<Path, String> pair : aggres) {
TSDataType dataType= MManager.getInstance().getSeriesType(pair.left.getFullPath());
TSDataType dataType = MManager.getInstance().getSeriesType(pair.left.getFullPath());
AggregateFunction func = AggreFuncFactory.getAggrFuncByName(pair.right, dataType);
aggregations.add(new Pair<>(pair.left, func));
}
@ -60,12 +62,9 @@ public class GroupByEngine {
// all the split time intervals
LongInterval longInterval = new LongInterval();
if (noFilterFlag) {
longInterval.addValueFlag(Long.MIN_VALUE, true);
longInterval.addValueFlag(Long.MAX_VALUE, true);
} else {
longInterval = (LongInterval) FilterVerifier.create(TSDataType.INT64).getInterval((SingleSeriesFilterExpression) intervals);
}
// longInterval = (LongInterval) FilterVerifier.create(TSDataType.INT64).getInterval((SingleSeriesFilterExpression) intervals);
longInterval.addValueFlag(1L, true);
longInterval.addValueFlag(10000L, true);
if (longInterval.count == 0) {
return new QueryDataSet();
@ -87,13 +86,17 @@ public class GroupByEngine {
String aggregateKey = aggregationKey(aggregations.get(i).left, aggregations.get(i).right);
if (!groupByResult.mapRet.containsKey(aggregateKey)) {
groupByResult.mapRet.put(aggregateKey, new DynamicOneColumnData(aggregations.get(i).right.dataType, true, true));
queryPathResult.put(aggregateKey, new DynamicOneColumnData(aggregations.get(i).right.dataType, true));
queryPathResult.put(aggregateKey, null);
} else {
duplicatedPaths.add(i);
}
}
// this process is on the basis of the traverse of partition variable
// in each [partitionStart, partitionEnd], [intervalStart, intervalEnd] would be considered
while (true) {
// after this, intervalEnd must be bigger or equals than partitionStart
while (intervalEnd < partitionStart) {
intervalIndex += 2;
if (intervalIndex >= longInterval.count)
@ -102,67 +105,86 @@ public class GroupByEngine {
intervalEnd = longInterval.flag[intervalIndex + 1] ? longInterval.v[intervalIndex + 1] : longInterval.v[intervalIndex + 1] - 1;
}
/** current partition is location in the left of intervals, using mod operator
to calculate the first satisfied partition which has intersection with intervals
**/
// current partition is location in the left of intervals, using mod operator
// to calculate the first satisfied partition which has intersection with intervals.
if (partitionEnd < intervalStart) {
partitionStart = intervalStart - (intervalStart % unit);
partitionStart = intervalStart - ((intervalStart - origin) % unit);
partitionEnd = partitionStart + unit - 1;
}
if (partitionStart < intervalStart) {
partitionStart = intervalStart;
}
while (intervalEnd <= partitionEnd) {
if (noFilterFlag) {
int cnt = 0;
for (Pair<Path, AggregateFunction> pair : aggregations) {
boolean firstPartitionFlag = true; // when calcGroupByAggregationWithoutFilter is invoked first time, this variable is true
Path path = pair.left;
AggregateFunction aggregateFunction = pair.right;
String aggregationKey = aggregationKey(path, aggregateFunction);
if (!duplicatedPaths.contains(cnt)) {
DynamicOneColumnData data = queryPathResult.get(aggregationKey);
if (data.timeLength == 0 || (data.curIdx < data.timeLength && data.getTime(data.curIdx) > intervalEnd)) {
data = readOneColumnWithoutFilter(path, data, fetchSize, null);
}
//System.out.println(partitionStart + "---" + partitionEnd);
if (partitionStart == 9999) {
System.out.println("haha");
}
while (true) {
int cnt = 0;
for (Pair<Path, AggregateFunction> pair : aggregations) {
if (duplicatedPaths.contains(cnt))
continue;
// read the data of aggregationKey until all the data in [intervalStart, intervalEnd] has been read
while (true) {
if (data.timeLength == 0 || (data.curIdx < data.timeLength && data.getTime(data.curIdx) > intervalEnd)) {
break;
}
if (data.curIdx >= data.timeLength) {
data = readOneColumnWithoutFilter(path, data, fetchSize, null);
if (data.timeLength == 0) {
break;
}
}
aggregateFunction.calcGroupByAggregationWithoutFilter(partitionStart, intervalStart, intervalEnd, data, firstPartitionFlag);
firstPartitionFlag = false;
}
}
cnt++;
Path path = pair.left;
AggregateFunction aggregateFunction = pair.right;
String aggregationKey = aggregationKey(path, aggregateFunction);
DynamicOneColumnData data = queryPathResult.get(aggregationKey);
if (data == null || data.curIdx >= data.timeLength) {
data = queryOnePath(path, data, filterStructures);
queryPathResult.put(aggregationKey, data);
}
while (true) {
aggregateFunction.calcGroupByAggregationWithoutFilter(partitionStart, partitionEnd, intervalStart, intervalEnd, data, false);
if (data.timeLength == 0 || data.curIdx < data.timeLength) {
break;
}
if (data.curIdx >= data.timeLength && data.timeLength != 0) {
data = queryOnePath(path, data, filterStructures);
}
if (data.timeLength == 0 || data.curIdx >= data.timeLength) {
break;
}
}
} else {
// with time filter and value filter
}
intervalIndex += 2;
if (intervalIndex >= longInterval.count)
if (intervalEnd <= partitionEnd) {
intervalIndex += 2;
if (intervalIndex >= longInterval.count)
break;
intervalStart = longInterval.flag[intervalIndex] ? longInterval.v[intervalIndex] : longInterval.v[intervalIndex] + 1;
intervalEnd = longInterval.flag[intervalIndex + 1] ? longInterval.v[intervalIndex + 1] : longInterval.v[intervalIndex + 1] - 1;
} else {
break;
intervalStart = longInterval.flag[intervalIndex] ? longInterval.v[intervalIndex] : longInterval.v[intervalIndex] + 1;
intervalEnd = longInterval.flag[intervalIndex + 1] ? longInterval.v[intervalIndex + 1] : longInterval.v[intervalIndex + 1] - 1;
}
}
if (intervalIndex >= longInterval.count)
break;
partitionStart = partitionEnd + 1;
partitionEnd = partitionStart + unit - 1;
}
int cnt = 0;
for (Pair<Path, AggregateFunction> pair : aggregations) {
if (duplicatedPaths.contains(cnt))
cnt++;
Path path = pair.left;
AggregateFunction aggregateFunction = pair.right;
groupByResult.mapRet.put(aggregationKey(path, aggregateFunction), aggregateFunction.result.data);
}
return groupByResult;
}
private DynamicOneColumnData queryOnePath(Path path, DynamicOneColumnData data, List<FilterStructure> filterStructures)
throws PathErrorException, IOException, ProcessorException {
if (filterStructures == null || filterStructures.size() == 0 || (filterStructures.size() == 1 && filterStructures.get(0).noFilter())) {
return readOneColumnWithoutFilter(path, data, 10000, null);
}
return null;
}
private DynamicOneColumnData readOneColumnWithoutFilter(Path path, DynamicOneColumnData res, int fetchSize, Integer readLock) throws ProcessorException, IOException, PathErrorException {
String deltaObjectID = path.getDeltaObjectToString();