foundationdb/fdbclient/FluentDSampleIngestor.cpp

261 lines
7.6 KiB
C++

/*
* FluentDSampleIngestor.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2021 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "fdbclient/ActorLineageProfiler.h"
#include <boost/asio.hpp>
#include <boost/asio/co_spawn.hpp>
#include <msgpack.hpp>
namespace {
boost::asio::ip::address ipAddress(IPAddress const& n) {
if (n.isV6()) {
return boost::asio::ip::address_v6(n.toV6());
} else {
return boost::asio::ip::address_v4(n.toV4());
}
}
template <class Protocol>
boost::asio::ip::basic_endpoint<Protocol> toEndpoint(NetworkAddress const n) {
return boost::asio::ip::basic_endpoint<Protocol>(ipAddress(n.ip), n.port);
}
struct FluentDSocket {
virtual ~FluentDSocket() {}
virtual void connect(NetworkAddress const& endpoint) = 0;
virtual void send(std::shared_ptr<Sample> const& sample) = 0;
virtual const boost::system::error_code& failed() const = 0;
};
template <class Protocol, class Callback>
class SampleSender : public std::enable_shared_from_this<SampleSender<Protocol, Callback>> {
using Socket = typename Protocol::socket;
using Iter = typename decltype(Sample::data)::iterator;
Socket& socket;
Callback callback;
Iter iter, end;
std::shared_ptr<Sample> sample_; // to keep from being deallocated
struct Buf {
const char* data;
const unsigned size;
Buf(const char* data, unsigned size) : data(data), size(size) {}
Buf(Buf const&) = delete;
Buf& operator=(Buf const&) = delete;
~Buf() { delete[] data; }
};
void sendCompletionHandler(boost::system::error_code const& ec) {
if (ec) {
callback(ec);
} else {
++iter;
sendNext();
}
}
void send(boost::asio::ip::tcp::socket& socket, std::shared_ptr<Buf> const& buf) {
boost::system::error_code ec;
socket.send(boost::asio::const_buffer(buf->data, buf->size), 0, ec);
this->sendCompletionHandler(ec);
}
void send(boost::asio::ip::udp::socket& socket, std::shared_ptr<Buf> const& buf) {
boost::system::error_code ec;
socket.send(boost::asio::const_buffer(buf->data, buf->size), 0, ec);
this->sendCompletionHandler(ec);
}
void sendNext() {
if (iter == end) {
callback(boost::system::error_code());
return;
}
// 1. calculate size of buffer
unsigned size = 1; // 1 for fixmap identifier byte
auto waitState = to_string(iter->first);
if (waitState.size() < 32) {
size += waitState.size() + 1;
} else {
size += waitState.size() + 2;
}
size += iter->second.second;
// 2. allocate the buffer
std::unique_ptr<char[]> buf(new char[size]);
unsigned off = 0;
// 3. serialize fixmap
buf[off++] = 0x81; // map of size 1
// 3.1 serialize key
if (waitState.size() < 32) {
buf[off++] = 0xa0 + waitState.size(); // fixstr
} else {
buf[off++] = 0xd9;
buf[off++] = char(waitState.size());
}
memcpy(buf.get() + off, waitState.data(), waitState.size());
off += waitState.size();
// 3.2 append serialized value
memcpy(buf.get() + off, iter->second.first, iter->second.second);
// 4. send the result to fluentd
send(socket, std::make_shared<Buf>(buf.release(), size));
}
public:
SampleSender(Socket& socket, Callback const& callback, std::shared_ptr<Sample> const& sample)
: socket(socket), callback(callback), iter(sample->data.begin()), end(sample->data.end()), sample_(sample) {
sendNext();
}
};
// Sample function to make instanciation of SampleSender easier
template <class Protocol, class Callback>
std::shared_ptr<SampleSender<Protocol, Callback>> makeSampleSender(typename Protocol::socket& socket,
Callback const& callback,
std::shared_ptr<Sample> const& sample) {
return std::make_shared<SampleSender<Protocol, Callback>>(socket, callback, sample);
}
template <class Protocol>
struct FluentDSocketImpl : FluentDSocket, std::enable_shared_from_this<FluentDSocketImpl<Protocol>> {
static constexpr unsigned MAX_QUEUE_SIZE = 100;
boost::asio::io_context& context;
typename Protocol::socket socket;
FluentDSocketImpl(boost::asio::io_context& context) : context(context), socket(context) {}
bool ready = false;
std::deque<std::shared_ptr<Sample>> queue;
boost::system::error_code _failed;
const boost::system::error_code& failed() const override { return _failed; }
void sendCompletionHandler(boost::system::error_code const& ec) {
if (ec) {
// TODO: trace error
_failed = ec;
return;
}
if (queue.empty()) {
ready = true;
} else {
auto sample = queue.front();
queue.pop_front();
sendImpl(sample);
}
}
void sendImpl(std::shared_ptr<Sample> const& sample) {
makeSampleSender<Protocol>(
socket,
[self = this->shared_from_this()](boost::system::error_code const& ec) { self->sendCompletionHandler(ec); },
sample);
}
void send(std::shared_ptr<Sample> const& sample) override {
if (_failed) {
return;
}
if (ready) {
ready = false;
sendImpl(sample);
} else {
if (queue.size() < MAX_QUEUE_SIZE) {
queue.push_back(sample);
} // TODO: else trace a warning
}
}
void connect(NetworkAddress const& endpoint) override {
auto to = toEndpoint<Protocol>(endpoint);
socket.async_connect(to, [self = this->shared_from_this()](boost::system::error_code const& ec) {
if (ec) {
// TODO: error handling
self->_failed = ec;
return;
}
self->ready = true;
});
}
};
} // namespace
struct FluentDIngestorImpl {
using Protocol = FluentDIngestor::Protocol;
Protocol protocol;
NetworkAddress endpoint;
boost::asio::io_context& io_context;
std::shared_ptr<FluentDSocket> socket;
boost::asio::steady_timer retryTimer;
FluentDIngestorImpl(Protocol protocol, NetworkAddress const& endpoint)
: protocol(protocol), endpoint(endpoint), io_context(ActorLineageProfiler::instance().context()),
retryTimer(io_context) {
connect();
}
~FluentDIngestorImpl() { retryTimer.cancel(); }
void connect() {
switch (protocol) {
case Protocol::TCP:
socket.reset(new FluentDSocketImpl<boost::asio::ip::tcp>(io_context));
break;
case Protocol::UDP:
socket.reset(new FluentDSocketImpl<boost::asio::ip::udp>(io_context));
break;
}
socket->connect(endpoint);
}
void retry() {
retryTimer = boost::asio::steady_timer(io_context, std::chrono::seconds(1));
retryTimer.async_wait([this](auto const& ec) {
if (ec) {
return;
}
connect();
});
socket.reset();
}
};
FluentDIngestor::~FluentDIngestor() {
delete impl;
}
FluentDIngestor::FluentDIngestor(Protocol protocol, NetworkAddress& endpoint)
: impl(new FluentDIngestorImpl(protocol, endpoint)) {}
void FluentDIngestor::ingest(const std::shared_ptr<Sample>& sample) {
if (!impl->socket) {
// the connection failed in the past and we wait for a timeout before we retry
return;
} else if (impl->socket->failed()) {
impl->retry();
return;
} else {
impl->socket->send(sample);
}
}
void FluentDIngestor::getConfig(std::map<std::string, std::string>& res) const {
res["ingestor"] = "fluentd";
res["collector_endpoint"] = impl->endpoint.toString();
res["collector_protocol"] = impl->protocol == Protocol::TCP ? "tcp" : "udp";
}