update the name, comment and discription of write byte sampling; update the calculation of write bandwidth metrics
This commit is contained in:
parent
11b2c035c0
commit
55a3db82b5
|
@ -223,7 +223,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
|
|||
shards.
|
||||
|
||||
The bandwidth sample maintained by the storage server needs to be accurate enough to reliably measure this minimum bandwidth. See
|
||||
BANDWIDTH_UNITS_PER_SAMPLE. If this number is too low, the storage server needs to spend more memory and time on sampling.
|
||||
BYTES_WRITE_UNITS_PER_SAMPLE. If this number is too low, the storage server needs to spend more memory and time on sampling.
|
||||
*/
|
||||
|
||||
init( SHARD_SPLIT_BYTES_PER_KSEC, 250 * 1000 * 1000 ); if( buggifySmallBandwidthSplit ) SHARD_SPLIT_BYTES_PER_KSEC = 50 * 1000 * 1000;
|
||||
|
@ -743,7 +743,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
|
|||
init( STORAGE_METRICS_AVERAGE_INTERVAL_PER_KSECONDS, 1000.0 / STORAGE_METRICS_AVERAGE_INTERVAL ); // milliHz!
|
||||
init( SPLIT_JITTER_AMOUNT, 0.05 ); if( randomize && BUGGIFY ) SPLIT_JITTER_AMOUNT = 0.2;
|
||||
init( IOPS_UNITS_PER_SAMPLE, 10000 * 1000 / STORAGE_METRICS_AVERAGE_INTERVAL_PER_KSECONDS / 100 );
|
||||
init( BANDWIDTH_UNITS_PER_SAMPLE, SHARD_MIN_BYTES_PER_KSEC / STORAGE_METRICS_AVERAGE_INTERVAL_PER_KSECONDS / 25 );
|
||||
init( BYTES_WRITE_UNITS_PER_SAMPLE, SHARD_MIN_BYTES_PER_KSEC / STORAGE_METRICS_AVERAGE_INTERVAL_PER_KSECONDS / 25 );
|
||||
init( BYTES_READ_UNITS_PER_SAMPLE, 100000 ); // 100K bytes
|
||||
init( READ_HOT_SUB_RANGE_CHUNK_SIZE, 10000000); // 10MB
|
||||
init( EMPTY_READ_PENALTY, 20 ); // 20 bytes
|
||||
|
|
|
@ -697,7 +697,7 @@ public:
|
|||
double STORAGE_METRICS_AVERAGE_INTERVAL_PER_KSECONDS;
|
||||
double SPLIT_JITTER_AMOUNT;
|
||||
int64_t IOPS_UNITS_PER_SAMPLE;
|
||||
int64_t BANDWIDTH_UNITS_PER_SAMPLE;
|
||||
int64_t BYTES_WRITE_UNITS_PER_SAMPLE;
|
||||
int64_t BYTES_READ_UNITS_PER_SAMPLE;
|
||||
int64_t READ_HOT_SUB_RANGE_CHUNK_SIZE;
|
||||
int64_t EMPTY_READ_PENALTY;
|
||||
|
|
|
@ -634,7 +634,7 @@ struct GetShardStateRequest {
|
|||
struct StorageMetrics {
|
||||
constexpr static FileIdentifier file_identifier = 13622226;
|
||||
int64_t bytes = 0; // total storage
|
||||
int64_t bytesPerKSecond = 0; // network bandwidth (average over 10s)
|
||||
int64_t writeBytesPerKSecond = 0; // network bandwidth (average over 10s) == write bandwidth through any IO devices
|
||||
|
||||
// FIXME: currently, iosPerKSecond is not used in DataDistribution calculations.
|
||||
int64_t iosPerKSecond = 0;
|
||||
|
@ -643,33 +643,33 @@ struct StorageMetrics {
|
|||
static const int64_t infinity = 1LL << 60;
|
||||
|
||||
bool allLessOrEqual(const StorageMetrics& rhs) const {
|
||||
return bytes <= rhs.bytes && bytesPerKSecond <= rhs.bytesPerKSecond && iosPerKSecond <= rhs.iosPerKSecond &&
|
||||
return bytes <= rhs.bytes && writeBytesPerKSecond <= rhs.writeBytesPerKSecond && iosPerKSecond <= rhs.iosPerKSecond &&
|
||||
bytesReadPerKSecond <= rhs.bytesReadPerKSecond;
|
||||
}
|
||||
void operator+=(const StorageMetrics& rhs) {
|
||||
bytes += rhs.bytes;
|
||||
bytesPerKSecond += rhs.bytesPerKSecond;
|
||||
writeBytesPerKSecond += rhs.writeBytesPerKSecond;
|
||||
iosPerKSecond += rhs.iosPerKSecond;
|
||||
bytesReadPerKSecond += rhs.bytesReadPerKSecond;
|
||||
}
|
||||
void operator-=(const StorageMetrics& rhs) {
|
||||
bytes -= rhs.bytes;
|
||||
bytesPerKSecond -= rhs.bytesPerKSecond;
|
||||
writeBytesPerKSecond -= rhs.writeBytesPerKSecond;
|
||||
iosPerKSecond -= rhs.iosPerKSecond;
|
||||
bytesReadPerKSecond -= rhs.bytesReadPerKSecond;
|
||||
}
|
||||
template <class F>
|
||||
void operator*=(F f) {
|
||||
bytes *= f;
|
||||
bytesPerKSecond *= f;
|
||||
writeBytesPerKSecond *= f;
|
||||
iosPerKSecond *= f;
|
||||
bytesReadPerKSecond *= f;
|
||||
}
|
||||
bool allZero() const { return !bytes && !bytesPerKSecond && !iosPerKSecond && !bytesReadPerKSecond; }
|
||||
bool allZero() const { return !bytes && !writeBytesPerKSecond && !iosPerKSecond && !bytesReadPerKSecond; }
|
||||
|
||||
template <class Ar>
|
||||
void serialize(Ar& ar) {
|
||||
serializer(ar, bytes, bytesPerKSecond, iosPerKSecond, bytesReadPerKSecond);
|
||||
serializer(ar, bytes, writeBytesPerKSecond, iosPerKSecond, bytesReadPerKSecond);
|
||||
}
|
||||
|
||||
void negate() { operator*=(-1.0); }
|
||||
|
@ -697,14 +697,14 @@ struct StorageMetrics {
|
|||
}
|
||||
|
||||
bool operator==(StorageMetrics const& rhs) const {
|
||||
return bytes == rhs.bytes && bytesPerKSecond == rhs.bytesPerKSecond && iosPerKSecond == rhs.iosPerKSecond &&
|
||||
return bytes == rhs.bytes && writeBytesPerKSecond == rhs.writeBytesPerKSecond && iosPerKSecond == rhs.iosPerKSecond &&
|
||||
bytesReadPerKSecond == rhs.bytesReadPerKSecond;
|
||||
}
|
||||
|
||||
std::string toString() const {
|
||||
return format("Bytes: %lld, BPerKSec: %lld, iosPerKSec: %lld, BReadPerKSec: %lld",
|
||||
return format("Bytes: %lld, BWritePerKSec: %lld, iosPerKSec: %lld, BReadPerKSec: %lld",
|
||||
bytes,
|
||||
bytesPerKSecond,
|
||||
writeBytesPerKSecond,
|
||||
iosPerKSecond,
|
||||
bytesReadPerKSecond);
|
||||
}
|
||||
|
|
|
@ -636,11 +636,11 @@ ACTOR Future<BlobGranuleSplitPoints> splitRange(Reference<BlobManagerData> bmDat
|
|||
// only split on bytes and write rate
|
||||
state StorageMetrics splitMetrics;
|
||||
splitMetrics.bytes = SERVER_KNOBS->BG_SNAPSHOT_FILE_TARGET_BYTES;
|
||||
splitMetrics.bytesPerKSecond = SERVER_KNOBS->SHARD_SPLIT_BYTES_PER_KSEC;
|
||||
splitMetrics.writeBytesPerKSecond = SERVER_KNOBS->SHARD_SPLIT_BYTES_PER_KSEC;
|
||||
if (writeHot) {
|
||||
splitMetrics.bytesPerKSecond = std::min(splitMetrics.bytesPerKSecond, estimated.bytesPerKSecond / 2);
|
||||
splitMetrics.bytesPerKSecond =
|
||||
std::max(splitMetrics.bytesPerKSecond, SERVER_KNOBS->SHARD_MIN_BYTES_PER_KSEC);
|
||||
splitMetrics.writeBytesPerKSecond = std::min(splitMetrics.writeBytesPerKSecond, estimated.writeBytesPerKSecond / 2);
|
||||
splitMetrics.writeBytesPerKSecond =
|
||||
std::max(splitMetrics.writeBytesPerKSecond, SERVER_KNOBS->SHARD_MIN_BYTES_PER_KSEC);
|
||||
}
|
||||
splitMetrics.iosPerKSecond = splitMetrics.infinity;
|
||||
splitMetrics.bytesReadPerKSecond = splitMetrics.infinity;
|
||||
|
@ -2616,7 +2616,7 @@ ACTOR Future<Void> attemptMerges(Reference<BlobManagerData> bmData,
|
|||
wait(bmData->db->getStorageMetrics(std::get<1>(candidates[i]), CLIENT_KNOBS->TOO_MANY));
|
||||
|
||||
if (metrics.bytes >= SERVER_KNOBS->BG_SNAPSHOT_FILE_TARGET_BYTES ||
|
||||
metrics.bytesPerKSecond >= SERVER_KNOBS->SHARD_MIN_BYTES_PER_KSEC) {
|
||||
metrics.writeBytesPerKSecond >= SERVER_KNOBS->SHARD_MIN_BYTES_PER_KSEC) {
|
||||
// This granule cannot be merged with any neighbors.
|
||||
// If current candidates up to here can be merged, merge them and skip over this one
|
||||
attemptStartMerge(bmData, currentCandidates);
|
||||
|
|
|
@ -1595,7 +1595,7 @@ ACTOR Future<Void> granuleCheckMergeCandidate(Reference<BlobWorkerData> bwData,
|
|||
|
||||
// FIXME: maybe separate knob and/or value for write rate?
|
||||
if (currentMetrics.bytes >= SERVER_KNOBS->BG_SNAPSHOT_FILE_TARGET_BYTES / 2 ||
|
||||
currentMetrics.bytesPerKSecond >= SERVER_KNOBS->SHARD_MIN_BYTES_PER_KSEC) {
|
||||
currentMetrics.writeBytesPerKSecond >= SERVER_KNOBS->SHARD_MIN_BYTES_PER_KSEC) {
|
||||
wait(delayJittered(SERVER_KNOBS->BG_MERGE_CANDIDATE_THRESHOLD_SECONDS / 2.0));
|
||||
CODE_PROBE(true, "wait and check later to see if granule got smaller or colder");
|
||||
continue;
|
||||
|
|
|
@ -41,9 +41,9 @@ enum BandwidthStatus { BandwidthStatusLow, BandwidthStatusNormal, BandwidthStatu
|
|||
enum ReadBandwidthStatus { ReadBandwidthStatusNormal, ReadBandwidthStatusHigh };
|
||||
|
||||
BandwidthStatus getBandwidthStatus(StorageMetrics const& metrics) {
|
||||
if (metrics.bytesPerKSecond > SERVER_KNOBS->SHARD_MAX_BYTES_PER_KSEC)
|
||||
if (metrics.writeBytesPerKSecond > SERVER_KNOBS->SHARD_MAX_BYTES_PER_KSEC)
|
||||
return BandwidthStatusHigh;
|
||||
else if (metrics.bytesPerKSecond < SERVER_KNOBS->SHARD_MIN_BYTES_PER_KSEC)
|
||||
else if (metrics.writeBytesPerKSecond < SERVER_KNOBS->SHARD_MIN_BYTES_PER_KSEC)
|
||||
return BandwidthStatusLow;
|
||||
|
||||
return BandwidthStatusNormal;
|
||||
|
@ -176,7 +176,7 @@ ShardSizeBounds getShardSizeBounds(KeyRangeRef shard, int64_t maxShardSize) {
|
|||
bounds.max.bytes = maxShardSize;
|
||||
}
|
||||
|
||||
bounds.max.bytesPerKSecond = bounds.max.infinity;
|
||||
bounds.max.writeBytesPerKSecond = bounds.max.infinity;
|
||||
bounds.max.iosPerKSecond = bounds.max.infinity;
|
||||
bounds.max.bytesReadPerKSecond = bounds.max.infinity;
|
||||
|
||||
|
@ -187,14 +187,14 @@ ShardSizeBounds getShardSizeBounds(KeyRangeRef shard, int64_t maxShardSize) {
|
|||
bounds.min.bytes = maxShardSize / SERVER_KNOBS->SHARD_BYTES_RATIO;
|
||||
}
|
||||
|
||||
bounds.min.bytesPerKSecond = 0;
|
||||
bounds.min.writeBytesPerKSecond = 0;
|
||||
bounds.min.iosPerKSecond = 0;
|
||||
bounds.min.bytesReadPerKSecond = 0;
|
||||
|
||||
// The permitted error is 1/3 of the general-case minimum bytes (even in the special case where this is the last
|
||||
// shard)
|
||||
bounds.permittedError.bytes = bounds.max.bytes / SERVER_KNOBS->SHARD_BYTES_RATIO / 3;
|
||||
bounds.permittedError.bytesPerKSecond = bounds.permittedError.infinity;
|
||||
bounds.permittedError.writeBytesPerKSecond = bounds.permittedError.infinity;
|
||||
bounds.permittedError.iosPerKSecond = bounds.permittedError.infinity;
|
||||
bounds.permittedError.bytesReadPerKSecond = bounds.permittedError.infinity;
|
||||
|
||||
|
@ -222,18 +222,18 @@ ShardSizeBounds calculateShardSizeBounds(const KeyRange& keys,
|
|||
std::max(int64_t(bytes - (SERVER_KNOBS->MIN_SHARD_BYTES * 0.1)), (int64_t)0));
|
||||
bounds.permittedError.bytes = bytes * 0.1;
|
||||
if (bandwidthStatus == BandwidthStatusNormal) { // Not high or low
|
||||
bounds.max.bytesPerKSecond = SERVER_KNOBS->SHARD_MAX_BYTES_PER_KSEC;
|
||||
bounds.min.bytesPerKSecond = SERVER_KNOBS->SHARD_MIN_BYTES_PER_KSEC;
|
||||
bounds.permittedError.bytesPerKSecond = bounds.min.bytesPerKSecond / 4;
|
||||
bounds.max.writeBytesPerKSecond = SERVER_KNOBS->SHARD_MAX_BYTES_PER_KSEC;
|
||||
bounds.min.writeBytesPerKSecond = SERVER_KNOBS->SHARD_MIN_BYTES_PER_KSEC;
|
||||
bounds.permittedError.writeBytesPerKSecond = bounds.min.writeBytesPerKSecond / 4;
|
||||
} else if (bandwidthStatus == BandwidthStatusHigh) { // > 10MB/sec for 100MB shard, proportionally lower
|
||||
// for smaller shard, > 200KB/sec no matter what
|
||||
bounds.max.bytesPerKSecond = bounds.max.infinity;
|
||||
bounds.min.bytesPerKSecond = SERVER_KNOBS->SHARD_MAX_BYTES_PER_KSEC;
|
||||
bounds.permittedError.bytesPerKSecond = bounds.min.bytesPerKSecond / 4;
|
||||
bounds.max.writeBytesPerKSecond = bounds.max.infinity;
|
||||
bounds.min.writeBytesPerKSecond = SERVER_KNOBS->SHARD_MAX_BYTES_PER_KSEC;
|
||||
bounds.permittedError.writeBytesPerKSecond = bounds.min.writeBytesPerKSecond / 4;
|
||||
} else if (bandwidthStatus == BandwidthStatusLow) { // < 10KB/sec
|
||||
bounds.max.bytesPerKSecond = SERVER_KNOBS->SHARD_MIN_BYTES_PER_KSEC;
|
||||
bounds.min.bytesPerKSecond = 0;
|
||||
bounds.permittedError.bytesPerKSecond = bounds.max.bytesPerKSecond / 4;
|
||||
bounds.max.writeBytesPerKSecond = SERVER_KNOBS->SHARD_MIN_BYTES_PER_KSEC;
|
||||
bounds.min.writeBytesPerKSecond = 0;
|
||||
bounds.permittedError.writeBytesPerKSecond = bounds.max.writeBytesPerKSecond / 4;
|
||||
} else {
|
||||
ASSERT(false);
|
||||
}
|
||||
|
@ -309,12 +309,12 @@ ACTOR Future<Void> trackShardMetrics(DataDistributionTracker::SafeAccessor self,
|
|||
/*TraceEvent("ShardSizeUpdate")
|
||||
.detail("Keys", keys)
|
||||
.detail("UpdatedSize", metrics.metrics.bytes)
|
||||
.detail("Bandwidth", metrics.metrics.bytesPerKSecond)
|
||||
.detail("Bandwidth", metrics.metrics.writeBytesPerKSecond)
|
||||
.detail("BandwidthStatus", getBandwidthStatus(metrics))
|
||||
.detail("BytesLower", bounds.min.bytes)
|
||||
.detail("BytesUpper", bounds.max.bytes)
|
||||
.detail("BandwidthLower", bounds.min.bytesPerKSecond)
|
||||
.detail("BandwidthUpper", bounds.max.bytesPerKSecond)
|
||||
.detail("BandwidthLower", bounds.min.writeBytesPerKSecond)
|
||||
.detail("BandwidthUpper", bounds.max.writeBytesPerKSecond)
|
||||
.detail("ShardSizePresent", shardSize->get().present())
|
||||
.detail("OldShardSize", shardSize->get().present() ? shardSize->get().get().metrics.bytes : 0)
|
||||
.detail("TrackerID", trackerID);*/
|
||||
|
@ -882,7 +882,7 @@ ACTOR Future<Void> shardSplitter(DataDistributionTracker* self,
|
|||
|
||||
StorageMetrics splitMetrics;
|
||||
splitMetrics.bytes = shardBounds.max.bytes / 2;
|
||||
splitMetrics.bytesPerKSecond =
|
||||
splitMetrics.writeBytesPerKSecond =
|
||||
keys.begin >= keyServersKeys.begin ? splitMetrics.infinity : SERVER_KNOBS->SHARD_SPLIT_BYTES_PER_KSEC;
|
||||
splitMetrics.iosPerKSecond = splitMetrics.infinity;
|
||||
splitMetrics.bytesReadPerKSecond = splitMetrics.infinity; // Don't split by readBandwidth
|
||||
|
@ -905,7 +905,7 @@ ACTOR Future<Void> shardSplitter(DataDistributionTracker* self,
|
|||
bandwidthStatus == BandwidthStatusHigh ? "High"
|
||||
: bandwidthStatus == BandwidthStatusNormal ? "Normal"
|
||||
: "Low")
|
||||
.detail("BytesPerKSec", metrics.bytesPerKSecond)
|
||||
.detail("BytesPerKSec", metrics.writeBytesPerKSecond)
|
||||
.detail("NumShards", numShards);
|
||||
|
||||
if (numShards > 1) {
|
||||
|
@ -1206,7 +1206,7 @@ ACTOR Future<Void> shardTracker(DataDistributionTracker::SafeAccessor self,
|
|||
.detail("TrackerID", trackerID)
|
||||
.detail("MaxBytes", self()->maxShardSize->get().get())
|
||||
.detail("ShardSize", shardSize->get().get().bytes)
|
||||
.detail("BytesPerKSec", shardSize->get().get().bytesPerKSecond);*/
|
||||
.detail("BytesPerKSec", shardSize->get().get().writeBytesPerKSecond);*/
|
||||
|
||||
try {
|
||||
loop {
|
||||
|
|
|
@ -56,12 +56,12 @@
|
|||
ShardSizeBounds ShardSizeBounds::shardSizeBoundsBeforeTrack() {
|
||||
return ShardSizeBounds{
|
||||
.max = StorageMetrics{ .bytes = -1,
|
||||
.bytesPerKSecond = StorageMetrics::infinity,
|
||||
.writeBytesPerKSecond = StorageMetrics::infinity,
|
||||
.iosPerKSecond = StorageMetrics::infinity,
|
||||
.bytesReadPerKSecond = StorageMetrics::infinity },
|
||||
.min = StorageMetrics{ .bytes = -1, .bytesPerKSecond = 0, .iosPerKSecond = 0, .bytesReadPerKSecond = 0 },
|
||||
.min = StorageMetrics{ .bytes = -1, .writeBytesPerKSecond = 0, .iosPerKSecond = 0, .bytesReadPerKSecond = 0 },
|
||||
.permittedError = StorageMetrics{ .bytes = -1,
|
||||
.bytesPerKSecond = StorageMetrics::infinity,
|
||||
.writeBytesPerKSecond = StorageMetrics::infinity,
|
||||
.iosPerKSecond = StorageMetrics::infinity,
|
||||
.bytesReadPerKSecond = StorageMetrics::infinity }
|
||||
};
|
||||
|
|
|
@ -308,7 +308,7 @@ Future<Void> MockStorageServer::run() {
|
|||
}
|
||||
|
||||
void MockStorageServer::set(KeyRef key, int64_t bytes, int64_t oldBytes) {
|
||||
notifyMvccStorageCost(key, bytes);
|
||||
notifyWriteMetrics(key, bytes);
|
||||
byteSampleApplySet(key, bytes);
|
||||
auto delta = oldBytes - bytes;
|
||||
availableDiskSpace += delta;
|
||||
|
@ -316,7 +316,7 @@ void MockStorageServer::set(KeyRef key, int64_t bytes, int64_t oldBytes) {
|
|||
}
|
||||
|
||||
void MockStorageServer::clear(KeyRef key, int64_t bytes) {
|
||||
notifyMvccStorageCost(key, bytes);
|
||||
notifyWriteMetrics(key, bytes);
|
||||
KeyRange sr = singleKeyRange(key);
|
||||
byteSampleApplyClear(sr);
|
||||
availableDiskSpace += bytes;
|
||||
|
@ -324,7 +324,7 @@ void MockStorageServer::clear(KeyRef key, int64_t bytes) {
|
|||
}
|
||||
|
||||
void MockStorageServer::clearRange(KeyRangeRef range, int64_t beginShardBytes, int64_t endShardBytes) {
|
||||
notifyMvccStorageCost(range.begin, range.begin.size() + range.end.size());
|
||||
notifyWriteMetrics(range.begin, range.begin.size() + range.end.size());
|
||||
byteSampleApplyClear(range);
|
||||
auto totalByteSize = estimateRangeTotalBytes(range, beginShardBytes, endShardBytes);
|
||||
availableDiskSpace += totalByteSize;
|
||||
|
@ -386,10 +386,10 @@ void MockStorageServer::clearRangeTotalBytes(KeyRangeRef range, int64_t beginSha
|
|||
}
|
||||
}
|
||||
|
||||
void MockStorageServer::notifyMvccStorageCost(KeyRef key, int64_t size) {
|
||||
// update write bandwidth and iops as mock the cost of writing mvcc storage
|
||||
void MockStorageServer::notifyWriteMetrics(KeyRef key, int64_t size) {
|
||||
// update write bandwidth and iops as mock the cost of writing a mutation
|
||||
StorageMetrics s;
|
||||
s.bytesPerKSecond = mvccStorageBytes(size) / 2;
|
||||
s.writeBytesPerKSecond = size + MutationRef::OVERHEAD_BYTES;
|
||||
s.iosPerKSecond = 1;
|
||||
metrics.notify(key, s);
|
||||
}
|
||||
|
|
|
@ -75,8 +75,8 @@ KeyRef StorageMetricSample::splitEstimate(KeyRangeRef range, int64_t offset, boo
|
|||
StorageMetrics StorageServerMetrics::getMetrics(KeyRangeRef const& keys) const {
|
||||
StorageMetrics result;
|
||||
result.bytes = byteSample.getEstimate(keys);
|
||||
result.bytesPerKSecond =
|
||||
bandwidthSample.getEstimate(keys) * SERVER_KNOBS->STORAGE_METRICS_AVERAGE_INTERVAL_PER_KSECONDS;
|
||||
result.writeBytesPerKSecond =
|
||||
bytesWriteSample.getEstimate(keys) * SERVER_KNOBS->STORAGE_METRICS_AVERAGE_INTERVAL_PER_KSECONDS;
|
||||
result.iosPerKSecond = iopsSample.getEstimate(keys) * SERVER_KNOBS->STORAGE_METRICS_AVERAGE_INTERVAL_PER_KSECONDS;
|
||||
result.bytesReadPerKSecond =
|
||||
bytesReadSample.getEstimate(keys) * SERVER_KNOBS->STORAGE_METRICS_AVERAGE_INTERVAL_PER_KSECONDS;
|
||||
|
@ -88,7 +88,7 @@ StorageMetrics StorageServerMetrics::getMetrics(KeyRangeRef const& keys) const {
|
|||
void StorageServerMetrics::notify(KeyRef key, StorageMetrics& metrics) {
|
||||
ASSERT(metrics.bytes == 0); // ShardNotifyMetrics
|
||||
if (g_network->isSimulated()) {
|
||||
CODE_PROBE(metrics.bytesPerKSecond != 0, "ShardNotifyMetrics bytes");
|
||||
CODE_PROBE(metrics.writeBytesPerKSecond != 0, "ShardNotifyMetrics bytes");
|
||||
CODE_PROBE(metrics.iosPerKSecond != 0, "ShardNotifyMetrics ios");
|
||||
CODE_PROBE(metrics.bytesReadPerKSecond != 0, "ShardNotifyMetrics bytesRead", probe::decoration::rare);
|
||||
}
|
||||
|
@ -97,8 +97,8 @@ void StorageServerMetrics::notify(KeyRef key, StorageMetrics& metrics) {
|
|||
|
||||
StorageMetrics notifyMetrics;
|
||||
|
||||
if (metrics.bytesPerKSecond)
|
||||
notifyMetrics.bytesPerKSecond = bandwidthSample.addAndExpire(key, metrics.bytesPerKSecond, expire) *
|
||||
if (metrics.writeBytesPerKSecond)
|
||||
notifyMetrics.writeBytesPerKSecond = bytesWriteSample.addAndExpire(key, metrics.writeBytesPerKSecond, expire) *
|
||||
SERVER_KNOBS->STORAGE_METRICS_AVERAGE_INTERVAL_PER_KSECONDS;
|
||||
if (metrics.iosPerKSecond)
|
||||
notifyMetrics.iosPerKSecond = iopsSample.addAndExpire(key, metrics.iosPerKSecond, expire) *
|
||||
|
@ -177,8 +177,8 @@ void StorageServerMetrics::notifyNotReadable(KeyRangeRef keys) {
|
|||
void StorageServerMetrics::poll() {
|
||||
{
|
||||
StorageMetrics m;
|
||||
m.bytesPerKSecond = SERVER_KNOBS->STORAGE_METRICS_AVERAGE_INTERVAL_PER_KSECONDS;
|
||||
bandwidthSample.poll(waitMetricsMap, m);
|
||||
m.writeBytesPerKSecond = SERVER_KNOBS->STORAGE_METRICS_AVERAGE_INTERVAL_PER_KSECONDS;
|
||||
bytesWriteSample.poll(waitMetricsMap, m);
|
||||
}
|
||||
{
|
||||
StorageMetrics m;
|
||||
|
@ -250,7 +250,7 @@ void StorageServerMetrics::splitMetrics(SplitMetricsRequest req) const {
|
|||
if (remaining.bytes < 2 * minSplitBytes)
|
||||
break;
|
||||
KeyRef key = req.keys.end;
|
||||
bool hasUsed = used.bytes != 0 || used.bytesPerKSecond != 0 || used.iosPerKSecond != 0;
|
||||
bool hasUsed = used.bytes != 0 || used.writeBytesPerKSecond != 0 || used.iosPerKSecond != 0;
|
||||
key = getSplitKey(remaining.bytes,
|
||||
estimated.bytes,
|
||||
req.limits.bytes,
|
||||
|
@ -276,13 +276,13 @@ void StorageServerMetrics::splitMetrics(SplitMetricsRequest req) const {
|
|||
lastKey,
|
||||
key,
|
||||
hasUsed);
|
||||
key = getSplitKey(remaining.bytesPerKSecond,
|
||||
estimated.bytesPerKSecond,
|
||||
req.limits.bytesPerKSecond,
|
||||
used.bytesPerKSecond,
|
||||
key = getSplitKey(remaining.writeBytesPerKSecond,
|
||||
estimated.writeBytesPerKSecond,
|
||||
req.limits.writeBytesPerKSecond,
|
||||
used.writeBytesPerKSecond,
|
||||
req.limits.infinity,
|
||||
req.isLastShard,
|
||||
bandwidthSample,
|
||||
bytesWriteSample,
|
||||
SERVER_KNOBS->STORAGE_METRICS_AVERAGE_INTERVAL_PER_KSECONDS,
|
||||
lastKey,
|
||||
key,
|
||||
|
@ -328,12 +328,12 @@ void StorageServerMetrics::getStorageMetrics(GetStorageMetricsRequest req,
|
|||
|
||||
rep.available.bytes = sb.available;
|
||||
rep.available.iosPerKSecond = 10e6;
|
||||
rep.available.bytesPerKSecond = 100e9;
|
||||
rep.available.writeBytesPerKSecond = 100e9;
|
||||
rep.available.bytesReadPerKSecond = 100e9;
|
||||
|
||||
rep.capacity.bytes = sb.total;
|
||||
rep.capacity.iosPerKSecond = 10e6;
|
||||
rep.capacity.bytesPerKSecond = 100e9;
|
||||
rep.capacity.writeBytesPerKSecond = 100e9;
|
||||
rep.capacity.bytesReadPerKSecond = 100e9;
|
||||
|
||||
rep.bytesInputRate = bytesInputRate;
|
||||
|
|
|
@ -180,8 +180,8 @@ protected:
|
|||
// Decrease the intersecting shard bytes as if delete the data
|
||||
void clearRangeTotalBytes(KeyRangeRef range, int64_t beginShardBytes, int64_t endShardBytes);
|
||||
|
||||
// Update the storage metrics as if we write the MVCC storage with a mutation of `size` bytes.
|
||||
void notifyMvccStorageCost(KeyRef key, int64_t size);
|
||||
// Update the storage metrics as if we write a k-v pair of `size` bytes.
|
||||
void notifyWriteMetrics(KeyRef key, int64_t size);
|
||||
|
||||
// Randomly generate keys and kv size between the fetch range, updating the byte sample.
|
||||
// Once the fetchKeys return, the shard status will become FETCHED.
|
||||
|
|
|
@ -79,12 +79,12 @@ struct StorageServerMetrics {
|
|||
StorageMetricSample byteSample;
|
||||
|
||||
// FIXME: iops is not effectively tested, and is not used by data distribution
|
||||
TransientStorageMetricSample iopsSample, bandwidthSample;
|
||||
TransientStorageMetricSample iopsSample, bytesWriteSample;
|
||||
TransientStorageMetricSample bytesReadSample;
|
||||
|
||||
StorageServerMetrics()
|
||||
: byteSample(0), iopsSample(SERVER_KNOBS->IOPS_UNITS_PER_SAMPLE),
|
||||
bandwidthSample(SERVER_KNOBS->BANDWIDTH_UNITS_PER_SAMPLE),
|
||||
bytesWriteSample(SERVER_KNOBS->BYTES_WRITE_UNITS_PER_SAMPLE),
|
||||
bytesReadSample(SERVER_KNOBS->BYTES_READ_UNITS_PER_SAMPLE) {}
|
||||
|
||||
StorageMetrics getMetrics(KeyRangeRef const& keys) const;
|
||||
|
@ -230,10 +230,5 @@ Future<Void> serveStorageMetricsRequests(ServiceType* self, StorageServerInterfa
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
// For both the mutation log and the versioned map.
|
||||
inline int mvccStorageBytes(int64_t size) {
|
||||
return VersionedMap<KeyRef, ValueOrClearToRef>::overheadPerItem * 2 + (MutationRef::OVERHEAD_BYTES + size) * 2;
|
||||
}
|
||||
#include "flow/unactorcompiler.h"
|
||||
#endif // FDBSERVER_STORAGEMETRICS_H
|
|
@ -533,9 +533,13 @@ const int VERSION_OVERHEAD =
|
|||
sizeof(Reference<VersionedMap<KeyRef, ValueOrClearToRef>::PTreeT>)); // versioned map [ x2 for
|
||||
// createNewVersion(version+1) ], 64b
|
||||
// overhead for map
|
||||
// For both the mutation log and the versioned map.
|
||||
|
||||
// Memory size for storing mutation in the mutation log and the versioned map.
|
||||
static int mvccStorageBytes(MutationRef const& m) {
|
||||
return mvccStorageBytes(m.param1.size() + m.param2.size());
|
||||
// Why * 2:
|
||||
// - 1 insertion into version map costs 2 nodes in avg;
|
||||
// - The mutation will be stored in both mutation log and versioned map;
|
||||
return VersionedMap<KeyRef, ValueOrClearToRef>::overheadPerItem * 2 + m.totalSize() * 2;
|
||||
}
|
||||
|
||||
struct FetchInjectionInfo {
|
||||
|
@ -1960,7 +1964,7 @@ ACTOR Future<Void> getValueQ(StorageServer* data, GetValueRequest req) {
|
|||
|
||||
/*
|
||||
StorageMetrics m;
|
||||
m.bytesPerKSecond = req.key.size() + (v.present() ? v.get().size() : 0);
|
||||
m.writeBytesPerKSecond = req.key.size() + (v.present() ? v.get().size() : 0);
|
||||
m.iosPerKSecond = 1;
|
||||
data->metrics.notify(req.key, m);
|
||||
*/
|
||||
|
@ -5610,7 +5614,7 @@ void applyMutation(StorageServer* self,
|
|||
// m is expected to be in arena already
|
||||
// Clear split keys are added to arena
|
||||
StorageMetrics metrics;
|
||||
metrics.bytesPerKSecond = mvccStorageBytes(m) / 2;
|
||||
metrics.writeBytesPerKSecond = m.totalSize(); // comparable to counter.mutationBytes
|
||||
metrics.iosPerKSecond = 1;
|
||||
self->metrics.notify(m.param1, metrics);
|
||||
|
||||
|
@ -10070,12 +10074,12 @@ ACTOR Future<Void> waitMetrics(StorageServerMetrics* self, WaitMetricsRequest re
|
|||
// all the messages for one clear or set have been dispatched.
|
||||
|
||||
/*StorageMetrics m = getMetrics( data, req.keys );
|
||||
bool b = ( m.bytes != metrics.bytes || m.bytesPerKSecond != metrics.bytesPerKSecond ||
|
||||
bool b = ( m.bytes != metrics.bytes || m.writeBytesPerKSecond != metrics.writeBytesPerKSecond ||
|
||||
m.iosPerKSecond != metrics.iosPerKSecond ); if (b) { printf("keys: '%s' - '%s' @%p\n",
|
||||
printable(req.keys.begin).c_str(), printable(req.keys.end).c_str(), this);
|
||||
printf("waitMetrics: desync %d (%lld %lld %lld) != (%lld %lld %lld); +(%lld %lld %lld)\n",
|
||||
b, m.bytes, m.bytesPerKSecond, m.iosPerKSecond, metrics.bytes, metrics.bytesPerKSecond,
|
||||
metrics.iosPerKSecond, c.bytes, c.bytesPerKSecond, c.iosPerKSecond);
|
||||
b, m.bytes, m.writeBytesPerKSecond, m.iosPerKSecond, metrics.bytes, metrics.writeBytesPerKSecond,
|
||||
metrics.iosPerKSecond, c.bytes, c.writeBytesPerKSecond, c.iosPerKSecond);
|
||||
|
||||
}*/
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue