Merge pull request #7385 from sfc-gh-etschannen/fix-multiple-shared-tlogs

Avoid recruiting multiple shared tlogs
Markus Pilman 2022-07-11 17:28:35 -06:00 committed by GitHub
commit 4f4095b6ae
1 changed file with 27 additions and 26 deletions
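
In short: `sharedLogs` used to keep a single `SharedLogsValue` per `(TLogOptions, storeType)` key, so an incoming `InitializeTLogRequest` could be handed to a SharedTLog that had already finished, or recruit a second live instance for the same key. Each key now maps to a vector of `SharedLogsValue`: finished instances are popped off the back, a fresh SharedTLog is recruited only when none is left, and new requests always go to `back()`. The sketch below is a minimal standalone model of that bookkeeping, not the actual implementation; `actorDone` and the string `uid` are simplified stand-ins for the Flow `Future<Void>` actor and `PromiseStream<InitializeTLogRequest>` used in fdbserver/worker.actor.cpp.

// Minimal model of the per-key SharedTLog bookkeeping introduced by this change.
// All types here are simplified stand-ins, not FoundationDB's real Flow types.
#include <iostream>
#include <map>
#include <string>
#include <tuple>
#include <vector>

struct SharedLogsKey {
    int logVersion; // stands in for TLogOptions
    int storeType;  // stands in for the key-value store type
    bool operator<(const SharedLogsKey& o) const {
        return std::tie(logVersion, storeType) < std::tie(o.logVersion, o.storeType);
    }
};

struct SharedLogsValue {
    bool actorDone = false; // stands in for !actor.isValid() || actor.isReady()
    std::string uid;        // storeID of this SharedTLog instance
};

int main() {
    // Before: std::map<SharedLogsKey, SharedLogsValue> (exactly one slot per key).
    // After:  each key keeps a vector; requests are always routed to back().
    std::map<SharedLogsKey, std::vector<SharedLogsValue>> sharedLogs;

    auto& logData = sharedLogs[SharedLogsKey{ 3, 1 }];
    logData.push_back(SharedLogsValue{ false, "storeA" }); // started from files found on disk

    // On InitializeTLogRequest: drop finished instances, recruit only if none is left.
    while (!logData.empty() && logData.back().actorDone) {
        logData.pop_back();
    }
    if (logData.empty()) {
        logData.push_back(SharedLogsValue{ false, "storeB" }); // recruit a fresh SharedTLog
    }

    // The request goes to the single live instance, which becomes the active shared tlog.
    std::cout << "request sent to shared tlog " << logData.back().uid << "\n";
    return 0;
}

Because the request is sent only after this prune-or-recruit step, it always reaches a live SharedTLog, and a new one is recruited only when no live instance exists for the key — the "multiple shared tlogs" the title refers to.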


@@ -1589,7 +1589,7 @@ ACTOR Future<Void> workerServer(Reference<IClusterConnectionRecord> connRecord,
     // As (store type, spill type) can map to the same TLogFn across multiple TLogVersions, we need to
     // decide if we should collapse them into the same SharedTLog instance as well. The answer
     // here is no, so that when running with log_version==3, all files should say V=3.
-    state std::map<SharedLogsKey, SharedLogsValue> sharedLogs;
+    state std::map<SharedLogsKey, std::vector<SharedLogsValue>> sharedLogs;
     state Reference<AsyncVar<UID>> activeSharedTLog(new AsyncVar<UID>());
     state WorkerCache<InitializeBackupReply> backupWorkerCache;
     state WorkerCache<InitializeBlobWorkerReply> blobWorkerCache;
@@ -1787,32 +1787,29 @@ ACTOR Future<Void> workerServer(Reference<IClusterConnectionRecord> connRecord,
             Promise<Void> recovery;
             TLogFn tLogFn = tLogFnForOptions(s.tLogOptions);
             auto& logData = sharedLogs[SharedLogsKey(s.tLogOptions, s.storeType)];
+            logData.push_back(SharedLogsValue());
             // FIXME: Shouldn't if logData.first isValid && !isReady, shouldn't we
             // be sending a fake InitializeTLogRequest rather than calling tLog() ?
-            Future<Void> tl =
-                tLogFn(kv,
-                       queue,
-                       dbInfo,
-                       locality,
-                       !logData.actor.isValid() || logData.actor.isReady() ? logData.requests
-                                                                           : PromiseStream<InitializeTLogRequest>(),
-                       s.storeID,
-                       interf.id(),
-                       true,
-                       oldLog,
-                       recovery,
-                       folder,
-                       degraded,
-                       activeSharedTLog);
+            Future<Void> tl = tLogFn(kv,
+                                     queue,
+                                     dbInfo,
+                                     locality,
+                                     logData.back().requests,
+                                     s.storeID,
+                                     interf.id(),
+                                     true,
+                                     oldLog,
+                                     recovery,
+                                     folder,
+                                     degraded,
+                                     activeSharedTLog);
             recoveries.push_back(recovery.getFuture());
             activeSharedTLog->set(s.storeID);
             tl = handleIOErrors(tl, kv, s.storeID);
             tl = handleIOErrors(tl, queue, s.storeID);
-            if (!logData.actor.isValid() || logData.actor.isReady()) {
-                logData.actor = oldLog.getFuture() || tl;
-                logData.uid = s.storeID;
-            }
+            logData.back().actor = oldLog.getFuture() || tl;
+            logData.back().uid = s.storeID;
             errorForwarders.add(forwardError(errors, Role::SHARED_TRANSACTION_LOG, s.storeID, tl));
         }
     }
@@ -2156,8 +2153,10 @@ ACTOR Future<Void> workerServer(Reference<IClusterConnectionRecord> connRecord,
             TLogOptions tLogOptions(req.logVersion, req.spillType);
             TLogFn tLogFn = tLogFnForOptions(tLogOptions);
             auto& logData = sharedLogs[SharedLogsKey(tLogOptions, req.storeType)];
-            logData.requests.send(req);
-            if (!logData.actor.isValid() || logData.actor.isReady()) {
+            while (!logData.empty() && (!logData.back().actor.isValid() || logData.back().actor.isReady())) {
+                logData.pop_back();
+            }
+            if (logData.empty()) {
                 UID logId = deterministicRandom()->randomUniqueID();
                 std::map<std::string, std::string> details;
                 details["ForMaster"] = req.recruitmentID.shortString();
@@ -2183,11 +2182,12 @@ ACTOR Future<Void> workerServer(Reference<IClusterConnectionRecord> connRecord,
                 filesClosed.add(data->onClosed());
                 filesClosed.add(queue->onClosed());
+                logData.push_back(SharedLogsValue());
                 Future<Void> tLogCore = tLogFn(data,
                                                queue,
                                                dbInfo,
                                                locality,
-                                               logData.requests,
+                                               logData.back().requests,
                                                logId,
                                                interf.id(),
                                                false,
@@ -2199,10 +2199,11 @@ ACTOR Future<Void> workerServer(Reference<IClusterConnectionRecord> connRecord,
                 tLogCore = handleIOErrors(tLogCore, data, logId);
                 tLogCore = handleIOErrors(tLogCore, queue, logId);
                 errorForwarders.add(forwardError(errors, Role::SHARED_TRANSACTION_LOG, logId, tLogCore));
-                logData.actor = tLogCore;
-                logData.uid = logId;
+                logData.back().actor = tLogCore;
+                logData.back().uid = logId;
             }
-            activeSharedTLog->set(logData.uid);
+            logData.back().requests.send(req);
+            activeSharedTLog->set(logData.back().uid);
         }
         when(InitializeStorageRequest req = waitNext(interf.storage.getFuture())) {
             // We want to prevent double recruiting on a worker unless we try to recruit something