Use max TTL for recompression and add recompression TTL introspection columns to system.parts

This commit is contained in:
alesapin 2020-09-09 12:15:42 +03:00
parent a696cf12f6
commit 485b104898
8 changed files with 34 additions and 31 deletions

View File

@ -3039,7 +3039,8 @@ CompressionCodecPtr MergeTreeData::getCompressionCodecForPart(size_t part_size_c
auto metadata_snapshot = getInMemoryMetadataPtr();
const auto & recompression_ttl_entries = metadata_snapshot->getRecompressionTTLs();
auto best_ttl_entry = selectTTLDescriptionForTTLInfos(recompression_ttl_entries, ttl_infos.recompression_ttl, current_time, false);
auto best_ttl_entry = selectTTLDescriptionForTTLInfos(recompression_ttl_entries, ttl_infos.recompression_ttl, current_time, true);
if (best_ttl_entry)
return CompressionCodecFactory::instance().get(best_ttl_entry->recompression_codec, {});

View File

@ -162,30 +162,19 @@ void MergeTreeDataPartTTLInfos::write(WriteBuffer & out) const
writeString("}", out);
}
time_t MergeTreeDataPartTTLInfos::getMinRecompressionTTL() const
time_t MergeTreeDataPartTTLInfos::getMinimalMaxRecompressionTTL() const
{
time_t min = std::numeric_limits<time_t>::max();
time_t max = std::numeric_limits<time_t>::max();
for (const auto & [name, info] : recompression_ttl)
{
if (info.min != 0)
min = std::min(info.min, min);
}
if (info.max != 0)
max = std::min(info.max, max);
if (min == std::numeric_limits<time_t>::max())
if (max == std::numeric_limits<time_t>::max())
return 0;
return min;
}
time_t MergeTreeDataPartTTLInfos::getMaxRecompressionTTL() const
{
time_t max = 0;
for (const auto & [name, info] : recompression_ttl)
max = std::max(info.max, max);
return max;
}
std::optional<TTLDescription> selectTTLDescriptionForTTLInfos(const TTLDescriptions & descriptions, const TTLInfoMap & ttl_info_map, time_t current_time, bool use_max)
{
time_t best_ttl_time = 0;

View File

@ -49,11 +49,9 @@ struct MergeTreeDataPartTTLInfos
TTLInfoMap recompression_ttl;
/// Return min recompression TTL value if any, otherwise return zero.
time_t getMinRecompressionTTL() const;
/// Return the smallest non-zero max recompression TTL value if any, otherwise return zero.
time_t getMinimalMaxRecompressionTTL() const;
/// Return max recompression TTL value if any, otherwise return zero.
time_t getMaxRecompressionTTL() const;
void read(ReadBuffer & in);
void write(WriteBuffer & out) const;

View File

@ -99,7 +99,7 @@ time_t TTLDeleteMergeSelector::getTTLForPart(const IMergeSelector::Part & part)
time_t TTLRecompressMergeSelector::getTTLForPart(const IMergeSelector::Part & part) const
{
return part.ttl_infos.getMinRecompressionTTL();
return part.ttl_infos.getMinimalMaxRecompressionTTL();
}
bool TTLRecompressMergeSelector::isTTLAlreadySatisfied(const IMergeSelector::Part & part) const
@ -107,7 +107,7 @@ bool TTLRecompressMergeSelector::isTTLAlreadySatisfied(const IMergeSelector::Par
if (recompression_ttls.empty())
return false;
auto ttl_description = selectTTLDescriptionForTTLInfos(recompression_ttls, part.ttl_infos.recompression_ttl, current_time, false);
auto ttl_description = selectTTLDescriptionForTTLInfos(recompression_ttls, part.ttl_infos.recompression_ttl, current_time, true);
if (!ttl_description)
return true;

View File

@ -63,6 +63,10 @@ StorageSystemParts::StorageSystemParts(const StorageID & table_id_)
{"move_ttl_info.max", std::make_shared<DataTypeArray>(std::make_shared<DataTypeDateTime>())},
{"default_compression_codec", std::make_shared<DataTypeString>()},
{"recompression_ttl_info.expression", std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>())},
{"recompression_ttl_info.min", std::make_shared<DataTypeArray>(std::make_shared<DataTypeDateTime>())},
{"recompression_ttl_info.max", std::make_shared<DataTypeArray>(std::make_shared<DataTypeDateTime>())},
}
)
{
@ -154,26 +158,30 @@ void StorageSystemParts::processNextStorage(MutableColumns & columns_, const Sto
columns_[i++]->insert(static_cast<UInt32>(part->ttl_infos.table_ttl.max));
}
/// move_ttl_info
auto add_ttl_info_map = [&](const TTLInfoMap & ttl_info_map)
{
Array expression_array;
Array min_array;
Array max_array;
expression_array.reserve(part->ttl_infos.moves_ttl.size());
min_array.reserve(part->ttl_infos.moves_ttl.size());
max_array.reserve(part->ttl_infos.moves_ttl.size());
for (const auto & [expression, move_ttl_info] : part->ttl_infos.moves_ttl)
expression_array.reserve(ttl_info_map.size());
min_array.reserve(ttl_info_map.size());
max_array.reserve(ttl_info_map.size());
for (const auto & [expression, ttl_info] : ttl_info_map)
{
expression_array.emplace_back(expression);
min_array.push_back(static_cast<UInt32>(move_ttl_info.min));
max_array.push_back(static_cast<UInt32>(move_ttl_info.max));
min_array.push_back(static_cast<UInt32>(ttl_info.min));
max_array.push_back(static_cast<UInt32>(ttl_info.max));
}
columns_[i++]->insert(expression_array);
columns_[i++]->insert(min_array);
columns_[i++]->insert(max_array);
}
};
add_ttl_info_map(part->ttl_infos.moves_ttl);
columns_[i++]->insert(queryToString(part->default_codec->getCodecDesc()));
add_ttl_info_map(part->ttl_infos.recompression_ttl);
}
}

View File

@ -106,6 +106,8 @@ def test_recompression_multiple_ttls(started_cluster):
assert node2.query("SELECT default_compression_codec FROM system.parts where name = 'all_1_1_4'") == "ZSTD(12)\n"
assert node2.query("SELECT recompression_ttl_info.expression FROM system.parts where name = 'all_1_1_4'") == "['plus(d, toIntervalSecond(10))','plus(d, toIntervalSecond(15))','plus(d, toIntervalSecond(5))']\n"
def test_recompression_replicated(started_cluster):
for i, node in enumerate([node1, node2]):

View File

@ -13,6 +13,9 @@ CREATE TABLE default.recompression_table\n(\n `dt` DateTime,\n `key` UInt6
1_1_1_2_4 LZ4
2_2_2_2_4 ZSTD(12)
3_3_3_2_4 ZSTD(12)
1_1_1_2_4 ['plus(dt, toIntervalDay(1))']
2_2_2_2_4 ['plus(dt, toIntervalDay(1))']
3_3_3_2_4 ['plus(dt, toIntervalDay(1))']
1_1_1_0 LZ4
2_2_2_0 LZ4
3_3_3_0 LZ4

View File

@ -42,6 +42,8 @@ OPTIMIZE TABLE recompression_table FINAL;
SELECT name, default_compression_codec FROM system.parts WHERE table = 'recompression_table' and active = 1 and database = currentDatabase() ORDER BY name;
SELECT name, recompression_ttl_info.expression FROM system.parts WHERE table = 'recompression_table' and active = 1 and database = currentDatabase() ORDER BY name;
DROP TABLE IF EXISTS recompression_table;
CREATE TABLE recompression_table_compact