This commit is contained in:
xingtanzjr 2017-05-22 20:56:32 +08:00
parent edbd1677e1
commit 19bc54d3cd
8 changed files with 63 additions and 41 deletions

28
pom.xml
View File

@ -49,34 +49,6 @@
<artifactId>antlr3-maven-plugin</artifactId> <artifactId>antlr3-maven-plugin</artifactId>
<version>3.4</version> <version>3.4</version>
</dependency> </dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.6.1</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</dependency>
<dependency>
<groupId>jline</groupId>
<artifactId>jline</artifactId>
<version>2.14.1</version>
</dependency>
<dependency>
<groupId>org.antlr</groupId>
<artifactId>antlr3-maven-plugin</artifactId>
<version>3.4</version>
</dependency>
</dependencies> </dependencies>
<build> <build>

View File

@ -43,7 +43,7 @@ public class TSFileDBConfig {
public String writeLogPath = "src/main/resources/writeLog.log"; public String writeLogPath = "src/main/resources/writeLog.log";
public int LogCompactSize = 100000; public int LogCompactSize = 100000;
public int LogMemorySize = 10000; public int LogMemorySize = 1;
public TSFileDBConfig() { public TSFileDBConfig() {
} }

View File

@ -46,11 +46,13 @@ import cn.edu.thu.tsfile.timeseries.write.series.IRowGroupWriter;
import cn.edu.thu.tsfiledb.conf.TSFileDBConfig; import cn.edu.thu.tsfiledb.conf.TSFileDBConfig;
import cn.edu.thu.tsfiledb.conf.TSFileDBDescriptor; import cn.edu.thu.tsfiledb.conf.TSFileDBDescriptor;
import cn.edu.thu.tsfiledb.engine.exception.BufferWriteProcessorException; import cn.edu.thu.tsfiledb.engine.exception.BufferWriteProcessorException;
import cn.edu.thu.tsfiledb.engine.exception.FileNodeManagerException;
import cn.edu.thu.tsfiledb.engine.lru.LRUProcessor; import cn.edu.thu.tsfiledb.engine.lru.LRUProcessor;
import cn.edu.thu.tsfiledb.engine.utils.FlushState; import cn.edu.thu.tsfiledb.engine.utils.FlushState;
import cn.edu.thu.tsfiledb.exception.PathErrorException; import cn.edu.thu.tsfiledb.exception.PathErrorException;
import cn.edu.thu.tsfiledb.metadata.ColumnSchema; import cn.edu.thu.tsfiledb.metadata.ColumnSchema;
import cn.edu.thu.tsfiledb.metadata.MManager; import cn.edu.thu.tsfiledb.metadata.MManager;
import cn.edu.thu.tsfiledb.sys.writeLog.WriteLogManager;
public class BufferWriteProcessor extends LRUProcessor { public class BufferWriteProcessor extends LRUProcessor {
@ -618,13 +620,16 @@ public class BufferWriteProcessor extends LRUProcessor {
e.printStackTrace(); e.printStackTrace();
throw new IOException(e); throw new IOException(e);
} }
//For WAL
WriteLogManager.getInstance().startBufferWriteFlush(nameSpacePath);
// flush bufferwrite data // flush bufferwrite data
if (isFlushingSync) { if (isFlushingSync) {
try { try {
super.flushRowGroup(false); super.flushRowGroup(false);
writeStoreToDisk(); writeStoreToDisk();
filenodeFlushAction.act(); filenodeFlushAction.act();
WriteLogManager.getInstance().endBufferWriteFlush(nameSpacePath);
} catch (IOException e) { } catch (IOException e) {
LOGGER.error("Flush row group to store failed, processor:{}. Message: {}", nameSpacePath, LOGGER.error("Flush row group to store failed, processor:{}. Message: {}", nameSpacePath,
e.getMessage()); e.getMessage());
@ -658,6 +663,7 @@ public class BufferWriteProcessor extends LRUProcessor {
asyncFlushRowGroupToStore(); asyncFlushRowGroupToStore();
writeStoreToDisk(); writeStoreToDisk();
filenodeFlushAction.act(); filenodeFlushAction.act();
WriteLogManager.getInstance().endBufferWriteFlush(nameSpacePath);
} catch (IOException e) { } catch (IOException e) {
/* /*
* There should be added system log by CGF and throw * There should be added system log by CGF and throw

View File

@ -38,6 +38,7 @@ import cn.edu.thu.tsfiledb.engine.overflow.io.OverflowProcessor;
import cn.edu.thu.tsfiledb.exception.ErrorDebugException; import cn.edu.thu.tsfiledb.exception.ErrorDebugException;
import cn.edu.thu.tsfiledb.exception.PathErrorException; import cn.edu.thu.tsfiledb.exception.PathErrorException;
import cn.edu.thu.tsfiledb.metadata.MManager; import cn.edu.thu.tsfiledb.metadata.MManager;
import cn.edu.thu.tsfiledb.sys.writeLog.WriteLogManager;
public class FileNodeManager extends LRUManager<FileNodeProcessor> { public class FileNodeManager extends LRUManager<FileNodeProcessor> {
@ -189,7 +190,14 @@ public class FileNodeManager extends LRUManager<FileNodeProcessor> {
e.printStackTrace(); e.printStackTrace();
throw new FileNodeManagerException(e); throw new FileNodeManagerException(e);
} }
// overflowProcessor.writeLock(); //For WAL
try {
WriteLogManager.getInstance().write(tsRecord, WriteLogManager.OVERFLOW);
} catch (IOException | PathErrorException e) {
LOGGER.error("Error in write WAL: {}", e.getMessage());
throw new FileNodeManagerException(e);
}
for (DataPoint dataPoint : tsRecord.dataPointList) { for (DataPoint dataPoint : tsRecord.dataPointList) {
try { try {
overflowProcessor.insert(deltaObjectId, dataPoint.getMeasurementId(), timestamp, overflowProcessor.insert(deltaObjectId, dataPoint.getMeasurementId(), timestamp,
@ -222,7 +230,15 @@ public class FileNodeManager extends LRUManager<FileNodeProcessor> {
String fileAbsolutePath = bufferWriteProcessor.getFileAbsolutePath(); String fileAbsolutePath = bufferWriteProcessor.getFileAbsolutePath();
fileNodeProcessor.addIntervalFileNode(timestamp, fileAbsolutePath); fileNodeProcessor.addIntervalFileNode(timestamp, fileAbsolutePath);
} }
// bufferWriteProcessor.writeLock();
//For WAL
try {
WriteLogManager.getInstance().write(tsRecord, WriteLogManager.BUFFERWRITER);
} catch (IOException | PathErrorException e) {
LOGGER.error("Error in write WAL: {}", e.getMessage());
throw new FileNodeManagerException(e);
}
try { try {
bufferWriteProcessor.write(tsRecord); bufferWriteProcessor.write(tsRecord);
} catch (BufferWriteProcessorException e) { } catch (BufferWriteProcessorException e) {

View File

@ -27,6 +27,7 @@ import cn.edu.thu.tsfiledb.engine.overflow.utils.ReadWriteThriftFormatUtils;
import cn.edu.thu.tsfiledb.engine.overflow.utils.TSFileMetaDataConverter; import cn.edu.thu.tsfiledb.engine.overflow.utils.TSFileMetaDataConverter;
import cn.edu.thu.tsfiledb.engine.overflow.utils.TimePair; import cn.edu.thu.tsfiledb.engine.overflow.utils.TimePair;
import cn.edu.thu.tsfiledb.engine.utils.FlushState; import cn.edu.thu.tsfiledb.engine.utils.FlushState;
import cn.edu.thu.tsfiledb.sys.writeLog.WriteLogManager;
import cn.edu.thu.tsfile.common.exception.ProcessorException; import cn.edu.thu.tsfile.common.exception.ProcessorException;
public class OverflowProcessor extends LRUProcessor { public class OverflowProcessor extends LRUProcessor {
@ -380,6 +381,13 @@ public class OverflowProcessor extends LRUProcessor {
} }
} }
} }
try {
WriteLogManager.getInstance().startOverflowFlush(nameSpacePath);
} catch (IOException e1) {
throw new OverflowProcessorException(e1);
}
ofSupport.switchWorkToFlush(); ofSupport.switchWorkToFlush();
recordCount = 0; recordCount = 0;
// update the status of the newIntervalFiles // update the status of the newIntervalFiles
@ -409,6 +417,7 @@ public class OverflowProcessor extends LRUProcessor {
// call filenode manager function to flush overflow // call filenode manager function to flush overflow
// nameSpacePath set // nameSpacePath set
filenodeManagerFlushAction.act(); filenodeManagerFlushAction.act();
WriteLogManager.getInstance().endOverflowFlush(nameSpacePath);
} catch (IOException e) { } catch (IOException e) {
LOGGER.error("Flush overflow rowGroup to file failed synchronously"); LOGGER.error("Flush overflow rowGroup to file failed synchronously");
throw new OverflowProcessorException( throw new OverflowProcessorException(
@ -443,6 +452,7 @@ public class OverflowProcessor extends LRUProcessor {
// call filenode manager function to flush overflow // call filenode manager function to flush overflow
// nameSpacePath set // nameSpacePath set
filenodeManagerFlushAction.act(); filenodeManagerFlushAction.act();
WriteLogManager.getInstance().endOverflowFlush(nameSpacePath);
} catch (IOException e) { } catch (IOException e) {
LOGGER.error("Flush overflow rowgroup to file error in asynchronously. The reason is {}", LOGGER.error("Flush overflow rowgroup to file error in asynchronously. The reason is {}",
e.getMessage()); e.getMessage());

View File

@ -556,7 +556,7 @@ public class TSServiceImpl implements TSIService.Iface {
operationHandle = new TSOperationHandle(operationId, false); operationHandle = new TSOperationHandle(operationId, false);
resp.setOperationHandle(operationHandle); resp.setOperationHandle(operationHandle);
return resp; return resp;
} catch (QueryProcessorException e) { } catch (QueryProcessorException | PathErrorException e) {
LOGGER.error(e.getMessage()); LOGGER.error(e.getMessage());
return getTSExecuteStatementResp(TS_StatusCode.ERROR_STATUS, e.getMessage()); return getTSExecuteStatementResp(TS_StatusCode.ERROR_STATUS, e.getMessage());
} catch (IOException e) { } catch (IOException e) {
@ -598,7 +598,11 @@ public class TSServiceImpl implements TSIService.Iface {
return getTSExecuteStatementResp(TS_StatusCode.ERROR_STATUS, e.getMessage()); return getTSExecuteStatementResp(TS_StatusCode.ERROR_STATUS, e.getMessage());
} }
if (execRet && needToBeWritenToLog(plan)) { if (execRet && needToBeWritenToLog(plan)) {
writeLogManager.write(plan); try {
writeLogManager.write(plan);
} catch (PathErrorException e) {
throw new ProcessorException(e);
}
} }
TS_StatusCode statusCode = execRet ? TS_StatusCode.SUCCESS_STATUS : TS_StatusCode.ERROR_STATUS; TS_StatusCode statusCode = execRet ? TS_StatusCode.SUCCESS_STATUS : TS_StatusCode.ERROR_STATUS;
String msg = execRet ? "Execute successfully" : "Execute statement error."; String msg = execRet ? "Execute successfully" : "Execute statement error.";
@ -613,7 +617,7 @@ public class TSServiceImpl implements TSIService.Iface {
private boolean needToBeWritenToLog(PhysicalPlan plan) { private boolean needToBeWritenToLog(PhysicalPlan plan) {
if (plan.getOperatorType() == OperatorType.INSERT) { if (plan.getOperatorType() == OperatorType.INSERT) {
return true; return false;
} }
if (plan.getOperatorType() == OperatorType.UPDATE) { if (plan.getOperatorType() == OperatorType.UPDATE) {
return true; return true;

View File

@ -43,13 +43,27 @@ public class WriteLogManager {
getWriteLogNode(MManager.getInstance().getFileNameByPath(plan.getPath().getFullPath())).write(plan); getWriteLogNode(MManager.getInstance().getFileNameByPath(plan.getPath().getFullPath())).write(plan);
} }
public void write(TSRecord record, int isOverflow) throws IOException { public void write(TSRecord record, int type) throws IOException, PathErrorException {
getWriteLogNode(MManager.getInstance().getFileNameByPath(record.deltaObjectId)).write(record, type);
} }
public void flush() { public void startOverflowFlush(String nsPath) throws IOException {
getWriteLogNode(nsPath).overflowFlushStart();
} }
public void endOverflowFlush(String nsPath) throws IOException {
getWriteLogNode(nsPath).overflowFlushEnd();
}
public void startBufferWriteFlush(String nsPath) throws IOException {
getWriteLogNode(nsPath).bufferFlushStart();
}
public void endBufferWriteFlush(String nsPath) throws IOException {
getWriteLogNode(nsPath).bufferFlushEnd();
}
public PhysicalPlan getPhysicalPlan() throws IOException { public PhysicalPlan getPhysicalPlan() throws IOException {
return null; return null;

View File

@ -83,7 +83,7 @@ public class WriteLogNode {
List<String> insertValues = new ArrayList<>(); List<String> insertValues = new ArrayList<>();
for (DataPoint dp : record.dataPointList) { for (DataPoint dp : record.dataPointList) {
measurementList.add(dp.getMeasurementId()); measurementList.add(dp.getMeasurementId());
insertValues.add((String)dp.getValue()); insertValues.add(dp.getValue().toString());
} }
plansInMemory.add(new MultiInsertPlan(1, record.deltaObjectId, record.time, measurementList, insertValues)); plansInMemory.add(new MultiInsertPlan(1, record.deltaObjectId, record.time, measurementList, insertValues));
} }