Merge 'fix_async_mv_misc_cnch_dev' into 'cnch-dev'

fix(clickhousech@m-4676840578):  fix misc cnch async materialized view issues for cnch-dev

See merge request: !25127
This commit is contained in:
杜峰 2024-09-19 09:01:05 +00:00 committed by Fred Wang
parent 35b01aafe6
commit f7f177b015
4 changed files with 14 additions and 19 deletions

View File

@ -185,6 +185,8 @@ void CnchRefreshMaterializedViewThread::runImpl()
bool CnchRefreshMaterializedViewThread::constructAndScheduleRefreshTasks(StoragePtr & istorage, StorageMaterializedView & storage)
{
ContextMutablePtr query_context = Context::createCopy(getContext());
query_context->makeQueryContext();
query_context->makeSessionContext();
auto refresh_params = storage.getAsyncRefreshParams(query_context, false);
std::vector<String> task_ids = {};
@ -296,19 +298,19 @@ String CnchRefreshMaterializedViewThread::executeTaskLocal(
task_id = task_id,
mv_refresh_param = mv_refresh_param,
command_context = Context::createCopy(query_context)]() {
auto settings = query_context->getSettings();
auto user_password = const_cast<const Context &> (*command_context).getCnchInterserverCredentials();
command_context->setCurrentTransaction(nullptr, false);
command_context->setCurrentVW(nullptr);
command_context->setCurrentWorkerGroup(nullptr);
command_context->makeSessionContext();
command_context->makeQueryContext();
auto settings = query_context->getSettings();
command_context->setSettings(settings);
CurrentThread::get().pushTenantId(command_context->getSettingsRef().tenant_id);
auto user_password = const_cast<const Context &> (*command_context).getCnchInterserverCredentials();
command_context->setTenantId(command_context->getSettingsRef().tenant_id);
command_context->setUser(user_password.first, user_password.second, Poco::Net::SocketAddress{});
command_context->setCurrentQueryId(task_id);
command_context->makeSessionContext();
command_context->makeQueryContext();
storage.refreshAsync(mv_refresh_param, command_context);
command_context->setCurrentTransaction(nullptr);
});

View File

@ -90,15 +90,11 @@ private:
{
const DistributedProductMode distributed_product_mode = getContext()->getSettingsRef().distributed_product_mode;
StoragePtr storage = tryGetTable(database_and_table, getContext());
if (!storage || !checker.hasAtLeastTwoShards(*storage))
return;
/// Convert distributed table to corresponding remote table.
if (distributed_product_mode == DistributedProductMode::LOCAL)
{
/// Convert distributed table to corresponding remote table.
StorageDistributed * distributed = dynamic_cast<StorageDistributed *>(storage.get());
if (!distributed)
StoragePtr storage = tryGetTable(database_and_table, getContext());
if (!storage || !checker.hasAtLeastTwoShards(*storage))
return;
std::string database;

View File

@ -16,7 +16,6 @@
#include <Storages/Hive/HivePartition.h>
#include <Common/Exception.h>
#include <Common/typeid_cast.h>
#include <Analyzers/QueryRewriter.h>
#include <Databases/DatabasesCommon.h>
#include <Optimizer/MaterializedView/ExpressionSubstitution.h>
@ -128,7 +127,7 @@ void PartitionTransformer::validate(ContextMutablePtr local_context)
return;
LOG_DEBUG(log, "PartitionTransformer::validate mv_query: {} ", queryToString(mv_query));
interpretSettings(mv_query, local_context);
CurrentThread::get().pushTenantId(local_context->getSettingsRef().tenant_id);
local_context->setTenantId(local_context->getSettingsRef().tenant_id);
MaterializedViewStructurePtr structure
= MaterializedViewStructure::buildFrom(target_table_id, target_table_id, mv_query->clone(), async_materialized_view, local_context);
validate(local_context, structure);
@ -371,7 +370,6 @@ AsyncRefreshParamPtrs PartitionTransformer::constructRefreshParams(
AsyncRefreshParamPtr refresh_param = std::make_shared<AsyncRefreshParam>();
const auto & settings = local_context->getSettingsRef();
auto query = mv_query->clone();
QueryRewriter{}.rewrite(query, local_context, false);
bool cascading = local_context->getSettingsRef().cascading_refresh_materialized_view;
bool insert_overwrite = local_context->getSettingsRef().enable_mv_async_insert_overwrite;

View File

@ -590,7 +590,7 @@ void StorageMaterializedView::executeByDropInsert(AsyncRefreshParamPtr param, Co
{
std::optional<CurrentThread::QueryScope> query_scope;
query_scope.emplace(insert_context);
CurrentThread::get().pushTenantId(insert_context->getSettingsRef().tenant_id);
insert_context->setTenantId(insert_context->getSettingsRef().tenant_id);
BlockIO insert_io;
try
{
@ -623,7 +623,6 @@ void StorageMaterializedView::executeByDropInsert(AsyncRefreshParamPtr param, Co
insert_io.onException();
throw;
}
query_scope.reset();
}
catch (...)
@ -701,7 +700,7 @@ void StorageMaterializedView::executeByInsertOverwrite(AsyncRefreshParamPtr para
{
std::optional<CurrentThread::QueryScope> query_scope;
query_scope.emplace(insert_overwrite_context);
CurrentThread::get().pushTenantId(insert_overwrite_context->getSettingsRef().tenant_id);
insert_overwrite_context->setTenantId(insert_overwrite_context->getSettingsRef().tenant_id);
LOG_DEBUG(log, "refresh sync materialized view refresh insert overwite query: {}", param->insert_overwrite_query);
BlockIO insert_io;