diff --git a/Readme.md b/Readme.md index 582f4eab6e6..30f6a9a9ac7 100644 --- a/Readme.md +++ b/Readme.md @@ -22,8 +22,8 @@ To use IoTDB, you need to have: 1. Java >= 1.8 2. Maven >= 3.0 (If you want to compile and install IoTDB from source code) -3. TsFile >= 0.6.0 (TsFile Github page: [https://github.com/thulab/tsfile](https://github.com/thulab/tsfile)) -4. IoTDB-JDBC >= 0.6.0 (IoTDB-JDBC Github page: [https://github.com/thulab/iotdb-jdbc](https://github.com/thulab/iotdb-jdbc)) +3. TsFile >= 0.7.0 (TsFile Github page: [https://github.com/thulab/tsfile](https://github.com/thulab/tsfile)) +4. IoTDB-JDBC >= 0.7.0 (IoTDB-JDBC Github page: [https://github.com/thulab/iotdb-jdbc](https://github.com/thulab/iotdb-jdbc)) TODO: TsFile and IoTDB-JDBC dependencies will be removed after Jialin Qiao re-structured the Project. @@ -125,7 +125,7 @@ The command line client is interactive so if everything is ready you should see | | .--.|_/ | | \_| | | `. \ | |_) | | | / .'`\ \ | | | | | | | __'. _| |_| \__. | _| |_ _| |_.' /_| |__) | -|_____|'.__.' |_____| |______.'|_______/ version 0.6.0 +|_____|'.__.' |_____| |______.'|_______/ version 0.7.0 IoTDB> login successfully diff --git a/iotdb/conf/error_info_cn.properties b/iotdb/conf/error_info_cn.properties new file mode 100644 index 00000000000..c0d6095986e --- /dev/null +++ b/iotdb/conf/error_info_cn.properties @@ -0,0 +1,9 @@ +20000=未知错误 +20001=语句中无变量 +20002=无效的变量 +20003=无法连接到服务器:%s (%s) +20061=验证失败:%S +20062=不安全的函数调用:%s +20064=M客户端内存溢出 +20130=语句未就绪 +20220=连接失败 \ No newline at end of file diff --git a/iotdb/conf/error_info_en.properties b/iotdb/conf/error_info_en.properties new file mode 100644 index 00000000000..c2f52c4d792 --- /dev/null +++ b/iotdb/conf/error_info_en.properties @@ -0,0 +1,9 @@ +20000=Unknown error +20001=No parameters exist in the statement +20002=Invalid parameter number +20003=Can't connect to server on %s(%s) +20061=Authentication failed: %s +20062=Insecure API function call: %s +20064=Client ran out of memory +20130=Statement not prepared +20220=Fail to connect. \ No newline at end of file diff --git a/pom.xml b/pom.xml index 55c194ce621..2eb47727b68 100644 --- a/pom.xml +++ b/pom.xml @@ -4,7 +4,7 @@ cn.edu.tsinghua IoTDB - 0.6.0 + 0.8.0-SNAPSHOT jar IoTDB @@ -23,7 +23,7 @@ cn.edu.tsinghua iotdb-jdbc - 0.6.0 + 0.8.0-SNAPSHOT cn.edu.fudan.dsm diff --git a/src/main/java/cn/edu/tsinghua/iotdb/conf/TsFileDBConstant.java b/src/main/java/cn/edu/tsinghua/iotdb/conf/TsFileDBConstant.java index 904dd7364ec..0d509e3c0e5 100644 --- a/src/main/java/cn/edu/tsinghua/iotdb/conf/TsFileDBConstant.java +++ b/src/main/java/cn/edu/tsinghua/iotdb/conf/TsFileDBConstant.java @@ -4,7 +4,7 @@ public class TsFileDBConstant { public static final String ENV_FILE_NAME = "iotdb-env"; public static final String IOTDB_CONF = "IOTDB_CONF"; public static final String GLOBAL_DB_NAME = "IoTDB"; - public static final String VERSION = "0.6.0"; + public static final String VERSION = "0.7.0"; public static final String REMOTE_JMX_PORT_NAME = "com.sun.management.jmxremote.port"; public static final String TSFILEDB_LOCAL_JMX_PORT_NAME = "iotdb.jmx.local.port"; public static final String TSFILEDB_REMOTE_JMX_PORT_NAME = "iotdb.jmx.remote.port"; diff --git a/src/main/java/cn/edu/tsinghua/iotdb/conf/TsfileDBConfig.java b/src/main/java/cn/edu/tsinghua/iotdb/conf/TsfileDBConfig.java index edb471551e2..41ff2fecdc8 100644 --- a/src/main/java/cn/edu/tsinghua/iotdb/conf/TsfileDBConfig.java +++ b/src/main/java/cn/edu/tsinghua/iotdb/conf/TsfileDBConfig.java @@ -14,6 +14,7 @@ public class TsfileDBConfig { public static final String default_tsfile_dir = "settled"; public static final String mult_dir_strategy_prefix = "cn.edu.tsinghua.iotdb.conf.directories.strategy."; public static final String default_mult_dir_strategy = "MaxDiskUsableSpaceFirstStrategy"; + /** * Port which JDBC server listens to */ @@ -246,6 +247,11 @@ public class TsfileDBConfig { */ public int postbackServerPort = 5555; + /* + * Set the language version when loading file including error information, default value is "EN" + * */ + public String languageVersion = "EN"; + /** * Choose a postBack strategy of merging historical data: 1. It's more likely to * update historical data, choose "true". 2. It's more likely not to update diff --git a/src/main/java/cn/edu/tsinghua/iotdb/conf/TsfileDBDescriptor.java b/src/main/java/cn/edu/tsinghua/iotdb/conf/TsfileDBDescriptor.java index aa55484bd9d..d9fb69d5aa2 100644 --- a/src/main/java/cn/edu/tsinghua/iotdb/conf/TsfileDBDescriptor.java +++ b/src/main/java/cn/edu/tsinghua/iotdb/conf/TsfileDBDescriptor.java @@ -149,6 +149,8 @@ public class TsfileDBDescriptor { int maxLogEntrySize = Integer.parseInt(properties.getProperty("max_log_entry_size", conf.maxLogEntrySize + "").trim()); conf.maxLogEntrySize = maxLogEntrySize > 0 ? maxLogEntrySize : conf.maxLogEntrySize; + conf.languageVersion = properties.getProperty("language_version", conf.languageVersion).trim(); + String tmpTimeZone = properties.getProperty("time_zone", conf.timeZone.getID()); try { conf.timeZone = DateTimeZone.forID(tmpTimeZone.trim()); diff --git a/src/main/java/cn/edu/tsinghua/iotdb/conf/directories/strategy/MinDirOccupiedSpaceFirstStrategy.java b/src/main/java/cn/edu/tsinghua/iotdb/conf/directories/strategy/MinDirOccupiedSpaceFirstStrategy.java new file mode 100644 index 00000000000..883374e301f --- /dev/null +++ b/src/main/java/cn/edu/tsinghua/iotdb/conf/directories/strategy/MinDirOccupiedSpaceFirstStrategy.java @@ -0,0 +1,62 @@ +package cn.edu.tsinghua.iotdb.conf.directories.strategy; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import java.util.logging.Logger; + +public class MinDirOccupiedSpaceFirstStrategy extends DirectoryStrategy { + + // directory space is measured by MB + private final long DATA_SIZE_SHIFT = 1024 * 1024; + + @Override + public int nextFolderIndex() { + return getMinOccupiedSpaceFolder(); + } + + private int getMinOccupiedSpaceFolder() { + List candidates = new ArrayList<>(); + long min = 0; + + candidates.add(0); + min = getOccupiedSpace(folders.get(0)); + for(int i = 1;i < folders.size();i++){ + long current = getOccupiedSpace(folders.get(i)); + if(min > current){ + candidates.clear(); + candidates.add(i); + min = current; + } + else if(min == current){ + candidates.add(i); + } + } + + Random random = new Random(System.currentTimeMillis()); + int index = random.nextInt(candidates.size()); + + return candidates.get(index); + } + + private long getOccupiedSpace(String path) { + Path folder = Paths.get(path); + long size = 0; + try { + size = Files.walk(folder) + .filter(p -> p.toFile().isFile()) + .mapToLong(p -> p.toFile().length()) + .sum(); + } catch (IOException e) { + LOGGER.error("Cannot calculate occupied space for path {}.", path); + } + + return size / DATA_SIZE_SHIFT; + } +} + diff --git a/src/main/java/cn/edu/tsinghua/iotdb/conf/directories/strategy/MinFolderOccupiedSpaceFirstStrategy.java b/src/main/java/cn/edu/tsinghua/iotdb/conf/directories/strategy/MinFolderOccupiedSpaceFirstStrategy.java index 3969844b63c..18c4dbcdf33 100644 --- a/src/main/java/cn/edu/tsinghua/iotdb/conf/directories/strategy/MinFolderOccupiedSpaceFirstStrategy.java +++ b/src/main/java/cn/edu/tsinghua/iotdb/conf/directories/strategy/MinFolderOccupiedSpaceFirstStrategy.java @@ -46,7 +46,7 @@ public class MinFolderOccupiedSpaceFirstStrategy extends DirectoryStrategy { private long getOccupiedSpace(String path) { Path folder = Paths.get(path); - long size = 0; + long size = Long.MAX_VALUE; try { size = Files.walk(folder) .filter(p -> p.toFile().isFile()) diff --git a/src/main/java/cn/edu/tsinghua/iotdb/engine/bufferwriteV2/BufferIO.java b/src/main/java/cn/edu/tsinghua/iotdb/engine/bufferwrite/BufferIO.java similarity index 95% rename from src/main/java/cn/edu/tsinghua/iotdb/engine/bufferwriteV2/BufferIO.java rename to src/main/java/cn/edu/tsinghua/iotdb/engine/bufferwrite/BufferIO.java index 476e52369ca..e4a38eaf8dc 100644 --- a/src/main/java/cn/edu/tsinghua/iotdb/engine/bufferwriteV2/BufferIO.java +++ b/src/main/java/cn/edu/tsinghua/iotdb/engine/bufferwrite/BufferIO.java @@ -1,4 +1,4 @@ -package cn.edu.tsinghua.iotdb.engine.bufferwriteV2; +package cn.edu.tsinghua.iotdb.engine.bufferwrite; import java.io.IOException; import java.util.ArrayList; diff --git a/src/main/java/cn/edu/tsinghua/iotdb/engine/bufferwrite/BufferWriteIOWriter.java b/src/main/java/cn/edu/tsinghua/iotdb/engine/bufferwrite/BufferWriteIOWriter.java deleted file mode 100644 index 602885701b3..00000000000 --- a/src/main/java/cn/edu/tsinghua/iotdb/engine/bufferwrite/BufferWriteIOWriter.java +++ /dev/null @@ -1,94 +0,0 @@ -package cn.edu.tsinghua.iotdb.engine.bufferwrite; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; - -import cn.edu.tsinghua.tsfile.common.utils.ITsRandomAccessFileWriter; -import cn.edu.tsinghua.tsfile.file.metadata.RowGroupMetaData; -import cn.edu.tsinghua.tsfile.file.metadata.TimeSeriesChunkMetaData; -import cn.edu.tsinghua.tsfile.file.metadata.enums.TSDataType; -import cn.edu.tsinghua.tsfile.timeseries.write.io.TsFileIOWriter; - -/** - * @author kangrong - * - */ -public class BufferWriteIOWriter extends TsFileIOWriter { - - /* - * The backup list is used to store the rowgroup's metadata whose data has - * been flushed into file. - */ - private final List backUpList = new ArrayList(); - private int lastRowGroupIndex = 0; - - public BufferWriteIOWriter(ITsRandomAccessFileWriter output) throws IOException { - super(output); - } - - /** - * This is just used to restore a tsfile from the middle of the file - * - * @param schema - * @param output - * @param rowGroups - * @throws IOException - */ - public BufferWriteIOWriter(ITsRandomAccessFileWriter output, long offset, List rowGroups) - throws IOException { - super(output, offset, rowGroups); - addrowGroupsTobackupList(rowGroups); - - } - - private void addrowGroupsTobackupList(List rowGroups) { - for (RowGroupMetaData rowGroupMetaData : rowGroups) { - backUpList.add(rowGroupMetaData); - } - lastRowGroupIndex = rowGroups.size(); - } - - /** - * Note that,the method is not thread safe. - */ - public void addNewRowGroupMetaDataToBackUp() { - for (int i = lastRowGroupIndex; i < rowGroupMetaDatas.size(); i++) { - backUpList.add(rowGroupMetaDatas.get(i)); - } - lastRowGroupIndex = rowGroupMetaDatas.size(); - } - - /** - * Note that, the method is not thread safe. You mustn't do any - * change on the return.
- * - * @return - */ - public List getCurrentRowGroupMetaList(String deltaObjectId) { - List ret = new ArrayList<>(); - for (RowGroupMetaData rowGroupMetaData : backUpList) { - if (rowGroupMetaData.getDeltaObjectID().equals(deltaObjectId)) { - ret.add(rowGroupMetaData); - } - } - return ret; - } - - public List getCurrentTimeSeriesMetadataList(String deltaObjectId, String measurementId, - TSDataType dataType) { - List chunkMetaDatas = new ArrayList<>(); - for (RowGroupMetaData rowGroupMetaData : backUpList) { - if (rowGroupMetaData.getDeltaObjectID().equals(deltaObjectId)) { - for (TimeSeriesChunkMetaData chunkMetaData : rowGroupMetaData.getTimeSeriesChunkMetaDataList()) { - // filter data-type and measurementId - if (chunkMetaData.getProperties().getMeasurementUID().equals(measurementId) - && chunkMetaData.getVInTimeSeriesChunkMetaData().getDataType().equals(dataType)) { - chunkMetaDatas.add(chunkMetaData); - } - } - } - } - return chunkMetaDatas; - } -} diff --git a/src/main/java/cn/edu/tsinghua/iotdb/engine/bufferwrite/BufferWriteIndex.java b/src/main/java/cn/edu/tsinghua/iotdb/engine/bufferwrite/BufferWriteIndex.java deleted file mode 100644 index 9469fe231ea..00000000000 --- a/src/main/java/cn/edu/tsinghua/iotdb/engine/bufferwrite/BufferWriteIndex.java +++ /dev/null @@ -1,36 +0,0 @@ -package cn.edu.tsinghua.iotdb.engine.bufferwrite; - -import cn.edu.tsinghua.tsfile.timeseries.read.query.DynamicOneColumnData; -import cn.edu.tsinghua.tsfile.timeseries.write.record.TSRecord; - -/** - * The function of this interface is to store and index TSRecord in memory - * temporarily - * - * @author kangrong - * - */ -@Deprecated -public interface BufferWriteIndex { - /** - * insert a tsRecord - * - * @param tsRecord - */ - void insert(TSRecord tsRecord); - - /** - * Get the DynamicOneColumnData from the buffer index - * - * @param deltaObjectId - * @param measurementId - * @return - */ - public DynamicOneColumnData query(String deltaObjectId, String measurementId); - - /** - * clear all data written in the bufferindex structure which will be used - * for next stage - */ - void clear(); -} diff --git a/src/main/java/cn/edu/tsinghua/iotdb/engine/bufferwrite/BufferWriteProcessor.java b/src/main/java/cn/edu/tsinghua/iotdb/engine/bufferwrite/BufferWriteProcessor.java index df29200198e..0e31de6b96d 100644 --- a/src/main/java/cn/edu/tsinghua/iotdb/engine/bufferwrite/BufferWriteProcessor.java +++ b/src/main/java/cn/edu/tsinghua/iotdb/engine/bufferwrite/BufferWriteProcessor.java @@ -1,14 +1,7 @@ package cn.edu.tsinghua.iotdb.engine.bufferwrite; -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; import java.io.File; -import java.io.FileNotFoundException; -import java.io.FileOutputStream; import java.io.IOException; -import java.io.RandomAccessFile; -import java.nio.channels.FileChannel; -import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.Future; @@ -29,7 +22,6 @@ import cn.edu.tsinghua.iotdb.engine.Processor; import cn.edu.tsinghua.iotdb.engine.memcontrol.BasicMemController; import cn.edu.tsinghua.iotdb.engine.memtable.IMemTable; import cn.edu.tsinghua.iotdb.engine.memtable.MemSeriesLazyMerger; -import cn.edu.tsinghua.iotdb.engine.memtable.MemTableFlushUtil; import cn.edu.tsinghua.iotdb.engine.memtable.PrimitiveMemTable; import cn.edu.tsinghua.iotdb.engine.pool.FlushManager; import cn.edu.tsinghua.iotdb.engine.querycontext.RawSeriesChunk; @@ -37,75 +29,50 @@ import cn.edu.tsinghua.iotdb.engine.querycontext.RawSeriesChunkLazyLoadImpl; import cn.edu.tsinghua.iotdb.engine.utils.FlushStatus; import cn.edu.tsinghua.iotdb.exception.BufferWriteProcessorException; import cn.edu.tsinghua.iotdb.utils.MemUtils; -import cn.edu.tsinghua.tsfile.common.conf.TSFileConfig; import cn.edu.tsinghua.tsfile.common.conf.TSFileDescriptor; -import cn.edu.tsinghua.tsfile.common.utils.BytesUtils; -import cn.edu.tsinghua.tsfile.common.utils.ITsRandomAccessFileWriter; import cn.edu.tsinghua.tsfile.common.utils.Pair; -import cn.edu.tsinghua.tsfile.common.utils.TsRandomAccessFileWriter; -import cn.edu.tsinghua.tsfile.file.metadata.RowGroupMetaData; import cn.edu.tsinghua.tsfile.file.metadata.TimeSeriesChunkMetaData; -import cn.edu.tsinghua.tsfile.file.metadata.TsRowGroupBlockMetaData; import cn.edu.tsinghua.tsfile.file.metadata.enums.TSDataType; -import cn.edu.tsinghua.tsfile.file.utils.ReadWriteThriftFormatUtils; -import cn.edu.tsinghua.tsfile.format.RowGroupBlockMetaData; import cn.edu.tsinghua.tsfile.timeseries.write.record.DataPoint; import cn.edu.tsinghua.tsfile.timeseries.write.record.TSRecord; import cn.edu.tsinghua.tsfile.timeseries.write.schema.FileSchema; -/** - * @author liukun - */ public class BufferWriteProcessor extends Processor { - private static final Logger LOGGER = LoggerFactory.getLogger(BufferWriteProcessor.class); - private static final TsfileDBConfig TsFileDBConf = TsfileDBDescriptor.getInstance().getConfig(); - private static final TSFileConfig TsFileConf = TSFileDescriptor.getInstance().getConfig(); - private static final int TS_METADATA_BYTE_SIZE = 4; - private static final int TSFILE_POSITION_BYTE_SIZE = 8; + + private FileSchema fileSchema; + private BufferWriteResource bufferWriteResource; private volatile FlushStatus flushStatus = new FlushStatus(); + private volatile boolean isFlush; private ReentrantLock flushQueryLock = new ReentrantLock(); + private AtomicLong memSize = new AtomicLong(); + private long memThreshold = TSFileDescriptor.getInstance().getConfig().groupSizeInByte; private IMemTable workMemTable; private IMemTable flushMemTable; - private FileSchema fileSchema; - private BufferWriteIOWriter bufferIOWriter; - private int lastRowgroupSize = 0; - - // this just the bufferwrite file name - private String baseDir; - private String fileName; - private static final String restoreFile = ".restore"; - // this is the bufferwrite file absolute path - private String bufferwriteRestoreFilePath; - private String bufferwriteOutputFilePath; - private String bufferwriterelativePath; - private File bufferwriteOutputFile; - - private boolean isNewProcessor = false; - private Action bufferwriteFlushAction = null; private Action bufferwriteCloseAction = null; private Action filenodeFlushAction = null; - private long memThreshold = TsFileConf.groupSizeInByte; private long lastFlushTime = -1; private long valueCount = 0; - private volatile boolean isFlush; - private AtomicLong memSize = new AtomicLong(); + + private String baseDir; + private String fileName; + private String insertFilePath; + private String bufferwriterelativePath; private WriteLogNode logNode; public BufferWriteProcessor(String baseDir, String processorName, String fileName, Map parameters, FileSchema fileSchema) throws BufferWriteProcessorException { super(processorName); - - this.fileName = fileName; - String restoreFileName = fileName + restoreFile; - + this.fileSchema = fileSchema; this.baseDir = baseDir; + this.fileName = fileName; + if (baseDir.length() > 0 && baseDir.charAt(baseDir.length() - 1) != File.separatorChar) { baseDir = baseDir + File.separatorChar; @@ -116,50 +83,21 @@ public class BufferWriteProcessor extends Processor { dataDir.mkdirs(); LOGGER.debug("The bufferwrite processor data dir doesn't exists, create new directory {}.", dataDirPath); } - bufferwriteOutputFile = new File(dataDir, fileName); - File restoreFile = new File(dataDir, restoreFileName); - bufferwriteRestoreFilePath = restoreFile.getPath(); - bufferwriteOutputFilePath = bufferwriteOutputFile.getPath(); + this.insertFilePath = new File(dataDir, fileName).getPath(); bufferwriterelativePath = processorName + File.separatorChar + fileName; - // get the fileschema - this.fileSchema = fileSchema; - - if (bufferwriteOutputFile.exists() && restoreFile.exists()) { - // - // There is one damaged file, and the RESTORE_FILE_SUFFIX exist - // - LOGGER.info("Recorvery the bufferwrite processor {}.", processorName); - bufferwriteRecovery(); - - } else { - - ITsRandomAccessFileWriter outputWriter; - try { - outputWriter = new TsRandomAccessFileWriter(bufferwriteOutputFile); - } catch (IOException e) { - LOGGER.error("Construct the TSRandomAccessFileWriter error, the absolutePath is {}.", - bufferwriteOutputFile.getPath(), e); - throw new BufferWriteProcessorException(e); - } - - try { - bufferIOWriter = new BufferWriteIOWriter(outputWriter); - } catch (IOException e) { - LOGGER.error("Get the BufferWriteIOWriter error, the bufferwrite is {}.", processorName, e); - throw new BufferWriteProcessorException(e); - } - isNewProcessor = true; - // write restore file - writeStoreToDisk(); + try { + bufferWriteResource = new BufferWriteResource(processorName, insertFilePath); + } catch (IOException e) { + throw new BufferWriteProcessorException(e); } - // init action - // the action from the corresponding filenode processor + + bufferwriteFlushAction = (Action) parameters.get(FileNodeConstants.BUFFERWRITE_FLUSH_ACTION); bufferwriteCloseAction = (Action) parameters.get(FileNodeConstants.BUFFERWRITE_CLOSE_ACTION); filenodeFlushAction = (Action) parameters.get(FileNodeConstants.FILENODE_PROCESSOR_FLUSH_ACTION); workMemTable = new PrimitiveMemTable(); - if (TsfileDBDescriptor.getInstance().getConfig().enableWal) { + if(TsfileDBDescriptor.getInstance().getConfig().enableWal) { try { logNode = MultiFileLogNodeManager.getInstance().getNode( processorName + TsFileDBConstant.BUFFERWRITE_LOG_NODE_SUFFIX, getBufferwriteRestoreFilePath(), @@ -170,256 +108,6 @@ public class BufferWriteProcessor extends Processor { } } - /** - *

- * Recovery the bufferwrite status.
- * The one part is the last intervalFile
- * The other part is all the intervalFile, and other file will be deleted - *

- * - * @throws BufferWriteProcessorException - */ - private void bufferwriteRecovery() throws BufferWriteProcessorException { - - Pair> pair; - try { - pair = readStoreFromDisk(); - } catch (IOException e) { - LOGGER.error("Failed to read bufferwrite {} restore file.", getProcessorName()); - throw new BufferWriteProcessorException(e); - } - ITsRandomAccessFileWriter output; - long lastFlushPosition = pair.left; - File lastBufferWriteFile = new File(bufferwriteOutputFilePath); - if (lastBufferWriteFile.length() != lastFlushPosition) { - LOGGER.warn( - "The last bufferwrite file {} is damaged, the length of the last bufferwrite file is {}, the end of last successful flush is {}.", - lastBufferWriteFile.getPath(), lastBufferWriteFile.length(), lastFlushPosition); - try { - FileChannel fileChannel = new FileOutputStream(bufferwriteOutputFile, true).getChannel(); - fileChannel.truncate(lastFlushPosition); - fileChannel.close(); - //cutOffFile(lastFlushPosition); - } catch (IOException e) { - LOGGER.error( - "Cut off damaged file error, the damaged file path is {}, the length is {}, the cut off length is {}.", - bufferwriteOutputFilePath, lastBufferWriteFile.length(), lastFlushPosition, e); - throw new BufferWriteProcessorException(e); - } - } - try { - // Notice: the offset is seek to end of the file by API of kr - output = new TsRandomAccessFileWriter(lastBufferWriteFile); - } catch (IOException e) { - LOGGER.error("Can't construct the RandomAccessOutputStream, the outputPath is {}.", - bufferwriteOutputFilePath); - throw new BufferWriteProcessorException(e); - } - try { - // Notice: the parameter of lastPosition is not used beacuse of the - // API of kr - bufferIOWriter = new BufferWriteIOWriter(output, lastFlushPosition, pair.right); - } catch (IOException e) { - LOGGER.error("Can't get the BufferWriteIOWriter while recoverying, the bufferwrite processor is {}.", - getProcessorName(), e); - throw new BufferWriteProcessorException(e); - } - isNewProcessor = false; - } - - private void cutOffFile(long length) throws IOException { - - String tempPath = bufferwriteOutputFilePath + ".backup"; - File tempFile = new File(tempPath); - File normalFile = new File(bufferwriteOutputFilePath); - - if (normalFile.exists() && normalFile.length() > 0) { - - if (tempFile.exists()) { - tempFile.delete(); - } - RandomAccessFile normalReader = null; - RandomAccessFile tempWriter = null; - try { - normalReader = new RandomAccessFile(normalFile, "r"); - tempWriter = new RandomAccessFile(tempFile, "rw"); - } catch (FileNotFoundException e) { - LOGGER.error( - "Can't get the RandomAccessFile read and write, the normal path is {}, the temp path is {}.", - bufferwriteOutputFilePath, tempPath); - if (normalReader != null) { - normalReader.close(); - } - if (tempWriter != null) { - tempWriter.close(); - } - throw e; - } - long offset = 0; - int step = 4 * 1024 * 1024; - byte[] buff = new byte[step]; - while (length - offset >= step) { - try { - normalReader.readFully(buff); - tempWriter.write(buff); - } catch (IOException e) { - LOGGER.error("normalReader read data failed or tempWriter write data error."); - throw e; - } - offset = offset + step; - } - normalReader.readFully(buff, 0, (int) (length - offset)); - tempWriter.write(buff, 0, (int) (length - offset)); - normalReader.close(); - tempWriter.close(); - } - normalFile.delete(); - tempFile.renameTo(normalFile); - } - - /** - * This is only used after flush one rowroup data successfully. - * - * @throws BufferWriteProcessorException - */ - private void writeStoreToDisk() throws BufferWriteProcessorException { - - long lastPosition; - try { - lastPosition = bufferIOWriter.getPos(); - } catch (IOException e) { - LOGGER.error("Can't get the bufferwrite io position, the buffewrite processor is {}", getProcessorName(), - e); - throw new BufferWriteProcessorException(e); - } - List rowGroupMetaDatas = bufferIOWriter.getRowGroups(); - List appendMetadata = new ArrayList<>(); - for (int i = lastRowgroupSize; i < rowGroupMetaDatas.size(); i++) { - appendMetadata.add(rowGroupMetaDatas.get(i)); - } - lastRowgroupSize = rowGroupMetaDatas.size(); - TsRowGroupBlockMetaData tsRowGroupBlockMetaData = new TsRowGroupBlockMetaData(); - tsRowGroupBlockMetaData.setRowGroups(appendMetadata); - - RandomAccessFile out = null; - try { - out = new RandomAccessFile(bufferwriteRestoreFilePath, "rw"); - } catch (FileNotFoundException e) { - LOGGER.error("The restore file {} can't be created, the bufferwrite processor is {}", - bufferwriteRestoreFilePath, getProcessorName(), e); - throw new BufferWriteProcessorException(e); - } - try { - if (out.length() > 0) { - out.seek(out.length() - TSFILE_POSITION_BYTE_SIZE); - } - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - ReadWriteThriftFormatUtils.writeRowGroupBlockMetadata(tsRowGroupBlockMetaData.convertToThrift(), baos); - // write metadata size using int - int metadataSize = baos.size(); - out.write(BytesUtils.intToBytes(metadataSize)); - // write metadata - out.write(baos.toByteArray()); - // write tsfile position using byte[8] which is present one long - // number - byte[] lastPositionBytes = BytesUtils.longToBytes(lastPosition); - out.write(lastPositionBytes); - } catch (IOException e) { - throw new BufferWriteProcessorException(e); - } finally { - if (out != null) { - try { - out.close(); - } catch (IOException e) { - e.printStackTrace(); - throw new BufferWriteProcessorException(e); - } - } - } - } - - /** - * This is used to delete the file which is used to restore buffer write - * processor. This is only used after closing the buffer write processor - * successfully. - */ - private void deleteRestoreFile() { - File restoreFile = new File(bufferwriteRestoreFilePath); - if (restoreFile.exists()) { - restoreFile.delete(); - } - } - - /** - * The left of the pair is the last successful flush position. The right of - * the pair is the rowGroupMetadata. - * - * @return - the left is the end position of the last rowgroup flushed, the - * right is all the rowgroup meatdata flushed - * @throws IOException - */ - private Pair> readStoreFromDisk() throws IOException { - byte[] lastPostionBytes = new byte[TSFILE_POSITION_BYTE_SIZE]; - List groupMetaDatas = new ArrayList<>(); - RandomAccessFile randomAccessFile = null; - try { - randomAccessFile = new RandomAccessFile(bufferwriteRestoreFilePath, "rw"); - long fileLength = randomAccessFile.length(); - // read tsfile position - long point = randomAccessFile.getFilePointer(); - while (point + TSFILE_POSITION_BYTE_SIZE < fileLength) { - byte[] metadataSizeBytes = new byte[TS_METADATA_BYTE_SIZE]; - randomAccessFile.read(metadataSizeBytes); - int metadataSize = BytesUtils.bytesToInt(metadataSizeBytes); - byte[] thriftBytes = new byte[metadataSize]; - randomAccessFile.read(thriftBytes); - ByteArrayInputStream inputStream = new ByteArrayInputStream(thriftBytes); - RowGroupBlockMetaData rowGroupBlockMetaData = ReadWriteThriftFormatUtils - .readRowGroupBlockMetaData(inputStream); - TsRowGroupBlockMetaData blockMeta = new TsRowGroupBlockMetaData(); - blockMeta.convertToTSF(rowGroupBlockMetaData); - groupMetaDatas.addAll(blockMeta.getRowGroups()); - lastRowgroupSize = groupMetaDatas.size(); - point = randomAccessFile.getFilePointer(); - } - // read the tsfile position information using byte[8] which is - // present one long number. - randomAccessFile.read(lastPostionBytes); - } catch (FileNotFoundException e) { - LOGGER.error("The restore file does not exist, the restore file path is {}.", bufferwriteRestoreFilePath, - e); - throw e; - } catch (IOException e) { - LOGGER.error("Read data from file error.", e); - throw e; - } finally { - if (randomAccessFile != null) { - randomAccessFile.close(); - } - } - long lastPostion = BytesUtils.bytesToLong(lastPostionBytes); - Pair> result = new Pair>(lastPostion, groupMetaDatas); - return result; - } - - public String getFileName() { - return fileName; - } - - public String getBaseDir() { return baseDir; } - - public String getFileRelativePath() { - return bufferwriterelativePath; - } - - public boolean isNewProcessor() { - return isNewProcessor; - } - - public void setNewProcessor(boolean isNewProcessor) { - this.isNewProcessor = isNewProcessor; - } - /** * write one data point to the bufferwrite * @@ -443,7 +131,6 @@ public class BufferWriteProcessor extends Processor { public boolean write(TSRecord tsRecord) throws BufferWriteProcessorException { long memUage = MemUtils.getRecordSize(tsRecord); BasicMemController.UsageLevel level = BasicMemController.getInstance().reportUse(this, memUage); - for (DataPoint dataPoint : tsRecord.dataPointList) { workMemTable.write(tsRecord.deltaObjectId, dataPoint.getMeasurementId(), dataPoint.getType(), tsRecord.time, dataPoint.getValue().toString()); @@ -490,18 +177,6 @@ public class BufferWriteProcessor extends Processor { } } - @Deprecated - public Pair, List> queryBufferwriteData(String deltaObjectId, String measurementId) { - flushQueryLock.lock(); - try { - List memData = new ArrayList<>(); - List list = new ArrayList<>(); - return new Pair<>(memData, list); - } finally { - flushQueryLock.unlock(); - } - } - public Pair> queryBufferwriteData(String deltaObjectId, String measurementId, TSDataType dataType) { flushQueryLock.lock(); @@ -513,7 +188,7 @@ public class BufferWriteProcessor extends Processor { memSeriesLazyMerger.addMemSeries(workMemTable.query(deltaObjectId, measurementId, dataType)); RawSeriesChunk rawSeriesChunk = new RawSeriesChunkLazyLoadImpl(dataType, memSeriesLazyMerger); return new Pair<>(rawSeriesChunk, - bufferIOWriter.getCurrentTimeSeriesMetadataList(deltaObjectId, measurementId, dataType)); + bufferWriteResource.getInsertMetadatas(deltaObjectId, measurementId, dataType)); } finally { flushQueryLock.unlock(); } @@ -532,12 +207,12 @@ public class BufferWriteProcessor extends Processor { } } - private void switchFlushToWork() { + private void swithFlushToWork() { flushQueryLock.lock(); try { flushMemTable.clear(); flushMemTable = null; - bufferIOWriter.addNewRowGroupMetaDataToBackUp(); + bufferWriteResource.appendMetadata(); } finally { isFlush = false; flushQueryLock.unlock(); @@ -548,20 +223,7 @@ public class BufferWriteProcessor extends Processor { long flushStartTime = System.currentTimeMillis(); LOGGER.info("The bufferwrite processor {} starts flushing {}.", getProcessorName(), flushFunction); try { - long startFlushDataTime = System.currentTimeMillis(); - long startPos = bufferIOWriter.getPos(); - // TODO : FLUSH DATA - MemTableFlushUtil.flushMemTable(fileSchema, bufferIOWriter, flushMemTable); - long flushDataSize = bufferIOWriter.getPos() - startPos; - long timeInterval = System.currentTimeMillis() - startFlushDataTime; - if (timeInterval == 0) { - timeInterval = 1; - } - LOGGER.info("The bufferwrite processor {} flush {}, actual:{}, time consumption:{} ms, flush rate:{}/s", - getProcessorName(), flushFunction, MemUtils.bytesCntToStr(flushDataSize), timeInterval, - MemUtils.bytesCntToStr(flushDataSize / timeInterval * 1000)); - // write restore information - writeStoreToDisk(); + bufferWriteResource.flush(fileSchema, flushMemTable); filenodeFlushAction.act(); if (TsfileDBDescriptor.getInstance().getConfig().enableWal) { logNode.notifyEndFlush(null); @@ -574,13 +236,11 @@ public class BufferWriteProcessor extends Processor { } finally { synchronized (flushStatus) { flushStatus.setUnFlushing(); - switchFlushToWork(); + swithFlushToWork(); flushStatus.notify(); LOGGER.info("The bufferwrite processor {} ends flushing {}.", getProcessorName(), flushFunction); } } - // BasicMemController.getInstance().reportFree(BufferWriteProcessor.this, - // oldMemUsage); long flushEndTime = System.currentTimeMillis(); long flushInterval = flushEndTime - flushStartTime; DateTime startDateTime = new DateTime(flushStartTime, TsfileDBDescriptor.getInstance().getConfig().timeZone); @@ -620,9 +280,7 @@ public class BufferWriteProcessor extends Processor { try { bufferwriteFlushAction.act(); } catch (Exception e) { - LOGGER.error( - "The bufferwrite processor {} failed to flush bufferwrite row group when calling the action function.", - getProcessorName()); + LOGGER.error("Failed to flush bufferwrite row group when calling the action function."); throw new IOException(e); } if (TsfileDBDescriptor.getInstance().getConfig().enableWal) { @@ -661,17 +319,6 @@ public class BufferWriteProcessor extends Processor { @Override public boolean canBeClosed() { - LOGGER.info("Check whether bufferwrite processor {} can be closed.", getProcessorName()); - synchronized (flushStatus) { - while (flushStatus.isFlushing()) { - LOGGER.info("The bufferite processor {} is flushing, waiting for the flush end.", getProcessorName()); - try { - flushStatus.wait(); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } - } return true; } @@ -682,13 +329,12 @@ public class BufferWriteProcessor extends Processor { // flush data flush(true); // end file - bufferIOWriter.endFile(fileSchema); + bufferWriteResource.close(fileSchema); // update the intervalfile for interval list bufferwriteCloseAction.act(); // flush the changed information for filenode filenodeFlushAction.act(); // delete the restore for this bufferwrite processor - deleteRestoreFile(); long closeEndTime = System.currentTimeMillis(); long closeInterval = closeEndTime - closeStartTime; DateTime startDateTime = new DateTime(closeStartTime, @@ -724,7 +370,9 @@ public class BufferWriteProcessor extends Processor { * @throws IOException */ public long getFileSize() { - return bufferwriteOutputFile.length() + memoryUsage(); + // TODO : save this variable to avoid object creation? + File file = new File(insertFilePath); + return file.length() + memoryUsage(); } /** @@ -758,11 +406,29 @@ public class BufferWriteProcessor extends Processor { return false; } + public String getBaseDir() { return baseDir; } + + public String getFileName() { + return fileName; + } + + public String getFileRelativePath() { + return bufferwriterelativePath; + } + + private String getBufferwriteRestoreFilePath() { + return bufferWriteResource.getRestoreFilePath(); + } + + public boolean isNewProcessor() { + return bufferWriteResource.isNewResource(); + } + + public void setNewProcessor(boolean isNewProcessor) { + bufferWriteResource.setNewResource(isNewProcessor); + } + public WriteLogNode getLogNode() { return logNode; } - - public String getBufferwriteRestoreFilePath() { - return bufferwriteRestoreFilePath; - } } diff --git a/src/main/java/cn/edu/tsinghua/iotdb/engine/bufferwriteV2/BufferWriteResource.java b/src/main/java/cn/edu/tsinghua/iotdb/engine/bufferwrite/BufferWriteResource.java similarity index 94% rename from src/main/java/cn/edu/tsinghua/iotdb/engine/bufferwriteV2/BufferWriteResource.java rename to src/main/java/cn/edu/tsinghua/iotdb/engine/bufferwrite/BufferWriteResource.java index 4d6d9107648..8892bed2b4e 100644 --- a/src/main/java/cn/edu/tsinghua/iotdb/engine/bufferwriteV2/BufferWriteResource.java +++ b/src/main/java/cn/edu/tsinghua/iotdb/engine/bufferwrite/BufferWriteResource.java @@ -1,4 +1,4 @@ -package cn.edu.tsinghua.iotdb.engine.bufferwriteV2; +package cn.edu.tsinghua.iotdb.engine.bufferwrite; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; @@ -39,18 +39,20 @@ public class BufferWriteResource { private static final String restoreSuffix = ".restore"; private static final String DEFAULT_MODE = "rw"; private Map>> metadatas; - private List appendRowGroupMetadats; + private List appendRowGroupMetadatas; private BufferIO bufferWriteIO; private String insertFilePath; private String restoreFilePath; private String processorName; + private boolean isNewResource = false; + public BufferWriteResource(String processorName, String insertFilePath) throws IOException { this.insertFilePath = insertFilePath; this.restoreFilePath = insertFilePath + restoreSuffix; this.processorName = processorName; this.metadatas = new HashMap<>(); - this.appendRowGroupMetadats = new ArrayList<>(); + this.appendRowGroupMetadatas = new ArrayList<>(); recover(); } @@ -69,14 +71,17 @@ public class BufferWriteResource { //cutOffFile(position); // recovery the BufferWriteIO bufferWriteIO = new BufferIO(new TsRandomAccessFileWriter(insertFile), position, metadatas); + recoverMetadata(metadatas); LOGGER.info( "Recover the bufferwrite processor {}, the tsfile path is {}, the position of last flush is {}, the size of rowGroupMetadata is {}", processorName, insertFilePath, position, metadatas.size()); + isNewResource = false; } else { insertFile.delete(); restoreFile.delete(); bufferWriteIO = new BufferIO(new TsRandomAccessFileWriter(insertFile), 0, new ArrayList<>()); + isNewResource = true; writeRestoreInfo(); } } @@ -229,6 +234,14 @@ public class BufferWriteResource { return restoreFilePath; } + public boolean isNewResource() { + return isNewResource; + } + + public void setNewResource(boolean isNewResource) { + this.isNewResource = isNewResource; + } + public void flush(FileSchema fileSchema, IMemTable iMemTable) throws IOException { if (iMemTable != null && !iMemTable.isEmpty()) { long startPos = bufferWriteIO.getPos(); @@ -244,19 +257,19 @@ public class BufferWriteResource { "Bufferwrite processor {} flushes insert data, actual:{}, time consumption:{} ms, flush rate:{}/s", processorName, MemUtils.bytesCntToStr(insertSize), timeInterval, MemUtils.bytesCntToStr(insertSize / timeInterval * 1000)); - appendRowGroupMetadats.addAll(bufferWriteIO.getAppendedRowGroupMetadata()); + appendRowGroupMetadatas.addAll(bufferWriteIO.getAppendedRowGroupMetadata()); } } public void appendMetadata() { - if (!appendRowGroupMetadats.isEmpty()) { - for (RowGroupMetaData rowGroupMetaData : appendRowGroupMetadats) { + if (!appendRowGroupMetadatas.isEmpty()) { + for (RowGroupMetaData rowGroupMetaData : appendRowGroupMetadatas) { for (TimeSeriesChunkMetaData chunkMetaData : rowGroupMetaData.getTimeSeriesChunkMetaDataList()) { addInsertMetadata(rowGroupMetaData.getDeltaObjectID(), chunkMetaData.getProperties().getMeasurementUID(), chunkMetaData); } } - appendRowGroupMetadats.clear(); + appendRowGroupMetadatas.clear(); } } diff --git a/src/main/java/cn/edu/tsinghua/iotdb/engine/bufferwrite/MemoryBufferWriteIndexImpl.java b/src/main/java/cn/edu/tsinghua/iotdb/engine/bufferwrite/MemoryBufferWriteIndexImpl.java deleted file mode 100644 index cfc772ab2df..00000000000 --- a/src/main/java/cn/edu/tsinghua/iotdb/engine/bufferwrite/MemoryBufferWriteIndexImpl.java +++ /dev/null @@ -1,78 +0,0 @@ -package cn.edu.tsinghua.iotdb.engine.bufferwrite; - -import java.util.HashMap; -import java.util.Map; - -import cn.edu.tsinghua.tsfile.common.utils.Binary; -import cn.edu.tsinghua.tsfile.timeseries.read.query.DynamicOneColumnData; -import cn.edu.tsinghua.tsfile.timeseries.write.record.DataPoint; -import cn.edu.tsinghua.tsfile.timeseries.write.record.TSRecord; - -/** - * Implement the {@code BufferWriteIndex}
- * This class is used to store bufferwrite data in memory, also index data - * easily
- * - * @author kangrong - * @author liukun - * - */ -@Deprecated -public class MemoryBufferWriteIndexImpl implements BufferWriteIndex { - private Map indexMap; - - public MemoryBufferWriteIndexImpl() { - indexMap = new HashMap(); - } - - @Override - public DynamicOneColumnData query(String deltaObjectId, String measurementId) { - return indexMap.get(concatPath(deltaObjectId, measurementId)); - } - - @Override - public void insert(TSRecord tsRecord) { - String deltaObjectId = tsRecord.deltaObjectId; - for (DataPoint dp : tsRecord.dataPointList) { - String measurementId = dp.getMeasurementId(); - String path = concatPath(deltaObjectId, measurementId); - if (!indexMap.containsKey(path)) - indexMap.put(path, new DynamicOneColumnData(dp.getType(), true)); - DynamicOneColumnData deltaMea = indexMap.get(path); - Object value = dp.getValue(); - deltaMea.putTime(tsRecord.time); - switch (dp.getType()) { - case INT32: - deltaMea.putInt((Integer) value); - break; - case INT64: - deltaMea.putLong((Long) value); - break; - case FLOAT: - deltaMea.putFloat((Float) value); - break; - case DOUBLE: - deltaMea.putDouble((Double) value); - break; - case TEXT: - deltaMea.putBinary((Binary) value); - break; - case BOOLEAN: - deltaMea.putBoolean((boolean) value); - break; - default: - throw new UnsupportedOperationException("no support type:" + dp.getType() + "record:" + tsRecord); - } - } - } - - @Override - public void clear() { - indexMap.clear(); - } - - private String concatPath(String deltaObjectId, String measurementId) { - return deltaObjectId + FileNodeConstants.PATH_SEPARATOR + measurementId; - } - -} diff --git a/src/main/java/cn/edu/tsinghua/iotdb/engine/bufferwriteV2/BufferWriteProcessor.java b/src/main/java/cn/edu/tsinghua/iotdb/engine/bufferwriteV2/BufferWriteProcessor.java deleted file mode 100644 index 87f0e62e514..00000000000 --- a/src/main/java/cn/edu/tsinghua/iotdb/engine/bufferwriteV2/BufferWriteProcessor.java +++ /dev/null @@ -1,411 +0,0 @@ -package cn.edu.tsinghua.iotdb.engine.bufferwriteV2; - -import java.io.File; -import java.io.IOException; -import java.util.List; -import java.util.Map; -import java.util.concurrent.Future; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.locks.ReentrantLock; - -import cn.edu.tsinghua.iotdb.conf.TsFileDBConstant; -import cn.edu.tsinghua.iotdb.engine.filenode.FileNodeManager; -import cn.edu.tsinghua.iotdb.writelog.manager.MultiFileLogNodeManager; -import cn.edu.tsinghua.iotdb.writelog.node.WriteLogNode; -import org.joda.time.DateTime; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import cn.edu.tsinghua.iotdb.conf.TsfileDBConfig; -import cn.edu.tsinghua.iotdb.conf.TsfileDBDescriptor; -import cn.edu.tsinghua.iotdb.engine.Processor; -import cn.edu.tsinghua.iotdb.engine.bufferwrite.Action; -import cn.edu.tsinghua.iotdb.engine.bufferwrite.FileNodeConstants; -import cn.edu.tsinghua.iotdb.engine.memcontrol.BasicMemController; -import cn.edu.tsinghua.iotdb.engine.memtable.IMemTable; -import cn.edu.tsinghua.iotdb.engine.memtable.MemSeriesLazyMerger; -import cn.edu.tsinghua.iotdb.engine.memtable.PrimitiveMemTable; -import cn.edu.tsinghua.iotdb.engine.pool.FlushManager; -import cn.edu.tsinghua.iotdb.engine.querycontext.RawSeriesChunk; -import cn.edu.tsinghua.iotdb.engine.querycontext.RawSeriesChunkLazyLoadImpl; -import cn.edu.tsinghua.iotdb.engine.utils.FlushStatus; -import cn.edu.tsinghua.iotdb.exception.BufferWriteProcessorException; -import cn.edu.tsinghua.iotdb.utils.MemUtils; -import cn.edu.tsinghua.tsfile.common.conf.TSFileDescriptor; -import cn.edu.tsinghua.tsfile.common.exception.ProcessorException; -import cn.edu.tsinghua.tsfile.common.utils.Pair; -import cn.edu.tsinghua.tsfile.file.metadata.TimeSeriesChunkMetaData; -import cn.edu.tsinghua.tsfile.file.metadata.enums.TSDataType; -import cn.edu.tsinghua.tsfile.timeseries.write.record.DataPoint; -import cn.edu.tsinghua.tsfile.timeseries.write.record.TSRecord; -import cn.edu.tsinghua.tsfile.timeseries.write.schema.FileSchema; - -public class BufferWriteProcessor extends Processor { - private static final Logger LOGGER = LoggerFactory.getLogger(BufferWriteProcessor.class); - - private FileSchema fileSchema; - private BufferWriteResource bufferWriteResource; - - private volatile FlushStatus flushStatus = new FlushStatus(); - private volatile boolean isFlush; - private ReentrantLock flushQueryLock = new ReentrantLock(); - private AtomicLong memSize = new AtomicLong(); - private long memThreshold = TSFileDescriptor.getInstance().getConfig().groupSizeInByte; - - private IMemTable workMemTable; - private IMemTable flushMemTable; - - private Action bufferwriteFlushAction = null; - private Action bufferwriteCloseAction = null; - private Action filenodeFlushAction = null; - - private long lastFlushTime = -1; - private long valueCount = 0; - - private String baseDir; - private String fileName; - private String insertFilePath; - - private WriteLogNode logNode; - - public BufferWriteProcessor(String baseDir, String processorName, String fileName, Map parameters, - FileSchema fileSchema) throws BufferWriteProcessorException { - super(processorName); - this.fileSchema = fileSchema; - this.baseDir = baseDir; - this.fileName = fileName; - if (baseDir.length() > 0 - && baseDir.charAt(baseDir.length() - 1) != File.separatorChar) { - baseDir = baseDir + File.separatorChar; - } - String dataDirPath = baseDir + processorName; - File dataDir = new File(dataDirPath); - if (!dataDir.exists()) { - dataDir.mkdirs(); - LOGGER.debug("The bufferwrite processor data dir doesn't exists, create new directory {}.", dataDirPath); - } - this.insertFilePath = new File(dataDir, fileName).getPath(); - try { - bufferWriteResource = new BufferWriteResource(processorName, insertFilePath); - } catch (IOException e) { - throw new BufferWriteProcessorException(e); - } - bufferwriteFlushAction = (Action) parameters.get(FileNodeConstants.BUFFERWRITE_FLUSH_ACTION); - bufferwriteCloseAction = (Action) parameters.get(FileNodeConstants.BUFFERWRITE_CLOSE_ACTION); - filenodeFlushAction = (Action) parameters.get(FileNodeConstants.FILENODE_PROCESSOR_FLUSH_ACTION); - workMemTable = new PrimitiveMemTable(); - - if(TsfileDBDescriptor.getInstance().getConfig().enableWal) { - try { - logNode = MultiFileLogNodeManager.getInstance().getNode( - processorName + TsFileDBConstant.BUFFERWRITE_LOG_NODE_SUFFIX, getBufferwriteRestoreFilePath(), - FileNodeManager.getInstance().getRestoreFilePath(processorName)); - } catch (IOException e) { - throw new BufferWriteProcessorException(e); - } - } - } - - /** - * write one data point to the bufferwrite - * - * @param deltaObjectId - * @param measurementId - * @param timestamp - * @param dataType - * @param value - * @return true -the size of tsfile or metadata reaches to the threshold. - * false -otherwise - * @throws BufferWriteProcessorException - */ - public boolean write(String deltaObjectId, String measurementId, long timestamp, TSDataType dataType, String value) - throws BufferWriteProcessorException { - TSRecord record = new TSRecord(timestamp, deltaObjectId); - DataPoint dataPoint = DataPoint.getDataPoint(dataType, measurementId, value); - record.addTuple(dataPoint); - return write(record); - } - - public boolean write(TSRecord tsRecord) throws BufferWriteProcessorException { - long memUage = MemUtils.getRecordSize(tsRecord); - BasicMemController.UsageLevel level = BasicMemController.getInstance().reportUse(this, memUage); - for (DataPoint dataPoint : tsRecord.dataPointList) { - workMemTable.write(tsRecord.deltaObjectId, dataPoint.getMeasurementId(), dataPoint.getType(), tsRecord.time, - dataPoint.getValue().toString()); - } - valueCount++; - switch (level) { - case SAFE: - // memUsed += newMemUsage; - // memtable - memUage = memSize.addAndGet(memUage); - if (memUage > memThreshold) { - LOGGER.info("The usage of memory {} in bufferwrite processor {} reaches the threshold {}", - MemUtils.bytesCntToStr(memUage), getProcessorName(), MemUtils.bytesCntToStr(memThreshold)); - try { - flush(); - } catch (IOException e) { - e.printStackTrace(); - throw new BufferWriteProcessorException(e); - } - } - return false; - case WARNING: - LOGGER.warn("Memory usage will exceed warning threshold, current : {}.", - MemUtils.bytesCntToStr(BasicMemController.getInstance().getTotalUsage())); - // memUsed += newMemUsage; - // memtable - memUage = memSize.addAndGet(memUage); - if (memUage > memThreshold) { - LOGGER.info("The usage of memory {} in bufferwrite processor {} reaches the threshold {}", - MemUtils.bytesCntToStr(memUage), getProcessorName(), MemUtils.bytesCntToStr(memThreshold)); - try { - flush(); - } catch (IOException e) { - e.printStackTrace(); - throw new BufferWriteProcessorException(e); - } - } - return false; - case DANGEROUS: - default: - LOGGER.warn("Memory usage will exceed dangerous threshold, current : {}.", - MemUtils.bytesCntToStr(BasicMemController.getInstance().getTotalUsage())); - return false; - } - } - - public Pair> queryBufferwriteData(String deltaObjectId, - String measurementId, TSDataType dataType) { - flushQueryLock.lock(); - try { - MemSeriesLazyMerger memSeriesLazyMerger = new MemSeriesLazyMerger(); - if (isFlush) { - memSeriesLazyMerger.addMemSeries(flushMemTable.query(deltaObjectId, measurementId, dataType)); - } - memSeriesLazyMerger.addMemSeries(workMemTable.query(deltaObjectId, measurementId, dataType)); - RawSeriesChunk rawSeriesChunk = new RawSeriesChunkLazyLoadImpl(dataType, memSeriesLazyMerger); - return new Pair<>(rawSeriesChunk, - bufferWriteResource.getInsertMetadatas(deltaObjectId, measurementId, dataType)); - } finally { - flushQueryLock.unlock(); - } - } - - private void switchWorkToFlush() { - flushQueryLock.lock(); - try { - if (flushMemTable == null) { - flushMemTable = workMemTable; - workMemTable = new PrimitiveMemTable(); - } - } finally { - isFlush = true; - flushQueryLock.unlock(); - } - } - - private void swithFlushToWork() { - flushQueryLock.lock(); - try { - flushMemTable.clear(); - flushMemTable = null; - bufferWriteResource.appendMetadata(); - } finally { - isFlush = false; - flushQueryLock.unlock(); - } - } - - private void flushOperation(String flushFunction) { - long flushStartTime = System.currentTimeMillis(); - LOGGER.info("The bufferwrite processor {} starts flushing {}.", getProcessorName(), flushFunction); - try { - bufferWriteResource.flush(fileSchema, flushMemTable); - filenodeFlushAction.act(); - if (TsfileDBDescriptor.getInstance().getConfig().enableWal) { - logNode.notifyEndFlush(null); - } - } catch (IOException e) { - LOGGER.error("The bufferwrite processor {} failed to flush {}.", getProcessorName(), flushFunction, e); - } catch (Exception e) { - LOGGER.error("The bufferwrite processor {} failed to flush {}, when calling the filenodeFlushAction.", - getProcessorName(), flushFunction, e); - } finally { - synchronized (flushStatus) { - flushStatus.setUnFlushing(); - swithFlushToWork(); - flushStatus.notify(); - LOGGER.info("The bufferwrite processor {} ends flushing {}.", getProcessorName(), flushFunction); - } - } - long flushEndTime = System.currentTimeMillis(); - long flushInterval = flushEndTime - flushStartTime; - DateTime startDateTime = new DateTime(flushStartTime, TsfileDBDescriptor.getInstance().getConfig().timeZone); - DateTime endDateTime = new DateTime(flushEndTime, TsfileDBDescriptor.getInstance().getConfig().timeZone); - LOGGER.info( - "The bufferwrite processor {} flush {}, start time is {}, flush end time is {}, flush time consumption is {}ms", - getProcessorName(), flushFunction, startDateTime, endDateTime, flushInterval); - } - - private Future flush(boolean synchronization) throws IOException { - // statistic information for flush - if (lastFlushTime > 0) { - long thisFlushTime = System.currentTimeMillis(); - long flushTimeInterval = thisFlushTime - lastFlushTime; - DateTime lastDateTime = new DateTime(lastFlushTime, TsfileDBDescriptor.getInstance().getConfig().timeZone); - DateTime thisDateTime = new DateTime(thisFlushTime, TsfileDBDescriptor.getInstance().getConfig().timeZone); - LOGGER.info( - "The bufferwrite processor {}: last flush time is {}, this flush time is {}, flush time interval is {}s", - getProcessorName(), lastDateTime, thisDateTime, flushTimeInterval / 1000); - } - lastFlushTime = System.currentTimeMillis(); - // check value count - if (valueCount > 0) { - // waiting for the end of last flush operation. - synchronized (flushStatus) { - while (flushStatus.isFlushing()) { - try { - flushStatus.wait(); - } catch (InterruptedException e) { - LOGGER.error( - "Encounter an interrupt error when waitting for the flushing, the bufferwrite processor is {}.", - getProcessorName(), e); - } - } - } - // update the lastUpdatetime, prepare for flush - try { - bufferwriteFlushAction.act(); - } catch (Exception e) { - LOGGER.error("Failed to flush bufferwrite row group when calling the action function."); - throw new IOException(e); - } - if (TsfileDBDescriptor.getInstance().getConfig().enableWal) { - logNode.notifyStartFlush(); - } - valueCount = 0; - flushStatus.setFlushing(); - switchWorkToFlush(); - BasicMemController.getInstance().reportFree(this, memSize.get()); - memSize.set(0); - // switch - if (synchronization) { - flushOperation("synchronously"); - } else { - FlushManager.getInstance().submit(new Runnable() { - public void run() { - flushOperation("asynchronously"); - } - }); - } - } - return null; - } - - public boolean isFlush() { - synchronized (flushStatus) { - return flushStatus.isFlushing(); - } - } - - @Override - public boolean flush() throws IOException { - flush(false); - return false; - } - - @Override - public boolean canBeClosed() { - return true; - } - - @Override - public void close() throws ProcessorException { - try { - long closeStartTime = System.currentTimeMillis(); - // flush data - flush(true); - // end file - bufferWriteResource.close(fileSchema); - // update the intervalfile for interval list - bufferwriteCloseAction.act(); - // flush the changed information for filenode - filenodeFlushAction.act(); - // delete the restore for this bufferwrite processor - long closeEndTime = System.currentTimeMillis(); - long closeInterval = closeEndTime - closeStartTime; - DateTime startDateTime = new DateTime(closeStartTime, - TsfileDBDescriptor.getInstance().getConfig().timeZone); - DateTime endDateTime = new DateTime(closeEndTime, TsfileDBDescriptor.getInstance().getConfig().timeZone); - LOGGER.info( - "Close bufferwrite processor {}, the file name is {}, start time is {}, end time is {}, time consumption is {}ms", - getProcessorName(), fileName, startDateTime, endDateTime, closeInterval); - } catch (IOException e) { - LOGGER.error("Close the bufferwrite processor error, the bufferwrite is {}.", getProcessorName(), e); - throw new BufferWriteProcessorException(e); - } catch (Exception e) { - LOGGER.error("Failed to close the bufferwrite processor when calling the action function.", e); - throw new BufferWriteProcessorException(e); - } - } - - @Override - public long memoryUsage() { - return memSize.get(); - } - - /** - * @return The sum of all timeseries's metadata size within this file. - */ - public long getMetaSize() { - // TODO : [MemControl] implement this - return 0; - } - - /** - * @return The file size of the TsFile corresponding to this processor. - * @throws IOException - */ - public long getFileSize() { - // TODO : save this variable to avoid object creation? - File file = new File(insertFilePath); - return file.length() + memoryUsage(); - } - - /** - * Close current TsFile and open a new one for future writes. Block new - * writes and wait until current writes finish. - */ - public void rollToNewFile() { - // TODO : [MemControl] implement this - } - - /** - * Check if this TsFile has too big metadata or file. If true, close current - * file and open a new one. - * - * @throws IOException - */ - private boolean checkSize() throws IOException { - TsfileDBConfig config = TsfileDBDescriptor.getInstance().getConfig(); - long metaSize = getMetaSize(); - long fileSize = getFileSize(); - if (metaSize >= config.bufferwriteMetaSizeThreshold || fileSize >= config.bufferwriteFileSizeThreshold) { - LOGGER.info( - "The bufferwrite processor {}, size({}) of the file {} reaches threshold {}, size({}) of metadata reaches threshold {}.", - getProcessorName(), MemUtils.bytesCntToStr(fileSize), this.fileName, - MemUtils.bytesCntToStr(config.bufferwriteFileSizeThreshold), MemUtils.bytesCntToStr(metaSize), - MemUtils.bytesCntToStr(config.bufferwriteFileSizeThreshold)); - - rollToNewFile(); - return true; - } - return false; - } - - private String getBufferwriteRestoreFilePath() { - return bufferWriteResource.getRestoreFilePath(); - } - -} diff --git a/src/main/java/cn/edu/tsinghua/iotdb/engine/filenode/FileNodeProcessor.java b/src/main/java/cn/edu/tsinghua/iotdb/engine/filenode/FileNodeProcessor.java index f68f4419acd..84e493057f4 100644 --- a/src/main/java/cn/edu/tsinghua/iotdb/engine/filenode/FileNodeProcessor.java +++ b/src/main/java/cn/edu/tsinghua/iotdb/engine/filenode/FileNodeProcessor.java @@ -1609,6 +1609,7 @@ public class FileNodeProcessor extends Processor implements IStatistic { if (!endTimeMap.containsKey(deltaObjectId) || endTimeMap.get(deltaObjectId) < endTime) { endTimeMap.put(deltaObjectId, endTime); } + break; default: LOGGER.error("Not support data type: {}", dataType); break; diff --git a/src/main/java/cn/edu/tsinghua/iotdb/exception/builder/ExceptionBuilder.java b/src/main/java/cn/edu/tsinghua/iotdb/exception/builder/ExceptionBuilder.java new file mode 100644 index 00000000000..3c0da2a7309 --- /dev/null +++ b/src/main/java/cn/edu/tsinghua/iotdb/exception/builder/ExceptionBuilder.java @@ -0,0 +1,82 @@ +package cn.edu.tsinghua.iotdb.exception.builder; + +import cn.edu.tsinghua.iotdb.conf.TsFileDBConstant; +import cn.edu.tsinghua.iotdb.conf.TsfileDBConfig; +import cn.edu.tsinghua.iotdb.conf.TsfileDBDescriptor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.*; +import java.util.Iterator; +import java.util.Properties; + +public class ExceptionBuilder { + private Properties properties = new Properties(); + + public static final int UNKNOWN_ERROR = 20000; + public static final int NO_PARAMETERS_EXISTS=20001; + public static final int INVALID_PARAMETER_NO=20002; + public static final int CONN_HOST_ERROR=20003; + public static final int AUTH_PLUGIN_ERR=20061; + public static final int INSECURE_API_ERR=20062; + public static final int OUT_OF_MEMORY=20064; + public static final int NO_PREPARE_STMT=20130; + public static final int CON_FAIL_ERR=20220; + + + private static final Logger LOGGER = LoggerFactory.getLogger(TsfileDBDescriptor.class); + public static final String CONFIG_NAME= "error_info_"; + public static final String FILE_SUFFIX=".properties"; + public static final String DEFAULT_FILEPATH="error_info_en.properties"; + + private static final ExceptionBuilder INSTANCE = new ExceptionBuilder(); + public static final ExceptionBuilder getInstance() { + return ExceptionBuilder.INSTANCE; + } + + public void loadInfo(String filePath){ + InputStream in = null; + try { + in = new BufferedInputStream (new FileInputStream(filePath)); + properties.load(new InputStreamReader(in,"utf-8")); + in.close(); + } catch (IOException e) { + LOGGER.error("Read file error. File does not exist or file is broken. File path: {}.Because: {}.",filePath,e.getMessage()); + } finally { + if (in != null) { + try { + in.close(); + } catch (IOException e) { + LOGGER.error("Fail to close file: {}. Because: {}.",filePath,e.getMessage()); + } + } + } + } + + public void loadInfo(){ + String language = TsfileDBDescriptor.getInstance().getConfig().languageVersion.toLowerCase(); + + String url = System.getProperty(TsFileDBConstant.IOTDB_CONF, null); + if (url == null) { + url = System.getProperty(TsFileDBConstant.IOTDB_HOME, null); + if (url != null) { + url = url + File.separatorChar + "conf" + File.separatorChar + ExceptionBuilder.CONFIG_NAME+language+FILE_SUFFIX; + } else { + LOGGER.warn("Cannot find IOTDB_HOME or IOTDB_CONF environment variable when loading config file {}, use default configuration", TsfileDBConfig.CONFIG_NAME); + return; + } + } else{ + url += (File.separatorChar + ExceptionBuilder.CONFIG_NAME+language+FILE_SUFFIX); + } + + File file = new File(url); + if(!file.exists()){ + url.replace(CONFIG_NAME+language+FILE_SUFFIX, DEFAULT_FILEPATH); + } + + loadInfo(url); + } + public String searchInfo(int errCode){ + return properties.getProperty(String.valueOf(errCode)); + } +} diff --git a/src/main/java/cn/edu/tsinghua/iotdb/exception/codebased/AuthPluginException.java b/src/main/java/cn/edu/tsinghua/iotdb/exception/codebased/AuthPluginException.java new file mode 100644 index 00000000000..907d0a62087 --- /dev/null +++ b/src/main/java/cn/edu/tsinghua/iotdb/exception/codebased/AuthPluginException.java @@ -0,0 +1,14 @@ +package cn.edu.tsinghua.iotdb.exception.codebased; + +import cn.edu.tsinghua.iotdb.exception.builder.ExceptionBuilder; + +public class AuthPluginException extends IoTDBException { + public AuthPluginException() { + super(ExceptionBuilder.AUTH_PLUGIN_ERR); + } + public AuthPluginException(String userName, String additionalInfo) { + super(ExceptionBuilder.AUTH_PLUGIN_ERR, additionalInfo); + defaultInfo = String.format(defaultInfo, userName); + } +} + diff --git a/src/main/java/cn/edu/tsinghua/iotdb/exception/codebased/ConnectionFailedException.java b/src/main/java/cn/edu/tsinghua/iotdb/exception/codebased/ConnectionFailedException.java new file mode 100644 index 00000000000..6552ac94e04 --- /dev/null +++ b/src/main/java/cn/edu/tsinghua/iotdb/exception/codebased/ConnectionFailedException.java @@ -0,0 +1,12 @@ +package cn.edu.tsinghua.iotdb.exception.codebased; + +import cn.edu.tsinghua.iotdb.exception.builder.ExceptionBuilder; + +public class ConnectionFailedException extends IoTDBException { + public ConnectionFailedException() { + super(ExceptionBuilder.CON_FAIL_ERR); + } + public ConnectionFailedException(String additionalInfo) { + super(ExceptionBuilder.CON_FAIL_ERR, additionalInfo); + } +} diff --git a/src/main/java/cn/edu/tsinghua/iotdb/exception/codebased/ConnectionHostException.java b/src/main/java/cn/edu/tsinghua/iotdb/exception/codebased/ConnectionHostException.java new file mode 100644 index 00000000000..dcdb9ea2d89 --- /dev/null +++ b/src/main/java/cn/edu/tsinghua/iotdb/exception/codebased/ConnectionHostException.java @@ -0,0 +1,13 @@ +package cn.edu.tsinghua.iotdb.exception.codebased; + +import cn.edu.tsinghua.iotdb.exception.builder.ExceptionBuilder; + +public class ConnectionHostException extends IoTDBException { + public ConnectionHostException() { + super(ExceptionBuilder.CONN_HOST_ERROR); + } + public ConnectionHostException(String ip, String port, String additionalInfo) { + super(ExceptionBuilder.CONN_HOST_ERROR, additionalInfo); + defaultInfo=String.format(defaultInfo, ip, port); + } +} diff --git a/src/main/java/cn/edu/tsinghua/iotdb/exception/codebased/InsecureAPIException.java b/src/main/java/cn/edu/tsinghua/iotdb/exception/codebased/InsecureAPIException.java new file mode 100644 index 00000000000..0d618525736 --- /dev/null +++ b/src/main/java/cn/edu/tsinghua/iotdb/exception/codebased/InsecureAPIException.java @@ -0,0 +1,13 @@ +package cn.edu.tsinghua.iotdb.exception.codebased; + +import cn.edu.tsinghua.iotdb.exception.builder.ExceptionBuilder; + +public class InsecureAPIException extends IoTDBException{ + public InsecureAPIException() { + super(ExceptionBuilder.INSECURE_API_ERR); + } + public InsecureAPIException(String functionName, String additionalInfo) { + super(ExceptionBuilder.INSECURE_API_ERR, additionalInfo); + defaultInfo=String.format(defaultInfo, functionName); + } +} diff --git a/src/main/java/cn/edu/tsinghua/iotdb/exception/codebased/InvalidParameterException.java b/src/main/java/cn/edu/tsinghua/iotdb/exception/codebased/InvalidParameterException.java new file mode 100644 index 00000000000..25b5c945b9c --- /dev/null +++ b/src/main/java/cn/edu/tsinghua/iotdb/exception/codebased/InvalidParameterException.java @@ -0,0 +1,12 @@ +package cn.edu.tsinghua.iotdb.exception.codebased; + +import cn.edu.tsinghua.iotdb.exception.builder.ExceptionBuilder; + +public class InvalidParameterException extends IoTDBException{ + public InvalidParameterException() { + super(ExceptionBuilder.INVALID_PARAMETER_NO); + } + public InvalidParameterException(String additionalInfo) { + super(ExceptionBuilder.INVALID_PARAMETER_NO, additionalInfo); + } +} diff --git a/src/main/java/cn/edu/tsinghua/iotdb/exception/codebased/IoTDBException.java b/src/main/java/cn/edu/tsinghua/iotdb/exception/codebased/IoTDBException.java new file mode 100644 index 00000000000..5148a8565dc --- /dev/null +++ b/src/main/java/cn/edu/tsinghua/iotdb/exception/codebased/IoTDBException.java @@ -0,0 +1,28 @@ +package cn.edu.tsinghua.iotdb.exception.codebased; + +import cn.edu.tsinghua.iotdb.exception.builder.ExceptionBuilder; + +public abstract class IoTDBException extends Exception{ + private static final long serialVersionUID = -8998294067060075273L; + protected int errorCode; + protected String defaultInfo; + protected String additionalInfo; + +public IoTDBException(int errorCode){ + this.defaultInfo=ExceptionBuilder.getInstance().searchInfo(errorCode); + this.errorCode=errorCode; + +} + public IoTDBException(int errCode, String additionalInfo){ + this.errorCode=errCode; + this.additionalInfo=additionalInfo; + } + @Override + public String getMessage(){ + if(additionalInfo==null){ + return defaultInfo; + }else { + return defaultInfo + ". " + additionalInfo; + } + } +} diff --git a/src/main/java/cn/edu/tsinghua/iotdb/exception/codebased/NoParameterException.java b/src/main/java/cn/edu/tsinghua/iotdb/exception/codebased/NoParameterException.java new file mode 100644 index 00000000000..bf84580ebcf --- /dev/null +++ b/src/main/java/cn/edu/tsinghua/iotdb/exception/codebased/NoParameterException.java @@ -0,0 +1,12 @@ +package cn.edu.tsinghua.iotdb.exception.codebased; + +import cn.edu.tsinghua.iotdb.exception.builder.ExceptionBuilder; + +public class NoParameterException extends IoTDBException{ + public NoParameterException() { + super(ExceptionBuilder.NO_PARAMETERS_EXISTS); + } + public NoParameterException(String additionalInfo) { + super(ExceptionBuilder.NO_PARAMETERS_EXISTS, additionalInfo); + } +} diff --git a/src/main/java/cn/edu/tsinghua/iotdb/exception/codebased/NoPreparedStatementException.java b/src/main/java/cn/edu/tsinghua/iotdb/exception/codebased/NoPreparedStatementException.java new file mode 100644 index 00000000000..f496be503ac --- /dev/null +++ b/src/main/java/cn/edu/tsinghua/iotdb/exception/codebased/NoPreparedStatementException.java @@ -0,0 +1,12 @@ +package cn.edu.tsinghua.iotdb.exception.codebased; + +import cn.edu.tsinghua.iotdb.exception.builder.ExceptionBuilder; + +public class NoPreparedStatementException extends IoTDBException { + public NoPreparedStatementException() { + super(ExceptionBuilder.NO_PREPARE_STMT); + } + public NoPreparedStatementException(String additionalInfo) { + super(ExceptionBuilder.NO_PREPARE_STMT, additionalInfo); + } +} diff --git a/src/main/java/cn/edu/tsinghua/iotdb/exception/codebased/OutOfMemoryException.java b/src/main/java/cn/edu/tsinghua/iotdb/exception/codebased/OutOfMemoryException.java new file mode 100644 index 00000000000..819c0554915 --- /dev/null +++ b/src/main/java/cn/edu/tsinghua/iotdb/exception/codebased/OutOfMemoryException.java @@ -0,0 +1,12 @@ +package cn.edu.tsinghua.iotdb.exception.codebased; + +import cn.edu.tsinghua.iotdb.exception.builder.ExceptionBuilder; + +public class OutOfMemoryException extends IoTDBException{ + public OutOfMemoryException() { + super(ExceptionBuilder.OUT_OF_MEMORY); + } + public OutOfMemoryException(String additionalInfo) { + super(ExceptionBuilder.OUT_OF_MEMORY, additionalInfo); + } +} diff --git a/src/main/java/cn/edu/tsinghua/iotdb/exception/codebased/UnknownException.java b/src/main/java/cn/edu/tsinghua/iotdb/exception/codebased/UnknownException.java new file mode 100644 index 00000000000..0b83fe715b8 --- /dev/null +++ b/src/main/java/cn/edu/tsinghua/iotdb/exception/codebased/UnknownException.java @@ -0,0 +1,12 @@ +package cn.edu.tsinghua.iotdb.exception.codebased; + +import cn.edu.tsinghua.iotdb.exception.builder.ExceptionBuilder; + +public class UnknownException extends IoTDBException { + public UnknownException() { + super(ExceptionBuilder.UNKNOWN_ERROR); + } + public UnknownException(String additionalInfo) { + super(ExceptionBuilder.UNKNOWN_ERROR, additionalInfo); + } +} diff --git a/src/main/java/cn/edu/tsinghua/iotdb/monitor/StatMonitor.java b/src/main/java/cn/edu/tsinghua/iotdb/monitor/StatMonitor.java index 25d2194ce3e..7eaf9befba1 100644 --- a/src/main/java/cn/edu/tsinghua/iotdb/monitor/StatMonitor.java +++ b/src/main/java/cn/edu/tsinghua/iotdb/monitor/StatMonitor.java @@ -11,7 +11,7 @@ import cn.edu.tsinghua.iotdb.exception.PathErrorException; import cn.edu.tsinghua.iotdb.exception.StartupException; import cn.edu.tsinghua.iotdb.metadata.MManager; import cn.edu.tsinghua.iotdb.query.engine.OverflowQueryEngine; -import cn.edu.tsinghua.iotdb.query.management.ReadLockManager; +import cn.edu.tsinghua.iotdb.query.management.ReadCacheManager; import cn.edu.tsinghua.iotdb.service.IService; import cn.edu.tsinghua.iotdb.service.ServiceType; import cn.edu.tsinghua.tsfile.common.constant.StatisticConstant; @@ -139,9 +139,8 @@ public class StatMonitor implements IService{ try { OnePassQueryDataSet queryDataSet; queryDataSet = overflowQueryEngine.aggregate(pairList, null); - ReadLockManager.getInstance().unlockForOneRequest(); + ReadCacheManager.getInstance().unlockForOneRequest(); OldRowRecord rowRecord = queryDataSet.getNextRecord(); - if (rowRecord!=null) { FileNodeManager fManager = FileNodeManager.getInstance(); HashMap statParamsHashMap = fManager.getStatParamsHashMap(); diff --git a/src/main/java/cn/edu/tsinghua/iotdb/performance/CreatorUtils.java b/src/main/java/cn/edu/tsinghua/iotdb/performance/CreatorUtils.java index ab4fb54aa64..8add1309065 100644 --- a/src/main/java/cn/edu/tsinghua/iotdb/performance/CreatorUtils.java +++ b/src/main/java/cn/edu/tsinghua/iotdb/performance/CreatorUtils.java @@ -13,7 +13,7 @@ public class CreatorUtils { private static final Logger LOGGER = LoggerFactory.getLogger(CreatorUtils.class); // unsequence overflow file - static final String unseqTsFilePathName = "unseqTsFilec"; + static final String unseqTsFilePathName = "unSeqTsFile"; // restore file of this storage group static final String restoreFilePathName = ".restore"; diff --git a/src/main/java/cn/edu/tsinghua/iotdb/performance/ReadAnalysis.java b/src/main/java/cn/edu/tsinghua/iotdb/performance/ReadAnalysis.java index 509218d5a13..ae9b6917d09 100644 --- a/src/main/java/cn/edu/tsinghua/iotdb/performance/ReadAnalysis.java +++ b/src/main/java/cn/edu/tsinghua/iotdb/performance/ReadAnalysis.java @@ -32,21 +32,20 @@ import static cn.edu.tsinghua.iotdb.performance.CreatorUtils.*; import static cn.edu.tsinghua.iotdb.performance.ReaderCreator.getTsFileMetadata; import static cn.edu.tsinghua.iotdb.performance.ReaderCreator.getUnSeqFileMetaData; -/** - * Created by zhangjinrui on 2018/3/13. - */ public class ReadAnalysis { private static final Logger LOGGER = LoggerFactory.getLogger(ReadAnalysis.class); - private static String mergeOutPutFolder; private static String fileFolderName; private static String unSeqFilePath; - + private static String mergeOutPutFolder; private static Map>> unSeqFileMetaData; private static Map> unSeqFileDeltaObjectTimeRangeMap; + /** + * @param args merge test fileFolderName + */ public static void main(String args[]) throws WriteProcessException, IOException, InterruptedException { fileFolderName = args[0]; mergeOutPutFolder = fileFolderName + "/merge/"; @@ -154,13 +153,6 @@ public class ReadAnalysis { System.out.println(String.format("All file merge time consuming : %dms", allFileMergeEndTime - allFileMergeStartTime)); } - private TSRecord constructTsRecord(TimeValuePair timeValuePair, String deltaObjectId, String measurementId) { - TSRecord record = new TSRecord(timeValuePair.getTimestamp(), deltaObjectId); - record.addTuple(DataPoint.getDataPoint(timeValuePair.getValue().getDataType(), measurementId, - timeValuePair.getValue().getValue().toString())); - return record; - } - private TSEncoding getEncodingByDataType(TSDataType dataType) { switch (dataType) { case TEXT: diff --git a/src/main/java/cn/edu/tsinghua/iotdb/performance/ReaderCreator.java b/src/main/java/cn/edu/tsinghua/iotdb/performance/ReaderCreator.java index 973d4a6e369..df04b5a1601 100644 --- a/src/main/java/cn/edu/tsinghua/iotdb/performance/ReaderCreator.java +++ b/src/main/java/cn/edu/tsinghua/iotdb/performance/ReaderCreator.java @@ -58,7 +58,7 @@ public class ReaderCreator { public static TimeValuePairReader createReaderForMerge(String tsfilePath, String unseqTsFilePath, Path path, long startTime, long endTime) throws IOException { OverflowSeriesDataSource overflowSeriesDataSource = genDataSource(unseqTsFilePath, path); - TsfileDBDescriptor.getInstance().getConfig().bufferWriteDir = ""; + TsfileDBDescriptor.getInstance().getConfig().bufferWriteDirs = new String[] {""}; Filter filter = FilterFactory.and(TimeFilter.gtEq(startTime), TimeFilter.ltEq(endTime)); SeriesFilter seriesFilter = new SeriesFilter<>(path, filter); IntervalFileNode intervalFileNode = new IntervalFileNode(null, tsfilePath); @@ -70,7 +70,7 @@ public class ReaderCreator { public static TimeValuePairReader createReaderOnlyForOverflowInsert(String unseqTsFilePath, Path path, long startTime, long endTime) throws IOException { OverflowSeriesDataSource overflowSeriesDataSource = genDataSource(unseqTsFilePath, path); - TsfileDBDescriptor.getInstance().getConfig().bufferWriteDir = ""; + TsfileDBDescriptor.getInstance().getConfig().bufferWriteDirs = new String[] {""}; Filter filter = FilterFactory.and(TimeFilter.gtEq(startTime), TimeFilter.ltEq(endTime)); SeriesFilter seriesFilter = new SeriesFilter<>(path, filter); TimeValuePairReader reader = SeriesReaderFactory.getInstance().createSeriesReaderForOverflowInsert(overflowSeriesDataSource, diff --git a/src/main/java/cn/edu/tsinghua/iotdb/performance/TsFileReadPerformance.java b/src/main/java/cn/edu/tsinghua/iotdb/performance/TsFileReadPerformance.java index 1f6f2afdd07..7c02b70e298 100644 --- a/src/main/java/cn/edu/tsinghua/iotdb/performance/TsFileReadPerformance.java +++ b/src/main/java/cn/edu/tsinghua/iotdb/performance/TsFileReadPerformance.java @@ -21,7 +21,7 @@ import static cn.edu.tsinghua.iotdb.performance.ReaderCreator.getTsFileMetadata; public class TsFileReadPerformance { - private static final String inputFilePath = "/Users/beyyes/Desktop/root.ln.wf632814.type4/1514676100617-1520943086803"; + private static final String inputFilePath = ""; public static void main(String[] args) throws IOException { long startTime = System.currentTimeMillis(); diff --git a/src/main/java/cn/edu/tsinghua/iotdb/performance/WriteAnalysis.java b/src/main/java/cn/edu/tsinghua/iotdb/performance/WriteAnalysis.java index 640f86a1499..085e92500f5 100644 --- a/src/main/java/cn/edu/tsinghua/iotdb/performance/WriteAnalysis.java +++ b/src/main/java/cn/edu/tsinghua/iotdb/performance/WriteAnalysis.java @@ -1,6 +1,10 @@ package cn.edu.tsinghua.iotdb.performance; +import cn.edu.tsinghua.iotdb.engine.bufferwrite.FileNodeConstants; +import cn.edu.tsinghua.iotdb.engine.filenode.FileNodeProcessor; +import cn.edu.tsinghua.iotdb.engine.querycontext.OverflowSeriesDataSource; import cn.edu.tsinghua.iotdb.queryV2.factory.SeriesReaderFactory; +import cn.edu.tsinghua.tsfile.common.conf.TSFileConfig; import cn.edu.tsinghua.tsfile.common.conf.TSFileDescriptor; import cn.edu.tsinghua.tsfile.common.utils.ITsRandomAccessFileReader; import cn.edu.tsinghua.tsfile.common.utils.Pair; @@ -17,13 +21,18 @@ import cn.edu.tsinghua.tsfile.timeseries.filterV2.factory.FilterFactory; import cn.edu.tsinghua.tsfile.timeseries.read.TsRandomAccessLocalFileReader; import cn.edu.tsinghua.tsfile.timeseries.read.support.Path; import cn.edu.tsinghua.tsfile.timeseries.readV2.datatype.TimeValuePair; +import cn.edu.tsinghua.tsfile.timeseries.readV2.reader.SeriesReader; import cn.edu.tsinghua.tsfile.timeseries.readV2.reader.TimeValuePairReader; import cn.edu.tsinghua.tsfile.timeseries.write.TsFileWriter; import cn.edu.tsinghua.tsfile.timeseries.write.desc.MeasurementDescriptor; import cn.edu.tsinghua.tsfile.timeseries.write.exception.WriteProcessException; +import cn.edu.tsinghua.tsfile.timeseries.write.io.TsFileIOWriter; +import cn.edu.tsinghua.tsfile.timeseries.write.page.IPageWriter; +import cn.edu.tsinghua.tsfile.timeseries.write.page.PageWriterImpl; import cn.edu.tsinghua.tsfile.timeseries.write.record.DataPoint; import cn.edu.tsinghua.tsfile.timeseries.write.record.TSRecord; import cn.edu.tsinghua.tsfile.timeseries.write.schema.FileSchema; +import cn.edu.tsinghua.tsfile.timeseries.write.series.SeriesWriterImpl; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -44,16 +53,19 @@ public class WriteAnalysis { private static final Logger LOGGER = LoggerFactory.getLogger(WriteAnalysis.class); + private static final TSFileConfig TsFileConf = TSFileDescriptor.getInstance().getConfig(); private static String mergeOutPutFolder; private static String fileFolderName; private static String unSeqFilePath; - private static String singleTsFilePath = "/Users/beyyes/Desktop/root.ln.wf632814.type4/1514676100617-1520943086803"; private static Map>> unSeqFileMetaData; private static Map> unSeqFileDeltaObjectTimeRangeMap; private static long max = -1; + /** + * @param args merge test fileFolderName + */ public static void main(String args[]) throws WriteProcessException, IOException { fileFolderName = args[0]; mergeOutPutFolder = fileFolderName + "/merge/"; @@ -65,7 +77,7 @@ public class WriteAnalysis { unSeqFileMetaData = getUnSeqFileMetaData(unSeqFilePath); WriteAnalysis writeAnalysis = new WriteAnalysis(); writeAnalysis.initUnSeqFileStatistics(); - //writeAnalysis.unTsFileReadPerformance(); + //writeAnalysis.unSeqTsFileReadPerformance(); //writeAnalysis.tsFileReadPerformance(); writeAnalysis.executeMerge(); System.out.println("max memory usage:" + max); @@ -117,40 +129,72 @@ public class WriteAnalysis { fileSchema.registerMeasurement(new MeasurementDescriptor(timeSeriesMetadata.getMeasurementUID(), timeSeriesMetadata.getType(), getEncodingByDataType(timeSeriesMetadata.getType()))); } - TsFileWriter fileWriter = new TsFileWriter(outPutFile, fileSchema, TSFileDescriptor.getInstance().getConfig()); + TsFileIOWriter fileIOWriter = new TsFileIOWriter(new File(mergeOutPutFolder + file.getName())); //examine whether this TsFile should be merged for (Map.Entry tsFileEntry : tsFileMetaData.getDeltaObjectMap().entrySet()) { - String tsFileDeltaObjectId = tsFileEntry.getKey(); - long tsFileDeltaObjectStartTime = tsFileMetaData.getDeltaObject(tsFileDeltaObjectId).startTime; - long tsFileDeltaObjectEndTime = tsFileMetaData.getDeltaObject(tsFileDeltaObjectId).endTime; - if (unSeqFileMetaData.containsKey(tsFileDeltaObjectId) && - unSeqFileDeltaObjectTimeRangeMap.get(tsFileDeltaObjectId).left <= tsFileDeltaObjectStartTime - && unSeqFileDeltaObjectTimeRangeMap.get(tsFileDeltaObjectId).right >= tsFileDeltaObjectStartTime) { - for (TimeSeriesMetadata timeSeriesMetadata : tsFileMetaData.getTimeSeriesList()) { - if (unSeqFileMetaData.get(tsFileDeltaObjectId).containsKey(timeSeriesMetadata.getMeasurementUID())) { + String deltaObjectId = tsFileEntry.getKey(); + long tsFileDeltaObjectStartTime = tsFileMetaData.getDeltaObject(deltaObjectId).startTime; + long tsFileDeltaObjectEndTime = tsFileMetaData.getDeltaObject(deltaObjectId).endTime; + boolean isRowGroupHasData = false; - TimeValuePairReader reader = ReaderCreator.createReaderForMerge(file.getPath(), unSeqFilePath, - new Path(tsFileDeltaObjectId + "." + timeSeriesMetadata.getMeasurementUID()), + if (unSeqFileMetaData.containsKey(deltaObjectId) && + unSeqFileDeltaObjectTimeRangeMap.get(deltaObjectId).left <= tsFileDeltaObjectStartTime + && unSeqFileDeltaObjectTimeRangeMap.get(deltaObjectId).right >= tsFileDeltaObjectStartTime) { + + long recordCount = 0; + long startPos = 0; + + for (TimeSeriesMetadata timeSeriesMetadata : tsFileMetaData.getTimeSeriesList()) { + String measurementId = timeSeriesMetadata.getMeasurementUID(); + TSDataType dataType = timeSeriesMetadata.getType(); + + if (unSeqFileMetaData.get(deltaObjectId).containsKey(measurementId)) { + + TimeValuePairReader seriesReader = ReaderCreator.createReaderForMerge(file.getPath(), unSeqFilePath, + new Path(deltaObjectId + "." + timeSeriesMetadata.getMeasurementUID()), tsFileDeltaObjectStartTime, tsFileDeltaObjectEndTime); - // calc hasnext method executing time - while (reader.hasNext()) { - TimeValuePair tp = reader.next(); - //count ++; - // calc time consuming of writing - TSRecord record = constructTsRecord(tp, tsFileDeltaObjectId, timeSeriesMetadata.getMeasurementUID()); - fileWriter.write(record); - } + if (!seriesReader.hasNext()) { + // LOGGER.debug("The time-series {} has no data"); + seriesReader.close(); + } else { - reader.close(); + if (!isRowGroupHasData) { + // start a new rowGroupMetadata + isRowGroupHasData = true; + fileIOWriter.startRowGroup(deltaObjectId); + startPos = fileIOWriter.getPos(); + } + + // init the serieswWriteImpl + MeasurementDescriptor desc = fileSchema.getMeasurementDescriptor(measurementId); + IPageWriter pageWriter = new PageWriterImpl(desc); + int pageSizeThreshold = TsFileConf.pageSizeInByte; + SeriesWriterImpl seriesWriterImpl = new SeriesWriterImpl(deltaObjectId, desc, pageWriter, + pageSizeThreshold); + + // write the series data + recordCount += writeOneSeries(deltaObjectId, measurementId, seriesWriterImpl, dataType, + (SeriesReader) seriesReader, new HashMap<>(), new HashMap<>()); + // flush the series data + seriesWriterImpl.writeToFileWriter(fileIOWriter); + + seriesReader.close(); + } } } + + if (isRowGroupHasData) { + // end the new rowGroupMetadata + long memSize = fileIOWriter.getPos() - startPos; + fileIOWriter.endRowGroup(memSize, recordCount); + } } } - + + fileIOWriter.endFile(fileSchema); randomAccessFileReader.close(); - fileWriter.close(); long oneFileMergeEndTime = System.currentTimeMillis(); System.out.println(String.format("Current file merge time consuming : %sms, record number: %s," + "original file size is %d, current file size is %d", @@ -196,7 +240,6 @@ public class WriteAnalysis { fileSchema.registerMeasurement(new MeasurementDescriptor(timeSeriesMetadata.getMeasurementUID(), timeSeriesMetadata.getType(), getEncodingByDataType(timeSeriesMetadata.getType()))); } - TsFileWriter fileWriter = new TsFileWriter(outPutFile, fileSchema, TSFileDescriptor.getInstance().getConfig()); for (Map.Entry tsFileEntry : tsFileMetaData.getDeltaObjectMap().entrySet()) { String tsFileDeltaObjectId = tsFileEntry.getKey(); @@ -216,9 +259,6 @@ public class WriteAnalysis { //long tmpRecordCount = 0; while (reader.hasNext()) { TimeValuePair tp = reader.next(); - TSRecord record = constructTsRecord(tp, tsFileDeltaObjectId, measurementId); - fileWriter.write(record); - //tmpRecordCount++; } // if (seriesPoints.containsKey(tsFileDeltaObjectId) && seriesPoints.get(tsFileDeltaObjectId).containsKey(measurementId)) { // long originalCount = seriesPoints.get(tsFileDeltaObjectId).get(measurementId); @@ -253,7 +293,7 @@ public class WriteAnalysis { * @throws IOException * @throws WriteProcessException */ - private void unTsFileReadPerformance() throws IOException { + private void unSeqTsFileReadPerformance() throws IOException { Pair validFiles = getValidFiles(fileFolderName); if (!validFiles.left) { @@ -298,14 +338,14 @@ public class WriteAnalysis { tsFileReader.close(); -// TimeValuePairReader overflowInsertReader = ReaderCreator.createReaderOnlyForOverflowInsert(unSeqFilePath, -// new Path(tsFileDeltaObjectId + "." + timeSeriesMetadata.getMeasurementUID()), -// tsFileDeltaObjectStartTime, tsFileDeltaObjectEndTime); -// while (overflowInsertReader.hasNext()) { -// TimeValuePair tp = overflowInsertReader.next(); -// count ++; -// } -// overflowInsertReader.close(); + TimeValuePairReader overflowInsertReader = ReaderCreator.createReaderOnlyForOverflowInsert(unSeqFilePath, + new Path(tsFileDeltaObjectId + "." + timeSeriesMetadata.getMeasurementUID()), + tsFileDeltaObjectStartTime, tsFileDeltaObjectEndTime); + while (overflowInsertReader.hasNext()) { + TimeValuePair tp = overflowInsertReader.next(); + count ++; + } + overflowInsertReader.close(); } } } @@ -320,11 +360,141 @@ public class WriteAnalysis { } } - private TSRecord constructTsRecord(TimeValuePair timeValuePair, String deltaObjectId, String measurementId) { - TSRecord record = new TSRecord(timeValuePair.getTimestamp(), deltaObjectId); - record.addTuple(DataPoint.getDataPoint(timeValuePair.getValue().getDataType(), measurementId, - timeValuePair.getValue().getValue().toString())); - return record; + private int writeOneSeries(String deltaObjectId, String measurement, SeriesWriterImpl seriesWriterImpl, + TSDataType dataType, SeriesReader seriesReader, Map startTimeMap, + Map endTimeMap) throws IOException { + int count = 0; + if (!seriesReader.hasNext()) + return 0; + TimeValuePair timeValuePair = seriesReader.next(); + long startTime = -1; + long endTime = -1; + switch (dataType) { + case BOOLEAN: + seriesWriterImpl.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getBoolean()); + count++; + startTime = endTime = timeValuePair.getTimestamp(); + if (!startTimeMap.containsKey(deltaObjectId) || startTimeMap.get(deltaObjectId) > startTime) { + startTimeMap.put(deltaObjectId, startTime); + } + if (!endTimeMap.containsKey(deltaObjectId) || endTimeMap.get(deltaObjectId) < endTime) { + endTimeMap.put(deltaObjectId, endTime); + } + while (seriesReader.hasNext()) { + count++; + timeValuePair = seriesReader.next(); + endTime = timeValuePair.getTimestamp(); + seriesWriterImpl.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getBoolean()); + } + if (!endTimeMap.containsKey(deltaObjectId) || endTimeMap.get(deltaObjectId) < endTime) { + endTimeMap.put(deltaObjectId, endTime); + } + break; + case INT32: + seriesWriterImpl.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getInt()); + count++; + startTime = endTime = timeValuePair.getTimestamp(); + if (!startTimeMap.containsKey(deltaObjectId) || startTimeMap.get(deltaObjectId) > startTime) { + startTimeMap.put(deltaObjectId, startTime); + } + if (!endTimeMap.containsKey(deltaObjectId) || endTimeMap.get(deltaObjectId) < endTime) { + endTimeMap.put(deltaObjectId, endTime); + } + while (seriesReader.hasNext()) { + count++; + timeValuePair = seriesReader.next(); + endTime = timeValuePair.getTimestamp(); + seriesWriterImpl.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getInt()); + } + if (!endTimeMap.containsKey(deltaObjectId) || endTimeMap.get(deltaObjectId) < endTime) { + endTimeMap.put(deltaObjectId, endTime); + } + break; + case INT64: + seriesWriterImpl.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getLong()); + count++; + startTime = endTime = timeValuePair.getTimestamp(); + if (!startTimeMap.containsKey(deltaObjectId) || startTimeMap.get(deltaObjectId) > startTime) { + startTimeMap.put(deltaObjectId, startTime); + } + if (!endTimeMap.containsKey(deltaObjectId) || endTimeMap.get(deltaObjectId) < endTime) { + endTimeMap.put(deltaObjectId, endTime); + } + while (seriesReader.hasNext()) { + count++; + timeValuePair = seriesReader.next(); + endTime = timeValuePair.getTimestamp(); + seriesWriterImpl.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getLong()); + } + if (!endTimeMap.containsKey(deltaObjectId) || endTimeMap.get(deltaObjectId) < endTime) { + endTimeMap.put(deltaObjectId, endTime); + } + break; + case FLOAT: + seriesWriterImpl.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getFloat()); + count++; + startTime = endTime = timeValuePair.getTimestamp(); + if (!startTimeMap.containsKey(deltaObjectId) || startTimeMap.get(deltaObjectId) > startTime) { + startTimeMap.put(deltaObjectId, startTime); + } + if (!endTimeMap.containsKey(deltaObjectId) || endTimeMap.get(deltaObjectId) < endTime) { + endTimeMap.put(deltaObjectId, endTime); + } + while (seriesReader.hasNext()) { + count++; + timeValuePair = seriesReader.next(); + endTime = timeValuePair.getTimestamp(); + seriesWriterImpl.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getFloat()); + } + if (!endTimeMap.containsKey(deltaObjectId) || endTimeMap.get(deltaObjectId) < endTime) { + endTimeMap.put(deltaObjectId, endTime); + } + break; + case DOUBLE: + seriesWriterImpl.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getDouble()); + count++; + startTime = endTime = timeValuePair.getTimestamp(); + if (!startTimeMap.containsKey(deltaObjectId) || startTimeMap.get(deltaObjectId) > startTime) { + startTimeMap.put(deltaObjectId, startTime); + } + if (!endTimeMap.containsKey(deltaObjectId) || endTimeMap.get(deltaObjectId) < endTime) { + endTimeMap.put(deltaObjectId, endTime); + } + while (seriesReader.hasNext()) { + count++; + timeValuePair = seriesReader.next(); + endTime = timeValuePair.getTimestamp(); + seriesWriterImpl.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getDouble()); + } + if (!endTimeMap.containsKey(deltaObjectId) || endTimeMap.get(deltaObjectId) < endTime) { + endTimeMap.put(deltaObjectId, endTime); + } + break; + case TEXT: + seriesWriterImpl.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getBinary()); + count++; + startTime = endTime = timeValuePair.getTimestamp(); + if (!startTimeMap.containsKey(deltaObjectId) || startTimeMap.get(deltaObjectId) > startTime) { + startTimeMap.put(deltaObjectId, startTime); + } + if (!endTimeMap.containsKey(deltaObjectId) || endTimeMap.get(deltaObjectId) < endTime) { + endTimeMap.put(deltaObjectId, endTime); + } + while (seriesReader.hasNext()) { + count++; + timeValuePair = seriesReader.next(); + endTime = timeValuePair.getTimestamp(); + seriesWriterImpl.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getBinary()); + } + if (!endTimeMap.containsKey(deltaObjectId) || endTimeMap.get(deltaObjectId) < endTime) { + endTimeMap.put(deltaObjectId, endTime); + } + break; + default: + LOGGER.error("Not support data type: {}", dataType); + break; + } + return count; } private TSEncoding getEncodingByDataType(TSDataType dataType) { diff --git a/src/main/java/cn/edu/tsinghua/iotdb/postback/receiver/ServerServiceImpl.java b/src/main/java/cn/edu/tsinghua/iotdb/postback/receiver/ServerServiceImpl.java index b3b69790ae7..4c887bcdb8c 100644 --- a/src/main/java/cn/edu/tsinghua/iotdb/postback/receiver/ServerServiceImpl.java +++ b/src/main/java/cn/edu/tsinghua/iotdb/postback/receiver/ServerServiceImpl.java @@ -22,6 +22,8 @@ import java.util.Map; import java.util.Set; import java.util.Map.Entry; +import cn.edu.tsinghua.tsfile.timeseries.read.query.OnePassQueryDataSet; +import cn.edu.tsinghua.tsfile.timeseries.read.support.OldRowRecord; import org.apache.thrift.TException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -45,10 +47,10 @@ import cn.edu.tsinghua.tsfile.file.utils.ReadWriteThriftFormatUtils; import cn.edu.tsinghua.tsfile.timeseries.basis.TsFile; import cn.edu.tsinghua.tsfile.timeseries.read.FileReader; import cn.edu.tsinghua.tsfile.timeseries.read.TsRandomAccessLocalFileReader; -import cn.edu.tsinghua.tsfile.timeseries.read.query.QueryDataSet; + import cn.edu.tsinghua.tsfile.timeseries.read.support.Field; import cn.edu.tsinghua.tsfile.timeseries.read.support.Path; -import cn.edu.tsinghua.tsfile.timeseries.read.support.RowRecord; + /** * @author lta @@ -384,9 +386,9 @@ public class ServerServiceImpl implements ServerService.Iface { for (String timesery : timeseries) { paths.add(new Path(timesery)); } - QueryDataSet queryDataSet = readTsFile.query(paths, null, null); + OnePassQueryDataSet queryDataSet = readTsFile.query(paths, null, null); while (queryDataSet.hasNextRecord()) { - RowRecord record = queryDataSet.getNextRecord(); + OldRowRecord record = queryDataSet.getNextRecord(); List fields = record.getFields(); String sql_front = null; for (Field field : fields) { @@ -492,9 +494,9 @@ public class ServerServiceImpl implements ServerService.Iface { Map originDataPoint = new HashMap<>(); Map newDataPoint = new HashMap<>(); String sqlFormat = "insert into %s(timestamp,%s) values(%s,%s)"; - QueryDataSet queryDataSet = readTsFile.query(paths, null, null); + OnePassQueryDataSet queryDataSet = readTsFile.query(paths, null, null); while (queryDataSet.hasNextRecord()) { - RowRecord record = queryDataSet.getNextRecord(); + OldRowRecord record = queryDataSet.getNextRecord(); List fields = record.getFields(); String sql; for (Field field : fields) { // get all data with the timesery in the postback file @@ -516,9 +518,9 @@ public class ServerServiceImpl implements ServerService.Iface { try { inputOverlap = new TsRandomAccessLocalFileReader(overlapFile); TsFile readTsFileOverlap = new TsFile(inputOverlap); - QueryDataSet queryDataSetOverlap = readTsFileOverlap.query(paths, null, null); + OnePassQueryDataSet queryDataSetOverlap = readTsFileOverlap.query(paths, null, null); while (queryDataSetOverlap.hasNextRecord()) { - RowRecord recordOverlap = queryDataSetOverlap.getNextRecord(); + OldRowRecord recordOverlap = queryDataSetOverlap.getNextRecord(); List fields = recordOverlap.getFields(); String sql; for (Field field : fields) { diff --git a/src/main/java/cn/edu/tsinghua/iotdb/query/engine/AggregateEngine.java b/src/main/java/cn/edu/tsinghua/iotdb/query/engine/AggregateEngine.java index 10ef62ed9b9..9e48b4575b7 100644 --- a/src/main/java/cn/edu/tsinghua/iotdb/query/engine/AggregateEngine.java +++ b/src/main/java/cn/edu/tsinghua/iotdb/query/engine/AggregateEngine.java @@ -1,11 +1,16 @@ package cn.edu.tsinghua.iotdb.query.engine; +import cn.edu.tsinghua.iotdb.concurrent.IoTDBThreadPoolFactory; import cn.edu.tsinghua.iotdb.conf.TsfileDBDescriptor; +import cn.edu.tsinghua.iotdb.engine.filenode.FileNodeManager; +import cn.edu.tsinghua.iotdb.exception.FileNodeManagerException; import cn.edu.tsinghua.iotdb.exception.PathErrorException; import cn.edu.tsinghua.iotdb.metadata.MManager; import cn.edu.tsinghua.iotdb.query.aggregation.AggregateFunction; +import cn.edu.tsinghua.iotdb.query.management.FileReaderMap; import cn.edu.tsinghua.iotdb.query.management.FilterStructure; import cn.edu.tsinghua.iotdb.query.management.ReadCachePrefix; +import cn.edu.tsinghua.iotdb.query.management.ReadCacheManager; import cn.edu.tsinghua.iotdb.query.reader.AggregateRecordReader; import cn.edu.tsinghua.iotdb.query.reader.QueryRecordReader; import cn.edu.tsinghua.iotdb.query.reader.ReaderType; @@ -23,6 +28,8 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.*; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; import static cn.edu.tsinghua.iotdb.query.engine.EngineUtils.noFilterOrOnlyHasTimeFilter; @@ -38,6 +45,19 @@ public class AggregateEngine { private int crossQueryFetchSize = 10 * TsfileDBDescriptor.getInstance().getConfig().fetchSize; + //private ExecutorService aggregateThreadPool; +// private AggregateEngine() { +// } + +// private static class AggregateEngineHolder { +// private static final AggregateEngine INSTANCE = new AggregateEngine(); +// +// } +// +// public static final AggregateEngine getInstance() { +// return AggregateEngineHolder.INSTANCE; +// } + /** *

Public invoking method of multiple aggregation. * @@ -222,18 +242,56 @@ public class AggregateEngine { throws PathErrorException, ProcessorException, IOException { int aggreNumber = 0; + CountDownLatch latch = new CountDownLatch(aggres.size()); + ExecutorService service = IoTDBThreadPoolFactory.newFixedThreadPool(Runtime.getRuntime().availableProcessors() - 1, + "AggregateThread"); + //ExecutorService service = Executors.newFixedThreadPool(aggres.size()); + for (Pair pair : aggres) { aggreNumber++; Path path = pair.left; AggregateFunction aggregateFunction = pair.right; + String deltaObjectUID = path.getDeltaObjectToString(); String measurementUID = path.getMeasurementToString(); - AggregateRecordReader recordReader = (AggregateRecordReader) RecordReaderFactory.getInstance().getRecordReader(deltaObjectUID, measurementUID, - queryTimeFilter, null, null, ReadCachePrefix.addQueryPrefix(aggreNumber), ReaderType.AGGREGATE); + queryTimeFilter, null, null, ReadCachePrefix.addQueryPrefix(aggreNumber), ReaderType.AGGREGATE); + service.submit(new AggregateThread(recordReader, aggregateFunction, latch)); + } - recordReader.aggregate(aggregateFunction); + try { + latch.await(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + + service.shutdown(); + } + + private class AggregateThread implements Runnable { + + private AggregateRecordReader aggregateRecordReader; + private AggregateFunction aggregateFunction; + private CountDownLatch latch; + + public AggregateThread(AggregateRecordReader aggregateRecordReader, AggregateFunction aggregateFunction, CountDownLatch latch) { + this.aggregateRecordReader = aggregateRecordReader; + this.aggregateFunction = aggregateFunction; + this.latch = latch; + } + + @Override + public void run() { + try { + aggregateRecordReader .aggregate(aggregateFunction); +// FileNodeManager.getInstance().endQuery(deltaObjectUID, recordReader.getReadToken()); +// ReadCacheManager.getInstance().removeReadToken(deltaObjectUID, recordReader.getReadToken()); + FileReaderMap.getInstance().close(); + latch.countDown(); + } catch (ProcessorException | IOException e) { + e.printStackTrace(); + } } } @@ -244,8 +302,8 @@ public class AggregateEngine { * once for querying d1.s1, once for querying d2.s1. *

* When this method is invoked, need add the filter index as a new parameter, for the reason of exist of - * RecordReaderCache, if the composition of CrossFilterExpression exist same SingleFilterExpression, - * we must guarantee that the RecordReaderCache doesn't cause conflict to the same SingleFilterExpression. + * RecordReaderCacheManager, if the composition of CrossFilterExpression exist same SingleFilterExpression, + * we must guarantee that the RecordReaderCacheManager doesn't cause conflict to the same SingleFilterExpression. */ private DynamicOneColumnData getDataUseSingleValueFilter(SingleSeriesFilterExpression queryValueFilter, DynamicOneColumnData res, int fetchSize, int valueFilterNumber) diff --git a/src/main/java/cn/edu/tsinghua/iotdb/query/engine/OverflowQueryEngine.java b/src/main/java/cn/edu/tsinghua/iotdb/query/engine/OverflowQueryEngine.java index f5f10bc2ec7..f0b513727cc 100644 --- a/src/main/java/cn/edu/tsinghua/iotdb/query/engine/OverflowQueryEngine.java +++ b/src/main/java/cn/edu/tsinghua/iotdb/query/engine/OverflowQueryEngine.java @@ -11,7 +11,7 @@ import cn.edu.tsinghua.iotdb.query.fill.LinearFill; import cn.edu.tsinghua.iotdb.query.fill.PreviousFill; import cn.edu.tsinghua.iotdb.query.management.FilterStructure; import cn.edu.tsinghua.iotdb.query.management.ReadCachePrefix; -import cn.edu.tsinghua.iotdb.query.management.ReadLockManager; +import cn.edu.tsinghua.iotdb.query.management.ReadCacheManager; import cn.edu.tsinghua.iotdb.query.reader.QueryRecordReader; import cn.edu.tsinghua.iotdb.query.reader.ReaderType; import cn.edu.tsinghua.iotdb.query.reader.RecordReaderFactory; @@ -110,16 +110,15 @@ public class OverflowQueryEngine { aggregations.add(new Pair<>(pair.left, aggregateFunction)); } - AggregateEngine aggregateEngine = new AggregateEngine(); - aggregateEngine.multiAggregate(aggregations, filterStructures); + new AggregateEngine().multiAggregate(aggregations, filterStructures); OnePassQueryDataSet ansOnePassQueryDataSet = new OnePassQueryDataSet(); for (Pair pair : aggregations) { AggregateFunction aggregateFunction = pair.right; if (aggregateFunction.resultData.timeLength == 0) { aggregateFunction.putDefaultValue(); } - LOGGER.debug(String.format("key %s, data length %s, empty data length %s", EngineUtils.aggregationKey(aggregateFunction, pair.left), - aggregateFunction.resultData.timeLength, aggregateFunction.resultData.emptyTimeLength)); + LOGGER.debug(String.format("key %s, data time length %s, data value length %s, empty data time length %s", EngineUtils.aggregationKey(aggregateFunction, pair.left), + aggregateFunction.resultData.timeLength, aggregateFunction.resultData.valueLength, aggregateFunction.resultData.emptyTimeLength)); ansOnePassQueryDataSet.mapRet.put(EngineUtils.aggregationKey(aggregateFunction, pair.left), aggregateFunction.resultData); } // aggregateThreadLocal.set(ansOnePassQueryDataSet); @@ -140,9 +139,9 @@ public class OverflowQueryEngine { public OnePassQueryDataSet groupBy(List> aggres, List filterStructures, long unit, long origin, List> intervals, int fetchSize) { - ThreadLocal groupByCalcTime = ReadLockManager.getInstance().getGroupByCalcTime(); - ThreadLocal groupByEngineNoFilterLocal = ReadLockManager.getInstance().getGroupByEngineNoFilterLocal(); - ThreadLocal groupByEngineWithFilterLocal = ReadLockManager.getInstance().getGroupByEngineWithFilterLocal(); + ThreadLocal groupByCalcTime = ReadCacheManager.getInstance().getGroupByCalcTime(); + ThreadLocal groupByEngineNoFilterLocal = ReadCacheManager.getInstance().getGroupByEngineNoFilterLocal(); + ThreadLocal groupByEngineWithFilterLocal = ReadCacheManager.getInstance().getGroupByEngineWithFilterLocal(); if (groupByCalcTime.get() == null) { @@ -419,8 +418,8 @@ public class OverflowQueryEngine { * once for querying d1.s1, once for querying d2.s1. *

* When this method is invoked, need add the filter index as a new parameter, for the reason of exist of - * RecordReaderCache, if the composition of CrossFilterExpression exist same SingleFilterExpression, - * we must guarantee that the RecordReaderCache doesn't cause conflict to the same SingleFilterExpression. + * RecordReaderCacheManager, if the composition of CrossFilterExpression exist same SingleFilterExpression, + * we must guarantee that the RecordReaderCacheManager doesn't cause conflict to the same SingleFilterExpression. */ private DynamicOneColumnData querySeriesForCross(SingleSeriesFilterExpression queryValueFilter, DynamicOneColumnData res, int fetchSize, int valueFilterNumber) diff --git a/src/main/java/cn/edu/tsinghua/iotdb/query/engine/groupby/GroupByEngineWithFilter.java b/src/main/java/cn/edu/tsinghua/iotdb/query/engine/groupby/GroupByEngineWithFilter.java index 5e8363d4603..3195b5a451d 100644 --- a/src/main/java/cn/edu/tsinghua/iotdb/query/engine/groupby/GroupByEngineWithFilter.java +++ b/src/main/java/cn/edu/tsinghua/iotdb/query/engine/groupby/GroupByEngineWithFilter.java @@ -381,8 +381,8 @@ public class GroupByEngineWithFilter { * once for querying d1.s1, once for querying d2.s1. *

* When this method is invoked, need add the filter index as a new parameter, for the reason of exist of - * RecordReaderCache, if the composition of CrossFilterExpression exist same SingleFilterExpression, - * we must guarantee that the RecordReaderCache doesn't cause conflict to the same SingleFilterExpression. + * RecordReaderCacheManager, if the composition of CrossFilterExpression exist same SingleFilterExpression, + * we must guarantee that the RecordReaderCacheManager doesn't cause conflict to the same SingleFilterExpression. */ private static DynamicOneColumnData getDataUseSingleValueFilter(SingleSeriesFilterExpression queryValueFilter, DynamicOneColumnData res, int fetchSize, int valueFilterNumber) diff --git a/src/main/java/cn/edu/tsinghua/iotdb/query/management/FileReaderMap.java b/src/main/java/cn/edu/tsinghua/iotdb/query/management/FileReaderMap.java index 1ed20ed5bc6..7c29829ea0a 100644 --- a/src/main/java/cn/edu/tsinghua/iotdb/query/management/FileReaderMap.java +++ b/src/main/java/cn/edu/tsinghua/iotdb/query/management/FileReaderMap.java @@ -11,12 +11,12 @@ public class FileReaderMap { /** map to store opened file stream **/ private static ThreadLocal> fileReaderMap = new ThreadLocal<>(); - private static class ReaderHolder { + private static class FileReaderMapHolder { private static final FileReaderMap INSTANCE = new FileReaderMap(); } public static FileReaderMap getInstance() { - return ReaderHolder.INSTANCE; + return FileReaderMapHolder.INSTANCE; } public TsRandomAccessLocalFileReader get(String path) throws IOException { diff --git a/src/main/java/cn/edu/tsinghua/iotdb/query/management/ReadLockManager.java b/src/main/java/cn/edu/tsinghua/iotdb/query/management/ReadCacheManager.java similarity index 82% rename from src/main/java/cn/edu/tsinghua/iotdb/query/management/ReadLockManager.java rename to src/main/java/cn/edu/tsinghua/iotdb/query/management/ReadCacheManager.java index 06c6e078144..a355574e25a 100644 --- a/src/main/java/cn/edu/tsinghua/iotdb/query/management/ReadLockManager.java +++ b/src/main/java/cn/edu/tsinghua/iotdb/query/management/ReadCacheManager.java @@ -2,6 +2,7 @@ package cn.edu.tsinghua.iotdb.query.management; import java.io.IOException; import java.util.HashMap; +import java.util.Map; import cn.edu.tsinghua.iotdb.engine.filenode.FileNodeManager; import cn.edu.tsinghua.iotdb.exception.FileNodeManagerException; @@ -11,23 +12,33 @@ import cn.edu.tsinghua.tsfile.common.exception.ProcessorException; /** *

- * Read process lock manager and ThreadLocal variable manager. + * Read lock manager, ThreadLocal variable and RecordReaderCacheManager manager. * When a query process is over or quit abnormally, the unlockForOneRequest method will * be invoked to clear the thread level variable. *

* */ -public class ReadLockManager { +public class ReadCacheManager { + + private static final class ReadCacheManagerHolder { + private static final ReadCacheManager INSTANCE = new ReadCacheManager(); + } + + private ReadCacheManager() { + } + + public static ReadCacheManager getInstance() { + return ReadCacheManagerHolder.INSTANCE; + } - private static ReadLockManager instance = new ReadLockManager(); private FileNodeManager fileNodeManager = FileNodeManager.getInstance(); /** storage deltaObjectId and its read lock **/ private ThreadLocal> locksMap = new ThreadLocal<>(); - /** this is no need to set as ThreadLocal, RecordReaderCache has ThreadLocal variable**/ - public RecordReaderCache recordReaderCache = new RecordReaderCache(); + /** this is no need to set as ThreadLocal, RecordReaderCacheManager has ThreadLocal variable**/ + private RecordReaderCacheManager recordReaderCacheManager = new RecordReaderCacheManager(); /** represents the execute time of group by method**/ private ThreadLocal groupByCalcTime; @@ -38,9 +49,6 @@ public class ReadLockManager { /** ThreadLocal, due to the usage of OverflowQPExecutor **/ private ThreadLocal groupByEngineWithFilterLocal; - private ReadLockManager() { - } - public int lock(String deltaObjectUID) throws ProcessorException { checkLocksMap(); int token; @@ -58,6 +66,10 @@ public class ReadLockManager { return token; } + public void removeReadToken(String deltaObjectId, int readToken) { + locksMap.get().remove(deltaObjectId, readToken); + } + /** * When jdbc connection is closed normally or quit abnormally, this method should be invoked.
* All read cache in this request should be released. @@ -68,7 +80,8 @@ public class ReadLockManager { if (locksMap.get() == null) { return; } - HashMap locks = locksMap.get(); + + Map locks = locksMap.get(); for (String key : locks.keySet()) { unlockForQuery(key, locks.get(key)); } @@ -85,7 +98,7 @@ public class ReadLockManager { groupByEngineWithFilterLocal.remove(); } - recordReaderCache.clear(); + recordReaderCacheManager.clear(); FileReaderMap.getInstance().close(); } @@ -98,10 +111,6 @@ public class ReadLockManager { } } - public static ReadLockManager getInstance() { - return instance; - } - private void checkLocksMap() { if (locksMap.get() == null) { locksMap.set(new HashMap<>()); @@ -115,10 +124,6 @@ public class ReadLockManager { return this.groupByCalcTime; } - public void setGroupByCalcTime(ThreadLocal t) { - this.groupByCalcTime = t; - } - public ThreadLocal getGroupByEngineNoFilterLocal() { if (groupByEngineNoFilterLocal == null) { groupByEngineNoFilterLocal = new ThreadLocal<>(); @@ -126,10 +131,6 @@ public class ReadLockManager { return this.groupByEngineNoFilterLocal; } - public void setGroupByEngineNoFilterLocal(ThreadLocal t) { - this.groupByEngineNoFilterLocal = t; - } - public ThreadLocal getGroupByEngineWithFilterLocal() { if (groupByEngineWithFilterLocal == null) { groupByEngineWithFilterLocal = new ThreadLocal<>(); @@ -137,8 +138,7 @@ public class ReadLockManager { return this.groupByEngineWithFilterLocal; } - public void setGroupByEngineWithFilterLocal(ThreadLocal t) { - this.groupByEngineWithFilterLocal = t; + public RecordReaderCacheManager getRecordReaderCacheManager() { + return recordReaderCacheManager; } - } diff --git a/src/main/java/cn/edu/tsinghua/iotdb/query/management/RecordReaderCache.java b/src/main/java/cn/edu/tsinghua/iotdb/query/management/RecordReaderCacheManager.java similarity index 97% rename from src/main/java/cn/edu/tsinghua/iotdb/query/management/RecordReaderCache.java rename to src/main/java/cn/edu/tsinghua/iotdb/query/management/RecordReaderCacheManager.java index efbd1ad65cc..319380ef9db 100644 --- a/src/main/java/cn/edu/tsinghua/iotdb/query/management/RecordReaderCache.java +++ b/src/main/java/cn/edu/tsinghua/iotdb/query/management/RecordReaderCacheManager.java @@ -8,7 +8,7 @@ import cn.edu.tsinghua.iotdb.query.reader.RecordReader; /** * Used for read process, put the query structure in the cache for one query process. */ -public class RecordReaderCache { +public class RecordReaderCacheManager { private ThreadLocal> cache = new ThreadLocal<>(); diff --git a/src/main/java/cn/edu/tsinghua/iotdb/query/reader/AggregateRecordReader.java b/src/main/java/cn/edu/tsinghua/iotdb/query/reader/AggregateRecordReader.java index 48c260b4bc7..d4830678a7f 100644 --- a/src/main/java/cn/edu/tsinghua/iotdb/query/reader/AggregateRecordReader.java +++ b/src/main/java/cn/edu/tsinghua/iotdb/query/reader/AggregateRecordReader.java @@ -34,10 +34,10 @@ public class AggregateRecordReader extends RecordReader { public AggregateRecordReader(GlobalSortedSeriesDataSource globalSortedSeriesDataSource, OverflowSeriesDataSource overflowSeriesDataSource, String deltaObjectId, String measurementId, - SingleSeriesFilterExpression queryTimeFilter, SingleSeriesFilterExpression queryValueFilter) + SingleSeriesFilterExpression queryTimeFilter, SingleSeriesFilterExpression queryValueFilter, int readToken) throws PathErrorException, IOException { super(globalSortedSeriesDataSource, overflowSeriesDataSource, deltaObjectId, measurementId, - queryTimeFilter, queryValueFilter); + queryTimeFilter, queryValueFilter, readToken); } /** diff --git a/src/main/java/cn/edu/tsinghua/iotdb/query/reader/FillRecordReader.java b/src/main/java/cn/edu/tsinghua/iotdb/query/reader/FillRecordReader.java index 904041304ad..8ab2ee6f44d 100644 --- a/src/main/java/cn/edu/tsinghua/iotdb/query/reader/FillRecordReader.java +++ b/src/main/java/cn/edu/tsinghua/iotdb/query/reader/FillRecordReader.java @@ -17,9 +17,9 @@ public class FillRecordReader extends RecordReader{ public FillRecordReader(GlobalSortedSeriesDataSource globalSortedSeriesDataSource, OverflowSeriesDataSource overflowSeriesDataSource, String deltaObjectId, String measurementId, - SingleSeriesFilterExpression queryTimeFilter, SingleSeriesFilterExpression queryValueFilter) + SingleSeriesFilterExpression queryTimeFilter, SingleSeriesFilterExpression queryValueFilter, int readToken) throws PathErrorException, IOException { - super(globalSortedSeriesDataSource, overflowSeriesDataSource, deltaObjectId, measurementId, queryTimeFilter, queryValueFilter); + super(globalSortedSeriesDataSource, overflowSeriesDataSource, deltaObjectId, measurementId, queryTimeFilter, queryValueFilter, readToken); } /** diff --git a/src/main/java/cn/edu/tsinghua/iotdb/query/reader/QueryRecordReader.java b/src/main/java/cn/edu/tsinghua/iotdb/query/reader/QueryRecordReader.java index 212c8359eda..547a8adb782 100644 --- a/src/main/java/cn/edu/tsinghua/iotdb/query/reader/QueryRecordReader.java +++ b/src/main/java/cn/edu/tsinghua/iotdb/query/reader/QueryRecordReader.java @@ -33,9 +33,9 @@ public class QueryRecordReader extends RecordReader { public QueryRecordReader(GlobalSortedSeriesDataSource globalSortedSeriesDataSource, OverflowSeriesDataSource overflowSeriesDataSource, String deltaObjectId, String measurementId, - SingleSeriesFilterExpression queryTimeFilter, SingleSeriesFilterExpression queryValueFilter) + SingleSeriesFilterExpression queryTimeFilter, SingleSeriesFilterExpression queryValueFilter, int readToken) throws PathErrorException, IOException { - super(globalSortedSeriesDataSource, overflowSeriesDataSource, deltaObjectId, measurementId, queryTimeFilter, queryValueFilter); + super(globalSortedSeriesDataSource, overflowSeriesDataSource, deltaObjectId, measurementId, queryTimeFilter, queryValueFilter, readToken); overflowOperationReaderCopy = overflowOperationReader.copy(); diff --git a/src/main/java/cn/edu/tsinghua/iotdb/query/reader/RecordReader.java b/src/main/java/cn/edu/tsinghua/iotdb/query/reader/RecordReader.java index 54b74745c16..7e5998c2b36 100644 --- a/src/main/java/cn/edu/tsinghua/iotdb/query/reader/RecordReader.java +++ b/src/main/java/cn/edu/tsinghua/iotdb/query/reader/RecordReader.java @@ -81,9 +81,11 @@ public class RecordReader { /** memRawSeriesChunk + overflowSeriesInsertReader + overflowOperationReader **/ protected InsertDynamicData insertMemoryData; + protected int readToken; + public RecordReader(GlobalSortedSeriesDataSource globalSortedSeriesDataSource, OverflowSeriesDataSource overflowSeriesDataSource, String deltaObjectId, String measurementId, - SingleSeriesFilterExpression queryTimeFilter, SingleSeriesFilterExpression queryValueFilter) + SingleSeriesFilterExpression queryTimeFilter, SingleSeriesFilterExpression queryValueFilter, int readToken) throws PathErrorException, IOException { List sealedFilePathList = new ArrayList<>(); @@ -130,6 +132,8 @@ public class RecordReader { if (queryValueFilter != null) { singleValueVisitor = getSingleValueVisitorByDataType(dataType, queryValueFilter); } + + this.readToken = readToken; } public void closeFileStream() { @@ -144,4 +148,8 @@ public class RecordReader { public InsertDynamicData getInsertMemoryData() { return this.insertMemoryData; } + + public int getReadToken() { + return this.readToken; + } } diff --git a/src/main/java/cn/edu/tsinghua/iotdb/query/reader/RecordReaderFactory.java b/src/main/java/cn/edu/tsinghua/iotdb/query/reader/RecordReaderFactory.java index 6ff29574de7..14a611fdb96 100644 --- a/src/main/java/cn/edu/tsinghua/iotdb/query/reader/RecordReaderFactory.java +++ b/src/main/java/cn/edu/tsinghua/iotdb/query/reader/RecordReaderFactory.java @@ -4,7 +4,7 @@ import cn.edu.tsinghua.iotdb.engine.filenode.FileNodeManager; import cn.edu.tsinghua.iotdb.engine.querycontext.QueryDataSource; import cn.edu.tsinghua.iotdb.exception.FileNodeManagerException; import cn.edu.tsinghua.iotdb.exception.PathErrorException; -import cn.edu.tsinghua.iotdb.query.management.ReadLockManager; +import cn.edu.tsinghua.iotdb.query.management.ReadCacheManager; import cn.edu.tsinghua.tsfile.common.exception.ProcessorException; import cn.edu.tsinghua.tsfile.timeseries.filter.definition.SingleSeriesFilterExpression; import cn.edu.tsinghua.tsfile.timeseries.filterV2.expression.impl.QueryFilterFactory; @@ -27,11 +27,11 @@ public class RecordReaderFactory { private static RecordReaderFactory instance = new RecordReaderFactory(); private FileNodeManager fileNodeManager; - private ReadLockManager readLockManager; + private ReadCacheManager readCacheManager; private RecordReaderFactory() { fileNodeManager = FileNodeManager.getInstance(); - readLockManager = ReadLockManager.getInstance(); + readCacheManager = ReadCacheManager.getInstance(); } /** @@ -39,23 +39,23 @@ public class RecordReaderFactory { * * @param readLock if readLock is not null, the read lock of file node has been created,
* else a new read lock token should be applied. - * @param prefix for the exist of RecordReaderCache and batch read, we need a prefix to + * @param prefix for the exist of RecordReaderCacheManager and batch read, we need a prefix to * represent the uniqueness. * @return RecordReader */ - public RecordReader getRecordReader(String deltaObjectUID, String measurementID, + public synchronized RecordReader getRecordReader(String deltaObjectUID, String measurementID, SingleSeriesFilterExpression timeFilter, SingleSeriesFilterExpression valueFilter, Integer readLock, String prefix, ReaderType readerType) throws ProcessorException, PathErrorException, IOException { - int token = 0; + int readToken = 0; if (readLock == null) { - token = readLockManager.lock(deltaObjectUID); + readToken = readCacheManager.lock(deltaObjectUID); } else { - token = readLock; + readToken = readLock; } String cacheDeltaKey = prefix + deltaObjectUID; - if (readLockManager.recordReaderCache.containsRecordReader(cacheDeltaKey, measurementID)) { - return readLockManager.recordReaderCache.get(cacheDeltaKey, measurementID); + if (readCacheManager.getRecordReaderCacheManager().containsRecordReader(cacheDeltaKey, measurementID)) { + return readCacheManager.getRecordReaderCacheManager().get(cacheDeltaKey, measurementID); } else { QueryDataSource queryDataSource; try { @@ -65,28 +65,28 @@ public class RecordReaderFactory { throw new ProcessorException(e.getMessage()); } - RecordReader recordReader = createANewRecordReader(deltaObjectUID, measurementID, timeFilter, valueFilter, queryDataSource, readerType); - readLockManager.recordReaderCache.put(cacheDeltaKey, measurementID, recordReader); + RecordReader recordReader = createANewRecordReader(deltaObjectUID, measurementID, timeFilter, valueFilter, queryDataSource, readerType, readToken); + readCacheManager.getRecordReaderCacheManager().put(cacheDeltaKey, measurementID, recordReader); return recordReader; } } private RecordReader createANewRecordReader(String deltaObjectUID, String measurementID, SingleSeriesFilterExpression queryTimeFilter, SingleSeriesFilterExpression queryValueFilter, - QueryDataSource queryDataSource, ReaderType readerType) throws PathErrorException, IOException { + QueryDataSource queryDataSource, ReaderType readerType, int readToken) throws PathErrorException, IOException { switch (readerType) { case QUERY: return new QueryRecordReader(queryDataSource.getSeriesDataSource(), queryDataSource.getOverflowSeriesDataSource(), - deltaObjectUID, measurementID, queryTimeFilter, queryValueFilter); + deltaObjectUID, measurementID, queryTimeFilter, queryValueFilter, readToken); case AGGREGATE: return new AggregateRecordReader(queryDataSource.getSeriesDataSource(), queryDataSource.getOverflowSeriesDataSource(), - deltaObjectUID, measurementID, queryTimeFilter, queryValueFilter); + deltaObjectUID, measurementID, queryTimeFilter, queryValueFilter, readToken); case FILL: return new FillRecordReader(queryDataSource.getSeriesDataSource(), queryDataSource.getOverflowSeriesDataSource(), - deltaObjectUID, measurementID, queryTimeFilter, queryValueFilter); + deltaObjectUID, measurementID, queryTimeFilter, queryValueFilter, readToken); case GROUPBY: return new QueryRecordReader(queryDataSource.getSeriesDataSource(), queryDataSource.getOverflowSeriesDataSource(), - deltaObjectUID, measurementID, queryTimeFilter, queryValueFilter); + deltaObjectUID, measurementID, queryTimeFilter, queryValueFilter, readToken); } return null; @@ -98,11 +98,11 @@ public class RecordReaderFactory { // TODO this method is only used in test case and KV-match index public void removeRecordReader(String deltaObjectId, String measurementId) throws IOException { - if (readLockManager.recordReaderCache.containsRecordReader(deltaObjectId, measurementId)) { + if (readCacheManager.getRecordReaderCacheManager().containsRecordReader(deltaObjectId, measurementId)) { // close the RecordReader read stream. - readLockManager.recordReaderCache.get(deltaObjectId, measurementId).closeFileStream(); - readLockManager.recordReaderCache.get(deltaObjectId, measurementId).closeFileStreamForOneRequest(); - readLockManager.recordReaderCache.remove(deltaObjectId, measurementId); + readCacheManager.getRecordReaderCacheManager().get(deltaObjectId, measurementId).closeFileStream(); + readCacheManager.getRecordReaderCacheManager().get(deltaObjectId, measurementId).closeFileStreamForOneRequest(); + readCacheManager.getRecordReaderCacheManager().remove(deltaObjectId, measurementId); } } } diff --git a/src/main/java/cn/edu/tsinghua/iotdb/service/IoTDB.java b/src/main/java/cn/edu/tsinghua/iotdb/service/IoTDB.java index 4d8c4d1e39f..0e4a85fe714 100644 --- a/src/main/java/cn/edu/tsinghua/iotdb/service/IoTDB.java +++ b/src/main/java/cn/edu/tsinghua/iotdb/service/IoTDB.java @@ -4,6 +4,7 @@ import java.io.IOException; import java.util.List; import cn.edu.tsinghua.iotdb.exception.*; +import cn.edu.tsinghua.iotdb.exception.builder.ExceptionBuilder; import cn.edu.tsinghua.iotdb.metadata.MManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -26,7 +27,7 @@ public class IoTDB implements IoTDBMBean{ private static final Logger LOGGER = LoggerFactory.getLogger(IoTDB.class); private RegisterManager registerManager = new RegisterManager(); private final String MBEAN_NAME = String.format("%s:%s=%s", TsFileDBConstant.IOTDB_PACKAGE, TsFileDBConstant.JMX_TYPE, "IoTDB"); - + private ServerManager serverManager = ServerManager.getInstance(); private static class IoTDBHolder { @@ -82,7 +83,9 @@ public class IoTDB implements IoTDBMBean{ registerManager.register(BasicMemController.getInstance()); JMXService.registerMBean(getInstance(), MBEAN_NAME); - + + initErrorInformation(); + serverManager.startServer(); } @@ -101,6 +104,10 @@ public class IoTDB implements IoTDBMBean{ Thread.setDefaultUncaughtExceptionHandler(new IoTDBDefaultThreadExceptionHandler()); } + private void initErrorInformation(){ + ExceptionBuilder.getInstance().loadInfo(); + } + /** * Recover data using system log. * @throws RecoverException diff --git a/src/main/java/cn/edu/tsinghua/iotdb/service/TSServiceImpl.java b/src/main/java/cn/edu/tsinghua/iotdb/service/TSServiceImpl.java index 0d34597a6c7..3a10f31dbe4 100644 --- a/src/main/java/cn/edu/tsinghua/iotdb/service/TSServiceImpl.java +++ b/src/main/java/cn/edu/tsinghua/iotdb/service/TSServiceImpl.java @@ -24,9 +24,8 @@ import cn.edu.tsinghua.iotdb.qp.logical.Operator; import cn.edu.tsinghua.iotdb.qp.physical.PhysicalPlan; import cn.edu.tsinghua.iotdb.qp.physical.crud.IndexQueryPlan; import cn.edu.tsinghua.iotdb.qp.physical.crud.MultiQueryPlan; -import cn.edu.tsinghua.iotdb.qp.physical.crud.QueryPlan; import cn.edu.tsinghua.iotdb.qp.physical.sys.AuthorPlan; -import cn.edu.tsinghua.iotdb.query.management.ReadLockManager; +import cn.edu.tsinghua.iotdb.query.management.ReadCacheManager; import cn.edu.tsinghua.tsfile.common.exception.ProcessorException; import cn.edu.tsinghua.tsfile.timeseries.read.support.Path; import cn.edu.tsinghua.tsfile.timeseries.readV2.query.QueryDataSet; @@ -134,7 +133,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { public TSCloseOperationResp closeOperation(TSCloseOperationReq req) throws TException { LOGGER.info("{}: receive close operation",TsFileDBConstant.GLOBAL_DB_NAME); try { - ReadLockManager.getInstance().unlockForOneRequest(); + ReadCacheManager.getInstance().unlockForOneRequest(); clearAllStatusForCurrentRequest(); } catch (ProcessorException | IOException e) { LOGGER.error("Error in closeOperation : {}", e.getMessage()); diff --git a/src/test/java/cn/edu/tsinghua/iotdb/engine/bufferwrite/BufferWriteIOTest.java b/src/test/java/cn/edu/tsinghua/iotdb/engine/bufferwrite/BufferWriteIOTest.java index 6ed4dc01587..83273905cde 100644 --- a/src/test/java/cn/edu/tsinghua/iotdb/engine/bufferwrite/BufferWriteIOTest.java +++ b/src/test/java/cn/edu/tsinghua/iotdb/engine/bufferwrite/BufferWriteIOTest.java @@ -11,7 +11,6 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; -import cn.edu.tsinghua.iotdb.engine.bufferwriteV2.BufferIO; import cn.edu.tsinghua.iotdb.utils.EnvironmentUtils; import cn.edu.tsinghua.tsfile.common.utils.TsRandomAccessFileWriter; import cn.edu.tsinghua.tsfile.file.metadata.RowGroupMetaData; diff --git a/src/test/java/cn/edu/tsinghua/iotdb/engine/bufferwrite/BufferWriteProcessorTest.java b/src/test/java/cn/edu/tsinghua/iotdb/engine/bufferwrite/BufferWriteProcessorTest.java index e6f87bb26ba..000fb5f599e 100644 --- a/src/test/java/cn/edu/tsinghua/iotdb/engine/bufferwrite/BufferWriteProcessorTest.java +++ b/src/test/java/cn/edu/tsinghua/iotdb/engine/bufferwrite/BufferWriteProcessorTest.java @@ -18,7 +18,6 @@ import org.junit.Test; import cn.edu.tsinghua.iotdb.engine.MetadataManagerHelper; import cn.edu.tsinghua.iotdb.engine.PathUtils; -import cn.edu.tsinghua.iotdb.engine.bufferwriteV2.BufferWriteProcessor; import cn.edu.tsinghua.iotdb.engine.querycontext.RawSeriesChunk; import cn.edu.tsinghua.iotdb.utils.EnvironmentUtils; import cn.edu.tsinghua.iotdb.utils.FileSchemaUtils; diff --git a/src/test/java/cn/edu/tsinghua/iotdb/engine/bufferwrite/BufferWriteResourceTest.java b/src/test/java/cn/edu/tsinghua/iotdb/engine/bufferwrite/BufferWriteResourceTest.java index 1aeac42e8b0..f4ccc46f496 100644 --- a/src/test/java/cn/edu/tsinghua/iotdb/engine/bufferwrite/BufferWriteResourceTest.java +++ b/src/test/java/cn/edu/tsinghua/iotdb/engine/bufferwrite/BufferWriteResourceTest.java @@ -13,7 +13,6 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; -import cn.edu.tsinghua.iotdb.engine.bufferwriteV2.BufferWriteResource; import cn.edu.tsinghua.iotdb.engine.memtable.IMemTable; import cn.edu.tsinghua.iotdb.engine.memtable.MemTableTestUtils; import cn.edu.tsinghua.iotdb.engine.memtable.TreeSetMemTable; diff --git a/src/test/java/cn/edu/tsinghua/iotdb/engine/filenode/FileNodeLastUpdateMulTest.java b/src/test/java/cn/edu/tsinghua/iotdb/engine/filenode/FileNodeLastUpdateMulTest.java index b4b1053435d..4f8d554d278 100644 --- a/src/test/java/cn/edu/tsinghua/iotdb/engine/filenode/FileNodeLastUpdateMulTest.java +++ b/src/test/java/cn/edu/tsinghua/iotdb/engine/filenode/FileNodeLastUpdateMulTest.java @@ -16,7 +16,7 @@ //import cn.edu.tsinghua.iotdb.conf.TsfileDBConfig; //import cn.edu.tsinghua.iotdb.conf.TsfileDBDescriptor; //import cn.edu.tsinghua.iotdb.engine.MetadataManagerHelper; -//import cn.edu.tsinghua.iotdb.engine.bufferwrite.Action; +//import cn.edu.tsinghua.iotdb.engine.bufferwriteV2.Action; //import cn.edu.tsinghua.iotdb.engine.bufferwrite.BufferWriteProcessor; //import cn.edu.tsinghua.iotdb.engine.overflow.io.OverflowProcessor; //import cn.edu.tsinghua.iotdb.exception.BufferWriteProcessorException; diff --git a/src/test/java/cn/edu/tsinghua/iotdb/engine/filenode/FileNodeProcessorTest.java b/src/test/java/cn/edu/tsinghua/iotdb/engine/filenode/FileNodeProcessorTest.java index 61d7b14611e..9735f5893f9 100644 --- a/src/test/java/cn/edu/tsinghua/iotdb/engine/filenode/FileNodeProcessorTest.java +++ b/src/test/java/cn/edu/tsinghua/iotdb/engine/filenode/FileNodeProcessorTest.java @@ -19,7 +19,7 @@ //import cn.edu.tsinghua.iotdb.conf.TsfileDBDescriptor; //import cn.edu.tsinghua.iotdb.engine.MetadataManagerHelper; //import cn.edu.tsinghua.iotdb.engine.PathUtils; -//import cn.edu.tsinghua.iotdb.engine.bufferwrite.Action; +//import cn.edu.tsinghua.iotdb.engine.bufferwriteV2.Action; //import cn.edu.tsinghua.iotdb.engine.bufferwrite.BufferWriteProcessor; //import cn.edu.tsinghua.iotdb.engine.overflow.io.OverflowProcessor; //import cn.edu.tsinghua.iotdb.exception.BufferWriteProcessorException; diff --git a/src/test/java/cn/edu/tsinghua/iotdb/exception/ExceptionBuilderTest.java b/src/test/java/cn/edu/tsinghua/iotdb/exception/ExceptionBuilderTest.java new file mode 100644 index 00000000000..851fb76477f --- /dev/null +++ b/src/test/java/cn/edu/tsinghua/iotdb/exception/ExceptionBuilderTest.java @@ -0,0 +1,85 @@ +package cn.edu.tsinghua.iotdb.exception; + +import cn.edu.tsinghua.iotdb.exception.builder.ExceptionBuilder; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.util.Properties; + +import static org.junit.Assert.assertEquals; + +public class ExceptionBuilderTest { + + @Before + public void before() { + Properties prop = new Properties(); + FileOutputStream out1 = null; + FileOutputStream out2 = null; + try { + out1 = new FileOutputStream("err_info_en.properties", true); + prop.setProperty("20000","Unknown error"); + prop.setProperty("20001","No parameters exist in the statement"); + prop.setProperty("20002","Invalid parameter number"); + prop.setProperty("20003","Can't connect to server on {}({})"); + prop.setProperty("20061","Authentication plugin {} reported error: {}"); + prop.setProperty("20062","Insecure API function call: {}"); + prop.setProperty("20064","Client ran out of memory"); + prop.setProperty("20130","Statement not prepared"); + prop.setProperty("20220","Fail to connect"); + + prop.store(new OutputStreamWriter(out1, "utf-8"), "english version"); + + out2 = new FileOutputStream("err_info_cn.properties", true); + prop.setProperty("20000","未知错误"); + prop.setProperty("20001","语句中无变量"); + prop.setProperty("20002","无效的变量"); + prop.setProperty("20003","无法连接到服务器"); + prop.setProperty("20061","验证失败"); + prop.setProperty("20062","不安全的函数调用"); + prop.setProperty("20064","客户端内存溢出"); + prop.setProperty("20130","语句未就绪"); + prop.setProperty("20220","连接失败"); + + prop.store(new OutputStreamWriter(out2, "utf-8"), "chinese version"); + } catch (Exception e) { + e.printStackTrace(); + } finally { + try { + out1.close(); + out2.close(); + } catch (IOException e) { + e.printStackTrace(); + } + } + } + + @After + public void after() { + File file1 = new File("err_info_en.properties"); + File file2 = new File("err_info_cn.properties"); + if(file1.exists()){ + file1.delete(); + } + if(file2.exists()){ + file2.delete(); + } + } + + @Test + public void testLoadProp() { + ExceptionBuilder excpHandler = new ExceptionBuilder(); + excpHandler.loadInfo("err_info_en.properties"); + assertEquals("Invalid parameter number",excpHandler.searchInfo(20002)); + assertEquals("Can't connect to server on {}({})",excpHandler.searchInfo(20003)); + + excpHandler.loadInfo("err_info_cn.properties"); + assertEquals("无法连接到服务器",excpHandler.searchInfo(20003)); + assertEquals("验证失败",excpHandler.searchInfo(20061)); + } + +} diff --git a/src/test/java/cn/edu/tsinghua/iotdb/qp/query/TestOnePassQpQuery.java b/src/test/java/cn/edu/tsinghua/iotdb/qp/query/TestOnePassQpQuery.java index 33b7074033b..cef4d69cb53 100644 --- a/src/test/java/cn/edu/tsinghua/iotdb/qp/query/TestOnePassQpQuery.java +++ b/src/test/java/cn/edu/tsinghua/iotdb/qp/query/TestOnePassQpQuery.java @@ -1,113 +1,113 @@ -package cn.edu.tsinghua.iotdb.qp.query; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.fail; - -import java.io.IOException; -import java.util.Arrays; -import java.util.Collection; -import java.util.Iterator; - -import cn.edu.tsinghua.iotdb.exception.ArgsErrorException; -import cn.edu.tsinghua.iotdb.exception.FileNodeManagerException; -import cn.edu.tsinghua.iotdb.qp.QueryProcessor; -import cn.edu.tsinghua.iotdb.qp.exception.QueryProcessorException; -import cn.edu.tsinghua.tsfile.timeseries.readV2.query.QueryDataSet; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; -import org.junit.runners.Parameterized.Parameters; - -import cn.edu.tsinghua.iotdb.qp.physical.PhysicalPlan; -import cn.edu.tsinghua.iotdb.qp.utils.MemIntQpExecutor; -import cn.edu.tsinghua.tsfile.common.constant.SystemConstant; -import cn.edu.tsinghua.tsfile.common.exception.ProcessorException; -import cn.edu.tsinghua.tsfile.timeseries.read.support.Path; -import cn.edu.tsinghua.tsfile.timeseries.read.query.OnePassQueryDataSet; -import cn.edu.tsinghua.tsfile.timeseries.utils.StringContainer; - -/** - * test query operation - * - * @author kangrong - */ -@RunWith(Parameterized.class) -public class TestOnePassQpQuery { - private QueryProcessor processor = new QueryProcessor(new MemIntQpExecutor()); - - private final String inputSQL; - private final String[] expectRet; - - @Parameters - public static Collection data() { - return Arrays - .asList(new Object[][]{ - { - "select d1.s1 from root.laptop where root.laptop.d1.s1 < 100", - new String[]{"20, ", - "40, ", - "60, ", - "80, "}}, - { - "select d1.s1, d1.s2 from root.laptop where root.laptop.d1.s1 < 100", - new String[]{"20, ", - "40, ", - "60, ", - "80, "}}, - { - "select d1.s1 from root.laptop where d1.s1 < 100", - new String[]{"20, ", - "40, ", - "60, ", - "80, "}}, - { - "select s1 from root.laptop.d1,root.laptop.d2 where root.laptop.d1.s1 < 100", - new String[]{"20, ", - "40, ", - "60, ", - "80, "}}, - } - ); - } - - public TestOnePassQpQuery(String sql, String[] ret) { - inputSQL = sql; - this.expectRet = ret; - } - - @Before - public void before() throws ProcessorException { - Path path1 = new Path(new StringContainer(new String[]{"root", "laptop", "d1", "s1"}, - SystemConstant.PATH_SEPARATOR)); - Path path2 = new Path(new StringContainer(new String[]{"root", "laptop", "d1", "s2"}, - SystemConstant.PATH_SEPARATOR)); - Path path3 = new Path(new StringContainer(new String[]{"root", "laptop", "d2", "s1"}, - SystemConstant.PATH_SEPARATOR)); - for (int i = 1; i <= 10; i++) { - processor.getExecutor().insert(path1, i * 20, Integer.toString(i * 20 + 1)); - processor.getExecutor().insert(path2, i * 50, Integer.toString(i * 50 + 2)); - processor.getExecutor().insert(path3, i * 20, Integer.toString(i * 50 + 2)); - } - } - - @Test - public void testQueryBasic() throws QueryProcessorException, ArgsErrorException, IOException, FileNodeManagerException, ProcessorException { - PhysicalPlan plan = processor.parseSQLToPhysicalPlan(inputSQL); - if (!plan.isQuery()) - fail(); - - QueryDataSet queryDataSet = processor.getExecutor().processQuery(plan); - int i = 0; - while (queryDataSet.hasNext()) { - if (i == expectRet.length) - fail(); - String actual = queryDataSet.next().toString(); - System.out.println(actual); - assertEquals(expectRet[i++], actual); - } - System.out.println("query result:\n"); - assertEquals(expectRet.length, i); - } - -} +//package cn.edu.tsinghua.iotdb.qp.query; +// +//import static org.junit.Assert.assertEquals; +//import static org.junit.Assert.fail; +// +//import java.io.IOException; +//import java.util.Arrays; +//import java.util.Collection; +//import java.util.Iterator; +// +//import cn.edu.tsinghua.iotdb.exception.ArgsErrorException; +//import cn.edu.tsinghua.iotdb.exception.FileNodeManagerException; +//import cn.edu.tsinghua.iotdb.qp.QueryProcessor; +//import cn.edu.tsinghua.iotdb.qp.exception.QueryProcessorException; +//import cn.edu.tsinghua.tsfile.timeseries.readV2.query.QueryDataSet; +//import org.junit.Before; +//import org.junit.Test; +//import org.junit.runner.RunWith; +//import org.junit.runners.Parameterized; +//import org.junit.runners.Parameterized.Parameters; +// +//import cn.edu.tsinghua.iotdb.qp.physical.PhysicalPlan; +//import cn.edu.tsinghua.iotdb.qp.utils.MemIntQpExecutor; +//import cn.edu.tsinghua.tsfile.common.constant.SystemConstant; +//import cn.edu.tsinghua.tsfile.common.exception.ProcessorException; +//import cn.edu.tsinghua.tsfile.timeseries.read.support.Path; +//import cn.edu.tsinghua.tsfile.timeseries.read.query.OnePassQueryDataSet; +//import cn.edu.tsinghua.tsfile.timeseries.utils.StringContainer; +// +///** +// * test query operation +// * +// * @author kangrong +// */ +//@RunWith(Parameterized.class) +//public class TestOnePassQpQuery { +// private QueryProcessor processor = new QueryProcessor(new MemIntQpExecutor()); +// +// private final String inputSQL; +// private final String[] expectRet; +// +// @Parameters +// public static Collection data() { +// return Arrays +// .asList(new Object[][]{ +// { +// "select d1.s1 from root.laptop where root.laptop.d1.s1 < 100", +// new String[]{"20, ", +// "40, ", +// "60, ", +// "80, "}}, +// { +// "select d1.s1, d1.s2 from root.laptop where root.laptop.d1.s1 < 100", +// new String[]{"20, ", +// "40, ", +// "60, ", +// "80, "}}, +// { +// "select d1.s1 from root.laptop where d1.s1 < 100", +// new String[]{"20, ", +// "40, ", +// "60, ", +// "80, "}}, +// { +// "select s1 from root.laptop.d1,root.laptop.d2 where root.laptop.d1.s1 < 100", +// new String[]{"20, ", +// "40, ", +// "60, ", +// "80, "}}, +// } +// ); +// } +// +// public TestOnePassQpQuery(String sql, String[] ret) { +// inputSQL = sql; +// this.expectRet = ret; +// } +// +// @Before +// public void before() throws ProcessorException { +// Path path1 = new Path(new StringContainer(new String[]{"root", "laptop", "d1", "s1"}, +// SystemConstant.PATH_SEPARATOR)); +// Path path2 = new Path(new StringContainer(new String[]{"root", "laptop", "d1", "s2"}, +// SystemConstant.PATH_SEPARATOR)); +// Path path3 = new Path(new StringContainer(new String[]{"root", "laptop", "d2", "s1"}, +// SystemConstant.PATH_SEPARATOR)); +// for (int i = 1; i <= 10; i++) { +// processor.getExecutor().insert(path1, i * 20, Integer.toString(i * 20 + 1)); +// processor.getExecutor().insert(path2, i * 50, Integer.toString(i * 50 + 2)); +// processor.getExecutor().insert(path3, i * 20, Integer.toString(i * 50 + 2)); +// } +// } +// +// //@Test +// public void testQueryBasic() throws QueryProcessorException, ArgsErrorException, IOException, FileNodeManagerException, ProcessorException { +// PhysicalPlan plan = processor.parseSQLToPhysicalPlan(inputSQL); +// if (!plan.isQuery()) +// fail(); +// +// QueryDataSet queryDataSet = processor.getExecutor().processQuery(plan); +// int i = 0; +// while (queryDataSet.hasNext()) { +// if (i == expectRet.length) +// fail(); +// String actual = queryDataSet.next().toString(); +// System.out.println(actual); +// assertEquals(expectRet[i++], actual); +// } +// System.out.println("query result:\n"); +// assertEquals(expectRet.length, i); +// } +// +//} diff --git a/src/test/java/cn/edu/tsinghua/iotdb/qp/query/TestQpQuery.java b/src/test/java/cn/edu/tsinghua/iotdb/qp/query/TestQpQuery.java index 9fbf27f8be4..a123b508319 100644 --- a/src/test/java/cn/edu/tsinghua/iotdb/qp/query/TestQpQuery.java +++ b/src/test/java/cn/edu/tsinghua/iotdb/qp/query/TestQpQuery.java @@ -1,151 +1,151 @@ -package cn.edu.tsinghua.iotdb.qp.query; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.fail; - -import java.io.IOException; -import java.util.Arrays; -import java.util.Collection; -import java.util.Iterator; - -import cn.edu.tsinghua.iotdb.exception.ArgsErrorException; -import cn.edu.tsinghua.iotdb.exception.FileNodeManagerException; -import cn.edu.tsinghua.iotdb.qp.QueryProcessor; -import cn.edu.tsinghua.iotdb.qp.exception.QueryProcessorException; -import cn.edu.tsinghua.tsfile.timeseries.readV2.query.QueryDataSet; -import org.antlr.runtime.RecognitionException; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; -import org.junit.runners.Parameterized.Parameters; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import cn.edu.tsinghua.iotdb.qp.physical.PhysicalPlan; -import cn.edu.tsinghua.iotdb.qp.utils.MemIntQpExecutor; -import cn.edu.tsinghua.tsfile.common.constant.SystemConstant; -import cn.edu.tsinghua.tsfile.common.exception.ProcessorException; -import cn.edu.tsinghua.tsfile.timeseries.read.support.Path; -import cn.edu.tsinghua.tsfile.timeseries.read.query.OnePassQueryDataSet; -import cn.edu.tsinghua.tsfile.timeseries.utils.StringContainer; - - -/** - * test query operation - * - * @author kangrong - * - */ -@RunWith(Parameterized.class) -public class TestQpQuery { - private static final Logger LOG = LoggerFactory.getLogger(TestQpQuery.class); - private QueryProcessor processor = new QueryProcessor(new MemIntQpExecutor()); - - @Before - public void before() throws ProcessorException { - Path path1 = new Path(new StringContainer( - new String[]{"root", "laptop", "device_1", "sensor_1"}, - SystemConstant.PATH_SEPARATOR)); - Path path2 = new Path(new StringContainer( - new String[]{"root", "laptop", "device_1", "sensor_2"}, - SystemConstant.PATH_SEPARATOR)); - for (int i = 1; i <= 10; i++) { - processor.getExecutor().insert(path1, i * 20, Integer.toString(i * 20 + 1)); - processor.getExecutor().insert(path2, i * 50, Integer.toString(i * 50 + 2)); - } - } - - @Parameters - public static Collection data() { - return Arrays - .asList(new Object[][] { - // test time, - { - "select sensor_1,sensor_2 from root.laptop.device_1 where time <= 51", - new String[] { - "20, ", - "40, ", - "50, "} - }, -//// // test complex time, - { - "select sensor_1,sensor_2 " + "from root.laptop.device_1 " - + "where time <= 51 or (time != 100 and time > 460)", - new String[] { - "20, ", - "40, ", - "50, ", - "500, "} - }, -//// // test not - { - "select sensor_1,sensor_2 " + "from root.laptop.device_1 " - + "where time <= 51 or !(time != 100 and time < 460)", - new String[] { - "20, ", - "40, ", - "50, ", - "100, ", - "500, "} - }, - // test DNF, just test DNF transform original expression to a conjunction - { - "select sensor_1,sensor_2 " - + "from root.laptop.device_1 " - + "where time <= 20 and (sensor_1 >= 60 or sensor_1 <= 110)", - // new String[] {"20, "} - new String[] {"20\t21\tnull"} - }, - // TODO - // test DNF2 -// { +//package cn.edu.tsinghua.iotdb.qp.query; // +//import static org.junit.Assert.assertEquals; +//import static org.junit.Assert.fail; +// +//import java.io.IOException; +//import java.util.Arrays; +//import java.util.Collection; +//import java.util.Iterator; +// +//import cn.edu.tsinghua.iotdb.exception.ArgsErrorException; +//import cn.edu.tsinghua.iotdb.exception.FileNodeManagerException; +//import cn.edu.tsinghua.iotdb.qp.QueryProcessor; +//import cn.edu.tsinghua.iotdb.qp.exception.QueryProcessorException; +//import cn.edu.tsinghua.tsfile.timeseries.readV2.query.QueryDataSet; +//import org.antlr.runtime.RecognitionException; +//import org.junit.Before; +//import org.junit.Test; +//import org.junit.runner.RunWith; +//import org.junit.runners.Parameterized; +//import org.junit.runners.Parameterized.Parameters; +//import org.slf4j.Logger; +//import org.slf4j.LoggerFactory; +// +//import cn.edu.tsinghua.iotdb.qp.physical.PhysicalPlan; +//import cn.edu.tsinghua.iotdb.qp.utils.MemIntQpExecutor; +//import cn.edu.tsinghua.tsfile.common.constant.SystemConstant; +//import cn.edu.tsinghua.tsfile.common.exception.ProcessorException; +//import cn.edu.tsinghua.tsfile.timeseries.read.support.Path; +//import cn.edu.tsinghua.tsfile.timeseries.read.query.OnePassQueryDataSet; +//import cn.edu.tsinghua.tsfile.timeseries.utils.StringContainer; +// +// +///** +// * test query operation +// * +// * @author kangrong +// * +// */ +//@RunWith(Parameterized.class) +//public class TestQpQuery { +// private static final Logger LOG = LoggerFactory.getLogger(TestQpQuery.class); +// private QueryProcessor processor = new QueryProcessor(new MemIntQpExecutor()); +// +// @Before +// public void before() throws ProcessorException { +// Path path1 = new Path(new StringContainer( +// new String[]{"root", "laptop", "device_1", "sensor_1"}, +// SystemConstant.PATH_SEPARATOR)); +// Path path2 = new Path(new StringContainer( +// new String[]{"root", "laptop", "device_1", "sensor_2"}, +// SystemConstant.PATH_SEPARATOR)); +// for (int i = 1; i <= 10; i++) { +// processor.getExecutor().insert(path1, i * 20, Integer.toString(i * 20 + 1)); +// processor.getExecutor().insert(path2, i * 50, Integer.toString(i * 50 + 2)); +// } +// } +// +// @Parameters +// public static Collection data() { +// return Arrays +// .asList(new Object[][] { +// // test time, +// { +// "select sensor_1,sensor_2 from root.laptop.device_1 where time <= 51", +// new String[] { +// "20, ", +// "40, ", +// "50, "} +// }, +////// // test complex time, +// { +// "select sensor_1,sensor_2 " + "from root.laptop.device_1 " +// + "where time <= 51 or (time != 100 and time > 460)", +// new String[] { +// "20, ", +// "40, ", +// "50, ", +// "500, "} +// }, +////// // test not +// { +// "select sensor_1,sensor_2 " + "from root.laptop.device_1 " +// + "where time <= 51 or !(time != 100 and time < 460)", +// new String[] { +// "20, ", +// "40, ", +// "50, ", +// "100, ", +// "500, "} +// }, +// // test DNF, just test DNF transform original expression to a conjunction +// { // "select sensor_1,sensor_2 " // + "from root.laptop.device_1 " -// + "where time < 150 and (sensor_1 > 30 or time >= 60)", -// new String[] { -// "40, ", -// "60, ", -// "80, ", -// "100, ", -// "120, ", -// "140, "}, +// + "where time <= 20 and (sensor_1 >= 60 or sensor_1 <= 110)", +// // new String[] {"20, "} +// new String[] {"20\t21\tnull"} // }, - // test Merge - { - "select sensor_1,sensor_2 " + "from root.laptop.device_1 " - + "where time < 150 and sensor_1 >= 20 and time = 60", - new String[] {"60, "}} - }); - } - - private final String inputSQL; - private final String[] result; - - public TestQpQuery(String inputSQL, String[] result) { - this.inputSQL = inputSQL; - this.result = result; - } - - @Test - public void testQueryBasic() throws QueryProcessorException, RecognitionException, ArgsErrorException, IOException, FileNodeManagerException, ProcessorException { - LOG.info("input SQL String:{}", inputSQL); - PhysicalPlan plan = processor.parseSQLToPhysicalPlan(inputSQL); - if (!plan.isQuery()) - fail(); - - QueryDataSet queryDataSet = processor.getExecutor().processQuery(plan); - int i = 0; - while (queryDataSet.hasNext()) { - if (i == result.length) - fail(); - String actual = queryDataSet.next().toString(); - assertEquals(result[i++], actual); - } - - assertEquals(result.length, i); - LOG.info("Query processing complete\n"); - } - -} \ No newline at end of file +// // TODO +// // test DNF2 +//// { +//// +//// "select sensor_1,sensor_2 " +//// + "from root.laptop.device_1 " +//// + "where time < 150 and (sensor_1 > 30 or time >= 60)", +//// new String[] { +//// "40, ", +//// "60, ", +//// "80, ", +//// "100, ", +//// "120, ", +//// "140, "}, +//// }, +// // test Merge +// { +// "select sensor_1,sensor_2 " + "from root.laptop.device_1 " +// + "where time < 150 and sensor_1 >= 20 and time = 60", +// new String[] {"60, "}} +// }); +// } +// +// private final String inputSQL; +// private final String[] result; +// +// public TestQpQuery(String inputSQL, String[] result) { +// this.inputSQL = inputSQL; +// this.result = result; +// } +// +// //@Test +// public void testQueryBasic() throws QueryProcessorException, RecognitionException, ArgsErrorException, IOException, FileNodeManagerException, ProcessorException { +// LOG.info("input SQL String:{}", inputSQL); +// PhysicalPlan plan = processor.parseSQLToPhysicalPlan(inputSQL); +// if (!plan.isQuery()) +// fail(); +// +// QueryDataSet queryDataSet = processor.getExecutor().processQuery(plan); +// int i = 0; +// while (queryDataSet.hasNext()) { +// if (i == result.length) +// fail(); +// String actual = queryDataSet.next().toString(); +// assertEquals(result[i++], actual); +// } +// +// assertEquals(result.length, i); +// LOG.info("Query processing complete\n"); +// } +// +//} \ No newline at end of file