merge anoyes/stable-interface and add back in isCompatible

This commit is contained in:
Richard Chen 2020-09-19 01:32:32 +00:00
parent d5096253c8
commit 4eb20a1283
12 changed files with 149 additions and 61 deletions

View File

@ -30,6 +30,11 @@
const int MAX_CLUSTER_FILE_BYTES = 60000;
constexpr UID WLTOKEN_CLIENTLEADERREG_GETLEADER(-1, 2);
constexpr UID WLTOKEN_CLIENTLEADERREG_OPENDATABASE(-1, 3);
constexpr UID WLTOKEN_PROTOCOL_INFO(-1, 10);
struct ClientLeaderRegInterface {
RequestStream< struct GetLeaderRequest > getLeader;
RequestStream< struct OpenDatabaseCoordRequest > openDatabase;
@ -186,4 +191,30 @@ public:
ClientCoordinators() {}
};
struct ProtocolInfoReply {
constexpr static FileIdentifier file_identifier = 7784298;
ProtocolVersion version;
template <class Ar>
void serialize(Ar& ar) {
uint64_t version_ = 0;
if (Ar::isSerializing) {
version_ = version.versionWithFlags();
}
serializer(ar, version_);
if (Ar::isDeserializing) {
version = ProtocolVersion(version_);
}
}
};
struct ProtocolInfoRequest {
constexpr static FileIdentifier file_identifier = 13261233;
ReplyPromise<ProtocolInfoReply> reply{ PeerCompatibilityPolicy{ RequirePeer::AtLeast,
ProtocolVersion::withStableInterfaces() } };
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, reply);
}
};
#endif

View File

@ -371,10 +371,6 @@ ClientCoordinators::ClientCoordinators( Key clusterKey, std::vector<NetworkAddre
ccf = Reference<ClusterConnectionFile>(new ClusterConnectionFile( ClusterConnectionString( coordinators, clusterKey ) ) );
}
UID WLTOKEN_CLIENTLEADERREG_GETLEADER( -1, 2 );
UID WLTOKEN_CLIENTLEADERREG_OPENDATABASE( -1, 3 );
ClientLeaderRegInterface::ClientLeaderRegInterface( NetworkAddress remote )
: getLeader( Endpoint({remote}, WLTOKEN_CLIENTLEADERREG_GETLEADER) ),
openDatabase( Endpoint({remote}, WLTOKEN_CLIENTLEADERREG_OPENDATABASE) )

View File

@ -292,7 +292,17 @@ ACTOR Future<Optional<StatusObject>> clientCoordinatorsStatusFetcher(Reference<C
for (int i = 0; i < coord.clientLeaderServers.size(); i++)
leaderServers.push_back(retryBrokenPromise(coord.clientLeaderServers[i].getLeader, GetLeaderRequest(coord.clusterKey, UID()), TaskPriority::CoordinationReply));
wait( smartQuorum(leaderServers, leaderServers.size() / 2 + 1, 1.5) || delay(2.0) );
state vector<Future<ProtocolInfoReply>> coordProtocols;
coordProtocols.reserve(coord.clientLeaderServers.size());
for (int i = 0; i < coord.clientLeaderServers.size(); i++) {
RequestStream<ProtocolInfoRequest> requestStream{ Endpoint{
{ coord.clientLeaderServers[i].getLeader.getEndpoint().addresses }, WLTOKEN_PROTOCOL_INFO } };
coordProtocols.push_back(retryBrokenPromise(requestStream, ProtocolInfoRequest{}));
}
wait(smartQuorum(leaderServers, leaderServers.size() / 2 + 1, 1.5) &&
smartQuorum(coordProtocols, coordProtocols.size() / 2 + 1, 1.5) ||
delay(2.0));
statusObj["quorum_reachable"] = *quorum_reachable = quorum(leaderServers, leaderServers.size() / 2 + 1).isReady();
@ -309,6 +319,9 @@ ACTOR Future<Optional<StatusObject>> clientCoordinatorsStatusFetcher(Reference<C
coordinatorsUnavailable++;
coordStatus["reachable"] = false;
}
if (coordProtocols[i].isReady()) {
coordStatus["protocol"] = coordProtocols[i].get().version.version();
}
coordsStatus.push_back(coordStatus);
}
statusObj["coordinators"] = coordsStatus;

