Abort reads when connection file changes

This commit is contained in:
Trevor Clinkenbeard 2019-06-19 16:53:14 -07:00
parent f367fcaf25
commit 8af92a99ed
2 changed files with 36 additions and 9 deletions

View File

@ -109,7 +109,7 @@ public:
// switchConnectionFile guarantees that any read with a version from the old cluster will not be attempted on the
// new cluster.
Future<Void> switchConnectionFile(Reference<ClusterConnectionFile> standby);
Future<Void> recreateWatches();
Future<Void> connectionFileChanged();
bool switchable = false;
// private:
@ -137,7 +137,7 @@ public:
};
std::map<uint32_t, VersionBatcher> versionBatcher;
AsyncTrigger recreateWatchesTrigger;
AsyncTrigger connectionFileChangedTrigger;
// Disallow any reads at a read version lower than minAcceptableReadVersion. This way the client does not have to
// trust that the read version (possibly set manually by the application) is actually from the correct cluster.

View File

@ -810,7 +810,7 @@ ACTOR static Future<Void> switchConnectionFileImpl(Reference<ClusterConnectionFi
.detail("ReadVersion", v)
.detail("MinAcceptableReadVersion", self->minAcceptableReadVersion);
ASSERT(self->minAcceptableReadVersion != std::numeric_limits<Version>::max());
self->recreateWatchesTrigger.trigger();
self->connectionFileChangedTrigger.trigger();
return Void();
} catch (Error& e) {
TraceEvent("SwitchConnectionFileError").detail("Error", e.what());
@ -829,8 +829,8 @@ Future<Void> DatabaseContext::switchConnectionFile(Reference<ClusterConnectionFi
return switchConnectionFileImpl(standby, this);
}
Future<Void> DatabaseContext::recreateWatches() {
return recreateWatchesTrigger.onTrigger();
Future<Void> DatabaseContext::connectionFileChanged() {
return connectionFileChangedTrigger.onTrigger();
}
Database Database::createDatabase( Reference<ClusterConnectionFile> connFile, int apiVersion, LocalityData const& clientLocality, DatabaseContext *preallocatedDb ) {
@ -1424,7 +1424,16 @@ ACTOR Future<Optional<Value>> getValue( Future<Version> version, Key key, Databa
startTime = timer_int();
startTimeD = now();
++cx->transactionPhysicalReads;
state GetValueReply reply = wait( loadBalance( ssi.second, &StorageServerInterface::getValue, GetValueRequest(key, ver, getValueID), TaskDefaultPromiseEndpoint, false, cx->enableLocalityLoadBalance ? &cx->queueModel : NULL ) );
state GetValueReply reply;
choose {
when(wait(cx->connectionFileChanged())) { throw all_alternatives_failed(); }
when(GetValueReply _reply =
wait(loadBalance(ssi.second, &StorageServerInterface::getValue,
GetValueRequest(key, ver, getValueID), TaskDefaultPromiseEndpoint, false,
cx->enableLocalityLoadBalance ? &cx->queueModel : nullptr))) {
reply = _reply;
}
}
double latency = now() - startTimeD;
cx->readLatencies.addSample(latency);
if (trLogInfo) {
@ -1487,7 +1496,16 @@ ACTOR Future<Key> getKey( Database cx, KeySelector k, Future<Version> version, T
if( info.debugID.present() )
g_traceBatch.addEvent("TransactionDebug", info.debugID.get().first(), "NativeAPI.getKey.Before"); //.detail("StartKey", k.getKey()).detail("Offset",k.offset).detail("OrEqual",k.orEqual);
++cx->transactionPhysicalReads;
GetKeyReply reply = wait( loadBalance( ssi.second, &StorageServerInterface::getKey, GetKeyRequest(k, version.get()), TaskDefaultPromiseEndpoint, false, cx->enableLocalityLoadBalance ? &cx->queueModel : NULL ) );
state GetKeyReply reply;
choose {
when(wait(cx->connectionFileChanged())) { throw all_alternatives_failed(); }
when(GetKeyReply _reply =
wait(loadBalance(ssi.second, &StorageServerInterface::getKey, GetKeyRequest(k, version.get()),
TaskDefaultPromiseEndpoint, false,
cx->enableLocalityLoadBalance ? &cx->queueModel : nullptr))) {
reply = _reply;
}
}
if( info.debugID.present() )
g_traceBatch.addEvent("TransactionDebug", info.debugID.get().first(), "NativeAPI.getKey.After"); //.detail("NextKey",reply.sel.key).detail("Offset", reply.sel.offset).detail("OrEqual", k.orEqual);
k = reply.sel;
@ -1651,7 +1669,16 @@ ACTOR Future<Standalone<RangeResultRef>> getExactRange( Database cx, Version ver
.detail("Servers", locations[shard].second->description());*/
}
++cx->transactionPhysicalReads;
GetKeyValuesReply rep = wait( loadBalance( locations[shard].second, &StorageServerInterface::getKeyValues, req, TaskDefaultPromiseEndpoint, false, cx->enableLocalityLoadBalance ? &cx->queueModel : NULL ) );
state GetKeyValuesReply rep;
choose {
when(wait(cx->connectionFileChanged())) { throw all_alternatives_failed(); }
when(GetKeyValuesReply _rep =
wait(loadBalance(locations[shard].second, &StorageServerInterface::getKeyValues, req,
TaskDefaultPromiseEndpoint, false,
cx->enableLocalityLoadBalance ? &cx->queueModel : nullptr))) {
rep = _rep;
}
}
if( info.debugID.present() )
g_traceBatch.addEvent("TransactionDebug", info.debugID.get().first(), "NativeAPI.getExactRange.After");
output.arena().dependsOn( rep.arena );
@ -2176,7 +2203,7 @@ ACTOR Future<Void> watch( Reference<Watch> watch, Database cx, Transaction *self
// NativeAPI watchValue future finishes or errors
when(wait(watch->watchFuture)) { break; }
when(wait(cx->recreateWatches())) {
when(wait(cx->connectionFileChanged())) {
TEST(true); // Recreated a watch after switch
watch->watchFuture =
watchValue(cx->minAcceptableReadVersion, watch->key, watch->value, cx, info);