review comments
This commit is contained in:
parent
46adada5ff
commit
ff96843c58
|
@ -212,6 +212,7 @@ ACTOR Future<BlobFileIndex> writeDeltaFile(BlobWorkerData* bwData,
|
|||
}
|
||||
}
|
||||
} catch (Error& e) {
|
||||
// FIXME: only delete if key doesn't exist
|
||||
// if transaction throws non-retryable error, delete s3 file before exiting
|
||||
if (BW_DEBUG) {
|
||||
printf("deleting s3 delta file %s after error %s\n", fname.c_str(), e.name());
|
||||
|
@ -298,6 +299,7 @@ ACTOR Future<BlobFileIndex> writeSnapshot(BlobWorkerData* bwData,
|
|||
}
|
||||
}
|
||||
} catch (Error& e) {
|
||||
// FIXME: only delete if key doesn't exist
|
||||
// if transaction throws non-retryable error, delete s3 file before exiting
|
||||
if (BW_DEBUG) {
|
||||
printf("deleting s3 snapshot file %s after error %s\n", fname.c_str(), e.name());
|
||||
|
@ -326,6 +328,7 @@ ACTOR Future<BlobFileIndex> dumpInitialSnapshotFromFDB(BlobWorkerData* bwData, R
|
|||
state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(bwData->db);
|
||||
|
||||
loop {
|
||||
state Key beginKey = metadata->keyRange.begin;
|
||||
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
try {
|
||||
state Version readVersion = wait(tr->getReadVersion());
|
||||
|
@ -333,8 +336,8 @@ ACTOR Future<BlobFileIndex> dumpInitialSnapshotFromFDB(BlobWorkerData* bwData, R
|
|||
state Future<BlobFileIndex> snapshotWriter = writeSnapshot(
|
||||
bwData, metadata->keyRange, metadata->lockEpoch, metadata->lockSeqno, readVersion, rowsStream);
|
||||
|
||||
state Key beginKey = metadata->keyRange.begin;
|
||||
loop {
|
||||
// TODO: use streaming range read
|
||||
// TODO knob for limit?
|
||||
RangeResult res = wait(tr->getRange(KeyRangeRef(beginKey, metadata->keyRange.end), 1000));
|
||||
rowsStream.send(res);
|
||||
|
@ -690,8 +693,6 @@ ACTOR Future<Void> persistAssignWorkerRange(BlobWorkerData* bwData, KeyRange key
|
|||
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
|
||||
|
||||
// TODO confirm this should add a read conflict range since we read/wrote the same key
|
||||
|
||||
Optional<Value> prevLockValue = wait(tr->get(lockKey));
|
||||
if (prevLockValue.present()) {
|
||||
std::pair<int64_t, int64_t> prevOwner = decodeBlobGranuleLockValue(prevLockValue.get());
|
||||
|
|
Loading…
Reference in New Issue