Add metrics for blob worker.

We want to add metrics for the blob worker to evaluate its
performance more concretely. We decided to track the following
information:
- S3 put requests
- S3 get requests
- S3 delete requests
- Delta files written
- Snapshot files written
- Delta bytes written
- Snapshot bytes written
- Bytes read from FDB (initial snapshot)
- Bytes read from S3 (compaction)
- Read request count
- Read files returned
- Read delta files returned
- Read delta bytes returned
- Ranges assigned
- Ranges revoked
- Number of ranges currently assigned
- Total mutation bytes buffered across all ranges (a point-in-time gauge, not an accumulated total; see the sketch below)
- Range feed bytes input
- Range feed mutations input
- Wrong shard server count
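
Most of these are monotonically increasing counters; "number of ranges currently assigned" and "total mutation bytes buffered" are point-in-time gauges instead. A minimal sketch of how the two kinds are wired up with fdbrpc/Stats.h (ExampleStats and its members are illustrative and not part of this commit; the real struct is BlobWorkerStats below):

#include "fdbrpc/Stats.h"

struct ExampleStats {
    CounterCollection cc;
    Counter bytesWritten; // accumulated: only ever incremented
    int rangesAssigned;   // current: incremented on assign, decremented on revoke
    Future<Void> logger;

    explicit ExampleStats(UID id, double interval)
      : cc("ExampleStats", id.toString()), bytesWritten("BytesWritten", cc), rangesAssigned(0) {
        // A specialCounter samples the lambda each time the collection is
        // traced, rather than accumulating deltas the way a Counter does.
        specialCounter(cc, "RangesAssigned", [this]() { return this->rangesAssigned; });
        logger = traceCounters("ExampleMetrics", id, interval, &cc, "ExampleMetrics");
    }
};
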
Author: Suraj Gupta, 2021-09-03 10:10:15 -04:00
parent bdbb0303f3
commit a0b3446572
4 changed files with 125 additions and 21 deletions

fdbclient/BlobGranuleReader.actor.cpp

@@ -256,7 +256,9 @@ static void applyDeltas(std::map<KeyRef, ValueRef>* dataMap,
ACTOR Future<RangeResult> readBlobGranule(BlobGranuleChunkRef chunk,
KeyRangeRef keyRange,
Version readVersion,
- Reference<BackupContainerFileSystem> bstore) {
+ Reference<BackupContainerFileSystem> bstore,
+ Optional<BlobWorkerStats *> stats) {
// TODO REMOVE with V2 of protocol
ASSERT(readVersion == chunk.includedVersion);
// Arena to hold all allocations for applying deltas. Most of it, and the arenas produced by reading the files,
@@ -269,13 +271,23 @@ ACTOR Future<RangeResult> readBlobGranule(BlobGranuleChunkRef chunk,
try {
state std::map<KeyRef, ValueRef> dataMap;
- Future<Arena> readSnapshotFuture = chunk.snapshotFile.present()
- ? readSnapshotFile(bstore, chunk.snapshotFile.get(), keyRange, &dataMap)
- : Future<Arena>(Arena());
+ Future<Arena> readSnapshotFuture;
+ if (chunk.snapshotFile.present()) {
+ readSnapshotFuture = readSnapshotFile(bstore, chunk.snapshotFile.get(), keyRange, &dataMap);
+ if (stats.present()) {
+ ++stats.get()->s3GetReqs;
+ }
+ } else {
+ readSnapshotFuture = Future<Arena>(Arena());
+ }
state std::vector<Future<Standalone<GranuleDeltas>>> readDeltaFutures;
readDeltaFutures.reserve(chunk.deltaFiles.size());
for (BlobFilenameRef deltaFile : chunk.deltaFiles) {
readDeltaFutures.push_back(readDeltaFile(bstore, deltaFile, keyRange, readVersion));
+ if (stats.present()) {
+ ++stats.get()->s3GetReqs;
+ }
}
Arena snapshotArena = wait(readSnapshotFuture);
@@ -516,4 +528,4 @@ TEST_CASE("/blobgranule/reader/applyDelta") {
ASSERT(data == correctData);
return Void();
}
}

fdbclient/BlobGranuleReader.actor.h

@@ -30,6 +30,7 @@
#include "fdbclient/BlobWorkerInterface.h"
#include "fdbclient/BackupContainerFileSystem.h"
#include "fdbclient/BlobWorkerCommon.h"
#include "flow/actorcompiler.h" // This must be the last #include.
@@ -38,7 +39,8 @@
ACTOR Future<RangeResult> readBlobGranule(BlobGranuleChunkRef chunk,
KeyRangeRef keyRange,
Version readVersion,
- Reference<BackupContainerFileSystem> bstore);
+ Reference<BackupContainerFileSystem> bstore,
+ Optional<BlobWorkerStats *> stats = Optional<BlobWorkerStats *>());
ACTOR Future<Void> readBlobGranules(BlobGranuleFileRequest request,
BlobGranuleFileReply reply,
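
Because the new stats argument defaults to an empty Optional, existing call sites compile unchanged and skip the accounting, while the blob worker opts in by passing a pointer to its stats. A sketch of the two call shapes, assuming chunk, keyRange, readVersion, bstore, and the worker's bwData are already in scope inside an ACTOR:

// Client-side read: the default argument applies, no counters are touched.
RangeResult clientRead = wait(readBlobGranule(chunk, keyRange, readVersion, bstore));

// Worker-side read: each snapshot/delta file fetch bumps s3GetReqs.
RangeResult workerRead = wait(readBlobGranule(chunk, keyRange, readVersion, bstore, &bwData->stats));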

fdbclient/BlobWorkerCommon.h

@@ -0,0 +1,68 @@
/*
 * BlobWorkerCommon.h
 *
 * This source file is part of the FoundationDB open source project
 *
 * Copyright 2013-2021 Apple Inc. and the FoundationDB project authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#ifndef FDBCLIENT_BLOBWORKERCOMMON_H
#define FDBCLIENT_BLOBWORKERCOMMON_H

#include "fdbrpc/Stats.h"

struct BlobWorkerStats {
    CounterCollection cc;
    Counter s3PutReqs, s3GetReqs, s3DeleteReqs;
    Counter deltaFilesWritten, snapshotFilesWritten;
    Counter deltaBytesWritten, snapshotBytesWritten;
    Counter bytesReadFromFDBForInitialSnapshot;
    Counter bytesReadFromS3ForCompaction;
    Counter rangeAssignmentRequests;
    Counter readRequests;
    Counter wrongShardServer;
    Counter rangeFeedInputBytes, rangeFeedOutputBytes;
    Counter readReqTotalFilesReturned, readReqDeltaFilesReturned;
    Counter readReqDeltaBytesReturned;

    int numRangesAssigned;
    int mutationBytesBuffered;

    Future<Void> logger;

    // Current stats maintained for a given blob worker process
    explicit BlobWorkerStats(UID id, double interval)
      : cc("BlobWorkerStats", id.toString()),
        s3PutReqs("S3PutReqs", cc), s3GetReqs("S3GetReqs", cc), s3DeleteReqs("S3DeleteReqs", cc),
        deltaFilesWritten("DeltaFilesWritten", cc), snapshotFilesWritten("SnapshotFilesWritten", cc),
        deltaBytesWritten("DeltaBytesWritten", cc), snapshotBytesWritten("SnapshotBytesWritten", cc),
        bytesReadFromFDBForInitialSnapshot("BytesReadFromFDBForInitialSnapshot", cc),
        bytesReadFromS3ForCompaction("BytesReadFromS3ForCompaction", cc),
        rangeAssignmentRequests("RangeAssignmentRequests", cc), readRequests("ReadRequests", cc),
        wrongShardServer("WrongShardServer", cc),
        rangeFeedInputBytes("RangeFeedInputBytes", cc), rangeFeedOutputBytes("RangeFeedOutputBytes", cc),
        readReqTotalFilesReturned("ReadReqTotalFilesReturned", cc),
        readReqDeltaFilesReturned("ReadReqDeltaFilesReturned", cc),
        readReqDeltaBytesReturned("ReadReqDeltaBytesReturned", cc),
        numRangesAssigned(0), mutationBytesBuffered(0) {
        specialCounter(cc, "NumRangesAssigned", [this]() { return this->numRangesAssigned; });
        specialCounter(cc, "MutationBytesBuffered", [this]() { return this->mutationBytesBuffered; });
        logger = traceCounters("BlobWorkerMetrics", id, interval, &cc, "BlobWorkerMetrics");
    }
};

#endif
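
A usage sketch (illustrative, not part of the commit): each worker owns one BlobWorkerStats for its lifetime. Counter supports ++ and += for the accumulated values, while the gauges are plain ints that the specialCounter lambdas sample at every trace interval:

BlobWorkerStats stats(deterministicRandom()->randomUniqueID(), /*interval*/ 5.0);
++stats.s3PutReqs;               // one S3 put issued
stats.deltaBytesWritten += 1024; // byte totals accumulate with +=
stats.numRangesAssigned++;       // gauge: sampled at trace time, not summed
stats.numRangesAssigned--;       // a revoke undoes the assign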

fdbserver/BlobWorker.actor.cpp

@@ -22,6 +22,7 @@
#include "fdbrpc/simulator.h"
#include "fdbclient/BackupContainerFileSystem.h"
#include "fdbclient/BlobGranuleReader.actor.h"
#include "fdbclient/BlobWorkerCommon.h"
#include "fdbclient/BlobWorkerInterface.h"
#include "fdbclient/DatabaseContext.h"
#include "fdbclient/NativeAPI.actor.h"
@@ -89,6 +90,8 @@ struct BlobWorkerData {
UID id;
Database db;
+ BlobWorkerStats stats;
LocalityData locality;
int64_t currentManagerEpoch = -1;
@@ -100,7 +103,7 @@
KeyRangeMap<GranuleRangeMetadata> granuleMetadata;
- BlobWorkerData(UID id, Database db) : id(id), db(db) {}
+ BlobWorkerData(UID id, Database db) : id(id), db(db), stats(id, SERVER_KNOBS->WORKER_LOGGING_INTERVAL) {}
~BlobWorkerData() { printf("Destroying blob worker data for %s\n", id.toString().c_str()); }
};
@@ -182,6 +185,11 @@ ACTOR Future<BlobFileIndex> writeDeltaFile(BlobWorkerData* bwData,
state Value serialized = ObjectWriter::toValue(*deltasToWrite, Unversioned());
state Reference<IBackupFile> objectFile = wait(bwData->bstore->writeFile(fname));
+ ++bwData->stats.s3PutReqs;
+ ++bwData->stats.deltaFilesWritten;
+ bwData->stats.deltaBytesWritten += serialized.size();
wait(objectFile->append(serialized.begin(), serialized.size()));
wait(objectFile->finish());
@@ -219,6 +227,7 @@ ACTOR Future<BlobFileIndex> writeDeltaFile(BlobWorkerData* bwData,
}
state Error eState = e;
wait(bwData->bstore->deleteFile(fname));
+ ++bwData->stats.s3DeleteReqs;
throw eState;
}
}
@@ -275,6 +284,11 @@ ACTOR Future<BlobFileIndex> writeSnapshot(BlobWorkerData* bwData,
// write to s3 using multi part upload
state Reference<IBackupFile> objectFile = wait(bwData->bstore->writeFile(fname));
+ ++bwData->stats.s3PutReqs;
+ ++bwData->stats.snapshotFilesWritten;
+ bwData->stats.snapshotBytesWritten += serialized.size(); // vs. snapshot.size()?
wait(objectFile->append(serialized.begin(), serialized.size()));
wait(objectFile->finish());
@@ -307,6 +321,7 @@ ACTOR Future<BlobFileIndex> writeSnapshot(BlobWorkerData* bwData,
}
state Error eState = e;
wait(bwData->bstore->deleteFile(fname));
+ ++bwData->stats.s3DeleteReqs;
throw eState;
}
@@ -342,6 +357,7 @@ ACTOR Future<BlobFileIndex> dumpInitialSnapshotFromFDB(BlobWorkerData* bwData, R
// TODO: use streaming range read
// TODO knob for limit?
RangeResult res = wait(tr->getRange(KeyRangeRef(beginKey, metadata->keyRange.end), 1000));
+ bwData->stats.bytesReadFromFDBForInitialSnapshot += res.size();
rowsStream.send(res);
if (res.more) {
beginKey = keyAfter(res.back().key);
@@ -412,7 +428,8 @@ ACTOR Future<BlobFileIndex> compactFromBlob(BlobWorkerData* bwData, Reference<Gr
state PromiseStream<RangeResult> rowsStream;
state Future<BlobFileIndex> snapshotWriter = writeSnapshot(
bwData, metadata->keyRange, metadata->lockEpoch, metadata->lockSeqno, version, rowsStream);
- RangeResult newGranule = wait(readBlobGranule(chunk, metadata->keyRange, version, bwData->bstore));
+ RangeResult newGranule = wait(readBlobGranule(chunk, metadata->keyRange, version, bwData->bstore, &bwData->stats));
+ bwData->stats.bytesReadFromS3ForCompaction += newGranule.expectedSize();
rowsStream.send(std::move(newGranule));
rowsStream.sendError(end_of_stream());
@@ -479,6 +496,7 @@ ACTOR Future<Void> blobGranuleUpdateFiles(BlobWorkerData* bwData, Reference<Gran
metadata->currentDeltaVersion = metadata->lastWriteVersion;
rangeFeedFuture = bwData->db->getRangeFeedStream(
rangeFeedStream, rangeFeedData.first, newSnapshotFile.version + 1, maxVersion, metadata->keyRange);
+ bwData->stats.rangeFeedInputBytes += metadata->keyRange.expectedSize();
loop {
// TODO: Buggify delay in change feed stream
@@ -490,6 +508,8 @@ ACTOR Future<Void> blobGranuleUpdateFiles(BlobWorkerData* bwData, Reference<Gran
// FIXME: add mutation tracking here
// 8 for version, 1 for type, 4 for each param length then actual param size
metadata->currentDeltaBytes += 17 + delta.param1.size() + delta.param2.size();
+ bwData->stats.rangeFeedOutputBytes += delta.expectedSize();
+ bwData->stats.mutationBytesBuffered = metadata->currentDeltaBytes;
}
}
@@ -522,6 +542,8 @@ ACTOR Future<Void> blobGranuleUpdateFiles(BlobWorkerData* bwData, Reference<Gran
metadata->currentDeltas = GranuleDeltas();
metadata->currentDeltaBytes = 0;
+ bwData->stats.mutationBytesBuffered = 0;
if (BW_DEBUG) {
printf("Popping range feed %s at %lld\n\n",
rangeFeedData.first.printable().c_str(),
@@ -572,25 +594,17 @@ static void handleBlobGranuleFileRequest(BlobWorkerData* bwData, const BlobGranu
// check for gaps as errors before doing actual data copying
KeyRef lastRangeEnd = req.keyRange.begin;
for (auto& r : checkRanges) {
- if (lastRangeEnd < r.begin()) {
+ bool isValid = r.value().activeMetadata.isValid();
+ if (lastRangeEnd < r.begin() || !isValid) {
if (BW_REQUEST_DEBUG) {
- printf("No blob data for [%s - %s) in request range [%s - %s), skipping request\n",
- lastRangeEnd.printable().c_str(),
- r.begin().printable().c_str(),
- req.keyRange.begin.printable().c_str(),
- req.keyRange.end.printable().c_str());
- }
- req.reply.sendError(wrong_shard_server());
- return;
- }
- if (!r.value().activeMetadata.isValid()) {
- if (BW_REQUEST_DEBUG) {
- printf("No valid blob data for [%s - %s) in request range [%s - %s), skipping request\n",
+ printf("No %s blob data for [%s - %s) in request range [%s - %s), skipping request\n",
+ isValid ? "" : "valid",
lastRangeEnd.printable().c_str(),
r.begin().printable().c_str(),
req.keyRange.begin.printable().c_str(),
req.keyRange.end.printable().c_str());
}
+ ++bwData->stats.wrongShardServer;
req.reply.sendError(wrong_shard_server());
return;
}
@@ -604,6 +618,7 @@ static void handleBlobGranuleFileRequest(BlobWorkerData* bwData, const BlobGranu
req.keyRange.begin.printable().c_str(),
req.keyRange.end.printable().c_str());
}
+ ++bwData->stats.wrongShardServer;
req.reply.sendError(wrong_shard_server());
return;
}
@@ -664,6 +679,7 @@ static void handleBlobGranuleFileRequest(BlobWorkerData* bwData, const BlobGranu
while (j <= i) {
BlobFileIndex deltaF = metadata->deltaFiles[j];
chunk.deltaFiles.emplace_back_deep(rep.arena, deltaF.filename, deltaF.offset, deltaF.length);
+ bwData->stats.readReqDeltaBytesReturned += deltaF.length;
j++;
}
@@ -680,6 +696,9 @@ static void handleBlobGranuleFileRequest(BlobWorkerData* bwData, const BlobGranu
rep.chunks.push_back(rep.arena, chunk);
+ bwData->stats.readReqTotalFilesReturned += chunk.deltaFiles.size() + int(chunk.snapshotFile.present());
+ bwData->stats.readReqDeltaFilesReturned += chunk.deltaFiles.size();
// TODO yield?
}
req.reply.send(rep);
@@ -923,9 +942,11 @@ ACTOR Future<Void> blobWorker(BlobWorkerInterface bwInterf, Reference<AsyncVar<S
/*printf("Got blob granule request [%s - %s)\n",
req.keyRange.begin.printable().c_str(),
req.keyRange.end.printable().c_str());*/
+ ++self.stats.readRequests;
handleBlobGranuleFileRequest(&self, req);
}
when(AssignBlobRangeRequest _req = waitNext(bwInterf.assignBlobRangeRequest.getFuture())) {
+ ++self.stats.rangeAssignmentRequests;
state AssignBlobRangeRequest req = _req;
if (BW_DEBUG) {
printf("Worker %s %s range [%s - %s) @ (%lld, %lld)\n",
@@ -956,6 +977,7 @@ ACTOR Future<Void> blobWorker(BlobWorkerInterface bwInterf, Reference<AsyncVar<S
// TODO with range versioning, need to persist only after it's confirmed
changeBlobRange(&self, req.keyRange, req.managerEpoch, req.managerSeqno, req.isAssign);
+ req.isAssign ? ++self.stats.numRangesAssigned : --self.stats.numRangesAssigned;
req.reply.send(AssignBlobRangeReply(true));
}
}