[IOTDB-323] Batch insert in session (#588)

* warp lots of insert command into batch in session
This commit is contained in:
SilverNarcissus 2019-11-28 04:50:38 -06:00 committed by Jialin Qiao
parent 6bae410529
commit adfc5ea8a8
7 changed files with 266 additions and 69 deletions

View File

@ -101,6 +101,10 @@ Here we show the commonly used interfaces and their parameters in the Session:
TSStatus deleteData(List<String> paths, long time)
* Insert data into existing timeseries in batch
TSStatus insertInBatch(List<String> deviceIds, List<Long> times, List<List<String>> measurementsList, List<List<String>> valuesList)
* Insert data into existing timeseries
TSStatus insert(String deviceId, long time, List<String> measurements, List<String> values)

View File

@ -48,6 +48,7 @@ public class SessionExample {
session.createTimeseries("root.sg1.d1.s3", TSDataType.INT64, TSEncoding.RLE, CompressionType.SNAPPY);
insert();
insertInBatch();
insertRowBatch();
nonQuery();
query();
@ -71,6 +72,39 @@ public class SessionExample {
}
}
private static void insertInBatch() throws IoTDBSessionException {
String deviceId = "root.sg1.d2";
List<String> measurements = new ArrayList<>();
measurements.add("s1");
measurements.add("s2");
measurements.add("s3");
List<String> deviceIds = new ArrayList<>();
List<List<String>> measurementsList = new ArrayList<>();
List<List<String>> valuesList = new ArrayList<>();
List<Long> timestamps = new ArrayList<>();
for (long time = 0; time < 500; time++) {
List<String> values = new ArrayList<>();
values.add("1");
values.add("2");
values.add("3");
deviceIds.add(deviceId);
measurementsList.add(measurements);
valuesList.add(values);
timestamps.add(time);
if (time != 0 && time % 100 == 0) {
session.insertInBatch(deviceIds, timestamps, measurementsList, valuesList);
deviceIds.clear();
measurementsList.clear();
valuesList.clear();
timestamps.clear();
}
}
session.insertInBatch(deviceIds, timestamps, measurementsList, valuesList);
}
private static void insertRowBatch() throws IoTDBSessionException {
Schema schema = new Schema();
schema.registerMeasurement(new MeasurementSchema("s1", TSDataType.INT64, TSEncoding.RLE));

View File

@ -88,6 +88,7 @@ import org.apache.iotdb.service.rpc.thrift.TSCreateTimeseriesReq;
import org.apache.iotdb.service.rpc.thrift.TSDeleteDataReq;
import org.apache.iotdb.service.rpc.thrift.TSExecuteBatchStatementReq;
import org.apache.iotdb.service.rpc.thrift.TSExecuteBatchStatementResp;
import org.apache.iotdb.service.rpc.thrift.TSExecuteInsertRowInBatchResp;
import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementReq;
import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementResp;
import org.apache.iotdb.service.rpc.thrift.TSFetchMetadataReq;
@ -97,6 +98,7 @@ import org.apache.iotdb.service.rpc.thrift.TSFetchResultsResp;
import org.apache.iotdb.service.rpc.thrift.TSGetTimeZoneResp;
import org.apache.iotdb.service.rpc.thrift.TSHandleIdentifier;
import org.apache.iotdb.service.rpc.thrift.TSIService;
import org.apache.iotdb.service.rpc.thrift.TSInsertInBatchReq;
import org.apache.iotdb.service.rpc.thrift.TSInsertReq;
import org.apache.iotdb.service.rpc.thrift.TSInsertionReq;
import org.apache.iotdb.service.rpc.thrift.TSOpenSessionReq;
@ -156,6 +158,46 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
processor = new QueryProcessor(new QueryProcessExecutor());
}
public static TSDataType getSeriesType(String path) throws QueryProcessException {
switch (path.toLowerCase()) {
// authorization queries
case ROLE:
case USER:
case PRIVILEGE:
case STORAGE_GROUP:
return TSDataType.TEXT;
case TTL:
return TSDataType.INT64;
default:
// do nothing
}
if (path.contains("(") && !path.startsWith("(") && path.endsWith(")")) {
// aggregation
int leftBracketIndex = path.indexOf('(');
String aggrType = path.substring(0, leftBracketIndex);
String innerPath = path.substring(leftBracketIndex + 1, path.length() - 1);
switch (aggrType.toLowerCase()) {
case StatisticConstant.MIN_TIME:
case StatisticConstant.MAX_TIME:
case StatisticConstant.COUNT:
return TSDataType.INT64;
case StatisticConstant.LAST:
case StatisticConstant.FIRST:
case StatisticConstant.MIN_VALUE:
case StatisticConstant.MAX_VALUE:
return getSeriesType(innerPath);
case StatisticConstant.AVG:
case StatisticConstant.SUM:
return TSDataType.DOUBLE;
default:
throw new QueryProcessException(
"aggregate does not support " + aggrType + " function.");
}
}
return MManager.getInstance().getSeriesType(path);
}
@Override
public TSOpenSessionResp openSession(TSOpenSessionReq req) throws TException {
logger.info("{}: receive open session request from username {}", IoTDBConstant.GLOBAL_DB_NAME,
@ -285,7 +327,6 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
return new TSStatus(getStatus(TSStatusCode.SUCCESS_STATUS));
}
/**
* release single operation resource
*/
@ -443,46 +484,6 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
return MManager.getInstance().getMetadataInString();
}
public static TSDataType getSeriesType(String path) throws QueryProcessException {
switch (path.toLowerCase()) {
// authorization queries
case ROLE:
case USER:
case PRIVILEGE:
case STORAGE_GROUP:
return TSDataType.TEXT;
case TTL:
return TSDataType.INT64;
default:
// do nothing
}
if (path.contains("(") && !path.startsWith("(") && path.endsWith(")")) {
// aggregation
int leftBracketIndex = path.indexOf('(');
String aggrType = path.substring(0, leftBracketIndex);
String innerPath = path.substring(leftBracketIndex + 1, path.length() - 1);
switch (aggrType.toLowerCase()) {
case StatisticConstant.MIN_TIME:
case StatisticConstant.MAX_TIME:
case StatisticConstant.COUNT:
return TSDataType.INT64;
case StatisticConstant.LAST:
case StatisticConstant.FIRST:
case StatisticConstant.MIN_VALUE:
case StatisticConstant.MAX_VALUE:
return getSeriesType(innerPath);
case StatisticConstant.AVG:
case StatisticConstant.SUM:
return TSDataType.DOUBLE;
default:
throw new QueryProcessException(
"aggregate does not support " + aggrType + " function.");
}
}
return MManager.getInstance().getSeriesType(path);
}
protected List<String> getPaths(String path) throws MetadataException {
return MManager.getInstance().getPaths(path);
}
@ -1173,6 +1174,34 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
}
}
@Override
public TSExecuteInsertRowInBatchResp insertRowInBatch(TSInsertInBatchReq req) {
TSExecuteInsertRowInBatchResp resp = new TSExecuteInsertRowInBatchResp();
if (!checkLogin()) {
logger.info(INFO_NOT_LOGIN, IoTDBConstant.GLOBAL_DB_NAME);
resp.addToStatusList(new TSStatus(getStatus(TSStatusCode.NOT_LOGIN_ERROR)));
return resp;
}
InsertPlan plan = new InsertPlan();
for (int i = 0; i < req.deviceIds.size(); i++) {
plan.setDeviceId(req.getDeviceIds().get(i));
plan.setTime(req.getTimestamps().get(i));
plan.setMeasurements(req.getMeasurementsList().get(i).toArray(new String[0]));
plan.setValues(req.getValuesList().get(i).toArray(new String[0]));
TSStatus status = checkAuthority(plan);
if (status != null) {
resp.addToStatusList(new TSStatus(status));
}
else{
resp.addToStatusList(executePlan(plan));
}
}
return resp;
}
@Override
public TSStatus insertRow(TSInsertReq req) {
if (!checkLogin()) {

View File

@ -51,6 +51,8 @@ Last Updated on October 27th, 2019 by Lei Rui.
| Add method TSStatus deleteData(1:TSDeleteDataReq req) | Jack Tsai, Jialin Qiao, qiaojialin |
| Add method TSStatus deleteTimeseries(1:list\<string> path) | qiaojialin |
| Add method TSStatus deleteStorageGroups(1:list\<string> storageGroup) | Yi Tao |
| Add Struct TSExecuteInsertRowInBatchResp | Kaifeng Xue |
| Add method insertRowInBatch(1:TSInsertInBatchReq req); | Kaifeng Xue |
## 3. Update

View File

@ -126,6 +126,9 @@ struct TSExecuteStatementReq {
3: required i64 statementId
}
struct TSExecuteInsertRowInBatchResp{
1: required list<TSStatus> statusList
}
struct TSExecuteBatchStatementResp{
1: required TSStatus status
@ -240,6 +243,13 @@ struct TSInsertReq {
4: required i64 timestamp
}
struct TSInsertInBatchReq {
1: required list<string> deviceIds
2: required list<list<string>> measurementsList
3: required list<list<string>> valuesList
4: required list<i64> timestamps
}
struct TSDeleteDataReq {
1: required list<string> paths
2: required i64 timestamp
@ -299,6 +309,8 @@ service TSIService {
TSStatus insertRow(1:TSInsertReq req);
TSExecuteInsertRowInBatchResp insertRowInBatch(1:TSInsertInBatchReq req);
TSStatus deleteData(1:TSDeleteDataReq req);
i64 requestStatementId();

View File

@ -18,11 +18,33 @@
*/
package org.apache.iotdb.session;
import static org.apache.iotdb.session.Config.PATH_MATCHER;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;
import org.apache.commons.lang.StringEscapeUtils;
import org.apache.iotdb.rpc.IoTDBRPCException;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.service.rpc.thrift.*;
import org.apache.iotdb.service.rpc.thrift.TSBatchInsertionReq;
import org.apache.iotdb.service.rpc.thrift.TSCloseSessionReq;
import org.apache.iotdb.service.rpc.thrift.TSCreateTimeseriesReq;
import org.apache.iotdb.service.rpc.thrift.TSDeleteDataReq;
import org.apache.iotdb.service.rpc.thrift.TSExecuteBatchStatementResp;
import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementReq;
import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementResp;
import org.apache.iotdb.service.rpc.thrift.TSGetTimeZoneResp;
import org.apache.iotdb.service.rpc.thrift.TSIService;
import org.apache.iotdb.service.rpc.thrift.TSInsertInBatchReq;
import org.apache.iotdb.service.rpc.thrift.TSInsertReq;
import org.apache.iotdb.service.rpc.thrift.TSOpenSessionReq;
import org.apache.iotdb.service.rpc.thrift.TSOpenSessionResp;
import org.apache.iotdb.service.rpc.thrift.TSProtocolVersion;
import org.apache.iotdb.service.rpc.thrift.TSSetTimeZoneReq;
import org.apache.iotdb.service.rpc.thrift.TSStatus;
import org.apache.iotdb.service.rpc.thrift.TS_SessionHandle;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
@ -36,14 +58,6 @@ import org.apache.thrift.transport.TTransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.SQLException;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;
import static org.apache.iotdb.session.Config.PATH_MATCHER;
public class Session {
private static final Logger logger = LoggerFactory.getLogger(Session.class);
@ -57,7 +71,6 @@ public class Session {
private TSocket transport;
private boolean isClosed = true;
private ZoneId zoneId;
private TSOperationHandle operationHandle;
private long statementId;
@ -154,7 +167,12 @@ public class Session {
}
}
public synchronized TSExecuteBatchStatementResp insertBatch(RowBatch rowBatch)
/**
* use batch interface to insert data
*
* @param rowBatch data batch
*/
public TSExecuteBatchStatementResp insertBatch(RowBatch rowBatch)
throws IoTDBSessionException {
TSBatchInsertionReq request = new TSBatchInsertionReq();
request.deviceId = rowBatch.deviceId;
@ -173,7 +191,45 @@ public class Session {
}
}
public synchronized TSStatus insert(String deviceId, long time, List<String> measurements,
/**
* insert data in batch format, which can reduce the overhead of network
*/
public List<TSStatus> insertInBatch(List<String> deviceIds, List<Long> times,
List<List<String>> measurementsList,
List<List<String>> valuesList)
throws IoTDBSessionException {
// check params size
int len = deviceIds.size();
if (len != times.size() || len != measurementsList.size() || len != valuesList.size()) {
throw new IllegalArgumentException(
"deviceIds, times, measurementsList and valuesList's size should be equal");
}
TSInsertInBatchReq request = new TSInsertInBatchReq();
request.setDeviceIds(deviceIds);
request.setTimestamps(times);
request.setMeasurementsList(measurementsList);
request.setValuesList(valuesList);
try {
List<TSStatus> result = new ArrayList<>();
for (TSStatus cur : client.insertRowInBatch(request).getStatusList()) {
result.add(checkAndReturn(cur));
}
return result;
} catch (TException e) {
throw new IoTDBSessionException(e);
}
}
/**
* insert data in one row, if you want improve your performance, please use insertInBatch method
* or insertBatch method
*
* @see Session#insertInBatch(List, List, List, List)
* @see Session#insertBatch(RowBatch)
*/
public TSStatus insert(String deviceId, long time, List<String> measurements,
List<String> values)
throws IoTDBSessionException {
TSInsertReq request = new TSInsertReq();
@ -194,7 +250,7 @@ public class Session {
*
* @param path timeseries to delete, should be a whole path
*/
synchronized TSStatus deleteTimeseries(String path) throws IoTDBSessionException {
public TSStatus deleteTimeseries(String path) throws IoTDBSessionException {
List<String> paths = new ArrayList<>();
paths.add(path);
return deleteTimeseries(paths);
@ -205,7 +261,7 @@ public class Session {
*
* @param paths timeseries to delete, should be a whole path
*/
public synchronized TSStatus deleteTimeseries(List<String> paths) throws IoTDBSessionException {
public TSStatus deleteTimeseries(List<String> paths) throws IoTDBSessionException {
try {
return checkAndReturn(client.deleteTimeseries(paths));
} catch (TException e) {
@ -219,7 +275,7 @@ public class Session {
* @param path data in which time series to delete
* @param time data with time stamp less than or equal to time will be deleted
*/
public synchronized TSStatus deleteData(String path, long time) throws IoTDBSessionException {
public TSStatus deleteData(String path, long time) throws IoTDBSessionException {
List<String> paths = new ArrayList<>();
paths.add(path);
return deleteData(paths, time);
@ -231,7 +287,7 @@ public class Session {
* @param paths data in which time series to delete
* @param time data with time stamp less than or equal to time will be deleted
*/
synchronized TSStatus deleteData(List<String> paths, long time)
public TSStatus deleteData(List<String> paths, long time)
throws IoTDBSessionException {
TSDeleteDataReq request = new TSDeleteDataReq();
request.setPaths(paths);
@ -244,7 +300,7 @@ public class Session {
}
}
public synchronized TSStatus setStorageGroup(String storageGroupId) throws IoTDBSessionException {
public TSStatus setStorageGroup(String storageGroupId) throws IoTDBSessionException {
checkPathValidity(storageGroupId);
try {
return checkAndReturn(client.setStorageGroup(storageGroupId));
@ -254,14 +310,14 @@ public class Session {
}
synchronized TSStatus deleteStorageGroup(String storageGroup)
public TSStatus deleteStorageGroup(String storageGroup)
throws IoTDBSessionException {
List<String> groups = new ArrayList<>();
groups.add(storageGroup);
return deleteStorageGroups(groups);
}
synchronized TSStatus deleteStorageGroups(List<String> storageGroup)
public TSStatus deleteStorageGroups(List<String> storageGroup)
throws IoTDBSessionException {
try {
return checkAndReturn(client.deleteStorageGroups(storageGroup));
@ -270,7 +326,7 @@ public class Session {
}
}
public synchronized TSStatus createTimeseries(String path, TSDataType dataType,
public TSStatus createTimeseries(String path, TSDataType dataType,
TSEncoding encoding, CompressionType compressor) throws IoTDBSessionException {
checkPathValidity(path);
TSCreateTimeseriesReq request = new TSCreateTimeseriesReq();
@ -345,9 +401,9 @@ public class Session {
TSExecuteStatementResp execResp = client.executeStatement(execReq);
RpcUtils.verifySuccess(execResp.getStatus());
operationHandle = execResp.getOperationHandle();
return new SessionDataSet(sql, execResp.getColumns(), execResp.getDataTypeList(),
operationHandle.getOperationId().getQueryId(), client, operationHandle);
execResp.getOperationHandle().getOperationId().getQueryId(), client,
execResp.getOperationHandle());
}
/**
@ -363,8 +419,6 @@ public class Session {
TSExecuteStatementReq execReq = new TSExecuteStatementReq(sessionHandle, sql, statementId);
TSExecuteStatementResp execResp = client.executeUpdateStatement(execReq);
operationHandle = execResp.getOperationHandle();
RpcUtils.verifySuccess(execResp.getStatus());
}

View File

@ -93,6 +93,10 @@ public class IoTDBSessionIT {
query2();
insertInBatch();
query4();
// Add another storage group to test the deletion of storage group
session.setStorageGroup("root.sg2");
session.createTimeseries("root.sg2.d1.s1", TSDataType.INT64, TSEncoding.RLE,
@ -132,6 +136,45 @@ public class IoTDBSessionIT {
CompressionType.SNAPPY);
session.createTimeseries("root.sg1.d1.s3", TSDataType.INT64, TSEncoding.RLE,
CompressionType.SNAPPY);
session.createTimeseries("root.sg1.d2.s1", TSDataType.INT64, TSEncoding.RLE,
CompressionType.SNAPPY);
session.createTimeseries("root.sg1.d2.s2", TSDataType.INT64, TSEncoding.RLE,
CompressionType.SNAPPY);
session.createTimeseries("root.sg1.d2.s3", TSDataType.INT64, TSEncoding.RLE,
CompressionType.SNAPPY);
}
private void insertInBatch() throws IoTDBSessionException {
String deviceId = "root.sg1.d2";
List<String> measurements = new ArrayList<>();
measurements.add("s1");
measurements.add("s2");
measurements.add("s3");
List<String> deviceIds = new ArrayList<>();
List<List<String>> measurementsList = new ArrayList<>();
List<List<String>> valuesList = new ArrayList<>();
List<Long> timestamps = new ArrayList<>();
for (long time = 0; time < 500; time++) {
List<String> values = new ArrayList<>();
values.add("1");
values.add("2");
values.add("3");
deviceIds.add(deviceId);
measurementsList.add(measurements);
valuesList.add(values);
timestamps.add(time);
if (time != 0 && time % 100 == 0) {
session.insertInBatch(deviceIds, timestamps, measurementsList, valuesList);
deviceIds.clear();
measurementsList.clear();
valuesList.clear();
timestamps.clear();
}
}
session.insertInBatch(deviceIds, timestamps, measurementsList, valuesList);
}
private void insert() throws IoTDBSessionException {
@ -199,7 +242,8 @@ public class IoTDBSessionIT {
private void query() throws ClassNotFoundException, SQLException {
Class.forName(Config.JDBC_DRIVER_NAME);
String standard =
"Time\n" + "root.sg1.d1.s1\n" + "root.sg1.d1.s2\n" + "root.sg1.d1.s3\n";
"Time\n" + "root.sg1.d1.s1\n" + "root.sg1.d1.s2\n" + "root.sg1.d1.s3\n"
+ "root.sg1.d2.s1\n" + "root.sg1.d2.s2\n" + "root.sg1.d2.s3\n";
try (Connection connection = DriverManager
.getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
Statement statement = connection.createStatement()) {
@ -223,7 +267,8 @@ public class IoTDBSessionIT {
private void query2() throws ClassNotFoundException, SQLException {
Class.forName(Config.JDBC_DRIVER_NAME);
String standard =
"Time\n" + "root.sg1.d1.s2\n" + "root.sg1.d1.s3\n";
"Time\n" + "root.sg1.d1.s2\n" + "root.sg1.d1.s3\n"
+ "root.sg1.d2.s1\n" + "root.sg1.d2.s2\n" + "root.sg1.d2.s3\n";
try (Connection connection = DriverManager
.getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
Statement statement = connection.createStatement()) {
@ -284,6 +329,23 @@ public class IoTDBSessionIT {
}
}
private void query4() throws TException, IoTDBRPCException, SQLException {
SessionDataSet sessionDataSet = session.executeQueryStatement("select * from root.sg1.d2");
sessionDataSet.setBatchSize(1024);
int count = 0;
while (sessionDataSet.hasNext()) {
long index = 1;
count++;
for (Field f : sessionDataSet.next().getFields()) {
Assert.assertEquals(f.getLongV(), index);
index++;
}
}
Assert.assertEquals(500, count);
sessionDataSet.closeOperationHandle();
}
private void query3() throws TException, IoTDBRPCException, SQLException {
SessionDataSet sessionDataSet = session.executeQueryStatement("select * from root.sg1.d1");
sessionDataSet.setBatchSize(1024);
@ -340,7 +402,7 @@ public class IoTDBSessionIT {
session.close();
}
private void checkSetSG(Session session, String sg, boolean correctStatus){
private void checkSetSG(Session session, String sg, boolean correctStatus) {
boolean status = true;
try {
session.setStorageGroup(sg);
@ -350,7 +412,7 @@ public class IoTDBSessionIT {
assertEquals(status, correctStatus);
}
private void checkCreateTimeseries(Session session, String timeseris, boolean correctStatus){
private void checkCreateTimeseries(Session session, String timeseris, boolean correctStatus) {
boolean status = true;
try {
session.createTimeseries(timeseris, TSDataType.INT64, TSEncoding.RLE,