Merge branch 'master' into fix_issue113_wal_null

This commit is contained in:
CGF 2017-10-10 21:43:20 +08:00
commit 41864b4c75
14 changed files with 404 additions and 300 deletions

View File

@ -5,8 +5,6 @@ import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.Scanner;
import cn.edu.tsinghua.iotdb.jdbc.TsfileConnection;
import cn.edu.tsinghua.iotdb.jdbc.TsfileJDBCConfig;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.DefaultParser;
@ -15,6 +13,8 @@ import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import cn.edu.tsinghua.iotdb.exception.ArgsErrorException;
import cn.edu.tsinghua.iotdb.jdbc.TsfileConnection;
import cn.edu.tsinghua.iotdb.jdbc.TsfileJDBCConfig;
public class WinClient extends AbstractClient {

View File

@ -42,6 +42,7 @@ import cn.edu.tsinghua.tsfile.file.metadata.TSFileMetaData;
import cn.edu.tsinghua.tsfile.file.metadata.TimeSeriesMetadata;
import cn.edu.tsinghua.tsfile.file.metadata.converter.TSFileMetaDataConverter;
import cn.edu.tsinghua.tsfile.file.metadata.enums.TSDataType;
import cn.edu.tsinghua.tsfile.file.metadata.enums.TSEncoding;
import cn.edu.tsinghua.tsfile.file.utils.ReadWriteThriftFormatUtils;
import cn.edu.tsinghua.tsfile.format.FileMetaData;
import cn.edu.tsinghua.tsfile.timeseries.write.TSRecordWriteSupport;
@ -139,7 +140,11 @@ public class BufferWriteProcessor extends LRUProcessor {
}
WriteSupport<TSRecord> writeSupport = new TSRecordWriteSupport();
recordWriter = new BufferWriteRecordWriter(TsFileConf, bufferIOWriter, writeSupport, fileSchema);
try {
recordWriter = new BufferWriteRecordWriter(TsFileConf, bufferIOWriter, writeSupport, fileSchema);
} catch (WriteProcessException e) {
throw new BufferWriteProcessorException(e);
}
isNewProcessor = true;
// write restore file
writeStoreToDisk();
@ -201,7 +206,11 @@ public class BufferWriteProcessor extends LRUProcessor {
throw new BufferWriteProcessorException(e);
}
WriteSupport<TSRecord> writeSupport = new TSRecordWriteSupport();
recordWriter = new BufferWriteRecordWriter(TsFileConf, bufferIOWriter, writeSupport, fileSchema);
try {
recordWriter = new BufferWriteRecordWriter(TsFileConf, bufferIOWriter, writeSupport, fileSchema);
} catch (WriteProcessException e) {
throw new BufferWriteProcessorException(e);
}
isNewProcessor = false;
}
@ -387,55 +396,44 @@ public class BufferWriteProcessor extends LRUProcessor {
private FileSchema constructFileSchema(String nameSpacePath) throws PathErrorException, WriteProcessException {
List<ColumnSchema> columnSchemaList;
String deltaObjectType = null;
try {
deltaObjectType = mManager.getDeltaObjectTypeByPath(nameSpacePath);
} catch (PathErrorException e) {
LOGGER.error("Get the deltaObjectType from MManager error using nameSpacePath is {}.", nameSpacePath);
throw e;
}
try {
columnSchemaList = mManager.getSchemaForOneType(deltaObjectType);
} catch (PathErrorException e) {
LOGGER.error("The list of ColumnSchema error from MManager error using deltaObjectType is {}.",
deltaObjectType);
throw e;
}
columnSchemaList = mManager.getSchemaForFileName(nameSpacePath);
FileSchema fileSchema = null;
try {
fileSchema = getFileSchemaFromColumnSchema(columnSchemaList, deltaObjectType);
fileSchema = getFileSchemaFromColumnSchema(columnSchemaList, nameSpacePath);
} catch (WriteProcessException e) {
LOGGER.error("Get the FileSchema error, the list of ColumnSchema is {}.", columnSchemaList);
throw e;
}
return fileSchema;
}
private FileSchema getFileSchemaFromColumnSchema(List<ColumnSchema> schemaList, String deltaObjectType)
private FileSchema getFileSchemaFromColumnSchema(List<ColumnSchema> schemaList, String nameSpacePath)
throws WriteProcessException {
JSONArray rowGroup = new JSONArray();
for (ColumnSchema col : schemaList) {
JSONObject measurement = new JSONObject();
measurement.put(JsonFormatConstant.MEASUREMENT_UID, col.name);
measurement.put(JsonFormatConstant.DATA_TYPE, col.dataType.toString());
measurement.put(JsonFormatConstant.MEASUREMENT_ENCODING, col.encoding.toString());
for (Entry<String, String> entry : col.getArgsMap().entrySet()) {
if (JsonFormatConstant.ENUM_VALUES.equals(entry.getKey())) {
String[] valueArray = entry.getValue().split(",");
measurement.put(JsonFormatConstant.ENUM_VALUES, new JSONArray(valueArray));
} else
measurement.put(entry.getKey(), entry.getValue().toString());
}
rowGroup.put(measurement);
rowGroup.put(constrcutMeasurement(col));
}
JSONObject jsonSchema = new JSONObject();
jsonSchema.put(JsonFormatConstant.JSON_SCHEMA, rowGroup);
jsonSchema.put(JsonFormatConstant.DELTA_TYPE, deltaObjectType);
jsonSchema.put(JsonFormatConstant.DELTA_TYPE, nameSpacePath);
return new FileSchema(jsonSchema);
}
private JSONObject constrcutMeasurement(ColumnSchema col) {
JSONObject measurement = new JSONObject();
measurement.put(JsonFormatConstant.MEASUREMENT_UID, col.name);
measurement.put(JsonFormatConstant.DATA_TYPE, col.dataType.toString());
measurement.put(JsonFormatConstant.MEASUREMENT_ENCODING, col.encoding.toString());
for (Entry<String, String> entry : col.getArgsMap().entrySet()) {
if (JsonFormatConstant.ENUM_VALUES.equals(entry.getKey())) {
String[] valueArray = entry.getValue().split(",");
measurement.put(JsonFormatConstant.ENUM_VALUES, new JSONArray(valueArray));
} else
measurement.put(entry.getKey(), entry.getValue().toString());
}
return measurement;
}
public String getFileName() {
return fileName;
}
@ -546,6 +544,18 @@ public class BufferWriteProcessor extends LRUProcessor {
}
public void addTimeSeries(String measurementToString, String dataType, String encoding, String[] encodingArgs)
throws IOException {
ColumnSchema col = new ColumnSchema(measurementToString, TSDataType.valueOf(dataType),
TSEncoding.valueOf(encoding));
JSONObject measurement = constrcutMeasurement(col);
try {
recordWriter.addMeasurementByJson(measurement);
} catch (WriteProcessException e) {
throw new IOException(e);
}
}
private class BufferWriteRecordWriter extends TSRecordWriter {
private Map<String, IRowGroupWriter> flushingRowGroupWriters;
@ -553,7 +563,7 @@ public class BufferWriteProcessor extends LRUProcessor {
private long flushingRecordCount;
BufferWriteRecordWriter(TSFileConfig conf, BufferWriteIOWriter ioFileWriter,
WriteSupport<TSRecord> writeSupport, FileSchema schema) {
WriteSupport<TSRecord> writeSupport, FileSchema schema) throws WriteProcessException {
super(conf, ioFileWriter, writeSupport, schema);
}
@ -725,4 +735,5 @@ public class BufferWriteProcessor extends LRUProcessor {
private void switchIndexFromFlushToWork() {
bufferIOWriter.addNewRowGroupMetaDataToBackUp();
}
}

View File

@ -127,12 +127,8 @@ public class FileNodeManager extends LRUManager<FileNodeProcessor> {
fileNodeProcessor.writeUnlock();
}
}
} catch (PathErrorException e1) {
LOGGER.error("Restore all FileNodeManager failed.", e1);
} catch (LRUManagerException e2) {
LOGGER.error("Construt the filenode processor failed.", e2);
} catch (FileNodeProcessorException e3) {
LOGGER.error("Recovery the filenode processor failed.", e3);
} catch (PathErrorException | LRUManagerException | FileNodeProcessorException e) {
LOGGER.error("Restore all FileNode failed, the reason is {}", e.getMessage());
}
}
@ -574,6 +570,30 @@ public class FileNodeManager extends LRUManager<FileNodeProcessor> {
return res;
}
// TODO: should synchronized
public synchronized void addTimeSeries(Path path, String dataType, String encoding, String[] encodingArgs)
throws FileNodeManagerException {
// TODO: optimize and do't get the filenode processor instance
FileNodeProcessor fileNodeProcessor = null;
try {
do {
fileNodeProcessor = getProcessorWithDeltaObjectIdByLRU(path.getFullPath(), true);
} while (fileNodeProcessor == null);
if (fileNodeProcessor.hasBufferwriteProcessor()) {
BufferWriteProcessor bufferWriteProcessor = fileNodeProcessor.getBufferWriteProcessor();
bufferWriteProcessor.addTimeSeries(path.getMeasurementToString(), dataType, encoding, encodingArgs);
} else {
return;
}
} catch (LRUManagerException | IOException | FileNodeProcessorException e) {
throw new FileNodeManagerException(e);
} finally {
if (fileNodeProcessor != null) {
fileNodeProcessor.writeUnlock();
}
}
}
public synchronized boolean closeOneFileNode(String namespacePath) throws FileNodeManagerException {
if (fileNodeManagerStatus == FileNodeManagerStatus.NONE) {
fileNodeManagerStatus = FileNodeManagerStatus.CLOSE;

View File

@ -136,9 +136,11 @@ public class FileNodeProcessor extends LRUProcessor {
endTimeMap.put(deltaObjectId, lastUpdateTimeMap.get(deltaObjectId));
}
currentIntervalFileNode.setEndTimeMap(endTimeMap);
} else {
throw new ProcessorRuntimException("The intervalFile list is empty when close bufferwrite file");
}
// else {
// throw new ProcessorRuntimException("The intervalFile list is empty
// when close bufferwrite file");
// }
}
public void addIntervalFileNode(long startTime, String fileName) throws Exception {
@ -948,13 +950,7 @@ public class FileNodeProcessor extends LRUProcessor {
+ FileNodeConstants.BUFFERWRITE_FILE_SEPARATOR + System.currentTimeMillis());
FileSchema fileSchema;
try {
fileSchema = constructFileSchema(nameSpacePath);
} catch (PathErrorException e) {
LOGGER.error("Get the FileSchema error, the nameSpacePath is {}", nameSpacePath);
throw new WriteProcessException(
"Get the FileSchema error, the nameSpacePath is " + nameSpacePath);
}
fileSchema = constructFileSchema(nameSpacePath);
raf = new RandomAccessOutputStream(new File(outputPath));
tsfileIOWriter = new TSFileIOWriter(fileSchema, raf);
writeSupport = new TSRecordWriteSupport();
@ -1009,29 +1005,14 @@ public class FileNodeProcessor extends LRUProcessor {
return outputFile.getAbsolutePath();
}
private FileSchema constructFileSchema(String nameSpacePath) throws PathErrorException, WriteProcessException {
private FileSchema constructFileSchema(String nameSpacePath) throws WriteProcessException {
List<ColumnSchema> columnSchemaList;
String deltaObjectType = null;
try {
deltaObjectType = mManager.getDeltaObjectTypeByPath(nameSpacePath);
} catch (PathErrorException e) {
LOGGER.error("Get the deltaObjectType from MManager error using nameSpacePath is {}", nameSpacePath);
throw e;
}
try {
columnSchemaList = mManager.getSchemaForOneType(deltaObjectType);
} catch (PathErrorException e) {
LOGGER.error("The list of ColumnSchema error from MManager error using deltaObjectType is {}",
deltaObjectType);
throw e;
}
columnSchemaList = mManager.getSchemaForFileName(nameSpacePath);
FileSchema fileSchema = null;
try {
fileSchema = getFileSchemaFromColumnSchema(columnSchemaList, deltaObjectType);
fileSchema = getFileSchemaFromColumnSchema(columnSchemaList, nameSpacePath);
} catch (WriteProcessException e) {
LOGGER.error("Get the FileSchema error, the list of ColumnSchema is {}", columnSchemaList);
throw e;

View File

@ -6,6 +6,7 @@ import java.util.Map;
import cn.edu.tsinghua.tsfile.file.metadata.enums.TSDataType;
import cn.edu.tsinghua.tsfile.file.metadata.enums.TSEncoding;
import cn.edu.tsinghua.tsfile.format.Encoding;
public class ColumnSchema implements Serializable {
private static final long serialVersionUID = -8257474930341487207L;
@ -29,6 +30,18 @@ public class ColumnSchema implements Serializable {
public String getValueFromArgs(String key) {
return args.get(key);
}
public String getName(){
return name;
}
public TSDataType geTsDataType(){
return dataType;
}
public TSEncoding getEncoding(){
return encoding;
}
public Map<String, String> getArgsMap() {
return args;

View File

@ -196,6 +196,15 @@ public class MGraph implements Serializable {
public ArrayList<ColumnSchema> getSchemaForOneType(String path) throws PathErrorException {
return mTree.getSchemaForOneType(path);
}
/**
* <p>Get all ColumnSchemas for the filenode path</p>
* @param path
* @return ArrayList<ColumnSchema> The list of the schema
*/
public ArrayList<ColumnSchema> getSchemaForOneFileNode(String path){
return mTree.getSchemaForOneFileNode(path);
}
/**
* Calculate the count of storage-level nodes included in given path
@ -213,6 +222,10 @@ public class MGraph implements Serializable {
public String getFileNameByPath(String path) throws PathErrorException {
return mTree.getFileNameByPath(path);
}
public boolean checkFileNameByPath(String path){
return mTree.checkFileNameByPath(path);
}
/**
* Check whether the path given exists
@ -228,7 +241,7 @@ public class MGraph implements Serializable {
public String getDeltaObjectTypeByPath(String path) throws PathErrorException {
return mTree.getDeltaObjectTypeByPath(path);
}
/**
* Get ColumnSchema for given path. Notice: Path must be a complete Path
* from root to leaf node.
@ -241,12 +254,6 @@ public class MGraph implements Serializable {
StringBuilder sb = new StringBuilder();
sb.append("=== Timeseries Tree ===\n\n");
sb.append(mTree.toString());
// sb.append("\n\n=== Properties Tree === Size : " + pTreeMap.size() + "\n\n");
// for (String key : pTreeMap.keySet()) {
// sb.append("--- name : " + key + "---\n");
// sb.append(pTreeMap.get(key).toString());
// sb.append("\n\n");
// }
return sb.toString();
}
}

View File

@ -20,6 +20,8 @@ import cn.edu.tsinghua.iotdb.conf.TsfileDBDescriptor;
import cn.edu.tsinghua.iotdb.exception.MetadataArgsErrorException;
import cn.edu.tsinghua.iotdb.exception.PathErrorException;
import cn.edu.tsinghua.tsfile.file.metadata.enums.TSDataType;
import cn.edu.tsinghua.tsfile.format.DataType;
import cn.edu.tsinghua.tsfile.format.Encoding;
import cn.edu.tsinghua.tsfile.timeseries.read.qp.Path;
/**
@ -32,7 +34,7 @@ import cn.edu.tsinghua.tsfile.timeseries.read.qp.Path;
*
*/
public class MManager {
//private static MManager manager = new MManager();
// private static MManager manager = new MManager();
private static final String ROOT_NAME = MetadataConstant.ROOT;
// the lock for read/write
private ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
@ -45,12 +47,12 @@ public class MManager {
private boolean writeToLog;
private String metadataDirPath;
private static class MManagerHolder{
private static final MManager INSTANCE = new MManager();
private static class MManagerHolder {
private static final MManager INSTANCE = new MManager();
}
public static MManager getInstance() {
return MManagerHolder.INSTANCE;
}
@ -151,18 +153,18 @@ public class MManager {
unlinkMNodeFromPTree(args[1], args[2]);
}
}
private void initLogStream(){
if(logWriter==null){
private void initLogStream() {
if (logWriter == null) {
File logFile = new File(logFilePath);
File metadataDir = new File(metadataDirPath);
if(!metadataDir.exists()){
if (!metadataDir.exists()) {
metadataDir.mkdirs();
}
FileWriter fileWriter;
try {
fileWriter = new FileWriter(logFile,true);
fileWriter = new FileWriter(logFile, true);
logWriter = new BufferedWriter(fileWriter);
} catch (IOException e) {
e.printStackTrace();
@ -172,7 +174,21 @@ public class MManager {
}
/**
* operation: Add a path to Metadata Tree
* <p>
* Add one timeseries to metadata. Must invoke the<code>pathExist</code> and
* <code>getFileNameByPath</code> method first to check timeseries.
* </p>
*
* @param path
* the timeseries path
* @param dataType
* the datetype {@code DataType} for the timeseries
* @param encoding
* the encoding function {@code Encoding} for the timeseries
* @param args
* @throws PathErrorException
* @throws IOException
* @throws MetadataArgsErrorException
*/
public void addPathToMTree(String path, String dataType, String encoding, String[] args)
throws PathErrorException, IOException, MetadataArgsErrorException {
@ -390,6 +406,21 @@ public class MManager {
lock.readLock().unlock();
}
}
/**
* <p>Get all ColumnSchemas for the filenode path</p>
* @param path
* @return ArrayList<ColumnSchema> The list of the schema
*/
public ArrayList<ColumnSchema> getSchemaForFileName(String path){
lock.readLock().lock();
try{
return mGraph.getSchemaForOneFileNode(path);
}finally {
lock.readLock().unlock();
}
}
/**
* Calculate the count of storage-level nodes included in given path
@ -429,6 +460,16 @@ public class MManager {
}
}
public boolean checkFileNameByPath(String path) {
lock.readLock().lock();
try {
return mGraph.checkFileNameByPath(path);
} finally {
lock.readLock().unlock();
}
}
public List<String> getAllFileNames() throws PathErrorException {
lock.readLock().lock();

View File

@ -135,32 +135,27 @@ public class MTree implements Serializable {
throw new PathErrorException(String.format("The storage group can't be set to the %s node", path));
}
int i = 1;
while (i < nodeNames.length) {
while (i < nodeNames.length - 1) {
MNode temp = cur.getChild(nodeNames[i]);
if (temp == null) {
// add one child node
cur.addChild(nodeNames[i], new MNode(nodeNames[i], cur, false));
} else if (temp.isStorageLevel()) {
// before set storage group should check the path exist or not
// throw exception
throw new PathErrorException(String.format("The prefix of %s has been set to the storage group.", path));
}
cur = cur.getChild(nodeNames[i]);
i++;
}
checkStorageGroup(cur);
// for (MNode node : cur.getChildren().values()) {
// if (node.getDataFileName() == null) {
// cur = node;
// } else {
// throw new PathErrorException(
// String.format("The storage group %s has been set",
// node.getDataFileName()));
// }
// }
// set storage group level
cur = root;
for (i = 1; i < nodeNames.length; i++) {
if (cur.hasChild(nodeNames[i])) {
cur = cur.getChild(nodeNames[i]);
} else {
throw new PathErrorException(String.format("Timeseries %s does not exist", path));
}
}
if (cur.isLeaf()) {
throw new PathErrorException(String.format("The storage group can't be set to the left node"));
MNode temp = cur.getChild(nodeNames[i]);
if (temp == null) {
cur.addChild(nodeNames[i], new MNode(nodeNames[i], cur, false));
} else {
throw new PathErrorException(
String.format("The path of %s already exist, it can't be set to the storage group", path));
}
cur = cur.getChild(nodeNames[i]);
cur.setStorageLevel(true);
setDataFileName(path, cur);
}
@ -194,54 +189,6 @@ public class MTree implements Serializable {
}
}
// /**
// * Add a path to current Metadata Tree
// *
// * @param path
// * Format: root.node.(node)*
// */
// public int addPath(String path, String dataType, String encoding,
// String[] args)
// throws PathErrorException, MetadataArgsErrorException {
// int addCount = 0;
// if (getRoot() == null) {
// throw new PathErrorException("Root node is null, please initialize root
// first");
// }
// String[] nodeNames = path.trim().split(separator);
// if (nodeNames.length <= 1 || !nodeNames[0].equals(getRoot().getName())) {
// throw new PathErrorException(String.format("Timeseries %s is not right.",
// path));
// }
//
// MNode cur = getRoot();
// int i;
// for (i = 1; i < nodeNames.length - 1; i++) {
// if (!cur.hasChild(nodeNames[i])) {
// cur.addChild(nodeNames[i], new MNode(nodeNames[i], cur, false));
// addCount++;
// }
// cur = cur.getChild(nodeNames[i]);
// }
// if (cur.hasChild(nodeNames[i])) {
// throw new PathErrorException(String.format("Timeseries %s already
// exists.", path));
// } else {
// TSDataType dt = TSDataType.valueOf(dataType);
// TSEncoding ed = TSEncoding.valueOf(encoding);
// MNode leaf = new MNode(nodeNames[i], cur, dt, ed);
// if (args.length > 0) {
// for (int k = 0; k < args.length; k++) {
// String[] arg = args[k].split("=");
// leaf.getSchema().putKeyValueToArgs(arg[0], arg[1]);
// }
// }
// cur.addChild(nodeNames[i], leaf);
// addCount++;
// }
// return addCount;
// }
/**
* Delete one path from current Metadata Tree
*
@ -272,8 +219,10 @@ public class MTree implements Serializable {
cur.getParent().deleteChild(cur.getName());
cur = cur.getParent();
while (cur != null && !cur.getName().equals("root") && cur.getChildren().size() == 0) {
if (cur.isStorageLevel())
if (cur.isStorageLevel()) {
dataFileName = cur.getDataFileName();
return dataFileName;
}
cur.getParent().deleteChild(cur.getName());
cur = cur.getParent();
}
@ -365,33 +314,57 @@ public class MTree implements Serializable {
}
/**
* Get the file name for given path Notice: This method could be called if
* and only if the path includes one node whose {@code isStorageLevel} is
* true
* <p>
* Get the storage group path from the path
* </p>
*
* @param path
* @return String storage group path
* @throws PathErrorException
*/
public String getFileNameByPath(String path) throws PathErrorException {
String[] nodes = path.split(separator);
if (nodes.length == 0 || !nodes[0].equals(getRoot().getName())) {
throw new PathErrorException(String.format("Timeseries %s is not correct", path));
}
String[] nodes = path.split(separator);
MNode cur = getRoot();
for (int i = 1; i < nodes.length; i++) {
if (!cur.hasChild(nodes[i])) {
if (cur == null) {
throw new PathErrorException(
"Timeseries is not correct. Node[" + cur.getName() + "] doesn't have child named:" + nodes[i]);
}
if (cur.getDataFileName() != null) {
String.format("The prefix of the path %s is not one storage group path", path));
} else if (cur.isStorageLevel()) {
return cur.getDataFileName();
} else {
cur = cur.getChild(nodes[i]);
}
cur = cur.getChild(nodes[i]);
}
if (cur.getDataFileName() != null) {
if(cur.isStorageLevel()){
return cur.getDataFileName();
}
throw new PathErrorException(String.format(
"Timeseries %s does not set storage group, please set storage group first and then do the operation",
path));
throw new PathErrorException(String.format("The prefix of the path %s is not one storage group path", path));
}
/**
* <p>
* Check the prefix of this path is storage group path
* </p>
*
* @param path
* @return true the prefix of this path is storage group path false the
* prefix of this path is not storage group path
*/
public boolean checkFileNameByPath(String path) {
String[] nodes = path.split(separator);
MNode cur = getRoot();
for (int i = 1; i <= nodes.length; i++) {
if (cur == null) {
return false;
} else if (cur.isStorageLevel()) {
return true;
} else {
cur = cur.getChild(nodes[i]);
}
}
return false;
}
/**
@ -512,6 +485,24 @@ public class MTree implements Serializable {
res.addAll(leafMap.values());
return res;
}
/**
* <p>Get all ColumnSchemas for the filenode path</p>
* @param path
* @return ArrayList<ColumnSchema> The list of the schema
*/
public ArrayList<ColumnSchema> getSchemaForOneFileNode(String path){
String nodes[] = path.split(separator);
HashMap<String, ColumnSchema> leafMap = new HashMap<>();
MNode cur = getRoot();
for(int i = 1;i<nodes.length;i++){
cur = cur.getChild(nodes[i]);
}
putLeafToLeafMap(cur, leafMap);
ArrayList<ColumnSchema> res = new ArrayList<>();
res.addAll(leafMap.values());
return res;
}
private void putLeafToLeafMap(MNode node, HashMap<String, ColumnSchema> leafMap) {
if (node.isLeaf()) {

View File

@ -11,6 +11,7 @@ import cn.edu.tsinghua.iotdb.engine.filenode.FileNodeManager;
import cn.edu.tsinghua.iotdb.exception.ArgsErrorException;
import cn.edu.tsinghua.iotdb.exception.FileNodeManagerException;
import cn.edu.tsinghua.iotdb.exception.PathErrorException;
import cn.edu.tsinghua.iotdb.metadata.ColumnSchema;
import cn.edu.tsinghua.iotdb.metadata.MManager;
import cn.edu.tsinghua.iotdb.qp.constant.SQLConstant;
import cn.edu.tsinghua.iotdb.qp.logical.sys.AuthorOperator;
@ -41,6 +42,7 @@ public class OverflowQPExecutor extends QueryProcessExecutor {
private OverflowQueryEngine queryEngine;
private FileNodeManager fileNodeManager;
private MManager mManager = MManager.getInstance();
private final String separator = "\\.";
public OverflowQPExecutor() {
queryEngine = new OverflowQueryEngine();
@ -321,15 +323,34 @@ public class OverflowQPExecutor extends QueryProcessExecutor {
if (mManager.pathExist(path.getFullPath())) {
throw new ProcessorException(String.format("Timeseries %s already exist", path.getFullPath()));
}
if (!mManager.checkFileNameByPath(path.getFullPath())) {
throw new ProcessorException("Storage group should be created first");
}
String fileNodePath = mManager.getFileNameByPath(path.getFullPath());
ArrayList<ColumnSchema> columnSchemas = mManager.getSchemaForFileName(fileNodePath);
String lastNode = path.getMeasurementToString();
boolean isNewMeasurement = true;
for (ColumnSchema columnSchema : columnSchemas) {
if (columnSchema.getName().equals(lastNode)) {
isNewMeasurement = false;
if (!columnSchema.geTsDataType().toString().equals(dataType)
|| !columnSchema.getEncoding().toString().equals(encoding)) {
throw new ProcessorException(String.format(
"The dataType or encoding of the last node %s is conflicting in the storage group %s",
lastNode, fileNodePath));
}
}
}
mManager.addPathToMTree(path.getFullPath(), dataType, encoding, encodingArgs);
try {
String namespacePath = mManager.getFileNameByPath(path.getFullPath());
fileNodeManager.closeOneFileNode(namespacePath);
} catch (PathErrorException e) {
// no operation
} catch (FileNodeManagerException e) {
e.printStackTrace();
throw new ProcessorException(e.getMessage());
if (isNewMeasurement) {
// add time series to schema
fileNodeManager.addTimeSeries(path, dataType, encoding, encodingArgs);
}
// fileNodeManager.closeOneFileNode(namespacePath);
} catch (PathErrorException | FileNodeManagerException e) {
throw new ProcessorException(e);
}
break;
case DELETE_PATH:
@ -338,15 +359,11 @@ public class OverflowQPExecutor extends QueryProcessExecutor {
for (Path p : deletePathList) {
ArrayList<String> subPaths = mManager.getPaths(p.getFullPath());
if (subPaths.isEmpty()) {
throw new ProcessorException(String
.format("The timeseries %s does not exist and can't be deleted", p.getFullPath()));
throw new ProcessorException(
String.format("There are no timeseries in the prefix of %s path", p.getFullPath()));
}
pathSet.addAll(subPaths);
}
if (pathSet.isEmpty()) {
throw new ProcessorException(
"Timeseries does not exist and cannot be delete its metadata and data");
}
for (String p : pathSet) {
if (!mManager.pathExist(p)) {
throw new ProcessorException(String.format(
@ -358,17 +375,19 @@ public class OverflowQPExecutor extends QueryProcessExecutor {
try {
deleteDataOfTimeSeries(fullPath);
} catch (ProcessorException e) {
// no operation
throw new ProcessorException(e);
}
for (String p : fullPath) {
String nameSpacePath = null;
try {
nameSpacePath = mManager.getFileNameByPath(p);
} catch (PathErrorException e) {
// no operation
throw new ProcessorException(e);
}
// TODO: don't delete the storage group path recursively
String deleteNameSpacePath = mManager.deletePathFromMTree(p);
if (deleteNameSpacePath != null) {
// TODO: should we delete the filenode in the disk
// delete this filenode
try {
// clear filenode
@ -376,26 +395,21 @@ public class OverflowQPExecutor extends QueryProcessExecutor {
// close processor
fileNodeManager.deleteOneFileNode(deleteNameSpacePath);
} catch (FileNodeManagerException e) {
e.printStackTrace();
throw new ProcessorException(e.getMessage());
throw new ProcessorException(e);
}
} else if (nameSpacePath != null) {
// TODO: should we must close the filenode
// close this filenode
try {
fileNodeManager.closeOneFileNode(nameSpacePath);
} catch (FileNodeManagerException e) {
e.printStackTrace();
throw new ProcessorException(e.getMessage());
throw new ProcessorException(e);
}
}
}
}
break;
case SET_FILE_LEVEL:
if (!mManager.pathExist(path.getFullPath())) {
throw new ProcessorException(String.format("Timeseries %s does not exist.", path.getFullPath()));
}
// Storage group just can be set only once
mManager.setStorageLevelToMTree(path.getFullPath());
break;
default:

View File

@ -281,7 +281,7 @@ public class FileNodeProcessorTest {
bufferwritedataindisk = queryResult.getBufferwriteDataInDisk();
bufferwritedatainfiles = queryResult.getBufferwriteDataInFiles();
overflowResult = queryResult.getAllOverflowData();
assertEquals(4, bufferwritedataindisk.size());
assertEquals(3, bufferwritedataindisk.size());
assertEquals(1, bufferwritedatainfiles.size());
assertEquals(false, bufferwritedatainfiles.get(0).isClosed());
assertEquals(null, overflowResult.get(0));
@ -318,7 +318,7 @@ public class FileNodeProcessorTest {
bufferwritedataindisk = queryResult.getBufferwriteDataInDisk();
bufferwritedatainfiles = queryResult.getBufferwriteDataInFiles();
overflowResult = queryResult.getAllOverflowData();
assertEquals(4, bufferwritedataindisk.size());
assertEquals(3, bufferwritedataindisk.size());
assertEquals(2, bufferwritedatainfiles.size());
IntervalFileNode intervalFileNode = bufferwritedatainfiles.get(0);
assertEquals(true, intervalFileNode.isClosed());
@ -476,7 +476,7 @@ public class FileNodeProcessorTest {
List<RowGroupMetaData> bufferwritedataindisk = queryResult.getBufferwriteDataInDisk();
List<IntervalFileNode> bufferwritedatainfiles = queryResult.getBufferwriteDataInFiles();
List<Object> overflowResult = queryResult.getAllOverflowData();
assertEquals(4, bufferwritedataindisk.size());
assertEquals(3, bufferwritedataindisk.size());
assertEquals(1, bufferwritedatainfiles.size());
assertEquals(false, bufferwritedatainfiles.get(0).isClosed());
assertEquals(null, overflowResult.get(0));
@ -495,7 +495,7 @@ public class FileNodeProcessorTest {
bufferwritedataindisk = queryResult.getBufferwriteDataInDisk();
bufferwritedatainfiles = queryResult.getBufferwriteDataInFiles();
overflowResult = queryResult.getAllOverflowData();
assertEquals(4, bufferwritedataindisk.size());
assertEquals(3, bufferwritedataindisk.size());
assertEquals(1, bufferwritedatainfiles.size());
assertEquals(false, bufferwritedatainfiles.get(0).isClosed());
assertEquals(null, overflowResult.get(0));

View File

@ -14,6 +14,10 @@ public class MetadataManagerHelper {
mmanager = MManager.getInstance();
mmanager.clear();
try {
mmanager.setStorageLevelToMTree("root.vehicle.d0");
mmanager.setStorageLevelToMTree("root.vehicle.d1");
mmanager.setStorageLevelToMTree("root.vehicle.d2");
mmanager.addPathToMTree("root.vehicle.d0.s0", "INT32", "RLE", new String[0]);
mmanager.addPathToMTree("root.vehicle.d0.s1", "INT64", "RLE", new String[0]);
mmanager.addPathToMTree("root.vehicle.d0.s2", "FLOAT", "RLE", new String[0]);
@ -35,9 +39,6 @@ public class MetadataManagerHelper {
mmanager.addPathToMTree("root.vehicle.d2.s4", "BOOLEAN", "PLAIN", new String[0]);
mmanager.addPathToMTree("root.vehicle.d2.s5", "TEXT", "PLAIN", new String[0]);
mmanager.setStorageLevelToMTree("root.vehicle.d0");
mmanager.setStorageLevelToMTree("root.vehicle.d1");
mmanager.setStorageLevelToMTree("root.vehicle.d2");
} catch (Exception e) {
throw new RuntimeException("Initialize the metadata manager failed",e);
}
@ -48,6 +49,8 @@ public class MetadataManagerHelper {
mmanager = MManager.getInstance();
mmanager.clear();
try {
mmanager.setStorageLevelToMTree("root.vehicle");
mmanager.addPathToMTree("root.vehicle.d0.s0", "INT32", "RLE", new String[0]);
mmanager.addPathToMTree("root.vehicle.d0.s1", "INT64", "RLE", new String[0]);
mmanager.addPathToMTree("root.vehicle.d0.s2", "FLOAT", "RLE", new String[0]);
@ -69,7 +72,6 @@ public class MetadataManagerHelper {
mmanager.addPathToMTree("root.vehicle.d2.s4", "BOOLEAN", "PLAIN", new String[0]);
mmanager.addPathToMTree("root.vehicle.d2.s5", "TEXT", "PLAIN", new String[0]);
mmanager.setStorageLevelToMTree("root.vehicle");
} catch (Exception e) {
throw new RuntimeException("Initialize the metadata manager failed",e);

View File

@ -25,7 +25,10 @@ public class MManagerAdvancedTest {
mmanager = MManager.getInstance();
mmanager.clear();
mmanager.setStorageLevelToMTree("root.vehicle.d1");
mmanager.setStorageLevelToMTree("root.vehicle.d0");
mmanager.addPathToMTree("root.vehicle.d0.s0", "INT32", "RLE", new String[0]);
mmanager.addPathToMTree("root.vehicle.d0.s1", "INT64", "RLE", new String[0]);
mmanager.addPathToMTree("root.vehicle.d0.s2", "FLOAT", "RLE", new String[0]);
@ -40,8 +43,7 @@ public class MManagerAdvancedTest {
mmanager.addPathToMTree("root.vehicle.d1.s4", "BOOLEAN", "PLAIN", new String[0]);
mmanager.addPathToMTree("root.vehicle.d1.s5", "TEXT", "PLAIN", new String[0]);
mmanager.setStorageLevelToMTree("root.vehicle.d1");
mmanager.setStorageLevelToMTree("root.vehicle.d0");
}

View File

@ -39,6 +39,19 @@ public class MManagerBasicTest {
assertEquals(manager.pathExist("root.laptop"), false);
try {
manager.setStorageLevelToMTree("root.laptop.d1");
} catch (PathErrorException | IOException e) {
e.printStackTrace();
fail(e.getMessage());
}
try {
manager.setStorageLevelToMTree("root.laptop");
} catch (PathErrorException | IOException e) {
assertEquals("The path of root.laptop already exist, it can't be set to the storage group", e.getMessage());
}
try {
manager.addPathToMTree("root.laptop.d1.s0","INT32","RLE",new String[0]);
} catch (PathErrorException | MetadataArgsErrorException | IOException e) {
@ -62,6 +75,8 @@ public class MManagerBasicTest {
e.printStackTrace();
fail(e.getMessage());
}
// just delete s0, and don't delete root.laptop.d1??
// delete storage group or not
assertEquals(manager.pathExist("root.laptop.d1.s1"), false);
try {
manager.deletePathFromMTree("root.laptop.d1.s0");
@ -70,10 +85,17 @@ public class MManagerBasicTest {
fail(e.getMessage());
}
assertEquals(manager.pathExist("root.laptop.d1.s0"), false);
assertEquals(manager.pathExist("root.laptop.d1"), false);
assertEquals(manager.pathExist("root.laptop"), false);
assertEquals(manager.pathExist("root.laptop.d1"), true);
assertEquals(manager.pathExist("root.laptop"), true);
assertEquals(manager.pathExist("root"), true);
// can't delete the storage group
// try {
// manager.setStorageLevelToMTree("root.laptop");
// } catch (PathErrorException | IOException e) {
// fail(e.getMessage());
// }
try {
manager.addPathToMTree("root.laptop.d1.s1","INT32","RLE",new String[0]);
} catch (PathErrorException | MetadataArgsErrorException | IOException e1) {
@ -87,31 +109,10 @@ public class MManagerBasicTest {
e1.printStackTrace();
fail(e1.getMessage());
}
try {
manager.addPathToMTree("root.laptop.d2.s0","INT32","RLE",new String[0]);
} catch (PathErrorException | MetadataArgsErrorException | IOException e1) {
e1.printStackTrace();
fail(e1.getMessage());
}
try {
manager.addPathToMTree("root.laptop.d2.s1","INT32","RLE",new String[0]);
} catch (PathErrorException | MetadataArgsErrorException | IOException e1) {
e1.printStackTrace();
fail(e1.getMessage());
}
try {
manager.setStorageLevelToMTree("root.laptop.d1");
} catch (PathErrorException | IOException e) {
e.printStackTrace();
fail(e.getMessage());
}
try {
manager.setStorageLevelToMTree("root.laptop");
} catch (PathErrorException | IOException e) {
assertEquals(String.format("The storage group %s has been set", "root.laptop.d1"), e.getMessage());
}
assertEquals(false, manager.pathExist("root.laptop.d2"));
assertEquals(false, manager.checkFileNameByPath("root.laptop.d2"));
try {
manager.deletePathFromMTree("root.laptop.d1.s0");
} catch (PathErrorException | IOException e) {
@ -125,20 +126,15 @@ public class MManagerBasicTest {
fail(e.getMessage());
}
try {
manager.setStorageLevelToMTree("root.laptop");
} catch (PathErrorException | IOException e) {
fail(e.getMessage());
}
try {
manager.setStorageLevelToMTree("root.laptop.d2");
} catch (PathErrorException | IOException e) {
assertEquals(String.format("The storage group %s has been set", "root.laptop"),e.getMessage());
assertEquals(String.format("The path of %s already exist, it can't be set to the storage group", "root.laptop.d2"),e.getMessage());
}
/*
* check file level
*/
assertEquals(manager.pathExist("root.laptop.d2.s1"),true);
assertEquals(manager.pathExist("root.laptop.d2.s1"),false);
List<Path> paths = new ArrayList<>();
paths.add(new Path("root.laptop.d2.s1"));
try {
@ -147,6 +143,21 @@ public class MManagerBasicTest {
e.printStackTrace();
fail(e.getMessage());
}
try {
manager.addPathToMTree("root.laptop.d2.s1","INT32","RLE",new String[0]);
} catch (PathErrorException | MetadataArgsErrorException | IOException e1) {
e1.printStackTrace();
fail(e1.getMessage());
}
try {
manager.addPathToMTree("root.laptop.d2.s0","INT32","RLE",new String[0]);
} catch (PathErrorException | MetadataArgsErrorException | IOException e1) {
e1.printStackTrace();
fail(e1.getMessage());
}
try {
manager.deletePathFromMTree("root.laptop.d2.s0");
} catch (PathErrorException | IOException e) {
@ -159,27 +170,21 @@ public class MManagerBasicTest {
e.printStackTrace();
fail(e.getMessage());
}
/*
* root d1 s0
*/
try {
manager.addPathToMTree("root.laptop.d1.s0","INT32","RLE",new String[0]);
} catch (PathErrorException | MetadataArgsErrorException | IOException e1) {
e1.printStackTrace();
fail(e1.getMessage());
}
try {
manager.addPathToMTree("root.laptop.d1.s1","INT32","RLE",new String[0]);
} catch (PathErrorException | MetadataArgsErrorException | IOException e1) {
e1.printStackTrace();
fail(e1.getMessage());
}
try {
manager.setStorageLevelToMTree("root.laptop.d1");
} catch (PathErrorException | IOException e) {
e.printStackTrace();
fail(e.getMessage());
}
paths = new ArrayList<>();
paths.add(new Path("root.laptop.d1.s0"));
try {

View File

@ -17,6 +17,7 @@ import org.junit.Test;
public class MTreeTest {
private static TsfileDBConfig dbconfig = TsfileDBDescriptor.getInstance().getConfig();
@Before
public void setUp() throws Exception {
MManager.getInstance().clear();
@ -27,9 +28,9 @@ public class MTreeTest {
MManager.getInstance().flushObjectToFile();
EngineTestHelper.delete(dbconfig.metadataDir);
}
@Test
public void testAddLeftNodePath(){
public void testAddLeftNodePath() {
MTree root = new MTree("root");
try {
root.addTimeseriesPath("root.laptop.d1.s1", "INT32", "RLE", new String[0]);
@ -40,8 +41,8 @@ public class MTreeTest {
try {
root.addTimeseriesPath("root.laptop.d1.s1.b", "INT32", "RLE", new String[0]);
} catch (PathErrorException e) {
Assert.assertEquals(String.format("The Node [%s] is left node, the timeseries %s can't be created",
"s1","root.laptop.d1.s1.b" ), e.getMessage());
Assert.assertEquals(String.format("The Node [%s] is left node, the timeseries %s can't be created", "s1",
"root.laptop.d1.s1.b"), e.getMessage());
}
}
@ -65,22 +66,30 @@ public class MTreeTest {
Assert.assertEquals(String.format("Timeseries %s is not right.", "aa.bb.cc"), e.getMessage());
}
}
@Test
public void testAddAndQueryPath() {
MTree root = new MTree("root");
try {
assertEquals(false, root.isPathExist("root.a.d0"));
assertEquals(false, root.checkFileNameByPath("root.a.d0"));
root.setStorageGroup("root.a.d0");
root.addTimeseriesPath("root.a.d0.s0", "INT32", "RLE", new String[0]);
root.addTimeseriesPath("root.a.d0.s1", "INT32", "RLE", new String[0]);
root.setStorageGroup("root.a.d0");
assertEquals(false, root.isPathExist("root.a.d1"));
assertEquals(false, root.checkFileNameByPath("root.a.d1"));
root.setStorageGroup("root.a.d1");
root.addTimeseriesPath("root.a.d1.s0", "INT32", "RLE", new String[0]);
root.addTimeseriesPath("root.a.d1.s1", "INT32", "RLE", new String[0]);
root.setStorageGroup("root.a.d1");
root.addTimeseriesPath("root.a.b.d0.s0", "INT32", "RLE", new String[0]);
root.setStorageGroup("root.a.b.d0");
root.addTimeseriesPath("root.a.b.d0.s0", "INT32", "RLE", new String[0]);
} catch (PathErrorException e1) {
fail(e1.getMessage());
e1.printStackTrace();
}
try {
HashMap<String, ArrayList<String>> result = root.getAllPath("root.a.*.s0");
assertEquals(result.size(), 2);
@ -88,53 +97,62 @@ public class MTreeTest {
assertEquals(result.get("root.a.d1").get(0), "root.a.d1.s0");
assertTrue(result.containsKey("root.a.d0"));
assertEquals(result.get("root.a.d0").get(0), "root.a.d0.s0");
result = root.getAllPath("root.a.*.*.s0");
assertTrue(result.containsKey("root.a.b.d0"));
assertEquals(result.get("root.a.b.d0").get(0), "root.a.b.d0.s0");
} catch (PathErrorException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
@Test
public void testSetStorageGroup(){
MTree root = new MTree("root");
assertEquals(root.isPathExist("root.laptop.d1.s0"), false);
assertEquals(root.isPathExist("root.laptop.d1.s1"), false);
assertEquals(root.isPathExist("root.laptop.d2.s0"), false);
assertEquals(root.isPathExist("root.laptop.d2.s1"), false);
try {
root.addTimeseriesPath("root.laptop.d1.s0", "INT32", "RLE", new String[0]);
root.addTimeseriesPath("root.laptop.d1.s1", "INT32", "RLE", new String[0]);
root.addTimeseriesPath("root.laptop.d2.s0", "INT32", "RLE", new String[0]);
root.addTimeseriesPath("root.laptop.d2.s1", "INT32", "RLE", new String[0]);
} catch (PathErrorException e) {
e.printStackTrace();
fail(e.getMessage());
}
try {
root.setStorageGroup("root.laptop.d1");
} catch (PathErrorException e) {
e.printStackTrace();
fail(e.getMessage());
}
try {
root.setStorageGroup("root.laptop");
} catch (PathErrorException e) {
Assert.assertEquals(String.format("The storage group %s has been set","root.laptop.d1" ), e.getMessage());
}
try {
root.setStorageGroup("root.laptop.d2");
} catch (PathErrorException e) {
e.printStackTrace();
fail(e.getMessage());
}
}
@Test
public void testSetStorageGroup() {
// set storage group first
MTree root = new MTree("root");
try {
root.setStorageGroup("root.laptop.d1");
assertEquals(true, root.isPathExist("root.laptop.d1"));
assertEquals(true, root.checkFileNameByPath("root.laptop.d1"));
assertEquals("root.laptop.d1", root.getFileNameByPath("root.laptop.d1"));
assertEquals(false, root.isPathExist("root.laptop.d1.s1"));
assertEquals(true, root.checkFileNameByPath("root.laptop.d1.s1"));
assertEquals("root.laptop.d1", root.getFileNameByPath("root.laptop.d1.s1"));
} catch (PathErrorException e) {
e.printStackTrace();
fail(e.getMessage());
}
try {
root.setStorageGroup("root.laptop.d2");
} catch (PathErrorException e) {
fail(e.getMessage());
}
try {
root.setStorageGroup("root.laptop");
} catch (PathErrorException e) {
assertEquals("The path of root.laptop already exist, it can't be set to the storage group", e.getMessage());
}
// check timeseries
assertEquals(root.isPathExist("root.laptop.d1.s0"), false);
assertEquals(root.isPathExist("root.laptop.d1.s1"), false);
assertEquals(root.isPathExist("root.laptop.d2.s0"), false);
assertEquals(root.isPathExist("root.laptop.d2.s1"), false);
try {
assertEquals("root.laptop.d1", root.getFileNameByPath("root.laptop.d1.s0"));
root.addTimeseriesPath("root.laptop.d1.s0", "INT32", "RLE", new String[0]);
assertEquals("root.laptop.d1", root.getFileNameByPath("root.laptop.d1.s1"));
root.addTimeseriesPath("root.laptop.d1.s1", "INT32", "RLE", new String[0]);
assertEquals("root.laptop.d2", root.getFileNameByPath("root.laptop.d2.s0"));
root.addTimeseriesPath("root.laptop.d2.s0", "INT32", "RLE", new String[0]);
assertEquals("root.laptop.d2", root.getFileNameByPath("root.laptop.d2.s1"));
root.addTimeseriesPath("root.laptop.d2.s1", "INT32", "RLE", new String[0]);
} catch (PathErrorException e) {
e.printStackTrace();
fail(e.getMessage());
}
try {
root.deletePath("root.laptop.d1.s0");
} catch (PathErrorException e) {
@ -154,5 +172,4 @@ public class MTreeTest {
assertEquals(root.isPathExist("root.laptop.d2"), true);
assertEquals(root.isPathExist("root.laptop.d2.s0"), true);
}
}