Merge pull request #1901 from ajbeamon/data-distribution-receives-bytes-input-rate

Send bytes input rate to data distribution
This commit is contained in:
A.J. Beamon 2019-07-30 15:01:36 -07:00 committed by GitHub
commit 14648e20f9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 35 additions and 29 deletions

View File

@ -50,7 +50,7 @@ struct StorageServerInterface {
RequestStream<struct GetShardStateRequest> getShardState;
RequestStream<struct WaitMetricsRequest> waitMetrics;
RequestStream<struct SplitMetricsRequest> splitMetrics;
RequestStream<struct GetPhysicalMetricsRequest> getPhysicalMetrics;
RequestStream<struct GetStorageMetricsRequest> getStorageMetrics;
RequestStream<ReplyPromise<Void>> waitFailure;
RequestStream<struct StorageQueuingMetricsRequest> getQueuingMetrics;
@ -69,11 +69,11 @@ struct StorageServerInterface {
if constexpr (!is_fb_function<Ar>) {
serializer(ar, uniqueID, locality, getVersion, getValue, getKey, getKeyValues, getShardState, waitMetrics,
splitMetrics, getPhysicalMetrics, waitFailure, getQueuingMetrics, getKeyValueStoreType);
splitMetrics, getStorageMetrics, waitFailure, getQueuingMetrics, getKeyValueStoreType);
if (ar.protocolVersion().hasWatches()) serializer(ar, watchValue);
} else {
serializer(ar, uniqueID, locality, getVersion, getValue, getKey, getKeyValues, getShardState, waitMetrics,
splitMetrics, getPhysicalMetrics, waitFailure, getQueuingMetrics, getKeyValueStoreType,
splitMetrics, getStorageMetrics, waitFailure, getQueuingMetrics, getKeyValueStoreType,
watchValue);
}
}
@ -340,21 +340,24 @@ struct SplitMetricsRequest {
}
};
struct GetPhysicalMetricsReply {
struct GetStorageMetricsReply {
constexpr static FileIdentifier file_identifier = 15491478;
StorageMetrics load;
StorageMetrics free;
StorageMetrics capacity;
double bytesInputRate;
GetStorageMetricsReply() : bytesInputRate(0) {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, load, free, capacity);
serializer(ar, load, free, capacity, bytesInputRate);
}
};
struct GetPhysicalMetricsRequest {
struct GetStorageMetricsRequest {
constexpr static FileIdentifier file_identifier = 13290999;
ReplyPromise<GetPhysicalMetricsReply> reply;
ReplyPromise<GetStorageMetricsReply> reply;
template <class Ar>
void serialize(Ar& ar) {

View File

@ -50,7 +50,7 @@ struct TCServerInfo : public ReferenceCounted<TCServerInfo> {
Reference<TCMachineInfo> machine;
Future<Void> tracker;
int64_t dataInFlightToServer;
ErrorOr<GetPhysicalMetricsReply> serverMetrics;
ErrorOr<GetStorageMetricsReply> serverMetrics;
Promise<std::pair<StorageServerInterface, ProcessClass>> interfaceChanged;
Future<std::pair<StorageServerInterface, ProcessClass>> onInterfaceChanged;
Promise<Void> removed;
@ -91,14 +91,14 @@ struct TCMachineInfo : public ReferenceCounted<TCMachineInfo> {
ACTOR Future<Void> updateServerMetrics( TCServerInfo *server ) {
state StorageServerInterface ssi = server->lastKnownInterface;
state Future<ErrorOr<GetPhysicalMetricsReply>> metricsRequest = ssi.getPhysicalMetrics.tryGetReply( GetPhysicalMetricsRequest(), TaskPriority::DataDistributionLaunch );
state Future<ErrorOr<GetStorageMetricsReply>> metricsRequest = ssi.getStorageMetrics.tryGetReply( GetStorageMetricsRequest(), TaskPriority::DataDistributionLaunch );
state Future<Void> resetRequest = Never();
state Future<std::pair<StorageServerInterface, ProcessClass>> interfaceChanged( server->onInterfaceChanged );
state Future<Void> serverRemoved( server->onRemoved );
loop {
choose {
when( ErrorOr<GetPhysicalMetricsReply> rep = wait( metricsRequest ) ) {
when( ErrorOr<GetStorageMetricsReply> rep = wait( metricsRequest ) ) {
if( rep.present() ) {
server->serverMetrics = rep;
if(server->updated.canBeSet()) {
@ -118,12 +118,12 @@ ACTOR Future<Void> updateServerMetrics( TCServerInfo *server ) {
return Void();
}
when( wait( resetRequest ) ) { //To prevent a tight spin loop
if(IFailureMonitor::failureMonitor().getState(ssi.getPhysicalMetrics.getEndpoint()).isFailed()) {
resetRequest = IFailureMonitor::failureMonitor().onStateEqual(ssi.getPhysicalMetrics.getEndpoint(), FailureStatus(false));
if(IFailureMonitor::failureMonitor().getState(ssi.getStorageMetrics.getEndpoint()).isFailed()) {
resetRequest = IFailureMonitor::failureMonitor().onStateEqual(ssi.getStorageMetrics.getEndpoint(), FailureStatus(false));
}
else {
resetRequest = Never();
metricsRequest = ssi.getPhysicalMetrics.tryGetReply( GetPhysicalMetricsRequest(), TaskPriority::DataDistributionLaunch );
metricsRequest = ssi.getStorageMetrics.tryGetReply( GetStorageMetricsRequest(), TaskPriority::DataDistributionLaunch );
}
}
}
@ -292,8 +292,8 @@ public:
return getMinFreeSpaceRatio() > SERVER_KNOBS->MIN_FREE_SPACE_RATIO && getMinFreeSpace() > SERVER_KNOBS->MIN_FREE_SPACE;
}
virtual Future<Void> updatePhysicalMetrics() {
return doUpdatePhysicalMetrics( this );
virtual Future<Void> updateStorageMetrics() {
return doUpdateStorageMetrics( this );
}
virtual bool isOptimal() {
@ -341,7 +341,7 @@ private:
// Calculate the max of the metrics replies that we received.
ACTOR Future<Void> doUpdatePhysicalMetrics( TCTeamInfo* self ) {
ACTOR Future<Void> doUpdateStorageMetrics( TCTeamInfo* self ) {
std::vector<Future<Void>> updates;
for( int i = 0; i< self->servers.size(); i++ )
updates.push_back( updateServerMetrics( self->servers[i] ) );

View File

@ -80,7 +80,7 @@ struct IDataDistributionTeam {
virtual int64_t getMinFreeSpace( bool includeInFlight = true ) = 0;
virtual double getMinFreeSpaceRatio( bool includeInFlight = true ) = 0;
virtual bool hasHealthyFreeSpace() = 0;
virtual Future<Void> updatePhysicalMetrics() = 0;
virtual Future<Void> updateStorageMetrics() = 0;
virtual void addref() = 0;
virtual void delref() = 0;
virtual bool isHealthy() = 0;

View File

@ -181,11 +181,11 @@ public:
});
}
virtual Future<Void> updatePhysicalMetrics() {
virtual Future<Void> updateStorageMetrics() {
vector<Future<Void>> futures;
for (auto it = teams.begin(); it != teams.end(); it++) {
futures.push_back((*it)->updatePhysicalMetrics());
futures.push_back((*it)->updateStorageMetrics());
}
return waitForAll(futures);
}
@ -1049,7 +1049,7 @@ ACTOR Future<Void> dataDistributionRelocator( DDQueueData *self, RelocateData rd
if( error.code() != error_code_move_to_removed_server ) {
if( !error.code() ) {
try {
wait( healthyDestinations.updatePhysicalMetrics() ); //prevent a gap between the polling for an increase in physical metrics and decrementing data in flight
wait( healthyDestinations.updateStorageMetrics() ); //prevent a gap between the polling for an increase in storage metrics and decrementing data in flight
} catch( Error& e ) {
error = e;
}

View File

@ -342,18 +342,19 @@ struct StorageServerMetrics {
}
}
void getPhysicalMetrics( GetPhysicalMetricsRequest req, StorageBytes sb ){
GetPhysicalMetricsReply rep;
void getStorageMetrics( GetStorageMetricsRequest req, StorageBytes sb, double bytesInputRate ){
GetStorageMetricsReply rep;
// SOMEDAY: make bytes dynamic with hard disk space
rep.load = getMetrics(allKeys);
if (sb.free < 1e9 && deterministicRandom()->random01() < 0.1)
if (sb.free < 1e9 && deterministicRandom()->random01() < 0.1) {
TraceEvent(SevWarn, "PhysicalDiskMetrics")
.detail("Free", sb.free)
.detail("Total", sb.total)
.detail("Available", sb.available)
.detail("Load", rep.load.bytes);
}
rep.free.bytes = sb.free;
rep.free.iosPerKSecond = 10e6;
@ -363,6 +364,8 @@ struct StorageServerMetrics {
rep.capacity.iosPerKSecond = 10e6;
rep.capacity.bytesPerKSecond = 100e9;
rep.bytesInputRate = bytesInputRate;
req.reply.send(rep);
}

View File

@ -3399,9 +3399,9 @@ ACTOR Future<Void> metricsCore( StorageServer* self, StorageServerInterface ssi
self->metrics.splitMetrics( req );
}
}
when (GetPhysicalMetricsRequest req = waitNext(ssi.getPhysicalMetrics.getFuture())) {
when (GetStorageMetricsRequest req = waitNext(ssi.getStorageMetrics.getFuture())) {
StorageBytes sb = self->storage.getStorageBytes();
self->metrics.getPhysicalMetrics( req, sb );
self->metrics.getStorageMetrics( req, sb, self->counters.bytesInput.getRate() );
}
when (wait(doPollMetrics) ) {
self->metrics.poll();

View File

@ -570,7 +570,7 @@ ACTOR Future<Void> storageServerRollbackRebooter( Future<Void> prevStorageServer
DUMPTOKEN(recruited.getShardState);
DUMPTOKEN(recruited.waitMetrics);
DUMPTOKEN(recruited.splitMetrics);
DUMPTOKEN(recruited.getPhysicalMetrics);
DUMPTOKEN(recruited.getStorageMetrics);
DUMPTOKEN(recruited.waitFailure);
DUMPTOKEN(recruited.getQueuingMetrics);
DUMPTOKEN(recruited.getKeyValueStoreType);
@ -847,7 +847,7 @@ ACTOR Future<Void> workerServer(
DUMPTOKEN(recruited.getShardState);
DUMPTOKEN(recruited.waitMetrics);
DUMPTOKEN(recruited.splitMetrics);
DUMPTOKEN(recruited.getPhysicalMetrics);
DUMPTOKEN(recruited.getStorageMetrics);
DUMPTOKEN(recruited.waitFailure);
DUMPTOKEN(recruited.getQueuingMetrics);
DUMPTOKEN(recruited.getKeyValueStoreType);
@ -1074,7 +1074,7 @@ ACTOR Future<Void> workerServer(
DUMPTOKEN(recruited.getShardState);
DUMPTOKEN(recruited.waitMetrics);
DUMPTOKEN(recruited.splitMetrics);
DUMPTOKEN(recruited.getPhysicalMetrics);
DUMPTOKEN(recruited.getStorageMetrics);
DUMPTOKEN(recruited.waitFailure);
DUMPTOKEN(recruited.getQueuingMetrics);
DUMPTOKEN(recruited.getKeyValueStoreType);

View File

@ -96,7 +96,7 @@ public: // introduced features
//
// xyzdev
// vvvv
constexpr ProtocolVersion currentProtocolVersion(0x0FDB00B062000001LL);
constexpr ProtocolVersion currentProtocolVersion(0x0FDB00B062010001LL);
// This assert is intended to help prevent incrementing the leftmost digits accidentally. It will probably need to
// change when we reach version 10.
static_assert(currentProtocolVersion.version() < 0x0FDB00B100000000LL, "Unexpected protocol version");