Added a DDConfiguration class for modeling user-specified key range configuration options. Added a snapshot type for KeyBackedRangeMap (KeyRangeMapSnapshot) and other supporting changes to KeyBackedTypes. Added invalidKey to give KeyBackedTypes a safe default prefix so that uninitialized accessors cannot accidentally modify userspace.
This commit is contained in:
parent b1a17cce0d
commit b7e68bbf51
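For context, a minimal write-side sketch of the API introduced by this commit (not part of the diff): the actor name, the example keys, and the replication factor below are illustrative, and it assumes an actor-compiled source with the usual fdbclient includes. It mirrors the retry-loop pattern used by the tester changes later in this diff.

ACTOR Future<Void> setRangeReplication(Database cx) {
    state DDConfiguration::RangeConfigMap rangeConfig = DDConfiguration().userRangeConfig();
    state DDRangeConfig conf;
    conf.forceBoundary = false; // DDRangeConfig does not default-initialize this member
    conf.replicationFactor = 3;

    state ReadYourWritesTransaction tr(cx);
    loop {
        try {
            // The config map lives under the \xff\x02/ddconfig/ prefix, so system key access is needed to write it.
            tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
            wait(rangeConfig.updateRange(&tr, "exampleBegin"_sr, "exampleEnd"_sr, conf));
            wait(tr.commit());
            return Void();
        } catch (Error& e) {
            wait(tr.onError(e));
        }
    }
}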
@@ -0,0 +1,56 @@
/*
 * DataDistributionConfig.actor.cpp
 *
 * This source file is part of the FoundationDB open source project
 *
 * Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "fdbclient/DataDistributionConfig.actor.h"
#include "flow/actorcompiler.h"

// Read all key ranges of a RangeConfigMap and return it as a RangeConfigMapSnapshot
// allKeys.begin/end will be set to default values in the result if they do not exist in the database
//
// TODO: Move this to a more generic function in KeyBackedRangeMap, which will need to understand more than it
// currently does about the KeyType.
ACTOR Future<DDConfiguration::RangeConfigMapSnapshot> DDConfiguration::readRangeMap(
    Reference<ReadYourWritesTransaction> tr,
    DDConfiguration::RangeConfigMap map) {
    tr->setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
    tr->setOption(FDBTransactionOptions::READ_LOCK_AWARE);
    tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);

    state DDConfiguration::RangeConfigMapSnapshot result;
    result.map[allKeys.begin] = DDRangeConfig();
    result.map[allKeys.end] = DDRangeConfig();
    state Key begin = allKeys.begin;

    loop {
        DDConfiguration::RangeConfigMap::RangeMapResult boundaries =
            wait(DDConfiguration().userRangeConfig().getRanges(tr, begin, allKeys.end, false));
        for (auto& kv : boundaries.results) {
            result.map[kv.key] = kv.value;
        }

        if (!boundaries.more) {
            break;
        }
        ASSERT(!boundaries.results.empty());
        begin = keyAfter(boundaries.results.back().key);
    }

    return result;
}
@@ -0,0 +1,115 @@
/*
 * DataDistributionConfig.actor.h
 *
 * This source file is part of the FoundationDB open source project
 *
 * Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#if defined(NO_INTELLISENSE) && !defined(FDBCLIENT_DATA_DISTRIBUTION_CONFIG_ACTOR_G_H)
#define FDBCLIENT_DATA_DISTRIBUTION_CONFIG_ACTOR_G_H
#include "fdbclient/DataDistributionConfig.actor.g.h"
#elif !defined(FDBCLIENT_DATA_DISTRIBUTION_CONFIG_ACTOR_H)
#define FDBCLIENT_DATA_DISTRIBUTION_CONFIG_ACTOR_H

#include "flow/serialize.h"
#include "fdbclient/NativeAPI.actor.h"
#include "fdbclient/SystemData.h"
#include "fdbclient/FDBTypes.h"
#include "fdbclient/KeyBackedTypes.h"
#include "fdbclient/ReadYourWrites.h"
#include "fdbclient/RunTransaction.actor.h"
#include "fdbclient/DatabaseContext.h"
#include "flow/actorcompiler.h" // This must be the last #include.

// DD configuration for a key range.
// There are many properties here, all Optional, because for a given key range a property
// can be unspecified, which means it continues the setting of the preceding range.
struct DDRangeConfig {
    constexpr static FileIdentifier file_identifier = 9193856;

    bool forceBoundary;
    Optional<int> replicationFactor;

    DDRangeConfig update(DDRangeConfig const& update) const {
        DDRangeConfig result = *this;
        if (update.forceBoundary) {
            result.forceBoundary = true;
        }
        if (update.replicationFactor.present()) {
            result.replicationFactor = update.replicationFactor;
        }

        return result;
    }

    bool canMerge(DDRangeConfig const& next) const {
        // If either range has a forced boundary, they cannot merge
        if (forceBoundary || next.forceBoundary) {
            return false;
        }

        return replicationFactor == next.replicationFactor;
    }

    // String description of the range config
    std::string toString() const {
        return fmt::format("forceBoundary={} replication={}", forceBoundary, replicationFactor);
    }

    template <class Ar>
    void serialize(Ar& ar) {
        serializer(ar, forceBoundary, replicationFactor);
    }
};

template <>
struct Traceable<DDRangeConfig> : std::true_type {
    static std::string toString(const DDRangeConfig& c) { return c.toString(); }
};

struct DDConfiguration : public KeyBackedClass {
    DDConfiguration(KeyRef prefix = SystemKey("\xff\x02/ddconfig/"_sr)) : KeyBackedClass(prefix) {}

    // RangeConfigMap is a KeyBackedRangeMap of DDRangeConfig values describing various option overrides for key ranges
    typedef KeyBackedRangeMap<Key,
                              DDRangeConfig,
                              TupleCodec<Key>,
                              ObjectCodec<DDRangeConfig, decltype(IncludeVersion())>>
        RangeConfigMap;

    typedef RangeConfigMap::LocalSnapshot RangeConfigMapSnapshot;

    // Range configuration options set by users
    RangeConfigMap userRangeConfig() const { return { subSpace.pack(__FUNCTION__sr), trigger, IncludeVersion() }; }

    Future<RangeConfigMapSnapshot> userRangeConfigSnapshot(Reference<ReadYourWritesTransaction> tr) const {
        return readRangeMap(tr, userRangeConfig());
    }

    Future<RangeConfigMapSnapshot> userRangeConfigSnapshot(Database cx) const {
        return runTransaction(cx.getReference(),
                              [=](Reference<ReadYourWritesTransaction> tr) { return userRangeConfigSnapshot(tr); });
    }

private:
    // Read all key ranges of a RangeConfigMap and return it as a RangeConfigMapSnapshot
    // allKeys.begin/end will be set to default values in the result if they do not exist in the database
    ACTOR static Future<RangeConfigMapSnapshot> readRangeMap(Reference<ReadYourWritesTransaction> tr,
                                                             RangeConfigMap map);
};

#include "flow/unactorcompiler.h"
#endif
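A read-side sketch of how this class is meant to be consumed (again not part of the diff; the function name and output format are illustrative, and an actor-compiled source is assumed). It mirrors the way the data distribution and consistency check changes later in this diff walk the snapshot.

ACTOR Future<Void> printUserRangeConfig(Database cx, KeyRange keys) {
    // Read the entire user range configuration as a local snapshot, then walk the ranges that
    // overlap 'keys'. Boundaries at allKeys.begin/end are guaranteed by readRangeMap().
    state DDConfiguration::RangeConfigMapSnapshot config = wait(DDConfiguration().userRangeConfigSnapshot(cx));

    for (auto it : config.intersectingRanges(keys.begin, keys.end)) {
        auto r = it->range();
        printf("[%s, %s): %s\n",
               printable(r.begin).c_str(),
               printable(r.end).c_str(),
               it->value().toString().c_str());
    }
    return Void();
}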
@@ -43,6 +43,11 @@ typedef int64_t Generation;
typedef UID SpanID;
typedef uint64_t CoordinatorsHash;

// invalidKey is intentionally far beyond the system space. It is meant to be used as a safe initial value for a key
// before it is set to something meaningful to avoid mistakes where a default constructed key is written to instead of
// the intended target.
static const KeyRef invalidKey = "\xff\xff\xff\xff\xff\xff\xff\xff"_sr;

enum {
    tagLocalitySpecial = -1, // tag with this locality means it is invalidTag (id=0), txsTag (id=1), or cacheTag (id=2)
    tagLocalityLogRouter = -2,
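An illustration of the intent stated above (mine, not part of the commit), using the default constructor arguments added to KeyBackedTypes.h later in this diff; the variable name is hypothetical.

KeyBackedBinaryValue<int64_t> counter; // accidentally left pointing at its default key
// A stray counter.set(tr, 1) now targets invalidKey ("\xff\xff\xff\xff\xff\xff\xff\xff"), where it
// cannot silently overwrite user data, rather than the empty key at the very start of userspace.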
@@ -22,6 +22,7 @@

#include <utility>
#include <vector>
#include <ranges>

#include "fdbclient/ClientBooleanParams.h"
#include "fdbclient/CommitTransaction.h"
@@ -226,7 +227,7 @@ public:
template <typename T, typename Codec = TupleCodec<T>>
class KeyBackedProperty {
public:
    KeyBackedProperty(KeyRef key, Optional<WatchableTrigger> trigger = {}, Codec codec = {})
    KeyBackedProperty(KeyRef key = invalidKey, Optional<WatchableTrigger> trigger = {}, Codec codec = {})
      : key(key), trigger(trigger), codec(codec) {}

    template <class Transaction>
@@ -358,7 +359,7 @@ class KeyBackedBinaryValue : public KeyBackedProperty<T, BinaryCodec<T>> {
    typedef KeyBackedProperty<T, BinaryCodec<T>> Base;

public:
    KeyBackedBinaryValue(KeyRef key, Optional<WatchableTrigger> trigger = {}) : Base(key, trigger) {}
    KeyBackedBinaryValue(KeyRef key = invalidKey, Optional<WatchableTrigger> trigger = {}) : Base(key, trigger) {}

    template <class Transaction>
    void atomicOp(Transaction tr, T const& val, MutationRef::Type type) {
@@ -386,6 +387,12 @@ struct TypedKVPair {
    ValueType value;
};

template <typename KeyType>
struct TypedRange {
    KeyType begin;
    KeyType end;
};

// Convenient read/write access to a sorted map of KeyType to ValueType under prefix
// Even though 'this' is not actually mutated, methods that change db keys are not const.
template <typename _KeyType,
@@ -394,7 +401,7 @@ template <typename _KeyType,
          typename ValueCodec = TupleCodec<_ValueType>>
class KeyBackedMap {
public:
    KeyBackedMap(KeyRef prefix, Optional<WatchableTrigger> trigger = {}, ValueCodec valueCodec = {})
    KeyBackedMap(KeyRef prefix = invalidKey, Optional<WatchableTrigger> trigger = {}, ValueCodec valueCodec = {})
      : subspace(prefixRange(prefix)), trigger(trigger), valueCodec(valueCodec) {}

    typedef _KeyType KeyType;
@@ -547,18 +554,19 @@ public:
template <typename _KeyType, typename _ValueType, typename VersionOptions, typename KeyCodec = TupleCodec<_KeyType>>
class KeyBackedObjectMap
  : public KeyBackedMap<_KeyType, _ValueType, KeyCodec, ObjectCodec<_ValueType, VersionOptions>> {
    typedef KeyBackedMap<_KeyType, _ValueType, KeyCodec, ObjectCodec<_ValueType, VersionOptions>> Base;
    typedef ObjectCodec<_ValueType, VersionOptions> ObjectCodec;
    typedef ObjectCodec<_ValueType, VersionOptions> ValueCodec;
    typedef KeyBackedMap<_KeyType, _ValueType, KeyCodec, ValueCodec> Base;

public:
    KeyBackedObjectMap(KeyRef key, VersionOptions vo, Optional<WatchableTrigger> trigger = {})
      : Base(key, trigger, ObjectCodec(vo)) {}
      : Base(key, trigger, ValueCodec(vo)) {}
};

template <typename _ValueType, typename Codec = TupleCodec<_ValueType>>
class KeyBackedSet {
public:
    KeyBackedSet(KeyRef key, Optional<WatchableTrigger> trigger = {}) : subspace(prefixRange(key)), trigger(trigger) {}
    KeyBackedSet(KeyRef key = invalidKey, Optional<WatchableTrigger> trigger = {})
      : subspace(prefixRange(key)), trigger(trigger) {}

    typedef _ValueType ValueType;
    typedef KeyBackedRangeResult<ValueType> RangeResultType;
@@ -640,6 +648,62 @@ public:
    Key packKey(ValueType const& value) const { return subspace.begin.withSuffix(Codec::pack(value)); }
};

// A local in-memory representation of a KeyBackedRangeMap snapshot
// It is invalid to look up a range in this map which is not within the range of
// [first key of map, last key of map)
template <typename KeyType, typename ValueType>
struct KeyRangeMapSnapshot {
    typedef std::map<KeyType, ValueType> Map;

    // Iterator for ranges in the map. Ranges are represented by a key, its value, and the next key in the map.
    struct RangeIter {
        typename Map::const_iterator impl;

        using iterator_category = std::bidirectional_iterator_tag;
        using value_type = RangeIter;
        using pointer = RangeIter*;
        using reference = RangeIter&;

        TypedRange<const KeyType&> range() const { return { impl->first, std::next(impl)->first }; }
        const ValueType& value() const { return impl->second; }

        const RangeIter& operator*() const { return *this; }
        const RangeIter* operator->() const { return this; }

        RangeIter operator++(int) { return { impl++ }; }
        RangeIter operator--(int) { return { impl-- }; }
        RangeIter& operator++() { ++impl; return *this; }
        RangeIter& operator--() { --impl; return *this; }

        bool operator==(const RangeIter& rhs) const { return impl == rhs.impl; }
        bool operator!=(const RangeIter& rhs) const { return impl != rhs.impl; }
    };

    // Range-for compatible object representing a list of contiguous ranges.
    struct Ranges {
        RangeIter iBegin, iEnd;
        RangeIter begin() const { return iBegin; }
        RangeIter end() const { return iEnd; }
    };

    RangeIter rangeContaining(const KeyType& begin) const {
        ASSERT(map.size() >= 2);
        auto i = map.upper_bound(begin);
        ASSERT(i != map.begin());
        ASSERT(i != map.end());
        return { --i };
    }

    // Get a set of Ranges which cover [begin, end)
    Ranges intersectingRanges(const KeyType& begin, const KeyType& end) const {
        return { rangeContaining(begin), { map.lower_bound(end) } };
    }

    Ranges ranges() const { return { { map.begin() }, { std::prev(map.end()) } }; }

    Map map;
};

// KeyBackedKeyRangeMap is similar to KeyRangeMap but without a Metric
// It is assumed that any range not covered by the map is set to a default ValueType()
// The ValueType must have
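A small standalone sketch of how this container behaves (mine, not part of the diff), using std::string keys and int values for brevity: map keys are range boundaries and each boundary's value applies up to the next boundary.

KeyRangeMapSnapshot<std::string, int> snap;
snap.map[""] = 0;  // default value from the start of the keyspace ...
snap.map["b"] = 3; // ... overridden starting at "b" ...
snap.map["d"] = 0; // ... and back to the default at "d"
snap.map["z"] = 0; // final boundary; lookups must stay below the last key

// Prints ["", "b") -> 0 and ["b", "d") -> 3
for (auto r : snap.intersectingRanges("a", "c")) {
    printf("[%s, %s) -> %d\n", r.range().begin.c_str(), r.range().end.c_str(), r.value());
}

// The range containing "c" is ["b", "d"), whose value is 3
ASSERT(snap.rangeContaining("c").value() == 3);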
@@ -657,15 +721,25 @@ class KeyBackedRangeMap {
public:
    typedef KeyBackedMap<KeyType, ValueType, KeyCodec, ValueCodec> Map;
    typedef KeyBackedRangeResult<typename Map::KVType> RangeMapResult;
    typedef KeyRangeMapSnapshot<KeyType, ValueType> LocalSnapshot;

    KeyBackedRangeMap(KeyRef prefix, Optional<WatchableTrigger> trigger = {}, ValueCodec valueCodec = {})
    KeyBackedRangeMap(KeyRef prefix = invalidKey, Optional<WatchableTrigger> trigger = {}, ValueCodec valueCodec = {})
      : kvMap(prefix, trigger, valueCodec) {}

    // Get ranges to fully describe the given range.
    // Result will be that returned by a range read from lastLessOrEqual(begin) to lastLessOrEqual(end)
    // Get ranges for a given key range.
    // The ranges returned will start at a key < end.
    //
    // If includeBeforeBegin is true, the ranges returned will fully cover the given range. This means the first key of
    // result could be < begin if begin does not exist as a boundary.
    //
    // If includeBeforeBegin is false, the ranges returned will be those which start at a key >= begin
    template <class Transaction>
    Future<RangeMapResult> getRanges(Transaction* tr, KeyType const& begin, KeyType const& end) const {
        KeySelector rangeBegin = lastLessOrEqual(kvMap.packKey(begin));
    Future<RangeMapResult> getRanges(Transaction tr,
                                     KeyType const& begin,
                                     KeyType const& end,
                                     bool includeBeforeBegin = true) const {
        KeySelector rangeBegin =
            includeBeforeBegin ? lastLessOrEqual(kvMap.packKey(begin)) : firstGreaterOrEqual(kvMap.packKey(begin));
        KeySelector rangeEnd = lastLessOrEqual(kvMap.packKey(end));

        typename transaction_future_type<Transaction, RangeResult>::type getRangeFuture =
@@ -674,13 +748,14 @@ public:
        return holdWhile(
            getRangeFuture,
            map(safeThreadFutureToFuture(getRangeFuture), [=, kvMap = kvMap](RangeResult const& rawkvs) mutable {
                RangeMapResult result;
                result.results.reserve(rawkvs.size());
                for (auto const& kv : rawkvs) {
                    result.results.emplace_back(kvMap.unpackKV(kv));
                }
                result.more = rawkvs.more;
                return result;
            }));
    }

    // Get the Value for the range that key is in
@@ -695,12 +770,12 @@ public:
        return holdWhile(
            getRangeFuture,
            map(safeThreadFutureToFuture(getRangeFuture), [=, kvMap = kvMap](RangeResult const& rawkvs) mutable {
                ASSERT(rawkvs.size() <= 1);
                if (rawkvs.empty()) {
                    return ValueType();
                }
                return kvMap.unpackValue(rawkvs.back().value);
            }));
    }

    // Update the range from begin to end by applying valueUpdate to it
@@ -708,50 +783,50 @@ public:
    template <class Transaction>
    Future<Void> updateRange(Transaction* tr, KeyType const& begin, KeyType const& end, ValueType const& valueUpdate) {
        return map(getRanges(tr, begin, end), [=, kvMap = kvMap](RangeMapResult const& range) mutable {
            // TODO: Handle partial ranges.
            ASSERT(!range.more);

            // A deque here makes the following logic easier, though it's not very efficient.
            std::deque<typename Map::KVType> kvs(range.results.begin(), range.results.end());

            // First, handle modification at the begin boundary.
            // If there are no records, or no records <= begin, then set begin to valueUpdate
            if (kvs.empty() || kvs.front().key > begin) {
                kvMap.set(tr, begin, valueUpdate);
            } else {
                // Otherwise, the first record represents the range that begin is currently in.
                // This record may be begin or before it; either way the action is to set begin
                // to the first record's value updated with valueUpdate
                kvMap.set(tr, begin, kvs.front().value.update(valueUpdate));
                // The first record has been consumed
                kvs.pop_front();
            }

            // Next, handle modification at the end boundary
            // If there are no records, then set end to the default value to clear the effects of begin as of the
            // end boundary.
            if (kvs.empty()) {
                kvMap.set(tr, end, ValueType());
            } else if (kvs.back().key < end) {
                // If the last key is less than end, then end is not yet a boundary in the range map so we must
                // create it. It will be set to the value of the range it resides in, which begins with the last
                // item in kvs. Note that it is very important to do this *before* modifying all the middle
                // boundaries of the range so that the changes we are making here terminate at end.
                kvMap.set(tr, end, kvs.back().value);
            } else {
                // Otherwise end is in kvs, which effectively ends the range we are modifying, so just pop it from
                // kvs so that it is not modified in the loop below.
                ASSERT(kvs.back().key == end);
                kvs.pop_back();
            }

            // Last, update all of the range boundaries in the middle which are left in the kvs queue
            for (auto const& kv : kvs) {
                kvMap.set(tr, kv.key, kv.value.update(valueUpdate));
            }

            return Void();
        });
    }

private:
@@ -353,10 +353,6 @@ public:
    // 'plaintext marker' is present.
    Optional<std::string> dataAtRestPlaintextMarker;

    // A collection of custom shard boundaries (begin, end, replication factor) that will be removed once this feature
    // is integrated with a way to set these boundaries in the database
    std::vector<std::tuple<std::string, std::string, int>> customReplicas;

    flowGlobalType global(int id) const final;
    void setGlobal(size_t id, flowGlobalType v) final;
@@ -28,6 +28,7 @@
#include "fdbclient/DatabaseContext.h"
#include "fdbclient/ReadYourWrites.h"
#include "fdbclient/TagThrottle.actor.h"
#include "fdbclient/DataDistributionConfig.actor.h"
#include "fdbserver/Knobs.h"
#include "fdbserver/StorageMetrics.actor.h"
#include "fdbserver/DataDistribution.actor.h"
@@ -440,6 +441,8 @@ ACTOR Future<Void> checkDataConsistency(Database cx,
        sharedRandom.randomShuffle(shardOrder);
    }

    state DDConfiguration::RangeConfigMapSnapshot userRangeConfig = wait(DDConfiguration().userRangeConfigSnapshot(cx));

    for (; i < ranges.size(); i++) {
        state int shard = shardOrder[i];
@@ -457,46 +460,50 @@ ACTOR Future<Void> checkDataConsistency(Database cx,
        // If the destStorageServers is non-empty, then this shard is being relocated
        state bool isRelocating = destStorageServers.size() > 0;

        state int customReplication = configuration.storageTeamSize;
        if (g_network->isSimulated() && ddLargeTeamEnabled()) {
            for (auto& it : g_simulator->customReplicas) {
                KeyRangeRef replicaRange(std::get<0>(it), std::get<1>(it));
                if (range.intersects(replicaRange)) {
                    TraceEvent("ConsistencyCheck_CheckCustomReplica")
                        .detail("ShardBegin", printable(range.begin))
                        .detail("ShardEnd", printable(range.end))
                        .detail("SourceTeamSize", sourceStorageServers.size())
                        .detail("DestServerSize", destStorageServers.size())
                        .detail("ConfigStorageTeamSize", configuration.storageTeamSize)
                        .detail("CustomBegin", std::get<0>(it))
                        .detail("CustomEnd", std::get<1>(it))
                        .detail("CustomReplicas", std::get<2>(it))
                        .detail("UsableRegions", configuration.usableRegions)
                        .detail("First", firstClient)
                        .detail("Perform", performQuiescentChecks);
                    if (!replicaRange.contains(range)) {
                        testFailure("Custom shard boundary violated", performQuiescentChecks, success, failureIsError);
                        return Void();
                    }
                    customReplication = std::max(customReplication, std::get<2>(it));
        state int expectedReplicas = configuration.storageTeamSize;
        if (ddLargeTeamEnabled()) {
            // For every custom range that overlaps with this shard range, print it and update the replication count
            // There should only be one custom range, possibly the default range with no custom configuration at all
            for (auto it : userRangeConfig.intersectingRanges(range.begin, range.end)) {
                KeyRangeRef configuredRange(it->range().begin, it->range().end);

                TraceEvent("ConsistencyCheck_CheckCustomReplica")
                    .detail("ShardBegin", printable(range.begin))
                    .detail("ShardEnd", printable(range.end))
                    .detail("SourceTeamSize", sourceStorageServers.size())
                    .detail("DestServerSize", destStorageServers.size())
                    .detail("ConfigStorageTeamSize", configuration.storageTeamSize)
                    .detail("CustomBegin", configuredRange.begin)
                    .detail("CustomEnd", configuredRange.end)
                    .detail("CustomConfig", it->value())
                    .detail("UsableRegions", configuration.usableRegions)
                    .detail("First", firstClient)
                    .detail("Perform", performQuiescentChecks);

                // The custom range should completely contain the shard range or a shard boundary that should exist
                // does not exist.
                if (!configuredRange.contains(range)) {
                    testFailure("Custom shard boundary violated", performQuiescentChecks, success, failureIsError);
                    return Void();
                }

                expectedReplicas = std::max(expectedReplicas, it->value().replicationFactor.orDefault(0));
            }
        }

        // In a quiescent database, check that the team size is the same as the desired team size
        // FIXME: when usable_regions=2, we need to determine how many storage servers are alive in each DC
        if (firstClient && performQuiescentChecks &&
            ((configuration.usableRegions == 1 &&
              sourceStorageServers.size() != std::min(ssCount, customReplication)) ||
            ((configuration.usableRegions == 1 && sourceStorageServers.size() != std::min(ssCount, expectedReplicas)) ||
             sourceStorageServers.size() < configuration.usableRegions * configuration.storageTeamSize ||
             sourceStorageServers.size() > configuration.usableRegions * customReplication)) {
             sourceStorageServers.size() > configuration.usableRegions * expectedReplicas)) {
            TraceEvent("ConsistencyCheck_InvalidTeamSize")
                .detail("ShardBegin", printable(range.begin))
                .detail("ShardEnd", printable(range.end))
                .detail("SourceTeamSize", sourceStorageServers.size())
                .detail("DestServerSize", destStorageServers.size())
                .detail("ConfigStorageTeamSize", configuration.storageTeamSize)
                .detail("CustomReplicas", customReplication)
                .detail("ExpectedReplicas", expectedReplicas)
                .detail("UsableRegions", configuration.usableRegions)
                .detail("SSCount", ssCount);
            // Record the server responsible for the problematic shards
@@ -887,7 +887,7 @@ static bool shardForwardMergeFeasible(DataDistributionTracker* self, KeyRange co
        return false;
    }

    if (self->customReplication->rangeContaining(keys.begin).range().end < nextRange.end) {
    if (self->userRangeConfig->rangeContaining(keys.begin)->range().end < nextRange.end) {
        return false;
    }
@@ -899,7 +899,7 @@ static bool shardBackwardMergeFeasible(DataDistributionTracker* self, KeyRange c
        return false;
    }

    if (self->customReplication->rangeContaining(keys.begin).range().begin > prevRange.begin) {
    if (self->userRangeConfig->rangeContaining(keys.begin)->range().begin > prevRange.begin) {
        return false;
    }
@@ -1210,7 +1210,7 @@ ACTOR Future<Void> trackInitialShards(DataDistributionTracker* self, Reference<I
    wait(delay(0.0, TaskPriority::DataDistribution));

    state std::vector<Key> customBoundaries;
    for (auto& it : self->customReplication->ranges()) {
    for (auto it : self->userRangeConfig->ranges()) {
        customBoundaries.push_back(it->range().begin);
    }
@@ -1489,7 +1489,7 @@ Future<Void> DataDistributionTracker::run(Reference<DataDistributionTracker> sel
    self->getTopKMetrics = getTopKMetrics;
    self->getShardMetricsList = getShardMetricsList;
    self->averageShardBytes = getAverageShardBytes;
    self->customReplication = initData->customReplication;
    self->userRangeConfig = initData->userRangeConfig;
    return holdWhile(self, DataDistributionTrackerImpl::run(self.getPtr(), initData));
}
@@ -239,8 +239,8 @@ public:

        if (ddLargeTeamEnabled() && req.keys.present()) {
            int customReplicas = self->configuration.storageTeamSize;
            for (auto& it : self->customReplication->intersectingRanges(req.keys.get())) {
                customReplicas = std::max(customReplicas, it.value());
            for (auto it : self->userRangeConfig->intersectingRanges(req.keys->begin, req.keys->end)) {
                customReplicas = std::max(customReplicas, it->value().replicationFactor.orDefault(0));
            }
            if (customReplicas > self->configuration.storageTeamSize) {
                auto newTeam = self->buildLargeTeam(customReplicas);
@@ -511,7 +511,7 @@ public:
                                           Reference<InitialDataDistribution> initTeams,
                                           const DDEnabledState* ddEnabledState) {

        self->customReplication = initTeams->customReplication;
        self->userRangeConfig = initTeams->userRangeConfig;
        self->healthyZone.set(initTeams->initHealthyZoneValue);
        // SOMEDAY: If some servers have teams and not others (or some servers have more data than others) and there is
        // an address/locality collision, should we preferentially mark the least used server as undesirable?
@@ -3645,8 +3645,8 @@ void DDTeamCollection::fixUnderReplication() {
        }

        int customReplicas = configuration.storageTeamSize;
        for (auto& c : customReplication->intersectingRanges(r.range())) {
            customReplicas = std::max(customReplicas, c.value());
        for (auto it : userRangeConfig->intersectingRanges(r.range().begin, r.range().end)) {
            customReplicas = std::max(customReplicas, it->value().replicationFactor.orDefault(0));
        }

        int currentSize = 0;
@@ -242,6 +242,8 @@ class DDTxnProcessorImpl {

        state Transaction tr(cx);

        wait(store(*result->userRangeConfig, DDConfiguration().userRangeConfigSnapshot(cx)));

        state std::map<UID, Optional<Key>> server_dc;
        state std::map<std::vector<UID>, std::pair<std::vector<UID>, std::vector<UID>>> team_cache;
        state std::vector<std::pair<StorageServerInterface, ProcessClass>> tss_servers;
@@ -259,12 +261,6 @@ class DDTxnProcessorImpl {
        tss_servers.clear();
        team_cache.clear();
        succeeded = false;
        result->customReplication->insert(allKeys, -1);
        if (g_network->isSimulated() && ddLargeTeamEnabled()) {
            for (auto& it : g_simulator->customReplicas) {
                result->customReplication->insert(KeyRangeRef(std::get<0>(it), std::get<1>(it)), std::get<2>(it));
            }
        }
        try {
            // Read healthyZone value which is later used to determine on/off of failure triggered DD
            tr.setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
@@ -499,11 +499,12 @@ public:
        }

        state std::vector<Key> customBoundaries;
        for (auto& it : self->initData->customReplication->ranges()) {
            customBoundaries.push_back(it->range().begin);
            TraceEvent(SevDebug, "DDInitCustomReplicas", self->ddId)
                .detail("Range", it->range())
                .detail("Replication", it->value());
        for (auto it : self->initData->userRangeConfig->ranges()) {
            auto range = it->range();
            customBoundaries.push_back(range.begin);
            TraceEvent(SevDebug, "DDInitCustomRangeConfig", self->ddId)
                .detail("Range", KeyRangeRef(range.begin, range.end))
                .detail("Config", it->value());
        }

        state int shard = 0;
@@ -534,9 +535,11 @@ public:
            auto& keys = ranges[r];
            self->shardsAffectedByTeamFailure->defineShard(keys);

            auto customRange = self->initData->customReplication->rangeContaining(keys.begin);
            int customReplicas = std::max(self->configuration.storageTeamSize, customRange.value());
            ASSERT_WE_THINK(customRange.range().contains(keys));
            auto it = self->initData->userRangeConfig->rangeContaining(keys.begin);
            auto range = it->range();
            int customReplicas =
                std::max(self->configuration.storageTeamSize, it->value().replicationFactor.orDefault(0));
            ASSERT_WE_THINK(KeyRangeRef(range.begin, range.end).contains(keys));

            bool unhealthy = iShard.primarySrc.size() != customReplicas;
            if (!unhealthy && self->configuration.usableRegions > 1) {
@@ -108,7 +108,7 @@ public:

    Optional<Reference<TenantCache>> ddTenantCache;

    Reference<KeyRangeMap<int>> customReplication;
    std::shared_ptr<DDConfiguration::RangeConfigMapSnapshot> userRangeConfig;

    DataDistributionTracker() = default;
@@ -311,7 +311,7 @@ protected:

    LocalityMap<UID> machineLocalityMap; // locality info of machines

    Reference<KeyRangeMap<int>> customReplication;
    std::shared_ptr<DDConfiguration::RangeConfigMapSnapshot> userRangeConfig;
    CoalescedKeyRangeMap<bool> underReplication;

    // A mechanism to tell actors that reference a DDTeamCollection object through a direct
@@ -36,6 +36,7 @@
#include "fdbserver/MoveKeys.actor.h"
#include "fdbserver/ShardsAffectedByTeamFailure.h"
#include "fdbclient/StorageWiggleMetrics.actor.h"
#include "fdbclient/DataDistributionConfig.actor.h"
#include <boost/heap/policies.hpp>
#include <boost/heap/skew_heap.hpp>
#include "flow/actorcompiler.h" // This must be the last #include.
@@ -485,7 +486,8 @@ struct DDShardInfo {

struct InitialDataDistribution : ReferenceCounted<InitialDataDistribution> {
    InitialDataDistribution()
      : dataMoveMap(std::make_shared<DataMove>()), customReplication(makeReference<KeyRangeMap<int>>(-1)) {}
      : dataMoveMap(std::make_shared<DataMove>()),
        userRangeConfig(std::make_shared<DDConfiguration::RangeConfigMapSnapshot>()) {}

    // Read from dataDistributionModeKey. Whether DD is disabled. DD can be disabled persistently (mode = 0). Set mode
    // to 1 will enable all disabled parts
@@ -498,7 +500,7 @@ struct InitialDataDistribution : ReferenceCounted<InitialDataDistribution> {
    Optional<Key> initHealthyZoneValue; // set for maintenance mode
    KeyRangeMap<std::shared_ptr<DataMove>> dataMoveMap;
    std::vector<AuditStorageState> auditStates;
    Reference<KeyRangeMap<int>> customReplication;
    std::shared_ptr<DDConfiguration::RangeConfigMapSnapshot> userRangeConfig;
};

// Holds the permitted size and IO Bounds for a shard
@@ -46,6 +46,7 @@
#include "fdbclient/NativeAPI.actor.h"
#include "fdbclient/SystemData.h"
#include "fdbclient/TenantManagement.actor.h"
#include "fdbclient/DataDistributionConfig.actor.h"
#include "fdbserver/KnobProtectiveGroups.h"
#include "fdbserver/TesterInterface.actor.h"
#include "fdbserver/WorkerInterface.actor.h"
@@ -1931,6 +1932,7 @@ ACTOR Future<Void> runTests(Reference<AsyncVar<Optional<struct ClusterController
                            Optional<TenantName> defaultTenant,
                            Standalone<VectorRef<TenantNameRef>> tenantsToCreate,
                            bool restartingTest) {
    state DDConfiguration::RangeConfigMap rangeConfig(DDConfiguration().userRangeConfig());
    state Database cx;
    state Reference<AsyncVar<ServerDBInfo>> dbInfo(new AsyncVar<ServerDBInfo>);
    state Future<Void> ccMonitor = monitorServerDBInfo(cc, LocalityData(), dbInfo); // FIXME: locality
@@ -2059,10 +2061,28 @@ ACTOR Future<Void> runTests(Reference<AsyncVar<Optional<struct ClusterController

    if (deterministicRandom()->random01() < 1 && (g_simulator->storagePolicy->info() == "zoneid^1 x 1" ||
                                                  g_simulator->storagePolicy->info() == "zoneid^2 x 1")) {
        g_simulator->customReplicas.push_back(std::make_tuple("\xff\x03", "\xff\x04", 3));
        g_simulator->customReplicas.push_back(std::make_tuple("\xff\x04", "\xff\x05", 3));
        TraceEvent("SettingCustomReplicas");
        MoveKeysLock lock = wait(takeMoveKeysLock(cx, UID()));
        state DDRangeConfig triple;
        triple.replicationFactor = 3;

        state ReadYourWritesTransaction tr(cx);
        loop {
            try {
                TraceEvent("SettingCustomReplicas").log();
                // Map logic should work with or without allKeys endpoints initialized
                if (deterministicRandom()->coinflip()) {
                    wait(rangeConfig.updateRange(&tr, allKeys.begin, allKeys.end, DDRangeConfig()));
                }

                wait(rangeConfig.updateRange(&tr, "\xff\x03"_sr, "\xff\x04"_sr, triple));
                wait(rangeConfig.updateRange(&tr, "\xff\x04"_sr, "\xff\x05"_sr, triple));
                wait(tr.commit());
                break;
            } catch (Error& e) {
                wait(tr.onError(e));
            }
        }
        // TODO: DD should check DDConfiguration().trigger.watch() for changes
        wait(success(takeMoveKeysLock(cx, UID())));
    }
}
@@ -22,6 +22,7 @@
#define FLOW_OPTIONAL_H

#include <optional>
#include <fmt/format.h>

#include "flow/FileIdentifier.h"
#include "flow/Error.h"
@@ -251,6 +252,12 @@ public:
    // Ordering: If T is ordered, then Optional() < Optional(t) and (Optional(u)<Optional(v))==(u<v)
    bool operator<(Optional const& o) const { return impl < o.impl; }

    const T* operator->() const { return &get(); }
    T* operator->() { return &get(); }
    const T& operator*() const& { return get(); }
    T& operator*() & { return get(); }
    T&& operator*() && { return get(); }

    void reset() { impl.reset(); }
    size_t hash() const { return hashFunc(impl); }
@@ -259,4 +266,17 @@ private:
    std::optional<T> impl;
};

template <typename T>
struct fmt::formatter<Optional<T>> : fmt::formatter<T> {

    template <typename FormatContext>
    auto format(const Optional<T>& opt, FormatContext& ctx) {
        if (opt.present()) {
            fmt::formatter<T>::format(opt.get(), ctx);
            return ctx.out();
        }
        return fmt::format_to(ctx.out(), "<np>");
    }
};

#endif // FLOW_OPTIONAL_H
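A usage note for the formatter above (my sketch, not part of the diff): DDRangeConfig::toString() in this commit passes its Optional<int> replicationFactor directly to fmt::format, which is what this specialization enables.

Optional<int> a = 5, b;
std::string s = fmt::format("a={} b={}", a, b); // "a=5 b=<np>"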