createdTime based storage wiggler (#6219)

* add storagemetadata

* add StorageWiggler;

* fix serverMetadataKey bug

* add metadata tracker in storage tracker

* finish StorageWiggler

* update next storage ID

* change pid to server id

* write metadata when seed SS

* add status json fields

* remove pid based ppw iteration

* fix time expression

* fix tss metadata nonexistence; fix transaction retry when retrieving metadata

* fix checkMetadata bug when store type is wrong

* fix remove storage status json

* format code

* refactor updateNextWigglingStoragePID

* seperate storage metadata tracker and store type tracker

* rename pid

* wiggler stats

* fix completion between waitServerListChange and storageRecruiter

* solve review comments

* rename system key

* fix database lock timeout by adding lock_aware

* format code

* status json

* resolve code format/naming comments

* delete expireNow; change PerpetualStorageWiggleID's value to KeyBackedObjectMap<UID, StorageWiggleValue>

* fix omit start rount

* format code

* status json reset

* solve status json format

* improve status json latency; replace binarywriter/reader to objectwriter/reader; refactor storagewigglerstats transactions

* status timestamp
This commit is contained in:
Xiaoxi Wang 2022-02-04 15:04:30 -08:00 committed by GitHub
parent 3a4e93604a
commit 6dc5921575
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 723 additions and 236 deletions

3
.gitignore vendored
View File

@ -9,8 +9,7 @@ bindings/java/foundationdb-tests*.jar
bindings/java/fdb-java-*-sources.jar
packaging/msi/FDBInstaller.msi
build/
cmake-build-debug/
cmake-build-release/
cmake-build-*/
# Generated source, build, and packaging files
*.g.cpp

View File

@ -1177,7 +1177,7 @@ void printStatus(StatusObjectReader statusObj,
}
}
// "db" is the handler to the multiversion databse
// "db" is the handler to the multiversion database
// localDb is the native Database object
// localDb is rarely needed except the "db" has not establised a connection to the cluster where the operation will
// return Never as we expect status command to always return, we use "localDb" to return the default result

View File

@ -1196,4 +1196,34 @@ struct ReadBlobGranuleContext {
bool debugNoMaterialize;
};
// Store metadata associated with each storage server. Now it only contains data be used in perpetual storage wiggle.
struct StorageMetadataType {
constexpr static FileIdentifier file_identifier = 732123;
// when the SS is initialized
uint64_t createdTime; // comes from Platform::timer_int()
StorageMetadataType() : createdTime(0) {}
StorageMetadataType(uint64_t t) : createdTime(t) {}
// To change this serialization, ProtocolVersion::StorageMetadata must be updated, and downgrades need
// to be considered
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, createdTime);
}
};
// store metadata of wiggle action
struct StorageWiggleValue {
constexpr static FileIdentifier file_identifier = 732124;
UID id; // storage id
StorageWiggleValue(UID id = UID(0, 0)) : id(id) {}
// To change this serialization, ProtocolVersion::PerpetualWiggleMetadata must be updated, and downgrades need
// to be considered
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, id);
}
};
#endif

View File

@ -466,6 +466,10 @@ public:
return k.expectedSize() + v.expectedSize();
}
Key serializeKey(KeyType const& key) { return space.pack(Codec<KeyType>::pack(key)); }
Value serializeValue(ValueType const& val) { return ObjectWriter::toValue(val, versionOptions); }
void erase(Reference<ReadYourWritesTransaction> tr, KeyType const& key) {
return tr->clear(space.pack(Codec<KeyType>::pack(key)));
}

View File

