Merge branch 'main' into transaction-debug-logging

A.J. Beamon 2023-02-28 14:18:51 -08:00
commit 310fc2ff4e
80 changed files with 495 additions and 200 deletions

View File

@ -25,8 +25,9 @@ import os
sys.path[:0] = [os.path.join(os.path.dirname(__file__), '..', '..', 'bindings', 'python')]
import util
from fdb import LATEST_API_VERSION
FDB_API_VERSION = 720
FDB_API_VERSION = LATEST_API_VERSION
LOGGING = {
'version': 1,

View File

@ -57,6 +57,9 @@ API_VERSIONS = [
700, 710, 720,
]
assert (
API_VERSIONS[-1] == FDB_API_VERSION
), "Bindingtester API version list must be updated to include all supported versions"
fdb.api_version(FDB_API_VERSION)

View File

@ -20,7 +20,9 @@
import os
MAX_API_VERSION = 720
from bindingtester import FDB_API_VERSION
MAX_API_VERSION = FDB_API_VERSION
COMMON_TYPES = ['null', 'bytes', 'string', 'int', 'uuid', 'bool', 'float', 'double', 'tuple']
ALL_TYPES = COMMON_TYPES + ['versionstamp']

View File

@ -34,10 +34,9 @@ fdb.api_version(FDB_API_VERSION)
class ScriptedTest(Test):
TEST_API_VERSION = 720
def __init__(self, subspace):
super(ScriptedTest, self).__init__(subspace, ScriptedTest.TEST_API_VERSION, ScriptedTest.TEST_API_VERSION)
super(ScriptedTest, self).__init__(subspace, FDB_API_VERSION, FDB_API_VERSION)
self.workspace = self.subspace['workspace']
self.results_subspace = self.subspace['results']
# self.thread_subspace = self.subspace['threads'] # TODO: update START_THREAD so that we can create threads in subspaces

View File

@ -41,12 +41,19 @@ add_custom_target(fdb_c_generated DEPENDS ${asm_file}
vexillographer_compile(TARGET fdb_c_options LANG c OUT ${CMAKE_CURRENT_BINARY_DIR}/foundationdb/fdb_c_options.g.h
OUTPUT ${CMAKE_CURRENT_BINARY_DIR}/foundationdb/fdb_c_options.g.h)
if(NOT DEFINED FDB_API_VERSION_FILE)
message(FATAL_ERROR "Missing definitions of API versions")
endif()
include(${FDB_API_VERSION_FILE})
set(fdb_c_apiversion_file ${CMAKE_CURRENT_BINARY_DIR}/foundationdb/fdb_c_apiversion.g.h)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/foundationdb/fdb_c_apiversion.h.cmake ${fdb_c_apiversion_file})
include(GenerateExportHeader)
if(OPEN_FOR_IDE)
add_library(fdb_c OBJECT ${FDB_C_SRCS} ${asm_file})
add_library(fdb_c OBJECT ${FDB_C_SRCS} ${fdb_c_apiversion_file} ${asm_file})
else()
add_library(fdb_c SHARED ${FDB_C_SRCS} ${asm_file})
add_library(fdb_c SHARED ${FDB_C_SRCS} ${fdb_c_apiversion_file} ${asm_file})
strip_debug_symbols(fdb_c)
endif()
add_dependencies(fdb_c fdb_c_generated fdb_c_options)
@ -507,6 +514,7 @@ fdb_install(
fdb_install(
FILES foundationdb/fdb_c.h
${CMAKE_CURRENT_BINARY_DIR}/foundationdb/fdb_c_options.g.h
${fdb_c_apiversion_file}
${CMAKE_SOURCE_DIR}/fdbclient/vexillographer/fdb.options
${CMAKE_SOURCE_DIR}/bindings/c/foundationdb/fdb_c_types.h
DESTINATION include

View File

@ -23,7 +23,7 @@
#include "fdbclient/FDBTypes.h"
#include "flow/ProtocolVersion.h"
#include <cstdint>
#define FDB_API_VERSION 730
#define FDB_USE_LATEST_API_VERSION
#define FDB_INCLUDE_LEGACY_TYPES
#include "fdbclient/MultiVersionTransaction.h"

View File

@ -26,11 +26,18 @@
#define DLLEXPORT
#endif
#include "fdb_c_apiversion.g.h"
#if (defined FDB_USE_LATEST_API_VERSION)
#define FDB_API_VERSION FDB_LATEST_API_VERSION
#elif (defined FDB_USE_LATEST_BINDINGS_API_VERSION)
#define FDB_API_VERSION FDB_LATEST_BINDINGS_API_VERSION
#endif
#if !defined(FDB_API_VERSION)
#error You must #define FDB_API_VERSION prior to including fdb_c.h (current version is 730)
#error You must #define FDB_API_VERSION prior to including fdb_c.h (the latest version is defined as FDB_LATEST_API_VERSION)
#elif FDB_API_VERSION < 13
#error API version no longer supported (upgrade to 13)
#elif FDB_API_VERSION > 730
#elif FDB_API_VERSION > FDB_LATEST_API_VERSION
#error Requested API version requires a newer version of this header
#endif
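
As a usage sketch (illustrative; this example is not part of the commit), a client translation unit that wants to track the newest API automatically can opt in before including the header, assuming foundationdb/fdb_c.h and the generated fdb_c_apiversion.g.h are on the include path:

#define FDB_USE_LATEST_API_VERSION
#include <foundationdb/fdb_c.h>

int main() {
	// FDB_API_VERSION now expands to FDB_LATEST_API_VERSION from the generated header
	fdb_select_api_version(FDB_API_VERSION);
	return 0;
}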

View File

@ -0,0 +1,41 @@
#ifndef FDB_C_APIVERSION_G_H
#define FDB_C_APIVERSION_G_H
#pragma once
/*
* fdb_c_apiversion.g.h
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2023 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the 'License');
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an 'AS IS' BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* Do not include this file directly.
*/
/* The latest FDB C API version */
#define FDB_LATEST_API_VERSION @FDB_AV_LATEST_VERSION@
/* The latest FDB API version supported by bindings. It may lag behind the latest C API version */
#define FDB_LATEST_BINDINGS_API_VERSION @FDB_AV_LATEST_BINDINGS_VERSION@
/* API version introducing client_tmp_dir option */
#define FDB_API_VERSION_CLIENT_TMP_DIR @FDB_AV_CLIENT_TMP_DIR@
/* API version introducing disable_client_bypass option */
#define FDB_API_VERSION_DISABLE_CLIENT_BYPASS @FDB_AV_DISABLE_CLIENT_BYPASS@
/* API version with multitenancy API released */
#define FDB_API_VERSION_TENANT_API_RELEASED @FDB_AV_TENANT_API_RELEASED@
#endif
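
For reference, with the values this commit sets in the API versions file shown later in this diff (FDB_AV_LATEST_VERSION 730, FDB_AV_LATEST_BINDINGS_VERSION 720, and 720 for the three feature versions), configure_file would render the template approximately as:

#define FDB_LATEST_API_VERSION 730
#define FDB_LATEST_BINDINGS_API_VERSION 720
#define FDB_API_VERSION_CLIENT_TMP_DIR 720
#define FDB_API_VERSION_DISABLE_CLIENT_BYPASS 720
#define FDB_API_VERSION_TENANT_API_RELEASED 720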

View File

@ -26,8 +26,9 @@
#include <string>
#include <unordered_map>
#include <vector>
#include <foundationdb/fdb_c_apiversion.g.h>
#define FDB_API_VERSION 730
#define FDB_API_VERSION FDB_LATEST_API_VERSION
namespace FdbApiTester {

View File

@ -36,8 +36,6 @@ namespace FdbApiTester {
namespace {
#define API_VERSION_CLIENT_TMP_DIR 720
enum TesterOptionId {
OPT_CONNFILE,
OPT_HELP,
@ -294,7 +292,7 @@ void fdb_check(fdb::Error e, std::string_view msg, fdb::Error::CodeType expected
}
void applyNetworkOptions(TesterOptions& options) {
if (!options.tmpDir.empty() && options.apiVersion >= API_VERSION_CLIENT_TMP_DIR) {
if (!options.tmpDir.empty() && options.apiVersion >= FDB_API_VERSION_CLIENT_TMP_DIR) {
fdb::network::setOption(FDBNetworkOption::FDB_NET_OPTION_CLIENT_TMP_DIR, options.tmpDir);
}
if (!options.externalClientLibrary.empty()) {
@ -329,7 +327,7 @@ void applyNetworkOptions(TesterOptions& options) {
fdb::network::setOption(FDBNetworkOption::FDB_NET_OPTION_CLIENT_BUGGIFY_ENABLE);
}
if (options.testSpec.disableClientBypass && options.apiVersion >= 720) {
if (options.testSpec.disableClientBypass && options.apiVersion >= FDB_API_VERSION_DISABLE_CLIENT_BYPASS) {
fdb::network::setOption(FDBNetworkOption::FDB_NET_OPTION_DISABLE_CLIENT_BYPASS);
}

View File

@ -45,8 +45,6 @@
#error Unsupported platform
#endif
#define API_VERSION_CLIENT_TMP_DIR 720
using namespace std::string_view_literals;
namespace {
@ -276,7 +274,7 @@ std::string stringToUpper(const std::string& str) {
}
void applyNetworkOptions() {
if (!options.tmpDir.empty() && options.apiVersion >= API_VERSION_CLIENT_TMP_DIR) {
if (!options.tmpDir.empty() && options.apiVersion >= FDB_API_VERSION_CLIENT_TMP_DIR) {
fdb::network::setOption(FDBNetworkOption::FDB_NET_OPTION_CLIENT_TMP_DIR, options.tmpDir);
}
if (!options.externalClientLibrary.empty()) {

View File

@ -18,7 +18,7 @@
* limitations under the License.
*/
#define FDB_API_VERSION 730
#define FDB_USE_LATEST_API_VERSION
#include <foundationdb/fdb_c.h>
#include "unit/fdb_api.hpp"

View File

@ -23,7 +23,7 @@
#pragma once
#ifndef FDB_API_VERSION
#define FDB_API_VERSION 730
#define FDB_USE_LATEST_API_VERSION
#endif
#include <cassert>
@ -46,8 +46,6 @@ namespace native {
#include <foundationdb/fdb_c.h>
}
#define TENANT_API_VERSION_GUARD 720
using ByteString = std::basic_string<uint8_t>;
using BytesRef = std::basic_string_view<uint8_t>;
using CharsRef = std::string_view;
@ -1088,7 +1086,7 @@ public:
};
inline Error selectApiVersionNothrow(int version) {
if (version < TENANT_API_VERSION_GUARD) {
if (version < FDB_API_VERSION_TENANT_API_RELEASED) {
Tenant::tenantManagementMapPrefix = "\xff\xff/management/tenant_map/";
}
return Error(native::fdb_select_api_version(version));
@ -1101,7 +1099,7 @@ inline void selectApiVersion(int version) {
}
inline Error selectApiVersionCappedNothrow(int version) {
if (version < TENANT_API_VERSION_GUARD) {
if (version < FDB_API_VERSION_TENANT_API_RELEASED) {
Tenant::tenantManagementMapPrefix = "\xff\xff/management/tenant_map/";
}
return Error(

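An illustrative call site (an assumption sketched for clarity, not code from this commit): with the tenant guard above replaced by the released-version constant, a test binary can select the header's version without hard-coding a number:

#define FDB_USE_LATEST_API_VERSION
#include "unit/fdb_api.hpp"

void initApi() {
	// FDB_API_VERSION resolves to FDB_LATEST_API_VERSION via fdb_c.h
	fdb::selectApiVersion(FDB_API_VERSION); // throws on failure (see the nothrow variant above)
}
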
View File

@ -22,7 +22,7 @@
#define MAKO_HPP
#ifndef FDB_API_VERSION
#define FDB_API_VERSION 730
#define FDB_USE_LATEST_API_VERSION
#endif
#include <array>

View File

@ -29,7 +29,7 @@
#include <inttypes.h>
#ifndef FDB_API_VERSION
#define FDB_API_VERSION 730
#define FDB_USE_LATEST_API_VERSION
#endif
#include <foundationdb/fdb_c.h>

View File

@ -20,7 +20,7 @@
// Unit tests that test the timeouts for a disconnected cluster
#define FDB_API_VERSION 730
#define FDB_USE_LATEST_API_VERSION
#include <foundationdb/fdb_c.h>
#include <chrono>

View File

@ -39,7 +39,7 @@
#pragma once
#define FDB_API_VERSION 730
#define FDB_USE_LATEST_API_VERSION
#include <foundationdb/fdb_c.h>
#include <string>

View File

@ -20,7 +20,7 @@
// Unit tests for API setup, network initialization functions from the FDB C API.
#define FDB_API_VERSION 730
#define FDB_USE_LATEST_API_VERSION
#include <foundationdb/fdb_c.h>
#include <iostream>
#include <thread>

View File

@ -26,7 +26,7 @@
#include "flow/Platform.h"
#define FDB_API_VERSION 720
#define FDB_USE_LATEST_API_VERSION
#include "foundationdb/fdb_c.h"
#undef NDEBUG

View File

@ -21,7 +21,7 @@
// Unit tests for the FoundationDB C API.
#include "fdb_c_options.g.h"
#define FDB_API_VERSION 730
#define FDB_USE_LATEST_API_VERSION
#include <foundationdb/fdb_c.h>
#include <assert.h>
#include <string.h>

View File

@ -18,7 +18,7 @@
* limitations under the License.
*/
#define FDB_API_VERSION 730
#define FDB_USE_LATEST_API_VERSION
#include "foundationdb/fdb_c.h"
#undef DLLEXPORT
#include "workloads.h"

View File

@ -23,7 +23,7 @@
#include <flow/flow.h>
#define FDB_API_VERSION 720
#define FDB_USE_LATEST_BINDINGS_API_VERSION
#include <bindings/c/foundationdb/fdb_c.h>
#undef DLLEXPORT

View File

@ -114,6 +114,7 @@ set(JAVA_TESTS_SRCS
src/test/com/apple/foundationdb/test/StackOperation.java
src/test/com/apple/foundationdb/test/StackTester.java
src/test/com/apple/foundationdb/test/StackUtils.java
src/test/com/apple/foundationdb/test/TestApiVersion.java
src/test/com/apple/foundationdb/test/TesterArgs.java
src/test/com/apple/foundationdb/test/TestResult.java
src/test/com/apple/foundationdb/test/TuplePerformanceTest.java
@ -139,6 +140,12 @@ set(GENERATED_JAVA_FILES
vexillographer_compile(TARGET fdb_java_options LANG java OUT ${GENERATED_JAVA_DIR}
OUTPUT ${GENERATED_JAVA_FILES})
if(NOT DEFINED FDB_API_VERSION_FILE)
message(FATAL_ERROR "Missing definitions of API versions")
endif()
include(${FDB_API_VERSION_FILE})
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/src/main/com/apple/foundationdb/ApiVersion.java.cmake ${GENERATED_JAVA_DIR}/ApiVersion.java)
set(SYSTEM_NAME "linux")
if(APPLE)
set(SYSTEM_NAME "osx")
@ -198,7 +205,7 @@ file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/resources/META-INF)
set(MANIFEST_FILE ${CMAKE_CURRENT_BINARY_DIR}/resources/META-INF/MANIFEST.MF)
file(WRITE ${MANIFEST_FILE} ${MANIFEST_TEXT})
add_jar(fdb-java ${JAVA_BINDING_SRCS} ${GENERATED_JAVA_FILES} ${CMAKE_SOURCE_DIR}/LICENSE
add_jar(fdb-java ${JAVA_BINDING_SRCS} ${GENERATED_JAVA_FILES} ${CMAKE_SOURCE_DIR}/LICENSE ${GENERATED_JAVA_DIR}/ApiVersion.java
OUTPUT_DIR ${PROJECT_BINARY_DIR}/lib VERSION ${FDB_VERSION} MANIFEST ${MANIFEST_FILE} GENERATE_NATIVE_HEADERS fdb_java_native)
add_dependencies(fdb-java fdb_java_options)

View File

@ -19,7 +19,7 @@
*/
#include <foundationdb/ClientWorkload.h>
#define FDB_API_VERSION 720
#define FDB_USE_LATEST_BINDINGS_API_VERSION
#include <foundationdb/fdb_c.h>
#include "com_apple_foundationdb_testing_AbstractWorkload.h"

View File

@ -36,7 +36,7 @@
#include "com_apple_foundationdb_FutureStrings.h"
#include "com_apple_foundationdb_NativeFuture.h"
#define FDB_API_VERSION 720
#define FDB_USE_LATEST_BINDINGS_API_VERSION
#include <foundationdb/fdb_c.h>

View File

@ -33,8 +33,7 @@ import org.junit.jupiter.api.extension.ExtendWith;
*/
@ExtendWith(RequiresDatabase.class)
class BlobGranuleIntegrationTest {
public static final int API_VERSION = 720;
private static final FDB fdb = FDB.selectAPIVersion(API_VERSION);
private static final FDB fdb = FDB.selectAPIVersion(ApiVersion.LATEST);
@BeforeEach
@AfterEach

View File

@ -40,8 +40,6 @@ import org.junit.jupiter.api.Assertions;
* This test is to verify the atomicity of transactions.
*/
public class CycleMultiClientIntegrationTest {
public static final int API_VERSION = 720;
public static final MultiClientHelper clientHelper = new MultiClientHelper();
// more write txn than validate txn, as parent thread waits only for validate txn.
@ -53,7 +51,7 @@ public class CycleMultiClientIntegrationTest {
private static List<String> expected = new ArrayList<>(Arrays.asList("0", "1", "2", "3"));
public static void main(String[] args) throws Exception {
FDB fdb = FDB.selectAPIVersion(API_VERSION);
FDB fdb = FDB.selectAPIVersion(ApiVersion.LATEST);
setupThreads(fdb);
Collection<Database> dbs = clientHelper.openDatabases(fdb); // the clientHelper will close the databases for us
System.out.println("Starting tests");

View File

@ -40,8 +40,7 @@ import org.junit.jupiter.api.extension.ExtendWith;
*/
@ExtendWith(RequiresDatabase.class)
class DirectoryTest {
public static final int API_VERSION = 720;
private static final FDB fdb = FDB.selectAPIVersion(API_VERSION);
private static final FDB fdb = FDB.selectAPIVersion(ApiVersion.LATEST);
@Test
void testCanCreateDirectory() throws Exception {

View File

@ -27,8 +27,7 @@ import org.junit.jupiter.api.Test;
* all tests will be skipped if it can't connect to a running instance relatively quickly.
*/
class GetClientStatusIntegrationTest {
public static final int API_VERSION = 720;
private static final FDB fdb = FDB.selectAPIVersion(API_VERSION);
private static final FDB fdb = FDB.selectAPIVersion(ApiVersion.LATEST);
@Test
public void clientStatusIsHealthy() throws Exception {

View File

@ -41,8 +41,7 @@ import org.junit.jupiter.api.extension.ExtendWith;
@ExtendWith(RequiresDatabase.class)
class MappedRangeQueryIntegrationTest {
public static final int API_VERSION = 720;
private static final FDB fdb = FDB.selectAPIVersion(API_VERSION);
private static final FDB fdb = FDB.selectAPIVersion(ApiVersion.LATEST);
public String databaseArg = null;
private Database openFDB() { return fdb.open(databaseArg); }
@ -111,7 +110,7 @@ class MappedRangeQueryIntegrationTest {
boolean validate = true;
@Test
void comparePerformance() {
FDB fdb = FDB.selectAPIVersion(API_VERSION);
FDB fdb = FDB.selectAPIVersion(ApiVersion.LATEST);
try (Database db = openFDB()) {
insertRecordsWithIndexes(numRecords, db);
instrument(rangeQueryAndThenRangeQueries, "rangeQueryAndThenRangeQueries", db);

View File

@ -41,8 +41,7 @@ import org.junit.jupiter.api.extension.ExtendWith;
*/
@ExtendWith(RequiresDatabase.class)
class RangeQueryIntegrationTest {
public static final int API_VERSION = 720;
private static final FDB fdb = FDB.selectAPIVersion(API_VERSION);
private static final FDB fdb = FDB.selectAPIVersion(ApiVersion.LATEST);
@BeforeEach
@AfterEach

View File

@ -41,8 +41,6 @@ import org.junit.jupiter.api.Assertions;
* are still seeing the initialValue even after new transactions set them to a new value.
*/
public class RepeatableReadMultiThreadClientTest {
public static final int API_VERSION = 720;
public static final MultiClientHelper clientHelper = new MultiClientHelper();
private static final int oldValueReadCount = 30;
@ -54,7 +52,7 @@ public class RepeatableReadMultiThreadClientTest {
private static final Map<Thread, OldValueReader> threadToOldValueReaders = new HashMap<>();
public static void main(String[] args) throws Exception {
FDB fdb = FDB.selectAPIVersion(API_VERSION);
FDB fdb = FDB.selectAPIVersion(ApiVersion.LATEST);
setupThreads(fdb);
Collection<Database> dbs = clientHelper.openDatabases(fdb); // the clientHelper will close the databases for us
System.out.println("Starting tests");

View File

@ -47,8 +47,6 @@ import org.opentest4j.TestAbortedException;
* be running a server and you don't want to deal with spurious test failures.
*/
public class RequiresDatabase implements ExecutionCondition, BeforeAllCallback {
public static final int API_VERSION = 720;
public static boolean canRunIntegrationTest() {
String prop = System.getProperty("run.integration.tests");
if (prop == null) {
@ -81,7 +79,7 @@ public class RequiresDatabase implements ExecutionCondition, BeforeAllCallback {
* assume that if we are here, then canRunIntegrationTest() is returning true and we don't have to bother
* checking it.
*/
try (Database db = FDB.selectAPIVersion(API_VERSION).open()) {
try (Database db = FDB.selectAPIVersion(ApiVersion.LATEST).open()) {
db.run(tr -> {
CompletableFuture<byte[]> future = tr.get("test".getBytes());

View File

@ -19,8 +19,6 @@ import org.junit.jupiter.api.Assertions;
* This test is to verify the causal consistency of transactions for a multi-threaded client.
*/
public class SidebandMultiThreadClientTest {
public static final int API_VERSION = 720;
public static final MultiClientHelper clientHelper = new MultiClientHelper();
private static final Map<Database, BlockingQueue<String>> db2Queues = new HashMap<>();
@ -28,7 +26,7 @@ public class SidebandMultiThreadClientTest {
private static final int txnCnt = 1000;
public static void main(String[] args) throws Exception {
FDB fdb = FDB.selectAPIVersion(API_VERSION);
FDB fdb = FDB.selectAPIVersion(ApiVersion.LATEST);
setupThreads(fdb);
Collection<Database> dbs = clientHelper.openDatabases(fdb); // the clientHelper will close the databases for us
for (Database db : dbs) {

View File

@ -33,8 +33,7 @@ import org.junit.jupiter.api.extension.ExtendWith;
*/
@ExtendWith(RequiresDatabase.class)
public class TransactionIntegrationTest {
public static final int API_VERSION = 720;
private static final FDB fdb = FDB.selectAPIVersion(API_VERSION);
private static final FDB fdb = FDB.selectAPIVersion(ApiVersion.LATEST);
@Test
public void testOperationsAfterCommit() throws Exception {

View File

@ -29,8 +29,6 @@ import org.junit.jupiter.api.extension.ExtensionContext;
* are not available for any reason.
*/
public class FDBLibraryRule implements BeforeAllCallback {
public static final int CURRENT_API_VERSION = 720;
private final int apiVersion;
// because FDB is a singleton (currently), this isn't a super-useful cache,
@ -39,7 +37,7 @@ public class FDBLibraryRule implements BeforeAllCallback {
public FDBLibraryRule(int apiVersion) { this.apiVersion = apiVersion; }
public static FDBLibraryRule current() { return new FDBLibraryRule(CURRENT_API_VERSION); }
public static FDBLibraryRule current() { return new FDBLibraryRule(ApiVersion.LATEST); }
public static FDBLibraryRule v63() { return new FDBLibraryRule(630); }

View File

@ -0,0 +1,30 @@
/*
* ApiVersion.java
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2023 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.apple.foundationdb;
/**
* FDB API version constants
*/
public class ApiVersion {
/**
* The latest API version supported by the bindings
*/
public static final int LATEST = @FDB_AV_LATEST_BINDINGS_VERSION@;
}

View File

@ -36,8 +36,8 @@ import com.apple.foundationdb.tuple.ByteArrayUtil;
* The FoundationDB API is accessed with a call to {@link #selectAPIVersion(int)}.
* This call is required before using any other part of the API. The call allows
* an error to be thrown at this point to prevent client code from accessing a later library
* with incorrect assumptions from the current version. The API version documented here is version
* {@code 720}.<br><br>
* with incorrect assumptions from the current version. The latest supported API version
is defined as {@link ApiVersion#LATEST}.<br><br>
* FoundationDB encapsulates multiple versions of its interface by requiring
* the client to explicitly specify the version of the API it uses. The purpose
* of this design is to allow you to upgrade the server, client libraries, or
@ -185,8 +185,8 @@ public class FDB {
}
if(version < 510)
throw new IllegalArgumentException("API version not supported (minimum 510)");
if(version > 720)
throw new IllegalArgumentException("API version not supported (maximum 720)");
if(version > ApiVersion.LATEST)
throw new IllegalArgumentException(String.format("API version not supported (maximum %d)", ApiVersion.LATEST));
Select_API_version(version);
singleton = new FDB(version);

View File

@ -13,7 +13,7 @@ and then added to your classpath.<br>
<h1>Getting started</h1>
To start using FoundationDB from Java, create an instance of the
{@link com.apple.foundationdb.FDB FoundationDB API interface} with the version of the
API that you want to use (this release of the FoundationDB Java API supports versions between {@code 510} and {@code 720}).
API that you want to use (this release of the FoundationDB Java API supports versions between {@code 510} and {@link ApiVersion#LATEST}).
With this API object you can then open {@link com.apple.foundationdb.Cluster Cluster}s and
{@link com.apple.foundationdb.Database Database}s and start using
{@link com.apple.foundationdb.Transaction Transaction}s.
@ -28,10 +28,8 @@ import com.apple.foundationdb.FDB;
import com.apple.foundationdb.tuple.Tuple;
public class Example {
public static final int apiVersion = 720;
public static void main(String[] args) {
FDB fdb = FDB.selectAPIVersion(apiVersion);
FDB fdb = FDB.selectAPIVersion(ApiVersion.LATEST);
try(Database db = fdb.open()) {
// Run an operation on the database

View File

@ -27,7 +27,6 @@ import com.apple.foundationdb.Database;
import com.apple.foundationdb.FDB;
public abstract class AbstractTester {
public static final int API_VERSION = 720;
protected static final int NUM_RUNS = 25;
protected static final Charset ASCII = Charset.forName("ASCII");
@ -79,7 +78,7 @@ public abstract class AbstractTester {
args = TesterArgs.parseArgs(argStrings);
if (args == null) return;
fdb = FDB.selectAPIVersion(API_VERSION);
fdb = FDB.selectAPIVersion(TestApiVersion.CURRENT);
// Validate argument combinations and set options.
if (!args.useMultiversionApi()) {

View File

@ -29,13 +29,11 @@ import com.apple.foundationdb.FDB;
import com.apple.foundationdb.Transaction;
public class BlockingBenchmark {
public static final int API_VERSION = 720;
private static final int REPS = 100000;
private static final int PARALLEL = 100;
public static void main(String[] args) throws InterruptedException {
FDB fdb = FDB.selectAPIVersion(API_VERSION);
FDB fdb = FDB.selectAPIVersion(TestApiVersion.CURRENT);
// The cluster file DOES NOT need to be valid, although it must exist.
// This is because the database is never really contacted in this test.

View File

@ -30,8 +30,6 @@ import com.apple.foundationdb.Database;
import com.apple.foundationdb.FDB;
public class ConcurrentGetSetGet {
public static final int API_VERSION = 720;
public static final Charset UTF8 = Charset.forName("UTF-8");
final Semaphore semaphore = new Semaphore(CONCURRENCY);
@ -50,7 +48,7 @@ public class ConcurrentGetSetGet {
}
public static void main(String[] args) {
try(Database database = FDB.selectAPIVersion(API_VERSION).open()) {
try(Database database = FDB.selectAPIVersion(TestApiVersion.CURRENT).open()) {
new ConcurrentGetSetGet().apply(database);
}
}

View File

@ -20,15 +20,15 @@
package com.apple.foundationdb.test;
import com.apple.foundationdb.ApiVersion;
import com.apple.foundationdb.Database;
import com.apple.foundationdb.FDB;
import com.apple.foundationdb.tuple.Tuple;
public class Example {
public static final int API_VERSION = 720;
public static void main(String[] args) {
FDB fdb = FDB.selectAPIVersion(API_VERSION);
FDB fdb = FDB.selectAPIVersion(ApiVersion.LATEST);
try(Database db = fdb.open()) {
// Run an operation on the database

View File

@ -28,12 +28,11 @@ import com.apple.foundationdb.KeyValue;
import com.apple.foundationdb.TransactionContext;
public class IterableTest {
public static final int API_VERSION = 720;
public static void main(String[] args) throws InterruptedException {
final int reps = 1000;
try {
FDB fdb = FDB.selectAPIVersion(API_VERSION);
FDB fdb = FDB.selectAPIVersion(TestApiVersion.CURRENT);
try(Database db = fdb.open()) {
runTests(reps, db);
}

View File

@ -27,15 +27,14 @@ import com.apple.foundationdb.Database;
import com.apple.foundationdb.FDB;
import com.apple.foundationdb.LocalityUtil;
import com.apple.foundationdb.Transaction;
import com.apple.foundationdb.async.CloseableAsyncIterator;
import com.apple.foundationdb.async.AsyncUtil;
import com.apple.foundationdb.async.CloseableAsyncIterator;
import com.apple.foundationdb.tuple.ByteArrayUtil;
public class LocalityTests {
public static final int API_VERSION = 720;
public static void main(String[] args) {
FDB fdb = FDB.selectAPIVersion(API_VERSION);
FDB fdb = FDB.selectAPIVersion(TestApiVersion.CURRENT);
try(Database database = fdb.open(args[0])) {
try(Transaction tr = database.createTransaction()) {
String[] keyAddresses = LocalityUtil.getAddressesForKey(tr, "a".getBytes()).join();

View File

@ -36,8 +36,6 @@ import com.apple.foundationdb.async.AsyncIterator;
import com.apple.foundationdb.tuple.ByteArrayUtil;
public class ParallelRandomScan {
public static final int API_VERSION = 720;
private static final int ROWS = 1000000;
private static final int DURATION_MS = 2000;
private static final int PARALLELISM_MIN = 10;
@ -45,7 +43,7 @@ public class ParallelRandomScan {
private static final int PARALLELISM_STEP = 5;
public static void main(String[] args) throws InterruptedException {
FDB api = FDB.selectAPIVersion(API_VERSION);
FDB api = FDB.selectAPIVersion(TestApiVersion.CURRENT);
try(Database database = api.open(args[0])) {
for(int i = PARALLELISM_MIN; i <= PARALLELISM_MAX; i += PARALLELISM_STEP) {
runTest(database, i, ROWS, DURATION_MS);

View File

@ -34,11 +34,10 @@ import com.apple.foundationdb.Transaction;
import com.apple.foundationdb.async.AsyncIterable;
public class RangeTest {
private static final int API_VERSION = 720;
public static void main(String[] args) {
System.out.println("About to use version " + API_VERSION);
FDB fdb = FDB.selectAPIVersion(API_VERSION);
System.out.println("About to use version " + TestApiVersion.CURRENT);
FDB fdb = FDB.selectAPIVersion(TestApiVersion.CURRENT);
try(Database db = fdb.open()) {
try {

View File

@ -29,14 +29,12 @@ import com.apple.foundationdb.FDB;
import com.apple.foundationdb.Transaction;
public class SerialInsertion {
public static final int API_VERSION = 720;
private static final int THREAD_COUNT = 10;
private static final int BATCH_SIZE = 1000;
private static final int NODES = 1000000;
public static void main(String[] args) {
FDB api = FDB.selectAPIVersion(API_VERSION);
FDB api = FDB.selectAPIVersion(TestApiVersion.CURRENT);
try(Database database = api.open()) {
long start = System.currentTimeMillis();

View File

@ -34,14 +34,12 @@ import com.apple.foundationdb.Transaction;
import com.apple.foundationdb.async.AsyncIterable;
public class SerialIteration {
public static final int API_VERSION = 720;
private static final int ROWS = 1000000;
private static final int RUNS = 25;
private static final int THREAD_COUNT = 1;
public static void main(String[] args) throws InterruptedException {
FDB api = FDB.selectAPIVersion(API_VERSION);
FDB api = FDB.selectAPIVersion(TestApiVersion.CURRENT);
try(Database database = api.open(args[0])) {
for(int i = 1; i <= THREAD_COUNT; i++) {
runThreadedTest(database, i);

View File

@ -27,12 +27,11 @@ import com.apple.foundationdb.FDB;
import com.apple.foundationdb.TransactionContext;
public class SerialTest {
public static final int API_VERSION = 720;
public static void main(String[] args) throws InterruptedException {
final int reps = 1000;
try {
FDB fdb = FDB.selectAPIVersion(API_VERSION);
FDB fdb = FDB.selectAPIVersion(TestApiVersion.CURRENT);
try(Database db = fdb.open()) {
runTests(reps, db);
}

View File

@ -35,13 +35,11 @@ import com.apple.foundationdb.tuple.Tuple;
* Some tests regarding conflict ranges to make sure they do what we expect.
*/
public class SnapshotTransactionTest {
public static final int API_VERSION = 720;
private static final int CONFLICT_CODE = 1020;
private static final Subspace SUBSPACE = new Subspace(Tuple.from("test", "conflict_ranges"));
public static void main(String[] args) {
FDB fdb = FDB.selectAPIVersion(API_VERSION);
FDB fdb = FDB.selectAPIVersion(TestApiVersion.CURRENT);
try(Database db = fdb.open()) {
snapshotReadShouldNotConflict(db);
snapshotShouldNotAddConflictRange(db);

View File

@ -0,0 +1,32 @@
/*
* TestApiVersion.java
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2023 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.apple.foundationdb.test;
/**
* Declares the last tested API version for standalone Java tests & benchmarks
* that are not run as part of CI. These tests should be retested
* manually after updating the API version.
*/
public class TestApiVersion {
/**
* The current API version to be used by the tests
*/
public static final int CURRENT = 720;
}

View File

@ -32,14 +32,12 @@ import com.apple.foundationdb.tuple.Tuple;
import com.apple.foundationdb.tuple.Versionstamp;
public class TupleTest {
public static final int API_VERSION = 720;
private static final byte FF = (byte)0xff;
public static void main(String[] args) throws NoSuchFieldException {
final int reps = 1000;
try {
FDB fdb = FDB.selectAPIVersion(API_VERSION);
FDB fdb = FDB.selectAPIVersion(TestApiVersion.CURRENT);
try(Database db = fdb.open()) {
runTests(reps, db);
}

View File

@ -31,10 +31,9 @@ import com.apple.foundationdb.tuple.Tuple;
import com.apple.foundationdb.tuple.Versionstamp;
public class VersionstampSmokeTest {
public static final int API_VERSION = 720;
public static void main(String[] args) {
FDB fdb = FDB.selectAPIVersion(API_VERSION);
FDB fdb = FDB.selectAPIVersion(TestApiVersion.CURRENT);
try(Database db = fdb.open()) {
db.run(tr -> {
tr.clear(Tuple.from("prefix").range());

View File

@ -32,10 +32,9 @@ import com.apple.foundationdb.FDBException;
import com.apple.foundationdb.Transaction;
public class WatchTest {
public static final int API_VERSION = 720;
public static void main(String[] args) {
FDB fdb = FDB.selectAPIVersion(API_VERSION);
FDB fdb = FDB.selectAPIVersion(TestApiVersion.CURRENT);
try(Database database = fdb.open(args[0])) {
database.options().setLocationCacheSize(42);
try(Transaction tr = database.createTransaction()) {

View File

@ -39,9 +39,16 @@ set(options_file ${PROJECT_BINARY_DIR}/bindings/python/fdb/fdboptions.py)
vexillographer_compile(TARGET fdb_python_options LANG python OUT ${options_file}
OUTPUT ${options_file})
if(NOT DEFINED FDB_API_VERSION_FILE)
message(FATAL_ERROR "Missing definitions of API versions")
endif()
include(${FDB_API_VERSION_FILE})
set(apiversion_file ${CMAKE_CURRENT_BINARY_DIR}/fdb/apiversion.py)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/fdb/apiversion.py.cmake ${apiversion_file})
add_dependencies(python_binding fdb_python_options)
set(out_files "${out_files};${options_file}")
set(out_files "${out_files};${options_file};${apiversion_file}")
# TODO[mpilman]: it is not clear whether we want to have rpms for python
#install(FILES ${out_files} DESTINATION ${FDB_PYTHON_INSTALL_DIR} COMPONENT python)

View File

@ -23,6 +23,10 @@
"""Documentation for this API can be found at
https://apple.github.io/foundationdb/api-python.html"""
import fdb.apiversion
LATEST_API_VERSION = fdb.apiversion.LATEST_API_VERSION
def open(*args, **kwargs):
raise RuntimeError("You must call api_version() before using any fdb methods")
@ -52,7 +56,7 @@ def get_api_version():
def api_version(ver):
header_version = 720
header_version = LATEST_API_VERSION
if "_version" in globals():
if globals()["_version"] != ver:

View File

@ -0,0 +1 @@
LATEST_API_VERSION = @FDB_AV_LATEST_BINDINGS_VERSION@

View File

@ -280,7 +280,7 @@ def transactional(*tr_args, **tr_kwargs):
@functools.wraps(func)
def wrapper(*args, **kwargs):
# We can't throw this from the decorator, as when a user runs
# >>> import fdb ; fdb.api_version(720)
# >>> import fdb ; fdb.api_version(fdb.LATEST_API_VERSION)
# the code above uses @transactional before the API version is set
if fdb.get_api_version() >= 630 and inspect.isgeneratorfunction(func):
raise ValueError(

View File

@ -22,7 +22,7 @@ import fdb
import sys
if __name__ == '__main__':
fdb.api_version(720)
fdb.api_version(fdb.LATEST_API_VERSION)
@fdb.transactional

View File

@ -25,7 +25,7 @@ import base64
from fdb.tuple import pack
if __name__ == '__main__':
fdb.api_version(720)
fdb.api_version(fdb.LATEST_API_VERSION)
def cleanup_tenant(db, tenant_name):

View File

@ -29,7 +29,7 @@ import json
import fdb
if __name__ == "__main__":
fdb.api_version(720)
fdb.api_version(fdb.LATEST_API_VERSION)
from cancellation_timeout_tests import test_timeouts
from cancellation_timeout_tests import test_db_timeouts

View File

@ -393,7 +393,7 @@ function(prepare_binding_test_files build_directory target_name target_dependenc
COMMAND ${CMAKE_COMMAND} -E copy $<TARGET_FILE:fdb_flow_tester> ${build_directory}/tests/flow/bin/fdb_flow_tester
COMMENT "Copy Flow tester for bindingtester")
set(generated_binding_files python/fdb/fdboptions.py)
set(generated_binding_files python/fdb/fdboptions.py python/fdb/apiversion.py)
if(WITH_JAVA_BINDING)
if(NOT FDB_RELEASE)
set(not_fdb_release_string "-SNAPSHOT")

View File

@ -83,7 +83,7 @@ void ClientKnobs::initialize(Randomize randomize) {
init( CHANGE_FEED_POP_TIMEOUT, 10.0 );
init( CHANGE_FEED_STREAM_MIN_BYTES, 1e4 ); if( randomize && BUGGIFY ) CHANGE_FEED_STREAM_MIN_BYTES = 1;
init( CHANGE_FEED_START_INTERVAL, 20.0 ); if( randomize && BUGGIFY ) CHANGE_FEED_START_INTERVAL = 10.0;
init( CHANGE_FEED_COALESCE_LOCATIONS, true ); if( randomize && BUGGIFY ) CHANGE_FEED_COALESCE_LOCATIONS = false;
init( CHANGE_FEED_COALESCE_LOCATIONS, false ); if( randomize && BUGGIFY ) CHANGE_FEED_COALESCE_LOCATIONS = false;
init( MAX_BATCH_SIZE, 1000 ); if( randomize && BUGGIFY ) MAX_BATCH_SIZE = 1;
init( GRV_BATCH_TIMEOUT, 0.005 ); if( randomize && BUGGIFY ) GRV_BATCH_TIMEOUT = 0.1;

View File

@ -1099,7 +1099,11 @@ void DLApi::init() {
fdbCPath,
"fdb_tenant_flush_blob_range",
headerVersion >= ApiVersion::withTenantBlobRangeApi().version());
loadClientFunction(&api->tenantGetId, lib, fdbCPath, "fdb_tenant_get_id", headerVersion >= 730);
loadClientFunction(&api->tenantGetId,
lib,
fdbCPath,
"fdb_tenant_get_id",
headerVersion >= ApiVersion::withTenantGetId().version());
loadClientFunction(&api->tenantDestroy, lib, fdbCPath, "fdb_tenant_destroy", headerVersion >= 710);
loadClientFunction(&api->transactionSetOption, lib, fdbCPath, "fdb_transaction_set_option", headerVersion >= 0);

View File

@ -156,6 +156,9 @@ FDB_DEFINE_BOOLEAN_PARAM(BalanceOnRequests);
// Whether or not a request should include the tenant name
FDB_BOOLEAN_PARAM(UseTenant);
// Whether a blob granule request is a request for the mapping to read, or a request to get granule boundaries
FDB_BOOLEAN_PARAM(JustGranules);
NetworkOptions networkOptions;
TLSConfig tlsConfig(TLSEndpointType::CLIENT);
@ -1685,7 +1688,7 @@ DatabaseContext::DatabaseContext(Reference<AsyncVar<Reference<IClusterConnection
std::make_unique<ClientProfilingImpl>(
KeyRangeRef("profiling/"_sr, "profiling0"_sr)
.withPrefix(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::MANAGEMENT).begin)),
/* deprecated */ 720);
/* deprecated */ ApiVersion::withClientProfilingDeprecated().version());
registerSpecialKeysImpl(
SpecialKeySpace::MODULE::MANAGEMENT,
SpecialKeySpace::IMPLTYPE::READWRITE,
@ -3210,13 +3213,13 @@ Future<std::vector<KeyRangeLocationInfo>> getKeyRangeLocations(Reference<Transac
: latestVersion);
}
ACTOR Future<std::vector<std::pair<KeyRange, BlobWorkerInterface>>> getBlobGranuleLocations_internal(
ACTOR Future<std::vector<std::pair<KeyRange, UID>>> getBlobGranuleLocations_internal(
Database cx,
TenantInfo tenant,
KeyRange keys,
int limit,
Reverse reverse,
bool justGranules,
JustGranules justGranules,
SpanContext spanContext,
Optional<UID> debugID,
UseProvisionalProxies useProvisionalProxies,
@ -3253,8 +3256,11 @@ ACTOR Future<std::vector<std::pair<KeyRange, BlobWorkerInterface>>> getBlobGranu
ASSERT(!rep.more || !rep.results.empty());
*more = rep.more;
state std::vector<std::pair<KeyRange, BlobWorkerInterface>> results;
state std::vector<std::pair<KeyRange, UID>> results;
state int granule = 0;
for (auto& bwInterf : rep.bwInterfs) {
cx->blobWorker_interf.insert({ bwInterf.id(), bwInterf });
}
for (; granule < rep.results.size(); granule++) {
// FIXME: cache mapping?
KeyRange range(toPrefixRelativeRange(rep.results[granule].first, tenant.prefix));
@ -3272,13 +3278,12 @@ ACTOR Future<std::vector<std::pair<KeyRange, BlobWorkerInterface>>> getBlobGranu
}
// Get the Blob Worker locations for each granule in the 'keys' key-range, similar to getKeyRangeLocations
Future<std::vector<std::pair<KeyRange, BlobWorkerInterface>>> getBlobGranuleLocations(
Database const& cx,
Future<std::vector<std::pair<KeyRange, UID>>> getBlobGranuleLocations(Database const& cx,
TenantInfo const& tenant,
KeyRange const& keys,
int limit,
Reverse reverse,
bool justGranules,
JustGranules justGranules,
SpanContext const& spanContext,
Optional<UID> const& debugID,
UseProvisionalProxies useProvisionalProxies,
@ -3292,13 +3297,12 @@ Future<std::vector<std::pair<KeyRange, BlobWorkerInterface>>> getBlobGranuleLoca
cx, tenant, keys, limit, reverse, justGranules, spanContext, debugID, useProvisionalProxies, version, more);
}
Future<std::vector<std::pair<KeyRange, BlobWorkerInterface>>> getBlobGranuleLocations(
Reference<TransactionState> trState,
Future<std::vector<std::pair<KeyRange, UID>>> getBlobGranuleLocations(Reference<TransactionState> trState,
KeyRange const& keys,
int limit,
Reverse reverse,
UseTenant useTenant,
bool justGranules,
JustGranules justGranules,
bool* more) {
return getBlobGranuleLocations(
trState->cx,
@ -7057,7 +7061,7 @@ void Transaction::setOption(FDBTransactionOptions::Option option, Optional<Strin
case FDBTransactionOptions::USE_GRV_CACHE:
validateOptionValueNotPresent(value);
if (apiVersionAtLeast(720) && !trState->cx->sharedStatePtr) {
if (apiVersionAtLeast(ApiVersion::withGrvCache().version()) && !trState->cx->sharedStatePtr) {
throw invalid_option();
}
if (trState->numErrors == 0) {
@ -8151,8 +8155,8 @@ ACTOR Future<Standalone<VectorRef<KeyRangeRef>>> getBlobGranuleRangesActor(Trans
if (BUGGIFY_WITH_PROB(0.01)) {
remaining = std::min(remaining, deterministicRandom()->randomInt(1, 10));
}
std::vector<std::pair<KeyRange, BlobWorkerInterface>> blobGranuleMapping = wait(getBlobGranuleLocations(
self->trState, currentRange, remaining, Reverse::False, UseTenant::True, true, &more));
std::vector<std::pair<KeyRange, UID>> blobGranuleMapping = wait(getBlobGranuleLocations(
self->trState, currentRange, remaining, Reverse::False, UseTenant::True, JustGranules::True, &more));
for (auto& it : blobGranuleMapping) {
if (!results.empty() && results.back().end > it.first.end) {
ASSERT(results.back().end > it.first.begin);
@ -8231,13 +8235,13 @@ ACTOR Future<Standalone<VectorRef<BlobGranuleChunkRef>>> readBlobGranulesActor(
}
state bool moreMapping = false;
state std::vector<std::pair<KeyRange, BlobWorkerInterface>> blobGranuleMapping =
state std::vector<std::pair<KeyRange, UID>> blobGranuleMapping =
wait(getBlobGranuleLocations(self->trState,
keyRange,
CLIENT_KNOBS->BG_TOO_MANY_GRANULES,
Reverse::False,
UseTenant::True,
false,
JustGranules::False,
&moreMapping));
if (blobGranuleMapping.empty()) {
@ -8253,7 +8257,7 @@ ACTOR Future<Standalone<VectorRef<BlobGranuleChunkRef>>> readBlobGranulesActor(
blobGranuleMapping.size(),
blobGranuleMapping.back().first.begin.printable(),
blobGranuleMapping.back().first.end.printable(),
blobGranuleMapping.back().second.id().shortString());
blobGranuleMapping.back().second.shortString());
}
TraceEvent(SevWarn, "BGMappingTooLarge")
.detail("Range", range)
@ -8270,14 +8274,12 @@ ACTOR Future<Standalone<VectorRef<BlobGranuleChunkRef>>> readBlobGranulesActor(
// Make request for each granule
for (i = 0; i < blobGranuleMapping.size(); i++) {
state KeyRange granule = blobGranuleMapping[i].first;
state BlobWorkerInterface bwInterf = blobGranuleMapping[i].second;
if (!self->trState->cx->blobWorker_interf.count(bwInterf.id())) {
self->trState->cx->blobWorker_interf[bwInterf.id()] = bwInterf;
}
// if this was a time travel and the request returned larger bounds, skip this chunk
if (granule.end <= keyRange.begin) {
continue;
}
state BlobWorkerInterface bwInterf = self->trState->cx->blobWorker_interf[blobGranuleMapping[i].second];
ASSERT(bwInterf.id() != UID());
if (BG_REQUEST_DEBUG) {
fmt::print("Blob granule request mapping [{0} - {1})={2}\n",
granule.begin.printable(),

View File

@ -541,6 +541,9 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
bool buggfyUseResolverPrivateMutations = randomize && BUGGIFY && !ENABLE_VERSION_VECTOR_TLOG_UNICAST;
init( PROXY_USE_RESOLVER_PRIVATE_MUTATIONS, false ); if( buggfyUseResolverPrivateMutations ) PROXY_USE_RESOLVER_PRIVATE_MUTATIONS = deterministicRandom()->coinflip();
init( BURSTINESS_METRICS_ENABLED , false );
init( BURSTINESS_METRICS_LOG_INTERVAL, 0.1 );
init( RESET_MASTER_BATCHES, 200 );
init( RESET_RESOLVER_BATCHES, 200 );
init( RESET_MASTER_DELAY, 300.0 );

View File

@ -475,14 +475,13 @@ struct GetKeyServerLocationsRequest {
struct GetBlobGranuleLocationsReply {
constexpr static FileIdentifier file_identifier = 2923309;
Arena arena;
std::vector<std::pair<KeyRangeRef, BlobWorkerInterface>> results;
std::vector<std::pair<KeyRangeRef, UID>> results;
std::vector<BlobWorkerInterface> bwInterfs;
bool more;
// FIXME: change to vector of <KeyRangeRef, UID> and then separate map UID->BWInterf
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, results, more, arena);
serializer(ar, results, bwInterfs, more, arena);
}
};

View File

@ -447,6 +447,10 @@ public:
double REPORT_TRANSACTION_COST_ESTIMATION_DELAY;
bool PROXY_REJECT_BATCH_QUEUED_TOO_LONG;
bool PROXY_USE_RESOLVER_PRIVATE_MUTATIONS;
bool BURSTINESS_METRICS_ENABLED;
// Interval on which to emit burstiness metrics on the commit proxy (in
// seconds).
double BURSTINESS_METRICS_LOG_INTERVAL;
int RESET_MASTER_BATCHES;
int RESET_RESOLVER_BATCHES;

View File

@ -2928,7 +2928,14 @@ ACTOR Future<Void> blobGranuleUpdateFiles(Reference<BlobWorkerData> bwData,
// Wait on delta file starting here. If we have too many pending delta file writes, we need to not
// continue to consume from the change feed, as that will pile on even more delta files to write
if (!startDeltaFileWrite.isReady()) {
wait(startDeltaFileWrite);
// If we were waiting on a lock callback, ensure the writeDeltaFile callback goes first with a
// delay(0) here, to create the flow lock releaser, before returning control to this actor which
// could cancel the writeDeltaFile actor and leak the flow lock
wait(delay(0));
}
} else if (metadata->doEarlyReSnapshot()) {
ASSERT(metadata->currentDeltas.empty());
snapshotEligible = true;
@ -4833,8 +4840,19 @@ ACTOR Future<Void> handleRangeAssign(Reference<BlobWorkerData> bwData,
}
}
if (isSelfReassign && e.code() == error_code_granule_assignment_conflict) {
// can happen because another granule owns it, or the range is force purged. Either way, we have a revoke
// incoming and should drop it
TraceEvent(SevWarn, "BlobWorkerConflictOnReassign")
.detail("Range", req.keyRange)
.detail("ManagerEpoch", req.managerEpoch)
.detail("SeqNo", req.managerSeqno);
return Void();
}
TraceEvent(SevError, "BlobWorkerUnexpectedErrorRangeAssign", bwData->id)
.error(e)
.detail("IsSelfReassign", isSelfReassign)
.detail("Range", req.keyRange)
.detail("ManagerEpoch", req.managerEpoch)
.detail("SeqNo", req.managerSeqno);

View File

@ -419,6 +419,7 @@ ACTOR Future<Void> commitBatcher(ProxyCommitData* commitData,
}
++commitData->stats.txnCommitIn;
commitData->stats.uniqueClients.insert(req.reply.getEndpoint().getPrimaryAddress());
if (req.debugID.present()) {
g_traceBatch.addEvent("CommitDebug", req.debugID.get().first(), "CommitProxyServer.batcher");
@ -2853,17 +2854,21 @@ ACTOR static Future<Void> doBlobGranuleLocationRequest(GetBlobGranuleLocationsRe
}
// Mapping is valid, all worker interfaces are cached, we can populate response
std::unordered_set<UID> interfsIncluded;
for (i = 0; i < blobGranuleMapping.size() - 1; i++) {
KeyRangeRef granule(blobGranuleMapping[i].key, blobGranuleMapping[i + 1].key);
if (req.justGranules) {
if (!blobGranuleMapping[i].value.size()) {
continue;
}
rep.results.push_back({ granule, BlobWorkerInterface() });
rep.results.push_back({ granule, UID() });
} else {
// FIXME: avoid duplicate decode?
UID workerId = decodeBlobGranuleMappingValue(blobGranuleMapping[i].value);
rep.results.push_back({ granule, commitData->blobWorkerInterfCache[workerId] });
rep.results.push_back({ granule, workerId });
if (interfsIncluded.insert(workerId).second) {
rep.bwInterfs.push_back(commitData->blobWorkerInterfCache[workerId]);
}
}
}
@ -3525,6 +3530,66 @@ ACTOR Future<Void> processTransactionStateRequestPart(TransactionStateResolveCon
} // anonymous namespace
//
// Metrics related to the commit proxy are logged on a five second interval in
// the `ProxyMetrics` trace. However, it can be hard to determine workload
// burstiness when looking at such a large time range. This function adds much
// more frequent logging for certain metrics to provide fine-grained insight
// into workload patterns. The metrics logged by this function break down into
// two categories:
//
// * existing counters reported by `ProxyMetrics`
// * new counters that are only reported by this function
//
// Neither is implemented optimally, but the data collected should be helpful
// in identifying workload patterns on the server.
//
// Metrics reporting by this function can be disabled by setting the
// `BURSTINESS_METRICS_ENABLED` knob to false. The reporting interval can be
// adjusted by modifying the knob `BURSTINESS_METRICS_LOG_INTERVAL`.
//
ACTOR Future<Void> logDetailedMetrics(ProxyCommitData* commitData) {
state double startTime = 0;
state int64_t commitBatchInBaseline = 0;
state int64_t txnCommitInBaseline = 0;
state int64_t mutationsBaseline = 0;
state int64_t mutationBytesBaseline = 0;
loop {
if (!SERVER_KNOBS->BURSTINESS_METRICS_ENABLED) {
return Void();
}
startTime = now();
commitBatchInBaseline = commitData->stats.commitBatchIn.getValue();
txnCommitInBaseline = commitData->stats.txnCommitIn.getValue();
mutationsBaseline = commitData->stats.mutations.getValue();
mutationBytesBaseline = commitData->stats.mutationBytes.getValue();
wait(delay(SERVER_KNOBS->BURSTINESS_METRICS_LOG_INTERVAL));
int64_t commitBatchInReal = commitData->stats.commitBatchIn.getValue();
int64_t txnCommitInReal = commitData->stats.txnCommitIn.getValue();
int64_t mutationsReal = commitData->stats.mutations.getValue();
int64_t mutationBytesReal = commitData->stats.mutationBytes.getValue();
// Don't log anything if any of the counters got reset during the wait
// interval. Assume that typically all the counters get reset at once.
if (commitBatchInReal < commitBatchInBaseline || txnCommitInReal < txnCommitInBaseline ||
mutationsReal < mutationsBaseline || mutationBytesReal < mutationBytesBaseline) {
continue;
}
TraceEvent("ProxyDetailedMetrics")
.detail("Elapsed", now() - startTime)
.detail("CommitBatchIn", commitBatchInReal - commitBatchInBaseline)
.detail("TxnCommitIn", txnCommitInReal - txnCommitInBaseline)
.detail("Mutations", mutationsReal - mutationsBaseline)
.detail("MutationBytes", mutationBytesReal - mutationBytesBaseline)
.detail("UniqueClients", commitData->stats.getSizeAndResetUniqueClients());
}
}
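
As a worked example (numbers are illustrative): each ProxyDetailedMetrics event carries interval deltas, so a rate is just the delta divided by Elapsed. With the default BURSTINESS_METRICS_LOG_INTERVAL of 0.1 seconds, an event with TxnCommitIn=50 and MutationBytes=20000 implies roughly 500 commits/sec and 200 KB/sec of mutation traffic over that interval.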
ACTOR Future<Void> commitProxyServerCore(CommitProxyInterface proxy,
MasterInterface master,
LifetimeToken masterLifetime,
@ -3616,6 +3681,7 @@ ACTOR Future<Void> commitProxyServerCore(CommitProxyInterface proxy,
addActor.send(rejoinServer(proxy, &commitData));
addActor.send(ddMetricsRequestServer(proxy, db));
addActor.send(reportTxnTagCommitCost(proxy.id(), db, &commitData.ssTrTagCommitCost));
addActor.send(logDetailedMetrics(&commitData));
auto openDb = openDBOnServer(db);

View File

@ -223,7 +223,7 @@ ACTOR Future<Void> blobMetadataLookup(KmsConnectorInterface interf, KmsConnBlobM
wait(delay(deterministicRandom()->random01())); // simulate network delay
// buggify errors or omitted tenants in response
if (!g_simulator->speedUpSimulation && BUGGIFY_WITH_PROB(0.01)) {
if (g_network->isSimulated() && !g_simulator->speedUpSimulation && BUGGIFY_WITH_PROB(0.01)) {
if (deterministicRandom()->coinflip()) {
// remove some number of tenants from the response
int targetSize = deterministicRandom()->randomInt(0, rep.metadataDetails.size());

View File

@ -361,6 +361,20 @@ struct TLogData : NonCopyable {
// Distribution of end-to-end server latency of tlog commit requests.
Reference<Histogram> commitLatencyDist;
// Distribution of queue wait times, per request.
// This is the time spent waiting for previous versions.
//
// Note: we only wait for previous versions to enter the
// in-memory DiskQueue commit queue, not for the records to be
// flushed and durable.
Reference<Histogram> queueWaitLatencyDist;
// Distribution of just the disk commit times, per request.
//
// Time starts as soon as this request is done waiting for previous versions,
// and ends when the data is flushed and durable.
Reference<Histogram> timeUntilDurableDist;
TLogData(UID dbgid,
UID workerID,
IKeyValueStore* persistentData,
@ -376,7 +390,9 @@ struct TLogData : NonCopyable {
peekMemoryLimiter(SERVER_KNOBS->TLOG_SPILL_REFERENCE_MAX_PEEK_MEMORY_BYTES),
concurrentLogRouterReads(SERVER_KNOBS->CONCURRENT_LOG_ROUTER_READS), ignorePopDeadline(0), dataFolder(folder),
degraded(degraded),
commitLatencyDist(Histogram::getHistogram("tLog"_sr, "commit"_sr, Histogram::Unit::milliseconds)) {
commitLatencyDist(Histogram::getHistogram("tLog"_sr, "commit"_sr, Histogram::Unit::milliseconds)),
queueWaitLatencyDist(Histogram::getHistogram("tLog"_sr, "QueueWait"_sr, Histogram::Unit::milliseconds)),
timeUntilDurableDist(Histogram::getHistogram("tLog"_sr, "TimeUntilDurable"_sr, Histogram::Unit::milliseconds)) {
cx = openDBOnServer(dbInfo, TaskPriority::DefaultEndpoint, LockAware::True);
}
};
@ -2308,6 +2324,10 @@ ACTOR Future<Void> tLogCommit(TLogData* self,
wait(logData->version.whenAtLeast(req.prevVersion));
// Time until now has been spent waiting in the queue to do actual work.
state double queueWaitEndTime = g_network->timer();
self->queueWaitLatencyDist->sampleSeconds(queueWaitEndTime - req.requestTime());
// Calling check_yield instead of yield to avoid a destruction ordering problem in simulation
if (g_network->check_yield(g_network->getCurrentTask())) {
wait(delay(0, g_network->getCurrentTask()));
@ -2371,24 +2391,29 @@ ACTOR Future<Void> tLogCommit(TLogData* self,
wait(
timeoutWarning(logData->queueCommittedVersion.whenAtLeast(req.version) || stopped, 0.1, warningCollectorInput));
// This is the point at which the transaction is durable (unless it timed out, or the tlog stopped).
const double durableTime = g_network->timer();
if (stopped.isReady()) {
ASSERT(logData->stopped());
req.reply.sendError(tlog_stopped());
return Void();
}
// Measure server-side RPC latency from the time a request was
// received to the time the response was sent.
const double endTime = g_network->timer();
if (isNotDuplicate) {
self->commitLatencyDist->sampleSeconds(endTime - req.requestTime());
}
if (req.debugID.present())
g_traceBatch.addEvent("CommitDebug", tlogDebugID.get().first(), "TLog.tLogCommit.After");
req.reply.send(logData->durableKnownCommittedVersion);
// Measure server-side RPC latency from the time a request was
// received until the time the response was sent.
const double endTime = g_network->timer();
if (isNotDuplicate) {
self->timeUntilDurableDist->sampleSeconds(durableTime - queueWaitEndTime);
self->commitLatencyDist->sampleSeconds(endTime - req.requestTime());
}
return Void();
}

View File

@ -93,6 +93,12 @@ struct ProxyStats {
Reference<Histogram> tlogLoggingDist;
Reference<Histogram> replyCommitDist;
// These metrics are only logged as part of `ProxyDetailedMetrics`. Since
// the detailed proxy metrics combine data from different sources, we can't
// use a `Counter` along with a `CounterCollection` here, and instead have
// to reimplement the basic functionality.
std::unordered_set<NetworkAddress> uniqueClients;
int64_t getAndResetMaxCompute() {
int64_t r = maxComputeNS;
maxComputeNS = 0;
@ -105,6 +111,12 @@ struct ProxyStats {
return r;
}
int64_t getSizeAndResetUniqueClients() {
int64_t r = uniqueClients.size();
uniqueClients.clear();
return r;
}
explicit ProxyStats(UID id,
NotifiedVersion* pVersion,
NotifiedVersion* pCommittedVersion,

View File

@ -645,13 +645,23 @@ struct AuthzSecurityWorkload : TestWorkload {
int locIdx = deterministicRandom()->randomInt(0, rep.get().results.size());
ASSERT(!rep.get().results.empty());
ASSERT(!rep.get().bwInterfs.empty());
BlobGranuleFileRequest req;
req.arena.dependsOn(rep.get().arena);
req.keyRange = rep.get().results[locIdx].first;
req.tenantInfo = TenantInfo(tenant->id(), reqToken);
req.readVersion = committedVersion;
auto& bwInterf = rep.get().results[locIdx].second;
UID bwId = rep.get().results[locIdx].second;
ASSERT(bwId != UID());
int bwInterfIdx;
for (bwInterfIdx = 0; bwInterfIdx < rep.get().bwInterfs.size(); bwInterfIdx++) {
if (rep.get().bwInterfs[bwInterfIdx].id() == bwId) {
break;
}
}
ASSERT(bwInterfIdx < rep.get().bwInterfs.size());
auto& bwInterf = rep.get().bwInterfs[bwInterfIdx];
ErrorOr<BlobGranuleFileReply> fileRep = wait(bwInterf.blobGranuleFileRequest.tryGetReply(req));
if (fileRep.isError()) {
throw fileRep.getError();

View File

@ -690,6 +690,32 @@ struct BlobGranuleRangesWorkload : TestWorkload {
return Void();
}
// create tenant, set up a blob range, force purge blob range, and then delete tenant
ACTOR Future<Void> deleteTenantUnit(Database cx, BlobGranuleRangesWorkload* self, KeyRange range) {
if (!self->tenantName.present()) {
return Void();
}
state Standalone<StringRef> newTenantName =
self->tenantName.get().withSuffix("_" + deterministicRandom()->randomUniqueID().toString());
wait(success(self->setupTenant(cx, newTenantName)));
state Reference<Tenant> newTenant = makeReference<Tenant>(cx, newTenantName);
wait(newTenant->ready());
bool setSuccess = wait(cx->blobbifyRangeBlocking(range, newTenant));
ASSERT(setSuccess);
Key purgeKey = wait(cx->purgeBlobGranules(range, 1, newTenant, true));
wait(cx->waitPurgeGranulesComplete(purgeKey));
bool unblobbifySuccess = wait(cx->unblobbifyRange(range, newTenant));
ASSERT(unblobbifySuccess);
wait(TenantAPI::deleteTenant(cx.getReference(), newTenantName));
return Void();
}
enum UnitTestTypes {
VERIFY_RANGE_UNIT,
VERIFY_RANGE_GAP_UNIT,
@ -698,7 +724,8 @@ struct BlobGranuleRangesWorkload : TestWorkload {
RE_BLOBBIFY,
ADJACENT_PURGE,
BLOBBIFY_BLOCKING_UNIT,
OP_COUNT = 7 /* keep this last */
DELETE_TENANT_UNIT,
OP_COUNT = 8 /* keep this last */
};
ACTOR Future<Void> blobGranuleRangesUnitTests(Database cx, BlobGranuleRangesWorkload* self) {
@ -744,6 +771,8 @@ struct BlobGranuleRangesWorkload : TestWorkload {
wait(self->adjacentPurge(cx, self, range));
} else if (op == BLOBBIFY_BLOCKING_UNIT) {
wait(self->blobbifyBlockingUnit(cx, self, range));
} else if (op == DELETE_TENANT_UNIT) {
wait(self->deleteTenantUnit(cx, self, range));
} else {
ASSERT(false);
}

View File

@ -71,12 +71,18 @@ public: // introduced features
API_VERSION_FEATURE(@FDB_AV_FUTURE_GET_BOOL@, FutureGetBool);
API_VERSION_FEATURE(@FDB_AV_FUTURE_PROTOCOL_VERSION_API@, FutureProtocolVersionApi);
API_VERSION_FEATURE(@FDB_AV_TENANT_BLOB_RANGE_API@, TenantBlobRangeApi);
API_VERSION_FEATURE(@FDB_AV_CLIENT_TMP_DIR@, ClientTmpDir);
API_VERSION_FEATURE(@FDB_AV_DISABLE_CLIENT_BYPASS@, DisableClientBypass)
API_VERSION_FEATURE(@FDB_AV_GRV_CACHE@, GrvCache);
API_VERSION_FEATURE(@FDB_AV_CLIENT_RROFILING_DEPRECATED@, ClientProfilingDeprecated);
API_VERSION_FEATURE(@FDB_AV_TENANT_API_RELEASED@, TenantApiReleased);
API_VERSION_FEATURE(@FDB_AV_GET_TOTAL_COST@, GetTotalCost);
API_VERSION_FEATURE(@FDB_AV_FAIL_ON_EXTERNAL_CLIENT_ERRORS@, FailOnExternalClientErrors);
API_VERSION_FEATURE(@FDB_AV_GET_TAG_THROTTLED_DURATION@, GetTagThrottledDuration);
API_VERSION_FEATURE(@FDB_AV_FUTURE_GET_DOUBLE@, FutureGetDouble);
API_VERSION_FEATURE(@FDB_AV_GET_CLIENT_STATUS@, GetClientStatus);
API_VERSION_FEATURE(@FDB_AV_INITIALIZE_TRACE_ON_SETUP@, InitializeTraceOnSetup);
API_VERSION_FEATURE(@FDB_AV_TENANT_GET_ID@, TenantGetId);
};
#endif // FLOW_CODE_API_VERSION_H

View File

@ -1,5 +1,6 @@
# API Versions
set(FDB_AV_LATEST_VERSION "730")
set(FDB_AV_LATEST_BINDINGS_VERSION "720")
# Features
set(FDB_AV_SNAPSHOT_RYW "300")
@ -12,9 +13,15 @@ set(FDB_AV_CREATE_DB_FROM_CONN_STRING "720")
set(FDB_AV_FUTURE_GET_BOOL "720")
set(FDB_AV_FUTURE_PROTOCOL_VERSION_API "720")
set(FDB_AV_TENANT_BLOB_RANGE_API "720")
set(FDB_AV_CLIENT_TMP_DIR "720")
set(FDB_AV_DISABLE_CLIENT_BYPASS "720")
set(FDB_AV_GRV_CACHE "720")
set(FDB_AV_CLIENT_RROFILING_DEPRECATED "720")
set(FDB_AV_TENANT_API_RELEASED "720")
set(FDB_AV_GET_TOTAL_COST "730")
set(FDB_AV_FAIL_ON_EXTERNAL_CLIENT_ERRORS "730")
set(FDB_AV_GET_TAG_THROTTLED_DURATION "730")
set(FDB_AV_FUTURE_GET_DOUBLE "730")
set(FDB_AV_GET_CLIENT_STATUS "730")
set(FDB_AV_INITIALIZE_TRACE_ON_SETUP "730")
set(FDB_AV_TENANT_GET_ID "730")

View File

@ -113,10 +113,8 @@ public-address = {ip_address}:$ID{optional_tls}
listen-address = public
datadir = {datadir}/$ID
logdir = {logdir}
{bg_knob_line}
{encrypt_knob_line1}
{encrypt_knob_line2}
{encrypt_knob_line3}
{bg_config}
{encrypt_config}
{tls_config}
{authz_public_key_config}
{custom_config}
@ -257,17 +255,26 @@ logdir = {logdir}
new_conf_file = self.conf_file.parent / (self.conf_file.name + ".new")
with open(new_conf_file, "x") as f:
conf_template = LocalCluster.configuration_template
bg_knob_line = ""
encrypt_knob_line1 = ""
encrypt_knob_line2 = ""
encrypt_knob_line3 = ""
bg_config = ""
encrypt_config = ""
if self.use_legacy_conf_syntax:
conf_template = conf_template.replace("-", "_")
if self.blob_granules_enabled:
bg_knob_line = "knob_bg_url=file://" + str(self.data) + "/fdbblob/"
bg_config = "\n".join(
[
"knob_bg_url=file://" + str(self.data) + "/fdbblob/",
"knob_bg_snapshot_file_target_bytes=100000",
"knob_bg_delta_file_target_bytes=5000",
"knob_bg_delta_bytes_before_compact=50000",
]
)
if self.enable_encryption_at_rest:
encrypt_knob_line2 = "knob_kms_connector_type=FDBPerfKmsConnector"
encrypt_knob_line3 = "knob_enable_configurable_encryption=true"
encrypt_config = "\n".join(
[
"knob_kms_connector_type=FDBPerfKmsConnector",
"knob_enable_configurable_encryption=true",
]
)
f.write(
conf_template.format(
etcdir=self.etc,
@ -275,10 +282,8 @@ logdir = {logdir}
datadir=self.data,
logdir=self.log,
ip_address=self.ip_address,
bg_knob_line=bg_knob_line,
encrypt_knob_line1=encrypt_knob_line1,
encrypt_knob_line2=encrypt_knob_line2,
encrypt_knob_line3=encrypt_knob_line3,
bg_config=bg_config,
encrypt_config=encrypt_config,
tls_config=self.tls_conf_string(),
authz_public_key_config=self.authz_public_key_conf_string(),
optional_tls=":tls" if self.tls_config is not None else "",
@ -713,8 +718,12 @@ logdir = {logdir}
def add_trace_check_from(self, check_func, time_begin, filename_substr: str = ""):
self.trace_check_entries.append((check_func, time_begin, None, filename_substr))
def add_trace_check_from_to(self, check_func, time_begin, time_end, filename_substr: str = ""):
self.trace_check_entries.append((check_func, time_begin, time_end, filename_substr))
def add_trace_check_from_to(
self, check_func, time_begin, time_end, filename_substr: str = ""
):
self.trace_check_entries.append(
(check_func, time_begin, time_end, filename_substr)
)
# generator function that yields (filename, event_type, XML_trace_entry) that matches the parameter
def __loop_through_trace(self, time_begin, time_end, filename_substr: str):
@ -734,12 +743,15 @@ logdir = {logdir}
if time_end != None and time_end < ts:
break # no need to look further in this file
yield (file, ev_type, entry)
except ET.ParseError as e:
except ET.ParseError:
pass # ignore header, footer, or broken line
# applies user-provided check_func that takes a trace entry generator as the parameter
def check_trace(self):
for check_func, time_begin, time_end, filename_substr in self.trace_check_entries:
for (
check_func,
time_begin,
time_end,
filename_substr,
) in self.trace_check_entries:
check_func(self.__loop_through_trace(time_begin, time_end, filename_substr))

View File

@ -33,7 +33,7 @@ from typing import Union
from util import random_alphanum_str, random_alphanum_bytes, to_str, to_bytes
import xml.etree.ElementTree as ET
fdb.api_version(720)
fdb.api_version(fdb.LATEST_API_VERSION)
cluster_scope = "module"