diff --git a/bindings/java/src/integration/com/apple/foundationdb/CycleMultiClientIntegrationTest.java b/bindings/java/src/integration/com/apple/foundationdb/CycleMultiClientIntegrationTest.java
new file mode 100644
index 0000000000..6c8add0d7c
--- /dev/null
+++ b/bindings/java/src/integration/com/apple/foundationdb/CycleMultiClientIntegrationTest.java
@@ -0,0 +1,222 @@
+/*
+ * CycleMultiClientIntegrationTest
+ *
+ * This source file is part of the FoundationDB open source project
+ *
+ * Copyright 2013-2021 Apple Inc. and the FoundationDB project authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.apple.foundationdb;
+
+import java.util.Arrays;
+import java.util.ArrayList;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import com.apple.foundationdb.tuple.Tuple;
+
+import org.junit.jupiter.api.Assertions;
+
+/**
+ * Setup: Generating a cycle 0 -> 1 -> 2 -> 3 -> 0, its length is 4
+ * Process: randomly choose an element, swap the 2nd and 3rd elements, considering the chosen one as the 1st element.
+ * Check: verify no element is lost or added, and they are still a cycle.
+ *
+ * This test is to verify the atomicity of transactions.
+ */
+public class CycleMultiClientIntegrationTest {
+  public static final MultiClientHelper clientHelper = new MultiClientHelper();
+
+  // more write txn than validate txn, as parent thread waits only for validate txn.
+  private static final int writeTxnCnt = 2000;
+  private static final int validateTxnCnt = 1000;
+  private static final int threadPerDB = 5;
+
+  private static final int cycleLength = 4;
+  // Node labels in sorted order; a checker pass must visit exactly this set.
+  private static final List<String> expected =
+      Collections.unmodifiableList(Arrays.asList("0", "1", "2", "3"));
+
+  /**
+   * Seeds the cycle, starts the writer threads, then runs the checker threads and asserts on
+   * their results. Only the checker threads are joined here.
+   */
+  public static void main(String[] args) throws Exception {
+    FDB fdb = FDB.selectAPIVersion(710);
+    setupThreads(fdb);
+    Collection<Database> dbs = clientHelper.openDatabases(fdb); // the clientHelper will close the databases for us
+    System.out.println("Starting tests");
+    setup(dbs);
+    System.out.println("Start processing and validating");
+    process(dbs);
+    check(dbs);
+    System.out.println("Test finished");
+  }
+
+  /** Sets the network options (client threads per version, external client directory, tracing) on {@code fdb}. */
+  private static synchronized void setupThreads(FDB fdb) {
+    int clientThreadsPerVersion = clientHelper.readClusterFromEnv().length;
+    fdb.options().setClientThreadsPerVersion(clientThreadsPerVersion);
+    System.out.printf("thread per version is %d\n", clientThreadsPerVersion);
+    fdb.options().setExternalClientDirectory("/var/dynamic-conf/lib");
+    fdb.options().setTraceEnable("/tmp");
+    fdb.options().setKnob("min_trace_severity=5");
+  }
+
+  /** Writes the initial cycle 0 -> 1 -> 2 -> 3 -> 0 into every database, one transaction per edge. */
+  private static void setup(Collection<Database> dbs) {
+    // 0 -> 1 -> 2 -> 3 -> 0
+    for (int k = 0; k < cycleLength; k++) {
+      String key = Integer.toString(k);
+      String value = Integer.toString((k + 1) % cycleLength);
+
+      for (Database db : dbs) {
+        db.run(tr -> {
+          tr.set(Tuple.from(key).pack(), Tuple.from(value).pack());
+          return null;
+        });
+      }
+    }
+  }
+
+  /** Starts {@code threadPerDB} writer threads per database; they are not joined. */
+  private static void process(Collection<Database> dbs) {
+    for (Database db : dbs) {
+      for (int i = 0; i < threadPerDB; i++) {
+        final Thread thread = new Thread(CycleWorkload.create(db));
+        thread.start();
+      }
+    }
+  }
+
+  /**
+   * Starts {@code threadPerDB} checker threads per database, waits for each of them and
+   * asserts that it saw a valid cycle on every pass.
+   */
+  private static void check(Collection<Database> dbs) throws InterruptedException {
+    final Map<Thread, CycleChecker> threadsToCheckers = new HashMap<>();
+    for (Database db : dbs) {
+      for (int i = 0; i < threadPerDB; i++) {
+        final CycleChecker checker = new CycleChecker(db);
+        final Thread thread = new Thread(checker);
+        thread.start();
+        threadsToCheckers.put(thread, checker);
+      }
+    }
+
+    for (Map.Entry<Thread, CycleChecker> entry : threadsToCheckers.entrySet()) {
+      entry.getKey().join();
+      final boolean succeed = entry.getValue().succeed();
+      Assertions.assertTrue(succeed, "Cycle test failed");
+    }
+  }
+
+  /** Writer: repeatedly swaps two adjacent nodes of the cycle inside one transaction. */
+  public static class CycleWorkload implements Runnable {
+
+    private final Database db;
+
+    private CycleWorkload(Database db) {
+      this.db = db;
+    }
+
+    public static CycleWorkload create(Database db) {
+      return new CycleWorkload(db);
+    }
+
+    @Override
+    public void run() {
+      for (int i = 0; i < writeTxnCnt; i++) {
+        db.run(tr -> {
+          final int k = ThreadLocalRandom.current().nextInt(cycleLength);
+          final String key = Integer.toString(k);
+          byte[] result1 = tr.get(Tuple.from(key).pack()).join();
+          String value1 = Tuple.fromBytes(result1).getString(0);
+
+          byte[] result2 = tr.get(Tuple.from(value1).pack()).join();
+          String value2 = Tuple.fromBytes(result2).getString(0);
+
+          byte[] result3 = tr.get(Tuple.from(value2).pack()).join();
+          String value3 = Tuple.fromBytes(result3).getString(0);
+
+          // The 4th node's value is not needed for the swap; the read is still issued.
+          tr.get(Tuple.from(value3).pack()).join();
+
+          // key -> value1 -> value2 -> value3 becomes key -> value2 -> value1 -> value3
+          tr.set(Tuple.from(key).pack(), Tuple.from(value2).pack());
+          tr.set(Tuple.from(value2).pack(), Tuple.from(value1).pack());
+          tr.set(Tuple.from(value1).pack(), Tuple.from(value3).pack());
+          return null;
+        });
+      }
+    }
+  }
+
+  /** Checker: repeatedly walks four edges from a random node and records whether the cycle is intact. */
+  public static class CycleChecker implements Runnable {
+    private final Database db;
+    private boolean succeed;
+
+    public CycleChecker(Database db) {
+      this.db = db;
+      this.succeed = true;
+    }
+
+    public static CycleChecker create(Database db) {
+      return new CycleChecker(db);
+    }
+
+    @Override
+    public void run() {
+      for (int i = 0; i < validateTxnCnt; i++) {
+        db.run(tr -> {
+          final int k = ThreadLocalRandom.current().nextInt(cycleLength);
+          final String key = Integer.toString(k);
+          byte[] result1 = tr.get(Tuple.from(key).pack()).join();
+          String value1 = Tuple.fromBytes(result1).getString(0);
+
+          byte[] result2 = tr.get(Tuple.from(value1).pack()).join();
+          String value2 = Tuple.fromBytes(result2).getString(0);
+
+          byte[] result3 = tr.get(Tuple.from(value2).pack()).join();
+          String value3 = Tuple.fromBytes(result3).getString(0);
+
+          byte[] result4 = tr.get(Tuple.from(value3).pack()).join();
+          String value4 = Tuple.fromBytes(result4).getString(0);
+
+          // four hops from any node must lead back to it
+          if (!key.equals(value4)) {
+            succeed = false;
+          }
+          // and the four nodes visited must be exactly 0..3
+          List<String> actual = new ArrayList<>(Arrays.asList(value1, value2, value3, value4));
+          Collections.sort(actual);
+          if (!expected.equals(actual)) {
+            succeed = false;
+          }
+          return null;
+        });
+      }
+    }
+
+    /** Returns {@code false} once any pass has observed a broken cycle. */
+    public boolean succeed() {
+      return succeed;
+    }
+  }
+}
diff --git a/bindings/java/src/integration/com/apple/foundationdb/SidebandMultiThreadClientTest.java b/bindings/java/src/integration/com/apple/foundationdb/SidebandMultiThreadClientTest.java
new file mode 100644
index 0000000000..fb154a20ab
--- /dev/null
+++ b/bindings/java/src/integration/com/apple/foundationdb/SidebandMultiThreadClientTest.java
@@ -0,0 +1,143 @@
+package com.apple.foundationdb;
+
+import com.apple.foundationdb.tuple.Tuple;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadLocalRandom;
+
+import org.junit.jupiter.api.Assertions;
+
+/**
+ * Each cluster has a queue, producer writes a key and then send a message to this queue in JVM.
+ * Consumer would consume the key by checking the existence of the key, if it does not find the key,
+ * then the test would fail.
+ *
+ * This test is to verify the causal consistency of transactions for multi-threaded client.
+ */
+public class SidebandMultiThreadClientTest {
+  public static final MultiClientHelper clientHelper = new MultiClientHelper();
+
+  // One in-JVM queue per database; producers put written keys on it, consumers take them off.
+  private static final Map<Database, BlockingQueue<String>> db2Queues = new HashMap<>();
+  private static final int threadPerDB = 5;
+  private static final int txnCnt = 1000;
+
+  /** Creates one queue per database, starts the producers, then runs and checks the consumers. */
+  public static void main(String[] args) throws Exception {
+    FDB fdb = FDB.selectAPIVersion(710);
+    setupThreads(fdb);
+    Collection<Database> dbs = clientHelper.openDatabases(fdb); // the clientHelper will close the databases for us
+    for (Database db : dbs) {
+      db2Queues.put(db, new LinkedBlockingQueue<>());
+    }
+    System.out.println("Start processing and validating");
+    process(dbs);
+    check(dbs);
+    System.out.println("Test finished");
+  }
+
+  /** Sets the network options (client threads per version, external client directory, tracing) on {@code fdb}. */
+  private static synchronized void setupThreads(FDB fdb) {
+    int clientThreadsPerVersion = clientHelper.readClusterFromEnv().length;
+    fdb.options().setClientThreadsPerVersion(clientThreadsPerVersion);
+    System.out.printf("thread per version is %d\n", clientThreadsPerVersion);
+    fdb.options().setExternalClientDirectory("/var/dynamic-conf/lib");
+    fdb.options().setTraceEnable("/tmp");
+    fdb.options().setKnob("min_trace_severity=5");
+  }
+
+  /** Starts {@code threadPerDB} producer threads per database; they are not joined. */
+  private static void process(Collection<Database> dbs) {
+    for (Database db : dbs) {
+      for (int i = 0; i < threadPerDB; i++) {
+        final Thread thread = new Thread(Producer.create(db, db2Queues.get(db)));
+        thread.start();
+      }
+    }
+  }
+
+  /**
+   * Starts {@code threadPerDB} consumer threads per database, waits for each of them and
+   * asserts that it found every key it was told about.
+   */
+  private static void check(Collection<Database> dbs) throws InterruptedException {
+    final Map<Thread, Consumer> threads2Consumers = new HashMap<>();
+    for (Database db : dbs) {
+      for (int i = 0; i < threadPerDB; i++) {
+        final Consumer consumer = Consumer.create(db, db2Queues.get(db));
+        final Thread thread = new Thread(consumer);
+        thread.start();
+        threads2Consumers.put(thread, consumer);
+      }
+    }
+
+    for (Map.Entry<Thread, Consumer> entry : threads2Consumers.entrySet()) {
+      entry.getKey().join();
+      final boolean succeed = entry.getValue().succeed;
+      Assertions.assertTrue(succeed, "Sideband test failed");
+    }
+  }
+
+  /** Writes {@code txnCnt} random keys, announcing each on the queue after its transaction returns. */
+  public static class Producer implements Runnable {
+    private final Database db;
+    private final BlockingQueue<String> queue;
+
+    private Producer(Database db, BlockingQueue<String> queue) {
+      this.db = db;
+      this.queue = queue;
+    }
+
+    public static Producer create(Database db, BlockingQueue<String> queue) {
+      return new Producer(db, queue);
+    }
+
+    @Override
+    public void run() {
+      for (int i = 0; i < txnCnt; i++) {
+        final long suffix = ThreadLocalRandom.current().nextLong();
+        final String key = String.format("Sideband/Multithread/Test/%d", suffix);
+        db.run(tr -> {
+          tr.set(Tuple.from(key).pack(), Tuple.from("bar").pack());
+          return null;
+        });
+        // publish the key only after db.run has returned
+        queue.offer(key);
+      }
+    }
+  }
+
+  /** Takes up to {@code txnCnt} keys off the queue and fails as soon as one cannot be read back. */
+  public static class Consumer implements Runnable {
+    private final Database db;
+    private final BlockingQueue<String> queue;
+    private boolean succeed;
+
+    private Consumer(Database db, BlockingQueue<String> queue) {
+      this.db = db;
+      this.queue = queue;
+      this.succeed = true;
+    }
+
+    public static Consumer create(Database db, BlockingQueue<String> queue) {
+      return new Consumer(db, queue);
+    }
+
+    @Override
+    public void run() {
+      try {
+        for (int i = 0; i < txnCnt && succeed; i++) {
+          final String key = queue.take();
+          db.run(tr -> {
+            byte[] result = tr.get(Tuple.from(key).pack()).join();
+            if (result == null) {
+              System.out.println("FAILED to get key " + key + " from DB " + db);
+              succeed = false;
+            }
+            if (!succeed) {
+              return null;
+            }
+            // The decoded value is not compared; the call still runs as in the original.
+            Tuple.fromBytes(result).getString(0);
+            return null;
+          });
+        }
+      } catch (InterruptedException e) {
+        System.out.println("Get Exception in consumer: " + e);
+        succeed = false;
+        // restore the interrupt flag so code further up the stack can observe it
+        Thread.currentThread().interrupt();
+      }
+    }
+  }
+}
diff --git a/bindings/java/src/tests.cmake b/bindings/java/src/tests.cmake index bf3131eeb3..63fb18322d 100644 --- a/bindings/java/src/tests.cmake +++ b/bindings/java/src/tests.cmake @@ -49,6 +49,8 @@ set(JAVA_INTEGRATION_TESTS src/integration/com/apple/foundationdb/DirectoryTest.java src/integration/com/apple/foundationdb/RangeQueryIntegrationTest.java 
src/integration/com/apple/foundationdb/BasicMultiClientIntegrationTest.java + src/integration/com/apple/foundationdb/CycleMultiClientIntegrationTest.java + src/integration/com/apple/foundationdb/SidebandMultiThreadClientTest.java ) # Resources that are used in integration testing, but are not explicitly test files (JUnit rules, diff --git a/bindings/python/CMakeLists.txt b/bindings/python/CMakeLists.txt index 0893b08aa8..2fe304a0dc 100644 --- a/bindings/python/CMakeLists.txt +++ b/bindings/python/CMakeLists.txt @@ -81,5 +81,15 @@ if (NOT WIN32 AND NOT OPEN_FOR_IDE) COMMAND ${CMAKE_SOURCE_DIR}/bindings/python/tests/fdbcli_tests.py ${CMAKE_BINARY_DIR}/bin/fdbcli @CLUSTER_FILE@ + 1 + ) + add_fdbclient_test( + NAME multi_process_fdbcli_tests + PROCESS_NUMBER 5 + TEST_TIMEOUT 120 # The test can take near to 1 minutes sometime, set timeout to 2 minutes to be safe + COMMAND ${CMAKE_SOURCE_DIR}/bindings/python/tests/fdbcli_tests.py + ${CMAKE_BINARY_DIR}/bin/fdbcli + @CLUSTER_FILE@ + 5 ) endif() diff --git a/bindings/python/tests/fdbcli_tests.py b/bindings/python/tests/fdbcli_tests.py index 71fe5ee6b3..810257be64 100755 --- a/bindings/python/tests/fdbcli_tests.py +++ b/bindings/python/tests/fdbcli_tests.py @@ -332,22 +332,90 @@ def transaction(logger): output7 = run_fdbcli_command('get', 'key') assert output7 == "`key': not found" 
+def get_fdb_process_addresses():
+    """Return the lines after the first line of fdbcli `kill` output, one address per process."""
+    # get all processes' network addresses
+    output = run_fdbcli_command('kill')
+    # except the first line, each line is one process
+    addresses = output.split('\n')[1:]
+    # process_number is the module-level value parsed from sys.argv in __main__
+    assert len(addresses) == process_number
+    return addresses
+
+@enable_logging()
+def coordinators(logger):
+    """Check `coordinators`: inspect the current one, promote every process, then go back with `auto`."""
+    # we should only have one coordinator for now
+    output1 = run_fdbcli_command('coordinators')
+    assert len(output1.split('\n')) > 2
+    cluster_description = output1.split('\n')[0].split(': ')[-1]
+    logger.debug("Cluster description: {}".format(cluster_description))
+    coordinator_address = output1.split('\n')[1].split(': ')[-1]
+    # verify the coordinator
+    coordinator_list = get_value_from_status_json(True, 'client', 'coordinators', 'coordinators')
+    assert len(coordinator_list) == 1
+    assert coordinator_list[0]['address'] == coordinator_address
+    # verify the cluster description
+    assert get_value_from_status_json(True, 'cluster', 'connection_string').startswith('{}:'.format(cluster_description))
+    addresses = get_fdb_process_addresses()
+    # set all processes as coordinators and update the cluster description
+    new_cluster_description = 'a_simple_description'
+    run_fdbcli_command('coordinators', *addresses, 'description={}'.format(new_cluster_description))
+    # verify every process is now a coordinator and the description is updated
+    output2 = run_fdbcli_command('coordinators')
+    assert output2.split('\n')[0].split(': ')[-1] == new_cluster_description
+    # the expected count follows the actual number of processes instead of a fixed 5
+    assert output2.split('\n')[1] == 'Cluster coordinators ({}): {}'.format(len(addresses), ','.join(addresses))
+    # auto change should go back to 1 coordinator
+    run_fdbcli_command('coordinators', 'auto')
+    assert len(get_value_from_status_json(True, 'client', 'coordinators', 'coordinators')) == 1
+
+@enable_logging()
+def exclude(logger):
+    """Check `exclude`/`include`: exclude one randomly chosen process, verify, then include it again."""
+    # get all processes' network addresses
+    addresses = get_fdb_process_addresses()
+    logger.debug("Cluster processes: {}".format(' '.join(addresses)))
+    # There should be no excluded process for now
+    no_excluded_process_output = 'There are currently no servers or localities excluded from the database.'
+    output1 = run_fdbcli_command('exclude')
+    assert no_excluded_process_output in output1
+    # randomly pick one and exclude the process
+    excluded_address = random.choice(addresses)
+    # sometimes we need to retry the exclude
+    # (no retry limit here: loops until the exclude reports no error)
+    while True:
+        logger.debug("Excluding process: {}".format(excluded_address))
+        error_message = run_fdbcli_command_and_get_error('exclude', excluded_address)
+        if not error_message:
+            break
+        logger.debug("Retry exclude after 1 second")
+        time.sleep(1)
+    output2 = run_fdbcli_command('exclude')
+    assert 'There are currently 1 servers or localities being excluded from the database' in output2
+    assert excluded_address in output2
+    run_fdbcli_command('include', excluded_address)
+    # check the include is successful
+    output3 = run_fdbcli_command('exclude')
+    assert no_excluded_process_output in output3
 if __name__ == '__main__': - # fdbcli_tests.py - assert len(sys.argv) == 3, "Please pass arguments: " + # fdbcli_tests.py + assert len(sys.argv) == 4, "Please pass arguments: " # shell command template command_template = [sys.argv[1], '-C', sys.argv[2], '--exec'] # tests for fdbcli commands # assertions will fail if fdbcli does not work as expected - advanceversion() - cache_range() - consistencycheck() - datadistribution() - kill() - lockAndUnlock() - maintenance() - setclass() - suspend() - transaction() + process_number = int(sys.argv[3]) + if process_number == 1: + advanceversion() + cache_range() + consistencycheck() + datadistribution() + kill() + lockAndUnlock() + maintenance() + setclass() + suspend() + transaction() + else: + assert process_number > 1, "Process number should be positive" + coordinators() + exclude() + diff --git a/cmake/AddFdbTest.cmake b/cmake/AddFdbTest.cmake index 1ecbb70b8b..d292679e91 100644 --- a/cmake/AddFdbTest.cmake +++ b/cmake/AddFdbTest.cmake @@ -136,6 +136,7 @@ function(add_fdb_test) ${VALGRIND_OPTION} ${ADD_FDB_TEST_TEST_FILES} WORKING_DIRECTORY ${PROJECT_BINARY_DIR}) + 
