Ss shard management (#7340)
* Storage server shard management with physical shards. * Cleanup. * Resolved comments. * Added `UnlimintedCommitBytes`. Co-authored-by: He Liu <heliu@apple.com>
This commit is contained in:
parent
c759357687
commit
7a8be255cd
|
@ -740,6 +740,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
|
|||
init( MAX_PARALLEL_QUICK_GET_VALUE, 50 ); if ( randomize && BUGGIFY ) MAX_PARALLEL_QUICK_GET_VALUE = deterministicRandom()->randomInt(1, 100);
|
||||
init( QUICK_GET_KEY_VALUES_LIMIT, 2000 );
|
||||
init( QUICK_GET_KEY_VALUES_LIMIT_BYTES, 1e7 );
|
||||
init( STORAGE_SERVER_SHARD_AWARE, true );
|
||||
|
||||
//Wait Failure
|
||||
init( MAX_OUTSTANDING_WAIT_FAILURE_REQUESTS, 250 ); if( randomize && BUGGIFY ) MAX_OUTSTANDING_WAIT_FAILURE_REQUESTS = 2;
|
||||
|
|
|
@ -475,18 +475,21 @@ const Value serverKeysValue(const UID& id) {
|
|||
|
||||
void decodeServerKeysValue(const ValueRef& value, bool& assigned, bool& emptyRange, UID& id) {
|
||||
if (value.size() == 0) {
|
||||
id = UID();
|
||||
assigned = false;
|
||||
emptyRange = false;
|
||||
id = UID();
|
||||
} else if (value == serverKeysTrue) {
|
||||
assigned = true;
|
||||
emptyRange = false;
|
||||
id = anonymousShardId;
|
||||
} else if (value == serverKeysTrueEmptyRange) {
|
||||
assigned = true;
|
||||
emptyRange = true;
|
||||
id = anonymousShardId;
|
||||
} else if (value == serverKeysFalse) {
|
||||
assigned = false;
|
||||
emptyRange = false;
|
||||
id = UID();
|
||||
} else {
|
||||
BinaryReader rd(value, IncludeVersion());
|
||||
ASSERT(rd.protocolVersion().hasShardEncodeLocationMetaData());
|
||||
|
|
|
@ -698,6 +698,7 @@ public:
|
|||
int CHECKPOINT_TRANSFER_BLOCK_BYTES;
|
||||
int QUICK_GET_KEY_VALUES_LIMIT;
|
||||
int QUICK_GET_KEY_VALUES_LIMIT_BYTES;
|
||||
bool STORAGE_SERVER_SHARD_AWARE;
|
||||
|
||||
// Wait Failure
|
||||
int MAX_OUTSTANDING_WAIT_FAILURE_REQUESTS;
|
||||
|
|
|
@ -25,6 +25,7 @@
|
|||
#include <ostream>
|
||||
#include "fdbclient/FDBTypes.h"
|
||||
#include "fdbclient/StorageCheckpoint.h"
|
||||
#include "fdbclient/StorageServerShard.h"
|
||||
#include "fdbrpc/Locality.h"
|
||||
#include "fdbrpc/QueueModel.h"
|
||||
#include "fdbrpc/fdbrpc.h"
|
||||
|
@ -572,12 +573,13 @@ struct GetShardStateReply {
|
|||
|
||||
Version first;
|
||||
Version second;
|
||||
std::vector<StorageServerShard> shards;
|
||||
GetShardStateReply() = default;
|
||||
GetShardStateReply(Version first, Version second) : first(first), second(second) {}
|
||||
|
||||
template <class Ar>
|
||||
void serialize(Ar& ar) {
|
||||
serializer(ar, first, second);
|
||||
serializer(ar, first, second, shards);
|
||||
}
|
||||
};
|
||||
|
||||
|
@ -587,13 +589,16 @@ struct GetShardStateRequest {
|
|||
|
||||
KeyRange keys;
|
||||
int32_t mode;
|
||||
bool includePhysicalShard;
|
||||
ReplyPromise<GetShardStateReply> reply;
|
||||
GetShardStateRequest() {}
|
||||
GetShardStateRequest(KeyRange const& keys, waitMode mode) : keys(keys), mode(mode) {}
|
||||
GetShardStateRequest() = default;
|
||||
GetShardStateRequest(KeyRange const& keys, waitMode mode, bool includePhysicalShard)
|
||||
: keys(keys), mode(mode), includePhysicalShard(includePhysicalShard) {}
|
||||
GetShardStateRequest(KeyRange const& keys, waitMode mode) : keys(keys), mode(mode), includePhysicalShard(false) {}
|
||||
|
||||
template <class Ar>
|
||||
void serialize(Ar& ar) {
|
||||
serializer(ar, keys, mode, reply);
|
||||
serializer(ar, keys, mode, reply, includePhysicalShard);
|
||||
}
|
||||
};
|
||||
|
||||
|
|
|
@ -0,0 +1,88 @@
|
|||
/*
|
||||
* StorageServerShard.h
|
||||
*
|
||||
* This source file is part of the FoundationDB open source project
|
||||
*
|
||||
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#ifndef FDBCLIENT_STORAGESERVERSHARD_H
|
||||
#define FDBCLIENT_STORAGESERVERSHARD_H
|
||||
#pragma once
|
||||
|
||||
#include "fdbclient/FDBTypes.h"
|
||||
#include "flow/flow.h"
|
||||
|
||||
// Represents a data shard on a storage server hosting a continuous keyrange.
|
||||
struct StorageServerShard {
|
||||
constexpr static FileIdentifier file_identifier = 4028358;
|
||||
|
||||
enum ShardState {
|
||||
NotAssigned = 0,
|
||||
MovingIn = 1,
|
||||
ReadWritePending = 2,
|
||||
ReadWrite = 3,
|
||||
};
|
||||
|
||||
StorageServerShard() = default;
|
||||
StorageServerShard(KeyRange range,
|
||||
Version version,
|
||||
const uint64_t id,
|
||||
const uint64_t desiredId,
|
||||
ShardState shardState)
|
||||
: range(range), version(version), id(id), desiredId(desiredId), shardState(shardState) {}
|
||||
|
||||
static StorageServerShard notAssigned(KeyRange range, Version version = 0) {
|
||||
return StorageServerShard(range, version, 0, 0, NotAssigned);
|
||||
}
|
||||
|
||||
ShardState getShardState() const { return static_cast<ShardState>(this->shardState); };
|
||||
|
||||
void setShardState(const ShardState shardState) { this->shardState = static_cast<int8_t>(shardState); }
|
||||
|
||||
std::string getShardStateString() const {
|
||||
const ShardState ss = getShardState();
|
||||
switch (ss) {
|
||||
case NotAssigned:
|
||||
return "NotAssigned";
|
||||
case MovingIn:
|
||||
return "MovingIn";
|
||||
case ReadWritePending:
|
||||
return "ReadWritePending";
|
||||
case ReadWrite:
|
||||
return "ReadWrite";
|
||||
}
|
||||
return "InvalidState";
|
||||
}
|
||||
|
||||
std::string toString() const {
|
||||
return "StorageServerShard: [Range]: " + Traceable<KeyRangeRef>::toString(range) +
|
||||
" [Shard ID]: " + format("%016llx", this->id) + " [Version]: " + std::to_string(version) +
|
||||
" [State]: " + getShardStateString() + " [Desired Shard ID]: " + format("%016llx", this->desiredId);
|
||||
}
|
||||
|
||||
template <class Ar>
|
||||
void serialize(Ar& ar) {
|
||||
serializer(ar, range, version, id, desiredId, shardState);
|
||||
}
|
||||
|
||||
KeyRange range;
|
||||
Version version; // Shard creation version.
|
||||
uint64_t id; // The actual shard ID.
|
||||
uint64_t desiredId; // The intended shard ID.
|
||||
int8_t shardState;
|
||||
};
|
||||
|
||||
#endif
|
|
@ -2217,6 +2217,8 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore {
|
|||
|
||||
KeyValueStoreType getType() const override { return KeyValueStoreType(KeyValueStoreType::SSD_SHARDED_ROCKSDB); }
|
||||
|
||||
bool shardAware() const override { return true; }
|
||||
|
||||
Future<Void> init() override {
|
||||
if (openFuture.isValid()) {
|
||||
return openFuture;
|
||||
|
|
|
@ -61,6 +61,9 @@ public:
|
|||
class IKeyValueStore : public IClosable {
|
||||
public:
|
||||
virtual KeyValueStoreType getType() const = 0;
|
||||
// Returns true if the KV store supports shards, i.e., implements addRange(), removeRange(), and
|
||||
// persistRangeMapping().
|
||||
virtual bool shardAware() const { return false; }
|
||||
virtual void set(KeyValueRef keyValue, const Arena* arena = nullptr) = 0;
|
||||
virtual void clear(KeyRangeRef range, const Arena* arena = nullptr) = 0;
|
||||
virtual Future<Void> canCommit() { return Void(); }
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
*PhysicalShardMove.actor.cpp
|
||||
* PhysicalShardMove.cpp
|
||||
*
|
||||
* This source file is part of the FoundationDB open source project
|
||||
*
|
||||
|
@ -43,11 +43,14 @@ std::string printValue(const ErrorOr<Optional<Value>>& value) {
|
|||
}
|
||||
} // namespace
|
||||
|
||||
struct SSCheckpointWorkload : TestWorkload {
|
||||
struct PhysicalShardMoveWorkLoad : TestWorkload {
|
||||
FlowLock startMoveKeysParallelismLock;
|
||||
FlowLock finishMoveKeysParallelismLock;
|
||||
FlowLock cleanUpDataMoveParallelismLock;
|
||||
const bool enabled;
|
||||
bool pass;
|
||||
|
||||
SSCheckpointWorkload(WorkloadContext const& wcx) : TestWorkload(wcx), enabled(!clientId), pass(true) {}
|
||||
PhysicalShardMoveWorkLoad(WorkloadContext const& wcx) : TestWorkload(wcx), enabled(!clientId), pass(true) {}
|
||||
|
||||
void validationFailed(ErrorOr<Optional<Value>> expectedValue, ErrorOr<Optional<Value>> actualValue) {
|
||||
TraceEvent(SevError, "TestFailed")
|
||||
|
@ -56,7 +59,7 @@ struct SSCheckpointWorkload : TestWorkload {
|
|||
pass = false;
|
||||
}
|
||||
|
||||
std::string description() const override { return "SSCheckpoint"; }
|
||||
std::string description() const override { return "PhysicalShardMove"; }
|
||||
|
||||
Future<Void> setup(Database const& cx) override { return Void(); }
|
||||
|
||||
|
@ -67,126 +70,153 @@ struct SSCheckpointWorkload : TestWorkload {
|
|||
return _start(this, cx);
|
||||
}
|
||||
|
||||
ACTOR Future<Void> _start(SSCheckpointWorkload* self, Database cx) {
|
||||
state Key key = "TestKey"_sr;
|
||||
state Key endKey = "TestKey0"_sr;
|
||||
state Value oldValue = "TestValue"_sr;
|
||||
state KeyRange testRange = KeyRangeRef(key, endKey);
|
||||
|
||||
ACTOR Future<Void> _start(PhysicalShardMoveWorkLoad* self, Database cx) {
|
||||
int ignore = wait(setDDMode(cx, 0));
|
||||
state Version version = wait(self->writeAndVerify(self, cx, key, oldValue));
|
||||
state std::map<Key, Value> kvs({ { "TestKeyA"_sr, "TestValueA"_sr },
|
||||
{ "TestKeyB"_sr, "TestValueB"_sr },
|
||||
{ "TestKeyC"_sr, "TestValueC"_sr },
|
||||
{ "TestKeyD"_sr, "TestValueD"_sr },
|
||||
{ "TestKeyE"_sr, "TestValueE"_sr },
|
||||
{ "TestKeyF"_sr, "TestValueF"_sr } });
|
||||
|
||||
// Create checkpoint.
|
||||
state Transaction tr(cx);
|
||||
state CheckpointFormat format = deterministicRandom()->coinflip() ? RocksDBColumnFamily : RocksDB;
|
||||
loop {
|
||||
try {
|
||||
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
|
||||
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
wait(createCheckpoint(&tr, testRange, format));
|
||||
wait(tr.commit());
|
||||
version = tr.getCommittedVersion();
|
||||
break;
|
||||
} catch (Error& e) {
|
||||
wait(tr.onError(e));
|
||||
}
|
||||
Version _ = wait(self->populateData(self, cx, &kvs));
|
||||
|
||||
TraceEvent("TestValueWritten").log();
|
||||
|
||||
state std::unordered_set<UID> excludes;
|
||||
state std::unordered_set<UID> includes;
|
||||
state int teamSize = 1;
|
||||
std::vector<UID> teamA = wait(self->moveShard(self,
|
||||
cx,
|
||||
deterministicRandom()->randomUniqueID(),
|
||||
KeyRangeRef("TestKeyA"_sr, "TestKeyF"_sr),
|
||||
teamSize,
|
||||
includes,
|
||||
excludes));
|
||||
excludes.insert(teamA.begin(), teamA.end());
|
||||
|
||||
state uint64_t sh0 = deterministicRandom()->randomUInt64();
|
||||
state uint64_t sh1 = deterministicRandom()->randomUInt64();
|
||||
state uint64_t sh2 = deterministicRandom()->randomUInt64();
|
||||
|
||||
// Move range [TestKeyA, TestKeyB) to sh0.
|
||||
state std::vector<UID> teamA = wait(self->moveShard(self,
|
||||
cx,
|
||||
UID(sh0, deterministicRandom()->randomUInt64()),
|
||||
KeyRangeRef("TestKeyA"_sr, "TestKeyB"_sr),
|
||||
teamSize,
|
||||
includes,
|
||||
excludes));
|
||||
includes.insert(teamA.begin(), teamA.end());
|
||||
// Move range [TestKeyB, TestKeyC) to sh1, on the same server.
|
||||
state std::vector<UID> teamB = wait(self->moveShard(self,
|
||||
cx,
|
||||
UID(sh1, deterministicRandom()->randomUInt64()),
|
||||
KeyRangeRef("TestKeyB"_sr, "TestKeyC"_sr),
|
||||
teamSize,
|
||||
includes,
|
||||
excludes));
|
||||
ASSERT(std::equal(teamA.begin(), teamA.end(), teamB.begin()));
|
||||
|
||||
state int teamIdx = 0;
|
||||
for (teamIdx = 0; teamIdx < teamA.size(); ++teamIdx) {
|
||||
std::vector<StorageServerShard> shards =
|
||||
wait(self->getStorageServerShards(cx, teamA[teamIdx], KeyRangeRef("TestKeyA"_sr, "TestKeyC"_sr)));
|
||||
ASSERT(shards.size() == 2);
|
||||
ASSERT(shards[0].desiredId == sh0);
|
||||
ASSERT(shards[1].desiredId == sh1);
|
||||
TraceEvent("TestStorageServerShards", teamA[teamIdx]).detail("Shards", describe(shards));
|
||||
}
|
||||
|
||||
TraceEvent("TestCheckpointCreated").detail("Range", testRange).detail("Version", version);
|
||||
state std::vector<UID> teamC = wait(self->moveShard(self,
|
||||
cx,
|
||||
UID(sh2, deterministicRandom()->randomUInt64()),
|
||||
KeyRangeRef("TestKeyB"_sr, "TestKeyC"_sr),
|
||||
teamSize,
|
||||
includes,
|
||||
excludes));
|
||||
ASSERT(std::equal(teamA.begin(), teamA.end(), teamC.begin()));
|
||||
|
||||
// Fetch checkpoint meta data.
|
||||
loop {
|
||||
try {
|
||||
state std::vector<CheckpointMetaData> records =
|
||||
wait(getCheckpointMetaData(cx, testRange, version, format));
|
||||
break;
|
||||
} catch (Error& e) {
|
||||
TraceEvent("TestFetchCheckpointMetadataError")
|
||||
.errorUnsuppressed(e)
|
||||
.detail("Range", testRange)
|
||||
.detail("Version", version);
|
||||
|
||||
// The checkpoint was just created, we don't expect this error.
|
||||
ASSERT(e.code() != error_code_checkpoint_not_found);
|
||||
}
|
||||
for (teamIdx = 0; teamIdx < teamA.size(); ++teamIdx) {
|
||||
std::vector<StorageServerShard> shards =
|
||||
wait(self->getStorageServerShards(cx, teamA[teamIdx], KeyRangeRef("TestKeyA"_sr, "TestKeyC"_sr)));
|
||||
ASSERT(shards.size() == 2);
|
||||
ASSERT(shards[0].desiredId == sh0);
|
||||
ASSERT(shards[1].id == sh1);
|
||||
ASSERT(shards[1].desiredId == sh2);
|
||||
TraceEvent("TestStorageServerShards", teamA[teamIdx]).detail("Shards", describe(shards));
|
||||
}
|
||||
|
||||
TraceEvent("TestCheckpointFetched")
|
||||
.detail("Range", testRange)
|
||||
.detail("Version", version)
|
||||
.detail("Checkpoints", describe(records));
|
||||
|
||||
state std::string pwd = platform::getWorkingDirectory();
|
||||
state std::string folder = pwd + "/checkpoints";
|
||||
platform::eraseDirectoryRecursive(folder);
|
||||
ASSERT(platform::createDirectory(folder));
|
||||
|
||||
// Fetch checkpoint.
|
||||
state std::vector<CheckpointMetaData> fetchedCheckpoints;
|
||||
state int i = 0;
|
||||
for (; i < records.size(); ++i) {
|
||||
loop {
|
||||
TraceEvent("TestFetchingCheckpoint").detail("Checkpoint", records[i].toString());
|
||||
try {
|
||||
state CheckpointMetaData record = wait(fetchCheckpoint(cx, records[0], folder));
|
||||
fetchedCheckpoints.push_back(record);
|
||||
TraceEvent("TestCheckpointFetched").detail("Checkpoint", record.toString());
|
||||
break;
|
||||
} catch (Error& e) {
|
||||
TraceEvent("TestFetchCheckpointError")
|
||||
.errorUnsuppressed(e)
|
||||
.detail("Checkpoint", records[i].toString());
|
||||
wait(delay(1));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
state std::string rocksDBTestDir = "rocksdb-kvstore-test-db";
|
||||
platform::eraseDirectoryRecursive(rocksDBTestDir);
|
||||
|
||||
// Restore KVS.
|
||||
state IKeyValueStore* kvStore = keyValueStoreRocksDB(
|
||||
rocksDBTestDir, deterministicRandom()->randomUniqueID(), KeyValueStoreType::SSD_ROCKSDB_V1);
|
||||
wait(kvStore->init());
|
||||
try {
|
||||
wait(kvStore->restore(fetchedCheckpoints));
|
||||
} catch (Error& e) {
|
||||
TraceEvent(SevError, "TestRestoreCheckpointError")
|
||||
.errorUnsuppressed(e)
|
||||
.detail("Checkpoint", describe(records));
|
||||
}
|
||||
|
||||
// Compare the keyrange between the original database and the one restored from checkpoint.
|
||||
// For now, it should have been a single key.
|
||||
tr.reset();
|
||||
loop {
|
||||
try {
|
||||
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
|
||||
state RangeResult res = wait(tr.getRange(KeyRangeRef(key, endKey), CLIENT_KNOBS->TOO_MANY));
|
||||
break;
|
||||
} catch (Error& e) {
|
||||
wait(tr.onError(e));
|
||||
}
|
||||
}
|
||||
|
||||
RangeResult kvRange = wait(kvStore->readRange(testRange));
|
||||
ASSERT(res.size() == kvRange.size());
|
||||
for (int i = 0; i < res.size(); ++i) {
|
||||
ASSERT(res[i] == kvRange[i]);
|
||||
}
|
||||
wait(self->validateData(self, cx, KeyRangeRef("TestKeyA"_sr, "TestKeyF"_sr), &kvs));
|
||||
TraceEvent("TestValueVerified").log();
|
||||
|
||||
int ignore = wait(setDDMode(cx, 1));
|
||||
return Void();
|
||||
}
|
||||
|
||||
ACTOR Future<Void> readAndVerify(SSCheckpointWorkload* self,
|
||||
ACTOR Future<Version> populateData(PhysicalShardMoveWorkLoad* self, Database cx, std::map<Key, Value>* kvs) {
|
||||
state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(cx);
|
||||
state Version version;
|
||||
loop {
|
||||
state UID debugID = deterministicRandom()->randomUniqueID();
|
||||
try {
|
||||
tr->debugTransaction(debugID);
|
||||
for (const auto& [key, value] : *kvs) {
|
||||
tr->set(key, value);
|
||||
}
|
||||
wait(tr->commit());
|
||||
version = tr->getCommittedVersion();
|
||||
break;
|
||||
} catch (Error& e) {
|
||||
TraceEvent("TestCommitError").errorUnsuppressed(e);
|
||||
wait(tr->onError(e));
|
||||
}
|
||||
}
|
||||
|
||||
TraceEvent("PopulateTestDataDone")
|
||||
.detail("CommitVersion", tr->getCommittedVersion())
|
||||
.detail("DebugID", debugID);
|
||||
|
||||
return version;
|
||||
}
|
||||
|
||||
ACTOR Future<Void> validateData(PhysicalShardMoveWorkLoad* self,
|
||||
Database cx,
|
||||
KeyRange range,
|
||||
std::map<Key, Value>* kvs) {
|
||||
state Transaction tr(cx);
|
||||
loop {
|
||||
state UID debugID = deterministicRandom()->randomUniqueID();
|
||||
try {
|
||||
tr.debugTransaction(debugID);
|
||||
RangeResult res = wait(tr.getRange(range, CLIENT_KNOBS->TOO_MANY));
|
||||
ASSERT(!res.more && res.size() < CLIENT_KNOBS->TOO_MANY);
|
||||
|
||||
for (const auto& kv : res) {
|
||||
ASSERT((*kvs)[kv.key] == kv.value);
|
||||
}
|
||||
break;
|
||||
} catch (Error& e) {
|
||||
TraceEvent("TestCommitError").errorUnsuppressed(e);
|
||||
wait(tr.onError(e));
|
||||
}
|
||||
}
|
||||
|
||||
TraceEvent("ValidateTestDataDone").detail("DebugID", debugID);
|
||||
|
||||
return Void();
|
||||
}
|
||||
|
||||
ACTOR Future<Void> readAndVerify(PhysicalShardMoveWorkLoad* self,
|
||||
Database cx,
|
||||
Key key,
|
||||
ErrorOr<Optional<Value>> expectedValue) {
|
||||
state Transaction tr(cx);
|
||||
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
|
||||
loop {
|
||||
try {
|
||||
state Version readVersion = wait(tr.getReadVersion());
|
||||
state Optional<Value> res = wait(timeoutError(tr.get(key), 30.0));
|
||||
const bool equal = !expectedValue.isError() && res == expectedValue.get();
|
||||
if (!equal) {
|
||||
|
@ -194,6 +224,7 @@ struct SSCheckpointWorkload : TestWorkload {
|
|||
}
|
||||
break;
|
||||
} catch (Error& e) {
|
||||
TraceEvent("TestReadError").errorUnsuppressed(e);
|
||||
if (expectedValue.isError() && expectedValue.getError().code() == e.code()) {
|
||||
break;
|
||||
}
|
||||
|
@ -201,35 +232,146 @@ struct SSCheckpointWorkload : TestWorkload {
|
|||
}
|
||||
}
|
||||
|
||||
TraceEvent("TestReadSuccess").detail("Version", readVersion);
|
||||
|
||||
return Void();
|
||||
}
|
||||
|
||||
ACTOR Future<Version> writeAndVerify(SSCheckpointWorkload* self, Database cx, Key key, Optional<Value> value) {
|
||||
state Transaction tr(cx);
|
||||
ACTOR Future<Version> writeAndVerify(PhysicalShardMoveWorkLoad* self, Database cx, Key key, Optional<Value> value) {
|
||||
// state Transaction tr(cx);
|
||||
state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(cx);
|
||||
state Version version;
|
||||
loop {
|
||||
state UID debugID = deterministicRandom()->randomUniqueID();
|
||||
try {
|
||||
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
tr->debugTransaction(debugID);
|
||||
if (value.present()) {
|
||||
tr.set(key, value.get());
|
||||
tr->set(key, value.get());
|
||||
tr->set("Test?"_sr, value.get());
|
||||
tr->set(key, value.get());
|
||||
} else {
|
||||
tr.clear(key);
|
||||
tr->clear(key);
|
||||
}
|
||||
wait(timeoutError(tr.commit(), 30.0));
|
||||
version = tr.getCommittedVersion();
|
||||
wait(timeoutError(tr->commit(), 30.0));
|
||||
version = tr->getCommittedVersion();
|
||||
break;
|
||||
} catch (Error& e) {
|
||||
wait(tr.onError(e));
|
||||
TraceEvent("TestCommitError").errorUnsuppressed(e);
|
||||
wait(tr->onError(e));
|
||||
}
|
||||
}
|
||||
|
||||
TraceEvent("TestCommitSuccess").detail("CommitVersion", tr->getCommittedVersion()).detail("DebugID", debugID);
|
||||
|
||||
wait(self->readAndVerify(self, cx, key, value));
|
||||
|
||||
return version;
|
||||
}
|
||||
|
||||
// Move keys to a random selected team consisting of a single SS, after disabling DD, so that keys won't be
|
||||
// kept in the new team until DD is enabled.
|
||||
// Returns the address of the single SS of the new team.
|
||||
ACTOR Future<std::vector<UID>> moveShard(PhysicalShardMoveWorkLoad* self,
|
||||
Database cx,
|
||||
UID dataMoveId,
|
||||
KeyRange keys,
|
||||
int teamSize,
|
||||
std::unordered_set<UID> includes,
|
||||
std::unordered_set<UID> excludes) {
|
||||
// Disable DD to avoid DD undoing of our move.
|
||||
int ignore = wait(setDDMode(cx, 0));
|
||||
|
||||
// Pick a random SS as the dest, keys will reside on a single server after the move.
|
||||
std::vector<StorageServerInterface> interfs = wait(getStorageServers(cx));
|
||||
ASSERT(interfs.size() > teamSize - includes.size());
|
||||
while (includes.size() < teamSize) {
|
||||
const auto& interf = interfs[deterministicRandom()->randomInt(0, interfs.size())];
|
||||
if (excludes.count(interf.uniqueID) == 0 && includes.count(interf.uniqueID) == 0) {
|
||||
includes.insert(interf.uniqueID);
|
||||
}
|
||||
}
|
||||
|
||||
state std::vector<UID> dests(includes.begin(), includes.end());
|
||||
state UID owner = deterministicRandom()->randomUniqueID();
|
||||
// state Key ownerKey = "\xff/moveKeysLock/Owner"_sr;
|
||||
state DDEnabledState ddEnabledState;
|
||||
|
||||
state Transaction tr(cx);
|
||||
|
||||
loop {
|
||||
try {
|
||||
TraceEvent("TestMoveShard").detail("Range", keys.toString());
|
||||
state MoveKeysLock moveKeysLock = wait(takeMoveKeysLock(cx, owner));
|
||||
|
||||
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
state RangeResult dataMoves = wait(tr.getRange(dataMoveKeys, CLIENT_KNOBS->TOO_MANY));
|
||||
Version readVersion = wait(tr.getReadVersion());
|
||||
TraceEvent("TestMoveShardReadDataMoves")
|
||||
.detail("DataMoves", dataMoves.size())
|
||||
.detail("ReadVersion", readVersion);
|
||||
state int i = 0;
|
||||
for (; i < dataMoves.size(); ++i) {
|
||||
UID dataMoveId = decodeDataMoveKey(dataMoves[i].key);
|
||||
state DataMoveMetaData dataMove = decodeDataMoveValue(dataMoves[i].value);
|
||||
ASSERT(dataMoveId == dataMove.id);
|
||||
TraceEvent("TestCancelDataMoveBegin").detail("DataMove", dataMove.toString());
|
||||
wait(cleanUpDataMove(cx,
|
||||
dataMoveId,
|
||||
moveKeysLock,
|
||||
&self->cleanUpDataMoveParallelismLock,
|
||||
dataMove.range,
|
||||
&ddEnabledState));
|
||||
TraceEvent("TestCancelDataMoveEnd").detail("DataMove", dataMove.toString());
|
||||
}
|
||||
|
||||
wait(moveKeys(cx,
|
||||
dataMoveId,
|
||||
keys,
|
||||
dests,
|
||||
dests,
|
||||
moveKeysLock,
|
||||
Promise<Void>(),
|
||||
&self->startMoveKeysParallelismLock,
|
||||
&self->finishMoveKeysParallelismLock,
|
||||
false,
|
||||
deterministicRandom()->randomUniqueID(), // for logging only
|
||||
&ddEnabledState));
|
||||
break;
|
||||
} catch (Error& e) {
|
||||
if (e.code() == error_code_movekeys_conflict) {
|
||||
// Conflict on moveKeysLocks with the current running DD is expected, just retry.
|
||||
tr.reset();
|
||||
} else {
|
||||
wait(tr.onError(e));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
TraceEvent("TestMoveShardComplete").detail("Range", keys.toString()).detail("NewTeam", describe(dests));
|
||||
|
||||
return dests;
|
||||
}
|
||||
|
||||
ACTOR Future<std::vector<StorageServerShard>> getStorageServerShards(Database cx, UID ssId, KeyRange range) {
|
||||
state Transaction tr(cx);
|
||||
loop {
|
||||
try {
|
||||
Optional<Value> serverListValue = wait(tr.get(serverListKeyFor(ssId)));
|
||||
ASSERT(serverListValue.present());
|
||||
state StorageServerInterface ssi = decodeServerListValue(serverListValue.get());
|
||||
GetShardStateRequest req(range, GetShardStateRequest::READABLE, true);
|
||||
GetShardStateReply rep = wait(ssi.getShardState.getReply(req, TaskPriority::DefaultEndpoint));
|
||||
return rep.shards;
|
||||
} catch (Error& e) {
|
||||
wait(tr.onError(e));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Future<bool> check(Database const& cx) override { return pass; }
|
||||
|
||||
void getMetrics(std::vector<PerfMetric>& m) override {}
|
||||
};
|
||||
|
||||
WorkloadFactory<SSCheckpointWorkload> SSCheckpointWorkloadFactory("SSCheckpointWorkload");
|
||||
WorkloadFactory<PhysicalShardMoveWorkLoad> PhysicalShardMoveWorkLoadFactory("PhysicalShardMove");
|
|
@ -0,0 +1,238 @@
|
|||
/*
|
||||
*PhysicalShardMove.actor.cpp
|
||||
*
|
||||
* This source file is part of the FoundationDB open source project
|
||||
*
|
||||
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#include "fdbclient/ManagementAPI.actor.h"
|
||||
#include "fdbclient/NativeAPI.actor.h"
|
||||
#include "fdbrpc/simulator.h"
|
||||
#include "fdbserver/IKeyValueStore.h"
|
||||
#include "fdbserver/ServerCheckpoint.actor.h"
|
||||
#include "fdbserver/MoveKeys.actor.h"
|
||||
#include "fdbserver/QuietDatabase.h"
|
||||
#include "fdbserver/workloads/workloads.actor.h"
|
||||
#include "flow/Error.h"
|
||||
#include "flow/IRandom.h"
|
||||
#include "flow/flow.h"
|
||||
#include <cstdint>
|
||||
#include <limits>
|
||||
|
||||
#include "flow/actorcompiler.h" // This must be the last #include.
|
||||
|
||||
namespace {
|
||||
std::string printValue(const ErrorOr<Optional<Value>>& value) {
|
||||
if (value.isError()) {
|
||||
return value.getError().name();
|
||||
}
|
||||
return value.get().present() ? value.get().get().toString() : "Value Not Found.";
|
||||
}
|
||||
} // namespace
|
||||
|
||||
struct SSCheckpointRestoreWorkload : TestWorkload {
|
||||
const bool enabled;
|
||||
bool pass;
|
||||
|
||||
SSCheckpointRestoreWorkload(WorkloadContext const& wcx) : TestWorkload(wcx), enabled(!clientId), pass(true) {}
|
||||
|
||||
void validationFailed(ErrorOr<Optional<Value>> expectedValue, ErrorOr<Optional<Value>> actualValue) {
|
||||
TraceEvent(SevError, "TestFailed")
|
||||
.detail("ExpectedValue", printValue(expectedValue))
|
||||
.detail("ActualValue", printValue(actualValue));
|
||||
pass = false;
|
||||
}
|
||||
|
||||
std::string description() const override { return "SSCheckpoint"; }
|
||||
|
||||
Future<Void> setup(Database const& cx) override { return Void(); }
|
||||
|
||||
Future<Void> start(Database const& cx) override {
|
||||
if (!enabled) {
|
||||
return Void();
|
||||
}
|
||||
return _start(this, cx);
|
||||
}
|
||||
|
||||
ACTOR Future<Void> _start(SSCheckpointRestoreWorkload* self, Database cx) {
|
||||
state Key key = "TestKey"_sr;
|
||||
state Key endKey = "TestKey0"_sr;
|
||||
state Value oldValue = "TestValue"_sr;
|
||||
state KeyRange testRange = KeyRangeRef(key, endKey);
|
||||
|
||||
int ignore = wait(setDDMode(cx, 0));
|
||||
state Version version = wait(self->writeAndVerify(self, cx, key, oldValue));
|
||||
|
||||
// Create checkpoint.
|
||||
state Transaction tr(cx);
|
||||
state CheckpointFormat format = deterministicRandom()->coinflip() ? RocksDBColumnFamily : RocksDB;
|
||||
loop {
|
||||
try {
|
||||
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
|
||||
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
wait(createCheckpoint(&tr, testRange, format));
|
||||
wait(tr.commit());
|
||||
version = tr.getCommittedVersion();
|
||||
break;
|
||||
} catch (Error& e) {
|
||||
wait(tr.onError(e));
|
||||
}
|
||||
}
|
||||
|
||||
TraceEvent("TestCheckpointCreated").detail("Range", testRange).detail("Version", version);
|
||||
|
||||
// Fetch checkpoint meta data.
|
||||
loop {
|
||||
try {
|
||||
state std::vector<CheckpointMetaData> records =
|
||||
wait(getCheckpointMetaData(cx, testRange, version, format));
|
||||
break;
|
||||
} catch (Error& e) {
|
||||
TraceEvent("TestFetchCheckpointMetadataError")
|
||||
.errorUnsuppressed(e)
|
||||
.detail("Range", testRange)
|
||||
.detail("Version", version);
|
||||
|
||||
// The checkpoint was just created, we don't expect this error.
|
||||
ASSERT(e.code() != error_code_checkpoint_not_found);
|
||||
}
|
||||
}
|
||||
|
||||
TraceEvent("TestCheckpointFetched")
|
||||
.detail("Range", testRange)
|
||||
.detail("Version", version)
|
||||
.detail("Checkpoints", describe(records));
|
||||
|
||||
state std::string pwd = platform::getWorkingDirectory();
|
||||
state std::string folder = pwd + "/checkpoints";
|
||||
platform::eraseDirectoryRecursive(folder);
|
||||
ASSERT(platform::createDirectory(folder));
|
||||
|
||||
// Fetch checkpoint.
|
||||
state std::vector<CheckpointMetaData> fetchedCheckpoints;
|
||||
state int i = 0;
|
||||
for (; i < records.size(); ++i) {
|
||||
loop {
|
||||
TraceEvent("TestFetchingCheckpoint").detail("Checkpoint", records[i].toString());
|
||||
try {
|
||||
state CheckpointMetaData record = wait(fetchCheckpoint(cx, records[0], folder));
|
||||
fetchedCheckpoints.push_back(record);
|
||||
TraceEvent("TestCheckpointFetched").detail("Checkpoint", record.toString());
|
||||
break;
|
||||
} catch (Error& e) {
|
||||
TraceEvent("TestFetchCheckpointError")
|
||||
.errorUnsuppressed(e)
|
||||
.detail("Checkpoint", records[i].toString());
|
||||
wait(delay(1));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
state std::string rocksDBTestDir = "rocksdb-kvstore-test-db";
|
||||
platform::eraseDirectoryRecursive(rocksDBTestDir);
|
||||
|
||||
// Restore KVS.
|
||||
state IKeyValueStore* kvStore = keyValueStoreRocksDB(
|
||||
rocksDBTestDir, deterministicRandom()->randomUniqueID(), KeyValueStoreType::SSD_ROCKSDB_V1);
|
||||
wait(kvStore->init());
|
||||
try {
|
||||
wait(kvStore->restore(fetchedCheckpoints));
|
||||
} catch (Error& e) {
|
||||
TraceEvent(SevError, "TestRestoreCheckpointError")
|
||||
.errorUnsuppressed(e)
|
||||
.detail("Checkpoint", describe(records));
|
||||
}
|
||||
|
||||
// Compare the keyrange between the original database and the one restored from checkpoint.
|
||||
// For now, it should have been a single key.
|
||||
tr.reset();
|
||||
loop {
|
||||
try {
|
||||
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
|
||||
state RangeResult res = wait(tr.getRange(KeyRangeRef(key, endKey), CLIENT_KNOBS->TOO_MANY));
|
||||
break;
|
||||
} catch (Error& e) {
|
||||
wait(tr.onError(e));
|
||||
}
|
||||
}
|
||||
|
||||
RangeResult kvRange = wait(kvStore->readRange(testRange));
|
||||
ASSERT(res.size() == kvRange.size());
|
||||
for (int i = 0; i < res.size(); ++i) {
|
||||
ASSERT(res[i] == kvRange[i]);
|
||||
}
|
||||
|
||||
int ignore = wait(setDDMode(cx, 1));
|
||||
return Void();
|
||||
}
|
||||
|
||||
ACTOR Future<Void> readAndVerify(SSCheckpointRestoreWorkload* self,
|
||||
Database cx,
|
||||
Key key,
|
||||
ErrorOr<Optional<Value>> expectedValue) {
|
||||
state Transaction tr(cx);
|
||||
|
||||
loop {
|
||||
try {
|
||||
state Optional<Value> res = wait(timeoutError(tr.get(key), 30.0));
|
||||
const bool equal = !expectedValue.isError() && res == expectedValue.get();
|
||||
if (!equal) {
|
||||
self->validationFailed(expectedValue, ErrorOr<Optional<Value>>(res));
|
||||
}
|
||||
break;
|
||||
} catch (Error& e) {
|
||||
if (expectedValue.isError() && expectedValue.getError().code() == e.code()) {
|
||||
break;
|
||||
}
|
||||
wait(tr.onError(e));
|
||||
}
|
||||
}
|
||||
|
||||
return Void();
|
||||
}
|
||||
|
||||
ACTOR Future<Version> writeAndVerify(SSCheckpointRestoreWorkload* self,
|
||||
Database cx,
|
||||
Key key,
|
||||
Optional<Value> value) {
|
||||
state Transaction tr(cx);
|
||||
state Version version;
|
||||
loop {
|
||||
try {
|
||||
if (value.present()) {
|
||||
tr.set(key, value.get());
|
||||
} else {
|
||||
tr.clear(key);
|
||||
}
|
||||
wait(timeoutError(tr.commit(), 30.0));
|
||||
version = tr.getCommittedVersion();
|
||||
break;
|
||||
} catch (Error& e) {
|
||||
wait(tr.onError(e));
|
||||
}
|
||||
}
|
||||
|
||||
wait(self->readAndVerify(self, cx, key, value));
|
||||
|
||||
return version;
|
||||
}
|
||||
|
||||
Future<bool> check(Database const& cx) override { return pass; }
|
||||
|
||||
void getMetrics(std::vector<PerfMetric>& m) override {}
|
||||
};
|
||||
|
||||
WorkloadFactory<SSCheckpointRestoreWorkload> SSCheckpointRestoreWorkloadFactory("SSCheckpointRestoreWorkload");
|
|
@ -192,10 +192,12 @@ if(WITH_PYTHON)
|
|||
add_fdb_test(TEST_FILES noSim/KeyValueStoreRocksDBTest.toml)
|
||||
add_fdb_test(TEST_FILES noSim/ShardedRocksDBTest.toml UNIT)
|
||||
add_fdb_test(TEST_FILES fast/PhysicalShardMove.toml)
|
||||
add_fdb_test(TEST_FILES fast/StorageServerCheckpointRestore.toml)
|
||||
else()
|
||||
add_fdb_test(TEST_FILES noSim/KeyValueStoreRocksDBTest.toml IGNORE)
|
||||
add_fdb_test(TEST_FILES noSim/ShardedRocksDBTest.toml IGNORE)
|
||||
add_fdb_test(TEST_FILES fast/PhysicalShardMove.toml IGNORE)
|
||||
add_fdb_test(TEST_FILES fast/StorageServerCheckpointRestore.toml IGNORE)
|
||||
endif()
|
||||
add_fdb_test(TEST_FILES rare/CheckRelocation.toml)
|
||||
add_fdb_test(TEST_FILES rare/ClogUnclog.toml)
|
||||
|
|
|
@ -6,9 +6,13 @@ coordinators = 3
|
|||
machineCount = 15
|
||||
allowDefaultTenant = false
|
||||
|
||||
[[knobs]]
|
||||
shard_encode_location_metadata = true
|
||||
storage_server_shard_aware = true
|
||||
|
||||
[[test]]
|
||||
testTitle = 'PhysicalShardMove'
|
||||
useDB = true
|
||||
|
||||
[[test.workload]]
|
||||
testName = 'SSCheckpointWorkload'
|
||||
testName = 'PhysicalShardMove'
|
||||
|
|
|
@ -0,0 +1,14 @@
|
|||
[configuration]
|
||||
config = 'triple'
|
||||
storageEngineType = 4
|
||||
processesPerMachine = 1
|
||||
coordinators = 3
|
||||
machineCount = 15
|
||||
allowDefaultTenant = false
|
||||
|
||||
[[test]]
|
||||
testTitle = 'SSCheckpointRestoreWorkload'
|
||||
useDB = true
|
||||
|
||||
[[test.workload]]
|
||||
testName = 'SSCheckpointRestoreWorkload'
|
Loading…
Reference in New Issue