Revert "CDs reject requests from external cluster or requests need to be forwarded based on an old forward request."
This commit is contained in:
parent
8091d8179d
commit
5e80e16802
|
@ -428,18 +428,12 @@ ACTOR Future<Void> leaderRegister(LeaderElectionRegInterface interf, Key key) {
|
|||
// (they are always derived from a ClusterConnectionString key).
|
||||
// Forwarding values are stored in this range:
|
||||
const KeyRangeRef fwdKeys( LiteralStringRef( "\xff" "fwd" ), LiteralStringRef( "\xff" "fwe" ) );
|
||||
// Forwarding time are stored in this range:
|
||||
const KeyRangeRef fwdTimeKeys(LiteralStringRef("\xff"
|
||||
"fwdTime"),
|
||||
LiteralStringRef("\xff"
|
||||
"fwdTimf"));
|
||||
|
||||
struct LeaderRegisterCollection {
|
||||
// SOMEDAY: Factor this into a generic tool? Extend ActorCollection to support removal actions? What?
|
||||
ActorCollection actors;
|
||||
Map<Key, LeaderElectionRegInterface> registerInterfaces;
|
||||
Map<Key, LeaderInfo> forward;
|
||||
Map<Key, double> forwardStartTime;
|
||||
OnDemandStore *pStore;
|
||||
|
||||
LeaderRegisterCollection( OnDemandStore *pStore ) : actors( false ), pStore( pStore ) {}
|
||||
|
@ -448,21 +442,13 @@ struct LeaderRegisterCollection {
|
|||
if( !self->pStore->exists() )
|
||||
return Void();
|
||||
OnDemandStore &store = *self->pStore;
|
||||
state Future<Standalone<RangeResultRef>> forwardingInfoF = store->readRange(fwdKeys);
|
||||
state Future<Standalone<RangeResultRef>> forwardingTimeF = store->readRange(fwdTimeKeys);
|
||||
wait(success(forwardingInfoF) && success(forwardingTimeF));
|
||||
Standalone<RangeResultRef> forwardingInfo = forwardingInfoF.get();
|
||||
Standalone<RangeResultRef> forwardingTime = forwardingTimeF.get();
|
||||
Standalone<RangeResultRef> forwardingInfo = wait( store->readRange( fwdKeys ) );
|
||||
for( int i = 0; i < forwardingInfo.size(); i++ ) {
|
||||
LeaderInfo forwardInfo;
|
||||
forwardInfo.forward = true;
|
||||
forwardInfo.serializedInfo = forwardingInfo[i].value;
|
||||
self->forward[ forwardingInfo[i].key.removePrefix( fwdKeys.begin ) ] = forwardInfo;
|
||||
}
|
||||
for (int i = 0; i < forwardingTime.size(); i++) {
|
||||
double time = BinaryReader::fromStringRef<double>(forwardingInfo[i].value, IncludeVersion());
|
||||
self->forwardStartTime[forwardingInfo[i].key.removePrefix(fwdTimeKeys.begin)] = time;
|
||||
}
|
||||
return Void();
|
||||
}
|
||||
|
||||
|
@ -470,31 +456,18 @@ struct LeaderRegisterCollection {
|
|||
|
||||
Optional<LeaderInfo> getForward(KeyRef key) {
|
||||
auto i = forward.find( key );
|
||||
auto t = forwardStartTime.find(key);
|
||||
if (i == forward.end())
|
||||
return Optional<LeaderInfo>();
|
||||
if (t != forwardStartTime.end()) {
|
||||
double forwardTime = t->value;
|
||||
if (now() - forwardTime > SERVER_KNOBS->FORWARD_REQUEST_TOO_OLD) {
|
||||
TraceEvent(SevWarnAlways, "AccessOldForward")
|
||||
.detail("ForwardSetSecondsAgo", now() - forwardTime)
|
||||
.detail("ForwardClusterKey", key);
|
||||
}
|
||||
}
|
||||
return i->value;
|
||||
}
|
||||
|
||||
ACTOR static Future<Void> setForward(LeaderRegisterCollection *self, KeyRef key, ClusterConnectionString conn) {
|
||||
double forwardTime = now();
|
||||
LeaderInfo forwardInfo;
|
||||
forwardInfo.forward = true;
|
||||
forwardInfo.serializedInfo = conn.toString();
|
||||
self->forward[ key ] = forwardInfo;
|
||||
self->forwardStartTime[key] = forwardTime;
|
||||
OnDemandStore &store = *self->pStore;
|
||||
store->set( KeyValueRef( key.withPrefix( fwdKeys.begin ), conn.toString() ) );
|
||||
store->set(
|
||||
KeyValueRef(key.withPrefix(fwdTimeKeys.begin), BinaryWriter::toValue(forwardTime, IncludeVersion())));
|
||||
wait(store->commit());
|
||||
return Void();
|
||||
}
|
||||
|
@ -535,8 +508,7 @@ struct LeaderRegisterCollection {
|
|||
|
||||
// leaderServer multiplexes multiple leaderRegisters onto a single LeaderElectionRegInterface,
|
||||
// creating and destroying them on demand.
|
||||
ACTOR Future<Void> leaderServer(LeaderElectionRegInterface interf, OnDemandStore* pStore, UID id,
|
||||
Reference<ClusterConnectionFile> ccf) {
|
||||
ACTOR Future<Void> leaderServer(LeaderElectionRegInterface interf, OnDemandStore *pStore, UID id) {
|
||||
state LeaderRegisterCollection regs( pStore );
|
||||
state ActorCollection forwarders(false);
|
||||
|
||||
|
@ -550,111 +522,53 @@ ACTOR Future<Void> leaderServer(LeaderElectionRegInterface interf, OnDemandStore
|
|||
info.id = deterministicRandom()->randomUniqueID();
|
||||
info.forward = forward.get().serializedInfo;
|
||||
req.reply.send( CachedSerialization<ClientDBInfo>(info) );
|
||||
} else {
|
||||
if (!SERVER_KNOBS->ENABLE_CROSS_CLUSTER_SUPPORT &&
|
||||
(ccf->getConnectionString().clusterKey() != req.clusterKey ||
|
||||
ccf->getConnectionString().coordinators() != req.coordinators)) {
|
||||
TraceEvent(SevWarnAlways, "CCFMismatch")
|
||||
.detail("RequestType", "OpenDatabaseCoordRequest")
|
||||
.detail("LocalCS", ccf->getConnectionString().toString())
|
||||
.detail("IncomingClusterKey", req.clusterKey)
|
||||
.detail("IncomingCoordinators", describeList(req.coordinators, req.coordinators.size()));
|
||||
req.reply.sendError(wrong_connection_file());
|
||||
} else {
|
||||
regs.getInterface(req.clusterKey, id).openDatabase.send( req );
|
||||
}
|
||||
}
|
||||
}
|
||||
when ( ElectionResultRequest req = waitNext( interf.electionResult.getFuture() ) ) {
|
||||
Optional<LeaderInfo> forward = regs.getForward(req.key);
|
||||
if( forward.present() ) {
|
||||
req.reply.send( forward.get() );
|
||||
} else {
|
||||
if (!SERVER_KNOBS->ENABLE_CROSS_CLUSTER_SUPPORT &&
|
||||
(ccf->getConnectionString().clusterKey() != req.key ||
|
||||
ccf->getConnectionString().coordinators() != req.coordinators)) {
|
||||
TraceEvent(SevWarnAlways, "CCFMismatch")
|
||||
.detail("RequestType", "ElectionResultRequest")
|
||||
.detail("LocalCS", ccf->getConnectionString().toString())
|
||||
.detail("IncomingClusterKey", req.key)
|
||||
.detail("IncomingCoordinators", describeList(req.coordinators, req.coordinators.size()));
|
||||
req.reply.sendError(wrong_connection_file());
|
||||
} else {
|
||||
regs.getInterface(req.key, id).electionResult.send( req );
|
||||
}
|
||||
}
|
||||
}
|
||||
when ( GetLeaderRequest req = waitNext( interf.getLeader.getFuture() ) ) {
|
||||
Optional<LeaderInfo> forward = regs.getForward(req.key);
|
||||
if( forward.present() )
|
||||
req.reply.send( forward.get() );
|
||||
else {
|
||||
if (!SERVER_KNOBS->ENABLE_CROSS_CLUSTER_SUPPORT && ccf->getConnectionString().clusterKey() != req.key) {
|
||||
TraceEvent(SevWarnAlways, "CCFMismatch")
|
||||
.detail("RequestType", "GetLeaderRequest")
|
||||
.detail("LocalCS", ccf->getConnectionString().toString())
|
||||
.detail("IncomingClusterKey", req.key);
|
||||
req.reply.sendError(wrong_connection_file());
|
||||
} else {
|
||||
else
|
||||
regs.getInterface(req.key, id).getLeader.send( req );
|
||||
}
|
||||
}
|
||||
}
|
||||
when ( CandidacyRequest req = waitNext( interf.candidacy.getFuture() ) ) {
|
||||
Optional<LeaderInfo> forward = regs.getForward(req.key);
|
||||
if( forward.present() )
|
||||
req.reply.send( forward.get() );
|
||||
else {
|
||||
if (!SERVER_KNOBS->ENABLE_CROSS_CLUSTER_SUPPORT && ccf->getConnectionString().clusterKey() != req.key) {
|
||||
TraceEvent(SevWarnAlways, "CCFMismatch")
|
||||
.detail("RequestType", "CandidacyRequest")
|
||||
.detail("LocalCS", ccf->getConnectionString().toString())
|
||||
.detail("IncomingClusterKey", req.key);
|
||||
req.reply.sendError(wrong_connection_file());
|
||||
} else {
|
||||
else
|
||||
regs.getInterface(req.key, id).candidacy.send(req);
|
||||
}
|
||||
}
|
||||
}
|
||||
when ( LeaderHeartbeatRequest req = waitNext( interf.leaderHeartbeat.getFuture() ) ) {
|
||||
Optional<LeaderInfo> forward = regs.getForward(req.key);
|
||||
if( forward.present() )
|
||||
req.reply.send(LeaderHeartbeatReply{ false });
|
||||
else {
|
||||
if (!SERVER_KNOBS->ENABLE_CROSS_CLUSTER_SUPPORT && ccf->getConnectionString().clusterKey() != req.key) {
|
||||
TraceEvent(SevWarnAlways, "CCFMismatch")
|
||||
.detail("RequestType", "LeaderHeartbeatRequest")
|
||||
.detail("LocalCS", ccf->getConnectionString().toString())
|
||||
.detail("IncomingClusterKey", req.key);
|
||||
req.reply.sendError(wrong_connection_file());
|
||||
} else {
|
||||
else
|
||||
regs.getInterface(req.key, id).leaderHeartbeat.send(req);
|
||||
}
|
||||
}
|
||||
}
|
||||
when ( ForwardRequest req = waitNext( interf.forward.getFuture() ) ) {
|
||||
Optional<LeaderInfo> forward = regs.getForward(req.key);
|
||||
if( forward.present() )
|
||||
req.reply.send( Void() );
|
||||
else {
|
||||
if (!SERVER_KNOBS->ENABLE_CROSS_CLUSTER_SUPPORT && ccf->getConnectionString().clusterKey() != req.key) {
|
||||
TraceEvent(SevWarnAlways, "CCFMismatch")
|
||||
.detail("RequestType", "ForwardRequest")
|
||||
.detail("LocalCS", ccf->getConnectionString().toString())
|
||||
.detail("IncomingClusterKey", req.key);
|
||||
req.reply.sendError(wrong_connection_file());
|
||||
} else {
|
||||
forwarders.add(LeaderRegisterCollection::setForward(®s, req.key,
|
||||
ClusterConnectionString(req.conn.toString())));
|
||||
forwarders.add( LeaderRegisterCollection::setForward( ®s, req.key, ClusterConnectionString(req.conn.toString()) ) );
|
||||
regs.getInterface(req.key, id).forward.send(req);
|
||||
}
|
||||
}
|
||||
}
|
||||
when( wait( forwarders.getResult() ) ) { ASSERT(false); throw internal_error(); }
|
||||
}
|
||||
}
|
||||
|
||||
ACTOR Future<Void> coordinationServer(std::string dataFolder, Reference<ClusterConnectionFile> ccf) {
|
||||
ACTOR Future<Void> coordinationServer(std::string dataFolder) {
|
||||
state UID myID = deterministicRandom()->randomUniqueID();
|
||||
state LeaderElectionRegInterface myLeaderInterface( g_network );
|
||||
state GenerationRegInterface myInterface( g_network );
|
||||
|
@ -663,8 +577,7 @@ ACTOR Future<Void> coordinationServer(std::string dataFolder, Reference<ClusterC
|
|||
TraceEvent("CoordinationServer", myID).detail("MyInterfaceAddr", myInterface.read.getEndpoint().getPrimaryAddress()).detail("Folder", dataFolder);
|
||||
|
||||
try {
|
||||
wait(localGenerationReg(myInterface, &store) || leaderServer(myLeaderInterface, &store, myID, ccf) ||
|
||||
store.getError());
|
||||
wait( localGenerationReg(myInterface, &store) || leaderServer(myLeaderInterface, &store, myID) || store.getError() );
|
||||
throw internal_error();
|
||||
} catch (Error& e) {
|
||||
TraceEvent("CoordinationServerError", myID).error(e, true);
|
||||
|
|
|
@ -209,6 +209,6 @@ public:
|
|||
vector<GenerationRegInterface> stateServers;
|
||||
};
|
||||
|
||||
Future<Void> coordinationServer(std::string const& dataFolder, Reference<ClusterConnectionFile> const& ccf);
|
||||
Future<Void> coordinationServer( std::string const& dataFolder );
|
||||
|
||||
#endif
|
||||
|
|
|
@ -609,8 +609,6 @@ void ServerKnobs::initialize(bool randomize, ClientKnobs* clientKnobs, bool isSi
|
|||
|
||||
// Coordination
|
||||
init( COORDINATED_STATE_ONCONFLICT_POLL_INTERVAL, 1.0 ); if( randomize && BUGGIFY ) COORDINATED_STATE_ONCONFLICT_POLL_INTERVAL = 10.0;
|
||||
init( FORWARD_REQUEST_TOO_OLD, 600.0 ); if( randomize && BUGGIFY ) FORWARD_REQUEST_TOO_OLD = 60.0;
|
||||
init( ENABLE_CROSS_CLUSTER_SUPPORT, true ); if( randomize && BUGGIFY ) ENABLE_CROSS_CLUSTER_SUPPORT = false;
|
||||
|
||||
// Buggification
|
||||
init( BUGGIFIED_EVENTUAL_CONSISTENCY, 1.0 );
|
||||
|
|
|
@ -538,9 +538,6 @@ public:
|
|||
|
||||
// Coordination
|
||||
double COORDINATED_STATE_ONCONFLICT_POLL_INTERVAL;
|
||||
double FORWARD_REQUEST_TOO_OLD;
|
||||
bool ENABLE_CROSS_CLUSTER_SUPPORT; // Allow a coordinator to serve requests whose connection string does not match
|
||||
// the local copy
|
||||
|
||||
// Buggification
|
||||
double BUGGIFIED_EVENTUAL_CONSISTENCY;
|
||||
|
|
|
@ -1133,7 +1133,6 @@ void setupSimulatedSystem(vector<Future<Void>>* systemActors, std::string baseFo
|
|||
TEST( !useIPv6 );
|
||||
|
||||
vector<NetworkAddress> coordinatorAddresses;
|
||||
vector<NetworkAddress> extraCoordinatorAddresses; // Used by extra DB if the DR db is a new one
|
||||
if(minimumRegions > 1) {
|
||||
//do not put coordinators in the primary region so that we can kill that region safely
|
||||
int nonPrimaryDcs = dataCenters/2;
|
||||
|
@ -1142,9 +1141,6 @@ void setupSimulatedSystem(vector<Future<Void>>* systemActors, std::string baseFo
|
|||
for(int m = 0; m < dcCoordinators; m++) {
|
||||
auto ip = makeIPAddressForSim(useIPv6, { 2, dc, 1, m });
|
||||
coordinatorAddresses.push_back(NetworkAddress(ip, sslEnabled && !sslOnly ? 2 : 1, true, sslEnabled && sslOnly));
|
||||
auto extraIp = makeIPAddressForSim(useIPv6, { 4, dc, 1, m });
|
||||
extraCoordinatorAddresses.push_back(
|
||||
NetworkAddress(extraIp, sslEnabled && !sslOnly ? 2 : 1, true, sslEnabled && sslOnly));
|
||||
TraceEvent("SelectedCoordinator").detail("Address", coordinatorAddresses.back());
|
||||
}
|
||||
}
|
||||
|
@ -1170,9 +1166,6 @@ void setupSimulatedSystem(vector<Future<Void>>* systemActors, std::string baseFo
|
|||
} else {
|
||||
auto ip = makeIPAddressForSim(useIPv6, { 2, dc, 1, m });
|
||||
coordinatorAddresses.push_back(NetworkAddress(ip, sslEnabled && !sslOnly ? 2 : 1, true, sslEnabled && sslOnly));
|
||||
auto extraIp = makeIPAddressForSim(useIPv6, { 4, dc, 1, m });
|
||||
extraCoordinatorAddresses.push_back(
|
||||
NetworkAddress(extraIp, sslEnabled && !sslOnly ? 2 : 1, true, sslEnabled && sslOnly));
|
||||
TraceEvent("SelectedCoordinator").detail("Address", coordinatorAddresses.back()).detail("M", m).detail("Machines", machines).detail("Assigned", assignedMachines).detail("DcCoord", dcCoordinators).detail("P1", (m+1==dcCoordinators)).detail("P2", (assignedMachines<4)).detail("P3", (assignedMachines+machines-dcCoordinators>=4)).detail("CoordinatorCount", coordinatorCount);
|
||||
}
|
||||
assignedMachines++;
|
||||
|
@ -1200,13 +1193,10 @@ void setupSimulatedSystem(vector<Future<Void>>* systemActors, std::string baseFo
|
|||
// If extraDB==0, leave g_simulator.extraDB as null because the test does not use DR.
|
||||
if(extraDB==1) {
|
||||
// The DR database can be either a new database or itself
|
||||
g_simulator.extraDB =
|
||||
BUGGIFY ? new ClusterConnectionString(coordinatorAddresses, LiteralStringRef("TestCluster:0"))
|
||||
: new ClusterConnectionString(extraCoordinatorAddresses, LiteralStringRef("ExtraCluster:0"));
|
||||
g_simulator.extraDB = new ClusterConnectionString(coordinatorAddresses, BUGGIFY ? LiteralStringRef("TestCluster:0") : LiteralStringRef("ExtraCluster:0"));
|
||||
} else if(extraDB==2) {
|
||||
// The DR database is a new database
|
||||
g_simulator.extraDB =
|
||||
new ClusterConnectionString(extraCoordinatorAddresses, LiteralStringRef("ExtraCluster:0"));
|
||||
g_simulator.extraDB = new ClusterConnectionString(coordinatorAddresses, LiteralStringRef("ExtraCluster:0"));
|
||||
} else if(extraDB==3) {
|
||||
// The DR database is the same database
|
||||
g_simulator.extraDB = new ClusterConnectionString(coordinatorAddresses, LiteralStringRef("TestCluster:0"));
|
||||
|
|
|
@ -1756,7 +1756,7 @@ ACTOR Future<Void> fdbd(
|
|||
if (coordFolder.size()) {
|
||||
// SOMEDAY: remove the fileNotFound wrapper and make DiskQueue construction safe from errors setting up
|
||||
// their files
|
||||
actors.push_back(fileNotFoundToNever(coordinationServer(coordFolder, coordinators.ccf)));
|
||||
actors.push_back(fileNotFoundToNever(coordinationServer(coordFolder)));
|
||||
}
|
||||
|
||||
state UID processIDUid = wait(createAndLockProcessIdFile(dataFolder));
|
||||
|
|
|
@ -72,7 +72,6 @@ ERROR( connection_idle, 1049, "Connection closed after idle timeout" )
|
|||
ERROR( disk_adapter_reset, 1050, "The disk queue adpater reset" )
|
||||
ERROR( batch_transaction_throttled, 1051, "Batch GRV request rate limit exceeded")
|
||||
ERROR( dd_cancelled, 1052, "Data distribution components cancelled")
|
||||
ERROR( wrong_connection_file, 1053, "Connection file mismatch")
|
||||
|
||||
ERROR( broken_promise, 1100, "Broken promise" )
|
||||
ERROR( operation_cancelled, 1101, "Asynchronous operation cancelled" )
|
||||
|
|
Loading…
Reference in New Issue