View File

@ -42,15 +42,15 @@
static NetworkAddressList g_currentDeliveryPeerAddress = NetworkAddressList();
const UID WLTOKEN_ENDPOINT_NOT_FOUND(-1, 0);
const UID WLTOKEN_PING_PACKET(-1, 1);
const UID TOKEN_IGNORE_PACKET(0, 2);
constexpr UID WLTOKEN_ENDPOINT_NOT_FOUND(-1, 0);
constexpr UID WLTOKEN_PING_PACKET(-1, 1);
const uint64_t TOKEN_STREAM_FLAG = 1;
class EndpointMap : NonCopyable {
public:
EndpointMap();
// Reserve space for this many wellKnownEndpoints
explicit EndpointMap(int wellKnownEndpointCount);
void insertWellKnown(NetworkMessageReceiver* r, const Endpoint::Token& token, TaskPriority priority);
void insert( NetworkMessageReceiver* r, Endpoint::Token& token, TaskPriority priority );
const Endpoint& insert( NetworkAddressList localAddresses, std::vector<std::pair<FlowReceiver*, TaskPriority>> const& streams );
NetworkMessageReceiver* get( Endpoint::Token const& token );
@ -65,17 +65,16 @@ private:
uint64_t uid[2]; // priority packed into lower 32 bits; actual lower 32 bits of token are the index in data[]
uint32_t nextFree;
};
NetworkMessageReceiver* receiver;
NetworkMessageReceiver* receiver = nullptr;
Endpoint::Token& token() { return *(Endpoint::Token*)uid; }
};
int wellKnownEndpointCount;
std::vector<Entry> data;
uint32_t firstFree;
};
EndpointMap::EndpointMap()
: firstFree(-1)
{
}
EndpointMap::EndpointMap(int wellKnownEndpointCount)
: wellKnownEndpointCount(wellKnownEndpointCount), data(wellKnownEndpointCount), firstFree(-1) {}
void EndpointMap::realloc() {
int oldSize = data.size();
@ -88,6 +87,14 @@ void EndpointMap::realloc() {
firstFree = oldSize;
}
void EndpointMap::insertWellKnown(NetworkMessageReceiver* r, const Endpoint::Token& token, TaskPriority priority) {
int index = token.second();
ASSERT(data[index].receiver == nullptr);
data[index].receiver = r;
data[index].token() =
Endpoint::Token(token.first(), (token.second() & 0xffffffff00000000LL) | static_cast<uint32_t>(priority));
}
void EndpointMap::insert( NetworkMessageReceiver* r, Endpoint::Token& token, TaskPriority priority ) {
if (firstFree == uint32_t(-1)) realloc();
int index = firstFree;
@ -135,6 +142,9 @@ const Endpoint& EndpointMap::insert( NetworkAddressList localAddresses, std::vec
NetworkMessageReceiver* EndpointMap::get( Endpoint::Token const& token ) {
uint32_t index = token.second();
if (index < wellKnownEndpointCount && data[index].receiver == nullptr) {
TraceEvent(SevWarnAlways, "WellKnownEndpointNotAdded").detail("Token", token);
}
if ( index < data.size() && data[index].token().first() == token.first() && ((data[index].token().second()&0xffffffff00000000LL)|index)==token.second() )
return data[index].receiver;
return 0;
@ -147,9 +157,13 @@ TaskPriority EndpointMap::getPriority( Endpoint::Token const& token ) {
return TaskPriority::UnknownEndpoint;
}
void EndpointMap::remove( Endpoint::Token const& token, NetworkMessageReceiver* r ) {
void EndpointMap::remove(Endpoint::Token const& token, NetworkMessageReceiver* r) {
uint32_t index = token.second();
if ( index < data.size() && data[index].token().first() == token.first() && ((data[index].token().second()&0xffffffff00000000LL)|index)==token.second() && data[index].receiver == r ) {
if (index < wellKnownEndpointCount) {
data[index].receiver = nullptr;
} else if (index < data.size() && data[index].token().first() == token.first() &&
((data[index].token().second() & 0xffffffff00000000LL) | index) == token.second() &&
data[index].receiver == r) {
data[index].receiver = 0;
data[index].nextFree = firstFree;
firstFree = index;
@ -158,13 +172,10 @@ void EndpointMap::remove( Endpoint::Token const& token, NetworkMessageReceiver*
struct EndpointNotFoundReceiver : NetworkMessageReceiver {
EndpointNotFoundReceiver(EndpointMap& endpoints) {
//endpoints[WLTOKEN_ENDPOINT_NOT_FOUND] = this;
Endpoint::Token e = WLTOKEN_ENDPOINT_NOT_FOUND;
endpoints.insert(this, e, TaskPriority::DefaultEndpoint);
ASSERT( e == WLTOKEN_ENDPOINT_NOT_FOUND );
endpoints.insertWellKnown(this, WLTOKEN_ENDPOINT_NOT_FOUND, TaskPriority::DefaultEndpoint);
}
virtual void receive(ArenaObjectReader& reader) override {
// Remote machine tells us it doesn't have endpoint e
void receive(ArenaObjectReader& reader) override {
Endpoint e;
reader.deserialize(e);
IFailureMonitor::failureMonitor().endpointNotFound(e);
@ -173,11 +184,10 @@ struct EndpointNotFoundReceiver : NetworkMessageReceiver {
struct PingReceiver : NetworkMessageReceiver {
PingReceiver(EndpointMap& endpoints) {
Endpoint::Token e = WLTOKEN_PING_PACKET;
endpoints.insert(this, e, TaskPriority::ReadSocket);
ASSERT( e == WLTOKEN_PING_PACKET );
endpoints.insertWellKnown(this, WLTOKEN_PING_PACKET, TaskPriority::ReadSocket);
}
virtual void receive(ArenaObjectReader& reader) override {
void receive(ArenaObjectReader& reader) override {
ReplyPromise<Void> reply;
reader.deserialize(reply);
reply.send(Void());
@ -187,13 +197,8 @@ struct PingReceiver : NetworkMessageReceiver {
class TransportData {
public:
TransportData(uint64_t transportId)
: endpointNotFoundReceiver(endpoints),
pingReceiver(endpoints),
warnAlwaysForLargePacket(true),
lastIncompatibleMessage(0),
transportId(transportId),
numIncompatibleConnections(0)
{
: endpoints(/*wellKnownTokenCount*/ 11), warnAlwaysForLargePacket(true), lastIncompatibleMessage(0),
transportId(transportId), numIncompatibleConnections(0) {
degraded = Reference<AsyncVar<bool>>( new AsyncVar<bool>(false) );
}
@ -222,11 +227,9 @@ public:
Reference<AsyncVar<bool>> degraded;
bool warnAlwaysForLargePacket;
// These declarations must be in exactly this order
EndpointMap endpoints;
EndpointNotFoundReceiver endpointNotFoundReceiver;
PingReceiver pingReceiver;
// End ordered declarations
EndpointNotFoundReceiver endpointNotFoundReceiver{ endpoints };
PingReceiver pingReceiver{ endpoints };
Int64MetricHandle bytesSent;
Int64MetricHandle countPacketsReceived;
@ -758,6 +761,15 @@ TransportData::~TransportData() {
}
}
static bool checkCompatible(const PeerCompatibilityPolicy& policy, ProtocolVersion version) {
switch (policy.requirement) {
case RequirePeer::Exactly:
return version.version() == policy.version.version();
case RequirePeer::AtLeast:
return version.version() >= policy.version.version();
}
}
ACTOR static void deliver(TransportData* self, Endpoint destination, ArenaReader reader, bool inReadSocket) {
TaskPriority priority = self->endpoints.getPriority(destination.token);
if (priority < TaskPriority::ReadSocket || !inReadSocket) {
@ -768,6 +780,10 @@ ACTOR static void deliver(TransportData* self, Endpoint destination, ArenaReader
auto receiver = self->endpoints.get(destination.token);
if (receiver) {
if (!checkCompatible(receiver->peerCompatibilityPolicy(), reader.protocolVersion())) {
// TODO(anoyes): Report incompatibility somehow
return;
}
try {
g_currentDeliveryPeerAddress = destination.addresses;
StringRef data = reader.arenaReadAll();
@ -868,7 +884,9 @@ static void scanPackets(TransportData* transport, uint8_t*& unprocessed_begin, c
#if VALGRIND
VALGRIND_CHECK_MEM_IS_DEFINED(p, packetLen);
#endif
ArenaReader reader(arena, StringRef(p, packetLen), AssumeVersion(currentProtocolVersion));
// remove object serializer flag to account for flat buffer
peerProtocolVersion.removeObjectSerializerFlag();
ArenaReader reader(arena, StringRef(p, packetLen), AssumeVersion(peerProtocolVersion));
UID token;
reader >> token;
@ -979,7 +997,8 @@ ACTOR static Future<Void> connectionReader(
uint64_t connectionId = pkt.connectionId;
if (!pkt.protocolVersion.hasObjectSerializerFlag() ||
!pkt.protocolVersion.isCompatible(currentProtocolVersion)) {
// !pkt.protocolVersion.hasStableInterfaces()) {
!pkt.protocolVersion.isCompatible(currentProtocolVersion)) {
incompatibleProtocolVersionNewer = pkt.protocolVersion > currentProtocolVersion;
NetworkAddress addr = pkt.canonicalRemotePort
? NetworkAddress(pkt.canonicalRemoteIp(), pkt.canonicalRemotePort)
@ -992,7 +1011,6 @@ ACTOR static Future<Void> connectionReader(
.detail("Reason", "IncompatibleProtocolVersion")
.detail("LocalVersion", currentProtocolVersion.version())
.detail("RejectedVersion", pkt.protocolVersion.version())
.detail("VersionMask", ProtocolVersion::compatibleProtocolVersionMask)
.detail("Peer", pkt.canonicalRemotePort ? NetworkAddress(pkt.canonicalRemoteIp(), pkt.canonicalRemotePort)
: conn->getPeerAddress())
.detail("ConnectionId", connectionId);
@ -1056,7 +1074,7 @@ ACTOR static Future<Void> connectionReader(
}
}
}
if (compatible) {
if (compatible || peerProtocolVersion.hasStableInterfaces()) { // if compatible or peerProtocolVersion.hasStableInterfaces
scanPackets( transport, unprocessed_begin, unprocessed_end, arena, peerAddress, peerProtocolVersion );
}
else if(!expectConnectPacket) {
@ -1284,10 +1302,8 @@ void FlowTransport::removeEndpoint( const Endpoint& endpoint, NetworkMessageRece
void FlowTransport::addWellKnownEndpoint( Endpoint& endpoint, NetworkMessageReceiver* receiver, TaskPriority taskID ) {
endpoint.addresses = self->localAddresses;
ASSERT( ((endpoint.token.first() & TOKEN_STREAM_FLAG)!=0) == receiver->isStream() );
Endpoint::Token otoken = endpoint.token;
self->endpoints.insert( receiver, endpoint.token, taskID );
ASSERT( endpoint.token == otoken );
ASSERT(receiver->isStream());
self->endpoints.insertWellKnown(receiver, endpoint.token, taskID);
}
static void sendLocal( TransportData* self, ISerializeSource const& what, const Endpoint& destination ) {

View File

@ -115,11 +115,22 @@ namespace std
};
}
enum class RequirePeer { Exactly, AtLeast };
struct PeerCompatibilityPolicy {
RequirePeer requirement;
ProtocolVersion version;
};
class ArenaObjectReader;
class NetworkMessageReceiver {
public:
virtual void receive(ArenaObjectReader&) = 0;
virtual bool isStream() const { return false; }
virtual PeerCompatibilityPolicy peerCompatibilityPolicy() const {
// TODO(anoyes) Add "this process's protocol version" to INetwork interface and use that here instead.
return { RequirePeer::Exactly, currentProtocolVersion };
}
};
struct TransportData;

View File

@ -66,6 +66,12 @@ struct FlowReceiver : public NetworkMessageReceiver {
endpoint = e;
}
void setPeerCompatibilityPolicy(const PeerCompatibilityPolicy& policy) { peerCompatibilityPolicy_ = policy; }
PeerCompatibilityPolicy peerCompatibilityPolicy() const override {
return peerCompatibilityPolicy_.orDefault(NetworkMessageReceiver::peerCompatibilityPolicy());
}
void makeWellKnownEndpoint(Endpoint::Token token, TaskPriority taskID) {
ASSERT(!endpoint.isValid());
m_isLocalEndpoint = true;
@ -74,6 +80,7 @@ struct FlowReceiver : public NetworkMessageReceiver {
}
private:
Optional<PeerCompatibilityPolicy> peerCompatibilityPolicy_;
Endpoint endpoint;
bool m_isLocalEndpoint;
bool m_stream;
@ -117,6 +124,9 @@ public:
bool isSet() { return sav->isSet(); }
bool isValid() const { return sav != nullptr; }
ReplyPromise() : sav(new NetSAV<T>(0, 1)) {}
explicit ReplyPromise(const PeerCompatibilityPolicy& policy) : ReplyPromise() {
sav->setPeerCompatibilityPolicy(policy);
}
ReplyPromise(const ReplyPromise& rhs) : sav(rhs.sav) { sav->addPromiseRef(); }
ReplyPromise(ReplyPromise&& rhs) noexcept : sav(rhs.sav) { rhs.sav = 0; }
~ReplyPromise() { if (sav) sav->delPromiseRef(); }
@ -359,6 +369,9 @@ public:
FutureStream<T> getFuture() const { queue->addFutureRef(); return FutureStream<T>(queue); }
RequestStream() : queue(new NetNotifiedQueue<T>(0, 1)) {}
explicit RequestStream(PeerCompatibilityPolicy policy) : RequestStream() {
queue->setPeerCompatibilityPolicy(policy);
}
RequestStream(const RequestStream& rhs) : queue(rhs.queue) { queue->addPromiseRef(); }
RequestStream(RequestStream&& rhs) noexcept : queue(rhs.queue) { rhs.queue = 0; }
void operator=(const RequestStream& rhs) {

View File

@ -85,8 +85,6 @@ void ISimulator::displayWorkers() const
return;
}
const UID TOKEN_ENDPOINT_NOT_FOUND(-1, -1);
ISimulator* g_pSimulator = 0;
thread_local ISimulator::ProcessInfo* ISimulator::currentProcess = 0;
int openCount = 0;

View File

@ -42,17 +42,6 @@ struct GenerationRegVal {
}
};
// The order of UIDs here must match the order in which makeWellKnownEndpoint is called.
// UID WLTOKEN_CLIENTLEADERREG_GETLEADER( -1, 2 ); // from fdbclient/MonitorLeader.actor.cpp
// UID WLTOKEN_CLIENTLEADERREG_OPENDATABASE( -1, 3 ); // from fdbclient/MonitorLeader.actor.cpp
UID WLTOKEN_LEADERELECTIONREG_CANDIDACY( -1, 4 );
UID WLTOKEN_LEADERELECTIONREG_ELECTIONRESULT( -1, 5 );
UID WLTOKEN_LEADERELECTIONREG_LEADERHEARTBEAT( -1, 6 );
UID WLTOKEN_LEADERELECTIONREG_FORWARD( -1, 7 );
UID WLTOKEN_GENERATIONREG_READ( -1, 8 );
UID WLTOKEN_GENERATIONREG_WRITE( -1, 9 );
GenerationRegInterface::GenerationRegInterface( NetworkAddress remote )
: read( Endpoint({remote}, WLTOKEN_GENERATIONREG_READ) ),
write( Endpoint({remote}, WLTOKEN_GENERATIONREG_WRITE) )

View File

@ -24,6 +24,13 @@
#include "fdbclient/CoordinationInterface.h"
constexpr UID WLTOKEN_LEADERELECTIONREG_CANDIDACY(-1, 4);
constexpr UID WLTOKEN_LEADERELECTIONREG_ELECTIONRESULT(-1, 5);
constexpr UID WLTOKEN_LEADERELECTIONREG_LEADERHEARTBEAT(-1, 6);
constexpr UID WLTOKEN_LEADERELECTIONREG_FORWARD(-1, 7);
constexpr UID WLTOKEN_GENERATIONREG_READ(-1, 8);
constexpr UID WLTOKEN_GENERATIONREG_WRITE(-1, 9);
struct GenerationRegInterface {
constexpr static FileIdentifier file_identifier = 16726744;
RequestStream< struct GenerationRegReadRequest > read;

View File

@ -1792,6 +1792,16 @@ ACTOR Future<Void> monitorLeaderRemotelyWithDelayedCandidacy( Reference<ClusterC
}
}
ACTOR Future<Void> serveProtocolInfo() {
state RequestStream<ProtocolInfoRequest> protocolInfo(
PeerCompatibilityPolicy{ RequirePeer::AtLeast, ProtocolVersion::withStableInterfaces() });
protocolInfo.makeWellKnownEndpoint(WLTOKEN_PROTOCOL_INFO, TaskPriority::DefaultEndpoint);
loop {
ProtocolInfoRequest req = waitNext(protocolInfo.getFuture());
req.reply.send(ProtocolInfoReply{ currentProtocolVersion });
}
}
ACTOR Future<Void> fdbd(
Reference<ClusterConnectionFile> connFile,
LocalityData localities,
@ -1807,6 +1817,8 @@ ACTOR Future<Void> fdbd(
state vector<Future<Void>> actors;
state Promise<Void> recoveredDiskFiles;
actors.push_back(serveProtocolInfo());
try {
ServerCoordinators coordinators( connFile );
if (g_network->isSimulated()) {

View File

@ -66,7 +66,7 @@ class UID {
public:
constexpr static FileIdentifier file_identifier = 15597147;
UID() { part[0] = part[1] = 0; }
UID( uint64_t a, uint64_t b ) { part[0]=a; part[1]=b; }
constexpr UID(uint64_t a, uint64_t b) : part{ a, b } {}
std::string toString() const;
std::string shortString() const;
bool isValid() const { return part[0] || part[1]; }

View File

@ -39,7 +39,7 @@ class ProtocolVersion {
public: // constants
static constexpr uint64_t versionFlagMask = 0x0FFFFFFFFFFFFFFFLL;
static constexpr uint64_t objectSerializerFlag = 0x1000000000000000LL;
static constexpr uint64_t compatibleProtocolVersionMask = 0xffffffffffff0000LL;
static constexpr uint64_t compatibleProtocolVersionMask = 0xFFFFFFFFFFFF0000LL;
static constexpr uint64_t minValidProtocolVersion = 0x0FDB00A200060001LL;
public:
@ -49,6 +49,7 @@ public:
constexpr bool isCompatible(ProtocolVersion other) const {
return (other.version() & compatibleProtocolVersionMask) == (version() & compatibleProtocolVersionMask);
}
constexpr bool isValid() const { return version() >= minValidProtocolVersion; }
constexpr uint64_t version() const { return _version & versionFlagMask; }
@ -128,6 +129,7 @@ public: // introduced features
PROTOCOL_VERSION_FEATURE(0x0FDB00B063010000LL, ReportConflictingKeys);
PROTOCOL_VERSION_FEATURE(0x0FDB00B063010000LL, SmallEndpoints);
PROTOCOL_VERSION_FEATURE(0x0FDB00B063010000LL, CacheRole);
PROTOCOL_VERSION_FEATURE(0x0FDB00B070010000LL, StableInterfaces);
PROTOCOL_VERSION_FEATURE(0x0FDB00B070010001LL, TagThrottleValueReason);
};