From a0b3446572f9b0b663d1a833ad95362cae3006fe Mon Sep 17 00:00:00 2001
From: Suraj Gupta
Date: Fri, 3 Sep 2021 10:10:15 -0400
Subject: [PATCH] Add metrics for blob worker.

We want to add metrics for the blob worker to evaluate its performance
more concretely. We decided to track the following information:

- S3 put requests
- S3 get requests
- S3 delete requests
- Delta files written
- Snapshot files written
- Delta bytes written
- Snapshot bytes written
- Bytes read from FDB (initial snapshot)
- Bytes read from S3 (compaction)
- Read requests count
- Read files returned
- Read deltas returned
- Read delta bytes returned
- Ranges assigned
- Ranges revoked
- Number of current ranges assigned
- Total mutation bytes currently buffered across all ranges
- Range feed bytes input
- Range feed mutations input
- Wrong shard server count
---
 fdbclient/BlobGranuleReader.actor.cpp | 22 +++++++--
 fdbclient/BlobGranuleReader.actor.h   |  4 +-
 fdbclient/BlobWorkerCommon.h          | 68 +++++++++++++++++++++++++++
 fdbserver/BlobWorker.actor.cpp        | 52 ++++++++++++++------
 4 files changed, 125 insertions(+), 21 deletions(-)
 create mode 100644 fdbclient/BlobWorkerCommon.h

