diff --git a/fdbclient/NativeAPI.actor.cpp b/fdbclient/NativeAPI.actor.cpp index 43cb86bc8e..458f3715e8 100644 --- a/fdbclient/NativeAPI.actor.cpp +++ b/fdbclient/NativeAPI.actor.cpp @@ -84,12 +84,6 @@ using std::pair; namespace { -ACTOR template -Future runAfter(Future in, Fun func) { - T res = wait(in); - return func(res); -} - template Future loadBalance( DatabaseContext* ctx, const Reference alternatives, RequestStream Interface::*channel, @@ -99,13 +93,14 @@ Future loadBalance( if (alternatives->hasCaches) { return loadBalance(alternatives->locations(), channel, request, taskID, atMostOnce, model); } - return runAfter(loadBalance(alternatives->locations(), channel, request, taskID, atMostOnce, model), - [ctx](auto res) { - if (res.cached) { - ctx->updateCache.trigger(); - } - return res; - }); + return fmap( + [ctx](auto const& res) { + if (res.cached) { + ctx->updateCache.trigger(); + } + return res; + }, + loadBalance(alternatives->locations(), channel, request, taskID, atMostOnce, model)); } } // namespace diff --git a/fdbrpc/sim2.actor.cpp b/fdbrpc/sim2.actor.cpp index 0d50e8b230..36e2d88f27 100644 --- a/fdbrpc/sim2.actor.cpp +++ b/fdbrpc/sim2.actor.cpp @@ -19,8 +19,14 @@ */ #include +#include +#include +#include +#include #include "fdbrpc/simulator.h" +#include "flow/ActorCollection.h" +#include "flow/IRandom.h" #include "flow/IThreadPool.h" #include "flow/Util.h" #include "fdbrpc/IAsyncFile.h" @@ -29,6 +35,8 @@ #include "flow/crc32c.h" #include "fdbrpc/TraceFileIO.h" #include "flow/FaultInjection.h" +#include "flow/flow.h" +#include "flow/genericactors.actor.h" #include "flow/network.h" #include "flow/TLSConfig.actor.h" #include "fdbrpc/Net2FileSystem.h" @@ -824,6 +832,9 @@ public: ((Sim2Listener*)peerp->getListener(toAddr).getPtr())->incomingConnection( 0.5*deterministicRandom()->random01(), Reference(peerc) ); return onConnect( ::delay(0.5*deterministicRandom()->random01()), myc ); } + + Future> createUDPSocket(NetworkAddress toAddr) override; + Future> createUDPSocket(bool isV6 = false) override; virtual Future> resolveTCPEndpoint( std::string host, std::string service) { throw lookup_failed(); } @@ -1725,6 +1736,196 @@ public: int yield_limit; // how many more times yield may return false before next returning true }; +class UDPSimSocket : public IUDPSocket, ReferenceCounted { + using Packet = std::shared_ptr>; + UID id; + ISimulator::ProcessInfo* process; + Optional peerAddress; + Optional peerProcess; + Optional> peerSocket; + ActorCollection actors; + Promise closed; + std::deque> recvBuffer; + AsyncVar writtenPackets; + NetworkAddress _localAddress; + bool randomDropPacket() { + auto res = deterministicRandom()->random01() < .000001; + TEST(res); // UDP packet drop + return res; + } + + bool isClosed() const { return closed.getFuture().isReady(); } + Future onClosed() const { return closed.getFuture(); } + + ACTOR static Future cleanupPeerSocket(UDPSimSocket* self) { + wait(self->peerSocket.get()->onClosed()); + self->peerSocket.reset(); + return Void(); + } + + ACTOR static Future send(UDPSimSocket* self, Reference peerSocket, uint8_t const* begin, + uint8_t const* end) { + state Packet packet(std::make_shared>()); + packet->resize(begin - end); + std::copy(begin, end, packet->begin()); + wait( delay( .002 * deterministicRandom()->random01() ) ); + peerSocket->recvBuffer.emplace_back(self->_localAddress, std::move(packet)); + peerSocket->writtenPackets.set(self->writtenPackets.get() + 1); + return Void(); + } + + ACTOR static Future receiveFrom(UDPSimSocket* self, uint8_t* begin, uint8_t* end, NetworkAddress* sender) { + state TaskPriority currentTaskID = g_sim2.getCurrentTask(); + wait(self->writtenPackets.onChange()); + wait(g_sim2.onProcess(self->process, currentTaskID)); + auto packet = self->recvBuffer.front().second; + int sz = packet->size(); + ASSERT(sz <= end - begin); + if (sender) { + *sender = self->recvBuffer.front().first; + } + std::copy(packet->begin(), packet->end(), begin); + self->recvBuffer.pop_front(); + return sz; + } + +public: + UDPSimSocket(NetworkAddress const& localAddress, Optional const& peerAddress) + : id(deterministicRandom()->randomUniqueID()), process(g_simulator.getCurrentProcess()), peerAddress(peerAddress), + actors(false), _localAddress(localAddress) { + process->boundUDPSockets.emplace(localAddress, this); + } + ~UDPSimSocket() { + if (!closed.getFuture().isReady()) { + close(); + closed.send(Void()); + } + actors.clear(true); + } + void close() override { process->boundUDPSockets.erase(_localAddress); } + UID getDebugID() const override { return id; } + void addref() override { ReferenceCounted::addref(); } + void delref() override { ReferenceCounted::delref(); } + + Future send(uint8_t const* begin, uint8_t const* end) override { + int sz = int(end - begin); + auto res = fmap([sz](Void){ return sz; }, delay(0.0)); + ASSERT(sz <= IUDPSocket::MAX_PACKET_SIZE); + ASSERT(peerAddress.present()); + if (!peerProcess.present()) { + auto iter = g_sim2.addressMap.find(peerAddress.get()); + if (iter == g_sim2.addressMap.end()) { + return res; + } + peerProcess = iter->second; + } + if (!peerSocket.present() || peerSocket.get()->isClosed()) { + peerSocket.reset(); + auto iter = peerProcess.get()->boundUDPSockets.find(peerAddress.get()); + if (iter == peerProcess.get()->boundUDPSockets.end()) { + return fmap([sz](Void){ return sz; }, delay(0.0)); + } + peerSocket = iter->second.castTo(); + // the notation of leaking connections doesn't make much sense in the context of UDP + // so we simply handle those in the simulator + actors.add(cleanupPeerSocket(this)); + } + if (randomDropPacket()) { + return res; + } + actors.add(send(this, peerSocket.get(), begin, end)); + return res; + } + Future sendTo(uint8_t const* begin, uint8_t const* end, NetworkAddress const& peer) override { + int sz = int(end - begin); + auto res = fmap([sz](Void){ return sz; }, delay(0.0)); + ASSERT(sz <= MAX_PACKET_SIZE); + ISimulator::ProcessInfo* peerProcess = nullptr; + Reference peerSocket; + { + auto iter = g_sim2.addressMap.find(peer); + if (iter == g_sim2.addressMap.end()) { + return res; + } + peerProcess = iter->second; + } + { + auto iter = peerProcess->boundUDPSockets.find(peer); + if (iter == peerProcess->boundUDPSockets.end()) { + return res; + } + peerSocket = iter->second.castTo(); + } + actors.add(send(this, peerSocket, begin, end)); + return res; + } + Future receive(uint8_t* begin, uint8_t* end) override { + return receiveFrom(begin, end, nullptr); + } + Future receiveFrom(uint8_t* begin, uint8_t* end, NetworkAddress* sender) override { + if (!recvBuffer.empty()) { + auto buf = recvBuffer.front().second; + if (sender) { + *sender = recvBuffer.front().first; + } + int sz = buf->size(); + ASSERT(sz <= end - begin); + std::copy(buf->begin(), buf->end(), begin); + auto res = fmap([sz](Void){ return sz; }, delay(0.0)); + recvBuffer.pop_front(); + return res; + } + return receiveFrom(this, begin, end, sender); + } + void bind(NetworkAddress const& addr) override { + process->boundUDPSockets.erase(_localAddress); + process->boundUDPSockets.emplace(addr, Reference::addRef(this)); + _localAddress = addr; + } + + NetworkAddress localAddress() const override { + return _localAddress; + } + +}; + +Future> Sim2::createUDPSocket(NetworkAddress toAddr) { + NetworkAddress localAddress; + auto process = g_simulator.getCurrentProcess(); + if (process->address.ip.isV6()) { + IPAddress::IPAddressStore store = process->address.ip.toV6(); + uint16_t* ipParts = (uint16_t*)store.data(); + ipParts[7] += deterministicRandom()->randomInt(0, 256); + localAddress.ip = IPAddress(store); + } else { + localAddress.ip = IPAddress(process->address.ip.toV4() + deterministicRandom()->randomInt(0, 256)); + } + localAddress.port = deterministicRandom()->randomInt(40000, 60000); + return Reference(new UDPSimSocket(localAddress, toAddr)); +} + +Future> Sim2::createUDPSocket(bool isV6) { + NetworkAddress localAddress; + auto process = g_simulator.getCurrentProcess(); + if (process->address.ip.isV6() == isV6) { + localAddress = process->address; + } else { + ASSERT(process->addresses.secondaryAddress.present() && + process->addresses.secondaryAddress.get().isV6() == isV6); + localAddress = process->addresses.secondaryAddress.get(); + } + if (localAddress.ip.isV6()) { + IPAddress::IPAddressStore store = localAddress.ip.toV6(); + uint16_t* ipParts = (uint16_t*)store.data(); + ipParts[7] += deterministicRandom()->randomInt(0, 256); + localAddress.ip = IPAddress(store); + } else { + localAddress.ip = IPAddress(localAddress.ip.toV4() + deterministicRandom()->randomInt(0, 256)); + } + localAddress.port = deterministicRandom()->randomInt(40000, 60000); + return Reference(new UDPSimSocket(localAddress, Optional{})); +} + void startNewSimulator() { ASSERT( !g_network ); g_network = g_pSimulator = new Sim2(); diff --git a/fdbrpc/simulator.h b/fdbrpc/simulator.h index 23a44e09eb..b75537a439 100644 --- a/fdbrpc/simulator.h +++ b/fdbrpc/simulator.h @@ -55,6 +55,7 @@ public: ProcessClass startingClass; TDMetricCollection tdmetrics; std::map> listenerMap; + std::map> boundUDPSockets; bool failed; bool excluded; bool cleared; diff --git a/fdbserver/CMakeLists.txt b/fdbserver/CMakeLists.txt index d332ae80a5..ad066ce069 100644 --- a/fdbserver/CMakeLists.txt +++ b/fdbserver/CMakeLists.txt @@ -203,6 +203,7 @@ set(FDBSERVER_SRCS workloads/UnitPerf.actor.cpp workloads/UnitTests.actor.cpp workloads/Unreadable.actor.cpp + workloads/UDPWorkload.actor.cpp workloads/VersionStamp.actor.cpp workloads/WatchAndWait.actor.cpp workloads/Watches.actor.cpp diff --git a/fdbserver/workloads/UDPWorkload.actor.cpp b/fdbserver/workloads/UDPWorkload.actor.cpp new file mode 100644 index 0000000000..f8b6436aaf --- /dev/null +++ b/fdbserver/workloads/UDPWorkload.actor.cpp @@ -0,0 +1,266 @@ +/* + * UDPWorkload.actor.cpp + * + * This source file is part of the FoundationDB open source project + * + * Copyright 2013-2020 Apple Inc. and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "fdbclient/FDBTypes.h" +#include "fdbclient/ReadYourWrites.h" +#include "fdbserver/workloads/workloads.actor.h" +#include "flow/ActorCollection.h" +#include "flow/Arena.h" +#include "flow/Error.h" +#include "flow/IRandom.h" +#include "flow/flow.h" +#include "flow/network.h" +#include "flow/actorcompiler.h" // has to be last include +#include "flow/serialize.h" +#include +#include +#include +#include +#include +#include + +namespace { + +struct UDPWorkload : TestWorkload { + constexpr static const char* name = "UDPWorkload"; + // config + Key keyPrefix; + double runFor; + int minPort, maxPort; + // members + NetworkAddress serverAddress; + Reference serverSocket; + std::unordered_map sent, received, acked, successes; + PromiseStream toAck; + + UDPWorkload(WorkloadContext const& wcx) : TestWorkload(wcx) { + keyPrefix = getOption(options, "keyPrefix"_sr, "/udp/"_sr); + runFor = getOption(options, "runFor"_sr, 60.0); + minPort = getOption(options, "minPort"_sr, 5000); + maxPort = getOption(options, "minPort"_sr, 6000); + for (auto p : { minPort, maxPort }) { + ASSERT(p > 0 && p < std::numeric_limits::max()); + } + } + + virtual std::string description() { return name; } + ACTOR static Future _setup(UDPWorkload* self, Database cx) { + state NetworkAddress localAddress(g_network->getLocalAddress().ip, + deterministicRandom()->randomInt(self->minPort, self->maxPort + 1), true, + false); + state Key key = self->keyPrefix.withSuffix(BinaryWriter::toValue(self->clientId, Unversioned())); + state Value serializedLocalAddress = BinaryWriter::toValue(localAddress, IncludeVersion()); + state ReadYourWritesTransaction tr(cx); + Reference s = wait(INetworkConnections::net()->createUDPSocket(localAddress.isV6())); + self->serverSocket = std::move(s); + self->serverSocket->bind(localAddress); + self->serverAddress = localAddress; + loop { + try { + Optional v = wait(tr.get(key)); + if (v.present()) { + return Void(); + } + tr.set(key, serializedLocalAddress); + wait(tr.commit()); + } catch (Error& e) { + wait(tr.onError(e)); + } + } + } + virtual Future setup(Database const& cx) { return _setup(this, cx); } + + class Message { + int _type = 0; + + public: + enum class Type : uint8_t { PING, PONG }; + Message() {} + explicit Message(Type t) { + switch (t) { + case Type::PING: + _type = 0; + break; + case Type::PONG: + _type = 1; + break; + default: + UNSTOPPABLE_ASSERT(false); + } + } + + Type type() const { + switch (_type) { + case 0: + return Type::PING; + case 1: + return Type::PONG; + default: + UNSTOPPABLE_ASSERT(false); + } + } + + template + void serialize(Ar& ar) { + serializer(ar, _type); + } + }; + + static Message ping() { return Message{ Message::Type::PING }; } + static Message pong() { return Message{ Message::Type::PONG }; } + + ACTOR static Future _receiver(UDPWorkload* self) { + state Standalone packetString = makeString(IUDPSocket::MAX_PACKET_SIZE); + state uint8_t* packet = mutateString(packetString); + state NetworkAddress peerAddress; + loop { + int sz = wait(self->serverSocket->receiveFrom(packet, packet + IUDPSocket::MAX_PACKET_SIZE, &peerAddress)); + auto msg = BinaryReader::fromStringRef(packetString.substr(0, sz), Unversioned()); + if (msg.type() == Message::Type::PONG) { + self->successes[peerAddress] += 1; + } else if (msg.type() == Message::Type::PING) { + self->received[peerAddress] += 1; + self->toAck.send(peerAddress); + } else { + UNSTOPPABLE_ASSERT(false); + } + } + } + + ACTOR static Future serverSender(UDPWorkload* self, std::vector* remotes) { + state Standalone packetString; + state NetworkAddress peer; + loop { + choose { + when(wait(delay(0.1))) { + peer = deterministicRandom()->randomChoice(*remotes); + packetString = BinaryWriter::toValue(Message{ Message::Type::PING }, Unversioned()); + self->sent[peer] += 1; + } + when(NetworkAddress p = waitNext(self->toAck.getFuture())) { + peer = p; + packetString = BinaryWriter::toValue(ping(), Unversioned()); + self->acked[peer] += 1; + } + } + int res = wait(self->serverSocket->sendTo(packetString.begin(), packetString.end(), peer)); + ASSERT(res == packetString.size()); + } + } + + ACTOR static Future clientReceiver(UDPWorkload* self, Reference socket, Future done) { + state Standalone packetString = makeString(IUDPSocket::MAX_PACKET_SIZE); + state uint8_t* packet = mutateString(packetString); + state NetworkAddress peer; + state Future finished = Never(); + loop { + choose { + when(int sz = wait(socket->receiveFrom(packet, packet + IUDPSocket::MAX_PACKET_SIZE, &peer))) { + auto res = BinaryReader::fromStringRef(packetString.substr(0, sz), Unversioned()); + ASSERT(res.type() == Message::Type::PONG); + self->successes[peer] += 1; + } + when(wait(done)) { + finished = delay(1.0); + done = Never(); + } + when(wait(finished)) { return Void(); } + } + } + } + + ACTOR static Future clientSender(UDPWorkload* self, std::vector* remotes) { + state AsyncVar> socket; + state Standalone sendString; + state ActorCollection actors(false); + state NetworkAddress peer; + + loop { + choose { + when(wait(delay(0.1))) {} + when(wait(actors.getResult())) { UNSTOPPABLE_ASSERT(false); } + } + if (!socket.get().isValid() || deterministicRandom()->random01() < 0.05) { + peer = deterministicRandom()->randomChoice(*remotes); + Reference s = wait(INetworkConnections::net()->createUDPSocket(peer)); + socket.set(s); + socket = s; + actors.add(clientReceiver(self, socket.get(), socket.onChange())); + } + sendString = BinaryWriter::toValue(ping(), Unversioned()); + int res = wait(socket.get()->sendTo(sendString.begin(), sendString.end(), peer)); + ASSERT(res == sendString.size()); + self->sent[peer] += 1; + } + } + + ACTOR static Future _start(UDPWorkload* self, Database cx) { + state ReadYourWritesTransaction tr(cx); + state std::vector remotes; + loop { + try { + Standalone range = + wait(tr.getRange(prefixRange(self->keyPrefix), CLIENT_KNOBS->TOO_MANY)); + ASSERT(!range.more); + for (auto const& p : range) { + auto cID = BinaryReader::fromStringRefclientId)>( + p.key.removePrefix(self->keyPrefix), Unversioned()); + if (cID != self->clientId) { + remotes.emplace_back(BinaryReader::fromStringRef(p.value, IncludeVersion())); + } + } + break; + } catch (Error& e) { + wait(tr.onError(e)); + } + } + wait(clientSender(self, &remotes) && serverSender(self, &remotes) && _receiver(self)); + UNSTOPPABLE_ASSERT(false); + return Void(); + } + virtual Future start(Database const& cx) { return delay(runFor) || _start(this, cx); } + virtual Future check(Database const& cx) { return true; } + virtual void getMetrics(vector& m) { + unsigned totalReceived = 0, totalSent = 0, totalAcked = 0, totalSuccess = 0; + for (const auto& p : sent) { + totalSent += p.second; + } + for (const auto& p : received) { + totalReceived += p.second; + } + for (const auto& p : acked) { + totalAcked += p.second; + } + for (const auto& p : acked) { + totalAcked += p.second; + } + for (const auto& p : successes) { + totalSuccess += p.second; + } + m.emplace_back("Sent", totalSent, false); + m.emplace_back("Received", totalReceived, false); + m.emplace_back("Acknknowledged", totalAcked, false); + m.emplace_back("Successes", totalSuccess, false); + } +}; + +} // namespace + +WorkloadFactory UDPWorkloadFactory(UDPWorkload::name); diff --git a/flow/Net2.actor.cpp b/flow/Net2.actor.cpp index d7ea9004b9..0218f35dfb 100644 --- a/flow/Net2.actor.cpp +++ b/flow/Net2.actor.cpp @@ -18,8 +18,13 @@ * limitations under the License. */ +#include "boost/asio/buffer.hpp" +#include "boost/asio/ip/address.hpp" +#include "boost/system/system_error.hpp" #include "flow/Platform.h" +#include "flow/Trace.h" #include +#include #define BOOST_SYSTEM_NO_LIB #define BOOST_DATE_TIME_NO_LIB #define BOOST_REGEX_NO_LIB @@ -124,6 +129,8 @@ public: // INetworkConnections interface virtual Future> connect( NetworkAddress toAddr, std::string host ); + virtual Future> createUDPSocket(NetworkAddress toAddr); + virtual Future> createUDPSocket(bool isV6); virtual Future> resolveTCPEndpoint( std::string host, std::string service); virtual Reference listen( NetworkAddress localAddr ); @@ -217,11 +224,14 @@ public: Future logTimeOffset(); Int64MetricHandle bytesReceived; + Int64MetricHandle udpBytesReceived; Int64MetricHandle countWriteProbes; Int64MetricHandle countReadProbes; Int64MetricHandle countReads; + Int64MetricHandle countUDPReads; Int64MetricHandle countWouldBlock; Int64MetricHandle countWrites; + Int64MetricHandle countUDPWrites; Int64MetricHandle countRunLoop; Int64MetricHandle countCantSleep; Int64MetricHandle countWontSleep; @@ -253,10 +263,22 @@ static boost::asio::ip::address tcpAddress(IPAddress const& n) { } } +static IPAddress toIPAddress(boost::asio::ip::address const& addr) { + if (addr.is_v4()) { + return IPAddress(addr.to_v4().to_uint()); + } else { + return IPAddress(addr.to_v6().to_bytes()); + } +} + static tcp::endpoint tcpEndpoint( NetworkAddress const& n ) { return tcp::endpoint(tcpAddress(n.ip), n.port); } +static udp::endpoint udpEndpoint(NetworkAddress const& n) { + return udp::endpoint(tcpAddress(n.ip), n.port); +} + class BindPromise { Promise p; const char* errContext; @@ -482,6 +504,194 @@ private: } }; +class ReadPromise { + Promise p; + const char* errContext; + UID errID; + std::shared_ptr endpoint = nullptr; + +public: + ReadPromise(const char* errContext, UID errID) : errContext(errContext), errID(errID) {} + ReadPromise(ReadPromise const& other) = default; + ReadPromise(ReadPromise&& other) : p(std::move(other.p)), errContext(other.errContext), errID(other.errID) {} + + std::shared_ptr& getEndpoint() { return endpoint; } + + Future getFuture() { return p.getFuture(); } + void operator()(const boost::system::error_code& error, size_t bytesWritten) { + try { + if (error) { + TraceEvent evt(SevWarn, errContext, errID); + evt.suppressFor(1.0).detail("ErrorCode", error.value()).detail("Message", error.message()); + p.sendError(connection_failed()); + } else { + p.send(int(bytesWritten)); + } + } catch (Error& e) { + p.sendError(e); + } catch (...) { + p.sendError(unknown_error()); + } + } +}; + +class UDPSocket : public IUDPSocket, ReferenceCounted { + UID id; + Optional toAddress; + udp::socket socket; + bool isPublic = false; + +public: + ACTOR static Future> connect(boost::asio::io_service* io_service, + Optional toAddress, bool isV6) { + state Reference self(new UDPSocket(*io_service, toAddress, isV6)); + ASSERT(!toAddress.present() || toAddress.get().ip.isV6() == isV6); + if (!toAddress.present()) { + return self; + } + try { + if (toAddress.present()) { + auto to = udpEndpoint(toAddress.get()); + BindPromise p("N2_UDPConnectError", self->id); + Future onConnected = p.getFuture(); + self->socket.async_connect(to, std::move(p)); + + wait(onConnected); + } + self->init(); + return self; + } catch (...) { + self->closeSocket(); + throw; + } + } + + void close() override { closeSocket(); } + + void send(StringRef packet) override {} + + void sendTo(NetworkAddress const& addr, StringRef packet) override {} + + int readFrom(NetworkAddress* outAddr, uint8_t* begin, uint8_t* end) override { return 0; } + + int read(uint8_t* begin, uint8_t* end) override { + // boost::system::error_code err; + return 0; + }; + + Future receive(uint8_t* begin, uint8_t* end) override { + ++g_net2->countUDPReads; + ReadPromise p("N2_UDPReadError", id); + auto res = p.getFuture(); + socket.async_receive(boost::asio::mutable_buffer(begin, end - begin), std::move(p)); + return fmap( + [](int bytes) { + g_net2->udpBytesReceived += bytes; + return bytes; + }, + res); + } + + Future receiveFrom(uint8_t* begin, uint8_t* end, NetworkAddress* sender) override { + ++g_net2->countUDPReads; + ReadPromise p("N2_UDPReadFromError", id); + p.getEndpoint() = std::make_shared(); + auto endpoint = p.getEndpoint().get(); + auto res = p.getFuture(); + socket.async_receive_from(boost::asio::mutable_buffer(begin, end - begin), *endpoint, std::move(p)); + return fmap( + [endpoint, sender](int bytes) { + if (sender) { + sender->port = endpoint->port(); + sender->ip = toIPAddress(endpoint->address()); + } + g_net2->udpBytesReceived += bytes; + return bytes; + }, + res); + } + + Future send(uint8_t const* begin, uint8_t const* end) override { + ++g_net2->countUDPWrites; + ReadPromise p("N2_UDPWriteError", id); + auto res = p.getFuture(); + socket.async_send(boost::asio::const_buffer(begin, end - begin), std::move(p)); + return res; + } + + Future sendTo(uint8_t const* begin, uint8_t const* end, NetworkAddress const& peer) override { + ++g_net2->countUDPWrites; + ReadPromise p("N2_UDPWriteError", id); + auto res = p.getFuture(); + udp::endpoint toEndpoint = udpEndpoint(peer); + socket.async_send_to(boost::asio::const_buffer(begin, end - begin), toEndpoint, std::move(p)); + return res; + } + + void bind(NetworkAddress const& addr) override { + boost::system::error_code ec; + socket.bind(udpEndpoint(addr), ec); + if (ec) { + Error x; + if (ec.value() == EADDRINUSE) + x = address_in_use(); + else if (ec.value() == EADDRNOTAVAIL) + x = invalid_local_address(); + else + x = bind_failed(); + TraceEvent(SevWarnAlways, "Net2UDPBindError").error(x); + throw x; + } + isPublic = true; + } + + UID getDebugID() const override { return id; } + + void addref() override { ReferenceCounted::addref(); } + void delref() override { ReferenceCounted::delref(); } + + NetworkAddress localAddress() const { + auto endpoint = socket.local_endpoint(); + return NetworkAddress(toIPAddress(endpoint.address()), endpoint.port(), isPublic, false); + } + +private: + UDPSocket(boost::asio::io_service& io_service, Optional toAddress, bool isV6) + : id(nondeterministicRandom()->randomUniqueID()), socket(io_service, isV6 ? udp::v6() : udp::v4()) { + socket.non_blocking(true); + } + + void closeSocket() { + boost::system::error_code error; + socket.close(error); + if (error) + TraceEvent(SevWarn, "N2_CloseError", id) + .suppressFor(1.0) + .detail("ErrorCode", error.value()) + .detail("Message", error.message()); + } + + void onReadError(const boost::system::error_code& error) { + TraceEvent(SevWarn, "N2_UDPReadError", id) + .suppressFor(1.0) + .detail("ErrorCode", error.value()) + .detail("Message", error.message()); + closeSocket(); + } + void onWriteError(const boost::system::error_code& error) { + TraceEvent(SevWarn, "N2_UDPWriteError", id) + .suppressFor(1.0) + .detail("ErrorCode", error.value()) + .detail("Message", error.message()); + closeSocket(); + } + + void init() { + socket.non_blocking(true); + platform::setCloseOnExec(socket.native_handle()); + } +}; + class Listener : public IListener, ReferenceCounted { boost::asio::io_context& io_service; NetworkAddress listenAddress; @@ -1430,6 +1640,14 @@ ACTOR static Future> resolveTCPEndpoint_impl( Net2 * return result.get(); } +Future> Net2::createUDPSocket(NetworkAddress toAddr) { + return UDPSocket::connect(&reactor.ios, toAddr, toAddr.ip.isV6()); +} + +Future> Net2::createUDPSocket(bool isV6) { + return UDPSocket::connect(&reactor.ios, Optional(), isV6); +} + Future> Net2::resolveTCPEndpoint( std::string host, std::string service) { return resolveTCPEndpoint_impl(this, host, service); } diff --git a/flow/genericactors.actor.h b/flow/genericactors.actor.h index 33ff691814..91894fe3d1 100644 --- a/flow/genericactors.actor.h +++ b/flow/genericactors.actor.h @@ -21,6 +21,7 @@ #pragma once // When actually compiled (NO_INTELLISENSE), include the generated version of this file. In intellisense use the source version. +#include #if defined(NO_INTELLISENSE) && !defined(FLOW_GENERICACTORS_ACTOR_G_H) #define FLOW_GENERICACTORS_ACTOR_G_H #include "flow/genericactors.actor.g.h" @@ -1810,6 +1811,37 @@ Future timeReply(Future replyToTime, PromiseStream timeOutput){ return Void(); } +// Monad + +ACTOR template +Future()(std::declval()))> fmap(Fun fun, Future f) { + T val = wait(f); + return fun(val); +} + +ACTOR template +Future()(std::declval()).getValue())> runAfter(Future lhs, Fun rhs) { + T val1 = wait(lhs); + decltype(std::declval()(std::declval()).getValue()) res = wait(rhs(val1)); + return res; +} + +ACTOR template +Future runAfter(Future lhs, Future rhs) { + T val1 = wait(lhs); + U res = wait(rhs); + return res; +} + +template +Future operator>>=(Future lhs, std::function(T const&)> rhs) { + return runAfter(lhs, rhs); +} + +template +Future operator>> (Future const& lhs, Future const& rhs) { + return runAfter(lhs, rhs); +} #include "flow/unactorcompiler.h" diff --git a/flow/network.cpp b/flow/network.cpp index 6e1b49a27b..17a9f8c03d 100644 --- a/flow/network.cpp +++ b/flow/network.cpp @@ -167,6 +167,8 @@ Future> INetworkConnections::connect( std::string host, s }); } +IUDPSocket::~IUDPSocket() {} + const std::vector NetworkMetrics::starvationBins = { 1, 3500, 7000, 7500, 8500, 8900, 10500 }; TEST_CASE("/flow/network/ipaddress") { diff --git a/flow/network.h b/flow/network.h index ec325fcfc7..7bc8f5a412 100644 --- a/flow/network.h +++ b/flow/network.h @@ -545,25 +545,22 @@ protected: class IUDPSocket { public: + // see https://en.wikipedia.org/wiki/User_Datagram_Protocol - the max size of a UDP packet + // This is enforced in simulation + constexpr static size_t MAX_PACKET_SIZE = 65535; virtual ~IUDPSocket(); virtual void addref() = 0; virtual void delref() = 0; - virtual void send(StringRef packet) = 0; + virtual void close() = 0; + virtual Future send(uint8_t const* begin, uint8_t const* end) = 0; + virtual Future sendTo(uint8_t const* begin, uint8_t const* end, NetworkAddress const& peer) = 0; + virtual Future receive(uint8_t* begin, uint8_t* end) = 0; + virtual Future receiveFrom(uint8_t* begin, uint8_t* end, NetworkAddress* sender) = 0; + virtual void bind(NetworkAddress const& addr) = 0; - virtual void sendTo(NetworkAddress const& addr, StringRef packet) = 0; - - // Reads as many bytes as possible from the read buffer into [begin,end) and returns the number of bytes read (might be 0) - // Puts the address of the sender into outAddr - virtual int readFrom(NetworkAddress* outAddr, uint8_t* begin, uint8_t* end) = 0; - - // Reads as many bytes as possible from the read buffer into [begin,end) and returns the number of bytes read (might be 0) - virtual int read(uint8_t* begin, uint8_t* end); - - // Precondition: read() has been called and last returned 0 - // returns when read() can read at least one byte (or may throw an error if the connection dies) - virtual Future onReadable() = 0; virtual UID getDebugID() const = 0; + virtual NetworkAddress localAddress() const = 0; }; class INetworkConnections { @@ -575,6 +572,11 @@ public: // Make an outgoing connection to the given address. May return an error or block indefinitely in case of connection problems! virtual Future> connect( NetworkAddress toAddr, std::string host = "") = 0; + // Make an outgoing udp connection and connect to the passed address. + virtual Future> createUDPSocket(NetworkAddress toAddr) = 0; + // Make an outgoing udp connection without establishing a connection + virtual Future> createUDPSocket(bool isV6 = false) = 0; + // Resolve host name and service name (such as "http" or can be a plain number like "80") to a list of 1 or more NetworkAddresses virtual Future> resolveTCPEndpoint( std::string host, std::string service ) = 0;