merge with master

This commit is contained in:
Xiaoxi Wang 2021-07-16 19:19:51 +00:00
commit 36dca1f927
32 changed files with 583 additions and 144 deletions

View File

@ -0,0 +1,202 @@
/*
* CycleMultiClientIntegrationTest
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2021 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.apple.foundationdb;
import java.util.Arrays;
import java.util.ArrayList;
import java.util.concurrent.ThreadLocalRandom;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import com.apple.foundationdb.tuple.Tuple;
import org.junit.jupiter.api.Assertions;
/**
* Setup: Generating a cycle 0 -> 1 -> 2 -> 3 -> 0, its length is 4
* Process: randomly choose an element, reverse 2nd and 4rd element, considering the chosen one as the 1st element.
* Check: verify no element is lost or added, and they are still a cycle.
*
* This test is to verify the atomicity of transactions.
*/
public class CycleMultiClientIntegrationTest {
public static final MultiClientHelper clientHelper = new MultiClientHelper();
// more write txn than validate txn, as parent thread waits only for validate txn.
private static final int writeTxnCnt = 2000;
private static final int validateTxnCnt = 1000;
private static final int threadPerDB = 5;
private static final int cycleLength = 4;
private static List<String> expected = new ArrayList<>(Arrays.asList("0", "1", "2", "3"));
public static void main(String[] args) throws Exception {
FDB fdb = FDB.selectAPIVersion(710);
setupThreads(fdb);
Collection<Database> dbs = clientHelper.openDatabases(fdb); // the clientHelper will close the databases for us
System.out.println("Starting tests");
setup(dbs);
System.out.println("Start processing and validating");
process(dbs);
check(dbs);
System.out.println("Test finished");
}
private static synchronized void setupThreads(FDB fdb) {
int clientThreadsPerVersion = clientHelper.readClusterFromEnv().length;
fdb.options().setClientThreadsPerVersion(clientThreadsPerVersion);
System.out.printf("thread per version is %d\n", clientThreadsPerVersion);
fdb.options().setExternalClientDirectory("/var/dynamic-conf/lib");
fdb.options().setTraceEnable("/tmp");
fdb.options().setKnob("min_trace_severity=5");
}
private static void setup(Collection<Database> dbs) {
// 0 -> 1 -> 2 -> 3 -> 0
for (int k = 0; k < cycleLength; k++) {
String key = Integer.toString(k);
String value = Integer.toString((k + 1) % cycleLength);
for (Database db : dbs) {
db.run(tr -> {
tr.set(Tuple.from(key).pack(), Tuple.from(value).pack());
return null;
});
}
}
}
private static void process(Collection<Database> dbs) {
for (Database db : dbs) {
for (int i = 0; i < threadPerDB; i++) {
final Thread thread = new Thread(CycleWorkload.create(db));
thread.start();
}
}
}
private static void check(Collection<Database> dbs) throws InterruptedException {
final Map<Thread, CycleChecker> threadsToCheckers = new HashMap<>();
for (Database db : dbs) {
for (int i = 0; i < threadPerDB; i++) {
final CycleChecker checker = new CycleChecker(db);
final Thread thread = new Thread(checker);
thread.start();
threadsToCheckers.put(thread, checker);
}
}
for (Map.Entry<Thread, CycleChecker> entry : threadsToCheckers.entrySet()) {
entry.getKey().join();
final boolean succeed = entry.getValue().succeed();
Assertions.assertTrue(succeed, "Cycle test failed");
}
}
public static class CycleWorkload implements Runnable {
private final Database db;
private CycleWorkload(Database db) {
this.db = db;
}
public static CycleWorkload create(Database db) {
return new CycleWorkload(db);
}
@Override
public void run() {
for (int i = 0; i < writeTxnCnt; i++) {
db.run(tr -> {
final int k = ThreadLocalRandom.current().nextInt(cycleLength);
final String key = Integer.toString(k);
byte[] result1 = tr.get(Tuple.from(key).pack()).join();
String value1 = Tuple.fromBytes(result1).getString(0);
byte[] result2 = tr.get(Tuple.from(value1).pack()).join();
String value2 = Tuple.fromBytes(result2).getString(0);
byte[] result3 = tr.get(Tuple.from(value2).pack()).join();
String value3 = Tuple.fromBytes(result3).getString(0);
byte[] result4 = tr.get(Tuple.from(value3).pack()).join();
tr.set(Tuple.from(key).pack(), Tuple.from(value2).pack());
tr.set(Tuple.from(value2).pack(), Tuple.from(value1).pack());
tr.set(Tuple.from(value1).pack(), Tuple.from(value3).pack());
return null;
});
}
}
}
public static class CycleChecker implements Runnable {
private final Database db;
private boolean succeed;
public CycleChecker(Database db) {
this.db = db;
this.succeed = true;
}
public static CycleChecker create(Database db) {
return new CycleChecker(db);
}
@Override
public void run() {
for (int i = 0; i < validateTxnCnt; i++) {
db.run(tr -> {
final int k = ThreadLocalRandom.current().nextInt(cycleLength);
final String key = Integer.toString(k);
byte[] result1 = tr.get(Tuple.from(key).pack()).join();
String value1 = Tuple.fromBytes(result1).getString(0);
byte[] result2 = tr.get(Tuple.from(value1).pack()).join();
String value2 = Tuple.fromBytes(result2).getString(0);
byte[] result3 = tr.get(Tuple.from(value2).pack()).join();
String value3 = Tuple.fromBytes(result3).getString(0);
byte[] result4 = tr.get(Tuple.from(value3).pack()).join();
String value4 = Tuple.fromBytes(result4).getString(0);
if (!key.equals(value4)) {
succeed = false;
}
List<String> actual = new ArrayList<>(Arrays.asList(value1, value2, value3, value4));
Collections.sort(actual);
if (!expected.equals(actual)) {
succeed = false;
}
return null;
});
}
}
public boolean succeed() {
return succeed;
}
}
}

View File

