data operation api (not finished)
This commit is contained in:
parent
e48135086e
commit
8a59bc276d
|
@ -250,6 +250,68 @@ Future<Void> MockStorageServer::run() {
|
|||
return serveStorageMetricsRequests(this, ssi);
|
||||
}
|
||||
|
||||
void MockStorageServer::set(KeyRef key, int64_t bytes, int64_t oldBytes) {
|
||||
notifyMvccStorageCost(key, bytes);
|
||||
}
|
||||
|
||||
void MockStorageServer::insert(KeyRef key, int64_t bytes) {
|
||||
notifyMvccStorageCost(key, bytes);
|
||||
}
|
||||
|
||||
void MockStorageServer::clear(KeyRef key, int64_t bytes) {
|
||||
notifyMvccStorageCost(key, bytes);
|
||||
}
|
||||
|
||||
void MockStorageServer::clearRange(KeyRangeRef range, int64_t beginShardBytes, int64_t endShardBytes) {
|
||||
notifyMvccStorageCost(range.begin, range.begin.size() + range.end.size());
|
||||
|
||||
auto totalByteSize = estimateRangeTotalBytes(range, beginShardBytes, endShardBytes);
|
||||
}
|
||||
|
||||
void MockStorageServer::get(KeyRef key, int64_t bytes) {
|
||||
// If the read yields no value, randomly sample the empty read.
|
||||
int64_t bytesReadPerKSecond = std::max(bytes, SERVER_KNOBS->EMPTY_READ_PENALTY);
|
||||
metrics.notifyBytesReadPerKSecond(key, bytesReadPerKSecond);
|
||||
}
|
||||
|
||||
void MockStorageServer::getRange(KeyRangeRef range, int64_t beginShardBytes, int64_t endShardBytes) {
|
||||
auto totalByteSize = estimateRangeTotalBytes(range, beginShardBytes, endShardBytes);
|
||||
// For performance concerns, the cost of a range read is billed to the start key and end key of the
|
||||
// range.
|
||||
if (totalByteSize > 0) {
|
||||
int64_t bytesReadPerKSecond = std::max(totalByteSize, SERVER_KNOBS->EMPTY_READ_PENALTY) / 2;
|
||||
metrics.notifyBytesReadPerKSecond(range.begin, bytesReadPerKSecond);
|
||||
metrics.notifyBytesReadPerKSecond(range.end, bytesReadPerKSecond);
|
||||
}
|
||||
}
|
||||
|
||||
int64_t MockStorageServer::estimateRangeTotalBytes(KeyRangeRef range, int64_t beginShardBytes, int64_t endShardBytes) {
|
||||
int64_t totalByteSize = 0;
|
||||
auto ranges = serverKeys.intersectingRanges(range);
|
||||
|
||||
// use the beginShardBytes as partial size
|
||||
if (ranges.begin().begin() < range.begin) {
|
||||
ranges.pop_front();
|
||||
totalByteSize += beginShardBytes;
|
||||
}
|
||||
// use the endShardBytes as partial size
|
||||
if (ranges.end().begin() < range.end) {
|
||||
totalByteSize += endShardBytes;
|
||||
}
|
||||
for (auto it = ranges.begin(); it != ranges.end(); ++it) {
|
||||
totalByteSize += it->cvalue().shardSize;
|
||||
}
|
||||
return totalByteSize;
|
||||
}
|
||||
|
||||
void MockStorageServer::notifyMvccStorageCost(KeyRef key, int64_t size) {
|
||||
// update write bandwidth and iops as mock the cost of writing mvcc storage
|
||||
StorageMetrics s;
|
||||
s.bytesPerKSecond = mvccStorageBytes(size) / 2;
|
||||
s.iosPerKSecond = 1;
|
||||
metrics.notify(key, s);
|
||||
}
|
||||
|
||||
void MockGlobalState::initializeAsEmptyDatabaseMGS(const DatabaseConfiguration& conf, uint64_t defaultDiskSpace) {
|
||||
ASSERT(conf.storageTeamSize > 0);
|
||||
configuration = conf;
|
||||
|
|
|
@ -133,6 +133,23 @@ public:
|
|||
|
||||
Future<Void> run();
|
||||
|
||||
// data operation APIs - change the metrics
|
||||
|
||||
// Set key with a new value, the total bytes change from oldBytes to bytes
|
||||
void set(KeyRef key, int64_t bytes, int64_t oldBytes);
|
||||
// Insert key with a new value, the total bytes is `bytes`
|
||||
void insert(KeyRef key, int64_t bytes);
|
||||
// Clear key and its value of which the size is bytes
|
||||
void clear(KeyRef key, int64_t bytes);
|
||||
// Clear range, assuming the first and last shard within the range having size `beginShardBytes` and `endShardBytes`
|
||||
void clearRange(KeyRangeRef range, int64_t beginShardBytes, int64_t endShardBytes);
|
||||
|
||||
// modify the metrics as like doing an n-bytes read op
|
||||
// Read key and cause bytes read overhead
|
||||
void get(KeyRef key, int64_t bytes);
|
||||
// Read range, assuming the first and last shard within the range having size `beginShardBytes` and `endShardBytes`
|
||||
void getRange(KeyRangeRef range, int64_t beginShardBytes, int64_t endShardBytes);
|
||||
|
||||
protected:
|
||||
void threeWayShardSplitting(KeyRangeRef outerRange,
|
||||
KeyRangeRef innerRange,
|
||||
|
@ -140,6 +157,11 @@ protected:
|
|||
bool restrictSize);
|
||||
|
||||
void twoWayShardSplitting(KeyRangeRef range, KeyRef splitPoint, uint64_t rangeSize, bool restrictSize);
|
||||
|
||||
// Assuming the first and last shard within the range having size `beginShardBytes` and `endShardBytes`
|
||||
int64_t estimateRangeTotalBytes(KeyRangeRef range, int64_t beginShardBytes, int64_t endShardBytes);
|
||||
// Update the storage metrics as if we write the MVCC storage with a mutation of `size` bytes.
|
||||
void notifyMvccStorageCost(KeyRef key, int64_t size);
|
||||
};
|
||||
|
||||
class MockGlobalStateImpl;
|
||||
|
|
|
@ -228,5 +228,9 @@ Future<Void> serveStorageMetricsRequests(ServiceType* self, StorageServerInterfa
|
|||
}
|
||||
}
|
||||
|
||||
// For both the mutation log and the versioned map.
|
||||
inline int mvccStorageBytes(int64_t size) {
|
||||
return VersionedMap<KeyRef, ValueOrClearToRef>::overheadPerItem * 2 + (MutationRef::OVERHEAD_BYTES + size) * 2;
|
||||
}
|
||||
#include "flow/unactorcompiler.h"
|
||||
#endif // FDBSERVER_STORAGEMETRICS_H
|
|
@ -535,8 +535,7 @@ const int VERSION_OVERHEAD =
|
|||
// overhead for map
|
||||
// For both the mutation log and the versioned map.
|
||||
static int mvccStorageBytes(MutationRef const& m) {
|
||||
return VersionedMap<KeyRef, ValueOrClearToRef>::overheadPerItem * 2 +
|
||||
(MutationRef::OVERHEAD_BYTES + m.param1.size() + m.param2.size()) * 2;
|
||||
return mvccStorageBytes(m.param1.size() + m.param2.size());
|
||||
}
|
||||
|
||||
struct FetchInjectionInfo {
|
||||
|
|
Loading…
Reference in New Issue