diff --git a/src/Interpreters/InterpreterCreateQuery.cpp b/src/Interpreters/InterpreterCreateQuery.cpp index 7fb372acc9..d9b1ee79f7 100644 --- a/src/Interpreters/InterpreterCreateQuery.cpp +++ b/src/Interpreters/InterpreterCreateQuery.cpp @@ -492,6 +492,9 @@ ASTPtr InterpreterCreateQuery::formatColumns(const ColumnsDescription & columns, column_declaration->on_update_expression = column.on_update_expression->clone(); } + if (column.replace_if_not_null) + column_declaration->replace_if_not_null = column.replace_if_not_null; + if (!column.comment.empty()) { column_declaration->comment = std::make_shared(Field(column.comment)); @@ -739,6 +742,9 @@ ColumnsDescription InterpreterCreateQuery::getColumnsDescription( column.on_update_expression = getDefaultExpression(col_decl.on_update_expression); } + if (col_decl.replace_if_not_null) + column.replace_if_not_null = col_decl.replace_if_not_null; + if (col_decl.comment) column.comment = col_decl.comment->as().value.get(); diff --git a/src/MergeTreeCommon/MergeTreeDataDeduper.cpp b/src/MergeTreeCommon/MergeTreeDataDeduper.cpp index c0b5a00279..c764e2fdf3 100644 --- a/src/MergeTreeCommon/MergeTreeDataDeduper.cpp +++ b/src/MergeTreeCommon/MergeTreeDataDeduper.cpp @@ -236,13 +236,19 @@ namespace if (optimize_for_same_update_columns) { Names columns_from_metadata = res.getNames(); + NameSet columns_of_replace_if_not_null = data.getInMemoryMetadataPtr()->getColumns().getReplaceIfNotNullColumns(); for (const auto & col_name : columns_from_metadata) { if (!same_update_column_set.contains(col_name)) continue; + /// For map type, we still need to read origin value even if it's in update_columns when partial_update_enable_merge_map is true - if (!data.getInMemoryMetadataPtr()->getColumns().getPhysical(col_name).type->isMap() - || !data.getSettings()->partial_update_enable_merge_map) + bool need_read_for_map_column = data.getInMemoryMetadataPtr()->getColumns().getPhysical(col_name).type->isMap() && data.getSettings()->partial_update_enable_merge_map; + /// For nullable type, we still need to read origin value when replace_if_not_null is true + bool need_read_for_nullable_column = data.getInMemoryMetadataPtr()->getColumns().getPhysical(col_name).type->isNullable() && + (columns_of_replace_if_not_null.count(col_name) || data.getSettings()->partial_update_replace_if_not_null); + + if (!need_read_for_map_column && !need_read_for_nullable_column) res.erase(col_name); } } @@ -2124,6 +2130,8 @@ void MergeTreeDataDeduper::replaceColumnsAndFilterData( parseUpdateColumns(update_columns->getDataAt(i).toString(), default_filters, check_column, get_column_by_index, i); } } + + NameSet columns_of_replace_if_not_null = data.getInMemoryMetadataPtr()->getColumns().getReplaceIfNotNullColumns(); auto parse_update_columns_time = timer.elapsedMilliseconds(); size_t thread_num = std::max(static_cast(1), std::min(block.columns(), static_cast(data.getSettings()->partial_update_replace_columns_thread_size))); ThreadPool replace_column_pool(thread_num); @@ -2167,6 +2175,15 @@ void MergeTreeDataDeduper::replaceColumnsAndFilterData( if (!default_filters.count(col.name)) throw Exception(ErrorCodes::LOGICAL_ERROR, "Can not find default filter of column {} when partial_update_enable_specify_update_columns is true.", col.name); + if (col.type->isNullable() && (columns_of_replace_if_not_null.count(col.name) || data.getSettings()->partial_update_replace_if_not_null)) + { + auto& default_filter_ref = default_filters[col.name]; + for (size_t row = 0; row < block_size; ++row) + { + if (col.column->isNullAt(row)) + default_filter_ref->getData()[row] = 1; + } + } is_default_col = std::move(default_filters[col.name]); const IColumn::Filter & is_default_filter = assert_cast(*is_default_col).getData(); diff --git a/src/MergeTreeCommon/MergeTreeMetaBase.cpp b/src/MergeTreeCommon/MergeTreeMetaBase.cpp index 0e5e021bb9..ca306ea472 100644 --- a/src/MergeTreeCommon/MergeTreeMetaBase.cpp +++ b/src/MergeTreeCommon/MergeTreeMetaBase.cpp @@ -382,6 +382,12 @@ void MergeTreeMetaBase::checkProperties( } } + for (const auto & column : new_metadata.columns) + { + if (column.replace_if_not_null && !column.type->isNullable()) + throw Exception("REPLACE_IF_NOT_NULL could not used with nullable type, column name: " + column.name, ErrorCodes::LOGICAL_ERROR); + } + checkKeyExpression(*new_sorting_key.expression, new_sorting_key.sample_block, "Sorting", allow_nullable_key); } diff --git a/src/Parsers/ASTColumnDeclaration.cpp b/src/Parsers/ASTColumnDeclaration.cpp index 08fd0d9213..d4478905d2 100644 --- a/src/Parsers/ASTColumnDeclaration.cpp +++ b/src/Parsers/ASTColumnDeclaration.cpp @@ -55,6 +55,9 @@ ASTPtr ASTColumnDeclaration::clone() const res->children.push_back(res->on_update_expression); } + if (replace_if_not_null) + res->replace_if_not_null = replace_if_not_null; + if (comment) { res->comment = comment->clone(); @@ -123,6 +126,11 @@ void ASTColumnDeclaration::formatImpl(const FormatSettings & settings, FormatSta on_update_expression->formatImpl(settings, state, frame); } + if (replace_if_not_null) + { + settings.ostr << ' ' << (settings.hilite ? hilite_keyword : "") << "REPLACE_IF_NOT_NULL" << (settings.hilite ? hilite_none : ""); + } + if (flags & TYPE_COMPRESSION_FLAG) { settings.ostr << ' ' << (settings.hilite ? hilite_keyword : "") << "COMPRESSION" << (settings.hilite ? hilite_none : ""); @@ -141,18 +149,18 @@ void ASTColumnDeclaration::formatImpl(const FormatSettings & settings, FormatSta if (flags & TYPE_BITENGINE_ENCODE_FLAG) { settings.ostr << ' ' << (settings.hilite ? hilite_keyword : "") << "BitEngineEncode" << (settings.hilite ? hilite_none : ""); - } + } if (flags & TYPE_BLOOM_FLAG) { settings.ostr << ' ' << (settings.hilite ? hilite_keyword : "") << "BLOOM" << (settings.hilite ? hilite_none : ""); - } + } if (flags & TYPE_BITMAP_INDEX_FLAG) { settings.ostr << ' ' << (settings.hilite ? hilite_keyword : "") << "BitmapIndex" << (settings.hilite ? hilite_none : ""); } - + if (flags & TYPE_SEGMENT_BITMAP_INDEX_FLAG) { settings.ostr << ' ' << (settings.hilite ? hilite_keyword : "") << "SegmentBitmapIndex" << (settings.hilite ? hilite_none : ""); diff --git a/src/Parsers/ASTColumnDeclaration.h b/src/Parsers/ASTColumnDeclaration.h index 4e389ec81b..0afcf2cbf3 100644 --- a/src/Parsers/ASTColumnDeclaration.h +++ b/src/Parsers/ASTColumnDeclaration.h @@ -41,6 +41,9 @@ public: ASTPtr on_update_expression; bool auto_increment; bool mysql_primary_key; + /// For partial update, this means the imported data will only be replaced when it is of non-null value + /// We did not add this information to flags because it is only used on the write side and does not need to be serialized to the part. + bool replace_if_not_null = false; ASTPtr comment; ASTPtr codec; ASTPtr ttl; diff --git a/src/Parsers/ParserAlterQuery.cpp b/src/Parsers/ParserAlterQuery.cpp index e455d81313..bee28e1538 100644 --- a/src/Parsers/ParserAlterQuery.cpp +++ b/src/Parsers/ParserAlterQuery.cpp @@ -178,6 +178,7 @@ bool ParserAlterCommand::parseImpl(Pos & pos, ASTPtr & node, Expected & expected ParserKeyword s_comment("COMMENT"); ParserKeyword s_codec("CODEC"); ParserKeyword s_ttl("TTL"); + ParserKeyword s_replace_if_not_null{"REPLACE_IF_NOT_NULL"}; ParserKeyword s_remove_ttl("REMOVE TTL"); @@ -1126,6 +1127,8 @@ bool ParserAlterCommand::parseImpl(Pos & pos, ASTPtr & node, Expected & expected command->remove_property = "CODEC"; else if (s_ttl.ignore(pos, expected)) command->remove_property = "TTL"; + else if (s_replace_if_not_null.ignore(pos, expected)) + command->remove_property = "REPLACE_IF_NOT_NULL"; else return false; } diff --git a/src/Parsers/ParserCreateQuery.h b/src/Parsers/ParserCreateQuery.h index d6e710578d..024a680437 100644 --- a/src/Parsers/ParserCreateQuery.h +++ b/src/Parsers/ParserCreateQuery.h @@ -153,6 +153,7 @@ bool IParserColumnDeclaration::parseImpl(Pos & pos, ASTPtr & node, E ParserDataType type_parser(dt); ParserKeyword s_default{"DEFAULT"}; ParserKeyword s_auto_increment{"AUTO_INCREMENT"}; + ParserKeyword s_replace_if_not_null{"REPLACE_IF_NOT_NULL"}; ParserKeyword s_null{"NULL"}; ParserKeyword s_not{"NOT"}; ParserKeyword s_pk{"PRIMARY KEY"}; @@ -244,6 +245,7 @@ bool IParserColumnDeclaration::parseImpl(Pos & pos, ASTPtr & node, E && !s_pk.checkWithoutMoving(pos, expected) && !s_default.checkWithoutMoving(pos, expected) && !s_auto_increment.checkWithoutMoving(pos, expected) + && !s_replace_if_not_null.checkWithoutMoving(pos, expected) && !s_materialized.checkWithoutMoving(pos, expected) && !s_alias.checkWithoutMoving(pos, expected) && (require_type @@ -324,6 +326,11 @@ bool IParserColumnDeclaration::parseImpl(Pos & pos, ASTPtr & node, E column_declaration->mysql_primary_key = true; } + if (s_replace_if_not_null.ignore(pos, expected)) + { + column_declaration->replace_if_not_null = true; + } + if (s_comment.ignore(pos, expected)) { /// should be followed by a string literal diff --git a/src/Storages/AlterCommands.cpp b/src/Storages/AlterCommands.cpp index 92a5c60d9b..26e15d1440 100644 --- a/src/Storages/AlterCommands.cpp +++ b/src/Storages/AlterCommands.cpp @@ -90,6 +90,8 @@ AlterCommand::RemoveProperty removePropertyFromString(const String & property) return AlterCommand::RemoveProperty::CODEC; else if (property == "TTL") return AlterCommand::RemoveProperty::TTL; + else if (property == "REPLACE_IF_NOT_NULL") + return AlterCommand::RemoveProperty::REPLACE_IF_NOT_NULL; throw Exception(ErrorCodes::BAD_ARGUMENTS, "Cannot remove unknown property '{}'", property); } @@ -224,6 +226,9 @@ std::optional AlterCommand::parse(const ASTAlterCommand * command_ command.default_expression = ast_col_decl.default_expression; } + if (ast_col_decl.replace_if_not_null) + command.replace_if_not_null = ast_col_decl.replace_if_not_null; + if (ast_col_decl.comment) { const auto & ast_comment = ast_col_decl.comment->as(); @@ -586,11 +591,18 @@ void AlterCommand::apply(StorageInMemoryMetadata & metadata, ContextPtr context, { column.ttl.reset(); } + else if (to_remove == RemoveProperty::REPLACE_IF_NOT_NULL) + { + column.replace_if_not_null = false; + } else { if (codec) column.codec = CompressionCodecFactory::instance().validateCodecAndGetPreprocessedAST(codec, data_type ? data_type : column.type, false, true); + if (replace_if_not_null) + column.replace_if_not_null = replace_if_not_null; + if (comment) column.comment = *comment; @@ -1467,6 +1479,11 @@ void AlterCommands::validate(const StorageInMemoryMetadata & metadata, ContextPt ErrorCodes::BAD_ARGUMENTS, "Column {} doesn't have COMMENT, cannot remove it", backQuote(column_name)); + if (command.to_remove == AlterCommand::RemoveProperty::REPLACE_IF_NOT_NULL && !column_from_table.replace_if_not_null) + throw Exception( + ErrorCodes::BAD_ARGUMENTS, + "Column {} doesn't specify REPLACE_IF_NOT_NULL, cannot remove it", + backQuote(column_name)); } const auto & column = all_columns.get(column_name); diff --git a/src/Storages/AlterCommands.h b/src/Storages/AlterCommands.h index b69481366d..9d36dc7057 100644 --- a/src/Storages/AlterCommands.h +++ b/src/Storages/AlterCommands.h @@ -92,6 +92,7 @@ struct AlterCommand COMMENT, CODEC, TTL, + REPLACE_IF_NOT_NULL, }; Type type = UNKNOWN; @@ -110,6 +111,9 @@ struct AlterCommand ColumnDefaultKind default_kind{}; ASTPtr default_expression{}; + /// For MODIFY REPLACE_IF_NOT_NULL + bool replace_if_not_null = false; + /// For COMMENT column std::optional comment; diff --git a/src/Storages/ColumnsDescription.cpp b/src/Storages/ColumnsDescription.cpp index 375886342e..630842b9c8 100644 --- a/src/Storages/ColumnsDescription.cpp +++ b/src/Storages/ColumnsDescription.cpp @@ -797,6 +797,15 @@ ColumnsDescription::ColumnOnUpdates ColumnsDescription::getOnUpdates() const return ret; } +NameSet ColumnsDescription::getReplaceIfNotNullColumns() const +{ + NameSet ret; + for (const auto & col : columns) + if (col.replace_if_not_null) + ret.emplace(col.name); + return ret; +} + bool ColumnsDescription::hasCompressionCodec(const String & column_name) const { const auto it = columns.get<1>().find(column_name); diff --git a/src/Storages/ColumnsDescription.h b/src/Storages/ColumnsDescription.h index cd9beb4c65..ec5d9a644d 100644 --- a/src/Storages/ColumnsDescription.h +++ b/src/Storages/ColumnsDescription.h @@ -101,8 +101,10 @@ struct ColumnDescription String name; DataTypePtr type; ColumnDefault default_desc; - // TODO: Wrap on update expression using a structure similar to default_desc + /// TODO: Wrap on update expression using a structure similar to default_desc ASTPtr on_update_expression; + /// For partial update, this means the imported data will only be replaced when it is of non-null value + bool replace_if_not_null = false; String comment; ASTPtr codec; ASTPtr ttl; @@ -219,6 +221,7 @@ public: std::optional getDefault(const String & column_name) const; using ColumnOnUpdates = std::unordered_map; ColumnOnUpdates getOnUpdates() const; + NameSet getReplaceIfNotNullColumns() const; /// Does column has non default specified compression codec bool hasCompressionCodec(const String & column_name) const; diff --git a/src/Storages/MergeTree/MergeTreeSettings.h b/src/Storages/MergeTree/MergeTreeSettings.h index c4d736b582..62704bacb4 100644 --- a/src/Storages/MergeTree/MergeTreeSettings.h +++ b/src/Storages/MergeTree/MergeTreeSettings.h @@ -468,6 +468,7 @@ enum StealingCacheMode : UInt64 M(MaxThreads, partial_update_replace_columns_thread_size, 8, "The thread size of replace columns.", 0) \ M(Bool, partial_update_enable_merge_map, true, "Map row will just replace the original one when it's false. Otherwise, it will merge row.", 0) \ M(Bool, partial_update_optimize_for_batch_task, true, "Optimize partial update process when _update_columns_ are all same for batch processing.", 0) \ + M(Bool, partial_update_replace_if_not_null, false, "For partial update, this means the imported data will only be replaced when it is of non-null value.", 0) \ M(DedupImplVersion, dedup_impl_version, DedupImplVersion::DEDUP_IN_WRITE_SUFFIX, "Choose different dedup impl version for unique table write process, current valid values: DEDUP_IN_WRITE_SUFFIX, DEDUP_IN_TXN_COMMIT.", 0) \ M(DedupPickWorkerAlgo, dedup_pick_worker_algo, DedupPickWorkerAlgo::CONSISTENT_HASH, "", 0) \ /** CI settings || test settings **/ \ diff --git a/tests/queries/4_cnch_stateless/10052_uniquekey_test_partial_update_nullable.reference b/tests/queries/4_cnch_stateless/10052_uniquekey_test_partial_update_nullable.reference new file mode 100644 index 0000000000..345c72e81a --- /dev/null +++ b/tests/queries/4_cnch_stateless/10052_uniquekey_test_partial_update_nullable.reference @@ -0,0 +1,30 @@ +test enable replace_if_not_null with non-nullable column, stage 1 +2023-01-01 1001 20 c1 e1 +2023-01-01 1002 21 c2 e2 +2023-01-01 1003 22 c3 e3 +2023-01-02 1004 23 c4 e4 +2023-01-02 1005 30 d2 x2 +test enable replace_if_not_null with non-nullable column, stage 2 +2023-01-01 1001 20 c1 e1 +2023-01-01 1002 21 c2 e2 +2023-01-01 1003 22 c3 e3 +2023-01-02 1004 23 c4 e4 +2023-01-02 1005 30 d2 +test enable replace_if_not_null with non-nullable column, stage 3 +2023-01-01 1001 20 c1 e1 +2023-01-01 1002 21 c2 e2 +2023-01-01 1003 22 c3 e3 +2023-01-02 1004 23 c4 e4 +2023-01-02 1005 30 \N +test enable replace_if_not_null with non-nullable column, stage 4 +2023-01-01 1001 20 c1 e1 +2023-01-01 1002 21 c2 e2 +2023-01-01 1003 22 c3 e3 +2023-01-02 1004 23 c4 e4 +2023-01-02 1005 30 \N +test enable replace_if_not_null with non-nullable column, stage 5 +1 2 m m\0\0\0\0\0\0\0\0\0 [] {'a':2,'d':7} +2 3 \0\0\0\0\0\0\0\0\0\0 [5,6] {'e':6,'f':9} +test enable replace_if_not_null with non-nullable column, stage 6 +1 2 m\0\0\0\0\0\0\0\0\0 [] {'a':2,'d':7} +2 3 \0\0\0\0\0\0\0\0\0\0 [5,6] {'e':6,'f':9} diff --git a/tests/queries/4_cnch_stateless/10052_uniquekey_test_partial_update_nullable.sql b/tests/queries/4_cnch_stateless/10052_uniquekey_test_partial_update_nullable.sql new file mode 100644 index 0000000000..e059008cac --- /dev/null +++ b/tests/queries/4_cnch_stateless/10052_uniquekey_test_partial_update_nullable.sql @@ -0,0 +1,94 @@ +DROP TABLE IF EXISTS unique_partial_update_query_nullable; + +CREATE TABLE unique_partial_update_query_nullable +( + `p_date` Date, + `id` UInt32, + `number` Nullable(UInt32), + `content` Nullable(String), + `extra` String REPLACE_IF_NOT_NULL +) +ENGINE = CnchMergeTree() +PARTITION BY p_date +ORDER BY id +UNIQUE KEY id +SETTINGS enable_unique_partial_update = 1; -- { serverError 49 } + +CREATE TABLE unique_partial_update_query_nullable +( + `p_date` Date, + `id` UInt32, + `number` Nullable(UInt32), + `content` Nullable(String), + `extra` String +) +ENGINE = CnchMergeTree() +PARTITION BY p_date +ORDER BY id +UNIQUE KEY id +SETTINGS enable_unique_partial_update = 1; + +SYSTEM STOP DEDUP WORKER unique_partial_update_query_nullable; + +SET enable_staging_area_for_write=0, enable_unique_partial_update=0; +INSERT INTO unique_partial_update_query_nullable VALUES ('2023-01-01', 1001, 20, 'c1', 'e1'); +INSERT INTO unique_partial_update_query_nullable VALUES ('2023-01-01', 1002, 21, 'c2', 'e2'); +INSERT INTO unique_partial_update_query_nullable VALUES ('2023-01-01', 1003, 22, 'c3', 'e3'); +INSERT INTO unique_partial_update_query_nullable VALUES ('2023-01-02', 1004, 23, 'c4', 'e4'); +INSERT INTO unique_partial_update_query_nullable VALUES ('2023-01-02', 1005, 30, 'd2', 'x2'); + +SELECT 'test enable replace_if_not_null with non-nullable column, stage 1'; +SELECT * FROM unique_partial_update_query_nullable ORDER BY id; + +SET enable_staging_area_for_write=0, enable_unique_partial_update=1; +-- data for unique key 1005: 2023-01-02, 1005, 30, d2, '' +INSERT INTO unique_partial_update_query_nullable (p_date, id, number, content, extra, _update_columns_) VALUES ('2023-01-02', 1005, 30, 'd1', null, 'extra'); + +ALTER TABLE unique_partial_update_query_nullable modify column content REPLACE_IF_NOT_NULL; +-- data for unique key 1005: 2023-01-02, 1005, 30, d2, '' +INSERT INTO unique_partial_update_query_nullable (p_date, id, number, content, extra, _update_columns_) VALUES ('2023-01-02', 1005, 30, null, 'x2', 'content'); + +SELECT 'test enable replace_if_not_null with non-nullable column, stage 2'; +SELECT * FROM unique_partial_update_query_nullable ORDER BY id; + +ALTER TABLE unique_partial_update_query_nullable modify column extra REPLACE_IF_NOT_NULL; -- { serverError 49 } +ALTER TABLE unique_partial_update_query_nullable modify column content remove REPLACE_IF_NOT_NULL; +-- data for unique key 1005: 2023-01-02, 1005, 30, NULL, '' +INSERT INTO unique_partial_update_query_nullable (p_date, id, number, content, extra, _update_columns_) VALUES ('2023-01-02', 1005, 30, null, 'x2', 'content'); +SELECT 'test enable replace_if_not_null with non-nullable column, stage 3'; +SELECT * FROM unique_partial_update_query_nullable ORDER BY id; + +ALTER TABLE unique_partial_update_query_nullable modify setting partial_update_replace_if_not_null = 1; + +SELECT 'test enable replace_if_not_null with non-nullable column, stage 4'; +-- data for unique key 1005: 2023-01-02, 1005, 30, NULL, '' +INSERT INTO unique_partial_update_query_nullable (p_date, id, number, content, extra, _update_columns_) VALUES ('2023-01-02', 1005, null, null, 'x2', 'number'); +SELECT * FROM unique_partial_update_query_nullable ORDER BY id; + +DROP TABLE IF EXISTS unique_partial_update_query_nullable; + +CREATE TABLE unique_partial_update_query_nullable +( + id Int32, + a Nullable(Int32), + b LowCardinality(String), + c Nullable(FixedString(10)), + d Nullable(Array(Int32)), + e Map(String, Int32)) +Engine=CnchMergeTree() +ORDER BY id +UNIQUE KEY id +SETTINGS enable_unique_partial_update = 1, partial_update_enable_merge_map = 1, partial_update_replace_if_not_null = 1; + +SELECT 'test enable replace_if_not_null with non-nullable column, stage 5'; +SET enable_staging_area_for_write=0, enable_unique_partial_update = 0; +INSERT INTO unique_partial_update_query_nullable VALUES +(1, 0, 'x', 'x', [1, 2, 3], {'a': 1}), (1, 0, 'z', 'z', [1, 2], {'b': 3, 'c': 4}), (1, 2, 'm', 'm', [], {'a': 2, 'd': 7}), (2, 0, 'q', 'q', [], {'a': 1}), (2, 0, 't', 't', [4, 5, 6], {'e': 8}), (2, 3, '', '', [5, 6], {'e': 6, 'f': 9}); +SELECT * FROM unique_partial_update_query_nullable ORDER BY id; + +SET enable_staging_area_for_write=0, enable_unique_partial_update = 1; +SELECT 'test enable replace_if_not_null with non-nullable column, stage 6'; +INSERT INTO unique_partial_update_query_nullable VALUES (1, null, null, null, null, null); +SELECT * FROM unique_partial_update_query_nullable ORDER BY id; + +DROP TABLE IF EXISTS unique_partial_update_query_nullable;