Merge pull request #2672 from ajbeamon/improve-tlog-role-event
Improve TLog "Role" event
This commit is contained in:
commit
6d9decdf59
|
@ -270,6 +270,7 @@ namespace oldTLog_4_6 {
|
|||
std::map<UID, Reference<struct LogData>> id_data;
|
||||
|
||||
UID dbgid;
|
||||
UID workerID;
|
||||
|
||||
IKeyValueStore* persistentData;
|
||||
IDiskQueue* rawPersistentQueue;
|
||||
|
@ -303,8 +304,8 @@ namespace oldTLog_4_6 {
|
|||
PromiseStream<Future<Void>> sharedActors;
|
||||
bool terminated;
|
||||
|
||||
TLogData(UID dbgid, IKeyValueStore* persistentData, IDiskQueue * persistentQueue, Reference<AsyncVar<ServerDBInfo>> const& dbInfo)
|
||||
: dbgid(dbgid), instanceID(deterministicRandom()->randomUniqueID().first()),
|
||||
TLogData(UID dbgid, UID workerID, IKeyValueStore* persistentData, IDiskQueue * persistentQueue, Reference<AsyncVar<ServerDBInfo>> const& dbInfo)
|
||||
: dbgid(dbgid), workerID(workerID), instanceID(deterministicRandom()->randomUniqueID().first()),
|
||||
persistentData(persistentData), rawPersistentQueue(persistentQueue), persistentQueue(new TLogQueue(persistentQueue, dbgid)),
|
||||
dbInfo(dbInfo), queueCommitBegin(0), queueCommitEnd(0), prevVersion(0),
|
||||
diskQueueCommitBytes(0), largeDiskQueueCommitBytes(false),
|
||||
|
@ -412,7 +413,7 @@ namespace oldTLog_4_6 {
|
|||
// These are initialized differently on init() or recovery
|
||||
recoveryCount(), stopped(false), initialized(false), queueCommittingVersion(0), newPersistentDataVersion(invalidVersion), recovery(Void())
|
||||
{
|
||||
startRole(Role::TRANSACTION_LOG,interf.id(), UID());
|
||||
startRole(Role::TRANSACTION_LOG, interf.id(), tLogData->workerID, {{"SharedTLog", tLogData->dbgid.shortString()}}, "Restored");
|
||||
|
||||
persistentDataVersion.init(LiteralStringRef("TLog.PersistentDataVersion"), cc.id);
|
||||
persistentDataDurableVersion.init(LiteralStringRef("TLog.PersistentDataDurableVersion"), cc.id);
|
||||
|
@ -1120,7 +1121,7 @@ namespace oldTLog_4_6 {
|
|||
// The TLogRejoinRequest is needed to establish communications with a new master, which doesn't have our TLogInterface
|
||||
TLogRejoinRequest req;
|
||||
req.myInterface = tli;
|
||||
TraceEvent("TLogRejoining", self->dbgid).detail("Master", self->dbInfo->get().master.id());
|
||||
TraceEvent("TLogRejoining", tli.id()).detail("Master", self->dbInfo->get().master.id());
|
||||
choose {
|
||||
when(TLogRejoinReply rep =
|
||||
wait(brokenPromiseToNever(self->dbInfo->get().master.tlogRejoin.getReply(req)))) {
|
||||
|
@ -1421,9 +1422,9 @@ namespace oldTLog_4_6 {
|
|||
return Void();
|
||||
}
|
||||
|
||||
ACTOR Future<Void> tLog( IKeyValueStore* persistentData, IDiskQueue* persistentQueue, Reference<AsyncVar<ServerDBInfo>> db, LocalityData locality, UID tlogId )
|
||||
ACTOR Future<Void> tLog( IKeyValueStore* persistentData, IDiskQueue* persistentQueue, Reference<AsyncVar<ServerDBInfo>> db, LocalityData locality, UID tlogId, UID workerID )
|
||||
{
|
||||
state TLogData self( tlogId, persistentData, persistentQueue, db );
|
||||
state TLogData self( tlogId, workerID, persistentData, persistentQueue, db );
|
||||
state Future<Void> error = actorCollection( self.sharedActors.getFuture() );
|
||||
|
||||
TraceEvent("SharedTlog", tlogId);
|
||||
|
|
|
@ -245,6 +245,7 @@ struct TLogData : NonCopyable {
|
|||
std::map<UID, Reference<struct LogData>> id_data;
|
||||
|
||||
UID dbgid;
|
||||
UID workerID;
|
||||
|
||||
IKeyValueStore* persistentData;
|
||||
IDiskQueue* rawPersistentQueue;
|
||||
|
@ -286,8 +287,8 @@ struct TLogData : NonCopyable {
|
|||
Reference<AsyncVar<bool>> degraded;
|
||||
std::vector<TagsAndMessage> tempTagMessages;
|
||||
|
||||
TLogData(UID dbgid, IKeyValueStore* persistentData, IDiskQueue * persistentQueue, Reference<AsyncVar<ServerDBInfo>> dbInfo, Reference<AsyncVar<bool>> degraded, std::string folder)
|
||||
: dbgid(dbgid), instanceID(deterministicRandom()->randomUniqueID().first()),
|
||||
TLogData(UID dbgid, UID workerID, IKeyValueStore* persistentData, IDiskQueue * persistentQueue, Reference<AsyncVar<ServerDBInfo>> dbInfo, Reference<AsyncVar<bool>> degraded, std::string folder)
|
||||
: dbgid(dbgid), workerID(workerID), instanceID(deterministicRandom()->randomUniqueID().first()),
|
||||
persistentData(persistentData), rawPersistentQueue(persistentQueue), persistentQueue(new TLogQueue(persistentQueue, dbgid)),
|
||||
dbInfo(dbInfo), degraded(degraded), queueCommitBegin(0), queueCommitEnd(0),
|
||||
diskQueueCommitBytes(0), largeDiskQueueCommitBytes(false), bytesInput(0), bytesDurable(0), targetVolatileBytes(SERVER_KNOBS->TLOG_SPILL_THRESHOLD), overheadBytesInput(0), overheadBytesDurable(0),
|
||||
|
@ -439,14 +440,15 @@ struct LogData : NonCopyable, public ReferenceCounted<LogData> {
|
|||
bool execOpCommitInProgress;
|
||||
int txsTags;
|
||||
|
||||
explicit LogData(TLogData* tLogData, TLogInterface interf, Tag remoteTag, bool isPrimary, int logRouterTags, int txsTags, UID recruitmentID, std::vector<Tag> tags) : tLogData(tLogData), knownCommittedVersion(0), logId(interf.id()),
|
||||
cc("TLog", interf.id().toString()), bytesInput("BytesInput", cc), bytesDurable("BytesDurable", cc), remoteTag(remoteTag), isPrimary(isPrimary), logRouterTags(logRouterTags), txsTags(txsTags), recruitmentID(recruitmentID),
|
||||
logSystem(new AsyncVar<Reference<ILogSystem>>()), logRouterPoppedVersion(0), durableKnownCommittedVersion(0), minKnownCommittedVersion(0), allTags(tags.begin(), tags.end()), terminated(tLogData->terminated.getFuture()),
|
||||
// These are initialized differently on init() or recovery
|
||||
recoveryCount(), stopped(false), initialized(false), queueCommittingVersion(0), newPersistentDataVersion(invalidVersion), unrecoveredBefore(1), recoveredAt(1), unpoppedRecoveredTags(0),
|
||||
logRouterPopToVersion(0), locality(tagLocalityInvalid), execOpCommitInProgress(false)
|
||||
explicit LogData(TLogData* tLogData, TLogInterface interf, Tag remoteTag, bool isPrimary, int logRouterTags, int txsTags, UID recruitmentID, std::vector<Tag> tags, std::string context)
|
||||
: tLogData(tLogData), knownCommittedVersion(0), logId(interf.id()),
|
||||
cc("TLog", interf.id().toString()), bytesInput("BytesInput", cc), bytesDurable("BytesDurable", cc), remoteTag(remoteTag), isPrimary(isPrimary), logRouterTags(logRouterTags), txsTags(txsTags), recruitmentID(recruitmentID),
|
||||
logSystem(new AsyncVar<Reference<ILogSystem>>()), logRouterPoppedVersion(0), durableKnownCommittedVersion(0), minKnownCommittedVersion(0), allTags(tags.begin(), tags.end()), terminated(tLogData->terminated.getFuture()),
|
||||
// These are initialized differently on init() or recovery
|
||||
recoveryCount(), stopped(false), initialized(false), queueCommittingVersion(0), newPersistentDataVersion(invalidVersion), unrecoveredBefore(1), recoveredAt(1), unpoppedRecoveredTags(0),
|
||||
logRouterPopToVersion(0), locality(tagLocalityInvalid), execOpCommitInProgress(false)
|
||||
{
|
||||
startRole(Role::TRANSACTION_LOG, interf.id(), UID());
|
||||
startRole(Role::TRANSACTION_LOG, interf.id(), tLogData->workerID, {{"SharedTLog", tLogData->dbgid.shortString()}}, context);
|
||||
|
||||
persistentDataVersion.init(LiteralStringRef("TLog.PersistentDataVersion"), cc.id);
|
||||
persistentDataDurableVersion.init(LiteralStringRef("TLog.PersistentDataDurableVersion"), cc.id);
|
||||
|
@ -1478,7 +1480,7 @@ ACTOR Future<Void> rejoinMasters( TLogData* self, TLogInterface tli, DBRecoveryC
|
|||
if ( self->dbInfo->get().master.id() != lastMasterID) {
|
||||
// The TLogRejoinRequest is needed to establish communications with a new master, which doesn't have our TLogInterface
|
||||
TLogRejoinRequest req(tli);
|
||||
TraceEvent("TLogRejoining", self->dbgid).detail("Master", self->dbInfo->get().master.id());
|
||||
TraceEvent("TLogRejoining", tli.id()).detail("Master", self->dbInfo->get().master.id());
|
||||
choose {
|
||||
when(TLogRejoinReply rep =
|
||||
wait(brokenPromiseToNever(self->dbInfo->get().master.tlogRejoin.getReply(req)))) {
|
||||
|
@ -1973,7 +1975,7 @@ ACTOR Future<Void> restorePersistentState( TLogData* self, LocalityData locality
|
|||
tlogRequests.getFuture().pop().reply.sendError(recruitment_failed());
|
||||
}
|
||||
|
||||
wait( oldTLog_4_6::tLog(self->persistentData, self->rawPersistentQueue, self->dbInfo, locality, self->dbgid) );
|
||||
wait( oldTLog_4_6::tLog(self->persistentData, self->rawPersistentQueue, self->dbInfo, locality, self->dbgid, self->workerID) );
|
||||
throw internal_error();
|
||||
}
|
||||
|
||||
|
@ -2019,7 +2021,7 @@ ACTOR Future<Void> restorePersistentState( TLogData* self, LocalityData locality
|
|||
DUMPTOKEN( recruited.confirmRunning );
|
||||
|
||||
//We do not need the remoteTag, because we will not be loading any additional data
|
||||
logData = Reference<LogData>( new LogData(self, recruited, Tag(), true, id_logRouterTags[id1], id_txsTags[id1], UID(), std::vector<Tag>()) );
|
||||
logData = Reference<LogData>( new LogData(self, recruited, Tag(), true, id_logRouterTags[id1], id_txsTags[id1], UID(), std::vector<Tag>(), "Restored") );
|
||||
logData->locality = id_locality[id1];
|
||||
logData->stopped = true;
|
||||
self->id_data[id1] = logData;
|
||||
|
@ -2202,7 +2204,8 @@ ACTOR Future<Void> tLogStart( TLogData* self, InitializeTLogRequest req, Localit
|
|||
it.second->stopCommit.trigger();
|
||||
}
|
||||
|
||||
state Reference<LogData> logData = Reference<LogData>( new LogData(self, recruited, req.remoteTag, req.isPrimary, req.logRouterTags, req.txsTags, req.recruitmentID, req.allTags) );
|
||||
bool recovering = (req.recoverFrom.logSystemType == LogSystemType::tagPartitioned);
|
||||
state Reference<LogData> logData = Reference<LogData>( new LogData(self, recruited, req.remoteTag, req.isPrimary, req.logRouterTags, req.txsTags, req.recruitmentID, req.allTags, recovering ? "Recovered" : "Recruited") );
|
||||
self->id_data[recruited.id()] = logData;
|
||||
logData->locality = req.locality;
|
||||
logData->recoveryCount = req.epoch;
|
||||
|
@ -2218,7 +2221,7 @@ ACTOR Future<Void> tLogStart( TLogData* self, InitializeTLogRequest req, Localit
|
|||
throw logData->removed.getError();
|
||||
}
|
||||
|
||||
if (req.recoverFrom.logSystemType == LogSystemType::tagPartitioned) {
|
||||
if (recovering) {
|
||||
logData->unrecoveredBefore = req.startVersion;
|
||||
logData->recoveredAt = req.recoverAt;
|
||||
logData->knownCommittedVersion = req.startVersion - 1;
|
||||
|
@ -2324,13 +2327,11 @@ ACTOR Future<Void> startSpillingInTenSeconds(TLogData* self, UID tlogId, Referen
|
|||
}
|
||||
|
||||
// New tLog (if !recoverFrom.size()) or restore from network
|
||||
ACTOR Future<Void> tLog( IKeyValueStore* persistentData, IDiskQueue* persistentQueue, Reference<AsyncVar<ServerDBInfo>> db, LocalityData locality, PromiseStream<InitializeTLogRequest> tlogRequests, UID tlogId, bool restoreFromDisk, Promise<Void> oldLog, Promise<Void> recovered, std::string folder, Reference<AsyncVar<bool>> degraded, Reference<AsyncVar<UID>> activeSharedTLog) {
|
||||
state TLogData self( tlogId, persistentData, persistentQueue, db, degraded, folder );
|
||||
ACTOR Future<Void> tLog( IKeyValueStore* persistentData, IDiskQueue* persistentQueue, Reference<AsyncVar<ServerDBInfo>> db, LocalityData locality, PromiseStream<InitializeTLogRequest> tlogRequests, UID tlogId, UID workerID, bool restoreFromDisk, Promise<Void> oldLog, Promise<Void> recovered, std::string folder, Reference<AsyncVar<bool>> degraded, Reference<AsyncVar<UID>> activeSharedTLog) {
|
||||
state TLogData self( tlogId, workerID, persistentData, persistentQueue, db, degraded, folder );
|
||||
state Future<Void> error = actorCollection( self.sharedActors.getFuture() );
|
||||
|
||||
TraceEvent("SharedTlog", tlogId);
|
||||
// FIXME: Pass the worker id instead of stubbing it
|
||||
startRole(Role::SHARED_TRANSACTION_LOG, tlogId, UID());
|
||||
try {
|
||||
if(restoreFromDisk) {
|
||||
wait( restorePersistentState( &self, locality, oldLog, recovered, tlogRequests ) );
|
||||
|
@ -2371,7 +2372,6 @@ ACTOR Future<Void> tLog( IKeyValueStore* persistentData, IDiskQueue* persistentQ
|
|||
} catch (Error& e) {
|
||||
self.terminated.send(Void());
|
||||
TraceEvent("TLogError", tlogId).error(e, true);
|
||||
endRole(Role::SHARED_TRANSACTION_LOG, tlogId, "Error", true);
|
||||
if(recovered.canBeSet()) recovered.send(Void());
|
||||
|
||||
while(!tlogRequests.isEmpty()) {
|
||||
|
|
|
@ -295,6 +295,7 @@ struct TLogData : NonCopyable {
|
|||
std::map<UID, Reference<struct LogData>> id_data;
|
||||
|
||||
UID dbgid;
|
||||
UID workerID;
|
||||
|
||||
IKeyValueStore* persistentData;
|
||||
IDiskQueue* rawPersistentQueue;
|
||||
|
@ -337,8 +338,8 @@ struct TLogData : NonCopyable {
|
|||
Reference<AsyncVar<bool>> degraded;
|
||||
std::vector<TagsAndMessage> tempTagMessages;
|
||||
|
||||
TLogData(UID dbgid, IKeyValueStore* persistentData, IDiskQueue * persistentQueue, Reference<AsyncVar<ServerDBInfo>> dbInfo, Reference<AsyncVar<bool>> degraded, std::string folder)
|
||||
: dbgid(dbgid), instanceID(deterministicRandom()->randomUniqueID().first()),
|
||||
TLogData(UID dbgid, UID workerID, IKeyValueStore* persistentData, IDiskQueue * persistentQueue, Reference<AsyncVar<ServerDBInfo>> dbInfo, Reference<AsyncVar<bool>> degraded, std::string folder)
|
||||
: dbgid(dbgid), workerID(workerID), instanceID(deterministicRandom()->randomUniqueID().first()),
|
||||
persistentData(persistentData), rawPersistentQueue(persistentQueue), persistentQueue(new TLogQueue(persistentQueue, dbgid)),
|
||||
dbInfo(dbInfo), degraded(degraded), queueCommitBegin(0), queueCommitEnd(0),
|
||||
diskQueueCommitBytes(0), largeDiskQueueCommitBytes(false), bytesInput(0), bytesDurable(0), targetVolatileBytes(SERVER_KNOBS->TLOG_SPILL_THRESHOLD), overheadBytesInput(0), overheadBytesDurable(0),
|
||||
|
@ -499,15 +500,16 @@ struct LogData : NonCopyable, public ReferenceCounted<LogData> {
|
|||
bool execOpCommitInProgress;
|
||||
int txsTags;
|
||||
|
||||
explicit LogData(TLogData* tLogData, TLogInterface interf, Tag remoteTag, bool isPrimary, int logRouterTags, int txsTags, UID recruitmentID, ProtocolVersion protocolVersion, std::vector<Tag> tags) : tLogData(tLogData), knownCommittedVersion(0), logId(interf.id()),
|
||||
cc("TLog", interf.id().toString()), bytesInput("BytesInput", cc), bytesDurable("BytesDurable", cc), remoteTag(remoteTag), isPrimary(isPrimary), logRouterTags(logRouterTags), txsTags(txsTags), recruitmentID(recruitmentID), protocolVersion(protocolVersion),
|
||||
logSystem(new AsyncVar<Reference<ILogSystem>>()), logRouterPoppedVersion(0), durableKnownCommittedVersion(0), minKnownCommittedVersion(0), queuePoppedVersion(0), allTags(tags.begin(), tags.end()), terminated(tLogData->terminated.getFuture()),
|
||||
minPoppedTagVersion(0), minPoppedTag(invalidTag),
|
||||
explicit LogData(TLogData* tLogData, TLogInterface interf, Tag remoteTag, bool isPrimary, int logRouterTags, int txsTags, UID recruitmentID, ProtocolVersion protocolVersion, std::vector<Tag> tags, std::string context)
|
||||
: tLogData(tLogData), knownCommittedVersion(0), logId(interf.id()),
|
||||
cc("TLog", interf.id().toString()), bytesInput("BytesInput", cc), bytesDurable("BytesDurable", cc), remoteTag(remoteTag), isPrimary(isPrimary), logRouterTags(logRouterTags), txsTags(txsTags), recruitmentID(recruitmentID), protocolVersion(protocolVersion),
|
||||
logSystem(new AsyncVar<Reference<ILogSystem>>()), logRouterPoppedVersion(0), durableKnownCommittedVersion(0), minKnownCommittedVersion(0), queuePoppedVersion(0), allTags(tags.begin(), tags.end()), terminated(tLogData->terminated.getFuture()),
|
||||
minPoppedTagVersion(0), minPoppedTag(invalidTag),
|
||||
// These are initialized differently on init() or recovery
|
||||
recoveryCount(), stopped(false), initialized(false), queueCommittingVersion(0), newPersistentDataVersion(invalidVersion), unrecoveredBefore(1), recoveredAt(1), unpoppedRecoveredTags(0),
|
||||
logRouterPopToVersion(0), locality(tagLocalityInvalid), execOpCommitInProgress(false)
|
||||
{
|
||||
startRole(Role::TRANSACTION_LOG, interf.id(), UID());
|
||||
startRole(Role::TRANSACTION_LOG, interf.id(), tLogData->workerID, {{"SharedTLog", tLogData->dbgid.shortString()}}, context);
|
||||
|
||||
persistentDataVersion.init(LiteralStringRef("TLog.PersistentDataVersion"), cc.id);
|
||||
persistentDataDurableVersion.init(LiteralStringRef("TLog.PersistentDataDurableVersion"), cc.id);
|
||||
|
@ -1870,7 +1872,7 @@ ACTOR Future<Void> rejoinMasters( TLogData* self, TLogInterface tli, DBRecoveryC
|
|||
if ( self->dbInfo->get().master.id() != lastMasterID) {
|
||||
// The TLogRejoinRequest is needed to establish communications with a new master, which doesn't have our TLogInterface
|
||||
TLogRejoinRequest req(tli);
|
||||
TraceEvent("TLogRejoining", self->dbgid).detail("Master", self->dbInfo->get().master.id());
|
||||
TraceEvent("TLogRejoining", tli.id()).detail("Master", self->dbInfo->get().master.id());
|
||||
choose {
|
||||
when(TLogRejoinReply rep =
|
||||
wait(brokenPromiseToNever(self->dbInfo->get().master.tlogRejoin.getReply(req)))) {
|
||||
|
@ -2424,7 +2426,7 @@ ACTOR Future<Void> restorePersistentState( TLogData* self, LocalityData locality
|
|||
ProtocolVersion protocolVersion = BinaryReader::fromStringRef<ProtocolVersion>( fProtocolVersions.get()[idx].value, Unversioned() );
|
||||
|
||||
//We do not need the remoteTag, because we will not be loading any additional data
|
||||
logData = Reference<LogData>( new LogData(self, recruited, Tag(), true, id_logRouterTags[id1], id_txsTags[id1], UID(), protocolVersion, std::vector<Tag>()) );
|
||||
logData = Reference<LogData>( new LogData(self, recruited, Tag(), true, id_logRouterTags[id1], id_txsTags[id1], UID(), protocolVersion, std::vector<Tag>(), "Restored") );
|
||||
logData->locality = id_locality[id1];
|
||||
logData->stopped = true;
|
||||
self->id_data[id1] = logData;
|
||||
|
@ -2631,7 +2633,8 @@ ACTOR Future<Void> tLogStart( TLogData* self, InitializeTLogRequest req, Localit
|
|||
|
||||
stopAllTLogs(self, recruited.id());
|
||||
|
||||
state Reference<LogData> logData = Reference<LogData>( new LogData(self, recruited, req.remoteTag, req.isPrimary, req.logRouterTags, req.txsTags, req.recruitmentID, currentProtocolVersion, req.allTags) );
|
||||
bool recovering = (req.recoverFrom.logSystemType == LogSystemType::tagPartitioned);
|
||||
state Reference<LogData> logData = Reference<LogData>( new LogData(self, recruited, req.remoteTag, req.isPrimary, req.logRouterTags, req.txsTags, req.recruitmentID, currentProtocolVersion, req.allTags, recovering ? "Recovered" : "Recruited") );
|
||||
self->id_data[recruited.id()] = logData;
|
||||
logData->locality = req.locality;
|
||||
logData->recoveryCount = req.epoch;
|
||||
|
@ -2649,7 +2652,7 @@ ACTOR Future<Void> tLogStart( TLogData* self, InitializeTLogRequest req, Localit
|
|||
throw logData->removed.getError();
|
||||
}
|
||||
|
||||
if (req.recoverFrom.logSystemType == LogSystemType::tagPartitioned) {
|
||||
if (recovering) {
|
||||
logData->unrecoveredBefore = req.startVersion;
|
||||
logData->recoveredAt = req.recoverAt;
|
||||
logData->knownCommittedVersion = req.startVersion - 1;
|
||||
|
@ -2758,13 +2761,11 @@ ACTOR Future<Void> startSpillingInTenSeconds(TLogData* self, UID tlogId, Referen
|
|||
}
|
||||
|
||||
// New tLog (if !recoverFrom.size()) or restore from network
|
||||
ACTOR Future<Void> tLog( IKeyValueStore* persistentData, IDiskQueue* persistentQueue, Reference<AsyncVar<ServerDBInfo>> db, LocalityData locality, PromiseStream<InitializeTLogRequest> tlogRequests, UID tlogId, bool restoreFromDisk, Promise<Void> oldLog, Promise<Void> recovered, std::string folder, Reference<AsyncVar<bool>> degraded, Reference<AsyncVar<UID>> activeSharedTLog ) {
|
||||
state TLogData self( tlogId, persistentData, persistentQueue, db, degraded, folder );
|
||||
ACTOR Future<Void> tLog( IKeyValueStore* persistentData, IDiskQueue* persistentQueue, Reference<AsyncVar<ServerDBInfo>> db, LocalityData locality, PromiseStream<InitializeTLogRequest> tlogRequests, UID tlogId, UID workerID, bool restoreFromDisk, Promise<Void> oldLog, Promise<Void> recovered, std::string folder, Reference<AsyncVar<bool>> degraded, Reference<AsyncVar<UID>> activeSharedTLog ) {
|
||||
state TLogData self( tlogId, workerID, persistentData, persistentQueue, db, degraded, folder );
|
||||
state Future<Void> error = actorCollection( self.sharedActors.getFuture() );
|
||||
|
||||
TraceEvent("SharedTlog", tlogId);
|
||||
// FIXME: Pass the worker id instead of stubbing it
|
||||
startRole(Role::SHARED_TRANSACTION_LOG, tlogId, UID());
|
||||
try {
|
||||
if(restoreFromDisk) {
|
||||
wait( restorePersistentState( &self, locality, oldLog, recovered, tlogRequests ) );
|
||||
|
@ -2808,7 +2809,6 @@ ACTOR Future<Void> tLog( IKeyValueStore* persistentData, IDiskQueue* persistentQ
|
|||
} catch (Error& e) {
|
||||
self.terminated.send(Void());
|
||||
TraceEvent("TLogError", tlogId).error(e, true);
|
||||
endRole(Role::SHARED_TRANSACTION_LOG, tlogId, "Error", true);
|
||||
if(recovered.canBeSet()) recovered.send(Void());
|
||||
|
||||
while(!tlogRequests.isEmpty()) {
|
||||
|
|
|
@ -418,7 +418,7 @@ private:
|
|||
}
|
||||
};
|
||||
|
||||
void startRole(const Role &role, UID roleId, UID workerId, std::map<std::string, std::string> details = std::map<std::string, std::string>(), std::string origination = "Recruited");
|
||||
void startRole(const Role &role, UID roleId, UID workerId, const std::map<std::string, std::string> &details = std::map<std::string, std::string>(), const std::string &origination = "Recruited");
|
||||
void endRole(const Role &role, UID id, std::string reason, bool ok = true, Error e = Error());
|
||||
|
||||
struct ServerDBInfo;
|
||||
|
@ -455,8 +455,8 @@ ACTOR Future<Void> masterProxyServer(MasterProxyInterface proxy, InitializeMaste
|
|||
Reference<AsyncVar<ServerDBInfo>> db, std::string whitelistBinPaths);
|
||||
ACTOR Future<Void> tLog(IKeyValueStore* persistentData, IDiskQueue* persistentQueue,
|
||||
Reference<AsyncVar<ServerDBInfo>> db, LocalityData locality,
|
||||
PromiseStream<InitializeTLogRequest> tlogRequests, UID tlogId, bool restoreFromDisk,
|
||||
Promise<Void> oldLog, Promise<Void> recovered, std::string folder,
|
||||
PromiseStream<InitializeTLogRequest> tlogRequests, UID tlogId, UID workerID,
|
||||
bool restoreFromDisk, Promise<Void> oldLog, Promise<Void> recovered, std::string folder,
|
||||
Reference<AsyncVar<bool>> degraded, Reference<AsyncVar<UID>> activeSharedTLog);
|
||||
|
||||
ACTOR Future<Void> monitorServerDBInfo(Reference<AsyncVar<Optional<ClusterControllerFullInterface>>> ccInterface,
|
||||
|
@ -474,13 +474,13 @@ void updateCpuProfiler(ProfilerRequest req);
|
|||
|
||||
namespace oldTLog_4_6 {
|
||||
ACTOR Future<Void> tLog(IKeyValueStore* persistentData, IDiskQueue* persistentQueue,
|
||||
Reference<AsyncVar<ServerDBInfo>> db, LocalityData locality, UID tlogId);
|
||||
Reference<AsyncVar<ServerDBInfo>> db, LocalityData locality, UID tlogId, UID workerID);
|
||||
}
|
||||
namespace oldTLog_6_0 {
|
||||
ACTOR Future<Void> tLog(IKeyValueStore* persistentData, IDiskQueue* persistentQueue,
|
||||
Reference<AsyncVar<ServerDBInfo>> db, LocalityData locality,
|
||||
PromiseStream<InitializeTLogRequest> tlogRequests, UID tlogId, bool restoreFromDisk,
|
||||
Promise<Void> oldLog, Promise<Void> recovered, std::string folder,
|
||||
PromiseStream<InitializeTLogRequest> tlogRequests, UID tlogId, UID workerID,
|
||||
bool restoreFromDisk, Promise<Void> oldLog, Promise<Void> recovered, std::string folder,
|
||||
Reference<AsyncVar<bool>> degraded, Reference<AsyncVar<UID>> activeSharedTLog);
|
||||
}
|
||||
|
||||
|
|
|
@ -618,7 +618,7 @@ Standalone<StringRef> roleString(std::set<std::pair<std::string, std::string>> r
|
|||
return StringRef(result);
|
||||
}
|
||||
|
||||
void startRole(const Role &role, UID roleId, UID workerId, std::map<std::string, std::string> details, std::string origination) {
|
||||
void startRole(const Role &role, UID roleId, UID workerId, const std::map<std::string, std::string> &details, const std::string &origination) {
|
||||
if(role.includeInTraceRoles) {
|
||||
addTraceRole(role.abbreviation);
|
||||
}
|
||||
|
@ -921,7 +921,7 @@ ACTOR Future<Void> workerServer(
|
|||
auto& logData = sharedLogs[std::make_tuple(s.tLogOptions.version, s.storeType, s.tLogOptions.spillType)];
|
||||
// FIXME: Shouldn't if logData.first isValid && !isReady, shouldn't we
|
||||
// be sending a fake InitializeTLogRequest rather than calling tLog() ?
|
||||
Future<Void> tl = tLogFn( kv, queue, dbInfo, locality, !logData.actor.isValid() || logData.actor.isReady() ? logData.requests : PromiseStream<InitializeTLogRequest>(), s.storeID, true, oldLog, recovery, folder, degraded, activeSharedTLog );
|
||||
Future<Void> tl = tLogFn( kv, queue, dbInfo, locality, !logData.actor.isValid() || logData.actor.isReady() ? logData.requests : PromiseStream<InitializeTLogRequest>(), s.storeID, interf.id(), true, oldLog, recovery, folder, degraded, activeSharedTLog );
|
||||
recoveries.push_back(recovery.getFuture());
|
||||
activeSharedTLog->set(s.storeID);
|
||||
|
||||
|
@ -1087,7 +1087,7 @@ ACTOR Future<Void> workerServer(
|
|||
filesClosed.add( data->onClosed() );
|
||||
filesClosed.add( queue->onClosed() );
|
||||
|
||||
Future<Void> tLogCore = tLogFn( data, queue, dbInfo, locality, logData.requests, logId, false, Promise<Void>(), Promise<Void>(), folder, degraded, activeSharedTLog );
|
||||
Future<Void> tLogCore = tLogFn( data, queue, dbInfo, locality, logData.requests, logId, interf.id(), false, Promise<Void>(), Promise<Void>(), folder, degraded, activeSharedTLog );
|
||||
tLogCore = handleIOErrors( tLogCore, data, logId );
|
||||
tLogCore = handleIOErrors( tLogCore, queue, logId );
|
||||
errorForwarders.add( forwardError( errors, Role::SHARED_TRANSACTION_LOG, logId, tLogCore ) );
|
||||
|
|
Loading…
Reference in New Issue