Spill SharedTLog when there's more than one.

When switching between spill_type or log_version, a new instance of a
SharedTLog is created in the transaction log processes.  If this is done
in a saturated database, then doubling the amount of memory to hold
mutations in memory can cause TLogs to be uncomfortably close to the 8GB
OOM limit.

Instead, we now thread which UID of a SharedTLog is active, and the
other TLog spill out the majority of their mutations.

This is a backport of #2213 (fef89aa1) to release-6.2
This commit is contained in:
Alex Miller 2019-07-29 23:40:28 -07:00
parent 41ddb80c88
commit 1eb3a70b96
4 changed files with 102 additions and 32 deletions

View File

@ -262,6 +262,7 @@ struct TLogData : NonCopyable {
int64_t instanceID;
int64_t bytesInput;
int64_t bytesDurable;
int64_t targetVolatileBytes; // The number of bytes of mutations this TLog should hold in memory before spilling.
int64_t overheadBytesInput;
int64_t overheadBytesDurable;
@ -288,7 +289,7 @@ struct TLogData : NonCopyable {
: dbgid(dbgid), instanceID(deterministicRandom()->randomUniqueID().first()),
persistentData(persistentData), rawPersistentQueue(persistentQueue), persistentQueue(new TLogQueue(persistentQueue, dbgid)),
dbInfo(dbInfo), degraded(degraded), queueCommitBegin(0), queueCommitEnd(0),
diskQueueCommitBytes(0), largeDiskQueueCommitBytes(false), bytesInput(0), bytesDurable(0), overheadBytesInput(0), overheadBytesDurable(0),
diskQueueCommitBytes(0), largeDiskQueueCommitBytes(false), bytesInput(0), bytesDurable(0), targetVolatileBytes(SERVER_KNOBS->TLOG_SPILL_THRESHOLD), overheadBytesInput(0), overheadBytesDurable(0),
concurrentLogRouterReads(SERVER_KNOBS->CONCURRENT_LOG_ROUTER_READS),
ignorePopRequest(false), ignorePopDeadline(), ignorePopUid(), dataFolder(folder), toBePopped()
{
@ -697,7 +698,7 @@ ACTOR Future<Void> updateStorage( TLogData* self ) {
state FlowLock::Releaser commitLockReleaser;
if(logData->stopped) {
if (self->bytesInput - self->bytesDurable >= SERVER_KNOBS->TLOG_SPILL_THRESHOLD) {
if (self->bytesInput - self->bytesDurable >= self->targetVolatileBytes) {
while(logData->persistentDataDurableVersion != logData->version.get()) {
totalSize = 0;
Map<Version, std::pair<int,int>>::iterator sizeItr = logData->version_sizes.begin();
@ -742,7 +743,7 @@ ACTOR Future<Void> updateStorage( TLogData* self ) {
} else {
Map<Version, std::pair<int,int>>::iterator sizeItr = logData->version_sizes.begin();
while( totalSize < SERVER_KNOBS->UPDATE_STORAGE_BYTE_LIMIT && sizeItr != logData->version_sizes.end()
&& (logData->bytesInput.getValue() - logData->bytesDurable.getValue() - totalSize >= SERVER_KNOBS->TLOG_SPILL_THRESHOLD || sizeItr->value.first == 0) )
&& (logData->bytesInput.getValue() - logData->bytesDurable.getValue() - totalSize >= self->targetVolatileBytes || sizeItr->value.first == 0) )
{
totalSize += sizeItr->value.first + sizeItr->value.second;
++sizeItr;
@ -2312,8 +2313,18 @@ ACTOR Future<Void> tLogStart( TLogData* self, InitializeTLogRequest req, Localit
return Void();
}
ACTOR Future<Void> startSpillingInTenSeconds(TLogData* self, UID tlogId, Reference<AsyncVar<UID>> activeSharedTLog) {
wait(delay(10));
if (activeSharedTLog->get() != tlogId) {
// TODO: This should fully spill, but currently doing so will cause us to no longer update poppedVersion
// and QuietDatabase will hang thinking our TLog is behind.
self->targetVolatileBytes = SERVER_KNOBS->REFERENCE_SPILL_UPDATE_STORAGE_BYTE_LIMIT * 2;
}
return Void();
}
// New tLog (if !recoverFrom.size()) or restore from network
ACTOR Future<Void> tLog( IKeyValueStore* persistentData, IDiskQueue* persistentQueue, Reference<AsyncVar<ServerDBInfo>> db, LocalityData locality, PromiseStream<InitializeTLogRequest> tlogRequests, UID tlogId, bool restoreFromDisk, Promise<Void> oldLog, Promise<Void> recovered, std::string folder, Reference<AsyncVar<bool>> degraded) {
ACTOR Future<Void> tLog( IKeyValueStore* persistentData, IDiskQueue* persistentQueue, Reference<AsyncVar<ServerDBInfo>> db, LocalityData locality, PromiseStream<InitializeTLogRequest> tlogRequests, UID tlogId, bool restoreFromDisk, Promise<Void> oldLog, Promise<Void> recovered, std::string folder, Reference<AsyncVar<bool>> degraded, Reference<AsyncVar<UID>> activeSharedTLog) {
state TLogData self( tlogId, persistentData, persistentQueue, db, degraded, folder );
state Future<Void> error = actorCollection( self.sharedActors.getFuture() );
@ -2346,6 +2357,13 @@ ACTOR Future<Void> tLog( IKeyValueStore* persistentData, IDiskQueue* persistentQ
}
}
when ( wait( error ) ) { throw internal_error(); }
when ( wait( activeSharedTLog->onChange() ) ) {
if (activeSharedTLog->get() == tlogId) {
self.targetVolatileBytes = SERVER_KNOBS->TLOG_SPILL_THRESHOLD;
} else {
self.sharedActors.send( startSpillingInTenSeconds(&self, tlogId, activeSharedTLog) );
}
}
}
}
} catch (Error& e) {

View File

@ -312,6 +312,7 @@ struct TLogData : NonCopyable {
int64_t instanceID;
int64_t bytesInput;
int64_t bytesDurable;
int64_t targetVolatileBytes; // The number of bytes of mutations this TLog should hold in memory before spilling.
int64_t overheadBytesInput;
int64_t overheadBytesDurable;
@ -339,7 +340,7 @@ struct TLogData : NonCopyable {
: dbgid(dbgid), instanceID(deterministicRandom()->randomUniqueID().first()),
persistentData(persistentData), rawPersistentQueue(persistentQueue), persistentQueue(new TLogQueue(persistentQueue, dbgid)),
dbInfo(dbInfo), degraded(degraded), queueCommitBegin(0), queueCommitEnd(0),
diskQueueCommitBytes(0), largeDiskQueueCommitBytes(false), bytesInput(0), bytesDurable(0), overheadBytesInput(0), overheadBytesDurable(0),
diskQueueCommitBytes(0), largeDiskQueueCommitBytes(false), bytesInput(0), bytesDurable(0), targetVolatileBytes(SERVER_KNOBS->TLOG_SPILL_THRESHOLD), overheadBytesInput(0), overheadBytesDurable(0),
peekMemoryLimiter(SERVER_KNOBS->TLOG_SPILL_REFERENCE_MAX_PEEK_MEMORY_BYTES),
concurrentLogRouterReads(SERVER_KNOBS->CONCURRENT_LOG_ROUTER_READS),
ignorePopRequest(false), ignorePopDeadline(), ignorePopUid(), dataFolder(folder), toBePopped()
@ -952,7 +953,7 @@ ACTOR Future<Void> updateStorage( TLogData* self ) {
state FlowLock::Releaser commitLockReleaser;
if(logData->stopped) {
if (self->bytesInput - self->bytesDurable >= SERVER_KNOBS->TLOG_SPILL_THRESHOLD) {
if (self->bytesInput - self->bytesDurable >= self->targetVolatileBytes) {
while(logData->persistentDataDurableVersion != logData->version.get()) {
totalSize = 0;
Map<Version, std::pair<int,int>>::iterator sizeItr = logData->version_sizes.begin();
@ -1000,10 +1001,12 @@ ACTOR Future<Void> updateStorage( TLogData* self ) {
if(logData->version_sizes.empty()) {
nextVersion = logData->version.get();
} else {
// Double check that a running TLog wasn't wrongly affected by spilling locked SharedTLogs.
ASSERT_WE_THINK(self->targetVolatileBytes == SERVER_KNOBS->TLOG_SPILL_THRESHOLD);
Map<Version, std::pair<int,int>>::iterator sizeItr = logData->version_sizes.begin();
while( totalSize < SERVER_KNOBS->REFERENCE_SPILL_UPDATE_STORAGE_BYTE_LIMIT &&
sizeItr != logData->version_sizes.end()
&& (logData->bytesInput.getValue() - logData->bytesDurable.getValue() - totalSize >= SERVER_KNOBS->TLOG_SPILL_THRESHOLD || sizeItr->value.first == 0) )
&& (logData->bytesInput.getValue() - logData->bytesDurable.getValue() - totalSize >= self->targetVolatileBytes || sizeItr->value.first == 0) )
{
totalSize += sizeItr->value.first + sizeItr->value.second;
++sizeItr;
@ -2593,20 +2596,10 @@ ACTOR Future<Void> updateLogSystem(TLogData* self, Reference<LogData> logData, L
}
}
ACTOR Future<Void> tLogStart( TLogData* self, InitializeTLogRequest req, LocalityData locality ) {
state TLogInterface recruited(self->dbgid, locality);
recruited.initEndpoints();
DUMPTOKEN( recruited.peekMessages );
DUMPTOKEN( recruited.popMessages );
DUMPTOKEN( recruited.commit );
DUMPTOKEN( recruited.lock );
DUMPTOKEN( recruited.getQueuingMetrics );
DUMPTOKEN( recruited.confirmRunning );
void stopAllTLogs( TLogData* self, UID newLogId ) {
for(auto it : self->id_data) {
if( !it.second->stopped ) {
TraceEvent("TLogStoppedByNewRecruitment", self->dbgid).detail("LogId", it.second->logId).detail("StoppedId", it.first.toString()).detail("RecruitedId", recruited.id()).detail("EndEpoch", it.second->logSystem->get().getPtr() != 0);
TraceEvent("TLogStoppedByNewRecruitment", self->dbgid).detail("LogId", it.second->logId).detail("StoppedId", it.first.toString()).detail("RecruitedId", newLogId).detail("EndEpoch", it.second->logSystem->get().getPtr() != 0);
if(!it.second->isPrimary && it.second->logSystem->get()) {
it.second->removed = it.second->removed && it.second->logSystem->get()->endEpoch();
}
@ -2620,6 +2613,21 @@ ACTOR Future<Void> tLogStart( TLogData* self, InitializeTLogRequest req, Localit
}
it.second->stopCommit.trigger();
}
}
// Start the tLog role for a worker
ACTOR Future<Void> tLogStart( TLogData* self, InitializeTLogRequest req, LocalityData locality ) {
state TLogInterface recruited(self->dbgid, locality);
recruited.initEndpoints();
DUMPTOKEN( recruited.peekMessages );
DUMPTOKEN( recruited.popMessages );
DUMPTOKEN( recruited.commit );
DUMPTOKEN( recruited.lock );
DUMPTOKEN( recruited.getQueuingMetrics );
DUMPTOKEN( recruited.confirmRunning );
stopAllTLogs(self, recruited.id());
state Reference<LogData> logData = Reference<LogData>( new LogData(self, recruited, req.remoteTag, req.isPrimary, req.logRouterTags, req.txsTags, req.recruitmentID, currentProtocolVersion, req.allTags) );
self->id_data[recruited.id()] = logData;
@ -2736,8 +2744,21 @@ ACTOR Future<Void> tLogStart( TLogData* self, InitializeTLogRequest req, Localit
return Void();
}
ACTOR Future<Void> startSpillingInTenSeconds(TLogData* self, UID tlogId, Reference<AsyncVar<UID>> activeSharedTLog) {
wait(delay(10));
if (activeSharedTLog->get() != tlogId) {
// TODO: This should fully spill, but currently doing so will cause us to no longer update poppedVersion
// and QuietDatabase will hang thinking our TLog is behind.
TraceEvent("SharedTLogBeginSpilling", self->dbgid).detail("NowActive", activeSharedTLog->get());
self->targetVolatileBytes = SERVER_KNOBS->REFERENCE_SPILL_UPDATE_STORAGE_BYTE_LIMIT * 2;
} else {
TraceEvent("SharedTLogSkipSpilling", self->dbgid).detail("NowActive", activeSharedTLog->get());
}
return Void();
}
// New tLog (if !recoverFrom.size()) or restore from network
ACTOR Future<Void> tLog( IKeyValueStore* persistentData, IDiskQueue* persistentQueue, Reference<AsyncVar<ServerDBInfo>> db, LocalityData locality, PromiseStream<InitializeTLogRequest> tlogRequests, UID tlogId, bool restoreFromDisk, Promise<Void> oldLog, Promise<Void> recovered, std::string folder, Reference<AsyncVar<bool>> degraded ) {
ACTOR Future<Void> tLog( IKeyValueStore* persistentData, IDiskQueue* persistentQueue, Reference<AsyncVar<ServerDBInfo>> db, LocalityData locality, PromiseStream<InitializeTLogRequest> tlogRequests, UID tlogId, bool restoreFromDisk, Promise<Void> oldLog, Promise<Void> recovered, std::string folder, Reference<AsyncVar<bool>> degraded, Reference<AsyncVar<UID>> activeSharedTLog ) {
state TLogData self( tlogId, persistentData, persistentQueue, db, degraded, folder );
state Future<Void> error = actorCollection( self.sharedActors.getFuture() );
@ -2770,6 +2791,16 @@ ACTOR Future<Void> tLog( IKeyValueStore* persistentData, IDiskQueue* persistentQ
}
}
when ( wait( error ) ) { throw internal_error(); }
when ( wait( activeSharedTLog->onChange() ) ) {
if (activeSharedTLog->get() == tlogId) {
TraceEvent("SharedTLogNowActive", self.dbgid).detail("NowActive", activeSharedTLog->get());
self.targetVolatileBytes = SERVER_KNOBS->TLOG_SPILL_THRESHOLD;
} else {
stopAllTLogs(&self, tlogId);
TraceEvent("SharedTLogQueueSpilling", self.dbgid).detail("NowActive", activeSharedTLog->get());
self.sharedActors.send( startSpillingInTenSeconds(&self, tlogId, activeSharedTLog) );
}
}
}
}
} catch (Error& e) {

View File

@ -445,7 +445,9 @@ ACTOR Future<Void> masterProxyServer(MasterProxyInterface proxy, InitializeMaste
ACTOR Future<Void> tLog(IKeyValueStore* persistentData, IDiskQueue* persistentQueue,
Reference<AsyncVar<ServerDBInfo>> db, LocalityData locality,
PromiseStream<InitializeTLogRequest> tlogRequests, UID tlogId, bool restoreFromDisk,
Promise<Void> oldLog, Promise<Void> recovered, std::string folder, Reference<AsyncVar<bool>> degraded); // changes tli->id() to be the recovered ID
Promise<Void> oldLog, Promise<Void> recovered, std::string folder,
Reference<AsyncVar<bool>> degraded, Reference<AsyncVar<UID>> activeSharedTLog);
ACTOR Future<Void> monitorServerDBInfo(Reference<AsyncVar<Optional<ClusterControllerFullInterface>>> ccInterface,
Reference<ClusterConnectionFile> ccf, LocalityData locality,
Reference<AsyncVar<ServerDBInfo>> dbInfo);
@ -467,7 +469,8 @@ namespace oldTLog_6_0 {
ACTOR Future<Void> tLog(IKeyValueStore* persistentData, IDiskQueue* persistentQueue,
Reference<AsyncVar<ServerDBInfo>> db, LocalityData locality,
PromiseStream<InitializeTLogRequest> tlogRequests, UID tlogId, bool restoreFromDisk,
Promise<Void> oldLog, Promise<Void> recovered, std::string folder, Reference<AsyncVar<bool>> degraded);
Promise<Void> oldLog, Promise<Void> recovered, std::string folder,
Reference<AsyncVar<bool>> degraded, Reference<AsyncVar<UID>> activeSharedTLog);
}
typedef decltype(&tLog) TLogFn;

View File

@ -754,6 +754,17 @@ ACTOR Future<Void> monitorServerDBInfo( Reference<AsyncVar<Optional<ClusterContr
}
}
struct SharedLogsValue {
Future<Void> actor = Void();
UID uid = UID();
PromiseStream<InitializeTLogRequest> requests;
SharedLogsValue() = default;
SharedLogsValue( Future<Void> actor, UID uid, PromiseStream<InitializeTLogRequest> requests )
: actor(actor), uid(uid), requests(requests) {
}
};
ACTOR Future<Void> workerServer(
Reference<ClusterConnectionFile> connFile,
Reference<AsyncVar<Optional<ClusterControllerFullInterface>>> ccInterface,
@ -782,7 +793,9 @@ ACTOR Future<Void> workerServer(
// decide if we should collapse them into the same SharedTLog instance as well. The answer
// here is no, so that when running with log_version==3, all files should say V=3.
state std::map<std::tuple<TLogVersion, KeyValueStoreType::StoreType, TLogSpillType>,
std::pair<Future<Void>, PromiseStream<InitializeTLogRequest>>> sharedLogs;
SharedLogsValue> sharedLogs;
state Reference<AsyncVar<UID>> activeSharedTLog(new AsyncVar<UID>());
state std::string coordFolder = abspath(_coordFolder);
state WorkerInterface interf( locality );
@ -899,13 +912,15 @@ ACTOR Future<Void> workerServer(
auto& logData = sharedLogs[std::make_tuple(s.tLogOptions.version, s.storeType, s.tLogOptions.spillType)];
// FIXME: Shouldn't if logData.first isValid && !isReady, shouldn't we
// be sending a fake InitializeTLogRequest rather than calling tLog() ?
Future<Void> tl = tLogFn( kv, queue, dbInfo, locality, !logData.first.isValid() || logData.first.isReady() ? logData.second : PromiseStream<InitializeTLogRequest>(), s.storeID, true, oldLog, recovery, folder, degraded );
Future<Void> tl = tLogFn( kv, queue, dbInfo, locality, !logData.actor.isValid() || logData.actor.isReady() ? logData.requests : PromiseStream<InitializeTLogRequest>(), s.storeID, true, oldLog, recovery, folder, degraded, activeSharedTLog );
recoveries.push_back(recovery.getFuture());
activeSharedTLog->set(s.storeID);
tl = handleIOErrors( tl, kv, s.storeID );
tl = handleIOErrors( tl, queue, s.storeID );
if(!logData.first.isValid() || logData.first.isReady()) {
logData.first = oldLog.getFuture() || tl;
if(!logData.actor.isValid() || logData.actor.isReady()) {
logData.actor = oldLog.getFuture() || tl;
logData.uid = s.storeID;
}
errorForwarders.add( forwardError( errors, Role::SHARED_TRANSACTION_LOG, s.storeID, tl ) );
}
@ -1045,8 +1060,8 @@ ACTOR Future<Void> workerServer(
TLogOptions tLogOptions(req.logVersion, req.spillType);
TLogFn tLogFn = tLogFnForOptions(tLogOptions);
auto& logData = sharedLogs[std::make_tuple(req.logVersion, req.storeType, req.spillType)];
logData.second.send(req);
if(!logData.first.isValid() || logData.first.isReady()) {
logData.requests.send(req);
if(!logData.actor.isValid() || logData.actor.isReady()) {
UID logId = deterministicRandom()->randomUniqueID();
std::map<std::string, std::string> details;
details["ForMaster"] = req.recruitmentID.shortString();
@ -1063,11 +1078,14 @@ ACTOR Future<Void> workerServer(
filesClosed.add( data->onClosed() );
filesClosed.add( queue->onClosed() );
logData.first = tLogFn( data, queue, dbInfo, locality, logData.second, logId, false, Promise<Void>(), Promise<Void>(), folder, degraded );
logData.first = handleIOErrors( logData.first, data, logId );
logData.first = handleIOErrors( logData.first, queue, logId );
errorForwarders.add( forwardError( errors, Role::SHARED_TRANSACTION_LOG, logId, logData.first ) );
Future<Void> tLogCore = tLogFn( data, queue, dbInfo, locality, logData.requests, logId, false, Promise<Void>(), Promise<Void>(), folder, degraded, activeSharedTLog );
tLogCore = handleIOErrors( tLogCore, data, logId );
tLogCore = handleIOErrors( tLogCore, queue, logId );
errorForwarders.add( forwardError( errors, Role::SHARED_TRANSACTION_LOG, logId, tLogCore ) );
logData.actor = tLogCore;
logData.uid = logId;
}
activeSharedTLog->set(logData.uid);
}
when( InitializeStorageRequest req = waitNext(interf.storage.getFuture()) ) {
if( !storageCache.exists( req.reqId ) ) {