switch to test newer incompatible version. Fix PR comments. Modify schema

This commit is contained in:
Richard Chen 2020-10-09 20:22:40 +00:00
parent 2e2ed6d93d
commit 8c96763ea9
11 changed files with 25 additions and 61 deletions

View File

@ -780,7 +780,8 @@ const KeyRef JSONSchemas::statusSchema = LiteralStringRef(R"statusSchema(
"coordinators":[
{
"reachable":true,
"address":"127.0.0.1:4701"
"address":"127.0.0.1:4701",
"protocol": 1142507688261910528
}
],
"quorum_reachable":true

View File

@ -269,7 +269,7 @@ struct YieldMockNetwork : INetwork, ReferenceCounted<YieldMockNetwork> {
static TLSConfig emptyConfig;
return emptyConfig;
}
virtual ProtocolVersion protocolVersion() override {
ProtocolVersion protocolVersion() override {
return baseNetwork->protocolVersion();
}
};

View File

@ -782,7 +782,6 @@ ACTOR static void deliver(TransportData* self, Endpoint destination, ArenaReader
auto receiver = self->endpoints.get(destination.token);
if (receiver) {
if (!checkCompatible(receiver->peerCompatibilityPolicy(), reader.protocolVersion())) {
// TODO(anoyes): Report incompatibility somehow
return;
}
try {
@ -1061,7 +1060,6 @@ ACTOR static Future<Void> connectionReader(
peerAddress = NetworkAddress(pkt.canonicalRemoteIp(), pkt.canonicalRemotePort, true,
peerAddress.isTLS());
}
// TODO: try to get or open peer here
peer = transport->getOrOpenPeer(peerAddress, false);
peer->compatible = compatible;
peer->incompatibleProtocolVersionNewer = incompatibleProtocolVersionNewer;
@ -1157,8 +1155,6 @@ Reference<Peer> TransportData::getPeer( NetworkAddress const& address ) {
}
Reference<Peer> TransportData::getOrOpenPeer( NetworkAddress const& address, bool startConnectionKeeper ) {
// std::cout << "Get or open peer " << address.toString() << std::endl;
// TraceEvent("Get Or open peer");
auto peer = getPeer(address);
if(!peer) {
peer = Reference<Peer>( new Peer(this, address) );

View File

@ -1030,7 +1030,6 @@ public:
ProcessInfo* m = new ProcessInfo(name, locality, startingClass, addresses, this, dataFolder, coordinationFolder);
for (int processPort = port; processPort < port + listenPerProcess; ++processPort) {
NetworkAddress address(ip, processPort, true, sslEnabled && processPort == port);
TraceEvent("SETTING UP SIM2LIstenr");
m->listenerMap[address] = Reference<IListener>( new Sim2Listener(m, address) );
addressMap[address] = m;
}
@ -1039,7 +1038,6 @@ public:
currentlyRebootingProcesses.erase(addresses.address);
m->excluded = g_simulator.isExcluded(NetworkAddress(ip, port, true, false));
m->cleared = g_simulator.isCleared(addresses.address);
processProtocolVersion[m] = protocol;
m->protocolVersion = protocol;
m->setGlobal(enTDMetrics, (flowGlobalType) &m->tdmetrics);
@ -1707,9 +1705,8 @@ public:
return delay( 0, taskID, process->machine->machineProcess );
}
virtual ProtocolVersion protocolVersion() override {
ASSERT(processProtocolVersion.find(getCurrentProcess()) != processProtocolVersion.end());
return processProtocolVersion.at(getCurrentProcess());
ProtocolVersion protocolVersion() override {
return getCurrentProcess()->protocolVersion;
}
//time is guarded by ISimulator::mutex. It is not necessary to guard reads on the main thread because
@ -1736,8 +1733,6 @@ public:
//Map from machine IP -> machine disk space info
std::map<IPAddress, SimDiskSpace> diskSpaceMap;
std::map<ProcessInfo*, ProtocolVersion> processProtocolVersion;
//Whether or not yield has returned true during the current iteration of the run loop
bool yielded;
int yield_limit; // how many more times yield may return false before next returning true

View File

@ -328,7 +328,7 @@ public:
BackupAgentType backupAgents;
BackupAgentType drAgents;
bool hasDiffProtocol;
bool hasDiffProtocolProcess; // true if simulator is testing a process with a different version
bool setDiffProtocol;
virtual flowGlobalType global(int id) const { return getCurrentProcess()->global(id); };

View File

@ -345,7 +345,7 @@ ACTOR Future<Void> simulatedMachine(ClusterConnectionString connStr, std::vector
Reference<ClusterConnectionFile> clusterFile(useSeedFile ? new ClusterConnectionFile(path, connStr.toString()) : new ClusterConnectionFile(path));
const int listenPort = i*listenPerProcess + 1;
AgentMode agentMode = runBackupAgents == AgentOnly ? ( i == ips.size()-1 ? AgentOnly : AgentNone ) : runBackupAgents;
if(g_simulator.hasDiffProtocol && !g_simulator.setDiffProtocol && agentMode == AgentNone) {
if(g_simulator.hasDiffProtocolProcess && !g_simulator.setDiffProtocol && agentMode == AgentNone) {
processes.push_back(simulatedFDBDRebooter(clusterFile, ips[i], sslEnabled, listenPort, listenPerProcess, localities, processClass, &myFolders[i], &coordFolders[i], baseFolder, connStr, useSeedFile, agentMode, whitelistBinPaths, protocolVersion));
g_simulator.setDiffProtocol = true;
}
@ -1348,7 +1348,7 @@ void setupSimulatedSystem(vector<Future<Void>>* systemActors, std::string baseFo
}
void checkTestConf(const char* testFile, int& extraDB, int& minimumReplication, int& minimumRegions,
int& configureLocked, ProtocolVersion& protocolVersion) {
int& configureLocked, bool& startIncompatibleProcess) {
std::ifstream ifs;
ifs.open(testFile, std::ifstream::in);
if (!ifs.good())
@ -1385,10 +1385,8 @@ void checkTestConf(const char* testFile, int& extraDB, int& minimumReplication,
sscanf( value.c_str(), "%d", &configureLocked );
}
if (attrib == "protocolVersion") {
uint64_t protocolVersionInt = 0;
sscanf(value.c_str(), "%" SCNx64, &protocolVersionInt);
protocolVersion = ProtocolVersion(protocolVersionInt);
if (attrib == "startIncompatibleProcess") {
startIncompatibleProcess = strcmp(value.c_str(), "true") == 0;
}
}
@ -1404,10 +1402,11 @@ ACTOR void setupAndRun(std::string dataFolder, const char *testFile, bool reboot
state int minimumReplication = 0;
state int minimumRegions = 0;
state int configureLocked = 0;
state ProtocolVersion protocolVersion = currentProtocolVersion;
checkTestConf(testFile, extraDB, minimumReplication, minimumRegions, configureLocked, protocolVersion);
g_simulator.hasDiffProtocol = protocolVersion != currentProtocolVersion;
state bool startIncompatibleProcess = false;
checkTestConf(testFile, extraDB, minimumReplication, minimumRegions, configureLocked, startIncompatibleProcess);
g_simulator.hasDiffProtocolProcess = startIncompatibleProcess;
g_simulator.setDiffProtocol = false;
state ProtocolVersion protocolVersion = startIncompatibleProcess ? ProtocolVersion(currentProtocolVersion.version()+1) : currentProtocolVersion;
// TODO (IPv6) Use IPv6?
wait(g_simulator.onProcess(

View File

@ -901,8 +901,8 @@ std::map<std::string, std::function<void(const std::string&)>> testSpecGlobalKey
// else { } It is enable by default for tester
TraceEvent("TestParserTest").detail("ClientInfoLogging", value);
}},
{"protocolVersion", [](const std::string& value) {
TraceEvent("TestParserTest").detail("ParsedProtocolVersion", value);
{"startIncompatibleProcess", [](const std::string& value) {
TraceEvent("TestParserTest").detail("ParsedStartIncompatibleProcess", value);
}}
};

View File

@ -1794,11 +1794,8 @@ ACTOR Future<Void> serveProtocolInfo() {
state RequestStream<ProtocolInfoRequest> protocolInfo(
PeerCompatibilityPolicy{ RequirePeer::AtLeast, ProtocolVersion::withStableInterfaces() });
protocolInfo.makeWellKnownEndpoint(WLTOKEN_PROTOCOL_INFO, TaskPriority::DefaultEndpoint);
std::cout << "SETUP PROTOCOL INFO ENDPOINT ON: " << g_network->getLocalAddress().toString() << " ON VERSION " << g_network->protocolVersion().version() << std::endl;
loop {
ProtocolInfoRequest req = waitNext(protocolInfo.getFuture());
std::cout << "SENDING BACK PROTOCOL: " << g_network->protocolVersion().version() << " TO: " << req.reply.getEndpoint().addresses.toString() << std::endl;
TraceEvent("SENDING PROTOCOL INFO TO CLIENT");
req.reply.send(ProtocolInfoReply{ g_network->protocolVersion() });
}
}

View File

@ -45,53 +45,34 @@ struct ProtocolVersionWorkload : TestWorkload {
}
virtual std::string description() {
std::string description() override {
return "ProtocolVersionWorkload";
}
virtual Future<Void> start(Database const& cx) {
Future<Void> start(Database const& cx) override {
return _start(this, cx);
}
ACTOR Future<Void> _start(ProtocolVersionWorkload* self, Database cx) {
state ISimulator::ProcessInfo* oldProcess = g_pSimulator->getCurrentProcess();
state std::vector<ISimulator::ProcessInfo*> allProcesses = g_pSimulator->getAllProcesses();
state std::vector<ISimulator::ProcessInfo*>::iterator diffVersionProcess = find_if(allProcesses.begin(), allProcesses.end(), [](const ISimulator::ProcessInfo* p){
return p->protocolVersion != currentProtocolVersion;
});
ASSERT(diffVersionProcess != allProcesses.end());
wait(g_pSimulator->onProcess(*diffVersionProcess, TaskPriority::DefaultYield));
// getting coord protocols from current protocol version
state vector<Future<ProtocolInfoReply>> coordProtocols;
vector<NetworkAddress> coordAddresses = cx->getConnectionFile()->getConnectionString().coordinators();
for(int i = 0; i<coordAddresses.size(); i++) {
RequestStream<ProtocolInfoRequest> requestStream{ Endpoint{ { coordAddresses[i] }, WLTOKEN_PROTOCOL_INFO } };
coordProtocols.push_back(retryBrokenPromise(requestStream, ProtocolInfoRequest{}));
}
wait(waitForAll(coordProtocols));
std::vector<bool> protocolMatches;
protocolMatches.reserve(coordProtocols.size());
for(int i = 0; i<coordProtocols.size(); i++){
protocolMatches.push_back(g_network->protocolVersion() == coordProtocols[i].get().version);
}
ASSERT(count(protocolMatches.begin(), protocolMatches.end(), false) >= 1);
// go back to orig process for consistency check
wait(g_pSimulator->onProcess(oldProcess, TaskPriority::DefaultYield));
RequestStream<ProtocolInfoRequest> requestStream{ Endpoint{ { (*diffVersionProcess)->addresses }, WLTOKEN_PROTOCOL_INFO } };
ProtocolInfoReply reply = wait(retryBrokenPromise(requestStream, ProtocolInfoRequest{}));
ASSERT(reply.version != g_network->protocolVersion());
return Void();
}
virtual Future<bool> check(Database const& cx) {
Future<bool> check(Database const& cx) override {
return true;
}
virtual void getMetrics(vector<PerfMetric>& m) {
void getMetrics(vector<PerfMetric>& m) override {
}
};

View File

@ -437,11 +437,6 @@ class INetwork;
extern INetwork* g_network;
extern INetwork* newNet2(const TLSConfig& tlsConfig, bool useThreadPool = false, bool useMetrics = false);
// Make current protocol a member
// find all usess of current protocol version
// add member function that returns version
// in simulation, they share same instance of INetwork. In simulated, its possible to return the calling processes protocol version.
class INetwork {
public:
// This interface abstracts the physical or simulated network, event loop and hardware that FoundationDB is running on.

View File

@ -1,4 +1,4 @@
protocolVersion = 0x0FDB00B070010000
startIncompatibleProcess = true
[[test]]
testTitle = 'ProtocolVersionTest'