@ -0,0 +1,143 @@
package com.apple.foundationdb;
import com.apple.foundationdb.tuple.Tuple;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadLocalRandom;
import org.junit.jupiter.api.Assertions;
/**
* Each cluster has a queue, producer writes a key and then send a message to this queue in JVM.
* Consumer would consume the key by checking the existence of the key, if it does not find the key,
* then the test would fail.
*
* This test is to verify the causal consistency of transactions for mutli-threaded client.
*/
public class SidebandMultiThreadClientTest {
public static final MultiClientHelper clientHelper = new MultiClientHelper();
private static final Map<Database, BlockingQueue<String>> db2Queues = new HashMap<>();
private static final int threadPerDB = 5;
private static final int txnCnt = 1000;
public static void main(String[] args) throws Exception {
FDB fdb = FDB.selectAPIVersion(710);
setupThreads(fdb);
Collection<Database> dbs = clientHelper.openDatabases(fdb); // the clientHelper will close the databases for us
for (Database db : dbs) {
db2Queues.put(db, new LinkedBlockingQueue<>());
}
System.out.println("Start processing and validating");
process(dbs);
check(dbs);
System.out.println("Test finished");
}
private static synchronized void setupThreads(FDB fdb) {
int clientThreadsPerVersion = clientHelper.readClusterFromEnv().length;
fdb.options().setClientThreadsPerVersion(clientThreadsPerVersion);
System.out.printf("thread per version is %d\n", clientThreadsPerVersion);
fdb.options().setExternalClientDirectory("/var/dynamic-conf/lib");
fdb.options().setTraceEnable("/tmp");
fdb.options().setKnob("min_trace_severity=5");
}
private static void process(Collection<Database> dbs) {
for (Database db : dbs) {
for (int i = 0; i < threadPerDB; i++) {
final Thread thread = new Thread(Producer.create(db, db2Queues.get(db)));
thread.start();
}
}
}
private static void check(Collection<Database> dbs) throws InterruptedException {
final Map<Thread, Consumer> threads2Consumers = new HashMap<>();
for (Database db : dbs) {
for (int i = 0; i < threadPerDB; i++) {
final Consumer consumer = Consumer.create(db, db2Queues.get(db));
final Thread thread = new Thread(consumer);
thread.start();
threads2Consumers.put(thread, consumer);
}
}
for (Map.Entry<Thread, Consumer> entry : threads2Consumers.entrySet()) {
entry.getKey().join();
final boolean succeed = entry.getValue().succeed;
Assertions.assertTrue(succeed, "Sideband test failed");
}
}
public static class Producer implements Runnable {
private final Database db;
private final BlockingQueue<String> queue;
private Producer(Database db, BlockingQueue<String> queue) {
this.db = db;
this.queue = queue;
}
public static Producer create(Database db, BlockingQueue<String> queue) {
return new Producer(db, queue);
}
@Override
public void run() {
for (int i = 0; i < txnCnt; i++) {
final long suffix = ThreadLocalRandom.current().nextLong();
final String key = String.format("Sideband/Multithread/Test/%d", suffix);
db.run(tr -> {
tr.set(Tuple.from(key).pack(), Tuple.from("bar").pack());
return null;
});
queue.offer(key);
}
}
}
public static class Consumer implements Runnable {
private final Database db;
private final BlockingQueue<String> queue;
private boolean succeed;
private Consumer(Database db, BlockingQueue<String> queue) {
this.db = db;
this.queue = queue;
this.succeed = true;
}
public static Consumer create(Database db, BlockingQueue<String> queue) {
return new Consumer(db, queue);
}
@Override
public void run() {
try {
for (int i = 0; i < txnCnt && succeed; i++) {
final String key = queue.take();
db.run(tr -> {
byte[] result = tr.get(Tuple.from(key).pack()).join();
if (result == null) {
System.out.println("FAILED to get key " + key + " from DB " + db);
succeed = false;
}
if (!succeed) {
return null;
}
String value = Tuple.fromBytes(result).getString(0);
return null;
});
}
} catch (InterruptedException e) {
System.out.println("Get Exception in consumer: " + e);
succeed = false;
}
}
}
}

View File

@ -49,6 +49,8 @@ set(JAVA_INTEGRATION_TESTS
src/integration/com/apple/foundationdb/DirectoryTest.java
src/integration/com/apple/foundationdb/RangeQueryIntegrationTest.java
src/integration/com/apple/foundationdb/BasicMultiClientIntegrationTest.java
src/integration/com/apple/foundationdb/CycleMultiClientIntegrationTest.java
src/integration/com/apple/foundationdb/SidebandMultiThreadClientTest.java
)
# Resources that are used in integration testing, but are not explicitly test files (JUnit rules,

View File

@ -81,5 +81,15 @@ if (NOT WIN32 AND NOT OPEN_FOR_IDE)
COMMAND ${CMAKE_SOURCE_DIR}/bindings/python/tests/fdbcli_tests.py
${CMAKE_BINARY_DIR}/bin/fdbcli
@CLUSTER_FILE@
1
)
add_fdbclient_test(
NAME multi_process_fdbcli_tests
PROCESS_NUMBER 5
TEST_TIMEOUT 120 # The test can take near to 1 minutes sometime, set timeout to 2 minutes to be safe
COMMAND ${CMAKE_SOURCE_DIR}/bindings/python/tests/fdbcli_tests.py
${CMAKE_BINARY_DIR}/bin/fdbcli
@CLUSTER_FILE@
5
)
endif()

View File

