Merge commit '7dacaed98368ec0790c9e18a63dfa0035a31fcff' into redwood-page-lifetimes

Committed by Steve Atherton on 2022-10-22 02:30:43 -07:00 (commit 76316339ac)
64 changed files with 1069 additions and 581 deletions


@ -274,93 +274,21 @@ if(NOT WIN32)
@CLUSTER_FILE@
${CMAKE_CURRENT_BINARY_DIR}/libfdb_c_external.so
)
add_fdbclient_test(
NAME fdb_c_api_tests
DISABLE_LOG_DUMP
COMMAND ${CMAKE_SOURCE_DIR}/bindings/c/test/apitester/run_c_api_tests.py
--cluster-file
@CLUSTER_FILE@
--tester-binary
$<TARGET_FILE:fdb_c_api_tester>
--external-client-library
${CMAKE_CURRENT_BINARY_DIR}/libfdb_c_external.so
--test-dir
${CMAKE_SOURCE_DIR}/bindings/c/test/apitester/tests
--tmp-dir
@TMP_DIR@
--log-dir
@LOG_DIR@
--knob
delete-native-lib-after-loading=false # for properly symbolizing xSAN errors
)
add_fdbclient_test(
NAME fdb_c_api_tests_local_only
DISABLE_LOG_DUMP
COMMAND ${CMAKE_SOURCE_DIR}/bindings/c/test/apitester/run_c_api_tests.py
--cluster-file
@CLUSTER_FILE@
--tester-binary
$<TARGET_FILE:fdb_c_api_tester>
--test-dir
${CMAKE_SOURCE_DIR}/bindings/c/test/apitester/local_tests
--tmp-dir
@TMP_DIR@
--log-dir
@LOG_DIR@
--knob
delete-native-lib-after-loading=false # for properly symbolizing xSAN errors
)
add_fdbclient_test(
NAME fdb_c_api_tests_blob_granule
DISABLE_LOG_DUMP
API_TEST_BLOB_GRANULES_ENABLED
COMMAND ${CMAKE_SOURCE_DIR}/bindings/c/test/apitester/run_c_api_tests.py
--cluster-file
@CLUSTER_FILE@
--tester-binary
$<TARGET_FILE:fdb_c_api_tester>
--external-client-library
${CMAKE_CURRENT_BINARY_DIR}/libfdb_c_external.so
--test-dir
${CMAKE_SOURCE_DIR}/bindings/c/test/apitester/blobgranuletests
--blob-granule-local-file-path
@DATA_DIR@/fdbblob/
--tmp-dir
@TMP_DIR@
--log-dir
@LOG_DIR@
--knob
delete-native-lib-after-loading=false # for properly symbolizing xSAN errors
)
add_fdbclient_test(
NAME fdb_c_api_tests_with_tls
DISABLE_LOG_DUMP
TLS_ENABLED
COMMAND ${CMAKE_SOURCE_DIR}/bindings/c/test/apitester/run_c_api_tests.py
--cluster-file
@CLUSTER_FILE@
--tester-binary
$<TARGET_FILE:fdb_c_api_tester>
--external-client-library
${CMAKE_CURRENT_BINARY_DIR}/libfdb_c_external.so
--test-dir
${CMAKE_SOURCE_DIR}/bindings/c/test/apitester/tests
--tmp-dir
@TMP_DIR@
--log-dir
@LOG_DIR@
--tls-cert-file
@CLIENT_CERT_FILE@
--tls-key-file
@CLIENT_KEY_FILE@
--tls-ca-file
@SERVER_CA_FILE@
--knob
delete-native-lib-after-loading=false # for properly symbolizing xSAN errors
)
file(GLOB API_TEST_FILES "${CMAKE_CURRENT_SOURCE_DIR}/test/apitester/tests/*.toml")
foreach(test_file ${API_TEST_FILES})
get_filename_component(file_name "${test_file}" NAME_WE)
set(test_name "fdb_c_api_test_${file_name}")
add_test(NAME "${test_name}"
COMMAND ${CMAKE_SOURCE_DIR}/bindings/c/test/apitester/run_c_api_tests.py
--build-dir ${CMAKE_BINARY_DIR}
--api-tester-bin $<TARGET_FILE:fdb_c_api_tester>
--external-client-library ${CMAKE_CURRENT_BINARY_DIR}/libfdb_c_external.so
--test-file ${test_file}
--knob delete-native-lib-after-loading=false
)
set_tests_properties("${test_name}" PROPERTIES TIMEOUT 300)
endforeach()
add_test(NAME fdb_c_upgrade_to_future_version
COMMAND ${CMAKE_SOURCE_DIR}/tests/TestRunner/upgrade_test.py


@ -1,15 +0,0 @@
[[test]]
title = 'Blob Granule API Correctness Single Threaded'
minClients = 1
maxClients = 3
multiThreaded = false
[[test.workload]]
name = 'ApiBlobGranuleCorrectness'
minKeyLength = 1
maxKeyLength = 64
minValueLength = 1
maxValueLength = 1000
maxKeysPerTransaction = 50
initialSize = 100
numRandomOperations = 100


@ -1,15 +0,0 @@
[[test]]
title = 'Blob Granule Errors Single Threaded'
minClients = 1
maxClients = 3
multiThreaded = false
[[test.workload]]
name = 'BlobGranuleErrors'
minKeyLength = 1
maxKeyLength = 64
minValueLength = 1
maxValueLength = 1000
maxKeysPerTransaction = 50
initialSize = 100
numRandomOperations = 100


@ -279,9 +279,9 @@ bool parseArgs(TesterOptions& options, int argc, char** argv) {
return true;
}
void fdb_check(fdb::Error e) {
if (e) {
fmt::print(stderr, "Unexpected FDB error: {}({})\n", e.code(), e.what());
void fdb_check(fdb::Error e, std::string_view msg, fdb::Error::CodeType expectedError = error_code_success) {
if (e.code()) {
fmt::print(stderr, "{}, Error: {}({})\n", msg, e.code(), e.what());
std::abort();
}
}
@ -453,13 +453,13 @@ int main(int argc, char** argv) {
applyNetworkOptions(options);
fdb::network::setup();
std::thread network_thread{ &fdb::network::run };
std::thread network_thread{ [] { fdb_check(fdb::network::run(), "FDB network thread failed"); } };
if (!runWorkloads(options)) {
retCode = 1;
}
fdb_check(fdb::network::stop());
fdb_check(fdb::network::stop(), "Failed to stop FDB thread");
network_thread.join();
} catch (const std::exception& err) {
fmt::print(stderr, "ERROR: {}\n", err.what());


@ -1,29 +0,0 @@
[[test]]
title = 'API Correctness Single Threaded'
minClients = 1
maxClients = 3
minDatabases = 1
maxDatabases = 3
multiThreaded = false
disableClientBypass = true
[[test.workload]]
name = 'ApiCorrectness'
minKeyLength = 1
maxKeyLength = 64
minValueLength = 1
maxValueLength = 1000
maxKeysPerTransaction = 50
initialSize = 100
numRandomOperations = 100
readExistingKeysRatio = 0.9
[[test.workload]]
name = 'AtomicOpsCorrectness'
initialSize = 0
numRandomOperations = 100
[[test.workload]]
name = 'WatchAndWait'
initialSize = 0
numRandomOperations = 10


@ -29,31 +29,39 @@ from pathlib import Path
import glob
import random
import string
import toml
sys.path[:0] = [os.path.join(os.path.dirname(__file__), "..", "..", "..", "..", "tests", "TestRunner")]
# fmt: off
from tmp_cluster import TempCluster
from local_cluster import TLSConfig
# fmt: on
TESTER_STATS_INTERVAL_SEC = 5
def random_string(len):
return ''.join(random.choice(string.ascii_letters + string.digits) for i in range(len))
return "".join(random.choice(string.ascii_letters + string.digits) for i in range(len))
def get_logger():
return logging.getLogger('foundationdb.run_c_api_tests')
return logging.getLogger("foundationdb.run_c_api_tests")
def initialize_logger_level(logging_level):
logger = get_logger()
assert logging_level in ['DEBUG', 'INFO', 'WARNING', 'ERROR']
assert logging_level in ["DEBUG", "INFO", "WARNING", "ERROR"]
logging.basicConfig(format='%(message)s')
if logging_level == 'DEBUG':
logging.basicConfig(format="%(message)s")
if logging_level == "DEBUG":
logger.setLevel(logging.DEBUG)
elif logging_level == 'INFO':
elif logging_level == "INFO":
logger.setLevel(logging.INFO)
elif logging_level == 'WARNING':
elif logging_level == "WARNING":
logger.setLevel(logging.WARNING)
elif logging_level == 'ERROR':
elif logging_level == "ERROR":
logger.setLevel(logging.ERROR)
@ -65,39 +73,52 @@ def dump_client_logs(log_dir):
print(">>>>>>>>>>>>>>>>>>>> End of {}:".format(log_file))
def run_tester(args, test_file):
cmd = [args.tester_binary,
"--cluster-file", args.cluster_file,
"--test-file", test_file,
"--stats-interval", str(TESTER_STATS_INTERVAL_SEC*1000)]
def run_tester(args, cluster, test_file):
build_dir = Path(args.build_dir).resolve()
tester_binary = Path(args.api_tester_bin).resolve()
external_client_library = build_dir.joinpath("bindings", "c", "libfdb_c_external.so")
log_dir = Path(cluster.log).joinpath("client")
log_dir.mkdir(exist_ok=True)
cmd = [
tester_binary,
"--cluster-file",
cluster.cluster_file,
"--test-file",
test_file,
"--stats-interval",
str(TESTER_STATS_INTERVAL_SEC * 1000),
"--tmp-dir",
cluster.tmp_dir,
"--log",
"--log-dir",
str(log_dir),
]
if args.external_client_library is not None:
cmd += ["--external-client-library", args.external_client_library]
if args.tmp_dir is not None:
cmd += ["--tmp-dir", args.tmp_dir]
log_dir = None
if args.log_dir is not None:
log_dir = Path(args.log_dir).joinpath(random_string(8))
log_dir.mkdir(exist_ok=True)
cmd += ['--log', "--log-dir", str(log_dir)]
external_client_library = Path(args.external_client_library).resolve()
cmd += ["--external-client-library", external_client_library]
if args.blob_granule_local_file_path is not None:
cmd += ["--blob-granule-local-file-path",
args.blob_granule_local_file_path]
if cluster.blob_granules_enabled:
cmd += [
"--blob-granule-local-file-path",
str(cluster.data.joinpath("fdbblob")) + os.sep,
]
if args.tls_ca_file is not None:
cmd += ["--tls-ca-file", args.tls_ca_file]
if args.tls_key_file is not None:
cmd += ["--tls-key-file", args.tls_key_file]
if args.tls_cert_file is not None:
cmd += ["--tls-cert-file", args.tls_cert_file]
if cluster.tls_config is not None:
cmd += [
"--tls-ca-file",
cluster.server_ca_file,
"--tls-key-file",
cluster.client_key_file,
"--tls-cert-file",
cluster.client_cert_file,
]
for knob in args.knobs:
knob_name, knob_value = knob.split("=")
cmd += ["--knob-" + knob_name, knob_value]
get_logger().info('\nRunning tester \'%s\'...' % ' '.join(cmd))
get_logger().info("\nRunning tester '%s'..." % " ".join(map(str, cmd)))
proc = Popen(cmd, stdout=sys.stdout, stderr=sys.stderr)
timed_out = False
ret_code = 1
@ -107,34 +128,76 @@ def run_tester(args, test_file):
proc.kill()
timed_out = True
except Exception as e:
raise Exception('Unable to run tester (%s)' % e)
raise Exception("Unable to run tester (%s)" % e)
if ret_code != 0:
if timed_out:
reason = 'timed out after %d seconds' % args.timeout
reason = "timed out after %d seconds" % args.timeout
elif ret_code < 0:
reason = signal.Signals(-ret_code).name
else:
reason = 'exit code: %d' % ret_code
get_logger().error('\n\'%s\' did not complete successfully (%s)' %
(cmd[0], reason))
if (log_dir is not None):
reason = "exit code: %d" % ret_code
get_logger().error("\n'%s' did not complete successfully (%s)" % (cmd[0], reason))
if log_dir is not None:
dump_client_logs(log_dir)
get_logger().info('')
get_logger().info("")
return ret_code
class TestConfig:
def __init__(self, test_file):
config = toml.load(test_file)
server_config = config.get("server", [{}])[0]
self.tenants_enabled = server_config.get("tenants_enabled", True)
self.blob_granules_enabled = server_config.get("blob_granules_enabled", False)
self.tls_enabled = server_config.get("tls_enabled", False)
self.client_chain_len = server_config.get("tls_client_chain_len", 2)
self.server_chain_len = server_config.get("tls_server_chain_len", 3)
self.min_num_processes = server_config.get("min_num_processes", 1)
self.max_num_processes = server_config.get("max_num_processes", 3)
self.num_processes = random.randint(self.min_num_processes, self.max_num_processes)
def run_test(args, test_file):
config = TestConfig(test_file)
tls_config = None
if config.tls_enabled:
tls_config = TLSConfig(
server_chain_len=config.client_chain_len,
client_chain_len=config.server_chain_len,
)
with TempCluster(
args.build_dir,
config.num_processes,
enable_tenants=config.tenants_enabled,
blob_granules_enabled=config.blob_granules_enabled,
tls_config=tls_config,
) as cluster:
ret_code = run_tester(args, cluster, test_file)
if not cluster.check_cluster_logs():
ret_code = 1 if ret_code == 0 else ret_code
return ret_code
def run_tests(args):
num_failed = 0
test_files = [f for f in os.listdir(args.test_dir) if os.path.isfile(
os.path.join(args.test_dir, f)) and f.endswith(".toml")]
if args.test_file is not None:
test_files = [Path(args.test_file).resolve()]
else:
test_files = [
f
for f in os.listdir(args.test_dir)
if os.path.isfile(os.path.join(args.test_dir, f)) and f.endswith(".toml")
]
for test_file in test_files:
get_logger().info('=========================================================')
get_logger().info('Running test %s' % test_file)
get_logger().info('=========================================================')
ret_code = run_tester(args, os.path.join(args.test_dir, test_file))
get_logger().info("=========================================================")
get_logger().info("Running test %s" % test_file)
get_logger().info("=========================================================")
ret_code = run_test(args, os.path.join(args.test_dir, test_file))
if ret_code != 0:
num_failed += 1
@ -142,34 +205,49 @@ def run_tests(args):
def parse_args(argv):
parser = argparse.ArgumentParser(description='FoundationDB C API Tester')
parser.add_argument('--cluster-file', type=str, default="fdb.cluster",
help='The cluster file for the cluster being connected to. (default: fdb.cluster)')
parser.add_argument('--tester-binary', type=str, default="fdb_c_api_tester",
help='Path to the fdb_c_api_tester executable. (default: fdb_c_api_tester)')
parser.add_argument('--external-client-library', type=str, default=None,
help='Path to the external client library. (default: None)')
parser.add_argument('--test-dir', type=str, default="./",
help='Path to a directory with test definitions. (default: ./)')
parser.add_argument('--timeout', type=int, default=300,
help='The timeout in seconds for running each individual test. (default 300)')
parser.add_argument('--log-dir', type=str, default=None,
help='The directory for storing logs (default: None)')
parser.add_argument('--logging-level', type=str, default='INFO',
choices=['ERROR', 'WARNING', 'INFO', 'DEBUG'], help='Specifies the level of detail in the tester output (default=\'INFO\').')
parser.add_argument('--tmp-dir', type=str, default=None,
help='The directory for storing temporary files (default: None)')
parser.add_argument('--blob-granule-local-file-path', type=str, default=None,
help='Enable blob granule tests if set, value is path to local blob granule files')
parser.add_argument('--tls-ca-file', type=str, default=None,
help='Path to client\'s TLS CA file: i.e. certificate of CA that signed the server certificate')
parser.add_argument('--tls-cert-file', type=str, default=None,
help='Path to client\'s TLS certificate file')
parser.add_argument('--tls-key-file', type=str, default=None,
help='Path to client\'s TLS private key file')
parser.add_argument('--knob', type=str, default=[], action="append", dest="knobs",
help='[lowercase-knob-name]=[knob-value] (there may be multiple --knob options)')
parser = argparse.ArgumentParser(description="FoundationDB C API Tester")
parser.add_argument("--build-dir", "-b", type=str, required=True, help="FDB build directory")
parser.add_argument("--api-tester-bin", type=str, help="Path to the fdb_c_api_tester executable.", required=True)
parser.add_argument("--external-client-library", type=str, help="Path to the external client library.")
parser.add_argument(
"--cluster-file",
type=str,
default="fdb.cluster",
help="The cluster file for the cluster being connected to. (default: fdb.cluster)",
)
parser.add_argument(
"--test-dir",
type=str,
default="./",
help="Path to a directory with test definitions. (default: ./)",
)
parser.add_argument(
"--test-file",
type=str,
default=None,
help="Path to a single test definition to be executed, overrides --test-dir if set.",
)
parser.add_argument(
"--timeout",
type=int,
default=300,
help="The timeout in seconds for running each individual test. (default 300)",
)
parser.add_argument(
"--logging-level",
type=str,
default="INFO",
choices=["ERROR", "WARNING", "INFO", "DEBUG"],
help="Specifies the level of detail in the tester output (default='INFO').",
)
parser.add_argument(
"--knob",
type=str,
default=[],
action="append",
dest="knobs",
help="[lowercase-knob-name]=[knob-value] (there may be multiple --knob options)",
)
return parser.parse_args(argv)
@ -180,5 +258,5 @@ def main(argv):
return run_tests(args)
if __name__ == '__main__':
if __name__ == "__main__":
sys.exit(main(sys.argv[1:]))
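
To make the runner rework above easier to follow: each test .toml can now carry an optional [[server]] table, and TestConfig turns it into TempCluster parameters (TLS, blob granules, process count). Below is a minimal standalone sketch of that parsing in Python, assuming the same toml package the script already imports; the sample document and its values are illustrative, not one of the shipped test files.

# Minimal sketch (not part of the diff) of how an optional [[server]] table is read,
# mirroring the TestConfig defaults shown above. The sample document is illustrative.
import random
import toml

sample = """
[[test]]
title = 'Example Test'

[[server]]
tls_enabled = true
max_num_processes = 1
"""

config = toml.loads(sample)
server = config.get("server", [{}])[0]

tls_enabled = server.get("tls_enabled", False)              # True for this sample
blob_granules = server.get("blob_granules_enabled", False)  # falls back to False
num_processes = random.randint(server.get("min_num_processes", 1),
                               server.get("max_num_processes", 3))
print(tls_enabled, blob_granules, num_processes)

With this in place, run_test only builds a TLSConfig when tls_enabled is set and sizes the temporary cluster from the min/max process bounds.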


@ -12,13 +12,15 @@ maxClientThreads = 8
minClients = 2
maxClients = 8
[[server]]
blob_granules_enabled = true
[[test.workload]]
name = 'ApiBlobGranuleCorrectness'
minKeyLength = 1
maxKeyLength = 64
minValueLength = 1
maxValueLength = 1000
maxKeysPerTransaction = 50
initialSize = 100
numRandomOperations = 100
[[test.workload]]
name = 'ApiBlobGranuleCorrectness'
minKeyLength = 1
maxKeyLength = 64
minValueLength = 1
maxValueLength = 1000
maxKeysPerTransaction = 50
initialSize = 100
numRandomOperations = 100


@ -11,13 +11,15 @@ maxClientThreads = 8
minClients = 2
maxClients = 8
[[test.workload]]
name = 'ApiBlobGranuleCorrectness'
minKeyLength = 1
maxKeyLength = 64
minValueLength = 1
maxValueLength = 1000
maxKeysPerTransaction = 50
initialSize = 100
numRandomOperations = 100
[[server]]
blob_granules_enabled = true
[[test.workload]]
name = 'ApiBlobGranuleCorrectness'
minKeyLength = 1
maxKeyLength = 64
minValueLength = 1
maxValueLength = 1000
maxKeysPerTransaction = 50
initialSize = 100
numRandomOperations = 100


@ -0,0 +1,18 @@
[[test]]
title = 'Blob Granule API Correctness Single Threaded'
minClients = 1
maxClients = 3
multiThreaded = false
[[server]]
blob_granules_enabled = true
[[test.workload]]
name = 'ApiBlobGranuleCorrectness'
minKeyLength = 1
maxKeyLength = 64
minValueLength = 1
maxValueLength = 1000
maxKeysPerTransaction = 50
initialSize = 100
numRandomOperations = 100


@ -11,12 +11,15 @@ maxClientThreads = 8
minClients = 2
maxClients = 8
[[test.workload]]
name = 'BlobGranuleErrors'
minKeyLength = 1
maxKeyLength = 64
minValueLength = 1
maxValueLength = 1000
maxKeysPerTransaction = 50
initialSize = 100
numRandomOperations = 100
[[server]]
blob_granules_enabled = true
[[test.workload]]
name = 'BlobGranuleErrors'
minKeyLength = 1
maxKeyLength = 64
minValueLength = 1
maxValueLength = 1000
maxKeysPerTransaction = 50
initialSize = 100
numRandomOperations = 100


@ -11,12 +11,15 @@ maxClientThreads = 8
minClients = 2
maxClients = 8
[[test.workload]]
name = 'BlobGranuleErrors'
minKeyLength = 1
maxKeyLength = 64
minValueLength = 1
maxValueLength = 1000
maxKeysPerTransaction = 50
initialSize = 100
numRandomOperations = 100
[[server]]
blob_granules_enabled = true
[[test.workload]]
name = 'BlobGranuleErrors'
minKeyLength = 1
maxKeyLength = 64
minValueLength = 1
maxValueLength = 1000
maxKeysPerTransaction = 50
initialSize = 100
numRandomOperations = 100


@ -0,0 +1,18 @@
[[test]]
title = 'Blob Granule Errors Single Threaded'
minClients = 1
maxClients = 3
multiThreaded = false
[[server]]
blob_granules_enabled = true
[[test.workload]]
name = 'BlobGranuleErrors'
minKeyLength = 1
maxKeyLength = 64
minValueLength = 1
maxValueLength = 1000
maxKeysPerTransaction = 50
initialSize = 100
numRandomOperations = 100


@ -12,13 +12,13 @@ maxClientThreads = 8
minClients = 2
maxClients = 8
[[test.workload]]
name = 'CancelTransaction'
minKeyLength = 1
maxKeyLength = 64
minValueLength = 1
maxValueLength = 1000
maxKeysPerTransaction = 50
initialSize = 100
numRandomOperations = 100
readExistingKeysRatio = 0.9
[[test.workload]]
name = 'CancelTransaction'
minKeyLength = 1
maxKeyLength = 64
minValueLength = 1
maxValueLength = 1000
maxKeysPerTransaction = 50
initialSize = 100
numRandomOperations = 100
readExistingKeysRatio = 0.9


@ -11,13 +11,13 @@ maxClientThreads = 8
minClients = 2
maxClients = 8
[[test.workload]]
name = 'CancelTransaction'
minKeyLength = 1
maxKeyLength = 64
minValueLength = 1
maxValueLength = 1000
maxKeysPerTransaction = 50
initialSize = 100
numRandomOperations = 100
readExistingKeysRatio = 0.9
[[test.workload]]
name = 'CancelTransaction'
minKeyLength = 1
maxKeyLength = 64
minValueLength = 1
maxValueLength = 1000
maxKeysPerTransaction = 50
initialSize = 100
numRandomOperations = 100
readExistingKeysRatio = 0.9


@ -12,13 +12,13 @@ maxClientThreads = 8
minClients = 2
maxClients = 8
[[test.workload]]
name = 'CancelTransaction'
minKeyLength = 1
maxKeyLength = 64
minValueLength = 1
maxValueLength = 1000
maxKeysPerTransaction = 50
initialSize = 100
numRandomOperations = 100
readExistingKeysRatio = 0.9
[[test.workload]]
name = 'CancelTransaction'
minKeyLength = 1
maxKeyLength = 64
minValueLength = 1
maxValueLength = 1000
maxKeysPerTransaction = 50
initialSize = 100
numRandomOperations = 100
readExistingKeysRatio = 0.9


@ -0,0 +1,28 @@
[[test]]
title = 'Cancel Transaction with Database per Transaction with TLS'
multiThreaded = true
buggify = true
databasePerTransaction = true
minFdbThreads = 2
maxFdbThreads = 8
minDatabases = 2
maxDatabases = 8
minClientThreads = 2
maxClientThreads = 8
minClients = 2
maxClients = 8
[[server]]
tls_enabled = true
max_num_processes = 1
[[test.workload]]
name = 'CancelTransaction'
minKeyLength = 1
maxKeyLength = 64
minValueLength = 1
maxValueLength = 1000
maxKeysPerTransaction = 50
initialSize = 100
numRandomOperations = 100
readExistingKeysRatio = 0.9


@ -11,15 +11,15 @@ maxClientThreads = 8
minClients = 2
maxClients = 8
[[test.workload]]
name = 'CancelTransaction'
minKeyLength = 1
maxKeyLength = 64
minValueLength = 1
maxValueLength = 1000
maxKeysPerTransaction = 50
initialSize = 100
numRandomOperations = 100
readExistingKeysRatio = 0.9
minTxTimeoutMs = 10
maxTxTimeoutMs = 10000
[[test.workload]]
name = 'CancelTransaction'
minKeyLength = 1
maxKeyLength = 64
minValueLength = 1
maxValueLength = 1000
maxKeysPerTransaction = 50
initialSize = 100
numRandomOperations = 100
readExistingKeysRatio = 0.9
minTxTimeoutMs = 10
maxTxTimeoutMs = 10000


@ -12,23 +12,23 @@ maxClientThreads = 8
minClients = 2
maxClients = 8
[[test.workload]]
name = 'ApiCorrectness'
minKeyLength = 1
maxKeyLength = 64
minValueLength = 1
maxValueLength = 1000
maxKeysPerTransaction = 50
initialSize = 100
numRandomOperations = 100
readExistingKeysRatio = 0.9
[[test.workload]]
name = 'ApiCorrectness'
minKeyLength = 1
maxKeyLength = 64
minValueLength = 1
maxValueLength = 1000
maxKeysPerTransaction = 50
initialSize = 100
numRandomOperations = 100
readExistingKeysRatio = 0.9
[[test.workload]]
name = 'AtomicOpsCorrectness'
initialSize = 0
numRandomOperations = 100
[[test.workload]]
name = 'AtomicOpsCorrectness'
initialSize = 0
numRandomOperations = 100
[[test.workload]]
name = 'WatchAndWait'
initialSize = 0
numRandomOperations = 10
[[test.workload]]
name = 'WatchAndWait'
initialSize = 0
numRandomOperations = 10


@ -12,23 +12,23 @@ maxClientThreads = 8
minClients = 2
maxClients = 8
[[test.workload]]
name = 'ApiCorrectness'
minKeyLength = 1
maxKeyLength = 64
minValueLength = 1
maxValueLength = 1000
maxKeysPerTransaction = 50
initialSize = 100
numRandomOperations = 100
readExistingKeysRatio = 0.9
[[test.workload]]
name = 'ApiCorrectness'
minKeyLength = 1
maxKeyLength = 64
minValueLength = 1
maxValueLength = 1000
maxKeysPerTransaction = 50
initialSize = 100
numRandomOperations = 100
readExistingKeysRatio = 0.9
[[test.workload]]
name = 'AtomicOpsCorrectness'
initialSize = 0
numRandomOperations = 100
[[test.workload]]
name = 'AtomicOpsCorrectness'
initialSize = 0
numRandomOperations = 100
[[test.workload]]
name = 'WatchAndWait'
initialSize = 0
numRandomOperations = 10
[[test.workload]]
name = 'WatchAndWait'
initialSize = 0
numRandomOperations = 10


@ -12,23 +12,23 @@ maxClientThreads = 8
minClients = 2
maxClients = 8
[[test.workload]]
name = 'ApiCorrectness'
minKeyLength = 1
maxKeyLength = 64
minValueLength = 1
maxValueLength = 1000
maxKeysPerTransaction = 50
initialSize = 100
numRandomOperations = 100
readExistingKeysRatio = 0.9
[[test.workload]]
name = 'ApiCorrectness'
minKeyLength = 1
maxKeyLength = 64
minValueLength = 1
maxValueLength = 1000
maxKeysPerTransaction = 50
initialSize = 100
numRandomOperations = 100
readExistingKeysRatio = 0.9
[[test.workload]]
name = 'AtomicOpsCorrectness'
initialSize = 0
numRandomOperations = 100
[[test.workload]]
name = 'AtomicOpsCorrectness'
initialSize = 0
numRandomOperations = 100
[[test.workload]]
name = 'WatchAndWait'
initialSize = 0
numRandomOperations = 10
[[test.workload]]
name = 'WatchAndWait'
initialSize = 0
numRandomOperations = 10


@ -0,0 +1,29 @@
[[test]]
title = 'API Correctness Single Threaded'
minClients = 1
maxClients = 3
minDatabases = 1
maxDatabases = 3
multiThreaded = false
disableClientBypass = true
[[test.workload]]
name = 'ApiCorrectness'
minKeyLength = 1
maxKeyLength = 64
minValueLength = 1
maxValueLength = 1000
maxKeysPerTransaction = 50
initialSize = 100
numRandomOperations = 100
readExistingKeysRatio = 0.9
[[test.workload]]
name = 'AtomicOpsCorrectness'
initialSize = 0
numRandomOperations = 100
[[test.workload]]
name = 'WatchAndWait'
initialSize = 0
numRandomOperations = 10


@ -11,23 +11,23 @@ maxClientThreads = 8
minClients = 2
maxClients = 8
[[test.workload]]
name = 'ApiCorrectness'
minKeyLength = 1
maxKeyLength = 64
minValueLength = 1
maxValueLength = 1000
maxKeysPerTransaction = 50
initialSize = 100
numRandomOperations = 100
readExistingKeysRatio = 0.9
[[test.workload]]
name = 'ApiCorrectness'
minKeyLength = 1
maxKeyLength = 64
minValueLength = 1
maxValueLength = 1000
maxKeysPerTransaction = 50
initialSize = 100
numRandomOperations = 100
readExistingKeysRatio = 0.9
[[test.workload]]
name = 'AtomicOpsCorrectness'
initialSize = 0
numRandomOperations = 100
[[test.workload]]
name = 'AtomicOpsCorrectness'
initialSize = 0
numRandomOperations = 100
[[test.workload]]
name = 'WatchAndWait'
initialSize = 0
numRandomOperations = 10
[[test.workload]]
name = 'WatchAndWait'
initialSize = 0
numRandomOperations = 10


@ -4,23 +4,23 @@ minClients = 1
maxClients = 3
multiThreaded = false
[[test.workload]]
name = 'ApiCorrectness'
minKeyLength = 1
maxKeyLength = 64
minValueLength = 1
maxValueLength = 1000
maxKeysPerTransaction = 50
initialSize = 100
numRandomOperations = 100
readExistingKeysRatio = 0.9
[[test.workload]]
name = 'ApiCorrectness'
minKeyLength = 1
maxKeyLength = 64
minValueLength = 1
maxValueLength = 1000
maxKeysPerTransaction = 50
initialSize = 100
numRandomOperations = 100
readExistingKeysRatio = 0.9
[[test.workload]]
name = 'AtomicOpsCorrectness'
initialSize = 0
numRandomOperations = 100
[[test.workload]]
name = 'AtomicOpsCorrectness'
initialSize = 0
numRandomOperations = 100
[[test.workload]]
name = 'WatchAndWait'
initialSize = 0
numRandomOperations = 10
[[test.workload]]
name = 'WatchAndWait'
initialSize = 0
numRandomOperations = 10


@ -0,0 +1,37 @@
[[test]]
title = 'API Correctness with TLS'
multiThreaded = true
buggify = true
minFdbThreads = 2
maxFdbThreads = 8
minDatabases = 2
maxDatabases = 8
minClientThreads = 2
maxClientThreads = 8
minClients = 2
maxClients = 8
[[server]]
tls_enabled = true
max_num_processes = 1
[[test.workload]]
name = 'ApiCorrectness'
minKeyLength = 1
maxKeyLength = 64
minValueLength = 1
maxValueLength = 1000
maxKeysPerTransaction = 50
initialSize = 100
numRandomOperations = 100
readExistingKeysRatio = 0.9
[[test.workload]]
name = 'AtomicOpsCorrectness'
initialSize = 0
numRandomOperations = 100
[[test.workload]]
name = 'WatchAndWait'
initialSize = 0
numRandomOperations = 10


@ -11,23 +11,22 @@ maxClientThreads = 8
minClients = 2
maxClients = 8
[[test.workload]]
name = 'ApiCorrectness'
minKeyLength = 1
maxKeyLength = 64
minValueLength = 1
maxValueLength = 1000
maxKeysPerTransaction = 50
initialSize = 100
numRandomOperations = 100
readExistingKeysRatio = 0.9
minTxTimeoutMs = 100
maxTxTimeoutMs = 10000
[[test.workload]]
name = 'AtomicOpsCorrectness'
initialSize = 0
numRandomOperations = 100
minTxTimeoutMs = 100
maxTxTimeoutMs = 10000
[[test.workload]]
name = 'ApiCorrectness'
minKeyLength = 1
maxKeyLength = 64
minValueLength = 1
maxValueLength = 1000
maxKeysPerTransaction = 50
initialSize = 100
numRandomOperations = 100
readExistingKeysRatio = 0.9
minTxTimeoutMs = 100
maxTxTimeoutMs = 10000
[[test.workload]]
name = 'AtomicOpsCorrectness'
initialSize = 0
numRandomOperations = 100
minTxTimeoutMs = 100
maxTxTimeoutMs = 10000


@ -9,13 +9,13 @@ maxClients = 8
minTenants = 2
maxTenants = 5
[[test.workload]]
name = 'ApiCorrectness'
minKeyLength = 1
maxKeyLength = 64
minValueLength = 1
maxValueLength = 1000
maxKeysPerTransaction = 5
initialSize = 100
numRandomOperations = 200
readExistingKeysRatio = 0.9
[[test.workload]]
name = 'ApiCorrectness'
minKeyLength = 1
maxKeyLength = 64
minValueLength = 1
maxValueLength = 1000
maxKeysPerTransaction = 5
initialSize = 100
numRandomOperations = 200
readExistingKeysRatio = 0.9


@ -0,0 +1,25 @@
[[test]]
title = 'Multi-tenant API Correctness Multi Threaded'
multiThreaded = true
buggify = true
minFdbThreads = 2
maxFdbThreads = 8
minClients = 2
maxClients = 8
minTenants = 2
maxTenants = 5
[[server]]
tls_enabled = true
max_num_processes = 1
[[test.workload]]
name = 'ApiCorrectness'
minKeyLength = 1
maxKeyLength = 64
minValueLength = 1
maxValueLength = 1000
maxKeysPerTransaction = 5
initialSize = 100
numRandomOperations = 200
readExistingKeysRatio = 0.9


@ -12,13 +12,13 @@ maxClientThreads = 4
minClients = 2
maxClients = 4
[[test.workload]]
name = 'ApiCorrectness'
minKeyLength = 1
maxKeyLength = 64
minValueLength = 1
maxValueLength = 1000
maxKeysPerTransaction = 50
initialSize = 100
numRandomOperations = 100
readExistingKeysRatio = 0.9
[[test.workload]]
name = 'ApiCorrectness'
minKeyLength = 1
maxKeyLength = 64
minValueLength = 1
maxValueLength = 1000
maxKeysPerTransaction = 50
initialSize = 100
numRandomOperations = 100
readExistingKeysRatio = 0.9


@ -0,0 +1,28 @@
[[test]]
title = 'Test tampering the cluster file with TLS'
multiThreaded = true
buggify = true
tamperClusterFile = true
minFdbThreads = 2
maxFdbThreads = 4
minDatabases = 2
maxDatabases = 4
minClientThreads = 2
maxClientThreads = 4
minClients = 2
maxClients = 4
[[server]]
tls_enabled = true
max_num_processes = 1
[[test.workload]]
name = 'ApiCorrectness'
minKeyLength = 1
maxKeyLength = 64
minValueLength = 1
maxValueLength = 1000
maxKeysPerTransaction = 50
initialSize = 100
numRandomOperations = 100
readExistingKeysRatio = 0.9


@ -46,7 +46,7 @@ int main(int argc, char** argv) {
}
fdb_check(fdb_select_api_version(FDB_API_VERSION));
fdb_check(fdb_setup_network());
std::thread network_thread{ &fdb_run_network };
std::thread network_thread{ [] { fdb_check(fdb_run_network()); } };
fdb_check(
fdb_network_set_option(FDBNetworkOption::FDB_NET_OPTION_TRACE_ENABLE, reinterpret_cast<const uint8_t*>(""), 0));


@ -233,7 +233,7 @@ int main(int argc, char** argv) {
applyNetworkOptions(options);
fdb::network::setup();
std::thread network_thread{ &fdb::network::run };
std::thread network_thread{ [] { fdb_check(fdb::network::run(), "FDB network thread failed"); } };
// Try calling some basic functionality that is available
// in all recent API versions


@ -271,7 +271,7 @@ int main(int argc, char** argv) {
context.applyCommandLine(argc, argv);
fdb_check(fdb_setup_network());
std::thread network_thread{ &fdb_run_network };
std::thread network_thread{ [] { fdb_check(fdb_run_network()); } };
db = fdb_open_database(argv[1]);
timeoutDb = fdb_open_database(argv[1]);


@ -66,7 +66,7 @@ TEST_CASE("setup") {
},
&context));
std::thread network_thread{ &fdb_run_network };
std::thread network_thread{ [] { fdb_check(fdb_run_network()); } };
CHECK(!context.called);
fdb_check(fdb_stop_network());


@ -68,7 +68,7 @@ int main(int argc, char** argv) {
set_net_opt(FDBNetworkOption::FDB_NET_OPTION_TRACE_PARTIAL_FILE_SUFFIX, trace_partial_file_suffix);
fdb_check(fdb_setup_network());
std::thread network_thread{ &fdb_run_network };
std::thread network_thread{ [] { fdb_check(fdb_run_network()); } };
// Apparently you need to open a database to initialize logging
FDBDatabase* out;


@ -2998,7 +2998,7 @@ int main(int argc, char** argv) {
context.applyCommandLine(argc, argv);
fdb_check(fdb_setup_network());
std::thread network_thread{ &fdb_run_network };
std::thread network_thread{ [] { fdb_check(fdb_run_network()); } };
db = fdb_open_database(argv[1]);
clusterFilePath = std::string(argv[1]);


@ -88,7 +88,7 @@ int main(int argc, char** argv) {
context.applyCommandLine(argc, argv);
fdb_check(fdb_setup_network());
std::thread network_thread{ &fdb_run_network };
std::thread network_thread{ [] { fdb_check(fdb_run_network()); } };
{
FDBCluster* cluster;


@ -320,11 +320,11 @@ function(create_long_running_correctness_package)
add_custom_command(
OUTPUT ${tar_file}
DEPENDS ${package_files}
${CMAKE_SOURCE_DIR}/contrib/Joshua/scripts/correctnessTest.sh
${CMAKE_SOURCE_DIR}/contrib/Joshua/scripts/correctnessTimeout.sh
COMMAND ${CMAKE_COMMAND} -E copy ${CMAKE_SOURCE_DIR}/contrib/Joshua/scripts/correctnessTest.sh
${CMAKE_SOURCE_DIR}/contrib/Joshua/scripts/longRunningCorrectnessTest.sh
${CMAKE_SOURCE_DIR}/contrib/Joshua/scripts/longRunningCorrectnessTimeout.sh
COMMAND ${CMAKE_COMMAND} -E copy ${CMAKE_SOURCE_DIR}/contrib/Joshua/scripts/longRunningCorrectnessTest.sh
${out_dir}/joshua_test
COMMAND ${CMAKE_COMMAND} -E copy ${CMAKE_SOURCE_DIR}/contrib/Joshua/scripts/correctnessTimeout.sh
COMMAND ${CMAKE_COMMAND} -E copy ${CMAKE_SOURCE_DIR}/contrib/Joshua/scripts/longRunningCorrectnessTimeout.sh
${out_dir}/joshua_timeout
COMMAND ${CMAKE_COMMAND} -E tar cfz ${tar_file} ${package_files}
${out_dir}/joshua_test


@ -0,0 +1,9 @@
#!/bin/sh
# Simulation currently has memory leaks. We need to investigate before we can enable leak detection in joshua.
export ASAN_OPTIONS="detect_leaks=0"
OLDBINDIR="${OLDBINDIR:-/app/deploy/global_data/oldBinaries}"
#mono bin/TestHarness.exe joshua-run "${OLDBINDIR}" false
python3 -m test_harness.app -s ${JOSHUA_SEED} --old-binaries-path ${OLDBINDIR} --long-running


@ -0,0 +1,3 @@
#!/bin/bash -u
python3 -m test_harness.timeout --long-running


@ -184,6 +184,8 @@ class Config:
self.reproduce_prefix: str | None = None
self.reproduce_prefix_args = {'type': str, 'required': False,
'help': 'When printing the results, prepend this string to the command'}
self.long_running: bool = False
self.long_running_args = {'action': 'store_true'}
self._env_names: Dict[str, str] = {}
self._config_map = self._build_map()
self._read_env()


@ -303,6 +303,7 @@ class TestRun:
self.stats: str | None = stats
self.expected_unseed: int | None = expected_unseed
self.use_valgrind: bool = config.use_valgrind
self.long_running: bool = config.long_running
self.old_binary_path: Path = config.old_binaries_path
self.buggify_enabled: bool = buggify_enabled
self.fault_injection_enabled: bool = True
@ -375,7 +376,7 @@ class TestRun:
process = subprocess.Popen(command, stdout=subprocess.DEVNULL, stderr=subprocess.PIPE, cwd=self.temp_path,
text=True, env=env)
did_kill = False
timeout = 20 * config.kill_seconds if self.use_valgrind else config.kill_seconds
timeout = 20 * config.kill_seconds if self.use_valgrind or self.long_running else config.kill_seconds
err_out: str
try:
_, err_out = process.communicate(timeout=timeout)
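
The test_harness changes above add a long-running mode: Config gains a long_running flag (exposed as --long-running, as the new joshua scripts show) and TestRun stretches the kill timeout by 20x when that flag or valgrind is in effect. A self-contained Python sketch of just that rule follows; the --use-valgrind and --kill-seconds options and the 1800-second default are illustrative assumptions, not part of the diff.

# Standalone sketch of the timeout rule introduced above: long-running or valgrind
# runs get 20x the normal kill timeout. Flag names besides --long-running are assumed.
import argparse

parser = argparse.ArgumentParser()
parser.add_argument("--long-running", dest="long_running", action="store_true")
parser.add_argument("--use-valgrind", dest="use_valgrind", action="store_true")
parser.add_argument("--kill-seconds", dest="kill_seconds", type=int, default=1800)
args = parser.parse_args(["--long-running"])

timeout = 20 * args.kill_seconds if args.use_valgrind or args.long_running else args.kill_seconds
print(timeout)  # 36000: 20x the 1800-second default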


@ -1057,6 +1057,9 @@ ParsedDeltaBoundaryRef deltaAtVersion(const DeltaBoundaryRef& delta, Version beg
beginVersion <= delta.clearVersion.get();
if (delta.values.empty()) {
return ParsedDeltaBoundaryRef(delta.key, clearAfter);
} else if (readVersion >= delta.values.back().version && beginVersion <= delta.values.back().version) {
// for all but zero or one delta files, readVersion >= the entire delta file. optimize this case
return ParsedDeltaBoundaryRef(delta.key, clearAfter, delta.values.back());
}
auto valueAtVersion = std::lower_bound(delta.values.begin(),
delta.values.end(),
@ -1338,6 +1341,10 @@ static RangeResult mergeDeltaStreams(const BlobGranuleChunkRef& chunk,
std::set<int16_t, std::greater<int16_t>> activeClears;
int16_t maxActiveClear = -1;
// trade off memory for cpu performance by assuming all inserts
RangeResult result;
int maxExpectedSize = 0;
// check if a given stream is actively clearing
bool clearActive[streams.size()];
for (int16_t i = 0; i < streams.size(); i++) {
@ -1355,14 +1362,16 @@ static RangeResult mergeDeltaStreams(const BlobGranuleChunkRef& chunk,
item.streamIdx = i;
item.dataIdx = 0;
next.push(item);
maxExpectedSize += streams[i].size();
result.arena().dependsOn(streams[i].arena());
}
}
result.reserve(result.arena(), maxExpectedSize);
if (chunk.snapshotFile.present()) {
stats.snapshotRows += streams[0].size();
}
RangeResult result;
std::vector<MergeStreamNext> cur;
cur.reserve(streams.size());
while (!next.empty()) {
@ -1397,7 +1406,7 @@ static RangeResult mergeDeltaStreams(const BlobGranuleChunkRef& chunk,
if (v.isSet() && maxActiveClear < it.streamIdx) {
KeyRef finalKey =
chunk.tenantPrefix.present() ? v.key.removePrefix(chunk.tenantPrefix.get()) : v.key;
result.push_back_deep(result.arena(), KeyValueRef(finalKey, v.value));
result.push_back(result.arena(), KeyValueRef(finalKey, v.value));
if (!includesSnapshot) {
stats.rowsInserted++;
} else if (it.streamIdx > 0) {
@ -1426,11 +1435,39 @@ static RangeResult mergeDeltaStreams(const BlobGranuleChunkRef& chunk,
}
}
// FIXME: if memory assumption was wrong and result is significantly smaller than total input size, could copy it
// with push_back_deep to a new result. This is rare though
stats.outputBytes += result.expectedSize();
return result;
}
RangeResult materializeJustSnapshot(const BlobGranuleChunkRef& chunk,
Optional<StringRef> snapshotData,
const KeyRange& requestRange,
GranuleMaterializeStats& stats) {
stats.inputBytes += snapshotData.get().size();
Standalone<VectorRef<ParsedDeltaBoundaryRef>> snapshotRows = loadSnapshotFile(
chunk.snapshotFile.get().filename, snapshotData.get(), requestRange, chunk.snapshotFile.get().cipherKeysCtx);
RangeResult result;
if (!snapshotRows.empty()) {
result.arena().dependsOn(snapshotRows.arena());
result.reserve(result.arena(), snapshotRows.size());
for (auto& it : snapshotRows) {
// TODO REMOVE validation
ASSERT(it.op == MutationRef::Type::SetValue);
KeyRef finalKey = chunk.tenantPrefix.present() ? it.key.removePrefix(chunk.tenantPrefix.get()) : it.key;
result.push_back(result.arena(), KeyValueRef(finalKey, it.value));
}
stats.outputBytes += result.expectedSize();
stats.snapshotRows += result.size();
}
return result;
}
RangeResult materializeBlobGranule(const BlobGranuleChunkRef& chunk,
KeyRangeRef keyRange,
Version beginVersion,
@ -1454,6 +1491,11 @@ RangeResult materializeBlobGranule(const BlobGranuleChunkRef& chunk,
requestRange = keyRange;
}
// fast case for only-snapshot read
if (chunk.snapshotFile.present() && chunk.deltaFiles.empty() && chunk.newDeltas.empty()) {
return materializeJustSnapshot(chunk, snapshotData, requestRange, stats);
}
std::vector<Standalone<VectorRef<ParsedDeltaBoundaryRef>>> streams;
std::vector<bool> startClears;
// +1 for possible snapshot, +1 for possible memory deltas
@ -2675,6 +2717,14 @@ struct CommonPrefixStats {
int totalKeys = 0;
int minKeySize = 1000000000;
int maxKeySize = 0;
int64_t logicalBytes = 0;
int64_t totalLogicalBytes = 0;
int deltas = 0;
int deltasSet = 0;
int deltasClear = 0;
int deltasNoOp = 0;
int deltasClearAfter = 0;
void addKey(const KeyRef& k) {
if (len == -1) {
@ -2689,7 +2739,38 @@ struct CommonPrefixStats {
maxKeySize = std::max(maxKeySize, k.size());
}
void addKeyValue(const KeyRef& k, const ValueRef& v) {
addKey(k);
logicalBytes += k.size();
logicalBytes += v.size();
}
void addBoundary(const ParsedDeltaBoundaryRef& d) {
addKey(d.key);
deltas++;
if (d.isSet()) {
deltasSet++;
logicalBytes += d.value.size();
} else if (d.isClear()) {
deltasClear++;
} else {
ASSERT(d.isNoOp());
deltasNoOp++;
}
if (d.clearAfter) {
deltasClearAfter++;
}
}
void doneFile() {
totalLogicalBytes += logicalBytes;
fmt::print("Logical Size: {0}\n", logicalBytes);
logicalBytes = 0;
}
Key done() {
doneFile();
ASSERT(len >= 0);
fmt::print("Common prefix: {0}\nCommon Prefix Length: {1}\nAverage Key Size: {2}\nMin Key Size: {3}, Max Key "
"Size: {4}\n",
@ -2698,11 +2779,21 @@ struct CommonPrefixStats {
totalKeySize / totalKeys,
minKeySize,
maxKeySize);
if (deltas > 0) {
fmt::print("Delta stats: {0} deltas, {1} sets, {2} clears, {3} noops, {4} clearAfters\n",
deltas,
deltasSet,
deltasClear,
deltasNoOp,
deltasClearAfter);
}
fmt::print("Logical Size: {0}\n", totalLogicalBytes);
return key.substr(0, len);
}
};
FileSet loadFileSet(std::string basePath, const std::vector<std::string>& filenames) {
FileSet loadFileSet(std::string basePath, const std::vector<std::string>& filenames, bool newFormat) {
FileSet files;
CommonPrefixStats stats;
for (int i = 0; i < filenames.size(); i++) {
@ -2713,40 +2804,66 @@ FileSet loadFileSet(std::string basePath, const std::vector<std::string>& filena
std::string fpath = basePath + filenames[i];
Value data = loadFileData(fpath);
Arena arena;
GranuleSnapshot file;
ObjectReader dataReader(data.begin(), Unversioned());
dataReader.deserialize(FileIdentifierFor<GranuleSnapshot>::value, file, arena);
Standalone<GranuleSnapshot> parsed(file, arena);
Standalone<GranuleSnapshot> parsed;
if (!newFormat) {
Arena arena;
GranuleSnapshot file;
ObjectReader dataReader(data.begin(), Unversioned());
dataReader.deserialize(FileIdentifierFor<GranuleSnapshot>::value, file, arena);
parsed = Standalone<GranuleSnapshot>(file, arena);
fmt::print("Loaded {0} rows from snapshot file\n", parsed.size());
for (auto& it : parsed) {
stats.addKeyValue(it.key, it.value);
}
} else {
Standalone<VectorRef<ParsedDeltaBoundaryRef>> res = loadSnapshotFile(""_sr, data, normalKeys, {});
fmt::print("Loaded {0} rows from snapshot file\n", res.size());
for (auto& it : res) {
stats.addKeyValue(it.key, it.value);
}
}
fmt::print("Loaded {0} rows from snapshot file\n", parsed.size());
files.snapshotFile = { filenames[i], version, data, parsed };
for (auto& it : parsed) {
stats.addKey(it.key);
}
} else {
std::string fpath = basePath + filenames[i];
Value data = loadFileData(fpath);
Arena arena;
GranuleDeltas file;
ObjectReader dataReader(data.begin(), Unversioned());
dataReader.deserialize(FileIdentifierFor<GranuleDeltas>::value, file, arena);
Standalone<GranuleDeltas> parsed(file, arena);
if (!newFormat) {
Arena arena;
GranuleDeltas file;
ObjectReader dataReader(data.begin(), Unversioned());
dataReader.deserialize(FileIdentifierFor<GranuleDeltas>::value, file, arena);
Standalone<GranuleDeltas> parsed(file, arena);
fmt::print("Loaded {0} deltas from delta file\n", parsed.size());
files.deltaFiles.push_back({ filenames[i], version, data, parsed });
fmt::print("Loaded {0} deltas from delta file\n", parsed.size());
files.deltaFiles.push_back({ filenames[i], version, data, parsed });
for (auto& it : parsed) {
for (auto& it2 : it.mutations) {
stats.addKey(it2.param1);
if (it2.type == MutationRef::Type::ClearRange) {
stats.addKey(it2.param2);
for (auto& it : parsed) {
for (auto& it2 : it.mutations) {
stats.addKey(it2.param1);
if (it2.type == MutationRef::Type::ClearRange) {
stats.addKey(it2.param2);
}
}
}
} else {
bool startClear = false;
Standalone<VectorRef<ParsedDeltaBoundaryRef>> res =
loadChunkedDeltaFile(""_sr, data, normalKeys, 0, version, {}, startClear);
ASSERT(!startClear);
Standalone<GranuleDeltas> parsed;
fmt::print("Loaded {0} boundaries from delta file\n", res.size());
files.deltaFiles.push_back({ filenames[i], version, data, parsed });
for (auto& it : res) {
stats.addBoundary(it);
}
}
}
stats.doneFile();
}
files.commonPrefix = stats.done();
@ -2804,6 +2921,28 @@ std::pair<int64_t, double> doDeltaWriteBench(const Standalone<GranuleDeltas>& da
return { serializedBytes, elapsed };
}
void chunkFromFileSet(const FileSet& fileSet,
Standalone<BlobGranuleChunkRef>& chunk,
StringRef* deltaPtrs,
Version readVersion,
Optional<BlobGranuleCipherKeysCtx> keys,
int numDeltaFiles) {
size_t snapshotSize = std::get<3>(fileSet.snapshotFile).size();
chunk.snapshotFile =
BlobFilePointerRef(chunk.arena(), std::get<0>(fileSet.snapshotFile), 0, snapshotSize, snapshotSize, keys);
for (int i = 0; i < numDeltaFiles; i++) {
size_t deltaSize = std::get<3>(fileSet.deltaFiles[i]).size();
chunk.deltaFiles.emplace_back_deep(
chunk.arena(), std::get<0>(fileSet.deltaFiles[i]), 0, deltaSize, deltaSize, keys);
deltaPtrs[i] = std::get<2>(fileSet.deltaFiles[i]);
}
chunk.keyRange = fileSet.range;
chunk.includedVersion = readVersion;
chunk.snapshotVersion = std::get<1>(fileSet.snapshotFile);
}
FileSet rewriteChunkedFileSet(const FileSet& fileSet,
Optional<BlobGranuleCipherKeysCtx> keys,
Optional<CompressionFilter> compressionFilter) {
@ -2830,41 +2969,30 @@ std::pair<int64_t, double> doReadBench(const FileSet& fileSet,
KeyRange readRange,
bool clearAllAtEnd,
Optional<BlobGranuleCipherKeysCtx> keys,
Optional<CompressionFilter> compressionFilter) {
int numDeltaFiles,
bool printStats = false) {
Version readVersion = std::get<1>(fileSet.deltaFiles.back());
Standalone<BlobGranuleChunkRef> chunk;
GranuleMaterializeStats stats;
StringRef deltaPtrs[fileSet.deltaFiles.size()];
ASSERT(numDeltaFiles >= 0 && numDeltaFiles <= fileSet.deltaFiles.size());
StringRef deltaPtrs[numDeltaFiles];
MutationRef clearAllAtEndMutation;
if (clearAllAtEnd) {
clearAllAtEndMutation = MutationRef(MutationRef::Type::ClearRange, readRange.begin, readRange.end);
}
if (chunked) {
size_t snapshotSize = std::get<3>(fileSet.snapshotFile).size();
chunk.snapshotFile =
BlobFilePointerRef(chunk.arena(), std::get<0>(fileSet.snapshotFile), 0, snapshotSize, snapshotSize, keys);
for (int i = 0; i < fileSet.deltaFiles.size(); i++) {
size_t deltaSize = std::get<3>(fileSet.deltaFiles[i]).size();
chunk.deltaFiles.emplace_back_deep(
chunk.arena(), std::get<0>(fileSet.deltaFiles[i]), 0, deltaSize, deltaSize, keys);
deltaPtrs[i] = std::get<2>(fileSet.deltaFiles[i]);
}
chunkFromFileSet(fileSet, chunk, deltaPtrs, readVersion, keys, numDeltaFiles);
if (clearAllAtEnd) {
readVersion++;
MutationsAndVersionRef lastDelta;
lastDelta.version = readVersion;
lastDelta.mutations.push_back(chunk.arena(), clearAllAtEndMutation);
chunk.includedVersion = readVersion;
chunk.newDeltas.push_back_deep(chunk.arena(), lastDelta);
}
chunk.keyRange = fileSet.range;
chunk.includedVersion = readVersion;
chunk.snapshotVersion = std::get<1>(fileSet.snapshotFile);
}
int64_t serializedBytes = 0;
@ -2897,15 +3025,16 @@ std::pair<int64_t, double> doReadBench(const FileSet& fileSet,
elapsed /= READ_RUNS;
serializedBytes /= READ_RUNS;
// TODO REMOVE
fmt::print("Materialize stats:\n");
fmt::print(" Input bytes: {0}\n", stats.inputBytes);
fmt::print(" Output bytes: {0}\n", stats.outputBytes);
fmt::print(" Write Amp: {0}\n", (1.0 * stats.inputBytes) / stats.outputBytes);
fmt::print(" Snapshot Rows: {0}\n", stats.snapshotRows);
fmt::print(" Rows Cleared: {0}\n", stats.rowsCleared);
fmt::print(" Rows Inserted: {0}\n", stats.rowsInserted);
fmt::print(" Rows Updated: {0}\n", stats.rowsUpdated);
if (printStats) {
fmt::print("Materialize stats:\n");
fmt::print(" Input bytes: {0}\n", stats.inputBytes / READ_RUNS);
fmt::print(" Output bytes: {0}\n", stats.outputBytes / READ_RUNS);
fmt::print(" Write Amp: {0}\n", (1.0 * stats.inputBytes) / stats.outputBytes);
fmt::print(" Snapshot Rows: {0}\n", stats.snapshotRows / READ_RUNS);
fmt::print(" Rows Cleared: {0}\n", stats.rowsCleared / READ_RUNS);
fmt::print(" Rows Inserted: {0}\n", stats.rowsInserted / READ_RUNS);
fmt::print(" Rows Updated: {0}\n", stats.rowsUpdated / READ_RUNS);
}
return { serializedBytes, elapsed };
}
@ -2937,7 +3066,7 @@ TEST_CASE("!/blobgranule/files/benchFromFiles") {
int64_t logicalSnapshotSize = 0;
int64_t logicalDeltaSize = 0;
for (auto& it : fileSetNames) {
FileSet fileSet = loadFileSet(basePath, it);
FileSet fileSet = loadFileSet(basePath, it, false);
fileSets.push_back(fileSet);
logicalSnapshotSize += std::get<3>(fileSet.snapshotFile).expectedSize();
for (auto& deltaFile : fileSet.deltaFiles) {
@ -2968,7 +3097,7 @@ TEST_CASE("!/blobgranule/files/benchFromFiles") {
if (encrypt) {
name += "ENC";
}
if (compressionFilter.present()) {
if (compressionFilter.present() && compressionFilter.get() != CompressionFilter::NONE) {
name += "CMP";
}
if (name.empty()) {
@ -3024,9 +3153,16 @@ TEST_CASE("!/blobgranule/files/benchFromFiles") {
std::vector<std::string> readRunNames = {};
std::vector<std::pair<int64_t, double>> readMetrics;
bool doEdgeCaseReadTests = true;
bool doEdgeCaseReadTests = false;
bool doVaryingDeltaTests = false;
std::vector<double> clearAllReadMetrics;
std::vector<double> readSingleKeyMetrics;
std::vector<std::vector<std::pair<int64_t, double>>> varyingDeltaMetrics;
size_t maxDeltaFiles = 100000;
for (auto& f : fileSets) {
maxDeltaFiles = std::min(maxDeltaFiles, f.deltaFiles.size());
}
for (bool chunk : chunkModes) {
for (bool encrypt : encryptionModes) {
@ -3049,7 +3185,7 @@ TEST_CASE("!/blobgranule/files/benchFromFiles") {
if (encrypt) {
name += "ENC";
}
if (compressionFilter.present()) {
if (compressionFilter.present() && compressionFilter.get() != CompressionFilter::NONE) {
name += "CMP";
}
if (name.empty()) {
@ -3062,6 +3198,10 @@ TEST_CASE("!/blobgranule/files/benchFromFiles") {
double totalElapsed = 0.0;
double totalElapsedClearAll = 0.0;
double totalElapsedSingleKey = 0.0;
std::vector<std::pair<int64_t, double>> varyingDeltas;
for (int i = 0; i <= maxDeltaFiles; i++) {
varyingDeltas.push_back({ 0, 0.0 });
}
for (auto& fileSet : fileSets) {
FileSet newFileSet;
if (!chunk) {
@ -3070,24 +3210,38 @@ TEST_CASE("!/blobgranule/files/benchFromFiles") {
newFileSet = rewriteChunkedFileSet(fileSet, keys, compressionFilter);
}
auto res = doReadBench(newFileSet, chunk, fileSet.range, false, keys, compressionFilter);
auto res = doReadBench(newFileSet, chunk, fileSet.range, false, keys, newFileSet.deltaFiles.size());
totalBytesRead += res.first;
totalElapsed += res.second;
if (doEdgeCaseReadTests) {
totalElapsedClearAll +=
doReadBench(newFileSet, chunk, fileSet.range, true, keys, compressionFilter).second;
doReadBench(newFileSet, chunk, fileSet.range, true, keys, newFileSet.deltaFiles.size())
.second;
Key k = std::get<3>(fileSet.snapshotFile).front().key;
KeyRange singleKeyRange(KeyRangeRef(k, keyAfter(k)));
totalElapsedSingleKey +=
doReadBench(newFileSet, chunk, singleKeyRange, false, keys, compressionFilter).second;
doReadBench(newFileSet, chunk, singleKeyRange, false, keys, newFileSet.deltaFiles.size())
.second;
}
if (doVaryingDeltaTests && chunk) {
for (int i = 0; i <= maxDeltaFiles; i++) {
auto r = doReadBench(newFileSet, chunk, fileSet.range, false, keys, i);
varyingDeltas[i].first += r.first;
varyingDeltas[i].second += r.second;
}
}
}
readMetrics.push_back({ totalBytesRead, totalElapsed });
if (doEdgeCaseReadTests) {
clearAllReadMetrics.push_back(totalElapsedClearAll);
readSingleKeyMetrics.push_back(totalElapsedSingleKey);
}
if (doVaryingDeltaTests) {
varyingDeltaMetrics.push_back(varyingDeltas);
}
}
}
}
@ -3121,6 +3275,25 @@ TEST_CASE("!/blobgranule/files/benchFromFiles") {
}
}
if (doVaryingDeltaTests) {
ASSERT(readRunNames.size() == varyingDeltaMetrics.size());
fmt::print("\n\nVarying Deltas Read Results:\nDF#\t");
for (int i = 0; i <= maxDeltaFiles; i++) {
fmt::print("{0}\t", i);
}
fmt::print("\n");
for (int i = 0; i < readRunNames.size(); i++) {
fmt::print("{0}", readRunNames[i]);
for (auto& it : varyingDeltaMetrics[i]) {
double MBperCPUsec = (it.first / 1024.0 / 1024.0) / it.second;
fmt::print("\t{:.6}", MBperCPUsec);
}
fmt::print("\n");
}
}
fmt::print("\n\nCombined Results:\n");
ASSERT(readRunNames.size() == runNames.size() - 1);
for (int i = 0; i < readRunNames.size(); i++) {
@ -3137,3 +3310,22 @@ TEST_CASE("!/blobgranule/files/benchFromFiles") {
return Void();
}
TEST_CASE("!/blobgranule/files/repeatFromFiles") {
std::string basePath = "SET_ME";
std::vector<std::vector<std::string>> fileSetNames = { { "SET_ME" } };
int64_t totalBytesRead = 0;
double totalElapsed = 0.0;
for (auto& it : fileSetNames) {
FileSet fileSet = loadFileSet(basePath, it, true);
auto res = doReadBench(fileSet, true, fileSet.range, false, {}, fileSet.deltaFiles.size(), true);
totalBytesRead += res.first;
totalElapsed += res.second;
}
double MBperCPUsec = (totalBytesRead / 1024.0 / 1024.0) / totalElapsed;
fmt::print("Read Results: {:.6} MB/cpusec\n", MBperCPUsec);
return Void();
}


@ -421,6 +421,10 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
// Enable this knob only for experimental purposes, never enable this in production.
// If enabled, all the committed in-memory memtable writes are lost on a crash.
init( ROCKSDB_DISABLE_WAL_EXPERIMENTAL, false );
// If ROCKSDB_SINGLEKEY_DELETES_ON_CLEARRANGE is enabled, disable ENABLE_CLEAR_RANGE_EAGER_READS knob.
// These knobs have contrary functionality.
init( ROCKSDB_SINGLEKEY_DELETES_ON_CLEARRANGE, false ); if( randomize && BUGGIFY ) ROCKSDB_SINGLEKEY_DELETES_ON_CLEARRANGE = deterministicRandom()->coinflip() ? false : true;
init( ROCKSDB_SINGLEKEY_DELETES_BYTES_LIMIT, 200000 ); // 200KB
// Can commit will delay ROCKSDB_CAN_COMMIT_DELAY_ON_OVERLOAD seconds for
// ROCKSDB_CAN_COMMIT_DELAY_TIMES_ON_OVERLOAD times, if rocksdb overloaded.
// Set ROCKSDB_CAN_COMMIT_DELAY_TIMES_ON_OVERLOAD to 0, to disable
@ -788,7 +792,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
init( RANGESTREAM_LIMIT_BYTES, 2e6 ); if( randomize && BUGGIFY ) RANGESTREAM_LIMIT_BYTES = 1;
init( CHANGEFEEDSTREAM_LIMIT_BYTES, 1e6 ); if( randomize && BUGGIFY ) CHANGEFEEDSTREAM_LIMIT_BYTES = 1;
init( BLOBWORKERSTATUSSTREAM_LIMIT_BYTES, 1e4 ); if( randomize && BUGGIFY ) BLOBWORKERSTATUSSTREAM_LIMIT_BYTES = 1;
init( ENABLE_CLEAR_RANGE_EAGER_READS, true );
init( ENABLE_CLEAR_RANGE_EAGER_READS, true ); if( randomize && BUGGIFY ) ENABLE_CLEAR_RANGE_EAGER_READS = deterministicRandom()->coinflip() ? false : true;
init( CHECKPOINT_TRANSFER_BLOCK_BYTES, 40e6 );
init( QUICK_GET_VALUE_FALLBACK, true );
init( QUICK_GET_KEY_VALUES_FALLBACK, true );


@ -104,6 +104,11 @@ Future<std::unordered_map<EncryptCipherDomainId, Reference<BlobCipherKey>>> getL
// Collect cached cipher keys.
for (auto& domain : domains) {
if (domain.first == FDB_DEFAULT_ENCRYPT_DOMAIN_ID) {
ASSERT(domain.second == FDB_DEFAULT_ENCRYPT_DOMAIN_NAME);
} else if (domain.first == SYSTEM_KEYSPACE_ENCRYPT_DOMAIN_ID) {
ASSERT(domain.second == FDB_SYSTEM_KEYSPACE_ENCRYPT_DOMAIN_NAME);
}
Reference<BlobCipherKey> cachedCipherKey = cipherKeyCache->getLatestCipherKey(domain.first /*domainId*/);
if (cachedCipherKey.isValid()) {
cipherKeys[domain.first] = cachedCipherKey;
@ -301,7 +306,7 @@ template <class T>
Future<TextAndHeaderCipherKeys> getLatestSystemEncryptCipherKeys(const Reference<AsyncVar<T> const>& db,
BlobCipherMetrics::UsageType usageType) {
return getLatestEncryptCipherKeysForDomain(
db, SYSTEM_KEYSPACE_ENCRYPT_DOMAIN_ID, FDB_DEFAULT_ENCRYPT_DOMAIN_NAME, usageType);
db, SYSTEM_KEYSPACE_ENCRYPT_DOMAIN_ID, FDB_SYSTEM_KEYSPACE_ENCRYPT_DOMAIN_NAME, usageType);
}
ACTOR template <class T>


@ -345,6 +345,8 @@ public:
int ROCKSDB_CAN_COMMIT_DELAY_ON_OVERLOAD;
int ROCKSDB_CAN_COMMIT_DELAY_TIMES_ON_OVERLOAD;
bool ROCKSDB_DISABLE_WAL_EXPERIMENTAL;
bool ROCKSDB_SINGLEKEY_DELETES_ON_CLEARRANGE;
int64_t ROCKSDB_SINGLEKEY_DELETES_BYTES_LIMIT;
int64_t ROCKSDB_COMPACTION_READAHEAD_SIZE;
int64_t ROCKSDB_BLOCK_SIZE;
bool ENABLE_SHARDED_ROCKSDB;


@ -687,6 +687,9 @@ struct DDQueue : public IDDRelocationQueue {
Reference<EventCacheHolder> movedKeyServersEventHolder;
int moveReusePhysicalShard;
int moveCreateNewPhysicalShard;
void startRelocation(int priority, int healthPriority) {
// Although PRIORITY_TEAM_REDUNDANT has lower priority than split and merge shard movement,
// we must count it into unhealthyRelocations; because team removers relies on unhealthyRelocations to
@ -750,7 +753,8 @@ struct DDQueue : public IDDRelocationQueue {
output(output), input(input), getShardMetrics(getShardMetrics), getTopKMetrics(getTopKMetrics), lastInterval(0),
suppressIntervals(0), rawProcessingUnhealthy(new AsyncVar<bool>(false)),
rawProcessingWiggle(new AsyncVar<bool>(false)), unhealthyRelocations(0),
movedKeyServersEventHolder(makeReference<EventCacheHolder>("MovedKeyServers")) {}
movedKeyServersEventHolder(makeReference<EventCacheHolder>("MovedKeyServers")), moveReusePhysicalShard(0),
moveCreateNewPhysicalShard(0) {}
DDQueue() = default;
void validate() {
@ -1676,6 +1680,11 @@ ACTOR Future<Void> dataDistributionRelocator(DDQueue* self,
// when !rd.isRestore(), dataMoveId is just decided as physicalShardIDCandidate
// thus, update the physicalShardIDCandidate to related data structures
ASSERT(physicalShardIDCandidate != UID().first());
if (self->physicalShardCollection->physicalShardExists(physicalShardIDCandidate)) {
self->moveReusePhysicalShard++;
} else {
self->moveCreateNewPhysicalShard++;
}
rd.dataMoveId = newShardId(physicalShardIDCandidate, AssignEmptyRange::False);
auto inFlightRange = self->inFlight.rangeContaining(rd.keys.begin);
inFlightRange.value().dataMoveId = rd.dataMoveId;
@ -2472,6 +2481,14 @@ ACTOR Future<Void> dataDistributionQueue(Reference<IDDTxnProcessor> db,
.trackLatest("MovingData"); // This trace event's trackLatest lifetime is controlled by
// DataDistributor::movingDataEventHolder. The track latest
// key we use here must match the key used in the holder.
if (SERVER_KNOBS->SHARD_ENCODE_LOCATION_METADATA && SERVER_KNOBS->ENABLE_DD_PHYSICAL_SHARD) {
TraceEvent("PhysicalShardMoveStats")
.detail("MoveCreateNewPhysicalShard", self.moveCreateNewPhysicalShard)
.detail("MoveReusePhysicalShard", self.moveReusePhysicalShard);
self.moveCreateNewPhysicalShard = 0;
self.moveReusePhysicalShard = 0;
}
}
when(wait(self.error.getFuture())) {} // Propagate errors from dataDistributionRelocator
when(wait(waitForAll(ddQueueFutures))) {}
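The hunks above add a pair of counters that record whether a relocation reuses an existing physical shard or creates a new one, report them through a periodic trace event, and reset them afterwards. Below is a minimal standalone sketch of that counter-and-report pattern, using plain C++ and std::cout in place of FDB's TraceEvent and knob machinery; all names are illustrative, not the source's.

#include <cstdint>
#include <iostream>
#include <unordered_set>

struct PhysicalShardMoveStats {
    int64_t moveReusePhysicalShard = 0;
    int64_t moveCreateNewPhysicalShard = 0;

    // Record whether the candidate shard id already exists in the collection.
    void recordMove(const std::unordered_set<uint64_t>& existingShards, uint64_t candidate) {
        if (existingShards.count(candidate) > 0) {
            ++moveReusePhysicalShard;
        } else {
            ++moveCreateNewPhysicalShard;
        }
    }

    // Report and reset, mirroring the periodic PhysicalShardMoveStats trace event.
    void reportAndReset() {
        std::cout << "PhysicalShardMoveStats"
                  << " MoveCreateNewPhysicalShard=" << moveCreateNewPhysicalShard
                  << " MoveReusePhysicalShard=" << moveReusePhysicalShard << std::endl;
        moveCreateNewPhysicalShard = 0;
        moveReusePhysicalShard = 0;
    }
};

int main() {
    PhysicalShardMoveStats stats;
    std::unordered_set<uint64_t> existing{ 1, 2, 3 };
    stats.recordMove(existing, 2); // reuses shard 2
    stats.recordMove(existing, 9); // creates a new shard
    stats.reportAndReset();
    return 0;
}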

View File

@ -2081,6 +2081,10 @@ void PhysicalShardCollection::logPhysicalShardCollection() {
}
}
bool PhysicalShardCollection::physicalShardExists(uint64_t physicalShardID) {
return physicalShardInstances.find(physicalShardID) != physicalShardInstances.end();
}
// FIXME: complete this test with non-empty range
TEST_CASE("/DataDistributor/Tracker/FetchTopK") {
state DataDistributionTracker self;

View File

@ -388,6 +388,15 @@ ACTOR Future<Void> getCipherKeysByBaseCipherKeyIds(Reference<EncryptKeyProxyData
try {
KmsConnLookupEKsByKeyIdsReq keysByIdsReq;
for (const auto& item : lookupCipherInfoMap) {
// TODO: Currently getEncryptCipherKeys does not pass the domain name; once that is fixed, we can remove
// the check for an empty domain name.
if (!item.second.domainName.empty()) {
if (item.second.domainId == FDB_DEFAULT_ENCRYPT_DOMAIN_ID) {
ASSERT(item.second.domainName == FDB_DEFAULT_ENCRYPT_DOMAIN_NAME);
} else if (item.second.domainId == SYSTEM_KEYSPACE_ENCRYPT_DOMAIN_ID) {
ASSERT(item.second.domainName == FDB_SYSTEM_KEYSPACE_ENCRYPT_DOMAIN_NAME);
}
}
keysByIdsReq.encryptKeyInfos.emplace_back_deep(
keysByIdsReq.arena, item.second.domainId, item.second.baseCipherId, item.second.domainName);
}
@ -527,6 +536,11 @@ ACTOR Future<Void> getLatestCipherKeys(Reference<EncryptKeyProxyData> ekpProxyDa
try {
KmsConnLookupEKsByDomainIdsReq keysByDomainIdReq;
for (const auto& item : lookupCipherDomains) {
if (item.second.domainId == FDB_DEFAULT_ENCRYPT_DOMAIN_ID) {
ASSERT(item.second.domainName == FDB_DEFAULT_ENCRYPT_DOMAIN_NAME);
} else if (item.second.domainId == SYSTEM_KEYSPACE_ENCRYPT_DOMAIN_ID) {
ASSERT(item.second.domainName == FDB_SYSTEM_KEYSPACE_ENCRYPT_DOMAIN_NAME);
}
keysByDomainIdReq.encryptDomainInfos.emplace_back_deep(
keysByDomainIdReq.arena, item.second.domainId, item.second.domainName);
}
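Both hunks above assert that a lookup's domain name is consistent with its domain id for the two well-known domains, tolerating an empty name until every caller passes one. A minimal sketch of that consistency check as a helper follows; the constants are illustrative placeholders, not the FDB definitions.

#include <cassert>
#include <cstdint>
#include <string>

// Illustrative placeholders; the real ids and names come from the FDB encryption headers.
constexpr int64_t kDefaultEncryptDomainId = 0;
constexpr int64_t kSystemKeyspaceEncryptDomainId = -2;
const std::string kDefaultEncryptDomainName = "FdbDefaultEncryptDomain";
const std::string kSystemKeyspaceEncryptDomainName = "FdbSystemKeyspaceEncryptDomain";

// An empty name is tolerated because some callers do not pass one yet (see the TODO above).
bool domainNameMatchesId(int64_t domainId, const std::string& domainName) {
    if (domainName.empty()) {
        return true;
    }
    if (domainId == kDefaultEncryptDomainId) {
        return domainName == kDefaultEncryptDomainName;
    }
    if (domainId == kSystemKeyspaceEncryptDomainId) {
        return domainName == kSystemKeyspaceEncryptDomainName;
    }
    return true; // other domains are not constrained by this check
}

int main() {
    assert(domainNameMatchesId(kDefaultEncryptDomainId, kDefaultEncryptDomainName));
    assert(domainNameMatchesId(kSystemKeyspaceEncryptDomainId, ""));
    assert(!domainNameMatchesId(kDefaultEncryptDomainId, kSystemKeyspaceEncryptDomainName));
    return 0;
}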

View File

@ -53,7 +53,11 @@ struct KeyValueStoreCompressTestData final : IKeyValueStore {
void set(KeyValueRef keyValue, const Arena* arena = nullptr) override {
store->set(KeyValueRef(keyValue.key, pack(keyValue.value)), arena);
}
void clear(KeyRangeRef range, const Arena* arena = nullptr) override { store->clear(range, arena); }
void clear(KeyRangeRef range,
const StorageServerMetrics* storageMetrics = nullptr,
const Arena* arena = nullptr) override {
store->clear(range, storageMetrics, arena);
}
Future<Void> commit(bool sequential = false) override { return store->commit(sequential); }
Future<Optional<Value>> readValue(KeyRef key, Optional<ReadOptions> options) override {

View File

@ -130,7 +130,7 @@ public:
}
}
void clear(KeyRangeRef range, const Arena* arena) override {
void clear(KeyRangeRef range, const StorageServerMetrics* storageMetrics, const Arena* arena) override {
// A commit that occurs with no available space returns Never, so we can throw out all modifications
if (getAvailableSize() <= 0)
return;

View File

@ -1846,22 +1846,52 @@ struct RocksDBKeyValueStore : IKeyValueStore {
void set(KeyValueRef kv, const Arena*) override {
if (writeBatch == nullptr) {
writeBatch.reset(new rocksdb::WriteBatch());
keysSet.clear();
}
ASSERT(defaultFdbCF != nullptr);
writeBatch->Put(defaultFdbCF, toSlice(kv.key), toSlice(kv.value));
if (SERVER_KNOBS->ROCKSDB_SINGLEKEY_DELETES_ON_CLEARRANGE) {
keysSet.insert(kv.key);
}
}
void clear(KeyRangeRef keyRange, const Arena*) override {
void clear(KeyRangeRef keyRange, const StorageServerMetrics* storageMetrics, const Arena*) override {
if (writeBatch == nullptr) {
writeBatch.reset(new rocksdb::WriteBatch());
keysSet.clear();
}
ASSERT(defaultFdbCF != nullptr);
if (keyRange.singleKeyRange()) {
writeBatch->Delete(defaultFdbCF, toSlice(keyRange.begin));
} else {
writeBatch->DeleteRange(defaultFdbCF, toSlice(keyRange.begin), toSlice(keyRange.end));
if (SERVER_KNOBS->ROCKSDB_SINGLEKEY_DELETES_ON_CLEARRANGE && storageMetrics != nullptr &&
storageMetrics->byteSample.getEstimate(keyRange) <
SERVER_KNOBS->ROCKSDB_SINGLEKEY_DELETES_BYTES_LIMIT) {
rocksdb::ReadOptions options = sharedState->getReadOptions();
auto beginSlice = toSlice(keyRange.begin);
auto endSlice = toSlice(keyRange.end);
options.iterate_lower_bound = &beginSlice;
options.iterate_upper_bound = &endSlice;
auto cursor = std::unique_ptr<rocksdb::Iterator>(db->NewIterator(options, defaultFdbCF));
cursor->Seek(toSlice(keyRange.begin));
while (cursor->Valid() && toStringRef(cursor->key()) < keyRange.end) {
writeBatch->Delete(defaultFdbCF, cursor->key());
cursor->Next();
}
if (!cursor->status().ok()) {
// If the read-range iteration fails, fall back to a DeleteRange.
writeBatch->DeleteRange(defaultFdbCF, toSlice(keyRange.begin), toSlice(keyRange.end));
} else {
auto it = keysSet.lower_bound(keyRange.begin);
while (it != keysSet.end() && *it < keyRange.end) {
writeBatch->Delete(defaultFdbCF, toSlice(*it));
it++;
}
}
} else {
writeBatch->DeleteRange(defaultFdbCF, toSlice(keyRange.begin), toSlice(keyRange.end));
}
}
}
@ -1890,6 +1920,7 @@ struct RocksDBKeyValueStore : IKeyValueStore {
}
auto a = new Writer::CommitAction();
a->batchToCommit = std::move(writeBatch);
keysSet.clear();
auto res = a->done.getFuture();
writeThread->post(a);
return res;
@ -2083,6 +2114,7 @@ struct RocksDBKeyValueStore : IKeyValueStore {
Promise<Void> closePromise;
Future<Void> openFuture;
std::unique_ptr<rocksdb::WriteBatch> writeBatch;
std::set<Key> keysSet;
Optional<Future<Void>> metrics;
FlowLock readSemaphore;
int numReadWaiters;
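The clear() hunk above chooses between per-key deletes and a coarse DeleteRange: when the new knob is on and the byte sample estimates the range to be small, it iterates the stored range (plus any keys buffered in the current write batch) and issues single-key deletes, otherwise it issues a DeleteRange. Below is a standalone sketch of that decision, with std::map and a simple op vector standing in for RocksDB's iterator and WriteBatch; the function and type names are illustrative.

#include <cstdint>
#include <iostream>
#include <map>
#include <set>
#include <string>
#include <vector>

struct Op {
    bool rangeDelete;
    std::string begin, end; // end is unused for point deletes
};

void clearRange(const std::map<std::string, std::string>& store, // stands in for the RocksDB iterator
                std::set<std::string>& keysSetInBatch,           // keys written in the current batch
                std::vector<Op>& batch,
                const std::string& begin,
                const std::string& end,
                bool singleKeyDeletesKnob,
                int64_t estimatedBytes,
                int64_t bytesLimit) {
    if (singleKeyDeletesKnob && estimatedBytes < bytesLimit) {
        // Small range: issue point deletes for stored keys and for unflushed batch keys.
        for (auto it = store.lower_bound(begin); it != store.end() && it->first < end; ++it)
            batch.push_back({ false, it->first, "" });
        for (auto it = keysSetInBatch.lower_bound(begin); it != keysSetInBatch.end() && *it < end; ++it)
            batch.push_back({ false, *it, "" });
    } else {
        // Large range (or knob off): a single range delete.
        batch.push_back({ true, begin, end });
    }
}

int main() {
    std::map<std::string, std::string> store{ { "a", "1" }, { "b", "2" }, { "d", "3" } };
    std::set<std::string> pending{ "c" };
    std::vector<Op> batch;
    clearRange(store, pending, batch, "a", "d", /*knob*/ true, /*estimate*/ 100, /*limit*/ 1000);
    std::cout << "ops=" << batch.size() << std::endl; // 3 point deletes: a, b, c
    return 0;
}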

View File

@ -1596,7 +1596,9 @@ public:
StorageBytes getStorageBytes() const override;
void set(KeyValueRef keyValue, const Arena* arena = nullptr) override;
void clear(KeyRangeRef range, const Arena* arena = nullptr) override;
void clear(KeyRangeRef range,
const StorageServerMetrics* storageMetrics = nullptr,
const Arena* arena = nullptr) override;
Future<Void> commit(bool sequential = false) override;
Future<Optional<Value>> readValue(KeyRef key, Optional<ReadOptions> optionss) override;
@ -2215,7 +2217,7 @@ void KeyValueStoreSQLite::set(KeyValueRef keyValue, const Arena* arena) {
++writesRequested;
writeThread->post(new Writer::SetAction(keyValue));
}
void KeyValueStoreSQLite::clear(KeyRangeRef range, const Arena* arena) {
void KeyValueStoreSQLite::clear(KeyRangeRef range, const StorageServerMetrics* storageMetrics, const Arena* arena) {
++writesRequested;
writeThread->post(new Writer::ClearAction(range));
}

View File

@ -49,6 +49,7 @@ static_assert((ROCKSDB_MAJOR == 6 && ROCKSDB_MINOR == 27) ? ROCKSDB_PATCH >= 3 :
"Unsupported rocksdb version. Update the rocksdb to 6.27.3 version");
const std::string rocksDataFolderSuffix = "-data";
const std::string METADATA_SHARD_ID = "kvs-metadata";
const KeyRef shardMappingPrefix("\xff\xff/ShardMapping/"_sr);
// TODO: move constants to a header file.
const StringRef ROCKSDBSTORAGE_HISTOGRAM_GROUP = "RocksDBStorage"_sr;
@ -304,13 +305,12 @@ rocksdb::ReadOptions getReadOptions() {
}
struct ReadIterator {
rocksdb::ColumnFamilyHandle* cf;
uint64_t index; // incrementing counter to uniquely identify read iterator.
bool inUse;
std::shared_ptr<rocksdb::Iterator> iter;
double creationTime;
ReadIterator(rocksdb::ColumnFamilyHandle* cf, uint64_t index, rocksdb::DB* db, rocksdb::ReadOptions& options)
: cf(cf), index(index), inUse(true), creationTime(now()), iter(db->NewIterator(options, cf)) {}
: index(index), inUse(true), creationTime(now()), iter(db->NewIterator(options, cf)) {}
};
/*
@ -475,13 +475,26 @@ struct PhysicalShard {
}
~PhysicalShard() {
if (!deletePending)
return;
logShardEvent(id, ShardOp::CLOSE);
isInitialized.store(false);
readIterPool.reset();
// Destroy CF
auto s = db->DropColumnFamily(cf);
// Deleting the default column family is not allowed.
if (id == "default") {
return;
}
if (deletePending) {
auto s = db->DropColumnFamily(cf);
if (!s.ok()) {
logRocksDBError(s, "DestroyShard");
logShardEvent(id, ShardOp::DESTROY, SevError, s.ToString());
return;
}
}
auto s = db->DestroyColumnFamilyHandle(cf);
if (!s.ok()) {
logRocksDBError(s, "DestroyShard");
logRocksDBError(s, "DestroyCFHandle");
logShardEvent(id, ShardOp::DESTROY, SevError, s.ToString());
return;
}
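The destructor hunk above reorders the column-family cleanup: the "default" family is never dropped, DropColumnFamily runs only when deletion is pending, and the handle is destroyed afterwards unless the drop failed. A standalone sketch of that ordering follows, with a stub type in place of the RocksDB API; names are illustrative.

#include <iostream>
#include <string>

struct StubDB {
    bool dropColumnFamily(const std::string& id) {
        std::cout << "drop " << id << std::endl;
        return true;
    }
    void destroyHandle(const std::string& id) { std::cout << "destroy handle " << id << std::endl; }
};

void closeShard(StubDB& db, const std::string& id, bool deletePending) {
    if (id == "default")
        return; // deleting the default column family is not allowed
    if (deletePending && !db.dropColumnFamily(id))
        return; // on a failed drop, log and bail out without touching the handle
    db.destroyHandle(id); // always release the handle, even when no drop was requested
}

int main() {
    StubDB db;
    closeShard(db, "shard-1", /*deletePending*/ false); // destroy handle only
    closeShard(db, "shard-2", /*deletePending*/ true);  // drop, then destroy handle
    closeShard(db, "default", /*deletePending*/ true);  // no-op
    return 0;
}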
@ -628,7 +641,7 @@ public:
std::vector<rocksdb::ColumnFamilyDescriptor> descriptors;
bool foundMetadata = false;
for (const auto& name : columnFamilies) {
if (name == "kvs-metadata") {
if (name == METADATA_SHARD_ID) {
foundMetadata = true;
}
descriptors.push_back(rocksdb::ColumnFamilyDescriptor{ name, cfOptions });
@ -652,19 +665,19 @@ public:
TraceEvent(SevInfo, "ShardedRocksInitLoadPhysicalShards", this->logId)
.detail("PhysicalShardCount", handles.size());
std::shared_ptr<PhysicalShard> metadataShard = nullptr;
for (auto handle : handles) {
if (handle->GetName() == "kvs-metadata") {
metadataShard = std::make_shared<PhysicalShard>(db, "kvs-metadata", handle);
} else {
physicalShards[handle->GetName()] = std::make_shared<PhysicalShard>(db, handle->GetName(), handle);
auto shard = std::make_shared<PhysicalShard>(db, handle->GetName(), handle);
if (shard->id == METADATA_SHARD_ID) {
metadataShard = shard;
}
physicalShards[shard->id] = shard;
columnFamilyMap[handle->GetID()] = handle;
TraceEvent(SevVerbose, "ShardedRocksInitPhysicalShard", this->logId)
.detail("PhysicalShard", handle->GetName());
TraceEvent(SevVerbose, "ShardedRocksInitPhysicalShard", this->logId).detail("PhysicalShard", shard->id);
}
std::set<std::string> unusedShards(columnFamilies.begin(), columnFamilies.end());
unusedShards.erase("kvs-metadata");
unusedShards.erase(METADATA_SHARD_ID);
unusedShards.erase("default");
KeyRange keyRange = prefixRange(shardMappingPrefix);
@ -746,9 +759,11 @@ public:
defaultShard->dataShards[specialKeys.begin.toString()] = std::move(dataShard);
physicalShards[defaultShard->id] = defaultShard;
metadataShard = std::make_shared<PhysicalShard>(db, "kvs-metadata");
// Create metadata shard.
auto metadataShard = std::make_shared<PhysicalShard>(db, METADATA_SHARD_ID);
metadataShard->init();
columnFamilyMap[metadataShard->cf->GetID()] = metadataShard->cf;
physicalShards[METADATA_SHARD_ID] = metadataShard;
// Write special key range metadata.
writeBatch = std::make_unique<rocksdb::WriteBatch>();
@ -763,7 +778,6 @@ public:
TraceEvent(SevInfo, "ShardedRocksInitializeMetaDataShard", this->logId)
.detail("MetadataShardCF", metadataShard->cf->GetID());
}
physicalShards["kvs-metadata"] = metadataShard;
writeBatch = std::make_unique<rocksdb::WriteBatch>();
dirtyShards = std::make_unique<std::set<PhysicalShard*>>();
@ -910,6 +924,9 @@ public:
std::vector<std::shared_ptr<PhysicalShard>> getPendingDeletionShards(double cleanUpDelay) {
std::vector<std::shared_ptr<PhysicalShard>> emptyShards;
double currentTime = now();
TraceEvent(SevInfo, "ShardedRocksDB", logId)
.detail("PendingDeletionShardQueueSize", pendingDeletionShards.size());
while (!pendingDeletionShards.empty()) {
const auto& id = pendingDeletionShards.front();
auto it = physicalShards.find(id);
@ -976,6 +993,10 @@ public:
.detail("Info", "RangeToPersist")
.detail("BeginKey", range.begin)
.detail("EndKey", range.end);
auto it = physicalShards.find(METADATA_SHARD_ID);
ASSERT(it != physicalShards.end());
auto metadataShard = it->second;
writeBatch->DeleteRange(metadataShard->cf,
getShardMappingKey(range.begin, shardMappingPrefix),
getShardMappingKey(range.end, shardMappingPrefix));
@ -1043,24 +1064,30 @@ public:
}
void closeAllShards() {
for (auto& [_, shard] : physicalShards) {
shard->readIterPool.reset();
}
columnFamilyMap.clear();
physicalShards.clear();
// Close DB.
auto s = db->Close();
if (!s.ok()) {
logRocksDBError(s, "Close");
return;
}
TraceEvent("ShardedRocksDB", this->logId).detail("Info", "DBClosed");
}
void destroyAllShards() {
closeAllShards();
std::vector<rocksdb::ColumnFamilyDescriptor> cfs;
for (const auto& [key, _] : physicalShards) {
cfs.push_back(rocksdb::ColumnFamilyDescriptor{ key, getCFOptions() });
columnFamilyMap.clear();
for (auto& [_, shard] : physicalShards) {
shard->deletePending = true;
}
auto s = rocksdb::DestroyDB(path, getOptions(), cfs);
physicalShards.clear();
// Close DB.
auto s = db->Close();
if (!s.ok()) {
logRocksDBError(s, "Close");
return;
}
s = rocksdb::DestroyDB(path, getOptions());
if (!s.ok()) {
logRocksDBError(s, "DestroyDB");
}
@ -1121,7 +1148,6 @@ private:
std::unique_ptr<rocksdb::WriteBatch> writeBatch;
std::unique_ptr<std::set<PhysicalShard*>> dirtyShards;
KeyRangeMap<DataShard*> dataShardMap;
std::shared_ptr<PhysicalShard> metadataShard = nullptr;
std::deque<std::string> pendingDeletionShards;
};
@ -2240,6 +2266,7 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore {
// TODO: Adapt the simulation framework to not advance time quickly when background reads/writes are
// occurring.
if (g_network->isSimulated()) {
TraceEvent(SevDebug, "ShardedRocksDB").detail("Info", "Use Coro threads in simulation.");
writeThread = CoroThreadPool::createThreadPool();
readThreads = CoroThreadPool::createThreadPool();
} else {
@ -2316,7 +2343,7 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore {
void set(KeyValueRef kv, const Arena*) override { shardManager.put(kv.key, kv.value); }
void clear(KeyRangeRef range, const Arena*) override {
void clear(KeyRangeRef range, const StorageServerMetrics*, const Arena*) override {
if (range.singleKeyRange()) {
shardManager.clear(range.begin);
} else {

View File

@ -529,6 +529,7 @@ ACTOR Future<Void> fetchCheckpointFile(Database cx,
state int64_t offset = 0;
state Reference<IAsyncFile> asyncFile;
loop {
offset = 0;
try {
asyncFile = Reference<IAsyncFile>();
++attempt;
@ -559,7 +560,8 @@ ACTOR Future<Void> fetchCheckpointFile(Database cx,
offset += rep.data.size();
}
} catch (Error& e) {
if (e.code() != error_code_end_of_stream) {
if (e.code() != error_code_end_of_stream ||
(g_network->isSimulated() && attempt == 1 && deterministicRandom()->coinflip())) {
TraceEvent("FetchCheckpointFileError")
.errorUnsuppressed(e)
.detail("RemoteFile", remoteFile)

View File

@ -8208,7 +8208,9 @@ public:
Future<Void> getError() const override { return delayed(m_error.getFuture()); };
void clear(KeyRangeRef range, const Arena* arena = 0) override {
void clear(KeyRangeRef range,
const StorageServerMetrics* storageMetrics = nullptr,
const Arena* arena = 0) override {
debug_printf("CLEAR %s\n", printable(range).c_str());
m_tree->clear(range);
}

View File

@ -322,6 +322,9 @@ public:
// Log physicalShard
void logPhysicalShardCollection();
// Checks if a physical shard exists.
bool physicalShardExists(uint64_t physicalShardID);
private:
// Track physicalShard metrics by tracking keyRange metrics
void updatePhysicalShardMetricsByKeyRange(KeyRange keyRange,

View File

@ -29,6 +29,7 @@
#include "fdbserver/IClosable.h"
#include "fdbserver/IPageEncryptionKeyProvider.actor.h"
#include "fdbserver/ServerDBInfo.h"
#include "fdbserver/StorageMetrics.h"
struct CheckpointRequest {
const Version version; // The FDB version at which the checkpoint is created.
@ -52,7 +53,9 @@ public:
// persistRangeMapping().
virtual bool shardAware() const { return false; }
virtual void set(KeyValueRef keyValue, const Arena* arena = nullptr) = 0;
virtual void clear(KeyRangeRef range, const Arena* arena = nullptr) = 0;
virtual void clear(KeyRangeRef range,
const StorageServerMetrics* storageMetrics = nullptr,
const Arena* arena = nullptr) = 0;
virtual Future<Void> canCommit() { return Void(); }
virtual Future<Void> commit(
bool sequential = false) = 0; // returns when prior sets and clears are (atomically) durable
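For implementations that have no use for the byte sample, adopting the widened clear() signature above amounts to accepting the extra parameter and forwarding or ignoring it, as the SQLite and memory stores in this diff do. A minimal stand-in sketch follows; the types are simplified placeholders, not the FDB declarations.

#include <string>

struct KeyRange {
    std::string begin, end;
};
struct StorageMetricsStub {};
struct ArenaStub {};

struct IStoreStub {
    virtual void clear(const KeyRange& range,
                       const StorageMetricsStub* storageMetrics = nullptr,
                       const ArenaStub* arena = nullptr) = 0;
    virtual ~IStoreStub() = default;
};

struct PassThroughStore : IStoreStub {
    IStoreStub* inner = nullptr;
    void clear(const KeyRange& range,
               const StorageMetricsStub* storageMetrics = nullptr,
               const ArenaStub* arena = nullptr) override {
        inner->clear(range, storageMetrics, arena); // forward the metrics pointer unchanged
    }
};

int main() {
    return 0;
}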

View File

@ -390,7 +390,9 @@ struct RemoteIKeyValueStore : public IKeyValueStore {
void set(KeyValueRef keyValue, const Arena* arena = nullptr) override {
interf.set.send(IKVSSetRequest{ keyValue, ReplyPromise<Void>() });
}
void clear(KeyRangeRef range, const Arena* arena = nullptr) override {
void clear(KeyRangeRef range,
const StorageServerMetrics* storageMetrics = nullptr,
const Arena* arena = nullptr) override {
interf.clear.send(IKVSClearRequest{ range, ReplyPromise<Void>() });
}

View File

@ -2231,6 +2231,7 @@ ACTOR Future<Void> deleteCheckpointQ(StorageServer* self, Version version, Check
// Serves FetchCheckpointRequests.
ACTOR Future<Void> fetchCheckpointQ(StorageServer* self, FetchCheckpointRequest req) {
state ICheckpointReader* reader = nullptr;
state int64_t totalSize = 0;
TraceEvent("ServeFetchCheckpointBegin", self->thisServerID)
.detail("CheckpointID", req.checkpointID)
.detail("Token", req.token);
@ -2255,12 +2256,14 @@ ACTOR Future<Void> fetchCheckpointQ(StorageServer* self, FetchCheckpointRequest
FetchCheckpointReply reply(req.token);
reply.data = data;
req.reply.send(reply);
totalSize += data.size();
}
} catch (Error& e) {
if (e.code() == error_code_end_of_stream) {
req.reply.sendError(end_of_stream());
TraceEvent("ServeFetchCheckpointEnd", self->thisServerID)
.detail("CheckpointID", req.checkpointID)
.detail("TotalSize", totalSize)
.detail("Token", req.token);
} else {
TraceEvent(SevWarnAlways, "ServerFetchCheckpointFailure")
@ -9471,7 +9474,7 @@ void setAssignedStatus(StorageServer* self, KeyRangeRef keys, bool nowAssigned)
}
void StorageServerDisk::clearRange(KeyRangeRef keys) {
storage->clear(keys);
storage->clear(keys, &data->metrics);
++(*kvClearRanges);
}
@ -9485,7 +9488,7 @@ void StorageServerDisk::writeMutation(MutationRef mutation) {
storage->set(KeyValueRef(mutation.param1, mutation.param2));
*kvCommitLogicalBytes += mutation.expectedSize();
} else if (mutation.type == MutationRef::ClearRange) {
storage->clear(KeyRangeRef(mutation.param1, mutation.param2));
storage->clear(KeyRangeRef(mutation.param1, mutation.param2), &data->metrics);
++(*kvClearRanges);
} else
ASSERT(false);
@ -9500,7 +9503,7 @@ void StorageServerDisk::writeMutations(const VectorRef<MutationRef>& mutations,
storage->set(KeyValueRef(m.param1, m.param2));
*kvCommitLogicalBytes += m.expectedSize();
} else if (m.type == MutationRef::ClearRange) {
storage->clear(KeyRangeRef(m.param1, m.param2));
storage->clear(KeyRangeRef(m.param1, m.param2), &data->metrics);
++(*kvClearRanges);
}
}
@ -9929,7 +9932,7 @@ ACTOR Future<bool> restoreDurableState(StorageServer* data, IKeyValueStore* stor
++data->counters.kvSystemClearRanges;
// TODO(alexmiller): Figure out how to selectively enable spammy data distribution events.
// DEBUG_KEY_RANGE("clearInvalidVersion", invalidVersion, clearRange);
storage->clear(clearRange);
storage->clear(clearRange, &data->metrics);
++data->counters.kvSystemClearRanges;
data->byteSampleApplyClear(clearRange, invalidVersion);
}

View File

@ -75,9 +75,11 @@ struct SSCheckpointRestoreWorkload : TestWorkload {
state KeyRange testRange = KeyRangeRef(key, endKey);
state std::vector<CheckpointMetaData> records;
TraceEvent("TestCheckpointRestoreBegin");
int ignore = wait(setDDMode(cx, 0));
state Version version = wait(self->writeAndVerify(self, cx, key, oldValue));
TraceEvent("TestCreatingCheckpoint").detail("Range", testRange);
// Create checkpoint.
state Transaction tr(cx);
state CheckpointFormat format = deterministicRandom()->coinflip() ? RocksDBColumnFamily : RocksDB;

View File

@ -335,6 +335,7 @@ if(WITH_PYTHON)
add_fdb_test(TEST_FILES slow/DifferentClustersSameRV.toml)
add_fdb_test(TEST_FILES slow/DiskFailureCycle.toml)
add_fdb_test(TEST_FILES slow/FastTriggeredWatches.toml)
add_fdb_test(TEST_FILES slow/LongRunning.toml LONG_RUNNING)
add_fdb_test(TEST_FILES slow/LowLatencyWithFailures.toml)
add_fdb_test(TEST_FILES slow/MetaclusterManagement.toml)
add_fdb_test(TEST_FILES slow/MoveKeysClean.toml)

View File

@ -213,8 +213,12 @@ logdir = {logdir}
tls_config=self.tls_conf_string(),
authz_public_key_config=self.authz_public_key_conf_string(),
optional_tls=":tls" if self.tls_config is not None else "",
custom_config='\n'.join(["{} = {}".format(key, value) for key, value in self.custom_config.items()]),
use_future_protocol_version="use-future-protocol-version = true" if self.use_future_protocol_version else "",
custom_config="\n".join(
["{} = {}".format(key, value) for key, value in self.custom_config.items()]
),
use_future_protocol_version="use-future-protocol-version = true"
if self.use_future_protocol_version
else "",
)
)
# By default, the cluster only has one process
@ -534,3 +538,29 @@ logdir = {logdir}
self.save_config()
self.wait_for_server_update()
print("Old servers successfully removed from the cluster. Time: {}s".format(time.time() - start_time))
# Check the cluster log for errors
def check_cluster_logs(self, error_limit=100):
sev40s = subprocess.getoutput("grep -r 'Severity=\"40\"' {}".format(self.log.as_posix())).rstrip().splitlines()
err_cnt = 0
for line in sev40s:
# When running under ASan we expect to see this message. Boost coroutine should be using the
# correct ASan annotations, so it should not produce any false positives.
if line.endswith(
"WARNING: ASan doesn't fully support makecontext/swapcontext functions and may produce false "
"positives in some cases!"
):
continue
if err_cnt < error_limit:
print(line)
err_cnt += 1
if err_cnt > 0:
print(">>>>>>>>>>>>>>>>>>>> Found {} severity 40 events - the test fails".format(err_cnt))
else:
print("No errors found in logs")
return err_cnt == 0

View File

@ -145,8 +145,9 @@ class UpgradeTest:
self.cluster.fdbmonitor_binary = self.downloader.binary_path(version, "fdbmonitor")
self.cluster.fdbserver_binary = self.downloader.binary_path(version, "fdbserver")
self.cluster.fdbcli_binary = self.downloader.binary_path(version, "fdbcli")
self.cluster.set_env_var("LD_LIBRARY_PATH", "%s:%s" % (
self.downloader.lib_dir(version), os.getenv("LD_LIBRARY_PATH")))
self.cluster.set_env_var(
"LD_LIBRARY_PATH", "%s:%s" % (self.downloader.lib_dir(version), os.getenv("LD_LIBRARY_PATH"))
)
self.cluster.use_legacy_conf_syntax = version_before(version, "7.1.0")
self.cluster.use_future_protocol_version = version == FUTURE_VERSION
self.cluster.save_config()
@ -325,36 +326,6 @@ class UpgradeTest:
.splitlines()
)
# Check the cluster log for errors
def check_cluster_logs(self, error_limit=100):
sev40s = (
subprocess.getoutput("grep -r 'Severity=\"40\"' {}".format(self.cluster.log.as_posix()))
.rstrip()
.splitlines()
)
err_cnt = 0
for line in sev40s:
# When running ASAN we expect to see this message. Boost coroutine should be using the
# correct asan annotations so that it shouldn't produce any false positives.
if line.endswith(
"WARNING: ASan doesn't fully support makecontext/swapcontext functions and may produce false "
"positives in some cases!"
):
continue
if err_cnt < error_limit:
print(line)
err_cnt += 1
if err_cnt > 0:
print(
">>>>>>>>>>>>>>>>>>>> Found {} severity 40 events - the test fails",
err_cnt,
)
else:
print("No errors found in logs")
return err_cnt == 0
# Check the server and client logs for warnings and dump them
def dump_warnings_in_logs(self, limit=100):
sev30s = (
@ -454,7 +425,7 @@ if __name__ == "__main__":
print("data-dir: {}".format(test.data))
print("cluster-file: {}".format(test.etc.joinpath("fdb.cluster")))
errcode = test.exec_test(args)
if not test.check_cluster_logs():
if not test.cluster.check_cluster_logs():
errcode = 1 if errcode == 0 else errcode
test.dump_warnings_in_logs()
if errcode != 0 and not args.disable_log_dump:

View File

@ -0,0 +1,12 @@
[[test]]
testTitle = 'CycleTestWithKills'
[[test.workload]]
testName = 'Cycle'
transactionsPerSecond = 2500.0
testDuration = 10000.0
expectedRate = 0
[[test.workload]]
testName = 'Attrition'
testDuration = 10000.0