Merge branch 'main' into transaction-debug-logging

This commit is contained in:
A.J. Beamon 2023-03-01 10:09:17 -08:00
commit 544890a6cd
4 changed files with 114 additions and 57 deletions

View File

@ -254,38 +254,34 @@ namespace ThrottleApi {
// or using IClientAPI like IDatabase, ITransaction
ACTOR template <class Tr>
Future<bool> getValidAutoEnabled(Reference<Tr> tr) {
state bool result;
loop {
// hold the returned standalone object's memory
state typename Tr::template FutureT<Optional<Value>> valueF = tr->get(tagThrottleAutoEnabledKey);
Optional<Value> value = wait(safeThreadFutureToFuture(valueF));
if (!value.present()) {
tr->reset();
wait(delay(CLIENT_KNOBS->DEFAULT_BACKOFF));
continue;
} else if (value.get() == "1"_sr) {
result = true;
} else if (value.get() == "0"_sr) {
result = false;
} else {
TraceEvent(SevWarnAlways, "InvalidAutoTagThrottlingValue").detail("Value", value.get());
tr->reset();
wait(delay(CLIENT_KNOBS->DEFAULT_BACKOFF));
continue;
}
return result;
};
Future<Optional<bool>> getValidAutoEnabled(Reference<Tr> tr) {
// hold the returned standalone object's memory
state typename Tr::template FutureT<Optional<Value>> valueF = tr->get(tagThrottleAutoEnabledKey);
Optional<Value> value = wait(safeThreadFutureToFuture(valueF));
if (!value.present()) {
return {};
} else if (value.get() == "1"_sr) {
return true;
} else if (value.get() == "0"_sr) {
return false;
} else {
TraceEvent(SevWarnAlways, "InvalidAutoTagThrottlingValue").detail("Value", value.get());
return {};
}
}
ACTOR template <class DB>
Future<std::vector<TagThrottleInfo>> getRecommendedTags(Reference<DB> db, int limit) {
state Reference<typename DB::TransactionT> tr = db->createTransaction();
loop {
tr->setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
try {
bool enableAuto = wait(getValidAutoEnabled(tr));
if (enableAuto) {
tr->setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
Optional<bool> enableAuto = wait(getValidAutoEnabled(tr));
if (!enableAuto.present()) {
tr->reset();
wait(delay(CLIENT_KNOBS->DEFAULT_BACKOFF));
continue;
} else if (enableAuto.get()) {
return std::vector<TagThrottleInfo>();
}
state typename DB::TransactionT::template FutureT<RangeResult> f =
@ -307,15 +303,19 @@ ACTOR template <class DB>
Future<std::vector<TagThrottleInfo>>
getThrottledTags(Reference<DB> db, int limit, ContainsRecommended containsRecommended = ContainsRecommended::False) {
state Reference<typename DB::TransactionT> tr = db->createTransaction();
state bool reportAuto = containsRecommended;
state Optional<bool> reportAuto;
loop {
tr->setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
try {
if (!containsRecommended) {
wait(store(reportAuto, getValidAutoEnabled(tr)));
tr->setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
wait(store(reportAuto, getValidAutoEnabled(tr)));
if (!reportAuto.present()) {
tr->reset();
wait(delay(CLIENT_KNOBS->DEFAULT_BACKOFF));
continue;
}
state typename DB::TransactionT::template FutureT<RangeResult> f = tr->getRange(
reportAuto ? tagThrottleKeys : KeyRangeRef(tagThrottleKeysPrefix, tagThrottleAutoKeysPrefix), limit);
reportAuto.get() ? tagThrottleKeys : KeyRangeRef(tagThrottleKeysPrefix, tagThrottleAutoKeysPrefix),
limit);
RangeResult throttles = wait(safeThreadFutureToFuture(f));
std::vector<TagThrottleInfo> results;
for (auto throttle : throttles) {

View File

@ -3103,7 +3103,9 @@ ACTOR Future<Void> monitorBlobWorker(Reference<BlobManagerData> bmData, BlobWork
choose {
when(wait(waitFailure)) {
wait(delay(SERVER_KNOBS->BLOB_WORKER_REJOIN_TIME));
if (SERVER_KNOBS->BLOB_WORKER_DISK_ENABLED) {
wait(delay(SERVER_KNOBS->BLOB_WORKER_REJOIN_TIME));
}
if (BM_DEBUG) {
fmt::print("BM {0} detected BW {1} is dead\n", bmData->epoch, bwInterf.id().toString());
}

View File

@ -1000,7 +1000,7 @@ TEST_CASE("/flow/Arena/OptionalMap") {
}
TEST_CASE("/flow/Arena/Secure") {
#ifndef USE_SANITIZER
#if !defined(USE_SANITIZER) && !defined(VALGRIND)
// Note: Assumptions underlying this unit test are speculative.
// Disable for a build configuration or entirely if deemed flaky.
// As of writing, below equivalency of (buf == newBuf) holds except for ASAN builds.
@ -1053,6 +1053,6 @@ TEST_CASE("/flow/Arena/Secure") {
}
}
fmt::print("Total iterations: {}, # of times check passed: {}\n", totalIters, samePtrCount);
#endif // USE_SANITIZER
#endif // !defined(USE_SANITIZER) && !defined(VALGRIND)
return Void();
}

View File

@ -20,7 +20,7 @@ TENANT_API_VERSION = 720
CLUSTER_ACTIONS = ["wiggle"]
HEALTH_CHECK_TIMEOUT_SEC = 5
PROGRESS_CHECK_TIMEOUT_SEC = 30
PROGRESS_CHECK_TIMEOUT_SEC = 60
TESTER_STATS_INTERVAL_SEC = 5
TRANSACTION_RETRY_LIMIT = 100
RUN_WITH_GDB = False
@ -64,7 +64,9 @@ class UpgradeTest:
self.create_external_lib_dir()
self.testing_future_version = FUTURE_VERSION in self.upgrade_path
self.future_version_client_lib_path = (
self.downloader.lib_path(FUTURE_VERSION) if self.testing_future_version else None
self.downloader.lib_path(FUTURE_VERSION)
if self.testing_future_version
else None
)
init_version = self.upgrade_path[0]
self.cluster = LocalCluster(
@ -82,8 +84,12 @@ class UpgradeTest:
self.log = self.cluster.log
self.etc = self.cluster.etc
self.data = self.cluster.data
self.input_pipe_path = self.tmp_dir.joinpath("input.{}".format(random_alphanum_string(8)))
self.output_pipe_path = self.tmp_dir.joinpath("output.{}".format(random_alphanum_string(8)))
self.input_pipe_path = self.tmp_dir.joinpath(
"input.{}".format(random_alphanum_string(8))
)
self.output_pipe_path = self.tmp_dir.joinpath(
"output.{}".format(random_alphanum_string(8))
)
os.mkfifo(self.input_pipe_path)
os.mkfifo(self.output_pipe_path)
self.progress_event = Event()
@ -109,7 +115,9 @@ class UpgradeTest:
continue
src_file_path = self.downloader.lib_path(version)
assert src_file_path.exists(), "{} does not exist".format(src_file_path)
target_file_path = self.external_lib_dir.joinpath("libfdb_c.{}.so".format(version))
target_file_path = self.external_lib_dir.joinpath(
"libfdb_c.{}.so".format(version)
)
shutil.copyfile(src_file_path, target_file_path)
# Perform a health check of the cluster: Use fdbcli status command to check if the number of
@ -125,7 +133,11 @@ class UpgradeTest:
continue
num_proc = len(status["cluster"]["processes"])
if num_proc != self.cluster.process_number:
print("Health check: {} of {} processes found. Retrying".format(num_proc, self.cluster.process_number))
print(
"Health check: {} of {} processes found. Retrying".format(
num_proc, self.cluster.process_number
)
)
time.sleep(1)
continue
expected_version = self.cluster_version
@ -133,7 +145,9 @@ class UpgradeTest:
expected_version = CURRENT_VERSION
for (_, proc_stat) in status["cluster"]["processes"].items():
proc_ver = proc_stat["version"]
assert proc_ver == expected_version, "Process version: expected: {}, actual: {}".format(
assert (
proc_ver == expected_version
), "Process version: expected: {}, actual: {}".format(
expected_version, proc_ver
)
print("Health check: OK")
@ -142,11 +156,16 @@ class UpgradeTest:
# Create and save a cluster configuration for the given version
def configure_version(self, version):
self.cluster.fdbmonitor_binary = self.downloader.binary_path(version, "fdbmonitor")
self.cluster.fdbserver_binary = self.downloader.binary_path(version, "fdbserver")
self.cluster.fdbmonitor_binary = self.downloader.binary_path(
version, "fdbmonitor"
)
self.cluster.fdbserver_binary = self.downloader.binary_path(
version, "fdbserver"
)
self.cluster.fdbcli_binary = self.downloader.binary_path(version, "fdbcli")
self.cluster.set_env_var(
"LD_LIBRARY_PATH", "%s:%s" % (self.downloader.lib_dir(version), os.getenv("LD_LIBRARY_PATH"))
"LD_LIBRARY_PATH",
"%s:%s" % (self.downloader.lib_dir(version), os.getenv("LD_LIBRARY_PATH")),
)
self.cluster.use_legacy_conf_syntax = version_before(version, "7.1.0")
self.cluster.use_future_protocol_version = version == FUTURE_VERSION
@ -165,7 +184,9 @@ class UpgradeTest:
def __enter__(self):
print("Starting cluster version {}".format(self.cluster_version))
self.cluster.start_cluster()
self.cluster.create_database(enable_tenants=(self.api_version >= TENANT_API_VERSION))
self.cluster.create_database(
enable_tenants=(self.api_version >= TENANT_API_VERSION)
)
return self
def __exit__(self, xc_type, exc_value, traceback):
@ -213,12 +234,24 @@ class UpgradeTest:
if RUN_WITH_GDB:
cmd_args = ["gdb", "-ex", "run", "--args"] + cmd_args
if FUTURE_VERSION in self.upgrade_path:
cmd_args += ["--future-version-client-library", self.future_version_client_lib_path]
cmd_args += [
"--future-version-client-library",
self.future_version_client_lib_path,
]
if self.cluster.blob_granules_enabled:
cmd_args += ["--blob-granule-local-file-path", str(self.cluster.data.joinpath("fdbblob")) + "/"]
print("Executing test command: {}".format(" ".join([str(c) for c in cmd_args])))
cmd_args += [
"--blob-granule-local-file-path",
str(self.cluster.data.joinpath("fdbblob")) + "/",
]
print(
"Executing test command: {}".format(
" ".join([str(c) for c in cmd_args])
)
)
self.tester_proc = subprocess.Popen(cmd_args, stdout=sys.stdout, stderr=sys.stderr)
self.tester_proc = subprocess.Popen(
cmd_args, stdout=sys.stdout, stderr=sys.stderr
)
self.tester_retcode = self.tester_proc.wait()
self.tester_proc = None
@ -231,7 +264,9 @@ class UpgradeTest:
# If the tester failed to initialize, other threads of the test may stay
# blocked on trying to open the named pipes
if self.ctrl_pipe is None or self.output_pipe is None:
print("Tester failed before initializing named pipes. Aborting the test")
print(
"Tester failed before initializing named pipes. Aborting the test"
)
os._exit(1)
# Perform a progress check: Trigger it and wait until it is completed
@ -242,7 +277,9 @@ class UpgradeTest:
if self.progress_event.is_set():
print("Progress check: OK")
else:
assert False, "Progress check failed after upgrade to version {}".format(self.cluster_version)
assert False, "Progress check failed after upgrade to version {}".format(
self.cluster_version
)
# The main function of a thread for reading and processing
# the notifications received from the tester
@ -273,7 +310,9 @@ class UpgradeTest:
if entry == "wiggle":
self.cluster.cluster_wiggle()
else:
assert entry in self.used_versions, "Unexpected entry in the upgrade path: {}".format(entry)
assert (
entry in self.used_versions
), "Unexpected entry in the upgrade path: {}".format(entry)
self.upgrade_to(entry)
self.health_check()
self.progress_check()
@ -325,7 +364,11 @@ class UpgradeTest:
def grep_logs_for_events(self, severity):
return (
subprocess.getoutput("grep -r 'Severity=\"{}\"' {}".format(severity, self.cluster.log.as_posix()))
subprocess.getoutput(
"grep -r 'Severity=\"{}\"' {}".format(
severity, self.cluster.log.as_posix()
)
)
.rstrip()
.splitlines()
)
@ -333,7 +376,9 @@ class UpgradeTest:
# Check the server and client logs for warnings and dump them
def dump_warnings_in_logs(self, limit=100):
sev30s = (
subprocess.getoutput("grep -r 'Severity=\"30\"' {}".format(self.cluster.log.as_posix()))
subprocess.getoutput(
"grep -r 'Severity=\"30\"' {}".format(self.cluster.log.as_posix())
)
.rstrip()
.splitlines()
)
@ -341,7 +386,11 @@ class UpgradeTest:
if len(sev30s) == 0:
print("No warnings found in logs")
else:
print(">>>>>>>>>>>>>>>>>>>> Found {} severity 30 events (warnings):".format(len(sev30s)))
print(
">>>>>>>>>>>>>>>>>>>> Found {} severity 30 events (warnings):".format(
len(sev30s)
)
)
for line in sev30s[:limit]:
print(line)
@ -378,7 +427,9 @@ if __name__ == "__main__":
"--upgrade-path",
nargs="+",
help="Cluster upgrade path: a space separated list of versions.\n"
+ "The list may also contain cluster change actions: {}".format(CLUSTER_ACTIONS),
+ "The list may also contain cluster change actions: {}".format(
CLUSTER_ACTIONS
),
default=[CURRENT_VERSION],
)
parser.add_argument(
@ -409,8 +460,12 @@ if __name__ == "__main__":
help="In case of an error do not remove any of the generated files",
action="store_true",
)
parser.add_argument("--blob-granules-enabled", help="Enable blob granules", action="store_true")
parser.add_argument("--run-with-gdb", help="Execute the tester binary from gdb", action="store_true")
parser.add_argument(
"--blob-granules-enabled", help="Enable blob granules", action="store_true"
)
parser.add_argument(
"--run-with-gdb", help="Execute the tester binary from gdb", action="store_true"
)
args = parser.parse_args()
if args.process_number == 0:
args.process_number = random.randint(1, 5)