@ -332,22 +332,90 @@ def transaction(logger):
output7 = run_fdbcli_command('get', 'key')
assert output7 == "`key': not found"
def get_fdb_process_addresses():
# get all processes' network addresses
output = run_fdbcli_command('kill')
# except the first line, each line is one process
addresses = output.split('\n')[1:]
assert len(addresses) == process_number
return addresses
@enable_logging()
def coordinators(logger):
# we should only have one coordinator for now
output1 = run_fdbcli_command('coordinators')
assert len(output1.split('\n')) > 2
cluster_description = output1.split('\n')[0].split(': ')[-1]
logger.debug("Cluster description: {}".format(cluster_description))
coordinators = output1.split('\n')[1].split(': ')[-1]
# verify the coordinator
coordinator_list = get_value_from_status_json(True, 'client', 'coordinators', 'coordinators')
assert len(coordinator_list) == 1
assert coordinator_list[0]['address'] == coordinators
# verify the cluster description
assert get_value_from_status_json(True, 'cluster', 'connection_string').startswith('{}:'.format(cluster_description))
addresses = get_fdb_process_addresses()
# set all 5 processes as coordinators and update the cluster description
new_cluster_description = 'a_simple_description'
run_fdbcli_command('coordinators', *addresses, 'description={}'.format(new_cluster_description))
# verify now we have 5 coordinators and the description is updated
output2 = run_fdbcli_command('coordinators')
assert output2.split('\n')[0].split(': ')[-1] == new_cluster_description
assert output2.split('\n')[1] == 'Cluster coordinators ({}): {}'.format(5, ','.join(addresses))
# auto change should go back to 1 coordinator
run_fdbcli_command('coordinators', 'auto')
assert len(get_value_from_status_json(True, 'client', 'coordinators', 'coordinators')) == 1
@enable_logging()
def exclude(logger):
# get all processes' network addresses
addresses = get_fdb_process_addresses()
logger.debug("Cluster processes: {}".format(' '.join(addresses)))
# There should be no excluded process for now
no_excluded_process_output = 'There are currently no servers or localities excluded from the database.'
output1 = run_fdbcli_command('exclude')
assert no_excluded_process_output in output1
# randomly pick one and exclude the process
excluded_address = random.choice(addresses)
# sometimes we need to retry the exclude
while True:
logger.debug("Excluding process: {}".format(excluded_address))
error_message = run_fdbcli_command_and_get_error('exclude', excluded_address)
if not error_message:
break
logger.debug("Retry exclude after 1 second")
time.sleep(1)
output2 = run_fdbcli_command('exclude')
# logger.debug(output3)
assert 'There are currently 1 servers or localities being excluded from the database' in output2
assert excluded_address in output2
run_fdbcli_command('include', excluded_address)
# check the include is successful
output4 = run_fdbcli_command('exclude')
assert no_excluded_process_output in output4
if __name__ == '__main__':
# fdbcli_tests.py <path_to_fdbcli_binary> <path_to_fdb_cluster_file>
assert len(sys.argv) == 3, "Please pass arguments: <path_to_fdbcli_binary> <path_to_fdb_cluster_file>"
# fdbcli_tests.py <path_to_fdbcli_binary> <path_to_fdb_cluster_file> <process_number>
assert len(sys.argv) == 4, "Please pass arguments: <path_to_fdbcli_binary> <path_to_fdb_cluster_file> <process_number>"
# shell command template
command_template = [sys.argv[1], '-C', sys.argv[2], '--exec']
# tests for fdbcli commands
# assertions will fail if fdbcli does not work as expected
advanceversion()
cache_range()
consistencycheck()
datadistribution()
kill()
lockAndUnlock()
maintenance()
setclass()
suspend()
transaction()
process_number = int(sys.argv[3])
if process_number == 1:
advanceversion()
cache_range()
consistencycheck()
datadistribution()
kill()
lockAndUnlock()
maintenance()
setclass()
suspend()
transaction()
else:
assert process_number > 1, "Process number should be positive"
coordinators()
exclude()

View File

@ -136,6 +136,7 @@ function(add_fdb_test)
${VALGRIND_OPTION}
${ADD_FDB_TEST_TEST_FILES}
WORKING_DIRECTORY ${PROJECT_BINARY_DIR})
set_tests_properties("${test_name}" PROPERTIES ENVIRONMENT UBSAN_OPTIONS=print_stacktrace=1:halt_on_error=1)
get_filename_component(test_dir_full ${first_file} DIRECTORY)
if(NOT ${test_dir_full} STREQUAL "")
get_filename_component(test_dir ${test_dir_full} NAME)
@ -397,7 +398,7 @@ endfunction()
# Creates a single cluster before running the specified command (usually a ctest test)
function(add_fdbclient_test)
set(options DISABLED ENABLED)
set(oneValueArgs NAME)
set(oneValueArgs NAME PROCESS_NUMBER TEST_TIMEOUT)
set(multiValueArgs COMMAND)
cmake_parse_arguments(T "${options}" "${oneValueArgs}" "${multiValueArgs}" "${ARGN}")
if(OPEN_FOR_IDE)
@ -413,12 +414,27 @@ function(add_fdbclient_test)
message(FATAL_ERROR "COMMAND is a required argument for add_fdbclient_test")
endif()
message(STATUS "Adding Client test ${T_NAME}")
add_test(NAME "${T_NAME}"
if (T_PROCESS_NUMBER)
add_test(NAME "${T_NAME}"
COMMAND ${CMAKE_SOURCE_DIR}/tests/TestRunner/tmp_cluster.py
--build-dir ${CMAKE_BINARY_DIR}
--process-number ${T_PROCESS_NUMBER}
--
${T_COMMAND})
else()
add_test(NAME "${T_NAME}"
COMMAND ${CMAKE_SOURCE_DIR}/tests/TestRunner/tmp_cluster.py
--build-dir ${CMAKE_BINARY_DIR}
--
${T_COMMAND})
set_tests_properties("${T_NAME}" PROPERTIES TIMEOUT 60)
endif()
if (T_TEST_TIMEOUT)
set_tests_properties("${T_NAME}" PROPERTIES TIMEOUT ${T_TEST_TIMEOUT})
else()
# default timeout
set_tests_properties("${T_NAME}" PROPERTIES TIMEOUT 60)
endif()
set_tests_properties("${T_NAME}" PROPERTIES ENVIRONMENT UBSAN_OPTIONS=print_stacktrace=1:halt_on_error=1)
endfunction()
# Creates 3 distinct clusters before running the specified command.

View File

@ -25,11 +25,3 @@
Reference<IConfigTransaction> IConfigTransaction::createTestSimple(ConfigTransactionInterface const& cti) {
return makeReference<SimpleConfigTransaction>(cti);
}
Reference<IConfigTransaction> IConfigTransaction::createSimple(Database const& cx) {
return makeReference<SimpleConfigTransaction>(cx);
}
Reference<IConfigTransaction> IConfigTransaction::createPaxos(Database const& cx) {
return makeReference<PaxosConfigTransaction>(cx);
}

View File

@ -40,8 +40,6 @@ public:
virtual ~IConfigTransaction() = default;
static Reference<IConfigTransaction> createTestSimple(ConfigTransactionInterface const&);
static Reference<IConfigTransaction> createSimple(Database const&);
static Reference<IConfigTransaction> createPaxos(Database const&);
// Not implemented:
void setVersion(Version) override { throw client_invalid_operation(); }

View File

@ -26,33 +26,15 @@
ISingleThreadTransaction* ISingleThreadTransaction::allocateOnForeignThread(Type type) {
if (type == Type::RYW) {
auto tr =
(ReadYourWritesTransaction*)(ReadYourWritesTransaction::operator new(sizeof(ReadYourWritesTransaction)));
tr->preinitializeOnForeignThread();
auto tr = new ReadYourWritesTransaction;
return tr;
} else if (type == Type::SIMPLE_CONFIG) {
auto tr = (SimpleConfigTransaction*)(SimpleConfigTransaction::operator new(sizeof(SimpleConfigTransaction)));
auto tr = new SimpleConfigTransaction;
return tr;
} else if (type == Type::PAXOS_CONFIG) {
auto tr = (PaxosConfigTransaction*)(PaxosConfigTransaction::operator new(sizeof(PaxosConfigTransaction)));
auto tr = new PaxosConfigTransaction;
return tr;
}
ASSERT(false);
return nullptr;
}
void ISingleThreadTransaction::create(ISingleThreadTransaction* tr, Type type, Database db) {
switch (type) {
case Type::RYW:
new (tr) ReadYourWritesTransaction(db);
break;
case Type::SIMPLE_CONFIG:
new (tr) SimpleConfigTransaction(db);
break;
case Type::PAXOS_CONFIG:
new (tr) PaxosConfigTransaction(db);
break;
default:
ASSERT(false);
}
}

View File

@ -45,7 +45,7 @@ public:
};
static ISingleThreadTransaction* allocateOnForeignThread(Type type);
static void create(ISingleThreadTransaction* tr, Type type, Database db);
virtual void setDatabase(Database const&) = 0;
virtual void setVersion(Version v) = 0;
virtual Future<Version> getReadVersion() = 0;

