DataStreams: added Cap’n Proto format support

Cap'n Proto is a binary message format.
Like Protocol Buffers and Thrift (but unlike JSON or MessagePack), Cap'n Proto messages are strongly-typed and not self-describing. Due to this, it requires a schema setting to specify schema file and the root object. The schema is parsed on runtime and cached for each SQL statement.
This commit is contained in:
Marek Vavruša 2017-10-03 17:00:22 -07:00 committed by alexey-milovidov
parent 3b99b723d7
commit 0d942a69c5
8 changed files with 332 additions and 0 deletions

View File

@ -230,6 +230,7 @@ include (cmake/find_readline_edit.cmake)
include (cmake/find_zookeeper.cmake)
include (cmake/find_re2.cmake)
include (cmake/find_rdkafka.cmake)
include (cmake/find_capnp.cmake)
include (cmake/find_contrib_lib.cmake)
find_contrib_lib(cityhash)

22
cmake/find_capnp.cmake Normal file
View File

@ -0,0 +1,22 @@
option (ENABLE_CAPNP "Enable Cap'n Proto" ON)
if (ENABLE_CAPNP)
set (CAPNP_PATHS "/usr/local/opt/capnp/lib")
set (CAPNP_INCLUDE_PATHS "/usr/local/opt/capnp/include")
find_library (CAPNP capnp PATHS ${CAPNP_PATHS})
find_library (CAPNPC capnpc PATHS ${CAPNP_PATHS})
find_library (KJ kj PATHS ${CAPNP_PATHS})
set (CAPNP_LIBS ${CAPNP} ${CAPNPC} ${KJ})
find_path (CAPNP_INCLUDE_DIR NAMES capnp/schema-parser.h PATHS ${CAPNP_INCLUDE_PATHS})
if (CAPNP_INCLUDE_DIR AND CAPNP_LIBS)
include_directories (${CAPNP_INCLUDE_DIR})
set(USE_CAPNP 1)
endif ()
endif ()
if (USE_CAPNP)
message (STATUS "Using capnp=${USE_CAPNP}: ${CAPNP_INCLUDE_DIR} : ${CAPNP_LIBS}")
else ()
message (STATUS "Build without capnp (support for Cap'n Proto format will be disabled)")
endif ()

View File

