2019-04-25 06:12:37 +08:00
|
|
|
/*
|
2019-10-03 05:47:09 +08:00
|
|
|
* BackupWorker.actor.cpp
|
2019-04-25 06:12:37 +08:00
|
|
|
*
|
|
|
|
* This source file is part of the FoundationDB open source project
|
|
|
|
*
|
2022-03-22 04:36:23 +08:00
|
|
|
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
|
2019-04-25 06:12:37 +08:00
|
|
|
*
|
|
|
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
|
|
* you may not use this file except in compliance with the License.
|
|
|
|
* You may obtain a copy of the License at
|
|
|
|
*
|
|
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
*
|
|
|
|
* Unless required by applicable law or agreed to in writing, software
|
|
|
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
|
* See the License for the specific language governing permissions and
|
|
|
|
* limitations under the License.
|
|
|
|
*/
|
|
|
|
|
2020-01-22 08:57:30 +08:00
|
|
|
#include "fdbclient/BackupAgent.actor.h"
|
2019-09-10 01:21:16 +08:00
|
|
|
#include "fdbclient/BackupContainer.h"
|
2022-09-10 09:43:09 +08:00
|
|
|
#include "fdbclient/BlobCipher.h"
|
2020-01-08 02:27:52 +08:00
|
|
|
#include "fdbclient/DatabaseContext.h"
|
2020-09-11 08:44:15 +08:00
|
|
|
#include "fdbclient/CommitProxyInterface.h"
|
2019-05-24 07:06:23 +08:00
|
|
|
#include "fdbclient/SystemData.h"
|
2019-04-25 06:12:37 +08:00
|
|
|
#include "fdbserver/BackupInterface.h"
|
2020-01-22 08:57:30 +08:00
|
|
|
#include "fdbserver/BackupProgress.actor.h"
|
2022-08-24 14:04:12 +08:00
|
|
|
#include "fdbclient/GetEncryptCipherKeys.actor.h"
|
2021-05-31 02:51:47 +08:00
|
|
|
#include "fdbserver/Knobs.h"
|
2019-09-10 01:21:16 +08:00
|
|
|
#include "fdbserver/LogProtocolMessage.h"
|
2019-04-25 06:12:37 +08:00
|
|
|
#include "fdbserver/LogSystem.h"
|
|
|
|
#include "fdbserver/ServerDBInfo.h"
|
|
|
|
#include "fdbserver/WaitFailure.h"
|
2019-05-24 07:06:23 +08:00
|
|
|
#include "fdbserver/WorkerInterface.actor.h"
|
2019-04-25 06:12:37 +08:00
|
|
|
#include "flow/Error.h"
|
2019-12-04 08:05:12 +08:00
|
|
|
|
2020-07-08 00:06:13 +08:00
|
|
|
#include "flow/IRandom.h"
|
2022-06-24 08:05:36 +08:00
|
|
|
#include "fdbclient/Tracing.h"
|
2019-04-25 06:12:37 +08:00
|
|
|
#include "flow/actorcompiler.h" // This must be the last #include.
|
|
|
|
|
2020-05-21 04:26:57 +08:00
|
|
|
#define SevDebugMemory SevVerbose
|
|
|
|
|
2019-09-10 01:21:16 +08:00
|
|
|
struct VersionedMessage {
|
|
|
|
LogMessageVersion version;
|
|
|
|
StringRef message;
|
2019-11-13 08:44:59 +08:00
|
|
|
VectorRef<Tag> tags;
|
2019-09-17 06:56:23 +08:00
|
|
|
Arena arena; // Keep a reference to the memory containing the message
|
2022-06-30 05:21:05 +08:00
|
|
|
Arena decryptArena; // Arena used for decrypt buffer.
|
2020-05-21 04:26:57 +08:00
|
|
|
size_t bytes; // arena's size when inserted, which can grow afterwards
|
2019-09-10 01:21:16 +08:00
|
|
|
|
2019-11-13 08:44:59 +08:00
|
|
|
VersionedMessage(LogMessageVersion v, StringRef m, const VectorRef<Tag>& t, const Arena& a)
|
2020-05-21 04:26:57 +08:00
|
|
|
: version(v), message(m), tags(t), arena(a), bytes(a.getSize()) {}
|
2020-07-29 02:30:26 +08:00
|
|
|
Version getVersion() const { return version.version; }
|
|
|
|
uint32_t getSubVersion() const { return version.sub; }
|
2020-02-13 02:02:27 +08:00
|
|
|
|
|
|
|
// Returns true if the message is a mutation that should be backuped, i.e.,
|
|
|
|
// either key is not in system key space or is not a metadataVersionKey.
|
2022-06-30 05:21:05 +08:00
|
|
|
bool isBackupMessage(MutationRef* m,
|
|
|
|
const std::unordered_map<BlobCipherDetails, Reference<BlobCipherKey>>& cipherKeys) {
|
2020-02-13 02:02:27 +08:00
|
|
|
for (Tag tag : tags) {
|
|
|
|
if (tag.locality == tagLocalitySpecial || tag.locality == tagLocalityTxs) {
|
|
|
|
return false; // skip Txs mutations
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2020-09-29 01:58:49 +08:00
|
|
|
ArenaReader reader(arena, message, AssumeVersion(g_network->protocolVersion()));
|
2020-02-13 02:02:27 +08:00
|
|
|
|
2020-10-07 09:33:29 +08:00
|
|
|
// Return false for LogProtocolMessage and SpanContextMessage metadata messages.
|
2020-02-13 02:02:27 +08:00
|
|
|
if (LogProtocolMessage::isNextIn(reader))
|
|
|
|
return false;
|
2020-09-05 07:57:36 +08:00
|
|
|
if (reader.protocolVersion().hasSpanContext() && SpanContextMessage::isNextIn(reader))
|
|
|
|
return false;
|
2022-05-03 03:56:51 +08:00
|
|
|
if (reader.protocolVersion().hasOTELSpanContext() && OTELSpanContextMessage::isNextIn(reader)) {
|
2022-07-20 04:15:51 +08:00
|
|
|
CODE_PROBE(true, "Returning false for OTELSpanContextMessage");
|
2022-05-03 03:56:51 +08:00
|
|
|
return false;
|
|
|
|
}
|
2022-07-12 12:47:42 +08:00
|
|
|
reader >> *m;
|
|
|
|
if (m->isEncrypted()) {
|
2022-06-30 05:21:05 +08:00
|
|
|
// In case the mutation is encrypted, get the decrypted mutation and also update message to point to
|
|
|
|
// the decrypted mutation.
|
|
|
|
// We use dedicated arena for decrypt buffer, as the other arena is used to count towards backup lock bytes.
|
2022-09-10 09:43:09 +08:00
|
|
|
*m = m->decrypt(cipherKeys, decryptArena, BlobCipherMetrics::BACKUP, &message);
|
2022-06-30 05:21:05 +08:00
|
|
|
}
|
2020-02-13 02:02:27 +08:00
|
|
|
return normalKeys.contains(m->param1) || m->param1 == metadataVersionKey;
|
|
|
|
}
|
2022-06-30 05:21:05 +08:00
|
|
|
|
|
|
|
void collectCipherDetailIfEncrypted(std::unordered_set<BlobCipherDetails>& cipherDetails) {
|
2022-07-12 12:47:42 +08:00
|
|
|
ASSERT(!message.empty());
|
|
|
|
if (*message.begin() == MutationRef::Encrypted) {
|
2022-08-09 03:27:43 +08:00
|
|
|
ArenaReader reader(arena, message, AssumeVersion(ProtocolVersion::withEncryptionAtRest()));
|
2022-07-12 12:47:42 +08:00
|
|
|
MutationRef m;
|
|
|
|
reader >> m;
|
|
|
|
const BlobCipherEncryptHeader* header = m.encryptionHeader();
|
|
|
|
cipherDetails.insert(header->cipherTextDetails);
|
|
|
|
cipherDetails.insert(header->cipherHeaderDetails);
|
2022-06-30 05:21:05 +08:00
|
|
|
}
|
|
|
|
}
|
2019-09-10 01:21:16 +08:00
|
|
|
};
|
|
|
|
|
2019-04-25 06:12:37 +08:00
|
|
|
struct BackupData {
|
2019-05-24 07:06:23 +08:00
|
|
|
const UID myId;
|
2019-07-24 02:45:04 +08:00
|
|
|
const Tag tag; // LogRouter tag for this worker, i.e., (-2, i)
|
2020-02-21 08:28:27 +08:00
|
|
|
const int totalTags; // Total log router tags
|
2020-04-21 02:05:50 +08:00
|
|
|
const Version startVersion; // This worker's start version
|
2019-09-22 12:55:19 +08:00
|
|
|
const Optional<Version> endVersion; // old epoch's end version (inclusive), or empty for current epoch
|
2020-03-24 03:53:40 +08:00
|
|
|
const LogEpoch recruitedEpoch; // current epoch whose tLogs are receiving mutations
|
|
|
|
const LogEpoch backupEpoch; // the epoch workers should pull mutations
|
2020-03-21 09:39:51 +08:00
|
|
|
LogEpoch oldestBackupEpoch = 0; // oldest epoch that still has data on tLogs for backup to pull
|
2019-04-25 06:12:37 +08:00
|
|
|
Version minKnownCommittedVersion;
|
2020-04-18 13:55:09 +08:00
|
|
|
Version savedVersion; // Largest version saved to blob storage
|
True-up a backup's begin version
For the first mutation log of a backup, we need to true-up its begin version to
the exact version of the first mutation. This is needed to ensure the strict
less than relationship between two mutation logs, if one's version range is
within the other.
A problematic scenario is as follows:
Epoch 1: a mutation log A [200, 900] is saved, but its progress is NOT saved.
Epoch 2: master recruits a worker for [1, 1000], 1000 is epoch 1's end version.
New worker saves a mutation log B [100, 1000]
A's range is strict within B's range, but A's size is larger than B.
This happens because B's start version is true-up to the backup's begin version,
which is not the actual version of the first mutation. After B's begin version
is true-up to 300, we won't have this issue.
2020-04-20 01:03:47 +08:00
|
|
|
Version popVersion; // Largest version popped in NOOP mode, can be larger than savedVersion.
|
2022-06-30 05:21:05 +08:00
|
|
|
Reference<AsyncVar<ServerDBInfo> const> db;
|
2019-04-25 06:12:37 +08:00
|
|
|
AsyncVar<Reference<ILogSystem>> logSystem;
|
2019-05-24 07:06:23 +08:00
|
|
|
Database cx;
|
2019-09-10 01:21:16 +08:00
|
|
|
std::vector<VersionedMessage> messages;
|
Fix contract changes: backup worker generate continuous versions
Before we allow holes in version ranges in partitioned mutation logs. This
has been changed so that restore can easily figure out if database is
restorable. A specific problem is that if the backup worker didn't find any
mutations for an old epoch, the worker can just exit without generating a
log file, thus leaving holes in version ranges.
Another contract change is that if a backup key is set, then we must store
all mutations for that key, especially for the worker for the old epoch. As a
result, the worker must first check backup key, before pulling mutations and
uploading logs. Otherwise, we may lose mutations.
Finally, when a backup key is removed, the saving of mutations should be up to
the current version so that backup worker doesn't exit too early. I.e., avoid
the case saved mutation versions are less than the snapshot version taken.
2020-02-29 06:11:14 +08:00
|
|
|
NotifiedVersion pulledVersion;
|
2020-03-03 05:29:42 +08:00
|
|
|
bool pulling = false;
|
2020-03-05 04:32:06 +08:00
|
|
|
bool stopped = false;
|
2020-03-18 12:35:44 +08:00
|
|
|
bool exitEarly = false; // If the worker is on an old epoch and all backups starts a version >= the endVersion
|
2020-04-03 06:28:51 +08:00
|
|
|
AsyncVar<bool> paused; // Track if "backupPausedKey" is set.
|
2020-05-15 03:05:16 +08:00
|
|
|
Reference<FlowLock> lock;
|
2019-04-25 06:12:37 +08:00
|
|
|
|
2020-01-31 00:35:02 +08:00
|
|
|
struct PerBackupInfo {
|
|
|
|
PerBackupInfo() = default;
|
2020-03-23 07:31:39 +08:00
|
|
|
PerBackupInfo(BackupData* data, UID uid, Version v) : self(data), startVersion(v) {
|
|
|
|
// Open the container and get key ranges
|
|
|
|
BackupConfig config(uid);
|
2022-07-02 01:02:39 +08:00
|
|
|
container = config.backupContainer().get(data->cx.getReference());
|
|
|
|
ranges = config.backupRanges().get(data->cx.getReference());
|
2020-03-23 09:19:26 +08:00
|
|
|
if (self->backupEpoch == self->recruitedEpoch) {
|
|
|
|
// Only current epoch's worker update the number of backup workers.
|
|
|
|
updateWorker = _updateStartedWorkers(this, data, uid);
|
|
|
|
}
|
2020-03-23 07:31:39 +08:00
|
|
|
TraceEvent("BackupWorkerAddJob", data->myId).detail("BackupID", uid).detail("Version", v);
|
|
|
|
}
|
2020-01-31 00:35:02 +08:00
|
|
|
|
2020-03-23 12:08:11 +08:00
|
|
|
void stop() {
|
|
|
|
stopped = true;
|
|
|
|
updateWorker = Void(); // cancel actors
|
|
|
|
}
|
|
|
|
|
2020-03-24 13:04:10 +08:00
|
|
|
void cancelUpdater() { updateWorker = Void(); }
|
|
|
|
|
Fix contract changes: backup worker generate continuous versions
Before we allow holes in version ranges in partitioned mutation logs. This
has been changed so that restore can easily figure out if database is
restorable. A specific problem is that if the backup worker didn't find any
mutations for an old epoch, the worker can just exit without generating a
log file, thus leaving holes in version ranges.
Another contract change is that if a backup key is set, then we must store
all mutations for that key, especially for the worker for the old epoch. As a
result, the worker must first check backup key, before pulling mutations and
uploading logs. Otherwise, we may lose mutations.
Finally, when a backup key is removed, the saving of mutations should be up to
the current version so that backup worker doesn't exit too early. I.e., avoid
the case saved mutation versions are less than the snapshot version taken.
2020-02-29 06:11:14 +08:00
|
|
|
bool isReady() const { return stopped || (container.isReady() && ranges.isReady()); }
|
|
|
|
|
|
|
|
Future<Void> waitReady() {
|
|
|
|
if (stopped)
|
|
|
|
return Void();
|
|
|
|
return _waitReady(this);
|
|
|
|
}
|
|
|
|
|
|
|
|
ACTOR static Future<Void> _waitReady(PerBackupInfo* info) {
|
|
|
|
wait(success(info->container) && success(info->ranges));
|
|
|
|
return Void();
|
|
|
|
}
|
|
|
|
|
2020-03-23 07:31:39 +08:00
|
|
|
// Update the number of backup workers in the BackupConfig. Each worker
|
|
|
|
// writes (epoch, tag.id) into the key. Worker 0 monitors the key and once
|
|
|
|
// all workers have updated the key, this backup is considered as started
|
|
|
|
// (i.e., the "submitBackup" call is successful). Worker 0 then sets
|
2020-03-23 09:19:26 +08:00
|
|
|
// the "allWorkerStarted" flag, which in turn unblocks
|
|
|
|
// StartFullBackupTaskFunc::_execute.
|
2020-03-23 07:31:39 +08:00
|
|
|
ACTOR static Future<Void> _updateStartedWorkers(PerBackupInfo* info, BackupData* self, UID uid) {
|
|
|
|
state BackupConfig config(uid);
|
|
|
|
state Future<Void> watchFuture;
|
2020-03-23 09:19:26 +08:00
|
|
|
state bool updated = false;
|
2020-03-23 07:31:39 +08:00
|
|
|
state bool firstWorker = info->self->tag.id == 0;
|
|
|
|
state bool allUpdated = false;
|
|
|
|
state Optional<std::vector<std::pair<int64_t, int64_t>>> workers;
|
2020-03-25 09:22:20 +08:00
|
|
|
state Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(self->cx));
|
2020-03-23 07:31:39 +08:00
|
|
|
|
|
|
|
loop {
|
|
|
|
try {
|
|
|
|
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
|
|
|
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
|
2020-03-25 09:22:20 +08:00
|
|
|
tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
|
2020-03-23 07:31:39 +08:00
|
|
|
|
|
|
|
Optional<std::vector<std::pair<int64_t, int64_t>>> tmp =
|
|
|
|
wait(config.startedBackupWorkers().get(tr));
|
|
|
|
workers = tmp;
|
|
|
|
if (!updated) {
|
|
|
|
if (workers.present()) {
|
|
|
|
workers.get().emplace_back(self->recruitedEpoch, (int64_t)self->tag.id);
|
|
|
|
} else {
|
|
|
|
std::vector<std::pair<int64_t, int64_t>> v(1, { self->recruitedEpoch, self->tag.id });
|
|
|
|
workers = Optional<std::vector<std::pair<int64_t, int64_t>>>(v);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
if (firstWorker) {
|
2020-03-24 12:11:25 +08:00
|
|
|
if (!workers.present()) {
|
|
|
|
TraceEvent("BackupWorkerDetectAbortedJob", self->myId).detail("BackupID", uid);
|
|
|
|
return Void();
|
|
|
|
}
|
2020-03-23 12:08:11 +08:00
|
|
|
ASSERT(workers.present() && workers.get().size() > 0);
|
2020-03-23 07:31:39 +08:00
|
|
|
std::vector<std::pair<int64_t, int64_t>>& v = workers.get();
|
|
|
|
v.erase(std::remove_if(v.begin(),
|
|
|
|
v.end(),
|
2020-03-23 09:19:26 +08:00
|
|
|
[epoch = self->recruitedEpoch](const std::pair<int64_t, int64_t>& p) {
|
2020-03-23 07:31:39 +08:00
|
|
|
return p.first != epoch;
|
|
|
|
}),
|
|
|
|
v.end());
|
2020-03-24 01:44:26 +08:00
|
|
|
std::set<int64_t> tags;
|
|
|
|
for (auto p : v) {
|
|
|
|
tags.insert(p.second);
|
|
|
|
}
|
|
|
|
if (self->totalTags == tags.size()) {
|
2020-03-23 07:31:39 +08:00
|
|
|
config.allWorkerStarted().set(tr, true);
|
|
|
|
allUpdated = true;
|
|
|
|
} else {
|
|
|
|
// monitor all workers' updates
|
|
|
|
watchFuture = tr->watch(config.startedBackupWorkers().key);
|
|
|
|
}
|
2020-03-23 09:19:26 +08:00
|
|
|
ASSERT(workers.present() && workers.get().size() > 0);
|
|
|
|
if (!updated) {
|
|
|
|
config.startedBackupWorkers().set(tr, workers.get());
|
|
|
|
}
|
2020-03-23 12:08:11 +08:00
|
|
|
for (auto p : workers.get()) {
|
2020-04-12 01:23:53 +08:00
|
|
|
TraceEvent("BackupWorkerDebugTag", self->myId)
|
|
|
|
.detail("Epoch", p.first)
|
|
|
|
.detail("TagID", p.second);
|
2020-03-23 12:08:11 +08:00
|
|
|
}
|
2020-03-23 07:31:39 +08:00
|
|
|
wait(tr->commit());
|
|
|
|
|
|
|
|
updated = true; // Only set to true after commit.
|
|
|
|
if (allUpdated) {
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
wait(watchFuture);
|
2020-03-25 11:14:37 +08:00
|
|
|
tr->reset();
|
2020-03-23 07:31:39 +08:00
|
|
|
} else {
|
2020-03-23 09:19:26 +08:00
|
|
|
ASSERT(workers.present() && workers.get().size() > 0);
|
2020-03-23 07:31:39 +08:00
|
|
|
config.startedBackupWorkers().set(tr, workers.get());
|
|
|
|
wait(tr->commit());
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
} catch (Error& e) {
|
|
|
|
wait(tr->onError(e));
|
|
|
|
allUpdated = false;
|
|
|
|
}
|
|
|
|
}
|
2020-03-23 09:19:26 +08:00
|
|
|
TraceEvent("BackupWorkerSetReady", self->myId).detail("BackupID", uid).detail("TagId", self->tag.id);
|
2020-03-23 07:31:39 +08:00
|
|
|
return Void();
|
|
|
|
}
|
|
|
|
|
2020-01-31 13:21:05 +08:00
|
|
|
BackupData* self = nullptr;
|
2020-04-21 08:07:50 +08:00
|
|
|
|
|
|
|
// Backup request's commit version. Mutations are logged at some version after this.
|
2020-01-31 00:35:02 +08:00
|
|
|
Version startVersion = invalidVersion;
|
2020-04-21 08:07:50 +08:00
|
|
|
// The last mutation log's saved version (not inclusive), i.e., next log's begin version.
|
2020-02-05 06:30:32 +08:00
|
|
|
Version lastSavedVersion = invalidVersion;
|
2020-04-21 08:07:50 +08:00
|
|
|
|
2020-02-15 11:32:11 +08:00
|
|
|
Future<Optional<Reference<IBackupContainer>>> container;
|
2020-01-31 00:35:02 +08:00
|
|
|
Future<Optional<std::vector<KeyRange>>> ranges; // Key ranges of this backup
|
2020-03-23 07:31:39 +08:00
|
|
|
Future<Void> updateWorker;
|
2020-01-31 00:35:02 +08:00
|
|
|
bool stopped = false; // Is the backup stopped?
|
|
|
|
};
|
|
|
|
|
|
|
|
std::map<UID, PerBackupInfo> backups; // Backup UID to infos
|
|
|
|
AsyncTrigger changedTrigger;
|
2020-03-12 11:47:54 +08:00
|
|
|
AsyncTrigger doneTrigger;
|
2020-01-22 08:57:30 +08:00
|
|
|
|
2019-07-24 02:45:04 +08:00
|
|
|
CounterCollection cc;
|
|
|
|
Future<Void> logger;
|
|
|
|
|
2021-07-12 12:54:36 +08:00
|
|
|
explicit BackupData(UID id, Reference<AsyncVar<ServerDBInfo> const> db, const InitializeBackupRequest& req)
|
2020-02-21 08:28:27 +08:00
|
|
|
: myId(id), tag(req.routerTag), totalTags(req.totalTags), startVersion(req.startVersion),
|
|
|
|
endVersion(req.endVersion), recruitedEpoch(req.recruitedEpoch), backupEpoch(req.backupEpoch),
|
2020-04-18 13:55:09 +08:00
|
|
|
minKnownCommittedVersion(invalidVersion), savedVersion(req.startVersion - 1), popVersion(req.startVersion - 1),
|
2022-06-30 05:21:05 +08:00
|
|
|
db(db), pulledVersion(0), paused(false), lock(new FlowLock(SERVER_KNOBS->BACKUP_LOCK_BYTES)),
|
2021-07-25 02:20:51 +08:00
|
|
|
cc("BackupWorker", myId.toString()) {
|
2021-07-17 15:11:40 +08:00
|
|
|
cx = openDBOnServer(db, TaskPriority::DefaultEndpoint, LockAware::True);
|
2019-07-24 02:45:04 +08:00
|
|
|
|
2019-08-12 11:15:50 +08:00
|
|
|
specialCounter(cc, "SavedVersion", [this]() { return this->savedVersion; });
|
2019-07-24 02:45:04 +08:00
|
|
|
specialCounter(cc, "MinKnownCommittedVersion", [this]() { return this->minKnownCommittedVersion; });
|
2019-09-23 04:23:27 +08:00
|
|
|
specialCounter(cc, "MsgQ", [this]() { return this->messages.size(); });
|
2020-05-15 03:05:16 +08:00
|
|
|
specialCounter(cc, "BufferedBytes", [this]() { return this->lock->activePermits(); });
|
2020-05-22 03:19:57 +08:00
|
|
|
specialCounter(cc, "AvailableBytes", [this]() { return this->lock->available(); });
|
2019-07-24 02:45:04 +08:00
|
|
|
logger = traceCounters(
|
|
|
|
"BackupWorkerMetrics", myId, SERVER_KNOBS->WORKER_LOGGING_INTERVAL, &cc, "BackupWorkerMetrics");
|
2019-05-24 07:06:23 +08:00
|
|
|
}
|
2019-08-15 05:19:50 +08:00
|
|
|
|
2020-03-03 05:29:42 +08:00
|
|
|
bool pullFinished() const { return endVersion.present() && pulledVersion.get() > endVersion.get(); }
|
|
|
|
|
|
|
|
bool allMessageSaved() const {
|
2020-03-18 12:35:44 +08:00
|
|
|
return (endVersion.present() && savedVersion >= endVersion.get()) || stopped || exitEarly;
|
2020-03-03 05:29:42 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
Version maxPopVersion() const { return endVersion.present() ? endVersion.get() : minKnownCommittedVersion; }
|
|
|
|
|
2020-01-31 13:21:05 +08:00
|
|
|
// Inserts a backup's single range into rangeMap.
|
2020-02-06 03:45:16 +08:00
|
|
|
template <class T>
|
|
|
|
void insertRange(KeyRangeMap<std::set<T>>& keyRangeMap, KeyRangeRef range, T value) {
|
|
|
|
for (auto& logRange : keyRangeMap.modify(range)) {
|
|
|
|
logRange->value().insert(value);
|
2020-01-31 13:21:05 +08:00
|
|
|
}
|
2020-02-06 03:45:16 +08:00
|
|
|
for (auto& logRange : keyRangeMap.modify(singleKeyRange(metadataVersionKey))) {
|
|
|
|
logRange->value().insert(value);
|
2020-01-31 13:21:05 +08:00
|
|
|
}
|
|
|
|
TraceEvent("BackupWorkerInsertRange", myId)
|
2020-02-06 03:45:16 +08:00
|
|
|
.detail("Value", value)
|
2020-01-31 13:21:05 +08:00
|
|
|
.detail("Begin", range.begin)
|
|
|
|
.detail("End", range.end);
|
|
|
|
}
|
|
|
|
|
|
|
|
// Inserts a backup's ranges into rangeMap.
|
2020-02-06 03:45:16 +08:00
|
|
|
template <class T>
|
|
|
|
void insertRanges(KeyRangeMap<std::set<T>>& keyRangeMap, const Optional<std::vector<KeyRange>>& ranges, T value) {
|
2020-01-31 13:21:05 +08:00
|
|
|
if (!ranges.present() || ranges.get().empty()) {
|
|
|
|
// insert full ranges of normal keys
|
2020-02-06 03:45:16 +08:00
|
|
|
return insertRange(keyRangeMap, normalKeys, value);
|
2020-01-31 13:21:05 +08:00
|
|
|
}
|
|
|
|
for (const auto& range : ranges.get()) {
|
2020-02-06 03:45:16 +08:00
|
|
|
insertRange(keyRangeMap, range, value);
|
2020-01-31 13:21:05 +08:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2019-08-15 05:19:50 +08:00
|
|
|
void pop() {
|
2020-03-20 12:35:08 +08:00
|
|
|
if (backupEpoch > oldestBackupEpoch || stopped) {
|
2019-08-15 08:00:20 +08:00
|
|
|
// Defer pop if old epoch hasn't finished popping yet.
|
2020-03-20 12:35:08 +08:00
|
|
|
// If stopped because of displacement, do NOT pop as the progress may
|
|
|
|
// not be saved in a timely fashion. As a result, next epoch may still
|
|
|
|
// need to read mutations in the version range. Let the next epoch's
|
|
|
|
// worker do the pop instead.
|
2019-08-15 08:00:20 +08:00
|
|
|
TraceEvent("BackupWorkerPopDeferred", myId)
|
|
|
|
.suppressFor(1.0)
|
|
|
|
.detail("BackupEpoch", backupEpoch)
|
2020-03-18 05:45:07 +08:00
|
|
|
.detail("OldestEpoch", oldestBackupEpoch)
|
2019-08-15 08:00:20 +08:00
|
|
|
.detail("Version", savedVersion);
|
|
|
|
return;
|
|
|
|
}
|
2020-03-20 12:35:08 +08:00
|
|
|
ASSERT_WE_THINK(backupEpoch == oldestBackupEpoch);
|
2019-08-15 05:19:50 +08:00
|
|
|
const Tag popTag = logSystem.get()->getPseudoPopTag(tag, ProcessClass::BackupClass);
|
2020-04-21 02:05:50 +08:00
|
|
|
logSystem.get()->pop(std::max(popVersion, savedVersion), popTag);
|
2019-08-15 05:19:50 +08:00
|
|
|
}
|
2019-09-23 04:23:27 +08:00
|
|
|
|
2020-03-24 09:48:06 +08:00
|
|
|
void stop() {
|
|
|
|
stopped = true;
|
|
|
|
for (auto& [uid, info] : backups) {
|
2020-03-24 13:04:10 +08:00
|
|
|
// Cancel the actor. Because container is valid, CANNOT set the
|
|
|
|
// "stop" flag that will block writing mutation files in
|
|
|
|
// saveMutationsToFile().
|
|
|
|
info.cancelUpdater();
|
2020-03-24 09:48:06 +08:00
|
|
|
}
|
|
|
|
doneTrigger.trigger();
|
|
|
|
}
|
|
|
|
|
2020-05-15 03:05:16 +08:00
|
|
|
// Erases messages and updates lock with memory released.
|
|
|
|
void eraseMessages(int num) {
|
|
|
|
ASSERT(num <= messages.size());
|
|
|
|
if (num == 0)
|
|
|
|
return;
|
|
|
|
|
|
|
|
if (messages.size() == num) {
|
|
|
|
messages.clear();
|
2020-05-21 04:26:57 +08:00
|
|
|
TraceEvent(SevDebugMemory, "BackupWorkerMemory", myId).detail("ReleaseAll", lock->activePermits());
|
2020-05-15 10:46:30 +08:00
|
|
|
lock->release(lock->activePermits());
|
2020-05-15 03:05:16 +08:00
|
|
|
return;
|
|
|
|
}
|
|
|
|
|
|
|
|
// keep track of each arena and accumulate their sizes
|
|
|
|
int64_t bytes = 0;
|
|
|
|
for (int i = 0; i < num; i++) {
|
|
|
|
const Arena& a = messages[i].arena;
|
|
|
|
const Arena& b = messages[i + 1].arena;
|
2020-07-14 12:10:34 +08:00
|
|
|
if (!a.sameArena(b)) {
|
2020-05-21 04:26:57 +08:00
|
|
|
bytes += messages[i].bytes;
|
|
|
|
TraceEvent(SevDebugMemory, "BackupWorkerMemory", myId).detail("Release", messages[i].bytes);
|
2020-05-15 03:05:16 +08:00
|
|
|
}
|
|
|
|
}
|
|
|
|
lock->release(bytes);
|
|
|
|
messages.erase(messages.begin(), messages.begin() + num);
|
|
|
|
}
|
|
|
|
|
2019-09-23 04:23:27 +08:00
|
|
|
void eraseMessagesAfterEndVersion() {
|
|
|
|
ASSERT(endVersion.present());
|
|
|
|
const Version ver = endVersion.get();
|
|
|
|
while (!messages.empty()) {
|
|
|
|
if (messages.back().getVersion() > ver) {
|
|
|
|
messages.pop_back();
|
|
|
|
} else {
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
2020-01-31 00:35:02 +08:00
|
|
|
|
|
|
|
// Give a list of current active backups, compare with current list and decide
|
|
|
|
// to start new backups and stop ones not in the active state.
|
|
|
|
void onBackupChanges(const std::vector<std::pair<UID, Version>>& uidVersions) {
|
|
|
|
std::set<UID> stopList;
|
|
|
|
for (auto it : backups) {
|
|
|
|
stopList.insert(it.first);
|
|
|
|
}
|
|
|
|
|
|
|
|
bool modified = false;
|
2020-04-19 00:38:14 +08:00
|
|
|
bool minVersionChanged = false;
|
2020-04-18 13:55:09 +08:00
|
|
|
Version minVersion = std::numeric_limits<Version>::max();
|
2020-07-29 02:30:26 +08:00
|
|
|
for (const auto& [uid, version] : uidVersions) {
|
2020-01-31 00:35:02 +08:00
|
|
|
auto it = backups.find(uid);
|
|
|
|
if (it == backups.end()) {
|
|
|
|
modified = true;
|
2020-03-23 07:31:39 +08:00
|
|
|
backups.emplace(uid, BackupData::PerBackupInfo(this, uid, version));
|
2020-04-18 13:55:09 +08:00
|
|
|
minVersion = std::min(minVersion, version);
|
2020-04-19 00:38:14 +08:00
|
|
|
minVersionChanged = true;
|
2020-01-31 00:35:02 +08:00
|
|
|
} else {
|
|
|
|
stopList.erase(uid);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
for (UID uid : stopList) {
|
|
|
|
auto it = backups.find(uid);
|
|
|
|
ASSERT(it != backups.end());
|
2020-03-23 12:08:11 +08:00
|
|
|
it->second.stop();
|
2020-01-31 00:35:02 +08:00
|
|
|
modified = true;
|
|
|
|
}
|
2020-04-19 00:38:14 +08:00
|
|
|
if (minVersionChanged && backupEpoch < recruitedEpoch && savedVersion + 1 == startVersion) {
|
2020-04-18 13:55:09 +08:00
|
|
|
// Advance savedVersion to minimize version ranges in case backupEpoch's
|
|
|
|
// progress is not saved. Master may set a very low startVersion that
|
|
|
|
// is already popped. Advance the version is safe because these
|
|
|
|
// versions are not popped -- if they are popped, their progress should
|
|
|
|
// be already recorded and Master would use a higher version than minVersion.
|
|
|
|
savedVersion = std::max(minVersion, savedVersion);
|
|
|
|
}
|
2020-01-31 00:35:02 +08:00
|
|
|
if (modified)
|
|
|
|
changedTrigger.trigger();
|
|
|
|
}
|
Fix contract changes: backup worker generate continuous versions
Before we allow holes in version ranges in partitioned mutation logs. This
has been changed so that restore can easily figure out if database is
restorable. A specific problem is that if the backup worker didn't find any
mutations for an old epoch, the worker can just exit without generating a
log file, thus leaving holes in version ranges.
Another contract change is that if a backup key is set, then we must store
all mutations for that key, especially for the worker for the old epoch. As a
result, the worker must first check backup key, before pulling mutations and
uploading logs. Otherwise, we may lose mutations.
Finally, when a backup key is removed, the saving of mutations should be up to
the current version so that backup worker doesn't exit too early. I.e., avoid
the case saved mutation versions are less than the snapshot version taken.
2020-02-29 06:11:14 +08:00
|
|
|
|
|
|
|
ACTOR static Future<Void> _waitAllInfoReady(BackupData* self) {
|
|
|
|
std::vector<Future<Void>> all;
|
|
|
|
for (auto it = self->backups.begin(); it != self->backups.end();) {
|
|
|
|
if (it->second.stopped) {
|
|
|
|
TraceEvent("BackupWorkerRemoveStoppedContainer", self->myId).detail("BackupId", it->first);
|
|
|
|
it = self->backups.erase(it);
|
|
|
|
continue;
|
|
|
|
}
|
2020-02-29 09:14:18 +08:00
|
|
|
|
Fix contract changes: backup worker generate continuous versions
Before we allow holes in version ranges in partitioned mutation logs. This
has been changed so that restore can easily figure out if database is
restorable. A specific problem is that if the backup worker didn't find any
mutations for an old epoch, the worker can just exit without generating a
log file, thus leaving holes in version ranges.
Another contract change is that if a backup key is set, then we must store
all mutations for that key, especially for the worker for the old epoch. As a
result, the worker must first check backup key, before pulling mutations and
uploading logs. Otherwise, we may lose mutations.
Finally, when a backup key is removed, the saving of mutations should be up to
the current version so that backup worker doesn't exit too early. I.e., avoid
the case saved mutation versions are less than the snapshot version taken.
2020-02-29 06:11:14 +08:00
|
|
|
all.push_back(it->second.waitReady());
|
|
|
|
it++;
|
|
|
|
}
|
|
|
|
wait(waitForAll(all));
|
|
|
|
return Void();
|
|
|
|
}
|
|
|
|
|
|
|
|
Future<Void> waitAllInfoReady() { return _waitAllInfoReady(this); }
|
|
|
|
|
|
|
|
bool isAllInfoReady() const {
|
|
|
|
for (const auto& [uid, info] : backups) {
|
|
|
|
if (!info.isReady())
|
|
|
|
return false;
|
|
|
|
}
|
|
|
|
return true;
|
|
|
|
}
|
2020-03-13 05:38:40 +08:00
|
|
|
|
|
|
|
ACTOR static Future<Version> _getMinKnownCommittedVersion(BackupData* self) {
|
2020-07-10 01:49:33 +08:00
|
|
|
state Span span("BA:GetMinCommittedVersion"_loc);
|
2020-03-13 05:38:40 +08:00
|
|
|
loop {
|
2022-08-11 05:13:03 +08:00
|
|
|
try {
|
|
|
|
GetReadVersionRequest request(span.context,
|
|
|
|
0,
|
|
|
|
TransactionPriority::DEFAULT,
|
|
|
|
invalidVersion,
|
|
|
|
GetReadVersionRequest::FLAG_USE_MIN_KNOWN_COMMITTED_VERSION);
|
|
|
|
choose {
|
|
|
|
when(wait(self->cx->onProxiesChanged())) {}
|
|
|
|
when(GetReadVersionReply reply =
|
|
|
|
wait(basicLoadBalance(self->cx->getGrvProxies(UseProvisionalProxies::False),
|
|
|
|
&GrvProxyInterface::getConsistentReadVersion,
|
|
|
|
request,
|
|
|
|
self->cx->taskID))) {
|
|
|
|
self->cx->ssVersionVectorCache.applyDelta(reply.ssVersionVectorDelta);
|
|
|
|
return reply.version;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
} catch (Error& e) {
|
|
|
|
if (e.code() == error_code_batch_transaction_throttled ||
|
2022-08-13 02:36:17 +08:00
|
|
|
e.code() == error_code_grv_proxy_memory_limit_exceeded) {
|
2022-08-11 05:13:03 +08:00
|
|
|
// GRV Proxy returns an error
|
2022-08-12 02:23:44 +08:00
|
|
|
wait(delayJittered(CLIENT_KNOBS->GRV_ERROR_RETRY_DELAY));
|
2022-08-11 05:13:03 +08:00
|
|
|
} else {
|
|
|
|
throw;
|
2020-03-13 05:38:40 +08:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
Future<Version> getMinKnownCommittedVersion() { return _getMinKnownCommittedVersion(this); }
|
2019-04-25 06:12:37 +08:00
|
|
|
};
|
|
|
|
|
2020-04-18 13:55:09 +08:00
|
|
|
// Monitors "backupStartedKey". If "present" is true, wait until the key is set;
|
Fix contract changes: backup worker generate continuous versions
Before we allow holes in version ranges in partitioned mutation logs. This
has been changed so that restore can easily figure out if database is
restorable. A specific problem is that if the backup worker didn't find any
mutations for an old epoch, the worker can just exit without generating a
log file, thus leaving holes in version ranges.
Another contract change is that if a backup key is set, then we must store
all mutations for that key, especially for the worker for the old epoch. As a
result, the worker must first check backup key, before pulling mutations and
uploading logs. Otherwise, we may lose mutations.
Finally, when a backup key is removed, the saving of mutations should be up to
the current version so that backup worker doesn't exit too early. I.e., avoid
the case saved mutation versions are less than the snapshot version taken.
2020-02-29 06:11:14 +08:00
|
|
|
// otherwise, wait until the key is cleared. If "watch" is false, do not perform
|
|
|
|
// the wait for key set/clear events. Returns if key present.
|
2020-04-18 13:55:09 +08:00
|
|
|
ACTOR Future<bool> monitorBackupStartedKeyChanges(BackupData* self, bool present, bool watch) {
|
2020-01-08 02:27:52 +08:00
|
|
|
loop {
|
|
|
|
state ReadYourWritesTransaction tr(self->cx);
|
|
|
|
|
|
|
|
loop {
|
|
|
|
try {
|
|
|
|
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
2020-01-23 11:34:40 +08:00
|
|
|
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
|
2020-01-22 08:57:30 +08:00
|
|
|
Optional<Value> value = wait(tr.get(backupStartedKey));
|
2020-02-06 02:33:51 +08:00
|
|
|
std::vector<std::pair<UID, Version>> uidVersions;
|
2020-03-18 12:35:44 +08:00
|
|
|
bool shouldExit = self->endVersion.present();
|
2020-01-25 03:01:58 +08:00
|
|
|
if (value.present()) {
|
2020-02-06 02:33:51 +08:00
|
|
|
uidVersions = decodeBackupStartedValue(value.get());
|
|
|
|
TraceEvent e("BackupWorkerGotStartKey", self->myId);
|
|
|
|
int i = 1;
|
2020-03-18 12:35:44 +08:00
|
|
|
for (auto [uid, version] : uidVersions) {
|
|
|
|
e.detail(format("BackupID%d", i), uid).detail(format("Version%d", i), version);
|
2020-02-06 02:33:51 +08:00
|
|
|
i++;
|
2020-03-18 12:35:44 +08:00
|
|
|
if (shouldExit && version < self->endVersion.get()) {
|
|
|
|
shouldExit = false;
|
|
|
|
}
|
2020-02-06 02:33:51 +08:00
|
|
|
}
|
2020-03-18 12:35:44 +08:00
|
|
|
self->exitEarly = shouldExit;
|
2020-02-06 02:33:51 +08:00
|
|
|
self->onBackupChanges(uidVersions);
|
2020-04-18 13:55:09 +08:00
|
|
|
if (present || !watch)
|
|
|
|
return true;
|
2020-01-31 00:35:02 +08:00
|
|
|
} else {
|
2021-07-27 10:55:10 +08:00
|
|
|
TraceEvent("BackupWorkerEmptyStartKey", self->myId).log();
|
2020-02-06 02:33:51 +08:00
|
|
|
self->onBackupChanges(uidVersions);
|
2020-01-31 00:35:02 +08:00
|
|
|
|
2020-03-18 12:35:44 +08:00
|
|
|
self->exitEarly = shouldExit;
|
2020-04-18 13:55:09 +08:00
|
|
|
if (!present || !watch) {
|
Fix contract changes: backup worker generate continuous versions
Before we allow holes in version ranges in partitioned mutation logs. This
has been changed so that restore can easily figure out if database is
restorable. A specific problem is that if the backup worker didn't find any
mutations for an old epoch, the worker can just exit without generating a
log file, thus leaving holes in version ranges.
Another contract change is that if a backup key is set, then we must store
all mutations for that key, especially for the worker for the old epoch. As a
result, the worker must first check backup key, before pulling mutations and
uploading logs. Otherwise, we may lose mutations.
Finally, when a backup key is removed, the saving of mutations should be up to
the current version so that backup worker doesn't exit too early. I.e., avoid
the case saved mutation versions are less than the snapshot version taken.
2020-02-29 06:11:14 +08:00
|
|
|
return false;
|
2020-01-31 00:35:02 +08:00
|
|
|
}
|
2020-01-10 02:15:42 +08:00
|
|
|
}
|
2020-01-08 02:27:52 +08:00
|
|
|
|
|
|
|
state Future<Void> watchFuture = tr.watch(backupStartedKey);
|
|
|
|
wait(tr.commit());
|
|
|
|
wait(watchFuture);
|
|
|
|
break;
|
|
|
|
} catch (Error& e) {
|
|
|
|
wait(tr.onError(e));
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2020-03-23 07:31:39 +08:00
|
|
|
// Set "latestBackupWorkerSavedVersion" key for backups
|
|
|
|
ACTOR Future<Void> setBackupKeys(BackupData* self, std::map<UID, Version> savedLogVersions) {
|
2020-03-25 09:22:20 +08:00
|
|
|
state Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(self->cx));
|
|
|
|
|
2020-03-21 02:25:41 +08:00
|
|
|
loop {
|
|
|
|
try {
|
|
|
|
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
|
|
|
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
|
2020-03-25 09:22:20 +08:00
|
|
|
tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
|
2020-03-21 02:25:41 +08:00
|
|
|
|
|
|
|
state std::vector<Future<Optional<Version>>> prevVersions;
|
|
|
|
state std::vector<BackupConfig> versionConfigs;
|
2020-03-23 07:31:39 +08:00
|
|
|
state std::vector<Future<Optional<bool>>> allWorkersReady;
|
2021-03-04 01:18:03 +08:00
|
|
|
for (const auto& [uid, version] : savedLogVersions) {
|
2020-03-21 02:25:41 +08:00
|
|
|
versionConfigs.emplace_back(uid);
|
|
|
|
prevVersions.push_back(versionConfigs.back().latestBackupWorkerSavedVersion().get(tr));
|
2020-03-23 07:31:39 +08:00
|
|
|
allWorkersReady.push_back(versionConfigs.back().allWorkerStarted().get(tr));
|
2020-03-21 02:25:41 +08:00
|
|
|
}
|
|
|
|
|
2020-03-23 07:31:39 +08:00
|
|
|
wait(waitForAll(prevVersions) && waitForAll(allWorkersReady));
|
2020-03-21 02:25:41 +08:00
|
|
|
|
|
|
|
for (int i = 0; i < prevVersions.size(); i++) {
|
2020-03-23 07:31:39 +08:00
|
|
|
if (!allWorkersReady[i].get().present() || !allWorkersReady[i].get().get())
|
|
|
|
continue;
|
|
|
|
|
2020-03-21 02:25:41 +08:00
|
|
|
const Version current = savedLogVersions[versionConfigs[i].getUid()];
|
|
|
|
if (prevVersions[i].get().present()) {
|
|
|
|
const Version prev = prevVersions[i].get().get();
|
|
|
|
if (prev > current) {
|
|
|
|
TraceEvent(SevWarn, "BackupWorkerVersionInverse", self->myId)
|
|
|
|
.detail("Prev", prev)
|
|
|
|
.detail("Current", current);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
if (self->backupEpoch == self->oldestBackupEpoch &&
|
|
|
|
(!prevVersions[i].get().present() || prevVersions[i].get().get() < current)) {
|
|
|
|
TraceEvent("BackupWorkerSetVersion", self->myId)
|
|
|
|
.detail("BackupID", versionConfigs[i].getUid())
|
|
|
|
.detail("Version", current);
|
|
|
|
versionConfigs[i].latestBackupWorkerSavedVersion().set(tr, current);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
wait(tr->commit());
|
|
|
|
return Void();
|
|
|
|
} catch (Error& e) {
|
|
|
|
wait(tr->onError(e));
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2020-03-23 09:19:26 +08:00
|
|
|
// Note only worker with Tag (-2,0) runs this actor so that the latest saved
|
|
|
|
// version key is set by one process, which is stored in each BackupConfig in
|
|
|
|
// the system space. The client can know if a backup is restorable by checking
|
2020-03-07 03:58:10 +08:00
|
|
|
// log saved version > snapshot version.
|
2020-03-23 09:19:26 +08:00
|
|
|
ACTOR Future<Void> monitorBackupProgress(BackupData* self) {
|
2020-03-21 02:25:41 +08:00
|
|
|
state Future<Void> interval;
|
|
|
|
|
2020-01-22 08:57:30 +08:00
|
|
|
loop {
|
2020-03-21 02:25:41 +08:00
|
|
|
interval = delay(SERVER_KNOBS->WORKER_LOGGING_INTERVAL / 2.0);
|
2020-03-13 11:51:10 +08:00
|
|
|
while (self->backups.empty() || !self->logSystem.get()) {
|
2020-03-21 02:25:41 +08:00
|
|
|
wait(self->changedTrigger.onTrigger() || self->logSystem.onChange());
|
2020-01-22 08:57:30 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
// check all workers have started by checking their progress is larger
|
|
|
|
// than the backup's start version.
|
2020-03-21 02:25:41 +08:00
|
|
|
state Reference<BackupProgress> progress(new BackupProgress(self->myId, {}));
|
2020-06-28 12:30:38 +08:00
|
|
|
wait(getBackupProgress(self->cx, self->myId, progress, /*logging=*/false));
|
2020-03-21 02:25:41 +08:00
|
|
|
state std::map<Tag, Version> tagVersions = progress->getEpochStatus(self->recruitedEpoch);
|
2020-03-07 03:58:10 +08:00
|
|
|
state std::map<UID, Version> savedLogVersions;
|
2020-03-23 07:31:39 +08:00
|
|
|
if (tagVersions.size() != self->totalTags) {
|
2020-03-21 02:25:41 +08:00
|
|
|
wait(interval);
|
2020-03-13 11:51:10 +08:00
|
|
|
continue;
|
|
|
|
}
|
|
|
|
|
|
|
|
// Check every version is larger than backup's startVersion
|
|
|
|
for (auto& [uid, info] : self->backups) {
|
2020-03-24 04:45:48 +08:00
|
|
|
if (self->recruitedEpoch == self->oldestBackupEpoch) {
|
|
|
|
// update update progress so far if previous epochs are done
|
2020-03-13 11:51:10 +08:00
|
|
|
Version v = std::numeric_limits<Version>::max();
|
2021-03-04 01:18:03 +08:00
|
|
|
for (const auto& [tag, version] : tagVersions) {
|
2020-03-13 11:51:10 +08:00
|
|
|
v = std::min(v, version);
|
2020-01-22 08:57:30 +08:00
|
|
|
}
|
2020-03-13 11:51:10 +08:00
|
|
|
savedLogVersions.emplace(uid, v);
|
2020-03-23 07:31:39 +08:00
|
|
|
TraceEvent("BackupWorkerSavedBackupVersion", self->myId).detail("BackupID", uid).detail("Version", v);
|
2020-03-13 11:51:10 +08:00
|
|
|
}
|
|
|
|
}
|
2020-03-23 07:31:39 +08:00
|
|
|
Future<Void> setKeys = savedLogVersions.empty() ? Void() : setBackupKeys(self, savedLogVersions);
|
2020-03-07 03:58:10 +08:00
|
|
|
|
2020-03-21 02:25:41 +08:00
|
|
|
wait(interval && setKeys);
|
2020-01-22 08:57:30 +08:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2019-06-01 07:14:58 +08:00
|
|
|
// Persists this worker's backup progress (epoch, backupVersion, tag, total tags)
// under the progress key derived from self->myId, retrying through
// Transaction::onError until the commit succeeds.
ACTOR Future<Void> saveProgress(BackupData* self, Version backupVersion) {
	state Transaction tr(self->cx);
	state Key progressKey = backupProgressKeyFor(self->myId);

	loop {
		try {
			// It's critical to save progress immediately so that after a master
			// recovery, the new master can know the progress so far.
			tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
			tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
			tr.setOption(FDBTransactionOptions::LOCK_AWARE);

			tr.set(progressKey,
			       backupProgressValue(
			           WorkerBackupStatus(self->backupEpoch, backupVersion, self->tag, self->totalTags)));
			tr.addReadConflictRange(singleKeyRange(progressKey));
			wait(tr.commit());
			return Void();
		} catch (Error& e) {
			wait(tr.onError(e));
		}
	}
}
|
|
|
|
|
2020-02-13 02:02:27 +08:00
|
|
|
// Write a mutation to a log file. Note the mutation can be different from
|
|
|
|
// message.message for clear mutations.
|
|
|
|
ACTOR Future<Void> addMutation(Reference<IBackupFile> logFile,
|
|
|
|
VersionedMessage message,
|
|
|
|
StringRef mutation,
|
|
|
|
int64_t* blockEnd,
|
|
|
|
int blockSize) {
|
|
|
|
state int bytes = sizeof(Version) + sizeof(uint32_t) + sizeof(int) + mutation.size();
|
2020-01-31 13:21:05 +08:00
|
|
|
|
|
|
|
// Convert to big Endianness for version.version, version.sub, and msgSize
|
|
|
|
// The decoder assumes 0xFF is the end, so little endian can easily be
|
|
|
|
// mistaken as the end. In contrast, big endian for version almost guarantee
|
|
|
|
// the first byte is not 0xFF (should always be 0x00).
|
|
|
|
BinaryWriter wr(Unversioned());
|
2020-02-13 02:02:27 +08:00
|
|
|
wr << bigEndian64(message.version.version) << bigEndian32(message.version.sub) << bigEndian32(mutation.size());
|
2020-01-31 13:21:05 +08:00
|
|
|
state Standalone<StringRef> header = wr.toValue();
|
|
|
|
|
|
|
|
// Start a new block if needed
|
|
|
|
if (logFile->size() + bytes > *blockEnd) {
|
|
|
|
// Write padding if needed
|
|
|
|
const int bytesLeft = *blockEnd - logFile->size();
|
|
|
|
if (bytesLeft > 0) {
|
2020-04-28 04:59:45 +08:00
|
|
|
state Value paddingFFs = fileBackup::makePadding(bytesLeft);
|
2020-01-31 13:21:05 +08:00
|
|
|
wait(logFile->append(paddingFFs.begin(), bytesLeft));
|
|
|
|
}
|
|
|
|
|
|
|
|
*blockEnd += blockSize;
|
2020-02-15 03:27:02 +08:00
|
|
|
// write block Header
|
|
|
|
wait(logFile->append((uint8_t*)&PARTITIONED_MLOG_VERSION, sizeof(PARTITIONED_MLOG_VERSION)));
|
2020-01-31 13:21:05 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
wait(logFile->append((void*)header.begin(), header.size()));
|
2020-02-13 02:02:27 +08:00
|
|
|
wait(logFile->append(mutation.begin(), mutation.size()));
|
2020-01-31 13:21:05 +08:00
|
|
|
return Void();
|
|
|
|
}
|
|
|
|
|
2020-03-30 12:20:21 +08:00
|
|
|
// Adds the size of each log file to the logBytesWritten() counter of the matching
// backup, using an atomic AddValue operation. backupUids and logFiles are
// index-aligned and must have the same length. Retries through onError until the
// commit succeeds.
ACTOR static Future<Void> updateLogBytesWritten(BackupData* self,
                                                std::vector<UID> backupUids,
                                                std::vector<Reference<IBackupFile>> logFiles) {
	state Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(self->cx));

	ASSERT(backupUids.size() == logFiles.size());
	loop {
		try {
			tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
			tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
			tr->setOption(FDBTransactionOptions::LOCK_AWARE);

			for (int pos = 0; pos < backupUids.size(); pos++) {
				BackupConfig backup(backupUids[pos]);
				backup.logBytesWritten().atomicOp(tr, logFiles[pos]->size(), MutationRef::AddValue);
			}
			wait(tr->commit());
			return Void();
		} catch (Error& e) {
			wait(tr->onError(e));
		}
	}
}
|
|
|
|
|
2019-09-22 12:55:19 +08:00
|
|
|
// Saves messages in the range of [0, numMsg) of self->messages to one log file
// per active backup. The file content format is a sequence of
// (Version, sub#, msgSize, message). Note only ready backups are saved.
//
// This actor does not remove the saved messages from self->messages; the caller
// (uploadData) erases them after this actor completes.
//
// popVersion:    the last version covered; each file is opened for the range
//                [lastSavedVersion, popVersion + 1), and every active backup's
//                lastSavedVersion becomes popVersion + 1 afterwards.
// numMsg:        number of leading entries of self->messages to write.
// cipherDetails: cipher details of encrypted messages; when non-empty, the
//                matching cipher keys are fetched before messages are decoded.
ACTOR Future<Void> saveMutationsToFile(BackupData* self,
                                       Version popVersion,
                                       int numMsg,
                                       std::unordered_set<BlobCipherDetails> cipherDetails) {
	state int blockSize = SERVER_KNOBS->BACKUP_FILE_BLOCK_BYTES;
	state std::vector<Future<Reference<IBackupFile>>> logFileFutures;
	state std::vector<Reference<IBackupFile>> logFiles;
	state std::vector<int64_t> blockEnds;
	state std::vector<UID> activeUids; // active Backups' UIDs
	state std::vector<Version> beginVersions; // logFiles' begin versions
	state KeyRangeMap<std::set<int>> keyRangeMap; // range to index in logFileFutures, logFiles, & blockEnds
	state std::vector<Standalone<StringRef>> mutations;
	state std::unordered_map<BlobCipherDetails, Reference<BlobCipherKey>> cipherKeys;
	state int idx;

	// Make sure all backups are ready, otherwise mutations will be lost.
	while (!self->isAllInfoReady()) {
		wait(self->waitAllInfoReady());
	}

	// Drop backups that are stopped or have no container, and open one log file
	// for each remaining backup. The position of a backup in activeUids,
	// beginVersions and logFileFutures is the "index" stored in keyRangeMap.
	for (auto it = self->backups.begin(); it != self->backups.end();) {
		if (it->second.stopped || !it->second.container.get().present()) {
			TraceEvent("BackupWorkerNoContainer", self->myId).detail("BackupId", it->first);
			it = self->backups.erase(it);
			continue;
		}
		const int index = logFileFutures.size();
		activeUids.push_back(it->first);
		self->insertRanges(keyRangeMap, it->second.ranges.get(), index);

		// First log file of this backup: decide where its version range begins.
		if (it->second.lastSavedVersion == invalidVersion) {
			if (it->second.startVersion > self->startVersion && !self->messages.empty()) {
				// True-up first mutation log's begin version
				it->second.lastSavedVersion = self->messages[0].getVersion();
			} else {
				it->second.lastSavedVersion = std::max({ self->popVersion, self->savedVersion, self->startVersion });
			}
			TraceEvent("BackupWorkerTrueUp", self->myId).detail("LastSavedVersion", it->second.lastSavedVersion);
		}
		// The true-up version can be larger than first message version, so keep
		// the begin versions for later mutation filtering.
		beginVersions.push_back(it->second.lastSavedVersion);

		logFileFutures.push_back(it->second.container.get().get()->writeTaggedLogFile(
		    it->second.lastSavedVersion, popVersion + 1, blockSize, self->tag.id, self->totalTags));
		it++;
	}

	keyRangeMap.coalesce(allKeys);
	wait(waitForAll(logFileFutures));

	// All futures are ready at this point; collect the opened files in index order.
	std::transform(logFileFutures.begin(),
	               logFileFutures.end(),
	               std::back_inserter(logFiles),
	               [](const Future<Reference<IBackupFile>>& f) { return f.get(); });

	ASSERT(activeUids.size() == logFiles.size() && beginVersions.size() == logFiles.size());
	for (int i = 0; i < logFiles.size(); i++) {
		TraceEvent("OpenMutationFile", self->myId)
		    .detail("BackupID", activeUids[i])
		    .detail("TagId", self->tag.id)
		    .detail("File", logFiles[i]->getFileName());
	}

	// Fetch cipher keys if any of the messages are encrypted.
	if (!cipherDetails.empty()) {
		std::unordered_map<BlobCipherDetails, Reference<BlobCipherKey>> getCipherKeysResult =
		    wait(getEncryptCipherKeys(self->db, cipherDetails, BlobCipherMetrics::BLOB_GRANULE));
		cipherKeys = getCipherKeysResult;
	}

	// Every file starts with block end 0, so the first record written to a file
	// opens its first block in addMutation.
	blockEnds = std::vector<int64_t>(logFiles.size(), 0);
	for (idx = 0; idx < numMsg; idx++) {
		auto& message = self->messages[idx];
		MutationRef m;
		// Messages that are not backup mutations are skipped.
		if (!message.isBackupMessage(&m, cipherKeys))
			continue;

		DEBUG_MUTATION("addMutation", message.version.version, m)
		    .detail("KCV", self->minKnownCommittedVersion)
		    .detail("SavedVersion", self->savedVersion);

		std::vector<Future<Void>> adds;
		if (m.type != MutationRef::Type::ClearRange) {
			// Non-clear mutation: write the original message bytes to every file whose
			// backup covers the key and whose begin version is not past this message.
			for (int index : keyRangeMap[m.param1]) {
				if (message.getVersion() >= beginVersions[index]) {
					adds.push_back(
					    addMutation(logFiles[index], message, message.message, &blockEnds[index], blockSize));
				}
			}
		} else {
			KeyRangeRef mutationRange(m.param1, m.param2);
			KeyRangeRef intersectionRange;

			// Find intersection ranges and create mutations for sub-ranges
			for (auto range : keyRangeMap.intersectingRanges(mutationRange)) {
				const auto& subrange = range.range();
				intersectionRange = mutationRange & subrange;
				MutationRef subm(MutationRef::Type::ClearRange, intersectionRange.begin, intersectionRange.end);
				BinaryWriter wr(AssumeVersion(g_network->protocolVersion()));
				wr << subm;
				// The serialized sub-range clear is kept in "mutations" until the
				// addMutation calls referencing it have completed (cleared below).
				mutations.push_back(wr.toValue());
				for (int index : range.value()) {
					if (message.getVersion() >= beginVersions[index]) {
						adds.push_back(
						    addMutation(logFiles[index], message, mutations.back(), &blockEnds[index], blockSize));
					}
				}
			}
		}
		wait(waitForAll(adds));
		mutations.clear();
	}

	// Finish all files, then record the new saved version for every active backup.
	std::vector<Future<Void>> finished;
	std::transform(logFiles.begin(), logFiles.end(), std::back_inserter(finished), [](const Reference<IBackupFile>& f) {
		return f->finish();
	});

	wait(waitForAll(finished));

	for (const auto& file : logFiles) {
		TraceEvent("CloseMutationFile", self->myId)
		    .detail("FileSize", file->size())
		    .detail("TagId", self->tag.id)
		    .detail("File", file->getFileName());
	}
	for (const UID& uid : activeUids) {
		self->backups[uid].lastSavedVersion = popVersion + 1;
	}

	wait(updateLogBytesWritten(self, activeUids, logFiles));
	return Void();
}
|
|
|
|
|
2019-09-23 04:23:27 +08:00
|
|
|
// Uploads self->messages to cloud storage and updates savedVersion.
//
// Each loop iteration picks the prefix of self->messages to save, writes it with
// saveMutationsToFile, erases the saved messages, persists progress with
// saveProgress and pops. The actor returns once self->allMessageSaved() is true.
ACTOR Future<Void> uploadData(BackupData* self) {
	// Version up to which data has been chosen for saving; carried across iterations.
	state Version popVersion = invalidVersion;

	loop {
		// Too large uploadDelay will delay popping tLog data for too long.
		state Future<Void> uploadDelay = delay(SERVER_KNOBS->BACKUP_UPLOAD_DELAY);

		state int numMsg = 0;
		state std::unordered_set<BlobCipherDetails> cipherDetails;
		Version lastPopVersion = popVersion;
		// index of last version's end position in self->messages
		int lastVersionIndex = 0;
		Version lastVersion = invalidVersion;

		if (self->messages.empty()) {
			// Even though messages is empty, we still want to advance popVersion.
			if (!self->endVersion.present()) {
				popVersion = std::max(popVersion, self->minKnownCommittedVersion);
			}
		} else {
			for (auto& message : self->messages) {
				// message may be prefetched in peek; uncommitted message should not be uploaded.
				const Version version = message.getVersion();
				if (version > self->maxPopVersion())
					break;
				if (version > popVersion) {
					// A new version starts here: remember how many messages precede it
					// and which version came before it.
					lastVersionIndex = numMsg;
					lastVersion = popVersion;
					popVersion = version;
				}
				message.collectCipherDetailIfEncrypted(cipherDetails);
				numMsg++;
			}
		}
		if (self->pullFinished()) {
			popVersion = self->endVersion.get();
		} else {
			// make sure file is saved on version boundary
			// (the messages of the newest version seen above are left for a later round)
			// NOTE(review): when self->messages is empty, lastVersion is still
			// invalidVersion here, so this assignment discards the popVersion advance
			// made in the empty-queue branch above — confirm this is intended.
			popVersion = lastVersion;
			numMsg = lastVersionIndex;

			// If we aren't able to process any messages and the lock is blocking us from
			// queuing more, then we are stuck. This could suggest the lock capacity is too small.
			ASSERT(numMsg > 0 || self->lock->waiters() == 0);
		}
		if (((numMsg > 0 || popVersion > lastPopVersion) && self->pulling) || self->pullFinished()) {
			TraceEvent("BackupWorkerSave", self->myId)
			    .detail("Version", popVersion)
			    .detail("LastPopVersion", lastPopVersion)
			    .detail("Pulling", self->pulling)
			    .detail("SavedVersion", self->savedVersion)
			    .detail("NumMsg", numMsg)
			    .detail("MsgQ", self->messages.size());
			// save an empty file for old epochs so that log file versions are continuous
			wait(saveMutationsToFile(self, popVersion, numMsg, cipherDetails));
			self->eraseMessages(numMsg);
		}

		// If transition into NOOP mode, should clear messages
		if (!self->pulling && self->backupEpoch == self->recruitedEpoch) {
			self->eraseMessages(self->messages.size());
		}

		// Persist progress and pop only when popVersion moved past both the saved
		// version and the already popped version.
		if (popVersion > self->savedVersion && popVersion > self->popVersion) {
			wait(saveProgress(self, popVersion));
			TraceEvent("BackupWorkerSavedProgress", self->myId)
			    .detail("Tag", self->tag.toString())
			    .detail("Version", popVersion)
			    .detail("MsgQ", self->messages.size());
			self->savedVersion = std::max(popVersion, self->savedVersion);
			self->pop();
		}

		if (self->allMessageSaved()) {
			self->eraseMessages(self->messages.size());
			return Void();
		}

		// While pulling has not finished, wait for the upload delay or the done
		// trigger; once it has finished, loop again immediately.
		if (!self->pullFinished()) {
			wait(uploadDelay || self->doneTrigger.onTrigger());
		}
	}
}
|
|
|
|
|
2019-04-25 06:12:37 +08:00
|
|
|
// Pulls data from TLog servers using LogRouter tag.
|
|
|
|
ACTOR Future<Void> pullAsyncData(BackupData* self) {
|
|
|
|
state Future<Void> logSystemChange = Void();
|
|
|
|
state Reference<ILogSystem::IPeekCursor> r;
|
2020-03-03 05:29:42 +08:00
|
|
|
state Version tagAt = std::max(self->pulledVersion.get(), std::max(self->startVersion, self->savedVersion));
|
2020-05-17 09:50:09 +08:00
|
|
|
state Arena prev;
|
2019-04-25 06:12:37 +08:00
|
|
|
|
2021-07-27 10:55:10 +08:00
|
|
|
TraceEvent("BackupWorkerPull", self->myId).log();
|
2019-04-25 06:12:37 +08:00
|
|
|
loop {
|
2020-04-03 06:28:51 +08:00
|
|
|
while (self->paused.get()) {
|
|
|
|
wait(self->paused.onChange());
|
|
|
|
}
|
|
|
|
|
2019-04-25 06:12:37 +08:00
|
|
|
loop choose {
|
2022-04-09 05:28:16 +08:00
|
|
|
when(wait(r ? r->getMore(TaskPriority::TLogCommit) : Never())) { break; }
|
2019-04-25 06:12:37 +08:00
|
|
|
when(wait(logSystemChange)) {
|
|
|
|
if (self->logSystem.get()) {
|
2019-05-24 07:06:23 +08:00
|
|
|
r = self->logSystem.get()->peekLogRouter(self->myId, tagAt, self->tag);
|
2019-04-25 06:12:37 +08:00
|
|
|
} else {
|
|
|
|
r = Reference<ILogSystem::IPeekCursor>();
|
|
|
|
}
|
|
|
|
logSystemChange = self->logSystem.onChange();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
self->minKnownCommittedVersion = std::max(self->minKnownCommittedVersion, r->getMinKnownCommittedVersion());
|
|
|
|
|
2019-09-10 01:21:16 +08:00
|
|
|
// Note we aggressively peek (uncommitted) messages, but only committed
|
|
|
|
// messages/mutations will be flushed to disk/blob in uploadData().
|
2019-04-25 06:12:37 +08:00
|
|
|
while (r->hasMessage()) {
|
2020-07-14 12:10:34 +08:00
|
|
|
if (!prev.sameArena(r->arena())) {
|
2020-05-21 04:26:57 +08:00
|
|
|
TraceEvent(SevDebugMemory, "BackupWorkerMemory", self->myId)
|
|
|
|
.detail("Take", r->arena().getSize())
|
|
|
|
.detail("Current", self->lock->activePermits());
|
|
|
|
|
2020-05-15 10:46:30 +08:00
|
|
|
wait(self->lock->take(TaskPriority::DefaultYield, r->arena().getSize()));
|
|
|
|
prev = r->arena();
|
|
|
|
}
|
2020-05-17 01:52:11 +08:00
|
|
|
self->messages.emplace_back(r->version(), r->getMessage(), r->getTags(), r->arena());
|
2019-04-25 06:12:37 +08:00
|
|
|
r->nextMessage();
|
|
|
|
}
|
|
|
|
|
2020-01-10 02:15:42 +08:00
|
|
|
tagAt = r->version().version;
|
2020-03-03 05:29:42 +08:00
|
|
|
self->pulledVersion.set(tagAt);
|
2019-08-07 02:14:32 +08:00
|
|
|
TraceEvent("BackupWorkerGot", self->myId).suppressFor(1.0).detail("V", tagAt);
|
2020-03-03 05:29:42 +08:00
|
|
|
if (self->pullFinished()) {
|
2019-09-23 04:23:27 +08:00
|
|
|
self->eraseMessagesAfterEndVersion();
|
2020-03-12 11:47:54 +08:00
|
|
|
self->doneTrigger.trigger();
|
2019-08-13 10:10:46 +08:00
|
|
|
TraceEvent("BackupWorkerFinishPull", self->myId)
|
2019-09-29 03:48:28 +08:00
|
|
|
.detail("Tag", self->tag.toString())
|
2019-08-13 10:10:46 +08:00
|
|
|
.detail("VersionGot", tagAt)
|
2019-09-22 12:55:19 +08:00
|
|
|
.detail("EndVersion", self->endVersion.get())
|
|
|
|
.detail("MsgQ", self->messages.size());
|
2019-08-13 10:10:46 +08:00
|
|
|
return Void();
|
|
|
|
}
|
2019-08-12 11:15:50 +08:00
|
|
|
wait(yield());
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2020-03-13 05:38:40 +08:00
|
|
|
// Alternates between two modes depending on whether the backup key is present:
//  - key present: run pullAsyncData() until the key changes or pulling finishes;
//  - key absent:  "NOOP POP" mode, periodically advancing popVersion and popping.
// `keyPresent` is the initial state and is flipped after every transition.
// Returns only when pullAsyncData() completes (backup done for an old epoch).
ACTOR Future<Void> monitorBackupKeyOrPullData(BackupData* self, bool keyPresent) {
	state Future<Void> pullDone = Void();

	loop {
		// Asserted below to resolve to !keyPresent, i.e. the opposite key state.
		state Future<bool> keyFlipped = monitorBackupStartedKeyChanges(self, !keyPresent, /*watch=*/true);
		if (!keyPresent) {
			// Backup key is not present, enter this NOOP POP mode.
			state Future<Version> noopCommitted = self->getMinKnownCommittedVersion();

			loop choose {
				when(wait(success(keyFlipped))) { break; }
				when(wait(success(noopCommitted) || delay(SERVER_KNOBS->BACKUP_NOOP_POP_DELAY, self->cx->taskID))) {
					if (!noopCommitted.isReady()) {
						// The delay fired first: issue a fresh version request.
						noopCommitted = self->getMinKnownCommittedVersion();
					} else {
						const Version committed = noopCommitted.get();
						self->popVersion = std::max(self->popVersion, std::max(committed, self->savedVersion));
						self->minKnownCommittedVersion = std::max(committed, self->minKnownCommittedVersion);
						TraceEvent("BackupWorkerNoopPop", self->myId)
						    .detail("SavedVersion", self->savedVersion)
						    .detail("PopVersion", self->popVersion);
						self->pop(); // Pop while the worker is in this NOOP state.
						// Park the future so that only the delay can wake this clause next.
						noopCommitted = Never();
					}
				}
			}
		} else {
			pullDone = pullAsyncData(self);
			self->pulling = true;
			wait(success(keyFlipped) || pullDone);
			if (pullDone.isReady()) {
				self->pulling = false;
				return Void(); // backup is done for some old epoch.
			}

			// Even though the snapshot is done, mutation logs may not be written
			// out yet. We need to make sure mutations up to this point are pulled
			// before pulling is stopped.
			Version currentVersion = wait(self->getMinKnownCommittedVersion());
			wait(self->pulledVersion.whenAtLeast(currentVersion));
			pullDone = Future<Void>(); // cancels pullAsyncData()
			self->pulling = false;
			// NOTE(review): the detail key "Reson" (sic) is kept as-is; renaming it
			// would change the emitted trace field.
			TraceEvent("BackupWorkerPaused", self->myId).detail("Reson", "NoBackup");
		}
		ASSERT(keyFlipped.get() != keyPresent);
		keyPresent = !keyPresent;
	}
}
|
|
|
|
|
2021-07-12 12:54:36 +08:00
|
|
|
// Watches ServerDBInfo and throws worker_removed() as soon as the cluster's
// recoveryCount exceeds this worker's `recoveryCount` while the recovery state
// is no longer UNINITIALIZED. Never returns normally.
ACTOR Future<Void> checkRemoved(Reference<AsyncVar<ServerDBInfo> const> db, LogEpoch recoveryCount, BackupData* self) {
	loop {
		const bool newerRecovery = db->get().recoveryCount > recoveryCount;
		const bool recoveryInitialized = db->get().recoveryState != RecoveryState::UNINITIALIZED;
		if (newerRecovery && recoveryInitialized) {
			TraceEvent("BackupWorkerDisplaced", self->myId)
			    .detail("RecoveryCount", recoveryCount)
			    .detail("SavedVersion", self->savedVersion)
			    .detail("BackupWorkers", describe(db->get().logSystemConfig.tLogs))
			    .detail("DBRecoveryCount", db->get().recoveryCount)
			    .detail("RecoveryState", static_cast<int>(db->get().recoveryState));
			throw worker_removed();
		}
		// Not displaced yet: re-evaluate on the next ServerDBInfo update.
		wait(db->onChange());
	}
}
|
|
|
|
|
2020-04-03 06:28:51 +08:00
|
|
|
// Mirrors the value stored under backupPausedKey into self->paused.
// The worker counts as paused only when the key exists and holds "1". After
// each read a watch is registered on the key and the loop re-reads once the
// watch fires. Transaction errors go through onError() and the loop retries.
ACTOR static Future<Void> monitorWorkerPause(BackupData* self) {
	state Reference<ReadYourWritesTransaction> txn(new ReadYourWritesTransaction(self->cx));
	state Future<Void> pausedKeyWatch;

	loop {
		try {
			txn->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
			txn->setOption(FDBTransactionOptions::LOCK_AWARE);
			txn->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);

			Optional<Value> flag = wait(txn->get(backupPausedKey));
			bool shouldPause = false;
			if (flag.present()) {
				shouldPause = (flag.get() == LiteralStringRef("1"));
			}

			// Only touch self->paused (and log) on an actual state change.
			if (shouldPause != self->paused.get()) {
				if (shouldPause) {
					TraceEvent("BackupWorkerPaused", self->myId).log();
				} else {
					TraceEvent("BackupWorkerResumed", self->myId).log();
				}
				self->paused.set(shouldPause);
			}

			// Register the watch in this transaction, commit, then block until
			// the key changes before reading it again.
			pausedKeyWatch = txn->watch(backupPausedKey);
			wait(txn->commit());
			wait(pausedKeyWatch);
			txn->reset();
		} catch (Error& e) {
			wait(txn->onError(e));
		}
	}
}
|
|
|
|
|
2019-05-21 05:22:31 +08:00
|
|
|
ACTOR Future<Void> backupWorker(BackupInterface interf,
|
|
|
|
InitializeBackupRequest req,
|
2021-07-12 12:54:36 +08:00
|
|
|
Reference<AsyncVar<ServerDBInfo> const> db) {
|
2020-01-25 03:01:58 +08:00
|
|
|
state BackupData self(interf.id(), db, req);
|
2019-04-25 06:12:37 +08:00
|
|
|
state PromiseStream<Future<Void>> addActor;
|
2019-05-16 07:13:04 +08:00
|
|
|
state Future<Void> error = actorCollection(addActor.getFuture());
|
2019-04-25 06:12:37 +08:00
|
|
|
state Future<Void> dbInfoChange = Void();
|
2020-03-05 04:32:06 +08:00
|
|
|
state Future<Void> pull;
|
|
|
|
state Future<Void> done;
|
2019-04-25 06:12:37 +08:00
|
|
|
|
2019-09-29 03:48:28 +08:00
|
|
|
TraceEvent("BackupWorkerStart", self.myId)
|
2019-06-01 07:14:58 +08:00
|
|
|
.detail("Tag", req.routerTag.toString())
|
2020-02-21 08:28:27 +08:00
|
|
|
.detail("TotalTags", req.totalTags)
|
2019-06-01 07:14:58 +08:00
|
|
|
.detail("StartVersion", req.startVersion)
|
2019-08-12 11:15:50 +08:00
|
|
|
.detail("EndVersion", req.endVersion.present() ? req.endVersion.get() : -1)
|
2019-07-30 01:37:42 +08:00
|
|
|
.detail("LogEpoch", req.recruitedEpoch)
|
|
|
|
.detail("BackupEpoch", req.backupEpoch);
|
2019-04-25 06:12:37 +08:00
|
|
|
try {
|
2019-08-15 05:19:50 +08:00
|
|
|
addActor.send(checkRemoved(db, req.recruitedEpoch, &self));
|
2019-05-21 05:22:31 +08:00
|
|
|
addActor.send(waitFailureServer(interf.waitFailure.getFuture()));
|
2020-01-22 08:57:30 +08:00
|
|
|
if (req.recruitedEpoch == req.backupEpoch && req.routerTag.id == 0) {
|
2020-03-23 09:19:26 +08:00
|
|
|
addActor.send(monitorBackupProgress(&self));
|
2020-01-22 08:57:30 +08:00
|
|
|
}
|
2020-04-03 06:28:51 +08:00
|
|
|
addActor.send(monitorWorkerPause(&self));
|
2019-04-25 06:12:37 +08:00
|
|
|
|
Fix contract changes: backup worker generate continuous versions
Before we allow holes in version ranges in partitioned mutation logs. This
has been changed so that restore can easily figure out if database is
restorable. A specific problem is that if the backup worker didn't find any
mutations for an old epoch, the worker can just exit without generating a
log file, thus leaving holes in version ranges.
Another contract change is that if a backup key is set, then we must store
all mutations for that key, especially for the worker for the old epoch. As a
result, the worker must first check backup key, before pulling mutations and
uploading logs. Otherwise, we may lose mutations.
Finally, when a backup key is removed, the saving of mutations should be up to
the current version so that backup worker doesn't exit too early. I.e., avoid
the case saved mutation versions are less than the snapshot version taken.
2020-02-29 06:11:14 +08:00
|
|
|
// Check if backup key is present to avoid race between this check and
|
|
|
|
// noop pop as well as upload data: pop or skip upload before knowing
|
2020-03-18 12:35:44 +08:00
|
|
|
// there are backup keys. Set the "exitEarly" flag if needed.
|
Fix contract changes: backup worker generate continuous versions
Before we allow holes in version ranges in partitioned mutation logs. This
has been changed so that restore can easily figure out if database is
restorable. A specific problem is that if the backup worker didn't find any
mutations for an old epoch, the worker can just exit without generating a
log file, thus leaving holes in version ranges.
Another contract change is that if a backup key is set, then we must store
all mutations for that key, especially for the worker for the old epoch. As a
result, the worker must first check backup key, before pulling mutations and
uploading logs. Otherwise, we may lose mutations.
Finally, when a backup key is removed, the saving of mutations should be up to
the current version so that backup worker doesn't exit too early. I.e., avoid
the case saved mutation versions are less than the snapshot version taken.
2020-02-29 06:11:14 +08:00
|
|
|
bool present = wait(monitorBackupStartedKeyChanges(&self, true, false));
|
2020-03-18 12:35:44 +08:00
|
|
|
TraceEvent("BackupWorkerWaitKey", self.myId).detail("Present", present).detail("ExitEarly", self.exitEarly);
|
Fix contract changes: backup worker generate continuous versions
Before we allow holes in version ranges in partitioned mutation logs. This
has been changed so that restore can easily figure out if database is
restorable. A specific problem is that if the backup worker didn't find any
mutations for an old epoch, the worker can just exit without generating a
log file, thus leaving holes in version ranges.
Another contract change is that if a backup key is set, then we must store
all mutations for that key, especially for the worker for the old epoch. As a
result, the worker must first check backup key, before pulling mutations and
uploading logs. Otherwise, we may lose mutations.
Finally, when a backup key is removed, the saving of mutations should be up to
the current version so that backup worker doesn't exit too early. I.e., avoid
the case saved mutation versions are less than the snapshot version taken.
2020-02-29 06:11:14 +08:00
|
|
|
|
2020-03-18 12:35:44 +08:00
|
|
|
pull = self.exitEarly ? Void() : monitorBackupKeyOrPullData(&self, present);
|
|
|
|
done = self.exitEarly ? Void() : uploadData(&self);
|
2019-08-19 12:22:26 +08:00
|
|
|
|
2019-04-25 06:12:37 +08:00
|
|
|
loop choose {
|
2019-05-16 07:13:04 +08:00
|
|
|
when(wait(dbInfoChange)) {
|
2019-04-25 06:12:37 +08:00
|
|
|
dbInfoChange = db->onChange();
|
2019-08-09 07:02:49 +08:00
|
|
|
Reference<ILogSystem> ls = ILogSystem::fromServerDBInfo(self.myId, db->get(), true);
|
2019-08-15 08:00:20 +08:00
|
|
|
bool hasPseudoLocality = ls.isValid() && ls->hasPseudoLocality(tagLocalityBackup);
|
|
|
|
if (hasPseudoLocality) {
|
2019-08-09 07:02:49 +08:00
|
|
|
self.logSystem.set(ls);
|
2020-03-18 05:45:07 +08:00
|
|
|
self.oldestBackupEpoch = std::max(self.oldestBackupEpoch, ls->getOldestBackupEpoch());
|
2019-08-09 07:02:49 +08:00
|
|
|
}
|
2019-09-29 03:48:28 +08:00
|
|
|
TraceEvent("BackupWorkerLogSystem", self.myId)
|
2019-08-15 08:00:20 +08:00
|
|
|
.detail("HasBackupLocality", hasPseudoLocality)
|
2020-03-18 05:45:07 +08:00
|
|
|
.detail("OldestBackupEpoch", self.oldestBackupEpoch)
|
2019-08-15 08:00:20 +08:00
|
|
|
.detail("Tag", self.tag.toString());
|
2019-04-25 06:12:37 +08:00
|
|
|
}
|
2019-08-19 12:22:26 +08:00
|
|
|
when(wait(done)) {
|
2019-09-29 03:48:28 +08:00
|
|
|
TraceEvent("BackupWorkerDone", self.myId).detail("BackupEpoch", self.backupEpoch);
|
2019-08-15 08:00:20 +08:00
|
|
|
// Notify master so that this worker can be removed from log system, then this
|
|
|
|
// worker (for an old epoch's unfinished work) can safely exit.
|
2022-01-07 04:15:51 +08:00
|
|
|
wait(brokenPromiseToNever(db->get().clusterInterface.notifyBackupWorkerDone.getReply(
|
2019-09-23 04:23:27 +08:00
|
|
|
BackupWorkerDoneRequest(self.myId, self.backupEpoch))));
|
2019-08-15 08:00:20 +08:00
|
|
|
break;
|
2019-06-01 07:14:58 +08:00
|
|
|
}
|
2019-05-16 07:13:04 +08:00
|
|
|
when(wait(error)) {}
|
2019-04-25 06:12:37 +08:00
|
|
|
}
|
2019-05-16 07:13:04 +08:00
|
|
|
} catch (Error& e) {
|
2020-03-05 04:32:06 +08:00
|
|
|
state Error err = e;
|
|
|
|
if (e.code() == error_code_worker_removed) {
|
|
|
|
pull = Void(); // cancels pulling
|
2020-03-24 09:48:06 +08:00
|
|
|
self.stop();
|
2020-06-28 12:20:46 +08:00
|
|
|
try {
|
|
|
|
wait(done);
|
|
|
|
} catch (Error& e) {
|
2022-02-25 04:25:52 +08:00
|
|
|
TraceEvent("BackupWorkerShutdownError", self.myId).errorUnsuppressed(e);
|
2020-06-28 12:20:46 +08:00
|
|
|
}
|
2020-03-05 04:32:06 +08:00
|
|
|
}
|
2022-02-25 04:25:52 +08:00
|
|
|
TraceEvent("BackupWorkerTerminated", self.myId).errorUnsuppressed(err);
|
2020-03-05 04:32:06 +08:00
|
|
|
if (err.code() != error_code_actor_cancelled && err.code() != error_code_worker_removed) {
|
|
|
|
throw err;
|
2019-04-25 06:12:37 +08:00
|
|
|
}
|
|
|
|
}
|
|
|
|
return Void();
|
2020-02-11 05:44:08 +08:00
|
|
|
}
|