Fix kafka tests (#6805)

* Commit offsets more precisely
* Get rid of DelimitedReadBuffer since read buffers don't line up well
* Increase timeouts
Ivan 2019-09-05 00:25:33 +03:00 committed by GitHub
parent b310d755fe
commit ab7df6b6dd
8 changed files with 78 additions and 114 deletions
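The second bullet above ("get rid of DelimitedReadBuffer") is the core of the change: instead of wrapping the consumer buffer in a separate delimiting buffer, ReadBufferFromKafkaConsumer now injects the one-byte row delimiter itself between messages in nextImpl() (see the hunks below). A minimal standalone sketch of that interleaving, in plain C++ with made-up names rather than ClickHouse's ReadBuffer machinery:

#include <cstddef>
#include <iostream>
#include <string>
#include <vector>

// Toy model of the new behaviour: alternately hand out the next message's
// payload and a single delimiter byte (if a delimiter is configured).
class DelimitingReader
{
public:
    DelimitingReader(std::vector<std::string> messages_, char delimiter_)
        : messages(std::move(messages_)), delimiter(delimiter_) {}

    // Returns false when there is nothing left to read.
    bool next(const char *& data, size_t & size)
    {
        if (put_delimiter)
        {
            data = &delimiter;
            size = 1;
            put_delimiter = false;
            return true;
        }
        if (current == messages.size())
            return false;
        data = messages[current].data();
        size = messages[current].size();
        ++current;
        put_delimiter = (delimiter != 0);   // emit the delimiter on the next call
        return true;
    }

private:
    std::vector<std::string> messages;
    char delimiter;
    size_t current = 0;
    bool put_delimiter = false;
};

int main()
{
    DelimitingReader reader({"1\t2", "3\t4"}, '\n');
    const char * data;
    size_t size;
    while (reader.next(data, size))
        std::cout.write(data, size);   // prints "1\t2\n3\t4\n"
}

The real buffer additionally polls Kafka for the next batch and checks the stalled/stopped flags; only the delimiter bookkeeping is modeled here.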

View File

@@ -88,21 +88,12 @@ public:
}
/** How many bytes have been read/written, counting those that are still in the buffer. */
size_t count() const
{
return bytes + offset();
}
size_t count() const { return bytes + offset(); }
/** Check that there are more bytes in the buffer after the cursor. */
bool ALWAYS_INLINE hasPendingData() const
{
return pos != working_buffer.end();
}
bool ALWAYS_INLINE hasPendingData() const { return available() > 0; }
bool isPadded() const
{
return padded;
}
bool isPadded() const { return padded; }
protected:
/// Read/write position.

View File

@@ -1,57 +0,0 @@
#pragma once
#include <IO/ReadBuffer.h>
#include <Common/typeid_cast.h>
namespace DB
{
/// Repeatedly reads from a single sub-buffer and delimits its output with a character.
/// Owns sub-buffer.
class DelimitedReadBuffer : public ReadBuffer
{
public:
DelimitedReadBuffer(std::unique_ptr<ReadBuffer> buffer_, char delimiter_) : ReadBuffer(nullptr, 0), buffer(std::move(buffer_)), delimiter(delimiter_)
{
// TODO: check that `buffer_` is not nullptr.
}
template <class BufferType>
BufferType * subBufferAs()
{
return typeid_cast<BufferType *>(buffer.get());
}
void reset()
{
BufferBase::set(nullptr, 0, 0);
}
protected:
// XXX: don't know how to guarantee that the next call to this method is done after we read all previous data.
bool nextImpl() override
{
if (put_delimiter)
{
BufferBase::set(&delimiter, 1, 0);
put_delimiter = false;
}
else
{
if (!buffer->next())
return false;
BufferBase::set(buffer->position(), buffer->available(), 0);
put_delimiter = (delimiter != 0);
}
return true;
}
private:
std::unique_ptr<ReadBuffer> buffer; // FIXME: should be `const`, but `ReadBuffer` doesn't allow
char delimiter; // FIXME: should be `const`, but `ReadBuffer` doesn't allow
bool put_delimiter = false;
};
}

View File

@@ -28,10 +28,7 @@ KafkaBlockInputStream::~KafkaBlockInputStream()
return;
if (broken)
{
buffer->subBufferAs<ReadBufferFromKafkaConsumer>()->unsubscribe();
buffer->reset();
}
buffer->unsubscribe();
storage.pushReadBuffer(buffer);
}
@@ -50,23 +47,22 @@ void KafkaBlockInputStream::readPrefixImpl()
if (!buffer)
return;
buffer->subBufferAs<ReadBufferFromKafkaConsumer>()->subscribe(storage.getTopics());
buffer->subscribe(storage.getTopics());
const auto & limits_ = getLimits();
const size_t poll_timeout = buffer->subBufferAs<ReadBufferFromKafkaConsumer>()->pollTimeout();
const size_t poll_timeout = buffer->pollTimeout();
size_t rows_portion_size = poll_timeout ? std::min<size_t>(max_block_size, limits_.max_execution_time.totalMilliseconds() / poll_timeout) : max_block_size;
rows_portion_size = std::max(rows_portion_size, 1ul);
auto non_virtual_header = storage.getSampleBlockNonMaterialized(); /// FIXME: add materialized columns support
auto read_callback = [this]
{
const auto * sub_buffer = buffer->subBufferAs<ReadBufferFromKafkaConsumer>();
virtual_columns[0]->insert(sub_buffer->currentTopic()); // "topic"
virtual_columns[1]->insert(sub_buffer->currentKey()); // "key"
virtual_columns[2]->insert(sub_buffer->currentOffset()); // "offset"
virtual_columns[3]->insert(sub_buffer->currentPartition()); // "partition"
virtual_columns[0]->insert(buffer->currentTopic()); // "topic"
virtual_columns[1]->insert(buffer->currentKey()); // "key"
virtual_columns[2]->insert(buffer->currentOffset()); // "offset"
virtual_columns[3]->insert(buffer->currentPartition()); // "partition"
auto timestamp = sub_buffer->currentTimestamp();
auto timestamp = buffer->currentTimestamp();
if (timestamp)
virtual_columns[4]->insert(std::chrono::duration_cast<std::chrono::seconds>(timestamp->get_timestamp()).count()); // "timestamp"
};
@@ -106,7 +102,7 @@ void KafkaBlockInputStream::readSuffixImpl()
if (!buffer)
return;
buffer->subBufferAs<ReadBufferFromKafkaConsumer>()->commit();
buffer->commit();
broken = false;
}
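The rows_portion_size expression in the hunk above caps how many rows are read between polls so that the stream still honours max_execution_time. A worked example with made-up numbers (real defaults may differ):

#include <algorithm>
#include <cstddef>
#include <iostream>

int main()
{
    // Illustrative values only, not taken from the diff:
    size_t max_block_size = 65536;          // rows
    size_t max_execution_time_ms = 60000;   // limits_.max_execution_time
    size_t poll_timeout_ms = 5000;          // stream_poll_timeout_ms

    size_t rows_portion_size = poll_timeout_ms
        ? std::min<size_t>(max_block_size, max_execution_time_ms / poll_timeout_ms)   // 60000 / 5000 = 12
        : max_block_size;
    rows_portion_size = std::max<size_t>(rows_portion_size, 1);   // never read less than one row per portion

    std::cout << rows_portion_size << '\n';   // prints 12
}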

View File

@@ -1,5 +1,7 @@
#include <Storages/Kafka/ReadBufferFromKafkaConsumer.h>
#include <common/logger_useful.h>
namespace DB
{
@@ -11,6 +13,7 @@ ReadBufferFromKafkaConsumer::ReadBufferFromKafkaConsumer(
size_t max_batch_size,
size_t poll_timeout_,
bool intermediate_commit_,
char delimiter_,
const std::atomic<bool> & stopped_)
: ReadBuffer(nullptr, 0)
, consumer(consumer_)
@@ -18,6 +21,7 @@ ReadBufferFromKafkaConsumer::ReadBufferFromKafkaConsumer(
, batch_size(max_batch_size)
, poll_timeout(poll_timeout_)
, intermediate_commit(intermediate_commit_)
, delimiter(delimiter_)
, stopped(stopped_)
, current(messages.begin())
{
@@ -68,20 +72,9 @@ void ReadBufferFromKafkaConsumer::commit()
PrintOffsets("Polled offset", consumer->get_offsets_position(consumer->get_assignment()));
if (current != messages.end())
{
/// Since we can poll more messages than we already processed,
/// commit only processed messages.
/// Since we can poll more messages than we already processed - commit only processed messages.
if (!messages.empty())
consumer->async_commit(*std::prev(current));
}
else
{
/// Commit everything we polled so far because either:
/// - read all polled messages (current == messages.end()),
/// - read nothing at all (messages.empty()),
/// - stalled.
consumer->async_commit();
}
PrintOffsets("Committed offset", consumer->get_offsets_committed(consumer->get_assignment()));
@@ -152,6 +145,15 @@ bool ReadBufferFromKafkaConsumer::nextImpl()
if (stalled || stopped)
return false;
if (put_delimiter)
{
BufferBase::set(&delimiter, 1, 0);
put_delimiter = false;
return true;
}
put_delimiter = (delimiter != 0);
if (current == messages.end())
{
if (intermediate_commit)

View File

@@ -2,15 +2,18 @@
#include <Core/Names.h>
#include <Core/Types.h>
#include <IO/DelimitedReadBuffer.h>
#include <common/logger_useful.h>
#include <IO/ReadBuffer.h>
#include <cppkafka/cppkafka.h>
namespace Poco
{
class Logger;
}
namespace DB
{
using ConsumerBufferPtr = std::shared_ptr<DelimitedReadBuffer>;
using ConsumerPtr = std::shared_ptr<cppkafka::Consumer>;
class ReadBufferFromKafkaConsumer : public ReadBuffer
@@ -22,6 +25,7 @@ public:
size_t max_batch_size,
size_t poll_timeout_,
bool intermediate_commit_,
char delimiter_,
const std::atomic<bool> & stopped_);
~ReadBufferFromKafkaConsumer() override;
@@ -48,6 +52,9 @@ private:
bool stalled = false;
bool intermediate_commit = true;
char delimiter;
bool put_delimiter = false;
const std::atomic<bool> & stopped;
Messages messages;
@@ -56,4 +63,6 @@ private:
bool nextImpl() override;
};
using ConsumerBufferPtr = std::shared_ptr<ReadBufferFromKafkaConsumer>;
}

View File

@@ -277,9 +277,7 @@ ConsumerBufferPtr StorageKafka::createReadBuffer()
size_t poll_timeout = settings.stream_poll_timeout_ms.totalMilliseconds();
/// NOTE: we pass |stream_cancelled| by reference here, so the buffers should not outlive the storage.
return std::make_shared<DelimitedReadBuffer>(
std::make_unique<ReadBufferFromKafkaConsumer>(consumer, log, batch_size, poll_timeout, intermediate_commit, stream_cancelled),
row_delimiter);
return std::make_shared<ReadBufferFromKafkaConsumer>(consumer, log, batch_size, poll_timeout, intermediate_commit, row_delimiter, stream_cancelled);
}
@@ -400,7 +398,7 @@ bool StorageKafka::streamToViews()
else
in = streams[0];
std::atomic<bool> stub;
std::atomic<bool> stub = {false};
copyData(*in, *block_io.out, &stub);
// Check whether the limits were applied during query execution
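A small but easy-to-miss fix in this hunk: std::atomic<bool> stub; was left default-constructed, and before C++20 std::atomic's default constructor does not initialize the contained value, so the cancellation flag passed to copyData could hold garbage. Value-initializing it guarantees it starts as false. A minimal illustration (my own example, not part of the diff):

#include <atomic>
#include <iostream>

int main()
{
    // Before C++20, the default constructor leaves the contained value
    // uninitialized; reading it would be undefined behaviour.
    // std::atomic<bool> bad;            // indeterminate value
    std::atomic<bool> stub = {false};    // value-initialized: guaranteed false

    std::cout << std::boolalpha << stub.load() << '\n';   // prints false
}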

View File

@@ -0,0 +1,24 @@
<?xml version="1.0"?>
<yandex>
<profiles>
<default>
<stream_poll_timeout_ms>30000</stream_poll_timeout_ms>
</default>
</profiles>
<users>
<default>
<password></password>
<networks incl="networks" replace="replace">
<ip>::/0</ip>
</networks>
<profile>default</profile>
<quota>default</quota>
</default>
</users>
<quotas>
<default>
</default>
</quotas>
</yandex>

View File

@@ -30,6 +30,7 @@ import kafka_pb2
cluster = ClickHouseCluster(__file__)
instance = cluster.add_instance('instance',
config_dir='configs',
main_configs=['configs/kafka.xml'],
with_kafka=True,
clickhouse_path_dir='clickhouse_path')
@@ -136,7 +137,7 @@ def kafka_setup_teardown():
# Tests
@pytest.mark.timeout(60)
@pytest.mark.timeout(180)
def test_kafka_settings_old_syntax(kafka_cluster):
instance.query('''
CREATE TABLE test.kafka (key UInt64, value UInt64)
@@ -159,7 +160,7 @@ def test_kafka_settings_old_syntax(kafka_cluster):
kafka_check_result(result, True)
@pytest.mark.timeout(60)
@pytest.mark.timeout(180)
def test_kafka_settings_new_syntax(kafka_cluster):
instance.query('''
CREATE TABLE test.kafka (key UInt64, value UInt64)
@@ -195,7 +196,7 @@ def test_kafka_settings_new_syntax(kafka_cluster):
kafka_check_result(result, True)
@pytest.mark.timeout(60)
@pytest.mark.timeout(180)
def test_kafka_csv_with_delimiter(kafka_cluster):
instance.query('''
CREATE TABLE test.kafka (key UInt64, value UInt64)
@@ -221,7 +222,7 @@ def test_kafka_csv_with_delimiter(kafka_cluster):
kafka_check_result(result, True)
@pytest.mark.timeout(60)
@pytest.mark.timeout(180)
def test_kafka_tsv_with_delimiter(kafka_cluster):
instance.query('''
CREATE TABLE test.kafka (key UInt64, value UInt64)
@@ -247,7 +248,7 @@ def test_kafka_tsv_with_delimiter(kafka_cluster):
kafka_check_result(result, True)
@pytest.mark.timeout(60)
@pytest.mark.timeout(180)
def test_kafka_json_without_delimiter(kafka_cluster):
instance.query('''
CREATE TABLE test.kafka (key UInt64, value UInt64)
@@ -277,7 +278,7 @@ def test_kafka_json_without_delimiter(kafka_cluster):
kafka_check_result(result, True)
@pytest.mark.timeout(60)
@pytest.mark.timeout(180)
def test_kafka_protobuf(kafka_cluster):
instance.query('''
CREATE TABLE test.kafka (key UInt64, value String)
@@ -302,7 +303,7 @@ def test_kafka_protobuf(kafka_cluster):
kafka_check_result(result, True)
@pytest.mark.timeout(60)
@pytest.mark.timeout(180)
def test_kafka_materialized_view(kafka_cluster):
instance.query('''
DROP TABLE IF EXISTS test.view;
@@ -339,7 +340,7 @@ def test_kafka_materialized_view(kafka_cluster):
kafka_check_result(result, True)
@pytest.mark.timeout(60)
@pytest.mark.timeout(180)
def test_kafka_many_materialized_views(kafka_cluster):
instance.query('''
DROP TABLE IF EXISTS test.view1;
@@ -437,7 +438,7 @@ def test_kafka_flush_on_big_message(kafka_cluster):
assert int(result) == kafka_messages*batch_messages, 'ClickHouse lost some messages: {}'.format(result)
@pytest.mark.timeout(60)
@pytest.mark.timeout(180)
def test_kafka_virtual_columns(kafka_cluster):
instance.query('''
CREATE TABLE test.kafka (key UInt64, value UInt64)
@@ -467,7 +468,7 @@ def test_kafka_virtual_columns(kafka_cluster):
kafka_check_result(result, True, 'test_kafka_virtual1.reference')
@pytest.mark.timeout(60)
@pytest.mark.timeout(180)
def test_kafka_virtual_columns_with_materialized_view(kafka_cluster):
instance.query('''
DROP TABLE IF EXISTS test.view;
@@ -504,7 +505,7 @@ def test_kafka_virtual_columns_with_materialized_view(kafka_cluster):
kafka_check_result(result, True, 'test_kafka_virtual2.reference')
@pytest.mark.timeout(60)
@pytest.mark.timeout(180)
def test_kafka_insert(kafka_cluster):
instance.query('''
CREATE TABLE test.kafka (key UInt64, value UInt64)
@@ -541,7 +542,7 @@ def test_kafka_insert(kafka_cluster):
kafka_check_result(result, True)
@pytest.mark.timeout(60)
@pytest.mark.timeout(180)
def test_kafka_produce_consume(kafka_cluster):
instance.query('''
CREATE TABLE test.kafka (key UInt64, value UInt64)