@ -182,6 +182,10 @@ if (USE_ICU)
target_link_libraries (dbms ${ICU_LIBS})
endif ()
if (USE_CAPNP)
target_link_libraries (dbms ${CAPNP_LIBS})
endif ()
target_link_libraries (dbms
${PLATFORM_LIBS}
${CMAKE_DL_LIBS}

View File

@ -7,6 +7,7 @@
#cmakedefine01 USE_RE2_ST
#cmakedefine01 USE_VECTORCLASS
#cmakedefine01 USE_RDKAFKA
#cmakedefine01 USE_CAPNP
#cmakedefine01 Poco_DataODBC_FOUND
#cmakedefine01 Poco_MongoDB_FOUND
#cmakedefine01 Poco_NetSSL_FOUND

View File

@ -0,0 +1,197 @@
#if USE_CAPNP
#include <Core/Block.h>
#include <IO/ReadBuffer.h>
#include <DataStreams/CapnProtoInputStream.h>
#include <capnp/serialize.h>
#include <capnp/dynamic.h>
#include <boost/algorithm/string.hpp>
#include <boost/range/join.hpp>
#include <common/logger_useful.h>
namespace DB
{
CapnProtoInputStream::NestedField split(const Block & sample, size_t i)
{
CapnProtoInputStream::NestedField field = {{}, i};
// Remove leading dot in field definition, e.g. ".msg" -> "msg"
String name(sample.safeGetByPosition(i).name);
if (name.size() > 0 && name[0] == '.')
name.erase(0, 1);
boost::split(field.tokens, name, boost::is_any_of("."));
return field;
}
Field convertNodeToField(capnp::DynamicValue::Reader value)
{
switch (value.getType()) {
case capnp::DynamicValue::UNKNOWN:
throw Exception("Unknown field type");
case capnp::DynamicValue::VOID:
return Field();
case capnp::DynamicValue::BOOL:
return UInt64(value.as<bool>() ? 1 : 0);
case capnp::DynamicValue::INT:
return Int64((value.as<int64_t>()));
case capnp::DynamicValue::UINT:
return UInt64(value.as<uint64_t>());
case capnp::DynamicValue::FLOAT:
return Float64(value.as<double>());
case capnp::DynamicValue::TEXT:
{
auto arr = value.as<capnp::Text>();
return String(arr.begin(), arr.size());
}
case capnp::DynamicValue::DATA:
{
auto arr = value.as<capnp::Data>().asChars();
return String(arr.begin(), arr.size());
}
case capnp::DynamicValue::LIST:
{
auto listValue = value.as<capnp::DynamicList>();
Array res(listValue.size());
for (auto i : kj::indices(listValue))
res[i] = convertNodeToField(listValue[i]);
return res;
}
case capnp::DynamicValue::ENUM:
return UInt64(value.as<capnp::DynamicEnum>().getRaw());
case capnp::DynamicValue::STRUCT:
throw Exception("STRUCT type not supported, read individual fields instead");
case capnp::DynamicValue::CAPABILITY:
throw Exception("CAPABILITY type not supported");
case capnp::DynamicValue::ANY_POINTER:
throw Exception("ANY_POINTER type not supported");
}
}
capnp::StructSchema::Field getFieldOrThrow(capnp::StructSchema node, const std::string & field)
{
KJ_IF_MAYBE(child, node.findFieldByName(field))
return *child;
else
throw Exception("Field " + field + " doesn't exist in schema.");
}
void CapnProtoInputStream::createActions(const NestedFieldList & sortedFields, capnp::StructSchema reader)
{
String last;
size_t level = 0;
capnp::StructSchema::Field parent;
for (const auto & field : sortedFields)
{
// Move to a different field in the same structure, keep parent
if (level > 0 && field.tokens[level - 1] != last)
{
auto child = getFieldOrThrow(parent.getContainingStruct(), field.tokens[level - 1]);
reader = child.getType().asStruct();
actions.push_back({Action::POP});
actions.push_back({Action::PUSH, child});
}
// Descend to a nested structure
for (; level < field.tokens.size() - 1; ++level)
{
last = field.tokens[level];
parent = getFieldOrThrow(reader, last);
reader = parent.getType().asStruct();
actions.push_back({Action::PUSH, parent});
}
// Read field from the structure
actions.push_back({Action::READ, getFieldOrThrow(reader, field.tokens[level]), field.pos});
}
}
CapnProtoInputStream::CapnProtoInputStream(ReadBuffer & istr_, const Block & sample_, const String & schema_file, const String & root_object)
: istr(istr_), sample(sample_), parser(std::make_shared<SchemaParser>())
{
// Parse the schema and fetch the root object
auto schema = parser->impl.parseDiskFile(schema_file, schema_file, {});
root = schema.getNested(root_object).asStruct();
/**
* The schema typically consists of fields in various nested structures.
* Here we gather the list of fields and sort them in a way so that fields in the same structur are adjacent,
* and the nesting level doesn't decrease to make traversal easier.
*/
NestedFieldList list;
size_t columns = sample.columns();
for (size_t i = 0; i < columns; ++i)
list.push_back(split(sample, i));
// Reorder list to make sure we don't have to backtrack
std::sort(list.begin(), list.end(), [](const NestedField & a, const NestedField & b)
{
if (a.tokens.size() == b.tokens.size())
return a.tokens < b.tokens;
return a.tokens.size() < b.tokens.size();
});
createActions(list, root);
}
bool CapnProtoInputStream::read(Block & block)
{
if (istr.eof())
return false;
// Read from underlying buffer directly
auto buf = istr.buffer();
auto base = reinterpret_cast<const capnp::word *>(istr.position());
// Check if there's enough bytes in the buffer to read the full message
kj::Array<capnp::word> heap_array;
auto array = kj::arrayPtr(base, buf.size() - istr.offset());
auto expected_words = capnp::expectedSizeInWordsFromPrefix(array);
if (expected_words * sizeof(capnp::word) > array.size())
{
// We'll need to reassemble the message in a contiguous buffer
heap_array = kj::heapArray<capnp::word>(expected_words);
istr.readStrict(heap_array.asChars().begin(), heap_array.asChars().size());
array = heap_array.asPtr();
}
capnp::FlatArrayMessageReader msg(array);
std::vector<capnp::DynamicStruct::Reader> stack;
stack.push_back(msg.getRoot<capnp::DynamicStruct>(root));
for (auto action : actions)
{
switch (action.type) {
case Action::READ: {
auto & col = block.getByPosition(action.column);
Field value = convertNodeToField(stack.back().get(action.field));
col.column->insert(value);
break;
}
case Action::POP:
stack.pop_back();
break;
case Action::PUSH:
stack.push_back(stack.back().get(action.field).as<capnp::DynamicStruct>());
break;
}
}
// Advance buffer position if used directly
if (heap_array.size() == 0)
{
auto parsed = (msg.getEnd() - base) * sizeof(capnp::word);
istr.position() += parsed;
}
return true;
}
}
#endif

View File

@ -0,0 +1,68 @@
#pragma once
#include <Core/Block.h>
#include <DataStreams/IRowInputStream.h>
#include <capnp/schema-parser.h>
namespace DB
{
class ReadBuffer;
/** A stream for reading messages in Cap'n Proto format in given schema.
* Like Protocol Buffers and Thrift (but unlike JSON or MessagePack),
* Cap'n Proto messages are strongly-typed and not self-describing.
* The schema in this case cannot be compiled in, so it uses a runtime schema parser.
* See https://capnproto.org/cxx.html
*/
class CapnProtoInputStream : public IRowInputStream
{
public:
struct NestedField
{
std::vector<std::string> tokens;
size_t pos;
};
using NestedFieldList = std::vector<NestedField>;
/** schema_file - location of the capnproto schema, e.g. "schema.canpn"
* root_object - name to the root object, e.g. "Message"
*/
CapnProtoInputStream(ReadBuffer & istr_, const Block & sample_, const String & schema_file, const String & root_object);
bool read(Block & block) override;
private:
// Build a traversal plan from a sorted list of fields
void createActions(const NestedFieldList & sortedFields, capnp::StructSchema reader);
/* Action for state machine for traversing nested structures. */
struct Action
{
enum Type { POP, PUSH, READ };
Type type;
capnp::StructSchema::Field field;
size_t column;
};
// Wrapper for classes that could throw in destructor
// https://github.com/capnproto/capnproto/issues/553
template <typename T>
struct DestructorCatcher
{
T impl;
template <typename ... Arg>
DestructorCatcher(Arg && ... args) : impl(kj::fwd<Arg>(args)...) {}
~DestructorCatcher() noexcept try { } catch (...) { }
};
using SchemaParser = DestructorCatcher<capnp::SchemaParser>;
ReadBuffer & istr;
const Block sample;
std::shared_ptr<SchemaParser> parser;
capnp::StructSchema root;
std::vector<Action> actions;
};
}

View File

@ -1,3 +1,4 @@
#include <Common/config.h>
#include <Interpreters/Context.h>
#include <DataStreams/NativeBlockInputStream.h>
#include <DataStreams/NativeBlockOutputStream.h>
@ -30,6 +31,11 @@
#include <DataStreams/FormatFactory.h>
#include <DataStreams/SquashingBlockOutputStream.h>
#include <DataTypes/FormatSettingsJSON.h>
#if USE_CAPNP
#include <DataStreams/CapnProtoInputStream.h>
#endif
#include <boost/algorithm/string.hpp>
namespace DB
{
@ -92,6 +98,18 @@ BlockInputStreamPtr FormatFactory::getInput(const String & name, ReadBuffer & bu
{
return wrap_row_stream(std::make_shared<JSONEachRowRowInputStream>(buf, sample, settings.input_format_skip_unknown_fields));
}
#if USE_CAPNP
else if (name == "CapnProto")
{
std::vector<String> tokens;
auto schema_and_root = settings.format_schema.toString();
boost::split(tokens, schema_and_root, boost::is_any_of(":"));
if (tokens.size() != 2)
throw Exception("Format CapnProto requires 'format_schema' setting to have schema_file:root_object format, e.g. 'schema.capnp:Message'");
return wrap_row_stream(std::make_shared<CapnProtoInputStream>(buf, sample, tokens[0], tokens[1]));
}
#endif
else if (name == "TabSeparatedRaw"
|| name == "TSVRaw"
|| name == "BlockTabSeparated"

View File

@ -0,0 +1,21 @@
CapnProto
---------
Cap'n Proto is a binary message format. Like Protocol Buffers and Thrift (but unlike JSON or MessagePack), Cap'n Proto messages are strongly-typed and not self-describing. Due to this, it requires a ``schema`` setting to specify schema file and the root object. The schema is parsed on runtime and cached for each SQL statement.
.. code-block:: sql
SELECT SearchPhrase, count() AS c FROM test.hits GROUP BY SearchPhrase FORMAT CapnProto SETTINGS schema = 'schema.capnp:Message'
When the schema file looks like:
.. code-block:: text
struct Message {
SearchPhrase @0 :Text;
c @1 :Uint64;
}
Deserialisation is almost as efficient as the binary rows format, with typically zero allocation overhead per message.
You can use this format as an efficient exchange message format in your data processing pipeline.