update 0.8.0-SNAPSHOT

This commit is contained in:
qiaojialingithub 2018-09-05 10:28:03 +08:00
commit 8241a365bb
56 changed files with 1121 additions and 1434 deletions

View File

@ -22,8 +22,8 @@ To use IoTDB, you need to have:
1. Java >= 1.8
2. Maven >= 3.0 (If you want to compile and install IoTDB from source code)
3. TsFile >= 0.6.0 (TsFile Github page: [https://github.com/thulab/tsfile](https://github.com/thulab/tsfile))
4. IoTDB-JDBC >= 0.6.0 (IoTDB-JDBC Github page: [https://github.com/thulab/iotdb-jdbc](https://github.com/thulab/iotdb-jdbc))
3. TsFile >= 0.7.0 (TsFile Github page: [https://github.com/thulab/tsfile](https://github.com/thulab/tsfile))
4. IoTDB-JDBC >= 0.7.0 (IoTDB-JDBC Github page: [https://github.com/thulab/iotdb-jdbc](https://github.com/thulab/iotdb-jdbc))
TODO: TsFile and IoTDB-JDBC dependencies will be removed after Jialin Qiao re-structured the Project.
@ -125,7 +125,7 @@ The command line client is interactive so if everything is ready you should see
| | .--.|_/ | | \_| | | `. \ | |_) |
| | / .'`\ \ | | | | | | | __'.
_| |_| \__. | _| |_ _| |_.' /_| |__) |
|_____|'.__.' |_____| |______.'|_______/ version 0.6.0
|_____|'.__.' |_____| |______.'|_______/ version 0.7.0
IoTDB> login successfully

View File

@ -0,0 +1,9 @@
20000=未知错误
20001=语句中无变量
20002=无效的变量
20003=无法连接到服务器:%s (%s)
20061=验证失败:%S
20062=不安全的函数调用:%s
20064=M客户端内存溢出
20130=语句未就绪
20220=连接失败

View File

@ -0,0 +1,9 @@
20000=Unknown error
20001=No parameters exist in the statement
20002=Invalid parameter number
20003=Can't connect to server on %s(%s)
20061=Authentication failed: %s
20062=Insecure API function call: %s
20064=Client ran out of memory
20130=Statement not prepared
20220=Fail to connect.

View File

@ -4,7 +4,7 @@
<groupId>cn.edu.tsinghua</groupId>
<artifactId>IoTDB</artifactId>
<version>0.6.0</version>
<version>0.8.0-SNAPSHOT</version>
<packaging>jar</packaging>
<name>IoTDB</name>
@ -23,7 +23,7 @@
<dependency>
<groupId>cn.edu.tsinghua</groupId>
<artifactId>iotdb-jdbc</artifactId>
<version>0.6.0</version>
<version>0.8.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>cn.edu.fudan.dsm</groupId>

View File

@ -4,7 +4,7 @@ public class TsFileDBConstant {
public static final String ENV_FILE_NAME = "iotdb-env";
public static final String IOTDB_CONF = "IOTDB_CONF";
public static final String GLOBAL_DB_NAME = "IoTDB";
public static final String VERSION = "0.6.0";
public static final String VERSION = "0.7.0";
public static final String REMOTE_JMX_PORT_NAME = "com.sun.management.jmxremote.port";
public static final String TSFILEDB_LOCAL_JMX_PORT_NAME = "iotdb.jmx.local.port";
public static final String TSFILEDB_REMOTE_JMX_PORT_NAME = "iotdb.jmx.remote.port";

View File

@ -14,6 +14,7 @@ public class TsfileDBConfig {
public static final String default_tsfile_dir = "settled";
public static final String mult_dir_strategy_prefix = "cn.edu.tsinghua.iotdb.conf.directories.strategy.";
public static final String default_mult_dir_strategy = "MaxDiskUsableSpaceFirstStrategy";
/**
* Port which JDBC server listens to
*/
@ -246,6 +247,11 @@ public class TsfileDBConfig {
*/
public int postbackServerPort = 5555;
/*
* Set the language version when loading file including error information, default value is "EN"
* */
public String languageVersion = "EN";
/**
* Choose a postBack strategy of merging historical data: 1. It's more likely to
* update historical data, choose "true". 2. It's more likely not to update

View File

@ -149,6 +149,8 @@ public class TsfileDBDescriptor {
int maxLogEntrySize = Integer.parseInt(properties.getProperty("max_log_entry_size", conf.maxLogEntrySize + "").trim());
conf.maxLogEntrySize = maxLogEntrySize > 0 ? maxLogEntrySize : conf.maxLogEntrySize;
conf.languageVersion = properties.getProperty("language_version", conf.languageVersion).trim();
String tmpTimeZone = properties.getProperty("time_zone", conf.timeZone.getID());
try {
conf.timeZone = DateTimeZone.forID(tmpTimeZone.trim());

View File

@ -0,0 +1,62 @@
package cn.edu.tsinghua.iotdb.conf.directories.strategy;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.logging.Logger;
public class MinDirOccupiedSpaceFirstStrategy extends DirectoryStrategy {
// directory space is measured by MB
private final long DATA_SIZE_SHIFT = 1024 * 1024;
@Override
public int nextFolderIndex() {
return getMinOccupiedSpaceFolder();
}
private int getMinOccupiedSpaceFolder() {
List<Integer> candidates = new ArrayList<>();
long min = 0;
candidates.add(0);
min = getOccupiedSpace(folders.get(0));
for(int i = 1;i < folders.size();i++){
long current = getOccupiedSpace(folders.get(i));
if(min > current){
candidates.clear();
candidates.add(i);
min = current;
}
else if(min == current){
candidates.add(i);
}
}
Random random = new Random(System.currentTimeMillis());
int index = random.nextInt(candidates.size());
return candidates.get(index);
}
private long getOccupiedSpace(String path) {
Path folder = Paths.get(path);
long size = 0;
try {
size = Files.walk(folder)
.filter(p -> p.toFile().isFile())
.mapToLong(p -> p.toFile().length())
.sum();
} catch (IOException e) {
LOGGER.error("Cannot calculate occupied space for path {}.", path);
}
return size / DATA_SIZE_SHIFT;
}
}

View File

@ -46,7 +46,7 @@ public class MinFolderOccupiedSpaceFirstStrategy extends DirectoryStrategy {
private long getOccupiedSpace(String path) {
Path folder = Paths.get(path);
long size = 0;
long size = Long.MAX_VALUE;
try {
size = Files.walk(folder)
.filter(p -> p.toFile().isFile())

View File

@ -1,4 +1,4 @@
package cn.edu.tsinghua.iotdb.engine.bufferwriteV2;
package cn.edu.tsinghua.iotdb.engine.bufferwrite;
import java.io.IOException;
import java.util.ArrayList;

View File

@ -1,94 +0,0 @@
package cn.edu.tsinghua.iotdb.engine.bufferwrite;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import cn.edu.tsinghua.tsfile.common.utils.ITsRandomAccessFileWriter;
import cn.edu.tsinghua.tsfile.file.metadata.RowGroupMetaData;
import cn.edu.tsinghua.tsfile.file.metadata.TimeSeriesChunkMetaData;
import cn.edu.tsinghua.tsfile.file.metadata.enums.TSDataType;
import cn.edu.tsinghua.tsfile.timeseries.write.io.TsFileIOWriter;
/**
* @author kangrong
*
*/
public class BufferWriteIOWriter extends TsFileIOWriter {
/*
* The backup list is used to store the rowgroup's metadata whose data has
* been flushed into file.
*/
private final List<RowGroupMetaData> backUpList = new ArrayList<RowGroupMetaData>();
private int lastRowGroupIndex = 0;
public BufferWriteIOWriter(ITsRandomAccessFileWriter output) throws IOException {
super(output);
}
/**
* This is just used to restore a tsfile from the middle of the file
*
* @param schema
* @param output
* @param rowGroups
* @throws IOException
*/
public BufferWriteIOWriter(ITsRandomAccessFileWriter output, long offset, List<RowGroupMetaData> rowGroups)
throws IOException {
super(output, offset, rowGroups);
addrowGroupsTobackupList(rowGroups);
}
private void addrowGroupsTobackupList(List<RowGroupMetaData> rowGroups) {
for (RowGroupMetaData rowGroupMetaData : rowGroups) {
backUpList.add(rowGroupMetaData);
}
lastRowGroupIndex = rowGroups.size();
}
/**
* <b>Note that</b>,the method is not thread safe.
*/
public void addNewRowGroupMetaDataToBackUp() {
for (int i = lastRowGroupIndex; i < rowGroupMetaDatas.size(); i++) {
backUpList.add(rowGroupMetaDatas.get(i));
}
lastRowGroupIndex = rowGroupMetaDatas.size();
}
/**
* <b>Note that</b>, the method is not thread safe. You mustn't do any
* change on the return.<br>
*
* @return
*/
public List<RowGroupMetaData> getCurrentRowGroupMetaList(String deltaObjectId) {
List<RowGroupMetaData> ret = new ArrayList<>();
for (RowGroupMetaData rowGroupMetaData : backUpList) {
if (rowGroupMetaData.getDeltaObjectID().equals(deltaObjectId)) {
ret.add(rowGroupMetaData);
}
}
return ret;
}
public List<TimeSeriesChunkMetaData> getCurrentTimeSeriesMetadataList(String deltaObjectId, String measurementId,
TSDataType dataType) {
List<TimeSeriesChunkMetaData> chunkMetaDatas = new ArrayList<>();
for (RowGroupMetaData rowGroupMetaData : backUpList) {
if (rowGroupMetaData.getDeltaObjectID().equals(deltaObjectId)) {
for (TimeSeriesChunkMetaData chunkMetaData : rowGroupMetaData.getTimeSeriesChunkMetaDataList()) {
// filter data-type and measurementId
if (chunkMetaData.getProperties().getMeasurementUID().equals(measurementId)
&& chunkMetaData.getVInTimeSeriesChunkMetaData().getDataType().equals(dataType)) {
chunkMetaDatas.add(chunkMetaData);
}
}
}
}
return chunkMetaDatas;
}
}

View File

@ -1,36 +0,0 @@
package cn.edu.tsinghua.iotdb.engine.bufferwrite;
import cn.edu.tsinghua.tsfile.timeseries.read.query.DynamicOneColumnData;
import cn.edu.tsinghua.tsfile.timeseries.write.record.TSRecord;
/**
* The function of this interface is to store and index TSRecord in memory
* temporarily
*
* @author kangrong
*
*/
@Deprecated
public interface BufferWriteIndex {
/**
* insert a tsRecord
*
* @param tsRecord
*/
void insert(TSRecord tsRecord);
/**
* Get the DynamicOneColumnData from the buffer index
*
* @param deltaObjectId
* @param measurementId
* @return
*/
public DynamicOneColumnData query(String deltaObjectId, String measurementId);
/**
* clear all data written in the bufferindex structure which will be used
* for next stage
*/
void clear();
}

View File

@ -1,14 +1,7 @@
package cn.edu.tsinghua.iotdb.engine.bufferwrite;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Future;
@ -29,7 +22,6 @@ import cn.edu.tsinghua.iotdb.engine.Processor;
import cn.edu.tsinghua.iotdb.engine.memcontrol.BasicMemController;
import cn.edu.tsinghua.iotdb.engine.memtable.IMemTable;
import cn.edu.tsinghua.iotdb.engine.memtable.MemSeriesLazyMerger;
import cn.edu.tsinghua.iotdb.engine.memtable.MemTableFlushUtil;
import cn.edu.tsinghua.iotdb.engine.memtable.PrimitiveMemTable;
import cn.edu.tsinghua.iotdb.engine.pool.FlushManager;
import cn.edu.tsinghua.iotdb.engine.querycontext.RawSeriesChunk;
@ -37,75 +29,50 @@ import cn.edu.tsinghua.iotdb.engine.querycontext.RawSeriesChunkLazyLoadImpl;
import cn.edu.tsinghua.iotdb.engine.utils.FlushStatus;
import cn.edu.tsinghua.iotdb.exception.BufferWriteProcessorException;
import cn.edu.tsinghua.iotdb.utils.MemUtils;
import cn.edu.tsinghua.tsfile.common.conf.TSFileConfig;
import cn.edu.tsinghua.tsfile.common.conf.TSFileDescriptor;
import cn.edu.tsinghua.tsfile.common.utils.BytesUtils;
import cn.edu.tsinghua.tsfile.common.utils.ITsRandomAccessFileWriter;
import cn.edu.tsinghua.tsfile.common.utils.Pair;
import cn.edu.tsinghua.tsfile.common.utils.TsRandomAccessFileWriter;
import cn.edu.tsinghua.tsfile.file.metadata.RowGroupMetaData;
import cn.edu.tsinghua.tsfile.file.metadata.TimeSeriesChunkMetaData;
import cn.edu.tsinghua.tsfile.file.metadata.TsRowGroupBlockMetaData;
import cn.edu.tsinghua.tsfile.file.metadata.enums.TSDataType;
import cn.edu.tsinghua.tsfile.file.utils.ReadWriteThriftFormatUtils;
import cn.edu.tsinghua.tsfile.format.RowGroupBlockMetaData;
import cn.edu.tsinghua.tsfile.timeseries.write.record.DataPoint;
import cn.edu.tsinghua.tsfile.timeseries.write.record.TSRecord;
import cn.edu.tsinghua.tsfile.timeseries.write.schema.FileSchema;
/**
* @author liukun
*/
public class BufferWriteProcessor extends Processor {
private static final Logger LOGGER = LoggerFactory.getLogger(BufferWriteProcessor.class);
private static final TsfileDBConfig TsFileDBConf = TsfileDBDescriptor.getInstance().getConfig();
private static final TSFileConfig TsFileConf = TSFileDescriptor.getInstance().getConfig();
private static final int TS_METADATA_BYTE_SIZE = 4;
private static final int TSFILE_POSITION_BYTE_SIZE = 8;
private FileSchema fileSchema;
private BufferWriteResource bufferWriteResource;
private volatile FlushStatus flushStatus = new FlushStatus();
private volatile boolean isFlush;
private ReentrantLock flushQueryLock = new ReentrantLock();
private AtomicLong memSize = new AtomicLong();
private long memThreshold = TSFileDescriptor.getInstance().getConfig().groupSizeInByte;
private IMemTable workMemTable;
private IMemTable flushMemTable;
private FileSchema fileSchema;
private BufferWriteIOWriter bufferIOWriter;
private int lastRowgroupSize = 0;
// this just the bufferwrite file name
private String baseDir;
private String fileName;
private static final String restoreFile = ".restore";
// this is the bufferwrite file absolute path
private String bufferwriteRestoreFilePath;
private String bufferwriteOutputFilePath;
private String bufferwriterelativePath;
private File bufferwriteOutputFile;
private boolean isNewProcessor = false;
private Action bufferwriteFlushAction = null;
private Action bufferwriteCloseAction = null;
private Action filenodeFlushAction = null;
private long memThreshold = TsFileConf.groupSizeInByte;
private long lastFlushTime = -1;
private long valueCount = 0;
private volatile boolean isFlush;
private AtomicLong memSize = new AtomicLong();
private String baseDir;
private String fileName;
private String insertFilePath;
private String bufferwriterelativePath;
private WriteLogNode logNode;
public BufferWriteProcessor(String baseDir, String processorName, String fileName, Map<String, Object> parameters,
FileSchema fileSchema) throws BufferWriteProcessorException {
super(processorName);
this.fileName = fileName;
String restoreFileName = fileName + restoreFile;
this.fileSchema = fileSchema;
this.baseDir = baseDir;
this.fileName = fileName;
if (baseDir.length() > 0
&& baseDir.charAt(baseDir.length() - 1) != File.separatorChar) {
baseDir = baseDir + File.separatorChar;
@ -116,50 +83,21 @@ public class BufferWriteProcessor extends Processor {
dataDir.mkdirs();
LOGGER.debug("The bufferwrite processor data dir doesn't exists, create new directory {}.", dataDirPath);
}
bufferwriteOutputFile = new File(dataDir, fileName);
File restoreFile = new File(dataDir, restoreFileName);
bufferwriteRestoreFilePath = restoreFile.getPath();
bufferwriteOutputFilePath = bufferwriteOutputFile.getPath();
this.insertFilePath = new File(dataDir, fileName).getPath();
bufferwriterelativePath = processorName + File.separatorChar + fileName;
// get the fileschema
this.fileSchema = fileSchema;
if (bufferwriteOutputFile.exists() && restoreFile.exists()) {
//
// There is one damaged file, and the RESTORE_FILE_SUFFIX exist
//
LOGGER.info("Recorvery the bufferwrite processor {}.", processorName);
bufferwriteRecovery();
} else {
ITsRandomAccessFileWriter outputWriter;
try {
outputWriter = new TsRandomAccessFileWriter(bufferwriteOutputFile);
} catch (IOException e) {
LOGGER.error("Construct the TSRandomAccessFileWriter error, the absolutePath is {}.",
bufferwriteOutputFile.getPath(), e);
throw new BufferWriteProcessorException(e);
}
try {
bufferIOWriter = new BufferWriteIOWriter(outputWriter);
} catch (IOException e) {
LOGGER.error("Get the BufferWriteIOWriter error, the bufferwrite is {}.", processorName, e);
throw new BufferWriteProcessorException(e);
}
isNewProcessor = true;
// write restore file
writeStoreToDisk();
try {
bufferWriteResource = new BufferWriteResource(processorName, insertFilePath);
} catch (IOException e) {
throw new BufferWriteProcessorException(e);
}
// init action
// the action from the corresponding filenode processor
bufferwriteFlushAction = (Action) parameters.get(FileNodeConstants.BUFFERWRITE_FLUSH_ACTION);
bufferwriteCloseAction = (Action) parameters.get(FileNodeConstants.BUFFERWRITE_CLOSE_ACTION);
filenodeFlushAction = (Action) parameters.get(FileNodeConstants.FILENODE_PROCESSOR_FLUSH_ACTION);
workMemTable = new PrimitiveMemTable();
if (TsfileDBDescriptor.getInstance().getConfig().enableWal) {
if(TsfileDBDescriptor.getInstance().getConfig().enableWal) {
try {
logNode = MultiFileLogNodeManager.getInstance().getNode(
processorName + TsFileDBConstant.BUFFERWRITE_LOG_NODE_SUFFIX, getBufferwriteRestoreFilePath(),
@ -170,256 +108,6 @@ public class BufferWriteProcessor extends Processor {
}
}
/**
* <p>
* Recovery the bufferwrite status.<br>
* The one part is the last intervalFile<br>
* The other part is all the intervalFile, and other file will be deleted
* </p>
*
* @throws BufferWriteProcessorException
*/
private void bufferwriteRecovery() throws BufferWriteProcessorException {
Pair<Long, List<RowGroupMetaData>> pair;
try {
pair = readStoreFromDisk();
} catch (IOException e) {
LOGGER.error("Failed to read bufferwrite {} restore file.", getProcessorName());
throw new BufferWriteProcessorException(e);
}
ITsRandomAccessFileWriter output;
long lastFlushPosition = pair.left;
File lastBufferWriteFile = new File(bufferwriteOutputFilePath);
if (lastBufferWriteFile.length() != lastFlushPosition) {
LOGGER.warn(
"The last bufferwrite file {} is damaged, the length of the last bufferwrite file is {}, the end of last successful flush is {}.",
lastBufferWriteFile.getPath(), lastBufferWriteFile.length(), lastFlushPosition);
try {
FileChannel fileChannel = new FileOutputStream(bufferwriteOutputFile, true).getChannel();
fileChannel.truncate(lastFlushPosition);
fileChannel.close();
//cutOffFile(lastFlushPosition);
} catch (IOException e) {
LOGGER.error(
"Cut off damaged file error, the damaged file path is {}, the length is {}, the cut off length is {}.",
bufferwriteOutputFilePath, lastBufferWriteFile.length(), lastFlushPosition, e);
throw new BufferWriteProcessorException(e);
}
}
try {
// Notice: the offset is seek to end of the file by API of kr
output = new TsRandomAccessFileWriter(lastBufferWriteFile);
} catch (IOException e) {
LOGGER.error("Can't construct the RandomAccessOutputStream, the outputPath is {}.",
bufferwriteOutputFilePath);
throw new BufferWriteProcessorException(e);
}
try {
// Notice: the parameter of lastPosition is not used beacuse of the
// API of kr
bufferIOWriter = new BufferWriteIOWriter(output, lastFlushPosition, pair.right);
} catch (IOException e) {
LOGGER.error("Can't get the BufferWriteIOWriter while recoverying, the bufferwrite processor is {}.",
getProcessorName(), e);
throw new BufferWriteProcessorException(e);
}
isNewProcessor = false;
}
private void cutOffFile(long length) throws IOException {
String tempPath = bufferwriteOutputFilePath + ".backup";
File tempFile = new File(tempPath);
File normalFile = new File(bufferwriteOutputFilePath);
if (normalFile.exists() && normalFile.length() > 0) {
if (tempFile.exists()) {
tempFile.delete();
}
RandomAccessFile normalReader = null;
RandomAccessFile tempWriter = null;
try {
normalReader = new RandomAccessFile(normalFile, "r");
tempWriter = new RandomAccessFile(tempFile, "rw");
} catch (FileNotFoundException e) {
LOGGER.error(
"Can't get the RandomAccessFile read and write, the normal path is {}, the temp path is {}.",
bufferwriteOutputFilePath, tempPath);
if (normalReader != null) {
normalReader.close();
}
if (tempWriter != null) {
tempWriter.close();
}
throw e;
}
long offset = 0;
int step = 4 * 1024 * 1024;
byte[] buff = new byte[step];
while (length - offset >= step) {
try {
normalReader.readFully(buff);
tempWriter.write(buff);
} catch (IOException e) {
LOGGER.error("normalReader read data failed or tempWriter write data error.");
throw e;
}
offset = offset + step;
}
normalReader.readFully(buff, 0, (int) (length - offset));
tempWriter.write(buff, 0, (int) (length - offset));
normalReader.close();
tempWriter.close();
}
normalFile.delete();
tempFile.renameTo(normalFile);
}
/**
* This is only used after flush one rowroup data successfully.
*
* @throws BufferWriteProcessorException
*/
private void writeStoreToDisk() throws BufferWriteProcessorException {
long lastPosition;
try {
lastPosition = bufferIOWriter.getPos();
} catch (IOException e) {
LOGGER.error("Can't get the bufferwrite io position, the buffewrite processor is {}", getProcessorName(),
e);
throw new BufferWriteProcessorException(e);
}
List<RowGroupMetaData> rowGroupMetaDatas = bufferIOWriter.getRowGroups();
List<RowGroupMetaData> appendMetadata = new ArrayList<>();
for (int i = lastRowgroupSize; i < rowGroupMetaDatas.size(); i++) {
appendMetadata.add(rowGroupMetaDatas.get(i));
}
lastRowgroupSize = rowGroupMetaDatas.size();
TsRowGroupBlockMetaData tsRowGroupBlockMetaData = new TsRowGroupBlockMetaData();
tsRowGroupBlockMetaData.setRowGroups(appendMetadata);
RandomAccessFile out = null;
try {
out = new RandomAccessFile(bufferwriteRestoreFilePath, "rw");
} catch (FileNotFoundException e) {
LOGGER.error("The restore file {} can't be created, the bufferwrite processor is {}",
bufferwriteRestoreFilePath, getProcessorName(), e);
throw new BufferWriteProcessorException(e);
}
try {
if (out.length() > 0) {
out.seek(out.length() - TSFILE_POSITION_BYTE_SIZE);
}
ByteArrayOutputStream baos = new ByteArrayOutputStream();
ReadWriteThriftFormatUtils.writeRowGroupBlockMetadata(tsRowGroupBlockMetaData.convertToThrift(), baos);
// write metadata size using int
int metadataSize = baos.size();
out.write(BytesUtils.intToBytes(metadataSize));
// write metadata
out.write(baos.toByteArray());
// write tsfile position using byte[8] which is present one long
// number
byte[] lastPositionBytes = BytesUtils.longToBytes(lastPosition);
out.write(lastPositionBytes);
} catch (IOException e) {
throw new BufferWriteProcessorException(e);
} finally {
if (out != null) {
try {
out.close();
} catch (IOException e) {
e.printStackTrace();
throw new BufferWriteProcessorException(e);
}
}
}
}
/**
* This is used to delete the file which is used to restore buffer write
* processor. This is only used after closing the buffer write processor
* successfully.
*/
private void deleteRestoreFile() {
File restoreFile = new File(bufferwriteRestoreFilePath);
if (restoreFile.exists()) {
restoreFile.delete();
}
}
/**
* The left of the pair is the last successful flush position. The right of
* the pair is the rowGroupMetadata.
*
* @return - the left is the end position of the last rowgroup flushed, the
* right is all the rowgroup meatdata flushed
* @throws IOException
*/
private Pair<Long, List<RowGroupMetaData>> readStoreFromDisk() throws IOException {
byte[] lastPostionBytes = new byte[TSFILE_POSITION_BYTE_SIZE];
List<RowGroupMetaData> groupMetaDatas = new ArrayList<>();
RandomAccessFile randomAccessFile = null;
try {
randomAccessFile = new RandomAccessFile(bufferwriteRestoreFilePath, "rw");
long fileLength = randomAccessFile.length();
// read tsfile position
long point = randomAccessFile.getFilePointer();
while (point + TSFILE_POSITION_BYTE_SIZE < fileLength) {
byte[] metadataSizeBytes = new byte[TS_METADATA_BYTE_SIZE];
randomAccessFile.read(metadataSizeBytes);
int metadataSize = BytesUtils.bytesToInt(metadataSizeBytes);
byte[] thriftBytes = new byte[metadataSize];
randomAccessFile.read(thriftBytes);
ByteArrayInputStream inputStream = new ByteArrayInputStream(thriftBytes);
RowGroupBlockMetaData rowGroupBlockMetaData = ReadWriteThriftFormatUtils
.readRowGroupBlockMetaData(inputStream);
TsRowGroupBlockMetaData blockMeta = new TsRowGroupBlockMetaData();
blockMeta.convertToTSF(rowGroupBlockMetaData);
groupMetaDatas.addAll(blockMeta.getRowGroups());
lastRowgroupSize = groupMetaDatas.size();
point = randomAccessFile.getFilePointer();
}
// read the tsfile position information using byte[8] which is
// present one long number.
randomAccessFile.read(lastPostionBytes);
} catch (FileNotFoundException e) {
LOGGER.error("The restore file does not exist, the restore file path is {}.", bufferwriteRestoreFilePath,
e);
throw e;
} catch (IOException e) {
LOGGER.error("Read data from file error.", e);
throw e;
} finally {
if (randomAccessFile != null) {
randomAccessFile.close();
}
}
long lastPostion = BytesUtils.bytesToLong(lastPostionBytes);
Pair<Long, List<RowGroupMetaData>> result = new Pair<Long, List<RowGroupMetaData>>(lastPostion, groupMetaDatas);
return result;
}
public String getFileName() {
return fileName;
}
public String getBaseDir() { return baseDir; }
public String getFileRelativePath() {
return bufferwriterelativePath;
}
public boolean isNewProcessor() {
return isNewProcessor;
}
public void setNewProcessor(boolean isNewProcessor) {
this.isNewProcessor = isNewProcessor;
}
/**
* write one data point to the bufferwrite
*
@ -443,7 +131,6 @@ public class BufferWriteProcessor extends Processor {
public boolean write(TSRecord tsRecord) throws BufferWriteProcessorException {
long memUage = MemUtils.getRecordSize(tsRecord);
BasicMemController.UsageLevel level = BasicMemController.getInstance().reportUse(this, memUage);
for (DataPoint dataPoint : tsRecord.dataPointList) {
workMemTable.write(tsRecord.deltaObjectId, dataPoint.getMeasurementId(), dataPoint.getType(), tsRecord.time,
dataPoint.getValue().toString());
@ -490,18 +177,6 @@ public class BufferWriteProcessor extends Processor {
}
}
@Deprecated
public Pair<List<Object>, List<RowGroupMetaData>> queryBufferwriteData(String deltaObjectId, String measurementId) {
flushQueryLock.lock();
try {
List<Object> memData = new ArrayList<>();
List<RowGroupMetaData> list = new ArrayList<>();
return new Pair<>(memData, list);
} finally {
flushQueryLock.unlock();
}
}
public Pair<RawSeriesChunk, List<TimeSeriesChunkMetaData>> queryBufferwriteData(String deltaObjectId,
String measurementId, TSDataType dataType) {
flushQueryLock.lock();
@ -513,7 +188,7 @@ public class BufferWriteProcessor extends Processor {
memSeriesLazyMerger.addMemSeries(workMemTable.query(deltaObjectId, measurementId, dataType));
RawSeriesChunk rawSeriesChunk = new RawSeriesChunkLazyLoadImpl(dataType, memSeriesLazyMerger);
return new Pair<>(rawSeriesChunk,
bufferIOWriter.getCurrentTimeSeriesMetadataList(deltaObjectId, measurementId, dataType));
bufferWriteResource.getInsertMetadatas(deltaObjectId, measurementId, dataType));
} finally {
flushQueryLock.unlock();
}
@ -532,12 +207,12 @@ public class BufferWriteProcessor extends Processor {
}
}
private void switchFlushToWork() {
private void swithFlushToWork() {
flushQueryLock.lock();
try {
flushMemTable.clear();
flushMemTable = null;
bufferIOWriter.addNewRowGroupMetaDataToBackUp();
bufferWriteResource.appendMetadata();
} finally {
isFlush = false;
flushQueryLock.unlock();
@ -548,20 +223,7 @@ public class BufferWriteProcessor extends Processor {
long flushStartTime = System.currentTimeMillis();
LOGGER.info("The bufferwrite processor {} starts flushing {}.", getProcessorName(), flushFunction);
try {
long startFlushDataTime = System.currentTimeMillis();
long startPos = bufferIOWriter.getPos();
// TODO : FLUSH DATA
MemTableFlushUtil.flushMemTable(fileSchema, bufferIOWriter, flushMemTable);
long flushDataSize = bufferIOWriter.getPos() - startPos;
long timeInterval = System.currentTimeMillis() - startFlushDataTime;
if (timeInterval == 0) {
timeInterval = 1;
}
LOGGER.info("The bufferwrite processor {} flush {}, actual:{}, time consumption:{} ms, flush rate:{}/s",
getProcessorName(), flushFunction, MemUtils.bytesCntToStr(flushDataSize), timeInterval,
MemUtils.bytesCntToStr(flushDataSize / timeInterval * 1000));
// write restore information
writeStoreToDisk();
bufferWriteResource.flush(fileSchema, flushMemTable);
filenodeFlushAction.act();
if (TsfileDBDescriptor.getInstance().getConfig().enableWal) {
logNode.notifyEndFlush(null);
@ -574,13 +236,11 @@ public class BufferWriteProcessor extends Processor {
} finally {
synchronized (flushStatus) {
flushStatus.setUnFlushing();
switchFlushToWork();
swithFlushToWork();
flushStatus.notify();
LOGGER.info("The bufferwrite processor {} ends flushing {}.", getProcessorName(), flushFunction);
}
}
// BasicMemController.getInstance().reportFree(BufferWriteProcessor.this,
// oldMemUsage);
long flushEndTime = System.currentTimeMillis();
long flushInterval = flushEndTime - flushStartTime;
DateTime startDateTime = new DateTime(flushStartTime, TsfileDBDescriptor.getInstance().getConfig().timeZone);
@ -620,9 +280,7 @@ public class BufferWriteProcessor extends Processor {
try {
bufferwriteFlushAction.act();
} catch (Exception e) {
LOGGER.error(
"The bufferwrite processor {} failed to flush bufferwrite row group when calling the action function.",
getProcessorName());
LOGGER.error("Failed to flush bufferwrite row group when calling the action function.");
throw new IOException(e);
}
if (TsfileDBDescriptor.getInstance().getConfig().enableWal) {
@ -661,17 +319,6 @@ public class BufferWriteProcessor extends Processor {
@Override
public boolean canBeClosed() {
LOGGER.info("Check whether bufferwrite processor {} can be closed.", getProcessorName());
synchronized (flushStatus) {
while (flushStatus.isFlushing()) {
LOGGER.info("The bufferite processor {} is flushing, waiting for the flush end.", getProcessorName());
try {
flushStatus.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
return true;
}
@ -682,13 +329,12 @@ public class BufferWriteProcessor extends Processor {
// flush data
flush(true);
// end file
bufferIOWriter.endFile(fileSchema);
bufferWriteResource.close(fileSchema);
// update the intervalfile for interval list
bufferwriteCloseAction.act();
// flush the changed information for filenode
filenodeFlushAction.act();
// delete the restore for this bufferwrite processor
deleteRestoreFile();
long closeEndTime = System.currentTimeMillis();
long closeInterval = closeEndTime - closeStartTime;
DateTime startDateTime = new DateTime(closeStartTime,
@ -724,7 +370,9 @@ public class BufferWriteProcessor extends Processor {
* @throws IOException
*/
public long getFileSize() {
return bufferwriteOutputFile.length() + memoryUsage();
// TODO : save this variable to avoid object creation?
File file = new File(insertFilePath);
return file.length() + memoryUsage();
}
/**
@ -758,11 +406,29 @@ public class BufferWriteProcessor extends Processor {
return false;
}
public String getBaseDir() { return baseDir; }
public String getFileName() {
return fileName;
}
public String getFileRelativePath() {
return bufferwriterelativePath;
}
private String getBufferwriteRestoreFilePath() {
return bufferWriteResource.getRestoreFilePath();
}
public boolean isNewProcessor() {
return bufferWriteResource.isNewResource();
}
public void setNewProcessor(boolean isNewProcessor) {
bufferWriteResource.setNewResource(isNewProcessor);
}
public WriteLogNode getLogNode() {
return logNode;
}
public String getBufferwriteRestoreFilePath() {
return bufferwriteRestoreFilePath;
}
}

View File

@ -1,4 +1,4 @@
package cn.edu.tsinghua.iotdb.engine.bufferwriteV2;
package cn.edu.tsinghua.iotdb.engine.bufferwrite;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
@ -39,18 +39,20 @@ public class BufferWriteResource {
private static final String restoreSuffix = ".restore";
private static final String DEFAULT_MODE = "rw";
private Map<String, Map<String, List<TimeSeriesChunkMetaData>>> metadatas;
private List<RowGroupMetaData> appendRowGroupMetadats;
private List<RowGroupMetaData> appendRowGroupMetadatas;
private BufferIO bufferWriteIO;
private String insertFilePath;
private String restoreFilePath;
private String processorName;
private boolean isNewResource = false;
public BufferWriteResource(String processorName, String insertFilePath) throws IOException {
this.insertFilePath = insertFilePath;
this.restoreFilePath = insertFilePath + restoreSuffix;
this.processorName = processorName;
this.metadatas = new HashMap<>();
this.appendRowGroupMetadats = new ArrayList<>();
this.appendRowGroupMetadatas = new ArrayList<>();
recover();
}
@ -69,14 +71,17 @@ public class BufferWriteResource {
//cutOffFile(position);
// recovery the BufferWriteIO
bufferWriteIO = new BufferIO(new TsRandomAccessFileWriter(insertFile), position, metadatas);
recoverMetadata(metadatas);
LOGGER.info(
"Recover the bufferwrite processor {}, the tsfile path is {}, the position of last flush is {}, the size of rowGroupMetadata is {}",
processorName, insertFilePath, position, metadatas.size());
isNewResource = false;
} else {
insertFile.delete();
restoreFile.delete();
bufferWriteIO = new BufferIO(new TsRandomAccessFileWriter(insertFile), 0, new ArrayList<>());
isNewResource = true;
writeRestoreInfo();
}
}
@ -229,6 +234,14 @@ public class BufferWriteResource {
return restoreFilePath;
}
public boolean isNewResource() {
return isNewResource;
}
public void setNewResource(boolean isNewResource) {
this.isNewResource = isNewResource;
}
public void flush(FileSchema fileSchema, IMemTable iMemTable) throws IOException {
if (iMemTable != null && !iMemTable.isEmpty()) {
long startPos = bufferWriteIO.getPos();
@ -244,19 +257,19 @@ public class BufferWriteResource {
"Bufferwrite processor {} flushes insert data, actual:{}, time consumption:{} ms, flush rate:{}/s",
processorName, MemUtils.bytesCntToStr(insertSize), timeInterval,
MemUtils.bytesCntToStr(insertSize / timeInterval * 1000));
appendRowGroupMetadats.addAll(bufferWriteIO.getAppendedRowGroupMetadata());
appendRowGroupMetadatas.addAll(bufferWriteIO.getAppendedRowGroupMetadata());
}
}
public void appendMetadata() {
if (!appendRowGroupMetadats.isEmpty()) {
for (RowGroupMetaData rowGroupMetaData : appendRowGroupMetadats) {
if (!appendRowGroupMetadatas.isEmpty()) {
for (RowGroupMetaData rowGroupMetaData : appendRowGroupMetadatas) {
for (TimeSeriesChunkMetaData chunkMetaData : rowGroupMetaData.getTimeSeriesChunkMetaDataList()) {
addInsertMetadata(rowGroupMetaData.getDeltaObjectID(),
chunkMetaData.getProperties().getMeasurementUID(), chunkMetaData);
}
}
appendRowGroupMetadats.clear();
appendRowGroupMetadatas.clear();
}
}

View File

@ -1,78 +0,0 @@
package cn.edu.tsinghua.iotdb.engine.bufferwrite;
import java.util.HashMap;
import java.util.Map;
import cn.edu.tsinghua.tsfile.common.utils.Binary;
import cn.edu.tsinghua.tsfile.timeseries.read.query.DynamicOneColumnData;
import cn.edu.tsinghua.tsfile.timeseries.write.record.DataPoint;
import cn.edu.tsinghua.tsfile.timeseries.write.record.TSRecord;
/**
* Implement the {@code BufferWriteIndex}<br>
* This class is used to store bufferwrite data in memory, also index data
* easily<br>
*
* @author kangrong
* @author liukun
*
*/
@Deprecated
public class MemoryBufferWriteIndexImpl implements BufferWriteIndex {
private Map<String, DynamicOneColumnData> indexMap;
public MemoryBufferWriteIndexImpl() {
indexMap = new HashMap<String, DynamicOneColumnData>();
}
@Override
public DynamicOneColumnData query(String deltaObjectId, String measurementId) {
return indexMap.get(concatPath(deltaObjectId, measurementId));
}
@Override
public void insert(TSRecord tsRecord) {
String deltaObjectId = tsRecord.deltaObjectId;
for (DataPoint dp : tsRecord.dataPointList) {
String measurementId = dp.getMeasurementId();
String path = concatPath(deltaObjectId, measurementId);
if (!indexMap.containsKey(path))
indexMap.put(path, new DynamicOneColumnData(dp.getType(), true));
DynamicOneColumnData deltaMea = indexMap.get(path);
Object value = dp.getValue();
deltaMea.putTime(tsRecord.time);
switch (dp.getType()) {
case INT32:
deltaMea.putInt((Integer) value);
break;
case INT64:
deltaMea.putLong((Long) value);
break;
case FLOAT:
deltaMea.putFloat((Float) value);
break;
case DOUBLE:
deltaMea.putDouble((Double) value);
break;
case TEXT:
deltaMea.putBinary((Binary) value);
break;
case BOOLEAN:
deltaMea.putBoolean((boolean) value);
break;
default:
throw new UnsupportedOperationException("no support type:" + dp.getType() + "record:" + tsRecord);
}
}
}
@Override
public void clear() {
indexMap.clear();
}
private String concatPath(String deltaObjectId, String measurementId) {
return deltaObjectId + FileNodeConstants.PATH_SEPARATOR + measurementId;
}
}

View File

@ -1,411 +0,0 @@
package cn.edu.tsinghua.iotdb.engine.bufferwriteV2;
import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import cn.edu.tsinghua.iotdb.conf.TsFileDBConstant;
import cn.edu.tsinghua.iotdb.engine.filenode.FileNodeManager;
import cn.edu.tsinghua.iotdb.writelog.manager.MultiFileLogNodeManager;
import cn.edu.tsinghua.iotdb.writelog.node.WriteLogNode;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import cn.edu.tsinghua.iotdb.conf.TsfileDBConfig;
import cn.edu.tsinghua.iotdb.conf.TsfileDBDescriptor;
import cn.edu.tsinghua.iotdb.engine.Processor;
import cn.edu.tsinghua.iotdb.engine.bufferwrite.Action;
import cn.edu.tsinghua.iotdb.engine.bufferwrite.FileNodeConstants;
import cn.edu.tsinghua.iotdb.engine.memcontrol.BasicMemController;
import cn.edu.tsinghua.iotdb.engine.memtable.IMemTable;
import cn.edu.tsinghua.iotdb.engine.memtable.MemSeriesLazyMerger;
import cn.edu.tsinghua.iotdb.engine.memtable.PrimitiveMemTable;
import cn.edu.tsinghua.iotdb.engine.pool.FlushManager;
import cn.edu.tsinghua.iotdb.engine.querycontext.RawSeriesChunk;
import cn.edu.tsinghua.iotdb.engine.querycontext.RawSeriesChunkLazyLoadImpl;
import cn.edu.tsinghua.iotdb.engine.utils.FlushStatus;
import cn.edu.tsinghua.iotdb.exception.BufferWriteProcessorException;
import cn.edu.tsinghua.iotdb.utils.MemUtils;
import cn.edu.tsinghua.tsfile.common.conf.TSFileDescriptor;
import cn.edu.tsinghua.tsfile.common.exception.ProcessorException;
import cn.edu.tsinghua.tsfile.common.utils.Pair;
import cn.edu.tsinghua.tsfile.file.metadata.TimeSeriesChunkMetaData;
import cn.edu.tsinghua.tsfile.file.metadata.enums.TSDataType;
import cn.edu.tsinghua.tsfile.timeseries.write.record.DataPoint;
import cn.edu.tsinghua.tsfile.timeseries.write.record.TSRecord;
import cn.edu.tsinghua.tsfile.timeseries.write.schema.FileSchema;
public class BufferWriteProcessor extends Processor {
private static final Logger LOGGER = LoggerFactory.getLogger(BufferWriteProcessor.class);
private FileSchema fileSchema;
private BufferWriteResource bufferWriteResource;
private volatile FlushStatus flushStatus = new FlushStatus();
private volatile boolean isFlush;
private ReentrantLock flushQueryLock = new ReentrantLock();
private AtomicLong memSize = new AtomicLong();
private long memThreshold = TSFileDescriptor.getInstance().getConfig().groupSizeInByte;
private IMemTable workMemTable;
private IMemTable flushMemTable;
private Action bufferwriteFlushAction = null;
private Action bufferwriteCloseAction = null;
private Action filenodeFlushAction = null;
private long lastFlushTime = -1;
private long valueCount = 0;
private String baseDir;
private String fileName;
private String insertFilePath;
private WriteLogNode logNode;
public BufferWriteProcessor(String baseDir, String processorName, String fileName, Map<String, Object> parameters,
FileSchema fileSchema) throws BufferWriteProcessorException {
super(processorName);
this.fileSchema = fileSchema;
this.baseDir = baseDir;
this.fileName = fileName;
if (baseDir.length() > 0
&& baseDir.charAt(baseDir.length() - 1) != File.separatorChar) {
baseDir = baseDir + File.separatorChar;
}
String dataDirPath = baseDir + processorName;
File dataDir = new File(dataDirPath);
if (!dataDir.exists()) {
dataDir.mkdirs();
LOGGER.debug("The bufferwrite processor data dir doesn't exists, create new directory {}.", dataDirPath);
}
this.insertFilePath = new File(dataDir, fileName).getPath();
try {
bufferWriteResource = new BufferWriteResource(processorName, insertFilePath);
} catch (IOException e) {
throw new BufferWriteProcessorException(e);
}
bufferwriteFlushAction = (Action) parameters.get(FileNodeConstants.BUFFERWRITE_FLUSH_ACTION);
bufferwriteCloseAction = (Action) parameters.get(FileNodeConstants.BUFFERWRITE_CLOSE_ACTION);
filenodeFlushAction = (Action) parameters.get(FileNodeConstants.FILENODE_PROCESSOR_FLUSH_ACTION);
workMemTable = new PrimitiveMemTable();
if(TsfileDBDescriptor.getInstance().getConfig().enableWal) {
try {
logNode = MultiFileLogNodeManager.getInstance().getNode(
processorName + TsFileDBConstant.BUFFERWRITE_LOG_NODE_SUFFIX, getBufferwriteRestoreFilePath(),
FileNodeManager.getInstance().getRestoreFilePath(processorName));
} catch (IOException e) {
throw new BufferWriteProcessorException(e);
}
}
}
/**
* write one data point to the bufferwrite
*
* @param deltaObjectId
* @param measurementId
* @param timestamp
* @param dataType
* @param value
* @return true -the size of tsfile or metadata reaches to the threshold.
* false -otherwise
* @throws BufferWriteProcessorException
*/
public boolean write(String deltaObjectId, String measurementId, long timestamp, TSDataType dataType, String value)
throws BufferWriteProcessorException {
TSRecord record = new TSRecord(timestamp, deltaObjectId);
DataPoint dataPoint = DataPoint.getDataPoint(dataType, measurementId, value);
record.addTuple(dataPoint);
return write(record);
}
public boolean write(TSRecord tsRecord) throws BufferWriteProcessorException {
long memUage = MemUtils.getRecordSize(tsRecord);
BasicMemController.UsageLevel level = BasicMemController.getInstance().reportUse(this, memUage);
for (DataPoint dataPoint : tsRecord.dataPointList) {
workMemTable.write(tsRecord.deltaObjectId, dataPoint.getMeasurementId(), dataPoint.getType(), tsRecord.time,
dataPoint.getValue().toString());
}
valueCount++;
switch (level) {
case SAFE:
// memUsed += newMemUsage;
// memtable
memUage = memSize.addAndGet(memUage);
if (memUage > memThreshold) {
LOGGER.info("The usage of memory {} in bufferwrite processor {} reaches the threshold {}",
MemUtils.bytesCntToStr(memUage), getProcessorName(), MemUtils.bytesCntToStr(memThreshold));
try {
flush();
} catch (IOException e) {
e.printStackTrace();
throw new BufferWriteProcessorException(e);
}
}
return false;
case WARNING:
LOGGER.warn("Memory usage will exceed warning threshold, current : {}.",
MemUtils.bytesCntToStr(BasicMemController.getInstance().getTotalUsage()));
// memUsed += newMemUsage;
// memtable
memUage = memSize.addAndGet(memUage);
if (memUage > memThreshold) {
LOGGER.info("The usage of memory {} in bufferwrite processor {} reaches the threshold {}",
MemUtils.bytesCntToStr(memUage), getProcessorName(), MemUtils.bytesCntToStr(memThreshold));
try {
flush();
} catch (IOException e) {
e.printStackTrace();
throw new BufferWriteProcessorException(e);
}
}
return false;
case DANGEROUS:
default:
LOGGER.warn("Memory usage will exceed dangerous threshold, current : {}.",
MemUtils.bytesCntToStr(BasicMemController.getInstance().getTotalUsage()));
return false;
}
}
public Pair<RawSeriesChunk, List<TimeSeriesChunkMetaData>> queryBufferwriteData(String deltaObjectId,
String measurementId, TSDataType dataType) {
flushQueryLock.lock();
try {
MemSeriesLazyMerger memSeriesLazyMerger = new MemSeriesLazyMerger();
if (isFlush) {
memSeriesLazyMerger.addMemSeries(flushMemTable.query(deltaObjectId, measurementId, dataType));
}
memSeriesLazyMerger.addMemSeries(workMemTable.query(deltaObjectId, measurementId, dataType));
RawSeriesChunk rawSeriesChunk = new RawSeriesChunkLazyLoadImpl(dataType, memSeriesLazyMerger);
return new Pair<>(rawSeriesChunk,
bufferWriteResource.getInsertMetadatas(deltaObjectId, measurementId, dataType));
} finally {
flushQueryLock.unlock();
}
}
private void switchWorkToFlush() {
flushQueryLock.lock();
try {
if (flushMemTable == null) {
flushMemTable = workMemTable;
workMemTable = new PrimitiveMemTable();
}
} finally {
isFlush = true;
flushQueryLock.unlock();
}
}
private void swithFlushToWork() {
flushQueryLock.lock();
try {
flushMemTable.clear();
flushMemTable = null;
bufferWriteResource.appendMetadata();
} finally {
isFlush = false;
flushQueryLock.unlock();
}
}
private void flushOperation(String flushFunction) {
long flushStartTime = System.currentTimeMillis();
LOGGER.info("The bufferwrite processor {} starts flushing {}.", getProcessorName(), flushFunction);
try {
bufferWriteResource.flush(fileSchema, flushMemTable);
filenodeFlushAction.act();
if (TsfileDBDescriptor.getInstance().getConfig().enableWal) {
logNode.notifyEndFlush(null);
}
} catch (IOException e) {
LOGGER.error("The bufferwrite processor {} failed to flush {}.", getProcessorName(), flushFunction, e);
} catch (Exception e) {
LOGGER.error("The bufferwrite processor {} failed to flush {}, when calling the filenodeFlushAction.",
getProcessorName(), flushFunction, e);
} finally {
synchronized (flushStatus) {
flushStatus.setUnFlushing();
swithFlushToWork();
flushStatus.notify();
LOGGER.info("The bufferwrite processor {} ends flushing {}.", getProcessorName(), flushFunction);
}
}
long flushEndTime = System.currentTimeMillis();
long flushInterval = flushEndTime - flushStartTime;
DateTime startDateTime = new DateTime(flushStartTime, TsfileDBDescriptor.getInstance().getConfig().timeZone);
DateTime endDateTime = new DateTime(flushEndTime, TsfileDBDescriptor.getInstance().getConfig().timeZone);
LOGGER.info(
"The bufferwrite processor {} flush {}, start time is {}, flush end time is {}, flush time consumption is {}ms",
getProcessorName(), flushFunction, startDateTime, endDateTime, flushInterval);
}
private Future<?> flush(boolean synchronization) throws IOException {
// statistic information for flush
if (lastFlushTime > 0) {
long thisFlushTime = System.currentTimeMillis();
long flushTimeInterval = thisFlushTime - lastFlushTime;
DateTime lastDateTime = new DateTime(lastFlushTime, TsfileDBDescriptor.getInstance().getConfig().timeZone);
DateTime thisDateTime = new DateTime(thisFlushTime, TsfileDBDescriptor.getInstance().getConfig().timeZone);
LOGGER.info(
"The bufferwrite processor {}: last flush time is {}, this flush time is {}, flush time interval is {}s",
getProcessorName(), lastDateTime, thisDateTime, flushTimeInterval / 1000);
}
lastFlushTime = System.currentTimeMillis();
// check value count
if (valueCount > 0) {
// waiting for the end of last flush operation.
synchronized (flushStatus) {
while (flushStatus.isFlushing()) {
try {
flushStatus.wait();
} catch (InterruptedException e) {
LOGGER.error(
"Encounter an interrupt error when waitting for the flushing, the bufferwrite processor is {}.",
getProcessorName(), e);
}
}
}
// update the lastUpdatetime, prepare for flush
try {
bufferwriteFlushAction.act();
} catch (Exception e) {
LOGGER.error("Failed to flush bufferwrite row group when calling the action function.");
throw new IOException(e);
}
if (TsfileDBDescriptor.getInstance().getConfig().enableWal) {
logNode.notifyStartFlush();
}
valueCount = 0;
flushStatus.setFlushing();
switchWorkToFlush();
BasicMemController.getInstance().reportFree(this, memSize.get());
memSize.set(0);
// switch
if (synchronization) {
flushOperation("synchronously");
} else {
FlushManager.getInstance().submit(new Runnable() {
public void run() {
flushOperation("asynchronously");
}
});
}
}
return null;
}
public boolean isFlush() {
synchronized (flushStatus) {
return flushStatus.isFlushing();
}
}
@Override
public boolean flush() throws IOException {
flush(false);
return false;
}
@Override
public boolean canBeClosed() {
return true;
}
@Override
public void close() throws ProcessorException {
try {
long closeStartTime = System.currentTimeMillis();
// flush data
flush(true);
// end file
bufferWriteResource.close(fileSchema);
// update the intervalfile for interval list
bufferwriteCloseAction.act();
// flush the changed information for filenode
filenodeFlushAction.act();
// delete the restore for this bufferwrite processor
long closeEndTime = System.currentTimeMillis();
long closeInterval = closeEndTime - closeStartTime;
DateTime startDateTime = new DateTime(closeStartTime,
TsfileDBDescriptor.getInstance().getConfig().timeZone);
DateTime endDateTime = new DateTime(closeEndTime, TsfileDBDescriptor.getInstance().getConfig().timeZone);
LOGGER.info(
"Close bufferwrite processor {}, the file name is {}, start time is {}, end time is {}, time consumption is {}ms",
getProcessorName(), fileName, startDateTime, endDateTime, closeInterval);
} catch (IOException e) {
LOGGER.error("Close the bufferwrite processor error, the bufferwrite is {}.", getProcessorName(), e);
throw new BufferWriteProcessorException(e);
} catch (Exception e) {
LOGGER.error("Failed to close the bufferwrite processor when calling the action function.", e);
throw new BufferWriteProcessorException(e);
}
}
@Override
public long memoryUsage() {
return memSize.get();
}
/**
* @return The sum of all timeseries's metadata size within this file.
*/
public long getMetaSize() {
// TODO : [MemControl] implement this
return 0;
}
/**
* @return The file size of the TsFile corresponding to this processor.
* @throws IOException
*/
public long getFileSize() {
// TODO : save this variable to avoid object creation?
File file = new File(insertFilePath);
return file.length() + memoryUsage();
}
/**
* Close current TsFile and open a new one for future writes. Block new
* writes and wait until current writes finish.
*/
public void rollToNewFile() {
// TODO : [MemControl] implement this
}
/**
* Check if this TsFile has too big metadata or file. If true, close current
* file and open a new one.
*
* @throws IOException
*/
private boolean checkSize() throws IOException {
TsfileDBConfig config = TsfileDBDescriptor.getInstance().getConfig();
long metaSize = getMetaSize();
long fileSize = getFileSize();
if (metaSize >= config.bufferwriteMetaSizeThreshold || fileSize >= config.bufferwriteFileSizeThreshold) {
LOGGER.info(
"The bufferwrite processor {}, size({}) of the file {} reaches threshold {}, size({}) of metadata reaches threshold {}.",
getProcessorName(), MemUtils.bytesCntToStr(fileSize), this.fileName,
MemUtils.bytesCntToStr(config.bufferwriteFileSizeThreshold), MemUtils.bytesCntToStr(metaSize),
MemUtils.bytesCntToStr(config.bufferwriteFileSizeThreshold));
rollToNewFile();
return true;
}
return false;
}
private String getBufferwriteRestoreFilePath() {
return bufferWriteResource.getRestoreFilePath();
}
}

View File

@ -1609,6 +1609,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
if (!endTimeMap.containsKey(deltaObjectId) || endTimeMap.get(deltaObjectId) < endTime) {
endTimeMap.put(deltaObjectId, endTime);
}
break;
default:
LOGGER.error("Not support data type: {}", dataType);
break;

View File

@ -0,0 +1,82 @@
package cn.edu.tsinghua.iotdb.exception.builder;
import cn.edu.tsinghua.iotdb.conf.TsFileDBConstant;
import cn.edu.tsinghua.iotdb.conf.TsfileDBConfig;
import cn.edu.tsinghua.iotdb.conf.TsfileDBDescriptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.*;
import java.util.Iterator;
import java.util.Properties;
public class ExceptionBuilder {
private Properties properties = new Properties();
public static final int UNKNOWN_ERROR = 20000;
public static final int NO_PARAMETERS_EXISTS=20001;
public static final int INVALID_PARAMETER_NO=20002;
public static final int CONN_HOST_ERROR=20003;
public static final int AUTH_PLUGIN_ERR=20061;
public static final int INSECURE_API_ERR=20062;
public static final int OUT_OF_MEMORY=20064;
public static final int NO_PREPARE_STMT=20130;
public static final int CON_FAIL_ERR=20220;
private static final Logger LOGGER = LoggerFactory.getLogger(TsfileDBDescriptor.class);
public static final String CONFIG_NAME= "error_info_";
public static final String FILE_SUFFIX=".properties";
public static final String DEFAULT_FILEPATH="error_info_en.properties";
private static final ExceptionBuilder INSTANCE = new ExceptionBuilder();
public static final ExceptionBuilder getInstance() {
return ExceptionBuilder.INSTANCE;
}
public void loadInfo(String filePath){
InputStream in = null;
try {
in = new BufferedInputStream (new FileInputStream(filePath));
properties.load(new InputStreamReader(in,"utf-8"));
in.close();
} catch (IOException e) {
LOGGER.error("Read file error. File does not exist or file is broken. File path: {}.Because: {}.",filePath,e.getMessage());
} finally {
if (in != null) {
try {
in.close();
} catch (IOException e) {
LOGGER.error("Fail to close file: {}. Because: {}.",filePath,e.getMessage());
}
}
}
}
public void loadInfo(){
String language = TsfileDBDescriptor.getInstance().getConfig().languageVersion.toLowerCase();
String url = System.getProperty(TsFileDBConstant.IOTDB_CONF, null);
if (url == null) {
url = System.getProperty(TsFileDBConstant.IOTDB_HOME, null);
if (url != null) {
url = url + File.separatorChar + "conf" + File.separatorChar + ExceptionBuilder.CONFIG_NAME+language+FILE_SUFFIX;
} else {
LOGGER.warn("Cannot find IOTDB_HOME or IOTDB_CONF environment variable when loading config file {}, use default configuration", TsfileDBConfig.CONFIG_NAME);
return;
}
} else{
url += (File.separatorChar + ExceptionBuilder.CONFIG_NAME+language+FILE_SUFFIX);
}
File file = new File(url);
if(!file.exists()){
url.replace(CONFIG_NAME+language+FILE_SUFFIX, DEFAULT_FILEPATH);
}
loadInfo(url);
}
public String searchInfo(int errCode){
return properties.getProperty(String.valueOf(errCode));
}
}

View File

@ -0,0 +1,14 @@
package cn.edu.tsinghua.iotdb.exception.codebased;
import cn.edu.tsinghua.iotdb.exception.builder.ExceptionBuilder;
public class AuthPluginException extends IoTDBException {
public AuthPluginException() {
super(ExceptionBuilder.AUTH_PLUGIN_ERR);
}
public AuthPluginException(String userName, String additionalInfo) {
super(ExceptionBuilder.AUTH_PLUGIN_ERR, additionalInfo);
defaultInfo = String.format(defaultInfo, userName);
}
}

View File

@ -0,0 +1,12 @@
package cn.edu.tsinghua.iotdb.exception.codebased;
import cn.edu.tsinghua.iotdb.exception.builder.ExceptionBuilder;
public class ConnectionFailedException extends IoTDBException {
public ConnectionFailedException() {
super(ExceptionBuilder.CON_FAIL_ERR);
}
public ConnectionFailedException(String additionalInfo) {
super(ExceptionBuilder.CON_FAIL_ERR, additionalInfo);
}
}

View File

@ -0,0 +1,13 @@
package cn.edu.tsinghua.iotdb.exception.codebased;
import cn.edu.tsinghua.iotdb.exception.builder.ExceptionBuilder;
public class ConnectionHostException extends IoTDBException {
public ConnectionHostException() {
super(ExceptionBuilder.CONN_HOST_ERROR);
}
public ConnectionHostException(String ip, String port, String additionalInfo) {
super(ExceptionBuilder.CONN_HOST_ERROR, additionalInfo);
defaultInfo=String.format(defaultInfo, ip, port);
}
}

View File

@ -0,0 +1,13 @@
package cn.edu.tsinghua.iotdb.exception.codebased;
import cn.edu.tsinghua.iotdb.exception.builder.ExceptionBuilder;
public class InsecureAPIException extends IoTDBException{
public InsecureAPIException() {
super(ExceptionBuilder.INSECURE_API_ERR);
}
public InsecureAPIException(String functionName, String additionalInfo) {
super(ExceptionBuilder.INSECURE_API_ERR, additionalInfo);
defaultInfo=String.format(defaultInfo, functionName);
}
}

View File

@ -0,0 +1,12 @@
package cn.edu.tsinghua.iotdb.exception.codebased;
import cn.edu.tsinghua.iotdb.exception.builder.ExceptionBuilder;
public class InvalidParameterException extends IoTDBException{
public InvalidParameterException() {
super(ExceptionBuilder.INVALID_PARAMETER_NO);
}
public InvalidParameterException(String additionalInfo) {
super(ExceptionBuilder.INVALID_PARAMETER_NO, additionalInfo);
}
}

View File

@ -0,0 +1,28 @@
package cn.edu.tsinghua.iotdb.exception.codebased;
import cn.edu.tsinghua.iotdb.exception.builder.ExceptionBuilder;
public abstract class IoTDBException extends Exception{
private static final long serialVersionUID = -8998294067060075273L;
protected int errorCode;
protected String defaultInfo;
protected String additionalInfo;
public IoTDBException(int errorCode){
this.defaultInfo=ExceptionBuilder.getInstance().searchInfo(errorCode);
this.errorCode=errorCode;
}
public IoTDBException(int errCode, String additionalInfo){
this.errorCode=errCode;
this.additionalInfo=additionalInfo;
}
@Override
public String getMessage(){
if(additionalInfo==null){
return defaultInfo;
}else {
return defaultInfo + ". " + additionalInfo;
}
}
}

View File

@ -0,0 +1,12 @@
package cn.edu.tsinghua.iotdb.exception.codebased;
import cn.edu.tsinghua.iotdb.exception.builder.ExceptionBuilder;
public class NoParameterException extends IoTDBException{
public NoParameterException() {
super(ExceptionBuilder.NO_PARAMETERS_EXISTS);
}
public NoParameterException(String additionalInfo) {
super(ExceptionBuilder.NO_PARAMETERS_EXISTS, additionalInfo);
}
}

View File

@ -0,0 +1,12 @@
package cn.edu.tsinghua.iotdb.exception.codebased;
import cn.edu.tsinghua.iotdb.exception.builder.ExceptionBuilder;
public class NoPreparedStatementException extends IoTDBException {
public NoPreparedStatementException() {
super(ExceptionBuilder.NO_PREPARE_STMT);
}
public NoPreparedStatementException(String additionalInfo) {
super(ExceptionBuilder.NO_PREPARE_STMT, additionalInfo);
}
}

View File

@ -0,0 +1,12 @@
package cn.edu.tsinghua.iotdb.exception.codebased;
import cn.edu.tsinghua.iotdb.exception.builder.ExceptionBuilder;
public class OutOfMemoryException extends IoTDBException{
public OutOfMemoryException() {
super(ExceptionBuilder.OUT_OF_MEMORY);
}
public OutOfMemoryException(String additionalInfo) {
super(ExceptionBuilder.OUT_OF_MEMORY, additionalInfo);
}
}

View File

@ -0,0 +1,12 @@
package cn.edu.tsinghua.iotdb.exception.codebased;
import cn.edu.tsinghua.iotdb.exception.builder.ExceptionBuilder;
public class UnknownException extends IoTDBException {
public UnknownException() {
super(ExceptionBuilder.UNKNOWN_ERROR);
}
public UnknownException(String additionalInfo) {
super(ExceptionBuilder.UNKNOWN_ERROR, additionalInfo);
}
}

View File

@ -11,7 +11,7 @@ import cn.edu.tsinghua.iotdb.exception.PathErrorException;
import cn.edu.tsinghua.iotdb.exception.StartupException;
import cn.edu.tsinghua.iotdb.metadata.MManager;
import cn.edu.tsinghua.iotdb.query.engine.OverflowQueryEngine;
import cn.edu.tsinghua.iotdb.query.management.ReadLockManager;
import cn.edu.tsinghua.iotdb.query.management.ReadCacheManager;
import cn.edu.tsinghua.iotdb.service.IService;
import cn.edu.tsinghua.iotdb.service.ServiceType;
import cn.edu.tsinghua.tsfile.common.constant.StatisticConstant;
@ -139,9 +139,8 @@ public class StatMonitor implements IService{
try {
OnePassQueryDataSet queryDataSet;
queryDataSet = overflowQueryEngine.aggregate(pairList, null);
ReadLockManager.getInstance().unlockForOneRequest();
ReadCacheManager.getInstance().unlockForOneRequest();
OldRowRecord rowRecord = queryDataSet.getNextRecord();
if (rowRecord!=null) {
FileNodeManager fManager = FileNodeManager.getInstance();
HashMap<String, AtomicLong> statParamsHashMap = fManager.getStatParamsHashMap();

View File

@ -13,7 +13,7 @@ public class CreatorUtils {
private static final Logger LOGGER = LoggerFactory.getLogger(CreatorUtils.class);
// unsequence overflow file
static final String unseqTsFilePathName = "unseqTsFilec";
static final String unseqTsFilePathName = "unSeqTsFile";
// restore file of this storage group
static final String restoreFilePathName = ".restore";

View File

@ -32,21 +32,20 @@ import static cn.edu.tsinghua.iotdb.performance.CreatorUtils.*;
import static cn.edu.tsinghua.iotdb.performance.ReaderCreator.getTsFileMetadata;
import static cn.edu.tsinghua.iotdb.performance.ReaderCreator.getUnSeqFileMetaData;
/**
* Created by zhangjinrui on 2018/3/13.
*/
public class ReadAnalysis {
private static final Logger LOGGER = LoggerFactory.getLogger(ReadAnalysis.class);
private static String mergeOutPutFolder;
private static String fileFolderName;
private static String unSeqFilePath;
private static String mergeOutPutFolder;
private static Map<String, Map<String, List<TimeSeriesChunkMetaData>>> unSeqFileMetaData;
private static Map<String, Pair<Long, Long>> unSeqFileDeltaObjectTimeRangeMap;
/**
* @param args merge test fileFolderName
*/
public static void main(String args[]) throws WriteProcessException, IOException, InterruptedException {
fileFolderName = args[0];
mergeOutPutFolder = fileFolderName + "/merge/";
@ -154,13 +153,6 @@ public class ReadAnalysis {
System.out.println(String.format("All file merge time consuming : %dms", allFileMergeEndTime - allFileMergeStartTime));
}
private TSRecord constructTsRecord(TimeValuePair timeValuePair, String deltaObjectId, String measurementId) {
TSRecord record = new TSRecord(timeValuePair.getTimestamp(), deltaObjectId);
record.addTuple(DataPoint.getDataPoint(timeValuePair.getValue().getDataType(), measurementId,
timeValuePair.getValue().getValue().toString()));
return record;
}
private TSEncoding getEncodingByDataType(TSDataType dataType) {
switch (dataType) {
case TEXT:

View File

@ -58,7 +58,7 @@ public class ReaderCreator {
public static TimeValuePairReader createReaderForMerge(String tsfilePath, String unseqTsFilePath,
Path path, long startTime, long endTime) throws IOException {
OverflowSeriesDataSource overflowSeriesDataSource = genDataSource(unseqTsFilePath, path);
TsfileDBDescriptor.getInstance().getConfig().bufferWriteDir = "";
TsfileDBDescriptor.getInstance().getConfig().bufferWriteDirs = new String[] {""};
Filter<?> filter = FilterFactory.and(TimeFilter.gtEq(startTime), TimeFilter.ltEq(endTime));
SeriesFilter<?> seriesFilter = new SeriesFilter<>(path, filter);
IntervalFileNode intervalFileNode = new IntervalFileNode(null, tsfilePath);
@ -70,7 +70,7 @@ public class ReaderCreator {
public static TimeValuePairReader createReaderOnlyForOverflowInsert(String unseqTsFilePath,
Path path, long startTime, long endTime) throws IOException {
OverflowSeriesDataSource overflowSeriesDataSource = genDataSource(unseqTsFilePath, path);
TsfileDBDescriptor.getInstance().getConfig().bufferWriteDir = "";
TsfileDBDescriptor.getInstance().getConfig().bufferWriteDirs = new String[] {""};
Filter<?> filter = FilterFactory.and(TimeFilter.gtEq(startTime), TimeFilter.ltEq(endTime));
SeriesFilter<?> seriesFilter = new SeriesFilter<>(path, filter);
TimeValuePairReader reader = SeriesReaderFactory.getInstance().createSeriesReaderForOverflowInsert(overflowSeriesDataSource,

View File

@ -21,7 +21,7 @@ import static cn.edu.tsinghua.iotdb.performance.ReaderCreator.getTsFileMetadata;
public class TsFileReadPerformance {
private static final String inputFilePath = "/Users/beyyes/Desktop/root.ln.wf632814.type4/1514676100617-1520943086803";
private static final String inputFilePath = "";
public static void main(String[] args) throws IOException {
long startTime = System.currentTimeMillis();

View File

@ -1,6 +1,10 @@
package cn.edu.tsinghua.iotdb.performance;
import cn.edu.tsinghua.iotdb.engine.bufferwrite.FileNodeConstants;
import cn.edu.tsinghua.iotdb.engine.filenode.FileNodeProcessor;
import cn.edu.tsinghua.iotdb.engine.querycontext.OverflowSeriesDataSource;
import cn.edu.tsinghua.iotdb.queryV2.factory.SeriesReaderFactory;
import cn.edu.tsinghua.tsfile.common.conf.TSFileConfig;
import cn.edu.tsinghua.tsfile.common.conf.TSFileDescriptor;
import cn.edu.tsinghua.tsfile.common.utils.ITsRandomAccessFileReader;
import cn.edu.tsinghua.tsfile.common.utils.Pair;
@ -17,13 +21,18 @@ import cn.edu.tsinghua.tsfile.timeseries.filterV2.factory.FilterFactory;
import cn.edu.tsinghua.tsfile.timeseries.read.TsRandomAccessLocalFileReader;
import cn.edu.tsinghua.tsfile.timeseries.read.support.Path;
import cn.edu.tsinghua.tsfile.timeseries.readV2.datatype.TimeValuePair;
import cn.edu.tsinghua.tsfile.timeseries.readV2.reader.SeriesReader;
import cn.edu.tsinghua.tsfile.timeseries.readV2.reader.TimeValuePairReader;
import cn.edu.tsinghua.tsfile.timeseries.write.TsFileWriter;
import cn.edu.tsinghua.tsfile.timeseries.write.desc.MeasurementDescriptor;
import cn.edu.tsinghua.tsfile.timeseries.write.exception.WriteProcessException;
import cn.edu.tsinghua.tsfile.timeseries.write.io.TsFileIOWriter;
import cn.edu.tsinghua.tsfile.timeseries.write.page.IPageWriter;
import cn.edu.tsinghua.tsfile.timeseries.write.page.PageWriterImpl;
import cn.edu.tsinghua.tsfile.timeseries.write.record.DataPoint;
import cn.edu.tsinghua.tsfile.timeseries.write.record.TSRecord;
import cn.edu.tsinghua.tsfile.timeseries.write.schema.FileSchema;
import cn.edu.tsinghua.tsfile.timeseries.write.series.SeriesWriterImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -44,16 +53,19 @@ public class WriteAnalysis {
private static final Logger LOGGER = LoggerFactory.getLogger(WriteAnalysis.class);
private static final TSFileConfig TsFileConf = TSFileDescriptor.getInstance().getConfig();
private static String mergeOutPutFolder;
private static String fileFolderName;
private static String unSeqFilePath;
private static String singleTsFilePath = "/Users/beyyes/Desktop/root.ln.wf632814.type4/1514676100617-1520943086803";
private static Map<String, Map<String, List<TimeSeriesChunkMetaData>>> unSeqFileMetaData;
private static Map<String, Pair<Long, Long>> unSeqFileDeltaObjectTimeRangeMap;
private static long max = -1;
/**
* @param args merge test fileFolderName
*/
public static void main(String args[]) throws WriteProcessException, IOException {
fileFolderName = args[0];
mergeOutPutFolder = fileFolderName + "/merge/";
@ -65,7 +77,7 @@ public class WriteAnalysis {
unSeqFileMetaData = getUnSeqFileMetaData(unSeqFilePath);
WriteAnalysis writeAnalysis = new WriteAnalysis();
writeAnalysis.initUnSeqFileStatistics();
//writeAnalysis.unTsFileReadPerformance();
//writeAnalysis.unSeqTsFileReadPerformance();
//writeAnalysis.tsFileReadPerformance();
writeAnalysis.executeMerge();
System.out.println("max memory usage:" + max);
@ -117,40 +129,72 @@ public class WriteAnalysis {
fileSchema.registerMeasurement(new MeasurementDescriptor(timeSeriesMetadata.getMeasurementUID(), timeSeriesMetadata.getType(),
getEncodingByDataType(timeSeriesMetadata.getType())));
}
TsFileWriter fileWriter = new TsFileWriter(outPutFile, fileSchema, TSFileDescriptor.getInstance().getConfig());
TsFileIOWriter fileIOWriter = new TsFileIOWriter(new File(mergeOutPutFolder + file.getName()));
//examine whether this TsFile should be merged
for (Map.Entry<String, TsDeltaObject> tsFileEntry : tsFileMetaData.getDeltaObjectMap().entrySet()) {
String tsFileDeltaObjectId = tsFileEntry.getKey();
long tsFileDeltaObjectStartTime = tsFileMetaData.getDeltaObject(tsFileDeltaObjectId).startTime;
long tsFileDeltaObjectEndTime = tsFileMetaData.getDeltaObject(tsFileDeltaObjectId).endTime;
if (unSeqFileMetaData.containsKey(tsFileDeltaObjectId) &&
unSeqFileDeltaObjectTimeRangeMap.get(tsFileDeltaObjectId).left <= tsFileDeltaObjectStartTime
&& unSeqFileDeltaObjectTimeRangeMap.get(tsFileDeltaObjectId).right >= tsFileDeltaObjectStartTime) {
for (TimeSeriesMetadata timeSeriesMetadata : tsFileMetaData.getTimeSeriesList()) {
if (unSeqFileMetaData.get(tsFileDeltaObjectId).containsKey(timeSeriesMetadata.getMeasurementUID())) {
String deltaObjectId = tsFileEntry.getKey();
long tsFileDeltaObjectStartTime = tsFileMetaData.getDeltaObject(deltaObjectId).startTime;
long tsFileDeltaObjectEndTime = tsFileMetaData.getDeltaObject(deltaObjectId).endTime;
boolean isRowGroupHasData = false;
TimeValuePairReader reader = ReaderCreator.createReaderForMerge(file.getPath(), unSeqFilePath,
new Path(tsFileDeltaObjectId + "." + timeSeriesMetadata.getMeasurementUID()),
if (unSeqFileMetaData.containsKey(deltaObjectId) &&
unSeqFileDeltaObjectTimeRangeMap.get(deltaObjectId).left <= tsFileDeltaObjectStartTime
&& unSeqFileDeltaObjectTimeRangeMap.get(deltaObjectId).right >= tsFileDeltaObjectStartTime) {
long recordCount = 0;
long startPos = 0;
for (TimeSeriesMetadata timeSeriesMetadata : tsFileMetaData.getTimeSeriesList()) {
String measurementId = timeSeriesMetadata.getMeasurementUID();
TSDataType dataType = timeSeriesMetadata.getType();
if (unSeqFileMetaData.get(deltaObjectId).containsKey(measurementId)) {
TimeValuePairReader seriesReader = ReaderCreator.createReaderForMerge(file.getPath(), unSeqFilePath,
new Path(deltaObjectId + "." + timeSeriesMetadata.getMeasurementUID()),
tsFileDeltaObjectStartTime, tsFileDeltaObjectEndTime);
// calc hasnext method executing time
while (reader.hasNext()) {
TimeValuePair tp = reader.next();
//count ++;
// calc time consuming of writing
TSRecord record = constructTsRecord(tp, tsFileDeltaObjectId, timeSeriesMetadata.getMeasurementUID());
fileWriter.write(record);
}
if (!seriesReader.hasNext()) {
// LOGGER.debug("The time-series {} has no data");
seriesReader.close();
} else {
reader.close();
if (!isRowGroupHasData) {
// start a new rowGroupMetadata
isRowGroupHasData = true;
fileIOWriter.startRowGroup(deltaObjectId);
startPos = fileIOWriter.getPos();
}
// init the serieswWriteImpl
MeasurementDescriptor desc = fileSchema.getMeasurementDescriptor(measurementId);
IPageWriter pageWriter = new PageWriterImpl(desc);
int pageSizeThreshold = TsFileConf.pageSizeInByte;
SeriesWriterImpl seriesWriterImpl = new SeriesWriterImpl(deltaObjectId, desc, pageWriter,
pageSizeThreshold);
// write the series data
recordCount += writeOneSeries(deltaObjectId, measurementId, seriesWriterImpl, dataType,
(SeriesReader) seriesReader, new HashMap<>(), new HashMap<>());
// flush the series data
seriesWriterImpl.writeToFileWriter(fileIOWriter);
seriesReader.close();
}
}
}
if (isRowGroupHasData) {
// end the new rowGroupMetadata
long memSize = fileIOWriter.getPos() - startPos;
fileIOWriter.endRowGroup(memSize, recordCount);
}
}
}
fileIOWriter.endFile(fileSchema);
randomAccessFileReader.close();
fileWriter.close();
long oneFileMergeEndTime = System.currentTimeMillis();
System.out.println(String.format("Current file merge time consuming : %sms, record number: %s," +
"original file size is %d, current file size is %d",
@ -196,7 +240,6 @@ public class WriteAnalysis {
fileSchema.registerMeasurement(new MeasurementDescriptor(timeSeriesMetadata.getMeasurementUID(), timeSeriesMetadata.getType(),
getEncodingByDataType(timeSeriesMetadata.getType())));
}
TsFileWriter fileWriter = new TsFileWriter(outPutFile, fileSchema, TSFileDescriptor.getInstance().getConfig());
for (Map.Entry<String, TsDeltaObject> tsFileEntry : tsFileMetaData.getDeltaObjectMap().entrySet()) {
String tsFileDeltaObjectId = tsFileEntry.getKey();
@ -216,9 +259,6 @@ public class WriteAnalysis {
//long tmpRecordCount = 0;
while (reader.hasNext()) {
TimeValuePair tp = reader.next();
TSRecord record = constructTsRecord(tp, tsFileDeltaObjectId, measurementId);
fileWriter.write(record);
//tmpRecordCount++;
}
// if (seriesPoints.containsKey(tsFileDeltaObjectId) && seriesPoints.get(tsFileDeltaObjectId).containsKey(measurementId)) {
// long originalCount = seriesPoints.get(tsFileDeltaObjectId).get(measurementId);
@ -253,7 +293,7 @@ public class WriteAnalysis {
* @throws IOException
* @throws WriteProcessException
*/
private void unTsFileReadPerformance() throws IOException {
private void unSeqTsFileReadPerformance() throws IOException {
Pair<Boolean, File[]> validFiles = getValidFiles(fileFolderName);
if (!validFiles.left) {
@ -298,14 +338,14 @@ public class WriteAnalysis {
tsFileReader.close();
// TimeValuePairReader overflowInsertReader = ReaderCreator.createReaderOnlyForOverflowInsert(unSeqFilePath,
// new Path(tsFileDeltaObjectId + "." + timeSeriesMetadata.getMeasurementUID()),
// tsFileDeltaObjectStartTime, tsFileDeltaObjectEndTime);
// while (overflowInsertReader.hasNext()) {
// TimeValuePair tp = overflowInsertReader.next();
// count ++;
// }
// overflowInsertReader.close();
TimeValuePairReader overflowInsertReader = ReaderCreator.createReaderOnlyForOverflowInsert(unSeqFilePath,
new Path(tsFileDeltaObjectId + "." + timeSeriesMetadata.getMeasurementUID()),
tsFileDeltaObjectStartTime, tsFileDeltaObjectEndTime);
while (overflowInsertReader.hasNext()) {
TimeValuePair tp = overflowInsertReader.next();
count ++;
}
overflowInsertReader.close();
}
}
}
@ -320,11 +360,141 @@ public class WriteAnalysis {
}
}
private TSRecord constructTsRecord(TimeValuePair timeValuePair, String deltaObjectId, String measurementId) {
TSRecord record = new TSRecord(timeValuePair.getTimestamp(), deltaObjectId);
record.addTuple(DataPoint.getDataPoint(timeValuePair.getValue().getDataType(), measurementId,
timeValuePair.getValue().getValue().toString()));
return record;
private int writeOneSeries(String deltaObjectId, String measurement, SeriesWriterImpl seriesWriterImpl,
TSDataType dataType, SeriesReader seriesReader, Map<String, Long> startTimeMap,
Map<String, Long> endTimeMap) throws IOException {
int count = 0;
if (!seriesReader.hasNext())
return 0;
TimeValuePair timeValuePair = seriesReader.next();
long startTime = -1;
long endTime = -1;
switch (dataType) {
case BOOLEAN:
seriesWriterImpl.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getBoolean());
count++;
startTime = endTime = timeValuePair.getTimestamp();
if (!startTimeMap.containsKey(deltaObjectId) || startTimeMap.get(deltaObjectId) > startTime) {
startTimeMap.put(deltaObjectId, startTime);
}
if (!endTimeMap.containsKey(deltaObjectId) || endTimeMap.get(deltaObjectId) < endTime) {
endTimeMap.put(deltaObjectId, endTime);
}
while (seriesReader.hasNext()) {
count++;
timeValuePair = seriesReader.next();
endTime = timeValuePair.getTimestamp();
seriesWriterImpl.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getBoolean());
}
if (!endTimeMap.containsKey(deltaObjectId) || endTimeMap.get(deltaObjectId) < endTime) {
endTimeMap.put(deltaObjectId, endTime);
}
break;
case INT32:
seriesWriterImpl.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getInt());
count++;
startTime = endTime = timeValuePair.getTimestamp();
if (!startTimeMap.containsKey(deltaObjectId) || startTimeMap.get(deltaObjectId) > startTime) {
startTimeMap.put(deltaObjectId, startTime);
}
if (!endTimeMap.containsKey(deltaObjectId) || endTimeMap.get(deltaObjectId) < endTime) {
endTimeMap.put(deltaObjectId, endTime);
}
while (seriesReader.hasNext()) {
count++;
timeValuePair = seriesReader.next();
endTime = timeValuePair.getTimestamp();
seriesWriterImpl.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getInt());
}
if (!endTimeMap.containsKey(deltaObjectId) || endTimeMap.get(deltaObjectId) < endTime) {
endTimeMap.put(deltaObjectId, endTime);
}
break;
case INT64:
seriesWriterImpl.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getLong());
count++;
startTime = endTime = timeValuePair.getTimestamp();
if (!startTimeMap.containsKey(deltaObjectId) || startTimeMap.get(deltaObjectId) > startTime) {
startTimeMap.put(deltaObjectId, startTime);
}
if (!endTimeMap.containsKey(deltaObjectId) || endTimeMap.get(deltaObjectId) < endTime) {
endTimeMap.put(deltaObjectId, endTime);
}
while (seriesReader.hasNext()) {
count++;
timeValuePair = seriesReader.next();
endTime = timeValuePair.getTimestamp();
seriesWriterImpl.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getLong());
}
if (!endTimeMap.containsKey(deltaObjectId) || endTimeMap.get(deltaObjectId) < endTime) {
endTimeMap.put(deltaObjectId, endTime);
}
break;
case FLOAT:
seriesWriterImpl.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getFloat());
count++;
startTime = endTime = timeValuePair.getTimestamp();
if (!startTimeMap.containsKey(deltaObjectId) || startTimeMap.get(deltaObjectId) > startTime) {
startTimeMap.put(deltaObjectId, startTime);
}
if (!endTimeMap.containsKey(deltaObjectId) || endTimeMap.get(deltaObjectId) < endTime) {
endTimeMap.put(deltaObjectId, endTime);
}
while (seriesReader.hasNext()) {
count++;
timeValuePair = seriesReader.next();
endTime = timeValuePair.getTimestamp();
seriesWriterImpl.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getFloat());
}
if (!endTimeMap.containsKey(deltaObjectId) || endTimeMap.get(deltaObjectId) < endTime) {
endTimeMap.put(deltaObjectId, endTime);
}
break;
case DOUBLE:
seriesWriterImpl.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getDouble());
count++;
startTime = endTime = timeValuePair.getTimestamp();
if (!startTimeMap.containsKey(deltaObjectId) || startTimeMap.get(deltaObjectId) > startTime) {
startTimeMap.put(deltaObjectId, startTime);
}
if (!endTimeMap.containsKey(deltaObjectId) || endTimeMap.get(deltaObjectId) < endTime) {
endTimeMap.put(deltaObjectId, endTime);
}
while (seriesReader.hasNext()) {
count++;
timeValuePair = seriesReader.next();
endTime = timeValuePair.getTimestamp();
seriesWriterImpl.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getDouble());
}
if (!endTimeMap.containsKey(deltaObjectId) || endTimeMap.get(deltaObjectId) < endTime) {
endTimeMap.put(deltaObjectId, endTime);
}
break;
case TEXT:
seriesWriterImpl.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getBinary());
count++;
startTime = endTime = timeValuePair.getTimestamp();
if (!startTimeMap.containsKey(deltaObjectId) || startTimeMap.get(deltaObjectId) > startTime) {
startTimeMap.put(deltaObjectId, startTime);
}
if (!endTimeMap.containsKey(deltaObjectId) || endTimeMap.get(deltaObjectId) < endTime) {
endTimeMap.put(deltaObjectId, endTime);
}
while (seriesReader.hasNext()) {
count++;
timeValuePair = seriesReader.next();
endTime = timeValuePair.getTimestamp();
seriesWriterImpl.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getBinary());
}
if (!endTimeMap.containsKey(deltaObjectId) || endTimeMap.get(deltaObjectId) < endTime) {
endTimeMap.put(deltaObjectId, endTime);
}
break;
default:
LOGGER.error("Not support data type: {}", dataType);
break;
}
return count;
}
private TSEncoding getEncodingByDataType(TSDataType dataType) {

View File

@ -22,6 +22,8 @@ import java.util.Map;
import java.util.Set;
import java.util.Map.Entry;
import cn.edu.tsinghua.tsfile.timeseries.read.query.OnePassQueryDataSet;
import cn.edu.tsinghua.tsfile.timeseries.read.support.OldRowRecord;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -45,10 +47,10 @@ import cn.edu.tsinghua.tsfile.file.utils.ReadWriteThriftFormatUtils;
import cn.edu.tsinghua.tsfile.timeseries.basis.TsFile;
import cn.edu.tsinghua.tsfile.timeseries.read.FileReader;
import cn.edu.tsinghua.tsfile.timeseries.read.TsRandomAccessLocalFileReader;
import cn.edu.tsinghua.tsfile.timeseries.read.query.QueryDataSet;
import cn.edu.tsinghua.tsfile.timeseries.read.support.Field;
import cn.edu.tsinghua.tsfile.timeseries.read.support.Path;
import cn.edu.tsinghua.tsfile.timeseries.read.support.RowRecord;
/**
* @author lta
@ -384,9 +386,9 @@ public class ServerServiceImpl implements ServerService.Iface {
for (String timesery : timeseries) {
paths.add(new Path(timesery));
}
QueryDataSet queryDataSet = readTsFile.query(paths, null, null);
OnePassQueryDataSet queryDataSet = readTsFile.query(paths, null, null);
while (queryDataSet.hasNextRecord()) {
RowRecord record = queryDataSet.getNextRecord();
OldRowRecord record = queryDataSet.getNextRecord();
List<Field> fields = record.getFields();
String sql_front = null;
for (Field field : fields) {
@ -492,9 +494,9 @@ public class ServerServiceImpl implements ServerService.Iface {
Map<String, String> originDataPoint = new HashMap<>();
Map<String, String> newDataPoint = new HashMap<>();
String sqlFormat = "insert into %s(timestamp,%s) values(%s,%s)";
QueryDataSet queryDataSet = readTsFile.query(paths, null, null);
OnePassQueryDataSet queryDataSet = readTsFile.query(paths, null, null);
while (queryDataSet.hasNextRecord()) {
RowRecord record = queryDataSet.getNextRecord();
OldRowRecord record = queryDataSet.getNextRecord();
List<Field> fields = record.getFields();
String sql;
for (Field field : fields) { // get all data with the timesery in the postback file
@ -516,9 +518,9 @@ public class ServerServiceImpl implements ServerService.Iface {
try {
inputOverlap = new TsRandomAccessLocalFileReader(overlapFile);
TsFile readTsFileOverlap = new TsFile(inputOverlap);
QueryDataSet queryDataSetOverlap = readTsFileOverlap.query(paths, null, null);
OnePassQueryDataSet queryDataSetOverlap = readTsFileOverlap.query(paths, null, null);
while (queryDataSetOverlap.hasNextRecord()) {
RowRecord recordOverlap = queryDataSetOverlap.getNextRecord();
OldRowRecord recordOverlap = queryDataSetOverlap.getNextRecord();
List<Field> fields = recordOverlap.getFields();
String sql;
for (Field field : fields) {

View File

@ -1,11 +1,16 @@
package cn.edu.tsinghua.iotdb.query.engine;
import cn.edu.tsinghua.iotdb.concurrent.IoTDBThreadPoolFactory;
import cn.edu.tsinghua.iotdb.conf.TsfileDBDescriptor;
import cn.edu.tsinghua.iotdb.engine.filenode.FileNodeManager;
import cn.edu.tsinghua.iotdb.exception.FileNodeManagerException;
import cn.edu.tsinghua.iotdb.exception.PathErrorException;
import cn.edu.tsinghua.iotdb.metadata.MManager;
import cn.edu.tsinghua.iotdb.query.aggregation.AggregateFunction;
import cn.edu.tsinghua.iotdb.query.management.FileReaderMap;
import cn.edu.tsinghua.iotdb.query.management.FilterStructure;
import cn.edu.tsinghua.iotdb.query.management.ReadCachePrefix;
import cn.edu.tsinghua.iotdb.query.management.ReadCacheManager;
import cn.edu.tsinghua.iotdb.query.reader.AggregateRecordReader;
import cn.edu.tsinghua.iotdb.query.reader.QueryRecordReader;
import cn.edu.tsinghua.iotdb.query.reader.ReaderType;
@ -23,6 +28,8 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import static cn.edu.tsinghua.iotdb.query.engine.EngineUtils.noFilterOrOnlyHasTimeFilter;
@ -38,6 +45,19 @@ public class AggregateEngine {
private int crossQueryFetchSize =
10 * TsfileDBDescriptor.getInstance().getConfig().fetchSize;
//private ExecutorService aggregateThreadPool;
// private AggregateEngine() {
// }
// private static class AggregateEngineHolder {
// private static final AggregateEngine INSTANCE = new AggregateEngine();
//
// }
//
// public static final AggregateEngine getInstance() {
// return AggregateEngineHolder.INSTANCE;
// }
/**
* <p>Public invoking method of multiple aggregation.
*
@ -222,18 +242,56 @@ public class AggregateEngine {
throws PathErrorException, ProcessorException, IOException {
int aggreNumber = 0;
CountDownLatch latch = new CountDownLatch(aggres.size());
ExecutorService service = IoTDBThreadPoolFactory.newFixedThreadPool(Runtime.getRuntime().availableProcessors() - 1,
"AggregateThread");
//ExecutorService service = Executors.newFixedThreadPool(aggres.size());
for (Pair<Path, AggregateFunction> pair : aggres) {
aggreNumber++;
Path path = pair.left;
AggregateFunction aggregateFunction = pair.right;
String deltaObjectUID = path.getDeltaObjectToString();
String measurementUID = path.getMeasurementToString();
AggregateRecordReader recordReader = (AggregateRecordReader)
RecordReaderFactory.getInstance().getRecordReader(deltaObjectUID, measurementUID,
queryTimeFilter, null, null, ReadCachePrefix.addQueryPrefix(aggreNumber), ReaderType.AGGREGATE);
queryTimeFilter, null, null, ReadCachePrefix.addQueryPrefix(aggreNumber), ReaderType.AGGREGATE);
service.submit(new AggregateThread(recordReader, aggregateFunction, latch));
}
recordReader.aggregate(aggregateFunction);
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
service.shutdown();
}
private class AggregateThread implements Runnable {
private AggregateRecordReader aggregateRecordReader;
private AggregateFunction aggregateFunction;
private CountDownLatch latch;
public AggregateThread(AggregateRecordReader aggregateRecordReader, AggregateFunction aggregateFunction, CountDownLatch latch) {
this.aggregateRecordReader = aggregateRecordReader;
this.aggregateFunction = aggregateFunction;
this.latch = latch;
}
@Override
public void run() {
try {
aggregateRecordReader .aggregate(aggregateFunction);
// FileNodeManager.getInstance().endQuery(deltaObjectUID, recordReader.getReadToken());
// ReadCacheManager.getInstance().removeReadToken(deltaObjectUID, recordReader.getReadToken());
FileReaderMap.getInstance().close();
latch.countDown();
} catch (ProcessorException | IOException e) {
e.printStackTrace();
}
}
}
@ -244,8 +302,8 @@ public class AggregateEngine {
* once for querying d1.s1, once for querying d2.s1.
* <p>
* When this method is invoked, need add the filter index as a new parameter, for the reason of exist of
* <code>RecordReaderCache</code>, if the composition of CrossFilterExpression exist same SingleFilterExpression,
* we must guarantee that the <code>RecordReaderCache</code> doesn't cause conflict to the same SingleFilterExpression.
* <code>RecordReaderCacheManager</code>, if the composition of CrossFilterExpression exist same SingleFilterExpression,
* we must guarantee that the <code>RecordReaderCacheManager</code> doesn't cause conflict to the same SingleFilterExpression.
*/
private DynamicOneColumnData getDataUseSingleValueFilter(SingleSeriesFilterExpression queryValueFilter,
DynamicOneColumnData res, int fetchSize, int valueFilterNumber)

View File

@ -11,7 +11,7 @@ import cn.edu.tsinghua.iotdb.query.fill.LinearFill;
import cn.edu.tsinghua.iotdb.query.fill.PreviousFill;
import cn.edu.tsinghua.iotdb.query.management.FilterStructure;
import cn.edu.tsinghua.iotdb.query.management.ReadCachePrefix;
import cn.edu.tsinghua.iotdb.query.management.ReadLockManager;
import cn.edu.tsinghua.iotdb.query.management.ReadCacheManager;
import cn.edu.tsinghua.iotdb.query.reader.QueryRecordReader;
import cn.edu.tsinghua.iotdb.query.reader.ReaderType;
import cn.edu.tsinghua.iotdb.query.reader.RecordReaderFactory;
@ -110,16 +110,15 @@ public class OverflowQueryEngine {
aggregations.add(new Pair<>(pair.left, aggregateFunction));
}
AggregateEngine aggregateEngine = new AggregateEngine();
aggregateEngine.multiAggregate(aggregations, filterStructures);
new AggregateEngine().multiAggregate(aggregations, filterStructures);
OnePassQueryDataSet ansOnePassQueryDataSet = new OnePassQueryDataSet();
for (Pair<Path, AggregateFunction> pair : aggregations) {
AggregateFunction aggregateFunction = pair.right;
if (aggregateFunction.resultData.timeLength == 0) {
aggregateFunction.putDefaultValue();
}
LOGGER.debug(String.format("key %s, data length %s, empty data length %s", EngineUtils.aggregationKey(aggregateFunction, pair.left),
aggregateFunction.resultData.timeLength, aggregateFunction.resultData.emptyTimeLength));
LOGGER.debug(String.format("key %s, data time length %s, data value length %s, empty data time length %s", EngineUtils.aggregationKey(aggregateFunction, pair.left),
aggregateFunction.resultData.timeLength, aggregateFunction.resultData.valueLength, aggregateFunction.resultData.emptyTimeLength));
ansOnePassQueryDataSet.mapRet.put(EngineUtils.aggregationKey(aggregateFunction, pair.left), aggregateFunction.resultData);
}
// aggregateThreadLocal.set(ansOnePassQueryDataSet);
@ -140,9 +139,9 @@ public class OverflowQueryEngine {
public OnePassQueryDataSet groupBy(List<Pair<Path, String>> aggres, List<FilterStructure> filterStructures,
long unit, long origin, List<Pair<Long, Long>> intervals, int fetchSize) {
ThreadLocal<Integer> groupByCalcTime = ReadLockManager.getInstance().getGroupByCalcTime();
ThreadLocal<GroupByEngineNoFilter> groupByEngineNoFilterLocal = ReadLockManager.getInstance().getGroupByEngineNoFilterLocal();
ThreadLocal<GroupByEngineWithFilter> groupByEngineWithFilterLocal = ReadLockManager.getInstance().getGroupByEngineWithFilterLocal();
ThreadLocal<Integer> groupByCalcTime = ReadCacheManager.getInstance().getGroupByCalcTime();
ThreadLocal<GroupByEngineNoFilter> groupByEngineNoFilterLocal = ReadCacheManager.getInstance().getGroupByEngineNoFilterLocal();
ThreadLocal<GroupByEngineWithFilter> groupByEngineWithFilterLocal = ReadCacheManager.getInstance().getGroupByEngineWithFilterLocal();
if (groupByCalcTime.get() == null) {
@ -419,8 +418,8 @@ public class OverflowQueryEngine {
* once for querying d1.s1, once for querying d2.s1.
* <p>
* When this method is invoked, need add the filter index as a new parameter, for the reason of exist of
* <code>RecordReaderCache</code>, if the composition of CrossFilterExpression exist same SingleFilterExpression,
* we must guarantee that the <code>RecordReaderCache</code> doesn't cause conflict to the same SingleFilterExpression.
* <code>RecordReaderCacheManager</code>, if the composition of CrossFilterExpression exist same SingleFilterExpression,
* we must guarantee that the <code>RecordReaderCacheManager</code> doesn't cause conflict to the same SingleFilterExpression.
*/
private DynamicOneColumnData querySeriesForCross(SingleSeriesFilterExpression queryValueFilter,
DynamicOneColumnData res, int fetchSize, int valueFilterNumber)

View File

@ -381,8 +381,8 @@ public class GroupByEngineWithFilter {
* once for querying d1.s1, once for querying d2.s1.
* <p>
* When this method is invoked, need add the filter index as a new parameter, for the reason of exist of
* <code>RecordReaderCache</code>, if the composition of CrossFilterExpression exist same SingleFilterExpression,
* we must guarantee that the <code>RecordReaderCache</code> doesn't cause conflict to the same SingleFilterExpression.
* <code>RecordReaderCacheManager</code>, if the composition of CrossFilterExpression exist same SingleFilterExpression,
* we must guarantee that the <code>RecordReaderCacheManager</code> doesn't cause conflict to the same SingleFilterExpression.
*/
private static DynamicOneColumnData getDataUseSingleValueFilter(SingleSeriesFilterExpression queryValueFilter,
DynamicOneColumnData res, int fetchSize, int valueFilterNumber)

View File

@ -11,12 +11,12 @@ public class FileReaderMap {
/** map to store opened file stream **/
private static ThreadLocal<Map<String, TsRandomAccessLocalFileReader>> fileReaderMap = new ThreadLocal<>();
private static class ReaderHolder {
private static class FileReaderMapHolder {
private static final FileReaderMap INSTANCE = new FileReaderMap();
}
public static FileReaderMap getInstance() {
return ReaderHolder.INSTANCE;
return FileReaderMapHolder.INSTANCE;
}
public TsRandomAccessLocalFileReader get(String path) throws IOException {

View File

@ -2,6 +2,7 @@ package cn.edu.tsinghua.iotdb.query.management;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import cn.edu.tsinghua.iotdb.engine.filenode.FileNodeManager;
import cn.edu.tsinghua.iotdb.exception.FileNodeManagerException;
@ -11,23 +12,33 @@ import cn.edu.tsinghua.tsfile.common.exception.ProcessorException;
/**
* <p>
* Read process lock manager and ThreadLocal variable manager.
* Read lock manager, ThreadLocal variable and RecordReaderCacheManager manager.
* When a query process is over or quit abnormally, the <code>unlockForOneRequest</code> method will
* be invoked to clear the thread level variable.
* </p>
*
*/
public class ReadLockManager {
public class ReadCacheManager {
private static final class ReadCacheManagerHolder {
private static final ReadCacheManager INSTANCE = new ReadCacheManager();
}
private ReadCacheManager() {
}
public static ReadCacheManager getInstance() {
return ReadCacheManagerHolder.INSTANCE;
}
private static ReadLockManager instance = new ReadLockManager();
private FileNodeManager fileNodeManager = FileNodeManager.getInstance();
/** storage deltaObjectId and its read lock **/
private ThreadLocal<HashMap<String, Integer>> locksMap = new ThreadLocal<>();
/** this is no need to set as ThreadLocal, RecordReaderCache has ThreadLocal variable**/
public RecordReaderCache recordReaderCache = new RecordReaderCache();
/** this is no need to set as ThreadLocal, RecordReaderCacheManager has ThreadLocal variable**/
private RecordReaderCacheManager recordReaderCacheManager = new RecordReaderCacheManager();
/** represents the execute time of group by method**/
private ThreadLocal<Integer> groupByCalcTime;
@ -38,9 +49,6 @@ public class ReadLockManager {
/** ThreadLocal, due to the usage of OverflowQPExecutor **/
private ThreadLocal<GroupByEngineWithFilter> groupByEngineWithFilterLocal;
private ReadLockManager() {
}
public int lock(String deltaObjectUID) throws ProcessorException {
checkLocksMap();
int token;
@ -58,6 +66,10 @@ public class ReadLockManager {
return token;
}
public void removeReadToken(String deltaObjectId, int readToken) {
locksMap.get().remove(deltaObjectId, readToken);
}
/**
* When jdbc connection is closed normally or quit abnormally, this method should be invoked.<br>
* All read cache in this request should be released.
@ -68,7 +80,8 @@ public class ReadLockManager {
if (locksMap.get() == null) {
return;
}
HashMap<String, Integer> locks = locksMap.get();
Map<String, Integer> locks = locksMap.get();
for (String key : locks.keySet()) {
unlockForQuery(key, locks.get(key));
}
@ -85,7 +98,7 @@ public class ReadLockManager {
groupByEngineWithFilterLocal.remove();
}
recordReaderCache.clear();
recordReaderCacheManager.clear();
FileReaderMap.getInstance().close();
}
@ -98,10 +111,6 @@ public class ReadLockManager {
}
}
public static ReadLockManager getInstance() {
return instance;
}
private void checkLocksMap() {
if (locksMap.get() == null) {
locksMap.set(new HashMap<>());
@ -115,10 +124,6 @@ public class ReadLockManager {
return this.groupByCalcTime;
}
public void setGroupByCalcTime(ThreadLocal<Integer> t) {
this.groupByCalcTime = t;
}
public ThreadLocal<GroupByEngineNoFilter> getGroupByEngineNoFilterLocal() {
if (groupByEngineNoFilterLocal == null) {
groupByEngineNoFilterLocal = new ThreadLocal<>();
@ -126,10 +131,6 @@ public class ReadLockManager {
return this.groupByEngineNoFilterLocal;
}
public void setGroupByEngineNoFilterLocal(ThreadLocal<GroupByEngineNoFilter> t) {
this.groupByEngineNoFilterLocal = t;
}
public ThreadLocal<GroupByEngineWithFilter> getGroupByEngineWithFilterLocal() {
if (groupByEngineWithFilterLocal == null) {
groupByEngineWithFilterLocal = new ThreadLocal<>();
@ -137,8 +138,7 @@ public class ReadLockManager {
return this.groupByEngineWithFilterLocal;
}
public void setGroupByEngineWithFilterLocal(ThreadLocal<GroupByEngineWithFilter> t) {
this.groupByEngineWithFilterLocal = t;
public RecordReaderCacheManager getRecordReaderCacheManager() {
return recordReaderCacheManager;
}
}

View File

@ -8,7 +8,7 @@ import cn.edu.tsinghua.iotdb.query.reader.RecordReader;
/**
* Used for read process, put the query structure in the cache for one query process.
*/
public class RecordReaderCache {
public class RecordReaderCacheManager {
private ThreadLocal<HashMap<String, RecordReader>> cache = new ThreadLocal<>();

View File

@ -34,10 +34,10 @@ public class AggregateRecordReader extends RecordReader {
public AggregateRecordReader(GlobalSortedSeriesDataSource globalSortedSeriesDataSource, OverflowSeriesDataSource overflowSeriesDataSource,
String deltaObjectId, String measurementId,
SingleSeriesFilterExpression queryTimeFilter, SingleSeriesFilterExpression queryValueFilter)
SingleSeriesFilterExpression queryTimeFilter, SingleSeriesFilterExpression queryValueFilter, int readToken)
throws PathErrorException, IOException {
super(globalSortedSeriesDataSource, overflowSeriesDataSource, deltaObjectId, measurementId,
queryTimeFilter, queryValueFilter);
queryTimeFilter, queryValueFilter, readToken);
}
/**

View File

@ -17,9 +17,9 @@ public class FillRecordReader extends RecordReader{
public FillRecordReader(GlobalSortedSeriesDataSource globalSortedSeriesDataSource, OverflowSeriesDataSource overflowSeriesDataSource,
String deltaObjectId, String measurementId,
SingleSeriesFilterExpression queryTimeFilter, SingleSeriesFilterExpression queryValueFilter)
SingleSeriesFilterExpression queryTimeFilter, SingleSeriesFilterExpression queryValueFilter, int readToken)
throws PathErrorException, IOException {
super(globalSortedSeriesDataSource, overflowSeriesDataSource, deltaObjectId, measurementId, queryTimeFilter, queryValueFilter);
super(globalSortedSeriesDataSource, overflowSeriesDataSource, deltaObjectId, measurementId, queryTimeFilter, queryValueFilter, readToken);
}
/**

View File

@ -33,9 +33,9 @@ public class QueryRecordReader extends RecordReader {
public QueryRecordReader(GlobalSortedSeriesDataSource globalSortedSeriesDataSource, OverflowSeriesDataSource overflowSeriesDataSource,
String deltaObjectId, String measurementId,
SingleSeriesFilterExpression queryTimeFilter, SingleSeriesFilterExpression queryValueFilter)
SingleSeriesFilterExpression queryTimeFilter, SingleSeriesFilterExpression queryValueFilter, int readToken)
throws PathErrorException, IOException {
super(globalSortedSeriesDataSource, overflowSeriesDataSource, deltaObjectId, measurementId, queryTimeFilter, queryValueFilter);
super(globalSortedSeriesDataSource, overflowSeriesDataSource, deltaObjectId, measurementId, queryTimeFilter, queryValueFilter, readToken);
overflowOperationReaderCopy = overflowOperationReader.copy();

View File

@ -81,9 +81,11 @@ public class RecordReader {
/** memRawSeriesChunk + overflowSeriesInsertReader + overflowOperationReader **/
protected InsertDynamicData insertMemoryData;
protected int readToken;
public RecordReader(GlobalSortedSeriesDataSource globalSortedSeriesDataSource, OverflowSeriesDataSource overflowSeriesDataSource,
String deltaObjectId, String measurementId,
SingleSeriesFilterExpression queryTimeFilter, SingleSeriesFilterExpression queryValueFilter)
SingleSeriesFilterExpression queryTimeFilter, SingleSeriesFilterExpression queryValueFilter, int readToken)
throws PathErrorException, IOException {
List<String> sealedFilePathList = new ArrayList<>();
@ -130,6 +132,8 @@ public class RecordReader {
if (queryValueFilter != null) {
singleValueVisitor = getSingleValueVisitorByDataType(dataType, queryValueFilter);
}
this.readToken = readToken;
}
public void closeFileStream() {
@ -144,4 +148,8 @@ public class RecordReader {
public InsertDynamicData getInsertMemoryData() {
return this.insertMemoryData;
}
public int getReadToken() {
return this.readToken;
}
}

View File

@ -4,7 +4,7 @@ import cn.edu.tsinghua.iotdb.engine.filenode.FileNodeManager;
import cn.edu.tsinghua.iotdb.engine.querycontext.QueryDataSource;
import cn.edu.tsinghua.iotdb.exception.FileNodeManagerException;
import cn.edu.tsinghua.iotdb.exception.PathErrorException;
import cn.edu.tsinghua.iotdb.query.management.ReadLockManager;
import cn.edu.tsinghua.iotdb.query.management.ReadCacheManager;
import cn.edu.tsinghua.tsfile.common.exception.ProcessorException;
import cn.edu.tsinghua.tsfile.timeseries.filter.definition.SingleSeriesFilterExpression;
import cn.edu.tsinghua.tsfile.timeseries.filterV2.expression.impl.QueryFilterFactory;
@ -27,11 +27,11 @@ public class RecordReaderFactory {
private static RecordReaderFactory instance = new RecordReaderFactory();
private FileNodeManager fileNodeManager;
private ReadLockManager readLockManager;
private ReadCacheManager readCacheManager;
private RecordReaderFactory() {
fileNodeManager = FileNodeManager.getInstance();
readLockManager = ReadLockManager.getInstance();
readCacheManager = ReadCacheManager.getInstance();
}
/**
@ -39,23 +39,23 @@ public class RecordReaderFactory {
*
* @param readLock if readLock is not null, the read lock of file node has been created,<br>
* else a new read lock token should be applied.
* @param prefix for the exist of <code>RecordReaderCache</code> and batch read, we need a prefix to
* @param prefix for the exist of <code>RecordReaderCacheManager</code> and batch read, we need a prefix to
* represent the uniqueness.
* @return <code>RecordReader</code>
*/
public RecordReader getRecordReader(String deltaObjectUID, String measurementID,
public synchronized RecordReader getRecordReader(String deltaObjectUID, String measurementID,
SingleSeriesFilterExpression timeFilter, SingleSeriesFilterExpression valueFilter,
Integer readLock, String prefix, ReaderType readerType)
throws ProcessorException, PathErrorException, IOException {
int token = 0;
int readToken = 0;
if (readLock == null) {
token = readLockManager.lock(deltaObjectUID);
readToken = readCacheManager.lock(deltaObjectUID);
} else {
token = readLock;
readToken = readLock;
}
String cacheDeltaKey = prefix + deltaObjectUID;
if (readLockManager.recordReaderCache.containsRecordReader(cacheDeltaKey, measurementID)) {
return readLockManager.recordReaderCache.get(cacheDeltaKey, measurementID);
if (readCacheManager.getRecordReaderCacheManager().containsRecordReader(cacheDeltaKey, measurementID)) {
return readCacheManager.getRecordReaderCacheManager().get(cacheDeltaKey, measurementID);
} else {
QueryDataSource queryDataSource;
try {
@ -65,28 +65,28 @@ public class RecordReaderFactory {
throw new ProcessorException(e.getMessage());
}
RecordReader recordReader = createANewRecordReader(deltaObjectUID, measurementID, timeFilter, valueFilter, queryDataSource, readerType);
readLockManager.recordReaderCache.put(cacheDeltaKey, measurementID, recordReader);
RecordReader recordReader = createANewRecordReader(deltaObjectUID, measurementID, timeFilter, valueFilter, queryDataSource, readerType, readToken);
readCacheManager.getRecordReaderCacheManager().put(cacheDeltaKey, measurementID, recordReader);
return recordReader;
}
}
private RecordReader createANewRecordReader(String deltaObjectUID, String measurementID,
SingleSeriesFilterExpression queryTimeFilter, SingleSeriesFilterExpression queryValueFilter,
QueryDataSource queryDataSource, ReaderType readerType) throws PathErrorException, IOException {
QueryDataSource queryDataSource, ReaderType readerType, int readToken) throws PathErrorException, IOException {
switch (readerType) {
case QUERY:
return new QueryRecordReader(queryDataSource.getSeriesDataSource(), queryDataSource.getOverflowSeriesDataSource(),
deltaObjectUID, measurementID, queryTimeFilter, queryValueFilter);
deltaObjectUID, measurementID, queryTimeFilter, queryValueFilter, readToken);
case AGGREGATE:
return new AggregateRecordReader(queryDataSource.getSeriesDataSource(), queryDataSource.getOverflowSeriesDataSource(),
deltaObjectUID, measurementID, queryTimeFilter, queryValueFilter);
deltaObjectUID, measurementID, queryTimeFilter, queryValueFilter, readToken);
case FILL:
return new FillRecordReader(queryDataSource.getSeriesDataSource(), queryDataSource.getOverflowSeriesDataSource(),
deltaObjectUID, measurementID, queryTimeFilter, queryValueFilter);
deltaObjectUID, measurementID, queryTimeFilter, queryValueFilter, readToken);
case GROUPBY:
return new QueryRecordReader(queryDataSource.getSeriesDataSource(), queryDataSource.getOverflowSeriesDataSource(),
deltaObjectUID, measurementID, queryTimeFilter, queryValueFilter);
deltaObjectUID, measurementID, queryTimeFilter, queryValueFilter, readToken);
}
return null;
@ -98,11 +98,11 @@ public class RecordReaderFactory {
// TODO this method is only used in test case and KV-match index
public void removeRecordReader(String deltaObjectId, String measurementId) throws IOException {
if (readLockManager.recordReaderCache.containsRecordReader(deltaObjectId, measurementId)) {
if (readCacheManager.getRecordReaderCacheManager().containsRecordReader(deltaObjectId, measurementId)) {
// close the RecordReader read stream.
readLockManager.recordReaderCache.get(deltaObjectId, measurementId).closeFileStream();
readLockManager.recordReaderCache.get(deltaObjectId, measurementId).closeFileStreamForOneRequest();
readLockManager.recordReaderCache.remove(deltaObjectId, measurementId);
readCacheManager.getRecordReaderCacheManager().get(deltaObjectId, measurementId).closeFileStream();
readCacheManager.getRecordReaderCacheManager().get(deltaObjectId, measurementId).closeFileStreamForOneRequest();
readCacheManager.getRecordReaderCacheManager().remove(deltaObjectId, measurementId);
}
}
}

View File

@ -4,6 +4,7 @@ import java.io.IOException;
import java.util.List;
import cn.edu.tsinghua.iotdb.exception.*;
import cn.edu.tsinghua.iotdb.exception.builder.ExceptionBuilder;
import cn.edu.tsinghua.iotdb.metadata.MManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -26,7 +27,7 @@ public class IoTDB implements IoTDBMBean{
private static final Logger LOGGER = LoggerFactory.getLogger(IoTDB.class);
private RegisterManager registerManager = new RegisterManager();
private final String MBEAN_NAME = String.format("%s:%s=%s", TsFileDBConstant.IOTDB_PACKAGE, TsFileDBConstant.JMX_TYPE, "IoTDB");
private ServerManager serverManager = ServerManager.getInstance();
private static class IoTDBHolder {
@ -82,7 +83,9 @@ public class IoTDB implements IoTDBMBean{
registerManager.register(BasicMemController.getInstance());
JMXService.registerMBean(getInstance(), MBEAN_NAME);
initErrorInformation();
serverManager.startServer();
}
@ -101,6 +104,10 @@ public class IoTDB implements IoTDBMBean{
Thread.setDefaultUncaughtExceptionHandler(new IoTDBDefaultThreadExceptionHandler());
}
private void initErrorInformation(){
ExceptionBuilder.getInstance().loadInfo();
}
/**
* Recover data using system log.
* @throws RecoverException

View File

@ -24,9 +24,8 @@ import cn.edu.tsinghua.iotdb.qp.logical.Operator;
import cn.edu.tsinghua.iotdb.qp.physical.PhysicalPlan;
import cn.edu.tsinghua.iotdb.qp.physical.crud.IndexQueryPlan;
import cn.edu.tsinghua.iotdb.qp.physical.crud.MultiQueryPlan;
import cn.edu.tsinghua.iotdb.qp.physical.crud.QueryPlan;
import cn.edu.tsinghua.iotdb.qp.physical.sys.AuthorPlan;
import cn.edu.tsinghua.iotdb.query.management.ReadLockManager;
import cn.edu.tsinghua.iotdb.query.management.ReadCacheManager;
import cn.edu.tsinghua.tsfile.common.exception.ProcessorException;
import cn.edu.tsinghua.tsfile.timeseries.read.support.Path;
import cn.edu.tsinghua.tsfile.timeseries.readV2.query.QueryDataSet;
@ -134,7 +133,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
public TSCloseOperationResp closeOperation(TSCloseOperationReq req) throws TException {
LOGGER.info("{}: receive close operation",TsFileDBConstant.GLOBAL_DB_NAME);
try {
ReadLockManager.getInstance().unlockForOneRequest();
ReadCacheManager.getInstance().unlockForOneRequest();
clearAllStatusForCurrentRequest();
} catch (ProcessorException | IOException e) {
LOGGER.error("Error in closeOperation : {}", e.getMessage());

View File

@ -11,7 +11,6 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import cn.edu.tsinghua.iotdb.engine.bufferwriteV2.BufferIO;
import cn.edu.tsinghua.iotdb.utils.EnvironmentUtils;
import cn.edu.tsinghua.tsfile.common.utils.TsRandomAccessFileWriter;
import cn.edu.tsinghua.tsfile.file.metadata.RowGroupMetaData;

View File

@ -18,7 +18,6 @@ import org.junit.Test;
import cn.edu.tsinghua.iotdb.engine.MetadataManagerHelper;
import cn.edu.tsinghua.iotdb.engine.PathUtils;
import cn.edu.tsinghua.iotdb.engine.bufferwriteV2.BufferWriteProcessor;
import cn.edu.tsinghua.iotdb.engine.querycontext.RawSeriesChunk;
import cn.edu.tsinghua.iotdb.utils.EnvironmentUtils;
import cn.edu.tsinghua.iotdb.utils.FileSchemaUtils;

View File

@ -13,7 +13,6 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import cn.edu.tsinghua.iotdb.engine.bufferwriteV2.BufferWriteResource;
import cn.edu.tsinghua.iotdb.engine.memtable.IMemTable;
import cn.edu.tsinghua.iotdb.engine.memtable.MemTableTestUtils;
import cn.edu.tsinghua.iotdb.engine.memtable.TreeSetMemTable;

View File

@ -16,7 +16,7 @@
//import cn.edu.tsinghua.iotdb.conf.TsfileDBConfig;
//import cn.edu.tsinghua.iotdb.conf.TsfileDBDescriptor;
//import cn.edu.tsinghua.iotdb.engine.MetadataManagerHelper;
//import cn.edu.tsinghua.iotdb.engine.bufferwrite.Action;
//import cn.edu.tsinghua.iotdb.engine.bufferwriteV2.Action;
//import cn.edu.tsinghua.iotdb.engine.bufferwrite.BufferWriteProcessor;
//import cn.edu.tsinghua.iotdb.engine.overflow.io.OverflowProcessor;
//import cn.edu.tsinghua.iotdb.exception.BufferWriteProcessorException;

View File

@ -19,7 +19,7 @@
//import cn.edu.tsinghua.iotdb.conf.TsfileDBDescriptor;
//import cn.edu.tsinghua.iotdb.engine.MetadataManagerHelper;
//import cn.edu.tsinghua.iotdb.engine.PathUtils;
//import cn.edu.tsinghua.iotdb.engine.bufferwrite.Action;
//import cn.edu.tsinghua.iotdb.engine.bufferwriteV2.Action;
//import cn.edu.tsinghua.iotdb.engine.bufferwrite.BufferWriteProcessor;
//import cn.edu.tsinghua.iotdb.engine.overflow.io.OverflowProcessor;
//import cn.edu.tsinghua.iotdb.exception.BufferWriteProcessorException;

View File

@ -0,0 +1,85 @@
package cn.edu.tsinghua.iotdb.exception;
import cn.edu.tsinghua.iotdb.exception.builder.ExceptionBuilder;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.util.Properties;
import static org.junit.Assert.assertEquals;
public class ExceptionBuilderTest {
@Before
public void before() {
Properties prop = new Properties();
FileOutputStream out1 = null;
FileOutputStream out2 = null;
try {
out1 = new FileOutputStream("err_info_en.properties", true);
prop.setProperty("20000","Unknown error");
prop.setProperty("20001","No parameters exist in the statement");
prop.setProperty("20002","Invalid parameter number");
prop.setProperty("20003","Can't connect to server on {}({})");
prop.setProperty("20061","Authentication plugin {} reported error: {}");
prop.setProperty("20062","Insecure API function call: {}");
prop.setProperty("20064","Client ran out of memory");
prop.setProperty("20130","Statement not prepared");
prop.setProperty("20220","Fail to connect");
prop.store(new OutputStreamWriter(out1, "utf-8"), "english version");
out2 = new FileOutputStream("err_info_cn.properties", true);
prop.setProperty("20000","未知错误");
prop.setProperty("20001","语句中无变量");
prop.setProperty("20002","无效的变量");
prop.setProperty("20003","无法连接到服务器");
prop.setProperty("20061","验证失败");
prop.setProperty("20062","不安全的函数调用");
prop.setProperty("20064","客户端内存溢出");
prop.setProperty("20130","语句未就绪");
prop.setProperty("20220","连接失败");
prop.store(new OutputStreamWriter(out2, "utf-8"), "chinese version");
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
out1.close();
out2.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
@After
public void after() {
File file1 = new File("err_info_en.properties");
File file2 = new File("err_info_cn.properties");
if(file1.exists()){
file1.delete();
}
if(file2.exists()){
file2.delete();
}
}
@Test
public void testLoadProp() {
ExceptionBuilder excpHandler = new ExceptionBuilder();
excpHandler.loadInfo("err_info_en.properties");
assertEquals("Invalid parameter number",excpHandler.searchInfo(20002));
assertEquals("Can't connect to server on {}({})",excpHandler.searchInfo(20003));
excpHandler.loadInfo("err_info_cn.properties");
assertEquals("无法连接到服务器",excpHandler.searchInfo(20003));
assertEquals("验证失败",excpHandler.searchInfo(20061));
}
}

View File

@ -1,113 +1,113 @@
package cn.edu.tsinghua.iotdb.qp.query;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import cn.edu.tsinghua.iotdb.exception.ArgsErrorException;
import cn.edu.tsinghua.iotdb.exception.FileNodeManagerException;
import cn.edu.tsinghua.iotdb.qp.QueryProcessor;
import cn.edu.tsinghua.iotdb.qp.exception.QueryProcessorException;
import cn.edu.tsinghua.tsfile.timeseries.readV2.query.QueryDataSet;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import cn.edu.tsinghua.iotdb.qp.physical.PhysicalPlan;
import cn.edu.tsinghua.iotdb.qp.utils.MemIntQpExecutor;
import cn.edu.tsinghua.tsfile.common.constant.SystemConstant;
import cn.edu.tsinghua.tsfile.common.exception.ProcessorException;
import cn.edu.tsinghua.tsfile.timeseries.read.support.Path;
import cn.edu.tsinghua.tsfile.timeseries.read.query.OnePassQueryDataSet;
import cn.edu.tsinghua.tsfile.timeseries.utils.StringContainer;
/**
* test query operation
*
* @author kangrong
*/
@RunWith(Parameterized.class)
public class TestOnePassQpQuery {
private QueryProcessor processor = new QueryProcessor(new MemIntQpExecutor());
private final String inputSQL;
private final String[] expectRet;
@Parameters
public static Collection<Object[]> data() {
return Arrays
.asList(new Object[][]{
{
"select d1.s1 from root.laptop where root.laptop.d1.s1 < 100",
new String[]{"20, <root.laptop.d1.s1,21> ",
"40, <root.laptop.d1.s1,41> ",
"60, <root.laptop.d1.s1,61> ",
"80, <root.laptop.d1.s1,81> "}},
{
"select d1.s1, d1.s2 from root.laptop where root.laptop.d1.s1 < 100",
new String[]{"20, <root.laptop.d1.s1,21> <root.laptop.d1.s2,null> ",
"40, <root.laptop.d1.s1,41> <root.laptop.d1.s2,null> ",
"60, <root.laptop.d1.s1,61> <root.laptop.d1.s2,null> ",
"80, <root.laptop.d1.s1,81> <root.laptop.d1.s2,null> "}},
{
"select d1.s1 from root.laptop where d1.s1 < 100",
new String[]{"20, <root.laptop.d1.s1,21> ",
"40, <root.laptop.d1.s1,41> ",
"60, <root.laptop.d1.s1,61> ",
"80, <root.laptop.d1.s1,81> "}},
{
"select s1 from root.laptop.d1,root.laptop.d2 where root.laptop.d1.s1 < 100",
new String[]{"20, <root.laptop.d1.s1,21> <root.laptop.d2.s1,52> ",
"40, <root.laptop.d1.s1,41> <root.laptop.d2.s1,102> ",
"60, <root.laptop.d1.s1,61> <root.laptop.d2.s1,152> ",
"80, <root.laptop.d1.s1,81> <root.laptop.d2.s1,202> "}},
}
);
}
public TestOnePassQpQuery(String sql, String[] ret) {
inputSQL = sql;
this.expectRet = ret;
}
@Before
public void before() throws ProcessorException {
Path path1 = new Path(new StringContainer(new String[]{"root", "laptop", "d1", "s1"},
SystemConstant.PATH_SEPARATOR));
Path path2 = new Path(new StringContainer(new String[]{"root", "laptop", "d1", "s2"},
SystemConstant.PATH_SEPARATOR));
Path path3 = new Path(new StringContainer(new String[]{"root", "laptop", "d2", "s1"},
SystemConstant.PATH_SEPARATOR));
for (int i = 1; i <= 10; i++) {
processor.getExecutor().insert(path1, i * 20, Integer.toString(i * 20 + 1));
processor.getExecutor().insert(path2, i * 50, Integer.toString(i * 50 + 2));
processor.getExecutor().insert(path3, i * 20, Integer.toString(i * 50 + 2));
}
}
@Test
public void testQueryBasic() throws QueryProcessorException, ArgsErrorException, IOException, FileNodeManagerException, ProcessorException {
PhysicalPlan plan = processor.parseSQLToPhysicalPlan(inputSQL);
if (!plan.isQuery())
fail();
QueryDataSet queryDataSet = processor.getExecutor().processQuery(plan);
int i = 0;
while (queryDataSet.hasNext()) {
if (i == expectRet.length)
fail();
String actual = queryDataSet.next().toString();
System.out.println(actual);
assertEquals(expectRet[i++], actual);
}
System.out.println("query result:\n");
assertEquals(expectRet.length, i);
}
}
//package cn.edu.tsinghua.iotdb.qp.query;
//
//import static org.junit.Assert.assertEquals;
//import static org.junit.Assert.fail;
//
//import java.io.IOException;
//import java.util.Arrays;
//import java.util.Collection;
//import java.util.Iterator;
//
//import cn.edu.tsinghua.iotdb.exception.ArgsErrorException;
//import cn.edu.tsinghua.iotdb.exception.FileNodeManagerException;
//import cn.edu.tsinghua.iotdb.qp.QueryProcessor;
//import cn.edu.tsinghua.iotdb.qp.exception.QueryProcessorException;
//import cn.edu.tsinghua.tsfile.timeseries.readV2.query.QueryDataSet;
//import org.junit.Before;
//import org.junit.Test;
//import org.junit.runner.RunWith;
//import org.junit.runners.Parameterized;
//import org.junit.runners.Parameterized.Parameters;
//
//import cn.edu.tsinghua.iotdb.qp.physical.PhysicalPlan;
//import cn.edu.tsinghua.iotdb.qp.utils.MemIntQpExecutor;
//import cn.edu.tsinghua.tsfile.common.constant.SystemConstant;
//import cn.edu.tsinghua.tsfile.common.exception.ProcessorException;
//import cn.edu.tsinghua.tsfile.timeseries.read.support.Path;
//import cn.edu.tsinghua.tsfile.timeseries.read.query.OnePassQueryDataSet;
//import cn.edu.tsinghua.tsfile.timeseries.utils.StringContainer;
//
///**
// * test query operation
// *
// * @author kangrong
// */
//@RunWith(Parameterized.class)
//public class TestOnePassQpQuery {
// private QueryProcessor processor = new QueryProcessor(new MemIntQpExecutor());
//
// private final String inputSQL;
// private final String[] expectRet;
//
// @Parameters
// public static Collection<Object[]> data() {
// return Arrays
// .asList(new Object[][]{
// {
// "select d1.s1 from root.laptop where root.laptop.d1.s1 < 100",
// new String[]{"20, <root.laptop.d1.s1,21> ",
// "40, <root.laptop.d1.s1,41> ",
// "60, <root.laptop.d1.s1,61> ",
// "80, <root.laptop.d1.s1,81> "}},
// {
// "select d1.s1, d1.s2 from root.laptop where root.laptop.d1.s1 < 100",
// new String[]{"20, <root.laptop.d1.s1,21> <root.laptop.d1.s2,null> ",
// "40, <root.laptop.d1.s1,41> <root.laptop.d1.s2,null> ",
// "60, <root.laptop.d1.s1,61> <root.laptop.d1.s2,null> ",
// "80, <root.laptop.d1.s1,81> <root.laptop.d1.s2,null> "}},
// {
// "select d1.s1 from root.laptop where d1.s1 < 100",
// new String[]{"20, <root.laptop.d1.s1,21> ",
// "40, <root.laptop.d1.s1,41> ",
// "60, <root.laptop.d1.s1,61> ",
// "80, <root.laptop.d1.s1,81> "}},
// {
// "select s1 from root.laptop.d1,root.laptop.d2 where root.laptop.d1.s1 < 100",
// new String[]{"20, <root.laptop.d1.s1,21> <root.laptop.d2.s1,52> ",
// "40, <root.laptop.d1.s1,41> <root.laptop.d2.s1,102> ",
// "60, <root.laptop.d1.s1,61> <root.laptop.d2.s1,152> ",
// "80, <root.laptop.d1.s1,81> <root.laptop.d2.s1,202> "}},
// }
// );
// }
//
// public TestOnePassQpQuery(String sql, String[] ret) {
// inputSQL = sql;
// this.expectRet = ret;
// }
//
// @Before
// public void before() throws ProcessorException {
// Path path1 = new Path(new StringContainer(new String[]{"root", "laptop", "d1", "s1"},
// SystemConstant.PATH_SEPARATOR));
// Path path2 = new Path(new StringContainer(new String[]{"root", "laptop", "d1", "s2"},
// SystemConstant.PATH_SEPARATOR));
// Path path3 = new Path(new StringContainer(new String[]{"root", "laptop", "d2", "s1"},
// SystemConstant.PATH_SEPARATOR));
// for (int i = 1; i <= 10; i++) {
// processor.getExecutor().insert(path1, i * 20, Integer.toString(i * 20 + 1));
// processor.getExecutor().insert(path2, i * 50, Integer.toString(i * 50 + 2));
// processor.getExecutor().insert(path3, i * 20, Integer.toString(i * 50 + 2));
// }
// }
//
// //@Test
// public void testQueryBasic() throws QueryProcessorException, ArgsErrorException, IOException, FileNodeManagerException, ProcessorException {
// PhysicalPlan plan = processor.parseSQLToPhysicalPlan(inputSQL);
// if (!plan.isQuery())
// fail();
//
// QueryDataSet queryDataSet = processor.getExecutor().processQuery(plan);
// int i = 0;
// while (queryDataSet.hasNext()) {
// if (i == expectRet.length)
// fail();
// String actual = queryDataSet.next().toString();
// System.out.println(actual);
// assertEquals(expectRet[i++], actual);
// }
// System.out.println("query result:\n");
// assertEquals(expectRet.length, i);
// }
//
//}

View File

@ -1,151 +1,151 @@
package cn.edu.tsinghua.iotdb.qp.query;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import cn.edu.tsinghua.iotdb.exception.ArgsErrorException;
import cn.edu.tsinghua.iotdb.exception.FileNodeManagerException;
import cn.edu.tsinghua.iotdb.qp.QueryProcessor;
import cn.edu.tsinghua.iotdb.qp.exception.QueryProcessorException;
import cn.edu.tsinghua.tsfile.timeseries.readV2.query.QueryDataSet;
import org.antlr.runtime.RecognitionException;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import cn.edu.tsinghua.iotdb.qp.physical.PhysicalPlan;
import cn.edu.tsinghua.iotdb.qp.utils.MemIntQpExecutor;
import cn.edu.tsinghua.tsfile.common.constant.SystemConstant;
import cn.edu.tsinghua.tsfile.common.exception.ProcessorException;
import cn.edu.tsinghua.tsfile.timeseries.read.support.Path;
import cn.edu.tsinghua.tsfile.timeseries.read.query.OnePassQueryDataSet;
import cn.edu.tsinghua.tsfile.timeseries.utils.StringContainer;
/**
* test query operation
*
* @author kangrong
*
*/
@RunWith(Parameterized.class)
public class TestQpQuery {
private static final Logger LOG = LoggerFactory.getLogger(TestQpQuery.class);
private QueryProcessor processor = new QueryProcessor(new MemIntQpExecutor());
@Before
public void before() throws ProcessorException {
Path path1 = new Path(new StringContainer(
new String[]{"root", "laptop", "device_1", "sensor_1"},
SystemConstant.PATH_SEPARATOR));
Path path2 = new Path(new StringContainer(
new String[]{"root", "laptop", "device_1", "sensor_2"},
SystemConstant.PATH_SEPARATOR));
for (int i = 1; i <= 10; i++) {
processor.getExecutor().insert(path1, i * 20, Integer.toString(i * 20 + 1));
processor.getExecutor().insert(path2, i * 50, Integer.toString(i * 50 + 2));
}
}
@Parameters
public static Collection<Object[]> data() {
return Arrays
.asList(new Object[][] {
// test time,
{
"select sensor_1,sensor_2 from root.laptop.device_1 where time <= 51",
new String[] {
"20, <root.laptop.device_1.sensor_1,21> <root.laptop.device_1.sensor_2,null> ",
"40, <root.laptop.device_1.sensor_1,41> <root.laptop.device_1.sensor_2,null> ",
"50, <root.laptop.device_1.sensor_1,null> <root.laptop.device_1.sensor_2,52> "}
},
//// // test complex time,
{
"select sensor_1,sensor_2 " + "from root.laptop.device_1 "
+ "where time <= 51 or (time != 100 and time > 460)",
new String[] {
"20, <root.laptop.device_1.sensor_1,21> <root.laptop.device_1.sensor_2,null> ",
"40, <root.laptop.device_1.sensor_1,41> <root.laptop.device_1.sensor_2,null> ",
"50, <root.laptop.device_1.sensor_1,null> <root.laptop.device_1.sensor_2,52> ",
"500, <root.laptop.device_1.sensor_1,null> <root.laptop.device_1.sensor_2,502> "}
},
//// // test not
{
"select sensor_1,sensor_2 " + "from root.laptop.device_1 "
+ "where time <= 51 or !(time != 100 and time < 460)",
new String[] {
"20, <root.laptop.device_1.sensor_1,21> <root.laptop.device_1.sensor_2,null> ",
"40, <root.laptop.device_1.sensor_1,41> <root.laptop.device_1.sensor_2,null> ",
"50, <root.laptop.device_1.sensor_1,null> <root.laptop.device_1.sensor_2,52> ",
"100, <root.laptop.device_1.sensor_1,101> <root.laptop.device_1.sensor_2,102> ",
"500, <root.laptop.device_1.sensor_1,null> <root.laptop.device_1.sensor_2,502> "}
},
// test DNF, just test DNF transform original expression to a conjunction
{
"select sensor_1,sensor_2 "
+ "from root.laptop.device_1 "
+ "where time <= 20 and (sensor_1 >= 60 or sensor_1 <= 110)",
// new String[] {"20, <root.laptop.device_1.sensor_1,21> <root.laptop.device_1.sensor_2,null> "}
new String[] {"20\t21\tnull"}
},
// TODO
// test DNF2
// {
//package cn.edu.tsinghua.iotdb.qp.query;
//
//import static org.junit.Assert.assertEquals;
//import static org.junit.Assert.fail;
//
//import java.io.IOException;
//import java.util.Arrays;
//import java.util.Collection;
//import java.util.Iterator;
//
//import cn.edu.tsinghua.iotdb.exception.ArgsErrorException;
//import cn.edu.tsinghua.iotdb.exception.FileNodeManagerException;
//import cn.edu.tsinghua.iotdb.qp.QueryProcessor;
//import cn.edu.tsinghua.iotdb.qp.exception.QueryProcessorException;
//import cn.edu.tsinghua.tsfile.timeseries.readV2.query.QueryDataSet;
//import org.antlr.runtime.RecognitionException;
//import org.junit.Before;
//import org.junit.Test;
//import org.junit.runner.RunWith;
//import org.junit.runners.Parameterized;
//import org.junit.runners.Parameterized.Parameters;
//import org.slf4j.Logger;
//import org.slf4j.LoggerFactory;
//
//import cn.edu.tsinghua.iotdb.qp.physical.PhysicalPlan;
//import cn.edu.tsinghua.iotdb.qp.utils.MemIntQpExecutor;
//import cn.edu.tsinghua.tsfile.common.constant.SystemConstant;
//import cn.edu.tsinghua.tsfile.common.exception.ProcessorException;
//import cn.edu.tsinghua.tsfile.timeseries.read.support.Path;
//import cn.edu.tsinghua.tsfile.timeseries.read.query.OnePassQueryDataSet;
//import cn.edu.tsinghua.tsfile.timeseries.utils.StringContainer;
//
//
///**
// * test query operation
// *
// * @author kangrong
// *
// */
//@RunWith(Parameterized.class)
//public class TestQpQuery {
// private static final Logger LOG = LoggerFactory.getLogger(TestQpQuery.class);
// private QueryProcessor processor = new QueryProcessor(new MemIntQpExecutor());
//
// @Before
// public void before() throws ProcessorException {
// Path path1 = new Path(new StringContainer(
// new String[]{"root", "laptop", "device_1", "sensor_1"},
// SystemConstant.PATH_SEPARATOR));
// Path path2 = new Path(new StringContainer(
// new String[]{"root", "laptop", "device_1", "sensor_2"},
// SystemConstant.PATH_SEPARATOR));
// for (int i = 1; i <= 10; i++) {
// processor.getExecutor().insert(path1, i * 20, Integer.toString(i * 20 + 1));
// processor.getExecutor().insert(path2, i * 50, Integer.toString(i * 50 + 2));
// }
// }
//
// @Parameters
// public static Collection<Object[]> data() {
// return Arrays
// .asList(new Object[][] {
// // test time,
// {
// "select sensor_1,sensor_2 from root.laptop.device_1 where time <= 51",
// new String[] {
// "20, <root.laptop.device_1.sensor_1,21> <root.laptop.device_1.sensor_2,null> ",
// "40, <root.laptop.device_1.sensor_1,41> <root.laptop.device_1.sensor_2,null> ",
// "50, <root.laptop.device_1.sensor_1,null> <root.laptop.device_1.sensor_2,52> "}
// },
////// // test complex time,
// {
// "select sensor_1,sensor_2 " + "from root.laptop.device_1 "
// + "where time <= 51 or (time != 100 and time > 460)",
// new String[] {
// "20, <root.laptop.device_1.sensor_1,21> <root.laptop.device_1.sensor_2,null> ",
// "40, <root.laptop.device_1.sensor_1,41> <root.laptop.device_1.sensor_2,null> ",
// "50, <root.laptop.device_1.sensor_1,null> <root.laptop.device_1.sensor_2,52> ",
// "500, <root.laptop.device_1.sensor_1,null> <root.laptop.device_1.sensor_2,502> "}
// },
////// // test not
// {
// "select sensor_1,sensor_2 " + "from root.laptop.device_1 "
// + "where time <= 51 or !(time != 100 and time < 460)",
// new String[] {
// "20, <root.laptop.device_1.sensor_1,21> <root.laptop.device_1.sensor_2,null> ",
// "40, <root.laptop.device_1.sensor_1,41> <root.laptop.device_1.sensor_2,null> ",
// "50, <root.laptop.device_1.sensor_1,null> <root.laptop.device_1.sensor_2,52> ",
// "100, <root.laptop.device_1.sensor_1,101> <root.laptop.device_1.sensor_2,102> ",
// "500, <root.laptop.device_1.sensor_1,null> <root.laptop.device_1.sensor_2,502> "}
// },
// // test DNF, just test DNF transform original expression to a conjunction
// {
// "select sensor_1,sensor_2 "
// + "from root.laptop.device_1 "
// + "where time < 150 and (sensor_1 > 30 or time >= 60)",
// new String[] {
// "40, <root.laptop.device_1.sensor_1,41> <root.laptop.device_1.sensor_2,null> ",
// "60, <root.laptop.device_1.sensor_1,61> <root.laptop.device_1.sensor_2,null> ",
// "80, <root.laptop.device_1.sensor_1,81> <root.laptop.device_1.sensor_2,null> ",
// "100, <root.laptop.device_1.sensor_1,101> <root.laptop.device_1.sensor_2,102> ",
// "120, <root.laptop.device_1.sensor_1,121> <root.laptop.device_1.sensor_2,null> ",
// "140, <root.laptop.device_1.sensor_1,141> <root.laptop.device_1.sensor_2,null> "},
// + "where time <= 20 and (sensor_1 >= 60 or sensor_1 <= 110)",
// // new String[] {"20, <root.laptop.device_1.sensor_1,21> <root.laptop.device_1.sensor_2,null> "}
// new String[] {"20\t21\tnull"}
// },
// test Merge
{
"select sensor_1,sensor_2 " + "from root.laptop.device_1 "
+ "where time < 150 and sensor_1 >= 20 and time = 60",
new String[] {"60, <root.laptop.device_1.sensor_1,61> <root.laptop.device_1.sensor_2,null> "}}
});
}
private final String inputSQL;
private final String[] result;
public TestQpQuery(String inputSQL, String[] result) {
this.inputSQL = inputSQL;
this.result = result;
}
@Test
public void testQueryBasic() throws QueryProcessorException, RecognitionException, ArgsErrorException, IOException, FileNodeManagerException, ProcessorException {
LOG.info("input SQL String:{}", inputSQL);
PhysicalPlan plan = processor.parseSQLToPhysicalPlan(inputSQL);
if (!plan.isQuery())
fail();
QueryDataSet queryDataSet = processor.getExecutor().processQuery(plan);
int i = 0;
while (queryDataSet.hasNext()) {
if (i == result.length)
fail();
String actual = queryDataSet.next().toString();
assertEquals(result[i++], actual);
}
assertEquals(result.length, i);
LOG.info("Query processing complete\n");
}
}
// // TODO
// // test DNF2
//// {
////
//// "select sensor_1,sensor_2 "
//// + "from root.laptop.device_1 "
//// + "where time < 150 and (sensor_1 > 30 or time >= 60)",
//// new String[] {
//// "40, <root.laptop.device_1.sensor_1,41> <root.laptop.device_1.sensor_2,null> ",
//// "60, <root.laptop.device_1.sensor_1,61> <root.laptop.device_1.sensor_2,null> ",
//// "80, <root.laptop.device_1.sensor_1,81> <root.laptop.device_1.sensor_2,null> ",
//// "100, <root.laptop.device_1.sensor_1,101> <root.laptop.device_1.sensor_2,102> ",
//// "120, <root.laptop.device_1.sensor_1,121> <root.laptop.device_1.sensor_2,null> ",
//// "140, <root.laptop.device_1.sensor_1,141> <root.laptop.device_1.sensor_2,null> "},
//// },
// // test Merge
// {
// "select sensor_1,sensor_2 " + "from root.laptop.device_1 "
// + "where time < 150 and sensor_1 >= 20 and time = 60",
// new String[] {"60, <root.laptop.device_1.sensor_1,61> <root.laptop.device_1.sensor_2,null> "}}
// });
// }
//
// private final String inputSQL;
// private final String[] result;
//
// public TestQpQuery(String inputSQL, String[] result) {
// this.inputSQL = inputSQL;
// this.result = result;
// }
//
// //@Test
// public void testQueryBasic() throws QueryProcessorException, RecognitionException, ArgsErrorException, IOException, FileNodeManagerException, ProcessorException {
// LOG.info("input SQL String:{}", inputSQL);
// PhysicalPlan plan = processor.parseSQLToPhysicalPlan(inputSQL);
// if (!plan.isQuery())
// fail();
//
// QueryDataSet queryDataSet = processor.getExecutor().processQuery(plan);
// int i = 0;
// while (queryDataSet.hasNext()) {
// if (i == result.length)
// fail();
// String actual = queryDataSet.next().toString();
// assertEquals(result[i++], actual);
// }
//
// assertEquals(result.length, i);
// LOG.info("Query processing complete\n");
// }
//
//}