Merge pull request #6482 from sfc-gh-xwang/ppw-status
Print the IP:Port address of wiggling server in status
This commit is contained in:
commit
6024bf0109
|
@ -810,6 +810,28 @@ void printStatus(StatusObjectReader statusObj,
|
||||||
outputString = outputStringCache;
|
outputString = outputStringCache;
|
||||||
outputString += "\n Unable to retrieve data status";
|
outputString += "\n Unable to retrieve data status";
|
||||||
}
|
}
|
||||||
|
// Storage Wiggle section
|
||||||
|
StatusObjectReader storageWigglerObj;
|
||||||
|
std::string storageWigglerString;
|
||||||
|
try {
|
||||||
|
if (statusObjCluster.get("storage_wiggler", storageWigglerObj)) {
|
||||||
|
int size = 0;
|
||||||
|
if (storageWigglerObj.has("wiggle_server_addresses")) {
|
||||||
|
storageWigglerString += "\n Wiggle server addresses-";
|
||||||
|
for (auto& v : storageWigglerObj.obj().at("wiggle_server_addresses").get_array()) {
|
||||||
|
storageWigglerString += " " + v.get_str();
|
||||||
|
size += 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
storageWigglerString += "\n Wiggle server count - " + std::to_string(size);
|
||||||
|
}
|
||||||
|
} catch (std::runtime_error&) {
|
||||||
|
storageWigglerString += "\n Unable to retrieve storage wiggler status";
|
||||||
|
}
|
||||||
|
if (storageWigglerString.size()) {
|
||||||
|
outputString += "\n\nStorage wiggle:";
|
||||||
|
outputString += storageWigglerString;
|
||||||
|
}
|
||||||
|
|
||||||
// Operating space section
|
// Operating space section
|
||||||
outputString += "\n\nOperating space:";
|
outputString += "\n\nOperating space:";
|
||||||
|
|
|
@ -6783,6 +6783,32 @@ ACTOR Future<Void> setPerpetualStorageWiggle(Database cx, bool enable, LockAware
|
||||||
return Void();
|
return Void();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ACTOR Future<std::vector<std::pair<UID, StorageWiggleValue>>> readStorageWiggleValues(Database cx,
|
||||||
|
bool primary,
|
||||||
|
bool use_system_priority) {
|
||||||
|
state const Key readKey = perpetualStorageWiggleIDPrefix.withSuffix(primary ? "primary/"_sr : "remote/"_sr);
|
||||||
|
state KeyBackedObjectMap<UID, StorageWiggleValue, decltype(IncludeVersion())> metadataMap(readKey,
|
||||||
|
IncludeVersion());
|
||||||
|
state Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(cx));
|
||||||
|
state std::vector<std::pair<UID, StorageWiggleValue>> res;
|
||||||
|
// read the wiggling pairs
|
||||||
|
loop {
|
||||||
|
try {
|
||||||
|
tr->setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
|
||||||
|
tr->setOption(FDBTransactionOptions::READ_LOCK_AWARE);
|
||||||
|
if (use_system_priority) {
|
||||||
|
tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
|
||||||
|
}
|
||||||
|
wait(store(res, metadataMap.getRange(tr, UID(0, 0), Optional<UID>(), CLIENT_KNOBS->TOO_MANY)));
|
||||||
|
wait(tr->commit());
|
||||||
|
break;
|
||||||
|
} catch (Error& e) {
|
||||||
|
wait(tr->onError(e));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return res;
|
||||||
|
}
|
||||||
|
|
||||||
ACTOR Future<Standalone<VectorRef<KeyRef>>> splitStorageMetrics(Database cx,
|
ACTOR Future<Standalone<VectorRef<KeyRef>>> splitStorageMetrics(Database cx,
|
||||||
KeyRange keys,
|
KeyRange keys,
|
||||||
StorageMetrics limit,
|
StorageMetrics limit,
|
||||||
|
|
|
@ -480,5 +480,9 @@ inline uint64_t getWriteOperationCost(uint64_t bytes) {
|
||||||
// will be 1. Otherwise, the value will be 0.
|
// will be 1. Otherwise, the value will be 0.
|
||||||
ACTOR Future<Void> setPerpetualStorageWiggle(Database cx, bool enable, LockAware lockAware = LockAware::False);
|
ACTOR Future<Void> setPerpetualStorageWiggle(Database cx, bool enable, LockAware lockAware = LockAware::False);
|
||||||
|
|
||||||
|
ACTOR Future<std::vector<std::pair<UID, StorageWiggleValue>>> readStorageWiggleValues(Database cx,
|
||||||
|
bool primary,
|
||||||
|
bool use_system_priority);
|
||||||
|
|
||||||
#include "flow/unactorcompiler.h"
|
#include "flow/unactorcompiler.h"
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -25,6 +25,8 @@ const KeyRef JSONSchemas::statusSchema = LiteralStringRef(R"statusSchema(
|
||||||
{
|
{
|
||||||
"cluster":{
|
"cluster":{
|
||||||
"storage_wiggler": {
|
"storage_wiggler": {
|
||||||
|
"wiggle_server_ids":["0ccb4e0feddb55"],
|
||||||
|
"wiggle_server_addresses": ["127.0.0.1"],
|
||||||
"primary": {
|
"primary": {
|
||||||
"last_round_start_datetime": "Wed Feb 4 09:36:37 2022 +0000",
|
"last_round_start_datetime": "Wed Feb 4 09:36:37 2022 +0000",
|
||||||
"last_round_start_timestamp": 63811229797,
|
"last_round_start_timestamp": 63811229797,
|
||||||
|
|
|
@ -2828,25 +2828,8 @@ public:
|
||||||
|
|
||||||
// read the current map of `perpetualStorageWiggleIDPrefix`, then restore wigglingId.
|
// read the current map of `perpetualStorageWiggleIDPrefix`, then restore wigglingId.
|
||||||
ACTOR static Future<Void> readStorageWiggleMap(DDTeamCollection* self) {
|
ACTOR static Future<Void> readStorageWiggleMap(DDTeamCollection* self) {
|
||||||
|
state std::vector<std::pair<UID, StorageWiggleValue>> res =
|
||||||
state const Key readKey =
|
wait(readStorageWiggleValues(self->cx, self->primary, false));
|
||||||
perpetualStorageWiggleIDPrefix.withSuffix(self->primary ? "primary/"_sr : "remote/"_sr);
|
|
||||||
state KeyBackedObjectMap<UID, StorageWiggleValue, decltype(IncludeVersion())> metadataMap(readKey,
|
|
||||||
IncludeVersion());
|
|
||||||
state Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(self->cx));
|
|
||||||
state std::vector<std::pair<UID, StorageWiggleValue>> res;
|
|
||||||
// read the wiggling pairs
|
|
||||||
loop {
|
|
||||||
try {
|
|
||||||
tr->setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
|
|
||||||
tr->setOption(FDBTransactionOptions::READ_LOCK_AWARE);
|
|
||||||
wait(store(res, metadataMap.getRange(tr, UID(0, 0), Optional<UID>(), CLIENT_KNOBS->TOO_MANY)));
|
|
||||||
wait(tr->commit());
|
|
||||||
break;
|
|
||||||
} catch (Error& e) {
|
|
||||||
wait(tr->onError(e));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (res.size() > 0) {
|
if (res.size() > 0) {
|
||||||
// SOMEDAY: support wiggle multiple SS at once
|
// SOMEDAY: support wiggle multiple SS at once
|
||||||
ASSERT(!self->wigglingId.present()); // only single process wiggle is allowed
|
ASSERT(!self->wigglingId.present()); // only single process wiggle is allowed
|
||||||
|
|
|
@ -2928,6 +2928,7 @@ ACTOR Future<StatusReply> clusterGetStatus(
|
||||||
state JsonBuilderObject qos;
|
state JsonBuilderObject qos;
|
||||||
state JsonBuilderObject dataOverlay;
|
state JsonBuilderObject dataOverlay;
|
||||||
state JsonBuilderObject storageWiggler;
|
state JsonBuilderObject storageWiggler;
|
||||||
|
state std::unordered_set<UID> wiggleServers;
|
||||||
|
|
||||||
statusObj["protocol_version"] = format("%" PRIx64, g_network->protocolVersion().version());
|
statusObj["protocol_version"] = format("%" PRIx64, g_network->protocolVersion().version());
|
||||||
statusObj["connection_string"] = coordinators.ccr->getConnectionString().toString();
|
statusObj["connection_string"] = coordinators.ccr->getConnectionString().toString();
|
||||||
|
@ -3018,8 +3019,18 @@ ACTOR Future<StatusReply> clusterGetStatus(
|
||||||
clusterSummaryStatisticsFetcher(pMetrics, storageServerFuture, tLogFuture, &status_incomplete_reasons));
|
clusterSummaryStatisticsFetcher(pMetrics, storageServerFuture, tLogFuture, &status_incomplete_reasons));
|
||||||
|
|
||||||
if (configuration.get().perpetualStorageWiggleSpeed > 0) {
|
if (configuration.get().perpetualStorageWiggleSpeed > 0) {
|
||||||
wait(store(storageWiggler, storageWigglerStatsFetcher(configuration.get(), cx, true)));
|
state Future<std::vector<std::pair<UID, StorageWiggleValue>>> primaryWiggleValues;
|
||||||
statusObj["storage_wiggler"] = storageWiggler;
|
state Future<std::vector<std::pair<UID, StorageWiggleValue>>> remoteWiggleValues;
|
||||||
|
|
||||||
|
primaryWiggleValues = readStorageWiggleValues(cx, true, true);
|
||||||
|
remoteWiggleValues = readStorageWiggleValues(cx, false, true);
|
||||||
|
wait(store(storageWiggler, storageWigglerStatsFetcher(configuration.get(), cx, true)) &&
|
||||||
|
success(primaryWiggleValues) && success(remoteWiggleValues));
|
||||||
|
|
||||||
|
for (auto& p : primaryWiggleValues.get())
|
||||||
|
wiggleServers.insert(p.first);
|
||||||
|
for (auto& p : remoteWiggleValues.get())
|
||||||
|
wiggleServers.insert(p.first);
|
||||||
}
|
}
|
||||||
|
|
||||||
state std::vector<JsonBuilderObject> workerStatuses = wait(getAll(futures2));
|
state std::vector<JsonBuilderObject> workerStatuses = wait(getAll(futures2));
|
||||||
|
@ -3178,13 +3189,27 @@ ACTOR Future<StatusReply> clusterGetStatus(
|
||||||
statusObj["datacenter_lag"] = getLagObject(datacenterVersionDifference);
|
statusObj["datacenter_lag"] = getLagObject(datacenterVersionDifference);
|
||||||
|
|
||||||
int activeTSSCount = 0;
|
int activeTSSCount = 0;
|
||||||
|
JsonBuilderArray wiggleServerAddress;
|
||||||
for (auto& it : storageServers) {
|
for (auto& it : storageServers) {
|
||||||
if (it.first.isTss()) {
|
if (it.first.isTss()) {
|
||||||
activeTSSCount++;
|
activeTSSCount++;
|
||||||
}
|
}
|
||||||
|
if (wiggleServers.count(it.first.id())) {
|
||||||
|
wiggleServerAddress.push_back(it.first.address().toString());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
statusObj["active_tss_count"] = activeTSSCount;
|
statusObj["active_tss_count"] = activeTSSCount;
|
||||||
|
|
||||||
|
if (!storageWiggler.empty()) {
|
||||||
|
JsonBuilderArray wiggleServerUID;
|
||||||
|
for (auto& id : wiggleServers)
|
||||||
|
wiggleServerUID.push_back(id.shortString());
|
||||||
|
|
||||||
|
storageWiggler["wiggle_server_ids"] = wiggleServerUID;
|
||||||
|
storageWiggler["wiggle_server_addresses"] = wiggleServerAddress;
|
||||||
|
statusObj["storage_wiggler"] = storageWiggler;
|
||||||
|
}
|
||||||
|
|
||||||
int totalDegraded = 0;
|
int totalDegraded = 0;
|
||||||
for (auto& it : workers) {
|
for (auto& it : workers) {
|
||||||
if (it.degraded) {
|
if (it.degraded) {
|
||||||
|
|
|
@ -88,7 +88,8 @@ struct StatusWorkload : TestWorkload {
|
||||||
schemaCoverage(spath, false);
|
schemaCoverage(spath, false);
|
||||||
|
|
||||||
if (skv.second.type() == json_spirit::array_type && skv.second.get_array().size()) {
|
if (skv.second.type() == json_spirit::array_type && skv.second.get_array().size()) {
|
||||||
schemaCoverageRequirements(skv.second.get_array()[0].get_obj(), spath + "[0]");
|
if (skv.second.get_array()[0].type() != json_spirit::str_type)
|
||||||
|
schemaCoverageRequirements(skv.second.get_array()[0].get_obj(), spath + "[0]");
|
||||||
} else if (skv.second.type() == json_spirit::obj_type) {
|
} else if (skv.second.type() == json_spirit::obj_type) {
|
||||||
if (skv.second.get_obj().count("$enum")) {
|
if (skv.second.get_obj().count("$enum")) {
|
||||||
for (auto& enum_item : skv.second.get_obj().at("$enum").get_array())
|
for (auto& enum_item : skv.second.get_obj().at("$enum").get_array())
|
||||||
|
|
Loading…
Reference in New Issue