Add fdb_transaction_get_tag_throttled_duration function to C bindings

This commit is contained in:
sfc-gh-tclinkenbeard 2022-11-13 11:22:57 -08:00
parent 8a99f9ea05
commit 7fc85d86b4
20 changed files with 141 additions and 10 deletions

View File

@ -251,6 +251,10 @@ extern "C" DLLEXPORT fdb_error_t fdb_future_get_uint64(FDBFuture* f, uint64_t* o
CATCH_AND_RETURN(*out = TSAV(uint64_t, f)->get(););
}
extern "C" DLLEXPORT fdb_error_t fdb_future_get_double(FDBFuture* f, double* out) {
CATCH_AND_RETURN(*out = TSAV(double, f)->get(););
}
extern "C" DLLEXPORT fdb_error_t fdb_future_get_key(FDBFuture* f, uint8_t const** out_key, int* out_key_length) {
CATCH_AND_RETURN(KeyRef key = TSAV(Key, f)->get(); *out_key = key.begin(); *out_key_length = key.size(););
}
@ -905,6 +909,10 @@ extern "C" DLLEXPORT fdb_error_t fdb_transaction_get_committed_version(FDBTransa
CATCH_AND_RETURN(*out_version = TXN(tr)->getCommittedVersion(););
}
extern "C" DLLEXPORT FDBFuture* fdb_transaction_get_tag_throttled_duration(FDBTransaction* tr) {
return (FDBFuture*)TXN(tr)->getTagThrottledDuration().extractPtr();
}
extern "C" DLLEXPORT FDBFuture* fdb_transaction_get_total_cost(FDBTransaction* tr) {
return (FDBFuture*)TXN(tr)->getTotalCost().extractPtr();
}

View File

