catch runtime exception when inserting memtable

This commit is contained in:
qiaojialin 2019-09-21 18:48:57 +08:00
parent 1776c30a73
commit 93584319ce
15 changed files with 69 additions and 43 deletions

View File

@ -57,7 +57,7 @@ public class SessionExample {
measurements.add("s3");
for (long time = 0; time < 100; time++) {
List<String> values = new ArrayList<>();
values.add("1");
values.add("1a");
values.add("2");
values.add("3");
session.insert(deviceId, time, measurements, values);

View File

@ -36,6 +36,7 @@ import org.apache.iotdb.db.exception.PathErrorException;
import org.apache.iotdb.db.exception.ProcessorException;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.StorageEngineFailureException;
import org.apache.iotdb.db.exception.qp.QueryProcessorException;
import org.apache.iotdb.db.metadata.MManager;
import org.apache.iotdb.db.qp.physical.crud.BatchInsertPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
@ -168,7 +169,11 @@ public class StorageEngine implements IService {
}
// TODO monitor: update statistics
return storageGroupProcessor.insert(insertPlan);
try {
return storageGroupProcessor.insert(insertPlan);
} catch (QueryProcessorException e) {
throw new StorageEngineException(e.getMessage());
}
}
/**
@ -187,7 +192,11 @@ public class StorageEngine implements IService {
}
// TODO monitor: update statistics
return storageGroupProcessor.insertBatch(batchInsertPlan);
try {
return storageGroupProcessor.insertBatch(batchInsertPlan);
} catch (QueryProcessorException e) {
throw new StorageEngineException(e);
}
}
/**

View File

@ -26,6 +26,7 @@ import java.util.Map.Entry;
import org.apache.iotdb.db.engine.modification.Deletion;
import org.apache.iotdb.db.engine.modification.Modification;
import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
import org.apache.iotdb.db.exception.qp.QueryProcessorException;
import org.apache.iotdb.db.qp.physical.crud.BatchInsertPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
import org.apache.iotdb.db.rescon.TVListAllocator;
@ -80,20 +81,28 @@ public abstract class AbstractMemTable implements IMemTable {
@Override
public void insert(InsertPlan insertPlan) {
for (int i = 0; i < insertPlan.getValues().length; i++) {
write(insertPlan.getDeviceId(), insertPlan.getMeasurements()[i],
insertPlan.getDataTypes()[i], insertPlan.getTime(), insertPlan.getValues()[i]);
public void insert(InsertPlan insertPlan) throws QueryProcessorException {
try {
for (int i = 0; i < insertPlan.getValues().length; i++) {
write(insertPlan.getDeviceId(), insertPlan.getMeasurements()[i],
insertPlan.getDataTypes()[i], insertPlan.getTime(), insertPlan.getValues()[i]);
}
long recordSizeInByte = MemUtils.getRecordSize(insertPlan);
memSize += recordSizeInByte;
} catch (RuntimeException e) {
throw new QueryProcessorException(e);
}
long recordSizeInByte = MemUtils.getRecordSize(insertPlan);
memSize += recordSizeInByte;
}
@Override
public void insertBatch(BatchInsertPlan batchInsertPlan, List<Integer> indexes) {
write(batchInsertPlan, indexes);
long recordSizeInByte = MemUtils.getRecordSize(batchInsertPlan);
memSize += recordSizeInByte;
public void insertBatch(BatchInsertPlan batchInsertPlan, List<Integer> indexes) throws QueryProcessorException{
try {
write(batchInsertPlan, indexes);
long recordSizeInByte = MemUtils.getRecordSize(batchInsertPlan);
memSize += recordSizeInByte;
} catch (RuntimeException e) {
throw new QueryProcessorException(e);
}
}

View File

@ -22,6 +22,7 @@ import java.util.List;
import java.util.Map;
import org.apache.iotdb.db.engine.modification.Deletion;
import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
import org.apache.iotdb.db.exception.qp.QueryProcessorException;
import org.apache.iotdb.db.qp.physical.crud.BatchInsertPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@ -52,9 +53,9 @@ public interface IMemTable {
*/
long memSize();
void insert(InsertPlan insertPlan);
void insert(InsertPlan insertPlan) throws QueryProcessorException;
void insertBatch(BatchInsertPlan batchInsertPlan, List<Integer> indexes);
void insertBatch(BatchInsertPlan batchInsertPlan, List<Integer> indexes) throws QueryProcessorException;
ReadOnlyMemChunk query(String deviceId, String measurement, TSDataType dataType,
Map<String, String> props);

View File

@ -60,6 +60,7 @@ import org.apache.iotdb.db.exception.MetadataErrorException;
import org.apache.iotdb.db.exception.ProcessorException;
import org.apache.iotdb.db.exception.StorageGroupProcessorException;
import org.apache.iotdb.db.exception.TsFileProcessorException;
import org.apache.iotdb.db.exception.qp.QueryProcessorException;
import org.apache.iotdb.db.metadata.MManager;
import org.apache.iotdb.db.qp.physical.crud.BatchInsertPlan;
import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
@ -335,7 +336,7 @@ public class StorageGroupProcessor {
}
}
public boolean insert(InsertPlan insertPlan) {
public boolean insert(InsertPlan insertPlan) throws QueryProcessorException {
writeLock();
try {
// init map
@ -350,7 +351,7 @@ public class StorageGroupProcessor {
}
}
public Integer[] insertBatch(BatchInsertPlan batchInsertPlan) {
public Integer[] insertBatch(BatchInsertPlan batchInsertPlan) throws QueryProcessorException {
writeLock();
try {
// init map
@ -385,7 +386,7 @@ public class StorageGroupProcessor {
}
private void insertBatchToTsFileProcessor(BatchInsertPlan batchInsertPlan,
List<Integer> indexes, boolean sequence, Integer[] results) {
List<Integer> indexes, boolean sequence, Integer[] results) throws QueryProcessorException {
TsFileProcessor tsFileProcessor = getOrCreateTsFileProcessor(sequence);
if (tsFileProcessor == null) {
@ -416,7 +417,8 @@ public class StorageGroupProcessor {
}
}
private boolean insertToTsFileProcessor(InsertPlan insertPlan, boolean sequence) {
private boolean insertToTsFileProcessor(InsertPlan insertPlan, boolean sequence)
throws QueryProcessorException {
TsFileProcessor tsFileProcessor;
boolean result;

View File

@ -43,6 +43,7 @@ import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor.CloseTsFileCallBack;
import org.apache.iotdb.db.engine.version.VersionController;
import org.apache.iotdb.db.exception.TsFileProcessorException;
import org.apache.iotdb.db.exception.qp.QueryProcessorException;
import org.apache.iotdb.db.qp.constant.DatetimeUtils;
import org.apache.iotdb.db.qp.physical.crud.BatchInsertPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
@ -133,7 +134,7 @@ public class TsFileProcessor {
* @param insertPlan physical plan of insertion
* @return succeed or fail
*/
public boolean insert(InsertPlan insertPlan) {
public boolean insert(InsertPlan insertPlan) throws QueryProcessorException {
if (workMemTable == null) {
workMemTable = MemTablePool.getInstance().getAvailableMemTable(this);
@ -162,7 +163,7 @@ public class TsFileProcessor {
}
public boolean insertBatch(BatchInsertPlan batchInsertPlan, List<Integer> indexes,
Integer[] results) {
Integer[] results) throws QueryProcessorException {
if (workMemTable == null) {
workMemTable = MemTablePool.getInstance().getAvailableMemTable(this);
}

View File

@ -234,7 +234,7 @@ public class QueryProcessExecutor extends AbstractQueryProcessExecutor {
return storageEngine.insert(insertPlan);
} catch (PathErrorException | StorageEngineException e) {
throw new ProcessorException(e);
throw new ProcessorException(e.getMessage());
}
}

View File

@ -31,6 +31,7 @@ import org.apache.iotdb.db.engine.modification.ModificationFile;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.engine.version.VersionController;
import org.apache.iotdb.db.exception.ProcessorException;
import org.apache.iotdb.db.exception.qp.QueryProcessorException;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
@ -99,7 +100,7 @@ public class LogReplayer {
replayUpdate((UpdatePlan) plan);
}
}
} catch (IOException e) {
} catch (IOException | QueryProcessorException e) {
throw new ProcessorException("Cannot replay logs", e);
} finally {
logReader.close();
@ -116,7 +117,7 @@ public class LogReplayer {
}
}
private void replayInsert(InsertPlan insertPlan) {
private void replayInsert(InsertPlan insertPlan) throws QueryProcessorException {
if (currentTsFileResource != null) {
// the last chunk group may contain the same data with the logs, ignore such logs in seq file
Long lastEndTime = currentTsFileResource.getEndTimeMap().get(insertPlan.getDeviceId());

View File

@ -25,6 +25,7 @@ import org.apache.iotdb.db.engine.MetadataManagerHelper;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.exception.qp.QueryProcessorException;
import org.apache.iotdb.db.metadata.MManager;
import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
import org.apache.iotdb.db.query.context.QueryContext;
@ -74,7 +75,7 @@ public class DeviceMetaDataCacheTest {
EnvironmentUtils.cleanDir(systemDir);
}
private void insertOneRecord(long time, int num) {
private void insertOneRecord(long time, int num) throws QueryProcessorException {
TSRecord record = new TSRecord(time, deviceId0);
record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId0, String.valueOf(num)));
record.addTuple(DataPoint.getDataPoint(TSDataType.INT64, measurementId1, String.valueOf(num)));
@ -85,7 +86,7 @@ public class DeviceMetaDataCacheTest {
storageGroupProcessor.insert(new InsertPlan(record));
}
protected void insertData() throws IOException {
protected void insertData() throws IOException, QueryProcessorException {
for (int j = 1; j <= 100; j++) {
insertOneRecord(j, j);
}

View File

@ -30,6 +30,7 @@ import org.apache.iotdb.db.engine.merge.manage.MergeManager;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.exception.ProcessorException;
import org.apache.iotdb.db.exception.qp.QueryProcessorException;
import org.apache.iotdb.db.qp.physical.crud.BatchInsertPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
@ -72,7 +73,7 @@ public class StorageGroupProcessorTest {
@Test
public void testSequenceSyncClose() {
public void testSequenceSyncClose() throws QueryProcessorException {
for (int j = 1; j <= 10; j++) {
TSRecord record = new TSRecord(j, deviceId);
record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j)));
@ -91,7 +92,7 @@ public class StorageGroupProcessorTest {
}
@Test
public void testIoTDBRowBatchWriteAndSyncClose() {
public void testIoTDBRowBatchWriteAndSyncClose() throws QueryProcessorException {
String[] measurements = new String[2];
measurements[0] = "s0";
@ -146,7 +147,7 @@ public class StorageGroupProcessorTest {
@Test
public void testSeqAndUnSeqSyncClose() {
public void testSeqAndUnSeqSyncClose() throws QueryProcessorException {
for (int j = 21; j <= 30; j++) {
TSRecord record = new TSRecord(j, deviceId);
@ -178,7 +179,7 @@ public class StorageGroupProcessorTest {
}
@Test
public void testMerge() {
public void testMerge() throws QueryProcessorException {
mergeLock = new AtomicLong(0);
for (int j = 21; j <= 30; j++) {

View File

@ -32,6 +32,7 @@ import org.apache.iotdb.db.engine.MetadataManagerHelper;
import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
import org.apache.iotdb.db.engine.version.SysTimeVersionController;
import org.apache.iotdb.db.exception.TsFileProcessorException;
import org.apache.iotdb.db.exception.qp.QueryProcessorException;
import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.utils.EnvironmentUtils;
@ -74,8 +75,7 @@ public class TsFileProcessorTest {
}
@Test
public void testWriteAndFlush()
throws WriteProcessException, IOException, TsFileProcessorException {
public void testWriteAndFlush() throws IOException, QueryProcessorException {
processor = new TsFileProcessor(storageGroup, SystemFileFactory.INSTANCE.getFile(filePath),
SchemaUtils.constructSchema(deviceId), SysTimeVersionController.INSTANCE, x -> {
},
@ -121,8 +121,7 @@ public class TsFileProcessorTest {
}
@Test
public void testWriteAndRestoreMetadata()
throws IOException {
public void testWriteAndRestoreMetadata() throws IOException, QueryProcessorException {
processor = new TsFileProcessor(storageGroup, SystemFileFactory.INSTANCE.getFile(filePath),
SchemaUtils.constructSchema(deviceId), SysTimeVersionController.INSTANCE, x -> {
},
@ -188,8 +187,7 @@ public class TsFileProcessorTest {
@Test
public void testMultiFlush()
throws WriteProcessException, IOException, TsFileProcessorException {
public void testMultiFlush() throws IOException, QueryProcessorException {
processor = new TsFileProcessor(storageGroup, SystemFileFactory.INSTANCE.getFile(filePath),
SchemaUtils.constructSchema(deviceId), SysTimeVersionController.INSTANCE, x -> {
},
@ -224,8 +222,7 @@ public class TsFileProcessorTest {
@Test
public void testWriteAndClose()
throws WriteProcessException, IOException {
public void testWriteAndClose() throws IOException, QueryProcessorException {
processor = new TsFileProcessor(storageGroup, SystemFileFactory.INSTANCE.getFile(filePath),
SchemaUtils.constructSchema(deviceId), SysTimeVersionController.INSTANCE,
unsealedTsFileProcessor -> {

View File

@ -22,6 +22,7 @@ package org.apache.iotdb.db.query.reader;
import java.io.IOException;
import org.apache.iotdb.db.engine.MetadataManagerHelper;
import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor;
import org.apache.iotdb.db.exception.qp.QueryProcessorException;
import org.apache.iotdb.db.metadata.MManager;
import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
import org.apache.iotdb.db.utils.EnvironmentUtils;
@ -58,9 +59,9 @@ public abstract class ReaderTestHelper {
EnvironmentUtils.cleanDir(systemDir);
}
abstract protected void insertData() throws IOException;
abstract protected void insertData() throws IOException, QueryProcessorException;
protected void insertOneRecord(long time, int num) {
protected void insertOneRecord(long time, int num) throws QueryProcessorException {
TSRecord record = new TSRecord(time, deviceId);
record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(num)));
storageGroupProcessor.insert(new InsertPlan(record));

View File

@ -22,6 +22,7 @@ package org.apache.iotdb.db.query.reader.fileRelated;
import java.io.IOException;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.exception.qp.QueryProcessorException;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.reader.ReaderTestHelper;
import org.apache.iotdb.db.utils.EnvironmentUtils;
@ -84,7 +85,7 @@ public class UnSealedTsFileReaderTest extends ReaderTestHelper {
@Override
protected void insertData() throws IOException {
protected void insertData() throws IOException, QueryProcessorException {
for (int j = 1000; j <= 1009; j++) {
insertOneRecord(j, j);
}

View File

@ -20,6 +20,7 @@ package org.apache.iotdb.db.query.reader.resourceRelated;
import java.io.IOException;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.exception.qp.QueryProcessorException;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.reader.ReaderTestHelper;
import org.apache.iotdb.db.utils.EnvironmentUtils;
@ -77,7 +78,7 @@ public class SeqResourceReaderTest extends ReaderTestHelper {
}
@Override
protected void insertData() throws IOException {
protected void insertData() throws IOException, QueryProcessorException {
for (int j = 1000; j <= 1009; j++) {
insertOneRecord(j, j);
storageGroupProcessor.putAllWorkingTsFileProcessorIntoClosingList();

View File

@ -22,6 +22,7 @@ package org.apache.iotdb.db.query.reader.resourceRelated;
import java.io.IOException;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.qp.QueryProcessorException;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.reader.IPointReader;
import org.apache.iotdb.db.query.reader.IReaderByTimestamp;
@ -38,7 +39,7 @@ public class UnseqResourceReaderTest extends ReaderTestHelper {
private QueryContext context = EnvironmentUtils.TEST_QUERY_CONTEXT;
@Override
protected void insertData() throws IOException {
protected void insertData() throws IOException, QueryProcessorException {
for (int j = 1; j <= 100; j++) {
insertOneRecord(j, j);
}