Tightening up memory management in the blob worker

Josh Slocum 2022-02-01 14:52:28 -06:00
parent d0e89ecdd5
commit a42c80faa9
1 changed file with 34 additions and 30 deletions

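The change repeated throughout this diff is collapsing a hand-paired `Arena` plus container (`GranuleDeltas`, `GranuleSnapshot`) into a single `Standalone<T>`, flow's wrapper that keeps a container and the arena owning its memory in one object. A minimal self-contained analogue of the ownership pattern (plain C++ stand-ins, not the real types from flow/Arena.h):

    // Sketch only: illustrative stand-ins for flow's Arena/Standalone.
    #include <memory>
    #include <vector>

    struct Arena {
        std::vector<std::shared_ptr<char[]>> blocks; // owns all allocations
    };

    template <class T>
    struct Standalone : T {
        Arena& arena() { return a; } // container and backing arena travel together
    private:
        Arena a;
    };

    struct GranuleDeltas {
        std::vector<int> deltas; // stand-in payload
    };

    int main() {
        Standalone<GranuleDeltas> current;
        current.deltas.push_back(42);
        // One assignment drops the contents and the arena behind them, replacing
        // the old, easy-to-mismatch pair of resets:
        //   metadata->deltaArena = Arena();
        //   metadata->currentDeltas = GranuleDeltas();
        current = Standalone<GranuleDeltas>();
        return 0;
    }

With the old two-field layout, resetting the container but not the arena kept dead deltas pinned in memory, while resetting the arena but not the container left dangling views. Bundling them makes the reset a single assignment, as the hunks below show.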

@@ -64,9 +64,7 @@ struct GranuleMetadata : NonCopyable, ReferenceCounted<GranuleMetadata> {
     KeyRange keyRange;
     GranuleFiles files;
-    GranuleDeltas currentDeltas; // only contain deltas in pendingDeltaVersion + 1, bufferedDeltaVersion
-    // TODO get rid of this and do Reference<Standalone<GranuleDeltas>>?
-    Arena deltaArena;
+    Standalone<GranuleDeltas> currentDeltas; // only contain deltas in pendingDeltaVersion + 1, bufferedDeltaVersion

     uint64_t bytesInNewDeltaFiles = 0;
     uint64_t bufferedDeltaBytes = 0;
@@ -431,8 +429,7 @@ ACTOR Future<BlobFileIndex> writeDeltaFile(Reference<BlobWorkerData> bwData,
                                            UID granuleID,
                                            int64_t epoch,
                                            int64_t seqno,
-                                           Arena deltaArena,
-                                           GranuleDeltas deltasToWrite,
+                                           Standalone<GranuleDeltas> deltasToWrite,
                                            Version currentDeltaVersion,
                                            Future<BlobFileIndex> previousDeltaFileFuture,
                                            Future<Void> waitCommitted,
@@ -446,18 +443,23 @@ ACTOR Future<BlobFileIndex> writeDeltaFile(Reference<BlobWorkerData> bwData,
                                std::to_string(currentDeltaVersion) + ".delta";
     state Value serialized = ObjectWriter::toValue(deltasToWrite, Unversioned());
+    state size_t serializedSize = serialized.size();

-    // FIXME: technically we can free up deltaArena here to reduce memory
+    // Free up deltasToWrite here to reduce memory
+    deltasToWrite = Standalone<GranuleDeltas>();

     state Reference<IBackupFile> objectFile = wait(bwData->bstore->writeFile(fname));
     ++bwData->stats.s3PutReqs;
     ++bwData->stats.deltaFilesWritten;
-    bwData->stats.deltaBytesWritten += serialized.size();
+    bwData->stats.deltaBytesWritten += serializedSize;

-    wait(objectFile->append(serialized.begin(), serialized.size()));
+    wait(objectFile->append(serialized.begin(), serializedSize));
     wait(objectFile->finish());

+    // free serialized since it is persisted in blob
+    serialized = Value();
+
     state int numIterations = 0;
     try {
         // before updating FDB, wait for the delta file version to be committed and previous delta files to finish
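The shape of this hunk is a "free as you go" pattern: capture the serialized size up front, then release each large buffer the moment it is no longer needed, so the actor holds roughly one copy of the delta data across the slow blob upload instead of two. A self-contained sketch of the same idea in plain C++ (the serialize/upload helpers are stand-ins, not FDB APIs):

    #include <cstdio>
    #include <string>
    #include <vector>

    static std::string serialize(const std::vector<int>& v) { // stand-in encoder
        return std::string(reinterpret_cast<const char*>(v.data()),
                           v.size() * sizeof(int));
    }

    static void upload(const std::string& blob) { // stand-in for the blob write
        std::printf("uploaded %zu bytes\n", blob.size());
    }

    int main() {
        std::vector<int> toWrite(1 << 20, 7);
        std::string serialized = serialize(toWrite);
        const size_t serializedSize = serialized.size(); // size outlives the frees
        std::vector<int>().swap(toWrite); // drop the source before the slow upload
        upload(serialized);
        std::string().swap(serialized);   // drop the copy once it is persisted
        std::printf("recorded size: %zu\n", serializedSize);
        return 0;
    }

Everything downstream (stats, the FDB file key, the returned BlobFileIndex) must now read the saved serializedSize, since serialized.size() is 0 after the reset; the remaining hunks in writeDeltaFile and writeSnapshot make exactly that substitution.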
@@ -474,7 +476,7 @@ ACTOR Future<BlobFileIndex> writeDeltaFile(Reference<BlobWorkerData> bwData,
             wait(readAndCheckGranuleLock(tr, keyRange, epoch, seqno));
             Key dfKey = blobGranuleFileKeyFor(granuleID, 'D', currentDeltaVersion);
-            Value dfValue = blobGranuleFileValueFor(fname, 0, serialized.size());
+            Value dfValue = blobGranuleFileValueFor(fname, 0, serializedSize);
             tr->set(dfKey, dfValue);

             if (oldGranuleComplete.present()) {
@@ -493,7 +495,7 @@ ACTOR Future<BlobFileIndex> writeDeltaFile(Reference<BlobWorkerData> bwData,
                        keyRange.begin.printable(),
                        keyRange.end.printable(),
                        fname,
-                       serialized.size(),
+                       serializedSize,
                        currentDeltaVersion,
                        tr->getCommittedVersion());
             }
@@ -501,7 +503,7 @@ ACTOR Future<BlobFileIndex> writeDeltaFile(Reference<BlobWorkerData> bwData,
             if (BUGGIFY_WITH_PROB(0.01)) {
                 wait(delay(deterministicRandom()->random01()));
             }
-            return BlobFileIndex(currentDeltaVersion, fname, 0, serialized.size());
+            return BlobFileIndex(currentDeltaVersion, fname, 0, serializedSize);
         } catch (Error& e) {
             numIterations++;
             wait(tr->onError(e));
@@ -545,16 +547,15 @@ ACTOR Future<BlobFileIndex> writeSnapshot(Reference<BlobWorkerData> bwData,
     state std::string fname = deterministicRandom()->randomUniqueID().shortString() + "_" + granuleID.toString() +
                               "_T" + std::to_string((uint64_t)(1000.0 * now())) + "_V" + std::to_string(version) +
                               ".snapshot";
-    state Arena arena;
-    state GranuleSnapshot snapshot;
+    state Standalone<GranuleSnapshot> snapshot;

     wait(delay(0, TaskPriority::BlobWorkerUpdateStorage));

     loop {
         try {
             RangeResult res = waitNext(rows.getFuture());
-            arena.dependsOn(res.arena());
-            snapshot.append(arena, res.begin(), res.size());
+            snapshot.arena().dependsOn(res.arena());
+            snapshot.append(snapshot.arena(), res.begin(), res.size());
             wait(yield(TaskPriority::BlobWorkerUpdateStorage));
         } catch (Error& e) {
             if (e.code() == error_code_end_of_stream) {
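The `snapshot.arena().dependsOn(res.arena())` call is what makes the zero-copy `append` safe: it ties the lifetime of each incoming RangeResult's memory to the accumulated snapshot, so the appended references never dangle. A rough reference-counting analogue (illustrative types only, not flow's block-level arena implementation):

    #include <memory>
    #include <string>
    #include <string_view>
    #include <vector>

    struct Batch { // stand-in for RangeResult
        std::shared_ptr<const std::string> storage; // the batch's backing "arena"
    };

    struct Snapshot { // stand-in for Standalone<GranuleSnapshot>
        std::vector<std::shared_ptr<const std::string>> deps; // ~arena().dependsOn()
        std::vector<std::string_view> rows; // zero-copy views into the batches

        void append(const Batch& b) {
            deps.push_back(b.storage);  // keep the batch's memory alive with us
            rows.push_back(*b.storage); // so storing a view, not a copy, is safe
        }
    };

    int main() {
        Snapshot snap;
        {
            Batch b{ std::make_shared<const std::string>("k1=v1") };
            snap.append(b);
        } // the Batch is gone, but its storage survives via snap.deps
        return snap.rows[0].size() == 5 ? 0 : 1;
    }

The same mechanism reappears on the read path at the end of this diff: handleBlobGranuleFileRequest now calls rep.arena.dependsOn(metadata->currentDeltas.arena()) so the buffered deltas stay alive for the lifetime of the reply.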
@@ -587,20 +588,26 @@ ACTOR Future<BlobFileIndex> writeSnapshot(Reference<BlobWorkerData> bwData,
         ASSERT(snapshot[i].key < snapshot[i + 1].key);
     }

     // TODO is this easy to read as a flatbuffer from reader? Need to be sure about this data format
     state Value serialized = ObjectWriter::toValue(snapshot, Unversioned());
+    state size_t serializedSize = serialized.size();

-    // write to s3 using multi part upload
+    // free snapshot to reduce memory
+    snapshot = Standalone<GranuleSnapshot>();
+
+    // write to blob using multi part upload
     state Reference<IBackupFile> objectFile = wait(bwData->bstore->writeFile(fname));
     ++bwData->stats.s3PutReqs;
     ++bwData->stats.snapshotFilesWritten;
-    bwData->stats.snapshotBytesWritten += serialized.size();
+    bwData->stats.snapshotBytesWritten += serializedSize;
     // TODO: inject write error
-    wait(objectFile->append(serialized.begin(), serialized.size()));
+    wait(objectFile->append(serialized.begin(), serializedSize));
     wait(objectFile->finish());

+    // free serialized since it is persisted in blob
+    serialized = Value();
+
     wait(delay(0, TaskPriority::BlobWorkerUpdateFDB));

     // object uploaded successfully, save it to system key space
@@ -613,7 +620,7 @@ ACTOR Future<BlobFileIndex> writeSnapshot(Reference<BlobWorkerData> bwData,
         try {
             wait(readAndCheckGranuleLock(tr, keyRange, epoch, seqno));
             Key snapshotFileKey = blobGranuleFileKeyFor(granuleID, 'S', version);
-            Key snapshotFileValue = blobGranuleFileValueFor(fname, 0, serialized.size());
+            Key snapshotFileValue = blobGranuleFileValueFor(fname, 0, serializedSize);
             tr->set(snapshotFileKey, snapshotFileValue);
             // create granule history at version if this is a new granule with the initial dump from FDB
             if (createGranuleHistory) {
@@ -658,14 +665,14 @@ ACTOR Future<BlobFileIndex> writeSnapshot(Reference<BlobWorkerData> bwData,
                    keyRange.begin.printable(),
                    keyRange.end.printable(),
                    fname,
-                   serialized.size());
+                   serializedSize);
     }

     if (BUGGIFY_WITH_PROB(0.1)) {
         wait(delay(deterministicRandom()->random01()));
     }

-    return BlobFileIndex(version, fname, 0, serialized.size());
+    return BlobFileIndex(version, fname, 0, serializedSize);
 }

 ACTOR Future<BlobFileIndex> dumpInitialSnapshotFromFDB(Reference<BlobWorkerData> bwData,
@@ -1041,8 +1048,7 @@ static Version doGranuleRollback(Reference<GranuleMetadata> metadata,
     }

     // discard all in-memory mutations
-    metadata->deltaArena = Arena();
-    metadata->currentDeltas = GranuleDeltas();
+    metadata->currentDeltas = Standalone<GranuleDeltas>();
     metadata->bufferedDeltaBytes = 0;
     metadata->bufferedDeltaVersion = cfRollbackVersion;
@@ -1084,7 +1090,7 @@ static Version doGranuleRollback(Reference<GranuleMetadata> metadata,
                    metadata->bufferedDeltaBytes);
         }

-        metadata->currentDeltas.resize(metadata->deltaArena, mIdx);
+        metadata->currentDeltas.resize(metadata->currentDeltas.arena(), mIdx);

         // delete all deltas in rollback range, but we can optimize here to just skip the uncommitted mutations
         // directly and immediately pop the rollback out of inProgress to completed
@@ -1588,7 +1594,7 @@ ACTOR Future<Void> blobGranuleUpdateFiles(Reference<BlobWorkerData> bwData,
                     if (DEBUG_BW_VERSION(deltas.version)) {
                         fmt::print("BWB {0}: ({1})\n", deltas.version, deltas.mutations.size());
                     }
-                    metadata->currentDeltas.push_back_deep(metadata->deltaArena, deltas);
+                    metadata->currentDeltas.push_back_deep(metadata->currentDeltas.arena(), deltas);

                     processedAnyMutations = true;
                     ASSERT(deltas.version != invalidVersion);
@@ -1641,7 +1647,6 @@ ACTOR Future<Void> blobGranuleUpdateFiles(Reference<BlobWorkerData> bwData,
                                                       startState.granuleID,
                                                       metadata->originalEpoch,
                                                       metadata->originalSeqno,
-                                                      metadata->deltaArena,
                                                       metadata->currentDeltas,
                                                       lastDeltaVersion,
                                                       previousFuture,
@@ -1659,8 +1664,7 @@ ACTOR Future<Void> blobGranuleUpdateFiles(Reference<BlobWorkerData> bwData,
                 bwData->stats.mutationBytesBuffered -= metadata->bufferedDeltaBytes;

                 // reset current deltas
-                metadata->deltaArena = Arena();
-                metadata->currentDeltas = GranuleDeltas();
+                metadata->currentDeltas = Standalone<GranuleDeltas>();
                 metadata->bufferedDeltaBytes = 0;

                 // if we just wrote a delta file, check if we need to compact here.
@@ -2353,7 +2357,7 @@ ACTOR Future<Void> handleBlobGranuleFileRequest(Reference<BlobWorkerData> bwData,
                            metadata->pendingDeltaVersion);
                 }
                 ASSERT(metadata->durableDeltaVersion.get() == metadata->pendingDeltaVersion);

-                rep.arena.dependsOn(metadata->deltaArena);
+                rep.arena.dependsOn(metadata->currentDeltas.arena());
                 for (auto& delta : metadata->currentDeltas) {
                     if (delta.version > req.readVersion) {
                         break;