Merge remote-tracking branch 'origin/main' into add-quota-clear-command
This commit is contained in:
commit
6bfb5c00fb
|
@ -585,6 +585,58 @@ extern "C" DLLEXPORT FDBFuture* fdb_tenant_wait_purge_granules_complete(FDBTenan
|
|||
.extractPtr());
|
||||
}
|
||||
|
||||
extern "C" DLLEXPORT FDBFuture* fdb_tenant_blobbify_range(FDBTenant* tenant,
|
||||
uint8_t const* begin_key_name,
|
||||
int begin_key_name_length,
|
||||
uint8_t const* end_key_name,
|
||||
int end_key_name_length) {
|
||||
return (FDBFuture*)(TENANT(tenant)
|
||||
->blobbifyRange(KeyRangeRef(StringRef(begin_key_name, begin_key_name_length),
|
||||
StringRef(end_key_name, end_key_name_length)))
|
||||
.extractPtr());
|
||||
}
|
||||
|
||||
extern "C" DLLEXPORT FDBFuture* fdb_tenant_unblobbify_range(FDBTenant* tenant,
|
||||
uint8_t const* begin_key_name,
|
||||
int begin_key_name_length,
|
||||
uint8_t const* end_key_name,
|
||||
int end_key_name_length) {
|
||||
return (FDBFuture*)(TENANT(tenant)
|
||||
->unblobbifyRange(KeyRangeRef(StringRef(begin_key_name, begin_key_name_length),
|
||||
StringRef(end_key_name, end_key_name_length)))
|
||||
.extractPtr());
|
||||
}
|
||||
|
||||
extern "C" DLLEXPORT FDBFuture* fdb_tenant_list_blobbified_ranges(FDBTenant* tenant,
|
||||
uint8_t const* begin_key_name,
|
||||
int begin_key_name_length,
|
||||
uint8_t const* end_key_name,
|
||||
int end_key_name_length,
|
||||
int rangeLimit) {
|
||||
return (FDBFuture*)(TENANT(tenant)
|
||||
->listBlobbifiedRanges(KeyRangeRef(StringRef(begin_key_name, begin_key_name_length),
|
||||
StringRef(end_key_name, end_key_name_length)),
|
||||
rangeLimit)
|
||||
.extractPtr());
|
||||
}
|
||||
|
||||
extern "C" DLLEXPORT WARN_UNUSED_RESULT FDBFuture* fdb_tenant_verify_blob_range(FDBTenant* tenant,
|
||||
uint8_t const* begin_key_name,
|
||||
int begin_key_name_length,
|
||||
uint8_t const* end_key_name,
|
||||
int end_key_name_length,
|
||||
int64_t version) {
|
||||
Optional<Version> rv;
|
||||
if (version != latestVersion) {
|
||||
rv = version;
|
||||
}
|
||||
return (FDBFuture*)(TENANT(tenant)
|
||||
->verifyBlobRange(KeyRangeRef(StringRef(begin_key_name, begin_key_name_length),
|
||||
StringRef(end_key_name, end_key_name_length)),
|
||||
rv)
|
||||
.extractPtr());
|
||||
}
|
||||
|
||||
extern "C" DLLEXPORT void fdb_tenant_destroy(FDBTenant* tenant) {
|
||||
try {
|
||||
TENANT(tenant)->delref();
|
||||
|
|
|
@ -376,6 +376,39 @@ DLLEXPORT WARN_UNUSED_RESULT FDBFuture* fdb_tenant_wait_purge_granules_complete(
|
|||
uint8_t const* purge_key_name,
|
||||
int purge_key_name_length);
|
||||
|
||||
DLLEXPORT WARN_UNUSED_RESULT FDBFuture* fdb_tenant_blobbify_range(FDBTenant* tenant,
|
||||
uint8_t const* begin_key_name,
|
||||
int begin_key_name_length,
|
||||
uint8_t const* end_key_name,
|
||||
int end_key_name_length);
|
||||
|
||||
DLLEXPORT WARN_UNUSED_RESULT FDBFuture* fdb_tenant_unblobbify_range(FDBTenant* tenant,
|
||||
uint8_t const* begin_key_name,
|
||||
int begin_key_name_length,
|
||||
uint8_t const* end_key_name,
|
||||
int end_key_name_length);
|
||||
|
||||
DLLEXPORT WARN_UNUSED_RESULT FDBFuture* fdb_tenant_list_blobbified_ranges(FDBTenant* tenant,
|
||||
uint8_t const* begin_key_name,
|
||||
int begin_key_name_length,
|
||||
uint8_t const* end_key_name,
|
||||
int end_key_name_length,
|
||||
int rangeLimit);
|
||||
|
||||
DLLEXPORT WARN_UNUSED_RESULT FDBFuture* fdb_tenant_list_blobbified_ranges(FDBTenant* tenant,
|
||||
uint8_t const* begin_key_name,
|
||||
int begin_key_name_length,
|
||||
uint8_t const* end_key_name,
|
||||
int end_key_name_length,
|
||||
int rangeLimit);
|
||||
|
||||
DLLEXPORT WARN_UNUSED_RESULT FDBFuture* fdb_tenant_verify_blob_range(FDBTenant* tenant,
|
||||
uint8_t const* begin_key_name,
|
||||
int begin_key_name_length,
|
||||
uint8_t const* end_key_name,
|
||||
int end_key_name_length,
|
||||
int64_t version);
|
||||
|
||||
DLLEXPORT void fdb_tenant_destroy(FDBTenant* tenant);
|
||||
|
||||
DLLEXPORT void fdb_transaction_destroy(FDBTransaction* tr);
|
||||
|
|
|
@ -24,6 +24,7 @@
|
|||
|
||||
#include "com_apple_foundationdb_FDB.h"
|
||||
#include "com_apple_foundationdb_FDBDatabase.h"
|
||||
#include "com_apple_foundationdb_FDBTenant.h"
|
||||
#include "com_apple_foundationdb_FDBTransaction.h"
|
||||
#include "com_apple_foundationdb_FutureBool.h"
|
||||
#include "com_apple_foundationdb_FutureInt64.h"
|
||||
|
@ -1102,6 +1103,203 @@ JNIEXPORT void JNICALL Java_com_apple_foundationdb_FDBTenant_Tenant_1dispose(JNI
|
|||
fdb_tenant_destroy((FDBTenant*)tPtr);
|
||||
}
|
||||
|
||||
JNIEXPORT jlong JNICALL Java_com_apple_foundationdb_FDBTenant_Tenant_1purgeBlobGranules(JNIEnv* jenv,
|
||||
jobject,
|
||||
jlong tPtr,
|
||||
jbyteArray beginKeyBytes,
|
||||
jbyteArray endKeyBytes,
|
||||
jlong purgeVersion,
|
||||
jboolean force) {
|
||||
if (!tPtr || !beginKeyBytes || !endKeyBytes) {
|
||||
throwParamNotNull(jenv);
|
||||
return 0;
|
||||
}
|
||||
FDBTenant* tenant = (FDBTenant*)tPtr;
|
||||
|
||||
uint8_t* beginKeyArr = (uint8_t*)jenv->GetByteArrayElements(beginKeyBytes, JNI_NULL);
|
||||
if (!beginKeyArr) {
|
||||
if (!jenv->ExceptionOccurred())
|
||||
throwRuntimeEx(jenv, "Error getting handle to native resources");
|
||||
return 0;
|
||||
}
|
||||
|
||||
uint8_t* endKeyArr = (uint8_t*)jenv->GetByteArrayElements(endKeyBytes, JNI_NULL);
|
||||
if (!endKeyArr) {
|
||||
jenv->ReleaseByteArrayElements(beginKeyBytes, (jbyte*)beginKeyArr, JNI_ABORT);
|
||||
if (!jenv->ExceptionOccurred())
|
||||
throwRuntimeEx(jenv, "Error getting handle to native resources");
|
||||
return 0;
|
||||
}
|
||||
|
||||
FDBFuture* f = fdb_tenant_purge_blob_granules(tenant,
|
||||
beginKeyArr,
|
||||
jenv->GetArrayLength(beginKeyBytes),
|
||||
endKeyArr,
|
||||
jenv->GetArrayLength(endKeyBytes),
|
||||
purgeVersion,
|
||||
(fdb_bool_t)force);
|
||||
jenv->ReleaseByteArrayElements(beginKeyBytes, (jbyte*)beginKeyArr, JNI_ABORT);
|
||||
jenv->ReleaseByteArrayElements(endKeyBytes, (jbyte*)endKeyArr, JNI_ABORT);
|
||||
return (jlong)f;
|
||||
}
|
||||
|
||||
JNIEXPORT jlong JNICALL
|
||||
Java_com_apple_foundationdb_FDBTenant_Tenant_1waitPurgeGranulesComplete(JNIEnv* jenv,
|
||||
jobject,
|
||||
jlong tPtr,
|
||||
jbyteArray purgeKeyBytes) {
|
||||
if (!tPtr || !purgeKeyBytes) {
|
||||
throwParamNotNull(jenv);
|
||||
return 0;
|
||||
}
|
||||
FDBTenant* tenant = (FDBTenant*)tPtr;
|
||||
uint8_t* purgeKeyArr = (uint8_t*)jenv->GetByteArrayElements(purgeKeyBytes, JNI_NULL);
|
||||
|
||||
if (!purgeKeyArr) {
|
||||
if (!jenv->ExceptionOccurred())
|
||||
throwRuntimeEx(jenv, "Error getting handle to native resources");
|
||||
return 0;
|
||||
}
|
||||
FDBFuture* f = fdb_tenant_wait_purge_granules_complete(tenant, purgeKeyArr, jenv->GetArrayLength(purgeKeyBytes));
|
||||
jenv->ReleaseByteArrayElements(purgeKeyBytes, (jbyte*)purgeKeyArr, JNI_ABORT);
|
||||
|
||||
return (jlong)f;
|
||||
}
|
||||
|
||||
JNIEXPORT jlong JNICALL Java_com_apple_foundationdb_FDBTenant_Tenant_1blobbifyRange(JNIEnv* jenv,
|
||||
jobject,
|
||||
jlong tPtr,
|
||||
jbyteArray beginKeyBytes,
|
||||
jbyteArray endKeyBytes) {
|
||||
if (!tPtr || !beginKeyBytes || !endKeyBytes) {
|
||||
throwParamNotNull(jenv);
|
||||
return 0;
|
||||
}
|
||||
FDBTenant* tenant = (FDBTenant*)tPtr;
|
||||
|
||||
uint8_t* beginKeyArr = (uint8_t*)jenv->GetByteArrayElements(beginKeyBytes, JNI_NULL);
|
||||
if (!beginKeyArr) {
|
||||
if (!jenv->ExceptionOccurred())
|
||||
throwRuntimeEx(jenv, "Error getting handle to native resources");
|
||||
return 0;
|
||||
}
|
||||
|
||||
uint8_t* endKeyArr = (uint8_t*)jenv->GetByteArrayElements(endKeyBytes, JNI_NULL);
|
||||
if (!endKeyArr) {
|
||||
jenv->ReleaseByteArrayElements(beginKeyBytes, (jbyte*)beginKeyArr, JNI_ABORT);
|
||||
if (!jenv->ExceptionOccurred())
|
||||
throwRuntimeEx(jenv, "Error getting handle to native resources");
|
||||
return 0;
|
||||
}
|
||||
|
||||
FDBFuture* f = fdb_tenant_blobbify_range(
|
||||
tenant, beginKeyArr, jenv->GetArrayLength(beginKeyBytes), endKeyArr, jenv->GetArrayLength(endKeyBytes));
|
||||
jenv->ReleaseByteArrayElements(beginKeyBytes, (jbyte*)beginKeyArr, JNI_ABORT);
|
||||
jenv->ReleaseByteArrayElements(endKeyBytes, (jbyte*)endKeyArr, JNI_ABORT);
|
||||
return (jlong)f;
|
||||
}
|
||||
|
||||
JNIEXPORT jlong JNICALL Java_com_apple_foundationdb_FDBTenant_Tenant_1unblobbifyRange(JNIEnv* jenv,
|
||||
jobject,
|
||||
jlong tPtr,
|
||||
jbyteArray beginKeyBytes,
|
||||
jbyteArray endKeyBytes) {
|
||||
if (!tPtr || !beginKeyBytes || !endKeyBytes) {
|
||||
throwParamNotNull(jenv);
|
||||
return 0;
|
||||
}
|
||||
FDBTenant* tenant = (FDBTenant*)tPtr;
|
||||
|
||||
uint8_t* beginKeyArr = (uint8_t*)jenv->GetByteArrayElements(beginKeyBytes, JNI_NULL);
|
||||
if (!beginKeyArr) {
|
||||
if (!jenv->ExceptionOccurred())
|
||||
throwRuntimeEx(jenv, "Error getting handle to native resources");
|
||||
return 0;
|
||||
}
|
||||
|
||||
uint8_t* endKeyArr = (uint8_t*)jenv->GetByteArrayElements(endKeyBytes, JNI_NULL);
|
||||
if (!endKeyArr) {
|
||||
jenv->ReleaseByteArrayElements(beginKeyBytes, (jbyte*)beginKeyArr, JNI_ABORT);
|
||||
if (!jenv->ExceptionOccurred())
|
||||
throwRuntimeEx(jenv, "Error getting handle to native resources");
|
||||
return 0;
|
||||
}
|
||||
|
||||
FDBFuture* f = fdb_tenant_unblobbify_range(
|
||||
tenant, beginKeyArr, jenv->GetArrayLength(beginKeyBytes), endKeyArr, jenv->GetArrayLength(endKeyBytes));
|
||||
jenv->ReleaseByteArrayElements(beginKeyBytes, (jbyte*)beginKeyArr, JNI_ABORT);
|
||||
jenv->ReleaseByteArrayElements(endKeyBytes, (jbyte*)endKeyArr, JNI_ABORT);
|
||||
return (jlong)f;
|
||||
}
|
||||
|
||||
JNIEXPORT jlong JNICALL Java_com_apple_foundationdb_FDBTenant_Tenant_1listBlobbifiedRanges(JNIEnv* jenv,
|
||||
jobject,
|
||||
jlong tPtr,
|
||||
jbyteArray beginKeyBytes,
|
||||
jbyteArray endKeyBytes,
|
||||
jint rangeLimit) {
|
||||
if (!tPtr || !beginKeyBytes || !endKeyBytes) {
|
||||
throwParamNotNull(jenv);
|
||||
return 0;
|
||||
}
|
||||
FDBTenant* tenant = (FDBTenant*)tPtr;
|
||||
|
||||
uint8_t* startKey = (uint8_t*)jenv->GetByteArrayElements(beginKeyBytes, JNI_NULL);
|
||||
if (!startKey) {
|
||||
if (!jenv->ExceptionOccurred())
|
||||
throwRuntimeEx(jenv, "Error getting handle to native resources");
|
||||
return 0;
|
||||
}
|
||||
|
||||
uint8_t* endKey = (uint8_t*)jenv->GetByteArrayElements(endKeyBytes, JNI_NULL);
|
||||
if (!endKey) {
|
||||
jenv->ReleaseByteArrayElements(beginKeyBytes, (jbyte*)startKey, JNI_ABORT);
|
||||
if (!jenv->ExceptionOccurred())
|
||||
throwRuntimeEx(jenv, "Error getting handle to native resources");
|
||||
return 0;
|
||||
}
|
||||
|
||||
FDBFuture* f = fdb_tenant_list_blobbified_ranges(
|
||||
tenant, startKey, jenv->GetArrayLength(beginKeyBytes), endKey, jenv->GetArrayLength(endKeyBytes), rangeLimit);
|
||||
jenv->ReleaseByteArrayElements(beginKeyBytes, (jbyte*)startKey, JNI_ABORT);
|
||||
jenv->ReleaseByteArrayElements(endKeyBytes, (jbyte*)endKey, JNI_ABORT);
|
||||
return (jlong)f;
|
||||
}
|
||||
|
||||
JNIEXPORT jlong JNICALL Java_com_apple_foundationdb_FDBTenant_Tenant_1verifyBlobRange(JNIEnv* jenv,
|
||||
jobject,
|
||||
jlong tPtr,
|
||||
jbyteArray beginKeyBytes,
|
||||
jbyteArray endKeyBytes,
|
||||
jlong version) {
|
||||
if (!tPtr || !beginKeyBytes || !endKeyBytes) {
|
||||
throwParamNotNull(jenv);
|
||||
return 0;
|
||||
}
|
||||
FDBTenant* tenant = (FDBTenant*)tPtr;
|
||||
|
||||
uint8_t* startKey = (uint8_t*)jenv->GetByteArrayElements(beginKeyBytes, JNI_NULL);
|
||||
if (!startKey) {
|
||||
if (!jenv->ExceptionOccurred())
|
||||
throwRuntimeEx(jenv, "Error getting handle to native resources");
|
||||
return 0;
|
||||
}
|
||||
|
||||
uint8_t* endKey = (uint8_t*)jenv->GetByteArrayElements(endKeyBytes, JNI_NULL);
|
||||
if (!endKey) {
|
||||
jenv->ReleaseByteArrayElements(beginKeyBytes, (jbyte*)startKey, JNI_ABORT);
|
||||
if (!jenv->ExceptionOccurred())
|
||||
throwRuntimeEx(jenv, "Error getting handle to native resources");
|
||||
return 0;
|
||||
}
|
||||
|
||||
FDBFuture* f = fdb_tenant_verify_blob_range(
|
||||
tenant, startKey, jenv->GetArrayLength(beginKeyBytes), endKey, jenv->GetArrayLength(endKeyBytes), version);
|
||||
jenv->ReleaseByteArrayElements(beginKeyBytes, (jbyte*)startKey, JNI_ABORT);
|
||||
jenv->ReleaseByteArrayElements(endKeyBytes, (jbyte*)endKey, JNI_ABORT);
|
||||
return (jlong)f;
|
||||
}
|
||||
|
||||
JNIEXPORT void JNICALL Java_com_apple_foundationdb_FDBTransaction_Transaction_1setVersion(JNIEnv* jenv,
|
||||
jobject,
|
||||
jlong tPtr,
|
||||
|
|
|
@ -138,6 +138,66 @@ class FDBTenant extends NativeObjectWrapper implements Tenant {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<byte[]> purgeBlobGranules(byte[] beginKey, byte[] endKey, long purgeVersion, boolean force, Executor e) {
|
||||
pointerReadLock.lock();
|
||||
try {
|
||||
return new FutureKey(Tenant_purgeBlobGranules(getPtr(), beginKey, endKey, purgeVersion, force), e, eventKeeper);
|
||||
} finally {
|
||||
pointerReadLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Void> waitPurgeGranulesComplete(byte[] purgeKey, Executor e) {
|
||||
pointerReadLock.lock();
|
||||
try {
|
||||
return new FutureVoid(Tenant_waitPurgeGranulesComplete(getPtr(), purgeKey), e);
|
||||
} finally {
|
||||
pointerReadLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Boolean> blobbifyRange(byte[] beginKey, byte[] endKey, Executor e) {
|
||||
pointerReadLock.lock();
|
||||
try {
|
||||
return new FutureBool(Tenant_blobbifyRange(getPtr(), beginKey, endKey), e);
|
||||
} finally {
|
||||
pointerReadLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Boolean> unblobbifyRange(byte[] beginKey, byte[] endKey, Executor e) {
|
||||
pointerReadLock.lock();
|
||||
try {
|
||||
return new FutureBool(Tenant_unblobbifyRange(getPtr(), beginKey, endKey), e);
|
||||
} finally {
|
||||
pointerReadLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<KeyRangeArrayResult> listBlobbifiedRanges(byte[] beginKey, byte[] endKey, int rangeLimit, Executor e) {
|
||||
pointerReadLock.lock();
|
||||
try {
|
||||
return new FutureKeyRangeArray(Tenant_listBlobbifiedRanges(getPtr(), beginKey, endKey, rangeLimit), e);
|
||||
} finally {
|
||||
pointerReadLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Long> verifyBlobRange(byte[] beginKey, byte[] endKey, long version, Executor e) {
|
||||
pointerReadLock.lock();
|
||||
try {
|
||||
return new FutureInt64(Tenant_verifyBlobRange(getPtr(), beginKey, endKey, version), e);
|
||||
} finally {
|
||||
pointerReadLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte[] getName() {
|
||||
return name;
|
||||
|
@ -155,4 +215,10 @@ class FDBTenant extends NativeObjectWrapper implements Tenant {
|
|||
|
||||
private native long Tenant_createTransaction(long cPtr);
|
||||
private native void Tenant_dispose(long cPtr);
|
||||
private native long Tenant_purgeBlobGranules(long cPtr, byte[] beginKey, byte[] endKey, long purgeVersion, boolean force);
|
||||
private native long Tenant_waitPurgeGranulesComplete(long cPtr, byte[] purgeKey);
|
||||
private native long Tenant_blobbifyRange(long cPtr, byte[] beginKey, byte[] endKey);
|
||||
private native long Tenant_unblobbifyRange(long cPtr, byte[] beginKey, byte[] endKey);
|
||||
private native long Tenant_listBlobbifiedRanges(long cPtr, byte[] beginKey, byte[] endKey, int rangeLimit);
|
||||
private native long Tenant_verifyBlobRange(long cPtr, byte[] beginKey, byte[] endKey, long version);
|
||||
}
|
|
@ -247,6 +247,173 @@ public interface Tenant extends AutoCloseable, TransactionContext {
|
|||
<T> CompletableFuture<T> runAsync(
|
||||
Function<? super Transaction, ? extends CompletableFuture<T>> retryable, Executor e);
|
||||
|
||||
|
||||
/**
|
||||
* Runs {@link #purgeBlobGranules(Function)} on the default executor.
|
||||
*
|
||||
* @param beginKey start of the key range
|
||||
* @param endKey end of the key range
|
||||
* @param force if true delete all data, if not keep data >= purgeVersion
|
||||
*
|
||||
* @return the key to watch for purge complete
|
||||
*/
|
||||
default CompletableFuture<byte[]> purgeBlobGranules(byte[] beginKey, byte[] endKey, boolean force) {
|
||||
return purgeBlobGranules(beginKey, endKey, -2, force, getExecutor());
|
||||
}
|
||||
|
||||
/**
|
||||
* Runs {@link #purgeBlobGranules(Function)} on the default executor.
|
||||
*
|
||||
* @param beginKey start of the key range
|
||||
* @param endKey end of the key range
|
||||
* @param purgeVersion version to purge at
|
||||
* @param force if true delete all data, if not keep data >= purgeVersion
|
||||
*
|
||||
* @return the key to watch for purge complete
|
||||
*/
|
||||
default CompletableFuture<byte[]> purgeBlobGranules(byte[] beginKey, byte[] endKey, long purgeVersion, boolean force) {
|
||||
return purgeBlobGranules(beginKey, endKey, purgeVersion, force, getExecutor());
|
||||
}
|
||||
|
||||
/**
|
||||
* Queues a purge of blob granules for specified key range of this tenant, at the specified version.
|
||||
*
|
||||
* @param beginKey start of the key range
|
||||
* @param endKey end of the key range
|
||||
* @param purgeVersion version to purge at
|
||||
* @param force if true delete all data, if not keep data >= purgeVersion
|
||||
* @param e the {@link Executor} to use for asynchronous callbacks
|
||||
|
||||
* @return the key to watch for purge complete
|
||||
*/
|
||||
CompletableFuture<byte[]> purgeBlobGranules(byte[] beginKey, byte[] endKey, long purgeVersion, boolean force, Executor e);
|
||||
|
||||
|
||||
/**
|
||||
* Runs {@link #waitPurgeGranulesComplete(Function)} on the default executor.
|
||||
*
|
||||
* @param purgeKey key to watch
|
||||
*/
|
||||
default CompletableFuture<Void> waitPurgeGranulesComplete(byte[] purgeKey) {
|
||||
return waitPurgeGranulesComplete(purgeKey, getExecutor());
|
||||
}
|
||||
|
||||
/**
|
||||
* Wait for a previous call to purgeBlobGranules to complete.
|
||||
*
|
||||
* @param purgeKey key to watch
|
||||
* @param e the {@link Executor} to use for asynchronous callbacks
|
||||
*/
|
||||
CompletableFuture<Void> waitPurgeGranulesComplete(byte[] purgeKey, Executor e);
|
||||
|
||||
/**
|
||||
* Runs {@link #blobbifyRange(Function)} on the default executor.
|
||||
*
|
||||
* @param beginKey start of the key range
|
||||
* @param endKey end of the key range
|
||||
|
||||
* @return if the recording of the range was successful
|
||||
*/
|
||||
default CompletableFuture<Boolean> blobbifyRange(byte[] beginKey, byte[] endKey) {
|
||||
return blobbifyRange(beginKey, endKey, getExecutor());
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets a range to be blobbified in this tenant. Must be a completely unblobbified range.
|
||||
*
|
||||
* @param beginKey start of the key range
|
||||
* @param endKey end of the key range
|
||||
* @param e the {@link Executor} to use for asynchronous callbacks
|
||||
|
||||
* @return if the recording of the range was successful
|
||||
*/
|
||||
CompletableFuture<Boolean> blobbifyRange(byte[] beginKey, byte[] endKey, Executor e);
|
||||
|
||||
/**
|
||||
* Runs {@link #unblobbifyRange(Function)} on the default executor.
|
||||
*
|
||||
* @param beginKey start of the key range
|
||||
* @param endKey end of the key range
|
||||
|
||||
* @return if the recording of the range was successful
|
||||
*/
|
||||
default CompletableFuture<Boolean> unblobbifyRange(byte[] beginKey, byte[] endKey) {
|
||||
return unblobbifyRange(beginKey, endKey, getExecutor());
|
||||
}
|
||||
|
||||
/**
|
||||
* Unsets a blobbified range in this tenant. The range must be aligned to known blob ranges.
|
||||
*
|
||||
* @param beginKey start of the key range
|
||||
* @param endKey end of the key range
|
||||
* @param e the {@link Executor} to use for asynchronous callbacks
|
||||
|
||||
* @return if the recording of the range was successful
|
||||
*/
|
||||
CompletableFuture<Boolean> unblobbifyRange(byte[] beginKey, byte[] endKey, Executor e);
|
||||
|
||||
/**
|
||||
* Runs {@link #listBlobbifiedRanges(Function)} on the default executor.
|
||||
*
|
||||
* @param beginKey start of the key range
|
||||
* @param endKey end of the key range
|
||||
* @param rangeLimit batch size
|
||||
* @param e the {@link Executor} to use for asynchronous callbacks
|
||||
|
||||
* @return a future with the list of blobbified ranges: [lastLessThan(beginKey), firstGreaterThanOrEqual(endKey)]
|
||||
*/
|
||||
default CompletableFuture<KeyRangeArrayResult> listBlobbifiedRanges(byte[] beginKey, byte[] endKey, int rangeLimit) {
|
||||
return listBlobbifiedRanges(beginKey, endKey, rangeLimit, getExecutor());
|
||||
}
|
||||
|
||||
/**
|
||||
* Lists blobbified ranges in this tenant. There may be more if result.size() == rangeLimit.
|
||||
*
|
||||
* @param beginKey start of the key range
|
||||
* @param endKey end of the key range
|
||||
* @param rangeLimit batch size
|
||||
* @param e the {@link Executor} to use for asynchronous callbacks
|
||||
|
||||
* @return a future with the list of blobbified ranges: [lastLessThan(beginKey), firstGreaterThanOrEqual(endKey)]
|
||||
*/
|
||||
CompletableFuture<KeyRangeArrayResult> listBlobbifiedRanges(byte[] beginKey, byte[] endKey, int rangeLimit, Executor e);
|
||||
|
||||
/**
|
||||
* Runs {@link #verifyBlobRange(Function)} on the default executor.
|
||||
*
|
||||
* @param beginKey start of the key range
|
||||
* @param endKey end of the key range
|
||||
*
|
||||
* @return a future with the version of the last blob granule.
|
||||
*/
|
||||
default CompletableFuture<Long> verifyBlobRange(byte[] beginKey, byte[] endKey) {
|
||||
return verifyBlobRange(beginKey, endKey, -2, getExecutor());
|
||||
}
|
||||
|
||||
/**
|
||||
* Runs {@link #verifyBlobRange(Function)} on the default executor.
|
||||
*
|
||||
* @param beginKey start of the key range
|
||||
* @param endKey end of the key range
|
||||
* @param version version to read at
|
||||
*
|
||||
* @return a future with the version of the last blob granule.
|
||||
*/
|
||||
default CompletableFuture<Long> verifyBlobRange(byte[] beginKey, byte[] endKey, long version) {
|
||||
return verifyBlobRange(beginKey, endKey, version, getExecutor());
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks if a blob range is blobbified in this tenant.
|
||||
*
|
||||
* @param beginKey start of the key range
|
||||
* @param endKey end of the key range
|
||||
* @param version version to read at
|
||||
*
|
||||
* @return a future with the version of the last blob granule.
|
||||
*/
|
||||
CompletableFuture<Long> verifyBlobRange(byte[] beginKey, byte[] endKey, long version, Executor e);
|
||||
|
||||
/**
|
||||
* Close the {@code Tenant} object and release any associated resources. This must be called at
|
||||
* least once after the {@code Tenant} object is no longer in use. This can be called multiple
|
||||
|
|
|
@ -74,7 +74,7 @@ function(compile_boost)
|
|||
BUILD_IN_SOURCE ON
|
||||
INSTALL_COMMAND ""
|
||||
UPDATE_COMMAND ""
|
||||
BUILD_BYPRODUCTS "${BOOST_INSTALL_DIR}/boost/config.hpp"
|
||||
BUILD_BYPRODUCTS "${BOOST_INSTALL_DIR}/include/boost/config.hpp"
|
||||
"${BOOST_INSTALL_DIR}/lib/libboost_context.a"
|
||||
"${BOOST_INSTALL_DIR}/lib/libboost_filesystem.a"
|
||||
"${BOOST_INSTALL_DIR}/lib/libboost_iostreams.a")
|
||||
|
|
|
@ -42,9 +42,10 @@ class ToSummaryTree(xml.sax.handler.ContentHandler):
|
|||
|
||||
def _print_summary(summary: SummaryTree, commands: Set[str]):
|
||||
cmd = []
|
||||
is_valgrind_run = False
|
||||
if config.reproduce_prefix is not None:
|
||||
cmd.append(config.reproduce_prefix)
|
||||
cmd.append('fdbserver')
|
||||
cmd.append('bin/fdbserver')
|
||||
if 'TestFile' in summary.attributes:
|
||||
file_name = summary.attributes['TestFile']
|
||||
role = 'test' if test_harness.run.is_no_sim(Path(file_name)) else 'simulation'
|
||||
|
@ -63,11 +64,6 @@ def _print_summary(summary: SummaryTree, commands: Set[str]):
|
|||
else:
|
||||
cmd += ['b', '<ERROR>']
|
||||
cmd += ['--crash', '--trace_format', config.trace_format]
|
||||
key = ' '.join(cmd)
|
||||
count = 1
|
||||
while key in commands:
|
||||
key = '{} # {}'.format(' '.join(cmd), count)
|
||||
count += 1
|
||||
# we want the command as the first attribute
|
||||
attributes = {'Command': ' '.join(cmd)}
|
||||
for k, v in summary.attributes.items():
|
||||
|
@ -76,18 +72,6 @@ def _print_summary(summary: SummaryTree, commands: Set[str]):
|
|||
else:
|
||||
attributes[k] = v
|
||||
summary.attributes = attributes
|
||||
if config.details:
|
||||
key = str(len(commands))
|
||||
str_io = io.StringIO()
|
||||
summary.dump(str_io, prefix=(' ' if config.pretty_print else ''))
|
||||
if config.output_format == 'json':
|
||||
sys.stdout.write('{}"Test{}": {}'.format(' ' if config.pretty_print else '',
|
||||
key, str_io.getvalue()))
|
||||
else:
|
||||
sys.stdout.write(str_io.getvalue())
|
||||
if config.pretty_print:
|
||||
sys.stdout.write('\n' if config.output_format == 'xml' else ',\n')
|
||||
return key
|
||||
error_count = 0
|
||||
warning_count = 0
|
||||
small_summary = SummaryTree('Test')
|
||||
|
@ -98,6 +82,8 @@ def _print_summary(summary: SummaryTree, commands: Set[str]):
|
|||
for child in summary.children:
|
||||
if 'Severity' in child.attributes and child.attributes['Severity'] == '40' and error_count < config.max_errors:
|
||||
error_count += 1
|
||||
if errors.name == 'ValgrindError':
|
||||
is_valgrind_run = True
|
||||
errors.append(child)
|
||||
if 'Severity' in child.attributes and child.attributes[
|
||||
'Severity'] == '30' and warning_count < config.max_warnings:
|
||||
|
@ -122,6 +108,26 @@ def _print_summary(summary: SummaryTree, commands: Set[str]):
|
|||
small_summary.children.append(errors)
|
||||
if len(warnings.children) > 0:
|
||||
small_summary.children.append(warnings)
|
||||
if is_valgrind_run:
|
||||
idx = 0 if config.reproduce_prefix is None else 1
|
||||
cmd.insert(idx, 'valgrind')
|
||||
key = ' '.join(cmd)
|
||||
count = 1
|
||||
while key in commands:
|
||||
key = '{} # {}'.format(' '.join(cmd), count)
|
||||
count += 1
|
||||
if config.details:
|
||||
key = str(len(commands))
|
||||
str_io = io.StringIO()
|
||||
summary.dump(str_io, prefix=(' ' if config.pretty_print else ''))
|
||||
if config.output_format == 'json':
|
||||
sys.stdout.write('{}"Test{}": {}'.format(' ' if config.pretty_print else '',
|
||||
key, str_io.getvalue()))
|
||||
else:
|
||||
sys.stdout.write(str_io.getvalue())
|
||||
if config.pretty_print:
|
||||
sys.stdout.write('\n' if config.output_format == 'xml' else ',\n')
|
||||
return key
|
||||
output = io.StringIO()
|
||||
small_summary.dump(output, prefix=(' ' if config.pretty_print else ''))
|
||||
if config.output_format == 'json':
|
||||
|
|
|
@ -490,6 +490,73 @@ ThreadFuture<Void> DLTenant::waitPurgeGranulesComplete(const KeyRef& purgeKey) {
|
|||
return toThreadFuture<Void>(api, f, [](FdbCApi::FDBFuture* f, FdbCApi* api) { return Void(); });
|
||||
}
|
||||
|
||||
ThreadFuture<bool> DLTenant::blobbifyRange(const KeyRangeRef& keyRange) {
|
||||
if (!api->tenantBlobbifyRange) {
|
||||
return unsupported_operation();
|
||||
}
|
||||
|
||||
FdbCApi::FDBFuture* f = api->tenantBlobbifyRange(
|
||||
tenant, keyRange.begin.begin(), keyRange.begin.size(), keyRange.end.begin(), keyRange.end.size());
|
||||
|
||||
return toThreadFuture<bool>(api, f, [](FdbCApi::FDBFuture* f, FdbCApi* api) {
|
||||
FdbCApi::fdb_bool_t ret = false;
|
||||
ASSERT(!api->futureGetBool(f, &ret));
|
||||
return ret;
|
||||
});
|
||||
}
|
||||
|
||||
ThreadFuture<bool> DLTenant::unblobbifyRange(const KeyRangeRef& keyRange) {
|
||||
if (!api->tenantUnblobbifyRange) {
|
||||
return unsupported_operation();
|
||||
}
|
||||
|
||||
FdbCApi::FDBFuture* f = api->tenantUnblobbifyRange(
|
||||
tenant, keyRange.begin.begin(), keyRange.begin.size(), keyRange.end.begin(), keyRange.end.size());
|
||||
|
||||
return toThreadFuture<bool>(api, f, [](FdbCApi::FDBFuture* f, FdbCApi* api) {
|
||||
FdbCApi::fdb_bool_t ret = false;
|
||||
ASSERT(!api->futureGetBool(f, &ret));
|
||||
return ret;
|
||||
});
|
||||
}
|
||||
|
||||
ThreadFuture<Standalone<VectorRef<KeyRangeRef>>> DLTenant::listBlobbifiedRanges(const KeyRangeRef& keyRange,
|
||||
int rangeLimit) {
|
||||
if (!api->tenantListBlobbifiedRanges) {
|
||||
return unsupported_operation();
|
||||
}
|
||||
|
||||
FdbCApi::FDBFuture* f = api->tenantListBlobbifiedRanges(
|
||||
tenant, keyRange.begin.begin(), keyRange.begin.size(), keyRange.end.begin(), keyRange.end.size(), rangeLimit);
|
||||
|
||||
return toThreadFuture<Standalone<VectorRef<KeyRangeRef>>>(api, f, [](FdbCApi::FDBFuture* f, FdbCApi* api) {
|
||||
const FdbCApi::FDBKeyRange* keyRanges;
|
||||
int keyRangesLength;
|
||||
FdbCApi::fdb_error_t error = api->futureGetKeyRangeArray(f, &keyRanges, &keyRangesLength);
|
||||
ASSERT(!error);
|
||||
// The memory for this is stored in the FDBFuture and is released when the future gets destroyed.
|
||||
return Standalone<VectorRef<KeyRangeRef>>(VectorRef<KeyRangeRef>((KeyRangeRef*)keyRanges, keyRangesLength),
|
||||
Arena());
|
||||
});
|
||||
}
|
||||
|
||||
ThreadFuture<Version> DLTenant::verifyBlobRange(const KeyRangeRef& keyRange, Optional<Version> version) {
|
||||
if (!api->tenantVerifyBlobRange) {
|
||||
return unsupported_operation();
|
||||
}
|
||||
|
||||
Version readVersion = version.present() ? version.get() : latestVersion;
|
||||
|
||||
FdbCApi::FDBFuture* f = api->tenantVerifyBlobRange(
|
||||
tenant, keyRange.begin.begin(), keyRange.begin.size(), keyRange.end.begin(), keyRange.end.size(), readVersion);
|
||||
|
||||
return toThreadFuture<Version>(api, f, [](FdbCApi::FDBFuture* f, FdbCApi* api) {
|
||||
Version version = invalidVersion;
|
||||
ASSERT(!api->futureGetInt64(f, &version));
|
||||
return version;
|
||||
});
|
||||
}
|
||||
|
||||
// DLDatabase
|
||||
DLDatabase::DLDatabase(Reference<FdbCApi> api, ThreadFuture<FdbCApi::FDBDatabase*> dbFuture) : api(api), db(nullptr) {
|
||||
addref();
|
||||
|
@ -827,12 +894,32 @@ void DLApi::init() {
|
|||
lib,
|
||||
fdbCPath,
|
||||
"fdb_tenant_purge_blob_granules",
|
||||
headerVersion >= ApiVersion::withBlobRangeApi().version());
|
||||
headerVersion >= ApiVersion::withTenantBlobRangeApi().version());
|
||||
loadClientFunction(&api->tenantWaitPurgeGranulesComplete,
|
||||
lib,
|
||||
fdbCPath,
|
||||
"fdb_tenant_wait_purge_granules_complete",
|
||||
headerVersion >= ApiVersion::withBlobRangeApi().version());
|
||||
headerVersion >= ApiVersion::withTenantBlobRangeApi().version());
|
||||
loadClientFunction(&api->tenantBlobbifyRange,
|
||||
lib,
|
||||
fdbCPath,
|
||||
"fdb_tenant_blobbify_range",
|
||||
headerVersion >= ApiVersion::withTenantBlobRangeApi().version());
|
||||
loadClientFunction(&api->tenantUnblobbifyRange,
|
||||
lib,
|
||||
fdbCPath,
|
||||
"fdb_tenant_unblobbify_range",
|
||||
headerVersion >= ApiVersion::withTenantBlobRangeApi().version());
|
||||
loadClientFunction(&api->tenantListBlobbifiedRanges,
|
||||
lib,
|
||||
fdbCPath,
|
||||
"fdb_tenant_list_blobbified_ranges",
|
||||
headerVersion >= ApiVersion::withTenantBlobRangeApi().version());
|
||||
loadClientFunction(&api->tenantVerifyBlobRange,
|
||||
lib,
|
||||
fdbCPath,
|
||||
"fdb_tenant_verify_blob_range",
|
||||
headerVersion >= ApiVersion::withTenantBlobRangeApi().version());
|
||||
loadClientFunction(&api->tenantDestroy, lib, fdbCPath, "fdb_tenant_destroy", headerVersion >= 710);
|
||||
|
||||
loadClientFunction(&api->transactionSetOption, lib, fdbCPath, "fdb_transaction_set_option", headerVersion >= 0);
|
||||
|
@ -1608,13 +1695,41 @@ Reference<ITransaction> MultiVersionTenant::createTransaction() {
|
|||
}
|
||||
|
||||
ThreadFuture<Key> MultiVersionTenant::purgeBlobGranules(const KeyRangeRef& keyRange, Version purgeVersion, bool force) {
|
||||
auto f = tenantState->db ? tenantState->db->purgeBlobGranules(keyRange, purgeVersion, force)
|
||||
: ThreadFuture<Key>(Never());
|
||||
return abortableFuture(f, tenantState->db->dbState->dbVar->get().onChange);
|
||||
auto tenantDb = tenantState->tenantVar->get();
|
||||
auto f =
|
||||
tenantDb.value ? tenantDb.value->purgeBlobGranules(keyRange, purgeVersion, force) : ThreadFuture<Key>(Never());
|
||||
return abortableFuture(f, tenantDb.onChange);
|
||||
}
|
||||
ThreadFuture<Void> MultiVersionTenant::waitPurgeGranulesComplete(const KeyRef& purgeKey) {
|
||||
auto f = tenantState->db ? tenantState->db->waitPurgeGranulesComplete(purgeKey) : ThreadFuture<Void>(Never());
|
||||
return abortableFuture(f, tenantState->db->dbState->dbVar->get().onChange);
|
||||
auto tenantDb = tenantState->tenantVar->get();
|
||||
auto f = tenantDb.value ? tenantDb.value->waitPurgeGranulesComplete(purgeKey) : ThreadFuture<Void>(Never());
|
||||
return abortableFuture(f, tenantDb.onChange);
|
||||
}
|
||||
|
||||
ThreadFuture<bool> MultiVersionTenant::blobbifyRange(const KeyRangeRef& keyRange) {
|
||||
auto tenantDb = tenantState->tenantVar->get();
|
||||
auto f = tenantDb.value ? tenantDb.value->blobbifyRange(keyRange) : ThreadFuture<bool>(Never());
|
||||
return abortableFuture(f, tenantDb.onChange);
|
||||
}
|
||||
|
||||
ThreadFuture<bool> MultiVersionTenant::unblobbifyRange(const KeyRangeRef& keyRange) {
|
||||
auto tenantDb = tenantState->tenantVar->get();
|
||||
auto f = tenantDb.value ? tenantDb.value->unblobbifyRange(keyRange) : ThreadFuture<bool>(Never());
|
||||
return abortableFuture(f, tenantDb.onChange);
|
||||
}
|
||||
|
||||
ThreadFuture<Standalone<VectorRef<KeyRangeRef>>> MultiVersionTenant::listBlobbifiedRanges(const KeyRangeRef& keyRange,
|
||||
int rangeLimit) {
|
||||
auto tenantDb = tenantState->tenantVar->get();
|
||||
auto f = tenantDb.value ? tenantDb.value->listBlobbifiedRanges(keyRange, rangeLimit)
|
||||
: ThreadFuture<Standalone<VectorRef<KeyRangeRef>>>(Never());
|
||||
return abortableFuture(f, tenantDb.onChange);
|
||||
}
|
||||
|
||||
ThreadFuture<Version> MultiVersionTenant::verifyBlobRange(const KeyRangeRef& keyRange, Optional<Version> version) {
|
||||
auto tenantDb = tenantState->tenantVar->get();
|
||||
auto f = tenantDb.value ? tenantDb.value->verifyBlobRange(keyRange, version) : ThreadFuture<Version>(Never());
|
||||
return abortableFuture(f, tenantDb.onChange);
|
||||
}
|
||||
|
||||
MultiVersionTenant::TenantState::TenantState(Reference<MultiVersionDatabase> db, TenantNameRef tenantName)
|
||||
|
|
|
@ -7865,25 +7865,46 @@ ACTOR Future<Standalone<VectorRef<KeyRef>>> getRangeSplitPoints(Reference<Transa
|
|||
}
|
||||
|
||||
// kind of a hack, but necessary to work around needing to access system keys in a tenant-enabled transaction
|
||||
ACTOR Future<TenantMapEntry> blobGranuleGetTenantEntry(Transaction* self, Key rangeStartKey) {
|
||||
ACTOR Future<TenantMapEntry> blobGranuleGetTenantEntry(Transaction* self,
|
||||
Key rangeStartKey,
|
||||
Optional<TenantName> tenantName) {
|
||||
ASSERT(tenantName.present() || self->getTenant().present());
|
||||
TenantName tName = tenantName.present() ? tenantName.get() : self->getTenant().get();
|
||||
state TenantMapEntry tme;
|
||||
|
||||
Optional<KeyRangeLocationInfo> cachedLocationInfo =
|
||||
self->trState->cx->getCachedLocation(self->getTenant().get(), rangeStartKey, Reverse::False);
|
||||
self->trState->cx->getCachedLocation(tName, rangeStartKey, Reverse::False);
|
||||
if (!cachedLocationInfo.present()) {
|
||||
// If we're passing in a tenant, use that and do not touch the transaction.
|
||||
TenantInfo tInfo;
|
||||
if (tenantName.present()) {
|
||||
tInfo = TenantInfo(tName, {}, TenantInfo::INVALID_TENANT);
|
||||
} else {
|
||||
tInfo = self->trState->getTenantInfo(AllowInvalidTenantID::True);
|
||||
}
|
||||
KeyRangeLocationInfo l = wait(getKeyLocation_internal(
|
||||
self->trState->cx,
|
||||
self->trState->getTenantInfo(AllowInvalidTenantID::True),
|
||||
tInfo,
|
||||
rangeStartKey,
|
||||
self->trState->spanContext,
|
||||
self->trState->readOptions.present() ? self->trState->readOptions.get().debugID : Optional<UID>(),
|
||||
self->trState->useProvisionalProxies,
|
||||
Reverse::False,
|
||||
latestVersion));
|
||||
self->trState->trySetTenantId(l.tenantEntry.id);
|
||||
return l.tenantEntry;
|
||||
tme = l.tenantEntry;
|
||||
} else {
|
||||
self->trState->trySetTenantId(cachedLocationInfo.get().tenantEntry.id);
|
||||
return cachedLocationInfo.get().tenantEntry;
|
||||
tme = cachedLocationInfo.get().tenantEntry;
|
||||
}
|
||||
|
||||
if (tme.id == TenantInfo::INVALID_TENANT) {
|
||||
throw tenant_not_found();
|
||||
}
|
||||
|
||||
// Modify transaction if desired.
|
||||
if (!tenantName.present()) {
|
||||
self->trState->trySetTenantId(tme.id);
|
||||
}
|
||||
return tme;
|
||||
}
|
||||
|
||||
Future<Standalone<VectorRef<KeyRef>>> Transaction::getRangeSplitPoints(KeyRange const& keys, int64_t chunkSize) {
|
||||
|
@ -7908,7 +7929,7 @@ ACTOR Future<Standalone<VectorRef<KeyRangeRef>>> getBlobGranuleRangesActor(Trans
|
|||
|
||||
if (self->getTenant().present()) {
|
||||
// have to bypass tenant to read system key space, and add tenant prefix to part of mapping
|
||||
TenantMapEntry tenantEntry = wait(blobGranuleGetTenantEntry(self, currentRange.begin));
|
||||
TenantMapEntry tenantEntry = wait(blobGranuleGetTenantEntry(self, currentRange.begin, {}));
|
||||
tenantPrefix = tenantEntry.prefix;
|
||||
} else {
|
||||
self->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
|
@ -8011,7 +8032,7 @@ ACTOR Future<Standalone<VectorRef<BlobGranuleChunkRef>>> readBlobGranulesActor(
|
|||
|
||||
if (self->getTenant().present()) {
|
||||
// have to bypass tenant to read system key space, and add tenant prefix to part of mapping
|
||||
TenantMapEntry tenantEntry = wait(blobGranuleGetTenantEntry(self, range.begin));
|
||||
TenantMapEntry tenantEntry = wait(blobGranuleGetTenantEntry(self, range.begin, {}));
|
||||
tenantPrefix = tenantEntry.prefix;
|
||||
Standalone<StringRef> mappingPrefix = tenantEntry.prefix.withPrefix(blobGranuleMappingKeys.begin);
|
||||
|
||||
|
@ -8342,7 +8363,10 @@ ACTOR Future<Version> checkBlobSubrange(Database db, KeyRange keyRange, Optional
|
|||
}
|
||||
}
|
||||
|
||||
ACTOR Future<Version> verifyBlobRangeActor(Reference<DatabaseContext> cx, KeyRange range, Optional<Version> version) {
|
||||
ACTOR Future<Version> verifyBlobRangeActor(Reference<DatabaseContext> cx,
|
||||
KeyRange range,
|
||||
Optional<Version> version,
|
||||
Optional<TenantName> tenantName) {
|
||||
state Database db(cx);
|
||||
state Transaction tr(db);
|
||||
state Standalone<VectorRef<KeyRangeRef>> allRanges;
|
||||
|
@ -8350,6 +8374,7 @@ ACTOR Future<Version> verifyBlobRangeActor(Reference<DatabaseContext> cx, KeyRan
|
|||
state Version readVersionOut = invalidVersion;
|
||||
state int batchSize = BUGGIFY ? deterministicRandom()->randomInt(2, 10) : CLIENT_KNOBS->BG_TOO_MANY_GRANULES / 2;
|
||||
state int loadSize = (BUGGIFY ? deterministicRandom()->randomInt(1, 20) : 20) * batchSize;
|
||||
state bool loadedTenantEntry = false;
|
||||
|
||||
if (version.present()) {
|
||||
if (version.get() == latestVersion) {
|
||||
|
@ -8373,6 +8398,12 @@ ACTOR Future<Version> verifyBlobRangeActor(Reference<DatabaseContext> cx, KeyRan
|
|||
if (curRegion.begin >= range.end) {
|
||||
return readVersionOut;
|
||||
}
|
||||
if (tenantName.present() && !loadedTenantEntry) {
|
||||
TenantMapEntry tenantEntry = wait(blobGranuleGetTenantEntry(&tr, range.begin, tenantName));
|
||||
loadedTenantEntry = true;
|
||||
range = range.withPrefix(tenantEntry.prefix);
|
||||
curRegion = KeyRangeRef(range.begin, range.begin);
|
||||
}
|
||||
loop {
|
||||
try {
|
||||
wait(store(allRanges, tr.getBlobGranuleRanges(KeyRangeRef(curRegion.begin, range.end), loadSize)));
|
||||
|
@ -8424,8 +8455,10 @@ ACTOR Future<Version> verifyBlobRangeActor(Reference<DatabaseContext> cx, KeyRan
|
|||
}
|
||||
}
|
||||
|
||||
Future<Version> DatabaseContext::verifyBlobRange(const KeyRange& range, Optional<Version> version) {
|
||||
return verifyBlobRangeActor(Reference<DatabaseContext>::addRef(this), range, version);
|
||||
Future<Version> DatabaseContext::verifyBlobRange(const KeyRange& range,
|
||||
Optional<Version> version,
|
||||
Optional<TenantName> tenantName) {
|
||||
return verifyBlobRangeActor(Reference<DatabaseContext>::addRef(this), range, version, tenantName);
|
||||
}
|
||||
|
||||
ACTOR Future<std::vector<std::pair<UID, StorageWiggleValue>>> readStorageWiggleValues(Database cx,
|
||||
|
@ -10492,7 +10525,7 @@ ACTOR Future<Key> purgeBlobGranulesActor(Reference<DatabaseContext> db,
|
|||
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
|
||||
|
||||
if (tenant.present() && !loadedTenantPrefix) {
|
||||
TenantMapEntry tenantEntry = wait(blobGranuleGetTenantEntry(&tr, range.begin));
|
||||
TenantMapEntry tenantEntry = wait(blobGranuleGetTenantEntry(&tr, range.begin, tenant));
|
||||
loadedTenantPrefix = true;
|
||||
purgeRange = purgeRange.withPrefix(tenantEntry.prefix);
|
||||
}
|
||||
|
@ -10611,9 +10644,13 @@ ACTOR Future<Standalone<VectorRef<KeyRangeRef>>> getBlobRanges(Reference<ReadYou
|
|||
}
|
||||
}
|
||||
|
||||
ACTOR Future<bool> setBlobRangeActor(Reference<DatabaseContext> cx, KeyRange range, bool active) {
|
||||
ACTOR Future<bool> setBlobRangeActor(Reference<DatabaseContext> cx,
|
||||
KeyRange range,
|
||||
bool active,
|
||||
Optional<TenantName> tenantName) {
|
||||
state Database db(cx);
|
||||
state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(db);
|
||||
state bool loadedTenantEntry = false;
|
||||
|
||||
state Value value = active ? blobRangeActive : blobRangeInactive;
|
||||
loop {
|
||||
|
@ -10621,6 +10658,13 @@ ACTOR Future<bool> setBlobRangeActor(Reference<DatabaseContext> cx, KeyRange ran
|
|||
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
|
||||
|
||||
if (tenantName.present() && !loadedTenantEntry) {
|
||||
TenantMapEntry tenantEntry =
|
||||
wait(blobGranuleGetTenantEntry(&tr->getTransaction(), range.begin, tenantName));
|
||||
loadedTenantEntry = true;
|
||||
range = range.withPrefix(tenantEntry.prefix);
|
||||
}
|
||||
|
||||
Standalone<VectorRef<KeyRangeRef>> startBlobRanges = wait(getBlobRanges(tr, range, 1));
|
||||
|
||||
if (active) {
|
||||
|
@ -10657,27 +10701,59 @@ ACTOR Future<bool> setBlobRangeActor(Reference<DatabaseContext> cx, KeyRange ran
|
|||
}
|
||||
}
|
||||
|
||||
Future<bool> DatabaseContext::blobbifyRange(KeyRange range) {
|
||||
return setBlobRangeActor(Reference<DatabaseContext>::addRef(this), range, true);
|
||||
Future<bool> DatabaseContext::blobbifyRange(KeyRange range, Optional<TenantName> tenantName) {
|
||||
return setBlobRangeActor(Reference<DatabaseContext>::addRef(this), range, true, tenantName);
|
||||
}
|
||||
|
||||
Future<bool> DatabaseContext::unblobbifyRange(KeyRange range) {
|
||||
return setBlobRangeActor(Reference<DatabaseContext>::addRef(this), range, false);
|
||||
Future<bool> DatabaseContext::unblobbifyRange(KeyRange range, Optional<TenantName> tenantName) {
|
||||
return setBlobRangeActor(Reference<DatabaseContext>::addRef(this), range, false, tenantName);
|
||||
}
|
||||
|
||||
ACTOR Future<Standalone<VectorRef<KeyRangeRef>>> listBlobbifiedRangesActor(Reference<DatabaseContext> cx,
|
||||
KeyRange range,
|
||||
int rangeLimit) {
|
||||
int rangeLimit,
|
||||
Optional<TenantName> tenantName) {
|
||||
state Database db(cx);
|
||||
state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(db);
|
||||
state TenantMapEntry tme;
|
||||
|
||||
loop {
|
||||
try {
|
||||
if (tenantName.present()) {
|
||||
wait(store(tme, blobGranuleGetTenantEntry(&tr->getTransaction(), range.begin, tenantName)));
|
||||
range = range.withPrefix(tme.prefix);
|
||||
}
|
||||
break;
|
||||
} catch (Error& e) {
|
||||
wait(tr->onError(e));
|
||||
}
|
||||
}
|
||||
|
||||
state Standalone<VectorRef<KeyRangeRef>> blobRanges = wait(getBlobRanges(tr, range, rangeLimit));
|
||||
|
||||
if (!tenantName.present()) {
|
||||
return blobRanges;
|
||||
}
|
||||
|
||||
Future<Standalone<VectorRef<KeyRangeRef>>> DatabaseContext::listBlobbifiedRanges(KeyRange range, int rowLimit) {
|
||||
return listBlobbifiedRangesActor(Reference<DatabaseContext>::addRef(this), range, rowLimit);
|
||||
// Strip tenant prefix out.
|
||||
state Standalone<VectorRef<KeyRangeRef>> tenantBlobRanges;
|
||||
for (auto& blobRange : blobRanges) {
|
||||
// Filter out blob ranges that span tenants for some reason.
|
||||
if (!blobRange.begin.startsWith(tme.prefix) || !blobRange.end.startsWith(tme.prefix)) {
|
||||
TraceEvent("ListBlobbifiedRangeSpansTenants")
|
||||
.suppressFor(/*seconds=*/5)
|
||||
.detail("Tenant", tenantName.get())
|
||||
.detail("Range", blobRange);
|
||||
continue;
|
||||
}
|
||||
tenantBlobRanges.push_back_deep(tenantBlobRanges.arena(), blobRange.removePrefix(tme.prefix));
|
||||
}
|
||||
return tenantBlobRanges;
|
||||
}
|
||||
|
||||
Future<Standalone<VectorRef<KeyRangeRef>>> DatabaseContext::listBlobbifiedRanges(KeyRange range,
|
||||
int rowLimit,
|
||||
Optional<TenantName> tenantName) {
|
||||
return listBlobbifiedRangesActor(Reference<DatabaseContext>::addRef(this), range, rowLimit, tenantName);
|
||||
}
|
||||
|
||||
int64_t getMaxKeySize(KeyRef const& key) {
|
||||
|
|
|
@ -726,6 +726,8 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
|
|||
init( GLOBAL_TAG_THROTTLING_MIN_RATE, 1.0 );
|
||||
init( GLOBAL_TAG_THROTTLING_FOLDING_TIME, 10.0 );
|
||||
init( GLOBAL_TAG_THROTTLING_RW_FUNGIBILITY_RATIO, 5.0 );
|
||||
init( GLOBAL_TAG_THROTTLING_MAX_TAGS_TRACKED, 10 );
|
||||
init( GLOBAL_TAG_THROTTLING_TAG_EXPIRE_AFTER, 240.0 );
|
||||
|
||||
//Storage Metrics
|
||||
init( STORAGE_METRICS_AVERAGE_INTERVAL, 120.0 );
|
||||
|
|
|
@ -246,6 +246,47 @@ ThreadFuture<Void> ThreadSafeTenant::waitPurgeGranulesComplete(const KeyRef& pur
|
|||
});
|
||||
}
|
||||
|
||||
ThreadFuture<bool> ThreadSafeTenant::blobbifyRange(const KeyRangeRef& keyRange) {
|
||||
DatabaseContext* db = this->db->db;
|
||||
TenantName tenantName = this->name;
|
||||
KeyRange range = keyRange;
|
||||
return onMainThread([=]() -> Future<bool> {
|
||||
db->checkDeferredError();
|
||||
return db->blobbifyRange(range, tenantName);
|
||||
});
|
||||
}
|
||||
|
||||
ThreadFuture<bool> ThreadSafeTenant::unblobbifyRange(const KeyRangeRef& keyRange) {
|
||||
DatabaseContext* db = this->db->db;
|
||||
TenantName tenantName = this->name;
|
||||
KeyRange range = keyRange;
|
||||
return onMainThread([=]() -> Future<bool> {
|
||||
db->checkDeferredError();
|
||||
return db->unblobbifyRange(range, tenantName);
|
||||
});
|
||||
}
|
||||
|
||||
ThreadFuture<Standalone<VectorRef<KeyRangeRef>>> ThreadSafeTenant::listBlobbifiedRanges(const KeyRangeRef& keyRange,
|
||||
int rangeLimit) {
|
||||
DatabaseContext* db = this->db->db;
|
||||
TenantName tenantName = this->name;
|
||||
KeyRange range = keyRange;
|
||||
return onMainThread([=]() -> Future<Standalone<VectorRef<KeyRangeRef>>> {
|
||||
db->checkDeferredError();
|
||||
return db->listBlobbifiedRanges(range, rangeLimit, tenantName);
|
||||
});
|
||||
}
|
||||
|
||||
ThreadFuture<Version> ThreadSafeTenant::verifyBlobRange(const KeyRangeRef& keyRange, Optional<Version> version) {
|
||||
DatabaseContext* db = this->db->db;
|
||||
TenantName tenantName = this->name;
|
||||
KeyRange range = keyRange;
|
||||
return onMainThread([=]() -> Future<Version> {
|
||||
db->checkDeferredError();
|
||||
return db->verifyBlobRange(range, version, tenantName);
|
||||
});
|
||||
}
|
||||
|
||||
ThreadSafeTenant::~ThreadSafeTenant() {}
|
||||
|
||||
ThreadSafeTransaction::ThreadSafeTransaction(DatabaseContext* cx,
|
||||
|
|
|
@ -388,10 +388,14 @@ public:
|
|||
bool force = false);
|
||||
Future<Void> waitPurgeGranulesComplete(Key purgeKey);
|
||||
|
||||
Future<bool> blobbifyRange(KeyRange range);
|
||||
Future<bool> unblobbifyRange(KeyRange range);
|
||||
Future<Standalone<VectorRef<KeyRangeRef>>> listBlobbifiedRanges(KeyRange range, int rangeLimit);
|
||||
Future<Version> verifyBlobRange(const KeyRange& range, Optional<Version> version);
|
||||
Future<bool> blobbifyRange(KeyRange range, Optional<TenantName> tenantName = {});
|
||||
Future<bool> unblobbifyRange(KeyRange range, Optional<TenantName> tenantName = {});
|
||||
Future<Standalone<VectorRef<KeyRangeRef>>> listBlobbifiedRanges(KeyRange range,
|
||||
int rangeLimit,
|
||||
Optional<TenantName> tenantName = {});
|
||||
Future<Version> verifyBlobRange(const KeyRange& range,
|
||||
Optional<Version> version,
|
||||
Optional<TenantName> tenantName = {});
|
||||
|
||||
// private:
|
||||
explicit DatabaseContext(Reference<AsyncVar<Reference<IClusterConnectionRecord>>> connectionRecord,
|
||||
|
|
|
@ -150,6 +150,13 @@ public:
|
|||
virtual ThreadFuture<Key> purgeBlobGranules(const KeyRangeRef& keyRange, Version purgeVersion, bool force) = 0;
|
||||
virtual ThreadFuture<Void> waitPurgeGranulesComplete(const KeyRef& purgeKey) = 0;
|
||||
|
||||
virtual ThreadFuture<bool> blobbifyRange(const KeyRangeRef& keyRange) = 0;
|
||||
virtual ThreadFuture<bool> unblobbifyRange(const KeyRangeRef& keyRange) = 0;
|
||||
virtual ThreadFuture<Standalone<VectorRef<KeyRangeRef>>> listBlobbifiedRanges(const KeyRangeRef& keyRange,
|
||||
int rangeLimit) = 0;
|
||||
|
||||
virtual ThreadFuture<Version> verifyBlobRange(const KeyRangeRef& keyRange, Optional<Version> version) = 0;
|
||||
|
||||
virtual void addref() = 0;
|
||||
virtual void delref() = 0;
|
||||
};
|
||||
|
|
|
@ -221,6 +221,32 @@ struct FdbCApi : public ThreadSafeReferenceCounted<FdbCApi> {
|
|||
uint8_t const* purge_key_name,
|
||||
int purge_key_name_length);
|
||||
|
||||
FDBFuture* (*tenantBlobbifyRange)(FDBTenant* tenant,
|
||||
uint8_t const* begin_key_name,
|
||||
int begin_key_name_length,
|
||||
uint8_t const* end_key_name,
|
||||
int end_key_name_length);
|
||||
|
||||
FDBFuture* (*tenantUnblobbifyRange)(FDBTenant* tenant,
|
||||
uint8_t const* begin_key_name,
|
||||
int begin_key_name_length,
|
||||
uint8_t const* end_key_name,
|
||||
int end_key_name_length);
|
||||
|
||||
FDBFuture* (*tenantListBlobbifiedRanges)(FDBTenant* tenant,
|
||||
uint8_t const* begin_key_name,
|
||||
int begin_key_name_length,
|
||||
uint8_t const* end_key_name,
|
||||
int end_key_name_length,
|
||||
int rangeLimit);
|
||||
|
||||
FDBFuture* (*tenantVerifyBlobRange)(FDBTenant* tenant,
|
||||
uint8_t const* begin_key_name,
|
||||
int begin_key_name_length,
|
||||
uint8_t const* end_key_name,
|
||||
int end_key_name_length,
|
||||
int64_t version);
|
||||
|
||||
void (*tenantDestroy)(FDBTenant* tenant);
|
||||
|
||||
// Transaction
|
||||
|
@ -513,6 +539,13 @@ public:
|
|||
ThreadFuture<Key> purgeBlobGranules(const KeyRangeRef& keyRange, Version purgeVersion, bool force) override;
|
||||
ThreadFuture<Void> waitPurgeGranulesComplete(const KeyRef& purgeKey) override;
|
||||
|
||||
ThreadFuture<bool> blobbifyRange(const KeyRangeRef& keyRange) override;
|
||||
ThreadFuture<bool> unblobbifyRange(const KeyRangeRef& keyRange) override;
|
||||
ThreadFuture<Standalone<VectorRef<KeyRangeRef>>> listBlobbifiedRanges(const KeyRangeRef& keyRange,
|
||||
int rangeLimit) override;
|
||||
|
||||
ThreadFuture<Version> verifyBlobRange(const KeyRangeRef& keyRange, Optional<Version> version) override;
|
||||
|
||||
void addref() override { ThreadSafeReferenceCounted<DLTenant>::addref(); }
|
||||
void delref() override { ThreadSafeReferenceCounted<DLTenant>::delref(); }
|
||||
|
||||
|
@ -560,6 +593,7 @@ public:
|
|||
ThreadFuture<bool> unblobbifyRange(const KeyRangeRef& keyRange) override;
|
||||
ThreadFuture<Standalone<VectorRef<KeyRangeRef>>> listBlobbifiedRanges(const KeyRangeRef& keyRange,
|
||||
int rangeLimit) override;
|
||||
|
||||
ThreadFuture<Version> verifyBlobRange(const KeyRangeRef& keyRange, Optional<Version> version) override;
|
||||
|
||||
ThreadFuture<DatabaseSharedState*> createSharedState() override;
|
||||
|
@ -809,6 +843,12 @@ public:
|
|||
ThreadFuture<Key> purgeBlobGranules(const KeyRangeRef& keyRange, Version purgeVersion, bool force) override;
|
||||
ThreadFuture<Void> waitPurgeGranulesComplete(const KeyRef& purgeKey) override;
|
||||
|
||||
ThreadFuture<bool> blobbifyRange(const KeyRangeRef& keyRange) override;
|
||||
ThreadFuture<bool> unblobbifyRange(const KeyRangeRef& keyRange) override;
|
||||
ThreadFuture<Standalone<VectorRef<KeyRangeRef>>> listBlobbifiedRanges(const KeyRangeRef& keyRange,
|
||||
int rangeLimit) override;
|
||||
ThreadFuture<Version> verifyBlobRange(const KeyRangeRef& keyRange, Optional<Version> version) override;
|
||||
|
||||
void addref() override { ThreadSafeReferenceCounted<MultiVersionTenant>::addref(); }
|
||||
void delref() override { ThreadSafeReferenceCounted<MultiVersionTenant>::delref(); }
|
||||
|
||||
|
|
|
@ -624,6 +624,12 @@ public:
|
|||
double GLOBAL_TAG_THROTTLING_FOLDING_TIME;
|
||||
// Cost multiplier for writes (because write operations are more expensive than reads)
|
||||
double GLOBAL_TAG_THROTTLING_RW_FUNGIBILITY_RATIO;
|
||||
// Maximum number of tags tracked by global tag throttler. Additional tags will be ignored
|
||||
// until some existing tags expire
|
||||
int64_t GLOBAL_TAG_THROTTLING_MAX_TAGS_TRACKED;
|
||||
// Global tag throttler forgets about throughput from a tag once no new transactions from that
|
||||
// tag have been received for this duration (in seconds):
|
||||
int64_t GLOBAL_TAG_THROTTLING_TAG_EXPIRE_AFTER;
|
||||
|
||||
double MAX_TRANSACTIONS_PER_BYTE;
|
||||
|
||||
|
|
|
@ -96,6 +96,13 @@ public:
|
|||
ThreadFuture<Key> purgeBlobGranules(const KeyRangeRef& keyRange, Version purgeVersion, bool force) override;
|
||||
ThreadFuture<Void> waitPurgeGranulesComplete(const KeyRef& purgeKey) override;
|
||||
|
||||
ThreadFuture<bool> blobbifyRange(const KeyRangeRef& keyRange) override;
|
||||
ThreadFuture<bool> unblobbifyRange(const KeyRangeRef& keyRange) override;
|
||||
ThreadFuture<Standalone<VectorRef<KeyRangeRef>>> listBlobbifiedRanges(const KeyRangeRef& keyRange,
|
||||
int rangeLimit) override;
|
||||
|
||||
ThreadFuture<Version> verifyBlobRange(const KeyRangeRef& keyRange, Optional<Version> version) override;
|
||||
|
||||
void addref() override { ThreadSafeReferenceCounted<ThreadSafeTenant>::addref(); }
|
||||
void delref() override { ThreadSafeReferenceCounted<ThreadSafeTenant>::delref(); }
|
||||
|
||||
|
|
|
@ -120,12 +120,13 @@ class GlobalTagThrottlerImpl {
|
|||
Smoother transactionCounter;
|
||||
Smoother perClientRate;
|
||||
Smoother targetRate;
|
||||
double transactionsLastAdded;
|
||||
|
||||
public:
|
||||
explicit PerTagStatistics()
|
||||
: transactionCounter(SERVER_KNOBS->GLOBAL_TAG_THROTTLING_FOLDING_TIME),
|
||||
perClientRate(SERVER_KNOBS->GLOBAL_TAG_THROTTLING_FOLDING_TIME),
|
||||
targetRate(SERVER_KNOBS->GLOBAL_TAG_THROTTLING_FOLDING_TIME) {}
|
||||
targetRate(SERVER_KNOBS->GLOBAL_TAG_THROTTLING_FOLDING_TIME), transactionsLastAdded(now()) {}
|
||||
|
||||
Optional<ThrottleApi::TagQuotaValue> getQuota() const { return quota; }
|
||||
|
||||
|
@ -133,7 +134,10 @@ class GlobalTagThrottlerImpl {
|
|||
|
||||
void clearQuota() { quota = {}; }
|
||||
|
||||
void addTransactions(int count) { transactionCounter.addDelta(count); }
|
||||
void addTransactions(int count) {
|
||||
transactionsLastAdded = now();
|
||||
transactionCounter.addDelta(count);
|
||||
}
|
||||
|
||||
double getTransactionRate() const { return transactionCounter.smoothRate(); }
|
||||
|
||||
|
@ -151,6 +155,10 @@ class GlobalTagThrottlerImpl {
|
|||
targetRate.setTotal(targetTps);
|
||||
return targetRate.smoothTotal();
|
||||
}
|
||||
|
||||
bool recentTransactionsAdded() const {
|
||||
return now() - transactionsLastAdded < SERVER_KNOBS->GLOBAL_TAG_THROTTLING_TAG_EXPIRE_AFTER;
|
||||
}
|
||||
};
|
||||
|
||||
Database db;
|
||||
|
@ -278,7 +286,7 @@ class GlobalTagThrottlerImpl {
|
|||
for (const auto& t : tagsAffectingStorageServer) {
|
||||
auto const tQuota = getQuota(t, LimitType::TOTAL);
|
||||
sumQuota += tQuota.orDefault(0);
|
||||
if (tag.compare(tag) == 0) {
|
||||
if (t.compare(tag) == 0) {
|
||||
tagQuota = tQuota.orDefault(0);
|
||||
}
|
||||
}
|
||||
|
@ -360,6 +368,7 @@ class GlobalTagThrottlerImpl {
|
|||
tagsWithQuota.insert(tag);
|
||||
}
|
||||
self->removeUnseenQuotas(tagsWithQuota);
|
||||
self->removeExpiredTags();
|
||||
++self->throttledTagChangeId;
|
||||
wait(delay(5.0));
|
||||
break;
|
||||
|
@ -397,7 +406,24 @@ class GlobalTagThrottlerImpl {
|
|||
public:
|
||||
GlobalTagThrottlerImpl(Database db, UID id) : db(db), id(id) {}
|
||||
Future<Void> monitorThrottlingChanges() { return monitorThrottlingChanges(this); }
|
||||
void addRequests(TransactionTag tag, int count) { tagStatistics[tag].addTransactions(static_cast<double>(count)); }
|
||||
void addRequests(TransactionTag tag, int count) {
|
||||
auto it = tagStatistics.find(tag);
|
||||
if (it == tagStatistics.end()) {
|
||||
if (tagStatistics.size() == SERVER_KNOBS->GLOBAL_TAG_THROTTLING_MAX_TAGS_TRACKED) {
|
||||
CODE_PROBE(true,
|
||||
"Global tag throttler ignoring transactions because maximum number of trackable tags has "
|
||||
"been reached");
|
||||
TraceEvent("GlobalTagThrottler_IgnoringRequests")
|
||||
.suppressFor(1.0)
|
||||
.detail("Tag", printable(tag))
|
||||
.detail("Count", count);
|
||||
} else {
|
||||
tagStatistics[tag].addTransactions(static_cast<double>(count));
|
||||
}
|
||||
} else {
|
||||
it->second.addTransactions(static_cast<double>(count));
|
||||
}
|
||||
}
|
||||
uint64_t getThrottledTagChangeId() const { return throttledTagChangeId; }
|
||||
|
||||
TransactionTagMap<double> getProxyRates(int numProxies) {
|
||||
|
@ -465,11 +491,15 @@ public:
|
|||
throttlingRatios[ss.id] = ss.getThrottlingRatio(SERVER_KNOBS->TARGET_BYTES_PER_STORAGE_SERVER,
|
||||
SERVER_KNOBS->SPRING_BYTES_STORAGE_SERVER);
|
||||
for (const auto& busyReadTag : ss.busiestReadTags) {
|
||||
if (tagStatistics.find(busyReadTag.tag) != tagStatistics.end()) {
|
||||
throughput[ss.id][busyReadTag.tag].updateCost(busyReadTag.rate, OpType::READ);
|
||||
}
|
||||
}
|
||||
for (const auto& busyWriteTag : ss.busiestWriteTags) {
|
||||
if (tagStatistics.find(busyWriteTag.tag) != tagStatistics.end()) {
|
||||
throughput[ss.id][busyWriteTag.tag].updateCost(busyWriteTag.rate, OpType::WRITE);
|
||||
}
|
||||
}
|
||||
return Void();
|
||||
}
|
||||
|
||||
|
@ -478,6 +508,22 @@ public:
|
|||
}
|
||||
|
||||
void removeQuota(TransactionTagRef tag) { tagStatistics[tag].clearQuota(); }
|
||||
|
||||
void removeExpiredTags() {
|
||||
for (auto it = tagStatistics.begin(); it != tagStatistics.end();) {
|
||||
const auto& [tag, stats] = *it;
|
||||
if (!stats.recentTransactionsAdded()) {
|
||||
for (auto& [ss, tagToCounters] : throughput) {
|
||||
tagToCounters.erase(tag);
|
||||
}
|
||||
it = tagStatistics.erase(it);
|
||||
} else {
|
||||
++it;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
uint32_t tagsTracked() const { return tagStatistics.size(); }
|
||||
};
|
||||
|
||||
GlobalTagThrottler::GlobalTagThrottler(Database db, UID id) : impl(PImpl<GlobalTagThrottlerImpl>::create(db, id)) {}
|
||||
|
@ -526,6 +572,14 @@ void GlobalTagThrottler::removeQuota(TransactionTagRef tag) {
|
|||
return impl->removeQuota(tag);
|
||||
}
|
||||
|
||||
uint32_t GlobalTagThrottler::tagsTracked() const {
|
||||
return impl->tagsTracked();
|
||||
}
|
||||
|
||||
void GlobalTagThrottler::removeExpiredTags() {
|
||||
return impl->removeExpiredTags();
|
||||
}
|
||||
|
||||
namespace GlobalTagThrottlerTesting {
|
||||
|
||||
enum class LimitType { RESERVED, TOTAL };
|
||||
|
@ -1025,3 +1079,47 @@ TEST_CASE("/GlobalTagThrottler/ReservedQuota") {
|
|||
wait(timeoutError(monitor || client || updater, 600.0));
|
||||
return Void();
|
||||
}
|
||||
|
||||
// Test that tags are expired iff a sufficient amount of time has passed since the
|
||||
// last transaction with that tag
|
||||
TEST_CASE("/GlobalTagThrottler/ExpireTags") {
|
||||
state GlobalTagThrottler globalTagThrottler(Database{}, UID{});
|
||||
state GlobalTagThrottlerTesting::StorageServerCollection storageServers(10, 5);
|
||||
TransactionTag testTag = "sampleTag1"_sr;
|
||||
|
||||
state Future<Void> client =
|
||||
timeout(GlobalTagThrottlerTesting::runClient(
|
||||
&globalTagThrottler, &storageServers, testTag, 10.0, 6.0, GlobalTagThrottlerTesting::OpType::READ),
|
||||
60.0,
|
||||
Void());
|
||||
state Future<Void> updater = timeout(
|
||||
GlobalTagThrottlerTesting::updateGlobalTagThrottler(&globalTagThrottler, &storageServers), 60.0, Void());
|
||||
wait(client && updater);
|
||||
client.cancel();
|
||||
updater.cancel();
|
||||
ASSERT_EQ(globalTagThrottler.tagsTracked(), 1);
|
||||
globalTagThrottler.removeExpiredTags();
|
||||
ASSERT_EQ(globalTagThrottler.tagsTracked(), 1);
|
||||
wait(delay(SERVER_KNOBS->GLOBAL_TAG_THROTTLING_TAG_EXPIRE_AFTER + 1.0));
|
||||
ASSERT_EQ(globalTagThrottler.tagsTracked(), 1);
|
||||
globalTagThrottler.removeExpiredTags();
|
||||
ASSERT_EQ(globalTagThrottler.tagsTracked(), 0);
|
||||
return Void();
|
||||
}
|
||||
|
||||
// Test that the number of tags tracked does not grow beyond SERVER_KNOBS->GLOBAL_TAG_THROTTLING_MAX_TAGS_TRACKED
|
||||
TEST_CASE("/GlobalTagThrottler/TagLimit") {
|
||||
state GlobalTagThrottler globalTagThrottler(Database{}, UID{});
|
||||
state GlobalTagThrottlerTesting::StorageServerCollection storageServers(10, 5);
|
||||
std::vector<Future<Void>> futures;
|
||||
for (int i = 0; i < 2 * SERVER_KNOBS->GLOBAL_TAG_THROTTLING_MAX_TAGS_TRACKED; ++i) {
|
||||
Arena arena;
|
||||
TransactionTag tag = makeString(8, arena);
|
||||
deterministicRandom()->randomBytes(mutateString(tag), tag.size());
|
||||
futures.push_back(GlobalTagThrottlerTesting::runClient(
|
||||
&globalTagThrottler, &storageServers, tag, 1.0, 6.0, GlobalTagThrottlerTesting::OpType::READ));
|
||||
}
|
||||
wait(timeout(waitForAll(futures), 60.0, Void()));
|
||||
ASSERT_EQ(globalTagThrottler.tagsTracked(), SERVER_KNOBS->GLOBAL_TAG_THROTTLING_MAX_TAGS_TRACKED);
|
||||
return Void();
|
||||
}
|
||||
|
|
|
@ -62,6 +62,7 @@ struct GrvProxyStats {
|
|||
LatencySample defaultTxnGRVTimeInQueue;
|
||||
LatencySample batchTxnGRVTimeInQueue;
|
||||
|
||||
// These latency bands and samples ignore latency injected by the GrvProxyTransactionTagThrottler
|
||||
LatencyBands grvLatencyBands;
|
||||
LatencySample grvLatencySample; // GRV latency metric sample of default priority
|
||||
LatencySample grvBatchLatencySample; // GRV latency metric sample of batched priority
|
||||
|
@ -692,7 +693,7 @@ ACTOR Future<Void> sendGrvReplies(Future<GetReadVersionReply> replyFuture,
|
|||
|
||||
double end = g_network->timer();
|
||||
for (GetReadVersionRequest const& request : requests) {
|
||||
double duration = end - request.requestTime();
|
||||
double duration = end - request.requestTime() - request.proxyTagThrottledDuration;
|
||||
if (request.priority == TransactionPriority::BATCH) {
|
||||
stats->grvBatchLatencySample.addMeasurement(duration);
|
||||
}
|
||||
|
|
|
@ -1693,7 +1693,7 @@ private:
|
|||
tlsConfig.addVerifyPeers(args.OptionArg());
|
||||
break;
|
||||
case OPT_KMS_CONN_DISCOVERY_URL_FILE: {
|
||||
knobs.emplace_back("rest_kms_connector_kms_discovery_url_file", args.OptionArg());
|
||||
knobs.emplace_back("rest_kms_connector_discover_kms_url_file", args.OptionArg());
|
||||
break;
|
||||
}
|
||||
case OPT_KMS_CONNECTOR_TYPE: {
|
||||
|
|
|
@ -39,6 +39,7 @@
|
|||
#include "flow/xxhash.h"
|
||||
|
||||
#include <functional>
|
||||
#include <limits>
|
||||
#include <tuple>
|
||||
|
||||
#include "flow/actorcompiler.h" // This must be the last #include.
|
||||
|
@ -154,12 +155,14 @@ public:
|
|||
const EncodingHeader* h = reinterpret_cast<const EncodingHeader*>(encodingHeader);
|
||||
EncryptionKey s;
|
||||
s.xorKey = h->xorKey;
|
||||
s.xorWith = xorWith;
|
||||
return s;
|
||||
}
|
||||
|
||||
Future<EncryptionKey> getLatestDefaultEncryptionKey() override {
|
||||
EncryptionKey s;
|
||||
s.xorKey = xorWith;
|
||||
s.xorKey = static_cast<uint8_t>(deterministicRandom()->randomInt(0, std::numeric_limits<uint8_t>::max() + 1));
|
||||
s.xorWith = xorWith;
|
||||
return s;
|
||||
}
|
||||
|
||||
|
|
|
@ -228,6 +228,7 @@ public:
|
|||
struct EncryptionKeyRef {
|
||||
TextAndHeaderCipherKeys aesKey; // For AESEncryptionV1
|
||||
uint8_t xorKey; // For XOREncryption_TestOnly
|
||||
uint8_t xorWith; // For XOREncryption_TestOnly
|
||||
};
|
||||
using EncryptionKey = Standalone<EncryptionKeyRef>;
|
||||
|
||||
|
@ -345,8 +346,9 @@ public:
|
|||
Header* h = reinterpret_cast<Header*>(header);
|
||||
h->checksum = XXH3_64bits_withSeed(payload, len, seed);
|
||||
h->xorKey = encryptionKey.xorKey;
|
||||
uint8_t xorMask = ~encryptionKey.xorKey ^ encryptionKey.xorWith;
|
||||
for (int i = 0; i < len; ++i) {
|
||||
payload[i] ^= h->xorKey;
|
||||
payload[i] ^= xorMask;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -356,8 +358,9 @@ public:
|
|||
int len,
|
||||
PhysicalPageID seed) {
|
||||
Header* h = reinterpret_cast<Header*>(header);
|
||||
uint8_t xorMask = ~encryptionKey.xorKey ^ encryptionKey.xorWith;
|
||||
for (int i = 0; i < len; ++i) {
|
||||
payload[i] ^= h->xorKey;
|
||||
payload[i] ^= xorMask;
|
||||
}
|
||||
if (h->checksum != XXH3_64bits_withSeed(payload, len, seed)) {
|
||||
throw page_decoding_failed();
|
||||
|
|
|
@ -100,4 +100,6 @@ public:
|
|||
public:
|
||||
void setQuota(TransactionTagRef, ThrottleApi::TagQuotaValue const&);
|
||||
void removeQuota(TransactionTagRef);
|
||||
void removeExpiredTags();
|
||||
uint32_t tagsTracked() const;
|
||||
};
|
||||
|
|
|
@ -23,6 +23,7 @@
|
|||
#include "fdbclient/ManagementAPI.actor.h"
|
||||
#include "fdbclient/NativeAPI.actor.h"
|
||||
#include "fdbclient/SystemData.h"
|
||||
#include "fdbclient/TenantManagement.actor.h"
|
||||
#include "fdbserver/TesterInterface.actor.h"
|
||||
#include "fdbserver/workloads/workloads.actor.h"
|
||||
#include "fdbserver/workloads/BulkSetup.actor.h"
|
||||
|
@ -53,6 +54,7 @@ struct BlobGranuleRangesWorkload : TestWorkload {
|
|||
Future<Void> client;
|
||||
Future<Void> unitClient;
|
||||
bool stopUnitClient;
|
||||
Optional<TenantName> tenantName;
|
||||
|
||||
int32_t nextKey;
|
||||
|
||||
|
@ -85,6 +87,9 @@ struct BlobGranuleRangesWorkload : TestWorkload {
|
|||
nextKey = 10000000 * clientId;
|
||||
|
||||
stopUnitClient = false;
|
||||
if (deterministicRandom()->coinflip()) {
|
||||
tenantName = StringRef("bgrwTenant" + std::to_string(clientId));
|
||||
}
|
||||
|
||||
TraceEvent("BlobGranuleRangesWorkloadInit").detail("TargetRanges", targetRanges);
|
||||
}
|
||||
|
@ -100,17 +105,17 @@ struct BlobGranuleRangesWorkload : TestWorkload {
|
|||
}
|
||||
}
|
||||
|
||||
ACTOR Future<bool> setRange(Database cx, KeyRange range, bool active) {
|
||||
ACTOR Future<bool> setRange(Database cx, KeyRange range, bool active, Optional<TenantName> tenantName) {
|
||||
if (active) {
|
||||
bool success = wait(cx->blobbifyRange(range));
|
||||
bool success = wait(cx->blobbifyRange(range, tenantName));
|
||||
return success;
|
||||
} else {
|
||||
bool success = wait(cx->unblobbifyRange(range));
|
||||
bool success = wait(cx->unblobbifyRange(range, tenantName));
|
||||
return success;
|
||||
}
|
||||
}
|
||||
|
||||
ACTOR Future<Void> registerNewRange(Database cx, BlobGranuleRangesWorkload* self) {
|
||||
ACTOR Future<Void> registerNewRange(Database cx, BlobGranuleRangesWorkload* self, Optional<TenantName> tenantName) {
|
||||
std::string nextRangeKey = "R_" + self->newKey();
|
||||
state KeyRange range(KeyRangeRef(StringRef(nextRangeKey), strinc(StringRef(nextRangeKey))));
|
||||
if (BGRW_DEBUG) {
|
||||
|
@ -119,7 +124,8 @@ struct BlobGranuleRangesWorkload : TestWorkload {
|
|||
|
||||
// don't put in active ranges until AFTER set range command succeeds, to avoid checking a range that maybe
|
||||
// wasn't initialized
|
||||
bool success = wait(self->setRange(cx, range, true));
|
||||
bool success =
|
||||
wait(self->setRange(cx, range, true, tenantName.present() ? tenantName.get() : self->tenantName));
|
||||
ASSERT(success);
|
||||
|
||||
if (BGRW_DEBUG) {
|
||||
|
@ -154,10 +160,10 @@ struct BlobGranuleRangesWorkload : TestWorkload {
|
|||
range.begin.printable(),
|
||||
range.end.printable());
|
||||
}
|
||||
Key purgeKey = wait(self->versionedForcePurge(cx, range, {}));
|
||||
Key purgeKey = wait(self->versionedForcePurge(cx, range, self->tenantName));
|
||||
wait(cx->waitPurgeGranulesComplete(purgeKey));
|
||||
}
|
||||
bool success = wait(self->setRange(cx, range, false));
|
||||
bool success = wait(self->setRange(cx, range, false, self->tenantName));
|
||||
ASSERT(success);
|
||||
|
||||
if (BGRW_DEBUG) {
|
||||
|
@ -169,16 +175,45 @@ struct BlobGranuleRangesWorkload : TestWorkload {
|
|||
return Void();
|
||||
}
|
||||
|
||||
ACTOR Future<TenantMapEntry> setupTenant(Database cx, TenantName name) {
|
||||
if (BGRW_DEBUG) {
|
||||
fmt::print("Creating tenant: {0}\n", name.printable());
|
||||
}
|
||||
|
||||
Optional<TenantMapEntry> entry = wait(TenantAPI::createTenant(cx.getReference(), name));
|
||||
ASSERT(entry.present());
|
||||
|
||||
if (BGRW_DEBUG) {
|
||||
fmt::print("Created tenant {0}: {1}\n", name.printable(), entry.get().prefix.printable());
|
||||
}
|
||||
|
||||
return entry.get();
|
||||
}
|
||||
|
||||
ACTOR Future<Void> _setup(Database cx, BlobGranuleRangesWorkload* self) {
|
||||
// create initial target ranges
|
||||
TraceEvent("BlobGranuleRangesSetup").detail("InitialRanges", self->targetRanges).log();
|
||||
// set up blob granules
|
||||
wait(success(ManagementAPI::changeConfig(cx.getReference(), "blob_granules_enabled=1", true)));
|
||||
|
||||
if (self->tenantName.present()) {
|
||||
wait(success(ManagementAPI::changeConfig(cx.getReference(), "tenant_mode=optional_experimental", true)));
|
||||
wait(success(self->setupTenant(cx, self->tenantName.get())));
|
||||
|
||||
try {
|
||||
wait(self->registerNewRange(cx, self, "BogusTenant"_sr));
|
||||
ASSERT(false);
|
||||
} catch (Error& e) {
|
||||
if (e.code() != error_code_tenant_not_found) {
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
state int i;
|
||||
std::vector<Future<Void>> createInitialRanges;
|
||||
for (i = 0; i < self->targetRanges; i++) {
|
||||
wait(self->registerNewRange(cx, self));
|
||||
wait(self->registerNewRange(cx, self, {}));
|
||||
}
|
||||
TraceEvent("BlobGranuleRangesSetupComplete");
|
||||
return Void();
|
||||
|
@ -200,19 +235,19 @@ struct BlobGranuleRangesWorkload : TestWorkload {
|
|||
return _check(cx, this);
|
||||
}
|
||||
|
||||
ACTOR Future<bool> isRangeActive(Database cx, KeyRange range) {
|
||||
ACTOR Future<bool> isRangeActive(Database cx, KeyRange range, Optional<TenantName> tenantName) {
|
||||
Optional<Version> rv;
|
||||
if (deterministicRandom()->coinflip()) {
|
||||
rv = latestVersion;
|
||||
}
|
||||
state Version v = wait(cx->verifyBlobRange(range, rv));
|
||||
state Version v = wait(cx->verifyBlobRange(range, rv, tenantName));
|
||||
return v != invalidVersion;
|
||||
}
|
||||
|
||||
ACTOR Future<Void> checkRange(Database cx, BlobGranuleRangesWorkload* self, KeyRange range, bool isActive) {
|
||||
// Check that a read completes for the range. If not loop around and try again
|
||||
loop {
|
||||
bool completed = wait(self->isRangeActive(cx, range));
|
||||
bool completed = wait(self->isRangeActive(cx, range, self->tenantName));
|
||||
|
||||
if (completed == isActive) {
|
||||
break;
|
||||
|
@ -228,7 +263,8 @@ struct BlobGranuleRangesWorkload : TestWorkload {
|
|||
wait(delay(1.0));
|
||||
}
|
||||
|
||||
Standalone<VectorRef<KeyRangeRef>> blobRanges = wait(cx->listBlobbifiedRanges(range, 1000000));
|
||||
Standalone<VectorRef<KeyRangeRef>> blobRanges =
|
||||
wait(cx->listBlobbifiedRanges(range, 1000000, self->tenantName));
|
||||
if (isActive) {
|
||||
ASSERT(blobRanges.size() == 1);
|
||||
ASSERT(blobRanges[0].begin <= range.begin);
|
||||
|
@ -237,7 +273,7 @@ struct BlobGranuleRangesWorkload : TestWorkload {
|
|||
ASSERT(blobRanges.empty());
|
||||
}
|
||||
|
||||
state Transaction tr(cx);
|
||||
state Transaction tr(cx, self->tenantName);
|
||||
loop {
|
||||
try {
|
||||
Standalone<VectorRef<KeyRangeRef>> granules = wait(tr.getBlobGranuleRanges(range, 1000000));
|
||||
|
@ -303,7 +339,7 @@ struct BlobGranuleRangesWorkload : TestWorkload {
|
|||
state Future<Void> waitNextOp = poisson(&last, 1.0 / self->operationsPerSecond);
|
||||
|
||||
if (self->activeRanges.empty() || deterministicRandom()->coinflip()) {
|
||||
wait(self->registerNewRange(cx, self));
|
||||
wait(self->registerNewRange(cx, self, {}));
|
||||
} else {
|
||||
wait(self->unregisterRandomRange(cx, self));
|
||||
}
|
||||
|
@ -318,9 +354,9 @@ struct BlobGranuleRangesWorkload : TestWorkload {
|
|||
}
|
||||
|
||||
// tear down range at end
|
||||
Key purgeKey = wait(self->versionedForcePurge(cx, range, {}));
|
||||
Key purgeKey = wait(self->versionedForcePurge(cx, range, self->tenantName));
|
||||
wait(cx->waitPurgeGranulesComplete(purgeKey));
|
||||
bool success = wait(self->setRange(cx, range, false));
|
||||
bool success = wait(self->setRange(cx, range, false, self->tenantName));
|
||||
ASSERT(success);
|
||||
|
||||
if (BGRW_DEBUG) {
|
||||
|
@ -337,35 +373,35 @@ struct BlobGranuleRangesWorkload : TestWorkload {
|
|||
if (BGRW_DEBUG) {
|
||||
fmt::print("VerifyRangeUnit: [{0} - {1})\n", range.begin.printable(), range.end.printable());
|
||||
}
|
||||
bool setSuccess = wait(self->setRange(cx, activeRange, true));
|
||||
bool setSuccess = wait(self->setRange(cx, activeRange, true, self->tenantName));
|
||||
ASSERT(setSuccess);
|
||||
wait(self->checkRange(cx, self, activeRange, true));
|
||||
|
||||
bool success1 = wait(self->isRangeActive(cx, KeyRangeRef(activeRange.begin, middleKey)));
|
||||
bool success1 = wait(self->isRangeActive(cx, KeyRangeRef(activeRange.begin, middleKey), self->tenantName));
|
||||
ASSERT(success1);
|
||||
|
||||
bool success2 = wait(self->isRangeActive(cx, KeyRangeRef(middleKey, activeRange.end)));
|
||||
bool success2 = wait(self->isRangeActive(cx, KeyRangeRef(middleKey, activeRange.end), self->tenantName));
|
||||
ASSERT(success2);
|
||||
|
||||
bool fail1 = wait(self->isRangeActive(cx, range));
|
||||
bool fail1 = wait(self->isRangeActive(cx, range, self->tenantName));
|
||||
ASSERT(!fail1);
|
||||
|
||||
bool fail2 = wait(self->isRangeActive(cx, KeyRangeRef(range.begin, activeRange.begin)));
|
||||
bool fail2 = wait(self->isRangeActive(cx, KeyRangeRef(range.begin, activeRange.begin), self->tenantName));
|
||||
ASSERT(!fail2);
|
||||
|
||||
bool fail3 = wait(self->isRangeActive(cx, KeyRangeRef(activeRange.end, range.end)));
|
||||
bool fail3 = wait(self->isRangeActive(cx, KeyRangeRef(activeRange.end, range.end), self->tenantName));
|
||||
ASSERT(!fail3);
|
||||
|
||||
bool fail4 = wait(self->isRangeActive(cx, KeyRangeRef(range.begin, middleKey)));
|
||||
bool fail4 = wait(self->isRangeActive(cx, KeyRangeRef(range.begin, middleKey), self->tenantName));
|
||||
ASSERT(!fail4);
|
||||
|
||||
bool fail5 = wait(self->isRangeActive(cx, KeyRangeRef(middleKey, range.end)));
|
||||
bool fail5 = wait(self->isRangeActive(cx, KeyRangeRef(middleKey, range.end), self->tenantName));
|
||||
ASSERT(!fail5);
|
||||
|
||||
bool fail6 = wait(self->isRangeActive(cx, KeyRangeRef(range.begin, activeRange.end)));
|
||||
bool fail6 = wait(self->isRangeActive(cx, KeyRangeRef(range.begin, activeRange.end), self->tenantName));
|
||||
ASSERT(!fail6);
|
||||
|
||||
bool fail7 = wait(self->isRangeActive(cx, KeyRangeRef(activeRange.begin, range.end)));
|
||||
bool fail7 = wait(self->isRangeActive(cx, KeyRangeRef(activeRange.begin, range.end), self->tenantName));
|
||||
ASSERT(!fail7);
|
||||
|
||||
wait(self->tearDownRangeAfterUnit(cx, self, activeRange));
|
||||
|
@ -390,7 +426,7 @@ struct BlobGranuleRangesWorkload : TestWorkload {
|
|||
for (i = 0; i < rangeCount; i++) {
|
||||
state KeyRange subRange(KeyRangeRef(boundaries[i], boundaries[i + 1]));
|
||||
if (i != rangeToNotBlobbify) {
|
||||
bool setSuccess = wait(self->setRange(cx, subRange, true));
|
||||
bool setSuccess = wait(self->setRange(cx, subRange, true, self->tenantName));
|
||||
ASSERT(setSuccess);
|
||||
wait(self->checkRange(cx, self, subRange, true));
|
||||
} else {
|
||||
|
@ -398,7 +434,7 @@ struct BlobGranuleRangesWorkload : TestWorkload {
|
|||
}
|
||||
}
|
||||
|
||||
bool success = wait(self->isRangeActive(cx, range));
|
||||
bool success = wait(self->isRangeActive(cx, range, self->tenantName));
|
||||
ASSERT(!success);
|
||||
|
||||
if (rangeToNotBlobbify != 0) {
|
||||
|
@ -416,11 +452,12 @@ struct BlobGranuleRangesWorkload : TestWorkload {
|
|||
BlobGranuleRangesWorkload* self,
|
||||
KeyRange expectedRange,
|
||||
KeyRange queryRange) {
|
||||
Standalone<VectorRef<KeyRangeRef>> blobRanges = wait(cx->listBlobbifiedRanges(queryRange, 1000000));
|
||||
Standalone<VectorRef<KeyRangeRef>> blobRanges =
|
||||
wait(cx->listBlobbifiedRanges(queryRange, 1000000, self->tenantName));
|
||||
ASSERT(blobRanges.size() == 1);
|
||||
ASSERT(blobRanges[0] == expectedRange);
|
||||
|
||||
state Transaction tr(cx);
|
||||
state Transaction tr(cx, self->tenantName);
|
||||
loop {
|
||||
try {
|
||||
Standalone<VectorRef<KeyRangeRef>> granules = wait(tr.getBlobGranuleRanges(queryRange, 1000000));
|
||||
|
@ -436,7 +473,7 @@ struct BlobGranuleRangesWorkload : TestWorkload {
|
|||
}
|
||||
|
||||
ACTOR Future<Void> rangesMisalignedUnit(Database cx, BlobGranuleRangesWorkload* self, KeyRange range) {
|
||||
bool setSuccess = wait(self->setRange(cx, range, true));
|
||||
bool setSuccess = wait(self->setRange(cx, range, true, self->tenantName));
|
||||
ASSERT(setSuccess);
|
||||
state KeyRange subRange(KeyRangeRef(range.begin.withSuffix("A"_sr), range.begin.withSuffix("B"_sr)));
|
||||
|
||||
|
@ -451,7 +488,7 @@ struct BlobGranuleRangesWorkload : TestWorkload {
|
|||
wait(self->checkRangesMisaligned(cx, self, range, KeyRangeRef(subRange.begin, range.end)));
|
||||
|
||||
try {
|
||||
wait(success(cx->purgeBlobGranules(subRange, 1, {}, false)));
|
||||
wait(success(cx->purgeBlobGranules(subRange, 1, self->tenantName, false)));
|
||||
ASSERT(false);
|
||||
} catch (Error& e) {
|
||||
if (e.code() == error_code_operation_cancelled) {
|
||||
|
@ -461,7 +498,7 @@ struct BlobGranuleRangesWorkload : TestWorkload {
|
|||
}
|
||||
|
||||
try {
|
||||
wait(success(cx->purgeBlobGranules(subRange, 1, {}, true)));
|
||||
wait(success(cx->purgeBlobGranules(subRange, 1, self->tenantName, true)));
|
||||
ASSERT(false);
|
||||
} catch (Error& e) {
|
||||
if (e.code() == error_code_operation_cancelled) {
|
||||
|
@ -489,50 +526,51 @@ struct BlobGranuleRangesWorkload : TestWorkload {
|
|||
|
||||
// unblobbifying range that already doesn't exist should be no-op
|
||||
if (deterministicRandom()->coinflip()) {
|
||||
bool unblobbifyStartSuccess = wait(self->setRange(cx, activeRange, false));
|
||||
bool unblobbifyStartSuccess = wait(self->setRange(cx, activeRange, false, self->tenantName));
|
||||
ASSERT(unblobbifyStartSuccess);
|
||||
}
|
||||
|
||||
bool success = wait(self->setRange(cx, activeRange, true));
|
||||
bool success = wait(self->setRange(cx, activeRange, true, self->tenantName));
|
||||
ASSERT(success);
|
||||
wait(self->checkRange(cx, self, activeRange, true));
|
||||
|
||||
// check that re-blobbifying same range is successful
|
||||
bool retrySuccess = wait(self->setRange(cx, activeRange, true));
|
||||
bool retrySuccess = wait(self->setRange(cx, activeRange, true, self->tenantName));
|
||||
ASSERT(retrySuccess);
|
||||
wait(self->checkRange(cx, self, activeRange, true));
|
||||
|
||||
// check that blobbifying range that overlaps but does not match existing blob range fails
|
||||
bool fail1 = wait(self->setRange(cx, range, true));
|
||||
bool fail1 = wait(self->setRange(cx, range, true, self->tenantName));
|
||||
ASSERT(!fail1);
|
||||
|
||||
bool fail2 = wait(self->setRange(cx, KeyRangeRef(range.begin, activeRange.end), true));
|
||||
bool fail2 = wait(self->setRange(cx, KeyRangeRef(range.begin, activeRange.end), true, self->tenantName));
|
||||
ASSERT(!fail2);
|
||||
|
||||
bool fail3 = wait(self->setRange(cx, KeyRangeRef(activeRange.begin, range.end), true));
|
||||
bool fail3 = wait(self->setRange(cx, KeyRangeRef(activeRange.begin, range.end), true, self->tenantName));
|
||||
ASSERT(!fail3);
|
||||
|
||||
bool fail4 = wait(self->setRange(cx, KeyRangeRef(range.begin, middleKey), true));
|
||||
bool fail4 = wait(self->setRange(cx, KeyRangeRef(range.begin, middleKey), true, self->tenantName));
|
||||
ASSERT(!fail4);
|
||||
|
||||
bool fail5 = wait(self->setRange(cx, KeyRangeRef(middleKey, range.end), true));
|
||||
bool fail5 = wait(self->setRange(cx, KeyRangeRef(middleKey, range.end), true, self->tenantName));
|
||||
ASSERT(!fail5);
|
||||
|
||||
bool fail6 = wait(self->setRange(cx, KeyRangeRef(activeRange.begin, middleKey), true));
|
||||
bool fail6 = wait(self->setRange(cx, KeyRangeRef(activeRange.begin, middleKey), true, self->tenantName));
|
||||
ASSERT(!fail6);
|
||||
|
||||
bool fail7 = wait(self->setRange(cx, KeyRangeRef(middleKey, activeRange.end), true));
|
||||
bool fail7 = wait(self->setRange(cx, KeyRangeRef(middleKey, activeRange.end), true, self->tenantName));
|
||||
ASSERT(!fail7);
|
||||
|
||||
bool fail8 = wait(self->setRange(cx, KeyRangeRef(middleKey, middleKey2), true));
|
||||
bool fail8 = wait(self->setRange(cx, KeyRangeRef(middleKey, middleKey2), true, self->tenantName));
|
||||
ASSERT(!fail8);
|
||||
|
||||
{
|
||||
Standalone<VectorRef<KeyRangeRef>> blobRanges = wait(cx->listBlobbifiedRanges(range, 1000000));
|
||||
Standalone<VectorRef<KeyRangeRef>> blobRanges =
|
||||
wait(cx->listBlobbifiedRanges(range, 1000000, self->tenantName));
|
||||
ASSERT(blobRanges.size() == 1);
|
||||
ASSERT(blobRanges[0] == activeRange);
|
||||
|
||||
state Transaction tr(cx);
|
||||
state Transaction tr(cx, self->tenantName);
|
||||
loop {
|
||||
try {
|
||||
Standalone<VectorRef<KeyRangeRef>> granules = wait(tr.getBlobGranuleRanges(range, 1000000));
|
||||
|
@ -545,50 +583,58 @@ struct BlobGranuleRangesWorkload : TestWorkload {
|
|||
}
|
||||
|
||||
// tear down + check that un-blobbifying at a non-aligned range also doesn't work
|
||||
Key purgeKey = wait(self->versionedForcePurge(cx, activeRange, {}));
|
||||
Key purgeKey = wait(self->versionedForcePurge(cx, activeRange, self->tenantName));
|
||||
wait(cx->waitPurgeGranulesComplete(purgeKey));
|
||||
|
||||
if (deterministicRandom()->coinflip()) {
|
||||
// force purge again and ensure it is idempotent
|
||||
Key purgeKeyAgain = wait(cx->purgeBlobGranules(activeRange, 1, {}, true));
|
||||
Key purgeKeyAgain = wait(cx->purgeBlobGranules(activeRange, 1, self->tenantName, true));
|
||||
wait(cx->waitPurgeGranulesComplete(purgeKeyAgain));
|
||||
}
|
||||
}
|
||||
|
||||
// Check that the blob range is still listed
|
||||
{
|
||||
Standalone<VectorRef<KeyRangeRef>> blobRanges = wait(cx->listBlobbifiedRanges(range, 1000000));
|
||||
Standalone<VectorRef<KeyRangeRef>> blobRanges =
|
||||
wait(cx->listBlobbifiedRanges(range, 1000000, self->tenantName));
|
||||
ASSERT(blobRanges.size() == 1);
|
||||
ASSERT(blobRanges[0] == activeRange);
|
||||
|
||||
bool unblobbifyFail1 = wait(self->setRange(cx, range, false));
|
||||
bool unblobbifyFail1 = wait(self->setRange(cx, range, false, self->tenantName));
|
||||
ASSERT(!unblobbifyFail1);
|
||||
|
||||
bool unblobbifyFail2 = wait(self->setRange(cx, KeyRangeRef(range.begin, activeRange.end), false));
|
||||
bool unblobbifyFail2 =
|
||||
wait(self->setRange(cx, KeyRangeRef(range.begin, activeRange.end), false, self->tenantName));
|
||||
ASSERT(!unblobbifyFail2);
|
||||
|
||||
bool unblobbifyFail3 = wait(self->setRange(cx, KeyRangeRef(activeRange.begin, range.end), false));
|
||||
bool unblobbifyFail3 =
|
||||
wait(self->setRange(cx, KeyRangeRef(activeRange.begin, range.end), false, self->tenantName));
|
||||
ASSERT(!unblobbifyFail3);
|
||||
|
||||
bool unblobbifyFail4 = wait(self->setRange(cx, KeyRangeRef(activeRange.begin, middleKey), false));
|
||||
bool unblobbifyFail4 =
|
||||
wait(self->setRange(cx, KeyRangeRef(activeRange.begin, middleKey), false, self->tenantName));
|
||||
ASSERT(!unblobbifyFail4);
|
||||
|
||||
bool unblobbifyFail5 = wait(self->setRange(cx, KeyRangeRef(middleKey, activeRange.end), false));
|
||||
bool unblobbifyFail5 =
|
||||
wait(self->setRange(cx, KeyRangeRef(middleKey, activeRange.end), false, self->tenantName));
|
||||
ASSERT(!unblobbifyFail5);
|
||||
|
||||
bool unblobbifyFail6 = wait(self->setRange(cx, KeyRangeRef(activeRange.begin, middleKey), false));
|
||||
bool unblobbifyFail6 =
|
||||
wait(self->setRange(cx, KeyRangeRef(activeRange.begin, middleKey), false, self->tenantName));
|
||||
ASSERT(!unblobbifyFail6);
|
||||
|
||||
bool unblobbifyFail7 = wait(self->setRange(cx, KeyRangeRef(middleKey, activeRange.end), false));
|
||||
bool unblobbifyFail7 =
|
||||
wait(self->setRange(cx, KeyRangeRef(middleKey, activeRange.end), false, self->tenantName));
|
||||
ASSERT(!unblobbifyFail7);
|
||||
|
||||
bool unblobbifyFail8 = wait(self->setRange(cx, KeyRangeRef(middleKey, middleKey2), false));
|
||||
bool unblobbifyFail8 =
|
||||
wait(self->setRange(cx, KeyRangeRef(middleKey, middleKey2), false, self->tenantName));
|
||||
ASSERT(!unblobbifyFail8);
|
||||
|
||||
bool unblobbifySuccess = wait(self->setRange(cx, activeRange, true));
|
||||
bool unblobbifySuccess = wait(self->setRange(cx, activeRange, true, self->tenantName));
|
||||
ASSERT(unblobbifySuccess);
|
||||
|
||||
bool unblobbifySuccessAgain = wait(self->setRange(cx, activeRange, true));
|
||||
bool unblobbifySuccessAgain = wait(self->setRange(cx, activeRange, true, self->tenantName));
|
||||
ASSERT(unblobbifySuccessAgain);
|
||||
}
|
||||
|
||||
|
@ -596,20 +642,20 @@ struct BlobGranuleRangesWorkload : TestWorkload {
|
|||
}
|
||||
|
||||
ACTOR Future<Void> reBlobbifyUnit(Database cx, BlobGranuleRangesWorkload* self, KeyRange range) {
|
||||
bool setSuccess = wait(self->setRange(cx, range, true));
|
||||
bool setSuccess = wait(self->setRange(cx, range, true, self->tenantName));
|
||||
ASSERT(setSuccess);
|
||||
wait(self->checkRange(cx, self, range, true));
|
||||
|
||||
// force purge range
|
||||
Key purgeKey = wait(self->versionedForcePurge(cx, range, {}));
|
||||
Key purgeKey = wait(self->versionedForcePurge(cx, range, self->tenantName));
|
||||
wait(cx->waitPurgeGranulesComplete(purgeKey));
|
||||
wait(self->checkRange(cx, self, range, false));
|
||||
|
||||
bool unsetSuccess = wait(self->setRange(cx, range, false));
|
||||
bool unsetSuccess = wait(self->setRange(cx, range, false, self->tenantName));
|
||||
ASSERT(unsetSuccess);
|
||||
wait(self->checkRange(cx, self, range, false));
|
||||
|
||||
bool reSetSuccess = wait(self->setRange(cx, range, true));
|
||||
bool reSetSuccess = wait(self->setRange(cx, range, true, self->tenantName));
|
||||
ASSERT(reSetSuccess);
|
||||
wait(self->checkRange(cx, self, range, true));
|
||||
|
||||
|
|
|
@ -70,6 +70,7 @@ public: // introduced features
|
|||
API_VERSION_FEATURE(@FDB_AV_CREATE_DB_FROM_CONN_STRING@, CreateDBFromConnString);
|
||||
API_VERSION_FEATURE(@FDB_AV_FUTURE_GET_BOOL@, FutureGetBool);
|
||||
API_VERSION_FEATURE(@FDB_AV_FUTURE_PROTOCOL_VERSION_API@, FutureProtocolVersionApi);
|
||||
API_VERSION_FEATURE(@FDB_AV_TENANT_BLOB_RANGE_API@, TenantBlobRangeApi);
|
||||
};
|
||||
|
||||
#endif // FLOW_CODE_API_VERSION_H
|
||||
|
|
|
@ -11,3 +11,4 @@ set(FDB_AV_BLOB_RANGE_API "720")
|
|||
set(FDB_AV_CREATE_DB_FROM_CONN_STRING "720")
|
||||
set(FDB_AV_FUTURE_GET_BOOL "720")
|
||||
set(FDB_AV_FUTURE_PROTOCOL_VERSION_API "720")
|
||||
set(FDB_AV_TENANT_BLOB_RANGE_API "720")
|
|
@ -73,6 +73,7 @@ void FlowKnobs::initialize(Randomize randomize, IsSimulated isSimulated) {
|
|||
|
||||
init( RANDOMSEED_RETRY_LIMIT, 4 );
|
||||
init( FAST_ALLOC_LOGGING_BYTES, 10e6 );
|
||||
init( FAST_ALLOC_ALLOW_GUARD_PAGES, false );
|
||||
init( HUGE_ARENA_LOGGING_BYTES, 100e6 );
|
||||
init( HUGE_ARENA_LOGGING_INTERVAL, 5.0 );
|
||||
|
||||
|
|
|
@ -2126,7 +2126,7 @@ static void mprotectSafe(void* p, size_t s, int prot) {
|
|||
}
|
||||
|
||||
static void* mmapInternal(size_t length, int flags, bool guardPages) {
|
||||
if (guardPages) {
|
||||
if (guardPages && FLOW_KNOBS->FAST_ALLOC_ALLOW_GUARD_PAGES) {
|
||||
static size_t pageSize = sysconf(_SC_PAGESIZE);
|
||||
length = RightAlign(length, pageSize);
|
||||
length += 2 * pageSize; // Map enough for the guard pages
|
||||
|
|
|
@ -131,6 +131,7 @@ public:
|
|||
|
||||
int RANDOMSEED_RETRY_LIMIT;
|
||||
double FAST_ALLOC_LOGGING_BYTES;
|
||||
bool FAST_ALLOC_ALLOW_GUARD_PAGES;
|
||||
double HUGE_ARENA_LOGGING_BYTES;
|
||||
double HUGE_ARENA_LOGGING_INTERVAL;
|
||||
|
||||
|
|
|
@ -1,6 +1,5 @@
|
|||
[configuration]
|
||||
extraMachineCountDC = 2
|
||||
storageEngineExcludeTypes = [3]
|
||||
|
||||
[[test]]
|
||||
testTitle = 'CloggedConfigureDatabaseTest'
|
||||
|
|
|
@ -1,6 +1,3 @@
|
|||
[configuration]
|
||||
storageEngineExcludeTypes = [3]
|
||||
|
||||
[[test]]
|
||||
testTitle='CloggedConfigureDatabaseTest'
|
||||
clearAfterTest=false
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
storageEngineExcludeTypes=3,4,5
|
||||
storageEngineExcludeTypes=4,5
|
||||
|
||||
;Take snap and do cycle test
|
||||
testTitle=SnapCyclePre
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
storageEngineExcludeTypes=3,4,5
|
||||
storageEngineExcludeTypes=4,5
|
||||
|
||||
logAntiQuorum = 0
|
||||
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
storageEngineExcludeTypes=3,4,5
|
||||
storageEngineExcludeTypes=4,5
|
||||
|
||||
;write 1000 Keys ending with even numbers
|
||||
testTitle=SnapTestPre
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
storageEngineExcludeTypes=3,4,5
|
||||
storageEngineExcludeTypes=4,5
|
||||
|
||||
;write 1000 Keys ending with even numbers
|
||||
testTitle=SnapTestPre
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
storageEngineExcludeTypes=3,4,5
|
||||
storageEngineExcludeTypes=4,5
|
||||
|
||||
;write 1000 Keys ending with even number
|
||||
testTitle=SnapSimplePre
|
||||
|
|
|
@ -1,6 +1,3 @@
|
|||
[configuration]
|
||||
storageEngineExcludeTypes = [3]
|
||||
|
||||
[[knobs]]
|
||||
enable_version_vector = true
|
||||
enable_version_vector_tlog_unicast = true
|
||||
|
|
|
@ -1,6 +1,3 @@
|
|||
[configuration]
|
||||
storageEngineExcludeTypes = [3]
|
||||
|
||||
[[knobs]]
|
||||
enable_version_vector = false
|
||||
enable_version_vector_tlog_unicast = false
|
||||
|
|
|
@ -1,4 +1,3 @@
|
|||
storageEngineExcludeTypes=3
|
||||
extraDatabaseMode=Local
|
||||
|
||||
testTitle=DrUpgrade
|
||||
|
|
|
@ -3,7 +3,7 @@ extraMachineCountDC = 2
|
|||
maxTLogVersion=6
|
||||
disableHostname=true
|
||||
disableEncryption=true
|
||||
storageEngineExcludeTypes=[3,4]
|
||||
storageEngineExcludeTypes=[4]
|
||||
|
||||
[[knobs]]
|
||||
# This can be removed once the lower bound of this downgrade test is a version that understands the new protocol
|
||||
|
|
Loading…
Reference in New Issue