simulator: Support multiple listeners on single process

Sim2Listener can now take the network address to listen on. This is
used to listen to multiple ports in simulator and test the patch
which added multiple network addresses to single endpoint.
This commit is contained in:
Vishesh Yadav 2018-12-06 11:48:50 -08:00
parent 3eb9b23024
commit e04abf25f7
3 changed files with 60 additions and 61 deletions

View File

@ -676,7 +676,10 @@ struct SimDiskSpace {
void doReboot( ISimulator::ProcessInfo* const& p, ISimulator::KillType const& kt );
struct Sim2Listener : IListener, ReferenceCounted<Sim2Listener> {
explicit Sim2Listener( ISimulator::ProcessInfo* process ) : process(process) {}
explicit Sim2Listener( ISimulator::ProcessInfo* process, const NetworkAddress& listenAddr )
: process(process),
address(listenAddr) {}
void incomingConnection( double seconds, Reference<IConnection> conn ) { // Called by another process!
incoming( Reference<Sim2Listener>::addRef( this ), seconds, conn );
}
@ -688,7 +691,7 @@ struct Sim2Listener : IListener, ReferenceCounted<Sim2Listener> {
return popOne( nextConnection.getFuture() );
}
virtual NetworkAddress getListenAddress() { return process->address; }
virtual NetworkAddress getListenAddress() { return address; }
private:
ISimulator::ProcessInfo* process;
@ -707,6 +710,8 @@ private:
((Sim2Conn*)c.getPtr())->opened = true;
return c;
}
NetworkAddress address;
};
#define g_sim2 ((Sim2&)g_simulator)
@ -775,9 +780,11 @@ public:
Reference<Sim2Conn> myc( new Sim2Conn( getCurrentProcess() ) );
Reference<Sim2Conn> peerc( new Sim2Conn( peerp ) );
myc->connect(peerc, toAddr); peerc->connect(myc, NetworkAddress( getCurrentProcess()->address.ip + g_random->randomInt(0,256), g_random->randomInt(40000, 60000) ));
myc->connect(peerc, toAddr);
peerc->connect(myc, NetworkAddress( getCurrentProcess()->address.ip + g_random->randomInt(0,256),
g_random->randomInt(40000, 60000) ));
((Sim2Listener*)peerp->listener.getPtr())->incomingConnection( 0.5*g_random->random01(), Reference<IConnection>(peerc) );
((Sim2Listener*)peerp->getListener(toAddr).getPtr())->incomingConnection( 0.5*g_random->random01(), Reference<IConnection>(peerc) );
return onConnect( ::delay(0.5*g_random->random01()), myc );
}
virtual Future<std::vector<NetworkAddress>> resolveTCPEndpoint( std::string host, std::string service) {
@ -794,8 +801,9 @@ public:
}
virtual Reference<IListener> listen( NetworkAddress localAddr ) {
ASSERT( !localAddr.isTLS() );
ASSERT( localAddr == getCurrentProcess()->address );
return Reference<IListener>( getCurrentProcess()->listener );
Reference<IListener> listener( getCurrentProcess()->getListener(localAddr) );
ASSERT(listener);
return listener;
}
ACTOR static Future<Reference<IConnection>> waitForProcessAndConnect(
NetworkAddress toAddr, INetworkConnections *self ) {
@ -949,7 +957,7 @@ public:
virtual void run() {
_run(this);
}
virtual ProcessInfo* newProcess(const char* name, uint32_t ip, uint16_t port,
virtual ProcessInfo* newProcess(const char* name, uint32_t ip, uint16_t port, uint16_t listenPerProcess,
LocalityData locality, ProcessClass startingClass, const char* dataFolder, const char* coordinationFolder) {
ASSERT( locality.zoneId().present() );
MachineInfo& machine = machines[ locality.zoneId().get() ];
@ -969,19 +977,26 @@ public:
// These files must live on after process kills for sim purposes.
if( machine.machineProcess == 0 ) {
NetworkAddress machineAddress(ip, 0, false, false);
machine.machineProcess = new ProcessInfo("Machine", locality, startingClass, machineAddress, this, "", "");
machine.machineProcess = new ProcessInfo("Machine", locality, startingClass, {machineAddress}, this, "", "");
machine.machineProcess->machine = &machine;
}
NetworkAddress address(ip, port, true, false); // SOMEDAY see above about becoming SSL!
ProcessInfo* m = new ProcessInfo(name, locality, startingClass, address, this, dataFolder, coordinationFolder);
m->listener = Reference<IListener>( new Sim2Listener(m) );
NetworkAddressList addresses;
for (int processPort = port; processPort < port + listenPerProcess; ++processPort) {
addresses.emplace_back(ip, processPort, true, false);
}
ProcessInfo* m = new ProcessInfo(name, locality, startingClass, addresses, this, dataFolder, coordinationFolder);
for (int processPort = port; processPort < port + listenPerProcess; ++processPort) {
NetworkAddress address(ip, processPort, true, false); // SOMEDAY see above about becoming SSL!
m->listenerMap[address] = Reference<IListener>( new Sim2Listener(m, address) );
addressMap[address] = m;
}
m->machine = &machine;
machine.processes.push_back(m);
currentlyRebootingProcesses.erase(address);
addressMap[ m->address ] = m;
m->excluded = g_simulator.isExcluded(address);
m->cleared = g_simulator.isCleared(address);
currentlyRebootingProcesses.erase(addresses[0]);
m->excluded = g_simulator.isExcluded(addresses[0]);
m->cleared = g_simulator.isCleared(addresses[0]);
m->setGlobal(enTDMetrics, (flowGlobalType) &m->tdmetrics);
m->setGlobal(enNetworkConnections, (flowGlobalType) m->network);
@ -1514,7 +1529,7 @@ public:
Sim2() : time(0.0), taskCount(0), yielded(false), yield_limit(0), currentTaskID(-1) {
// Not letting currentProcess be NULL eliminates some annoying special cases
currentProcess = new ProcessInfo( "NoMachine", LocalityData(Optional<Standalone<StringRef>>(), StringRef(), StringRef(), StringRef()), ProcessClass(), NetworkAddress(), this, "", "" );
currentProcess = new ProcessInfo( "NoMachine", LocalityData(Optional<Standalone<StringRef>>(), StringRef(), StringRef(), StringRef()), ProcessClass(), {NetworkAddress()}, this, "", "" );
g_network = net2 = newNet2(false, true);
Net2FileSystem::newFileSystem();
check_yield(0);

View File

@ -49,11 +49,12 @@ public:
const char* coordinationFolder;
const char* dataFolder;
MachineInfo* machine;
NetworkAddressList addresses;
NetworkAddress address;
LocalityData locality;
ProcessClass startingClass;
TDMetricCollection tdmetrics;
Reference<IListener> listener;
std::map<NetworkAddress, Reference<IListener>> listenerMap;
bool failed;
bool excluded;
bool cleared;
@ -66,9 +67,9 @@ public:
uint64_t fault_injection_r;
double fault_injection_p1, fault_injection_p2;
ProcessInfo(const char* name, LocalityData locality, ProcessClass startingClass, NetworkAddress address,
ProcessInfo(const char* name, LocalityData locality, ProcessClass startingClass, NetworkAddressList addresses,
INetworkConnections *net, const char* dataFolder, const char* coordinationFolder )
: name(name), locality(locality), startingClass(startingClass), address(address), dataFolder(dataFolder),
: name(name), locality(locality), startingClass(startingClass), addresses(addresses), address(addresses[0]), dataFolder(dataFolder),
network(net), coordinationFolder(coordinationFolder), failed(false), excluded(false), cpuTicks(0),
rebooting(false), fault_injection_p1(0), fault_injection_p2(0),
fault_injection_r(0), machine(0), cleared(false) {}
@ -98,6 +99,15 @@ public:
}
}
const Reference<IListener> getListener(const NetworkAddress& addr) {
auto listener = listenerMap.find(addr);
if (listener == listenerMap.end()) {
TraceEvent("VISHESH: GetListenerFailed").detail("Address", addr.toString());
return Reference<IListener>();
}
return listener->second;
}
inline flowGlobalType global(int id) { return (globals.size() > id) ? globals[id] : NULL; };
inline void setGlobal(size_t id, flowGlobalType v) { globals.resize(std::max(globals.size(),id+1)); globals[id] = v; };
@ -120,31 +130,12 @@ public:
MachineInfo() : machineProcess(0) {}
};
template <class Func>
ProcessInfo* asNewProcess( const char* name, uint32_t ip, uint16_t port, LocalityData locality, ProcessClass startingClass,
Func func, const char* dataFolder, const char* coordinationFolder ) {
ProcessInfo* m = newProcess(name, ip, port, locality, startingClass, dataFolder, coordinationFolder);
// ProcessInfo* m = newProcess(name, ip, port, zoneId, machineId, dcId, startingClass, dataFolder, coordinationFolder);
std::swap(m, currentProcess);
try {
func();
} catch (Error& e) {
TraceEvent(SevError, "NewMachineError").error(e);
killProcess(currentProcess, KillInstantly);
} catch (...) {
TraceEvent(SevError, "NewMachineError").error(unknown_error());
killProcess(currentProcess, KillInstantly);
}
std::swap(m, currentProcess);
return m;
}
ProcessInfo* getProcess( Endpoint const& endpoint ) { return getProcessByAddress(endpoint.getPrimaryAddress()); }
ProcessInfo* getCurrentProcess() { return currentProcess; }
virtual Future<Void> onProcess( ISimulator::ProcessInfo *process, int taskID = -1 ) = 0;
virtual Future<Void> onMachine( ISimulator::ProcessInfo *process, int taskID = -1 ) = 0;
virtual ProcessInfo* newProcess(const char* name, uint32_t ip, uint16_t port, LocalityData locality, ProcessClass startingClass, const char* dataFolder, const char* coordinationFolder) = 0;
virtual ProcessInfo* newProcess(const char* name, uint32_t ip, uint16_t port, uint16_t listenPerProcess, LocalityData locality, ProcessClass startingClass, const char* dataFolder, const char* coordinationFolder) = 0;
virtual void killProcess( ProcessInfo* machine, KillType ) = 0;
virtual void rebootProcess(Optional<Standalone<StringRef>> zoneId, bool allProcesses ) = 0;
virtual void rebootProcess( ProcessInfo* process, KillType kt ) = 0;

View File

@ -198,6 +198,7 @@ ACTOR Future<ISimulator::KillType> simulatedFDBDRebooter(
bool sslEnabled,
Reference<TLSOptions> tlsOptions,
uint16_t port,
uint16_t listenPerProcess,
LocalityData localities,
ProcessClass processClass,
std::string* dataFolder,
@ -221,7 +222,7 @@ ACTOR Future<ISimulator::KillType> simulatedFDBDRebooter(
wait( delay( waitTime ) );
state ISimulator::ProcessInfo *process = g_simulator.newProcess( "Server", ip, port, localities, processClass, dataFolder->c_str(), coordFolder->c_str() );
state ISimulator::ProcessInfo *process = g_simulator.newProcess( "Server", ip, port, listenPerProcess, localities, processClass, dataFolder->c_str(), coordFolder->c_str() );
wait( g_simulator.onProcess(process, TaskDefaultYield) ); // Now switch execution to the process on which we will run
state Future<ISimulator::KillType> onShutdown = process->onShutdown();
@ -251,13 +252,21 @@ ACTOR Future<ISimulator::KillType> simulatedFDBDRebooter(
if (sslEnabled) {
tlsOptions->register_network();
}
NetworkAddress n(ip, port, true, sslEnabled);
Future<Void> listen = FlowTransport::transport().bind( n, n );
vector<Future<Void>> futures;
for (int listenPort = port; listenPort < port + listenPerProcess; ++listenPort) {
NetworkAddress n(ip, listenPort, true, sslEnabled);
futures.push_back(FlowTransport::transport().bind( n, n ));
}
Future<Void> fd = fdbd( connFile, localities, processClass, *dataFolder, *coordFolder, 500e6, "", "");
Future<Void> backup = runBackupAgents ? runBackup(connFile) : Future<Void>(Never());
Future<Void> dr = runBackupAgents ? runDr(connFile) : Future<Void>(Never());
wait(listen || fd || success(onShutdown) || backup || dr);
futures.push_back(fd);
futures.push_back(backup);
futures.push_back(dr);
futures.push_back(success(onShutdown));
wait( waitForAny(futures) );
} catch (Error& e) {
// If in simulation, if we make it here with an error other than io_timeout but enASIOTimedOut is set then somewhere an io_timeout was converted to a different error.
if(g_network->isSimulated() && e.code() != error_code_io_timeout && (bool)g_network->global(INetwork::enASIOTimedOut))
@ -406,7 +415,7 @@ ACTOR Future<Void> simulatedMachine(
for( int i = 0; i < ips.size(); i++ ) {
std::string path = joinPath(myFolders[i], "fdb.cluster");
Reference<ClusterConnectionFile> clusterFile(useSeedFile ? new ClusterConnectionFile(path, connStr.toString()) : new ClusterConnectionFile(path));
processes.push_back(simulatedFDBDRebooter(clusterFile, ips[i], sslEnabled, tlsOptions, i + 1, localities, processClass, &myFolders[i], &coordFolders[i], baseFolder, connStr, useSeedFile, runBackupAgents));
processes.push_back(simulatedFDBDRebooter(clusterFile, ips[i], sslEnabled, tlsOptions, i*1 + 1, 1, localities, processClass, &myFolders[i], &coordFolders[i], baseFolder, connStr, useSeedFile, runBackupAgents));
TraceEvent("SimulatedMachineProcess", randomId).detail("Address", NetworkAddress(ips[i], i+1, true, false)).detailext("ZoneId", localities.zoneId()).detailext("DataHall", localities.dataHallId()).detail("Folder", myFolders[i]);
}
@ -1204,22 +1213,6 @@ void setupSimulatedSystem( vector<Future<Void>> *systemActors, std::string baseF
baseFolder, false, i == useSeedForMachine, false ),
"SimulatedTesterMachine") );
}
/*int testerCount = g_random->randomInt(4, 9);
for(int i=0; i<testerCount; i++)
g_simulator.asNewProcess("TestWorker", 0x03040301 + i, LocalityData(g_random->randomUniqueID().toString(), Optional<Standalone<StringRef>>()), [&] {
vector<Future<Void>> v;
Reference<AsyncVar<ClusterControllerFullInterface>> cc( new AsyncVar<ClusterControllerFullInterface> );
Reference<AsyncVar<ClusterInterface>> ci( new AsyncVar<ClusterInterface> );
v.push_back( monitorLeader( coordinators, cc ) );
v.push_back( extractClusterInterface(cc,ci) );
v.push_back( failureMonitorClient( ci ) );
v.push_back( testerServer( cc ) );
systemActors->push_back( waitForAll(v) );
});*/
*pStartingConfiguration = startingConfigString;
// save some state that we only need when restarting the simulator.
@ -1283,7 +1276,7 @@ ACTOR void setupAndRun(std::string dataFolder, const char *testFile, bool reboot
checkExtraDB(testFile, extraDB, minimumReplication, minimumRegions);
wait( g_simulator.onProcess( g_simulator.newProcess(
"TestSystem", 0x01010101, 1, LocalityData(Optional<Standalone<StringRef>>(), Standalone<StringRef>(g_random->randomUniqueID().toString()), Optional<Standalone<StringRef>>(), Optional<Standalone<StringRef>>()), ProcessClass(ProcessClass::TesterClass, ProcessClass::CommandLineSource), "", "" ), TaskDefaultYield ) );
"TestSystem", 0x01010101, 1, 1, LocalityData(Optional<Standalone<StringRef>>(), Standalone<StringRef>(g_random->randomUniqueID().toString()), Optional<Standalone<StringRef>>(), Optional<Standalone<StringRef>>()), ProcessClass(ProcessClass::TesterClass, ProcessClass::CommandLineSource), "", "" ), TaskDefaultYield ) );
Sim2FileSystem::newFileSystem();
FlowTransport::createInstance(1);
if (tlsOptions->enabled()) {