Address review comments. KeyRangeMapSnapshot is now ReferenceCounted and getSnapshot() returns a Reference to discourage copying. Added several comments for clarity. Added FormatUsingTraceable and changed all new formatters to use it except for Standalone<T> which redirects to the formatter for T.
This commit is contained in:
parent
7f3df82d98
commit
858b51a69b
|
@ -72,11 +72,12 @@ ACTOR Future<bool> rangeConfigCommandActor(Database cx, std::vector<StringRef> t
|
|||
}
|
||||
}
|
||||
|
||||
DDConfiguration::RangeConfigMapSnapshot config = wait(DDConfiguration().userRangeConfig().getSnapshot(
|
||||
SystemDBWriteLockedNow(cx.getReference()), allKeys.begin, allKeys.end));
|
||||
Reference<DDConfiguration::RangeConfigMapSnapshot> config =
|
||||
wait(DDConfiguration().userRangeConfig().getSnapshot(
|
||||
SystemDBWriteLockedNow(cx.getReference()), allKeys.begin, allKeys.end));
|
||||
fmt::print(
|
||||
"{}\n",
|
||||
json_spirit::write_string(DDConfiguration::toJSON(config, includeDefault), json_spirit::pretty_print));
|
||||
json_spirit::write_string(DDConfiguration::toJSON(*config, includeDefault), json_spirit::pretty_print));
|
||||
|
||||
} else if (cmd == "update"_sr || cmd == "set"_sr) {
|
||||
if (args.size() < 3) {
|
||||
|
|
|
@ -684,7 +684,7 @@ public:
|
|||
// restore uid. Get this uid's tag, then get the KEY for the tag's uid but don't read it. That becomes the
|
||||
// validation key which TaskBucket will check, and its value must be this restore config's uid.
|
||||
UID u = uid; // 'this' could be invalid in lambda
|
||||
Key p = subSpace.key();
|
||||
Key p = subspace.key();
|
||||
return map(tag().get(tr), [u, p, task](Optional<std::string> const& tag) -> Void {
|
||||
if (!tag.present())
|
||||
throw restore_error();
|
||||
|
|
|
@ -90,12 +90,7 @@ struct Traceable<DDRangeConfig> : std::true_type {
|
|||
};
|
||||
|
||||
template <>
|
||||
struct fmt::formatter<DDRangeConfig> : formatter<std::string> {
|
||||
template <typename FormatContext>
|
||||
auto format(const DDRangeConfig& val, FormatContext& ctx) {
|
||||
return fmt::formatter<std::string>::format(val.toString(), ctx);
|
||||
}
|
||||
};
|
||||
struct fmt::formatter<DDRangeConfig> : FormatUsingTraceable<DDRangeConfig> {};
|
||||
|
||||
struct DDConfiguration : public KeyBackedClass {
|
||||
DDConfiguration(KeyRef prefix = SystemKey("\xff\x02/ddconfig/"_sr)) : KeyBackedClass(prefix) {}
|
||||
|
@ -110,7 +105,7 @@ struct DDConfiguration : public KeyBackedClass {
|
|||
typedef RangeConfigMap::LocalSnapshot RangeConfigMapSnapshot;
|
||||
|
||||
// Range configuration options set by Users
|
||||
RangeConfigMap userRangeConfig() const { return { subSpace.pack(__FUNCTION__sr), trigger, IncludeVersion() }; }
|
||||
RangeConfigMap userRangeConfig() const { return { subspace.pack(__FUNCTION__sr), trigger, IncludeVersion() }; }
|
||||
|
||||
static json_spirit::mValue toJSON(RangeConfigMapSnapshot const& snapshot, bool includeDefaultRanges = false);
|
||||
};
|
||||
|
|
|
@ -25,14 +25,17 @@
|
|||
#elif !defined(FDBCLIENT_KEYBACKEDRANGEMAP_ACTOR_H)
|
||||
#define FDBCLIENT_KEYBACKEDRANGEMAP_ACTOR_H
|
||||
|
||||
#include "flow/FastRef.h"
|
||||
#include "fdbclient/KeyBackedTypes.actor.h"
|
||||
#include "flow/actorcompiler.h" // This must be the last #include.
|
||||
|
||||
// A local in-memory representation of a KeyBackedRangeMap snapshot
|
||||
// It is invalid to look up a range in this map which not within the range of
|
||||
// [first key of map, last key of map)
|
||||
// [first key of snapshot, last key of snapshot)
|
||||
// This is ReferenceCounted as it can be large and there is no reason to copy it as
|
||||
// it should not be modified locally.
|
||||
template <typename KeyType, typename ValueType>
|
||||
struct KeyRangeMapSnapshot {
|
||||
struct KeyRangeMapSnapshot : public ReferenceCounted<KeyRangeMapSnapshot<KeyType, ValueType>> {
|
||||
typedef std::map<KeyType, ValueType> Map;
|
||||
|
||||
// A default constructed map snapshot can't be used to look anything up because no ranges are covered.
|
||||
|
@ -310,7 +313,10 @@ public:
|
|||
}
|
||||
|
||||
ACTOR template <class Transaction>
|
||||
static Future<LocalSnapshot> getSnapshotActor(KeyBackedRangeMap self, Transaction tr, KeyType begin, KeyType end) {
|
||||
static Future<Reference<LocalSnapshot>> getSnapshotActor(KeyBackedRangeMap self,
|
||||
Transaction tr,
|
||||
KeyType begin,
|
||||
KeyType end) {
|
||||
kbt_debug("RANGEMAP snapshot start\n");
|
||||
|
||||
// The range read should start at KeySelector::lastLessOrEqual(begin) to get the range which covers begin.
|
||||
|
@ -326,12 +332,12 @@ public:
|
|||
state Future<RangeResultType> boundariesFuture =
|
||||
self.kvMap.getRange(tr, rangeBegin, rangeEnd, GetRangeLimits(readSize));
|
||||
|
||||
state LocalSnapshot result;
|
||||
state Reference<LocalSnapshot> result = makeReference<LocalSnapshot>();
|
||||
loop {
|
||||
kbt_debug("RANGEMAP snapshot loop\n");
|
||||
RangeResultType boundaries = wait(boundariesFuture);
|
||||
for (auto const& bv : boundaries.results) {
|
||||
result.map[bv.first] = bv.second;
|
||||
result->map[bv.first] = bv.second;
|
||||
}
|
||||
if (!boundaries.more) {
|
||||
break;
|
||||
|
@ -345,12 +351,12 @@ public:
|
|||
|
||||
// LocalSnapshot requires initialization of the widest range it will be queried with, so we must ensure that has
|
||||
// been done now. If the map does not start at or before begin then add begin with a default value type
|
||||
if (result.map.empty() || result.map.begin()->first > begin) {
|
||||
result.map[begin] = ValueType();
|
||||
if (result->map.empty() || result->map.begin()->first > begin) {
|
||||
result->map[begin] = ValueType();
|
||||
}
|
||||
// The map is no longer empty, so if the last key is not >= end then add end with a default value type
|
||||
if (result.map.rbegin()->first < end) {
|
||||
result.map[end] = ValueType();
|
||||
if (result->map.rbegin()->first < end) {
|
||||
result->map[end] = ValueType();
|
||||
}
|
||||
|
||||
kbt_debug("RANGEMAP snapshot end\n");
|
||||
|
@ -361,7 +367,7 @@ public:
|
|||
// If the map in the database does not have boundaries <=begin or >=end then these boundaries will be
|
||||
// added to the returned snapshot with a default ValueType.
|
||||
template <class Transaction>
|
||||
Future<LocalSnapshot> getSnapshot(Transaction tr, KeyType const& begin, KeyType const& end) const {
|
||||
Future<Reference<LocalSnapshot>> getSnapshot(Transaction tr, KeyType const& begin, KeyType const& end) const {
|
||||
if constexpr (is_transaction_creator<Transaction>) {
|
||||
return runTransaction(tr, [=, self = *this](decltype(tr->createTransaction()) tr) {
|
||||
return self.getSnapshot(tr, begin, end);
|
||||
|
|
|
@ -631,10 +631,9 @@ public:
|
|||
}
|
||||
|
||||
// Find the closest key which is <, <=, >, or >= query
|
||||
// This is like resolving a KeySelector to a key but it will return not-present if the key is outside of the map
|
||||
// subspace. It does this without visiting any range outside of the map subspace, so it succeeds if the map is
|
||||
// located next to a shard which is currently unavailable or, more likely, an adjacent key has been modified by
|
||||
// a VersionStamp atomic op so it is not allowed to be read in a range read operation.
|
||||
// These operation can be accomplished using KeySelectors however they run the risk of touching keys outside of
|
||||
// map subspace, which can cause problems if this touches an offline range or a key which is unreadable by range
|
||||
// read operations due to having been modified with a version stamp operation in the current transaction.
|
||||
ACTOR template <class Transaction>
|
||||
static Future<Optional<KVType>> seek(KeyBackedMap self,
|
||||
Transaction tr,
|
||||
|
@ -937,10 +936,9 @@ public:
|
|||
}
|
||||
|
||||
// Find the closest key which is <, <=, >, or >= query
|
||||
// This is like resolving a KeySelector to a key but it will return not-present if the key is outside of the map
|
||||
// subspace. It does this without visiting any range outside of the map subspace, so it succeeds if the map is
|
||||
// located next to a shard which is currently unavailable or, more likely, an adjacent key has been modified by
|
||||
// a VersionStamp atomic op so it is not allowed to be read in a range read operation.
|
||||
// These operation can be accomplished using KeySelectors however they run the risk of touching keys outside of
|
||||
// map subspace, which can cause problems if this touches an offline range or a key which is unreadable by range
|
||||
// read operations due to having been modified with a version stamp operation in the current transaction.
|
||||
ACTOR template <class Transaction>
|
||||
static Future<Optional<ValueType>> seek(KeyBackedSet self,
|
||||
Transaction tr,
|
||||
|
@ -1072,9 +1070,9 @@ public:
|
|||
class KeyBackedClass {
|
||||
public:
|
||||
KeyBackedClass(StringRef prefix, Optional<Key> triggerOverride = {})
|
||||
: subSpace(prefix), trigger(triggerOverride.orDefault(subSpace.pack("_changeTrigger"_sr))) {}
|
||||
: subspace(prefix), trigger(triggerOverride.orDefault(subspace.pack("_changeTrigger"_sr))) {}
|
||||
|
||||
Subspace subSpace;
|
||||
Subspace subspace;
|
||||
WatchableTrigger trigger;
|
||||
};
|
||||
|
||||
|
|
|
@ -162,7 +162,7 @@ public:
|
|||
DataForDc(StringRef prefix) : KeyBackedClass(prefix) {}
|
||||
|
||||
auto storageWiggleDelay() const {
|
||||
auto key = subSpace.pack("storageWiggleDelay"_sr);
|
||||
auto key = subspace.pack("storageWiggleDelay"_sr);
|
||||
return KeyBackedObjectProperty<StorageWiggleDelay, decltype(IncludeVersion())>(key, IncludeVersion());
|
||||
}
|
||||
};
|
||||
|
|
|
@ -441,7 +441,7 @@ ACTOR Future<Void> checkDataConsistency(Database cx,
|
|||
sharedRandom.randomShuffle(shardOrder);
|
||||
}
|
||||
|
||||
state DDConfiguration::RangeConfigMapSnapshot userRangeConfig =
|
||||
state Reference<DDConfiguration::RangeConfigMapSnapshot> userRangeConfig =
|
||||
wait(DDConfiguration().userRangeConfig().getSnapshot(
|
||||
SystemDBWriteLockedNow(cx.getReference()), allKeys.begin, allKeys.end));
|
||||
|
||||
|
@ -466,7 +466,7 @@ ACTOR Future<Void> checkDataConsistency(Database cx,
|
|||
if (ddLargeTeamEnabled()) {
|
||||
// For every custom range that overlaps with this shard range, print it and update the replication count
|
||||
// There should only be one custom range, possibly the default range with no custom configuration at all
|
||||
for (auto it : userRangeConfig.intersectingRanges(range.begin, range.end)) {
|
||||
for (auto it : userRangeConfig->intersectingRanges(range.begin, range.end)) {
|
||||
KeyRangeRef configuredRange(it->range().begin, it->range().end);
|
||||
|
||||
CODE_PROBE(true, "Checked custom replication configuration.");
|
||||
|
|
|
@ -243,7 +243,7 @@ class DDTxnProcessorImpl {
|
|||
state Transaction tr(cx);
|
||||
|
||||
if (ddLargeTeamEnabled()) {
|
||||
wait(store(*result->userRangeConfig,
|
||||
wait(store(result->userRangeConfig,
|
||||
DDConfiguration().userRangeConfig().getSnapshot(
|
||||
SystemDBWriteLockedNow(cx.getReference()), allKeys.begin, allKeys.end)));
|
||||
}
|
||||
|
|
|
@ -538,7 +538,7 @@ public:
|
|||
auto it = self->initData->userRangeConfig->rangeContaining(keys.begin);
|
||||
int customReplicas =
|
||||
std::max(self->configuration.storageTeamSize, it->value().replicationFactor.orDefault(0));
|
||||
// ASSERT_WE_THINK(KeyRangeRef(it->range().begin, it->range().end).contains(keys));
|
||||
ASSERT_WE_THINK(KeyRangeRef(it->range().begin, it->range().end).contains(keys));
|
||||
|
||||
bool unhealthy = iShard.primarySrc.size() != customReplicas;
|
||||
if (!unhealthy && self->configuration.usableRegions > 1) {
|
||||
|
|
|
@ -303,7 +303,7 @@ Future<std::string> RestoreConfigFR::getFullStatus(Reference<ReadYourWritesTrans
|
|||
|
||||
std::string RestoreConfigFR::toString() {
|
||||
std::stringstream ss;
|
||||
ss << "uid:" << uid.toString() << " prefix:" << subSpace.key().contents().toString();
|
||||
ss << "uid:" << uid.toString() << " prefix:" << subspace.key().contents().toString();
|
||||
return ss.str();
|
||||
}
|
||||
|
||||
|
|
|
@ -108,7 +108,7 @@ public:
|
|||
|
||||
Optional<Reference<TenantCache>> ddTenantCache;
|
||||
|
||||
std::shared_ptr<DDConfiguration::RangeConfigMapSnapshot> userRangeConfig;
|
||||
Reference<DDConfiguration::RangeConfigMapSnapshot> userRangeConfig;
|
||||
|
||||
DataDistributionTracker() = default;
|
||||
|
||||
|
|
|
@ -311,7 +311,7 @@ protected:
|
|||
|
||||
LocalityMap<UID> machineLocalityMap; // locality info of machines
|
||||
|
||||
std::shared_ptr<DDConfiguration::RangeConfigMapSnapshot> userRangeConfig;
|
||||
Reference<DDConfiguration::RangeConfigMapSnapshot> userRangeConfig;
|
||||
CoalescedKeyRangeMap<bool> underReplication;
|
||||
|
||||
// A mechanism to tell actors that reference a DDTeamCollection object through a direct
|
||||
|
|
|
@ -487,7 +487,7 @@ struct DDShardInfo {
|
|||
struct InitialDataDistribution : ReferenceCounted<InitialDataDistribution> {
|
||||
InitialDataDistribution()
|
||||
: dataMoveMap(std::make_shared<DataMove>()),
|
||||
userRangeConfig(std::make_shared<DDConfiguration::RangeConfigMapSnapshot>(allKeys.begin, allKeys.end)) {}
|
||||
userRangeConfig(makeReference<DDConfiguration::RangeConfigMapSnapshot>(allKeys.begin, allKeys.end)) {}
|
||||
|
||||
// Read from dataDistributionModeKey. Whether DD is disabled. DD can be disabled persistently (mode = 0). Set mode
|
||||
// to 1 will enable all disabled parts
|
||||
|
@ -500,7 +500,7 @@ struct InitialDataDistribution : ReferenceCounted<InitialDataDistribution> {
|
|||
Optional<Key> initHealthyZoneValue; // set for maintenance mode
|
||||
KeyRangeMap<std::shared_ptr<DataMove>> dataMoveMap;
|
||||
std::vector<AuditStorageState> auditStates;
|
||||
std::shared_ptr<DDConfiguration::RangeConfigMapSnapshot> userRangeConfig;
|
||||
Reference<DDConfiguration::RangeConfigMapSnapshot> userRangeConfig;
|
||||
};
|
||||
|
||||
// Holds the permitted size and IO Bounds for a shard
|
||||
|
|
|
@ -2054,6 +2054,9 @@ ACTOR Future<Void> runTests(Reference<AsyncVar<Optional<struct ClusterController
|
|||
ASSERT(g_simulator->storagePolicy && g_simulator->tLogPolicy);
|
||||
ASSERT(!g_simulator->hasSatelliteReplication || g_simulator->satelliteTLogPolicy);
|
||||
|
||||
// This block will randomly add custom configured key ranges to the test.
|
||||
// TODO: Move this to a workload which is randomly added to tests similar to FailureInjectionWorkload classes
|
||||
// but the injected behavior should not be considered a failure for workload selection purposes.
|
||||
if (deterministicRandom()->random01() < 0.25) {
|
||||
state ReadYourWritesTransaction tr(cx);
|
||||
state bool verbose = (KEYBACKEDTYPES_DEBUG != 0);
|
||||
|
@ -2108,12 +2111,12 @@ ACTOR Future<Void> runTests(Reference<AsyncVar<Optional<struct ClusterController
|
|||
{ "b1"_sr, false, DDRangeConfig() }
|
||||
};
|
||||
|
||||
state DDConfiguration::RangeConfigMapSnapshot snapshot =
|
||||
state Reference<DDConfiguration::RangeConfigMapSnapshot> snapshot =
|
||||
wait(rangeConfig->getSnapshot(&tr, allKeys.begin, allKeys.end));
|
||||
|
||||
if (verbose) {
|
||||
fmt::print("DD User Range Config:\n{}\n",
|
||||
json_spirit::write_string(DDConfiguration::toJSON(snapshot, true),
|
||||
json_spirit::write_string(DDConfiguration::toJSON(*snapshot, true),
|
||||
json_spirit::pretty_print));
|
||||
}
|
||||
|
||||
|
@ -2140,7 +2143,7 @@ ACTOR Future<Void> runTests(Reference<AsyncVar<Optional<struct ClusterController
|
|||
DDRangeConfig rc = std::get<2>(rangeTests[i]);
|
||||
ASSERT(!verify.present() || verify->value == rc);
|
||||
|
||||
auto snapshotRange = snapshot.rangeContaining(query);
|
||||
auto snapshotRange = snapshot->rangeContaining(query);
|
||||
ASSERT(snapshotRange.value() == rc);
|
||||
// The snapshot has all ranges covered but the db may not
|
||||
if (verify.present()) {
|
||||
|
|
|
@ -32,6 +32,7 @@
|
|||
#include "flow/ObjectSerializerTraits.h"
|
||||
#include "flow/FileIdentifier.h"
|
||||
#include "flow/Optional.h"
|
||||
#include "flow/Traceable.h"
|
||||
#include <algorithm>
|
||||
#include <boost/functional/hash.hpp>
|
||||
#include <stdint.h>
|
||||
|
@ -261,13 +262,6 @@ inline void save(Archive& ar, const Optional<T>& value) {
|
|||
}
|
||||
}
|
||||
|
||||
template <class T>
|
||||
struct Traceable<Optional<T>> : std::conditional<Traceable<T>::value, std::true_type, std::false_type>::type {
|
||||
static std::string toString(const Optional<T>& value) {
|
||||
return value.present() ? Traceable<T>::toString(value.get()) : "[not set]";
|
||||
}
|
||||
};
|
||||
|
||||
template <class T>
|
||||
struct union_like_traits<Optional<T>> : std::true_type {
|
||||
using Member = Optional<T>;
|
||||
|
@ -667,6 +661,9 @@ struct TraceableString<StringRef> {
|
|||
template <>
|
||||
struct Traceable<StringRef> : TraceableStringImpl<StringRef> {};
|
||||
|
||||
template <>
|
||||
struct fmt::formatter<StringRef> : FormatUsingTraceable<StringRef> {};
|
||||
|
||||
inline std::string StringRef::printable() const {
|
||||
return Traceable<StringRef>::toString(*this);
|
||||
}
|
||||
|
@ -674,19 +671,11 @@ inline std::string StringRef::printable() const {
|
|||
template <class T>
|
||||
struct Traceable<Standalone<T>> : Traceable<T> {};
|
||||
|
||||
#define __FILE__sr StringRef(reinterpret_cast<const uint8_t*>(__FILE__), sizeof(__FILE__) - 1)
|
||||
#define __FUNCTION__sr StringRef(reinterpret_cast<const uint8_t*>(__FUNCTION__), sizeof(__FUNCTION__) - 1)
|
||||
|
||||
template <class T>
|
||||
struct fmt::formatter<Standalone<T>> : fmt::formatter<T> {};
|
||||
|
||||
template <>
|
||||
struct fmt::formatter<StringRef> : formatter<std::string> {
|
||||
template <typename FormatContext>
|
||||
auto format(const StringRef& str, FormatContext& ctx) -> decltype(ctx.out()) {
|
||||
return formatter<std::string>::format(TraceableStringImpl<StringRef>::toString(str), ctx);
|
||||
}
|
||||
};
|
||||
#define __FILE__sr StringRef(reinterpret_cast<const uint8_t*>(__FILE__), sizeof(__FILE__) - 1)
|
||||
#define __FUNCTION__sr StringRef(reinterpret_cast<const uint8_t*>(__FUNCTION__), sizeof(__FUNCTION__) - 1)
|
||||
|
||||
inline StringRef operator"" _sr(const char* str, size_t size) {
|
||||
return StringRef(reinterpret_cast<const uint8_t*>(str), size);
|
||||
|
|
|
@ -24,6 +24,7 @@
|
|||
#include <optional>
|
||||
#include <fmt/format.h>
|
||||
|
||||
#include "flow/Traceable.h"
|
||||
#include "flow/FileIdentifier.h"
|
||||
#include "flow/Error.h"
|
||||
|
||||
|
@ -266,17 +267,14 @@ private:
|
|||
std::optional<T> impl;
|
||||
};
|
||||
|
||||
template <typename T>
|
||||
struct fmt::formatter<Optional<T>> : fmt::formatter<T> {
|
||||
|
||||
template <typename FormatContext>
|
||||
auto format(const Optional<T>& opt, FormatContext& ctx) {
|
||||
if (opt.present()) {
|
||||
fmt::formatter<T>::format(opt.get(), ctx);
|
||||
return ctx.out();
|
||||
}
|
||||
return fmt::format_to(ctx.out(), "<np>");
|
||||
template <class T>
|
||||
struct Traceable<Optional<T>> : std::conditional<Traceable<T>::value, std::true_type, std::false_type>::type {
|
||||
static std::string toString(const Optional<T>& value) {
|
||||
return value.present() ? Traceable<T>::toString(value.get()) : "[not set]";
|
||||
}
|
||||
};
|
||||
|
||||
template <typename T>
|
||||
struct fmt::formatter<Optional<T>> : FormatUsingTraceable<Optional<T>> {};
|
||||
|
||||
#endif // FLOW_OPTIONAL_H
|
|
@ -27,6 +27,7 @@
|
|||
#include <string>
|
||||
#include <string_view>
|
||||
#include <type_traits>
|
||||
#include <fmt/format.h>
|
||||
|
||||
#define PRINTABLE_COMPRESS_NULLS 0
|
||||
|
||||
|
@ -244,4 +245,13 @@ struct Traceable<std::atomic<T>> : std::true_type {
|
|||
static std::string toString(const std::atomic<T>& value) { return Traceable<T>::toString(value.load()); }
|
||||
};
|
||||
|
||||
// Adapter to redirect fmt::formatter calls to Traceable for a supported type
|
||||
template <typename T>
|
||||
struct FormatUsingTraceable : fmt::formatter<std::string> {
|
||||
template <typename FormatContext>
|
||||
auto format(const T& val, FormatContext& ctx) {
|
||||
return fmt::formatter<std::string>::format(Traceable<T>::toString(val), ctx);
|
||||
}
|
||||
};
|
||||
|
||||
#endif
|
||||
|
|
Loading…
Reference in New Issue