Merge pull request #2256 from alexmiller-apple/spill-log-on-switch-6.2

Spill SharedTLog when there's more than one
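Before this change, every SharedTLog instance on a worker spilled only past TLOG_SPILL_THRESHOLD bytes of in-memory mutations, so a worker carrying several SharedTLog files across recoveries could pin several full in-memory budgets at once. This change gives each SharedTLog instance its own spill target (targetVolatileBytes) and threads a Reference<AsyncVar<UID>> named activeSharedTLog from workerServer into every tLog() actor. The instance that was recruited or restored most recently is "active" and keeps the full TLOG_SPILL_THRESHOLD budget; any other instance waits ten seconds (in case it becomes active again) and then lowers its target to REFERENCE_SPILL_UPDATE_STORAGE_BYTE_LIMIT * 2, spilling nearly all of its queue to disk.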
Evan Tschannen committed 2019-10-23 10:51:28 -07:00 (via GitHub)
commit e01e8371a6
4 changed files with 102 additions and 32 deletions

Changed file 1 of 4:

@@ -262,6 +262,7 @@ struct TLogData : NonCopyable {
 	int64_t instanceID;
 	int64_t bytesInput;
 	int64_t bytesDurable;
+	int64_t targetVolatileBytes; // The number of bytes of mutations this TLog should hold in memory before spilling.
 	int64_t overheadBytesInput;
 	int64_t overheadBytesDurable;
@@ -288,7 +289,7 @@ struct TLogData : NonCopyable {
 		: dbgid(dbgid), instanceID(deterministicRandom()->randomUniqueID().first()),
 		  persistentData(persistentData), rawPersistentQueue(persistentQueue), persistentQueue(new TLogQueue(persistentQueue, dbgid)),
 		  dbInfo(dbInfo), degraded(degraded), queueCommitBegin(0), queueCommitEnd(0),
-		  diskQueueCommitBytes(0), largeDiskQueueCommitBytes(false), bytesInput(0), bytesDurable(0), overheadBytesInput(0), overheadBytesDurable(0),
+		  diskQueueCommitBytes(0), largeDiskQueueCommitBytes(false), bytesInput(0), bytesDurable(0), targetVolatileBytes(SERVER_KNOBS->TLOG_SPILL_THRESHOLD), overheadBytesInput(0), overheadBytesDurable(0),
 		  concurrentLogRouterReads(SERVER_KNOBS->CONCURRENT_LOG_ROUTER_READS),
 		  ignorePopRequest(false), ignorePopDeadline(), ignorePopUid(), dataFolder(folder), toBePopped()
 	{
@@ -697,7 +698,7 @@ ACTOR Future<Void> updateStorage( TLogData* self ) {
 	state FlowLock::Releaser commitLockReleaser;
 	if(logData->stopped) {
-		if (self->bytesInput - self->bytesDurable >= SERVER_KNOBS->TLOG_SPILL_THRESHOLD) {
+		if (self->bytesInput - self->bytesDurable >= self->targetVolatileBytes) {
 			while(logData->persistentDataDurableVersion != logData->version.get()) {
 				totalSize = 0;
 				Map<Version, std::pair<int,int>>::iterator sizeItr = logData->version_sizes.begin();
@@ -742,7 +743,7 @@ ACTOR Future<Void> updateStorage( TLogData* self ) {
 	} else {
 		Map<Version, std::pair<int,int>>::iterator sizeItr = logData->version_sizes.begin();
 		while( totalSize < SERVER_KNOBS->UPDATE_STORAGE_BYTE_LIMIT && sizeItr != logData->version_sizes.end()
-				&& (logData->bytesInput.getValue() - logData->bytesDurable.getValue() - totalSize >= SERVER_KNOBS->TLOG_SPILL_THRESHOLD || sizeItr->value.first == 0) )
+				&& (logData->bytesInput.getValue() - logData->bytesDurable.getValue() - totalSize >= self->targetVolatileBytes || sizeItr->value.first == 0) )
 		{
 			totalSize += sizeItr->value.first + sizeItr->value.second;
 			++sizeItr;
@@ -2315,8 +2316,18 @@ ACTOR Future<Void> tLogStart( TLogData* self, InitializeTLogRequest req, LocalityData locality ) {
 	return Void();
 }

+ACTOR Future<Void> startSpillingInTenSeconds(TLogData* self, UID tlogId, Reference<AsyncVar<UID>> activeSharedTLog) {
+	wait(delay(10));
+	if (activeSharedTLog->get() != tlogId) {
+		// TODO: This should fully spill, but currently doing so will cause us to no longer update poppedVersion
+		// and QuietDatabase will hang thinking our TLog is behind.
+		self->targetVolatileBytes = SERVER_KNOBS->REFERENCE_SPILL_UPDATE_STORAGE_BYTE_LIMIT * 2;
+	}
+	return Void();
+}
+
 // New tLog (if !recoverFrom.size()) or restore from network
-ACTOR Future<Void> tLog( IKeyValueStore* persistentData, IDiskQueue* persistentQueue, Reference<AsyncVar<ServerDBInfo>> db, LocalityData locality, PromiseStream<InitializeTLogRequest> tlogRequests, UID tlogId, bool restoreFromDisk, Promise<Void> oldLog, Promise<Void> recovered, std::string folder, Reference<AsyncVar<bool>> degraded) {
+ACTOR Future<Void> tLog( IKeyValueStore* persistentData, IDiskQueue* persistentQueue, Reference<AsyncVar<ServerDBInfo>> db, LocalityData locality, PromiseStream<InitializeTLogRequest> tlogRequests, UID tlogId, bool restoreFromDisk, Promise<Void> oldLog, Promise<Void> recovered, std::string folder, Reference<AsyncVar<bool>> degraded, Reference<AsyncVar<UID>> activeSharedTLog) {
 	state TLogData self( tlogId, persistentData, persistentQueue, db, degraded, folder );
 	state Future<Void> error = actorCollection( self.sharedActors.getFuture() );
@@ -2339,6 +2350,12 @@ ACTOR Future<Void> tLog( IKeyValueStore* persistentData, IDiskQueue* persistentQueue,
 	self.sharedActors.send( updateStorageLoop(&self) );
 	loop {
+		if (activeSharedTLog->get() == tlogId) {
+			self.targetVolatileBytes = SERVER_KNOBS->TLOG_SPILL_THRESHOLD;
+		} else {
+			self.sharedActors.send( startSpillingInTenSeconds(&self, tlogId, activeSharedTLog) );
+		}
 		choose {
 			when ( InitializeTLogRequest req = waitNext(tlogRequests.getFuture() ) ) {
 				if( !self.tlogCache.exists( req.recruitmentID ) ) {

@@ -2349,6 +2366,7 @@ ACTOR Future<Void> tLog( IKeyValueStore* persistentData, IDiskQueue* persistentQueue,
 				}
 			}
 			when ( wait( error ) ) { throw internal_error(); }
+			when ( wait( activeSharedTLog->onChange() ) ) {}
 		}
 	}
 } catch (Error& e) {
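As a sketch of the policy this adds, here is a minimal single-threaded C++ model of the two decision points above: the top of the tLog() loop (restore the full budget as soon as an instance is active) and startSpillingInTenSeconds() (downgrade only if still inactive after the delay). Flow actors, UID, and SERVER_KNOBS are not reproduced, and the knob values are illustrative stand-ins rather than the real defaults:

    // Minimal single-threaded model of the spill policy; plain C++, not Flow.
    // TLOG_SPILL_THRESHOLD and REFERENCE_SPILL_UPDATE_STORAGE_BYTE_LIMIT are
    // illustrative stand-ins for the SERVER_KNOBS values, not real defaults.
    #include <cstdint>
    #include <iostream>

    constexpr int64_t TLOG_SPILL_THRESHOLD = 1'500'000'000;
    constexpr int64_t REFERENCE_SPILL_UPDATE_STORAGE_BYTE_LIMIT = 20'000'000;

    struct SharedTLog {
        uint64_t id;
        int64_t targetVolatileBytes = TLOG_SPILL_THRESHOLD;
    };

    // Top of the tLog() loop: an instance that is (or becomes) active
    // immediately gets its full in-memory budget back.
    void onActiveSharedTLogChange(SharedTLog& self, uint64_t activeId) {
        if (activeId == self.id) {
            self.targetVolatileBytes = TLOG_SPILL_THRESHOLD;
        }
        // else: schedule startSpillingInTenSecondsBody() to run after 10s.
    }

    // Body of startSpillingInTenSeconds(): after the delay, downgrade the
    // target only if this instance is *still* not the active one.
    void startSpillingInTenSecondsBody(SharedTLog& self, uint64_t activeIdNow) {
        if (activeIdNow != self.id) {
            self.targetVolatileBytes = REFERENCE_SPILL_UPDATE_STORAGE_BYTE_LIMIT * 2;
        }
    }

    int main() {
        SharedTLog oldGen{1}, newGen{2};
        uint64_t active = 2;  // workerServer just recruited a second SharedTLog
        onActiveSharedTLogChange(oldGen, active);
        startSpillingInTenSecondsBody(oldGen, active);  // ten seconds later
        std::cout << "old generation target: " << oldGen.targetVolatileBytes << "\n"
                  << "new generation target: " << newGen.targetVolatileBytes << "\n";
    }

The re-check after the delay means an instance that regains active status within the ten-second window keeps its full budget.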

Changed file 2 of 4:

@@ -312,6 +312,7 @@ struct TLogData : NonCopyable {
 	int64_t instanceID;
 	int64_t bytesInput;
 	int64_t bytesDurable;
+	int64_t targetVolatileBytes; // The number of bytes of mutations this TLog should hold in memory before spilling.
 	int64_t overheadBytesInput;
 	int64_t overheadBytesDurable;
@@ -339,7 +340,7 @@ struct TLogData : NonCopyable {
 		: dbgid(dbgid), instanceID(deterministicRandom()->randomUniqueID().first()),
 		  persistentData(persistentData), rawPersistentQueue(persistentQueue), persistentQueue(new TLogQueue(persistentQueue, dbgid)),
 		  dbInfo(dbInfo), degraded(degraded), queueCommitBegin(0), queueCommitEnd(0),
-		  diskQueueCommitBytes(0), largeDiskQueueCommitBytes(false), bytesInput(0), bytesDurable(0), overheadBytesInput(0), overheadBytesDurable(0),
+		  diskQueueCommitBytes(0), largeDiskQueueCommitBytes(false), bytesInput(0), bytesDurable(0), targetVolatileBytes(SERVER_KNOBS->TLOG_SPILL_THRESHOLD), overheadBytesInput(0), overheadBytesDurable(0),
 		  peekMemoryLimiter(SERVER_KNOBS->TLOG_SPILL_REFERENCE_MAX_PEEK_MEMORY_BYTES),
 		  concurrentLogRouterReads(SERVER_KNOBS->CONCURRENT_LOG_ROUTER_READS),
 		  ignorePopRequest(false), ignorePopDeadline(), ignorePopUid(), dataFolder(folder), toBePopped()
@@ -952,7 +953,7 @@ ACTOR Future<Void> updateStorage( TLogData* self ) {
 	state FlowLock::Releaser commitLockReleaser;
 	if(logData->stopped) {
-		if (self->bytesInput - self->bytesDurable >= SERVER_KNOBS->TLOG_SPILL_THRESHOLD) {
+		if (self->bytesInput - self->bytesDurable >= self->targetVolatileBytes) {
 			while(logData->persistentDataDurableVersion != logData->version.get()) {
 				totalSize = 0;
 				Map<Version, std::pair<int,int>>::iterator sizeItr = logData->version_sizes.begin();
@@ -1000,10 +1001,12 @@ ACTOR Future<Void> updateStorage( TLogData* self ) {
 	if(logData->version_sizes.empty()) {
 		nextVersion = logData->version.get();
 	} else {
+		// Double check that a running TLog wasn't wrongly affected by spilling locked SharedTLogs.
+		ASSERT_WE_THINK(self->targetVolatileBytes == SERVER_KNOBS->TLOG_SPILL_THRESHOLD);
 		Map<Version, std::pair<int,int>>::iterator sizeItr = logData->version_sizes.begin();
 		while( totalSize < SERVER_KNOBS->REFERENCE_SPILL_UPDATE_STORAGE_BYTE_LIMIT &&
 				sizeItr != logData->version_sizes.end()
-				&& (logData->bytesInput.getValue() - logData->bytesDurable.getValue() - totalSize >= SERVER_KNOBS->TLOG_SPILL_THRESHOLD || sizeItr->value.first == 0) )
+				&& (logData->bytesInput.getValue() - logData->bytesDurable.getValue() - totalSize >= self->targetVolatileBytes || sizeItr->value.first == 0) )
 		{
 			totalSize += sizeItr->value.first + sizeItr->value.second;
 			++sizeItr;
@@ -2596,20 +2599,10 @@ ACTOR Future<Void> updateLogSystem(TLogData* self, Reference<LogData> logData, L
 	}
 }

-ACTOR Future<Void> tLogStart( TLogData* self, InitializeTLogRequest req, LocalityData locality ) {
-	state TLogInterface recruited(self->dbgid, locality);
-	recruited.initEndpoints();
-
-	DUMPTOKEN( recruited.peekMessages );
-	DUMPTOKEN( recruited.popMessages );
-	DUMPTOKEN( recruited.commit );
-	DUMPTOKEN( recruited.lock );
-	DUMPTOKEN( recruited.getQueuingMetrics );
-	DUMPTOKEN( recruited.confirmRunning );
-
+void stopAllTLogs( TLogData* self, UID newLogId ) {
 	for(auto it : self->id_data) {
 		if( !it.second->stopped ) {
-			TraceEvent("TLogStoppedByNewRecruitment", self->dbgid).detail("LogId", it.second->logId).detail("StoppedId", it.first.toString()).detail("RecruitedId", recruited.id()).detail("EndEpoch", it.second->logSystem->get().getPtr() != 0);
+			TraceEvent("TLogStoppedByNewRecruitment", self->dbgid).detail("LogId", it.second->logId).detail("StoppedId", it.first.toString()).detail("RecruitedId", newLogId).detail("EndEpoch", it.second->logSystem->get().getPtr() != 0);
 			if(!it.second->isPrimary && it.second->logSystem->get()) {
 				it.second->removed = it.second->removed && it.second->logSystem->get()->endEpoch();
 			}

@@ -2623,6 +2616,21 @@ ACTOR Future<Void> tLogStart( TLogData* self, InitializeTLogRequest req, LocalityData locality ) {
 		}
 		it.second->stopCommit.trigger();
 	}
+}
+
+// Start the tLog role for a worker
+ACTOR Future<Void> tLogStart( TLogData* self, InitializeTLogRequest req, LocalityData locality ) {
+	state TLogInterface recruited(self->dbgid, locality);
+	recruited.initEndpoints();
+
+	DUMPTOKEN( recruited.peekMessages );
+	DUMPTOKEN( recruited.popMessages );
+	DUMPTOKEN( recruited.commit );
+	DUMPTOKEN( recruited.lock );
+	DUMPTOKEN( recruited.getQueuingMetrics );
+	DUMPTOKEN( recruited.confirmRunning );
+
+	stopAllTLogs(self, recruited.id());

 	state Reference<LogData> logData = Reference<LogData>( new LogData(self, recruited, req.remoteTag, req.isPrimary, req.logRouterTags, req.txsTags, req.recruitmentID, currentProtocolVersion, req.allTags) );
 	self->id_data[recruited.id()] = logData;
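Note: the generation-stopping logic is lifted out of tLogStart() into stopAllTLogs() essentially unchanged (it now takes the new log's UID as a parameter instead of reading recruited.id()), so that the shared-TLog main loop below can also stop its generations when the instance loses active status, before queueing the delayed spill.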
@@ -2739,8 +2747,21 @@ ACTOR Future<Void> tLogStart( TLogData* self, InitializeTLogRequest req, LocalityData locality ) {
 	return Void();
 }

+ACTOR Future<Void> startSpillingInTenSeconds(TLogData* self, UID tlogId, Reference<AsyncVar<UID>> activeSharedTLog) {
+	wait(delay(10));
+	if (activeSharedTLog->get() != tlogId) {
+		// TODO: This should fully spill, but currently doing so will cause us to no longer update poppedVersion
+		// and QuietDatabase will hang thinking our TLog is behind.
+		TraceEvent("SharedTLogBeginSpilling", self->dbgid).detail("NowActive", activeSharedTLog->get());
+		self->targetVolatileBytes = SERVER_KNOBS->REFERENCE_SPILL_UPDATE_STORAGE_BYTE_LIMIT * 2;
+	} else {
+		TraceEvent("SharedTLogSkipSpilling", self->dbgid).detail("NowActive", activeSharedTLog->get());
+	}
+	return Void();
+}
+
 // New tLog (if !recoverFrom.size()) or restore from network
-ACTOR Future<Void> tLog( IKeyValueStore* persistentData, IDiskQueue* persistentQueue, Reference<AsyncVar<ServerDBInfo>> db, LocalityData locality, PromiseStream<InitializeTLogRequest> tlogRequests, UID tlogId, bool restoreFromDisk, Promise<Void> oldLog, Promise<Void> recovered, std::string folder, Reference<AsyncVar<bool>> degraded ) {
+ACTOR Future<Void> tLog( IKeyValueStore* persistentData, IDiskQueue* persistentQueue, Reference<AsyncVar<ServerDBInfo>> db, LocalityData locality, PromiseStream<InitializeTLogRequest> tlogRequests, UID tlogId, bool restoreFromDisk, Promise<Void> oldLog, Promise<Void> recovered, std::string folder, Reference<AsyncVar<bool>> degraded, Reference<AsyncVar<UID>> activeSharedTLog ) {
 	state TLogData self( tlogId, persistentData, persistentQueue, db, degraded, folder );
 	state Future<Void> error = actorCollection( self.sharedActors.getFuture() );
@@ -2763,6 +2784,15 @@ ACTOR Future<Void> tLog( IKeyValueStore* persistentData, IDiskQueue* persistentQueue,
 	self.sharedActors.send( updateStorageLoop(&self) );
 	loop {
+		if (activeSharedTLog->get() == tlogId) {
+			TraceEvent("SharedTLogNowActive", self.dbgid).detail("NowActive", activeSharedTLog->get());
+			self.targetVolatileBytes = SERVER_KNOBS->TLOG_SPILL_THRESHOLD;
+		} else {
+			stopAllTLogs(&self, tlogId);
+			TraceEvent("SharedTLogQueueSpilling", self.dbgid).detail("NowActive", activeSharedTLog->get());
+			self.sharedActors.send( startSpillingInTenSeconds(&self, tlogId, activeSharedTLog) );
+		}
 		choose {
 			when ( InitializeTLogRequest req = waitNext(tlogRequests.getFuture() ) ) {
 				if( !self.tlogCache.exists( req.recruitmentID ) ) {

@@ -2773,6 +2803,7 @@ ACTOR Future<Void> tLog( IKeyValueStore* persistentData, IDiskQueue* persistentQueue,
 				}
 			}
 			when ( wait( error ) ) { throw internal_error(); }
+			when ( wait( activeSharedTLog->onChange() ) ) {}
 		}
 	}
 } catch (Error& e) {
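Compared with the older TLog implementation in the first file, this one additionally calls stopAllTLogs() before queueing the spill and emits SharedTLogNowActive, SharedTLogQueueSpilling, SharedTLogBeginSpilling, and SharedTLogSkipSpilling trace events, so the handoff is observable in trace logs. The new ASSERT_WE_THINK in updateStorage() double-checks that a TLog doing reference-spill work still has the full TLOG_SPILL_THRESHOLD budget, i.e. that a running TLog was never wrongly downgraded by the locked-SharedTLog spilling path.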

Changed file 3 of 4:

@@ -445,7 +445,9 @@ ACTOR Future<Void> masterProxyServer(MasterProxyInterface proxy, InitializeMaste
 ACTOR Future<Void> tLog(IKeyValueStore* persistentData, IDiskQueue* persistentQueue,
                         Reference<AsyncVar<ServerDBInfo>> db, LocalityData locality,
                         PromiseStream<InitializeTLogRequest> tlogRequests, UID tlogId, bool restoreFromDisk,
-                        Promise<Void> oldLog, Promise<Void> recovered, std::string folder, Reference<AsyncVar<bool>> degraded); // changes tli->id() to be the recovered ID
+                        Promise<Void> oldLog, Promise<Void> recovered, std::string folder,
+                        Reference<AsyncVar<bool>> degraded, Reference<AsyncVar<UID>> activeSharedTLog);
 ACTOR Future<Void> monitorServerDBInfo(Reference<AsyncVar<Optional<ClusterControllerFullInterface>>> ccInterface,
                                        Reference<ClusterConnectionFile> ccf, LocalityData locality,
                                        Reference<AsyncVar<ServerDBInfo>> dbInfo);
@@ -467,7 +469,8 @@ namespace oldTLog_6_0 {
 ACTOR Future<Void> tLog(IKeyValueStore* persistentData, IDiskQueue* persistentQueue,
                         Reference<AsyncVar<ServerDBInfo>> db, LocalityData locality,
                         PromiseStream<InitializeTLogRequest> tlogRequests, UID tlogId, bool restoreFromDisk,
-                        Promise<Void> oldLog, Promise<Void> recovered, std::string folder, Reference<AsyncVar<bool>> degraded);
+                        Promise<Void> oldLog, Promise<Void> recovered, std::string folder,
+                        Reference<AsyncVar<bool>> degraded, Reference<AsyncVar<UID>> activeSharedTLog);
 }

 typedef decltype(&tLog) TLogFn;

Changed file 4 of 4:

@@ -754,6 +754,17 @@ ACTOR Future<Void> monitorServerDBInfo( Reference<AsyncVar<Optional<ClusterControllerFullInterface>>> ccInterface,
 	}
 }

+struct SharedLogsValue {
+	Future<Void> actor = Void();
+	UID uid = UID();
+	PromiseStream<InitializeTLogRequest> requests;
+
+	SharedLogsValue() = default;
+	SharedLogsValue( Future<Void> actor, UID uid, PromiseStream<InitializeTLogRequest> requests )
+		: actor(actor), uid(uid), requests(requests) {
+	}
+};
+
 ACTOR Future<Void> workerServer(
 	Reference<ClusterConnectionFile> connFile,
 	Reference<AsyncVar<Optional<ClusterControllerFullInterface>>> ccInterface,
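SharedLogsValue replaces the std::pair<Future<Void>, PromiseStream<InitializeTLogRequest>> previously stored in the sharedLogs map. The new uid field records which SharedTLog instance currently owns an entry, which lets workerServer point activeSharedTLog back at an existing instance when a recruitment reuses it: the activeSharedTLog->set(logData.uid) call further down runs whether or not a new instance was started.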
@@ -782,7 +793,9 @@ ACTOR Future<Void> workerServer(
 	// decide if we should collapse them into the same SharedTLog instance as well. The answer
 	// here is no, so that when running with log_version==3, all files should say V=3.
 	state std::map<std::tuple<TLogVersion, KeyValueStoreType::StoreType, TLogSpillType>,
-	               std::pair<Future<Void>, PromiseStream<InitializeTLogRequest>>> sharedLogs;
+	               SharedLogsValue> sharedLogs;
+	state Reference<AsyncVar<UID>> activeSharedTLog(new AsyncVar<UID>());
 	state std::string coordFolder = abspath(_coordFolder);
 	state WorkerInterface interf( locality );
@@ -899,13 +912,15 @@ ACTOR Future<Void> workerServer(
 	auto& logData = sharedLogs[std::make_tuple(s.tLogOptions.version, s.storeType, s.tLogOptions.spillType)];
 	// FIXME: Shouldn't if logData.first isValid && !isReady, shouldn't we
 	// be sending a fake InitializeTLogRequest rather than calling tLog() ?
-	Future<Void> tl = tLogFn( kv, queue, dbInfo, locality, !logData.first.isValid() || logData.first.isReady() ? logData.second : PromiseStream<InitializeTLogRequest>(), s.storeID, true, oldLog, recovery, folder, degraded );
+	Future<Void> tl = tLogFn( kv, queue, dbInfo, locality, !logData.actor.isValid() || logData.actor.isReady() ? logData.requests : PromiseStream<InitializeTLogRequest>(), s.storeID, true, oldLog, recovery, folder, degraded, activeSharedTLog );
 	recoveries.push_back(recovery.getFuture());
+	activeSharedTLog->set(s.storeID);

 	tl = handleIOErrors( tl, kv, s.storeID );
 	tl = handleIOErrors( tl, queue, s.storeID );
-	if(!logData.first.isValid() || logData.first.isReady()) {
-		logData.first = oldLog.getFuture() || tl;
+	if(!logData.actor.isValid() || logData.actor.isReady()) {
+		logData.actor = oldLog.getFuture() || tl;
+		logData.uid = s.storeID;
 	}
 	errorForwarders.add( forwardError( errors, Role::SHARED_TRANSACTION_LOG, s.storeID, tl ) );
 }
@@ -1045,8 +1060,8 @@ ACTOR Future<Void> workerServer(
 	TLogOptions tLogOptions(req.logVersion, req.spillType);
 	TLogFn tLogFn = tLogFnForOptions(tLogOptions);
 	auto& logData = sharedLogs[std::make_tuple(req.logVersion, req.storeType, req.spillType)];
-	logData.second.send(req);
-	if(!logData.first.isValid() || logData.first.isReady()) {
+	logData.requests.send(req);
+	if(!logData.actor.isValid() || logData.actor.isReady()) {
 		UID logId = deterministicRandom()->randomUniqueID();
 		std::map<std::string, std::string> details;
 		details["ForMaster"] = req.recruitmentID.shortString();
@@ -1063,11 +1078,14 @@ ACTOR Future<Void> workerServer(
 	filesClosed.add( data->onClosed() );
 	filesClosed.add( queue->onClosed() );

-	logData.first = tLogFn( data, queue, dbInfo, locality, logData.second, logId, false, Promise<Void>(), Promise<Void>(), folder, degraded );
-	logData.first = handleIOErrors( logData.first, data, logId );
-	logData.first = handleIOErrors( logData.first, queue, logId );
-	errorForwarders.add( forwardError( errors, Role::SHARED_TRANSACTION_LOG, logId, logData.first ) );
+	Future<Void> tLogCore = tLogFn( data, queue, dbInfo, locality, logData.requests, logId, false, Promise<Void>(), Promise<Void>(), folder, degraded, activeSharedTLog );
+	tLogCore = handleIOErrors( tLogCore, data, logId );
+	tLogCore = handleIOErrors( tLogCore, queue, logId );
+	errorForwarders.add( forwardError( errors, Role::SHARED_TRANSACTION_LOG, logId, tLogCore ) );
+	logData.actor = tLogCore;
+	logData.uid = logId;
 	}
+	activeSharedTLog->set(logData.uid);
 }
 when( InitializeStorageRequest req = waitNext(interf.storage.getFuture()) ) {
 	if( !storageCache.exists( req.reqId ) ) {
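The handoff rides entirely on Reference<AsyncVar<UID>>: workerServer calls set() after each recruitment or disk restore, and each shared TLog's main loop wakes on onChange() and re-evaluates whether it is the active instance. Below is a compact model of the contract this code relies on; the semantics (set() stores the value and, only when it actually changes, wakes every waiter registered since the last change) are assumed from usage here, and the class is a stand-in, not Flow's implementation:

    #include <cstdint>
    #include <functional>
    #include <iostream>
    #include <vector>

    // One-shot change notification, modeling AsyncVar<T>::get/set/onChange.
    template <class T>
    class AsyncVarModel {
        T value{};
        std::vector<std::function<void(const T&)>> waiters;  // one-shot wakeups
    public:
        const T& get() const { return value; }
        void onChange(std::function<void(const T&)> cb) { waiters.push_back(std::move(cb)); }
        void set(const T& v) {
            if (v == value) return;  // only a real change fires waiters
            value = v;
            auto fired = std::move(waiters);  // waiters registered so far
            waiters.clear();
            for (auto& cb : fired) cb(value);
        }
    };

    int main() {
        AsyncVarModel<uint64_t> activeSharedTLog;

        // Each shared TLog's loop body: act on the current value, then
        // re-register, like the when(wait(activeSharedTLog->onChange())) arm.
        std::function<void(uint64_t, uint64_t)> loopBody = [&](uint64_t myId, uint64_t active) {
            std::cout << "tlog " << myId
                      << (active == myId ? ": keep full budget\n" : ": schedule spilling\n");
            activeSharedTLog.onChange([&loopBody, myId](const uint64_t& a) { loopBody(myId, a); });
        };

        loopBody(1, activeSharedTLog.get());  // instance 1 starts; nothing active yet
        activeSharedTLog.set(1);              // workerServer: instance 1 recruited
        activeSharedTLog.set(2);              // a second SharedTLog takes over
    }

Each wakeup re-registers, mirroring the loop around choose/when above: every change of the active UID re-runs the active-versus-spilling decision at the top of the tLog() loop.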