Add the hotrange fdbcli command (#9570)
* Add the hotrange fdbcli command * Remove the unnecessary state * Add the doc about the hotrange command
This commit is contained in:
parent
7a0b3c05b9
commit
7273723a43
|
@ -755,3 +755,16 @@ Removes a TSS process from quarantine, disposing of the TSS and allowing Data Di
|
|||
|
||||
Lists the storage UIDs of all TSS processes currently in quarantine.
|
||||
|
||||
hotrange
|
||||
--------
|
||||
|
||||
Utility commands for fetching sampled read bytes/ops metrics from the specified storage server.
|
||||
|
||||
``hotrange``
|
||||
|
||||
It will populate a list of available storage servers' network addresses. Users need to run this first before fetching metrics from a specific storage server. Otherwise, the address is not recognized.
|
||||
|
||||
``hotrange <IP:PORT> <bytes|readBytes|readOps> <begin> <end> <splitCount>``
|
||||
|
||||
Fetch read metrics from the given storage server to find the hot range. Run ``help hotrange`` to read the guide.
|
||||
|
||||
|
|
|
@ -0,0 +1,128 @@
|
|||
/*
|
||||
* HotRangeCommand.actor.cpp
|
||||
*
|
||||
* This source file is part of the FoundationDB open source project
|
||||
*
|
||||
* Copyright 2013-2023 Apple Inc. and the FoundationDB project authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#include <boost/lexical_cast.hpp>
|
||||
|
||||
#include "fdbcli/fdbcli.actor.h"
|
||||
|
||||
#include "fdbclient/FDBOptions.g.h"
|
||||
#include "fdbclient/IClientApi.h"
|
||||
#include "fdbclient/StorageServerInterface.h"
|
||||
|
||||
#include "fdbclient/json_spirit/json_spirit_value.h"
|
||||
#include "flow/Arena.h"
|
||||
#include "flow/FastRef.h"
|
||||
#include "flow/NetworkAddress.h"
|
||||
#include "flow/ThreadHelper.actor.h"
|
||||
#include "flow/actorcompiler.h" // This must be the last #include.
|
||||
|
||||
namespace {
|
||||
|
||||
ReadHotSubRangeRequest::SplitType parseSplitType(const std::string& typeStr) {
|
||||
auto type = ReadHotSubRangeRequest::SplitType::BYTES;
|
||||
if (typeStr == "bytes") {
|
||||
type = ReadHotSubRangeRequest::BYTES;
|
||||
} else if (typeStr == "readBytes") {
|
||||
type = ReadHotSubRangeRequest::READ_BYTES;
|
||||
} else if (typeStr == "readOps") {
|
||||
type = ReadHotSubRangeRequest::READ_OPS;
|
||||
} else {
|
||||
fmt::print("Error: {} is not a valid split type. Will use bytes as the default split type\n", typeStr);
|
||||
}
|
||||
return type;
|
||||
}
|
||||
|
||||
} // namespace
|
||||
|
||||
namespace fdb_cli {
|
||||
|
||||
ACTOR Future<bool> hotRangeCommandActor(Database localdb,
|
||||
Reference<IDatabase> db,
|
||||
std::vector<StringRef> tokens,
|
||||
std::map<std::string, StorageServerInterface>* storage_interface) {
|
||||
|
||||
if (tokens.size() == 1) {
|
||||
// initialize storage interfaces
|
||||
storage_interface->clear();
|
||||
wait(getStorageServerInterfaces(db, storage_interface));
|
||||
fmt::print("\nThe following {} storage servers can be queried:\n", storage_interface->size());
|
||||
for (const auto& [addr, _] : *storage_interface) {
|
||||
fmt::print("{}\n", addr);
|
||||
}
|
||||
fmt::print("\n");
|
||||
} else if (tokens.size() == 6) {
|
||||
if (storage_interface->size() == 0) {
|
||||
fprintf(stderr, "ERROR: no storage processes to query. You must run the `hotrange’ command first.\n");
|
||||
return false;
|
||||
}
|
||||
state Key address = tokens[1];
|
||||
// At present we only support one process(IP:Port) at a time
|
||||
if (!storage_interface->count(address.toString())) {
|
||||
fprintf(stderr, "ERROR: storage process `%s' not recognized.\n", printable(address).c_str());
|
||||
return false;
|
||||
}
|
||||
state int splitCount;
|
||||
try {
|
||||
splitCount = boost::lexical_cast<int>(tokens[5].toString());
|
||||
} catch (...) {
|
||||
fmt::print("Error: splitCount value: '{}', cannot be parsed to an Integer\n", tokens[5].toString());
|
||||
return false;
|
||||
}
|
||||
ReadHotSubRangeRequest::SplitType splitType = parseSplitType(tokens[2].toString());
|
||||
KeyRangeRef range(tokens[3], tokens[4]);
|
||||
// TODO: refactor this to support multiversion fdbcli in the future
|
||||
Standalone<VectorRef<ReadHotRangeWithMetrics>> metrics =
|
||||
wait(localdb->getHotRangeMetrics((*storage_interface)[address.toString()], range, splitType, splitCount));
|
||||
// next parse the result and form a json array for each object
|
||||
json_spirit::mArray resultArray;
|
||||
for (const auto& metric : metrics) {
|
||||
json_spirit::mObject metricObj;
|
||||
metricObj["begin"] = metric.keys.begin.toString();
|
||||
metricObj["end"] = metric.keys.end.toString();
|
||||
metricObj["readBytesPerSec"] = metric.readBandwidthSec;
|
||||
metricObj["readOpsPerSec"] = metric.readOpsSec;
|
||||
metricObj["bytes"] = metric.bytes;
|
||||
resultArray.push_back(metricObj);
|
||||
}
|
||||
// print out the json array
|
||||
const std::string result =
|
||||
json_spirit::write_string(json_spirit::mValue(resultArray), json_spirit::pretty_print);
|
||||
fmt::print("\n{}\n", result);
|
||||
} else {
|
||||
printUsage(tokens[0]);
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
CommandFactory hotRangeFactory(
|
||||
"hotrange",
|
||||
CommandHelp(
|
||||
"hotrange <IP:PORT> <bytes|readBytes|readOps> <begin> <end> <splitCount>",
|
||||
"Fetch read metrics from a given storage server to detect hot range",
|
||||
"If no arguments are specified, populates the list of storage processes that can be queried. "
|
||||
"<begin> <end> specify the range you are interested in, "
|
||||
"<bytes|readBytes|readOps> is the metric used to divide ranges, "
|
||||
"splitCount is the number of returned ranges divided by the given metric. "
|
||||
"The command will return an array of json object for each range with their metrics."
|
||||
"Notice: the three metrics are sampled by a different way, so their values are not perfectly matched.\n"));
|
||||
|
||||
} // namespace fdb_cli
|
|
@ -153,4 +153,30 @@ ACTOR Future<bool> getWorkers(Reference<IDatabase> db, std::vector<ProcessData>*
|
|||
}
|
||||
}
|
||||
|
||||
ACTOR Future<Void> getStorageServerInterfaces(Reference<IDatabase> db,
|
||||
std::map<std::string, StorageServerInterface>* interfaces) {
|
||||
state Reference<ITransaction> tr = db->createTransaction();
|
||||
loop {
|
||||
interfaces->clear();
|
||||
try {
|
||||
tr->setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
|
||||
tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
|
||||
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
|
||||
state ThreadFuture<RangeResult> serverListF = tr->getRange(serverListKeys, CLIENT_KNOBS->TOO_MANY);
|
||||
wait(success(safeThreadFutureToFuture(serverListF)));
|
||||
ASSERT(!serverListF.get().more);
|
||||
ASSERT_LT(serverListF.get().size(), CLIENT_KNOBS->TOO_MANY);
|
||||
RangeResult serverList = serverListF.get();
|
||||
// decode server interfaces
|
||||
for (int i = 0; i < serverList.size(); i++) {
|
||||
auto ssi = decodeServerListValue(serverList[i].value);
|
||||
(*interfaces)[ssi.address().toString()] = ssi;
|
||||
}
|
||||
return Void();
|
||||
} catch (Error& e) {
|
||||
wait(safeThreadFutureToFuture(tr->onError(e)));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
} // namespace fdb_cli
|
||||
|
|
|
@ -28,6 +28,7 @@
|
|||
#include "fdbclient/Status.h"
|
||||
#include "fdbclient/KeyBackedTypes.h"
|
||||
#include "fdbclient/StatusClient.h"
|
||||
#include "fdbclient/StorageServerInterface.h"
|
||||
#include "fdbclient/DatabaseContext.h"
|
||||
#include "fdbclient/GlobalConfig.actor.h"
|
||||
#include "fdbclient/IKnobCollection.h"
|
||||
|
@ -1090,6 +1091,7 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise, Reference<ClusterCo
|
|||
state bool writeMode = false;
|
||||
|
||||
state std::map<Key, std::pair<Value, ClientLeaderRegInterface>> address_interface;
|
||||
state std::map<std::string, StorageServerInterface> storage_interface;
|
||||
|
||||
state FdbOptions globalOptions;
|
||||
state FdbOptions activeOptions;
|
||||
|
@ -2145,6 +2147,15 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise, Reference<ClusterCo
|
|||
continue;
|
||||
}
|
||||
|
||||
if (tokencmp(tokens[0], "hotrange")) {
|
||||
bool _result =
|
||||
wait(makeInterruptable(hotRangeCommandActor(localDb, db, tokens, &storage_interface)));
|
||||
if (!_result) {
|
||||
is_error = true;
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
fprintf(stderr, "ERROR: Unknown command `%s'. Try `help'?\n", formatStringRef(tokens[0]).c_str());
|
||||
is_error = true;
|
||||
}
|
||||
|
|
|
@ -33,6 +33,7 @@
|
|||
#include "fdbclient/CoordinationInterface.h"
|
||||
#include "fdbclient/IClientApi.h"
|
||||
#include "fdbclient/StatusClient.h"
|
||||
#include "fdbclient/StorageServerInterface.h"
|
||||
#include "flow/Arena.h"
|
||||
|
||||
#include "flow/actorcompiler.h" // This must be the last #include.
|
||||
|
@ -136,6 +137,9 @@ inline const KeyRef workerInterfacesVerifyOptionSpecialKey = "\xff\xff/managemen
|
|||
|
||||
// get all workers' info
|
||||
ACTOR Future<bool> getWorkers(Reference<IDatabase> db, std::vector<ProcessData>* workers);
|
||||
// get all storages' interface
|
||||
ACTOR Future<Void> getStorageServerInterfaces(Reference<IDatabase> db,
|
||||
std::map<std::string, StorageServerInterface>* interfaces);
|
||||
|
||||
// compare StringRef with the given c string
|
||||
bool tokencmp(StringRef token, const char* command);
|
||||
|
@ -230,6 +234,11 @@ ACTOR Future<bool> blobKeyCommandActor(Database localDb,
|
|||
std::vector<StringRef> tokens);
|
||||
// blobrestore command
|
||||
ACTOR Future<bool> blobRestoreCommandActor(Database localDb, std::vector<StringRef> tokens);
|
||||
// hotrange command
|
||||
ACTOR Future<bool> hotRangeCommandActor(Database localDb,
|
||||
Reference<IDatabase> db,
|
||||
std::vector<StringRef> tokens,
|
||||
std::map<std::string, StorageServerInterface>* storage_interface);
|
||||
|
||||
// maintenance command
|
||||
ACTOR Future<bool> setHealthyZone(Reference<IDatabase> db, StringRef zoneId, double seconds, bool printWarning = false);
|
||||
|
|
|
@ -11179,6 +11179,30 @@ Future<bool> DatabaseContext::blobRestore(KeyRange range, Optional<Version> vers
|
|||
return blobRestoreActor(Reference<DatabaseContext>::addRef(this), range, version);
|
||||
}
|
||||
|
||||
ACTOR static Future<Standalone<VectorRef<ReadHotRangeWithMetrics>>>
|
||||
getHotRangeMetricsActor(Reference<DatabaseContext> db, StorageServerInterface ssi, ReadHotSubRangeRequest req) {
|
||||
|
||||
ErrorOr<ReadHotSubRangeReply> fs = wait(ssi.getReadHotRanges.tryGetReply(req));
|
||||
if (fs.isError()) {
|
||||
fmt::print("Error({}): cannot get read hot metrics from storage server {}.\n",
|
||||
fs.getError().what(),
|
||||
ssi.address().toString());
|
||||
return Standalone<VectorRef<ReadHotRangeWithMetrics>>();
|
||||
} else {
|
||||
return fs.get().readHotRanges;
|
||||
}
|
||||
}
|
||||
|
||||
Future<Standalone<VectorRef<ReadHotRangeWithMetrics>>> DatabaseContext::getHotRangeMetrics(
|
||||
StorageServerInterface ssi,
|
||||
const KeyRange& keys,
|
||||
ReadHotSubRangeRequest::SplitType type,
|
||||
int splitCount) {
|
||||
|
||||
return getHotRangeMetricsActor(
|
||||
Reference<DatabaseContext>::addRef(this), ssi, ReadHotSubRangeRequest(keys, type, splitCount));
|
||||
}
|
||||
|
||||
int64_t getMaxKeySize(KeyRef const& key) {
|
||||
return getMaxWriteKeySize(key, true);
|
||||
}
|
||||
|
|
|
@ -313,6 +313,10 @@ public:
|
|||
Optional<int> const& minSplitBytes = {});
|
||||
|
||||
Future<Standalone<VectorRef<ReadHotRangeWithMetrics>>> getReadHotRanges(KeyRange const& keys);
|
||||
Future<Standalone<VectorRef<ReadHotRangeWithMetrics>>> getHotRangeMetrics(StorageServerInterface ssi,
|
||||
KeyRange const& keys,
|
||||
ReadHotSubRangeRequest::SplitType type,
|
||||
int splitCount);
|
||||
|
||||
// Returns the protocol version reported by the coordinator this client is connected to
|
||||
// If an expected version is given, the future won't return until the protocol version is different than expected
|
||||
|
|
|
@ -216,12 +216,7 @@ Future<Void> serveStorageMetricsRequests(ServiceType* self, StorageServerInterfa
|
|||
self->getStorageMetrics(req);
|
||||
}
|
||||
when(ReadHotSubRangeRequest req = waitNext(ssi.getReadHotRanges.getFuture())) {
|
||||
if (!self->isReadable(req.keys)) {
|
||||
CODE_PROBE(true, "readHotSubRanges immediate wrong_shard_server()", probe::decoration::rare);
|
||||
self->sendErrorWithPenalty(req.reply, wrong_shard_server(), self->getPenalty());
|
||||
} else {
|
||||
self->metrics.getReadHotRanges(req);
|
||||
}
|
||||
self->metrics.getReadHotRanges(req);
|
||||
}
|
||||
when(SplitRangeRequest req = waitNext(ssi.getRangeSplitPoints.getFuture())) {
|
||||
if (!self->isReadable(req.keys)) {
|
||||
|
|
Loading…
Reference in New Issue