system log

This commit is contained in:
CGF 2017-05-22 20:22:05 +08:00
parent 3191888b3c
commit edbd1677e1
3 changed files with 51 additions and 1 deletions

View File

@ -36,6 +36,15 @@ public class MultiInsertPlan extends PhysicalPlan {
this.insertValues = insertValues;
}
public MultiInsertPlan(int insertType, String deltaObject, long insertTime, List<String> measurementList, List<String> insertValues) {
super(false, OperatorType.MULTIINSERT);
this.insertType = insertType;
this.insertTime = insertTime;
this.deltaObject = deltaObject;
this.measurementList = measurementList;
this.insertValues = insertValues;
}
@Override
public boolean processNonQuery(QueryProcessExecutor exec) throws ProcessorException{
insertType = exec.multiInsert(deltaObject, insertTime, measurementList, insertValues);

View File

@ -4,6 +4,9 @@ import java.io.IOException;
import java.util.HashMap;
import cn.edu.thu.tsfile.timeseries.read.qp.Path;
import cn.edu.thu.tsfile.timeseries.write.record.TSRecord;
import cn.edu.thu.tsfiledb.exception.PathErrorException;
import cn.edu.thu.tsfiledb.metadata.MManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -13,6 +16,7 @@ public class WriteLogManager {
private static final Logger LOG = LoggerFactory.getLogger(WriteLogManager.class);
private static WriteLogManager instance;
private static HashMap<String, WriteLogNode> logNodeMaps;
public static final int BUFFERWRITER = 0, OVERFLOW = 1;
private WriteLogManager() {
logNodeMaps = new HashMap<>();
@ -35,7 +39,15 @@ public class WriteLogManager {
return logNodeMaps.get(fileNode);
}
public void write(PhysicalPlan plan) throws IOException {
public void write(PhysicalPlan plan) throws IOException, PathErrorException {
getWriteLogNode(MManager.getInstance().getFileNameByPath(plan.getPath().getFullPath())).write(plan);
}
public void write(TSRecord record, int isOverflow) throws IOException {
}
public void flush() {
}

View File

@ -2,8 +2,12 @@ package cn.edu.thu.tsfiledb.sys.writeLog;
import cn.edu.thu.tsfile.common.utils.BytesUtils;
import cn.edu.thu.tsfile.timeseries.read.qp.Path;
import cn.edu.thu.tsfile.timeseries.write.record.DataPoint;
import cn.edu.thu.tsfile.timeseries.write.record.TSRecord;
import cn.edu.thu.tsfiledb.conf.TSFileDBDescriptor;
import cn.edu.thu.tsfiledb.qp.logical.operator.Operator;
import cn.edu.thu.tsfiledb.qp.physical.plan.InsertPlan;
import cn.edu.thu.tsfiledb.qp.physical.plan.MultiInsertPlan;
import cn.edu.thu.tsfiledb.qp.physical.plan.PhysicalPlan;
import cn.edu.thu.tsfiledb.sys.writeLog.impl.LocalFileLogReader;
import cn.edu.thu.tsfiledb.sys.writeLog.impl.LocalFileLogWriter;
@ -65,6 +69,31 @@ public class WriteLogNode {
}
}
synchronized public void write(TSRecord record, int flag) throws IOException {
if (flag == WriteLogManager.OVERFLOW) {
List<String> measurementList = new ArrayList<>();
List<String> insertValues = new ArrayList<>();
for (DataPoint dp : record.dataPointList) {
measurementList.add(dp.getMeasurementId());
insertValues.add((String)dp.getValue());
}
plansInMemory.add(new MultiInsertPlan(2, record.deltaObjectId, record.time, measurementList, insertValues));
} else if (flag == WriteLogManager.BUFFERWRITER) {
List<String> measurementList = new ArrayList<>();
List<String> insertValues = new ArrayList<>();
for (DataPoint dp : record.dataPointList) {
measurementList.add(dp.getMeasurementId());
insertValues.add((String)dp.getValue());
}
plansInMemory.add(new MultiInsertPlan(1, record.deltaObjectId, record.time, measurementList, insertValues));
}
if (plansInMemory.size() >= LogMemorySize) {
serializeMemoryToFile();
logSize += plansInMemory.size();
checkLogsCompactFileSize(false);
}
}
synchronized public void overflowFlushStart() throws IOException {
serializeMemoryToFile();