diff --git a/fdbserver/OldTLogServer_4_6.actor.cpp b/fdbserver/OldTLogServer_4_6.actor.cpp index 0e02cd57b6..388f6effed 100644 --- a/fdbserver/OldTLogServer_4_6.actor.cpp +++ b/fdbserver/OldTLogServer_4_6.actor.cpp @@ -270,6 +270,7 @@ namespace oldTLog_4_6 { std::map<UID, Reference<struct LogData>> id_data; UID dbgid; + UID workerID; IKeyValueStore* persistentData; IDiskQueue* rawPersistentQueue; @@ -303,8 +304,8 @@ namespace oldTLog_4_6 { PromiseStream<Future<Void>> sharedActors; bool terminated; - TLogData(UID dbgid, IKeyValueStore* persistentData, IDiskQueue * persistentQueue, Reference<AsyncVar<ServerDBInfo>> const& dbInfo) - : dbgid(dbgid), instanceID(deterministicRandom()->randomUniqueID().first()), + TLogData(UID dbgid, UID workerID, IKeyValueStore* persistentData, IDiskQueue * persistentQueue, Reference<AsyncVar<ServerDBInfo>> const& dbInfo) + : dbgid(dbgid), workerID(workerID), instanceID(deterministicRandom()->randomUniqueID().first()), persistentData(persistentData), rawPersistentQueue(persistentQueue), persistentQueue(new TLogQueue(persistentQueue, dbgid)), dbInfo(dbInfo), queueCommitBegin(0), queueCommitEnd(0), prevVersion(0), diskQueueCommitBytes(0), largeDiskQueueCommitBytes(false), @@ -412,7 +413,7 @@ namespace oldTLog_4_6 { // These are initialized differently on init() or recovery recoveryCount(), stopped(false), initialized(false), queueCommittingVersion(0), newPersistentDataVersion(invalidVersion), recovery(Void()) { - startRole(Role::TRANSACTION_LOG,interf.id(), UID()); + startRole(Role::TRANSACTION_LOG, interf.id(), tLogData->workerID, {{"SharedTLog", tLogData->dbgid.shortString()}}, "Restored"); persistentDataVersion.init(LiteralStringRef("TLog.PersistentDataVersion"), cc.id); persistentDataDurableVersion.init(LiteralStringRef("TLog.PersistentDataDurableVersion"), cc.id); @@ -1120,7 +1121,7 @@ namespace oldTLog_4_6 { // The TLogRejoinRequest is needed to establish communications with a new master, which doesn't have our TLogInterface TLogRejoinRequest req; req.myInterface = tli; - TraceEvent("TLogRejoining", self->dbgid).detail("Master", self->dbInfo->get().master.id()); + TraceEvent("TLogRejoining", tli.id()).detail("Master", self->dbInfo->get().master.id()); choose { when(TLogRejoinReply rep = wait(brokenPromiseToNever(self->dbInfo->get().master.tlogRejoin.getReply(req)))) { @@ -1421,9 +1422,9 @@ namespace oldTLog_4_6 { return Void(); } - ACTOR Future<Void> tLog( IKeyValueStore* persistentData, IDiskQueue* persistentQueue, Reference<AsyncVar<ServerDBInfo>> db, LocalityData locality, UID tlogId ) + ACTOR Future<Void> tLog( IKeyValueStore* persistentData, IDiskQueue* persistentQueue, Reference<AsyncVar<ServerDBInfo>> db, LocalityData locality, UID tlogId, UID workerID ) { - state TLogData self( tlogId, persistentData, persistentQueue, db ); + state TLogData self( tlogId, workerID, persistentData, persistentQueue, db ); state Future<Void> error = actorCollection( self.sharedActors.getFuture() ); TraceEvent("SharedTlog", tlogId); diff --git a/fdbserver/OldTLogServer_6_0.actor.cpp b/fdbserver/OldTLogServer_6_0.actor.cpp index 36a8ac77bb..20bf5ec392 100644 --- a/fdbserver/OldTLogServer_6_0.actor.cpp +++ b/fdbserver/OldTLogServer_6_0.actor.cpp @@ -245,6 +245,7 @@ struct TLogData : NonCopyable { std::map<UID, Reference<struct LogData>> id_data; UID dbgid; + UID workerID; IKeyValueStore* persistentData; IDiskQueue* rawPersistentQueue; @@ -286,8 +287,8 @@ struct TLogData : NonCopyable { Reference<AsyncVar<bool>> degraded; std::vector<TagsAndMessage> tempTagMessages; - TLogData(UID dbgid, IKeyValueStore* persistentData, IDiskQueue * persistentQueue, Reference<AsyncVar<ServerDBInfo>> dbInfo, Reference<AsyncVar<bool>> degraded, std::string folder) - : dbgid(dbgid), instanceID(deterministicRandom()->randomUniqueID().first()), + TLogData(UID dbgid, UID workerID, IKeyValueStore* persistentData, IDiskQueue * persistentQueue, Reference<AsyncVar<ServerDBInfo>> dbInfo, Reference<AsyncVar<bool>> degraded, std::string folder) + : dbgid(dbgid), workerID(workerID), instanceID(deterministicRandom()->randomUniqueID().first()), persistentData(persistentData), rawPersistentQueue(persistentQueue), persistentQueue(new TLogQueue(persistentQueue, dbgid)), dbInfo(dbInfo), degraded(degraded), queueCommitBegin(0), queueCommitEnd(0), diskQueueCommitBytes(0), largeDiskQueueCommitBytes(false), bytesInput(0), bytesDurable(0), targetVolatileBytes(SERVER_KNOBS->TLOG_SPILL_THRESHOLD), overheadBytesInput(0), overheadBytesDurable(0), @@ -439,14 +440,15 @@ struct LogData : NonCopyable, public ReferenceCounted<LogData> { bool execOpCommitInProgress; int txsTags; - explicit LogData(TLogData* tLogData, TLogInterface interf, Tag remoteTag, bool isPrimary, int logRouterTags, int txsTags, UID recruitmentID, std::vector<Tag> tags) : tLogData(tLogData), knownCommittedVersion(0), logId(interf.id()), - cc("TLog", interf.id().toString()), bytesInput("BytesInput", cc), bytesDurable("BytesDurable", cc), remoteTag(remoteTag), isPrimary(isPrimary), logRouterTags(logRouterTags), txsTags(txsTags), recruitmentID(recruitmentID), - logSystem(new AsyncVar<Reference<ILogSystem>>()), logRouterPoppedVersion(0), durableKnownCommittedVersion(0), minKnownCommittedVersion(0), allTags(tags.begin(), tags.end()), terminated(tLogData->terminated.getFuture()), - // These are initialized differently on init() or recovery - recoveryCount(), stopped(false), initialized(false), queueCommittingVersion(0), newPersistentDataVersion(invalidVersion), unrecoveredBefore(1), recoveredAt(1), unpoppedRecoveredTags(0), - logRouterPopToVersion(0), locality(tagLocalityInvalid), execOpCommitInProgress(false) + explicit LogData(TLogData* tLogData, TLogInterface interf, Tag remoteTag, bool isPrimary, int logRouterTags, int txsTags, UID recruitmentID, std::vector<Tag> tags, std::string context) + : tLogData(tLogData), knownCommittedVersion(0), logId(interf.id()), + cc("TLog", interf.id().toString()), bytesInput("BytesInput", cc), bytesDurable("BytesDurable", cc), remoteTag(remoteTag), isPrimary(isPrimary), logRouterTags(logRouterTags), txsTags(txsTags), recruitmentID(recruitmentID), + logSystem(new AsyncVar<Reference<ILogSystem>>()), logRouterPoppedVersion(0), durableKnownCommittedVersion(0), minKnownCommittedVersion(0), allTags(tags.begin(), tags.end()), terminated(tLogData->terminated.getFuture()), + // These are initialized differently on init() or recovery + recoveryCount(), stopped(false), initialized(false), queueCommittingVersion(0), newPersistentDataVersion(invalidVersion), unrecoveredBefore(1), recoveredAt(1), unpoppedRecoveredTags(0), + logRouterPopToVersion(0), locality(tagLocalityInvalid), execOpCommitInProgress(false) { - startRole(Role::TRANSACTION_LOG, interf.id(), UID()); + startRole(Role::TRANSACTION_LOG, interf.id(), tLogData->workerID, {{"SharedTLog", tLogData->dbgid.shortString()}}, context); persistentDataVersion.init(LiteralStringRef("TLog.PersistentDataVersion"), cc.id); persistentDataDurableVersion.init(LiteralStringRef("TLog.PersistentDataDurableVersion"), cc.id); @@ -1478,7 +1480,7 @@ ACTOR Future<Void> rejoinMasters( TLogData* self, TLogInterface tli, DBRecoveryC if ( self->dbInfo->get().master.id() != lastMasterID) { // The TLogRejoinRequest is needed to establish communications with a new master, which doesn't have our TLogInterface TLogRejoinRequest req(tli); - TraceEvent("TLogRejoining", self->dbgid).detail("Master", self->dbInfo->get().master.id()); + TraceEvent("TLogRejoining", tli.id()).detail("Master", self->dbInfo->get().master.id()); choose { when(TLogRejoinReply rep = wait(brokenPromiseToNever(self->dbInfo->get().master.tlogRejoin.getReply(req)))) { @@ -1973,7 +1975,7 @@ ACTOR Future<Void> restorePersistentState( TLogData* self, LocalityData locality tlogRequests.getFuture().pop().reply.sendError(recruitment_failed()); } - wait( oldTLog_4_6::tLog(self->persistentData, self->rawPersistentQueue, self->dbInfo, locality, self->dbgid) ); + wait( oldTLog_4_6::tLog(self->persistentData, self->rawPersistentQueue, self->dbInfo, locality, self->dbgid, self->workerID) ); throw internal_error(); } @@ -2019,7 +2021,7 @@ ACTOR Future<Void> restorePersistentState( TLogData* self, LocalityData locality DUMPTOKEN( recruited.confirmRunning ); //We do not need the remoteTag, because we will not be loading any additional data - logData = Reference<LogData>( new LogData(self, recruited, Tag(), true, id_logRouterTags[id1], id_txsTags[id1], UID(), std::vector<Tag>()) ); + logData = Reference<LogData>( new LogData(self, recruited, Tag(), true, id_logRouterTags[id1], id_txsTags[id1], UID(), std::vector<Tag>(), "Restored") ); logData->locality = id_locality[id1]; logData->stopped = true; self->id_data[id1] = logData; @@ -2202,7 +2204,8 @@ ACTOR Future<Void> tLogStart( TLogData* self, InitializeTLogRequest req, Localit it.second->stopCommit.trigger(); } - state Reference<LogData> logData = Reference<LogData>( new LogData(self, recruited, req.remoteTag, req.isPrimary, req.logRouterTags, req.txsTags, req.recruitmentID, req.allTags) ); + bool recovering = (req.recoverFrom.logSystemType == LogSystemType::tagPartitioned); + state Reference<LogData> logData = Reference<LogData>( new LogData(self, recruited, req.remoteTag, req.isPrimary, req.logRouterTags, req.txsTags, req.recruitmentID, req.allTags, recovering ? "Recovered" : "Recruited") ); self->id_data[recruited.id()] = logData; logData->locality = req.locality; logData->recoveryCount = req.epoch; @@ -2218,7 +2221,7 @@ ACTOR Future<Void> tLogStart( TLogData* self, InitializeTLogRequest req, Localit throw logData->removed.getError(); } - if (req.recoverFrom.logSystemType == LogSystemType::tagPartitioned) { + if (recovering) { logData->unrecoveredBefore = req.startVersion; logData->recoveredAt = req.recoverAt; logData->knownCommittedVersion = req.startVersion - 1; @@ -2324,13 +2327,11 @@ ACTOR Future<Void> startSpillingInTenSeconds(TLogData* self, UID tlogId, Referen } // New tLog (if !recoverFrom.size()) or restore from network -ACTOR Future<Void> tLog( IKeyValueStore* persistentData, IDiskQueue* persistentQueue, Reference<AsyncVar<ServerDBInfo>> db, LocalityData locality, PromiseStream<InitializeTLogRequest> tlogRequests, UID tlogId, bool restoreFromDisk, Promise<Void> oldLog, Promise<Void> recovered, std::string folder, Reference<AsyncVar<bool>> degraded, Reference<AsyncVar<UID>> activeSharedTLog) { - state TLogData self( tlogId, persistentData, persistentQueue, db, degraded, folder ); +ACTOR Future<Void> tLog( IKeyValueStore* persistentData, IDiskQueue* persistentQueue, Reference<AsyncVar<ServerDBInfo>> db, LocalityData locality, PromiseStream<InitializeTLogRequest> tlogRequests, UID tlogId, UID workerID, bool restoreFromDisk, Promise<Void> oldLog, Promise<Void> recovered, std::string folder, Reference<AsyncVar<bool>> degraded, Reference<AsyncVar<UID>> activeSharedTLog) { + state TLogData self( tlogId, workerID, persistentData, persistentQueue, db, degraded, folder ); state Future<Void> error = actorCollection( self.sharedActors.getFuture() ); TraceEvent("SharedTlog", tlogId); - // FIXME: Pass the worker id instead of stubbing it - startRole(Role::SHARED_TRANSACTION_LOG, tlogId, UID()); try { if(restoreFromDisk) { wait( restorePersistentState( &self, locality, oldLog, recovered, tlogRequests ) ); @@ -2371,7 +2372,6 @@ ACTOR Future<Void> tLog( IKeyValueStore* persistentData, IDiskQueue* persistentQ } catch (Error& e) { self.terminated.send(Void()); TraceEvent("TLogError", tlogId).error(e, true); - endRole(Role::SHARED_TRANSACTION_LOG, tlogId, "Error", true); if(recovered.canBeSet()) recovered.send(Void()); while(!tlogRequests.isEmpty()) { diff --git a/fdbserver/TLogServer.actor.cpp b/fdbserver/TLogServer.actor.cpp index f58b97f179..fcfb40a3ab 100644 --- a/fdbserver/TLogServer.actor.cpp +++ b/fdbserver/TLogServer.actor.cpp @@ -295,6 +295,7 @@ struct TLogData : NonCopyable { std::map<UID, Reference<struct LogData>> id_data; UID dbgid; + UID workerID; IKeyValueStore* persistentData; IDiskQueue* rawPersistentQueue; @@ -337,8 +338,8 @@ struct TLogData : NonCopyable { Reference<AsyncVar<bool>> degraded; std::vector<TagsAndMessage> tempTagMessages; - TLogData(UID dbgid, IKeyValueStore* persistentData, IDiskQueue * persistentQueue, Reference<AsyncVar<ServerDBInfo>> dbInfo, Reference<AsyncVar<bool>> degraded, std::string folder) - : dbgid(dbgid), instanceID(deterministicRandom()->randomUniqueID().first()), + TLogData(UID dbgid, UID workerID, IKeyValueStore* persistentData, IDiskQueue * persistentQueue, Reference<AsyncVar<ServerDBInfo>> dbInfo, Reference<AsyncVar<bool>> degraded, std::string folder) + : dbgid(dbgid), workerID(workerID), instanceID(deterministicRandom()->randomUniqueID().first()), persistentData(persistentData), rawPersistentQueue(persistentQueue), persistentQueue(new TLogQueue(persistentQueue, dbgid)), dbInfo(dbInfo), degraded(degraded), queueCommitBegin(0), queueCommitEnd(0), diskQueueCommitBytes(0), largeDiskQueueCommitBytes(false), bytesInput(0), bytesDurable(0), targetVolatileBytes(SERVER_KNOBS->TLOG_SPILL_THRESHOLD), overheadBytesInput(0), overheadBytesDurable(0), @@ -499,15 +500,16 @@ struct LogData : NonCopyable, public ReferenceCounted<LogData> { bool execOpCommitInProgress; int txsTags; - explicit LogData(TLogData* tLogData, TLogInterface interf, Tag remoteTag, bool isPrimary, int logRouterTags, int txsTags, UID recruitmentID, ProtocolVersion protocolVersion, std::vector<Tag> tags) : tLogData(tLogData), knownCommittedVersion(0), logId(interf.id()), - cc("TLog", interf.id().toString()), bytesInput("BytesInput", cc), bytesDurable("BytesDurable", cc), remoteTag(remoteTag), isPrimary(isPrimary), logRouterTags(logRouterTags), txsTags(txsTags), recruitmentID(recruitmentID), protocolVersion(protocolVersion), - logSystem(new AsyncVar<Reference<ILogSystem>>()), logRouterPoppedVersion(0), durableKnownCommittedVersion(0), minKnownCommittedVersion(0), queuePoppedVersion(0), allTags(tags.begin(), tags.end()), terminated(tLogData->terminated.getFuture()), - minPoppedTagVersion(0), minPoppedTag(invalidTag), + explicit LogData(TLogData* tLogData, TLogInterface interf, Tag remoteTag, bool isPrimary, int logRouterTags, int txsTags, UID recruitmentID, ProtocolVersion protocolVersion, std::vector<Tag> tags, std::string context) + : tLogData(tLogData), knownCommittedVersion(0), logId(interf.id()), + cc("TLog", interf.id().toString()), bytesInput("BytesInput", cc), bytesDurable("BytesDurable", cc), remoteTag(remoteTag), isPrimary(isPrimary), logRouterTags(logRouterTags), txsTags(txsTags), recruitmentID(recruitmentID), protocolVersion(protocolVersion), + logSystem(new AsyncVar<Reference<ILogSystem>>()), logRouterPoppedVersion(0), durableKnownCommittedVersion(0), minKnownCommittedVersion(0), queuePoppedVersion(0), allTags(tags.begin(), tags.end()), terminated(tLogData->terminated.getFuture()), + minPoppedTagVersion(0), minPoppedTag(invalidTag), // These are initialized differently on init() or recovery recoveryCount(), stopped(false), initialized(false), queueCommittingVersion(0), newPersistentDataVersion(invalidVersion), unrecoveredBefore(1), recoveredAt(1), unpoppedRecoveredTags(0), logRouterPopToVersion(0), locality(tagLocalityInvalid), execOpCommitInProgress(false) { - startRole(Role::TRANSACTION_LOG, interf.id(), UID()); + startRole(Role::TRANSACTION_LOG, interf.id(), tLogData->workerID, {{"SharedTLog", tLogData->dbgid.shortString()}}, context); persistentDataVersion.init(LiteralStringRef("TLog.PersistentDataVersion"), cc.id); persistentDataDurableVersion.init(LiteralStringRef("TLog.PersistentDataDurableVersion"), cc.id); @@ -1870,7 +1872,7 @@ ACTOR Future<Void> rejoinMasters( TLogData* self, TLogInterface tli, DBRecoveryC if ( self->dbInfo->get().master.id() != lastMasterID) { // The TLogRejoinRequest is needed to establish communications with a new master, which doesn't have our TLogInterface TLogRejoinRequest req(tli); - TraceEvent("TLogRejoining", self->dbgid).detail("Master", self->dbInfo->get().master.id()); + TraceEvent("TLogRejoining", tli.id()).detail("Master", self->dbInfo->get().master.id()); choose { when(TLogRejoinReply rep = wait(brokenPromiseToNever(self->dbInfo->get().master.tlogRejoin.getReply(req)))) { @@ -2424,7 +2426,7 @@ ACTOR Future<Void> restorePersistentState( TLogData* self, LocalityData locality ProtocolVersion protocolVersion = BinaryReader::fromStringRef<ProtocolVersion>( fProtocolVersions.get()[idx].value, Unversioned() ); //We do not need the remoteTag, because we will not be loading any additional data - logData = Reference<LogData>( new LogData(self, recruited, Tag(), true, id_logRouterTags[id1], id_txsTags[id1], UID(), protocolVersion, std::vector<Tag>()) ); + logData = Reference<LogData>( new LogData(self, recruited, Tag(), true, id_logRouterTags[id1], id_txsTags[id1], UID(), protocolVersion, std::vector<Tag>(), "Restored") ); logData->locality = id_locality[id1]; logData->stopped = true; self->id_data[id1] = logData; @@ -2631,7 +2633,8 @@ ACTOR Future<Void> tLogStart( TLogData* self, InitializeTLogRequest req, Localit stopAllTLogs(self, recruited.id()); - state Reference<LogData> logData = Reference<LogData>( new LogData(self, recruited, req.remoteTag, req.isPrimary, req.logRouterTags, req.txsTags, req.recruitmentID, currentProtocolVersion, req.allTags) ); + bool recovering = (req.recoverFrom.logSystemType == LogSystemType::tagPartitioned); + state Reference<LogData> logData = Reference<LogData>( new LogData(self, recruited, req.remoteTag, req.isPrimary, req.logRouterTags, req.txsTags, req.recruitmentID, currentProtocolVersion, req.allTags, recovering ? "Recovered" : "Recruited") ); self->id_data[recruited.id()] = logData; logData->locality = req.locality; logData->recoveryCount = req.epoch; @@ -2649,7 +2652,7 @@ ACTOR Future<Void> tLogStart( TLogData* self, InitializeTLogRequest req, Localit throw logData->removed.getError(); } - if (req.recoverFrom.logSystemType == LogSystemType::tagPartitioned) { + if (recovering) { logData->unrecoveredBefore = req.startVersion; logData->recoveredAt = req.recoverAt; logData->knownCommittedVersion = req.startVersion - 1; @@ -2758,13 +2761,11 @@ ACTOR Future<Void> startSpillingInTenSeconds(TLogData* self, UID tlogId, Referen } // New tLog (if !recoverFrom.size()) or restore from network -ACTOR Future<Void> tLog( IKeyValueStore* persistentData, IDiskQueue* persistentQueue, Reference<AsyncVar<ServerDBInfo>> db, LocalityData locality, PromiseStream<InitializeTLogRequest> tlogRequests, UID tlogId, bool restoreFromDisk, Promise<Void> oldLog, Promise<Void> recovered, std::string folder, Reference<AsyncVar<bool>> degraded, Reference<AsyncVar<UID>> activeSharedTLog ) { - state TLogData self( tlogId, persistentData, persistentQueue, db, degraded, folder ); +ACTOR Future<Void> tLog( IKeyValueStore* persistentData, IDiskQueue* persistentQueue, Reference<AsyncVar<ServerDBInfo>> db, LocalityData locality, PromiseStream<InitializeTLogRequest> tlogRequests, UID tlogId, UID workerID, bool restoreFromDisk, Promise<Void> oldLog, Promise<Void> recovered, std::string folder, Reference<AsyncVar<bool>> degraded, Reference<AsyncVar<UID>> activeSharedTLog ) { + state TLogData self( tlogId, workerID, persistentData, persistentQueue, db, degraded, folder ); state Future<Void> error = actorCollection( self.sharedActors.getFuture() ); TraceEvent("SharedTlog", tlogId); - // FIXME: Pass the worker id instead of stubbing it - startRole(Role::SHARED_TRANSACTION_LOG, tlogId, UID()); try { if(restoreFromDisk) { wait( restorePersistentState( &self, locality, oldLog, recovered, tlogRequests ) ); @@ -2808,7 +2809,6 @@ ACTOR Future<Void> tLog( IKeyValueStore* persistentData, IDiskQueue* persistentQ } catch (Error& e) { self.terminated.send(Void()); TraceEvent("TLogError", tlogId).error(e, true); - endRole(Role::SHARED_TRANSACTION_LOG, tlogId, "Error", true); if(recovered.canBeSet()) recovered.send(Void()); while(!tlogRequests.isEmpty()) { diff --git a/fdbserver/WorkerInterface.actor.h b/fdbserver/WorkerInterface.actor.h index 114b1f1c36..a2eb851db2 100644 --- a/fdbserver/WorkerInterface.actor.h +++ b/fdbserver/WorkerInterface.actor.h @@ -418,7 +418,7 @@ private: } }; -void startRole(const Role &role, UID roleId, UID workerId, std::map<std::string, std::string> details = std::map<std::string, std::string>(), std::string origination = "Recruited"); +void startRole(const Role &role, UID roleId, UID workerId, const std::map<std::string, std::string> &details = std::map<std::string, std::string>(), const std::string &origination = "Recruited"); void endRole(const Role &role, UID id, std::string reason, bool ok = true, Error e = Error()); struct ServerDBInfo; @@ -455,8 +455,8 @@ ACTOR Future<Void> masterProxyServer(MasterProxyInterface proxy, InitializeMaste Reference<AsyncVar<ServerDBInfo>> db, std::string whitelistBinPaths); ACTOR Future<Void> tLog(IKeyValueStore* persistentData, IDiskQueue* persistentQueue, Reference<AsyncVar<ServerDBInfo>> db, LocalityData locality, - PromiseStream<InitializeTLogRequest> tlogRequests, UID tlogId, bool restoreFromDisk, - Promise<Void> oldLog, Promise<Void> recovered, std::string folder, + PromiseStream<InitializeTLogRequest> tlogRequests, UID tlogId, UID workerID, + bool restoreFromDisk, Promise<Void> oldLog, Promise<Void> recovered, std::string folder, Reference<AsyncVar<bool>> degraded, Reference<AsyncVar<UID>> activeSharedTLog); ACTOR Future<Void> monitorServerDBInfo(Reference<AsyncVar<Optional<ClusterControllerFullInterface>>> ccInterface, @@ -474,13 +474,13 @@ void updateCpuProfiler(ProfilerRequest req); namespace oldTLog_4_6 { ACTOR Future<Void> tLog(IKeyValueStore* persistentData, IDiskQueue* persistentQueue, - Reference<AsyncVar<ServerDBInfo>> db, LocalityData locality, UID tlogId); + Reference<AsyncVar<ServerDBInfo>> db, LocalityData locality, UID tlogId, UID workerID); } namespace oldTLog_6_0 { ACTOR Future<Void> tLog(IKeyValueStore* persistentData, IDiskQueue* persistentQueue, Reference<AsyncVar<ServerDBInfo>> db, LocalityData locality, - PromiseStream<InitializeTLogRequest> tlogRequests, UID tlogId, bool restoreFromDisk, - Promise<Void> oldLog, Promise<Void> recovered, std::string folder, + PromiseStream<InitializeTLogRequest> tlogRequests, UID tlogId, UID workerID, + bool restoreFromDisk, Promise<Void> oldLog, Promise<Void> recovered, std::string folder, Reference<AsyncVar<bool>> degraded, Reference<AsyncVar<UID>> activeSharedTLog); } diff --git a/fdbserver/worker.actor.cpp b/fdbserver/worker.actor.cpp index 0d86c86974..3c4f0f48c9 100644 --- a/fdbserver/worker.actor.cpp +++ b/fdbserver/worker.actor.cpp @@ -618,7 +618,7 @@ Standalone<StringRef> roleString(std::set<std::pair<std::string, std::string>> r return StringRef(result); } -void startRole(const Role &role, UID roleId, UID workerId, std::map<std::string, std::string> details, std::string origination) { +void startRole(const Role &role, UID roleId, UID workerId, const std::map<std::string, std::string> &details, const std::string &origination) { if(role.includeInTraceRoles) { addTraceRole(role.abbreviation); } @@ -921,7 +921,7 @@ ACTOR Future<Void> workerServer( auto& logData = sharedLogs[std::make_tuple(s.tLogOptions.version, s.storeType, s.tLogOptions.spillType)]; // FIXME: Shouldn't if logData.first isValid && !isReady, shouldn't we // be sending a fake InitializeTLogRequest rather than calling tLog() ? - Future<Void> tl = tLogFn( kv, queue, dbInfo, locality, !logData.actor.isValid() || logData.actor.isReady() ? logData.requests : PromiseStream<InitializeTLogRequest>(), s.storeID, true, oldLog, recovery, folder, degraded, activeSharedTLog ); + Future<Void> tl = tLogFn( kv, queue, dbInfo, locality, !logData.actor.isValid() || logData.actor.isReady() ? logData.requests : PromiseStream<InitializeTLogRequest>(), s.storeID, interf.id(), true, oldLog, recovery, folder, degraded, activeSharedTLog ); recoveries.push_back(recovery.getFuture()); activeSharedTLog->set(s.storeID); @@ -1087,7 +1087,7 @@ ACTOR Future<Void> workerServer( filesClosed.add( data->onClosed() ); filesClosed.add( queue->onClosed() ); - Future<Void> tLogCore = tLogFn( data, queue, dbInfo, locality, logData.requests, logId, false, Promise<Void>(), Promise<Void>(), folder, degraded, activeSharedTLog ); + Future<Void> tLogCore = tLogFn( data, queue, dbInfo, locality, logData.requests, logId, interf.id(), false, Promise<Void>(), Promise<Void>(), folder, degraded, activeSharedTLog ); tLogCore = handleIOErrors( tLogCore, data, logId ); tLogCore = handleIOErrors( tLogCore, queue, logId ); errorForwarders.add( forwardError( errors, Role::SHARED_TRANSACTION_LOG, logId, tLogCore ) );