From 19bc54d3cd7d9afd6d5d50164aa9d406db7e9fbe Mon Sep 17 00:00:00 2001 From: xingtanzjr Date: Mon, 22 May 2017 20:56:32 +0800 Subject: [PATCH] fuck --- pom.xml | 28 ------------------- .../edu/thu/tsfiledb/conf/TSFileDBConfig.java | 2 +- .../bufferwrite/BufferWriteProcessor.java | 8 +++++- .../engine/filenode/FileNodeManager.java | 20 +++++++++++-- .../engine/overflow/io/OverflowProcessor.java | 10 +++++++ .../thu/tsfiledb/service/TSServiceImpl.java | 10 +++++-- .../sys/writeLog/WriteLogManager.java | 24 ++++++++++++---- .../tsfiledb/sys/writeLog/WriteLogNode.java | 2 +- 8 files changed, 63 insertions(+), 41 deletions(-) diff --git a/pom.xml b/pom.xml index 892b3de5414..a734b71e602 100644 --- a/pom.xml +++ b/pom.xml @@ -49,34 +49,6 @@ antlr3-maven-plugin 3.4 - - org.slf4j - slf4j-api - 1.6.1 - - - ch.qos.logback - logback-core - - - ch.qos.logback - logback-classic - - - - jline - jline - 2.14.1 - - - - org.antlr - antlr3-maven-plugin - 3.4 - - - - diff --git a/src/main/java/cn/edu/thu/tsfiledb/conf/TSFileDBConfig.java b/src/main/java/cn/edu/thu/tsfiledb/conf/TSFileDBConfig.java index f45ae3110f5..e50dec817bc 100644 --- a/src/main/java/cn/edu/thu/tsfiledb/conf/TSFileDBConfig.java +++ b/src/main/java/cn/edu/thu/tsfiledb/conf/TSFileDBConfig.java @@ -43,7 +43,7 @@ public class TSFileDBConfig { public String writeLogPath = "src/main/resources/writeLog.log"; public int LogCompactSize = 100000; - public int LogMemorySize = 10000; + public int LogMemorySize = 1; public TSFileDBConfig() { } diff --git a/src/main/java/cn/edu/thu/tsfiledb/engine/bufferwrite/BufferWriteProcessor.java b/src/main/java/cn/edu/thu/tsfiledb/engine/bufferwrite/BufferWriteProcessor.java index 6ccf2e55673..55bf8dcbd1c 100644 --- a/src/main/java/cn/edu/thu/tsfiledb/engine/bufferwrite/BufferWriteProcessor.java +++ b/src/main/java/cn/edu/thu/tsfiledb/engine/bufferwrite/BufferWriteProcessor.java @@ -46,11 +46,13 @@ import cn.edu.thu.tsfile.timeseries.write.series.IRowGroupWriter; import cn.edu.thu.tsfiledb.conf.TSFileDBConfig; import cn.edu.thu.tsfiledb.conf.TSFileDBDescriptor; import cn.edu.thu.tsfiledb.engine.exception.BufferWriteProcessorException; +import cn.edu.thu.tsfiledb.engine.exception.FileNodeManagerException; import cn.edu.thu.tsfiledb.engine.lru.LRUProcessor; import cn.edu.thu.tsfiledb.engine.utils.FlushState; import cn.edu.thu.tsfiledb.exception.PathErrorException; import cn.edu.thu.tsfiledb.metadata.ColumnSchema; import cn.edu.thu.tsfiledb.metadata.MManager; +import cn.edu.thu.tsfiledb.sys.writeLog.WriteLogManager; public class BufferWriteProcessor extends LRUProcessor { @@ -618,13 +620,16 @@ public class BufferWriteProcessor extends LRUProcessor { e.printStackTrace(); throw new IOException(e); } - + + //For WAL + WriteLogManager.getInstance().startBufferWriteFlush(nameSpacePath); // flush bufferwrite data if (isFlushingSync) { try { super.flushRowGroup(false); writeStoreToDisk(); filenodeFlushAction.act(); + WriteLogManager.getInstance().endBufferWriteFlush(nameSpacePath); } catch (IOException e) { LOGGER.error("Flush row group to store failed, processor:{}. Message: {}", nameSpacePath, e.getMessage()); @@ -658,6 +663,7 @@ public class BufferWriteProcessor extends LRUProcessor { asyncFlushRowGroupToStore(); writeStoreToDisk(); filenodeFlushAction.act(); + WriteLogManager.getInstance().endBufferWriteFlush(nameSpacePath); } catch (IOException e) { /* * There should be added system log by CGF and throw diff --git a/src/main/java/cn/edu/thu/tsfiledb/engine/filenode/FileNodeManager.java b/src/main/java/cn/edu/thu/tsfiledb/engine/filenode/FileNodeManager.java index 844e8e46e46..c0731ee3133 100644 --- a/src/main/java/cn/edu/thu/tsfiledb/engine/filenode/FileNodeManager.java +++ b/src/main/java/cn/edu/thu/tsfiledb/engine/filenode/FileNodeManager.java @@ -38,6 +38,7 @@ import cn.edu.thu.tsfiledb.engine.overflow.io.OverflowProcessor; import cn.edu.thu.tsfiledb.exception.ErrorDebugException; import cn.edu.thu.tsfiledb.exception.PathErrorException; import cn.edu.thu.tsfiledb.metadata.MManager; +import cn.edu.thu.tsfiledb.sys.writeLog.WriteLogManager; public class FileNodeManager extends LRUManager { @@ -189,7 +190,14 @@ public class FileNodeManager extends LRUManager { e.printStackTrace(); throw new FileNodeManagerException(e); } - // overflowProcessor.writeLock(); + //For WAL + try { + WriteLogManager.getInstance().write(tsRecord, WriteLogManager.OVERFLOW); + } catch (IOException | PathErrorException e) { + LOGGER.error("Error in write WAL: {}", e.getMessage()); + throw new FileNodeManagerException(e); + } + for (DataPoint dataPoint : tsRecord.dataPointList) { try { overflowProcessor.insert(deltaObjectId, dataPoint.getMeasurementId(), timestamp, @@ -222,7 +230,15 @@ public class FileNodeManager extends LRUManager { String fileAbsolutePath = bufferWriteProcessor.getFileAbsolutePath(); fileNodeProcessor.addIntervalFileNode(timestamp, fileAbsolutePath); } - // bufferWriteProcessor.writeLock(); + + //For WAL + try { + WriteLogManager.getInstance().write(tsRecord, WriteLogManager.BUFFERWRITER); + } catch (IOException | PathErrorException e) { + LOGGER.error("Error in write WAL: {}", e.getMessage()); + throw new FileNodeManagerException(e); + } + try { bufferWriteProcessor.write(tsRecord); } catch (BufferWriteProcessorException e) { diff --git a/src/main/java/cn/edu/thu/tsfiledb/engine/overflow/io/OverflowProcessor.java b/src/main/java/cn/edu/thu/tsfiledb/engine/overflow/io/OverflowProcessor.java index daf2152b4b4..d3268aeb897 100644 --- a/src/main/java/cn/edu/thu/tsfiledb/engine/overflow/io/OverflowProcessor.java +++ b/src/main/java/cn/edu/thu/tsfiledb/engine/overflow/io/OverflowProcessor.java @@ -27,6 +27,7 @@ import cn.edu.thu.tsfiledb.engine.overflow.utils.ReadWriteThriftFormatUtils; import cn.edu.thu.tsfiledb.engine.overflow.utils.TSFileMetaDataConverter; import cn.edu.thu.tsfiledb.engine.overflow.utils.TimePair; import cn.edu.thu.tsfiledb.engine.utils.FlushState; +import cn.edu.thu.tsfiledb.sys.writeLog.WriteLogManager; import cn.edu.thu.tsfile.common.exception.ProcessorException; public class OverflowProcessor extends LRUProcessor { @@ -380,6 +381,13 @@ public class OverflowProcessor extends LRUProcessor { } } } + + try { + WriteLogManager.getInstance().startOverflowFlush(nameSpacePath); + } catch (IOException e1) { + throw new OverflowProcessorException(e1); + } + ofSupport.switchWorkToFlush(); recordCount = 0; // update the status of the newIntervalFiles @@ -409,6 +417,7 @@ public class OverflowProcessor extends LRUProcessor { // call filenode manager function to flush overflow // nameSpacePath set filenodeManagerFlushAction.act(); + WriteLogManager.getInstance().endOverflowFlush(nameSpacePath); } catch (IOException e) { LOGGER.error("Flush overflow rowGroup to file failed synchronously"); throw new OverflowProcessorException( @@ -443,6 +452,7 @@ public class OverflowProcessor extends LRUProcessor { // call filenode manager function to flush overflow // nameSpacePath set filenodeManagerFlushAction.act(); + WriteLogManager.getInstance().endOverflowFlush(nameSpacePath); } catch (IOException e) { LOGGER.error("Flush overflow rowgroup to file error in asynchronously. The reason is {}", e.getMessage()); diff --git a/src/main/java/cn/edu/thu/tsfiledb/service/TSServiceImpl.java b/src/main/java/cn/edu/thu/tsfiledb/service/TSServiceImpl.java index 47728f08fbc..ef23bbc90b6 100644 --- a/src/main/java/cn/edu/thu/tsfiledb/service/TSServiceImpl.java +++ b/src/main/java/cn/edu/thu/tsfiledb/service/TSServiceImpl.java @@ -556,7 +556,7 @@ public class TSServiceImpl implements TSIService.Iface { operationHandle = new TSOperationHandle(operationId, false); resp.setOperationHandle(operationHandle); return resp; - } catch (QueryProcessorException e) { + } catch (QueryProcessorException | PathErrorException e) { LOGGER.error(e.getMessage()); return getTSExecuteStatementResp(TS_StatusCode.ERROR_STATUS, e.getMessage()); } catch (IOException e) { @@ -598,7 +598,11 @@ public class TSServiceImpl implements TSIService.Iface { return getTSExecuteStatementResp(TS_StatusCode.ERROR_STATUS, e.getMessage()); } if (execRet && needToBeWritenToLog(plan)) { - writeLogManager.write(plan); + try { + writeLogManager.write(plan); + } catch (PathErrorException e) { + throw new ProcessorException(e); + } } TS_StatusCode statusCode = execRet ? TS_StatusCode.SUCCESS_STATUS : TS_StatusCode.ERROR_STATUS; String msg = execRet ? "Execute successfully" : "Execute statement error."; @@ -613,7 +617,7 @@ public class TSServiceImpl implements TSIService.Iface { private boolean needToBeWritenToLog(PhysicalPlan plan) { if (plan.getOperatorType() == OperatorType.INSERT) { - return true; + return false; } if (plan.getOperatorType() == OperatorType.UPDATE) { return true; diff --git a/src/main/java/cn/edu/thu/tsfiledb/sys/writeLog/WriteLogManager.java b/src/main/java/cn/edu/thu/tsfiledb/sys/writeLog/WriteLogManager.java index 2c8186fe1f7..a66289342c1 100644 --- a/src/main/java/cn/edu/thu/tsfiledb/sys/writeLog/WriteLogManager.java +++ b/src/main/java/cn/edu/thu/tsfiledb/sys/writeLog/WriteLogManager.java @@ -43,13 +43,27 @@ public class WriteLogManager { getWriteLogNode(MManager.getInstance().getFileNameByPath(plan.getPath().getFullPath())).write(plan); } - public void write(TSRecord record, int isOverflow) throws IOException { - + public void write(TSRecord record, int type) throws IOException, PathErrorException { + getWriteLogNode(MManager.getInstance().getFileNameByPath(record.deltaObjectId)).write(record, type); } - - public void flush() { - + + public void startOverflowFlush(String nsPath) throws IOException { + getWriteLogNode(nsPath).overflowFlushStart(); } + + public void endOverflowFlush(String nsPath) throws IOException { + getWriteLogNode(nsPath).overflowFlushEnd(); + } + + public void startBufferWriteFlush(String nsPath) throws IOException { + getWriteLogNode(nsPath).bufferFlushStart(); + } + + public void endBufferWriteFlush(String nsPath) throws IOException { + getWriteLogNode(nsPath).bufferFlushEnd(); + } + + public PhysicalPlan getPhysicalPlan() throws IOException { return null; diff --git a/src/main/java/cn/edu/thu/tsfiledb/sys/writeLog/WriteLogNode.java b/src/main/java/cn/edu/thu/tsfiledb/sys/writeLog/WriteLogNode.java index b40ba8e3f82..cfa39a50182 100644 --- a/src/main/java/cn/edu/thu/tsfiledb/sys/writeLog/WriteLogNode.java +++ b/src/main/java/cn/edu/thu/tsfiledb/sys/writeLog/WriteLogNode.java @@ -83,7 +83,7 @@ public class WriteLogNode { List insertValues = new ArrayList<>(); for (DataPoint dp : record.dataPointList) { measurementList.add(dp.getMeasurementId()); - insertValues.add((String)dp.getValue()); + insertValues.add(dp.getValue().toString()); } plansInMemory.add(new MultiInsertPlan(1, record.deltaObjectId, record.time, measurementList, insertValues)); }