Added fetchCheckpointKeyValuesQ in storage server.
This commit is contained in:
parent
05bbe174c3
commit
bc509d9572
|
@ -673,6 +673,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
|
|||
init( FETCH_KEYS_PARALLELISM, 2 );
|
||||
init( FETCH_KEYS_LOWER_PRIORITY, 0 );
|
||||
init( FETCH_CHANGEFEED_PARALLELISM, 2 );
|
||||
init( SERVE_FETCH_CHECKPOINT_PARALLELISM, 4 );
|
||||
init( BUGGIFY_BLOCK_BYTES, 10000 );
|
||||
init( STORAGE_RECOVERY_VERSION_LAG_LIMIT, 2 * MAX_READ_TRANSACTION_LIFE_VERSIONS );
|
||||
init( STORAGE_COMMIT_BYTES, 10000000 ); if( randomize && BUGGIFY ) STORAGE_COMMIT_BYTES = 2000000;
|
||||
|
|
|
@ -622,6 +622,7 @@ public:
|
|||
int FETCH_KEYS_PARALLELISM;
|
||||
int FETCH_KEYS_LOWER_PRIORITY;
|
||||
int FETCH_CHANGEFEED_PARALLELISM;
|
||||
int SERVE_FETCH_CHECKPOINT_PARALLELISM;
|
||||
int BUGGIFY_BLOCK_BYTES;
|
||||
int64_t STORAGE_RECOVERY_VERSION_LAG_LIMIT;
|
||||
double STORAGE_DURABILITY_LAG_REJECT_THRESHOLD;
|
||||
|
|
|
@ -62,6 +62,7 @@
|
|||
#include "fdbserver/MoveKeys.actor.h"
|
||||
#include "fdbserver/MutationTracking.h"
|
||||
#include "fdbserver/RecoveryState.h"
|
||||
#include "fdbserver/RocksDBCheckpointUtils.actor.h"
|
||||
#include "fdbserver/StorageMetrics.h"
|
||||
#include "fdbserver/ServerCheckpoint.actor.h"
|
||||
#include "fdbserver/ServerDBInfo.h"
|
||||
|
@ -846,6 +847,8 @@ public:
|
|||
AsyncVar<bool> fetchKeysBudgetUsed;
|
||||
std::vector<Promise<FetchInjectionInfo*>> readyFetchKeys;
|
||||
|
||||
FlowLock serveFetchCheckpointParallelismLock;
|
||||
|
||||
int64_t instanceID;
|
||||
|
||||
Promise<Void> otherError;
|
||||
|
@ -991,6 +994,12 @@ public:
|
|||
});
|
||||
specialCounter(
|
||||
cc, "FetchChangeFeedWaiting", [self]() { return self->fetchChangeFeedParallelismLock.waiters(); });
|
||||
specialCounter(cc, "ServeFetchCheckpointActive", [self]() {
|
||||
return self->serveFetchCheckpointParallelismLock.activePermits();
|
||||
});
|
||||
specialCounter(cc, "ServeFetchCheckpointWaiting", [self]() {
|
||||
return self->serveFetchCheckpointParallelismLock.waiters();
|
||||
});
|
||||
specialCounter(cc, "QueryQueueMax", [self]() { return self->getAndResetMaxQueryQueueSize(); });
|
||||
specialCounter(cc, "BytesStored", [self]() { return self->metrics.byteSample.getEstimate(allKeys); });
|
||||
specialCounter(cc, "ActiveWatches", [self]() { return self->numWatches; });
|
||||
|
@ -1046,6 +1055,7 @@ public:
|
|||
fetchKeysParallelismLock(SERVER_KNOBS->FETCH_KEYS_PARALLELISM),
|
||||
fetchChangeFeedParallelismLock(SERVER_KNOBS->FETCH_KEYS_PARALLELISM),
|
||||
fetchKeysBytesBudget(SERVER_KNOBS->STORAGE_FETCH_BYTES), fetchKeysBudgetUsed(false),
|
||||
serveFetchCheckpointParallelismLock(SERVER_KNOBS->SERVE_FETCH_CHECKPOINT_PARALLELISM),
|
||||
instanceID(deterministicRandom()->randomUniqueID().first()), shuttingDown(false), behind(false),
|
||||
versionBehind(false), debug_inApplyUpdate(false), debug_lastValidateTime(0), lastBytesInputEBrake(0),
|
||||
lastDurableVersionEBrake(0), maxQueryQueue(0), transactionTagCounter(ssi.id()), counters(this),
|
||||
|
@ -1863,6 +1873,8 @@ ACTOR Future<Void> getCheckpointQ(StorageServer* self, GetCheckpointRequest req)
|
|||
|
||||
// Delete the checkpoint from disk, as well as all related persisted meta data.
|
||||
ACTOR Future<Void> deleteCheckpointQ(StorageServer* self, Version version, CheckpointMetaData checkpoint) {
|
||||
wait(delay(0, TaskPriority::Low));
|
||||
|
||||
wait(self->durableVersion.whenAtLeast(version));
|
||||
|
||||
TraceEvent("DeleteCheckpointBegin", self->thisServerID).detail("Checkpoint", checkpoint.toString());
|
||||
|
@ -1937,6 +1949,75 @@ ACTOR Future<Void> fetchCheckpointQ(StorageServer* self, FetchCheckpointRequest
|
|||
return Void();
|
||||
}
|
||||
|
||||
// Serves FetchCheckpointKeyValuesRequest, reads local checkpoint and sends it to the client over wire.
|
||||
ACTOR Future<Void> fetchCheckpointKeyValuesQ(StorageServer* self, FetchCheckpointKeyValuesRequest req) {
|
||||
wait(self->serveFetchCheckpointParallelismLock.take(TaskPriority::DefaultYield));
|
||||
state FlowLock::Releaser holder(self->serveFetchCheckpointParallelismLock);
|
||||
|
||||
TraceEvent("ServeFetchCheckpointKeyValuesBegin", self->thisServerID)
|
||||
.detail("CheckpointID", req.checkpointID)
|
||||
.detail("Range", req.range);
|
||||
|
||||
req.reply.setByteLimit(SERVER_KNOBS->CHECKPOINT_TRANSFER_BLOCK_BYTES);
|
||||
|
||||
// Returns error is the checkpoint cannot be found.
|
||||
const auto it = self->checkpoints.find(req.checkpointID);
|
||||
if (it == self->checkpoints.end()) {
|
||||
req.reply.sendError(checkpoint_not_found());
|
||||
TraceEvent("ServeFetchCheckpointNotFound", self->thisServerID).detail("CheckpointID", req.checkpointID);
|
||||
return Void();
|
||||
}
|
||||
|
||||
try {
|
||||
state ICheckpointReader* reader = newCheckpointReader(it->second, self->thisServerID);
|
||||
wait(reader->init(BinaryWriter::toValue(req.range, IncludeVersion())));
|
||||
|
||||
loop {
|
||||
state RangeResult res =
|
||||
wait(reader->nextKeyValues(CLIENT_KNOBS->REPLY_BYTE_LIMIT, CLIENT_KNOBS->REPLY_BYTE_LIMIT));
|
||||
if (!res.empty()) {
|
||||
TraceEvent(SevDebug, "FetchCheckpontKeyValuesReadRange", self->thisServerID)
|
||||
.detail("CheckpointID", req.checkpointID)
|
||||
.detail("Begin", res.front().key)
|
||||
.detail("End", res.back().key)
|
||||
.detail("Size", res.size());
|
||||
} else {
|
||||
TraceEvent(SevWarn, "FetchCheckpontKeyValuesEmptyRange", self->thisServerID)
|
||||
.detail("CheckpointID", req.checkpointID);
|
||||
}
|
||||
|
||||
wait(req.reply.onReady());
|
||||
FetchCheckpointKeyValuesStreamReply reply;
|
||||
reply.arena.dependsOn(res.arena());
|
||||
// reply.data.reserve(reply.arena, res.size());
|
||||
for (int i = 0; i < res.size(); ++i) {
|
||||
reply.data.push_back(reply.arena, res[i]);
|
||||
}
|
||||
|
||||
req.reply.send(reply);
|
||||
}
|
||||
} catch (Error& e) {
|
||||
if (e.code() == error_code_end_of_stream) {
|
||||
req.reply.sendError(end_of_stream());
|
||||
TraceEvent("ServeFetchCheckpointKeyValuesEnd", self->thisServerID)
|
||||
.detail("CheckpointID", req.checkpointID)
|
||||
.detail("Range", req.range);
|
||||
} else {
|
||||
TraceEvent(SevWarnAlways, "ServerFetchCheckpointKeyValuesFailure")
|
||||
.errorUnsuppressed(e)
|
||||
.detail("CheckpointID", req.checkpointID)
|
||||
.detail("Range", req.range);
|
||||
if (!canReplyWith(e)) {
|
||||
throw e;
|
||||
}
|
||||
req.reply.sendError(e);
|
||||
}
|
||||
}
|
||||
|
||||
wait(reader->close());
|
||||
return Void();
|
||||
}
|
||||
|
||||
ACTOR Future<Void> overlappingChangeFeedsQ(StorageServer* data, OverlappingChangeFeedsRequest req) {
|
||||
wait(delay(0));
|
||||
wait(data->version.whenAtLeast(req.minVersion));
|
||||
|
@ -7399,12 +7480,14 @@ ACTOR Future<Void> createCheckpoint(StorageServer* data, CheckpointMetaData meta
|
|||
data->storage.clearRange(singleKeyRange(pendingCheckpointKey));
|
||||
data->storage.writeKeyValue(KeyValueRef(persistCheckpointKey, checkpointValue(checkpointResult)));
|
||||
wait(data->storage.commit());
|
||||
TraceEvent("StorageCreateCheckpointPersisted", data->thisServerID)
|
||||
.detail("Checkpoint", checkpointResult.toString());
|
||||
} catch (Error& e) {
|
||||
// If the checkpoint meta data is not persisted successfully, remove the checkpoint.
|
||||
TraceEvent("StorageCreateCheckpointPersistFailure", data->thisServerID)
|
||||
TraceEvent(SevWarn, "StorageCreateCheckpointPersistFailure", data->thisServerID)
|
||||
.errorUnsuppressed(e)
|
||||
.detail("Checkpoint", checkpointResult.toString());
|
||||
data->checkpoints.erase(checkpointResult.checkpointID);
|
||||
data->checkpoints[checkpointResult.checkpointID].setState(CheckpointMetaData::Deleting);
|
||||
data->actors.add(deleteCheckpointQ(data, metaData.version, checkpointResult));
|
||||
}
|
||||
|
||||
|
@ -7453,6 +7536,7 @@ ACTOR Future<Void> updateStorage(StorageServer* data) {
|
|||
if (cVer <= desiredVersion) {
|
||||
TraceEvent("CheckpointVersionSatisfied", data->thisServerID)
|
||||
.detail("DesiredVersion", desiredVersion)
|
||||
.detail("DurableVersion", data->durableVersion.get())
|
||||
.detail("CheckPointVersion", cVer);
|
||||
desiredVersion = cVer;
|
||||
requireCheckpoint = true;
|
||||
|
@ -7553,13 +7637,20 @@ ACTOR Future<Void> updateStorage(StorageServer* data) {
|
|||
debug_advanceMinCommittedVersion(data->thisServerID, newOldestVersion);
|
||||
|
||||
if (requireCheckpoint) {
|
||||
ASSERT(newOldestVersion == data->pendingCheckpoints.begin()->first);
|
||||
std::vector<Future<Void>> createCheckpoints;
|
||||
for (int idx = 0; idx < data->pendingCheckpoints.begin()->second.size(); ++idx) {
|
||||
createCheckpoints.push_back(createCheckpoint(data, data->pendingCheckpoints.begin()->second[idx]));
|
||||
TraceEvent(SevDebug, "CheckpointVersionDurable", data->thisServerID)
|
||||
.detail("NewDurableVersion", newOldestVersion)
|
||||
.detail("DesiredVersion", desiredVersion)
|
||||
.detail("SmallestCheckPointVersion", data->pendingCheckpoints.begin()->first);
|
||||
ASSERT(newOldestVersion <= data->pendingCheckpoints.begin()->first);
|
||||
if (newOldestVersion == data->pendingCheckpoints.begin()->first) {
|
||||
std::vector<Future<Void>> createCheckpoints;
|
||||
// TODO: Combine these checkpoints if necessary.
|
||||
for (int idx = 0; idx < data->pendingCheckpoints.begin()->second.size(); ++idx) {
|
||||
createCheckpoints.push_back(createCheckpoint(data, data->pendingCheckpoints.begin()->second[idx]));
|
||||
}
|
||||
wait(waitForAll(createCheckpoints));
|
||||
data->pendingCheckpoints.erase(data->pendingCheckpoints.begin());
|
||||
}
|
||||
wait(waitForAll(createCheckpoints));
|
||||
data->pendingCheckpoints.erase(data->pendingCheckpoints.begin());
|
||||
requireCheckpoint = false;
|
||||
}
|
||||
|
||||
|
@ -8926,6 +9017,9 @@ ACTOR Future<Void> storageServerCore(StorageServer* self, StorageServerInterface
|
|||
when(FetchCheckpointRequest req = waitNext(ssi.fetchCheckpoint.getFuture())) {
|
||||
self->actors.add(fetchCheckpointQ(self, req));
|
||||
}
|
||||
when(FetchCheckpointKeyValuesRequest req = waitNext(ssi.fetchCheckpointKeyValues.getFuture())) {
|
||||
self->actors.add(fetchCheckpointKeyValuesQ(self, req));
|
||||
}
|
||||
when(wait(updateProcessStatsTimer)) {
|
||||
updateProcessStats(self);
|
||||
updateProcessStatsTimer = delay(SERVER_KNOBS->FASTRESTORE_UPDATE_PROCESS_STATS_INTERVAL);
|
||||
|
|
|
@ -195,6 +195,7 @@ ERROR( checkpoint_not_found, 2040, "Checkpoint not found" )
|
|||
ERROR( key_not_tuple, 2041, "The key cannot be parsed as a tuple" );
|
||||
ERROR( value_not_tuple, 2042, "The value cannot be parsed as a tuple" );
|
||||
ERROR( mapper_not_tuple, 2043, "The mapper cannot be parsed as a tuple" );
|
||||
ERROR( invalid_checkpoint_format, 2044, "Invalid checkpoint format" )
|
||||
|
||||
|
||||
ERROR( incompatible_protocol_version, 2100, "Incompatible protocol version" )
|
||||
|
|
Loading…
Reference in New Issue