diff --git a/fdbclient/BlobGranuleReader.actor.cpp b/fdbclient/BlobGranuleReader.actor.cpp
index 481eb4bab6..50c50990d8 100644
--- a/fdbclient/BlobGranuleReader.actor.cpp
+++ b/fdbclient/BlobGranuleReader.actor.cpp
@@ -253,6 +253,8 @@ static void applyDeltas(std::map<KeyRef, ValueRef>* dataMap,
     }
 }
 
+// TODO: improve the interface of this function so that it doesn't need
+// to be passed the entire BlobWorkerStats object
 ACTOR Future readBlobGranule(BlobGranuleChunkRef chunk,
                              KeyRangeRef keyRange,
                              Version readVersion,
diff --git a/fdbclient/BlobWorkerCommon.h b/fdbclient/BlobWorkerCommon.h
index fe58155133..a9c602f579 100644
--- a/fdbclient/BlobWorkerCommon.h
+++ b/fdbclient/BlobWorkerCommon.h
@@ -30,11 +30,10 @@ struct BlobWorkerStats {
     Counter deltaBytesWritten, snapshotBytesWritten;
     Counter bytesReadFromFDBForInitialSnapshot;
     Counter bytesReadFromS3ForCompaction;
-    Counter rangeAssignmentRequests;
-    Counter readRequests;
+    Counter rangeAssignmentRequests, readRequests;
     Counter wrongShardServer;
-    Counter rangeFeedInputBytes, rangeFeedOutputBytes;
-    Counter readReqTotalFilesReturned, readReqDeltaFilesReturned;
+    Counter changeFeedInputBytes;
+    Counter readReqTotalFilesReturned;
     Counter readReqDeltaBytesReturned;
 
     int numRangesAssigned;
@@ -53,8 +52,8 @@ struct BlobWorkerStats {
        bytesReadFromS3ForCompaction("BytesReadFromS3ForCompaction", cc),
        rangeAssignmentRequests("RangeAssignmentRequests", cc), readRequests("ReadRequests", cc),
        wrongShardServer("WrongShardServer", cc),
-       rangeFeedInputBytes("RangeFeedInputBytes", cc), rangeFeedOutputBytes("RangeFeedOutputBytes", cc),
-       readReqTotalFilesReturned("ReadReqTotalFilesReturned", cc), readReqDeltaFilesReturned("ReadReqDeltaFilesReturned", cc),
+       changeFeedInputBytes("RangeFeedInputBytes", cc),
+       readReqTotalFilesReturned("ReadReqTotalFilesReturned", cc),
        readReqDeltaBytesReturned("ReadReqDeltaBytesReturned", cc),
        numRangesAssigned(0), mutationBytesBuffered(0) {
diff --git a/fdbserver/BlobWorker.actor.cpp b/fdbserver/BlobWorker.actor.cpp
index f8b821346e..5cee970f14 100644
--- a/fdbserver/BlobWorker.actor.cpp
+++ b/fdbserver/BlobWorker.actor.cpp
@@ -220,14 +220,17 @@ ACTOR Future writeDeltaFile(BlobWorkerData* bwData,
             }
         }
     } catch (Error& e) {
+        if (e.code() == error_code_operation_cancelled) {
+            throw e;
+        }
+
         // FIXME: only delete if key doesn't exist
-        // if transaction throws non-retryable error, delete s3 file before exiting
         if (BW_DEBUG) {
             printf("deleting s3 delta file %s after error %s\n", fname.c_str(), e.name());
         }
         state Error eState = e;
-        wait(bwData->bstore->deleteFile(fname));
         ++bwData->stats.s3DeleteReqs;
+        wait(bwData->bstore->deleteFile(fname));
         throw eState;
     }
 }
@@ -287,7 +290,7 @@ ACTOR Future writeSnapshot(BlobWorkerData* bwData,
 
     ++bwData->stats.s3PutReqs;
     ++bwData->stats.snapshotFilesWritten;
-    bwData->stats.snapshotBytesWritten += serialized.size(); // vs. snapshot.size()?
+    bwData->stats.snapshotBytesWritten += serialized.size();
 
     wait(objectFile->append(serialized.begin(), serialized.size()));
     wait(objectFile->finish());
@@ -314,14 +317,17 @@ ACTOR Future writeSnapshot(BlobWorkerData* bwData,
             }
         }
     } catch (Error& e) {
+        if (e.code() == error_code_operation_cancelled) {
+            throw e;
+        }
+
         // FIXME: only delete if key doesn't exist
-        // if transaction throws non-retryable error, delete s3 file before exiting
         if (BW_DEBUG) {
             printf("deleting s3 snapshot file %s after error %s\n", fname.c_str(), e.name());
         }
         state Error eState = e;
-        wait(bwData->bstore->deleteFile(fname));
         ++bwData->stats.s3DeleteReqs;
+        wait(bwData->bstore->deleteFile(fname));
         throw eState;
     }
