873 lines
25 KiB
C++
873 lines
25 KiB
C++
/*
|
|
* serialize.h
|
|
*
|
|
* This source file is part of the FoundationDB open source project
|
|
*
|
|
* Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
|
|
*
|
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
* you may not use this file except in compliance with the License.
|
|
* You may obtain a copy of the License at
|
|
*
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
*
|
|
* Unless required by applicable law or agreed to in writing, software
|
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
* See the License for the specific language governing permissions and
|
|
* limitations under the License.
|
|
*/
|
|
|
|
#ifndef FLOW_SERIALIZE_H
|
|
#define FLOW_SERIALIZE_H
|
|
#pragma once
|
|
|
|
#include <stdint.h>
|
|
#include <array>
|
|
#include <set>
|
|
#include "flow/ProtocolVersion.h"
|
|
#include "flow/Error.h"
|
|
#include "flow/Arena.h"
|
|
#include "flow/FileIdentifier.h"
|
|
#include "flow/ObjectSerializer.h"
|
|
#include "flow/network.h"
|
|
#include <algorithm>
|
|
#include <deque>
|
|
|
|
// is_binary_serializable<T> marks the types that archives may copy byte-for-byte.
//
// It cannot simply be replaced by std::is_pod: doing so would prefer memcpy over a defined
// serialize() method on a POD struct. As not all of our structs are packed, that would both
// inflate message sizes by transmitting padding and mean that undefined bytes go over the wire.
// A more intelligent SFINAE that does "binarySerialize if POD and no serialize() is defined"
// could replace the usage of is_binary_serializable.
template <class T>
struct is_binary_serializable : std::false_type {};

// Opts the type T in to raw binary serialization by specializing is_binary_serializable<T>.
#define BINARY_SERIALIZABLE(T)                                                                                         \
	template <>                                                                                                        \
	struct is_binary_serializable<T> : std::true_type {};
|
|
|
|
BINARY_SERIALIZABLE(int8_t);
|
|
BINARY_SERIALIZABLE(uint8_t);
|
|
BINARY_SERIALIZABLE(int16_t);
|
|
BINARY_SERIALIZABLE(uint16_t);
|
|
BINARY_SERIALIZABLE(int32_t);
|
|
BINARY_SERIALIZABLE(uint32_t);
|
|
BINARY_SERIALIZABLE(int64_t);
|
|
BINARY_SERIALIZABLE(uint64_t);
|
|
BINARY_SERIALIZABLE(bool);
|
|
BINARY_SERIALIZABLE(double);
|
|
BINARY_SERIALIZABLE(ProtocolVersion);
|
|
|
|
template <>
|
|
struct scalar_traits<ProtocolVersion> : std::true_type {
|
|
constexpr static size_t size = sizeof(uint64_t);
|
|
|
|
static void save(uint8_t* out, const ProtocolVersion& v) {
|
|
*reinterpret_cast<uint64_t*>(out) = v.versionWithFlags();
|
|
}
|
|
|
|
template <class Context>
|
|
static void load(const uint8_t* i, ProtocolVersion& out, Context& context) {
|
|
const uint64_t* in = reinterpret_cast<const uint64_t*>(i);
|
|
out = ProtocolVersion(*in);
|
|
}
|
|
};
|
|
|
|
// Stream-style write: forwards item to save() and returns the archive so calls can be chained.
// Only participates for archives that declare a WRITER type.
template <class Archive, class Item>
inline typename Archive::WRITER& operator<<(Archive& ar, const Item& item) {
	save(ar, item);
	return ar;
}
|
|
|
|
// Stream-style read: asks the archive to deserialize into item and returns the archive so calls
// can be chained. Only participates for archives that declare a READER type.
template <class Archive, class Item>
inline typename Archive::READER& operator>>(Archive& ar, Item& item) {
	ar.deserialize(item);
	return ar;
}
|
|
|
|
// Writing overload: saves item and then every element of items, in argument order.
// Selected for archives that declare a WRITER type.
template <class Archive, class Item, class... Items>
typename Archive::WRITER& serializer(Archive& ar, const Item& item, const Items&... items) {
	save(ar, item);
	// The comma fold evaluates left to right, matching the order of the argument list.
	(save(ar, items), ...);
	return ar;
}
|
|
|
|
// Reading overload: loads item and then every element of items, in argument order.
// Selected for archives that declare a READER type.
template <class Archive, class Item, class... Items>
typename Archive::READER& serializer(Archive& ar, Item& item, Items&... items) {
	load(ar, item);
	// The comma fold evaluates left to right, matching the order of the argument list.
	(load(ar, items), ...);
	return ar;
}
|
|
|
|
// Default serialization strategy: delegate to the type's own serialize(Archive&) member.
// The Enable parameter lets partial specializations select other strategies.
template <class Archive, class T, class Enable = void>
class Serializer {
public:
	static void serialize(Archive& ar, T& t) {
		t.serialize(ar);
		// Asserted after the member has run, so the archive must hold a valid protocol version by then.
		ASSERT(ar.protocolVersion().isValid());
	}
};
|
|
|
|
template <class Ar, class T>
|
|
inline void save(Ar& ar, const T& value) {
|
|
Serializer<Ar, T>::serialize(ar, const_cast<T&>(value));
|
|
}
|
|
|
|
template <class Ar, class T>
|
|
inline void load(Ar& ar, T& value) {
|
|
Serializer<Ar, T>::serialize(ar, value);
|
|
}
|
|
|
|
template <class CharT, class Traits, class Allocator>
|
|
struct FileIdentifierFor<std::basic_string<CharT, Traits, Allocator>> {
|
|
constexpr static FileIdentifier value = 15694229;
|
|
};
|
|
|
|
// Reads a std::string stored as an int32_t length followed by that many raw bytes.
template <class Archive>
inline void load(Archive& ar, std::string& value) {
	int32_t byteCount;
	ar >> byteCount;
	value.resize(byteCount);
	// The payload is read directly into the string's own storage.
	char* storage = &value[0];
	ar.serializeBytes(storage, static_cast<int>(value.length()));
	ASSERT(ar.protocolVersion().isValid());
}
|
|
|
|
// Writes a std::string as an int32_t length followed by its raw bytes.
template <class Archive>
inline void save(Archive& ar, const std::string& value) {
	const int32_t byteCount = static_cast<int32_t>(value.length());
	ar << byteCount;
	// serializeBytes is handed a mutable void*, so constness is cast away from the string storage.
	void* storage = const_cast<char*>(&value[0]);
	ar.serializeBytes(storage, static_cast<int>(value.length()));
	ASSERT(ar.protocolVersion().isValid());
}
|
|
|
|
template <class Archive, class T>
|
|
class Serializer<Archive, T, typename std::enable_if_t<is_binary_serializable<T>::value>> {
|
|
public:
|
|
static void serialize(Archive& ar, T& t) { ar.serializeBinaryItem(t); }
|
|
};
|
|
|
|
template <class Archive, class T>
|
|
class Serializer<Archive, T, typename std::enable_if_t<std::is_enum_v<T>>> {
|
|
public:
|
|
static void serialize(Archive& ar, T& t) {
|
|
static_assert(is_binary_serializable<std::underlying_type_t<T>>::value);
|
|
ar.serializeBinaryItem(reinterpret_cast<std::underlying_type_t<T>&>(t));
|
|
}
|
|
};
|
|
|
|
template <class Archive, class T1, class T2>
|
|
class Serializer<Archive, std::pair<T1, T2>, void> {
|
|
public:
|
|
static void serialize(Archive& ar, std::pair<T1, T2>& p) { serializer(ar, p.first, p.second); }
|
|
};
|
|
|
|
template <class T, class Allocator>
|
|
struct FileIdentifierFor<std::vector<T, Allocator>> : ComposedIdentifierExternal<T, 5> {};
|
|
|
|
template <class T, class Allocator>
|
|
struct CompositionDepthFor<std::vector<T, Allocator>> : std::integral_constant<int, CompositionDepthFor<T>::value + 1> {
|
|
};
|
|
|
|
// Writes a std::vector as an int element count followed by each element in order.
template <class Archive, class T>
inline void save(Archive& ar, const std::vector<T>& value) {
	const int count = (int)value.size();
	ar << count;
	for (const auto& element : value) {
		ar << element;
	}
	ASSERT(ar.protocolVersion().isValid());
}
|
|
// Reads a std::vector stored as an int element count followed by the elements.
// Any previous contents of value are discarded.
template <class Archive, class T>
inline void load(Archive& ar, std::vector<T>& value) {
	int count;
	ar >> count;
	value.clear();
	value.reserve(count);
	for (int remaining = count; remaining > 0; --remaining) {
		// Append a default-constructed element and deserialize directly into it.
		value.push_back(T());
		ar >> value.back();
	}
	ASSERT(ar.protocolVersion().isValid());
}
|
|
|
|
// Writes a std::deque as an int element count followed by each element, front to back.
template <class Archive, class T>
inline void save(Archive& ar, const std::deque<T>& value) {
	const int count = (int)value.size();
	ar << count;
	for (const auto& element : value) {
		ar << element;
	}
	ASSERT(ar.protocolVersion().isValid());
}
|
|
|
|
// Reads a std::deque stored as an int element count followed by the elements.
// Any previous contents of value are discarded.
template <class Archive, class T>
inline void load(Archive& ar, std::deque<T>& value) {
	int s;
	ar >> s;
	value.clear();
	// std::deque has no reserve() member, so (unlike the std::vector overload) nothing is
	// pre-allocated here; calling reserve() made every instantiation fail to compile.
	for (int i = 0; i < s; i++) {
		// Append a default-constructed element and deserialize directly into it.
		value.push_back(T());
		ar >> value.back();
	}
	ASSERT(ar.protocolVersion().isValid());
}
|
|
|
|
// Writes the N elements of a std::array in index order. No count is written, since N is part of the type.
template <class Archive, class T, size_t N>
inline void save(Archive& ar, const std::array<T, N>& value) {
	for (const T& element : value) {
		ar << element;
	}
	ASSERT(ar.protocolVersion().isValid());
}
|
|
// Reads exactly N elements into a std::array, in index order.
template <class Archive, class T, size_t N>
inline void load(Archive& ar, std::array<T, N>& value) {
	for (T& element : value) {
		ar >> element;
	}
	ASSERT(ar.protocolVersion().isValid());
}
|
|
|
|
// Writes a std::set as an int element count followed by the elements in iteration order.
template <class Archive, class T>
inline void save(Archive& ar, const std::set<T>& value) {
	const int count = (int)value.size();
	ar << count;
	for (const auto& element : value) {
		ar << element;
	}
	ASSERT(ar.protocolVersion().isValid());
}
|
|
// Reads a std::set stored as an int element count followed by the elements.
// Any previous contents of value are discarded.
template <class Archive, class T>
inline void load(Archive& ar, std::set<T>& value) {
	int count;
	ar >> count;
	value.clear();
	// A single scratch object is reused for every element and copied into the set.
	T scratch;
	for (int remaining = count; remaining > 0; --remaining) {
		ar >> scratch;
		value.insert(scratch);
	}
	ASSERT(ar.protocolVersion().isValid());
}
|
|
|
|
// Writes a std::map as an int entry count followed by key, value for each entry in iteration order.
template <class Archive, class K, class V>
inline void save(Archive& ar, const std::map<K, V>& value) {
	const int count = (int)value.size();
	ar << count;
	for (const auto& [key, mapped] : value) {
		ar << key << mapped;
	}
	ASSERT(ar.protocolVersion().isValid());
}
|
|
// Reads a std::map stored as an int entry count followed by key, value pairs.
// Any previous contents of value are discarded.
template <class Archive, class K, class V>
inline void load(Archive& ar, std::map<K, V>& value) {
	int count;
	ar >> count;
	value.clear();
	for (int remaining = count; remaining > 0; --remaining) {
		std::pair<K, V> entry;
		ar >> entry.first >> entry.second;
		// emplace leaves an existing key untouched, so the first occurrence of a key wins.
		value.emplace(entry);
	}
	ASSERT(ar.protocolVersion().isValid());
}
|
|
|
|
#pragma intrinsic(memcpy)
|
|
|
|
#if VALGRIND
// Runs VALGRIND_CHECK_MEM_IS_DEFINED over [data, data + bytes). When the check reports a
// problem, an "UndefinedData" error event is traced (tagged with context) and false is
// returned; otherwise returns true.
static bool valgrindCheck(const void* data, int bytes, const char* context) {
	auto firstBad = VALGRIND_CHECK_MEM_IS_DEFINED(data, bytes);
	if (!firstBad) {
		return true;
	}

	// Re-run the check one byte at a time to count how many bytes are flagged.
	int flagged = 0;
	for (int offset = 0; offset < bytes; offset++) {
		if (VALGRIND_CHECK_MEM_IS_DEFINED((uint8_t*)data + offset, 1)) {
			flagged++;
		}
	}

	TraceEvent(SevError, "UndefinedData")
	    .detail("In", context)
	    .detail("Size", bytes)
	    .detail("Undefined", flagged)
	    .detail("FirstAt", (int64_t)firstBad - (int64_t)data);
	return false;
}
#else
// Without VALGRIND there is nothing to check; every range is reported as fine.
static inline bool valgrindCheck(const void* data, int bytes, const char* context) {
	return true;
}
#endif
|
|
|
|
struct _IncludeVersion {
|
|
ProtocolVersion v;
|
|
explicit _IncludeVersion(ProtocolVersion defaultVersion) : v(defaultVersion) { ASSERT(defaultVersion.isValid()); }
|
|
template <class Ar>
|
|
void write(Ar& ar) {
|
|
ar.setProtocolVersion(v);
|
|
ar << v;
|
|
}
|
|
template <class Ar>
|
|
void read(Ar& ar) {
|
|
ar >> v;
|
|
if (!v.isValid()) {
|
|
auto err = incompatible_protocol_version();
|
|
TraceEvent(SevWarnAlways, "InvalidSerializationVersion")
|
|
.error(err)
|
|
.detailf("Version", "%llx", v.versionWithFlags());
|
|
throw err;
|
|
}
|
|
if (v >= minInvalidProtocolVersion) {
|
|
// Downgrades are only supported for one minor version
|
|
auto err = incompatible_protocol_version();
|
|
TraceEvent(SevError, "FutureProtocolVersion").error(err).detailf("Version", "%llx", v.versionWithFlags());
|
|
throw err;
|
|
}
|
|
ar.setProtocolVersion(v);
|
|
}
|
|
};
|
|
struct _AssumeVersion {
|
|
ProtocolVersion v;
|
|
explicit _AssumeVersion(ProtocolVersion version);
|
|
template <class Ar>
|
|
void write(Ar& ar) {
|
|
ar.setProtocolVersion(v);
|
|
}
|
|
template <class Ar>
|
|
void read(Ar& ar) {
|
|
ar.setProtocolVersion(v);
|
|
}
|
|
};
|
|
// Version option that leaves the archive untouched: write() and read() are both no-ops, so the
// archive's protocol version is neither set nor serialized.
struct _Unversioned {
	template <class Ar>
	void write(Ar& ar) {
	}

	template <class Ar>
	void read(Ar& ar) {
	}
};
|
|
|
|
// These functions return valid options to the VersionOptions parameter of the constructor of each archive type
|
|
inline _IncludeVersion IncludeVersion(ProtocolVersion defaultVersion = currentProtocolVersion) {
|
|
return _IncludeVersion(defaultVersion);
|
|
}
|
|
inline _AssumeVersion AssumeVersion(ProtocolVersion version) {
|
|
return _AssumeVersion(version);
|
|
}
|
|
inline _Unversioned Unversioned() {
|
|
return _Unversioned();
|
|
}
|
|
|
|
// static uint64_t size_limits[] = { 0ULL, 255ULL, 65535ULL, 16777215ULL, 4294967295ULL, 1099511627775ULL,
|
|
// 281474976710655ULL, 72057594037927935ULL, 18446744073709551615ULL };
|
|
|
|
class BinaryWriter : NonCopyable {
|
|
public:
|
|
static const int isDeserializing = 0;
|
|
static constexpr bool isSerializing = true;
|
|
typedef BinaryWriter WRITER;
|
|
|
|
void serializeBytes(StringRef bytes) { serializeBytes(bytes.begin(), bytes.size()); }
|
|
void serializeBytes(const void* data, int bytes) {
|
|
if (bytes > 0) {
|
|
valgrindCheck(data, bytes, "serializeBytes");
|
|
void* p = writeBytes(bytes);
|
|
memcpy(p, data, bytes);
|
|
}
|
|
}
|
|
template <class T>
|
|
void serializeBinaryItem(const T& t) {
|
|
static_assert(is_binary_serializable<T>::value,
|
|
"Object must be binary serializable, see BINARY_SERIALIZABLE macro");
|
|
*(T*)writeBytes(sizeof(T)) = t;
|
|
}
|
|
void* getData() { return data; }
|
|
int getLength() const { return size; }
|
|
Standalone<StringRef> toValue() const { return Standalone<StringRef>(StringRef(data, size), arena); }
|
|
template <class VersionOptions>
|
|
explicit BinaryWriter(VersionOptions vo) : data(nullptr), size(0), allocated(0) {
|
|
vo.write(*this);
|
|
}
|
|
BinaryWriter(BinaryWriter&& rhs)
|
|
: arena(std::move(rhs.arena)), data(rhs.data), size(rhs.size), allocated(rhs.allocated),
|
|
m_protocolVersion(rhs.m_protocolVersion) {
|
|
rhs.size = 0;
|
|
rhs.allocated = 0;
|
|
rhs.data = nullptr;
|
|
}
|
|
void operator=(BinaryWriter&& r) {
|
|
arena = std::move(r.arena);
|
|
data = r.data;
|
|
size = r.size;
|
|
allocated = r.allocated;
|
|
m_protocolVersion = r.m_protocolVersion;
|
|
r.size = 0;
|
|
r.allocated = 0;
|
|
r.data = nullptr;
|
|
}
|
|
|
|
template <class T, class VersionOptions>
|
|
static Standalone<StringRef> toValue(T const& t, VersionOptions vo) {
|
|
BinaryWriter wr(vo);
|
|
wr << t;
|
|
return wr.toValue();
|
|
}
|
|
|
|
static int bytesNeeded(uint64_t val) {
|
|
int n;
|
|
for (n = 1; n < 8 && (val >> (n * 8)); ++n)
|
|
;
|
|
return n;
|
|
}
|
|
|
|
void serializeAsTuple(StringRef str) {
|
|
size_t last_pos = 0;
|
|
|
|
serializeBytes(LiteralStringRef("\x01"));
|
|
|
|
for (size_t pos = 0; pos < str.size(); ++pos) {
|
|
if (str[pos] == '\x00') {
|
|
serializeBytes(str.substr(last_pos, pos - last_pos));
|
|
serializeBytes(LiteralStringRef("\x00\xff"));
|
|
last_pos = pos + 1;
|
|
}
|
|
}
|
|
serializeBytes(str.substr(last_pos, str.size() - last_pos));
|
|
serializeBytes(LiteralStringRef("\x00"));
|
|
}
|
|
|
|
void serializeAsTuple(bool t) {
|
|
if (!t) {
|
|
void* p = writeBytes(1);
|
|
((uint8_t*)p)[0] = (uint8_t)20;
|
|
} else {
|
|
void* p = writeBytes(2);
|
|
((uint8_t*)p)[0] = (uint8_t)21;
|
|
((uint8_t*)p)[1] = (uint8_t)1;
|
|
}
|
|
}
|
|
|
|
void serializeAsTuple(uint64_t t) {
|
|
if (t == 0) {
|
|
void* p = writeBytes(1);
|
|
((uint8_t*)p)[0] = (uint8_t)20;
|
|
return;
|
|
}
|
|
|
|
// int n = ( std::lower_bound(size_limits, size_limits+9, t) - size_limits );
|
|
// ASSERT( n <= 8 );
|
|
int n = bytesNeeded(t);
|
|
void* p = writeBytes(n + 1);
|
|
((uint8_t*)p)[0] = (uint8_t)(20 + n);
|
|
uint64_t x = bigEndian64(t);
|
|
memcpy((uint8_t*)p + 1, (uint8_t*)&x + (8 - n), n);
|
|
}
|
|
|
|
void serializeAsTuple(int64_t t) {
|
|
if (t == 0) {
|
|
void* p = writeBytes(1);
|
|
((uint8_t*)p)[0] = (uint8_t)20;
|
|
} else if (t > 0) {
|
|
// int n = ( std::lower_bound(size_limits, size_limits+9, t) - size_limits );
|
|
// ASSERT( n <= 9 );
|
|
int n = bytesNeeded(t);
|
|
|
|
void* p = writeBytes(n + 1);
|
|
((uint8_t*)p)[0] = (uint8_t)(20 + n);
|
|
uint64_t x = bigEndian64((uint64_t)t);
|
|
memcpy((uint8_t*)p + 1, (uint8_t*)&x + (8 - n), n);
|
|
} else {
|
|
// int n = ( std::lower_bound(size_limits, size_limits+9, -t) - size_limits );
|
|
// ASSERT( n <= 9 );
|
|
int n = bytesNeeded(-t);
|
|
|
|
void* p = writeBytes(n + 1);
|
|
((uint8_t*)p)[0] = (uint8_t)(20 - n);
|
|
uint64_t x = bigEndian64(t - 1);
|
|
memcpy((uint8_t*)p + 1, (uint8_t*)&x + (8 - n), n);
|
|
}
|
|
}
|
|
|
|
ProtocolVersion protocolVersion() const { return m_protocolVersion; }
|
|
void setProtocolVersion(ProtocolVersion pv) { m_protocolVersion = pv; }
|
|
|
|
private:
|
|
Arena arena;
|
|
uint8_t* data;
|
|
int size, allocated;
|
|
ProtocolVersion m_protocolVersion;
|
|
|
|
void* writeBytes(int s) {
|
|
int p = size;
|
|
size += s;
|
|
if (size > allocated) {
|
|
if (size <= 512 - sizeof(ArenaBlock)) {
|
|
allocated = 512 - sizeof(ArenaBlock);
|
|
} else if (size <= 4096 - sizeof(ArenaBlock)) {
|
|
allocated = 4096 - sizeof(ArenaBlock);
|
|
} else {
|
|
allocated = std::max(allocated * 2, size);
|
|
}
|
|
Arena newArena;
|
|
uint8_t* newData = new (newArena) uint8_t[allocated];
|
|
if (p > 0) {
|
|
memcpy(newData, data, p);
|
|
}
|
|
arena = newArena;
|
|
data = newData;
|
|
}
|
|
return data + p;
|
|
}
|
|
};
|
|
|
|
// A known-length memory segment and an unknown-length memory segment which can be written to as a whole.
struct SplitBuffer {
	// All three writers are defined out of line.
	void write(const void* data, int length);
	void write(const void* data, int length, int offset);
	void writeAndShrink(const void* data, int length);

	uint8_t* begin;
	uint8_t* next;
	int first_length;
};
|
|
|
|
// A writer that can serialize to a SplitBuffer
|
|
class OverWriter {
|
|
public:
|
|
typedef OverWriter WRITER;
|
|
|
|
template <class VersionOptions>
|
|
explicit OverWriter(SplitBuffer buf, VersionOptions vo) : len(std::numeric_limits<int>::max()), buf(buf) {
|
|
vo.write(*this);
|
|
}
|
|
|
|
template <class VersionOptions>
|
|
explicit OverWriter(void* ptr, int len, VersionOptions vo) : len(len) {
|
|
buf.begin = (uint8_t*)ptr;
|
|
buf.first_length = len;
|
|
vo.write(*this);
|
|
}
|
|
|
|
void serializeBytes(StringRef bytes) { serializeBytes(bytes.begin(), bytes.size()); }
|
|
void serializeBytes(const void* data, int bytes) {
|
|
valgrindCheck(data, bytes, "serializeBytes");
|
|
writeBytes(data, bytes);
|
|
}
|
|
template <class T>
|
|
void serializeBinaryItem(const T& t) {
|
|
static_assert(is_binary_serializable<T>::value,
|
|
"Object must be binary serializable, see BINARY_SERIALIZABLE macro");
|
|
writeBytes(&t, sizeof(T));
|
|
}
|
|
|
|
ProtocolVersion protocolVersion() const { return m_protocolVersion; }
|
|
void setProtocolVersion(ProtocolVersion pv) { m_protocolVersion = pv; }
|
|
|
|
private:
|
|
int len;
|
|
SplitBuffer buf;
|
|
ProtocolVersion m_protocolVersion;
|
|
|
|
void writeBytes(const void* data, int wlen) {
|
|
ASSERT(wlen <= len);
|
|
buf.writeAndShrink(data, wlen);
|
|
len -= wlen;
|
|
}
|
|
};
|
|
|
|
template <class Impl>
|
|
class _Reader {
|
|
public:
|
|
static const int isDeserializing = 1;
|
|
static constexpr bool isSerializing = false;
|
|
using READER = Impl;
|
|
|
|
const void* peekBytes(int bytes) const {
|
|
ASSERT(begin + bytes <= end);
|
|
return begin;
|
|
}
|
|
|
|
void serializeBytes(void* data, int bytes) { memcpy(data, static_cast<Impl*>(this)->readBytes(bytes), bytes); }
|
|
|
|
template <class T>
|
|
void serializeBinaryItem(T& t) {
|
|
static_assert(is_binary_serializable<T>::value,
|
|
"Object must be binary serializable, see BINARY_SERIALIZABLE macro");
|
|
t = *(T*)(static_cast<Impl*>(this)->readBytes(sizeof(T)));
|
|
}
|
|
|
|
Arena& arena() { return m_pool; }
|
|
|
|
ProtocolVersion protocolVersion() const { return m_protocolVersion; }
|
|
void setProtocolVersion(ProtocolVersion pv) { m_protocolVersion = pv; }
|
|
|
|
bool empty() const { return begin == end; }
|
|
|
|
void checkpoint() { check = begin; }
|
|
|
|
void rewind() {
|
|
ASSERT(check != nullptr);
|
|
begin = check;
|
|
check = nullptr;
|
|
}
|
|
|
|
protected:
|
|
_Reader(const char* begin, const char* end) : begin(begin), end(end) {}
|
|
_Reader(const char* begin, const char* end, const Arena& arena) : begin(begin), end(end), m_pool(arena) {}
|
|
|
|
const char *begin, *end;
|
|
const char* check = nullptr;
|
|
Arena m_pool;
|
|
ProtocolVersion m_protocolVersion;
|
|
};
|
|
|
|
// Reader constructed with an Arena. arenaRead() returns pointers straight into the input
// rather than copying.
class ArenaReader : public _Reader<ArenaReader> {
	// Only populated when the protocol version carries the object-serializer flag.
	Optional<ArenaObjectReader> arenaObjectReader;

public:
	template <class VersionOptions>
	ArenaReader(Arena const& arena, const StringRef& input, VersionOptions vo)
	  : _Reader(reinterpret_cast<const char*>(input.begin()), reinterpret_cast<const char*>(input.end()), arena) {
		vo.read(*this);
		if (m_protocolVersion.hasObjectSerializerFlag()) {
			arenaObjectReader = ArenaObjectReader(arena, input, vo);
		}
	}

	// Consumes bytes bytes and returns a pointer to the first of them; asserts on overrun.
	const void* readBytes(int bytes) {
		const char* start = begin;
		const char* stop = start + bytes;
		ASSERT(stop <= end);
		begin = stop;
		return start;
	}

	const uint8_t* arenaRead(int bytes) { return (const uint8_t*)readBytes(bytes); }

	// Returns the current read position without consuming anything, after asserting that at
	// least bytes bytes remain.
	const void* peekBytes(int bytes) const {
		ASSERT(begin + bytes <= end);
		return begin;
	}

	// Returns all remaining bytes without consuming them.
	StringRef arenaReadAll() const { return StringRef(reinterpret_cast<const uint8_t*>(begin), end - begin); }

	// Types with a file identifier go through the object reader when one is present; everything
	// else is read with load().
	template <class T>
	void deserialize(T& t) {
		if constexpr (HasFileIdentifier<T>::value) {
			if (arenaObjectReader.present()) {
				arenaObjectReader.get().deserialize(t);
				return;
			}
		}
		load(*this, t);
	}
};
|
|
|
|
// Reader over a caller-supplied byte range (pointer + length, StringRef, or std::string).
class BinaryReader : public _Reader<BinaryReader> {
	// Only populated when the protocol version carries the object-serializer flag.
	Optional<ObjectReader> objectReader;

public:
	template <class VersionOptions>
	BinaryReader(const void* data, int length, VersionOptions vo)
	  : _Reader(reinterpret_cast<const char*>(data), reinterpret_cast<const char*>(data) + length) {
		readVersion(vo);
	}

	template <class VersionOptions>
	BinaryReader(const StringRef& s, VersionOptions vo)
	  : _Reader(reinterpret_cast<const char*>(s.begin()), reinterpret_cast<const char*>(s.end())) {
		readVersion(vo);
	}

	template <class VersionOptions>
	BinaryReader(const std::string& s, VersionOptions vo) : _Reader(s.c_str(), s.c_str() + s.size()) {
		readVersion(vo);
	}

	// Defined out of line.
	const void* readBytes(int bytes);

	const uint8_t* arenaRead(int bytes) {
		// Reads and returns the next bytes.
		// The returned pointer has the lifetime of this.arena()
		// Could be implemented zero-copy if [begin,end) was in this.arena() already; for now is a copy
		if (!bytes) {
			return nullptr;
		}
		uint8_t* copy = new (arena()) uint8_t[bytes];
		serializeBytes(copy, bytes);
		return copy;
	}

	// One-shot helper: deserializes a T from sr and returns it.
	template <class T, class VersionOptions>
	static T fromStringRef(StringRef sr, VersionOptions vo) {
		T result;
		BinaryReader reader(sr, vo);
		reader >> result;
		return result;
	}

	ProtocolVersion protocolVersion() const { return m_protocolVersion; }
	void setProtocolVersion(ProtocolVersion pv) { m_protocolVersion = pv; }

