add configurations of small flush and MemMonitor (#264)

This commit is contained in:
jt2594838 2018-01-05 10:32:20 +08:00 committed by XuYi
parent c92bb48d02
commit db9683522c
8 changed files with 70 additions and 21 deletions

View File

@ -94,3 +94,11 @@ concurrent_flush_thread=0
enable_stat_monitor = true
back_loop_period = 5
# When set to false, MemMonitorThread and MemStatisticThread will not be created.
enable_mem_monitor=true;
# When set to true, small flush will be triggered periodically even if memory threshold is not exceeded.
enable_small_flush=false;
# The interval of small flush in ms.
small_flush_interval=60000;

View File

@ -160,6 +160,21 @@ public class TsfileDBConfig {
public long overflowFileSizeThreshold = 2 * 1024 * 1024 * 1024L;
/**
* When set to false, MemMonitorThread and MemStatisticThread will not be created.
*/
public boolean enableMemMonitor = true;
/**
* When set to true, small flush will be triggered periodically even if memory threshold is not exceeded.
*/
public boolean enableSmallFlush = false;
/**
* The interval of small flush in ms.
*/
public long smallFlushInterval = 60 * 1000;
/*
* The statMonitor's BackLoop period, 5s is enough
*/

View File

@ -111,6 +111,10 @@ public class TsfileDBDescriptor {
if(conf.concurrentFlushThread <= 0)
conf.concurrentFlushThread = Runtime.getRuntime().availableProcessors();
conf.enableMemMonitor = Boolean.parseBoolean(properties.getProperty("enable_mem_monitor", conf.enableMemMonitor + "").trim());
conf.enableSmallFlush = Boolean.parseBoolean(properties.getProperty("enable_small_flush", conf.enableSmallFlush + "").trim());
conf.smallFlushInterval = Long.parseLong(properties.getProperty("small_flush_interval", conf.smallFlushInterval + "").trim());
String tmpTimeZone = properties.getProperty("time_zone", conf.timeZone.getID());
try {
conf.timeZone = DateTimeZone.forID(tmpTimeZone.trim());

View File

@ -22,10 +22,12 @@ public abstract class BasicMemController {
BasicMemController(TsfileDBConfig config) {
warningThreshold = config.memThresholdWarning;
dangerouseThreshold = config.memThresholdDangerous;
monitorThread = new MemMonitorThread(config.memMonitorInterval);
monitorThread.start();
memStatisticThread = new MemStatisticThread();
memStatisticThread.start();
if(config.enableMemMonitor) {
monitorThread = new MemMonitorThread(config);
monitorThread.start();
memStatisticThread = new MemStatisticThread();
memStatisticThread.start();
}
}
// change instance here
@ -48,7 +50,8 @@ public abstract class BasicMemController {
}
public void setCheckInterval(long checkInterval) {
this.monitorThread.setCheckInterval(checkInterval);
if(this.monitorThread != null)
this.monitorThread.setCheckInterval(checkInterval);
}
public abstract long getTotalUsage();

View File

@ -1,5 +1,6 @@
package cn.edu.tsinghua.iotdb.engine.memcontrol;
import cn.edu.tsinghua.iotdb.conf.TsfileDBDescriptor;
import cn.edu.tsinghua.iotdb.engine.filenode.FileNodeManager;
import cn.edu.tsinghua.iotdb.utils.MemUtils;
import org.slf4j.Logger;
@ -11,6 +12,7 @@ import org.slf4j.LoggerFactory;
public class FlushPartialPolicy implements Policy{
private Logger logger = LoggerFactory.getLogger(FlushPartialPolicy.class);
private Thread workerThread;
private long sleepInterval = TsfileDBDescriptor.getInstance().getConfig().smallFlushInterval;
@Override
public void execute() {
@ -20,19 +22,25 @@ public class FlushPartialPolicy implements Policy{
MemUtils.bytesCntToStr(Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory()));
// use a thread to avoid blocking
if (workerThread == null) {
workerThread = new Thread(() -> {
FileNodeManager.getInstance().forceFlush(BasicMemController.UsageLevel.SAFE);
});
workerThread = createWorkerThread();
workerThread.start();
} else {
if (workerThread.isAlive()) {
logger.info("Last flush is ongoing...");
} else {
workerThread = new Thread(() -> {
FileNodeManager.getInstance().forceFlush(BasicMemController.UsageLevel.SAFE);
});
workerThread = createWorkerThread();
workerThread.start();
}
}
}
private Thread createWorkerThread() {
return new Thread(() -> {
FileNodeManager.getInstance().forceFlush(BasicMemController.UsageLevel.SAFE);
try {
Thread.sleep(sleepInterval);
} catch (InterruptedException ignored) {
}
},"IoTDB-FlushPartialPolicy-Thread");
}
}

View File

@ -17,21 +17,22 @@ public class ForceFLushAllPolicy implements Policy {
MemUtils.bytesCntToStr(Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory()));
// use a thread to avoid blocking
if (workerThread == null) {
workerThread = new Thread(() -> {
FileNodeManager.getInstance().forceFlush(BasicMemController.UsageLevel.DANGEROUS);
System.gc();
});
workerThread = createWorkerThread();
workerThread.start();
} else {
if (workerThread.isAlive()) {
logger.info("Last flush is ongoing...");
} else {
workerThread = new Thread(() -> {
FileNodeManager.getInstance().forceFlush(BasicMemController.UsageLevel.DANGEROUS);
System.gc();
});
workerThread = createWorkerThread();
workerThread.start();
}
}
}
private Thread createWorkerThread() {
return new Thread(() -> {
FileNodeManager.getInstance().forceFlush(BasicMemController.UsageLevel.DANGEROUS);
System.gc();
},"IoTDB-ForceFlushAllPolicy-thread");
}
}

View File

@ -1,5 +1,6 @@
package cn.edu.tsinghua.iotdb.engine.memcontrol;
import cn.edu.tsinghua.iotdb.conf.TsfileDBConfig;
import cn.edu.tsinghua.iotdb.engine.filenode.FileNodeManager;
import cn.edu.tsinghua.iotdb.utils.MemUtils;
import org.slf4j.Logger;
@ -15,9 +16,14 @@ public class MemMonitorThread extends Thread {
private Policy warningPolicy;
private Policy dangerousPolicy;
public MemMonitorThread(long checkInterval) {
public MemMonitorThread(TsfileDBConfig config) {
this.setName("IoTDB-MemMonitor-thread");
long checkInterval = config.memMonitorInterval;
this.checkInterval = checkInterval > 0 ? checkInterval : this.checkInterval;
this.safePolicy = new FlushPartialPolicy();
if(config.enableSmallFlush)
this.safePolicy = new FlushPartialPolicy();
else
this.safePolicy = new NoActPolicy();
this.warningPolicy = new ForceFLushAllPolicy();
this.dangerousPolicy = new ForceFLushAllPolicy();
}

View File

@ -21,6 +21,10 @@ public class MemStatisticThread extends Thread{
// log statistic every so many intervals
private int reportCycle = 60;
public MemStatisticThread() {
this.setName("IoTDB-MemStatistic-thread");
}
@Override
public void run() {
logger.info("MemStatisticThread started");