[IOTDB-234] Refactor TsFile storage on HDFS (#417)

* Refactor TsFile storage on HDFS
Zesong Sun 2019-10-18 21:21:11 +08:00 committed by Jiang Tian
parent 1c7b0b3c3a
commit 20b2b9119f
46 changed files with 846 additions and 357 deletions

View File

@@ -18,7 +18,6 @@
*/
package org.apache.iotdb.tsfile;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
@@ -36,7 +35,7 @@ import org.apache.iotdb.tsfile.file.metadata.TsDeviceMetadataIndex
import org.apache.iotdb.tsfile.file.metadata.TsFileMetaData;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.fileSystem.TSFileFactory;
import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
import org.apache.iotdb.tsfile.read.common.BatchData;
import org.apache.iotdb.tsfile.read.reader.page.PageReader;
@@ -49,7 +48,7 @@ public class TsFileSequenceRead {
filename = args[0];
}
TsFileSequenceReader reader = new TsFileSequenceReader(filename);
System.out.println("file length: " + TSFileFactory.INSTANCE.getFile(filename).length());
System.out.println("file length: " + FSFactoryProducer.getFSFactory().getFile(filename).length());
System.out.println("file magic head: " + reader.readHeadMagic());
System.out.println("file magic tail: " + reader.readTailMagic());
System.out.println("Level 1 metadata position: " + reader.getFileMetadataPos());

View File

@@ -24,7 +24,7 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import java.io.File;
import org.apache.iotdb.tsfile.fileSystem.TSFileFactory;
import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
import org.apache.iotdb.tsfile.write.TsFileWriter;
import org.apache.iotdb.tsfile.write.schema.Schema;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
@@ -38,7 +38,7 @@ public class TsFileWriteWithRowBatch {
public static void main(String[] args) {
try {
String path = "test.tsfile";
File f = TSFileFactory.INSTANCE.getFile(path);
File f = FSFactoryProducer.getFSFactory().getFile(path);
if (f.exists()) {
f.delete();
}

View File

@@ -22,7 +22,7 @@ package org.apache.iotdb.tsfile;
import java.io.File;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.fileSystem.TSFileFactory;
import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
import org.apache.iotdb.tsfile.write.TsFileWriter;
import org.apache.iotdb.tsfile.write.record.TSRecord;
import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint;
@@ -38,7 +38,7 @@ public class TsFileWriteWithTSRecord {
public static void main(String args[]) {
try {
String path = "test.tsfile";
File f = TSFileFactory.INSTANCE.getFile(path);
File f = FSFactoryProducer.getFSFactory().getFile(path);
if (f.exists()) {
f.delete();
}

View File

@@ -29,8 +29,13 @@
</parent>
<artifactId>hadoop-tsfile</artifactId>
<packaging>jar</packaging>
<name>IoTDB Hadoop-TsFile</name>
<name>IoTDB Hadoop-Tsfile</name>
<url>http://maven.apache.org</url>
<properties>
<hadoop.test.skip>false</hadoop.test.skip>
<hadoop.it.skip>${hadoop.test.skip}</hadoop.it.skip>
<hadoop.ut.skip>${hadoop.test.skip}</hadoop.ut.skip>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.iotdb</groupId>
@@ -42,4 +47,55 @@
<artifactId>hadoop-client</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<!--using `mvn test` to run UT, `mvn verify` to run ITs
Reference: https://antoniogoncalves.org/2012/12/13/lets-turn-integration-tests-with-maven-to-a-first-class-citizen/-->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<skipTests>${hadoop.ut.skip}</skipTests>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-failsafe-plugin</artifactId>
<executions>
<execution>
<id>run-integration-tests</id>
<phase>integration-test</phase>
<goals>
<goal>integration-test</goal>
<goal>verify</goal>
</goals>
</execution>
</executions>
<configuration>
<skipTests>${hadoop.test.skip}</skipTests>
<skipITs>${hadoop.it.skip}</skipITs>
</configuration>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.1.0</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<!-- this is used for inheritance merges -->
<phase>package</phase>
<!-- bind to the packaging phase -->
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
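For reference, with this build configuration `mvn test` runs only the surefire unit tests, `mvn verify` additionally runs the failsafe integration tests bound above, and passing e.g. `-Dhadoop.test.skip=true` on either command line overrides the property defaults and skips both suites.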

View File

@@ -19,10 +19,16 @@
package org.apache.iotdb.tsfile.fileSystem;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileFilter;
import java.io.FilenameFilter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URL;
@@ -32,6 +38,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -219,6 +226,70 @@ public class HDFSFile extends File {
}
}
public BufferedReader getBufferedReader(String filePath) {
try {
return new BufferedReader(new InputStreamReader(fs.open(new Path(filePath))));
} catch (IOException e) {
logger.error("Failed to get buffered reader for {}. ", filePath, e);
return null;
}
}
public BufferedWriter getBufferedWriter(String filePath, boolean append) {
try {
// honor the append flag: fs.create() would silently truncate an existing file
return new BufferedWriter(new OutputStreamWriter(append ? fs.append(new Path(filePath)) : fs.create(new Path(filePath))));
} catch (IOException e) {
logger.error("Failed to get buffered writer for {}. ", filePath, e);
return null;
}
}
public BufferedInputStream getBufferedInputStream(String filePath) {
try {
return new BufferedInputStream(fs.open(new Path(filePath)));
} catch (IOException e) {
logger.error("Failed to get buffered input stream for {}. ", filePath, e);
return null;
}
}
public BufferedOutputStream getBufferedOutputStream(String filePath) {
try {
return new BufferedOutputStream(fs.create(new Path(filePath)));
} catch (IOException e) {
logger.error("Failed to get buffered output stream for {}. ", filePath, e);
return null;
}
}
public File[] listFilesBySuffix(String fileFolder, String suffix) {
PathFilter pathFilter = path -> path.toUri().toString().endsWith(suffix);
List<HDFSFile> files = listFiles(fileFolder, pathFilter);
return files.toArray(new HDFSFile[files.size()]);
}
public File[] listFilesByPrefix(String fileFolder, String prefix) {
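// note: this filter matches against the full URI (e.g. hdfs://...), while LocalFSFactory matches only the file name; callers therefore need a full-path prefix here, an inconsistency carried over from TSFileFactory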
PathFilter pathFilter = path -> path.toUri().toString().startsWith(prefix);
List<HDFSFile> files = listFiles(fileFolder, pathFilter);
return files.toArray(new HDFSFile[files.size()]);
}
private List<HDFSFile> listFiles(String fileFolder, PathFilter pathFilter) {
List<HDFSFile> files = new ArrayList<>();
try {
Path path = new Path(fileFolder);
for (FileStatus fileStatus : fs.listStatus(path)) {
Path filePath = fileStatus.getPath();
if (pathFilter.accept(filePath)) {
HDFSFile file = new HDFSFile(filePath.toUri().toString());
files.add(file);
}
}
} catch (IOException e) {
logger.error("Failed to list files in {}. ", fileFolder);
}
return files;
}
@Override
public String getParent() {

View File

@@ -27,8 +27,8 @@ import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetaData;
import org.apache.iotdb.tsfile.fileSystem.HDFSInput;
import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetaData;
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,7 +43,8 @@ import java.util.stream.Collectors;
/**
* @author Yuan Tian
*/
public class TSFInputFormat extends FileInputFormat<NullWritable, MapWritable> {
/**
* key to configure whether reading time is enabled

View File

@@ -23,8 +23,8 @@ import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
import org.apache.iotdb.tsfile.fileSystem.HDFSInput;
import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
import org.apache.iotdb.tsfile.read.ReadOnlyTsFile;
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
import org.apache.iotdb.tsfile.read.common.Field;

View File

@@ -22,8 +22,8 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
import org.apache.iotdb.tsfile.fileSystem.HDFSOutput;
import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
import org.apache.iotdb.tsfile.hadoop.record.HDFSTSRecord;
import org.apache.iotdb.tsfile.write.TsFileWriter;
import org.apache.iotdb.tsfile.write.schema.Schema;

View File

@@ -58,7 +58,7 @@
<module>spark-iotdb-connector</module>
<module>distribution</module>
</modules>
<!-- Properties Management -->
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
@@ -404,7 +404,6 @@
<generateBackupPoms>false</generateBackupPoms>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-javadoc-plugin</artifactId>

View File

@@ -25,7 +25,7 @@ import java.util.List;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.conf.directories.strategy.DirectoryStrategy;
import org.apache.iotdb.db.exception.DiskSpaceInsufficientException;
import org.apache.iotdb.tsfile.fileSystem.TSFileFactory;
import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -75,7 +75,7 @@ public class DirectoryManager {
private void mkDataDirs(List<String> folders) {
for (String folder : folders) {
File file = TSFileFactory.INSTANCE.getFile(folder);
File file = FSFactoryProducer.getFSFactory().getFile(folder);
if (file.mkdirs()) {
logger.info("folder {} doesn't exist, create it", file.getPath());
} else {

View File

@@ -21,7 +21,6 @@ package org.apache.iotdb.db.engine.fileSystem;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.tsfile.fileSystem.FSType;
import org.apache.iotdb.tsfile.fileSystem.HDFSFile;
import java.io.File;
import java.net.URI;

View File

@@ -24,11 +24,10 @@ import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.query.reader.IPointReader;
import org.apache.iotdb.db.query.reader.resourceRelated.CachedUnseqResourceMergeReader;
import org.apache.iotdb.db.utils.MergeUtils;
import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.fileSystem.TSFileFactory;
import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
import org.apache.iotdb.tsfile.read.common.Chunk;
import org.apache.iotdb.tsfile.read.common.Path;
@@ -99,7 +98,7 @@ public class MergeResource {
public RestorableTsFileIOWriter getMergeFileWriter(TsFileResource resource) throws IOException {
RestorableTsFileIOWriter writer = fileWriterCache.get(resource);
if (writer == null) {
writer = new RestorableTsFileIOWriter(TSFileFactory.INSTANCE
writer = new RestorableTsFileIOWriter(FSFactoryProducer.getFSFactory()
.getFile(resource.getFile().getPath() + MERGE_SUFFIX));
fileWriterCache.put(resource, writer);
}

View File

@@ -27,7 +27,7 @@ import java.util.List;
import org.apache.iotdb.db.engine.modification.io.LocalTextModificationAccessor;
import org.apache.iotdb.db.engine.modification.io.ModificationReader;
import org.apache.iotdb.db.engine.modification.io.ModificationWriter;
import org.apache.iotdb.tsfile.fileSystem.TSFileFactory;
import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
/**
* ModificationFile stores the Modifications of a TsFile or unseq file in another file in the same
@@ -122,7 +122,7 @@ public class ModificationFile {
public void remove() throws IOException {
close();
TSFileFactory.INSTANCE.getFile(filePath).delete();
FSFactoryProducer.getFSFactory().getFile(filePath).delete();
}
}

View File

@@ -21,7 +21,7 @@ package org.apache.iotdb.db.engine.modification.io;
import org.apache.iotdb.db.engine.modification.Deletion;
import org.apache.iotdb.db.engine.modification.Modification;
import org.apache.iotdb.tsfile.fileSystem.TSFileFactory;
import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
import org.apache.iotdb.tsfile.read.common.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -57,14 +57,14 @@ public class LocalTextModificationAccessor implements ModificationReader, Modifi
@Override
public Collection<Modification> read() {
if (!TSFileFactory.INSTANCE.getFile(filePath).exists()) {
if (!FSFactoryProducer.getFSFactory().getFile(filePath).exists()) {
logger.debug("No modification has been written to this file");
return new ArrayList<>();
}
String line;
List<Modification> modificationList = new ArrayList<>();
try(BufferedReader reader = TSFileFactory.INSTANCE.getBufferedReader(filePath)) {
try(BufferedReader reader = FSFactoryProducer.getFSFactory().getBufferedReader(filePath)) {
while ((line = reader.readLine()) != null) {
if (line.equals(ABORT_MARK) && !modificationList.isEmpty()) {
modificationList.remove(modificationList.size() - 1);
@@ -90,7 +90,7 @@ public class LocalTextModificationAccessor implements ModificationReader, Modifi
@Override
public void abort() throws IOException {
if (writer == null) {
writer = TSFileFactory.INSTANCE.getBufferedWriter(filePath, true);
writer = FSFactoryProducer.getFSFactory().getBufferedWriter(filePath, true);
}
writer.write(ABORT_MARK);
writer.newLine();
@@ -100,7 +100,7 @@ public class LocalTextModificationAccessor implements ModificationReader, Modifi
@Override
public void write(Modification mod) throws IOException {
if (writer == null) {
writer = TSFileFactory.INSTANCE.getBufferedWriter(filePath, true);
writer = FSFactoryProducer.getFSFactory().getBufferedWriter(filePath, true);
}
writer.write(encodeModification(mod));
writer.newLine();

View File

@@ -75,7 +75,8 @@ import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
import org.apache.iotdb.tsfile.fileSystem.TSFileFactory;
import org.apache.iotdb.tsfile.fileSystem.fsFactory.FSFactory;
import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.write.schema.Schema;
@@ -177,6 +178,7 @@ public class StorageGroupProcessor {
private LinkedList<String> lruForSensorUsedInQuery = new LinkedList<>();
private static final int MAX_CACHE_SENSORS = 5000;
private FSFactory fsFactory = FSFactoryProducer.getFSFactory();
public StorageGroupProcessor(String systemInfoDir, String storageGroupName)
throws ProcessorException {
@@ -241,7 +243,7 @@
private List<TsFileResource> getAllFiles(List<String> folders) throws IOException {
List<File> tsFiles = new ArrayList<>();
for (String baseDir : folders) {
File fileFolder = TSFileFactory.INSTANCE.getFile(baseDir, storageGroupName);
File fileFolder = fsFactory.getFile(baseDir, storageGroupName);
if (!fileFolder.exists()) {
continue;
}
@@ -254,7 +256,7 @@
continueFailedRenames(fileFolder, MERGE_SUFFIX);
Collections.addAll(tsFiles,
TSFileFactory.INSTANCE.listFilesBySuffix(fileFolder.getAbsolutePath(), TSFILE_SUFFIX));
fsFactory.listFilesBySuffix(fileFolder.getAbsolutePath(), TSFILE_SUFFIX));
}
tsFiles.sort(this::compareFileName);
List<TsFileResource> ret = new ArrayList<>();
@@ -263,10 +265,10 @@
}
private void continueFailedRenames(File fileFolder, String suffix) {
File[] files = TSFileFactory.INSTANCE.listFilesBySuffix(fileFolder.getAbsolutePath(), suffix);
File[] files = fsFactory.listFilesBySuffix(fileFolder.getAbsolutePath(), suffix);
if (files != null) {
for (File tempResource : files) {
File originResource = TSFileFactory.INSTANCE.getFile(tempResource.getPath().replace(suffix, ""));
File originResource = fsFactory.getFile(tempResource.getPath().replace(suffix, ""));
if (originResource.exists()) {
tempResource.delete();
} else {
@@ -490,17 +492,17 @@
} else {
baseDir = DirectoryManager.getInstance().getNextFolderForUnSequenceFile();
}
TSFileFactory.INSTANCE.getFile(baseDir, storageGroupName).mkdirs();
fsFactory.getFile(baseDir, storageGroupName).mkdirs();
String filePath = baseDir + File.separator + storageGroupName + File.separator +
System.currentTimeMillis() + "-" + versionController.nextVersion() + TSFILE_SUFFIX;
if (sequence) {
return new TsFileProcessor(storageGroupName, TSFileFactory.INSTANCE.getFile(filePath),
return new TsFileProcessor(storageGroupName, fsFactory.getFile(filePath),
schema, versionController, this::closeUnsealedTsFileProcessor,
this::updateLatestFlushTimeCallback, sequence);
} else {
return new TsFileProcessor(storageGroupName, TSFileFactory.INSTANCE.getFile(filePath),
return new TsFileProcessor(storageGroupName, fsFactory.getFile(filePath),
schema, versionController, this::closeUnsealedTsFileProcessor,
() -> true, sequence);
}
@@ -558,7 +560,7 @@
List<String> folder = DirectoryManager.getInstance().getAllSequenceFileFolders();
folder.addAll(DirectoryManager.getInstance().getAllUnSequenceFileFolders());
for (String tsfilePath : folder) {
File storageGroupFolder = TSFileFactory.INSTANCE.getFile(tsfilePath, storageGroupName);
File storageGroupFolder = fsFactory.getFile(tsfilePath, storageGroupName);
if (storageGroupFolder.exists()) {
try {
FileUtils.deleteDirectory(storageGroupFolder);

View File

@@ -29,7 +29,8 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.iotdb.db.engine.modification.ModificationFile;
import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
import org.apache.iotdb.tsfile.fileSystem.TSFileFactory;
import org.apache.iotdb.tsfile.fileSystem.fsFactory.FSFactory;
import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
public class TsFileResource {
@@ -68,6 +69,8 @@
private ReentrantReadWriteLock mergeQueryLock = new ReentrantReadWriteLock();
private FSFactory fsFactory = FSFactoryProducer.getFSFactory();
public TsFileResource(File file) {
this.file = file;
this.startTimeMap = new ConcurrentHashMap<>();
@@ -104,7 +107,7 @@
}
public void serialize() throws IOException {
try (OutputStream outputStream = TSFileFactory.INSTANCE.getBufferedOutputStream(
try (OutputStream outputStream = fsFactory.getBufferedOutputStream(
file + RESOURCE_SUFFIX + TEMP_SUFFIX)) {
ReadWriteIOUtils.write(this.startTimeMap.size(), outputStream);
for (Entry<String, Long> entry : this.startTimeMap.entrySet()) {
@@ -117,14 +120,14 @@
ReadWriteIOUtils.write(entry.getValue(), outputStream);
}
}
File src = TSFileFactory.INSTANCE.getFile(file + RESOURCE_SUFFIX + TEMP_SUFFIX);
File dest = TSFileFactory.INSTANCE.getFile(file + RESOURCE_SUFFIX);
File src = fsFactory.getFile(file + RESOURCE_SUFFIX + TEMP_SUFFIX);
File dest = fsFactory.getFile(file + RESOURCE_SUFFIX);
dest.delete();
TSFileFactory.INSTANCE.moveFile(src, dest);
fsFactory.moveFile(src, dest);
}
public void deSerialize() throws IOException {
try (InputStream inputStream = TSFileFactory.INSTANCE.getBufferedInputStream(
try (InputStream inputStream = fsFactory.getBufferedInputStream(
file + RESOURCE_SUFFIX)) {
int size = ReadWriteIOUtils.readInt(inputStream);
Map<String, Long> startTimes = new HashMap<>();
@@ -160,7 +163,7 @@
}
public boolean fileExists() {
return TSFileFactory.INSTANCE.getFile(file + RESOURCE_SUFFIX).exists();
return fsFactory.getFile(file + RESOURCE_SUFFIX).exists();
}
public void forceUpdateEndTime(String device, long time) {
@@ -235,8 +238,8 @@
public void remove() {
file.delete();
TSFileFactory.INSTANCE.getFile(file.getPath() + RESOURCE_SUFFIX).delete();
TSFileFactory.INSTANCE.getFile(file.getPath() + ModificationFile.FILE_SUFFIX).delete();
fsFactory.getFile(file.getPath() + RESOURCE_SUFFIX).delete();
fsFactory.getFile(file.getPath() + ModificationFile.FILE_SUFFIX).delete();
}
@Override

View File

@@ -27,7 +27,7 @@ import java.util.Comparator;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.qp.constant.DatetimeUtils;
import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
import org.apache.iotdb.tsfile.fileSystem.TSFileFactory;
import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
/**
* this tool can analyze the tsfile.resource files from a folder.
@@ -41,7 +41,7 @@ public class TsFileResourcePrinter {
folder = args[0];
}
File folderFile = SystemFileFactory.INSTANCE.getFile(folder);
File[] files = TSFileFactory.INSTANCE.listFilesBySuffix(folderFile.getAbsolutePath(), ".tsfile.resource");
File[] files = FSFactoryProducer.getFSFactory().listFilesBySuffix(folderFile.getAbsolutePath(), ".tsfile.resource");
Arrays.sort(files, Comparator.comparingLong(x -> Long.valueOf(x.getName().split("-")[0])));
for (File file : files) {

View File

@@ -35,7 +35,7 @@ import org.apache.iotdb.tsfile.file.metadata.TsDigest;
import org.apache.iotdb.tsfile.file.metadata.TsDigest.StatisticType;
import org.apache.iotdb.tsfile.file.metadata.TsFileMetaData;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.fileSystem.TSFileFactory;
import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
import org.apache.iotdb.tsfile.read.common.Chunk;
import org.apache.iotdb.tsfile.utils.BytesUtils;
@@ -54,7 +54,7 @@ public class TsFileSketchTool {
System.out.println("TsFile path:" + filename);
System.out.println("Sketch save path:" + outFile);
PrintWriter pw = new PrintWriter(new FileWriter(outFile));
long length = TSFileFactory.INSTANCE.getFile(filename).length();
long length = FSFactoryProducer.getFSFactory().getFile(filename).length();
printlnBoth(pw,
"-------------------------------- TsFile Sketch --------------------------------");
printlnBoth(pw, "file path: " + filename);

View File

@@ -18,11 +18,11 @@
*/
package org.apache.iotdb.db.utils;
import org.apache.iotdb.tsfile.fileSystem.TSFileFactory;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
public class CommonUtils {
@@ -43,13 +43,13 @@
}
/**
* NOTICE: This method is currently used only for data dir, thus using TSFileFactory to get file
* NOTICE: This method is currently used only for data dir, thus using FSFactory to get file
*
* @param dir directory path
* @return
*/
public static long getUsableSpace(String dir) {
return TSFileFactory.INSTANCE.getFile(dir).getFreeSpace();
return FSFactoryProducer.getFSFactory().getFile(dir).getFreeSpace();
}
public static boolean hasSpace(String dir) {

View File

@@ -40,7 +40,7 @@ import org.apache.iotdb.db.writelog.io.ILogReader;
import org.apache.iotdb.db.writelog.manager.MultiFileLogNodeManager;
import org.apache.iotdb.db.writelog.node.WriteLogNode;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.fileSystem.TSFileFactory;
import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.write.schema.Schema;
@@ -86,7 +86,7 @@
*/
public void replayLogs() throws ProcessorException {
WriteLogNode logNode = MultiFileLogNodeManager.getInstance().getNode(
logNodePrefix + TSFileFactory.INSTANCE.getFile(insertFilePath).getName());
logNodePrefix + FSFactoryProducer.getFSFactory().getFile(insertFilePath).getName());
ILogReader logReader = logNode.getLogReader();
try {

View File

@@ -39,7 +39,7 @@ import org.apache.iotdb.tsfile.file.metadata.TsDeviceMetadata;
import org.apache.iotdb.tsfile.file.metadata.TsDeviceMetadataIndex;
import org.apache.iotdb.tsfile.file.metadata.TsFileMetaData;
import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
import org.apache.iotdb.tsfile.fileSystem.TSFileFactory;
import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
import org.apache.iotdb.tsfile.write.schema.Schema;
import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
@@ -83,7 +83,7 @@ public class TsFileRecoverPerformer {
this.logReplayer = new LogReplayer(logNodePrefix, insertFilePath, tsFileResource.getModFile(),
versionController,
tsFileResource, schema, recoverMemTable, acceptUnseq);
File insertFile = TSFileFactory.INSTANCE.getFile(insertFilePath);
File insertFile = FSFactoryProducer.getFSFactory().getFile(insertFilePath);
if (!insertFile.exists()) {
logger.error("TsFile {} is missing, will skip its recovery.", insertFilePath);
return;

View File

@@ -26,7 +26,7 @@ import org.apache.iotdb.db.engine.MetadataManagerHelper;
import org.apache.iotdb.db.engine.flush.MemTableFlushTask;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
import org.apache.iotdb.tsfile.fileSystem.TSFileFactory;
import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
import org.junit.After;
import org.junit.Before;
@@ -45,7 +45,7 @@ public class MemTableFlushTaskTest {
public void setUp() throws Exception {
MetadataManagerHelper.initMetadata();
EnvironmentUtils.envSetUp();
writer = new RestorableTsFileIOWriter(TSFileFactory.INSTANCE.getFile(filePath));
writer = new RestorableTsFileIOWriter(FSFactoryProducer.getFSFactory().getFile(filePath));
memTable = new PrimitiveMemTable();
}

View File

@@ -36,6 +36,11 @@
<artifactId>tsfile</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.iotdb</groupId>
<artifactId>hadoop-tsfile</artifactId>
<version>0.9.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>

View File

@@ -21,7 +21,7 @@ package org.apache.iotdb.spark.tool;
import java.io.File;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.fileSystem.TSFileFactory;
import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.tsfile.write.TsFileWriter;
import org.apache.iotdb.tsfile.write.record.TSRecord;
@@ -37,7 +37,7 @@ import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
public class TsFileExample {
public static void create(String tsfilePath) throws Exception {
File f = TSFileFactory.INSTANCE.getFile(tsfilePath);
File f = FSFactoryProducer.getFSFactory().getFile(tsfilePath);
if (f.exists()) {
f.delete();
}

View File

@@ -53,24 +53,6 @@
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpcore</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
<build>
<plugins>

View File

@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.tsfile.fileSystem;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.fileSystem.fileInputFactory.FileInputFactory;
import org.apache.iotdb.tsfile.fileSystem.fileInputFactory.HDFSInputFactory;
import org.apache.iotdb.tsfile.fileSystem.fileInputFactory.LocalFSInputFactory;
import org.apache.iotdb.tsfile.fileSystem.fileOutputFactory.FileOutputFactory;
import org.apache.iotdb.tsfile.fileSystem.fileOutputFactory.HDFSOutputFactory;
import org.apache.iotdb.tsfile.fileSystem.fileOutputFactory.LocalFSOutputFactory;
import org.apache.iotdb.tsfile.fileSystem.fsFactory.FSFactory;
import org.apache.iotdb.tsfile.fileSystem.fsFactory.HDFSFactory;
import org.apache.iotdb.tsfile.fileSystem.fsFactory.LocalFSFactory;
public class FSFactoryProducer {
private static FSType fSType = TSFileDescriptor.getInstance().getConfig().getTSFileStorageFs();
private static FSFactory fsFactory;
private static FileInputFactory fileInputFactory;
private static FileOutputFactory fileOutputFactory;
static {
if (fSType.equals(FSType.HDFS)) {
fsFactory = new HDFSFactory();
fileInputFactory = new HDFSInputFactory();
fileOutputFactory = new HDFSOutputFactory();
} else {
fsFactory = new LocalFSFactory();
fileInputFactory = new LocalFSInputFactory();
fileOutputFactory = new LocalFSOutputFactory();
}
}
public static FSFactory getFSFactory() {
return fsFactory;
}
public static FileInputFactory getFileInputFactory() {
return fileInputFactory;
}
public static FileOutputFactory getFileOutputFactory() {
return fileOutputFactory;
}
}
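FSFactoryProducer replaces the per-call FSType branches that TSFileFactory, FileInputFactory and FileOutputFactory used to make with a single decision in a static initializer; client code fetches a factory once and stays file-system agnostic. A short usage sketch, assuming the matching imports and placeholder paths:

// each factory is HDFS-backed or local depending on TSFileConfig's storage FS setting
FSFactory fsFactory = FSFactoryProducer.getFSFactory();
File dir = fsFactory.getFile("data", "sg1"); // an HDFSFile or a plain java.io.File
TsFileInput in = FSFactoryProducer.getFileInputFactory().getTsFileInput("data/sg1/demo.tsfile");
TsFileOutput out = FSFactoryProducer.getFileOutputFactory().getTsFileOutput("data/sg1/demo.tsfile", false);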

View File

@@ -1,201 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.tsfile.fileSystem;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public enum TSFileFactory {
INSTANCE;
private static FSType fSType = TSFileDescriptor.getInstance().getConfig().getTSFileStorageFs();
private static final Logger logger = LoggerFactory.getLogger(TSFileFactory.class);
private FileSystem fs;
private Configuration conf = new Configuration();
public File getFile(String pathname) {
if (fSType.equals(FSType.HDFS)) {
return new HDFSFile(pathname);
} else {
return new File(pathname);
}
}
public File getFile(String parent, String child) {
if (fSType.equals(FSType.HDFS)) {
return new HDFSFile(parent, child);
} else {
return new File(parent, child);
}
}
public File getFile(File parent, String child) {
if (fSType.equals(FSType.HDFS)) {
return new HDFSFile(parent, child);
} else {
return new File(parent, child);
}
}
public File getFile(URI uri) {
if (fSType.equals(FSType.HDFS)) {
return new HDFSFile(uri);
} else {
return new File(uri);
}
}
public BufferedReader getBufferedReader(String filePath) {
try {
if (fSType.equals(FSType.HDFS)) {
Path path = new Path(filePath);
fs = path.getFileSystem(conf);
return new BufferedReader(new InputStreamReader(fs.open(path)));
} else {
return new BufferedReader(new FileReader(filePath));
}
} catch (IOException e) {
logger.error("Failed to get buffered reader for {}. ", filePath, e);
return null;
}
}
public BufferedWriter getBufferedWriter(String filePath, boolean append) {
try {
if (fSType.equals(FSType.HDFS)) {
Path path = new Path(filePath);
fs = path.getFileSystem(conf);
return new BufferedWriter(new OutputStreamWriter(fs.create(path)));
} else {
return new BufferedWriter(new FileWriter(filePath, append));
}
} catch (IOException e) {
logger.error("Failed to get buffered writer for {}. ", filePath, e);
return null;
}
}
public BufferedInputStream getBufferedInputStream(String filePath) {
try {
if (fSType.equals(FSType.HDFS)) {
Path path = new Path(filePath);
fs = path.getFileSystem(conf);
return new BufferedInputStream(fs.open(path));
} else {
return new BufferedInputStream(new FileInputStream(filePath));
}
} catch (IOException e) {
logger.error("Failed to get buffered input stream for {}. ", filePath, e);
return null;
}
}
public BufferedOutputStream getBufferedOutputStream(String filePath) {
try {
if (fSType.equals(FSType.HDFS)) {
Path path = new Path(filePath);
fs = path.getFileSystem(conf);
return new BufferedOutputStream(fs.create(path));
} else {
return new BufferedOutputStream(new FileOutputStream(filePath));
}
} catch (IOException e) {
logger.error("Failed to get buffered output stream for {}. ", filePath, e);
return null;
}
}
public void moveFile(File srcFile, File destFile) {
try {
if (fSType.equals(FSType.HDFS)) {
boolean rename = srcFile.renameTo(destFile);
if (!rename) {
logger.error("Failed to rename file from {} to {}. ", srcFile.getName(),
destFile.getName());
}
} else {
FileUtils.moveFile(srcFile, destFile);
}
} catch (IOException e) {
logger.error("Failed to move file from {} to {}. ", srcFile.getAbsolutePath(),
destFile.getAbsolutePath(), e);
}
}
public File[] listFilesBySuffix(String fileFolder, String suffix) {
if (fSType.equals(FSType.HDFS)) {
PathFilter pathFilter = path -> path.toUri().toString().endsWith(suffix);
List<HDFSFile> files = listFiles(fileFolder, pathFilter);
return files.toArray(new HDFSFile[files.size()]);
} else {
return new File(fileFolder).listFiles(file -> file.getName().endsWith(suffix));
}
}
public File[] listFilesByPrefix(String fileFolder, String prefix) {
if (fSType.equals(FSType.HDFS)) {
PathFilter pathFilter = path -> path.toUri().toString().startsWith(prefix);
List<HDFSFile> files = listFiles(fileFolder, pathFilter);
return files.toArray(new HDFSFile[files.size()]);
} else {
return new File(fileFolder).listFiles(file -> file.getName().startsWith(prefix));
}
}
private List<HDFSFile> listFiles(String fileFolder, PathFilter pathFilter) {
List<HDFSFile> files = new ArrayList<>();
try {
Path path = new Path(fileFolder);
fs = path.getFileSystem(conf);
for (FileStatus fileStatus: fs.listStatus(path)) {
Path filePath = fileStatus.getPath();
if (pathFilter.accept(filePath)) {
files.add(new HDFSFile(filePath.toUri().toString()));
}
}
} catch (IOException e) {
logger.error("Failed to list files in {}. ", fileFolder);
}
return files;
}
}

View File

@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.tsfile.fileSystem.fileInputFactory;
import org.apache.iotdb.tsfile.read.reader.TsFileInput;
public interface FileInputFactory {
TsFileInput getTsFileInput(String filePath);
}

View File

@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.tsfile.fileSystem.fileInputFactory;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import org.apache.iotdb.tsfile.read.reader.TsFileInput;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class HDFSInputFactory implements FileInputFactory {
private static final Logger logger = LoggerFactory.getLogger(HDFSInputFactory.class);
private static Constructor constructor;
static {
try {
Class<?> clazz = Class.forName("org.apache.iotdb.tsfile.fileSystem.HDFSInput");
constructor = clazz.getConstructor(String.class);
} catch (ClassNotFoundException | NoSuchMethodException e) {
logger.error(
"Failed to get HDFSInput in Hadoop file system. Please check your dependency of Hadoop module.",
e);
}
}
public TsFileInput getTsFileInput(String filePath) {
try {
return (TsFileInput) constructor.newInstance(filePath);
} catch (InstantiationException | InvocationTargetException | IllegalAccessException e) {
logger.error(
"Failed to get TsFile input of file: {}. Please check your dependency of Hadoop module.",
filePath, e);
return null;
}
}
}

View File

@@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -17,32 +17,22 @@
* under the License.
*/
package org.apache.iotdb.tsfile.fileSystem;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.read.reader.DefaultTsFileInput;
import org.apache.iotdb.tsfile.read.reader.TsFileInput;
import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
package org.apache.iotdb.tsfile.fileSystem.fileInputFactory;
import java.io.IOException;
import java.nio.file.Paths;
import org.apache.iotdb.tsfile.read.reader.DefaultTsFileInput;
import org.apache.iotdb.tsfile.read.reader.TsFileInput;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public enum FileInputFactory {
public class LocalFSInputFactory implements FileInputFactory {
INSTANCE;
private static FSType fsType = TSFileDescriptor.getInstance().getConfig().getTSFileStorageFs();
private static final Logger logger = LoggerFactory.getLogger(FileInputFactory.class);
private static final Logger logger = LoggerFactory.getLogger(LocalFSInputFactory.class);
public TsFileInput getTsFileInput(String filePath) {
try {
if (fsType.equals(FSType.HDFS)) {
return new HDFSInput(filePath);
} else {
return new DefaultTsFileInput(Paths.get(filePath));
}
return new DefaultTsFileInput(Paths.get(filePath));
} catch (IOException e) {
logger.error("Failed to get TsFile input of file: {}, ", filePath, e);
return null;

View File

@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.tsfile.fileSystem.fileOutputFactory;
import org.apache.iotdb.tsfile.write.writer.TsFileOutput;
public interface FileOutputFactory {
TsFileOutput getTsFileOutput(String filePath, boolean append);
}

View File

@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.tsfile.fileSystem.fileOutputFactory;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import org.apache.iotdb.tsfile.write.writer.TsFileOutput;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class HDFSOutputFactory implements FileOutputFactory {
private static final Logger logger = LoggerFactory.getLogger(HDFSOutputFactory.class);
private static Constructor constructor;
static {
try {
Class<?> clazz = Class.forName("org.apache.iotdb.tsfile.fileSystem.HDFSOutput");
constructor = clazz.getConstructor(String.class, boolean.class);
} catch (ClassNotFoundException | NoSuchMethodException e) {
logger.error(
"Failed to get HDFSInput in Hadoop file system. Please check your dependency of Hadoop module.",
e);
}
}
public TsFileOutput getTsFileOutput(String filePath, boolean append) {
try {
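// assumption: HDFSOutput(String, boolean) takes an overwrite flag, hence the negation of append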
return (TsFileOutput) constructor.newInstance(filePath, !append);
} catch (InstantiationException | InvocationTargetException | IllegalAccessException e) {
logger.error(
"Failed to get TsFile output of file: {}. Please check your dependency of Hadoop module.",
filePath, e);
return null;
}
}
}

View File

@@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -17,33 +17,24 @@
* under the License.
*/
package org.apache.iotdb.tsfile.fileSystem;
package org.apache.iotdb.tsfile.fileSystem.fileOutputFactory;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import java.io.FileOutputStream;
import java.io.IOException;
import org.apache.iotdb.tsfile.write.writer.DefaultTsFileOutput;
import org.apache.iotdb.tsfile.write.writer.TsFileOutput;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.FileOutputStream;
import java.io.IOException;
public class LocalFSOutputFactory implements FileOutputFactory {
public enum FileOutputFactory {
INSTANCE;
private static FSType fsType = TSFileDescriptor.getInstance().getConfig().getTSFileStorageFs();
private static final Logger logger = LoggerFactory.getLogger(FileOutputFactory.class);
private static final Logger logger = LoggerFactory.getLogger(LocalFSOutputFactory.class);
public TsFileOutput getTsFileOutput(String filePath, boolean append) {
try {
if (fsType.equals(FSType.HDFS)) {
return new HDFSOutput(filePath, !append);
} else {
return new DefaultTsFileOutput(new FileOutputStream(filePath, append));
}
return new DefaultTsFileOutput(new FileOutputStream(filePath, append));
} catch (IOException e) {
logger.error("Failed to get TsFile Output: ", e);
logger.error("Failed to get TsFile output of file: {}, ", filePath, e);
return null;
}
}

View File

@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.tsfile.fileSystem.fsFactory;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.net.URI;
public interface FSFactory {
File getFile(String pathname);
File getFile(String parent, String child);
File getFile(File parent, String child);
File getFile(URI uri);
BufferedReader getBufferedReader(String filePath);
BufferedWriter getBufferedWriter(String filePath, boolean append);
BufferedInputStream getBufferedInputStream(String filePath);
BufferedOutputStream getBufferedOutputStream(String filePath);
void moveFile(File srcFile, File destFile);
File[] listFilesBySuffix(String fileFolder, String suffix);
File[] listFilesByPrefix(String fileFolder, String prefix);
}
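Since FSFactory is now the only file abstraction the core module codes against, supporting another backing store would mean implementing this interface (plus FileInputFactory and FileOutputFactory) and extending the branch in FSFactoryProducer. A hypothetical sketch; FSType.S3 and S3Factory are illustrative names, not part of this commit:

// hypothetical third backend, slotted in beside the two real ones
if (fSType.equals(FSType.S3)) {
  fsFactory = new S3Factory(); // would implement FSFactory just as HDFSFactory does
} else if (fSType.equals(FSType.HDFS)) {
  fsFactory = new HDFSFactory();
} else {
  fsFactory = new LocalFSFactory();
}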

View File

@@ -0,0 +1,190 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.tsfile.fileSystem.fsFactory;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.URI;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class HDFSFactory implements FSFactory {
private static final Logger logger = LoggerFactory.getLogger(HDFSFactory.class);
private static Constructor constructorWithPathname;
private static Constructor constructorWithParentStringAndChild;
private static Constructor constructorWithParentFileAndChild;
private static Constructor constructorWithUri;
private static Method getBufferedReader;
private static Method getBufferedWriter;
private static Method getBufferedInputStream;
private static Method getBufferedOutputStream;
private static Method listFilesBySuffix;
private static Method listFilesByPrefix;
static {
try {
Class<?> clazz = Class.forName("org.apache.iotdb.tsfile.fileSystem.HDFSFile");
constructorWithPathname = clazz.getConstructor(String.class);
constructorWithParentStringAndChild = clazz.getConstructor(String.class, String.class);
constructorWithParentFileAndChild = clazz.getConstructor(File.class, String.class);
constructorWithUri = clazz.getConstructor(URI.class);
getBufferedReader = clazz.getMethod("getBufferedReader", String.class);
getBufferedWriter = clazz.getMethod("getBufferedWriter", String.class, boolean.class);
getBufferedInputStream = clazz.getMethod("getBufferedInputStream", String.class);
getBufferedOutputStream = clazz.getMethod("getBufferedOutputStream", String.class);
listFilesBySuffix = clazz.getMethod("listFilesBySuffix", String.class, String.class);
listFilesByPrefix = clazz.getMethod("listFilesByPrefix", String.class, String.class);
} catch (ClassNotFoundException | NoSuchMethodException e) {
logger.error(
"Failed to get Hadoop file system. Please check your dependency of Hadoop module.", e);
}
}
public File getFile(String pathname) {
try {
return (File) constructorWithPathname.newInstance(pathname);
} catch (InstantiationException | InvocationTargetException | IllegalAccessException e) {
logger.error(
"Failed to get file: {}. Please check your dependency of Hadoop module.", pathname, e);
return null;
}
}
public File getFile(String parent, String child) {
try {
return (File) constructorWithParentStringAndChild.newInstance(parent, child);
} catch (InstantiationException | InvocationTargetException | IllegalAccessException e) {
logger.error(
"Failed to get file: {}" + File.separator
+ "{}. Please check your dependency of Hadoop module.", parent, child, e);
return null;
}
}
public File getFile(File parent, String child) {
try {
return (File) constructorWithParentFileAndChild.newInstance(parent, child);
} catch (InstantiationException | InvocationTargetException | IllegalAccessException e) {
logger.error(
"Failed to get file: {}" + File.separator
+ "{}. Please check your dependency of Hadoop module.", parent.getAbsolutePath(),
child, e);
return null;
}
}
public File getFile(URI uri) {
try {
return (File) constructorWithUri.newInstance(uri);
} catch (InstantiationException | InvocationTargetException | IllegalAccessException e) {
logger.error(
"Failed to get file: {}. Please check your dependency of Hadoop module.",
uri.toString(), e);
return null;
}
}
public BufferedReader getBufferedReader(String filePath) {
try {
return (BufferedReader) getBufferedReader
.invoke(constructorWithPathname.newInstance(filePath), filePath);
} catch (InstantiationException | InvocationTargetException | IllegalAccessException e) {
logger.error(
"Failed to get buffered reader for {}. Please check your dependency of Hadoop module.",
filePath, e);
return null;
}
}
public BufferedWriter getBufferedWriter(String filePath, boolean append) {
try {
return (BufferedWriter) getBufferedWriter
.invoke(constructorWithPathname.newInstance(filePath), filePath, append);
} catch (InstantiationException | InvocationTargetException | IllegalAccessException e) {
logger.error(
"Failed to get buffered writer for {}. Please check your dependency of Hadoop module.",
filePath, e);
return null;
}
}
public BufferedInputStream getBufferedInputStream(String filePath) {
try {
return (BufferedInputStream) getBufferedInputStream
.invoke(constructorWithPathname.newInstance(filePath), filePath);
} catch (InstantiationException | InvocationTargetException | IllegalAccessException e) {
logger.error(
"Failed to get buffered input stream for {}. Please check your dependency of Hadoop module.",
filePath, e);
return null;
}
}
public BufferedOutputStream getBufferedOutputStream(String filePath) {
try {
return (BufferedOutputStream) getBufferedOutputStream
.invoke(constructorWithPathname.newInstance(filePath), filePath);
} catch (InstantiationException | InvocationTargetException | IllegalAccessException e) {
logger.error(
"Failed to get buffered output stream for {}. Please check your dependency of Hadoop module.",
filePath, e);
return null;
}
}
public void moveFile(File srcFile, File destFile) {
boolean rename = srcFile.renameTo(destFile);
if (!rename) {
logger.error("Failed to rename file from {} to {}. ", srcFile.getName(),
destFile.getName());
}
}
public File[] listFilesBySuffix(String fileFolder, String suffix) {
try {
return (File[]) listFilesBySuffix
.invoke(constructorWithPathname.newInstance(fileFolder), fileFolder, suffix);
} catch (InstantiationException | InvocationTargetException | IllegalAccessException e) {
logger.error(
"Failed to list files in {} with SUFFIX {}. Please check your dependency of Hadoop module.",
fileFolder, suffix, e);
return null;
}
}
public File[] listFilesByPrefix(String fileFolder, String prefix) {
try {
return (File[]) listFilesByPrefix
.invoke(constructorWithPathname.newInstance(fileFolder), fileFolder, prefix);
} catch (InstantiationException | InvocationTargetException | IllegalAccessException e) {
logger.error(
"Failed to list files in {} with PREFIX {}. Please check your dependency of Hadoop module.",
fileFolder, prefix, e);
return null;
}
}
}
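HDFSFactory reaches HDFSFile only through Class.forName and reflective calls, so the tsfile module no longer needs hadoop-tsfile at compile time; the trade-off is that a missing Hadoop dependency surfaces as a logged error and a null return (or a NullPointerException from the unset constructor) instead of a compile error. Assuming hadoop-tsfile is on the classpath, a lookup such as getFile(pathname) resolves to roughly this direct equivalent (the hdfs:// URI is a placeholder):

// what constructorWithPathname.newInstance(pathname) amounts to at runtime
File file = new org.apache.iotdb.tsfile.fileSystem.HDFSFile("hdfs://namenode:9000/data/sg1/demo.tsfile");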

View File

@@ -0,0 +1,110 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.tsfile.fileSystem.fsFactory;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.net.URI;
import org.apache.commons.io.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class LocalFSFactory implements FSFactory {
private static final Logger logger = LoggerFactory.getLogger(LocalFSFactory.class);
public File getFile(String pathname) {
return new File(pathname);
}
public File getFile(String parent, String child) {
return new File(parent, child);
}
public File getFile(File parent, String child) {
return new File(parent, child);
}
public File getFile(URI uri) {
return new File(uri);
}
public BufferedReader getBufferedReader(String filePath) {
try {
return new BufferedReader(new FileReader(filePath));
} catch (IOException e) {
logger.error("Failed to get buffered reader for {}. ", filePath, e);
return null;
}
}
public BufferedWriter getBufferedWriter(String filePath, boolean append) {
try {
return new BufferedWriter(new FileWriter(filePath, append));
} catch (IOException e) {
logger.error("Failed to get buffered writer for {}. ", filePath, e);
return null;
}
}
public BufferedInputStream getBufferedInputStream(String filePath) {
try {
return new BufferedInputStream(new FileInputStream(filePath));
} catch (IOException e) {
logger.error("Failed to get buffered input stream for {}. ", filePath, e);
return null;
}
}
public BufferedOutputStream getBufferedOutputStream(String filePath) {
try {
return new BufferedOutputStream(new FileOutputStream(filePath));
} catch (IOException e) {
logger.error("Failed to get buffered output stream for {}. ", filePath, e);
return null;
}
}
public void moveFile(File srcFile, File destFile) {
try {
FileUtils.moveFile(srcFile, destFile);
} catch (IOException e) {
logger.error("Failed to move file from {} to {}. ", srcFile.getAbsolutePath(),
destFile.getAbsolutePath(), e);
}
}
public File[] listFilesBySuffix(String fileFolder, String suffix) {
return new File(fileFolder).listFiles(file -> file.getName().endsWith(suffix));
}
public File[] listFilesByPrefix(String fileFolder, String prefix) {
return new File(fileFolder).listFiles(file -> file.getName().startsWith(prefix));
}
}
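Because LocalFSFactory and its HDFS counterpart implement the same FSFactory interface, call sites stay storage-agnostic and the concrete factory is chosen by FSFactoryProducer. A minimal usage sketch (the file name is illustrative; note the implementations above return null rather than throw on failure):

import java.io.BufferedWriter;
import java.io.File;
import java.io.IOException;
import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
import org.apache.iotdb.tsfile.fileSystem.fsFactory.FSFactory;

public class FsFactoryUsageSketch {
  public static void main(String[] args) throws IOException {
    FSFactory fsFactory = FSFactoryProducer.getFSFactory();
    File f = fsFactory.getFile("test.tsfile");
    BufferedWriter writer = fsFactory.getBufferedWriter(f.getPath(), true);
    if (writer != null) { // null signals a failure already logged by the factory
      try {
        writer.write("hello");
      } finally {
        writer.close();
      }
    }
  }
}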

View File

@ -19,7 +19,7 @@
package org.apache.iotdb.tsfile.read;
import org.apache.iotdb.tsfile.fileSystem.TSFileFactory;
import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
import org.apache.iotdb.tsfile.write.TsFileWriter;
import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
import org.slf4j.Logger;
@ -60,7 +60,7 @@ public class TsFileRestorableReader extends TsFileSequenceReader {
if (!isComplete()) {
// Try to close it
logger.info("File {} has no correct tail magic, try to repair...", file);
RestorableTsFileIOWriter rWriter = new RestorableTsFileIOWriter(TSFileFactory.INSTANCE.getFile(file));
RestorableTsFileIOWriter rWriter = new RestorableTsFileIOWriter(FSFactoryProducer.getFSFactory().getFile(file));
TsFileWriter writer = new TsFileWriter(rWriter);
// This writes the right magic string
writer.close();
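A sketch of how this repair path might be exercised; the two-argument constructor (path plus an auto-repair flag) is an assumption, since this diff shows only the repair body:

import java.io.IOException;
import org.apache.iotdb.tsfile.read.TsFileRestorableReader;

public class RepairSketch {
  public static void main(String[] args) throws IOException {
    // Hypothetical (String, boolean) constructor: when the tail magic is missing,
    // the reader would run the RestorableTsFileIOWriter repair shown above.
    TsFileRestorableReader reader = new TsFileRestorableReader("test.tsfile", true);
    reader.close();
  }
}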

View File

@ -42,8 +42,7 @@ import org.apache.iotdb.tsfile.file.metadata.TsDigest.StatisticType;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
import org.apache.iotdb.tsfile.fileSystem.FileInputFactory;
import org.apache.iotdb.tsfile.fileSystem.TSFileFactory;
import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
import org.apache.iotdb.tsfile.read.common.Chunk;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.read.reader.TsFileInput;
@ -52,6 +51,13 @@ import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.*;
import static org.apache.iotdb.tsfile.write.writer.TsFileIOWriter.magicStringBytes;
public class TsFileSequenceReader implements AutoCloseable {
private static final Logger logger = LoggerFactory.getLogger(TsFileSequenceReader.class);
@ -88,7 +94,7 @@ public class TsFileSequenceReader implements AutoCloseable {
*/
public TsFileSequenceReader(String file, boolean loadMetadataSize) throws IOException {
this.file = file;
tsFileInput = FileInputFactory.INSTANCE.getTsFileInput(file);
tsFileInput = FSFactoryProducer.getFileInputFactory().getTsFileInput(file);
try {
if (loadMetadataSize) {
loadMetadataSize();
@ -516,7 +522,7 @@ public class TsFileSequenceReader implements AutoCloseable {
*/
public long selfCheck(Map<String, MeasurementSchema> newSchema,
List<ChunkGroupMetaData> newMetaData, boolean fastFinish) throws IOException {
File checkFile = TSFileFactory.INSTANCE.getFile(this.file);
File checkFile = FSFactoryProducer.getFSFactory().getFile(this.file);
long fileSize;
if (!checkFile.exists()) {
return TsFileCheckStatus.FILE_NOT_FOUND;
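The reader obtains its TsFileInput through the same producer, so selfCheck and the rest of the read path work over local files and HDFS alike. A minimal read-side sketch (path illustrative; size() on TsFileInput is assumed to mirror the FileChannel contract):

import java.io.IOException;
import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
import org.apache.iotdb.tsfile.read.reader.TsFileInput;

public class FileInputSketch {
  public static void main(String[] args) throws IOException {
    // The producer decides at runtime whether this wraps a local channel or an HDFS stream.
    TsFileInput input = FSFactoryProducer.getFileInputFactory().getTsFileInput("test.tsfile");
    System.out.println("file length: " + input.size()); // size() assumed
    input.close();
  }
}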

View File

@ -34,7 +34,7 @@ import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
import org.apache.iotdb.tsfile.file.metadata.TsDeviceMetadataIndex;
import org.apache.iotdb.tsfile.file.metadata.TsFileMetaData;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.fileSystem.FileOutputFactory;
import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
import org.apache.iotdb.tsfile.read.TsFileCheckStatus;
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
@ -72,7 +72,7 @@ public class RestorableTsFileIOWriter extends TsFileIOWriter {
*/
public RestorableTsFileIOWriter(File file) throws IOException {
this.file = file;
this.out = FileOutputFactory.INSTANCE.getTsFileOutput(file.getPath(), true);
this.out = FSFactoryProducer.getFileOutputFactory().getTsFileOutput(file.getPath(), true);
// file doesn't exist
if (file.length() == 0) {

View File

@ -46,7 +46,7 @@ import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
import org.apache.iotdb.tsfile.fileSystem.FileOutputFactory;
import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
import org.apache.iotdb.tsfile.read.common.Chunk;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.utils.BytesUtils;
@ -123,7 +123,7 @@ public class TsFileIOWriter {
*/
public TsFileIOWriter(TsFileOutput out, List<ChunkGroupMetaData> chunkGroupMetaDataList)
throws IOException {
this.out = FileOutputFactory.INSTANCE.getTsFileOutput(file.getPath(), false); //NOTE overwrite false here
this.out = FSFactoryProducer.getFileOutputFactory().getTsFileOutput(file.getPath(), false); //NOTE overwrite false here
this.chunkGroupMetaDataList = chunkGroupMetaDataList;
if (chunkGroupMetaDataList.isEmpty()) {
startFile();
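The write side mirrors the read side: a minimal sketch (path illustrative; only the factory call is taken from the diff, and close() on TsFileOutput is assumed):

import java.io.IOException;
import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
import org.apache.iotdb.tsfile.write.writer.TsFileOutput;

public class FileOutputSketch {
  public static void main(String[] args) throws IOException {
    // Second argument is the append flag; the TsFileIOWriter constructor above
    // deliberately passes false to start a fresh file.
    TsFileOutput out = FSFactoryProducer.getFileOutputFactory().getTsFileOutput("test.tsfile", true);
    out.close(); // assumed to follow the usual closeable contract
  }
}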

View File

@ -26,17 +26,19 @@ import static org.junit.Assert.assertTrue;
import java.io.File;
import java.io.IOException;
import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
import org.apache.iotdb.tsfile.fileSystem.TSFileFactory;
import org.apache.iotdb.tsfile.fileSystem.fsFactory.FSFactory;
import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
import org.apache.iotdb.tsfile.utils.TsFileGeneratorForTest;
import org.junit.Test;
public class TsFileRestorableReaderTest {
private static final String FILE_PATH = TsFileGeneratorForTest.outputDataFile;
private FSFactory fsFactory = FSFactoryProducer.getFSFactory();
@Test
public void testToReadDamagedFileAndRepair() throws IOException {
File file = TSFileFactory.INSTANCE.getFile(FILE_PATH);
File file = fsFactory.getFile(FILE_PATH);
TsFileGeneratorForTest.writeFileWithOneIncompleteChunkHeader(file);
@ -51,7 +53,7 @@ public class TsFileRestorableReaderTest {
@Test
public void testToReadDamagedFileNoRepair() throws IOException {
File file = TSFileFactory.INSTANCE.getFile(FILE_PATH);
File file = fsFactory.getFile(FILE_PATH);
TsFileGeneratorForTest.writeFileWithOneIncompleteChunkHeader(file);
// This should throw an Illegal Argument Exception

View File

@ -18,8 +18,8 @@
*/
package org.apache.iotdb.tsfile.utils;
import org.apache.iotdb.tsfile.fileSystem.TSFileFactory;
import java.io.File;
import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
/**
* FileUtils is used to return file attributes such as file size, and contains some measurement conversions among B,
@ -28,7 +28,7 @@ import java.io.File;
public class FileUtils {
public static double getLocalFileByte(String filePath, Unit unit) {
File f = TSFileFactory.INSTANCE.getFile(filePath);
File f = FSFactoryProducer.getFSFactory().getFile(filePath);
return getLocalFileByte(f, unit);
}
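A hypothetical call for reference; the conversion enum is only partially visible in the truncated Javadoc above, so FileUtils.Unit.MB is an assumption:

import org.apache.iotdb.tsfile.utils.FileUtils;

public class FileSizeSketch {
  public static void main(String[] args) {
    // FileUtils.Unit and its MB constant are assumed from the truncated Javadoc.
    double sizeInMb = FileUtils.getLocalFileByte("test.tsfile", FileUtils.Unit.MB);
    System.out.println("size in MB: " + sizeInMb);
  }
}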

View File

@ -36,7 +36,8 @@ import org.apache.iotdb.tsfile.file.header.ChunkHeader;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.fileSystem.TSFileFactory;
import org.apache.iotdb.tsfile.fileSystem.fsFactory.FSFactory;
import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
import org.apache.iotdb.tsfile.write.TsFileWriter;
import org.apache.iotdb.tsfile.write.record.TSRecord;
import org.apache.iotdb.tsfile.write.schema.Schema;
@ -58,6 +59,7 @@ public class TsFileGeneratorForTest {
private static int rowCount;
private static int chunkGroupSize;
private static int pageSize;
private static FSFactory fsFactory = FSFactoryProducer.getFSFactory();
public static void generateFile(int rowCount, int chunkGroupSize, int pageSize)
throws IOException, InterruptedException, WriteProcessException {
@ -80,22 +82,22 @@ public class TsFileGeneratorForTest {
}
public static void after() {
File file = TSFileFactory.INSTANCE.getFile(inputDataFile);
File file = fsFactory.getFile(inputDataFile);
if (file.exists()) {
Assert.assertTrue(file.delete());
}
file = TSFileFactory.INSTANCE.getFile(outputDataFile);
file = fsFactory.getFile(outputDataFile);
if (file.exists()) {
Assert.assertTrue(file.delete());
}
file = TSFileFactory.INSTANCE.getFile(errorOutputDataFile);
file = fsFactory.getFile(errorOutputDataFile);
if (file.exists()) {
Assert.assertTrue(file.delete());
}
}
private static void generateSampleInputDataFile(int minRowCount, int maxRowCount) throws IOException {
File file = TSFileFactory.INSTANCE.getFile(inputDataFile);
File file = fsFactory.getFile(inputDataFile);
if (file.exists()) {
Assert.assertTrue(file.delete());
}
@ -148,8 +150,8 @@ public class TsFileGeneratorForTest {
}
public static void write() throws IOException {
File file = TSFileFactory.INSTANCE.getFile(outputDataFile);
File errorFile = TSFileFactory.INSTANCE.getFile(errorOutputDataFile);
File file = fsFactory.getFile(outputDataFile);
File errorFile = fsFactory.getFile(errorOutputDataFile);
if (file.exists()) {
Assert.assertTrue(file.delete());
}
@ -164,7 +166,7 @@ public class TsFileGeneratorForTest {
innerWriter = new TsFileWriter(file, schema, TSFileDescriptor.getInstance().getConfig());
// write
try (Scanner in = new Scanner(TSFileFactory.INSTANCE.getFile(inputDataFile))) {
try (Scanner in = new Scanner(fsFactory.getFile(inputDataFile))) {
assert in != null;
while (in.hasNextLine()) {
String str = in.nextLine();

View File

@ -30,7 +30,8 @@ import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.file.metadata.statistics.FloatStatistics;
import org.apache.iotdb.tsfile.fileSystem.TSFileFactory;
import org.apache.iotdb.tsfile.fileSystem.fsFactory.FSFactory;
import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
import org.apache.iotdb.tsfile.read.ReadOnlyTsFile;
import org.apache.iotdb.tsfile.read.TsFileCheckStatus;
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
@ -51,10 +52,11 @@ import static org.junit.Assert.*;
public class RestorableTsFileIOWriterTest {
private static final String FILE_NAME = "test.ts";
private static FSFactory fsFactory = FSFactoryProducer.getFSFactory();
@Test(expected = IOException.class)
public void testBadHeadMagic() throws Exception {
File file = TSFileFactory.INSTANCE.getFile(FILE_NAME);
File file = fsFactory.getFile(FILE_NAME);
FileWriter fWriter = new FileWriter(file);
fWriter.write("Tsfile");
fWriter.close();
@ -67,7 +69,7 @@ public class RestorableTsFileIOWriterTest {
@Test
public void testOnlyHeadMagic() throws Exception {
File file = TSFileFactory.INSTANCE.getFile(FILE_NAME);
File file = fsFactory.getFile(FILE_NAME);
TsFileWriter writer = new TsFileWriter(file);
writer.getIOWriter().close();
@ -86,7 +88,7 @@ public class RestorableTsFileIOWriterTest {
@Test
public void testOnlyFirstMask() throws Exception {
File file = TSFileFactory.INSTANCE.getFile(FILE_NAME);
File file = fsFactory.getFile(FILE_NAME);
TsFileWriter writer = new TsFileWriter(file);
//we have to flush using inner API.
writer.getIOWriter().out.write(new byte[] {MetaMarker.CHUNK_HEADER});
@ -100,7 +102,7 @@ public class RestorableTsFileIOWriterTest {
@Test
public void testOnlyOneIncompleteChunkHeader() throws Exception {
File file = TSFileFactory.INSTANCE.getFile(FILE_NAME);
File file = fsFactory.getFile(FILE_NAME);
TsFileGeneratorForTest.writeFileWithOneIncompleteChunkHeader(file);
@ -130,7 +132,7 @@ public class RestorableTsFileIOWriterTest {
@Test
public void testOnlyOneChunkHeaderAndSomePage() throws Exception {
File file = TSFileFactory.INSTANCE.getFile(FILE_NAME);
File file = fsFactory.getFile(FILE_NAME);
TsFileWriter writer = new TsFileWriter(file);
writer.addMeasurement(new MeasurementSchema("s1", TSDataType.FLOAT, TSEncoding.RLE));
writer.addMeasurement(new MeasurementSchema("s2", TSDataType.FLOAT, TSEncoding.RLE));
@ -153,7 +155,7 @@ public class RestorableTsFileIOWriterTest {
@Test
public void testOnlyOneChunkGroup() throws Exception {
File file = TSFileFactory.INSTANCE.getFile(FILE_NAME);
File file = fsFactory.getFile(FILE_NAME);
TsFileWriter writer = new TsFileWriter(file);
writer.addMeasurement(new MeasurementSchema("s1", TSDataType.FLOAT, TSEncoding.RLE));
writer.addMeasurement(new MeasurementSchema("s2", TSDataType.FLOAT, TSEncoding.RLE));
@ -189,7 +191,7 @@ public class RestorableTsFileIOWriterTest {
@Test
public void testOnlyOneChunkGroupAndOneMask() throws Exception {
File file = TSFileFactory.INSTANCE.getFile(FILE_NAME);
File file = fsFactory.getFile(FILE_NAME);
TsFileWriter writer = new TsFileWriter(file);
writer.addMeasurement(new MeasurementSchema("s1", TSDataType.FLOAT, TSEncoding.RLE));
writer.addMeasurement(new MeasurementSchema("s2", TSDataType.FLOAT, TSEncoding.RLE));
@ -214,7 +216,7 @@ public class RestorableTsFileIOWriterTest {
@Test
public void testTwoChunkGroupAndMore() throws Exception {
File file = TSFileFactory.INSTANCE.getFile(FILE_NAME);
File file = fsFactory.getFile(FILE_NAME);
TsFileWriter writer = new TsFileWriter(file);
writer.addMeasurement(new MeasurementSchema("s1", TSDataType.FLOAT, TSEncoding.RLE));
writer.addMeasurement(new MeasurementSchema("s2", TSDataType.FLOAT, TSEncoding.RLE));
@ -241,7 +243,7 @@ public class RestorableTsFileIOWriterTest {
@Test
public void testNoSeperatorMask() throws Exception {
File file = TSFileFactory.INSTANCE.getFile(FILE_NAME);
File file = fsFactory.getFile(FILE_NAME);
TsFileWriter writer = new TsFileWriter(file);
writer.addMeasurement(new MeasurementSchema("s1", TSDataType.FLOAT, TSEncoding.RLE));
writer.addMeasurement(new MeasurementSchema("s2", TSDataType.FLOAT, TSEncoding.RLE));
@ -272,7 +274,7 @@ public class RestorableTsFileIOWriterTest {
@Test
public void testHavingSomeFileMetadata() throws Exception {
File file = TSFileFactory.INSTANCE.getFile(FILE_NAME);
File file = fsFactory.getFile(FILE_NAME);
TsFileWriter writer = new TsFileWriter(file);
writer.addMeasurement(new MeasurementSchema("s1", TSDataType.FLOAT, TSEncoding.RLE));
writer.addMeasurement(new MeasurementSchema("s2", TSDataType.FLOAT, TSEncoding.RLE));
@ -303,7 +305,7 @@ public class RestorableTsFileIOWriterTest {
@Test
public void testOpenCompleteFile() throws Exception {
File file = TSFileFactory.INSTANCE.getFile(FILE_NAME);
File file = fsFactory.getFile(FILE_NAME);
TsFileWriter writer = new TsFileWriter(file);
writer.addMeasurement(new MeasurementSchema("s1", TSDataType.FLOAT, TSEncoding.RLE));
writer.addMeasurement(new MeasurementSchema("s2", TSDataType.FLOAT, TSEncoding.RLE));
@ -328,7 +330,7 @@ public class RestorableTsFileIOWriterTest {
@Test
public void testAppendDataOnCompletedFile() throws Exception {
File file = TSFileFactory.INSTANCE.getFile(FILE_NAME);
File file = fsFactory.getFile(FILE_NAME);
TsFileWriter writer = new TsFileWriter(file);
writer.addMeasurement(new MeasurementSchema("s1", TSDataType.FLOAT, TSEncoding.RLE));
writer.addMeasurement(new MeasurementSchema("s2", TSDataType.FLOAT, TSEncoding.RLE));