@ -241,6 +241,8 @@ DLLEXPORT WARN_UNUSED_RESULT fdb_error_t fdb_future_get_int64(FDBFuture* f, int6
DLLEXPORT WARN_UNUSED_RESULT fdb_error_t fdb_future_get_uint64(FDBFuture* f, uint64_t* out);
DLLEXPORT WARN_UNUSED_RESULT fdb_error_t fdb_future_get_double(FDBFuture* f, double* out);
DLLEXPORT WARN_UNUSED_RESULT fdb_error_t fdb_future_get_key(FDBFuture* f, uint8_t const** out_key, int* out_key_length);
DLLEXPORT WARN_UNUSED_RESULT fdb_error_t fdb_future_get_value(FDBFuture* f,
@ -514,12 +516,14 @@ DLLEXPORT WARN_UNUSED_RESULT fdb_error_t fdb_transaction_get_committed_version(F
int64_t* out_version);
/*
* These functions intentionally return an FDBFuture instead of an integer
* These functions intentionally return an FDBFuture instead of a numeric value
* directly, so that calling the API can see the effect of previous
* mutations on the transaction. Specifically, mutations are applied
* asynchronously by the main thread. In order to see them, this call has to
* be serviced by the main thread too.
*/
DLLEXPORT WARN_UNUSED_RESULT FDBFuture* fdb_transaction_get_tag_throttled_duration(FDBTransaction* tr);
DLLEXPORT WARN_UNUSED_RESULT FDBFuture* fdb_transaction_get_total_cost(FDBTransaction* tr);
DLLEXPORT WARN_UNUSED_RESULT FDBFuture* fdb_transaction_get_approximate_size(FDBTransaction* tr);

View File

@ -60,6 +60,12 @@ void Future::cancel() {
return fdb_future_get_int64(future_, out);
}
// DoubleFuture
[[nodiscard]] fdb_error_t DoubleFuture::get(double* out) {
return fdb_future_get_double(future_, out);
}
// ValueFuture
[[nodiscard]] fdb_error_t ValueFuture::get(fdb_bool_t* out_present, const uint8_t** out_value, int* out_value_length) {
@ -235,6 +241,10 @@ Int64Future Transaction::get_total_cost() {
return Int64Future(fdb_transaction_get_total_cost(tr_));
}
DoubleFuture Transaction::get_tag_throttled_duration() {
return DoubleFuture(fdb_transaction_get_tag_throttled_duration(tr_));
}
KeyFuture Transaction::get_versionstamp() {
return KeyFuture(fdb_transaction_get_versionstamp(tr_));
}

View File

@ -80,7 +80,7 @@ protected:
class Int64Future : public Future {
public:
// Call this function instead of fdb_future_get_int64 when using the
// Int64Future type. It's behavior is identical to fdb_future_get_int64.
// Int64Future type. Its behavior is identical to fdb_future_get_int64.
fdb_error_t get(int64_t* out);
private:
@ -89,10 +89,22 @@ private:
Int64Future(FDBFuture* f) : Future(f) {}
};
class DoubleFuture : public Future {
public:
// Call this function instead of fdb_future_get_double when using the
// DoubleFuture type. Its behavior is identical to fdb_future_get_double.
fdb_error_t get(double* out);
private:
friend class Transaction;
friend class Database;
DoubleFuture(FDBFuture* f) : Future(f) {}
};
class KeyFuture : public Future {
public:
// Call this function instead of fdb_future_get_key when using the KeyFuture
// type. It's behavior is identical to fdb_future_get_key.
// type. Its behavior is identical to fdb_future_get_key.
fdb_error_t get(const uint8_t** out_key, int* out_key_length);
private:
@ -105,7 +117,7 @@ private:
class ValueFuture : public Future {
public:
// Call this function instead of fdb_future_get_value when using the
// ValueFuture type. It's behavior is identical to fdb_future_get_value.
// ValueFuture type. Its behavior is identical to fdb_future_get_value.
fdb_error_t get(fdb_bool_t* out_present, const uint8_t** out_value, int* out_value_length);
private:
@ -116,7 +128,7 @@ private:
class StringArrayFuture : public Future {
public:
// Call this function instead of fdb_future_get_string_array when using the
// StringArrayFuture type. It's behavior is identical to
// StringArrayFuture type. Its behavior is identical to
// fdb_future_get_string_array.
fdb_error_t get(const char*** out_strings, int* out_count);
@ -128,7 +140,7 @@ private:
class KeyValueArrayFuture : public Future {
public:
// Call this function instead of fdb_future_get_keyvalue_array when using
// the KeyValueArrayFuture type. It's behavior is identical to
// the KeyValueArrayFuture type. Its behavior is identical to
// fdb_future_get_keyvalue_array.
fdb_error_t get(const FDBKeyValue** out_kv, int* out_count, fdb_bool_t* out_more);
@ -152,7 +164,7 @@ private:
class KeyRangeArrayFuture : public Future {
public:
// Call this function instead of fdb_future_get_keyrange_array when using
// the KeyRangeArrayFuture type. It's behavior is identical to
// the KeyRangeArrayFuture type. Its behavior is identical to
// fdb_future_get_keyrange_array.
fdb_error_t get(const FDBKeyRange** out_keyranges, int* out_count);
@ -164,7 +176,7 @@ private:
class GranuleSummaryArrayFuture : public Future {
public:
// Call this function instead of fdb_future_get_granule_summary_array when using
// the GranuleSummaryArrayFuture type. It's behavior is identical to
// the GranuleSummaryArrayFuture type. Its behavior is identical to
// fdb_future_get_granule_summary_array.
fdb_error_t get(const FDBGranuleSummary** out_summaries, int* out_count);
@ -193,7 +205,7 @@ protected:
class KeyValueArrayResult : public Result {
public:
// Call this function instead of fdb_result_get_keyvalue_array when using
// the KeyValueArrayREsult type. It's behavior is identical to
// the KeyValueArrayREsult type. Its behavior is identical to
// fdb_result_get_keyvalue_array.
fdb_error_t get(const FDBKeyValue** out_kv, int* out_count, fdb_bool_t* out_more);
@ -276,9 +288,12 @@ public:
// Returns a future which will be set to the approximate transaction size so far.
Int64Future get_approximate_size();
// Returns a future which will be set tot the transaction's total cost so far.
// Returns a future which will be set to the transaction's total cost so far.
Int64Future get_total_cost();
// Returns a future which will be set to the transaction's tag throttling duration.
DoubleFuture get_tag_throttled_duration();
// Returns a future which will be set to the versionstamp which was used by
// any versionstamp operations in the transaction.
KeyFuture get_versionstamp();

View File

@ -1945,6 +1945,30 @@ TEST_CASE("fdb_transaction_get_committed_version") {
}
}
TEST_CASE("fdb_transaction_get_tag_throttled_duration") {
fdb::Transaction tr(db);
while (1) {
fdb::ValueFuture f1 = tr.get("foo", /*snapshot*/ false);
fdb_error_t err = wait_future(f1);
if (err) {
fdb::EmptyFuture fOnError = tr.on_error(err);
fdb_check(wait_future(fOnError));
continue;
}
fdb::DoubleFuture f2 = tr.get_tag_throttled_duration();
err = wait_future(f2);
if (err) {
fdb::EmptyFuture fOnError = tr.on_error(err);
fdb_check(wait_future(fOnError));
continue;
}
double tagThrottledDuration;
fdb_check(f2.get(&tagThrottledDuration));
CHECK(tagThrottledDuration >= 0.0);
break;
}
}
TEST_CASE("fdb_transaction_get_total_cost") {
fdb::Transaction tr(db);
while (1) {

View File

@ -415,6 +415,20 @@ Version DLTransaction::getCommittedVersion() {
return version;
}
ThreadFuture<double> DLTransaction::getTagThrottledDuration() {
if (!api->transactionGetTagThrottledDuration) {
return unsupported_operation();
}
FdbCApi::FDBFuture* f = api->transactionGetTagThrottledDuration(tr);
return toThreadFuture<double>(api, f, [](FdbCApi::FDBFuture* f, FdbCApi* api) {
double duration;
FdbCApi::fdb_error_t error = api->futureGetDouble(f, &duration);
ASSERT(!error);
return duration;
});
}
ThreadFuture<int64_t> DLTransaction::getTotalCost() {
if (!api->transactionGetTotalCost) {
return unsupported_operation();
@ -965,6 +979,11 @@ void DLApi::init() {
fdbCPath,
"fdb_transaction_get_committed_version",
headerVersion >= 0);
loadClientFunction(&api->transactionGetTagThrottledDuration,
lib,
fdbCPath,
"fdb_transaction_get_tag_throttled_duration",
headerVersion >= ApiVersion::withGetTagThrottledDuration().version());
loadClientFunction(&api->transactionGetTotalCost,
lib,
fdbCPath,
@ -1014,6 +1033,11 @@ void DLApi::init() {
fdbCPath,
"fdb_transaction_summarize_blob_granules",
headerVersion >= ApiVersion::withBlobRangeApi().version());
loadClientFunction(&api->futureGetDouble,
lib,
fdbCPath,
"fdb_get_double",
headerVersion >= ApiVersion::withFutureGetDouble().version());
loadClientFunction(&api->futureGetInt64,
lib,
fdbCPath,
@ -1506,6 +1530,12 @@ ThreadFuture<SpanContext> MultiVersionTransaction::getSpanContext() {
return SpanContext();
}
ThreadFuture<double> MultiVersionTransaction::getTagThrottledDuration() {
auto tr = getTransaction();
auto f = tr.transaction ? tr.transaction->getTagThrottledDuration() : makeTimeout<double>();
return abortableFuture(f, tr.onChange);
}
ThreadFuture<int64_t> MultiVersionTransaction::getTotalCost() {
auto tr = getTransaction();
auto f = tr.transaction ? tr.transaction->getTotalCost() : makeTimeout<int64_t>();

View File

@ -7190,6 +7190,7 @@ ACTOR Future<Version> extractReadVersion(Reference<TransactionState> trState,
double latency = replyTime - trState->startTime;
trState->cx->lastProxyRequestTime = trState->startTime;
trState->cx->updateCachedReadVersion(trState->startTime, rep.version);
trState->proxyTagThrottledDuration = rep.proxyTagThrottledDuration;
if (rep.rkBatchThrottled) {
trState->cx->lastRkBatchThrottleTime = replyTime;
}
@ -7378,6 +7379,11 @@ Optional<Version> Transaction::getCachedReadVersion() const {
}
}
double Transaction::getTagThrottledDuration() const {
// TODO: Implement
return 0.0;
}
Future<Standalone<StringRef>> Transaction::getVersionstamp() {
if (committing.isValid()) {
return transaction_invalid_version();

View File

@ -564,6 +564,10 @@ Version PaxosConfigTransaction::getCommittedVersion() const {
return impl->getCommittedVersion();
}
double PaxosConfigTransaction::getTagThrottledDuration() const {
return 0.0;
}
int64_t PaxosConfigTransaction::getTotalCost() const {
return 0;
}

View File

@ -296,6 +296,10 @@ Version SimpleConfigTransaction::getCommittedVersion() const {
return impl->getCommittedVersion();
}
double SimpleConfigTransaction::getTagThrottledDuration() const {
return 0.0;
}
int64_t SimpleConfigTransaction::getTotalCost() const {
return 0;
}

View File

@ -626,6 +626,14 @@ ThreadFuture<SpanContext> ThreadSafeTransaction::getSpanContext() {
});
}
ThreadFuture<double> ThreadSafeTransaction::getTagThrottledDuration() {
ISingleThreadTransaction* tr = this->tr;
return onMainThread([tr]() -> Future<double> {
tr->checkDeferredError();
return tr->getTagThrottledDuration();
});
}
ThreadFuture<int64_t> ThreadSafeTransaction::getTotalCost() {
ISingleThreadTransaction* tr = this->tr;
return onMainThread([tr]() -> Future<int64_t> {

View File

@ -120,6 +120,7 @@ public:
// later if they are not really needed.
virtual ThreadFuture<VersionVector> getVersionVector() = 0;
virtual ThreadFuture<SpanContext> getSpanContext() = 0;
virtual ThreadFuture<double> getTagThrottledDuration() = 0;
virtual ThreadFuture<int64_t> getTotalCost() = 0;
virtual ThreadFuture<int64_t> getApproximateSize() = 0;

View File

@ -101,6 +101,7 @@ public:
virtual Version getCommittedVersion() const = 0;
virtual VersionVector getVersionVector() const = 0;
virtual SpanContext getSpanContext() const = 0;
virtual double getTagThrottledDuration() const = 0;
virtual int64_t getTotalCost() const = 0;
virtual int64_t getApproximateSize() const = 0;
virtual Future<Standalone<StringRef>> getVersionstamp() = 0;

View File

@ -377,6 +377,7 @@ struct FdbCApi : public ThreadSafeReferenceCounted<FdbCApi> {
FDBFuture* (*transactionCommit)(FDBTransaction* tr);
fdb_error_t (*transactionGetCommittedVersion)(FDBTransaction* tr, int64_t* outVersion);
FDBFuture* (*transactionGetTagThrottledDuration)(FDBTransaction* tr);
FDBFuture* (*transactionGetTotalCost)(FDBTransaction* tr);
FDBFuture* (*transactionGetApproximateSize)(FDBTransaction* tr);
FDBFuture* (*transactionWatch)(FDBTransaction* tr, uint8_t const* keyName, int keyNameLength);
@ -396,6 +397,7 @@ struct FdbCApi : public ThreadSafeReferenceCounted<FdbCApi> {
fdb_error_t (*futureGetInt64)(FDBFuture* f, int64_t* outValue);
fdb_error_t (*futureGetUInt64)(FDBFuture* f, uint64_t* outValue);
fdb_error_t (*futureGetBool)(FDBFuture* f, fdb_bool_t* outValue);
fdb_error_t (*futureGetDouble)(FDBFuture* f, double* outValue);
fdb_error_t (*futureGetError)(FDBFuture* f);
fdb_error_t (*futureGetKey)(FDBFuture* f, uint8_t const** outKey, int* outKeyLength);
fdb_error_t (*futureGetValue)(FDBFuture* f, fdb_bool_t* outPresent, uint8_t const** outValue, int* outValueLength);
@ -506,6 +508,7 @@ public:
Version getCommittedVersion() override;
ThreadFuture<VersionVector> getVersionVector() override;
ThreadFuture<SpanContext> getSpanContext() override { return SpanContext(); };
ThreadFuture<double> getTagThrottledDuration() override;
ThreadFuture<int64_t> getTotalCost() override;
ThreadFuture<int64_t> getApproximateSize() override;
@ -734,6 +737,7 @@ public:
Version getCommittedVersion() override;
ThreadFuture<VersionVector> getVersionVector() override;
ThreadFuture<SpanContext> getSpanContext() override;
ThreadFuture<double> getTagThrottledDuration() override;
ThreadFuture<int64_t> getTotalCost() override;
ThreadFuture<int64_t> getApproximateSize() override;

View File

@ -253,6 +253,8 @@ struct TransactionState : ReferenceCounted<TransactionState> {
// after rounding up to the nearest page size and applying a write penalty
int64_t totalCost = 0;
double proxyTagThrottledDuration = 0.0;
// Special flag to skip prepending tenant prefix to mutations and conflict ranges
// when a dummy, internal transaction gets commited. The sole purpose of commitDummyTransaction() is to
// resolve the state of earlier transaction that returned commit_unknown_result or request_maybe_delivered.
@ -452,6 +454,8 @@ public:
int64_t getTotalCost() const { return trState->totalCost; }
double getTagThrottledDuration() const;
// Will be fulfilled only after commit() returns success
[[nodiscard]] Future<Standalone<StringRef>> getVersionstamp();

View File

@ -64,6 +64,7 @@ public:
void clear(KeyRef const&) override;
Future<Void> commit() override;
Version getCommittedVersion() const override;
double getTagThrottledDuration() const override;
int64_t getTotalCost() const override;
int64_t getApproximateSize() const override;
void setOption(FDBTransactionOptions::Option option, Optional<StringRef> value = Optional<StringRef>()) override;

View File

@ -149,6 +149,7 @@ public:
VersionVector getVersionVector() const override { return tr.getVersionVector(); }
SpanContext getSpanContext() const override { return tr.getSpanContext(); }
double getTagThrottledDuration() const override { return tr.getTagThrottledDuration(); }
int64_t getTotalCost() const override { return tr.getTotalCost(); }
int64_t getApproximateSize() const override { return approximateSize; }
[[nodiscard]] Future<Standalone<StringRef>> getVersionstamp() override;

View File

@ -76,6 +76,7 @@ public:
void reset() override;
void debugTransaction(UID dID) override;
void checkDeferredError() const override;
double getTagThrottledDuration() const override;
int64_t getTotalCost() const override;
int64_t getApproximateSize() const override;
void set(KeyRef const&, ValueRef const&) override;

View File

@ -205,6 +205,7 @@ public:
Version getCommittedVersion() override;
ThreadFuture<VersionVector> getVersionVector() override;
ThreadFuture<SpanContext> getSpanContext() override;
ThreadFuture<double> getTagThrottledDuration() override;
ThreadFuture<int64_t> getTotalCost() override;
ThreadFuture<int64_t> getApproximateSize() override;

View File

@ -73,6 +73,8 @@ public: // introduced features
API_VERSION_FEATURE(@FDB_AV_TENANT_BLOB_RANGE_API@, TenantBlobRangeApi);
API_VERSION_FEATURE(@FDB_AV_GET_TOTAL_COST@, GetTotalCost);
API_VERSION_FEATURE(@FDB_AV_FAIL_ON_EXTERNAL_CLIENT_ERRORS@, FailOnExternalClientErrors);
API_VERSION_FEATURE(@FDB_AV_GET_TAG_THROTTLED_DURATION@, GetTagThrottledDuration);
API_VERSION_FEATURE(@FDB_AV_FUTURE_GET_DOUBLE@, FutureGetDouble);
};
#endif // FLOW_CODE_API_VERSION_H

View File

@ -14,3 +14,5 @@ set(FDB_AV_FUTURE_PROTOCOL_VERSION_API "720")
set(FDB_AV_TENANT_BLOB_RANGE_API "720")
set(FDB_AV_GET_TOTAL_COST "730")
set(FDB_AV_FAIL_ON_EXTERNAL_CLIENT_ERRORS "730")
set(FDB_AV_GET_TAG_THROTTLED_DURATION "730")
set(FDB_AV_FUTURE_GET_DOUBLE "730")