@@ -496,7 +502,6 @@ ACTOR Future<Void> blobGranuleUpdateFiles(BlobWorkerData* bwData, Reference<Gr
     metadata->currentDeltaVersion = metadata->lastWriteVersion;
     rangeFeedFuture = bwData->db->getRangeFeedStream(
         rangeFeedStream, rangeFeedData.first, newSnapshotFile.version + 1, maxVersion, metadata->keyRange);
-    bwData->stats.rangeFeedInputBytes += metadata->keyRange.expectedSize();
 
     loop {
         // TODO: Buggify delay in change feed stream
@@ -507,9 +512,9 @@ ACTOR Future<Void> blobGranuleUpdateFiles(BlobWorkerData* bwData, Reference<Gr
-                    metadata->currentDeltaBytes += 17 + delta.param1.size() + delta.param2.size();
-                    bwData->stats.rangeFeedOutputBytes += delta.expectedSize();
-                    bwData->stats.mutationBytesBuffered = metadata->currentDeltaBytes;
+                    metadata->currentDeltaBytes += delta.totalSize();
+                    bwData->stats.changeFeedInputBytes += delta.totalSize();
+                    bwData->stats.mutationBytesBuffered += delta.totalSize();
                 }
             }
@@ -537,13 +542,13 @@ ACTOR Future<Void> blobGranuleUpdateFiles(BlobWorkerData* bwData, Reference<Gr
             metadata->lastWriteVersion = metadata->currentDeltaVersion;
             metadata->bytesInNewDeltaFiles += metadata->currentDeltaBytes;
 
+            bwData->stats.mutationBytesBuffered -= metadata->currentDeltaBytes;
+
             // reset current deltas
             metadata->deltaArena = Arena();
             metadata->currentDeltas = GranuleDeltas();
             metadata->currentDeltaBytes = 0;
-            bwData->stats.mutationBytesBuffered = 0;
-
             if (BW_DEBUG) {
                 printf("Popping range feed %s at %lld\n\n",
                        rangeFeedData.first.printable().c_str(),
@@ -598,7 +603,7 @@ static void handleBlobGranuleFileRequest(BlobWorkerData* bwData, const BlobGranu
         if (lastRangeEnd < r.begin() || !isValid) {
             if (BW_REQUEST_DEBUG) {
                 printf("No %s blob data for [%s - %s) in request range [%s - %s), skipping request\n",
-                        isValid ? "" : "valid",
+                       isValid ? "" : "valid",
                        lastRangeEnd.printable().c_str(),
                        r.begin().printable().c_str(),
                        req.keyRange.begin.printable().c_str(),
@@ -697,7 +702,6 @@ static void handleBlobGranuleFileRequest(BlobWorkerData* bwData, const BlobGranu
         rep.chunks.push_back(rep.arena, chunk);
 
         bwData->stats.readReqTotalFilesReturned += chunk.deltaFiles.size() + int(chunk.snapshotFile.present());
-        bwData->stats.readReqDeltaFilesReturned += chunk.deltaFiles.size();
 
         // TODO yield?
     }
@@ -996,4 +1000,4 @@ ACTOR Future<Void> blobWorker(BlobWorkerInterface bwInterf, Reference