This commit is contained in:
Vishesh Yadav 2019-01-09 07:41:02 -08:00
parent 209ecd09ee
commit 51b89ae083
9 changed files with 91 additions and 49 deletions

2
.gitignore vendored
View File

@ -82,3 +82,5 @@ compile_commands.json
.envrc
.DS_Store
temp/
.cquery_cached_index

View File

@ -2369,6 +2369,7 @@ ACTOR static Future<Void> commitDummyTransaction( Database cx, KeyRange range, T
loop {
try {
TraceEvent("CommitDummyTransaction").detail("Key", printable(range.begin)).detail("Retries", retries);
tr.debugTransaction(g_debug_random->randomUniqueID());
tr.options = options;
tr.info.taskID = info.taskID;
tr.setOption( FDBTransactionOptions::ACCESS_SYSTEM_KEYS );

View File

@ -168,6 +168,10 @@ public:
if (localAddresses.empty()) {
return NetworkAddress();
}
if (localAddresses.size() < 2) {
// TraceEvent("VISHESHGetFirstLocalAddress").detail("Size", localAddresses.size());
return localAddresses[0];
}
return localAddresses[0];
}
@ -877,31 +881,30 @@ void FlowTransport::loadedEndpoint( Endpoint& endpoint ) {
void FlowTransport::addPeerReference( const Endpoint& endpoint, NetworkMessageReceiver* receiver ) {
if (!receiver->isStream() || !endpoint.getPrimaryAddress().isValid()) return;
for (const NetworkAddress& address : endpoint.addresses) {
Peer* peer = self->getPeer(address);
if(peer->peerReferences == -1) {
peer->peerReferences = 1;
} else {
peer->peerReferences++;
}
const NetworkAddress& connectAddress = endpoint.getCompatibleAddress();
Peer* peer = self->getPeer(connectAddress);
if(peer->peerReferences == -1) {
peer->peerReferences = 1;
} else {
peer->peerReferences++;
}
}
void FlowTransport::removePeerReference( const Endpoint& endpoint, NetworkMessageReceiver* receiver ) {
if (!receiver->isStream() || !endpoint.getPrimaryAddress().isValid()) return;
for (const NetworkAddress& address : endpoint.addresses) {
Peer* peer = self->getPeer(address, false);
if(peer) {
peer->peerReferences--;
if(peer->peerReferences < 0) {
TraceEvent(SevError, "InvalidPeerReferences")
.detail("References", peer->peerReferences)
.detail("Address", address)
.detail("Token", endpoint.token);
}
if(peer->peerReferences == 0 && peer->reliable.empty() && peer->unsent.empty()) {
peer->incompatibleDataRead.trigger();
}
const NetworkAddress& connectAddress = endpoint.getCompatibleAddress();
Peer* peer = self->getPeer(connectAddress, false);
if(peer) {
peer->peerReferences--;
if(peer->peerReferences < 0) {
TraceEvent(SevError, "InvalidPeerReferences")
.detail("References", peer->peerReferences)
.detail("Address", endpoint.getPrimaryAddress())
.detail("ConnectAddress", connectAddress)
.detail("Token", endpoint.token);
}
if(peer->peerReferences == 0 && peer->reliable.empty() && peer->unsent.empty()) {
peer->incompatibleDataRead.trigger();
}
}
}
@ -953,7 +956,9 @@ static PacketID sendPacket( TransportData* self, ISerializeSource const& what, c
++self->countPacketsGenerated;
Peer* peer = self->getPeer(destination.getRandomAddress(), openConnection);
const NetworkAddress& selectedAddress = destination.getCompatibleAddress();
Peer* peer = self->getPeer(selectedAddress, openConnection);
TraceEvent("SendPacket").detail("PeerAddress", selectedAddress.toString());
// If there isn't an open connection, a public address, or the peer isn't compatible, we can't send
if (!peer || (peer->outgoingConnectionIdle && !destination.getPrimaryAddress().isPublic()) || (peer->incompatibleProtocolVersionNewer && destination.token != WLTOKEN_PING_PACKET)) {

View File

@ -48,12 +48,16 @@ public:
return addresses[0];
}
const NetworkAddress& getRandomAddress() const {
return addresses[g_random->randomChoice(addresses)];
const NetworkAddress& getCompatibleAddress() const {
if (addresses.size() < 2) {
// TraceEvent("VISHESHGetCompatibleAddress").detail("Size", addresses.size());
return addresses[0];
}
return addresses[0];
}
bool operator == (Endpoint const& r) const {
return addresses == r.addresses && token == r.token;
return getPrimaryAddress() == r.getPrimaryAddress() && token == r.token;
}
bool operator != (Endpoint const& r) const {
return !(*this == r);
@ -61,8 +65,8 @@ public:
//TODO: (Vishesh) Figure out what to do for vector of addresses this.
bool operator < (Endpoint const& r) const {
const NetworkAddress& left = addresses[0];
const NetworkAddress& right = r.addresses[0];
const NetworkAddress& left = getPrimaryAddress();
const NetworkAddress& right = r.getPrimaryAddress();
if (left != right)
return left < right;
else
@ -71,11 +75,15 @@ public:
template <class Ar>
void serialize(Ar& ar) {
if (ar.isDeserializing && ar.protocolVersion() < 0x0FDB00B061020002LL) {
ar & addresses[0] & token;
} else {
// if (ar.isDeserializing && ar.protocolVersion() < 0x0FDB00B061020001LL) {
// ar & addresses[0] & token;
// } else {
const char* msg = ar.isDeserializing ? "EndpointDeserializing" : "EndpointSerializing";
ar & addresses & token;
}
TraceEvent(msg).detail("Size", addresses.size())
.detail("Address", getPrimaryAddress())
.detail("CompatibleAddress", getCompatibleAddress());
// }
}
};
#pragma pack(pop)
@ -155,9 +163,14 @@ public:
loadedEndpoint(e);
}
const Endpoint& findEndpoint(const NetworkAddress& addr) {
return addressToEndpointMap[addr];
}
private:
class TransportData* self;
std::map<NetworkAddress, Endpoint> addressToEndpointMap;
void loadedEndpoint(Endpoint&);
};

View File

@ -62,7 +62,7 @@ struct FlowReceiver : private NetworkMessageReceiver {
FlowTransport::transport().addWellKnownEndpoint(endpoint, this, taskID);
}
private:
protected:
Endpoint endpoint;
bool m_isLocalEndpoint;
};
@ -204,6 +204,9 @@ struct NetNotifiedQueue : NotifiedQueue<T>, FlowReceiver, FastAllocated<NetNotif
virtual void destroy() { delete this; }
virtual void receive(ArenaReader& reader) {
TraceEvent("NetNotifiedReceive").detail("Size", endpoint.addresses.size())
.detail("PrimaryAddress", endpoint.getPrimaryAddress())
.detail("CompatibleAddress", endpoint.getCompatibleAddress());
this->addPromiseRef();
T message;
reader >> message;

View File

@ -186,7 +186,7 @@ struct Sim2Conn : IConnection, ReferenceCounted<Sim2Conn> {
this->peerEndpoint = peerEndpoint;
// Every one-way connection gets a random permanent latency and a random send buffer for the duration of the connection
auto latency = g_clogging.setPairLatencyIfNotSet( peerProcess->address.ip, process->address.ip, FLOW_KNOBS->MAX_CLOGGING_LATENCY*g_random->random01() );
auto latency = g_clogging.setPairLatencyIfNotSet( peerEndpoint.ip, process->address.ip, FLOW_KNOBS->MAX_CLOGGING_LATENCY*g_random->random01() );
sendBufSize = std::max<double>( g_random->randomInt(0, 5000000), 25e6 * (latency + .002) );
TraceEvent("Sim2Connection").detail("SendBufSize", sendBufSize).detail("Latency", latency);
}
@ -305,6 +305,7 @@ private:
}
ACTOR static Future<Void> receiver( Sim2Conn* self ) {
loop {
TraceEvent("Received from").detail("PeerAddr", self->peerEndpoint.toString());
if (self->sentBytes.get() != self->receivedBytes.get())
wait( g_simulator.onProcess( self->peerProcess ) );
while ( self->sentBytes.get() == self->receivedBytes.get() )
@ -359,17 +360,17 @@ private:
}
void rollRandomClose() {
if (now() - g_simulator.lastConnectionFailure > g_simulator.connectionFailuresDisableDuration && g_random->random01() < .00001) {
g_simulator.lastConnectionFailure = now();
double a = g_random->random01(), b = g_random->random01();
TEST(true); // Simulated connection failure
TraceEvent("ConnectionFailure", dbgid).detail("MyAddr", process->address).detail("PeerAddr", peerProcess->address).detail("SendClosed", a > .33).detail("RecvClosed", a < .66).detail("Explicit", b < .3);
if (a < .66 && peer) peer->closeInternal();
if (a > .33) closeInternal();
// At the moment, we occasionally notice the connection failed immediately. In principle, this could happen but only after a delay.
if (b < .3)
throw connection_failed();
}
// if (now() - g_simulator.lastConnectionFailure > g_simulator.connectionFailuresDisableDuration && g_random->random01() < .00001) {
// g_simulator.lastConnectionFailure = now();
// double a = g_random->random01(), b = g_random->random01();
// TEST(true); // Simulated connection failure
// TraceEvent("ConnectionFailure", dbgid).detail("MyAddr", process->address).detail("PeerAddr", peerProcess->address).detail("SendClosed", a > .33).detail("RecvClosed", a < .66).detail("Explicit", b < .3);
// if (a < .66 && peer) peer->closeInternal();
// if (a > .33) closeInternal();
// // At the moment, we occasionally notice the connection failed immediately. In principle, this could happen but only after a delay.
// if (b < .3)
// throw connection_failed();
// }
}
ACTOR static Future<Void> trackLeakedConnection( Sim2Conn* self ) {
@ -702,7 +703,9 @@ private:
wait( delay( seconds ) );
if (((Sim2Conn*)conn.getPtr())->isPeerGone() && g_random->random01()<0.5)
return;
TraceEvent("Sim2IncomingConn", conn->getDebugID());
TraceEvent("Sim2IncomingConn", conn->getDebugID())
.detail("ListenAddress", self->getListenAddress())
.detail("PeerAddress", conn->getPeerAddress());
self->nextConnection.send( conn );
}
ACTOR static Future<Reference<IConnection>> popOne( FutureStream< Reference<IConnection> > conns ) {

View File

@ -72,7 +72,10 @@ public:
: name(name), locality(locality), startingClass(startingClass), addresses(addresses), address(addresses[0]), dataFolder(dataFolder),
network(net), coordinationFolder(coordinationFolder), failed(false), excluded(false), cpuTicks(0),
rebooting(false), fault_injection_p1(0), fault_injection_p2(0),
fault_injection_r(0), machine(0), cleared(false) {}
fault_injection_r(0), machine(0), cleared(false) {
ASSERT(addresses.size() >= 1);
}
Future<KillType> onShutdown() { return shutdownSignal.getFuture(); }
@ -81,6 +84,10 @@ public:
bool isExcluded() const { return excluded; }
bool isCleared() const { return cleared; }
const NetworkAddress& getPrimaryAddress() {
return addresses[0];
}
// Returns true if the class represents an acceptable worker
bool isAvailableClass() const {
switch (startingClass._class) {
@ -112,8 +119,10 @@ public:
inline void setGlobal(size_t id, flowGlobalType v) { globals.resize(std::max(globals.size(),id+1)); globals[id] = v; };
std::string toString() const {
const NetworkAddress& address = addresses[0];
return format("name: %s address: %d.%d.%d.%d:%d zone: %s datahall: %s class: %s excluded: %d cleared: %d",
name, (address.ip>>24)&0xff, (address.ip>>16)&0xff, (address.ip>>8)&0xff, address.ip&0xff, address.port, (locality.zoneId().present() ? locality.zoneId().get().printable().c_str() : "[unset]"), (locality.dataHallId().present() ? locality.dataHallId().get().printable().c_str() : "[unset]"), startingClass.toString().c_str(), excluded, cleared); }
name, (address.ip>>24)&0xff, (address.ip>>16)&0xff, (address.ip>>8)&0xff, address.ip&0xff, address.port, (locality.zoneId().present() ? locality.zoneId().get().printable().c_str() : "[unset]"), (locality.dataHallId().present() ? locality.dataHallId().get().printable().c_str() : "[unset]"), startingClass.toString().c_str(), excluded, cleared);
}
// Members not for external use
Promise<KillType> shutdownSignal;

View File

@ -380,6 +380,7 @@ ACTOR Future<Void> simulatedMachine(
state std::vector<std::string> myFolders;
state std::vector<std::string> coordFolders;
state UID randomId = g_nondeterministic_random->randomUniqueID();
state int listenPerProcess = 2; // g_random->randomInt(1, 3);
try {
CSimpleIni ini;
@ -415,8 +416,9 @@ ACTOR Future<Void> simulatedMachine(
for( int i = 0; i < ips.size(); i++ ) {
std::string path = joinPath(myFolders[i], "fdb.cluster");
Reference<ClusterConnectionFile> clusterFile(useSeedFile ? new ClusterConnectionFile(path, connStr.toString()) : new ClusterConnectionFile(path));
processes.push_back(simulatedFDBDRebooter(clusterFile, ips[i], sslEnabled, tlsOptions, i*1 + 1, 1, localities, processClass, &myFolders[i], &coordFolders[i], baseFolder, connStr, useSeedFile, runBackupAgents));
TraceEvent("SimulatedMachineProcess", randomId).detail("Address", NetworkAddress(ips[i], i+1, true, false)).detailext("ZoneId", localities.zoneId()).detailext("DataHall", localities.dataHallId()).detail("Folder", myFolders[i]);
const int listenPort = i*listenPerProcess + 1;
processes.push_back(simulatedFDBDRebooter(clusterFile, ips[i], sslEnabled, tlsOptions, listenPort, listenPerProcess, localities, processClass, &myFolders[i], &coordFolders[i], baseFolder, connStr, useSeedFile, runBackupAgents));
TraceEvent("SimulatedMachineProcess", randomId).detail("Address", NetworkAddress(ips[i], listenPort, true, false)).detailext("ZoneId", localities.zoneId()).detailext("DataHall", localities.dataHallId()).detail("Folder", myFolders[i]);
}
TEST( bootCount >= 1 ); // Simulated machine rebooted
@ -1117,6 +1119,10 @@ void setupSimulatedSystem( vector<Future<Void>> *systemActors, std::string baseF
g_random->randomShuffle(coordinatorAddresses);
ASSERT( coordinatorAddresses.size() == coordinatorCount );
printf("Coordinator Address: %d\n", coordinatorAddresses.size());
for (const auto& coord : coordinatorAddresses) {
printf("Coordinator: %s\n", coord.toString().c_str());
}
ClusterConnectionString conn(coordinatorAddresses, LiteralStringRef("TestCluster:0"));
// If extraDB==0, leave g_simulator.extraDB as null because the test does not use DR.

View File

@ -55,7 +55,7 @@ using namespace boost::asio::ip;
//
// xyzdev
// vvvv
const uint64_t currentProtocolVersion = 0x0FDB00B061020002LL;
const uint64_t currentProtocolVersion = 0x0FDB00B061020001LL;
const uint64_t compatibleProtocolVersionMask = 0xffffffffffff0000LL;
const uint64_t minValidProtocolVersion = 0x0FDB00A200060001LL;