diff --git a/fdbserver/worker.actor.cpp b/fdbserver/worker.actor.cpp
index 27e355368b..e12574aae2 100644
--- a/fdbserver/worker.actor.cpp
+++ b/fdbserver/worker.actor.cpp
@@ -1579,7 +1579,7 @@ ACTOR Future<Void> workerServer(Reference<IClusterConnectionRecord> connRecord,
 	// As (store type, spill type) can map to the same TLogFn across multiple TLogVersions, we need to
 	// decide if we should collapse them into the same SharedTLog instance as well. The answer
 	// here is no, so that when running with log_version==3, all files should say V=3.
-	state std::map<SharedLogsKey, SharedLogsValue> sharedLogs;
+	state std::map<SharedLogsKey, std::vector<SharedLogsValue>> sharedLogs;
 	state Reference<AsyncVar<UID>> activeSharedTLog(new AsyncVar<UID>());
 	state WorkerCache<InitializeBackupReply> backupWorkerCache;
 	state WorkerCache<InitializeBlobWorkerReply> blobWorkerCache;
@@ -1772,32 +1772,29 @@ ACTOR Future<Void> workerServer(Reference<IClusterConnectionRecord> connRecord,
 				Promise<Void> recovery;
 				TLogFn tLogFn = tLogFnForOptions(s.tLogOptions);
 				auto& logData = sharedLogs[SharedLogsKey(s.tLogOptions, s.storeType)];
+				logData.push_back(SharedLogsValue());
 				// FIXME: Shouldn't if logData.first isValid && !isReady, shouldn't we
 				// be sending a fake InitializeTLogRequest rather than calling tLog() ?
-				Future<Void> tl =
-				    tLogFn(kv,
-				           queue,
-				           dbInfo,
-				           locality,
-				           !logData.actor.isValid() || logData.actor.isReady() ? logData.requests
-				                                                               : PromiseStream<InitializeTLogRequest>(),
-				           s.storeID,
-				           interf.id(),
-				           true,
-				           oldLog,
-				           recovery,
-				           folder,
-				           degraded,
-				           activeSharedTLog);
+				Future<Void> tl = tLogFn(kv,
+				                         queue,
+				                         dbInfo,
+				                         locality,
+				                         logData.back().requests,
+				                         s.storeID,
+				                         interf.id(),
+				                         true,
+				                         oldLog,
+				                         recovery,
+				                         folder,
+				                         degraded,
+				                         activeSharedTLog);
 				recoveries.push_back(recovery.getFuture());
 				activeSharedTLog->set(s.storeID);
 
 				tl = handleIOErrors(tl, kv, s.storeID);
 				tl = handleIOErrors(tl, queue, s.storeID);
-				if (!logData.actor.isValid() || logData.actor.isReady()) {
-					logData.actor = oldLog.getFuture() || tl;
-					logData.uid = s.storeID;
-				}
+				logData.back().actor = oldLog.getFuture() || tl;
+				logData.back().uid = s.storeID;
 				errorForwarders.add(forwardError(errors, Role::SHARED_TRANSACTION_LOG, s.storeID, tl));
 			}
 		}
@@ -2141,8 +2138,10 @@ ACTOR Future<Void> workerServer(Reference<IClusterConnectionRecord> connRecord,
 				TLogOptions tLogOptions(req.logVersion, req.spillType);
 				TLogFn tLogFn = tLogFnForOptions(tLogOptions);
 				auto& logData = sharedLogs[SharedLogsKey(tLogOptions, req.storeType)];
-				logData.requests.send(req);
-				if (!logData.actor.isValid() || logData.actor.isReady()) {
+				while (!logData.empty() && (!logData.back().actor.isValid() || logData.back().actor.isReady())) {
+					logData.pop_back();
+				}
+				if (logData.empty()) {
 					UID logId = deterministicRandom()->randomUniqueID();
 					std::map<std::string, std::string> details;
 					details["ForMaster"] = req.recruitmentID.shortString();
@@ -2168,11 +2167,12 @@ ACTOR Future<Void> workerServer(Reference<IClusterConnectionRecord> connRecord,
 					filesClosed.add(data->onClosed());
 					filesClosed.add(queue->onClosed());
 
+					logData.push_back(SharedLogsValue());
 					Future<Void> tLogCore = tLogFn(data,
 					                               queue,
 					                               dbInfo,
 					                               locality,
-					                               logData.requests,
+					                               logData.back().requests,
 					                               logId,
 					                               interf.id(),
 					                               false,
@@ -2184,10 +2184,11 @@ ACTOR Future<Void> workerServer(Reference<IClusterConnectionRecord> connRecord,
 					tLogCore = handleIOErrors(tLogCore, data, logId);
 					tLogCore = handleIOErrors(tLogCore, queue, logId);
 					errorForwarders.add(forwardError(errors, Role::SHARED_TRANSACTION_LOG, logId, tLogCore));
-					logData.actor = tLogCore;
-					logData.uid = logId;
+					logData.back().actor = tLogCore;
+					logData.back().uid = logId;
 				}
-				activeSharedTLog->set(logData.uid);
+				logData.back().requests.send(req);
+				activeSharedTLog->set(logData.back().uid);
 			}
 			when(InitializeStorageRequest req = waitNext(interf.storage.getFuture())) {
 				// We want to prevent double recruiting on a worker unless we try to recruit something
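
Note: the hunks above rely on the SharedLogsKey/SharedLogsValue helper types declared earlier in fdbserver/worker.actor.cpp, outside this diff. As a rough sketch of the shape this change assumes (field names are taken from the usage in the hunks; the exact declaration and default initializers in the file may differ), each map entry now holds a history of SharedTLog instances instead of a single one:

// Sketch only -- not part of this diff; field names follow the usage above.
struct SharedLogsValue {
	Future<Void> actor = Void(); // the running SharedTLog actor for this generation
	UID uid = UID(); // store ID of that SharedTLog
	PromiseStream<InitializeTLogRequest> requests; // stream that recruitment requests are sent on
};

// After this change the per-(TLogOptions, storeType) entry becomes:
//   std::map<SharedLogsKey, std::vector<SharedLogsValue>> sharedLogs;
// Entries whose actor is invalid or already finished are popped from the back,
// a fresh SharedLogsValue is pushed whenever a new SharedTLog is started, and the
// incoming InitializeTLogRequest is always sent to logData.back().requests.

The while/pop_back loop in the InitializeTLogRequest handler is what lets a worker start a new SharedTLog after the previous one has stopped, instead of sending requests into the request stream of a SharedTLog that is no longer running.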