foundationdb/flow/serialize.h

854 lines
24 KiB
C++

/*
* serialize.h
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef FLOW_SERIALIZE_H
#define FLOW_SERIALIZE_H
#pragma once
#include <stdint.h>
#include <array>
#include <set>
#include "flow/ProtocolVersion.h"
#include "flow/Error.h"
#include "flow/Arena.h"
#include "flow/FileIdentifier.h"
#include "flow/ObjectSerializer.h"
#include "flow/network.h"
#include <algorithm>
#include <deque>
// Though similar, is_binary_serializable cannot be replaced by std::is_pod, as doing so would prefer
// memcpy over a defined serialize() method on a POD struct. As not all of our structs are packed,
// this would both inflate message sizes by transmitting padding, and mean that we're transmitting
// undefined bytes over the wire.
// A more intelligent SFINAE that does "binarySerialize if POD and no serialize() is defined" could
// replace the usage of is_binary_serializable.
template <class T>
struct is_binary_serializable : std::false_type {};
#define BINARY_SERIALIZABLE(T) \
template <> \
struct is_binary_serializable<T> : std::true_type {};
BINARY_SERIALIZABLE( int8_t );
BINARY_SERIALIZABLE( uint8_t );
BINARY_SERIALIZABLE( int16_t );
BINARY_SERIALIZABLE( uint16_t );
BINARY_SERIALIZABLE( int32_t );
BINARY_SERIALIZABLE( uint32_t );
BINARY_SERIALIZABLE( int64_t );
BINARY_SERIALIZABLE( uint64_t );
BINARY_SERIALIZABLE( bool );
BINARY_SERIALIZABLE( double );
BINARY_SERIALIZABLE( ProtocolVersion );
template<>
struct scalar_traits<ProtocolVersion> : std::true_type {
constexpr static size_t size = sizeof(uint64_t);
static void save(uint8_t* out, const ProtocolVersion& v) {
*reinterpret_cast<uint64_t*>(out) = v.versionWithFlags();
}
template <class Context>
static void load(const uint8_t* i, ProtocolVersion& out, Context& context) {
const uint64_t* in = reinterpret_cast<const uint64_t*>(i);
out = ProtocolVersion(*in);
}
};
template <class Archive, class Item>
inline typename Archive::WRITER& operator << (Archive& ar, const Item& item ) {
save(ar, item);
return ar;
}
template <class Archive, class Item>
inline typename Archive::READER& operator >> (Archive& ar, Item& item ) {
ar.deserialize(item);
return ar;
}
template <class Archive, class Item, class... Items>
typename Archive::WRITER& serializer(Archive& ar, const Item& item, const Items&... items) {
save(ar, item);
if constexpr (sizeof...(Items) > 0) {
serializer(ar, items...);
}
return ar;
}
template <class Archive, class Item, class... Items>
typename Archive::READER& serializer(Archive& ar, Item& item, Items&... items) {
load(ar, item);
if constexpr (sizeof...(Items) > 0) {
serializer(ar, items...);
}
return ar;
}
template <class Archive, class T, class Enable = void>
class Serializer {
public:
static void serialize( Archive& ar, T& t ) {
t.serialize(ar);
ASSERT( ar.protocolVersion().isValid() );
}
};
template <class Ar, class T>
inline void save( Ar& ar, const T& value ) {
Serializer<Ar,T>::serialize(ar, const_cast<T&>(value));
}
template <class Ar, class T>
inline void load( Ar& ar, T& value ) {
Serializer<Ar,T>::serialize(ar, value);
}
template <class CharT, class Traits, class Allocator>
struct FileIdentifierFor<std::basic_string<CharT, Traits, Allocator>> {
constexpr static FileIdentifier value = 15694229;
};
template <class Archive>
inline void load( Archive& ar, std::string& value ) {
int32_t length;
ar >> length;
value.resize(length);
ar.serializeBytes( &value[0], (int)value.length() );
ASSERT( ar.protocolVersion().isValid() );
}
template <class Archive>
inline void save( Archive& ar, const std::string& value ) {
ar << (int32_t)value.length();
ar.serializeBytes( (void*)&value[0], (int)value.length() );
ASSERT( ar.protocolVersion().isValid() );
}
template <class Archive, class T>
class Serializer<Archive, T, typename std::enable_if_t<is_binary_serializable<T>::value>> {
public:
static void serialize( Archive& ar, T& t ) {
ar.serializeBinaryItem(t);
}
};
template <class Archive, class T>
class Serializer<Archive, T, typename std::enable_if_t<std::is_enum_v<T>>> {
public:
static void serialize(Archive& ar, T& t) {
static_assert(is_binary_serializable<std::underlying_type_t<T>>::value);
ar.serializeBinaryItem(reinterpret_cast<std::underlying_type_t<T>&>(t));
}
};
template <class Archive, class T1, class T2>
class Serializer< Archive, std::pair<T1,T2>, void > {
public:
static void serialize( Archive& ar, std::pair<T1, T2>& p ) {
serializer(ar, p.first, p.second);
}
};
template <class T, class Allocator>
struct FileIdentifierFor<std::vector<T, Allocator>> : ComposedIdentifierExternal<T, 5> {};
template <class T, class Allocator>
struct CompositionDepthFor<std::vector<T, Allocator>> : std::integral_constant<int, CompositionDepthFor<T>::value + 1> {
};
template <class Archive, class T>
inline void save( Archive& ar, const std::vector<T>& value ) {
ar << (int)value.size();
for(auto it = value.begin(); it != value.end(); ++it)
ar << *it;
ASSERT( ar.protocolVersion().isValid() );
}
template <class Archive, class T>
inline void load( Archive& ar, std::vector<T>& value ) {
int s;
ar >> s;
value.clear();
value.reserve(s);
for (int i = 0; i < s; i++) {
value.push_back(T());
ar >> value[i];
}
ASSERT(ar.protocolVersion().isValid());
}
template <class Archive, class T>
inline void save(Archive& ar, const std::deque<T>& value) {
ar << (int)value.size();
for (auto it = value.begin(); it != value.end(); ++it) ar << *it;
ASSERT(ar.protocolVersion().isValid());
}
template <class Archive, class T>
inline void load(Archive& ar, std::deque<T>& value) {
int s;
ar >> s;
value.clear();
value.reserve(s);
for (int i = 0; i < s; i++) {
value.push_back(T());
ar >> value[i];
}
ASSERT( ar.protocolVersion().isValid() );
}
template <class Archive, class T, size_t N>
inline void save( Archive& ar, const std::array<T, N>& value ) {
for(int ii = 0; ii < N; ++ii)
ar << value[ii];
ASSERT( ar.protocolVersion().isValid() );
}
template <class Archive, class T, size_t N>
inline void load( Archive& ar, std::array<T, N>& value ) {
for (int ii = 0; ii < N; ii++) {
ar >> value[ii];
}
ASSERT( ar.protocolVersion().isValid() );
}
template <class Archive, class T>
inline void save( Archive& ar, const std::set<T>& value ) {
ar << (int)value.size();
for(auto it = value.begin(); it != value.end(); ++it)
ar << *it;
ASSERT( ar.protocolVersion().isValid() );
}
template <class Archive, class T>
inline void load( Archive& ar, std::set<T>& value ) {
int s;
ar >> s;
value.clear();
T currentValue;
for (int i = 0; i < s; i++) {
ar >> currentValue;
value.insert(currentValue);
}
ASSERT( ar.protocolVersion().isValid() );
}
template <class Archive, class K, class V>
inline void save( Archive& ar, const std::map<K, V>& value ) {
ar << (int)value.size();
for (const auto &it : value) {
ar << it.first << it.second;
}
ASSERT( ar.protocolVersion().isValid() );
}
template <class Archive, class K, class V>
inline void load( Archive& ar, std::map<K, V>& value ) {
int s;
ar >> s;
value.clear();
for (int i = 0; i < s; ++i) {
std::pair<K, V> p;
ar >> p.first >> p.second;
value.emplace(p);
}
ASSERT( ar.protocolVersion().isValid() );
}
#pragma intrinsic (memcpy)
#if VALGRIND
static bool valgrindCheck( const void* data, int bytes, const char* context ) {
auto first= VALGRIND_CHECK_MEM_IS_DEFINED( data, bytes );
if (first) {
int und=0;
for(int b=0; b<bytes; b++)
if (VALGRIND_CHECK_MEM_IS_DEFINED( (uint8_t*)data+b, 1 ))
und++;
TraceEvent(SevError, "UndefinedData").detail("In", context).detail("Size", bytes).detail("Undefined", und).detail("FirstAt", (int64_t)first-(int64_t)data);
return false;
}
return true;
}
#else
static inline bool valgrindCheck( const void* data, int bytes, const char* context ) { return true; }
#endif
struct _IncludeVersion {
ProtocolVersion v;
explicit _IncludeVersion( ProtocolVersion defaultVersion ) : v(defaultVersion) {
ASSERT( defaultVersion.isValid() );
}
template <class Ar>
void write( Ar& ar ) {
ar.setProtocolVersion(v);
ar << v;
}
template <class Ar>
void read( Ar& ar ) {
ar >> v;
if (!v.isValid()) {
auto err = incompatible_protocol_version();
TraceEvent(SevWarnAlways, "InvalidSerializationVersion").error(err).detailf("Version", "%llx", v.versionWithFlags());
throw err;
}
if (v >= minInvalidProtocolVersion) {
// Downgrades are only supported for one minor version
auto err = incompatible_protocol_version();
TraceEvent(SevError, "FutureProtocolVersion").error(err).detailf("Version", "%llx", v.versionWithFlags());
throw err;
}
ar.setProtocolVersion(v);
}
};
struct _AssumeVersion {
ProtocolVersion v;
explicit _AssumeVersion( ProtocolVersion version );
template <class Ar> void write( Ar& ar ) { ar.setProtocolVersion(v); }
template <class Ar> void read( Ar& ar ) { ar.setProtocolVersion(v); }
};
struct _Unversioned {
template <class Ar> void write( Ar& ar ) {}
template <class Ar> void read( Ar& ar ) {}
};
// These functions return valid options to the VersionOptions parameter of the constructor of each archive type
inline _IncludeVersion IncludeVersion( ProtocolVersion defaultVersion = currentProtocolVersion ) { return _IncludeVersion(defaultVersion); }
inline _AssumeVersion AssumeVersion( ProtocolVersion version ) { return _AssumeVersion(version); }
inline _Unversioned Unversioned() { return _Unversioned(); }
//static uint64_t size_limits[] = { 0ULL, 255ULL, 65535ULL, 16777215ULL, 4294967295ULL, 1099511627775ULL, 281474976710655ULL, 72057594037927935ULL, 18446744073709551615ULL };
class BinaryWriter : NonCopyable {
public:
static const int isDeserializing = 0;
static constexpr bool isSerializing = true;
typedef BinaryWriter WRITER;
void serializeBytes( StringRef bytes ) {
serializeBytes(bytes.begin(), bytes.size());
}
void serializeBytes(const void* data, int bytes) {
if (bytes > 0) {
valgrindCheck(data, bytes, "serializeBytes");
void* p = writeBytes(bytes);
memcpy(p, data, bytes);
}
}
template <class T>
void serializeBinaryItem( const T& t ) {
*(T*)writeBytes(sizeof(T)) = t;
}
void* getData() { return data; }
int getLength() const { return size; }
Standalone<StringRef> toValue() const { return Standalone<StringRef>(StringRef(data, size), arena); }
template <class VersionOptions>
explicit BinaryWriter( VersionOptions vo ) : data(nullptr), size(0), allocated(0) { vo.write(*this); }
BinaryWriter( BinaryWriter&& rhs ) : arena(std::move(rhs.arena)), data(rhs.data), size(rhs.size), allocated(rhs.allocated), m_protocolVersion(rhs.m_protocolVersion) {
rhs.size = 0;
rhs.allocated = 0;
rhs.data = nullptr;
}
void operator=( BinaryWriter&& r) {
arena = std::move(r.arena);
data = r.data;
size = r.size;
allocated = r.allocated;
m_protocolVersion = r.m_protocolVersion;
r.size = 0;
r.allocated = 0;
r.data = nullptr;
}
template <class T, class VersionOptions>
static Standalone<StringRef> toValue( T const& t, VersionOptions vo ) {
BinaryWriter wr(vo);
wr << t;
return wr.toValue();
}
static int bytesNeeded( uint64_t val ) {
int n;
for( n=1; n<8 && (val>>(n*8)); ++n );
return n;
}
void serializeAsTuple( StringRef str ) {
size_t last_pos = 0;
serializeBytes(LiteralStringRef("\x01"));
for (size_t pos = 0; pos < str.size(); ++pos) {
if (str[pos] == '\x00') {
serializeBytes(str.substr(last_pos,pos - last_pos));
serializeBytes(LiteralStringRef("\x00\xff"));
last_pos = pos + 1;
}
}
serializeBytes(str.substr(last_pos,str.size() - last_pos));
serializeBytes(LiteralStringRef("\x00"));
}
void serializeAsTuple( bool t ) {
if(!t) {
void* p = writeBytes(1);
((uint8_t*)p)[0] = (uint8_t)20;
} else {
void* p = writeBytes(2);
((uint8_t*)p)[0] = (uint8_t)21;
((uint8_t*)p)[1] = (uint8_t)1;
}
}
void serializeAsTuple( uint64_t t ) {
if(t == 0) {
void* p = writeBytes(1);
((uint8_t*)p)[0] = (uint8_t)20;
return;
}
//int n = ( std::lower_bound(size_limits, size_limits+9, t) - size_limits );
//ASSERT( n <= 8 );
int n = bytesNeeded(t);
void* p = writeBytes(n+1);
((uint8_t*)p)[0] = (uint8_t)(20+n);
uint64_t x = bigEndian64(t);
memcpy((uint8_t*)p+1, (uint8_t*)&x+(8-n), n);
}
void serializeAsTuple( int64_t t ) {
if(t == 0) {
void* p = writeBytes(1);
((uint8_t*)p)[0] = (uint8_t)20;
} else if(t > 0) {
//int n = ( std::lower_bound(size_limits, size_limits+9, t) - size_limits );
//ASSERT( n <= 9 );
int n = bytesNeeded(t);
void* p = writeBytes(n+1);
((uint8_t*)p)[0] = (uint8_t)(20+n);
uint64_t x = bigEndian64((uint64_t)t);
memcpy((uint8_t*)p+1, (uint8_t*)&x+(8-n), n);
} else {
//int n = ( std::lower_bound(size_limits, size_limits+9, -t) - size_limits );
//ASSERT( n <= 9 );
int n = bytesNeeded(-t);
void* p = writeBytes(n+1);
((uint8_t*)p)[0] = (uint8_t)(20-n);
uint64_t x = bigEndian64(t-1);
memcpy((uint8_t*)p+1, (uint8_t*)&x+(8-n), n);
}
}
ProtocolVersion protocolVersion() const { return m_protocolVersion; }
void setProtocolVersion(ProtocolVersion pv) { m_protocolVersion = pv; }
private:
Arena arena;
uint8_t* data;
int size, allocated;
ProtocolVersion m_protocolVersion;
void* writeBytes(int s) {
int p = size;
size += s;
if (size > allocated) {
if(size <= 512-sizeof(ArenaBlock)) {
allocated = 512-sizeof(ArenaBlock);
} else if(size <= 4096-sizeof(ArenaBlock)) {
allocated = 4096-sizeof(ArenaBlock);
} else {
allocated = std::max(allocated*2, size);
}
Arena newArena;
uint8_t* newData = new ( newArena ) uint8_t[ allocated ];
if (p > 0) {
memcpy(newData, data, p);
}
arena = newArena;
data = newData;
}
return data+p;
}
};
// A known-length memory segment and an unknown-length memory segment which can be written to as a whole.
struct SplitBuffer {
void write(const void* data, int length);
void write(const void* data, int length, int offset);
void writeAndShrink(const void* data, int length);
uint8_t *begin, *next;
int first_length;
};
// A writer that can serialize to a SplitBuffer
class OverWriter {
public:
typedef OverWriter WRITER;
template <class VersionOptions>
explicit OverWriter(SplitBuffer buf, VersionOptions vo) : buf(buf), len(std::numeric_limits<int>::max()) { vo.write(*this); }
template <class VersionOptions>
explicit OverWriter(void *ptr, int len, VersionOptions vo) : len(len) {
buf.begin = (uint8_t *)ptr;
buf.first_length = len;
vo.write(*this);
}
void serializeBytes( StringRef bytes ) {
serializeBytes(bytes.begin(), bytes.size());
}
void serializeBytes(const void* data, int bytes) {
valgrindCheck( data, bytes, "serializeBytes" );
writeBytes(data, bytes);
}
template <class T>
void serializeBinaryItem( const T& t ) {
writeBytes(&t, sizeof(T));
}
ProtocolVersion protocolVersion() const { return m_protocolVersion; }
void setProtocolVersion(ProtocolVersion pv) { m_protocolVersion = pv; }
private:
int len;
SplitBuffer buf;
ProtocolVersion m_protocolVersion;
void writeBytes(const void *data, int wlen) {
ASSERT(wlen <= len);
buf.writeAndShrink(data, wlen);
len -= wlen;
}
};
template<class Impl>
class _Reader {
public:
static const int isDeserializing = 1;
static constexpr bool isSerializing = false;
using READER = Impl;
const void *peekBytes(int bytes) const {
ASSERT(begin + bytes <= end);
return begin;
}
void serializeBytes(void *data, int bytes) {
memcpy(data, static_cast<Impl*>(this)->readBytes(bytes), bytes);
}
template <class T>
void serializeBinaryItem( T& t ) {
t = *(T*)(static_cast<Impl*>(this)->readBytes(sizeof(T)));
}
Arena &arena() { return m_pool; }
ProtocolVersion protocolVersion() const { return m_protocolVersion; }
void setProtocolVersion(ProtocolVersion pv) { m_protocolVersion = pv; }
bool empty() const { return begin == end; }
void checkpoint() {
check = begin;
}
void rewind() {
ASSERT(check != nullptr);
begin = check;
check = nullptr;
}
protected:
_Reader(const char* begin, const char* end) : begin(begin), end(end) {}
_Reader(const char* begin, const char* end, const Arena& arena) : begin(begin), end(end), m_pool(arena) {}
const char *begin, *end;
const char* check = nullptr;
Arena m_pool;
ProtocolVersion m_protocolVersion;
};
class ArenaReader : public _Reader<ArenaReader> {
Optional<ArenaObjectReader> arenaObjectReader;
public:
const void* readBytes( int bytes ) {
const char* b = begin;
const char* e = b + bytes;
ASSERT( e <= end );
begin = e;
return b;
}
const uint8_t* arenaRead( int bytes ) {
return (const uint8_t*)readBytes(bytes);
}
const void* peekBytes(int bytes) const {
ASSERT( begin + bytes <= end );
return begin;
}
StringRef arenaReadAll() const {
return StringRef(reinterpret_cast<const uint8_t*>(begin), end - begin);
}
template <class VersionOptions>
ArenaReader(Arena const& arena, const StringRef& input, VersionOptions vo)
: _Reader(reinterpret_cast<const char*>(input.begin()), reinterpret_cast<const char*>(input.end()), arena) {
vo.read(*this);
if (m_protocolVersion.hasObjectSerializerFlag()) {
arenaObjectReader = ArenaObjectReader(arena, input, vo);
}
}
template <class T>
void deserialize(T& t) {
if constexpr (HasFileIdentifier<T>::value) {
if (arenaObjectReader.present()) {
arenaObjectReader.get().deserialize(t);
} else {
load(*this, t);
}
} else {
load(*this, t);
}
}
};
class BinaryReader : public _Reader<BinaryReader> {
Optional<ObjectReader> objectReader;
public:
const void* readBytes( int bytes );
const uint8_t* arenaRead( int bytes ) {
// Reads and returns the next bytes.
// The returned pointer has the lifetime of this.arena()
// Could be implemented zero-copy if [begin,end) was in this.arena() already; for now is a copy
if (!bytes) return nullptr;
uint8_t* dat = new (arena()) uint8_t[ bytes ];
serializeBytes( dat, bytes );
return dat;
}
template <class T, class VersionOptions>
static T fromStringRef( StringRef sr, VersionOptions vo ) {
T t;
BinaryReader r(sr, vo);
r >> t;
return t;
}
ProtocolVersion protocolVersion() const { return m_protocolVersion; }
void setProtocolVersion(ProtocolVersion pv) { m_protocolVersion = pv; }
void assertEnd() const { ASSERT(begin == end); }
bool empty() const { return begin == end; }
template <class VersionOptions>
BinaryReader(const void* data, int length, VersionOptions vo)
: _Reader(reinterpret_cast<const char*>(data), reinterpret_cast<const char*>(data) + length) {
readVersion(vo);
}
template <class VersionOptions>
BinaryReader(const StringRef& s, VersionOptions vo)
: _Reader(reinterpret_cast<const char*>(s.begin()), reinterpret_cast<const char*>(s.end())) {
readVersion(vo);
}
template <class VersionOptions>
BinaryReader(const std::string& s, VersionOptions vo) : _Reader(s.c_str(), s.c_str() + s.size()) {
readVersion(vo);
}
template<class T>
void deserialize(T &t) {
if constexpr (HasFileIdentifier<T>::value) {
if (objectReader.present()) {
objectReader.get().deserialize(t);
} else {
load(*this, t);
}
} else {
load(*this, t);
}
}
private:
template <class VersionOptions>
void readVersion(VersionOptions vo) {
vo.read(*this);
if (m_protocolVersion.hasObjectSerializerFlag()) {
objectReader = ObjectReader(reinterpret_cast<const uint8_t*>(begin), AssumeVersion(m_protocolVersion));
}
}
};
class SendBuffer {
protected:
uint8_t* _data;
public:
inline uint8_t const* data() const { return _data; }
inline uint8_t* data() { return _data; }
SendBuffer* next;
int bytes_written, bytes_sent;
int bytes_unsent() const {
return bytes_written - bytes_sent;
}
};
struct PacketBuffer : SendBuffer {
private:
int reference_count;
uint32_t const size_;
static constexpr size_t PACKET_BUFFER_MIN_SIZE = 16384;
static constexpr size_t PACKET_BUFFER_OVERHEAD = 40;
public:
double const enqueue_time;
size_t size() const { return size_; }
private:
explicit PacketBuffer(size_t size) : reference_count(1), size_(size), enqueue_time(g_network->now()) {
next = nullptr;
bytes_written = bytes_sent = 0;
_data = reinterpret_cast<uint8_t*>(this + 1);
static_assert(sizeof(PacketBuffer) == PACKET_BUFFER_OVERHEAD);
}
public:
static PacketBuffer* create(size_t size = 0) {
size = std::max(size, PACKET_BUFFER_MIN_SIZE - PACKET_BUFFER_OVERHEAD);
if (size == PACKET_BUFFER_MIN_SIZE - PACKET_BUFFER_OVERHEAD) {
return new (FastAllocator<PACKET_BUFFER_MIN_SIZE>::allocate()) PacketBuffer{ size };
}
uint8_t* mem = new uint8_t[size + PACKET_BUFFER_OVERHEAD];
return new (mem) PacketBuffer{ size };
}
PacketBuffer* nextPacketBuffer() { return static_cast<PacketBuffer*>(next); }
void addref() { ++reference_count; }
void delref() {
if (!--reference_count) {
if (size_ == PACKET_BUFFER_MIN_SIZE - PACKET_BUFFER_OVERHEAD) {
FastAllocator<PACKET_BUFFER_MIN_SIZE>::release(this);
} else {
delete[] this;
}
}
}
int bytes_unwritten() const { return size_ - bytes_written; }
};
struct PacketWriter {
static const int isDeserializing = 0;
static constexpr bool isSerializing = true;
typedef PacketWriter WRITER;
PacketBuffer* buffer;
struct ReliablePacket *reliable; // nullptr if this is unreliable; otherwise the last entry in the ReliablePacket::cont chain
int length;
ProtocolVersion m_protocolVersion;
// reliable is nullptr if this is an unreliable packet, or points to a ReliablePacket. PacketWriter is responsible
// for filling in reliable->buffer, ->cont, ->begin, and ->end, but not ->prev or ->next.
template <class VersionOptions>
PacketWriter(PacketBuffer* buf, ReliablePacket* reliable, VersionOptions vo) { init(buf, reliable); vo.read(*this); }
void serializeBytes(const void* data, int bytes) {
if (bytes <= buffer->bytes_unwritten()) {
memcpy(buffer->data() + buffer->bytes_written, data, bytes);
buffer->bytes_written += bytes;
} else {
serializeBytesAcrossBoundary(data, bytes);
}
}
void serializeBytesAcrossBoundary(const void* data, int bytes);
void writeAhead( int bytes, struct SplitBuffer* );
void nextBuffer(size_t size = 0 /* downstream it will default to at least 4k minus some padding */);
PacketBuffer* finish();
int size() const { return length; }
void serializeBytes( StringRef bytes ) {
serializeBytes(bytes.begin(), bytes.size());
}
template <class T>
void serializeBinaryItem( const T& t ) {
if (sizeof(T) <= buffer->bytes_unwritten()) {
*(T*)(buffer->data() + buffer->bytes_written) = t;
buffer->bytes_written += sizeof(T);
} else {
serializeBytesAcrossBoundary(&t, sizeof(T));
}
}
ProtocolVersion protocolVersion() const { return m_protocolVersion; }
void setProtocolVersion(ProtocolVersion pv) { m_protocolVersion = pv; }
private:
uint8_t* writeBytes(size_t size) {
if (size > buffer->bytes_unwritten()) {
nextBuffer(size);
ASSERT(buffer->size() >= size);
}
uint8_t* result = buffer->data() + buffer->bytes_written;
buffer->bytes_written += size;
return result;
}
template <class, class>
friend class MakeSerializeSource;
void init( PacketBuffer* buf, ReliablePacket* reliable );
};
struct ISerializeSource {
virtual void serializePacketWriter(PacketWriter&) const = 0;
virtual void serializeObjectWriter(ObjectWriter&) const = 0;
};
template <class T, class V>
struct MakeSerializeSource : ISerializeSource {
using value_type = V;
virtual void serializePacketWriter(PacketWriter& w) const {
ObjectWriter writer([&](size_t size) { return w.writeBytes(size); }, AssumeVersion(w.protocolVersion()));
writer.serialize(get()); // Writes directly into buffer supplied by |w|
}
virtual value_type const& get() const = 0;
};
template <class T>
struct SerializeSource : MakeSerializeSource<SerializeSource<T>, T> {
using value_type = T;
T const& value;
SerializeSource(T const& value) : value(value) {}
virtual void serializeObjectWriter(ObjectWriter& w) const {
w.serialize(value);
}
virtual T const& get() const { return value; }
};
#endif