modify start shell

This commit is contained in:
qiaojialingithub 2017-05-15 09:45:57 +08:00
commit d885928c08
64 changed files with 1843 additions and 767 deletions

3
.gitignore vendored
View File

@ -2,7 +2,7 @@
**/.classpath
**/.project
**/.settings/
src/main/resources/
# src/main/resources/
# intellij IDE files
**/*.iml
**/.idea/
@ -16,6 +16,7 @@ src/main/resources/
# intermediately generated locally
**/logs/
src/main/resources/*
tsfile-timeseries/src/main/resources/logback.out.out.xml
tsfile-timeseries/src/main/resources/logback.out.xml

View File

@ -1,40 +0,0 @@
#!/bin/sh
mvn clean package -DskipTests
if [ -d "./tsfiledb/lib/" ]; then
rm -r ./tsfiledb/lib/
fi
if [ -d "./tsfiledb/doc/" ]; then
rm -r ./tsfiledb/doc/
fi
if [ -d "./tsfiledb/tmp/" ]; then
rm -r ./tsfiledb/tmp/
fi
if [ -d "./tsfiledb/logs/" ]; then
rm -r ./tsfiledb/logs/
fi
if [ -f "./tsfiledb/conf/tsfile.yaml" ]; then
rm ./tsfiledb/conf/tsfile.yaml
fi
if [ -f "./tsfiledb.tar.gz" ]; then
rm ./tsfiledb.tar.gz
fi
mkdir ./tsfiledb/lib/
mkdir ./tsfiledb/doc/
mkdir ./tsfiledb/tmp/
mkdir ./tsfiledb/logs/
#cp ./tsfile-service/target/lib/* ./delta/lib/
#cp ./tsfile-service/target/tsfile-service-0.0.1-SNAPSHOT.jar ./delta/lib/
#cp ./tsfile-jdbc/target/lib/* ./delta/lib/
#cp ./tsfile-jdbc/target/tsfile-jdbc-0.0.1-SNAPSHOT.jar ./delta/lib/
#cp ./tsfile-common/src/main/resources/tsfile.yaml ./delta/conf/
tar -zcf ./tsfiledb.tar.gz ./tsfiledb/

19
pom.xml
View File

@ -4,7 +4,7 @@
<groupId>cn.edu.thu</groupId>
<artifactId>tsfiledb</artifactId>
<version>0.0.1-SNAPSHOT</version>
<version>0.0.1</version>
<packaging>jar</packaging>
<name>tsfiledb</name>
@ -33,12 +33,6 @@
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>jline</groupId>
<artifactId>jline</artifactId>
@ -50,17 +44,11 @@
<artifactId>derby</artifactId>
<version>${derby.version}</version>
</dependency>
<dependency>
<dependency>
<groupId>org.antlr</groupId>
<artifactId>antlr3-maven-plugin</artifactId>
<version>3.4</version>
</dependency>
<!-- <dependency>
<groupId>org.antlr</groupId>
<artifactId>antlr4-runtime</artifactId>
<version>4.5.3</version>
</dependency> -->
</dependency>
</dependencies>
<build>
@ -155,6 +143,7 @@
<treatWarningsAsErrors>true</treatWarningsAsErrors>
</configuration>
</plugin> -->
</plugins>
</build>
</project>

View File

@ -4,27 +4,22 @@ import cn.edu.thu.tsfiledb.auth.dao.Authorizer;
import cn.edu.thu.tsfiledb.auth.dao.DBdao;
import cn.edu.thu.tsfiledb.auth.model.User;
public class TestAu {
public class AuthDemon {
public static void main(String[] args) {
// TODO Auto-generated method stub
// 启动server的时候需要open db
// open db
DBdao dBdao = new DBdao();
dBdao.open();
System.out.println("start server.....");
// 操作数据库信息
User user = new User("test", "test");
User user = new User("demo", "demo");
// operation
try {
Authorizer.createUser(user.getUserName(), user.getPassWord());
Authorizer.deleteUser(user.getUserName());
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
// 关闭server的时候需要 close db
System.out.println("close the server...");
// close db
dBdao.close();
}

View File

@ -9,8 +9,7 @@ import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import cn.edu.thu.tsfile.common.conf.TSFileDescriptor;
import cn.edu.thu.tsfiledb.conf.TSFileDBDescriptor;
/**
* @author liukun
@ -32,7 +31,11 @@ public class DBdao {
* @param dBName
*/
public DBdao(String dBName) {
String path = TSFileDescriptor.getInstance().getConfig().derbyHome + File.separator + dBName;
String derbyDirPath = TSFileDBDescriptor.getInstance().getConfig().derbyHome;
if (derbyDirPath.length() > 0 && derbyDirPath.charAt(derbyDirPath.length() - 1) != File.separatorChar) {
derbyDirPath = derbyDirPath + File.separatorChar;
}
String path = derbyDirPath + dBName;
DBName = path;
}
@ -121,7 +124,7 @@ public class DBdao {
public boolean createOriTable() {
boolean state = false;
try {
statement.executeUpdate(InitTable.createTableSql);
statement.executeUpdate(InitTable.createUserTableSql);
statement.executeUpdate(InitTable.createRoleTableSql);
statement.executeUpdate(InitTable.createUserRoleRelTableSql);
statement.executeUpdate(InitTable.creteUserPermissionTableSql);
@ -160,11 +163,11 @@ public class DBdao {
public void close() {
closeStatement();
closeConnection();
// try {
// DriverManager.getConnection(protocal + shutdown);
// } catch (SQLException e) {
// e.printStackTrace();
// }
// try {
// DriverManager.getConnection(protocal + shutdown);
// } catch (SQLException e) {
// e.printStackTrace();
// }
}
public static Statement getStatement() {

View File

@ -6,7 +6,7 @@ package cn.edu.thu.tsfiledb.auth.dao;
*/
public class InitTable {
public static String createTableSql = "create table userTable("
public static String createUserTableSql = "create table userTable("
+ "id INT generated always as identity(start with 1,increment by 1) not null primary key,"
+ "userName VARCHAR(20) not null unique," + "password VARCHAR(20) not null,"
+ "locked CHAR(1) check (locked='t' or locked='f')," + "validTime VARCHAR(20))";
@ -32,6 +32,6 @@ public class InitTable {
+ "nodeName VARCHAR(20) not null," + "permissionId INT not null,"
+ "constraint pk_rolepermission primary key (roleId,nodeName,permissionId),"
+ "foreign key (roleId) references roleTable(id) on delete cascade)";
public static String insertIntoUserToTableSql = "insert into usertable (username,password) values('root','root')";
}

View File

@ -9,7 +9,6 @@ import java.util.List;
import cn.edu.thu.tsfiledb.auth.model.DBContext;
import cn.edu.thu.tsfiledb.auth.model.Role;
/**
* @author liukun
*

View File

@ -6,16 +6,20 @@ import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import cn.edu.thu.tsfiledb.auth.model.DBContext;
import cn.edu.thu.tsfiledb.auth.model.User;
/**
* @author liukun
*
*/
public class UserDao {
private static final Logger LOGGER = LoggerFactory.getLogger(UserDao.class);
public int createUser(Statement statement, User user) {
String sql = "insert into " + DBContext.userTable + " (userName,passWord) " + " values ('" + user.getUserName()
@ -25,6 +29,7 @@ public class UserDao {
state = statement.executeUpdate(sql);
} catch (SQLException e) {
e.printStackTrace();
LOGGER.error("Execute statement error, the statement is {}", sql);
}
return state;
}
@ -36,6 +41,7 @@ public class UserDao {
state = statement.executeUpdate(sql);
} catch (SQLException e) {
e.printStackTrace();
LOGGER.error("Execute statement error, the statement is {}", sql);
}
return state;
}
@ -89,7 +95,9 @@ public class UserDao {
}
} catch (SQLException e) {
e.printStackTrace();
LOGGER.error("Execute query statement error, the statement is {}", sql);
}
return user;
}

View File

@ -2,10 +2,13 @@ package cn.edu.thu.tsfiledb.auth.model;
/**
* The exception for authority model
* Created by liukun on 17/1/4.
* @author liukun
*
*/
public class AuthException extends Exception {
public AuthException(String format, String userName, String roleName) {
private static final long serialVersionUID = 5091102941209301301L;
public AuthException(String format, String userName, String roleName) {
super();
}

View File

@ -1,12 +1,11 @@
package cn.edu.thu.tsfiledb.auth.model;
/**
* The
* @author liukun
*
*
*/
public class DBContext {
public static final String userTable = "userTable";
public static final String roleTable = "roleTable";
public static final String userRoleRel = "userRoleRelTable";

View File

@ -5,9 +5,10 @@ package cn.edu.thu.tsfiledb.auth.model;
*
*/
public class Role {
private int id;
private String roleName;
/**
* @param id
* @param roleName
@ -16,13 +17,13 @@ public class Role {
this.id = id;
this.roleName = roleName;
}
public Role(String roleName) {
this.roleName = roleName;
}
public Role(){
public Role() {
}
/**
@ -33,7 +34,8 @@ public class Role {
}
/**
* @param id the id to set
* @param id
* the id to set
*/
public void setId(int id) {
this.id = id;
@ -47,11 +49,11 @@ public class Role {
}
/**
* @param roleName the roleName to set
* @param roleName
* the roleName to set
*/
public void setRoleName(String roleName) {
this.roleName = roleName;
}
}

View File

@ -5,12 +5,13 @@ package cn.edu.thu.tsfiledb.auth.model;
*
*/
public class User {
private int id;
private String userName;
private String passWord;
private boolean locked;//true - t false - f
private boolean locked;// true - t false - f
private String validTime;
/**
* @param id
* @param userName
@ -25,11 +26,11 @@ public class User {
this.locked = isLock;
this.validTime = validTime;
}
public User(){
public User() {
}
public User(String userName, String passWord) {
this.userName = userName;
this.passWord = passWord;
@ -43,7 +44,8 @@ public class User {
}
/**
* @param id the id to set
* @param id
* the id to set
*/
public void setId(int id) {
this.id = id;
@ -57,7 +59,8 @@ public class User {
}
/**
* @param userName the userName to set
* @param userName
* the userName to set
*/
public void setUserName(String userName) {
this.userName = userName;
@ -71,7 +74,8 @@ public class User {
}
/**
* @param passWord the passWord to set
* @param passWord
* the passWord to set
*/
public void setPassWord(String passWord) {
this.passWord = passWord;
@ -85,7 +89,8 @@ public class User {
}
/**
* @param isLock the isLock to set
* @param isLock
* the isLock to set
*/
public void setLock(boolean isLock) {
this.locked = isLock;
@ -99,11 +104,11 @@ public class User {
}
/**
* @param validTime the validTime to set
* @param validTime
* the validTime to set
*/
public void setValidTime(String validTime) {
this.validTime = validTime;
}
}

View File

@ -9,7 +9,8 @@ public class UserPermission {
private int id;
private int userId;
private String nodeName;
private int permissionId;// 权限的值必须是permission中的在数据库中使用check方法来指定
// the permissionId should be from Permission class
private int permissionId;
/**
* @param id
@ -34,7 +35,6 @@ public class UserPermission {
this.permissionId = permissionId;
}
/**
* @return the id
*/

View File

@ -8,6 +8,7 @@ public class UserRoleRel {
private int id;
private int userId;
private int roleId;
/**
* @param id
* @param userId
@ -18,48 +19,59 @@ public class UserRoleRel {
this.userId = userId;
this.roleId = roleId;
}
public UserRoleRel(int userId,int roleId){
public UserRoleRel(int userId, int roleId) {
this.userId = userId;
this.roleId = roleId;
}
public UserRoleRel() {
}
/**
* @return the id
*/
public int getId() {
return id;
}
/**
* @param id the id to set
* @param id
* the id to set
*/
public void setId(int id) {
this.id = id;
}
/**
* @return the userId
*/
public int getUserId() {
return userId;
}
/**
* @param userId the userId to set
* @param userId
* the userId to set
*/
public void setUserId(int userId) {
this.userId = userId;
}
/**
* @return the roleId
*/
public int getRoleId() {
return roleId;
}
/**
* @param roleId the roleId to set
* @param roleId
* the roleId to set
*/
public void setRoleId(int roleId) {
this.roleId = roleId;
}
}

View File

@ -0,0 +1,48 @@
package cn.edu.thu.tsfiledb.conf;
public class TSFileDBConfig {
/**
* the maximum number of writing instances existing in same time.
*/
public int writeInstanceThreshold = 5;
/**
* data directory of Overflow data
*/
public String overflowDataDir = "src/main/resources/data/overflow";
/**
* data directory of fileNode data
*/
public String FileNodeDir = "src/main/resources/data/digest";
/**
* data directory of bufferWrite data
*/
public String BufferWriteDir = "src/main/resources/data/delta";
public String metadataDir = "src/main/resources/metadata";
public String derbyHome = "src/main/resources/derby";
/**
* maximum concurrent thread number for merging overflow
*/
public int mergeConcurrentThreadNum = 10;
/**
* the maximum number of concurrent file node instances
*/
public int maxFileNodeNum = 1000;
/**
* the maximum number of concurrent overflow instances
*/
public int maxOverflowNodeNum = 100;
/**
* the maximum number of concurrent buffer write instances
*/
public int maxBufferWriteNodeNum = 50;
public int defaultFetchSize = 1000000;
public String writeLogPath = "src/main/resources/writeLog.log";
public TSFileDBConfig() {
}
}

View File

@ -0,0 +1,105 @@
package cn.edu.thu.tsfiledb.conf;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import cn.edu.thu.tsfile.common.constant.SystemConstant;
public class TSFileDBDescriptor {
private static final Logger LOGGER = LoggerFactory.getLogger(TSFileDBDescriptor.class);
private static TSFileDBDescriptor descriptor = new TSFileDBDescriptor();
private final String CONFIG_DEFAULT_PATH = "/tsfiledb.properties";
private TSFileDBDescriptor() {
loadYaml();
}
public static TSFileDBDescriptor getInstance() {
return descriptor;
}
public TSFileDBConfig getConfig() {
return conf;
}
private TSFileDBConfig conf = new TSFileDBConfig();
/**
* load an properties file and set TsfileDBConfig variables
*
*/
private void loadYaml() {
String url = System.getProperty(SystemConstant.TSFILE_HOME, CONFIG_DEFAULT_PATH);
InputStream inputStream = null;
if (url.equals(CONFIG_DEFAULT_PATH)) {
inputStream = this.getClass().getResourceAsStream(url);
return;
} else {
url = url + "/conf/tsfiledb.properties";
try {
File file = new File(url);
inputStream = new FileInputStream(file);
} catch (FileNotFoundException e) {
LOGGER.error("Fail to find config file {}", url, e);
System.exit(1);
}
}
LOGGER.info("Start to read config file {}", url);
Properties properties = new Properties();
try {
properties.load(inputStream);
conf.writeInstanceThreshold = Integer.parseInt(properties.getProperty("writeInstanceThreshold", conf.writeInstanceThreshold + ""));
conf.overflowDataDir = properties.getProperty("overflowDataDir", url+"/data/overflow");
conf.FileNodeDir = properties.getProperty("FileNodeDir", url+"/data/digest");
conf.BufferWriteDir = properties.getProperty("BufferWriteDir", url+"/data/delta");
conf.metadataDir = properties.getProperty("metadataDir", url+"/data/metadata");
conf.derbyHome = properties.getProperty("derbyHome", url+"/data/derby");
conf.mergeConcurrentThreadNum = Integer.parseInt(properties.getProperty("mergeConcurrentThreadNum", conf.mergeConcurrentThreadNum + ""));
conf.maxFileNodeNum = Integer.parseInt(properties.getProperty("maxFileNodeNum", conf.maxFileNodeNum + ""));
conf.maxOverflowNodeNum = Integer.parseInt(properties.getProperty("maxOverflowNodeNum", conf.maxOverflowNodeNum + ""));
conf.maxBufferWriteNodeNum = Integer.parseInt(properties.getProperty("maxBufferWriteNodeNum", conf.maxBufferWriteNodeNum + ""));
conf.defaultFetchSize = Integer.parseInt(properties.getProperty("defaultFetchSize", conf.defaultFetchSize + ""));
conf.writeLogPath = properties.getProperty("writeLogPath", url+"/data/writeLog.log");
} catch (IOException e) {
LOGGER.warn("Cannot load config file, use default configuration", e);
} catch (Exception e) {
LOGGER.warn("Error format in config file, use default configuration", e);
}
if (inputStream != null) {
try {
inputStream.close();
} catch (IOException e) {
LOGGER.error("Fail to close config file input stream", e);
}
}
}
public static void main(String[] args) {
TSFileDBDescriptor descriptor = TSFileDBDescriptor.getInstance();
TSFileDBConfig config = descriptor.getConfig();
System.out.println(config.writeInstanceThreshold);
System.out.println(config.overflowDataDir);
System.out.println(config.FileNodeDir);
System.out.println(config.BufferWriteDir);
System.out.println(config.metadataDir);
System.out.println(config.derbyHome);
System.out.println(config.mergeConcurrentThreadNum);
System.out.println(config.maxFileNodeNum);
System.out.println(config.maxOverflowNodeNum);
System.out.println(config.maxBufferWriteNodeNum);
System.out.println(config.defaultFetchSize);
System.out.println(config.writeLogPath);
}
}

View File

@ -43,6 +43,8 @@ import cn.edu.thu.tsfile.timeseries.write.record.DataPoint;
import cn.edu.thu.tsfile.timeseries.write.record.TSRecord;
import cn.edu.thu.tsfile.timeseries.write.schema.FileSchema;
import cn.edu.thu.tsfile.timeseries.write.series.IRowGroupWriter;
import cn.edu.thu.tsfiledb.conf.TSFileDBConfig;
import cn.edu.thu.tsfiledb.conf.TSFileDBDescriptor;
import cn.edu.thu.tsfiledb.engine.exception.BufferWriteProcessorException;
import cn.edu.thu.tsfiledb.engine.lru.LRUProcessor;
import cn.edu.thu.tsfiledb.engine.utils.FlushState;
@ -54,6 +56,7 @@ public class BufferWriteProcessor extends LRUProcessor {
private static final Logger LOGGER = LoggerFactory.getLogger(BufferWriteProcessor.class);
private static final TSFileConfig TsFileConf = TSFileDescriptor.getInstance().getConfig();
private static final TSFileDBConfig TsFileDBConf = TSFileDBDescriptor.getInstance().getConfig();
private static final MManager mManager = MManager.getInstance();
private BufferWriteIndex workingBufferIndex;
@ -87,7 +90,7 @@ public class BufferWriteProcessor extends LRUProcessor {
this.fileName = fileName;
String restoreFileName = fileName + restoreFile;
String bufferwriteDirPath = TsFileConf.BufferWriteDir;
String bufferwriteDirPath = TsFileDBConf.BufferWriteDir;
if (bufferwriteDirPath.length() > 0
&& bufferwriteDirPath.charAt(bufferwriteDirPath.length() - 1) != File.separatorChar) {
bufferwriteDirPath = bufferwriteDirPath + File.separatorChar;

View File

@ -21,6 +21,8 @@ import cn.edu.thu.tsfile.file.metadata.enums.TSDataType;
import cn.edu.thu.tsfile.timeseries.filter.definition.SingleSeriesFilterExpression;
import cn.edu.thu.tsfile.timeseries.write.record.DataPoint;
import cn.edu.thu.tsfile.timeseries.write.record.TSRecord;
import cn.edu.thu.tsfiledb.conf.TSFileDBConfig;
import cn.edu.thu.tsfiledb.conf.TSFileDBDescriptor;
import cn.edu.thu.tsfiledb.engine.bufferwrite.Action;
import cn.edu.thu.tsfiledb.engine.bufferwrite.BufferWriteProcessor;
import cn.edu.thu.tsfiledb.engine.bufferwrite.FileNodeConstants;
@ -38,6 +40,7 @@ public class FileNodeManager extends LRUManager<FileNodeProcessor> {
private static final Logger LOGGER = LoggerFactory.getLogger(FileNodeManager.class);
private static final TSFileConfig TsFileConf = TSFileDescriptor.getInstance().getConfig();
private static final TSFileDBConfig TsFileDBConf = TSFileDBDescriptor.getInstance().getConfig();
private static final String restoreFileName = "fileNodeManager.restore";
private final String fileNodeManagerStoreFile;
@ -49,8 +52,6 @@ public class FileNodeManager extends LRUManager<FileNodeProcessor> {
private FileNodeManagerStatus fileNodeManagerStatus = FileNodeManagerStatus.NONE;
// private Lock mergeLock = new ReentrantLock(false);
private Action overflowBackUpAction = new Action() {
@Override
public void act() throws Exception {
@ -74,8 +75,8 @@ public class FileNodeManager extends LRUManager<FileNodeProcessor> {
instanceLock.lock();
try {
if (instance == null) {
instance = new FileNodeManager(TsFileConf.maxFileNodeNum, MManager.getInstance(),
TsFileConf.FileNodeDir);
instance = new FileNodeManager(TsFileDBConf.maxFileNodeNum, MManager.getInstance(),
TsFileDBConf.FileNodeDir);
}
return instance;
} finally {
@ -149,9 +150,9 @@ public class FileNodeManager extends LRUManager<FileNodeProcessor> {
throw new FileNodeManagerException(e);
}
long lastUpdataTime = fileNodeProcessor.getLastUpdateTime();
LOGGER.info("Get the FileNodeProcessor: {}, the last update time is: {}, the record time is: {}",
LOGGER.debug("Get the FileNodeProcessor: {}, the last update time is: {}, the record time is: {}",
fileNodeProcessor.getNameSpacePath(), lastUpdataTime, timestamp);
LOGGER.info("Insert record is {}", tsRecord);
LOGGER.debug("Insert record is {}", tsRecord);
int insertType = 0;
String nameSpacePath = fileNodeProcessor.getNameSpacePath();
if (timestamp <= lastUpdataTime) {
@ -182,7 +183,7 @@ public class FileNodeManager extends LRUManager<FileNodeProcessor> {
fileNodeProcessor.changeTypeToChanged(timestamp);
addNameSpaceToOverflowList(fileNodeProcessor.getNameSpacePath());
// overflowProcessor.writeUnlock();
LOGGER.info("Unlock the OverflowProcessor: {}", fileNodeProcessor.getNameSpacePath());
LOGGER.debug("Unlock the OverflowProcessor: {}", fileNodeProcessor.getNameSpacePath());
insertType = 1;
} else {
BufferWriteProcessor bufferWriteProcessor;
@ -211,11 +212,11 @@ public class FileNodeManager extends LRUManager<FileNodeProcessor> {
}
fileNodeProcessor.setLastUpdateTime(timestamp);
// bufferWriteProcessor.writeUnlock();
LOGGER.info("Unlock the BufferWriteProcessor: {}", fileNodeProcessor.getNameSpacePath());
LOGGER.debug("Unlock the BufferWriteProcessor: {}", fileNodeProcessor.getNameSpacePath());
insertType = 2;
}
fileNodeProcessor.writeUnlock();
LOGGER.info("Unlock the FileNodeProcessor: {}", fileNodeProcessor.getNameSpacePath());
LOGGER.debug("Unlock the FileNodeProcessor: {}", fileNodeProcessor.getNameSpacePath());
return insertType;
}
@ -257,8 +258,10 @@ public class FileNodeManager extends LRUManager<FileNodeProcessor> {
e.printStackTrace();
throw new FileNodeManagerException(e);
}
LOGGER.info("Lock the FileNodeProcessor: {}", fileNodeProcessor.getNameSpacePath());
long lastUpdateTime = fileNodeProcessor.getLastUpdateTime();
LOGGER.debug("Get the FileNodeProcessor: {}, the last update time is: {}, the update time is from {} to {}",
fileNodeProcessor.getNameSpacePath(), lastUpdateTime, startTime, endTime);
if (startTime > lastUpdateTime) {
LOGGER.warn("The update range is error, startTime {} is gt lastUpdateTime {}", startTime, lastUpdateTime);
fileNodeProcessor.writeUnlock();
@ -294,7 +297,7 @@ public class FileNodeManager extends LRUManager<FileNodeProcessor> {
addNameSpaceToOverflowList(fileNodeProcessor.getNameSpacePath());
// overflowProcessor.writeUnlock();
fileNodeProcessor.writeUnlock();
LOGGER.info("Unlock the FileNodeProcessor: {}", fileNodeProcessor.getNameSpacePath());
LOGGER.debug("Unlock the FileNodeProcessor: {}", fileNodeProcessor.getNameSpacePath());
}
public void delete(String deltaObjectId, String measurementId, long timestamp, TSDataType type)
@ -312,14 +315,16 @@ public class FileNodeManager extends LRUManager<FileNodeProcessor> {
throw new FileNodeManagerException(e);
}
long lastUpdateTime = fileNodeProcessor.getLastUpdateTime();
LOGGER.debug("Get the FileNodeProcessor: {}, the last update time is: {}, the delete time is from 0 to {}",
fileNodeProcessor.getNameSpacePath(), lastUpdateTime, timestamp);
// no bufferwrite data, the delete operation is invalid
if (lastUpdateTime == -1) {
LOGGER.info("Last update time is -1, delete overflow is invalid");
LOGGER.warn("The last update time is -1, delete overflow is invalid");
fileNodeProcessor.writeUnlock();
LOGGER.info("Unlock the FileNodeProcessor: {}", fileNodeProcessor.getNameSpacePath());
LOGGER.debug("Unlock the FileNodeProcessor: {}", fileNodeProcessor.getNameSpacePath());
} else {
if (timestamp > lastUpdateTime) {
timestamp = lastUpdateTime;
timestamp = lastUpdateTime + 1;
}
Map<String, Object> parameters = new HashMap<>();
parameters.put(FileNodeConstants.OVERFLOW_BACKUP_MANAGER_ACTION, overflowBackUpAction);
@ -349,7 +354,7 @@ public class FileNodeManager extends LRUManager<FileNodeProcessor> {
fileNodeProcessor.changeTypeToChangedForDelete(timestamp);
addNameSpaceToOverflowList(fileNodeProcessor.getNameSpacePath());
fileNodeProcessor.writeUnlock();
LOGGER.info("Unlock the FileNodeProcessor: {}", fileNodeProcessor.getNameSpacePath());
LOGGER.debug("Unlock the FileNodeProcessor: {}", fileNodeProcessor.getNameSpacePath());
}
}
@ -359,6 +364,7 @@ public class FileNodeManager extends LRUManager<FileNodeProcessor> {
do {
fileNodeProcessor = getProcessorWithDeltaObjectIdByLRU(deltaObjectId, true);
} while (fileNodeProcessor == null);
LOGGER.debug("Get the FileNodeProcessor: {}, begin query.", fileNodeProcessor.getNameSpacePath());
int token = fileNodeProcessor.addMultiPassLock();
return token;
} catch (LRUManagerException e) {
@ -379,6 +385,8 @@ public class FileNodeManager extends LRUManager<FileNodeProcessor> {
do {
fileNodeProcessor = getProcessorWithDeltaObjectIdByLRU(deltaObjectId, false);
} while (fileNodeProcessor == null);
LOGGER.debug("Get the FileNodeProcessor: {}, query.", fileNodeProcessor.getNameSpacePath());
QueryStructure queryStructure = null;
// query operation must have overflow processor
if (!fileNodeProcessor.hasOverflowProcessor()) {
@ -387,9 +395,7 @@ public class FileNodeManager extends LRUManager<FileNodeProcessor> {
parameters.put(FileNodeConstants.OVERFLOW_FLUSH_MANAGER_ACTION, overflowFlushAction);
fileNodeProcessor.getOverflowProcessor(fileNodeProcessor.getNameSpacePath(), parameters);
}
queryStructure = fileNodeProcessor.query(deltaObjectId, measurementId, valueFilter, valueFilter,
valueFilter);
queryStructure = fileNodeProcessor.query(deltaObjectId, measurementId, timeFilter, freqFilter, valueFilter);
// return query structure
return queryStructure;
} catch (LRUManagerException e) {
@ -411,6 +417,7 @@ public class FileNodeManager extends LRUManager<FileNodeProcessor> {
do {
fileNodeProcessor = getProcessorWithDeltaObjectIdByLRU(deltaObjectId, true);
} while (fileNodeProcessor == null);
LOGGER.debug("Get the FileNodeProcessor: {}, end query.", fileNodeProcessor.getNameSpacePath());
fileNodeProcessor.removeMultiPassLock(token);
} catch (LRUManagerException e) {
e.printStackTrace();
@ -516,7 +523,7 @@ public class FileNodeManager extends LRUManager<FileNodeProcessor> {
@Override
public void run() {
ExecutorService mergeExecutorPool = Executors.newFixedThreadPool(TsFileConf.mergeConcurrentThreadNum);
ExecutorService mergeExecutorPool = Executors.newFixedThreadPool(TsFileDBConf.mergeConcurrentThreadNum);
for (String fileNodeNamespacePath : allChangedFileNodes) {
MergeOneProcessor mergeOneProcessorThread = new MergeOneProcessor(fileNodeNamespacePath);
mergeExecutorPool.execute(mergeOneProcessorThread);
@ -549,6 +556,8 @@ public class FileNodeManager extends LRUManager<FileNodeProcessor> {
do {
fileNodeProcessor = getProcessorByLRU(fileNodeNamespacePath, true);
} while (fileNodeProcessor == null);
LOGGER.info("Get the FileNodeProcessor: {}, merge.", fileNodeProcessor.getNameSpacePath());
// if bufferwrite and overflow exist
// close buffer write
if (fileNodeProcessor.hasBufferwriteProcessor()) {
@ -558,6 +567,7 @@ public class FileNodeManager extends LRUManager<FileNodeProcessor> {
fileNodeProcessor.getBufferWriteProcessor().close();
fileNodeProcessor.setBufferwriteProcessroToClosed();
}
// get overflow processor
Map<String, Object> parameters = new HashMap<>();
parameters.put(FileNodeConstants.OVERFLOW_BACKUP_MANAGER_ACTION, overflowBackUpAction);
@ -569,11 +579,18 @@ public class FileNodeManager extends LRUManager<FileNodeProcessor> {
}
fileNodeProcessor.getOverflowProcessor().close();
fileNodeProcessor.merge();
fileNodeProcessor.writeUnlock();
} catch (LRUManagerException | FileNodeProcessorException | BufferWriteProcessorException
| OverflowProcessorException e) {
LOGGER.error("Merge the filenode processor error, the nameSpacePath is {}", e.getMessage());
e.printStackTrace();
if (fileNodeProcessor != null) {
fileNodeProcessor.writeUnlock();
}
throw new ErrorDebugException(e);
}
try {
fileNodeProcessor.merge();
} catch (FileNodeProcessorException e) {
e.printStackTrace();
throw new ErrorDebugException(e);
}

View File

@ -39,6 +39,8 @@ import cn.edu.thu.tsfile.timeseries.write.io.TSFileIOWriter;
import cn.edu.thu.tsfile.timeseries.write.record.DataPoint;
import cn.edu.thu.tsfile.timeseries.write.record.TSRecord;
import cn.edu.thu.tsfile.timeseries.write.schema.FileSchema;
import cn.edu.thu.tsfiledb.conf.TSFileDBConfig;
import cn.edu.thu.tsfiledb.conf.TSFileDBDescriptor;
import cn.edu.thu.tsfiledb.engine.bufferwrite.Action;
import cn.edu.thu.tsfiledb.engine.bufferwrite.BufferWriteProcessor;
import cn.edu.thu.tsfiledb.engine.bufferwrite.FileNodeConstants;
@ -48,15 +50,19 @@ import cn.edu.thu.tsfiledb.engine.exception.OverflowProcessorException;
import cn.edu.thu.tsfiledb.engine.exception.ProcessorRuntimException;
import cn.edu.thu.tsfiledb.engine.lru.LRUProcessor;
import cn.edu.thu.tsfiledb.engine.overflow.io.OverflowProcessor;
import cn.edu.thu.tsfiledb.exception.NotConsistentException;
import cn.edu.thu.tsfiledb.exception.PathErrorException;
import cn.edu.thu.tsfiledb.metadata.ColumnSchema;
import cn.edu.thu.tsfiledb.metadata.MManager;
import cn.edu.thu.tsfiledb.query.engine.OverflowQueryEngine;
import cn.edu.thu.tsfiledb.query.engine.QueryerForMerge;
import cn.edu.thu.tsfiledb.query.management.ReadLockManager;
public class FileNodeProcessor extends LRUProcessor {
private static final Logger LOGGER = LoggerFactory.getLogger(FileNodeProcessor.class);
private static final TSFileConfig TsFileConf = TSFileDescriptor.getInstance().getConfig();
private static final TSFileDBConfig TsFileDBConf = TSFileDBDescriptor.getInstance().getConfig();
private static final MManager mManager = MManager.getInstance();
private static final String LOCK_SIGNAL = "lock___signal";
@ -74,9 +80,9 @@ public class FileNodeProcessor extends LRUProcessor {
private BufferWriteProcessor bufferWriteProcessor = null;
private OverflowProcessor overflowProcessor = null;
private Set<Integer> oldMultiPassTokenSet = new HashSet<>();
private Set<Integer> oldMultiPassTokenSet = null;
private Set<Integer> newMultiPassTokenSet = new HashSet<>();
private ReadWriteLock oldMultiPassLock = new ReentrantReadWriteLock(false);
private ReadWriteLock oldMultiPassLock = null;
private ReadWriteLock newMultiPassLock = new ReentrantReadWriteLock(false);
private Action flushFileNodeProcessorAction = new Action() {
@ -214,23 +220,20 @@ public class FileNodeProcessor extends LRUProcessor {
if (bufferWriteProcessor != null) {
try {
bufferWriteProcessor.close();
overflowProcessor.close();
} catch (BufferWriteProcessorException e) {
e.printStackTrace();
writeUnlock();
throw new FileNodeProcessorException(
"Close the bufferwrite processor failed, the reason is " + e.getMessage());
} catch (OverflowProcessorException e) {
e.printStackTrace();
writeUnlock();
throw new FileNodeProcessorException(
"Close the overflow processor failed, the reason is " + e.getMessage());
}
bufferWriteProcessor = null;
}
// Get the overflow processor, and close
// overflowProcessor = getOverflowProcessor(nameSpacePath,
// parameters);
try {
overflowProcessor.close();
} catch (OverflowProcessorException e) {
e.printStackTrace();
throw new FileNodeProcessorException(
"Close the overflow processor failed, the reason is " + e.getMessage());
}
merge();
}
if (isMerging == FileNodeProcessorState.WAITING) {
@ -406,14 +409,13 @@ public class FileNodeProcessor extends LRUProcessor {
private int multiPassLockToken = 0;
public int addMultiPassLock() {
LOGGER.debug("{} addMultiPassLock: read lock newMultiPassLock", LOCK_SIGNAL);
LOGGER.debug("AddMultiPassLock: read lock newMultiPassLock. {}", LOCK_SIGNAL);
newMultiPassLock.readLock().lock();
while (newMultiPassTokenSet.contains(multiPassLockToken)) {
multiPassLockToken++;
}
newMultiPassTokenSet.add(multiPassLockToken);
LOGGER.debug("{} add multi token:{}, nsPath:{}, new set:{}, lock:{}", LOCK_SIGNAL, multiPassLockToken,
nameSpacePath, newMultiPassTokenSet, newMultiPassLock);
LOGGER.debug("Add multi token:{}, nsPath:{}. {}", multiPassLockToken, nameSpacePath, LOCK_SIGNAL);
return multiPassLockToken;
}
@ -495,7 +497,9 @@ public class FileNodeProcessor extends LRUProcessor {
}
}
// add numOfMergeFile to control the number of the merge file
List<IntervalFileNode> backupIntervalFiles = switchFileNodeToMergev2();
List<IntervalFileNode> backupIntervalFiles = new ArrayList<>();
backupIntervalFiles = switchFileNodeToMergev2();
try {
//
// change the overflow work to merge
@ -504,6 +508,7 @@ public class FileNodeProcessor extends LRUProcessor {
} catch (ProcessorException e) {
LOGGER.error("Merge: Can't change overflow processor status from work to merge");
e.printStackTrace();
writeUnlock();
throw new FileNodeProcessorException(e);
}
synchronized (fileNodeProcessorStore) {
@ -518,6 +523,7 @@ public class FileNodeProcessor extends LRUProcessor {
"Merge: write filenode information to revocery file failed, the nameSpacePath is {}, the reason is {}",
nameSpacePath, e.getMessage());
e.printStackTrace();
writeUnlock();
throw new FileNodeProcessorException(
"Merge: write filenode information to revocery file failed, the nameSpacePath is "
+ nameSpacePath);
@ -574,11 +580,15 @@ public class FileNodeProcessor extends LRUProcessor {
switchWaitingToWorkingv2(backupIntervalFiles);
}
private List<IntervalFileNode> switchFileNodeToMergev2() {
private List<IntervalFileNode> switchFileNodeToMergev2() throws FileNodeProcessorException {
List<IntervalFileNode> result = new ArrayList<>();
if (newFileNodes.isEmpty()) {
if (emptyIntervalFileNode.overflowChangeType == OverflowChangeType.NO_CHANGE) {
throw new ProcessorRuntimException(String.format(
LOGGER.error("The newFileNodes is empty, but the emptyIntervalFileNode OverflowChangeType is {}",
emptyIntervalFileNode.overflowChangeType);
// no data should be merge
writeUnlock();
throw new FileNodeProcessorException(String.format(
"The newFileNodes is empty, but the emptyIntervalFileNode OverflowChangeType is %s",
emptyIntervalFileNode.overflowChangeType));
}
@ -587,6 +597,8 @@ public class FileNodeProcessor extends LRUProcessor {
OverflowChangeType.CHANGED, null, null);
result.add(intervalFileNode);
} else if (newFileNodes.size() == 1) {
// has overflow data, the only newFileNode must be changed or the
// emptyfile must be changed
IntervalFileNode temp = newFileNodes.get(0);
IntervalFileNode intervalFileNode = new IntervalFileNode(0, temp.endTime, temp.overflowChangeType,
temp.filePath, null);
@ -594,21 +606,34 @@ public class FileNodeProcessor extends LRUProcessor {
} else {
// add first
IntervalFileNode temp = newFileNodes.get(0);
IntervalFileNode intervalFileNode = new IntervalFileNode(0, newFileNodes.get(1).startTime - 1,
temp.overflowChangeType, temp.filePath, null);
result.add(intervalFileNode);
if (emptyIntervalFileNode.overflowChangeType == OverflowChangeType.CHANGED
|| temp.overflowChangeType == OverflowChangeType.CHANGED) {
IntervalFileNode intervalFileNode = new IntervalFileNode(0, newFileNodes.get(1).startTime - 1,
OverflowChangeType.CHANGED, temp.filePath, null);
result.add(intervalFileNode);
} else {
result.add(temp);
}
// second to the last -1
for (int i = 1; i < newFileNodes.size() - 1; i++) {
temp = newFileNodes.get(i);
intervalFileNode = new IntervalFileNode(temp.startTime, newFileNodes.get(i + 1).startTime - 1,
temp.overflowChangeType, temp.filePath, null);
result.add(intervalFileNode);
if (temp.overflowChangeType == OverflowChangeType.CHANGED) {
IntervalFileNode intervalFileNode = new IntervalFileNode(temp.startTime,
newFileNodes.get(i + 1).startTime - 1, temp.overflowChangeType, temp.filePath, null);
result.add(intervalFileNode);
} else {
result.add(temp);
}
}
// last interval
temp = newFileNodes.get(newFileNodes.size() - 1);
intervalFileNode = new IntervalFileNode(temp.startTime, temp.endTime, temp.overflowChangeType,
temp.filePath, null);
result.add(intervalFileNode);
if (temp.overflowChangeType == OverflowChangeType.CHANGED) {
IntervalFileNode intervalFileNode = new IntervalFileNode(temp.startTime, temp.endTime,
temp.overflowChangeType, temp.filePath, null);
result.add(intervalFileNode);
} else {
result.add(temp);
}
}
return result;
}
@ -621,11 +646,11 @@ public class FileNodeProcessor extends LRUProcessor {
oldMultiPassLock = newMultiPassLock;
newMultiPassTokenSet = new HashSet<>();
newMultiPassLock = new ReentrantReadWriteLock(false);
LOGGER.debug(
"Merge: swith merge to wait, switch oldMultiPassTokenSet to newMultiPassTokenSet, switch oldMultiPassLock to newMultiPassLock");
LOGGER.debug("Merge: swith merge to wait");
LOGGER.info("Merge switch merge to wait, the overflowChangeType of emptyIntervalFileNode is {}",
emptyIntervalFileNode.overflowChangeType);
LOGGER.info(
"Merge: switch merge to wait, the overflowChangeType of emptyIntervalFileNode is {}, the newFileNodes is {}",
emptyIntervalFileNode.overflowChangeType, newFileNodes);
if (emptyIntervalFileNode.overflowChangeType == OverflowChangeType.NO_CHANGE) {
// backup from newFilenodes
// no action
@ -648,6 +673,8 @@ public class FileNodeProcessor extends LRUProcessor {
if (putoff || newFileNodes.get(i).overflowChangeType == OverflowChangeType.MERGING_CHANGE) {
backupIntervalFile.overflowChangeType = OverflowChangeType.CHANGED;
putoff = false;
} else {
backupIntervalFile.overflowChangeType = OverflowChangeType.NO_CHANGE;
}
result.add(backupIntervalFile);
}
@ -665,6 +692,8 @@ public class FileNodeProcessor extends LRUProcessor {
if (putoff) {
emptyIntervalFileNode.endTime = lastUpdateTime;
emptyIntervalFileNode.overflowChangeType = OverflowChangeType.CHANGED;
} else {
emptyIntervalFileNode.overflowChangeType = OverflowChangeType.NO_CHANGE;
}
isMerging = FileNodeProcessorState.WAITING;
newFileNodes = result;
@ -697,12 +726,16 @@ public class FileNodeProcessor extends LRUProcessor {
writeLock();
try {
oldMultiPassLock.writeLock().lock();
if (oldMultiPassLock != null) {
LOGGER.info("The old Multiple Pass Token set is {}, the old Multiple Pass Lock is {}",
oldMultiPassTokenSet, oldMultiPassLock);
oldMultiPassLock.writeLock().lock();
}
try {
// delete the all files which are in the newFileNodes
// notice: the last restore file of the interval file
String bufferwriteDirPath = TsFileConf.BufferWriteDir;
String bufferwriteDirPath = TsFileDBConf.BufferWriteDir;
if (bufferwriteDirPath.length() > 0
&& bufferwriteDirPath.charAt(bufferwriteDirPath.length() - 1) != File.separatorChar) {
bufferwriteDirPath = bufferwriteDirPath + File.separatorChar;
@ -717,8 +750,7 @@ public class FileNodeProcessor extends LRUProcessor {
for (IntervalFileNode bufferFileNode : newFileNodes) {
String bufferFilePath = bufferFileNode.filePath;
if (bufferFilePath != null) {
File bufferFile = new File(bufferwriteDir, bufferFilePath);
bufferFiles.add(bufferFile.getAbsolutePath());
bufferFiles.add(bufferFilePath);
}
}
// add the restore file, if the last file is not closed
@ -749,7 +781,9 @@ public class FileNodeProcessor extends LRUProcessor {
throw new FileNodeProcessorException(e);
} finally {
oldMultiPassTokenSet = null;
oldMultiPassLock.writeLock().unlock();
if (oldMultiPassLock != null) {
oldMultiPassLock.writeLock().unlock();
}
oldMultiPassLock = null;
}
} finally {
@ -765,26 +799,22 @@ public class FileNodeProcessor extends LRUProcessor {
LOGGER.info("Merge query and merge: namespace {}, time filter {}", nameSpacePath, timeFilter);
QueryDataSet data = null;
long startTime = -1;
long endTime = -1;
OverflowQueryEngine queryEngine = new OverflowQueryEngine();
try {
data = queryEngine.query(pathList, timeFilter, null, null, null, TsFileConf.defaultFetchSize);
} catch (ProcessorException e1) {
e1.printStackTrace();
throw new IOException("Exception when merge");
}
if (!data.hasNextRecord()) {
QueryerForMerge queryer = new QueryerForMerge(pathList, (SingleSeriesFilterExpression) timeFilter);
int queryCount = 0;
if (!queryer.hasNextRecord()) {
// No record in this query
LOGGER.warn("Merge query: namespace {}, time filter {}, no query data", nameSpacePath, timeFilter);
// Set the IntervalFile
backupIntervalFile.startTime = -1;
backupIntervalFile.endTime = -1;
} else {
queryCount ++;
TSRecordWriter recordWriter;
RowRecord firstRecord = data.getNextRecord();
RowRecord firstRecord = queryer.getNextRecord();
// get the outputPate and FileSchema
String outputPath = constructOutputFilePath(nameSpacePath,
firstRecord.timestamp + FileNodeConstants.BUFFERWRITE_FILE_SEPARATOR + System.currentTimeMillis());
@ -805,8 +835,9 @@ public class FileNodeProcessor extends LRUProcessor {
recordWriter.write(filledRecord);
startTime = endTime = firstRecord.getTime();
while (data.hasNextRecord()) {
RowRecord row = data.getNextRecord();
while (queryer.hasNextRecord()) {
queryCount ++;
RowRecord row = queryer.getNextRecord();
filledRecord = removeNullTSRecord(row);
endTime = filledRecord.time;
try {
@ -817,7 +848,7 @@ public class FileNodeProcessor extends LRUProcessor {
}
}
recordWriter.close();
System.out.println(" ============ Merge Record Count: " + queryCount);
LOGGER.debug("Merge query: namespace {}, time filter {}, filepath {} successfully", nameSpacePath,
timeFilter, outputPath);
backupIntervalFile.startTime = startTime;
@ -830,7 +861,7 @@ public class FileNodeProcessor extends LRUProcessor {
private String constructOutputFilePath(String nameSpacePath, String fileName) {
String dataDirPath = TsFileConf.BufferWriteDir;
String dataDirPath = TsFileDBConf.BufferWriteDir;
if (dataDirPath.charAt(dataDirPath.length() - 1) != File.separatorChar) {
dataDirPath = dataDirPath + File.separatorChar + nameSpacePath;
}
@ -913,16 +944,20 @@ public class FileNodeProcessor extends LRUProcessor {
@Override
public boolean canBeClosed() {
if (isMerging == FileNodeProcessorState.NONE && newMultiPassLock.writeLock().tryLock()) {
try {
if (oldMultiPassLock.writeLock().tryLock()) {
try {
return true;
} finally {
oldMultiPassLock.writeLock().unlock();
if (oldMultiPassLock != null) {
try {
if (oldMultiPassLock.writeLock().tryLock()) {
try {
return true;
} finally {
oldMultiPassLock.writeLock().unlock();
}
}
} finally {
newMultiPassLock.writeLock().unlock();
}
} finally {
newMultiPassLock.writeLock().unlock();
} else {
return true;
}
}
return false;

View File

@ -50,7 +50,10 @@ public class QueryStructure {
return allOverflowData;
}
public String toString(){
return "FilesList: " + String.valueOf(bufferwriteDataInFiles) + "\n"
+ "InsertData: " + (allOverflowData.get(0) != null ? ((DynamicOneColumnData)allOverflowData.get(0)).length : 0);
}
}

View File

@ -151,12 +151,12 @@ public abstract class LRUManager<T extends LRUProcessor> {
throws LRUManagerException {
T processor = null;
LOGGER.info("Try to get LRUProcessor {}, the nameSpacePath is {}, Thread is {}", this.getClass().getName(),
LOGGER.debug("Try to get LRUProcessor, the nameSpacePath is {}, Thread is {}",
namespacePath, Thread.currentThread().getName());
// change the processorMap position and improve concurrent performance
synchronized (processorMap) {
LOGGER.debug("The Thread {} will get the LRUProcessor, the nameSpacePath is {}",
Thread.currentThread().getContextClassLoader(), namespacePath);
Thread.currentThread().getName(), namespacePath);
if (processorMap.containsKey(namespacePath)) {
processor = processorMap.get(namespacePath);
// should use the try lock

View File

@ -67,7 +67,7 @@ public abstract class LRUProcessor {
* Release the write lock
*/
public void writeUnlock() {
LOGGER.debug("{}: lru write lock-Thread id {}", this.getClass().getSimpleName(),
LOGGER.debug("{}: lru write unlock-Thread id {}", this.getClass().getSimpleName(),
Thread.currentThread().getId());
lock.writeLock().unlock();
}

View File

@ -16,6 +16,8 @@ import cn.edu.thu.tsfile.common.conf.TSFileDescriptor;
import cn.edu.thu.tsfile.common.utils.BytesUtils;
import cn.edu.thu.tsfile.file.metadata.enums.TSDataType;
import cn.edu.thu.tsfile.timeseries.filter.definition.SingleSeriesFilterExpression;
import cn.edu.thu.tsfiledb.conf.TSFileDBConfig;
import cn.edu.thu.tsfiledb.conf.TSFileDBDescriptor;
import cn.edu.thu.tsfiledb.engine.bufferwrite.Action;
import cn.edu.thu.tsfiledb.engine.bufferwrite.FileNodeConstants;
import cn.edu.thu.tsfiledb.engine.exception.OverflowProcessorException;
@ -31,6 +33,7 @@ public class OverflowProcessor extends LRUProcessor {
private static final Logger LOGGER = LoggerFactory.getLogger(OverflowProcessor.class);
private static final TSFileConfig TsFileConf = TSFileDescriptor.getInstance().getConfig();
private static final TSFileDBConfig TsFileDBConf = TSFileDBDescriptor.getInstance().getConfig();
private static final int MINIMUM_RECORD_COUNT_FOR_CHECK = 100;
private static final int MAXIMUM_RECORD_COUNT_FOR_CHECK = 10000;
@ -56,7 +59,7 @@ public class OverflowProcessor extends LRUProcessor {
public OverflowProcessor(String nameSpacePath, Map<String, Object> parameters) throws OverflowProcessorException {
super(nameSpacePath);
String overflowDirPath = TsFileConf.overflowDataDir;
String overflowDirPath = TsFileDBConf.overflowDataDir;
if (overflowDirPath.length() > 0
&& overflowDirPath.charAt(overflowDirPath.length() - 1) != File.separatorChar) {
overflowDirPath = overflowDirPath + File.separatorChar;
@ -416,7 +419,7 @@ public class OverflowProcessor extends LRUProcessor {
} catch (Exception e) {
LOGGER.error("filenodeFlushAction action failed");
e.printStackTrace();
throw new OverflowProcessorException("filenodeFlushAction action failed");
throw new OverflowProcessorException("FilenodeFlushAction action failed");
} finally {
synchronized (flushState) {
flushState.setUnFlushing();

View File

@ -170,7 +170,10 @@ public class OverflowSeriesImpl {
private DynamicOneColumnData readFileFromFileBlockForReader(DynamicOneColumnData newerData,
List<TimeSeriesChunkMetaData> TimeSeriesChunkMetaDataList, SingleSeriesFilterExpression timeFilter,
SingleSeriesFilterExpression freqFilter, SingleSeriesFilterExpression valueFilter) {
for (TimeSeriesChunkMetaData seriesMetaData : TimeSeriesChunkMetaDataList) {
// for (TimeSeriesChunkMetaData seriesMetaData :
// TimeSeriesChunkMetaDataList) {
for (int i = TimeSeriesChunkMetaDataList.size() - 1; i >= 0; i--) {
TimeSeriesChunkMetaData seriesMetaData = TimeSeriesChunkMetaDataList.get(i);
int chunkSize = (int) seriesMetaData.getTotalByteSize();
long offset = seriesMetaData.getProperties().getFileOffset();
InputStream in = overflowFileIO.getSeriesChunkBytes(chunkSize, offset);

View File

@ -1,102 +0,0 @@
package cn.edu.thu.tsfiledb.hadoop.io;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import cn.edu.thu.tsfile.common.utils.TSRandomAccessFileReader;
import java.io.IOException;
/**
* This class is used to wrap the {@link}FSDataInputStream and implement the
* interface {@link}TSRandomAccessFileReader.
*
* @author liukun
*/
public class HDFSInputStream implements TSRandomAccessFileReader {
private FSDataInputStream fsDataInputStream;
private FileStatus fileStatus;
public HDFSInputStream(String filePath) throws IOException {
this(filePath, new Configuration());
}
public HDFSInputStream(String filePath, Configuration configuration) throws IOException {
this(new Path(filePath),configuration);
}
public HDFSInputStream(Path path, Configuration configuration) throws IOException {
FileSystem fs = FileSystem.get(configuration);
fsDataInputStream = fs.open(path);
fileStatus = fs.getFileStatus(path);
}
@Override
public void seek(long offset) throws IOException {
fsDataInputStream.seek(offset);
}
@Override
public int read() throws IOException {
return fsDataInputStream.read();
}
@Override
public long length() throws IOException {
return fileStatus.getLen();
}
@Override
public int readInt() throws IOException {
return fsDataInputStream.readInt();
}
public void close() throws IOException {
fsDataInputStream.close();
}
public long getPos() throws IOException {
return fsDataInputStream.getPos();
}
/**
* Read the data into b, and the check the length
*
* @param b
* read the data into
* @param off
* the begin offset to read into
* @param len
* the length to read into
*/
@Override
public int read(byte[] b, int off, int len) throws IOException {
if (len < 0) {
throw new IndexOutOfBoundsException();
}
int n = 0;
while (n < len) {
int count = fsDataInputStream.read(b, off + n, len - n);
if (count < 0) {
throw new IOException("The read length is out of the length of inputstream");
}
n += count;
}
return n;
}
}

View File

@ -1,80 +0,0 @@
package cn.edu.thu.tsfiledb.hadoop.io;
import java.io.IOException;
import java.io.OutputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import cn.edu.thu.tsfile.common.utils.TSRandomAccessFileWriter;
/**
* This class is used to wrap the {@link}FSDataOutputStream and implement the
* interface {@link}TSRandomAccessFileWriter
*
* @author liukun
*/
public class HDFSOutputStream implements TSRandomAccessFileWriter {
private static final Logger LOGGER = LoggerFactory.getLogger(HDFSOutputStream.class);
private FSDataOutputStream fsDataOutputStream;
public HDFSOutputStream(String filePath, boolean overwriter) throws IOException {
this(filePath, new Configuration(), overwriter);
}
public HDFSOutputStream(String filePath, Configuration configuration, boolean overwriter) throws IOException {
this(new Path(filePath),configuration,overwriter);
}
public HDFSOutputStream(Path path,Configuration configuration,boolean overwriter) throws IOException{
FileSystem fsFileSystem = FileSystem.get(configuration);
fsDataOutputStream = fsFileSystem.create(path, overwriter);
}
@Override
public OutputStream getOutputStream() {
return fsDataOutputStream;
}
@Override
public long getPos() throws IOException {
return fsDataOutputStream.getPos();
}
@Override
public void write(int b) throws IOException {
fsDataOutputStream.write(b);
}
@Override
public void write(byte[] b) throws IOException {
fsDataOutputStream.write(b);
}
@Override
public void close() throws IOException {
fsDataOutputStream.close();
}
@Override
public void seek(long offset) throws IOException {
throw new IOException("Not support");
}
}

View File

@ -20,6 +20,7 @@ import java.util.Map;
import cn.edu.thu.tsfile.common.conf.TSFileDescriptor;
import cn.edu.thu.tsfile.file.metadata.enums.TSDataType;
import cn.edu.thu.tsfile.timeseries.read.qp.Path;
import cn.edu.thu.tsfiledb.conf.TSFileDBDescriptor;
import cn.edu.thu.tsfiledb.exception.MetadataArgsErrorException;
import cn.edu.thu.tsfiledb.exception.PathErrorException;
@ -46,7 +47,8 @@ public class MManager {
private MManager() {
writeToLog = false;
String folderPath = TSFileDescriptor.getInstance().getConfig().metadataDir;
String folderPath = TSFileDBDescriptor.getInstance().getConfig().metadataDir;
datafilePath = folderPath + "/mdata.obj";
logFilePath = folderPath + "/mlog.txt";
init();

View File

@ -84,7 +84,7 @@ public class BasicFunctionOperator extends FunctionOperator {
// }
@Override
protected Pair<SingleSeriesFilterExpression, String> transformToSingleFilter(QueryProcessExecutor exec)
protected Pair<SingleSeriesFilterExpression, String> transformToSingleFilter(QueryProcessExecutor exec, FilterSeriesType filterType)
throws QpSelectFromException, QpWhereException {
TSDataType type = exec.getSeriesType(seriesPath);
if (type == null) {
@ -97,35 +97,35 @@ public class BasicFunctionOperator extends FunctionOperator {
ret =
funcToken.getSingleSeriesFilterExpression(
FilterFactory.intFilterSeries(seriesPath.getDeltaObjectToString(),
seriesPath.getMeasurementToString(), FilterSeriesType.VALUE_FILTER),
seriesPath.getMeasurementToString(), filterType),
Integer.valueOf(seriesValue));
break;
case INT64:
ret =
funcToken.getSingleSeriesFilterExpression(
FilterFactory.longFilterSeries(seriesPath.getDeltaObjectToString(),
seriesPath.getMeasurementToString(), FilterSeriesType.VALUE_FILTER),
seriesPath.getMeasurementToString(), filterType),
Long.valueOf(seriesValue));
break;
case BOOLEAN:
ret =
funcToken.getSingleSeriesFilterExpression(
FilterFactory.booleanFilterSeries(seriesPath.getDeltaObjectToString(),
seriesPath.getMeasurementToString(), FilterSeriesType.VALUE_FILTER),
seriesPath.getMeasurementToString(), filterType),
Boolean.valueOf(seriesValue));
break;
case FLOAT:
ret =
funcToken.getSingleSeriesFilterExpression(
FilterFactory.floatFilterSeries(seriesPath.getDeltaObjectToString(),
seriesPath.getMeasurementToString(), FilterSeriesType.VALUE_FILTER),
seriesPath.getMeasurementToString(), filterType),
Float.valueOf(seriesValue));
break;
case DOUBLE:
ret =
funcToken.getSingleSeriesFilterExpression(
FilterFactory.doubleFilterSeries(seriesPath.getDeltaObjectToString(),
seriesPath.getMeasurementToString(), FilterSeriesType.VALUE_FILTER),
seriesPath.getMeasurementToString(), filterType),
Double.valueOf(seriesValue));
break;
default:

View File

@ -13,6 +13,7 @@ import cn.edu.thu.tsfile.common.utils.Pair;
import cn.edu.thu.tsfile.timeseries.filter.definition.FilterExpression;
import cn.edu.thu.tsfile.timeseries.filter.definition.FilterFactory;
import cn.edu.thu.tsfile.timeseries.filter.definition.SingleSeriesFilterExpression;
import cn.edu.thu.tsfile.timeseries.filter.definition.filterseries.FilterSeriesType;
import cn.edu.thu.tsfiledb.qp.constant.SQLConstant;
import cn.edu.thu.tsfiledb.qp.exception.QueryProcessorException;
import cn.edu.thu.tsfiledb.qp.exception.logical.operator.FilterOperatorException;
@ -114,20 +115,20 @@ public class FilterOperator extends Operator implements Comparable<FilterOperato
* @return
* @throws QueryProcessorException
*/
public FilterExpression transformToFilter(QueryProcessExecutor conf)
public FilterExpression transformToFilter(QueryProcessExecutor conf, FilterSeriesType type)
throws QueryProcessorException {
if (isSingle) {
Pair<SingleSeriesFilterExpression, String> ret = transformToSingleFilter(conf);
Pair<SingleSeriesFilterExpression, String> ret = transformToSingleFilter(conf, type);
return ret.left;
} else {
if (childOperators.isEmpty()) {
throw new FilterOperatorException("this filter is not leaf, but it's empty:"
+ tokenIntType);
}
FilterExpression retFilter = childOperators.get(0).transformToFilter(conf);
FilterExpression retFilter = childOperators.get(0).transformToFilter(conf, type);
FilterExpression cross;
for (int i = 1; i < childOperators.size(); i++) {
cross = childOperators.get(i).transformToFilter(conf);
cross = childOperators.get(i).transformToFilter(conf, type);
switch (tokenIntType) {
case KW_AND:
retFilter = FilterFactory.and(retFilter, cross);
@ -153,19 +154,19 @@ public class FilterOperator extends Operator implements Comparable<FilterOperato
* represented by this child.
* @throws QueryProcessorException
*/
protected Pair<SingleSeriesFilterExpression, String> transformToSingleFilter(QueryProcessExecutor exec)
protected Pair<SingleSeriesFilterExpression, String> transformToSingleFilter(QueryProcessExecutor exec, FilterSeriesType type)
throws QueryProcessorException {
if (childOperators.isEmpty()) {
throw new FilterOperatorException(
("transformToSingleFilter: this filter is not leaf, but it's empty:{}" + tokenIntType));
}
Pair<SingleSeriesFilterExpression, String> single =
childOperators.get(0).transformToSingleFilter(exec);
childOperators.get(0).transformToSingleFilter(exec, type);
SingleSeriesFilterExpression retFilter = single.left;
String childSeriesStr = single.right;
//
for (int i = 1; i < childOperators.size(); i++) {
single = childOperators.get(i).transformToSingleFilter(exec);
single = childOperators.get(i).transformToSingleFilter(exec, type);
if (!childSeriesStr.equals(single.right))
throw new FilterOperatorException(
("transformToSingleFilter: paths among children are not inconsistent: one is:"

View File

@ -12,6 +12,7 @@ import cn.edu.thu.tsfile.timeseries.filter.definition.FilterExpression;
import cn.edu.thu.tsfile.timeseries.filter.definition.FilterFactory;
import cn.edu.thu.tsfile.timeseries.filter.definition.SingleSeriesFilterExpression;
import cn.edu.thu.tsfile.timeseries.filter.definition.filterseries.FilterSeries;
import cn.edu.thu.tsfile.timeseries.filter.definition.filterseries.FilterSeriesType;
import cn.edu.thu.tsfile.timeseries.read.query.QueryDataSet;
import cn.edu.thu.tsfile.timeseries.read.readSupport.RowRecord;
import cn.edu.thu.tsfile.timeseries.utils.StringContainer;
@ -143,11 +144,11 @@ public class SeriesSelectPlan extends PhysicalPlan {
private FilterExpression[] transformFilterOpToExpression(QueryProcessExecutor conf)
throws QueryProcessorException {
FilterExpression timeFilter =
timeFilterOperator == null ? null : timeFilterOperator.transformToFilter(conf);
timeFilterOperator == null ? null : timeFilterOperator.transformToFilter(conf, FilterSeriesType.TIME_FILTER);
FilterExpression freqFilter =
freqFilterOperator == null ? null : freqFilterOperator.transformToFilter(conf);
freqFilterOperator == null ? null : freqFilterOperator.transformToFilter(conf, FilterSeriesType.FREQUENCY_FILTER);
FilterExpression valueFilter =
valueFilterOperator == null ? null : valueFilterOperator.transformToFilter(conf);
valueFilterOperator == null ? null : valueFilterOperator.transformToFilter(conf, FilterSeriesType.VALUE_FILTER);
// TODO maybe it's a temporary solution. Up to now, if a crossSensorFilter is needed, just
// construct it with two same children via CSAnd(valueFilter, valueFilter)
if (valueFilter instanceof SingleSeriesFilterExpression) {
@ -156,10 +157,10 @@ public class SeriesSelectPlan extends PhysicalPlan {
Path path = paths.get(0);
if (!series.getDeltaObjectUID().equals(path.getDeltaObjectToString())
|| !series.getMeasurementUID().equals(path.getMeasurementToString())) {
valueFilter = FilterFactory.and(valueFilter, valueFilter);
valueFilter = FilterFactory.csAnd(valueFilter, valueFilter);
}
} else
valueFilter = FilterFactory.and(valueFilter, valueFilter);
valueFilter = FilterFactory.csAnd(valueFilter, valueFilter);
}
return new FilterExpression[] {timeFilter, freqFilter, valueFilter};
}

View File

@ -38,7 +38,7 @@ public class MinTimeAggrFunc extends AggregateFunction{
hasSetValue = true;
}else{
long maxv = result.data.getLong(0);
maxv = maxv > timestamp ? maxv : timestamp;
maxv = maxv < timestamp ? maxv : timestamp;
result.data.setLong(0, maxv);
}
}

View File

@ -97,9 +97,7 @@ public class OverflowQueryEngine {
queryDataSet.mapRet.put(func.name + "(" + path.getFullPath() + ")", aggrRet.data);
//close current recordReader
recordReader.closeFromFactory();
return queryDataSet;
}
public QueryDataSet readWithoutFilter(List<Path> paths, QueryDataSet queryDataSet, int fetchSize) throws ProcessorException, IOException {
@ -110,7 +108,6 @@ public class OverflowQueryEngine {
public DynamicOneColumnData getMoreRecordsForOneColumn(Path p, DynamicOneColumnData res) throws ProcessorException, IOException {
return OverflowQueryEngine.readOneColumnWithoutFilter(p, res, fetchSize);
}
};
queryDataSet.setBatchReaderRetGenerator(batchReaderRetGenerator);
}
@ -130,6 +127,7 @@ public class OverflowQueryEngine {
// Get 4 params
List<Object> params = getOverflowInfoAndFilterDataInMem(deltaObjectUID, measuremetnUID, null, null, null, res, recordReader.insertDataInMemory, recordReader.overflowInfo);
DynamicOneColumnData insertTrue = (DynamicOneColumnData) params.get(0);
DynamicOneColumnData updateTrue = (DynamicOneColumnData) params.get(1);
DynamicOneColumnData updateFalse = (DynamicOneColumnData) params.get(2);
SingleSeriesFilterExpression delteFilter = (SingleSeriesFilterExpression) params.get(3);
@ -180,9 +178,12 @@ public class OverflowQueryEngine {
String sensor = p.getMeasurementToString();
RecordReader recordReader = RecordReaderFactory.getInstance().getRecordReader(device, sensor, timeFilter, freqFilter, valueFilter);
// Get 4 params
List<Object> params = getOverflowInfoAndFilterDataInMem(device, sensor, timeFilter, freqFilter, valueFilter, res, recordReader.insertDataInMemory, recordReader.overflowInfo);
DynamicOneColumnData insertTrue = (DynamicOneColumnData) params.get(0);
DynamicOneColumnData updateTrue = (DynamicOneColumnData) params.get(1);
DynamicOneColumnData updateFalse = (DynamicOneColumnData) params.get(2);
SingleSeriesFilterExpression deleteFilter = (SingleSeriesFilterExpression) params.get(3);

View File

@ -0,0 +1,65 @@
package cn.edu.thu.tsfiledb.query.engine;
import java.io.IOException;
import java.util.List;
import cn.edu.thu.tsfile.common.exception.ProcessorException;
import cn.edu.thu.tsfile.timeseries.filter.definition.SingleSeriesFilterExpression;
import cn.edu.thu.tsfile.timeseries.read.qp.Path;
import cn.edu.thu.tsfile.timeseries.read.query.QueryDataSet;
import cn.edu.thu.tsfile.timeseries.read.readSupport.RowRecord;
import cn.edu.thu.tsfiledb.conf.TSFileDBConfig;
import cn.edu.thu.tsfiledb.conf.TSFileDBDescriptor;
import cn.edu.thu.tsfiledb.exception.NotConsistentException;
import cn.edu.thu.tsfiledb.query.management.ReadLockManager;
public class QueryerForMerge {
private List<Path> pathList;
private SingleSeriesFilterExpression timeFilter;
private OverflowQueryEngine queryEngine;
private QueryDataSet queryDataSet;
private static final TSFileDBConfig TsFileDBConf = TSFileDBDescriptor.getInstance().getConfig();
public QueryerForMerge(List<Path> pathList, SingleSeriesFilterExpression timeFilter) {
this.pathList = pathList;
this.timeFilter = timeFilter;
queryEngine = new OverflowQueryEngine();
queryDataSet = null;
}
public boolean hasNextRecord() {
boolean ret = false;
if (queryDataSet == null || !queryDataSet.hasNextRecord()) {
try {
queryDataSet = queryEngine.query(pathList, timeFilter, null, null, queryDataSet,
TsFileDBConf.defaultFetchSize);
} catch (ProcessorException | IOException e) {
e.printStackTrace();
}
}
ret = queryDataSet.hasNextRecord();
if (!ret) {
unlockForCurrentQuery();
}
return ret;
}
public RowRecord getNextRecord(){
if (hasNextRecord()) {
return queryDataSet.getNextRecord();
}
return null;
}
private void unlockForCurrentQuery() {
try {
ReadLockManager.getInstance().unlockForOneRequest();
} catch (NotConsistentException e) {
e.printStackTrace();
} catch (ProcessorException e) {
e.printStackTrace();
}
}
}

View File

@ -5,6 +5,7 @@ import java.util.HashMap;
import cn.edu.thu.tsfiledb.engine.exception.FileNodeManagerException;
import cn.edu.thu.tsfiledb.engine.filenode.FileNodeManager;
import cn.edu.thu.tsfiledb.exception.NotConsistentException;
import cn.edu.thu.tsfiledb.query.reader.RecordReader;
import cn.edu.thu.tsfile.common.exception.ProcessorException;
@ -13,6 +14,7 @@ public class ReadLockManager {
private static ReadLockManager instance = new ReadLockManager();
FileNodeManager fileNodeManager = FileNodeManager.getInstance();
ThreadLocal<HashMap<String,Integer>> locksMap = new ThreadLocal<>();
public RecordReaderCache recordReaderCache = new RecordReaderCache();
private ReadLockManager(){
@ -41,8 +43,7 @@ public class ReadLockManager {
}
public void unlockForQuery(String deltaObjectUID, String measurementID
, int token) throws ProcessorException{
public void unlockForQuery(String deltaObjectUID, int token) throws ProcessorException{
try {
fileNodeManager.endQuery(deltaObjectUID, token);
} catch (FileNodeManagerException e) {
@ -57,19 +58,20 @@ public class ReadLockManager {
}
HashMap<String,Integer> locks = locksMap.get();
for(String key : locks.keySet()){
String[] names = splitKey(key);
unlockForQuery(names[0], names[1], locks.get(key));
unlockForQuery(key, locks.get(key));
}
locksMap.remove();
//remove recordReaders cached
recordReaderCache.clear();
}
public String getKey(String deltaObjectUID, String measurementID){
return deltaObjectUID + "#" + measurementID;
return deltaObjectUID;
}
public String[] splitKey(String key){
return key.split("#");
}
// public String[] splitKey(String key){
// return key.split("#");
// }
public static ReadLockManager getInstance(){
if(instance == null){

View File

@ -0,0 +1,50 @@
package cn.edu.thu.tsfiledb.query.management;
import java.io.IOException;
import java.util.HashMap;
import cn.edu.thu.tsfile.common.exception.ProcessorException;
import cn.edu.thu.tsfiledb.query.reader.RecordReader;
public class RecordReaderCache {
ThreadLocal<HashMap<String,RecordReader>> cache = new ThreadLocal<>();
public boolean containsRecordReader(String deltaObjectUID, String measurementID){
checkCacheInitialized();
return cache.get().containsKey(getKey(deltaObjectUID, measurementID));
}
public RecordReader get(String deltaObjectUID, String measurementID){
checkCacheInitialized();
return cache.get().get(getKey(deltaObjectUID, measurementID));
}
public void put(String deltaObjectUID, String measurementID, RecordReader recordReader){
checkCacheInitialized();
cache.get().put(getKey(deltaObjectUID, measurementID), recordReader);
}
public void clear() throws ProcessorException{
for(RecordReader rr : cache.get().values()){
try {
rr.close();
} catch (IOException | ProcessorException e) {
e.printStackTrace();
throw new ProcessorException(e);
}
}
cache.remove();
}
private String getKey(String deltaObjectUID, String measurementID){
return deltaObjectUID + "#" + measurementID;
}
private void checkCacheInitialized(){
if(cache.get() == null){
cache.set(new HashMap<>());
}
}
}

View File

@ -1,6 +1,5 @@
package cn.edu.thu.tsfiledb.query.management;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
@ -8,13 +7,13 @@ import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import cn.edu.thu.tsfile.common.exception.ProcessorException;
import cn.edu.thu.tsfile.common.utils.TSRandomAccessFileReader;
import cn.edu.thu.tsfile.timeseries.filter.definition.SingleSeriesFilterExpression;
import cn.edu.thu.tsfiledb.engine.exception.FileNodeManagerException;
import cn.edu.thu.tsfiledb.engine.filenode.FileNodeManager;
import cn.edu.thu.tsfiledb.engine.filenode.IntervalFileNode;
import cn.edu.thu.tsfiledb.engine.filenode.QueryStructure;
import cn.edu.thu.tsfile.common.exception.ProcessorException;
import cn.edu.thu.tsfiledb.query.reader.RecordReader;
/**
@ -41,16 +40,24 @@ public class RecordReaderFactory {
SingleSeriesFilterExpression timeFilter, SingleSeriesFilterExpression freqFilter,
SingleSeriesFilterExpression valueFilter) throws ProcessorException {
int token = readLockManager.lock(deltaObjectUID, measurementID);
QueryStructure queryStructure;
try {
queryStructure = fileNodeManager.query(deltaObjectUID, measurementID, timeFilter, freqFilter, valueFilter);
} catch (FileNodeManagerException e) {
throw new ProcessorException(e.getMessage());
if (readLockManager.recordReaderCache.containsRecordReader(deltaObjectUID, measurementID)) {
return readLockManager.recordReaderCache.get(deltaObjectUID, measurementID);
} else {
QueryStructure queryStructure;
try {
queryStructure = fileNodeManager.query(deltaObjectUID, measurementID, timeFilter, freqFilter,
valueFilter);
System.out.println("====== Device:" + deltaObjectUID + ". Sensor:" + measurementID);
System.out.println(queryStructure);
System.out.println("======");
} catch (FileNodeManagerException e) {
throw new ProcessorException(e.getMessage());
}
// TODO: This can be optimized in later version
RecordReader recordReader = createANewRecordReader(deltaObjectUID, measurementID, queryStructure, token);
readLockManager.recordReaderCache.put(deltaObjectUID, measurementID, recordReader);
return recordReader;
}
// TODO: This can be optimized in later version
RecordReader recordReader = createANewRecordReader(deltaObjectUID, measurementID, queryStructure, token);
return recordReader;
}
public RecordReader createANewRecordReader(String deltaObjectUID, String measurementID,
@ -95,12 +102,12 @@ public class RecordReaderFactory {
}
public void closeOneRecordReader(RecordReader recordReader) throws ProcessorException {
try {
recordReader.close();
} catch (IOException e) {
logger.error("Error in closing RecordReader : {}", e.getMessage());
e.printStackTrace();
}
// try {
// recordReader.close();
// } catch (IOException e) {
// logger.error("Error in closing RecordReader : {}", e.getMessage());
// e.printStackTrace();
// }
}
public static RecordReaderFactory getInstance() {

View File

@ -230,7 +230,6 @@ public class OverflowValueReader extends ValueReader{
idx2++;
res.insertTrueIndex++;
resCount++;
calculateFrequency(hasOverflowDataInThisPage, freqFilter, insertTrue.getInt(idx2));
// if equal, take value from insertTrue and skip one
// value from page. That is to say, insert is like
// update.
@ -246,7 +245,6 @@ public class OverflowValueReader extends ValueReader{
break;
}
int v = decoder.readInt(page);
calculateFrequency(hasOverflowDataInThisPage, freqFilter, v);
if (mode == -1) {
if ((valueFilter == null && timeFilter == null)
@ -418,7 +416,6 @@ public class OverflowValueReader extends ValueReader{
idx2++;
res.insertTrueIndex++;
resCount++;
calculateFrequency(hasOverflowDataInThisPage, freqFilter, insertTrue.getLong(idx2));
// if equal, take value from insertTrue and skip one
// value from page
if (insertTrue.getTime(idx2 - 1) == timeValues[timeIdx]) {
@ -433,7 +430,6 @@ public class OverflowValueReader extends ValueReader{
break;
}
long v = decoder.readLong(page);
calculateFrequency(hasOverflowDataInThisPage, freqFilter, v);
if (mode == -1) {
if ((valueFilter == null && timeFilter == null)
|| (valueFilter != null && timeFilter == null
@ -511,7 +507,6 @@ public class OverflowValueReader extends ValueReader{
idx2++;
res.insertTrueIndex++;
resCount++;
calculateFrequency(hasOverflowDataInThisPage, freqFilter, insertTrue.getFloat(idx2));
// if equal, take value from insertTrue and skip one
// value from page
if (insertTrue.getTime(idx2 - 1) == timeValues[timeIdx]) {
@ -526,7 +521,6 @@ public class OverflowValueReader extends ValueReader{
break;
}
float v = decoder.readFloat(page);
calculateFrequency(hasOverflowDataInThisPage, freqFilter, v);
if (mode == -1) {
if ((valueFilter == null && timeFilter == null)
|| (valueFilter != null && timeFilter == null
@ -604,7 +598,6 @@ public class OverflowValueReader extends ValueReader{
idx2++;
res.insertTrueIndex++;
resCount++;
calculateFrequency(hasOverflowDataInThisPage, freqFilter, insertTrue.getDouble(idx2));
// if equal, take value from insertTrue and skip one
// value from page
if (insertTrue.getTime(idx2 - 1) == timeValues[timeIdx]) {
@ -619,7 +612,6 @@ public class OverflowValueReader extends ValueReader{
break;
}
double v = decoder.readDouble(page);
calculateFrequency(hasOverflowDataInThisPage, freqFilter, v);
if (mode == -1) {
if ((valueFilter == null && timeFilter == null)
|| (valueFilter != null && timeFilter == null
@ -827,7 +819,7 @@ public class OverflowValueReader extends ValueReader{
}
//Record the current index for overflow info
insertTrue.curIdx = idx2;
// insertTrue.curIdx = idx2;
updateTrue.curIdx = idx[0];
updateFalse.curIdx = idx[1];
@ -864,7 +856,6 @@ public class OverflowValueReader extends ValueReader{
res.putInt(insertTrue.getInt(insertTrue.curIdx));
insertTrue.curIdx++;
res.insertTrueIndex++;
calculateFrequency(hasOverflowDataInThisPage, freqFilter, insertTrue.getInt(insertTrue.curIdx));
// if equal, take value from insertTrue and skip one
// value from page. That is to say, insert is like
// update.
@ -880,7 +871,6 @@ public class OverflowValueReader extends ValueReader{
break;
}
int v = decoder.readInt(page);
calculateFrequency(hasOverflowDataInThisPage, freqFilter, v);
if (mode == -1) {
if ((valueFilter == null && timeFilter == null)
@ -1036,14 +1026,15 @@ public class OverflowValueReader extends ValueReader{
case INT64:
while (decoder.hasNext(page)) {
// put insert points
// if(timeIdx == timeValues.length - 1){
System.out.println(timeIdx);
// }
while (insertTrue.curIdx < insertTrue.length && timeIdx < timeValues.length
&& insertTrue.getTime(insertTrue.curIdx) <= timeValues[timeIdx]) {
res.putTime(insertTrue.getTime(insertTrue.curIdx));
res.putLong(insertTrue.getLong(insertTrue.curIdx));
insertTrue.curIdx++;
res.insertTrueIndex++;
calculateFrequency(hasOverflowDataInThisPage, freqFilter,
insertTrue.getLong(insertTrue.curIdx));
// if equal, take value from insertTrue and skip one
// value from page
if (insertTrue.getTime(insertTrue.curIdx - 1) == timeValues[timeIdx]) {
@ -1058,7 +1049,6 @@ public class OverflowValueReader extends ValueReader{
break;
}
long v = decoder.readLong(page);
calculateFrequency(hasOverflowDataInThisPage, freqFilter, v);
if (mode == -1) {
if ((valueFilter == null && timeFilter == null)
|| (valueFilter != null && timeFilter == null
@ -1131,8 +1121,6 @@ public class OverflowValueReader extends ValueReader{
res.putFloat(insertTrue.getFloat(insertTrue.curIdx));
insertTrue.curIdx++;
res.insertTrueIndex++;
calculateFrequency(hasOverflowDataInThisPage, freqFilter,
insertTrue.getFloat(insertTrue.curIdx));
// if equal, take value from insertTrue and skip one
// value from page
if (insertTrue.getTime(insertTrue.curIdx - 1) == timeValues[timeIdx]) {
@ -1147,7 +1135,6 @@ public class OverflowValueReader extends ValueReader{
break;
}
float v = decoder.readFloat(page);
calculateFrequency(hasOverflowDataInThisPage, freqFilter, v);
if (mode == -1) {
if ((valueFilter == null && timeFilter == null)
|| (valueFilter != null && timeFilter == null
@ -1220,8 +1207,6 @@ public class OverflowValueReader extends ValueReader{
res.putDouble(insertTrue.getDouble(insertTrue.curIdx));
insertTrue.curIdx++;
res.insertTrueIndex++;
calculateFrequency(hasOverflowDataInThisPage, freqFilter,
insertTrue.getDouble(insertTrue.curIdx));
// if equal, take value from insertTrue and skip one
// value from page
if (insertTrue.getTime(insertTrue.curIdx - 1) == timeValues[timeIdx]) {
@ -1236,7 +1221,6 @@ public class OverflowValueReader extends ValueReader{
break;
}
double v = decoder.readDouble(page);
calculateFrequency(hasOverflowDataInThisPage, freqFilter, v);
if (mode == -1) {
if ((valueFilter == null && timeFilter == null)
|| (valueFilter != null && timeFilter == null
@ -1364,22 +1348,4 @@ public class OverflowValueReader extends ValueReader{
}
return false;
}
public void calculateFrequency(boolean hasOverflowDataInThisPage, SingleSeriesFilterExpression freqFilter, int v) {
}
public void calculateFrequency(boolean hasOverflowDataInThisPage, SingleSeriesFilterExpression freqFilter, long v) {
}
public void calculateFrequency(boolean hasOverflowDataInThisPage, SingleSeriesFilterExpression freqFilter, float v) {
}
public void calculateFrequency(boolean hasOverflowDataInThisPage, SingleSeriesFilterExpression freqFilter, double v) {
}
}

View File

@ -284,7 +284,7 @@ public class TSServiceImpl implements TSIService.Iface {
@Override
public TSExecuteBatchStatementResp ExecuteBatchStatement(TSExecuteBatchStatementReq req) throws TException {
try {
LOGGER.info("tsfile-server ExecuteBatchStatement: Receive execute batch sql operation");
LOGGER.debug("tsfile-server ExecuteBatchStatement: Receive execute batch sql operation");
if (!checkLogin()) {
LOGGER.info("tsfile-server ExecuteBatchStatement: Not login.");
return getTSBathcExecuteStatementResp(TS_StatusCode.ERROR_STATUS, "Not login", null);

View File

@ -8,6 +8,8 @@ import org.slf4j.LoggerFactory;
import cn.edu.thu.tsfile.common.conf.TSFileConfig;
import cn.edu.thu.tsfile.common.conf.TSFileDescriptor;
import cn.edu.thu.tsfiledb.conf.TSFileDBConfig;
import cn.edu.thu.tsfiledb.conf.TSFileDBDescriptor;
import cn.edu.thu.tsfiledb.qp.logical.operator.Operator.OperatorType;
import cn.edu.thu.tsfiledb.qp.physical.plan.PhysicalPlan;
import cn.edu.thu.tsfiledb.sys.writeLog.impl.LocalFileLogReader;
@ -19,7 +21,7 @@ public class WriteLogManager {
private PhysicalPlanLogTransfer transfer = new PhysicalPlanLogTransfer();
private WriteLogReadable reader;
private WriteLogPersistable writer = null;
private TSFileConfig config = TSFileDescriptor.getInstance().getConfig();
private TSFileDBConfig config = TSFileDBDescriptor.getInstance().getConfig();
private String logFile;
private WriteLogManager() {

View File

@ -11,6 +11,8 @@ import cn.edu.thu.tsfile.timeseries.utils.FileUtils;
import cn.edu.thu.tsfile.timeseries.utils.RecordUtils;
import cn.edu.thu.tsfile.timeseries.write.record.TSRecord;
import cn.edu.thu.tsfile.timeseries.write.schema.FileSchema;
import cn.edu.thu.tsfiledb.conf.TSFileDBConfig;
import cn.edu.thu.tsfiledb.conf.TSFileDBDescriptor;
import cn.edu.thu.tsfiledb.engine.exception.FileNodeManagerException;
import cn.edu.thu.tsfiledb.engine.filenode.FileNodeManager;
import cn.edu.thu.tsfiledb.exception.PathErrorException;
@ -38,7 +40,7 @@ public class LoadDataUtils {
private String measureType;
private long totalPointCount = 0;
private FileNodeManager fileNodeManager;
private TSFileConfig conf = TSFileDescriptor.getInstance().getConfig();
private TSFileDBConfig conf = TSFileDBDescriptor.getInstance().getConfig();
public LoadDataUtils() {
writeInstanceMap = new HashSet<>();

View File

@ -0,0 +1,22 @@
# exmaple tsfile config file
# 128M = 128*1024*1024
rowGroupSize=134217728
# 8KB = 8*1024
pageSize=8192
timeSeriesEncoder=TS_2DIFF
# timeSeriesEncoder=PLAIN
defaultSeriesEncoder=RLE
# defaultSeriesEncoder=PLAIN
compressName=UNCOMPRESSED
defaultRleBitWidth=8
defaultEndian=LITTLE_ENDIAN
defaultDeltaBlockSize=128
defaultPLAMaxError=100
defaultSDTMaxError=100
# RleLongDefaultNull=0
# RleIntDefaultNull=0
# TS2DiffLongDefaultNull=-1
# TS2DiffIntDefaultNull=-1
# defaultDeltaBlockSize=128

View File

@ -0,0 +1,23 @@
writeInstanceThreshold=5
overflowDataDir=src/main/resources/data/overflow
# FileNodeDir=src/main/resources/data/digest
# BufferWriteDir=src/main/resources/data/delta
# metadataDir=src/main/resources/metadata
# derbyHome=src/main/resources/derby
mergeConcurrentThreadNum =10
maxFileNodeNum =1000
maxOverflowNodeNum=100
maxBufferWriteNodeNum=50
defaultFetchSize=5000
# writeLogPath=src/main/resources/writeLog.log"

View File

@ -0,0 +1,24 @@
package cn.edu.thu.tsfiledb.auth;
import static org.junit.Assert.*;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
public class AuthDaoWrapTest {
@Before
public void setUp() throws Exception {
}
@After
public void tearDown() throws Exception {
}
@Test
public void test() {
fail("Not yet implemented");
}
}

View File

@ -37,7 +37,7 @@ public class AuthTest {
dbDao = new DBdao();
authDao = new AuthDao();
dbDao.open();
statement = dbDao.getStatement();
statement = DBdao.getStatement();
// init data
user1 = new User("user1", "user1");

View File

@ -0,0 +1,24 @@
package cn.edu.thu.tsfiledb.auth;
import static org.junit.Assert.*;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
public class AuthorizerTest {
@Before
public void setUp() throws Exception {
}
@After
public void tearDown() throws Exception {
}
@Test
public void test() {
fail("Not yet implemented");
}
}

View File

@ -38,7 +38,7 @@ public class RolePermissionTest {
permissionId = Permission.CREATE;
dbDao.open();
statement = dbDao.getStatement();
statement = DBdao.getStatement();
// if role not exist, create role
if (roleDao.getRole(statement, role.getRoleName()) == null) {

View File

@ -1,6 +1,6 @@
package cn.edu.thu.tsfiledb.auth;
import static org.junit.Assert.*;
import static org.junit.Assert.assertEquals;
import java.sql.Statement;
import java.util.ArrayList;
@ -12,6 +12,8 @@ import org.junit.Test;
import cn.edu.thu.tsfiledb.auth.dao.DBdao;
import cn.edu.thu.tsfiledb.auth.dao.RoleDao;
import cn.edu.thu.tsfiledb.auth.model.Role;
import cn.edu.thu.tsfiledb.conf.TSFileDBConfig;
import cn.edu.thu.tsfiledb.conf.TSFileDBDescriptor;
public class RoleTest {
@ -19,14 +21,15 @@ public class RoleTest {
private DBdao dbdao = null;
private RoleDao roleDao = null;
private Role role = null;
private String roleName = "role";
private TSFileDBConfig config = TSFileDBDescriptor.getInstance().getConfig();
@Before
public void setUp() throws Exception {
config.derbyHome = "";
dbdao = new DBdao();
dbdao.open();
statement = dbdao.getStatement();
statement = DBdao.getStatement();
roleDao = new RoleDao();
}
@ -36,12 +39,13 @@ public class RoleTest {
}
@Test
public void test() {
public void createAndDeleteTest() {
role = roleDao.getRole(statement, roleName);
if (role != null) {
System.out.println("Delete the original role");
roleDao.deleteRole(statement, roleName);
}
assertEquals(null, roleDao.getRole(statement, roleName));
// create role
role = new Role(roleName);
roleDao.createRole(statement, role);
@ -78,7 +82,5 @@ public class RoleTest {
assertEquals(0, getRoleNames.size());
roleDao.deleteRole(statement, role1.getRoleName());
roleDao.deleteRole(statement, role2.getRoleName());
}
}

View File

@ -1,6 +1,6 @@
package cn.edu.thu.tsfiledb.auth;
import static org.junit.Assert.*;
import static org.junit.Assert.assertEquals;
import java.sql.Statement;
@ -14,6 +14,8 @@ import cn.edu.thu.tsfiledb.auth.dao.UserPermissionDao;
import cn.edu.thu.tsfiledb.auth.model.Permission;
import cn.edu.thu.tsfiledb.auth.model.User;
import cn.edu.thu.tsfiledb.auth.model.UserPermission;
import cn.edu.thu.tsfiledb.conf.TSFileDBConfig;
import cn.edu.thu.tsfiledb.conf.TSFileDBDescriptor;
public class UserPemissionTest {
@ -25,15 +27,17 @@ public class UserPemissionTest {
private String nodeName = "nodeName";
private String newNodeName = "newNodeName";
private int permission;
private User user = new User("user1", "user1");
private TSFileDBConfig config = TSFileDBDescriptor.getInstance().getConfig();
@Before
public void setUp() throws Exception {
config.derbyHome = "";
permission = Permission.CREATE;
DBdao = new DBdao();
DBdao.open();
statement = DBdao.getStatement();
statement = cn.edu.thu.tsfiledb.auth.dao.DBdao.getStatement();
userDao = new UserDao();
UserPermissionDao = new UserPermissionDao();

View File

@ -1,6 +1,6 @@
package cn.edu.thu.tsfiledb.auth;
import static org.junit.Assert.*;
import static org.junit.Assert.assertEquals;
import java.sql.Statement;
import java.util.ArrayList;
@ -16,6 +16,8 @@ import cn.edu.thu.tsfiledb.auth.dao.UserRoleRelDao;
import cn.edu.thu.tsfiledb.auth.model.Role;
import cn.edu.thu.tsfiledb.auth.model.User;
import cn.edu.thu.tsfiledb.auth.model.UserRoleRel;
import cn.edu.thu.tsfiledb.conf.TSFileDBConfig;
import cn.edu.thu.tsfiledb.conf.TSFileDBDescriptor;
public class UserRoleRelTest {
@ -29,17 +31,20 @@ public class UserRoleRelTest {
Role role1 = new Role("role1");
Role role2 = new Role("role2");
private TSFileDBConfig config = TSFileDBDescriptor.getInstance().getConfig();
@Before
public void setUp() throws Exception {
config.derbyHome = "";
dbdao = new DBdao();
dbdao.open();
statement = dbdao.getStatement();
statement = DBdao.getStatement();
userDao = new UserDao();
roleDao = new RoleDao();
userRoleRelDao = new UserRoleRelDao();
// create user and role
// create user
if (userDao.getUser(statement, user1.getUserName()) == null) {
userDao.createUser(statement, user1);
}
@ -47,6 +52,7 @@ public class UserRoleRelTest {
userDao.createUser(statement, user2);
}
// create role
if (roleDao.getRole(statement, role1.getRoleName()) == null) {
roleDao.createRole(statement, role1);
}
@ -67,7 +73,7 @@ public class UserRoleRelTest {
}
@Test
public void test() {
public void createUserRoleRelTest() {
// create relation between user and role
String userName = "user1";
String roleName = "role1";
@ -119,7 +125,7 @@ public class UserRoleRelTest {
ArrayList<Integer> list = new ArrayList<>();
list.add(role1Id);
list.add(role2Id);
roleIds.removeAll(list);
assertEquals(0, roleIds.size());
// delete the relations
@ -136,19 +142,20 @@ public class UserRoleRelTest {
int role1Id = roleDao.getRole(statement, role1name).getId();
int user1Id = userDao.getUser(statement, user1name).getId();
int user2Id = userDao.getUser(statement, user2name).getId();
UserRoleRel userRoleRel1 = new UserRoleRel(user1Id, role1Id);
UserRoleRel userRoleRel2 = new UserRoleRel(user2Id, role1Id);
//if not exist, create the relations
if (userRoleRelDao.getUserRoleRel(statement, userRoleRel1)==null) {
// if not exist, create the relations
if (userRoleRelDao.getUserRoleRel(statement, userRoleRel1) == null) {
userRoleRelDao.createUserRoleRel(statement, userRoleRel1);
}
if (userRoleRelDao.getUserRoleRel(statement, userRoleRel2)==null) {
if (userRoleRelDao.getUserRoleRel(statement, userRoleRel2) == null) {
userRoleRelDao.createUserRoleRel(statement, userRoleRel2);
}
//get the relation and assert them
ArrayList<UserRoleRel> arrayList = (ArrayList<UserRoleRel>)userRoleRelDao.getUserRoleRelByRole(statement, role1Id);
// get the relation and assert them
ArrayList<UserRoleRel> arrayList = (ArrayList<UserRoleRel>) userRoleRelDao.getUserRoleRelByRole(statement,
role1Id);
ArrayList<Integer> userIds = new ArrayList<>();
for (UserRoleRel userRoleRel : arrayList) {
userIds.add(userRoleRel.getUserId());
@ -156,10 +163,10 @@ public class UserRoleRelTest {
ArrayList<Integer> list = new ArrayList<>();
list.add(user1Id);
list.add(user2Id);
userIds.removeAll(list);
assertEquals(0, userIds.size());
//delete the relations
// delete the relations
userRoleRelDao.deleteUserRoleRel(statement, userRoleRel1);
userRoleRelDao.deleteUserRoleRel(statement, userRoleRel2);
}

View File

@ -1,9 +1,10 @@
package cn.edu.thu.tsfiledb.auth;
import static org.junit.Assert.*;
import static org.junit.Assert.assertEquals;
import java.sql.Statement;
import java.util.ArrayList;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@ -11,6 +12,9 @@ import org.junit.Test;
import cn.edu.thu.tsfiledb.auth.dao.DBdao;
import cn.edu.thu.tsfiledb.auth.dao.UserDao;
import cn.edu.thu.tsfiledb.auth.model.User;
import cn.edu.thu.tsfiledb.conf.TSFileDBConfig;
import cn.edu.thu.tsfiledb.conf.TSFileDBDescriptor;
import cn.edu.thu.tsfiledb.engine.overflow.io.EngineTestHelper;
public class UserTest {
@ -21,16 +25,19 @@ public class UserTest {
private String userName = "testuser";
private String passWord = "password";
private User user = new User(userName, passWord);
private TSFileDBConfig config = TSFileDBDescriptor.getInstance().getConfig();
/**
* @throws Exception
* prepare to connect the derby DB
*/
@Before
public void setUp() throws Exception {
config.derbyHome = "";
EngineTestHelper.delete(config.derbyHome);
dBdao = new DBdao();
dBdao.open();
statement = dBdao.getStatement();
statement = DBdao.getStatement();
userDao = new UserDao();
}
@ -38,13 +45,11 @@ public class UserTest {
@After
public void tearDown() throws Exception {
dBdao.close();
EngineTestHelper.delete(config.derbyHome);
}
/**
* Create user first and delete the user
*/
@Test
public void test() {
public void createUserandDeleteUserTest() {
User getUser = userDao.getUser(statement, userName);
if (getUser != null) {
int deleteCount = userDao.deleteUser(statement, userName);
@ -60,7 +65,7 @@ public class UserTest {
// delete user
userDao.deleteUser(statement, userName);
getUser = userDao.getUser(statement, userName);
assertEquals(getUser, null);
assertEquals(null, getUser);
}
@Test
@ -77,40 +82,35 @@ public class UserTest {
userDao.createUser(statement, user2);
}
ArrayList<User> list = (ArrayList<User>) userDao.getUsers(statement);
// add the root user
// root user
assertEquals(3, list.size());
// int count = 0;
// // Some problems
// for (User user : list) {
// User testUser = arrayList.get(count);
// count++;
// if (user.getUserName().equals("root")) {
// continue;
// }
// assertEquals(testUser.getUserName(), user.getUserName());
// assertEquals(testUser.getPassWord(), user.getPassWord());
// }
userDao.deleteUser(statement, user1.getUserName());
userDao.deleteUser(statement, user2.getUserName());
assertEquals(null, userDao.getUser(statement, user1.getUserName()));
assertEquals(null, userDao.getUser(statement, user2.getUserName()));
}
@Test
public void updateUserTest(){
User user = new User("user", "user");
if ((userDao.getUser(statement, user.getUserName()))==null) {
userDao.createUser(statement, user);
}
user = userDao.getUser(statement, user.getUserName());
assertEquals("user", user.getUserName());
assertEquals("user", user.getPassWord());
// update password
String updatePassword = "password";
userDao.updateUserPassword(statement,
user.getUserName(), updatePassword);
user = userDao.getUser(statement, user.getUserName());
assertEquals("user", user.getUserName());
assertEquals(updatePassword, user.getPassWord());
userDao.deleteUser(statement, user.getUserName());
}
@Test
public void updateUserTest() {
String username = "user";
String oldPassword = username;
User user = new User(username, oldPassword);
if ((userDao.getUser(statement, user.getUserName())) == null) {
userDao.createUser(statement, user);
}
user = userDao.getUser(statement, user.getUserName());
assertEquals(username, user.getUserName());
assertEquals(oldPassword, user.getPassWord());
// update password
String updatePassword = "password";
userDao.updateUserPassword(statement, user.getUserName(), updatePassword);
user = userDao.getUser(statement, user.getUserName());
assertEquals(username, user.getUserName());
assertEquals(updatePassword, user.getPassWord());
userDao.deleteUser(statement, user.getUserName());
}
}

View File

@ -22,6 +22,8 @@ import cn.edu.thu.tsfile.common.utils.TSRandomAccessFileWriter;
import cn.edu.thu.tsfile.file.metadata.RowGroupMetaData;
import cn.edu.thu.tsfile.file.metadata.enums.TSDataType;
import cn.edu.thu.tsfile.timeseries.read.query.DynamicOneColumnData;
import cn.edu.thu.tsfiledb.conf.TSFileDBConfig;
import cn.edu.thu.tsfiledb.conf.TSFileDBDescriptor;
import cn.edu.thu.tsfiledb.engine.bufferwrite.Action;
import cn.edu.thu.tsfiledb.engine.bufferwrite.BufferWriteProcessor;
import cn.edu.thu.tsfiledb.engine.bufferwrite.FileNodeConstants;
@ -76,7 +78,8 @@ public class BufferWriteProcessorTest {
parameters.put(FileNodeConstants.BUFFERWRITE_CLOSE_ACTION, bfcloseaction);
parameters.put(FileNodeConstants.FILENODE_PROCESSOR_FLUSH_ACTION, fnflushaction);
TSFileConfig tsconfig = TSFileDescriptor.getInstance().getConfig();
tsconfig.BufferWriteDir = "";
TSFileDBConfig tsdbconfig = TSFileDBDescriptor.getInstance().getConfig();
tsdbconfig.BufferWriteDir = "";
tsconfig.rowGroupSize = 2000;
tsconfig.pageCheckSizeThreshold = 3;
tsconfig.pageSize = 100;

View File

@ -19,6 +19,8 @@ import cn.edu.thu.tsfile.timeseries.filter.definition.SingleSeriesFilterExpressi
import cn.edu.thu.tsfile.timeseries.read.query.DynamicOneColumnData;
import cn.edu.thu.tsfile.timeseries.write.record.DataPoint;
import cn.edu.thu.tsfile.timeseries.write.record.TSRecord;
import cn.edu.thu.tsfiledb.conf.TSFileDBConfig;
import cn.edu.thu.tsfiledb.conf.TSFileDBDescriptor;
import cn.edu.thu.tsfiledb.engine.exception.FileNodeManagerException;
import cn.edu.thu.tsfiledb.engine.lru.MetadataManagerHelper;
import cn.edu.thu.tsfiledb.engine.overflow.io.EngineTestHelper;
@ -26,35 +28,38 @@ import cn.edu.thu.tsfiledb.engine.overflow.io.EngineTestHelper;
public class FileNodeManagerTest {
private TSFileConfig tsconfig = TSFileDescriptor.getInstance().getConfig();
private TSFileDBConfig tsdbconfig = TSFileDBDescriptor.getInstance().getConfig();
private FileNodeManager fManager = null;
private String deltaObjectId = "root.vehicle.d0";
private String deltaObjectId2 = "root.vehicle.d1";
private String measurementId = "s0";
private TSDataType dataType = TSDataType.INT32;
@Before
public void setUp() throws Exception {
tsconfig.FileNodeDir = "filenode" + File.separatorChar;
tsconfig.BufferWriteDir = "bufferwrite";
tsconfig.overflowDataDir = "overflow";
tsdbconfig.FileNodeDir = "filenode" + File.separatorChar;
tsdbconfig.BufferWriteDir = "bufferwrite";
tsdbconfig.overflowDataDir = "overflow";
// set rowgroupsize
tsconfig.rowGroupSize = 2000;
tsconfig.pageCheckSizeThreshold = 3;
tsconfig.pageSize = 100;
tsconfig.defaultMaxStringLength = 2;
EngineTestHelper.delete(tsconfig.FileNodeDir);
EngineTestHelper.delete(tsconfig.BufferWriteDir);
EngineTestHelper.delete(tsconfig.overflowDataDir);
EngineTestHelper.delete(tsdbconfig.FileNodeDir);
EngineTestHelper.delete(tsdbconfig.BufferWriteDir);
EngineTestHelper.delete(tsdbconfig.overflowDataDir);
MetadataManagerHelper.initMetadata();
}
@After
public void tearDown() throws Exception {
EngineTestHelper.delete(tsconfig.FileNodeDir);
EngineTestHelper.delete(tsconfig.BufferWriteDir);
EngineTestHelper.delete(tsconfig.overflowDataDir);
EngineTestHelper.delete(tsdbconfig.FileNodeDir);
EngineTestHelper.delete(tsdbconfig.BufferWriteDir);
EngineTestHelper.delete(tsdbconfig.overflowDataDir);
MetadataManagerHelper.clearMetadata();
}
@ -76,7 +81,7 @@ public class FileNodeManagerTest {
pairList.add(new Pair<Long, Long>(300L, 400L));
pairList.add(new Pair<Long, Long>(500L, 600L));
pairList.add(new Pair<Long, Long>(700L, 800L));
createBufferwriteFiles(pairList);
createBufferwriteFiles(pairList, deltaObjectId);
createBufferwriteInMemory(new Pair<Long, Long>(900L, 1000L));
@ -121,12 +126,12 @@ public class FileNodeManagerTest {
pairList.add(new Pair<Long, Long>(300L, 400L));
pairList.add(new Pair<Long, Long>(500L, 600L));
pairList.add(new Pair<Long, Long>(700L, 800L));
createBufferwriteFiles(pairList);
createBufferwriteFiles(pairList, deltaObjectId);
long[] overflowInsert1 = { 2, 4, 6, 8 };
long[] overflowInsert2 = { 202, 204, 206, 208 };
createOverflowInserts(overflowInsert1);
createOverflowInserts(overflowInsert1, deltaObjectId);
try {
int token = fManager.beginQuery(deltaObjectId);
QueryStructure queryResult = fManager.query(deltaObjectId, measurementId, null, null, null);
@ -145,7 +150,7 @@ public class FileNodeManagerTest {
e.printStackTrace();
fail(e.getMessage());
}
createOverflowInserts(overflowInsert2);
createOverflowInserts(overflowInsert2, deltaObjectId);
try {
int token = fManager.beginQuery(deltaObjectId);
@ -178,12 +183,12 @@ public class FileNodeManagerTest {
pairList.add(new Pair<Long, Long>(300L, 400L));
pairList.add(new Pair<Long, Long>(500L, 600L));
pairList.add(new Pair<Long, Long>(700L, 800L));
createBufferwriteFiles(pairList);
createBufferwriteFiles(pairList, deltaObjectId);
// overflow update
List<Pair<Long, Long>> overflowUpdate1 = new ArrayList<>();
overflowUpdate1.add(new Pair<Long, Long>(150L, 170L));
createOverflowUpdates(overflowUpdate1);
createOverflowUpdates(overflowUpdate1, deltaObjectId);
try {
int token = fManager.beginQuery(deltaObjectId);
@ -212,7 +217,7 @@ public class FileNodeManagerTest {
pairList.add(new Pair<Long, Long>(300L, 400L));
pairList.add(new Pair<Long, Long>(500L, 600L));
pairList.add(new Pair<Long, Long>(700L, 800L));
createBufferwriteFiles(pairList);
createBufferwriteFiles(pairList, deltaObjectId);
long overflowDelete1 = 50;
@ -244,21 +249,26 @@ public class FileNodeManagerTest {
pairList.add(new Pair<Long, Long>(300L, 400L));
pairList.add(new Pair<Long, Long>(500L, 600L));
pairList.add(new Pair<Long, Long>(700L, 800L));
createBufferwriteFiles(pairList);
createBufferwriteFiles(pairList, deltaObjectId);
createBufferwriteFiles(pairList,deltaObjectId2);
long[] overflowInsert1 = { 2, 4, 6, 8 };
long[] overflowInsert2 = { 202, 204, 206, 208 };
// new file: 2-208 300-400 500-600 700-800
// not close
createOverflowInserts(overflowInsert1);
createOverflowInserts(overflowInsert1, deltaObjectId);
createOverflowInserts(overflowInsert1,deltaObjectId2);
// not close
createOverflowInserts(overflowInsert2);
createOverflowInserts(overflowInsert2, deltaObjectId);
createOverflowInserts(overflowInsert2, deltaObjectId2);
fManager = FileNodeManager.getInstance();
try {
assertEquals(true, fManager.mergeAll());
// query: check file range
// check overflow file zero
fManager.mergeAll();
int token = fManager.beginQuery(deltaObjectId);
QueryStructure queryResult = fManager.query(deltaObjectId, measurementId, null, null, null);
fManager.endQuery(deltaObjectId, token);
DynamicOneColumnData bufferwriteindex = queryResult.getBufferwriteDataInMemory();
assertEquals(null, bufferwriteindex);
List<RowGroupMetaData> bufferwriteindisk = queryResult.getBufferwriteDataInDisk();
@ -266,46 +276,85 @@ public class FileNodeManagerTest {
List<IntervalFileNode> bufferwriteFiles = queryResult.getBufferwriteDataInFiles();
assertEquals(pairList.size(), bufferwriteFiles.size());
IntervalFileNode temp = bufferwriteFiles.get(0);
// range 1: 2-200
assertEquals(2, temp.startTime);
// range 1: 2-208
assertEquals(100, temp.startTime);
assertEquals(200, temp.endTime);
// range 2: 202-400
assertEquals(202, temp.startTime);
temp = bufferwriteFiles.get(1);
assertEquals(300, temp.startTime);
assertEquals(400, temp.endTime);
// range 3: 500-600
temp = bufferwriteFiles.get(2);
assertEquals(500, temp.startTime);
assertEquals(600, temp.endTime);
// range 4: 700-800
temp = bufferwriteFiles.get(3);
assertEquals(700, temp.startTime);
assertEquals(800, temp.endTime);
List<Object> overflowData = queryResult.getAllOverflowData();
assertEquals(true, overflowData.get(0) != null);
assertEquals(true, overflowData.get(1) != null);
assertEquals(true, overflowData.get(2) != null);
assertEquals(true, overflowData.get(3) != null);
// wait to merge over
waitToSleep(1000);
token = fManager.beginQuery(deltaObjectId);
queryResult = fManager.query(deltaObjectId, measurementId, null, null, null);
fManager.endQuery(deltaObjectId, token);
bufferwriteindex = queryResult.getBufferwriteDataInMemory();
assertEquals(null, bufferwriteindex);
bufferwriteindisk = queryResult.getBufferwriteDataInDisk();
assertEquals(null, bufferwriteindisk);
bufferwriteFiles = queryResult.getBufferwriteDataInFiles();
assertEquals(pairList.size(), bufferwriteFiles.size());
temp = bufferwriteFiles.get(0);
// range 1: 2-208
assertEquals(2, temp.startTime);
assertEquals(208, temp.endTime);
// range 2: 202-400
temp = bufferwriteFiles.get(1);
assertEquals(300, temp.startTime);
assertEquals(400, temp.endTime);
// range 3: 500-600
temp = bufferwriteFiles.get(2);
assertEquals(500, temp.startTime);
assertEquals(600, temp.endTime);
// range 4: 700-800
temp = bufferwriteFiles.get(3);
assertEquals(700, temp.startTime);
assertEquals(800, temp.endTime);
overflowData = queryResult.getAllOverflowData();
assertEquals(null, overflowData.get(0));
assertEquals(null, overflowData.get(1));
assertEquals(null, overflowData.get(2));
assertEquals(null, overflowData.get(3));
fManager.closeAll();
} catch (FileNodeManagerException e) {
e.printStackTrace();
fail(e.getMessage());
}
}
}
private void waitToSleep() {
private void waitToSleep(long waitTime) {
try {
Thread.sleep(10);
Thread.sleep(waitTime);
} catch (InterruptedException e) {
e.printStackTrace();
fail(e.getMessage());
}
}
private void createBufferwriteFiles(List<Pair<Long, Long>> pairList) {
private void createBufferwriteFiles(List<Pair<Long, Long>> pairList, String deltaObjectId) {
for (Pair<Long, Long> timePair : pairList) {
createBufferwriteFile(timePair);
createBufferwriteFile(timePair, deltaObjectId);
}
}
private void createBufferwriteFile(Pair<Long, Long> timePair) {
private void createBufferwriteFile(Pair<Long, Long> timePair, String deltaObjectId) {
long startTime = timePair.left;
long endTime = timePair.right;
@ -349,7 +398,7 @@ public class FileNodeManagerTest {
}
}
private void createOverflowInserts(long[] times) {
private void createOverflowInserts(long[] times, String deltaObjectId) {
fManager = FileNodeManager.getInstance();
for (long time : times) {
TSRecord record = new TSRecord(time, deltaObjectId);
@ -364,7 +413,7 @@ public class FileNodeManagerTest {
}
}
private void createOverflowUpdates(List<Pair<Long, Long>> timePairs) {
private void createOverflowUpdates(List<Pair<Long, Long>> timePairs, String deltaObjectId) {
fManager = FileNodeManager.getInstance();
for (Pair<Long, Long> time : timePairs) {
try {

View File

@ -20,18 +20,23 @@ import cn.edu.thu.tsfile.common.utils.Pair;
import cn.edu.thu.tsfile.file.metadata.RowGroupMetaData;
import cn.edu.thu.tsfile.file.metadata.enums.TSDataType;
import cn.edu.thu.tsfile.timeseries.read.query.DynamicOneColumnData;
import cn.edu.thu.tsfiledb.conf.TSFileDBConfig;
import cn.edu.thu.tsfiledb.conf.TSFileDBDescriptor;
import cn.edu.thu.tsfiledb.engine.bufferwrite.Action;
import cn.edu.thu.tsfiledb.engine.bufferwrite.BufferWriteProcessor;
import cn.edu.thu.tsfiledb.engine.bufferwrite.FileNodeConstants;
import cn.edu.thu.tsfiledb.engine.exception.BufferWriteProcessorException;
import cn.edu.thu.tsfiledb.engine.exception.FileNodeProcessorException;
import cn.edu.thu.tsfiledb.engine.exception.OverflowProcessorException;
import cn.edu.thu.tsfiledb.engine.lru.MetadataManagerHelper;
import cn.edu.thu.tsfiledb.engine.overflow.io.EngineTestHelper;
import cn.edu.thu.tsfiledb.engine.overflow.io.OverflowProcessor;
public class FileNodeProcessorTest {
TSFileConfig tsconfig = TSFileDescriptor.getInstance().getConfig();
private TSFileDBConfig tsdbconfig = TSFileDBDescriptor.getInstance().getConfig();
private TSFileConfig tsconfig = TSFileDescriptor.getInstance().getConfig();
private FileNodeProcessor processor = null;
@ -61,9 +66,10 @@ public class FileNodeProcessorTest {
@Before
public void setUp() throws Exception {
tsconfig.FileNodeDir = "filenode" + File.separatorChar;
tsconfig.BufferWriteDir = "bufferwrite";
tsconfig.overflowDataDir = "overflow";
tsdbconfig.FileNodeDir = "filenode" + File.separatorChar;
tsdbconfig.BufferWriteDir = "bufferwrite";
tsdbconfig.overflowDataDir = "overflow";
tsdbconfig.metadataDir = "metadata";
// set rowgroupsize
tsconfig.rowGroupSize = 2000;
tsconfig.pageCheckSizeThreshold = 3;
@ -73,23 +79,27 @@ public class FileNodeProcessorTest {
parameters = new HashMap<>();
parameters.put(FileNodeConstants.OVERFLOW_BACKUP_MANAGER_ACTION, overflowBackUpAction);
parameters.put(FileNodeConstants.OVERFLOW_FLUSH_MANAGER_ACTION, overflowFlushAction);
EngineTestHelper.delete(tsconfig.FileNodeDir);
EngineTestHelper.delete(tsconfig.BufferWriteDir);
EngineTestHelper.delete(tsconfig.overflowDataDir);
EngineTestHelper.delete(tsdbconfig.FileNodeDir);
EngineTestHelper.delete(tsdbconfig.BufferWriteDir);
EngineTestHelper.delete(tsdbconfig.overflowDataDir);
EngineTestHelper.delete(tsdbconfig.metadataDir);
MetadataManagerHelper.initMetadata();
}
@After
public void tearDown() throws Exception {
EngineTestHelper.delete(tsconfig.FileNodeDir);
EngineTestHelper.delete(tsconfig.BufferWriteDir);
EngineTestHelper.delete(tsconfig.overflowDataDir);
EngineTestHelper.delete(tsdbconfig.FileNodeDir);
EngineTestHelper.delete(tsdbconfig.BufferWriteDir);
EngineTestHelper.delete(tsdbconfig.overflowDataDir);
EngineTestHelper.delete(tsdbconfig.metadataDir);
MetadataManagerHelper.clearMetadata();
}
@Test
public void testGetAndCloseProcessor() {
try {
processor = new FileNodeProcessor(tsconfig.FileNodeDir, deltaObjectId, parameters);
processor = new FileNodeProcessor(tsdbconfig.FileNodeDir, deltaObjectId, parameters);
assertEquals(-1, processor.getLastUpdateTime());
processor.setLastUpdateTime(20);
assertEquals(20, processor.getLastUpdateTime());
@ -102,7 +112,7 @@ public class FileNodeProcessorTest {
BufferWriteProcessor bfprocessor = processor.getBufferWriteProcessor(deltaObjectId, lastUpdateTime);
String filename = bfprocessor.getFileName();
String bufferwritefilePath = tsconfig.BufferWriteDir + File.separatorChar + deltaObjectId
String bufferwritefilePath = tsdbconfig.BufferWriteDir + File.separatorChar + deltaObjectId
+ File.separatorChar + filename;
assertEquals(true, new File(bufferwritefilePath).exists());
// add intervalFileNode
@ -118,10 +128,10 @@ public class FileNodeProcessorTest {
// get overflow processor
OverflowProcessor ofprocessor = processor.getOverflowProcessor(deltaObjectId, parameters);
assertEquals(ofprocessor, processor.getOverflowProcessor());
ofprocessor.insert(measurementId, measurementId, 5, TSDataType.INT32, String.valueOf(5));
ofprocessor.insert(deltaObjectId, measurementId, 5, TSDataType.INT32, String.valueOf(5));
String overflowfile = ofprocessor.getFileName();
String overflowfilePath = tsconfig.overflowDataDir + File.separatorChar + deltaObjectId + File.separatorChar
+ overflowfile;
String overflowfilePath = tsdbconfig.overflowDataDir + File.separatorChar + deltaObjectId
+ File.separatorChar + overflowfile;
assertEquals(true, new File(overflowfilePath).exists());
String overflowfileRestorePath = overflowfilePath + ".restore";
assertEquals(false, new File(overflowfileRestorePath).exists());
@ -142,7 +152,7 @@ public class FileNodeProcessorTest {
}
}
@Test
@Deprecated
public void testMerge() {
List<Pair<Long, Long>> bufferwriteRanges = new ArrayList<>();
@ -156,7 +166,7 @@ public class FileNodeProcessorTest {
createOverflowdata();
try {
processor = new FileNodeProcessor(tsconfig.FileNodeDir, deltaObjectId, parameters);
processor = new FileNodeProcessor(tsdbconfig.FileNodeDir, deltaObjectId, parameters);
processor.writeLock();
if (processor.hasBufferwriteProcessor()) {
processor.getBufferWriteProcessor().close();
@ -165,7 +175,6 @@ public class FileNodeProcessorTest {
processor.getOverflowProcessor(deltaObjectId, parameters);
}
processor.getOverflowProcessor().close();
processor.merge();
// check the result
QueryStructure queryRestult = processor.query(deltaObjectId, measurementId, null, null, null);
@ -226,7 +235,7 @@ public class FileNodeProcessorTest {
try {
// test memory data in index
processor = new FileNodeProcessor(tsconfig.FileNodeDir, deltaObjectId, parameters);
processor = new FileNodeProcessor(tsdbconfig.FileNodeDir, deltaObjectId, parameters);
BufferWriteProcessor bfprocessor = processor.getBufferWriteProcessor(deltaObjectId, 1);
bfprocessor.setNewProcessor(false);
processor.addIntervalFileNode(1, bfprocessor.getFileName());
@ -291,7 +300,7 @@ public class FileNodeProcessorTest {
processor.close();
// test data in closed bufferwrite file
processor = new FileNodeProcessor(tsconfig.FileNodeDir, deltaObjectId, parameters);
processor = new FileNodeProcessor(tsdbconfig.FileNodeDir, deltaObjectId, parameters);
bfprocessor = processor.getBufferWriteProcessor(deltaObjectId, 401);
bfprocessor.setNewProcessor(false);
processor.addIntervalFileNode(401, bfprocessor.getFileName());
@ -332,7 +341,7 @@ public class FileNodeProcessorTest {
assertEquals(null, overflowResult.get(3));
processor.close();
processor = new FileNodeProcessor(tsconfig.FileNodeDir, deltaObjectId, parameters);
processor = new FileNodeProcessor(tsdbconfig.FileNodeDir, deltaObjectId, parameters);
bfprocessor = processor.getBufferWriteProcessor(deltaObjectId, 801);
bfprocessor.setNewProcessor(false);
processor.addIntervalFileNode(801, bfprocessor.getFileName());
@ -345,7 +354,7 @@ public class FileNodeProcessorTest {
}
processor.close();
processor = new FileNodeProcessor(tsconfig.FileNodeDir, deltaObjectId, parameters);
processor = new FileNodeProcessor(tsdbconfig.FileNodeDir, deltaObjectId, parameters);
bfprocessor = processor.getBufferWriteProcessor(deltaObjectId, 820);
bfprocessor.setNewProcessor(false);
processor.addIntervalFileNode(820, bfprocessor.getFileName());
@ -359,7 +368,7 @@ public class FileNodeProcessorTest {
processor.close();
// mkdir: test delete unused file in construct the filenode
// processor
// String tempFilePath = tsconfig.BufferWriteDir +
// String tempFilePath = tsdbconfig.BufferWriteDir +
// File.separatorChar + deltaObjectId + File.separatorChar
// + "temp";
// File tempFile = new File(tempFilePath);
@ -367,7 +376,7 @@ public class FileNodeProcessorTest {
// assertEquals(true, tempFile.exists());
// file range: 1-400 401-800 801-819 820-839
processor = new FileNodeProcessor(tsconfig.FileNodeDir, deltaObjectId, parameters);
processor = new FileNodeProcessor(tsdbconfig.FileNodeDir, deltaObjectId, parameters);
// assertEquals(false, tempFile.exists());
// overflow data
@ -422,7 +431,7 @@ public class FileNodeProcessorTest {
public void testRecoveryBufferFile() {
try {
processor = new FileNodeProcessor(tsconfig.FileNodeDir, deltaObjectId, parameters);
processor = new FileNodeProcessor(tsdbconfig.FileNodeDir, deltaObjectId, parameters);
BufferWriteProcessor bfprocessor = processor.getBufferWriteProcessor(deltaObjectId, 1);
bfprocessor.setNewProcessor(false);
processor.addIntervalFileNode(1, bfprocessor.getFileName());
@ -454,7 +463,7 @@ public class FileNodeProcessorTest {
assertEquals(null, overflowResult.get(2));
assertEquals(null, overflowResult.get(3));
// not close and restore the bufferwrite file
processor = new FileNodeProcessor(tsconfig.FileNodeDir, deltaObjectId, parameters);
processor = new FileNodeProcessor(tsdbconfig.FileNodeDir, deltaObjectId, parameters);
assertEquals(true, processor.hasBufferwriteProcessor());
assertEquals(true, processor.hasOverflowProcessor());
queryResult = processor.query(deltaObjectId, measurementId, null, null, null);
@ -506,7 +515,7 @@ public class FileNodeProcessorTest {
fileNodeProcessorStore = new FileNodeProcessorStore(500, emptyIntervalFileNode, newFilenodes,
fileNodeProcessorState, 0);
String filenodedirPath = tsconfig.FileNodeDir + deltaObjectId + File.separatorChar;
String filenodedirPath = tsdbconfig.FileNodeDir + deltaObjectId + File.separatorChar;
File file = new File(filenodedirPath);
if (!file.exists()) {
file.mkdirs();
@ -524,7 +533,7 @@ public class FileNodeProcessorTest {
// test recovery from waiting
try {
processor = new FileNodeProcessor(tsconfig.FileNodeDir, deltaObjectId, parameters);
processor = new FileNodeProcessor(tsdbconfig.FileNodeDir, deltaObjectId, parameters);
assertEquals(fileNodeProcessorStore.getLastUpdateTime(), processor.getLastUpdateTime());
processor.close();
FileNodeProcessorStore store = serializeUtil.deserialize(filenodestorePath).orElse(null);
@ -537,8 +546,10 @@ public class FileNodeProcessorTest {
// check file
for (IntervalFileNode node : store.getNewFileNodes()) {
checkFile(node.filePath);
EngineTestHelper.delete(node.filePath);
}
checkUnFile(unusedFilename);
EngineTestHelper.delete(unusedFilename);
} catch (FileNodeProcessorException e) {
e.printStackTrace();
fail(e.getMessage());
@ -572,7 +583,7 @@ public class FileNodeProcessorTest {
fileNodeProcessorStore = new FileNodeProcessorStore(500, emptyIntervalFileNode, newFilenodes,
fileNodeProcessorState, 0);
String filenodedirPath = tsconfig.FileNodeDir + deltaObjectId + File.separatorChar;
String filenodedirPath = tsdbconfig.FileNodeDir + deltaObjectId + File.separatorChar;
File file = new File(filenodedirPath);
if (!file.exists()) {
file.mkdirs();
@ -590,7 +601,7 @@ public class FileNodeProcessorTest {
// test recovery from waiting
try {
processor = new FileNodeProcessor(tsconfig.FileNodeDir, deltaObjectId, parameters);
processor = new FileNodeProcessor(tsdbconfig.FileNodeDir, deltaObjectId, parameters);
assertEquals(fileNodeProcessorStore.getLastUpdateTime(), processor.getLastUpdateTime());
} catch (FileNodeProcessorException e) {
e.printStackTrace();
@ -598,7 +609,7 @@ public class FileNodeProcessorTest {
}
}
@Test
@Deprecated
public void testRevoceryMerge2() {
// create bufferwrite files
List<Pair<Long, Long>> bufferwriteRanges = new ArrayList<>();
@ -613,7 +624,7 @@ public class FileNodeProcessorTest {
checkFile(unusedFilename);
try {
// check the bufferwrite files
processor = new FileNodeProcessor(tsconfig.FileNodeDir, deltaObjectId, parameters);
processor = new FileNodeProcessor(tsdbconfig.FileNodeDir, deltaObjectId, parameters);
processor.getOverflowProcessor(deltaObjectId, parameters);
int token = processor.addMultiPassLock();
QueryStructure queryResult = processor.query(deltaObjectId, measurementId, null, null, null);
@ -630,8 +641,6 @@ public class FileNodeProcessorTest {
IntervalFileNode node = (IntervalFileNode) newInterFiles.get(i);
assertEquals(bufferwriteRanges.get(i).left.longValue(), node.startTime);
assertEquals(bufferwriteRanges.get(i).right.longValue(), node.endTime);
// check one file
checkFile(node.filePath);
}
processor.close();
} catch (FileNodeProcessorException e) {
@ -645,7 +654,7 @@ public class FileNodeProcessorTest {
List<IntervalFileNode> restoreNewFiles = new ArrayList<>();
long lastUpdateTime = -1;
try {
processor = new FileNodeProcessor(tsconfig.FileNodeDir, deltaObjectId, parameters);
processor = new FileNodeProcessor(tsdbconfig.FileNodeDir, deltaObjectId, parameters);
lastUpdateTime = processor.getLastUpdateTime();
processor.getOverflowProcessor(deltaObjectId, parameters);
int token = processor.addMultiPassLock();
@ -706,7 +715,7 @@ public class FileNodeProcessorTest {
fileNodeProcessorStore = new FileNodeProcessorStore(lastUpdateTime, emptyIntervalFileNode, restoreNewFiles,
fileNodeProcessorState, 0);
String filenodedirPath = tsconfig.FileNodeDir + deltaObjectId + File.separatorChar;
String filenodedirPath = tsdbconfig.FileNodeDir + deltaObjectId + File.separatorChar;
File file = new File(filenodedirPath);
if (!file.exists()) {
file.mkdirs();
@ -724,7 +733,7 @@ public class FileNodeProcessorTest {
// restore from merge
try {
processor = new FileNodeProcessor(tsconfig.FileNodeDir, deltaObjectId, parameters);
processor = new FileNodeProcessor(tsdbconfig.FileNodeDir, deltaObjectId, parameters);
// it will merge automatically
QueryStructure queryRestult = processor.query(deltaObjectId, measurementId, null, null, null);
DynamicOneColumnData bufferwriteinindex = queryRestult.getBufferwriteDataInMemory();
@ -777,7 +786,7 @@ public class FileNodeProcessorTest {
public void testMergeFromEmptyIntervalFile() {
// test merege from Empty Interval File
try {
processor = new FileNodeProcessor(tsconfig.FileNodeDir, deltaObjectId, parameters);
processor = new FileNodeProcessor(tsdbconfig.FileNodeDir, deltaObjectId, parameters);
// set lastupdate time
processor.setLastUpdateTime(100);
processor.close();
@ -786,7 +795,7 @@ public class FileNodeProcessorTest {
// insert:2,22,62; update:50-70
createOverflowdata();
processor = new FileNodeProcessor(tsconfig.FileNodeDir, deltaObjectId, parameters);
processor = new FileNodeProcessor(tsdbconfig.FileNodeDir, deltaObjectId, parameters);
processor.getOverflowProcessor(deltaObjectId, parameters);
// check overflow data
QueryStructure queryResult = processor.query(deltaObjectId, measurementId, null, null, null);
@ -802,7 +811,7 @@ public class FileNodeProcessorTest {
assertEquals(222, insertData.getInt(1));
assertEquals(62, insertData.getTime(2));
assertEquals(333, insertData.getInt(2));
DynamicOneColumnData updateData = (DynamicOneColumnData) overflowResult.get(1);
assertEquals(2, updateData.length);
assertEquals(4, updateData.timeLength);
@ -823,12 +832,7 @@ public class FileNodeProcessorTest {
private void createFile(String filename) {
String filePath = tsconfig.BufferWriteDir + File.separatorChar + deltaObjectId;
File dataDir = new File(filePath);
if (!dataDir.exists()) {
dataDir.mkdirs();
}
File file = new File(dataDir, filename);
File file = new File(filename);
if (!file.exists()) {
file.mkdir();
}
@ -836,18 +840,13 @@ public class FileNodeProcessorTest {
private void checkFile(String filename) {
String filePath = tsconfig.BufferWriteDir + File.separatorChar + deltaObjectId;
File dataDir = new File(filePath);
if (!dataDir.exists()) {
dataDir.mkdirs();
}
File file = new File(dataDir, filename);
File file = new File(filename);
assertEquals(true, file.exists());
}
private void checkUnFile(String filename) {
String filePath = tsconfig.BufferWriteDir + File.separatorChar + deltaObjectId;
String filePath = tsdbconfig.BufferWriteDir + File.separatorChar + deltaObjectId;
File dataDir = new File(filePath);
if (!dataDir.exists()) {
dataDir.mkdirs();
@ -876,7 +875,7 @@ public class FileNodeProcessorTest {
*/
private void createBufferwriteFile(long begin, long end) {
try {
processor = new FileNodeProcessor(tsconfig.FileNodeDir, deltaObjectId, parameters);
processor = new FileNodeProcessor(tsdbconfig.FileNodeDir, deltaObjectId, parameters);
BufferWriteProcessor bfProcessor = processor.getBufferWriteProcessor(deltaObjectId, begin);
assertEquals(true, bfProcessor.isNewProcessor());
bfProcessor.write(measurementId, measurementId, begin, TSDataType.INT32, String.valueOf(begin));
@ -915,7 +914,7 @@ public class FileNodeProcessorTest {
private void createOverflowInsert(long time, int value) {
try {
processor = new FileNodeProcessor(tsconfig.FileNodeDir, deltaObjectId, parameters);
processor = new FileNodeProcessor(tsdbconfig.FileNodeDir, deltaObjectId, parameters);
OverflowProcessor ofProcessor = processor.getOverflowProcessor(deltaObjectId, parameters);
ofProcessor.insert(deltaObjectId, measurementId, time, TSDataType.INT32, String.valueOf(value));
processor.changeTypeToChanged(time);
@ -935,7 +934,7 @@ public class FileNodeProcessorTest {
private void createOverflowUpdate(long begin, long end, int value) {
try {
processor = new FileNodeProcessor(tsconfig.FileNodeDir, deltaObjectId, parameters);
processor = new FileNodeProcessor(tsdbconfig.FileNodeDir, deltaObjectId, parameters);
OverflowProcessor ofProcessor = processor.getOverflowProcessor(deltaObjectId, parameters);
ofProcessor.update(deltaObjectId, measurementId, begin, end, TSDataType.INT32, String.valueOf(value));
processor.changeTypeToChanged(begin, end);

View File

@ -104,22 +104,28 @@ public class LRUManagerTest {
processor.writeUnlock();
// multiple thread write test
Thread thread = new Thread(new GetWriterProcessor());
Thread thread = new Thread(new GetWriterProcessor(deltaObjectId));
thread.start();
Thread.sleep(100);
// the other thread get the write lock for the processor of
// deltaObjectId1
processor = manager.getProcessorByLRU(deltaObjectId, true);
assertEquals(null, processor);
// the max of the manager is 1, and the processor of deltaObjectId2
// can't construct
processor = manager.getProcessorByLRU(deltaObjectId2, true);
assertEquals(null, processor);
// the processor of deltaObjectId1 is used, the manager closed completly
assertEquals(false, manager.close());
Thread.sleep(1000);
processor = manager.getProcessorByLRU(deltaObjectId, true);
assertEquals(false, processor == null);
processor.writeUnlock();
assertEquals(true, manager.close());
// multiple thread read test
Thread thread2 = new Thread(new GetReaderProcessor());
Thread thread2 = new Thread(new GetReaderProcessor(deltaObjectId));
thread2.start();
Thread.sleep(100);
processor = manager.getProcessorByLRU(deltaObjectId, false);
@ -161,7 +167,7 @@ public class LRUManagerTest {
fail(e.getMessage());
}
}
try {
manager.close();
} catch (LRUManagerException e) {
@ -170,13 +176,22 @@ public class LRUManagerTest {
}
}
/**
* Get the write
*/
class GetWriterProcessor implements Runnable {
private String deltaObjectId;
public GetWriterProcessor(String deltaObjectId) {
this.deltaObjectId = deltaObjectId;
}
@Override
public void run() {
LRUProcessor lruProcessor = null;
try {
lruProcessor = manager.getProcessorByLRU("root.vehicle.d0", true);
lruProcessor = manager.getProcessorByLRU(deltaObjectId, true);
} catch (LRUManagerException e) {
e.printStackTrace();
}
@ -193,12 +208,18 @@ public class LRUManagerTest {
class GetReaderProcessor implements Runnable {
private String deltaObjectId;
public GetReaderProcessor(String deltaObjectId) {
this.deltaObjectId = deltaObjectId;
}
@Override
public void run() {
LRUProcessor lruProcessor = null;
try {
lruProcessor = manager.getProcessorByLRU("root.vehicle.d0", false);
lruProcessor = manager.getProcessorByLRU(deltaObjectId, false);
} catch (LRUManagerException e) {
e.printStackTrace();
}

View File

@ -0,0 +1,164 @@
package cn.edu.thu.tsfiledb.engine.overflow.io;
import static org.junit.Assert.*;
import java.io.File;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import cn.edu.thu.tsfile.common.conf.TSFileConfig;
import cn.edu.thu.tsfile.common.conf.TSFileDescriptor;
import cn.edu.thu.tsfile.file.metadata.enums.TSDataType;
import cn.edu.thu.tsfile.format.DataType;
import cn.edu.thu.tsfile.timeseries.filter.definition.FilterExpression;
import cn.edu.thu.tsfile.timeseries.filter.definition.SingleSeriesFilterExpression;
import cn.edu.thu.tsfile.timeseries.read.query.DynamicOneColumnData;
import cn.edu.thu.tsfiledb.conf.TSFileDBConfig;
import cn.edu.thu.tsfiledb.conf.TSFileDBDescriptor;
import cn.edu.thu.tsfiledb.engine.bufferwrite.Action;
import cn.edu.thu.tsfiledb.engine.bufferwrite.FileNodeConstants;
import cn.edu.thu.tsfiledb.engine.exception.OverflowProcessorException;
import cn.edu.thu.tsfiledb.engine.filenode.FilterUtilsForOverflow;
public class BigDataForOverflowTest {
private String nameSpacePath = "root.vehicle.d0";
private String overflowfilePath = null;
private String overflowrestorefilePath = null;
private String overflowmergefilePath = null;
private Map<String, Object> parameters = null;
private OverflowProcessor ofprocessor = null;
private TSFileDBConfig tsdbconfig = TSFileDBDescriptor.getInstance().getConfig();
private TSFileConfig tsconfig = TSFileDescriptor.getInstance().getConfig();
private String deltaObjectId = "root.vehicle.d0";
private String[] measurementIds = { "s0", "s1", "s2", "s3", "s4", "s5" };
private String measurementId = "s0";
private TSDataType[] dataTypes = { TSDataType.INT64, TSDataType.INT32, TSDataType.FLOAT, TSDataType.DOUBLE,
TSDataType.BOOLEAN, TSDataType.BYTE_ARRAY };
private Action overflowflushaction = new Action() {
@Override
public void act() throws Exception {
System.out.println("overflow flush action");
}
};
private Action filenodeflushaction = new Action() {
@Override
public void act() throws Exception {
System.out.println("filenode flush action");
}
};
private Action filenodemanagerbackupaction = new Action() {
@Override
public void act() throws Exception {
System.out.println("filenode manager backup action");
}
};
private Action filenodemanagerflushaction = new Action() {
@Override
public void act() throws Exception {
System.out.println("filenode manager flush action");
}
};
@Before
public void setUp() throws Exception {
EngineTestHelper.delete(nameSpacePath);
parameters = new HashMap<String, Object>();
parameters.put(FileNodeConstants.OVERFLOW_FLUSH_ACTION, overflowflushaction);
parameters.put(FileNodeConstants.FILENODE_PROCESSOR_FLUSH_ACTION, filenodeflushaction);
parameters.put(FileNodeConstants.OVERFLOW_BACKUP_MANAGER_ACTION, filenodemanagerbackupaction);
parameters.put(FileNodeConstants.OVERFLOW_FLUSH_MANAGER_ACTION, filenodemanagerflushaction);
tsdbconfig.overflowDataDir = "";
tsconfig.rowGroupSize = 1024 * 1024 * 10;
overflowfilePath = tsdbconfig.overflowDataDir + nameSpacePath + File.separatorChar + nameSpacePath
+ ".overflow";
overflowrestorefilePath = overflowfilePath + ".restore";
overflowmergefilePath = overflowfilePath + ".merge";
}
@After
public void tearDown() throws Exception {
EngineTestHelper.delete(nameSpacePath);
}
@Test
public void testBigData() {
long step = 10000;
long pass = step * 10;
long length = step * 1000;
try {
ofprocessor = new OverflowProcessor(nameSpacePath, parameters);
} catch (OverflowProcessorException e) {
e.printStackTrace();
fail(e.getMessage());
}
for (long i = 1; i <= length; i++) {
if (i > 0 && i % pass == 0) {
System.out.println(i / pass + " pass");
}
try {
ofprocessor.insert(deltaObjectId, measurementId, i, TSDataType.INT64, String.valueOf(i));
} catch (OverflowProcessorException e) {
e.printStackTrace();
fail(e.getMessage());
}
}
// construct the time filter for the query
SingleSeriesFilterExpression timeFilter = FilterUtilsForOverflow.construct(null, null, "0",
"(>=" + 0 + ")&" + "(<=" + length + ")");
List<Object> queryResult = ofprocessor.query(deltaObjectId, measurementId, timeFilter, null, null);
DynamicOneColumnData insertData = (DynamicOneColumnData) queryResult.get(0);
assertEquals(length, insertData.length);
try {
ofprocessor.close();
} catch (OverflowProcessorException e1) {
e1.printStackTrace();
fail(e1.getMessage());
}
try {
assertEquals(true, new File(overflowrestorefilePath).exists());
ofprocessor.switchWorkingToMerge();
assertEquals(true, new File(overflowmergefilePath).exists());
assertEquals(true, new File(overflowfilePath).exists());
} catch (OverflowProcessorException e1) {
e1.printStackTrace();
fail(e1.getMessage());
}
try {
ofprocessor.insert(deltaObjectId, measurementId, length+1,TSDataType.INT64,String.valueOf(length)+1);
} catch (OverflowProcessorException e1) {
e1.printStackTrace();
fail(e1.getMessage());
}
queryResult = ofprocessor.query(deltaObjectId, measurementId, timeFilter, null, null);
insertData = (DynamicOneColumnData) queryResult.get(0);
assertEquals(length, insertData.length);
try {
ofprocessor.switchMergeToWorking();
ofprocessor.close();
} catch (OverflowProcessorException e) {
e.printStackTrace();
fail(e.getMessage());
}
}
}

View File

@ -17,6 +17,8 @@ import cn.edu.thu.tsfile.common.conf.TSFileDescriptor;
import cn.edu.thu.tsfile.common.utils.RandomAccessOutputStream;
import cn.edu.thu.tsfile.file.metadata.enums.TSDataType;
import cn.edu.thu.tsfile.timeseries.read.query.DynamicOneColumnData;
import cn.edu.thu.tsfiledb.conf.TSFileDBConfig;
import cn.edu.thu.tsfiledb.conf.TSFileDBDescriptor;
import cn.edu.thu.tsfiledb.engine.bufferwrite.Action;
import cn.edu.thu.tsfiledb.engine.bufferwrite.FileNodeConstants;
import cn.edu.thu.tsfiledb.engine.exception.OverflowProcessorException;
@ -30,7 +32,8 @@ public class OverflowProcessorTest {
private String overflowmergefilePath = null;
private Map<String, Object> parameters = null;
private OverflowProcessor ofprocessor = null;
private TSFileConfig config = TSFileDescriptor.getInstance().getConfig();
private TSFileDBConfig tsdbconfig = TSFileDBDescriptor.getInstance().getConfig();
private TSFileConfig tsconfig = TSFileDescriptor.getInstance().getConfig();
private String deltaObjectId = "root.vehicle.d0";
private String[] measurementIds = { "s0", "s1", "s2", "s3", "s4", "s5" };
private TSDataType[] dataTypes = { TSDataType.INT32, TSDataType.INT64, TSDataType.FLOAT, TSDataType.DOUBLE,
@ -78,8 +81,9 @@ public class OverflowProcessorTest {
parameters.put(FileNodeConstants.OVERFLOW_FLUSH_MANAGER_ACTION, filenodemanagerflushaction);
// set overflow data dir is ""
config.overflowDataDir = "";
overflowfilePath = config.overflowDataDir + nameSpacePath + File.separatorChar + nameSpacePath + ".overflow";
tsdbconfig.overflowDataDir = "";
overflowfilePath = tsdbconfig.overflowDataDir + nameSpacePath + File.separatorChar + nameSpacePath
+ ".overflow";
overflowrestorefilePath = overflowfilePath + ".restore";
overflowmergefilePath = overflowfilePath + ".merge";
}
@ -228,8 +232,8 @@ public class OverflowProcessorTest {
@Test
public void testFlush() {
// set the tsfile config
config.rowGroupSize = 500;
// set the tsfile tsdbconfig
tsconfig.rowGroupSize = 500;
try {
ofprocessor = new OverflowProcessor(nameSpacePath, parameters);
for (int i = 1; i < 1001; i++) {
@ -257,7 +261,7 @@ public class OverflowProcessorTest {
@Test
public void testMerge() {
// insert data
config.rowGroupSize = 500;
tsconfig.rowGroupSize = 500;
try {
ofprocessor = new OverflowProcessor(nameSpacePath, parameters);
for (int i = 1; i < 1001; i++) {
@ -309,7 +313,7 @@ public class OverflowProcessorTest {
}
@Test
public void testRestoreFrommerge() {
public void testRestoreFromMerge() {
// write some rowgroup and close file
// change the file name to merge and delete the restore file
@ -327,11 +331,11 @@ public class OverflowProcessorTest {
public void testMergeQuery() {
// write oveflow data and close
// work to merge
// optional: write data in new file
// query data and check data
fail("merge and query");

View File

@ -3,6 +3,7 @@ package cn.edu.thu.tsfiledb.engine.overflow.io;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import java.io.File;
import java.io.IOException;
import java.util.List;
@ -11,39 +12,172 @@ import org.junit.Before;
import org.junit.Test;
import cn.edu.thu.tsfile.common.utils.BytesUtils;
import cn.edu.thu.tsfile.common.utils.RandomAccessOutputStream;
import cn.edu.thu.tsfile.compress.Compressor;
import cn.edu.thu.tsfile.file.metadata.TimeSeriesChunkMetaData;
import cn.edu.thu.tsfile.file.metadata.enums.CompressionTypeName;
import cn.edu.thu.tsfile.file.metadata.enums.TSDataType;
import cn.edu.thu.tsfile.timeseries.read.query.DynamicOneColumnData;
import cn.edu.thu.tsfiledb.engine.overflow.io.OverflowFileIO;
import cn.edu.thu.tsfiledb.engine.overflow.io.OverflowReadWriter;
import cn.edu.thu.tsfiledb.engine.overflow.io.OverflowSeriesImpl;
public class OverflowSeriesImplTest {
private String filePath = "overflowseriesimpltest";
private RandomAccessOutputStream raf = null;
private String mergeFilePath = filePath + ".merge";
private OverflowReadWriter ofrw = null;
private OverflowFileIO ofio = null;
private OverflowSeriesImpl seriesimpl = null;
private OverflowSeriesImpl mergeseriesimpl = null;
private String measurementId = "s0";
@Before
public void setUp() throws Exception {
EngineTestHelper.delete(mergeFilePath);
EngineTestHelper.delete(filePath);
}
@After
public void tearDown() throws Exception {
EngineTestHelper.delete(filePath);
EngineTestHelper.delete(mergeFilePath);
}
@Test
public void testMergeAndQuery() {
try {
ofrw = new OverflowReadWriter(filePath);
} catch (IOException e) {
e.printStackTrace();
fail("Construct the overflow read write failed");
}
try {
ofio = new OverflowFileIO(ofrw, filePath, 0);
} catch (IOException e) {
e.printStackTrace();
fail("Construct the overflowfile io failed");
}
seriesimpl = new OverflowSeriesImpl(measurementId, TSDataType.INT32, ofio,
Compressor.getCompressor(CompressionTypeName.UNCOMPRESSED), null);
assertEquals(TSDataType.INT32, seriesimpl.getTSDataType());
// insert data and flush
for (int i = 1; i < 11; i++) {
seriesimpl.insert(i, BytesUtils.intToBytes(i));
}
seriesimpl.switchWorkingToFlushing();
try {
seriesimpl.flushToFileWriter(ofio);
} catch (IOException e) {
e.printStackTrace();
fail("Flush overflowfileio failed");
}
// insert data and flush
for (int i = 11; i < 21; i++) {
seriesimpl.insert(i, BytesUtils.intToBytes(i));
}
seriesimpl.switchWorkingToFlushing();
try {
seriesimpl.flushToFileWriter(ofio);
} catch (IOException e) {
e.printStackTrace();
fail("Flush overflowfileio failed");
}
assertEquals(2, seriesimpl.getOFSeriesListMetadata().getMetaDatas().size());
List<TimeSeriesChunkMetaData> metaForRead = seriesimpl.getOFSeriesListMetadata().getMetaDatas();
// close file
try {
ofio.close();
} catch (IOException e) {
e.printStackTrace();
fail(e.getMessage());
}
// change file name
File overflowFile = new File(filePath);
File overflowMergeFile = new File(mergeFilePath);
overflowFile.renameTo(overflowMergeFile);
long lastupdatepostion = overflowMergeFile.length();
// construct merge serriesimpl
try {
ofrw = new OverflowReadWriter(mergeFilePath);
} catch (IOException e) {
e.printStackTrace();
fail("Construct the overflow read write failed");
}
try {
ofio = new OverflowFileIO(ofrw, mergeFilePath, lastupdatepostion);
} catch (IOException e) {
e.printStackTrace();
fail("Construct the overflowfile io failed");
}
mergeseriesimpl = new OverflowSeriesImpl(measurementId, TSDataType.INT32, ofio,
Compressor.getCompressor(CompressionTypeName.UNCOMPRESSED), metaForRead);
// construct new seriesimpl
try {
ofrw = new OverflowReadWriter(filePath);
} catch (IOException e) {
e.printStackTrace();
fail("Construct the overflow read write failed");
}
try {
ofio = new OverflowFileIO(ofrw, filePath, 0);
} catch (IOException e) {
e.printStackTrace();
fail("Construct the overflowfile io failed");
}
seriesimpl = new OverflowSeriesImpl(measurementId, TSDataType.INT32, ofio,
Compressor.getCompressor(CompressionTypeName.UNCOMPRESSED), null);
assertEquals(TSDataType.INT32, seriesimpl.getTSDataType());
// insert data
for (int i = 21; i < 31; i++) {
seriesimpl.insert(i, BytesUtils.intToBytes(i));
}
seriesimpl.switchWorkingToFlushing();
try {
seriesimpl.flushToFileWriter(ofio);
} catch (IOException e) {
e.printStackTrace();
fail("Flush overflowfileio failed");
}
for (int i = 31; i < 41; i++) {
seriesimpl.insert(i, BytesUtils.intToBytes(i));
}
// no merge query
List<Object> queryResult = seriesimpl.query(null, null, null);
DynamicOneColumnData insertData = (DynamicOneColumnData) queryResult.get(0);
assertEquals(20, insertData.length);
for (int i = 0; i < 20; i++) {
assertEquals(i + 21, insertData.getTime(i));
assertEquals(i + 21, insertData.getInt(i));
}
// merge and query
seriesimpl.switchWorkingToMerging();
seriesimpl.setMergingSeriesImpl(mergeseriesimpl);
queryResult = seriesimpl.query(null, null, null);
insertData = (DynamicOneColumnData) queryResult.get(0);
assertEquals(40, insertData.length);
for (int i = 0; i < 40; i++) {
assertEquals(i + 1, insertData.getTime(i));
assertEquals(i + 1, insertData.getInt(i));
}
try {
ofio.close();
} catch (IOException e) {
e.printStackTrace();
fail(e.getMessage());
}
}
@Test
public void test() {
public void testInsertUpdateDeleteFlushAndQuery() {
try {
ofrw = new OverflowReadWriter(filePath);
} catch (IOException e) {
@ -129,11 +263,11 @@ public class OverflowSeriesImplTest {
}
assertEquals(4, seriesimpl.getOFSeriesListMetadata().getMetaDatas().size());
result = seriesimpl.query(null, null, null);
insertresult = (DynamicOneColumnData) result.get(0);
updateresult = (DynamicOneColumnData) result.get(1);
assertEquals(0, updateresult.length);
assertEquals(0, insertresult.length);
result = seriesimpl.query(null, null, null);
insertresult = (DynamicOneColumnData) result.get(0);
updateresult = (DynamicOneColumnData) result.get(1);
assertEquals(0, updateresult.length);
assertEquals(0, insertresult.length);
// test flush empty data
seriesimpl.switchWorkingToFlushing();
@ -145,30 +279,201 @@ public class OverflowSeriesImplTest {
}
assertEquals(4, seriesimpl.getOFSeriesListMetadata().getMetaDatas().size());
}
@Test
public void testLong(){
private OverflowSeriesImpl createSeriesImpl(TSDataType dataType) {
try {
ofrw = new OverflowReadWriter(filePath);
} catch (IOException e) {
e.printStackTrace();
fail("Construct the overflow read write failed");
}
try {
ofio = new OverflowFileIO(ofrw, filePath, 0);
} catch (IOException e) {
e.printStackTrace();
fail("Construct the overflowfile io failed");
}
return seriesimpl = new OverflowSeriesImpl(measurementId, dataType, ofio,
Compressor.getCompressor(CompressionTypeName.UNCOMPRESSED), null);
}
@Test
public void testFloat(){
public void testLong() {
seriesimpl = createSeriesImpl(TSDataType.INT64);
// flush data
for (long i = 1; i < 11; i++) {
seriesimpl.insert(i, BytesUtils.longToBytes(i));
}
assertEquals(true, seriesimpl.isEmptyForWrite());
// flush data
seriesimpl.switchWorkingToFlushing();
try {
seriesimpl.flushToFileWriter(ofio);
} catch (IOException e) {
e.printStackTrace();
fail("Flush overflowfileio failed");
}
assertEquals(false, seriesimpl.isEmptyForWrite());
assertEquals(1, seriesimpl.getOFSeriesListMetadata().getMetaDatas().size());
// query
List<Object> result = seriesimpl.query(null, null, null);
DynamicOneColumnData insertresult = (DynamicOneColumnData) result.get(0);
assertEquals(false, insertresult == null);
assertEquals(10, insertresult.length);
for (int i = 1; i < 11; i++) {
assertEquals(i, insertresult.getTime(i - 1));
assertEquals(i, insertresult.getLong(i - 1));
}
}
@Test
public void testDouble(){
public void testFloat() {
seriesimpl = createSeriesImpl(TSDataType.FLOAT);
for (long i = 1; i < 11; i++) {
seriesimpl.insert(i, BytesUtils.floatToBytes((float) (1.1 + i)));
}
assertEquals(true, seriesimpl.isEmptyForWrite());
// flush data
seriesimpl.switchWorkingToFlushing();
try {
seriesimpl.flushToFileWriter(ofio);
} catch (IOException e) {
e.printStackTrace();
fail("Flush overflowfileio failed");
}
assertEquals(false, seriesimpl.isEmptyForWrite());
assertEquals(1, seriesimpl.getOFSeriesListMetadata().getMetaDatas().size());
// query
List<Object> result = seriesimpl.query(null, null, null);
DynamicOneColumnData insertresult = (DynamicOneColumnData) result.get(0);
assertEquals(false, insertresult == null);
assertEquals(10, insertresult.length);
for (int i = 1; i < 11; i++) {
assertEquals(i, insertresult.getTime(i - 1));
assertEquals(String.valueOf(i + 1.1), String.valueOf(insertresult.getFloat(i - 1)));
}
try {
ofio.close();
} catch (IOException e) {
e.printStackTrace();
fail(e.getMessage());
}
}
@Test
public void testBoolean(){
public void testDouble() {
seriesimpl = createSeriesImpl(TSDataType.DOUBLE);
for (long i = 1; i < 11; i++) {
seriesimpl.insert(i, BytesUtils.doubleToBytes((double) (1.1 + i)));
}
assertEquals(true, seriesimpl.isEmptyForWrite());
// flush data
seriesimpl.switchWorkingToFlushing();
try {
seriesimpl.flushToFileWriter(ofio);
} catch (IOException e) {
e.printStackTrace();
fail("Flush overflowfileio failed");
}
assertEquals(false, seriesimpl.isEmptyForWrite());
assertEquals(1, seriesimpl.getOFSeriesListMetadata().getMetaDatas().size());
// query
List<Object> result = seriesimpl.query(null, null, null);
DynamicOneColumnData insertresult = (DynamicOneColumnData) result.get(0);
assertEquals(false, insertresult == null);
assertEquals(10, insertresult.length);
for (int i = 1; i < 11; i++) {
assertEquals(i, insertresult.getTime(i - 1));
assertEquals(String.valueOf(i + 1.1), String.valueOf(insertresult.getDouble(i - 1)));
}
try {
ofio.close();
} catch (IOException e) {
e.printStackTrace();
fail(e.getMessage());
}
}
@Test
public void testByteArray(){
public void testBoolean() {
seriesimpl = createSeriesImpl(TSDataType.BOOLEAN);
for (long i = 1; i < 11; i++) {
seriesimpl.insert(i, BytesUtils.boolToBytes(i / 2 == 0 ? true : false));
}
assertEquals(true, seriesimpl.isEmptyForWrite());
// flush data
seriesimpl.switchWorkingToFlushing();
try {
seriesimpl.flushToFileWriter(ofio);
} catch (IOException e) {
e.printStackTrace();
fail("Flush overflowfileio failed");
}
assertEquals(false, seriesimpl.isEmptyForWrite());
assertEquals(1, seriesimpl.getOFSeriesListMetadata().getMetaDatas().size());
// query
List<Object> result = seriesimpl.query(null, null, null);
DynamicOneColumnData insertresult = (DynamicOneColumnData) result.get(0);
assertEquals(false, insertresult == null);
assertEquals(10, insertresult.length);
for (int i = 1; i < 11; i++) {
assertEquals(i, insertresult.getTime(i - 1));
assertEquals(i / 2 == 0 ? true : false, insertresult.getBoolean(i - 1));
}
try {
ofio.close();
} catch (IOException e) {
e.printStackTrace();
fail(e.getMessage());
}
}
@Test
public void testByteArray() {
seriesimpl = createSeriesImpl(TSDataType.BYTE_ARRAY);
for (long i = 1; i < 11; i++) {
seriesimpl.insert(i, BytesUtils.StringToBytes(String.valueOf(i)));
}
assertEquals(true, seriesimpl.isEmptyForWrite());
// flush data
seriesimpl.switchWorkingToFlushing();
try {
seriesimpl.flushToFileWriter(ofio);
} catch (IOException e) {
e.printStackTrace();
fail("Flush overflowfileio failed");
}
assertEquals(false, seriesimpl.isEmptyForWrite());
assertEquals(1, seriesimpl.getOFSeriesListMetadata().getMetaDatas().size());
// query
List<Object> result = seriesimpl.query(null, null, null);
DynamicOneColumnData insertresult = (DynamicOneColumnData) result.get(0);
assertEquals(false, insertresult == null);
assertEquals(10, insertresult.length);
for (int i = 1; i < 11; i++) {
assertEquals(i, insertresult.getTime(i - 1));
assertEquals(String.valueOf(i), insertresult.getStringValue(i - 1));
}
try {
ofio.close();
} catch (IOException e) {
e.printStackTrace();
fail(e.getMessage());
}
}
@Test
public void testEnum() {
seriesimpl = createSeriesImpl(TSDataType.ENUMS);
fail("Not support Type");
}
}

View File

@ -4,4 +4,45 @@ if [ -z "${TSFILE_HOME}" ]; then
export TSFILE_HOME="$(cd "`dirname "$0"`"/..; pwd)"
fi
"$TSFILE_HOME"/bin/tsfiledb-deamon.sh start
TSFILE_CONF=${TSFILE_HOME}/conf
TSFILE_LOGS=${TSFILE_HOME}/logs
if [ -f "$TSFILE_CONF/tsfile-env.sh" ]; then
. "$TSFILE_CONF/tsfile-env.sh"
else
echo "can't find $TSFILE_CONF/tsfile-env.sh"
fi
if [ -n "$JAVA_HOME" ]; then
for java in "$JAVA_HOME"/bin/amd64/java "$JAVA_HOME"/bin/java; do
if [ -x "$java" ]; then
JAVA="$java"
break
fi
done
else
JAVA=java
fi
if [ -z $JAVA ] ; then
echo Unable to find java executable. Check JAVA_HOME and PATH environment variables. > /dev/stderr
exit 1;
fi
CLASSPATH=""
for f in ${TSFILE_HOME}/lib/*.jar; do
CLASSPATH=${CLASSPATH}":"$f
done
MAIN_CLASS=cn.edu.thu.tsfiledb.service.JMXManager
pid="$TSFILE_HOME"/tmp/tsfiledb.pid
echo "starting tsfiledb"
"$JAVA" -DTSFILE_HOME=${TSFILE_HOME} -Dlogback.configurationFile=${TSFILE_CONF}/logback.xml -cp "$CLASSPATH" "$MAIN_CLASS"
exit $?

View File

@ -1,7 +0,0 @@
#!/bin/sh
if [ -z "${TSFILE_HOME}" ]; then
export TSFILE_HOME="$(cd "`dirname "$0"`"/..; pwd)"
fi
"$TSFILE_HOME"/bin/tsfiledb-deamon.sh stop

View File

@ -1,89 +0,0 @@
#!/bin/sh
if [ -z "${TSFILE_HOME}" ]; then
export TSFILE_HOME="$(cd "`dirname "$0"`"/..; pwd)"
fi
TSFILE_CONF=${TSFILE_HOME}/conf
TSFILE_LOGS=${TSFILE_HOME}/logs
if [ -f "$TSFILE_CONF/tsfile-env.sh" ]; then
. "$TSFILE_CONF/tsfile-env.sh"
else
echo "can't find $TSFILE_CONF/tsfile-env.sh"
fi
if [ -n "$JAVA_HOME" ]; then
for java in "$JAVA_HOME"/bin/amd64/java "$JAVA_HOME"/bin/java; do
if [ -x "$java" ]; then
JAVA="$java"
break
fi
done
else
JAVA=java
fi
if [ -z $JAVA ] ; then
echo Unable to find java executable. Check JAVA_HOME and PATH environment variables. > /dev/stderr
exit 1;
fi
CLASSPATH=""
for f in ${TSFILE_HOME}/lib/*.jar; do
CLASSPATH=${CLASSPATH}":"$f
done
MAIN_CLASS=cn.edu.thu.tsfiledb.service.JMXManager
pid="$TSFILE_HOME"/tmp/tsfiledb.pid
case "$1" in
start)
if [ -f ${pid} ]; then
TARGET_ID="$(cat "$pid")"
PIDS=`ps -ef |grep "$TARGET_ID" |grep -v grep | awk '{print $2}'`
if [ "$PIDS" != "" ]; then
echo "delta is already running as process "$TARGET_ID". Stop it first."
else
echo "starting delta"
rm -rf ${pid}
"$JAVA" -DTSFILE_HOME=${TSFILE_HOME} -Dlogback.configurationFile=${TSFILE_CONF}/logback.xml -cp "$CLASSPATH" "$MAIN_CLASS"
echo $! > ${pid}
fi
else
echo "starting delta"
rm -rf ${pid}
"$JAVA" -DTSFILE_HOME=${TSFILE_HOME} -Dlogback.configurationFile=${TSFILE_CONF}/logback.xml -cp "$CLASSPATH" "$MAIN_CLASS"
echo $! > ${pid}
fi
;;
stop)
if [ -f ${pid} ]; then
TARGET_ID="$(cat "$pid")"
PIDS=`ps -ef |grep "$TARGET_ID" |grep -v grep | awk '{print $2}'`
if [ "$PIDS" != "" ]; then
echo "stopping delta"
kill `cat $pid`
rm -rf ${pid}
else
echo "no delta to stop"
rm -rf ${pid}
fi
else
echo "no delta to stop"
fi
;;
*)
echo "Usage: run.sh {start|stop}"
;;
esac
exit 0

326
tsfiledb/conf/rpc.thrift Normal file
View File

@ -0,0 +1,326 @@
namespace java cn.edu.thu.tsfiledb.service.rpc.thrift
service TSIService {
TSOpenSessionResp OpenSession(1:TSOpenSessionReq req);
TSCloseSessionResp CloseSession(1:TSCloseSessionReq req);
TSExecuteStatementResp ExecuteStatement(1:TSExecuteStatementReq req);
TSExecuteBatchStatementResp ExecuteBatchStatement(1:TSExecuteBatchStatementReq req);
TSExecuteStatementResp ExecuteQueryStatement(1:TSExecuteStatementReq req);
TSExecuteStatementResp ExecuteUpdateStatement(1:TSExecuteStatementReq req);
TSFetchResultsResp FetchResults(1:TSFetchResultsReq req)
TSFetchMetadataResp FetchMetadata(1:TSFetchMetadataReq req)
//TSGetOperationStatusResp GetOperationStatus(1:TSGetOperationStatusReq req);
TSCancelOperationResp CancelOperation(1:TSCancelOperationReq req);
TSCloseOperationResp CloseOperation(1:TSCloseOperationReq req);
//TSGetResultSetMetadataResp GetResultSetMetadata(1:TSGetResultSetMetadataReq req);
//TSFetchResultsResp FetchResults(1:TSFetchResultsReq req);
}
enum TSProtocolVersion {
TSFILE_SERVICE_PROTOCOL_V1,
}
// OpenSession()
//
// Open a session (connection) on the server against
// which operations may be executed.
struct TSOpenSessionReq {
1: required TSProtocolVersion client_protocol = TSProtocolVersion.TSFILE_SERVICE_PROTOCOL_V1
2: optional string username
3: optional string password
4: optional map<string, string> configuration
}
struct TSOpenSessionResp {
1: required TS_Status status
// The protocol version that the server is using.
2: required TSProtocolVersion serverProtocolVersion = TSProtocolVersion.TSFILE_SERVICE_PROTOCOL_V1
// Session Handle
3: optional TS_SessionHandle sessionHandle
// The configuration settings for this session.
4: optional map<string, string> configuration
}
// The return status code contained in each response.
enum TS_StatusCode {
SUCCESS_STATUS,
SUCCESS_WITH_INFO_STATUS,
STILL_EXECUTING_STATUS,
ERROR_STATUS,
INVALID_HANDLE_STATUS
}
// The return status of a remote request
struct TS_Status {
1: required TS_StatusCode statusCode
// If status is SUCCESS_WITH_INFO, info_msgs may be populated with
// additional diagnostic information.
2: optional list<string> infoMessages
// If status is ERROR, then the following fields may be set
3: optional string sqlState // as defined in the ISO/IEF CLI specification
4: optional i32 errorCode // internal error code
5: optional string errorMessage
}
// CloseSession()
//
// Closes the specified session and frees any resources
// currently allocated to that session. Any open
// operations in that session will be canceled.
struct TSCloseSessionReq {
1: required TS_SessionHandle sessionHandle
}
struct TSCloseSessionResp {
1: required TS_Status status
}
struct TSHandleIdentifier {
// 16 byte globally unique identifier
// This is the public ID of the handle and
// can be used for reporting.
1: required binary guid,
// 16 byte secret generated by the server
// and used to verify that the handle is not
// being hijacked by another user.
2: required binary secret,
}
// Client-side handle to persistent
// session information on the server-side.
struct TS_SessionHandle {
1: required TSHandleIdentifier sessionId
}
struct TSGetOperationStatusReq {
// Session to run this request against
1: required TSOperationHandle operationHandle
}
struct TSGetOperationStatusResp {
1: required TS_Status status
//2: optional TSOperationState operationState
// If operationState is ERROR_STATE, then the following fields may be set
// sqlState as defined in the ISO/IEF CLI specification
//3: optional string sqlState
// Internal error code
//4: optional i32 errorCode
// Error message
//5: optional string errorMessage
// List of statuses of sub tasks
//6: optional string taskStatus
// When was the operation started
//7: optional i64 operationStarted
// When was the operation completed
//8: optional i64 operationCompleted
// If the operation has the result
//9: optional bool hasResultSet
}
// CancelOperation()
//
// Cancels processing on the specified operation handle and
// frees any resources which were allocated.
struct TSCancelOperationReq {
// Operation to cancel
1: required TSOperationHandle operationHandle
}
struct TSCancelOperationResp {
1: required TS_Status status
}
// CloseOperation()
//
// Given an operation in the FINISHED, CANCELED,
// or ERROR states, CloseOperation() will free
// all of the resources which were allocated on
// the server to service the operation.
struct TSCloseOperationReq {
1: required TSOperationHandle operationHandle
}
struct TSCloseOperationResp {
1: required TS_Status status
}
// Client-side reference to a task running
// asynchronously on the server.
struct TSOperationHandle {
1: required TSHandleIdentifier operationId
// If hasResultSet = TRUE, then this operation
// generates a result set that can be fetched.
// Note that the result set may be empty.
//
// If hasResultSet = FALSE, then this operation
// does not generate a result set, and calling
// GetResultSetMetadata or FetchResults against
// this OperationHandle will generate an error.
2: required bool hasResultSet
//3: required TSOperationType operationType
// For operations that don't generate result sets,
// modifiedRowCount is either:
//
// 1) The number of rows that were modified by
// the DML operation (e.g. number of rows inserted,
// number of rows deleted, etc).
//
// 2) 0 for operations that don't modify or add rows.
//
// 3) < 0 if the operation is capable of modifiying rows,
// but Hive is unable to determine how many rows were
// modified. For example, Hive's LOAD DATA command
// doesn't generate row count information because
// Hive doesn't inspect the data as it is loaded.
//
// modifiedRowCount is unset if the operation generates
// a result set.
//4: optional double modifiedRowCount
}
// ExecuteStatement()
//
// Execute a statement.
// The returned OperationHandle can be used to check on the
// status of the statement, and to fetch results once the
// statement has finished executing.
struct TSExecuteStatementReq {
// The session to execute the statement against
1: required TS_SessionHandle sessionHandle
// The statement to be executed (DML, DDL, SET, etc)
2: required string statement
// Configuration properties that are overlayed on top of the
// the existing session configuration before this statement
// is executed. These properties apply to this statement
// only and will not affect the subsequent state of the Session.
//3: optional map<string, string> confOverlay
// Execute asynchronously when runAsync is true
//4: optional bool runAsync = false
// The number of seconds after which the query will timeout on the server
//5: optional i64 queryTimeout = 0
}
struct TSExecuteStatementResp {
1: required TS_Status status
2: optional TSOperationHandle operationHandle
3: optional list<string> columns
}
struct TSExecuteBatchStatementResp{
1: required TS_Status status
2: optional list<i32> result
}
struct TSExecuteBatchStatementReq{
// The session to execute the statement against
1: required TS_SessionHandle sessionHandle
// The statements to be executed (DML, DDL, SET, etc)
2: required list<string> statements
}
struct TSQueryDataSet{
1: required list<string> keys
2: required list<TSDynamicOneColumnData> values
}
struct TSDynamicOneColumnData{
1: required string deviceType
2: required string dataType
3: required i32 length
4: required list<i64> timeRet
5: optional list<bool> boolList
6: optional list<i32> i32List
7: optional list<i64> i64List
8: optional list<double> floatList
9: optional list<double> doubleList
10: optional list<byte> binaryList
}
struct TSFetchResultsReq{
1: required string statement
2: required i32 fetch_size
}
struct TSFetchResultsResp{
1: required TS_Status status
2: required bool hasResultSet
3: optional TSQueryDataSet queryDataSet
}
//
// struct TSJDBCRecord {
// 1: required string deviceType
// 2: required string deviceId
// 3: required list<TSDataPoint> dataList
// 4: required TSTimeValue timeValue
// }
//
// struct TSTimeValue {
// 1: required i64 time
// }
//
// struct TSDataPoint{
// 1: required string type
// 2: required string sensorId
// 3: required string deviceId
// 4: required string valueStr
// 5: optional i32 groupId
// }
struct TSFetchMetadataResp{
1: required TS_Status status
2: optional map<string, list<TSColumnSchema>> seriesMap
3: optional map<string, list<string>> deltaObjectMap
4: optional string metadataInJson
}
struct TSFetchMetadataReq{
}
struct TSColumnSchema{
1: optional string name;
2: optional string dataType;
3: optional string encoding;
4: optional map<string, string> otherArgs;
}

View File

@ -0,0 +1,22 @@
# exmaple tsfile config file
# 128M = 128*1024*1024
rowGroupSize=134217728
# 8KB = 8*1024
pageSize=8192
timeSeriesEncoder=TS_2DIFF
# timeSeriesEncoder=PLAIN
defaultSeriesEncoder=RLE
# defaultSeriesEncoder=PLAIN
compressName=UNCOMPRESSED
defaultRleBitWidth=8
defaultEndian=LITTLE_ENDIAN
defaultDeltaBlockSize=128
defaultPLAMaxError=100
defaultSDTMaxError=100
# RleLongDefaultNull=0
# RleIntDefaultNull=0
# TS2DiffLongDefaultNull=-1
# TS2DiffIntDefaultNull=-1
# defaultDeltaBlockSize=128

View File

@ -0,0 +1,23 @@
writeInstanceThreshold=5
overflowDataDir=src/main/resources/data/overflow
# FileNodeDir=src/main/resources/data/digest
# BufferWriteDir=src/main/resources/data/delta
# metadataDir=src/main/resources/metadata
# derbyHome=src/main/resources/derby
mergeConcurrentThreadNum =10
maxFileNodeNum =1000
maxOverflowNodeNum=100
maxBufferWriteNodeNum=50
defaultFetchSize=5000
# writeLogPath=src/main/resources/writeLog.log"

View File

@ -1 +0,0 @@