mirror of https://github.com/ByConity/ByConity
Merge pull request #1759 from yandex/CLICKHOUSE-3432
Fixed 'intersects previous part' due to blind OPTIMIZE
This commit is contained in:
commit
1b462b0b94
|
@ -364,6 +364,7 @@ namespace ErrorCodes
|
|||
extern const int MULTIPLE_STREAMS_REQUIRED = 385;
|
||||
extern const int NO_COMMON_TYPE = 386;
|
||||
extern const int EXTERNAL_LOADABLE_ALREADY_EXISTS = 387;
|
||||
extern const int CANNOT_ASSIGN_OPTIMIZE = 388;
|
||||
|
||||
extern const int KEEPER_EXCEPTION = 999;
|
||||
extern const int POCO_EXCEPTION = 1000;
|
||||
|
|
|
@ -305,6 +305,7 @@ struct Settings
|
|||
M(SettingSeconds, http_connection_timeout, DEFAULT_HTTP_READ_BUFFER_CONNECTION_TIMEOUT, "HTTP connection timeout.") \
|
||||
M(SettingSeconds, http_send_timeout, DEFAULT_HTTP_READ_BUFFER_TIMEOUT, "HTTP send timeout") \
|
||||
M(SettingSeconds, http_receive_timeout, DEFAULT_HTTP_READ_BUFFER_TIMEOUT, "HTTP receive timeout") \
|
||||
M(SettingBool, optimize_throw_if_noop, false, "If setting is enabled and OPTIMIZE query didn't actually assign a merge then an explanatory exception is thrown")
|
||||
|
||||
|
||||
/// Possible limits for query execution.
|
||||
|
|
|
@ -150,12 +150,17 @@ bool MergeTreeDataMerger::selectPartsToMerge(
|
|||
FuturePart & future_part,
|
||||
bool aggressive,
|
||||
size_t max_total_size_to_merge,
|
||||
const AllowedMergingPredicate & can_merge_callback)
|
||||
const AllowedMergingPredicate & can_merge_callback,
|
||||
String * out_disable_reason)
|
||||
{
|
||||
MergeTreeData::DataPartsVector data_parts = data.getDataPartsVector();
|
||||
|
||||
if (data_parts.empty())
|
||||
{
|
||||
if (out_disable_reason)
|
||||
*out_disable_reason = "There are no parts in the table";
|
||||
return false;
|
||||
}
|
||||
|
||||
time_t current_time = time(nullptr);
|
||||
|
||||
|
@ -166,7 +171,7 @@ bool MergeTreeDataMerger::selectPartsToMerge(
|
|||
for (const MergeTreeData::DataPartPtr & part : data_parts)
|
||||
{
|
||||
const String & partition_id = part->info.partition_id;
|
||||
if (!prev_partition_id || partition_id != *prev_partition_id || (prev_part && !can_merge_callback(*prev_part, part)))
|
||||
if (!prev_partition_id || partition_id != *prev_partition_id || (prev_part && !can_merge_callback(*prev_part, part, nullptr)))
|
||||
{
|
||||
if (partitions.empty() || !partitions.back().empty())
|
||||
partitions.emplace_back();
|
||||
|
@ -205,7 +210,11 @@ bool MergeTreeDataMerger::selectPartsToMerge(
|
|||
max_total_size_to_merge);
|
||||
|
||||
if (parts_to_merge.empty())
|
||||
{
|
||||
if (out_disable_reason)
|
||||
*out_disable_reason = "There are no need to merge parts according to merge selector algorithm";
|
||||
return false;
|
||||
}
|
||||
|
||||
if (parts_to_merge.size() == 1)
|
||||
throw Exception("Logical error: merge selector returned only one part to merge", ErrorCodes::LOGICAL_ERROR);
|
||||
|
@ -229,7 +238,8 @@ bool MergeTreeDataMerger::selectAllPartsToMergeWithinPartition(
|
|||
size_t available_disk_space,
|
||||
const AllowedMergingPredicate & can_merge,
|
||||
const String & partition_id,
|
||||
bool final)
|
||||
bool final,
|
||||
String * out_disable_reason)
|
||||
{
|
||||
MergeTreeData::DataPartsVector parts = selectAllPartsFromPartition(partition_id);
|
||||
|
||||
|
@ -237,7 +247,11 @@ bool MergeTreeDataMerger::selectAllPartsToMergeWithinPartition(
|
|||
return false;
|
||||
|
||||
if (!final && parts.size() == 1)
|
||||
{
|
||||
if (out_disable_reason)
|
||||
*out_disable_reason = "There is only one part inside partition";
|
||||
return false;
|
||||
}
|
||||
|
||||
auto it = parts.begin();
|
||||
auto prev_it = it;
|
||||
|
@ -246,7 +260,7 @@ bool MergeTreeDataMerger::selectAllPartsToMergeWithinPartition(
|
|||
while (it != parts.end())
|
||||
{
|
||||
/// For the case of one part, we check that it can be merged "with itself".
|
||||
if ((it != parts.begin() || parts.size() == 1) && !can_merge(*prev_it, *it))
|
||||
if ((it != parts.begin() || parts.size() == 1) && !can_merge(*prev_it, *it, out_disable_reason))
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
@ -258,7 +272,8 @@ bool MergeTreeDataMerger::selectAllPartsToMergeWithinPartition(
|
|||
}
|
||||
|
||||
/// Enough disk space to cover the new merge with a margin.
|
||||
if (available_disk_space <= sum_bytes * DISK_USAGE_COEFFICIENT_TO_SELECT)
|
||||
auto required_disk_space = sum_bytes * DISK_USAGE_COEFFICIENT_TO_SELECT;
|
||||
if (available_disk_space <= required_disk_space)
|
||||
{
|
||||
time_t now = time(nullptr);
|
||||
if (now - disk_space_warning_time > 3600)
|
||||
|
@ -273,6 +288,11 @@ bool MergeTreeDataMerger::selectAllPartsToMergeWithinPartition(
|
|||
<< " required now (+" << static_cast<int>((DISK_USAGE_COEFFICIENT_TO_SELECT - 1.0) * 100)
|
||||
<< "% on overhead); suppressing similar warnings for the next hour");
|
||||
}
|
||||
|
||||
if (out_disable_reason)
|
||||
*out_disable_reason = "Insufficient available disk space, required " +
|
||||
formatReadableSizeWithDecimalSuffix(required_disk_space);
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
|
|
|
@ -20,7 +20,7 @@ class MergeTreeDataMerger
|
|||
{
|
||||
public:
|
||||
using CancellationHook = std::function<void()>;
|
||||
using AllowedMergingPredicate = std::function<bool (const MergeTreeData::DataPartPtr &, const MergeTreeData::DataPartPtr &)>;
|
||||
using AllowedMergingPredicate = std::function<bool (const MergeTreeData::DataPartPtr &, const MergeTreeData::DataPartPtr &, String * reason)>;
|
||||
|
||||
struct FuturePart
|
||||
{
|
||||
|
@ -65,7 +65,8 @@ public:
|
|||
FuturePart & future_part,
|
||||
bool aggressive,
|
||||
size_t max_total_size_to_merge,
|
||||
const AllowedMergingPredicate & can_merge);
|
||||
const AllowedMergingPredicate & can_merge,
|
||||
String * out_disable_reason = nullptr);
|
||||
|
||||
/** Select all the parts in the specified partition for merge, if possible.
|
||||
* final - choose to merge even a single part - that is, allow to merge one part "with itself".
|
||||
|
@ -75,7 +76,8 @@ public:
|
|||
size_t available_disk_space,
|
||||
const AllowedMergingPredicate & can_merge,
|
||||
const String & partition_id,
|
||||
bool final);
|
||||
bool final,
|
||||
String * out_disable_reason = nullptr);
|
||||
|
||||
/** Merge the parts.
|
||||
* If `reservation != nullptr`, now and then reduces the size of the reserved space
|
||||
|
|
|
@ -27,6 +27,7 @@ namespace ErrorCodes
|
|||
extern const int ABORTED;
|
||||
extern const int BAD_ARGUMENTS;
|
||||
extern const int INCORRECT_DATA;
|
||||
extern const int CANNOT_ASSIGN_OPTIMIZE;
|
||||
}
|
||||
|
||||
|
||||
|
@ -283,7 +284,8 @@ bool StorageMergeTree::merge(
|
|||
bool aggressive,
|
||||
const String & partition_id,
|
||||
bool final,
|
||||
bool deduplicate)
|
||||
bool deduplicate,
|
||||
String * out_disable_reason)
|
||||
{
|
||||
/// Clear old parts. It does not matter to do it more frequently than each second.
|
||||
if (auto lock = time_after_previous_cleanup.lockTestAndRestartAfter(1))
|
||||
|
@ -304,7 +306,7 @@ bool StorageMergeTree::merge(
|
|||
{
|
||||
std::lock_guard<std::mutex> lock(currently_merging_mutex);
|
||||
|
||||
auto can_merge = [this] (const MergeTreeData::DataPartPtr & left, const MergeTreeData::DataPartPtr & right)
|
||||
auto can_merge = [this] (const MergeTreeData::DataPartPtr & left, const MergeTreeData::DataPartPtr & right, String *)
|
||||
{
|
||||
return !currently_merging.count(left) && !currently_merging.count(right);
|
||||
};
|
||||
|
@ -315,11 +317,11 @@ bool StorageMergeTree::merge(
|
|||
{
|
||||
size_t max_parts_size_for_merge = merger.getMaxPartsSizeForMerge();
|
||||
if (max_parts_size_for_merge > 0)
|
||||
selected = merger.selectPartsToMerge(future_part, aggressive, max_parts_size_for_merge, can_merge);
|
||||
selected = merger.selectPartsToMerge(future_part, aggressive, max_parts_size_for_merge, can_merge, out_disable_reason);
|
||||
}
|
||||
else
|
||||
{
|
||||
selected = merger.selectAllPartsToMergeWithinPartition(future_part, disk_space, can_merge, partition_id, final);
|
||||
selected = merger.selectAllPartsToMergeWithinPartition(future_part, disk_space, can_merge, partition_id, final, out_disable_reason);
|
||||
}
|
||||
|
||||
if (!selected)
|
||||
|
@ -452,7 +454,16 @@ bool StorageMergeTree::optimize(
|
|||
String partition_id;
|
||||
if (partition)
|
||||
partition_id = data.getPartitionIDFromQuery(partition, context);
|
||||
return merge(context.getSettingsRef().min_bytes_to_use_direct_io, true, partition_id, final, deduplicate);
|
||||
|
||||
String disable_reason;
|
||||
if (!merge(context.getSettingsRef().min_bytes_to_use_direct_io, true, partition_id, final, deduplicate, &disable_reason))
|
||||
{
|
||||
if (context.getSettingsRef().optimize_throw_if_noop)
|
||||
throw Exception(disable_reason.empty() ? "Can't OPTIMIZE by some reason" : disable_reason, ErrorCodes::CANNOT_ASSIGN_OPTIMIZE);
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -114,7 +114,8 @@ private:
|
|||
* If aggressive - when selecting parts, don't take into account their size ratio and novelty (used for OPTIMIZE query).
|
||||
* Returns true if merge is finished successfully.
|
||||
*/
|
||||
bool merge(size_t aio_threshold, bool aggressive, const String & partition_id, bool final, bool deduplicate);
|
||||
bool merge(size_t aio_threshold, bool aggressive, const String & partition_id, bool final, bool deduplicate,
|
||||
String * out_disable_reason = nullptr);
|
||||
|
||||
bool mergeTask();
|
||||
|
||||
|
|
|
@ -95,6 +95,7 @@ namespace ErrorCodes
|
|||
extern const int TOO_MUCH_FETCHES;
|
||||
extern const int BAD_DATA_PART_NAME;
|
||||
extern const int PART_IS_TEMPORARILY_LOCKED;
|
||||
extern const int CANNOT_ASSIGN_OPTIMIZE;
|
||||
}
|
||||
|
||||
|
||||
|
@ -1583,7 +1584,7 @@ namespace
|
|||
bool canMergePartsAccordingToZooKeeperInfo(
|
||||
const MergeTreeData::DataPartPtr & left,
|
||||
const MergeTreeData::DataPartPtr & right,
|
||||
zkutil::ZooKeeperPtr && zookeeper, const String & zookeeper_path, const MergeTreeData & data)
|
||||
zkutil::ZooKeeperPtr && zookeeper, const String & zookeeper_path, const MergeTreeData & data, String * out_reason = nullptr)
|
||||
{
|
||||
const String & partition_id = left->info.partition_id;
|
||||
|
||||
|
@ -1601,8 +1602,12 @@ namespace
|
|||
throw Exception("Logical error: part written with quorum covers more than one block numbers", ErrorCodes::LOGICAL_ERROR);
|
||||
|
||||
if (left->info.max_block <= part_info.min_block && right->info.min_block >= part_info.max_block)
|
||||
{
|
||||
if (out_reason)
|
||||
*out_reason = "Quorum status condition is unsatisfied";
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
/// Won't merge last_part even if quorum is satisfied, because we are going to check whether the replica has this part
|
||||
/// on SELECT execution.
|
||||
|
@ -1615,8 +1620,12 @@ namespace
|
|||
throw Exception("Logical error: part written with quorum covers more than one block numbers", ErrorCodes::LOGICAL_ERROR);
|
||||
|
||||
if (left->info.max_block <= part_info.min_block && right->info.min_block >= part_info.max_block)
|
||||
{
|
||||
if (out_reason)
|
||||
*out_reason = "Quorum 'last part' condition is unsatisfied";
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
/// You can merge the parts, if all the numbers between them are abandoned - do not correspond to any blocks.
|
||||
for (Int64 number = left->info.max_block + 1; number <= right->info.min_block - 1; ++number)
|
||||
|
@ -1626,8 +1635,34 @@ namespace
|
|||
|
||||
if (AbandonableLockInZooKeeper::check(path1, *zookeeper) != AbandonableLockInZooKeeper::ABANDONED &&
|
||||
AbandonableLockInZooKeeper::check(path2, *zookeeper) != AbandonableLockInZooKeeper::ABANDONED)
|
||||
{
|
||||
if (out_reason)
|
||||
*out_reason = "Block " + toString(number) + " in gap between merging parts " + left->name + " and "
|
||||
+ right->name + " is not abandoned";
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
/// If any of the parts is already going to be merged into a larger one, do not agree to merge it.
|
||||
bool partsWillNotBeMergedOrDisabled(const MergeTreeData::DataPartPtr & left, const MergeTreeData::DataPartPtr & right,
|
||||
ReplicatedMergeTreeQueue & queue, String * out_reason = nullptr)
|
||||
{
|
||||
auto set_reason = [&out_reason] (const String & part_name)
|
||||
{
|
||||
if (out_reason)
|
||||
*out_reason = "Part " + part_name + " cannot be merged yet, a merge has already assigned for it or it is temporarily disabled";
|
||||
return false;
|
||||
};
|
||||
|
||||
if (queue.partWillBeMergedOrMergesDisabled(left->name))
|
||||
return set_reason(left->name);
|
||||
|
||||
if (left.get() != right.get() && queue.partWillBeMergedOrMergesDisabled(right->name))
|
||||
return set_reason(right->name);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
@ -1722,7 +1757,6 @@ void StorageReplicatedMergeTree::mergeSelectingThread()
|
|||
LOG_DEBUG(log, "Merge selecting thread started");
|
||||
|
||||
bool deduplicate = false; /// TODO: read deduplicate option from table config
|
||||
bool need_pull = true;
|
||||
|
||||
auto uncached_merging_predicate = [this](const MergeTreeData::DataPartPtr & left, const MergeTreeData::DataPartPtr & right)
|
||||
{
|
||||
|
@ -1739,15 +1773,10 @@ void StorageReplicatedMergeTree::mergeSelectingThread()
|
|||
/// Will be updated below.
|
||||
std::chrono::steady_clock::time_point now;
|
||||
|
||||
auto can_merge = [&]
|
||||
(const MergeTreeData::DataPartPtr & left, const MergeTreeData::DataPartPtr & right)
|
||||
auto can_merge = [&] (const MergeTreeData::DataPartPtr & left, const MergeTreeData::DataPartPtr & right, String *)
|
||||
{
|
||||
/// If any of the parts is already going to be merge into a larger one, do not agree to merge it.
|
||||
if (queue.partWillBeMergedOrMergesDisabled(left->name)
|
||||
|| (left.get() != right.get() && queue.partWillBeMergedOrMergesDisabled(right->name)))
|
||||
return false;
|
||||
|
||||
return cached_merging_predicate.get(now, uncached_merging_predicate, merging_predicate_args_to_key, left, right);
|
||||
return partsWillNotBeMergedOrDisabled(left, right, queue)
|
||||
&& cached_merging_predicate.get(now, uncached_merging_predicate, merging_predicate_args_to_key, left, right);
|
||||
};
|
||||
|
||||
while (!shutdown_called && is_leader_node)
|
||||
|
@ -1756,20 +1785,20 @@ void StorageReplicatedMergeTree::mergeSelectingThread()
|
|||
|
||||
try
|
||||
{
|
||||
if (need_pull)
|
||||
{
|
||||
/// You need to load new entries into the queue before you select parts to merge.
|
||||
/// (so we know which parts are already going to be merged).
|
||||
pullLogsToQueue();
|
||||
need_pull = false;
|
||||
}
|
||||
|
||||
std::lock_guard<std::mutex> merge_selecting_lock(merge_selecting_mutex);
|
||||
|
||||
/** If many merges is already queued, then will queue only small enough merges.
|
||||
* Otherwise merge queue could be filled with only large merges,
|
||||
* and in the same time, many small parts could be created and won't be merged.
|
||||
*/
|
||||
/// You need to load new entries into the queue before you select parts to merge.
|
||||
/// (so we know which parts are already going to be merged).
|
||||
/// We must select parts for merge under the mutex because other threads (OPTIMIZE queries) could push new merges.
|
||||
if (merge_selecting_logs_pulling_is_required)
|
||||
{
|
||||
pullLogsToQueue();
|
||||
merge_selecting_logs_pulling_is_required = false;
|
||||
}
|
||||
|
||||
/// If many merges are already queued, then only small enough merges will be queued.
|
||||
/// Otherwise merge queue could be filled with only large merges,
|
||||
/// and at the same time, many small parts could be created and would not be merged.
|
||||
size_t merges_queued = queue.countMerges();
|
||||
|
||||
if (merges_queued >= data.settings.max_replicated_merges_in_queue)
|
||||
|
@ -1787,14 +1816,10 @@ void StorageReplicatedMergeTree::mergeSelectingThread()
|
|||
now = std::chrono::steady_clock::now();
|
||||
|
||||
if (max_parts_size_for_merge > 0
|
||||
&& merger.selectPartsToMerge(
|
||||
future_merged_part, false,
|
||||
max_parts_size_for_merge,
|
||||
can_merge)
|
||||
&& createLogEntryToMergeParts(future_merged_part.parts, future_merged_part.name, deduplicate))
|
||||
&& merger.selectPartsToMerge(future_merged_part, false, max_parts_size_for_merge, can_merge))
|
||||
{
|
||||
success = true;
|
||||
need_pull = true;
|
||||
merge_selecting_logs_pulling_is_required = true;
|
||||
success = createLogEntryToMergeParts(future_merged_part.parts, future_merged_part.name, deduplicate);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -2346,42 +2371,54 @@ bool StorageReplicatedMergeTree::optimize(const ASTPtr & query, const ASTPtr & p
|
|||
return true;
|
||||
}
|
||||
|
||||
auto can_merge = [this]
|
||||
(const MergeTreeData::DataPartPtr & left, const MergeTreeData::DataPartPtr & right)
|
||||
auto can_merge = [this] (const MergeTreeData::DataPartPtr & left, const MergeTreeData::DataPartPtr & right, String * out_reason)
|
||||
{
|
||||
return canMergePartsAccordingToZooKeeperInfo(left, right, getZooKeeper(), zookeeper_path, data);
|
||||
return partsWillNotBeMergedOrDisabled(left, right, queue, out_reason)
|
||||
&& canMergePartsAccordingToZooKeeperInfo(left, right, getZooKeeper(), zookeeper_path, data, out_reason);
|
||||
};
|
||||
|
||||
pullLogsToQueue();
|
||||
|
||||
ReplicatedMergeTreeLogEntryData merge_entry;
|
||||
{
|
||||
std::lock_guard<std::mutex> merge_selecting_lock(merge_selecting_mutex);
|
||||
|
||||
/// We must select parts for merge under the mutex because other threads (OPTIMIZE queries) could push new merges.
|
||||
pullLogsToQueue();
|
||||
|
||||
size_t disk_space = DiskSpaceMonitor::getUnreservedFreeSpace(full_path);
|
||||
|
||||
MergeTreeDataMerger::FuturePart future_merged_part;
|
||||
String disable_reason;
|
||||
bool selected = false;
|
||||
|
||||
if (!partition)
|
||||
{
|
||||
selected = merger.selectPartsToMerge(
|
||||
future_merged_part, true, data.settings.max_bytes_to_merge_at_max_space_in_pool, can_merge);
|
||||
future_merged_part, true, data.settings.max_bytes_to_merge_at_max_space_in_pool, can_merge, &disable_reason);
|
||||
}
|
||||
else
|
||||
{
|
||||
String partition_id = data.getPartitionIDFromQuery(partition, context);
|
||||
selected = merger.selectAllPartsToMergeWithinPartition(future_merged_part, disk_space, can_merge, partition_id, final);
|
||||
selected = merger.selectAllPartsToMergeWithinPartition(future_merged_part, disk_space, can_merge, partition_id, final, &disable_reason);
|
||||
}
|
||||
|
||||
auto handle_noop = [&] (const String & message)
|
||||
{
|
||||
if (context.getSettingsRef().optimize_throw_if_noop)
|
||||
throw Exception(message, ErrorCodes::CANNOT_ASSIGN_OPTIMIZE);
|
||||
return false;
|
||||
};
|
||||
|
||||
if (!selected)
|
||||
{
|
||||
LOG_INFO(log, "Cannot select parts for optimization");
|
||||
return false;
|
||||
LOG_INFO(log, "Cannot select parts for optimization" + (disable_reason.empty() ? "" : ": " + disable_reason));
|
||||
return handle_noop(disable_reason);
|
||||
}
|
||||
|
||||
/// It is important to pull new logs (even if creation of the entry fails due to network error)
|
||||
merge_selecting_logs_pulling_is_required = true;
|
||||
|
||||
if (!createLogEntryToMergeParts(future_merged_part.parts, future_merged_part.name, deduplicate, &merge_entry))
|
||||
return false;
|
||||
return handle_noop("Can't create merge queue node in ZooKeeper");
|
||||
}
|
||||
|
||||
waitForAllReplicasToProcessLogEntry(merge_entry);
|
||||
|
|
|
@ -256,7 +256,11 @@ private:
|
|||
/// A thread that selects parts to merge.
|
||||
std::thread merge_selecting_thread;
|
||||
Poco::Event merge_selecting_event;
|
||||
std::mutex merge_selecting_mutex; /// It is taken for each iteration of the selection of parts to merge.
|
||||
/// It is acquired for each iteration of the selection of parts to merge or each OPTIMIZE query.
|
||||
std::mutex merge_selecting_mutex;
|
||||
/// If true, new entries might have been added to the queue, so we must pull logs before selecting parts for merge.
|
||||
/// It is used only to avoid superfluous pullLogsToQueue() calls.
|
||||
bool merge_selecting_logs_pulling_is_required = true;
|
||||
|
||||
/// A thread that removes old parts, log entries, and blocks.
|
||||
std::unique_ptr<ReplicatedMergeTreeCleanupThread> cleanup_thread;
|
||||
|
|
Loading…
Reference in New Issue