PR changes

This commit is contained in:
Suraj Gupta 2021-09-03 10:10:15 -04:00
parent a0b3446572
commit 5aebd77f0a
3 changed files with 25 additions and 20 deletions

View File

@ -253,6 +253,8 @@ static void applyDeltas(std::map<KeyRef, ValueRef>* dataMap,
}
}
// TODO: improve the interface of this function so that it doesn't need
// to be passed the entire BlobWorkerStats object
ACTOR Future<RangeResult> readBlobGranule(BlobGranuleChunkRef chunk,
KeyRangeRef keyRange,
Version readVersion,

View File

@ -30,11 +30,10 @@ struct BlobWorkerStats {
Counter deltaBytesWritten, snapshotBytesWritten;
Counter bytesReadFromFDBForInitialSnapshot;
Counter bytesReadFromS3ForCompaction;
Counter rangeAssignmentRequests;
Counter readRequests;
Counter rangeAssignmentRequests, readRequests;
Counter wrongShardServer;
Counter rangeFeedInputBytes, rangeFeedOutputBytes;
Counter readReqTotalFilesReturned, readReqDeltaFilesReturned;
Counter changeFeedInputBytes;
Counter readReqTotalFilesReturned;
Counter readReqDeltaBytesReturned;
int numRangesAssigned;
@ -53,8 +52,8 @@ struct BlobWorkerStats {
bytesReadFromS3ForCompaction("BytesReadFromS3ForCompaction", cc),
rangeAssignmentRequests("RangeAssignmentRequests", cc), readRequests("ReadRequests", cc),
wrongShardServer("WrongShardServer", cc),
rangeFeedInputBytes("RangeFeedInputBytes", cc), rangeFeedOutputBytes("RangeFeedOutputBytes", cc),
readReqTotalFilesReturned("ReadReqTotalFilesReturned", cc), readReqDeltaFilesReturned("ReadReqDeltaFilesReturned", cc),
changeFeedInputBytes("RangeFeedInputBytes", cc),
readReqTotalFilesReturned("ReadReqTotalFilesReturned", cc),
readReqDeltaBytesReturned("ReadReqDeltaBytesReturned", cc),
numRangesAssigned(0), mutationBytesBuffered(0)
{

View File

@ -220,14 +220,17 @@ ACTOR Future<BlobFileIndex> writeDeltaFile(BlobWorkerData* bwData,
}
}
} catch (Error& e) {
if (e.code() == error_code_operation_cancelled) {
throw e;
}
// FIXME: only delete if key doesn't exist
// if transaction throws non-retryable error, delete s3 file before exiting
if (BW_DEBUG) {
printf("deleting s3 delta file %s after error %s\n", fname.c_str(), e.name());
}
state Error eState = e;
wait(bwData->bstore->deleteFile(fname));
++bwData->stats.s3DeleteReqs;
wait(bwData->bstore->deleteFile(fname));
throw eState;
}
}
@ -287,7 +290,7 @@ ACTOR Future<BlobFileIndex> writeSnapshot(BlobWorkerData* bwData,
++bwData->stats.s3PutReqs;
++bwData->stats.snapshotFilesWritten;
bwData->stats.snapshotBytesWritten += serialized.size(); // vs. snapshot.size()?
bwData->stats.snapshotBytesWritten += serialized.size();
wait(objectFile->append(serialized.begin(), serialized.size()));
wait(objectFile->finish());
@ -314,14 +317,17 @@ ACTOR Future<BlobFileIndex> writeSnapshot(BlobWorkerData* bwData,
}
}
} catch (Error& e) {
if (e.code() == error_code_operation_cancelled) {
throw e;
}
// FIXME: only delete if key doesn't exist
// if transaction throws non-retryable error, delete s3 file before exiting
if (BW_DEBUG) {
printf("deleting s3 snapshot file %s after error %s\n", fname.c_str(), e.name());
}
state Error eState = e;
wait(bwData->bstore->deleteFile(fname));
++bwData->stats.s3DeleteReqs;
wait(bwData->bstore->deleteFile(fname));
throw eState;
}
@ -496,7 +502,6 @@ ACTOR Future<Void> blobGranuleUpdateFiles(BlobWorkerData* bwData, Reference<Gran
metadata->currentDeltaVersion = metadata->lastWriteVersion;
rangeFeedFuture = bwData->db->getRangeFeedStream(
rangeFeedStream, rangeFeedData.first, newSnapshotFile.version + 1, maxVersion, metadata->keyRange);
bwData->stats.rangeFeedInputBytes += metadata->keyRange.expectedSize();
loop {
// TODO: Buggify delay in change feed stream
@ -507,9 +512,9 @@ ACTOR Future<Void> blobGranuleUpdateFiles(BlobWorkerData* bwData, Reference<Gran
for (auto& delta : deltas.mutations) {
// FIXME: add mutation tracking here
// 8 for version, 1 for type, 4 for each param length then actual param size
metadata->currentDeltaBytes += 17 + delta.param1.size() + delta.param2.size();
bwData->stats.rangeFeedOutputBytes += delta.expectedSize();
bwData->stats.mutationBytesBuffered = metadata->currentDeltaBytes;
metadata->currentDeltaBytes += delta.totalSize();
bwData->stats.changeFeedInputBytes += delta.totalSize();
bwData->stats.mutationBytesBuffered += delta.totalSize();
}
}
@ -537,13 +542,13 @@ ACTOR Future<Void> blobGranuleUpdateFiles(BlobWorkerData* bwData, Reference<Gran
metadata->lastWriteVersion = metadata->currentDeltaVersion;
metadata->bytesInNewDeltaFiles += metadata->currentDeltaBytes;
bwData->stats.mutationBytesBuffered -= metadata->currentDeltaBytes;
// reset current deltas
metadata->deltaArena = Arena();
metadata->currentDeltas = GranuleDeltas();
metadata->currentDeltaBytes = 0;
bwData->stats.mutationBytesBuffered = 0;
if (BW_DEBUG) {
printf("Popping range feed %s at %lld\n\n",
rangeFeedData.first.printable().c_str(),
@ -598,7 +603,7 @@ static void handleBlobGranuleFileRequest(BlobWorkerData* bwData, const BlobGranu
if (lastRangeEnd < r.begin() || !isValid) {
if (BW_REQUEST_DEBUG) {
printf("No %s blob data for [%s - %s) in request range [%s - %s), skipping request\n",
isValid ? "" : "valid",
isValid ? "" : "valid",
lastRangeEnd.printable().c_str(),
r.begin().printable().c_str(),
req.keyRange.begin.printable().c_str(),
@ -697,7 +702,6 @@ static void handleBlobGranuleFileRequest(BlobWorkerData* bwData, const BlobGranu
rep.chunks.push_back(rep.arena, chunk);
bwData->stats.readReqTotalFilesReturned += chunk.deltaFiles.size() + int(chunk.snapshotFile.present());
bwData->stats.readReqDeltaFilesReturned += chunk.deltaFiles.size();
// TODO yield?
}
@ -996,4 +1000,4 @@ ACTOR Future<Void> blobWorker(BlobWorkerInterface bwInterf, Reference<AsyncVar<S
return Void();
}
// TODO add unit tests for assign/revoke range, especially version ordering
// TODO add unit tests for assign/revoke range, especially version ordering