Merge 'cnch_dev_cp_replace_if_not_null' into 'cnch-dev'

feat(clickhousech@m-5306499047): [cp] add support for replace_if_not_null

See merge request: !25161
This commit is contained in:
勾王敏浩 2024-09-20 06:28:22 +00:00 committed by Fred Wang
parent 70da7d1bb7
commit a15cb38d81
14 changed files with 214 additions and 6 deletions

View File

@ -492,6 +492,9 @@ ASTPtr InterpreterCreateQuery::formatColumns(const ColumnsDescription & columns,
column_declaration->on_update_expression = column.on_update_expression->clone();
}
if (column.replace_if_not_null)
column_declaration->replace_if_not_null = column.replace_if_not_null;
if (!column.comment.empty())
{
column_declaration->comment = std::make_shared<ASTLiteral>(Field(column.comment));
@ -739,6 +742,9 @@ ColumnsDescription InterpreterCreateQuery::getColumnsDescription(
column.on_update_expression = getDefaultExpression(col_decl.on_update_expression);
}
if (col_decl.replace_if_not_null)
column.replace_if_not_null = col_decl.replace_if_not_null;
if (col_decl.comment)
column.comment = col_decl.comment->as<ASTLiteral &>().value.get<String>();

View File

@ -236,13 +236,19 @@ namespace
if (optimize_for_same_update_columns)
{
Names columns_from_metadata = res.getNames();
NameSet columns_of_replace_if_not_null = data.getInMemoryMetadataPtr()->getColumns().getReplaceIfNotNullColumns();
for (const auto & col_name : columns_from_metadata)
{
if (!same_update_column_set.contains(col_name))
continue;
/// For map type, we still need to read origin value even if it's in update_columns when partial_update_enable_merge_map is true
if (!data.getInMemoryMetadataPtr()->getColumns().getPhysical(col_name).type->isMap()
|| !data.getSettings()->partial_update_enable_merge_map)
bool need_read_for_map_column = data.getInMemoryMetadataPtr()->getColumns().getPhysical(col_name).type->isMap() && data.getSettings()->partial_update_enable_merge_map;
/// For nullable type, we still need to read origin value when replace_if_not_null is true
bool need_read_for_nullable_column = data.getInMemoryMetadataPtr()->getColumns().getPhysical(col_name).type->isNullable() &&
(columns_of_replace_if_not_null.count(col_name) || data.getSettings()->partial_update_replace_if_not_null);
if (!need_read_for_map_column && !need_read_for_nullable_column)
res.erase(col_name);
}
}
@ -2124,6 +2130,8 @@ void MergeTreeDataDeduper::replaceColumnsAndFilterData(
parseUpdateColumns(update_columns->getDataAt(i).toString(), default_filters, check_column, get_column_by_index, i);
}
}
NameSet columns_of_replace_if_not_null = data.getInMemoryMetadataPtr()->getColumns().getReplaceIfNotNullColumns();
auto parse_update_columns_time = timer.elapsedMilliseconds();
size_t thread_num = std::max(static_cast<size_t>(1), std::min(block.columns(), static_cast<size_t>(data.getSettings()->partial_update_replace_columns_thread_size)));
ThreadPool replace_column_pool(thread_num);
@ -2167,6 +2175,15 @@ void MergeTreeDataDeduper::replaceColumnsAndFilterData(
if (!default_filters.count(col.name))
throw Exception(ErrorCodes::LOGICAL_ERROR, "Can not find default filter of column {} when partial_update_enable_specify_update_columns is true.", col.name);
if (col.type->isNullable() && (columns_of_replace_if_not_null.count(col.name) || data.getSettings()->partial_update_replace_if_not_null))
{
auto& default_filter_ref = default_filters[col.name];
for (size_t row = 0; row < block_size; ++row)
{
if (col.column->isNullAt(row))
default_filter_ref->getData()[row] = 1;
}
}
is_default_col = std::move(default_filters[col.name]);
const IColumn::Filter & is_default_filter = assert_cast<const ColumnUInt8 &>(*is_default_col).getData();

View File

@ -382,6 +382,12 @@ void MergeTreeMetaBase::checkProperties(
}
}
for (const auto & column : new_metadata.columns)
{
if (column.replace_if_not_null && !column.type->isNullable())
throw Exception("REPLACE_IF_NOT_NULL could not used with nullable type, column name: " + column.name, ErrorCodes::LOGICAL_ERROR);
}
checkKeyExpression(*new_sorting_key.expression, new_sorting_key.sample_block, "Sorting", allow_nullable_key);
}

