refine tsfile write interface; remote deltaType property

This commit is contained in:
xiangdong huang 2017-10-15 16:27:00 +08:00
parent e623fc218d
commit d483aebb18
24 changed files with 86 additions and 115 deletions

View File

@ -4,10 +4,9 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import cn.edu.tsinghua.tsfile.common.utils.TSRandomAccessFileWriter;
import cn.edu.tsinghua.tsfile.common.utils.ITsRandomAccessFileWriter;
import cn.edu.tsinghua.tsfile.file.metadata.RowGroupMetaData;
import cn.edu.tsinghua.tsfile.timeseries.write.io.TSFileIOWriter;
import cn.edu.tsinghua.tsfile.timeseries.write.schema.FileSchema;
/**
@ -23,8 +22,8 @@ public class BufferWriteIOWriter extends TSFileIOWriter {
private final List<RowGroupMetaData> backUpList = new ArrayList<RowGroupMetaData>();
private int lastRowGroupIndex = 0;
public BufferWriteIOWriter(FileSchema schema, TSRandomAccessFileWriter output) throws IOException {
super(schema, output);
public BufferWriteIOWriter(ITsRandomAccessFileWriter output) throws IOException {
super(output);
}
/**
@ -34,8 +33,8 @@ public class BufferWriteIOWriter extends TSFileIOWriter {
* @param rowGroups
* @throws IOException
*/
public BufferWriteIOWriter(FileSchema schema,TSRandomAccessFileWriter output, long offset,List<RowGroupMetaData> rowGroups) throws IOException{
super(schema, output,offset, rowGroups);
public BufferWriteIOWriter(ITsRandomAccessFileWriter output, long offset,List<RowGroupMetaData> rowGroups) throws IOException{
super(output,offset, rowGroups);
addrowGroupsTobackupList(rowGroups);
}
@ -51,10 +50,10 @@ public class BufferWriteIOWriter extends TSFileIOWriter {
* <b>Note that</b>,the method is not thread safe.
*/
public void addNewRowGroupMetaDataToBackUp() {
for(int i = lastRowGroupIndex;i<rowGroups.size();i++){
backUpList.add(rowGroups.get(i));
for(int i = lastRowGroupIndex;i<rowGroupMetaDatas.size();i++){
backUpList.add(rowGroupMetaDatas.get(i));
}
lastRowGroupIndex = rowGroups.size();
lastRowGroupIndex = rowGroupMetaDatas.size();
}
/**

View File

@ -35,8 +35,8 @@ import cn.edu.tsinghua.tsfile.common.conf.TSFileDescriptor;
import cn.edu.tsinghua.tsfile.common.constant.JsonFormatConstant;
import cn.edu.tsinghua.tsfile.common.utils.BytesUtils;
import cn.edu.tsinghua.tsfile.common.utils.Pair;
import cn.edu.tsinghua.tsfile.common.utils.RandomAccessOutputStream;
import cn.edu.tsinghua.tsfile.common.utils.TSRandomAccessFileWriter;
import cn.edu.tsinghua.tsfile.common.utils.TsRandomAccessFileWriter;
import cn.edu.tsinghua.tsfile.common.utils.ITsRandomAccessFileWriter;
import cn.edu.tsinghua.tsfile.file.metadata.RowGroupMetaData;
import cn.edu.tsinghua.tsfile.file.metadata.TSFileMetaData;
import cn.edu.tsinghua.tsfile.file.metadata.TimeSeriesMetadata;
@ -45,9 +45,7 @@ import cn.edu.tsinghua.tsfile.file.metadata.enums.TSDataType;
import cn.edu.tsinghua.tsfile.file.metadata.enums.TSEncoding;
import cn.edu.tsinghua.tsfile.file.utils.ReadWriteThriftFormatUtils;
import cn.edu.tsinghua.tsfile.format.FileMetaData;
import cn.edu.tsinghua.tsfile.timeseries.write.TSRecordWriteSupport;
import cn.edu.tsinghua.tsfile.timeseries.write.TSRecordWriter;
import cn.edu.tsinghua.tsfile.timeseries.write.WriteSupport;
import cn.edu.tsinghua.tsfile.timeseries.write.TsFileWriter;
import cn.edu.tsinghua.tsfile.timeseries.write.exception.WriteProcessException;
import cn.edu.tsinghua.tsfile.timeseries.write.record.DataPoint;
import cn.edu.tsinghua.tsfile.timeseries.write.record.TSRecord;
@ -123,9 +121,9 @@ public class BufferWriteProcessor extends LRUProcessor {
} else {
TSRandomAccessFileWriter outputWriter;
ITsRandomAccessFileWriter outputWriter;
try {
outputWriter = new RandomAccessOutputStream(outputFile);
outputWriter = new TsRandomAccessFileWriter(outputFile);
} catch (IOException e) {
LOGGER.error("Construct the TSRandomAccessFileWriter error, the absolutePath is {}.",
outputFile.getAbsolutePath());
@ -133,15 +131,14 @@ public class BufferWriteProcessor extends LRUProcessor {
}
try {
bufferIOWriter = new BufferWriteIOWriter(fileSchema, outputWriter);
bufferIOWriter = new BufferWriteIOWriter(outputWriter);
} catch (IOException e) {
LOGGER.error("Get the BufferWriteIOWriter error, the nameSpacePath is {}.", nameSpacePath);
throw new BufferWriteProcessorException(e);
}
WriteSupport<TSRecord> writeSupport = new TSRecordWriteSupport();
try {
recordWriter = new BufferWriteRecordWriter(TsFileConf, bufferIOWriter, writeSupport, fileSchema);
recordWriter = new BufferWriteRecordWriter(TsFileConf, bufferIOWriter, fileSchema);
} catch (WriteProcessException e) {
throw new BufferWriteProcessorException(e);
}
@ -174,7 +171,7 @@ public class BufferWriteProcessor extends LRUProcessor {
LOGGER.error("Read bufferwrite restore file failed.");
throw new BufferWriteProcessorException(e);
}
TSRandomAccessFileWriter output;
ITsRandomAccessFileWriter output;
long lastPosition = pair.left;
File lastBufferWriteFile = new File(bufferwriteOutputFilePath);
if (lastBufferWriteFile.length() != lastPosition) {
@ -191,7 +188,7 @@ public class BufferWriteProcessor extends LRUProcessor {
}
try {
// Notice: the offset is seek to end of the file by API of kr
output = new RandomAccessOutputStream(lastBufferWriteFile);
output = new TsRandomAccessFileWriter(lastBufferWriteFile);
} catch (IOException e) {
LOGGER.error("Can't construct the RandomAccessOutputStream, the outputPath is {}.",
bufferwriteOutputFilePath);
@ -200,14 +197,13 @@ public class BufferWriteProcessor extends LRUProcessor {
try {
// Notice: the parameter of lastPosition is not used beacuse of the
// API of kr
bufferIOWriter = new BufferWriteIOWriter(fileSchema, output, lastPosition, pair.right);
bufferIOWriter = new BufferWriteIOWriter(output, lastPosition, pair.right);
} catch (IOException e) {
LOGGER.error("Can't get the bufferwrite io when recovery, the nameSpacePath is {}.", nameSpacePath);
throw new BufferWriteProcessorException(e);
}
WriteSupport<TSRecord> writeSupport = new TSRecordWriteSupport();
try {
recordWriter = new BufferWriteRecordWriter(TsFileConf, bufferIOWriter, writeSupport, fileSchema);
recordWriter = new BufferWriteRecordWriter(TsFileConf, bufferIOWriter, fileSchema);
} catch (WriteProcessException e) {
throw new BufferWriteProcessorException(e);
}
@ -501,7 +497,7 @@ public class BufferWriteProcessor extends LRUProcessor {
}
convertBufferLock.readLock().lock();
try {
memData = recordWriter.query(deltaObjectId, measurementId);
memData = recordWriter.getDataInMemory(deltaObjectId, measurementId);
list = bufferIOWriter.getCurrentRowGroupMetaList(deltaObjectId);
} finally {
convertBufferLock.readLock().unlock();
@ -556,15 +552,15 @@ public class BufferWriteProcessor extends LRUProcessor {
}
}
private class BufferWriteRecordWriter extends TSRecordWriter {
private class BufferWriteRecordWriter extends TsFileWriter {
private Map<String, IRowGroupWriter> flushingRowGroupWriters;
private Set<String> flushingRowGroupSet;
private long flushingRecordCount;
BufferWriteRecordWriter(TSFileConfig conf, BufferWriteIOWriter ioFileWriter,
WriteSupport<TSRecord> writeSupport, FileSchema schema) throws WriteProcessException {
super(conf, ioFileWriter, writeSupport, schema);
FileSchema schema) throws WriteProcessException {
super(ioFileWriter, schema, conf);
}
/**
@ -688,11 +684,10 @@ public class BufferWriteProcessor extends LRUProcessor {
private void asyncFlushRowGroupToStore() throws IOException {
if (flushingRecordCount > 0) {
String deltaObjectType = schema.getDeltaType();
long totalMemStart = deltaFileWriter.getPos();
for (String deltaObjectId : flushingRowGroupSet) {
long rowGroupStart = deltaFileWriter.getPos();
deltaFileWriter.startRowGroup(flushingRecordCount, deltaObjectId, deltaObjectType);
deltaFileWriter.startRowGroup(flushingRecordCount, deltaObjectId);
IRowGroupWriter groupWriter = flushingRowGroupWriters.get(deltaObjectId);
groupWriter.flushToFileWriter(deltaFileWriter);
deltaFileWriter.endRowGroup(deltaFileWriter.getPos() - rowGroupStart);
@ -716,7 +711,6 @@ public class BufferWriteProcessor extends LRUProcessor {
flushingRecordCount = recordCount;
// reset
groupWriters = new HashMap<String, IRowGroupWriter>();
writeSupport.init(groupWriters);
schema.getDeltaObjectAppearedSet().clear();
recordCount = 0;
}

View File

@ -29,7 +29,6 @@ import cn.edu.tsinghua.iotdb.exception.BufferWriteProcessorException;
import cn.edu.tsinghua.iotdb.exception.FileNodeProcessorException;
import cn.edu.tsinghua.iotdb.exception.OverflowProcessorException;
import cn.edu.tsinghua.iotdb.exception.PathErrorException;
import cn.edu.tsinghua.iotdb.exception.ProcessorRuntimException;
import cn.edu.tsinghua.iotdb.metadata.ColumnSchema;
import cn.edu.tsinghua.iotdb.metadata.MManager;
import cn.edu.tsinghua.iotdb.query.engine.QueryForMerge;
@ -38,8 +37,8 @@ import cn.edu.tsinghua.tsfile.common.conf.TSFileDescriptor;
import cn.edu.tsinghua.tsfile.common.constant.JsonFormatConstant;
import cn.edu.tsinghua.tsfile.common.exception.ProcessorException;
import cn.edu.tsinghua.tsfile.common.utils.Pair;
import cn.edu.tsinghua.tsfile.common.utils.RandomAccessOutputStream;
import cn.edu.tsinghua.tsfile.common.utils.TSRandomAccessFileWriter;
import cn.edu.tsinghua.tsfile.common.utils.TsRandomAccessFileWriter;
import cn.edu.tsinghua.tsfile.common.utils.ITsRandomAccessFileWriter;
import cn.edu.tsinghua.tsfile.file.metadata.RowGroupMetaData;
import cn.edu.tsinghua.tsfile.file.metadata.enums.CompressionTypeName;
import cn.edu.tsinghua.tsfile.timeseries.filter.definition.FilterExpression;
@ -47,9 +46,7 @@ import cn.edu.tsinghua.tsfile.timeseries.filter.definition.SingleSeriesFilterExp
import cn.edu.tsinghua.tsfile.timeseries.read.qp.Path;
import cn.edu.tsinghua.tsfile.timeseries.read.query.DynamicOneColumnData;
import cn.edu.tsinghua.tsfile.timeseries.read.support.RowRecord;
import cn.edu.tsinghua.tsfile.timeseries.write.TSRecordWriteSupport;
import cn.edu.tsinghua.tsfile.timeseries.write.TSRecordWriter;
import cn.edu.tsinghua.tsfile.timeseries.write.WriteSupport;
import cn.edu.tsinghua.tsfile.timeseries.write.TsFileWriter;
import cn.edu.tsinghua.tsfile.timeseries.write.exception.WriteProcessException;
import cn.edu.tsinghua.tsfile.timeseries.write.io.TSFileIOWriter;
import cn.edu.tsinghua.tsfile.timeseries.write.record.DataPoint;
@ -906,10 +903,9 @@ public class FileNodeProcessor extends LRUProcessor {
Map<String, Long> startTimeMap = new HashMap<>();
Map<String, Long> endTimeMap = new HashMap<>();
TSRandomAccessFileWriter raf = null;
ITsRandomAccessFileWriter raf = null;
TSFileIOWriter tsfileIOWriter = null;
WriteSupport<TSRecord> writeSupport = null;
TSRecordWriter recordWriter = null;
TsFileWriter recordWriter = null;
String outputPath = null;
for (String deltaObjectId : backupIntervalFile.getStartTimeMap().keySet()) {
// query one deltaObjectId
@ -949,12 +945,8 @@ public class FileNodeProcessor extends LRUProcessor {
outputPath = constructOutputFilePath(nameSpacePath, firstRecord.timestamp
+ FileNodeConstants.BUFFERWRITE_FILE_SEPARATOR + System.currentTimeMillis());
FileSchema fileSchema;
fileSchema = constructFileSchema(nameSpacePath);
raf = new RandomAccessOutputStream(new File(outputPath));
tsfileIOWriter = new TSFileIOWriter(fileSchema, raf);
writeSupport = new TSRecordWriteSupport();
recordWriter = new TSRecordWriter(TsFileConf, tsfileIOWriter, writeSupport, fileSchema);
FileSchema fileSchema = constructFileSchema(nameSpacePath);
recordWriter = new TsFileWriter(new File(outputPath), fileSchema, TsFileConf);
}
TSRecord filledRecord = removeNullTSRecord(firstRecord);

View File

@ -5,8 +5,8 @@ import java.io.IOException;
import java.io.OutputStream;
import java.io.RandomAccessFile;
import cn.edu.tsinghua.tsfile.common.utils.TSRandomAccessFileReader;
import cn.edu.tsinghua.tsfile.common.utils.TSRandomAccessFileWriter;
import cn.edu.tsinghua.tsfile.common.utils.ITsRandomAccessFileReader;
import cn.edu.tsinghua.tsfile.common.utils.ITsRandomAccessFileWriter;
/**
* The stream of bytes for overflow read and write
@ -15,7 +15,7 @@ import cn.edu.tsinghua.tsfile.common.utils.TSRandomAccessFileWriter;
* @author liukun
*
*/
public class OverflowReadWriter extends OutputStream implements TSRandomAccessFileReader, TSRandomAccessFileWriter {
public class OverflowReadWriter extends OutputStream implements ITsRandomAccessFileReader, ITsRandomAccessFileWriter {
private RandomAccessFile raf;
private final String filePath;
private final int onePassCopySize = 4 * 1024 * 1024;

View File

@ -10,7 +10,7 @@ import cn.edu.tsinghua.iotdb.qp.physical.PhysicalPlan;
import cn.edu.tsinghua.iotdb.query.engine.FilterStructure;
import cn.edu.tsinghua.tsfile.common.exception.ProcessorException;
import cn.edu.tsinghua.tsfile.common.utils.Pair;
import cn.edu.tsinghua.tsfile.common.utils.TSRandomAccessFileReader;
import cn.edu.tsinghua.tsfile.common.utils.ITsRandomAccessFileReader;
import cn.edu.tsinghua.tsfile.file.metadata.enums.TSDataType;
import cn.edu.tsinghua.tsfile.timeseries.filter.definition.FilterExpression;
import cn.edu.tsinghua.tsfile.timeseries.read.qp.Path;
@ -21,7 +21,7 @@ public class SingleFileQPExecutor extends QueryProcessExecutor {
private QueryEngine queryEngine;
public SingleFileQPExecutor(TSRandomAccessFileReader raf) throws IOException {
public SingleFileQPExecutor(ITsRandomAccessFileReader raf) throws IOException {
queryEngine = new QueryEngine(raf);
}

View File

@ -132,7 +132,6 @@ public class MergeQuerySetIterator implements Iterator<QueryDataSet> {
LinkedHashMap<String, DynamicOneColumnData> mapRet = dataSet.mapRet;
if (!mapRet.containsKey(key)) {
DynamicOneColumnData oneCol = new DynamicOneColumnData(f.dataType, true);
oneCol.setDeltaObjectType(record.deltaObjectType);
mapRet.put(key, oneCol);
}

View File

@ -16,7 +16,6 @@ public class AggregationResult {
AggregationResult(TSDataType dataType){
data = new DynamicOneColumnData(dataType, true);
data.setDeltaObjectType("Aggregation");
}
}

View File

@ -30,7 +30,6 @@ public class EngineUtils {
for (Path path : dataSet.getBatchReaderRetGenerator().retMap.keySet()) {
DynamicOneColumnData batchReadData = dataSet.getBatchReaderRetGenerator().retMap.get(path);
DynamicOneColumnData leftData = batchReadData.sub(batchReadData.curIdx);
leftData.setDeltaObjectType(batchReadData.getDeltaObjectType());
// copy batch read info from oneColRet to leftRet
batchReadData.copyFetchInfoTo(leftData);

View File

@ -6,8 +6,8 @@ import java.io.IOException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import cn.edu.tsinghua.tsfile.common.utils.TSRandomAccessFileReader;
import cn.edu.tsinghua.tsfile.timeseries.read.LocalFileInput;
import cn.edu.tsinghua.tsfile.common.utils.ITsRandomAccessFileReader;
import cn.edu.tsinghua.tsfile.timeseries.read.TsRandomAccessLocalFileReader;
public class FileStreamManager {
@ -17,11 +17,11 @@ public class FileStreamManager {
private FileStreamManager(){
}
public TSRandomAccessFileReader getLocalRandomAccessFileReader(String path) throws FileNotFoundException{
return new LocalFileInput(path);
public ITsRandomAccessFileReader getLocalRandomAccessFileReader(String path) throws FileNotFoundException{
return new TsRandomAccessLocalFileReader(path);
}
public void closeLocalRandomAccessFileReader(LocalFileInput localFileInput) throws IOException{
public void closeLocalRandomAccessFileReader(TsRandomAccessLocalFileReader localFileInput) throws IOException{
localFileInput.close();
}
@ -32,7 +32,7 @@ public class FileStreamManager {
return instance;
}
public void closeFileStreams(TSRandomAccessFileReader raf){
public void closeFileStreams(ITsRandomAccessFileReader raf){
try {
raf.close();
} catch (IOException e) {

View File

@ -13,7 +13,7 @@ import cn.edu.tsinghua.iotdb.engine.filenode.QueryStructure;
import cn.edu.tsinghua.iotdb.exception.FileNodeManagerException;
import cn.edu.tsinghua.iotdb.query.reader.RecordReader;
import cn.edu.tsinghua.tsfile.common.exception.ProcessorException;
import cn.edu.tsinghua.tsfile.common.utils.TSRandomAccessFileReader;
import cn.edu.tsinghua.tsfile.common.utils.ITsRandomAccessFileReader;
import cn.edu.tsinghua.tsfile.timeseries.filter.definition.SingleSeriesFilterExpression;
/**
@ -84,15 +84,15 @@ public class RecordReaderFactory {
} else {
hasUnEnvelopedFile = false;
}
List<TSRandomAccessFileReader> rafList = new ArrayList<>();
List<ITsRandomAccessFileReader> rafList = new ArrayList<>();
try {
for (int i = 0; i < fileNodes.size() - 1; i++) {
IntervalFileNode fileNode = fileNodes.get(i);
TSRandomAccessFileReader raf = fileStreamManager.getLocalRandomAccessFileReader(fileNode.filePath);
ITsRandomAccessFileReader raf = fileStreamManager.getLocalRandomAccessFileReader(fileNode.filePath);
rafList.add(raf);
}
if (hasUnEnvelopedFile) {
TSRandomAccessFileReader unsealedRaf = fileStreamManager
ITsRandomAccessFileReader unsealedRaf = fileStreamManager
.getLocalRandomAccessFileReader(fileNodes.get(fileNodes.size() - 1).filePath);
// if currentPage is null, both currentPage and pageList must both are null

View File

@ -7,7 +7,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import cn.edu.tsinghua.tsfile.common.utils.TSRandomAccessFileReader;
import cn.edu.tsinghua.tsfile.common.utils.ITsRandomAccessFileReader;
import cn.edu.tsinghua.tsfile.file.metadata.RowGroupMetaData;
import cn.edu.tsinghua.tsfile.file.metadata.TSFileMetaData;
import cn.edu.tsinghua.tsfile.file.metadata.converter.TSFileMetaDataConverter;
@ -25,11 +25,11 @@ public class FileReader {
private static final int FOOTER_LENGTH = 4;
private static final int MAGIC_LENGTH = TSFileIOWriter.magicStringBytes.length;
private TSRandomAccessFileReader raf; // file pointer
private ITsRandomAccessFileReader raf; // file pointer
private ArrayList<RowGroupReader> rowGroupReaderList;
private HashMap<String, ArrayList<RowGroupReader>> rowGroupReadersMap;
FileReader(TSRandomAccessFileReader raf) throws IOException {
FileReader(ITsRandomAccessFileReader raf) throws IOException {
this.raf = raf;
init();
}
@ -40,7 +40,7 @@ public class FileReader {
* @param raf file pointer
* @param rowGroupMetaDataList RowGroupMetaDataList, no need to invoke init().
*/
FileReader(TSRandomAccessFileReader raf, List<RowGroupMetaData> rowGroupMetaDataList) {
FileReader(ITsRandomAccessFileReader raf, List<RowGroupMetaData> rowGroupMetaDataList) {
this.raf = raf;
initFromRowGroupMetadataList(rowGroupMetaDataList);
}

View File

@ -14,7 +14,7 @@ import cn.edu.tsinghua.iotdb.query.aggregation.AggregationResult;
import cn.edu.tsinghua.iotdb.query.dataset.InsertDynamicData;
import cn.edu.tsinghua.tsfile.common.exception.ProcessorException;
import cn.edu.tsinghua.tsfile.common.utils.Binary;
import cn.edu.tsinghua.tsfile.common.utils.TSRandomAccessFileReader;
import cn.edu.tsinghua.tsfile.common.utils.ITsRandomAccessFileReader;
import cn.edu.tsinghua.tsfile.encoding.decoder.Decoder;
import cn.edu.tsinghua.tsfile.file.metadata.TSDigest;
import cn.edu.tsinghua.tsfile.file.metadata.enums.CompressionTypeName;
@ -34,7 +34,7 @@ public class OverflowValueReader extends ValueReader {
private static final Logger LOG = LoggerFactory.getLogger(OverflowValueReader.class);
OverflowValueReader(long offset, long totalSize, TSDataType dataType, TSDigest digest,
TSRandomAccessFileReader raf, List<String> enumValues, CompressionTypeName compressionTypeName,
ITsRandomAccessFileReader raf, List<String> enumValues, CompressionTypeName compressionTypeName,
long rowNums) {
super(offset, totalSize, dataType, digest, raf, enumValues, compressionTypeName, rowNums);
}

View File

@ -5,9 +5,9 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import cn.edu.tsinghua.tsfile.common.utils.TSRandomAccessFileReader;
import cn.edu.tsinghua.tsfile.common.utils.ITsRandomAccessFileReader;
import cn.edu.tsinghua.tsfile.file.metadata.RowGroupMetaData;
import cn.edu.tsinghua.tsfile.timeseries.read.LocalFileInput;
import cn.edu.tsinghua.tsfile.timeseries.read.TsRandomAccessLocalFileReader;
/**
@ -21,7 +21,7 @@ public class ReaderManager {
private List<FileReader> fileReaderList;
private List<TSRandomAccessFileReader> rafList; // file has been serialized, sealed
private List<ITsRandomAccessFileReader> rafList; // file has been serialized, sealed
private HashMap<String, List<RowGroupReader>> rowGroupReaderMap;
private List<RowGroupReader> rowGroupReaderList;
@ -33,13 +33,13 @@ public class ReaderManager {
* @param rafList fileInputStreamList
* @throws IOException
*/
ReaderManager(List<TSRandomAccessFileReader> rafList) throws IOException {
ReaderManager(List<ITsRandomAccessFileReader> rafList) throws IOException {
this.rafList = rafList;
rowGroupReaderList = new ArrayList<>();
rowGroupReaderMap = new HashMap<>();
fileReaderList = new ArrayList<>();
for (TSRandomAccessFileReader taf : rafList) {
for (ITsRandomAccessFileReader taf : rafList) {
FileReader fr = new FileReader(taf);
fileReaderList.add(fr);
addRowGroupReadersToMap(fr);
@ -55,8 +55,8 @@ public class ReaderManager {
* @param rowGroupMetadataList RowGroupMetadata List for unsealedFile
* @throws IOException
*/
ReaderManager(List<TSRandomAccessFileReader> rafList,
TSRandomAccessFileReader unsealedFileReader, List<RowGroupMetaData> rowGroupMetadataList) throws IOException {
ReaderManager(List<ITsRandomAccessFileReader> rafList,
ITsRandomAccessFileReader unsealedFileReader, List<RowGroupMetaData> rowGroupMetadataList) throws IOException {
this(rafList);
this.rafList.add(unsealedFileReader);
@ -100,9 +100,9 @@ public class ReaderManager {
* @throws IOException
*/
public void close() throws IOException {
for (TSRandomAccessFileReader raf : rafList) {
if (raf instanceof LocalFileInput) {
((LocalFileInput) raf).closeFromManager();
for (ITsRandomAccessFileReader raf : rafList) {
if (raf instanceof TsRandomAccessLocalFileReader) {
((TsRandomAccessLocalFileReader) raf).closeFromManager();
} else {
raf.close();
}

View File

@ -17,7 +17,7 @@ import org.slf4j.LoggerFactory;
import cn.edu.tsinghua.iotdb.query.dataset.InsertDynamicData;
import cn.edu.tsinghua.tsfile.common.exception.ProcessorException;
import cn.edu.tsinghua.tsfile.common.exception.UnSupportedDataTypeException;
import cn.edu.tsinghua.tsfile.common.utils.TSRandomAccessFileReader;
import cn.edu.tsinghua.tsfile.common.utils.ITsRandomAccessFileReader;
import cn.edu.tsinghua.tsfile.file.metadata.RowGroupMetaData;
import cn.edu.tsinghua.tsfile.file.metadata.enums.CompressionTypeName;
import cn.edu.tsinghua.tsfile.file.metadata.enums.TSDataType;
@ -50,7 +50,7 @@ public class RecordReader {
* @param rafList bufferwrite file has been serialized completely
* @throws IOException file error
*/
public RecordReader(List<TSRandomAccessFileReader> rafList, String deltaObjectUID, String measurementID, int lockToken,
public RecordReader(List<ITsRandomAccessFileReader> rafList, String deltaObjectUID, String measurementID, int lockToken,
DynamicOneColumnData insertPageInMemory, List<ByteArrayInputStream> bufferWritePageList, CompressionTypeName compressionTypeName,
List<Object> overflowInfo) throws IOException {
this.readerManager = new ReaderManager(rafList);
@ -69,7 +69,7 @@ public class RecordReader {
* @param rowGroupMetadataList unsealed RowGroupMetadataList to construct unsealedFileReader
* @throws IOException file error
*/
public RecordReader(List<TSRandomAccessFileReader> rafList, TSRandomAccessFileReader unsealedFileReader,
public RecordReader(List<ITsRandomAccessFileReader> rafList, ITsRandomAccessFileReader unsealedFileReader,
List<RowGroupMetaData> rowGroupMetadataList, String deltaObjectUID, String measurementID, int lockToken,
DynamicOneColumnData insertPageInMemory, List<ByteArrayInputStream> bufferWritePageList, CompressionTypeName compressionTypeName,
List<Object> overflowInfo) throws IOException {
@ -105,7 +105,6 @@ public class RecordReader {
if (rowGroupReader.getValueReaders().containsKey(sensorId)) {
res = rowGroupReader.getValueReaders().get(sensorId)
.getValuesWithOverFlow(updateTrue, updateFalse, insertMemoryData, timeFilter, null, valueFilter, res, fetchSize);
res.setDeltaObjectType(rowGroupReader.getDeltaObjectType());
if (res.valueLength >= fetchSize) {
res.hasReadAll = false;
return res;
@ -148,7 +147,6 @@ public class RecordReader {
res = rowGroupReader.getValueReaders().get(sensorId)
.getValuesWithOverFlow(updateTrue, updateFalse, insertMemoryData, timeFilter, freqFilter, valueFilter, res,
fetchSize);
res.setDeltaObjectType(rowGroupReader.getDeltaObjectType());
if (res.valueLength >= fetchSize) {
res.hasReadAll = false;
return res;
@ -173,7 +171,6 @@ public class RecordReader {
try {
TSDataType type = MManager.getInstance().getSeriesType(fullPath);
DynamicOneColumnData res = new DynamicOneColumnData(type, true);
res.setDeltaObjectType(MManager.getInstance().getDeltaObjectTypeByPath(fullPath));
return res;
} catch (PathErrorException e) {
throw new ProcessorException(e.getMessage());
@ -277,10 +274,8 @@ public class RecordReader {
DynamicOneColumnData oldRes = getValuesUseTimestamps(deviceUID, sensorId, timestamps);
if (oldRes == null) {
oldRes = new DynamicOneColumnData(dataType, true);
oldRes.setDeltaObjectType(deviceType);
}
DynamicOneColumnData res = new DynamicOneColumnData(dataType, true);
res.setDeltaObjectType(deviceType);
// the timestamps of timeData is eventual, its has conclude the value of insertMemory.
int oldResIdx = 0;
@ -331,7 +326,6 @@ public class RecordReader {
RowGroupReader rowGroupReader = rowGroupReaderList.get(i);
if (i == 0) {
res = rowGroupReader.readValueUseTimestamps(measurementUId, timestamps);
res.setDeltaObjectType(rowGroupReader.getDeltaObjectType());
} else {
DynamicOneColumnData tmpRes = rowGroupReader.readValueUseTimestamps(measurementUId, timestamps);
res.mergeRecord(tmpRes);

View File

@ -8,7 +8,7 @@ import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import cn.edu.tsinghua.tsfile.common.utils.TSRandomAccessFileReader;
import cn.edu.tsinghua.tsfile.common.utils.ITsRandomAccessFileReader;
import cn.edu.tsinghua.tsfile.file.metadata.RowGroupMetaData;
import cn.edu.tsinghua.tsfile.file.metadata.TimeSeriesChunkMetaData;
import cn.edu.tsinghua.tsfile.file.metadata.enums.TSDataType;
@ -26,9 +26,9 @@ public class RowGroupReader {
protected ArrayList<String> sids;
private long totalByteSize;
protected TSRandomAccessFileReader raf;
protected ITsRandomAccessFileReader raf;
public RowGroupReader(RowGroupMetaData rowGroupMetaData, TSRandomAccessFileReader raf) {
public RowGroupReader(RowGroupMetaData rowGroupMetaData, ITsRandomAccessFileReader raf) {
Logger.debug("init a new RowGroupReader..");
seriesTypeMap = new HashMap<>();
deltaObjectUID = rowGroupMetaData.getDeltaObjectUID();
@ -117,11 +117,11 @@ public class RowGroupReader {
this.valueReaders = valueReaders;
}
public TSRandomAccessFileReader getRaf() {
public ITsRandomAccessFileReader getRaf() {
return raf;
}
public void setRaf(TSRandomAccessFileReader raf) {
public void setRaf(ITsRandomAccessFileReader raf) {
this.raf = raf;
}

View File

@ -64,7 +64,7 @@ public class Utils {
for(int i = 0 ; i < dynamicOneColumnData.timeLength; i ++){
timeRetList.add(dynamicOneColumnData.getTime(i));
}
TSDynamicOneColumnData tsDynamicOneColumnData = new TSDynamicOneColumnData(dynamicOneColumnData.getDeltaObjectType(), dynamicOneColumnData.dataType.toString(), dynamicOneColumnData.valueLength, timeRetList);
TSDynamicOneColumnData tsDynamicOneColumnData = new TSDynamicOneColumnData("", dynamicOneColumnData.dataType.toString(), dynamicOneColumnData.valueLength, timeRetList);
switch (dynamicOneColumnData.dataType) {
case BOOLEAN:

View File

@ -25,8 +25,8 @@ import cn.edu.tsinghua.iotdb.sys.writelog.WriteLogManager;
import cn.edu.tsinghua.tsfile.common.conf.TSFileConfig;
import cn.edu.tsinghua.tsfile.common.conf.TSFileDescriptor;
import cn.edu.tsinghua.tsfile.common.utils.Pair;
import cn.edu.tsinghua.tsfile.common.utils.RandomAccessOutputStream;
import cn.edu.tsinghua.tsfile.common.utils.TSRandomAccessFileWriter;
import cn.edu.tsinghua.tsfile.common.utils.TsRandomAccessFileWriter;
import cn.edu.tsinghua.tsfile.common.utils.ITsRandomAccessFileWriter;
import cn.edu.tsinghua.tsfile.file.metadata.RowGroupMetaData;
import cn.edu.tsinghua.tsfile.file.metadata.enums.CompressionTypeName;
import cn.edu.tsinghua.tsfile.file.metadata.enums.TSDataType;
@ -182,7 +182,7 @@ public class BufferWriteProcessorTest {
// write some bytes in the outputfile and test cuf off function
File dir = new File(nsp);
File outFile = new File(dir, filename);
TSRandomAccessFileWriter raf = new RandomAccessOutputStream(outFile);
ITsRandomAccessFileWriter raf = new TsRandomAccessFileWriter(outFile);
raf.seek(outFile.length());
byte[] buff = new byte[100];
Arrays.fill(buff, (byte) 10);

View File

@ -21,7 +21,7 @@ import cn.edu.tsinghua.iotdb.metadata.MManager;
import cn.edu.tsinghua.iotdb.sys.writelog.WriteLogManager;
import cn.edu.tsinghua.tsfile.common.conf.TSFileConfig;
import cn.edu.tsinghua.tsfile.common.conf.TSFileDescriptor;
import cn.edu.tsinghua.tsfile.common.utils.RandomAccessOutputStream;
import cn.edu.tsinghua.tsfile.common.utils.TsRandomAccessFileWriter;
import cn.edu.tsinghua.tsfile.file.metadata.enums.TSDataType;
import cn.edu.tsinghua.tsfile.timeseries.read.query.DynamicOneColumnData;
@ -136,7 +136,7 @@ public class OverflowProcessorTest {
assertEquals(true, new File(overflowfilePath).exists());
assertEquals(true, new File(overflowrestorefilePath).exists());
// add bytes in the tail of the file
RandomAccessOutputStream ras = new RandomAccessOutputStream(new File(overflowfilePath));
TsRandomAccessFileWriter ras = new TsRandomAccessFileWriter(new File(overflowfilePath));
ras.seek(ras.getPos());
ras.write(new byte[10]);
ras.close();

View File

@ -10,7 +10,7 @@ import org.junit.Before;
import org.junit.Test;
import cn.edu.tsinghua.iotdb.engine.overflow.utils.TSFileMetaDataConverter;
import cn.edu.tsinghua.tsfile.common.utils.RandomAccessOutputStream;
import cn.edu.tsinghua.tsfile.common.utils.TsRandomAccessFileWriter;
import cn.edu.tsinghua.tsfile.file.metadata.TimeSeriesChunkMetaData;
import cn.edu.tsinghua.tsfile.file.metadata.enums.CompressionTypeName;
import cn.edu.tsinghua.tsfile.file.metadata.enums.TSChunkType;
@ -296,7 +296,7 @@ public class OverflowMetaDataTest {
}
FileOutputStream fos = new FileOutputStream(file);
RandomAccessOutputStream out = new RandomAccessOutputStream(file, "rw");
TsRandomAccessFileWriter out = new TsRandomAccessFileWriter(file, "rw");
Utils.write(tsfOFFileMetadata.convertToThrift(), out.getOutputStream());
out.close();

View File

@ -11,7 +11,7 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import cn.edu.tsinghua.tsfile.common.utils.RandomAccessOutputStream;
import cn.edu.tsinghua.tsfile.common.utils.TsRandomAccessFileWriter;
import cn.edu.tsinghua.tsfile.file.metadata.TInTimeSeriesChunkMetaData;
import cn.edu.tsinghua.tsfile.file.metadata.enums.TSDataType;
import cn.edu.tsinghua.tsfile.file.metadata.enums.TSFreqType;
@ -53,7 +53,7 @@ public class TInTimeSeriesChunkMetaDataTest {
if (file.exists())
file.delete();
FileOutputStream fos = new FileOutputStream(file);
RandomAccessOutputStream out = new RandomAccessOutputStream(file, "rw");
TsRandomAccessFileWriter out = new TsRandomAccessFileWriter(file, "rw");
Utils.write(metaData.convertToThrift(), out.getOutputStream());
out.close();

View File

@ -10,7 +10,7 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import cn.edu.tsinghua.tsfile.common.utils.RandomAccessOutputStream;
import cn.edu.tsinghua.tsfile.common.utils.TsRandomAccessFileWriter;
import cn.edu.tsinghua.tsfile.file.metadata.TInTimeSeriesChunkMetaData;
import cn.edu.tsinghua.tsfile.file.metadata.TimeSeriesChunkMetaData;
import cn.edu.tsinghua.tsfile.file.metadata.VInTimeSeriesChunkMetaData;
@ -49,7 +49,7 @@ public class TimeSeriesChunkMetaDataTest {
if (file.exists())
file.delete();
FileOutputStream fos = new FileOutputStream(file);
RandomAccessOutputStream out = new RandomAccessOutputStream(file, "rw");
TsRandomAccessFileWriter out = new TsRandomAccessFileWriter(file, "rw");
Utils.write(metaData.convertToThrift(), out.getOutputStream());
out.close();

View File

@ -75,11 +75,6 @@ public class Utils {
assertTrue(
timeSeriesInTSF.getMeasurementUID().equals(timeSeriesInThrift.getMeasurement_uid()));
}
if (Utils.isTwoObjectsNotNULL(timeSeriesInTSF.getDeltaObjectType(),
timeSeriesInThrift.getDelta_object_type(), "device type")) {
assertTrue(
timeSeriesInTSF.getDeltaObjectType().equals(timeSeriesInThrift.getDelta_object_type()));
}
assertTrue(timeSeriesInTSF.getTypeLength() == timeSeriesInThrift.getType_length());
if (Utils.isTwoObjectsNotNULL(timeSeriesInTSF.getType(), timeSeriesInThrift.getType(),
"data type")) {

View File

@ -11,7 +11,7 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import cn.edu.tsinghua.tsfile.common.utils.RandomAccessOutputStream;
import cn.edu.tsinghua.tsfile.common.utils.TsRandomAccessFileWriter;
import cn.edu.tsinghua.tsfile.file.metadata.TSDigest;
import cn.edu.tsinghua.tsfile.file.metadata.VInTimeSeriesChunkMetaData;
import cn.edu.tsinghua.tsfile.file.metadata.enums.TSDataType;
@ -46,7 +46,7 @@ public class VInTimeSeriesChunkMetaDataTest {
if (file.exists())
file.delete();
FileOutputStream fos = new FileOutputStream(file);
RandomAccessOutputStream out = new RandomAccessOutputStream(file, "rw");
TsRandomAccessFileWriter out = new TsRandomAccessFileWriter(file, "rw");
Utils.write(metaData.convertToThrift(), out.getOutputStream());
out.close();

View File

@ -21,7 +21,7 @@ import cn.edu.tsinghua.iotdb.qp.exception.QueryProcessorException;
import cn.edu.tsinghua.iotdb.qp.executor.QueryProcessExecutor;
import cn.edu.tsinghua.iotdb.qp.executor.SingleFileQPExecutor;
import cn.edu.tsinghua.iotdb.qp.physical.PhysicalPlan;
import cn.edu.tsinghua.tsfile.timeseries.read.LocalFileInput;
import cn.edu.tsinghua.tsfile.timeseries.read.TsRandomAccessLocalFileReader;
import cn.edu.tsinghua.tsfile.timeseries.read.query.QueryDataSet;
/**
@ -41,7 +41,7 @@ public class TestSingleFileQpQuery {
if(!file.exists())
exec = null;
else {
exec = new SingleFileQPExecutor(new LocalFileInput("src/test/resources/testMultiDeviceMerge.tsfile"));
exec = new SingleFileQPExecutor(new TsRandomAccessLocalFileReader("src/test/resources/testMultiDeviceMerge.tsfile"));
processor = new QueryProcessor(exec);
}
}