added timeout and shard limiting behaviour

Jon Fu 2019-05-17 16:11:50 -07:00
parent 0984d272e1
commit e339ab890b
9 changed files with 39 additions and 21 deletions
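
The shardLimit is threaded through every hop of the metrics path (client, master proxy, data distributor, shard tracker), and failures now travel back the same way: the tracker answers with timed_out() after DD_SHARD_METRICS_TIMEOUT, and each hop forwards errors via errorOr/sendError instead of leaving the request hanging. For illustration, a minimal client-side sketch of how the new behaviour surfaces to a caller; the actor name, key range, and surrounding setup are placeholders rather than part of this commit:

	// Sketch: reading the shard-metrics list through the new path. The result is capped at
	// CLIENT_KNOBS->STORAGE_METRICS_SHARD_LIMIT shards, and the future now fails with
	// timed_out() if the data distributor does not answer within DD_SHARD_METRICS_TIMEOUT.
	ACTOR Future<Void> printShardMetrics( Database cx, KeyRange keys ) {
		state Transaction tr( cx );
		try {
			Standalone<RangeResultRef> metrics = wait( tr.getStorageMetricsList( keys, CLIENT_KNOBS->STORAGE_METRICS_SHARD_LIMIT ) );
			for ( auto const& kv : metrics ) {
				printf( "%s -> %s\n", printable( kv.key ).c_str(), printable( kv.value ).c_str() );
			}
		} catch ( Error& e ) {
			if ( e.code() == error_code_timed_out ) {
				// the request hit the new DD_SHARD_METRICS_TIMEOUT path
			}
			throw;
		}
		return Void();
	}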

View File

@@ -315,14 +315,15 @@ struct GetDDMetricsReply
 struct GetDDMetricsRequest {
 	constexpr static FileIdentifier file_identifier = 14536812;
 	KeyRange keys;
+	int shardLimit;
 	ReplyPromise<struct GetDDMetricsReply> reply;
 	GetDDMetricsRequest() {}
-	explicit GetDDMetricsRequest(KeyRange const& keys) : keys(keys) {}
+	explicit GetDDMetricsRequest(KeyRange const& keys, const int shardLimit) : keys(keys), shardLimit(shardLimit) {}
 	template<class Ar>
 	void serialize(Ar& ar) {
-		serializer(ar, keys, reply);
+		serializer(ar, keys, shardLimit, reply);
 	}
 };

View File

@@ -3194,9 +3194,11 @@ ACTOR Future< Standalone<RangeResultRef> > waitStorageMetricsList(
 	KeyRange keys,
 	int shardLimit )
 {
-	// TODO: add use for or remove shardLimit
-	GetDDMetricsReply rep = wait(loadBalance(cx->getMasterProxies(false), &MasterProxyInterface::getDDMetrics, GetDDMetricsRequest(keys)));
-	return rep.storageMetricsList;
+	ErrorOr<GetDDMetricsReply> rep = wait(errorOr(loadBalance(cx->getMasterProxies(false), &MasterProxyInterface::getDDMetrics, GetDDMetricsRequest(keys, shardLimit))));
+	if (rep.isError()) {
+		throw rep.getError();
+	}
+	return rep.get().storageMetricsList;
 }
 
 Future< Standalone<RangeResultRef> > Transaction::getStorageMetricsList(KeyRange const& keys, int shardLimit) {

View File

@@ -1279,8 +1279,11 @@ Future< Standalone<RangeResultRef> > ReadYourWritesTransaction::getRange(
 		end.getKey().startsWith(stats_prefix)) {
 		if (tr.getDatabase().getPtr() && tr.getDatabase()->getConnectionFile()) {
 			auto keys = KeyRangeRef(begin.getKey(), end.getKey()).removePrefix(stats_prefix);
-			// TODO: find appropriate use for shardLimit, currently not used
-			return tr.getStorageMetricsList(keys, 10);
+			try {
+				return tr.getStorageMetricsList(keys, CLIENT_KNOBS->STORAGE_METRICS_SHARD_LIMIT);
+			} catch( Error &e ) {
+				return e;
+			}
 		}
 		else {
 			return Standalone<RangeResultRef>();

View File

@@ -57,7 +57,6 @@ struct WorkerInfo : NonCopyable {
 	WorkerDetails details;
 	Future<Void> haltRatekeeper;
 	Future<Void> haltDistributor;
-	Future<GetDataDistributorMetricsReply> ddMetrics;
 
 	WorkerInfo() : gen(-1), reboots(0), lastAvailableTime(now()), priorityInfo(ProcessClass::UnsetFit, false, ClusterControllerPriorityInfo::FitnessUnknown) {}
 	WorkerInfo( Future<Void> watcher, ReplyPromise<RegisterWorkerReply> reply, Generation gen, WorkerInterface interf, ProcessClass initialClass, ProcessClass processClass, ClusterControllerPriorityInfo priorityInfo, bool degraded ) :

View File

@@ -3740,10 +3740,14 @@ ACTOR Future<Void> dataDistributor(DataDistributorInterface di, Reference<AsyncV
 				break;
 			}
 			when ( state GetDataDistributorMetricsRequest req = waitNext(di.dataDistributorMetrics.getFuture()) ) {
-				Standalone<RangeResultRef> result = wait(brokenPromiseToNever(getShardMetricsList.getReply( GetMetricsListRequest(req.keys))));
-				GetDataDistributorMetricsReply rep;
-				rep.storageMetricsList = result;
-				req.reply.send(rep);
+				ErrorOr<Standalone<RangeResultRef>> result = wait(errorOr(brokenPromiseToNever(getShardMetricsList.getReply( GetMetricsListRequest(req.keys, req.shardLimit)))));
+				if ( result.isError() ) {
+					req.reply.sendError(result.getError());
+				} else {
+					GetDataDistributorMetricsReply rep;
+					rep.storageMetricsList = result.get();
+					req.reply.send(rep);
+				}
 			}
 		}
 	}

View File

@@ -123,10 +123,11 @@ struct GetMetricsRequest {
 struct GetMetricsListRequest {
 	KeyRange keys;
+	int shardLimit;
 	Promise< Standalone<RangeResultRef> > reply;
 	GetMetricsListRequest() {}
-	GetMetricsListRequest( KeyRange const& keys ) : keys(keys) {}
+	GetMetricsListRequest( KeyRange const& keys, const int shardLimit ) : keys(keys), shardLimit(shardLimit) {}
 };
 
 struct TeamCollectionInterface {

View File

@@ -655,7 +655,7 @@ ACTOR Future<Void> fetchShardMetricsList_impl( DataDistributionTracker* self, Ge
 	try {
 		loop {
 			// no shard identifier as of yet, use simple numbering system
-			int shardNum = 0;
+			int shardNum = 1;
 			// list of metrics, regenerate on loop when full range unsuccessful
 			Standalone<RangeResultRef> result;
 			Future<Void> onChange;
@@ -673,6 +673,9 @@
 					)
 				);
 				++shardNum;
+				if ( shardNum > req.shardLimit ) {
+					break;
+				}
 			}
 
 			if( !onChange.isValid() ) {
@@ -693,7 +696,7 @@ ACTOR Future<Void> fetchShardMetricsList( DataDistributionTracker* self, GetMetr
 	choose {
 		when( wait( fetchShardMetricsList_impl( self, req ) ) ) {}
 		when( wait( delay( SERVER_KNOBS->DD_SHARD_METRICS_TIMEOUT ) ) ) {
-			// TODO: implement proper behaviour on timeout
+			req.reply.sendError(timed_out());
 		}
 	}
 	return Void();
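
The same errorOr-and-forward shape is used at the data distributor and master proxy hops so that this timed_out() reaches the client rather than stalling the request. A hypothetical helper, for illustration only, capturing that shape:

	// Illustration only (not part of this commit): wait on a downstream reply and either
	// forward the value or the error (e.g. timed_out from the shard tracker) upstream.
	ACTOR template <class T>
	Future<Void> relayReply( ReplyPromise<T> upstream, Future<T> downstream ) {
		ErrorOr<T> rep = wait( errorOr( downstream ) );
		if ( rep.isError() ) {
			upstream.sendError( rep.getError() );
		} else {
			upstream.send( rep.get() );
		}
		return Void();
	}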

View File

@@ -80,14 +80,15 @@ struct GetDataDistributorMetricsReply {
 struct GetDataDistributorMetricsRequest {
 	constexpr static FileIdentifier file_identifier = 1059267;
 	KeyRange keys;
+	int shardLimit;
 	ReplyPromise<struct GetDataDistributorMetricsReply> reply;
 	GetDataDistributorMetricsRequest() {}
-	explicit GetDataDistributorMetricsRequest(KeyRange const& keys) : keys(keys) {}
+	explicit GetDataDistributorMetricsRequest(KeyRange const& keys, const int shardLimit) : keys(keys), shardLimit(shardLimit) {}
 	template<class Ar>
 	void serialize(Ar& ar) {
-		serializer(ar, keys, reply);
+		serializer(ar, keys, shardLimit, reply);
 	}
 };

View File

@@ -1335,10 +1335,14 @@ ACTOR Future<Void> ddMetricsRequestServer(MasterProxyInterface proxy, Reference<
 		choose {
 			when(state GetDDMetricsRequest req = waitNext(proxy.getDDMetrics.getFuture()))
 			{
-				GetDataDistributorMetricsReply reply = wait(db->get().distributor.get().dataDistributorMetrics.getReply(GetDataDistributorMetricsRequest(req.keys)));
-				GetDDMetricsReply newReply;
-				newReply.storageMetricsList = reply.storageMetricsList;
-				req.reply.send(newReply);
+				ErrorOr<GetDataDistributorMetricsReply> reply = wait(errorOr(db->get().distributor.get().dataDistributorMetrics.getReply(GetDataDistributorMetricsRequest(req.keys, req.shardLimit))));
+				if ( reply.isError() ) {
+					req.reply.sendError(reply.getError());
+				} else {
+					GetDDMetricsReply newReply;
+					newReply.storageMetricsList = reply.get().storageMetricsList;
+					req.reply.send(newReply);
+				}
 			}
 		}
 	}