View File

@ -113,7 +113,7 @@ ThreadFuture<RangeResult> DLTransaction::getRange(const KeySelectorRef& begin,
end.offset,
limits.rows,
limits.bytes,
FDBStreamingModes::EXACT,
FDB_STREAMING_MODE_EXACT,
0,
snapshot,
reverse);
@ -207,12 +207,12 @@ ThreadFuture<Standalone<VectorRef<KeyRef>>> DLTransaction::getRangeSplitPoints(c
void DLTransaction::addReadConflictRange(const KeyRangeRef& keys) {
throwIfError(api->transactionAddConflictRange(
tr, keys.begin.begin(), keys.begin.size(), keys.end.begin(), keys.end.size(), FDBConflictRangeTypes::READ));
tr, keys.begin.begin(), keys.begin.size(), keys.end.begin(), keys.end.size(), FDB_CONFLICT_RANGE_TYPE_READ));
}
void DLTransaction::atomicOp(const KeyRef& key, const ValueRef& value, uint32_t operationType) {
api->transactionAtomicOp(
tr, key.begin(), key.size(), value.begin(), value.size(), (FDBMutationTypes::Option)operationType);
tr, key.begin(), key.size(), value.begin(), value.size(), static_cast<FDBMutationType>(operationType));
}
void DLTransaction::set(const KeyRef& key, const ValueRef& value) {
@ -239,7 +239,7 @@ ThreadFuture<Void> DLTransaction::watch(const KeyRef& key) {
void DLTransaction::addWriteConflictRange(const KeyRangeRef& keys) {
throwIfError(api->transactionAddConflictRange(
tr, keys.begin.begin(), keys.begin.size(), keys.end.begin(), keys.end.size(), FDBConflictRangeTypes::WRITE));
tr, keys.begin.begin(), keys.begin.size(), keys.end.begin(), keys.end.size(), FDB_CONFLICT_RANGE_TYPE_WRITE));
}
ThreadFuture<Void> DLTransaction::commit() {
@ -269,8 +269,10 @@ ThreadFuture<int64_t> DLTransaction::getApproximateSize() {
}
void DLTransaction::setOption(FDBTransactionOptions::Option option, Optional<StringRef> value) {
throwIfError(api->transactionSetOption(
tr, option, value.present() ? value.get().begin() : nullptr, value.present() ? value.get().size() : 0));
throwIfError(api->transactionSetOption(tr,
static_cast<FDBTransactionOption>(option),
value.present() ? value.get().begin() : nullptr,
value.present() ? value.get().size() : 0));
}
ThreadFuture<Void> DLTransaction::onError(Error const& e) {
@ -309,8 +311,10 @@ Reference<ITransaction> DLDatabase::createTransaction() {
}
void DLDatabase::setOption(FDBDatabaseOptions::Option option, Optional<StringRef> value) {
throwIfError(api->databaseSetOption(
db, option, value.present() ? value.get().begin() : nullptr, value.present() ? value.get().size() : 0));
throwIfError(api->databaseSetOption(db,
static_cast<FDBDatabaseOption>(option),
value.present() ? value.get().begin() : nullptr,
value.present() ? value.get().size() : 0));
}
ThreadFuture<int64_t> DLDatabase::rebootWorker(const StringRef& address, bool check, int duration) {
@ -504,7 +508,7 @@ void DLApi::selectApiVersion(int apiVersion) {
init();
throwIfError(api->selectApiVersion(apiVersion, headerVersion));
throwIfError(api->setNetworkOption(FDBNetworkOptions::EXTERNAL_CLIENT, nullptr, 0));
throwIfError(api->setNetworkOption(static_cast<FDBNetworkOption>(FDBNetworkOptions::EXTERNAL_CLIENT), nullptr, 0));
}
const char* DLApi::getClientVersion() {
@ -516,8 +520,9 @@ const char* DLApi::getClientVersion() {
}
void DLApi::setNetworkOption(FDBNetworkOptions::Option option, Optional<StringRef> value) {
throwIfError(api->setNetworkOption(
option, value.present() ? value.get().begin() : nullptr, value.present() ? value.get().size() : 0));
throwIfError(api->setNetworkOption(static_cast<FDBNetworkOption>(option),
value.present() ? value.get().begin() : nullptr,
value.present() ? value.get().size() : 0));
}
void DLApi::setupNetwork() {

View File

@ -22,6 +22,7 @@
#define FDBCLIENT_MULTIVERSIONTRANSACTION_H
#pragma once
#include "bindings/c/foundationdb/fdb_c_options.g.h"
#include "fdbclient/FDBOptions.g.h"
#include "fdbclient/FDBTypes.h"
#include "fdbclient/IClientApi.h"
@ -31,10 +32,10 @@
// FdbCApi is used as a wrapper around the FoundationDB C API that gets loaded from an external client library.
// All of the required functions loaded from that external library are stored in function pointers in this struct.
struct FdbCApi : public ThreadSafeReferenceCounted<FdbCApi> {
typedef struct future FDBFuture;
typedef struct cluster FDBCluster;
typedef struct database FDBDatabase;
typedef struct transaction FDBTransaction;
typedef struct FDB_future FDBFuture;
typedef struct FDB_cluster FDBCluster;
typedef struct FDB_database FDBDatabase;
typedef struct FDB_transaction FDBTransaction;
#pragma pack(push, 4)
typedef struct key {
@ -57,16 +58,16 @@ struct FdbCApi : public ThreadSafeReferenceCounted<FdbCApi> {
// Network
fdb_error_t (*selectApiVersion)(int runtimeVersion, int headerVersion);
const char* (*getClientVersion)();
fdb_error_t (*setNetworkOption)(FDBNetworkOptions::Option option, uint8_t const* value, int valueLength);
fdb_error_t (*setNetworkOption)(FDBNetworkOption option, uint8_t const* value, int valueLength);
fdb_error_t (*setupNetwork)();
fdb_error_t (*runNetwork)();
fdb_error_t (*stopNetwork)();
fdb_error_t* (*createDatabase)(const char* clusterFilePath, FDBDatabase** db);
fdb_error_t (*createDatabase)(const char* clusterFilePath, FDBDatabase** db);
// Database
fdb_error_t (*databaseCreateTransaction)(FDBDatabase* database, FDBTransaction** tr);
fdb_error_t (*databaseSetOption)(FDBDatabase* database,
FDBDatabaseOptions::Option option,
FDBDatabaseOption option,
uint8_t const* value,
int valueLength);
void (*databaseDestroy)(FDBDatabase* database);
@ -86,7 +87,7 @@ struct FdbCApi : public ThreadSafeReferenceCounted<FdbCApi> {
// Transaction
fdb_error_t (*transactionSetOption)(FDBTransaction* tr,
FDBTransactionOptions::Option option,
FDBTransactionOption option,
uint8_t const* value,
int valueLength);
void (*transactionDestroy)(FDBTransaction* tr);
@ -113,7 +114,7 @@ struct FdbCApi : public ThreadSafeReferenceCounted<FdbCApi> {
int endOffset,
int limit,
int targetBytes,
FDBStreamingModes::Option mode,
FDBStreamingMode mode,
int iteration,
fdb_bool_t snapshot,
fdb_bool_t reverse);
@ -135,7 +136,7 @@ struct FdbCApi : public ThreadSafeReferenceCounted<FdbCApi> {
int keyNameLength,
uint8_t const* param,
int paramLength,
FDBMutationTypes::Option operationType);
FDBMutationType operationType);
FDBFuture* (*transactionGetEstimatedRangeSizeBytes)(FDBTransaction* tr,
uint8_t const* begin_key_name,
@ -163,7 +164,7 @@ struct FdbCApi : public ThreadSafeReferenceCounted<FdbCApi> {
int beginKeyNameLength,
uint8_t const* endKeyName,
int endKeyNameLength,
FDBConflictRangeTypes::Option);
FDBConflictRangeType);
// Future
fdb_error_t (*futureGetDatabase)(FDBFuture* f, FDBDatabase** outDb);

View File

@ -244,8 +244,6 @@ public:
explicit Transaction(Database const& cx);
~Transaction();
void preinitializeOnForeignThread() { committedVersion = invalidVersion; }
void setVersion(Version v);
Future<Version> getReadVersion() { return getReadVersion(0); }
Future<Version> getRawReadVersion();
@ -421,7 +419,7 @@ private:
Database cx;
double backoff;
Version committedVersion;
Version committedVersion{ invalidVersion };
CommitTransactionRequest tr;
Future<Version> readVersion;
Promise<Optional<Value>> metadataVersion;

View File

@ -122,9 +122,14 @@ void PaxosConfigTransaction::checkDeferredError() const {
ASSERT(false);
}
PaxosConfigTransaction::PaxosConfigTransaction(Database const& cx) {
PaxosConfigTransaction::PaxosConfigTransaction() {
// TODO: Implement
ASSERT(false);
}
PaxosConfigTransaction::~PaxosConfigTransaction() = default;
void PaxosConfigTransaction::setDatabase(Database const& cx) {
// TODO: Implement
ASSERT(false);
}

View File

@ -33,8 +33,9 @@ class PaxosConfigTransaction final : public IConfigTransaction, public FastAlloc
PaxosConfigTransactionImpl& impl() { return *_impl; }
public:
PaxosConfigTransaction(Database const&);
PaxosConfigTransaction();
~PaxosConfigTransaction();
void setDatabase(Database const&) override;
Future<Version> getReadVersion() override;
Optional<Version> getCachedReadVersion() const override;

View File

@ -1293,6 +1293,10 @@ ReadYourWritesTransaction::ReadYourWritesTransaction(Database const& cx)
applyPersistentOptions();
}
void ReadYourWritesTransaction::setDatabase(Database const& cx) {
*this = ReadYourWritesTransaction(cx);
}
ACTOR Future<Void> timebomb(double endTime, Promise<Void> resetPromise) {
while (now() < endTime) {
wait(delayUntil(std::min(endTime + 0.0001, now() + CLIENT_KNOBS->TRANSACTION_TIMEOUT_DELAY_INTERVAL)));
@ -1735,10 +1739,6 @@ void ReadYourWritesTransaction::getWriteConflicts(KeyRangeMap<bool>* result) {
}
}
void ReadYourWritesTransaction::preinitializeOnForeignThread() {
tr.preinitializeOnForeignThread();
}
void ReadYourWritesTransaction::setTransactionID(uint64_t id) {
tr.setTransactionID(id);
}

View File

@ -68,6 +68,7 @@ public:
explicit ReadYourWritesTransaction(Database const& cx);
~ReadYourWritesTransaction();
void setDatabase(Database const&) override;
void setVersion(Version v) override { tr.setVersion(v); }
Future<Version> getReadVersion() override;
Optional<Version> getCachedReadVersion() const override { return tr.getCachedReadVersion(); }
@ -153,8 +154,6 @@ public:
void getWriteConflicts(KeyRangeMap<bool>* result) override;
void preinitializeOnForeignThread();
Database getDatabase() const { return tr.getDatabase(); }
const TransactionInfo& getTransactionInfo() const { return tr.info; }

View File

@ -290,10 +290,13 @@ void SimpleConfigTransaction::checkDeferredError() const {
impl().checkDeferredError(deferredError);
}
SimpleConfigTransaction::SimpleConfigTransaction(Database const& cx)
: _impl(std::make_unique<SimpleConfigTransactionImpl>(cx)) {}
void SimpleConfigTransaction::setDatabase(Database const& cx) {
_impl = std::make_unique<SimpleConfigTransactionImpl>(cx);
}
SimpleConfigTransaction::SimpleConfigTransaction(ConfigTransactionInterface const& cti)
: _impl(std::make_unique<SimpleConfigTransactionImpl>(cti)) {}
SimpleConfigTransaction::SimpleConfigTransaction() = default;
SimpleConfigTransaction::~SimpleConfigTransaction() = default;

View File

@ -43,6 +43,8 @@ class SimpleConfigTransaction final : public IConfigTransaction, public FastAllo
public:
SimpleConfigTransaction(ConfigTransactionInterface const&);
SimpleConfigTransaction(Database const&);
SimpleConfigTransaction();
void setDatabase(Database const&) override;
~SimpleConfigTransaction();
Future<Version> getReadVersion() override;
Optional<Version> getCachedReadVersion() const override;

View File

@ -149,9 +149,9 @@ ThreadSafeTransaction::ThreadSafeTransaction(DatabaseContext* cx, ISingleThreadT
auto tr = this->tr = ISingleThreadTransaction::allocateOnForeignThread(type);
// No deferred error -- if the construction of the RYW transaction fails, we have no where to put it
onMainThreadVoid(
[tr, type, cx]() {
[tr, cx]() {
cx->addref();
ISingleThreadTransaction::create(tr, type, Database(cx));
tr->setDatabase(Database(cx));
},
nullptr);
}

View File

@ -17,6 +17,7 @@
#include "../allocators.h"
#include "swap.h"
#include <cstddef>
#if defined(__clang__)
RAPIDJSON_DIAG_PUSH
@ -106,7 +107,7 @@ public:
template <typename T>
RAPIDJSON_FORCEINLINE void Reserve(size_t count = 1) {
// Expand the stack if needed
if (RAPIDJSON_UNLIKELY(stackTop_ + sizeof(T) * count > stackEnd_))
if (RAPIDJSON_UNLIKELY(static_cast<std::ptrdiff_t>(sizeof(T) * count) > (stackEnd_ - stackTop_)))
Expand<T>(count);
}
@ -118,7 +119,7 @@ public:
template <typename T>
RAPIDJSON_FORCEINLINE T* PushUnsafe(size_t count = 1) {
RAPIDJSON_ASSERT(stackTop_ + sizeof(T) * count <= stackEnd_);
RAPIDJSON_ASSERT(static_cast<std::ptrdiff_t>(sizeof(T) * count) <= (stackEnd_ - stackTop_));
T* ret = reinterpret_cast<T*>(stackTop_);
stackTop_ += sizeof(T) * count;
return ret;

View File

@ -52,7 +52,7 @@ namespace vexillographer
{
string parameterComment = "";
if (o.scope.ToString().EndsWith("Option"))
parameterComment = String.Format("{0}/* {1} */\n", indent, "Parameter: " + o.getParameterComment());
parameterComment = String.Format("{0}/* {1} {2}*/\n", indent, "Parameter: " + o.getParameterComment(), o.hidden ? "This is a hidden parameter and should not be used directly by applications." : "");
return String.Format("{0}/* {2} */\n{5}{0}{1}{3}={4}", indent, prefix, o.comment, o.name.ToUpper(), o.code, parameterComment);
}
@ -64,7 +64,7 @@ namespace vexillographer
options = new Option[] { new Option{ scope = scope,
comment = "This option is only a placeholder for C compatibility and should not be used",
code = -1, name = "DUMMY_DO_NOT_USE", paramDesc = null } };
outFile.WriteLine(string.Join(",\n\n", options.Where(f => !f.hidden).Select(f => getCLine(f, " ", prefix)).ToArray()));
outFile.WriteLine(string.Join(",\n\n", options.Select(f => getCLine(f, " ", prefix)).ToArray()));
outFile.WriteLine("}} FDB{0};", scope.ToString());
outFile.WriteLine();
}

View File

@ -137,6 +137,8 @@ int severity_to_priority(Severity severity) {
}
}
typedef std::string ProcessID;
bool daemonize = false;
std::string logGroup = "default";
@ -390,11 +392,11 @@ public:
int pipes[2][2];
Command() : argv(nullptr) {}
Command(const CSimpleIni& ini, std::string _section, uint64_t id, fdb_fd_set fds, int* maxfd)
Command(const CSimpleIni& ini, std::string _section, ProcessID id, fdb_fd_set fds, int* maxfd)
: section(_section), argv(nullptr), fork_retry_time(-1), quiet(false), delete_envvars(nullptr), fds(fds),
deconfigured(false), kill_on_configuration_change(true) {
char _ssection[strlen(section.c_str()) + 22];
snprintf(_ssection, strlen(section.c_str()) + 22, "%s.%" PRIu64, section.c_str(), id);
snprintf(_ssection, strlen(section.c_str()) + 22, "%s", id.c_str());
ssection = _ssection;
for (auto p : pipes) {
@ -593,9 +595,9 @@ public:
}
};
std::unordered_map<uint64_t, std::unique_ptr<Command>> id_command;
std::unordered_map<pid_t, uint64_t> pid_id;
std::unordered_map<uint64_t, pid_t> id_pid;
std::unordered_map<ProcessID, std::unique_ptr<Command>> id_command;
std::unordered_map<pid_t, ProcessID> pid_id;
std::unordered_map<ProcessID, pid_t> id_pid;
enum { OPT_CONFFILE, OPT_LOCKFILE, OPT_LOGGROUP, OPT_DAEMONIZE, OPT_HELP };
@ -608,7 +610,7 @@ CSimpleOpt::SOption g_rgOptions[] = { { OPT_CONFFILE, "--conffile", SO_REQ_SEP }
{ OPT_HELP, "--help", SO_NONE },
SO_END_OF_OPTIONS };
void start_process(Command* cmd, uint64_t id, uid_t uid, gid_t gid, int delay, sigset_t* mask) {
void start_process(Command* cmd, ProcessID id, uid_t uid, gid_t gid, int delay, sigset_t* mask) {
if (!cmd->argv)
return;
@ -758,7 +760,7 @@ bool argv_equal(const char** a1, const char** a2) {
return true;
}
void kill_process(uint64_t id, bool wait = true) {
void kill_process(ProcessID id, bool wait = true) {
pid_t pid = id_pid[id];
log_msg(SevInfo, "Killing process %d\n", pid);
@ -815,7 +817,7 @@ void load_conf(const char* confpath, uid_t& uid, gid_t& gid, sigset_t* mask, fdb
/* Any change to uid or gid requires the process to be restarted to take effect */
if (uid != _uid || gid != _gid) {
std::vector<uint64_t> kill_ids;
std::vector<ProcessID> kill_ids;
for (auto i : id_pid) {
if (id_command[i.first]->kill_on_configuration_change) {
kill_ids.push_back(i.first);
@ -831,8 +833,8 @@ void load_conf(const char* confpath, uid_t& uid, gid_t& gid, sigset_t* mask, fdb
gid = _gid;
}
std::list<uint64_t> kill_ids;
std::list<std::pair<uint64_t, Command*>> start_ids;
std::list<ProcessID> kill_ids;
std::list<std::pair<ProcessID, Command*>> start_ids;
for (auto i : id_pid) {
if (!loadedConf || ini.GetSectionSize(id_command[i.first]->ssection.c_str()) == -1) {
@ -881,31 +883,24 @@ void load_conf(const char* confpath, uid_t& uid, gid_t& gid, sigset_t* mask, fdb
ini.GetAllSections(sections);
for (auto i : sections) {
if (auto dot = strrchr(i.pItem, '.')) {
char* strtol_end;
ProcessID id = i.pItem;
if (!id_pid.count(id)) {
/* Found something we haven't yet started */
Command* cmd;
uint64_t id = strtoull(dot + 1, &strtol_end, 10);
auto itr = id_command.find(id);
if (itr != id_command.end()) {
cmd = itr->second.get();
} else {
std::string section(i.pItem, dot - i.pItem);
auto p = std::make_unique<Command>(ini, section, id, rfds, maxfd);
cmd = p.get();
id_command[id] = std::move(p);
}
if (*strtol_end != '\0' || !(id > 0)) {
log_msg(SevError, "Found bogus id in %s\n", i.pItem);
} else {
if (!id_pid.count(id)) {
/* Found something we haven't yet started */
Command* cmd;
auto itr = id_command.find(id);
if (itr != id_command.end()) {
cmd = itr->second.get();
} else {
std::string section(i.pItem, dot - i.pItem);
auto p = std::make_unique<Command>(ini, section, id, rfds, maxfd);
cmd = p.get();
id_command[id] = std::move(p);
}
if (cmd->fork_retry_time <= timer()) {
log_msg(SevInfo, "Starting %s\n", i.pItem);
start_process(cmd, id, uid, gid, 0, mask);
}
if (cmd->fork_retry_time <= timer()) {
log_msg(SevInfo, "Starting %s\n", i.pItem);
start_process(cmd, id, uid, gid, 0, mask);
}
}
}
@ -1724,7 +1719,7 @@ int main(int argc, char** argv) {
break;
}
uint64_t id = pid_id[pid];
ProcessID id = pid_id[pid];
Command* cmd = id_command[id].get();
pid_id.erase(pid);

View File

@ -919,11 +919,11 @@ ACTOR static void deliver(TransportData* self,
TaskPriority priority,
ArenaReader reader,
bool inReadSocket) {
// We want to run the task at the right priority. If the priority
// is higher than the current priority (which is ReadSocket) we
// can just upgrade. Otherwise we'll context switch so that we
// don't block other tasks that might run with a higher priority.
if (priority < TaskPriority::ReadSocket || !inReadSocket) {
// We want to run the task at the right priority. If the priority is higher than the current priority (which is
// ReadSocket) we can just upgrade. Otherwise we'll context switch so that we don't block other tasks that might run
// with a higher priority. ReplyPromiseStream needs to guarentee that messages are recieved in the order they were
// sent, so even in the case of local delivery those messages need to skip this delay.
if (priority < TaskPriority::ReadSocket || (priority != TaskPriority::NoDeliverDelay && !inReadSocket)) {
wait(delay(0, priority));
} else {
g_network->setCurrentTask(priority);

View File

@ -404,13 +404,13 @@ struct NetNotifiedQueueWithAcknowledgements final : NotifiedQueue<T>,
// Notify the server that a client is not using this ReplyPromiseStream anymore
FlowTransport::transport().sendUnreliable(
SerializeSource<ErrorOr<AcknowledgementReply>>(operation_obsolete()),
acknowledgements.getEndpoint(TaskPriority::ReadSocket),
acknowledgements.getEndpoint(TaskPriority::NoDeliverDelay),
false);
}
if (isRemoteEndpoint() && !sentError && !acknowledgements.failures.isReady()) {
// The ReplyPromiseStream was cancelled before sending an error, so the storage server must have died
FlowTransport::transport().sendUnreliable(SerializeSource<ErrorOr<EnsureTable<T>>>(broken_promise()),
getEndpoint(TaskPriority::ReadSocket),
getEndpoint(TaskPriority::NoDeliverDelay),
false);
}
}
@ -421,6 +421,9 @@ struct NetNotifiedQueueWithAcknowledgements final : NotifiedQueue<T>,
template <class T>
class ReplyPromiseStream {
public:
// The endpoints of a ReplyPromiseStream must be initialized at Task::NoDeliverDelay, because a
// delay(0) in FlowTransport deliver can cause out of order delivery.
// stream.send( request )
// Unreliable at most once delivery: Delivers request unless there is a connection failure (zero or one times)
@ -428,7 +431,7 @@ public:
void send(U&& value) const {
if (queue->isRemoteEndpoint()) {
if (!queue->acknowledgements.getRawEndpoint().isValid()) {
value.acknowledgeToken = queue->acknowledgements.getEndpoint(TaskPriority::ReadSocket).token;
value.acknowledgeToken = queue->acknowledgements.getEndpoint(TaskPriority::NoDeliverDelay).token;
}
queue->acknowledgements.bytesSent += value.expectedSize();
FlowTransport::transport().sendUnreliable(
@ -489,9 +492,9 @@ public:
errors->delPromiseRef();
}
// The endpoints of a ReplyPromiseStream must be initialized at Task::ReadSocket, because with lower priorities a
// The endpoints of a ReplyPromiseStream must be initialized at Task::NoDeliverDelay, because with lower priorities a
// delay(0) in FlowTransport deliver can cause out of order delivery.
const Endpoint& getEndpoint() const { return queue->getEndpoint(TaskPriority::ReadSocket); }
const Endpoint& getEndpoint() const { return queue->getEndpoint(TaskPriority::NoDeliverDelay); }
bool operator==(const ReplyPromiseStream<T>& rhs) const { return queue == rhs.queue; }
bool isEmpty() const { return !queue->isReady(); }

View File

@ -1529,11 +1529,12 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
}
}
state UID dbgid = self->dbgid;
state Future<Void> maxGetPoppedDuration = delay(SERVER_KNOBS->TXS_POPPED_MAX_DELAY);
wait(waitForAll(poppedReady) || maxGetPoppedDuration);
if (maxGetPoppedDuration.isReady()) {
TraceEvent(SevWarnAlways, "PoppedTxsNotReady", self->dbgid);
TraceEvent(SevWarnAlways, "PoppedTxsNotReady", dbgid);
}
Version maxPopped = 1;

View File

@ -3891,7 +3891,7 @@ ACTOR Future<Void> update(StorageServer* data, bool* pReceivedUpdate) {
if (ver != invalidVersion && ver > data->version.get()) {
// TODO(alexmiller): Update to version tracking.
DEBUG_KEY_RANGE("SSUpdate", ver, KeyRangeRef());
// DEBUG_KEY_RANGE("SSUpdate", ver, KeyRangeRef());
data->mutableData().createNewVersion(ver);
if (data->otherError.getFuture().isReady())
@ -4179,7 +4179,7 @@ bool StorageServerDisk::makeVersionMutationsDurable(Version& prevStorageVersion,
VerUpdateRef const& v = u->second;
ASSERT(v.version > prevStorageVersion && v.version <= newStorageVersion);
// TODO(alexmiller): Update to version tracking.
DEBUG_KEY_RANGE("makeVersionMutationsDurable", v.version, KeyRangeRef());
// DEBUG_KEY_RANGE("makeVersionMutationsDurable", v.version, KeyRangeRef());
writeMutations(v.mutations, v.version, "makeVersionDurable");
for (const auto& m : v.mutations)
bytesLeft -= mvccStorageBytes(m);

View File

@ -45,7 +45,7 @@ static HistogramRegistry* globalHistograms = nullptr;
#pragma region HistogramRegistry
HistogramRegistry& GetHistogramRegistry() {
ISimulator::ProcessInfo* info = g_simulator.getCurrentProcess();
ISimulator::ProcessInfo* info = g_network && g_network->isSimulated() ? g_simulator.getCurrentProcess() : nullptr;
if (info) {
// in simulator; scope histograms to simulated process

View File

@ -45,6 +45,7 @@ enum class TaskPriority {
WriteSocket = 10000,
PollEIO = 9900,
DiskIOComplete = 9150,
NoDeliverDelay = 9100,
LoadBalancedEndpoint = 9000,
ReadSocket = 9000,
AcceptSocket = 8950,

View File

@ -275,10 +275,12 @@ if(WITH_PYTHON)
NAME multiversion_client/unit_tests
COMMAND $<TARGET_FILE:fdbserver> -r unittests -f /fdbclient/multiversionclient/
)
set_tests_properties("multiversion_client/unit_tests" PROPERTIES ENVIRONMENT UBSAN_OPTIONS=print_stacktrace=1:halt_on_error=1)
add_test(
NAME threadsafe_threadfuture_to_future/unit_tests
COMMAND $<TARGET_FILE:fdbserver> -r unittests -f /flow/safeThreadFutureToFuture/
)
set_tests_properties("threadsafe_threadfuture_to_future/unit_tests" PROPERTIES ENVIRONMENT UBSAN_OPTIONS=print_stacktrace=1:halt_on_error=1)
endif()
verify_testing()

View File

@ -38,7 +38,7 @@ cluster_file = {etcdir}/fdb.cluster
command = {fdbserver_bin}
public_address = auto:$ID
listen_address = public
datadir = {datadir}
datadir = {datadir}/$ID
logdir = {logdir}
# logsize = 10MiB
# maxlogssize = 100MiB
@ -53,13 +53,13 @@ logdir = {logdir}
## An individual fdbserver process with id 4000
## Parameters set here override defaults from the [fdbserver] section
[fdbserver.{server_port}]
"""
"""
valid_letters_for_secret = string.ascii_letters + string.digits
def __init__(self, basedir: str, fdbserver_binary: str, fdbmonitor_binary: str,
fdbcli_binary: str, create_config=True, port=None, ip_address=None):
fdbcli_binary: str, process_number: int, create_config=True, port=None, ip_address=None):
self.basedir = Path(basedir)
self.fdbserver_binary = Path(fdbserver_binary)
self.fdbmonitor_binary = Path(fdbmonitor_binary)
@ -93,9 +93,16 @@ logdir = {logdir}
etcdir=self.etc,
fdbserver_bin=self.fdbserver_binary,
datadir=self.data,
logdir=self.log,
server_port=self.port
logdir=self.log
))
# By default, the cluster only has one process
# If a port number is given and process_number > 1, we will use subsequent numbers
# E.g., port = 4000, process_number = 5
# Then 4000,4001,4002,4003,4004 will be used as ports
# If port number is not given, we will randomly pick free ports
for index, _ in enumerate(range(process_number)):
f.write('[fdbserver.{server_port}]\n'.format(server_port=self.port))
self.port = get_free_port() if port is None else str(int(self.port) + 1)
def __enter__(self):
assert not self.running, "Can't start a server that is already running"

View File

@ -11,7 +11,7 @@ from random import choice
from pathlib import Path
class TempCluster:
def __init__(self, build_dir: str,port: str = None):
def __init__(self, build_dir: str, process_number: int = 1, port: str = None):
self.build_dir = Path(build_dir).resolve()
assert self.build_dir.exists(), "{} does not exist".format(build_dir)
assert self.build_dir.is_dir(), "{} is not a directory".format(build_dir)
@ -23,6 +23,7 @@ class TempCluster:
self.build_dir.joinpath('bin', 'fdbserver'),
self.build_dir.joinpath('bin', 'fdbmonitor'),
self.build_dir.joinpath('bin', 'fdbcli'),
process_number,
port = port)
self.log = self.cluster.log
self.etc = self.cluster.etc
@ -63,9 +64,10 @@ if __name__ == '__main__':
""")
parser.add_argument('--build-dir', '-b', metavar='BUILD_DIRECTORY', help='FDB build directory', required=True)
parser.add_argument('cmd', metavar="COMMAND", nargs="+", help="The command to run")
parser.add_argument('--process-number', '-p', help="Number of fdb processes running", type=int, default=1)
args = parser.parse_args()
errcode = 1
with TempCluster(args.build_dir) as cluster:
with TempCluster(args.build_dir, args.process_number) as cluster:
print("log-dir: {}".format(cluster.log))
print("etc-dir: {}".format(cluster.etc))
print("data-dir: {}".format(cluster.data))