diff --git a/fdbclient/MultiVersionTransaction.actor.cpp b/fdbclient/MultiVersionTransaction.actor.cpp index 0fa775ce63..8fe5976c74 100644 --- a/fdbclient/MultiVersionTransaction.actor.cpp +++ b/fdbclient/MultiVersionTransaction.actor.cpp @@ -18,15 +18,18 @@ * limitations under the License. */ +#include "fdbclient/CoordinationInterface.h" #include "fdbclient/MultiVersionTransaction.h" #include "fdbclient/MultiVersionAssignmentVars.h" +#include "fdbclient/Status.h" #include "fdbclient/ThreadSafeTransaction.h" +#include "fdbrpc/fdbrpc.h" #include "flow/Platform.h" #include "flow/UnitTest.h" -#include "flow/actorcompiler.h" // This must be the last #include. #include "flow/network.h" +#include "flow/actorcompiler.h" // This must be the last #include. void throwIfError(FdbCApi::fdb_error_t e) { if(e) { @@ -718,6 +721,8 @@ MultiVersionDatabase::MultiVersionDatabase(MultiVersionApi *api, std::string clu dbState->db = db; dbState->dbVar->set(db); + std::cout << "TRYING TO ADD CONNECTION" << std::endl; + if(!openConnectors) { dbState->currentClientIndex = 0; } @@ -906,6 +911,133 @@ void MultiVersionDatabase::DatabaseState::stateChanged() { currentClientIndex = newIndex; } +ACTOR Future getServerProtocol(Endpoint endpoint) { + RequestStream requestStream{ endpoint }; + auto f = retryBrokenPromise(requestStream, ProtocolInfoRequest{}); + std::cout << " MAKING REQUEST FOR PROTOCOL INFO FROM: " << requestStream.getEndpoint().addresses.toString() << std::endl; + + while(!f.isReady()) { // TODO figure out how to make this work + if(f.isError() || !f.isValid()) { + std::cout << "SOMEE ERROR OCCURED" << std::endl; + } + } + ProtocolInfoReply res = wait(f); + + std::cout << "GOT VERSION: " << res.version.version() << std::endl; + return Void(); +} + +// Check if a quorum of coordination servers is reachable +// Will not throw, will just return non-present Optional if error +ACTOR Future> multiVersionClientCoordinatorsStatusFetcher(Reference f, bool *quorum_reachable, int *coordinatorsFaultTolerance) { + try { + std::cout << "TYING TO GET STATUS" << std::endl; + state ClientCoordinators coord(f); + state StatusObject statusObj; + + state vector>> leaderServers; + for (int i = 0; i < coord.clientLeaderServers.size(); i++) { + leaderServers.push_back(retryBrokenPromise(coord.clientLeaderServers[i].getLeader, GetLeaderRequest(coord.clusterKey, UID()), TaskPriority::CoordinationReply)); + } + + state vector> coordProtocols; + coordProtocols.reserve(coord.clientLeaderServers.size()); + for (int i = 0; i < coord.clientLeaderServers.size(); i++) { + std::cout << "SENDING TO : " << coord.clientLeaderServers[i].getLeader.getEndpoint().addresses.toString() << std::endl; + RequestStream requestStream{ Endpoint{ + { coord.clientLeaderServers[i].getLeader.getEndpoint().addresses }, WLTOKEN_PROTOCOL_INFO } }; + Future r = retryBrokenPromise(requestStream, ProtocolInfoRequest{}); + while(!r.isReady()) { // TODO figure out how to make this work + if(r.isError() || !r.isValid()) { + std::cout << "SOMEE ERROR OCCURED" << std::endl; + } + } + coordProtocols.push_back(r); + // coordProtocols.push_back(retryBrokenPromise(requestStream, ProtocolInfoRequest{})); + } + // wait(getServerProtocol(Endpoint{{ coord.clientLeaderServers[0].getLeader.getEndpoint().addresses }, WLTOKEN_PROTOCOL_INFO } )); + // return Optional(); + + + wait(waitForAll(coordProtocols)); + + for(Future r : coordProtocols) { + std::cout << "PROTOCOL VESRION: " << r.get().version.version() << std::endl; + } + + std::cout << "WAITING FOR LEADER SERVERS" << std::endl; + wait(smartQuorum(leaderServers, leaderServers.size() / 2 + 1, 1.5) && + smartQuorum(coordProtocols, coordProtocols.size() / 2 + 1, 1.5) || + delay(2.0)); + std::cout << "GOT LEADER SERVERS" << std::endl; + + statusObj["quorum_reachable"] = *quorum_reachable = quorum(leaderServers, leaderServers.size() / 2 + 1).isReady(); + + StatusArray coordsStatus; + int coordinatorsUnavailable = 0; + for (int i = 0; i < leaderServers.size(); i++) { + StatusObject coordStatus; + coordStatus["address"] = coord.clientLeaderServers[i].getLeader.getEndpoint().getPrimaryAddress().toString(); + + if (leaderServers[i].isReady()){ + coordStatus["reachable"] = true; + } + else { + coordinatorsUnavailable++; + coordStatus["reachable"] = false; + } + if (coordProtocols[i].isReady()) { + coordStatus["protocol"] = coordProtocols[i].get().version.version(); + } + coordsStatus.push_back(coordStatus); + } + statusObj["coordinators"] = coordsStatus; + + *coordinatorsFaultTolerance = (leaderServers.size() - 1) / 2 - coordinatorsUnavailable; + return statusObj; + } + catch (Error &e){ + *quorum_reachable = false; + return Optional(); + } +} + +void getCoordinatorProtocolFromStatusObject(StatusObjectReader statusObj) { + // StatusObjectReader statusObjClient; + // statusObj.get("client", statusObjClient); + StatusObjectReader statusObjCoordinators; + StatusArray coordinatorsArr; + + if (statusObj.get("coordinators", statusObjCoordinators)) { + std::cout << "IN HERE" << std::endl; + // Look for a second "coordinators", under the first one. + if (statusObjCoordinators.has("coordinators")) + coordinatorsArr = statusObjCoordinators.last().get_array(); + } + + for (StatusObjectReader coor : coordinatorsArr){ + uint64_t version; + coor.get("protocol", version); + std::cout << "COORD PROTOCOL VERSION: " << version << std::endl; + } +} + +ACTOR Future getCoordinatorProtocols(Reference f) { + state bool quorum_reachable = false; + state int coordinatorsFaultTolerance = 0; + + Optional statusObjOptional = wait(multiVersionClientCoordinatorsStatusFetcher(f, &quorum_reachable, &coordinatorsFaultTolerance)); + if(!quorum_reachable) { + std::cout << "QUORUM NOT REACHABLE" << std::endl; + } + std::cout << "GOT STATUS OBJECT" << std::endl; + + StatusObject statusObj= statusObjOptional.get(); + getCoordinatorProtocolFromStatusObject(statusObj); + + return Void(); +} + // TODO: adding connections here and attempt connections void MultiVersionDatabase::DatabaseState::addConnection(Reference client, std::string clusterFilePath) { // do check if compatible - call StatusClient::statusFetcher(Database)? @@ -1061,7 +1193,7 @@ void MultiVersionApi::addExternalLibraryDirectory(std::string path) { if(externalClients.count(filename) == 0) { TraceEvent("AddingExternalClient").detail("LibraryPath", filename); externalClients[filename] = Reference(new ClientInfo(new DLApi(lib), lib)); - } + } } } @@ -1154,8 +1286,10 @@ void MultiVersionApi::setNetworkOptionInternal(FDBNetworkOptions::Option option, } } +// TODO: should the check be happening here? Before we setup network? the network should be setup before external connections are added void MultiVersionApi::setupNetwork() { if(!externalClient) { + std::cout << "SETTING UP ENV VARIABLES" << std::endl; loadEnvironmentVariableNetworkOptions(); } @@ -1181,12 +1315,14 @@ void MultiVersionApi::setupNetwork() { } localClient->loadProtocolVersion(); + std::cout << "SET UP NETWORK LOCAL CLIENT PROTOCOL VERSION: " << localClient->protocolVersion.version() << std::endl; if(!bypassMultiClientApi) { runOnExternalClients([this](Reference client) { TraceEvent("InitializingExternalClient").detail("LibraryPath", client->libPath); client->api->selectApiVersion(apiVersion); client->loadProtocolVersion(); + std::cout << "EXTERNAL CLIENT PROTOCOL VERSION: " << client->protocolVersion.version() << std::endl; }); MutexHolder holder(lock); @@ -1229,6 +1365,40 @@ void MultiVersionApi::runNetwork() { lock.leave(); + std::cout << "STARTING NETWORK THREAD" << std::endl; + + // TODO: ONLY RUN NETWORK THREADS OF OTHER CLIENTS IF WE KNOW THAT THEY ARE COMPATIBLE?? + // OR ALLOW THE NETWORK THREADS TO START, BUT KILL THEM AFTER REALIZING THEY'RE INCOMPATIBLE + // MAYBE: ONLY CREATE NETWORK THREAD FOR CURRENT CLIENT. THEN WHEN CREATING DATABASE, START NETWORK THREADS FOR OTHER AS WELL? + + // std::vector handles; + // if(!bypassMultiClientApi) { + // runOnExternalClients([&handles](Reference client) { + // if(client->external) { + // handles.push_back(g_network->startThread(&runNetworkThread, client.getPtr())); + // } + // }); + // } + + localClient->api->runNetwork(); + + // for(auto h : handles) { + // waitThread(h); + // } + // std::cout << "DONE WAITING FOR EXTERNAL CLIENTS" << std::endl; +} + +void MultiVersionApi::runExternalNetwork() { + lock.enter(); + if(!networkSetup) { + lock.leave(); + throw network_not_setup(); + } + + lock.leave(); + + std::cout << "STARTING EXTERNAL NETWORK THREAD" << std::endl; + std::vector handles; if(!bypassMultiClientApi) { runOnExternalClients([&handles](Reference client) { @@ -1243,6 +1413,7 @@ void MultiVersionApi::runNetwork() { for(auto h : handles) { waitThread(h); } + std::cout << "DONE WAITING FOR EXTERNAL CLIENTS" << std::endl; } void MultiVersionApi::stopNetwork() { @@ -1280,20 +1451,33 @@ void MultiVersionApi::addNetworkThreadCompletionHook(void (*hook)(void*), void * } Reference MultiVersionApi::createDatabase(const char *clusterFilePath) { + std::cout << "CREATING DATABASE FROM MULTIVERSIONAPI" << std::endl; lock.enter(); if(!networkSetup) { + std::cout << "NETWORK NOT SETUP" << std::endl; lock.leave(); throw network_not_setup(); } lock.leave(); std::string clusterFile(clusterFilePath); + + // TODO: may need to keep map of compatible clients per cluster file + // how to wait to get coordinator protocol? createDatabase cannot be an Actor + + // May have to setup a reciever on this end? + + // async try to find which clients are compatible. Set map varialbe to true if compatible? + auto f = Reference(new ClusterConnectionFile(clusterFile)); + getCoordinatorProtocols(f); + if(localClientDisabled) { return Reference(new MultiVersionDatabase(this, clusterFile, Reference())); } auto db = localClient->api->createDatabase(clusterFilePath); if(bypassMultiClientApi) { + std::cout << "IS BYPASS" << std::endl; return db; } else { diff --git a/fdbclient/MultiVersionTransaction.h b/fdbclient/MultiVersionTransaction.h index ce561f1d37..6d7550ecc7 100644 --- a/fdbclient/MultiVersionTransaction.h +++ b/fdbclient/MultiVersionTransaction.h @@ -20,6 +20,8 @@ #ifndef FDBCLIENT_MULTIVERSIONTRANSACTION_H #define FDBCLIENT_MULTIVERSIONTRANSACTION_H +#include "fdbclient/CoordinationInterface.h" +#include "fdbrpc/fdbrpc.h" #pragma once #include "fdbclient/FDBOptions.g.h" @@ -27,6 +29,7 @@ #include "fdbclient/IClientApi.h" #include "flow/ThreadHelper.actor.h" +#include "flow/actorcompiler.h" // This must be the last #include. struct FdbCApi : public ThreadSafeReferenceCounted { typedef struct future FDBFuture; @@ -415,8 +418,12 @@ private: void setNetworkOptionInternal(FDBNetworkOptions::Option option, Optional value); + void runExternalNetwork(); + Reference localClient; std::map> externalClients; + std::map> clusterFileCompatibleClients; // should this be scoped to clusterfile or coordinator address + volatile std::map clusterFileCompatibleSet; bool networkStartSetup; volatile bool networkSetup; diff --git a/fdbrpc/FlowTransport.actor.cpp b/fdbrpc/FlowTransport.actor.cpp index d5ce4dde59..6f16b42624 100644 --- a/fdbrpc/FlowTransport.actor.cpp +++ b/fdbrpc/FlowTransport.actor.cpp @@ -996,14 +996,13 @@ ACTOR static Future connectionReader( BinaryReader pktReader(unprocessed_begin, connectPacketSize, AssumeVersion(protocolVersion)); ConnectPacket pkt; serializer(pktReader, pkt); - std::cout << "INCOMING PKT VERSION: " << pkt.protocolVersion.version() << std::endl; + // std::cout << "INCOMING PKT VERSION: " << pkt.protocolVersion.version() << " FROM " << peer->destination.toString() << " TO " << conn->getPeerAddress().toString() << std::endl; uint64_t connectionId = pkt.connectionId; if (!pkt.protocolVersion.hasObjectSerializerFlag() || // !pkt.protocolVersion.hasStableInterfaces()) { !pkt.protocolVersion.isCompatible(g_network->protocolVersion())) { - // !pkt.protocolVersion.isCompatible(ProtocolVersion(ProtocolVersion::minValidProtocolVersion))) { // TODO DELETE THIS incompatibleProtocolVersionNewer = pkt.protocolVersion > g_network->protocolVersion(); NetworkAddress addr = pkt.canonicalRemotePort ? NetworkAddress(pkt.canonicalRemoteIp(), pkt.canonicalRemotePort) @@ -1066,6 +1065,7 @@ ACTOR static Future connectionReader( peerAddress = NetworkAddress(pkt.canonicalRemoteIp(), pkt.canonicalRemotePort, true, peerAddress.isTLS()); } + // TODO: try to get or open peer here peer = transport->getOrOpenPeer(peerAddress, false); peer->compatible = compatible; peer->incompatibleProtocolVersionNewer = incompatibleProtocolVersionNewer; @@ -1161,6 +1161,8 @@ Reference TransportData::getPeer( NetworkAddress const& address ) { } Reference TransportData::getOrOpenPeer( NetworkAddress const& address, bool startConnectionKeeper ) { + // std::cout << "Get or open peer " << address.toString() << std::endl; + // TraceEvent("Get Or open peer"); auto peer = getPeer(address); if(!peer) { peer = Reference( new Peer(this, address) ); diff --git a/fdbrpc/sim2.actor.cpp b/fdbrpc/sim2.actor.cpp index 22486139a9..5c2e04e561 100644 --- a/fdbrpc/sim2.actor.cpp +++ b/fdbrpc/sim2.actor.cpp @@ -1030,6 +1030,7 @@ public: ProcessInfo* m = new ProcessInfo(name, locality, startingClass, addresses, this, dataFolder, coordinationFolder); for (int processPort = port; processPort < port + listenPerProcess; ++processPort) { NetworkAddress address(ip, processPort, true, sslEnabled && processPort == port); + TraceEvent("SETTING UP SIM2LIstenr"); m->listenerMap[address] = Reference( new Sim2Listener(m, address) ); addressMap[address] = m; } @@ -1704,7 +1705,7 @@ public: return Void(); return delay( 0, taskID, process->machine->machineProcess ); } - + virtual ProtocolVersion protocolVersion() override { ASSERT(processProtocolVersion.find(getCurrentProcess()) != processProtocolVersion.end()); return processProtocolVersion.at(getCurrentProcess()); diff --git a/fdbserver/ClusterController.actor.cpp b/fdbserver/ClusterController.actor.cpp index 1ecf7aa5a8..956d1c1f5b 100644 --- a/fdbserver/ClusterController.actor.cpp +++ b/fdbserver/ClusterController.actor.cpp @@ -2950,6 +2950,7 @@ ACTOR Future clusterControllerCore( ClusterControllerFullInterface interf, registerWorker( req, &self ); } when( GetWorkersRequest req = waitNext( interf.getWorkers.getFuture() ) ) { + std::cout << "GOT WORKER FROM: " << req.reply.getEndpoint().addresses.toString() << std::endl; ++self.getWorkersRequests; vector workers; diff --git a/fdbserver/Coordination.actor.cpp b/fdbserver/Coordination.actor.cpp index 96bababe1c..de636538ca 100644 --- a/fdbserver/Coordination.actor.cpp +++ b/fdbserver/Coordination.actor.cpp @@ -24,10 +24,13 @@ #include "fdbserver/WorkerInterface.actor.h" #include "fdbserver/Status.h" #include "flow/ActorCollection.h" +#include "flow/ProtocolVersion.h" #include "flow/UnitTest.h" #include "flow/IndexedSet.h" #include "fdbclient/MonitorLeader.h" #include "flow/actorcompiler.h" // This must be the last #include. +#include "flow/network.h" +#include // This module implements coordinationServer() and the interfaces in CoordinationInterface.h @@ -204,7 +207,7 @@ TEST_CASE("/fdbserver/Coordination/localGenerationReg/simple") { } ACTOR Future openDatabase(ClientData* db, int* clientCount, Reference> hasConnectedClients, OpenDatabaseCoordRequest req) { - // std::cout << "OPEN DATAASE" << std::endl; + std::cout << "OPEN DATABASE" << std::endl; ++(*clientCount); hasConnectedClients->set(true); @@ -464,6 +467,7 @@ struct LeaderRegisterCollection { } LeaderElectionRegInterface& getInterface(KeyRef key, UID id) { + std::cout << "TRYING TO GET INTERFACE" << std::endl; auto i = registerInterfaces.find( key ); if (i == registerInterfaces.end()) { Key k = key; @@ -505,6 +509,10 @@ ACTOR Future leaderServer(LeaderElectionRegInterface interf, OnDemandStore wait( LeaderRegisterCollection::init( ®s ) ); + // if(g_network->protocolVersion() == currentProtocolVersion){ + // loop choose {} + // } + loop choose { when ( OpenDatabaseCoordRequest req = waitNext( interf.openDatabase.getFuture() ) ) { Optional forward = regs.getForward(req.clusterKey); diff --git a/fdbserver/SimulatedCluster.actor.cpp b/fdbserver/SimulatedCluster.actor.cpp index 023b0240ff..5204e56ba1 100644 --- a/fdbserver/SimulatedCluster.actor.cpp +++ b/fdbserver/SimulatedCluster.actor.cpp @@ -18,9 +18,11 @@ * limitations under the License. */ +#include #include #include #include +#include "fdbrpc/Locality.h" #include "fdbrpc/simulator.h" #include "fdbclient/DatabaseContext.h" #include "fdbserver/TesterInterface.actor.h" @@ -36,6 +38,7 @@ #include "fdbclient/versions.h" #include "flow/ProtocolVersion.h" #include "flow/actorcompiler.h" // This must be the last #include. +#include "flow/network.h" #undef max #undef min @@ -139,7 +142,7 @@ ACTOR Future simulatedFDBDRebooter(ReferencerandomUniqueID(); state int cycles = 0; @@ -156,7 +159,7 @@ ACTOR Future simulatedFDBDRebooter(Referencec_str(), - coordFolder->c_str(), currentProtocolVersion); + coordFolder->c_str(), protocolVersion); wait(g_simulator.onProcess(process, TaskPriority::DefaultYield)); // Now switch execution to the process on which we will run state Future onShutdown = process->onShutdown(); @@ -299,7 +302,7 @@ std::map< Optional>, std::vector< std::vector< std::string // process count is no longer needed because it is now the length of the vector of ip's, because it was one ip per process ACTOR Future simulatedMachine(ClusterConnectionString connStr, std::vector ips, bool sslEnabled, LocalityData localities, ProcessClass processClass, std::string baseFolder, bool restarting, - bool useSeedFile, AgentMode runBackupAgents, bool sslOnly, std::string whitelistBinPaths) { + bool useSeedFile, AgentMode runBackupAgents, bool sslOnly, std::string whitelistBinPaths, ProtocolVersion protocolVersion, bool* addedNew) { state int bootCount = 0; state std::vector myFolders; state std::vector coordFolders; @@ -342,7 +345,11 @@ ACTOR Future simulatedMachine(ClusterConnectionString connStr, std::vector Reference clusterFile(useSeedFile ? new ClusterConnectionFile(path, connStr.toString()) : new ClusterConnectionFile(path)); const int listenPort = i*listenPerProcess + 1; AgentMode agentMode = runBackupAgents == AgentOnly ? ( i == ips.size()-1 ? AgentOnly : AgentNone ) : runBackupAgents; - processes.push_back(simulatedFDBDRebooter(clusterFile, ips[i], sslEnabled, listenPort, listenPerProcess, localities, processClass, &myFolders[i], &coordFolders[i], baseFolder, connStr, useSeedFile, agentMode, whitelistBinPaths)); + if(*addedNew == false && i == ips.size()-1) { + processes.push_back(simulatedFDBDRebooter(clusterFile, ips[i], sslEnabled, listenPort, listenPerProcess, localities, processClass, &myFolders[i], &coordFolders[i], baseFolder, connStr, useSeedFile, agentMode, whitelistBinPaths, currentProtocolVersion)); // currentProtocolVersion + *addedNew = true; + } + else processes.push_back(simulatedFDBDRebooter(clusterFile, ips[i], sslEnabled, listenPort, listenPerProcess, localities, processClass, &myFolders[i], &coordFolders[i], baseFolder, connStr, useSeedFile, agentMode, whitelistBinPaths, protocolVersion)); TraceEvent("SimulatedMachineProcess", randomId).detail("Address", NetworkAddress(ips[i], listenPort, true, false)).detail("ZoneId", localities.zoneId()).detail("DataHall", localities.dataHallId()).detail("Folder", myFolders[i]); } @@ -547,7 +554,7 @@ IPAddress makeIPAddressForSim(bool isIPv6, std::array parts) { ACTOR Future restartSimulatedSystem(vector>* systemActors, std::string baseFolder, int* pTesterCount, Optional* pConnString, Standalone* pStartingConfiguration, - int extraDB, std::string whitelistBinPaths) { + int extraDB, std::string whitelistBinPaths, ProtocolVersion protocolVersion) { CSimpleIni ini; ini.SetUnicode(); ini.LoadFile(joinPath(baseFolder, "restartInfo.ini").c_str()); @@ -641,12 +648,12 @@ ACTOR Future restartSimulatedSystem(vector>* systemActors, st LocalityData localities(Optional>(), zoneId, machineId, dcUID); localities.set(LiteralStringRef("data_hall"), dcUID); + bool addedNew = true; // SOMEDAY: parse backup agent from test file systemActors->push_back(reportErrors( simulatedMachine(conn, ipAddrs, usingSSL, localities, processClass, baseFolder, true, - i == useSeedForMachine, AgentAddition, - usingSSL && (listenersPerProcess == 1 || processClass == ProcessClass::TesterClass), - whitelistBinPaths), + i == useSeedForMachine, enableExtraDB ? AgentAddition : AgentNone, + usingSSL && (listenersPerProcess == 1 || processClass == ProcessClass::TesterClass), whitelistBinPaths, protocolVersion, &addedNew), processClass == ProcessClass::TesterClass ? "SimulatedTesterMachine" : "SimulatedMachine")); } @@ -1053,8 +1060,8 @@ void SimulationConfig::generateNormalConfig(int minimumReplication, int minimumR void setupSimulatedSystem(vector>* systemActors, std::string baseFolder, int* pTesterCount, Optional* pConnString, Standalone* pStartingConfiguration, - int extraDB, int minimumReplication, int minimumRegions, std::string whitelistBinPaths, - bool configureLocked, int logAntiQuorum) { + int extraDB, int minimumReplication, int minimumRegions, std::string whitelistBinPaths, bool configureLocked, + int logAntiQuorum, ProtocolVersion protocolVersion) { // SOMEDAY: this does not test multi-interface configurations SimulationConfig simconfig(extraDB, minimumReplication, minimumRegions); if (logAntiQuorum != -1) { @@ -1219,6 +1226,8 @@ void setupSimulatedSystem(vector>* systemActors, std::string baseFo bool requiresExtraDBMachines = extraDB && g_simulator.extraDB->toString() != conn.toString(); int assignedMachines = 0, nonVersatileMachines = 0; std::vector processClassesSubSet = {ProcessClass::UnsetClass, ProcessClass::ResolutionClass, ProcessClass::MasterClass}; + + bool addedNew = false; for( int dc = 0; dc < dataCenters; dc++ ) { //FIXME: test unset dcID Optional> dcUID = StringRef(format("%d", dc)); @@ -1276,7 +1285,7 @@ void setupSimulatedSystem(vector>* systemActors, std::string baseFo LocalityData localities(Optional>(), zoneId, machineId, dcUID); localities.set(LiteralStringRef("data_hall"), dcUID); systemActors->push_back(reportErrors(simulatedMachine(conn, ips, sslEnabled, - localities, processClass, baseFolder, false, machine == useSeedForMachine, requiresExtraDBMachines ? AgentOnly : AgentAddition, sslOnly, whitelistBinPaths ), "SimulatedMachine")); + localities, processClass, baseFolder, false, machine == useSeedForMachine, requiresExtraDBMachines ? AgentOnly : AgentAddition, sslOnly, whitelistBinPaths, protocolVersion, &addedNew ), "SimulatedMachine")); if (requiresExtraDBMachines) { std::vector extraIps; @@ -1290,7 +1299,7 @@ void setupSimulatedSystem(vector>* systemActors, std::string baseFo localities.set(LiteralStringRef("data_hall"), dcUID); systemActors->push_back(reportErrors(simulatedMachine(*g_simulator.extraDB, extraIps, sslEnabled, localities, - processClass, baseFolder, false, machine == useSeedForMachine, AgentNone, sslOnly, whitelistBinPaths ), "SimulatedMachine")); + processClass, baseFolder, false, machine == useSeedForMachine, AgentNone, sslOnly, whitelistBinPaths, protocolVersion, &addedNew ), "SimulatedMachine")); } assignedMachines++; @@ -1314,11 +1323,11 @@ void setupSimulatedSystem(vector>* systemActors, std::string baseFo std::vector ips; ips.push_back(makeIPAddressForSim(useIPv6, { 3, 4, 3, i + 1 })); Standalone newZoneId = Standalone(deterministicRandom()->randomUniqueID().toString()); - LocalityData localities(Optional>(), newZoneId, newZoneId, Optional>()); + LocalityData localities(Optional>(), newZoneId, newZoneId, Optional>()); systemActors->push_back( reportErrors( simulatedMachine( conn, ips, sslEnabled && sslOnly, localities, ProcessClass(ProcessClass::TesterClass, ProcessClass::CommandLineSource), - baseFolder, false, i == useSeedForMachine, AgentNone, sslEnabled && sslOnly, whitelistBinPaths ), + baseFolder, false, i == useSeedForMachine, AgentNone, sslEnabled && sslOnly, whitelistBinPaths, protocolVersion, &addedNew ), "SimulatedTesterMachine") ); } *pStartingConfiguration = startingConfigString; @@ -1338,7 +1347,7 @@ void setupSimulatedSystem(vector>* systemActors, std::string baseFo } void checkTestConf(const char* testFile, int& extraDB, int& minimumReplication, int& minimumRegions, - int& configureLocked, int& logAntiQuorum) { + int& configureLocked, int& logAntiQuorum, ProtocolVersion& protocolVersion) { std::ifstream ifs; ifs.open(testFile, std::ifstream::in); if (!ifs.good()) @@ -1372,7 +1381,13 @@ void checkTestConf(const char* testFile, int& extraDB, int& minimumReplication, } if (attrib == "configureLocked") { - sscanf(value.c_str(), "%d", &configureLocked); + sscanf( value.c_str(), "%d", &configureLocked ); + } + + if (attrib == "protocolVersion") { + uint64_t protocolVersionInt = 0; + sscanf(value.c_str(), "%" SCNx64, &protocolVersionInt); + protocolVersion = ProtocolVersion(protocolVersionInt); } if (attrib == "logAntiQuorum") { sscanf(value.c_str(), "%d", &logAntiQuorum); @@ -1392,7 +1407,8 @@ ACTOR void setupAndRun(std::string dataFolder, const char *testFile, bool reboot state int minimumRegions = 0; state int configureLocked = 0; state int logAntiQuorum = -1; - checkTestConf(testFile, extraDB, minimumReplication, minimumRegions, configureLocked, logAntiQuorum); + state ProtocolVersion protocolVersion = currentProtocolVersion; + checkTestConf(testFile, extraDB, minimumReplication, minimumRegions, configureLocked, logAntiQuorum, protocolVersion); // TODO (IPv6) Use IPv6? wait(g_simulator.onProcess( @@ -1401,7 +1417,7 @@ ACTOR void setupAndRun(std::string dataFolder, const char *testFile, bool reboot Standalone(deterministicRandom()->randomUniqueID().toString()), Standalone(deterministicRandom()->randomUniqueID().toString()), Optional>()), - ProcessClass(ProcessClass::TesterClass, ProcessClass::CommandLineSource), "", "", currentProtocolVersion), // this won't work + ProcessClass(ProcessClass::TesterClass, ProcessClass::CommandLineSource), "", "", protocolVersion), TaskPriority::DefaultYield)); Sim2FileSystem::newFileSystem(); FlowTransport::createInstance(true, 1); @@ -1410,7 +1426,7 @@ ACTOR void setupAndRun(std::string dataFolder, const char *testFile, bool reboot try { //systemActors.push_back( startSystemMonitor(dataFolder) ); if (rebooting) { - wait( timeoutError( restartSimulatedSystem( &systemActors, dataFolder, &testerCount, &connFile, &startingConfiguration, extraDB, whitelistBinPaths), 100.0 ) ); + wait( timeoutError( restartSimulatedSystem( &systemActors, dataFolder, &testerCount, &connFile, &startingConfiguration, extraDB, whitelistBinPaths, protocolVersion), 100.0 ) ); // FIXME: snapshot restore does not support multi-region restore, hence restore it as single region always if (restoring) { startingConfiguration = LiteralStringRef("usable_regions=1"); @@ -1419,12 +1435,13 @@ ACTOR void setupAndRun(std::string dataFolder, const char *testFile, bool reboot else { g_expect_full_pointermap = 1; setupSimulatedSystem(&systemActors, dataFolder, &testerCount, &connFile, &startingConfiguration, extraDB, - minimumReplication, minimumRegions, whitelistBinPaths, configureLocked, logAntiQuorum); + minimumReplication, minimumRegions, whitelistBinPaths, configureLocked, logAntiQuorum, protocolVersion); wait( delay(1.0) ); // FIXME: WHY!!! //wait for machines to boot } std::string clusterFileDir = joinPath( dataFolder, deterministicRandom()->randomUniqueID().toString() ); platform::createDirectory( clusterFileDir ); writeFile(joinPath(clusterFileDir, "fdb.cluster"), connFile.get().toString()); + std::cout << "TESTER COUNT: " << testerCount << std::endl; wait(timeoutError(runTests(Reference( new ClusterConnectionFile(joinPath(clusterFileDir, "fdb.cluster"))), TEST_TYPE_FROM_FILE, TEST_ON_TESTERS, testerCount, testFile, startingConfiguration), diff --git a/fdbserver/tester.actor.cpp b/fdbserver/tester.actor.cpp index 70c95558fa..d39431eada 100644 --- a/fdbserver/tester.actor.cpp +++ b/fdbserver/tester.actor.cpp @@ -904,6 +904,9 @@ std::map> testSpecGlobalKey // else { } It is enable by default for tester TraceEvent("TestParserTest").detail("ClientInfoLogging", value); }}, + {"protocolVersion", [](const std::string& value) { + TraceEvent("TestParserTest").detail("ParsedProtocolVersion", value); + }} }; std::map> testSpecTestKeys = { @@ -1288,9 +1291,12 @@ ACTOR Future runTests( Reference testerTimeout = delay(600.0); // wait 600 sec for testers to show up state vector workers; + minTestersExpected = 1; + loop { choose { when( vector w = wait( cc->get().present() ? brokenPromiseToNever( cc->get().get().getWorkers.getReply( GetWorkersRequest( flags ) ) ) : Never() ) ) { + std::cout << "WORKERS RECRUITEED" << std::endl; if (w.size() >= minTestersExpected) { workers = w; break; @@ -1300,6 +1306,7 @@ ACTOR Future runTests( ReferenceonChange() ) ) {} when( wait( testerTimeout ) ) { TraceEvent(SevError, "TesterRecruitmentTimeout"); + std::cout << "TESTER TIMEOUT" << std::endl; throw timed_out(); } } @@ -1366,10 +1373,13 @@ ACTOR Future runTests( Reference connFile, test_typ if (at == TEST_HERE) { Reference> db( new AsyncVar ); vector iTesters(1); + TraceEvent("BEFORE MONITOR SERVER DBINFO"); actors.push_back( reportErrors(monitorServerDBInfo( cc, LocalityData(), db ), "MonitorServerDBInfo") ); // FIXME: Locality + TraceEvent("BEFORE MONITOR TESTER SERVER CORE"); actors.push_back( reportErrors(testerServerCore( iTesters[0], connFile, db, locality ), "TesterServerCore") ); tests = runTests( cc, ci, iTesters, testSpecs, startingConfiguration, locality ); } else { + TraceEvent("BEFORE RUNTESTS"); tests = reportErrors(runTests(cc, ci, testSpecs, at, minTestersExpected, startingConfiguration, locality), "RunTests"); } diff --git a/fdbserver/worker.actor.cpp b/fdbserver/worker.actor.cpp index 5029c05770..e97574a421 100644 --- a/fdbserver/worker.actor.cpp +++ b/fdbserver/worker.actor.cpp @@ -25,6 +25,7 @@ #include "fdbclient/StorageServerInterface.h" #include "fdbserver/Knobs.h" #include "flow/ActorCollection.h" +#include "flow/ProtocolVersion.h" #include "flow/SystemMonitor.h" #include "flow/TDMetric.actor.h" #include "fdbrpc/simulator.h" @@ -1797,8 +1798,11 @@ ACTOR Future serveProtocolInfo() { state RequestStream protocolInfo( PeerCompatibilityPolicy{ RequirePeer::AtLeast, ProtocolVersion::withStableInterfaces() }); protocolInfo.makeWellKnownEndpoint(WLTOKEN_PROTOCOL_INFO, TaskPriority::DefaultEndpoint); + std::cout << "SETUP PROTOCOL INFO ENDPOINT ON: " << g_network->getLocalAddress().toString() << " ON VERSION " << g_network->protocolVersion().version() << std::endl; loop { ProtocolInfoRequest req = waitNext(protocolInfo.getFuture()); + std::cout << "SENDING BACK PROTOCOL: " << g_network->protocolVersion().version() << " TO: " << req.reply.getEndpoint().addresses.toString() << std::endl; + TraceEvent("SENDING PROTOCOL INFO TO CLIENT"); req.reply.send(ProtocolInfoReply{ g_network->protocolVersion() }); } } diff --git a/fdbserver/workloads/ProtocolVersion.actor.cpp b/fdbserver/workloads/ProtocolVersion.actor.cpp index 0d36eb0da6..a3ae8bb1a1 100644 --- a/fdbserver/workloads/ProtocolVersion.actor.cpp +++ b/fdbserver/workloads/ProtocolVersion.actor.cpp @@ -19,10 +19,12 @@ */ #include +#include #include #include #include #include "fdbclient/CoordinationInterface.h" +#include "fdbclient/MultiVersionTransaction.h" #include "fdbrpc/FlowTransport.h" #include "fdbrpc/Locality.h" #include "fdbserver/SimulatedCluster.h" @@ -37,66 +39,10 @@ #include "flow/network.h" #include "flow/actorcompiler.h" // has to be last include -ACTOR Future getProtocol(Endpoint endpoint) { - std::vector> coordProtocols; - RequestStream requestStream{ endpoint }; - auto f = retryBrokenPromise(requestStream, ProtocolInfoRequest{}); - ProtocolInfoReply res = wait(f); - - std::cout << "GOT VERSION: " << res.version.version() << std::endl; - return Void(); -} - -struct _Struct { - static constexpr FileIdentifier file_identifier = 2340487; - int oldField = 0; -}; - -struct NewStruct : public _Struct { - int newField = 0; - - bool isSet() const { - return oldField == 1 && newField == 2; - } - void setFields() { - oldField = 1; - newField = 2; - } - - template - void serialize(Archive& ar) { - serializer(ar, oldField, newField); - } -}; - -ACTOR static Future writeNew(Database cx, int numObjects, Key key) { - ProtocolVersion protocolVersion = ProtocolVersion(0x0FDB00B070010000LL); - protocolVersion.addObjectSerializerFlag(); - ObjectWriter writer(IncludeVersion(protocolVersion)); - std::vector data(numObjects); - for (auto& newObject : data) { - newObject.setFields(); - } - writer.serialize(data); - state Value value = writer.toStringRef(); - - state Transaction tr(cx); - loop { - try { - tr.set(key, value); - wait(tr.commit()); - return Void(); - } catch (Error& e) { - wait(tr.onError(e)); - } - } -} - struct ProtocolVersionWorkload : TestWorkload { ProtocolVersionWorkload(WorkloadContext const& wcx) : TestWorkload(wcx) { - protocol = ProtocolVersion(getOption(options, LiteralStringRef("protocolVersion"), 0.0)); - clusterFilePath = getOption(options, LiteralStringRef("clusterFilePath"), LiteralStringRef("")).toString(); + // protocol = ProtocolVersion(getOption(options, LiteralStringRef("protocolVersion"), 0.0)); } virtual std::string description() { @@ -108,51 +54,64 @@ struct ProtocolVersionWorkload : TestWorkload { } ACTOR Future _start(ProtocolVersionWorkload* self, Database cx) { - CSimpleIni ini; - state Reference connFile(new ClusterConnectionFile(self->clusterFilePath)); - state const char* whitelistBinPaths = ""; - state std::string dataFolder = "simfdb"; - state LocalityData localities = LocalityData(Optional>(), - Standalone(deterministicRandom()->randomUniqueID().toString()), - Standalone(deterministicRandom()->randomUniqueID().toString()), - Optional>()); - state std::string coordFolder = ini.GetValue(printable(localities.machineId()).c_str(), "coordinationFolder", ""); - state ProcessClass processClass = ProcessClass(ProcessClass::TesterClass, ProcessClass::CommandLineSource); - state uint16_t listenPerProcess = 1; - state uint16_t port = 1; - state IPAddress ip = IPAddress(0x01010101); - state bool sslEnabled = false; + // CSimpleIni ini; + // // state Reference connFile(new ClusterConnectionFile(self->clusterFilePath)); + // state const char* whitelistBinPaths = ""; + // state std::string dataFolder = "simfdb"; + // state LocalityData localities = LocalityData(Optional>(), + // Standalone(deterministicRandom()->randomUniqueID().toString()), + // Standalone(deterministicRandom()->randomUniqueID().toString()), + // Optional>()); + // state std::string coordFolder = ini.GetValue(printable(localities.machineId()).c_str(), "coordinationFolder", ""); + // state ProcessClass processClass = ProcessClass(ProcessClass::TesterClass, ProcessClass::CommandLineSource); + // state uint16_t listenPerProcess = 1; + // state uint16_t port = 1; + // state IPAddress ip = IPAddress(0x01011F11); + // state bool sslEnabled = false; - state ISimulator::ProcessInfo* process = g_pSimulator->newProcess("ProtocolVersionProcess", ip, port, sslEnabled, listenPerProcess, - // localities, processClass, dataFolder.c_str(), coordFolder.c_str(), currentProtocolVersion); - localities, processClass, dataFolder.c_str(), coordFolder.c_str(), ProtocolVersion(0x0FDB00B070010000LL)); - wait(g_pSimulator->onProcess(process, TaskPriority::DefaultYield)); + // state ISimulator::ProcessInfo* process = g_pSimulator->newProcess("ProtocolVersionProcess", ip, port, sslEnabled, listenPerProcess, + // localities, processClass, dataFolder.c_str(), coordFolder.c_str(), currentProtocolVersion); + // // localities, processClass, dataFolder.c_str(), coordFolder.c_str(), ProtocolVersion(0x0FDB00B070010000LL)); + // wait(g_pSimulator->onProcess(process, TaskPriority::DefaultYield)); - FlowTransport::createInstance(true, 1); - Sim2FileSystem::newFileSystem(); + // FlowTransport::createInstance(true, 1); + // Sim2FileSystem::newFileSystem(); - NetworkAddress n(ip, port, true, sslEnabled); - FlowTransport::transport().bind( n, n ); + // NetworkAddress n(ip, port, true, sslEnabled); + // FlowTransport::transport().bind( n, n ); - state vector> actors; - actors.push_back(fdbd( connFile, localities, processClass, dataFolder, coordFolder, 500e6, "", "", -1, whitelistBinPaths)); + // state vector> actors; + // actors.push_back(fdbd( cx->getConnectionFile(), localities, processClass, dataFolder, coordFolder, 500e6, "", "", -1, whitelistBinPaths)); - state std::vector allProcesses = g_pSimulator->getAllProcesses(); - state ISimulator::ProcessInfo* nextProcess = nullptr; - for(ISimulator::ProcessInfo* p : allProcesses){ - if(p->address != process->address){ - nextProcess = p; - break; - } + // getting coord protocols from current protocol version + state vector> coordProtocols; + vector coordAddresses = cx->getConnectionFile()->getConnectionString().coordinators(); + for(int i = 0; i requestStream{ Endpoint{ { coordAddresses[i] }, WLTOKEN_PROTOCOL_INFO } }; + coordProtocols.push_back(retryBrokenPromise(requestStream, ProtocolInfoRequest{})); } - ASSERT(nextProcess); - wait(g_pSimulator->onProcess(nextProcess)); + wait(waitForAll(coordProtocols)); - wait(getProtocol(Endpoint{{process->addresses}, WLTOKEN_PROTOCOL_INFO})); - stopAfter(waitForAll(actors)); + // state std::vector allProcesses = g_pSimulator->getAllProcesses(); + // state int i = 0; + // for(; ionProcess(allProcesses[i], TaskPriority::DefaultYield); + // wait(f); + // std::cout << "PROCESS PRO VESRION: " << g_network->protocolVersion().version() << std::endl; + // } + std::cout << "CURR VERSION: " << g_network->protocolVersion().version() << std::endl; + std::vector protocolMatches; + protocolMatches.reserve(coordProtocols.size()); + for(int i = 0; iprotocolVersion() != coordProtocols[i].get().version) std::cout << "MISMATCHED VERSIONS" << std::endl; + protocolMatches.push_back(g_network->protocolVersion() == coordProtocols[i].get().version); + } - // wait(writeNew(cx, 1, LiteralStringRef("TEST"))); + // ASSERT(count(protocolMatches.begin(), protocolMatches.end(), false) >= 1); + + // g_pSimulator->killProcess(process, ISimulator::KillType::KillInstantly); + // stopAfter(waitForAll(actors)); return Void(); } diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 834f5086eb..952a187fa1 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -130,6 +130,7 @@ if(WITH_PYTHON) add_fdb_test(TEST_FILES fast/LowLatency.toml) add_fdb_test(TEST_FILES fast/MemoryLifetime.toml) add_fdb_test(TEST_FILES fast/MoveKeysCycle.toml) + add_fdb_test(TEST_FILES fast/ProtocolVersion.toml) add_fdb_test(TEST_FILES fast/RandomSelector.toml) add_fdb_test(TEST_FILES fast/RandomUnitTests.toml) add_fdb_test(TEST_FILES fast/ReadHotDetectionCorrectness.toml IGNORE) # TODO re-enable once read hot detection is enabled. diff --git a/tests/fast/ProtocolVersion.toml b/tests/fast/ProtocolVersion.toml index 6a1e0d1f67..1f5d81cedf 100644 --- a/tests/fast/ProtocolVersion.toml +++ b/tests/fast/ProtocolVersion.toml @@ -1,3 +1,5 @@ +protocolVersion = 0x0FDB00B070010000 + [[test]] testTitle = 'ProtocolVersionTest'