Merge pull request #2378 from dongxinEric/perf/fix-read-sampling-perf-regression

Perf/fix read sampling perf regression
This commit is contained in:
Meng Xu 2019-12-02 13:29:28 -08:00 committed by GitHub
commit d43b5d390d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 53 additions and 26 deletions

View File

@ -460,7 +460,8 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs) {
init( IOPS_UNITS_PER_SAMPLE, 10000 * 1000 / STORAGE_METRICS_AVERAGE_INTERVAL_PER_KSECONDS / 100 );
init( BANDWIDTH_UNITS_PER_SAMPLE, SHARD_MIN_BYTES_PER_KSEC / STORAGE_METRICS_AVERAGE_INTERVAL_PER_KSECONDS / 25 );
init( BYTES_READ_UNITS_PER_SAMPLE, 100000 ); // 100K bytes
init( EMPTY_READ_PENALTY, 20 ); // 20 bytes
init( EMPTY_READ_PENALTY, 20 ); // 20 bytes
init( READ_SAMPLING_ENABLED, true ); if ( randomize && BUGGIFY ) READ_SAMPLING_ENABLED = false;// enable/disable read sampling
//Storage Server
init( STORAGE_LOGGING_DELAY, 5.0 );

View File

@ -398,6 +398,7 @@ public:
int64_t BANDWIDTH_UNITS_PER_SAMPLE;
int64_t BYTES_READ_UNITS_PER_SAMPLE;
int64_t EMPTY_READ_PENALTY;
bool READ_SAMPLING_ENABLED;
//Storage Server
double STORAGE_LOGGING_DELAY;

View File

@ -209,9 +209,11 @@ struct StorageServerMetrics {
// Notifies waiting WaitMetricsRequests through waitMetricsMap, and updates metricsAverageQueue and metricsSampleMap
void notify( KeyRef key, StorageMetrics& metrics ) {
ASSERT (metrics.bytes == 0); // ShardNotifyMetrics
TEST (metrics.bytesPerKSecond != 0); // ShardNotifyMetrics
TEST (metrics.iosPerKSecond != 0); // ShardNotifyMetrics
TEST(metrics.bytesReadPerKSecond != 0); // ShardNotifyMetrics
if (g_network->isSimulated()) {
TEST(metrics.bytesPerKSecond != 0); // ShardNotifyMetrics
TEST(metrics.iosPerKSecond != 0); // ShardNotifyMetrics
TEST(metrics.bytesReadPerKSecond != 0); // ShardNotifyMetrics
}
double expire = now() + SERVER_KNOBS->STORAGE_METRICS_AVERAGE_INTERVAL;
@ -227,12 +229,32 @@ struct StorageServerMetrics {
if (!notifyMetrics.allZero()) {
auto& v = waitMetricsMap[key];
for(int i=0; i<v.size(); i++) {
TEST( true ); // ShardNotifyMetrics
if (g_network->isSimulated()) {
TEST(true);
}
// ShardNotifyMetrics
v[i].send( notifyMetrics );
}
}
}
// Specialized, bytes-read-only counterpart of notify(). Read sampling runs on every read, so this path
// exists to avoid the overhead of branch misses and unnecessary stack allocation, which adds up under
// heavy load.
//
// key: the key the read is attributed to.
// in:  the amount added to bytesReadSample for that key.
//
// The sample is added with an expiry of now() + STORAGE_METRICS_AVERAGE_INTERVAL. If the resulting
// per-kilosecond value is positive, a StorageMetrics with only bytesReadPerKSecond set is sent to each
// stream stored in waitMetricsMap[key].
void notifyBytesReadPerKSecond(KeyRef key, int64_t in) {
	const double expire = now() + SERVER_KNOBS->STORAGE_METRICS_AVERAGE_INTERVAL;
	const int64_t sampledRate = bytesReadSample.addAndExpire(key, in, expire) *
	                            SERVER_KNOBS->STORAGE_METRICS_AVERAGE_INTERVAL_PER_KSECONDS;

	// Nothing to report when the sample did not produce a positive rate.
	if (sampledRate <= 0) {
		return;
	}

	StorageMetrics notifyMetrics;
	notifyMetrics.bytesReadPerKSecond = sampledRate;

	auto& waiters = waitMetricsMap[key];
	// Index-based on purpose: the size is re-read on every iteration.
	for (int idx = 0; idx < waiters.size(); ++idx) {
		TEST(true); // ShardNotifyMetrics
		waiters[idx].send(notifyMetrics);
	}
}
// Called by StorageServerDisk when the size of a key in byteSample changes, to notify WaitMetricsRequest
// Should not be called for keys past allKeys.end
void notifyBytes( RangeMap<Key, std::vector<PromiseStream<StorageMetrics>>, KeyRangeRef>::Iterator shard, int64_t bytes ) {

View File

@ -874,12 +874,13 @@ ACTOR Future<Void> getValueQ( StorageServer* data, GetValueRequest req ) {
++data->counters.emptyQueries;
}
StorageMetrics metrics;
// If the read yields no value, randomly sample the empty read.
metrics.bytesReadPerKSecond =
v.present() ? std::max((int64_t)(req.key.size() + v.get().size()), SERVER_KNOBS->EMPTY_READ_PENALTY)
: SERVER_KNOBS->EMPTY_READ_PENALTY;
data->metrics.notify(req.key, metrics);
if (SERVER_KNOBS->READ_SAMPLING_ENABLED) {
// If the read yields no value, randomly sample the empty read.
int64_t bytesReadPerKSecond =
v.present() ? std::max((int64_t)(req.key.size() + v.get().size()), SERVER_KNOBS->EMPTY_READ_PENALTY)
: SERVER_KNOBS->EMPTY_READ_PENALTY;
data->metrics.notifyBytesReadPerKSecond(req.key, bytesReadPerKSecond);
}
if( req.debugID.present() )
g_traceBatch.addEvent("GetValueDebug", req.debugID.get().first(), "getValueQ.AfterRead"); //.detail("TaskID", g_network->getCurrentTask());
@ -1079,7 +1080,6 @@ ACTOR Future<GetKeyValuesReply> readRange( StorageServer* data, Version version,
state KeyRef readEnd;
state Key readBeginTemp;
state int vCount;
state int64_t readSize;
//state UID rrid = deterministicRandom()->randomUniqueID();
//state int originalLimit = limit;
//state int originalLimitBytes = *pLimitBytes;
@ -1156,7 +1156,6 @@ ACTOR Future<GetKeyValuesReply> readRange( StorageServer* data, Version version,
for (auto i = &result.data[prevSize]; i != result.data.end(); i++) {
*pLimitBytes -= sizeof(KeyValueRef) + i->expectedSize();
readSize += sizeof(KeyValueRef) + i->expectedSize();
}
// Setup for the next iteration
@ -1246,7 +1245,6 @@ ACTOR Future<GetKeyValuesReply> readRange( StorageServer* data, Version version,
for (auto i = &result.data[prevSize]; i != result.data.end(); i++) {
*pLimitBytes -= sizeof(KeyValueRef) + i->expectedSize();
readSize += sizeof(KeyValueRef) + i->expectedSize();
}
vStart = vEnd;
@ -1261,9 +1259,6 @@ ACTOR Future<GetKeyValuesReply> readRange( StorageServer* data, Version version,
}
result.more = limit == 0 || *pLimitBytes<=0; // FIXME: Does this have to be exact?
result.version = version;
StorageMetrics metrics;
metrics.bytesReadPerKSecond = std::max(readSize, SERVER_KNOBS->EMPTY_READ_PENALTY);
data->metrics.notify(limit >= 0 ? range.begin : range.end, metrics);
return result;
}
@ -1317,15 +1312,18 @@ ACTOR Future<Key> findKey( StorageServer* data, KeySelectorRef sel, Version vers
if (index < rep.data.size()) {
*pOffset = 0;
StorageMetrics metrics;
metrics.bytesReadPerKSecond = std::max((int64_t)rep.data[index].key.size(), SERVER_KNOBS->EMPTY_READ_PENALTY);
data->metrics.notify(sel.getKey(), metrics);
if (SERVER_KNOBS->READ_SAMPLING_ENABLED) {
int64_t bytesReadPerKSecond =
std::max((int64_t)rep.data[index].key.size(), SERVER_KNOBS->EMPTY_READ_PENALTY);
data->metrics.notifyBytesReadPerKSecond(sel.getKey(), bytesReadPerKSecond);
}
return rep.data[ index ].key;
} else {
StorageMetrics metrics;
metrics.bytesReadPerKSecond = SERVER_KNOBS->EMPTY_READ_PENALTY;
data->metrics.notify(sel.getKey(), metrics);
if (SERVER_KNOBS->READ_SAMPLING_ENABLED) {
int64_t bytesReadPerKSecond = SERVER_KNOBS->EMPTY_READ_PENALTY;
data->metrics.notifyBytesReadPerKSecond(sel.getKey(), bytesReadPerKSecond);
}
// FIXME: If range.begin=="" && !forward, return success?
*pOffset = index - rep.data.size() + 1;
@ -1454,10 +1452,15 @@ ACTOR Future<Void> getKeyValues( StorageServer* data, GetKeyValuesRequest req )
data->metrics.notify(r.data[i].key, m);
}*/
// For performance concerns, the cost of a range read is billed to the start key and end key of the range.
int64_t totalByteSize = 0;
for (int i = 0; i < r.data.size(); i++) {
StorageMetrics m;
m.bytesReadPerKSecond = std::max((int64_t)r.data[i].expectedSize(), SERVER_KNOBS->EMPTY_READ_PENALTY);
data->metrics.notify(r.data[i].key, m);
totalByteSize += r.data[i].expectedSize();
}
if (totalByteSize > 0 && SERVER_KNOBS->READ_SAMPLING_ENABLED) {
int64_t bytesReadPerKSecond = std::max(totalByteSize, SERVER_KNOBS->EMPTY_READ_PENALTY) / 2;
data->metrics.notifyBytesReadPerKSecond(r.data[0].key, bytesReadPerKSecond);
data->metrics.notifyBytesReadPerKSecond(r.data[r.data.size() - 1].key, bytesReadPerKSecond);
}
r.penalty = data->getPenalty();