View File

@ -55,6 +55,9 @@ ASTPtr ASTColumnDeclaration::clone() const
res->children.push_back(res->on_update_expression);
}
if (replace_if_not_null)
res->replace_if_not_null = replace_if_not_null;
if (comment)
{
res->comment = comment->clone();
@ -123,6 +126,11 @@ void ASTColumnDeclaration::formatImpl(const FormatSettings & settings, FormatSta
on_update_expression->formatImpl(settings, state, frame);
}
if (replace_if_not_null)
{
settings.ostr << ' ' << (settings.hilite ? hilite_keyword : "") << "REPLACE_IF_NOT_NULL" << (settings.hilite ? hilite_none : "");
}
if (flags & TYPE_COMPRESSION_FLAG)
{
settings.ostr << ' ' << (settings.hilite ? hilite_keyword : "") << "COMPRESSION" << (settings.hilite ? hilite_none : "");
@ -141,18 +149,18 @@ void ASTColumnDeclaration::formatImpl(const FormatSettings & settings, FormatSta
if (flags & TYPE_BITENGINE_ENCODE_FLAG)
{
settings.ostr << ' ' << (settings.hilite ? hilite_keyword : "") << "BitEngineEncode" << (settings.hilite ? hilite_none : "");
}
}
if (flags & TYPE_BLOOM_FLAG)
{
settings.ostr << ' ' << (settings.hilite ? hilite_keyword : "") << "BLOOM" << (settings.hilite ? hilite_none : "");
}
}
if (flags & TYPE_BITMAP_INDEX_FLAG)
{
settings.ostr << ' ' << (settings.hilite ? hilite_keyword : "") << "BitmapIndex" << (settings.hilite ? hilite_none : "");
}
if (flags & TYPE_SEGMENT_BITMAP_INDEX_FLAG)
{
settings.ostr << ' ' << (settings.hilite ? hilite_keyword : "") << "SegmentBitmapIndex" << (settings.hilite ? hilite_none : "");

View File

@ -41,6 +41,9 @@ public:
ASTPtr on_update_expression;
bool auto_increment;
bool mysql_primary_key;
/// For partial update, this means the imported data will only be replaced when it is of non-null value
/// We did not add this information to flags because it is only used on the write side and does not need to be serialized to the part.
bool replace_if_not_null = false;
ASTPtr comment;
ASTPtr codec;
ASTPtr ttl;

View File

@ -178,6 +178,7 @@ bool ParserAlterCommand::parseImpl(Pos & pos, ASTPtr & node, Expected & expected
ParserKeyword s_comment("COMMENT");
ParserKeyword s_codec("CODEC");
ParserKeyword s_ttl("TTL");
ParserKeyword s_replace_if_not_null{"REPLACE_IF_NOT_NULL"};
ParserKeyword s_remove_ttl("REMOVE TTL");
@ -1126,6 +1127,8 @@ bool ParserAlterCommand::parseImpl(Pos & pos, ASTPtr & node, Expected & expected
command->remove_property = "CODEC";
else if (s_ttl.ignore(pos, expected))
command->remove_property = "TTL";
else if (s_replace_if_not_null.ignore(pos, expected))
command->remove_property = "REPLACE_IF_NOT_NULL";
else
return false;
}

View File

@ -153,6 +153,7 @@ bool IParserColumnDeclaration<NameParser>::parseImpl(Pos & pos, ASTPtr & node, E
ParserDataType type_parser(dt);
ParserKeyword s_default{"DEFAULT"};
ParserKeyword s_auto_increment{"AUTO_INCREMENT"};
ParserKeyword s_replace_if_not_null{"REPLACE_IF_NOT_NULL"};
ParserKeyword s_null{"NULL"};
ParserKeyword s_not{"NOT"};
ParserKeyword s_pk{"PRIMARY KEY"};
@ -244,6 +245,7 @@ bool IParserColumnDeclaration<NameParser>::parseImpl(Pos & pos, ASTPtr & node, E
&& !s_pk.checkWithoutMoving(pos, expected)
&& !s_default.checkWithoutMoving(pos, expected)
&& !s_auto_increment.checkWithoutMoving(pos, expected)
&& !s_replace_if_not_null.checkWithoutMoving(pos, expected)
&& !s_materialized.checkWithoutMoving(pos, expected)
&& !s_alias.checkWithoutMoving(pos, expected)
&& (require_type
@ -324,6 +326,11 @@ bool IParserColumnDeclaration<NameParser>::parseImpl(Pos & pos, ASTPtr & node, E
column_declaration->mysql_primary_key = true;
}
if (s_replace_if_not_null.ignore(pos, expected))
{
column_declaration->replace_if_not_null = true;
}
if (s_comment.ignore(pos, expected))
{
/// should be followed by a string literal

View File

@ -90,6 +90,8 @@ AlterCommand::RemoveProperty removePropertyFromString(const String & property)
return AlterCommand::RemoveProperty::CODEC;
else if (property == "TTL")
return AlterCommand::RemoveProperty::TTL;
else if (property == "REPLACE_IF_NOT_NULL")
return AlterCommand::RemoveProperty::REPLACE_IF_NOT_NULL;
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Cannot remove unknown property '{}'", property);
}
@ -224,6 +226,9 @@ std::optional<AlterCommand> AlterCommand::parse(const ASTAlterCommand * command_
command.default_expression = ast_col_decl.default_expression;
}
if (ast_col_decl.replace_if_not_null)
command.replace_if_not_null = ast_col_decl.replace_if_not_null;
if (ast_col_decl.comment)
{
const auto & ast_comment = ast_col_decl.comment->as<ASTLiteral &>();
@ -586,11 +591,18 @@ void AlterCommand::apply(StorageInMemoryMetadata & metadata, ContextPtr context,
{
column.ttl.reset();
}
else if (to_remove == RemoveProperty::REPLACE_IF_NOT_NULL)
{
column.replace_if_not_null = false;
}
else
{
if (codec)
column.codec = CompressionCodecFactory::instance().validateCodecAndGetPreprocessedAST(codec, data_type ? data_type : column.type, false, true);
if (replace_if_not_null)
column.replace_if_not_null = replace_if_not_null;
if (comment)
column.comment = *comment;
@ -1467,6 +1479,11 @@ void AlterCommands::validate(const StorageInMemoryMetadata & metadata, ContextPt
ErrorCodes::BAD_ARGUMENTS,
"Column {} doesn't have COMMENT, cannot remove it",
backQuote(column_name));
if (command.to_remove == AlterCommand::RemoveProperty::REPLACE_IF_NOT_NULL && !column_from_table.replace_if_not_null)
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
"Column {} doesn't specify REPLACE_IF_NOT_NULL, cannot remove it",
backQuote(column_name));
}
const auto & column = all_columns.get(column_name);

View File

@ -92,6 +92,7 @@ struct AlterCommand
COMMENT,
CODEC,
TTL,
REPLACE_IF_NOT_NULL,
};
Type type = UNKNOWN;
@ -110,6 +111,9 @@ struct AlterCommand
ColumnDefaultKind default_kind{};
ASTPtr default_expression{};
/// For MODIFY REPLACE_IF_NOT_NULL
bool replace_if_not_null = false;
/// For COMMENT column
std::optional<String> comment;

View File

@ -797,6 +797,15 @@ ColumnsDescription::ColumnOnUpdates ColumnsDescription::getOnUpdates() const
return ret;
}
NameSet ColumnsDescription::getReplaceIfNotNullColumns() const
{
NameSet ret;
for (const auto & col : columns)
if (col.replace_if_not_null)
ret.emplace(col.name);
return ret;
}
bool ColumnsDescription::hasCompressionCodec(const String & column_name) const
{
const auto it = columns.get<1>().find(column_name);

View File

@ -101,8 +101,10 @@ struct ColumnDescription
String name;
DataTypePtr type;
ColumnDefault default_desc;
// TODO: Wrap on update expression using a structure similar to default_desc
/// TODO: Wrap on update expression using a structure similar to default_desc
ASTPtr on_update_expression;
/// For partial update, this means the imported data will only be replaced when it is of non-null value
bool replace_if_not_null = false;
String comment;
ASTPtr codec;
ASTPtr ttl;
@ -219,6 +221,7 @@ public:
std::optional<ColumnDefault> getDefault(const String & column_name) const;
using ColumnOnUpdates = std::unordered_map<std::string, ASTPtr>;
ColumnOnUpdates getOnUpdates() const;
NameSet getReplaceIfNotNullColumns() const;
/// Does column has non default specified compression codec
bool hasCompressionCodec(const String & column_name) const;

View File

@ -468,6 +468,7 @@ enum StealingCacheMode : UInt64
M(MaxThreads, partial_update_replace_columns_thread_size, 8, "The thread size of replace columns.", 0) \
M(Bool, partial_update_enable_merge_map, true, "Map row will just replace the original one when it's false. Otherwise, it will merge row.", 0) \
M(Bool, partial_update_optimize_for_batch_task, true, "Optimize partial update process when _update_columns_ are all same for batch processing.", 0) \
M(Bool, partial_update_replace_if_not_null, false, "For partial update, this means the imported data will only be replaced when it is of non-null value.", 0) \
M(DedupImplVersion, dedup_impl_version, DedupImplVersion::DEDUP_IN_WRITE_SUFFIX, "Choose different dedup impl version for unique table write process, current valid values: DEDUP_IN_WRITE_SUFFIX, DEDUP_IN_TXN_COMMIT.", 0) \
M(DedupPickWorkerAlgo, dedup_pick_worker_algo, DedupPickWorkerAlgo::CONSISTENT_HASH, "", 0) \
/** CI settings || test settings **/ \

View File

@ -0,0 +1,30 @@
test enable replace_if_not_null with non-nullable column, stage 1
2023-01-01 1001 20 c1 e1
2023-01-01 1002 21 c2 e2
2023-01-01 1003 22 c3 e3
2023-01-02 1004 23 c4 e4
2023-01-02 1005 30 d2 x2
test enable replace_if_not_null with non-nullable column, stage 2
2023-01-01 1001 20 c1 e1
2023-01-01 1002 21 c2 e2
2023-01-01 1003 22 c3 e3
2023-01-02 1004 23 c4 e4
2023-01-02 1005 30 d2
test enable replace_if_not_null with non-nullable column, stage 3
2023-01-01 1001 20 c1 e1
2023-01-01 1002 21 c2 e2
2023-01-01 1003 22 c3 e3
2023-01-02 1004 23 c4 e4
2023-01-02 1005 30 \N
test enable replace_if_not_null with non-nullable column, stage 4
2023-01-01 1001 20 c1 e1
2023-01-01 1002 21 c2 e2
2023-01-01 1003 22 c3 e3
2023-01-02 1004 23 c4 e4
2023-01-02 1005 30 \N
test enable replace_if_not_null with non-nullable column, stage 5
1 2 m m\0\0\0\0\0\0\0\0\0 [] {'a':2,'d':7}
2 3 \0\0\0\0\0\0\0\0\0\0 [5,6] {'e':6,'f':9}
test enable replace_if_not_null with non-nullable column, stage 6
1 2 m\0\0\0\0\0\0\0\0\0 [] {'a':2,'d':7}
2 3 \0\0\0\0\0\0\0\0\0\0 [5,6] {'e':6,'f':9}

View File

@ -0,0 +1,94 @@
DROP TABLE IF EXISTS unique_partial_update_query_nullable;
CREATE TABLE unique_partial_update_query_nullable
(
`p_date` Date,
`id` UInt32,
`number` Nullable(UInt32),
`content` Nullable(String),
`extra` String REPLACE_IF_NOT_NULL
)
ENGINE = CnchMergeTree()
PARTITION BY p_date
ORDER BY id
UNIQUE KEY id
SETTINGS enable_unique_partial_update = 1; -- { serverError 49 }
CREATE TABLE unique_partial_update_query_nullable
(
`p_date` Date,
`id` UInt32,
`number` Nullable(UInt32),
`content` Nullable(String),
`extra` String
)
ENGINE = CnchMergeTree()
PARTITION BY p_date
ORDER BY id
UNIQUE KEY id
SETTINGS enable_unique_partial_update = 1;
SYSTEM STOP DEDUP WORKER unique_partial_update_query_nullable;
SET enable_staging_area_for_write=0, enable_unique_partial_update=0;
INSERT INTO unique_partial_update_query_nullable VALUES ('2023-01-01', 1001, 20, 'c1', 'e1');
INSERT INTO unique_partial_update_query_nullable VALUES ('2023-01-01', 1002, 21, 'c2', 'e2');
INSERT INTO unique_partial_update_query_nullable VALUES ('2023-01-01', 1003, 22, 'c3', 'e3');
INSERT INTO unique_partial_update_query_nullable VALUES ('2023-01-02', 1004, 23, 'c4', 'e4');
INSERT INTO unique_partial_update_query_nullable VALUES ('2023-01-02', 1005, 30, 'd2', 'x2');
SELECT 'test enable replace_if_not_null with non-nullable column, stage 1';
SELECT * FROM unique_partial_update_query_nullable ORDER BY id;
SET enable_staging_area_for_write=0, enable_unique_partial_update=1;
-- data for unique key 1005: 2023-01-02, 1005, 30, d2, ''
INSERT INTO unique_partial_update_query_nullable (p_date, id, number, content, extra, _update_columns_) VALUES ('2023-01-02', 1005, 30, 'd1', null, 'extra');
ALTER TABLE unique_partial_update_query_nullable modify column content REPLACE_IF_NOT_NULL;
-- data for unique key 1005: 2023-01-02, 1005, 30, d2, ''
INSERT INTO unique_partial_update_query_nullable (p_date, id, number, content, extra, _update_columns_) VALUES ('2023-01-02', 1005, 30, null, 'x2', 'content');
SELECT 'test enable replace_if_not_null with non-nullable column, stage 2';
SELECT * FROM unique_partial_update_query_nullable ORDER BY id;
ALTER TABLE unique_partial_update_query_nullable modify column extra REPLACE_IF_NOT_NULL; -- { serverError 49 }
ALTER TABLE unique_partial_update_query_nullable modify column content remove REPLACE_IF_NOT_NULL;
-- data for unique key 1005: 2023-01-02, 1005, 30, NULL, ''
INSERT INTO unique_partial_update_query_nullable (p_date, id, number, content, extra, _update_columns_) VALUES ('2023-01-02', 1005, 30, null, 'x2', 'content');
SELECT 'test enable replace_if_not_null with non-nullable column, stage 3';
SELECT * FROM unique_partial_update_query_nullable ORDER BY id;
ALTER TABLE unique_partial_update_query_nullable modify setting partial_update_replace_if_not_null = 1;
SELECT 'test enable replace_if_not_null with non-nullable column, stage 4';
-- data for unique key 1005: 2023-01-02, 1005, 30, NULL, ''
INSERT INTO unique_partial_update_query_nullable (p_date, id, number, content, extra, _update_columns_) VALUES ('2023-01-02', 1005, null, null, 'x2', 'number');
SELECT * FROM unique_partial_update_query_nullable ORDER BY id;
DROP TABLE IF EXISTS unique_partial_update_query_nullable;
CREATE TABLE unique_partial_update_query_nullable
(
id Int32,
a Nullable(Int32),
b LowCardinality(String),
c Nullable(FixedString(10)),
d Nullable(Array(Int32)),
e Map(String, Int32))
Engine=CnchMergeTree()
ORDER BY id
UNIQUE KEY id
SETTINGS enable_unique_partial_update = 1, partial_update_enable_merge_map = 1, partial_update_replace_if_not_null = 1;
SELECT 'test enable replace_if_not_null with non-nullable column, stage 5';
SET enable_staging_area_for_write=0, enable_unique_partial_update = 0;
INSERT INTO unique_partial_update_query_nullable VALUES
(1, 0, 'x', 'x', [1, 2, 3], {'a': 1}), (1, 0, 'z', 'z', [1, 2], {'b': 3, 'c': 4}), (1, 2, 'm', 'm', [], {'a': 2, 'd': 7}), (2, 0, 'q', 'q', [], {'a': 1}), (2, 0, 't', 't', [4, 5, 6], {'e': 8}), (2, 3, '', '', [5, 6], {'e': 6, 'f': 9});
SELECT * FROM unique_partial_update_query_nullable ORDER BY id;
SET enable_staging_area_for_write=0, enable_unique_partial_update = 1;
SELECT 'test enable replace_if_not_null with non-nullable column, stage 6';
INSERT INTO unique_partial_update_query_nullable VALUES (1, null, null, null, null, null);
SELECT * FROM unique_partial_update_query_nullable ORDER BY id;
DROP TABLE IF EXISTS unique_partial_update_query_nullable;