mirror of https://github.com/ByConity/ByConity
allow batching Distributed inserts [#CLICKHOUSE-3126]
This commit is contained in:
parent
4a98d83a3b
commit
57ba50f585
|
@ -79,6 +79,9 @@ struct Settings
|
|||
/** Sleep time for StorageDistributed DirectoryMonitors in case there is no work or exception has been thrown */ \
|
||||
M(SettingMilliseconds, distributed_directory_monitor_sleep_time_ms, DBMS_DISTRIBUTED_DIRECTORY_MONITOR_SLEEP_TIME_MS) \
|
||||
\
|
||||
/** Should StorageDistributed DirectoryMonitors try to batch individual inserts into bigger ones. */ \
|
||||
M(SettingBool, distributed_directory_monitor_batch_inserts, false) \
|
||||
\
|
||||
/** Allows disabling WHERE to PREWHERE optimization in SELECT queries from MergeTree */ \
|
||||
M(SettingBool, optimize_move_to_prewhere, true) \
|
||||
\
|
||||
|
|
|
@ -1,11 +1,16 @@
|
|||
#include <DataStreams/RemoteBlockOutputStream.h>
|
||||
#include <DataStreams/NativeBlockInputStream.h>
|
||||
#include <Common/escapeForFileName.h>
|
||||
#include <Common/setThreadName.h>
|
||||
#include <Common/CurrentMetrics.h>
|
||||
#include <Common/StringUtils.h>
|
||||
#include <Common/ClickHouseRevision.h>
|
||||
#include <Interpreters/Context.h>
|
||||
#include <Storages/Distributed/DirectoryMonitor.h>
|
||||
#include <IO/ReadBufferFromFile.h>
|
||||
#include <IO/WriteBufferFromFile.h>
|
||||
#include <IO/CompressedReadBuffer.h>
|
||||
#include <IO/Operators.h>
|
||||
|
||||
#include <boost/algorithm/string/find_iterator.hpp>
|
||||
#include <boost/algorithm/string/finder.hpp>
|
||||
|
@ -82,10 +87,15 @@ namespace
|
|||
|
||||
StorageDistributedDirectoryMonitor::StorageDistributedDirectoryMonitor(StorageDistributed & storage, const std::string & name)
|
||||
: storage(storage), pool{createPool(name)}, path{storage.path + name + '/'}
|
||||
, current_batch_file_path{path + "current_batch.txt"}
|
||||
, default_sleep_time{storage.context.getSettingsRef().distributed_directory_monitor_sleep_time_ms.totalMilliseconds()}
|
||||
, sleep_time{default_sleep_time}
|
||||
, log{&Logger::get(getLoggerName())}
|
||||
{
|
||||
const Settings & settings = storage.context.getSettingsRef();
|
||||
should_batch_inserts = settings.distributed_directory_monitor_batch_inserts;
|
||||
min_batched_block_size_rows = settings.min_insert_block_size_rows;
|
||||
min_batched_block_size_bytes = settings.min_insert_block_size_bytes;
|
||||
}
|
||||
|
||||
|
||||
|
@ -174,12 +184,19 @@ bool StorageDistributedDirectoryMonitor::findFiles()
|
|||
if (files.empty())
|
||||
return false;
|
||||
|
||||
for (const auto & file : files)
|
||||
if (should_batch_inserts)
|
||||
{
|
||||
if (quit)
|
||||
return true;
|
||||
processFilesWithBatching(files);
|
||||
}
|
||||
else
|
||||
{
|
||||
for (const auto & file : files)
|
||||
{
|
||||
if (quit)
|
||||
return true;
|
||||
|
||||
processFile(file.second);
|
||||
processFile(file.second);
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
|
@ -208,26 +225,7 @@ void StorageDistributedDirectoryMonitor::processFile(const std::string & file_pa
|
|||
}
|
||||
catch (const Exception & e)
|
||||
{
|
||||
const auto code = e.code();
|
||||
|
||||
/// mark file as broken if necessary
|
||||
if (code == ErrorCodes::CHECKSUM_DOESNT_MATCH
|
||||
|| code == ErrorCodes::TOO_LARGE_SIZE_COMPRESSED
|
||||
|| code == ErrorCodes::CANNOT_READ_ALL_DATA
|
||||
|| code == ErrorCodes::ATTEMPT_TO_READ_AFTER_EOF)
|
||||
{
|
||||
const auto last_path_separator_pos = file_path.rfind('/');
|
||||
const auto & path = file_path.substr(0, last_path_separator_pos + 1);
|
||||
const auto & file_name = file_path.substr(last_path_separator_pos + 1);
|
||||
const auto & broken_path = path + "broken/";
|
||||
const auto & broken_file_path = broken_path + file_name;
|
||||
|
||||
Poco::File{broken_path}.createDirectory();
|
||||
Poco::File{file_path}.renameTo(broken_file_path);
|
||||
|
||||
LOG_ERROR(log, "Renamed `" << file_path << "` to `" << broken_file_path << '`');
|
||||
}
|
||||
|
||||
maybeMarkAsBroken(file_path, e);
|
||||
throw;
|
||||
}
|
||||
|
||||
|
@ -236,6 +234,197 @@ void StorageDistributedDirectoryMonitor::processFile(const std::string & file_pa
|
|||
LOG_TRACE(log, "Finished processing `" << file_path << '`');
|
||||
}
|
||||
|
||||
struct StorageDistributedDirectoryMonitor::Batch
|
||||
{
|
||||
std::vector<UInt64> file_indices;
|
||||
size_t total_rows = 0;
|
||||
size_t total_bytes = 0;
|
||||
|
||||
StorageDistributedDirectoryMonitor & parent;
|
||||
|
||||
explicit Batch(StorageDistributedDirectoryMonitor & parent_) : parent(parent_) {}
|
||||
|
||||
bool isEnoughSize() const
|
||||
{
|
||||
return (!parent.min_batched_block_size_rows && !parent.min_batched_block_size_bytes)
|
||||
|| (parent.min_batched_block_size_rows && total_rows >= parent.min_batched_block_size_rows)
|
||||
|| (parent.min_batched_block_size_bytes && total_bytes >= parent.min_batched_block_size_bytes);
|
||||
}
|
||||
|
||||
void send(bool save, const std::map<UInt64, String> & file_index_to_path)
|
||||
{
|
||||
if (file_indices.empty())
|
||||
return;
|
||||
|
||||
CurrentMetrics::Increment metric_increment{CurrentMetrics::DistributedSend};
|
||||
|
||||
if (save)
|
||||
{
|
||||
/// For deduplication in Replicated tables to work, in case of error
|
||||
/// we must try to re-send exactly the same batches.
|
||||
/// So we save contents of the current batch into the current_batch_file_path file
|
||||
/// and truncate it afterwards if all went well.
|
||||
WriteBufferFromFile out{parent.current_batch_file_path};
|
||||
writeText(out);
|
||||
}
|
||||
|
||||
auto connection = parent.pool->get();
|
||||
|
||||
String insert_query;
|
||||
std::unique_ptr<RemoteBlockOutputStream> remote;
|
||||
bool first = true;
|
||||
|
||||
for (UInt64 file_idx : file_indices)
|
||||
{
|
||||
ReadBufferFromFile in{file_index_to_path.at(file_idx)};
|
||||
readStringBinary(insert_query, in); /// NOTE: all files must have the same insert_query
|
||||
|
||||
if (first)
|
||||
{
|
||||
first = false;
|
||||
remote = std::make_unique<RemoteBlockOutputStream>(*connection, insert_query);
|
||||
remote->writePrefix();
|
||||
}
|
||||
|
||||
remote->writePrepared(in);
|
||||
}
|
||||
|
||||
remote->writeSuffix();
|
||||
|
||||
LOG_TRACE(parent.log, "Sent a batch of " << file_indices.size() << " files.");
|
||||
|
||||
for (UInt64 file_index : file_indices)
|
||||
Poco::File{file_index_to_path.at(file_index)}.remove();
|
||||
file_indices.clear();
|
||||
total_rows = 0;
|
||||
total_bytes = 0;
|
||||
|
||||
Poco::File{parent.current_batch_file_path}.setSize(0);
|
||||
}
|
||||
|
||||
void writeText(WriteBuffer & out)
|
||||
{
|
||||
for (UInt64 file_idx : file_indices)
|
||||
out << file_idx << '\n';
|
||||
}
|
||||
|
||||
void readText(ReadBuffer & in)
|
||||
{
|
||||
while (!in.eof())
|
||||
{
|
||||
UInt64 idx;
|
||||
in >> idx >> "\n";
|
||||
file_indices.push_back(idx);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
void StorageDistributedDirectoryMonitor::processFilesWithBatching(const std::map<UInt64, std::string> & files)
|
||||
{
|
||||
std::unordered_set<UInt64> file_indices_to_skip;
|
||||
|
||||
if (Poco::File{current_batch_file_path}.exists())
|
||||
{
|
||||
/// Possibly, we failed to send a batch on the previous iteration. Try to send exactly the same batch.
|
||||
Batch batch{*this};
|
||||
ReadBufferFromFile in{current_batch_file_path};
|
||||
batch.readText(in);
|
||||
file_indices_to_skip.insert(batch.file_indices.begin(), batch.file_indices.end());
|
||||
batch.send(/* save = */ false, files);
|
||||
}
|
||||
|
||||
std::unordered_map<String, Batch> query_to_batch;
|
||||
|
||||
for (const auto & file : files)
|
||||
{
|
||||
if (quit)
|
||||
return;
|
||||
|
||||
UInt64 file_idx = file.first;
|
||||
const String & file_path = file.second;
|
||||
|
||||
if (file_indices_to_skip.count(file_idx))
|
||||
continue;
|
||||
|
||||
ReadBufferFromFile in{file_path};
|
||||
String insert_query;
|
||||
readStringBinary(insert_query, in);
|
||||
|
||||
Batch & batch = query_to_batch.try_emplace(insert_query, *this).first->second;
|
||||
|
||||
CompressedReadBuffer decompressing_in(in);
|
||||
NativeBlockInputStream block_in(decompressing_in, ClickHouseRevision::get());
|
||||
|
||||
size_t total_rows = 0;
|
||||
size_t total_bytes = 0;
|
||||
|
||||
block_in.readPrefix();
|
||||
try
|
||||
{
|
||||
/// Determine size of the current file and check if it is not broken.
|
||||
while (Block block = block_in.read())
|
||||
{
|
||||
total_rows += block.rows();
|
||||
total_bytes += block.bytes();
|
||||
}
|
||||
}
|
||||
catch (const Exception & e)
|
||||
{
|
||||
if (maybeMarkAsBroken(file_path, e))
|
||||
{
|
||||
tryLogCurrentException(log, "File is marked broken due to");
|
||||
continue;
|
||||
}
|
||||
else
|
||||
throw;
|
||||
}
|
||||
block_in.readSuffix();
|
||||
|
||||
batch.file_indices.push_back(file_idx);
|
||||
batch.total_rows += total_rows;
|
||||
batch.total_bytes += total_bytes;
|
||||
|
||||
if (batch.isEnoughSize())
|
||||
batch.send(/* save = */ true, files);
|
||||
}
|
||||
|
||||
for (auto & kv : query_to_batch)
|
||||
{
|
||||
Batch & batch = kv.second;
|
||||
batch.send(/* save = */ true, files);
|
||||
}
|
||||
|
||||
Poco::File{current_batch_file_path}.remove();
|
||||
}
|
||||
|
||||
|
||||
bool StorageDistributedDirectoryMonitor::maybeMarkAsBroken(const std::string & file_path, const Exception &e) const
|
||||
{
|
||||
const auto code = e.code();
|
||||
|
||||
/// mark file as broken if necessary
|
||||
if (code == ErrorCodes::CHECKSUM_DOESNT_MATCH
|
||||
|| code == ErrorCodes::TOO_LARGE_SIZE_COMPRESSED
|
||||
|| code == ErrorCodes::CANNOT_READ_ALL_DATA
|
||||
|| code == ErrorCodes::ATTEMPT_TO_READ_AFTER_EOF)
|
||||
{
|
||||
const auto last_path_separator_pos = file_path.rfind('/');
|
||||
const auto & path = file_path.substr(0, last_path_separator_pos + 1);
|
||||
const auto & file_name = file_path.substr(last_path_separator_pos + 1);
|
||||
const auto & broken_path = path + "broken/";
|
||||
const auto & broken_file_path = broken_path + file_name;
|
||||
|
||||
Poco::File{broken_path}.createDirectory();
|
||||
Poco::File{file_path}.renameTo(broken_file_path);
|
||||
|
||||
LOG_ERROR(log, "Renamed `" << file_path << "` to `" << broken_file_path << '`');
|
||||
|
||||
return true;
|
||||
}
|
||||
else
|
||||
return false;
|
||||
}
|
||||
|
||||
|
||||
std::string StorageDistributedDirectoryMonitor::getLoggerName() const
|
||||
{
|
||||
|
|
|
@ -24,11 +24,21 @@ private:
|
|||
ConnectionPoolPtr createPool(const std::string & name);
|
||||
bool findFiles();
|
||||
void processFile(const std::string & file_path);
|
||||
void processFilesWithBatching(const std::map<UInt64, std::string> & files);
|
||||
bool maybeMarkAsBroken(const std::string & file_path, const Exception &e) const;
|
||||
std::string getLoggerName() const;
|
||||
|
||||
StorageDistributed & storage;
|
||||
ConnectionPoolPtr pool;
|
||||
std::string path;
|
||||
|
||||
bool should_batch_inserts = false;
|
||||
size_t min_batched_block_size_rows = 0;
|
||||
size_t min_batched_block_size_bytes = 0;
|
||||
String current_batch_file_path;
|
||||
|
||||
struct Batch;
|
||||
|
||||
size_t error_count{};
|
||||
std::chrono::milliseconds default_sleep_time;
|
||||
std::chrono::milliseconds sleep_time;
|
||||
|
|
Loading…
Reference in New Issue