diff --git a/fdbclient/ServerKnobs.cpp b/fdbclient/ServerKnobs.cpp
index 0ecac5e54a..c2049aea2d 100644
--- a/fdbclient/ServerKnobs.cpp
+++ b/fdbclient/ServerKnobs.cpp
@@ -740,6 +740,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
 	init( MAX_PARALLEL_QUICK_GET_VALUE, 50 ); if ( randomize && BUGGIFY ) MAX_PARALLEL_QUICK_GET_VALUE = deterministicRandom()->randomInt(1, 100);
 	init( QUICK_GET_KEY_VALUES_LIMIT, 2000 );
 	init( QUICK_GET_KEY_VALUES_LIMIT_BYTES, 1e7 );
+	init( STORAGE_SERVER_SHARD_AWARE, true );

 	//Wait Failure
 	init( MAX_OUTSTANDING_WAIT_FAILURE_REQUESTS, 250 ); if( randomize && BUGGIFY ) MAX_OUTSTANDING_WAIT_FAILURE_REQUESTS = 2;
diff --git a/fdbclient/SystemData.cpp b/fdbclient/SystemData.cpp
index 753a37ffa5..e2bbf5af3b 100644
--- a/fdbclient/SystemData.cpp
+++ b/fdbclient/SystemData.cpp
@@ -475,18 +475,21 @@ const Value serverKeysValue(const UID& id) {

 void decodeServerKeysValue(const ValueRef& value, bool& assigned, bool& emptyRange, UID& id) {
 	if (value.size() == 0) {
-		id = UID();
 		assigned = false;
 		emptyRange = false;
+		id = UID();
 	} else if (value == serverKeysTrue) {
 		assigned = true;
 		emptyRange = false;
+		id = anonymousShardId;
 	} else if (value == serverKeysTrueEmptyRange) {
 		assigned = true;
 		emptyRange = true;
+		id = anonymousShardId;
 	} else if (value == serverKeysFalse) {
 		assigned = false;
 		emptyRange = false;
+		id = UID();
 	} else {
 		BinaryReader rd(value, IncludeVersion());
 		ASSERT(rd.protocolVersion().hasShardEncodeLocationMetaData());
diff --git a/fdbclient/include/fdbclient/ServerKnobs.h b/fdbclient/include/fdbclient/ServerKnobs.h
index d3e10d61af..a1e0299930 100644
--- a/fdbclient/include/fdbclient/ServerKnobs.h
+++ b/fdbclient/include/fdbclient/ServerKnobs.h
@@ -698,6 +698,7 @@ public:
 	int CHECKPOINT_TRANSFER_BLOCK_BYTES;
 	int QUICK_GET_KEY_VALUES_LIMIT;
 	int QUICK_GET_KEY_VALUES_LIMIT_BYTES;
+	bool STORAGE_SERVER_SHARD_AWARE;

 	// Wait Failure
 	int MAX_OUTSTANDING_WAIT_FAILURE_REQUESTS;
diff --git a/fdbclient/include/fdbclient/StorageServerInterface.h b/fdbclient/include/fdbclient/StorageServerInterface.h
index b9d6349fd5..c873703082 100644
--- a/fdbclient/include/fdbclient/StorageServerInterface.h
+++ b/fdbclient/include/fdbclient/StorageServerInterface.h
@@ -25,6 +25,7 @@
 #include <ostream>
 #include "fdbclient/FDBTypes.h"
 #include "fdbclient/StorageCheckpoint.h"
+#include "fdbclient/StorageServerShard.h"
 #include "fdbrpc/Locality.h"
 #include "fdbrpc/QueueModel.h"
 #include "fdbrpc/fdbrpc.h"
@@ -572,12 +573,13 @@ struct GetShardStateReply {
 	Version first;
 	Version second;
+	std::vector<StorageServerShard> shards;

 	GetShardStateReply() = default;
 	GetShardStateReply(Version first, Version second) : first(first), second(second) {}

 	template <class Ar>
 	void serialize(Ar& ar) {
-		serializer(ar, first, second);
+		serializer(ar, first, second, shards);
 	}
 };
@@ -587,13 +589,16 @@ struct GetShardStateRequest {
 	KeyRange keys;
 	int32_t mode;
+	bool includePhysicalShard;
 	ReplyPromise<GetShardStateReply> reply;

-	GetShardStateRequest() {}
-	GetShardStateRequest(KeyRange const& keys, waitMode mode) : keys(keys), mode(mode) {}
+	GetShardStateRequest() = default;
+	GetShardStateRequest(KeyRange const& keys, waitMode mode, bool includePhysicalShard)
+	  : keys(keys), mode(mode), includePhysicalShard(includePhysicalShard) {}
+	GetShardStateRequest(KeyRange const& keys, waitMode mode) : keys(keys), mode(mode), includePhysicalShard(false) {}

 	template <class Ar>
 	void serialize(Ar& ar) {
-		serializer(ar, keys, mode, reply, includePhysicalShard);
+		serializer(ar, keys, mode, reply, includePhysicalShard);
 	}
 };
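A note on the decodeServerKeysValue() change above: the legacy encodings now also yield a shard id, so callers can treat pre- and post-upgrade serverKeys values uniformly. A minimal sketch of the resulting contract (illustrative only, not part of the patch; it assumes the anonymousShardId constant from SystemData):

    // Sketch: expected decode results for the two legacy encodings.
    void exampleDecodeContract() {
        bool assigned, emptyRange;
        UID id;
        decodeServerKeysValue(serverKeysTrue, assigned, emptyRange, id);
        ASSERT(assigned && !emptyRange && id == anonymousShardId); // assigned, pre-sharding value
        decodeServerKeysValue(serverKeysFalse, assigned, emptyRange, id);
        ASSERT(!assigned && !emptyRange && id == UID()); // unassigned ranges carry no shard id
    }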
diff --git a/fdbclient/include/fdbclient/StorageServerShard.h b/fdbclient/include/fdbclient/StorageServerShard.h
new file mode 100644
index 0000000000..dce0d5a6fa
--- /dev/null
+++ b/fdbclient/include/fdbclient/StorageServerShard.h
@@ -0,0 +1,88 @@
+/*
+ * StorageServerShard.h
+ *
+ * This source file is part of the FoundationDB open source project
+ *
+ * Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef FDBCLIENT_STORAGESERVERSHARD_H
+#define FDBCLIENT_STORAGESERVERSHARD_H
+#pragma once
+
+#include "fdbclient/FDBTypes.h"
+#include "flow/flow.h"
+
+// Represents a data shard on a storage server hosting a continuous keyrange.
+struct StorageServerShard {
+	constexpr static FileIdentifier file_identifier = 4028358;
+
+	enum ShardState {
+		NotAssigned = 0,
+		MovingIn = 1,
+		ReadWritePending = 2,
+		ReadWrite = 3,
+	};
+
+	StorageServerShard() = default;
+	StorageServerShard(KeyRange range,
+	                   Version version,
+	                   const uint64_t id,
+	                   const uint64_t desiredId,
+	                   ShardState shardState)
+	  : range(range), version(version), id(id), desiredId(desiredId), shardState(shardState) {}
+
+	static StorageServerShard notAssigned(KeyRange range, Version version = 0) {
+		return StorageServerShard(range, version, 0, 0, NotAssigned);
+	}
+
+	ShardState getShardState() const { return static_cast<ShardState>(this->shardState); }
+
+	void setShardState(const ShardState shardState) { this->shardState = static_cast<int8_t>(shardState); }
+
+	std::string getShardStateString() const {
+		const ShardState ss = getShardState();
+		switch (ss) {
+		case NotAssigned:
+			return "NotAssigned";
+		case MovingIn:
+			return "MovingIn";
+		case ReadWritePending:
+			return "ReadWritePending";
+		case ReadWrite:
+			return "ReadWrite";
+		}
+		return "InvalidState";
+	}
+
+	std::string toString() const {
+		return "StorageServerShard: [Range]: " + Traceable<KeyRangeRef>::toString(range) +
+		       " [Shard ID]: " + format("%016llx", this->id) + " [Version]: " + std::to_string(version) +
+		       " [State]: " + getShardStateString() + " [Desired Shard ID]: " + format("%016llx", this->desiredId);
+	}
+
+	template <class Ar>
+	void serialize(Ar& ar) {
+		serializer(ar, range, version, id, desiredId, shardState);
+	}
+
+	KeyRange range;
+	Version version; // Shard creation version.
+	uint64_t id; // The actual shard ID.
+	uint64_t desiredId; // The intended shard ID.
+	int8_t shardState;
+};
+
+#endif
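The ShardState enum above encodes a shard's lifecycle on a single server. A minimal sketch of the intended progression during a data move, using only the API in this header (the actual transitions are driven by fetchKeys() and changeServerKeysWithPhysicalShards() later in this patch):

    // Sketch: lifecycle of a shard being moved onto a server.
    StorageServerShard s = StorageServerShard::notAssigned(KeyRangeRef("a"_sr, "b"_sr));
    s.setShardState(StorageServerShard::MovingIn);         // assignment arrived; data being fetched
    s.setShardState(StorageServerShard::ReadWritePending); // fetch complete, waiting to become durable
    s.setShardState(StorageServerShard::ReadWrite);        // serving reads and writes
    ASSERT(s.getShardState() == StorageServerShard::ReadWrite);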
diff --git a/fdbserver/KeyValueStoreShardedRocksDB.actor.cpp b/fdbserver/KeyValueStoreShardedRocksDB.actor.cpp
index 07fd99fef5..d0dd490202 100644
--- a/fdbserver/KeyValueStoreShardedRocksDB.actor.cpp
+++ b/fdbserver/KeyValueStoreShardedRocksDB.actor.cpp
@@ -2217,6 +2217,8 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore {

 	KeyValueStoreType getType() const override { return KeyValueStoreType(KeyValueStoreType::SSD_SHARDED_ROCKSDB); }

+	bool shardAware() const override { return true; }
+
 	Future<Void> init() override {
 		if (openFuture.isValid()) {
 			return openFuture;
diff --git a/fdbserver/include/fdbserver/IKeyValueStore.h b/fdbserver/include/fdbserver/IKeyValueStore.h
index 942ff37539..6eec8f4b76 100644
--- a/fdbserver/include/fdbserver/IKeyValueStore.h
+++ b/fdbserver/include/fdbserver/IKeyValueStore.h
@@ -61,6 +61,9 @@ public:
 class IKeyValueStore : public IClosable {
 public:
 	virtual KeyValueStoreType getType() const = 0;
+	// Returns true if the KV store supports shards, i.e., implements addRange(), removeRange(), and
+	// persistRangeMapping().
+	virtual bool shardAware() const { return false; }
 	virtual void set(KeyValueRef keyValue, const Arena* arena = nullptr) = 0;
 	virtual void clear(KeyRangeRef range, const Arena* arena = nullptr) = 0;
 	virtual Future<Void> canCommit() { return Void(); }
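Since shardAware() defaults to false, only ShardedRocksDB opts in, and callers are expected to gate the physical-shard code path on the capability rather than on the store type. A hypothetical caller (not in this patch, assuming the addRange() overload referenced in the comment above) might look like:

    // Sketch: gate shard operations on the capability, not the engine.
    if (kvStore->shardAware()) {
        // Safe: the store implements addRange()/removeRange()/persistRangeMapping().
        wait(kvStore->addRange(range, shardIdString));
    } else {
        // Legacy path: single keyspace, no per-shard bookkeeping.
    }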
"flow/IRandom.h" +#include "flow/IndexedSet.h" +#include "flow/SystemMonitor.h" #include "flow/TDMetric.actor.h" +#include "flow/Trace.h" +#include "flow/Util.h" #include "flow/genericactors.actor.h" #include "flow/actorcompiler.h" // This must be the last #include. @@ -121,11 +134,16 @@ bool canReplyWith(Error e) { #define PERSIST_PREFIX "\xff\xff" +FDB_DECLARE_BOOLEAN_PARAM(UnlimitedCommitBytes); +FDB_DEFINE_BOOLEAN_PARAM(UnlimitedCommitBytes); + // Immutable static const KeyValueRef persistFormat(LiteralStringRef(PERSIST_PREFIX "Format"), LiteralStringRef("FoundationDB/StorageServer/1/4")); +static const KeyValueRef persistShardAwareFormat(LiteralStringRef(PERSIST_PREFIX "Format"), + LiteralStringRef("FoundationDB/StorageServer/1/5")); static const KeyRangeRef persistFormatReadableRange(LiteralStringRef("FoundationDB/StorageServer/1/2"), - LiteralStringRef("FoundationDB/StorageServer/1/5")); + LiteralStringRef("FoundationDB/StorageServer/1/6")); static const KeyRef persistID = LiteralStringRef(PERSIST_PREFIX "ID"); static const KeyRef persistTssPairID = LiteralStringRef(PERSIST_PREFIX "tssPairID"); static const KeyRef persistSSPairID = LiteralStringRef(PERSIST_PREFIX "ssWithTSSPairID"); @@ -151,6 +169,10 @@ static const KeyRangeRef persistTenantMapKeys = KeyRangeRef(LiteralStringRef(PERSIST_PREFIX "TM/"), LiteralStringRef(PERSIST_PREFIX "TM0")); // data keys are unmangled (but never start with PERSIST_PREFIX because they are always in allKeys) +static const KeyRangeRef persistStorageServerShardKeys = + KeyRangeRef(LiteralStringRef(PERSIST_PREFIX "StorageServerShard/"), + LiteralStringRef(PERSIST_PREFIX "StorageServerShard0")); + // Checkpoint related prefixes. static const KeyRangeRef persistCheckpointKeys = KeyRangeRef(LiteralStringRef(PERSIST_PREFIX "Checkpoint/"), LiteralStringRef(PERSIST_PREFIX "Checkpoint0")); @@ -212,7 +234,7 @@ struct AddingShard : NonCopyable { class ShardInfo : public ReferenceCounted, NonCopyable { ShardInfo(KeyRange keys, std::unique_ptr&& adding, StorageServer* readWrite) - : adding(std::move(adding)), readWrite(readWrite), keys(keys) {} + : adding(std::move(adding)), readWrite(readWrite), keys(keys), version(0) {} public: // A shard has 3 mutual exclusive states: adding, readWrite and notAssigned. 
@@ -220,6 +242,9 @@ public:
 	struct StorageServer* readWrite;
 	KeyRange keys;
 	uint64_t changeCounter;
+	uint64_t shardId;
+	uint64_t desiredShardId;
+	Version version;

 	static ShardInfo* newNotAssigned(KeyRange keys) { return new ShardInfo(keys, nullptr, nullptr); }
 	static ShardInfo* newReadWrite(KeyRange keys, StorageServer* data) { return new ShardInfo(keys, nullptr, data); }
@@ -230,6 +255,73 @@ public:
 		return new ShardInfo(keys, std::make_unique<AddingShard>(oldShard, keys), nullptr);
 	}

+	static ShardInfo* newShard(StorageServer* data, const StorageServerShard& shard) {
+		ShardInfo* res = nullptr;
+		switch (shard.getShardState()) {
+		case StorageServerShard::NotAssigned:
+			res = newNotAssigned(shard.range);
+			break;
+		case StorageServerShard::MovingIn:
+		case StorageServerShard::ReadWritePending:
+			res = newAdding(data, shard.range);
+			break;
+		case StorageServerShard::ReadWrite:
+			res = newReadWrite(shard.range, data);
+			break;
+		default:
+			TraceEvent(SevError, "UnknownShardState").detail("State", shard.shardState);
+		}
+		res->populateShard(shard);
+		return res;
+	}
+
+	static bool canMerge(const ShardInfo* l, const ShardInfo* r) {
+		if (l == nullptr || r == nullptr || l->keys.end != r->keys.begin || l->version == invalidVersion ||
+		    r->version == invalidVersion) {
+			return false;
+		}
+		if (l->shardId != r->shardId || l->desiredShardId != r->desiredShardId) {
+			return false;
+		}
+		return (l->isReadable() && r->isReadable()) || (!l->assigned() && !r->assigned());
+	}
+
+	StorageServerShard toStorageServerShard() const {
+		StorageServerShard::ShardState st;
+		if (this->isReadable()) {
+			st = StorageServerShard::ReadWrite;
+		} else if (!this->assigned()) {
+			st = StorageServerShard::NotAssigned;
+		} else {
+			ASSERT(this->adding);
+			if (this->adding)
+				st = this->adding->phase == AddingShard::Waiting ? StorageServerShard::ReadWritePending
+				                                                 : StorageServerShard::MovingIn;
+		}
+		return StorageServerShard(this->keys, this->version, this->shardId, this->desiredShardId, st);
+	}
+
+	// Copies necessary information from `shard`.
+	void populateShard(const StorageServerShard& shard) {
+		this->version = shard.version;
+		this->shardId = shard.id;
+		this->desiredShardId = shard.desiredId;
+	}
+
+	// Returns true if the current shard is merged with `other`.
+	bool mergeWith(const ShardInfo* other) {
+		if (!canMerge(this, other)) {
+			return false;
+		}
+		this->keys = KeyRangeRef(this->keys.begin, other->keys.end);
+		this->version = std::max(this->version, other->version);
+		return true;
+	}
+
+	void validate() const {
+		// TODO: Complete this.
+	}
+
 	bool isReadable() const { return readWrite != nullptr; }
 	bool notAssigned() const { return !readWrite && !adding; }
 	bool assigned() const { return readWrite || adding; }
@@ -255,8 +347,11 @@ public:
 struct StorageServerDisk {
 	explicit StorageServerDisk(struct StorageServer* data, IKeyValueStore* storage) : data(data), storage(storage) {}

-	void makeNewStorageServerDurable();
-	bool makeVersionMutationsDurable(Version& prevStorageVersion, Version newStorageVersion, int64_t& bytesLeft);
+	void makeNewStorageServerDurable(const bool shardAware);
+	bool makeVersionMutationsDurable(Version& prevStorageVersion,
+	                                 Version newStorageVersion,
+	                                 int64_t& bytesLeft,
+	                                 UnlimitedCommitBytes unlimitedCommitBytes);
 	void makeVersionDurable(Version version);
 	void makeTssQuarantineDurable();
 	Future<bool> restoreDurableState();
@@ -267,6 +362,16 @@ struct StorageServerDisk {
 	void writeKeyValue(KeyValueRef kv);
 	void clearRange(KeyRangeRef keys);

+	Future<Void> addRange(KeyRangeRef range, std::string id) { return storage->addRange(range, id); }
+
+	std::vector<std::string> removeRange(KeyRangeRef range) { return storage->removeRange(range); }
+
+	void persistRangeMapping(KeyRangeRef range, bool isAdd) { storage->persistRangeMapping(range, isAdd); }
+
+	Future<Void> cleanUpShardsIfNeeded(const std::vector<std::string>& shardIds) {
+		return storage->cleanUpShardsIfNeeded(shardIds);
+	};
+
 	Future<Void> getError() { return storage->getError(); }
 	Future<Void> init() { return storage->init(); }
 	Future<Void> canCommit() { return storage->canCommit(); }
@@ -541,10 +646,29 @@ private:
 	std::unordered_map<NetworkAddress, std::map<UID, Version>> watchMap; // keep track of server watches

 public:
+	struct PendingNewShard {
+		PendingNewShard(uint64_t shardId, KeyRangeRef range) : shardId(format("%016llx", shardId)), range(range) {}
+
+		std::string toString() const {
+			return format("PendingNewShard: [ShardID]: %s [Range]: %s",
+			              this->shardId.c_str(),
+			              Traceable<KeyRangeRef>::toString(this->range).c_str());
+		}
+
+		std::string shardId;
+		KeyRange range;
+	};
+
 	std::map<Version, std::vector<CheckpointMetaData>> pendingCheckpoints; // Pending checkpoint requests
 	std::unordered_map<UID, CheckpointMetaData> checkpoints; // Existing and deleting checkpoints
 	TenantMap tenantMap;
 	TenantPrefixIndex tenantPrefixIndex;
+	std::map<Version, std::vector<PendingNewShard>>
+	    pendingAddRanges; // Pending requests to add ranges to physical shards
+	std::map<Version, std::vector<KeyRange>>
+	    pendingRemoveRanges; // Pending requests to remove ranges from physical shards
+
+	bool shardAware; // True if the storage server is aware of the physical shards.

 	// Histograms
 	struct FetchKeysHistograms {
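pendingAddRanges and pendingRemoveRanges are keyed by the version at which the shard-mapping change becomes visible; updateStorage() (further down in this patch) drains both maps in version order so that storage-engine shard changes commit at exactly the right version boundary. A sketch of the enqueue side, as changeServerKeysWithPhysicalShards() uses it below (names from this patch; simplified):

    // Sketch: queue a physical-shard addition/removal at the version carrying the assignment.
    // cVer is the version of the private mutation that changed the shard mapping.
    data->pendingAddRanges[cVer].emplace_back(desiredShardId, range); // constructs a PendingNewShard
    data->pendingRemoveRanges[cVer].push_back(obsoleteRange);         // range to drop from the KV store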
@@ -586,6 +710,8 @@ public:
 	Optional<TenantMapEntry> getTenantEntry(Version version, TenantInfo tenant);
 	KeyRangeRef clampRangeToTenant(KeyRangeRef range, Optional<TenantMapEntry> tenantEntry, Arena& arena);

+	std::vector<StorageServerShard> getStorageServerShards(KeyRangeRef range);
+
 	class CurrentRunningFetchKeys {
 		std::unordered_map<UID, double> startTimeMap;
 		std::unordered_map<UID, KeyRange> keyRangeMap;
@@ -1038,9 +1164,9 @@ public:
 	StorageServer(IKeyValueStore* storage,
 	              Reference<AsyncVar<ServerDBInfo> const> const& db,
 	              StorageServerInterface const& ssi)
-	  : tlogCursorReadsLatencyHistogram(Histogram::getHistogram(STORAGESERVER_HISTOGRAM_GROUP,
-	                                                            TLOG_CURSOR_READS_LATENCY_HISTOGRAM,
-	                                                            Histogram::Unit::microseconds)),
+	  : shardAware(false), tlogCursorReadsLatencyHistogram(Histogram::getHistogram(STORAGESERVER_HISTOGRAM_GROUP,
+	                                                                               TLOG_CURSOR_READS_LATENCY_HISTOGRAM,
+	                                                                               Histogram::Unit::microseconds)),
 	    ssVersionLockLatencyHistogram(Histogram::getHistogram(STORAGESERVER_HISTOGRAM_GROUP,
 	                                                          SS_VERSION_LOCK_LATENCY_HISTOGRAM,
 	                                                          Histogram::Unit::microseconds)),
@@ -1085,7 +1211,12 @@ public:
 		newestAvailableVersion.insert(allKeys, invalidVersion);
 		newestDirtyVersion.insert(allKeys, invalidVersion);

-		addShard(ShardInfo::newNotAssigned(allKeys));
+		if (CLIENT_KNOBS->SHARD_ENCODE_LOCATION_METADATA &&
+		    (SERVER_KNOBS->STORAGE_SERVER_SHARD_AWARE || storage->shardAware())) {
+			addShard(ShardInfo::newShard(this, StorageServerShard::notAssigned(allKeys)));
+		} else {
+			addShard(ShardInfo::newNotAssigned(allKeys));
+		}

 		cx = openDBOnServer(db, TaskPriority::DefaultEndpoint, LockAware::True);
@@ -1348,16 +1479,33 @@ void validate(StorageServer* data, bool force = false) {
 			data->newestDirtyVersion.validateCoalesced();

 			for (auto s = data->shards.ranges().begin(); s != data->shards.ranges().end(); ++s) {
+				TraceEvent(SevVerbose, "ValidateShard", data->thisServerID)
+				    .detail("Range", s->range())
+				    .detail("ShardRange", s->value()->keys)
+				    .detail("ShardState", s->value()->debugDescribeState())
+				    .log();
 				ASSERT(s->value()->keys == s->range());
 				ASSERT(!s->value()->keys.empty());
+				if (data->shardAware) {
+					s->value()->validate();
+				}
 			}

-			for (auto s = data->shards.ranges().begin(); s != data->shards.ranges().end(); ++s)
+			for (auto s = data->shards.ranges().begin(); s != data->shards.ranges().end(); ++s) {
 				if (s->value()->isReadable()) {
 					auto ar = data->newestAvailableVersion.intersectingRanges(s->range());
-					for (auto a = ar.begin(); a != ar.end(); ++a)
+					for (auto a = ar.begin(); a != ar.end(); ++a) {
+						TraceEvent(SevVerbose, "ValidateShardReadable", data->thisServerID)
+						    .detail("Range", s->range())
+						    .detail("ShardRange", s->value()->keys)
+						    .detail("ShardState", s->value()->debugDescribeState())
+						    .detail("AvailableRange", a->range())
+						    .detail("AvailableVersion", a->value())
+						    .log();
 						ASSERT(a->value() == latestVersion);
+					}
 				}
+			}
 			// * versionedData contains versions [storageVersion(), version.get()].  It might also contain version
 			// (version.get()+1), in which changeDurableVersion may be deleting ghosts, and/or it might
@@ -1559,6 +1707,14 @@ Optional<TenantMapEntry> StorageServer::getTenantEntry(Version version, TenantIn
 	return Optional<TenantMapEntry>();
 }

+std::vector<StorageServerShard> StorageServer::getStorageServerShards(KeyRangeRef range) {
+	std::vector<StorageServerShard> res;
+	for (auto t : this->shards.intersectingRanges(range)) {
+		res.push_back(t.value()->toStorageServerShard());
+	}
+	return res;
+}
+
 ACTOR Future<Void> getValueQ(StorageServer* data, GetValueRequest req) {
 	state int64_t resultSize = 0;
 	Span span("SS:getValue"_loc, req.spanContext);
@@ -2856,7 +3012,11 @@ ACTOR Future<Void> getShardState_impl(StorageServer* data, GetShardStateRequest
 	}

 	if (!onChange.size()) {
-		req.reply.send(GetShardStateReply{ data->version.get(), data->durableVersion.get() });
+		GetShardStateReply rep(data->version.get(), data->durableVersion.get());
+		if (req.includePhysicalShard) {
+			rep.shards = std::move(data->getStorageServerShards(req.keys));
+		}
+		req.reply.send(rep);
 		return Void();
 	}
@@ -3366,10 +3526,28 @@ KeyRange getShardKeyRange(StorageServer* data, const KeySelectorRef& sel)
 	auto i = sel.isBackward() ? data->shards.rangeContainingKeyBefore(sel.getKey())
 	                          : data->shards.rangeContaining(sel.getKey());
+	auto fullRange = data->shards.ranges();
 	if (!i->value()->isReadable())
 		throw wrong_shard_server();
 	ASSERT(selectorInRange(sel, i->range()));
-	return i->range();
+	Key begin, end;
+	if (sel.isBackward()) {
+		end = i->range().end;
+		while (i != fullRange.begin() && i.value()->isReadable()) {
+			begin = i->range().begin;
+			--i;
+		}
+		if (i.value()->isReadable()) {
+			begin = i->range().begin;
+		}
+	} else {
+		begin = i->range().begin;
+		while (i != fullRange.end() && i.value()->isReadable()) {
+			end = i->range().end;
+			++i;
+		}
+	}
+	return KeyRangeRef(begin, end);
 }

 ACTOR Future<Void> getKeyValuesQ(StorageServer* data, GetKeyValuesRequest req)
@@ -4978,6 +5156,34 @@ void removeDataRange(StorageServer* ss,
 void setAvailableStatus(StorageServer* self, KeyRangeRef keys, bool available);
 void setAssignedStatus(StorageServer* self, KeyRangeRef keys, bool nowAssigned);
+void updateStorageShard(StorageServer* self, StorageServerShard shard);
+
+void coalescePhysicalShards(StorageServer* data, KeyRangeRef keys) {
+	auto shardRanges = data->shards.intersectingRanges(keys);
+	auto fullRange = data->shards.ranges();
+
+	auto iter = shardRanges.begin();
+	if (iter != fullRange.begin()) {
+		--iter;
+	}
+	auto iterEnd = shardRanges.end();
+	if (iterEnd != fullRange.end()) {
+		++iterEnd;
+	}
+
+	KeyRangeMap<Reference<ShardInfo>>::iterator lastShard = iter;
+	++iter;
+
+	for (; iter != iterEnd; ++iter) {
+		if (ShardInfo::canMerge(lastShard.value().getPtr(), iter->value().getPtr())) {
+			ShardInfo* newShard = lastShard.value().extractPtr();
+			ASSERT(newShard->mergeWith(iter->value().getPtr()));
+			data->addShard(newShard);
+			iter = data->shards.rangeContaining(newShard->keys.begin);
+		}
+		lastShard = iter;
+	}
+}

 void coalesceShards(StorageServer* data, KeyRangeRef keys) {
 	auto shardRanges = data->shards.intersectingRanges(keys);
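canMerge() is what keeps the physical-shard map compact: two adjacent ShardInfos merge only when both have valid versions, their shardId and desiredShardId match, and both are readable or both unassigned. A worked example with illustrative values:

    // Sketch: adjacent readable pieces of the same physical shard coalesce.
    StorageServerShard left(KeyRangeRef("a"_sr, "b"_sr), /*version=*/5, /*id=*/7, /*desiredId=*/7,
                            StorageServerShard::ReadWrite);
    StorageServerShard right(KeyRangeRef("b"_sr, "c"_sr), /*version=*/6, /*id=*/7, /*desiredId=*/7,
                             StorageServerShard::ReadWrite);
    // Installed as adjacent ShardInfos (via ShardInfo::newShard), canMerge() holds:
    // ids match and both sides are readable, so coalescePhysicalShards() widens the
    // left shard to ["a","c") at version 6. A differing desiredId (e.g. a pending
    // move-in on one side) rejects the merge and the split point remains.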
@@ -6100,8 +6306,16 @@ ACTOR Future<Void> fetchKeys(StorageServer* data, AddingShard* shard) {
 			// This actor finishes committing the keys [keys.begin,nfk) that we already fetched.
 			// The remaining unfetched keys [nfk,keys.end) will become a separate AddingShard with its own
 			// fetchKeys.
-			shard->server->addShard(ShardInfo::addingSplitLeft(KeyRangeRef(keys.begin, nfk), shard));
-			shard->server->addShard(ShardInfo::newAdding(data, KeyRangeRef(nfk, keys.end)));
+			if (data->shardAware) {
+				StorageServerShard rightShard = data->shards[keys.begin]->toStorageServerShard();
+				rightShard.range = KeyRangeRef(nfk, keys.end);
+				shard->server->addShard(ShardInfo::addingSplitLeft(KeyRangeRef(keys.begin, nfk), shard));
+				shard->server->addShard(ShardInfo::newShard(data, rightShard));
+				data->shards[keys.begin]->populateShard(rightShard);
+			} else {
+				shard->server->addShard(ShardInfo::addingSplitLeft(KeyRangeRef(keys.begin, nfk), shard));
+				shard->server->addShard(ShardInfo::newAdding(data, KeyRangeRef(nfk, keys.end)));
+			}
 			shard = data->shards.rangeContaining(keys.begin).value()->adding.get();
 			warningLogger = logFetchKeysWarning(shard);
 			AddingShard* otherShard = data->shards.rangeContaining(nfk).value()->adding.get();
@@ -6271,6 +6485,12 @@ ACTOR Future<Void> fetchKeys(StorageServer* data, AddingShard* shard) {
 		    .detail("Version", feedTransferredVersion)
 		    .detail("StorageVersion", data->storageVersion());

+		if (data->shardAware) {
+			state StorageServerShard newShard = data->shards[keys.begin]->toStorageServerShard();
+			ASSERT(newShard.range == keys);
+			ASSERT(newShard.getShardState() == StorageServerShard::ReadWritePending);
+			updateStorageShard(data, newShard);
+		}
 		setAvailableStatus(data, keys, true);
 		// keys will be available when getLatestVersion()==transferredVersion is durable
@@ -6286,8 +6506,14 @@
 		                       shard->keys); // We aren't changing whether the shard is assigned
 		data->newestAvailableVersion.insert(shard->keys, latestVersion);
 		shard->readWrite.send(Void());
-		data->addShard(ShardInfo::newReadWrite(shard->keys, data)); // invalidates shard!
-		coalesceShards(data, keys);
+		if (data->shardAware) {
+			newShard.setShardState(StorageServerShard::ReadWrite);
+			data->addShard(ShardInfo::newShard(data, newShard)); // invalidates shard!
+			coalescePhysicalShards(data, keys);
+		} else {
+			data->addShard(ShardInfo::newReadWrite(shard->keys, data)); // invalidates shard!
+			coalesceShards(data, keys);
+		}

 		validate(data);
@@ -6386,6 +6612,129 @@ void ShardInfo::addMutation(Version version, bool fromFetch, MutationRef const&

 enum ChangeServerKeysContext { CSK_UPDATE, CSK_RESTORE, CSK_ASSIGN_EMPTY };
 const char* changeServerKeysContextName[] = { "Update", "Restore" };

+ACTOR Future<Void> restoreShards(StorageServer* data,
+                                 Version version,
+                                 RangeResult storageShards,
+                                 RangeResult assignedShards,
+                                 RangeResult availableShards) {
+	TraceEvent(SevInfo, "StorageServerRestoreShardsBegin", data->thisServerID)
+	    .detail("StorageShard", storageShards.size())
+	    .detail("Version", version);
+
+	state int mLoc;
+	for (mLoc = 0; mLoc < storageShards.size(); ++mLoc) {
+		const KeyRangeRef shardRange(storageShards[mLoc].key.removePrefix(persistStorageServerShardKeys.begin),
+		                             mLoc + 1 == storageShards.size()
+		                                 ? allKeys.end
+		                                 : storageShards[mLoc + 1].key.removePrefix(
+		                                       persistStorageServerShardKeys.begin));
+		StorageServerShard shard =
+		    ObjectReader::fromStringRef<StorageServerShard>(storageShards[mLoc].value, IncludeVersion());
+		shard.range = shardRange;
+		TraceEvent(SevVerbose, "RestoreShardsStorageShard", data->thisServerID)
+		    .detail("Range", shardRange)
+		    .detail("StorageShard", shard.toString());
+
+		// TODO(psm): Schedule deletion of finished moveInShard.
+		const StorageServerShard::ShardState shardState = shard.getShardState();
+		auto existingShards = data->shards.intersectingRanges(shardRange);
+		for (auto it = existingShards.begin(); it != existingShards.end(); ++it) {
+			TraceEvent(SevVerbose, "RestoreShardsIntersectingRange", data->thisServerID)
+			    .detail("StorageShard", shard.toString())
+			    .detail("IntersectingShardRange", it->value()->keys)
+			    .detail("IntersectingShardState", it->value()->debugDescribeState())
+			    .log();
+			ASSERT(it->value()->notAssigned());
+		}
+
+		if (shardState == StorageServerShard::NotAssigned) {
+			continue;
+		}
+
+		auto ranges = data->shards.getAffectedRangesAfterInsertion(shard.range, Reference<ShardInfo>());
+		for (int i = 0; i < ranges.size(); i++) {
+			KeyRangeRef& range = static_cast<KeyRangeRef&>(ranges[i]);
+			TraceEvent(SevVerbose, "RestoreShardsInstallPhysicalShard", data->thisServerID)
+			    .detail("Shard", shard.toString())
+			    .detail("Range", range);
+			if (range == shard.range) {
+				data->addShard(ShardInfo::newShard(data, shard));
+			} else {
+				StorageServerShard rightShard = ranges[i].value->toStorageServerShard();
+				rightShard.range = range;
+				data->addShard(ShardInfo::newShard(data, rightShard));
+			}
+		}
+
+		const bool nowAvailable = shard.getShardState() == StorageServerShard::ReadWrite;
+		data->newestAvailableVersion.insert(shard.range, nowAvailable ? latestVersion : invalidVersion);
+
+		wait(yield());
+	}
+
+	state int availableLoc;
+	for (availableLoc = 0; availableLoc < availableShards.size(); availableLoc++) {
+		KeyRangeRef shardRange(
+		    availableShards[availableLoc].key.removePrefix(persistShardAvailableKeys.begin),
+		    availableLoc + 1 == availableShards.size()
+		        ? allKeys.end
+		        : availableShards[availableLoc + 1].key.removePrefix(persistShardAvailableKeys.begin));
+		ASSERT(!shardRange.empty());
+
+		const bool nowAvailable = availableShards[availableLoc].value != LiteralStringRef("0");
+		auto existingShards = data->shards.intersectingRanges(shardRange);
+		for (auto it = existingShards.begin(); it != existingShards.end(); ++it) {
+			TraceEvent(SevVerbose, "RestoreShardsValidateAvailable", data->thisServerID)
+			    .detail("Range", shardRange)
+			    .detail("Available", nowAvailable)
+			    .detail("IntersectingShardRange", it->value()->keys)
+			    .detail("IntersectingShardState", it->value()->debugDescribeState())
+			    .log();
+			if (nowAvailable) {
+				ASSERT(it->value()->isReadable());
+				ASSERT(data->newestAvailableVersion.allEqual(shardRange, latestVersion));
+			}
+		}
+
+		wait(yield());
+	}
+
+	state int assignedLoc;
+	for (assignedLoc = 0; assignedLoc < assignedShards.size(); ++assignedLoc) {
+		KeyRangeRef shardRange(assignedShards[assignedLoc].key.removePrefix(persistShardAssignedKeys.begin),
+		                       assignedLoc + 1 == assignedShards.size()
+		                           ? allKeys.end
+		                           : assignedShards[assignedLoc + 1].key.removePrefix(persistShardAssignedKeys.begin));
+		ASSERT(!shardRange.empty());
+		const bool nowAssigned = assignedShards[assignedLoc].value != LiteralStringRef("0");
+
+		auto existingShards = data->shards.intersectingRanges(shardRange);
+		for (auto it = existingShards.begin(); it != existingShards.end(); ++it) {
+			TraceEvent(SevVerbose, "RestoreShardsValidateAssigned", data->thisServerID)
+			    .detail("Range", shardRange)
+			    .detail("Assigned", nowAssigned)
+			    .detail("IntersectingShardRange", it->value()->keys)
+			    .detail("IntersectingShardState", it->value()->debugDescribeState())
+			    .log();
+
+			ASSERT_EQ(it->value()->assigned(), nowAssigned);
+			if (!nowAssigned) {
+				ASSERT(data->newestAvailableVersion.allEqual(shardRange, invalidVersion));
+			}
+		}
+
+		wait(yield());
+	}
+
+	TraceEvent(SevVerbose, "StorageServerRestoreShardsCoalesce", data->thisServerID).detail("Version", version);
+	coalescePhysicalShards(data, allKeys);
+	TraceEvent(SevVerbose, "StorageServerRestoreShardsValidate", data->thisServerID).detail("Version", version);
+	validate(data, /*force=*/true);
+	TraceEvent(SevInfo, "StorageServerRestoreShardsEnd", data->thisServerID).detail("Version", version);
+
+	return Void();
+}
+
+// Finds any change feeds that no longer have shards on this server, and cleans them up
 void cleanUpChangeFeeds(StorageServer* data, const KeyRangeRef& keys, Version version) {
 	std::map<Key, KeyRange> candidateFeeds;
 	auto ranges = data->keyChangeFeed.intersectingRanges(keys);
@@ -6587,6 +6936,225 @@ void changeServerKeys(StorageServer* data,
 	}
 	validate(data);
+
+	if (!nowAssigned) {
+		cleanUpChangeFeeds(data, keys, version);
+	}
+}
+
+void changeServerKeysWithPhysicalShards(StorageServer* data,
+                                        const KeyRangeRef& keys,
+                                        const UID& dataMoveId,
+                                        bool nowAssigned,
+                                        Version version,
+                                        ChangeServerKeysContext context) {
+	ASSERT(!keys.empty());
+	TraceEvent(SevDebug, "ChangeServerKeysWithPhysicalShards", data->thisServerID)
+	    .detail("DataMoveID", dataMoveId)
+	    .detail("Range", keys)
+	    .detail("NowAssigned", nowAssigned)
+	    .detail("Version", version)
+	    .detail("Context", changeServerKeysContextName[(int)context]);
+
+	validate(data);
+
+	DEBUG_KEY_RANGE(nowAssigned ? "KeysAssigned" : "KeysUnassigned", version, keys, data->thisServerID);
+
+	const uint64_t desiredId = dataMoveId.first();
+	const Version cVer = version + 1;
+	ASSERT(data->data().getLatestVersion() == cVer);
+
+	// Save a backup of the ShardInfo references before we start messing with shards, in order to defer fetchKeys
+	// cancellation (and its potential call to removeDataRange()) until shards is again valid
+	std::vector<Reference<ShardInfo>> oldShards;
+	auto os = data->shards.intersectingRanges(keys);
+	for (auto r = os.begin(); r != os.end(); ++r) {
+		oldShards.push_back(r->value());
+	}
+
+	auto ranges = data->shards.getAffectedRangesAfterInsertion(
+	    keys,
+	    Reference<ShardInfo>()); // null reference indicates the range being changed
+	for (int i = 0; i < ranges.size(); i++) {
+		const Reference<ShardInfo> currentShard = ranges[i].value;
+		const KeyRangeRef currentRange = static_cast<KeyRangeRef>(ranges[i]);
+		if (!currentShard.isValid()) {
+			ASSERT(currentRange == keys); // there shouldn't be any nulls except for the range being inserted
+		} else if (currentShard->notAssigned()) {
+			ASSERT(nowAssigned); // Adding a new range to the server.
+			StorageServerShard newShard = currentShard->toStorageServerShard();
+			newShard.range = currentRange;
+			data->addShard(ShardInfo::newShard(data, newShard));
+			TraceEvent(SevVerbose, "SSSplitShardNotAssigned", data->thisServerID)
+			    .detail("Range", keys)
+			    .detail("NowAssigned", nowAssigned)
+			    .detail("Version", cVer)
+			    .detail("ResultingShard", newShard.toString());
+		} else if (currentShard->isReadable()) {
+			StorageServerShard newShard = currentShard->toStorageServerShard();
+			newShard.range = currentRange;
+			data->addShard(ShardInfo::newShard(data, newShard));
+			TraceEvent(SevVerbose, "SSSplitShardReadable", data->thisServerID)
+			    .detail("Range", keys)
+			    .detail("NowAssigned", nowAssigned)
+			    .detail("Version", cVer)
+			    .detail("ResultingShard", newShard.toString());
+		} else if (ranges[i].value->adding) {
+			ASSERT(!nowAssigned);
+			StorageServerShard newShard = currentShard->toStorageServerShard();
+			newShard.range = currentRange;
+			// TODO(psm): Cancel or update the moving-in shard when physical shard move is enabled.
+			data->addShard(ShardInfo::newShard(data, newShard));
+			TraceEvent(SevVerbose, "SSSplitShardAdding", data->thisServerID)
+			    .detail("Range", keys)
+			    .detail("NowAssigned", nowAssigned)
+			    .detail("Version", cVer)
+			    .detail("ResultingShard", newShard.toString());
+		} else {
+			ASSERT(false);
+		}
+	}
+
+	auto vr = data->shards.intersectingRanges(keys);
+	std::vector<std::pair<KeyRange, Version>> changeNewestAvailable;
+	std::vector<KeyRange> removeRanges;
+	std::vector<KeyRange> newEmptyRanges;
+	std::vector<StorageServerShard> updatedShards;
+	int totalAssignedAtVer = 0;
+	for (auto r = vr.begin(); r != vr.end(); ++r) {
+		KeyRangeRef range = keys & r->range();
+		const bool dataAvailable = r->value()->isReadable();
+		TraceEvent(SevVerbose, "CSKPhysicalShard", data->thisServerID)
+		    .detail("Range", range)
+		    .detail("ExistingShardRange", r->range())
+		    .detail("Available", dataAvailable)
+		    .detail("NowAssigned", nowAssigned)
+		    .detail("ShardState", r->value()->debugDescribeState());
+		ASSERT(keys.contains(r->range()));
+		if (context == CSK_ASSIGN_EMPTY && !dataAvailable) {
+			ASSERT(nowAssigned);
+			TraceEvent(SevVerbose, "ChangeServerKeysAddEmptyRange", data->thisServerID)
+			    .detail("Range", range)
+			    .detail("Version", cVer);
+			newEmptyRanges.push_back(range);
+			updatedShards.emplace_back(range, cVer, desiredId, desiredId, StorageServerShard::ReadWrite);
+		} else if (!nowAssigned) {
+			if (dataAvailable) {
+				ASSERT(data->newestAvailableVersion[range.begin] ==
+				       latestVersion); // Not that we care, but this used to be checked instead of dataAvailable
+				ASSERT(data->mutableData().getLatestVersion() > version || context == CSK_RESTORE);
+				changeNewestAvailable.emplace_back(range, version);
+				removeRanges.push_back(range);
+			}
+			updatedShards.push_back(StorageServerShard::notAssigned(range, cVer));
+			data->watches.triggerRange(range.begin, range.end);
+			data->pendingRemoveRanges[cVer].push_back(range);
+			// TODO(psm): When fetchKeys() is not used, make sure KV will remove the data, otherwise, removeDataShard()
+			// here.
+			TraceEvent(SevVerbose, "SSUnassignShard", data->thisServerID)
+			    .detail("Range", range)
+			    .detail("NowAssigned", nowAssigned)
+			    .detail("Version", cVer)
+			    .detail("NewShard", updatedShards.back().toString());
+		} else if (!dataAvailable) {
+			if (version == data->initialClusterVersion - 1) {
+				TraceEvent(SevVerbose, "CSKWithPhysicalShardsSeedRange", data->thisServerID)
+				    .detail("ShardID", desiredId)
+				    .detail("Range", range);
+				changeNewestAvailable.emplace_back(range, latestVersion);
+				updatedShards.push_back(
+				    StorageServerShard(range, version, desiredId, desiredId, StorageServerShard::ReadWrite));
+				setAvailableStatus(data, range, true);
+				// Note: The initial range is available, however, the shard won't be created in the storage engine
+				// until the version is committed.
+				data->pendingAddRanges[cVer].emplace_back(desiredId, range);
+				TraceEvent(SevVerbose, "SSInitialShard", data->thisServerID)
+				    .detail("Range", range)
+				    .detail("NowAssigned", nowAssigned)
+				    .detail("Version", cVer)
+				    .detail("NewShard", updatedShards.back().toString());
+			} else {
+				auto& shard = data->shards[range.begin];
+				if (!shard->assigned()) {
+					updatedShards.push_back(
+					    StorageServerShard(range, cVer, desiredId, desiredId, StorageServerShard::MovingIn));
+					data->pendingAddRanges[cVer].emplace_back(desiredId, range);
+					data->newestDirtyVersion.insert(range, cVer);
+					TraceEvent(SevVerbose, "SSAssignShard", data->thisServerID)
+					    .detail("Range", range)
+					    .detail("NowAssigned", nowAssigned)
+					    .detail("Version", cVer)
+					    .detail("TotalAssignedAtVer", ++totalAssignedAtVer)
+					    .detail("NewShard", updatedShards.back().toString());
+				} else {
+					ASSERT(shard->adding != nullptr);
+					if (shard->desiredShardId != desiredId) {
+						TraceEvent(SevError, "CSKConflictingMoveInShards", data->thisServerID)
+						    .detail("DataMoveID", dataMoveId)
+						    .detail("Range", range)
+						    .detail("TargetShard", desiredId)
+						    .detail("CurrentShard", shard->desiredShardId)
+						    .detail("Version", cVer);
+						// TODO(psm): Replace this with moveInShard->cancel().
+						ASSERT(false);
+					} else {
+						TraceEvent(SevVerbose, "CSKMoveInToSameShard", data->thisServerID)
+						    .detail("DataMoveID", dataMoveId)
+						    .detailf("TargetShard", "%016llx", desiredId)
+						    .detail("MoveRange", keys)
+						    .detail("Range", range)
+						    .detail("ExistingShardRange", shard->keys)
+						    .detail("Version", cVer);
+					}
+				}
+			}
+		} else {
+			updatedShards.push_back(StorageServerShard(
+			    range, cVer, data->shards[range.begin]->shardId, desiredId, StorageServerShard::ReadWrite));
+			changeNewestAvailable.emplace_back(range, latestVersion);
+			TraceEvent(SevVerbose, "SSAssignShardAlreadyAvailable", data->thisServerID)
+			    .detail("Range", range)
+			    .detail("NowAssigned", nowAssigned)
+			    .detail("Version", cVer)
+			    .detail("NewShard", updatedShards.back().toString());
+		}
+	}
+
+	for (const auto& shard : updatedShards) {
+		data->addShard(ShardInfo::newShard(data, shard));
+		updateStorageShard(data, shard);
+	}
+
+	// Update newestAvailableVersion when a shard becomes (un)available (in a separate loop to avoid invalidating vr
+	// above)
+	for (auto r = changeNewestAvailable.begin(); r != changeNewestAvailable.end(); ++r) {
+		data->newestAvailableVersion.insert(r->first, r->second);
+	}
+
+	if (!nowAssigned) {
+		data->metrics.notifyNotReadable(keys);
+	}
+
+	coalescePhysicalShards(data, KeyRangeRef(ranges[0].begin, ranges[ranges.size() - 1].end));
+
+	// Now it is OK to do removeDataRanges, directly and through fetchKeys cancellation (and we have to do so before
+	// validate())
+	oldShards.clear();
+	ranges.clear();
+	for (auto r = removeRanges.begin(); r != removeRanges.end(); ++r) {
+		removeDataRange(data, data->addVersionToMutationLog(data->data().getLatestVersion()), data->shards, *r);
+		setAvailableStatus(data, *r, false);
+	}
+
+	// Clear the moving-in empty range, and set it available at the latestVersion.
+	for (const auto& range : newEmptyRanges) {
+		MutationRef clearRange(MutationRef::ClearRange, range.begin, range.end);
+		data->addMutation(data->data().getLatestVersion(), true, clearRange, range, data->updateEagerReads);
+		data->newestAvailableVersion.insert(range, latestVersion);
+		setAvailableStatus(data, range, true);
+		++data->counters.kvSystemClearRanges;
+	}
+
+	validate(data);
+
 	// find any change feeds that no longer have shards on this server, and clean them up
 	if (!nowAssigned) {
 		cleanUpChangeFeeds(data, keys, version);
 	}
 }
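One detail worth calling out from the function above: the target physical shard id travels inside the data-move UID itself, so the storage server never needs a separate message to learn it. A sketch of that convention (values illustrative):

    // Sketch: the high 64 bits of the data-move UID name the desired physical shard,
    // matching `desiredId = dataMoveId.first()` above.
    UID dataMoveId(0x1234abcdull, deterministicRandom()->randomUInt64());
    uint64_t desiredId = dataMoveId.first(); // == 0x1234abcd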
@@ -6717,15 +7285,24 @@ private:

 		// ignore data movements for tss in quarantine
 		if (!data->isTSSInQuarantine()) {
-			// add changes in shard assignment to the mutation log
-			setAssignedStatus(data, keys, nowAssigned);
-
-			// The changes for version have already been received (and are being processed now). We need to fetch
-			// the data for change.version-1 (changes from versions < change.version)
-			// If emptyRange, treat the shard as empty, see removeKeysFromFailedServer() for more details about this
-			// scenario.
 			const ChangeServerKeysContext context = emptyRange ? CSK_ASSIGN_EMPTY : CSK_UPDATE;
-			changeServerKeys(data, keys, nowAssigned, currentVersion - 1, context);
+			if (data->shardAware) {
+				setAssignedStatus(data, keys, nowAssigned);
+				TraceEvent(SevDebug, "SSSetAssignedStatus", data->thisServerID)
+				    .detail("Range", keys)
+				    .detail("NowAssigned", nowAssigned)
+				    .detail("Version", ver);
+				changeServerKeysWithPhysicalShards(
+				    data, keys, dataMoveId, nowAssigned, currentVersion - 1, context);
+			} else {
+				// add changes in shard assignment to the mutation log
+				setAssignedStatus(data, keys, nowAssigned);
+
+				// The changes for version have already been received (and are being processed now). We need to
+				// fetch the data for change.version-1 (changes from versions < change.version) If emptyRange, treat
+				// the shard as empty, see removeKeysFromFailedServer() for more details about this scenario.
+				changeServerKeys(data, keys, nowAssigned, currentVersion - 1, context);
+			}
 		}

 		processedStartKey = false;
@@ -7639,7 +8216,9 @@ ACTOR Future<Void> createCheckpoint(StorageServer* data, CheckpointMetaData meta
 }

 ACTOR Future<Void> updateStorage(StorageServer* data) {
+	state UnlimitedCommitBytes unlimitedCommitBytes = UnlimitedCommitBytes::False;
 	loop {
+		unlimitedCommitBytes = UnlimitedCommitBytes::False;
 		ASSERT(data->durableVersion.get() == data->storageVersion());
 		if (g_network->isSimulated()) {
 			double endTime =
@@ -7678,7 +8257,7 @@ ACTOR Future<Void> updateStorage(StorageServer* data) {
 		if (!data->pendingCheckpoints.empty()) {
 			const Version cVer = data->pendingCheckpoints.begin()->first;
 			if (cVer <= desiredVersion) {
-				TraceEvent("CheckpointVersionSatisfied", data->thisServerID)
+				TraceEvent(SevDebug, "CheckpointVersionSatisfied", data->thisServerID)
 				    .detail("DesiredVersion", desiredVersion)
 				    .detail("DurableVersion", data->durableVersion.get())
 				    .detail("CheckPointVersion", cVer);
@@ -7687,10 +8266,56 @@
+		state bool removeKVSRanges = false;
+		if (!data->pendingRemoveRanges.empty()) {
+			const Version aVer = data->pendingRemoveRanges.begin()->first;
+			if (aVer <= desiredVersion) {
+				TraceEvent(SevDebug, "RemoveRangeVersionSatisfied", data->thisServerID)
+				    .detail("DesiredVersion", desiredVersion)
+				    .detail("DurableVersion", data->durableVersion.get())
+				    .detail("RemoveRangeVersion", aVer);
+				desiredVersion = aVer;
+				removeKVSRanges = true;
+			}
+		}
+
+		state bool addedRanges = false;
+		if (!data->pendingAddRanges.empty()) {
+			const Version aVer = data->pendingAddRanges.begin()->first;
+			if (aVer <= desiredVersion) {
+				TraceEvent(SevDebug, "AddRangeVersionSatisfied", data->thisServerID)
+				    .detail("DesiredVersion", desiredVersion)
+				    .detail("DurableVersion", data->durableVersion.get())
+				    .detail("AddRangeVersion", aVer);
+				desiredVersion = aVer;
+				ASSERT(!data->pendingAddRanges.begin()->second.empty());
+				TraceEvent(SevVerbose, "SSAddKVSRangeBegin", data->thisServerID)
+				    .detail("Version", data->pendingAddRanges.begin()->first)
+				    .detail("DurableVersion", data->durableVersion.get())
+				    .detail("NewRanges", describe(data->pendingAddRanges.begin()->second));
+				state std::vector<Future<Void>> fAddRanges;
+				for (const auto& shard : data->pendingAddRanges.begin()->second) {
+					TraceEvent(SevInfo, "SSAddKVSRange", data->thisServerID)
+					    .detail("Range", shard.range)
+					    .detail("PhysicalShardID", shard.shardId);
+					fAddRanges.push_back(data->storage.addRange(shard.range, shard.shardId));
+				}
+				wait(waitForAll(fAddRanges));
+				TraceEvent(SevVerbose, "SSAddKVSRangeEnd", data->thisServerID)
+				    .detail("Version", data->pendingAddRanges.begin()->first)
+				    .detail("DurableVersion", data->durableVersion.get());
+				addedRanges = true;
+				// Remove the commit byte limit to make sure the private mutation(s) associated with the
+				// `addRange` are committed.
+				unlimitedCommitBytes = UnlimitedCommitBytes::True;
+			}
+		}
+
 		// Write mutations to storage until we reach the desiredVersion or have written too much (bytesleft)
 		state double beforeStorageUpdates = now();
 		loop {
-			state bool done = data->storage.makeVersionMutationsDurable(newOldestVersion, desiredVersion, bytesLeft);
+			state bool done = data->storage.makeVersionMutationsDurable(
+			    newOldestVersion, desiredVersion, bytesLeft, unlimitedCommitBytes);
 			if (data->tenantMap.getLatestVersion() < newOldestVersion) {
 				data->tenantMap.createNewVersion(newOldestVersion);
 				data->tenantPrefixIndex.createNewVersion(newOldestVersion);
@@ -7709,6 +8334,32 @@ ACTOR Future<Void> updateStorage(StorageServer* data) {
 			break;
 		}

+		if (addedRanges) {
+			TraceEvent(SevVerbose, "SSAddKVSRangeMetaData", data->thisServerID)
+			    .detail("NewDurableVersion", newOldestVersion)
+			    .detail("DesiredVersion", desiredVersion)
+			    .detail("OldestAddKVSRangesVersion", data->pendingAddRanges.begin()->first);
+			ASSERT(newOldestVersion == data->pendingAddRanges.begin()->first);
+			ASSERT(newOldestVersion == desiredVersion);
+			for (const auto& shard : data->pendingAddRanges.begin()->second) {
+				data->storage.persistRangeMapping(shard.range, true);
+			}
+			data->pendingAddRanges.erase(data->pendingAddRanges.begin());
+		}
+
+		if (removeKVSRanges) {
+			TraceEvent(SevDebug, "RemoveKVSRangesVersionDurable", data->thisServerID)
+			    .detail("NewDurableVersion", newOldestVersion)
+			    .detail("DesiredVersion", desiredVersion)
+			    .detail("OldestRemoveKVSRangesVersion", data->pendingRemoveRanges.begin()->first);
+			ASSERT(newOldestVersion <= data->pendingRemoveRanges.begin()->first);
+			if (newOldestVersion == data->pendingRemoveRanges.begin()->first) {
+				for (const auto& range : data->pendingRemoveRanges.begin()->second) {
+					data->storage.persistRangeMapping(range, false);
+				}
+			}
+		}
+
 		std::set<Key> modifiedChangeFeeds = data->fetchingChangeFeeds;
 		data->fetchingChangeFeeds.clear();
 		while (!data->changeFeedVersions.empty() && data->changeFeedVersions.front().second <= newOldestVersion) {
@@ -7780,6 +8431,22 @@ ACTOR Future<Void> updateStorage(StorageServer* data) {

 		debug_advanceMinCommittedVersion(data->thisServerID, newOldestVersion);

+		if (removeKVSRanges) {
+			TraceEvent(SevDebug, "RemoveKVSRangesCommitted", data->thisServerID)
+			    .detail("NewDurableVersion", newOldestVersion)
+			    .detail("DesiredVersion", desiredVersion)
+			    .detail("OldestRemoveKVSRangesVersion", data->pendingRemoveRanges.begin()->first);
+			ASSERT(newOldestVersion <= data->pendingRemoveRanges.begin()->first);
+			if (newOldestVersion == data->pendingRemoveRanges.begin()->first) {
+				for (const auto& range : data->pendingRemoveRanges.begin()->second) {
+					data->storage.removeRange(range);
+					TraceEvent(SevVerbose, "RemoveKVSRange", data->thisServerID).detail("Range", range);
+				}
+				data->pendingRemoveRanges.erase(data->pendingRemoveRanges.begin());
+			}
+			removeKVSRanges = false;
+		}
+
 		if (requireCheckpoint) {
 			// `pendingCheckpoints` is a queue of checkpoint requests ordered by their versions, and
 			// `newOldestVersion` is chosen such that it is no larger than the smallest pending checkpoint
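The ordering here is deliberate: persistRangeMapping() is staged in the same commit that makes newOldestVersion durable, while removeRange() runs only after that commit succeeds, so a crash in between never deletes data that the persisted mapping still claims to own. A condensed sketch of the sequence (simplified; error paths omitted):

    // Sketch: commit ordering for a range removal scheduled at version V.
    // 1. makeVersionMutationsDurable(... up to V ...);        // mutations staged
    // 2. storage.persistRangeMapping(range, /*isAdd=*/false); // mapping change staged in the same commit
    // 3. wait(durable);                                       // both become durable atomically
    // 4. storage.removeRange(range);                          // physical data deleted only afterwards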
@@ -7907,6 +8574,7 @@ ACTOR Future<Void> updateStorage(StorageServer* data) {
 		//TraceEvent("StorageServerDurable", data->thisServerID).detail("Version", newOldestVersion);
 		data->fetchKeysBytesBudget = SERVER_KNOBS->STORAGE_FETCH_BYTES;
+		data->fetchKeysBudgetUsed.set(false);

 		if (!data->fetchKeysBudgetUsed.get()) {
 			wait(durableDelay || data->fetchKeysBudgetUsed.onChange());
@@ -7923,8 +8591,12 @@ ACTOR Future<Void> updateStorage(StorageServer* data) {
 #pragma region StorageServerDisk
 #endif

-void StorageServerDisk::makeNewStorageServerDurable() {
-	storage->set(persistFormat);
+void StorageServerDisk::makeNewStorageServerDurable(const bool shardAware) {
+	if (shardAware) {
+		storage->set(persistShardAwareFormat);
+	} else {
+		storage->set(persistFormat);
+	}
 	storage->set(KeyValueRef(persistID, BinaryWriter::toValue(data->thisServerID, Unversioned())));
 	if (data->tssPairID.present()) {
 		storage->set(KeyValueRef(persistTssPairID, BinaryWriter::toValue(data->tssPairID.get(), Unversioned())));
@@ -7933,8 +8605,14 @@ void StorageServerDisk::makeNewStorageServerDurable() {
 	storage->set(
 	    KeyValueRef(persistClusterIdKey, BinaryWriter::toValue(data->clusterId.getFuture().get(), Unversioned())));
 	storage->set(KeyValueRef(persistVersion, BinaryWriter::toValue(data->version.get(), Unversioned())));
-	storage->set(KeyValueRef(persistShardAssignedKeys.begin.toString(), LiteralStringRef("0")));
-	storage->set(KeyValueRef(persistShardAvailableKeys.begin.toString(), LiteralStringRef("0")));
+
+	if (shardAware) {
+		storage->set(KeyValueRef(persistStorageServerShardKeys.begin.toString(),
+		                         ObjectWriter::toValue(StorageServerShard::notAssigned(allKeys, 0), IncludeVersion())));
+	} else {
+		storage->set(KeyValueRef(persistShardAssignedKeys.begin.toString(), LiteralStringRef("0")));
+		storage->set(KeyValueRef(persistShardAvailableKeys.begin.toString(), LiteralStringRef("0")));
+	}

 	auto view = data->tenantMap.atLatest();
 	for (auto itr = view.begin(); itr != view.end(); ++itr) {
@@ -7984,6 +8662,38 @@ void setAvailableStatus(StorageServer* self, KeyRangeRef keys, bool available) {
 	}
 }

+void updateStorageShard(StorageServer* data, StorageServerShard shard) {
+	if (shard.getShardState() == StorageServerShard::ReadWritePending) {
+		shard.setShardState(StorageServerShard::ReadWrite);
+	}
+
+	auto& mLV = data->addVersionToMutationLog(data->data().getLatestVersion());
+
+	KeyRange shardKeys = KeyRangeRef(persistStorageServerShardKeys.begin.toString() + shard.range.begin.toString(),
+	                                 persistStorageServerShardKeys.begin.toString() + shard.range.end.toString());
+	TraceEvent(SevVerbose, "UpdateStorageServerShard", data->thisServerID)
+	    .detail("Version", mLV.version)
+	    .detail("Shard", shard.toString())
+	    .detail("ShardKey", shardKeys.begin);
+
+	data->addMutationToMutationLog(mLV, MutationRef(MutationRef::ClearRange, shardKeys.begin, shardKeys.end));
+	++data->counters.kvSystemClearRanges;
+	data->addMutationToMutationLog(
+	    mLV, MutationRef(MutationRef::SetValue, shardKeys.begin, ObjectWriter::toValue(shard, IncludeVersion())));
+	if (shard.range.end != allKeys.end) {
+		StorageServerShard endShard = data->shards.rangeContaining(shard.range.end)->value()->toStorageServerShard();
+		if (endShard.getShardState() == StorageServerShard::ReadWritePending) {
+			endShard.setShardState(StorageServerShard::ReadWrite);
+		}
+		TraceEvent(SevVerbose, "UpdateStorageServerShardEndShard", data->thisServerID)
+		    .detail("Version", mLV.version)
+		    .detail("Shard", endShard.toString())
+		    .detail("ShardKey", shardKeys.end);
+		data->addMutationToMutationLog(
+		    mLV, MutationRef(MutationRef::SetValue, shardKeys.end, ObjectWriter::toValue(endShard, IncludeVersion())));
+	}
+}
+
 void setAssignedStatus(StorageServer* self, KeyRangeRef keys, bool nowAssigned) {
 	ASSERT(!keys.empty());
 	auto& mLV = self->addVersionToMutationLog(self->data().getLatestVersion());
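updateStorageShard() persists each shard under a key formed from its range-begin, which is why restoreShards() earlier can rebuild the full, ordered shard map with a single range read. A sketch of the key layout (PERSIST_PREFIX is "\xff\xff"):

    // Sketch: persisted shard-map layout.
    //   "\xff\xffStorageServerShard/" + shard.range.begin -> ObjectWriter::toValue(shard, IncludeVersion())
    // Each entry implicitly ends where the next entry begins (or at allKeys.end), so
    // writing a shard also rewrites the entry at shard.range.end to preserve the
    // boundary, as the end-shard handling above does.
    Key shardKey = persistStorageServerShardKeys.begin.toString() + shard.range.begin.toString();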
@@ -8043,8 +8753,9 @@ void StorageServerDisk::writeMutations(const VectorRef<MutationRef>& mutations,

 bool StorageServerDisk::makeVersionMutationsDurable(Version& prevStorageVersion,
                                                     Version newStorageVersion,
-                                                    int64_t& bytesLeft) {
-	if (bytesLeft <= 0)
+                                                    int64_t& bytesLeft,
+                                                    UnlimitedCommitBytes unlimitedCommitBytes) {
+	if (!unlimitedCommitBytes && bytesLeft <= 0)
 		return true;

 	// Apply mutations from the mutationLog
@@ -8254,6 +8965,7 @@ ACTOR Future<bool> restoreDurableState(StorageServer* data, IKeyValueStore* stor
 	state Future<RangeResult> fPendingCheckpoints = storage->readRange(persistPendingCheckpointKeys);
 	state Future<RangeResult> fCheckpoints = storage->readRange(persistCheckpointKeys);
 	state Future<RangeResult> fTenantMap = storage->readRange(persistTenantMapKeys);
+	state Future<RangeResult> fStorageShards = storage->readRange(persistStorageServerShardKeys);
 	state Promise<Void> byteSampleSampleRecovered;
 	state Promise<Void> startByteSampleRestore;
@@ -8263,8 +8975,13 @@ ACTOR Future<bool> restoreDurableState(StorageServer* data, IKeyValueStore* stor
 	TraceEvent("ReadingDurableState", data->thisServerID).log();
 	wait(waitForAll(std::vector{ fFormat, fID, fClusterID, ftssPairID, fssPairID, fTssQuarantine, fVersion, fLogProtocol, fPrimaryLocality }));
-	wait(waitForAll(
-	    std::vector{ fShardAssigned, fShardAvailable, fChangeFeeds, fPendingCheckpoints, fCheckpoints, fTenantMap }));
+	wait(waitForAll(std::vector{ fShardAssigned,
+	                             fShardAvailable,
+	                             fChangeFeeds,
+	                             fPendingCheckpoints,
+	                             fCheckpoints,
+	                             fTenantMap,
+	                             fStorageShards }));
 	wait(byteSampleSampleRecovered.getFuture());
 	TraceEvent("RestoringDurableState", data->thisServerID).log();
@@ -8275,6 +8992,9 @@ ACTOR Future<bool> restoreDurableState(StorageServer* data, IKeyValueStore* stor
 		data->thisServerID = UID();
 		data->sk = Key();
 		return false;
+	} else {
+		TraceEvent(SevVerbose, "RestoringStorageServerWithPhysicalShards", data->thisServerID).log();
+		data->shardAware = fFormat.get().get() == persistShardAwareFormat.value;
 	}
 	data->bytesRestored += fFormat.get().expectedSize();
 	if (!persistFormatReadableRange.contains(fFormat.get().get())) {
@@ -8369,20 +9089,25 @@ ACTOR Future<bool> restoreDurableState(StorageServer* data, IKeyValueStore* stor
 	state RangeResult assigned = fShardAssigned.get();
 	data->bytesRestored += assigned.logicalSize();
 	state int assignedLoc;
-	for (assignedLoc = 0; assignedLoc < assigned.size(); assignedLoc++) {
-		KeyRangeRef keys(assigned[assignedLoc].key.removePrefix(persistShardAssignedKeys.begin),
-		                 assignedLoc + 1 == assigned.size()
-		                     ? allKeys.end
-		                     : assigned[assignedLoc + 1].key.removePrefix(persistShardAssignedKeys.begin));
-		ASSERT(!keys.empty());
-		bool nowAssigned = assigned[assignedLoc].value != LiteralStringRef("0");
-		/*if(nowAssigned)
-		  TraceEvent("AssignedShard", data->thisServerID).detail("RangeBegin", keys.begin).detail("RangeEnd", keys.end);*/
-		changeServerKeys(data, keys, nowAssigned, version, CSK_RESTORE);
+	if (data->shardAware) {
+		// TODO(psm): Avoid copying RangeResult around.
+		wait(restoreShards(data, version, fStorageShards.get(), assigned, available));
+	} else {
+		for (assignedLoc = 0; assignedLoc < assigned.size(); assignedLoc++) {
+			KeyRangeRef keys(assigned[assignedLoc].key.removePrefix(persistShardAssignedKeys.begin),
+			                 assignedLoc + 1 == assigned.size()
+			                     ? allKeys.end
+			                     : assigned[assignedLoc + 1].key.removePrefix(persistShardAssignedKeys.begin));
+			ASSERT(!keys.empty());
+			bool nowAssigned = assigned[assignedLoc].value != LiteralStringRef("0");
+			/*if(nowAssigned)
+			  TraceEvent("AssignedShard", data->thisServerID).detail("RangeBegin", keys.begin).detail("RangeEnd", keys.end);*/
+			changeServerKeys(data, keys, nowAssigned, version, CSK_RESTORE);

-		if (!nowAssigned)
-			ASSERT(data->newestAvailableVersion.allEqual(keys, invalidVersion));
-		wait(yield());
+			if (!nowAssigned)
+				ASSERT(data->newestAvailableVersion.allEqual(keys, invalidVersion));
+			wait(yield());
+		}
 	}

 	state RangeResult changeFeeds = fChangeFeeds.get();
@@ -9445,6 +10170,8 @@ ACTOR Future<Void> storageServer(IKeyValueStore* persistentData,
                                  Reference<AsyncVar<ServerDBInfo> const> db,
                                  std::string folder) {
 	state StorageServer self(persistentData, db, ssi);
+	self.shardAware = CLIENT_KNOBS->SHARD_ENCODE_LOCATION_METADATA &&
+	                  (SERVER_KNOBS->STORAGE_SERVER_SHARD_AWARE || persistentData->shardAware());
 	state Future<Void> ssCore;
 	self.clusterId.send(clusterId);
 	self.initialClusterVersion = startVersion;
@@ -9481,7 +10208,7 @@ ACTOR Future<Void> storageServer(IKeyValueStore* persistentData,
 			self.tag = seedTag;
 		}

-		self.storage.makeNewStorageServerDurable();
+		self.storage.makeNewStorageServerDurable(self.shardAware);
 		wait(self.storage.commit());
 		++self.counters.kvCommits;
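The workload below exercises these paths end to end. Its getStorageServerShards() helper presumably uses the extended GetShardStateRequest from the first file in this patch; a sketch of what such a helper could look like (hypothetical, names and wait mode assumed):

    // Sketch (assumed helper): read a server's physical-shard view for a range.
    ACTOR Future<std::vector<StorageServerShard>> getShardsFromServer(StorageServerInterface ssi, KeyRange range) {
        GetShardStateRequest req(range, GetShardStateRequest::READABLE, /*includePhysicalShard=*/true);
        GetShardStateReply rep = wait(ssi.getShardState.getReply(req));
        return rep.shards; // shards intersecting `range`, in key order
    }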
@@ -9445,6 +10170,8 @@ ACTOR Future<Void> storageServer(IKeyValueStore* persistentData,
                                  Reference<AsyncVar<ServerDBInfo> const> db,
                                  std::string folder) {
     state StorageServer self(persistentData, db, ssi);
+    self.shardAware = CLIENT_KNOBS->SHARD_ENCODE_LOCATION_METADATA &&
+                      (SERVER_KNOBS->STORAGE_SERVER_SHARD_AWARE || persistentData->shardAware());
     state Future<Void> ssCore;
     self.clusterId.send(clusterId);
     self.initialClusterVersion = startVersion;
@@ -9481,7 +10208,7 @@ ACTOR Future<Void> storageServer(IKeyValueStore* persistentData,
         self.tag = seedTag;
     }

-    self.storage.makeNewStorageServerDurable();
+    self.storage.makeNewStorageServerDurable(self.shardAware);
     wait(self.storage.commit());
     ++self.counters.kvCommits;
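The assignment to self.shardAware couples the new server knob to the cluster-wide SHARD_ENCODE_LOCATION_METADATA client knob, while persistentData->shardAware() keeps an already-converted store shard-aware even when the server knob is later turned off. The same rule as a free-standing predicate (names here are mine, not FDB's):

    #include <iostream>

    // A store that was created shard-aware stays shard-aware even if the
    // server knob is later disabled; nothing is shard-aware unless location
    // metadata encoding is enabled cluster-wide.
    bool computeShardAware(bool shardEncodeLocationMetadata, // client knob
                           bool storageServerShardAware,     // server knob
                           bool storeAlreadyShardAware) {    // persisted store format
        return shardEncodeLocationMetadata && (storageServerShardAware || storeAlreadyShardAware);
    }

    int main() {
        // Knob off on this process, but the on-disk store was created shard-aware:
        std::cout << computeShardAware(true, false, true) << "\n"; // prints 1
        // Location metadata encoding disabled cluster-wide: never shard-aware.
        std::cout << computeShardAware(false, true, true) << "\n"; // prints 0
    }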
diff --git a/fdbserver/workloads/PhysicalShardMove.actor.cpp b/fdbserver/workloads/PhysicalShardMove.actor.cpp
index bd5e08dc9b..2c59de681d 100644
--- a/fdbserver/workloads/PhysicalShardMove.actor.cpp
+++ b/fdbserver/workloads/PhysicalShardMove.actor.cpp
@@ -1,5 +1,5 @@
 /*
- *PhysicalShardMove.actor.cpp
+ * PhysicalShardMove.actor.cpp
  *
  * This source file is part of the FoundationDB open source project
  *
@@ -43,11 +43,14 @@ std::string printValue(const ErrorOr<Optional<Value>>& value) {
 }
 } // namespace

-struct SSCheckpointWorkload : TestWorkload {
+struct PhysicalShardMoveWorkLoad : TestWorkload {
+    FlowLock startMoveKeysParallelismLock;
+    FlowLock finishMoveKeysParallelismLock;
+    FlowLock cleanUpDataMoveParallelismLock;
     const bool enabled;
     bool pass;

-    SSCheckpointWorkload(WorkloadContext const& wcx) : TestWorkload(wcx), enabled(!clientId), pass(true) {}
+    PhysicalShardMoveWorkLoad(WorkloadContext const& wcx) : TestWorkload(wcx), enabled(!clientId), pass(true) {}

     void validationFailed(ErrorOr<Optional<Value>> expectedValue, ErrorOr<Optional<Value>> actualValue) {
         TraceEvent(SevError, "TestFailed")
@@ -56,7 +59,7 @@ struct SSCheckpointWorkload : TestWorkload {
         pass = false;
     }

-    std::string description() const override { return "SSCheckpoint"; }
+    std::string description() const override { return "PhysicalShardMove"; }

     Future<Void> setup(Database const& cx) override { return Void(); }

@@ -67,126 +70,153 @@ struct SSCheckpointWorkload : TestWorkload {
         return _start(this, cx);
     }

-    ACTOR Future<Void> _start(SSCheckpointWorkload* self, Database cx) {
-        state Key key = "TestKey"_sr;
-        state Key endKey = "TestKey0"_sr;
-        state Value oldValue = "TestValue"_sr;
-        state KeyRange testRange = KeyRangeRef(key, endKey);
-
+    ACTOR Future<Void> _start(PhysicalShardMoveWorkLoad* self, Database cx) {
         int ignore = wait(setDDMode(cx, 0));
-        state Version version = wait(self->writeAndVerify(self, cx, key, oldValue));
+        state std::map<Key, Value> kvs({ { "TestKeyA"_sr, "TestValueA"_sr },
+                                         { "TestKeyB"_sr, "TestValueB"_sr },
+                                         { "TestKeyC"_sr, "TestValueC"_sr },
+                                         { "TestKeyD"_sr, "TestValueD"_sr },
+                                         { "TestKeyE"_sr, "TestValueE"_sr },
+                                         { "TestKeyF"_sr, "TestValueF"_sr } });

-        // Create checkpoint.
-        state Transaction tr(cx);
-        state CheckpointFormat format = deterministicRandom()->coinflip() ? RocksDBColumnFamily : RocksDB;
-        loop {
-            try {
-                tr.setOption(FDBTransactionOptions::LOCK_AWARE);
-                tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
-                wait(createCheckpoint(&tr, testRange, format));
-                wait(tr.commit());
-                version = tr.getCommittedVersion();
-                break;
-            } catch (Error& e) {
-                wait(tr.onError(e));
-            }
-        }
+        Version _ = wait(self->populateData(self, cx, &kvs));
+
+        TraceEvent("TestValueWritten").log();
+
+        state std::unordered_set<UID> excludes;
+        state std::unordered_set<UID> includes;
+        state int teamSize = 1;
+        std::vector<UID> initialTeam = wait(self->moveShard(self,
+                                                            cx,
+                                                            deterministicRandom()->randomUniqueID(),
+                                                            KeyRangeRef("TestKeyA"_sr, "TestKeyF"_sr),
+                                                            teamSize,
+                                                            includes,
+                                                            excludes));
+        excludes.insert(initialTeam.begin(), initialTeam.end());
+
+        state uint64_t sh0 = deterministicRandom()->randomUInt64();
+        state uint64_t sh1 = deterministicRandom()->randomUInt64();
+        state uint64_t sh2 = deterministicRandom()->randomUInt64();
+
+        // Move range [TestKeyA, TestKeyB) to sh0.
+        state std::vector<UID> teamA = wait(self->moveShard(self,
+                                                            cx,
+                                                            UID(sh0, deterministicRandom()->randomUInt64()),
+                                                            KeyRangeRef("TestKeyA"_sr, "TestKeyB"_sr),
+                                                            teamSize,
+                                                            includes,
+                                                            excludes));
+        includes.insert(teamA.begin(), teamA.end());
+        // Move range [TestKeyB, TestKeyC) to sh1, on the same server.
+        state std::vector<UID> teamB = wait(self->moveShard(self,
+                                                            cx,
+                                                            UID(sh1, deterministicRandom()->randomUInt64()),
+                                                            KeyRangeRef("TestKeyB"_sr, "TestKeyC"_sr),
+                                                            teamSize,
+                                                            includes,
+                                                            excludes));
+        ASSERT(std::equal(teamA.begin(), teamA.end(), teamB.begin()));
+
+        state int teamIdx = 0;
+        for (teamIdx = 0; teamIdx < teamA.size(); ++teamIdx) {
+            std::vector<StorageServerShard> shards =
+                wait(self->getStorageServerShards(cx, teamA[teamIdx], KeyRangeRef("TestKeyA"_sr, "TestKeyC"_sr)));
+            ASSERT(shards.size() == 2);
+            ASSERT(shards[0].desiredId == sh0);
+            ASSERT(shards[1].desiredId == sh1);
+            TraceEvent("TestStorageServerShards", teamA[teamIdx]).detail("Shards", describe(shards));
+        }

-        TraceEvent("TestCheckpointCreated").detail("Range", testRange).detail("Version", version);
+        state std::vector<UID> teamC = wait(self->moveShard(self,
+                                                            cx,
+                                                            UID(sh2, deterministicRandom()->randomUInt64()),
+                                                            KeyRangeRef("TestKeyB"_sr, "TestKeyC"_sr),
+                                                            teamSize,
+                                                            includes,
+                                                            excludes));
+        ASSERT(std::equal(teamA.begin(), teamA.end(), teamC.begin()));

-        // Fetch checkpoint meta data.
-        loop {
-            try {
-                state std::vector<CheckpointMetaData> records =
-                    wait(getCheckpointMetaData(cx, testRange, version, format));
-                break;
-            } catch (Error& e) {
-                TraceEvent("TestFetchCheckpointMetadataError")
-                    .errorUnsuppressed(e)
-                    .detail("Range", testRange)
-                    .detail("Version", version);
-
-                // The checkpoint was just created, we don't expect this error.
-                ASSERT(e.code() != error_code_checkpoint_not_found);
-            }
-        }
+        for (teamIdx = 0; teamIdx < teamA.size(); ++teamIdx) {
+            std::vector<StorageServerShard> shards =
+                wait(self->getStorageServerShards(cx, teamA[teamIdx], KeyRangeRef("TestKeyA"_sr, "TestKeyC"_sr)));
+            ASSERT(shards.size() == 2);
+            ASSERT(shards[0].desiredId == sh0);
+            ASSERT(shards[1].id == sh1);
+            ASSERT(shards[1].desiredId == sh2);
+            TraceEvent("TestStorageServerShards", teamA[teamIdx]).detail("Shards", describe(shards));
+        }

-        TraceEvent("TestCheckpointFetched")
-            .detail("Range", testRange)
-            .detail("Version", version)
-            .detail("Checkpoints", describe(records));
-
-        state std::string pwd = platform::getWorkingDirectory();
-        state std::string folder = pwd + "/checkpoints";
-        platform::eraseDirectoryRecursive(folder);
-        ASSERT(platform::createDirectory(folder));
-
-        // Fetch checkpoint.
-        state std::vector<CheckpointMetaData> fetchedCheckpoints;
-        state int i = 0;
-        for (; i < records.size(); ++i) {
-            loop {
-                TraceEvent("TestFetchingCheckpoint").detail("Checkpoint", records[i].toString());
-                try {
-                    state CheckpointMetaData record = wait(fetchCheckpoint(cx, records[0], folder));
-                    fetchedCheckpoints.push_back(record);
-                    TraceEvent("TestCheckpointFetched").detail("Checkpoint", record.toString());
-                    break;
-                } catch (Error& e) {
-                    TraceEvent("TestFetchCheckpointError")
-                        .errorUnsuppressed(e)
-                        .detail("Checkpoint", records[i].toString());
-                    wait(delay(1));
-                }
-            }
-        }
-
-        state std::string rocksDBTestDir = "rocksdb-kvstore-test-db";
-        platform::eraseDirectoryRecursive(rocksDBTestDir);
-
-        // Restore KVS.
-        state IKeyValueStore* kvStore = keyValueStoreRocksDB(
-            rocksDBTestDir, deterministicRandom()->randomUniqueID(), KeyValueStoreType::SSD_ROCKSDB_V1);
-        wait(kvStore->init());
-        try {
-            wait(kvStore->restore(fetchedCheckpoints));
-        } catch (Error& e) {
-            TraceEvent(SevError, "TestRestoreCheckpointError")
-                .errorUnsuppressed(e)
-                .detail("Checkpoint", describe(records));
-        }
-
-        // Compare the keyrange between the original database and the one restored from checkpoint.
-        // For now, it should have been a single key.
-        tr.reset();
-        loop {
-            try {
-                tr.setOption(FDBTransactionOptions::LOCK_AWARE);
-                state RangeResult res = wait(tr.getRange(KeyRangeRef(key, endKey), CLIENT_KNOBS->TOO_MANY));
-                break;
-            } catch (Error& e) {
-                wait(tr.onError(e));
-            }
-        }
-
-        RangeResult kvRange = wait(kvStore->readRange(testRange));
-        ASSERT(res.size() == kvRange.size());
-        for (int i = 0; i < res.size(); ++i) {
-            ASSERT(res[i] == kvRange[i]);
-        }
+        wait(self->validateData(self, cx, KeyRangeRef("TestKeyA"_sr, "TestKeyF"_sr), &kvs));
+        TraceEvent("TestValueVerified").log();

         int ignore = wait(setDDMode(cx, 1));
         return Void();
     }
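The two assertion blocks above pin down the two-phase id update during a move: right after [TestKeyB, TestKeyC) is re-targeted at sh2 on the same team, the server still reports the physical shard id sh1 with desiredId sh2; the desired id leads, and the actual id follows only once the data lands in a new physical shard. A toy model of that handoff, with hypothetical names (ShardEntry, queueMove, commitMove), not FDB internals:

    #include <cassert>
    #include <cstdint>

    struct ShardEntry {
        uint64_t id;        // physical shard currently holding the range
        uint64_t desiredId; // physical shard the range should end up in
    };

    void queueMove(ShardEntry& s, uint64_t target) { s.desiredId = target; }
    void commitMove(ShardEntry& s) { s.id = s.desiredId; }

    int main() {
        ShardEntry range{ /*id=*/1, /*desiredId=*/1 };
        queueMove(range, 2);
        assert(range.id == 1 && range.desiredId == 2); // mirrors shards[1].id == sh1, desiredId == sh2
        commitMove(range);
        assert(range.id == 2 && range.desiredId == 2);
    }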
-    ACTOR Future<Void> readAndVerify(SSCheckpointWorkload* self,
+    ACTOR Future<Version> populateData(PhysicalShardMoveWorkLoad* self, Database cx, std::map<Key, Value>* kvs) {
+        state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(cx);
+        state Version version;
+        loop {
+            state UID debugID = deterministicRandom()->randomUniqueID();
+            try {
+                tr->debugTransaction(debugID);
+                for (const auto& [key, value] : *kvs) {
+                    tr->set(key, value);
+                }
+                wait(tr->commit());
+                version = tr->getCommittedVersion();
+                break;
+            } catch (Error& e) {
+                TraceEvent("TestCommitError").errorUnsuppressed(e);
+                wait(tr->onError(e));
+            }
+        }
+
+        TraceEvent("PopulateTestDataDone")
+            .detail("CommitVersion", tr->getCommittedVersion())
+            .detail("DebugID", debugID);
+
+        return version;
+    }
+
+    ACTOR Future<Void> validateData(PhysicalShardMoveWorkLoad* self,
+                                    Database cx,
+                                    KeyRange range,
+                                    std::map<Key, Value>* kvs) {
+        state Transaction tr(cx);
+        loop {
+            state UID debugID = deterministicRandom()->randomUniqueID();
+            try {
+                tr.debugTransaction(debugID);
+                RangeResult res = wait(tr.getRange(range, CLIENT_KNOBS->TOO_MANY));
+                ASSERT(!res.more && res.size() < CLIENT_KNOBS->TOO_MANY);
+
+                for (const auto& kv : res) {
+                    ASSERT((*kvs)[kv.key] == kv.value);
+                }
+                break;
+            } catch (Error& e) {
+                TraceEvent("TestValidateDataError").errorUnsuppressed(e);
+                wait(tr.onError(e));
+            }
+        }
+
+        TraceEvent("ValidateTestDataDone").detail("DebugID", debugID);
+
+        return Void();
+    }
+
+    ACTOR Future<Void> readAndVerify(PhysicalShardMoveWorkLoad* self,
                                      Database cx,
                                      Key key,
                                      ErrorOr<Optional<Value>> expectedValue) {
         state Transaction tr(cx);
+        tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);

         loop {
             try {
+                state Version readVersion = wait(tr.getReadVersion());
                 state Optional<Value> res = wait(timeoutError(tr.get(key), 30.0));
                 const bool equal = !expectedValue.isError() && res == expectedValue.get();
                 if (!equal) {
@@ -194,6 +224,7 @@ struct SSCheckpointWorkload : TestWorkload {
                 }
                 break;
             } catch (Error& e) {
+                TraceEvent("TestReadError").errorUnsuppressed(e);
                 if (expectedValue.isError() && expectedValue.getError().code() == e.code()) {
                     break;
                 }
@@ -201,35 +232,146 @@ struct SSCheckpointWorkload : TestWorkload {
             }
         }

+        TraceEvent("TestReadSuccess").detail("Version", readVersion);
+
         return Void();
     }
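populateData and validateData both tag every retry attempt with a fresh debugID via debugTransaction, so the final success event can be joined against server-side traces for the attempt that actually committed. The idea in isolation, with plain stand-ins for TraceEvent and FDB's deterministic RNG:

    #include <cstdint>
    #include <iostream>
    #include <random>

    int main() {
        std::mt19937_64 rng(42); // stand-in for deterministicRandom()
        for (int attempt = 0; attempt < 3; ++attempt) {
            // A new id per attempt, like deterministicRandom()->randomUniqueID():
            uint64_t debugId = rng();
            std::cout << "attempt " << attempt << " debugId " << std::hex << debugId << std::dec << "\n";
            if (attempt == 2) // pretend the last attempt committed
                std::cout << "committed; log this debugId with the success event\n";
        }
    }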
-    ACTOR Future<Version> writeAndVerify(SSCheckpointWorkload* self, Database cx, Key key, Optional<Value> value) {
-        state Transaction tr(cx);
+    ACTOR Future<Version> writeAndVerify(PhysicalShardMoveWorkLoad* self, Database cx, Key key, Optional<Value> value) {
+        // state Transaction tr(cx);
+        state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(cx);
         state Version version;
         loop {
+            state UID debugID = deterministicRandom()->randomUniqueID();
             try {
+                tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
+                tr->debugTransaction(debugID);
                 if (value.present()) {
-                    tr.set(key, value.get());
+                    tr->set(key, value.get());
+                    tr->set("Test?"_sr, value.get());
+                    tr->set(key, value.get());
                 } else {
-                    tr.clear(key);
+                    tr->clear(key);
                 }
-                wait(timeoutError(tr.commit(), 30.0));
-                version = tr.getCommittedVersion();
+                wait(timeoutError(tr->commit(), 30.0));
+                version = tr->getCommittedVersion();
                 break;
             } catch (Error& e) {
-                wait(tr.onError(e));
+                TraceEvent("TestCommitError").errorUnsuppressed(e);
+                wait(tr->onError(e));
             }
         }

+        TraceEvent("TestCommitSuccess").detail("CommitVersion", tr->getCommittedVersion()).detail("DebugID", debugID);
+
         wait(self->readAndVerify(self, cx, key, value));

         return version;
     }

+    // Move keys to a randomly selected team consisting of a single SS, after disabling DD, so that the keys
+    // stay on the new team until DD is re-enabled.
+    // Returns the IDs of the servers in the new team.
+    ACTOR Future<std::vector<UID>> moveShard(PhysicalShardMoveWorkLoad* self,
+                                             Database cx,
+                                             UID dataMoveId,
+                                             KeyRange keys,
+                                             int teamSize,
+                                             std::unordered_set<UID> includes,
+                                             std::unordered_set<UID> excludes) {
+        // Disable DD so that it cannot undo the move.
+        int ignore = wait(setDDMode(cx, 0));
+
+        // Pick random SSes as the destination team; after the move, the keys reside only on that team.
+        std::vector<StorageServerInterface> interfs = wait(getStorageServers(cx));
+        ASSERT(interfs.size() > teamSize - includes.size());
+        while (includes.size() < teamSize) {
+            const auto& interf = interfs[deterministicRandom()->randomInt(0, interfs.size())];
+            if (excludes.count(interf.uniqueID) == 0 && includes.count(interf.uniqueID) == 0) {
+                includes.insert(interf.uniqueID);
+            }
+        }
+
+        state std::vector<UID> dests(includes.begin(), includes.end());
+        state UID owner = deterministicRandom()->randomUniqueID();
+        // state Key ownerKey = "\xff/moveKeysLock/Owner"_sr;
+        state DDEnabledState ddEnabledState;
+
+        state Transaction tr(cx);
+
+        loop {
+            try {
+                TraceEvent("TestMoveShard").detail("Range", keys.toString());
+                state MoveKeysLock moveKeysLock = wait(takeMoveKeysLock(cx, owner));
+
+                tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
+                state RangeResult dataMoves = wait(tr.getRange(dataMoveKeys, CLIENT_KNOBS->TOO_MANY));
+                Version readVersion = wait(tr.getReadVersion());
+                TraceEvent("TestMoveShardReadDataMoves")
+                    .detail("DataMoves", dataMoves.size())
+                    .detail("ReadVersion", readVersion);
+                state int i = 0;
+                for (; i < dataMoves.size(); ++i) {
+                    UID oldDataMoveId = decodeDataMoveKey(dataMoves[i].key);
+                    state DataMoveMetaData dataMove = decodeDataMoveValue(dataMoves[i].value);
+                    ASSERT(oldDataMoveId == dataMove.id);
+                    TraceEvent("TestCancelDataMoveBegin").detail("DataMove", dataMove.toString());
+                    wait(cleanUpDataMove(cx,
+                                         oldDataMoveId,
+                                         moveKeysLock,
+                                         &self->cleanUpDataMoveParallelismLock,
+                                         dataMove.range,
+                                         &ddEnabledState));
+                    TraceEvent("TestCancelDataMoveEnd").detail("DataMove", dataMove.toString());
+                }
+
+                wait(moveKeys(cx,
+                              dataMoveId,
+                              keys,
+                              dests,
+                              dests,
+                              moveKeysLock,
+                              Promise<Void>(),
+                              &self->startMoveKeysParallelismLock,
+                              &self->finishMoveKeysParallelismLock,
+                              false,
+                              deterministicRandom()->randomUniqueID(), // for logging only
+                              &ddEnabledState));
+                break;
+            } catch (Error& e) {
+                if (e.code() == error_code_movekeys_conflict) {
+                    // A conflict on moveKeysLock with the currently running DD is expected; just retry.
+                    tr.reset();
+                } else {
+                    wait(tr.onError(e));
+                }
+            }
+        }
+
+        TraceEvent("TestMoveShardComplete").detail("Range", keys.toString()).detail("NewTeam", describe(dests));
+
+        return dests;
+    }
+
+    ACTOR Future<std::vector<StorageServerShard>> getStorageServerShards(Database cx, UID ssId, KeyRange range) {
+        state Transaction tr(cx);
+        loop {
+            try {
+                Optional<Value> serverListValue = wait(tr.get(serverListKeyFor(ssId)));
+                ASSERT(serverListValue.present());
+                state StorageServerInterface ssi = decodeServerListValue(serverListValue.get());
+                GetShardStateRequest req(range, GetShardStateRequest::READABLE, true);
+                GetShardStateReply rep = wait(ssi.getShardState.getReply(req, TaskPriority::DefaultEndpoint));
+                return rep.shards;
+            } catch (Error& e) {
+                wait(tr.onError(e));
+            }
+        }
+    }
+
     Future<bool> check(Database const& cx) override { return pass; }

     void getMetrics(std::vector<PerfMetric>& m) override {}
 };

-WorkloadFactory<SSCheckpointWorkload> SSCheckpointWorkloadFactory("SSCheckpointWorkload");
\ No newline at end of file
+WorkloadFactory<PhysicalShardMoveWorkLoad> PhysicalShardMoveWorkLoadFactory("PhysicalShardMove");
\ No newline at end of file
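One detail worth noting in moveShard: when takeMoveKeysLock or moveKeys throws movekeys_conflict, the code calls tr.reset() by hand instead of tr.onError(e), presumably because the conflict comes from the lock protocol rather than from the helper transaction itself. A minimal stand-alone rendering of that retry shape; LockConflict is a hypothetical stand-in, not an FDB error type:

    #include <iostream>
    #include <stdexcept>

    struct LockConflict : std::runtime_error {
        LockConflict() : std::runtime_error("movekeys_conflict") {}
    };

    int main() {
        int attempts = 0;
        while (true) {
            try {
                if (++attempts < 3)
                    throw LockConflict(); // another mover currently owns the lock
                std::cout << "move committed after " << attempts << " attempts\n";
                break;
            } catch (const LockConflict&) {
                // Expected while someone else holds the lock: reset state and start over.
                std::cout << "conflict, retrying\n";
            }
        }
    }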
diff --git a/fdbserver/workloads/StorageServerCheckpointRestoreTest.actor.cpp b/fdbserver/workloads/StorageServerCheckpointRestoreTest.actor.cpp
new file mode 100644
index 0000000000..80a82cc537
--- /dev/null
+++ b/fdbserver/workloads/StorageServerCheckpointRestoreTest.actor.cpp
@@ -0,0 +1,238 @@
+/*
+ * StorageServerCheckpointRestoreTest.actor.cpp
+ *
+ * This source file is part of the FoundationDB open source project
+ *
+ * Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "fdbclient/ManagementAPI.actor.h"
+#include "fdbclient/NativeAPI.actor.h"
+#include "fdbrpc/simulator.h"
+#include "fdbserver/IKeyValueStore.h"
+#include "fdbserver/ServerCheckpoint.actor.h"
+#include "fdbserver/MoveKeys.actor.h"
+#include "fdbserver/QuietDatabase.h"
+#include "fdbserver/workloads/workloads.actor.h"
+#include "flow/Error.h"
+#include "flow/IRandom.h"
+#include "flow/flow.h"
+#include
+#include
+
+#include "flow/actorcompiler.h" // This must be the last #include.
+
+namespace {
+std::string printValue(const ErrorOr<Optional<Value>>& value) {
+    if (value.isError()) {
+        return value.getError().name();
+    }
+    return value.get().present() ? value.get().get().toString() : "Value Not Found.";
+}
+} // namespace
+
+struct SSCheckpointRestoreWorkload : TestWorkload {
+    const bool enabled;
+    bool pass;
+
+    SSCheckpointRestoreWorkload(WorkloadContext const& wcx) : TestWorkload(wcx), enabled(!clientId), pass(true) {}
+
+    void validationFailed(ErrorOr<Optional<Value>> expectedValue, ErrorOr<Optional<Value>> actualValue) {
+        TraceEvent(SevError, "TestFailed")
+            .detail("ExpectedValue", printValue(expectedValue))
+            .detail("ActualValue", printValue(actualValue));
+        pass = false;
+    }
+
+    std::string description() const override { return "SSCheckpoint"; }
+
+    Future<Void> setup(Database const& cx) override { return Void(); }
+
+    Future<Void> start(Database const& cx) override {
+        if (!enabled) {
+            return Void();
+        }
+        return _start(this, cx);
+    }
+
+    ACTOR Future<Void> _start(SSCheckpointRestoreWorkload* self, Database cx) {
+        state Key key = "TestKey"_sr;
+        state Key endKey = "TestKey0"_sr;
+        state Value oldValue = "TestValue"_sr;
+        state KeyRange testRange = KeyRangeRef(key, endKey);
+
+        int ignore = wait(setDDMode(cx, 0));
+        state Version version = wait(self->writeAndVerify(self, cx, key, oldValue));
+
+        // Create checkpoint.
+        state Transaction tr(cx);
+        state CheckpointFormat format = deterministicRandom()->coinflip() ? RocksDBColumnFamily : RocksDB;
+        loop {
+            try {
+                tr.setOption(FDBTransactionOptions::LOCK_AWARE);
+                tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
+                wait(createCheckpoint(&tr, testRange, format));
+                wait(tr.commit());
+                version = tr.getCommittedVersion();
+                break;
+            } catch (Error& e) {
+                wait(tr.onError(e));
+            }
+        }
+
+        TraceEvent("TestCheckpointCreated").detail("Range", testRange).detail("Version", version);
+
+        // Fetch checkpoint meta data.
+        loop {
+            try {
+                state std::vector<CheckpointMetaData> records =
+                    wait(getCheckpointMetaData(cx, testRange, version, format));
+                break;
+            } catch (Error& e) {
+                TraceEvent("TestFetchCheckpointMetadataError")
+                    .errorUnsuppressed(e)
+                    .detail("Range", testRange)
+                    .detail("Version", version);
+
+                // The checkpoint was just created, we don't expect this error.
+                ASSERT(e.code() != error_code_checkpoint_not_found);
+            }
+        }
+
+        TraceEvent("TestCheckpointFetched")
+            .detail("Range", testRange)
+            .detail("Version", version)
+            .detail("Checkpoints", describe(records));
+
+        state std::string pwd = platform::getWorkingDirectory();
+        state std::string folder = pwd + "/checkpoints";
+        platform::eraseDirectoryRecursive(folder);
+        ASSERT(platform::createDirectory(folder));
+
+        // Fetch checkpoint.
+        state std::vector<CheckpointMetaData> fetchedCheckpoints;
+        state int i = 0;
+        for (; i < records.size(); ++i) {
+            loop {
+                TraceEvent("TestFetchingCheckpoint").detail("Checkpoint", records[i].toString());
+                try {
+                    state CheckpointMetaData record = wait(fetchCheckpoint(cx, records[i], folder));
+                    fetchedCheckpoints.push_back(record);
+                    TraceEvent("TestCheckpointFetched").detail("Checkpoint", record.toString());
+                    break;
+                } catch (Error& e) {
+                    TraceEvent("TestFetchCheckpointError")
+                        .errorUnsuppressed(e)
+                        .detail("Checkpoint", records[i].toString());
+                    wait(delay(1));
+                }
+            }
+        }
+
+        state std::string rocksDBTestDir = "rocksdb-kvstore-test-db";
+        platform::eraseDirectoryRecursive(rocksDBTestDir);
+
+        // Restore KVS.
+        state IKeyValueStore* kvStore = keyValueStoreRocksDB(
+            rocksDBTestDir, deterministicRandom()->randomUniqueID(), KeyValueStoreType::SSD_ROCKSDB_V1);
+        wait(kvStore->init());
+        try {
+            wait(kvStore->restore(fetchedCheckpoints));
+        } catch (Error& e) {
+            TraceEvent(SevError, "TestRestoreCheckpointError")
+                .errorUnsuppressed(e)
+                .detail("Checkpoint", describe(records));
+        }
+
+        // Compare the keyrange between the original database and the one restored from checkpoint.
+        // For now, it should have been a single key.
+        tr.reset();
+        loop {
+            try {
+                tr.setOption(FDBTransactionOptions::LOCK_AWARE);
+                state RangeResult res = wait(tr.getRange(KeyRangeRef(key, endKey), CLIENT_KNOBS->TOO_MANY));
+                break;
+            } catch (Error& e) {
+                wait(tr.onError(e));
+            }
+        }
+
+        RangeResult kvRange = wait(kvStore->readRange(testRange));
+        ASSERT(res.size() == kvRange.size());
+        for (int i = 0; i < res.size(); ++i) {
+            ASSERT(res[i] == kvRange[i]);
+        }
+
+        int ignore = wait(setDDMode(cx, 1));
+        return Void();
+    }
+
+    ACTOR Future<Void> readAndVerify(SSCheckpointRestoreWorkload* self,
+                                     Database cx,
+                                     Key key,
+                                     ErrorOr<Optional<Value>> expectedValue) {
+        state Transaction tr(cx);
+
+        loop {
+            try {
+                state Optional<Value> res = wait(timeoutError(tr.get(key), 30.0));
+                const bool equal = !expectedValue.isError() && res == expectedValue.get();
+                if (!equal) {
+                    self->validationFailed(expectedValue, ErrorOr<Optional<Value>>(res));
+                }
+                break;
+            } catch (Error& e) {
+                if (expectedValue.isError() && expectedValue.getError().code() == e.code()) {
+                    break;
+                }
+                wait(tr.onError(e));
+            }
+        }
+
+        return Void();
+    }
+
+    ACTOR Future<Version> writeAndVerify(SSCheckpointRestoreWorkload* self,
+                                         Database cx,
+                                         Key key,
+                                         Optional<Value> value) {
+        state Transaction tr(cx);
+        state Version version;
+        loop {
+            try {
+                if (value.present()) {
+                    tr.set(key, value.get());
+                } else {
+                    tr.clear(key);
+                }
+                wait(timeoutError(tr.commit(), 30.0));
+                version = tr.getCommittedVersion();
+                break;
+            } catch (Error& e) {
+                wait(tr.onError(e));
+            }
+        }
+
+        wait(self->readAndVerify(self, cx, key, value));
+
+        return version;
+    }
+
+    Future<bool> check(Database const& cx) override { return pass; }
+
+    void getMetrics(std::vector<PerfMetric>& m) override {}
+};
+
+WorkloadFactory<SSCheckpointRestoreWorkload> SSCheckpointRestoreWorkloadFactory("SSCheckpointRestoreWorkload");
\ No newline at end of file
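The verification at the heart of the new workload is a plain range-by-range equality check between the live database and the store rebuilt from the checkpoint. Modeled with two in-memory maps standing in for tr.getRange() and kvStore->readRange():

    #include <cassert>
    #include <iterator>
    #include <map>
    #include <string>

    int main() {
        std::map<std::string, std::string> database{ { "TestKey", "TestValue" } };
        std::map<std::string, std::string> restored = database; // checkpoint + restore round-trip
        // Scans over [TestKey, TestKey0) on both sides must agree exactly.
        auto dbBegin = database.lower_bound("TestKey"), dbEnd = database.lower_bound("TestKey0");
        auto rsBegin = restored.lower_bound("TestKey"), rsEnd = restored.lower_bound("TestKey0");
        assert(std::distance(dbBegin, dbEnd) == std::distance(rsBegin, rsEnd));
        for (; dbBegin != dbEnd; ++dbBegin, ++rsBegin)
            assert(*dbBegin == *rsBegin); // key and value both match
    }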
diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt
index 0f148297ef..9efe533f97 100644
--- a/tests/CMakeLists.txt
+++ b/tests/CMakeLists.txt
@@ -192,10 +192,12 @@ if(WITH_PYTHON)
   add_fdb_test(TEST_FILES noSim/KeyValueStoreRocksDBTest.toml)
   add_fdb_test(TEST_FILES noSim/ShardedRocksDBTest.toml UNIT)
   add_fdb_test(TEST_FILES fast/PhysicalShardMove.toml)
+  add_fdb_test(TEST_FILES fast/StorageServerCheckpointRestore.toml)
 else()
   add_fdb_test(TEST_FILES noSim/KeyValueStoreRocksDBTest.toml IGNORE)
   add_fdb_test(TEST_FILES noSim/ShardedRocksDBTest.toml IGNORE)
   add_fdb_test(TEST_FILES fast/PhysicalShardMove.toml IGNORE)
+  add_fdb_test(TEST_FILES fast/StorageServerCheckpointRestore.toml IGNORE)
 endif()
 add_fdb_test(TEST_FILES rare/CheckRelocation.toml)
 add_fdb_test(TEST_FILES rare/ClogUnclog.toml)
diff --git a/tests/fast/PhysicalShardMove.toml b/tests/fast/PhysicalShardMove.toml
index 66948ce559..bb0a28068e 100644
--- a/tests/fast/PhysicalShardMove.toml
+++ b/tests/fast/PhysicalShardMove.toml
@@ -6,9 +6,13 @@ coordinators = 3
 machineCount = 15
 allowDefaultTenant = false

+[[knobs]]
+shard_encode_location_metadata = true
+storage_server_shard_aware = true
+
 [[test]]
 testTitle = 'PhysicalShardMove'
 useDB = true

     [[test.workload]]
-    testName = 'SSCheckpointWorkload'
+    testName = 'PhysicalShardMove'
diff --git a/tests/fast/StorageServerCheckpointRestore.toml b/tests/fast/StorageServerCheckpointRestore.toml
new file mode 100644
index 0000000000..ab81c70a7b
--- /dev/null
+++ b/tests/fast/StorageServerCheckpointRestore.toml
@@ -0,0 +1,14 @@
+[configuration]
+config = 'triple'
+storageEngineType = 4
+processesPerMachine = 1
+coordinators = 3
+machineCount = 15
+allowDefaultTenant = false
+
+[[test]]
+testTitle = 'SSCheckpointRestoreWorkload'
+useDB = true
+
+    [[test.workload]]
+    testName = 'SSCheckpointRestoreWorkload'
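Assuming the usual FoundationDB test workflow, both new tests run under the deterministic simulator, for example via fdbserver -r simulation -f tests/fast/PhysicalShardMove.toml, or through CTest after a WITH_PYTHON build (ctest -R PhysicalShardMove). The new TOML selects storageEngineType = 4 (SSD_ROCKSDB_V1), so a RocksDB-enabled build is required; the [[knobs]] section added to PhysicalShardMove.toml flips the lowercase forms of the SHARD_ENCODE_LOCATION_METADATA and STORAGE_SERVER_SHARD_AWARE knobs that the storage server consults above.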