diff --git a/fdbclient/BlobGranuleReader.actor.cpp b/fdbclient/BlobGranuleReader.actor.cpp
index b9ea93b6c0..481eb4bab6 100644
--- a/fdbclient/BlobGranuleReader.actor.cpp
+++ b/fdbclient/BlobGranuleReader.actor.cpp
@@ -256,7 +256,9 @@ static void applyDeltas(std::map* dataMap,
 ACTOR Future<RangeResult> readBlobGranule(BlobGranuleChunkRef chunk,
                                           KeyRangeRef keyRange,
                                           Version readVersion,
-                                          Reference<BackupContainerFileSystem> bstore) {
+                                          Reference<BackupContainerFileSystem> bstore,
+                                          Optional<BlobWorkerStats*> stats) {
+    // TODO REMOVE with V2 of protocol
     ASSERT(readVersion == chunk.includedVersion);

     // Arena to hold all allocations for applying deltas. Most of it, and the arenas produced by reading the files,
@@ -269,13 +271,23 @@ ACTOR Future readBlobGranule(BlobGranuleChunkRef chunk,
     try {
         state std::map<KeyRef, ValueRef> dataMap;

-        Future<Arena> readSnapshotFuture = chunk.snapshotFile.present()
-                                               ? readSnapshotFile(bstore, chunk.snapshotFile.get(), keyRange, &dataMap)
-                                               : Future<Arena>(Arena());
+        Future<Arena> readSnapshotFuture;
+        if (chunk.snapshotFile.present()) {
+            readSnapshotFuture = readSnapshotFile(bstore, chunk.snapshotFile.get(), keyRange, &dataMap);
+            if (stats.present()) {
+                ++stats.get()->s3GetReqs;
+            }
+        } else {
+            readSnapshotFuture = Future<Arena>(Arena());
+        }
+
         state std::vector<Future<Standalone<GranuleDeltas>>> readDeltaFutures;
         readDeltaFutures.reserve(chunk.deltaFiles.size());
         for (BlobFilenameRef deltaFile : chunk.deltaFiles) {
             readDeltaFutures.push_back(readDeltaFile(bstore, deltaFile, keyRange, readVersion));
+            if (stats.present()) {
+                ++stats.get()->s3GetReqs;
+            }
         }

         Arena snapshotArena = wait(readSnapshotFuture);
@@ -516,4 +528,4 @@ TEST_CASE("/blobgranule/reader/applyDelta") {

     ASSERT(data == correctData);
     return Void();
-}
\ No newline at end of file
+}
diff --git a/fdbclient/BlobGranuleReader.actor.h b/fdbclient/BlobGranuleReader.actor.h
index 78016e37e4..97bf45ddc7 100644
--- a/fdbclient/BlobGranuleReader.actor.h
+++ b/fdbclient/BlobGranuleReader.actor.h
@@ -30,6 +30,7 @@
 #include "fdbclient/BlobWorkerInterface.h"
 #include "fdbclient/BackupContainerFileSystem.h"
+#include "fdbclient/BlobWorkerCommon.h"

 #include "flow/actorcompiler.h" // This must be the last #include.
@@ -38,7 +39,8 @@ ACTOR Future readBlobGranule(BlobGranuleChunkRef chunk,
                              KeyRangeRef keyRange,
                              Version readVersion,
-                             Reference<BackupContainerFileSystem> bstore);
+                             Reference<BackupContainerFileSystem> bstore,
+                             Optional<BlobWorkerStats*> stats = Optional<BlobWorkerStats*>());

 ACTOR Future readBlobGranules(BlobGranuleFileRequest request,
                               BlobGranuleFileReply reply,
diff --git a/fdbclient/BlobWorkerCommon.h b/fdbclient/BlobWorkerCommon.h
new file mode 100644
index 0000000000..fe58155133
--- /dev/null
+++ b/fdbclient/BlobWorkerCommon.h
@@ -0,0 +1,68 @@
+/*
+ * BlobWorkerCommon.h
+ *
+ * This source file is part of the FoundationDB open source project
+ *
+ * Copyright 2013-2021 Apple Inc. and the FoundationDB project authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef FDBCLIENT_BLOBWORKERCOMMON_H
+#define FDBCLIENT_BLOBWORKERCOMMON_H
+
+#include "fdbrpc/Stats.h"
+
+struct BlobWorkerStats {
+    CounterCollection cc;
+    Counter s3PutReqs, s3GetReqs, s3DeleteReqs;
+    Counter deltaFilesWritten, snapshotFilesWritten;
+    Counter deltaBytesWritten, snapshotBytesWritten;
+    Counter bytesReadFromFDBForInitialSnapshot;
+    Counter bytesReadFromS3ForCompaction;
+    Counter rangeAssignmentRequests;
+    Counter readRequests;
+    Counter wrongShardServer;
+    Counter rangeFeedInputBytes, rangeFeedOutputBytes;
+    Counter readReqTotalFilesReturned, readReqDeltaFilesReturned;
+    Counter readReqDeltaBytesReturned;
+
+    int numRangesAssigned;
+    int mutationBytesBuffered;
+
+    Future<Void> logger;
+
+    // Current stats maintained for a given blob worker process
+    explicit BlobWorkerStats(UID id, double interval)
+      : cc("BlobWorkerStats", id.toString()),
+
+        s3PutReqs("S3PutReqs", cc), s3GetReqs("S3GetReqs", cc), s3DeleteReqs("S3DeleteReqs", cc),
+        deltaFilesWritten("DeltaFilesWritten", cc), snapshotFilesWritten("SnapshotFilesWritten", cc),
+        deltaBytesWritten("DeltaBytesWritten", cc), snapshotBytesWritten("SnapshotBytesWritten", cc),
+        bytesReadFromFDBForInitialSnapshot("BytesReadFromFDBForInitialSnapshot", cc),
+        bytesReadFromS3ForCompaction("BytesReadFromS3ForCompaction", cc),
+        rangeAssignmentRequests("RangeAssignmentRequests", cc), readRequests("ReadRequests", cc),
+        wrongShardServer("WrongShardServer", cc),
+        rangeFeedInputBytes("RangeFeedInputBytes", cc), rangeFeedOutputBytes("RangeFeedOutputBytes", cc),
+        readReqTotalFilesReturned("ReadReqTotalFilesReturned", cc), readReqDeltaFilesReturned("ReadReqDeltaFilesReturned", cc),
+        readReqDeltaBytesReturned("ReadReqDeltaBytesReturned", cc),
+        numRangesAssigned(0), mutationBytesBuffered(0)
+    {
+        specialCounter(cc, "NumRangesAssigned", [this]() { return this->numRangesAssigned; });
+        specialCounter(cc, "MutationBytesBuffered", [this]() { return this->mutationBytesBuffered; });
+
+        logger = traceCounters("BlobWorkerMetrics", id, interval, &cc, "BlobWorkerMetrics");
+    }
+};
+
+#endif
\ No newline at end of file
diff --git a/fdbserver/BlobWorker.actor.cpp b/fdbserver/BlobWorker.actor.cpp
index 351bb8f51e..f8b821346e 100644
--- a/fdbserver/BlobWorker.actor.cpp
+++ b/fdbserver/BlobWorker.actor.cpp
@@ -22,6 +22,7 @@
 #include "fdbrpc/simulator.h"
 #include "fdbclient/BackupContainerFileSystem.h"
 #include "fdbclient/BlobGranuleReader.actor.h"
+#include "fdbclient/BlobWorkerCommon.h"
 #include "fdbclient/BlobWorkerInterface.h"
 #include "fdbclient/DatabaseContext.h"
 #include "fdbclient/NativeAPI.actor.h"
@@ -89,6 +90,8 @@ struct BlobWorkerData {
     UID id;
     Database db;

+    BlobWorkerStats stats;
+
     LocalityData locality;
     int64_t currentManagerEpoch = -1;
@@ -100,7 +103,7 @@ struct BlobWorkerData {

     KeyRangeMap granuleMetadata;

-    BlobWorkerData(UID id, Database db) : id(id), db(db) {}
+    BlobWorkerData(UID id, Database db) : id(id), db(db), stats(id, SERVER_KNOBS->WORKER_LOGGING_INTERVAL) {}
     ~BlobWorkerData() { printf("Destroying blob worker data for %s\n", id.toString().c_str()); }
 };
@@ -182,6 +185,11 @@ ACTOR Future writeDeltaFile(BlobWorkerData* bwData,

     state Value serialized = ObjectWriter::toValue(*deltasToWrite, Unversioned());

     state Reference<IBackupFile> objectFile = wait(bwData->bstore->writeFile(fname));
+
+    ++bwData->stats.s3PutReqs;
+    ++bwData->stats.deltaFilesWritten;
+    bwData->stats.deltaBytesWritten += serialized.size();
+
     wait(objectFile->append(serialized.begin(), serialized.size()));
     wait(objectFile->finish());
@@ -219,6 +227,7 @@ ACTOR Future writeDeltaFile(BlobWorkerData* bwData,
         }
         state Error eState = e;
         wait(bwData->bstore->deleteFile(fname));
+        ++bwData->stats.s3DeleteReqs;
         throw eState;
     }
 }
@@ -275,6 +284,11 @@ ACTOR Future writeSnapshot(BlobWorkerData* bwData,

     // write to s3 using multi part upload
     state Reference<IBackupFile> objectFile = wait(bwData->bstore->writeFile(fname));
+
+    ++bwData->stats.s3PutReqs;
+    ++bwData->stats.snapshotFilesWritten;
+    bwData->stats.snapshotBytesWritten += serialized.size(); // vs. snapshot.size()?
+
     wait(objectFile->append(serialized.begin(), serialized.size()));
     wait(objectFile->finish());
@@ -307,6 +321,7 @@ ACTOR Future writeSnapshot(BlobWorkerData* bwData,
         }
         state Error eState = e;
         wait(bwData->bstore->deleteFile(fname));
+        ++bwData->stats.s3DeleteReqs;
         throw eState;
     }

@@ -342,6 +357,7 @@ ACTOR Future dumpInitialSnapshotFromFDB(BlobWorkerData* bwData, R
             // TODO: use streaming range read
             // TODO knob for limit?
             RangeResult res = wait(tr->getRange(KeyRangeRef(beginKey, metadata->keyRange.end), 1000));
+            bwData->stats.bytesReadFromFDBForInitialSnapshot += res.size();
             rowsStream.send(res);
             if (res.more) {
                 beginKey = keyAfter(res.back().key);
@@ -412,7 +428,8 @@ ACTOR Future compactFromBlob(BlobWorkerData* bwData, Reference
     state PromiseStream<RangeResult> rowsStream;
     state Future snapshotWriter = writeSnapshot(
         bwData, metadata->keyRange, metadata->lockEpoch, metadata->lockSeqno, version, rowsStream);
-    RangeResult newGranule = wait(readBlobGranule(chunk, metadata->keyRange, version, bwData->bstore));
+    RangeResult newGranule = wait(readBlobGranule(chunk, metadata->keyRange, version, bwData->bstore, &bwData->stats));
+    bwData->stats.bytesReadFromS3ForCompaction += newGranule.expectedSize();
     rowsStream.send(std::move(newGranule));
     rowsStream.sendError(end_of_stream());
@@ -479,6 +496,7 @@ ACTOR Future blobGranuleUpdateFiles(BlobWorkerData* bwData, Reference
     metadata->currentDeltaVersion = metadata->lastWriteVersion;
     rangeFeedFuture = bwData->db->getRangeFeedStream(
         rangeFeedStream, rangeFeedData.first, newSnapshotFile.version + 1, maxVersion, metadata->keyRange);
+    bwData->stats.rangeFeedInputBytes += metadata->keyRange.expectedSize();

     loop {
         // TODO: Buggify delay in change feed stream
@@ -490,6 +508,8 @@ ACTOR Future blobGranuleUpdateFiles(BlobWorkerData* bwData, Reference
                     metadata->currentDeltaBytes += 17 + delta.param1.size() + delta.param2.size();
+                    bwData->stats.rangeFeedOutputBytes += delta.expectedSize();
+                    bwData->stats.mutationBytesBuffered = metadata->currentDeltaBytes;
                 }
             }
@@ -522,6 +542,8 @@ ACTOR Future blobGranuleUpdateFiles(BlobWorkerData* bwData, Reference
                 metadata->currentDeltas = GranuleDeltas();
                 metadata->currentDeltaBytes = 0;
+                bwData->stats.mutationBytesBuffered = 0;
+
                 if (BW_DEBUG) {
                     printf("Popping range feed %s at %lld\n\n",
                            rangeFeedData.first.printable().c_str(),
@@ -572,25 +594,17 @@ static void handleBlobGranuleFileRequest(BlobWorkerData* bwData, const BlobGranu
     // check for gaps as errors before doing actual data copying
     KeyRef lastRangeEnd = req.keyRange.begin;
     for (auto& r : checkRanges) {
-        if (lastRangeEnd < r.begin()) {
+        bool isValid = r.value().activeMetadata.isValid();
+        if (lastRangeEnd < r.begin() || !isValid) {
             if (BW_REQUEST_DEBUG) {
-                printf("No blob data for [%s - %s) in request range [%s - %s), skipping request\n",
-                       lastRangeEnd.printable().c_str(),
-                       r.begin().printable().c_str(),
-                       req.keyRange.begin.printable().c_str(),
-                       req.keyRange.end.printable().c_str());
-            }
-            req.reply.sendError(wrong_shard_server());
-            return;
-        }
-        if (!r.value().activeMetadata.isValid()) {
-            if (BW_REQUEST_DEBUG) {
-                printf("No valid blob data for [%s - %s) in request range [%s - %s), skipping request\n",
+                printf("No %s blob data for [%s - %s) in request range [%s - %s), skipping request\n",
+                       isValid ? "" : "valid",
                        lastRangeEnd.printable().c_str(),
                        r.begin().printable().c_str(),
                        req.keyRange.begin.printable().c_str(),
                        req.keyRange.end.printable().c_str());
             }
+            ++bwData->stats.wrongShardServer;
             req.reply.sendError(wrong_shard_server());
             return;
         }
@@ -604,6 +618,7 @@ static void handleBlobGranuleFileRequest(BlobWorkerData* bwData, const BlobGranu
                    req.keyRange.begin.printable().c_str(),
                    req.keyRange.end.printable().c_str());
         }
+        ++bwData->stats.wrongShardServer;
         req.reply.sendError(wrong_shard_server());
         return;
     }
@@ -664,6 +679,7 @@ static void handleBlobGranuleFileRequest(BlobWorkerData* bwData, const BlobGranu
         while (j <= i) {
             BlobFileIndex deltaF = metadata->deltaFiles[j];
             chunk.deltaFiles.emplace_back_deep(rep.arena, deltaF.filename, deltaF.offset, deltaF.length);
+            bwData->stats.readReqDeltaBytesReturned += deltaF.length;
             j++;
         }
@@ -680,6 +696,9 @@ static void handleBlobGranuleFileRequest(BlobWorkerData* bwData, const BlobGranu
         rep.chunks.push_back(rep.arena, chunk);

+        bwData->stats.readReqTotalFilesReturned += chunk.deltaFiles.size() + int(chunk.snapshotFile.present());
+        bwData->stats.readReqDeltaFilesReturned += chunk.deltaFiles.size();
+
         // TODO yield?
     }
     req.reply.send(rep);
@@ -923,9 +942,11 @@ ACTOR Future blobWorker(BlobWorkerInterface bwInterf, Reference