Merge pull request #7110 from sfc-gh-xwang/features/ppw-pause-state
Adding paused/running wiggling status to status json and also the last running/paused timestamp
This commit is contained in:
commit
6bb4e341f9
|
@ -25,9 +25,13 @@ const KeyRef JSONSchemas::statusSchema = LiteralStringRef(R"statusSchema(
|
|||
{
|
||||
"cluster":{
|
||||
"storage_wiggler": {
|
||||
"error": "some error description",
|
||||
"wiggle_server_ids":["0ccb4e0feddb55"],
|
||||
"wiggle_server_addresses": ["127.0.0.1"],
|
||||
"primary": {
|
||||
"state": {"$enum":["running", "paused", "unknown"]},
|
||||
"last_state_change_datetime": "2022-04-02 00:05:05.123 +0000",
|
||||
"last_state_change_timestamp": 1648857905.123,
|
||||
"last_round_start_datetime": "2022-04-02 00:05:05.123 +0000",
|
||||
"last_round_start_timestamp": 1648857905.123,
|
||||
"last_round_finish_datetime": "1970-01-01 00:00:00.000 +0000",
|
||||
|
@ -42,6 +46,9 @@ const KeyRef JSONSchemas::statusSchema = LiteralStringRef(R"statusSchema(
|
|||
"finished_wiggle": 1
|
||||
},
|
||||
"remote": {
|
||||
"state": {"$enum":["running", "paused", "unknown"]},
|
||||
"last_state_change_datetime": "2022-04-02 00:05:05.123 +0000",
|
||||
"last_state_change_timestamp": 1648857905.123,
|
||||
"last_round_start_datetime": "2022-04-02 00:05:05.123 +0000",
|
||||
"last_round_start_timestamp": 1648857905.123,
|
||||
"last_round_finish_datetime": "1970-01-01 00:00:00.000 +0000",
|
||||
|
|
|
@ -1956,13 +1956,15 @@ public:
|
|||
takeRest =
|
||||
teamCollection->server_info.size() <= teamCollection->configuration.storageTeamSize ||
|
||||
teamCollection->machine_info.size() < teamCollection->configuration.storageTeamSize;
|
||||
if (takeRest &&
|
||||
teamCollection->configuration.storageMigrationType == StorageMigrationType::GRADUAL) {
|
||||
TraceEvent(SevWarn, "PerpetualWiggleSleep", teamCollection->distributorId)
|
||||
.suppressFor(SERVER_KNOBS->PERPETUAL_WIGGLE_DELAY * 4)
|
||||
.detail("ServerSize", teamCollection->server_info.size())
|
||||
.detail("MachineSize", teamCollection->machine_info.size())
|
||||
.detail("StorageTeamSize", teamCollection->configuration.storageTeamSize);
|
||||
if (takeRest) {
|
||||
teamCollection->storageWiggler->setWiggleState(StorageWiggler::PAUSE);
|
||||
if (teamCollection->configuration.storageMigrationType == StorageMigrationType::GRADUAL) {
|
||||
TraceEvent(SevWarn, "PerpetualStorageWiggleSleep", teamCollection->distributorId)
|
||||
.suppressFor(SERVER_KNOBS->PERPETUAL_WIGGLE_DELAY * 4)
|
||||
.detail("ServerSize", teamCollection->server_info.size())
|
||||
.detail("MachineSize", teamCollection->machine_info.size())
|
||||
.detail("StorageTeamSize", teamCollection->configuration.storageTeamSize);
|
||||
}
|
||||
}
|
||||
}
|
||||
wait(updateNextWigglingStorageID(teamCollection));
|
||||
|
@ -2033,6 +2035,7 @@ public:
|
|||
TEST(true); // paused because cluster is unhealthy
|
||||
moveFinishFuture = Never();
|
||||
self->includeStorageServersForWiggle();
|
||||
self->storageWiggler->setWiggleState(StorageWiggler::PAUSE);
|
||||
TraceEvent(self->configuration.storageMigrationType == StorageMigrationType::AGGRESSIVE ? SevInfo
|
||||
: SevWarn,
|
||||
"PerpetualStorageWigglePause",
|
||||
|
@ -2049,6 +2052,7 @@ public:
|
|||
wait(self->storageWiggler->startWiggle());
|
||||
auto fv = self->excludeStorageServersForWiggle(id);
|
||||
moveFinishFuture = fv;
|
||||
self->storageWiggler->setWiggleState(StorageWiggler::RUN);
|
||||
TraceEvent("PerpetualStorageWiggleStart", self->distributorId)
|
||||
.detail("Primary", self->primary)
|
||||
.detail("ServerId", id)
|
||||
|
@ -5075,6 +5079,13 @@ bool DDTeamCollection::exclusionSafetyCheck(std::vector<UID>& excludeServerIDs)
|
|||
return true;
|
||||
}
|
||||
|
||||
std::pair<StorageWiggler::State, double> DDTeamCollection::getStorageWigglerState() const {
|
||||
if (storageWiggler) {
|
||||
return { storageWiggler->getWiggleState(), storageWiggler->lastStateChangeTs };
|
||||
}
|
||||
return { StorageWiggler::INVALID, 0.0 };
|
||||
}
|
||||
|
||||
Future<Void> DDTeamCollection::run(Reference<DDTeamCollection> teamCollection,
|
||||
Reference<InitialDataDistribution> initData,
|
||||
TeamCollectionInterface tci,
|
||||
|
|
|
@ -663,6 +663,9 @@ public:
|
|||
|
||||
bool isPrimary() const { return primary; }
|
||||
|
||||
// state and last state change timestamp
|
||||
std::pair<StorageWiggler::State, double> getStorageWigglerState() const;
|
||||
|
||||
UID getDistributorId() const { return distributorId; }
|
||||
|
||||
// Keep track of servers and teams -- serves requests for getRandomTeam
|
||||
|
|
|
@ -1207,6 +1207,18 @@ static int64_t getMedianShardSize(VectorRef<DDMetricsRef> metricVec) {
|
|||
return metricVec[metricVec.size() / 2].shardBytes;
|
||||
}
|
||||
|
||||
GetStorageWigglerStateReply getStorageWigglerStates(Reference<DataDistributorData> self) {
|
||||
GetStorageWigglerStateReply reply;
|
||||
if (self->teamCollection) {
|
||||
std::tie(reply.primary, reply.lastStateChangePrimary) = self->teamCollection->getStorageWigglerState();
|
||||
if (self->teamCollection->teamCollections.size() > 1) {
|
||||
std::tie(reply.remote, reply.lastStateChangeRemote) =
|
||||
self->teamCollection->teamCollections[1]->getStorageWigglerState();
|
||||
}
|
||||
}
|
||||
return reply;
|
||||
}
|
||||
|
||||
ACTOR Future<Void> ddGetMetrics(GetDataDistributorMetricsRequest req,
|
||||
PromiseStream<GetMetricsListRequest> getShardMetricsList) {
|
||||
ErrorOr<Standalone<VectorRef<DDMetricsRef>>> result = wait(
|
||||
|
@ -1272,6 +1284,9 @@ ACTOR Future<Void> dataDistributor(DataDistributorInterface di, Reference<AsyncV
|
|||
waitNext(di.distributorExclCheckReq.getFuture())) {
|
||||
actors.add(ddExclusionSafetyCheck(exclCheckReq, self, cx));
|
||||
}
|
||||
when(GetStorageWigglerStateRequest req = waitNext(di.storageWigglerState.getFuture())) {
|
||||
req.reply.send(getStorageWigglerStates(self));
|
||||
}
|
||||
}
|
||||
} catch (Error& err) {
|
||||
if (normalDataDistributorErrors().count(err.code()) == 0) {
|
||||
|
|
|
@ -458,9 +458,10 @@ struct StorageWiggleMetrics {
|
|||
};
|
||||
|
||||
struct StorageWiggler : ReferenceCounted<StorageWiggler> {
|
||||
enum State : uint8_t { INVALID = 0, RUN = 1, PAUSE = 2 };
|
||||
AsyncVar<bool> nonEmpty;
|
||||
DDTeamCollection const* teamCollection;
|
||||
StorageWiggleMetrics metrics;
|
||||
|
||||
// data structures
|
||||
typedef std::pair<StorageMetadataType, UID> MetadataUIDP;
|
||||
// min-heap
|
||||
|
@ -468,9 +469,10 @@ struct StorageWiggler : ReferenceCounted<StorageWiggler> {
|
|||
wiggle_pq;
|
||||
std::unordered_map<UID, decltype(wiggle_pq)::handle_type> pq_handles;
|
||||
|
||||
AsyncVar<bool> nonEmpty;
|
||||
State wiggleState = State::INVALID;
|
||||
double lastStateChangeTs = 0.0; // timestamp describes when did the state change
|
||||
|
||||
explicit StorageWiggler(DDTeamCollection* collection) : teamCollection(collection), nonEmpty(false){};
|
||||
explicit StorageWiggler(DDTeamCollection* collection) : nonEmpty(false), teamCollection(collection){};
|
||||
// add server to wiggling queue
|
||||
void addServer(const UID& serverId, const StorageMetadataType& metadata);
|
||||
// remove server from wiggling queue
|
||||
|
@ -481,6 +483,24 @@ struct StorageWiggler : ReferenceCounted<StorageWiggler> {
|
|||
bool empty() const { return wiggle_pq.empty(); }
|
||||
Optional<UID> getNextServerId();
|
||||
|
||||
State getWiggleState() const { return wiggleState; }
|
||||
void setWiggleState(State s) {
|
||||
if (wiggleState != s) {
|
||||
wiggleState = s;
|
||||
lastStateChangeTs = g_network->now();
|
||||
}
|
||||
}
|
||||
static std::string getWiggleStateStr(State s) {
|
||||
switch (s) {
|
||||
case State::RUN:
|
||||
return "running";
|
||||
case State::PAUSE:
|
||||
return "paused";
|
||||
default:
|
||||
return "unknown";
|
||||
}
|
||||
}
|
||||
|
||||
// -- statistic update
|
||||
|
||||
// reset Statistic in database when perpetual wiggle is closed by user
|
||||
|
|
|
@ -36,6 +36,7 @@ struct DataDistributorInterface {
|
|||
RequestStream<struct DistributorExclusionSafetyCheckRequest> distributorExclCheckReq;
|
||||
RequestStream<struct GetDataDistributorMetricsRequest> dataDistributorMetrics;
|
||||
RequestStream<struct DistributorSplitRangeRequest> distributorSplitRange;
|
||||
RequestStream<struct GetStorageWigglerStateRequest> storageWigglerState;
|
||||
|
||||
DataDistributorInterface() {}
|
||||
explicit DataDistributorInterface(const struct LocalityData& l, UID id) : locality(l), myId(id) {}
|
||||
|
@ -56,7 +57,8 @@ struct DataDistributorInterface {
|
|||
distributorSnapReq,
|
||||
distributorExclCheckReq,
|
||||
dataDistributorMetrics,
|
||||
distributorSplitRange);
|
||||
distributorSplitRange,
|
||||
storageWigglerState);
|
||||
}
|
||||
};
|
||||
|
||||
|
@ -165,4 +167,27 @@ struct DistributorSplitRangeRequest {
|
|||
}
|
||||
};
|
||||
|
||||
struct GetStorageWigglerStateReply {
|
||||
constexpr static FileIdentifier file_identifier = 356721;
|
||||
uint8_t primary = 0, remote = 0; // StorageWiggler::State enum
|
||||
double lastStateChangePrimary = 0.0, lastStateChangeRemote = 0.0;
|
||||
|
||||
GetStorageWigglerStateReply() {}
|
||||
template <class Ar>
|
||||
void serialize(Ar& ar) {
|
||||
serializer(ar, primary, remote);
|
||||
}
|
||||
};
|
||||
|
||||
struct GetStorageWigglerStateRequest {
|
||||
constexpr static FileIdentifier file_identifier = 356722;
|
||||
ReplyPromise<GetStorageWigglerStateReply> reply;
|
||||
|
||||
GetStorageWigglerStateRequest() {}
|
||||
template <class Ar>
|
||||
void serialize(Ar& ar) {
|
||||
serializer(ar, reply);
|
||||
}
|
||||
};
|
||||
|
||||
#endif // FDBSERVER_DATADISTRIBUTORINTERFACE_H
|
||||
|
|
|
@ -1954,13 +1954,7 @@ static Future<std::vector<StorageServerStatusInfo>> readStorageInterfaceAndMetad
|
|||
}
|
||||
state std::vector<Future<Void>> futures(servers.size());
|
||||
for (int i = 0; i < servers.size(); ++i) {
|
||||
auto& info = servers[i];
|
||||
futures[i] = fmap(
|
||||
[&info](Optional<StorageMetadataType> meta) -> Void {
|
||||
info.metadata = meta;
|
||||
return Void();
|
||||
},
|
||||
metadataMap.get(tr, servers[i].id()));
|
||||
futures[i] = store(servers[i].metadata, metadataMap.get(tr, servers[i].id()));
|
||||
// TraceEvent(SevDebug, "MetadataAppear", servers[i].id()).detail("Present", metadata.present());
|
||||
}
|
||||
wait(waitForAll(futures));
|
||||
|
@ -2800,12 +2794,18 @@ ACTOR Future<Optional<Value>> getActivePrimaryDC(Database cx, int* fullyReplicat
|
|||
}
|
||||
|
||||
// read storageWigglerStats through Read-only tx, then convert it to JSON field
|
||||
ACTOR Future<JsonBuilderObject> storageWigglerStatsFetcher(DatabaseConfiguration conf,
|
||||
ACTOR Future<JsonBuilderObject> storageWigglerStatsFetcher(Optional<DataDistributorInterface> ddWorker,
|
||||
DatabaseConfiguration conf,
|
||||
Database cx,
|
||||
bool use_system_priority) {
|
||||
state Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(cx));
|
||||
state Optional<Value> primaryV;
|
||||
state Optional<Value> remoteV;
|
||||
state Future<ErrorOr<GetStorageWigglerStateReply>> stateFut;
|
||||
if (ddWorker.present()) {
|
||||
stateFut = ddWorker.get().storageWigglerState.tryGetReply(GetStorageWigglerStateRequest());
|
||||
}
|
||||
|
||||
loop {
|
||||
try {
|
||||
if (use_system_priority) {
|
||||
|
@ -2819,13 +2819,36 @@ ACTOR Future<JsonBuilderObject> storageWigglerStatsFetcher(DatabaseConfiguration
|
|||
wait(tr->onError(e));
|
||||
}
|
||||
}
|
||||
|
||||
if (ddWorker.present()) {
|
||||
wait(ready(stateFut));
|
||||
}
|
||||
JsonBuilderObject res;
|
||||
if (primaryV.present()) {
|
||||
res["primary"] = ObjectReader::fromStringRef<StorageWiggleMetrics>(primaryV.get(), IncludeVersion()).toJSON();
|
||||
auto obj = ObjectReader::fromStringRef<StorageWiggleMetrics>(primaryV.get(), IncludeVersion()).toJSON();
|
||||
if (stateFut.canGet() && stateFut.get().present()) {
|
||||
auto& reply = stateFut.get().get();
|
||||
obj["state"] = StorageWiggler::getWiggleStateStr(static_cast<StorageWiggler::State>(reply.primary));
|
||||
obj["last_state_change_timestamp"] = reply.lastStateChangePrimary;
|
||||
obj["last_state_change_datetime"] = epochsToGMTString(reply.lastStateChangePrimary);
|
||||
}
|
||||
res["primary"] = obj;
|
||||
}
|
||||
if (conf.regions.size() > 1 && remoteV.present()) {
|
||||
res["remote"] = ObjectReader::fromStringRef<StorageWiggleMetrics>(remoteV.get(), IncludeVersion()).toJSON();
|
||||
auto obj = ObjectReader::fromStringRef<StorageWiggleMetrics>(remoteV.get(), IncludeVersion()).toJSON();
|
||||
if (stateFut.canGet() && stateFut.get().present()) {
|
||||
auto& reply = stateFut.get().get();
|
||||
obj["state"] = StorageWiggler::getWiggleStateStr(static_cast<StorageWiggler::State>(reply.remote));
|
||||
obj["last_state_change_timestamp"] = reply.lastStateChangeRemote;
|
||||
obj["last_state_change_datetime"] = epochsToGMTString(reply.lastStateChangeRemote);
|
||||
}
|
||||
res["remote"] = obj;
|
||||
}
|
||||
if (stateFut.canGet() && stateFut.isError()) {
|
||||
res["error"] = std::string("Can't get storage wiggler state: ") + stateFut.getError().name();
|
||||
TraceEvent(SevWarn, "StorageWigglerStatsFetcher").error(stateFut.getError());
|
||||
} else if (stateFut.canGet() && stateFut.get().isError()) {
|
||||
res["error"] = std::string("Can't get storage wiggler state: ") + stateFut.get().getError().name();
|
||||
TraceEvent(SevWarn, "StorageWigglerStatsFetcher").error(stateFut.get().getError());
|
||||
}
|
||||
return res;
|
||||
}
|
||||
|
@ -3076,7 +3099,8 @@ ACTOR Future<StatusReply> clusterGetStatus(
|
|||
|
||||
primaryWiggleValues = readStorageWiggleValues(cx, true, true);
|
||||
remoteWiggleValues = readStorageWiggleValues(cx, false, true);
|
||||
wait(store(storageWiggler, storageWigglerStatsFetcher(configuration.get(), cx, true)) &&
|
||||
wait(store(storageWiggler,
|
||||
storageWigglerStatsFetcher(db->get().distributor, configuration.get(), cx, true)) &&
|
||||
success(primaryWiggleValues) && success(remoteWiggleValues));
|
||||
|
||||
for (auto& p : primaryWiggleValues.get())
|
||||
|
|
Loading…
Reference in New Issue