From f7f177b015d6390cab60ddb79469b29e3c525703 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9D=9C=E5=B3=B0?= Date: Thu, 19 Sep 2024 09:01:05 +0000 Subject: [PATCH] Merge 'fix_async_mv_misc_cnch_dev' into 'cnch-dev' fix(clickhousech@m-4676840578): fix misc cnch async materialized view issues for cnch-dev See merge request: !25127 --- .../CnchRefreshMaterializedViewThread.cpp | 14 ++++++++------ src/Interpreters/InJoinSubqueriesPreprocessor.cpp | 10 +++------- .../MaterializedView/PartitionTransformer.cpp | 4 +--- src/Storages/StorageMaterializedView.cpp | 5 ++--- 4 files changed, 14 insertions(+), 19 deletions(-) diff --git a/src/CloudServices/CnchRefreshMaterializedViewThread.cpp b/src/CloudServices/CnchRefreshMaterializedViewThread.cpp index 75b8ee2b17..5cd9e76d87 100644 --- a/src/CloudServices/CnchRefreshMaterializedViewThread.cpp +++ b/src/CloudServices/CnchRefreshMaterializedViewThread.cpp @@ -185,6 +185,8 @@ void CnchRefreshMaterializedViewThread::runImpl() bool CnchRefreshMaterializedViewThread::constructAndScheduleRefreshTasks(StoragePtr & istorage, StorageMaterializedView & storage) { ContextMutablePtr query_context = Context::createCopy(getContext()); + query_context->makeQueryContext(); + query_context->makeSessionContext(); auto refresh_params = storage.getAsyncRefreshParams(query_context, false); std::vector task_ids = {}; @@ -296,19 +298,19 @@ String CnchRefreshMaterializedViewThread::executeTaskLocal( task_id = task_id, mv_refresh_param = mv_refresh_param, command_context = Context::createCopy(query_context)]() { + auto settings = query_context->getSettings(); + auto user_password = const_cast (*command_context).getCnchInterserverCredentials(); command_context->setCurrentTransaction(nullptr, false); command_context->setCurrentVW(nullptr); command_context->setCurrentWorkerGroup(nullptr); - command_context->makeSessionContext(); - command_context->makeQueryContext(); - auto settings = query_context->getSettings(); command_context->setSettings(settings); - CurrentThread::get().pushTenantId(command_context->getSettingsRef().tenant_id); - - auto user_password = const_cast (*command_context).getCnchInterserverCredentials(); + command_context->setTenantId(command_context->getSettingsRef().tenant_id); command_context->setUser(user_password.first, user_password.second, Poco::Net::SocketAddress{}); command_context->setCurrentQueryId(task_id); + command_context->makeSessionContext(); + command_context->makeQueryContext(); + storage.refreshAsync(mv_refresh_param, command_context); command_context->setCurrentTransaction(nullptr); }); diff --git a/src/Interpreters/InJoinSubqueriesPreprocessor.cpp b/src/Interpreters/InJoinSubqueriesPreprocessor.cpp index b4ef1e8afa..a9d144ddbf 100644 --- a/src/Interpreters/InJoinSubqueriesPreprocessor.cpp +++ b/src/Interpreters/InJoinSubqueriesPreprocessor.cpp @@ -90,15 +90,11 @@ private: { const DistributedProductMode distributed_product_mode = getContext()->getSettingsRef().distributed_product_mode; - StoragePtr storage = tryGetTable(database_and_table, getContext()); - if (!storage || !checker.hasAtLeastTwoShards(*storage)) - return; - + /// Convert distributed table to corresponding remote table. if (distributed_product_mode == DistributedProductMode::LOCAL) { - /// Convert distributed table to corresponding remote table. - StorageDistributed * distributed = dynamic_cast(storage.get()); - if (!distributed) + StoragePtr storage = tryGetTable(database_and_table, getContext()); + if (!storage || !checker.hasAtLeastTwoShards(*storage)) return; std::string database; diff --git a/src/Storages/MaterializedView/PartitionTransformer.cpp b/src/Storages/MaterializedView/PartitionTransformer.cpp index be5f0db8b1..fa31044c7f 100644 --- a/src/Storages/MaterializedView/PartitionTransformer.cpp +++ b/src/Storages/MaterializedView/PartitionTransformer.cpp @@ -16,7 +16,6 @@ #include #include #include -#include #include #include @@ -128,7 +127,7 @@ void PartitionTransformer::validate(ContextMutablePtr local_context) return; LOG_DEBUG(log, "PartitionTransformer::validate mv_query: {} ", queryToString(mv_query)); interpretSettings(mv_query, local_context); - CurrentThread::get().pushTenantId(local_context->getSettingsRef().tenant_id); + local_context->setTenantId(local_context->getSettingsRef().tenant_id); MaterializedViewStructurePtr structure = MaterializedViewStructure::buildFrom(target_table_id, target_table_id, mv_query->clone(), async_materialized_view, local_context); validate(local_context, structure); @@ -371,7 +370,6 @@ AsyncRefreshParamPtrs PartitionTransformer::constructRefreshParams( AsyncRefreshParamPtr refresh_param = std::make_shared(); const auto & settings = local_context->getSettingsRef(); auto query = mv_query->clone(); - QueryRewriter{}.rewrite(query, local_context, false); bool cascading = local_context->getSettingsRef().cascading_refresh_materialized_view; bool insert_overwrite = local_context->getSettingsRef().enable_mv_async_insert_overwrite; diff --git a/src/Storages/StorageMaterializedView.cpp b/src/Storages/StorageMaterializedView.cpp index 7f69d37e2a..33546b7cd0 100644 --- a/src/Storages/StorageMaterializedView.cpp +++ b/src/Storages/StorageMaterializedView.cpp @@ -590,7 +590,7 @@ void StorageMaterializedView::executeByDropInsert(AsyncRefreshParamPtr param, Co { std::optional query_scope; query_scope.emplace(insert_context); - CurrentThread::get().pushTenantId(insert_context->getSettingsRef().tenant_id); + insert_context->setTenantId(insert_context->getSettingsRef().tenant_id); BlockIO insert_io; try { @@ -623,7 +623,6 @@ void StorageMaterializedView::executeByDropInsert(AsyncRefreshParamPtr param, Co insert_io.onException(); throw; } - query_scope.reset(); } catch (...) @@ -701,7 +700,7 @@ void StorageMaterializedView::executeByInsertOverwrite(AsyncRefreshParamPtr para { std::optional query_scope; query_scope.emplace(insert_overwrite_context); - CurrentThread::get().pushTenantId(insert_overwrite_context->getSettingsRef().tenant_id); + insert_overwrite_context->setTenantId(insert_overwrite_context->getSettingsRef().tenant_id); LOG_DEBUG(log, "refresh sync materialized view refresh insert overwite query: {}", param->insert_overwrite_query); BlockIO insert_io;