diff --git a/fdbclient/MonitorLeader.actor.cpp b/fdbclient/MonitorLeader.actor.cpp index b066b03b13..e391978efd 100644 --- a/fdbclient/MonitorLeader.actor.cpp +++ b/fdbclient/MonitorLeader.actor.cpp @@ -246,10 +246,10 @@ TEST_CASE("/flow/FlatBuffers/LeaderInfo") { } in.serializedInfo = rndString; } - ObjectWriter writer; + ObjectWriter writer(Unversioned()); writer.serialize(in); Standalone copy = writer.toStringRef(); - ArenaObjectReader reader(copy.arena(), copy); + ArenaObjectReader reader(copy.arena(), copy, Unversioned()); reader.deserialize(out); ASSERT(in.forward == out.forward); ASSERT(in.changeID == out.changeID); @@ -268,10 +268,10 @@ TEST_CASE("/flow/FlatBuffers/LeaderInfo") { ErrorOr>> objIn(leaderInfo); ErrorOr>> objOut; Standalone copy; - ObjectWriter writer; + ObjectWriter writer(Unversioned()); writer.serialize(objIn); copy = writer.toStringRef(); - ArenaObjectReader reader(copy.arena(), copy); + ArenaObjectReader reader(copy.arena(), copy, Unversioned()); reader.deserialize(objOut); ASSERT(!objOut.isError()); diff --git a/fdbrpc/FlowTransport.actor.cpp b/fdbrpc/FlowTransport.actor.cpp index ba5655c71d..33c98c3601 100644 --- a/fdbrpc/FlowTransport.actor.cpp +++ b/fdbrpc/FlowTransport.actor.cpp @@ -657,7 +657,7 @@ ACTOR static void deliver(TransportData* self, Endpoint destination, ArenaReader if (g_network->useObjectSerializer()) { StringRef data = reader.arenaReadAll(); ASSERT(data.size() > 8); - ArenaObjectReader objReader(reader.arena(), reader.arenaReadAll()); + ArenaObjectReader objReader(reader.arena(), reader.arenaReadAll(), AssumeVersion(reader.protocolVersion())); receiver->receive(objReader); } else { receiver->receive(reader); @@ -1150,7 +1150,7 @@ static PacketID sendPacket( TransportData* self, ISerializeSource const& what, c Standalone copy; if (g_network->useObjectSerializer()) { - ObjectWriter wr; + ObjectWriter wr(AssumeVersion(currentProtocolVersion)); what.serializeObjectWriter(wr); copy = wr.toStringRef(); } else { diff --git a/fdbserver/LeaderElection.h b/fdbserver/LeaderElection.h index 8e90c53034..8bf2774d7c 100644 --- a/fdbserver/LeaderElection.h +++ b/fdbserver/LeaderElection.h @@ -61,7 +61,7 @@ Future tryBecomeLeader( ServerCoordinators const& coordinators, Reference> serializedInfo(new AsyncVar); Future m = tryBecomeLeaderInternal( coordinators, - g_network->useObjectSerializer() ? ObjectWriter::toValue(proposedInterface) : BinaryWriter::toValue(proposedInterface, IncludeVersion()), + g_network->useObjectSerializer() ? ObjectWriter::toValue(proposedInterface, IncludeVersion()) : BinaryWriter::toValue(proposedInterface, IncludeVersion()), serializedInfo, hasConnected, asyncPriorityInfo); return m || asyncDeserialize(serializedInfo, outKnownLeader, g_network->useObjectSerializer()); } diff --git a/flow/ObjectSerializer.h b/flow/ObjectSerializer.h index d0537d389b..e8222da0f7 100644 --- a/flow/ObjectSerializer.h +++ b/flow/ObjectSerializer.h @@ -22,6 +22,7 @@ #include "flow/Error.h" #include "flow/Arena.h" #include "flow/flat_buffers.h" +#include "flow/ProtocolVersion.h" template struct LoadContext { @@ -45,7 +46,12 @@ struct LoadContext { template class _ObjectReader { + ProtocolVersion mProtocolVersion; public: + + ProtocolVersion protocolVersion() const { return mProtocolVersion; } + void setProtocolVersion(ProtocolVersion v) { mProtocolVersion = v; } + template void deserialize(FileIdentifier file_identifier, Items&... items) { const uint8_t* data = static_cast(this)->data(); @@ -61,10 +67,20 @@ public: }; class ObjectReader : public _ObjectReader { + friend class _IncludeVersion; + ObjectReader& operator>> (ProtocolVersion& version) { + uint64_t result; + memcpy(&result, _data, sizeof(result)); + _data += sizeof(result); + return *this; + } public: static constexpr bool ownsUnderlyingMemory = false; - ObjectReader(const uint8_t* data) : _data(data) {} + template + ObjectReader(const uint8_t* data, VersionOptions vo) : _data(data) { + vo.read(*this); + } const uint8_t* data() { return _data; } @@ -76,10 +92,21 @@ private: }; class ArenaObjectReader : public _ObjectReader { + friend class _IncludeVersion; + ArenaObjectReader& operator>> (ProtocolVersion& version) { + uint64_t result; + memcpy(&result, _data, sizeof(result)); + _data += sizeof(result); + return *this; + } public: static constexpr bool ownsUnderlyingMemory = true; - ArenaObjectReader(Arena const& arena, const StringRef& input) : _data(input.begin()), _arena(arena) {} + template + ArenaObjectReader(Arena const& arena, const StringRef& input, VersionOptions vo) + : _data(input.begin()), _arena(arena) { + vo.read(*this); + } const uint8_t* data() { return _data; } @@ -91,25 +118,45 @@ private: }; class ObjectWriter { + friend class _IncludeVersion; + bool writeProtocolVersion = false; + ObjectWriter& operator<< (const ProtocolVersion& version) { + writeProtocolVersion = true; + return *this; + } + ProtocolVersion mProtocolVersion; public: - ObjectWriter() = default; - explicit ObjectWriter(std::function customAllocator) : customAllocator(customAllocator) {} + template + ObjectWriter(VersionOptions vo) { + vo.write(*this); + } + template + explicit ObjectWriter(std::function customAllocator, VersionOptions vo) + : customAllocator(customAllocator) { + vo.write(*this); + } template void serialize(FileIdentifier file_identifier, Items const&... items) { + int allocations = 0; + auto allocator = [this, &allocations](size_t size_) { + ++allocations; + size = size_; + auto toAllocate = writeProtocolVersion ? size + sizeof(uint64_t) : size; + if (customAllocator) { + data = customAllocator(toAllocate); + } else { + data = new (arena) uint8_t[toAllocate]; + } + if (writeProtocolVersion) { + auto v = protocolVersion().versionWithFlags(); + memcpy(data, &v, sizeof(uint64_t)); + return data + sizeof(uint64_t); + } + return data; + }; ASSERT(data == nullptr); // object serializer can only serialize one object - if (customAllocator) { - save_members(customAllocator, file_identifier, items...); - } else { - int allocations = 0; - auto allocator = [this, &allocations](size_t size_) { - ++allocations; - size = size_; - data = new (arena) uint8_t[size]; - return data; - }; - save_members(allocator, file_identifier, items...); - ASSERT(allocations == 1); - } + save_members(allocator, file_identifier, items...); + ASSERT(allocations == 1); } template @@ -126,13 +173,19 @@ public: return Standalone(toStringRef(), arena); } - template - static Standalone toValue(Item const& item) { - ObjectWriter writer; + template + static Standalone toValue(Item const& item, VersionOptions vo) { + ObjectWriter writer(vo); writer.serialize(item); return writer.toString(); } + ProtocolVersion protocolVersion() const { return mProtocolVersion; } + void setProtocolVersion(ProtocolVersion v) { + mProtocolVersion = v; + ASSERT(mProtocolVersion.isValid()); + } + private: Arena arena; std::function customAllocator; diff --git a/flow/flat_buffers.cpp b/flow/flat_buffers.cpp index 2b0e354df4..e1bb29de44 100644 --- a/flow/flat_buffers.cpp +++ b/flow/flat_buffers.cpp @@ -439,11 +439,11 @@ TEST_CASE("/flow/FlatBuffers/VectorRef") { for (const auto& str : src) { vec.push_back(arena, str); } - ObjectWriter writer; + ObjectWriter writer(Unversioned()); writer.serialize(FileIdentifierFor::value, arena, vec); serializedVector = StringRef(readerArena, writer.toStringRef()); } - ArenaObjectReader reader(readerArena, serializedVector); + ArenaObjectReader reader(readerArena, serializedVector, Unversioned()); reader.deserialize(FileIdentifierFor::value, vecArena, outVec); } ASSERT(src.size() == outVec.size()); @@ -461,8 +461,8 @@ TEST_CASE("/flow/FlatBuffers/Standalone") { auto str = deterministicRandom()->randomAlphaNumeric(deterministicRandom()->randomInt(0, 30)); vecIn.push_back(vecIn.arena(), StringRef(vecIn.arena(), str)); } - Standalone value = ObjectWriter::toValue(vecIn); - ArenaObjectReader reader(value.arena(), value); + Standalone value = ObjectWriter::toValue(vecIn, Unversioned()); + ArenaObjectReader reader(value.arena(), value, Unversioned()); VectorRef> vecOut; reader.deserialize(vecOut); ASSERT(vecOut.size() == vecIn.size()); diff --git a/flow/flow.cpp b/flow/flow.cpp index 722c8004f1..21e206b24c 100644 --- a/flow/flow.cpp +++ b/flow/flow.cpp @@ -253,10 +253,10 @@ TEST_CASE("/flow/FlatBuffers/ErrorOr") { { ErrorOr in(worker_removed()); ErrorOr out; - ObjectWriter writer; + ObjectWriter writer(Unversioned()); writer.serialize(in); Standalone copy = writer.toStringRef(); - ArenaObjectReader reader(copy.arena(), copy); + ArenaObjectReader reader(copy.arena(), copy, Unversioned()); reader.deserialize(out); ASSERT(out.isError()); ASSERT(out.getError().code() == in.getError().code()); @@ -264,10 +264,10 @@ TEST_CASE("/flow/FlatBuffers/ErrorOr") { { ErrorOr in(deterministicRandom()->randomUInt32()); ErrorOr out; - ObjectWriter writer; + ObjectWriter writer(Unversioned()); writer.serialize(in); Standalone copy = writer.toStringRef(); - ArenaObjectReader reader(copy.arena(), copy); + ArenaObjectReader reader(copy.arena(), copy, Unversioned()); reader.deserialize(out); ASSERT(!out.isError()); ASSERT(out.get() == in.get()); @@ -279,20 +279,20 @@ TEST_CASE("/flow/FlatBuffers/Optional") { { Optional in; Optional out; - ObjectWriter writer; + ObjectWriter writer(Unversioned()); writer.serialize(in); Standalone copy = writer.toStringRef(); - ArenaObjectReader reader(copy.arena(), copy); + ArenaObjectReader reader(copy.arena(), copy, Unversioned()); reader.deserialize(out); ASSERT(!out.present()); } { Optional in(deterministicRandom()->randomUInt32()); Optional out; - ObjectWriter writer; + ObjectWriter writer(Unversioned()); writer.serialize(in); Standalone copy = writer.toStringRef(); - ArenaObjectReader reader(copy.arena(), copy); + ArenaObjectReader reader(copy.arena(), copy, Unversioned()); reader.deserialize(out); ASSERT(out.present()); ASSERT(out.get() == in.get()); @@ -304,20 +304,20 @@ TEST_CASE("/flow/FlatBuffers/Standalone") { { Standalone in(std::string("foobar")); StringRef out; - ObjectWriter writer; + ObjectWriter writer(Unversioned()); writer.serialize(in); Standalone copy = writer.toStringRef(); - ArenaObjectReader reader(copy.arena(), copy); + ArenaObjectReader reader(copy.arena(), copy, Unversioned()); reader.deserialize(out); ASSERT(in == out); } { StringRef in = LiteralStringRef("foobar"); Standalone out; - ObjectWriter writer; + ObjectWriter writer(Unversioned()); writer.serialize(in); Standalone copy = writer.toStringRef(); - ArenaObjectReader reader(copy.arena(), copy); + ArenaObjectReader reader(copy.arena(), copy, Unversioned()); reader.deserialize(out); ASSERT(in == out); } diff --git a/flow/genericactors.actor.h b/flow/genericactors.actor.h index fdf02a30d2..591d286c55 100644 --- a/flow/genericactors.actor.h +++ b/flow/genericactors.actor.h @@ -740,7 +740,7 @@ ACTOR template Future asyncDeserialize( Referenceget().size()) { if (useObjSerializer) { - ObjectReader reader(input->get().begin()); + ObjectReader reader(input->get().begin(), IncludeVersion()); T res; reader.deserialize(res); output->set(res); diff --git a/flow/serialize.h b/flow/serialize.h index 5a204f77fa..47c3a0ab79 100644 --- a/flow/serialize.h +++ b/flow/serialize.h @@ -779,7 +779,7 @@ struct MakeSerializeSource : ISerializeSource { using value_type = V; virtual void serializePacketWriter(PacketWriter& w, bool useObjectSerializer) const { if (useObjectSerializer) { - ObjectWriter writer([&](size_t size) { return w.writeBytes(size); }); + ObjectWriter writer([&](size_t size) { return w.writeBytes(size); }, AssumeVersion(w.protocolVersion())); writer.serialize(get()); // Writes directly into buffer supplied by |w| } else { static_cast(this)->serialize(w);