diff --git a/fdbclient/ServerKnobs.cpp b/fdbclient/ServerKnobs.cpp index f1618eebad..e8bdfc1bcf 100644 --- a/fdbclient/ServerKnobs.cpp +++ b/fdbclient/ServerKnobs.cpp @@ -673,6 +673,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi init( FETCH_KEYS_PARALLELISM, 2 ); init( FETCH_KEYS_LOWER_PRIORITY, 0 ); init( FETCH_CHANGEFEED_PARALLELISM, 2 ); + init( SERVE_FETCH_CHECKPOINT_PARALLELISM, 4 ); init( BUGGIFY_BLOCK_BYTES, 10000 ); init( STORAGE_RECOVERY_VERSION_LAG_LIMIT, 2 * MAX_READ_TRANSACTION_LIFE_VERSIONS ); init( STORAGE_COMMIT_BYTES, 10000000 ); if( randomize && BUGGIFY ) STORAGE_COMMIT_BYTES = 2000000; diff --git a/fdbclient/ServerKnobs.h b/fdbclient/ServerKnobs.h index 19273218fa..84b320a356 100644 --- a/fdbclient/ServerKnobs.h +++ b/fdbclient/ServerKnobs.h @@ -622,6 +622,7 @@ public: int FETCH_KEYS_PARALLELISM; int FETCH_KEYS_LOWER_PRIORITY; int FETCH_CHANGEFEED_PARALLELISM; + int SERVE_FETCH_CHECKPOINT_PARALLELISM; int BUGGIFY_BLOCK_BYTES; int64_t STORAGE_RECOVERY_VERSION_LAG_LIMIT; double STORAGE_DURABILITY_LAG_REJECT_THRESHOLD; diff --git a/fdbserver/storageserver.actor.cpp b/fdbserver/storageserver.actor.cpp index 1db8115a62..397ed0ed2a 100644 --- a/fdbserver/storageserver.actor.cpp +++ b/fdbserver/storageserver.actor.cpp @@ -62,6 +62,7 @@ #include "fdbserver/MoveKeys.actor.h" #include "fdbserver/MutationTracking.h" #include "fdbserver/RecoveryState.h" +#include "fdbserver/RocksDBCheckpointUtils.actor.h" #include "fdbserver/StorageMetrics.h" #include "fdbserver/ServerCheckpoint.actor.h" #include "fdbserver/ServerDBInfo.h" @@ -846,6 +847,8 @@ public: AsyncVar fetchKeysBudgetUsed; std::vector> readyFetchKeys; + FlowLock serveFetchCheckpointParallelismLock; + int64_t instanceID; Promise otherError; @@ -991,6 +994,12 @@ public: }); specialCounter( cc, "FetchChangeFeedWaiting", [self]() { return self->fetchChangeFeedParallelismLock.waiters(); }); + specialCounter(cc, "ServeFetchCheckpointActive", [self]() { + return self->serveFetchCheckpointParallelismLock.activePermits(); + }); + specialCounter(cc, "ServeFetchCheckpointWaiting", [self]() { + return self->serveFetchCheckpointParallelismLock.waiters(); + }); specialCounter(cc, "QueryQueueMax", [self]() { return self->getAndResetMaxQueryQueueSize(); }); specialCounter(cc, "BytesStored", [self]() { return self->metrics.byteSample.getEstimate(allKeys); }); specialCounter(cc, "ActiveWatches", [self]() { return self->numWatches; }); @@ -1046,6 +1055,7 @@ public: fetchKeysParallelismLock(SERVER_KNOBS->FETCH_KEYS_PARALLELISM), fetchChangeFeedParallelismLock(SERVER_KNOBS->FETCH_KEYS_PARALLELISM), fetchKeysBytesBudget(SERVER_KNOBS->STORAGE_FETCH_BYTES), fetchKeysBudgetUsed(false), + serveFetchCheckpointParallelismLock(SERVER_KNOBS->SERVE_FETCH_CHECKPOINT_PARALLELISM), instanceID(deterministicRandom()->randomUniqueID().first()), shuttingDown(false), behind(false), versionBehind(false), debug_inApplyUpdate(false), debug_lastValidateTime(0), lastBytesInputEBrake(0), lastDurableVersionEBrake(0), maxQueryQueue(0), transactionTagCounter(ssi.id()), counters(this), @@ -1863,6 +1873,8 @@ ACTOR Future getCheckpointQ(StorageServer* self, GetCheckpointRequest req) // Delete the checkpoint from disk, as well as all related presisted meta data. ACTOR Future deleteCheckpointQ(StorageServer* self, Version version, CheckpointMetaData checkpoint) { + wait(delay(0, TaskPriority::Low)); + wait(self->durableVersion.whenAtLeast(version)); TraceEvent("DeleteCheckpointBegin", self->thisServerID).detail("Checkpoint", checkpoint.toString()); @@ -1937,6 +1949,75 @@ ACTOR Future fetchCheckpointQ(StorageServer* self, FetchCheckpointRequest return Void(); } +// Serves FetchCheckpointKeyValuesRequest, reads local checkpoint and sends it to the client over wire. +ACTOR Future fetchCheckpointKeyValuesQ(StorageServer* self, FetchCheckpointKeyValuesRequest req) { + wait(self->serveFetchCheckpointParallelismLock.take(TaskPriority::DefaultYield)); + state FlowLock::Releaser holder(self->serveFetchCheckpointParallelismLock); + + TraceEvent("ServeFetchCheckpointKeyValuesBegin", self->thisServerID) + .detail("CheckpointID", req.checkpointID) + .detail("Range", req.range); + + req.reply.setByteLimit(SERVER_KNOBS->CHECKPOINT_TRANSFER_BLOCK_BYTES); + + // Returns error is the checkpoint cannot be found. + const auto it = self->checkpoints.find(req.checkpointID); + if (it == self->checkpoints.end()) { + req.reply.sendError(checkpoint_not_found()); + TraceEvent("ServeFetchCheckpointNotFound", self->thisServerID).detail("CheckpointID", req.checkpointID); + return Void(); + } + + try { + state ICheckpointReader* reader = newCheckpointReader(it->second, self->thisServerID); + wait(reader->init(BinaryWriter::toValue(req.range, IncludeVersion()))); + + loop { + state RangeResult res = + wait(reader->nextKeyValues(CLIENT_KNOBS->REPLY_BYTE_LIMIT, CLIENT_KNOBS->REPLY_BYTE_LIMIT)); + if (!res.empty()) { + TraceEvent(SevDebug, "FetchCheckpontKeyValuesReadRange", self->thisServerID) + .detail("CheckpointID", req.checkpointID) + .detail("Begin", res.front().key) + .detail("End", res.back().key) + .detail("Size", res.size()); + } else { + TraceEvent(SevWarn, "FetchCheckpontKeyValuesEmptyRange", self->thisServerID) + .detail("CheckpointID", req.checkpointID); + } + + wait(req.reply.onReady()); + FetchCheckpointKeyValuesStreamReply reply; + reply.arena.dependsOn(res.arena()); + // reply.data.reserve(reply.arena, res.size()); + for (int i = 0; i < res.size(); ++i) { + reply.data.push_back(reply.arena, res[i]); + } + + req.reply.send(reply); + } + } catch (Error& e) { + if (e.code() == error_code_end_of_stream) { + req.reply.sendError(end_of_stream()); + TraceEvent("ServeFetchCheckpointKeyValuesEnd", self->thisServerID) + .detail("CheckpointID", req.checkpointID) + .detail("Range", req.range); + } else { + TraceEvent(SevWarnAlways, "ServerFetchCheckpointKeyValuesFailure") + .errorUnsuppressed(e) + .detail("CheckpointID", req.checkpointID) + .detail("Range", req.range); + if (!canReplyWith(e)) { + throw e; + } + req.reply.sendError(e); + } + } + + wait(reader->close()); + return Void(); +} + ACTOR Future overlappingChangeFeedsQ(StorageServer* data, OverlappingChangeFeedsRequest req) { wait(delay(0)); wait(data->version.whenAtLeast(req.minVersion)); @@ -7399,12 +7480,14 @@ ACTOR Future createCheckpoint(StorageServer* data, CheckpointMetaData meta data->storage.clearRange(singleKeyRange(pendingCheckpointKey)); data->storage.writeKeyValue(KeyValueRef(persistCheckpointKey, checkpointValue(checkpointResult))); wait(data->storage.commit()); + TraceEvent("StorageCreateCheckpointPersisted", data->thisServerID) + .detail("Checkpoint", checkpointResult.toString()); } catch (Error& e) { // If the checkpoint meta data is not persisted successfully, remove the checkpoint. - TraceEvent("StorageCreateCheckpointPersistFailure", data->thisServerID) + TraceEvent(SevWarn, "StorageCreateCheckpointPersistFailure", data->thisServerID) .errorUnsuppressed(e) .detail("Checkpoint", checkpointResult.toString()); - data->checkpoints.erase(checkpointResult.checkpointID); + data->checkpoints[checkpointResult.checkpointID].setState(CheckpointMetaData::Deleting); data->actors.add(deleteCheckpointQ(data, metaData.version, checkpointResult)); } @@ -7453,6 +7536,7 @@ ACTOR Future updateStorage(StorageServer* data) { if (cVer <= desiredVersion) { TraceEvent("CheckpointVersionSatisfied", data->thisServerID) .detail("DesiredVersion", desiredVersion) + .detail("DurableVersion", data->durableVersion.get()) .detail("CheckPointVersion", cVer); desiredVersion = cVer; requireCheckpoint = true; @@ -7553,13 +7637,20 @@ ACTOR Future updateStorage(StorageServer* data) { debug_advanceMinCommittedVersion(data->thisServerID, newOldestVersion); if (requireCheckpoint) { - ASSERT(newOldestVersion == data->pendingCheckpoints.begin()->first); - std::vector> createCheckpoints; - for (int idx = 0; idx < data->pendingCheckpoints.begin()->second.size(); ++idx) { - createCheckpoints.push_back(createCheckpoint(data, data->pendingCheckpoints.begin()->second[idx])); + TraceEvent(SevDebug, "CheckpointVersionDurable", data->thisServerID) + .detail("NewDurableVersion", newOldestVersion) + .detail("DesiredVersion", desiredVersion) + .detail("SmallestCheckPointVersion", data->pendingCheckpoints.begin()->first); + ASSERT(newOldestVersion <= data->pendingCheckpoints.begin()->first); + if (newOldestVersion == data->pendingCheckpoints.begin()->first) { + std::vector> createCheckpoints; + // TODO: Combine these checkpoints if necessary. + for (int idx = 0; idx < data->pendingCheckpoints.begin()->second.size(); ++idx) { + createCheckpoints.push_back(createCheckpoint(data, data->pendingCheckpoints.begin()->second[idx])); + } + wait(waitForAll(createCheckpoints)); + data->pendingCheckpoints.erase(data->pendingCheckpoints.begin()); } - wait(waitForAll(createCheckpoints)); - data->pendingCheckpoints.erase(data->pendingCheckpoints.begin()); requireCheckpoint = false; } @@ -8926,6 +9017,9 @@ ACTOR Future storageServerCore(StorageServer* self, StorageServerInterface when(FetchCheckpointRequest req = waitNext(ssi.fetchCheckpoint.getFuture())) { self->actors.add(fetchCheckpointQ(self, req)); } + when(FetchCheckpointKeyValuesRequest req = waitNext(ssi.fetchCheckpointKeyValues.getFuture())) { + self->actors.add(fetchCheckpointKeyValuesQ(self, req)); + } when(wait(updateProcessStatsTimer)) { updateProcessStats(self); updateProcessStatsTimer = delay(SERVER_KNOBS->FASTRESTORE_UPDATE_PROCESS_STATS_INTERVAL); diff --git a/flow/error_definitions.h b/flow/error_definitions.h index a0c34a24b4..3631c1cb36 100755 --- a/flow/error_definitions.h +++ b/flow/error_definitions.h @@ -195,6 +195,7 @@ ERROR( checkpoint_not_found, 2040, "Checkpoint not found" ) ERROR( key_not_tuple, 2041, "The key cannot be parsed as a tuple" ); ERROR( value_not_tuple, 2042, "The value cannot be parsed as a tuple" ); ERROR( mapper_not_tuple, 2043, "The mapper cannot be parsed as a tuple" ); +ERROR( invalid_checkpoint_format, 2044, "Invalid checkpoint format" ) ERROR( incompatible_protocol_version, 2100, "Incompatible protocol version" )