draft diff protocol

This commit is contained in:
Richard Chen 2020-10-05 17:07:51 +00:00
parent b233f44d2d
commit 5488ff1d81
12 changed files with 315 additions and 119 deletions

View File

@ -18,15 +18,18 @@
* limitations under the License.
*/
#include "fdbclient/CoordinationInterface.h"
#include "fdbclient/MultiVersionTransaction.h"
#include "fdbclient/MultiVersionAssignmentVars.h"
#include "fdbclient/Status.h"
#include "fdbclient/ThreadSafeTransaction.h"
#include "fdbrpc/fdbrpc.h"
#include "flow/Platform.h"
#include "flow/UnitTest.h"
#include "flow/actorcompiler.h" // This must be the last #include.
#include "flow/network.h"
#include "flow/actorcompiler.h" // This must be the last #include.
void throwIfError(FdbCApi::fdb_error_t e) {
if(e) {
@ -718,6 +721,8 @@ MultiVersionDatabase::MultiVersionDatabase(MultiVersionApi *api, std::string clu
dbState->db = db;
dbState->dbVar->set(db);
std::cout << "TRYING TO ADD CONNECTION" << std::endl;
if(!openConnectors) {
dbState->currentClientIndex = 0;
}
@ -906,6 +911,133 @@ void MultiVersionDatabase::DatabaseState::stateChanged() {
currentClientIndex = newIndex;
}
// Sends a ProtocolInfoRequest to `endpoint`, waits for the reply and prints the
// protocol version it carries. Debug helper: the result is only written to stdout.
//
// endpoint: destination of the request; a RequestStream is built directly on top of it.
// Returns:  Void once the reply has been received and printed.
// Errors:   whatever error the reply future ends with is rethrown by wait().
ACTOR Future<Void> getServerProtocol(Endpoint endpoint) {
	RequestStream<ProtocolInfoRequest> requestStream{ endpoint };
	Future<ProtocolInfoReply> f = retryBrokenPromise(requestStream, ProtocolInfoRequest{});
	std::cout << " MAKING REQUEST FOR PROTOCOL INFO FROM: " << requestStream.getEndpoint().addresses.toString() << std::endl;

	// Suspend on the future instead of spinning on isReady(): a busy loop inside an
	// actor never returns control to the run loop, so the reply it is polling for
	// could not be delivered and the loop would spin forever.
	ProtocolInfoReply res = wait(f);
	std::cout << "GOT VERSION: " << res.version.version() << std::endl;
	return Void();
}
// Check if a quorum of coordination servers is reachable
// Will not throw, will just return non-present Optional if error
//
// In addition to the leader query, every coordinator is asked for its protocol
// version through the well-known WLTOKEN_PROTOCOL_INFO endpoint.
//
// f:                          cluster connection file naming the coordinators.
// quorum_reachable:           out; set to whether a majority of leader requests were answered,
//                             or to false if an error was caught.
// coordinatorsFaultTolerance: out; (coordinator count - 1) / 2 minus the number of coordinators
//                             that had not answered. Left untouched if an error was caught.
// Returns: a status object with "quorum_reachable" and a "coordinators" array whose entries
//          carry "address", "reachable" and (when known) "protocol".
ACTOR Future<Optional<StatusObject>> multiVersionClientCoordinatorsStatusFetcher(Reference<ClusterConnectionFile> f, bool *quorum_reachable, int *coordinatorsFaultTolerance) {
	try {
		std::cout << "TYING TO GET STATUS" << std::endl;
		state ClientCoordinators coord(f);
		state StatusObject statusObj;

		// One leader query per coordinator.
		state vector<Future<Optional<LeaderInfo>>> leaderServers;
		for (int i = 0; i < coord.clientLeaderServers.size(); i++) {
			leaderServers.push_back(retryBrokenPromise(coord.clientLeaderServers[i].getLeader, GetLeaderRequest(coord.clusterKey, UID()), TaskPriority::CoordinationReply));
		}

		// One protocol-info query per coordinator, addressed to the same network
		// addresses as its getLeader endpoint but with the protocol-info token.
		state vector<Future<ProtocolInfoReply>> coordProtocols;
		coordProtocols.reserve(coord.clientLeaderServers.size());
		for (int i = 0; i < coord.clientLeaderServers.size(); i++) {
			std::cout << "SENDING TO : " << coord.clientLeaderServers[i].getLeader.getEndpoint().addresses.toString() << std::endl;
			RequestStream<ProtocolInfoRequest> requestStream{ Endpoint{
				{ coord.clientLeaderServers[i].getLeader.getEndpoint().addresses }, WLTOKEN_PROTOCOL_INFO } };
			// Only start the request here. Spinning on isReady() would never yield to
			// the run loop, so the reply could never arrive; the futures are awaited
			// together below instead.
			coordProtocols.push_back(retryBrokenPromise(requestStream, ProtocolInfoRequest{}));
		}

		// Wait for a majority of both kinds of replies, but never longer than 2 seconds.
		// No unconditional waitForAll() precedes this: it would block on (or throw for)
		// a single unresponsive coordinator and make the timeout below meaningless.
		std::cout << "WAITING FOR LEADER SERVERS" << std::endl;
		wait((smartQuorum(leaderServers, leaderServers.size() / 2 + 1, 1.5) &&
		      smartQuorum(coordProtocols, coordProtocols.size() / 2 + 1, 1.5)) ||
		     delay(2.0));
		std::cout << "GOT LEADER SERVERS" << std::endl;

		statusObj["quorum_reachable"] = *quorum_reachable = quorum(leaderServers, leaderServers.size() / 2 + 1).isReady();

		StatusArray coordsStatus;
		int coordinatorsUnavailable = 0;
		for (int i = 0; i < leaderServers.size(); i++) {
			StatusObject coordStatus;
			coordStatus["address"] = coord.clientLeaderServers[i].getLeader.getEndpoint().getPrimaryAddress().toString();

			if (leaderServers[i].isReady()){
				coordStatus["reachable"] = true;
			}
			else {
				coordinatorsUnavailable++;
				coordStatus["reachable"] = false;
			}

			// Report the protocol only when a reply is actually available; get() on a
			// future that finished with an error would throw and discard the whole status.
			if (coordProtocols[i].isReady() && !coordProtocols[i].isError()) {
				std::cout << "PROTOCOL VESRION: " << coordProtocols[i].get().version.version() << std::endl;
				coordStatus["protocol"] = coordProtocols[i].get().version.version();
			}
			coordsStatus.push_back(coordStatus);
		}
		statusObj["coordinators"] = coordsStatus;

		*coordinatorsFaultTolerance = (leaderServers.size() - 1) / 2 - coordinatorsUnavailable;
		return statusObj;
	}
	catch (Error &e){
		*quorum_reachable = false;
		return Optional<StatusObject>();
	}
}
// Prints the "protocol" value of every coordinator entry found in `statusObj`.
//
// The entries are looked up as statusObj["coordinators"]["coordinators"] (an array
// nested under an object of the same name). Debug helper: output goes to stdout only.
//
// NOTE(review): multiVersionClientCoordinatorsStatusFetcher in this file stores the
// coordinator array directly under the top-level "coordinators" key, not one level
// deeper - confirm which layout callers are expected to pass in.
void getCoordinatorProtocolFromStatusObject(StatusObjectReader statusObj) {
	StatusObjectReader statusObjCoordinators;
	StatusArray coordinatorsArr;

	if (statusObj.get("coordinators", statusObjCoordinators)) {
		std::cout << "IN HERE" << std::endl;
		// Look for a second "coordinators", under the first one.
		if (statusObjCoordinators.has("coordinators"))
			coordinatorsArr = statusObjCoordinators.last().get_array();
	}

	for (StatusObjectReader coor : coordinatorsArr){
		// Entries without a "protocol" field are skipped rather than printing an
		// uninitialized value.
		uint64_t version = 0;
		if (coor.get("protocol", version)) {
			std::cout << "COORD PROTOCOL VERSION: " << version << std::endl;
		}
	}
}
// Fetches the coordinator status for the cluster described by `f` and prints the
// protocol version reported for each coordinator. Debug helper: nothing is returned
// to the caller besides completion.
//
// f: cluster connection file naming the coordinators to query.
ACTOR Future<Void> getCoordinatorProtocols(Reference<ClusterConnectionFile> f) {
	// Both out-parameters are `state` so the pointers handed to the fetcher stay
	// valid while this actor is suspended in wait().
	state bool quorum_reachable = false;
	state int coordinatorsFaultTolerance = 0;

	Optional<StatusObject> statusObjOptional = wait(multiVersionClientCoordinatorsStatusFetcher(f, &quorum_reachable, &coordinatorsFaultTolerance));

	if(!quorum_reachable) {
		std::cout << "QUORUM NOT REACHABLE" << std::endl;
	}

	// The fetcher returns a non-present Optional when it caught an error; calling
	// get() on it would be invalid, so there is nothing to report in that case.
	if(!statusObjOptional.present()) {
		return Void();
	}

	std::cout << "GOT STATUS OBJECT" << std::endl;
	StatusObject statusObj = statusObjOptional.get();
	getCoordinatorProtocolFromStatusObject(statusObj);
	return Void();
}
// TODO: adding connections here and attempt connections
void MultiVersionDatabase::DatabaseState::addConnection(Reference<ClientInfo> client, std::string clusterFilePath) {
// do check if compatible - call StatusClient::statusFetcher(Database)?
@ -1061,7 +1193,7 @@ void MultiVersionApi::addExternalLibraryDirectory(std::string path) {
if(externalClients.count(filename) == 0) {
TraceEvent("AddingExternalClient").detail("LibraryPath", filename);
externalClients[filename] = Reference<ClientInfo>(new ClientInfo(new DLApi(lib), lib));
}
}
}
}
@ -1154,8 +1286,10 @@ void MultiVersionApi::setNetworkOptionInternal(FDBNetworkOptions::Option option,
}
}
// TODO: should the check be happening here? Before we setup network? the network should be setup before external connections are added
void MultiVersionApi::setupNetwork() {
if(!externalClient) {
std::cout << "SETTING UP ENV VARIABLES" << std::endl;
loadEnvironmentVariableNetworkOptions();
}
@ -1181,12 +1315,14 @@ void MultiVersionApi::setupNetwork() {
}
localClient->loadProtocolVersion();
std::cout << "SET UP NETWORK LOCAL CLIENT PROTOCOL VERSION: " << localClient->protocolVersion.version() << std::endl;
if(!bypassMultiClientApi) {
runOnExternalClients([this](Reference<ClientInfo> client) {
TraceEvent("InitializingExternalClient").detail("LibraryPath", client->libPath);
client->api->selectApiVersion(apiVersion);
client->loadProtocolVersion();
std::cout << "EXTERNAL CLIENT PROTOCOL VERSION: " << client->protocolVersion.version() << std::endl;
});
MutexHolder holder(lock);
@ -1229,6 +1365,40 @@ void MultiVersionApi::runNetwork() {
lock.leave();
std::cout << "STARTING NETWORK THREAD" << std::endl;
// TODO: ONLY RUN NETWORK THREADS OF OTHER CLIENTS IF WE KNOW THAT THEY ARE COMPATIBLE??
// OR ALLOW THE NETWORK THREADS TO START, BUT KILL THEM AFTER REALIZING THEY'RE INCOMPATIBLE
// MAYBE: ONLY CREATE NETWORK THREAD FOR CURRENT CLIENT. THEN WHEN CREATING DATABASE, START NETWORK THREADS FOR OTHER AS WELL?
// std::vector<THREAD_HANDLE> handles;
// if(!bypassMultiClientApi) {
// runOnExternalClients([&handles](Reference<ClientInfo> client) {
// if(client->external) {
// handles.push_back(g_network->startThread(&runNetworkThread, client.getPtr()));
// }
// });
// }
localClient->api->runNetwork();
// for(auto h : handles) {
// waitThread(h);
// }
// std::cout << "DONE WAITING FOR EXTERNAL CLIENTS" << std::endl;
}
void MultiVersionApi::runExternalNetwork() {
lock.enter();
if(!networkSetup) {
lock.leave();
throw network_not_setup();
}
lock.leave();
std::cout << "STARTING EXTERNAL NETWORK THREAD" << std::endl;
std::vector<THREAD_HANDLE> handles;
if(!bypassMultiClientApi) {
runOnExternalClients([&handles](Reference<ClientInfo> client) {
@ -1243,6 +1413,7 @@ void MultiVersionApi::runNetwork() {
for(auto h : handles) {
waitThread(h);
}
std::cout << "DONE WAITING FOR EXTERNAL CLIENTS" << std::endl;
}
void MultiVersionApi::stopNetwork() {
@ -1280,20 +1451,33 @@ void MultiVersionApi::addNetworkThreadCompletionHook(void (*hook)(void*), void *
}
Reference<IDatabase> MultiVersionApi::createDatabase(const char *clusterFilePath) {
std::cout << "CREATING DATABASE FROM MULTIVERSIONAPI" << std::endl;
lock.enter();
if(!networkSetup) {
std::cout << "NETWORK NOT SETUP" << std::endl;
lock.leave();
throw network_not_setup();
}
lock.leave();
std::string clusterFile(clusterFilePath);
// TODO: may need to keep map of compatible clients per cluster file
// how to wait to get coordinator protocol? createDatabase cannot be an Actor
// May have to set up a receiver on this end?
// async try to find which clients are compatible. Set map variable to true if compatible?
auto f = Reference<ClusterConnectionFile>(new ClusterConnectionFile(clusterFile));
getCoordinatorProtocols(f);
if(localClientDisabled) {
return Reference<IDatabase>(new MultiVersionDatabase(this, clusterFile, Reference<IDatabase>()));
}
auto db = localClient->api->createDatabase(clusterFilePath);
if(bypassMultiClientApi) {
std::cout << "IS BYPASS" << std::endl;
return db;
}
else {

View File

@ -20,6 +20,8 @@
#ifndef FDBCLIENT_MULTIVERSIONTRANSACTION_H
#define FDBCLIENT_MULTIVERSIONTRANSACTION_H
#include "fdbclient/CoordinationInterface.h"
#include "fdbrpc/fdbrpc.h"
#pragma once
#include "fdbclient/FDBOptions.g.h"
@ -27,6 +29,7 @@
#include "fdbclient/IClientApi.h"
#include "flow/ThreadHelper.actor.h"
#include "flow/actorcompiler.h" // This must be the last #include.
struct FdbCApi : public ThreadSafeReferenceCounted<FdbCApi> {
typedef struct future FDBFuture;
@ -415,8 +418,12 @@ private:
void setNetworkOptionInternal(FDBNetworkOptions::Option option, Optional<StringRef> value);
void runExternalNetwork();
Reference<ClientInfo> localClient;
std::map<std::string, Reference<ClientInfo>> externalClients;
std::map<std::string, Reference<ClientInfo>> clusterFileCompatibleClients; // should this be scoped to clusterfile or coordinator address
volatile std::map<std::string, bool> clusterFileCompatibleSet;
bool networkStartSetup;
volatile bool networkSetup;

View File

@ -996,14 +996,13 @@ ACTOR static Future<Void> connectionReader(
BinaryReader pktReader(unprocessed_begin, connectPacketSize, AssumeVersion(protocolVersion));
ConnectPacket pkt;
serializer(pktReader, pkt);
std::cout << "INCOMING PKT VERSION: " << pkt.protocolVersion.version() << std::endl;
// std::cout << "INCOMING PKT VERSION: " << pkt.protocolVersion.version() << " FROM " << peer->destination.toString() << " TO " << conn->getPeerAddress().toString() << std::endl;
uint64_t connectionId = pkt.connectionId;
if (!pkt.protocolVersion.hasObjectSerializerFlag() ||
// !pkt.protocolVersion.hasStableInterfaces()) {
!pkt.protocolVersion.isCompatible(g_network->protocolVersion())) {
// !pkt.protocolVersion.isCompatible(ProtocolVersion(ProtocolVersion::minValidProtocolVersion))) { // TODO DELETE THIS
incompatibleProtocolVersionNewer = pkt.protocolVersion > g_network->protocolVersion();
NetworkAddress addr = pkt.canonicalRemotePort
? NetworkAddress(pkt.canonicalRemoteIp(), pkt.canonicalRemotePort)
@ -1066,6 +1065,7 @@ ACTOR static Future<Void> connectionReader(
peerAddress = NetworkAddress(pkt.canonicalRemoteIp(), pkt.canonicalRemotePort, true,
peerAddress.isTLS());
}
// TODO: try to get or open peer here
peer = transport->getOrOpenPeer(peerAddress, false);
peer->compatible = compatible;
peer->incompatibleProtocolVersionNewer = incompatibleProtocolVersionNewer;
@ -1161,6 +1161,8 @@ Reference<Peer> TransportData::getPeer( NetworkAddress const& address ) {
}
Reference<Peer> TransportData::getOrOpenPeer( NetworkAddress const& address, bool startConnectionKeeper ) {
// std::cout << "Get or open peer " << address.toString() << std::endl;
// TraceEvent("Get Or open peer");
auto peer = getPeer(address);
if(!peer) {
peer = Reference<Peer>( new Peer(this, address) );

View File

@ -1030,6 +1030,7 @@ public:
ProcessInfo* m = new ProcessInfo(name, locality, startingClass, addresses, this, dataFolder, coordinationFolder);
for (int processPort = port; processPort < port + listenPerProcess; ++processPort) {
NetworkAddress address(ip, processPort, true, sslEnabled && processPort == port);
TraceEvent("SETTING UP SIM2LIstenr");
m->listenerMap[address] = Reference<IListener>( new Sim2Listener(m, address) );
addressMap[address] = m;
}
@ -1704,7 +1705,7 @@ public:
return Void();
return delay( 0, taskID, process->machine->machineProcess );
}
virtual ProtocolVersion protocolVersion() override {
ASSERT(processProtocolVersion.find(getCurrentProcess()) != processProtocolVersion.end());
return processProtocolVersion.at(getCurrentProcess());

View File

@ -2950,6 +2950,7 @@ ACTOR Future<Void> clusterControllerCore( ClusterControllerFullInterface interf,
registerWorker( req, &self );
}
when( GetWorkersRequest req = waitNext( interf.getWorkers.getFuture() ) ) {
std::cout << "GOT WORKER FROM: " << req.reply.getEndpoint().addresses.toString() << std::endl;
++self.getWorkersRequests;
vector<WorkerDetails> workers;

View File

@ -24,10 +24,13 @@
#include "fdbserver/WorkerInterface.actor.h"
#include "fdbserver/Status.h"
#include "flow/ActorCollection.h"
#include "flow/ProtocolVersion.h"
#include "flow/UnitTest.h"
#include "flow/IndexedSet.h"
#include "fdbclient/MonitorLeader.h"
#include "flow/actorcompiler.h" // This must be the last #include.
#include "flow/network.h"
#include <cstdint>
// This module implements coordinationServer() and the interfaces in CoordinationInterface.h
@ -204,7 +207,7 @@ TEST_CASE("/fdbserver/Coordination/localGenerationReg/simple") {
}
ACTOR Future<Void> openDatabase(ClientData* db, int* clientCount, Reference<AsyncVar<bool>> hasConnectedClients, OpenDatabaseCoordRequest req) {
// std::cout << "OPEN DATAASE" << std::endl;
std::cout << "OPEN DATABASE" << std::endl;
++(*clientCount);
hasConnectedClients->set(true);
@ -464,6 +467,7 @@ struct LeaderRegisterCollection {
}
LeaderElectionRegInterface& getInterface(KeyRef key, UID id) {
std::cout << "TRYING TO GET INTERFACE" << std::endl;
auto i = registerInterfaces.find( key );
if (i == registerInterfaces.end()) {
Key k = key;
@ -505,6 +509,10 @@ ACTOR Future<Void> leaderServer(LeaderElectionRegInterface interf, OnDemandStore
wait( LeaderRegisterCollection::init( &regs ) );
// if(g_network->protocolVersion() == currentProtocolVersion){
// loop choose {}
// }
loop choose {
when ( OpenDatabaseCoordRequest req = waitNext( interf.openDatabase.getFuture() ) ) {
Optional<LeaderInfo> forward = regs.getForward(req.clusterKey);

View File

@ -18,9 +18,11 @@
* limitations under the License.
*/
#include <cinttypes>
#include <cstdint>
#include <fstream>
#include <ostream>
#include "fdbrpc/Locality.h"
#include "fdbrpc/simulator.h"
#include "fdbclient/DatabaseContext.h"
#include "fdbserver/TesterInterface.actor.h"
@ -36,6 +38,7 @@
#include "fdbclient/versions.h"
#include "flow/ProtocolVersion.h"
#include "flow/actorcompiler.h" // This must be the last #include.
#include "flow/network.h"
#undef max
#undef min
@ -139,7 +142,7 @@ ACTOR Future<ISimulator::KillType> simulatedFDBDRebooter(Reference<ClusterConnec
std::string* dataFolder, std::string* coordFolder,
std::string baseFolder, ClusterConnectionString connStr,
bool useSeedFile, AgentMode runBackupAgents,
std::string whitelistBinPaths) {
std::string whitelistBinPaths, ProtocolVersion protocolVersion) {
state ISimulator::ProcessInfo *simProcess = g_simulator.getCurrentProcess();
state UID randomId = nondeterministicRandom()->randomUniqueID();
state int cycles = 0;
@ -156,7 +159,7 @@ ACTOR Future<ISimulator::KillType> simulatedFDBDRebooter(Reference<ClusterConnec
state ISimulator::ProcessInfo* process =
g_simulator.newProcess("Server", ip, port, sslEnabled, listenPerProcess, localities, processClass, dataFolder->c_str(),
coordFolder->c_str(), currentProtocolVersion);
coordFolder->c_str(), protocolVersion);
wait(g_simulator.onProcess(process,
TaskPriority::DefaultYield)); // Now switch execution to the process on which we will run
state Future<ISimulator::KillType> onShutdown = process->onShutdown();
@ -299,7 +302,7 @@ std::map< Optional<Standalone<StringRef>>, std::vector< std::vector< std::string
// process count is no longer needed because it is now the length of the vector of ip's, because it was one ip per process
ACTOR Future<Void> simulatedMachine(ClusterConnectionString connStr, std::vector<IPAddress> ips, bool sslEnabled, LocalityData localities,
ProcessClass processClass, std::string baseFolder, bool restarting,
bool useSeedFile, AgentMode runBackupAgents, bool sslOnly, std::string whitelistBinPaths) {
bool useSeedFile, AgentMode runBackupAgents, bool sslOnly, std::string whitelistBinPaths, ProtocolVersion protocolVersion, bool* addedNew) {
state int bootCount = 0;
state std::vector<std::string> myFolders;
state std::vector<std::string> coordFolders;
@ -342,7 +345,11 @@ ACTOR Future<Void> simulatedMachine(ClusterConnectionString connStr, std::vector
Reference<ClusterConnectionFile> clusterFile(useSeedFile ? new ClusterConnectionFile(path, connStr.toString()) : new ClusterConnectionFile(path));
const int listenPort = i*listenPerProcess + 1;
AgentMode agentMode = runBackupAgents == AgentOnly ? ( i == ips.size()-1 ? AgentOnly : AgentNone ) : runBackupAgents;
processes.push_back(simulatedFDBDRebooter(clusterFile, ips[i], sslEnabled, listenPort, listenPerProcess, localities, processClass, &myFolders[i], &coordFolders[i], baseFolder, connStr, useSeedFile, agentMode, whitelistBinPaths));
if(*addedNew == false && i == ips.size()-1) {
processes.push_back(simulatedFDBDRebooter(clusterFile, ips[i], sslEnabled, listenPort, listenPerProcess, localities, processClass, &myFolders[i], &coordFolders[i], baseFolder, connStr, useSeedFile, agentMode, whitelistBinPaths, currentProtocolVersion)); // currentProtocolVersion
*addedNew = true;
}
else processes.push_back(simulatedFDBDRebooter(clusterFile, ips[i], sslEnabled, listenPort, listenPerProcess, localities, processClass, &myFolders[i], &coordFolders[i], baseFolder, connStr, useSeedFile, agentMode, whitelistBinPaths, protocolVersion));
TraceEvent("SimulatedMachineProcess", randomId).detail("Address", NetworkAddress(ips[i], listenPort, true, false)).detail("ZoneId", localities.zoneId()).detail("DataHall", localities.dataHallId()).detail("Folder", myFolders[i]);
}
@ -547,7 +554,7 @@ IPAddress makeIPAddressForSim(bool isIPv6, std::array<int, 4> parts) {
ACTOR Future<Void> restartSimulatedSystem(vector<Future<Void>>* systemActors, std::string baseFolder, int* pTesterCount,
Optional<ClusterConnectionString>* pConnString,
Standalone<StringRef>* pStartingConfiguration,
int extraDB, std::string whitelistBinPaths) {
int extraDB, std::string whitelistBinPaths, ProtocolVersion protocolVersion) {
CSimpleIni ini;
ini.SetUnicode();
ini.LoadFile(joinPath(baseFolder, "restartInfo.ini").c_str());
@ -641,12 +648,12 @@ ACTOR Future<Void> restartSimulatedSystem(vector<Future<Void>>* systemActors, st
LocalityData localities(Optional<Standalone<StringRef>>(), zoneId, machineId, dcUID);
localities.set(LiteralStringRef("data_hall"), dcUID);
bool addedNew = true;
// SOMEDAY: parse backup agent from test file
systemActors->push_back(reportErrors(
simulatedMachine(conn, ipAddrs, usingSSL, localities, processClass, baseFolder, true,
i == useSeedForMachine, AgentAddition,
usingSSL && (listenersPerProcess == 1 || processClass == ProcessClass::TesterClass),
whitelistBinPaths),
i == useSeedForMachine, enableExtraDB ? AgentAddition : AgentNone,
usingSSL && (listenersPerProcess == 1 || processClass == ProcessClass::TesterClass), whitelistBinPaths, protocolVersion, &addedNew),
processClass == ProcessClass::TesterClass ? "SimulatedTesterMachine" : "SimulatedMachine"));
}
@ -1053,8 +1060,8 @@ void SimulationConfig::generateNormalConfig(int minimumReplication, int minimumR
void setupSimulatedSystem(vector<Future<Void>>* systemActors, std::string baseFolder, int* pTesterCount,
Optional<ClusterConnectionString>* pConnString, Standalone<StringRef>* pStartingConfiguration,
int extraDB, int minimumReplication, int minimumRegions, std::string whitelistBinPaths,
bool configureLocked, int logAntiQuorum) {
int extraDB, int minimumReplication, int minimumRegions, std::string whitelistBinPaths, bool configureLocked,
int logAntiQuorum, ProtocolVersion protocolVersion) {
// SOMEDAY: this does not test multi-interface configurations
SimulationConfig simconfig(extraDB, minimumReplication, minimumRegions);
if (logAntiQuorum != -1) {
@ -1219,6 +1226,8 @@ void setupSimulatedSystem(vector<Future<Void>>* systemActors, std::string baseFo
bool requiresExtraDBMachines = extraDB && g_simulator.extraDB->toString() != conn.toString();
int assignedMachines = 0, nonVersatileMachines = 0;
std::vector<ProcessClass::ClassType> processClassesSubSet = {ProcessClass::UnsetClass, ProcessClass::ResolutionClass, ProcessClass::MasterClass};
bool addedNew = false;
for( int dc = 0; dc < dataCenters; dc++ ) {
//FIXME: test unset dcID
Optional<Standalone<StringRef>> dcUID = StringRef(format("%d", dc));
@ -1276,7 +1285,7 @@ void setupSimulatedSystem(vector<Future<Void>>* systemActors, std::string baseFo
LocalityData localities(Optional<Standalone<StringRef>>(), zoneId, machineId, dcUID);
localities.set(LiteralStringRef("data_hall"), dcUID);
systemActors->push_back(reportErrors(simulatedMachine(conn, ips, sslEnabled,
localities, processClass, baseFolder, false, machine == useSeedForMachine, requiresExtraDBMachines ? AgentOnly : AgentAddition, sslOnly, whitelistBinPaths ), "SimulatedMachine"));
localities, processClass, baseFolder, false, machine == useSeedForMachine, requiresExtraDBMachines ? AgentOnly : AgentAddition, sslOnly, whitelistBinPaths, protocolVersion, &addedNew ), "SimulatedMachine"));
if (requiresExtraDBMachines) {
std::vector<IPAddress> extraIps;
@ -1290,7 +1299,7 @@ void setupSimulatedSystem(vector<Future<Void>>* systemActors, std::string baseFo
localities.set(LiteralStringRef("data_hall"), dcUID);
systemActors->push_back(reportErrors(simulatedMachine(*g_simulator.extraDB, extraIps, sslEnabled,
localities,
processClass, baseFolder, false, machine == useSeedForMachine, AgentNone, sslOnly, whitelistBinPaths ), "SimulatedMachine"));
processClass, baseFolder, false, machine == useSeedForMachine, AgentNone, sslOnly, whitelistBinPaths, protocolVersion, &addedNew ), "SimulatedMachine"));
}
assignedMachines++;
@ -1314,11 +1323,11 @@ void setupSimulatedSystem(vector<Future<Void>>* systemActors, std::string baseFo
std::vector<IPAddress> ips;
ips.push_back(makeIPAddressForSim(useIPv6, { 3, 4, 3, i + 1 }));
Standalone<StringRef> newZoneId = Standalone<StringRef>(deterministicRandom()->randomUniqueID().toString());
LocalityData localities(Optional<Standalone<StringRef>>(), newZoneId, newZoneId, Optional<Standalone<StringRef>>());
LocalityData localities(Optional<Standalone<StringRef>>(), newZoneId, newZoneId, Optional<Standalone<StringRef>>());
systemActors->push_back( reportErrors( simulatedMachine(
conn, ips, sslEnabled && sslOnly,
localities, ProcessClass(ProcessClass::TesterClass, ProcessClass::CommandLineSource),
baseFolder, false, i == useSeedForMachine, AgentNone, sslEnabled && sslOnly, whitelistBinPaths ),
baseFolder, false, i == useSeedForMachine, AgentNone, sslEnabled && sslOnly, whitelistBinPaths, protocolVersion, &addedNew ),
"SimulatedTesterMachine") );
}
*pStartingConfiguration = startingConfigString;
@ -1338,7 +1347,7 @@ void setupSimulatedSystem(vector<Future<Void>>* systemActors, std::string baseFo
}
void checkTestConf(const char* testFile, int& extraDB, int& minimumReplication, int& minimumRegions,
int& configureLocked, int& logAntiQuorum) {
int& configureLocked, int& logAntiQuorum, ProtocolVersion& protocolVersion) {
std::ifstream ifs;
ifs.open(testFile, std::ifstream::in);
if (!ifs.good())
@ -1372,7 +1381,13 @@ void checkTestConf(const char* testFile, int& extraDB, int& minimumReplication,
}
if (attrib == "configureLocked") {
sscanf(value.c_str(), "%d", &configureLocked);
sscanf( value.c_str(), "%d", &configureLocked );
}
if (attrib == "protocolVersion") {
uint64_t protocolVersionInt = 0;
sscanf(value.c_str(), "%" SCNx64, &protocolVersionInt);
protocolVersion = ProtocolVersion(protocolVersionInt);
}
if (attrib == "logAntiQuorum") {
sscanf(value.c_str(), "%d", &logAntiQuorum);
@ -1392,7 +1407,8 @@ ACTOR void setupAndRun(std::string dataFolder, const char *testFile, bool reboot
state int minimumRegions = 0;
state int configureLocked = 0;
state int logAntiQuorum = -1;
checkTestConf(testFile, extraDB, minimumReplication, minimumRegions, configureLocked, logAntiQuorum);
state ProtocolVersion protocolVersion = currentProtocolVersion;
checkTestConf(testFile, extraDB, minimumReplication, minimumRegions, configureLocked, logAntiQuorum, protocolVersion);
// TODO (IPv6) Use IPv6?
wait(g_simulator.onProcess(
@ -1401,7 +1417,7 @@ ACTOR void setupAndRun(std::string dataFolder, const char *testFile, bool reboot
Standalone<StringRef>(deterministicRandom()->randomUniqueID().toString()),
Standalone<StringRef>(deterministicRandom()->randomUniqueID().toString()),
Optional<Standalone<StringRef>>()),
ProcessClass(ProcessClass::TesterClass, ProcessClass::CommandLineSource), "", "", currentProtocolVersion), // this won't work
ProcessClass(ProcessClass::TesterClass, ProcessClass::CommandLineSource), "", "", protocolVersion),
TaskPriority::DefaultYield));
Sim2FileSystem::newFileSystem();
FlowTransport::createInstance(true, 1);
@ -1410,7 +1426,7 @@ ACTOR void setupAndRun(std::string dataFolder, const char *testFile, bool reboot
try {
//systemActors.push_back( startSystemMonitor(dataFolder) );
if (rebooting) {
wait( timeoutError( restartSimulatedSystem( &systemActors, dataFolder, &testerCount, &connFile, &startingConfiguration, extraDB, whitelistBinPaths), 100.0 ) );
wait( timeoutError( restartSimulatedSystem( &systemActors, dataFolder, &testerCount, &connFile, &startingConfiguration, extraDB, whitelistBinPaths, protocolVersion), 100.0 ) );
// FIXME: snapshot restore does not support multi-region restore, hence restore it as single region always
if (restoring) {
startingConfiguration = LiteralStringRef("usable_regions=1");
@ -1419,12 +1435,13 @@ ACTOR void setupAndRun(std::string dataFolder, const char *testFile, bool reboot
else {
g_expect_full_pointermap = 1;
setupSimulatedSystem(&systemActors, dataFolder, &testerCount, &connFile, &startingConfiguration, extraDB,
minimumReplication, minimumRegions, whitelistBinPaths, configureLocked, logAntiQuorum);
minimumReplication, minimumRegions, whitelistBinPaths, configureLocked, logAntiQuorum, protocolVersion);
wait( delay(1.0) ); // FIXME: WHY!!! //wait for machines to boot
}
std::string clusterFileDir = joinPath( dataFolder, deterministicRandom()->randomUniqueID().toString() );
platform::createDirectory( clusterFileDir );
writeFile(joinPath(clusterFileDir, "fdb.cluster"), connFile.get().toString());
std::cout << "TESTER COUNT: " << testerCount << std::endl;
wait(timeoutError(runTests(Reference<ClusterConnectionFile>(
new ClusterConnectionFile(joinPath(clusterFileDir, "fdb.cluster"))),
TEST_TYPE_FROM_FILE, TEST_ON_TESTERS, testerCount, testFile, startingConfiguration),

View File

@ -904,6 +904,9 @@ std::map<std::string, std::function<void(const std::string&)>> testSpecGlobalKey
// else { } It is enable by default for tester
TraceEvent("TestParserTest").detail("ClientInfoLogging", value);
}},
{"protocolVersion", [](const std::string& value) {
TraceEvent("TestParserTest").detail("ParsedProtocolVersion", value);
}}
};
std::map<std::string, std::function<void(const std::string& value, TestSpec* spec)>> testSpecTestKeys = {
@ -1288,9 +1291,12 @@ ACTOR Future<Void> runTests( Reference<AsyncVar<Optional<struct ClusterControlle
state Future<Void> testerTimeout = delay(600.0); // wait 600 sec for testers to show up
state vector<WorkerDetails> workers;
minTestersExpected = 1;
loop {
choose {
when( vector<WorkerDetails> w = wait( cc->get().present() ? brokenPromiseToNever( cc->get().get().getWorkers.getReply( GetWorkersRequest( flags ) ) ) : Never() ) ) {
std::cout << "WORKERS RECRUITEED" << std::endl;
if (w.size() >= minTestersExpected) {
workers = w;
break;
@ -1300,6 +1306,7 @@ ACTOR Future<Void> runTests( Reference<AsyncVar<Optional<struct ClusterControlle
when( wait( cc->onChange() ) ) {}
when( wait( testerTimeout ) ) {
TraceEvent(SevError, "TesterRecruitmentTimeout");
std::cout << "TESTER TIMEOUT" << std::endl;
throw timed_out();
}
}
@ -1366,10 +1373,13 @@ ACTOR Future<Void> runTests( Reference<ClusterConnectionFile> connFile, test_typ
if (at == TEST_HERE) {
Reference<AsyncVar<ServerDBInfo>> db( new AsyncVar<ServerDBInfo> );
vector<TesterInterface> iTesters(1);
TraceEvent("BEFORE MONITOR SERVER DBINFO");
actors.push_back( reportErrors(monitorServerDBInfo( cc, LocalityData(), db ), "MonitorServerDBInfo") ); // FIXME: Locality
TraceEvent("BEFORE MONITOR TESTER SERVER CORE");
actors.push_back( reportErrors(testerServerCore( iTesters[0], connFile, db, locality ), "TesterServerCore") );
tests = runTests( cc, ci, iTesters, testSpecs, startingConfiguration, locality );
} else {
TraceEvent("BEFORE RUNTESTS");
tests = reportErrors(runTests(cc, ci, testSpecs, at, minTestersExpected, startingConfiguration, locality), "RunTests");
}

View File

@ -25,6 +25,7 @@
#include "fdbclient/StorageServerInterface.h"
#include "fdbserver/Knobs.h"
#include "flow/ActorCollection.h"
#include "flow/ProtocolVersion.h"
#include "flow/SystemMonitor.h"
#include "flow/TDMetric.actor.h"
#include "fdbrpc/simulator.h"
@ -1797,8 +1798,11 @@ ACTOR Future<Void> serveProtocolInfo() {
state RequestStream<ProtocolInfoRequest> protocolInfo(
PeerCompatibilityPolicy{ RequirePeer::AtLeast, ProtocolVersion::withStableInterfaces() });
protocolInfo.makeWellKnownEndpoint(WLTOKEN_PROTOCOL_INFO, TaskPriority::DefaultEndpoint);
std::cout << "SETUP PROTOCOL INFO ENDPOINT ON: " << g_network->getLocalAddress().toString() << " ON VERSION " << g_network->protocolVersion().version() << std::endl;
loop {
ProtocolInfoRequest req = waitNext(protocolInfo.getFuture());
std::cout << "SENDING BACK PROTOCOL: " << g_network->protocolVersion().version() << " TO: " << req.reply.getEndpoint().addresses.toString() << std::endl;
TraceEvent("SENDING PROTOCOL INFO TO CLIENT");
req.reply.send(ProtocolInfoReply{ g_network->protocolVersion() });
}
}

View File

@ -19,10 +19,12 @@
*/
#include <cstdint>
#include <ostream>
#include <string>
#include <thread>
#include <vector>
#include "fdbclient/CoordinationInterface.h"
#include "fdbclient/MultiVersionTransaction.h"
#include "fdbrpc/FlowTransport.h"
#include "fdbrpc/Locality.h"
#include "fdbserver/SimulatedCluster.h"
@ -37,66 +39,10 @@
#include "flow/network.h"
#include "flow/actorcompiler.h" // has to be last include
// Sends a ProtocolInfoRequest to `endpoint`, waits for the reply and prints the
// protocol version it carries.
//
// endpoint: destination of the request.
// Returns:  Void once the reply has been printed; an error on the reply future is
//           rethrown by wait().
ACTOR Future<Void> getProtocol(Endpoint endpoint) {
	// The unused local vector of reply futures was dropped; this actor only ever
	// issues a single request.
	RequestStream<ProtocolInfoRequest> requestStream{ endpoint };
	auto f = retryBrokenPromise(requestStream, ProtocolInfoRequest{});
	ProtocolInfoReply res = wait(f);
	std::cout << "GOT VERSION: " << res.version.version() << std::endl;
	return Void();
}
// Base layout used by the protocol-version test: a single integer field plus the
// file identifier shared with the derived NewStruct.
struct _Struct {
	static constexpr FileIdentifier file_identifier = 2340487;

	// Field present in the base layout; defaults to 0.
	int oldField{ 0 };
};
// Extends _Struct with one additional integer field and serializes both fields.
struct NewStruct : public _Struct {
	// Field added on top of the base layout; defaults to 0.
	int newField{ 0 };

	// Puts the object into the state that isSet() recognises.
	void setFields() {
		newField = 2;
		oldField = 1;
	}

	// True only when both fields hold the values written by setFields().
	bool isSet() const {
		if (oldField != 1) {
			return false;
		}
		return newField == 2;
	}

	// Serializes oldField followed by newField.
	template <class Archive>
	void serialize(Archive& ar) {
		serializer(ar, oldField, newField);
	}
};
// Serializes `numObjects` NewStruct instances (each with setFields() applied) using a
// fixed, hard-coded protocol version and stores the resulting bytes under `key`.
// The commit is retried through Transaction::onError until it succeeds.
ACTOR static Future<Void> writeNew(Database cx, int numObjects, Key key) {
	// Payload: numObjects records, every one with both fields set.
	std::vector<NewStruct> objects(numObjects);
	for (int idx = 0; idx < numObjects; ++idx) {
		objects[idx].setFields();
	}

	// The writer is versioned with the hard-coded protocol version plus the object-serializer flag.
	ProtocolVersion writeVersion(0x0FDB00B070010000LL);
	writeVersion.addObjectSerializerFlag();
	ObjectWriter objectWriter(IncludeVersion(writeVersion));
	objectWriter.serialize(objects);

	// `state` keeps the payload and the transaction alive across the waits below.
	state Value payload = objectWriter.toStringRef();
	state Transaction txn(cx);
	loop {
		try {
			txn.set(key, payload);
			wait(txn.commit());
			return Void();
		} catch (Error& err) {
			// Hand the failure to the transaction; once onError completes the loop tries again.
			wait(txn.onError(err));
		}
	}
}
struct ProtocolVersionWorkload : TestWorkload {
// Builds the workload from its test options:
//   "protocolVersion" - passed to ProtocolVersion(); falls back to 0.0 when the option is absent
//   "clusterFilePath" - stored as a std::string; falls back to the empty string when absent
// NOTE(review): this span comes from a diff view, and the commented-out assignment below
// duplicates the live `protocol` assignment - confirm which of the two the checked-out file keeps.
ProtocolVersionWorkload(WorkloadContext const& wcx)
: TestWorkload(wcx) {
protocol = ProtocolVersion(getOption(options, LiteralStringRef("protocolVersion"), 0.0));
clusterFilePath = getOption(options, LiteralStringRef("clusterFilePath"), LiteralStringRef("")).toString();
// protocol = ProtocolVersion(getOption(options, LiteralStringRef("protocolVersion"), 0.0));
}
virtual std::string description() {
@ -108,51 +54,64 @@ struct ProtocolVersionWorkload : TestWorkload {
}
// Start phase of the protocol-version workload.
// NOTE(review): this span is rendered from a diff, so statements of the previous revision
// (spawning a simulated "ProtocolVersionProcess" and running fdbd on it) appear interleaved
// with statements of the new revision (asking every coordinator for its protocol version),
// and several statements appear a second time in commented-out form. Confirm statement
// order and brace structure against the checked-out file before relying on it.
ACTOR Future<Void> _start(ProtocolVersionWorkload* self, Database cx) {
CSimpleIni ini;
state Reference<ClusterConnectionFile> connFile(new ClusterConnectionFile(self->clusterFilePath));
state const char* whitelistBinPaths = "";
state std::string dataFolder = "simfdb";
state LocalityData localities = LocalityData(Optional<Standalone<StringRef>>(),
Standalone<StringRef>(deterministicRandom()->randomUniqueID().toString()),
Standalone<StringRef>(deterministicRandom()->randomUniqueID().toString()),
Optional<Standalone<StringRef>>());
state std::string coordFolder = ini.GetValue(printable(localities.machineId()).c_str(), "coordinationFolder", "");
state ProcessClass processClass = ProcessClass(ProcessClass::TesterClass, ProcessClass::CommandLineSource);
state uint16_t listenPerProcess = 1;
state uint16_t port = 1;
state IPAddress ip = IPAddress(0x01010101);
state bool sslEnabled = false;
// CSimpleIni ini;
// // state Reference<ClusterConnectionFile> connFile(new ClusterConnectionFile(self->clusterFilePath));
// state const char* whitelistBinPaths = "";
// state std::string dataFolder = "simfdb";
// state LocalityData localities = LocalityData(Optional<Standalone<StringRef>>(),
// Standalone<StringRef>(deterministicRandom()->randomUniqueID().toString()),
// Standalone<StringRef>(deterministicRandom()->randomUniqueID().toString()),
// Optional<Standalone<StringRef>>());
// state std::string coordFolder = ini.GetValue(printable(localities.machineId()).c_str(), "coordinationFolder", "");
// state ProcessClass processClass = ProcessClass(ProcessClass::TesterClass, ProcessClass::CommandLineSource);
// state uint16_t listenPerProcess = 1;
// state uint16_t port = 1;
// state IPAddress ip = IPAddress(0x01011F11);
// state bool sslEnabled = false;
// Creates a new simulated process pinned to a hard-coded protocol version and switches execution onto it.
state ISimulator::ProcessInfo* process = g_pSimulator->newProcess("ProtocolVersionProcess", ip, port, sslEnabled, listenPerProcess,
// localities, processClass, dataFolder.c_str(), coordFolder.c_str(), currentProtocolVersion);
localities, processClass, dataFolder.c_str(), coordFolder.c_str(), ProtocolVersion(0x0FDB00B070010000LL));
wait(g_pSimulator->onProcess(process, TaskPriority::DefaultYield));
// state ISimulator::ProcessInfo* process = g_pSimulator->newProcess("ProtocolVersionProcess", ip, port, sslEnabled, listenPerProcess,
// localities, processClass, dataFolder.c_str(), coordFolder.c_str(), currentProtocolVersion);
// // localities, processClass, dataFolder.c_str(), coordFolder.c_str(), ProtocolVersion(0x0FDB00B070010000LL));
// wait(g_pSimulator->onProcess(process, TaskPriority::DefaultYield));
FlowTransport::createInstance(true, 1);
Sim2FileSystem::newFileSystem();
// FlowTransport::createInstance(true, 1);
// Sim2FileSystem::newFileSystem();
// Binds the transport to the address (ip, port) chosen above.
NetworkAddress n(ip, port, true, sslEnabled);
FlowTransport::transport().bind( n, n );
// NetworkAddress n(ip, port, true, sslEnabled);
// FlowTransport::transport().bind( n, n );
state vector<Future<Void>> actors;
actors.push_back(fdbd( connFile, localities, processClass, dataFolder, coordFolder, 500e6, "", "", -1, whitelistBinPaths));
// state vector<Future<Void>> actors;
// actors.push_back(fdbd( cx->getConnectionFile(), localities, processClass, dataFolder, coordFolder, 500e6, "", "", -1, whitelistBinPaths));
// Picks the first simulated process whose address differs from the one created above.
state std::vector<ISimulator::ProcessInfo*> allProcesses = g_pSimulator->getAllProcesses();
state ISimulator::ProcessInfo* nextProcess = nullptr;
for(ISimulator::ProcessInfo* p : allProcesses){
if(p->address != process->address){
nextProcess = p;
break;
}
// getting coord protocols from current protocol version
// One ProtocolInfoRequest is issued per coordinator address taken from the connection string;
// the reply futures are collected in coordProtocols.
state vector<Future<ProtocolInfoReply>> coordProtocols;
vector<NetworkAddress> coordAddresses = cx->getConnectionFile()->getConnectionString().coordinators();
for(int i = 0; i<coordAddresses.size(); i++) {
RequestStream<ProtocolInfoRequest> requestStream{ Endpoint{ { coordAddresses[i] }, WLTOKEN_PROTOCOL_INFO } };
coordProtocols.push_back(retryBrokenPromise(requestStream, ProtocolInfoRequest{}));
}
ASSERT(nextProcess);
wait(g_pSimulator->onProcess(nextProcess));
// Waits for every coordinator request issued above.
wait(waitForAll(coordProtocols));
wait(getProtocol(Endpoint{{process->addresses}, WLTOKEN_PROTOCOL_INFO}));
stopAfter(waitForAll(actors));
// state std::vector<ISimulator::ProcessInfo*> allProcesses = g_pSimulator->getAllProcesses();
// state int i = 0;
// for(; i<allProcesses.size(); i++) {
// auto f = g_pSimulator->onProcess(allProcesses[i], TaskPriority::DefaultYield);
// wait(f);
// std::cout << "PROCESS PRO VESRION: " << g_network->protocolVersion().version() << std::endl;
// }
std::cout << "CURR VERSION: " << g_network->protocolVersion().version() << std::endl;
// Compares the local protocol version with each coordinator reply: protocolMatches[i] is true
// when they are equal, and a message is printed for every mismatch.
// The .get() calls presumably rely on the waitForAll above having completed - confirm.
std::vector<bool> protocolMatches;
protocolMatches.reserve(coordProtocols.size());
for(int i = 0; i<coordProtocols.size(); i++){
if(g_network->protocolVersion() != coordProtocols[i].get().version) std::cout << "MISMATCHED VERSIONS" << std::endl;
protocolMatches.push_back(g_network->protocolVersion() == coordProtocols[i].get().version);
}
// wait(writeNew(cx, 1, LiteralStringRef("TEST")));
// ASSERT(count(protocolMatches.begin(), protocolMatches.end(), false) >= 1);
// g_pSimulator->killProcess(process, ISimulator::KillType::KillInstantly);
// stopAfter(waitForAll(actors));
return Void();
}

View File

@ -130,6 +130,7 @@ if(WITH_PYTHON)
add_fdb_test(TEST_FILES fast/LowLatency.toml)
add_fdb_test(TEST_FILES fast/MemoryLifetime.toml)
add_fdb_test(TEST_FILES fast/MoveKeysCycle.toml)
add_fdb_test(TEST_FILES fast/ProtocolVersion.toml)
add_fdb_test(TEST_FILES fast/RandomSelector.toml)
add_fdb_test(TEST_FILES fast/RandomUnitTests.toml)
add_fdb_test(TEST_FILES fast/ReadHotDetectionCorrectness.toml IGNORE) # TODO re-enable once read hot detection is enabled.

View File

@ -1,3 +1,5 @@
protocolVersion = 0x0FDB00B070010000
[[test]]
testTitle = 'ProtocolVersionTest'