From 3c397fe281b405c2f3d297e9519b61ed23195589 Mon Sep 17 00:00:00 2001 From: "A.J. Beamon" Date: Thu, 4 Jun 2020 15:48:06 -0700 Subject: [PATCH 01/30] When responding to OpenDatabaseCoordRequest and ElectionResultRequest immediately, don't start monitorLeaderForProxies, which will keep a process alive as a coordinator. --- fdbserver/Coordination.actor.cpp | 29 ++++++++++++++--------------- 1 file changed, 14 insertions(+), 15 deletions(-) diff --git a/fdbserver/Coordination.actor.cpp b/fdbserver/Coordination.actor.cpp index 6d8c16b897..541ba965a2 100644 --- a/fdbserver/Coordination.actor.cpp +++ b/fdbserver/Coordination.actor.cpp @@ -215,10 +215,6 @@ TEST_CASE("/fdbserver/Coordination/localGenerationReg/simple") { } ACTOR Future openDatabase(ClientData* db, int* clientCount, Reference> hasConnectedClients, OpenDatabaseCoordRequest req) { - if(db->clientInfo->get().read().id != req.knownClientInfoID && !db->clientInfo->get().read().forward.present()) { - req.reply.send( db->clientInfo->get() ); - return Void(); - } ++(*clientCount); hasConnectedClients->set(true); @@ -247,11 +243,6 @@ ACTOR Future openDatabase(ClientData* db, int* clientCount, Reference remoteMonitorLeader( int* clientCount, Reference> hasConnectedClients, Reference>> currentElectedLeader, ElectionResultRequest req ) { - if (currentElectedLeader->get().present() && req.knownLeader != currentElectedLeader->get().get().changeID) { - req.reply.send( currentElectedLeader->get() ); - return Void(); - } - ++(*clientCount); hasConnectedClients->set(true); @@ -293,16 +284,24 @@ ACTOR Future leaderRegister(LeaderElectionRegInterface interf, Key key) { loop choose { when ( OpenDatabaseCoordRequest req = waitNext( interf.openDatabase.getFuture() ) ) { - if(!leaderMon.isValid()) { - leaderMon = monitorLeaderForProxies(req.clusterKey, req.coordinators, &clientData, currentElectedLeader); + if (clientData.clientInfo->get().read().id != req.knownClientInfoID && !clientData.clientInfo->get().read().forward.present()) { + req.reply.send(clientData.clientInfo->get()); + } else { + if(!leaderMon.isValid()) { + leaderMon = monitorLeaderForProxies(req.clusterKey, req.coordinators, &clientData, currentElectedLeader); + } + actors.add(openDatabase(&clientData, &clientCount, hasConnectedClients, req)); } - actors.add(openDatabase(&clientData, &clientCount, hasConnectedClients, req)); } when ( ElectionResultRequest req = waitNext( interf.electionResult.getFuture() ) ) { - if(!leaderMon.isValid()) { - leaderMon = monitorLeaderForProxies(req.key, req.coordinators, &clientData, currentElectedLeader); + if (currentElectedLeader->get().present() && req.knownLeader != currentElectedLeader->get().get().changeID) { + req.reply.send(currentElectedLeader->get()); + } else { + if(!leaderMon.isValid()) { + leaderMon = monitorLeaderForProxies(req.key, req.coordinators, &clientData, currentElectedLeader); + } + actors.add(remoteMonitorLeader(&clientCount, hasConnectedClients, currentElectedLeader, req)); } - actors.add( remoteMonitorLeader( &clientCount, hasConnectedClients, currentElectedLeader, req ) ); } when ( GetLeaderRequest req = waitNext( interf.getLeader.getFuture() ) ) { if (currentNominee.present() && currentNominee.get().changeID != req.knownLeader) { From 2f85ee360a8273020824c7f8b6c9aabefe737cb7 Mon Sep 17 00:00:00 2001 From: "A.J. 
Beamon" Date: Thu, 4 Jun 2020 17:18:25 -0700 Subject: [PATCH 02/30] Watches could return future_version errors unnecessarily --- fdbserver/storageserver.actor.cpp | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/fdbserver/storageserver.actor.cpp b/fdbserver/storageserver.actor.cpp index 57691bdbb5..7ebedb11f0 100644 --- a/fdbserver/storageserver.actor.cpp +++ b/fdbserver/storageserver.actor.cpp @@ -923,16 +923,18 @@ ACTOR Future watchValue_impl( StorageServer* data, WatchValueRequest req ) if( req.debugID.present() ) g_traceBatch.addEvent("WatchValueDebug", req.debugID.get().first(), "watchValueQ.AfterVersion"); //.detail("TaskID", g_network->getCurrentTask()); + state Version minVersion = data->data().latestVersion; loop { try { - state Version latest = data->data().latestVersion; state Future watchFuture = data->watches.onChange(req.key); + state Version latest = data->version.get(); GetValueRequest getReq( req.key, latest, req.debugID ); state Future getValue = getValueQ( data, getReq ); //we are relying on the delay zero at the top of getValueQ, if removed we need one here GetValueReply reply = wait( getReq.reply.getFuture() ); //TraceEvent("WatcherCheckValue").detail("Key", req.key ).detail("Value", req.value ).detail("CurrentValue", v ).detail("Ver", latest); if(reply.error.present()) { + ASSERT(reply.error.get().code() != error_code_future_version); throw reply.error.get(); } @@ -955,7 +957,13 @@ ACTOR Future watchValue_impl( StorageServer* data, WatchValueRequest req ) ++data->numWatches; data->watchBytes += ( req.key.expectedSize() + req.value.expectedSize() + 1000 ); try { - wait( watchFuture ); + if(latest < minVersion) { + // If the version we read is less than minVersion, then we may fail to be notified of any changes that occur up to or including minVersion + // To prevent that, we'll check the key again once the version reaches our minVersion + watchFuture = watchFuture || data->version.whenAtLeast(minVersion); + } + wait(watchFuture); + wait(data->version.whenAtLeast(data->data().latestVersion)); --data->numWatches; data->watchBytes -= ( req.key.expectedSize() + req.value.expectedSize() + 1000 ); } catch( Error &e ) { From c00e6e7ad98ae0c3d8f9a302eacac4915d5a7e16 Mon Sep 17 00:00:00 2001 From: "A.J. Beamon" Date: Fri, 5 Jun 2020 11:24:47 -0700 Subject: [PATCH 03/30] Reorder call to setting up watch future with waiting for data->version to advance to avoid missing potential mutations. Also add tests for and fix the case where reading the value throws a transaction_too_old error. 
--- fdbserver/storageserver.actor.cpp | 19 ++++++++++++++++--- 1 file changed, 16 insertions(+), 3 deletions(-) diff --git a/fdbserver/storageserver.actor.cpp b/fdbserver/storageserver.actor.cpp index 7ebedb11f0..ee357df8c9 100644 --- a/fdbserver/storageserver.actor.cpp +++ b/fdbserver/storageserver.actor.cpp @@ -924,10 +924,11 @@ ACTOR Future watchValue_impl( StorageServer* data, WatchValueRequest req ) g_traceBatch.addEvent("WatchValueDebug", req.debugID.get().first(), "watchValueQ.AfterVersion"); //.detail("TaskID", g_network->getCurrentTask()); state Version minVersion = data->data().latestVersion; + state Future watchFuture = data->watches.onChange(req.key); loop { try { - state Future watchFuture = data->watches.onChange(req.key); state Version latest = data->version.get(); + TEST(latest >= minVersion && latest < data->data().latestVersion); // Starting watch loop with latestVersion > data->version GetValueRequest getReq( req.key, latest, req.debugID ); state Future getValue = getValueQ( data, getReq ); //we are relying on the delay zero at the top of getValueQ, if removed we need one here GetValueReply reply = wait( getReq.reply.getFuture() ); @@ -937,6 +938,9 @@ ACTOR Future watchValue_impl( StorageServer* data, WatchValueRequest req ) ASSERT(reply.error.get().code() != error_code_future_version); throw reply.error.get(); } + if(BUGGIFY) { + throw transaction_too_old(); + } debugMutation("ShardWatchValue", latest, MutationRef(MutationRef::DebugKey, req.key, reply.value.present() ? StringRef( reply.value.get() ) : LiteralStringRef("") ) ); @@ -962,8 +966,11 @@ ACTOR Future watchValue_impl( StorageServer* data, WatchValueRequest req ) // To prevent that, we'll check the key again once the version reaches our minVersion watchFuture = watchFuture || data->version.whenAtLeast(minVersion); } + if(BUGGIFY) { + // Simulate a trigger on the watch that results in the loop going around without the value changing + watchFuture = watchFuture || delay(deterministicRandom()->random01()); + } wait(watchFuture); - wait(data->version.whenAtLeast(data->data().latestVersion)); --data->numWatches; data->watchBytes -= ( req.key.expectedSize() + req.value.expectedSize() + 1000 ); } catch( Error &e ) { @@ -972,9 +979,15 @@ ACTOR Future watchValue_impl( StorageServer* data, WatchValueRequest req ) throw; } } catch( Error &e ) { - if( e.code() != error_code_transaction_too_old ) + if( e.code() != error_code_transaction_too_old ) { throw; + } + + TEST(true); // Reading a watched key failed with transaction_too_old } + + watchFuture = data->watches.onChange(req.key); + wait(data->version.whenAtLeast(data->data().latestVersion)); } } catch (Error& e) { if(!canReplyWith(e)) From e10704fd76138c6519a4bf743d7698b43f9eb9df Mon Sep 17 00:00:00 2001 From: "A.J. 
Beamon" Date: Tue, 9 Jun 2020 14:56:21 -0700 Subject: [PATCH 04/30] Cherry-pick region related status changes from 6.3 --- fdbcli/fdbcli.actor.cpp | 71 ++++++++++++++++++++++++++++++++++++++ fdbclient/Schemas.cpp | 1 + fdbserver/Status.actor.cpp | 40 +++++++++++++++++++-- 3 files changed, 110 insertions(+), 2 deletions(-) diff --git a/fdbcli/fdbcli.actor.cpp b/fdbcli/fdbcli.actor.cpp index 8e2ad1319d..5d685c1400 100644 --- a/fdbcli/fdbcli.actor.cpp +++ b/fdbcli/fdbcli.actor.cpp @@ -945,7 +945,11 @@ void printStatus(StatusObjectReader statusObj, StatusClient::StatusLevel level, StatusObjectReader statusObjConfig; StatusArray excludedServersArr; + Optional activePrimaryDC; + if (statusObjCluster.has("active_primary_dc")) { + activePrimaryDC = statusObjCluster["active_primary_dc"].get_str(); + } if (statusObjCluster.get("configuration", statusObjConfig)) { if (statusObjConfig.has("excluded_servers")) excludedServersArr = statusObjConfig.last().get_array(); @@ -1001,6 +1005,73 @@ void printStatus(StatusObjectReader statusObj, StatusClient::StatusLevel level, if (statusObjConfig.get("log_routers", intVal)) outputString += format("\n Desired Log Routers - %d", intVal); + + outputString += "\n Usable Regions - "; + if (statusObjConfig.get("usable_regions", intVal)) { + outputString += std::to_string(intVal); + } else { + outputString += "unknown"; + } + + StatusArray regions; + if (statusObjConfig.has("regions")) { + outputString += "\n Regions: "; + regions = statusObjConfig["regions"].get_array(); + bool isPrimary = false; + std::vector regionSatelliteDCs; + std::string regionDC; + for (StatusObjectReader region : regions) { + for (StatusObjectReader dc : region["datacenters"].get_array()) { + if (!dc.has("satellite")) { + regionDC = dc["id"].get_str(); + if (activePrimaryDC.present() && dc["id"].get_str() == activePrimaryDC.get()) { + isPrimary = true; + } + } else if (dc["satellite"].get_int() == 1) { + regionSatelliteDCs.push_back(dc["id"].get_str()); + } + } + if (activePrimaryDC.present()) { + if (isPrimary) { + outputString += "\n Primary -"; + } else { + outputString += "\n Remote -"; + } + } else { + outputString += "\n Region -"; + } + outputString += format("\n Datacenter - %s", regionDC.c_str()); + if (regionSatelliteDCs.size() > 0) { + outputString += "\n Satellite datacenters - "; + for (int i = 0; i < regionSatelliteDCs.size(); i++) { + if (i != regionSatelliteDCs.size() - 1) { + outputString += format("%s, ", regionSatelliteDCs[i].c_str()); + } else { + outputString += format("%s", regionSatelliteDCs[i].c_str()); + } + } + } + isPrimary = false; + if (region.get("satellite_redundancy_mode", strVal)) { + outputString += format("\n Satellite Redundancy Mode - %s", strVal.c_str()); + } + if (region.get("satellite_anti_quorum", intVal)) { + outputString += format("\n Satellite Anti Quorum - %d", intVal); + } + if (region.get("satellite_logs", intVal)) { + outputString += format("\n Satellite Logs - %d", intVal); + } + if (region.get("satellite_log_policy", strVal)) { + outputString += format("\n Satellite Log Policy - %s", strVal.c_str()); + } + if (region.get("satellite_log_replicas", intVal)) { + outputString += format("\n Satellite Log Replicas - %d", intVal); + } + if (region.get("satellite_usable_dcs", intVal)) { + outputString += format("\n Satellite Usable DCs - %d", intVal); + } + } + } } catch (std::runtime_error& ) { outputString = outputStringCache; diff --git a/fdbclient/Schemas.cpp b/fdbclient/Schemas.cpp index c8c26d158f..b9c8c34d0c 100644 --- a/fdbclient/Schemas.cpp 
+++ b/fdbclient/Schemas.cpp @@ -520,6 +520,7 @@ const KeyRef JSONSchemas::statusSchema = LiteralStringRef(R"statusSchema( "data_distribution_disabled_for_ss_failures":true, "data_distribution_disabled_for_rebalance":true, "data_distribution_disabled":true, + "active_primary_dc":"pv", "configuration":{ "log_anti_quorum":0, "log_replicas":2, diff --git a/fdbserver/Status.actor.cpp b/fdbserver/Status.actor.cpp index 25cd246693..72ade55b7f 100644 --- a/fdbserver/Status.actor.cpp +++ b/fdbserver/Status.actor.cpp @@ -2145,6 +2145,35 @@ ACTOR Future lockedStatusFetcher(Reference> getActivePrimaryDC(Database cx, JsonBuilderArray* messages) { + state ReadYourWritesTransaction tr(cx); + + state Future readTimeout = delay(5); // so that we won't loop forever + loop { + try { + if (readTimeout.isReady()) { + throw timed_out(); + } + tr.setOption(FDBTransactionOptions::READ_SYSTEM_KEYS); + tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE); + Optional res = wait(timeoutError(tr.get(primaryDatacenterKey), 5)); + if (!res.present()) { + messages->push_back( + JsonString::makeMessage("primary_dc_missing", "Unable to determine primary datacenter.")); + } + return res; + } catch (Error& e) { + if (e.code() == error_code_timed_out) { + messages->push_back( + JsonString::makeMessage("fetch_primary_dc_timedout", "Fetching primary DC timed out.")); + return Optional(); + } else { + wait(tr.onError(e)); + } + } + } +} + // constructs the cluster section of the json status output ACTOR Future clusterGetStatus( Reference>> db, @@ -2323,6 +2352,7 @@ ACTOR Future clusterGetStatus( state Future>>> proxyFuture = errorOr(getProxiesAndMetrics(db, address_workers)); state int minReplicasRemaining = -1; + state Future> primaryDCFO = getActivePrimaryDC(cx, &messages); std::vector> futures2; futures2.push_back(dataStatusFetcher(ddWorker, configuration.get(), &minReplicasRemaining)); futures2.push_back(workloadStatusFetcher(db, workers, mWorker, rkWorker, &qos, &data_overlay, &status_incomplete_reasons, storageServerFuture)); @@ -2341,11 +2371,17 @@ ACTOR Future clusterGetStatus( statusObj["fault_tolerance"] = faultToleranceStatusFetcher(configuration.get(), coordinators, workers, extraTlogEligibleZones, minReplicasRemaining, loadResult.present() && loadResult.get().healthyZone.present()); } - JsonBuilderObject configObj = configurationFetcher(configuration, coordinators, &status_incomplete_reasons); + state JsonBuilderObject configObj = + configurationFetcher(configuration, coordinators, &status_incomplete_reasons); + wait(success(primaryDCFO)); + if (primaryDCFO.get().present()) { + statusObj["active_primary_dc"] = primaryDCFO.get().get(); + } // configArr could be empty - if (!configObj.empty()) + if (!configObj.empty()) { statusObj["configuration"] = configObj; + } // workloadStatusFetcher returns the workload section but also optionally writes the qos section and adds to the data_overlay object if (!workerStatuses[1].empty()) From 9bc7eaf55a807b205eea169255d2d1c68fb579c7 Mon Sep 17 00:00:00 2001 From: "A.J. 
Beamon" Date: Tue, 9 Jun 2020 14:57:14 -0700 Subject: [PATCH 05/30] Add missing status field to documentation --- documentation/sphinx/source/mr-status-json-schemas.rst.inc | 1 + 1 file changed, 1 insertion(+) diff --git a/documentation/sphinx/source/mr-status-json-schemas.rst.inc b/documentation/sphinx/source/mr-status-json-schemas.rst.inc index d66ed516bb..479ecfab47 100644 --- a/documentation/sphinx/source/mr-status-json-schemas.rst.inc +++ b/documentation/sphinx/source/mr-status-json-schemas.rst.inc @@ -494,6 +494,7 @@ "data_distribution_disabled_for_ss_failures":true, "data_distribution_disabled_for_rebalance":true, "data_distribution_disabled":true, + "active_primary_dc":"pv", "configuration":{ "log_anti_quorum":0, "log_replicas":2, From c3c1fd5a471d1d298e0e22446aa931442b5020e0 Mon Sep 17 00:00:00 2001 From: "A.J. Beamon" Date: Tue, 9 Jun 2020 16:13:36 -0700 Subject: [PATCH 06/30] Add a release note. --- documentation/sphinx/source/release-notes.rst | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/documentation/sphinx/source/release-notes.rst b/documentation/sphinx/source/release-notes.rst index 34ef860311..8c877a92b3 100644 --- a/documentation/sphinx/source/release-notes.rst +++ b/documentation/sphinx/source/release-notes.rst @@ -2,6 +2,14 @@ Release Notes ############# +6.2.23 +====== + +Status +------ + +* Added ``cluster.active_primary_dc`` that indicates which datacenter is serving as the primary datacenter in multi-region setups. `(PR #3320) `_ + 6.2.22 ====== From c5e2accdaa2119038c474dc18bdfef9d3e2cdefe Mon Sep 17 00:00:00 2001 From: "A.J. Beamon" Date: Wed, 10 Jun 2020 11:22:02 -0700 Subject: [PATCH 07/30] Add a missing release note to 6.2.21 --- documentation/sphinx/source/release-notes.rst | 1 + 1 file changed, 1 insertion(+) diff --git a/documentation/sphinx/source/release-notes.rst b/documentation/sphinx/source/release-notes.rst index 34ef860311..84ce055aef 100644 --- a/documentation/sphinx/source/release-notes.rst +++ b/documentation/sphinx/source/release-notes.rst @@ -21,6 +21,7 @@ Fixes * ``fdbrestore`` prefix options required exactly a single hyphen instead of the standard two. `(PR #3056) `_ * Commits could stall on a newly elected proxy because of inaccurate compute estimates. `(PR #3123) `_ * A transaction class process with a bad disk could be repeatedly recruited as a transaction log. `(PR #3268) `_ +* Fix a potential race condition that could lead to undefined behavior when connecting to a database using the multi-version client API. `(PR #3265) `_ Features -------- From 25f59e84331effd9fa4745a5373c35cbc0a62e1f Mon Sep 17 00:00:00 2001 From: "A.J. Beamon" Date: Wed, 10 Jun 2020 11:32:15 -0700 Subject: [PATCH 08/30] Add some more release notes for 6.3.1 --- documentation/sphinx/source/release-notes.rst | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/documentation/sphinx/source/release-notes.rst b/documentation/sphinx/source/release-notes.rst index 23daafe517..78f6ec17a9 100644 --- a/documentation/sphinx/source/release-notes.rst +++ b/documentation/sphinx/source/release-notes.rst @@ -97,6 +97,12 @@ Other Changes * The ``\xff\xff/worker_interfaces/`` keyspace now begins at a key which includes a trailing ``/`` (previously ``\xff\xff/worker_interfaces``). Range reads to this range now respect the end key passed into the range and include the keyspace prefix in the resulting keys. `(PR #3095) `_ * Added FreeBSD support. `(PR #2634) `_ * Updated boost to 1.72. 
`(PR #2684) `_ +* Calling ``fdb_run_network`` multiple times in a single run of a client program now returns an error instead of causing undefined behavior. [6.3.1] `(PR #3229) `_ + +Fixes from previous versions +---------------------------- + +* The 6.3.1 patch release includes all fixes from the patch releases 6.2.21 and 6.2.22. :doc:`(6.2 Release Notes) ` Fixes only impacting 6.3.0+ --------------------------- From a332dff2ecc42a50e599d4fd8018e68bc152b5f2 Mon Sep 17 00:00:00 2001 From: sfc-gh-tclinkenbeard Date: Wed, 10 Jun 2020 09:44:35 -0700 Subject: [PATCH 09/30] Fix class-memaccess warnings --- fdbclient/NativeAPI.actor.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/fdbclient/NativeAPI.actor.cpp b/fdbclient/NativeAPI.actor.cpp index 91bce2b236..8f88139fb3 100644 --- a/fdbclient/NativeAPI.actor.cpp +++ b/fdbclient/NativeAPI.actor.cpp @@ -2711,7 +2711,7 @@ TransactionOptions::TransactionOptions(Database const& cx) { } TransactionOptions::TransactionOptions() { - memset(this, 0, sizeof(*this)); + memset(static_cast(this), 0, sizeof(*this)); maxBackoff = CLIENT_KNOBS->DEFAULT_MAX_BACKOFF; sizeLimit = CLIENT_KNOBS->TRANSACTION_SIZE_LIMIT; tags = TagSet(); @@ -2720,7 +2720,7 @@ TransactionOptions::TransactionOptions() { } void TransactionOptions::reset(Database const& cx) { - memset(this, 0, sizeof(*this)); + memset(static_cast(this), 0, sizeof(*this)); maxBackoff = CLIENT_KNOBS->DEFAULT_MAX_BACKOFF; sizeLimit = CLIENT_KNOBS->TRANSACTION_SIZE_LIMIT; tags = TagSet(); From 49a1feaa76767059a8a72bf9d1f8b5f9daac3ecd Mon Sep 17 00:00:00 2001 From: "A.J. Beamon" Date: Wed, 10 Jun 2020 14:13:12 -0700 Subject: [PATCH 10/30] Fix unrelated bug in an ASSERT that breaks the windows compile and is already fixed in release-6.3. --- fdbserver/workloads/TriggerRecovery.actor.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fdbserver/workloads/TriggerRecovery.actor.cpp b/fdbserver/workloads/TriggerRecovery.actor.cpp index 31682caf6a..0b4e094849 100644 --- a/fdbserver/workloads/TriggerRecovery.actor.cpp +++ b/fdbserver/workloads/TriggerRecovery.actor.cpp @@ -19,7 +19,7 @@ struct TriggerRecoveryLoopWorkload : TestWorkload { numRecoveries = getOption(options, LiteralStringRef("numRecoveries"), deterministicRandom()->randomInt(1, 10)); delayBetweenRecoveries = getOption(options, LiteralStringRef("delayBetweenRecoveries"), 0.0); killAllProportion = getOption(options, LiteralStringRef("killAllProportion"), 0.1); - ASSERT(numRecoveries > 0 && startTime >= 0 and delayBetweenRecoveries >= 0); + ASSERT((numRecoveries > 0) && (startTime >= 0) && (delayBetweenRecoveries >= 0)); TraceEvent(SevInfo, "TriggerRecoveryLoopSetup") .detail("StartTime", startTime) .detail("NumRecoveries", numRecoveries) From 980bee1d1388e89342aa0f41d03310b13629d4ea Mon Sep 17 00:00:00 2001 From: Chaoguang Lin Date: Thu, 11 Jun 2020 12:22:19 -0700 Subject: [PATCH 11/30] Fix fetchShardMetricsList_impl and add read cache in special key space --- fdbclient/SpecialKeySpace.actor.cpp | 49 +++++-- fdbclient/SpecialKeySpace.actor.h | 46 ++++++- fdbserver/DataDistributionTracker.actor.cpp | 3 +- .../DataDistributionMetrics.actor.cpp | 129 ++++++++++++++---- tests/DataDistributionMetrics.txt | 35 ++--- 5 files changed, 203 insertions(+), 59 deletions(-) diff --git a/fdbclient/SpecialKeySpace.actor.cpp b/fdbclient/SpecialKeySpace.actor.cpp index b23b5b8d87..56f47f69b5 100644 --- a/fdbclient/SpecialKeySpace.actor.cpp +++ b/fdbclient/SpecialKeySpace.actor.cpp @@ -38,12 +38,13 @@ std::unordered_map 
SpecialKeySpace::moduleToB // orEqual == false && offset == 1 (Standard form) // If the corresponding key is not in the underlying key range, it will move over the range ACTOR Future moveKeySelectorOverRangeActor(const SpecialKeyRangeBaseImpl* skrImpl, ReadYourWritesTransaction* ryw, - KeySelector* ks) { + KeySelector* ks, Optional>* cache) { ASSERT(!ks->orEqual); // should be removed before calling ASSERT(ks->offset != 1); // never being called if KeySelector is already normalized state Key startKey(skrImpl->getKeyRange().begin); state Key endKey(skrImpl->getKeyRange().end); + state Standalone result; if (ks->offset < 1) { // less than the given key @@ -60,7 +61,15 @@ ACTOR Future moveKeySelectorOverRangeActor(const SpecialKeyRangeBaseImpl* .detail("SpecialKeyRangeStart", skrImpl->getKeyRange().begin) .detail("SpecialKeyRangeEnd", skrImpl->getKeyRange().end); - Standalone result = wait(skrImpl->getRange(ryw, KeyRangeRef(startKey, endKey))); + if (skrImpl->isAsync()) { + const SpecialKeyRangeAsyncImpl* ptr = dynamic_cast(skrImpl); + Standalone result_ = wait(ptr->getRange(ryw, KeyRangeRef(startKey, endKey), cache)); + result = result_; + } else { + Standalone result_ = wait(skrImpl->getRange(ryw, KeyRangeRef(startKey, endKey))); + result = result_; + } + if (result.size() == 0) { TraceEvent(SevDebug, "ZeroElementsIntheRange").detail("Start", startKey).detail("End", endKey); return Void(); @@ -107,7 +116,8 @@ void onModuleRead(ReadYourWritesTransaction* ryw, SpecialKeySpace::MODULE module // to maintain; Thus, separate each part to make the code easy to understand and more compact ACTOR Future normalizeKeySelectorActor(SpecialKeySpace* sks, ReadYourWritesTransaction* ryw, KeySelector* ks, Optional* lastModuleRead, int* actualOffset, - Standalone* result) { + Standalone* result, + Optional>* cache) { state RangeMap::Iterator iter = ks->offset < 1 ? sks->getImpls().rangeContainingKeyBefore(ks->getKey()) : sks->getImpls().rangeContaining(ks->getKey()); @@ -115,7 +125,7 @@ ACTOR Future normalizeKeySelectorActor(SpecialKeySpace* sks, ReadYourWrite (ks->offset > 1 && iter != sks->getImpls().ranges().end())) { onModuleRead(ryw, sks->getModules().rangeContaining(iter->begin())->value(), *lastModuleRead); if (iter->value() != nullptr) { - wait(moveKeySelectorOverRangeActor(iter->value(), ryw, ks)); + wait(moveKeySelectorOverRangeActor(iter->value(), ryw, ks, cache)); } ks->offset < 1 ? 
--iter : ++iter; } @@ -164,13 +174,16 @@ SpecialKeySpace::getRangeAggregationActor(SpecialKeySpace* sks, ReadYourWritesTr // This function handles ranges which cover more than one keyrange and aggregates all results // KeySelector, GetRangeLimits and reverse are all handled here state Standalone result; + state Standalone pairs; state RangeMap::Iterator iter; state int actualBeginOffset; state int actualEndOffset; state Optional lastModuleRead; + // used to cache result from potential first read + state Optional> cache; - wait(normalizeKeySelectorActor(sks, ryw, &begin, &lastModuleRead, &actualBeginOffset, &result)); - wait(normalizeKeySelectorActor(sks, ryw, &end, &lastModuleRead, &actualEndOffset, &result)); + wait(normalizeKeySelectorActor(sks, ryw, &begin, &lastModuleRead, &actualBeginOffset, &result, &cache)); + wait(normalizeKeySelectorActor(sks, ryw, &end, &lastModuleRead, &actualEndOffset, &result, &cache)); // Handle all corner cases like what RYW does // return if range inverted if (actualBeginOffset >= actualEndOffset && begin.getKey() >= end.getKey()) { @@ -195,7 +208,14 @@ SpecialKeySpace::getRangeAggregationActor(SpecialKeySpace* sks, ReadYourWritesTr KeyRangeRef kr = iter->range(); KeyRef keyStart = kr.contains(begin.getKey()) ? begin.getKey() : kr.begin; KeyRef keyEnd = kr.contains(end.getKey()) ? end.getKey() : kr.end; - Standalone pairs = wait(iter->value()->getRange(ryw, KeyRangeRef(keyStart, keyEnd))); + if (iter->value()->isAsync() && cache.present()) { + const SpecialKeyRangeAsyncImpl* ptr = dynamic_cast(iter->value()); + Standalone pairs_ = wait(ptr->getRange(ryw, KeyRangeRef(keyStart, keyEnd), &cache)); + pairs = pairs_; + } else { + Standalone pairs_ = wait(iter->value()->getRange(ryw, KeyRangeRef(keyStart, keyEnd))); + pairs = pairs_; + } result.arena().dependsOn(pairs.arena()); // limits handler for (int i = pairs.size() - 1; i >= 0; --i) { @@ -218,7 +238,14 @@ SpecialKeySpace::getRangeAggregationActor(SpecialKeySpace* sks, ReadYourWritesTr KeyRangeRef kr = iter->range(); KeyRef keyStart = kr.contains(begin.getKey()) ? begin.getKey() : kr.begin; KeyRef keyEnd = kr.contains(end.getKey()) ? 
end.getKey() : kr.end; - Standalone pairs = wait(iter->value()->getRange(ryw, KeyRangeRef(keyStart, keyEnd))); + if (iter->value()->isAsync() && cache.present()) { + const SpecialKeyRangeAsyncImpl* ptr = dynamic_cast(iter->value()); + Standalone pairs_ = wait(ptr->getRange(ryw, KeyRangeRef(keyStart, keyEnd), &cache)); + pairs = pairs_; + } else { + Standalone pairs_ = wait(iter->value()->getRange(ryw, KeyRangeRef(keyStart, keyEnd))); + pairs = pairs_; + } result.arena().dependsOn(pairs.arena()); // limits handler for (int i = 0; i < pairs.size(); ++i) { @@ -316,7 +343,7 @@ Future> ConflictingKeysImpl::getRange(ReadYourWritesT return result; } -ACTOR Future> ddStatsGetRangeActor(ReadYourWritesTransaction* ryw, KeyRangeRef kr) { +ACTOR Future> ddMetricsGetRangeActor(ReadYourWritesTransaction* ryw, KeyRangeRef kr) { try { auto keys = kr.removePrefix(ddStatsRange.begin); Standalone> resultWithoutPrefix = @@ -339,10 +366,10 @@ ACTOR Future> ddStatsGetRangeActor(ReadYourWritesTran } } -DDStatsRangeImpl::DDStatsRangeImpl(KeyRangeRef kr) : SpecialKeyRangeBaseImpl(kr) {} +DDStatsRangeImpl::DDStatsRangeImpl(KeyRangeRef kr) : SpecialKeyRangeAsyncImpl(kr) {} Future> DDStatsRangeImpl::getRange(ReadYourWritesTransaction* ryw, KeyRangeRef kr) const { - return ddStatsGetRangeActor(ryw, kr); + return ddMetricsGetRangeActor(ryw, kr); } class SpecialKeyRangeTestImpl : public SpecialKeyRangeBaseImpl { diff --git a/fdbclient/SpecialKeySpace.actor.h b/fdbclient/SpecialKeySpace.actor.h index a33ff666a4..4c6b127cbc 100644 --- a/fdbclient/SpecialKeySpace.actor.h +++ b/fdbclient/SpecialKeySpace.actor.h @@ -38,13 +38,55 @@ public: // Each derived class only needs to implement this simple version of getRange virtual Future> getRange(ReadYourWritesTransaction* ryw, KeyRangeRef kr) const = 0; - explicit SpecialKeyRangeBaseImpl(KeyRangeRef kr) : range(kr) {} + explicit SpecialKeyRangeBaseImpl(KeyRangeRef kr, bool async = false) : range(kr), async(async) {} KeyRangeRef getKeyRange() const { return range; } + bool isAsync() const { return async; } virtual ~SpecialKeyRangeBaseImpl() {} protected: KeyRange range; // underlying key range for this function + bool async; // true if the range read emits a rpc call, thus we cache the results to keep consistency in the same + // range read +}; + +class SpecialKeyRangeAsyncImpl : public SpecialKeyRangeBaseImpl { +public: + explicit SpecialKeyRangeAsyncImpl(KeyRangeRef kr) : SpecialKeyRangeBaseImpl(kr, true) {} + + Future> getRange(ReadYourWritesTransaction* ryw, KeyRangeRef kr) const = 0; + + // calling with a cache object to have consistent results if we need to call rpc + Future> getRange(ReadYourWritesTransaction* ryw, KeyRangeRef kr, + Optional>* cache) const { + return getRangeAsyncActor(this, ryw, kr, cache); + } + + ACTOR static Future> getRangeAsyncActor(const SpecialKeyRangeBaseImpl* skrAyncImpl, + ReadYourWritesTransaction* ryw, KeyRangeRef kr, + Optional>* cache) { + ASSERT(skrAyncImpl->getKeyRange().contains(kr)); + if (cache == nullptr) { + // a nullptr means we want to read directly and do not need to cache them + Standalone result = wait(skrAyncImpl->getRange(ryw, kr)); + return result; + } else if (!cache->present()) { + // For simplicity, every time we need to cache, we read the whole range + // TODO : improvements are needed if we have expensive rpc calls + Standalone result_ = wait(skrAyncImpl->getRange(ryw, skrAyncImpl->getKeyRange())); + *cache = result_; + } + const auto& allResults = cache->get(); + int start = 0, end = allResults.size(); + while (start < 
allResults.size() && allResults[start].key < kr.begin) ++start; + while (end > 0 && allResults[end - 1].key >= kr.end) --end; + if (start < end) { + Standalone result = RangeResultRef(allResults.slice(start, end), false); + result.arena().dependsOn(allResults.arena()); + return result; + } else + return Standalone(); + } }; class SpecialKeySpace { @@ -148,7 +190,7 @@ public: Future> getRange(ReadYourWritesTransaction* ryw, KeyRangeRef kr) const override; }; -class DDStatsRangeImpl : public SpecialKeyRangeBaseImpl { +class DDStatsRangeImpl : public SpecialKeyRangeAsyncImpl { public: explicit DDStatsRangeImpl(KeyRangeRef kr); Future> getRange(ReadYourWritesTransaction* ryw, KeyRangeRef kr) const override; diff --git a/fdbserver/DataDistributionTracker.actor.cpp b/fdbserver/DataDistributionTracker.actor.cpp index 01d2a34ab9..3d96ed4408 100644 --- a/fdbserver/DataDistributionTracker.actor.cpp +++ b/fdbserver/DataDistributionTracker.actor.cpp @@ -822,7 +822,8 @@ ACTOR Future fetchShardMetricsList_impl( DataDistributionTracker* self, Ge // list of metrics, regenerate on loop when full range unsuccessful Standalone> result; Future onChange; - for (auto t : self->shards.containedRanges(req.keys)) { + for (auto t = self->shards.containedRanges(req.keys).begin(); + t != self->shards.intersectingRanges(req.keys).end(); ++t) { auto &stats = t.value().stats; if( !stats->get().present() ) { onChange = stats->onChange(); diff --git a/fdbserver/workloads/DataDistributionMetrics.actor.cpp b/fdbserver/workloads/DataDistributionMetrics.actor.cpp index 96a0d37510..222b96ccff 100644 --- a/fdbserver/workloads/DataDistributionMetrics.actor.cpp +++ b/fdbserver/workloads/DataDistributionMetrics.actor.cpp @@ -26,52 +26,110 @@ struct DataDistributionMetricsWorkload : KVWorkload { - int numTransactions; - int writesPerTransaction; - int transactionsCommitted; - int numShards; + int numShards, readPerTx, writePerTx; int64_t avgBytes; + double testDuration; + std::string keyPrefix; + PerfIntCounter commits, errors; DataDistributionMetricsWorkload(WorkloadContext const& wcx) - : KVWorkload(wcx), transactionsCommitted(0), numShards(0), avgBytes(0) { - numTransactions = getOption(options, LiteralStringRef("numTransactions"), 100); - writesPerTransaction = getOption(options, LiteralStringRef("writesPerTransaction"), 1000); + : KVWorkload(wcx), numShards(0), avgBytes(0), commits("Commits"), errors("Errors") { + testDuration = getOption(options, LiteralStringRef("testDuration"), 10.0); + keyPrefix = getOption(options, LiteralStringRef("keyPrefix"), LiteralStringRef("DDMetrics")).toString(); + readPerTx = getOption(options, LiteralStringRef("readPerTransaction"), 1); + writePerTx = getOption(options, LiteralStringRef("writePerTransaction"), 5 * readPerTx); + ASSERT(nodeCount > 1); } static Value getRandomValue() { return Standalone(format("Value/%08d", deterministicRandom()->randomInt(0, 10e6))); } - ACTOR static Future _start(Database cx, DataDistributionMetricsWorkload* self) { - state int tNum; - for (tNum = 0; tNum < self->numTransactions; ++tNum) { - loop { - state ReadYourWritesTransaction tr(cx); - try { - state int i; - for (i = 0; i < self->writesPerTransaction; ++i) { - tr.set(StringRef(format("Key/%08d", tNum * self->writesPerTransaction + i)), getRandomValue()); + Key keyForIndex(int n) { return doubleToTestKey((double)n / nodeCount, keyPrefix); } + + ACTOR static Future ddRWClient(Database cx, DataDistributionMetricsWorkload* self) { + loop { + state ReadYourWritesTransaction tr(cx); + state int i; + try { + for 
(i = 0; i < self->readPerTx; ++i) + wait(success(tr.get(self->keyForIndex(deterministicRandom()->randomInt(0, self->nodeCount))))); // read + for (i = 0; i < self->writePerTx; ++i) + tr.set(self->keyForIndex(deterministicRandom()->randomInt(0, self->nodeCount)), getRandomValue()); // write + wait(tr.commit()); + ++self->commits; + } catch (Error& e) { + wait(tr.onError(e)); + } + tr.reset(); + } + } + + ACTOR Future resultConsistencyCheckClient(Database cx, DataDistributionMetricsWorkload* self) { + state Reference tr = + Reference(new ReadYourWritesTransaction(cx)); + loop { + try { + int startIndex = deterministicRandom()->randomInt(0, self->nodeCount - 1); + int endIndex = deterministicRandom()->randomInt(startIndex + 1, self->nodeCount); + state Key startKey = self->keyForIndex(startIndex); + state Key endKey = self->keyForIndex(endIndex); + // lastLessOrEqual + state KeySelector begin = KeySelectorRef(startKey.withPrefix(ddStatsRange.begin, startKey.arena()), true, 0); + state KeySelector end = KeySelectorRef(endKey.withPrefix(ddStatsRange.begin, endKey.arena()), false, 2); + Standalone result = wait(tr->getRange(begin, end, GetRangeLimits(CLIENT_KNOBS->SHARD_COUNT_LIMIT))); + if (result.size() > 1) { + if (result[0].key <= begin.getKey() && result[1].key > begin.getKey()) { + TraceEvent(SevDebug, "DDMetricsConsistencyTest") + .detail("Size", result.size()) + .detail("FirstKey", result[0].key.toString()) + .detail("SecondKey", result[1].key.toString()) + .detail("BeginKeySelector", begin.toString()); + } else { + ++self->errors; + TraceEvent(SevError, "TestFailure") + .detail("Reason", "Result mismatches the given begin selector") + .detail("Size", result.size()) + .detail("FirstKey", result[0].key.toString()) + .detail("SecondKey", result[1].key.toString()) + .detail("BeginKeySelector", begin.toString()); + } + if (result[result.size()-1].key >= end.getKey() && result[result.size()-2].key < end.getKey()) { + TraceEvent(SevDebug, "DDMetricsConsistencyTest") + .detail("Size", result.size()) + .detail("LastKey", result[result.size()-1].key.toString()) + .detail("SecondLastKey", result[result.size()-2].key.toString()) + .detail("EndKeySelector", end.toString()); + } else { + ++self->errors; + TraceEvent(SevError, "TestFailure") + .detail("Reason", "Result mismatches the given end selector") + .detail("Size", result.size()) + .detail("FirstKey", result[result.size()-1].key.toString()) + .detail("SecondKey", result[result.size()-2].key.toString()) + .detail("EndKeySelector", end.toString()); } - wait(tr.commit()); - ++self->transactionsCommitted; - break; - } catch (Error& e) { - wait(tr.onError(e)); } + } catch (Error& e) { + // Ignore timed_out error and cross_module_read, the end key may potentially point outside the range + if (e.code() == error_code_timed_out || e.code() == error_code_special_keys_cross_module_read) continue; + TraceEvent(SevDebug, "FailedToRetrieveDDMetrics").detail("Error", e.what()); + wait(tr->onError(e)); } } - return Void(); } ACTOR static Future _check(Database cx, DataDistributionMetricsWorkload* self) { - if (self->transactionsCommitted == 0) { - TraceEvent(SevError, "NoTransactionsCommitted"); + if (self->errors.getValue() > 0) { + TraceEvent(SevError, "TestFailure").detail("Reason", "GetRange Results Inconsistent"); return false; } + // TODO : find why this not work + // wait(quietDatabase(cx, self->dbInfo, "PopulateTPCC")); state Reference tr = Reference(new ReadYourWritesTransaction(cx)); try { - state Standalone result = wait(tr->getRange(ddStatsRange, 
100)); + state Standalone result = wait(tr->getRange(ddStatsRange, CLIENT_KNOBS->SHARD_COUNT_LIMIT)); ASSERT(!result.more); self->numShards = result.size(); if (self->numShards < 1) return false; @@ -81,19 +139,31 @@ struct DataDistributionMetricsWorkload : KVWorkload { totalBytes += readJSONStrictly(result[i].value.toString()).get_obj()["ShardBytes"].get_int64(); } self->avgBytes = totalBytes / self->numShards; - // fetch data-distribution stats for a smalller range + // fetch data-distribution stats for a smaller range + ASSERT(result.size()); state int idx = deterministicRandom()->randomInt(0, result.size()); Standalone res = wait(tr->getRange( - KeyRangeRef(result[idx].key, idx + 1 < result.size() ? result[idx + 1].key : ddStatsRange.end), 100)); + KeyRangeRef(result[idx].key, idx + 1 < result.size() ? result[idx + 1].key : ddStatsRange.end), + 100)); ASSERT_WE_THINK(res.size() == 1 && - res[0] == result[idx]); // It works good now. However, not sure in any case of data-distribution, the number changes + res[0] == result[idx]); // It works good now. However, not sure in any + // case of data-distribution, the number changes } catch (Error& e) { TraceEvent(SevError, "FailedToRetrieveDDMetrics").detail("Error", e.what()); - return false; + throw; } return true; } + ACTOR Future _start(Database cx, DataDistributionMetricsWorkload* self) { + std::vector> clients; + clients.push_back(self->resultConsistencyCheckClient(cx, self)); + for (int i = 0; i < self->actorCount; ++i) clients.push_back(self->ddRWClient(cx, self)); + wait(timeout(waitForAll(clients), self->testDuration, Void())); + wait(delay(5.0)); + return Void(); + } + virtual std::string description() { return "DataDistributionMetrics"; } virtual Future setup(Database const& cx) { return Void(); } virtual Future start(Database const& cx) { return _start(cx, this); } @@ -102,6 +172,7 @@ struct DataDistributionMetricsWorkload : KVWorkload { virtual void getMetrics(vector& m) { m.push_back(PerfMetric("NumShards", numShards, true)); m.push_back(PerfMetric("AvgBytes", avgBytes, true)); + m.push_back(commits.getMetric()); } }; diff --git a/tests/DataDistributionMetrics.txt b/tests/DataDistributionMetrics.txt index 77c83b0eb6..b9a98ab782 100644 --- a/tests/DataDistributionMetrics.txt +++ b/tests/DataDistributionMetrics.txt @@ -1,21 +1,24 @@ -testTitle=DataDistributionMetrics +testTitle=DataDistributionMetricsCorrectness + testName=DataDistributionMetrics + testDuration=10.0 + nodeCount=100000 + actorCount=64 + keyPrefix=DDMetrics + testName=Cycle transactionsPerSecond=2500.0 testDuration=10.0 expectedRate=0.025 - testName=DataDistributionMetrics - numTransactions=100 - writesPerTransaction=1000 - - testName=Attrition - machinesToKill=1 - machinesToLeave=3 - reboot=true - testDuration=10.0 - - testName=Attrition - machinesToKill=1 - machinesToLeave=3 - reboot=true - testDuration=10.0 \ No newline at end of file + testName=Mako + testDuration=10.0 + transactionsPerSecond=2500 + rows=100000 + sampleSize=100 + valueBytes=16 + keyBytes=16 + operations=u8i + actorCountPerClient=64 + populateData=true + runBenchmark=true + preserveData=false \ No newline at end of file From 5bc2e2e5956180fed3a1362c90293e29c31cbed6 Mon Sep 17 00:00:00 2001 From: Chaoguang Lin Date: Thu, 11 Jun 2020 21:31:05 -0700 Subject: [PATCH 12/30] update comments, make isAsync() virtual, remove unused code --- fdbclient/SpecialKeySpace.actor.h | 23 +++++------ fdbserver/DataDistributionTracker.actor.cpp | 5 ++- .../DataDistributionMetrics.actor.cpp | 39 +++++++++++-------- 3 
files changed, 38 insertions(+), 29 deletions(-) diff --git a/fdbclient/SpecialKeySpace.actor.h b/fdbclient/SpecialKeySpace.actor.h index 4c6b127cbc..dedee760cd 100644 --- a/fdbclient/SpecialKeySpace.actor.h +++ b/fdbclient/SpecialKeySpace.actor.h @@ -38,21 +38,22 @@ public: // Each derived class only needs to implement this simple version of getRange virtual Future> getRange(ReadYourWritesTransaction* ryw, KeyRangeRef kr) const = 0; - explicit SpecialKeyRangeBaseImpl(KeyRangeRef kr, bool async = false) : range(kr), async(async) {} + explicit SpecialKeyRangeBaseImpl(KeyRangeRef kr) : range(kr) {} KeyRangeRef getKeyRange() const { return range; } - bool isAsync() const { return async; } + // true if the getRange call can emit more than one rpc calls, + // we cache the results to keep consistency in the same getrange lifetime + // TODO : give this function a more descriptive name + virtual bool isAsync() const { return false; } virtual ~SpecialKeyRangeBaseImpl() {} protected: KeyRange range; // underlying key range for this function - bool async; // true if the range read emits a rpc call, thus we cache the results to keep consistency in the same - // range read }; class SpecialKeyRangeAsyncImpl : public SpecialKeyRangeBaseImpl { public: - explicit SpecialKeyRangeAsyncImpl(KeyRangeRef kr) : SpecialKeyRangeBaseImpl(kr, true) {} + explicit SpecialKeyRangeAsyncImpl(KeyRangeRef kr) : SpecialKeyRangeBaseImpl(kr) {} Future> getRange(ReadYourWritesTransaction* ryw, KeyRangeRef kr) const = 0; @@ -62,17 +63,17 @@ public: return getRangeAsyncActor(this, ryw, kr, cache); } + bool isAsync() const override { return true; } + ACTOR static Future> getRangeAsyncActor(const SpecialKeyRangeBaseImpl* skrAyncImpl, ReadYourWritesTransaction* ryw, KeyRangeRef kr, Optional>* cache) { ASSERT(skrAyncImpl->getKeyRange().contains(kr)); - if (cache == nullptr) { - // a nullptr means we want to read directly and do not need to cache them - Standalone result = wait(skrAyncImpl->getRange(ryw, kr)); - return result; - } else if (!cache->present()) { + ASSERT(cache != nullptr); + if (!cache->present()) { // For simplicity, every time we need to cache, we read the whole range - // TODO : improvements are needed if we have expensive rpc calls + // Although sometimes the range can be narrowed, + // there is not a general way to do it in complicated scenarios Standalone result_ = wait(skrAyncImpl->getRange(ryw, skrAyncImpl->getKeyRange())); *cache = result_; } diff --git a/fdbserver/DataDistributionTracker.actor.cpp b/fdbserver/DataDistributionTracker.actor.cpp index 3d96ed4408..5e8574e594 100644 --- a/fdbserver/DataDistributionTracker.actor.cpp +++ b/fdbserver/DataDistributionTracker.actor.cpp @@ -822,8 +822,9 @@ ACTOR Future fetchShardMetricsList_impl( DataDistributionTracker* self, Ge // list of metrics, regenerate on loop when full range unsuccessful Standalone> result; Future onChange; - for (auto t = self->shards.containedRanges(req.keys).begin(); - t != self->shards.intersectingRanges(req.keys).end(); ++t) { + auto beginIter = self->shards.containedRanges(req.keys).begin(); + auto endIter = self->shards.intersectingRanges(req.keys).end(); + for (auto t = beginIter; t != endIter; ++t) { auto &stats = t.value().stats; if( !stats->get().present() ) { onChange = stats->onChange(); diff --git a/fdbserver/workloads/DataDistributionMetrics.actor.cpp b/fdbserver/workloads/DataDistributionMetrics.actor.cpp index 222b96ccff..782f8c282e 100644 --- a/fdbserver/workloads/DataDistributionMetrics.actor.cpp +++ 
b/fdbserver/workloads/DataDistributionMetrics.actor.cpp @@ -53,9 +53,11 @@ struct DataDistributionMetricsWorkload : KVWorkload { state int i; try { for (i = 0; i < self->readPerTx; ++i) - wait(success(tr.get(self->keyForIndex(deterministicRandom()->randomInt(0, self->nodeCount))))); // read + wait(success( + tr.get(self->keyForIndex(deterministicRandom()->randomInt(0, self->nodeCount))))); // read for (i = 0; i < self->writePerTx; ++i) - tr.set(self->keyForIndex(deterministicRandom()->randomInt(0, self->nodeCount)), getRandomValue()); // write + tr.set(self->keyForIndex(deterministicRandom()->randomInt(0, self->nodeCount)), + getRandomValue()); // write wait(tr.commit()); ++self->commits; } catch (Error& e) { @@ -74,10 +76,17 @@ struct DataDistributionMetricsWorkload : KVWorkload { int endIndex = deterministicRandom()->randomInt(startIndex + 1, self->nodeCount); state Key startKey = self->keyForIndex(startIndex); state Key endKey = self->keyForIndex(endIndex); - // lastLessOrEqual - state KeySelector begin = KeySelectorRef(startKey.withPrefix(ddStatsRange.begin, startKey.arena()), true, 0); + // Find the last key <= startKey and use as the begin of the range. Since "Key()" is always the starting point, this key selector will never do cross_module_range_read. + // In addition, the first key in the result will be the last one <= startKey (Condition #1) + state KeySelector begin = + KeySelectorRef(startKey.withPrefix(ddStatsRange.begin, startKey.arena()), true, 0); + // Find the last key less than endKey, move forward 2 keys, and use this key as the (exclusive) end of + // the range. If we didn't read through the end of the range, then the second last key + // in the result will be the last key less than endKey. (Condition #2) state KeySelector end = KeySelectorRef(endKey.withPrefix(ddStatsRange.begin, endKey.arena()), false, 2); - Standalone result = wait(tr->getRange(begin, end, GetRangeLimits(CLIENT_KNOBS->SHARD_COUNT_LIMIT))); + Standalone result = + wait(tr->getRange(begin, end, GetRangeLimits(CLIENT_KNOBS->SHARD_COUNT_LIMIT))); + // Condition #1 and #2 can be broken if multiple rpc calls happened in one getRange if (result.size() > 1) { if (result[0].key <= begin.getKey() && result[1].key > begin.getKey()) { TraceEvent(SevDebug, "DDMetricsConsistencyTest") @@ -94,24 +103,24 @@ struct DataDistributionMetricsWorkload : KVWorkload { .detail("SecondKey", result[1].key.toString()) .detail("BeginKeySelector", begin.toString()); } - if (result[result.size()-1].key >= end.getKey() && result[result.size()-2].key < end.getKey()) { + if (result[result.size() - 1].key >= end.getKey() && result[result.size() - 2].key < end.getKey()) { TraceEvent(SevDebug, "DDMetricsConsistencyTest") .detail("Size", result.size()) - .detail("LastKey", result[result.size()-1].key.toString()) - .detail("SecondLastKey", result[result.size()-2].key.toString()) + .detail("LastKey", result[result.size() - 1].key.toString()) + .detail("SecondLastKey", result[result.size() - 2].key.toString()) .detail("EndKeySelector", end.toString()); } else { ++self->errors; TraceEvent(SevError, "TestFailure") .detail("Reason", "Result mismatches the given end selector") .detail("Size", result.size()) - .detail("FirstKey", result[result.size()-1].key.toString()) - .detail("SecondKey", result[result.size()-2].key.toString()) + .detail("FirstKey", result[result.size() - 1].key.toString()) + .detail("SecondKey", result[result.size() - 2].key.toString()) .detail("EndKeySelector", end.toString()); } } } catch (Error& e) { - // Ignore timed_out 
error and cross_module_read, the end key may potentially point outside the range + // Ignore timed_out error and cross_module_read, the end key selector may read through the end if (e.code() == error_code_timed_out || e.code() == error_code_special_keys_cross_module_read) continue; TraceEvent(SevDebug, "FailedToRetrieveDDMetrics").detail("Error", e.what()); wait(tr->onError(e)); @@ -143,11 +152,9 @@ struct DataDistributionMetricsWorkload : KVWorkload { ASSERT(result.size()); state int idx = deterministicRandom()->randomInt(0, result.size()); Standalone res = wait(tr->getRange( - KeyRangeRef(result[idx].key, idx + 1 < result.size() ? result[idx + 1].key : ddStatsRange.end), - 100)); - ASSERT_WE_THINK(res.size() == 1 && - res[0] == result[idx]); // It works good now. However, not sure in any - // case of data-distribution, the number changes + KeyRangeRef(result[idx].key, idx + 1 < result.size() ? result[idx + 1].key : ddStatsRange.end), 100)); + ASSERT_WE_THINK(res.size() == 1 && res[0] == result[idx]); // It works good now. However, not sure in any + // case of data-distribution, the number changes } catch (Error& e) { TraceEvent(SevError, "FailedToRetrieveDDMetrics").detail("Error", e.what()); throw; From 6f4d6f192331e5b65637f0ee77f49aefbeb4ac9b Mon Sep 17 00:00:00 2001 From: "A.J. Beamon" Date: Fri, 12 Jun 2020 14:40:10 -0700 Subject: [PATCH 13/30] Refreshing TLS files was done to an incorrect location that resulted in random memory being wiped out. Also fixed a typo that loaded the key bytes into the CA bytes in some cases. --- flow/TLSConfig.actor.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/flow/TLSConfig.actor.cpp b/flow/TLSConfig.actor.cpp index 0b33550104..49e557cbe8 100644 --- a/flow/TLSConfig.actor.cpp +++ b/flow/TLSConfig.actor.cpp @@ -287,7 +287,7 @@ ACTOR static Future readEntireFile( std::string filename, std::string* des throw file_too_large(); } destination->resize(filesize); - wait(success(file->read(&destination[0], filesize, 0))); + wait(success(file->read(&((*destination)[0]), filesize, 0))); return Void(); } @@ -313,7 +313,7 @@ ACTOR Future TLSConfig::loadAsync(const TLSConfig* self) { if (CAPath.size()) { reads.push_back( readEntireFile( CAPath, &loaded.tlsCABytes ) ); } else { - loaded.tlsCABytes = self->tlsKeyBytes; + loaded.tlsCABytes = self->tlsCABytes; } wait(waitForAll(reads)); From d0b6a4d2d51215f35d2674cd1004218a39a60d25 Mon Sep 17 00:00:00 2001 From: "A.J. Beamon" Date: Fri, 12 Jun 2020 15:02:30 -0700 Subject: [PATCH 14/30] Add release note. --- documentation/sphinx/source/release-notes.rst | 1 + 1 file changed, 1 insertion(+) diff --git a/documentation/sphinx/source/release-notes.rst b/documentation/sphinx/source/release-notes.rst index 23daafe517..887bbeb1a9 100644 --- a/documentation/sphinx/source/release-notes.rst +++ b/documentation/sphinx/source/release-notes.rst @@ -102,6 +102,7 @@ Fixes only impacting 6.3.0+ --------------------------- * Renamed ``MIN_DELAY_STORAGE_CANDIDACY_SECONDS`` knob to ``MIN_DELAY_CC_WORST_FIT_CANDIDACY_SECONDS``. [6.3.2] `(PR #3327) `_ +* Refreshing TLS certificates could cause crashes. 
[6.3.2] `(PR #3352) `_ Earlier release notes --------------------- From 3a914c71f2c1072b03396230dd228c2cffd0b763 Mon Sep 17 00:00:00 2001 From: sfc-gh-tclinkenbeard Date: Sat, 13 Jun 2020 17:14:43 -0700 Subject: [PATCH 15/30] Add TransactionOptions::clear function --- fdbclient/NativeAPI.actor.cpp | 29 +++++++++++++++++++---------- fdbclient/NativeAPI.actor.h | 3 +++ 2 files changed, 22 insertions(+), 10 deletions(-) diff --git a/fdbclient/NativeAPI.actor.cpp b/fdbclient/NativeAPI.actor.cpp index 8f88139fb3..9353565f20 100644 --- a/fdbclient/NativeAPI.actor.cpp +++ b/fdbclient/NativeAPI.actor.cpp @@ -2710,22 +2710,31 @@ TransactionOptions::TransactionOptions(Database const& cx) { } } -TransactionOptions::TransactionOptions() { - memset(static_cast(this), 0, sizeof(*this)); +void TransactionOptions::clear() { maxBackoff = CLIENT_KNOBS->DEFAULT_MAX_BACKOFF; + getReadVersionFlags = 0; sizeLimit = CLIENT_KNOBS->TRANSACTION_SIZE_LIMIT; - tags = TagSet(); - readTags = TagSet(); + maxTransactionLoggingFieldLength = 0; + checkWritesEnabled = false; + causalWriteRisky = false; + commitOnFirstProxy = false; + debugDump = false; + lockAware = false; + readOnly = false; + firstInBatch = false; + includePort = false; + reportConflictingKeys = false; + tags = TagSet{}; + readTags = TagSet{}; priority = TransactionPriority::DEFAULT; } +TransactionOptions::TransactionOptions() { + clear(); +} + void TransactionOptions::reset(Database const& cx) { - memset(static_cast(this), 0, sizeof(*this)); - maxBackoff = CLIENT_KNOBS->DEFAULT_MAX_BACKOFF; - sizeLimit = CLIENT_KNOBS->TRANSACTION_SIZE_LIMIT; - tags = TagSet(); - readTags = TagSet(); - priority = TransactionPriority::DEFAULT; + clear(); lockAware = cx->lockAware; if (cx->apiVersionAtLeast(630)) { includePort = true; diff --git a/fdbclient/NativeAPI.actor.h b/fdbclient/NativeAPI.actor.h index ac252345fc..c0d6966100 100644 --- a/fdbclient/NativeAPI.actor.h +++ b/fdbclient/NativeAPI.actor.h @@ -141,6 +141,9 @@ struct TransactionOptions { TransactionOptions(); void reset(Database const& cx); + +private: + void clear(); }; class ReadYourWritesTransaction; // workaround cyclic dependency From 7344a4fb678888a129403330bdf9a12b2b986da7 Mon Sep 17 00:00:00 2001 From: sfc-gh-tclinkenbeard Date: Sat, 13 Jun 2020 17:57:23 -0700 Subject: [PATCH 16/30] Suppress array-bounds warnings in mako.c --- bindings/c/CMakeLists.txt | 2 ++ 1 file changed, 2 insertions(+) diff --git a/bindings/c/CMakeLists.txt b/bindings/c/CMakeLists.txt index 0f4b30544a..eb4fc6391a 100644 --- a/bindings/c/CMakeLists.txt +++ b/bindings/c/CMakeLists.txt @@ -72,6 +72,8 @@ if(NOT WIN32) test/mako/utils.c test/mako/utils.h) + set_source_files_properties(test/mako/mako.c PROPERTIES COMPILE_FLAGS -Wno-array-bounds) + if(OPEN_FOR_IDE) add_library(fdb_c_performance_test OBJECT test/performance_test.c test/test.h) add_library(fdb_c_ryw_benchmark OBJECT test/ryw_benchmark.c test/test.h) From 22a48f8349739f745adc18136d83681910e2746c Mon Sep 17 00:00:00 2001 From: sfc-gh-tclinkenbeard Date: Sun, 14 Jun 2020 20:16:43 -0700 Subject: [PATCH 17/30] Ignore -Warray-bounds for only the necessary part of mako.c --- bindings/c/CMakeLists.txt | 2 -- bindings/c/test/mako/mako.c | 4 ++++ 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/bindings/c/CMakeLists.txt b/bindings/c/CMakeLists.txt index eb4fc6391a..0f4b30544a 100644 --- a/bindings/c/CMakeLists.txt +++ b/bindings/c/CMakeLists.txt @@ -72,8 +72,6 @@ if(NOT WIN32) test/mako/utils.c test/mako/utils.h) - set_source_files_properties(test/mako/mako.c 
PROPERTIES COMPILE_FLAGS -Wno-array-bounds) - if(OPEN_FOR_IDE) add_library(fdb_c_performance_test OBJECT test/performance_test.c test/test.h) add_library(fdb_c_ryw_benchmark OBJECT test/ryw_benchmark.c test/test.h) diff --git a/bindings/c/test/mako/mako.c b/bindings/c/test/mako/mako.c index 26db163691..8f88d2f197 100644 --- a/bindings/c/test/mako/mako.c +++ b/bindings/c/test/mako/mako.c @@ -1034,6 +1034,9 @@ int parse_transaction(mako_args_t* args, char* optarg) { op = 0; while (*ptr) { +// Clang gives false positive array bounds warning, which must be ignored: +#pragma clang diagnostic push +#pragma clang diagnostic ignored "-Warray-bounds" if (strncmp(ptr, "grv", 3) == 0) { op = OP_GETREADVERSION; ptr += 3; @@ -1080,6 +1083,7 @@ int parse_transaction(mako_args_t* args, char* optarg) { error = 1; break; } +#pragma clang diagnostic pop /* count */ num = 0; From c56d97cc9f49a5526bd5625e0ca16a8974964491 Mon Sep 17 00:00:00 2001 From: Evan Tschannen Date: Sun, 14 Jun 2020 22:26:06 -0700 Subject: [PATCH 18/30] randomize the coordinator a storage worker connects to --- fdbserver/worker.actor.cpp | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/fdbserver/worker.actor.cpp b/fdbserver/worker.actor.cpp index 6f456691a9..f0e2399795 100644 --- a/fdbserver/worker.actor.cpp +++ b/fdbserver/worker.actor.cpp @@ -1591,14 +1591,17 @@ ACTOR Future createAndLockProcessIdFile(std::string folder) { ACTOR Future monitorLeaderRemotelyOneGeneration( Reference connFile, Reference> result, MonitorLeaderInfo info ) { state ClusterConnectionString ccf = info.intermediateConnFile->getConnectionString(); + state vector addrs = ccf.coordinators(); state ElectionResultRequest request; - request.key = ccf.clusterKey(); - request.coordinators = ccf.coordinators(); state int index = 0; state int successIndex = 0; + request.key = ccf.clusterKey(); + request.coordinators = ccf.coordinators(); + + deterministicRandom()->randomShuffle(addrs); loop { - LeaderElectionRegInterface interf( request.coordinators[index] ); + LeaderElectionRegInterface interf( addrs[index] ); request.reply = ReplyPromise>(); ErrorOr> leader = wait( interf.electionResult.tryGetReply( request ) ); @@ -1634,7 +1637,7 @@ ACTOR Future monitorLeaderRemotelyOneGeneration( ReferenceCOORDINATOR_RECONNECTION_DELAY ) ); } From 56addb24a4e08c2bcb8410fed9b2fc6011d07a46 Mon Sep 17 00:00:00 2001 From: Evan Tschannen Date: Sun, 14 Jun 2020 23:16:04 -0700 Subject: [PATCH 19/30] updated documentation for 6.3.2 --- documentation/sphinx/source/downloads.rst | 24 +++++++++---------- documentation/sphinx/source/release-notes.rst | 1 + 2 files changed, 13 insertions(+), 12 deletions(-) diff --git a/documentation/sphinx/source/downloads.rst b/documentation/sphinx/source/downloads.rst index 6b81a98a82..0426bfc362 100644 --- a/documentation/sphinx/source/downloads.rst +++ b/documentation/sphinx/source/downloads.rst @@ -10,38 +10,38 @@ macOS The macOS installation package is supported on macOS 10.7+. It includes the client and (optionally) the server. -* `FoundationDB-6.3.1.pkg `_ +* `FoundationDB-6.3.2.pkg `_ Ubuntu ------ The Ubuntu packages are supported on 64-bit Ubuntu 12.04+, but beware of the Linux kernel bug in Ubuntu 12.x. 
-* `foundationdb-clients-6.3.1-1_amd64.deb `_ -* `foundationdb-server-6.3.1-1_amd64.deb `_ (depends on the clients package) +* `foundationdb-clients-6.3.2-1_amd64.deb `_ +* `foundationdb-server-6.3.2-1_amd64.deb `_ (depends on the clients package) RHEL/CentOS EL6 --------------- The RHEL/CentOS EL6 packages are supported on 64-bit RHEL/CentOS 6.x. -* `foundationdb-clients-6.3.1-1.el6.x86_64.rpm `_ -* `foundationdb-server-6.3.1-1.el6.x86_64.rpm `_ (depends on the clients package) +* `foundationdb-clients-6.3.2-1.el6.x86_64.rpm `_ +* `foundationdb-server-6.3.2-1.el6.x86_64.rpm `_ (depends on the clients package) RHEL/CentOS EL7 --------------- The RHEL/CentOS EL7 packages are supported on 64-bit RHEL/CentOS 7.x. -* `foundationdb-clients-6.3.1-1.el7.x86_64.rpm `_ -* `foundationdb-server-6.3.1-1.el7.x86_64.rpm `_ (depends on the clients package) +* `foundationdb-clients-6.3.2-1.el7.x86_64.rpm `_ +* `foundationdb-server-6.3.2-1.el7.x86_64.rpm `_ (depends on the clients package) Windows ------- The Windows installer is supported on 64-bit Windows XP and later. It includes the client and (optionally) the server. -* `foundationdb-6.3.1-x64.msi `_ +* `foundationdb-6.3.2-x64.msi `_ API Language Bindings ===================== @@ -58,18 +58,18 @@ On macOS and Windows, the FoundationDB Python API bindings are installed as part If you need to use the FoundationDB Python API from other Python installations or paths, use the Python package manager ``pip`` (``pip install foundationdb``) or download the Python package: -* `foundationdb-6.3.1.tar.gz `_ +* `foundationdb-6.3.2.tar.gz `_ Ruby 1.9.3/2.0.0+ ----------------- -* `fdb-6.3.1.gem `_ +* `fdb-6.3.2.gem `_ Java 8+ ------- -* `fdb-java-6.3.1.jar `_ -* `fdb-java-6.3.1-javadoc.jar `_ +* `fdb-java-6.3.2.jar `_ +* `fdb-java-6.3.2-javadoc.jar `_ Go 1.11+ -------- diff --git a/documentation/sphinx/source/release-notes.rst b/documentation/sphinx/source/release-notes.rst index 8f4f997cb0..5009d705c7 100644 --- a/documentation/sphinx/source/release-notes.rst +++ b/documentation/sphinx/source/release-notes.rst @@ -109,6 +109,7 @@ Fixes only impacting 6.3.0+ * Renamed ``MIN_DELAY_STORAGE_CANDIDACY_SECONDS`` knob to ``MIN_DELAY_CC_WORST_FIT_CANDIDACY_SECONDS``. [6.3.2] `(PR #3327) `_ * Refreshing TLS certificates could cause crashes. [6.3.2] `(PR #3352) `_ +* All storage class processes attempted to connect to the same coordinator. 
[6.3.2] `(PR #3361) `_ Earlier release notes --------------------- From acbfe2e4c9b342cfaa39636808c0ea44c4009daa Mon Sep 17 00:00:00 2001 From: Daniel Smith Date: Mon, 15 Jun 2020 12:45:36 -0400 Subject: [PATCH 20/30] Revert "Revert "Initial RocksDB"" --- .../source/mr-status-json-schemas.rst.inc | 1 + fdbclient/DatabaseConfiguration.cpp | 4 +- fdbclient/FDBTypes.h | 2 + fdbclient/ManagementAPI.actor.cpp | 3 + fdbclient/Schemas.cpp | 3 +- fdbserver/CMakeLists.txt | 1 + fdbserver/IKeyValueStore.h | 7 +- fdbserver/KeyValueStoreRocksDB.actor.cpp | 429 ++++++++++++++++++ fdbserver/worker.actor.cpp | 18 +- fdbserver/workloads/KVStoreTest.actor.cpp | 4 +- tests/CMakeLists.txt | 1 + tests/RocksDBTest.txt | 48 ++ 12 files changed, 512 insertions(+), 9 deletions(-) create mode 100644 fdbserver/KeyValueStoreRocksDB.actor.cpp create mode 100644 tests/RocksDBTest.txt diff --git a/documentation/sphinx/source/mr-status-json-schemas.rst.inc b/documentation/sphinx/source/mr-status-json-schemas.rst.inc index 31ccb629fc..874a9b8f25 100644 --- a/documentation/sphinx/source/mr-status-json-schemas.rst.inc +++ b/documentation/sphinx/source/mr-status-json-schemas.rst.inc @@ -575,6 +575,7 @@ "ssd-1", "ssd-2", "ssd-redwood-experimental", + "ssd-rocksdb-experimental", "memory" ]}, "coordinators_count":1, diff --git a/fdbclient/DatabaseConfiguration.cpp b/fdbclient/DatabaseConfiguration.cpp index d7b8468f25..3edd327b0a 100644 --- a/fdbclient/DatabaseConfiguration.cpp +++ b/fdbclient/DatabaseConfiguration.cpp @@ -268,6 +268,8 @@ StatusObject DatabaseConfiguration::toJSON(bool noPolicies) const { result["storage_engine"] = "ssd-2"; } else if( tLogDataStoreType == KeyValueStoreType::SSD_BTREE_V2 && storageServerStoreType == KeyValueStoreType::SSD_REDWOOD_V1 ) { result["storage_engine"] = "ssd-redwood-experimental"; + } else if (tLogDataStoreType == KeyValueStoreType::SSD_BTREE_V2 && storageServerStoreType == KeyValueStoreType::SSD_ROCKSDB_V1) { + result["storage_engine"] = "ssd-rocksdb-experimental"; } else if( tLogDataStoreType == KeyValueStoreType::MEMORY && storageServerStoreType == KeyValueStoreType::MEMORY ) { result["storage_engine"] = "memory-1"; } else if( tLogDataStoreType == KeyValueStoreType::SSD_BTREE_V2 && storageServerStoreType == KeyValueStoreType::MEMORY_RADIXTREE ) { @@ -498,7 +500,7 @@ bool DatabaseConfiguration::isExcludedServer( NetworkAddressList a ) const { return get( encodeExcludedServersKey( AddressExclusion(a.address.ip, a.address.port) ) ).present() || get( encodeExcludedServersKey( AddressExclusion(a.address.ip) ) ).present() || get( encodeFailedServersKey( AddressExclusion(a.address.ip, a.address.port) ) ).present() || - get( encodeFailedServersKey( AddressExclusion(a.address.ip) ) ).present() || + get( encodeFailedServersKey( AddressExclusion(a.address.ip) ) ).present() || ( a.secondaryAddress.present() && ( get( encodeExcludedServersKey( AddressExclusion(a.secondaryAddress.get().ip, a.secondaryAddress.get().port) ) ).present() || get( encodeExcludedServersKey( AddressExclusion(a.secondaryAddress.get().ip) ) ).present() || diff --git a/fdbclient/FDBTypes.h b/fdbclient/FDBTypes.h index c6aff2a804..117414923c 100644 --- a/fdbclient/FDBTypes.h +++ b/fdbclient/FDBTypes.h @@ -670,6 +670,7 @@ struct KeyValueStoreType { SSD_BTREE_V2, SSD_REDWOOD_V1, MEMORY_RADIXTREE, + SSD_ROCKSDB_V1, END }; @@ -689,6 +690,7 @@ struct KeyValueStoreType { case SSD_BTREE_V1: return "ssd-1"; case SSD_BTREE_V2: return "ssd-2"; case SSD_REDWOOD_V1: return "ssd-redwood-experimental"; + case SSD_ROCKSDB_V1: return 
"ssd-rocksdb-experimental"; case MEMORY: return "memory"; case MEMORY_RADIXTREE: return "memory-radixtree-beta"; default: return "unknown"; diff --git a/fdbclient/ManagementAPI.actor.cpp b/fdbclient/ManagementAPI.actor.cpp index e10edf1121..e683a21d56 100644 --- a/fdbclient/ManagementAPI.actor.cpp +++ b/fdbclient/ManagementAPI.actor.cpp @@ -101,6 +101,9 @@ std::map configForToken( std::string const& mode ) { } else if (mode == "ssd-redwood-experimental") { logType = KeyValueStoreType::SSD_BTREE_V2; storeType = KeyValueStoreType::SSD_REDWOOD_V1; + } else if (mode == "ssd-rocksdb-experimental") { + logType = KeyValueStoreType::SSD_BTREE_V2; + storeType = KeyValueStoreType::SSD_ROCKSDB_V1; } else if (mode == "memory" || mode == "memory-2") { logType = KeyValueStoreType::SSD_BTREE_V2; storeType= KeyValueStoreType::MEMORY; diff --git a/fdbclient/Schemas.cpp b/fdbclient/Schemas.cpp index 6111ed1114..ee372d3bb0 100644 --- a/fdbclient/Schemas.cpp +++ b/fdbclient/Schemas.cpp @@ -150,7 +150,7 @@ const KeyRef JSONSchemas::statusSchema = LiteralStringRef(R"statusSchema( "fractional_cost": 0.0, "estimated_cost":{ "hz": 0.0 - } + } } } ], @@ -612,6 +612,7 @@ const KeyRef JSONSchemas::statusSchema = LiteralStringRef(R"statusSchema( "ssd-1", "ssd-2", "ssd-redwood-experimental", + "ssd-rocksdb-experimental", "memory", "memory-1", "memory-2", diff --git a/fdbserver/CMakeLists.txt b/fdbserver/CMakeLists.txt index 5d13a1e7f0..97efb6bac3 100644 --- a/fdbserver/CMakeLists.txt +++ b/fdbserver/CMakeLists.txt @@ -29,6 +29,7 @@ set(FDBSERVER_SRCS IVersionedStore.h KeyValueStoreCompressTestData.actor.cpp KeyValueStoreMemory.actor.cpp + KeyValueStoreRocksDB.actor.cpp KeyValueStoreSQLite.actor.cpp Knobs.cpp Knobs.h diff --git a/fdbserver/IKeyValueStore.h b/fdbserver/IKeyValueStore.h index d82b5e8cf9..3c82a0b7a8 100644 --- a/fdbserver/IKeyValueStore.h +++ b/fdbserver/IKeyValueStore.h @@ -87,6 +87,7 @@ protected: extern IKeyValueStore* keyValueStoreSQLite( std::string const& filename, UID logID, KeyValueStoreType storeType, bool checkChecksums=false, bool checkIntegrity=false ); extern IKeyValueStore* keyValueStoreRedwoodV1( std::string const& filename, UID logID); +extern IKeyValueStore* keyValueStoreRocksDB(std::string const& path, UID logID, KeyValueStoreType storeType, bool checkChecksums=false, bool checkIntegrity=false); extern IKeyValueStore* keyValueStoreMemory(std::string const& basename, UID logID, int64_t memoryLimit, std::string ext = "fdq", KeyValueStoreType storeType = KeyValueStoreType::MEMORY); @@ -102,8 +103,10 @@ inline IKeyValueStore* openKVStore( KeyValueStoreType storeType, std::string con return keyValueStoreMemory( filename, logID, memoryLimit ); case KeyValueStoreType::SSD_REDWOOD_V1: return keyValueStoreRedwoodV1( filename, logID ); - case KeyValueStoreType::MEMORY_RADIXTREE: - return keyValueStoreMemory(filename, logID, memoryLimit, "fdr", KeyValueStoreType::MEMORY_RADIXTREE); // for radixTree type, set file ext to "fdr" + case KeyValueStoreType::SSD_ROCKSDB_V1: + return keyValueStoreRocksDB(filename, logID, storeType); + case KeyValueStoreType::MEMORY_RADIXTREE: + return keyValueStoreMemory(filename, logID, memoryLimit, "fdr", KeyValueStoreType::MEMORY_RADIXTREE); // for radixTree type, set file ext to "fdr" default: UNREACHABLE(); } diff --git a/fdbserver/KeyValueStoreRocksDB.actor.cpp b/fdbserver/KeyValueStoreRocksDB.actor.cpp new file mode 100644 index 0000000000..0daafa8628 --- /dev/null +++ b/fdbserver/KeyValueStoreRocksDB.actor.cpp @@ -0,0 +1,429 @@ +#ifdef SSD_ROCKSDB_EXPERIMENTAL + 
+#include +#include +#include "flow/flow.h" +#include "fdbrpc/AsyncFileCached.actor.h" +#include "fdbserver/CoroFlow.h" + +#endif // SSD_ROCKSDB_EXPERIMENTAL + +#include "fdbserver/IKeyValueStore.h" +#include "flow/actorcompiler.h" // has to be last include + +#ifdef SSD_ROCKSDB_EXPERIMENTAL + +namespace { + +class FlowLogger : public rocksdb::Logger, public FastAllocated { + UID id; + std::string loggerName; + size_t logSize = 0; +public: + explicit FlowLogger(UID id, const std::string& loggerName, const rocksdb::InfoLogLevel log_level = rocksdb::InfoLogLevel::INFO_LEVEL) + : rocksdb::Logger(log_level) + , id(id) + , loggerName(loggerName) {} + + rocksdb::Status Close() override { return rocksdb::Status::OK(); } + + void Logv(const char* fmtString, va_list ap) override { + Logv(rocksdb::InfoLogLevel::INFO_LEVEL, fmtString, ap); + } + + void Logv(const rocksdb::InfoLogLevel log_level, const char* fmtString, va_list ap) override { + Severity sev; + switch (log_level) { + case rocksdb::InfoLogLevel::DEBUG_LEVEL: + sev = SevDebug; + break; + case rocksdb::InfoLogLevel::INFO_LEVEL: + case rocksdb::InfoLogLevel::HEADER_LEVEL: + case rocksdb::InfoLogLevel::NUM_INFO_LOG_LEVELS: + sev = SevInfo; + break; + case rocksdb::InfoLogLevel::WARN_LEVEL: + sev = SevWarn; + break; + case rocksdb::InfoLogLevel::ERROR_LEVEL: + sev = SevWarnAlways; + break; + case rocksdb::InfoLogLevel::FATAL_LEVEL: + sev = SevError; + break; + } + std::string outStr; + auto sz = vsformat(outStr, fmtString, ap); + if (sz < 0) { + TraceEvent(SevError, "RocksDBLogFormatError", id) + .detail("Logger", loggerName) + .detail("FormatString", fmtString); + return; + } + logSize += sz; + TraceEvent(sev, "RocksDBLogMessage", id) + .detail("Msg", outStr); + } + + size_t GetLogFileSize() const override { + return logSize; + } +}; + +rocksdb::Slice toSlice(StringRef s) { + return rocksdb::Slice(reinterpret_cast(s.begin()), s.size()); +} + +StringRef toStringRef(rocksdb::Slice s) { + return StringRef(reinterpret_cast(s.data()), s.size()); +} + +rocksdb::Options getOptions(const std::string& path) { + rocksdb::Options options; + bool exists = directoryExists(path); + options.create_if_missing = !exists; + return options; +} + +rocksdb::ColumnFamilyOptions getCFOptions() { + return {}; +} + +struct RocksDBKeyValueStore : IKeyValueStore { + using DB = rocksdb::DB*; + using CF = rocksdb::ColumnFamilyHandle*; + + struct Writer : IThreadPoolReceiver { + DB& db; + UID id; + + explicit Writer(DB& db, UID id) : db(db), id(id) {} + + ~Writer() { + if (db) { + delete db; + } + } + + void init() override {} + + Error statusToError(const rocksdb::Status& s) { + if (s == rocksdb::Status::IOError()) { + return io_error(); + } else { + return unknown_error(); + } + } + + struct OpenAction : TypedAction { + std::string path; + ThreadReturnPromise done; + + double getTimeEstimate() { + return SERVER_KNOBS->COMMIT_TIME_ESTIMATE; + } + }; + void action(OpenAction& a) { + std::vector defaultCF = { rocksdb::ColumnFamilyDescriptor{ + "default", getCFOptions() } }; + std::vector handle; + auto status = rocksdb::DB::Open(getOptions(a.path), a.path, defaultCF, &handle, &db); + if (!status.ok()) { + TraceEvent(SevError, "RocksDBError").detail("Error", status.ToString()).detail("Method", "Open"); + a.done.sendError(statusToError(status)); + } else { + a.done.send(Void()); + } + } + + struct CommitAction : TypedAction { + std::unique_ptr batchToCommit; + ThreadReturnPromise done; + double getTimeEstimate() override { return SERVER_KNOBS->COMMIT_TIME_ESTIMATE; } + }; + 
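+	// Runs on the dedicated writer thread: it applies the batch that set()/clear() built on the
+	// main thread, and because options.sync is true below, the caller's promise is not fulfilled
+	// until the write is durable on disk.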
void action(CommitAction& a) { + rocksdb::WriteOptions options; + options.sync = true; + auto s = db->Write(options, a.batchToCommit.get()); + if (!s.ok()) { + TraceEvent(SevError, "RocksDBError").detail("Error", s.ToString()).detail("Method", "Commit"); + a.done.sendError(statusToError(s)); + } else { + a.done.send(Void()); + } + } + + struct CloseAction : TypedAction { + ThreadReturnPromise done; + double getTimeEstimate() override { return SERVER_KNOBS->COMMIT_TIME_ESTIMATE; } + }; + void action(CloseAction& a) { + auto s = db->Close(); + TraceEvent(SevError, "RocksDBError").detail("Error", s.ToString()).detail("Method", "Close"); + a.done.send(Void()); + } + }; + + struct Reader : IThreadPoolReceiver { + DB& db; + rocksdb::ReadOptions readOptions; + std::unique_ptr cursor = nullptr; + + explicit Reader(DB& db) + : db(db) + { + readOptions.total_order_seek = true; + } + + void init() override {} + + struct ReadValueAction : TypedAction { + Key key; + Optional debugID; + ThreadReturnPromise> result; + ReadValueAction(KeyRef key, Optional debugID) + : key(key), debugID(debugID) + {} + double getTimeEstimate() override { return SERVER_KNOBS->READ_VALUE_TIME_ESTIMATE; } + }; + void action(ReadValueAction& a) { + Optional traceBatch; + if (a.debugID.present()) { + traceBatch = { TraceBatch{} }; + traceBatch.get().addEvent("GetValueDebug", a.debugID.get().first(), "Reader.Before"); + } + rocksdb::PinnableSlice value; + auto s = db->Get(readOptions, db->DefaultColumnFamily(), toSlice(a.key), &value); + if (a.debugID.present()) { + traceBatch.get().addEvent("GetValueDebug", a.debugID.get().first(), "Reader.After"); + traceBatch.get().dump(); + } + if (s.ok()) { + a.result.send(Value(toStringRef(value))); + } else { + if (!s.IsNotFound()) { + TraceEvent(SevError, "RocksDBError").detail("Error", s.ToString()).detail("Method", "ReadValue"); + } + a.result.send(Optional()); + } + } + + struct ReadValuePrefixAction : TypedAction { + Key key; + int maxLength; + Optional debugID; + ThreadReturnPromise> result; + ReadValuePrefixAction(Key key, int maxLength, Optional debugID) : key(key), maxLength(maxLength), debugID(debugID) {}; + virtual double getTimeEstimate() { return SERVER_KNOBS->READ_VALUE_TIME_ESTIMATE; } + }; + void action(ReadValuePrefixAction& a) { + rocksdb::PinnableSlice value; + Optional traceBatch; + if (a.debugID.present()) { + traceBatch = { TraceBatch{} }; + traceBatch.get().addEvent("GetValuePrefixDebug", a.debugID.get().first(), + "Reader.Before"); //.detail("TaskID", g_network->getCurrentTask()); + } + auto s = db->Get(readOptions, db->DefaultColumnFamily(), toSlice(a.key), &value); + if (a.debugID.present()) { + traceBatch.get().addEvent("GetValuePrefixDebug", a.debugID.get().first(), + "Reader.After"); //.detail("TaskID", g_network->getCurrentTask()); + traceBatch.get().dump(); + } + if (s.ok()) { + a.result.send(Value(StringRef(reinterpret_cast(value.data()), + std::min(value.size(), size_t(a.maxLength))))); + } else { + TraceEvent(SevError, "RocksDBError").detail("Error", s.ToString()).detail("Method", "ReadValuePrefix"); + a.result.send(Optional()); + } + } + + struct ReadRangeAction : TypedAction, FastAllocated { + KeyRange keys; + int rowLimit, byteLimit; + ThreadReturnPromise> result; + ReadRangeAction(KeyRange keys, int rowLimit, int byteLimit) : keys(keys), rowLimit(rowLimit), byteLimit(byteLimit) {} + virtual double getTimeEstimate() { return SERVER_KNOBS->READ_RANGE_TIME_ESTIMATE; } + }; + void action(ReadRangeAction& a) { + auto cursor = 
std::unique_ptr(db->NewIterator(readOptions)); + Standalone result; + int accumulatedBytes = 0; + if (a.rowLimit >= 0) { + cursor->Seek(toSlice(a.keys.begin)); + while (cursor->Valid() && toStringRef(cursor->key()) < a.keys.end && result.size() < a.rowLimit && + accumulatedBytes < a.byteLimit) { + KeyValueRef kv(toStringRef(cursor->key()), toStringRef(cursor->value())); + accumulatedBytes += sizeof(KeyValueRef) + kv.expectedSize(); + result.push_back_deep(result.arena(), kv); + cursor->Next(); + } + } else { + cursor->Seek(toSlice(a.keys.end)); + if (!cursor->Valid()) { + cursor->SeekToLast(); + } else { + cursor->Prev(); + } + + while (cursor->Valid() && toStringRef(cursor->key()) >= a.keys.begin && result.size() < -a.rowLimit && + accumulatedBytes < a.byteLimit) { + KeyValueRef kv(toStringRef(cursor->key()), toStringRef(cursor->value())); + accumulatedBytes += sizeof(KeyValueRef) + kv.expectedSize(); + result.push_back_deep(result.arena(), kv); + cursor->Prev(); + } + } + auto s = cursor->status(); + if (!s.ok()) { + TraceEvent(SevError, "RocksDBError").detail("Error", s.ToString()).detail("Method", "ReadRange"); + } + result.more = (result.size() == a.rowLimit); + if (result.more) { + result.readThrough = result[result.size()-1].key; + } + a.result.send(result); + } + }; + + DB db = nullptr; + std::string path; + UID id; + size_t diskBytesUsed = 0; + Reference writeThread; + Reference readThreads; + unsigned nReaders = 2; + Promise errorPromise; + Promise closePromise; + std::unique_ptr writeBatch; + + explicit RocksDBKeyValueStore(const std::string& path, UID id) + : path(path) + , id(id) + { + writeThread = createGenericThreadPool(); + readThreads = createGenericThreadPool(); + writeThread->addThread(new Writer(db, id)); + for (unsigned i = 0; i < nReaders; ++i) { + readThreads->addThread(new Reader(db)); + } + } + + Future getError() override { + return errorPromise.getFuture(); + } + + ACTOR static void doClose(RocksDBKeyValueStore* self, bool deleteOnClose) { + wait(self->readThreads->stop()); + auto a = new Writer::CloseAction{}; + auto f = a->done.getFuture(); + self->writeThread->post(a); + wait(f); + wait(self->writeThread->stop()); + // TODO: delete data on close + if (self->closePromise.canBeSet()) self->closePromise.send(Void()); + if (self->errorPromise.canBeSet()) self->errorPromise.send(Never()); + if (deleteOnClose) { + std::vector defaultCF = { rocksdb::ColumnFamilyDescriptor{ + "default", getCFOptions() } }; + rocksdb::DestroyDB(self->path, getOptions(self->path), defaultCF); + } + delete self; + } + + Future onClosed() override { + return closePromise.getFuture(); + } + + void dispose() override { + doClose(this, true); + } + + void close() override { + doClose(this, false); + } + + KeyValueStoreType getType() override { + return KeyValueStoreType(KeyValueStoreType::SSD_ROCKSDB_V1); + } + + Future init() override { + std::unique_ptr a(new Writer::OpenAction()); + a->path = path; + auto res = a->done.getFuture(); + writeThread->post(a.release()); + return res; + } + + void set(KeyValueRef kv, const Arena*) override { + if (writeBatch == nullptr) { + writeBatch.reset(new rocksdb::WriteBatch()); + } + writeBatch->Put(toSlice(kv.key), toSlice(kv.value)); + } + + void clear(KeyRangeRef keyRange, const Arena*) override { + if (writeBatch == nullptr) { + writeBatch.reset(new rocksdb::WriteBatch()); + } + + writeBatch->DeleteRange(toSlice(keyRange.begin), toSlice(keyRange.end)); + } + + Future commit(bool) override { + // If there is nothing to write, don't write. 
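+		// (set() and clear() create writeBatch lazily, so a null batch means no mutations have
+		// been buffered since the previous commit.)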
+ if (writeBatch == nullptr) { + return Void(); + } + auto a = new Writer::CommitAction(); + a->batchToCommit = std::move(writeBatch); + auto res = a->done.getFuture(); + writeThread->post(a); + return res; + } + + Future> readValue(KeyRef key, Optional debugID) override { + auto a = new Reader::ReadValueAction(key, debugID); + auto res = a->result.getFuture(); + readThreads->post(a); + return res; + } + + Future> readValuePrefix(KeyRef key, int maxLength, Optional debugID) override { + auto a = new Reader::ReadValuePrefixAction(key, maxLength, debugID); + auto res = a->result.getFuture(); + readThreads->post(a); + return res; + } + + Future> readRange(KeyRangeRef keys, int rowLimit, int byteLimit) override { + auto a = new Reader::ReadRangeAction(keys, rowLimit, byteLimit); + auto res = a->result.getFuture(); + readThreads->post(a); + return res; + } + + StorageBytes getStorageBytes() override { + int64_t free; + int64_t total; + + g_network->getDiskBytes(path, free, total); + + return StorageBytes(free, total, diskBytesUsed, free); + } +}; + +} // namespace + +#endif // SSD_ROCKSDB_EXPERIMENTAL + +IKeyValueStore* keyValueStoreRocksDB(std::string const& path, UID logID, KeyValueStoreType storeType, bool checkChecksums, bool checkIntegrity) { +#ifdef SSD_ROCKSDB_EXPERIMENTAL + return new RocksDBKeyValueStore(path, logID); +#else + TraceEvent(SevError, "RocksDBEngineInitFailure").detail("Reason", "Built without RocksDB"); + ASSERT(false); + return nullptr; +#endif // SSD_ROCKSDB_EXPERIMENTAL +} diff --git a/fdbserver/worker.actor.cpp b/fdbserver/worker.actor.cpp index a5fe4f36dd..64d281b9c8 100644 --- a/fdbserver/worker.actor.cpp +++ b/fdbserver/worker.actor.cpp @@ -254,6 +254,7 @@ std::pair bTreeV2Suffix = std::make_pair(KeyValu std::pair memorySuffix = std::make_pair( KeyValueStoreType::MEMORY, "-0.fdq" ); std::pair memoryRTSuffix = std::make_pair( KeyValueStoreType::MEMORY_RADIXTREE, "-0.fdr" ); std::pair redwoodSuffix = std::make_pair( KeyValueStoreType::SSD_REDWOOD_V1, ".redwood" ); +std::pair rocksdbSuffix = std::make_pair( KeyValueStoreType::SSD_ROCKSDB_V1, ".rocksdb" ); std::string validationFilename = "_validate"; @@ -266,6 +267,8 @@ std::string filenameFromSample( KeyValueStoreType storeType, std::string folder, return joinPath( folder, sample_filename.substr(0, sample_filename.size() - 5) ); else if ( storeType == KeyValueStoreType::SSD_REDWOOD_V1 ) return joinPath(folder, sample_filename); + else if (storeType == KeyValueStoreType::SSD_ROCKSDB_V1) + return joinPath(folder, sample_filename); UNREACHABLE(); } @@ -278,6 +281,8 @@ std::string filenameFromId( KeyValueStoreType storeType, std::string folder, std return joinPath( folder, prefix + id.toString() + "-" ); else if (storeType == KeyValueStoreType::SSD_REDWOOD_V1) return joinPath(folder, prefix + id.toString() + ".redwood"); + else if (storeType == KeyValueStoreType::SSD_ROCKSDB_V1) + return joinPath(folder, prefix + id.toString() + ".rocksdb"); UNREACHABLE(); } @@ -423,8 +428,10 @@ std::vector< DiskStore > getDiskStores( std::string folder ) { result.insert( result.end(), result2.begin(), result2.end() ); auto result3 = getDiskStores( folder, redwoodSuffix.second, redwoodSuffix.first); result.insert( result.end(), result3.begin(), result3.end() ); - auto result4 = getDiskStores( folder, memoryRTSuffix.second, memoryRTSuffix.first ); - result.insert( result.end(), result4.begin(), result4.end() ); + auto result4 = getDiskStores( folder, memoryRTSuffix.second, memoryRTSuffix.first ); + result.insert( result.end(), 
result4.begin(), result4.end() ); + auto result5 = getDiskStores( folder, rocksdbSuffix.second, rocksdbSuffix.first); + result.insert( result.end(), result3.begin(), result3.end() ); return result; } @@ -1078,7 +1085,7 @@ ACTOR Future workerServer( notUpdated = interf.updateServerDBInfo.getEndpoint(); } else if(localInfo.infoGeneration > dbInfo->get().infoGeneration || dbInfo->get().clusterInterface != ccInterface->get().get()) { - + TraceEvent("GotServerDBInfoChange").detail("ChangeID", localInfo.id).detail("MasterID", localInfo.master.id()) .detail("RatekeeperID", localInfo.ratekeeper.present() ? localInfo.ratekeeper.get().id() : UID()) .detail("DataDistributorID", localInfo.distributor.present() ? localInfo.distributor.get().id() : UID()); @@ -1347,7 +1354,7 @@ ACTOR Future workerServer( DUMPTOKEN( recruited.getQueuingMetrics ); DUMPTOKEN( recruited.confirmRunning ); - errorForwarders.add( zombie(recruited, forwardError( errors, Role::LOG_ROUTER, recruited.id(), + errorForwarders.add( zombie(recruited, forwardError( errors, Role::LOG_ROUTER, recruited.id(), logRouter( recruited, req, dbInfo ) ) ) ); req.reply.send(recruited); } @@ -1386,6 +1393,9 @@ ACTOR Future workerServer( } else if (d.storeType == KeyValueStoreType::SSD_REDWOOD_V1) { included = fileExists(d.filename + "0.pagerlog") && fileExists(d.filename + "1.pagerlog"); + } + else if (d.storeType == KeyValueStoreType::SSD_ROCKSDB_V1) { + included = fileExists(joinPath(d.filename, "CURRENT")) && fileExists(joinPath(d.filename, "IDENTITY")); } else if (d.storeType == KeyValueStoreType::MEMORY) { included = fileExists(d.filename + "1.fdq"); } else { diff --git a/fdbserver/workloads/KVStoreTest.actor.cpp b/fdbserver/workloads/KVStoreTest.actor.cpp index 080f0237b2..f0007f033e 100755 --- a/fdbserver/workloads/KVStoreTest.actor.cpp +++ b/fdbserver/workloads/KVStoreTest.actor.cpp @@ -373,6 +373,8 @@ ACTOR Future testKVStore(KVStoreTestWorkload* workload) { test.store = keyValueStoreSQLite(fn, id, KeyValueStoreType::SSD_REDWOOD_V1); else if (workload->storeType == "ssd-redwood-experimental") test.store = keyValueStoreRedwoodV1(fn, id); + else if (workload->storeType == "ssd-rocksdb-experimental") + test.store = keyValueStoreRocksDB(fn, id, KeyValueStoreType::SSD_ROCKSDB_V1); else if (workload->storeType == "memory") test.store = keyValueStoreMemory(fn, id, 500e6); else if (workload->storeType == "memory-radixtree-beta") @@ -398,4 +400,4 @@ ACTOR Future testKVStore(KVStoreTestWorkload* workload) { wait(c); if (err.code() != invalid_error_code) throw err; return Void(); -} \ No newline at end of file +} diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index f24e73b4c6..4b095cd78d 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -75,6 +75,7 @@ if(WITH_PYTHON) add_fdb_test(TEST_FILES RedwoodPerfSet.txt IGNORE) add_fdb_test(TEST_FILES RedwoodPerfPrefixCompression.txt IGNORE) add_fdb_test(TEST_FILES RedwoodPerfSequentialInsert.txt IGNORE) + add_fdb_test(TEST_FILES RocksDBTest.txt IGNORE) add_fdb_test(TEST_FILES SampleNoSimAttrition.txt IGNORE) if (NOT USE_UBSAN) # TODO re-enable in UBSAN after https://github.com/apple/foundationdb/issues/2410 is resolved add_fdb_test(TEST_FILES SimpleExternalTest.txt) diff --git a/tests/RocksDBTest.txt b/tests/RocksDBTest.txt new file mode 100644 index 0000000000..a1aeb2d32b --- /dev/null +++ b/tests/RocksDBTest.txt @@ -0,0 +1,48 @@ +testTitle=Insert +testName=KVStoreTest +testDuration=0.0 +operationsPerSecond=28000 +commitFraction=0.001 +setFraction=0.01 +nodeCount=20000000 +keyBytes=16 
+valueBytes=96 +filename=bttest +storeType=ssd-rocksdb-experimental +setup=true +clear=false +count=false +useDB=false + +testTitle=RandomWriteSaturation +testName=KVStoreTest +testDuration=20.0 +saturation=true +operationsPerSecond=10000 +commitFraction=0.00005 +setFraction=1.0 +nodeCount=20000000 +keyBytes=16 +valueBytes=96 +filename=bttest +storeType=ssd-rocksdb-experimental +setup=false +clear=false +count=false +useDB=false + +testTitle=Scan +testName=KVStoreTest +testDuration=20.0 +operationsPerSecond=28000 +commitFraction=0.0001 +setFraction=0.01 +nodeCount=20000000 +keyBytes=16 +valueBytes=96 +filename=bttest +storeType=ssd-rocksdb-experimental +setup=false +clear=false +count=true +useDB=false From a959c6eb236620739114f3efc2faca136e91e07b Mon Sep 17 00:00:00 2001 From: Daniel Smith Date: Mon, 15 Jun 2020 16:48:19 +0000 Subject: [PATCH 21/30] Fix copy/paste error --- fdbserver/worker.actor.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fdbserver/worker.actor.cpp b/fdbserver/worker.actor.cpp index 64d281b9c8..9573dc476e 100644 --- a/fdbserver/worker.actor.cpp +++ b/fdbserver/worker.actor.cpp @@ -431,7 +431,7 @@ std::vector< DiskStore > getDiskStores( std::string folder ) { auto result4 = getDiskStores( folder, memoryRTSuffix.second, memoryRTSuffix.first ); result.insert( result.end(), result4.begin(), result4.end() ); auto result5 = getDiskStores( folder, rocksdbSuffix.second, rocksdbSuffix.first); - result.insert( result.end(), result3.begin(), result3.end() ); + result.insert( result.end(), result5.begin(), result5.end() ); return result; } From 2a9fd611c5ece35da4f1b727d28da7aa969e6bbd Mon Sep 17 00:00:00 2001 From: Daniel Smith Date: Mon, 15 Jun 2020 16:51:42 +0000 Subject: [PATCH 22/30] Don't log ok status on DB close --- fdbserver/KeyValueStoreRocksDB.actor.cpp | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/fdbserver/KeyValueStoreRocksDB.actor.cpp b/fdbserver/KeyValueStoreRocksDB.actor.cpp index 0daafa8628..3ee3c3c538 100644 --- a/fdbserver/KeyValueStoreRocksDB.actor.cpp +++ b/fdbserver/KeyValueStoreRocksDB.actor.cpp @@ -159,7 +159,9 @@ struct RocksDBKeyValueStore : IKeyValueStore { }; void action(CloseAction& a) { auto s = db->Close(); - TraceEvent(SevError, "RocksDBError").detail("Error", s.ToString()).detail("Method", "Close"); + if (!s.ok()) { + TraceEvent(SevError, "RocksDBError").detail("Error", s.ToString()).detail("Method", "Close"); + } a.done.send(Void()); } }; From fdc14d9c5791648ee18a91452374fbc8a0b613b5 Mon Sep 17 00:00:00 2001 From: Evan Tschannen Date: Mon, 15 Jun 2020 11:06:39 -0700 Subject: [PATCH 23/30] update version to 6.3.3 --- CMakeLists.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 5e11e2ceab..dfcf412132 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -18,7 +18,7 @@ # limitations under the License. cmake_minimum_required(VERSION 3.13) project(foundationdb - VERSION 6.3.2 + VERSION 6.3.3 DESCRIPTION "FoundationDB is a scalable, fault-tolerant, ordered key-value store with full ACID transactions." 
HOMEPAGE_URL "http://www.foundationdb.org/" LANGUAGES C CXX ASM) From b788fb1e60dcb08217121041abb5161d714b42ef Mon Sep 17 00:00:00 2001 From: Evan Tschannen Date: Mon, 15 Jun 2020 11:06:39 -0700 Subject: [PATCH 24/30] update installer WIX GUID following release --- packaging/msi/FDBInstaller.wxs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packaging/msi/FDBInstaller.wxs b/packaging/msi/FDBInstaller.wxs index 92aa3fa86e..5a2bd02c5d 100644 --- a/packaging/msi/FDBInstaller.wxs +++ b/packaging/msi/FDBInstaller.wxs @@ -32,7 +32,7 @@ Date: Mon, 15 Jun 2020 12:12:08 -0700 Subject: [PATCH 25/30] Added comment to TransactionOptions --- fdbclient/NativeAPI.actor.h | 2 ++ 1 file changed, 2 insertions(+) diff --git a/fdbclient/NativeAPI.actor.h b/fdbclient/NativeAPI.actor.h index c0d6966100..0ee9c7b008 100644 --- a/fdbclient/NativeAPI.actor.h +++ b/fdbclient/NativeAPI.actor.h @@ -137,6 +137,8 @@ struct TransactionOptions { TagSet tags; // All tags set on transaction TagSet readTags; // Tags that can be sent with read requests + // update clear function if you add a new field + TransactionOptions(Database const& cx); TransactionOptions(); From 9888fa7a107ef906740aba82751491bd5210a34c Mon Sep 17 00:00:00 2001 From: "A.J. Beamon" Date: Mon, 15 Jun 2020 14:46:16 -0700 Subject: [PATCH 26/30] Use a reference for m_commitReadLock to avoid invalid destruction. --- fdbserver/VersionedBTree.actor.cpp | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/fdbserver/VersionedBTree.actor.cpp b/fdbserver/VersionedBTree.actor.cpp index a7f4d490f3..f06a7a470d 100644 --- a/fdbserver/VersionedBTree.actor.cpp +++ b/fdbserver/VersionedBTree.actor.cpp @@ -2993,7 +2993,7 @@ public: VersionedBTree(IPager2* pager, std::string name) : m_pager(pager), m_writeVersion(invalidVersion), m_lastCommittedVersion(invalidVersion), m_pBuffer(nullptr), - m_commitReadLock(SERVER_KNOBS->REDWOOD_COMMIT_CONCURRENT_READS), m_name(name) { + m_commitReadLock(new FlowLock(SERVER_KNOBS->REDWOOD_COMMIT_CONCURRENT_READS)), m_name(name) { m_lazyClearActor = 0; m_init = init_impl(this); @@ -3441,7 +3441,7 @@ private: Version m_writeVersion; Version m_lastCommittedVersion; Version m_newOldestVersion; - FlowLock m_commitReadLock; + Reference m_commitReadLock; Future m_latestCommit; Future m_init; std::string m_name; @@ -4134,8 +4134,9 @@ private: state Version writeVersion = self->getLastCommittedVersion() + 1; - wait(self->m_commitReadLock.take()); - state FlowLock::Releaser readLock(self->m_commitReadLock); + state Reference commitReadLock = self->m_commitReadLock; + wait(commitReadLock->take()); + state FlowLock::Releaser readLock(*commitReadLock); state Reference page = wait(readPage(snapshot, rootID, update->decodeLowerBound, update->decodeUpperBound)); readLock.release(); From 7a5fe3800da41ae24b788716a5914272ad48c1e1 Mon Sep 17 00:00:00 2001 From: Chaoguang Lin Date: Mon, 15 Jun 2020 15:31:55 -0700 Subject: [PATCH 27/30] Add comments, refine traces, add delay to dd rpc calls in the test --- fdbclient/SpecialKeySpace.actor.cpp | 3 ++ .../DataDistributionMetrics.actor.cpp | 33 ++++++++++--------- 2 files changed, 20 insertions(+), 16 deletions(-) diff --git a/fdbclient/SpecialKeySpace.actor.cpp b/fdbclient/SpecialKeySpace.actor.cpp index 56f47f69b5..d5fcdf12eb 100644 --- a/fdbclient/SpecialKeySpace.actor.cpp +++ b/fdbclient/SpecialKeySpace.actor.cpp @@ -37,6 +37,9 @@ std::unordered_map SpecialKeySpace::moduleToB // This function will move the given KeySelector as far as possible to the standard form: // 
orEqual == false && offset == 1 (Standard form) // If the corresponding key is not in the underlying key range, it will move over the range +// The cache object is used to cache the first read result from the rpc call during the key resolution; +// then, when we need to do key resolution or result filtering, +// we read from this cache object instead of issuing another rpc call, so the results stay consistent ACTOR Future moveKeySelectorOverRangeActor(const SpecialKeyRangeBaseImpl* skrImpl, ReadYourWritesTransaction* ryw, KeySelector* ks, Optional>* cache) { ASSERT(!ks->orEqual); // should be removed before calling diff --git a/fdbserver/workloads/DataDistributionMetrics.actor.cpp b/fdbserver/workloads/DataDistributionMetrics.actor.cpp index 782f8c282e..6855f7c267 100644 --- a/fdbserver/workloads/DataDistributionMetrics.actor.cpp +++ b/fdbserver/workloads/DataDistributionMetrics.actor.cpp @@ -31,6 +31,7 @@ struct DataDistributionMetricsWorkload : KVWorkload { double testDuration; std::string keyPrefix; PerfIntCounter commits, errors; + double delayPerLoop; DataDistributionMetricsWorkload(WorkloadContext const& wcx) : KVWorkload(wcx), numShards(0), avgBytes(0), commits("Commits"), errors("Errors") { @@ -38,6 +39,7 @@ struct DataDistributionMetricsWorkload : KVWorkload { keyPrefix = getOption(options, LiteralStringRef("keyPrefix"), LiteralStringRef("DDMetrics")).toString(); readPerTx = getOption(options, LiteralStringRef("readPerTransaction"), 1); writePerTx = getOption(options, LiteralStringRef("writePerTransaction"), 5 * readPerTx); + delayPerLoop = getOption(options, LiteralStringRef("delayPerLoop"), 0.1); // throttling dd rpc calls ASSERT(nodeCount > 1); } @@ -63,7 +65,6 @@ struct DataDistributionMetricsWorkload : KVWorkload { } catch (Error& e) { wait(tr.onError(e)); } - tr.reset(); } } @@ -72,6 +73,7 @@ struct DataDistributionMetricsWorkload : KVWorkload { Reference(new ReadYourWritesTransaction(cx)); loop { try { + wait(delay(self->delayPerLoop)); int startIndex = deterministicRandom()->randomInt(0, self->nodeCount - 1); int endIndex = deterministicRandom()->randomInt(startIndex + 1, self->nodeCount); state Key startKey = self->keyForIndex(startIndex); @@ -88,13 +90,7 @@ wait(tr->getRange(begin, end, GetRangeLimits(CLIENT_KNOBS->SHARD_COUNT_LIMIT))); // Condition #1 and #2 can be broken if multiple rpc calls happened in one getRange if (result.size() > 1) { - if (result[0].key <= begin.getKey() && result[1].key > begin.getKey()) { - TraceEvent(SevDebug, "DDMetricsConsistencyTest") - .detail("Size", result.size()) - .detail("FirstKey", result[0].key.toString()) - .detail("SecondKey", result[1].key.toString()) - .detail("BeginKeySelector", begin.toString()); - } else { + if (result[0].key > begin.getKey() || result[1].key <= begin.getKey()) { ++self->errors; TraceEvent(SevError, "TestFailure") .detail("Reason", "Result mismatches the given begin selector") .detail("Size", result.size()) .detail("FirstKey", result[0].key.toString()) .detail("SecondKey", result[1].key.toString()) .detail("BeginKeySelector", begin.toString()); } - if (result[result.size() - 1].key >= end.getKey() && result[result.size() - 2].key < end.getKey()) { - TraceEvent(SevDebug, "DDMetricsConsistencyTest") - .detail("Size", result.size()) - .detail("LastKey", result[result.size() - 1].key.toString()) - .detail("SecondLastKey", result[result.size() - 2].key.toString()) - .detail("EndKeySelector", end.toString()); - } else { + if (result[result.size() - 1].key < end.getKey() || 
result[result.size() - 2].key >= end.getKey()) { ++self->errors; TraceEvent(SevError, "TestFailure") .detail("Reason", "Result mismatches the given end selector") @@ -118,11 +108,22 @@ struct DataDistributionMetricsWorkload : KVWorkload { .detail("SecondKey", result[result.size() - 2].key.toString()) .detail("EndKeySelector", end.toString()); } + // Debugging traces + // TraceEvent(SevDebug, "DDMetricsConsistencyTest") + // .detail("Size", result.size()) + // .detail("FirstKey", result[0].key.toString()) + // .detail("SecondKey", result[1].key.toString()) + // .detail("BeginKeySelector", begin.toString()); + // TraceEvent(SevDebug, "DDMetricsConsistencyTest") + // .detail("Size", result.size()) + // .detail("LastKey", result[result.size() - 1].key.toString()) + // .detail("SecondLastKey", result[result.size() - 2].key.toString()) + // .detail("EndKeySelector", end.toString()); } } catch (Error& e) { // Ignore timed_out error and cross_module_read, the end key selector may read through the end if (e.code() == error_code_timed_out || e.code() == error_code_special_keys_cross_module_read) continue; - TraceEvent(SevDebug, "FailedToRetrieveDDMetrics").detail("Error", e.what()); + TraceEvent(SevDebug, "FailedToRetrieveDDMetrics").error(e); wait(tr->onError(e)); } } From ad68e4441993ae2f8f2cc7f4446f9ad58d1d9980 Mon Sep 17 00:00:00 2001 From: Steve Atherton Date: Tue, 16 Jun 2020 03:38:51 -0700 Subject: [PATCH 28/30] Change Redwood read concurrency lock to a reference because it must be able to live longer than the storage engine itself due to destruction order of actors if the storage engine is shut down while read operations are in progress. --- fdbserver/VersionedBTree.actor.cpp | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/fdbserver/VersionedBTree.actor.cpp b/fdbserver/VersionedBTree.actor.cpp index f06a7a470d..8032b66262 100644 --- a/fdbserver/VersionedBTree.actor.cpp +++ b/fdbserver/VersionedBTree.actor.cpp @@ -5444,7 +5444,7 @@ RedwoodRecordRef VersionedBTree::dbEnd(LiteralStringRef("\xff\xff\xff\xff\xff")) class KeyValueStoreRedwoodUnversioned : public IKeyValueStore { public: KeyValueStoreRedwoodUnversioned(std::string filePrefix, UID logID) - : m_filePrefix(filePrefix), m_concurrentReads(SERVER_KNOBS->REDWOOD_KVSTORE_CONCURRENT_READS) { + : m_filePrefix(filePrefix), m_concurrentReads(new FlowLock(SERVER_KNOBS->REDWOOD_KVSTORE_CONCURRENT_READS)) { // TODO: This constructor should really just take an IVersionedStore IPager2* pager = new DWALPager(SERVER_KNOBS->REDWOOD_DEFAULT_PAGE_SIZE, filePrefix, 0); m_tree = new VersionedBTree(pager, filePrefix); @@ -5520,8 +5520,9 @@ public: state VersionedBTree::BTreeCursor cur; wait(self->m_tree->initBTreeCursor(&cur, self->m_tree->getLastCommittedVersion())); - wait(self->m_concurrentReads.take()); - state FlowLock::Releaser releaser(self->m_concurrentReads); + state Reference readLock = self->m_concurrentReads; + wait(readLock->take()); + state FlowLock::Releaser releaser(*readLock); ++g_redwoodMetrics.opGetRange; state Standalone result; @@ -5600,8 +5601,9 @@ public: state VersionedBTree::BTreeCursor cur; wait(self->m_tree->initBTreeCursor(&cur, self->m_tree->getLastCommittedVersion())); - wait(self->m_concurrentReads.take()); - state FlowLock::Releaser releaser(self->m_concurrentReads); + state Reference readLock = self->m_concurrentReads; + wait(readLock->take()); + state FlowLock::Releaser releaser(*readLock); ++g_redwoodMetrics.opGet; wait(cur.seekGTE(key, 0)); @@ -5620,8 +5622,9 @@ public: state 
VersionedBTree::BTreeCursor cur; wait(self->m_tree->initBTreeCursor(&cur, self->m_tree->getLastCommittedVersion())); - wait(self->m_concurrentReads.take()); - state FlowLock::Releaser releaser(self->m_concurrentReads); + state Reference readLock = self->m_concurrentReads; + wait(readLock->take()); + state FlowLock::Releaser releaser(*readLock); ++g_redwoodMetrics.opGet; wait(cur.seekGTE(key, 0)); @@ -5646,7 +5649,7 @@ private: Future m_init; Promise m_closed; Promise m_error; - FlowLock m_concurrentReads; + Reference m_concurrentReads; template inline Future catchError(Future f) { From 6e18b6454ab67f5628a8d300c427c75cb4cb19c5 Mon Sep 17 00:00:00 2001 From: Andrew Noyes Date: Tue, 16 Jun 2020 10:13:57 -0700 Subject: [PATCH 29/30] Remove obsolete file --- tests/SnowflakeJenkins | 146 ----------------------------------------- 1 file changed, 146 deletions(-) delete mode 100644 tests/SnowflakeJenkins diff --git a/tests/SnowflakeJenkins b/tests/SnowflakeJenkins deleted file mode 100644 index 4f66452607..0000000000 --- a/tests/SnowflakeJenkins +++ /dev/null @@ -1,146 +0,0 @@ -def buildScmInfo -stage("Build") { - node('test-dynamic-slave') { - cleanWs() - - sfScmInfo = checkout([$class: 'GitSCM', - branches: [[name: '*']], - doGenerateSubmoduleConfigurations: false, - extensions: [[$class: 'RelativeTargetDirectory', relativeTargetDir: 'snowflake']], - submoduleCfg: [], - userRemoteConfigs: [[credentialsId: 'a0395839-84c7-4ceb-90e2-bcf66b2d6885', url: 'ssh://bitbucket-internal.int.snowflakecomputing.com:7999/opfdb/fdb_snowflake.git']] - ]) - println("$sfScmInfo") - - buildScmInfo = checkout([ - $class: 'GitSCM', - branches: scm.branches, - doGenerateSubmoduleConfigurations: scm.doGenerateSubmoduleConfigurations, - extensions: scm.extensions, - userRemoteConfigs: scm.userRemoteConfigs, - extensions: [[$class: 'RelativeTargetDirectory', relativeTargetDir: 'snowflake/jenkins/foundationdb']] - ]) - println("$buildScmInfo") - - sh """ - |export GIT_SPECIFIER=${buildScmInfo.GIT_COMMIT} - |virtualenv -p python3.4 venv - |source venv/bin/activate - |pip3 install docker-compose - |docker-compose --version - |git config --global user.name jenkins - |git config --global user.email fdb-devs@snowflake.net - |cd snowflake/jenkins - |./build.sh check_uploaded package sql sql_upload upload - """.stripMargin() - } -} - -def makeTestStep(iteration) { - return { - node("test-dynamic-slave") { - cleanWs() - sfScmInfo = checkout([$class: 'GitSCM', - branches: [[name: '*']], - doGenerateSubmoduleConfigurations: false, - extensions: [[$class: 'RelativeTargetDirectory', relativeTargetDir: 'snowflake']], - submoduleCfg: [], - userRemoteConfigs: [[credentialsId: 'a0395839-84c7-4ceb-90e2-bcf66b2d6885', url: 'ssh://bitbucket-internal.int.snowflakecomputing.com:7999/opfdb/fdb_snowflake.git']] - ]) - println("$sfScmInfo") - - scmInfo = checkout([ - $class: 'GitSCM', - branches: scm.branches, - doGenerateSubmoduleConfigurations: scm.doGenerateSubmoduleConfigurations, - extensions: scm.extensions, - userRemoteConfigs: scm.userRemoteConfigs, - extensions: [[$class: 'RelativeTargetDirectory', relativeTargetDir: 'snowflake/jenkins/foundationdb']] - ]) - println("$scmInfo") - sh """ - |# Clean up the jenkins output; gets messy with too many iterations - |set +x - |exec 3>&1 - |exec 1> \$WORKSPACE/setup_${iteration}.log - |exec 2>&1 - | - |export GIT_SPECIFIER=${scmInfo.GIT_COMMIT} - |virtualenv -p python3.4 venv - |source venv/bin/activate - |pip3 install docker-compose - |docker-compose --version - |git config --global user.name 
jenkins - |git config --global user.email fdb-devs@snowflake.net - | - |cd snowflake/jenkins - |echo Iteration ${iteration} building >&3 - |./build.sh configure download test sql sql_upload > \$WORKSPACE/iteration_${iteration}.log 2>&1 - |rc=\$? - |seed=\$(find . -name traces.json -exec grep -m 1 CMakeSEED {} \\; | awk '{print \$2}' | head -1 | tr -d '"}') - |echo Iteration ${iteration} completed with \$rc - seed \$seed >&3 - |mv \$WORKSPACE/iteration_${iteration}.log \$WORKSPACE/iteration_${iteration}_\${seed}.log - |find . -name traces.json -exec gzip -c {} > \$WORKSPACE/traces_${iteration}_\${seed}.json.gz \\; - |#cat \$WORKSPACE/iteration_${iteration}.log - """.stripMargin() - archiveArtifacts artifacts: 'setup_*log,iteration_*log,traces_*.json.gz', - optional: true, - onlyIfSuccessful: false - } - } -} - -stage("Test") { - def testSteps = [:] - for (int i = 0; i < 4; i++) { - testSteps["Iteration ${i}"] = makeTestStep(i) - } - println(testSteps) - - parallel testSteps - build job: "NotifyGitHub", - parameters: [ - string(name: 'pr_branch', value: buildScmInfo.GIT_BRANCH), - string(name: 'publish_url', value: "https://foo.bar/stuff") - ], - propagate: false -} -stage("Report") { - node('test-dynamic-slave') { - cleanWs() - - sfScmInfo = checkout([$class: 'GitSCM', - branches: [[name: '*']], - doGenerateSubmoduleConfigurations: false, - extensions: [[$class: 'RelativeTargetDirectory', relativeTargetDir: 'snowflake']], - submoduleCfg: [], - userRemoteConfigs: [[credentialsId: 'a0395839-84c7-4ceb-90e2-bcf66b2d6885', url: 'ssh://bitbucket-internal.int.snowflakecomputing.com:7999/opfdb/fdb_snowflake.git']] - ]) - println("$sfScmInfo") - - buildScmInfo = checkout([ - $class: 'GitSCM', - branches: scm.branches, - doGenerateSubmoduleConfigurations: scm.doGenerateSubmoduleConfigurations, - extensions: scm.extensions, - userRemoteConfigs: scm.userRemoteConfigs, - extensions: [[$class: 'RelativeTargetDirectory', relativeTargetDir: 'snowflake/jenkins/foundationdb']] - ]) - println("$buildScmInfo") - - sh """ - |export GIT_SPECIFIER=${buildScmInfo.GIT_COMMIT} - |virtualenv -p python3.4 venv - |source venv/bin/activate - |git config --global user.name jenkins - |git config --global user.email fdb-devs@snowflake.net - |cd snowflake/jenkins - |./build.sh sql_create_report - |GIT_TREE=(\$(cd foundationdb && git rev-parse HEAD^{tree})) - |cp -f fdb6-report.txt fdb6-report-\${GIT_TREE}.txt - """.stripMargin() - archiveArtifacts artifacts: '**/fdb6-report-*.txt', - optional: true, - onlyIfSuccessful: false - } -} From 327cc31e354d80ca00003a2673bd19a6760285d4 Mon Sep 17 00:00:00 2001 From: Jingyu Zhou Date: Tue, 16 Jun 2020 12:32:42 -0700 Subject: [PATCH 30/30] Revert "Request tracing" --- fdbclient/DatabaseContext.h | 4 +- fdbclient/FDBTypes.h | 1 - fdbclient/MasterProxyInterface.h | 33 ++--- fdbclient/NativeAPI.actor.cpp | 106 +++++----------- fdbclient/NativeAPI.actor.h | 9 +- fdbclient/StorageServerInterface.h | 31 ++--- fdbclient/vexillographer/fdb.options | 2 - fdbserver/BackupWorker.actor.cpp | 5 +- fdbserver/MasterInterface.h | 9 +- fdbserver/MasterProxyServer.actor.cpp | 59 +++------ fdbserver/ResolverInterface.h | 9 +- fdbserver/StorageCache.actor.cpp | 6 +- fdbserver/fdbserver.actor.cpp | 25 +--- fdbserver/masterserver.actor.cpp | 1 - fdbserver/storageserver.actor.cpp | 73 +++++------ .../workloads/ConsistencyCheck.actor.cpp | 7 +- fdbserver/workloads/Cycle.actor.cpp | 27 ++--- flow/Arena.h | 3 - flow/CMakeLists.txt | 3 - flow/FileTraceLogWriter.cpp | 42 +------ flow/FileTraceLogWriter.h | 31 
+---- flow/IThreadPool.h | 36 ------ flow/ITrace.h | 61 ---------- flow/Trace.cpp | 63 +++++++++- flow/Trace.h | 32 ++++- flow/Tracing.cpp | 82 ------------- flow/Tracing.h | 114 ------------------ 27 files changed, 214 insertions(+), 660 deletions(-) delete mode 100644 flow/ITrace.h delete mode 100644 flow/Tracing.cpp delete mode 100644 flow/Tracing.h diff --git a/fdbclient/DatabaseContext.h b/fdbclient/DatabaseContext.h index e5e3cac40f..433f0967c2 100644 --- a/fdbclient/DatabaseContext.h +++ b/fdbclient/DatabaseContext.h @@ -223,13 +223,11 @@ public: bool enableLocalityLoadBalance; struct VersionRequest { - SpanID spanContext; Promise reply; TagSet tags; Optional debugID; - VersionRequest(SpanID spanContext, TagSet tags = TagSet(), Optional debugID = Optional()) - : spanContext(spanContext), tags(tags), debugID(debugID) {} + VersionRequest(TagSet tags = TagSet(), Optional debugID = Optional()) : tags(tags), debugID(debugID) {} }; // Transaction start request batching diff --git a/fdbclient/FDBTypes.h b/fdbclient/FDBTypes.h index 20f11fda72..ef2e772d9e 100644 --- a/fdbclient/FDBTypes.h +++ b/fdbclient/FDBTypes.h @@ -36,7 +36,6 @@ typedef uint64_t Sequence; typedef StringRef KeyRef; typedef StringRef ValueRef; typedef int64_t Generation; -typedef UID SpanID; enum { tagLocalitySpecial = -1, diff --git a/fdbclient/MasterProxyInterface.h b/fdbclient/MasterProxyInterface.h index ae564c704e..7216015535 100644 --- a/fdbclient/MasterProxyInterface.h +++ b/fdbclient/MasterProxyInterface.h @@ -153,7 +153,6 @@ struct CommitTransactionRequest : TimedRequest { bool firstInBatch() const { return (flags & FLAG_FIRST_IN_BATCH) != 0; } Arena arena; - SpanID spanContext; CommitTransactionRef transaction; ReplyPromise reply; uint32_t flags; @@ -163,7 +162,7 @@ struct CommitTransactionRequest : TimedRequest { template void serialize(Ar& ar) { - serializer(ar, transaction, reply, arena, flags, debugID, spanContext); + serializer(ar, transaction, reply, arena, flags, debugID); } }; @@ -210,7 +209,6 @@ struct GetReadVersionRequest : TimedRequest { FLAG_PRIORITY_MASK = PRIORITY_SYSTEM_IMMEDIATE, }; - SpanID spanContext; uint32_t transactionCount; uint32_t flags; TransactionPriority priority; @@ -221,11 +219,9 @@ struct GetReadVersionRequest : TimedRequest { ReplyPromise reply; GetReadVersionRequest() : transactionCount(1), flags(0) {} - GetReadVersionRequest(SpanID spanContext, uint32_t transactionCount, TransactionPriority priority, - uint32_t flags = 0, TransactionTagMap tags = TransactionTagMap(), - Optional debugID = Optional()) - : spanContext(spanContext), transactionCount(transactionCount), priority(priority), flags(flags), tags(tags), - debugID(debugID) { + GetReadVersionRequest(uint32_t transactionCount, TransactionPriority priority, uint32_t flags = 0, TransactionTagMap tags = TransactionTagMap(), Optional debugID = Optional()) + : transactionCount(transactionCount), priority(priority), flags(flags), tags(tags), debugID(debugID) + { flags = flags & ~FLAG_PRIORITY_MASK; switch(priority) { case TransactionPriority::BATCH: @@ -241,12 +237,12 @@ struct GetReadVersionRequest : TimedRequest { ASSERT(false); } } - + bool operator < (GetReadVersionRequest const& rhs) const { return priority < rhs.priority; } template void serialize(Ar& ar) { - serializer(ar, transactionCount, flags, tags, debugID, reply, spanContext); + serializer(ar, transactionCount, flags, tags, debugID, reply); if(ar.isDeserializing) { if((flags & PRIORITY_SYSTEM_IMMEDIATE) == PRIORITY_SYSTEM_IMMEDIATE) { @@ -279,7 +275,6 @@ struct 
GetKeyServerLocationsReply { struct GetKeyServerLocationsRequest { constexpr static FileIdentifier file_identifier = 9144680; Arena arena; - SpanID spanContext; KeyRef begin; Optional end; int limit; @@ -287,28 +282,24 @@ struct GetKeyServerLocationsRequest { ReplyPromise reply; GetKeyServerLocationsRequest() : limit(0), reverse(false) {} - GetKeyServerLocationsRequest(SpanID spanContext, KeyRef const& begin, Optional const& end, int limit, - bool reverse, Arena const& arena) - : spanContext(spanContext), begin(begin), end(end), limit(limit), reverse(reverse), arena(arena) {} - - template + GetKeyServerLocationsRequest( KeyRef const& begin, Optional const& end, int limit, bool reverse, Arena const& arena ) : begin( begin ), end( end ), limit( limit ), reverse( reverse ), arena( arena ) {} + + template void serialize(Ar& ar) { - serializer(ar, begin, end, limit, reverse, reply, spanContext, arena); + serializer(ar, begin, end, limit, reverse, reply, arena); } }; struct GetRawCommittedVersionRequest { constexpr static FileIdentifier file_identifier = 12954034; - SpanID spanContext; Optional debugID; ReplyPromise reply; - explicit GetRawCommittedVersionRequest(SpanID spanContext, Optional const& debugID = Optional()) : spanContext(spanContext), debugID(debugID) {} - explicit GetRawCommittedVersionRequest() : spanContext(), debugID() {} + explicit GetRawCommittedVersionRequest(Optional const& debugID = Optional()) : debugID(debugID) {} template void serialize( Ar& ar ) { - serializer(ar, debugID, reply, spanContext); + serializer(ar, debugID, reply); } }; diff --git a/fdbclient/NativeAPI.actor.cpp b/fdbclient/NativeAPI.actor.cpp index 443ada0de9..6d5a06b8bb 100644 --- a/fdbclient/NativeAPI.actor.cpp +++ b/fdbclient/NativeAPI.actor.cpp @@ -36,7 +36,6 @@ #include "fdbclient/ClusterInterface.h" #include "fdbclient/CoordinationInterface.h" #include "fdbclient/DatabaseContext.h" -#include "fdbclient/FDBOptions.g.h" #include "fdbclient/KeyRangeMap.h" #include "fdbclient/Knobs.h" #include "fdbclient/ManagementAPI.actor.h" @@ -47,7 +46,6 @@ #include "fdbclient/SpecialKeySpace.actor.h" #include "fdbclient/StorageServerInterface.h" #include "fdbclient/SystemData.h" -#include "fdbclient/versions.h" #include "fdbrpc/LoadBalance.h" #include "fdbrpc/Net2FileSystem.h" #include "fdbrpc/simulator.h" @@ -61,10 +59,12 @@ #include "flow/Platform.h" #include "flow/SystemMonitor.h" #include "flow/TLSConfig.actor.h" -#include "flow/Tracing.h" +#include "flow/Trace.h" #include "flow/UnitTest.h" #include "flow/serialize.h" +#include "fdbclient/versions.h" + #ifdef WIN32 #define WIN32_LEAN_AND_MEAN #include @@ -1539,7 +1539,6 @@ ACTOR Future>> transactionalGetServerInt //If isBackward == true, returns the shard containing the key before 'key' (an infinitely long, inexpressible key). 
Otherwise returns the shard containing key ACTOR Future< pair> > getKeyLocation_internal( Database cx, Key key, TransactionInfo info, bool isBackward = false ) { - state Span span("NAPI:getKeyLocation"_loc, { info.span->context }); if (isBackward) { ASSERT( key != allKeys.begin && key <= allKeys.end ); } else { @@ -1553,10 +1552,7 @@ ACTOR Future< pair> > getKeyLocation_internal( ++cx->transactionKeyServerLocationRequests; choose { when ( wait( cx->onMasterProxiesChanged() ) ) {} - when(GetKeyServerLocationsReply rep = wait(basicLoadBalance( - cx->getMasterProxies(info.useProvisionalProxies), &MasterProxyInterface::getKeyServersLocations, - GetKeyServerLocationsRequest(span->context, key, Optional(), 100, isBackward, key.arena()), - TaskPriority::DefaultPromiseEndpoint))) { + when ( GetKeyServerLocationsReply rep = wait( basicLoadBalance( cx->getMasterProxies(info.useProvisionalProxies), &MasterProxyInterface::getKeyServersLocations, GetKeyServerLocationsRequest(key, Optional(), 100, isBackward, key.arena()), TaskPriority::DefaultPromiseEndpoint ) ) ) { ++cx->transactionKeyServerLocationRequestsCompleted; if( info.debugID.present() ) g_traceBatch.addEvent("TransactionDebug", info.debugID.get().first(), "NativeAPI.getKeyLocation.After"); @@ -1592,7 +1588,6 @@ Future>> getKeyLocation(Database const& c } ACTOR Future< vector< pair> > > getKeyRangeLocations_internal( Database cx, KeyRange keys, int limit, bool reverse, TransactionInfo info ) { - state Span span("NAPI:getKeyRangeLocations"_loc, { info.span->context }); if( info.debugID.present() ) g_traceBatch.addEvent("TransactionDebug", info.debugID.get().first(), "NativeAPI.getKeyLocations.Before"); @@ -1600,10 +1595,7 @@ ACTOR Future< vector< pair> > > getKeyRangeLoca ++cx->transactionKeyServerLocationRequests; choose { when ( wait( cx->onMasterProxiesChanged() ) ) {} - when(GetKeyServerLocationsReply _rep = wait(basicLoadBalance( - cx->getMasterProxies(info.useProvisionalProxies), &MasterProxyInterface::getKeyServersLocations, - GetKeyServerLocationsRequest(span->context, keys.begin, keys.end, limit, reverse, keys.arena()), - TaskPriority::DefaultPromiseEndpoint))) { + when ( GetKeyServerLocationsReply _rep = wait( basicLoadBalance( cx->getMasterProxies(info.useProvisionalProxies), &MasterProxyInterface::getKeyServersLocations, GetKeyServerLocationsRequest(keys.begin, keys.end, limit, reverse, keys.arena()), TaskPriority::DefaultPromiseEndpoint ) ) ) { ++cx->transactionKeyServerLocationRequestsCompleted; state GetKeyServerLocationsReply rep = _rep; if( info.debugID.present() ) @@ -1694,7 +1686,6 @@ Future Transaction::warmRange(Database cx, KeyRange keys) { ACTOR Future> getValue( Future version, Key key, Database cx, TransactionInfo info, Reference trLogInfo, TagSet tags ) { state Version ver = wait( version ); - state Span span("NAPI:getValue"_loc, { info.span->context }); cx->validateVersion(ver); loop { @@ -1727,12 +1718,10 @@ ACTOR Future> getValue( Future version, Key key, Databa } choose { when(wait(cx->connectionFileChanged())) { throw transaction_too_old(); } - when(GetValueReply _reply = wait( - loadBalance(cx.getPtr(), ssi.second, &StorageServerInterface::getValue, - GetValueRequest(span->context, key, ver, - cx->sampleReadTags() ? tags : Optional(), getValueID), - TaskPriority::DefaultPromiseEndpoint, false, - cx->enableLocalityLoadBalance ? 
@@ -1727,12 +1718,10 @@
 			}
 			choose {
 				when(wait(cx->connectionFileChanged())) { throw transaction_too_old(); }
-				when(GetValueReply _reply = wait(
-				         loadBalance(cx.getPtr(), ssi.second, &StorageServerInterface::getValue,
-				                     GetValueRequest(span->context, key, ver,
-				                                     cx->sampleReadTags() ? tags : Optional<TagSet>(), getValueID),
-				                     TaskPriority::DefaultPromiseEndpoint, false,
-				                     cx->enableLocalityLoadBalance ? &cx->queueModel : nullptr))) {
+				when(GetValueReply _reply =
+				         wait(loadBalance(cx.getPtr(), ssi.second, &StorageServerInterface::getValue,
+				                          GetValueRequest(key, ver, cx->sampleReadTags() ? tags : Optional<TagSet>(), getValueID), TaskPriority::DefaultPromiseEndpoint, false,
+				                          cx->enableLocalityLoadBalance ? &cx->queueModel : nullptr))) {
 					reply = _reply;
 				}
 			}
@@ -1790,7 +1779,6 @@ ACTOR Future<Key> getKey( Database cx, KeySelector k, Future<Version> version, T
 
 	wait(success(version));
 
 	state Optional<UID> getKeyID = Optional<UID>();
-	state Span span("NAPI:getKey"_loc, { info.span->context });
 	if( info.debugID.present() ) {
 		getKeyID = nondeterministicRandom()->randomUniqueID();
@@ -1819,11 +1807,9 @@ ACTOR Future<Key> getKey( Database cx, KeySelector k, Future<Version> version, T
 			choose {
 				when(wait(cx->connectionFileChanged())) { throw transaction_too_old(); }
 				when(GetKeyReply _reply =
-				         wait(loadBalance(cx.getPtr(), ssi.second, &StorageServerInterface::getKey,
-				                          GetKeyRequest(span->context, k, version.get(),
-				                                        cx->sampleReadTags() ? tags : Optional<TagSet>(), getKeyID),
-				                          TaskPriority::DefaultPromiseEndpoint, false,
-				                          cx->enableLocalityLoadBalance ? &cx->queueModel : nullptr))) {
+				         wait(loadBalance(cx.getPtr(), ssi.second, &StorageServerInterface::getKey, GetKeyRequest(k, version.get(), cx->sampleReadTags() ? tags : Optional<TagSet>(), getKeyID),
+				                          TaskPriority::DefaultPromiseEndpoint, false,
+				                          cx->enableLocalityLoadBalance ? &cx->queueModel : nullptr))) {
 					reply = _reply;
 				}
 			}
@@ -1856,15 +1842,12 @@ ACTOR Future<Key> getKey( Database cx, KeySelector k, Future<Version> version, T
 	}
 }
 
-ACTOR Future<Version> waitForCommittedVersion( Database cx, Version version, SpanID spanContext ) {
-	state Span span("NAPI:waitForCommittedVersion"_loc, { spanContext });
+ACTOR Future<Version> waitForCommittedVersion( Database cx, Version version ) {
 	try {
 		loop {
 			choose {
 				when ( wait( cx->onMasterProxiesChanged() ) ) {}
-				when(GetReadVersionReply v = wait(basicLoadBalance(
-				         cx->getMasterProxies(false), &MasterProxyInterface::getConsistentReadVersion,
-				         GetReadVersionRequest(span->context, 0, TransactionPriority::IMMEDIATE), cx->taskID))) {
+				when ( GetReadVersionReply v = wait( basicLoadBalance( cx->getMasterProxies(false), &MasterProxyInterface::getConsistentReadVersion, GetReadVersionRequest( 0, TransactionPriority::IMMEDIATE ), cx->taskID ) ) ) {
 					cx->minAcceptableReadVersion = std::min(cx->minAcceptableReadVersion, v.version);
 
 					if (v.version >= version)
@@ -1880,14 +1863,11 @@ ACTOR Future<Version> waitForCommittedVersion( Database cx, Version version, Spa
 	}
 }
 
-ACTOR Future<Version> getRawVersion( Database cx, SpanID spanContext ) {
-	state Span span("NAPI:getRawVersion"_loc, { spanContext });
+ACTOR Future<Version> getRawVersion( Database cx ) {
 	loop {
 		choose {
 			when ( wait( cx->onMasterProxiesChanged() ) ) {}
-			when(GetReadVersionReply v =
-			         wait(basicLoadBalance(cx->getMasterProxies(false), &MasterProxyInterface::getConsistentReadVersion,
-			                               GetReadVersionRequest(spanContext, 0, TransactionPriority::IMMEDIATE), cx->taskID))) {
+			when ( GetReadVersionReply v = wait( basicLoadBalance( cx->getMasterProxies(false), &MasterProxyInterface::getConsistentReadVersion, GetReadVersionRequest( 0, TransactionPriority::IMMEDIATE ), cx->taskID ) ) ) {
 				return v.version;
 			}
 		}
@@ -1901,7 +1881,6 @@ ACTOR Future<Void> readVersionBatcher(
 
 ACTOR Future<Void> watchValue(Future<Version> version, Key key, Optional<Value> value, Database cx, TransactionInfo info, TagSet tags) {
 	state Version ver = wait( version );
-	state Span span(deterministicRandom()->randomUniqueID(), "NAPI:watchValue"_loc, { info.span->context });
 	cx->validateVersion(ver);
 	ASSERT(ver != latestVersion);
 
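
The watchValue hunks that follow are easier to read with the client-side flow in mind. A sketch of how a caller typically drives this machinery through the public Transaction API (hypothetical helper; the Watch constructor details may differ by version):

    ACTOR Future<Void> exampleWaitForChange(Database cx, Key key, Optional<Value> last) {
        state Transaction tr(cx);
        loop {
            try {
                Optional<Value> v = wait(tr.get(key));
                if (!(v == last)) return Void(); // value already moved on
                state Future<Void> change = tr.watch(Reference<Watch>(new Watch(key, v)));
                wait(tr.commit()); // the watch only becomes active once the transaction commits
                wait(change);      // fires when the storage server sees the value change
                return Void();
            } catch (Error& e) {
                wait(tr.onError(e));
            }
        }
    }
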
@@ -1918,11 +1897,9 @@ ACTOR Future<Void> watchValue(Future<Version> version, Key key, Optional<Value>
 	}
 
 	state WatchValueReply resp;
 	choose {
-		when(WatchValueReply r = wait(
-		         loadBalance(cx.getPtr(), ssi.second, &StorageServerInterface::watchValue,
-		                     WatchValueRequest(span->context, key, value, ver,
-		                                       cx->sampleReadTags() ? tags : Optional<TagSet>(), watchValueID),
-		                     TaskPriority::DefaultPromiseEndpoint))) {
+		when(WatchValueReply r = wait(loadBalance(cx.getPtr(), ssi.second, &StorageServerInterface::watchValue,
+		                                          WatchValueRequest(key, value, ver, cx->sampleReadTags() ? tags : Optional<TagSet>(), watchValueID),
+		                                          TaskPriority::DefaultPromiseEndpoint))) {
 			resp = r;
 		}
 		when(wait(cx->connectionFile ? cx->connectionFile->onChange() : Never())) { wait(Never()); }
@@ -1933,7 +1910,7 @@ ACTOR Future<Void> watchValue(Future<Version> version, Key key, Optional<Value>
 
 			//FIXME: wait for known committed version on the storage server before replying,
 			//cannot do this until the storage server is notified on knownCommittedVersion changes from tlog (faster than the current update loop)
-			Version v = wait(waitForCommittedVersion(cx, resp.version, span->context));
+			Version v = wait(waitForCommittedVersion(cx, resp.version));
 
 			//TraceEvent("WatcherCommitted").detail("CommittedVersion", v).detail("WatchVersion", resp.version).detail("Key", key ).detail("Value", value);
 
@@ -1986,7 +1963,6 @@ ACTOR Future<Standalone<RangeResultRef>> getExactRange( Database cx, Version ver
 											KeyRange keys, GetRangeLimits limits, bool reverse, TransactionInfo info, TagSet tags )
 {
 	state Standalone<RangeResultRef> output;
-	state Span span("NAPI:getExactRange"_loc, { info.span->context });
 
 	//printf("getExactRange( '%s', '%s' )\n", keys.begin.toString().c_str(), keys.end.toString().c_str());
 	loop {
@@ -2000,7 +1976,6 @@ ACTOR Future<Standalone<RangeResultRef>> getExactRange( Database cx, Version ver
 				req.version = version;
 				req.begin = firstGreaterOrEqual( range.begin );
 				req.end = firstGreaterOrEqual( range.end );
-				req.spanContext = span->context;
 
 				transformRangeLimits(limits, reverse, req);
 				ASSERT(req.limitBytes > 0 && req.limit != 0 && req.limit < 0 == reverse);
@@ -2245,7 +2220,6 @@ ACTOR Future<Standalone<RangeResultRef>> getRange( Database cx, Reference<Transa
 	state Standalone<RangeResultRef> output;
-	state Span span("NAPI:getRange"_loc, info.span);
 
 	try {
 		state Version version = wait( fVersion );
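
With the span plumbing gone, a range-read request is assembled from selectors, a version, and limits alone. A construction sketch (hypothetical helper mirroring what getExactRange does above; the field names match the GetKeyValuesRequest struct in the StorageServerInterface.h hunks below):

    GetKeyValuesRequest exampleRangeRequest(KeyRange range, Version version, int limit, int limitBytes) {
        GetKeyValuesRequest req;
        req.version = version;
        req.begin = firstGreaterOrEqual(range.begin); // inclusive lower bound
        req.end = firstGreaterOrEqual(range.end);     // exclusive upper bound
        req.limit = limit;
        req.limitBytes = limitBytes;
        req.arena.dependsOn(range.arena()); // the selectors point into the range's memory
        return req;
    }
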
@@ -2298,7 +2272,6 @@ ACTOR Future<Standalone<RangeResultRef>> getRange( Database cx, Reference<Transa
 			req.tags = cx->sampleReadTags() ? tags : Optional<TagSet>();
 			req.debugID = info.debugID;
-			req.spanContext = span->context;
 			try {
 				if( info.debugID.present() ) {
 					g_traceBatch.addEvent("TransactionDebug", info.debugID.get().first(), "NativeAPI.getRange.Before");
@@ -2636,7 +2609,7 @@ ACTOR Future<Void> watch(Reference<Watch> watch, Database cx, TagSet tags, Trans
 }
 
 Future<Version> Transaction::getRawReadVersion() {
-	return ::getRawVersion(cx, info.span->context);
+	return ::getRawVersion(cx);
 }
 
 Future< Void > Transaction::watch( Reference<Watch> watch ) {
@@ -2999,7 +2972,6 @@ void Transaction::reset() {
 
 void Transaction::fullReset() {
 	reset();
-	info.span = Span(info.span->location);
 	backoff = CLIENT_KNOBS->DEFAULT_BACKOFF;
 }
 
@@ -3116,8 +3088,6 @@ ACTOR void checkWrites( Database cx, Future<Void> committed, Promise<Void> outCo
 ACTOR static Future<Void> commitDummyTransaction( Database cx, KeyRange range, TransactionInfo info, TransactionOptions options ) {
 	state Transaction tr(cx);
 	state int retries = 0;
-	state Span span("NAPI:dummyTransaction"_loc, info.span);
-	tr.info.span->parents.insert(span->context);
 	loop {
 		try {
 			TraceEvent("CommitDummyTransaction").detail("Key", range.begin).detail("Retries", retries);
@@ -3164,8 +3134,6 @@ void Transaction::setupWatches() {
 ACTOR static Future<Void> tryCommit( Database cx, Reference<TransactionLogInfo> trLogInfo, CommitTransactionRequest req, Future<Version> readVersion, TransactionInfo info, Version* pCommittedVersion, Transaction* tr, TransactionOptions options) {
 	state TraceInterval interval( "TransactionCommit" );
 	state double startTime = now();
-	state Span span("NAPI:tryCommit"_loc, { info.span->context });
-	req.spanContext = span->context;
 	if (info.debugID.present())
 		TraceEvent(interval.begin()).detail( "Parent", info.debugID.get() );
 	try {
@@ -3589,14 +3557,6 @@ void Transaction::setOption( FDBTransactionOptions::Option option, Optional<Stri
 
-	case FDBTransactionOptions::SPAN_PARENT:
-		validateOptionValue(value, true);
-		info.span->parents.emplace(BinaryReader::fromStringRef<UID>(value.get(), Unversioned()));
-		break;
-
 	case FDBTransactionOptions::REPORT_CONFLICTING_KEYS:
 		validateOptionValue(value, false);
 		options.reportConflictingKeys = true;
@@ -3607,16 +3567,13 @@ void Transaction::setOption( FDBTransactionOptions::Option option, Optional<Stri
 
-ACTOR Future<Void> getConsistentReadVersion(Span parentSpan, DatabaseContext* cx, uint32_t transactionCount,
-                                            TransactionPriority priority, uint32_t flags,
-                                            TransactionTagMap<uint32_t> tags, Optional<UID> debugID) {
-	state Span span("NAPI:getConsistentReadVersion"_loc, parentSpan);
+ACTOR Future<Void> getConsistentReadVersion( DatabaseContext *cx, uint32_t transactionCount, TransactionPriority priority, uint32_t flags, TransactionTagMap<uint32_t> tags, Optional<UID> debugID ) {
 	try {
 		++cx->transactionReadVersionBatches;
 		if( debugID.present() )
 			g_traceBatch.addEvent("TransactionDebug", debugID.get().first(), "NativeAPI.getConsistentReadVersion.Before");
 		loop {
-			state GetReadVersionRequest req( span->context, transactionCount, priority, flags, tags, debugID );
+			state GetReadVersionRequest req( transactionCount, priority, flags, tags, debugID );
 			choose {
 				when ( wait( cx->onMasterProxiesChanged() ) ) {}
 				when ( GetReadVersionReply v = wait( basicLoadBalance( cx->getMasterProxies(flags & GetReadVersionRequest::FLAG_USE_PROVISIONAL_PROXIES), &MasterProxyInterface::getConsistentReadVersion, req, cx->taskID ) ) ) {
@@ -3667,7 +3624,6 @@ ACTOR Future<Void> readVersionBatcher( DatabaseContext *cx, FutureStream<Databas
 	state PromiseStream<double> replyTimes;
 	state PromiseStream<Error> _errorStream;
 	state double batchTime = 0;
-	state Span span("NAPI:readVersionBatcher"_loc);
 	loop {
 		send_batch = false;
 		choose {
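
readVersionBatcher implements a classic coalescing loop: queue reply promises as requests arrive, then satisfy the whole queue from one proxy round trip. Reduced to a sketch (hypothetical actor; the real loop sizes batches with client knobs and routes errors through incrementalBroadcastWithError):

    ACTOR Future<Void> exampleGrvBatcher(DatabaseContext* cx,
                                         FutureStream<DatabaseContext::VersionRequest> stream) {
        state std::vector<Promise<GetReadVersionReply>> pending;
        state double batchWindow = 0.005; // illustrative; the real value comes from knobs
        loop {
            choose {
                when(DatabaseContext::VersionRequest req = waitNext(stream)) {
                    pending.push_back(req.reply); // queue the caller's reply promise
                }
                when(wait(pending.empty() ? Never() : delay(batchWindow))) {
                    // One GRV answers every queued request. (Sketch only: a real
                    // batcher would not block intake while the RPC is in flight.)
                    GetReadVersionReply rep = wait(basicLoadBalance(
                        cx->getMasterProxies(false), &MasterProxyInterface::getConsistentReadVersion,
                        GetReadVersionRequest((uint32_t)pending.size(), TransactionPriority::DEFAULT), cx->taskID));
                    for (auto& p : pending) p.send(rep);
                    pending.clear();
                }
            }
        }
    }
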
@@ -3678,7 +3634,6 @@ ACTOR Future<Void> readVersionBatcher( DatabaseContext *cx, FutureStream<Databas
-				span->parents.insert(req.spanContext);
 				requests.push_back(req.reply);
 				for(auto tag : req.tags) {
 					++tags[tag];
@@ -3706,10 +3661,9 @@ ACTOR Future<Void> readVersionBatcher( DatabaseContext *cx, FutureStream<Databas
 			Future<Void> batch = incrementalBroadcastWithError(
-			    getConsistentReadVersion(span, cx, count, priority, flags, std::move(tags), std::move(debugID)),
+			    getConsistentReadVersion(cx, count, priority, flags, std::move(tags), std::move(debugID)),
 			    std::move(requests), CLIENT_KNOBS->BROADCAST_BATCH_SIZE);
-			span = Span("NAPI:readVersionBatcher"_loc);
 			tags.clear();
 			debugID = Optional<UID>();
 			requests.clear();
@@ -3719,11 +3673,7 @@ ACTOR Future<Void> readVersionBatcher( DatabaseContext *cx, FutureStream<Databas
 	}
 }
 
-ACTOR Future<Version> extractReadVersion(Span parentSpan, DatabaseContext* cx, TransactionPriority priority,
-                                         Reference<TransactionLogInfo> trLogInfo, Future<GetReadVersionReply> f,
-                                         bool lockAware, double startTime, Promise<Optional<Value>> metadataVersion,
-                                         TagSet tags) {
-	// parentSpan here is only used to keep the parent alive until the request completes
+ACTOR Future<Version> extractReadVersion(DatabaseContext* cx, TransactionPriority priority, Reference<TransactionLogInfo> trLogInfo, Future<GetReadVersionReply> f, bool lockAware, double startTime, Promise<Optional<Value>> metadataVersion, TagSet tags) {
 	GetReadVersionReply rep = wait(f);
 	double latency = now() - startTime;
 	cx->GRVLatencies.addSample(latency);
@@ -3845,12 +3795,10 @@ Future<Version> Transaction::getReadVersion(uint32_t flags) {
 			batcher.actor = readVersionBatcher( cx.getPtr(), batcher.stream.getFuture(), options.priority, flags );
 		}
 
-		Span span("NAPI:getReadVersion"_loc, info.span);
-		auto const req = DatabaseContext::VersionRequest(span->context, options.tags, info.debugID);
+		auto const req = DatabaseContext::VersionRequest(options.tags, info.debugID);
 		batcher.stream.send(req);
 		startTime = now();
-		readVersion = extractReadVersion(span, cx.getPtr(), options.priority, trLogInfo, req.reply.getFuture(),
-		                                 options.lockAware, startTime, metadataVersion, options.tags);
+		readVersion = extractReadVersion( cx.getPtr(), options.priority, trLogInfo, req.reply.getFuture(), options.lockAware, startTime, metadataVersion, options.tags);
 	}
 	return readVersion;
 }
diff --git a/fdbclient/NativeAPI.actor.h b/fdbclient/NativeAPI.actor.h
index 429706e9b6..2015b20fe3 100644
--- a/fdbclient/NativeAPI.actor.h
+++ b/fdbclient/NativeAPI.actor.h
@@ -19,8 +19,6 @@
  */
 
 #pragma once
-#include "flow/IRandom.h"
-#include "flow/Tracing.h"
 #if defined(NO_INTELLISENSE) && !defined(FDBCLIENT_NATIVEAPI_ACTOR_G_H)
 #define FDBCLIENT_NATIVEAPI_ACTOR_G_H
 #include "fdbclient/NativeAPI.actor.g.h"
@@ -154,16 +152,13 @@ class ReadYourWritesTransaction; // workaround cyclic dependency
 struct TransactionInfo {
 	Optional<UID> debugID;
 	TaskPriority taskID;
-	Span span;
 	bool useProvisionalProxies;
 
 	// Used to save conflicting keys if FDBTransactionOptions::REPORT_CONFLICTING_KEYS is enabled
 	// prefix/<key1> : '1' - any keys equal or larger than this key are (probably) conflicting keys
 	// prefix/<key2> : '0' - any keys equal or larger than this key are (definitely) not conflicting keys
 	std::shared_ptr<CoalescedKeyRangeMap<Value>> conflictingKeys;
 
-	explicit TransactionInfo(TaskPriority taskID)
-	  : taskID(taskID), span(deterministicRandom()->randomUniqueID(), "Transaction"_loc), useProvisionalProxies(false) {
-	}
+	explicit TransactionInfo( TaskPriority taskID ) : taskID(taskID), useProvisionalProxies(false) {}
 };
 
 struct TransactionLogInfo : public ReferenceCounted<TransactionLogInfo>, NonCopyable {
@@ -339,7 +334,7 @@ private:
 	Future<Void> committing;
 };
 
-ACTOR Future<Version> waitForCommittedVersion(Database cx, Version version, SpanID spanContext);
+ACTOR Future<Version> waitForCommittedVersion(Database cx, Version version);
 ACTOR Future<Standalone<VectorRef<DDMetricsRef>>> waitDataDistributionMetricsList(Database cx, KeyRange keys, int shardLimit);
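
Usage of the narrowed waitForCommittedVersion prototype above, for reference (hypothetical caller):

    ACTOR Future<Void> exampleWaitCommitted(Database cx, Version target) {
        // Blocks until the cluster's committed version reaches the target.
        Version v = wait(waitForCommittedVersion(cx, target));
        TraceEvent("ExampleWaitCommitted").detail("Target", target).detail("Reached", v);
        return Void();
    }
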
diff --git a/fdbclient/StorageServerInterface.h b/fdbclient/StorageServerInterface.h
index bbf97c7047..cfd8c54ec7 100644
--- a/fdbclient/StorageServerInterface.h
+++ b/fdbclient/StorageServerInterface.h
@@ -169,7 +169,6 @@ struct GetValueReply : public LoadBalancedReply {
 
 struct GetValueRequest : TimedRequest {
 	constexpr static FileIdentifier file_identifier = 8454530;
-	SpanID spanContext;
 	Key key;
 	Version version;
 	Optional<TagSet> tags;
@@ -177,12 +176,11 @@ struct GetValueRequest : TimedRequest {
 	ReplyPromise<GetValueReply> reply;
 
 	GetValueRequest(){}
-	GetValueRequest(SpanID spanContext, const Key& key, Version ver, Optional<TagSet> tags, Optional<UID> debugID)
-	  : spanContext(spanContext), key(key), version(ver), tags(tags), debugID(debugID) {}
-
-	template <class Ar>
+	GetValueRequest(const Key& key, Version ver, Optional<TagSet> tags, Optional<UID> debugID) : key(key), version(ver), tags(tags), debugID(debugID) {}
+
+	template <class Ar>
 	void serialize( Ar& ar ) {
-		serializer(ar, key, version, tags, debugID, reply, spanContext);
+		serializer(ar, key, version, tags, debugID, reply);
 	}
 };
 
@@ -202,7 +200,6 @@ struct WatchValueReply {
 
 struct WatchValueRequest {
 	constexpr static FileIdentifier file_identifier = 14747733;
-	SpanID spanContext;
 	Key key;
 	Optional<Value> value;
 	Version version;
@@ -211,13 +208,11 @@ struct WatchValueRequest {
 	ReplyPromise<WatchValueReply> reply;
 
 	WatchValueRequest(){}
-	WatchValueRequest(SpanID spanContext, const Key& key, Optional<Value> value, Version ver, Optional<TagSet> tags,
-	                  Optional<UID> debugID)
-	  : spanContext(spanContext), key(key), value(value), version(ver), tags(tags), debugID(debugID) {}
-
-	template <class Ar>
+	WatchValueRequest(const Key& key, Optional<Value> value, Version ver, Optional<TagSet> tags, Optional<UID> debugID) : key(key), value(value), version(ver), tags(tags), debugID(debugID) {}
+
+	template <class Ar>
 	void serialize( Ar& ar ) {
-		serializer(ar, key, value, version, tags, debugID, reply, spanContext);
+		serializer(ar, key, value, version, tags, debugID, reply);
 	}
 };
 
@@ -239,7 +234,6 @@ struct GetKeyValuesReply : public LoadBalancedReply {
 
 struct GetKeyValuesRequest : TimedRequest {
 	constexpr static FileIdentifier file_identifier = 6795746;
-	SpanID spanContext;
 	Arena arena;
 	KeySelectorRef begin, end;
 	Version version;	// or latestVersion
@@ -252,7 +246,7 @@ struct GetKeyValuesRequest : TimedRequest {
 	GetKeyValuesRequest() : isFetchKeys(false) {}
 	template <class Ar>
 	void serialize( Ar& ar ) {
-		serializer(ar, begin, end, version, limit, limitBytes, isFetchKeys, tags, debugID, reply, spanContext, arena);
+		serializer(ar, begin, end, version, limit, limitBytes, isFetchKeys, tags, debugID, reply, arena);
 	}
 };
 
@@ -272,7 +266,6 @@ struct GetKeyReply : public LoadBalancedReply {
 
 struct GetKeyRequest : TimedRequest {
 	constexpr static FileIdentifier file_identifier = 10457870;
-	SpanID spanContext;
 	Arena arena;
 	KeySelectorRef sel;
 	Version version;	// or latestVersion
@@ -281,13 +274,11 @@ struct GetKeyRequest : TimedRequest {
 	ReplyPromise<GetKeyReply> reply;
 
 	GetKeyRequest() {}
-	GetKeyRequest(SpanID spanContext, KeySelectorRef const& sel, Version version, Optional<TagSet> tags,
-	              Optional<UID> debugID)
-	  : spanContext(spanContext), sel(sel), version(version), debugID(debugID) {}
+	GetKeyRequest(KeySelectorRef const& sel, Version version, Optional<TagSet> tags, Optional<UID> debugID) : sel(sel), version(version), debugID(debugID) {}
 
 	template <class Ar>
 	void serialize( Ar& ar ) {
-		serializer(ar, sel, version, tags, debugID, reply, spanContext, arena);
+		serializer(ar, sel, version, tags, debugID, reply, arena);
 	}
 };
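
Post-patch, the storage-server request constructors all begin at the key or selector argument. A construction sketch (illustrative values only). Note that GetKeyRequest's constructor accepts a tags argument but never stores it, both before and after this patch, so callers that need tag sampling must set the field explicitly:

    void exampleBuildRequests(Key key, Optional<Value> value, Version version, Optional<TagSet> tags) {
        GetValueRequest get(key, version, tags, Optional<UID>());
        WatchValueRequest watch(key, value, version, tags, Optional<UID>());
        GetKeyRequest sel(firstGreaterOrEqual(key), version, tags, Optional<UID>());
        sel.tags = tags; // the constructor drops its tags argument, so assign it directly
    }
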
diff --git a/fdbclient/vexillographer/fdb.options b/fdbclient/vexillographer/fdb.options
index 86af0f3f5c..d7463e5845 100644
--- a/fdbclient/vexillographer/fdb.options
+++ b/fdbclient/vexillographer/fdb.options
@@ -268,8 +268,6 @@ description is not currently required but encouraged.
 		<Option name="tag" code="800" paramType="String" paramDescription="String identifier used to associated this transaction with a throttling group. Must not exceed 16 characters."
 		        description="Adds a tag to the transaction that can be used to apply manual targeted throttling. At most 5 tags can be set on a transaction." />
-		<Option name="span_parent" code="900" paramType="Bytes" paramDescription="A byte string of length 16 used to associate the span of this transaction with a parent"
-		        description="Adds a parent to the Span of this transaction. Used for transaction tracing. A span can be identified with any 16 bytes"/>
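
The surviving option above reaches client code through the enum vexillographer generates from this file. For reference, a sketch of setting it (hypothetical tag value; assumes the generated FDBTransactionOptions header):

    void exampleTagTransaction(Database db) {
        Transaction tr(db);
        // "tag" in fdb.options becomes FDBTransactionOptions::TAG in generated code.
        tr.setOption(FDBTransactionOptions::TAG, LiteralStringRef("example-tag"));
    }
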