now the five aggregation is right in Memory data, i.e. InserDynamicData

This commit is contained in:
CGF 2017-09-17 22:39:58 +08:00
parent e1de6e0f3b
commit 43edc943d8
3 changed files with 53 additions and 10 deletions

View File

@ -80,7 +80,32 @@ public class MaxValueAggrFunc extends AggregateFunction {
@Override
public boolean calcAggregationUsingTimestamps(InsertDynamicData insertMemoryData, List<Long> timestamps, int timeIndex) throws IOException, ProcessorException {
return false;
while (timeIndex < timestamps.size()) {
if (insertMemoryData.hasInsertData()) {
if (timestamps.get(timeIndex) == insertMemoryData.getCurrentMinTime()) {
Object value = insertMemoryData.getCurrentObjectValue();
if (!hasSetValue) {
result.data.putAnObject(value);
hasSetValue = true;
} else {
Comparable<?> v = result.data.getAnObject(0);
if (compare(v, (Comparable<?>) value) < 0) {
result.data.setAnObject(0, (Comparable<?>) value);
}
}
timeIndex ++;
insertMemoryData.removeCurrentValue();
} else if (timestamps.get(timeIndex) > insertMemoryData.getCurrentMinTime()) {
insertMemoryData.removeCurrentValue();
} else {
timeIndex += 1;
}
} else {
break;
}
}
return insertMemoryData.hasInsertData();
}
private Comparable<?> getMaxValue(DynamicOneColumnData dataInThisPage) {

View File

@ -27,7 +27,6 @@ public class AggregateEngine {
private static final Logger LOG = LoggerFactory.getLogger(AggregateEngine.class);
public static int batchSize = 50000;
private static ThreadLocal<Boolean> threadLocal = new ThreadLocal<>();
/**
* <p>Public invoking method of multiple aggregation.</p>
@ -52,9 +51,6 @@ public class AggregateEngine {
}
QueryDataSet ansQueryDataSet = new QueryDataSet();
if (threadLocal.get() != null && threadLocal.get()) {
return ansQueryDataSet;
}
List<QueryDataSet> filterQueryDataSets = new ArrayList<>(); // stores QueryDataSet of each FilterStructure answer
List<long[]> timeArray = new ArrayList<>(); // stores calculated common timestamps of each FilterStructure answer
@ -217,7 +213,6 @@ public class AggregateEngine {
// * </p>
// */
// aggres.get(0).right.maps.put("done", true);
threadLocal.set(true);
return ansQueryDataSet;
}

View File

@ -32,6 +32,11 @@ public class OverflowQueryEngine {
private static final Logger LOGGER = LoggerFactory.getLogger(OverflowQueryEngine.class);
private MManager mManager;
private int formNumber = -1;
/**
* this variable is represent that whether it is
* the second execution of aggregate method.
*/
private static ThreadLocal<Boolean> threadLocal = new ThreadLocal<>();
public OverflowQueryEngine() {
mManager = MManager.getInstance();
@ -48,7 +53,8 @@ public class OverflowQueryEngine {
}
/**
* Basic query function.
* <p>
* Basic query method.
*
* @param formNumber a complex query will be taken out to some disjunctive normal forms in query process,
* the formNumber represent the number of normal form.
@ -56,8 +62,8 @@ public class OverflowQueryEngine {
* @param queryDataSet query data set to return
* @param fetchSize fetch size for batch read
* @return basic QueryDataSet
* @throws ProcessorException
* @throws IOException
* @throws ProcessorException series resolve error
* @throws IOException TsFile read error
*/
public QueryDataSet query(int formNumber, List<Path> paths, FilterExpression timeFilter, FilterExpression freqFilter,
FilterExpression valueFilter, QueryDataSet queryDataSet, int fetchSize) throws ProcessorException, IOException, PathErrorException {
@ -75,7 +81,18 @@ public class OverflowQueryEngine {
}
}
/**
* <p>
* Basic aggregation method,
* both single aggregation method or multi aggregation method is implemented here.
*
* @param aggres a list of aggregations and corresponding path
* @param filterStructures see <code>FilterStructure</code>, a list of all conjunction form
* @return result QueryDataSet
* @throws ProcessorException series resolve error
* @throws IOException TsFile read error
* @throws PathErrorException path resolve error
*/
public QueryDataSet aggregate(List<Pair<Path, String>> aggres, List<FilterStructure> filterStructures)
throws ProcessorException, IOException, PathErrorException {
LOGGER.info("Aggregation content: {}", aggres.toString());
@ -85,6 +102,12 @@ public class OverflowQueryEngine {
AggregateFunction func = AggreFuncFactory.getAggrFuncByName(pair.right, dataType);
aggregations.add(new Pair<>(pair.left, func));
}
if (threadLocal.get() != null && threadLocal.get()) {
threadLocal.remove();
return new QueryDataSet();
}
threadLocal.set(true);
return AggregateEngine.multiAggregate(aggregations, filterStructures);
}