Remove "Storages" field from data_distribution_stats
It seems like a cool idea, but it feels rushed to me. Reverting for now.
This commit is contained in:
parent
a55b0771c5
commit
e1dfa410c1
|
@ -850,15 +850,14 @@ Reads in the metrics module are not transactional and may require rpcs to comple
|
|||
>>> for k, v in db.get_range_startswith('\xff\xff/metrics/data_distribution_stats/', limit=3):
|
||||
... print(k, v)
|
||||
...
|
||||
('\xff\xff/metrics/data_distribution_stats/', '{"ShardBytes":3828000,"Storages":["697f39751849d70067eabd1b079478a5"]}')
|
||||
('\xff\xff/metrics/data_distribution_stats/mako00079', '{"ShardBytes":2013000,"Storages":["697f39751849d70067eabd1b079478a5"]}')
|
||||
('\xff\xff/metrics/data_distribution_stats/mako00126', '{"ShardBytes":3201000,"Storages":["697f39751849d70067eabd1b079478a5"]}')
|
||||
('\xff\xff/metrics/data_distribution_stats/', '{"ShardBytes":3828000}')
|
||||
('\xff\xff/metrics/data_distribution_stats/mako00079', '{"ShardBytes":2013000}')
|
||||
('\xff\xff/metrics/data_distribution_stats/mako00126', '{"ShardBytes":3201000}')
|
||||
|
||||
========================= ======== ===============
|
||||
**Field** **Type** **Description**
|
||||
------------------------- -------- ---------------
|
||||
ShardBytes number An estimate of the sum of kv sizes for this shard.
|
||||
Storages [string] A list of the storage process ids currently responsible for this shard.
|
||||
========================= ======== ===============
|
||||
|
||||
Keys starting with ``\xff\xff/metrics/health/`` represent stats about the health of the cluster, suitable for application-level throttling.
|
||||
|
|
|
@ -502,54 +502,7 @@ void DatabaseContext::registerSpecialKeySpaceModule(SpecialKeySpace::MODULE modu
|
|||
|
||||
ACTOR Future<Standalone<RangeResultRef>> getWorkerInterfaces(Reference<ClusterConnectionFile> clusterFile);
|
||||
ACTOR Future<Optional<Value>> getJSON(Database db);
|
||||
template <class F>
|
||||
Future<pair<KeyRange, Reference<LocationInfo>>> getKeyLocation(Database const& cx, Key const& key,
|
||||
F StorageServerInterface::*member,
|
||||
TransactionInfo const& info, bool isBackward = false);
|
||||
|
||||
class DDStatsRangeImpl : public SpecialKeyRangeAsyncImpl {
|
||||
public:
|
||||
explicit DDStatsRangeImpl(KeyRangeRef kr);
|
||||
Future<Standalone<RangeResultRef>> getRange(ReadYourWritesTransaction* ryw, KeyRangeRef kr) const override;
|
||||
};
|
||||
|
||||
ACTOR Future<Standalone<RangeResultRef>> ddMetricsGetRangeActor(ReadYourWritesTransaction* ryw, KeyRangeRef kr) {
|
||||
state KeyRangeRef keys = kr.removePrefix(ddStatsRange.begin);
|
||||
state Standalone<VectorRef<DDMetricsRef>> resultWithoutPrefix =
|
||||
wait(waitDataDistributionMetricsList(ryw->getDatabase(), keys, CLIENT_KNOBS->STORAGE_METRICS_SHARD_LIMIT));
|
||||
state Standalone<RangeResultRef> result;
|
||||
ryw->getDatabase()->invalidateCache(keys);
|
||||
for (const auto& ddMetricsRef_ : resultWithoutPrefix) {
|
||||
state DDMetricsRef ddMetricsRef = ddMetricsRef_;
|
||||
// each begin key is the previous end key, thus we only encode the begin key in the result
|
||||
state KeyRef beginKey = ddMetricsRef.beginKey.withPrefix(ddStatsRange.begin, result.arena());
|
||||
state pair<KeyRange, Reference<LocationInfo>> ssi = wait(getKeyLocation(
|
||||
ryw->getDatabase(), ddMetricsRef.beginKey, &StorageServerInterface::getValue, ryw->getTransactionInfo()));
|
||||
// Use json string encoded in utf-8 to encode the values, easy for adding more fields in the future
|
||||
JsonBuilderObject statsObj;
|
||||
statsObj["ShardBytes"] = ddMetricsRef.shardBytes;
|
||||
JsonBuilderArray storages;
|
||||
std::vector<UID> uids;
|
||||
for (int i = 0; i < ssi.second->size(); ++i) {
|
||||
uids.push_back(ssi.second->getId(i));
|
||||
}
|
||||
std::sort(uids.begin(), uids.end());
|
||||
for (const auto& uid : uids) {
|
||||
storages.push_back(uid.toString());
|
||||
}
|
||||
statsObj["Storages"] = storages;
|
||||
std::string statsString = statsObj.getJson();
|
||||
ValueRef bytes(result.arena(), statsString);
|
||||
result.push_back(result.arena(), KeyValueRef(beginKey, bytes));
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
DDStatsRangeImpl::DDStatsRangeImpl(KeyRangeRef kr) : SpecialKeyRangeAsyncImpl(kr) {}
|
||||
|
||||
Future<Standalone<RangeResultRef>> DDStatsRangeImpl::getRange(ReadYourWritesTransaction* ryw, KeyRangeRef kr) const {
|
||||
return ddMetricsGetRangeActor(ryw, kr);
|
||||
}
|
||||
struct WorkerInterfacesSpecialKeyImpl : SpecialKeyRangeBaseImpl {
|
||||
Future<Standalone<RangeResultRef>> getRange(ReadYourWritesTransaction* ryw, KeyRangeRef kr) const override {
|
||||
if (ryw->getDatabase().getPtr() && ryw->getDatabase()->getConnectionFile()) {
|
||||
|
@ -1507,7 +1460,7 @@ ACTOR Future< pair<KeyRange,Reference<LocationInfo>> > getKeyLocation_internal(
|
|||
template <class F>
|
||||
Future<pair<KeyRange, Reference<LocationInfo>>> getKeyLocation(Database const& cx, Key const& key,
|
||||
F StorageServerInterface::*member,
|
||||
TransactionInfo const& info, bool isBackward) {
|
||||
TransactionInfo const& info, bool isBackward = false) {
|
||||
auto ssi = cx->getCachedLocation( key, isBackward );
|
||||
if (!ssi.second) {
|
||||
return getKeyLocation_internal( cx, key, info, isBackward );
|
||||
|
|
|
@ -856,10 +856,7 @@ const KeyRef JSONSchemas::latencyBandConfigurationSchema = LiteralStringRef(R"co
|
|||
|
||||
const KeyRef JSONSchemas::dataDistributionStatsSchema = LiteralStringRef(R"""(
|
||||
{
|
||||
"ShardBytes": 1947000,
|
||||
"Storages": [
|
||||
"697f39751849d70067eabd1b079478a5"
|
||||
]
|
||||
"ShardBytes": 1947000
|
||||
}
|
||||
)""");
|
||||
|
||||
|
|
|
@ -348,3 +348,32 @@ Future<Standalone<RangeResultRef>> ConflictingKeysImpl::getRange(ReadYourWritesT
|
|||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
ACTOR Future<Standalone<RangeResultRef>> ddMetricsGetRangeActor(ReadYourWritesTransaction* ryw, KeyRangeRef kr) {
|
||||
try {
|
||||
auto keys = kr.removePrefix(ddStatsRange.begin);
|
||||
Standalone<VectorRef<DDMetricsRef>> resultWithoutPrefix =
|
||||
wait(waitDataDistributionMetricsList(ryw->getDatabase(), keys, CLIENT_KNOBS->STORAGE_METRICS_SHARD_LIMIT));
|
||||
Standalone<RangeResultRef> result;
|
||||
for (const auto& ddMetricsRef : resultWithoutPrefix) {
|
||||
// each begin key is the previous end key, thus we only encode the begin key in the result
|
||||
KeyRef beginKey = ddMetricsRef.beginKey.withPrefix(ddStatsRange.begin, result.arena());
|
||||
// Use json string encoded in utf-8 to encode the values, easy for adding more fields in the future
|
||||
json_spirit::mObject statsObj;
|
||||
statsObj["ShardBytes"] = ddMetricsRef.shardBytes;
|
||||
std::string statsString =
|
||||
json_spirit::write_string(json_spirit::mValue(statsObj), json_spirit::Output_options::raw_utf8);
|
||||
ValueRef bytes(result.arena(), statsString);
|
||||
result.push_back(result.arena(), KeyValueRef(beginKey, bytes));
|
||||
}
|
||||
return result;
|
||||
} catch (Error& e) {
|
||||
throw;
|
||||
}
|
||||
}
|
||||
|
||||
DDStatsRangeImpl::DDStatsRangeImpl(KeyRangeRef kr) : SpecialKeyRangeAsyncImpl(kr) {}
|
||||
|
||||
Future<Standalone<RangeResultRef>> DDStatsRangeImpl::getRange(ReadYourWritesTransaction* ryw, KeyRangeRef kr) const {
|
||||
return ddMetricsGetRangeActor(ryw, kr);
|
||||
}
|
||||
|
|
|
@ -190,5 +190,11 @@ public:
|
|||
Future<Standalone<RangeResultRef>> getRange(ReadYourWritesTransaction* ryw, KeyRangeRef kr) const override;
|
||||
};
|
||||
|
||||
class DDStatsRangeImpl : public SpecialKeyRangeAsyncImpl {
|
||||
public:
|
||||
explicit DDStatsRangeImpl(KeyRangeRef kr);
|
||||
Future<Standalone<RangeResultRef>> getRange(ReadYourWritesTransaction* ryw, KeyRangeRef kr) const override;
|
||||
};
|
||||
|
||||
#include "flow/unactorcompiler.h"
|
||||
#endif
|
||||
|
|
|
@ -157,13 +157,15 @@ struct DataDistributionMetricsWorkload : KVWorkload {
|
|||
.detail("ErrorStr", errorStr.c_str())
|
||||
.detail("JSON", json_spirit::write_string(json_spirit::mValue(result[i].value.toString())));
|
||||
totalBytes += valueObj["ShardBytes"].get_int64();
|
||||
for (const auto& s : valueObj["Storages"].get_array()) {
|
||||
UID::fromString(s.get_str()); // Will throw if it's not a valid uid
|
||||
}
|
||||
}
|
||||
self->avgBytes = totalBytes / self->numShards;
|
||||
// fetch data-distribution stats for a smaller range
|
||||
ASSERT(result.size());
|
||||
state int idx = deterministicRandom()->randomInt(0, result.size());
|
||||
Standalone<RangeResultRef> res = wait(tr->getRange(
|
||||
KeyRangeRef(result[idx].key, idx + 1 < result.size() ? result[idx + 1].key : ddStatsRange.end), 100));
|
||||
ASSERT_WE_THINK(res.size() == 1 && res[0] == result[idx]); // It works good now. However, not sure in any
|
||||
// case of data-distribution, the number changes
|
||||
} catch (Error& e) {
|
||||
TraceEvent(SevError, "FailedToRetrieveDDMetrics").detail("Error", e.what());
|
||||
throw;
|
||||
|
|
Loading…
Reference in New Issue