Update mutations interpreter.

Nikolai Kochetov 2020-09-15 20:13:13 +03:00
parent 4c783f19ee
commit 118a8a513e
15 changed files with 111 additions and 121 deletions

View File

@@ -1866,34 +1866,7 @@ void InterpreterSelectQuery::executeSubqueriesInSetsAndJoins(QueryPlan & query_plan)
         return;
 
     SizeLimits limits(settings.max_rows_to_transfer, settings.max_bytes_to_transfer, settings.transfer_overflow_mode);
 
-    std::vector<QueryPlan> plans;
-    DataStreams input_streams;
-    input_streams.emplace_back(query_plan.getCurrentDataStream());
-
-    for (auto & [description, set] : subqueries_for_sets)
-    {
-        auto plan = std::move(set.source);
-        std::string type = (set.join != nullptr) ? "JOIN"
-                                                 : "subquery";
-
-        auto creating_set = std::make_unique<CreatingSetStep>(
-                plan->getCurrentDataStream(),
-                query_plan.getCurrentDataStream().header,
-                std::move(description),
-                std::move(set),
-                limits,
-                *context);
-        creating_set->setStepDescription("Create set for " + type);
-        plan->addStep(std::move(creating_set));
-
-        input_streams.emplace_back(plan->getCurrentDataStream());
-        plans.emplace_back(std::move(*plan));
-    }
-
-    auto creating_sets = std::make_unique<CreatingSetsStep>(std::move(input_streams));
-    creating_sets->setStepDescription("Create sets before main query execution");
-    query_plan.unitePlans(std::move(creating_sets), std::move(plans));
+    addCreatingSetsStep(query_plan, std::move(subqueries_for_sets), limits, *context);
 }

View File

@@ -183,13 +183,14 @@ void InterpreterSelectWithUnionQuery::buildQueryPlan(QueryPlan & query_plan)
         return;
     }
 
-    std::vector<QueryPlan> plans(num_plans);
+    std::vector<std::unique_ptr<QueryPlan>> plans(num_plans);
     DataStreams data_streams(num_plans);
 
     for (size_t i = 0; i < num_plans; ++i)
     {
-        nested_interpreters[i]->buildQueryPlan(plans[i]);
-        data_streams[i] = plans[i].getCurrentDataStream();
+        plans[i] = std::make_unique<QueryPlan>();
+        nested_interpreters[i]->buildQueryPlan(*plans[i]);
+        data_streams[i] = plans[i]->getCurrentDataStream();
     }
 
     auto max_threads = context->getSettingsRef().max_threads;

View File

@@ -11,6 +11,11 @@
 #include <Processors/Transforms/MaterializingTransform.h>
 #include <Processors/Sources/NullSource.h>
 #include <Processors/QueryPipeline.h>
+#include <Processors/QueryPlan/QueryPlan.h>
+#include <Processors/QueryPlan/ExpressionStep.h>
+#include <Processors/QueryPlan/FilterStep.h>
+#include <Processors/QueryPlan/ReadFromPreparedSource.h>
+#include <Processors/Executors/PipelineExecutingBlockInputStream.h>
 #include <DataStreams/CheckSortedBlockInputStream.h>
 #include <Parsers/ASTIdentifier.h>
 #include <Parsers/ASTFunction.h>
@@ -19,6 +24,7 @@
 #include <Parsers/ASTSelectQuery.h>
 #include <Parsers/formatAST.h>
 #include <IO/WriteHelpers.h>
+#include <Processors/QueryPlan/CreatingSetsStep.h>
 
 namespace DB
 {
@@ -524,10 +530,11 @@ ASTPtr MutationsInterpreter::prepare(bool dry_run)
             SelectQueryOptions().analyze(/* dry_run = */ false).ignoreLimits()};
 
         auto first_stage_header = interpreter.getSampleBlock();
-        QueryPipeline pipeline;
-        pipeline.init(Pipe(std::make_shared<NullSource>(first_stage_header)));
-        addStreamsForLaterStages(stages_copy, pipeline);
-        updated_header = std::make_unique<Block>(pipeline.getHeader());
+        QueryPlan plan;
+        auto source = std::make_shared<NullSource>(first_stage_header);
+        plan.addStep(std::make_unique<ReadFromPreparedSource>(Pipe(std::move(source))));
+        auto pipeline = addStreamsForLaterStages(stages_copy, plan);
+        updated_header = std::make_unique<Block>(pipeline->getHeader());
     }
 
     /// Special step to recalculate affected indices and TTL expressions.
@@ -656,7 +663,7 @@ ASTPtr MutationsInterpreter::prepareInterpreterSelectQuery(std::vector<Stage> &prepared_stages, bool dry_run)
     return select;
 }
 
-void MutationsInterpreter::addStreamsForLaterStages(const std::vector<Stage> & prepared_stages, QueryPipeline & pipeline) const
+QueryPipelinePtr MutationsInterpreter::addStreamsForLaterStages(const std::vector<Stage> & prepared_stages, QueryPlan & plan) const
 {
     for (size_t i_stage = 1; i_stage < prepared_stages.size(); ++i_stage)
     {
@@ -668,18 +675,12 @@ void MutationsInterpreter::addStreamsForLaterStages(const std::vector<Stage> & prepared_stages, QueryPipeline & pipeline) const
             if (i < stage.filter_column_names.size())
            {
                /// Execute DELETEs.
-                pipeline.addSimpleTransform([&](const Block & header)
-                {
-                    return std::make_shared<FilterTransform>(header, step->actions(), stage.filter_column_names[i], false);
-                });
+                plan.addStep(std::make_unique<FilterStep>(plan.getCurrentDataStream(), step->actions(), stage.filter_column_names[i], false));
             }
             else
             {
                 /// Execute UPDATE or final projection.
-                pipeline.addSimpleTransform([&](const Block & header)
-                {
-                    return std::make_shared<ExpressionTransform>(header, step->actions());
-                });
+                plan.addStep(std::make_unique<ExpressionStep>(plan.getCurrentDataStream(), step->actions()));
             }
         }
 
@@ -689,14 +690,17 @@ void MutationsInterpreter::addStreamsForLaterStages(const std::vector<Stage> & prepared_stages, QueryPipeline & pipeline) const
             const Settings & settings = context.getSettingsRef();
             SizeLimits network_transfer_limits(
                     settings.max_rows_to_transfer, settings.max_bytes_to_transfer, settings.transfer_overflow_mode);
-            pipeline.addCreatingSetsTransform(std::move(subqueries_for_sets), network_transfer_limits, context);
+            addCreatingSetsStep(plan, std::move(subqueries_for_sets), network_transfer_limits, context);
         }
     }
 
-    pipeline.addSimpleTransform([&](const Block & header)
+    auto pipeline = plan.buildQueryPipeline();
+
+    pipeline->addSimpleTransform([&](const Block & header)
     {
         return std::make_shared<MaterializingTransform>(header);
     });
+
+    return pipeline;
 }
 
@@ -718,8 +722,11 @@ void MutationsInterpreter::validate()
         }
     }
 
-    auto block_io = select_interpreter->execute();
-    addStreamsForLaterStages(stages, block_io.pipeline);
+    QueryPlan plan;
+    select_interpreter->buildQueryPlan(plan);
+
+    addStreamsForLaterStages(stages, plan);
+    auto pipeline = plan.buildQueryPipeline();
 }
 
@@ -727,10 +734,13 @@ BlockInputStreamPtr MutationsInterpreter::execute()
     if (!can_execute)
         throw Exception("Cannot execute mutations interpreter because can_execute flag set to false", ErrorCodes::LOGICAL_ERROR);
 
-    auto block_io = select_interpreter->execute();
-    addStreamsForLaterStages(stages, block_io.pipeline);
+    QueryPlan plan;
+    select_interpreter->buildQueryPlan(plan);
 
-    auto result_stream = block_io.getInputStream();
+    addStreamsForLaterStages(stages, plan);
+    auto pipeline = plan.buildQueryPipeline();
+
+    BlockInputStreamPtr result_stream = std::make_shared<PipelineExecutingBlockInputStream>(std::move(*pipeline));
 
     /// Sometimes we update just part of columns (for example UPDATE mutation)
     /// in this case we don't read sorting key, so just we don't check anything.

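The shape of the change in this file: mutation stages are no longer spliced into a live QueryPipeline; they are recorded as FilterStep/ExpressionStep nodes on a QueryPlan, the pipeline is materialized once via buildQueryPipeline(), and BlockInputStream-based callers get a PipelineExecutingBlockInputStream wrapper. A minimal sketch of that plan-then-build pattern, with toy types rather than ClickHouse's actual classes (C++20 for std::erase_if):

#include <functional>
#include <iostream>
#include <memory>
#include <vector>

// Toy stand-ins for QueryPlan / QueryPipeline: steps are only recorded here;
// nothing executes until the plan is turned into a pipeline.
using Chunk = std::vector<int>;
using Transform = std::function<Chunk(Chunk)>;

struct Pipeline
{
    std::vector<Transform> transforms;

    Chunk execute(Chunk chunk) const
    {
        for (const auto & t : transforms)
            chunk = t(std::move(chunk));
        return chunk;
    }
};

struct Plan
{
    std::vector<Transform> steps;

    void addStep(Transform step) { steps.push_back(std::move(step)); }

    /// Analogous to QueryPlan::buildQueryPipeline(): materialize once, at the end.
    std::unique_ptr<Pipeline> buildPipeline()
    {
        auto pipeline = std::make_unique<Pipeline>();
        pipeline->transforms = std::move(steps);
        return pipeline;
    }
};

int main()
{
    Plan plan;
    // Like a FilterStep: drop rows that fail the predicate (DELETE).
    plan.addStep([](Chunk c) { std::erase_if(c, [](int x) { return x < 0; }); return c; });
    // Like an ExpressionStep: compute the updated value per row (UPDATE).
    plan.addStep([](Chunk c) { for (auto & x : c) x *= 10; return c; });

    auto pipeline = plan.buildPipeline();
    for (int x : pipeline->execute({-1, 2, 3}))
        std::cout << x << '\n';   // prints 20, then 30
}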
View File

@@ -13,7 +13,10 @@ namespace DB
 {
 
 class Context;
+class QueryPlan;
+class QueryPipeline;
+using QueryPipelinePtr = std::unique_ptr<QueryPipeline>;
 
 /// Return false if the data isn't going to be changed by mutations.
 bool isStorageTouchedByMutations(
@@ -52,7 +55,7 @@ private:
     struct Stage;
 
     ASTPtr prepareInterpreterSelectQuery(std::vector<Stage> &prepared_stages, bool dry_run);
-    void addStreamsForLaterStages(const std::vector<Stage> & prepared_stages, QueryPipeline & pipeline) const;
+    QueryPipelinePtr addStreamsForLaterStages(const std::vector<Stage> & prepared_stages, QueryPlan & plan) const;
 
     std::optional<SortDescription> getStorageSortDescriptionIfPossible(const Block & header) const;

View File

@@ -8,6 +8,11 @@
 namespace DB
 {
 
+SubqueryForSet::SubqueryForSet() = default;
+SubqueryForSet::~SubqueryForSet() = default;
+SubqueryForSet::SubqueryForSet(SubqueryForSet &&) = default;
+SubqueryForSet & SubqueryForSet::operator= (SubqueryForSet &&) = default;
+
 void SubqueryForSet::makeSource(std::shared_ptr<InterpreterSelectWithUnionQuery> & interpreter,
                                 NamesWithAliases && joined_block_aliases_)
 {

View File

@@ -18,6 +18,11 @@ class QueryPlan;
 
 /// Information on what to do when executing a subquery in the [GLOBAL] IN/JOIN section.
 struct SubqueryForSet
 {
+    SubqueryForSet();
+    ~SubqueryForSet();
+    SubqueryForSet(SubqueryForSet &&);
+    SubqueryForSet & operator= (SubqueryForSet &&);
+
     /// The source is obtained using the InterpreterSelectQuery subquery.
     std::unique_ptr<QueryPlan> source;

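The newly declared, out-of-line defaulted special members above are the standard companion to a std::unique_ptr member whose pointee is only forward-declared (here std::unique_ptr<QueryPlan> source next to class QueryPlan;): unique_ptr's destructor requires the complete type, so the destructor and moves can only be defaulted in the .cpp, where QueryPlan.h is included. A self-contained illustration of the idiom, with hypothetical Widget/Impl names:

#include <memory>
#include <utility>

class Impl;  // forward declaration only, as with QueryPlan in SubqueryForSet.h

struct Widget
{
    Widget();
    ~Widget();                        // cannot be defaulted here: destroying
    Widget(Widget &&);                //   unique_ptr<Impl> needs Impl complete
    Widget & operator=(Widget &&);

    std::unique_ptr<Impl> impl;
};

// "The .cpp": Impl is complete from here on, so = default is legal.
class Impl { };

Widget::Widget() = default;
Widget::~Widget() = default;
Widget::Widget(Widget &&) = default;
Widget & Widget::operator=(Widget &&) = default;

int main()
{
    Widget a;
    Widget b = std::move(a);          // fine: moves defined where Impl is complete
    return 0;
}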
View File

@@ -196,59 +196,6 @@ void QueryPipeline::addExtremesTransform()
     pipe.addTransform(std::move(transform), nullptr, port);
 }
 
-void QueryPipeline::addCreatingSetsTransform(SubqueriesForSets subqueries_for_sets, const SizeLimits & network_transfer_limits, const Context & context)
-{
-    checkInitializedAndNotCompleted();
-
-    Pipes sources;
-
-    for (auto & subquery : subqueries_for_sets)
-    {
-        if (subquery.second.source)
-        {
-            auto & source = sources.emplace_back(std::move(subquery.second.source));
-            if (source.numOutputPorts() > 1)
-                source.addTransform(std::make_shared<ResizeProcessor>(source.getHeader(), source.numOutputPorts(), 1));
-
-            source.dropExtremes();
-
-            auto creating_sets = std::make_shared<CreatingSetsTransform>(
-                    source.getHeader(),
-                    getHeader(),
-                    std::move(subquery.second),
-                    network_transfer_limits,
-                    context);
-
-            InputPort * totals = nullptr;
-            if (source.getTotalsPort())
-                totals = creating_sets->addTotalsPort();
-
-            source.addTransform(std::move(creating_sets), totals, nullptr);
-        }
-    }
-
-    if (sources.empty())
-        return;
-
-    auto * collected_processors = pipe.collected_processors;
-
-    /// We unite all sources together.
-    /// Set collected_processors to attach all newly-added processors to current query plan step.
-    auto source = Pipe::unitePipes(std::move(sources), collected_processors);
-    if (source.numOutputPorts() > 1)
-        source.addTransform(std::make_shared<ResizeProcessor>(source.getHeader(), source.numOutputPorts(), 1));
-    source.collected_processors = nullptr;
-
-    resize(1);
-
-    Pipes pipes;
-    pipes.emplace_back(std::move(source));
-    pipes.emplace_back(std::move(pipe));
-    pipe = Pipe::unitePipes(std::move(pipes), collected_processors);
-
-    pipe.addTransform(std::make_shared<ConcatProcessor>(getHeader(), 2));
-}
-
 void QueryPipeline::setOutputFormat(ProcessorPtr output)
 {
     checkInitializedAndNotCompleted();
@@ -315,7 +262,7 @@ QueryPipeline QueryPipeline::unitePipelines(
     return pipeline;
 }
 
-void QueryPipeline::addDelayedPipeline(QueryPipeline pipeline)
+void QueryPipeline::addDelayingPipeline(QueryPipeline pipeline)
 {
     pipeline.resize(1);

View File

@@ -55,8 +55,6 @@ public:
     void addTotalsHavingTransform(ProcessorPtr transform);
     /// Add transform which calculates extremes. This transform adds extremes port and doesn't change inputs number.
     void addExtremesTransform();
-    /// Adds transform which creates sets. It will be executed before reading any data from input ports.
-    void addCreatingSetsTransform(SubqueriesForSets subqueries_for_sets, const SizeLimits & network_transfer_limits, const Context & context);
     /// Resize pipeline to single output and add IOutputFormat. Pipeline will be completed after this transformation.
     void setOutputFormat(ProcessorPtr output);
     /// Get current OutputFormat.
@@ -87,7 +85,9 @@ public:
         size_t max_threads_limit = 0,
         Processors * collected_processors = nullptr);
 
-    void addDelayedPipeline(QueryPipeline);
+    /// Add other pipeline and execute it before current one.
+    /// Pipeline must have same header.
+    void addDelayingPipeline(QueryPipeline pipeline);
 
     PipelineExecutorPtr execute();

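The rename from addDelayedPipeline to addDelayingPipeline tracks the new comment: the argument is the pipeline that delays the current one, i.e. it is drained to completion (building the IN/JOIN sets) before the main pipeline produces any rows. A toy model of that ordering, assuming nothing about the real processor graph:

#include <functional>
#include <iostream>
#include <iterator>
#include <utility>
#include <vector>

// Toy model: a "pipeline" is a list of actions. Adding a delaying pipeline
// means its actions must all run before the current pipeline's actions.
struct MiniPipeline
{
    std::vector<std::function<void()>> actions;

    void addDelayingPipeline(MiniPipeline prerequisite)
    {
        // Prerequisite actions go to the front: they finish first.
        actions.insert(actions.begin(),
                       std::make_move_iterator(prerequisite.actions.begin()),
                       std::make_move_iterator(prerequisite.actions.end()));
    }

    void run()
    {
        for (auto & action : actions)
            action();
    }
};

int main()
{
    MiniPipeline main_pipeline;
    main_pipeline.actions.push_back([] { std::cout << "read main data\n"; });

    MiniPipeline sets;
    sets.actions.push_back([] { std::cout << "build IN/JOIN sets\n"; });

    main_pipeline.addDelayingPipeline(std::move(sets));
    main_pipeline.run();   // sets are fully built before the main read starts
}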
View File

@@ -97,7 +97,7 @@ QueryPipelinePtr CreatingSetsStep::updatePipeline(QueryPipelines pipelines)
     delayed_pipeline = std::move(*pipelines.front());
 
     QueryPipelineProcessorsCollector collector(*main_pipeline, this);
-    main_pipeline->addDelayedPipeline(std::move(delayed_pipeline));
+    main_pipeline->addDelayingPipeline(std::move(delayed_pipeline));
     auto added_processors = collector.detachProcessors();
     processors.insert(processors.end(), added_processors.begin(), added_processors.end());
@@ -109,4 +109,39 @@ void CreatingSetsStep::describePipeline(FormatSettings & settings) const
     IQueryPlanStep::describePipeline(processors, settings);
 }
 
+void addCreatingSetsStep(
+    QueryPlan & query_plan, SubqueriesForSets subqueries_for_sets, const SizeLimits & limits, const Context & context)
+{
+    DataStreams input_streams;
+    input_streams.emplace_back(query_plan.getCurrentDataStream());
+
+    std::vector<std::unique_ptr<QueryPlan>> plans;
+    plans.emplace_back(std::make_unique<QueryPlan>(std::move(query_plan)));
+    query_plan = QueryPlan();
+
+    for (auto & [description, set] : subqueries_for_sets)
+    {
+        auto plan = std::move(set.source);
+        std::string type = (set.join != nullptr) ? "JOIN"
+                                                 : "subquery";
+
+        auto creating_set = std::make_unique<CreatingSetStep>(
+                plan->getCurrentDataStream(),
+                input_streams.front().header,
+                std::move(description),
+                std::move(set),
+                limits,
+                context);
+        creating_set->setStepDescription("Create set for " + type);
+        plan->addStep(std::move(creating_set));
+
+        input_streams.emplace_back(plan->getCurrentDataStream());
+        plans.emplace_back(std::move(plan));
+    }
+
+    auto creating_sets = std::make_unique<CreatingSetsStep>(std::move(input_streams));
+    creating_sets->setStepDescription("Create sets before main query execution");
+    query_plan.unitePlans(std::move(creating_sets), std::move(plans));
+}
+
 }

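Worth noting in the new helper: the caller's plan is moved into the subplan list and query_plan is then reassigned a fresh QueryPlan, since a moved-from object is only guaranteed to be in a valid but unspecified state; unitePlans subsequently rebuilds query_plan as the root over all subplans. A reduced sketch of that ownership shuffle, with a toy Plan in place of QueryPlan:

#include <iostream>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Same shape as addCreatingSetsStep: take the caller's plan by reference,
// move it into a list of subplans, then rebuild it as the union of all of them.
struct Plan
{
    std::vector<std::string> steps;
};

void addSideWork(Plan & main_plan, std::vector<std::string> side_steps)
{
    std::vector<std::unique_ptr<Plan>> plans;
    plans.emplace_back(std::make_unique<Plan>(std::move(main_plan)));
    main_plan = Plan();   // moved-from state is unspecified; reset explicitly

    for (auto & s : side_steps)
        plans.emplace_back(std::make_unique<Plan>(Plan{{std::move(s)}}));

    // "Unite": side work first, since it must complete before the main steps
    // run (cf. addDelayingPipeline), then the original plan's steps.
    for (size_t i = 1; i < plans.size(); ++i)
        for (auto & step : plans[i]->steps)
            main_plan.steps.push_back(std::move(step));
    for (auto & step : plans.front()->steps)
        main_plan.steps.push_back(std::move(step));
}

int main()
{
    Plan plan{{"read", "filter"}};
    addSideWork(plan, {"build set A", "build set B"});
    for (const auto & s : plan.steps)
        std::cout << s << '\n';   // build set A, build set B, read, filter
}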
View File

@@ -18,7 +18,7 @@ public:
         SizeLimits network_transfer_limits_,
         const Context & context_);
 
-    String getName() const override { return "CreatingSets"; }
+    String getName() const override { return "CreatingSet"; }
 
     void transformPipeline(QueryPipeline & pipeline) override;
@@ -46,4 +46,10 @@ private:
     Processors processors;
 };
 
+void addCreatingSetsStep(
+    QueryPlan & query_plan,
+    SubqueriesForSets subqueries_for_sets,
+    const SizeLimits & limits,
+    const Context & context);
+
 }

View File

@@ -26,6 +26,8 @@ namespace ErrorCodes
 
 QueryPlan::QueryPlan() = default;
 QueryPlan::~QueryPlan() = default;
+QueryPlan::QueryPlan(QueryPlan &&) = default;
+QueryPlan & QueryPlan::operator=(QueryPlan &&) = default;
 
 void QueryPlan::checkInitialized() const
 {
@@ -51,7 +53,7 @@ const DataStream & QueryPlan::getCurrentDataStream() const
     return root->step->getOutputStream();
 }
 
-void QueryPlan::unitePlans(QueryPlanStepPtr step, std::vector<QueryPlan> plans)
+void QueryPlan::unitePlans(QueryPlanStepPtr step, std::vector<std::unique_ptr<QueryPlan>> plans)
 {
     if (isInitialized())
         throw Exception("Cannot unite plans because current QueryPlan is already initialized",
@@ -70,7 +72,7 @@ void QueryPlan::unitePlans(QueryPlanStepPtr step, std::vector<QueryPlan> plans)
     for (size_t i = 0; i < num_inputs; ++i)
     {
         const auto & step_header = inputs[i].header;
-        const auto & plan_header = plans[i].getCurrentDataStream().header;
+        const auto & plan_header = plans[i]->getCurrentDataStream().header;
         if (!blocksHaveEqualStructure(step_header, plan_header))
             throw Exception("Cannot unite QueryPlans using " + step->getName() + " because "
                             "it has incompatible header with plan " + root->step->getName() + " "
@@ -79,19 +81,19 @@ void QueryPlan::unitePlans(QueryPlanStepPtr step, std::vector<QueryPlan> plans)
     }
 
     for (auto & plan : plans)
-        nodes.splice(nodes.end(), std::move(plan.nodes));
+        nodes.splice(nodes.end(), std::move(plan->nodes));
 
     nodes.emplace_back(Node{.step = std::move(step)});
     root = &nodes.back();
 
     for (auto & plan : plans)
-        root->children.emplace_back(plan.root);
+        root->children.emplace_back(plan->root);
 
     for (auto & plan : plans)
     {
-        max_threads = std::max(max_threads, plan.max_threads);
+        max_threads = std::max(max_threads, plan->max_threads);
         interpreter_context.insert(interpreter_context.end(),
-                                   plan.interpreter_context.begin(), plan.interpreter_context.end());
+                                   plan->interpreter_context.begin(), plan->interpreter_context.end());
     }
 }

View File

@@ -25,8 +25,10 @@ class QueryPlan
 public:
     QueryPlan();
     ~QueryPlan();
+    QueryPlan(QueryPlan &&);
+    QueryPlan & operator=(QueryPlan &&);
 
-    void unitePlans(QueryPlanStepPtr step, std::vector<QueryPlan> plans);
+    void unitePlans(QueryPlanStepPtr step, std::vector<std::unique_ptr<QueryPlan>> plans);
 
     void addStep(QueryPlanStepPtr step);
 
     bool isInitialized() const { return root != nullptr; } /// Tree is not empty

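Context for the two new declarations: QueryPlan already has a user-declared destructor, and a user-declared destructor suppresses the implicitly generated move operations, so a movable QueryPlan (which unitePlans and addCreatingSetsStep now rely on) must declare its move constructor and move assignment explicitly, defaulting them out of line in QueryPlan.cpp. The rule in isolation:

#include <utility>
#include <vector>

struct Movable
{
    Movable() = default;
    ~Movable();                      // user-declared dtor: implicit moves are suppressed
    Movable(Movable &&);             // so declare the moves explicitly...
    Movable & operator=(Movable &&);

    std::vector<int> data;
};

// ...and default them out of line, as QueryPlan.cpp does.
Movable::~Movable() = default;
Movable::Movable(Movable &&) = default;
Movable & Movable::operator=(Movable &&) = default;

int main()
{
    Movable a;
    a.data = {1, 2, 3};
    Movable b = std::move(a);        // OK: uses the declared move constructor
    return b.data.size() == 3 ? 0 : 1;
}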
View File

@@ -14,7 +14,8 @@ ReadFromPreparedSource::ReadFromPreparedSource(Pipe pipe_, std::shared_ptr<Context> context_)
 void ReadFromPreparedSource::initializePipeline(QueryPipeline & pipeline)
 {
     pipeline.init(std::move(pipe));
-    pipeline.addInterpreterContext(std::move(context));
+    if (context)
+        pipeline.addInterpreterContext(std::move(context));
 }
 
 }

View File

@@ -9,7 +9,7 @@ namespace DB
 
 class ReadFromPreparedSource : public ISourceStep
 {
 public:
-    explicit ReadFromPreparedSource(Pipe pipe_, std::shared_ptr<Context> context_);
+    explicit ReadFromPreparedSource(Pipe pipe_, std::shared_ptr<Context> context_ = nullptr);
 
     String getName() const override { return "ReadNothing"; }

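With context_ defaulted to nullptr, callers that have no interpreter context to keep alive (such as the new MutationsInterpreter path) can build the step from a bare Pipe, and the null check added in initializePipeline avoids registering an empty context. The shape of the change, reduced to toy types:

#include <iostream>
#include <memory>
#include <string>
#include <utility>

// Toy stand-ins: an optional shared context defaults to nullptr and is only
// attached when a caller actually provided one.
struct Ctx { std::string name; };

struct Step
{
    explicit Step(std::string pipe_, std::shared_ptr<Ctx> context_ = nullptr)
        : pipe(std::move(pipe_)), context(std::move(context_)) {}

    void initialize()
    {
        std::cout << "init pipe: " << pipe << '\n';
        if (context)   // guard before handing the context over
            std::cout << "attached context: " << context->name << '\n';
    }

    std::string pipe;
    std::shared_ptr<Ctx> context;
};

int main()
{
    Step{"prepared source"}.initialize();                              // no context
    Step{"select", std::make_shared<Ctx>(Ctx{"query"})}.initialize();  // with context
}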
View File

@@ -14,7 +14,7 @@ public:
     JoiningTransform(Block input_header, JoinPtr join_,
                      bool on_totals_ = false, bool default_totals_ = false);
 
-    String getName() const override { return "InflatingExpressionTransform"; }
+    String getName() const override { return "JoiningTransform"; }
 
     static Block transformHeader(Block header, const JoinPtr & join);