diff --git a/fdbclient/NativeAPI.actor.cpp b/fdbclient/NativeAPI.actor.cpp index e934bdb80f..bfbefad3f0 100644 --- a/fdbclient/NativeAPI.actor.cpp +++ b/fdbclient/NativeAPI.actor.cpp @@ -1474,11 +1474,11 @@ ACTOR Future watchValue(Future version, Key key, Optional g_traceBatch.addAttach("WatchValueAttachID", info.debugID.get().first(), watchValueID.get().first()); g_traceBatch.addEvent("WatchValueDebug", watchValueID.get().first(), "NativeAPI.watchValue.Before"); //.detail("TaskID", g_network->getCurrentTask()); } - state Version resp; + state WatchValueReply resp; choose { - when(Version r = wait(loadBalance(ssi.second, &StorageServerInterface::watchValue, - WatchValueRequest(key, value, ver, watchValueID), - TaskPriority::DefaultPromiseEndpoint))) { + when(WatchValueReply r = wait(loadBalance(ssi.second, &StorageServerInterface::watchValue, + WatchValueRequest(key, value, ver, watchValueID), + TaskPriority::DefaultPromiseEndpoint))) { resp = r; } when(wait(cx->connectionFile ? cx->connectionFile->onChange() : Never())) { wait(Never()); } @@ -1489,12 +1489,13 @@ ACTOR Future watchValue(Future version, Key key, Optional //FIXME: wait for known committed version on the storage server before replying, //cannot do this until the storage server is notified on knownCommittedVersion changes from tlog (faster than the current update loop) - Version v = wait( waitForCommittedVersion( cx, resp ) ); + Version v = wait(waitForCommittedVersion(cx, resp.version)); - //TraceEvent("WatcherCommitted").detail("CommittedVersion", v).detail("WatchVersion", resp).detail("Key", key ).detail("Value", value); + //TraceEvent("WatcherCommitted").detail("CommittedVersion", v).detail("WatchVersion", resp.version).detail("Key", key ).detail("Value", value); - if( v - resp < 50000000 ) // False if there is a master failure between getting the response and getting the committed version, Dependent on SERVER_KNOBS->MAX_VERSIONS_IN_FLIGHT - return Void(); + // False if there is a master failure between getting the response and getting the committed version, + // Dependent on SERVER_KNOBS->MAX_VERSIONS_IN_FLIGHT + if (v - resp.version < 50000000) return Void(); ver = v; } catch (Error& e) { if (e.code() == error_code_wrong_shard_server || e.code() == error_code_all_alternatives_failed) { diff --git a/fdbclient/StorageServerInterface.h b/fdbclient/StorageServerInterface.h index fb93407143..c7447a3877 100644 --- a/fdbclient/StorageServerInterface.h +++ b/fdbclient/StorageServerInterface.h @@ -30,6 +30,20 @@ #include "flow/Stats.h" #include "fdbrpc/TimedRequest.h" +// Dead code, removed in the next protocol version +struct VersionReply { + constexpr static FileIdentifier file_identifier = 3; + + Version version; + VersionReply() = default; + explicit VersionReply(Version version) : version(version) {} + + template + void serialize(Ar& ar) { + serializer(ar, version); + } +}; + struct StorageServerInterface { constexpr static FileIdentifier file_identifier = 15302073; enum { BUSY_ALLOWED = 0, BUSY_FORCE = 1, BUSY_LOCAL = 2 }; @@ -40,7 +54,7 @@ struct StorageServerInterface { LocalityData locality; UID uniqueID; - RequestStream> getVersion; + RequestStream> getVersion; RequestStream getValue; RequestStream getKey; @@ -140,14 +154,27 @@ struct GetValueRequest : TimedRequest { } }; +struct WatchValueReply { + constexpr static FileIdentifier file_identifier = 3; + + Version version; + WatchValueReply() = default; + explicit WatchValueReply(Version version) : version(version) {} + + template + void serialize(Ar& ar) { + serializer(ar, version); + } +}; + struct WatchValueRequest { constexpr static FileIdentifier file_identifier = 14747733; Key key; Optional value; Version version; Optional debugID; - ReplyPromise< Version > reply; - + ReplyPromise reply; + WatchValueRequest(){} WatchValueRequest(const Key& key, Optional value, Version ver, Optional debugID) : key(key), value(value), version(ver), debugID(debugID) {} @@ -219,6 +246,20 @@ struct GetKeyRequest : TimedRequest { } }; +struct GetShardStateReply { + constexpr static FileIdentifier file_identifier = 0; + + Version first; + Version second; + GetShardStateReply() = default; + GetShardStateReply(Version first, Version second) : first(first), second(second) {} + + template + void serialize(Ar& ar) { + serializer(ar, first, second); + } +}; + struct GetShardStateRequest { constexpr static FileIdentifier file_identifier = 15860168; enum waitMode { @@ -229,7 +270,7 @@ struct GetShardStateRequest { KeyRange keys; int32_t mode; - ReplyPromise< std::pair > reply; + ReplyPromise reply; GetShardStateRequest() {} GetShardStateRequest( KeyRange const& keys, waitMode mode ) : keys(keys), mode(mode) {} diff --git a/fdbrpc/FlowTests.actor.cpp b/fdbrpc/FlowTests.actor.cpp index ff38945632..d14903b233 100644 --- a/fdbrpc/FlowTests.actor.cpp +++ b/fdbrpc/FlowTests.actor.cpp @@ -268,6 +268,20 @@ TEST_CASE("/flow/flow/cancel2") return Void(); } +namespace { +// Simple message for flatbuffers unittests +struct Int { + constexpr static FileIdentifier file_identifier = 12345; + uint32_t value; + Int() = default; + Int(uint32_t value) : value(value) {} + template + void serialize(Ar& ar) { + serializer(ar, value); + } +}; +} // namespace + TEST_CASE("/flow/flow/nonserializable futures") { // Types no longer need to be statically serializable to make futures, promises, actors @@ -283,20 +297,20 @@ TEST_CASE("/flow/flow/nonserializable futures") // ReplyPromise can be used like a normal promise { - ReplyPromise rpInt; - Future f = rpInt.getFuture(); + ReplyPromise rpInt; + Future f = rpInt.getFuture(); ASSERT(!f.isReady()); rpInt.send(123); - ASSERT(f.get() == 123); + ASSERT(f.get().value == 123); } { - RequestStream rsInt; - FutureStream f = rsInt.getFuture(); + RequestStream rsInt; + FutureStream f = rsInt.getFuture(); rsInt.send(1); rsInt.send(2); - ASSERT(f.pop() == 1); - ASSERT(f.pop() == 2); + ASSERT(f.pop().value == 1); + ASSERT(f.pop().value == 2); } return Void(); @@ -306,14 +320,14 @@ TEST_CASE("/flow/flow/networked futures") { // RequestStream can be serialized { - RequestStream locInt; + RequestStream locInt; BinaryWriter wr(IncludeVersion()); wr << locInt; ASSERT(locInt.getEndpoint().isValid() && locInt.getEndpoint().isLocal() && locInt.getEndpoint().getPrimaryAddress() == FlowTransport::transport().getLocalAddress()); BinaryReader rd(wr.toValue(), IncludeVersion()); - RequestStream remoteInt; + RequestStream remoteInt; rd >> remoteInt; ASSERT(remoteInt.getEndpoint() == locInt.getEndpoint()); @@ -323,14 +337,14 @@ TEST_CASE("/flow/flow/networked futures") // ReplyPromise can be serialized // TODO: This needs to fiddle with g_currentDeliveryPeerAddress if (0) { - ReplyPromise locInt; + ReplyPromise locInt; BinaryWriter wr(IncludeVersion()); wr << locInt; ASSERT(locInt.getEndpoint().isValid() && locInt.getEndpoint().isLocal()); BinaryReader rd(wr.toValue(), IncludeVersion()); - ReplyPromise remoteInt; + ReplyPromise remoteInt; rd >> remoteInt; ASSERT(remoteInt.getEndpoint() == locInt.getEndpoint()); diff --git a/fdbserver/Coordination.actor.cpp b/fdbserver/Coordination.actor.cpp index 5a46283a5e..b88f9879bb 100644 --- a/fdbserver/Coordination.actor.cpp +++ b/fdbserver/Coordination.actor.cpp @@ -302,7 +302,8 @@ ACTOR Future leaderRegister(LeaderElectionRegInterface interf, Key key) { //TODO: use notify to only send a heartbeat once per interval availableLeaders.erase( LeaderInfo(req.prevChangeID) ); availableLeaders.insert( req.myInfo ); - req.reply.send( currentNominee.present() && currentNominee.get().equalInternalId(req.myInfo) ); + req.reply.send( + LeaderHeartbeatReply{ currentNominee.present() && currentNominee.get().equalInternalId(req.myInfo) }); } when (ForwardRequest req = waitNext( interf.forward.getFuture() ) ) { LeaderInfo newInfo; @@ -499,7 +500,7 @@ ACTOR Future leaderServer(LeaderElectionRegInterface interf, OnDemandStore when ( LeaderHeartbeatRequest req = waitNext( interf.leaderHeartbeat.getFuture() ) ) { Optional forward = regs.getForward(req.key); if( forward.present() ) - req.reply.send( false ); + req.reply.send(LeaderHeartbeatReply{ false }); else regs.getInterface(req.key, id).leaderHeartbeat.send(req); } diff --git a/fdbserver/CoordinationInterface.h b/fdbserver/CoordinationInterface.h index b943618ab3..7e77cdbf0e 100644 --- a/fdbserver/CoordinationInterface.h +++ b/fdbserver/CoordinationInterface.h @@ -136,12 +136,29 @@ struct CandidacyRequest { } }; +struct LeaderHeartbeatReply { + constexpr static FileIdentifier file_identifier = 11; + + bool value = false; + LeaderHeartbeatReply() = default; + explicit LeaderHeartbeatReply(bool value) : value(value) {} + + template + void serialize(Ar& ar) { + serializer(ar, value); + } +}; + +inline bool operator==(const LeaderHeartbeatReply& lhs, const LeaderHeartbeatReply& rhs) { + return lhs.value == rhs.value; +} + struct LeaderHeartbeatRequest { constexpr static FileIdentifier file_identifier = 9495992; Key key; LeaderInfo myInfo; UID prevChangeID; - ReplyPromise reply; + ReplyPromise reply; LeaderHeartbeatRequest() {} explicit LeaderHeartbeatRequest( Key key, LeaderInfo const& myInfo, UID prevChangeID ) : key(key), myInfo(myInfo), prevChangeID(prevChangeID) {} diff --git a/fdbserver/LeaderElection.actor.cpp b/fdbserver/LeaderElection.actor.cpp index 5a97b6358f..be23f7da8e 100644 --- a/fdbserver/LeaderElection.actor.cpp +++ b/fdbserver/LeaderElection.actor.cpp @@ -183,9 +183,11 @@ ACTOR Future tryBecomeLeaderInternal(ServerCoordinators coordinators, Valu state vector> true_heartbeats; state vector> false_heartbeats; for(int i=0; i hb = retryBrokenPromise( coordinators.leaderElectionServers[i].leaderHeartbeat, LeaderHeartbeatRequest( coordinators.clusterKey, myInfo, prevChangeID ), TaskPriority::CoordinationReply ); - true_heartbeats.push_back( onEqual(hb, true) ); - false_heartbeats.push_back( onEqual(hb, false) ); + Future hb = retryBrokenPromise( + coordinators.leaderElectionServers[i].leaderHeartbeat, + LeaderHeartbeatRequest(coordinators.clusterKey, myInfo, prevChangeID), TaskPriority::CoordinationReply); + true_heartbeats.push_back(onEqual(hb, LeaderHeartbeatReply{ true })); + false_heartbeats.push_back(onEqual(hb, LeaderHeartbeatReply{ false })); } state Future rate = delay( SERVER_KNOBS->HEARTBEAT_FREQUENCY, TaskPriority::CoordinationReply ) || asyncPriorityInfo->onChange(); // SOMEDAY: Move to server side? diff --git a/fdbserver/MasterInterface.h b/fdbserver/MasterInterface.h index 6cab65cbe6..534ce01610 100644 --- a/fdbserver/MasterInterface.h +++ b/fdbserver/MasterInterface.h @@ -55,10 +55,25 @@ struct MasterInterface { } }; +struct TLogRejoinReply { + constexpr static FileIdentifier file_identifier = 11; + + // false means someone else registered, so we should re-register. true means this master is recovered, so don't + // send again to the same master. + bool masterIsRecovered; + TLogRejoinReply() = default; + explicit TLogRejoinReply(bool masterIsRecovered) : masterIsRecovered(masterIsRecovered) {} + + template + void serialize(Ar& ar) { + serializer(ar, masterIsRecovered); + } +}; + struct TLogRejoinRequest { constexpr static FileIdentifier file_identifier = 15692200; TLogInterface myInterface; - ReplyPromise reply; // false means someone else registered, so we should re-register. true means this master is recovered, so don't send again to the same master. + ReplyPromise reply; TLogRejoinRequest() { } explicit TLogRejoinRequest(const TLogInterface &interf) : myInterface(interf) { } diff --git a/fdbserver/MoveKeys.actor.cpp b/fdbserver/MoveKeys.actor.cpp index d95f14a19d..7f4c73e266 100644 --- a/fdbserver/MoveKeys.actor.cpp +++ b/fdbserver/MoveKeys.actor.cpp @@ -139,8 +139,8 @@ Future checkMoveKeysLockReadOnly( Transaction* tr, MoveKeysLock lock ) { return checkMoveKeysLock(tr, lock, false); } -ACTOR Future> checkReadWrite( Future< ErrorOr> > fReply, UID uid, Version version ) { - ErrorOr> reply = wait( fReply ); +ACTOR Future> checkReadWrite(Future> fReply, UID uid, Version version) { + ErrorOr reply = wait(fReply); if (!reply.present() || reply.get().first < version) return Optional(); return Optional(uid); @@ -443,7 +443,8 @@ ACTOR Future startMoveKeys( Database occ, KeyRange keys, vector serve ACTOR Future waitForShardReady( StorageServerInterface server, KeyRange keys, Version minVersion, GetShardStateRequest::waitMode mode ) { loop { try { - std::pair rep = wait( server.getShardState.getReply( GetShardStateRequest(keys, mode), TaskPriority::MoveKeys ) ); + GetShardStateReply rep = + wait(server.getShardState.getReply(GetShardStateRequest(keys, mode), TaskPriority::MoveKeys)); if (rep.first >= minVersion) { return Void(); } diff --git a/fdbserver/OldTLogServer_4_6.actor.cpp b/fdbserver/OldTLogServer_4_6.actor.cpp index c8e246dc1e..c07f820f3e 100644 --- a/fdbserver/OldTLogServer_4_6.actor.cpp +++ b/fdbserver/OldTLogServer_4_6.actor.cpp @@ -1119,11 +1119,11 @@ namespace oldTLog_4_6 { req.myInterface = tli; TraceEvent("TLogRejoining", self->dbgid).detail("Master", self->dbInfo->get().master.id()); choose { - when ( bool success = wait( brokenPromiseToNever( self->dbInfo->get().master.tlogRejoin.getReply( req ) ) ) ) { - if (success) - lastMasterID = self->dbInfo->get().master.id(); - } - when ( wait( self->dbInfo->onChange() ) ) { } + when(TLogRejoinReply rep = + wait(brokenPromiseToNever(self->dbInfo->get().master.tlogRejoin.getReply(req)))) { + if (rep.masterIsRecovered) lastMasterID = self->dbInfo->get().master.id(); + } + when ( wait( self->dbInfo->onChange() ) ) { } } } else { wait( self->dbInfo->onChange() ); diff --git a/fdbserver/OldTLogServer_6_0.actor.cpp b/fdbserver/OldTLogServer_6_0.actor.cpp index 10f191b937..70dd8e0453 100644 --- a/fdbserver/OldTLogServer_6_0.actor.cpp +++ b/fdbserver/OldTLogServer_6_0.actor.cpp @@ -1477,9 +1477,9 @@ ACTOR Future rejoinMasters( TLogData* self, TLogInterface tli, DBRecoveryC TLogRejoinRequest req(tli); TraceEvent("TLogRejoining", self->dbgid).detail("Master", self->dbInfo->get().master.id()); choose { - when ( bool success = wait( brokenPromiseToNever( self->dbInfo->get().master.tlogRejoin.getReply( req ) ) ) ) { - if (success) - lastMasterID = self->dbInfo->get().master.id(); + when(TLogRejoinReply rep = + wait(brokenPromiseToNever(self->dbInfo->get().master.tlogRejoin.getReply(req)))) { + if (rep.masterIsRecovered) lastMasterID = self->dbInfo->get().master.id(); } when ( wait( self->dbInfo->onChange() ) ) { } } diff --git a/fdbserver/ResolverInterface.h b/fdbserver/ResolverInterface.h index 65b46a5941..029bde6475 100644 --- a/fdbserver/ResolverInterface.h +++ b/fdbserver/ResolverInterface.h @@ -103,9 +103,22 @@ struct ResolveTransactionBatchRequest { } }; +struct ResolutionMetricsReply { + constexpr static FileIdentifier file_identifier = 3; + + int64_t value; + ResolutionMetricsReply() = default; + explicit ResolutionMetricsReply(int64_t value) : value(value) {} + + template + void serialize(Ar& ar) { + serializer(ar, value); + } +}; + struct ResolutionMetricsRequest { constexpr static FileIdentifier file_identifier = 11663527; - ReplyPromise reply; + ReplyPromise reply; template void serialize(Archive& ar) { diff --git a/fdbserver/TLogInterface.h b/fdbserver/TLogInterface.h index b83ef75a45..bf54f4c3fd 100644 --- a/fdbserver/TLogInterface.h +++ b/fdbserver/TLogInterface.h @@ -216,6 +216,19 @@ struct TagMessagesRef { } }; +struct TLogCommitReply { + constexpr static FileIdentifier file_identifier = 3; + + Version version; + TLogCommitReply() = default; + explicit TLogCommitReply(Version version) : version(version) {} + + template + void serialize(Ar& ar) { + serializer(ar, version); + } +}; + struct TLogCommitRequest { constexpr static FileIdentifier file_identifier = 4022206; Arena arena; @@ -223,7 +236,7 @@ struct TLogCommitRequest { StringRef messages;// Each message prefixed by a 4-byte length - ReplyPromise reply; + ReplyPromise reply; Optional debugID; TLogCommitRequest() {} diff --git a/fdbserver/TLogServer.actor.cpp b/fdbserver/TLogServer.actor.cpp index 95d51267c5..9d52a45304 100644 --- a/fdbserver/TLogServer.actor.cpp +++ b/fdbserver/TLogServer.actor.cpp @@ -1853,9 +1853,9 @@ ACTOR Future rejoinMasters( TLogData* self, TLogInterface tli, DBRecoveryC TLogRejoinRequest req(tli); TraceEvent("TLogRejoining", self->dbgid).detail("Master", self->dbInfo->get().master.id()); choose { - when ( bool success = wait( brokenPromiseToNever( self->dbInfo->get().master.tlogRejoin.getReply( req ) ) ) ) { - if (success) - lastMasterID = self->dbInfo->get().master.id(); + when(TLogRejoinReply rep = + wait(brokenPromiseToNever(self->dbInfo->get().master.tlogRejoin.getReply(req)))) { + if (rep.masterIsRecovered) lastMasterID = self->dbInfo->get().master.id(); } when ( wait( self->dbInfo->onChange() ) ) { } } diff --git a/fdbserver/TagPartitionedLogSystem.actor.cpp b/fdbserver/TagPartitionedLogSystem.actor.cpp index 2065166b27..9aa91105e8 100644 --- a/fdbserver/TagPartitionedLogSystem.actor.cpp +++ b/fdbserver/TagPartitionedLogSystem.actor.cpp @@ -30,12 +30,12 @@ #include "fdbserver/RecoveryState.h" #include "flow/actorcompiler.h" // This must be the last #include. -ACTOR Future minVersionWhenReady( Future f, std::vector> replies) { +ACTOR Future minVersionWhenReady(Future f, std::vector> replies) { wait(f); Version minVersion = std::numeric_limits::max(); for(auto& reply : replies) { if(reply.isReady() && !reply.isError()) { - minVersion = std::min(minVersion, reply.get()); + minVersion = std::min(minVersion, reply.get().version); } } return minVersion; @@ -429,7 +429,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted push( Version prevVersion, Version version, Version knownCommittedVersion, Version minKnownCommittedVersion, LogPushData& data, Optional debugID ) { // FIXME: Randomize request order as in LegacyLogSystem? vector> quorumResults; - vector> allReplies; + vector> allReplies; int location = 0; for(auto& it : tLogs) { if(it->isLocal && it->logServers.size()) { @@ -2271,7 +2271,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted trackRejoins( UID dbgid, std::vector>>> logServers, FutureStream< struct TLogRejoinRequest > rejoinRequests ) { - state std::map> lastReply; + state std::map> lastReply; try { loop { @@ -2287,7 +2287,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCountedget().present() || req.myInterface.commit.getEndpoint() != logServers[pos]->get().interf().commit.getEndpoint()) logServers[pos]->setUnconditional( OptionalInterface(req.myInterface) ); - lastReply[req.myInterface.id()].send(false); + lastReply[req.myInterface.id()].send(TLogRejoinReply{ false }); lastReply[req.myInterface.id()] = req.reply; } else { @@ -2296,8 +2296,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCountedsecond.send(true); + for (auto it = lastReply.begin(); it != lastReply.end(); ++it) it->second.send(TLogRejoinReply{ true }); throw; } } diff --git a/fdbserver/TesterInterface.actor.h b/fdbserver/TesterInterface.actor.h index d5b02ef76e..f51422b4b6 100644 --- a/fdbserver/TesterInterface.actor.h +++ b/fdbserver/TesterInterface.actor.h @@ -31,12 +31,22 @@ #include "fdbrpc/PerfMetric.h" #include "fdbclient/NativeAPI.actor.h" #include "flow/actorcompiler.h" // has to be last include +struct CheckReply { + constexpr static FileIdentifier file_identifier = 11; + + bool value = false; + + template + void serialize(Ar& ar) { + serializer(ar, value); + } +}; struct WorkloadInterface { constexpr static FileIdentifier file_identifier = 4454551; RequestStream> setup; RequestStream> start; - RequestStream> check; + RequestStream> check; RequestStream > > metrics; RequestStream> stop; diff --git a/fdbserver/masterserver.actor.cpp b/fdbserver/masterserver.actor.cpp index 3c478d241c..5ca2830f58 100644 --- a/fdbserver/masterserver.actor.cpp +++ b/fdbserver/masterserver.actor.cpp @@ -1018,7 +1018,7 @@ ACTOR Future resolutionBalancing(Reference self) { wait(delay(SERVER_KNOBS->MIN_BALANCE_TIME, TaskPriority::ResolutionMetrics)); while(self->resolverChanges.get().size()) wait(self->resolverChanges.onChange()); - state std::vector> futures; + state std::vector> futures; for (auto& p : self->resolvers) futures.push_back(brokenPromiseToNever(p.metrics.getReply(ResolutionMetricsRequest(), TaskPriority::ResolutionMetrics))); wait( waitForAll(futures) ); @@ -1026,8 +1026,8 @@ ACTOR Future resolutionBalancing(Reference self) { int64_t total = 0; for (int i = 0; i < futures.size(); i++) { - total += futures[i].get(); - metrics.insert(std::make_pair(futures[i].get(), i), NoMetric()); + total += futures[i].get().value; + metrics.insert(std::make_pair(futures[i].get().value, i), NoMetric()); //TraceEvent("ResolverMetric").detail("I", i).detail("Metric", futures[i].get()); } if( metrics.lastItem()->first - metrics.begin()->first > SERVER_KNOBS->MIN_BALANCE_DIFFERENCE ) { diff --git a/fdbserver/storageserver.actor.cpp b/fdbserver/storageserver.actor.cpp index ea910f2396..0156197e30 100644 --- a/fdbserver/storageserver.actor.cpp +++ b/fdbserver/storageserver.actor.cpp @@ -934,7 +934,7 @@ ACTOR Future watchValue_impl( StorageServer* data, WatchValueRequest req ) g_traceBatch.addEvent("WatchValueDebug", req.debugID.get().first(), "watchValueQ.AfterRead"); //.detail("TaskID", g_network->getCurrentTask()); if( reply.value != req.value ) { - req.reply.send( latest ); + req.reply.send(WatchValueReply{ latest }); return Void(); } @@ -1012,7 +1012,7 @@ ACTOR Future getShardState_impl( StorageServer* data, GetShardStateRequest } if( !onChange.size() ) { - req.reply.send(std::make_pair(data->version.get(), data->durableVersion.get())); + req.reply.send(GetShardStateReply{ data->version.get(), data->durableVersion.get() }); return Void(); } @@ -3533,7 +3533,7 @@ ACTOR Future storageServerCore( StorageServer* self, StorageServerInterfac when (GetShardStateRequest req = waitNext(ssi.getShardState.getFuture()) ) { if (req.mode == GetShardStateRequest::NO_WAIT ) { if( self->isReadable( req.keys ) ) - req.reply.send(std::make_pair(self->version.get(),self->durableVersion.get())); + req.reply.send(GetShardStateReply{ self->version.get(), self->durableVersion.get() }); else req.reply.sendError(wrong_shard_server()); } else { @@ -3543,7 +3543,7 @@ ACTOR Future storageServerCore( StorageServer* self, StorageServerInterfac when (StorageQueuingMetricsRequest req = waitNext(ssi.getQueuingMetrics.getFuture())) { getQueuingMetrics(self, req); } - when( ReplyPromise reply = waitNext(ssi.getVersion.getFuture()) ) { + when(ReplyPromise reply = waitNext(ssi.getVersion.getFuture())) { reply.send( self->version.get() ); } when( ReplyPromise reply = waitNext(ssi.getKeyValueStoreType.getFuture()) ) { diff --git a/fdbserver/tester.actor.cpp b/fdbserver/tester.actor.cpp index 43f9a01cb9..9c7639aa46 100644 --- a/fdbserver/tester.actor.cpp +++ b/fdbserver/tester.actor.cpp @@ -404,10 +404,10 @@ ACTOR Future runWorkloadAsync( Database cx, WorkloadInterface workIface, T state unique_ptr delw(workload); state Optional> setupResult; state Optional> startResult; - state Optional> checkResult; + state Optional> checkResult; state ReplyPromise setupReq; state ReplyPromise startReq; - state ReplyPromise checkReq; + state ReplyPromise checkReq; TraceEvent("TestBeginAsync", workIface.id()).detail("Workload", workload->description()).detail("DatabasePingDelay", databasePingDelay); @@ -452,12 +452,12 @@ ACTOR Future runWorkloadAsync( Database cx, WorkloadInterface workIface, T } sendResult( startReq, startResult ); } - when( ReplyPromise req = waitNext( workIface.check.getFuture() ) ) { + when(ReplyPromise req = waitNext(workIface.check.getFuture())) { checkReq = req; if (!checkResult.present()) { try { bool check = wait( timeoutError( workload->check(cx), workload->getCheckTimeout() ) ); - checkResult = (!startResult.present() || !startResult.get().isError()) && check; + checkResult = CheckReply{ (!startResult.present() || !startResult.get().isError()) && check }; } catch (Error& e) { checkResult = operation_failed(); // was: checkResult = false; if( e.code() == error_code_please_reboot || e.code() == error_code_please_reboot_delete) throw; @@ -693,16 +693,16 @@ ACTOR Future runWorkload( Database cx, std::vector< Test wait( delay(3.0) ); } - state std::vector< Future> > checks; + state std::vector>> checks; TraceEvent("CheckingResults"); printf("checking test (%s)...\n", printable(spec.title).c_str()); for(int i= 0; i < workloads.size(); i++) - checks.push_back( workloads[i].check.template getReplyUnlessFailedFor(waitForFailureTime, 0) ); + checks.push_back(workloads[i].check.template getReplyUnlessFailedFor(waitForFailureTime, 0)); wait( waitForAll( checks ) ); throwIfError(checks, "CheckFailedForWorkload" + printable(spec.title)); for(int i = 0; i < checks.size(); i++) { - if(checks[i].get().get()) + if (checks[i].get().get().value) success++; else failure++; diff --git a/flow/FileIdentifier.h b/flow/FileIdentifier.h index 1ae1d59374..15dde95b11 100644 --- a/flow/FileIdentifier.h +++ b/flow/FileIdentifier.h @@ -72,68 +72,3 @@ template struct ComposedIdentifierExternal { static constexpr FileIdentifier value = ComposedIdentifier::file_identifier; }; - -template <> -struct FileIdentifierFor { - constexpr static FileIdentifier value = 1; -}; - -template <> -struct FileIdentifierFor { - constexpr static FileIdentifier value = 2; -}; - -template <> -struct FileIdentifierFor { - constexpr static FileIdentifier value = 3; -}; - -template <> -struct FileIdentifierFor { - constexpr static FileIdentifier value = 4; -}; - -template <> -struct FileIdentifierFor { - constexpr static FileIdentifier value = 5; -}; - -template <> -struct FileIdentifierFor { - constexpr static FileIdentifier value = 6; -}; - -template <> -struct FileIdentifierFor { - constexpr static FileIdentifier value = 7; -}; - -template <> -struct FileIdentifierFor { - constexpr static FileIdentifier value = 8; -}; - -template <> -struct FileIdentifierFor { - constexpr static FileIdentifier value = 9; -}; - -template <> -struct FileIdentifierFor { - constexpr static FileIdentifier value = 10; -}; - -template <> -struct FileIdentifierFor { - constexpr static FileIdentifier value = 11; -}; - -template <> -struct FileIdentifierFor { - constexpr static FileIdentifier value = 7266212; -}; - -template <> -struct FileIdentifierFor { - constexpr static FileIdentifier value = 9348150; -}; diff --git a/flow/flat_buffers.h b/flow/flat_buffers.h index 27e6f37980..54b3368916 100644 --- a/flow/flat_buffers.h +++ b/flow/flat_buffers.h @@ -1140,12 +1140,19 @@ inline FileIdentifier read_file_identifier(const uint8_t* in) { return result; } +namespace detail { +template +struct YesFileIdentifier { + constexpr static FileIdentifier file_identifier = FileIdentifierFor::value; +}; +struct NoFileIdentifier {}; +}; // namespace detail + // members of unions must be tables in flatbuffers, so you can use this to // introduce the indirection only when necessary. template -struct EnsureTable { - static_assert(HasFileIdentifier::value); - constexpr static FileIdentifier file_identifier = FileIdentifierFor::value; +struct EnsureTable + : std::conditional_t::value, detail::YesFileIdentifier, detail::NoFileIdentifier> { EnsureTable() = default; EnsureTable(const T& t) : t(t) {} template diff --git a/flow/flow.cpp b/flow/flow.cpp index 21e206b24c..66feb0d126 100644 --- a/flow/flow.cpp +++ b/flow/flow.cpp @@ -249,10 +249,24 @@ void enableBuggify(bool enabled, BuggifyType type) { buggifyActivated[int(type)] = enabled; } +namespace { +// Simple message for flatbuffers unittests +struct Int { + constexpr static FileIdentifier file_identifier = 12345; + uint32_t value; + Int() = default; + Int(uint32_t value) : value(value) {} + template + void serialize(Ar& ar) { + serializer(ar, value); + } +}; +} // namespace + TEST_CASE("/flow/FlatBuffers/ErrorOr") { { - ErrorOr in(worker_removed()); - ErrorOr out; + ErrorOr in(worker_removed()); + ErrorOr out; ObjectWriter writer(Unversioned()); writer.serialize(in); Standalone copy = writer.toStringRef(); @@ -262,23 +276,23 @@ TEST_CASE("/flow/FlatBuffers/ErrorOr") { ASSERT(out.getError().code() == in.getError().code()); } { - ErrorOr in(deterministicRandom()->randomUInt32()); - ErrorOr out; + ErrorOr in(deterministicRandom()->randomUInt32()); + ErrorOr out; ObjectWriter writer(Unversioned()); writer.serialize(in); Standalone copy = writer.toStringRef(); ArenaObjectReader reader(copy.arena(), copy, Unversioned()); reader.deserialize(out); ASSERT(!out.isError()); - ASSERT(out.get() == in.get()); + ASSERT(out.get().value == in.get().value); } return Void(); } TEST_CASE("/flow/FlatBuffers/Optional") { { - Optional in; - Optional out; + Optional in; + Optional out; ObjectWriter writer(Unversioned()); writer.serialize(in); Standalone copy = writer.toStringRef(); @@ -287,15 +301,15 @@ TEST_CASE("/flow/FlatBuffers/Optional") { ASSERT(!out.present()); } { - Optional in(deterministicRandom()->randomUInt32()); - Optional out; + Optional in(deterministicRandom()->randomUInt32()); + Optional out; ObjectWriter writer(Unversioned()); writer.serialize(in); Standalone copy = writer.toStringRef(); ArenaObjectReader reader(copy.arena(), copy, Unversioned()); reader.deserialize(out); ASSERT(out.present()); - ASSERT(out.get() == in.get()); + ASSERT(out.get().value == in.get().value); } return Void(); }