From cf30c47a292acbcead6336dd189746cb21238b13 Mon Sep 17 00:00:00 2001 From: "A.J. Beamon" Date: Thu, 20 Jun 2019 11:21:05 -0700 Subject: [PATCH 01/22] If onError fails with cluster_version_changed, retry the error on the new transaction. --- fdbclient/MultiVersionTransaction.actor.cpp | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/fdbclient/MultiVersionTransaction.actor.cpp b/fdbclient/MultiVersionTransaction.actor.cpp index c2ef5ad1b6..3fb0f675d1 100644 --- a/fdbclient/MultiVersionTransaction.actor.cpp +++ b/fdbclient/MultiVersionTransaction.actor.cpp @@ -588,7 +588,20 @@ ThreadFuture MultiVersionTransaction::onError(Error const& e) { else { auto tr = getTransaction(); auto f = tr.transaction ? tr.transaction->onError(e) : ThreadFuture(Never()); - return abortableFuture(f, tr.onChange); + f = abortableFuture(f, tr.onChange); + + return flatMapThreadFuture(f, [this, e](ErrorOr ready) { + if(!ready.isError() || ready.getError().code() != error_code_cluster_version_changed) { + if(ready.isError()) { + return ErrorOr>(ready.getError()); + } + + return ErrorOr>(Void()); + } + + updateTransaction(); + return ErrorOr>(onError(e)); + }); } } From 9e62bbdd762e90be73847c9e46f13fb15eebf10e Mon Sep 17 00:00:00 2001 From: "A.J. Beamon" Date: Thu, 20 Jun 2019 11:25:13 -0700 Subject: [PATCH 02/22] Update release notes. --- documentation/sphinx/source/release-notes.rst | 2 ++ 1 file changed, 2 insertions(+) diff --git a/documentation/sphinx/source/release-notes.rst b/documentation/sphinx/source/release-notes.rst index f2a9813030..98a2e73a13 100644 --- a/documentation/sphinx/source/release-notes.rst +++ b/documentation/sphinx/source/release-notes.rst @@ -14,6 +14,8 @@ Performance Fixes ----- +* If a cluster is upgraded during an `onError` call, the cluster could return a `cluster_version_changed` error. `(PR #1734) `_. + Status ------ From 9a55b1fdb5482028562a78b98dbea3f2f2ffbd5f Mon Sep 17 00:00:00 2001 From: "A.J. Beamon" Date: Fri, 21 Jun 2019 09:09:46 -0700 Subject: [PATCH 03/22] Documentation fix --- documentation/sphinx/source/release-notes.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/documentation/sphinx/source/release-notes.rst b/documentation/sphinx/source/release-notes.rst index 98a2e73a13..c4d248ba48 100644 --- a/documentation/sphinx/source/release-notes.rst +++ b/documentation/sphinx/source/release-notes.rst @@ -14,7 +14,7 @@ Performance Fixes ----- -* If a cluster is upgraded during an `onError` call, the cluster could return a `cluster_version_changed` error. `(PR #1734) `_. +* If a cluster is upgraded during an ``onError`` call, the cluster could return a ``cluster_version_changed`` error. `(PR #1734) `_. Status ------ From 0a94f96deefff8505130067fe082b4f14a613b0e Mon Sep 17 00:00:00 2001 From: sramamoorthy Date: Tue, 25 Jun 2019 16:17:45 -0700 Subject: [PATCH 04/22] sev40 if knownCommittedVersion > recoveryVersion --- fdbserver/TagPartitionedLogSystem.actor.cpp | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/fdbserver/TagPartitionedLogSystem.actor.cpp b/fdbserver/TagPartitionedLogSystem.actor.cpp index 2e25daae3b..5cfb33a1db 100644 --- a/fdbserver/TagPartitionedLogSystem.actor.cpp +++ b/fdbserver/TagPartitionedLogSystem.actor.cpp @@ -1477,7 +1477,16 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCountedrejoins = rejoins; logSystem->lockResults = lockResults; logSystem->recoverAt = minEnd; - logSystem->knownCommittedVersion = knownCommittedVersion; + if (knownCommittedVersion > minEnd) { + // FIXME: Remove the Sev40 once disk snapshot v2 feature is enabled, in all other + // code paths we should never be here. + TraceEvent(SevError, "KCVIsInvalid") + .detail("KnownCommittedVersion", knownCommittedVersion) + .detail("MinEnd", minEnd); + logSystem->knownCommittedVersion = minEnd; + } else { + logSystem->knownCommittedVersion = knownCommittedVersion; + } logSystem->remoteLogsWrittenToCoreState = true; logSystem->stopped = true; logSystem->pseudoLocalities = prevState.pseudoLocalities; From 3a246c1d259ab74ba527b937db1942c16f4227bf Mon Sep 17 00:00:00 2001 From: Andrew Noyes Date: Fri, 28 Jun 2019 14:16:45 -0700 Subject: [PATCH 05/22] Remove -fdiagnostics-color=always This way you can run with -DCMAKE_COLOR_MAKEFILE=0 in CI and not get garbled escape characters in your log --- cmake/ConfigureCompiler.cmake | 1 - 1 file changed, 1 deletion(-) diff --git a/cmake/ConfigureCompiler.cmake b/cmake/ConfigureCompiler.cmake index d989283033..0d7d6a67a8 100644 --- a/cmake/ConfigureCompiler.cmake +++ b/cmake/ConfigureCompiler.cmake @@ -161,7 +161,6 @@ else() -Wno-deprecated -fvisibility=hidden -Wreturn-type - -fdiagnostics-color=always -fPIC) if (GPERFTOOLS_FOUND AND GCC) add_compile_options( From b77338af805274740e795738331ac9ca10e6cbd3 Mon Sep 17 00:00:00 2001 From: Andrew Noyes Date: Fri, 28 Jun 2019 17:00:00 -0700 Subject: [PATCH 06/22] Add -DCMAKE_COLOR_MAKEFILE=0 to docker-compose.yaml --- build/docker-compose.yaml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/build/docker-compose.yaml b/build/docker-compose.yaml index 1519a0f93f..575fd9dfc2 100644 --- a/build/docker-compose.yaml +++ b/build/docker-compose.yaml @@ -60,7 +60,7 @@ services: snapshot-cmake: &snapshot-cmake <<: *build-setup - command: scl enable devtoolset-8 python27 rh-python36 rh-ruby24 -- bash -c 'mkdir -p "$${BUILD_DIR}" && cd "$${BUILD_DIR}" && cmake -DFDB_RELEASE=0 -DVALGRIND=0 /__this_is_some_very_long_name_dir_needed_to_fix_a_bug_with_debug_rpms__/foundationdb && make -j "$${MAKEJOBS}" packages preinstall && cpack' + command: scl enable devtoolset-8 python27 rh-python36 rh-ruby24 -- bash -c 'mkdir -p "$${BUILD_DIR}" && cd "$${BUILD_DIR}" && cmake -DCMAKE_COLOR_MAKEFILE=0 -DFDB_RELEASE=0 -DVALGRIND=0 /__this_is_some_very_long_name_dir_needed_to_fix_a_bug_with_debug_rpms__/foundationdb && make -j "$${MAKEJOBS}" packages preinstall && cpack' prb-cmake: <<: *snapshot-cmake @@ -68,7 +68,7 @@ services: snapshot-ctest: &snapshot-ctest <<: *build-setup - command: scl enable devtoolset-8 python27 rh-python36 rh-ruby24 -- bash -c 'mkdir -p "$${BUILD_DIR}" && cd "$${BUILD_DIR}" && cmake -DFDB_RELEASE=1 /__this_is_some_very_long_name_dir_needed_to_fix_a_bug_with_debug_rpms__/foundationdb && make -j "$${MAKEJOBS}" && ctest -L fast -j "$${MAKEJOBS}" --output-on-failure' + command: scl enable devtoolset-8 python27 rh-python36 rh-ruby24 -- bash -c 'mkdir -p "$${BUILD_DIR}" && cd "$${BUILD_DIR}" && cmake -DCMAKE_COLOR_MAKEFILE=0 -DFDB_RELEASE=1 /__this_is_some_very_long_name_dir_needed_to_fix_a_bug_with_debug_rpms__/foundationdb && make -j "$${MAKEJOBS}" && ctest -L fast -j "$${MAKEJOBS}" --output-on-failure' prb-ctest: <<: *snapshot-ctest @@ -76,7 +76,7 @@ services: snapshot-correctness: &snapshot-correctness <<: *build-setup - command: scl enable devtoolset-8 python27 rh-python36 rh-ruby24 -- bash -c 'mkdir -p "$${BUILD_DIR}" && cd "$${BUILD_DIR}" && cmake -DFDB_RELEASE=1 /__this_is_some_very_long_name_dir_needed_to_fix_a_bug_with_debug_rpms__/foundationdb && make -j "$${MAKEJOBS}" && ctest -j "$${MAKEJOBS}" --output-on-failure + command: scl enable devtoolset-8 python27 rh-python36 rh-ruby24 -- bash -c 'mkdir -p "$${BUILD_DIR}" && cd "$${BUILD_DIR}" && cmake -DCMAKE_COLOR_MAKEFILE=0 -DFDB_RELEASE=1 /__this_is_some_very_long_name_dir_needed_to_fix_a_bug_with_debug_rpms__/foundationdb && make -j "$${MAKEJOBS}" && ctest -j "$${MAKEJOBS}" --output-on-failure prb-correctness: <<: *snapshot-correctness From e2d7929b73df04a738a34ad5cf03fe2f921571bf Mon Sep 17 00:00:00 2001 From: Jingyu Zhou Date: Sun, 30 Jun 2019 08:21:26 -0700 Subject: [PATCH 07/22] Remove redundant actor compiler flags. --- cmake/FlowCommands.cmake | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cmake/FlowCommands.cmake b/cmake/FlowCommands.cmake index 29abc1efdf..d4ddd41c05 100644 --- a/cmake/FlowCommands.cmake +++ b/cmake/FlowCommands.cmake @@ -180,12 +180,12 @@ function(add_flow_target) list(APPEND generated_files ${CMAKE_CURRENT_BINARY_DIR}/${generated}) if(WIN32) add_custom_command(OUTPUT "${CMAKE_CURRENT_BINARY_DIR}/${generated}" - COMMAND $ "${CMAKE_CURRENT_SOURCE_DIR}/${src}" "${CMAKE_CURRENT_BINARY_DIR}/${generated}" ${actor_compiler_flags} ${actor_compiler_flags} + COMMAND $ "${CMAKE_CURRENT_SOURCE_DIR}/${src}" "${CMAKE_CURRENT_BINARY_DIR}/${generated}" ${actor_compiler_flags} DEPENDS "${CMAKE_CURRENT_SOURCE_DIR}/${src}" actorcompiler COMMENT "Compile actor: ${src}") else() add_custom_command(OUTPUT "${CMAKE_CURRENT_BINARY_DIR}/${generated}" - COMMAND ${MONO_EXECUTABLE} ${actor_exe} "${CMAKE_CURRENT_SOURCE_DIR}/${src}" "${CMAKE_CURRENT_BINARY_DIR}/${generated}" ${actor_compiler_flags} ${actor_compiler_flags} > /dev/null + COMMAND ${MONO_EXECUTABLE} ${actor_exe} "${CMAKE_CURRENT_SOURCE_DIR}/${src}" "${CMAKE_CURRENT_BINARY_DIR}/${generated}" ${actor_compiler_flags} > /dev/null DEPENDS "${CMAKE_CURRENT_SOURCE_DIR}/${src}" actorcompiler COMMENT "Compile actor: ${src}") endif() From 23de5b64ad009e6c81579eb84f62f7e7399174fa Mon Sep 17 00:00:00 2001 From: Alex Miller Date: Mon, 1 Jul 2019 13:38:06 -0700 Subject: [PATCH 08/22] Memory storage engine to use crc32c DiskQueue by default (in 6.2). --- fdbserver/KeyValueStoreMemory.actor.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fdbserver/KeyValueStoreMemory.actor.cpp b/fdbserver/KeyValueStoreMemory.actor.cpp index 16eebb8a7a..796844f4cd 100644 --- a/fdbserver/KeyValueStoreMemory.actor.cpp +++ b/fdbserver/KeyValueStoreMemory.actor.cpp @@ -715,7 +715,7 @@ KeyValueStoreMemory::KeyValueStoreMemory( IDiskQueue* log, UID id, int64_t memor IKeyValueStore* keyValueStoreMemory( std::string const& basename, UID logID, int64_t memoryLimit, std::string ext ) { TraceEvent("KVSMemOpening", logID).detail("Basename", basename).detail("MemoryLimit", memoryLimit); - IDiskQueue *log = openDiskQueue( basename, ext, logID, DiskQueueVersion::V0 ); + IDiskQueue *log = openDiskQueue( basename, ext, logID, DiskQueueVersion::V1 ); return new KeyValueStoreMemory( log, logID, memoryLimit, false, false, false ); } From b69d7adabcb932cd93b3c994d54ac7f665e1d71d Mon Sep 17 00:00:00 2001 From: Jingyu Zhou Date: Fri, 26 Apr 2019 13:35:18 -0700 Subject: [PATCH 09/22] Remove unused remoteRecovered from master server --- fdbserver/masterserver.actor.cpp | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/fdbserver/masterserver.actor.cpp b/fdbserver/masterserver.actor.cpp index 205b1dbc19..4d6122291d 100644 --- a/fdbserver/masterserver.actor.cpp +++ b/fdbserver/masterserver.actor.cpp @@ -1126,7 +1126,7 @@ ACTOR Future rejoinRequestHandler( Reference self ) { } } -ACTOR Future trackTlogRecovery( Reference self, Reference>> oldLogSystems, Promise remoteRecovered ) { +ACTOR Future trackTlogRecovery( Reference self, Reference>> oldLogSystems ) { state Future rejoinRequests = Never(); state DBRecoveryCount recoverCount = self->cstate.myDBState.recoveryCount + 1; loop { @@ -1170,10 +1170,6 @@ ACTOR Future trackTlogRecovery( Reference self, ReferenceregistrationTrigger.trigger(); - if(allLogs && remoteRecovered.canBeSet()) { - remoteRecovered.send(Void()); - } - if( finalUpdate ) { oldLogSystems->get()->stopRejoins(); rejoinRequests = rejoinRequestHandler(self); @@ -1386,8 +1382,7 @@ ACTOR Future masterCore( Reference self ) { // we made to the new Tlogs (self->recoveryTransactionVersion), and only our own semi-commits can come between our // first commit and the next new TLogs - state Promise remoteRecovered; - self->addActor.send( trackTlogRecovery(self, oldLogSystems, remoteRecovered) ); + self->addActor.send( trackTlogRecovery(self, oldLogSystems) ); debug_advanceMaxCommittedVersion(UID(), self->recoveryTransactionVersion); wait(self->cstateUpdated.getFuture()); debug_advanceMinCommittedVersion(UID(), self->recoveryTransactionVersion); From 0b9cd18fb4848566ea23edc54bad814100575aa1 Mon Sep 17 00:00:00 2001 From: mengranwo Date: Wed, 28 Nov 2018 16:01:40 -0800 Subject: [PATCH 10/22] checking cluster is healthy or not during recovery process(for storage engine), if healthy, delete data files and join as new --- fdbserver/WorkerInterface.actor.h | 3 +- fdbserver/storageserver.actor.cpp | 53 +++++++++++++++++++++++++++++-- fdbserver/worker.actor.cpp | 4 +-- 3 files changed, 54 insertions(+), 6 deletions(-) diff --git a/fdbserver/WorkerInterface.actor.h b/fdbserver/WorkerInterface.actor.h index 8370e7fdde..cb1b196443 100644 --- a/fdbserver/WorkerInterface.actor.h +++ b/fdbserver/WorkerInterface.actor.h @@ -415,7 +415,8 @@ ACTOR Future storageServer(IKeyValueStore* persistentData, StorageServerIn Reference> db, std::string folder); ACTOR Future storageServer(IKeyValueStore* persistentData, StorageServerInterface ssi, Reference> db, std::string folder, - Promise recovered); // changes pssi->id() to be the recovered ID + Promise recovered, + Reference const& connFile ); // changes pssi->id() to be the recovered ID); // changes pssi->id() to be the recovered ID ACTOR Future masterServer(MasterInterface mi, Reference> db, ServerCoordinators serverCoordinators, LifetimeToken lifetime, bool forceRecovery); ACTOR Future masterProxyServer(MasterProxyInterface proxy, InitializeMasterProxyRequest req, diff --git a/fdbserver/storageserver.actor.cpp b/fdbserver/storageserver.actor.cpp index 1dba3bc92e..7af4f4b2dc 100644 --- a/fdbserver/storageserver.actor.cpp +++ b/fdbserver/storageserver.actor.cpp @@ -31,6 +31,7 @@ #include "fdbclient/SystemData.h" #include "fdbclient/NativeAPI.actor.h" #include "fdbclient/Notified.h" +#include "fdbclient/StatusClient.h" #include "fdbclient/MasterProxyInterface.h" #include "fdbclient/DatabaseContext.h" #include "fdbserver/WorkerInterface.actor.h" @@ -3626,6 +3627,39 @@ bool storageServerTerminated(StorageServer& self, IKeyValueStore* persistentData return false; } +ACTOR Future memoryStoreRecover(IKeyValueStore* store, Reference connFile, UID id) +{ + if(store->getType() != KeyValueStoreType::MEMORY || connFile.getPtr() == nullptr) { + return false; + } + + // create a temp client connect to DB + Database cx = Database::createDatabase(connFile, Database::API_VERSION_LATEST); + + state Transaction tr( cx ); + state int noCanRemoveCount = 0; + loop { + try { + tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE); + + state bool canRemove = wait( canRemoveStorageServer( &tr, id ) ); + if (!canRemove) { + TEST(true); // it's possible that the caller had a transaction in flight that assigned keys to the server. Wait for it to reverse its mistake. + Void _ = wait( delayJittered(SERVER_KNOBS->REMOVE_RETRY_DELAY, TaskUpdateStorage) ); + tr.reset(); + TraceEvent("RemoveStorageServerRetrying").detail("Count", noCanRemoveCount++).detail("ServerID", id).detail("CanRemove", canRemove); + } else { + Void _ = wait(tr.commit()); + return true; + } + } catch (Error& e) { + state Error err = e; + Void _ = wait( tr.onError(e) ); + TraceEvent("RemoveStorageServerRetrying").error(err); + } + } +} + ACTOR Future storageServer( IKeyValueStore* persistentData, StorageServerInterface ssi, Tag seedTag, ReplyPromise recruitReply, Reference> db, std::string folder ) { @@ -3745,7 +3779,7 @@ ACTOR Future replaceInterface( StorageServer* self, StorageServerInterface return Void(); } -ACTOR Future storageServer( IKeyValueStore* persistentData, StorageServerInterface ssi, Reference> db, std::string folder, Promise recovered ) +ACTOR Future storageServer( IKeyValueStore* persistentData, StorageServerInterface ssi, Reference> db, std::string folder, Promise recovered, Reference connFile) { state StorageServer self(persistentData, db, ssi); self.folder = folder; @@ -3753,8 +3787,21 @@ ACTOR Future storageServer( IKeyValueStore* persistentData, StorageServerI try { state double start = now(); TraceEvent("StorageServerRebootStart", self.thisServerID); - wait(self.storage.init()); - wait(self.storage.commit()); //after a rollback there might be uncommitted changes. + + state Future dispose = memoryStoreRecover (persistentData, connFile, self.thisServerID); + wait(self.storage.init()); + //after a rollback there might be uncommitted changes. + //for memory storage engine type, wait until recovery is done before commit + state Future committed = self.storage.commit(); + wait(success(dispose) || success(committed)) + if (committed.isReady()) { + // recovery finished before dispose + dispose.cancel(); + } else if (dispose.isReady() && dispose.get()){ + TraceEvent("DisposeStorageServer", self.thisServerID); + throw worker_removed(); + } + bool ok = wait( self.storage.restoreDurableState() ); if (!ok) { if(recovered.canBeSet()) recovered.send(Void()); diff --git a/fdbserver/worker.actor.cpp b/fdbserver/worker.actor.cpp index 5f22334b44..68a3a48a91 100644 --- a/fdbserver/worker.actor.cpp +++ b/fdbserver/worker.actor.cpp @@ -556,7 +556,7 @@ ACTOR Future storageServerRollbackRebooter( Future prevStorageServer DUMPTOKEN(recruited.getKeyValueStoreType); DUMPTOKEN(recruited.watchValue); - prevStorageServer = storageServer( store, recruited, db, folder, Promise() ); + prevStorageServer = storageServer( store, recruited, db, folder, Promise(), Reference (nullptr) ); prevStorageServer = handleIOErrors(prevStorageServer, store, id, store->onClosed()); } } @@ -804,7 +804,7 @@ ACTOR Future workerServer( DUMPTOKEN(recruited.watchValue); Promise recovery; - Future f = storageServer( kv, recruited, dbInfo, folder, recovery ); + Future f = storageServer( kv, recruited, dbInfo, folder, recovery, connFile); recoveries.push_back(recovery.getFuture()); f = handleIOErrors( f, kv, s.storeID, kvClosed ); f = storageServerRollbackRebooter( f, s.storeType, s.filename, recruited.id(), recruited.locality, dbInfo, folder, &filesClosed, memoryLimit, kv); From 6b61b0e0309c11ccee0edde7577d7642848fd0d8 Mon Sep 17 00:00:00 2001 From: mengranwo Date: Sun, 16 Jun 2019 19:39:11 -0700 Subject: [PATCH 11/22] fix syntax error, pass compile --- fdbserver/WorkerInterface.actor.h | 2 +- fdbserver/storageserver.actor.cpp | 10 +++++----- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/fdbserver/WorkerInterface.actor.h b/fdbserver/WorkerInterface.actor.h index cb1b196443..ba33037759 100644 --- a/fdbserver/WorkerInterface.actor.h +++ b/fdbserver/WorkerInterface.actor.h @@ -416,7 +416,7 @@ ACTOR Future storageServer(IKeyValueStore* persistentData, StorageServerIn ACTOR Future storageServer(IKeyValueStore* persistentData, StorageServerInterface ssi, Reference> db, std::string folder, Promise recovered, - Reference const& connFile ); // changes pssi->id() to be the recovered ID); // changes pssi->id() to be the recovered ID + Reference connFile ); // changes pssi->id() to be the recovered ID); // changes pssi->id() to be the recovered ID ACTOR Future masterServer(MasterInterface mi, Reference> db, ServerCoordinators serverCoordinators, LifetimeToken lifetime, bool forceRecovery); ACTOR Future masterProxyServer(MasterProxyInterface proxy, InitializeMasterProxyRequest req, diff --git a/fdbserver/storageserver.actor.cpp b/fdbserver/storageserver.actor.cpp index 7af4f4b2dc..f9443de770 100644 --- a/fdbserver/storageserver.actor.cpp +++ b/fdbserver/storageserver.actor.cpp @@ -3645,16 +3645,16 @@ ACTOR Future memoryStoreRecover(IKeyValueStore* store, ReferenceREMOVE_RETRY_DELAY, TaskUpdateStorage) ); + wait( delayJittered(SERVER_KNOBS->REMOVE_RETRY_DELAY, TaskUpdateStorage) ); tr.reset(); TraceEvent("RemoveStorageServerRetrying").detail("Count", noCanRemoveCount++).detail("ServerID", id).detail("CanRemove", canRemove); } else { - Void _ = wait(tr.commit()); + wait(tr.commit()); return true; } } catch (Error& e) { state Error err = e; - Void _ = wait( tr.onError(e) ); + wait( tr.onError(e) ); TraceEvent("RemoveStorageServerRetrying").error(err); } } @@ -3792,8 +3792,8 @@ ACTOR Future storageServer( IKeyValueStore* persistentData, StorageServerI wait(self.storage.init()); //after a rollback there might be uncommitted changes. //for memory storage engine type, wait until recovery is done before commit - state Future committed = self.storage.commit(); - wait(success(dispose) || success(committed)) + state Future committed = self.storage.commit(); + wait(success(dispose) || success(committed)); if (committed.isReady()) { // recovery finished before dispose dispose.cancel(); From 11161746f857f59e36904a788eed144469bec503 Mon Sep 17 00:00:00 2001 From: mengranwo Date: Mon, 17 Jun 2019 10:24:28 -0700 Subject: [PATCH 12/22] add try catch block around tx.onerror() --- fdbserver/storageserver.actor.cpp | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/fdbserver/storageserver.actor.cpp b/fdbserver/storageserver.actor.cpp index f9443de770..e3122e597a 100644 --- a/fdbserver/storageserver.actor.cpp +++ b/fdbserver/storageserver.actor.cpp @@ -3654,8 +3654,12 @@ ACTOR Future memoryStoreRecover(IKeyValueStore* store, Reference Date: Mon, 17 Jun 2019 13:30:12 -0700 Subject: [PATCH 13/22] fix format issue --- fdbserver/storageserver.actor.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/fdbserver/storageserver.actor.cpp b/fdbserver/storageserver.actor.cpp index e3122e597a..a3e4bcb6a3 100644 --- a/fdbserver/storageserver.actor.cpp +++ b/fdbserver/storageserver.actor.cpp @@ -3658,8 +3658,8 @@ ACTOR Future memoryStoreRecover(IKeyValueStore* store, Reference Date: Thu, 20 Jun 2019 17:41:00 -0700 Subject: [PATCH 14/22] address cr comments: --- fdbserver/storageserver.actor.cpp | 26 ++++++++++++-------------- 1 file changed, 12 insertions(+), 14 deletions(-) diff --git a/fdbserver/storageserver.actor.cpp b/fdbserver/storageserver.actor.cpp index a3e4bcb6a3..ded2ebda3c 100644 --- a/fdbserver/storageserver.actor.cpp +++ b/fdbserver/storageserver.actor.cpp @@ -3627,10 +3627,10 @@ bool storageServerTerminated(StorageServer& self, IKeyValueStore* persistentData return false; } -ACTOR Future memoryStoreRecover(IKeyValueStore* store, Reference connFile, UID id) +ACTOR Future memoryStoreRecover(IKeyValueStore* store, Reference connFile, UID id) { if(store->getType() != KeyValueStoreType::MEMORY || connFile.getPtr() == nullptr) { - return false; + return Never(); } // create a temp client connect to DB @@ -3650,7 +3650,7 @@ ACTOR Future memoryStoreRecover(IKeyValueStore* store, Reference storageServer( IKeyValueStore* persistentData, StorageServerI state double start = now(); TraceEvent("StorageServerRebootStart", self.thisServerID); - state Future dispose = memoryStoreRecover (persistentData, connFile, self.thisServerID); wait(self.storage.init()); - //after a rollback there might be uncommitted changes. - //for memory storage engine type, wait until recovery is done before commit - state Future committed = self.storage.commit(); - wait(success(dispose) || success(committed)); - if (committed.isReady()) { - // recovery finished before dispose - dispose.cancel(); - } else if (dispose.isReady() && dispose.get()){ - TraceEvent("DisposeStorageServer", self.thisServerID); - throw worker_removed(); + choose { + //after a rollback there might be uncommitted changes. + //for memory storage engine type, wait until recovery is done before commit + wait(self.storage.commit()) {} + + wait(memoryStoreRecover (persistentData, connFile, self.thisServerID)) { + TraceEvent("DisposeStorageServer", self.thisServerID); + throw worker_removed(); + } } bool ok = wait( self.storage.restoreDurableState() ); From 819b6e3d6d47e7332f0353b76561c9485ac8fc30 Mon Sep 17 00:00:00 2001 From: mengranwo Date: Thu, 20 Jun 2019 18:24:10 -0700 Subject: [PATCH 15/22] fix compiling error --- fdbserver/storageserver.actor.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/fdbserver/storageserver.actor.cpp b/fdbserver/storageserver.actor.cpp index ded2ebda3c..24b8fd9725 100644 --- a/fdbserver/storageserver.actor.cpp +++ b/fdbserver/storageserver.actor.cpp @@ -3796,9 +3796,9 @@ ACTOR Future storageServer( IKeyValueStore* persistentData, StorageServerI choose { //after a rollback there might be uncommitted changes. //for memory storage engine type, wait until recovery is done before commit - wait(self.storage.commit()) {} + when( wait(self.storage.commit())) {} - wait(memoryStoreRecover (persistentData, connFile, self.thisServerID)) { + when( wait(memoryStoreRecover (persistentData, connFile, self.thisServerID))) { TraceEvent("DisposeStorageServer", self.thisServerID); throw worker_removed(); } From 0ad151e70a7cafc4280b635a4eb2e112fc3fb9e6 Mon Sep 17 00:00:00 2001 From: mengranwo Date: Fri, 21 Jun 2019 14:08:49 -0700 Subject: [PATCH 16/22] style formatting --- fdbserver/storageserver.actor.cpp | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/fdbserver/storageserver.actor.cpp b/fdbserver/storageserver.actor.cpp index 24b8fd9725..376820fae6 100644 --- a/fdbserver/storageserver.actor.cpp +++ b/fdbserver/storageserver.actor.cpp @@ -1318,8 +1318,7 @@ ACTOR Future findKey( StorageServer* data, KeySelectorRef sel, Version vers ASSERT(returnKey != sel.getKey()); return returnKey; - } - else + } else return forward ? range.end : range.begin; } } @@ -3629,9 +3628,9 @@ bool storageServerTerminated(StorageServer& self, IKeyValueStore* persistentData ACTOR Future memoryStoreRecover(IKeyValueStore* store, Reference connFile, UID id) { - if(store->getType() != KeyValueStoreType::MEMORY || connFile.getPtr() == nullptr) { - return Never(); - } + if (store->getType() != KeyValueStoreType::MEMORY || connFile.getPtr() == nullptr) { + return Never(); + } // create a temp client connect to DB Database cx = Database::createDatabase(connFile, Database::API_VERSION_LATEST); @@ -3792,7 +3791,7 @@ ACTOR Future storageServer( IKeyValueStore* persistentData, StorageServerI state double start = now(); TraceEvent("StorageServerRebootStart", self.thisServerID); - wait(self.storage.init()); + wait(self.storage.init()); choose { //after a rollback there might be uncommitted changes. //for memory storage engine type, wait until recovery is done before commit From e54eedf0e2626b4a996bad16d35f5ee0043bb74d Mon Sep 17 00:00:00 2001 From: mengranwo Date: Sun, 30 Jun 2019 16:00:21 -0700 Subject: [PATCH 17/22] Address pr comments, remove wait(tr.commit()) for read-only txn --- fdbserver/storageserver.actor.cpp | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/fdbserver/storageserver.actor.cpp b/fdbserver/storageserver.actor.cpp index 376820fae6..587c438460 100644 --- a/fdbserver/storageserver.actor.cpp +++ b/fdbserver/storageserver.actor.cpp @@ -3648,16 +3648,11 @@ ACTOR Future memoryStoreRecover(IKeyValueStore* store, Reference Date: Tue, 2 Jul 2019 11:28:27 -0700 Subject: [PATCH 18/22] Fix minor bug in External Workload --- fdbserver/workloads/ExternalWorkload.actor.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fdbserver/workloads/ExternalWorkload.actor.cpp b/fdbserver/workloads/ExternalWorkload.actor.cpp index 1323bf775c..5a0bb4a92b 100644 --- a/fdbserver/workloads/ExternalWorkload.actor.cpp +++ b/fdbserver/workloads/ExternalWorkload.actor.cpp @@ -215,7 +215,7 @@ struct ExternalWorkload : TestWorkload, FDBWorkloadContext { Promise promise; auto f = promise.getFuture(); keepAlive(f, database); - workloadImpl->start(reinterpret_cast(database.getPtr()), + workloadImpl->check(reinterpret_cast(database.getPtr()), GenericPromise(new FDBPromiseImpl(promise))); return f; } From 71ba490cf8bdc32ace52e2765c6a02f2b8094416 Mon Sep 17 00:00:00 2001 From: Stephen Atherton Date: Tue, 2 Jul 2019 00:58:43 -0700 Subject: [PATCH 19/22] Removed use of the C "struct hack" as it is not valid C++. Replaced zero-length members with functions returning a pointer for arrays or a reference for single members. --- fdbserver/DeltaTree.h | 56 ++++++++++++++++++------------ fdbserver/VersionedBTree.actor.cpp | 50 ++++++++++++++++---------- 2 files changed, 66 insertions(+), 40 deletions(-) diff --git a/fdbserver/DeltaTree.h b/fdbserver/DeltaTree.h index 4a9bee5c98..6797d87a77 100644 --- a/fdbserver/DeltaTree.h +++ b/fdbserver/DeltaTree.h @@ -69,6 +69,7 @@ // // Retrieves the previously stored boolean // bool getPrefixSource() const; // +#pragma pack(push,1) template struct DeltaTree { @@ -76,36 +77,47 @@ struct DeltaTree { return std::numeric_limits::max(); }; -#pragma pack(push,1) struct Node { OffsetT leftChildOffset; OffsetT rightChildOffset; - DeltaT delta[0]; + + inline DeltaT & delta() { + return *(DeltaT *)(this + 1); + }; + + inline const DeltaT & delta() const { + return *(const DeltaT *)(this + 1); + }; Node * rightChild() const { - //printf("Node(%p): leftOffset=%d rightOffset=%d deltaSize=%d\n", this, (int)leftChildOffset, (int)rightChildOffset, (int)delta->size()); - return rightChildOffset == 0 ? nullptr : (Node *)((uint8_t *)delta + rightChildOffset); + //printf("Node(%p): leftOffset=%d rightOffset=%d deltaSize=%d\n", this, (int)leftChildOffset, (int)rightChildOffset, (int)delta().size()); + return rightChildOffset == 0 ? nullptr : (Node *)((uint8_t *)&delta() + rightChildOffset); } Node * leftChild() const { - //printf("Node(%p): leftOffset=%d rightOffset=%d deltaSize=%d\n", this, (int)leftChildOffset, (int)rightChildOffset, (int)delta->size()); - return leftChildOffset == 0 ? nullptr : (Node *)((uint8_t *)delta + leftChildOffset); + //printf("Node(%p): leftOffset=%d rightOffset=%d deltaSize=%d\n", this, (int)leftChildOffset, (int)rightChildOffset, (int)delta().size()); + return leftChildOffset == 0 ? nullptr : (Node *)((uint8_t *)&delta() + leftChildOffset); } int size() const { - return sizeof(Node) + delta->size(); + return sizeof(Node) + delta().size(); } }; -#pragma pack(pop) -#pragma pack(push,1) struct { OffsetT nodeBytes; // Total size of all Nodes including the root uint8_t initialDepth; // Levels in the tree as of the last rebuild - Node root[0]; }; #pragma pack(pop) + inline Node & root() { + return *(Node *)(this + 1); + } + + inline const Node & root() const { + return *(const Node *)(this + 1); + } + int size() const { return sizeof(DeltaTree) + nodeBytes; } @@ -119,18 +131,18 @@ public: struct DecodedNode { DecodedNode(Node *raw, const T *prev, const T *next, Arena &arena) : raw(raw), parent(nullptr), left(nullptr), right(nullptr), prev(prev), next(next), - item(raw->delta->apply(raw->delta->getPrefixSource() ? *prev : *next, arena)) + item(raw->delta().apply(raw->delta().getPrefixSource() ? *prev : *next, arena)) { - //printf("DecodedNode1 raw=%p delta=%s\n", raw, raw->delta->toString().c_str()); + //printf("DecodedNode1 raw=%p delta=%s\n", raw, raw->delta().toString().c_str()); } DecodedNode(Node *raw, DecodedNode *parent, bool left, Arena &arena) : parent(parent), raw(raw), left(nullptr), right(nullptr), prev(left ? parent->prev : &parent->item), next(left ? &parent->item : parent->next), - item(raw->delta->apply(raw->delta->getPrefixSource() ? *prev : *next, arena)) + item(raw->delta().apply(raw->delta().getPrefixSource() ? *prev : *next, arena)) { - //printf("DecodedNode2 raw=%p delta=%s\n", raw, raw->delta->toString().c_str()); + //printf("DecodedNode2 raw=%p delta=%s\n", raw, raw->delta().toString().c_str()); } Node *raw; @@ -175,7 +187,7 @@ public: lower = new(arena) T(arena, *lower); upper = new(arena) T(arena, *upper); - root = (tree->nodeBytes == 0) ? nullptr : new (arena) DecodedNode(tree->root, lower, upper, arena); + root = (tree->nodeBytes == 0) ? nullptr : new (arena) DecodedNode(&tree->root(), lower, upper, arena); } const T *lowerBound() const { @@ -330,7 +342,7 @@ public: // The boundary leading to the new page acts as the last time we branched right if(begin != end) { - nodeBytes = build(*root, begin, end, prev, next); + nodeBytes = build(root(), begin, end, prev, next); } else { nodeBytes = 0; @@ -341,7 +353,7 @@ public: private: static OffsetT build(Node &root, const T *begin, const T *end, const T *prev, const T *next) { //printf("build: %s to %s\n", begin->toString().c_str(), (end - 1)->toString().c_str()); - //printf("build: root at %p sizeof(Node) %d delta at %p \n", &root, sizeof(Node), root.delta); + //printf("build: root at %p sizeof(Node) %d delta at %p \n", &root, sizeof(Node), &root.delta()); ASSERT(end != begin); int count = end - begin; @@ -370,12 +382,12 @@ private: base = next; } - int deltaSize = item.writeDelta(*root.delta, *base, commonPrefix); - root.delta->setPrefixSource(prefixSourcePrev); - //printf("Serialized %s to %p\n", item.toString().c_str(), root.delta); + int deltaSize = item.writeDelta(root.delta(), *base, commonPrefix); + root.delta().setPrefixSource(prefixSourcePrev); + //printf("Serialized %s to %p\n", item.toString().c_str(), &root.delta()); // Continue writing after the serialized Delta. - uint8_t *wptr = (uint8_t *)root.delta + deltaSize; + uint8_t *wptr = (uint8_t *)&root.delta() + deltaSize; // Serialize left child if(count > 1) { @@ -388,7 +400,7 @@ private: // Serialize right child if(count > 2) { - root.rightChildOffset = wptr - (uint8_t *)root.delta; + root.rightChildOffset = wptr - (uint8_t *)&root.delta(); wptr += build(*(Node *)wptr, begin + mid + 1, end, &item, next); } else { diff --git a/fdbserver/VersionedBTree.actor.cpp b/fdbserver/VersionedBTree.actor.cpp index 5834687548..a926926d5a 100644 --- a/fdbserver/VersionedBTree.actor.cpp +++ b/fdbserver/VersionedBTree.actor.cpp @@ -431,7 +431,14 @@ struct RedwoodRecordRef { }; uint8_t flags; - byte data[]; + + inline byte * data() { + return (byte *)(this + 1); + } + + inline const byte * data() const { + return (const byte *)(this + 1); + } void setPrefixSource(bool val) { if(val) { @@ -447,7 +454,7 @@ struct RedwoodRecordRef { } RedwoodRecordRef apply(const RedwoodRecordRef &base, Arena &arena) const { - Reader r(data); + Reader r(data()); int intFieldSuffixLen = flags & INT_FIELD_SUFFIX_BITS; int prefixLen = r.readVarInt(); @@ -501,19 +508,19 @@ struct RedwoodRecordRef { } int size() const { - Reader r(data); + Reader r(data()); int intFieldSuffixLen = flags & INT_FIELD_SUFFIX_BITS; r.readVarInt(); // prefixlen int valueLen = (flags & HAS_VALUE) ? r.read() : 0; int keySuffixLen = (flags & HAS_KEY_SUFFIX) ? r.readVarInt() : 0; - return sizeof(Delta) + r.rptr - data + intFieldSuffixLen + valueLen + keySuffixLen; + return sizeof(Delta) + r.rptr - data() + intFieldSuffixLen + valueLen + keySuffixLen; } // Delta can't be determined without the RedwoodRecordRef upon which the Delta is based. std::string toString() const { - Reader r(data); + Reader r(data()); std::string flagString = " "; if(flags & PREFIX_SOURCE) flagString += "prefixSource "; @@ -638,7 +645,7 @@ struct RedwoodRecordRef { commonPrefix = getCommonPrefixLen(base, 0); } - Writer w(d.data); + Writer w(d.data()); // prefixLen w.writeVarInt(commonPrefix); @@ -688,7 +695,7 @@ struct RedwoodRecordRef { w.writeString(value.get()); } - return w.wptr - d.data + sizeof(Delta); + return w.wptr - d.data() + sizeof(Delta); } template @@ -737,10 +744,17 @@ struct BTreePage { uint16_t count; uint32_t kvBytes; uint8_t extensionPageCount; - LogicalPageID extensionPages[0]; }; #pragma pack(pop) + inline LogicalPageID * extensionPages() { + return (LogicalPageID *)(this + 1); + } + + inline const LogicalPageID * extensionPages() const { + return (const LogicalPageID *)(this + 1); + } + int size() const { const BinaryTree *t = &tree(); return (uint8_t *)t - (uint8_t *)this + t->size(); @@ -751,15 +765,15 @@ struct BTreePage { } BinaryTree & tree() { - return *(BinaryTree *)(extensionPages + extensionPageCount); + return *(BinaryTree *)(extensionPages() + extensionPageCount); } const BinaryTree & tree() const { - return *(const BinaryTree *)(extensionPages + extensionPageCount); + return *(const BinaryTree *)(extensionPages() + extensionPageCount); } static inline int GetHeaderSize(int extensionPages = 0) { - return sizeof(BTreePage) + extensionPages + sizeof(LogicalPageID); + return sizeof(BTreePage) + (extensionPages * sizeof(LogicalPageID)); } std::string toString(bool write, LogicalPageID id, Version ver, const RedwoodRecordRef *lowerBound, const RedwoodRecordRef *upperBound) const { @@ -1603,7 +1617,7 @@ private: for(int e = 0, eEnd = extPages.size(); e < eEnd; ++e) { LogicalPageID eid = m_pager->allocateLogicalPage(); debug_printf("%p: writePages(): Writing extension page op=write id=%u @%" PRId64 " (%d of %lu) referencePageID=%u\n", actor_debug, eid, version, e + 1, extPages.size(), id); - newPage->extensionPages[e] = bigEndian32(eid); + newPage->extensionPages()[e] = bigEndian32(eid); // If replacing the primary page below (version == 0) then pass the primary page's ID as the reference page ID m_pager->writePage(eid, extPages[e], version, (version == 0) ? id : invalidLogicalPageID); ++counts.extPageWrites; @@ -1620,8 +1634,8 @@ private: // Free the old extension pages now that all replacement pages have been written for(int i = 0; i < originalPage->extensionPageCount; ++i) { - //debug_printf("%p: writePages(): Freeing old extension op=del id=%u @latest\n", actor_debug, bigEndian32(originalPage->extensionPages[i])); - //m_pager->freeLogicalPage(bigEndian32(originalPage->extensionPages[i]), version); + //debug_printf("%p: writePages(): Freeing old extension op=del id=%u @latest\n", actor_debug, bigEndian32(originalPage->extensionPages()[i])); + //m_pager->freeLogicalPage(bigEndian32(originalPage->extensionPages()[i]), version); } return primaryLogicalPageIDs; @@ -1684,8 +1698,8 @@ private: pageGets.push_back(std::move(result)); for(int i = 0; i < pTreePage->extensionPageCount; ++i) { - debug_printf("readPage() Reading extension page op=read id=%u @%" PRId64 " ext=%d/%d\n", bigEndian32(pTreePage->extensionPages[i]), snapshot->getVersion(), i + 1, (int)pTreePage->extensionPageCount); - pageGets.push_back(snapshot->getPhysicalPage(bigEndian32(pTreePage->extensionPages[i]))); + debug_printf("readPage() Reading extension page op=read id=%u @%" PRId64 " ext=%d/%d\n", bigEndian32(pTreePage->extensionPages()[i]), snapshot->getVersion(), i + 1, (int)pTreePage->extensionPageCount); + pageGets.push_back(snapshot->getPhysicalPage(bigEndian32(pTreePage->extensionPages()[i]))); } std::vector> pages = wait(getAll(pageGets)); @@ -3561,12 +3575,12 @@ TEST_CASE("!/redwood/correctness/unit/deltaTree/RedwoodRecordRef") { while(1) { if(fwd.get() != items[i]) { printf("forward iterator i=%d\n %s found\n %s expected\n", i, fwd.get().toString().c_str(), items[i].toString().c_str()); - printf("Delta: %s\n", fwd.node->raw->delta->toString().c_str()); + printf("Delta: %s\n", fwd.node->raw->delta().toString().c_str()); ASSERT(false); } if(rev.get() != items[items.size() - 1 - i]) { printf("reverse iterator i=%d\n %s found\n %s expected\n", i, rev.get().toString().c_str(), items[items.size() - 1 - i].toString().c_str()); - printf("Delta: %s\n", rev.node->raw->delta->toString().c_str()); + printf("Delta: %s\n", rev.node->raw->delta().toString().c_str()); ASSERT(false); } ++i; From 3fb0999e10712e664e6681a339ae8e2130c3189b Mon Sep 17 00:00:00 2001 From: Evan Tschannen Date: Tue, 2 Jul 2019 16:54:47 -0700 Subject: [PATCH 20/22] revert storage server priority changes --- fdbserver/storageserver.actor.cpp | 18 ++++++++---------- flow/network.h | 3 +-- 2 files changed, 9 insertions(+), 12 deletions(-) diff --git a/fdbserver/storageserver.actor.cpp b/fdbserver/storageserver.actor.cpp index 587c438460..ed87da7342 100644 --- a/fdbserver/storageserver.actor.cpp +++ b/fdbserver/storageserver.actor.cpp @@ -2865,11 +2865,11 @@ ACTOR Future updateStorage(StorageServer* data) { if (g_network->isSimulated()) { double endTime = g_simulator.checkDisabled(format("%s/updateStorage", data->thisServerID.toString().c_str())); if(endTime > now()) { - wait(delay(endTime - now(), TaskStorage)); + wait(delay(endTime - now(), TaskUpdateStorage)); } } wait( data->desiredOldestVersion.whenAtLeast( data->storageVersion()+1 ) ); - wait( delay(0, TaskStorage) ); + wait( delay(0, TaskUpdateStorage) ); state Promise durableInProgress; data->durableInProgress = durableInProgress.getFuture(); @@ -2884,10 +2884,10 @@ ACTOR Future updateStorage(StorageServer* data) { state bool done = data->storage.makeVersionMutationsDurable(newOldestVersion, desiredVersion, bytesLeft); // We want to forget things from these data structures atomically with changing oldestVersion (and "before", since oldestVersion.set() may trigger waiting actors) // forgetVersionsBeforeAsync visibly forgets immediately (without waiting) but asynchronously frees memory. - Future finishedForgetting = data->mutableData().forgetVersionsBeforeAsync( newOldestVersion, TaskStorage ); + Future finishedForgetting = data->mutableData().forgetVersionsBeforeAsync( newOldestVersion, TaskUpdateStorage ); data->oldestVersion.set( newOldestVersion ); wait( finishedForgetting ); - wait( yield(TaskStorage) ); + wait( yield(TaskUpdateStorage) ); if (done) break; } @@ -2900,9 +2900,7 @@ ACTOR Future updateStorage(StorageServer* data) { state Future durableDelay = Void(); if (bytesLeft > 0) { - durableDelay = delay(SERVER_KNOBS->STORAGE_COMMIT_INTERVAL, TaskStorage); - } else { - durableDelay = delay(0, TaskUpdateStorage) || delay(SERVER_KNOBS->STORAGE_COMMIT_INTERVAL, TaskStorage); + durableDelay = delay(SERVER_KNOBS->STORAGE_COMMIT_INTERVAL, TaskUpdateStorage); } wait( durable ); @@ -2922,7 +2920,7 @@ ACTOR Future updateStorage(StorageServer* data) { } durableInProgress.send(Void()); - wait( delay(0, TaskStorage) ); //Setting durableInProgess could cause the storage server to shut down, so delay to check for cancellation + wait( delay(0, TaskUpdateStorage) ); //Setting durableInProgess could cause the storage server to shut down, so delay to check for cancellation // Taking and releasing the durableVersionLock ensures that no eager reads both begin before the commit was effective and // are applied after we change the durable version. Also ensure that we have to lock while calling changeDurableVersion, @@ -2931,9 +2929,9 @@ ACTOR Future updateStorage(StorageServer* data) { data->popVersion( data->durableVersion.get() + 1 ); while (!changeDurableVersion( data, newOldestVersion )) { - if(g_network->check_yield(TaskStorage)) { + if(g_network->check_yield(TaskUpdateStorage)) { data->durableVersionLock.release(); - wait(delay(0, TaskStorage)); + wait(delay(0, TaskUpdateStorage)); wait( data->durableVersionLock.take() ); } } diff --git a/flow/network.h b/flow/network.h index 02532ba6ea..18bb6148dc 100644 --- a/flow/network.h +++ b/flow/network.h @@ -65,8 +65,6 @@ enum { TaskDefaultOnMainThread = 7500, TaskDefaultDelay = 7010, TaskDefaultYield = 7000, - TaskDiskWrite = 5030, - TaskStorage = 5020, TaskDiskRead = 5010, TaskDefaultEndpoint = 5000, TaskUnknownEndpoint = 4000, @@ -74,6 +72,7 @@ enum { TaskDataDistributionLaunch = 3530, TaskRatekeeper = 3510, TaskDataDistribution = 3500, + TaskDiskWrite = 3010, TaskUpdateStorage = 3000, TaskTLogSpilledPeekReply = 2800, TaskLowPriority = 2000, From 8c10d832a12f9d13c6848a667cdc782d113e633c Mon Sep 17 00:00:00 2001 From: "A.J. Beamon" Date: Wed, 3 Jul 2019 11:09:36 -0700 Subject: [PATCH 21/22] Add coordinator role in trace events --- fdbserver/Coordination.actor.cpp | 25 +++++++++++++++---------- fdbserver/WorkerInterface.actor.h | 1 + fdbserver/worker.actor.cpp | 1 + 3 files changed, 17 insertions(+), 10 deletions(-) diff --git a/fdbserver/Coordination.actor.cpp b/fdbserver/Coordination.actor.cpp index 4c7ec289f0..a6d38bda4a 100644 --- a/fdbserver/Coordination.actor.cpp +++ b/fdbserver/Coordination.actor.cpp @@ -20,8 +20,9 @@ #include "fdbserver/CoordinationInterface.h" #include "fdbserver/IKeyValueStore.h" -#include "flow/ActorCollection.h" #include "fdbserver/Knobs.h" +#include "fdbserver/WorkerInterface.actor.h" +#include "flow/ActorCollection.h" #include "flow/UnitTest.h" #include "flow/IndexedSet.h" #include "flow/actorcompiler.h" // This must be the last #include. @@ -360,11 +361,11 @@ struct LeaderRegisterCollection { return Void(); } - LeaderElectionRegInterface& getInterface(KeyRef key) { + LeaderElectionRegInterface& getInterface(KeyRef key, UID id) { auto i = registerInterfaces.find( key ); if (i == registerInterfaces.end()) { Key k = key; - Future a = wrap(this, k, leaderRegister(registerInterfaces[k], k) ); + Future a = wrap(this, k, leaderRegister(registerInterfaces[k], k), id); if (a.isError()) throw a.getError(); ASSERT( !a.isReady() ); actors.add( a ); @@ -374,11 +375,15 @@ struct LeaderRegisterCollection { return i->value; } - ACTOR static Future wrap( LeaderRegisterCollection* self, Key key, Future actor ) { + ACTOR static Future wrap( LeaderRegisterCollection* self, Key key, Future actor, UID id ) { state Error e; try { + // FIXME: Get worker ID here + startRole(Role::COORDINATOR, id, UID()); wait(actor); + endRole(Role::COORDINATOR, id, "Coordinator changed"); } catch (Error& err) { + endRole(Role::COORDINATOR, id, err.what(), err.code() == error_code_actor_cancelled, err); if (err.code() == error_code_actor_cancelled) throw; e = err; @@ -392,7 +397,7 @@ struct LeaderRegisterCollection { // leaderServer multiplexes multiple leaderRegisters onto a single LeaderElectionRegInterface, // creating and destroying them on demand. -ACTOR Future leaderServer(LeaderElectionRegInterface interf, OnDemandStore *pStore) { +ACTOR Future leaderServer(LeaderElectionRegInterface interf, OnDemandStore *pStore, UID id) { state LeaderRegisterCollection regs( pStore ); state ActorCollection forwarders(false); @@ -404,21 +409,21 @@ ACTOR Future leaderServer(LeaderElectionRegInterface interf, OnDemandStore if( forward.present() ) req.reply.send( forward.get() ); else - regs.getInterface(req.key).getLeader.send( req ); + regs.getInterface(req.key, id).getLeader.send( req ); } when ( CandidacyRequest req = waitNext( interf.candidacy.getFuture() ) ) { Optional forward = regs.getForward(req.key); if( forward.present() ) req.reply.send( forward.get() ); else - regs.getInterface(req.key).candidacy.send(req); + regs.getInterface(req.key, id).candidacy.send(req); } when ( LeaderHeartbeatRequest req = waitNext( interf.leaderHeartbeat.getFuture() ) ) { Optional forward = regs.getForward(req.key); if( forward.present() ) req.reply.send( false ); else - regs.getInterface(req.key).leaderHeartbeat.send(req); + regs.getInterface(req.key, id).leaderHeartbeat.send(req); } when ( ForwardRequest req = waitNext( interf.forward.getFuture() ) ) { Optional forward = regs.getForward(req.key); @@ -426,7 +431,7 @@ ACTOR Future leaderServer(LeaderElectionRegInterface interf, OnDemandStore req.reply.send( Void() ); else { forwarders.add( LeaderRegisterCollection::setForward( ®s, req.key, ClusterConnectionString(req.conn.toString()) ) ); - regs.getInterface(req.key).forward.send(req); + regs.getInterface(req.key, id).forward.send(req); } } when( wait( forwarders.getResult() ) ) { ASSERT(false); throw internal_error(); } @@ -442,7 +447,7 @@ ACTOR Future coordinationServer(std::string dataFolder) { TraceEvent("CoordinationServer", myID).detail("MyInterfaceAddr", myInterface.read.getEndpoint().getPrimaryAddress()).detail("Folder", dataFolder); try { - wait( localGenerationReg(myInterface, &store) || leaderServer(myLeaderInterface, &store) || store.getError() ); + wait( localGenerationReg(myInterface, &store) || leaderServer(myLeaderInterface, &store, myID) || store.getError() ); throw internal_error(); } catch (Error& e) { TraceEvent("CoordinationServerError", myID).error(e, true); diff --git a/fdbserver/WorkerInterface.actor.h b/fdbserver/WorkerInterface.actor.h index ba33037759..680816aad2 100644 --- a/fdbserver/WorkerInterface.actor.h +++ b/fdbserver/WorkerInterface.actor.h @@ -369,6 +369,7 @@ struct Role { static const Role LOG_ROUTER; static const Role DATA_DISTRIBUTOR; static const Role RATEKEEPER; + static const Role COORDINATOR; std::string roleName; std::string abbreviation; diff --git a/fdbserver/worker.actor.cpp b/fdbserver/worker.actor.cpp index 68a3a48a91..74b6ba0b1d 100644 --- a/fdbserver/worker.actor.cpp +++ b/fdbserver/worker.actor.cpp @@ -1403,3 +1403,4 @@ const Role Role::TESTER("Tester", "TS"); const Role Role::LOG_ROUTER("LogRouter", "LR"); const Role Role::DATA_DISTRIBUTOR("DataDistributor", "DD"); const Role Role::RATEKEEPER("Ratekeeper", "RK"); +const Role Role::COORDINATOR("Coordinator", "CD"); From 5ea2e690167399ef619729f89faa291adfbffebd Mon Sep 17 00:00:00 2001 From: Jingyu Zhou Date: Wed, 3 Jul 2019 10:50:25 -0700 Subject: [PATCH 22/22] Remove a fdbprc header from flow library Flow should be an independent library. --- flow/network.h | 1 - 1 file changed, 1 deletion(-) diff --git a/flow/network.h b/flow/network.h index 18bb6148dc..cb9ffa4a97 100644 --- a/flow/network.h +++ b/flow/network.h @@ -29,7 +29,6 @@ #include "boost/asio.hpp" #include "flow/serialize.h" #include "flow/IRandom.h" -#include "fdbrpc/crc32c.h" enum { TaskMaxPriority = 1000000,