Merge pull request #4935 from dlambrig/expired-forward
This commit is contained in:
commit
4395d46ee8
|
@ -655,6 +655,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
|
|||
|
||||
// Coordination
|
||||
init( COORDINATED_STATE_ONCONFLICT_POLL_INTERVAL, 1.0 ); if( randomize && BUGGIFY ) COORDINATED_STATE_ONCONFLICT_POLL_INTERVAL = 10.0;
|
||||
init( FORWARD_REQUEST_TOO_OLD, 4*24*60*60 ); if( randomize && BUGGIFY ) FORWARD_REQUEST_TOO_OLD = 60.0;
|
||||
init( ENABLE_CROSS_CLUSTER_SUPPORT, true ); if( randomize && BUGGIFY ) ENABLE_CROSS_CLUSTER_SUPPORT = false;
|
||||
init( COORDINATOR_LEADER_CONNECTION_TIMEOUT, 20.0 );
|
||||
|
||||
|
|
|
@ -594,6 +594,7 @@ public:
|
|||
double COORDINATED_STATE_ONCONFLICT_POLL_INTERVAL;
|
||||
bool ENABLE_CROSS_CLUSTER_SUPPORT; // Allow a coordinator to serve requests whose connection string does not match
|
||||
// the local descriptor
|
||||
double FORWARD_REQUEST_TOO_OLD; // Do not forward requests older than this setting
|
||||
double COORDINATOR_LEADER_CONNECTION_TIMEOUT;
|
||||
|
||||
// Buggification
|
||||
|
|
|
@ -465,12 +465,18 @@ const KeyRangeRef fwdKeys(LiteralStringRef("\xff"
|
|||
LiteralStringRef("\xff"
|
||||
"fwe"));
|
||||
|
||||
// The time when forwarding was last set is stored in this range:
|
||||
const KeyRangeRef fwdTimeKeys(LiteralStringRef("\xff"
|
||||
"fwdTime"),
|
||||
LiteralStringRef("\xff"
|
||||
"fwdTimf"));
|
||||
struct LeaderRegisterCollection {
|
||||
// SOMEDAY: Factor this into a generic tool? Extend ActorCollection to support removal actions? What?
|
||||
ActorCollection actors;
|
||||
Map<Key, LeaderElectionRegInterface> registerInterfaces;
|
||||
Map<Key, LeaderInfo> forward;
|
||||
OnDemandStore* pStore;
|
||||
Map<Key, double> forwardStartTime;
|
||||
|
||||
LeaderRegisterCollection(OnDemandStore* pStore) : actors(false), pStore(pStore) {}
|
||||
|
||||
|
@ -478,32 +484,58 @@ struct LeaderRegisterCollection {
|
|||
if (!self->pStore->exists())
|
||||
return Void();
|
||||
OnDemandStore& store = *self->pStore;
|
||||
RangeResult forwardingInfo = wait(store->readRange(fwdKeys));
|
||||
state Future<Standalone<RangeResultRef>> forwardingInfoF = store->readRange(fwdKeys);
|
||||
state Future<Standalone<RangeResultRef>> forwardingTimeF = store->readRange(fwdTimeKeys);
|
||||
wait(success(forwardingInfoF) && success(forwardingTimeF));
|
||||
Standalone<RangeResultRef> forwardingInfo = forwardingInfoF.get();
|
||||
Standalone<RangeResultRef> forwardingTime = forwardingTimeF.get();
|
||||
for (int i = 0; i < forwardingInfo.size(); i++) {
|
||||
LeaderInfo forwardInfo;
|
||||
forwardInfo.forward = true;
|
||||
forwardInfo.serializedInfo = forwardingInfo[i].value;
|
||||
self->forward[forwardingInfo[i].key.removePrefix(fwdKeys.begin)] = forwardInfo;
|
||||
}
|
||||
for (int i = 0; i < forwardingTime.size(); i++) {
|
||||
double time = BinaryReader::fromStringRef<double>(forwardingTime[i].value, Unversioned());
|
||||
self->forwardStartTime[forwardingTime[i].key.removePrefix(fwdTimeKeys.begin)] = time;
|
||||
}
|
||||
return Void();
|
||||
}
|
||||
|
||||
Future<Void> onError() { return actors.getResult(); }
|
||||
|
||||
// Check if the this coordinator is no longer the leader, and the new one was stored in the "forward" keyspace.
|
||||
// If the "forward" keyspace was set some time ago (as configured by knob), log an error to indicate the client is
|
||||
// using a very old cluster file.
|
||||
Optional<LeaderInfo> getForward(KeyRef key) {
|
||||
auto i = forward.find(key);
|
||||
auto t = forwardStartTime.find(key);
|
||||
if (i == forward.end())
|
||||
return Optional<LeaderInfo>();
|
||||
if (t != forwardStartTime.end()) {
|
||||
double forwardTime = t->value;
|
||||
if (now() - forwardTime > SERVER_KNOBS->FORWARD_REQUEST_TOO_OLD) {
|
||||
TraceEvent(SevWarnAlways, "AccessOldForward")
|
||||
.detail("ForwardSetSecondsAgo", now() - forwardTime)
|
||||
.detail("ForwardClusterKey", key);
|
||||
}
|
||||
}
|
||||
return i->value;
|
||||
}
|
||||
|
||||
// When the lead coordinator changes, store the new connection ID in the "fwd" keyspace.
|
||||
// If a request arrives using an old connection id, resend it to the new coordinator using the stored connection id.
|
||||
// Store when this change took place in the fwdTime keyspace.
|
||||
ACTOR static Future<Void> setForward(LeaderRegisterCollection* self, KeyRef key, ClusterConnectionString conn) {
|
||||
double forwardTime = now();
|
||||
LeaderInfo forwardInfo;
|
||||
forwardInfo.forward = true;
|
||||
forwardInfo.serializedInfo = conn.toString();
|
||||
self->forward[key] = forwardInfo;
|
||||
self->forwardStartTime[key] = forwardTime;
|
||||
OnDemandStore& store = *self->pStore;
|
||||
store->set(KeyValueRef(key.withPrefix(fwdKeys.begin), conn.toString()));
|
||||
store->set(KeyValueRef(key.withPrefix(fwdTimeKeys.begin), BinaryWriter::toValue(forwardTime, Unversioned())));
|
||||
wait(store->commit());
|
||||
return Void();
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue