From 8af92a99ed6a38336516ab5e426c1fe1b417b323 Mon Sep 17 00:00:00 2001 From: Trevor Clinkenbeard Date: Wed, 19 Jun 2019 16:53:14 -0700 Subject: [PATCH] Abort reads when connection file changes --- fdbclient/DatabaseContext.h | 4 ++-- fdbclient/NativeAPI.actor.cpp | 41 +++++++++++++++++++++++++++++------ 2 files changed, 36 insertions(+), 9 deletions(-) diff --git a/fdbclient/DatabaseContext.h b/fdbclient/DatabaseContext.h index 56d4275701..997d091f1e 100644 --- a/fdbclient/DatabaseContext.h +++ b/fdbclient/DatabaseContext.h @@ -109,7 +109,7 @@ public: // switchConnectionFile guarantees that any read with a version from the old cluster will not be attempted on the // new cluster. Future switchConnectionFile(Reference standby); - Future recreateWatches(); + Future connectionFileChanged(); bool switchable = false; // private: @@ -137,7 +137,7 @@ public: }; std::map versionBatcher; - AsyncTrigger recreateWatchesTrigger; + AsyncTrigger connectionFileChangedTrigger; // Disallow any reads at a read version lower than minAcceptableReadVersion. This way the client does not have to // trust that the read version (possibly set manually by the application) is actually from the correct cluster. diff --git a/fdbclient/NativeAPI.actor.cpp b/fdbclient/NativeAPI.actor.cpp index 24ce7ea746..9a759e8984 100644 --- a/fdbclient/NativeAPI.actor.cpp +++ b/fdbclient/NativeAPI.actor.cpp @@ -810,7 +810,7 @@ ACTOR static Future switchConnectionFileImpl(ReferenceminAcceptableReadVersion); ASSERT(self->minAcceptableReadVersion != std::numeric_limits::max()); - self->recreateWatchesTrigger.trigger(); + self->connectionFileChangedTrigger.trigger(); return Void(); } catch (Error& e) { TraceEvent("SwitchConnectionFileError").detail("Error", e.what()); @@ -829,8 +829,8 @@ Future DatabaseContext::switchConnectionFile(Reference DatabaseContext::recreateWatches() { - return recreateWatchesTrigger.onTrigger(); +Future DatabaseContext::connectionFileChanged() { + return connectionFileChangedTrigger.onTrigger(); } Database Database::createDatabase( Reference connFile, int apiVersion, LocalityData const& clientLocality, DatabaseContext *preallocatedDb ) { @@ -1424,7 +1424,16 @@ ACTOR Future> getValue( Future version, Key key, Databa startTime = timer_int(); startTimeD = now(); ++cx->transactionPhysicalReads; - state GetValueReply reply = wait( loadBalance( ssi.second, &StorageServerInterface::getValue, GetValueRequest(key, ver, getValueID), TaskDefaultPromiseEndpoint, false, cx->enableLocalityLoadBalance ? &cx->queueModel : NULL ) ); + state GetValueReply reply; + choose { + when(wait(cx->connectionFileChanged())) { throw all_alternatives_failed(); } + when(GetValueReply _reply = + wait(loadBalance(ssi.second, &StorageServerInterface::getValue, + GetValueRequest(key, ver, getValueID), TaskDefaultPromiseEndpoint, false, + cx->enableLocalityLoadBalance ? &cx->queueModel : nullptr))) { + reply = _reply; + } + } double latency = now() - startTimeD; cx->readLatencies.addSample(latency); if (trLogInfo) { @@ -1487,7 +1496,16 @@ ACTOR Future getKey( Database cx, KeySelector k, Future version, T if( info.debugID.present() ) g_traceBatch.addEvent("TransactionDebug", info.debugID.get().first(), "NativeAPI.getKey.Before"); //.detail("StartKey", k.getKey()).detail("Offset",k.offset).detail("OrEqual",k.orEqual); ++cx->transactionPhysicalReads; - GetKeyReply reply = wait( loadBalance( ssi.second, &StorageServerInterface::getKey, GetKeyRequest(k, version.get()), TaskDefaultPromiseEndpoint, false, cx->enableLocalityLoadBalance ? &cx->queueModel : NULL ) ); + state GetKeyReply reply; + choose { + when(wait(cx->connectionFileChanged())) { throw all_alternatives_failed(); } + when(GetKeyReply _reply = + wait(loadBalance(ssi.second, &StorageServerInterface::getKey, GetKeyRequest(k, version.get()), + TaskDefaultPromiseEndpoint, false, + cx->enableLocalityLoadBalance ? &cx->queueModel : nullptr))) { + reply = _reply; + } + } if( info.debugID.present() ) g_traceBatch.addEvent("TransactionDebug", info.debugID.get().first(), "NativeAPI.getKey.After"); //.detail("NextKey",reply.sel.key).detail("Offset", reply.sel.offset).detail("OrEqual", k.orEqual); k = reply.sel; @@ -1651,7 +1669,16 @@ ACTOR Future> getExactRange( Database cx, Version ver .detail("Servers", locations[shard].second->description());*/ } ++cx->transactionPhysicalReads; - GetKeyValuesReply rep = wait( loadBalance( locations[shard].second, &StorageServerInterface::getKeyValues, req, TaskDefaultPromiseEndpoint, false, cx->enableLocalityLoadBalance ? &cx->queueModel : NULL ) ); + state GetKeyValuesReply rep; + choose { + when(wait(cx->connectionFileChanged())) { throw all_alternatives_failed(); } + when(GetKeyValuesReply _rep = + wait(loadBalance(locations[shard].second, &StorageServerInterface::getKeyValues, req, + TaskDefaultPromiseEndpoint, false, + cx->enableLocalityLoadBalance ? &cx->queueModel : nullptr))) { + rep = _rep; + } + } if( info.debugID.present() ) g_traceBatch.addEvent("TransactionDebug", info.debugID.get().first(), "NativeAPI.getExactRange.After"); output.arena().dependsOn( rep.arena ); @@ -2176,7 +2203,7 @@ ACTOR Future watch( Reference watch, Database cx, Transaction *self // NativeAPI watchValue future finishes or errors when(wait(watch->watchFuture)) { break; } - when(wait(cx->recreateWatches())) { + when(wait(cx->connectionFileChanged())) { TEST(true); // Recreated a watch after switch watch->watchFuture = watchValue(cx->minAcceptableReadVersion, watch->key, watch->value, cx, info);