Resolve conflicts when merging with master

Meng Xu 2019-07-22 10:56:16 -07:00
commit 378db79441
156 changed files with 2644 additions and 1150 deletions

View File

@ -26,7 +26,7 @@ sys.path[:0] = [os.path.join(os.path.dirname(__file__), '..', '..', 'bindings',
import util
FDB_API_VERSION = 610
FDB_API_VERSION = 620
LOGGING = {
'version': 1,

View File

@ -158,7 +158,7 @@ def choose_api_version(selected_api_version, tester_min_version, tester_max_vers
api_version = min_version
elif random.random() < 0.9:
api_version = random.choice([v for v in [13, 14, 16, 21, 22, 23, 100, 200, 300, 400, 410, 420, 430,
440, 450, 460, 500, 510, 520, 600, 610] if v >= min_version and v <= max_version])
440, 450, 460, 500, 510, 520, 600, 610, 620] if v >= min_version and v <= max_version])
else:
api_version = random.randint(min_version, max_version)

View File

@ -20,7 +20,7 @@
import os
MAX_API_VERSION = 610
MAX_API_VERSION = 620
COMMON_TYPES = ['null', 'bytes', 'string', 'int', 'uuid', 'bool', 'float', 'double', 'tuple']
ALL_TYPES = COMMON_TYPES + ['versionstamp']

View File

@ -277,6 +277,12 @@ futures must apply the following rules to the result:
internal stack machine state as the last seen version. Pushes the byte
string "GOT_COMMITTED_VERSION" onto the stack.
#### GET_APPROXIMATE_SIZE
Calls get_approximate_size and pushes the byte string "GOT_APPROXIMATE_SIZE"
onto the stack. Note that bindings may issue GET_RANGE calls with different
limits, so different bindings may observe different sizes.
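As an illustration only (not part of the spec), a minimal C-level sketch of what a tester might do for this operation; `tr` and `push_bytes` are hypothetical names supplied by the surrounding harness:

    // Hypothetical GET_APPROXIMATE_SIZE handler sketch; error handling elided.
    FDBFuture* f = fdb_transaction_get_approximate_size(tr);
    int64_t size = 0;
    if (fdb_future_block_until_ready(f) == 0)
        fdb_future_get_int64(f, &size);   // the value itself is not pushed
    fdb_future_destroy(f);
    push_bytes((const uint8_t*)"GOT_APPROXIMATE_SIZE", 20);   // only the marker string goes on the stack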
#### WAIT_FUTURE
Pops the top item off the stack and pushes it back on. If the top item on

View File

@ -156,6 +156,7 @@ class ApiTest(Test):
resets = ['ON_ERROR', 'RESET', 'CANCEL']
read_conflicts = ['READ_CONFLICT_RANGE', 'READ_CONFLICT_KEY']
write_conflicts = ['WRITE_CONFLICT_RANGE', 'WRITE_CONFLICT_KEY', 'DISABLE_WRITE_CONFLICT']
txn_sizes = ['GET_APPROXIMATE_SIZE']
op_choices += reads
op_choices += mutations
@ -168,6 +169,7 @@ class ApiTest(Test):
op_choices += read_conflicts
op_choices += write_conflicts
op_choices += resets
op_choices += txn_sizes
idempotent_atomic_ops = [u'BIT_AND', u'BIT_OR', u'MAX', u'MIN', u'BYTE_MIN', u'BYTE_MAX']
atomic_ops = idempotent_atomic_ops + [u'ADD', u'BIT_XOR', u'APPEND_IF_FITS']
@ -434,6 +436,10 @@ class ApiTest(Test):
self.can_set_version = True
self.can_use_key_selectors = True
elif op == 'GET_APPROXIMATE_SIZE':
instructions.append(op)
self.add_strings(1)
elif op == 'TUPLE_PACK' or op == 'TUPLE_RANGE':
tup = self.random.random_tuple(10)
instructions.push_args(len(tup), *tup)

View File

@ -34,7 +34,7 @@ fdb.api_version(FDB_API_VERSION)
class ScriptedTest(Test):
TEST_API_VERSION = 610
TEST_API_VERSION = 620
def __init__(self, subspace):
super(ScriptedTest, self).__init__(subspace, ScriptedTest.TEST_API_VERSION, ScriptedTest.TEST_API_VERSION)

View File

@ -60,16 +60,20 @@ if(NOT WIN32)
if(OPEN_FOR_IDE)
add_library(fdb_c_performance_test OBJECT test/performance_test.c test/test.h)
add_library(fdb_c_ryw_benchmark OBJECT test/ryw_benchmark.c test/test.h)
add_library(fdb_c_txn_size_test OBJECT test/txn_size_test.c test/test.h)
add_library(mako OBJECT ${MAKO_SRCS})
else()
add_executable(fdb_c_performance_test test/performance_test.c test/test.h)
add_executable(fdb_c_ryw_benchmark test/ryw_benchmark.c test/test.h)
add_executable(fdb_c_txn_size_test test/txn_size_test.c test/test.h)
add_executable(mako ${MAKO_SRCS})
strip_debug_symbols(fdb_c_performance_test)
strip_debug_symbols(fdb_c_ryw_benchmark)
strip_debug_symbols(fdb_c_txn_size_test)
endif()
target_link_libraries(fdb_c_performance_test PRIVATE fdb_c)
target_link_libraries(fdb_c_ryw_benchmark PRIVATE fdb_c)
target_link_libraries(fdb_c_txn_size_test PRIVATE fdb_c)
# do not set RPATH for mako
set_property(TARGET mako PROPERTY SKIP_BUILD_RPATH TRUE)
target_link_libraries(mako PRIVATE fdb_c)

View File

@ -18,7 +18,7 @@
* limitations under the License.
*/
#define FDB_API_VERSION 610
#define FDB_API_VERSION 620
#define FDB_INCLUDE_LEGACY_TYPES
#include "fdbclient/MultiVersionTransaction.h"
@ -215,10 +215,15 @@ fdb_error_t fdb_future_get_error_v22( FDBFuture* f, const char** description ) {
}
extern "C" DLLEXPORT
fdb_error_t fdb_future_get_version( FDBFuture* f, int64_t* out_version ) {
fdb_error_t fdb_future_get_version_v619( FDBFuture* f, int64_t* out_version ) {
CATCH_AND_RETURN( *out_version = TSAV(Version, f)->get(); );
}
extern "C" DLLEXPORT
fdb_error_t fdb_future_get_int64( FDBFuture* f, int64_t* out_value ) {
CATCH_AND_RETURN( *out_value = TSAV(int64_t, f)->get(); );
}
extern "C" DLLEXPORT
fdb_error_t fdb_future_get_key( FDBFuture* f, uint8_t const** out_key,
int* out_key_length ) {
@ -584,6 +589,11 @@ fdb_error_t fdb_transaction_get_committed_version( FDBTransaction* tr,
*out_version = TXN(tr)->getCommittedVersion(); );
}
extern "C" DLLEXPORT
FDBFuture* fdb_transaction_get_approximate_size(FDBTransaction* tr) {
return (FDBFuture*)TXN(tr)->getApproximateSize().extractPtr();
}
extern "C" DLLEXPORT
FDBFuture* fdb_transaction_get_versionstamp( FDBTransaction* tr )
{
@ -670,6 +680,7 @@ fdb_error_t fdb_select_api_version_impl( int runtime_version, int header_version
// Versioned API changes -- descending order by version (new changes at top)
// FDB_API_CHANGED( function, ver ) means there is a new implementation as of ver, and a function function_(ver-1) is the old implementation
// FDB_API_REMOVED( function, ver ) means the function was removed as of ver, and function_(ver-1) is the old implementation
FDB_API_REMOVED( fdb_future_get_version, 620 );
FDB_API_REMOVED( fdb_create_cluster, 610 );
FDB_API_REMOVED( fdb_cluster_create_database, 610 );
FDB_API_REMOVED( fdb_cluster_set_option, 610 );

View File

@ -28,10 +28,10 @@
#endif
#if !defined(FDB_API_VERSION)
#error You must #define FDB_API_VERSION prior to including fdb_c.h (current version is 610)
#error You must #define FDB_API_VERSION prior to including fdb_c.h (current version is 620)
#elif FDB_API_VERSION < 13
#error API version no longer supported (upgrade to 13)
#elif FDB_API_VERSION > 610
#elif FDB_API_VERSION > 620
#error Requested API version requires a newer version of this header
#endif
@ -120,8 +120,13 @@ extern "C" {
fdb_future_get_error( FDBFuture* f );
#endif
#if FDB_API_VERSION < 620
DLLEXPORT WARN_UNUSED_RESULT fdb_error_t
fdb_future_get_version( FDBFuture* f, int64_t* out_version );
#endif
DLLEXPORT WARN_UNUSED_RESULT fdb_error_t
fdb_future_get_int64( FDBFuture* f, int64_t* out );
DLLEXPORT WARN_UNUSED_RESULT fdb_error_t
fdb_future_get_key( FDBFuture* f, uint8_t const** out_key,
@ -225,6 +230,13 @@ extern "C" {
fdb_transaction_get_committed_version( FDBTransaction* tr,
int64_t* out_version );
// This function intentionally returns an FDBFuture instead of an integer directly,
// so that a caller of this API can see the effect of previous mutations on the transaction.
// Specifically, mutations are applied asynchronously by the main thread. In order to
// see them, this call has to be serviced by the main thread too.
DLLEXPORT WARN_UNUSED_RESULT FDBFuture*
fdb_transaction_get_approximate_size(FDBTransaction* tr);
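Illustrative only, not part of the header: a minimal caller-side sketch of the pattern the comment above implies, assuming `tr` is a valid FDBTransaction* and the network has been set up. The future is waited on so that queued mutations are reflected in the reported size.

    FDBFuture* size_f = fdb_transaction_get_approximate_size(tr);
    int64_t approx_size = 0;
    fdb_error_t err = fdb_future_block_until_ready(size_f);     /* wait for the future to resolve */
    if (!err) err = fdb_future_get_int64(size_f, &approx_size); /* size reflects pending mutations */
    fdb_future_destroy(size_f);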
DLLEXPORT WARN_UNUSED_RESULT FDBFuture* fdb_transaction_get_versionstamp( FDBTransaction* tr );
DLLEXPORT WARN_UNUSED_RESULT FDBFuture*

View File

@ -290,13 +290,22 @@ int64_t run_op_getreadversion(FDBTransaction *transaction) {
} while (err && retry--);
if (err) {
fprintf(stderr, "ERROR: fdb_transaction_get_version: %s\n", fdb_get_error(err));
fprintf(stderr, "ERROR: fdb_transaction_get_read_version: %s\n", fdb_get_error(err));
return -1;
}
#if FDB_API_VERSION < 620
err = fdb_future_get_version(f, &rv);
#else
err = fdb_future_get_int64(f, &rv);
#endif
if (err) {
#if FDB_API_VERSION < 620
fprintf(stderr, "ERROR: fdb_future_get_version: %s\n", fdb_get_error(err));
#else
fprintf(stderr, "ERROR: fdb_future_get_int64: %s\n", fdb_get_error(err));
#endif
}
fdb_future_destroy(f);
return rv;

View File

@ -3,7 +3,7 @@
#pragma once
#ifndef FDB_API_VERSION
#define FDB_API_VERSION 610
#define FDB_API_VERSION 620
#endif
#include <foundationdb/fdb_c.h>

View File

@ -603,7 +603,7 @@ void runTests(struct ResultSet *rs) {
int main(int argc, char **argv) {
srand(time(NULL));
struct ResultSet *rs = newResultSet();
checkError(fdb_select_api_version(610), "select API version", rs);
checkError(fdb_select_api_version(620), "select API version", rs);
printf("Running performance test at client version: %s\n", fdb_get_client_version());
valueStr = (uint8_t*)malloc((sizeof(uint8_t))*valueSize);

View File

@ -224,7 +224,7 @@ void runTests(struct ResultSet *rs) {
checkError(fdb_future_block_until_ready(f), "block for read version", rs);
int64_t version;
checkError(fdb_future_get_version(f, &version), "get version", rs);
checkError(fdb_future_get_int64(f, &version), "get version", rs);
fdb_future_destroy(f);
insertData(tr);
@ -244,7 +244,7 @@ void runTests(struct ResultSet *rs) {
int main(int argc, char **argv) {
srand(time(NULL));
struct ResultSet *rs = newResultSet();
checkError(fdb_select_api_version(610), "select API version", rs);
checkError(fdb_select_api_version(620), "select API version", rs);
printf("Running RYW Benchmark test at client version: %s\n", fdb_get_client_version());
keys = generateKeys(numKeys, keySize);

View File

@ -29,7 +29,7 @@
#include <inttypes.h>
#ifndef FDB_API_VERSION
#define FDB_API_VERSION 610
#define FDB_API_VERSION 620
#endif
#include <foundationdb/fdb_c.h>

View File

@ -0,0 +1,110 @@
/*
* txn_size_test.c
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2019 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "test.h"
#include <assert.h>
#include <stdio.h>
#include <pthread.h>
#include <foundationdb/fdb_c.h>
#include <foundationdb/fdb_c_options.g.h>
pthread_t netThread;
const int numKeys = 100;
uint8_t** keys = NULL;
#define KEY_SIZE 16
#define VALUE_SIZE 100
uint8_t valueStr[VALUE_SIZE];
fdb_error_t getSize(struct ResultSet* rs, FDBTransaction* tr, int64_t* out_size) {
fdb_error_t e;
FDBFuture* future = fdb_transaction_get_approximate_size(tr);
e = maybeLogError(fdb_future_block_until_ready(future), "waiting for get future", rs);
if (e) {
fdb_future_destroy(future);
return e;
}
e = maybeLogError(fdb_future_get_int64(future, out_size), "getting future value", rs);
if (e) {
fdb_future_destroy(future);
return e;
}
fdb_future_destroy(future);
return 0;
}
void runTests(struct ResultSet *rs) {
int64_t sizes[numKeys];
int i = 0, j = 0;
FDBDatabase *db = openDatabase(rs, &netThread);
FDBTransaction *tr = NULL;
fdb_error_t e = fdb_database_create_transaction(db, &tr);
checkError(e, "create transaction", rs);
memset(sizes, 0, sizeof(sizes));
fdb_transaction_set(tr, keys[i], KEY_SIZE, valueStr, VALUE_SIZE);
e = getSize(rs, tr, sizes + i);
checkError(e, "transaction get size", rs);
printf("size %d: %u\n", i, sizes[i]);
i++;
fdb_transaction_set(tr, keys[i], KEY_SIZE, valueStr, VALUE_SIZE);
e = getSize(rs, tr, sizes + i);
checkError(e, "transaction get size", rs);
printf("size %d: %u\n", i, sizes[i]);
i++;
fdb_transaction_clear(tr, keys[i], KEY_SIZE);
e = getSize(rs, tr, sizes + i);
checkError(e, "transaction get size", rs);
printf("size %d: %u\n", i, sizes[i]);
i++;
fdb_transaction_clear_range(tr, keys[i], KEY_SIZE, keys[i+1], KEY_SIZE);
e = getSize(rs, tr, sizes + i);
checkError(e, "transaction get size", rs);
printf("size %d: %u\n", i, sizes[i]);
i++;
for (j = 0; j + 1 < i; j++) {
assert(sizes[j] < sizes[j + 1]);
}
printf("Test passed!\n");
}
int main(int argc, char **argv) {
srand(time(NULL));
struct ResultSet *rs = newResultSet();
checkError(fdb_select_api_version(620), "select API version", rs);
printf("Running performance test at client version: %s\n", fdb_get_client_version());
keys = generateKeys(numKeys, KEY_SIZE);
runTests(rs);
freeResultSet(rs);
freeKeys(keys, numKeys);
return 0;
}

View File

@ -18,7 +18,7 @@
* limitations under the License.
*/
#define FDB_API_VERSION 610
#define FDB_API_VERSION 620
#include "foundationdb/fdb_c.h"
#undef DLLEXPORT
#include "workloads.h"
@ -258,7 +258,7 @@ struct SimpleWorkload : FDBWorkload {
insertsPerTx = context->getOption("insertsPerTx", 100ul);
opsPerTx = context->getOption("opsPerTx", 100ul);
runFor = context->getOption("runFor", 10.0);
auto err = fdb_select_api_version(610);
auto err = fdb_select_api_version(620);
if (err) {
context->trace(FDBSeverity::Info, "SelectAPIVersionFailed",
{ { "Error", std::string(fdb_get_error(err)) } });

View File

@ -20,12 +20,13 @@
#include "fdb_flow.h"
#include "flow/DeterministicRandom.h"
#include "flow/SystemMonitor.h"
#include <stdio.h>
#include <cinttypes>
#include "flow/DeterministicRandom.h"
#include "flow/SystemMonitor.h"
#include "flow/actorcompiler.h" // This must be the last #include.
using namespace FDB;
THREAD_FUNC networkThread(void* fdb) {
@ -34,7 +35,7 @@ THREAD_FUNC networkThread(void* fdb) {
}
ACTOR Future<Void> _test() {
API *fdb = FDB::API::selectAPIVersion(610);
API *fdb = FDB::API::selectAPIVersion(620);
auto db = fdb->createDatabase();
state Reference<Transaction> tr = db->createTransaction();
@ -77,7 +78,7 @@ ACTOR Future<Void> _test() {
}
void fdb_flow_test() {
API *fdb = FDB::API::selectAPIVersion(610);
API *fdb = FDB::API::selectAPIVersion(620);
fdb->setupNetwork();
startThread(networkThread, fdb);
@ -147,6 +148,7 @@ namespace FDB {
void setOption(FDBTransactionOption option, Optional<StringRef> value = Optional<StringRef>()) override;
Future<int64_t> getApproximateSize() override;
Future<Void> onError(Error const& e) override;
void cancel() override;
@ -290,7 +292,7 @@ namespace FDB {
return backToFuture<Version>( fdb_transaction_get_read_version( tr ), [](Reference<CFuture> f){
Version value;
throw_on_error( fdb_future_get_version( f->f, &value ) );
throw_on_error( fdb_future_get_int64( f->f, &value ) );
return value;
} );
@ -408,6 +410,14 @@ namespace FDB {
}
}
Future<int64_t> TransactionImpl::getApproximateSize() {
return backToFuture<int64_t>(fdb_transaction_get_approximate_size(tr), [](Reference<CFuture> f) {
int64_t size = 0;
throw_on_error(fdb_future_get_int64(f->f, &size));
return size;
});
}
Future<Void> TransactionImpl::onError(Error const& e) {
return backToFuture< Void >( fdb_transaction_on_error( tr, e.code() ), [](Reference<CFuture> f) {
throw_on_error( fdb_future_get_error( f->f ) );
@ -422,4 +432,5 @@ namespace FDB {
void TransactionImpl::reset() {
fdb_transaction_reset( tr );
}
}
} // namespace FDB

View File

@ -23,7 +23,7 @@
#include <flow/flow.h>
#define FDB_API_VERSION 610
#define FDB_API_VERSION 620
#include <bindings/c/foundationdb/fdb_c.h>
#undef DLLEXPORT
@ -112,6 +112,7 @@ namespace FDB {
virtual Future<Void> commit() = 0;
virtual Version getCommittedVersion() = 0;
virtual Future<int64_t> getApproximateSize() = 0;
virtual Future<FDBStandalone<StringRef>> getVersionstamp() = 0;
};

View File

@ -19,6 +19,7 @@
*/
#include "Tester.actor.h"
#include "flow/actorcompiler.h" // This must be the last #include.
using namespace FDB;

View File

@ -18,16 +18,18 @@
* limitations under the License.
*/
#include "fdbrpc/fdbrpc.h"
#include "flow/DeterministicRandom.h"
#include "bindings/flow/Tuple.h"
#include "bindings/flow/FDBLoanerTypes.h"
#include "Tester.actor.h"
#include <cinttypes>
#ifdef __linux__
#include <string.h>
#endif
#include "bindings/flow/Tuple.h"
#include "bindings/flow/FDBLoanerTypes.h"
#include "fdbrpc/fdbrpc.h"
#include "flow/DeterministicRandom.h"
#include "flow/actorcompiler.h" // This must be the last #include.
// Otherwise we have to type setupNetwork(), FDB::open(), etc.
using namespace FDB;
@ -292,7 +294,7 @@ ACTOR Future<Void> printFlowTesterStack(FlowTesterStack* stack) {
state int idx;
for (idx = stack->data.size() - 1; idx >= 0; --idx) {
Standalone<StringRef> value = wait(stack->data[idx].value);
// printf("==========stack item:%d, index:%d, value:%s\n", idx, stack->data[idx].index, printable(value).c_str());
// printf("==========stack item:%d, index:%d, value:%s\n", idx, stack->data[idx].index, value.printable().c_str());
}
return Void();
}
@ -703,6 +705,20 @@ struct GetCommittedVersionFunc : InstructionFunc {
const char* GetCommittedVersionFunc::name = "GET_COMMITTED_VERSION";
REGISTER_INSTRUCTION_FUNC(GetCommittedVersionFunc);
// GET_APPROXIMATE_SIZE
struct GetApproximateSizeFunc : InstructionFunc {
static const char* name;
ACTOR static Future<Void> call(Reference<FlowTesterData> data, Reference<InstructionData> instruction) {
int64_t _ = wait(instruction->tr->getApproximateSize());
(void) _; // disable unused variable warning
data->stack.pushTuple(LiteralStringRef("GOT_APPROXIMATE_SIZE"));
return Void();
}
};
const char* GetApproximateSizeFunc::name = "GET_APPROXIMATE_SIZE";
REGISTER_INSTRUCTION_FUNC(GetApproximateSizeFunc);
// GET_VERSIONSTAMP
struct GetVersionstampFunc : InstructionFunc {
static const char* name;
@ -1638,7 +1654,7 @@ ACTOR static Future<Void> doInstructions(Reference<FlowTesterData> data) {
op = op.substr(0, op.size() - 9);
// printf("[==========]%ld/%ld:%s:%s: isDatabase:%d, isSnapshot:%d, stack count:%ld\n",
// idx, data->instructions.size(), printable(StringRef(data->instructions[idx].key)).c_str(), printable(StringRef(data->instructions[idx].value)).c_str(),
// idx, data->instructions.size(), StringRef(data->instructions[idx].key).printable().c_str(), StringRef(data->instructions[idx].value).printable().c_str(),
// isDatabase, isSnapshot, data->stack.data.size());
//wait(printFlowTesterStack(&(data->stack)));
@ -1774,7 +1790,7 @@ ACTOR void _test_versionstamp() {
try {
g_network = newNet2(false);
API *fdb = FDB::API::selectAPIVersion(610);
API *fdb = FDB::API::selectAPIVersion(620);
fdb->setupNetwork();
startThread(networkThread, fdb);

View File

@ -9,7 +9,7 @@ This package requires:
- [Mono](http://www.mono-project.com/) (macOS or Linux) or [Visual Studio](https://www.visualstudio.com/) (Windows) (build-time only)
- FoundationDB C API 2.0.x-6.1.x (part of the [FoundationDB client packages](https://apple.github.io/foundationdb/downloads.html#c))
Use of this package requires the selection of a FoundationDB API version at runtime. This package currently supports FoundationDB API versions 200-610.
Use of this package requires the selection of a FoundationDB API version at runtime. This package currently supports FoundationDB API versions 200-620.
To install this package, you can run the "fdb-go-install.sh" script (for versions 5.0.x and greater):

View File

@ -590,6 +590,9 @@ func (sm *StackMachine) processInst(idx int, inst tuple.Tuple) {
panic(e)
}
sm.store(idx, []byte("GOT_COMMITTED_VERSION"))
case op == "GET_APPROXIMATE_SIZE":
_ = sm.currentTransaction().GetApproximateSize().MustGet()
sm.store(idx, []byte("GOT_APPROXIMATE_SIZE"))
case op == "GET_VERSIONSTAMP":
sm.store(idx, sm.currentTransaction().GetVersionstamp())
case op == "GET_KEY":

View File

@ -22,7 +22,7 @@
package fdb
// #define FDB_API_VERSION 610
// #define FDB_API_VERSION 620
// #include <foundationdb/fdb_c.h>
import "C"

View File

@ -22,7 +22,7 @@
package fdb
// #define FDB_API_VERSION 610
// #define FDB_API_VERSION 620
// #include <foundationdb/fdb_c.h>
import "C"

View File

@ -46,7 +46,7 @@ A basic interaction with the FoundationDB API is demonstrated below:
func main() {
// Different API versions may expose different runtime behaviors.
fdb.MustAPIVersion(610)
fdb.MustAPIVersion(620)
// Open the default database from the system cluster
db := fdb.MustOpenDefault()

View File

@ -22,7 +22,7 @@
package fdb
// #define FDB_API_VERSION 610
// #define FDB_API_VERSION 620
// #include <foundationdb/fdb_c.h>
import "C"

View File

@ -22,7 +22,7 @@
package fdb
// #define FDB_API_VERSION 610
// #define FDB_API_VERSION 620
// #include <foundationdb/fdb_c.h>
// #include <stdlib.h>
import "C"
@ -108,7 +108,7 @@ func (opt NetworkOptions) setOpt(code int, param []byte) error {
// library, an error will be returned. APIVersion must be called prior to any
// other functions in the fdb package.
//
// Currently, this package supports API versions 200 through 610.
// Currently, this package supports API versions 200 through 620.
//
// Warning: When using the multi-version client API, setting an API version that
// is not supported by a particular client library will prevent that client from
@ -116,7 +116,7 @@ func (opt NetworkOptions) setOpt(code int, param []byte) error {
// the API version of your application after upgrading your client until the
// cluster has also been upgraded.
func APIVersion(version int) error {
headerVersion := 610
headerVersion := 620
networkMutex.Lock()
defer networkMutex.Unlock()
@ -128,7 +128,7 @@ func APIVersion(version int) error {
return errAPIVersionAlreadySet
}
if version < 200 || version > 610 {
if version < 200 || version > 620 {
return errAPIVersionNotSupported
}

View File

@ -23,7 +23,7 @@
package fdb
// #cgo LDFLAGS: -lfdb_c -lm
// #define FDB_API_VERSION 610
// #define FDB_API_VERSION 620
// #include <foundationdb/fdb_c.h>
// #include <string.h>
//
@ -331,7 +331,7 @@ func (f *futureInt64) Get() (int64, error) {
f.BlockUntilReady()
var ver C.int64_t
if err := C.fdb_future_get_version(f.ptr, &ver); err != 0 {
if err := C.fdb_future_get_int64(f.ptr, &ver); err != 0 {
return 0, Error{int(err)}
}

View File

@ -22,7 +22,7 @@
package fdb
// #define FDB_API_VERSION 610
// #define FDB_API_VERSION 620
// #include <foundationdb/fdb_c.h>
import "C"

View File

@ -22,7 +22,7 @@
package fdb
// #define FDB_API_VERSION 610
// #define FDB_API_VERSION 620
// #include <foundationdb/fdb_c.h>
import "C"
@ -372,6 +372,16 @@ func (t Transaction) GetVersionstamp() FutureKey {
return &futureKey{future: newFuture(C.fdb_transaction_get_versionstamp(t.ptr))}
}
func (t *transaction) getApproximateSize() FutureInt64 {
return &futureInt64{
future: newFuture(C.fdb_transaction_get_approximate_size(t.ptr)),
}
}
func (t Transaction) GetApproximateSize() FutureInt64 {
return t.getApproximateSize()
}
// Reset rolls back a transaction, completely resetting it to its initial
// state. This is logically equivalent to destroying the transaction and
// creating a new one.

View File

@ -25,11 +25,11 @@ set(JAVA_BINDING_SRCS
src/main/com/apple/foundationdb/FDB.java
src/main/com/apple/foundationdb/FDBDatabase.java
src/main/com/apple/foundationdb/FDBTransaction.java
src/main/com/apple/foundationdb/FutureInt64.java
src/main/com/apple/foundationdb/FutureKey.java
src/main/com/apple/foundationdb/FutureResult.java
src/main/com/apple/foundationdb/FutureResults.java
src/main/com/apple/foundationdb/FutureStrings.java
src/main/com/apple/foundationdb/FutureVersion.java
src/main/com/apple/foundationdb/FutureVoid.java
src/main/com/apple/foundationdb/JNIUtil.java
src/main/com/apple/foundationdb/KeySelector.java
@ -141,9 +141,33 @@ target_include_directories(java_workloads PUBLIC ${JNI_INCLUDE_DIRS})
set(CMAKE_JAVA_COMPILE_FLAGS "-source" "1.8" "-target" "1.8")
set(CMAKE_JNI_TARGET TRUE)
set(JAR_VERSION "${FDB_MAJOR}.${FDB_MINOR}.${FDB_REVISION}")
# build a manifest file
set(JAR_VERSION "${FDB_MAJOR}.${FDB_MINOR}.${FDB_PATCH}")
string(TIMESTAMP BND_LAST_MODIFIED_SEC "%s")
set(MANIFEST_TEXT "Manifest-Version: 1.0
Bnd-LastModified: ${BND_LAST_MODIFIED_SEC}000
Build-Jdk: ${Java_VERSION_STRING}
Built-By: CMake ${CMAKE_VERSION}
Bundle-Description: FoundationDB Java bindings and API
Bundle-ManifestVersion: 2
Bundle-Name: fdb-java
Bundle-SymbolicName: com.apple.foundationdb
Bundle-Version: ${JAR_VERSION}
Created-By: CMake ${CMAKE_VERSION}
Implementation-Title: fdb-java
Implementation-Version: ${CURRENT_GIT_VERSION}
Specification-Title: FoundationDB Java bindings and API
Specification-Version: ${JAR_VERSION}
")
# write the manifest file
file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/resources/META-INF)
set(MANIFEST_FILE ${CMAKE_CURRENT_BINARY_DIR}/resources/META-INF/MANIFEST.MF)
file(WRITE ${MANIFEST_FILE} ${MANIFEST_TEXT})
add_jar(fdb-java ${JAVA_BINDING_SRCS} ${GENERATED_JAVA_FILES} ${CMAKE_SOURCE_DIR}/LICENSE
OUTPUT_DIR ${PROJECT_BINARY_DIR}/lib VERSION ${CMAKE_PROJECT_VERSION})
OUTPUT_DIR ${PROJECT_BINARY_DIR}/lib VERSION ${CMAKE_PROJECT_VERSION} MANIFEST ${MANIFEST_FILE})
add_dependencies(fdb-java fdb_java_options fdb_java)
add_jar(foundationdb-tests SOURCES ${JAVA_TESTS_SRCS} INCLUDE_JARS fdb-java)
add_dependencies(foundationdb-tests fdb_java_options)
@ -201,7 +225,7 @@ if(NOT OPEN_FOR_IDE)
add_dependencies(copy_lib unpack_jar)
set(target_jar ${jar_destination}/fdb-java-${CMAKE_PROJECT_VERSION}.jar)
add_custom_command(OUTPUT ${target_jar}
COMMAND ${Java_JAR_EXECUTABLE} cf ${target_jar} .
COMMAND ${Java_JAR_EXECUTABLE} cfm ${target_jar} ${unpack_dir}/META-INF/MANIFEST.MF .
WORKING_DIRECTORY ${unpack_dir}
DEPENDS ${CMAKE_CURRENT_BINARY_DIR}/lib_copied
COMMENT "Build ${jar_destination}/fdb-java-${CMAKE_PROJECT_VERSION}.jar")

View File

@ -19,7 +19,7 @@
*/
#include <foundationdb/ClientWorkload.h>
#define FDB_API_VERSION 610
#define FDB_API_VERSION 620
#include <foundationdb/fdb_c.h>
#include <jni.h>
@ -370,7 +370,7 @@ struct JVM {
jmethodID selectMethod =
env->GetStaticMethodID(fdbClass, "selectAPIVersion", "(IZ)Lcom/apple/foundationdb/FDB;");
checkException();
env->CallStaticObjectMethod(fdbClass, selectMethod, jint(610), jboolean(false));
env->CallStaticObjectMethod(fdbClass, selectMethod, jint(620), jboolean(false));
checkException();
}

View File

@ -21,7 +21,7 @@
#include <jni.h>
#include <string.h>
#define FDB_API_VERSION 610
#define FDB_API_VERSION 620
#include <foundationdb/fdb_c.h>
@ -243,21 +243,21 @@ JNIEXPORT void JNICALL Java_com_apple_foundationdb_NativeFuture_Future_1releaseM
fdb_future_release_memory(var);
}
JNIEXPORT jlong JNICALL Java_com_apple_foundationdb_FutureVersion_FutureVersion_1get(JNIEnv *jenv, jobject, jlong future) {
if( !future ) {
JNIEXPORT jlong JNICALL Java_com_apple_foundationdb_FutureInt64_FutureInt64_1get(JNIEnv *jenv, jobject, jlong future) {
if (!future) {
throwParamNotNull(jenv);
return 0;
}
FDBFuture *f = (FDBFuture *)future;
int64_t version = 0;
fdb_error_t err = fdb_future_get_version(f, &version);
if( err ) {
safeThrow( jenv, getThrowable( jenv, err ) );
int64_t value = 0;
fdb_error_t err = fdb_future_get_int64(f, &value);
if (err) {
safeThrow(jenv, getThrowable(jenv, err));
return 0;
}
return (jlong)version;
return (jlong)value;
}
JNIEXPORT jobject JNICALL Java_com_apple_foundationdb_FutureStrings_FutureStrings_1get(JNIEnv *jenv, jobject, jlong future) {
@ -805,6 +805,15 @@ JNIEXPORT jlong JNICALL Java_com_apple_foundationdb_FDBTransaction_Transaction_1
return (jlong)version;
}
JNIEXPORT jlong JNICALL Java_com_apple_foundationdb_FDBTransaction_Transaction_1getApproximateSize(JNIEnv *jenv, jobject, jlong tPtr) {
if (!tPtr) {
throwParamNotNull(jenv);
return 0;
}
FDBFuture* f = fdb_transaction_get_approximate_size((FDBTransaction*)tPtr);
return (jlong)f;
}
JNIEXPORT jlong JNICALL Java_com_apple_foundationdb_FDBTransaction_Transaction_1getVersionstamp(JNIEnv *jenv, jobject, jlong tPtr) {
if (!tPtr) {
throwParamNotNull(jenv);

View File

@ -35,7 +35,7 @@ import java.util.concurrent.atomic.AtomicInteger;
* This call is required before using any other part of the API. The call allows
* an error to be thrown at this point to prevent client code from accessing a later library
* with incorrect assumptions from the current version. The API version documented here is version
* {@code 610}.<br><br>
* {@code 620}.<br><br>
* FoundationDB encapsulates multiple versions of its interface by requiring
* the client to explicitly specify the version of the API it uses. The purpose
* of this design is to allow you to upgrade the server, client libraries, or
@ -193,8 +193,8 @@ public class FDB {
}
if(version < 510)
throw new IllegalArgumentException("API version not supported (minimum 510)");
if(version > 610)
throw new IllegalArgumentException("API version not supported (maximum 610)");
if(version > 620)
throw new IllegalArgumentException("API version not supported (maximum 620)");
Select_API_version(version);
FDB fdb = new FDB(version, controlRuntime);

View File

@ -216,7 +216,7 @@ class FDBTransaction extends NativeObjectWrapper implements Transaction, OptionC
public CompletableFuture<Long> getReadVersion() {
pointerReadLock.lock();
try {
return new FutureVersion(Transaction_getReadVersion(getPtr()), executor);
return new FutureInt64(Transaction_getReadVersion(getPtr()), executor);
} finally {
pointerReadLock.unlock();
}
@ -514,6 +514,16 @@ class FDBTransaction extends NativeObjectWrapper implements Transaction, OptionC
}
}
@Override
public CompletableFuture<Long> getApproximateSize() {
pointerReadLock.lock();
try {
return new FutureInt64(Transaction_getApproximateSize(getPtr()), executor);
} finally {
pointerReadLock.unlock();
}
}
@Override
public CompletableFuture<Void> watch(byte[] key) throws FDBException {
pointerReadLock.lock();
@ -642,6 +652,7 @@ class FDBTransaction extends NativeObjectWrapper implements Transaction, OptionC
private native long Transaction_commit(long cPtr);
private native long Transaction_getCommittedVersion(long cPtr);
private native long Transaction_getVersionstamp(long cPtr);
private native long Transaction_getApproximateSize(long cPtr);
private native long Transaction_onError(long cPtr, int errorCode);
private native void Transaction_dispose(long cPtr);
private native void Transaction_reset(long cPtr);

View File

@ -1,9 +1,9 @@
/*
* FutureVersion.java
* FutureInt64.java
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
* Copyright 2013-2019 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -22,16 +22,16 @@ package com.apple.foundationdb;
import java.util.concurrent.Executor;
class FutureVersion extends NativeFuture<Long> {
FutureVersion(long cPtr, Executor executor) {
class FutureInt64 extends NativeFuture<Long> {
FutureInt64(long cPtr, Executor executor) {
super(cPtr);
registerMarshalCallback(executor);
}
@Override
protected Long getIfDone_internal(long cPtr) throws FDBException {
return FutureVersion_get(cPtr);
return FutureInt64_get(cPtr);
}
private native long FutureVersion_get(long cPtr) throws FDBException;
private native long FutureInt64_get(long cPtr) throws FDBException;
}

View File

@ -260,6 +260,15 @@ public interface Transaction extends AutoCloseable, ReadTransaction, Transaction
*/
CompletableFuture<byte[]> getVersionstamp();
/**
* Returns a future that will contain the approximate size of the commit so far, which is the
* sum of the estimated sizes of its mutations, read conflict ranges, and write conflict ranges.
* This can be called multiple times before the transaction is committed.
*
* @return a future that will contain the approximate size of the commit.
*/
CompletableFuture<Long> getApproximateSize();
/**
* Resets a transaction and returns a delayed signal for error recovery. If the error
* encountered by the {@code Transaction} could not be recovered from, the returned

View File

@ -13,7 +13,7 @@ and then added to your classpath.<br>
<h3>Getting started</h3>
To start using FoundationDB from Java, create an instance of the
{@link com.apple.foundationdb.FDB FoundationDB API interface} with the version of the
API that you want to use (this release of the FoundationDB Java API supports versions between {@code 510} and {@code 610}).
API that you want to use (this release of the FoundationDB Java API supports versions between {@code 510} and {@code 620}).
With this API object you can then open {@link com.apple.foundationdb.Cluster Cluster}s and
{@link com.apple.foundationdb.Database Database}s and start using
{@link com.apple.foundationdb.Transaction Transaction}s.
@ -29,7 +29,7 @@ import com.apple.foundationdb.tuple.Tuple;
public class Example {
public static void main(String[] args) {
FDB fdb = FDB.selectAPIVersion(610);
FDB fdb = FDB.selectAPIVersion(620);
try(Database db = fdb.open()) {
// Run an operation on the database

View File

@ -27,7 +27,7 @@ import com.apple.foundationdb.Database;
import com.apple.foundationdb.FDB;
public abstract class AbstractTester {
public static final int API_VERSION = 610;
public static final int API_VERSION = 620;
protected static final int NUM_RUNS = 25;
protected static final Charset ASCII = Charset.forName("ASCII");

View File

@ -282,6 +282,11 @@ public class AsyncStackTester {
return AsyncUtil.DONE;
}
else if(op == StackOperation.GET_APPROXIMATE_SIZE) {
return inst.tr.getApproximateSize().thenAcceptAsync(size -> {
inst.push("GOT_APPROXIMATE_SIZE".getBytes());
}, FDB.DEFAULT_EXECUTOR);
}
else if(op == StackOperation.GET_VERSIONSTAMP) {
try {
inst.push(inst.tr.getVersionstamp());

View File

@ -33,7 +33,7 @@ public class BlockingBenchmark {
private static final int PARALLEL = 100;
public static void main(String[] args) throws InterruptedException {
FDB fdb = FDB.selectAPIVersion(610);
FDB fdb = FDB.selectAPIVersion(620);
// The cluster file DOES NOT need to be valid, although it must exist.
// This is because the database is never really contacted in this test.

View File

@ -48,7 +48,7 @@ public class ConcurrentGetSetGet {
}
public static void main(String[] args) {
try(Database database = FDB.selectAPIVersion(610).open()) {
try(Database database = FDB.selectAPIVersion(620).open()) {
new ConcurrentGetSetGet().apply(database);
}
}

View File

@ -33,7 +33,7 @@ import com.apple.foundationdb.directory.DirectorySubspace;
public class DirectoryTest {
public static void main(String[] args) throws Exception {
try {
FDB fdb = FDB.selectAPIVersion(610);
FDB fdb = FDB.selectAPIVersion(620);
try(Database db = fdb.open()) {
runTests(db);
}

View File

@ -26,7 +26,7 @@ import com.apple.foundationdb.tuple.Tuple;
public class Example {
public static void main(String[] args) {
FDB fdb = FDB.selectAPIVersion(610);
FDB fdb = FDB.selectAPIVersion(620);
try(Database db = fdb.open()) {
// Run an operation on the database

View File

@ -31,7 +31,7 @@ public class IterableTest {
public static void main(String[] args) throws InterruptedException {
final int reps = 1000;
try {
FDB fdb = FDB.selectAPIVersion(610);
FDB fdb = FDB.selectAPIVersion(620);
try(Database db = fdb.open()) {
runTests(reps, db);
}

View File

@ -34,7 +34,7 @@ import com.apple.foundationdb.tuple.ByteArrayUtil;
public class LocalityTests {
public static void main(String[] args) {
FDB fdb = FDB.selectAPIVersion(610);
FDB fdb = FDB.selectAPIVersion(620);
try(Database database = fdb.open(args[0])) {
try(Transaction tr = database.createTransaction()) {
String[] keyAddresses = LocalityUtil.getAddressesForKey(tr, "a".getBytes()).join();

View File

@ -43,7 +43,7 @@ public class ParallelRandomScan {
private static final int PARALLELISM_STEP = 5;
public static void main(String[] args) throws InterruptedException {
FDB api = FDB.selectAPIVersion(610);
FDB api = FDB.selectAPIVersion(620);
try(Database database = api.open(args[0])) {
for(int i = PARALLELISM_MIN; i <= PARALLELISM_MAX; i += PARALLELISM_STEP) {
runTest(database, i, ROWS, DURATION_MS);

View File

@ -34,7 +34,7 @@ import com.apple.foundationdb.Transaction;
import com.apple.foundationdb.async.AsyncIterable;
public class RangeTest {
private static final int API_VERSION = 610;
private static final int API_VERSION = 620;
public static void main(String[] args) {
System.out.println("About to use version " + API_VERSION);

View File

@ -34,7 +34,7 @@ public class SerialInsertion {
private static final int NODES = 1000000;
public static void main(String[] args) {
FDB api = FDB.selectAPIVersion(610);
FDB api = FDB.selectAPIVersion(620);
try(Database database = api.open()) {
long start = System.currentTimeMillis();

View File

@ -39,7 +39,7 @@ public class SerialIteration {
private static final int THREAD_COUNT = 1;
public static void main(String[] args) throws InterruptedException {
FDB api = FDB.selectAPIVersion(610);
FDB api = FDB.selectAPIVersion(620);
try(Database database = api.open(args[0])) {
for(int i = 1; i <= THREAD_COUNT; i++) {
runThreadedTest(database, i);

View File

@ -30,7 +30,7 @@ public class SerialTest {
public static void main(String[] args) throws InterruptedException {
final int reps = 1000;
try {
FDB fdb = FDB.selectAPIVersion(610);
FDB fdb = FDB.selectAPIVersion(620);
try(Database db = fdb.open()) {
runTests(reps, db);
}

View File

@ -39,7 +39,7 @@ public class SnapshotTransactionTest {
private static final Subspace SUBSPACE = new Subspace(Tuple.from("test", "conflict_ranges"));
public static void main(String[] args) {
FDB fdb = FDB.selectAPIVersion(610);
FDB fdb = FDB.selectAPIVersion(620);
try(Database db = fdb.open()) {
snapshotReadShouldNotConflict(db);
snapshotShouldNotAddConflictRange(db);

View File

@ -54,6 +54,7 @@ enum StackOperation {
GET_KEY,
GET_READ_VERSION,
GET_COMMITTED_VERSION,
GET_APPROXIMATE_SIZE,
GET_VERSIONSTAMP,
SET_READ_VERSION,
ON_ERROR,

View File

@ -261,6 +261,10 @@ public class StackTester {
inst.context.lastVersion = inst.tr.getCommittedVersion();
inst.push("GOT_COMMITTED_VERSION".getBytes());
}
else if(op == StackOperation.GET_APPROXIMATE_SIZE) {
Long size = inst.tr.getApproximateSize().join();
inst.push("GOT_APPROXIMATE_SIZE".getBytes());
}
else if(op == StackOperation.GET_VERSIONSTAMP) {
inst.push(inst.tr.getVersionstamp());
}

View File

@ -50,7 +50,7 @@ public class TupleTest {
public static void main(String[] args) throws NoSuchFieldException {
final int reps = 1000;
try {
FDB fdb = FDB.selectAPIVersion(610);
FDB fdb = FDB.selectAPIVersion(620);
addMethods();
comparisons();
emptyTuple();

View File

@ -32,7 +32,7 @@ import com.apple.foundationdb.tuple.Versionstamp;
public class VersionstampSmokeTest {
public static void main(String[] args) {
FDB fdb = FDB.selectAPIVersion(610);
FDB fdb = FDB.selectAPIVersion(620);
try(Database db = fdb.open()) {
db.run(tr -> {
tr.clear(Tuple.from("prefix").range());

View File

@ -34,7 +34,7 @@ import com.apple.foundationdb.Transaction;
public class WatchTest {
public static void main(String[] args) {
FDB fdb = FDB.selectAPIVersion(610);
FDB fdb = FDB.selectAPIVersion(620);
try(Database database = fdb.open(args[0])) {
database.options().setLocationCacheSize(42);
try(Transaction tr = database.createTransaction()) {

View File

@ -52,7 +52,7 @@ def get_api_version():
def api_version(ver):
header_version = 610
header_version = 620
if '_version' in globals():
if globals()['_version'] != ver:

View File

@ -405,7 +405,7 @@ class TransactionRead(_FDBBase):
def get_read_version(self):
"""Get the read version of the transaction."""
return FutureVersion(self.capi.fdb_transaction_get_read_version(self.tpointer))
return FutureInt64(self.capi.fdb_transaction_get_read_version(self.tpointer))
def get(self, key):
key = keyToBytes(key)
@ -541,6 +541,10 @@ class Transaction(TransactionRead):
self.capi.fdb_transaction_get_committed_version(self.tpointer, ctypes.byref(version))
return version.value
def get_approximate_size(self):
"""Get the approximate commit size of the transaction."""
return FutureInt64(self.capi.fdb_transaction_get_approximate_size(self.tpointer))
def get_versionstamp(self):
return Key(self.capi.fdb_transaction_get_versionstamp(self.tpointer))
@ -687,12 +691,12 @@ class FutureVoid(Future):
return None
class FutureVersion(Future):
class FutureInt64(Future):
def wait(self):
self.block_until_ready()
version = ctypes.c_int64()
self.capi.fdb_future_get_version(self.fpointer, ctypes.byref(version))
return version.value
value = ctypes.c_int64()
self.capi.fdb_future_get_int64(self.fpointer, ctypes.byref(value))
return value.value
class FutureKeyValueArray(Future):
@ -1359,9 +1363,9 @@ def init_c_api():
_capi.fdb_future_get_error.restype = int
_capi.fdb_future_get_error.errcheck = check_error_code
_capi.fdb_future_get_version.argtypes = [ctypes.c_void_p, ctypes.POINTER(ctypes.c_int64)]
_capi.fdb_future_get_version.restype = ctypes.c_int
_capi.fdb_future_get_version.errcheck = check_error_code
_capi.fdb_future_get_int64.argtypes = [ctypes.c_void_p, ctypes.POINTER(ctypes.c_int64)]
_capi.fdb_future_get_int64.restype = ctypes.c_int
_capi.fdb_future_get_int64.errcheck = check_error_code
_capi.fdb_future_get_key.argtypes = [ctypes.c_void_p, ctypes.POINTER(ctypes.POINTER(ctypes.c_byte)),
ctypes.POINTER(ctypes.c_int)]
@ -1453,6 +1457,9 @@ def init_c_api():
_capi.fdb_transaction_get_committed_version.restype = ctypes.c_int
_capi.fdb_transaction_get_committed_version.errcheck = check_error_code
_capi.fdb_transaction_get_approximate_size.argtypes = [ctypes.c_void_p]
_capi.fdb_transaction_get_approximate_size.restype = ctypes.c_void_p
_capi.fdb_transaction_get_versionstamp.argtypes = [ctypes.c_void_p]
_capi.fdb_transaction_get_versionstamp.restype = ctypes.c_void_p

View File

@ -22,7 +22,7 @@ import fdb
import sys
if __name__ == '__main__':
fdb.api_version(610)
fdb.api_version(620)
@fdb.transactional
def setValue(tr, key, value):
@ -34,8 +34,6 @@ def setValueWithLimit(tr, key, value, limit):
tr[key] = value
def test_size_limit_option(db):
db.options.set_transaction_timeout(2000) # 2 seconds
db.options.set_transaction_retry_limit(3)
value = b'a' * 1024
setValue(db, b't1', value)
@ -68,9 +66,33 @@ def test_size_limit_option(db):
except fdb.FDBError as e:
assert(e.code == 2101) # Transaction exceeds byte limit (2101)
@fdb.transactional
def test_get_approximate_size(tr):
tr[b'key1'] = b'value1'
s1 = tr.get_approximate_size().wait()
tr[b'key2'] = b'value2'
s2 = tr.get_approximate_size().wait()
assert(s1 < s2)
tr.clear(b'key3')
s3 = tr.get_approximate_size().wait()
assert(s2 < s3)
tr.add_read_conflict_key(b'key3+')
s4 = tr.get_approximate_size().wait()
assert(s3 < s4)
tr.add_write_conflict_key(b'key4')
s5 = tr.get_approximate_size().wait()
assert(s4 < s5)
# Expect a cluster file as input. This test will write to the FDB cluster, so
# be aware of potential side effects.
if __name__ == '__main__':
clusterFile = sys.argv[1]
db = fdb.open(clusterFile)
db.options.set_transaction_timeout(2000) # 2 seconds
db.options.set_transaction_retry_limit(3)
test_size_limit_option(db)
test_get_approximate_size(db)

View File

@ -48,7 +48,7 @@ from cancellation_timeout_tests import test_retry_limits
from cancellation_timeout_tests import test_db_retry_limits
from cancellation_timeout_tests import test_combinations
from size_limit_tests import test_size_limit_option
from size_limit_tests import test_size_limit_option, test_get_approximate_size
random.seed(0)
@ -478,6 +478,9 @@ class Tester:
elif inst.op == six.u("GET_COMMITTED_VERSION"):
self.last_version = inst.tr.get_committed_version()
inst.push(b"GOT_COMMITTED_VERSION")
elif inst.op == six.u("GET_APPROXIMATE_SIZE"):
approximate_size = inst.tr.get_approximate_size().wait()
inst.push(b"GOT_APPROXIMATE_SIZE")
elif inst.op == six.u("GET_VERSIONSTAMP"):
inst.push(inst.tr.get_versionstamp())
elif inst.op == six.u("TUPLE_PACK"):
@ -561,6 +564,7 @@ class Tester:
test_predicates()
test_size_limit_option(db)
test_get_approximate_size(db)
except fdb.FDBError as e:
print("Unit tests failed: %s" % e.description)

View File

@ -36,7 +36,7 @@ module FDB
end
end
def self.api_version(version)
header_version = 610
header_version = 620
if self.is_api_version_selected?()
if @@chosen_version != version
raise "FDB API already loaded at version #{@@chosen_version}."

View File

@ -85,7 +85,7 @@ module FDB
attach_function :fdb_future_set_callback, [ :pointer, :fdb_future_callback, :pointer ], :fdb_error
attach_function :fdb_future_get_error, [ :pointer ], :fdb_error
attach_function :fdb_future_get_version, [ :pointer, :pointer ], :fdb_error
attach_function :fdb_future_get_int64, [ :pointer, :pointer ], :fdb_error
attach_function :fdb_future_get_key, [ :pointer, :pointer, :pointer ], :fdb_error
attach_function :fdb_future_get_value, [ :pointer, :pointer, :pointer, :pointer ], :fdb_error
attach_function :fdb_future_get_keyvalue_array, [ :pointer, :pointer, :pointer, :pointer ], :fdb_error
@ -114,6 +114,7 @@ module FDB
attach_function :fdb_transaction_watch, [ :pointer, :pointer, :int ], :pointer
attach_function :fdb_transaction_commit, [ :pointer ], :pointer
attach_function :fdb_transaction_get_committed_version, [ :pointer, :pointer ], :fdb_error
attach_function :fdb_transaction_get_approximate_size, [ :pointer ], :pointer
attach_function :fdb_transaction_get_versionstamp, [ :pointer ], :pointer
attach_function :fdb_transaction_on_error, [ :pointer, :fdb_error ], :pointer
attach_function :fdb_transaction_reset, [ :pointer ], :void
@ -443,11 +444,11 @@ module FDB
end
end
class Version < LazyFuture
class Int64Future < LazyFuture
def getter
version = FFI::MemoryPointer.new :int64
FDBC.check_error FDBC.fdb_future_get_version(@fpointer, version)
@value = version.read_long_long
val = FFI::MemoryPointer.new :int64
FDBC.check_error FDBC.fdb_future_get_int64(@fpointer, val)
@value = val.read_long_long
end
private :getter
end
@ -687,7 +688,7 @@ module FDB
end
def get_read_version
Version.new(FDBC.fdb_transaction_get_read_version @tpointer)
Int64Future.new(FDBC.fdb_transaction_get_read_version @tpointer)
end
def get(key)
@ -904,6 +905,10 @@ module FDB
version.read_long_long
end
def get_approximate_size
Int64Future.new(FDBC.fdb_transaction_get_approximate_size @tpointer)
end
def get_versionstamp
Key.new(FDBC.fdb_transaction_get_versionstamp(@tpointer))
end

View File

@ -381,6 +381,9 @@ class Tester
when "GET_COMMITTED_VERSION"
@last_version = inst.tr.get_committed_version
inst.push("GOT_COMMITTED_VERSION")
when "GET_APPROXIMATE_SIZE"
size = inst.tr.get_approximate_size.to_i
inst.push("GOT_APPROXIMATE_SIZE")
when "GET_VERSIONSTAMP"
inst.push(inst.tr.get_versionstamp)
when "TUPLE_PACK"

View File

@ -1,7 +1,7 @@
#define FDB_API_VERSION 610
#define FDB_API_VERSION 620
#include <foundationdb/fdb_c.h>
int main(int argc, char* argv[]) {
fdb_select_api_version(610);
fdb_select_api_version(620);
return 0;
}

View File

@ -65,7 +65,7 @@ then
python setup.py install
successOr "Installing python bindings failed"
popd
python -c 'import fdb; fdb.api_version(610)'
python -c 'import fdb; fdb.api_version(620)'
successOr "Loading python bindings failed"
# Test cmake and pkg-config integration: https://github.com/apple/foundationdb/issues/1483

View File

@ -106,6 +106,10 @@ function(add_fdb_test)
if (ENABLE_BUGGIFY)
set(BUGGIFY_OPTION "-B")
endif()
set(VALGRIND_OPTION "")
if (USE_VALGRIND)
set(VALGRIND_OPTION "--use-valgrind")
endif()
list(TRANSFORM ADD_FDB_TEST_TEST_FILES PREPEND "${CMAKE_CURRENT_SOURCE_DIR}/")
add_test(NAME ${test_name}
COMMAND $<TARGET_FILE:Python::Interpreter> ${TestRunner}
@ -120,6 +124,7 @@ function(add_fdb_test)
--seed ${SEED}
--test-number ${assigned_id}
${BUGGIFY_OPTION}
${VALGRIND_OPTION}
${ADD_FDB_TEST_TEST_FILES}
WORKING_DIRECTORY ${PROJECT_BINARY_DIR})
get_filename_component(test_dir_full ${first_file} DIRECTORY)

View File

@ -1,5 +1,4 @@
set(USE_GPERFTOOLS OFF CACHE BOOL "Use gperfools for profiling")
set(PORTABLE_BINARY OFF CACHE BOOL "Create a binary that runs on older OS versions")
set(USE_VALGRIND OFF CACHE BOOL "Compile for valgrind usage")
set(ALLOC_INSTRUMENTATION OFF CACHE BOOL "Instrument alloc")
set(WITH_UNDODB OFF CACHE BOOL "Use rr or undodb")
@ -9,6 +8,7 @@ set(USE_LD "LD" CACHE STRING "The linker to use for building: can be LD (system
set(USE_LIBCXX OFF CACHE BOOL "Use libc++")
set(USE_CCACHE OFF CACHE BOOL "Use ccache for compilation if available")
set(RELATIVE_DEBUG_PATHS OFF CACHE BOOL "Use relative file paths in debug info")
set(STATIC_LINK_LIBCXX ON CACHE BOOL "Statically link libstdcpp/libc++")
set(rel_debug_paths OFF)
if(RELATIVE_DEBUG_PATHS)
@ -131,6 +131,11 @@ else()
set(CMAKE_SHARED_LINKER_FLAGS "-static-libstdc++ -static-libgcc ${CMAKE_SHARED_LINKER_FLAGS}")
set(CMAKE_EXE_LINKER_FLAGS "-static-libstdc++ -static-libgcc ${CMAKE_EXE_LINKER_FLAGS}")
endif()
if(STATIC_LINK_LIBCXX)
if (NOT USE_LIBCXX AND NOT APPLE)
add_link_options(-static-libstdc++ -static-libgcc)
endif()
endif()
# Instruction sets we require to be supported by the CPU
add_compile_options(
-maes

View File

@ -131,7 +131,7 @@ API versioning
Prior to including ``fdb_c.h``, you must define the ``FDB_API_VERSION`` macro. This, together with the :func:`fdb_select_api_version()` function, allows programs written against an older version of the API to compile and run with newer versions of the C library. The current version of the FoundationDB C API is |api-version|. ::
#define FDB_API_VERSION 610
#define FDB_API_VERSION 620
#include <foundationdb/fdb_c.h>
.. function:: fdb_error_t fdb_select_api_version(int version)
@ -291,9 +291,9 @@ See :ref:`developer-guide-programming-with-futures` for further (language-indepe
|future-get-return1|.
.. function:: fdb_error_t fdb_future_get_version(FDBFuture* future, int64_t* out_version)
.. function:: fdb_error_t fdb_future_get_int64(FDBFuture* future, int64_t* out)
Extracts a version from an :type:`FDBFuture` into a caller-provided variable of type ``int64_t``. |future-warning|
Extracts a 64-bit integer from an :type:`FDBFuture*` into a caller-provided variable of type ``int64_t``. |future-warning|
|future-get-return1| |future-get-return2|.
@ -447,7 +447,7 @@ Applications must provide error handling and an appropriate retry loop around th
.. function:: FDBFuture* fdb_transaction_get_read_version(FDBTransaction* transaction)
|future-return0| the transaction snapshot read version. |future-return1| call :func:`fdb_future_get_version()` to extract the version into an int64_t that you provide, |future-return2|
|future-return0| the transaction snapshot read version. |future-return1| call :func:`fdb_future_get_int64()` to extract the version into an int64_t that you provide, |future-return2|
The transaction obtains a snapshot read version automatically at the time of the first call to :func:`fdb_transaction_get_*()` (including this one) and (unless causal consistency has been deliberately compromised by transaction options) is guaranteed to represent all transactions which were reported committed before that call.
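For example, a minimal sketch of extracting the read version with the renamed getter, assuming ``tr`` is a valid ``FDBTransaction*`` and the caller handles errors in its retry loop::

    FDBFuture* f = fdb_transaction_get_read_version(tr);
    fdb_error_t err = fdb_future_block_until_ready(f);
    int64_t read_version = 0;
    if (!err) err = fdb_future_get_int64(f, &read_version);
    fdb_future_destroy(f);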
@ -720,6 +720,12 @@ Applications must provide error handling and an appropriate retry loop around th
Most applications will not call this function.
.. function:: FDBFuture* fdb_transaction_get_approximate_size(FDBTransaction* transaction)
|future-return0| the approximate transaction size so far in the returned future, which is the sum of the estimated sizes of the mutations, read conflict ranges, and write conflict ranges. |future-return1| call :func:`fdb_future_get_int64()` to extract the size, |future-return2|
This can be called multiple times before the transaction is committed.
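As a sketch only (``sample_size`` is a hypothetical helper, not part of the API), the size can be polled after each mutation and is expected to grow, mirroring the new ``bindings/c/test/txn_size_test.c``::

    static int64_t sample_size(FDBTransaction* tr) {
        int64_t size = -1;
        FDBFuture* f = fdb_transaction_get_approximate_size(tr);
        if (fdb_future_block_until_ready(f) == 0)
            fdb_future_get_int64(f, &size);
        fdb_future_destroy(f);
        return size;
    }

    fdb_transaction_set(tr, (const uint8_t*)"k1", 2, (const uint8_t*)"v1", 2);
    int64_t s1 = sample_size(tr);
    fdb_transaction_clear(tr, (const uint8_t*)"k2", 2);
    int64_t s2 = sample_size(tr);   /* expect s2 > s1 */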
.. function:: FDBFuture* fdb_transaction_get_versionstamp(FDBTransaction* transaction)
|future-return0| the versionstamp which was used by any versionstamp operations in this transaction. |future-return1| call :func:`fdb_future_get_key()` to extract the key, |future-return2|

View File

@ -147,7 +147,7 @@
.. |atomic-versionstamps-tuple-warning-value| replace::
At this time, versionstamped values are not compatible with the Tuple layer except in Java, Python, and Go. Note that this implies versionstamped values may not be used with the Subspace and Directory layers except in those languages.
.. |api-version| replace:: 610
.. |api-version| replace:: 620
.. |streaming-mode-blurb1| replace::
When using |get-range-func| and similar interfaces, API clients can request large ranges of the database to iterate over. Making such a request doesn't necessarily mean that the client will consume all of the data in the range - sometimes the client doesn't know how far it intends to iterate in advance. FoundationDB tries to balance latency and bandwidth by requesting data for iteration in batches.

View File

@ -102,7 +102,7 @@ Opening a database
After importing the ``fdb`` module and selecting an API version, you probably want to open a :class:`Database` using :func:`open`::
import fdb
fdb.api_version(610)
fdb.api_version(620)
db = fdb.open()
.. function:: open( cluster_file=None, event_model=None )

View File

@ -91,7 +91,7 @@ Opening a database
After requiring the ``FDB`` gem and selecting an API version, you probably want to open a :class:`Database` using :func:`open`::
require 'fdb'
FDB.api_version 610
FDB.api_version 620
db = FDB.open
.. function:: open( cluster_file=nil ) -> Database

View File

@ -29,7 +29,7 @@ Before using the API, we need to specify the API version. This allows programs t
.. code-block:: go
fdb.MustAPIVersion(610)
fdb.MustAPIVersion(620)
Next, we open a FoundationDB database. The API will connect to the FoundationDB cluster indicated by the :ref:`default cluster file <default-cluster-file>`.
@ -78,7 +78,7 @@ If this is all working, it looks like we are ready to start building a real appl
func main() {
// Different API versions may expose different runtime behaviors.
fdb.MustAPIVersion(610)
fdb.MustAPIVersion(620)
// Open the default database from the system cluster
db := fdb.MustOpenDefault()
@ -666,7 +666,7 @@ Here's the code for the scheduling tutorial:
}
func main() {
fdb.MustAPIVersion(610)
fdb.MustAPIVersion(620)
db := fdb.MustOpenDefault()
db.Options().SetTransactionTimeout(60000) // 60,000 ms = 1 minute
db.Options().SetTransactionRetryLimit(100)

View File

@ -30,7 +30,7 @@ Before using the API, we need to specify the API version. This allows programs t
private static final Database db;
static {
fdb = FDB.selectAPIVersion(610);
fdb = FDB.selectAPIVersion(620);
db = fdb.open();
}
@ -66,7 +66,7 @@ If this is all working, it looks like we are ready to start building a real appl
private static final Database db;
static {
fdb = FDB.selectAPIVersion(610);
fdb = FDB.selectAPIVersion(620);
db = fdb.open();
}
@ -441,7 +441,7 @@ Here's the code for the scheduling tutorial:
private static final Database db;
static {
fdb = FDB.selectAPIVersion(610);
fdb = FDB.selectAPIVersion(620);
db = fdb.open();
db.options().setTransactionTimeout(60000); // 60,000 ms = 1 minute
db.options().setRetryLimit(100);

View File

@ -23,7 +23,7 @@ Open a Ruby interactive interpreter and import the FoundationDB API module::
Before using the API, we need to specify the API version. This allows programs to maintain compatibility even if the API is modified in future versions::
> FDB.api_version 610
> FDB.api_version 620
=> nil
Next, we open a FoundationDB database. The API will connect to the FoundationDB cluster indicated by the :ref:`default cluster file <default-cluster-file>`. ::
@ -46,7 +46,7 @@ If this is all working, it looks like we are ready to start building a real appl
.. code-block:: ruby
require 'fdb'
FDB.api_version 610
FDB.api_version 620
@db = FDB.open
@db['hello'] = 'world'
print 'hello ', @db['hello']
@ -373,7 +373,7 @@ Here's the code for the scheduling tutorial:
require 'fdb'
FDB.api_version 610
FDB.api_version 620
####################################
## Initialization ##

View File

@ -30,7 +30,7 @@ Open a Python interactive interpreter and import the FoundationDB API module::
Before using the API, we need to specify the API version. This allows programs to maintain compatibility even if the API is modified in future versions::
>>> fdb.api_version(610)
>>> fdb.api_version(620)
Next, we open a FoundationDB database. The API will connect to the FoundationDB cluster indicated by the :ref:`default cluster file <default-cluster-file>`. ::
@ -48,7 +48,7 @@ When this command returns without exception, the modification is durably stored
If this is all working, it looks like we are ready to start building a real application. For reference, here's the full code for "hello world"::
import fdb
fdb.api_version(610)
fdb.api_version(620)
db = fdb.open()
db[b'hello'] = b'world'
print 'hello', db[b'hello']
@ -91,7 +91,7 @@ FoundationDB includes a few tools that make it easy to model data using this app
opening a :ref:`directory <developer-guide-directories>` in the database::
import fdb
fdb.api_version(610)
fdb.api_version(620)
db = fdb.open()
scheduling = fdb.directory.create_or_open(db, ('scheduling',))
@ -337,7 +337,7 @@ Here's the code for the scheduling tutorial::
import fdb
import fdb.tuple
fdb.api_version(610)
fdb.api_version(620)
####################################

View File

@ -69,7 +69,7 @@ Here's a basic implementation of the recipe.
private static final long EMPTY_ARRAY = -1;
static {
fdb = FDB.selectAPIVersion(610);
fdb = FDB.selectAPIVersion(620);
db = fdb.open();
docSpace = new Subspace(Tuple.from("D"));
}

View File

@ -333,7 +333,9 @@
"storage_servers_error",
"status_incomplete",
"layer_status_incomplete",
"database_availability_timeout"
"database_availability_timeout",
"consistencycheck_suspendkey_fetch_timeout",
"consistencycheck_disabled"
]
},
"issues":[

View File

@ -74,7 +74,7 @@ Here's a simple implementation of multimaps with multisets as described:
private static final int N = 100;
static {
fdb = FDB.selectAPIVersion(610);
fdb = FDB.selectAPIVersion(620);
db = fdb.open();
multi = new Subspace(Tuple.from("M"));
}

View File

@ -74,7 +74,7 @@ Here's a basic implementation of the model:
private static final Random randno;
static{
fdb = FDB.selectAPIVersion(610);
fdb = FDB.selectAPIVersion(620);
db = fdb.open();
pq = new Subspace(Tuple.from("P"));

View File

@ -73,7 +73,7 @@ The following is a simple implementation of the basic pattern:
private static final Random randno;
static{
fdb = FDB.selectAPIVersion(610);
fdb = FDB.selectAPIVersion(620);
db = fdb.open();
queue = new Subspace(Tuple.from("Q"));
randno = new Random();

View File

@ -28,6 +28,8 @@ Status
Bindings
--------
* Added a new API to get the approximate transaction size before commit, e.g., ``fdb_transaction_get_approximate_size`` in the C binding. `(PR #1756) <https://github.com/apple/foundationdb/pull/1756>`_.
* C: ``fdb_future_get_version`` has been renamed to ``fdb_future_get_int64``. `(PR #1756) <https://github.com/apple/foundationdb/pull/1756>`_.
* Go: The Go bindings now require Go version 1.11 or later.
* Go: Fix issue with finalizers running too early that could lead to undefined behavior. `(PR #1451) <https://github.com/apple/foundationdb/pull/1451>`_.
* Added transaction option to control the field length of keys and values in debug transaction logging in order to avoid truncation. `(PR #1844) <https://github.com/apple/foundationdb/pull/1844>`_.
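For readers who have not tried the new size API yet, here is a minimal sketch (C++ against the C binding) of how a client might query it before committing. The transaction setup, selection of API version 620, and surrounding error handling are assumed rather than taken from this commit, and the helper name is illustrative:

    // Sketch: ask how large the transaction would be at commit time.
    // fdb_transaction_get_approximate_size and fdb_future_get_int64 are the
    // calls added/renamed in this release; the rest is ordinary fdb_c usage.
    #include <foundationdb/fdb_c.h>

    int64_t approximate_commit_size(FDBTransaction* tr) {
        FDBFuture* f = fdb_transaction_get_approximate_size(tr);
        fdb_future_block_until_ready(f);                  // wait synchronously for this sketch
        int64_t size = 0;
        fdb_error_t err = fdb_future_get_int64(f, &size); // replaces fdb_future_get_version for int64 futures
        fdb_future_destroy(f);
        return err ? -1 : size;                           // -1 signals "unknown" in this sketch
    }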

View File

@ -87,7 +87,7 @@ In this example, we're storing user data based on user ID but sometimes need t
private static final Subspace index;
static {
fdb = FDB.selectAPIVersion(610);
fdb = FDB.selectAPIVersion(620);
db = fdb.open();
main = new Subspace(Tuple.from("user"));
index = new Subspace(Tuple.from("zipcode_index"));

View File

@ -62,7 +62,7 @@ Here's a simple implementation of the basic table pattern:
private static final Subspace colIndex;
static {
fdb = FDB.selectAPIVersion(610);
fdb = FDB.selectAPIVersion(620);
db = fdb.open();
table = new Subspace(Tuple.from("T"));
rowIndex = table.subspace(Tuple.from("R"));

View File

@ -77,7 +77,7 @@ Here's the basic pattern:
private static final Subspace vector;
static {
fdb = FDB.selectAPIVersion(610);
fdb = FDB.selectAPIVersion(620);
db = fdb.open();
vector = new Subspace(Tuple.from("V"));
}

View File

@ -117,37 +117,40 @@ enum {
OPT_DEST_CLUSTER,
OPT_CLEANUP,
OPT_TRACE_FORMAT
OPT_TRACE_FORMAT,
OPT_USE_OBJECT_SERIALIZER
};
CSimpleOpt::SOption g_rgAgentOptions[] = {
#ifdef _WIN32
{ OPT_PARENTPID, "--parentpid", SO_REQ_SEP },
{ OPT_PARENTPID, "--parentpid", SO_REQ_SEP },
#endif
{ OPT_CLUSTERFILE, "-C", SO_REQ_SEP },
{ OPT_CLUSTERFILE, "--cluster_file", SO_REQ_SEP },
{ OPT_KNOB, "--knob_", SO_REQ_SEP },
{ OPT_VERSION, "--version", SO_NONE },
{ OPT_VERSION, "-v", SO_NONE },
{ OPT_QUIET, "-q", SO_NONE },
{ OPT_QUIET, "--quiet", SO_NONE },
{ OPT_TRACE, "--log", SO_NONE },
{ OPT_TRACE_DIR, "--logdir", SO_REQ_SEP },
{ OPT_TRACE_FORMAT, "--trace_format", SO_REQ_SEP },
{ OPT_TRACE_LOG_GROUP, "--loggroup", SO_REQ_SEP },
{ OPT_CRASHONERROR, "--crash", SO_NONE },
{ OPT_LOCALITY, "--locality_", SO_REQ_SEP },
{ OPT_MEMLIMIT, "-m", SO_REQ_SEP },
{ OPT_MEMLIMIT, "--memory", SO_REQ_SEP },
{ OPT_HELP, "-?", SO_NONE },
{ OPT_HELP, "-h", SO_NONE },
{ OPT_HELP, "--help", SO_NONE },
{ OPT_DEVHELP, "--dev-help", SO_NONE },
{ OPT_CLUSTERFILE, "-C", SO_REQ_SEP },
{ OPT_CLUSTERFILE, "--cluster_file", SO_REQ_SEP },
{ OPT_KNOB, "--knob_", SO_REQ_SEP },
{ OPT_VERSION, "--version", SO_NONE },
{ OPT_VERSION, "-v", SO_NONE },
{ OPT_QUIET, "-q", SO_NONE },
{ OPT_QUIET, "--quiet", SO_NONE },
{ OPT_TRACE, "--log", SO_NONE },
{ OPT_TRACE_DIR, "--logdir", SO_REQ_SEP },
{ OPT_TRACE_FORMAT, "--trace_format", SO_REQ_SEP },
{ OPT_USE_OBJECT_SERIALIZER, "-S", SO_REQ_SEP },
{ OPT_USE_OBJECT_SERIALIZER, "--object-serializer", SO_REQ_SEP },
{ OPT_TRACE_LOG_GROUP, "--loggroup", SO_REQ_SEP },
{ OPT_CRASHONERROR, "--crash", SO_NONE },
{ OPT_LOCALITY, "--locality_", SO_REQ_SEP },
{ OPT_MEMLIMIT, "-m", SO_REQ_SEP },
{ OPT_MEMLIMIT, "--memory", SO_REQ_SEP },
{ OPT_HELP, "-?", SO_NONE },
{ OPT_HELP, "-h", SO_NONE },
{ OPT_HELP, "--help", SO_NONE },
{ OPT_DEVHELP, "--dev-help", SO_NONE },
{ OPT_BLOB_CREDENTIALS, "--blob_credentials", SO_REQ_SEP },
#ifndef TLS_DISABLED
TLS_OPTION_FLAGS
#endif
SO_END_OF_OPTIONS
SO_END_OF_OPTIONS
};
CSimpleOpt::SOption g_rgBackupStartOptions[] = {
@ -173,6 +176,8 @@ CSimpleOpt::SOption g_rgBackupStartOptions[] = {
{ OPT_TRACE, "--log", SO_NONE },
{ OPT_TRACE_DIR, "--logdir", SO_REQ_SEP },
{ OPT_TRACE_FORMAT, "--trace_format", SO_REQ_SEP },
{ OPT_USE_OBJECT_SERIALIZER, "-S", SO_REQ_SEP },
{ OPT_USE_OBJECT_SERIALIZER, "--object-serializer", SO_REQ_SEP },
{ OPT_TRACE_LOG_GROUP, "--loggroup", SO_REQ_SEP },
{ OPT_QUIET, "-q", SO_NONE },
{ OPT_QUIET, "--quiet", SO_NONE },
@ -240,6 +245,8 @@ CSimpleOpt::SOption g_rgBackupStatusOptions[] = {
{ OPT_TRACE, "--log", SO_NONE },
{ OPT_TRACE_DIR, "--logdir", SO_REQ_SEP },
{ OPT_TRACE_FORMAT, "--trace_format", SO_REQ_SEP },
{ OPT_USE_OBJECT_SERIALIZER, "-S", SO_REQ_SEP },
{ OPT_USE_OBJECT_SERIALIZER, "--object-serializer", SO_REQ_SEP },
{ OPT_TRACE_LOG_GROUP, "--loggroup", SO_REQ_SEP },
{ OPT_VERSION, "--version", SO_NONE },
{ OPT_VERSION, "-v", SO_NONE },
@ -270,6 +277,8 @@ CSimpleOpt::SOption g_rgBackupAbortOptions[] = {
{ OPT_TRACE, "--log", SO_NONE },
{ OPT_TRACE_DIR, "--logdir", SO_REQ_SEP },
{ OPT_TRACE_FORMAT, "--trace_format", SO_REQ_SEP },
{ OPT_USE_OBJECT_SERIALIZER, "-S", SO_REQ_SEP },
{ OPT_USE_OBJECT_SERIALIZER, "--object-serializer", SO_REQ_SEP },
{ OPT_TRACE_LOG_GROUP, "--loggroup", SO_REQ_SEP },
{ OPT_QUIET, "-q", SO_NONE },
{ OPT_QUIET, "--quiet", SO_NONE },
@ -301,6 +310,8 @@ CSimpleOpt::SOption g_rgBackupDiscontinueOptions[] = {
{ OPT_TRACE, "--log", SO_NONE },
{ OPT_TRACE_DIR, "--logdir", SO_REQ_SEP },
{ OPT_TRACE_FORMAT, "--trace_format", SO_REQ_SEP },
{ OPT_USE_OBJECT_SERIALIZER, "-S", SO_REQ_SEP },
{ OPT_USE_OBJECT_SERIALIZER, "--object-serializer", SO_REQ_SEP },
{ OPT_TRACE_LOG_GROUP, "--loggroup", SO_REQ_SEP },
{ OPT_QUIET, "-q", SO_NONE },
{ OPT_QUIET, "--quiet", SO_NONE },
@ -332,6 +343,8 @@ CSimpleOpt::SOption g_rgBackupWaitOptions[] = {
{ OPT_TRACE, "--log", SO_NONE },
{ OPT_TRACE_DIR, "--logdir", SO_REQ_SEP },
{ OPT_TRACE_FORMAT, "--trace_format", SO_REQ_SEP },
{ OPT_USE_OBJECT_SERIALIZER, "-S", SO_REQ_SEP },
{ OPT_USE_OBJECT_SERIALIZER, "--object-serializer", SO_REQ_SEP },
{ OPT_TRACE_LOG_GROUP, "--loggroup", SO_REQ_SEP },
{ OPT_QUIET, "-q", SO_NONE },
{ OPT_QUIET, "--quiet", SO_NONE },
@ -359,6 +372,8 @@ CSimpleOpt::SOption g_rgBackupPauseOptions[] = {
{ OPT_TRACE, "--log", SO_NONE },
{ OPT_TRACE_DIR, "--logdir", SO_REQ_SEP },
{ OPT_TRACE_FORMAT, "--trace_format", SO_REQ_SEP },
{ OPT_USE_OBJECT_SERIALIZER, "-S", SO_REQ_SEP },
{ OPT_USE_OBJECT_SERIALIZER, "--object-serializer", SO_REQ_SEP },
{ OPT_TRACE_LOG_GROUP, "--loggroup", SO_REQ_SEP },
{ OPT_QUIET, "-q", SO_NONE },
{ OPT_QUIET, "--quiet", SO_NONE },
@ -388,6 +403,8 @@ CSimpleOpt::SOption g_rgBackupExpireOptions[] = {
{ OPT_TRACE, "--log", SO_NONE },
{ OPT_TRACE_DIR, "--logdir", SO_REQ_SEP },
{ OPT_TRACE_FORMAT, "--trace_format", SO_REQ_SEP },
{ OPT_USE_OBJECT_SERIALIZER, "-S", SO_REQ_SEP },
{ OPT_USE_OBJECT_SERIALIZER, "--object-serializer", SO_REQ_SEP },
{ OPT_TRACE_LOG_GROUP, "--loggroup", SO_REQ_SEP },
{ OPT_QUIET, "-q", SO_NONE },
{ OPT_QUIET, "--quiet", SO_NONE },
@ -425,6 +442,8 @@ CSimpleOpt::SOption g_rgBackupDeleteOptions[] = {
{ OPT_TRACE, "--log", SO_NONE },
{ OPT_TRACE_DIR, "--logdir", SO_REQ_SEP },
{ OPT_TRACE_FORMAT, "--trace_format", SO_REQ_SEP },
{ OPT_USE_OBJECT_SERIALIZER, "-S", SO_REQ_SEP },
{ OPT_USE_OBJECT_SERIALIZER, "--object-serializer", SO_REQ_SEP },
{ OPT_TRACE_LOG_GROUP, "--loggroup", SO_REQ_SEP },
{ OPT_QUIET, "-q", SO_NONE },
{ OPT_QUIET, "--quiet", SO_NONE },
@ -456,6 +475,8 @@ CSimpleOpt::SOption g_rgBackupDescribeOptions[] = {
{ OPT_TRACE, "--log", SO_NONE },
{ OPT_TRACE_DIR, "--logdir", SO_REQ_SEP },
{ OPT_TRACE_FORMAT, "--trace_format", SO_REQ_SEP },
{ OPT_USE_OBJECT_SERIALIZER, "-S", SO_REQ_SEP },
{ OPT_USE_OBJECT_SERIALIZER, "--object-serializer", SO_REQ_SEP },
{ OPT_TRACE_LOG_GROUP, "--loggroup", SO_REQ_SEP },
{ OPT_QUIET, "-q", SO_NONE },
{ OPT_QUIET, "--quiet", SO_NONE },
@ -520,6 +541,8 @@ CSimpleOpt::SOption g_rgBackupListOptions[] = {
{ OPT_TRACE, "--log", SO_NONE },
{ OPT_TRACE_DIR, "--logdir", SO_REQ_SEP },
{ OPT_TRACE_FORMAT, "--trace_format", SO_REQ_SEP },
{ OPT_USE_OBJECT_SERIALIZER, "-S", SO_REQ_SEP },
{ OPT_USE_OBJECT_SERIALIZER, "--object-serializer", SO_REQ_SEP },
{ OPT_TRACE_LOG_GROUP, "--loggroup", SO_REQ_SEP },
{ OPT_QUIET, "-q", SO_NONE },
{ OPT_QUIET, "--quiet", SO_NONE },
@ -562,6 +585,8 @@ CSimpleOpt::SOption g_rgRestoreOptions[] = {
{ OPT_TRACE, "--log", SO_NONE },
{ OPT_TRACE_DIR, "--logdir", SO_REQ_SEP },
{ OPT_TRACE_FORMAT, "--trace_format", SO_REQ_SEP },
{ OPT_USE_OBJECT_SERIALIZER, "-S", SO_REQ_SEP },
{ OPT_USE_OBJECT_SERIALIZER, "--object-serializer", SO_REQ_SEP },
{ OPT_TRACE_LOG_GROUP, "--loggroup", SO_REQ_SEP },
{ OPT_QUIET, "-q", SO_NONE },
{ OPT_QUIET, "--quiet", SO_NONE },
@ -599,6 +624,8 @@ CSimpleOpt::SOption g_rgDBAgentOptions[] = {
{ OPT_TRACE, "--log", SO_NONE },
{ OPT_TRACE_DIR, "--logdir", SO_REQ_SEP },
{ OPT_TRACE_FORMAT, "--trace_format", SO_REQ_SEP },
{ OPT_USE_OBJECT_SERIALIZER, "-S", SO_REQ_SEP },
{ OPT_USE_OBJECT_SERIALIZER, "--object-serializer", SO_REQ_SEP },
{ OPT_TRACE_LOG_GROUP, "--loggroup", SO_REQ_SEP },
{ OPT_CRASHONERROR, "--crash", SO_NONE },
{ OPT_LOCALITY, "--locality_", SO_REQ_SEP },
@ -629,6 +656,8 @@ CSimpleOpt::SOption g_rgDBStartOptions[] = {
{ OPT_TRACE, "--log", SO_NONE },
{ OPT_TRACE_DIR, "--logdir", SO_REQ_SEP },
{ OPT_TRACE_FORMAT, "--trace_format", SO_REQ_SEP },
{ OPT_USE_OBJECT_SERIALIZER, "-S", SO_REQ_SEP },
{ OPT_USE_OBJECT_SERIALIZER, "--object-serializer", SO_REQ_SEP },
{ OPT_TRACE_LOG_GROUP, "--loggroup", SO_REQ_SEP },
{ OPT_QUIET, "-q", SO_NONE },
{ OPT_QUIET, "--quiet", SO_NONE },
@ -662,6 +691,8 @@ CSimpleOpt::SOption g_rgDBStatusOptions[] = {
{ OPT_TRACE, "--log", SO_NONE },
{ OPT_TRACE_DIR, "--logdir", SO_REQ_SEP },
{ OPT_TRACE_FORMAT, "--trace_format", SO_REQ_SEP },
{ OPT_USE_OBJECT_SERIALIZER, "-S", SO_REQ_SEP },
{ OPT_USE_OBJECT_SERIALIZER, "--object-serializer", SO_REQ_SEP },
{ OPT_TRACE_LOG_GROUP, "--loggroup", SO_REQ_SEP },
{ OPT_VERSION, "--version", SO_NONE },
{ OPT_VERSION, "-v", SO_NONE },
@ -693,6 +724,8 @@ CSimpleOpt::SOption g_rgDBSwitchOptions[] = {
{ OPT_TRACE, "--log", SO_NONE },
{ OPT_TRACE_DIR, "--logdir", SO_REQ_SEP },
{ OPT_TRACE_FORMAT, "--trace_format", SO_REQ_SEP },
{ OPT_USE_OBJECT_SERIALIZER, "-S", SO_REQ_SEP },
{ OPT_USE_OBJECT_SERIALIZER, "--object-serializer", SO_REQ_SEP },
{ OPT_TRACE_LOG_GROUP, "--loggroup", SO_REQ_SEP },
{ OPT_QUIET, "-q", SO_NONE },
{ OPT_QUIET, "--quiet", SO_NONE },
@ -726,6 +759,8 @@ CSimpleOpt::SOption g_rgDBAbortOptions[] = {
{ OPT_TRACE, "--log", SO_NONE },
{ OPT_TRACE_DIR, "--logdir", SO_REQ_SEP },
{ OPT_TRACE_FORMAT, "--trace_format", SO_REQ_SEP },
{ OPT_USE_OBJECT_SERIALIZER, "-S", SO_REQ_SEP },
{ OPT_USE_OBJECT_SERIALIZER, "--object-serializer", SO_REQ_SEP },
{ OPT_TRACE_LOG_GROUP, "--loggroup", SO_REQ_SEP },
{ OPT_QUIET, "-q", SO_NONE },
{ OPT_QUIET, "--quiet", SO_NONE },
@ -755,6 +790,8 @@ CSimpleOpt::SOption g_rgDBPauseOptions[] = {
{ OPT_TRACE, "--log", SO_NONE },
{ OPT_TRACE_DIR, "--logdir", SO_REQ_SEP },
{ OPT_TRACE_FORMAT, "--trace_format", SO_REQ_SEP },
{ OPT_USE_OBJECT_SERIALIZER, "-S", SO_REQ_SEP },
{ OPT_USE_OBJECT_SERIALIZER, "--object-serializer", SO_REQ_SEP },
{ OPT_TRACE_LOG_GROUP, "--loggroup", SO_REQ_SEP },
{ OPT_QUIET, "-q", SO_NONE },
{ OPT_QUIET, "--quiet", SO_NONE },
@ -829,6 +866,10 @@ static void printAgentUsage(bool devhelp) {
printf(" --trace_format FORMAT\n"
" Select the format of the trace files. xml (the default) and json are supported.\n"
" Has no effect unless --log is specified.\n");
printf(" -S ON|OFF, --object-serializer ON|OFF\n"
" Use object serializer for sending messages. The object serializer\n"
" is currently a beta feature and it allows fdb processes to talk to\n"
" each other even if they don't have the same version\n");
printf(" -m SIZE, --memory SIZE\n"
" Memory limit. The default value is 8GiB. When specified\n"
" without a unit, MiB is assumed.\n");
@ -904,6 +945,17 @@ static void printBackupUsage(bool devhelp) {
printf(" -k KEYS List of key ranges to backup.\n"
" If not specified, the entire database will be backed up.\n");
printf(" -n, --dryrun For start or restore operations, performs a trial run with no actual changes made.\n");
printf(" --log Enables trace file logging for the CLI session.\n"
" --logdir PATH Specifes the output directory for trace files. If\n"
" unspecified, defaults to the current directory. Has\n"
" no effect unless --log is specified.\n");
printf(" --trace_format FORMAT\n"
" Select the format of the trace files. xml (the default) and json are supported.\n"
" Has no effect unless --log is specified.\n");
printf(" -S ON|OFF, --object-serializer ON|OFF\n"
" Use object serializer for sending messages. The object serializer\n"
" is currently a beta feature and it allows fdb processes to talk to\n"
" each other even if they don't have the same version\n");
#ifndef TLS_DISABLED
printf(TLS_HELP);
#endif
@ -951,6 +1003,17 @@ static void printRestoreUsage(bool devhelp ) {
printf(" --add_prefix PREFIX\n");
printf(" Prefix to add to the restored keys\n");
printf(" -n, --dryrun Perform a trial run with no changes made.\n");
printf(" --log Enables trace file logging for the CLI session.\n"
" --logdir PATH Specifes the output directory for trace files. If\n"
" unspecified, defaults to the current directory. Has\n"
" no effect unless --log is specified.\n");
printf(" --trace_format FORMAT\n"
" Select the format of the trace files. xml (the default) and json are supported.\n"
" Has no effect unless --log is specified.\n");
printf(" -S ON|OFF, --object-serializer ON|OFF\n"
" Use object serializer for sending messages. The object serializer\n"
" is currently a beta feature and it allows fdb processes to talk to\n"
" each other even if they don't have the same version\n");
#ifndef TLS_DISABLED
printf(TLS_HELP);
#endif
@ -992,6 +1055,10 @@ static void printDBAgentUsage(bool devhelp) {
printf(" --trace_format FORMAT\n"
" Select the format of the trace files. xml (the default) and json are supported.\n"
" Has no effect unless --log is specified.\n");
printf(" -S ON|OFF, --object-serializer ON|OFF\n"
" Use object serializer for sending messages. The object serializer\n"
" is currently a beta feature and it allows fdb processes to talk to\n"
" each other even if they don't have the same version\n");
printf(" -m SIZE, --memory SIZE\n"
" Memory limit. The default value is 8GiB. When specified\n"
" without a unit, MiB is assumed.\n");
@ -1028,6 +1095,17 @@ static void printDBBackupUsage(bool devhelp) {
#ifndef TLS_DISABLED
printf(TLS_HELP);
#endif
printf(" --log Enables trace file logging for the CLI session.\n"
" --logdir PATH Specifes the output directory for trace files. If\n"
" unspecified, defaults to the current directory. Has\n"
" no effect unless --log is specified.\n");
printf(" --trace_format FORMAT\n"
" Select the format of the trace files. xml (the default) and json are supported.\n"
" Has no effect unless --log is specified.\n");
printf(" -S ON|OFF, --object-serializer ON|OFF\n"
" Use object serializer for sending messages. The object serializer\n"
" is currently a beta feature and it allows fdb processes to talk to\n"
" each other even if they don't have the same version\n");
printf(" -v, --version Print version information and exit.\n");
printf(" -h, --help Display this help and exit.\n");
printf("\n"
@ -2662,6 +2740,7 @@ int main(int argc, char* argv[]) {
bool dryRun = false;
std::string traceDir = "";
std::string traceFormat = "";
bool useObjectSerializer = true;
std::string traceLogGroup;
uint64_t traceRollSize = TRACE_DEFAULT_ROLL_SIZE;
uint64_t traceMaxLogsSize = TRACE_DEFAULT_MAX_LOGS_SIZE;
@ -2772,6 +2851,18 @@ int main(int argc, char* argv[]) {
}
traceFormat = args->OptionArg();
break;
case OPT_USE_OBJECT_SERIALIZER: {
std::string s = args->OptionArg();
std::transform(s.begin(), s.end(), s.begin(), ::tolower);
if (s == "on" || s == "true" || s == "1") {
useObjectSerializer = true;
} else if (s == "off" || s == "false" || s == "0") {
useObjectSerializer = false;
} else {
fprintf(stderr, "ERROR: Could not parse object serializer option: `%s'\n", s.c_str());
}
break;
}
case OPT_TRACE_LOG_GROUP:
traceLogGroup = args->OptionArg();
break;
@ -3118,6 +3209,11 @@ int main(int argc, char* argv[]) {
setNetworkOption(FDBNetworkOptions::ENABLE_SLOW_TASK_PROFILING);
}
setNetworkOption(FDBNetworkOptions::DISABLE_CLIENT_STATISTICS_LOGGING);
// The USE_OBJECT_SERIALIZER network option expects an 8 byte little endian integer which is interpreted as
// zero = false, non-zero = true.
setNetworkOption(FDBNetworkOptions::USE_OBJECT_SERIALIZER,
useObjectSerializer ? LiteralStringRef("\x01\x00\x00\x00\x00\x00\x00\x00")
: LiteralStringRef("\x00\x00\x00\x00\x00\x00\x00\x00"));
// deferred TLS options
if (tlsCertPath.size()) {
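
The comment in the last hunk spells out the encoding that FDBNetworkOptions::USE_OBJECT_SERIALIZER expects: an 8-byte little-endian integer interpreted as zero = false, non-zero = true. As a sketch of producing that value programmatically instead of with the literal byte strings used above (the helper name is hypothetical, and it assumes a little-endian host just as the literals do):

    // Hypothetical helper: encode a bool as the 8-byte option value for
    // FDBNetworkOptions::USE_OBJECT_SERIALIZER (zero = false, non-zero = true).
    #include <cstdint>
    #include <cstring>
    #include <string>

    std::string encodeObjectSerializerFlag(bool enabled) {
        uint64_t v = enabled ? 1 : 0;
        char buf[8];
        std::memcpy(buf, &v, sizeof(buf));   // little-endian host assumed, matching the literals above
        return std::string(buf, sizeof(buf));
    }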

View File

@ -479,10 +479,12 @@ void initHelp() {
"coordinators auto|<ADDRESS>+ [description=new_cluster_description]",
"change cluster coordinators or description",
"If 'auto' is specified, coordinator addresses will be choosen automatically to support the configured redundancy level. (If the current set of coordinators are healthy and already support the redundancy level, nothing will be changed.)\n\nOtherwise, sets the coordinators to the list of IP:port pairs specified by <ADDRESS>+. An fdbserver process must be running on each of the specified addresses.\n\ne.g. coordinators 10.0.0.1:4000 10.0.0.2:4000 10.0.0.3:4000\n\nIf 'description=desc' is specified then the description field in the cluster\nfile is changed to desc, which must match [A-Za-z0-9_]+.");
helpMap["exclude"] = CommandHelp(
"exclude <ADDRESS>*",
"exclude servers from the database",
"If no addresses are specified, lists the set of excluded servers.\n\nFor each IP address or IP:port pair in <ADDRESS>*, adds the address to the set of excluded servers then waits until all database state has been safely moved away from the specified servers.");
helpMap["exclude"] =
CommandHelp("exclude [no_wait] <ADDRESS>*", "exclude servers from the database",
"If no addresses are specified, lists the set of excluded servers.\n\nFor each IP address or "
"IP:port pair in <ADDRESS>*, adds the address to the set of excluded servers then waits until all "
"database state has been safely moved away from the specified servers. If 'no_wait' is set, the "
"command returns \nimmediately without checking if the exclusions have completed successfully.");
helpMap["include"] = CommandHelp(
"include all|<ADDRESS>*",
"permit previously-excluded servers to rejoin the database",
@ -554,6 +556,10 @@ void initHelp() {
"maintenance [on|off] [ZONEID] [SECONDS]",
"mark a zone for maintenance",
"Calling this command with `on' prevents data distribution from moving data away from the processes with the specified ZONEID. Data distribution will automatically be turned back on for ZONEID after the specified SECONDS have elapsed, or after a storage server with a different ZONEID fails. Only one ZONEID can be marked for maintenance. Calling this command with no arguments will display any ongoing maintenance. Calling this command with `off' will disable maintenance.\n");
helpMap["consistencycheck"] = CommandHelp(
"consistencycheck [on|off]",
"enables or disables consistencycheck",
"Calling this command with `on' enables consistency check to run and `off' will disable the same. Calling this command with no arguments will display setting for consistency check.\n");
hiddenCommands.insert("expensive_data_check");
hiddenCommands.insert("datadistribution");
@ -626,6 +632,9 @@ std::string getDateInfoString(StatusObjectReader statusObj, std::string key) {
}
std::string getProcessAddressByServerID(StatusObjectReader processesMap, std::string serverID) {
if(serverID == "")
return "unknown";
for (auto proc : processesMap.obj()){
try {
StatusArray rolesArray = proc.second.get_obj()["roles"].get_array();
@ -1981,13 +1990,18 @@ ACTOR Future<bool> exclude( Database db, std::vector<StringRef> tokens, Referenc
printf("To find out whether it is safe to remove one or more of these\n"
"servers from the cluster, type `exclude <addresses>'.\n"
"To return one of these servers to the cluster, type `include <addresses>'.\n");
return false;
} else {
state std::vector<AddressExclusion> addresses;
state std::set<AddressExclusion> exclusions;
bool force = false;
state bool waitForAllExcluded = true;
for(auto t = tokens.begin()+1; t != tokens.end(); ++t) {
if(*t == LiteralStringRef("FORCE")) {
force = true;
} else if (*t == LiteralStringRef("no_wait")) {
waitForAllExcluded = false;
} else {
auto a = AddressExclusion::parse( *t );
if (!a.isValid()) {
@ -2089,13 +2103,16 @@ ACTOR Future<bool> exclude( Database db, std::vector<StringRef> tokens, Referenc
wait( makeInterruptable(excludeServers(db,addresses)) );
printf("Waiting for state to be removed from all excluded servers. This may take a while.\n");
printf("(Interrupting this wait with CTRL+C will not cancel the data movement.)\n");
if (waitForAllExcluded) {
printf("Waiting for state to be removed from all excluded servers. This may take a while.\n");
printf("(Interrupting this wait with CTRL+C will not cancel the data movement.)\n");
}
if(warn.isValid())
warn.cancel();
wait( makeInterruptable(waitForExcludedServers(db,addresses)) );
state std::set<NetworkAddress> notExcludedServers =
wait(makeInterruptable(checkForExcludingServers(db, addresses, waitForAllExcluded)));
std::vector<ProcessData> workers = wait( makeInterruptable(getWorkers(db)) );
std::map<IPAddress, std::set<uint16_t>> workerPorts;
for(auto addr : workers)
@ -2122,9 +2139,18 @@ ACTOR Future<bool> exclude( Database db, std::vector<StringRef> tokens, Referenc
}
printf("\n");
}
else
} else if (notExcludedServers.empty()) {
printf("\nIt is now safe to remove these machines or processes from the cluster.\n");
} else {
printf("\nWARNING: Exclusion in progress. It is not safe to remove the following machines\n"
"or processes from the cluster:\n");
for (auto addr : notExcludedServers) {
if (addr.port == 0)
printf(" %s\n", addr.ip.toString().c_str());
else
printf(" %s\n", addr.toString().c_str());
}
}
bool foundCoordinator = false;
auto ccs = ClusterConnectionFile( ccf->getFilename() ).getConnectionString();
@ -2138,8 +2164,9 @@ ACTOR Future<bool> exclude( Database db, std::vector<StringRef> tokens, Referenc
if (foundCoordinator)
printf("Type `help coordinators' for information on how to change the\n"
"cluster's coordination servers before removing them.\n");
return false;
}
return false;
}
ACTOR Future<bool> createSnapshot(Database db, StringRef snapCmd) {
@ -2349,7 +2376,7 @@ struct CLIOptions {
bool trace;
std::string traceDir;
std::string traceFormat;
bool useObjectSerializer = false;
bool useObjectSerializer = true;
int exit_timeout;
Optional<std::string> exec;
bool initialStatusCheck;
@ -2949,6 +2976,31 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
continue;
}
if (tokencmp(tokens[0], "consistencycheck")) {
getTransaction(db, tr, options, intrans);
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
if (tokens.size() == 1) {
state Future<Optional<Standalone<StringRef>>> ccSuspendSettingFuture = tr->get(fdbShouldConsistencyCheckBeSuspended);
wait( makeInterruptable( success(ccSuspendSettingFuture) ) );
bool ccSuspendSetting = ccSuspendSettingFuture.get().present() ? BinaryReader::fromStringRef<bool>(ccSuspendSettingFuture.get().get(), Unversioned()) : false;
printf("ConsistencyCheck is %s\n", ccSuspendSetting ? "off" : "on");
}
else if (tokens.size() == 2 && tokencmp(tokens[1], "off")) {
tr->set(fdbShouldConsistencyCheckBeSuspended, BinaryWriter::toValue(true, Unversioned()));
wait( commitTransaction(tr) );
}
else if (tokens.size() == 2 && tokencmp(tokens[1], "on")) {
tr->set(fdbShouldConsistencyCheckBeSuspended, BinaryWriter::toValue(false, Unversioned()));
wait( commitTransaction(tr) );
} else {
printUsage(tokens[0]);
is_error = true;
}
continue;
}
if (tokencmp(tokens[0], "profile")) {
if (tokens.size() == 1) {
printf("ERROR: Usage: profile <client|list|flow|heap>\n");

View File

@ -843,4 +843,6 @@ public:
return updateErrorInfo(cx, e, details);
}
};
#include "flow/unactorcompiler.h"
#endif

View File

@ -82,8 +82,8 @@ struct struct_like_traits<Tag> : std::true_type {
using Member = Tag;
using types = pack<uint16_t, int8_t>;
template <int i>
static const index_t<i, types>& get(const Member& m) {
template <int i, class Context>
static const index_t<i, types>& get(const Member& m, Context&) {
if constexpr (i == 0) {
return m.id;
} else {
@ -92,8 +92,8 @@ struct struct_like_traits<Tag> : std::true_type {
}
}
template <int i, class Type>
static const void assign(Member& m, const Type& t) {
template <int i, class Type, class Context>
static const void assign(Member& m, const Type& t, Context&) {
if constexpr (i == 0) {
m.id = t;
} else {
@ -322,6 +322,43 @@ struct KeyValueRef {
};
};
template<>
struct string_serialized_traits<KeyValueRef> : std::true_type {
int32_t getSize(const KeyValueRef& item) const {
return 2*sizeof(uint32_t) + item.key.size() + item.value.size();
}
uint32_t save(uint8_t* out, const KeyValueRef& item) const {
auto begin = out;
uint32_t sz = item.key.size();
memcpy(out, &sz, sizeof(sz));
out += sizeof(sz);
memcpy(out, item.key.begin(), sz);
out += sz;
sz = item.value.size();
memcpy(out, &sz, sizeof(sz));
out += sizeof(sz);
memcpy(out, item.value.begin(), sz);
out += sz;
return out - begin;
}
template <class Context>
uint32_t load(const uint8_t* data, KeyValueRef& t, Context& context) {
auto begin = data;
uint32_t sz;
memcpy(&sz, data, sizeof(sz));
data += sizeof(sz);
t.key = StringRef(context.tryReadZeroCopy(data, sz), sz);
data += sz;
memcpy(&sz, data, sizeof(sz));
data += sizeof(sz);
t.value = StringRef(context.tryReadZeroCopy(data, sz), sz);
data += sz;
return data - begin;
}
};
template<>
struct Traceable<KeyValueRef> : std::true_type {
static std::string toString(const KeyValueRef& value) {
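
Since string_serialized_traits<KeyValueRef> above defines a plain length-prefixed layout (4-byte key length, key bytes, 4-byte value length, value bytes), a quick worked example may help: key "foo" and value "hello" serialize to 2*4 + 3 + 5 = 16 bytes. The sketch below simply exercises getSize/save on a local buffer; it is illustrative, assumes the usual fdbclient headers, and is not part of this commit:

    // Sketch: serialize one KeyValueRef with the traits defined above and
    // check that save() writes exactly getSize() bytes.
    #include <vector>

    void keyValueSerializationExample() {
        KeyValueRef kv(LiteralStringRef("foo"), LiteralStringRef("hello"));
        string_serialized_traits<KeyValueRef> traits;
        int32_t expected = traits.getSize(kv);            // 2*sizeof(uint32_t) + 3 + 5 = 16
        std::vector<uint8_t> buf(expected);
        uint32_t written = traits.save(buf.data(), kv);   // [len][key bytes][len][value bytes]
        ASSERT(written == uint32_t(expected));
    }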

View File

@ -334,7 +334,7 @@ namespace HTTP {
}
// Write headers to a packet buffer chain
PacketBuffer *pFirst = new PacketBuffer();
PacketBuffer* pFirst = PacketBuffer::create();
PacketBuffer *pLast = writeRequestHeader(verb, resource, headers, pFirst);
// Prepend headers to content packer buffer chain
pContent->prependWriteBuffer(pFirst, pLast);

View File

@ -61,6 +61,7 @@ public:
virtual ThreadFuture<Void> commit() = 0;
virtual Version getCommittedVersion() = 0;
virtual ThreadFuture<int64_t> getApproximateSize() = 0;
virtual void setOption(FDBTransactionOptions::Option option, Optional<StringRef> value=Optional<StringRef>()) = 0;

View File

@ -1420,13 +1420,16 @@ ACTOR Future<int> setDDMode( Database cx, int mode ) {
}
}
ACTOR Future<Void> waitForExcludedServers( Database cx, vector<AddressExclusion> excl ) {
ACTOR Future<std::set<NetworkAddress>> checkForExcludingServers(Database cx, vector<AddressExclusion> excl,
bool waitForAllExcluded) {
state std::set<AddressExclusion> exclusions( excl.begin(), excl.end() );
state std::set<NetworkAddress> inProgressExclusion;
if (!excl.size()) return Void();
if (!excl.size()) return inProgressExclusion;
loop {
state Transaction tr(cx);
try {
tr.setOption( FDBTransactionOptions::READ_SYSTEM_KEYS );
tr.setOption( FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE ); // necessary?
@ -1439,11 +1442,12 @@ ACTOR Future<Void> waitForExcludedServers( Database cx, vector<AddressExclusion>
ASSERT( !serverList.more && serverList.size() < CLIENT_KNOBS->TOO_MANY );
state bool ok = true;
inProgressExclusion.clear();
for(auto& s : serverList) {
auto addr = decodeServerListValue( s.value ).address();
if ( addressExcluded(exclusions, addr) ) {
ok = false;
break;
inProgressExclusion.insert(addr);
}
}
@ -1454,24 +1458,27 @@ ACTOR Future<Void> waitForExcludedServers( Database cx, vector<AddressExclusion>
for( auto const& log : logs.first ) {
if (log.second == NetworkAddress() || addressExcluded(exclusions, log.second)) {
ok = false;
break;
inProgressExclusion.insert(log.second);
}
}
for( auto const& log : logs.second ) {
if (log.second == NetworkAddress() || addressExcluded(exclusions, log.second)) {
ok = false;
break;
inProgressExclusion.insert(log.second);
}
}
}
if (ok) return Void();
if (ok) return inProgressExclusion;
if (!waitForAllExcluded) break;
wait( delayJittered( 1.0 ) ); // SOMEDAY: watches!
} catch (Error& e) {
wait( tr.onError(e) );
}
}
return inProgressExclusion;
}
ACTOR Future<Void> mgmtSnapCreate(Database cx, StringRef snapCmd) {

View File

@ -154,9 +154,11 @@ ACTOR Future<Void> setClass( Database cx, AddressExclusion server, ProcessClas
// Get the current list of excluded servers
ACTOR Future<vector<AddressExclusion>> getExcludedServers( Database cx );
// Wait for the given, previously excluded servers to be evacuated (no longer used for state). Once this returns it is safe to shut down all such
// machines without impacting fault tolerance, until and unless any of them are explicitly included with includeServers()
ACTOR Future<Void> waitForExcludedServers( Database cx, vector<AddressExclusion> servers );
// Check for the given, previously excluded servers to be evacuated (no longer used for state). If waitForExclusion is
// true, this actor returns once it is safe to shut down all such machines without impacting fault tolerance, until and
// unless any of them are explicitly included with includeServers()
ACTOR Future<std::set<NetworkAddress>> checkForExcludingServers(Database cx, vector<AddressExclusion> servers,
bool waitForAllExcluded);
// Gets a list of all workers in the cluster (excluding testers)
ACTOR Future<vector<ProcessData>> getWorkers( Database cx );
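
The comment above describes both modes of checkForExcludingServers; the non-blocking mode is what the new `exclude no_wait` path in fdbcli relies on. A small sketch of calling it that way, assuming the usual fdbclient includes (the wrapper itself is hypothetical):

    // Sketch: probe exclusion progress without blocking. Returns true once every
    // excluded address has been evacuated; otherwise reports what is still in flight.
    ACTOR Future<bool> exclusionsComplete(Database db, vector<AddressExclusion> excl) {
        std::set<NetworkAddress> inProgress = wait(checkForExcludingServers(db, excl, false /*waitForAllExcluded*/));
        for (auto& addr : inProgress) {
            printf("Still excluding: %s\n", addr.toString().c_str());
        }
        return inProgress.empty();
    }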

View File

@ -246,10 +246,10 @@ TEST_CASE("/flow/FlatBuffers/LeaderInfo") {
}
in.serializedInfo = rndString;
}
ObjectWriter writer;
ObjectWriter writer(IncludeVersion());
writer.serialize(in);
Standalone<StringRef> copy = writer.toStringRef();
ArenaObjectReader reader(copy.arena(), copy);
ArenaObjectReader reader(copy.arena(), copy, IncludeVersion());
reader.deserialize(out);
ASSERT(in.forward == out.forward);
ASSERT(in.changeID == out.changeID);
@ -268,10 +268,10 @@ TEST_CASE("/flow/FlatBuffers/LeaderInfo") {
ErrorOr<EnsureTable<Optional<LeaderInfo>>> objIn(leaderInfo);
ErrorOr<EnsureTable<Optional<LeaderInfo>>> objOut;
Standalone<StringRef> copy;
ObjectWriter writer;
ObjectWriter writer(IncludeVersion());
writer.serialize(objIn);
copy = writer.toStringRef();
ArenaObjectReader reader(copy.arena(), copy);
ArenaObjectReader reader(copy.arena(), copy, IncludeVersion());
reader.deserialize(objOut);
ASSERT(!objOut.isError());
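
The test changes above show that ObjectWriter and ArenaObjectReader now take an explicit policy for the protocol-version prefix. A generic round-trip helper capturing that pattern might look like the sketch below (assuming T is default-constructible and object-serializable; the helper is illustrative, not from this commit):

    // Sketch: write a value with a version prefix and read it back the same way.
    template <class T>
    T objectRoundTrip(const T& in) {
        ObjectWriter writer(IncludeVersion());        // prefix the serialized bytes with the protocol version
        writer.serialize(in);
        Standalone<StringRef> buf = writer.toStringRef();
        T out;
        ArenaObjectReader reader(buf.arena(), buf, IncludeVersion());  // read expecting that prefix
        reader.deserialize(out);
        return out;
    }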

View File

@ -47,7 +47,7 @@ ThreadFuture<Version> DLTransaction::getReadVersion() {
return toThreadFuture<Version>(api, f, [](FdbCApi::FDBFuture *f, FdbCApi *api) {
int64_t version;
FdbCApi::fdb_error_t error = api->futureGetVersion(f, &version);
FdbCApi::fdb_error_t error = api->futureGetInt64(f, &version);
ASSERT(!error);
return version;
});
@ -195,6 +195,20 @@ Version DLTransaction::getCommittedVersion() {
return version;
}
ThreadFuture<int64_t> DLTransaction::getApproximateSize() {
if(!api->transactionGetApproximateSize) {
return unsupported_operation();
}
FdbCApi::FDBFuture *f = api->transactionGetApproximateSize(tr);
return toThreadFuture<int64_t>(api, f, [](FdbCApi::FDBFuture *f, FdbCApi *api) {
int64_t size = 0;
FdbCApi::fdb_error_t error = api->futureGetInt64(f, &size);
ASSERT(!error);
return size;
});
}
void DLTransaction::setOption(FDBTransactionOptions::Option option, Optional<StringRef> value) {
throwIfError(api->transactionSetOption(tr, option, value.present() ? value.get().begin() : NULL, value.present() ? value.get().size() : 0));
}
@ -287,6 +301,7 @@ void DLApi::init() {
loadClientFunction(&api->transactionAtomicOp, lib, fdbCPath, "fdb_transaction_atomic_op");
loadClientFunction(&api->transactionCommit, lib, fdbCPath, "fdb_transaction_commit");
loadClientFunction(&api->transactionGetCommittedVersion, lib, fdbCPath, "fdb_transaction_get_committed_version");
loadClientFunction(&api->transactionGetApproximateSize, lib, fdbCPath, "fdb_transaction_get_approximate_size", headerVersion >= 620);
loadClientFunction(&api->transactionWatch, lib, fdbCPath, "fdb_transaction_watch");
loadClientFunction(&api->transactionOnError, lib, fdbCPath, "fdb_transaction_on_error");
loadClientFunction(&api->transactionReset, lib, fdbCPath, "fdb_transaction_reset");
@ -294,7 +309,7 @@ void DLApi::init() {
loadClientFunction(&api->transactionAddConflictRange, lib, fdbCPath, "fdb_transaction_add_conflict_range");
loadClientFunction(&api->futureGetDatabase, lib, fdbCPath, "fdb_future_get_database");
loadClientFunction(&api->futureGetVersion, lib, fdbCPath, "fdb_future_get_version");
loadClientFunction(&api->futureGetInt64, lib, fdbCPath, headerVersion >= 620 ? "fdb_future_get_int64" : "fdb_future_get_version");
loadClientFunction(&api->futureGetError, lib, fdbCPath, "fdb_future_get_error");
loadClientFunction(&api->futureGetKey, lib, fdbCPath, "fdb_future_get_key");
loadClientFunction(&api->futureGetValue, lib, fdbCPath, "fdb_future_get_value");
@ -595,6 +610,12 @@ Version MultiVersionTransaction::getCommittedVersion() {
return invalidVersion;
}
ThreadFuture<int64_t> MultiVersionTransaction::getApproximateSize() {
auto tr = getTransaction();
auto f = tr.transaction ? tr.transaction->getApproximateSize() : ThreadFuture<int64_t>(Never());
return abortableFuture(f, tr.onChange);
}
void MultiVersionTransaction::setOption(FDBTransactionOptions::Option option, Optional<StringRef> value) {
auto itr = FDBTransactionOptions::optionInfo.find(option);
if(itr == FDBTransactionOptions::optionInfo.end()) {

View File

@ -84,6 +84,7 @@ struct FdbCApi : public ThreadSafeReferenceCounted<FdbCApi> {
FDBFuture* (*transactionCommit)(FDBTransaction *tr);
fdb_error_t (*transactionGetCommittedVersion)(FDBTransaction *tr, int64_t *outVersion);
FDBFuture* (*transactionGetApproximateSize)(FDBTransaction *tr);
FDBFuture* (*transactionWatch)(FDBTransaction *tr, uint8_t const *keyName, int keyNameLength);
FDBFuture* (*transactionOnError)(FDBTransaction *tr, fdb_error_t error);
void (*transactionReset)(FDBTransaction *tr);
@ -94,7 +95,7 @@ struct FdbCApi : public ThreadSafeReferenceCounted<FdbCApi> {
//Future
fdb_error_t (*futureGetDatabase)(FDBFuture *f, FDBDatabase **outDb);
fdb_error_t (*futureGetVersion)(FDBFuture *f, int64_t *outVersion);
fdb_error_t (*futureGetInt64)(FDBFuture *f, int64_t *outValue);
fdb_error_t (*futureGetError)(FDBFuture *f);
fdb_error_t (*futureGetKey)(FDBFuture *f, uint8_t const **outKey, int *outKeyLength);
fdb_error_t (*futureGetValue)(FDBFuture *f, fdb_bool_t *outPresent, uint8_t const **outValue, int *outValueLength);
@ -116,41 +117,42 @@ public:
DLTransaction(Reference<FdbCApi> api, FdbCApi::FDBTransaction *tr) : api(api), tr(tr) {}
~DLTransaction() { api->transactionDestroy(tr); }
void cancel();
void setVersion(Version v);
ThreadFuture<Version> getReadVersion();
void cancel() override;
void setVersion(Version v) override;
ThreadFuture<Version> getReadVersion() override;
ThreadFuture<Optional<Value>> get(const KeyRef& key, bool snapshot=false);
ThreadFuture<Key> getKey(const KeySelectorRef& key, bool snapshot=false);
ThreadFuture<Standalone<RangeResultRef>> getRange(const KeySelectorRef& begin, const KeySelectorRef& end, int limit, bool snapshot=false, bool reverse=false);
ThreadFuture<Standalone<RangeResultRef>> getRange(const KeySelectorRef& begin, const KeySelectorRef& end, GetRangeLimits limits, bool snapshot=false, bool reverse=false);
ThreadFuture<Standalone<RangeResultRef>> getRange(const KeyRangeRef& keys, int limit, bool snapshot=false, bool reverse=false);
ThreadFuture<Standalone<RangeResultRef>> getRange( const KeyRangeRef& keys, GetRangeLimits limits, bool snapshot=false, bool reverse=false);
ThreadFuture<Standalone<VectorRef<const char*>>> getAddressesForKey(const KeyRef& key);
ThreadFuture<Standalone<StringRef>> getVersionstamp();
ThreadFuture<Optional<Value>> get(const KeyRef& key, bool snapshot=false) override;
ThreadFuture<Key> getKey(const KeySelectorRef& key, bool snapshot=false) override;
ThreadFuture<Standalone<RangeResultRef>> getRange(const KeySelectorRef& begin, const KeySelectorRef& end, int limit, bool snapshot=false, bool reverse=false) override;
ThreadFuture<Standalone<RangeResultRef>> getRange(const KeySelectorRef& begin, const KeySelectorRef& end, GetRangeLimits limits, bool snapshot=false, bool reverse=false) override;
ThreadFuture<Standalone<RangeResultRef>> getRange(const KeyRangeRef& keys, int limit, bool snapshot=false, bool reverse=false) override;
ThreadFuture<Standalone<RangeResultRef>> getRange( const KeyRangeRef& keys, GetRangeLimits limits, bool snapshot=false, bool reverse=false) override;
ThreadFuture<Standalone<VectorRef<const char*>>> getAddressesForKey(const KeyRef& key) override;
ThreadFuture<Standalone<StringRef>> getVersionstamp() override;
void addReadConflictRange(const KeyRangeRef& keys);
void addReadConflictRange(const KeyRangeRef& keys) override;
void atomicOp(const KeyRef& key, const ValueRef& value, uint32_t operationType);
void set(const KeyRef& key, const ValueRef& value);
void clear(const KeyRef& begin, const KeyRef& end);
void clear(const KeyRangeRef& range);
void clear(const KeyRef& key);
void atomicOp(const KeyRef& key, const ValueRef& value, uint32_t operationType) override;
void set(const KeyRef& key, const ValueRef& value) override;
void clear(const KeyRef& begin, const KeyRef& end) override;
void clear(const KeyRangeRef& range) override;
void clear(const KeyRef& key) override;
ThreadFuture<Void> watch(const KeyRef& key);
ThreadFuture<Void> watch(const KeyRef& key) override;
void addWriteConflictRange(const KeyRangeRef& keys);
void addWriteConflictRange(const KeyRangeRef& keys) override;
ThreadFuture<Void> commit();
Version getCommittedVersion();
ThreadFuture<Void> commit() override;
Version getCommittedVersion() override;
ThreadFuture<int64_t> getApproximateSize() override;
void setOption(FDBTransactionOptions::Option option, Optional<StringRef> value=Optional<StringRef>());
void setOption(FDBTransactionOptions::Option option, Optional<StringRef> value=Optional<StringRef>()) override;
ThreadFuture<Void> onError(Error const& e);
void reset();
ThreadFuture<Void> onError(Error const& e) override;
void reset() override;
void addref() { ThreadSafeReferenceCounted<DLTransaction>::addref(); }
void delref() { ThreadSafeReferenceCounted<DLTransaction>::delref(); }
void addref() override { ThreadSafeReferenceCounted<DLTransaction>::addref(); }
void delref() override { ThreadSafeReferenceCounted<DLTransaction>::delref(); }
private:
const Reference<FdbCApi> api;
@ -165,11 +167,11 @@ public:
ThreadFuture<Void> onReady();
Reference<ITransaction> createTransaction();
void setOption(FDBDatabaseOptions::Option option, Optional<StringRef> value = Optional<StringRef>());
Reference<ITransaction> createTransaction() override;
void setOption(FDBDatabaseOptions::Option option, Optional<StringRef> value = Optional<StringRef>()) override;
void addref() { ThreadSafeReferenceCounted<DLDatabase>::addref(); }
void delref() { ThreadSafeReferenceCounted<DLDatabase>::delref(); }
void addref() override { ThreadSafeReferenceCounted<DLDatabase>::addref(); }
void delref() override { ThreadSafeReferenceCounted<DLDatabase>::delref(); }
private:
const Reference<FdbCApi> api;
@ -181,18 +183,18 @@ class DLApi : public IClientApi {
public:
DLApi(std::string fdbCPath);
void selectApiVersion(int apiVersion);
const char* getClientVersion();
void selectApiVersion(int apiVersion) override;
const char* getClientVersion() override;
void setNetworkOption(FDBNetworkOptions::Option option, Optional<StringRef> value = Optional<StringRef>());
void setupNetwork();
void runNetwork();
void stopNetwork();
void setNetworkOption(FDBNetworkOptions::Option option, Optional<StringRef> value = Optional<StringRef>()) override;
void setupNetwork() override;
void runNetwork() override;
void stopNetwork() override;
Reference<IDatabase> createDatabase(const char *clusterFilePath);
Reference<IDatabase> createDatabase(const char *clusterFilePath) override;
Reference<IDatabase> createDatabase609(const char *clusterFilePath); // legacy database creation
void addNetworkThreadCompletionHook(void (*hook)(void*), void *hookParameter);
void addNetworkThreadCompletionHook(void (*hook)(void*), void *hookParameter) override;
private:
const std::string fdbCPath;
@ -212,41 +214,42 @@ class MultiVersionTransaction : public ITransaction, ThreadSafeReferenceCounted<
public:
MultiVersionTransaction(Reference<MultiVersionDatabase> db, UniqueOrderedOptionList<FDBTransactionOptions> defaultOptions);
void cancel();
void setVersion(Version v);
ThreadFuture<Version> getReadVersion();
void cancel() override;
void setVersion(Version v) override;
ThreadFuture<Version> getReadVersion() override;
ThreadFuture<Optional<Value>> get(const KeyRef& key, bool snapshot=false);
ThreadFuture<Key> getKey(const KeySelectorRef& key, bool snapshot=false);
ThreadFuture<Standalone<RangeResultRef>> getRange(const KeySelectorRef& begin, const KeySelectorRef& end, int limit, bool snapshot=false, bool reverse=false);
ThreadFuture<Standalone<RangeResultRef>> getRange(const KeySelectorRef& begin, const KeySelectorRef& end, GetRangeLimits limits, bool snapshot=false, bool reverse=false);
ThreadFuture<Standalone<RangeResultRef>> getRange(const KeyRangeRef& keys, int limit, bool snapshot=false, bool reverse=false);
ThreadFuture<Standalone<RangeResultRef>> getRange( const KeyRangeRef& keys, GetRangeLimits limits, bool snapshot=false, bool reverse=false);
ThreadFuture<Standalone<VectorRef<const char*>>> getAddressesForKey(const KeyRef& key);
ThreadFuture<Standalone<StringRef>> getVersionstamp();
ThreadFuture<Optional<Value>> get(const KeyRef& key, bool snapshot=false) override;
ThreadFuture<Key> getKey(const KeySelectorRef& key, bool snapshot=false) override;
ThreadFuture<Standalone<RangeResultRef>> getRange(const KeySelectorRef& begin, const KeySelectorRef& end, int limit, bool snapshot=false, bool reverse=false) override;
ThreadFuture<Standalone<RangeResultRef>> getRange(const KeySelectorRef& begin, const KeySelectorRef& end, GetRangeLimits limits, bool snapshot=false, bool reverse=false) override;
ThreadFuture<Standalone<RangeResultRef>> getRange(const KeyRangeRef& keys, int limit, bool snapshot=false, bool reverse=false) override;
ThreadFuture<Standalone<RangeResultRef>> getRange( const KeyRangeRef& keys, GetRangeLimits limits, bool snapshot=false, bool reverse=false) override;
ThreadFuture<Standalone<VectorRef<const char*>>> getAddressesForKey(const KeyRef& key) override;
ThreadFuture<Standalone<StringRef>> getVersionstamp() override;
void addReadConflictRange(const KeyRangeRef& keys);
void addReadConflictRange(const KeyRangeRef& keys) override;
void atomicOp(const KeyRef& key, const ValueRef& value, uint32_t operationType);
void set(const KeyRef& key, const ValueRef& value);
void clear(const KeyRef& begin, const KeyRef& end);
void clear(const KeyRangeRef& range);
void clear(const KeyRef& key);
void atomicOp(const KeyRef& key, const ValueRef& value, uint32_t operationType) override;
void set(const KeyRef& key, const ValueRef& value) override;
void clear(const KeyRef& begin, const KeyRef& end) override;
void clear(const KeyRangeRef& range) override;
void clear(const KeyRef& key) override;
ThreadFuture<Void> watch(const KeyRef& key);
ThreadFuture<Void> watch(const KeyRef& key) override;
void addWriteConflictRange(const KeyRangeRef& keys);
void addWriteConflictRange(const KeyRangeRef& keys) override;
ThreadFuture<Void> commit();
Version getCommittedVersion();
ThreadFuture<Void> commit() override;
Version getCommittedVersion() override;
ThreadFuture<int64_t> getApproximateSize() override;
void setOption(FDBTransactionOptions::Option option, Optional<StringRef> value=Optional<StringRef>());
void setOption(FDBTransactionOptions::Option option, Optional<StringRef> value=Optional<StringRef>()) override;
ThreadFuture<Void> onError(Error const& e);
void reset();
ThreadFuture<Void> onError(Error const& e) override;
void reset() override;
void addref() { ThreadSafeReferenceCounted<MultiVersionTransaction>::addref(); }
void delref() { ThreadSafeReferenceCounted<MultiVersionTransaction>::delref(); }
void addref() override { ThreadSafeReferenceCounted<MultiVersionTransaction>::addref(); }
void delref() override { ThreadSafeReferenceCounted<MultiVersionTransaction>::delref(); }
private:
const Reference<MultiVersionDatabase> db;
@ -289,11 +292,11 @@ public:
MultiVersionDatabase(MultiVersionApi *api, std::string clusterFilePath, Reference<IDatabase> db, bool openConnectors=true);
~MultiVersionDatabase();
Reference<ITransaction> createTransaction();
void setOption(FDBDatabaseOptions::Option option, Optional<StringRef> value = Optional<StringRef>());
Reference<ITransaction> createTransaction() override;
void setOption(FDBDatabaseOptions::Option option, Optional<StringRef> value = Optional<StringRef>()) override;
void addref() { ThreadSafeReferenceCounted<MultiVersionDatabase>::addref(); }
void delref() { ThreadSafeReferenceCounted<MultiVersionDatabase>::delref(); }
void addref() override { ThreadSafeReferenceCounted<MultiVersionDatabase>::addref(); }
void delref() override { ThreadSafeReferenceCounted<MultiVersionDatabase>::delref(); }
static Reference<IDatabase> debugCreateFromExistingDatabase(Reference<IDatabase> db);
@ -354,16 +357,16 @@ private:
class MultiVersionApi : public IClientApi {
public:
void selectApiVersion(int apiVersion);
const char* getClientVersion();
void selectApiVersion(int apiVersion) override;
const char* getClientVersion() override;
void setNetworkOption(FDBNetworkOptions::Option option, Optional<StringRef> value = Optional<StringRef>());
void setupNetwork();
void runNetwork();
void stopNetwork();
void addNetworkThreadCompletionHook(void (*hook)(void*), void *hookParameter);
void setNetworkOption(FDBNetworkOptions::Option option, Optional<StringRef> value = Optional<StringRef>()) override;
void setupNetwork() override;
void runNetwork() override;
void stopNetwork() override;
void addNetworkThreadCompletionHook(void (*hook)(void*), void *hookParameter) override;
Reference<IDatabase> createDatabase(const char *clusterFilePath);
Reference<IDatabase> createDatabase(const char *clusterFilePath) override;
static MultiVersionApi* api;
Reference<ClientInfo> getLocalClient();

View File

@ -1871,6 +1871,7 @@ ACTOR Future<Standalone<RangeResultRef>> getRange( Database cx, Reference<Transa
state bool modifiedSelectors = false;
state GetKeyValuesRequest req;
req.isFetchKeys = (info.taskID == TaskPriority::FetchKeys);
req.version = readVersion;
if( reverse && (begin-1).isDefinitelyLess(shard.begin) &&
@ -2824,7 +2825,7 @@ Future<Void> Transaction::commitMutations() {
cx->mutationsPerCommit.addSample(tr.transaction.mutations.size());
cx->bytesPerCommit.addSample(tr.transaction.mutations.expectedSize());
size_t transactionSize = tr.transaction.mutations.expectedSize() + tr.transaction.read_conflict_ranges.expectedSize() + tr.transaction.write_conflict_ranges.expectedSize();
size_t transactionSize = getSize();
if (transactionSize > (uint64_t)FLOW_KNOBS->PACKET_WARNING) {
TraceEvent(!g_network->isSimulated() ? SevWarnAlways : SevWarn, "LargeTransaction")
.suppressFor(1.0)
@ -3191,6 +3192,12 @@ Future<Standalone<StringRef>> Transaction::getVersionstamp() {
return versionstampPromise.getFuture();
}
uint32_t Transaction::getSize() {
auto s = tr.transaction.mutations.expectedSize() + tr.transaction.read_conflict_ranges.expectedSize() +
tr.transaction.write_conflict_ranges.expectedSize();
return s;
}
Future<Void> Transaction::onError( Error const& e ) {
if (e.code() == error_code_success) {
return client_invalid_operation();
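
Transaction::getSize() above is the same sum that commitMutations() now feeds into the large-transaction warning, and per the header change later in this diff it is publicly accessible, so a caller on the native API can check it before committing. A sketch of that check, with the threshold taken from the hunk above and the helper name being hypothetical:

    // Sketch: flag transactions whose serialized footprint (mutations plus
    // read/write conflict ranges) already exceeds the packet-warning knob.
    bool isUncomfortablyLarge(Transaction& tr) {
        return tr.getSize() > (uint64_t)FLOW_KNOBS->PACKET_WARNING;
    }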

View File

@ -67,7 +67,7 @@ struct NetworkOptions {
NetworkOptions()
: localAddress(""), clusterFile(""), traceDirectory(Optional<std::string>()),
traceRollSize(TRACE_DEFAULT_ROLL_SIZE), traceMaxLogsSize(TRACE_DEFAULT_MAX_LOGS_SIZE), traceLogGroup("default"),
traceFormat("xml"), slowTaskProfilingEnabled(false), useObjectSerializer(false) {}
traceFormat("xml"), slowTaskProfilingEnabled(false), useObjectSerializer(true) {}
};
class Database {
@ -287,6 +287,7 @@ public:
Promise<Standalone<StringRef>> versionstampPromise;
uint32_t getSize();
Future<Void> onError( Error const& e );
void flushTrLogsIfEnabled();

View File

@ -990,9 +990,10 @@ public:
if(!ryw->options.readYourWritesDisabled) {
ryw->watchMap[key].push_back(watch);
val = readWithConflictRange( ryw, GetValueReq(key), false );
}
else
} else {
ryw->approximateSize += 2 * key.expectedSize() + 1;
val = ryw->tr.get(key);
}
try {
wait(ryw->resetPromise.getFuture() || success(val) || watch->onChangeTrigger.getFuture());
@ -1045,7 +1046,7 @@ public:
return Void();
}
ryw->writeRangeToNativeTransaction(KeyRangeRef(StringRef(), allKeys.end));
auto conflictRanges = ryw->readConflicts.ranges();
@ -1123,8 +1124,11 @@ public:
}
};
ReadYourWritesTransaction::ReadYourWritesTransaction( Database const& cx ) : cache(&arena), writes(&arena), tr(cx), retries(0), creationTime(now()), commitStarted(false), options(tr), deferredError(cx->deferredError) {
std::copy(cx.getTransactionDefaults().begin(), cx.getTransactionDefaults().end(), std::back_inserter(persistentOptions));
ReadYourWritesTransaction::ReadYourWritesTransaction(Database const& cx)
: cache(&arena), writes(&arena), tr(cx), retries(0), approximateSize(0), creationTime(now()), commitStarted(false),
options(tr), deferredError(cx->deferredError) {
std::copy(cx.getTransactionDefaults().begin(), cx.getTransactionDefaults().end(),
std::back_inserter(persistentOptions));
applyPersistentOptions();
}
@ -1367,6 +1371,7 @@ void ReadYourWritesTransaction::addReadConflictRange( KeyRangeRef const& keys )
}
if(options.readYourWritesDisabled) {
approximateSize += r.expectedSize() + sizeof(KeyRangeRef);
tr.addReadConflictRange(r);
return;
}
@ -1381,6 +1386,7 @@ void ReadYourWritesTransaction::updateConflictMap( KeyRef const& key, WriteMap::
//it.skip( key );
//ASSERT( it.beginKey() <= key && key < it.endKey() );
if( it.is_unmodified_range() || ( it.is_operation() && !it.is_independent() ) ) {
approximateSize += 2 * key.expectedSize() + 1 + sizeof(KeyRangeRef);
readConflicts.insert( singleKeyRange( key, arena ), true );
}
}
@ -1391,13 +1397,15 @@ void ReadYourWritesTransaction::updateConflictMap( KeyRangeRef const& keys, Writ
for(; it.beginKey() < keys.end; ++it ) {
if( it.is_unmodified_range() || ( it.is_operation() && !it.is_independent() ) ) {
KeyRangeRef insert_range = KeyRangeRef( std::max( keys.begin, it.beginKey().toArenaOrRef( arena ) ), std::min( keys.end, it.endKey().toArenaOrRef( arena ) ) );
if( !insert_range.empty() )
if (!insert_range.empty()) {
approximateSize += keys.expectedSize() + sizeof(KeyRangeRef);
readConflicts.insert( insert_range, true );
}
}
}
}
void ReadYourWritesTransaction::writeRangeToNativeTransaction( KeyRangeRef const& keys ) {
void ReadYourWritesTransaction::writeRangeToNativeTransaction(KeyRangeRef const& keys) {
WriteMap::iterator it( &writes );
it.skip(keys.begin);
@ -1410,12 +1418,12 @@ void ReadYourWritesTransaction::writeRangeToNativeTransaction( KeyRangeRef const
clearBegin = std::max(ExtStringRef(keys.begin), it.beginKey());
inClearRange = true;
} else if( !it.is_cleared_range() && inClearRange ) {
tr.clear( KeyRangeRef( clearBegin.toArenaOrRef(arena), it.beginKey().toArenaOrRef(arena) ), false );
tr.clear(KeyRangeRef(clearBegin.toArenaOrRef(arena), it.beginKey().toArenaOrRef(arena)), false);
inClearRange = false;
}
}
if( inClearRange ) {
if (inClearRange) {
tr.clear(KeyRangeRef(clearBegin.toArenaOrRef(arena), keys.end), false);
}
@ -1429,7 +1437,7 @@ void ReadYourWritesTransaction::writeRangeToNativeTransaction( KeyRangeRef const
conflictBegin = std::max(ExtStringRef(keys.begin), it.beginKey());
inConflictRange = true;
} else if( !it.is_conflict_range() && inConflictRange ) {
tr.addWriteConflictRange( KeyRangeRef( conflictBegin.toArenaOrRef(arena), it.beginKey().toArenaOrRef(arena) ) );
tr.addWriteConflictRange(KeyRangeRef(conflictBegin.toArenaOrRef(arena), it.beginKey().toArenaOrRef(arena)));
inConflictRange = false;
}
@ -1440,9 +1448,9 @@ void ReadYourWritesTransaction::writeRangeToNativeTransaction( KeyRangeRef const
switch(op[i].type) {
case MutationRef::SetValue:
if (op[i].value.present()) {
tr.set( it.beginKey().assertRef(), op[i].value.get(), false );
tr.set(it.beginKey().assertRef(), op[i].value.get(), false);
} else {
tr.clear( it.beginKey().assertRef(), false );
tr.clear(it.beginKey().assertRef(), false);
}
break;
case MutationRef::AddValue:
@ -1469,7 +1477,7 @@ void ReadYourWritesTransaction::writeRangeToNativeTransaction( KeyRangeRef const
}
if( inConflictRange ) {
tr.addWriteConflictRange( KeyRangeRef( conflictBegin.toArenaOrRef(arena), keys.end ) );
tr.addWriteConflictRange(KeyRangeRef(conflictBegin.toArenaOrRef(arena), keys.end));
}
}
@ -1574,7 +1582,9 @@ void ReadYourWritesTransaction::atomicOp( const KeyRef& key, const ValueRef& ope
throw client_invalid_operation();
}
if(options.readYourWritesDisabled) {
approximateSize += k.expectedSize() + v.expectedSize() + sizeof(MutationRef) +
(addWriteConflict ? sizeof(KeyRangeRef) + 2 * key.expectedSize() + 1 : 0);
if (options.readYourWritesDisabled) {
return tr.atomicOp(k, v, (MutationRef::Type) operationType, addWriteConflict);
}
@ -1604,7 +1614,9 @@ void ReadYourWritesTransaction::set( const KeyRef& key, const ValueRef& value )
if(key >= getMaxWriteKey())
throw key_outside_legal_range();
if(options.readYourWritesDisabled ) {
approximateSize += key.expectedSize() + value.expectedSize() + sizeof(MutationRef) +
(addWriteConflict ? sizeof(KeyRangeRef) + 2 * key.expectedSize() + 1 : 0);
if (options.readYourWritesDisabled) {
return tr.set(key, value, addWriteConflict);
}
@ -1632,10 +1644,12 @@ void ReadYourWritesTransaction::clear( const KeyRangeRef& range ) {
if(range.begin > maxKey || range.end > maxKey)
throw key_outside_legal_range();
if( options.readYourWritesDisabled ) {
approximateSize += range.expectedSize() + sizeof(MutationRef) +
(addWriteConflict ? sizeof(KeyRangeRef) + range.expectedSize() : 0);
if (options.readYourWritesDisabled) {
return tr.clear(range, addWriteConflict);
}
//There aren't any keys in the database with size larger than KEY_SIZE_LIMIT, so if range contains large keys
//we can translate it to an equivalent one with smaller keys
KeyRef begin = range.begin;
@ -1676,6 +1690,8 @@ void ReadYourWritesTransaction::clear( const KeyRef& key ) {
}
KeyRangeRef r = singleKeyRange( key, arena );
approximateSize += r.expectedSize() + sizeof(KeyRangeRef) +
(addWriteConflict ? sizeof(KeyRangeRef) + r.expectedSize() : 0);
//SOMEDAY: add an optimized single key clear to write map
writes.clear(r, addWriteConflict);
@ -1703,7 +1719,7 @@ Future<Void> ReadYourWritesTransaction::watch(const Key& key) {
return RYWImpl::watch(this, key);
}
void ReadYourWritesTransaction::addWriteConflictRange( KeyRangeRef const& keys ) {
void ReadYourWritesTransaction::addWriteConflictRange(KeyRangeRef const& keys) {
if(checkUsedDuringCommit()) {
throw used_during_commit();
}
@ -1730,6 +1746,7 @@ void ReadYourWritesTransaction::addWriteConflictRange( KeyRangeRef const& keys )
return;
}
approximateSize += r.expectedSize() + sizeof(KeyRangeRef);
if(options.readYourWritesDisabled) {
tr.addWriteConflictRange(r);
return;
@ -1854,6 +1871,7 @@ void ReadYourWritesTransaction::operator=(ReadYourWritesTransaction&& r) BOOST_N
r.resetPromise = Promise<Void>();
deferredError = std::move( r.deferredError );
retries = r.retries;
approximateSize = r.approximateSize;
timeoutActor = r.timeoutActor;
creationTime = r.creationTime;
commitStarted = r.commitStarted;
@ -1870,6 +1888,7 @@ ReadYourWritesTransaction::ReadYourWritesTransaction(ReadYourWritesTransaction&&
arena( std::move(r.arena) ),
reading( std::move(r.reading) ),
retries( r.retries ),
approximateSize(r.approximateSize),
creationTime( r.creationTime ),
deferredError( std::move(r.deferredError) ),
timeoutActor( std::move(r.timeoutActor) ),
@ -1921,6 +1940,7 @@ void ReadYourWritesTransaction::resetRyow() {
readConflicts = CoalescedKeyRefRangeMap<bool>();
watchMap.clear();
reading = AndFuture();
approximateSize = 0;
commitStarted = false;
deferredError = Error();
@ -1941,6 +1961,7 @@ void ReadYourWritesTransaction::cancel() {
void ReadYourWritesTransaction::reset() {
retries = 0;
approximateSize = 0;
creationTime = now();
timeoutActor.cancel();
persistentOptions.clear();
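
The hunks above grow approximateSize alongside every mutation and conflict-range insertion, which is what getApproximateSize() on the read-your-writes layer returns without any network round trip. As an illustrative restatement of the accounting for a single set() (mirroring the expression added above; the sizeof terms depend on the target ABI, so absolute numbers vary):

    // Illustrative only: what one set(key, value) adds to approximateSize when
    // write conflict ranges are on, per the accounting added in this file.
    int64_t costOfSet(StringRef key, StringRef value, bool addWriteConflict) {
        return key.expectedSize() + value.expectedSize() + sizeof(MutationRef) +
               (addWriteConflict ? sizeof(KeyRangeRef) + 2 * key.expectedSize() + 1 : 0);
    }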

View File

@ -98,6 +98,7 @@ public:
Future<Void> commit();
Version getCommittedVersion() { return tr.getCommittedVersion(); }
int64_t getApproximateSize() { return approximateSize; }
Future<Standalone<StringRef>> getVersionstamp();
void setOption( FDBTransactionOptions::Option option, Optional<StringRef> value = Optional<StringRef>() );
@ -140,6 +141,7 @@ private:
Promise<Void> resetPromise;
AndFuture reading;
int retries;
int64_t approximateSize;
Future<Void> timeoutActor;
double creationTime;
bool commitStarted;
@ -149,7 +151,7 @@ private:
void resetTimeout();
void updateConflictMap( KeyRef const& key, WriteMap::iterator& it ); // pre: it.segmentContains(key)
void updateConflictMap( KeyRangeRef const& keys, WriteMap::iterator& it ); // pre: it.segmentContains(keys.begin), keys are already inside this->arena
void writeRangeToNativeTransaction( KeyRangeRef const& keys );
void writeRangeToNativeTransaction(KeyRangeRef const& keys);
void resetRyow(); // doesn't reset the encapsulated transaction, or creation time/retry state
KeyRef getMaxReadKey();

Some files were not shown because too many files have changed in this diff.