Merge pull request #8262 from sfc-gh-dzhou/purge-fixes
blob: java api fixes (purge + verify)
This commit is contained in:
commit
4794ebabcc
|
@ -548,10 +548,14 @@ extern "C" DLLEXPORT WARN_UNUSED_RESULT FDBFuture* fdb_database_verify_blob_rang
|
|||
uint8_t const* end_key_name,
|
||||
int end_key_name_length,
|
||||
int64_t version) {
|
||||
Optional<Version> rv;
|
||||
if (version != latestVersion) {
|
||||
rv = version;
|
||||
}
|
||||
return (FDBFuture*)(DB(db)
|
||||
->verifyBlobRange(KeyRangeRef(StringRef(begin_key_name, begin_key_name_length),
|
||||
StringRef(end_key_name, end_key_name_length)),
|
||||
version)
|
||||
rv)
|
||||
.extractPtr());
|
||||
}
|
||||
|
||||
|
|
|
@ -1037,7 +1037,7 @@ JNIEXPORT jlong JNICALL Java_com_apple_foundationdb_FDBDatabase_Database_1verify
|
|||
return 0;
|
||||
}
|
||||
|
||||
FDBFuture* f = fdb_database_list_blobbified_ranges(
|
||||
FDBFuture* f = fdb_database_verify_blob_range(
|
||||
tr, startKey, jenv->GetArrayLength(beginKeyBytes), endKey, jenv->GetArrayLength(endKeyBytes), version);
|
||||
jenv->ReleaseByteArrayElements(beginKeyBytes, (jbyte*)startKey, JNI_ABORT);
|
||||
jenv->ReleaseByteArrayElements(endKeyBytes, (jbyte*)endKey, JNI_ABORT);
|
||||
|
|
|
@ -161,6 +161,19 @@ public interface Database extends AutoCloseable, TransactionContext {
|
|||
*/
|
||||
double getMainThreadBusyness();
|
||||
|
||||
/**
|
||||
* Runs {@link #purgeBlobGranules(Function)} on the default executor.
|
||||
*
|
||||
* @param beginKey start of the key range
|
||||
* @param endKey end of the key range
|
||||
* @param force if true delete all data, if not keep data >= purgeVersion
|
||||
*
|
||||
* @return the key to watch for purge complete
|
||||
*/
|
||||
default CompletableFuture<byte[]> purgeBlobGranules(byte[] beginKey, byte[] endKey, boolean force) {
|
||||
return purgeBlobGranules(beginKey, endKey, -2, force, getExecutor());
|
||||
}
|
||||
|
||||
/**
|
||||
* Runs {@link #purgeBlobGranules(Function)} on the default executor.
|
||||
*
|
||||
|
@ -278,6 +291,18 @@ public interface Database extends AutoCloseable, TransactionContext {
|
|||
*/
|
||||
CompletableFuture<KeyRangeArrayResult> listBlobbifiedRanges(byte[] beginKey, byte[] endKey, int rangeLimit, Executor e);
|
||||
|
||||
/**
|
||||
* Runs {@link #verifyBlobRange(Function)} on the default executor.
|
||||
*
|
||||
* @param beginKey start of the key range
|
||||
* @param endKey end of the key range
|
||||
*
|
||||
* @return a future with the version of the last blob granule.
|
||||
*/
|
||||
default CompletableFuture<Long> verifyBlobRange(byte[] beginKey, byte[] endKey) {
|
||||
return verifyBlobRange(beginKey, endKey, -2, getExecutor());
|
||||
}
|
||||
|
||||
/**
|
||||
* Runs {@link #verifyBlobRange(Function)} on the default executor.
|
||||
*
|
||||
|
|
|
@ -698,8 +698,10 @@ ThreadFuture<Version> DLDatabase::verifyBlobRange(const KeyRangeRef& keyRange, O
|
|||
return unsupported_operation();
|
||||
}
|
||||
|
||||
Version readVersion = version.present() ? version.get() : latestVersion;
|
||||
|
||||
FdbCApi::FDBFuture* f = api->databaseVerifyBlobRange(
|
||||
db, keyRange.begin.begin(), keyRange.begin.size(), keyRange.end.begin(), keyRange.end.size(), version);
|
||||
db, keyRange.begin.begin(), keyRange.begin.size(), keyRange.end.begin(), keyRange.end.size(), readVersion);
|
||||
|
||||
return toThreadFuture<Version>(api, f, [](FdbCApi::FDBFuture* f, FdbCApi* api) {
|
||||
Version version = invalidVersion;
|
||||
|
|
|
@ -8122,6 +8122,25 @@ ACTOR Future<Version> verifyBlobRangeActor(Reference<DatabaseContext> cx, KeyRan
|
|||
state Version readVersionOut = invalidVersion;
|
||||
state int batchSize = BUGGIFY ? deterministicRandom()->randomInt(2, 10) : CLIENT_KNOBS->BG_TOO_MANY_GRANULES / 2;
|
||||
state int loadSize = (BUGGIFY ? deterministicRandom()->randomInt(1, 20) : 20) * batchSize;
|
||||
|
||||
if (version.present()) {
|
||||
if (version.get() == latestVersion) {
|
||||
loop {
|
||||
try {
|
||||
Version _version = wait(tr.getReadVersion());
|
||||
version = _version;
|
||||
break;
|
||||
} catch (Error& e) {
|
||||
wait(tr.onError(e));
|
||||
}
|
||||
}
|
||||
}
|
||||
if (version.get() <= 0) {
|
||||
TraceEvent("VerifyBlobInvalidVersion").detail("Range", range).detail("Version", version);
|
||||
throw unsupported_operation();
|
||||
}
|
||||
}
|
||||
|
||||
loop {
|
||||
if (curRegion.begin >= range.end) {
|
||||
return readVersionOut;
|
||||
|
@ -9911,6 +9930,24 @@ ACTOR Future<Key> purgeBlobGranulesActor(Reference<DatabaseContext> db,
|
|||
state KeyRange purgeRange = range;
|
||||
state bool loadedTenantPrefix = false;
|
||||
|
||||
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
|
||||
if (purgeVersion == latestVersion) {
|
||||
loop {
|
||||
try {
|
||||
Version _purgeVersion = wait(tr.getReadVersion());
|
||||
purgeVersion = _purgeVersion;
|
||||
break;
|
||||
} catch (Error& e) {
|
||||
wait(tr.onError(e));
|
||||
}
|
||||
}
|
||||
tr.reset();
|
||||
}
|
||||
if (purgeVersion <= 0) {
|
||||
TraceEvent("PurgeInvalidVersion").detail("Range", range).detail("Version", purgeVersion).detail("Force", force);
|
||||
throw unsupported_operation();
|
||||
}
|
||||
|
||||
loop {
|
||||
try {
|
||||
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
|
|
|
@ -204,7 +204,7 @@ struct FdbCApi : public ThreadSafeReferenceCounted<FdbCApi> {
|
|||
int begin_key_name_length,
|
||||
uint8_t const* end_key_name,
|
||||
int end_key_name_length,
|
||||
Optional<Version> version);
|
||||
int64_t version);
|
||||
|
||||
// Tenant
|
||||
fdb_error_t (*tenantCreateTransaction)(FDBTenant* tenant, FDBTransaction** outTransaction);
|
||||
|
|
|
@ -130,6 +130,13 @@ struct BlobGranuleRangesWorkload : TestWorkload {
|
|||
return Void();
|
||||
}
|
||||
|
||||
ACTOR Future<Key> versionedForcePurge(Database cx, KeyRange range, Optional<TenantName> tenantName) {
|
||||
Version rv = deterministicRandom()->coinflip() ? latestVersion : 1;
|
||||
Key purgeKey = wait(cx->purgeBlobGranules(range, rv, tenantName, true));
|
||||
|
||||
return purgeKey;
|
||||
}
|
||||
|
||||
ACTOR Future<Void> unregisterRandomRange(Database cx, BlobGranuleRangesWorkload* self) {
|
||||
int randomRangeIdx = deterministicRandom()->randomInt(0, self->activeRanges.size());
|
||||
state KeyRange range = self->activeRanges[randomRangeIdx];
|
||||
|
@ -147,7 +154,7 @@ struct BlobGranuleRangesWorkload : TestWorkload {
|
|||
range.begin.printable(),
|
||||
range.end.printable());
|
||||
}
|
||||
Key purgeKey = wait(cx->purgeBlobGranules(range, 1, {}, true));
|
||||
Key purgeKey = wait(self->versionedForcePurge(cx, range, {}));
|
||||
wait(cx->waitPurgeGranulesComplete(purgeKey));
|
||||
}
|
||||
bool success = wait(self->setRange(cx, range, false));
|
||||
|
@ -194,7 +201,11 @@ struct BlobGranuleRangesWorkload : TestWorkload {
|
|||
}
|
||||
|
||||
ACTOR Future<bool> isRangeActive(Database cx, KeyRange range) {
|
||||
Version v = wait(cx->verifyBlobRange(range, {}));
|
||||
Optional<Version> rv;
|
||||
if (deterministicRandom()->coinflip()) {
|
||||
rv = latestVersion;
|
||||
}
|
||||
state Version v = wait(cx->verifyBlobRange(range, rv));
|
||||
return v != invalidVersion;
|
||||
}
|
||||
|
||||
|
@ -307,7 +318,7 @@ struct BlobGranuleRangesWorkload : TestWorkload {
|
|||
}
|
||||
|
||||
// tear down range at end
|
||||
Key purgeKey = wait(cx->purgeBlobGranules(range, 1, {}, true));
|
||||
Key purgeKey = wait(self->versionedForcePurge(cx, range, {}));
|
||||
wait(cx->waitPurgeGranulesComplete(purgeKey));
|
||||
bool success = wait(self->setRange(cx, range, false));
|
||||
ASSERT(success);
|
||||
|
@ -533,7 +544,7 @@ struct BlobGranuleRangesWorkload : TestWorkload {
|
|||
}
|
||||
|
||||
// tear down + check that un-blobbifying at a non-aligned range also doesn't work
|
||||
Key purgeKey = wait(cx->purgeBlobGranules(activeRange, 1, {}, true));
|
||||
Key purgeKey = wait(self->versionedForcePurge(cx, activeRange, {}));
|
||||
wait(cx->waitPurgeGranulesComplete(purgeKey));
|
||||
|
||||
if (deterministicRandom()->coinflip()) {
|
||||
|
@ -586,7 +597,7 @@ struct BlobGranuleRangesWorkload : TestWorkload {
|
|||
wait(self->checkRange(cx, self, range, true));
|
||||
|
||||
// force purge range
|
||||
Key purgeKey = wait(cx->purgeBlobGranules(range, 1, {}, true));
|
||||
Key purgeKey = wait(self->versionedForcePurge(cx, range, {}));
|
||||
wait(cx->waitPurgeGranulesComplete(purgeKey));
|
||||
wait(self->checkRange(cx, self, range, false));
|
||||
|
||||
|
|
Loading…
Reference in New Issue