initial implementation of get-dd-metrics

Jon Fu 2019-05-16 16:46:33 -07:00
parent 2e09a214d9
commit 0984d272e1
10 changed files with 176 additions and 6 deletions

View File

@@ -51,6 +51,7 @@ struct MasterProxyInterface {
RequestStream< struct TxnStateRequest > txnState;
RequestStream< struct GetHealthMetricsRequest > getHealthMetrics;
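// New endpoint through which clients request data distribution metrics
// (see GetDDMetricsRequest below).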
RequestStream< struct GetDDMetricsRequest > getDDMetrics;
UID id() const { return commit.getEndpoint().token; }
std::string toString() const { return id().shortString(); }
@@ -62,7 +63,7 @@ struct MasterProxyInterface {
void serialize(Archive& ar) {
serializer(ar, locality, provisional, commit, getConsistentReadVersion, getKeyServersLocations,
waitFailure, getStorageServerRejoinInfo, getRawCommittedVersion,
- txnState, getHealthMetrics);
+ txnState, getHealthMetrics, getDDMetrics);
}
void initEndpoints() {
@@ -298,4 +299,31 @@ struct GetHealthMetricsRequest
}
};
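// Client-facing reply and request for data distribution metrics, served by the
// master proxy; the reply carries one key-value pair per shard in the range.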
struct GetDDMetricsReply
{
constexpr static FileIdentifier file_identifier = 7277713;
Standalone<RangeResultRef> storageMetricsList;
GetDDMetricsReply() {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, storageMetricsList);
}
};
struct GetDDMetricsRequest {
constexpr static FileIdentifier file_identifier = 14536812;
KeyRange keys;
ReplyPromise<struct GetDDMetricsReply> reply;
GetDDMetricsRequest() {}
explicit GetDDMetricsRequest(KeyRange const& keys) : keys(keys) {}
template<class Ar>
void serialize(Ar& ar) {
serializer(ar, keys, reply);
}
};
#endif

View File

@@ -3189,6 +3189,20 @@ Future< StorageMetrics > Transaction::getStorageMetrics( KeyRange const& keys, i
return ::waitStorageMetrics( cx, keys, StorageMetrics(), m, StorageMetrics(), shardLimit );
}
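// Fetches the per-shard metrics list for a key range from the master proxies,
// which forward the request on to the data distributor.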
ACTOR Future< Standalone<RangeResultRef> > waitStorageMetricsList(
Database cx,
KeyRange keys,
int shardLimit )
{
// TODO: Make use of the shardLimit parameter or remove it.
GetDDMetricsReply rep = wait(loadBalance(cx->getMasterProxies(false), &MasterProxyInterface::getDDMetrics, GetDDMetricsRequest(keys)));
return rep.storageMetricsList;
}
Future< Standalone<RangeResultRef> > Transaction::getStorageMetricsList(KeyRange const& keys, int shardLimit) {
return ::waitStorageMetricsList(cx, keys, shardLimit);
}
ACTOR Future< Standalone<VectorRef<KeyRef>> > splitStorageMetrics( Database cx, KeyRange keys, StorageMetrics limit, StorageMetrics estimated )
{
loop {

View File

@@ -255,6 +255,7 @@ public:
Future< StorageMetrics > waitStorageMetrics( KeyRange const& keys, StorageMetrics const& min, StorageMetrics const& max, StorageMetrics const& permittedError, int shardLimit );
Future< StorageMetrics > getStorageMetrics( KeyRange const& keys, int shardLimit );
Future< Standalone<VectorRef<KeyRef>> > splitStorageMetrics( KeyRange const& keys, StorageMetrics const& limit, StorageMetrics const& estimated );
Future< Standalone<RangeResultRef> > getStorageMetricsList(KeyRange const& keys, int shardLimit);
// If checkWriteConflictRanges is true, existing write conflict ranges will be searched for this key
void set( const KeyRef& key, const ValueRef& value, bool addConflictRange = true );

View File

@@ -1273,7 +1273,20 @@ Future< Standalone<RangeResultRef> > ReadYourWritesTransaction::getRange(
return Standalone<RangeResultRef>();
}
}
auto stats_prefix = LiteralStringRef("\xff\xff/dd_stats/");
if (begin.getKey().startsWith(stats_prefix) &&
end.getKey().startsWith(stats_prefix)) {
if (tr.getDatabase().getPtr() && tr.getDatabase()->getConnectionFile()) {
auto keys = KeyRangeRef(begin.getKey(), end.getKey()).removePrefix(stats_prefix);
// TODO: The shardLimit argument is currently unused; find an appropriate use for it.
return tr.getStorageMetricsList(keys, 10);
}
else {
return Standalone<RangeResultRef>();
}
}
if(checkUsedDuringCommit()) {
return used_during_commit();
}
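With this hook in place, clients can read data distribution metrics as an ordinary range read over the new "\xff\xff/dd_stats/" prefix. A minimal usage sketch follows (hypothetical client code, not part of this commit); note that both endpoints of the range must start with the prefix, since the branch above checks the begin and end keys with startsWith.

// Hypothetical sketch: read every shard's metrics through the special key range.
ACTOR Future<Void> printDDStats(Database db) {
    state ReadYourWritesTransaction tr(db);
    loop {
        try {
            // Both keys carry the "\xff\xff/dd_stats/" prefix so the branch above matches.
            Standalone<RangeResultRef> stats = wait(tr.getRange(
                KeyRangeRef(LiteralStringRef("\xff\xff/dd_stats/"),
                            LiteralStringRef("\xff\xff/dd_stats/\xff")),
                CLIENT_KNOBS->TOO_MANY));
            for (const auto& kv : stats)
                printf("shard %s: %s\n", kv.key.toString().c_str(), kv.value.toString().c_str());
            return Void();
        } catch (Error& e) {
            wait(tr.onError(e));
        }
    }
}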

View File

@@ -57,6 +57,7 @@ struct WorkerInfo : NonCopyable {
WorkerDetails details;
Future<Void> haltRatekeeper;
Future<Void> haltDistributor;
Future<GetDataDistributorMetricsReply> ddMetrics;
WorkerInfo() : gen(-1), reboots(0), lastAvailableTime(now()), priorityInfo(ProcessClass::UnsetFit, false, ClusterControllerPriorityInfo::FitnessUnknown) {}
WorkerInfo( Future<Void> watcher, ReplyPromise<RegisterWorkerReply> reply, Generation gen, WorkerInterface interf, ProcessClass initialClass, ProcessClass processClass, ClusterControllerPriorityInfo priorityInfo, bool degraded ) :

View File

@@ -3517,7 +3517,7 @@ ACTOR Future<Void> monitorBatchLimitedTime(Reference<AsyncVar<ServerDBInfo>> db,
}
}
- ACTOR Future<Void> dataDistribution(Reference<DataDistributorData> self)
+ ACTOR Future<Void> dataDistribution(Reference<DataDistributorData> self, PromiseStream<GetMetricsListRequest> getShardMetricsList)
{
state double lastLimited = 0;
self->addActor.send( monitorBatchLimitedTime(self->dbInfo, &lastLimited) );
@@ -3676,7 +3676,7 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributorData> self)
}
actors.push_back( pollMoveKeysLock(cx, lock) );
- actors.push_back( reportErrorsExcept( dataDistributionTracker( initData, cx, output, shardsAffectedByTeamFailure, getShardMetrics, getAverageShardBytes.getFuture(), readyToStart, anyZeroHealthyTeams, self->ddId ), "DDTracker", self->ddId, &normalDDQueueErrors() ) );
+ actors.push_back( reportErrorsExcept( dataDistributionTracker( initData, cx, output, shardsAffectedByTeamFailure, getShardMetrics, getShardMetricsList, getAverageShardBytes.getFuture(), readyToStart, anyZeroHealthyTeams, self->ddId ), "DDTracker", self->ddId, &normalDDQueueErrors() ) );
actors.push_back( reportErrorsExcept( dataDistributionQueue( cx, output, input.getFuture(), getShardMetrics, processingUnhealthy, tcis, shardsAffectedByTeamFailure, lock, getAverageShardBytes, self->ddId, storageTeamSize, &lastLimited ), "DDQueue", self->ddId, &normalDDQueueErrors() ) );
vector<DDTeamCollection*> teamCollectionsPtrs;
@@ -3722,11 +3722,12 @@ static std::set<int> const& normalDataDistributorErrors() {
ACTOR Future<Void> dataDistributor(DataDistributorInterface di, Reference<AsyncVar<struct ServerDBInfo>> db ) {
state Reference<DataDistributorData> self( new DataDistributorData(db, di.id()) );
state Future<Void> collection = actorCollection( self->addActor.getFuture() );
state PromiseStream<GetMetricsListRequest> getShardMetricsList;
try {
TraceEvent("DataDistributor_Running", di.id());
self->addActor.send( waitFailureServer(di.waitFailure.getFuture()) );
- state Future<Void> distributor = reportErrorsExcept( dataDistribution(self), "DataDistribution", di.id(), &normalDataDistributorErrors() );
+ state Future<Void> distributor = reportErrorsExcept( dataDistribution(self, getShardMetricsList), "DataDistribution", di.id(), &normalDataDistributorErrors() );
loop choose {
when ( wait(distributor || collection) ) {
@@ -3738,6 +3739,12 @@ ACTOR Future<Void> dataDistributor(DataDistributorInterface di, Reference<AsyncV
TraceEvent("DataDistributorHalted", di.id()).detail("ReqID", req.requesterID);
break;
}
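// Answer metrics requests from the proxies by querying the shard tracker
// through the getShardMetricsList promise stream.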
when ( state GetDataDistributorMetricsRequest req = waitNext(di.dataDistributorMetrics.getFuture()) ) {
Standalone<RangeResultRef> result = wait(brokenPromiseToNever(getShardMetricsList.getReply( GetMetricsListRequest(req.keys))));
GetDataDistributorMetricsReply rep;
rep.storageMetricsList = result;
req.reply.send(rep);
}
}
}
catch ( Error &err ) {

View File

@@ -121,6 +121,14 @@ struct GetMetricsRequest {
GetMetricsRequest( KeyRange const& keys ) : keys(keys) {}
};
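// Request for the metrics of every shard overlapping keys; answered with one
// key-value pair per shard.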
struct GetMetricsListRequest {
KeyRange keys;
Promise< Standalone<RangeResultRef> > reply;
GetMetricsListRequest() {}
GetMetricsListRequest( KeyRange const& keys ) : keys(keys) {}
};
struct TeamCollectionInterface {
PromiseStream< GetTeamRequest > getTeam;
};
@@ -214,6 +222,7 @@ Future<Void> dataDistributionTracker(
PromiseStream<RelocateShard> const& output,
Reference<ShardsAffectedByTeamFailure> const& shardsAffectedByTeamFailure,
PromiseStream<GetMetricsRequest> const& getShardMetrics,
PromiseStream<GetMetricsListRequest> const& getShardMetricsList,
FutureStream<Promise<int64_t>> const& getAverageShardBytes,
Promise<Void> const& readyToStart,
Reference<AsyncVar<bool>> const& zeroHealthyTeams,

View File

@@ -650,12 +650,62 @@ ACTOR Future<Void> fetchShardMetrics( DataDistributionTracker* self, GetMetricsR
return Void();
}
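// Builds the key-value list of per-shard metrics for a GetMetricsListRequest,
// retrying until stats are available for every shard in the requested range.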
ACTOR Future<Void> fetchShardMetricsList_impl( DataDistributionTracker* self, GetMetricsListRequest req ) {
try {
loop {
// There is no shard identifier yet, so use a simple numbering scheme.
int shardNum = 0;
// List of metrics; regenerated on each iteration of the loop if the full range could not be read.
Standalone<RangeResultRef> result;
Future<Void> onChange;
for( auto t : self->shards.intersectingRanges( req.keys ) ) {
auto &stats = t.value().stats;
if( !stats->get().present() ) {
onChange = stats->onChange();
break;
}
result.push_back_deep(
result.arena(),
KeyValueRef(
StringRef(std::to_string(shardNum)),
StringRef(t.value().stats->get().get().toString())
)
);
++shardNum;
}
if( !onChange.isValid() ) {
req.reply.send( result );
return Void();
}
wait( onChange );
}
} catch( Error &e ) {
if( e.code() != error_code_actor_cancelled && !req.reply.isSet() )
req.reply.sendError(e);
throw;
}
}
ACTOR Future<Void> fetchShardMetricsList( DataDistributionTracker* self, GetMetricsListRequest req ) {
choose {
when( wait( fetchShardMetricsList_impl( self, req ) ) ) {}
when( wait( delay( SERVER_KNOBS->DD_SHARD_METRICS_TIMEOUT ) ) ) {
// TODO: implement proper behaviour on timeout; currently the request is dropped without a reply.
}
}
return Void();
}
ACTOR Future<Void> dataDistributionTracker(
Reference<InitialDataDistribution> initData,
Database cx,
PromiseStream<RelocateShard> output,
Reference<ShardsAffectedByTeamFailure> shardsAffectedByTeamFailure,
PromiseStream<GetMetricsRequest> getShardMetrics,
PromiseStream<GetMetricsListRequest> getShardMetricsList,
FutureStream<Promise<int64_t>> getAverageShardBytes,
Promise<Void> readyToStart,
Reference<AsyncVar<bool>> anyZeroHealthyTeams,
@@ -682,6 +732,9 @@ ACTOR Future<Void> dataDistributionTracker(
when( GetMetricsRequest req = waitNext( getShardMetrics.getFuture() ) ) {
self.sizeChanges.add( fetchShardMetrics( &self, req ) );
}
when( GetMetricsListRequest req = waitNext( getShardMetricsList.getFuture() ) ) {
self.sizeChanges.add( fetchShardMetricsList( &self, req ) );
}
when( wait( self.sizeChanges.getResult() ) ) {}
}
} catch (Error& e) {

View File

@@ -23,12 +23,14 @@
#include "fdbrpc/fdbrpc.h"
#include "fdbrpc/Locality.h"
#include "fdbclient/FDBTypes.h"
struct DataDistributorInterface {
constexpr static FileIdentifier file_identifier = 12383874;
RequestStream<ReplyPromise<Void>> waitFailure;
RequestStream<struct HaltDataDistributorRequest> haltDataDistributor;
struct LocalityData locality;
RequestStream<struct GetDataDistributorMetricsRequest> dataDistributorMetrics;
DataDistributorInterface() {}
explicit DataDistributorInterface(const struct LocalityData& l) : locality(l) {}
@@ -45,7 +47,7 @@ struct DataDistributorInterface {
template <class Archive>
void serialize(Archive& ar) {
- serializer(ar, waitFailure, haltDataDistributor, locality);
+ serializer(ar, waitFailure, haltDataDistributor, locality, dataDistributorMetrics);
}
};
@@ -63,4 +65,30 @@ struct HaltDataDistributorRequest {
}
};
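// Reply and request for the proxy-to-data-distributor metrics call; these
// mirror GetDDMetricsReply/GetDDMetricsRequest on the client interface.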
struct GetDataDistributorMetricsReply {
constexpr static FileIdentifier file_identifier = 1284337;
Standalone<RangeResultRef> storageMetricsList;
GetDataDistributorMetricsReply() {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, storageMetricsList);
}
};
struct GetDataDistributorMetricsRequest {
constexpr static FileIdentifier file_identifier = 1059267;
KeyRange keys;
ReplyPromise<struct GetDataDistributorMetricsReply> reply;
GetDataDistributorMetricsRequest() {}
explicit GetDataDistributorMetricsRequest(KeyRange const& keys) : keys(keys) {}
template<class Ar>
void serialize(Ar& ar) {
serializer(ar, keys, reply);
}
};
#endif //FDBSERVER_DATADISTRIBUTORINTERFACE_H

View File

@@ -1329,6 +1329,21 @@ ACTOR Future<Void> healthMetricsRequestServer(MasterProxyInterface proxy, GetHea
}
}
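// Proxy-side server loop: forwards each client GetDDMetricsRequest to the data
// distributor and relays the resulting metrics list back to the client.
// Note: this assumes db->get().distributor is present when a request arrives.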
ACTOR Future<Void> ddMetricsRequestServer(MasterProxyInterface proxy, Reference<AsyncVar<ServerDBInfo>> db)
{
loop {
choose {
when(state GetDDMetricsRequest req = waitNext(proxy.getDDMetrics.getFuture()))
{
GetDataDistributorMetricsReply reply = wait(db->get().distributor.get().dataDistributorMetrics.getReply(GetDataDistributorMetricsRequest(req.keys)));
GetDDMetricsReply newReply;
newReply.storageMetricsList = reply.storageMetricsList;
req.reply.send(newReply);
}
}
}
}
ACTOR Future<Void> monitorRemoteCommitted(ProxyCommitData* self, Reference<AsyncVar<ServerDBInfo>> db) {
loop {
wait(delay(0)); //allow this actor to be cancelled if we are removed after db changes.
@@ -1444,6 +1459,7 @@ ACTOR Future<Void> masterProxyServerCore(
addActor.send(readRequestServer(proxy, &commitData));
addActor.send(rejoinServer(proxy, &commitData));
addActor.send(healthMetricsRequestServer(proxy, &healthMetricsReply, &detailedHealthMetricsReply));
addActor.send(ddMetricsRequestServer(proxy, db));
// wait for txnStateStore recovery
wait(success(commitData.txnStateStore->readValue(StringRef())));