From 2e664b6cb1d67815b6fe76befb3502cdbe7221ae Mon Sep 17 00:00:00 2001 From: sfc-gh-tclinkenbeard Date: Wed, 2 Nov 2022 07:16:50 -0700 Subject: [PATCH 01/33] Increase suppression interval for GrvProxyTransactionTagThrottler_MultipleTags --- fdbserver/GrvProxyTransactionTagThrottler.actor.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fdbserver/GrvProxyTransactionTagThrottler.actor.cpp b/fdbserver/GrvProxyTransactionTagThrottler.actor.cpp index 45c0663cf0..d64be27754 100644 --- a/fdbserver/GrvProxyTransactionTagThrottler.actor.cpp +++ b/fdbserver/GrvProxyTransactionTagThrottler.actor.cpp @@ -103,7 +103,7 @@ void GrvProxyTransactionTagThrottler::addRequest(GetReadVersionRequest const& re // SERVER_KNOBS->ENFORCE_TAG_THROTTLING_ON_PROXIES is enabled, there may be // unexpected behaviour, because only one tag is used for throttling. TraceEvent(SevWarnAlways, "GrvProxyTransactionTagThrottler_MultipleTags") - .suppressFor(1.0) + .suppressFor(60.0) .detail("NumTags", req.tags.size()) .detail("UsingTag", printable(tag)); } From f99277f45cac6bc339f585bd344984b039a6871b Mon Sep 17 00:00:00 2001 From: Dan Adkins Date: Wed, 2 Nov 2022 09:56:26 -0700 Subject: [PATCH 02/33] DiskQueue: use explicitly sized int32_t instead of int in on-disk format. It was assumed that the payloadSize field in the page header was 32 bits. This was checked by a static_assert on the size of the PageHeader struct. This change makes that field size explicit, rather than assuming that sizeof(int) == sizeof(int32_t). --- fdbserver/DiskQueue.actor.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/fdbserver/DiskQueue.actor.cpp b/fdbserver/DiskQueue.actor.cpp index 34cf407726..a723c77780 100644 --- a/fdbserver/DiskQueue.actor.cpp +++ b/fdbserver/DiskQueue.actor.cpp @@ -1066,7 +1066,7 @@ private: }; uint64_t seq; // seq is the index of the virtually infinite disk queue file. Its unit is bytes. 
uint64_t popped; - int payloadSize; + int32_t payloadSize; }; // The on disk format depends on the size of PageHeader. static_assert(sizeof(PageHeader) == 36, "PageHeader must be 36 bytes"); @@ -1703,4 +1703,4 @@ TEST_CASE("performance/fdbserver/DiskQueue") { queue->dispose(); wait(queue->onClosed()); return Void(); -} \ No newline at end of file +} From 93142e2c29b1785192fad76eed531bd575c773e2 Mon Sep 17 00:00:00 2001 From: "A.J. Beamon" Date: Wed, 2 Nov 2022 14:47:24 -0700 Subject: [PATCH 03/33] In try commit, wait for the read version future before waiting for the commit cost estimate. This allows us to give precedence to the database_locked error over errors thrown by the commit cost estimation. --- fdbclient/NativeAPI.actor.cpp | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/fdbclient/NativeAPI.actor.cpp b/fdbclient/NativeAPI.actor.cpp index a6f444ea49..cb1b898fbb 100644 --- a/fdbclient/NativeAPI.actor.cpp +++ b/fdbclient/NativeAPI.actor.cpp @@ -6384,8 +6384,11 @@ ACTOR static Future tryCommit(Reference trState, } if (req.tagSet.present() && trState->options.priority < TransactionPriority::IMMEDIATE) { - wait(store(req.transaction.read_snapshot, readVersion) && - store(req.commitCostEstimation, estimateCommitCosts(trState, &req.transaction))); + state Future> commitCostFuture = + estimateCommitCosts(trState, &req.transaction); + // We need to wait for the read version first so that we can be notified if the database is locked + wait(store(req.transaction.read_snapshot, readVersion)); + wait(store(req.commitCostEstimation, commitCostFuture)); } else { wait(store(req.transaction.read_snapshot, readVersion)); } From f09e995ebb6261e0da5ea73004ff9f665d381e42 Mon Sep 17 00:00:00 2001 From: Hui Liu Date: Mon, 24 Oct 2022 17:11:51 -0700 Subject: [PATCH 04/33] blob migrator: advance version and report data copy progress --- fdbclient/ServerKnobs.cpp | 1 + fdbclient/include/fdbclient/ServerKnobs.h | 1 + fdbserver/BlobMigrator.actor.cpp | 75 
++++++++++++++++++++++- 3 files changed, 74 insertions(+), 3 deletions(-) diff --git a/fdbclient/ServerKnobs.cpp b/fdbclient/ServerKnobs.cpp index a88fa02bdd..0e2babfd92 100644 --- a/fdbclient/ServerKnobs.cpp +++ b/fdbclient/ServerKnobs.cpp @@ -1002,6 +1002,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi init( BLOB_MANIFEST_BACKUP, false ); init( BLOB_MANIFEST_BACKUP_INTERVAL, isSimulated ? 5.0 : 30.0 ); init( BLOB_FULL_RESTORE_MODE, false ); + init( BLOB_MIGRATOR_CHECK_INTERVAL, isSimulated ? 1.0 : 5.0); init( BGCC_TIMEOUT, isSimulated ? 10.0 : 120.0 ); init( BGCC_MIN_INTERVAL, isSimulated ? 1.0 : 10.0 ); diff --git a/fdbclient/include/fdbclient/ServerKnobs.h b/fdbclient/include/fdbclient/ServerKnobs.h index caa94cab53..1764a260be 100644 --- a/fdbclient/include/fdbclient/ServerKnobs.h +++ b/fdbclient/include/fdbclient/ServerKnobs.h @@ -982,6 +982,7 @@ public: bool BLOB_MANIFEST_BACKUP; double BLOB_MANIFEST_BACKUP_INTERVAL; bool BLOB_FULL_RESTORE_MODE; + double BLOB_MIGRATOR_CHECK_INTERVAL; // Blob metadata int64_t BLOB_METADATA_CACHE_TTL; diff --git a/fdbserver/BlobMigrator.actor.cpp b/fdbserver/BlobMigrator.actor.cpp index 9be19fa6a4..dd7d5f3ea3 100644 --- a/fdbserver/BlobMigrator.actor.cpp +++ b/fdbserver/BlobMigrator.actor.cpp @@ -18,8 +18,6 @@ * limitations under the License. 
*/ -#include "fdbserver/BlobMigratorInterface.h" -#include "fdbserver/Knobs.h" #include "flow/ActorCollection.h" #include "flow/FastRef.h" #include "flow/IRandom.h" @@ -35,6 +33,8 @@ #include "fdbserver/WaitFailure.h" #include "fdbserver/MoveKeys.actor.h" #include "fdbserver/BlobGranuleServerCommon.actor.h" +#include "fdbserver/BlobMigratorInterface.h" +#include "fdbserver/Knobs.h" #include "flow/actorcompiler.h" // has to be last include #include "flow/network.h" #include @@ -72,7 +72,7 @@ public: self->blobGranules_ = granules; wait(prepare(self, normalKeys)); - + wait(advanceVersion(self)); wait(serverLoop(self)); return Void(); } @@ -148,9 +148,78 @@ private: } } + // Print migration progress periodically + ACTOR static Future logProgress(Reference self) { + loop { + bool done = wait(checkProgress(self)); + if (done) + return Void(); + wait(delay(SERVER_KNOBS->BLOB_MIGRATOR_CHECK_INTERVAL)); + } + } + + // Check key ranges that are migrated. Return true if all ranges are done + ACTOR static Future checkProgress(Reference self) { + state Transaction tr(self->db_); + loop { + tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE); + tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); + tr.setOption(FDBTransactionOptions::LOCK_AWARE); + try { + // Get key ranges that are still owned by the migrator. 
Those ranges are + // incompleted migrations + state UID serverID = self->interf_.ssi.id(); + RangeResult ranges = wait(krmGetRanges(&tr, serverKeysPrefixFor(serverID), normalKeys)); + + // Count incompleted size + int64_t incompleted = 0; + for (auto i = 0; i < ranges.size() - 1; ++i) { + if (ranges[i].value == serverKeysTrue) { + KeyRangeRef range(ranges[i].key, ranges[i + 1].key); + int64_t bytes = sizeInBytes(self, range); + dprint(" incompleted {}, size: {}\n", range.toString(), bytes); + incompleted += bytes; + } + } + + // Calculated progress + int64_t total = sizeInBytes(self); + int progress = (total - incompleted) * 100 / total; + bool done = incompleted == 0; + dprint("Progress {} :{}%. done {}\n", serverID.toString(), progress, done); + return done; + } catch (Error& e) { + wait(tr.onError(e)); + } + } + } + + // Advance version, so that future commits will have a larger version than the restored data + ACTOR static Future advanceVersion(Reference self) { + state Transaction tr(self->db_); + loop { + tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE); + tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); + tr.setOption(FDBTransactionOptions::LOCK_AWARE); + try { + Version currentVersion = wait(tr.getRawReadVersion()); + Version expectedVersion = maxVersion(self); + if (currentVersion <= expectedVersion) { + tr.set(minRequiredCommitVersionKey, BinaryWriter::toValue(expectedVersion + 1, Unversioned())); + dprint("Advance version from {} to {}\n", currentVersion, expectedVersion); + wait(tr.commit()); + } + return Void(); + } catch (Error& e) { + wait(tr.onError(e)); + } + } + } + // Main server loop ACTOR static Future serverLoop(Reference self) { self->actors_.add(waitFailureServer(self->interf_.ssi.waitFailure.getFuture())); + self->actors_.add(logProgress(self)); self->actors_.add(handleRequest(self)); self->actors_.add(handleUnsupportedRequest(self)); loop { From 1fca1085062b914e4abf3233bccefb198d938c28 Mon Sep 17 00:00:00 2001 From: 
"A.J. Beamon" Date: Wed, 2 Nov 2022 16:50:41 -0700 Subject: [PATCH 05/33] The tenant management workload was not properly handling errors from GRV calls made in the delete tenant operation --- .../TenantManagementWorkload.actor.cpp | 24 ++++--------------- 1 file changed, 4 insertions(+), 20 deletions(-) diff --git a/fdbserver/workloads/TenantManagementWorkload.actor.cpp b/fdbserver/workloads/TenantManagementWorkload.actor.cpp index a0cdf8144f..94ee962247 100644 --- a/fdbserver/workloads/TenantManagementWorkload.actor.cpp +++ b/fdbserver/workloads/TenantManagementWorkload.actor.cpp @@ -665,23 +665,6 @@ struct TenantManagementWorkload : TestWorkload { return Void(); } - // Returns GRV and eats GRV errors - ACTOR static Future getReadVersion(Reference tr) { - loop { - try { - Version version = wait(tr->getReadVersion()); - return version; - } catch (Error& e) { - if (e.code() == error_code_grv_proxy_memory_limit_exceeded || - e.code() == error_code_batch_transaction_throttled) { - wait(tr->onError(e)); - } else { - throw; - } - } - } - } - ACTOR static Future deleteTenant(TenantManagementWorkload* self) { state TenantName beginTenant = self->chooseTenantName(true); state OperationType operationType = self->randomOperationType(); @@ -772,7 +755,8 @@ struct TenantManagementWorkload : TestWorkload { state bool retried = false; loop { try { - state Version beforeVersion = wait(self->getReadVersion(tr)); + state Version beforeVersion = + wait(getLatestReadVersion(self, OperationType::MANAGEMENT_DATABASE)); Optional result = wait(timeout(deleteTenantImpl(tr, beginTenant, endTenant, tenants, operationType, self), deterministicRandom()->randomInt(1, 30))); @@ -780,8 +764,8 @@ struct TenantManagementWorkload : TestWorkload { if (result.present()) { if (anyExists) { if (self->oldestDeletionVersion == 0 && !tenants.empty()) { - tr->reset(); - Version afterVersion = wait(self->getReadVersion(tr)); + Version afterVersion = + wait(self->getLatestReadVersion(self, 
OperationType::MANAGEMENT_DATABASE)); self->oldestDeletionVersion = afterVersion; } self->newestDeletionVersion = beforeVersion; From e20d0946f94623b7e5ec78d1cdf0c71ae6cae4dc Mon Sep 17 00:00:00 2001 From: Ankita Kejriwal Date: Wed, 2 Nov 2022 17:37:15 -0700 Subject: [PATCH 06/33] Enable DD_TENANT_AWARENESS_ENABLED randomly in testing to add test coverage. --- fdbclient/ServerKnobs.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fdbclient/ServerKnobs.cpp b/fdbclient/ServerKnobs.cpp index a88fa02bdd..1111067af1 100644 --- a/fdbclient/ServerKnobs.cpp +++ b/fdbclient/ServerKnobs.cpp @@ -295,7 +295,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi init( DD_STORAGE_WIGGLE_PAUSE_THRESHOLD, 10 ); if( randomize && BUGGIFY ) DD_STORAGE_WIGGLE_PAUSE_THRESHOLD = 1000; init( DD_STORAGE_WIGGLE_STUCK_THRESHOLD, 20 ); init( DD_STORAGE_WIGGLE_MIN_SS_AGE_SEC, isSimulated ? 2 : 21 * 60 * 60 * 24 ); if(randomize && BUGGIFY) DD_STORAGE_WIGGLE_MIN_SS_AGE_SEC = isSimulated ? 
0: 120; - init( DD_TENANT_AWARENESS_ENABLED, false ); + init( DD_TENANT_AWARENESS_ENABLED, false ); if(isSimulated) DD_TENANT_AWARENESS_ENABLED = deterministicRandom()->coinflip(); init( TENANT_CACHE_LIST_REFRESH_INTERVAL, 2 ); if( randomize && BUGGIFY ) TENANT_CACHE_LIST_REFRESH_INTERVAL = deterministicRandom()->randomInt(1, 10); init( TENANT_CACHE_STORAGE_USAGE_REFRESH_INTERVAL, 2 ); if( randomize && BUGGIFY ) TENANT_CACHE_STORAGE_USAGE_REFRESH_INTERVAL = deterministicRandom()->randomInt(1, 10); init( TENANT_CACHE_STORAGE_QUOTA_REFRESH_INTERVAL, 10 ); if( randomize && BUGGIFY ) TENANT_CACHE_STORAGE_QUOTA_REFRESH_INTERVAL = deterministicRandom()->randomInt(1, 10); From 18b852c4e4882ad965e35ca68fad822c1c6330bc Mon Sep 17 00:00:00 2001 From: Vaidas Gasiunas Date: Thu, 3 Nov 2022 13:20:21 +0100 Subject: [PATCH 07/33] Improving troubleshooting of stopping the FDB client thread (#8629) * Upgrade tests: dump thread call stacks of the tester process if it fails to terminate * ApiTester: log before and after stopping the network thread * Catch and print exceptions in closeTraceFile; Close trace file at the end of MVC runNetwork * Change trace event name for MVC runNetwork termination Co-authored-by: A.J. Beamon Co-authored-by: A.J. 
Beamon --- .../c/test/apitester/fdb_c_api_tester.cpp | 2 ++ fdbclient/MultiVersionTransaction.actor.cpp | 11 ++++++- fdbclient/ThreadSafeTransaction.cpp | 10 +++--- flow/Trace.cpp | 33 +++++++++++-------- tests/TestRunner/upgrade_test.py | 8 +++-- 5 files changed, 40 insertions(+), 24 deletions(-) diff --git a/bindings/c/test/apitester/fdb_c_api_tester.cpp b/bindings/c/test/apitester/fdb_c_api_tester.cpp index 83977fb29e..161338d1eb 100644 --- a/bindings/c/test/apitester/fdb_c_api_tester.cpp +++ b/bindings/c/test/apitester/fdb_c_api_tester.cpp @@ -459,8 +459,10 @@ int main(int argc, char** argv) { retCode = 1; } + fprintf(stderr, "Stopping FDB network thread\n"); fdb_check(fdb::network::stop(), "Failed to stop FDB thread"); network_thread.join(); + fprintf(stderr, "FDB network thread successfully stopped\n"); } catch (const std::exception& err) { fmt::print(stderr, "ERROR: {}\n", err.what()); retCode = 1; diff --git a/fdbclient/MultiVersionTransaction.actor.cpp b/fdbclient/MultiVersionTransaction.actor.cpp index 3e0c2e18db..bbf376c805 100644 --- a/fdbclient/MultiVersionTransaction.actor.cpp +++ b/fdbclient/MultiVersionTransaction.actor.cpp @@ -18,6 +18,7 @@ * limitations under the License. 
*/ +#include "flow/Trace.h" #ifdef ADDRESS_SANITIZER #include #endif @@ -2812,11 +2813,19 @@ void MultiVersionApi::runNetwork() { }); } - localClient->api->runNetwork(); + try { + localClient->api->runNetwork(); + } catch (const Error& e) { + closeTraceFile(); + throw e; + } for (auto h : handles) { waitThread(h); } + + TraceEvent("MultiVersionRunNetworkTerminating"); + closeTraceFile(); } void MultiVersionApi::stopNetwork() { diff --git a/fdbclient/ThreadSafeTransaction.cpp b/fdbclient/ThreadSafeTransaction.cpp index a79578a509..5821b30dee 100644 --- a/fdbclient/ThreadSafeTransaction.cpp +++ b/fdbclient/ThreadSafeTransaction.cpp @@ -743,10 +743,10 @@ void ThreadSafeApi::runNetwork() { Optional runErr; try { ::runNetwork(); - } catch (Error& e) { + } catch (const Error& e) { TraceEvent(SevError, "RunNetworkError").error(e); runErr = e; - } catch (std::exception& e) { + } catch (const std::exception& e) { runErr = unknown_error(); TraceEvent(SevError, "RunNetworkError").error(unknown_error()).detail("RootException", e.what()); } catch (...) { @@ -757,9 +757,9 @@ void ThreadSafeApi::runNetwork() { for (auto& hook : threadCompletionHooks) { try { hook.first(hook.second); - } catch (Error& e) { + } catch (const Error& e) { TraceEvent(SevError, "NetworkShutdownHookError").error(e); - } catch (std::exception& e) { + } catch (const std::exception& e) { TraceEvent(SevError, "NetworkShutdownHookError").error(unknown_error()).detail("RootException", e.what()); } catch (...) 
{ TraceEvent(SevError, "NetworkShutdownHookError").error(unknown_error()); @@ -767,12 +767,10 @@ void ThreadSafeApi::runNetwork() { } if (runErr.present()) { - closeTraceFile(); throw runErr.get(); } TraceEvent("RunNetworkTerminating"); - closeTraceFile(); } void ThreadSafeApi::stopNetwork() { diff --git a/flow/Trace.cpp b/flow/Trace.cpp index 4717247d16..41b23ad491 100644 --- a/flow/Trace.cpp +++ b/flow/Trace.cpp @@ -25,6 +25,7 @@ #include "flow/JsonTraceLogFormatter.h" #include "flow/flow.h" #include "flow/DeterministicRandom.h" +#include #include #include #include @@ -514,25 +515,29 @@ public: void close() { if (opened) { - MutexHolder hold(mutex); + try { + MutexHolder hold(mutex); - // Write remaining contents - auto a = new WriterThread::WriteBuffer(std::move(eventBuffer)); - loggedLength += bufferLength; - eventBuffer = std::vector(); - bufferLength = 0; - writer->post(a); + // Write remaining contents + auto a = new WriterThread::WriteBuffer(std::move(eventBuffer)); + loggedLength += bufferLength; + eventBuffer = std::vector(); + bufferLength = 0; + writer->post(a); - auto c = new WriterThread::Close(); - writer->post(c); + auto c = new WriterThread::Close(); + writer->post(c); - ThreadFuture f(new ThreadSingleAssignmentVar); - barriers->push(f); - writer->post(new WriterThread::Barrier); + ThreadFuture f(new ThreadSingleAssignmentVar); + barriers->push(f); + writer->post(new WriterThread::Barrier); - f.getBlocking(); + f.getBlocking(); - opened = false; + opened = false; + } catch (const std::exception& e) { + fprintf(stderr, "Error closing trace file: %s\n", e.what()); + } } } diff --git a/tests/TestRunner/upgrade_test.py b/tests/TestRunner/upgrade_test.py index 3948c56533..51700ba8c0 100755 --- a/tests/TestRunner/upgrade_test.py +++ b/tests/TestRunner/upgrade_test.py @@ -280,11 +280,13 @@ class UpgradeTest: os.close(self.ctrl_pipe) # Kill the tester process if it is still alive - def kill_tester_if_alive(self, workload_thread): + def 
kill_tester_if_alive(self, workload_thread, dump_stacks): if not workload_thread.is_alive(): return if self.tester_proc is not None: try: + if dump_stacks: + os.system("pstack {}".format(self.tester_proc.pid)) print("Killing the tester process") self.tester_proc.kill() workload_thread.join(5) @@ -310,11 +312,11 @@ class UpgradeTest: except Exception: print("Upgrade test failed") print(traceback.format_exc()) - self.kill_tester_if_alive(workload_thread) + self.kill_tester_if_alive(workload_thread, False) finally: workload_thread.join(5) reader_thread.join(5) - self.kill_tester_if_alive(workload_thread) + self.kill_tester_if_alive(workload_thread, True) if test_retcode == 0: test_retcode = self.tester_retcode return test_retcode From 304747d387e41125740e9fe6b83c7bf907c85237 Mon Sep 17 00:00:00 2001 From: "A.J. Beamon" Date: Wed, 2 Nov 2022 16:45:59 -0700 Subject: [PATCH 08/33] The simulator was changing the configuration of the cluster while the consistency check was expecting a quiescent database --- fdbrpc/include/fdbrpc/simulator.h | 1 + fdbserver/ConsistencyScan.actor.cpp | 1 + fdbserver/QuietDatabase.actor.cpp | 2 +- fdbserver/workloads/ConsistencyCheck.actor.cpp | 8 ++++++++ 4 files changed, 11 insertions(+), 1 deletion(-) diff --git a/fdbrpc/include/fdbrpc/simulator.h b/fdbrpc/include/fdbrpc/simulator.h index 0ddf8da238..6d038726d4 100644 --- a/fdbrpc/include/fdbrpc/simulator.h +++ b/fdbrpc/include/fdbrpc/simulator.h @@ -477,6 +477,7 @@ public: Optional> primaryDcId; Reference remoteTLogPolicy; int32_t usableRegions; + bool quiesced = false; std::string disablePrimary; std::string disableRemote; std::string originalRegions; diff --git a/fdbserver/ConsistencyScan.actor.cpp b/fdbserver/ConsistencyScan.actor.cpp index 0af7ff787b..cbe0f7123a 100644 --- a/fdbserver/ConsistencyScan.actor.cpp +++ b/fdbserver/ConsistencyScan.actor.cpp @@ -129,6 +129,7 @@ ACTOR Future getKeyServers( // one needs to be reachable if (performQuiescentChecks && !shards.present()) { 
TraceEvent("ConsistencyCheck_CommitProxyUnavailable") + .error(shards.getError()) .detail("CommitProxyID", commitProxyInfo->getId(i)); testFailure("Commit proxy unavailable", performQuiescentChecks, true); return false; diff --git a/fdbserver/QuietDatabase.actor.cpp b/fdbserver/QuietDatabase.actor.cpp index 878e5eb554..029080da4d 100644 --- a/fdbserver/QuietDatabase.actor.cpp +++ b/fdbserver/QuietDatabase.actor.cpp @@ -656,7 +656,7 @@ ACTOR Future getVersionOffset(Database cx, ACTOR Future repairDeadDatacenter(Database cx, Reference const> dbInfo, std::string context) { - if (g_network->isSimulated() && g_simulator->usableRegions > 1) { + if (g_network->isSimulated() && g_simulator->usableRegions > 1 && !g_simulator->quiesced) { bool primaryDead = g_simulator->datacenterDead(g_simulator->primaryDcId); bool remoteDead = g_simulator->datacenterDead(g_simulator->remoteDcId); diff --git a/fdbserver/workloads/ConsistencyCheck.actor.cpp b/fdbserver/workloads/ConsistencyCheck.actor.cpp index 5dbd9ef809..6410f49dd3 100644 --- a/fdbserver/workloads/ConsistencyCheck.actor.cpp +++ b/fdbserver/workloads/ConsistencyCheck.actor.cpp @@ -131,6 +131,10 @@ struct ConsistencyCheckWorkload : TestWorkload { try { wait(timeoutError(quietDatabase(cx, self->dbInfo, "ConsistencyCheckStart", 0, 1e5, 0, 0), self->quiescentWaitTimeout)); // FIXME: should be zero? 
+ if (g_network->isSimulated()) { + g_simulator->quiesced = true; + TraceEvent("ConsistencyCheckQuiesced").detail("Quiesced", g_simulator->quiesced); + } } catch (Error& e) { TraceEvent("ConsistencyCheck_QuietDatabaseError").error(e); self->testFailure("Unable to achieve a quiet database"); @@ -201,6 +205,10 @@ struct ConsistencyCheckWorkload : TestWorkload { when(wait(self->suspendConsistencyCheck.onChange())) {} } } + if (self->firstClient && g_network->isSimulated() && self->performQuiescentChecks) { + g_simulator->quiesced = false; + TraceEvent("ConsistencyCheckQuiescedEnd").detail("Quiesced", g_simulator->quiesced); + } return Void(); } From 1daa346cb4d2a9598dedf1fb0a01905a4ee5c749 Mon Sep 17 00:00:00 2001 From: neethuhaneesha Date: Thu, 3 Nov 2022 10:26:47 -0700 Subject: [PATCH 09/33] Adding boundaries to rocksdb read iterator pool. (#8584) --- fdbclient/ServerKnobs.cpp | 2 + fdbclient/include/fdbclient/ServerKnobs.h | 2 + fdbserver/KeyValueStoreRocksDB.actor.cpp | 78 +++++++++++++++++++---- 3 files changed, 69 insertions(+), 13 deletions(-) diff --git a/fdbclient/ServerKnobs.cpp b/fdbclient/ServerKnobs.cpp index 0e2babfd92..1ee635729a 100644 --- a/fdbclient/ServerKnobs.cpp +++ b/fdbclient/ServerKnobs.cpp @@ -407,6 +407,8 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi init( ROCKSDB_HISTOGRAMS_SAMPLE_RATE, 0.001 ); if( randomize && BUGGIFY ) ROCKSDB_HISTOGRAMS_SAMPLE_RATE = 0; init( ROCKSDB_READ_RANGE_ITERATOR_REFRESH_TIME, 30.0 ); if( randomize && BUGGIFY ) ROCKSDB_READ_RANGE_ITERATOR_REFRESH_TIME = 0.1; init( ROCKSDB_READ_RANGE_REUSE_ITERATORS, true ); if( randomize && BUGGIFY ) ROCKSDB_READ_RANGE_REUSE_ITERATORS = deterministicRandom()->coinflip() ? true : false; + init( ROCKSDB_READ_RANGE_REUSE_BOUNDED_ITERATORS, false ); if( randomize && BUGGIFY ) ROCKSDB_READ_RANGE_REUSE_BOUNDED_ITERATORS = deterministicRandom()->coinflip() ? 
true : false; + init( ROCKSDB_READ_RANGE_BOUNDED_ITERATORS_MAX_LIMIT, 200 ); // Set to 0 to disable rocksdb write rate limiting. Rate limiter unit: bytes per second. init( ROCKSDB_WRITE_RATE_LIMITER_BYTES_PER_SEC, 0 ); // If true, enables dynamic adjustment of ROCKSDB_WRITE_RATE_LIMITER_BYTES according to the recent demand of background IO. diff --git a/fdbclient/include/fdbclient/ServerKnobs.h b/fdbclient/include/fdbclient/ServerKnobs.h index 1764a260be..2a530febad 100644 --- a/fdbclient/include/fdbclient/ServerKnobs.h +++ b/fdbclient/include/fdbclient/ServerKnobs.h @@ -334,6 +334,8 @@ public: double ROCKSDB_HISTOGRAMS_SAMPLE_RATE; double ROCKSDB_READ_RANGE_ITERATOR_REFRESH_TIME; bool ROCKSDB_READ_RANGE_REUSE_ITERATORS; + bool ROCKSDB_READ_RANGE_REUSE_BOUNDED_ITERATORS; + int ROCKSDB_READ_RANGE_BOUNDED_ITERATORS_MAX_LIMIT; int64_t ROCKSDB_WRITE_RATE_LIMITER_BYTES_PER_SEC; bool ROCKSDB_WRITE_RATE_LIMITER_AUTO_TUNE; std::string DEFAULT_FDB_ROCKSDB_COLUMN_FAMILY; diff --git a/fdbserver/KeyValueStoreRocksDB.actor.cpp b/fdbserver/KeyValueStoreRocksDB.actor.cpp index 61dac56bd2..2b64d5cabd 100644 --- a/fdbserver/KeyValueStoreRocksDB.actor.cpp +++ b/fdbserver/KeyValueStoreRocksDB.actor.cpp @@ -397,13 +397,23 @@ struct Counters { }; struct ReadIterator { - CF& cf; uint64_t index; // incrementing counter to uniquely identify read iterator. 
bool inUse; std::shared_ptr iter; double creationTime; + KeyRange keyRange; + std::shared_ptr beginSlice, endSlice; ReadIterator(CF& cf, uint64_t index, DB& db, rocksdb::ReadOptions& options) - : cf(cf), index(index), inUse(true), creationTime(now()), iter(db->NewIterator(options, cf)) {} + : index(index), inUse(true), creationTime(now()), iter(db->NewIterator(options, cf)) {} + ReadIterator(CF& cf, uint64_t index, DB& db, rocksdb::ReadOptions options, KeyRange keyRange) + : index(index), inUse(true), creationTime(now()), keyRange(keyRange) { + beginSlice = std::shared_ptr(new rocksdb::Slice(toSlice(keyRange.begin))); + options.iterate_lower_bound = beginSlice.get(); + endSlice = std::shared_ptr(new rocksdb::Slice(toSlice(keyRange.end))); + options.iterate_upper_bound = endSlice.get(); + + iter = std::shared_ptr(db->NewIterator(options, cf)); + } }; /* @@ -426,42 +436,84 @@ public: readRangeOptions.auto_prefix_mode = (SERVER_KNOBS->ROCKSDB_PREFIX_LEN > 0); TraceEvent("ReadIteratorPool", id) .detail("KnobRocksDBReadRangeReuseIterators", SERVER_KNOBS->ROCKSDB_READ_RANGE_REUSE_ITERATORS) + .detail("KnobRocksDBReadRangeReuseBoundedIterators", + SERVER_KNOBS->ROCKSDB_READ_RANGE_REUSE_BOUNDED_ITERATORS) + .detail("KnobRocksDBReadRangeBoundedIteratorsMaxLimit", + SERVER_KNOBS->ROCKSDB_READ_RANGE_BOUNDED_ITERATORS_MAX_LIMIT) .detail("KnobRocksDBPrefixLen", SERVER_KNOBS->ROCKSDB_PREFIX_LEN); + if (SERVER_KNOBS->ROCKSDB_READ_RANGE_REUSE_ITERATORS && + SERVER_KNOBS->ROCKSDB_READ_RANGE_REUSE_BOUNDED_ITERATORS) { + TraceEvent(SevWarn, "ReadIteratorKnobsMismatch"); + } } // Called on every db commit. void update() { - if (SERVER_KNOBS->ROCKSDB_READ_RANGE_REUSE_ITERATORS) { + if (SERVER_KNOBS->ROCKSDB_READ_RANGE_REUSE_ITERATORS || + SERVER_KNOBS->ROCKSDB_READ_RANGE_REUSE_BOUNDED_ITERATORS) { std::lock_guard lock(mutex); iteratorsMap.clear(); } } // Called on every read operation. 
- ReadIterator getIterator() { + ReadIterator getIterator(KeyRange keyRange) { if (SERVER_KNOBS->ROCKSDB_READ_RANGE_REUSE_ITERATORS) { - std::lock_guard lock(mutex); + mutex.lock(); for (it = iteratorsMap.begin(); it != iteratorsMap.end(); it++) { if (!it->second.inUse) { it->second.inUse = true; iteratorsReuseCount++; - return it->second; + ReadIterator iter = it->second; + mutex.unlock(); + return iter; } } index++; - ReadIterator iter(cf, index, db, readRangeOptions); - iteratorsMap.insert({ index, iter }); + uint64_t readIteratorIndex = index; + mutex.unlock(); + + ReadIterator iter(cf, readIteratorIndex, db, readRangeOptions); + mutex.lock(); + iteratorsMap.insert({ readIteratorIndex, iter }); + mutex.unlock(); + return iter; + } else if (SERVER_KNOBS->ROCKSDB_READ_RANGE_REUSE_BOUNDED_ITERATORS) { + // TODO: Based on the datasize in the keyrange, decide whether to store the iterator for reuse. + mutex.lock(); + for (it = iteratorsMap.begin(); it != iteratorsMap.end(); it++) { + if (!it->second.inUse && it->second.keyRange.contains(keyRange)) { + it->second.inUse = true; + iteratorsReuseCount++; + ReadIterator iter = it->second; + mutex.unlock(); + return iter; + } + } + index++; + uint64_t readIteratorIndex = index; + mutex.unlock(); + + ReadIterator iter(cf, readIteratorIndex, db, readRangeOptions, keyRange); + if (iteratorsMap.size() < SERVER_KNOBS->ROCKSDB_READ_RANGE_BOUNDED_ITERATORS_MAX_LIMIT) { + // Not storing more than ROCKSDB_READ_RANGE_BOUNDED_ITERATORS_MAX_LIMIT of iterators + // to avoid 'out of memory' issues. + mutex.lock(); + iteratorsMap.insert({ readIteratorIndex, iter }); + mutex.unlock(); + } return iter; } else { index++; - ReadIterator iter(cf, index, db, readRangeOptions); + ReadIterator iter(cf, index, db, readRangeOptions, keyRange); return iter; } } // Called on every read operation, after the keys are collected. 
void returnIterator(ReadIterator& iter) { - if (SERVER_KNOBS->ROCKSDB_READ_RANGE_REUSE_ITERATORS) { + if (SERVER_KNOBS->ROCKSDB_READ_RANGE_REUSE_ITERATORS || + SERVER_KNOBS->ROCKSDB_READ_RANGE_REUSE_BOUNDED_ITERATORS) { std::lock_guard lock(mutex); it = iteratorsMap.find(iter.index); // iterator found: put the iterator back to the pool(inUse=false). @@ -768,7 +820,7 @@ uint64_t PerfContextMetrics::getRocksdbPerfcontextMetric(int metric) { } ACTOR Future refreshReadIteratorPool(std::shared_ptr readIterPool) { - if (SERVER_KNOBS->ROCKSDB_READ_RANGE_REUSE_ITERATORS) { + if (SERVER_KNOBS->ROCKSDB_READ_RANGE_REUSE_ITERATORS || SERVER_KNOBS->ROCKSDB_READ_RANGE_REUSE_BOUNDED_ITERATORS) { loop { wait(delay(SERVER_KNOBS->ROCKSDB_READ_RANGE_ITERATOR_REFRESH_TIME)); readIterPool->refreshIterators(); @@ -1559,7 +1611,7 @@ struct RocksDBKeyValueStore : IKeyValueStore { rocksdb::Status s; if (a.rowLimit >= 0) { double iterCreationBeginTime = a.getHistograms ? timer_monotonic() : 0; - ReadIterator readIter = readIterPool->getIterator(); + ReadIterator readIter = readIterPool->getIterator(a.keys); if (a.getHistograms) { metricPromiseStream->send(std::make_pair(ROCKSDB_READRANGE_NEWITERATOR_HISTOGRAM.toString(), timer_monotonic() - iterCreationBeginTime)); @@ -1588,7 +1640,7 @@ struct RocksDBKeyValueStore : IKeyValueStore { readIterPool->returnIterator(readIter); } else { double iterCreationBeginTime = a.getHistograms ? 
timer_monotonic() : 0; - ReadIterator readIter = readIterPool->getIterator(); + ReadIterator readIter = readIterPool->getIterator(a.keys); if (a.getHistograms) { metricPromiseStream->send(std::make_pair(ROCKSDB_READRANGE_NEWITERATOR_HISTOGRAM.toString(), timer_monotonic() - iterCreationBeginTime)); From ffe024244ee5bf624e5f91366236e01639a60fd3 Mon Sep 17 00:00:00 2001 From: Aaron Molitor Date: Tue, 1 Nov 2022 16:36:44 -0500 Subject: [PATCH 10/33] skip proxy when fetching kubectl --- packaging/docker/Dockerfile | 2 +- packaging/docker/Dockerfile.eks | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/packaging/docker/Dockerfile b/packaging/docker/Dockerfile index 1c19b1d9bd..9a0fc5b098 100644 --- a/packaging/docker/Dockerfile +++ b/packaging/docker/Dockerfile @@ -178,7 +178,7 @@ RUN yum -y install \ rm -rf /var/cache/yum WORKDIR /tmp -RUN curl -Ls https://s3.us-west-2.amazonaws.com/amazon-eks/1.22.6/2022-03-09/bin/linux/amd64/kubectl -o kubectl && \ +RUN NO_PROXY="" no_proxy="" curl -Ls https://s3.us-west-2.amazonaws.com/amazon-eks/1.22.6/2022-03-09/bin/linux/amd64/kubectl -o kubectl && \ echo "860c3d37a5979491895767e7332404d28dc0d7797c7673c33df30ca80e215a07 kubectl" > kubectl.txt && \ sha256sum --quiet -c kubectl.txt && \ mv kubectl /usr/local/bin/kubectl && \ diff --git a/packaging/docker/Dockerfile.eks b/packaging/docker/Dockerfile.eks index 9a3eacb84b..24e1a9467f 100644 --- a/packaging/docker/Dockerfile.eks +++ b/packaging/docker/Dockerfile.eks @@ -53,7 +53,7 @@ RUN curl -Ls https://github.com/krallin/tini/releases/download/v0.19.0/tini-amd6 mv tini /usr/bin/ && \ rm -rf /tmp/* -RUN curl -Ls https://s3.us-west-2.amazonaws.com/amazon-eks/1.22.6/2022-03-09/bin/linux/amd64/kubectl -o kubectl && \ +RUN NO_PROXY="" no_proxy="" curl -Ls https://s3.us-west-2.amazonaws.com/amazon-eks/1.22.6/2022-03-09/bin/linux/amd64/kubectl -o kubectl && \ echo "860c3d37a5979491895767e7332404d28dc0d7797c7673c33df30ca80e215a07 kubectl" > kubectl.txt && \ sha256sum 
--quiet -c kubectl.txt && \ mv kubectl /usr/local/bin/kubectl && \ From a7d123643d241f24fd62b87aa466d5abcadaa792 Mon Sep 17 00:00:00 2001 From: Ata E Husain Bohra Date: Thu, 3 Nov 2022 11:16:50 -0700 Subject: [PATCH 11/33] Extend Tlog persistentStorage to persist encryption state (#8344) * Extend Tlog persistentStorage to persist encryption state Description diff-3: Address review comment. diff-2: Extend ClusterController endpoints to allow query cluster's encryptionAtRest status Update Tlog recovery to ensure on-disk encryption status matches with cluster's cstate persisted encryptionAtRest diff-1: Store encryptionAtRestMode state in Coordinators Major changes proposed are: 1. Extend TLog persistentStorage to persist encryption state 2. Encryption state persisted is derived from corresponding db-config and relevant SERVER_KNOBS. In near future, knobs shall be removed. 3. On TLog startup, the persisted encryption state is compared against cluster configuration, if mismatch, the TLog is killed and not allowed to rejoin the cluster. 
Testing devRunCorrectness - 100K --- fdbclient/DatabaseConfiguration.cpp | 2 +- fdbclient/include/fdbclient/FDBTypes.h | 10 ++- fdbrpc/include/fdbrpc/fdbrpc.h | 1 + fdbserver/ClusterController.actor.cpp | 53 +++++++++----- fdbserver/ClusterRecovery.actor.cpp | 33 ++++++++- fdbserver/TLogServer.actor.cpp | 72 ++++++++++++++++++- .../fdbserver/ClusterController.actor.h | 6 ++ .../include/fdbserver/ClusterRecovery.actor.h | 2 +- fdbserver/include/fdbserver/DBCoreState.h | 3 +- .../include/fdbserver/WorkerInterface.actor.h | 34 ++++++++- 10 files changed, 189 insertions(+), 27 deletions(-) diff --git a/fdbclient/DatabaseConfiguration.cpp b/fdbclient/DatabaseConfiguration.cpp index 76fded095c..0ddc0e1c45 100644 --- a/fdbclient/DatabaseConfiguration.cpp +++ b/fdbclient/DatabaseConfiguration.cpp @@ -658,7 +658,7 @@ bool DatabaseConfiguration::setInternal(KeyRef key, ValueRef value) { parse((&type), value); blobGranulesEnabled = (type != 0); } else if (ck == "encryption_at_rest_mode"_sr) { - encryptionAtRestMode = EncryptionAtRestMode::fromValue(value); + encryptionAtRestMode = EncryptionAtRestMode::fromValueRef(Optional(value)); } else { return false; } diff --git a/fdbclient/include/fdbclient/FDBTypes.h b/fdbclient/include/fdbclient/FDBTypes.h index ddb6404bb8..23e84affee 100644 --- a/fdbclient/include/fdbclient/FDBTypes.h +++ b/fdbclient/include/fdbclient/FDBTypes.h @@ -1494,7 +1494,7 @@ struct EncryptionAtRestMode { bool operator==(const EncryptionAtRestMode& e) const { return isEquals(e); } bool operator!=(const EncryptionAtRestMode& e) const { return !isEquals(e); } - static EncryptionAtRestMode fromValue(Optional val) { + static EncryptionAtRestMode fromValueRef(Optional val) { if (!val.present()) { return DISABLED; } @@ -1508,6 +1508,14 @@ struct EncryptionAtRestMode { return static_cast(num); } + static EncryptionAtRestMode fromValue(Optional val) { + if (!val.present()) { + return EncryptionAtRestMode(); + } + + return 
EncryptionAtRestMode::fromValueRef(Optional(val.get().contents())); + } + uint32_t mode; }; diff --git a/fdbrpc/include/fdbrpc/fdbrpc.h b/fdbrpc/include/fdbrpc/fdbrpc.h index b0b4b39d4e..7eef5ee46c 100644 --- a/fdbrpc/include/fdbrpc/fdbrpc.h +++ b/fdbrpc/include/fdbrpc/fdbrpc.h @@ -734,6 +734,7 @@ public: // If cancelled, request was or will be delivered zero or more times. template Future getReply(const X& value) const { + // Ensure the same request isn't used multiple times ASSERT(!getReplyPromise(value).getFuture().isReady()); if (queue->isRemoteEndpoint()) { return sendCanceler(getReplyPromise(value), diff --git a/fdbserver/ClusterController.actor.cpp b/fdbserver/ClusterController.actor.cpp index 8da129af74..a8600d5a18 100644 --- a/fdbserver/ClusterController.actor.cpp +++ b/fdbserver/ClusterController.actor.cpp @@ -25,6 +25,7 @@ #include #include +#include "fdbclient/FDBTypes.h" #include "fdbclient/SystemData.h" #include "fdbclient/DatabaseContext.h" #include "fdbrpc/FailureMonitor.h" @@ -32,6 +33,7 @@ #include "fdbserver/BlobGranuleServerCommon.actor.h" #include "fdbserver/BlobMigratorInterface.h" #include "fdbserver/Knobs.h" +#include "fdbserver/WorkerInterface.actor.h" #include "flow/ActorCollection.h" #include "fdbclient/ClusterConnectionMemoryRecord.h" #include "fdbclient/NativeAPI.actor.h" @@ -66,6 +68,7 @@ #include "fdbrpc/ReplicationUtils.h" #include "fdbrpc/sim_validation.h" #include "fdbclient/KeyBackedTypes.h" +#include "flow/Error.h" #include "flow/Trace.h" #include "flow/Util.h" #include "flow/actorcompiler.h" // This must be the last #include. 
@@ -389,7 +392,7 @@ ACTOR Future clusterWatchDatabase(ClusterControllerData* cluster, wait(delay(0.0)); recoveryCore.cancel(); - wait(cleanupRecoveryActorCollection(recoveryData, true /* exThrown */)); + wait(cleanupRecoveryActorCollection(recoveryData, /*exThrown=*/true)); ASSERT(addActor.isEmpty()); CODE_PROBE(err.code() == error_code_tlog_failed, "Terminated due to tLog failure"); @@ -3025,6 +3028,18 @@ ACTOR Future updateClusterId(ClusterControllerData* self) { } } +ACTOR Future handleGetEncryptionAtRestMode(ClusterControllerData* self, ClusterControllerFullInterface ccInterf) { + loop { + state GetEncryptionAtRestModeRequest req = waitNext(ccInterf.getEncryptionAtRestMode.getFuture()); + TraceEvent("HandleGetEncryptionAtRestModeStart").detail("TlogId", req.tlogId); + EncryptionAtRestMode mode = wait(self->encryptionAtRestMode.getFuture()); + GetEncryptionAtRestModeResponse resp; + resp.mode = mode; + req.reply.send(resp); + TraceEvent("HandleGetEncryptionAtRestModeEnd").detail("TlogId", req.tlogId).detail("Mode", resp.mode); + } +} + ACTOR Future clusterControllerCore(ClusterControllerFullInterface interf, Future leaderFail, ServerCoordinators coordinators, @@ -3070,6 +3085,7 @@ ACTOR Future clusterControllerCore(ClusterControllerFullInterface interf, self.addActor.send(metaclusterMetricsUpdater(&self)); self.addActor.send(dbInfoUpdater(&self)); self.addActor.send(updateClusterId(&self)); + self.addActor.send(handleGetEncryptionAtRestMode(&self, interf)); self.addActor.send(self.clusterControllerMetrics.traceCounters("ClusterControllerMetrics", self.id, SERVER_KNOBS->STORAGE_LOGGING_DELAY, @@ -3090,8 +3106,8 @@ ACTOR Future clusterControllerCore(ClusterControllerFullInterface interf, endRole(Role::CLUSTER_CONTROLLER, interf.id(), "Stop Received Signal", true); } - // We shut down normally even if there was a serious error (so this fdbserver may be re-elected cluster - // controller) + // We shut down normally even if there was a serious error (so this 
fdbserver may be re-elected + // cluster controller) return Void(); } when(OpenDatabaseRequest req = waitNext(interf.clientInterface.openDatabase.getFuture())) { @@ -3243,11 +3259,11 @@ ACTOR Future clusterController(Reference connRec Reference>> clusterId) { // Defer this wait optimization of cluster configuration has 'Encryption data at-rest' enabled. - // Encryption depends on available of EncryptKeyProxy (EKP) FDB role to enable fetch/refresh of encryption keys - // created and managed by external KeyManagementService (KMS). + // Encryption depends on available of EncryptKeyProxy (EKP) FDB role to enable fetch/refresh of + // encryption keys created and managed by external KeyManagementService (KMS). // - // TODO: Wait optimization is to ensure the worker server on the same process gets registered with the new CC before - // recruitment. Unify the codepath for both Encryption enable vs disable scenarios. + // TODO: Wait optimization is to ensure the worker server on the same process gets registered with the + // new CC before recruitment. Unify the codepath for both Encryption enable vs disable scenarios. if (!SERVER_KNOBS->ENABLE_ENCRYPTION) { wait(recoveredDiskFiles); @@ -3278,8 +3294,8 @@ ACTOR Future clusterController(Reference connRec namespace { -// Tests `ClusterControllerData::updateWorkerHealth()` can update `ClusterControllerData::workerHealth` based on -// `UpdateWorkerHealth` request correctly. +// Tests `ClusterControllerData::updateWorkerHealth()` can update `ClusterControllerData::workerHealth` +// based on `UpdateWorkerHealth` request correctly. TEST_CASE("/fdbserver/clustercontroller/updateWorkerHealth") { // Create a testing ClusterControllerData. Most of the internal states do not matter in this test. 
state ClusterControllerData data(ClusterControllerFullInterface(), @@ -3292,8 +3308,8 @@ TEST_CASE("/fdbserver/clustercontroller/updateWorkerHealth") { state NetworkAddress badPeer2(IPAddress(0x03030303), 1); state NetworkAddress badPeer3(IPAddress(0x04040404), 1); - // Create a `UpdateWorkerHealthRequest` with two bad peers, and they should appear in the `workerAddress`'s - // degradedPeers. + // Create a `UpdateWorkerHealthRequest` with two bad peers, and they should appear in the + // `workerAddress`'s degradedPeers. { UpdateWorkerHealthRequest req; req.address = workerAddress; @@ -3354,8 +3370,8 @@ TEST_CASE("/fdbserver/clustercontroller/updateWorkerHealth") { previousRefreshTime = health.degradedPeers[badPeer3].lastRefreshTime; } - // Create a `UpdateWorkerHealthRequest` with empty `degradedPeers`, which should not remove the worker from - // `workerHealth`. + // Create a `UpdateWorkerHealthRequest` with empty `degradedPeers`, which should not remove the worker + // from `workerHealth`. { wait(delay(0.001)); UpdateWorkerHealthRequest req; @@ -3439,8 +3455,8 @@ TEST_CASE("/fdbserver/clustercontroller/getDegradationInfo") { NetworkAddress badPeer3(IPAddress(0x04040404), 1); NetworkAddress badPeer4(IPAddress(0x05050505), 1); - // Test that a reported degraded link should stay for sometime before being considered as a degraded link by - // cluster controller. + // Test that a reported degraded link should stay for sometime before being considered as a degraded + // link by cluster controller. { data.workerHealth[worker].degradedPeers[badPeer1] = { now(), now() }; data.workerHealth[worker].disconnectedPeers[badPeer2] = { now(), now() }; @@ -3472,7 +3488,8 @@ TEST_CASE("/fdbserver/clustercontroller/getDegradationInfo") { data.workerHealth.clear(); } - // Test that if both A complains B and B compalins A, only one of the server will be chosen as degraded server. 
+ // Test that if both A complains B and B compalins A, only one of the server will be chosen as degraded + // server. { data.workerHealth[worker].degradedPeers[badPeer1] = { now() - SERVER_KNOBS->CC_MIN_DEGRADATION_INTERVAL - 1, now() }; @@ -3553,8 +3570,8 @@ TEST_CASE("/fdbserver/clustercontroller/getDegradationInfo") { data.workerHealth.clear(); } - // Test that if the degradation is reported both ways between A and other 4 servers, no degraded server is - // returned. + // Test that if the degradation is reported both ways between A and other 4 servers, no degraded server + // is returned. { ASSERT(SERVER_KNOBS->CC_DEGRADED_PEER_DEGREE_TO_EXCLUDE < 4); data.workerHealth[worker].degradedPeers[badPeer1] = { now() - SERVER_KNOBS->CC_MIN_DEGRADATION_INTERVAL - 1, diff --git a/fdbserver/ClusterRecovery.actor.cpp b/fdbserver/ClusterRecovery.actor.cpp index 15702ab500..9b525cf54e 100644 --- a/fdbserver/ClusterRecovery.actor.cpp +++ b/fdbserver/ClusterRecovery.actor.cpp @@ -18,12 +18,14 @@ * limitations under the License. */ +#include "fdbclient/FDBTypes.h" #include "fdbclient/Metacluster.h" #include "fdbrpc/sim_validation.h" #include "fdbserver/ApplyMetadataMutation.h" #include "fdbserver/BackupProgress.actor.h" #include "fdbserver/ClusterRecovery.actor.h" #include "fdbserver/EncryptionOpsUtils.h" +#include "fdbserver/Knobs.h" #include "fdbserver/MasterInterface.h" #include "fdbserver/WaitFailure.h" @@ -429,18 +431,34 @@ ACTOR Future rejoinRequestHandler(Reference self) { } } +namespace { +EncryptionAtRestMode getEncryptionAtRest() { + // TODO: Use db-config encryption config to determine cluster encryption status + if (SERVER_KNOBS->ENABLE_ENCRYPTION) { + return EncryptionAtRestMode(EncryptionAtRestMode::Mode::AES_256_CTR); + } else { + return EncryptionAtRestMode(); + } +} +} // namespace + // Keeps the coordinated state (cstate) updated as the set of recruited tlogs change through recovery. 
ACTOR Future trackTlogRecovery(Reference self, Reference>> oldLogSystems, Future minRecoveryDuration) { state Future rejoinRequests = Never(); state DBRecoveryCount recoverCount = self->cstate.myDBState.recoveryCount + 1; + state EncryptionAtRestMode encryptionAtRestMode = getEncryptionAtRest(); state DatabaseConfiguration configuration = self->configuration; // self-configuration can be changed by configurationMonitor so we need a copy loop { state DBCoreState newState; self->logSystem->toCoreState(newState); newState.recoveryCount = recoverCount; + + // Update Coordinators EncryptionAtRest status during the very first recovery of the cluster (empty database) + newState.encryptionAtRestMode = encryptionAtRestMode; + state Future changed = self->logSystem->onCoreStateChanged(); ASSERT(newState.tLogs[0].tLogWriteAntiQuorum == configuration.tLogWriteAntiQuorum && @@ -454,6 +472,7 @@ ACTOR Future trackTlogRecovery(Reference self, .detail("FinalUpdate", finalUpdate) .detail("NewState.tlogs", newState.tLogs.size()) .detail("NewState.OldTLogs", newState.oldTLogData.size()) + .detail("NewState.EncryptionAtRestMode", newState.encryptionAtRestMode.toString()) .detail("Expected.tlogs", configuration.expectedLogSets(self->primaryDcId.size() ? self->primaryDcId[0] : Optional())); wait(self->cstate.write(newState, finalUpdate)); @@ -934,7 +953,7 @@ ACTOR Future>> recruitEverything( .detail("Status", RecoveryStatus::names[status]) .trackLatest(self->clusterRecoveryStateEventHolder->trackingKey); return Never(); - } else + } else { TraceEvent(getRecoveryEventName(ClusterRecoveryEventType::CLUSTER_RECOVERY_STATE_EVENT_NAME).c_str(), self->dbgid) .detail("StatusCode", RecoveryStatus::recruiting_transaction_servers) @@ -945,6 +964,12 @@ ACTOR Future>> recruitEverything( .detail("RequiredResolvers", 1) .trackLatest(self->clusterRecoveryStateEventHolder->trackingKey); + // The cluster's EncryptionAtRest status is now readable. 
+ if (self->controllerData->encryptionAtRestMode.canBeSet()) { + self->controllerData->encryptionAtRestMode.send(getEncryptionAtRest()); + } + } + // FIXME: we only need log routers for the same locality as the master int maxLogRouters = self->cstate.prevDBState.logRouterTags; for (auto& old : self->cstate.prevDBState.oldTLogData) { @@ -1443,6 +1468,12 @@ ACTOR Future clusterRecoveryCore(Reference self) { wait(self->cstate.read()); + // Unless the cluster database is 'empty', the cluster's EncryptionAtRest status is readable once cstate is + // recovered + if (!self->cstate.myDBState.tLogs.empty() && self->controllerData->encryptionAtRestMode.canBeSet()) { + self->controllerData->encryptionAtRestMode.send(self->cstate.myDBState.encryptionAtRestMode); + } + if (self->cstate.prevDBState.lowestCompatibleProtocolVersion > currentProtocolVersion()) { TraceEvent(SevWarnAlways, "IncompatibleProtocolVersion", self->dbgid).log(); throw internal_error(); diff --git a/fdbserver/TLogServer.actor.cpp b/fdbserver/TLogServer.actor.cpp index 5bd4433657..eb3c498e11 100644 --- a/fdbserver/TLogServer.actor.cpp +++ b/fdbserver/TLogServer.actor.cpp @@ -45,6 +45,8 @@ #include "fdbserver/FDBExecHelper.actor.h" #include "flow/Histogram.h" #include "flow/DebugTrace.h" +#include "flow/genericactors.actor.h" +#include "flow/network.h" #include "flow/actorcompiler.h" // This must be the last #include. 
struct TLogQueueEntryRef { @@ -216,6 +218,8 @@ static const KeyRange persistTagMessagesKeys = prefixRange("TagMsg/"_sr); static const KeyRange persistTagMessageRefsKeys = prefixRange("TagMsgRef/"_sr); static const KeyRange persistTagPoppedKeys = prefixRange("TagPop/"_sr); +static const KeyRef persistEncryptionAtRestModeKey = "encryptionAtRestMode"_sr; + static Key persistTagMessagesKey(UID id, Tag tag, Version version) { BinaryWriter wr(Unversioned()); wr.serializeBytes(persistTagMessagesKeys.begin); @@ -306,6 +310,8 @@ struct TLogData : NonCopyable { UID dbgid; UID workerID; + Optional encryptionAtRestMode; + IKeyValueStore* persistentData; // Durable data on disk that were spilled. IDiskQueue* rawPersistentQueue; // The physical queue the persistentQueue below stores its data. Ideally, log // interface should work without directly accessing rawPersistentQueue @@ -2391,6 +2397,33 @@ ACTOR Future initPersistentState(TLogData* self, Reference logDat return Void(); } +ACTOR Future getEncryptionAtRestMode(TLogData* self) { + loop { + state GetEncryptionAtRestModeRequest req(self->dbgid); + try { + choose { + when(wait(self->dbInfo->onChange())) {} + when(GetEncryptionAtRestModeResponse resp = wait(brokenPromiseToNever( + self->dbInfo->get().clusterInterface.getEncryptionAtRestMode.getReply(req)))) { + TraceEvent("GetEncryptionAtRestMode", self->dbgid).detail("Mode", resp.mode); + + // TODO: TLOG_ENCTYPTION KNOB shall be removed and db-config check should be sufficient to + // determine tlog (and cluster) encryption status + if ((EncryptionAtRestMode::Mode)resp.mode != EncryptionAtRestMode::Mode::DISABLED && + SERVER_KNOBS->ENABLE_TLOG_ENCRYPTION) { + return EncryptionAtRestMode((EncryptionAtRestMode::Mode)resp.mode); + } else { + return EncryptionAtRestMode(); + } + } + } + } catch (Error& e) { + TraceEvent("GetEncryptionAtRestError", self->dbgid).error(e); + throw; + } + } +} + // send stopped promise instead of LogData* to avoid reference cycles ACTOR Future 
rejoinClusterController(TLogData* self, TLogInterface tli, @@ -2579,6 +2612,32 @@ ACTOR Future tLogEnablePopReq(TLogEnablePopRequest enablePopReq, TLogData* return Void(); } +ACTOR Future checkUpdateEncryptionAtRestMode(TLogData* self) { + EncryptionAtRestMode encryptionAtRestMode = wait(getEncryptionAtRestMode(self)); + + if (self->encryptionAtRestMode.present()) { + // Ensure the TLog encryptionAtRestMode status matches with the cluster config, if not, kill the TLog process. + // Approach prevents a fake TLog process joining the cluster. + if (self->encryptionAtRestMode.get() != encryptionAtRestMode) { + TraceEvent("EncryptionAtRestMismatch", self->dbgid) + .detail("Expected", encryptionAtRestMode.toString()) + .detail("Present", self->encryptionAtRestMode.get().toString()); + ASSERT(false); + } + } else { + self->encryptionAtRestMode = Optional(encryptionAtRestMode); + wait(self->persistentDataCommitLock.take()); + state FlowLock::Releaser commitLockReleaser(self->persistentDataCommitLock); + self->persistentData->set( + KeyValueRef(persistEncryptionAtRestModeKey, self->encryptionAtRestMode.get().toValue())); + wait(self->persistentData->commit()); + TraceEvent("PersistEncryptionAtRestMode", self->dbgid) + .detail("Mode", self->encryptionAtRestMode.get().toString()); + } + + return Void(); +} + ACTOR Future serveTLogInterface(TLogData* self, TLogInterface tli, Reference logData, @@ -2966,6 +3025,7 @@ ACTOR Future restorePersistentState(TLogData* self, state IKeyValueStore* storage = self->persistentData; state Future> fFormat = storage->readValue(persistFormat.key); state Future> fRecoveryLocation = storage->readValue(persistRecoveryLocationKey); + state Future> fEncryptionAtRestMode = storage->readValue(persistEncryptionAtRestModeKey); state Future fVers = storage->readRange(persistCurrentVersionKeys); state Future fKnownCommitted = storage->readRange(persistKnownCommittedVersionKeys); state Future fLocality = storage->readRange(persistLocalityKeys); @@ -2977,7 
+3037,7 @@ ACTOR Future restorePersistentState(TLogData* self, // FIXME: metadata in queue? - wait(waitForAll(std::vector{ fFormat, fRecoveryLocation })); + wait(waitForAll(std::vector{ fFormat, fRecoveryLocation, fEncryptionAtRestMode })); wait(waitForAll(std::vector{ fVers, fKnownCommitted, fLocality, @@ -2987,6 +3047,12 @@ ACTOR Future restorePersistentState(TLogData* self, fProtocolVersions, fTLogSpillTypes })); + if (fEncryptionAtRestMode.get().present()) { + self->encryptionAtRestMode = + Optional(EncryptionAtRestMode::fromValue(fEncryptionAtRestMode.get())); + TraceEvent("PersistEncryptionAtRestModeRead").detail("Mode", self->encryptionAtRestMode.get().toString()); + } + if (fFormat.get().present() && !persistFormatReadableRange.contains(fFormat.get().get())) { // FIXME: remove when we no longer need to test upgrades from 4.X releases if (g_network->isSimulated()) { @@ -3537,11 +3603,13 @@ ACTOR Future tLog(IKeyValueStore* persistentData, // Disk errors need a chance to kill this actor. wait(delay(0.000001)); - if (recovered.canBeSet()) + if (recovered.canBeSet()) { recovered.send(Void()); + } self.sharedActors.send(commitQueue(&self)); self.sharedActors.send(updateStorageLoop(&self)); + self.sharedActors.send(checkUpdateEncryptionAtRestMode(&self)); self.sharedActors.send(traceRole(Role::SHARED_TRANSACTION_LOG, tlogId)); state Future activeSharedChange = Void(); diff --git a/fdbserver/include/fdbserver/ClusterController.actor.h b/fdbserver/include/fdbserver/ClusterController.actor.h index f390b4c1ce..b3f9aa27d7 100644 --- a/fdbserver/include/fdbserver/ClusterController.actor.h +++ b/fdbserver/include/fdbserver/ClusterController.actor.h @@ -3400,6 +3400,12 @@ public: excludedDegradedServers; // The degraded servers to be excluded when assigning workers to roles. std::queue recentHealthTriggeredRecoveryTime; + // Capture cluster's Encryption data at-rest mode; the status is set 'only' at the time of cluster creation. 
+ // The promise gets set as part of cluster recovery process and is used by recovering encryption participant + // stateful processes (such as TLog) to ensure the stateful process on-disk encryption status matches with cluster's + // encryption status. + Promise encryptionAtRestMode; + CounterCollection clusterControllerMetrics; Counter openDatabaseRequests; diff --git a/fdbserver/include/fdbserver/ClusterRecovery.actor.h b/fdbserver/include/fdbserver/ClusterRecovery.actor.h index eb2c4bf464..f8c5502abe 100644 --- a/fdbserver/include/fdbserver/ClusterRecovery.actor.h +++ b/fdbserver/include/fdbserver/ClusterRecovery.actor.h @@ -132,7 +132,7 @@ private: try { wait(self->cstate.setExclusive( - BinaryWriter::toValue(newState, IncludeVersion(ProtocolVersion::withDBCoreState())))); + BinaryWriter::toValue(newState, IncludeVersion(ProtocolVersion::withEncryptionAtRest())))); } catch (Error& e) { CODE_PROBE(true, "Master displaced during writeMasterState"); throw; diff --git a/fdbserver/include/fdbserver/DBCoreState.h b/fdbserver/include/fdbserver/DBCoreState.h index 352b9163ac..3d8e14d2b8 100644 --- a/fdbserver/include/fdbserver/DBCoreState.h +++ b/fdbserver/include/fdbserver/DBCoreState.h @@ -28,6 +28,7 @@ #include "fdbrpc/ReplicationPolicy.h" #include "fdbserver/LogSystemConfig.h" #include "fdbserver/MasterInterface.h" +#include "flow/ObjectSerializerTraits.h" class LogSet; struct OldLogData; @@ -143,7 +144,7 @@ struct DBCoreState { std::set pseudoLocalities; ProtocolVersion newestProtocolVersion; ProtocolVersion lowestCompatibleProtocolVersion; - EncryptionAtRestMode encryptionAtRestMode; + EncryptionAtRestMode encryptionAtRestMode; // cluster encryption data at-rest mode DBCoreState() : logRouterTags(0), txsTags(0), recoveryCount(0), logSystemType(LogSystemType::empty), diff --git a/fdbserver/include/fdbserver/WorkerInterface.actor.h b/fdbserver/include/fdbserver/WorkerInterface.actor.h index d1e3854baf..2755968190 100644 --- 
a/fdbserver/include/fdbserver/WorkerInterface.actor.h +++ b/fdbserver/include/fdbserver/WorkerInterface.actor.h @@ -175,6 +175,7 @@ struct ClusterControllerFullInterface { tlogRejoin; // sent by tlog (whether or not rebooted) to communicate with a new controller RequestStream notifyBackupWorkerDone; RequestStream changeCoordinators; + RequestStream getEncryptionAtRestMode; UID id() const { return clientInterface.id(); } bool operator==(ClusterControllerFullInterface const& r) const { return id() == r.id(); } @@ -189,7 +190,7 @@ struct ClusterControllerFullInterface { getWorkers.getFuture().isReady() || registerMaster.getFuture().isReady() || getServerDBInfo.getFuture().isReady() || updateWorkerHealth.getFuture().isReady() || tlogRejoin.getFuture().isReady() || notifyBackupWorkerDone.getFuture().isReady() || - changeCoordinators.getFuture().isReady(); + changeCoordinators.getFuture().isReady() || getEncryptionAtRestMode.getFuture().isReady(); } void initEndpoints() { @@ -206,6 +207,7 @@ struct ClusterControllerFullInterface { tlogRejoin.getEndpoint(TaskPriority::MasterTLogRejoin); notifyBackupWorkerDone.getEndpoint(TaskPriority::ClusterController); changeCoordinators.getEndpoint(TaskPriority::DefaultEndpoint); + getEncryptionAtRestMode.getEndpoint(TaskPriority::ClusterController); } template @@ -226,7 +228,8 @@ struct ClusterControllerFullInterface { updateWorkerHealth, tlogRejoin, notifyBackupWorkerDone, - changeCoordinators); + changeCoordinators, + getEncryptionAtRestMode); } }; @@ -572,6 +575,33 @@ struct BackupWorkerDoneRequest { } }; +struct GetEncryptionAtRestModeResponse { + constexpr static FileIdentifier file_identifier = 2932156; + uint32_t mode; + + GetEncryptionAtRestModeResponse() : mode(EncryptionAtRestMode::Mode::DISABLED) {} + GetEncryptionAtRestModeResponse(uint32_t m) : mode(m) {} + + template + void serialize(Ar& ar) { + serializer(ar, mode); + } +}; + +struct GetEncryptionAtRestModeRequest { + constexpr static FileIdentifier file_identifier = 
2670826; + UID tlogId; + ReplyPromise reply; + + GetEncryptionAtRestModeRequest() {} + GetEncryptionAtRestModeRequest(UID tId) : tlogId(tId) {} + + template + void serialize(Ar& ar) { + serializer(ar, tlogId, reply); + } +}; + struct InitializeTLogRequest { constexpr static FileIdentifier file_identifier = 15604392; UID recruitmentID; From b02509e356a65115b03ddcd361e728a2af849a1c Mon Sep 17 00:00:00 2001 From: Jingyu Zhou Date: Thu, 3 Nov 2022 12:10:41 -0700 Subject: [PATCH 12/33] Fix ASan failure heap use after free "incomplete_reasons" may already be destroyed. --- fdbserver/Status.actor.cpp | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/fdbserver/Status.actor.cpp b/fdbserver/Status.actor.cpp index 2ab6b15870..9747b5bd71 100644 --- a/fdbserver/Status.actor.cpp +++ b/fdbserver/Status.actor.cpp @@ -781,6 +781,8 @@ ACTOR static Future processStatusFetcher( // Map the address of the worker to the error message object tracefileOpenErrorMap[traceFileErrorsItr->first.toString()] = msgObj; } catch (Error& e) { + if (e.code() == error_code_actor_cancelled) + throw; incomplete_reasons->insert("file_open_error details could not be retrieved"); } } @@ -1095,6 +1097,8 @@ ACTOR static Future processStatusFetcher( } } catch (Error& e) { + if (e.code() == error_code_actor_cancelled) + throw; // Something strange occurred, process list is incomplete but what was built so far, if anything, will be // returned. 
incomplete_reasons->insert("Cannot retrieve all process status information."); @@ -1410,6 +1414,8 @@ ACTOR static Future latencyProbeFetcher(Database cx, wait(waitForAll(probes)); } catch (Error& e) { + if (e.code() == error_code_actor_cancelled) + throw; incomplete_reasons->insert(format("Unable to retrieve latency probe information (%s).", e.what())); } @@ -1449,6 +1455,8 @@ ACTOR static Future consistencyCheckStatusFetcher(Database cx, } } } catch (Error& e) { + if (e.code() == error_code_actor_cancelled) + throw; incomplete_reasons->insert(format("Unable to retrieve consistency check settings (%s).", e.what())); } return Void(); @@ -1540,6 +1548,8 @@ ACTOR static Future logRangeWarningFetcher(Database cx, } } } catch (Error& e) { + if (e.code() == error_code_actor_cancelled) + throw; incomplete_reasons->insert(format("Unable to retrieve log ranges (%s).", e.what())); } return Void(); @@ -1713,7 +1723,9 @@ static JsonBuilderObject configurationFetcher(Optional co } int count = coordinators.clientLeaderServers.size(); statusObj["coordinators_count"] = count; - } catch (Error&) { + } catch (Error& e) { + if (e.code() == error_code_actor_cancelled) + throw; incomplete_reasons->insert("Could not retrieve all configuration status information."); } return statusObj; @@ -2735,6 +2747,8 @@ ACTOR Future layerStatusFetcher(Database cx, } } catch (Error& e) { TraceEvent(SevWarn, "LayerStatusError").error(e); + if (e.code() == error_code_actor_cancelled) + throw; incomplete_reasons->insert(format("Unable to retrieve layer status (%s).", e.what())); json.create("_error") = format("Unable to retrieve layer status (%s).", e.what()); json.create("_valid") = false; From ecb5b5a9cacd880e002309a804b1f28760b5e685 Mon Sep 17 00:00:00 2001 From: Andrew Noyes Date: Thu, 3 Nov 2022 12:33:48 -0700 Subject: [PATCH 13/33] Improve keyAfter, singleKeyRange, and an arena usage (#8674) * Fix a potential memory error The returned KeyRef should live at least as long as the supplied arena * 
Improve keyAfter and singleKeyRange Avoid duplicating the keyAfter implementation, and share memory between begin and end for singleKeyRange returning a standalone * Avoid creating and destroying an arena in a loop This defeats some of the purposes of Arena's, namely to amortize the cost of calling malloc and free and to improve cache locality. * Improve Arena usage Avoid an arena allocation in keyAfter - instead return a string with static lifetime. I made sure to return the same memory as was just brought into cache to inspect whether key == \xff\xff. Also avoid creating and destroying an arena in a loop for encrypting idempotency id sets. --- fdbclient/include/fdbclient/FDBTypes.h | 31 ++++++++++++-------------- fdbserver/CommitProxyServer.actor.cpp | 6 ++--- 2 files changed, 17 insertions(+), 20 deletions(-) diff --git a/fdbclient/include/fdbclient/FDBTypes.h b/fdbclient/include/fdbclient/FDBTypes.h index 23e84affee..36a12ef14b 100644 --- a/fdbclient/include/fdbclient/FDBTypes.h +++ b/fdbclient/include/fdbclient/FDBTypes.h @@ -546,29 +546,21 @@ struct hash { enum { invalidVersion = -1, latestVersion = -2, MAX_VERSION = std::numeric_limits::max() }; -inline Key keyAfter(const KeyRef& key) { - if (key == "\xff\xff"_sr) - return key; - - Standalone r; - uint8_t* s = new (r.arena()) uint8_t[key.size() + 1]; - if (key.size() > 0) { - memcpy(s, key.begin(), key.size()); - } - s[key.size()] = 0; - ((StringRef&)r) = StringRef(s, key.size() + 1); - return r; -} inline KeyRef keyAfter(const KeyRef& key, Arena& arena) { - if (key == "\xff\xff"_sr) - return key; + // Don't include fdbclient/SystemData.h for the allKeys symbol to avoid a cyclic include + static const auto allKeysEnd = "\xff\xff"_sr; + if (key == allKeysEnd) { + return allKeysEnd; + } uint8_t* t = new (arena) uint8_t[key.size() + 1]; memcpy(t, key.begin(), key.size()); t[key.size()] = 0; return KeyRef(t, key.size() + 1); } -inline KeyRange singleKeyRange(const KeyRef& a) { - return KeyRangeRef(a, 
keyAfter(a)); +inline Key keyAfter(const KeyRef& key) { + Key result; + result.contents() = keyAfter(key, result.arena()); + return result; } inline KeyRangeRef singleKeyRange(KeyRef const& key, Arena& arena) { uint8_t* t = new (arena) uint8_t[key.size() + 1]; @@ -576,6 +568,11 @@ inline KeyRangeRef singleKeyRange(KeyRef const& key, Arena& arena) { t[key.size()] = 0; return KeyRangeRef(KeyRef(t, key.size()), KeyRef(t, key.size() + 1)); } +inline KeyRange singleKeyRange(const KeyRef& a) { + KeyRange result; + result.contents() = singleKeyRange(a, result.arena()); + return result; +} inline KeyRange prefixRange(KeyRef prefix) { Standalone range; KeyRef start = KeyRef(range.arena(), prefix); diff --git a/fdbserver/CommitProxyServer.actor.cpp b/fdbserver/CommitProxyServer.actor.cpp index b310c66ba1..a49b45bd73 100644 --- a/fdbserver/CommitProxyServer.actor.cpp +++ b/fdbserver/CommitProxyServer.actor.cpp @@ -1627,9 +1627,8 @@ ACTOR Future postResolution(CommitBatchContext* self) { CODE_PROBE(true, "encrypting idempotency mutation"); std::pair p = getEncryptDetailsFromMutationRef(self->pProxyCommitData, idempotencyIdSet); - Arena arena; MutationRef encryptedMutation = idempotencyIdSet.encrypt( - self->cipherKeys, p.second, arena, BlobCipherMetrics::TLOG); + self->cipherKeys, p.second, self->arena, BlobCipherMetrics::TLOG); self->toCommit.writeTypedMessage(encryptedMutation); } else { self->toCommit.writeTypedMessage(idempotencyIdSet); @@ -1640,7 +1639,8 @@ ACTOR Future postResolution(CommitBatchContext* self) { MutationRef& m = pProxyCommitData->idempotencyClears[i]; auto& tags = pProxyCommitData->tagsForKey(m.param1); self->toCommit.addTags(tags); - Arena arena; + // We already have an arena with an appropriate lifetime handy + Arena& arena = pProxyCommitData->idempotencyClears.arena(); wait(success(writeMutation(self, SYSTEM_KEYSPACE_ENCRYPT_DOMAIN_ID, &m, nullptr, &arena))); } pProxyCommitData->idempotencyClears = Standalone>(); From 
2a5ff5bba5d1472b5a100be7409c05d4132caf3f Mon Sep 17 00:00:00 2001 From: Jingyu Zhou Date: Thu, 3 Nov 2022 12:19:30 -0700 Subject: [PATCH 14/33] Update 7.1 release notes --- documentation/sphinx/source/release-notes/release-notes-710.rst | 1 + 1 file changed, 1 insertion(+) diff --git a/documentation/sphinx/source/release-notes/release-notes-710.rst b/documentation/sphinx/source/release-notes/release-notes-710.rst index 9611daf1cc..84f5941217 100644 --- a/documentation/sphinx/source/release-notes/release-notes-710.rst +++ b/documentation/sphinx/source/release-notes/release-notes-710.rst @@ -11,6 +11,7 @@ Release Notes * Released with AVX disabled. * Fixed a transaction log data corruption bug. `(PR #8525) `_, `(PR #8562) `_, and `(PR #8647) `_ * Fixed a rare data race in transaction logs when PEEK_BATCHING_EMPTY_MSG is enabled. `(PR #8660) `_ +* Fixed a heap-use-after-free bug in cluster controller. `(PR #8683) `_ * Changed consistency check to report all corruptions. `(PR #8571) `_ * Fixed a rare storage server crashing bug after recovery. `(PR #8468) `_ * Added client knob UNLINKONLOAD_FDBCLIB to control deletion of external client libraries. `(PR #8434) `_ From b02e3cbd0ed396eda01f699125f099a4f538fb40 Mon Sep 17 00:00:00 2001 From: "A.J. Beamon" Date: Thu, 3 Nov 2022 13:26:56 -0700 Subject: [PATCH 15/33] Initialize the contents of an unused value to make valgrind happy with serializing it. 
--- fdbserver/workloads/TransactionCost.actor.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fdbserver/workloads/TransactionCost.actor.cpp b/fdbserver/workloads/TransactionCost.actor.cpp index 93bebdf985..fa6b0ec436 100644 --- a/fdbserver/workloads/TransactionCost.actor.cpp +++ b/fdbserver/workloads/TransactionCost.actor.cpp @@ -34,7 +34,7 @@ class TransactionCostWorkload : public TestWorkload { return bw.toValue().withPrefix(prefix); } - static Value getValue(uint32_t size) { return makeString(size); } + static Value getValue(uint32_t size) { return ValueRef(std::string(size, '\x00')); } static UID getDebugID(uint64_t testNumber) { return UID(testNumber << 32, testNumber << 32); } From 928be563a28eeac5fe7d15f56f3d42e6f6d6ed6a Mon Sep 17 00:00:00 2001 From: Jingyu Zhou Date: Thu, 3 Nov 2022 13:59:00 -0700 Subject: [PATCH 16/33] Add braces --- fdbserver/Status.actor.cpp | 21 ++++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/fdbserver/Status.actor.cpp b/fdbserver/Status.actor.cpp index 9747b5bd71..fa0da0f974 100644 --- a/fdbserver/Status.actor.cpp +++ b/fdbserver/Status.actor.cpp @@ -781,8 +781,9 @@ ACTOR static Future processStatusFetcher( // Map the address of the worker to the error message object tracefileOpenErrorMap[traceFileErrorsItr->first.toString()] = msgObj; } catch (Error& e) { - if (e.code() == error_code_actor_cancelled) + if (e.code() == error_code_actor_cancelled) { throw; + } incomplete_reasons->insert("file_open_error details could not be retrieved"); } } @@ -1097,8 +1098,9 @@ ACTOR static Future processStatusFetcher( } } catch (Error& e) { - if (e.code() == error_code_actor_cancelled) + if (e.code() == error_code_actor_cancelled) { throw; + } // Something strange occurred, process list is incomplete but what was built so far, if anything, will be // returned. 
incomplete_reasons->insert("Cannot retrieve all process status information."); @@ -1414,8 +1416,9 @@ ACTOR static Future latencyProbeFetcher(Database cx, wait(waitForAll(probes)); } catch (Error& e) { - if (e.code() == error_code_actor_cancelled) + if (e.code() == error_code_actor_cancelled) { throw; + } incomplete_reasons->insert(format("Unable to retrieve latency probe information (%s).", e.what())); } @@ -1455,8 +1458,9 @@ ACTOR static Future consistencyCheckStatusFetcher(Database cx, } } } catch (Error& e) { - if (e.code() == error_code_actor_cancelled) + if (e.code() == error_code_actor_cancelled) { throw; + } incomplete_reasons->insert(format("Unable to retrieve consistency check settings (%s).", e.what())); } return Void(); @@ -1548,8 +1552,9 @@ ACTOR static Future logRangeWarningFetcher(Database cx, } } } catch (Error& e) { - if (e.code() == error_code_actor_cancelled) + if (e.code() == error_code_actor_cancelled) { throw; + } incomplete_reasons->insert(format("Unable to retrieve log ranges (%s).", e.what())); } return Void(); @@ -1724,8 +1729,9 @@ static JsonBuilderObject configurationFetcher(Optional co int count = coordinators.clientLeaderServers.size(); statusObj["coordinators_count"] = count; } catch (Error& e) { - if (e.code() == error_code_actor_cancelled) + if (e.code() == error_code_actor_cancelled) { throw; + } incomplete_reasons->insert("Could not retrieve all configuration status information."); } return statusObj; @@ -2747,8 +2753,9 @@ ACTOR Future layerStatusFetcher(Database cx, } } catch (Error& e) { TraceEvent(SevWarn, "LayerStatusError").error(e); - if (e.code() == error_code_actor_cancelled) + if (e.code() == error_code_actor_cancelled) { throw; + } incomplete_reasons->insert(format("Unable to retrieve layer status (%s).", e.what())); json.create("_error") = format("Unable to retrieve layer status (%s).", e.what()); json.create("_valid") = false; From 494dd1c5ed0e58e41ceff6e0515a1ca265c7ef42 Mon Sep 17 00:00:00 2001 From: Josh Slocum Date: 
Thu, 3 Nov 2022 16:42:36 -0500 Subject: [PATCH 17/33] switch bg file data chunks to be serialized with binary writer instead of object writer (#8535) --- fdbclient/BlobGranuleFiles.cpp | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/fdbclient/BlobGranuleFiles.cpp b/fdbclient/BlobGranuleFiles.cpp index db01084bb7..e0646ec0a9 100644 --- a/fdbclient/BlobGranuleFiles.cpp +++ b/fdbclient/BlobGranuleFiles.cpp @@ -650,12 +650,12 @@ struct IndexedBlobGranuleFile { IndexBlobGranuleFileChunkRef chunkRef = IndexBlobGranuleFileChunkRef::fromBytes(cipherKeysCtx, childData, childArena); - ChildType child; - ObjectReader dataReader(chunkRef.chunkBytes.get().begin(), IncludeVersion()); - dataReader.deserialize(FileIdentifierFor::value, child, childArena); - // TODO implement some sort of decrypted+decompressed+deserialized cache, if this object gets reused? - return Standalone(child, childArena); + + BinaryReader br(chunkRef.chunkBytes.get(), IncludeVersion()); + Standalone child; + br >> child; + return child; } template @@ -751,7 +751,7 @@ Value serializeChunkedSnapshot(const Standalone& fileNameRef, if (currentChunkBytesEstimate >= targetChunkBytes || i == snapshot.size() - 1) { Value serialized = - ObjectWriter::toValue(currentChunk, IncludeVersion(ProtocolVersion::withBlobGranuleFile())); + BinaryWriter::toValue(currentChunk, IncludeVersion(ProtocolVersion::withBlobGranuleFile())); Value chunkBytes = IndexBlobGranuleFileChunkRef::toBytes(cipherKeysCtx, compressFilter, serialized, file.arena()); chunks.push_back(chunkBytes); @@ -1020,7 +1020,7 @@ Value serializeChunkedDeltaFile(const Standalone& fileNameRef, if (currentChunkBytesEstimate >= chunkSize || i == boundaries.size() - 1) { Value serialized = - ObjectWriter::toValue(currentChunk, IncludeVersion(ProtocolVersion::withBlobGranuleFile())); + BinaryWriter::toValue(currentChunk, IncludeVersion(ProtocolVersion::withBlobGranuleFile())); Value chunkBytes = 
IndexBlobGranuleFileChunkRef::toBytes(cipherKeysCtx, compressFilter, serialized, file.arena()); chunks.push_back(chunkBytes); From d68cd3493eb3418fc6fb9a534a2cacfa2d6c59b6 Mon Sep 17 00:00:00 2001 From: Josh Slocum Date: Thu, 3 Nov 2022 16:42:46 -0500 Subject: [PATCH 18/33] making read caching configurable for blob store (#8607) --- documentation/sphinx/source/backups.rst | 2 ++ fdbclient/BackupContainerS3BlobStore.actor.cpp | 12 +++++++----- fdbclient/ClientKnobs.cpp | 1 + fdbclient/S3BlobStore.actor.cpp | 3 +++ fdbclient/include/fdbclient/ClientKnobs.h | 1 + fdbclient/include/fdbclient/S3BlobStore.h | 5 +++-- 6 files changed, 17 insertions(+), 7 deletions(-) diff --git a/documentation/sphinx/source/backups.rst b/documentation/sphinx/source/backups.rst index bb2275b622..158209ddf6 100644 --- a/documentation/sphinx/source/backups.rst +++ b/documentation/sphinx/source/backups.rst @@ -142,6 +142,8 @@ Here is a complete list of valid parameters: *multipart_min_part_size* (or *minps*) - Min part size for multipart uploads. + *enable_read_cache* (or *erc*) - Whether to enable read block cache. + *read_block_size* (or *rbs*) - Block size in bytes to be used for reads. *read_ahead_blocks* (or *rab*) - Number of blocks to read ahead of requested offset. 
diff --git a/fdbclient/BackupContainerS3BlobStore.actor.cpp b/fdbclient/BackupContainerS3BlobStore.actor.cpp index 413c8ea09b..8946717319 100644 --- a/fdbclient/BackupContainerS3BlobStore.actor.cpp +++ b/fdbclient/BackupContainerS3BlobStore.actor.cpp @@ -175,11 +175,13 @@ Future> BackupContainerS3BlobStore::readFile(const std::st if (usesEncryption()) { f = makeReference(f, AsyncFileEncrypted::Mode::READ_ONLY); } - f = makeReference(f, - m_bstore->knobs.read_block_size, - m_bstore->knobs.read_ahead_blocks, - m_bstore->knobs.concurrent_reads_per_file, - m_bstore->knobs.read_cache_blocks_per_file); + if (m_bstore->knobs.enable_read_cache) { + f = makeReference(f, + m_bstore->knobs.read_block_size, + m_bstore->knobs.read_ahead_blocks, + m_bstore->knobs.concurrent_reads_per_file, + m_bstore->knobs.read_cache_blocks_per_file); + } return f; } diff --git a/fdbclient/ClientKnobs.cpp b/fdbclient/ClientKnobs.cpp index b88a6b6a9f..37110a5503 100644 --- a/fdbclient/ClientKnobs.cpp +++ b/fdbclient/ClientKnobs.cpp @@ -220,6 +220,7 @@ void ClientKnobs::initialize(Randomize randomize) { init( BLOBSTORE_CONCURRENT_WRITES_PER_FILE, 5 ); init( BLOBSTORE_CONCURRENT_READS_PER_FILE, 3 ); + init( BLOBSTORE_ENABLE_READ_CACHE, true ); init( BLOBSTORE_READ_BLOCK_SIZE, 1024 * 1024 ); init( BLOBSTORE_READ_AHEAD_BLOCKS, 0 ); init( BLOBSTORE_READ_CACHE_BLOCKS_PER_FILE, 2 ); diff --git a/fdbclient/S3BlobStore.actor.cpp b/fdbclient/S3BlobStore.actor.cpp index b4f5e9a9b7..049c4a2987 100644 --- a/fdbclient/S3BlobStore.actor.cpp +++ b/fdbclient/S3BlobStore.actor.cpp @@ -88,6 +88,7 @@ S3BlobStoreEndpoint::BlobKnobs::BlobKnobs() { concurrent_lists = CLIENT_KNOBS->BLOBSTORE_CONCURRENT_LISTS; concurrent_reads_per_file = CLIENT_KNOBS->BLOBSTORE_CONCURRENT_READS_PER_FILE; concurrent_writes_per_file = CLIENT_KNOBS->BLOBSTORE_CONCURRENT_WRITES_PER_FILE; + enable_read_cache = CLIENT_KNOBS->BLOBSTORE_ENABLE_READ_CACHE; read_block_size = CLIENT_KNOBS->BLOBSTORE_READ_BLOCK_SIZE; read_ahead_blocks = 
CLIENT_KNOBS->BLOBSTORE_READ_AHEAD_BLOCKS; read_cache_blocks_per_file = CLIENT_KNOBS->BLOBSTORE_READ_CACHE_BLOCKS_PER_FILE; @@ -125,6 +126,7 @@ bool S3BlobStoreEndpoint::BlobKnobs::set(StringRef name, int value) { TRY_PARAM(concurrent_lists, cl); TRY_PARAM(concurrent_reads_per_file, crpf); TRY_PARAM(concurrent_writes_per_file, cwpf); + TRY_PARAM(enable_read_cache, erc); TRY_PARAM(read_block_size, rbs); TRY_PARAM(read_ahead_blocks, rab); TRY_PARAM(read_cache_blocks_per_file, rcb); @@ -162,6 +164,7 @@ std::string S3BlobStoreEndpoint::BlobKnobs::getURLParameters() const { _CHECK_PARAM(concurrent_lists, cl); _CHECK_PARAM(concurrent_reads_per_file, crpf); _CHECK_PARAM(concurrent_writes_per_file, cwpf); + _CHECK_PARAM(enable_read_cache, erc); _CHECK_PARAM(read_block_size, rbs); _CHECK_PARAM(read_ahead_blocks, rab); _CHECK_PARAM(read_cache_blocks_per_file, rcb); diff --git a/fdbclient/include/fdbclient/ClientKnobs.h b/fdbclient/include/fdbclient/ClientKnobs.h index 5c2b749bc8..89caecd327 100644 --- a/fdbclient/include/fdbclient/ClientKnobs.h +++ b/fdbclient/include/fdbclient/ClientKnobs.h @@ -235,6 +235,7 @@ public: int BLOBSTORE_CONCURRENT_LISTS; int BLOBSTORE_CONCURRENT_WRITES_PER_FILE; int BLOBSTORE_CONCURRENT_READS_PER_FILE; + int BLOBSTORE_ENABLE_READ_CACHE; int BLOBSTORE_READ_BLOCK_SIZE; int BLOBSTORE_READ_AHEAD_BLOCKS; int BLOBSTORE_READ_CACHE_BLOCKS_PER_FILE; diff --git a/fdbclient/include/fdbclient/S3BlobStore.h b/fdbclient/include/fdbclient/S3BlobStore.h index a4eba100c9..25076c1a69 100644 --- a/fdbclient/include/fdbclient/S3BlobStore.h +++ b/fdbclient/include/fdbclient/S3BlobStore.h @@ -58,8 +58,8 @@ public: requests_per_second, list_requests_per_second, write_requests_per_second, read_requests_per_second, delete_requests_per_second, multipart_max_part_size, multipart_min_part_size, concurrent_requests, concurrent_uploads, concurrent_lists, concurrent_reads_per_file, concurrent_writes_per_file, - read_block_size, read_ahead_blocks, read_cache_blocks_per_file, 
max_send_bytes_per_second, - max_recv_bytes_per_second, sdk_auth; + enable_read_cache, read_block_size, read_ahead_blocks, read_cache_blocks_per_file, + max_send_bytes_per_second, max_recv_bytes_per_second, sdk_auth; bool set(StringRef name, int value); std::string getURLParameters() const; static std::vector getKnobDescriptions() { @@ -86,6 +86,7 @@ public: "concurrent_lists (or cl) Max concurrent list operations that can be in progress at once.", "concurrent_reads_per_file (or crps) Max concurrent reads in progress for any one file.", "concurrent_writes_per_file (or cwps) Max concurrent uploads in progress for any one file.", + "enable_read_cache (or erc) Whether read block caching is enabled.", "read_block_size (or rbs) Block size in bytes to be used for reads.", "read_ahead_blocks (or rab) Number of blocks to read ahead of requested offset.", "read_cache_blocks_per_file (or rcb) Size of the read cache for a file in blocks.", From 647c6487b45b5ef0cfc277f439e361a68073a81e Mon Sep 17 00:00:00 2001 From: Josh Slocum Date: Thu, 3 Nov 2022 16:43:30 -0500 Subject: [PATCH 19/33] add client cache size blob metadata latency metrics (#8484) --- fdbclient/BlobCipher.cpp | 4 ++++ fdbclient/include/fdbclient/BlobCipher.h | 1 + fdbserver/BlobGranuleServerCommon.actor.cpp | 4 +++- 3 files changed, 8 insertions(+), 1 deletion(-) diff --git a/fdbclient/BlobCipher.cpp b/fdbclient/BlobCipher.cpp index 9dc2c19798..24bbf3ceb6 100644 --- a/fdbclient/BlobCipher.cpp +++ b/fdbclient/BlobCipher.cpp @@ -76,6 +76,10 @@ BlobCipherMetrics::BlobCipherMetrics() UID(), FLOW_KNOBS->ENCRYPT_KEY_CACHE_LOGGING_INTERVAL, FLOW_KNOBS->ENCRYPT_KEY_CACHE_LOGGING_SAMPLE_SIZE), + getBlobMetadataLatency("GetBlobMetadataLatency", + UID(), + FLOW_KNOBS->ENCRYPT_KEY_CACHE_LOGGING_INTERVAL, + FLOW_KNOBS->ENCRYPT_KEY_CACHE_LOGGING_SAMPLE_SIZE), counterSets({ CounterSet(cc, "TLog"), CounterSet(cc, "KVMemory"), CounterSet(cc, "KVRedwood"), diff --git a/fdbclient/include/fdbclient/BlobCipher.h 
b/fdbclient/include/fdbclient/BlobCipher.h index e4c52a80af..f0855d8f8d 100644 --- a/fdbclient/include/fdbclient/BlobCipher.h +++ b/fdbclient/include/fdbclient/BlobCipher.h @@ -103,6 +103,7 @@ public: Counter latestCipherKeyCacheNeedsRefresh; LatencySample getCipherKeysLatency; LatencySample getLatestCipherKeysLatency; + LatencySample getBlobMetadataLatency; std::array counterSets; }; diff --git a/fdbserver/BlobGranuleServerCommon.actor.cpp b/fdbserver/BlobGranuleServerCommon.actor.cpp index 3ecbea48d0..ef6f2b1efe 100644 --- a/fdbserver/BlobGranuleServerCommon.actor.cpp +++ b/fdbserver/BlobGranuleServerCommon.actor.cpp @@ -462,7 +462,7 @@ ACTOR Future loadBlobMetadataForTenants( } // FIXME: if one tenant gets an error, don't kill whole process - // TODO: add latency metrics + state double startTime = now(); loop { Future requestFuture; if (self->dbInfo.isValid() && self->dbInfo->get().encryptKeyProxy.present()) { @@ -485,6 +485,8 @@ ACTOR Future loadBlobMetadataForTenants( ASSERT(dataEntry.begin() == info->second.prefix); dataEntry.cvalue()->updateBStore(metadata); } + double elapsed = now() - startTime; + BlobCipherMetrics::getInstance()->getBlobMetadataLatency.addMeasurement(elapsed); return Void(); } when(wait(self->dbInfo->onChange())) {} From 92e21de3620269df76037659d69eb117cfea0cae Mon Sep 17 00:00:00 2001 From: sfc-gh-tclinkenbeard Date: Thu, 3 Nov 2022 15:05:20 -0700 Subject: [PATCH 20/33] Disable ThroughputQuota.toml test --- tests/CMakeLists.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 85f05d1631..cdcb820f83 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -239,7 +239,7 @@ if(WITH_PYTHON) add_fdb_test(TEST_FILES rare/RedwoodCorrectnessBTree.toml) add_fdb_test(TEST_FILES rare/RedwoodDeltaTree.toml) add_fdb_test(TEST_FILES rare/Throttling.toml) - add_fdb_test(TEST_FILES rare/ThroughputQuota.toml) + add_fdb_test(TEST_FILES rare/ThroughputQuota.toml IGNORE) 
add_fdb_test(TEST_FILES rare/TransactionCost.toml) add_fdb_test(TEST_FILES rare/TransactionTagApiCorrectness.toml) add_fdb_test(TEST_FILES rare/TransactionTagSwizzledApiCorrectness.toml) From 8b9e9fd7d94a91747152aacf5cebcae9f9b43300 Mon Sep 17 00:00:00 2001 From: Josh Slocum Date: Thu, 3 Nov 2022 18:47:54 -0500 Subject: [PATCH 21/33] Parallelize bg purge (#8687) * parallelizing full granule purging * Explicitly sorting full granule purges because they're not guaranteed to be queued in version order --- fdbserver/BlobManager.actor.cpp | 93 ++++++++++++++++++++++++++------- 1 file changed, 74 insertions(+), 19 deletions(-) diff --git a/fdbserver/BlobManager.actor.cpp b/fdbserver/BlobManager.actor.cpp index 84b482cfcf..4adaef1c1e 100644 --- a/fdbserver/BlobManager.actor.cpp +++ b/fdbserver/BlobManager.actor.cpp @@ -4238,7 +4238,13 @@ ACTOR Future fullyDeleteGranule(Reference self, Version purgeVersion, KeyRange granuleRange, Optional mergeChildID, - bool force) { + bool force, + Future parentFuture) { + // wait for parent to finish first to avoid ordering/orphaning issues + wait(parentFuture); + // yield to avoid a long callstack and to allow this to get cancelled + wait(delay(0)); + if (BM_PURGE_DEBUG) { fmt::print("BM {0} Fully deleting granule [{1} - {2}): {3} @ {4}{5}\n", self->epoch, @@ -4296,6 +4302,11 @@ ACTOR Future fullyDeleteGranule(Reference self, // deleting files before corresponding metadata reduces the # of orphaned files. 
wait(waitForAll(deletions)); + if (BUGGIFY && self->maybeInjectTargetedRestart()) { + wait(delay(0)); // should be cancelled + ASSERT(false); + } + // delete metadata in FDB (history entry and file keys) if (BM_PURGE_DEBUG) { fmt::print( @@ -4331,6 +4342,11 @@ ACTOR Future fullyDeleteGranule(Reference self, } } + if (BUGGIFY && self->maybeInjectTargetedRestart()) { + wait(delay(0)); // should be cancelled + ASSERT(false); + } + if (BM_PURGE_DEBUG) { fmt::print("BM {0} Fully deleting granule {1}: success {2}\n", self->epoch, @@ -4501,7 +4517,7 @@ ACTOR Future purgeRange(Reference self, KeyRangeRef range state std::queue>> historyEntryQueue; // stacks of and (and mergeChildID) to track which granules to delete - state std::vector>> toFullyDelete; + state std::vector, Version>> toFullyDelete; state std::vector> toPartiallyDelete; // track which granules we have already added to traversal @@ -4737,7 +4753,7 @@ ACTOR Future purgeRange(Reference self, KeyRangeRef range fmt::print( "BM {0} Granule {1} will be FULLY deleted\n", self->epoch, currHistoryNode.granuleID.toString()); } - toFullyDelete.push_back({ currHistoryNode.granuleID, historyKey, currRange, mergeChildID }); + toFullyDelete.push_back({ currHistoryNode.granuleID, historyKey, currRange, mergeChildID, startVersion }); } else if (startVersion < purgeVersion) { if (BM_PURGE_DEBUG) { fmt::print("BM {0} Granule {1} will be partially deleted\n", @@ -4810,36 +4826,65 @@ ACTOR Future purgeRange(Reference self, KeyRangeRef range .detail("DeletingFullyCount", toFullyDelete.size()) .detail("DeletingPartiallyCount", toPartiallyDelete.size()); - state std::vector> partialDeletions; state int i; if (BM_PURGE_DEBUG) { fmt::print("BM {0}: {1} granules to fully delete\n", self->epoch, toFullyDelete.size()); } // Go backwards through set of granules to guarantee deleting oldest first. 
This avoids orphaning granules in the // deletion process - // FIXME: could track explicit parent dependencies and parallelize so long as a parent and child aren't running in - // parallel, but that's non-trivial - for (i = toFullyDelete.size() - 1; i >= 0; --i) { - state UID granuleId; - Key historyKey; - KeyRange keyRange; - Optional mergeChildId; - std::tie(granuleId, historyKey, keyRange, mergeChildId) = toFullyDelete[i]; - // FIXME: consider batching into a single txn (need to take care of txn size limit) - if (BM_PURGE_DEBUG) { - fmt::print("BM {0}: About to fully delete granule {1}\n", self->epoch, granuleId.toString()); + if (!toFullyDelete.empty()) { + state std::vector> fullDeletions; + KeyRangeMap>> parentDelete; + parentDelete.insert(normalKeys, { 0, Future(Void()) }); + + std::vector> deleteOrder; + deleteOrder.reserve(toFullyDelete.size()); + for (int i = 0; i < toFullyDelete.size(); i++) { + deleteOrder.push_back({ std::get<4>(toFullyDelete[i]), i }); } - wait(fullyDeleteGranule(self, granuleId, historyKey, purgeVersion, keyRange, mergeChildId, force)); - if (BUGGIFY && self->maybeInjectTargetedRestart()) { - wait(delay(0)); // should be cancelled - ASSERT(false); + std::sort(deleteOrder.begin(), deleteOrder.end()); + + for (i = 0; i < deleteOrder.size(); i++) { + state UID granuleId; + Key historyKey; + KeyRange keyRange; + Optional mergeChildId; + Version startVersion; + std::tie(granuleId, historyKey, keyRange, mergeChildId, startVersion) = + toFullyDelete[deleteOrder[i].second]; + // FIXME: consider batching into a single txn (need to take care of txn size limit) + if (BM_PURGE_DEBUG) { + fmt::print("BM {0}: About to fully delete granule {1}\n", self->epoch, granuleId.toString()); + } + std::vector> parents; + auto parentRanges = parentDelete.intersectingRanges(keyRange); + for (auto& it : parentRanges) { + if (startVersion <= it.cvalue().first) { + fmt::print("ERROR: [{0} - {1}) @ {2} <= [{3} - {4}) @ {5}\n", + keyRange.begin.printable(), + 
keyRange.end.printable(), + startVersion, + it.begin().printable(), + it.end().printable(), + it.cvalue().first); + } + ASSERT(startVersion > it.cvalue().first); + parents.push_back(it.cvalue().second); + } + Future deleteFuture = fullyDeleteGranule( + self, granuleId, historyKey, purgeVersion, keyRange, mergeChildId, force, waitForAll(parents)); + fullDeletions.push_back(deleteFuture); + parentDelete.insert(keyRange, { startVersion, deleteFuture }); } + + wait(waitForAll(fullDeletions)); } if (BM_PURGE_DEBUG) { fmt::print("BM {0}: {1} granules to partially delete\n", self->epoch, toPartiallyDelete.size()); } + state std::vector> partialDeletions; for (i = toPartiallyDelete.size() - 1; i >= 0; --i) { UID granuleId; KeyRange keyRange; @@ -4852,6 +4897,11 @@ ACTOR Future purgeRange(Reference self, KeyRangeRef range wait(waitForAll(partialDeletions)); + if (BUGGIFY && self->maybeInjectTargetedRestart()) { + wait(delay(0)); // should be cancelled + ASSERT(false); + } + if (force) { tr.reset(); tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); @@ -4877,6 +4927,11 @@ ACTOR Future purgeRange(Reference self, KeyRangeRef range } } + if (BUGGIFY && self->maybeInjectTargetedRestart()) { + wait(delay(0)); // should be cancelled + ASSERT(false); + } + // Now that all the necessary granules and their files have been deleted, we can // clear the purgeIntent key to signify that the work is done. However, there could have been // another purgeIntent that got written for this table while we were processing this one. 
From cff99a64f618df9c5358928c51a5c1a7afcc6949 Mon Sep 17 00:00:00 2001 From: Josh Slocum Date: Thu, 3 Nov 2022 18:48:10 -0500 Subject: [PATCH 22/33] Blob Granule Attrition fixes (#8682) * Assert was incorrect in change feed destroy race with moved() clearing map * fixing race between injected fault and granule revoke * Handling race in sim2 blob worker attrition check --- fdbrpc/sim2.actor.cpp | 1 + fdbserver/BlobWorker.actor.cpp | 3 ++- fdbserver/storageserver.actor.cpp | 7 +++++-- 3 files changed, 8 insertions(+), 3 deletions(-) diff --git a/fdbrpc/sim2.actor.cpp b/fdbrpc/sim2.actor.cpp index 1618016b77..6cd133d03f 100644 --- a/fdbrpc/sim2.actor.cpp +++ b/fdbrpc/sim2.actor.cpp @@ -1410,6 +1410,7 @@ public: for (auto processInfo : getAllProcesses()) { if (currentDcId != processInfo->locality.dcId() || // skip other dc processInfo->startingClass != ProcessClass::BlobWorkerClass || // skip non blob workers + processInfo->failed || // if process was killed but has not yet been removed from the process list processInfo->locality.machineId() == machineId) { // skip current machine continue; } diff --git a/fdbserver/BlobWorker.actor.cpp b/fdbserver/BlobWorker.actor.cpp index a78ffc2118..3271d69973 100644 --- a/fdbserver/BlobWorker.actor.cpp +++ b/fdbserver/BlobWorker.actor.cpp @@ -4470,9 +4470,10 @@ ACTOR Future handleRangeAssign(Reference bwData, return Void(); } catch (Error& e) { if (e.code() == error_code_operation_cancelled) { - if (!bwData->shuttingDown) { + if (!bwData->shuttingDown && !isSelfReassign) { // the cancelled was because the granule open was cancelled, not because the whole blob // worker was. 
+ ASSERT(!req.reply.isSet()); req.reply.sendError(granule_assignment_conflict()); } throw e; diff --git a/fdbserver/storageserver.actor.cpp b/fdbserver/storageserver.actor.cpp index c902c97f03..90f8350b82 100644 --- a/fdbserver/storageserver.actor.cpp +++ b/fdbserver/storageserver.actor.cpp @@ -595,8 +595,11 @@ struct ChangeFeedInfo : ReferenceCounted { auto toRemove = moveTriggers.modify(range); for (auto triggerRange = toRemove.begin(); triggerRange != toRemove.end(); ++triggerRange) { auto streamToRemove = triggerRange->value().find(streamUID); - ASSERT(streamToRemove != triggerRange->cvalue().end()); - triggerRange->value().erase(streamToRemove); + if (streamToRemove == triggerRange->cvalue().end()) { + ASSERT(destroyed); + } else { + triggerRange->value().erase(streamToRemove); + } } // TODO: may be more cleanup possible here } From d853304b1815d6ca167d0f7916a265bc5fcb25b0 Mon Sep 17 00:00:00 2001 From: Ata E Husain Bohra Date: Fri, 4 Nov 2022 07:40:41 -0700 Subject: [PATCH 23/33] Refactor CP::WriteMutation to optimize ACTOR alloc/dealloc (#8677) * Refactor CP::WriteMutation to optimize ACTOR alloc/dealloc Description WriteMutation routine is responsible for appending mutations to be persisted in TLog; the operation isn't a 'blocking' operation, except for a few cases when Encryption is supported by the cluster, such as: 1. Fetch encryption keys to encrypt the mutation. 2. Split ClearRange mutation to respect Encryption domain boundaries. 3. Ensure sanity of already encrypted mutation - simulation limited check. Patch optimizes the "fast" path by avoiding the alloc/dealloc overhead of ACTOR framework support; the penalty happens iff any of the above conditions is met.
Testing devRunCorrectness - 100K --- fdbserver/CommitProxyServer.actor.cpp | 118 ++++++++++++++++++++------ 1 file changed, 90 insertions(+), 28 deletions(-) diff --git a/fdbserver/CommitProxyServer.actor.cpp b/fdbserver/CommitProxyServer.actor.cpp index a49b45bd73..35f6b64c94 100644 --- a/fdbserver/CommitProxyServer.actor.cpp +++ b/fdbserver/CommitProxyServer.actor.cpp @@ -20,6 +20,7 @@ #include #include +#include #include "fdbclient/Atomic.h" #include "fdbclient/BackupAgent.actor.h" @@ -68,6 +69,8 @@ #include "flow/actorcompiler.h" // This must be the last #include. #include "flow/network.h" +using WriteMutationRefVar = std::variant>; + ACTOR Future broadcastTxnRequest(TxnStateRequest req, int sendAmount, bool sendReply) { state ReplyPromise reply = req.reply; resetReply(req); @@ -1256,16 +1259,78 @@ ACTOR Future applyMetadataToCommittedTransactions(CommitBatchContext* self return Void(); } -ACTOR Future writeMutation(CommitBatchContext* self, - int64_t tenantId, - const MutationRef* mutation, - Optional* encryptedMutationOpt, - Arena* arena) { +ACTOR Future writeMutationEncryptedMutation(CommitBatchContext* self, + int64_t tenantId, + const MutationRef* mutation, + Optional* encryptedMutationOpt, + Arena* arena) { + state MutationRef encryptedMutation = encryptedMutationOpt->get(); + state const BlobCipherEncryptHeader* header; + + static_assert(TenantInfo::INVALID_TENANT == INVALID_ENCRYPT_DOMAIN_ID); + ASSERT(self->pProxyCommitData->isEncryptionEnabled); + ASSERT(g_network && g_network->isSimulated()); + + ASSERT(encryptedMutation.isEncrypted()); + Reference const> dbInfo = self->pProxyCommitData->db; + header = encryptedMutation.encryptionHeader(); + TextAndHeaderCipherKeys cipherKeys = wait(getEncryptCipherKeys(dbInfo, *header, BlobCipherMetrics::TLOG)); + MutationRef decryptedMutation = encryptedMutation.decrypt(cipherKeys, *arena, BlobCipherMetrics::TLOG); + + ASSERT(decryptedMutation.param1 == mutation->param1 && decryptedMutation.param2 == 
mutation->param2 && + decryptedMutation.type == mutation->type); + CODE_PROBE(true, "encrypting non-metadata mutations"); + self->toCommit.writeTypedMessage(encryptedMutation); + return encryptedMutation; +} + +ACTOR Future writeMutationFetchEncryptKey(CommitBatchContext* self, + int64_t tenantId, + const MutationRef* mutation, + Arena* arena) { + + state EncryptCipherDomainId domainId = tenantId; + state MutationRef encryptedMutation; + + static_assert(TenantInfo::INVALID_TENANT == INVALID_ENCRYPT_DOMAIN_ID); + ASSERT(self->pProxyCommitData->isEncryptionEnabled); + ASSERT_NE((MutationRef::Type)mutation->type, MutationRef::Type::ClearRange); + + std::pair p = + getEncryptDetailsFromMutationRef(self->pProxyCommitData, *mutation); + domainId = p.second; + Reference cipherKey = + wait(getLatestEncryptCipherKey(self->pProxyCommitData->db, domainId, p.first, BlobCipherMetrics::TLOG)); + self->cipherKeys[domainId] = cipherKey; + + CODE_PROBE(true, "Raw access mutation encryption"); + ASSERT_NE(domainId, INVALID_ENCRYPT_DOMAIN_ID); + encryptedMutation = mutation->encrypt(self->cipherKeys, domainId, *arena, BlobCipherMetrics::TLOG); + self->toCommit.writeTypedMessage(encryptedMutation); + return encryptedMutation; +} + +Future writeMutation(CommitBatchContext* self, + int64_t tenantId, + const MutationRef* mutation, + Optional* encryptedMutationOpt, + Arena* arena) { static_assert(TenantInfo::INVALID_TENANT == INVALID_ENCRYPT_DOMAIN_ID); + // WriteMutation routine is responsible for appending mutations to be persisted in TLog, the operation + // isn't a 'blocking' operation, except for few cases when Encryption is supported by the cluster such + // as: + // 1. Fetch encryption keys to encrypt the mutation. + // 2. Split ClearRange mutation to respect Encryption domain boundaries. + // 3. Ensure sanity of already encrypted mutation - simulation limited check. 
+ // + // Approach optimizes "fast" path by avoiding alloc/dealloc overhead due to be ACTOR framework support, + // the penalty happens iff any of above conditions are met. Otherwise, corresponding handle routine (ACTOR + // compliant) gets invoked ("slow path"). + if (self->pProxyCommitData->isEncryptionEnabled) { - state EncryptCipherDomainId domainId = tenantId; - state MutationRef encryptedMutation; + EncryptCipherDomainId domainId = tenantId; + MutationRef encryptedMutation; CODE_PROBE(self->pProxyCommitData->db->get().client.tenantMode == TenantMode::DISABLED, "using disabled tenant mode"); CODE_PROBE(self->pProxyCommitData->db->get().client.tenantMode == TenantMode::OPTIONAL_TENANT, @@ -1279,13 +1344,7 @@ ACTOR Future writeMutation(CommitBatchContext* self, ASSERT(encryptedMutation.isEncrypted()); // During simulation check whether the encrypted mutation matches the decrpyted mutation if (g_network && g_network->isSimulated()) { - Reference const> dbInfo = self->pProxyCommitData->db; - state const BlobCipherEncryptHeader* header = encryptedMutation.encryptionHeader(); - TextAndHeaderCipherKeys cipherKeys = - wait(getEncryptCipherKeys(dbInfo, *header, BlobCipherMetrics::TLOG)); - MutationRef decryptedMutation = encryptedMutation.decrypt(cipherKeys, *arena, BlobCipherMetrics::TLOG); - ASSERT(decryptedMutation.param1 == mutation->param1 && decryptedMutation.param2 == mutation->param2 && - decryptedMutation.type == mutation->type); + return writeMutationEncryptedMutation(self, tenantId, mutation, encryptedMutationOpt, arena); } } else { if (domainId == INVALID_ENCRYPT_DOMAIN_ID) { @@ -1294,9 +1353,7 @@ ACTOR Future writeMutation(CommitBatchContext* self, domainId = p.second; if (self->cipherKeys.find(domainId) == self->cipherKeys.end()) { - Reference cipherKey = wait(getLatestEncryptCipherKey( - self->pProxyCommitData->db, domainId, p.first, BlobCipherMetrics::TLOG)); - self->cipherKeys[domainId] = cipherKey; + return writeMutationFetchEncryptKey(self, tenantId, 
mutation, arena); } CODE_PROBE(true, "Raw access mutation encryption"); @@ -1308,10 +1365,10 @@ ACTOR Future writeMutation(CommitBatchContext* self, ASSERT(encryptedMutation.isEncrypted()); CODE_PROBE(true, "encrypting non-metadata mutations"); self->toCommit.writeTypedMessage(encryptedMutation); - return encryptedMutation; + return std::variant>{ encryptedMutation }; } else { self->toCommit.writeTypedMessage(*mutation); - return *mutation; + return std::variant>{ *mutation }; } } @@ -1399,8 +1456,10 @@ ACTOR Future assignMutationsToStorageServers(CommitBatchContext* self) { if (encryptedMutation.present()) { ASSERT(encryptedMutation.get().isEncrypted()); } - MutationRef tempMutation = wait(writeMutation(self, tenantId, &m, &encryptedMutation, &arena)); - writtenMutation = tempMutation; + WriteMutationRefVar var = wait(writeMutation(self, tenantId, &m, &encryptedMutation, &arena)); + // FIXME: Remove assert once ClearRange RAW_ACCESS usecase handling is done + ASSERT(std::holds_alternative(var)); + writtenMutation = std::get(var); } else if (m.type == MutationRef::ClearRange) { KeyRangeRef clearRange(KeyRangeRef(m.param1, m.param2)); auto ranges = pProxyCommitData->keyInfo.intersectingRanges(clearRange); @@ -1453,8 +1512,10 @@ ACTOR Future assignMutationsToStorageServers(CommitBatchContext* self) { if (pProxyCommitData->needsCacheTag(clearRange)) { self->toCommit.addTag(cacheTag); } - MutationRef tempMutation = wait(writeMutation(self, tenantId, &m, &encryptedMutation, &arena)); - writtenMutation = tempMutation; + WriteMutationRefVar var = wait(writeMutation(self, tenantId, &m, &encryptedMutation, &arena)); + // FIXME: Remove assert once ClearRange RAW_ACCESS usecase handling is done + ASSERT(std::holds_alternative(var)); + writtenMutation = std::get(var); } else { UNREACHABLE(); } @@ -1505,8 +1566,8 @@ ACTOR Future assignMutationsToStorageServers(CommitBatchContext* self) { MutationRef backupMutation( MutationRef::Type::ClearRange, intersectionRange.begin, 
intersectionRange.end); - // TODO (Nim): Currently clear ranges are encrypted using the default encryption key, this must be - // changed to account for clear ranges which span tenant boundaries + // TODO (Nim): Currently clear ranges are encrypted using the default encryption key, this must + // be changed to account for clear ranges which span tenant boundaries if (self->pProxyCommitData->isEncryptionEnabled) { CODE_PROBE(true, "encrypting clear range backup mutation"); if (backupMutation.param1 == m.param1 && backupMutation.param2 == m.param2 && @@ -1636,12 +1697,13 @@ ACTOR Future postResolution(CommitBatchContext* self) { }); state int i = 0; for (i = 0; i < pProxyCommitData->idempotencyClears.size(); i++) { - MutationRef& m = pProxyCommitData->idempotencyClears[i]; - auto& tags = pProxyCommitData->tagsForKey(m.param1); + auto& tags = pProxyCommitData->tagsForKey(pProxyCommitData->idempotencyClears[i].param1); self->toCommit.addTags(tags); // We already have an arena with an appropriate lifetime handy Arena& arena = pProxyCommitData->idempotencyClears.arena(); - wait(success(writeMutation(self, SYSTEM_KEYSPACE_ENCRYPT_DOMAIN_ID, &m, nullptr, &arena))); + WriteMutationRefVar var = wait(writeMutation( + self, SYSTEM_KEYSPACE_ENCRYPT_DOMAIN_ID, &pProxyCommitData->idempotencyClears[i], nullptr, &arena)); + ASSERT(std::holds_alternative(var)); } pProxyCommitData->idempotencyClears = Standalone>(); From cf97541135e6ac4a47e5ce17752c5954961eff35 Mon Sep 17 00:00:00 2001 From: Lukas Joswiak Date: Mon, 31 Oct 2022 17:29:10 -0700 Subject: [PATCH 24/33] Fix exclude status of machines in status json Previously, `status json` would report a machine as excluded when any of its processes were excluded. But a machine should only be reported as excluded when all of its processes are excluded. I tested this change by running a small, three process test cluster. I excluded one machine, and verified `status json` reported the overall machine as not excluded. 
--- fdbserver/Status.actor.cpp | 25 ++++++++++++++----------- 1 file changed, 14 insertions(+), 11 deletions(-) diff --git a/fdbserver/Status.actor.cpp b/fdbserver/Status.actor.cpp index fa0da0f974..616c34e9e5 100644 --- a/fdbserver/Status.actor.cpp +++ b/fdbserver/Status.actor.cpp @@ -307,7 +307,7 @@ JsonBuilderObject machineStatusFetcher(WorkerEvents mMetrics, // map from machine networkAddress to datacenter ID std::map dcIds; std::map locality; - std::map notExcludedMap; + std::map excludedMap; std::map workerContribMap; std::map machineJsonMap; @@ -377,7 +377,7 @@ JsonBuilderObject machineStatusFetcher(WorkerEvents mMetrics, statusObj["network"] = networkObj; if (configuration.present()) { - notExcludedMap[machineId] = + excludedMap[machineId] = true; // Will be set to false below if this or any later process is not excluded } @@ -385,18 +385,21 @@ JsonBuilderObject machineStatusFetcher(WorkerEvents mMetrics, machineJsonMap[machineId] = statusObj; } - // FIXME: this will not catch if the secondary address of the process was excluded NetworkAddressList tempList; tempList.address = it->first; - bool excludedServer = false; - bool excludedLocality = false; - if (configuration.present() && configuration.get().isExcludedServer(tempList)) - excludedServer = true; + bool excludedServer = true; + bool excludedLocality = true; + if (configuration.present() && !configuration.get().isExcludedServer(tempList)) + excludedServer = false; if (locality.count(it->first) && configuration.present() && - configuration.get().isMachineExcluded(locality[it->first])) - excludedLocality = true; + !configuration.get().isMachineExcluded(locality[it->first])) + excludedLocality = false; - notExcludedMap[machineId] = excludedServer || excludedLocality; + // If any server is not excluded, set the overall exclusion status + // of the machine to false. 
+ if (!excludedServer && !excludedLocality) { + excludedMap[machineId] = false; + } workerContribMap[machineId]++; } catch (Error&) { ++failed; @@ -407,7 +410,7 @@ JsonBuilderObject machineStatusFetcher(WorkerEvents mMetrics, for (auto& mapPair : machineJsonMap) { auto& machineId = mapPair.first; auto& jsonItem = machineJsonMap[machineId]; - jsonItem["excluded"] = notExcludedMap[machineId]; + jsonItem["excluded"] = excludedMap[machineId]; jsonItem["contributing_workers"] = workerContribMap[machineId]; machineMap[machineId] = jsonItem; } From 248d4e95dd0bba3de3f6eb3cbdbbf1e73b51c486 Mon Sep 17 00:00:00 2001 From: "A.J. Beamon" Date: Fri, 4 Nov 2022 10:02:35 -0700 Subject: [PATCH 25/33] Don't copy an empty string in keyAfter and singleKeyRange --- fdbclient/include/fdbclient/FDBTypes.h | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/fdbclient/include/fdbclient/FDBTypes.h b/fdbclient/include/fdbclient/FDBTypes.h index 36a12ef14b..9a2205a1f3 100644 --- a/fdbclient/include/fdbclient/FDBTypes.h +++ b/fdbclient/include/fdbclient/FDBTypes.h @@ -553,7 +553,9 @@ inline KeyRef keyAfter(const KeyRef& key, Arena& arena) { return allKeysEnd; } uint8_t* t = new (arena) uint8_t[key.size() + 1]; - memcpy(t, key.begin(), key.size()); + if (!key.empty()) { + memcpy(t, key.begin(), key.size()); + } t[key.size()] = 0; return KeyRef(t, key.size() + 1); } @@ -564,7 +566,9 @@ inline Key keyAfter(const KeyRef& key) { } inline KeyRangeRef singleKeyRange(KeyRef const& key, Arena& arena) { uint8_t* t = new (arena) uint8_t[key.size() + 1]; - memcpy(t, key.begin(), key.size()); + if (!key.empty()) { + memcpy(t, key.begin(), key.size()); + } t[key.size()] = 0; return KeyRangeRef(KeyRef(t, key.size()), KeyRef(t, key.size() + 1)); } From a9e3a6f8179df4c59b986b56fb2e528252739f6a Mon Sep 17 00:00:00 2001 From: Yao Xiao <87789492+yao-xiao-github@users.noreply.github.com> Date: Fri, 4 Nov 2022 10:41:46 -0700 Subject: [PATCH 26/33] Add bounds to iterator. 
(#8685) --- .../KeyValueStoreShardedRocksDB.actor.cpp | 27 ++++++++++++++----- 1 file changed, 20 insertions(+), 7 deletions(-) diff --git a/fdbserver/KeyValueStoreShardedRocksDB.actor.cpp b/fdbserver/KeyValueStoreShardedRocksDB.actor.cpp index 462c42f749..1a072e2c7b 100644 --- a/fdbserver/KeyValueStoreShardedRocksDB.actor.cpp +++ b/fdbserver/KeyValueStoreShardedRocksDB.actor.cpp @@ -155,7 +155,7 @@ struct ShardedRocksDBState { std::shared_ptr rocksdb_block_cache = nullptr; -rocksdb::Slice toSlice(StringRef s) { +const rocksdb::Slice toSlice(StringRef s) { return rocksdb::Slice(reinterpret_cast(s.begin()), s.size()); } @@ -309,8 +309,20 @@ struct ReadIterator { bool inUse; std::shared_ptr iter; double creationTime; - ReadIterator(rocksdb::ColumnFamilyHandle* cf, uint64_t index, rocksdb::DB* db, rocksdb::ReadOptions& options) + KeyRange keyRange; + std::shared_ptr beginSlice, endSlice; + + ReadIterator(rocksdb::ColumnFamilyHandle* cf, uint64_t index, rocksdb::DB* db, const rocksdb::ReadOptions& options) : index(index), inUse(true), creationTime(now()), iter(db->NewIterator(options, cf)) {} + ReadIterator(rocksdb::ColumnFamilyHandle* cf, uint64_t index, rocksdb::DB* db, const KeyRange& range) + : index(index), inUse(true), creationTime(now()), keyRange(range) { + auto options = getReadOptions(); + beginSlice = std::shared_ptr(new rocksdb::Slice(toSlice(keyRange.begin))); + options.iterate_lower_bound = beginSlice.get(); + endSlice = std::shared_ptr(new rocksdb::Slice(toSlice(keyRange.end))); + options.iterate_upper_bound = endSlice.get(); + iter = std::shared_ptr(db->NewIterator(options, cf)); + } }; /* @@ -348,7 +360,8 @@ public: } // Called on every read operation. - ReadIterator getIterator() { + ReadIterator getIterator(const KeyRange& range) { + // Shared iterators are not bounded. 
if (SERVER_KNOBS->ROCKSDB_READ_RANGE_REUSE_ITERATORS) { std::lock_guard lock(mutex); for (it = iteratorsMap.begin(); it != iteratorsMap.end(); it++) { @@ -364,7 +377,7 @@ public: return iter; } else { index++; - ReadIterator iter(cf, index, db, readRangeOptions); + ReadIterator iter(cf, index, db, range); return iter; } } @@ -511,7 +524,7 @@ struct PhysicalShard { double deleteTimeSec; }; -int readRangeInDb(PhysicalShard* shard, const KeyRangeRef& range, int rowLimit, int byteLimit, RangeResult* result) { +int readRangeInDb(PhysicalShard* shard, const KeyRangeRef range, int rowLimit, int byteLimit, RangeResult* result) { if (rowLimit == 0 || byteLimit == 0) { return 0; } @@ -523,7 +536,7 @@ int readRangeInDb(PhysicalShard* shard, const KeyRangeRef& range, int rowLimit, // When using a prefix extractor, ensure that keys are returned in order even if they cross // a prefix boundary. if (rowLimit >= 0) { - ReadIterator readIter = shard->readIterPool->getIterator(); + ReadIterator readIter = shard->readIterPool->getIterator(range); auto cursor = readIter.iter; cursor->Seek(toSlice(range.begin)); while (cursor->Valid() && toStringRef(cursor->key()) < range.end) { @@ -540,7 +553,7 @@ int readRangeInDb(PhysicalShard* shard, const KeyRangeRef& range, int rowLimit, s = cursor->status(); shard->readIterPool->returnIterator(readIter); } else { - ReadIterator readIter = shard->readIterPool->getIterator(); + ReadIterator readIter = shard->readIterPool->getIterator(range); auto cursor = readIter.iter; cursor->SeekForPrev(toSlice(range.end)); if (cursor->Valid() && toStringRef(cursor->key()) == range.end) { From 0ca344a91771d74d6fc3c62e74a9616c35450182 Mon Sep 17 00:00:00 2001 From: Yao Xiao <87789492+yao-xiao-github@users.noreply.github.com> Date: Fri, 4 Nov 2022 10:42:08 -0700 Subject: [PATCH 27/33] Add SS read range bytes metrics. 
(#8697) --- fdbserver/include/fdbserver/StorageMetrics.actor.h | 2 ++ fdbserver/storageserver.actor.cpp | 13 +++++++++++++ 2 files changed, 15 insertions(+) diff --git a/fdbserver/include/fdbserver/StorageMetrics.actor.h b/fdbserver/include/fdbserver/StorageMetrics.actor.h index dc518cf318..c524d36f0a 100644 --- a/fdbserver/include/fdbserver/StorageMetrics.actor.h +++ b/fdbserver/include/fdbserver/StorageMetrics.actor.h @@ -44,6 +44,8 @@ const StringRef TLOG_MSGS_PTREE_UPDATES_LATENCY_HISTOGRAM = "TLogMsgsPTreeUpdate const StringRef STORAGE_UPDATES_DURABLE_LATENCY_HISTOGRAM = "StorageUpdatesDurableLatency"_sr; const StringRef STORAGE_COMMIT_LATENCY_HISTOGRAM = "StorageCommitLatency"_sr; const StringRef SS_DURABLE_VERSION_UPDATE_LATENCY_HISTOGRAM = "SSDurableVersionUpdateLatency"_sr; +const StringRef SS_READ_RANGE_BYTES_RETURNED_HISTOGRAM = "SSReadRangeBytesReturned"_sr; +const StringRef SS_READ_RANGE_BYTES_LIMIT_HISTOGRAM = "SSReadRangeBytesLimit"_sr; struct StorageMetricSample { IndexedSet sample; diff --git a/fdbserver/storageserver.actor.cpp b/fdbserver/storageserver.actor.cpp index 90f8350b82..ed257dd207 100644 --- a/fdbserver/storageserver.actor.cpp +++ b/fdbserver/storageserver.actor.cpp @@ -735,6 +735,9 @@ public: Reference storageUpdatesDurableLatencyHistogram; Reference storageCommitLatencyHistogram; Reference ssDurableVersionUpdateLatencyHistogram; + // Histograms of requests sent to KVS. 
+ Reference readRangeBytesReturnedHistogram; + Reference readRangeBytesLimitHistogram; // watch map operations Reference getWatchMetadata(KeyRef key) const; @@ -1296,6 +1299,12 @@ public: ssDurableVersionUpdateLatencyHistogram(Histogram::getHistogram(STORAGESERVER_HISTOGRAM_GROUP, SS_DURABLE_VERSION_UPDATE_LATENCY_HISTOGRAM, Histogram::Unit::microseconds)), + readRangeBytesReturnedHistogram(Histogram::getHistogram(STORAGESERVER_HISTOGRAM_GROUP, + SS_READ_RANGE_BYTES_RETURNED_HISTOGRAM, + Histogram::Unit::countLinear)), + readRangeBytesLimitHistogram(Histogram::getHistogram(STORAGESERVER_HISTOGRAM_GROUP, + SS_READ_RANGE_BYTES_LIMIT_HISTOGRAM, + Histogram::Unit::countLinear)), tag(invalidTag), poppedAllAfter(std::numeric_limits::max()), cpuUsage(0.0), diskUsage(0.0), storage(this, storage), shardChangeCounter(0), lastTLogVersion(0), lastVersionWithData(0), restoredVersion(0), prevVersion(0), rebootAfterDurableVersion(std::numeric_limits::max()), @@ -3460,6 +3469,8 @@ ACTOR Future readRange(StorageServer* data, RangeResult atStorageVersion = wait(data->storage.readRange(KeyRangeRef(readBegin, readEnd), limit, *pLimitBytes, options)); data->counters.kvScanBytes += atStorageVersion.logicalSize(); + data->readRangeBytesReturnedHistogram->sample(atStorageVersion.logicalSize()); + data->readRangeBytesLimitHistogram->sample(*pLimitBytes); ASSERT(atStorageVersion.size() <= limit); if (data->storageVersion() > version) { @@ -3555,6 +3566,8 @@ ACTOR Future readRange(StorageServer* data, RangeResult atStorageVersion = wait(data->storage.readRange(KeyRangeRef(readBegin, readEnd), limit, *pLimitBytes, options)); data->counters.kvScanBytes += atStorageVersion.logicalSize(); + data->readRangeBytesReturnedHistogram->sample(atStorageVersion.logicalSize()); + data->readRangeBytesLimitHistogram->sample(*pLimitBytes); ASSERT(atStorageVersion.size() <= -limit); if (data->storageVersion() > version) From 8de6793aa3523fdcf0768687cfe9ffd8cb91ab5e Mon Sep 17 00:00:00 2001 From: Yao Xiao 
<87789492+yao-xiao-github@users.noreply.github.com> Date: Fri, 4 Nov 2022 11:05:53 -0700 Subject: [PATCH 28/33] Add metrics for read range. (#8692) --- fdbserver/KeyValueStoreShardedRocksDB.actor.cpp | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/fdbserver/KeyValueStoreShardedRocksDB.actor.cpp b/fdbserver/KeyValueStoreShardedRocksDB.actor.cpp index 1a072e2c7b..4cd1410c9e 100644 --- a/fdbserver/KeyValueStoreShardedRocksDB.actor.cpp +++ b/fdbserver/KeyValueStoreShardedRocksDB.actor.cpp @@ -2163,10 +2163,16 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore { : keys(keys), rowLimit(rowLimit), byteLimit(byteLimit), startTime(timer_monotonic()), getHistograms( (deterministicRandom()->random01() < SERVER_KNOBS->ROCKSDB_HISTOGRAMS_SAMPLE_RATE) ? true : false) { + std::set usedShards; for (const DataShard* shard : shards) { - if (shard != nullptr) { - shardRanges.emplace_back(shard->physicalShard, keys & shard->range); - } + ASSERT(shard); + shardRanges.emplace_back(shard->physicalShard, keys & shard->range); + usedShards.insert(shard->physicalShard); + } + if (usedShards.size() != shards.size()) { + TraceEvent("ReadRangeMetrics") + .detail("NumPhysicalShards", usedShards.size()) + .detail("NumDataShards", shards.size()); } } double getTimeEstimate() const override { return SERVER_KNOBS->READ_RANGE_TIME_ESTIMATE; } From bed057e2d351cd1a36fff4fd9205f7011db17b9a Mon Sep 17 00:00:00 2001 From: Josh Slocum Date: Fri, 4 Nov 2022 13:26:44 -0500 Subject: [PATCH 29/33] moving fdbblob in sim folder for simulations (#8701) --- fdbclient/ServerKnobs.cpp | 2 +- fdbserver/fdbserver.actor.cpp | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/fdbclient/ServerKnobs.cpp b/fdbclient/ServerKnobs.cpp index c15692a5e3..d7849395fd 100644 --- a/fdbclient/ServerKnobs.cpp +++ b/fdbclient/ServerKnobs.cpp @@ -960,7 +960,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi init( KMS_CONNECTOR_TYPE, 
"RESTKmsConnector" ); // Blob granlues - init( BG_URL, isSimulated ? "file://fdbblob/" : "" ); // TODO: store in system key space or something, eventually + init( BG_URL, isSimulated ? "file://simfdb/fdbblob/" : "" ); // TODO: store in system key space or something, eventually bool buggifyMediumGranules = simulationMediumShards || (randomize && BUGGIFY); // BlobGranuleVerify* simulation tests use "knobs", BlobGranuleCorrectness* use "tenant", default in real clusters is "knobs" init( BG_METADATA_SOURCE, "knobs" ); diff --git a/fdbserver/fdbserver.actor.cpp b/fdbserver/fdbserver.actor.cpp index 6e4a1ae579..9efa86f297 100644 --- a/fdbserver/fdbserver.actor.cpp +++ b/fdbserver/fdbserver.actor.cpp @@ -2149,7 +2149,7 @@ int main(int argc, char* argv[]) { auto dataFolder = opts.dataFolder.size() ? opts.dataFolder : "simfdb"; std::vector directories = platform::listDirectories(dataFolder); - const std::set allowedDirectories = { ".", "..", "backups", "unittests" }; + const std::set allowedDirectories = { ".", "..", "backups", "unittests", "fdbblob" }; for (const auto& dir : directories) { if (dir.size() != 32 && allowedDirectories.count(dir) == 0 && dir.find("snap") == std::string::npos) { From 32bc9b6ebbce0629ebfdb7cc5ece7e62a910998e Mon Sep 17 00:00:00 2001 From: Zhe Wu Date: Tue, 1 Nov 2022 16:02:42 -0700 Subject: [PATCH 30/33] Fix a race condition between batched peek and pop, where the server removal pop may be lost --- fdbclient/ServerKnobs.cpp | 1 + fdbclient/include/fdbclient/ServerKnobs.h | 1 + fdbserver/LogRouter.actor.cpp | 53 ++++---- fdbserver/TLogServer.actor.cpp | 127 ++++++++++---------- fdbserver/TagPartitionedLogSystem.actor.cpp | 3 +- 5 files changed, 95 insertions(+), 90 deletions(-) diff --git a/fdbclient/ServerKnobs.cpp b/fdbclient/ServerKnobs.cpp index d7849395fd..44fe801eb4 100644 --- a/fdbclient/ServerKnobs.cpp +++ b/fdbclient/ServerKnobs.cpp @@ -115,6 +115,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi init( 
ENABLE_DETAILED_TLOG_POP_TRACE, false ); if ( randomize && BUGGIFY ) ENABLE_DETAILED_TLOG_POP_TRACE = true; init( PEEK_BATCHING_EMPTY_MSG, false ); if ( randomize && BUGGIFY ) PEEK_BATCHING_EMPTY_MSG = true; init( PEEK_BATCHING_EMPTY_MSG_INTERVAL, 0.001 ); if ( randomize && BUGGIFY ) PEEK_BATCHING_EMPTY_MSG_INTERVAL = 0.01; + init( POP_FROM_LOG_DELAY, 1 ); if ( randomize && BUGGIFY ) POP_FROM_LOG_DELAY = 0; // disk snapshot max timeout, to be put in TLog, storage and coordinator nodes init( MAX_FORKED_PROCESS_OUTPUT, 1024 ); diff --git a/fdbclient/include/fdbclient/ServerKnobs.h b/fdbclient/include/fdbclient/ServerKnobs.h index 2a530febad..de0a6da061 100644 --- a/fdbclient/include/fdbclient/ServerKnobs.h +++ b/fdbclient/include/fdbclient/ServerKnobs.h @@ -110,6 +110,7 @@ public: double BLOCKING_PEEK_TIMEOUT; bool PEEK_BATCHING_EMPTY_MSG; double PEEK_BATCHING_EMPTY_MSG_INTERVAL; + double POP_FROM_LOG_DELAY; // Data distribution queue double HEALTH_POLL_TIME; diff --git a/fdbserver/LogRouter.actor.cpp b/fdbserver/LogRouter.actor.cpp index 399d820608..f09cd12ea6 100644 --- a/fdbserver/LogRouter.actor.cpp +++ b/fdbserver/LogRouter.actor.cpp @@ -514,35 +514,38 @@ Future logRouterPeekMessages(PromiseType replyPromise, } state double startTime = now(); - - Version poppedVer = poppedVersion(self, reqTag); - - if (poppedVer > reqBegin || reqBegin < self->startVersion) { - // This should only happen if a packet is sent multiple times and the reply is not needed. - // Since we are using popped differently, do not send a reply. 
- TraceEvent(SevWarnAlways, "LogRouterPeekPopped", self->dbgid) - .detail("Begin", reqBegin) - .detail("Popped", poppedVer) - .detail("Start", self->startVersion); - if (std::is_same>::value) { - // kills logRouterPeekStream actor, otherwise that actor becomes stuck - throw operation_obsolete(); - } - replyPromise.send(Never()); - if (reqSequence.present()) { - auto& trackerData = self->peekTracker[peekId]; - auto& sequenceData = trackerData.sequence_version[sequence + 1]; - if (!sequenceData.isSet()) { - sequenceData.send(std::make_pair(reqBegin, reqOnlySpilled)); - } - } - return Void(); - } - + state Version poppedVer; state Version endVersion; // Run the peek logic in a loop to account for the case where there is no data to return to the caller, and we may // want to wait a little bit instead of just sending back an empty message. This feature is controlled by a knob. loop { + + poppedVer = poppedVersion(self, reqTag); + + if (poppedVer > reqBegin || reqBegin < self->startVersion) { + // This should only happen if a packet is sent multiple times and the reply is not needed. + // Since we are using popped differently, do not send a reply. 
+ TraceEvent(SevWarnAlways, "LogRouterPeekPopped", self->dbgid) + .detail("Begin", reqBegin) + .detail("Popped", poppedVer) + .detail("Start", self->startVersion); + if (std::is_same>::value) { + // kills logRouterPeekStream actor, otherwise that actor becomes stuck + throw operation_obsolete(); + } + replyPromise.send(Never()); + if (reqSequence.present()) { + auto& trackerData = self->peekTracker[peekId]; + auto& sequenceData = trackerData.sequence_version[sequence + 1]; + if (!sequenceData.isSet()) { + sequenceData.send(std::make_pair(reqBegin, reqOnlySpilled)); + } + } + return Void(); + } + + ASSERT_WE_THINK(reqBegin >= poppedVersion(self, reqTag) && reqBegin >= self->startVersion); + endVersion = self->version.get() + 1; peekMessagesFromMemory(self, reqTag, reqBegin, messages, endVersion); diff --git a/fdbserver/TLogServer.actor.cpp b/fdbserver/TLogServer.actor.cpp index eb3c498e11..5edffc6d31 100644 --- a/fdbserver/TLogServer.actor.cpp +++ b/fdbserver/TLogServer.actor.cpp @@ -1802,75 +1802,76 @@ Future tLogPeekMessages(PromiseType replyPromise, } state double workStart = now(); - - state Version poppedVer = poppedVersion(logData, reqTag); - - auto tagData = logData->getTagData(reqTag); - bool tagRecovered = tagData && !tagData->unpoppedRecovered; - if (SERVER_KNOBS->ENABLE_VERSION_VECTOR && poppedVer <= reqBegin && - reqBegin > logData->persistentDataDurableVersion && !reqOnlySpilled && reqTag.locality >= 0 && - !reqReturnIfBlocked && tagRecovered) { - state double startTime = now(); - // TODO (version vector) check if this should be included in "status details" json - // TODO (version vector) all tags may be too many, instead, standard deviation? 
- wait(waitForMessagesForTag(logData, reqTag, reqBegin, SERVER_KNOBS->BLOCKING_PEEK_TIMEOUT)); - double latency = now() - startTime; - if (logData->blockingPeekLatencies.find(reqTag) == logData->blockingPeekLatencies.end()) { - UID ssID = nondeterministicRandom()->randomUniqueID(); - std::string s = "BlockingPeekLatencies-" + reqTag.toString(); - logData->blockingPeekLatencies.try_emplace( - reqTag, s, ssID, SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL, SERVER_KNOBS->LATENCY_SAMPLE_SIZE); - } - LatencySample& sample = logData->blockingPeekLatencies.at(reqTag); - sample.addMeasurement(latency); - poppedVer = poppedVersion(logData, reqTag); - } - - DebugLogTraceEvent("TLogPeekMessages2", self->dbgid) - .detail("LogId", logData->logId) - .detail("Tag", reqTag.toString()) - .detail("ReqBegin", reqBegin) - .detail("PoppedVer", poppedVer); - if (poppedVer > reqBegin) { - TLogPeekReply rep; - rep.maxKnownVersion = logData->version.get(); - rep.minKnownCommittedVersion = logData->minKnownCommittedVersion; - rep.popped = poppedVer; - rep.end = poppedVer; - rep.onlySpilled = false; - - if (reqSequence.present()) { - auto& trackerData = logData->peekTracker[peekId]; - auto& sequenceData = trackerData.sequence_version[sequence + 1]; - trackerData.lastUpdate = now(); - if (trackerData.sequence_version.size() && sequence + 1 < trackerData.sequence_version.begin()->first) { - replyPromise.sendError(operation_obsolete()); - if (!sequenceData.isSet()) - sequenceData.sendError(operation_obsolete()); - return Void(); - } - if (sequenceData.isSet()) { - if (sequenceData.getFuture().get().first != rep.end) { - CODE_PROBE(true, "tlog peek second attempt ended at a different version"); - replyPromise.sendError(operation_obsolete()); - return Void(); - } - } else { - sequenceData.send(std::make_pair(rep.end, rep.onlySpilled)); - } - rep.begin = reqBegin; - } - - replyPromise.send(rep); - return Void(); - } - + state Version poppedVer; state Version endVersion; state bool onlySpilled; // 
Run the peek logic in a loop to account for the case where there is no data to return to the caller, and we may // want to wait a little bit instead of just sending back an empty message. This feature is controlled by a knob. loop { + poppedVer = poppedVersion(logData, reqTag); + + if (SERVER_KNOBS->ENABLE_VERSION_VECTOR && poppedVer <= reqBegin && + reqBegin > logData->persistentDataDurableVersion && !reqOnlySpilled && reqTag.locality >= 0 && + !reqReturnIfBlocked) { + state double startTime = now(); + // TODO (version vector) check if this should be included in "status details" json + // TODO (version vector) all tags may be too many, instead, standard deviation? + wait(waitForMessagesForTag(logData, reqTag, reqBegin, SERVER_KNOBS->BLOCKING_PEEK_TIMEOUT)); + double latency = now() - startTime; + if (logData->blockingPeekLatencies.find(reqTag) == logData->blockingPeekLatencies.end()) { + UID ssID = nondeterministicRandom()->randomUniqueID(); + std::string s = "BlockingPeekLatencies-" + reqTag.toString(); + logData->blockingPeekLatencies.try_emplace( + reqTag, s, ssID, SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL, SERVER_KNOBS->LATENCY_SAMPLE_SIZE); + } + LatencySample& sample = logData->blockingPeekLatencies.at(reqTag); + sample.addMeasurement(latency); + poppedVer = poppedVersion(logData, reqTag); + } + + DisabledTraceEvent("TLogPeekMessages1", self->dbgid) + .detail("LogId", logData->logId) + .detail("Tag", reqTag.toString()) + .detail("ReqBegin", reqBegin) + .detail("PoppedVer", poppedVer); + + if (poppedVer > reqBegin) { + TLogPeekReply rep; + rep.maxKnownVersion = logData->version.get(); + rep.minKnownCommittedVersion = logData->minKnownCommittedVersion; + rep.popped = poppedVer; + rep.end = poppedVer; + rep.onlySpilled = false; + + if (reqSequence.present()) { + auto& trackerData = logData->peekTracker[peekId]; + auto& sequenceData = trackerData.sequence_version[sequence + 1]; + trackerData.lastUpdate = now(); + if (trackerData.sequence_version.size() && 
sequence + 1 < trackerData.sequence_version.begin()->first) { + replyPromise.sendError(operation_obsolete()); + if (!sequenceData.isSet()) + sequenceData.sendError(operation_obsolete()); + return Void(); + } + if (sequenceData.isSet()) { + if (sequenceData.getFuture().get().first != rep.end) { + TEST(true); // tlog peek second attempt ended at a different version + replyPromise.sendError(operation_obsolete()); + return Void(); + } + } else { + sequenceData.send(std::make_pair(rep.end, rep.onlySpilled)); + } + rep.begin = reqBegin; + } + + replyPromise.send(rep); + return Void(); + } + + ASSERT_WE_THINK(reqBegin >= poppedVersion(logData, reqTag)); + endVersion = logData->version.get() + 1; onlySpilled = false; diff --git a/fdbserver/TagPartitionedLogSystem.actor.cpp b/fdbserver/TagPartitionedLogSystem.actor.cpp index 87fe0dea3c..48e8a070a4 100644 --- a/fdbserver/TagPartitionedLogSystem.actor.cpp +++ b/fdbserver/TagPartitionedLogSystem.actor.cpp @@ -1449,8 +1449,7 @@ void TagPartitionedLogSystem::pop(Version upTo, Tag tag, Version durableKnownCom } if (prev == 0) { // pop tag from log upto version defined in outstandingPops[].first - popActors.add( - popFromLog(this, log, tag, /*delayBeforePop*/ 1.0, /*popLogRouter=*/false)); //< FIXME: knob + popActors.add(popFromLog(this, log, tag, SERVER_KNOBS->POP_FROM_LOG_DELAY, /*popLogRouter=*/false)); } } } From 5296bb96a6e1a8796d54212888257b3c8aff382d Mon Sep 17 00:00:00 2001 From: Zhe Wu Date: Tue, 1 Nov 2022 22:28:15 -0700 Subject: [PATCH 31/33] Apply clang format --- fdbserver/LogRouter.actor.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fdbserver/LogRouter.actor.cpp b/fdbserver/LogRouter.actor.cpp index f09cd12ea6..08370ce7c2 100644 --- a/fdbserver/LogRouter.actor.cpp +++ b/fdbserver/LogRouter.actor.cpp @@ -543,7 +543,7 @@ Future logRouterPeekMessages(PromiseType replyPromise, } return Void(); } - + ASSERT_WE_THINK(reqBegin >= poppedVersion(self, reqTag) && reqBegin >= self->startVersion); 
endVersion = self->version.get() + 1; From fc35ed9d0a668058596bd37c18c485946d994da1 Mon Sep 17 00:00:00 2001 From: Zhe Wu Date: Wed, 2 Nov 2022 10:13:16 -0700 Subject: [PATCH 32/33] Change ASSERT_WE_THINK to ASSERT when checking that peek reply start version must be greater than or equal to the latest pop version --- fdbserver/LogRouter.actor.cpp | 2 +- fdbserver/TLogServer.actor.cpp | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/fdbserver/LogRouter.actor.cpp b/fdbserver/LogRouter.actor.cpp index 08370ce7c2..51796f9fc0 100644 --- a/fdbserver/LogRouter.actor.cpp +++ b/fdbserver/LogRouter.actor.cpp @@ -544,7 +544,7 @@ Future logRouterPeekMessages(PromiseType replyPromise, return Void(); } - ASSERT_WE_THINK(reqBegin >= poppedVersion(self, reqTag) && reqBegin >= self->startVersion); + ASSERT(reqBegin >= poppedVersion(self, reqTag) && reqBegin >= self->startVersion); endVersion = self->version.get() + 1; peekMessagesFromMemory(self, reqTag, reqBegin, messages, endVersion); diff --git a/fdbserver/TLogServer.actor.cpp b/fdbserver/TLogServer.actor.cpp index 5edffc6d31..92acb71e06 100644 --- a/fdbserver/TLogServer.actor.cpp +++ b/fdbserver/TLogServer.actor.cpp @@ -1870,7 +1870,7 @@ Future tLogPeekMessages(PromiseType replyPromise, return Void(); } - ASSERT_WE_THINK(reqBegin >= poppedVersion(logData, reqTag)); + ASSERT(reqBegin >= poppedVersion(logData, reqTag)); endVersion = logData->version.get() + 1; onlySpilled = false; From ae4d66c0d7dbf457c2b403181d17a592abc102fd Mon Sep 17 00:00:00 2001 From: Zhe Wu Date: Fri, 4 Nov 2022 13:23:49 -0700 Subject: [PATCH 33/33] Update TLogServer in main --- fdbserver/TLogServer.actor.cpp | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/fdbserver/TLogServer.actor.cpp b/fdbserver/TLogServer.actor.cpp index 92acb71e06..dd2e83e360 100644 --- a/fdbserver/TLogServer.actor.cpp +++ b/fdbserver/TLogServer.actor.cpp @@ -1811,9 +1811,11 @@ Future tLogPeekMessages(PromiseType replyPromise, loop { poppedVer =
poppedVersion(logData, reqTag); + auto tagData = logData->getTagData(reqTag); + bool tagRecovered = tagData && !tagData->unpoppedRecovered; if (SERVER_KNOBS->ENABLE_VERSION_VECTOR && poppedVer <= reqBegin && reqBegin > logData->persistentDataDurableVersion && !reqOnlySpilled && reqTag.locality >= 0 && - !reqReturnIfBlocked) { + !reqReturnIfBlocked && tagRecovered) { state double startTime = now(); // TODO (version vector) check if this should be included in "status details" json // TODO (version vector) all tags may be too many, instead, standard deviation? @@ -1830,12 +1832,11 @@ Future tLogPeekMessages(PromiseType replyPromise, poppedVer = poppedVersion(logData, reqTag); } - DisabledTraceEvent("TLogPeekMessages1", self->dbgid) + DebugLogTraceEvent("TLogPeekMessages2", self->dbgid) .detail("LogId", logData->logId) .detail("Tag", reqTag.toString()) .detail("ReqBegin", reqBegin) .detail("PoppedVer", poppedVer); - if (poppedVer > reqBegin) { TLogPeekReply rep; rep.maxKnownVersion = logData->version.get(); @@ -1856,7 +1857,7 @@ Future tLogPeekMessages(PromiseType replyPromise, } if (sequenceData.isSet()) { if (sequenceData.getFuture().get().first != rep.end) { - TEST(true); // tlog peek second attempt ended at a different version + CODE_PROBE(true, "tlog peek second attempt ended at a different version"); replyPromise.sendError(operation_obsolete()); return Void(); }