@ -24,6 +24,36 @@
const KeyRef JSONSchemas::statusSchema = LiteralStringRef(R"statusSchema(
{
"cluster":{
"storage_wiggler": {
"primary": {
"last_round_start_datetime": "Wed Feb 4 09:36:37 2022 +0000",
"last_round_start_timestamp": 63811229797,
"last_round_finish_datetime": "Thu Jan 1 00:00:00 1970 +0000",
"last_round_finish_timestamp": 0,
"smoothed_round_seconds": 1,
"finished_round": 1,
"last_wiggle_start_datetime": "Wed Feb 4 09:36:37 2022 +0000",
"last_wiggle_start_timestamp": 63811229797,
"last_wiggle_finish_datetime": "Thu Jan 1 00:00:00 1970 +0000",
"last_wiggle_finish_timestamp": 0,
"smoothed_wiggle_seconds": 1,
"finished_wiggle": 1
},
"remote": {
"last_round_start_datetime": "Wed Feb 4 09:36:37 2022 +0000",
"last_round_start_timestamp": 63811229797,
"last_round_finish_datetime": "Thu Jan 1 00:00:00 1970 +0000",
"last_round_finish_timestamp": 0,
"smoothed_round_seconds": 1,
"finished_round": 1,
"last_wiggle_start_datetime": "Wed Feb 4 09:36:37 2022 +0000",
"last_wiggle_start_timestamp": 63811229797,
"last_wiggle_finish_datetime": "Thu Jan 1 00:00:00 1970 +0000",
"last_wiggle_finish_timestamp": 0,
"smoothed_wiggle_seconds": 1,
"finished_wiggle": 1
}
},
"layers":{
"_valid":true,
"_error":"some error description"
@ -103,6 +133,10 @@ const KeyRef JSONSchemas::statusSchema = LiteralStringRef(R"statusSchema(
"coordinator"
]
},
"storage_metadata":{
"created_time_datetime":"Thu Jan 1 00:00:00 1970 +0000",
"created_time_timestamp": 0
},
"data_version":12341234,
"durable_version":12341234,
"data_lag": {

View File

@ -369,6 +369,9 @@ UID decodeTssQuarantineKey(KeyRef const& key) {
const KeyRangeRef tssMismatchKeys(LiteralStringRef("\xff/tssMismatch/"), LiteralStringRef("\xff/tssMismatch0"));
const KeyRangeRef serverMetadataKeys(LiteralStringRef("\xff/serverMetadata/"),
LiteralStringRef("\xff/serverMetadata0"));
const KeyRangeRef serverTagKeys(LiteralStringRef("\xff/serverTag/"), LiteralStringRef("\xff/serverTag0"));
const KeyRef serverTagPrefix = serverTagKeys.begin;
@ -633,7 +636,10 @@ const KeyRef configKeysPrefix = configKeys.begin;
const KeyRef perpetualStorageWiggleKey(LiteralStringRef("\xff/conf/perpetual_storage_wiggle"));
const KeyRef perpetualStorageWiggleLocalityKey(LiteralStringRef("\xff/conf/perpetual_storage_wiggle_locality"));
const KeyRef wigglingStorageServerKey(LiteralStringRef("\xff/storageWigglePID"));
const KeyRef perpetualStorageWiggleIDPrefix(
LiteralStringRef("\xff/storageWiggleID/")); // withSuffix /primary or /remote
const KeyRef perpetualStorageWiggleStatsPrefix(
LiteralStringRef("\xff/storageWiggleStats/")); // withSuffix /primary or /remote
const KeyRef triggerDDTeamInfoPrintKey(LiteralStringRef("\xff/triggerDDTeamInfoPrint"));

View File

@ -131,6 +131,10 @@ UID decodeTssQuarantineKey(KeyRef const&);
// For recording tss mismatch details in the system keyspace
extern const KeyRangeRef tssMismatchKeys;
// \xff/serverMetadata/[[storageInterfaceUID]] = [[StorageMetadataType]]
// Note: storageInterfaceUID is the one stated in the file name
extern const KeyRangeRef serverMetadataKeys;
// "\xff/serverTag/[[serverID]]" = "[[Tag]]"
// Provides the Tag for the given serverID. Used to access a
// storage server's corresponding TLog in order to apply mutations.
@ -214,7 +218,9 @@ extern const KeyRef configKeysPrefix;
extern const KeyRef perpetualStorageWiggleKey;
extern const KeyRef perpetualStorageWiggleLocalityKey;
extern const KeyRef wigglingStorageServerKey;
extern const KeyRef perpetualStorageWiggleIDPrefix;
extern const KeyRef perpetualStorageWiggleStatsPrefix;
// Change the value of this key to anything and that will trigger detailed data distribution team info log.
extern const KeyRef triggerDDTeamInfoPrintKey;

View File

@ -28,6 +28,7 @@
#include "fdbclient/DatabaseContext.h"
#include "fdbclient/ManagementAPI.actor.h"
#include "fdbclient/RunTransaction.actor.h"
#include "fdbclient/KeyBackedTypes.h"
#include "fdbrpc/Replication.h"
#include "fdbserver/DataDistribution.actor.h"
#include "fdbserver/FDBExecHelper.actor.h"
@ -162,6 +163,9 @@ public:
ACTOR Future<Void> updateServerMetrics(Reference<TCServerInfo> server);
// Read storage metadata from database, and do necessary updates
ACTOR Future<Void> readOrCreateStorageMetadata(DDTeamCollection* self, TCServerInfo* server);
// TeamCollection's machine team information
class TCMachineTeamInfo : public ReferenceCounted<TCMachineTeamInfo> {
public:
@ -654,15 +658,18 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
int64_t unhealthyServers;
std::map<int,int> priority_teams;
std::map<UID, Reference<TCServerInfo>> server_info;
std::map<Key, std::vector<Reference<TCServerInfo>>> pid2server_info; // some process may serve as multiple storage servers
std::vector<AddressExclusion> wiggle_addresses; // collection of wiggling servers' address
std::map<UID, Reference<TCServerInfo>> tss_info_by_pair;
std::map<UID, Reference<TCServerInfo>> server_and_tss_info; // TODO could replace this with an efficient way to do a read-only concatenation of 2 data structures?
std::map<Key, int> lagging_zones; // zone to number of storage servers lagging
AsyncVar<bool> disableFailingLaggingServers;
Optional<Key> wigglingPid; // Process id of current wiggling storage server;
Reference<AsyncVar<bool>> pauseWiggle;
// storage wiggle info
Reference<StorageWiggler> storageWiggler;
std::vector<AddressExclusion> wiggleAddresses; // collection of wiggling servers' address
Optional<UID> wigglingId; // Process id of current wiggling storage server;
Reference<AsyncVar<bool>> pauseWiggle;
Reference<AsyncVar<bool>> processingWiggle; // track whether wiggling relocation is being processed
PromiseStream<StorageWiggleValue> nextWiggleInfo;
// machine_info has all machines info; key must be unique across processes on the same machine
std::map<Standalone<StringRef>, Reference<TCMachineInfo>> machine_info;
@ -775,7 +782,8 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
PromiseStream<Promise<int>> getUnhealthyRelocationCount)
: cx(cx), distributorId(distributorId), configuration(configuration), doBuildTeams(true),
lastBuildTeamsFailed(false), teamBuilder(Void()), lock(lock), output(output), unhealthyServers(0),
processingWiggle(processingWiggle), shardsAffectedByTeamFailure(shardsAffectedByTeamFailure),
storageWiggler(makeReference<StorageWiggler>(this)), processingWiggle(processingWiggle),
shardsAffectedByTeamFailure(shardsAffectedByTeamFailure),
initialFailureReactionDelay(
delayed(readyToStart, SERVER_KNOBS->INITIAL_FAILURE_REACTION_DELAY, TaskPriority::DataDistribution)),
initializationDoneActor(logOnCompletion(readyToStart && initialFailureReactionDelay, this)),
@ -845,6 +853,7 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
info->collection = nullptr;
}
storageWiggler->teamCollection = nullptr;
// TraceEvent("DDTeamCollectionDestructed", distributorId)
// .detail("Primary", primary)
// .detail("ServerTrackerDestroyed", server_info.size());
@ -1249,21 +1258,20 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
self->healthyZone.set(initTeams->initHealthyZoneValue);
// SOMEDAY: If some servers have teams and not others (or some servers have more data than others) and there is
// an address/locality collision, should we preferentially mark the least used server as undesirable?
for (auto i = initTeams->allServers.begin(); i != initTeams->allServers.end(); ++i) {
if (self->shouldHandleServer(i->first)) {
if (!self->isValidLocality(self->configuration.storagePolicy, i->first.locality)) {
for (auto& server : initTeams->allServers) {
if (self->shouldHandleServer(server.first)) {
if (!self->isValidLocality(self->configuration.storagePolicy, server.first.locality)) {
TraceEvent(SevWarnAlways, "MissingLocality")
.detail("Server", i->first.uniqueID)
.detail("Locality", i->first.locality.toString());
auto addr = i->first.stableAddress();
.detail("Server", server.first.uniqueID)
.detail("Locality", server.first.locality.toString());
auto addr = server.first.stableAddress();
self->invalidLocalityAddr.insert(AddressExclusion(addr.ip, addr.port));
if (self->checkInvalidLocalities.isReady()) {
self->checkInvalidLocalities = checkAndRemoveInvalidLocalityAddr(self);
self->addActor.send(self->checkInvalidLocalities);
}
}
self->addServer(i->first, i->second, self->serverTrackerErrorOut, 0, ddEnabledState);
self->addServer(server.first, server.second, self->serverTrackerErrorOut, 0, ddEnabledState);
}
}
@ -2594,10 +2602,6 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
server_info[newServer.id()] = r;
// Establish the relation between server and machine
checkAndCreateMachine(r);
// Add storage server to pid map
ASSERT(r->lastKnownInterface.locality.processId().present());
StringRef pid = r->lastKnownInterface.locality.processId().get();
pid2server_info[pid].push_back(r);
}
r->tracker =
@ -2798,19 +2802,8 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
// ASSERT( !shardsAffectedByTeamFailure->getServersForTeam( t ) for all t in teams that contain removedServer )
Reference<TCServerInfo> removedServerInfo = server_info[removedServer];
// Step: Remove TCServerInfo from pid2server_info
ASSERT(removedServerInfo->lastKnownInterface.locality.processId().present());
StringRef pid = removedServerInfo->lastKnownInterface.locality.processId().get();
auto& info_vec = pid2server_info[pid];
for (size_t i = 0; i < info_vec.size(); ++i) {
if (info_vec[i] == removedServerInfo) {
info_vec[i--] = info_vec.back();
info_vec.pop_back();
}
}
if (info_vec.size() == 0) {
pid2server_info.erase(pid);
}
// Step: Remove TCServerInfo from storageWiggler
storageWiggler->removeServer(removedServer);
// Step: Remove server team that relate to removedServer
// Find all servers with which the removedServer shares teams
@ -2926,34 +2919,41 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
.detail("DesiredTeamsPerServer", SERVER_KNOBS->DESIRED_TEAMS_PER_SERVER);
}
// Adds storage servers held on process of which the Process Id is “pid” into excludeServers which prevent
// Adds storage servers held on process of which the Process Id is “id” into excludeServers which prevent
// recruiting the wiggling storage servers and let teamTracker start to move data off the affected teams;
// Return a vector of futures wait for all data is moved to other teams.
std::vector<Future<Void>> excludeStorageServersForWiggle(const Value& pid) {
std::vector<Future<Void>> moveFutures;
if (this->pid2server_info.count(pid) != 0) {
for (auto& info : this->pid2server_info[pid]) {
AddressExclusion addr(info->lastKnownInterface.address().ip, info->lastKnownInterface.address().port);
if (this->excludedServers.count(addr) &&
this->excludedServers.get(addr) != DDTeamCollection::Status::NONE) {
continue; // don't overwrite the value set by actor trackExcludedServer
}
this->wiggle_addresses.push_back(addr);
this->excludedServers.set(addr, DDTeamCollection::Status::WIGGLING);
moveFutures.push_back(info->onRemoved);
Future<Void> excludeStorageServersForWiggle(const UID& id) {
Future<Void> moveFuture = Void();
if (this->server_info.count(id) != 0) {
auto& info = server_info.at(id);
AddressExclusion addr(info->lastKnownInterface.address().ip, info->lastKnownInterface.address().port);
// don't overwrite the value set by actor trackExcludedServer
bool abnormal =
this->excludedServers.count(addr) && this->excludedServers.get(addr) != DDTeamCollection::Status::NONE;
if (info->lastKnownInterface.secondaryAddress().present()) {
AddressExclusion addr2(info->lastKnownInterface.secondaryAddress().get().ip,
info->lastKnownInterface.secondaryAddress().get().port);
abnormal |= this->excludedServers.count(addr2) &&
this->excludedServers.get(addr2) != DDTeamCollection::Status::NONE;
}
if (!moveFutures.empty()) {
if (!abnormal) {
this->wiggleAddresses.push_back(addr);
this->excludedServers.set(addr, DDTeamCollection::Status::WIGGLING);
moveFuture = info->onRemoved;
this->restartRecruiting.trigger();
}
}
return moveFutures;
return moveFuture;
}
// Include wiggled storage servers by setting their status from `WIGGLING`
// to `NONE`. The storage recruiter will recruit them as new storage servers
void includeStorageServersForWiggle() {
bool included = false;
for (auto& address : this->wiggle_addresses) {
for (auto& address : this->wiggleAddresses) {
if (!this->excludedServers.count(address) ||
this->excludedServers.get(address) != DDTeamCollection::Status::WIGGLING) {
continue;
@ -2961,13 +2961,94 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
included = true;
this->excludedServers.set(address, DDTeamCollection::Status::NONE);
}
this->wiggle_addresses.clear();
this->wiggleAddresses.clear();
if (included) {
this->restartRecruiting.trigger();
}
}
};
// add server to wiggling queue
void StorageWiggler::addServer(const UID& serverId, const StorageMetadataType& metadata) {
// std::cout << "size: " << pq_handles.size() << " add " << serverId.toString() << " DC: "<< teamCollection->primary
// << std::endl;
ASSERT(!pq_handles.count(serverId));
pq_handles[serverId] = wiggle_pq.emplace(metadata, serverId);
nonEmpty.set(true);
}
void StorageWiggler::removeServer(const UID& serverId) {
// std::cout << "size: " << pq_handles.size() << " remove " << serverId.toString() << " DC: "<<
// teamCollection->primary <<std::endl;
if (contains(serverId)) { // server haven't been popped
auto handle = pq_handles.at(serverId);
pq_handles.erase(serverId);
wiggle_pq.erase(handle);
}
nonEmpty.set(!wiggle_pq.empty());
}
void StorageWiggler::updateMetadata(const UID& serverId, const StorageMetadataType& metadata) {
// std::cout << "size: " << pq_handles.size() << " update " << serverId.toString()
// << " DC: " << teamCollection->primary << std::endl;
auto handle = pq_handles.at(serverId);
if ((*handle).first.createdTime == metadata.createdTime) {
return;
}
wiggle_pq.update(handle, std::make_pair(metadata, serverId));
}
Optional<UID> StorageWiggler::getNextServerId() {
if (!wiggle_pq.empty()) {
auto [metadata, id] = wiggle_pq.top();
wiggle_pq.pop();
pq_handles.erase(id);
return Optional<UID>(id);
}
return Optional<UID>();
}
Future<Void> StorageWiggler::resetStats() {
auto newMetrics = StorageWiggleMetrics();
newMetrics.smoothed_round_duration = metrics.smoothed_round_duration;
newMetrics.smoothed_wiggle_duration = metrics.smoothed_wiggle_duration;
return StorageWiggleMetrics::runSetTransaction(teamCollection->cx, teamCollection->primary, newMetrics);
}
Future<Void> StorageWiggler::restoreStats() {
auto& metricsRef = metrics;
auto assignFunc = [&metricsRef](Optional<Value> v) {
if (v.present()) {
metricsRef = BinaryReader::fromStringRef<StorageWiggleMetrics>(v.get(), IncludeVersion());
}
return Void();
};
auto readFuture = StorageWiggleMetrics::runGetTransaction(teamCollection->cx, teamCollection->primary);
return map(readFuture, assignFunc);
}
Future<Void> StorageWiggler::startWiggle() {
metrics.last_wiggle_start = timer_int();
if (shouldStartNewRound()) {
metrics.last_round_start = metrics.last_wiggle_start;
}
return StorageWiggleMetrics::runSetTransaction(teamCollection->cx, teamCollection->primary, metrics);
}
Future<Void> StorageWiggler::finishWiggle() {
metrics.last_wiggle_finish = timer_int();
metrics.finished_wiggle += 1;
auto duration = metrics.last_wiggle_finish - metrics.last_wiggle_start;
metrics.smoothed_wiggle_duration.setTotal((double)duration);
if (shouldFinishRound()) {
metrics.last_round_finish = metrics.last_wiggle_finish;
metrics.finished_round += 1;
duration = metrics.last_round_finish - metrics.last_round_start;
metrics.smoothed_round_duration.setTotal((double)duration);
}
return StorageWiggleMetrics::runSetTransaction(teamCollection->cx, teamCollection->primary, metrics);
}
TCServerInfo::~TCServerInfo() {
if (collection && ssVersionTooFarBehind.get() && !lastKnownInterface.isTss()) {
collection->removeLaggingStorageServer(lastKnownInterface.locality.zoneId().get());
@ -4061,103 +4142,93 @@ ACTOR Future<std::vector<std::pair<StorageServerInterface, ProcessClass>>> getSe
return results;
}
// Create a transaction reading the value of `wigglingStorageServerKey` and update it to the next Process ID according
// to a sorted PID set maintained by the data distributor. If now no storage server exists, the new Process ID is 0.
ACTOR Future<Void> updateNextWigglingStoragePID(DDTeamCollection* teamCollection) {
state ReadYourWritesTransaction tr(teamCollection->cx);
state Value writeValue = ""_sr;
state const Key writeKey =
wigglingStorageServerKey.withSuffix(teamCollection->primary ? "/primary"_sr : "/remote"_sr);
// return the next ServerID in storageWiggler
ACTOR Future<UID> getNextWigglingServerID(DDTeamCollection* teamCollection) {
state Optional<Value> localityKey;
state Optional<Value> localityValue;
// NOTE: because normal \xff/conf change through `changeConfig` now will cause DD throw `movekeys_conflict()` then
// recruit a new DD, we only need to read current configuration once
if (teamCollection->configuration.perpetualStorageWiggleLocality != "0") {
// parsing format is like "datahall:0"
std::string& localityKeyValue = teamCollection->configuration.perpetualStorageWiggleLocality;
ASSERT(isValidPerpetualStorageWiggleLocality(localityKeyValue));
// get key and value from perpetual_storage_wiggle_locality.
int split = localityKeyValue.find(':');
localityKey = Optional<Value>(ValueRef((uint8_t*)localityKeyValue.c_str(), split));
localityValue = Optional<Value>(
ValueRef((uint8_t*)localityKeyValue.c_str() + split + 1, localityKeyValue.size() - split - 1));
}
loop {
try {
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
Optional<Value> locality = wait(tr.get(perpetualStorageWiggleLocalityKey));
// wait until the wiggle queue is not empty
if (teamCollection->storageWiggler->empty()) {
wait(teamCollection->storageWiggler->nonEmpty.onChange());
}
if (teamCollection->pid2server_info.empty()) {
writeValue = ""_sr;
} else if (locality.present() && locality.get().toString().compare("0")) {
// if perpetual_storage_wiggle_locality has value and not 0(disabled).
state std::string localityKeyValue = locality.get().toString();
ASSERT(isValidPerpetualStorageWiggleLocality(localityKeyValue));
// if perpetual_storage_wiggle_locality has value and not 0(disabled).
if (localityKey.present()) {
// Whether the selected server matches the locality
auto id = teamCollection->storageWiggler->getNextServerId();
if (!id.present())
continue;
auto server = teamCollection->server_info.at(id.get());
// get key and value from perpetual_storage_wiggle_locality.
int split = localityKeyValue.find(':');
state std::string localityKey = localityKeyValue.substr(0, split);
state std::string localityValue = localityKeyValue.substr(split + 1);
state Value prevValue;
state int serverInfoSize = teamCollection->pid2server_info.size();
Optional<Value> value = wait(tr.get(writeKey));
if (value.present()) {
prevValue = value.get();
} else {
// if value not present, check for locality match of the first entry in pid2server_info.
auto& info_vec = teamCollection->pid2server_info.begin()->second;
if (info_vec.size() && info_vec[0]->lastKnownInterface.locality.get(localityKey) == localityValue) {
writeValue = teamCollection->pid2server_info.begin()->first; // first entry locality matched.
} else {
prevValue = teamCollection->pid2server_info.begin()->first;
serverInfoSize--;
}
}
// If first entry of pid2server_info, did not match the locality.
if (!(writeValue.compare(LiteralStringRef("")))) {
auto nextIt = teamCollection->pid2server_info.upper_bound(prevValue);
while (true) {
if (nextIt == teamCollection->pid2server_info.end()) {
nextIt = teamCollection->pid2server_info.begin();
}
if (nextIt->second.size() &&
nextIt->second[0]->lastKnownInterface.locality.get(localityKey) == localityValue) {
writeValue = nextIt->first; // locality matched
break;
}
serverInfoSize--;
if (!serverInfoSize) {
// None of the entries in pid2server_info matched the given locality.
writeValue = LiteralStringRef("");
TraceEvent("PerpetualNextWigglingStoragePIDNotFound", teamCollection->distributorId)
.detail("WriteValue", "No process matched the given perpetualStorageWiggleLocality")
.detail("PerpetualStorageWiggleLocality", localityKeyValue);
break;
}
nextIt++;
}
}
// TraceEvent("PerpetualLocality").detail("Server", server->lastKnownInterface.locality.get(localityKey)).detail("Desire", localityValue);
if (server->lastKnownInterface.locality.get(localityKey.get()) == localityValue) {
return id.get();
} else {
Optional<Value> value = wait(tr.get(writeKey));
Value pid = teamCollection->pid2server_info.begin()->first;
if (value.present()) {
auto nextIt = teamCollection->pid2server_info.upper_bound(value.get());
if (nextIt == teamCollection->pid2server_info.end()) {
writeValue = pid;
} else {
writeValue = nextIt->first;
}
} else {
writeValue = pid;
if (teamCollection->storageWiggler->empty()) {
// None of the entries in wiggle queue matches the given locality.
TraceEvent("PerpetualStorageWiggleEmptyQueue", teamCollection->distributorId)
.detail("WriteValue", "No process matched the given perpetualStorageWiggleLocality")
.detail("PerpetualStorageWiggleLocality",
teamCollection->configuration.perpetualStorageWiggleLocality);
}
continue;
}
tr.set(writeKey, writeValue);
wait(tr.commit());
break;
} catch (Error& e) {
wait(tr.onError(e));
} else {
auto id = teamCollection->storageWiggler->getNextServerId();
if (!id.present())
continue;
return id.get();
}
}
TraceEvent(SevDebug, "PerpetualNextWigglingStoragePID", teamCollection->distributorId)
}
// Create a transaction updating `perpetualStorageWiggleIDPrefix` to the next serverID according to a sorted wiggle_pq
// maintained by the wiggler.
ACTOR Future<Void> updateNextWigglingStorageID(DDTeamCollection* teamCollection) {
state Key writeKey =
perpetualStorageWiggleIDPrefix.withSuffix(teamCollection->primary ? "primary/"_sr : "remote/"_sr);
state KeyBackedObjectMap<UID, StorageWiggleValue, decltype(IncludeVersion())> metadataMap(writeKey,
IncludeVersion());
state UID nextId = wait(getNextWigglingServerID(teamCollection));
state StorageWiggleValue value(nextId);
state Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(teamCollection->cx));
loop {
// write the next server id
try {
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
metadataMap.set(tr, nextId, value);
wait(tr->commit());
break;
} catch (Error& e) {
wait(tr->onError(e));
}
}
teamCollection->nextWiggleInfo.send(value);
TraceEvent(SevDebug, "PerpetualStorageWiggleNextID", teamCollection->distributorId)
.detail("Primary", teamCollection->primary)
.detail("WriteValue", writeValue);
.detail("WriteID", nextId);
return Void();
}
// Iterate over each storage process to do storage wiggle. After initializing the first Process ID, it waits a signal
// from `perpetualStorageWiggler` indicating the wiggling of current process is finished. Then it writes the next
// Process ID to a system key: `wigglingStorageServerKey` to show the next process to wiggle.
// Process ID to a system key: `perpetualStorageWiggleIDPrefix` to show the next process to wiggle.
ACTOR Future<Void> perpetualStorageWiggleIterator(AsyncVar<bool>* stopSignal,
FutureStream<Void> finishStorageWiggleSignal,
DDTeamCollection* teamCollection) {
@ -4180,7 +4251,7 @@ ACTOR Future<Void> perpetualStorageWiggleIterator(AsyncVar<bool>* stopSignal,
.detail("StorageTeamSize", teamCollection->configuration.storageTeamSize);
}
}
wait(updateNextWigglingStoragePID(teamCollection));
wait(updateNextWigglingStorageID(teamCollection));
}
}
if (stopSignal->get()) {
@ -4191,29 +4262,43 @@ ACTOR Future<Void> perpetualStorageWiggleIterator(AsyncVar<bool>* stopSignal,
return Void();
}
// Watch the value change of `wigglingStorageServerKey`.
// Return the watch future and the current value of `wigglingStorageServerKey`.
ACTOR Future<std::pair<Future<Void>, Value>> watchPerpetualStoragePIDChange(DDTeamCollection* self) {
state ReadYourWritesTransaction tr(self->cx);
state Future<Void> watchFuture;
state Value ret;
state const Key readKey = wigglingStorageServerKey.withSuffix(self->primary ? "/primary"_sr : "/remote"_sr);
// read the current map of `perpetualStorageWiggleIDPrefix`, then restore wigglingId.
ACTOR Future<Void> readStorageWiggleMap(DDTeamCollection* self) {
state const Key readKey = perpetualStorageWiggleIDPrefix.withSuffix(self->primary ? "primary/"_sr : "remote/"_sr);
state KeyBackedObjectMap<UID, StorageWiggleValue, decltype(IncludeVersion())> metadataMap(readKey,
IncludeVersion());
state Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(self->cx));
state std::vector<std::pair<UID, StorageWiggleValue>> res;
// read the wiggling pairs
loop {
try {
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
Optional<Value> value = wait(tr.get(readKey));
if (value.present()) {
ret = value.get();
}
watchFuture = tr.watch(readKey);
wait(tr.commit());
tr->setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::READ_LOCK_AWARE);
wait(store(res, metadataMap.getRange(tr, UID(0, 0), Optional<UID>(), CLIENT_KNOBS->TOO_MANY)));
wait(tr->commit());
break;
} catch (Error& e) {
wait(tr.onError(e));
wait(tr->onError(e));
}
}
return std::make_pair(watchFuture, ret);
if (res.size() > 0) {
// SOMEDAY: support wiggle multiple SS at once
ASSERT(!self->wigglingId.present()); // only single process wiggle is allowed
self->wigglingId = res.begin()->first;
}
return Void();
}
auto eraseStorageWiggleMap(DDTeamCollection* self,
KeyBackedObjectMap<UID, StorageWiggleValue, decltype(IncludeVersion())>* metadataMap,
UID id) {
return runRYWTransaction(self->cx, [metadataMap, id](Reference<ReadYourWritesTransaction> tr) -> Future<Void> {
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
metadataMap->erase(tr, id);
return Void();
});
}
// periodically check whether the cluster is healthy if we continue perpetual wiggle
@ -4244,25 +4329,33 @@ ACTOR Future<Void> clusterHealthCheckForPerpetualWiggle(DDTeamCollection* self,
wait(delay(SERVER_KNOBS->CHECK_TEAM_DELAY, TaskPriority::DataDistributionLow));
}
}
// Watches the value (pid) change of \xff/storageWigglePID, and adds storage servers held on process of which the
// Process Id is “pid” into excludeServers which prevent recruiting the wiggling storage servers and let teamTracker
// start to move data off the affected teams. The wiggling process of current storage servers will be paused if the
// cluster is unhealthy and restarted once the cluster is healthy again.
// Watches the value change of `perpetualStorageWiggleIDPrefix`, and adds the storage server into excludeServers which
// prevent recruiting the wiggling storage servers and let teamTracker start to move data off the affected teams. The
// wiggling process of current storage servers will be paused if the cluster is unhealthy and restarted once the cluster
// is healthy again.
ACTOR Future<Void> perpetualStorageWiggler(AsyncVar<bool>* stopSignal,
PromiseStream<Void> finishStorageWiggleSignal,
DDTeamCollection* self) {
state Future<Void> watchFuture = Never();
state KeyBackedObjectMap<UID, StorageWiggleValue, decltype(IncludeVersion())> metadataMap(
perpetualStorageWiggleIDPrefix.withSuffix(self->primary ? "primary/"_sr : "remote/"_sr), IncludeVersion());
state Future<StorageWiggleValue> nextFuture = Never();
state Future<Void> moveFinishFuture = Never();
state int extraTeamCount = 0;
state Future<Void> ddQueueCheck = clusterHealthCheckForPerpetualWiggle(self, &extraTeamCount);
state int movingCount = 0;
state std::pair<Future<Void>, Value> res = wait(watchPerpetualStoragePIDChange(self));
ASSERT(!self->wigglingPid.present()); // only single process wiggle is allowed
self->wigglingPid = Optional<Key>(res.second);
state FutureStream<StorageWiggleValue> nextStream = self->nextWiggleInfo.getFuture();
wait(readStorageWiggleMap(self));
if (!self->wigglingId.present()) {
// skip to the next valid ID
nextFuture = waitAndForward(nextStream);
finishStorageWiggleSignal.send(Void());
}
loop {
if (self->wigglingPid.present()) {
state StringRef pid = self->wigglingPid.get();
if (self->wigglingId.present()) {
state UID id = self->wigglingId.get();
if (self->pauseWiggle->get()) {
TEST(true); // paused because cluster is unhealthy
moveFinishFuture = Never();
@ -4272,24 +4365,22 @@ ACTOR Future<Void> perpetualStorageWiggler(AsyncVar<bool>* stopSignal,
"PerpetualStorageWigglePause",
self->distributorId)
.detail("Primary", self->primary)
.detail("ProcessId", pid)
.detail("ProcessId", id)
.detail("BestTeamKeepStuckCount", self->bestTeamKeepStuckCount)
.detail("ExtraHealthyTeamCount", extraTeamCount)
.detail("HealthyTeamCount", self->healthyTeamCount)
.detail("StorageCount", movingCount);
.detail("HealthyTeamCount", self->healthyTeamCount);
} else {
TEST(true); // start wiggling
choose {
when(wait(waitUntilHealthy(self))) {
auto fv = self->excludeStorageServersForWiggle(pid);
movingCount = fv.size();
moveFinishFuture = waitForAll(fv);
TEST(true); // start wiggling
wait(self->storageWiggler->startWiggle());
auto fv = self->excludeStorageServersForWiggle(id);
moveFinishFuture = fv;
TraceEvent("PerpetualStorageWiggleStart", self->distributorId)
.detail("Primary", self->primary)
.detail("ProcessId", pid)
.detail("ProcessId", id)
.detail("ExtraHealthyTeamCount", extraTeamCount)
.detail("HealthyTeamCount", self->healthyTeamCount)
.detail("StorageCount", movingCount);
.detail("HealthyTeamCount", self->healthyTeamCount);
}
when(wait(self->pauseWiggle->onChange())) { continue; }
}
@ -4297,21 +4388,15 @@ ACTOR Future<Void> perpetualStorageWiggler(AsyncVar<bool>* stopSignal,
}
choose {
when(wait(watchFuture)) {
ASSERT(!self->wigglingPid.present()); // the previous wiggle must be finished
watchFuture = Never();
// read new pid and set the next watch Future
wait(store(res, watchPerpetualStoragePIDChange(self)));
self->wigglingPid = Optional<Key>(res.second);
when(StorageWiggleValue value = wait(nextFuture)) {
ASSERT(!self->wigglingId.present()); // the previous wiggle must be finished
nextFuture = Never();
self->wigglingId = value.id;
// random delay
wait(delayJittered(5.0, TaskPriority::DataDistributionLow));
}
when(wait(moveFinishFuture)) {
ASSERT(self->wigglingPid.present());
StringRef pid = self->wigglingPid.get();
TEST(pid != LiteralStringRef("")); // finish wiggling this process
ASSERT(self->wigglingId.present());
self->waitUntilRecruited.set(true);
self->restartTeamBuilder.trigger();
@ -4319,11 +4404,12 @@ ACTOR Future<Void> perpetualStorageWiggler(AsyncVar<bool>* stopSignal,
self->includeStorageServersForWiggle();
TraceEvent("PerpetualStorageWiggleFinish", self->distributorId)
.detail("Primary", self->primary)
.detail("ProcessId", pid.toString())
.detail("StorageCount", movingCount);
.detail("ProcessId", self->wigglingId.get());
self->wigglingPid.reset();
watchFuture = res.first;
wait(eraseStorageWiggleMap(self, &metadataMap, self->wigglingId.get()) &&
self->storageWiggler->finishWiggle());
self->wigglingId.reset();
nextFuture = waitAndForward(nextStream);
finishStorageWiggleSignal.send(Void());
extraTeamCount = std::max(0, extraTeamCount - 1);
}
@ -4335,12 +4421,12 @@ ACTOR Future<Void> perpetualStorageWiggler(AsyncVar<bool>* stopSignal,
}
}
if (self->wigglingPid.present()) {
if (self->wigglingId.present()) {
self->includeStorageServersForWiggle();
TraceEvent("PerpetualStorageWiggleExitingPause", self->distributorId)
.detail("Primary", self->primary)
.detail("ProcessId", self->wigglingPid.get());
self->wigglingPid.reset();
.detail("ProcessId", self->wigglingId.get());
self->wigglingId.reset();
}
return Void();
@ -4360,7 +4446,7 @@ ACTOR Future<Void> monitorPerpetualStorageWiggle(DDTeamCollection* teamCollectio
state ReadYourWritesTransaction tr(teamCollection->cx);
loop {
try {
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr.setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
Optional<Standalone<StringRef>> value = wait(tr.get(perpetualStorageWiggleKey));
if (value.present()) {
@ -4371,6 +4457,7 @@ ACTOR Future<Void> monitorPerpetualStorageWiggle(DDTeamCollection* teamCollectio
ASSERT(speed == 1 || speed == 0);
if (speed == 1 && stopWiggleSignal.get()) { // avoid duplicated start
wait(teamCollection->storageWiggler->restoreStats());
stopWiggleSignal.set(false);
collection.add(perpetualStorageWiggleIterator(
&stopWiggleSignal, finishStorageWiggleSignal.getFuture(), teamCollection));
@ -4379,10 +4466,11 @@ ACTOR Future<Void> monitorPerpetualStorageWiggle(DDTeamCollection* teamCollectio
TraceEvent("PerpetualStorageWiggleOpen", teamCollection->distributorId)
.detail("Primary", teamCollection->primary);
} else if (speed == 0) {
wait(teamCollection->storageWiggler->resetStats());
if (!stopWiggleSignal.get()) {
stopWiggleSignal.set(true);
wait(collection.signalAndReset());
teamCollection->pauseWiggle->set(true);
wait(collection.signalAndReset());
}
TraceEvent("PerpetualStorageWiggleClose", teamCollection->distributorId)
.detail("Primary", teamCollection->primary);
@ -4413,12 +4501,13 @@ ACTOR Future<Void> waitServerListChange(DDTeamCollection* self,
isFetchingResults = true;
serverListAndProcessClasses = getServerListAndProcessClasses(&tr);
}
when(std::vector<std::pair<StorageServerInterface, ProcessClass>> results =
when(state std::vector<std::pair<StorageServerInterface, ProcessClass>> results =
wait(serverListAndProcessClasses)) {
serverListAndProcessClasses = Never();
isFetchingResults = false;
for (int i = 0; i < results.size(); i++) {
state int i = 0;
for (; i < results.size(); i++) {
UID serverId = results[i].first.id();
StorageServerInterface const& ssi = results[i].first;
ProcessClass const& processClass = results[i].second;
@ -4533,13 +4622,47 @@ ACTOR Future<Void> keyValueStoreTypeTracker(DDTeamCollection* self, TCServerInfo
wait(brokenPromiseToNever(server->lastKnownInterface.getKeyValueStoreType.getReplyWithTaskID<KeyValueStoreType>(
TaskPriority::DataDistribution)));
server->storeType = type;
if (type != self->configuration.storageServerStoreType) {
if (self->wrongStoreTypeRemover.isReady()) {
self->wrongStoreTypeRemover = removeWrongStoreType(self);
self->addActor.send(self->wrongStoreTypeRemover);
}
}
return Never();
}
// Read storage metadata from database; Error(s) are expected to be handled by the caller
ACTOR Future<Void> readOrCreateStorageMetadata(DDTeamCollection* self, TCServerInfo* server) {
state KeyBackedObjectMap<UID, StorageMetadataType, decltype(IncludeVersion())> metadataMap(serverMetadataKeys.begin,
IncludeVersion());
state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(self->cx);
state StorageMetadataType data(timer_int());
// printf("------ read metadata %s\n", server->id.toString().c_str());
// read storage metadata
loop {
try {
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
auto property = metadataMap.getProperty(server->id);
Optional<StorageMetadataType> metadata = wait(property.get(tr));
// NOTE: in upgrade testing, there may not be any metadata
if (metadata.present()) {
data = metadata.get();
} else {
metadataMap.set(tr, server->id, data);
}
wait(tr->commit());
break;
} catch (Error& e) {
wait(tr->onError(e));
}
}
// add server to wiggler
if (self->storageWiggler->contains(server->id)) {
self->storageWiggler->updateMetadata(server->id, data);
} else {
self->storageWiggler->addServer(server->id, data);
}
return Never();
}
@ -4694,7 +4817,7 @@ ACTOR Future<Void> storageServerTracker(
!self->isValidLocality(self->configuration.storagePolicy, server->lastKnownInterface.locality);
state int targetTeamNumPerServer =
(SERVER_KNOBS->DESIRED_TEAMS_PER_SERVER * (self->configuration.storageTeamSize + 1)) / 2;
state Future<Void> storageMetadataTracker = (isTss) ? Never() : readOrCreateStorageMetadata(self, server);
try {
loop {
status.isUndesired = !self->disableFailingLaggingServers.get() && server->ssVersionTooFarBehind.get();
@ -4783,7 +4906,7 @@ ACTOR Future<Void> storageServerTracker(
// wiggler.
auto invalidWiggleServer =
[](const AddressExclusion& addr, const DDTeamCollection* tc, const TCServerInfo* server) {
return server->lastKnownInterface.locality.processId() != tc->wigglingPid;
return !tc->wigglingId.present() || server->id != tc->wigglingId.get();
};
// If the storage server is in the excluded servers list, it is undesired
NetworkAddress a = server->lastKnownInterface.address();
@ -4794,7 +4917,7 @@ ACTOR Future<Void> storageServerTracker(
TraceEvent(SevInfo, "InvalidWiggleServer", self->distributorId)
.detail("Address", worstAddr.toString())
.detail("ProcessId", server->lastKnownInterface.locality.processId())
.detail("ValidWigglingId", self->wigglingPid.present());
.detail("WigglingId", self->wigglingId.present());
self->excludedServers.set(worstAddr, DDTeamCollection::Status::NONE);
worstStatus = DDTeamCollection::Status::NONE;
}
@ -4818,7 +4941,7 @@ ACTOR Future<Void> storageServerTracker(
TraceEvent(SevInfo, "InvalidWiggleServer", self->distributorId)
.detail("Address", testAddr.toString())
.detail("ProcessId", server->lastKnownInterface.locality.processId())
.detail("ValidWigglingId", self->wigglingPid.present());
.detail("ValidWigglingId", self->wigglingId.present());
self->excludedServers.set(testAddr, DDTeamCollection::Status::NONE);
testStatus = DDTeamCollection::Status::NONE;
}
@ -4839,7 +4962,7 @@ ACTOR Future<Void> storageServerTracker(
if (worstStatus == DDTeamCollection::Status::WIGGLING && !isTss) {
status.isWiggling = true;
TraceEvent("PerpetualWigglingStorageServer", self->distributorId)
TraceEvent("PerpetualStorageWiggleSS", self->distributorId)
.detail("Primary", self->primary)
.detail("Server", server->id)
.detail("ProcessId", server->lastKnownInterface.locality.processId())
@ -4885,6 +5008,7 @@ ACTOR Future<Void> storageServerTracker(
}
// Remove server from FF/serverList
storageMetadataTracker.cancel();
wait(removeStorageServer(
cx, server->id, server->lastKnownInterface.tssPairID, self->lock, ddEnabledState));
@ -4907,14 +5031,11 @@ ACTOR Future<Void> storageServerTracker(
bool localityChanged = server->lastKnownInterface.locality != newInterface.first.locality;
bool machineLocalityChanged = server->lastKnownInterface.locality.zoneId().get() !=
newInterface.first.locality.zoneId().get();
bool processIdChanged = server->lastKnownInterface.locality.processId().get() !=
newInterface.first.locality.processId().get();
TraceEvent("StorageServerInterfaceChanged", self->distributorId)
.detail("ServerID", server->id)
.detail("NewWaitFailureToken", newInterface.first.waitFailure.getEndpoint().token)
.detail("OldWaitFailureToken", server->lastKnownInterface.waitFailure.getEndpoint().token)
.detail("LocalityChanged", localityChanged)
.detail("ProcessIdChanged", processIdChanged)
.detail("MachineLocalityChanged", machineLocalityChanged);
server->lastKnownInterface = newInterface.first;
@ -4959,20 +5080,6 @@ ACTOR Future<Void> storageServerTracker(
ASSERT(destMachine.isValid());
}
// update pid2server_info if the process id has changed
if (processIdChanged) {
self->pid2server_info[newInterface.first.locality.processId().get()].push_back(
self->server_info[server->id]);
// delete the old one
auto& old_infos =
self->pid2server_info[server->lastKnownInterface.locality.processId().get()];
for (int i = 0; i < old_infos.size(); ++i) {
if (old_infos[i].getPtr() == server) {
std::swap(old_infos[i--], old_infos.back());
old_infos.pop_back();
}
}
}
// Ensure the server's server team belong to a machine team, and
// Get the newBadTeams due to the locality change
std::vector<Reference<TCTeamInfo>> newBadTeams;
@ -5027,6 +5134,7 @@ ACTOR Future<Void> storageServerTracker(
// Restart the storeTracker for the new interface. This will cancel the previous
// keyValueStoreTypeTracker
storeTypeTracker = (isTss) ? Never() : keyValueStoreTypeTracker(self, server);
storageMetadataTracker = (isTss) ? Never() : readOrCreateStorageMetadata(self, server);
hasWrongDC = !isCorrectDC(self, server);
hasInvalidLocality =
!self->isValidLocality(self->configuration.storagePolicy, server->lastKnownInterface.locality);
@ -5046,7 +5154,7 @@ ACTOR Future<Void> storageServerTracker(
.detail("WrongStoreTypeRemoved", server->wrongStoreTypeToRemove.get());
}
when(wait(server->wakeUpTracker.getFuture())) { server->wakeUpTracker = Promise<Void>(); }
when(wait(storeTypeTracker)) {}
when(wait(storageMetadataTracker || storeTypeTracker)) {}
when(wait(server->ssVersionTooFarBehind.onChange())) {}
when(wait(self->disableFailingLaggingServers.onChange())) {}
}
@ -5349,7 +5457,7 @@ ACTOR Future<Void> initializeStorage(DDTeamCollection* self,
}
}
TraceEvent("DDRecruiting")
TraceEvent("DDRecruiting", self->distributorId)
.detail("Primary", self->primary)
.detail("State", "Sending request to worker")
.detail("WorkerID", candidateWorker.worker.id())
@ -5408,7 +5516,7 @@ ACTOR Future<Void> initializeStorage(DDTeamCollection* self,
self->recruitingIds.erase(interfaceId);
self->recruitingLocalities.erase(candidateWorker.worker.stableAddress());
TraceEvent("DDRecruiting")
TraceEvent("DDRecruiting", self->distributorId)
.detail("Primary", self->primary)
.detail("State", "Finished request")
.detail("WorkerID", candidateWorker.worker.id())
@ -5421,14 +5529,16 @@ ACTOR Future<Void> initializeStorage(DDTeamCollection* self,
UID id = newServer.get().interf.id();
if (!self->server_and_tss_info.count(id)) {
if (!recruitTss || tssState->tssRecruitSuccess()) {
// signal all done after adding tss to tracking info
self->addServer(newServer.get().interf,
candidateWorker.processClass,
self->serverTrackerErrorOut,
newServer.get().addedVersion,
ddEnabledState);
self->waitUntilRecruited.set(false);
// signal all done after adding tss to tracking info
tssState->markComplete();
// signal the teamBuilder a new SS is recruited
if (!recruitTss)
self->waitUntilRecruited.set(false);
}
} else {
TraceEvent(SevWarn, "DDRecruitmentError")
@ -5510,7 +5620,7 @@ ACTOR Future<Void> storageRecruiter(DDTeamCollection* self,
for (auto s = self->server_and_tss_info.begin(); s != self->server_and_tss_info.end(); ++s) {
auto serverStatus = self->server_status.get(s->second->lastKnownInterface.id());
if (serverStatus.excludeOnRecruit()) {
TraceEvent(SevDebug, "DDRecruitExcl1")
TraceEvent(SevDebug, "DDRecruitExcl1", self->distributorId)
.detail("Primary", self->primary)
.detail("Excluding", s->second->lastKnownInterface.address());
auto addr = s->second->lastKnownInterface.stableAddress();
@ -5526,7 +5636,7 @@ ACTOR Future<Void> storageRecruiter(DDTeamCollection* self,
auto excl = self->excludedServers.getKeys();
for (const auto& s : excl) {
if (self->excludedServers.get(s) != DDTeamCollection::Status::NONE) {
TraceEvent(SevDebug, "DDRecruitExcl2")
TraceEvent(SevDebug, "DDRecruitExcl2", self->distributorId)
.detail("Primary", self->primary)
.detail("Excluding", s.toString());
exclusions.insert(s);
@ -5535,7 +5645,8 @@ ACTOR Future<Void> storageRecruiter(DDTeamCollection* self,
// Exclude workers that have invalid locality
for (auto& addr : self->invalidLocalityAddr) {
TraceEvent(SevDebug, "DDRecruitExclInvalidAddr").detail("Excluding", addr.toString());
TraceEvent(SevDebug, "DDRecruitExclInvalidAddr", self->distributorId)
.detail("Excluding", addr.toString());
exclusions.insert(addr);
}
@ -5546,7 +5657,7 @@ ACTOR Future<Void> storageRecruiter(DDTeamCollection* self,
rsr.includeDCs = self->includedDCs;
TraceEvent(rsr.criticalRecruitment ? SevWarn : SevInfo, "DDRecruiting")
TraceEvent(rsr.criticalRecruitment ? SevWarn : SevInfo, "DDRecruiting", self->distributorId)
.detail("Primary", self->primary)
.detail("State", "Sending request to CC")
.detail("Exclusions", rsr.excludeAddresses.size())
@ -5676,6 +5787,7 @@ ACTOR Future<Void> storageRecruiter(DDTeamCollection* self,
throw;
}
TEST(true); // Storage recruitment timed out
wait(delay(FLOW_KNOBS->PREVENT_FAST_SPIN_DELAY, TaskPriority::DataDistribution));
}
}
}

View File

@ -24,9 +24,12 @@
#elif !defined(FDBSERVER_DATA_DISTRIBUTION_ACTOR_H)
#define FDBSERVER_DATA_DISTRIBUTION_ACTOR_H
#include <boost/heap/skew_heap.hpp>
#include <boost/heap/policies.hpp>
#include "fdbclient/NativeAPI.actor.h"
#include "fdbserver/MoveKeys.actor.h"
#include "fdbserver/LogSystem.h"
#include "fdbclient/RunTransaction.actor.h"
#include "flow/actorcompiler.h" // This must be the last #include.
struct RelocateShard {
@ -289,6 +292,150 @@ ShardSizeBounds getShardSizeBounds(KeyRangeRef shard, int64_t maxShardSize);
int64_t getMaxShardSize(double dbSizeEstimate);
struct DDTeamCollection;
struct StorageWiggleMetrics {
constexpr static FileIdentifier file_identifier = 4728961;
// round statistics
// One StorageServer wiggle round is considered 'complete', when all StorageServers with creationTime < T are
// wiggled
uint64_t last_round_start = 0; // wall timer: timer_int()
uint64_t last_round_finish = 0;
TimerSmoother smoothed_round_duration;
int finished_round = 0; // finished round since storage wiggle is open
// step statistics
// 1 wiggle step as 1 storage server is wiggled in the current round
uint64_t last_wiggle_start = 0; // wall timer: timer_int()
uint64_t last_wiggle_finish = 0;
TimerSmoother smoothed_wiggle_duration;
int finished_wiggle = 0; // finished step since storage wiggle is open
StorageWiggleMetrics() : smoothed_round_duration(20.0 * 60), smoothed_wiggle_duration(10.0 * 60) {}
template <class Ar>
void serialize(Ar& ar) {
if (ar.isDeserializing) {
double step_total, round_total;
serializer(ar,
last_wiggle_start,
last_wiggle_finish,
step_total,
finished_wiggle,
last_round_start,
last_round_finish,
round_total,
finished_round);
smoothed_round_duration.reset(round_total);
smoothed_wiggle_duration.reset(step_total);
} else {
serializer(ar,
last_wiggle_start,
last_wiggle_finish,
smoothed_wiggle_duration.total,
finished_wiggle,
last_round_start,
last_round_finish,
smoothed_round_duration.total,
finished_round);
}
}
static Future<Void> runSetTransaction(Reference<ReadYourWritesTransaction> tr,
bool primary,
StorageWiggleMetrics metrics) {
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
tr->set(perpetualStorageWiggleStatsPrefix.withSuffix(primary ? "primary"_sr : "remote"_sr),
ObjectWriter::toValue(metrics, IncludeVersion()));
return Void();
}
static Future<Void> runSetTransaction(Database cx, bool primary, StorageWiggleMetrics metrics) {
return runRYWTransaction(cx, [=](Reference<ReadYourWritesTransaction> tr) -> Future<Void> {
return runSetTransaction(tr, primary, metrics);
});
}
static Future<Optional<Value>> runGetTransaction(Reference<ReadYourWritesTransaction> tr, bool primary) {
tr->setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::READ_LOCK_AWARE);
return tr->get(perpetualStorageWiggleStatsPrefix.withSuffix(primary ? "primary"_sr : "remote"_sr));
}
static Future<Optional<Value>> runGetTransaction(Database cx, bool primary) {
return runRYWTransaction(cx, [=](Reference<ReadYourWritesTransaction> tr) -> Future<Optional<Value>> {
return runGetTransaction(tr, primary);
});
}
StatusObject toJSON() {
StatusObject result;
result["last_round_start_datetime"] = timerIntToGmt(last_round_start);
result["last_round_finish_datetime"] = timerIntToGmt(last_round_finish);
result["last_round_start_timestamp"] = last_round_start;
result["last_round_finish_timestamp"] = last_round_finish;
result["smoothed_round_seconds"] = smoothed_round_duration.estimate;
result["finished_round"] = finished_round;
result["last_wiggle_start_datetime"] = timerIntToGmt(last_wiggle_start);
result["last_wiggle_finish_datetime"] = timerIntToGmt(last_wiggle_finish);
result["last_wiggle_start_timestamp"] = last_wiggle_start;
result["last_wiggle_finish_timestamp"] = last_wiggle_finish;
result["smoothed_wiggle_seconds"] = smoothed_wiggle_duration.estimate;
result["finished_wiggle"] = finished_wiggle;
return result;
}
};
struct StorageWiggler : ReferenceCounted<StorageWiggler> {
DDTeamCollection* teamCollection;
StorageWiggleMetrics metrics;
// data structures
typedef std::pair<StorageMetadataType, UID> MetadataUIDP;
// sorted by (createdTime, UID), the least comes first
struct CompPair {
bool operator()(MetadataUIDP const& a, MetadataUIDP const& b) const {
if (a.first.createdTime == b.first.createdTime) {
return a.second > b.second;
}
// larger createdTime means the age is younger
return a.first.createdTime > b.first.createdTime;
}
};
boost::heap::skew_heap<MetadataUIDP, boost::heap::mutable_<true>, boost::heap::compare<CompPair>> wiggle_pq;
std::unordered_map<UID, decltype(wiggle_pq)::handle_type> pq_handles;
AsyncVar<bool> nonEmpty;
explicit StorageWiggler(DDTeamCollection* collection) : teamCollection(collection), nonEmpty(false){};
// add server to wiggling queue
void addServer(const UID& serverId, const StorageMetadataType& metadata);
// remove server from wiggling queue
void removeServer(const UID& serverId);
// update metadata and adjust priority_queue
void updateMetadata(const UID& serverId, const StorageMetadataType& metadata);
bool contains(const UID& serverId) { return pq_handles.count(serverId) > 0; }
bool empty() { return wiggle_pq.empty(); }
Optional<UID> getNextServerId();
// -- statistic update
// reset Statistic in database when perpetual wiggle is closed by user
Future<Void> resetStats();
// restore Statistic from database when the perpetual wiggle is opened
Future<Void> restoreStats();
// called when start wiggling a SS
Future<Void> startWiggle();
Future<Void> finishWiggle();
bool shouldStartNewRound() { return metrics.last_round_finish >= metrics.last_round_start; }
bool shouldFinishRound() {
if (wiggle_pq.empty())
return true;
return (wiggle_pq.top().first.createdTime >= metrics.last_round_start);
}
};
ACTOR Future<std::vector<std::pair<StorageServerInterface, ProcessClass>>> getServerListAndProcessClasses(
Transaction* tr);

View File

@ -1001,6 +1001,9 @@ ACTOR static Future<Void> finishMoveKeys(Database occ,
ACTOR Future<std::pair<Version, Tag>> addStorageServer(Database cx, StorageServerInterface server) {
state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(cx);
state KeyBackedMap<UID, UID> tssMapDB = KeyBackedMap<UID, UID>(tssMappingKeys.begin);
state KeyBackedObjectMap<UID, StorageMetadataType, decltype(IncludeVersion())> metadataMap(serverMetadataKeys.begin,
IncludeVersion());
state int maxSkipTags = 1;
loop {
@ -1148,6 +1151,9 @@ ACTOR Future<std::pair<Version, Tag>> addStorageServer(Database cx, StorageServe
tr->addReadConflictRange(conflictRange);
tr->addWriteConflictRange(conflictRange);
StorageMetadataType metadata(timer_int());
metadataMap.set(tr, server.id(), metadata);
if (SERVER_KNOBS->TSS_HACK_IDENTITY_MAPPING) {
// THIS SHOULD NEVER BE ENABLED IN ANY NON-TESTING ENVIRONMENT
TraceEvent(SevError, "TSSIdentityMappingEnabled").log();
@ -1157,6 +1163,8 @@ ACTOR Future<std::pair<Version, Tag>> addStorageServer(Database cx, StorageServe
tr->set(serverListKeyFor(server.id()), serverListValue(server));
wait(tr->commit());
TraceEvent("AddedStorageServerSystemKey").detail("ServerID", server.id());
return std::make_pair(tr->getCommittedVersion(), tag);
} catch (Error& e) {
if (e.code() == error_code_commit_unknown_result)
@ -1196,6 +1204,8 @@ ACTOR Future<Void> removeStorageServer(Database cx,
MoveKeysLock lock,
const DDEnabledState* ddEnabledState) {
state KeyBackedMap<UID, UID> tssMapDB = KeyBackedMap<UID, UID>(tssMappingKeys.begin);
state KeyBackedObjectMap<UID, StorageMigrationType, decltype(IncludeVersion())> metadataMap(
serverMetadataKeys.begin, IncludeVersion());
state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(cx);
state bool retry = false;
state int noCanRemoveCount = 0;
@ -1232,6 +1242,7 @@ ACTOR Future<Void> removeStorageServer(Database cx,
TEST(true); // Storage server already removed after retrying transaction
return Void();
}
TraceEvent(SevError, "RemoveInvalidServer").detail("ServerID", serverID);
ASSERT(false); // Removing an already-removed server? A never added server?
}
@ -1288,6 +1299,8 @@ ACTOR Future<Void> removeStorageServer(Database cx,
}
}
metadataMap.erase(tr, serverID);
retry = true;
wait(tr->commit());
return Void();
@ -1506,10 +1519,15 @@ void seedShardServers(Arena& arena, CommitTransactionRef& tr, std::vector<Storag
// This isn't strictly necessary, but make sure this is the first transaction
tr.read_snapshot = 0;
tr.read_conflict_ranges.push_back_deep(arena, allKeys);
KeyBackedObjectMap<UID, StorageMetadataType, decltype(IncludeVersion())> metadataMap(serverMetadataKeys.begin,
IncludeVersion());
StorageMetadataType metadata(timer_int());
for (auto& s : servers) {
tr.set(arena, serverTagKeyFor(s.id()), serverTagValue(server_tag[s.id()]));
tr.set(arena, serverListKeyFor(s.id()), serverListValue(s));
tr.set(arena, metadataMap.serializeKey(s.id()), metadataMap.serializeValue(metadata));
if (SERVER_KNOBS->TSS_HACK_IDENTITY_MAPPING) {
// THIS SHOULD NEVER BE ENABLED IN ANY NON-TESTING ENVIRONMENT
TraceEvent(SevError, "TSSIdentityMappingEnabled").log();

View File

@ -24,7 +24,7 @@
#include "fdbserver/MutationTracking.h"
#include "fdbserver/LogProtocolMessage.h"
#include "fdbserver/SpanContextMessage.h"
#include "fdbclient/SystemData.h"
#if defined(FDB_CLEAN_BUILD) && MUTATION_TRACKING_ENABLED
#error "You cannot use mutation tracking in a clean/release build."
#endif

View File

@ -21,6 +21,7 @@
#include <cinttypes>
#include "contrib/fmt-8.0.1/include/fmt/format.h"
#include "fdbclient/BlobWorkerInterface.h"
#include "fdbclient/KeyBackedTypes.h"
#include "fdbserver/Status.h"
#include "flow/ITrace.h"
#include "flow/Trace.h"
@ -586,6 +587,15 @@ struct RolesInfo {
obj["busiest_write_tag"] = busiestWriteTagObj;
}
}
if (!iface.isTss()) { // only storage server has Metadata field
TraceEventFields const& metadata = metrics.at("Metadata");
JsonBuilderObject metadataObj;
metadataObj["created_time_datetime"] = metadata.getValue("CreatedTimeDatetime");
metadataObj["created_time_timestamp"] = metadata.getUint64("CreatedTimeTimestamp");
obj["storage_metadata"] = metadataObj;
}
} catch (Error& e) {
if (e.code() != error_code_attribute_not_found)
throw e;
@ -1873,6 +1883,37 @@ static Future<std::vector<TraceEventFields>> getServerBusiestWriteTags(
return result;
}
ACTOR
static Future<std::vector<Optional<StorageMetadataType>>> getServerMetadata(std::vector<StorageServerInterface> servers,
Database cx,
bool use_system_priority) {
state KeyBackedObjectMap<UID, StorageMetadataType, decltype(IncludeVersion())> metadataMap(serverMetadataKeys.begin,
IncludeVersion());
state std::vector<Optional<StorageMetadataType>> res(servers.size());
state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(cx);
loop {
try {
tr->setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
if (use_system_priority) {
tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
}
state int i = 0;
for (i = 0; i < servers.size(); ++i) {
Optional<StorageMetadataType> metadata = wait(metadataMap.get(tr, servers[i].id(), Snapshot::True));
// TraceEvent(SevDebug, "MetadataAppear", servers[i].id()).detail("Present", metadata.present());
res[i] = metadata;
}
wait(tr->commit());
break;
} catch (Error& e) {
wait(tr->onError(e));
}
}
return res;
}
ACTOR static Future<std::vector<std::pair<StorageServerInterface, EventMap>>> getStorageServersAndMetrics(
Database cx,
std::unordered_map<NetworkAddress, WorkerInterface> address_workers,
@ -1880,16 +1921,32 @@ ACTOR static Future<std::vector<std::pair<StorageServerInterface, EventMap>>> ge
state std::vector<StorageServerInterface> servers = wait(timeoutError(getStorageServers(cx, true), 5.0));
state std::vector<std::pair<StorageServerInterface, EventMap>> results;
state std::vector<TraceEventFields> busiestWriteTags;
state std::vector<Optional<StorageMetadataType>> metadata;
wait(store(results,
getServerMetrics(servers,
address_workers,
std::vector<std::string>{
"StorageMetrics", "ReadLatencyMetrics", "ReadLatencyBands", "BusiestReadTag" })) &&
store(busiestWriteTags, getServerBusiestWriteTags(servers, address_workers, rkWorker)));
store(busiestWriteTags, getServerBusiestWriteTags(servers, address_workers, rkWorker)) &&
store(metadata, getServerMetadata(servers, cx, true)));
ASSERT(busiestWriteTags.size() == results.size());
for (int i = 0; i < busiestWriteTags.size(); ++i) {
ASSERT(busiestWriteTags.size() == results.size() && metadata.size() == results.size());
for (int i = 0; i < results.size(); ++i) {
results[i].second.emplace("BusiestWriteTag", busiestWriteTags[i]);
// FIXME: it's possible that a SS is removed between `getStorageServers` and `getServerMetadata`. Maybe we can
// read StorageServer and Metadata in an atomic transaction?
if (metadata[i].present()) {
TraceEventFields metadataField;
metadataField.addField("CreatedTimeTimestamp", std::to_string(metadata[i].get().createdTime));
metadataField.addField("CreatedTimeDatetime", timerIntToGmt(metadata[i].get().createdTime));
results[i].second.emplace("Metadata", metadataField);
} else if (!servers[i].isTss()) {
TraceEventFields metadataField;
metadataField.addField("CreatedTimeTimestamp", "0");
metadataField.addField("CreatedTimeDatetime", "[removed]");
results[i].second.emplace("Metadata", metadataField);
}
}
return results;
@ -2705,6 +2762,36 @@ ACTOR Future<Optional<Value>> getActivePrimaryDC(Database cx, int* fullyReplicat
}
}
// read storageWigglerStats through Read-only tx, then convert it to JSON field
ACTOR Future<JsonBuilderObject> storageWigglerStatsFetcher(DatabaseConfiguration conf,
Database cx,
bool use_system_priority) {
state Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(cx));
state Optional<Value> primaryV;
state Optional<Value> remoteV;
loop {
try {
if (use_system_priority) {
tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
}
wait(store(primaryV, StorageWiggleMetrics::runGetTransaction(tr, true)) &&
store(remoteV, StorageWiggleMetrics::runGetTransaction(tr, false)));
wait(tr->commit());
break;
} catch (Error& e) {
wait(tr->onError(e));
}
}
JsonBuilderObject res;
if (primaryV.present()) {
res["primary"] = ObjectReader::fromStringRef<StorageWiggleMetrics>(primaryV.get(), IncludeVersion()).toJSON();
}
if (conf.regions.size() > 1 && remoteV.present()) {
res["remote"] = ObjectReader::fromStringRef<StorageWiggleMetrics>(remoteV.get(), IncludeVersion()).toJSON();
}
return res;
}
// constructs the cluster section of the json status output
ACTOR Future<StatusReply> clusterGetStatus(
Reference<AsyncVar<ServerDBInfo>> db,
@ -2839,7 +2926,8 @@ ACTOR Future<StatusReply> clusterGetStatus(
state std::vector<std::pair<GrvProxyInterface, EventMap>> grvProxies;
state std::vector<BlobWorkerInterface> blobWorkers;
state JsonBuilderObject qos;
state JsonBuilderObject data_overlay;
state JsonBuilderObject dataOverlay;
state JsonBuilderObject storageWiggler;
statusObj["protocol_version"] = format("%" PRIx64, g_network->protocolVersion().version());
statusObj["connection_string"] = coordinators.ccr->getConnectionString().toString();
@ -2918,15 +3006,22 @@ ACTOR Future<StatusReply> clusterGetStatus(
state int minStorageReplicasRemaining = -1;
state int fullyReplicatedRegions = -1;
// NOTE: here we should start all the transaction before wait in order to overlay latency
state Future<Optional<Value>> primaryDCFO = getActivePrimaryDC(cx, &fullyReplicatedRegions, &messages);
std::vector<Future<JsonBuilderObject>> futures2;
state std::vector<Future<JsonBuilderObject>> futures2;
futures2.push_back(dataStatusFetcher(ddWorker, configuration.get(), &minStorageReplicasRemaining));
futures2.push_back(workloadStatusFetcher(
db, workers, mWorker, rkWorker, &qos, &data_overlay, &status_incomplete_reasons, storageServerFuture));
db, workers, mWorker, rkWorker, &qos, &dataOverlay, &status_incomplete_reasons, storageServerFuture));
futures2.push_back(layerStatusFetcher(cx, &messages, &status_incomplete_reasons));
futures2.push_back(lockedStatusFetcher(db, &messages, &status_incomplete_reasons));
futures2.push_back(
clusterSummaryStatisticsFetcher(pMetrics, storageServerFuture, tLogFuture, &status_incomplete_reasons));
if (configuration.get().perpetualStorageWiggleSpeed > 0) {
wait(store(storageWiggler, storageWigglerStatsFetcher(configuration.get(), cx, true)));
statusObj["storage_wiggler"] = storageWiggler;
}
state std::vector<JsonBuilderObject> workerStatuses = wait(getAll(futures2));
wait(success(primaryDCFO));
@ -2958,7 +3053,7 @@ ACTOR Future<StatusReply> clusterGetStatus(
}
// workloadStatusFetcher returns the workload section but also optionally writes the qos section and adds to
// the data_overlay object
// the dataOverlay object
if (!workerStatuses[1].empty())
statusObj["workload"] = workerStatuses[1];
@ -2974,12 +3069,12 @@ ACTOR Future<StatusReply> clusterGetStatus(
if (!qos.empty())
statusObj["qos"] = qos;
// Merge data_overlay into data
// Merge dataOverlay into data
JsonBuilderObject& clusterDataSection = workerStatuses[0];
// TODO: This probably is no longer possible as there is no ability to merge json objects with an
// output-only model
clusterDataSection.addContents(data_overlay);
clusterDataSection.addContents(dataOverlay);
// If data section not empty, add it to statusObj
if (!clusterDataSection.empty())

View File

@ -1923,6 +1923,18 @@ void getLocalTime(const time_t* timep, struct tm* result) {
#endif
}
std::string timerIntToGmt(uint64_t timestamp) {
auto time = (time_t)(timestamp / 1e9); // convert to second, see timer_int() implementation
return getGmtTimeStr(&time);
}
std::string getGmtTimeStr(const time_t* time) {
char buff[50];
auto size = strftime(buff, 50, "%c %z", gmtime(time));
// printf(buff);
return std::string(std::begin(buff), std::begin(buff) + size);
}
void setMemoryQuota(size_t limit) {
#if defined(USE_SANITIZER)
// ASAN doesn't work with memory quotas: https://github.com/google/sanitizers/wiki/AddressSanitizer#ulimit--v

View File

@ -279,6 +279,11 @@ uint64_t timer_int(); // Return timer as uint64_t
void getLocalTime(const time_t* timep, struct tm* result);
// convert timestamp returned by timer_int() to Gmt format string
std::string timerIntToGmt(uint64_t timestamp);
std::string getGmtTimeStr(const time_t* time);
void setMemoryQuota(size_t limit);
void* allocate(size_t length, bool allowLargePages);

View File

@ -159,6 +159,8 @@ public: // introduced features
PROTOCOL_VERSION_FEATURE(0x0FDB00B071010000LL, ChangeFeed);
PROTOCOL_VERSION_FEATURE(0x0FDB00B071010000LL, BlobGranule);
PROTOCOL_VERSION_FEATURE(0x0FDB00B071010000LL, NetworkAddressHostnameFlag);
PROTOCOL_VERSION_FEATURE(0x0FDB00B071010000LL, StorageMetadata);
PROTOCOL_VERSION_FEATURE(0x0FDB00B071010000LL, PerpetualWiggleMetadata);
};
template <>

View File

@ -1553,6 +1553,18 @@ void parseNumericValue(std::string const& s, int64_t& outValue, bool permissive
throw attribute_not_found();
}
void parseNumericValue(std::string const& s, uint64_t& outValue, bool permissive = false) {
unsigned long long int i = 0;
int consumed = 0;
int r = sscanf(s.c_str(), "%llu%n", &i, &consumed);
if (r == 1 && (consumed == s.size() || permissive)) {
outValue = i;
return;
}
throw attribute_not_found();
}
template <class T>
T getNumericValue(TraceEventFields const& fields, std::string key, bool permissive) {
std::string field = fields.getValue(key);
@ -1585,6 +1597,10 @@ int64_t TraceEventFields::getInt64(std::string key, bool permissive) const {
return getNumericValue<int64_t>(*this, key, permissive);
}
uint64_t TraceEventFields::getUint64(std::string key, bool permissive) const {
return getNumericValue<uint64_t>(*this, key, permissive);
}
double TraceEventFields::getDouble(std::string key, bool permissive) const {
return getNumericValue<double>(*this, key, permissive);
}

View File

@ -119,6 +119,7 @@ public:
std::string getValue(std::string key) const;
int getInt(std::string key, bool permissive = false) const;
int64_t getInt64(std::string key, bool permissive = false) const;
uint64_t getUint64(std::string key, bool permissive = false) const;
double getDouble(std::string key, bool permissive = false) const;
Field& mutate(int index);