UDP implementation (untested)

This commit is contained in:
Markus Pilman 2020-08-06 14:06:50 -06:00
parent 918fce74e9
commit 8976694ba1
9 changed files with 744 additions and 26 deletions

View File

@ -84,12 +84,6 @@ using std::pair;
namespace {
ACTOR template <class T, class Fun>
Future<T> runAfter(Future<T> in, Fun func) {
T res = wait(in);
return func(res);
}
template <class Interface, class Request>
Future<REPLY_TYPE(Request)> loadBalance(
DatabaseContext* ctx, const Reference<LocationInfo> alternatives, RequestStream<Request> Interface::*channel,
@ -99,13 +93,14 @@ Future<REPLY_TYPE(Request)> loadBalance(
if (alternatives->hasCaches) {
return loadBalance(alternatives->locations(), channel, request, taskID, atMostOnce, model);
}
return runAfter(loadBalance(alternatives->locations(), channel, request, taskID, atMostOnce, model),
[ctx](auto res) {
if (res.cached) {
ctx->updateCache.trigger();
}
return res;
});
return fmap(
[ctx](auto const& res) {
if (res.cached) {
ctx->updateCache.trigger();
}
return res;
},
loadBalance(alternatives->locations(), channel, request, taskID, atMostOnce, model));
}
} // namespace

View File

@ -19,8 +19,14 @@
*/
#include <cinttypes>
#include <deque>
#include <memory>
#include <sys/socket.h>
#include <vector>
#include "fdbrpc/simulator.h"
#include "flow/ActorCollection.h"
#include "flow/IRandom.h"
#include "flow/IThreadPool.h"
#include "flow/Util.h"
#include "fdbrpc/IAsyncFile.h"
@ -29,6 +35,8 @@
#include "flow/crc32c.h"
#include "fdbrpc/TraceFileIO.h"
#include "flow/FaultInjection.h"
#include "flow/flow.h"
#include "flow/genericactors.actor.h"
#include "flow/network.h"
#include "flow/TLSConfig.actor.h"
#include "fdbrpc/Net2FileSystem.h"
@ -824,6 +832,9 @@ public:
((Sim2Listener*)peerp->getListener(toAddr).getPtr())->incomingConnection( 0.5*deterministicRandom()->random01(), Reference<IConnection>(peerc) );
return onConnect( ::delay(0.5*deterministicRandom()->random01()), myc );
}
Future<Reference<IUDPSocket>> createUDPSocket(NetworkAddress toAddr) override;
Future<Reference<IUDPSocket>> createUDPSocket(bool isV6 = false) override;
virtual Future<std::vector<NetworkAddress>> resolveTCPEndpoint( std::string host, std::string service) {
throw lookup_failed();
}
@ -1725,6 +1736,196 @@ public:
int yield_limit; // how many more times yield may return false before next returning true
};
class UDPSimSocket : public IUDPSocket, ReferenceCounted<UDPSimSocket> {
using Packet = std::shared_ptr<std::vector<uint8_t>>;
UID id;
ISimulator::ProcessInfo* process;
Optional<NetworkAddress> peerAddress;
Optional<ISimulator::ProcessInfo*> peerProcess;
Optional<Reference<UDPSimSocket>> peerSocket;
ActorCollection actors;
Promise<Void> closed;
std::deque<std::pair<NetworkAddress, Packet>> recvBuffer;
AsyncVar<int64_t> writtenPackets;
NetworkAddress _localAddress;
bool randomDropPacket() {
auto res = deterministicRandom()->random01() < .000001;
TEST(res); // UDP packet drop
return res;
}
bool isClosed() const { return closed.getFuture().isReady(); }
Future<Void> onClosed() const { return closed.getFuture(); }
ACTOR static Future<Void> cleanupPeerSocket(UDPSimSocket* self) {
wait(self->peerSocket.get()->onClosed());
self->peerSocket.reset();
return Void();
}
ACTOR static Future<Void> send(UDPSimSocket* self, Reference<UDPSimSocket> peerSocket, uint8_t const* begin,
uint8_t const* end) {
state Packet packet(std::make_shared<std::vector<uint8_t>>());
packet->resize(begin - end);
std::copy(begin, end, packet->begin());
wait( delay( .002 * deterministicRandom()->random01() ) );
peerSocket->recvBuffer.emplace_back(self->_localAddress, std::move(packet));
peerSocket->writtenPackets.set(self->writtenPackets.get() + 1);
return Void();
}
ACTOR static Future<int> receiveFrom(UDPSimSocket* self, uint8_t* begin, uint8_t* end, NetworkAddress* sender) {
state TaskPriority currentTaskID = g_sim2.getCurrentTask();
wait(self->writtenPackets.onChange());
wait(g_sim2.onProcess(self->process, currentTaskID));
auto packet = self->recvBuffer.front().second;
int sz = packet->size();
ASSERT(sz <= end - begin);
if (sender) {
*sender = self->recvBuffer.front().first;
}
std::copy(packet->begin(), packet->end(), begin);
self->recvBuffer.pop_front();
return sz;
}
public:
UDPSimSocket(NetworkAddress const& localAddress, Optional<NetworkAddress> const& peerAddress)
: id(deterministicRandom()->randomUniqueID()), process(g_simulator.getCurrentProcess()), peerAddress(peerAddress),
actors(false), _localAddress(localAddress) {
process->boundUDPSockets.emplace(localAddress, this);
}
~UDPSimSocket() {
if (!closed.getFuture().isReady()) {
close();
closed.send(Void());
}
actors.clear(true);
}
void close() override { process->boundUDPSockets.erase(_localAddress); }
UID getDebugID() const override { return id; }
void addref() override { ReferenceCounted<UDPSimSocket>::addref(); }
void delref() override { ReferenceCounted<UDPSimSocket>::delref(); }
Future<int> send(uint8_t const* begin, uint8_t const* end) override {
int sz = int(end - begin);
auto res = fmap([sz](Void){ return sz; }, delay(0.0));
ASSERT(sz <= IUDPSocket::MAX_PACKET_SIZE);
ASSERT(peerAddress.present());
if (!peerProcess.present()) {
auto iter = g_sim2.addressMap.find(peerAddress.get());
if (iter == g_sim2.addressMap.end()) {
return res;
}
peerProcess = iter->second;
}
if (!peerSocket.present() || peerSocket.get()->isClosed()) {
peerSocket.reset();
auto iter = peerProcess.get()->boundUDPSockets.find(peerAddress.get());
if (iter == peerProcess.get()->boundUDPSockets.end()) {
return fmap([sz](Void){ return sz; }, delay(0.0));
}
peerSocket = iter->second.castTo<UDPSimSocket>();
// the notation of leaking connections doesn't make much sense in the context of UDP
// so we simply handle those in the simulator
actors.add(cleanupPeerSocket(this));
}
if (randomDropPacket()) {
return res;
}
actors.add(send(this, peerSocket.get(), begin, end));
return res;
}
Future<int> sendTo(uint8_t const* begin, uint8_t const* end, NetworkAddress const& peer) override {
int sz = int(end - begin);
auto res = fmap([sz](Void){ return sz; }, delay(0.0));
ASSERT(sz <= MAX_PACKET_SIZE);
ISimulator::ProcessInfo* peerProcess = nullptr;
Reference<UDPSimSocket> peerSocket;
{
auto iter = g_sim2.addressMap.find(peer);
if (iter == g_sim2.addressMap.end()) {
return res;
}
peerProcess = iter->second;
}
{
auto iter = peerProcess->boundUDPSockets.find(peer);
if (iter == peerProcess->boundUDPSockets.end()) {
return res;
}
peerSocket = iter->second.castTo<UDPSimSocket>();
}
actors.add(send(this, peerSocket, begin, end));
return res;
}
Future<int> receive(uint8_t* begin, uint8_t* end) override {
return receiveFrom(begin, end, nullptr);
}
Future<int> receiveFrom(uint8_t* begin, uint8_t* end, NetworkAddress* sender) override {
if (!recvBuffer.empty()) {
auto buf = recvBuffer.front().second;
if (sender) {
*sender = recvBuffer.front().first;
}
int sz = buf->size();
ASSERT(sz <= end - begin);
std::copy(buf->begin(), buf->end(), begin);
auto res = fmap([sz](Void){ return sz; }, delay(0.0));
recvBuffer.pop_front();
return res;
}
return receiveFrom(this, begin, end, sender);
}
void bind(NetworkAddress const& addr) override {
process->boundUDPSockets.erase(_localAddress);
process->boundUDPSockets.emplace(addr, Reference<UDPSimSocket>::addRef(this));
_localAddress = addr;
}
NetworkAddress localAddress() const override {
return _localAddress;
}
};
Future<Reference<IUDPSocket>> Sim2::createUDPSocket(NetworkAddress toAddr) {
NetworkAddress localAddress;
auto process = g_simulator.getCurrentProcess();
if (process->address.ip.isV6()) {
IPAddress::IPAddressStore store = process->address.ip.toV6();
uint16_t* ipParts = (uint16_t*)store.data();
ipParts[7] += deterministicRandom()->randomInt(0, 256);
localAddress.ip = IPAddress(store);
} else {
localAddress.ip = IPAddress(process->address.ip.toV4() + deterministicRandom()->randomInt(0, 256));
}
localAddress.port = deterministicRandom()->randomInt(40000, 60000);
return Reference<IUDPSocket>(new UDPSimSocket(localAddress, toAddr));
}
Future<Reference<IUDPSocket>> Sim2::createUDPSocket(bool isV6) {
NetworkAddress localAddress;
auto process = g_simulator.getCurrentProcess();
if (process->address.ip.isV6() == isV6) {
localAddress = process->address;
} else {
ASSERT(process->addresses.secondaryAddress.present() &&
process->addresses.secondaryAddress.get().isV6() == isV6);
localAddress = process->addresses.secondaryAddress.get();
}
if (localAddress.ip.isV6()) {
IPAddress::IPAddressStore store = localAddress.ip.toV6();
uint16_t* ipParts = (uint16_t*)store.data();
ipParts[7] += deterministicRandom()->randomInt(0, 256);
localAddress.ip = IPAddress(store);
} else {
localAddress.ip = IPAddress(localAddress.ip.toV4() + deterministicRandom()->randomInt(0, 256));
}
localAddress.port = deterministicRandom()->randomInt(40000, 60000);
return Reference<IUDPSocket>(new UDPSimSocket(localAddress, Optional<NetworkAddress>{}));
}
void startNewSimulator() {
ASSERT( !g_network );
g_network = g_pSimulator = new Sim2();

View File

@ -55,6 +55,7 @@ public:
ProcessClass startingClass;
TDMetricCollection tdmetrics;
std::map<NetworkAddress, Reference<IListener>> listenerMap;
std::map<NetworkAddress, Reference<IUDPSocket>> boundUDPSockets;
bool failed;
bool excluded;
bool cleared;

View File

@ -203,6 +203,7 @@ set(FDBSERVER_SRCS
workloads/UnitPerf.actor.cpp
workloads/UnitTests.actor.cpp
workloads/Unreadable.actor.cpp
workloads/UDPWorkload.actor.cpp
workloads/VersionStamp.actor.cpp
workloads/WatchAndWait.actor.cpp
workloads/Watches.actor.cpp

View File

@ -0,0 +1,266 @@
/*
* UDPWorkload.actor.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2020 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "fdbclient/FDBTypes.h"
#include "fdbclient/ReadYourWrites.h"
#include "fdbserver/workloads/workloads.actor.h"
#include "flow/ActorCollection.h"
#include "flow/Arena.h"
#include "flow/Error.h"
#include "flow/IRandom.h"
#include "flow/flow.h"
#include "flow/network.h"
#include "flow/actorcompiler.h" // has to be last include
#include "flow/serialize.h"
#include <functional>
#include <limits>
#include <unordered_map>
#include <vector>
#include <memory>
#include <functional>
namespace {
struct UDPWorkload : TestWorkload {
constexpr static const char* name = "UDPWorkload";
// config
Key keyPrefix;
double runFor;
int minPort, maxPort;
// members
NetworkAddress serverAddress;
Reference<IUDPSocket> serverSocket;
std::unordered_map<NetworkAddress, unsigned> sent, received, acked, successes;
PromiseStream<NetworkAddress> toAck;
UDPWorkload(WorkloadContext const& wcx) : TestWorkload(wcx) {
keyPrefix = getOption(options, "keyPrefix"_sr, "/udp/"_sr);
runFor = getOption(options, "runFor"_sr, 60.0);
minPort = getOption(options, "minPort"_sr, 5000);
maxPort = getOption(options, "minPort"_sr, 6000);
for (auto p : { minPort, maxPort }) {
ASSERT(p > 0 && p < std::numeric_limits<unsigned short>::max());
}
}
virtual std::string description() { return name; }
ACTOR static Future<Void> _setup(UDPWorkload* self, Database cx) {
state NetworkAddress localAddress(g_network->getLocalAddress().ip,
deterministicRandom()->randomInt(self->minPort, self->maxPort + 1), true,
false);
state Key key = self->keyPrefix.withSuffix(BinaryWriter::toValue(self->clientId, Unversioned()));
state Value serializedLocalAddress = BinaryWriter::toValue(localAddress, IncludeVersion());
state ReadYourWritesTransaction tr(cx);
Reference<IUDPSocket> s = wait(INetworkConnections::net()->createUDPSocket(localAddress.isV6()));
self->serverSocket = std::move(s);
self->serverSocket->bind(localAddress);
self->serverAddress = localAddress;
loop {
try {
Optional<Value> v = wait(tr.get(key));
if (v.present()) {
return Void();
}
tr.set(key, serializedLocalAddress);
wait(tr.commit());
} catch (Error& e) {
wait(tr.onError(e));
}
}
}
virtual Future<Void> setup(Database const& cx) { return _setup(this, cx); }
class Message {
int _type = 0;
public:
enum class Type : uint8_t { PING, PONG };
Message() {}
explicit Message(Type t) {
switch (t) {
case Type::PING:
_type = 0;
break;
case Type::PONG:
_type = 1;
break;
default:
UNSTOPPABLE_ASSERT(false);
}
}
Type type() const {
switch (_type) {
case 0:
return Type::PING;
case 1:
return Type::PONG;
default:
UNSTOPPABLE_ASSERT(false);
}
}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, _type);
}
};
static Message ping() { return Message{ Message::Type::PING }; }
static Message pong() { return Message{ Message::Type::PONG }; }
ACTOR static Future<Void> _receiver(UDPWorkload* self) {
state Standalone<StringRef> packetString = makeString(IUDPSocket::MAX_PACKET_SIZE);
state uint8_t* packet = mutateString(packetString);
state NetworkAddress peerAddress;
loop {
int sz = wait(self->serverSocket->receiveFrom(packet, packet + IUDPSocket::MAX_PACKET_SIZE, &peerAddress));
auto msg = BinaryReader::fromStringRef<Message>(packetString.substr(0, sz), Unversioned());
if (msg.type() == Message::Type::PONG) {
self->successes[peerAddress] += 1;
} else if (msg.type() == Message::Type::PING) {
self->received[peerAddress] += 1;
self->toAck.send(peerAddress);
} else {
UNSTOPPABLE_ASSERT(false);
}
}
}
ACTOR static Future<Void> serverSender(UDPWorkload* self, std::vector<NetworkAddress>* remotes) {
state Standalone<StringRef> packetString;
state NetworkAddress peer;
loop {
choose {
when(wait(delay(0.1))) {
peer = deterministicRandom()->randomChoice(*remotes);
packetString = BinaryWriter::toValue(Message{ Message::Type::PING }, Unversioned());
self->sent[peer] += 1;
}
when(NetworkAddress p = waitNext(self->toAck.getFuture())) {
peer = p;
packetString = BinaryWriter::toValue(ping(), Unversioned());
self->acked[peer] += 1;
}
}
int res = wait(self->serverSocket->sendTo(packetString.begin(), packetString.end(), peer));
ASSERT(res == packetString.size());
}
}
ACTOR static Future<Void> clientReceiver(UDPWorkload* self, Reference<IUDPSocket> socket, Future<Void> done) {
state Standalone<StringRef> packetString = makeString(IUDPSocket::MAX_PACKET_SIZE);
state uint8_t* packet = mutateString(packetString);
state NetworkAddress peer;
state Future<Void> finished = Never();
loop {
choose {
when(int sz = wait(socket->receiveFrom(packet, packet + IUDPSocket::MAX_PACKET_SIZE, &peer))) {
auto res = BinaryReader::fromStringRef<Message>(packetString.substr(0, sz), Unversioned());
ASSERT(res.type() == Message::Type::PONG);
self->successes[peer] += 1;
}
when(wait(done)) {
finished = delay(1.0);
done = Never();
}
when(wait(finished)) { return Void(); }
}
}
}
ACTOR static Future<Void> clientSender(UDPWorkload* self, std::vector<NetworkAddress>* remotes) {
state AsyncVar<Reference<IUDPSocket>> socket;
state Standalone<StringRef> sendString;
state ActorCollection actors(false);
state NetworkAddress peer;
loop {
choose {
when(wait(delay(0.1))) {}
when(wait(actors.getResult())) { UNSTOPPABLE_ASSERT(false); }
}
if (!socket.get().isValid() || deterministicRandom()->random01() < 0.05) {
peer = deterministicRandom()->randomChoice(*remotes);
Reference<IUDPSocket> s = wait(INetworkConnections::net()->createUDPSocket(peer));
socket.set(s);
socket = s;
actors.add(clientReceiver(self, socket.get(), socket.onChange()));
}
sendString = BinaryWriter::toValue(ping(), Unversioned());
int res = wait(socket.get()->sendTo(sendString.begin(), sendString.end(), peer));
ASSERT(res == sendString.size());
self->sent[peer] += 1;
}
}
ACTOR static Future<Void> _start(UDPWorkload* self, Database cx) {
state ReadYourWritesTransaction tr(cx);
state std::vector<NetworkAddress> remotes;
loop {
try {
Standalone<RangeResultRef> range =
wait(tr.getRange(prefixRange(self->keyPrefix), CLIENT_KNOBS->TOO_MANY));
ASSERT(!range.more);
for (auto const& p : range) {
auto cID = BinaryReader::fromStringRef<decltype(self->clientId)>(
p.key.removePrefix(self->keyPrefix), Unversioned());
if (cID != self->clientId) {
remotes.emplace_back(BinaryReader::fromStringRef<NetworkAddress>(p.value, IncludeVersion()));
}
}
break;
} catch (Error& e) {
wait(tr.onError(e));
}
}
wait(clientSender(self, &remotes) && serverSender(self, &remotes) && _receiver(self));
UNSTOPPABLE_ASSERT(false);
return Void();
}
virtual Future<Void> start(Database const& cx) { return delay(runFor) || _start(this, cx); }
virtual Future<bool> check(Database const& cx) { return true; }
virtual void getMetrics(vector<PerfMetric>& m) {
unsigned totalReceived = 0, totalSent = 0, totalAcked = 0, totalSuccess = 0;
for (const auto& p : sent) {
totalSent += p.second;
}
for (const auto& p : received) {
totalReceived += p.second;
}
for (const auto& p : acked) {
totalAcked += p.second;
}
for (const auto& p : acked) {
totalAcked += p.second;
}
for (const auto& p : successes) {
totalSuccess += p.second;
}
m.emplace_back("Sent", totalSent, false);
m.emplace_back("Received", totalReceived, false);
m.emplace_back("Acknknowledged", totalAcked, false);
m.emplace_back("Successes", totalSuccess, false);
}
};
} // namespace
WorkloadFactory<UDPWorkload> UDPWorkloadFactory(UDPWorkload::name);

View File

@ -18,8 +18,13 @@
* limitations under the License.
*/
#include "boost/asio/buffer.hpp"
#include "boost/asio/ip/address.hpp"
#include "boost/system/system_error.hpp"
#include "flow/Platform.h"
#include "flow/Trace.h"
#include <algorithm>
#include <memory>
#define BOOST_SYSTEM_NO_LIB
#define BOOST_DATE_TIME_NO_LIB
#define BOOST_REGEX_NO_LIB
@ -124,6 +129,8 @@ public:
// INetworkConnections interface
virtual Future<Reference<IConnection>> connect( NetworkAddress toAddr, std::string host );
virtual Future<Reference<IUDPSocket>> createUDPSocket(NetworkAddress toAddr);
virtual Future<Reference<IUDPSocket>> createUDPSocket(bool isV6);
virtual Future<std::vector<NetworkAddress>> resolveTCPEndpoint( std::string host, std::string service);
virtual Reference<IListener> listen( NetworkAddress localAddr );
@ -217,11 +224,14 @@ public:
Future<Void> logTimeOffset();
Int64MetricHandle bytesReceived;
Int64MetricHandle udpBytesReceived;
Int64MetricHandle countWriteProbes;
Int64MetricHandle countReadProbes;
Int64MetricHandle countReads;
Int64MetricHandle countUDPReads;
Int64MetricHandle countWouldBlock;
Int64MetricHandle countWrites;
Int64MetricHandle countUDPWrites;
Int64MetricHandle countRunLoop;
Int64MetricHandle countCantSleep;
Int64MetricHandle countWontSleep;
@ -253,10 +263,22 @@ static boost::asio::ip::address tcpAddress(IPAddress const& n) {
}
}
static IPAddress toIPAddress(boost::asio::ip::address const& addr) {
if (addr.is_v4()) {
return IPAddress(addr.to_v4().to_uint());
} else {
return IPAddress(addr.to_v6().to_bytes());
}
}
static tcp::endpoint tcpEndpoint( NetworkAddress const& n ) {
return tcp::endpoint(tcpAddress(n.ip), n.port);
}
static udp::endpoint udpEndpoint(NetworkAddress const& n) {
return udp::endpoint(tcpAddress(n.ip), n.port);
}
class BindPromise {
Promise<Void> p;
const char* errContext;
@ -482,6 +504,194 @@ private:
}
};
class ReadPromise {
Promise<int> p;
const char* errContext;
UID errID;
std::shared_ptr<udp::endpoint> endpoint = nullptr;
public:
ReadPromise(const char* errContext, UID errID) : errContext(errContext), errID(errID) {}
ReadPromise(ReadPromise const& other) = default;
ReadPromise(ReadPromise&& other) : p(std::move(other.p)), errContext(other.errContext), errID(other.errID) {}
std::shared_ptr<udp::endpoint>& getEndpoint() { return endpoint; }
Future<int> getFuture() { return p.getFuture(); }
void operator()(const boost::system::error_code& error, size_t bytesWritten) {
try {
if (error) {
TraceEvent evt(SevWarn, errContext, errID);
evt.suppressFor(1.0).detail("ErrorCode", error.value()).detail("Message", error.message());
p.sendError(connection_failed());
} else {
p.send(int(bytesWritten));
}
} catch (Error& e) {
p.sendError(e);
} catch (...) {
p.sendError(unknown_error());
}
}
};
class UDPSocket : public IUDPSocket, ReferenceCounted<UDPSocket> {
UID id;
Optional<NetworkAddress> toAddress;
udp::socket socket;
bool isPublic = false;
public:
ACTOR static Future<Reference<IUDPSocket>> connect(boost::asio::io_service* io_service,
Optional<NetworkAddress> toAddress, bool isV6) {
state Reference<UDPSocket> self(new UDPSocket(*io_service, toAddress, isV6));
ASSERT(!toAddress.present() || toAddress.get().ip.isV6() == isV6);
if (!toAddress.present()) {
return self;
}
try {
if (toAddress.present()) {
auto to = udpEndpoint(toAddress.get());
BindPromise p("N2_UDPConnectError", self->id);
Future<Void> onConnected = p.getFuture();
self->socket.async_connect(to, std::move(p));
wait(onConnected);
}
self->init();
return self;
} catch (...) {
self->closeSocket();
throw;
}
}
void close() override { closeSocket(); }
void send(StringRef packet) override {}
void sendTo(NetworkAddress const& addr, StringRef packet) override {}
int readFrom(NetworkAddress* outAddr, uint8_t* begin, uint8_t* end) override { return 0; }
int read(uint8_t* begin, uint8_t* end) override {
// boost::system::error_code err;
return 0;
};
Future<int> receive(uint8_t* begin, uint8_t* end) override {
++g_net2->countUDPReads;
ReadPromise p("N2_UDPReadError", id);
auto res = p.getFuture();
socket.async_receive(boost::asio::mutable_buffer(begin, end - begin), std::move(p));
return fmap(
[](int bytes) {
g_net2->udpBytesReceived += bytes;
return bytes;
},
res);
}
Future<int> receiveFrom(uint8_t* begin, uint8_t* end, NetworkAddress* sender) override {
++g_net2->countUDPReads;
ReadPromise p("N2_UDPReadFromError", id);
p.getEndpoint() = std::make_shared<udp::endpoint>();
auto endpoint = p.getEndpoint().get();
auto res = p.getFuture();
socket.async_receive_from(boost::asio::mutable_buffer(begin, end - begin), *endpoint, std::move(p));
return fmap(
[endpoint, sender](int bytes) {
if (sender) {
sender->port = endpoint->port();
sender->ip = toIPAddress(endpoint->address());
}
g_net2->udpBytesReceived += bytes;
return bytes;
},
res);
}
Future<int> send(uint8_t const* begin, uint8_t const* end) override {
++g_net2->countUDPWrites;
ReadPromise p("N2_UDPWriteError", id);
auto res = p.getFuture();
socket.async_send(boost::asio::const_buffer(begin, end - begin), std::move(p));
return res;
}
Future<int> sendTo(uint8_t const* begin, uint8_t const* end, NetworkAddress const& peer) override {
++g_net2->countUDPWrites;
ReadPromise p("N2_UDPWriteError", id);
auto res = p.getFuture();
udp::endpoint toEndpoint = udpEndpoint(peer);
socket.async_send_to(boost::asio::const_buffer(begin, end - begin), toEndpoint, std::move(p));
return res;
}
void bind(NetworkAddress const& addr) override {
boost::system::error_code ec;
socket.bind(udpEndpoint(addr), ec);
if (ec) {
Error x;
if (ec.value() == EADDRINUSE)
x = address_in_use();
else if (ec.value() == EADDRNOTAVAIL)
x = invalid_local_address();
else
x = bind_failed();
TraceEvent(SevWarnAlways, "Net2UDPBindError").error(x);
throw x;
}
isPublic = true;
}
UID getDebugID() const override { return id; }
void addref() override { ReferenceCounted<UDPSocket>::addref(); }
void delref() override { ReferenceCounted<UDPSocket>::delref(); }
NetworkAddress localAddress() const {
auto endpoint = socket.local_endpoint();
return NetworkAddress(toIPAddress(endpoint.address()), endpoint.port(), isPublic, false);
}
private:
UDPSocket(boost::asio::io_service& io_service, Optional<NetworkAddress> toAddress, bool isV6)
: id(nondeterministicRandom()->randomUniqueID()), socket(io_service, isV6 ? udp::v6() : udp::v4()) {
socket.non_blocking(true);
}
void closeSocket() {
boost::system::error_code error;
socket.close(error);
if (error)
TraceEvent(SevWarn, "N2_CloseError", id)
.suppressFor(1.0)
.detail("ErrorCode", error.value())
.detail("Message", error.message());
}
void onReadError(const boost::system::error_code& error) {
TraceEvent(SevWarn, "N2_UDPReadError", id)
.suppressFor(1.0)
.detail("ErrorCode", error.value())
.detail("Message", error.message());
closeSocket();
}
void onWriteError(const boost::system::error_code& error) {
TraceEvent(SevWarn, "N2_UDPWriteError", id)
.suppressFor(1.0)
.detail("ErrorCode", error.value())
.detail("Message", error.message());
closeSocket();
}
void init() {
socket.non_blocking(true);
platform::setCloseOnExec(socket.native_handle());
}
};
class Listener : public IListener, ReferenceCounted<Listener> {
boost::asio::io_context& io_service;
NetworkAddress listenAddress;
@ -1430,6 +1640,14 @@ ACTOR static Future<std::vector<NetworkAddress>> resolveTCPEndpoint_impl( Net2 *
return result.get();
}
Future<Reference<IUDPSocket>> Net2::createUDPSocket(NetworkAddress toAddr) {
return UDPSocket::connect(&reactor.ios, toAddr, toAddr.ip.isV6());
}
Future<Reference<IUDPSocket>> Net2::createUDPSocket(bool isV6) {
return UDPSocket::connect(&reactor.ios, Optional<NetworkAddress>(), isV6);
}
Future<std::vector<NetworkAddress>> Net2::resolveTCPEndpoint( std::string host, std::string service) {
return resolveTCPEndpoint_impl(this, host, service);
}

View File

@ -21,6 +21,7 @@
#pragma once
// When actually compiled (NO_INTELLISENSE), include the generated version of this file. In intellisense use the source version.
#include <utility>
#if defined(NO_INTELLISENSE) && !defined(FLOW_GENERICACTORS_ACTOR_G_H)
#define FLOW_GENERICACTORS_ACTOR_G_H
#include "flow/genericactors.actor.g.h"
@ -1810,6 +1811,37 @@ Future<Void> timeReply(Future<T> replyToTime, PromiseStream<double> timeOutput){
return Void();
}
// Monad
ACTOR template <class Fun, class T>
Future<decltype(std::declval<Fun>()(std::declval<T>()))> fmap(Fun fun, Future<T> f) {
T val = wait(f);
return fun(val);
}
ACTOR template <class T, class Fun>
Future<decltype(std::declval<Fun>()(std::declval<T>()).getValue())> runAfter(Future<T> lhs, Fun rhs) {
T val1 = wait(lhs);
decltype(std::declval<Fun>()(std::declval<T>()).getValue()) res = wait(rhs(val1));
return res;
}
ACTOR template <class T, class U>
Future<U> runAfter(Future<T> lhs, Future<U> rhs) {
T val1 = wait(lhs);
U res = wait(rhs);
return res;
}
template <class T, class Res>
Future<Res> operator>>=(Future<T> lhs, std::function<Future<Res>(T const&)> rhs) {
return runAfter(lhs, rhs);
}
template <class T, class U>
Future<U> operator>> (Future<T> const& lhs, Future<U> const& rhs) {
return runAfter(lhs, rhs);
}
#include "flow/unactorcompiler.h"

View File

@ -167,6 +167,8 @@ Future<Reference<IConnection>> INetworkConnections::connect( std::string host, s
});
}
IUDPSocket::~IUDPSocket() {}
const std::vector<int> NetworkMetrics::starvationBins = { 1, 3500, 7000, 7500, 8500, 8900, 10500 };
TEST_CASE("/flow/network/ipaddress") {

View File

@ -545,25 +545,22 @@ protected:
class IUDPSocket {
public:
// see https://en.wikipedia.org/wiki/User_Datagram_Protocol - the max size of a UDP packet
// This is enforced in simulation
constexpr static size_t MAX_PACKET_SIZE = 65535;
virtual ~IUDPSocket();
virtual void addref() = 0;
virtual void delref() = 0;
virtual void send(StringRef packet) = 0;
virtual void close() = 0;
virtual Future<int> send(uint8_t const* begin, uint8_t const* end) = 0;
virtual Future<int> sendTo(uint8_t const* begin, uint8_t const* end, NetworkAddress const& peer) = 0;
virtual Future<int> receive(uint8_t* begin, uint8_t* end) = 0;
virtual Future<int> receiveFrom(uint8_t* begin, uint8_t* end, NetworkAddress* sender) = 0;
virtual void bind(NetworkAddress const& addr) = 0;
virtual void sendTo(NetworkAddress const& addr, StringRef packet) = 0;
// Reads as many bytes as possible from the read buffer into [begin,end) and returns the number of bytes read (might be 0)
// Puts the address of the sender into outAddr
virtual int readFrom(NetworkAddress* outAddr, uint8_t* begin, uint8_t* end) = 0;
// Reads as many bytes as possible from the read buffer into [begin,end) and returns the number of bytes read (might be 0)
virtual int read(uint8_t* begin, uint8_t* end);
// Precondition: read() has been called and last returned 0
// returns when read() can read at least one byte (or may throw an error if the connection dies)
virtual Future<Void> onReadable() = 0;
virtual UID getDebugID() const = 0;
virtual NetworkAddress localAddress() const = 0;
};
class INetworkConnections {
@ -575,6 +572,11 @@ public:
// Make an outgoing connection to the given address. May return an error or block indefinitely in case of connection problems!
virtual Future<Reference<IConnection>> connect( NetworkAddress toAddr, std::string host = "") = 0;
// Make an outgoing udp connection and connect to the passed address.
virtual Future<Reference<IUDPSocket>> createUDPSocket(NetworkAddress toAddr) = 0;
// Make an outgoing udp connection without establishing a connection
virtual Future<Reference<IUDPSocket>> createUDPSocket(bool isV6 = false) = 0;
// Resolve host name and service name (such as "http" or can be a plain number like "80") to a list of 1 or more NetworkAddresses
virtual Future<std::vector<NetworkAddress>> resolveTCPEndpoint( std::string host, std::string service ) = 0;