Add tags for backup worker trace events
This commit is contained in:
parent
52bdaeee39
commit
08d9f36071
|
@ -59,10 +59,10 @@ struct BackupData {
|
||||||
CounterCollection cc;
|
CounterCollection cc;
|
||||||
Future<Void> logger;
|
Future<Void> logger;
|
||||||
|
|
||||||
explicit BackupData(UID id, Reference<AsyncVar<ServerDBInfo>> db, const InitializeBackupRequest& req)
|
explicit BackupData(Reference<AsyncVar<ServerDBInfo>> db, const InitializeBackupRequest& req)
|
||||||
: myId(id), tag(req.routerTag), startVersion(req.startVersion), endVersion(req.endVersion),
|
: myId(req.reqId), tag(req.routerTag), startVersion(req.startVersion), endVersion(req.endVersion),
|
||||||
recruitedEpoch(req.recruitedEpoch), backupEpoch(req.backupEpoch), minKnownCommittedVersion(invalidVersion),
|
recruitedEpoch(req.recruitedEpoch), backupEpoch(req.backupEpoch), minKnownCommittedVersion(invalidVersion),
|
||||||
savedVersion(invalidVersion), lastSeenVersion(invalidVersion), cc("BackupWorker", id.toString()) {
|
savedVersion(invalidVersion), lastSeenVersion(invalidVersion), cc("BackupWorker", myId.toString()) {
|
||||||
cx = openDBOnServer(db, TaskPriority::DefaultEndpoint, true, true);
|
cx = openDBOnServer(db, TaskPriority::DefaultEndpoint, true, true);
|
||||||
pullFinished.set(false);
|
pullFinished.set(false);
|
||||||
|
|
||||||
|
@ -219,6 +219,7 @@ ACTOR Future<Void> uploadData(BackupData* self) {
|
||||||
if (popVersion > self->savedVersion) {
|
if (popVersion > self->savedVersion) {
|
||||||
wait(saveProgress(self, popVersion));
|
wait(saveProgress(self, popVersion));
|
||||||
TraceEvent("BackupWorkerSavedProgress", self->myId)
|
TraceEvent("BackupWorkerSavedProgress", self->myId)
|
||||||
|
.detail("Tag", self->tag.toString())
|
||||||
.detail("Version", popVersion)
|
.detail("Version", popVersion)
|
||||||
.detail("MsgQ", self->messages.size());
|
.detail("MsgQ", self->messages.size());
|
||||||
self->savedVersion = std::max(popVersion, self->savedVersion);
|
self->savedVersion = std::max(popVersion, self->savedVersion);
|
||||||
|
@ -267,6 +268,7 @@ ACTOR Future<Void> pullAsyncData(BackupData* self) {
|
||||||
if (self->endVersion.present() && tagAt > self->endVersion.get()) {
|
if (self->endVersion.present() && tagAt > self->endVersion.get()) {
|
||||||
self->eraseMessagesAfterEndVersion();
|
self->eraseMessagesAfterEndVersion();
|
||||||
TraceEvent("BackupWorkerFinishPull", self->myId)
|
TraceEvent("BackupWorkerFinishPull", self->myId)
|
||||||
|
.detail("Tag", self->tag.toString())
|
||||||
.detail("VersionGot", tagAt)
|
.detail("VersionGot", tagAt)
|
||||||
.detail("EndVersion", self->endVersion.get())
|
.detail("EndVersion", self->endVersion.get())
|
||||||
.detail("MsgQ", self->messages.size());
|
.detail("MsgQ", self->messages.size());
|
||||||
|
@ -297,12 +299,12 @@ ACTOR Future<Void> checkRemoved(Reference<AsyncVar<ServerDBInfo>> db, LogEpoch r
|
||||||
|
|
||||||
ACTOR Future<Void> backupWorker(BackupInterface interf, InitializeBackupRequest req,
|
ACTOR Future<Void> backupWorker(BackupInterface interf, InitializeBackupRequest req,
|
||||||
Reference<AsyncVar<ServerDBInfo>> db) {
|
Reference<AsyncVar<ServerDBInfo>> db) {
|
||||||
state BackupData self(interf.id(), db, req);
|
state BackupData self(db, req);
|
||||||
state PromiseStream<Future<Void>> addActor;
|
state PromiseStream<Future<Void>> addActor;
|
||||||
state Future<Void> error = actorCollection(addActor.getFuture());
|
state Future<Void> error = actorCollection(addActor.getFuture());
|
||||||
state Future<Void> dbInfoChange = Void();
|
state Future<Void> dbInfoChange = Void();
|
||||||
|
|
||||||
TraceEvent("BackupWorkerStart", interf.id())
|
TraceEvent("BackupWorkerStart", self.myId)
|
||||||
.detail("Tag", req.routerTag.toString())
|
.detail("Tag", req.routerTag.toString())
|
||||||
.detail("StartVersion", req.startVersion)
|
.detail("StartVersion", req.startVersion)
|
||||||
.detail("EndVersion", req.endVersion.present() ? req.endVersion.get() : -1)
|
.detail("EndVersion", req.endVersion.present() ? req.endVersion.get() : -1)
|
||||||
|
@ -324,12 +326,12 @@ ACTOR Future<Void> backupWorker(BackupInterface interf, InitializeBackupRequest
|
||||||
self.logSystem.set(ls);
|
self.logSystem.set(ls);
|
||||||
self.pop();
|
self.pop();
|
||||||
}
|
}
|
||||||
TraceEvent("BackupWorkerLogSystem", interf.id())
|
TraceEvent("BackupWorkerLogSystem", self.myId)
|
||||||
.detail("HasBackupLocality", hasPseudoLocality)
|
.detail("HasBackupLocality", hasPseudoLocality)
|
||||||
.detail("Tag", self.tag.toString());
|
.detail("Tag", self.tag.toString());
|
||||||
}
|
}
|
||||||
when(wait(done)) {
|
when(wait(done)) {
|
||||||
TraceEvent("BackupWorkerDone", interf.id()).detail("BackupEpoch", self.backupEpoch);
|
TraceEvent("BackupWorkerDone", self.myId).detail("BackupEpoch", self.backupEpoch);
|
||||||
// Notify master so that this worker can be removed from log system, then this
|
// Notify master so that this worker can be removed from log system, then this
|
||||||
// worker (for an old epoch's unfinished work) can safely exit.
|
// worker (for an old epoch's unfinished work) can safely exit.
|
||||||
wait(brokenPromiseToNever(db->get().master.notifyBackupWorkerDone.getReply(
|
wait(brokenPromiseToNever(db->get().master.notifyBackupWorkerDone.getReply(
|
||||||
|
@ -339,7 +341,7 @@ ACTOR Future<Void> backupWorker(BackupInterface interf, InitializeBackupRequest
|
||||||
when(wait(error)) {}
|
when(wait(error)) {}
|
||||||
}
|
}
|
||||||
} catch (Error& e) {
|
} catch (Error& e) {
|
||||||
TraceEvent("BackupWorkerTerminated", interf.id()).error(e, true);
|
TraceEvent("BackupWorkerTerminated", self.myId).error(e, true);
|
||||||
if (e.code() != error_code_actor_cancelled && e.code() != error_code_worker_removed) {
|
if (e.code() != error_code_actor_cancelled && e.code() != error_code_worker_removed) {
|
||||||
throw;
|
throw;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue