Remove USE_OBJECT_SERIALIZER knob
This commit is contained in:
parent
36d87c60b5
commit
1248d2b8b4
|
@ -511,7 +511,7 @@ ACTOR Future<Void> asyncDeserializeClusterInterface(Reference<AsyncVar<Value>> s
|
|||
Reference<AsyncVar<Optional<ClusterInterface>>> outKnownLeader) {
|
||||
state Reference<AsyncVar<Optional<ClusterControllerClientInterface>>> knownLeader(
|
||||
new AsyncVar<Optional<ClusterControllerClientInterface>>{});
|
||||
state Future<Void> deserializer = asyncDeserialize(serializedInfo, knownLeader, FLOW_KNOBS->USE_OBJECT_SERIALIZER);
|
||||
state Future<Void> deserializer = asyncDeserialize(serializedInfo, knownLeader);
|
||||
loop {
|
||||
choose {
|
||||
when(wait(deserializer)) { UNSTOPPABLE_ASSERT(false); }
|
||||
|
@ -655,15 +655,10 @@ ACTOR Future<Void> monitorLeaderForProxies( Key clusterKey, vector<NetworkAddres
|
|||
}
|
||||
|
||||
if (leader.get().first.serializedInfo.size()) {
|
||||
if (FLOW_KNOBS->USE_OBJECT_SERIALIZER) {
|
||||
ObjectReader reader(leader.get().first.serializedInfo.begin(), IncludeVersion());
|
||||
ClusterControllerClientInterface res;
|
||||
reader.deserialize(res);
|
||||
knownLeader->set(res);
|
||||
} else {
|
||||
ClusterControllerClientInterface res = BinaryReader::fromStringRef<ClusterControllerClientInterface>( leader.get().first.serializedInfo, IncludeVersion() );
|
||||
knownLeader->set(res);
|
||||
}
|
||||
ObjectReader reader(leader.get().first.serializedInfo.begin(), IncludeVersion());
|
||||
ClusterControllerClientInterface res;
|
||||
reader.deserialize(res);
|
||||
knownLeader->set(res);
|
||||
}
|
||||
}
|
||||
wait( nomineeChange.onTrigger() || allActors );
|
||||
|
|
|
@ -71,7 +71,7 @@ template <class LeaderInterface>
|
|||
struct LeaderDeserializer {
|
||||
Future<Void> operator()(const Reference<AsyncVar<Value>>& serializedInfo,
|
||||
const Reference<AsyncVar<Optional<LeaderInterface>>>& outKnownLeader) {
|
||||
return asyncDeserialize(serializedInfo, outKnownLeader, FLOW_KNOBS->USE_OBJECT_SERIALIZER);
|
||||
return asyncDeserialize(serializedInfo, outKnownLeader);
|
||||
}
|
||||
};
|
||||
|
||||
|
|
|
@ -578,9 +578,7 @@ void Peer::prependConnectPacket() {
|
|||
|
||||
pkt.connectPacketLength = sizeof(pkt) - sizeof(pkt.connectPacketLength);
|
||||
pkt.protocolVersion = currentProtocolVersion;
|
||||
if (FLOW_KNOBS->USE_OBJECT_SERIALIZER) {
|
||||
pkt.protocolVersion.addObjectSerializerFlag();
|
||||
}
|
||||
pkt.protocolVersion.addObjectSerializerFlag();
|
||||
pkt.connectionId = transport->transportId;
|
||||
|
||||
PacketBuffer* pb_first = PacketBuffer::create();
|
||||
|
@ -655,14 +653,10 @@ ACTOR static void deliver(TransportData* self, Endpoint destination, ArenaReader
|
|||
if (receiver) {
|
||||
try {
|
||||
g_currentDeliveryPeerAddress = destination.addresses;
|
||||
if (FLOW_KNOBS->USE_OBJECT_SERIALIZER) {
|
||||
StringRef data = reader.arenaReadAll();
|
||||
ASSERT(data.size() > 8);
|
||||
ArenaObjectReader objReader(reader.arena(), reader.arenaReadAll(), AssumeVersion(reader.protocolVersion()));
|
||||
receiver->receive(objReader);
|
||||
} else {
|
||||
receiver->receive(reader);
|
||||
}
|
||||
StringRef data = reader.arenaReadAll();
|
||||
ASSERT(data.size() > 8);
|
||||
ArenaObjectReader objReader(reader.arena(), reader.arenaReadAll(), AssumeVersion(reader.protocolVersion()));
|
||||
receiver->receive(objReader);
|
||||
g_currentDeliveryPeerAddress = { NetworkAddress() };
|
||||
} catch (Error& e) {
|
||||
g_currentDeliveryPeerAddress = {NetworkAddress()};
|
||||
|
@ -864,8 +858,8 @@ ACTOR static Future<Void> connectionReader(
|
|||
serializer(pktReader, pkt);
|
||||
|
||||
uint64_t connectionId = pkt.connectionId;
|
||||
if((FLOW_KNOBS->USE_OBJECT_SERIALIZER != 0) != pkt.protocolVersion.hasObjectSerializerFlag() ||
|
||||
!pkt.protocolVersion.isCompatible(currentProtocolVersion)) {
|
||||
if (!pkt.protocolVersion.hasObjectSerializerFlag() ||
|
||||
!pkt.protocolVersion.isCompatible(currentProtocolVersion)) {
|
||||
incompatibleProtocolVersionNewer = pkt.protocolVersion > currentProtocolVersion;
|
||||
NetworkAddress addr = pkt.canonicalRemotePort
|
||||
? NetworkAddress(pkt.canonicalRemoteIp(), pkt.canonicalRemotePort)
|
||||
|
@ -896,8 +890,7 @@ ACTOR static Future<Void> connectionReader(
|
|||
// Older versions expected us to hang up. It may work even if we don't hang up here, but it's safer to keep the old behavior.
|
||||
throw incompatible_protocol_version();
|
||||
}
|
||||
}
|
||||
else {
|
||||
} else {
|
||||
compatible = true;
|
||||
TraceEvent("ConnectionEstablished", conn->getDebugID())
|
||||
.suppressFor(1.0)
|
||||
|
@ -1161,15 +1154,9 @@ static void sendLocal( TransportData* self, ISerializeSource const& what, const
|
|||
// SOMEDAY: Would it be better to avoid (de)serialization by doing this check in flow?
|
||||
|
||||
Standalone<StringRef> copy;
|
||||
if (FLOW_KNOBS->USE_OBJECT_SERIALIZER) {
|
||||
ObjectWriter wr(AssumeVersion(currentProtocolVersion));
|
||||
what.serializeObjectWriter(wr);
|
||||
copy = wr.toStringRef();
|
||||
} else {
|
||||
BinaryWriter wr( AssumeVersion(currentProtocolVersion) );
|
||||
what.serializeBinaryWriter(wr);
|
||||
copy = wr.toValue();
|
||||
}
|
||||
ObjectWriter wr(AssumeVersion(currentProtocolVersion));
|
||||
what.serializeObjectWriter(wr);
|
||||
copy = wr.toStringRef();
|
||||
#if VALGRIND
|
||||
VALGRIND_CHECK_MEM_IS_DEFINED(copy.begin(), copy.size());
|
||||
#endif
|
||||
|
@ -1208,7 +1195,7 @@ static ReliablePacket* sendPacket( TransportData* self, Reference<Peer> peer, IS
|
|||
|
||||
wr.writeAhead(packetInfoSize , &packetInfoBuffer);
|
||||
wr << destination.token;
|
||||
what.serializePacketWriter(wr, FLOW_KNOBS->USE_OBJECT_SERIALIZER);
|
||||
what.serializePacketWriter(wr);
|
||||
pb = wr.finish();
|
||||
len = wr.size() - packetInfoSize;
|
||||
|
||||
|
|
|
@ -34,19 +34,11 @@ ACTOR template <class T>
|
|||
void networkSender(Future<T> input, Endpoint endpoint) {
|
||||
try {
|
||||
T value = wait(input);
|
||||
if (FLOW_KNOBS->USE_OBJECT_SERIALIZER) {
|
||||
FlowTransport::transport().sendUnreliable(SerializeSource<ErrorOr<EnsureTable<T>>>(value), endpoint, false);
|
||||
} else {
|
||||
FlowTransport::transport().sendUnreliable(SerializeBoolAnd<T>(true, value), endpoint, false);
|
||||
}
|
||||
FlowTransport::transport().sendUnreliable(SerializeSource<ErrorOr<EnsureTable<T>>>(value), endpoint, false);
|
||||
} catch (Error& err) {
|
||||
// if (err.code() == error_code_broken_promise) return;
|
||||
ASSERT(err.code() != error_code_actor_cancelled);
|
||||
if (FLOW_KNOBS->USE_OBJECT_SERIALIZER) {
|
||||
FlowTransport::transport().sendUnreliable(SerializeSource<ErrorOr<EnsureTable<T>>>(err), endpoint, false);
|
||||
} else {
|
||||
FlowTransport::transport().sendUnreliable(SerializeBoolAnd<Error>(false, err), endpoint, false);
|
||||
}
|
||||
FlowTransport::transport().sendUnreliable(SerializeSource<ErrorOr<EnsureTable<T>>>(err), endpoint, false);
|
||||
}
|
||||
}
|
||||
#include "flow/unactorcompiler.h"
|
||||
|
|
|
@ -61,11 +61,9 @@ Future<Void> tryBecomeLeader( ServerCoordinators const& coordinators,
|
|||
Reference<AsyncVar<ClusterControllerPriorityInfo>> const& asyncPriorityInfo)
|
||||
{
|
||||
Reference<AsyncVar<Value>> serializedInfo(new AsyncVar<Value>);
|
||||
Future<Void> m = tryBecomeLeaderInternal(
|
||||
coordinators,
|
||||
FLOW_KNOBS->USE_OBJECT_SERIALIZER ? ObjectWriter::toValue(proposedInterface, IncludeVersion()) : BinaryWriter::toValue(proposedInterface, IncludeVersion()),
|
||||
serializedInfo, hasConnected, asyncPriorityInfo);
|
||||
return m || asyncDeserialize(serializedInfo, outKnownLeader, FLOW_KNOBS->USE_OBJECT_SERIALIZER);
|
||||
Future<Void> m = tryBecomeLeaderInternal(coordinators, ObjectWriter::toValue(proposedInterface, IncludeVersion()),
|
||||
serializedInfo, hasConnected, asyncPriorityInfo);
|
||||
return m || asyncDeserialize(serializedInfo, outKnownLeader);
|
||||
}
|
||||
|
||||
#ifndef __INTEL_COMPILER
|
||||
|
|
|
@ -69,7 +69,6 @@ FlowKnobs::FlowKnobs(bool randomize, bool isSimulated) {
|
|||
init( RECONNECTION_TIME_GROWTH_RATE, 1.2 );
|
||||
init( RECONNECTION_RESET_TIME, 5.0 );
|
||||
init( CONNECTION_ACCEPT_DELAY, 0.5 );
|
||||
init( USE_OBJECT_SERIALIZER, 1 );
|
||||
init( TOO_MANY_CONNECTIONS_CLOSED_RESET_DELAY, 5.0 );
|
||||
init( TOO_MANY_CONNECTIONS_CLOSED_TIMEOUT, 20.0 );
|
||||
init( PEER_UNAVAILABLE_FOR_LONG_TIME_TIMEOUT, 3600.0 );
|
||||
|
|
|
@ -88,7 +88,6 @@ public:
|
|||
double RECONNECTION_TIME_GROWTH_RATE;
|
||||
double RECONNECTION_RESET_TIME;
|
||||
double CONNECTION_ACCEPT_DELAY;
|
||||
int USE_OBJECT_SERIALIZER;
|
||||
|
||||
int TLS_CERT_REFRESH_DELAY_SECONDS;
|
||||
double TLS_SERVER_CONNECTION_THROTTLE_TIMEOUT;
|
||||
|
|
|
@ -718,17 +718,15 @@ private:
|
|||
}
|
||||
};
|
||||
|
||||
ACTOR template <class T> Future<Void> asyncDeserialize( Reference<AsyncVar<Standalone<StringRef>>> input, Reference<AsyncVar<Optional<T>>> output, bool useObjSerializer ) {
|
||||
ACTOR template <class T>
|
||||
Future<Void> asyncDeserialize(Reference<AsyncVar<Standalone<StringRef>>> input,
|
||||
Reference<AsyncVar<Optional<T>>> output) {
|
||||
loop {
|
||||
if (input->get().size()) {
|
||||
if (useObjSerializer) {
|
||||
ObjectReader reader(input->get().begin(), IncludeVersion());
|
||||
T res;
|
||||
reader.deserialize(res);
|
||||
output->set(res);
|
||||
} else {
|
||||
output->set( BinaryReader::fromStringRef<T>( input->get(), IncludeVersion() ) );
|
||||
}
|
||||
ObjectReader reader(input->get().begin(), IncludeVersion());
|
||||
T res;
|
||||
reader.deserialize(res);
|
||||
output->set(res);
|
||||
} else
|
||||
output->set( Optional<T>() );
|
||||
wait( input->onChange() );
|
||||
|
|
|
@ -773,7 +773,7 @@ private:
|
|||
};
|
||||
|
||||
struct ISerializeSource {
|
||||
virtual void serializePacketWriter(PacketWriter&, bool useObjectSerializer) const = 0;
|
||||
virtual void serializePacketWriter(PacketWriter&) const = 0;
|
||||
virtual void serializeBinaryWriter(BinaryWriter&) const = 0;
|
||||
virtual void serializeObjectWriter(ObjectWriter&) const = 0;
|
||||
};
|
||||
|
@ -781,13 +781,9 @@ struct ISerializeSource {
|
|||
template <class T, class V>
|
||||
struct MakeSerializeSource : ISerializeSource {
|
||||
using value_type = V;
|
||||
virtual void serializePacketWriter(PacketWriter& w, bool useObjectSerializer) const {
|
||||
if (useObjectSerializer) {
|
||||
ObjectWriter writer([&](size_t size) { return w.writeBytes(size); }, AssumeVersion(w.protocolVersion()));
|
||||
writer.serialize(get()); // Writes directly into buffer supplied by |w|
|
||||
} else {
|
||||
static_cast<T const*>(this)->serialize(w);
|
||||
}
|
||||
virtual void serializePacketWriter(PacketWriter& w) const {
|
||||
ObjectWriter writer([&](size_t size) { return w.writeBytes(size); }, AssumeVersion(w.protocolVersion()));
|
||||
writer.serialize(get()); // Writes directly into buffer supplied by |w|
|
||||
}
|
||||
virtual void serializeBinaryWriter(BinaryWriter& w) const { static_cast<T const*>(this)->serialize(w); }
|
||||
virtual value_type const& get() const = 0;
|
||||
|
|
Loading…
Reference in New Issue