mirror of https://github.com/apache/iotdb
Fix issue#260 (#266)
This commit is contained in:
parent
83fdf35be9
commit
134f868df6
|
@ -89,10 +89,24 @@ concurrent_flush_thread=0
|
|||
|
||||
|
||||
# Statistics Monitor configuration
|
||||
# default monitor is enabled, and write statistics info to IoTDB every 5 seconds
|
||||
# Choose to change the back_loop_period >= 1 seconds
|
||||
# Set enable_stat_monitor true(or false) to enable(or disable) the StatMonitor that stores statistics info periodically.
|
||||
# back_loop_period_sec decides the period when StatMonitor writes statistics info into IoTDB.
|
||||
# stat_monitor_detect_freq_sec decides when IoTDB detects statistics info out-of-date.
|
||||
# IoTDB just keeps statistics info within stat_monitor_retain_interval_sec seconds before current time.
|
||||
# Note: IoTDB requires stat_monitor_detect_freq_sec >= 600s and stat_monitor_retain_interval_sec >= 600s.
|
||||
|
||||
# The monitor is enabled by default, and writes statistics info to IoTDB periodically
|
||||
enable_stat_monitor = true
|
||||
back_loop_period = 5
|
||||
|
||||
# The period that StatMonitor stores statistics info
|
||||
back_loop_period_sec = 5
|
||||
|
||||
# The interval at which StatMonitor starts to check whether statistics info can be deleted due to exceeding the retention volume
|
||||
stat_monitor_detect_freq_sec = 600
|
||||
|
||||
# The minimum age of statistics storage information to be eligible for deletion due to age
|
||||
stat_monitor_retain_interval_sec = 600
|
||||
|
||||
|
||||
# When set to false, MemMonitorThread and MemStatisticThread will not be created.
|
||||
enable_mem_monitor=true
|
||||
|
|
|
@ -175,18 +175,27 @@ public class TsfileDBConfig {
|
|||
*/
|
||||
public long smallFlushInterval = 60 * 1000;
|
||||
|
||||
/*
|
||||
* The statMonitor's BackLoop period, 5s is enough
|
||||
/**
|
||||
* The statMonitor writes statistics info into IoTDB every backLoopPeriodSec secs.
|
||||
* Default value is 5s.
|
||||
*/
|
||||
public int backLoopPeriod = 5;
|
||||
public int backLoopPeriodSec = 5;
|
||||
|
||||
/**
|
||||
* Set whether to enable statistics service
|
||||
* Set true to enable statistics monitor service,
|
||||
* false to disable statistics service
|
||||
*/
|
||||
public boolean enableStatMonitor = true;
|
||||
|
||||
/**
|
||||
* the maximum number of writing instances existing in same time.
|
||||
* Set the time interval when StatMonitor performs delete detection, default value is 600s,
|
||||
*/
|
||||
public int statMonitorDetectFreqSec = 60 * 10;
|
||||
|
||||
/**
|
||||
* Set the maximum time to keep monitor statistics information in IoTDB, default value is 600s
|
||||
*/
|
||||
public int statMonitorRetainIntervalSec = 60 * 10;
|
||||
|
||||
|
||||
public TsfileDBConfig() {}
|
||||
|
|
|
@ -68,7 +68,22 @@ public class TsfileDBDescriptor {
|
|||
try {
|
||||
properties.load(inputStream);
|
||||
conf.enableStatMonitor = Boolean.parseBoolean(properties.getProperty("enable_stat_monitor", conf.enableStatMonitor + ""));
|
||||
conf.backLoopPeriod = Integer.parseInt(properties.getProperty("back_loop_period", conf.backLoopPeriod + ""));
|
||||
conf.backLoopPeriodSec = Integer.parseInt(properties.getProperty("back_loop_period_sec", conf.backLoopPeriodSec + ""));
|
||||
int statMonitorDetectFreqSec = Integer.parseInt(properties.getProperty("stat_monitor_detect_freq_sec", conf.statMonitorDetectFreqSec + ""));
|
||||
int statMonitorRetainIntervalSec = Integer.parseInt(properties.getProperty("stat_monitor_retain_interval_sec", conf.statMonitorRetainIntervalSec + ""));
|
||||
// the conf value must > default value, or may cause system unstable
|
||||
if (conf.statMonitorDetectFreqSec < statMonitorDetectFreqSec) {
|
||||
conf.statMonitorDetectFreqSec = statMonitorDetectFreqSec;
|
||||
} else {
|
||||
LOGGER.info("The stat_monitor_detect_freq_sec value is smaller than default, use default value");
|
||||
}
|
||||
|
||||
if (conf.statMonitorRetainIntervalSec < statMonitorRetainIntervalSec) {
|
||||
conf.statMonitorRetainIntervalSec = statMonitorRetainIntervalSec;
|
||||
}else {
|
||||
LOGGER.info("The stat_monitor_retain_interval_sec value is smaller than default, use default value");
|
||||
}
|
||||
|
||||
conf.rpcPort = Integer.parseInt(properties.getProperty("rpc_port",conf.rpcPort+""));
|
||||
|
||||
conf.enableWal = Boolean.parseBoolean(properties.getProperty("enable_wal", conf.enableWal+""));
|
||||
|
|
|
@ -93,6 +93,12 @@ public class FileNodeManager implements IStatistic {
|
|||
}
|
||||
};
|
||||
|
||||
private void updateStatHashMapWhenFail(TSRecord tsRecord) {
|
||||
statParamsHashMap.get(MonitorConstants.FileNodeManagerStatConstants.TOTAL_REQ_FAIL.name())
|
||||
.incrementAndGet();
|
||||
statParamsHashMap.get(MonitorConstants.FileNodeManagerStatConstants.TOTAL_POINTS_FAIL.name())
|
||||
.addAndGet(tsRecord.dataPointList.size());
|
||||
}
|
||||
/**
|
||||
* @return the key represent the params' name, values is AtomicLong type
|
||||
*/
|
||||
|
@ -197,7 +203,7 @@ public class FileNodeManager implements IStatistic {
|
|||
if (TsFileDBConf.enableStatMonitor) {
|
||||
StatMonitor statMonitor = StatMonitor.getInstance();
|
||||
registStatMetadata();
|
||||
statMonitor.registStatistics(getClass().getSimpleName(), this);
|
||||
statMonitor.registStatistics(statStorageDeltaName, this);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -267,12 +273,22 @@ public class FileNodeManager implements IStatistic {
|
|||
}
|
||||
}
|
||||
|
||||
public int insert(TSRecord tsRecord) throws FileNodeManagerException {
|
||||
/**
|
||||
* insert TsRecord into storage group
|
||||
* @param tsRecord: input Data
|
||||
* @param isMonitor: if true the insertion is done by StatMonitor then the Stat Info will not be recorded.
|
||||
* else the statParamsHashMap will be updated
|
||||
* @return an int value represents the insert type
|
||||
* @throws FileNodeManagerException
|
||||
*/
|
||||
public int insert(TSRecord tsRecord, boolean isMonitor) throws FileNodeManagerException {
|
||||
long timestamp = tsRecord.time;
|
||||
String deltaObjectId = tsRecord.deltaObjectId;
|
||||
|
||||
statParamsHashMap.get(MonitorConstants.FileNodeManagerStatConstants.TOTAL_POINTS.name())
|
||||
.addAndGet(tsRecord.dataPointList.size());
|
||||
if (!isMonitor) {
|
||||
statParamsHashMap.get(MonitorConstants.FileNodeManagerStatConstants.TOTAL_POINTS.name())
|
||||
.addAndGet(tsRecord.dataPointList.size());
|
||||
}
|
||||
|
||||
FileNodeProcessor fileNodeProcessor = getProcessor(deltaObjectId, true);
|
||||
int insertType = 0;
|
||||
|
@ -293,10 +309,9 @@ public class FileNodeManager implements IStatistic {
|
|||
String.format("Get the overflow processor failed, the filenode is {}, insert time is {}",
|
||||
filenodeName, timestamp),
|
||||
e);
|
||||
statParamsHashMap.get(MonitorConstants.FileNodeManagerStatConstants.TOTAL_REQ_FAIL.name())
|
||||
.incrementAndGet();
|
||||
statParamsHashMap.get(MonitorConstants.FileNodeManagerStatConstants.TOTAL_POINTS_FAIL.name())
|
||||
.addAndGet(tsRecord.dataPointList.size());
|
||||
if (!isMonitor) {
|
||||
updateStatHashMapWhenFail(tsRecord);
|
||||
}
|
||||
throw new FileNodeManagerException(e);
|
||||
}
|
||||
// write wal
|
||||
|
@ -308,10 +323,9 @@ public class FileNodeManager implements IStatistic {
|
|||
}
|
||||
} catch (IOException | PathErrorException e) {
|
||||
LOGGER.error("Error in write WAL.", e);
|
||||
statParamsHashMap.get(MonitorConstants.FileNodeManagerStatConstants.TOTAL_REQ_FAIL.name())
|
||||
.incrementAndGet();
|
||||
statParamsHashMap.get(MonitorConstants.FileNodeManagerStatConstants.TOTAL_POINTS_FAIL.name())
|
||||
.addAndGet(tsRecord.dataPointList.size());
|
||||
if (!isMonitor) {
|
||||
updateStatHashMapWhenFail(tsRecord);
|
||||
}
|
||||
throw new FileNodeManagerException(e);
|
||||
}
|
||||
// write overflow data
|
||||
|
@ -321,10 +335,9 @@ public class FileNodeManager implements IStatistic {
|
|||
dataPoint.getType(), dataPoint.getValue().toString());
|
||||
} catch (ProcessorException e) {
|
||||
LOGGER.error("Insert into overflow error, the reason is {}", e.getMessage());
|
||||
statParamsHashMap.get(MonitorConstants.FileNodeManagerStatConstants.TOTAL_REQ_FAIL.name())
|
||||
.incrementAndGet();
|
||||
statParamsHashMap.get(MonitorConstants.FileNodeManagerStatConstants.TOTAL_POINTS_FAIL.name())
|
||||
.addAndGet(tsRecord.dataPointList.size());
|
||||
if (!isMonitor) {
|
||||
updateStatHashMapWhenFail(tsRecord);
|
||||
}
|
||||
throw new FileNodeManagerException(e);
|
||||
}
|
||||
}
|
||||
|
@ -340,10 +353,9 @@ public class FileNodeManager implements IStatistic {
|
|||
} catch (FileNodeProcessorException e) {
|
||||
LOGGER.error("Get the bufferwrite processor failed, the filenode is {}, insert time is {}",
|
||||
filenodeName, timestamp);
|
||||
statParamsHashMap.get(MonitorConstants.FileNodeManagerStatConstants.TOTAL_REQ_FAIL.name())
|
||||
.incrementAndGet();
|
||||
statParamsHashMap.get(MonitorConstants.FileNodeManagerStatConstants.TOTAL_POINTS_FAIL.name())
|
||||
.addAndGet(tsRecord.dataPointList.size());
|
||||
if (!isMonitor) {
|
||||
updateStatHashMapWhenFail(tsRecord);
|
||||
}
|
||||
throw new FileNodeManagerException(e);
|
||||
}
|
||||
// Add a new interval file to newfilelist
|
||||
|
@ -353,10 +365,9 @@ public class FileNodeManager implements IStatistic {
|
|||
try {
|
||||
fileNodeProcessor.addIntervalFileNode(timestamp, bufferwriteRelativePath);
|
||||
} catch (Exception e) {
|
||||
statParamsHashMap.get(MonitorConstants.FileNodeManagerStatConstants.TOTAL_REQ_FAIL.name())
|
||||
.incrementAndGet();
|
||||
statParamsHashMap.get(MonitorConstants.FileNodeManagerStatConstants.TOTAL_POINTS_FAIL.name())
|
||||
.addAndGet(tsRecord.dataPointList.size());
|
||||
if (!isMonitor) {
|
||||
updateStatHashMapWhenFail(tsRecord);
|
||||
}
|
||||
throw new FileNodeManagerException(e);
|
||||
}
|
||||
}
|
||||
|
@ -369,20 +380,18 @@ public class FileNodeManager implements IStatistic {
|
|||
}
|
||||
} catch (IOException | PathErrorException e) {
|
||||
LOGGER.error("Error in write WAL.", e);
|
||||
statParamsHashMap.get(MonitorConstants.FileNodeManagerStatConstants.TOTAL_REQ_FAIL.name())
|
||||
.incrementAndGet();
|
||||
statParamsHashMap.get(MonitorConstants.FileNodeManagerStatConstants.TOTAL_POINTS_FAIL.name())
|
||||
.addAndGet(tsRecord.dataPointList.size());
|
||||
if (!isMonitor) {
|
||||
updateStatHashMapWhenFail(tsRecord);
|
||||
}
|
||||
throw new FileNodeManagerException(e);
|
||||
}
|
||||
// Write data
|
||||
try {
|
||||
bufferWriteProcessor.write(tsRecord);
|
||||
} catch (BufferWriteProcessorException e) {
|
||||
statParamsHashMap.get(MonitorConstants.FileNodeManagerStatConstants.TOTAL_REQ_FAIL.name())
|
||||
.incrementAndGet();
|
||||
statParamsHashMap.get(MonitorConstants.FileNodeManagerStatConstants.TOTAL_POINTS_FAIL.name())
|
||||
.addAndGet(tsRecord.dataPointList.size());
|
||||
if (!isMonitor) {
|
||||
updateStatHashMapWhenFail(tsRecord);
|
||||
}
|
||||
throw new FileNodeManagerException(e);
|
||||
}
|
||||
fileNodeProcessor.setIntervalFileNodeStartTime(deltaObjectId, timestamp);
|
||||
|
@ -392,14 +401,17 @@ public class FileNodeManager implements IStatistic {
|
|||
} finally {
|
||||
fileNodeProcessor.writeUnlock();
|
||||
}
|
||||
fileNodeProcessor.getStatParamsHashMap()
|
||||
.get(MonitorConstants.FileNodeProcessorStatConstants.TOTAL_POINTS_SUCCESS.name())
|
||||
.addAndGet(tsRecord.dataPointList.size());
|
||||
fileNodeProcessor.getStatParamsHashMap()
|
||||
.get(MonitorConstants.FileNodeProcessorStatConstants.TOTAL_REQ_SUCCESS.name()).incrementAndGet();
|
||||
statParamsHashMap.get(MonitorConstants.FileNodeManagerStatConstants.TOTAL_REQ_SUCCESS.name()).incrementAndGet();
|
||||
statParamsHashMap.get(MonitorConstants.FileNodeManagerStatConstants.TOTAL_POINTS_SUCCESS.name())
|
||||
.addAndGet(tsRecord.dataPointList.size());
|
||||
//Modify the insert
|
||||
if (!isMonitor) {
|
||||
fileNodeProcessor.getStatParamsHashMap()
|
||||
.get(MonitorConstants.FileNodeProcessorStatConstants.TOTAL_POINTS_SUCCESS.name())
|
||||
.addAndGet(tsRecord.dataPointList.size());
|
||||
fileNodeProcessor.getStatParamsHashMap()
|
||||
.get(MonitorConstants.FileNodeProcessorStatConstants.TOTAL_REQ_SUCCESS.name()).incrementAndGet();
|
||||
statParamsHashMap.get(MonitorConstants.FileNodeManagerStatConstants.TOTAL_REQ_SUCCESS.name()).incrementAndGet();
|
||||
statParamsHashMap.get(MonitorConstants.FileNodeManagerStatConstants.TOTAL_POINTS_SUCCESS.name())
|
||||
.addAndGet(tsRecord.dataPointList.size());
|
||||
}
|
||||
return insertType;
|
||||
}
|
||||
|
||||
|
|
|
@ -4,6 +4,7 @@ import cn.edu.tsinghua.tsfile.timeseries.write.record.TSRecord;
|
|||
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
|
||||
public interface IStatistic {
|
||||
|
@ -27,4 +28,10 @@ public interface IStatistic {
|
|||
* the name is the statistics name need to store
|
||||
*/
|
||||
List<String> getAllPathForStatistic();
|
||||
|
||||
/**
|
||||
*
|
||||
* @return a HashMap contains the name and values of the statistics parameters
|
||||
*/
|
||||
HashMap<String, AtomicLong> getStatParamsHashMap();
|
||||
}
|
||||
|
|
|
@ -7,6 +7,8 @@ import cn.edu.tsinghua.iotdb.exception.FileNodeManagerException;
|
|||
import cn.edu.tsinghua.iotdb.exception.MetadataArgsErrorException;
|
||||
import cn.edu.tsinghua.iotdb.exception.PathErrorException;
|
||||
import cn.edu.tsinghua.iotdb.metadata.MManager;
|
||||
import cn.edu.tsinghua.iotdb.utils.IoTDBThreadPoolFactory;
|
||||
import cn.edu.tsinghua.tsfile.file.metadata.enums.TSDataType;
|
||||
import cn.edu.tsinghua.tsfile.timeseries.write.record.DataPoint;
|
||||
import cn.edu.tsinghua.tsfile.timeseries.write.record.TSRecord;
|
||||
import cn.edu.tsinghua.tsfile.timeseries.write.record.datapoint.LongDataPoint;
|
||||
|
@ -14,8 +16,9 @@ import org.slf4j.Logger;
|
|||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
@ -25,11 +28,16 @@ import java.util.concurrent.atomic.AtomicLong;
|
|||
*/
|
||||
public class StatMonitor {
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(StatMonitor.class);
|
||||
private final int backLoopPeriod;
|
||||
|
||||
// key is the store path like FileNodeProcessor.root_stats_xxx.xxx,
|
||||
// or simple name like:FileNodeManager. And value is interface implement
|
||||
// statistics function
|
||||
private long runningTimeMillis = System.currentTimeMillis();
|
||||
private final int backLoopPeriod;
|
||||
private final int statMonitorDetectFreqSec;
|
||||
private final int statMonitorRetainIntervalSec;
|
||||
|
||||
/**
|
||||
* key: is the statistics store path
|
||||
* Value: is an interface that implements statistics function
|
||||
*/
|
||||
private HashMap<String, IStatistic> statisticMap;
|
||||
private ScheduledExecutorService service;
|
||||
|
||||
|
@ -45,7 +53,9 @@ public class StatMonitor {
|
|||
MManager mManager = MManager.getInstance();
|
||||
statisticMap = new HashMap<>();
|
||||
TsfileDBConfig config = TsfileDBDescriptor.getInstance().getConfig();
|
||||
backLoopPeriod = config.backLoopPeriod;
|
||||
statMonitorDetectFreqSec = config.statMonitorDetectFreqSec;
|
||||
statMonitorRetainIntervalSec = config.statMonitorRetainIntervalSec;
|
||||
backLoopPeriod = config.backLoopPeriodSec;
|
||||
try {
|
||||
String prefix = MonitorConstants.statStorageGroupPrefix;
|
||||
|
||||
|
@ -102,8 +112,14 @@ public class StatMonitor {
|
|||
}
|
||||
}
|
||||
|
||||
public void recovery() {
|
||||
// TODO: restore the FildeNode Manager TOTAL_POINTS statistics info
|
||||
|
||||
}
|
||||
|
||||
public void activate() {
|
||||
service = Executors.newScheduledThreadPool(1);
|
||||
|
||||
service = IoTDBThreadPoolFactory.newScheduledThreadPool(1, "StatMonitorService");
|
||||
service.scheduleAtFixedRate(new StatMonitor.statBackLoop(),
|
||||
1, backLoopPeriod, TimeUnit.SECONDS
|
||||
);
|
||||
|
@ -213,7 +229,7 @@ public class StatMonitor {
|
|||
int pointNum;
|
||||
for (Map.Entry<String, TSRecord> entry : tsRecordHashMap.entrySet()) {
|
||||
try {
|
||||
fManager.insert(entry.getValue());
|
||||
fManager.insert(entry.getValue(), true);
|
||||
numInsert.incrementAndGet();
|
||||
pointNum = entry.getValue().dataPointList.size();
|
||||
numPointsInsert.addAndGet(pointNum);
|
||||
|
@ -245,6 +261,27 @@ public class StatMonitor {
|
|||
|
||||
class statBackLoop implements Runnable {
|
||||
public void run() {
|
||||
long currentTimeMillis = System.currentTimeMillis();
|
||||
long seconds = (currentTimeMillis - runningTimeMillis)/1000;
|
||||
if (seconds - statMonitorDetectFreqSec >= 0) {
|
||||
runningTimeMillis = currentTimeMillis;
|
||||
// delete time-series data
|
||||
FileNodeManager fManager = FileNodeManager.getInstance();
|
||||
try {
|
||||
for (Map.Entry<String, IStatistic> entry : statisticMap.entrySet()) {
|
||||
for (String statParamName : entry.getValue().getStatParamsHashMap().keySet()) {
|
||||
fManager.delete(entry.getKey(),
|
||||
statParamName,
|
||||
currentTimeMillis - statMonitorRetainIntervalSec * 1000,
|
||||
TSDataType.INT64
|
||||
);
|
||||
}
|
||||
}
|
||||
}catch (FileNodeManagerException e) {
|
||||
LOGGER.error("Error when delete Statistics information periodically, ", e);
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
HashMap<String, TSRecord> tsRecordHashMap = gatherStatistics();
|
||||
insert(tsRecordHashMap);
|
||||
numBackLoop.incrementAndGet();
|
||||
|
|
|
@ -257,7 +257,7 @@ public class OverflowQPExecutor extends QueryProcessExecutor {
|
|||
TSRecord tsRecord = new TSRecord(timestamp, deltaObjectId);
|
||||
DataPoint dataPoint = DataPoint.getDataPoint(type, measurementId, value);
|
||||
tsRecord.addTuple(dataPoint);
|
||||
return fileNodeManager.insert(tsRecord);
|
||||
return fileNodeManager.insert(tsRecord, false);
|
||||
|
||||
} catch (PathErrorException e) {
|
||||
throw new ProcessorException("Error in insert: " + e.getMessage());
|
||||
|
@ -286,7 +286,7 @@ public class OverflowQPExecutor extends QueryProcessExecutor {
|
|||
DataPoint dataPoint = DataPoint.getDataPoint(dataType, measurementList.get(i), value);
|
||||
tsRecord.addTuple(dataPoint);
|
||||
}
|
||||
return fileNodeManager.insert(tsRecord);
|
||||
return fileNodeManager.insert(tsRecord, false);
|
||||
|
||||
} catch (PathErrorException | FileNodeManagerException e) {
|
||||
throw new ProcessorException(e.getMessage());
|
||||
|
|
|
@ -86,7 +86,10 @@ public class IoTDB implements IoTDBMBean {
|
|||
}
|
||||
|
||||
initFileNodeManager();
|
||||
enableStatMonitor();
|
||||
|
||||
// When registering statMonitor, we should start recovering some statistics with latest values stored
|
||||
// Warn: registMonitor() method should be called before systemDataRecovery()
|
||||
registStatMonitor();
|
||||
systemDataRecovery();
|
||||
|
||||
maybeInitJmx();
|
||||
|
@ -94,6 +97,8 @@ public class IoTDB implements IoTDBMBean {
|
|||
registMonitor();
|
||||
registIoTDBServer();
|
||||
startCloseAndMergeServer();
|
||||
// StatMonitor should start at the end
|
||||
enableStatMonitor();
|
||||
}
|
||||
|
||||
private void maybeInitJmx() {
|
||||
|
@ -110,6 +115,13 @@ public class IoTDB implements IoTDBMBean {
|
|||
}
|
||||
}
|
||||
|
||||
private void registStatMonitor() {
|
||||
if (TsfileDBDescriptor.getInstance().getConfig().enableStatMonitor){
|
||||
statMonitor = StatMonitor.getInstance();
|
||||
statMonitor.recovery();
|
||||
}
|
||||
}
|
||||
|
||||
private void registMonitor() throws MalformedObjectNameException, InstanceAlreadyExistsException,
|
||||
MBeanRegistrationException, NotCompliantMBeanException {
|
||||
monitorMBean = new Monitor();
|
||||
|
|
|
@ -1,11 +1,13 @@
|
|||
package cn.edu.tsinghua.iotdb.service;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.logging.Logger;
|
||||
|
||||
import cn.edu.tsinghua.iotdb.engine.filenode.FileNodeManager;
|
||||
import cn.edu.tsinghua.iotdb.exception.FileNodeManagerException;
|
||||
import cn.edu.tsinghua.iotdb.exception.PathErrorException;
|
||||
import cn.edu.tsinghua.iotdb.metadata.MManager;
|
||||
import cn.edu.tsinghua.iotdb.monitor.MonitorConstants;
|
||||
import cn.edu.tsinghua.iotdb.qp.physical.crud.DeletePlan;
|
||||
import cn.edu.tsinghua.iotdb.qp.physical.crud.InsertPlan;
|
||||
import cn.edu.tsinghua.iotdb.qp.physical.crud.UpdatePlan;
|
||||
|
@ -40,7 +42,15 @@ class WriteLogRecovery {
|
|||
DataPoint dataPoint = DataPoint.getDataPoint(dataType, measurementList.get(i), value);
|
||||
tsRecord.addTuple(dataPoint);
|
||||
}
|
||||
FileNodeManager.getInstance().insert(tsRecord);
|
||||
String fileName = MManager.getInstance().getFileNameByPath(deltaObject);
|
||||
// When Wal restores the statistics info, need to set isMonitor true for the insert method to stop
|
||||
// collecting statistics data of storage group whose name is "root.stats". Because system will not
|
||||
// Record statistics data for "root.stats" storage group
|
||||
if (MonitorConstants.statStorageGroupPrefix.equals(fileName)) {
|
||||
FileNodeManager.getInstance().insert(tsRecord, true);
|
||||
} else {
|
||||
FileNodeManager.getInstance().insert(tsRecord, false);
|
||||
}
|
||||
}
|
||||
|
||||
static void update(UpdatePlan updatePlan) throws FileNodeManagerException, PathErrorException {
|
||||
|
|
|
@ -130,7 +130,7 @@ public class LoadDataUtils {
|
|||
}
|
||||
// appeared before, insert directly
|
||||
try {
|
||||
fileNodeManager.insert(record);
|
||||
fileNodeManager.insert(record, false);
|
||||
} catch (FileNodeManagerException e) {
|
||||
LOG.error("failed when insert into fileNodeManager, record:{}, reason:{}", line, e.getMessage());
|
||||
}
|
||||
|
|
|
@ -210,7 +210,7 @@ public class FileNodeManagerMulTest {
|
|||
DataPoint dataPoint = DataPoint.getDataPoint(dataType, measurementId, String.valueOf(5));
|
||||
record.addTuple(dataPoint);
|
||||
try {
|
||||
fileNodeManager.insert(record);
|
||||
fileNodeManager.insert(record, false);
|
||||
} catch (FileNodeManagerException e) {
|
||||
fail(e.getMessage());
|
||||
e.printStackTrace();
|
||||
|
@ -219,7 +219,7 @@ public class FileNodeManagerMulTest {
|
|||
record = new TSRecord(5, deltaObjectId2);
|
||||
record.addTuple(dataPoint);
|
||||
try {
|
||||
fileNodeManager.insert(record);
|
||||
fileNodeManager.insert(record, false);
|
||||
} catch (FileNodeManagerException e) {
|
||||
e.printStackTrace();
|
||||
fail(e.getMessage());
|
||||
|
@ -229,7 +229,7 @@ public class FileNodeManagerMulTest {
|
|||
record = new TSRecord(65, deltaObjectId1);
|
||||
record.addTuple(dataPoint);
|
||||
try {
|
||||
fileNodeManager.insert(record);
|
||||
fileNodeManager.insert(record, false);
|
||||
} catch (FileNodeManagerException e) {
|
||||
e.printStackTrace();
|
||||
fail(e.getMessage());
|
||||
|
@ -394,13 +394,13 @@ public class FileNodeManagerMulTest {
|
|||
DataPoint dataPoint = DataPoint.getDataPoint(dataType, measurementId, String.valueOf(5));
|
||||
record.addTuple(dataPoint);
|
||||
try {
|
||||
fileNodeManager.insert(record);
|
||||
fileNodeManager.insert(record, false);
|
||||
record = new TSRecord(5, deltaObjectId1);
|
||||
record.addTuple(dataPoint);
|
||||
fileNodeManager.insert(record);
|
||||
fileNodeManager.insert(record, false);
|
||||
record = new TSRecord(10, deltaObjectId2);
|
||||
record.addTuple(dataPoint);
|
||||
fileNodeManager.insert(record);
|
||||
fileNodeManager.insert(record, false);
|
||||
// query check
|
||||
QueryStructure queryStructure = fileNodeManager.query(deltaObjectId2, measurementId, null, null, null);
|
||||
DynamicOneColumnData insert = (DynamicOneColumnData) queryStructure.getAllOverflowData().get(0);
|
||||
|
@ -476,13 +476,13 @@ public class FileNodeManagerMulTest {
|
|||
DataPoint dataPoint = DataPoint.getDataPoint(dataType, measurementId, String.valueOf(5));
|
||||
record.addTuple(dataPoint);
|
||||
try {
|
||||
fileNodeManager.insert(record);
|
||||
fileNodeManager.insert(record, false);
|
||||
record = new TSRecord(5, deltaObjectId1);
|
||||
record.addTuple(dataPoint);
|
||||
fileNodeManager.insert(record);
|
||||
fileNodeManager.insert(record, false);
|
||||
record = new TSRecord(10, deltaObjectId2);
|
||||
record.addTuple(dataPoint);
|
||||
fileNodeManager.insert(record);
|
||||
fileNodeManager.insert(record, false);
|
||||
// query check
|
||||
QueryStructure queryStructure = fileNodeManager.query(deltaObjectId2, measurementId, null, null, null);
|
||||
DynamicOneColumnData insert = (DynamicOneColumnData) queryStructure.getAllOverflowData().get(0);
|
||||
|
@ -497,7 +497,7 @@ public class FileNodeManagerMulTest {
|
|||
record = new TSRecord(20, deltaObjectId2);
|
||||
record.addTuple(dataPoint);
|
||||
try {
|
||||
fileNodeManager.insert(record);
|
||||
fileNodeManager.insert(record, false);
|
||||
} catch (FileNodeManagerException e1) {
|
||||
e1.printStackTrace();
|
||||
fail(e1.getMessage());
|
||||
|
@ -590,7 +590,7 @@ public class FileNodeManagerMulTest {
|
|||
DataPoint dataPoint = DataPoint.getDataPoint(dataType, measurementId, String.valueOf(5));
|
||||
record.addTuple(dataPoint);
|
||||
try {
|
||||
fileNodeManager.insert(record);
|
||||
fileNodeManager.insert(record, false);
|
||||
} catch (FileNodeManagerException e) {
|
||||
e.printStackTrace();
|
||||
fail(e.getMessage());
|
||||
|
@ -602,8 +602,8 @@ public class FileNodeManagerMulTest {
|
|||
// merge data
|
||||
try {
|
||||
fileNodeManager.mergeAll();
|
||||
fileNodeManager.insert(record);
|
||||
fileNodeManager.insert(overflow);
|
||||
fileNodeManager.insert(record, false);
|
||||
fileNodeManager.insert(overflow, false);
|
||||
// wait end of merge
|
||||
while (!fileNodeManager.closeAll()) {
|
||||
System.out.println("wait to merge end, 1000ms...");
|
||||
|
@ -731,7 +731,7 @@ public class FileNodeManagerMulTest {
|
|||
DataPoint dataPoint = DataPoint.getDataPoint(dataType, measurementId, String.valueOf(5));
|
||||
record.addTuple(dataPoint);
|
||||
try {
|
||||
fileNodeManager.insert(record);
|
||||
fileNodeManager.insert(record, false);
|
||||
} catch (FileNodeManagerException e) {
|
||||
e.printStackTrace();
|
||||
fail(e.getMessage());
|
||||
|
@ -748,8 +748,8 @@ public class FileNodeManagerMulTest {
|
|||
System.out.println("wait to merge end, 1000ms...");
|
||||
Thread.sleep(1000);
|
||||
}
|
||||
fileNodeManager.insert(record);
|
||||
fileNodeManager.insert(overflow);
|
||||
fileNodeManager.insert(record, false);
|
||||
fileNodeManager.insert(overflow, false);
|
||||
fileNodeManager.mergeAll();
|
||||
// wait end of merge
|
||||
while (!fileNodeManager.closeAll()) {
|
||||
|
@ -902,7 +902,7 @@ public class FileNodeManagerMulTest {
|
|||
TSRecord record = new TSRecord(15, deltaObjectId0);
|
||||
DataPoint dataPoint = DataPoint.getDataPoint(dataType, measurementId, String.valueOf(15));
|
||||
record.addTuple(dataPoint);
|
||||
fileNodeManager.insert(record);
|
||||
fileNodeManager.insert(record, false);
|
||||
queryStructure = fileNodeManager.query(deltaObjectId0, measurementId, null, null, null);
|
||||
assertEquals(6, queryStructure.getBufferwriteDataInFiles().size());
|
||||
assertEquals(OverflowChangeType.MERGING_CHANGE,
|
||||
|
@ -950,7 +950,7 @@ public class FileNodeManagerMulTest {
|
|||
DataPoint dataPoint = DataPoint.getDataPoint(dataType, measurementId, String.valueOf(i));
|
||||
record.addTuple(dataPoint);
|
||||
try {
|
||||
fileNodeManager.insert(record);
|
||||
fileNodeManager.insert(record, false);
|
||||
} catch (FileNodeManagerException e) {
|
||||
e.printStackTrace();
|
||||
fail(e.getMessage());
|
||||
|
|
|
@ -706,11 +706,11 @@ public class FileNodeManagerTest {
|
|||
DataPoint dataPoint = DataPoint.getDataPoint(dataType, measurementId, String.valueOf(5));
|
||||
record.addTuple(dataPoint);
|
||||
try {
|
||||
fileNodeManager.insert(record);
|
||||
fileNodeManager.insert(record, false);
|
||||
record = new TSRecord(10, deltaObjectId);
|
||||
dataPoint = DataPoint.getDataPoint(dataType, measurementId, String.valueOf(10));
|
||||
record.addTuple(dataPoint);
|
||||
fileNodeManager.insert(record);
|
||||
fileNodeManager.insert(record, false);
|
||||
} catch (FileNodeManagerException e) {
|
||||
e.printStackTrace();
|
||||
fail(e.getMessage());
|
||||
|
@ -809,7 +809,7 @@ public class FileNodeManagerTest {
|
|||
DataPoint dataPoint = DataPoint.getDataPoint(dataType, measurementId, String.valueOf(i));
|
||||
record.addTuple(dataPoint);
|
||||
try {
|
||||
fManager.insert(record);
|
||||
fManager.insert(record, false);
|
||||
} catch (FileNodeManagerException e) {
|
||||
e.printStackTrace();
|
||||
fail(e.getMessage());
|
||||
|
@ -838,7 +838,7 @@ public class FileNodeManagerTest {
|
|||
DataPoint dataPoint = DataPoint.getDataPoint(dataType, measurementId, String.valueOf(i));
|
||||
record.addTuple(dataPoint);
|
||||
try {
|
||||
fManager.insert(record);
|
||||
fManager.insert(record, false);
|
||||
} catch (FileNodeManagerException e) {
|
||||
e.printStackTrace();
|
||||
fail(e.getMessage());
|
||||
|
@ -853,7 +853,7 @@ public class FileNodeManagerTest {
|
|||
DataPoint dataPoint = DataPoint.getDataPoint(dataType, measurementId, String.valueOf((int) time));
|
||||
record.addTuple(dataPoint);
|
||||
try {
|
||||
fManager.insert(record);
|
||||
fManager.insert(record, false);
|
||||
} catch (FileNodeManagerException e) {
|
||||
e.printStackTrace();
|
||||
fail(e.getMessage());
|
||||
|
|
|
@ -16,8 +16,6 @@ import cn.edu.tsinghua.iotdb.conf.TsfileDBDescriptor;
|
|||
import cn.edu.tsinghua.iotdb.engine.filenode.FileNodeManager;
|
||||
import cn.edu.tsinghua.iotdb.exception.FileNodeManagerException;
|
||||
import cn.edu.tsinghua.iotdb.metadata.MManager;
|
||||
import cn.edu.tsinghua.iotdb.monitor.MonitorConstants;
|
||||
import cn.edu.tsinghua.iotdb.monitor.StatMonitor;
|
||||
import cn.edu.tsinghua.iotdb.utils.EnvironmentUtils;
|
||||
import cn.edu.tsinghua.tsfile.timeseries.write.record.DataPoint;
|
||||
import cn.edu.tsinghua.tsfile.timeseries.write.record.TSRecord;
|
||||
|
@ -39,7 +37,7 @@ public class MonitorTest {
|
|||
EnvironmentUtils.closeMemControl();
|
||||
EnvironmentUtils.envSetUp();
|
||||
tsdbconfig.enableStatMonitor = true;
|
||||
tsdbconfig.backLoopPeriod = 1;
|
||||
tsdbconfig.backLoopPeriodSec = 1;
|
||||
}
|
||||
|
||||
@After
|
||||
|
@ -82,9 +80,6 @@ public class MonitorTest {
|
|||
// Get stat data and test right
|
||||
|
||||
HashMap<String, TSRecord> statHashMap = fManager.getAllStatisticsValue();
|
||||
Long numInsert = statMonitor.getNumInsert();
|
||||
Long numPointsInsert = statMonitor.getNumPointsInsert();
|
||||
long numInsertError = statMonitor.getNumInsertError();
|
||||
|
||||
String path = fManager.getAllPathForStatistic().get(0);
|
||||
int pos = path.lastIndexOf('.');
|
||||
|
@ -94,16 +89,16 @@ public class MonitorTest {
|
|||
for (DataPoint dataPoint : fTSRecord.dataPointList) {
|
||||
String m = dataPoint.getMeasurementId();
|
||||
Long v = (Long) dataPoint.getValue();
|
||||
System.out.println( m + " measurement, value:" + v);
|
||||
if (m == "TOTAL_REQ_SUCCESS") {
|
||||
assertEquals(v, numInsert);
|
||||
|
||||
if (m.equals("TOTAL_REQ_SUCCESS")) {
|
||||
assertEquals(v, new Long(0));
|
||||
}
|
||||
if (m.contains("FAIL")) {
|
||||
assertEquals(v, new Long(0));
|
||||
} else if (m.contains("POINTS")) {
|
||||
assertEquals(v, numPointsInsert);
|
||||
assertEquals(v, new Long(0));
|
||||
} else {
|
||||
assertEquals(v, numInsert);
|
||||
assertEquals(v, new Long(0));
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue