Merge pull request #3858 from sfc-gh-rchen/stable_interfaces

Stable interfaces
This commit is contained in:
Andrew Noyes 2020-12-11 09:34:27 -08:00 committed by GitHub
commit 9601769b01
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
58 changed files with 562 additions and 134 deletions

View File

@ -18,6 +18,7 @@
* limitations under the License.
*/
#include <cstdint>
#define FDB_API_VERSION 700
#define FDB_INCLUDE_LEGACY_TYPES
@ -226,6 +227,11 @@ fdb_error_t fdb_future_get_int64( FDBFuture* f, int64_t* out_value ) {
CATCH_AND_RETURN( *out_value = TSAV(int64_t, f)->get(); );
}
extern "C" DLLEXPORT
fdb_error_t fdb_future_get_uint64(FDBFuture *f, uint64_t *out) {
CATCH_AND_RETURN( *out = TSAV(uint64_t, f)->get(); );
}
extern "C" DLLEXPORT
fdb_error_t fdb_future_get_key( FDBFuture* f, uint8_t const** out_key,
int* out_key_length ) {
@ -598,6 +604,11 @@ FDBFuture* fdb_transaction_get_approximate_size(FDBTransaction* tr) {
return (FDBFuture*)TXN(tr)->getApproximateSize().extractPtr();
}
extern "C" DLLEXPORT
FDBFuture* fdb_get_server_protocol(const char* clusterFilePath){
return (FDBFuture*)( API->getServerProtocol(clusterFilePath ? clusterFilePath : "").extractPtr() );
}
extern "C" DLLEXPORT
FDBFuture* fdb_transaction_get_versionstamp( FDBTransaction* tr )
{

View File

@ -136,6 +136,9 @@ extern "C" {
DLLEXPORT WARN_UNUSED_RESULT fdb_error_t
fdb_future_get_int64( FDBFuture* f, int64_t* out );
DLLEXPORT WARN_UNUSED_RESULT fdb_error_t
fdb_future_get_uint64( FDBFuture* f, uint64_t* out );
DLLEXPORT WARN_UNUSED_RESULT fdb_error_t
fdb_future_get_key( FDBFuture* f, uint8_t const** out_key,
int* out_key_length );
@ -248,6 +251,9 @@ extern "C" {
DLLEXPORT WARN_UNUSED_RESULT FDBFuture*
fdb_transaction_get_approximate_size(FDBTransaction* tr);
DLLEXPORT WARN_UNUSED_RESULT FDBFuture*
fdb_get_server_protocol(const char* clusterFilePath);
DLLEXPORT WARN_UNUSED_RESULT FDBFuture* fdb_transaction_get_versionstamp( FDBTransaction* tr );
DLLEXPORT WARN_UNUSED_RESULT FDBFuture*

View File

@ -89,7 +89,6 @@ class Int64Future : public Future {
Int64Future(FDBFuture* f) : Future(f) {}
};
class KeyFuture : public Future {
public:
// Call this function instead of fdb_future_get_key when using the KeyFuture
@ -148,7 +147,6 @@ class EmptyFuture : public Future {
EmptyFuture(FDBFuture* f) : Future(f) {}
};
// Wrapper around FDBTransaction, providing the same set of calls as the C API.
// Handles cleanup of memory, removing the need to call
// fdb_transaction_destroy.

View File

@ -55,6 +55,7 @@ FDBDatabase *fdb_open_database(const char *clusterFile) {
static FDBDatabase *db = nullptr;
static std::string prefix;
static std::string clusterFilePath = "";
std::string key(const std::string& key) {
return prefix + key;
@ -1537,6 +1538,15 @@ TEST_CASE("fdb_transaction_get_approximate_size") {
}
}
TEST_CASE("fdb_get_server_protocol") {
FDBFuture* protocolFuture = fdb_get_server_protocol(clusterFilePath.c_str());
uint64_t out;
fdb_check(fdb_future_block_until_ready(protocolFuture));
fdb_check(fdb_future_get_uint64(protocolFuture, &out));
fdb_future_destroy(protocolFuture);
}
TEST_CASE("fdb_transaction_watch read_your_writes_disable") {
// Watches created on a transaction with the option READ_YOUR_WRITES_DISABLE
// should return a watches_disabled error.
@ -2037,6 +2047,7 @@ int main(int argc, char **argv) {
std::thread network_thread{ &fdb_run_network };
db = fdb_open_database(argv[1]);
clusterFilePath = std::string(argv[1]);
prefix = argv[2];
int res = context.run();
fdb_database_destroy(db);

View File

@ -95,6 +95,7 @@ def api_version(ver):
'transactional',
'options',
'StreamingMode',
'get_server_protocol'
)
_add_symbols(fdb.impl, list)

View File

@ -733,6 +733,12 @@ class FutureInt64(Future):
self.capi.fdb_future_get_int64(self.fpointer, ctypes.byref(value))
return value.value
class FutureUInt64(Future):
def wait(self):
self.block_until_ready()
value = ctypes.c_uint64()
self.capi.fdb_future_get_uint64(self.fpointer, ctypes.byref(value))
return value.value
class FutureKeyValueArray(Future):
def wait(self):
@ -1417,6 +1423,10 @@ def init_c_api():
_capi.fdb_future_get_int64.restype = ctypes.c_int
_capi.fdb_future_get_int64.errcheck = check_error_code
_capi.fdb_future_get_uint64.argtypes = [ctypes.c_void_p, ctypes.POINTER(ctypes.c_uint64)]
_capi.fdb_future_get_uint64.restype = ctypes.c_uint
_capi.fdb_future_get_uint64.errcheck = check_error_code
_capi.fdb_future_get_key.argtypes = [ctypes.c_void_p, ctypes.POINTER(ctypes.POINTER(ctypes.c_byte)),
ctypes.POINTER(ctypes.c_int)]
_capi.fdb_future_get_key.restype = ctypes.c_int
@ -1521,6 +1531,9 @@ def init_c_api():
_capi.fdb_transaction_get_approximate_size.argtypes = [ctypes.c_void_p]
_capi.fdb_transaction_get_approximate_size.restype = ctypes.c_void_p
_capi.fdb_get_server_protocol.argtypes = [ctypes.c_char_p]
_capi.fdb_get_server_protocol.restype = ctypes.c_void_p
_capi.fdb_transaction_get_versionstamp.argtypes = [ctypes.c_void_p]
_capi.fdb_transaction_get_versionstamp.restype = ctypes.c_void_p
@ -1720,6 +1733,12 @@ open_databases = {}
cacheLock = threading.Lock()
def get_server_protocol(clusterFilePath=None):
with _network_thread_reentrant_lock:
if not _network_thread:
init()
return FutureUInt64(_capi.fdb_get_server_protocol(optionalParamToBytes(clusterFilePath)[0]))
def open(cluster_file=None, event_model=None):
"""Opens the given database (or the default database of the cluster indicated

91
design/flow_transport.md Normal file
View File

@ -0,0 +1,91 @@
# Flow Transport
This section describes the design and implementation of the flow transport wire protocol (as of release 6.3).
## ConnectPacket
The first bytes sent over a tcp connection in flow are the `ConnectPacket`.
This is a variable length message (though fixed length at a given protocol
version) designed with forward and backward compatibility in mind. The expected length of the `ConnectPacket` is encoded as the first 4 bytes (unsigned, little-endian). Upon receiving an incoming connection, a peer reads the `ProtocolVersion` (the next 8 bytes unsigned, little-endian. The most significant 4 bits encode flags and should be zeroed before interpreting numerically.) from the `ConnectPacket`.
## Protocol compatibility
Based on the incoming connection's `ProtocolVersion`, this connection is either
"compatible" or "incompatible". If this connection is incompatible, then we
will not actually look at any bytes sent after the `ConnectPacket`, but we will
keep the connection open so that the peer does not keep trying to open new
connections.
If this connection is compatible, then we know that our peer is using the same wire protocol as we are and we can proceed.
## Framing and checksumming protocol
As of release 6.3, the structure of subsequent messages is as follows:
* For TLS connections:
1. packet length (4 bytes unsigned little-endian)
2. token (16 opaque bytes that identify the recipient of this message)
3. message contents (packet length - 16 bytes to be interpreted by the recipient)
* For non-TLS connections, there's additionally a crc32 checksum for message integrity:
1. packet length (4 bytes unsigned little-endian)
2. 4 byte crc32 checksum of token + message
3. token
4. message
## Well-known endpoints
Endpoints are a pair of a 16 byte token that identifies the recipient and a
network address to send a message to. Endpoints are usually obtained over the
network - for example a request conventionally includes the endpoint the
reply should be sent to (like a self-addressed stamped envelope). So if you
can send a message and get endpoints in reply you can start sending messages
those endpoints. But how do you send that first message?
That's where the concept of a "well-known" endpoint comes in. Some endpoints
(for example the endpoints coordinators are listening on) use "well-known"
tokens that are agreed upon ahead of time. Technically the value of these
tokens could be changed as part of an incompatible protocol version bump, but
in practice this hasn't happened and shouldn't ever need to happen.
## Flatbuffers
Prior to release-6.2 the structure of messages (e.g. how many fields a
message has) was implicitly part of the protocol version, and so adding a
field to any message required a protocol version bump. Since release-6.2
messages are encoded as flatbuffers messages, and you can technically add
fields without a protocol version bump. This is a powerful and dangerous tool
that needs to be used with caution. If you add a field without a protocol version bump, then you can no longer be certain that this field will always be present (e.g. if you get a message from an old peer it might not include that field.)
We don't have a good way to test two or more fdbserver binaries in
simulation, so we discourage adding fields or otherwise making any protocol
changes without a protocol version bump.
Bumping the protocol version is costly for clients though, since now they need a whole new libfdb_c.so to be able to talk to the cluster _at all_.
## Stable Endpoints
Stable endpoints are a proposal to allow protocol compatibility to be checked
per endpoint rather than per connection. The proposal is to commit to the
current (release-6.3) framing protocol for opening connections, and allow a
newer framing protocol (for example a new checksum) to be negotiated after
the connection has been established. This way even if peers are at different
protocol versions they can still read the token each message is addressed to,
and they can use that token to decide whether or not to attempt to handle the
message. By default, tokens will have the same compatibility requirements as
before where the protocol version must match exactly. But new tokens can
optionally have a different policy - e.g. handle anything from a protocol
version >= release-7.0.
One of the main features motivating "Stable Endpoints" is the ability to download a compatible libfdb_c from a coordinator.
### Changes to flow transport for Stable Endpoints
1. Well known endpoints must never change (this just makes it official)
2. The (initial) framing protocol must remain fixed. If we want to change the checksum, we can add a stable, well known endpoint that advertises what checksums are supported and use this to change the checksum after the connection has already been established.
3. Each endpoint can have a different compatibility policy: e.g. an endpoint can be marked as requiring at least `ProtocolVersion::withStableInterfaces()` like this:
```
ReplyPromise<ProtocolInfoReply> reply{ PeerCompatibilityPolicy{ RequirePeer::AtLeast,
ProtocolVersion::withStableInterfaces() } };
```
4. Well known endpoints no longer need to be added in a particular order. Instead you reserve the number of well known endpoints ahead of time and then you can add them in any order.

View File

@ -175,7 +175,7 @@ struct MutationFilesReadProgress : public ReferenceCounted<MutationFilesReadProg
int msgSize = bigEndian32(reader.consume<int>());
const uint8_t* message = reader.consume(msgSize);
ArenaReader rd(buf.arena(), StringRef(message, msgSize), AssumeVersion(currentProtocolVersion));
ArenaReader rd(buf.arena(), StringRef(message, msgSize), AssumeVersion(g_network->protocolVersion()));
MutationRef m;
rd >> m;
count++;
@ -433,7 +433,7 @@ ACTOR Future<Void> convert(ConvertParams params) {
state BackupDescription desc = wait(container->describeBackup());
std::cout << "\n" << desc.toString() << "\n";
// std::cout << "Using Protocol Version: 0x" << std::hex << currentProtocolVersion.version() << std::dec << "\n";
// std::cout << "Using Protocol Version: 0x" << std::hex << g_network->protocolVersion().version() << std::dec << "\n";
std::vector<LogFile> logs = getRelevantLogFiles(listing.logs, params.begin, params.end);
printLogFiles("Range has", logs);
@ -460,7 +460,7 @@ ACTOR Future<Void> convert(ConvertParams params) {
arena = Arena();
}
ArenaReader rd(data.arena, data.message, AssumeVersion(currentProtocolVersion));
ArenaReader rd(data.arena, data.message, AssumeVersion(g_network->protocolVersion()));
MutationRef m;
rd >> m;
std::cout << data.version.toString() << " m = " << m.toString() << "\n";

View File

@ -30,6 +30,11 @@
const int MAX_CLUSTER_FILE_BYTES = 60000;
constexpr UID WLTOKEN_CLIENTLEADERREG_GETLEADER(-1, 2);
constexpr UID WLTOKEN_CLIENTLEADERREG_OPENDATABASE(-1, 3);
constexpr UID WLTOKEN_PROTOCOL_INFO(-1, 10);
struct ClientLeaderRegInterface {
RequestStream< struct GetLeaderRequest > getLeader;
RequestStream< struct OpenDatabaseCoordRequest > openDatabase;
@ -186,4 +191,30 @@ public:
ClientCoordinators() {}
};
struct ProtocolInfoReply {
constexpr static FileIdentifier file_identifier = 7784298;
ProtocolVersion version;
template <class Ar>
void serialize(Ar& ar) {
uint64_t version_ = 0;
if (Ar::isSerializing) {
version_ = version.versionWithFlags();
}
serializer(ar, version_);
if (Ar::isDeserializing) {
version = ProtocolVersion(version_);
}
}
};
struct ProtocolInfoRequest {
constexpr static FileIdentifier file_identifier = 13261233;
ReplyPromise<ProtocolInfoReply> reply{ PeerCompatibilityPolicy{ RequirePeer::AtLeast,
ProtocolVersion::withStableInterfaces() } };
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, reply);
}
};
#endif

View File

@ -92,6 +92,7 @@ public:
virtual void selectApiVersion(int apiVersion) = 0;
virtual const char* getClientVersion() = 0;
virtual ThreadFuture<uint64_t> getServerProtocol(const char* clusterFilePath) = 0;
virtual void setNetworkOption(FDBNetworkOptions::Option option, Optional<StringRef> value = Optional<StringRef>()) = 0;
virtual void setupNetwork() = 0;

View File

@ -371,10 +371,6 @@ ClientCoordinators::ClientCoordinators( Key clusterKey, std::vector<NetworkAddre
ccf = makeReference<ClusterConnectionFile>(ClusterConnectionString(coordinators, clusterKey));
}
UID WLTOKEN_CLIENTLEADERREG_GETLEADER( -1, 2 );
UID WLTOKEN_CLIENTLEADERREG_OPENDATABASE( -1, 3 );
ClientLeaderRegInterface::ClientLeaderRegInterface( NetworkAddress remote )
: getLeader( Endpoint({remote}, WLTOKEN_CLIENTLEADERREG_GETLEADER) ),
openDatabase( Endpoint({remote}, WLTOKEN_CLIENTLEADERREG_OPENDATABASE) )

View File

@ -18,14 +18,17 @@
* limitations under the License.
*/
#include "fdbclient/CoordinationInterface.h"
#include "fdbclient/MultiVersionTransaction.h"
#include "fdbclient/MultiVersionAssignmentVars.h"
#include "fdbclient/ThreadSafeTransaction.h"
#include "flow/network.h"
#include "flow/Platform.h"
#include "flow/ProtocolVersion.h"
#include "flow/UnitTest.h"
#include "flow/actorcompiler.h" // This must be the last #include.
#include "flow/actorcompiler.h" // This must be the last #include.
void throwIfError(FdbCApi::fdb_error_t e) {
if(e) {
@ -343,6 +346,7 @@ void DLApi::init() {
headerVersion >= 700);
loadClientFunction(&api->futureGetInt64, lib, fdbCPath, headerVersion >= 620 ? "fdb_future_get_int64" : "fdb_future_get_version");
loadClientFunction(&api->futureGetUInt64, lib, fdbCPath, "fdb_future_get_uint64");
loadClientFunction(&api->futureGetError, lib, fdbCPath, "fdb_future_get_error");
loadClientFunction(&api->futureGetKey, lib, fdbCPath, "fdb_future_get_key");
loadClientFunction(&api->futureGetValue, lib, fdbCPath, "fdb_future_get_value");
@ -378,6 +382,11 @@ const char* DLApi::getClientVersion() {
return api->getClientVersion();
}
ThreadFuture<uint64_t> DLApi::getServerProtocol(const char *clusterFilePath) {
ASSERT(false);
return ThreadFuture<uint64_t>();
}
void DLApi::setNetworkOption(FDBNetworkOptions::Option option, Optional<StringRef> value) {
throwIfError(api->setNetworkOption(option, value.present() ? value.get().begin() : nullptr, value.present() ? value.get().size() : 0));
}
@ -990,6 +999,11 @@ const char* MultiVersionApi::getClientVersion() {
return localClient->api->getClientVersion();
}
ThreadFuture<uint64_t> MultiVersionApi::getServerProtocol(const char *clusterFilePath) {
return api->localClient->api->getServerProtocol(clusterFilePath);
}
void validateOption(Optional<StringRef> value, bool canBePresent, bool canBeAbsent, bool canBeEmpty=true) {
ASSERT(canBePresent || canBeAbsent);

View File

@ -55,6 +55,7 @@ struct FdbCApi : public ThreadSafeReferenceCounted<FdbCApi> {
//Network
fdb_error_t (*selectApiVersion)(int runtimeVersion, int headerVersion);
const char* (*getClientVersion)();
FDBFuture* (*getServerProtocol)(const char* clusterFilePath);
fdb_error_t (*setNetworkOption)(FDBNetworkOptions::Option option, uint8_t const *value, int valueLength);
fdb_error_t (*setupNetwork)();
fdb_error_t (*runNetwork)();
@ -107,6 +108,7 @@ struct FdbCApi : public ThreadSafeReferenceCounted<FdbCApi> {
//Future
fdb_error_t (*futureGetDatabase)(FDBFuture *f, FDBDatabase **outDb);
fdb_error_t (*futureGetInt64)(FDBFuture *f, int64_t *outValue);
fdb_error_t (*futureGetUInt64)(FDBFuture *f, uint64_t *outValue);
fdb_error_t (*futureGetError)(FDBFuture *f);
fdb_error_t (*futureGetKey)(FDBFuture *f, uint8_t const **outKey, int *outKeyLength);
fdb_error_t (*futureGetValue)(FDBFuture *f, fdb_bool_t *outPresent, uint8_t const **outValue, int *outValueLength);
@ -204,6 +206,7 @@ public:
void selectApiVersion(int apiVersion) override;
const char* getClientVersion() override;
ThreadFuture<uint64_t> getServerProtocol(const char* clusterFilePath) override;
void setNetworkOption(FDBNetworkOptions::Option option, Optional<StringRef> value = Optional<StringRef>()) override;
void setupNetwork() override;
@ -381,6 +384,7 @@ class MultiVersionApi : public IClientApi {
public:
void selectApiVersion(int apiVersion) override;
const char* getClientVersion() override;
ThreadFuture<uint64_t> getServerProtocol(const char* clusterFilePath) override;
void setNetworkOption(FDBNetworkOptions::Option option, Optional<StringRef> value = Optional<StringRef>()) override;
void setupNetwork() override;

View File

@ -4200,6 +4200,37 @@ Future<Standalone<StringRef>> Transaction::getVersionstamp() {
return versionstampPromise.getFuture();
}
ACTOR Future<ProtocolVersion> coordinatorProtocolsFetcher(Reference<ClusterConnectionFile> f) {
state ClientCoordinators coord(f);
state vector<Future<ProtocolInfoReply>> coordProtocols;
coordProtocols.reserve(coord.clientLeaderServers.size());
for (int i = 0; i < coord.clientLeaderServers.size(); i++) {
RequestStream<ProtocolInfoRequest> requestStream{ Endpoint{
{ coord.clientLeaderServers[i].getLeader.getEndpoint().addresses }, WLTOKEN_PROTOCOL_INFO } };
coordProtocols.push_back(retryBrokenPromise(requestStream, ProtocolInfoRequest{}));
}
wait(smartQuorum(coordProtocols, coordProtocols.size() / 2 + 1, 1.5));
std::unordered_map<uint64_t, int> protocolCount;
for(int i = 0; i<coordProtocols.size(); i++) {
if(coordProtocols[i].isReady()) {
protocolCount[coordProtocols[i].get().version.version()]++;
}
}
uint64_t majorityProtocol = std::max_element(protocolCount.begin(), protocolCount.end(), [](const std::pair<uint64_t, int>& l, const std::pair<uint64_t, int>& r){
return l.second < r.second;
})->first;
return ProtocolVersion(majorityProtocol);
}
ACTOR Future<uint64_t> getCoordinatorProtocols(Reference<ClusterConnectionFile> f) {
ProtocolVersion protocolVersion = wait(coordinatorProtocolsFetcher(f));
return protocolVersion.version();
}
uint32_t Transaction::getSize() {
auto s = tr.transaction.mutations.expectedSize() + tr.transaction.read_conflict_ranges.expectedSize() +
tr.transaction.write_conflict_ranges.expectedSize();

View File

@ -284,6 +284,8 @@ public:
[[nodiscard]] Future<Standalone<StringRef>>
getVersionstamp(); // Will be fulfilled only after commit() returns success
Future<uint64_t> getProtocolVersion();
Promise<Standalone<StringRef>> versionstampPromise;
uint32_t getSize();
@ -361,6 +363,8 @@ ACTOR Future<Void> snapCreate(Database cx, Standalone<StringRef> snapCmd, UID sn
// Checks with Data Distributor that it is safe to mark all servers in exclusions as failed
ACTOR Future<bool> checkSafeExclusions(Database cx, vector<AddressExclusion> exclusions);
ACTOR Future<uint64_t> getCoordinatorProtocols(Reference<ClusterConnectionFile> f);
inline uint64_t getWriteOperationCost(uint64_t bytes) {
return bytes / std::max(1, CLIENT_KNOBS->WRITE_COST_BYTE_FACTOR) + 1;
}

View File

@ -786,7 +786,8 @@ const KeyRef JSONSchemas::statusSchema = LiteralStringRef(R"statusSchema(
"coordinators":[
{
"reachable":true,
"address":"127.0.0.1:4701"
"address":"127.0.0.1:4701",
"protocol": "0fdb00b070010001"
}
],
"quorum_reachable":true

View File

@ -28,6 +28,7 @@
#include "fdbclient/json_spirit/json_spirit_reader_template.h"
#include "fdbrpc/genericactors.actor.h"
#include "flow/actorcompiler.h" // has to be last include
#include <cstdint>
json_spirit::mValue readJSONStrictly(const std::string &s) {
json_spirit::mValue val;
@ -292,7 +293,17 @@ ACTOR Future<Optional<StatusObject>> clientCoordinatorsStatusFetcher(Reference<C
for (int i = 0; i < coord.clientLeaderServers.size(); i++)
leaderServers.push_back(retryBrokenPromise(coord.clientLeaderServers[i].getLeader, GetLeaderRequest(coord.clusterKey, UID()), TaskPriority::CoordinationReply));
wait( smartQuorum(leaderServers, leaderServers.size() / 2 + 1, 1.5) || delay(2.0) );
state vector<Future<ProtocolInfoReply>> coordProtocols;
coordProtocols.reserve(coord.clientLeaderServers.size());
for (int i = 0; i < coord.clientLeaderServers.size(); i++) {
RequestStream<ProtocolInfoRequest> requestStream{ Endpoint{
{ coord.clientLeaderServers[i].getLeader.getEndpoint().addresses }, WLTOKEN_PROTOCOL_INFO } };
coordProtocols.push_back(retryBrokenPromise(requestStream, ProtocolInfoRequest{}));
}
wait(smartQuorum(leaderServers, leaderServers.size() / 2 + 1, 1.5) &&
smartQuorum(coordProtocols, coordProtocols.size() / 2 + 1, 1.5) ||
delay(2.0));
statusObj["quorum_reachable"] = *quorum_reachable = quorum(leaderServers, leaderServers.size() / 2 + 1).isReady();
@ -309,12 +320,17 @@ ACTOR Future<Optional<StatusObject>> clientCoordinatorsStatusFetcher(Reference<C
coordinatorsUnavailable++;
coordStatus["reachable"] = false;
}
if (coordProtocols[i].isReady()) {
uint64_t protocolVersionInt = coordProtocols[i].get().version.version();
std::stringstream hexSs;
hexSs << std::hex << std::setw(2*sizeof(protocolVersionInt)) << std::setfill('0') << protocolVersionInt;
coordStatus["protocol"] = hexSs.str();
}
coordsStatus.push_back(coordStatus);
}
statusObj["coordinators"] = coordsStatus;
*coordinatorsFaultTolerance = (leaderServers.size() - 1) / 2 - coordinatorsUnavailable;
return statusObj;
}
catch (Error &e){

View File

@ -22,6 +22,7 @@
#include "fdbclient/ReadYourWrites.h"
#include "fdbclient/DatabaseContext.h"
#include "fdbclient/versions.h"
#include "fdbclient/NativeAPI.actor.h"
// Users of ThreadSafeTransaction might share Reference<ThreadSafe...> between different threads as long as they don't call addRef (e.g. C API follows this).
// Therefore, it is unsafe to call (explicitly or implicitly) this->addRef in any of these functions.
@ -364,6 +365,15 @@ const char* ThreadSafeApi::getClientVersion() {
return clientVersion.c_str();
}
ThreadFuture<uint64_t> ThreadSafeApi::getServerProtocol(const char* clusterFilePath) {
auto [clusterFile, isDefault] = ClusterConnectionFile::lookupClusterFileName(std::string(clusterFilePath));
Reference<ClusterConnectionFile> f = Reference<ClusterConnectionFile>(new ClusterConnectionFile(clusterFile));
return onMainThread( [f]() -> Future< uint64_t > {
return getCoordinatorProtocols(f);
} );
}
void ThreadSafeApi::setNetworkOption(FDBNetworkOptions::Option option, Optional<StringRef> value) {
if (option == FDBNetworkOptions::EXTERNAL_CLIENT_TRANSPORT_ID) {
if(value.present()) {

View File

@ -92,6 +92,8 @@ public:
Version getCommittedVersion() override;
ThreadFuture<int64_t> getApproximateSize() override;
ThreadFuture<uint64_t> getProtocolVersion() override;
void setOption( FDBTransactionOptions::Option option, Optional<StringRef> value = Optional<StringRef>() ) override;
ThreadFuture<Void> checkDeferredError();
@ -115,6 +117,7 @@ class ThreadSafeApi : public IClientApi, ThreadSafeReferenceCounted<ThreadSafeAp
public:
void selectApiVersion(int apiVersion);
const char* getClientVersion();
ThreadFuture<uint64_t> getServerProtocol(const char* clusterFilePath) override;
void setNetworkOption(FDBNetworkOptions::Option option, Optional<StringRef> value = Optional<StringRef>());
void setupNetwork();

View File

@ -20,6 +20,7 @@
// Unit tests for the flow language and libraries
#include "flow/ProtocolVersion.h"
#include "flow/UnitTest.h"
#include "flow/DeterministicRandom.h"
#include "flow/IThreadPool.h"
@ -281,6 +282,9 @@ struct YieldMockNetwork final : INetwork, ReferenceCounted<YieldMockNetwork> {
static TLSConfig emptyConfig;
return emptyConfig;
}
ProtocolVersion protocolVersion() override {
return baseNetwork->protocolVersion();
}
};
struct NonserializableThing {};

View File

@ -18,8 +18,11 @@
* limitations under the License.
*/
#include "fdbclient/CoordinationInterface.h"
#include "fdbrpc/FlowTransport.h"
#include "flow/network.h"
#include <cstdint>
#include <unordered_map>
#if VALGRIND
#include <memcheck.h>
@ -38,19 +41,21 @@
#include "flow/TDMetric.actor.h"
#include "flow/ObjectSerializer.h"
#include "flow/ProtocolVersion.h"
#include "flow/UnitTest.h"
#include "flow/actorcompiler.h" // This must be the last #include.
static NetworkAddressList g_currentDeliveryPeerAddress = NetworkAddressList();
const UID WLTOKEN_ENDPOINT_NOT_FOUND(-1, 0);
const UID WLTOKEN_PING_PACKET(-1, 1);
const UID TOKEN_IGNORE_PACKET(0, 2);
constexpr UID WLTOKEN_ENDPOINT_NOT_FOUND(-1, 0);
constexpr UID WLTOKEN_PING_PACKET(-1, 1);
constexpr int PACKET_LEN_WIDTH = sizeof(uint32_t);
const uint64_t TOKEN_STREAM_FLAG = 1;
class EndpointMap : NonCopyable {
public:
EndpointMap();
// Reserve space for this many wellKnownEndpoints
explicit EndpointMap(int wellKnownEndpointCount);
void insertWellKnown(NetworkMessageReceiver* r, const Endpoint::Token& token, TaskPriority priority);
void insert( NetworkMessageReceiver* r, Endpoint::Token& token, TaskPriority priority );
const Endpoint& insert( NetworkAddressList localAddresses, std::vector<std::pair<FlowReceiver*, TaskPriority>> const& streams );
NetworkMessageReceiver* get( Endpoint::Token const& token );
@ -65,17 +70,16 @@ private:
uint64_t uid[2]; // priority packed into lower 32 bits; actual lower 32 bits of token are the index in data[]
uint32_t nextFree;
};
NetworkMessageReceiver* receiver;
NetworkMessageReceiver* receiver = nullptr;
Endpoint::Token& token() { return *(Endpoint::Token*)uid; }
};
int wellKnownEndpointCount;
std::vector<Entry> data;
uint32_t firstFree;
};
EndpointMap::EndpointMap()
: firstFree(-1)
{
}
EndpointMap::EndpointMap(int wellKnownEndpointCount)
: wellKnownEndpointCount(wellKnownEndpointCount), data(wellKnownEndpointCount), firstFree(-1) {}
void EndpointMap::realloc() {
int oldSize = data.size();
@ -88,6 +92,14 @@ void EndpointMap::realloc() {
firstFree = oldSize;
}
void EndpointMap::insertWellKnown(NetworkMessageReceiver* r, const Endpoint::Token& token, TaskPriority priority) {
int index = token.second();
ASSERT(data[index].receiver == nullptr);
data[index].receiver = r;
data[index].token() =
Endpoint::Token(token.first(), (token.second() & 0xffffffff00000000LL) | static_cast<uint32_t>(priority));
}
void EndpointMap::insert( NetworkMessageReceiver* r, Endpoint::Token& token, TaskPriority priority ) {
if (firstFree == uint32_t(-1)) realloc();
int index = firstFree;
@ -135,6 +147,9 @@ const Endpoint& EndpointMap::insert( NetworkAddressList localAddresses, std::vec
NetworkMessageReceiver* EndpointMap::get( Endpoint::Token const& token ) {
uint32_t index = token.second();
if (index < wellKnownEndpointCount && data[index].receiver == nullptr) {
TraceEvent(SevWarnAlways, "WellKnownEndpointNotAdded").detail("Token", token);
}
if ( index < data.size() && data[index].token().first() == token.first() && ((data[index].token().second()&0xffffffff00000000LL)|index)==token.second() )
return data[index].receiver;
return 0;
@ -147,9 +162,13 @@ TaskPriority EndpointMap::getPriority( Endpoint::Token const& token ) {
return TaskPriority::UnknownEndpoint;
}
void EndpointMap::remove( Endpoint::Token const& token, NetworkMessageReceiver* r ) {
void EndpointMap::remove(Endpoint::Token const& token, NetworkMessageReceiver* r) {
uint32_t index = token.second();
if ( index < data.size() && data[index].token().first() == token.first() && ((data[index].token().second()&0xffffffff00000000LL)|index)==token.second() && data[index].receiver == r ) {
if (index < wellKnownEndpointCount) {
data[index].receiver = nullptr;
} else if (index < data.size() && data[index].token().first() == token.first() &&
((data[index].token().second() & 0xffffffff00000000LL) | index) == token.second() &&
data[index].receiver == r) {
data[index].receiver = 0;
data[index].nextFree = firstFree;
firstFree = index;
@ -158,11 +177,9 @@ void EndpointMap::remove( Endpoint::Token const& token, NetworkMessageReceiver*
struct EndpointNotFoundReceiver final : NetworkMessageReceiver {
EndpointNotFoundReceiver(EndpointMap& endpoints) {
//endpoints[WLTOKEN_ENDPOINT_NOT_FOUND] = this;
Endpoint::Token e = WLTOKEN_ENDPOINT_NOT_FOUND;
endpoints.insert(this, e, TaskPriority::DefaultEndpoint);
ASSERT( e == WLTOKEN_ENDPOINT_NOT_FOUND );
endpoints.insertWellKnown(this, WLTOKEN_ENDPOINT_NOT_FOUND, TaskPriority::DefaultEndpoint);
}
void receive(ArenaObjectReader& reader) override {
// Remote machine tells us it doesn't have endpoint e
Endpoint e;
@ -173,9 +190,7 @@ struct EndpointNotFoundReceiver final : NetworkMessageReceiver {
struct PingReceiver final : NetworkMessageReceiver {
PingReceiver(EndpointMap& endpoints) {
Endpoint::Token e = WLTOKEN_PING_PACKET;
endpoints.insert(this, e, TaskPriority::ReadSocket);
ASSERT( e == WLTOKEN_PING_PACKET );
endpoints.insertWellKnown(this, WLTOKEN_PING_PACKET, TaskPriority::ReadSocket);
}
void receive(ArenaObjectReader& reader) override {
ReplyPromise<Void> reply;
@ -214,11 +229,9 @@ public:
Reference<AsyncVar<bool>> degraded;
bool warnAlwaysForLargePacket;
// These declarations must be in exactly this order
EndpointMap endpoints;
EndpointNotFoundReceiver endpointNotFoundReceiver;
PingReceiver pingReceiver;
// End ordered declarations
EndpointNotFoundReceiver endpointNotFoundReceiver{ endpoints };
PingReceiver pingReceiver{ endpoints };
Int64MetricHandle bytesSent;
Int64MetricHandle countPacketsReceived;
@ -294,7 +307,8 @@ ACTOR Future<Void> pingLatencyLogger(TransportData* self) {
}
TransportData::TransportData(uint64_t transportId)
: endpointNotFoundReceiver(endpoints),
: endpoints(/*wellKnownTokenCount*/ 11),
endpointNotFoundReceiver(endpoints),
pingReceiver(endpoints),
warnAlwaysForLargePacket(true),
lastIncompatibleMessage(0),
@ -770,7 +784,7 @@ void Peer::prependConnectPacket() {
}
pkt.connectPacketLength = sizeof(pkt) - sizeof(pkt.connectPacketLength);
pkt.protocolVersion = currentProtocolVersion;
pkt.protocolVersion = g_network->protocolVersion();
pkt.protocolVersion.addObjectSerializerFlag();
pkt.connectionId = transport->transportId;
@ -835,6 +849,15 @@ TransportData::~TransportData() {
}
}
static bool checkCompatible(const PeerCompatibilityPolicy& policy, ProtocolVersion version) {
switch (policy.requirement) {
case RequirePeer::Exactly:
return version.version() == policy.version.version();
case RequirePeer::AtLeast:
return version.version() >= policy.version.version();
}
}
ACTOR static void deliver(TransportData* self, Endpoint destination, ArenaReader reader, bool inReadSocket) {
TaskPriority priority = self->endpoints.getPriority(destination.token);
if (priority < TaskPriority::ReadSocket || !inReadSocket) {
@ -845,6 +868,9 @@ ACTOR static void deliver(TransportData* self, Endpoint destination, ArenaReader
auto receiver = self->endpoints.get(destination.token);
if (receiver) {
if (!checkCompatible(receiver->peerCompatibilityPolicy(), reader.protocolVersion())) {
return;
}
try {
g_currentDeliveryPeerAddress = destination.addresses;
StringRef data = reader.arenaReadAll();
@ -890,11 +916,11 @@ static void scanPackets(TransportData* transport, uint8_t*& unprocessed_begin, c
//Retrieve packet length and checksum
if (checksumEnabled) {
if (e-p < sizeof(uint32_t) * 2) break;
packetLen = *(uint32_t*)p; p += sizeof(uint32_t);
packetLen = *(uint32_t*)p; p += PACKET_LEN_WIDTH;
packetChecksum = *(uint32_t*)p; p += sizeof(uint32_t);
} else {
if (e-p < sizeof(uint32_t)) break;
packetLen = *(uint32_t*)p; p += sizeof(uint32_t);
packetLen = *(uint32_t*)p; p += PACKET_LEN_WIDTH;
}
if (packetLen > FLOW_KNOBS->PACKET_LIMIT) {
@ -945,7 +971,9 @@ static void scanPackets(TransportData* transport, uint8_t*& unprocessed_begin, c
#if VALGRIND
VALGRIND_CHECK_MEM_IS_DEFINED(p, packetLen);
#endif
ArenaReader reader(arena, StringRef(p, packetLen), AssumeVersion(currentProtocolVersion));
// remove object serializer flag to account for flat buffer
peerProtocolVersion.removeObjectSerializerFlag();
ArenaReader reader(arena, StringRef(p, packetLen), AssumeVersion(peerProtocolVersion));
UID token;
reader >> token;
@ -972,9 +1000,9 @@ static void scanPackets(TransportData* transport, uint8_t*& unprocessed_begin, c
// Given unprocessed buffer [begin, end), check if next packet size is known and return
// enough size for the next packet, whose format is: {size, optional_checksum, data} +
// next_packet_size.
static int getNewBufferSize(const uint8_t* begin, const uint8_t* end, const NetworkAddress& peerAddress) {
static int getNewBufferSize(const uint8_t* begin, const uint8_t* end, const NetworkAddress& peerAddress, ProtocolVersion peerProtocolVersion) {
const int len = end - begin;
if (len < sizeof(uint32_t)) {
if (len < PACKET_LEN_WIDTH) {
return FLOW_KNOBS->MIN_PACKET_BUFFER_BYTES;
}
const uint32_t packetLen = *(uint32_t*)begin;
@ -1017,7 +1045,7 @@ ACTOR static Future<Void> connectionReader(
if (readAllBytes < FLOW_KNOBS->MIN_PACKET_BUFFER_FREE_BYTES) {
Arena newArena;
const int unproc_len = unprocessed_end - unprocessed_begin;
const int len = getNewBufferSize(unprocessed_begin, unprocessed_end, peerAddress);
const int len = getNewBufferSize(unprocessed_begin, unprocessed_end, peerAddress, peerProtocolVersion);
uint8_t* const newBuffer = new (newArena) uint8_t[ len ];
if (unproc_len > 0) {
memcpy(newBuffer, unprocessed_begin, unproc_len);
@ -1056,8 +1084,8 @@ ACTOR static Future<Void> connectionReader(
uint64_t connectionId = pkt.connectionId;
if (!pkt.protocolVersion.hasObjectSerializerFlag() ||
!pkt.protocolVersion.isCompatible(currentProtocolVersion)) {
incompatibleProtocolVersionNewer = pkt.protocolVersion > currentProtocolVersion;
!pkt.protocolVersion.isCompatible(g_network->protocolVersion())) {
incompatibleProtocolVersionNewer = pkt.protocolVersion > g_network->protocolVersion();
NetworkAddress addr = pkt.canonicalRemotePort
? NetworkAddress(pkt.canonicalRemoteIp(), pkt.canonicalRemotePort)
: conn->getPeerAddress();
@ -1067,9 +1095,8 @@ ACTOR static Future<Void> connectionReader(
if(now() - transport->lastIncompatibleMessage > FLOW_KNOBS->CONNECTION_REJECTED_MESSAGE_DELAY) {
TraceEvent(SevWarn, "ConnectionRejected", conn->getDebugID())
.detail("Reason", "IncompatibleProtocolVersion")
.detail("LocalVersion", currentProtocolVersion.version())
.detail("LocalVersion", g_network->protocolVersion().version())
.detail("RejectedVersion", pkt.protocolVersion.version())
.detail("VersionMask", ProtocolVersion::compatibleProtocolVersionMask)
.detail("Peer", pkt.canonicalRemotePort ? NetworkAddress(pkt.canonicalRemoteIp(), pkt.canonicalRemotePort)
: conn->getPeerAddress())
.detail("ConnectionId", connectionId);
@ -1081,7 +1108,6 @@ ACTOR static Future<Void> connectionReader(
} else if(connectionId > 1) {
transport->multiVersionConnections[connectionId] = now() + FLOW_KNOBS->CONNECTION_ID_TIMEOUT;
}
compatible = false;
if(!protocolVersion.hasMultiVersionClient()) {
// Older versions expected us to hang up. It may work even if we don't hang up here, but it's safer to keep the old behavior.
@ -1133,7 +1159,7 @@ ACTOR static Future<Void> connectionReader(
}
}
}
if (compatible) {
if (compatible || peerProtocolVersion.hasStableInterfaces()) {
scanPackets( transport, unprocessed_begin, unprocessed_end, arena, peerAddress, peerProtocolVersion );
}
else if(!expectConnectPacket) {
@ -1364,10 +1390,8 @@ void FlowTransport::removeEndpoint( const Endpoint& endpoint, NetworkMessageRece
void FlowTransport::addWellKnownEndpoint( Endpoint& endpoint, NetworkMessageReceiver* receiver, TaskPriority taskID ) {
endpoint.addresses = self->localAddresses;
ASSERT( ((endpoint.token.first() & TOKEN_STREAM_FLAG)!=0) == receiver->isStream() );
Endpoint::Token otoken = endpoint.token;
self->endpoints.insert( receiver, endpoint.token, taskID );
ASSERT( endpoint.token == otoken );
ASSERT(receiver->isStream());
self->endpoints.insertWellKnown(receiver, endpoint.token, taskID);
}
static void sendLocal( TransportData* self, ISerializeSource const& what, const Endpoint& destination ) {
@ -1375,7 +1399,7 @@ static void sendLocal( TransportData* self, ISerializeSource const& what, const
// SOMEDAY: Would it be better to avoid (de)serialization by doing this check in flow?
Standalone<StringRef> copy;
ObjectWriter wr(AssumeVersion(currentProtocolVersion));
ObjectWriter wr(AssumeVersion(g_network->protocolVersion()));
what.serializeObjectWriter(wr);
copy = wr.toStringRef();
#if VALGRIND
@ -1383,7 +1407,7 @@ static void sendLocal( TransportData* self, ISerializeSource const& what, const
#endif
ASSERT(copy.size() > 0);
deliver(self, destination, ArenaReader(copy.arena(), copy, AssumeVersion(currentProtocolVersion)), false);
deliver(self, destination, ArenaReader(copy.arena(), copy, AssumeVersion(g_network->protocolVersion())), false);
}
static ReliablePacket* sendPacket(TransportData* self, Reference<Peer> peer, ISerializeSource const& what,
@ -1405,12 +1429,12 @@ static ReliablePacket* sendPacket(TransportData* self, Reference<Peer> peer, ISe
int prevBytesWritten = pb->bytes_written;
PacketBuffer* checksumPb = pb;
PacketWriter wr(pb,rp,AssumeVersion(currentProtocolVersion)); // SOMEDAY: Can we downgrade to talk to older peers?
PacketWriter wr(pb,rp,AssumeVersion(g_network->protocolVersion())); // SOMEDAY: Can we downgrade to talk to older peers?
// Reserve some space for packet length and checksum, write them after serializing data
SplitBuffer packetInfoBuffer;
uint32_t len, checksum = 0;
int packetInfoSize = sizeof(len);
int packetInfoSize = PACKET_LEN_WIDTH;
if (checksumEnabled) {
packetInfoSize += sizeof(checksum);
}

View File

@ -27,6 +27,7 @@
#include "flow/genericactors.actor.h"
#include "flow/network.h"
#include "flow/FileIdentifier.h"
#include "flow/ProtocolVersion.h"
#include "flow/Net2Packet.h"
#include "fdbrpc/ContinuousSample.h"
@ -116,11 +117,21 @@ namespace std
};
}
enum class RequirePeer { Exactly, AtLeast };
struct PeerCompatibilityPolicy {
RequirePeer requirement;
ProtocolVersion version;
};
class ArenaObjectReader;
class NetworkMessageReceiver {
public:
virtual void receive(ArenaObjectReader&) = 0;
virtual bool isStream() const { return false; }
virtual PeerCompatibilityPolicy peerCompatibilityPolicy() const {
return { RequirePeer::Exactly, g_network->protocolVersion() };
}
};
struct TransportData;

View File

@ -621,7 +621,7 @@ void showArena( ArenaBlock* a, ArenaBlock* parent) {
}
void arenaTest() {
BinaryWriter wr(AssumeVersion(currentProtocolVersion));
BinaryWriter wr(AssumeVersion(g_network->protocolVersion()));
{
Arena arena;
VectorRef<StringRef> test;
@ -639,7 +639,7 @@ void arenaTest() {
{
Arena arena2;
VectorRef<StringRef> test2;
BinaryReader reader(wr.getData(),wr.getLength(), AssumeVersion(currentProtocolVersion));
BinaryReader reader(wr.getData(),wr.getLength(), AssumeVersion(g_network->protocolVersion()));
reader >> test2 >> arena2;
for(auto i = test2.begin(); i != test2.end(); ++i)

View File

@ -66,6 +66,12 @@ struct FlowReceiver : public NetworkMessageReceiver {
endpoint = e;
}
void setPeerCompatibilityPolicy(const PeerCompatibilityPolicy& policy) { peerCompatibilityPolicy_ = policy; }
PeerCompatibilityPolicy peerCompatibilityPolicy() const override {
return peerCompatibilityPolicy_.orDefault(NetworkMessageReceiver::peerCompatibilityPolicy());
}
void makeWellKnownEndpoint(Endpoint::Token token, TaskPriority taskID) {
ASSERT(!endpoint.isValid());
m_isLocalEndpoint = true;
@ -74,6 +80,7 @@ struct FlowReceiver : public NetworkMessageReceiver {
}
private:
Optional<PeerCompatibilityPolicy> peerCompatibilityPolicy_;
Endpoint endpoint;
bool m_isLocalEndpoint;
bool m_stream;
@ -117,6 +124,9 @@ public:
bool isSet() { return sav->isSet(); }
bool isValid() const { return sav != nullptr; }
ReplyPromise() : sav(new NetSAV<T>(0, 1)) {}
explicit ReplyPromise(const PeerCompatibilityPolicy& policy) : ReplyPromise() {
sav->setPeerCompatibilityPolicy(policy);
}
ReplyPromise(const ReplyPromise& rhs) : sav(rhs.sav) { sav->addPromiseRef(); }
ReplyPromise(ReplyPromise&& rhs) noexcept : sav(rhs.sav) { rhs.sav = 0; }
~ReplyPromise() { if (sav) sav->delPromiseRef(); }
@ -354,6 +364,9 @@ public:
FutureStream<T> getFuture() const { queue->addFutureRef(); return FutureStream<T>(queue); }
RequestStream() : queue(new NetNotifiedQueue<T>(0, 1)) {}
explicit RequestStream(PeerCompatibilityPolicy policy) : RequestStream() {
queue->setPeerCompatibilityPolicy(policy);
}
RequestStream(const RequestStream& rhs) : queue(rhs.queue) { queue->addPromiseRef(); }
RequestStream(RequestStream&& rhs) noexcept : queue(rhs.queue) { rhs.queue = 0; }
void operator=(const RequestStream& rhs) {

View File

@ -27,6 +27,7 @@
#include "flow/ActorCollection.h"
#include "flow/IRandom.h"
#include "flow/IThreadPool.h"
#include "flow/ProtocolVersion.h"
#include "flow/Util.h"
#include "fdbrpc/IAsyncFile.h"
#include "fdbrpc/AsyncFileCached.actor.h"
@ -92,8 +93,6 @@ void ISimulator::displayWorkers() const
return;
}
const UID TOKEN_ENDPOINT_NOT_FOUND(-1, -1);
int openCount = 0;
struct SimClogging {
@ -999,8 +998,8 @@ public:
net2->run();
}
ProcessInfo* newProcess(const char* name, IPAddress ip, uint16_t port, bool sslEnabled, uint16_t listenPerProcess,
LocalityData locality, ProcessClass startingClass, const char* dataFolder,
const char* coordinationFolder) override {
LocalityData locality, ProcessClass startingClass, const char* dataFolder,
const char* coordinationFolder, ProtocolVersion protocol) override {
ASSERT( locality.machineId().present() );
MachineInfo& machine = machines[ locality.machineId().get() ];
if (!machine.machineId.present())
@ -1043,6 +1042,7 @@ public:
currentlyRebootingProcesses.erase(addresses.address);
m->excluded = g_simulator.isExcluded(NetworkAddress(ip, port, true, false));
m->cleared = g_simulator.isCleared(addresses.address);
m->protocolVersion = protocol;
m->setGlobal(enTDMetrics, (flowGlobalType) &m->tdmetrics);
m->setGlobal(enNetworkConnections, (flowGlobalType) m->network);
@ -1708,6 +1708,10 @@ public:
return Void();
return delay( 0, taskID, process->machine->machineProcess );
}
ProtocolVersion protocolVersion() override {
return getCurrentProcess()->protocolVersion;
}
//time is guarded by ISimulator::mutex. It is not necessary to guard reads on the main thread because
//time should only be modified from the main thread.

View File

@ -20,6 +20,7 @@
#ifndef FLOW_SIMULATOR_H
#define FLOW_SIMULATOR_H
#include "flow/ProtocolVersion.h"
#pragma once
#include "flow/flow.h"
@ -71,6 +72,8 @@ public:
UID uid;
ProtocolVersion protocolVersion;
ProcessInfo(const char* name, LocalityData locality, ProcessClass startingClass, NetworkAddressList addresses,
INetworkConnections* net, const char* dataFolder, const char* coordinationFolder)
: name(name), locality(locality), startingClass(startingClass), addresses(addresses),
@ -162,7 +165,7 @@ public:
virtual ProcessInfo* newProcess(const char* name, IPAddress ip, uint16_t port, bool sslEnabled, uint16_t listenPerProcess,
LocalityData locality, ProcessClass startingClass, const char* dataFolder,
const char* coordinationFolder) = 0;
const char* coordinationFolder, ProtocolVersion protocol) = 0;
virtual void killProcess( ProcessInfo* machine, KillType ) = 0;
virtual void rebootProcess(Optional<Standalone<StringRef>> zoneId, bool allProcesses ) = 0;
virtual void rebootProcess( ProcessInfo* process, KillType kt ) = 0;
@ -176,6 +179,8 @@ public:
virtual bool datacenterDead(Optional<Standalone<StringRef>> dcId) const = 0;
virtual void displayWorkers() const;
virtual ProtocolVersion protocolVersion() = 0;
virtual void addRole(NetworkAddress const& address, std::string const& role) {
roleAddresses[address][role] ++;
TraceEvent("RoleAdd").detail("Address", address).detail("Role", role).detail("NumRoles", roleAddresses[address].size()).detail("Value", roleAddresses[address][role]);
@ -327,6 +332,9 @@ public:
BackupAgentType backupAgents;
BackupAgentType drAgents;
bool hasDiffProtocolProcess; // true if simulator is testing a process with a different version
bool setDiffProtocol; // true if a process with a different protocol version has been started
virtual flowGlobalType global(int id) const { return getCurrentProcess()->global(id); };
virtual void setGlobal(size_t id, flowGlobalType v) { getCurrentProcess()->setGlobal(id,v); };

View File

@ -59,7 +59,7 @@ struct VersionedMessage {
}
}
ArenaReader reader(arena, message, AssumeVersion(currentProtocolVersion));
ArenaReader reader(arena, message, AssumeVersion(g_network->protocolVersion()));
// Return false for LogProtocolMessage and SpanContextMessage metadata messages.
if (LogProtocolMessage::isNextIn(reader)) return false;
@ -756,7 +756,7 @@ ACTOR Future<Void> saveMutationsToFile(BackupData* self, Version popVersion, int
const auto& subrange = range.range();
intersectionRange = mutationRange & subrange;
MutationRef subm(MutationRef::Type::ClearRange, intersectionRange.begin, intersectionRange.end);
BinaryWriter wr(AssumeVersion(currentProtocolVersion));
BinaryWriter wr(AssumeVersion(g_network->protocolVersion()));
wr << subm;
mutations.push_back(wr.toValue());
for (int index : range.value()) {

View File

@ -178,6 +178,7 @@ set(FDBSERVER_SRCS
workloads/Performance.actor.cpp
workloads/Ping.actor.cpp
workloads/PopulateTPCC.actor.cpp
workloads/ProtocolVersion.actor.cpp
workloads/PubSubMultiples.actor.cpp
workloads/QueuePush.actor.cpp
workloads/RandomClogging.actor.cpp

View File

@ -2883,7 +2883,7 @@ ACTOR Future<Void> dbInfoUpdater( ClusterControllerData* self ) {
dbInfoChange = self->db.serverInfo->onChange();
updateDBInfo = self->updateDBInfo.onTrigger();
req.serializedDbInfo = BinaryWriter::toValue(self->db.serverInfo->get(), AssumeVersion(currentProtocolVersion));
req.serializedDbInfo = BinaryWriter::toValue(self->db.serverInfo->get(), AssumeVersion(g_network->protocolVersion()));
TraceEvent("DBInfoStartBroadcast", self->id);
choose {

View File

@ -24,10 +24,13 @@
#include "fdbserver/WorkerInterface.actor.h"
#include "fdbserver/Status.h"
#include "flow/ActorCollection.h"
#include "flow/ProtocolVersion.h"
#include "flow/UnitTest.h"
#include "flow/IndexedSet.h"
#include "fdbclient/MonitorLeader.h"
#include "flow/actorcompiler.h" // This must be the last #include.
#include "flow/network.h"
#include <cstdint>
// This module implements coordinationServer() and the interfaces in CoordinationInterface.h
@ -42,17 +45,6 @@ struct GenerationRegVal {
}
};
// The order of UIDs here must match the order in which makeWellKnownEndpoint is called.
// UID WLTOKEN_CLIENTLEADERREG_GETLEADER( -1, 2 ); // from fdbclient/MonitorLeader.actor.cpp
// UID WLTOKEN_CLIENTLEADERREG_OPENDATABASE( -1, 3 ); // from fdbclient/MonitorLeader.actor.cpp
UID WLTOKEN_LEADERELECTIONREG_CANDIDACY( -1, 4 );
UID WLTOKEN_LEADERELECTIONREG_ELECTIONRESULT( -1, 5 );
UID WLTOKEN_LEADERELECTIONREG_LEADERHEARTBEAT( -1, 6 );
UID WLTOKEN_LEADERELECTIONREG_FORWARD( -1, 7 );
UID WLTOKEN_GENERATIONREG_READ( -1, 8 );
UID WLTOKEN_GENERATIONREG_WRITE( -1, 9 );
GenerationRegInterface::GenerationRegInterface( NetworkAddress remote )
: read( Endpoint({remote}, WLTOKEN_GENERATIONREG_READ) ),
write( Endpoint({remote}, WLTOKEN_GENERATIONREG_WRITE) )

View File

@ -24,6 +24,13 @@
#include "fdbclient/CoordinationInterface.h"
constexpr UID WLTOKEN_LEADERELECTIONREG_CANDIDACY(-1, 4);
constexpr UID WLTOKEN_LEADERELECTIONREG_ELECTIONRESULT(-1, 5);
constexpr UID WLTOKEN_LEADERELECTIONREG_LEADERHEARTBEAT(-1, 6);
constexpr UID WLTOKEN_LEADERELECTIONREG_FORWARD(-1, 7);
constexpr UID WLTOKEN_GENERATIONREG_READ(-1, 8);
constexpr UID WLTOKEN_GENERATIONREG_WRITE(-1, 9);
struct GenerationRegInterface {
constexpr static FileIdentifier file_identifier = 16726744;
RequestStream< struct GenerationRegReadRequest > read;

View File

@ -849,7 +849,7 @@ struct LogPushData : NonCopyable {
for(auto& log : logSystem->getLogSystemConfig().tLogs) {
if(log.isLocal) {
for(int i = 0; i < log.tLogs.size(); i++) {
messagesWriter.push_back( BinaryWriter( AssumeVersion(currentProtocolVersion) ) );
messagesWriter.push_back( BinaryWriter( AssumeVersion(g_network->protocolVersion()) ) );
}
}
}
@ -916,7 +916,7 @@ struct LogPushData : NonCopyable {
msg_locations.clear();
logSystem->getPushLocations(prev_tags, msg_locations, allLocations);
BinaryWriter bw(AssumeVersion(currentProtocolVersion));
BinaryWriter bw(AssumeVersion(g_network->protocolVersion()));
// Metadata messages (currently LogProtocolMessage is the only metadata
// message) should be written before span information. If this isn't a

View File

@ -342,7 +342,7 @@ ACTOR Future<Void> updateMetricRegistration(Database cx, MetricsConfig *config,
loop {
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
try {
Value timestamp = BinaryWriter::toValue(CompressedInt<int64_t>(now()), AssumeVersion(currentProtocolVersion));
Value timestamp = BinaryWriter::toValue(CompressedInt<int64_t>(now()), AssumeVersion(g_network->protocolVersion()));
for(auto &key : keys) {
//fprintf(stderr, "%s: register: %s\n", collection->address.toString().c_str(), printable(key).c_str());
tr.set(key, timestamp);

View File

@ -51,7 +51,7 @@ TraceEvent debugKeyRangeEnabled( const char* context, Version version, KeyRangeR
}
TraceEvent debugTagsAndMessageEnabled( const char* context, Version version, StringRef commitBlob ) {
BinaryReader rdr(commitBlob, AssumeVersion(currentProtocolVersion));
BinaryReader rdr(commitBlob, AssumeVersion(g_network->protocolVersion()));
while (!rdr.empty()) {
if (*(int32_t*)rdr.peekBytes(4) == VERSION_HEADER) {
int32_t dummy;

View File

@ -1371,7 +1371,7 @@ void peekMessagesFromMemory( Reference<LogData> self, TLogPeekRequest const& req
ACTOR Future<std::vector<StringRef>> parseMessagesForTag( StringRef commitBlob, Tag tag, int logRouters ) {
// See the comment in LogSystem.cpp for the binary format of commitBlob.
state std::vector<StringRef> relevantMessages;
state BinaryReader rd(commitBlob, AssumeVersion(currentProtocolVersion));
state BinaryReader rd(commitBlob, AssumeVersion(g_network->protocolVersion()));
while (!rd.empty()) {
TagsAndMessage tagsAndMessage;
tagsAndMessage.loadFromArena(&rd, nullptr);
@ -2753,7 +2753,7 @@ ACTOR Future<Void> tLogStart( TLogData* self, InitializeTLogRequest req, Localit
bool recovering = (req.recoverFrom.logSystemType == LogSystemType::tagPartitioned);
state Reference<LogData> logData = Reference<LogData>( new LogData(self, recruited, req.remoteTag, req.isPrimary, req.logRouterTags, req.txsTags, req.recruitmentID, currentProtocolVersion, req.allTags, recovering ? "Recovered" : "Recruited") );
state Reference<LogData> logData = Reference<LogData>( new LogData(self, recruited, req.remoteTag, req.isPrimary, req.logRouterTags, req.txsTags, req.recruitmentID, g_network->protocolVersion(), req.allTags, recovering ? "Recovered" : "Recruited") );
self->id_data[recruited.id()] = logData;
logData->locality = req.locality;
logData->recoveryCount = req.epoch;

View File

@ -397,7 +397,7 @@ ACTOR static Future<Void> _parsePartitionedLogFileOnLoader(
// only one clear mutation is generated (i.e., always inserted).
ASSERT(inserted);
ArenaReader rd(buf.arena(), StringRef(message, msgSize), AssumeVersion(currentProtocolVersion));
ArenaReader rd(buf.arena(), StringRef(message, msgSize), AssumeVersion(g_network->protocolVersion()));
MutationRef mutation;
rd >> mutation;

View File

@ -18,8 +18,10 @@
* limitations under the License.
*/
#include <cstdint>
#include <fstream>
#include <ostream>
#include "fdbrpc/Locality.h"
#include "fdbrpc/simulator.h"
#include "fdbclient/DatabaseContext.h"
#include "fdbserver/TesterInterface.actor.h"
@ -33,7 +35,9 @@
#include "fdbclient/NativeAPI.actor.h"
#include "fdbclient/BackupAgent.actor.h"
#include "fdbclient/versions.h"
#include "flow/ProtocolVersion.h"
#include "flow/actorcompiler.h" // This must be the last #include.
#include "flow/network.h"
#undef max
#undef min
@ -47,9 +51,9 @@ bool destructed = false;
template <class T>
T simulate( const T& in ) {
BinaryWriter writer(AssumeVersion(currentProtocolVersion));
BinaryWriter writer(AssumeVersion(g_network->protocolVersion()));
writer << in;
BinaryReader reader( writer.getData(), writer.getLength(), AssumeVersion(currentProtocolVersion) );
BinaryReader reader( writer.getData(), writer.getLength(), AssumeVersion(g_network->protocolVersion()) );
T out;
reader >> out;
return out;
@ -137,7 +141,7 @@ ACTOR Future<ISimulator::KillType> simulatedFDBDRebooter(Reference<ClusterConnec
std::string* dataFolder, std::string* coordFolder,
std::string baseFolder, ClusterConnectionString connStr,
bool useSeedFile, AgentMode runBackupAgents,
std::string whitelistBinPaths) {
std::string whitelistBinPaths, ProtocolVersion protocolVersion) {
state ISimulator::ProcessInfo *simProcess = g_simulator.getCurrentProcess();
state UID randomId = nondeterministicRandom()->randomUniqueID();
state int cycles = 0;
@ -154,7 +158,7 @@ ACTOR Future<ISimulator::KillType> simulatedFDBDRebooter(Reference<ClusterConnec
state ISimulator::ProcessInfo* process =
g_simulator.newProcess("Server", ip, port, sslEnabled, listenPerProcess, localities, processClass, dataFolder->c_str(),
coordFolder->c_str());
coordFolder->c_str(), protocolVersion);
wait(g_simulator.onProcess(process,
TaskPriority::DefaultYield)); // Now switch execution to the process on which we will run
state Future<ISimulator::KillType> onShutdown = process->onShutdown();
@ -298,7 +302,7 @@ std::map< Optional<Standalone<StringRef>>, std::vector< std::vector< std::string
// process count is no longer needed because it is now the length of the vector of ip's, because it was one ip per process
ACTOR Future<Void> simulatedMachine(ClusterConnectionString connStr, std::vector<IPAddress> ips, bool sslEnabled, LocalityData localities,
ProcessClass processClass, std::string baseFolder, bool restarting,
bool useSeedFile, AgentMode runBackupAgents, bool sslOnly, std::string whitelistBinPaths) {
bool useSeedFile, AgentMode runBackupAgents, bool sslOnly, std::string whitelistBinPaths, ProtocolVersion protocolVersion) {
state int bootCount = 0;
state std::vector<std::string> myFolders;
state std::vector<std::string> coordFolders;
@ -341,7 +345,13 @@ ACTOR Future<Void> simulatedMachine(ClusterConnectionString connStr, std::vector
Reference<ClusterConnectionFile> clusterFile(useSeedFile ? new ClusterConnectionFile(path, connStr.toString()) : new ClusterConnectionFile(path));
const int listenPort = i*listenPerProcess + 1;
AgentMode agentMode = runBackupAgents == AgentOnly ? ( i == ips.size()-1 ? AgentOnly : AgentNone ) : runBackupAgents;
processes.push_back(simulatedFDBDRebooter(clusterFile, ips[i], sslEnabled, listenPort, listenPerProcess, localities, processClass, &myFolders[i], &coordFolders[i], baseFolder, connStr, useSeedFile, agentMode, whitelistBinPaths));
if(g_simulator.hasDiffProtocolProcess && !g_simulator.setDiffProtocol && agentMode == AgentNone) {
processes.push_back(simulatedFDBDRebooter(clusterFile, ips[i], sslEnabled, listenPort, listenPerProcess, localities, processClass, &myFolders[i], &coordFolders[i], baseFolder, connStr, useSeedFile, agentMode, whitelistBinPaths, protocolVersion));
g_simulator.setDiffProtocol = true;
}
else {
processes.push_back(simulatedFDBDRebooter(clusterFile, ips[i], sslEnabled, listenPort, listenPerProcess, localities, processClass, &myFolders[i], &coordFolders[i], baseFolder, connStr, useSeedFile, agentMode, whitelistBinPaths, g_network->protocolVersion()));
}
TraceEvent("SimulatedMachineProcess", randomId).detail("Address", NetworkAddress(ips[i], listenPort, true, false)).detail("ZoneId", localities.zoneId()).detail("DataHall", localities.dataHallId()).detail("Folder", myFolders[i]);
}
@ -546,7 +556,7 @@ IPAddress makeIPAddressForSim(bool isIPv6, std::array<int, 4> parts) {
ACTOR Future<Void> restartSimulatedSystem(vector<Future<Void>>* systemActors, std::string baseFolder, int* pTesterCount,
Optional<ClusterConnectionString>* pConnString,
Standalone<StringRef>* pStartingConfiguration,
int extraDB, std::string whitelistBinPaths) {
int extraDB, std::string whitelistBinPaths, ProtocolVersion protocolVersion) {
CSimpleIni ini;
ini.SetUnicode();
ini.LoadFile(joinPath(baseFolder, "restartInfo.ini").c_str());
@ -645,7 +655,7 @@ ACTOR Future<Void> restartSimulatedSystem(vector<Future<Void>>* systemActors, st
simulatedMachine(conn, ipAddrs, usingSSL, localities, processClass, baseFolder, true,
i == useSeedForMachine, AgentAddition,
usingSSL && (listenersPerProcess == 1 || processClass == ProcessClass::TesterClass),
whitelistBinPaths),
whitelistBinPaths, protocolVersion),
processClass == ProcessClass::TesterClass ? "SimulatedTesterMachine" : "SimulatedMachine"));
}
@ -1052,8 +1062,8 @@ void SimulationConfig::generateNormalConfig(int minimumReplication, int minimumR
void setupSimulatedSystem(vector<Future<Void>>* systemActors, std::string baseFolder, int* pTesterCount,
Optional<ClusterConnectionString>* pConnString, Standalone<StringRef>* pStartingConfiguration,
int extraDB, int minimumReplication, int minimumRegions, std::string whitelistBinPaths,
bool configureLocked, int logAntiQuorum) {
int extraDB, int minimumReplication, int minimumRegions, std::string whitelistBinPaths, bool configureLocked,
int logAntiQuorum, ProtocolVersion protocolVersion) {
// SOMEDAY: this does not test multi-interface configurations
SimulationConfig simconfig(extraDB, minimumReplication, minimumRegions);
if (logAntiQuorum != -1) {
@ -1218,6 +1228,7 @@ void setupSimulatedSystem(vector<Future<Void>>* systemActors, std::string baseFo
bool requiresExtraDBMachines = extraDB && g_simulator.extraDB->toString() != conn.toString();
int assignedMachines = 0, nonVersatileMachines = 0;
std::vector<ProcessClass::ClassType> processClassesSubSet = {ProcessClass::UnsetClass, ProcessClass::ResolutionClass, ProcessClass::MasterClass};
for( int dc = 0; dc < dataCenters; dc++ ) {
//FIXME: test unset dcID
Optional<Standalone<StringRef>> dcUID = StringRef(format("%d", dc));
@ -1275,7 +1286,7 @@ void setupSimulatedSystem(vector<Future<Void>>* systemActors, std::string baseFo
LocalityData localities(Optional<Standalone<StringRef>>(), zoneId, machineId, dcUID);
localities.set(LiteralStringRef("data_hall"), dcUID);
systemActors->push_back(reportErrors(simulatedMachine(conn, ips, sslEnabled,
localities, processClass, baseFolder, false, machine == useSeedForMachine, requiresExtraDBMachines ? AgentOnly : AgentAddition, sslOnly, whitelistBinPaths ), "SimulatedMachine"));
localities, processClass, baseFolder, false, machine == useSeedForMachine, requiresExtraDBMachines ? AgentOnly : AgentAddition, sslOnly, whitelistBinPaths, protocolVersion ), "SimulatedMachine"));
if (requiresExtraDBMachines) {
std::vector<IPAddress> extraIps;
@ -1289,7 +1300,7 @@ void setupSimulatedSystem(vector<Future<Void>>* systemActors, std::string baseFo
localities.set(LiteralStringRef("data_hall"), dcUID);
systemActors->push_back(reportErrors(simulatedMachine(*g_simulator.extraDB, extraIps, sslEnabled,
localities,
processClass, baseFolder, false, machine == useSeedForMachine, AgentNone, sslOnly, whitelistBinPaths ), "SimulatedMachine"));
processClass, baseFolder, false, machine == useSeedForMachine, AgentNone, sslOnly, whitelistBinPaths, protocolVersion ), "SimulatedMachine"));
}
assignedMachines++;
@ -1313,13 +1324,18 @@ void setupSimulatedSystem(vector<Future<Void>>* systemActors, std::string baseFo
std::vector<IPAddress> ips;
ips.push_back(makeIPAddressForSim(useIPv6, { 3, 4, 3, i + 1 }));
Standalone<StringRef> newZoneId = Standalone<StringRef>(deterministicRandom()->randomUniqueID().toString());
LocalityData localities(Optional<Standalone<StringRef>>(), newZoneId, newZoneId, Optional<Standalone<StringRef>>());
LocalityData localities(Optional<Standalone<StringRef>>(), newZoneId, newZoneId, Optional<Standalone<StringRef>>());
systemActors->push_back( reportErrors( simulatedMachine(
conn, ips, sslEnabled && sslOnly,
localities, ProcessClass(ProcessClass::TesterClass, ProcessClass::CommandLineSource),
baseFolder, false, i == useSeedForMachine, AgentNone, sslEnabled && sslOnly, whitelistBinPaths ),
baseFolder, false, i == useSeedForMachine, AgentNone, sslEnabled && sslOnly, whitelistBinPaths, protocolVersion ),
"SimulatedTesterMachine") );
}
if(g_simulator.setDiffProtocol) {
--(*pTesterCount);
}
*pStartingConfiguration = startingConfigString;
// save some state that we only need when restarting the simulator.
@ -1337,7 +1353,7 @@ void setupSimulatedSystem(vector<Future<Void>>* systemActors, std::string baseFo
}
void checkTestConf(const char* testFile, int& extraDB, int& minimumReplication, int& minimumRegions,
int& configureLocked, int& logAntiQuorum) {
int& configureLocked, int& logAntiQuorum, bool& startIncompatibleProcess) {
std::ifstream ifs;
ifs.open(testFile, std::ifstream::in);
if (!ifs.good())
@ -1371,7 +1387,11 @@ void checkTestConf(const char* testFile, int& extraDB, int& minimumReplication,
}
if (attrib == "configureLocked") {
sscanf(value.c_str(), "%d", &configureLocked);
sscanf( value.c_str(), "%d", &configureLocked );
}
if (attrib == "startIncompatibleProcess") {
startIncompatibleProcess = strcmp(value.c_str(), "true") == 0;
}
if (attrib == "logAntiQuorum") {
sscanf(value.c_str(), "%d", &logAntiQuorum);
@ -1391,7 +1411,17 @@ ACTOR void setupAndRun(std::string dataFolder, const char *testFile, bool reboot
state int minimumRegions = 0;
state int configureLocked = 0;
state int logAntiQuorum = -1;
checkTestConf(testFile, extraDB, minimumReplication, minimumRegions, configureLocked, logAntiQuorum);
state bool startIncompatibleProcess = false;
checkTestConf(testFile, extraDB, minimumReplication, minimumRegions, configureLocked, logAntiQuorum, startIncompatibleProcess);
g_simulator.hasDiffProtocolProcess = startIncompatibleProcess;
g_simulator.setDiffProtocol = false;
state ProtocolVersion protocolVersion = currentProtocolVersion;
if(startIncompatibleProcess) {
// isolates right most 1 bit of compatibleProtocolVersionMask to make this protocolVersion incompatible
uint64_t minAddToMakeIncompatible = ProtocolVersion::compatibleProtocolVersionMask & ~(ProtocolVersion::compatibleProtocolVersionMask-1);
protocolVersion = ProtocolVersion(currentProtocolVersion.version() + minAddToMakeIncompatible);
}
// TODO (IPv6) Use IPv6?
wait(g_simulator.onProcess(
@ -1400,7 +1430,7 @@ ACTOR void setupAndRun(std::string dataFolder, const char *testFile, bool reboot
Standalone<StringRef>(deterministicRandom()->randomUniqueID().toString()),
Standalone<StringRef>(deterministicRandom()->randomUniqueID().toString()),
Optional<Standalone<StringRef>>()),
ProcessClass(ProcessClass::TesterClass, ProcessClass::CommandLineSource), "", ""),
ProcessClass(ProcessClass::TesterClass, ProcessClass::CommandLineSource), "", "", currentProtocolVersion),
TaskPriority::DefaultYield));
Sim2FileSystem::newFileSystem();
FlowTransport::createInstance(true, 1);
@ -1409,7 +1439,7 @@ ACTOR void setupAndRun(std::string dataFolder, const char *testFile, bool reboot
try {
//systemActors.push_back( startSystemMonitor(dataFolder) );
if (rebooting) {
wait( timeoutError( restartSimulatedSystem( &systemActors, dataFolder, &testerCount, &connFile, &startingConfiguration, extraDB, whitelistBinPaths), 100.0 ) );
wait( timeoutError( restartSimulatedSystem( &systemActors, dataFolder, &testerCount, &connFile, &startingConfiguration, extraDB, whitelistBinPaths, protocolVersion), 100.0 ) );
// FIXME: snapshot restore does not support multi-region restore, hence restore it as single region always
if (restoring) {
startingConfiguration = LiteralStringRef("usable_regions=1");
@ -1418,7 +1448,7 @@ ACTOR void setupAndRun(std::string dataFolder, const char *testFile, bool reboot
else {
g_expect_full_pointermap = 1;
setupSimulatedSystem(&systemActors, dataFolder, &testerCount, &connFile, &startingConfiguration, extraDB,
minimumReplication, minimumRegions, whitelistBinPaths, configureLocked, logAntiQuorum);
minimumReplication, minimumRegions, whitelistBinPaths, configureLocked, logAntiQuorum, protocolVersion);
wait( delay(1.0) ); // FIXME: WHY!!! //wait for machines to boot
}
std::string clusterFileDir = joinPath( dataFolder, deterministicRandom()->randomUniqueID().toString() );

View File

@ -2518,7 +2518,7 @@ ACTOR Future<StatusReply> clusterGetStatus(
state JsonBuilderObject qos;
state JsonBuilderObject data_overlay;
statusObj["protocol_version"] = format("%" PRIx64, currentProtocolVersion.version());
statusObj["protocol_version"] = format("%" PRIx64, g_network->protocolVersion().version());
statusObj["connection_string"] = coordinators.ccf->getConnectionString().toString();
state Optional<DatabaseConfiguration> configuration;

View File

@ -1752,7 +1752,7 @@ ACTOR Future<Void> pullAsyncData( StorageCacheData *data ) {
//TODO cache servers should write the LogProtocolMessage when they are created
//cloneCursor1->setProtocolVersion(data->logProtocol);
cloneCursor1->setProtocolVersion(currentProtocolVersion);
cloneCursor1->setProtocolVersion(g_network->protocolVersion());
for (; cloneCursor1->hasMessage(); cloneCursor1->nextMessage()) {
ArenaReader& cloneReader = *cloneCursor1->reader();
@ -1820,7 +1820,7 @@ ACTOR Future<Void> pullAsyncData( StorageCacheData *data ) {
//FIXME: ensure this can only read data from the current version
//cloneCursor2->setProtocolVersion(data->logProtocol);
cloneCursor2->setProtocolVersion(currentProtocolVersion);
cloneCursor2->setProtocolVersion(g_network->protocolVersion());
ver = invalidVersion;
// Now process the mutations

View File

@ -1431,7 +1431,7 @@ void peekMessagesFromMemory( Reference<LogData> self, TLogPeekRequest const& req
ACTOR Future<std::vector<StringRef>> parseMessagesForTag( StringRef commitBlob, Tag tag, int logRouters ) {
// See the comment in LogSystem.cpp for the binary format of commitBlob.
state std::vector<StringRef> relevantMessages;
state BinaryReader rd(commitBlob, AssumeVersion(currentProtocolVersion));
state BinaryReader rd(commitBlob, AssumeVersion(g_network->protocolVersion()));
while (!rd.empty()) {
TagsAndMessage tagsAndMessage;
tagsAndMessage.loadFromArena(&rd, nullptr);
@ -2834,7 +2834,7 @@ ACTOR Future<Void> tLogStart( TLogData* self, InitializeTLogRequest req, Localit
bool recovering = (req.recoverFrom.logSystemType == LogSystemType::tagPartitioned);
state Reference<LogData> logData = makeReference<LogData>(
self, recruited, req.remoteTag, req.isPrimary, req.logRouterTags, req.txsTags, req.recruitmentID,
currentProtocolVersion, req.spillType, req.allTags, recovering ? "Recovered" : "Recruited");
g_network->protocolVersion(), req.spillType, req.allTags, recovering ? "Recovered" : "Recruited");
self->id_data[recruited.id()] = logData;
logData->locality = req.locality;
logData->recoveryCount = req.epoch;

View File

@ -60,6 +60,7 @@
#include "fdbserver/workloads/workloads.actor.h"
#include "flow/DeterministicRandom.h"
#include "flow/Platform.h"
#include "flow/ProtocolVersion.h"
#include "flow/SimpleOpt.h"
#include "flow/SystemMonitor.h"
#include "flow/TLSConfig.actor.h"

View File

@ -908,6 +908,9 @@ std::map<std::string, std::function<void(const std::string&)>> testSpecGlobalKey
// else { } It is enable by default for tester
TraceEvent("TestParserTest").detail("ClientInfoLogging", value);
}},
{"startIncompatibleProcess", [](const std::string& value) {
TraceEvent("TestParserTest").detail("ParsedStartIncompatibleProcess", value);
}}
};
std::map<std::string, std::function<void(const std::string& value, TestSpec* spec)>> testSpecTestKeys = {

View File

@ -25,6 +25,7 @@
#include "fdbclient/StorageServerInterface.h"
#include "fdbserver/Knobs.h"
#include "flow/ActorCollection.h"
#include "flow/ProtocolVersion.h"
#include "flow/SystemMonitor.h"
#include "flow/TDMetric.actor.h"
#include "fdbrpc/simulator.h"
@ -46,6 +47,7 @@
#include "flow/Profiler.h"
#include "flow/ThreadHelper.actor.h"
#include "flow/Trace.h"
#include "flow/network.h"
#ifdef __linux__
#include <fcntl.h>
@ -1141,7 +1143,7 @@ ACTOR Future<Void> workerServer(
loop choose {
when( UpdateServerDBInfoRequest req = waitNext( interf.updateServerDBInfo.getFuture() ) ) {
ServerDBInfo localInfo = BinaryReader::fromStringRef<ServerDBInfo>(req.serializedDbInfo, AssumeVersion(currentProtocolVersion));
ServerDBInfo localInfo = BinaryReader::fromStringRef<ServerDBInfo>(req.serializedDbInfo, AssumeVersion(g_network->protocolVersion()));
localInfo.myLocality = locality;
if(localInfo.infoGeneration < dbInfo->get().infoGeneration && localInfo.clusterInterface == dbInfo->get().clusterInterface) {
@ -1796,6 +1798,16 @@ ACTOR Future<Void> monitorLeaderRemotelyWithDelayedCandidacy( Reference<ClusterC
}
}
ACTOR Future<Void> serveProtocolInfo() {
state RequestStream<ProtocolInfoRequest> protocolInfo(
PeerCompatibilityPolicy{ RequirePeer::AtLeast, ProtocolVersion::withStableInterfaces() });
protocolInfo.makeWellKnownEndpoint(WLTOKEN_PROTOCOL_INFO, TaskPriority::DefaultEndpoint);
loop {
state ProtocolInfoRequest req = waitNext(protocolInfo.getFuture());
req.reply.send(ProtocolInfoReply{ g_network->protocolVersion() });
}
}
ACTOR Future<Void> fdbd(
Reference<ClusterConnectionFile> connFile,
LocalityData localities,
@ -1811,6 +1823,8 @@ ACTOR Future<Void> fdbd(
state vector<Future<Void>> actors;
state Promise<Void> recoveredDiskFiles;
actors.push_back(serveProtocolInfo());
try {
ServerCoordinators coordinators( connFile );
if (g_network->isSimulated()) {

View File

@ -34,6 +34,7 @@
#include "flow/DeterministicRandom.h"
#include "fdbclient/ManagementAPI.actor.h"
#include "flow/actorcompiler.h" // This must be the last #include.
#include "flow/network.h"
//#define SevCCheckInfo SevVerbose
#define SevCCheckInfo SevInfo
@ -1295,7 +1296,7 @@ struct ConsistencyCheckWorkload : TestWorkload
vector<ISimulator::ProcessInfo*> all = g_simulator.getAllProcesses();
for(int i = 0; i < all.size(); i++) {
if( all[i]->isReliable() && all[i]->name == std::string("Server") && all[i]->startingClass != ProcessClass::TesterClass ) {
if( all[i]->isReliable() && all[i]->name == std::string("Server") && all[i]->startingClass != ProcessClass::TesterClass && all[i]->protocolVersion == g_network->protocolVersion() ) {
if(!workerAddresses.count(all[i]->address)) {
TraceEvent("ConsistencyCheck_WorkerMissingFromList").detail("Addr", all[i]->address);
return false;

View File

@ -69,7 +69,7 @@ struct DowngradeWorkload : TestWorkload {
};
ACTOR static Future<Void> writeOld(Database cx, int numObjects, Key key) {
BinaryWriter writer(IncludeVersion(currentProtocolVersion));
BinaryWriter writer(IncludeVersion(g_network->protocolVersion()));
std::vector<OldStruct> data(numObjects);
for (auto& oldObject : data) {
oldObject.setFields();
@ -90,7 +90,7 @@ struct DowngradeWorkload : TestWorkload {
}
ACTOR static Future<Void> writeNew(Database cx, int numObjects, Key key) {
ProtocolVersion protocolVersion = currentProtocolVersion;
ProtocolVersion protocolVersion = g_network->protocolVersion();
protocolVersion.addObjectSerializerFlag();
ObjectWriter writer(IncludeVersion(protocolVersion));
std::vector<NewStruct> data(numObjects);

View File

@ -0,0 +1,51 @@
/*
* ProtocolVersion.actor.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2019 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "fdbserver/workloads/workloads.actor.h"
struct ProtocolVersionWorkload : TestWorkload {
ProtocolVersionWorkload(WorkloadContext const& wcx) : TestWorkload(wcx) {}
std::string description() const override { return "ProtocolVersionWorkload"; }
Future<Void> start(Database const& cx) override { return _start(this, cx); }
ACTOR Future<Void> _start(ProtocolVersionWorkload* self, Database cx) {
state std::vector<ISimulator::ProcessInfo*> allProcesses = g_pSimulator->getAllProcesses();
state std::vector<ISimulator::ProcessInfo*>::iterator diffVersionProcess =
find_if(allProcesses.begin(), allProcesses.end(),
[](const ISimulator::ProcessInfo* p) { return p->protocolVersion != currentProtocolVersion; });
ASSERT(diffVersionProcess != allProcesses.end());
RequestStream<ProtocolInfoRequest> requestStream{ Endpoint{ { (*diffVersionProcess)->addresses },
WLTOKEN_PROTOCOL_INFO } };
ProtocolInfoReply reply = wait(retryBrokenPromise(requestStream, ProtocolInfoRequest{}));
ASSERT(reply.version != g_network->protocolVersion());
return Void();
}
Future<bool> check(Database const& cx) override { return true; }
void getMetrics(vector<PerfMetric>& m) override {}
};
WorkloadFactory<ProtocolVersionWorkload> ProtocolVersionWorkloadFactory("ProtocolVersion");

View File

@ -166,7 +166,7 @@ struct StatusWorkload : TestWorkload {
state double issued = now();
StatusObject result = wait(StatusClient::statusFetcher(cx));
++self->replies;
BinaryWriter br(AssumeVersion(currentProtocolVersion));
BinaryWriter br(AssumeVersion(g_network->protocolVersion()));
save(br, result);
self->totalSize += br.getLength();
TraceEvent("StatusWorkloadReply").detail("ReplySize", br.getLength()).detail("Latency", now() - issued);//.detail("Reply", json_spirit::write_string(json_spirit::mValue(result)));

View File

@ -156,7 +156,7 @@ struct StorefrontWorkload : TestWorkload {
updaters.clear();
// set value for the order
BinaryWriter wr(AssumeVersion(currentProtocolVersion)); wr << itemList;
BinaryWriter wr(AssumeVersion(g_network->protocolVersion())); wr << itemList;
tr.set(orderKey, wr.toValue());
wait( tr.commit() );
@ -187,7 +187,7 @@ struct StorefrontWorkload : TestWorkload {
int orderIdx;
for(orderIdx=0; orderIdx<values.size(); orderIdx++) {
vector<int> saved;
BinaryReader br( values[orderIdx].value, AssumeVersion(currentProtocolVersion) );
BinaryReader br( values[orderIdx].value, AssumeVersion(g_network->protocolVersion()) );
br >> saved;
for(int c=0; c<saved.size(); c++)
result[saved[c]]++;
@ -247,7 +247,7 @@ struct StorefrontWorkload : TestWorkload {
for( int i=0; i < it->second; i++ )
itemList.push_back( it->first );
}
BinaryWriter wr(AssumeVersion(currentProtocolVersion)); wr << itemList;
BinaryWriter wr(AssumeVersion(g_network->protocolVersion())); wr << itemList;
if( wr.toValue() != val.get().toString() ) {
TraceEvent(SevError, "TestFailure")
.detail("Reason", "OrderContentsMismatch")

View File

@ -59,7 +59,7 @@ void printBitsBig(size_t const size, void const * const ptr)
template<typename IntType>
void testCompressedInt(IntType n, StringRef rep = StringRef()) {
BinaryWriter w(AssumeVersion(currentProtocolVersion));
BinaryWriter w(AssumeVersion(g_network->protocolVersion()));
CompressedInt<IntType> cn(n);
w << cn;
@ -74,7 +74,7 @@ void testCompressedInt(IntType n, StringRef rep = StringRef()) {
rep = w.toValue();
cn.value = 0;
BinaryReader r(rep, AssumeVersion(currentProtocolVersion));
BinaryReader r(rep, AssumeVersion(g_network->protocolVersion()));
r >> cn;
if(cn.value != n) {

View File

@ -66,7 +66,7 @@ class UID {
public:
constexpr static FileIdentifier file_identifier = 15597147;
UID() { part[0] = part[1] = 0; }
UID( uint64_t a, uint64_t b ) { part[0]=a; part[1]=b; }
constexpr UID(uint64_t a, uint64_t b) : part{ a, b } {}
std::string toString() const;
std::string shortString() const;
bool isValid() const { return part[0] || part[1]; }

View File

@ -170,6 +170,9 @@ public:
virtual flowGlobalType global(int id) const override { return (globals.size() > id) ? globals[id] : nullptr; }
virtual void setGlobal(size_t id, flowGlobalType v) { globals.resize(std::max(globals.size(),id+1)); globals[id] = v; }
ProtocolVersion protocolVersion() override { return currentProtocolVersion; }
std::vector<flowGlobalType> globals;
virtual const TLSConfig& getTLSConfig() const override { return tlsConfig; }
@ -2028,7 +2031,7 @@ void net2_test() {
SendBuffer* pb = unsent.getWriteBuffer();
ReliablePacket* rp = new ReliablePacket; // 0
PacketWriter wr(pb,rp,AssumeVersion(currentProtocolVersion));
PacketWriter wr(pb,rp,AssumeVersion(g_network->protocolVersion()));
//BinaryWriter wr;
SplitBuffer packetLen;
uint32_t len = 0;

View File

@ -39,7 +39,7 @@ class ProtocolVersion {
public: // constants
static constexpr uint64_t versionFlagMask = 0x0FFFFFFFFFFFFFFFLL;
static constexpr uint64_t objectSerializerFlag = 0x1000000000000000LL;
static constexpr uint64_t compatibleProtocolVersionMask = 0xffffffffffff0000LL;
static constexpr uint64_t compatibleProtocolVersionMask = 0xFFFFFFFFFFFF0000LL;
static constexpr uint64_t minValidProtocolVersion = 0x0FDB00A200060001LL;
public:
@ -49,6 +49,7 @@ public:
constexpr bool isCompatible(ProtocolVersion other) const {
return (other.version() & compatibleProtocolVersionMask) == (version() & compatibleProtocolVersionMask);
}
constexpr bool isValid() const { return version() >= minValidProtocolVersion; }
constexpr uint64_t version() const { return _version & versionFlagMask; }
@ -128,6 +129,7 @@ public: // introduced features
PROTOCOL_VERSION_FEATURE(0x0FDB00B063010000LL, ReportConflictingKeys);
PROTOCOL_VERSION_FEATURE(0x0FDB00B063010000LL, SmallEndpoints);
PROTOCOL_VERSION_FEATURE(0x0FDB00B063010000LL, CacheRole);
PROTOCOL_VERSION_FEATURE(0x0FDB00B070010000LL, StableInterfaces);
PROTOCOL_VERSION_FEATURE(0x0FDB00B070010001LL, TagThrottleValueReason);
PROTOCOL_VERSION_FEATURE(0x0FDB00B070010001LL, SpanContext);
};

View File

@ -221,7 +221,7 @@ struct MetricData {
BinaryWriter writer;
explicit MetricData(uint64_t appendStart = 0) :
writer(AssumeVersion(currentProtocolVersion)),
writer(AssumeVersion(g_network->protocolVersion())),
start(0),
rollTime(std::numeric_limits<uint64_t>::max()),
appendStart(appendStart) {
@ -521,7 +521,7 @@ struct FieldLevel {
// Calculate header as of the end of a value block
static Header calculateHeader(StringRef block) {
BinaryReader r(block, AssumeVersion(currentProtocolVersion));
BinaryReader r(block, AssumeVersion(g_network->protocolVersion()));
Header h;
r >> h;
Encoder dec;
@ -534,11 +534,11 @@ struct FieldLevel {
// Read header at position, update it with previousHeader, overwrite old header with new header.
static void updateSerializedHeader(StringRef buf, const Header &patch) {
BinaryReader r(buf, AssumeVersion(currentProtocolVersion));
BinaryReader r(buf, AssumeVersion(g_network->protocolVersion()));
Header h;
r >> h;
h.update(patch);
OverWriter w(mutateString(buf), buf.size(), AssumeVersion(currentProtocolVersion));
OverWriter w(mutateString(buf), buf.size(), AssumeVersion(g_network->protocolVersion()));
w << h;
}
@ -1257,7 +1257,7 @@ public:
Standalone<StringRef> getLatestAsValue() {
FieldValueBlockEncoding< TimeAndValue< T > > enc;
BinaryWriter wr(AssumeVersion(currentProtocolVersion));
BinaryWriter wr(AssumeVersion(g_network->protocolVersion()));
// Write a header so the client can treat this value like a normal data value block.
// TOOD: If it is useful, this could be the current header value of the most recently logged level.
wr << FieldHeader<TimeAndValue<T>>();

View File

@ -170,7 +170,7 @@ struct Root {
struct TestContextArena {
Arena& _arena;
Arena& arena() { return _arena; }
ProtocolVersion protocolVersion() const { return currentProtocolVersion; }
ProtocolVersion protocolVersion() const { return g_network->protocolVersion(); }
uint8_t* allocate(size_t size) { return new (_arena) uint8_t[size]; }
};
@ -228,7 +228,7 @@ struct Arena {
struct TestContext {
Arena& _arena;
Arena& arena() { return _arena; }
ProtocolVersion protocolVersion() const { return currentProtocolVersion; }
ProtocolVersion protocolVersion() const { return g_network->protocolVersion(); }
uint8_t* allocate(size_t size) { return _arena(size); }
TestContext& context() { return *this; }
};

View File

@ -256,7 +256,7 @@ public:
//This should only be called from the ObjectSerializer load function
Standalone<StringRef> getCache() const {
if(cacheType != SerializeType::Object) {
cache = ObjectWriter::toValue(ErrorOr<EnsureTable<T>>(data), AssumeVersion(currentProtocolVersion));
cache = ObjectWriter::toValue(ErrorOr<EnsureTable<T>>(data), AssumeVersion(g_network->protocolVersion()));
cacheType = SerializeType::Object;
}
return cache;
@ -283,7 +283,7 @@ public:
serializer(ar, data);
} else {
if (cacheType != SerializeType::Binary) {
cache = BinaryWriter::toValue(data, AssumeVersion(currentProtocolVersion));
cache = BinaryWriter::toValue(data, AssumeVersion(g_network->protocolVersion()));
cacheType = SerializeType::Binary;
}
ar.serializeBytes(const_cast<uint8_t*>(cache.begin()), cache.size());
@ -327,7 +327,7 @@ struct serialize_raw<ErrorOr<EnsureTable<CachedSerialization<V>>>> : std::true_t
static uint8_t* save_raw(Context& context, const ErrorOr<EnsureTable<CachedSerialization<V>>>& obj) {
auto cache = obj.present() ? obj.get().asUnderlyingType().getCache()
: ObjectWriter::toValue(ErrorOr<EnsureTable<V>>(obj.getError()),
AssumeVersion(currentProtocolVersion));
AssumeVersion(g_network->protocolVersion()));
uint8_t* out = context.allocate(cache.size());
memcpy(out, cache.begin(), cache.size());
return out;

View File

@ -20,6 +20,7 @@
#ifndef FLOW_OPENNETWORK_H
#define FLOW_OPENNETWORK_H
#include "flow/ProtocolVersion.h"
#pragma once
#include <array>
@ -532,6 +533,8 @@ public:
// If the network has not been run and this function has not been previously called, returns true. Otherwise, returns false.
virtual bool checkRunnable() = 0;
virtual ProtocolVersion protocolVersion() = 0;
// Shorthand for transport().getLocalAddress()
static NetworkAddress getLocalAddress()
{

View File

@ -129,6 +129,7 @@ if(WITH_PYTHON)
add_fdb_test(TEST_FILES fast/LowLatency.toml)
add_fdb_test(TEST_FILES fast/MemoryLifetime.toml)
add_fdb_test(TEST_FILES fast/MoveKeysCycle.toml)
add_fdb_test(TEST_FILES fast/ProtocolVersion.toml)
add_fdb_test(TEST_FILES fast/RandomSelector.toml)
add_fdb_test(TEST_FILES fast/RandomUnitTests.toml)
add_fdb_test(TEST_FILES fast/ReadHotDetectionCorrectness.toml IGNORE) # TODO re-enable once read hot detection is enabled.

View File

@ -0,0 +1,7 @@
startIncompatibleProcess = true
[[test]]
testTitle = 'ProtocolVersionTest'
[[test.workload]]
testName = 'ProtocolVersion'