	// Asserts that every byte has been consumed.
	void assertEnd() const { ASSERT(begin == end); }

	bool empty() const { return begin == end; }

	// Types with a file identifier go through the object reader when one is present; everything
	// else is read with load().
	template <class T>
	void deserialize(T& t) {
		if constexpr (HasFileIdentifier<T>::value) {
			if (objectReader.present()) {
				objectReader.get().deserialize(t);
				return;
			}
		}
		load(*this, t);
	}

private:
	// Applies the version options, then creates the object reader at the current read position
	// if the resulting protocol version has the object-serializer flag.
	template <class VersionOptions>
	void readVersion(VersionOptions vo) {
		vo.read(*this);
		if (m_protocolVersion.hasObjectSerializerFlag()) {
			objectReader = ObjectReader(reinterpret_cast<const uint8_t*>(begin), AssumeVersion(m_protocolVersion));
		}
	}
};
|
|
|
|
// One link in a singly linked chain of outgoing byte buffers, tracking how much has been
// written into it and how much of that has been sent.
class SendBuffer {
protected:
	uint8_t* _data;

public:
	SendBuffer* next;
	int bytes_written, bytes_sent;

	inline uint8_t* data() { return _data; }
	inline uint8_t const* data() const { return _data; }

	// Bytes written into the buffer that have not been sent yet.
	int bytes_unsent() const { return bytes_written - bytes_sent; }
};
|
|
|
|
struct PacketBuffer : SendBuffer {
|
|
private:
|
|
int reference_count;
|
|
uint32_t const size_;
|
|
static constexpr size_t PACKET_BUFFER_MIN_SIZE = 16384;
|
|
static constexpr size_t PACKET_BUFFER_OVERHEAD = 40;
|
|
|
|
public:
|
|
double const enqueue_time;
|
|
|
|
size_t size() const { return size_; }
|
|
|
|
private:
|
|
explicit PacketBuffer(size_t size) : reference_count(1), size_(size), enqueue_time(g_network->now()) {
|
|
next = nullptr;
|
|
bytes_written = bytes_sent = 0;
|
|
_data = reinterpret_cast<uint8_t*>(this + 1);
|
|
static_assert(sizeof(PacketBuffer) == PACKET_BUFFER_OVERHEAD);
|
|
}
|
|
|
|
public:
|
|
static PacketBuffer* create(size_t size = 0) {
|
|
size = std::max(size, PACKET_BUFFER_MIN_SIZE - PACKET_BUFFER_OVERHEAD);
|
|
if (size == PACKET_BUFFER_MIN_SIZE - PACKET_BUFFER_OVERHEAD) {
|
|
return new (FastAllocator<PACKET_BUFFER_MIN_SIZE>::allocate()) PacketBuffer{ size };
|
|
}
|
|
uint8_t* mem = new uint8_t[size + PACKET_BUFFER_OVERHEAD];
|
|
return new (mem) PacketBuffer{ size };
|
|
}
|
|
PacketBuffer* nextPacketBuffer() { return static_cast<PacketBuffer*>(next); }
|
|
void addref() { ++reference_count; }
|
|
void delref() {
|
|
if (!--reference_count) {
|
|
if (size_ == PACKET_BUFFER_MIN_SIZE - PACKET_BUFFER_OVERHEAD) {
|
|
FastAllocator<PACKET_BUFFER_MIN_SIZE>::release(this);
|
|
} else {
|
|
delete[] this;
|
|
}
|
|
}
|
|
}
|
|
int bytes_unwritten() const { return size_ - bytes_written; }
|
|
};
|
|
|
|
struct PacketWriter {
|
|
static constexpr int isDeserializing = 0;
|
|
static constexpr bool isSerializing = true;
|
|
typedef PacketWriter WRITER;
|
|
|
|
PacketBuffer* buffer;
|
|
struct ReliablePacket*
|
|
reliable; // nullptr if this is unreliable; otherwise the last entry in the ReliablePacket::cont chain
|
|
int length;
|
|
ProtocolVersion m_protocolVersion;
|
|
|
|
// reliable is nullptr if this is an unreliable packet, or points to a ReliablePacket. PacketWriter is responsible
|
|
// for filling in reliable->buffer, ->cont, ->begin, and ->end, but not ->prev or ->next.
|
|
template <class VersionOptions>
|
|
PacketWriter(PacketBuffer* buf, ReliablePacket* reliable, VersionOptions vo) {
|
|
init(buf, reliable);
|
|
vo.read(*this);
|
|
}
|
|
|
|
void serializeBytes(const void* data, int bytes) {
|
|
if (bytes <= buffer->bytes_unwritten()) {
|
|
memcpy(buffer->data() + buffer->bytes_written, data, bytes);
|
|
buffer->bytes_written += bytes;
|
|
} else {
|
|
serializeBytesAcrossBoundary(data, bytes);
|
|
}
|
|
}
|
|
void writeAhead(int bytes, struct SplitBuffer*);
|
|
PacketBuffer* finish();
|
|
int size() const { return length; }
|
|
|
|
void serializeBytes(StringRef bytes) { serializeBytes(bytes.begin(), bytes.size()); }
|
|
template <class T>
|
|
void serializeBinaryItem(const T& t) {
|
|
static_assert(is_binary_serializable<T>::value,
|
|
"Object must be binary serializable, see BINARY_SERIALIZABLE macro");
|
|
if (sizeof(T) <= buffer->bytes_unwritten()) {
|
|
*(T*)(buffer->data() + buffer->bytes_written) = t;
|
|
buffer->bytes_written += sizeof(T);
|
|
} else {
|
|
serializeBytesAcrossBoundary(&t, sizeof(T));
|
|
}
|
|
}
|
|
ProtocolVersion protocolVersion() const { return m_protocolVersion; }
|
|
void setProtocolVersion(ProtocolVersion pv) { m_protocolVersion = pv; }
|
|
|
|
private:
|
|
void serializeBytesAcrossBoundary(const void* data, int bytes);
|
|
void nextBuffer(size_t size = 0 /* downstream it will default to at least 4k minus some padding */);
|
|
uint8_t* writeBytes(size_t size) {
|
|
if (size > buffer->bytes_unwritten()) {
|
|
nextBuffer(size);
|
|
ASSERT(buffer->size() >= size);
|
|
}
|
|
uint8_t* result = buffer->data() + buffer->bytes_written;
|
|
buffer->bytes_written += size;
|
|
return result;
|
|
}
|
|
|
|
template <class, class>
|
|
friend class MakeSerializeSource;
|
|
|
|
void init(PacketBuffer* buf, ReliablePacket* reliable);
|
|
};
|
|
|
|
struct ISerializeSource {
|
|
virtual void serializePacketWriter(PacketWriter&) const = 0;
|
|
virtual void serializeObjectWriter(ObjectWriter&) const = 0;
|
|
};
|
|
|
|
template <class T, class V>
|
|
struct MakeSerializeSource : ISerializeSource {
|
|
using value_type = V;
|
|
void serializePacketWriter(PacketWriter& w) const override {
|
|
ObjectWriter writer([&](size_t size) { return w.writeBytes(size); }, AssumeVersion(w.protocolVersion()));
|
|
writer.serialize(get()); // Writes directly into buffer supplied by |w|
|
|
}
|
|
virtual value_type const& get() const = 0;
|
|
};
|
|
|
|
template <class T>
|
|
struct SerializeSource : MakeSerializeSource<SerializeSource<T>, T> {
|
|
using value_type = T;
|
|
T const& value;
|
|
SerializeSource(T const& value) : value(value) {}
|
|
void serializeObjectWriter(ObjectWriter& w) const override { w.serialize(value); }
|
|
T const& get() const override { return value; }
|
|
};
|
|
|
|
#endif
|