Update peekLogRouter for backup workers to peek

This commit is contained in:
Jingyu Zhou 2019-05-22 10:52:46 -07:00
parent c3e5a9550f
commit a797958af6
3 changed files with 27 additions and 10 deletions

View File

@ -39,6 +39,12 @@ struct BackupData {
myId(id), startVersion(0), version(invalidVersion) {}
};
ACTOR Future<Void> uploadData(BackupData* self) {
// TODO: upload self->messages to cloud
wait(Never());
return Void();
}
// Pulls data from TLog servers using LogRouter tag.
ACTOR Future<Void> pullAsyncData(BackupData* self) {
state Future<Void> logSystemChange = Void();
@ -58,6 +64,7 @@ ACTOR Future<Void> pullAsyncData(BackupData* self) {
// peekLogRouter() assumes "myId" is one of the log router and returns the ServerPeekCursor
// from primary location's server for the tag.
// Otherwise, returns the SetPeekCursor from old log sets that has the log router.
// TODO: set tag for this worker
Tag tag(-2, 0);
r = self->logSystem.get()->peekLogRouter(self->myId, tagAt, tag);
} else {
@ -68,7 +75,6 @@ ACTOR Future<Void> pullAsyncData(BackupData* self) {
}
self->minKnownCommittedVersion = std::max(self->minKnownCommittedVersion, r->getMinKnownCommittedVersion());
state Version ver = 0;
while (r->hasMessage()) {
lastVersion = r->version().version;
self->messages.emplace_back(r->getMessage(), std::vector<Tag>());
@ -76,6 +82,7 @@ ACTOR Future<Void> pullAsyncData(BackupData* self) {
}
tagAt = std::max(r->version().version, lastVersion);
TraceEvent("BackupWorkerGot", self->myId).detail("V", tagAt);
}
}
@ -120,6 +127,7 @@ ACTOR Future<Void> backupWorker(BackupInterface interf, InitializeBackupRequest
TraceEvent("BackupWorkerStart", interf.id());
try {
addActor.send(pullAsyncData(&self));
addActor.send(uploadData(&self));
addActor.send(waitFailureServer(interf.waitFailure.getFuture()));
loop choose {

View File

@ -71,7 +71,7 @@ public:
return result;
}
bool hasLogRouter(UID id) {
bool hasLogRouter(UID id) const {
for (const auto& router : logRouters) {
if (router->get().id() == id) {
return true;
@ -80,6 +80,15 @@ public:
return false;
}
bool hasBackupWorker(UID id) const {
for (const auto& worker : backupWorkers) {
if (worker->get().id() == id) {
return true;
}
}
return false;
}
std::string logServerString() {
std::string result;
for(int i = 0; i < logServers.size(); i++) {

View File

@ -859,13 +859,13 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
virtual Reference<IPeekCursor> peekLogRouter( UID dbgid, Version begin, Tag tag ) {
bool found = false;
for( auto& log : tLogs ) {
found = log->hasLogRouter(dbgid);
if(found) {
for (const auto& log : tLogs) {
found = log->hasLogRouter(dbgid) || log->hasBackupWorker(dbgid);
if (found) {
break;
}
}
if( found ) {
if (found) {
if(stopped) {
std::vector<Reference<LogSet>> localSets;
int bestPrimarySet = 0;
@ -917,11 +917,11 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
}
}
bool firstOld = true;
for(auto& old : oldLogData) {
for (const auto& old : oldLogData) {
found = false;
for( auto& log : old.tLogs ) {
found = log->hasLogRouter(dbgid);
if(found) {
for (const auto& log : old.tLogs) {
found = log->hasLogRouter(dbgid) || log->hasBackupWorker(dbgid);
if (found) {
break;
}
}