diff --git a/iotdb/conf/iotdb-engine.properties b/iotdb/conf/iotdb-engine.properties index c2b566cd04c..27b6e0d83ba 100644 --- a/iotdb/conf/iotdb-engine.properties +++ b/iotdb/conf/iotdb-engine.properties @@ -89,10 +89,24 @@ concurrent_flush_thread=0 # Statistics Monitor configuration -# default monitor is enabled, and write statistics info to IoTDB every 5 seconds -# Choose to change the back_loop_period >= 1 seconds +# Set enable_stat_monitor true(or false) to enable(or disable) the StatMonitor that stores statistics info periodically. +# back_loop_period_sec decides the period when StatMonitor writes statistics info into IoTDB. +# stat_monitor_detect_freq_sec decides when IoTDB detects statistics info out-of-date. +# IoTDB just keeps statistics info within stat_monitor_retain_interval_sec seconds before current time. +# Note: IoTDB requires stat_monitor_detect_freq_sec >= 600s and stat_monitor_retain_interval_sec >= 600s. + +# The monitor is enabled by default, and writes statistics info to IoTDB periodically enable_stat_monitor = true -back_loop_period = 5 + +# The period that StatMonitor stores statistics info +back_loop_period_sec = 5 + +# The interval at which StatMonitor starts to check whether statistics info can be deleted due to exceeding the retention volume +stat_monitor_detect_freq_sec = 600 + +# The minimum age of statistics storage information to be eligible for deletion due to age +stat_monitor_retain_interval_sec = 600 + # When set to false, MemMonitorThread and MemStatisticThread will not be created. enable_mem_monitor=true diff --git a/src/main/java/cn/edu/tsinghua/iotdb/conf/TsfileDBConfig.java b/src/main/java/cn/edu/tsinghua/iotdb/conf/TsfileDBConfig.java index 54f811350f4..82cf1fa4ebb 100644 --- a/src/main/java/cn/edu/tsinghua/iotdb/conf/TsfileDBConfig.java +++ b/src/main/java/cn/edu/tsinghua/iotdb/conf/TsfileDBConfig.java @@ -175,18 +175,27 @@ public class TsfileDBConfig { */ public long smallFlushInterval = 60 * 1000; - /* - * The statMonitor's BackLoop period, 5s is enough + /** + * The statMonitor writes statistics info into IoTDB every backLoopPeriodSec secs. + * Default value is 5s. */ - public int backLoopPeriod = 5; + public int backLoopPeriodSec = 5; /** - * Set whether to enable statistics service + * Set true to enable statistics monitor service, + * false to disable statistics service */ public boolean enableStatMonitor = true; + /** - * the maximum number of writing instances existing in same time. + * Set the time interval when StatMonitor performs delete detection, default value is 600s, */ + public int statMonitorDetectFreqSec = 60 * 10; + + /** + * Set the maximum time to keep monitor statistics information in IoTDB, default value is 600s + */ + public int statMonitorRetainIntervalSec = 60 * 10; public TsfileDBConfig() {} diff --git a/src/main/java/cn/edu/tsinghua/iotdb/conf/TsfileDBDescriptor.java b/src/main/java/cn/edu/tsinghua/iotdb/conf/TsfileDBDescriptor.java index fcf20ba24b7..06d178dbc85 100644 --- a/src/main/java/cn/edu/tsinghua/iotdb/conf/TsfileDBDescriptor.java +++ b/src/main/java/cn/edu/tsinghua/iotdb/conf/TsfileDBDescriptor.java @@ -68,7 +68,22 @@ public class TsfileDBDescriptor { try { properties.load(inputStream); conf.enableStatMonitor = Boolean.parseBoolean(properties.getProperty("enable_stat_monitor", conf.enableStatMonitor + "")); - conf.backLoopPeriod = Integer.parseInt(properties.getProperty("back_loop_period", conf.backLoopPeriod + "")); + conf.backLoopPeriodSec = Integer.parseInt(properties.getProperty("back_loop_period_sec", conf.backLoopPeriodSec + "")); + int statMonitorDetectFreqSec = Integer.parseInt(properties.getProperty("stat_monitor_detect_freq_sec", conf.statMonitorDetectFreqSec + "")); + int statMonitorRetainIntervalSec = Integer.parseInt(properties.getProperty("stat_monitor_retain_interval_sec", conf.statMonitorRetainIntervalSec + "")); + // the conf value must > default value, or may cause system unstable + if (conf.statMonitorDetectFreqSec < statMonitorDetectFreqSec) { + conf.statMonitorDetectFreqSec = statMonitorDetectFreqSec; + } else { + LOGGER.info("The stat_monitor_detect_freq_sec value is smaller than default, use default value"); + } + + if (conf.statMonitorRetainIntervalSec < statMonitorRetainIntervalSec) { + conf.statMonitorRetainIntervalSec = statMonitorRetainIntervalSec; + }else { + LOGGER.info("The stat_monitor_retain_interval_sec value is smaller than default, use default value"); + } + conf.rpcPort = Integer.parseInt(properties.getProperty("rpc_port",conf.rpcPort+"")); conf.enableWal = Boolean.parseBoolean(properties.getProperty("enable_wal", conf.enableWal+"")); diff --git a/src/main/java/cn/edu/tsinghua/iotdb/engine/filenode/FileNodeManager.java b/src/main/java/cn/edu/tsinghua/iotdb/engine/filenode/FileNodeManager.java index c4d9e958847..0afa34354ea 100644 --- a/src/main/java/cn/edu/tsinghua/iotdb/engine/filenode/FileNodeManager.java +++ b/src/main/java/cn/edu/tsinghua/iotdb/engine/filenode/FileNodeManager.java @@ -93,6 +93,12 @@ public class FileNodeManager implements IStatistic { } }; + private void updateStatHashMapWhenFail(TSRecord tsRecord) { + statParamsHashMap.get(MonitorConstants.FileNodeManagerStatConstants.TOTAL_REQ_FAIL.name()) + .incrementAndGet(); + statParamsHashMap.get(MonitorConstants.FileNodeManagerStatConstants.TOTAL_POINTS_FAIL.name()) + .addAndGet(tsRecord.dataPointList.size()); + } /** * @return the key represent the params' name, values is AtomicLong type */ @@ -197,7 +203,7 @@ public class FileNodeManager implements IStatistic { if (TsFileDBConf.enableStatMonitor) { StatMonitor statMonitor = StatMonitor.getInstance(); registStatMetadata(); - statMonitor.registStatistics(getClass().getSimpleName(), this); + statMonitor.registStatistics(statStorageDeltaName, this); } } @@ -267,12 +273,22 @@ public class FileNodeManager implements IStatistic { } } - public int insert(TSRecord tsRecord) throws FileNodeManagerException { + /** + * insert TsRecord into storage group + * @param tsRecord: input Data + * @param isMonitor: if true the insertion is done by StatMonitor then the Stat Info will not be recorded. + * else the statParamsHashMap will be updated + * @return an int value represents the insert type + * @throws FileNodeManagerException + */ + public int insert(TSRecord tsRecord, boolean isMonitor) throws FileNodeManagerException { long timestamp = tsRecord.time; String deltaObjectId = tsRecord.deltaObjectId; - statParamsHashMap.get(MonitorConstants.FileNodeManagerStatConstants.TOTAL_POINTS.name()) - .addAndGet(tsRecord.dataPointList.size()); + if (!isMonitor) { + statParamsHashMap.get(MonitorConstants.FileNodeManagerStatConstants.TOTAL_POINTS.name()) + .addAndGet(tsRecord.dataPointList.size()); + } FileNodeProcessor fileNodeProcessor = getProcessor(deltaObjectId, true); int insertType = 0; @@ -293,10 +309,9 @@ public class FileNodeManager implements IStatistic { String.format("Get the overflow processor failed, the filenode is {}, insert time is {}", filenodeName, timestamp), e); - statParamsHashMap.get(MonitorConstants.FileNodeManagerStatConstants.TOTAL_REQ_FAIL.name()) - .incrementAndGet(); - statParamsHashMap.get(MonitorConstants.FileNodeManagerStatConstants.TOTAL_POINTS_FAIL.name()) - .addAndGet(tsRecord.dataPointList.size()); + if (!isMonitor) { + updateStatHashMapWhenFail(tsRecord); + } throw new FileNodeManagerException(e); } // write wal @@ -308,10 +323,9 @@ public class FileNodeManager implements IStatistic { } } catch (IOException | PathErrorException e) { LOGGER.error("Error in write WAL.", e); - statParamsHashMap.get(MonitorConstants.FileNodeManagerStatConstants.TOTAL_REQ_FAIL.name()) - .incrementAndGet(); - statParamsHashMap.get(MonitorConstants.FileNodeManagerStatConstants.TOTAL_POINTS_FAIL.name()) - .addAndGet(tsRecord.dataPointList.size()); + if (!isMonitor) { + updateStatHashMapWhenFail(tsRecord); + } throw new FileNodeManagerException(e); } // write overflow data @@ -321,10 +335,9 @@ public class FileNodeManager implements IStatistic { dataPoint.getType(), dataPoint.getValue().toString()); } catch (ProcessorException e) { LOGGER.error("Insert into overflow error, the reason is {}", e.getMessage()); - statParamsHashMap.get(MonitorConstants.FileNodeManagerStatConstants.TOTAL_REQ_FAIL.name()) - .incrementAndGet(); - statParamsHashMap.get(MonitorConstants.FileNodeManagerStatConstants.TOTAL_POINTS_FAIL.name()) - .addAndGet(tsRecord.dataPointList.size()); + if (!isMonitor) { + updateStatHashMapWhenFail(tsRecord); + } throw new FileNodeManagerException(e); } } @@ -340,10 +353,9 @@ public class FileNodeManager implements IStatistic { } catch (FileNodeProcessorException e) { LOGGER.error("Get the bufferwrite processor failed, the filenode is {}, insert time is {}", filenodeName, timestamp); - statParamsHashMap.get(MonitorConstants.FileNodeManagerStatConstants.TOTAL_REQ_FAIL.name()) - .incrementAndGet(); - statParamsHashMap.get(MonitorConstants.FileNodeManagerStatConstants.TOTAL_POINTS_FAIL.name()) - .addAndGet(tsRecord.dataPointList.size()); + if (!isMonitor) { + updateStatHashMapWhenFail(tsRecord); + } throw new FileNodeManagerException(e); } // Add a new interval file to newfilelist @@ -353,10 +365,9 @@ public class FileNodeManager implements IStatistic { try { fileNodeProcessor.addIntervalFileNode(timestamp, bufferwriteRelativePath); } catch (Exception e) { - statParamsHashMap.get(MonitorConstants.FileNodeManagerStatConstants.TOTAL_REQ_FAIL.name()) - .incrementAndGet(); - statParamsHashMap.get(MonitorConstants.FileNodeManagerStatConstants.TOTAL_POINTS_FAIL.name()) - .addAndGet(tsRecord.dataPointList.size()); + if (!isMonitor) { + updateStatHashMapWhenFail(tsRecord); + } throw new FileNodeManagerException(e); } } @@ -369,20 +380,18 @@ public class FileNodeManager implements IStatistic { } } catch (IOException | PathErrorException e) { LOGGER.error("Error in write WAL.", e); - statParamsHashMap.get(MonitorConstants.FileNodeManagerStatConstants.TOTAL_REQ_FAIL.name()) - .incrementAndGet(); - statParamsHashMap.get(MonitorConstants.FileNodeManagerStatConstants.TOTAL_POINTS_FAIL.name()) - .addAndGet(tsRecord.dataPointList.size()); + if (!isMonitor) { + updateStatHashMapWhenFail(tsRecord); + } throw new FileNodeManagerException(e); } // Write data try { bufferWriteProcessor.write(tsRecord); } catch (BufferWriteProcessorException e) { - statParamsHashMap.get(MonitorConstants.FileNodeManagerStatConstants.TOTAL_REQ_FAIL.name()) - .incrementAndGet(); - statParamsHashMap.get(MonitorConstants.FileNodeManagerStatConstants.TOTAL_POINTS_FAIL.name()) - .addAndGet(tsRecord.dataPointList.size()); + if (!isMonitor) { + updateStatHashMapWhenFail(tsRecord); + } throw new FileNodeManagerException(e); } fileNodeProcessor.setIntervalFileNodeStartTime(deltaObjectId, timestamp); @@ -392,14 +401,17 @@ public class FileNodeManager implements IStatistic { } finally { fileNodeProcessor.writeUnlock(); } - fileNodeProcessor.getStatParamsHashMap() - .get(MonitorConstants.FileNodeProcessorStatConstants.TOTAL_POINTS_SUCCESS.name()) - .addAndGet(tsRecord.dataPointList.size()); - fileNodeProcessor.getStatParamsHashMap() - .get(MonitorConstants.FileNodeProcessorStatConstants.TOTAL_REQ_SUCCESS.name()).incrementAndGet(); - statParamsHashMap.get(MonitorConstants.FileNodeManagerStatConstants.TOTAL_REQ_SUCCESS.name()).incrementAndGet(); - statParamsHashMap.get(MonitorConstants.FileNodeManagerStatConstants.TOTAL_POINTS_SUCCESS.name()) - .addAndGet(tsRecord.dataPointList.size()); + //Modify the insert + if (!isMonitor) { + fileNodeProcessor.getStatParamsHashMap() + .get(MonitorConstants.FileNodeProcessorStatConstants.TOTAL_POINTS_SUCCESS.name()) + .addAndGet(tsRecord.dataPointList.size()); + fileNodeProcessor.getStatParamsHashMap() + .get(MonitorConstants.FileNodeProcessorStatConstants.TOTAL_REQ_SUCCESS.name()).incrementAndGet(); + statParamsHashMap.get(MonitorConstants.FileNodeManagerStatConstants.TOTAL_REQ_SUCCESS.name()).incrementAndGet(); + statParamsHashMap.get(MonitorConstants.FileNodeManagerStatConstants.TOTAL_POINTS_SUCCESS.name()) + .addAndGet(tsRecord.dataPointList.size()); + } return insertType; } diff --git a/src/main/java/cn/edu/tsinghua/iotdb/monitor/IStatistic.java b/src/main/java/cn/edu/tsinghua/iotdb/monitor/IStatistic.java index fe4296ce292..f10b414b0f8 100644 --- a/src/main/java/cn/edu/tsinghua/iotdb/monitor/IStatistic.java +++ b/src/main/java/cn/edu/tsinghua/iotdb/monitor/IStatistic.java @@ -4,6 +4,7 @@ import cn.edu.tsinghua.tsfile.timeseries.write.record.TSRecord; import java.util.HashMap; import java.util.List; +import java.util.concurrent.atomic.AtomicLong; public interface IStatistic { @@ -27,4 +28,10 @@ public interface IStatistic { * the name is the statistics name need to store */ List getAllPathForStatistic(); + + /** + * + * @return a HashMap contains the name and values of the statistics parameters + */ + HashMap getStatParamsHashMap(); } diff --git a/src/main/java/cn/edu/tsinghua/iotdb/monitor/StatMonitor.java b/src/main/java/cn/edu/tsinghua/iotdb/monitor/StatMonitor.java index 81c31799c47..8db909d7a12 100644 --- a/src/main/java/cn/edu/tsinghua/iotdb/monitor/StatMonitor.java +++ b/src/main/java/cn/edu/tsinghua/iotdb/monitor/StatMonitor.java @@ -7,6 +7,8 @@ import cn.edu.tsinghua.iotdb.exception.FileNodeManagerException; import cn.edu.tsinghua.iotdb.exception.MetadataArgsErrorException; import cn.edu.tsinghua.iotdb.exception.PathErrorException; import cn.edu.tsinghua.iotdb.metadata.MManager; +import cn.edu.tsinghua.iotdb.utils.IoTDBThreadPoolFactory; +import cn.edu.tsinghua.tsfile.file.metadata.enums.TSDataType; import cn.edu.tsinghua.tsfile.timeseries.write.record.DataPoint; import cn.edu.tsinghua.tsfile.timeseries.write.record.TSRecord; import cn.edu.tsinghua.tsfile.timeseries.write.record.datapoint.LongDataPoint; @@ -14,8 +16,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; -import java.util.*; -import java.util.concurrent.Executors; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Map; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; @@ -25,11 +28,16 @@ import java.util.concurrent.atomic.AtomicLong; */ public class StatMonitor { private static final Logger LOGGER = LoggerFactory.getLogger(StatMonitor.class); - private final int backLoopPeriod; - // key is the store path like FileNodeProcessor.root_stats_xxx.xxx, - // or simple name like:FileNodeManager. And value is interface implement - // statistics function + private long runningTimeMillis = System.currentTimeMillis(); + private final int backLoopPeriod; + private final int statMonitorDetectFreqSec; + private final int statMonitorRetainIntervalSec; + + /** + * key: is the statistics store path + * Value: is an interface that implements statistics function + */ private HashMap statisticMap; private ScheduledExecutorService service; @@ -45,7 +53,9 @@ public class StatMonitor { MManager mManager = MManager.getInstance(); statisticMap = new HashMap<>(); TsfileDBConfig config = TsfileDBDescriptor.getInstance().getConfig(); - backLoopPeriod = config.backLoopPeriod; + statMonitorDetectFreqSec = config.statMonitorDetectFreqSec; + statMonitorRetainIntervalSec = config.statMonitorRetainIntervalSec; + backLoopPeriod = config.backLoopPeriodSec; try { String prefix = MonitorConstants.statStorageGroupPrefix; @@ -102,8 +112,14 @@ public class StatMonitor { } } + public void recovery() { + // TODO: restore the FildeNode Manager TOTAL_POINTS statistics info + + } + public void activate() { - service = Executors.newScheduledThreadPool(1); + + service = IoTDBThreadPoolFactory.newScheduledThreadPool(1, "StatMonitorService"); service.scheduleAtFixedRate(new StatMonitor.statBackLoop(), 1, backLoopPeriod, TimeUnit.SECONDS ); @@ -213,7 +229,7 @@ public class StatMonitor { int pointNum; for (Map.Entry entry : tsRecordHashMap.entrySet()) { try { - fManager.insert(entry.getValue()); + fManager.insert(entry.getValue(), true); numInsert.incrementAndGet(); pointNum = entry.getValue().dataPointList.size(); numPointsInsert.addAndGet(pointNum); @@ -245,6 +261,27 @@ public class StatMonitor { class statBackLoop implements Runnable { public void run() { + long currentTimeMillis = System.currentTimeMillis(); + long seconds = (currentTimeMillis - runningTimeMillis)/1000; + if (seconds - statMonitorDetectFreqSec >= 0) { + runningTimeMillis = currentTimeMillis; + // delete time-series data + FileNodeManager fManager = FileNodeManager.getInstance(); + try { + for (Map.Entry entry : statisticMap.entrySet()) { + for (String statParamName : entry.getValue().getStatParamsHashMap().keySet()) { + fManager.delete(entry.getKey(), + statParamName, + currentTimeMillis - statMonitorRetainIntervalSec * 1000, + TSDataType.INT64 + ); + } + } + }catch (FileNodeManagerException e) { + LOGGER.error("Error when delete Statistics information periodically, ", e); + e.printStackTrace(); + } + } HashMap tsRecordHashMap = gatherStatistics(); insert(tsRecordHashMap); numBackLoop.incrementAndGet(); diff --git a/src/main/java/cn/edu/tsinghua/iotdb/qp/executor/OverflowQPExecutor.java b/src/main/java/cn/edu/tsinghua/iotdb/qp/executor/OverflowQPExecutor.java index 88b1c0640af..eb3c1d4b54e 100644 --- a/src/main/java/cn/edu/tsinghua/iotdb/qp/executor/OverflowQPExecutor.java +++ b/src/main/java/cn/edu/tsinghua/iotdb/qp/executor/OverflowQPExecutor.java @@ -257,7 +257,7 @@ public class OverflowQPExecutor extends QueryProcessExecutor { TSRecord tsRecord = new TSRecord(timestamp, deltaObjectId); DataPoint dataPoint = DataPoint.getDataPoint(type, measurementId, value); tsRecord.addTuple(dataPoint); - return fileNodeManager.insert(tsRecord); + return fileNodeManager.insert(tsRecord, false); } catch (PathErrorException e) { throw new ProcessorException("Error in insert: " + e.getMessage()); @@ -286,7 +286,7 @@ public class OverflowQPExecutor extends QueryProcessExecutor { DataPoint dataPoint = DataPoint.getDataPoint(dataType, measurementList.get(i), value); tsRecord.addTuple(dataPoint); } - return fileNodeManager.insert(tsRecord); + return fileNodeManager.insert(tsRecord, false); } catch (PathErrorException | FileNodeManagerException e) { throw new ProcessorException(e.getMessage()); diff --git a/src/main/java/cn/edu/tsinghua/iotdb/service/IoTDB.java b/src/main/java/cn/edu/tsinghua/iotdb/service/IoTDB.java index 2495a00e612..ba412db0371 100644 --- a/src/main/java/cn/edu/tsinghua/iotdb/service/IoTDB.java +++ b/src/main/java/cn/edu/tsinghua/iotdb/service/IoTDB.java @@ -86,7 +86,10 @@ public class IoTDB implements IoTDBMBean { } initFileNodeManager(); - enableStatMonitor(); + + // When registering statMonitor, we should start recovering some statistics with latest values stored + // Warn: registMonitor() method should be called before systemDataRecovery() + registStatMonitor(); systemDataRecovery(); maybeInitJmx(); @@ -94,6 +97,8 @@ public class IoTDB implements IoTDBMBean { registMonitor(); registIoTDBServer(); startCloseAndMergeServer(); + // StatMonitor should start at the end + enableStatMonitor(); } private void maybeInitJmx() { @@ -110,6 +115,13 @@ public class IoTDB implements IoTDBMBean { } } + private void registStatMonitor() { + if (TsfileDBDescriptor.getInstance().getConfig().enableStatMonitor){ + statMonitor = StatMonitor.getInstance(); + statMonitor.recovery(); + } + } + private void registMonitor() throws MalformedObjectNameException, InstanceAlreadyExistsException, MBeanRegistrationException, NotCompliantMBeanException { monitorMBean = new Monitor(); diff --git a/src/main/java/cn/edu/tsinghua/iotdb/service/WriteLogRecovery.java b/src/main/java/cn/edu/tsinghua/iotdb/service/WriteLogRecovery.java index 3412409d801..fa7d3d93b93 100644 --- a/src/main/java/cn/edu/tsinghua/iotdb/service/WriteLogRecovery.java +++ b/src/main/java/cn/edu/tsinghua/iotdb/service/WriteLogRecovery.java @@ -1,11 +1,13 @@ package cn.edu.tsinghua.iotdb.service; import java.util.List; +import java.util.logging.Logger; import cn.edu.tsinghua.iotdb.engine.filenode.FileNodeManager; import cn.edu.tsinghua.iotdb.exception.FileNodeManagerException; import cn.edu.tsinghua.iotdb.exception.PathErrorException; import cn.edu.tsinghua.iotdb.metadata.MManager; +import cn.edu.tsinghua.iotdb.monitor.MonitorConstants; import cn.edu.tsinghua.iotdb.qp.physical.crud.DeletePlan; import cn.edu.tsinghua.iotdb.qp.physical.crud.InsertPlan; import cn.edu.tsinghua.iotdb.qp.physical.crud.UpdatePlan; @@ -40,7 +42,15 @@ class WriteLogRecovery { DataPoint dataPoint = DataPoint.getDataPoint(dataType, measurementList.get(i), value); tsRecord.addTuple(dataPoint); } - FileNodeManager.getInstance().insert(tsRecord); + String fileName = MManager.getInstance().getFileNameByPath(deltaObject); + // When Wal restores the statistics info, need to set isMonitor true for the insert method to stop + // collecting statistics data of storage group whose name is "root.stats". Because system will not + // Record statistics data for "root.stats" storage group + if (MonitorConstants.statStorageGroupPrefix.equals(fileName)) { + FileNodeManager.getInstance().insert(tsRecord, true); + } else { + FileNodeManager.getInstance().insert(tsRecord, false); + } } static void update(UpdatePlan updatePlan) throws FileNodeManagerException, PathErrorException { diff --git a/src/main/java/cn/edu/tsinghua/iotdb/utils/LoadDataUtils.java b/src/main/java/cn/edu/tsinghua/iotdb/utils/LoadDataUtils.java index fe723ce3179..b3e4ce7828e 100644 --- a/src/main/java/cn/edu/tsinghua/iotdb/utils/LoadDataUtils.java +++ b/src/main/java/cn/edu/tsinghua/iotdb/utils/LoadDataUtils.java @@ -130,7 +130,7 @@ public class LoadDataUtils { } // appeared before, insert directly try { - fileNodeManager.insert(record); + fileNodeManager.insert(record, false); } catch (FileNodeManagerException e) { LOG.error("failed when insert into fileNodeManager, record:{}, reason:{}", line, e.getMessage()); } diff --git a/src/test/java/cn/edu/tsinghua/iotdb/engine/filenode/FileNodeManagerMulTest.java b/src/test/java/cn/edu/tsinghua/iotdb/engine/filenode/FileNodeManagerMulTest.java index 56a55bd5665..4b9e0e21734 100644 --- a/src/test/java/cn/edu/tsinghua/iotdb/engine/filenode/FileNodeManagerMulTest.java +++ b/src/test/java/cn/edu/tsinghua/iotdb/engine/filenode/FileNodeManagerMulTest.java @@ -210,7 +210,7 @@ public class FileNodeManagerMulTest { DataPoint dataPoint = DataPoint.getDataPoint(dataType, measurementId, String.valueOf(5)); record.addTuple(dataPoint); try { - fileNodeManager.insert(record); + fileNodeManager.insert(record, false); } catch (FileNodeManagerException e) { fail(e.getMessage()); e.printStackTrace(); @@ -219,7 +219,7 @@ public class FileNodeManagerMulTest { record = new TSRecord(5, deltaObjectId2); record.addTuple(dataPoint); try { - fileNodeManager.insert(record); + fileNodeManager.insert(record, false); } catch (FileNodeManagerException e) { e.printStackTrace(); fail(e.getMessage()); @@ -229,7 +229,7 @@ public class FileNodeManagerMulTest { record = new TSRecord(65, deltaObjectId1); record.addTuple(dataPoint); try { - fileNodeManager.insert(record); + fileNodeManager.insert(record, false); } catch (FileNodeManagerException e) { e.printStackTrace(); fail(e.getMessage()); @@ -394,13 +394,13 @@ public class FileNodeManagerMulTest { DataPoint dataPoint = DataPoint.getDataPoint(dataType, measurementId, String.valueOf(5)); record.addTuple(dataPoint); try { - fileNodeManager.insert(record); + fileNodeManager.insert(record, false); record = new TSRecord(5, deltaObjectId1); record.addTuple(dataPoint); - fileNodeManager.insert(record); + fileNodeManager.insert(record, false); record = new TSRecord(10, deltaObjectId2); record.addTuple(dataPoint); - fileNodeManager.insert(record); + fileNodeManager.insert(record, false); // query check QueryStructure queryStructure = fileNodeManager.query(deltaObjectId2, measurementId, null, null, null); DynamicOneColumnData insert = (DynamicOneColumnData) queryStructure.getAllOverflowData().get(0); @@ -476,13 +476,13 @@ public class FileNodeManagerMulTest { DataPoint dataPoint = DataPoint.getDataPoint(dataType, measurementId, String.valueOf(5)); record.addTuple(dataPoint); try { - fileNodeManager.insert(record); + fileNodeManager.insert(record, false); record = new TSRecord(5, deltaObjectId1); record.addTuple(dataPoint); - fileNodeManager.insert(record); + fileNodeManager.insert(record, false); record = new TSRecord(10, deltaObjectId2); record.addTuple(dataPoint); - fileNodeManager.insert(record); + fileNodeManager.insert(record, false); // query check QueryStructure queryStructure = fileNodeManager.query(deltaObjectId2, measurementId, null, null, null); DynamicOneColumnData insert = (DynamicOneColumnData) queryStructure.getAllOverflowData().get(0); @@ -497,7 +497,7 @@ public class FileNodeManagerMulTest { record = new TSRecord(20, deltaObjectId2); record.addTuple(dataPoint); try { - fileNodeManager.insert(record); + fileNodeManager.insert(record, false); } catch (FileNodeManagerException e1) { e1.printStackTrace(); fail(e1.getMessage()); @@ -590,7 +590,7 @@ public class FileNodeManagerMulTest { DataPoint dataPoint = DataPoint.getDataPoint(dataType, measurementId, String.valueOf(5)); record.addTuple(dataPoint); try { - fileNodeManager.insert(record); + fileNodeManager.insert(record, false); } catch (FileNodeManagerException e) { e.printStackTrace(); fail(e.getMessage()); @@ -602,8 +602,8 @@ public class FileNodeManagerMulTest { // merge data try { fileNodeManager.mergeAll(); - fileNodeManager.insert(record); - fileNodeManager.insert(overflow); + fileNodeManager.insert(record, false); + fileNodeManager.insert(overflow, false); // wait end of merge while (!fileNodeManager.closeAll()) { System.out.println("wait to merge end, 1000ms..."); @@ -731,7 +731,7 @@ public class FileNodeManagerMulTest { DataPoint dataPoint = DataPoint.getDataPoint(dataType, measurementId, String.valueOf(5)); record.addTuple(dataPoint); try { - fileNodeManager.insert(record); + fileNodeManager.insert(record, false); } catch (FileNodeManagerException e) { e.printStackTrace(); fail(e.getMessage()); @@ -748,8 +748,8 @@ public class FileNodeManagerMulTest { System.out.println("wait to merge end, 1000ms..."); Thread.sleep(1000); } - fileNodeManager.insert(record); - fileNodeManager.insert(overflow); + fileNodeManager.insert(record, false); + fileNodeManager.insert(overflow, false); fileNodeManager.mergeAll(); // wait end of merge while (!fileNodeManager.closeAll()) { @@ -902,7 +902,7 @@ public class FileNodeManagerMulTest { TSRecord record = new TSRecord(15, deltaObjectId0); DataPoint dataPoint = DataPoint.getDataPoint(dataType, measurementId, String.valueOf(15)); record.addTuple(dataPoint); - fileNodeManager.insert(record); + fileNodeManager.insert(record, false); queryStructure = fileNodeManager.query(deltaObjectId0, measurementId, null, null, null); assertEquals(6, queryStructure.getBufferwriteDataInFiles().size()); assertEquals(OverflowChangeType.MERGING_CHANGE, @@ -950,7 +950,7 @@ public class FileNodeManagerMulTest { DataPoint dataPoint = DataPoint.getDataPoint(dataType, measurementId, String.valueOf(i)); record.addTuple(dataPoint); try { - fileNodeManager.insert(record); + fileNodeManager.insert(record, false); } catch (FileNodeManagerException e) { e.printStackTrace(); fail(e.getMessage()); diff --git a/src/test/java/cn/edu/tsinghua/iotdb/engine/filenode/FileNodeManagerTest.java b/src/test/java/cn/edu/tsinghua/iotdb/engine/filenode/FileNodeManagerTest.java index 494cdd887e8..ebdc8820f70 100644 --- a/src/test/java/cn/edu/tsinghua/iotdb/engine/filenode/FileNodeManagerTest.java +++ b/src/test/java/cn/edu/tsinghua/iotdb/engine/filenode/FileNodeManagerTest.java @@ -706,11 +706,11 @@ public class FileNodeManagerTest { DataPoint dataPoint = DataPoint.getDataPoint(dataType, measurementId, String.valueOf(5)); record.addTuple(dataPoint); try { - fileNodeManager.insert(record); + fileNodeManager.insert(record, false); record = new TSRecord(10, deltaObjectId); dataPoint = DataPoint.getDataPoint(dataType, measurementId, String.valueOf(10)); record.addTuple(dataPoint); - fileNodeManager.insert(record); + fileNodeManager.insert(record, false); } catch (FileNodeManagerException e) { e.printStackTrace(); fail(e.getMessage()); @@ -809,7 +809,7 @@ public class FileNodeManagerTest { DataPoint dataPoint = DataPoint.getDataPoint(dataType, measurementId, String.valueOf(i)); record.addTuple(dataPoint); try { - fManager.insert(record); + fManager.insert(record, false); } catch (FileNodeManagerException e) { e.printStackTrace(); fail(e.getMessage()); @@ -838,7 +838,7 @@ public class FileNodeManagerTest { DataPoint dataPoint = DataPoint.getDataPoint(dataType, measurementId, String.valueOf(i)); record.addTuple(dataPoint); try { - fManager.insert(record); + fManager.insert(record, false); } catch (FileNodeManagerException e) { e.printStackTrace(); fail(e.getMessage()); @@ -853,7 +853,7 @@ public class FileNodeManagerTest { DataPoint dataPoint = DataPoint.getDataPoint(dataType, measurementId, String.valueOf((int) time)); record.addTuple(dataPoint); try { - fManager.insert(record); + fManager.insert(record, false); } catch (FileNodeManagerException e) { e.printStackTrace(); fail(e.getMessage()); diff --git a/src/test/java/cn/edu/tsinghua/iotdb/monitor/MonitorTest.java b/src/test/java/cn/edu/tsinghua/iotdb/monitor/MonitorTest.java index acb56807ec7..a3671edd763 100644 --- a/src/test/java/cn/edu/tsinghua/iotdb/monitor/MonitorTest.java +++ b/src/test/java/cn/edu/tsinghua/iotdb/monitor/MonitorTest.java @@ -16,8 +16,6 @@ import cn.edu.tsinghua.iotdb.conf.TsfileDBDescriptor; import cn.edu.tsinghua.iotdb.engine.filenode.FileNodeManager; import cn.edu.tsinghua.iotdb.exception.FileNodeManagerException; import cn.edu.tsinghua.iotdb.metadata.MManager; -import cn.edu.tsinghua.iotdb.monitor.MonitorConstants; -import cn.edu.tsinghua.iotdb.monitor.StatMonitor; import cn.edu.tsinghua.iotdb.utils.EnvironmentUtils; import cn.edu.tsinghua.tsfile.timeseries.write.record.DataPoint; import cn.edu.tsinghua.tsfile.timeseries.write.record.TSRecord; @@ -39,7 +37,7 @@ public class MonitorTest { EnvironmentUtils.closeMemControl(); EnvironmentUtils.envSetUp(); tsdbconfig.enableStatMonitor = true; - tsdbconfig.backLoopPeriod = 1; + tsdbconfig.backLoopPeriodSec = 1; } @After @@ -82,9 +80,6 @@ public class MonitorTest { // Get stat data and test right HashMap statHashMap = fManager.getAllStatisticsValue(); - Long numInsert = statMonitor.getNumInsert(); - Long numPointsInsert = statMonitor.getNumPointsInsert(); - long numInsertError = statMonitor.getNumInsertError(); String path = fManager.getAllPathForStatistic().get(0); int pos = path.lastIndexOf('.'); @@ -94,16 +89,16 @@ public class MonitorTest { for (DataPoint dataPoint : fTSRecord.dataPointList) { String m = dataPoint.getMeasurementId(); Long v = (Long) dataPoint.getValue(); - System.out.println( m + " measurement, value:" + v); - if (m == "TOTAL_REQ_SUCCESS") { - assertEquals(v, numInsert); + + if (m.equals("TOTAL_REQ_SUCCESS")) { + assertEquals(v, new Long(0)); } if (m.contains("FAIL")) { assertEquals(v, new Long(0)); } else if (m.contains("POINTS")) { - assertEquals(v, numPointsInsert); + assertEquals(v, new Long(0)); } else { - assertEquals(v, numInsert); + assertEquals(v, new Long(0)); } }