set_tests_properties("${test_name}" PROPERTIES ENVIRONMENT UBSAN_OPTIONS=print_stacktrace=1:halt_on_error=1) get_filename_component(test_dir_full ${first_file} DIRECTORY) if(NOT ${test_dir_full} STREQUAL "") get_filename_component(test_dir ${test_dir_full} NAME) @@ -397,7 +398,7 @@ endfunction() # Creates a single cluster before running the specified command (usually a ctest test) function(add_fdbclient_test) set(options DISABLED ENABLED) - set(oneValueArgs NAME) + set(oneValueArgs NAME PROCESS_NUMBER TEST_TIMEOUT) set(multiValueArgs COMMAND) cmake_parse_arguments(T "${options}" "${oneValueArgs}" "${multiValueArgs}" "${ARGN}") if(OPEN_FOR_IDE) @@ -413,12 +414,27 @@ function(add_fdbclient_test) message(FATAL_ERROR "COMMAND is a required argument for add_fdbclient_test") endif() message(STATUS "Adding Client test ${T_NAME}") - add_test(NAME "${T_NAME}" + if (T_PROCESS_NUMBER) + add_test(NAME "${T_NAME}" + COMMAND ${CMAKE_SOURCE_DIR}/tests/TestRunner/tmp_cluster.py + --build-dir ${CMAKE_BINARY_DIR} + --process-number ${T_PROCESS_NUMBER} + -- + ${T_COMMAND}) + else() + add_test(NAME "${T_NAME}" COMMAND ${CMAKE_SOURCE_DIR}/tests/TestRunner/tmp_cluster.py --build-dir ${CMAKE_BINARY_DIR} -- ${T_COMMAND}) - set_tests_properties("${T_NAME}" PROPERTIES TIMEOUT 60) + endif() + if (T_TEST_TIMEOUT) + set_tests_properties("${T_NAME}" PROPERTIES TIMEOUT ${T_TEST_TIMEOUT}) + else() + # default timeout + set_tests_properties("${T_NAME}" PROPERTIES TIMEOUT 60) + endif() + set_tests_properties("${T_NAME}" PROPERTIES ENVIRONMENT UBSAN_OPTIONS=print_stacktrace=1:halt_on_error=1) endfunction() # Creates 3 distinct clusters before running the specified command. 
diff --git a/fdbclient/IConfigTransaction.cpp b/fdbclient/IConfigTransaction.cpp index d37276fb04..060953d55f 100644 --- a/fdbclient/IConfigTransaction.cpp +++ b/fdbclient/IConfigTransaction.cpp @@ -25,11 +25,3 @@ Reference IConfigTransaction::createTestSimple(ConfigTransactionInterface const& cti) { return makeReference(cti); } - -Reference IConfigTransaction::createSimple(Database const& cx) { - return makeReference(cx); -} - -Reference IConfigTransaction::createPaxos(Database const& cx) { - return makeReference(cx); -} diff --git a/fdbclient/IConfigTransaction.h b/fdbclient/IConfigTransaction.h index 9dfe139c8a..e0c4a64f71 100644 --- a/fdbclient/IConfigTransaction.h +++ b/fdbclient/IConfigTransaction.h @@ -40,8 +40,6 @@ public: virtual ~IConfigTransaction() = default; static Reference createTestSimple(ConfigTransactionInterface const&); - static Reference createSimple(Database const&); - static Reference createPaxos(Database const&); // Not implemented: void setVersion(Version) override { throw client_invalid_operation(); } diff --git a/fdbclient/ISingleThreadTransaction.cpp b/fdbclient/ISingleThreadTransaction.cpp index 0b9a8f5abb..c8fcf828d5 100644 --- a/fdbclient/ISingleThreadTransaction.cpp +++ b/fdbclient/ISingleThreadTransaction.cpp @@ -26,33 +26,15 @@ ISingleThreadTransaction* ISingleThreadTransaction::allocateOnForeignThread(Type type) { if (type == Type::RYW) { - auto tr = - (ReadYourWritesTransaction*)(ReadYourWritesTransaction::operator new(sizeof(ReadYourWritesTransaction))); - tr->preinitializeOnForeignThread(); + auto tr = new ReadYourWritesTransaction; return tr; } else if (type == Type::SIMPLE_CONFIG) { - auto tr = (SimpleConfigTransaction*)(SimpleConfigTransaction::operator new(sizeof(SimpleConfigTransaction))); + auto tr = new SimpleConfigTransaction; return tr; } else if (type == Type::PAXOS_CONFIG) { - auto tr = (PaxosConfigTransaction*)(PaxosConfigTransaction::operator new(sizeof(PaxosConfigTransaction))); + auto tr = new 
PaxosConfigTransaction; return tr; } ASSERT(false); return nullptr; } - -void ISingleThreadTransaction::create(ISingleThreadTransaction* tr, Type type, Database db) { - switch (type) { - case Type::RYW: - new (tr) ReadYourWritesTransaction(db); - break; - case Type::SIMPLE_CONFIG: - new (tr) SimpleConfigTransaction(db); - break; - case Type::PAXOS_CONFIG: - new (tr) PaxosConfigTransaction(db); - break; - default: - ASSERT(false); - } -} diff --git a/fdbclient/ISingleThreadTransaction.h b/fdbclient/ISingleThreadTransaction.h index 407bf97ca1..4f219cfdbd 100644 --- a/fdbclient/ISingleThreadTransaction.h +++ b/fdbclient/ISingleThreadTransaction.h @@ -45,7 +45,7 @@ public: }; static ISingleThreadTransaction* allocateOnForeignThread(Type type); - static void create(ISingleThreadTransaction* tr, Type type, Database db); + virtual void setDatabase(Database const&) = 0; virtual void setVersion(Version v) = 0; virtual Future getReadVersion() = 0; diff --git a/fdbclient/MultiVersionTransaction.actor.cpp b/fdbclient/MultiVersionTransaction.actor.cpp index ffaf51f73e..afc92bfec0 100644 --- a/fdbclient/MultiVersionTransaction.actor.cpp +++ b/fdbclient/MultiVersionTransaction.actor.cpp @@ -113,7 +113,7 @@ ThreadFuture DLTransaction::getRange(const KeySelectorRef& begin, end.offset, limits.rows, limits.bytes, - FDBStreamingModes::EXACT, + FDB_STREAMING_MODE_EXACT, 0, snapshot, reverse); @@ -207,12 +207,12 @@ ThreadFuture>> DLTransaction::getRangeSplitPoints(c void DLTransaction::addReadConflictRange(const KeyRangeRef& keys) { throwIfError(api->transactionAddConflictRange( - tr, keys.begin.begin(), keys.begin.size(), keys.end.begin(), keys.end.size(), FDBConflictRangeTypes::READ)); + tr, keys.begin.begin(), keys.begin.size(), keys.end.begin(), keys.end.size(), FDB_CONFLICT_RANGE_TYPE_READ)); } void DLTransaction::atomicOp(const KeyRef& key, const ValueRef& value, uint32_t operationType) { api->transactionAtomicOp( - tr, key.begin(), key.size(), value.begin(), value.size(), 
(FDBMutationTypes::Option)operationType); + tr, key.begin(), key.size(), value.begin(), value.size(), static_cast(operationType)); } void DLTransaction::set(const KeyRef& key, const ValueRef& value) { @@ -239,7 +239,7 @@ ThreadFuture DLTransaction::watch(const KeyRef& key) { void DLTransaction::addWriteConflictRange(const KeyRangeRef& keys) { throwIfError(api->transactionAddConflictRange( - tr, keys.begin.begin(), keys.begin.size(), keys.end.begin(), keys.end.size(), FDBConflictRangeTypes::WRITE)); + tr, keys.begin.begin(), keys.begin.size(), keys.end.begin(), keys.end.size(), FDB_CONFLICT_RANGE_TYPE_WRITE)); } ThreadFuture DLTransaction::commit() { @@ -269,8 +269,10 @@ ThreadFuture DLTransaction::getApproximateSize() { } void DLTransaction::setOption(FDBTransactionOptions::Option option, Optional value) { - throwIfError(api->transactionSetOption( - tr, option, value.present() ? value.get().begin() : nullptr, value.present() ? value.get().size() : 0)); + throwIfError(api->transactionSetOption(tr, + static_cast(option), + value.present() ? value.get().begin() : nullptr, + value.present() ? value.get().size() : 0)); } ThreadFuture DLTransaction::onError(Error const& e) { @@ -309,8 +311,10 @@ Reference DLDatabase::createTransaction() { } void DLDatabase::setOption(FDBDatabaseOptions::Option option, Optional value) { - throwIfError(api->databaseSetOption( - db, option, value.present() ? value.get().begin() : nullptr, value.present() ? value.get().size() : 0)); + throwIfError(api->databaseSetOption(db, + static_cast(option), + value.present() ? value.get().begin() : nullptr, + value.present() ? 
value.get().size() : 0)); } ThreadFuture DLDatabase::rebootWorker(const StringRef& address, bool check, int duration) { @@ -504,7 +508,7 @@ void DLApi::selectApiVersion(int apiVersion) { init(); throwIfError(api->selectApiVersion(apiVersion, headerVersion)); - throwIfError(api->setNetworkOption(FDBNetworkOptions::EXTERNAL_CLIENT, nullptr, 0)); + throwIfError(api->setNetworkOption(static_cast(FDBNetworkOptions::EXTERNAL_CLIENT), nullptr, 0)); } const char* DLApi::getClientVersion() { @@ -516,8 +520,9 @@ const char* DLApi::getClientVersion() { } void DLApi::setNetworkOption(FDBNetworkOptions::Option option, Optional value) { - throwIfError(api->setNetworkOption( - option, value.present() ? value.get().begin() : nullptr, value.present() ? value.get().size() : 0)); + throwIfError(api->setNetworkOption(static_cast(option), + value.present() ? value.get().begin() : nullptr, + value.present() ? value.get().size() : 0)); } void DLApi::setupNetwork() { diff --git a/fdbclient/MultiVersionTransaction.h b/fdbclient/MultiVersionTransaction.h index a98e16b440..65892a8dbf 100644 --- a/fdbclient/MultiVersionTransaction.h +++ b/fdbclient/MultiVersionTransaction.h @@ -22,6 +22,7 @@ #define FDBCLIENT_MULTIVERSIONTRANSACTION_H #pragma once +#include "bindings/c/foundationdb/fdb_c_options.g.h" #include "fdbclient/FDBOptions.g.h" #include "fdbclient/FDBTypes.h" #include "fdbclient/IClientApi.h" @@ -31,10 +32,10 @@ // FdbCApi is used as a wrapper around the FoundationDB C API that gets loaded from an external client library. // All of the required functions loaded from that external library are stored in function pointers in this struct. 
struct FdbCApi : public ThreadSafeReferenceCounted { - typedef struct future FDBFuture; - typedef struct cluster FDBCluster; - typedef struct database FDBDatabase; - typedef struct transaction FDBTransaction; + typedef struct FDB_future FDBFuture; + typedef struct FDB_cluster FDBCluster; + typedef struct FDB_database FDBDatabase; + typedef struct FDB_transaction FDBTransaction; #pragma pack(push, 4) typedef struct key { @@ -57,16 +58,16 @@ struct FdbCApi : public ThreadSafeReferenceCounted { // Network fdb_error_t (*selectApiVersion)(int runtimeVersion, int headerVersion); const char* (*getClientVersion)(); - fdb_error_t (*setNetworkOption)(FDBNetworkOptions::Option option, uint8_t const* value, int valueLength); + fdb_error_t (*setNetworkOption)(FDBNetworkOption option, uint8_t const* value, int valueLength); fdb_error_t (*setupNetwork)(); fdb_error_t (*runNetwork)(); fdb_error_t (*stopNetwork)(); - fdb_error_t* (*createDatabase)(const char* clusterFilePath, FDBDatabase** db); + fdb_error_t (*createDatabase)(const char* clusterFilePath, FDBDatabase** db); // Database fdb_error_t (*databaseCreateTransaction)(FDBDatabase* database, FDBTransaction** tr); fdb_error_t (*databaseSetOption)(FDBDatabase* database, - FDBDatabaseOptions::Option option, + FDBDatabaseOption option, uint8_t const* value, int valueLength); void (*databaseDestroy)(FDBDatabase* database); @@ -86,7 +87,7 @@ struct FdbCApi : public ThreadSafeReferenceCounted { // Transaction fdb_error_t (*transactionSetOption)(FDBTransaction* tr, - FDBTransactionOptions::Option option, + FDBTransactionOption option, uint8_t const* value, int valueLength); void (*transactionDestroy)(FDBTransaction* tr); @@ -113,7 +114,7 @@ struct FdbCApi : public ThreadSafeReferenceCounted { int endOffset, int limit, int targetBytes, - FDBStreamingModes::Option mode, + FDBStreamingMode mode, int iteration, fdb_bool_t snapshot, fdb_bool_t reverse); @@ -135,7 +136,7 @@ struct FdbCApi : public ThreadSafeReferenceCounted { int 
keyNameLength, uint8_t const* param, int paramLength, - FDBMutationTypes::Option operationType); + FDBMutationType operationType); FDBFuture* (*transactionGetEstimatedRangeSizeBytes)(FDBTransaction* tr, uint8_t const* begin_key_name, @@ -163,7 +164,7 @@ struct FdbCApi : public ThreadSafeReferenceCounted { int beginKeyNameLength, uint8_t const* endKeyName, int endKeyNameLength, - FDBConflictRangeTypes::Option); + FDBConflictRangeType); // Future fdb_error_t (*futureGetDatabase)(FDBFuture* f, FDBDatabase** outDb); diff --git a/fdbclient/NativeAPI.actor.h b/fdbclient/NativeAPI.actor.h index 9c64af215f..9709b02d1b 100644 --- a/fdbclient/NativeAPI.actor.h +++ b/fdbclient/NativeAPI.actor.h @@ -244,8 +244,6 @@ public: explicit Transaction(Database const& cx); ~Transaction(); - void preinitializeOnForeignThread() { committedVersion = invalidVersion; } - void setVersion(Version v); Future getReadVersion() { return getReadVersion(0); } Future getRawReadVersion(); @@ -421,7 +419,7 @@ private: Database cx; double backoff; - Version committedVersion; + Version committedVersion{ invalidVersion }; CommitTransactionRequest tr; Future readVersion; Promise> metadataVersion; diff --git a/fdbclient/PaxosConfigTransaction.actor.cpp b/fdbclient/PaxosConfigTransaction.actor.cpp index 907c1cc449..f6ac8b69e9 100644 --- a/fdbclient/PaxosConfigTransaction.actor.cpp +++ b/fdbclient/PaxosConfigTransaction.actor.cpp @@ -122,9 +122,14 @@ void PaxosConfigTransaction::checkDeferredError() const { ASSERT(false); } -PaxosConfigTransaction::PaxosConfigTransaction(Database const& cx) { +PaxosConfigTransaction::PaxosConfigTransaction() { // TODO: Implement ASSERT(false); } PaxosConfigTransaction::~PaxosConfigTransaction() = default; + +void PaxosConfigTransaction::setDatabase(Database const& cx) { + // TODO: Implement + ASSERT(false); +} diff --git a/fdbclient/PaxosConfigTransaction.h b/fdbclient/PaxosConfigTransaction.h index f3af19bb98..ea3c130f8f 100644 --- a/fdbclient/PaxosConfigTransaction.h +++ 
b/fdbclient/PaxosConfigTransaction.h @@ -33,8 +33,9 @@ class PaxosConfigTransaction final : public IConfigTransaction, public FastAlloc PaxosConfigTransactionImpl& impl() { return *_impl; } public: - PaxosConfigTransaction(Database const&); + PaxosConfigTransaction(); ~PaxosConfigTransaction(); + void setDatabase(Database const&) override; Future getReadVersion() override; Optional getCachedReadVersion() const override; diff --git a/fdbclient/ReadYourWrites.actor.cpp b/fdbclient/ReadYourWrites.actor.cpp index dcb5d322a1..d559cb1f99 100644 --- a/fdbclient/ReadYourWrites.actor.cpp +++ b/fdbclient/ReadYourWrites.actor.cpp @@ -1293,6 +1293,10 @@ ReadYourWritesTransaction::ReadYourWritesTransaction(Database const& cx) applyPersistentOptions(); } +void ReadYourWritesTransaction::setDatabase(Database const& cx) { + *this = ReadYourWritesTransaction(cx); +} + ACTOR Future timebomb(double endTime, Promise resetPromise) { while (now() < endTime) { wait(delayUntil(std::min(endTime + 0.0001, now() + CLIENT_KNOBS->TRANSACTION_TIMEOUT_DELAY_INTERVAL))); @@ -1735,10 +1739,6 @@ void ReadYourWritesTransaction::getWriteConflicts(KeyRangeMap* result) { } } -void ReadYourWritesTransaction::preinitializeOnForeignThread() { - tr.preinitializeOnForeignThread(); -} - void ReadYourWritesTransaction::setTransactionID(uint64_t id) { tr.setTransactionID(id); } diff --git a/fdbclient/ReadYourWrites.h b/fdbclient/ReadYourWrites.h index 7a3afecbe0..c0bbb95f55 100644 --- a/fdbclient/ReadYourWrites.h +++ b/fdbclient/ReadYourWrites.h @@ -68,6 +68,7 @@ public: explicit ReadYourWritesTransaction(Database const& cx); ~ReadYourWritesTransaction(); + void setDatabase(Database const&) override; void setVersion(Version v) override { tr.setVersion(v); } Future getReadVersion() override; Optional getCachedReadVersion() const override { return tr.getCachedReadVersion(); } @@ -153,8 +154,6 @@ public: void getWriteConflicts(KeyRangeMap* result) override; - void preinitializeOnForeignThread(); - Database 
getDatabase() const { return tr.getDatabase(); } const TransactionInfo& getTransactionInfo() const { return tr.info; } diff --git a/fdbclient/SimpleConfigTransaction.actor.cpp b/fdbclient/SimpleConfigTransaction.actor.cpp index f81cabbcdc..453cf26ae0 100644 --- a/fdbclient/SimpleConfigTransaction.actor.cpp +++ b/fdbclient/SimpleConfigTransaction.actor.cpp @@ -290,10 +290,13 @@ void SimpleConfigTransaction::checkDeferredError() const { impl().checkDeferredError(deferredError); } -SimpleConfigTransaction::SimpleConfigTransaction(Database const& cx) - : _impl(std::make_unique(cx)) {} +void SimpleConfigTransaction::setDatabase(Database const& cx) { + _impl = std::make_unique(cx); +} SimpleConfigTransaction::SimpleConfigTransaction(ConfigTransactionInterface const& cti) : _impl(std::make_unique(cti)) {} +SimpleConfigTransaction::SimpleConfigTransaction() = default; + SimpleConfigTransaction::~SimpleConfigTransaction() = default; diff --git a/fdbclient/SimpleConfigTransaction.h b/fdbclient/SimpleConfigTransaction.h index ced40721af..a84ced8dbb 100644 --- a/fdbclient/SimpleConfigTransaction.h +++ b/fdbclient/SimpleConfigTransaction.h @@ -43,6 +43,8 @@ class SimpleConfigTransaction final : public IConfigTransaction, public FastAllo public: SimpleConfigTransaction(ConfigTransactionInterface const&); SimpleConfigTransaction(Database const&); + SimpleConfigTransaction(); + void setDatabase(Database const&) override; ~SimpleConfigTransaction(); Future getReadVersion() override; Optional getCachedReadVersion() const override; diff --git a/fdbclient/ThreadSafeTransaction.cpp b/fdbclient/ThreadSafeTransaction.cpp index 90716ce4f8..cc696ee553 100644 --- a/fdbclient/ThreadSafeTransaction.cpp +++ b/fdbclient/ThreadSafeTransaction.cpp @@ -149,9 +149,9 @@ ThreadSafeTransaction::ThreadSafeTransaction(DatabaseContext* cx, ISingleThreadT auto tr = this->tr = ISingleThreadTransaction::allocateOnForeignThread(type); // No deferred error -- if the construction of the RYW transaction fails, 
we have no where to put it onMainThreadVoid( - [tr, type, cx]() { + [tr, cx]() { cx->addref(); - ISingleThreadTransaction::create(tr, type, Database(cx)); + tr->setDatabase(Database(cx)); }, nullptr); } diff --git a/fdbclient/rapidjson/internal/stack.h b/fdbclient/rapidjson/internal/stack.h index 7ab15d42a0..fa43aa0171 100644 --- a/fdbclient/rapidjson/internal/stack.h +++ b/fdbclient/rapidjson/internal/stack.h @@ -17,6 +17,7 @@ #include "../allocators.h" #include "swap.h" +#include #if defined(__clang__) RAPIDJSON_DIAG_PUSH @@ -106,7 +107,7 @@ public: template RAPIDJSON_FORCEINLINE void Reserve(size_t count = 1) { // Expand the stack if needed - if (RAPIDJSON_UNLIKELY(stackTop_ + sizeof(T) * count > stackEnd_)) + if (RAPIDJSON_UNLIKELY(static_cast(sizeof(T) * count) > (stackEnd_ - stackTop_))) Expand(count); } @@ -118,7 +119,7 @@ public: template RAPIDJSON_FORCEINLINE T* PushUnsafe(size_t count = 1) { - RAPIDJSON_ASSERT(stackTop_ + sizeof(T) * count <= stackEnd_); + RAPIDJSON_ASSERT(static_cast(sizeof(T) * count) <= (stackEnd_ - stackTop_)); T* ret = reinterpret_cast(stackTop_); stackTop_ += sizeof(T) * count; return ret; diff --git a/fdbclient/vexillographer/c.cs b/fdbclient/vexillographer/c.cs index 2ea6675dff..dab01ff5cf 100644 --- a/fdbclient/vexillographer/c.cs +++ b/fdbclient/vexillographer/c.cs @@ -52,7 +52,7 @@ namespace vexillographer { string parameterComment = ""; if (o.scope.ToString().EndsWith("Option")) - parameterComment = String.Format("{0}/* {1} */\n", indent, "Parameter: " + o.getParameterComment()); + parameterComment = String.Format("{0}/* {1} {2}*/\n", indent, "Parameter: " + o.getParameterComment(), o.hidden ? "This is a hidden parameter and should not be used directly by applications." 
: ""); return String.Format("{0}/* {2} */\n{5}{0}{1}{3}={4}", indent, prefix, o.comment, o.name.ToUpper(), o.code, parameterComment); } @@ -64,7 +64,7 @@ namespace vexillographer options = new Option[] { new Option{ scope = scope, comment = "This option is only a placeholder for C compatibility and should not be used", code = -1, name = "DUMMY_DO_NOT_USE", paramDesc = null } }; - outFile.WriteLine(string.Join(",\n\n", options.Where(f => !f.hidden).Select(f => getCLine(f, " ", prefix)).ToArray())); + outFile.WriteLine(string.Join(",\n\n", options.Select(f => getCLine(f, " ", prefix)).ToArray())); outFile.WriteLine("}} FDB{0};", scope.ToString()); outFile.WriteLine(); } diff --git a/fdbmonitor/fdbmonitor.cpp b/fdbmonitor/fdbmonitor.cpp index b0853cf5e8..6d88b54af4 100644 --- a/fdbmonitor/fdbmonitor.cpp +++ b/fdbmonitor/fdbmonitor.cpp @@ -137,6 +137,8 @@ int severity_to_priority(Severity severity) { } } +typedef std::string ProcessID; + bool daemonize = false; std::string logGroup = "default"; @@ -390,11 +392,11 @@ public: int pipes[2][2]; Command() : argv(nullptr) {} - Command(const CSimpleIni& ini, std::string _section, uint64_t id, fdb_fd_set fds, int* maxfd) + Command(const CSimpleIni& ini, std::string _section, ProcessID id, fdb_fd_set fds, int* maxfd) : section(_section), argv(nullptr), fork_retry_time(-1), quiet(false), delete_envvars(nullptr), fds(fds), deconfigured(false), kill_on_configuration_change(true) { char _ssection[strlen(section.c_str()) + 22]; - snprintf(_ssection, strlen(section.c_str()) + 22, "%s.%" PRIu64, section.c_str(), id); + snprintf(_ssection, strlen(section.c_str()) + 22, "%s", id.c_str()); ssection = _ssection; for (auto p : pipes) { @@ -593,9 +595,9 @@ public: } }; -std::unordered_map> id_command; -std::unordered_map pid_id; -std::unordered_map id_pid; +std::unordered_map> id_command; +std::unordered_map pid_id; +std::unordered_map id_pid; enum { OPT_CONFFILE, OPT_LOCKFILE, OPT_LOGGROUP, OPT_DAEMONIZE, OPT_HELP }; @@ -608,7 +610,7 @@ 
CSimpleOpt::SOption g_rgOptions[] = { { OPT_CONFFILE, "--conffile", SO_REQ_SEP } { OPT_HELP, "--help", SO_NONE }, SO_END_OF_OPTIONS }; -void start_process(Command* cmd, uint64_t id, uid_t uid, gid_t gid, int delay, sigset_t* mask) { +void start_process(Command* cmd, ProcessID id, uid_t uid, gid_t gid, int delay, sigset_t* mask) { if (!cmd->argv) return; @@ -758,7 +760,7 @@ bool argv_equal(const char** a1, const char** a2) { return true; } -void kill_process(uint64_t id, bool wait = true) { +void kill_process(ProcessID id, bool wait = true) { pid_t pid = id_pid[id]; log_msg(SevInfo, "Killing process %d\n", pid); @@ -815,7 +817,7 @@ void load_conf(const char* confpath, uid_t& uid, gid_t& gid, sigset_t* mask, fdb /* Any change to uid or gid requires the process to be restarted to take effect */ if (uid != _uid || gid != _gid) { - std::vector kill_ids; + std::vector kill_ids; for (auto i : id_pid) { if (id_command[i.first]->kill_on_configuration_change) { kill_ids.push_back(i.first); @@ -831,8 +833,8 @@ void load_conf(const char* confpath, uid_t& uid, gid_t& gid, sigset_t* mask, fdb gid = _gid; } - std::list kill_ids; - std::list> start_ids; + std::list kill_ids; + std::list> start_ids; for (auto i : id_pid) { if (!loadedConf || ini.GetSectionSize(id_command[i.first]->ssection.c_str()) == -1) { @@ -881,31 +883,24 @@ void load_conf(const char* confpath, uid_t& uid, gid_t& gid, sigset_t* mask, fdb ini.GetAllSections(sections); for (auto i : sections) { if (auto dot = strrchr(i.pItem, '.')) { - char* strtol_end; + ProcessID id = i.pItem; + if (!id_pid.count(id)) { + /* Found something we haven't yet started */ + Command* cmd; - uint64_t id = strtoull(dot + 1, &strtol_end, 10); + auto itr = id_command.find(id); + if (itr != id_command.end()) { + cmd = itr->second.get(); + } else { + std::string section(i.pItem, dot - i.pItem); + auto p = std::make_unique(ini, section, id, rfds, maxfd); + cmd = p.get(); + id_command[id] = std::move(p); + } - if (*strtol_end != '\0' || !(id 
> 0)) { - log_msg(SevError, "Found bogus id in %s\n", i.pItem); - } else { - if (!id_pid.count(id)) { - /* Found something we haven't yet started */ - Command* cmd; - - auto itr = id_command.find(id); - if (itr != id_command.end()) { - cmd = itr->second.get(); - } else { - std::string section(i.pItem, dot - i.pItem); - auto p = std::make_unique(ini, section, id, rfds, maxfd); - cmd = p.get(); - id_command[id] = std::move(p); - } - - if (cmd->fork_retry_time <= timer()) { - log_msg(SevInfo, "Starting %s\n", i.pItem); - start_process(cmd, id, uid, gid, 0, mask); - } + if (cmd->fork_retry_time <= timer()) { + log_msg(SevInfo, "Starting %s\n", i.pItem); + start_process(cmd, id, uid, gid, 0, mask); } } } @@ -1724,7 +1719,7 @@ int main(int argc, char** argv) { break; } - uint64_t id = pid_id[pid]; + ProcessID id = pid_id[pid]; Command* cmd = id_command[id].get(); pid_id.erase(pid); diff --git a/fdbrpc/FlowTransport.actor.cpp b/fdbrpc/FlowTransport.actor.cpp index e16264591f..8a6b32df56 100644 --- a/fdbrpc/FlowTransport.actor.cpp +++ b/fdbrpc/FlowTransport.actor.cpp @@ -919,11 +919,11 @@ ACTOR static void deliver(TransportData* self, TaskPriority priority, ArenaReader reader, bool inReadSocket) { - // We want to run the task at the right priority. If the priority - // is higher than the current priority (which is ReadSocket) we - // can just upgrade. Otherwise we'll context switch so that we - // don't block other tasks that might run with a higher priority. - if (priority < TaskPriority::ReadSocket || !inReadSocket) { + // We want to run the task at the right priority. If the priority is higher than the current priority (which is + // ReadSocket) we can just upgrade. Otherwise we'll context switch so that we don't block other tasks that might run + // with a higher priority. ReplyPromiseStream needs to guarantee that messages are received in the order they were + // sent, so even in the case of local delivery those messages need to skip this delay. 
+ if (priority < TaskPriority::ReadSocket || (priority != TaskPriority::NoDeliverDelay && !inReadSocket)) { wait(delay(0, priority)); } else { g_network->setCurrentTask(priority); diff --git a/fdbrpc/fdbrpc.h b/fdbrpc/fdbrpc.h index b0443d2a23..25736055f6 100644 --- a/fdbrpc/fdbrpc.h +++ b/fdbrpc/fdbrpc.h @@ -404,13 +404,13 @@ struct NetNotifiedQueueWithAcknowledgements final : NotifiedQueue, // Notify the server that a client is not using this ReplyPromiseStream anymore FlowTransport::transport().sendUnreliable( SerializeSource>(operation_obsolete()), - acknowledgements.getEndpoint(TaskPriority::ReadSocket), + acknowledgements.getEndpoint(TaskPriority::NoDeliverDelay), false); } if (isRemoteEndpoint() && !sentError && !acknowledgements.failures.isReady()) { // The ReplyPromiseStream was cancelled before sending an error, so the storage server must have died FlowTransport::transport().sendUnreliable(SerializeSource>>(broken_promise()), - getEndpoint(TaskPriority::ReadSocket), + getEndpoint(TaskPriority::NoDeliverDelay), false); } } @@ -421,6 +421,9 @@ struct NetNotifiedQueueWithAcknowledgements final : NotifiedQueue, template class ReplyPromiseStream { public: + // The endpoints of a ReplyPromiseStream must be initialized at Task::NoDeliverDelay, because a + // delay(0) in FlowTransport deliver can cause out of order delivery. 
+ // stream.send( request ) // Unreliable at most once delivery: Delivers request unless there is a connection failure (zero or one times) @@ -428,7 +431,7 @@ public: void send(U&& value) const { if (queue->isRemoteEndpoint()) { if (!queue->acknowledgements.getRawEndpoint().isValid()) { - value.acknowledgeToken = queue->acknowledgements.getEndpoint(TaskPriority::ReadSocket).token; + value.acknowledgeToken = queue->acknowledgements.getEndpoint(TaskPriority::NoDeliverDelay).token; } queue->acknowledgements.bytesSent += value.expectedSize(); FlowTransport::transport().sendUnreliable( @@ -489,9 +492,9 @@ public: errors->delPromiseRef(); } - // The endpoints of a ReplyPromiseStream must be initialized at Task::ReadSocket, because with lower priorities a + // The endpoints of a ReplyPromiseStream must be initialized at Task::NoDeliverDelay, because with lower priorities a // delay(0) in FlowTransport deliver can cause out of order delivery. - const Endpoint& getEndpoint() const { return queue->getEndpoint(TaskPriority::ReadSocket); } + const Endpoint& getEndpoint() const { return queue->getEndpoint(TaskPriority::NoDeliverDelay); } bool operator==(const ReplyPromiseStream& rhs) const { return queue == rhs.queue; } bool isEmpty() const { return !queue->isReady(); } diff --git a/fdbserver/TagPartitionedLogSystem.actor.cpp b/fdbserver/TagPartitionedLogSystem.actor.cpp index 0411bf2c86..2ab2b18062 100644 --- a/fdbserver/TagPartitionedLogSystem.actor.cpp +++ b/fdbserver/TagPartitionedLogSystem.actor.cpp @@ -1529,11 +1529,12 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounteddbgid; state Future maxGetPoppedDuration = delay(SERVER_KNOBS->TXS_POPPED_MAX_DELAY); wait(waitForAll(poppedReady) || maxGetPoppedDuration); if (maxGetPoppedDuration.isReady()) { - TraceEvent(SevWarnAlways, "PoppedTxsNotReady", self->dbgid); + TraceEvent(SevWarnAlways, "PoppedTxsNotReady", dbgid); } Version maxPopped = 1; diff --git a/fdbserver/storageserver.actor.cpp 
b/fdbserver/storageserver.actor.cpp index e9a23ab309..6008251127 100644 --- a/fdbserver/storageserver.actor.cpp +++ b/fdbserver/storageserver.actor.cpp @@ -3891,7 +3891,7 @@ ACTOR Future update(StorageServer* data, bool* pReceivedUpdate) { if (ver != invalidVersion && ver > data->version.get()) { // TODO(alexmiller): Update to version tracking. - DEBUG_KEY_RANGE("SSUpdate", ver, KeyRangeRef()); + // DEBUG_KEY_RANGE("SSUpdate", ver, KeyRangeRef()); data->mutableData().createNewVersion(ver); if (data->otherError.getFuture().isReady()) @@ -4179,7 +4179,7 @@ bool StorageServerDisk::makeVersionMutationsDurable(Version& prevStorageVersion, VerUpdateRef const& v = u->second; ASSERT(v.version > prevStorageVersion && v.version <= newStorageVersion); // TODO(alexmiller): Update to version tracking. - DEBUG_KEY_RANGE("makeVersionMutationsDurable", v.version, KeyRangeRef()); + // DEBUG_KEY_RANGE("makeVersionMutationsDurable", v.version, KeyRangeRef()); writeMutations(v.mutations, v.version, "makeVersionDurable"); for (const auto& m : v.mutations) bytesLeft -= mvccStorageBytes(m); diff --git a/flow/Histogram.cpp b/flow/Histogram.cpp index 595dcc8fef..7814b89d60 100644 --- a/flow/Histogram.cpp +++ b/flow/Histogram.cpp @@ -45,7 +45,7 @@ static HistogramRegistry* globalHistograms = nullptr; #pragma region HistogramRegistry HistogramRegistry& GetHistogramRegistry() { - ISimulator::ProcessInfo* info = g_simulator.getCurrentProcess(); + ISimulator::ProcessInfo* info = g_network && g_network->isSimulated() ? 
g_simulator.getCurrentProcess() : nullptr; if (info) { // in simulator; scope histograms to simulated process diff --git a/flow/network.h b/flow/network.h index d12ba1a0e8..0d0f8a2d34 100644 --- a/flow/network.h +++ b/flow/network.h @@ -45,6 +45,7 @@ enum class TaskPriority { WriteSocket = 10000, PollEIO = 9900, DiskIOComplete = 9150, + NoDeliverDelay = 9100, LoadBalancedEndpoint = 9000, ReadSocket = 9000, AcceptSocket = 8950, diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 1feb231389..2105391eef 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -275,10 +275,12 @@ if(WITH_PYTHON) NAME multiversion_client/unit_tests COMMAND $ -r unittests -f /fdbclient/multiversionclient/ ) + set_tests_properties("multiversion_client/unit_tests" PROPERTIES ENVIRONMENT UBSAN_OPTIONS=print_stacktrace=1:halt_on_error=1) add_test( NAME threadsafe_threadfuture_to_future/unit_tests COMMAND $ -r unittests -f /flow/safeThreadFutureToFuture/ ) + set_tests_properties("threadsafe_threadfuture_to_future/unit_tests" PROPERTIES ENVIRONMENT UBSAN_OPTIONS=print_stacktrace=1:halt_on_error=1) endif() verify_testing() diff --git a/tests/TestRunner/local_cluster.py b/tests/TestRunner/local_cluster.py index 68318d51dd..11c6c7b7e7 100644 --- a/tests/TestRunner/local_cluster.py +++ b/tests/TestRunner/local_cluster.py @@ -38,7 +38,7 @@ cluster_file = {etcdir}/fdb.cluster command = {fdbserver_bin} public_address = auto:$ID listen_address = public -datadir = {datadir} +datadir = {datadir}/$ID logdir = {logdir} # logsize = 10MiB # maxlogssize = 100MiB @@ -53,13 +53,13 @@ logdir = {logdir} ## An individual fdbserver process with id 4000 ## Parameters set here override defaults from the [fdbserver] section -[fdbserver.{server_port}] - """ + +""" valid_letters_for_secret = string.ascii_letters + string.digits def __init__(self, basedir: str, fdbserver_binary: str, fdbmonitor_binary: str, - fdbcli_binary: str, create_config=True, port=None, ip_address=None): + fdbcli_binary: str, 
process_number: int, create_config=True, port=None, ip_address=None): self.basedir = Path(basedir) self.fdbserver_binary = Path(fdbserver_binary) self.fdbmonitor_binary = Path(fdbmonitor_binary) @@ -93,9 +93,16 @@ logdir = {logdir} etcdir=self.etc, fdbserver_bin=self.fdbserver_binary, datadir=self.data, - logdir=self.log, - server_port=self.port + logdir=self.log )) + # By default, the cluster only has one process + # If a port number is given and process_number > 1, we will use subsequent numbers + # E.g., port = 4000, process_number = 5 + # Then 4000,4001,4002,4003,4004 will be used as ports + # If port number is not given, we will randomly pick free ports + for index, _ in enumerate(range(process_number)): + f.write('[fdbserver.{server_port}]\n'.format(server_port=self.port)) + self.port = get_free_port() if port is None else str(int(self.port) + 1) def __enter__(self): assert not self.running, "Can't start a server that is already running" diff --git a/tests/TestRunner/tmp_cluster.py b/tests/TestRunner/tmp_cluster.py index f8ae4ef813..5c84f6086f 100755 --- a/tests/TestRunner/tmp_cluster.py +++ b/tests/TestRunner/tmp_cluster.py @@ -11,7 +11,7 @@ from random import choice from pathlib import Path class TempCluster: - def __init__(self, build_dir: str,port: str = None): + def __init__(self, build_dir: str, process_number: int = 1, port: str = None): self.build_dir = Path(build_dir).resolve() assert self.build_dir.exists(), "{} does not exist".format(build_dir) assert self.build_dir.is_dir(), "{} is not a directory".format(build_dir) @@ -23,6 +23,7 @@ class TempCluster: self.build_dir.joinpath('bin', 'fdbserver'), self.build_dir.joinpath('bin', 'fdbmonitor'), self.build_dir.joinpath('bin', 'fdbcli'), + process_number, port = port) self.log = self.cluster.log self.etc = self.cluster.etc @@ -63,9 +64,10 @@ if __name__ == '__main__': """) parser.add_argument('--build-dir', '-b', metavar='BUILD_DIRECTORY', help='FDB build directory', required=True) 
parser.add_argument('cmd', metavar="COMMAND", nargs="+", help="The command to run") + parser.add_argument('--process-number', '-p', help="Number of fdb processes running", type=int, default=1) args = parser.parse_args() errcode = 1 - with TempCluster(args.build_dir) as cluster: + with TempCluster(args.build_dir, args.process_number) as cluster: print("log-dir: {}".format(cluster.log)) print("etc-dir: {}".format(cluster.etc)) print("data-dir: {}".format(cluster.data))