Merge branch 'cnch_dev_handle_large_kv' into 'cnch-dev'

feat(clickhousech@m-4209342798): catalog support large KV

See merge request dp/ClickHouse!22351
# Conflicts:
#	ci_scripts/config/users.xml
This commit is contained in:
fredwang 2024-08-30 05:42:08 +00:00
parent fecbc10fc9
commit 49ce776559
21 changed files with 599 additions and 25 deletions

View File

@ -21,6 +21,7 @@
<background_schedule_pool_size>6</background_schedule_pool_size>
<min_insert_block_size_bytes>8589934592</min_insert_block_size_bytes>
<cnch_max_cached_storage>50000</cnch_max_cached_storage>
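<!-- 0 means no limit on the query text size; needed for the multi-MB DDL statements in this MR's large KV test -->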
<max_query_size>0</max_query_size>
</default>
<!-- Profile that allows only read queries. -->

View File

@ -20,6 +20,7 @@
#include <Catalog/MetastoreFDBImpl.h>
#include <Common/HostWithPorts.h>
#include <Catalog/StringHelper.h>
#include <Catalog/LargeKVHandler.h>
#include <Protos/cnch_common.pb.h>
#include <Protos/data_models.pb.h>
#include <Common/filesystemHelpers.h>
@ -362,6 +363,14 @@ private:
{
std::string value;
metastore_ptr->get(full_key, value);
// Try to parse the large KV meta before actually dumping the metadata.
DB::Protos::DataModelLargeKVMeta large_kv_model;
if (Catalog::tryParseLargeKVMetaModel(value, large_kv_model))
{
std::cout << "Large KV base value: \n" << large_kv_model.DebugString() << std::endl;
tryGetLargeValue(metastore_ptr, name_space, full_key, value);
std::cout << "Original value : " << std::endl;
}
dumpMetadata(cmd.key, value);
break;
}

View File

@ -24,6 +24,8 @@
#include <Catalog/CatalogFactory.h>
#include <Catalog/DataModelPartWrapper.h>
#include <Catalog/StringHelper.h>
#include <Catalog/LargeKVHandler.h>
#include <Catalog/CatalogBackgroundTask.h>
#include <Parsers/parseQuery.h>
#include <Parsers/ParserQuery.h>
#include <Parsers/queryToString.h>
@ -557,6 +559,9 @@ namespace Catalog
topology_key = name_space;
else
topology_key = name_space + "_" + config.topology_key;
// Add a background task that performs GC jobs for the catalog.
bg_task = std::make_shared<CatalogBackgroundTask>(Context::createCopy(context.shared_from_this()), meta_proxy->getMetastore(), name_space);
},
ProfileEvents::CatalogConstructorSuccess,
ProfileEvents::CatalogConstructorFailed);
@ -957,7 +962,11 @@ namespace Catalog
return;
}
auto storage = tryGetTableByUUID(context, UUIDHelpers::UUIDToString(uuid), TxnTimestamp::maxTS());
StoragePtr storage;
if (auto query_context = CurrentThread::getGroup()->query_context.lock())
storage = tryGetTableByUUID(*query_context, UUIDHelpers::UUIDToString(uuid), TxnTimestamp::maxTS());
else
storage = tryGetTableByUUID(context, UUIDHelpers::UUIDToString(uuid), TxnTimestamp::maxTS());
if (auto pcm = context.getPartCacheManager(); pcm && storage)
{
@ -6233,7 +6242,12 @@ namespace Catalog
for (const String & dependency : dependencies)
batch_write.AddDelete(MetastoreProxy::viewDependencyKey(name_space, dependency, table_id.uuid()));
batch_write.AddPut(SinglePutRequest(MetastoreProxy::tableStoreKey(name_space, table_id.uuid(), ts.toUInt64()), table.SerializeAsString()));
addPotentialLargeKVToBatchwrite(
meta_proxy->getMetastore(),
batch_write,
name_space,
MetastoreProxy::tableStoreKey(name_space, table_id.uuid(), ts.toUInt64()),
table.SerializeAsString());
// Using the database and table name from table_id is required because they may differ from those in the table data model.
batch_write.AddPut(SinglePutRequest(
MetastoreProxy::tableTrashKey(name_space, table_id.database(), table_id.name(), ts.toUInt64()), table_id.SerializeAsString()));

View File

@ -82,6 +82,8 @@ enum class VisibilityLevel
All
};
class CatalogBackgroundTask;
class Catalog
{
public:
@ -890,6 +892,8 @@ public:
void commitCheckpointVersion(const UUID & uuid, std::shared_ptr<DB::Protos::ManifestListModel> checkpoint_version);
void cleanTableVersions(const UUID & uuid, std::vector<std::shared_ptr<DB::Protos::ManifestListModel>> versions_to_clean);
void shutDown() {bg_task.reset();}
private:
Poco::Logger * log = &Poco::Logger::get("Catalog");
Context & context;
@ -901,6 +905,8 @@ private:
std::mutex all_storage_nhut_mutex;
CatalogSettings settings;
std::shared_ptr<CatalogBackgroundTask> bg_task;
std::shared_ptr<Protos::DataModelDB> tryGetDatabaseFromMetastore(const String & database, const UInt64 & ts);
std::shared_ptr<Protos::DataModelTable>
tryGetTableFromMetastore(const String & table_uuid, const UInt64 & ts, bool with_prev_versions = false, bool with_deleted = false);

View File

@ -0,0 +1,118 @@
#include <Catalog/CatalogBackgroundTask.h>
#include <Catalog/MetastoreProxy.h>
#include <Catalog/LargeKVHandler.h>
#include <MergeTreeCommon/CnchServerManager.h>
namespace DB
{
namespace Catalog
{
CatalogBackgroundTask::CatalogBackgroundTask(
const ContextPtr & context_,
const std::shared_ptr<IMetaStore> & metastore_,
const String & name_space_)
: context(context_),
metastore(metastore_),
name_space(name_space_)
{
task_holder = context->getSchedulePool().createTask(
"CatalogBGTask",
[this](){
execute();
}
);
task_holder->activate();
// wait for server startup
task_holder->scheduleAfter(30*1000);
}
CatalogBackgroundTask::~CatalogBackgroundTask()
{
try
{
task_holder->deactivate();
}
catch (...)
{
tryLogCurrentException(log);
}
}
void CatalogBackgroundTask::execute()
{
// Only the server can perform the catalog bg task.
if (context->getServerType() != ServerType::cnch_server)
return;
LOG_DEBUG(log, "Try execute catalog bg task.");
try
{
cleanStaleLargeKV();
}
catch (...)
{
tryLogCurrentException(log, "Exception happens while executing catalog bg task.");
}
// Reschedule to run again in one hour.
task_holder->scheduleAfter(60*60*1000);
}
void CatalogBackgroundTask::cleanStaleLargeKV()
{
// Only the leader can execute the clean job.
if (!context->getCnchServerManager()->isLeader())
return;
// Scan large KV reference records.
std::unordered_map<String, String> uuid_to_key;
String large_kv_reference_prefix = MetastoreProxy::largeKVReferencePrefix(name_space);
auto it = metastore->getByPrefix(large_kv_reference_prefix);
while (it->next())
{
String uuid = it->key().substr(large_kv_reference_prefix.size());
uuid_to_key.emplace(uuid, it->value());
}
// Check, for each large KV, whether it is still referenced by its original stored key.
for (const auto & [uuid, key] : uuid_to_key)
{
String value;
metastore->get(key, value);
if (!value.empty())
{
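// Still referenced: the base key currently holds a large-KV meta record that points at this uuid.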
Protos::DataModelLargeKVMeta large_kv_model;
if (tryParseLargeKVMetaModel(value, large_kv_model) && large_kv_model.uuid() == uuid)
continue;
}
// Remove the large KV because it is no longer referenced by the original key.
BatchCommitRequest batch_write;
BatchCommitResponse resp;
auto large_kv_it = metastore->getByPrefix(MetastoreProxy::largeKVDataPrefix(name_space, uuid));
while (large_kv_it->next())
batch_write.AddDelete(large_kv_it->key());
batch_write.AddDelete(MetastoreProxy::largeKVReferenceKey(name_space, uuid));
try
{
metastore->batchWrite(batch_write, resp);
LOG_DEBUG(log, "Removed large KV(uuid: {}) from metastore.", uuid);
}
catch (...)
{
tryLogCurrentException(log, "Error occurs while removing large kv.");
}
}
}
}
}

View File

@ -0,0 +1,41 @@
#pragma once
#include <Catalog/IMetastore.h>
#include <Core/BackgroundSchedulePool.h>
#include <Interpreters/Context.h>
namespace DB
{
namespace Catalog
{
class CatalogBackgroundTask
{
public:
CatalogBackgroundTask(
const ContextPtr & context_,
const std::shared_ptr<IMetaStore> & metastore_,
const String & name_space_);
~CatalogBackgroundTask();
void execute();
private:
void cleanStaleLargeKV();
Poco::Logger * log = &Poco::Logger::get("CatalogBGTask");
ContextPtr context;
std::shared_ptr<IMetaStore> metastore;
String name_space;
BackgroundSchedulePool::TaskHolder task_holder;
};
}
}

View File

@ -117,6 +117,11 @@ public:
* get limitations of the kv store
*/
virtual uint32_t getMaxBatchSize() = 0;
/***
* get the size limit of a single KV
*/
virtual uint32_t getMaxKVSize() = 0;
};
}

View File

@ -0,0 +1,167 @@
#include <Catalog/LargeKVHandler.h>
#include <Catalog/MetastoreProxy.h>
#include <Common/SipHash.h>
#include <common/logger_useful.h>
#include <Poco/SHA1Engine.h>
#include <boost/algorithm/hex.hpp>
namespace DB
{
namespace ErrorCodes
{
extern const int CORRUPTED_DATA;
}
namespace Catalog
{
// Use the SHA-1 hash of the KV (key + value) as its UUID so that we can perform CAS based on it.
String getUUIDForLargeKV(const String & key, const String & value)
{
Poco::SHA1Engine engine;
engine.update(key.data(), key.size());
engine.update(value.data(), value.size());
const std::vector<unsigned char> & sha1_value = engine.digest();
String hexed_hash;
hexed_hash.resize(sha1_value.size() * 2);
boost::algorithm::hex(sha1_value.begin(), sha1_value.end(), hexed_hash.data());
return hexed_hash;
}
bool tryParseLargeKVMetaModel(const String & serialized, Protos::DataModelLargeKVMeta & model)
{
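// A large-KV meta record is framed as the 4-byte magic number "LGKV" followed by a serialized DataModelLargeKVMeta.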
if (serialized.compare(0, 4, MAGIC_NUMBER) == 0)
return model.ParseFromArray(serialized.c_str() + 4, serialized.size()-4);
return false;
}
void tryGetLargeValue(const std::shared_ptr<IMetaStore> & metastore, const String & name_space, const String & key, String & value)
{
Protos::DataModelLargeKVMeta large_kv_model;
if (!tryParseLargeKVMetaModel(value, large_kv_model))
return;
String kv_id = large_kv_model.uuid();
UInt32 subkv_number = large_kv_model.subkv_number();
String resolved;
if (large_kv_model.has_value_size())
resolved.reserve(large_kv_model.value_size());
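// Few sub-KVs: fetch them in a single multiGet; many sub-KVs: stream them via a prefix scan (keys sort by index).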
if (subkv_number < 10)
{
std::vector<String> request_keys(subkv_number);
for (size_t i=0; i<subkv_number; i++)
request_keys[i] = MetastoreProxy::largeKVDataKey(name_space, kv_id, i);
const auto & sub_values = metastore->multiGet(request_keys);
for (const auto & [subvalue, _] : sub_values)
resolved += subvalue;
}
else
{
auto it = metastore->getByPrefix(MetastoreProxy::largeKVDataPrefix(name_space, kv_id));
while (it->next())
resolved += it->value();
}
// check the KV uuid (KV hash) to verify data integrity
if (getUUIDForLargeKV(key, resolved) != kv_id)
throw Exception(fmt::format("Cannot resolve value of large KV. Data may be corrupted. Original value size: {}, resolved size: {}"
, large_kv_model.value_size(), resolved.size()), ErrorCodes::CORRUPTED_DATA);
value.swap(resolved);
}
LargeKVWrapperPtr tryGetLargeKVWrapper(
const std::shared_ptr<IMetaStore> & metastore,
const String & name_space,
const String & key,
const String & value,
bool if_not_exists,
const String & expected)
{
const size_t max_allowed_kv_size = metastore->getMaxKVSize();
size_t value_size = value.size();
if (value_size > max_allowed_kv_size)
{
String large_kv_id = getUUIDForLargeKV(key, value);
std::vector<SinglePutRequest> puts;
UInt64 sub_key_index = 0;
// Split the serialized data into substrings that fit the KV size limit.
for (size_t i=0; i<value_size; i+=max_allowed_kv_size)
{
puts.emplace_back(SinglePutRequest(MetastoreProxy::largeKVDataKey(name_space, large_kv_id, sub_key_index++),
value.substr(i, max_allowed_kv_size)));
}
size_t subkv_number = puts.size();
// Write an additional KV reference for GC. Use CAS to avoid id conflicts (although they are unlikely).
puts.emplace_back(SinglePutRequest(MetastoreProxy::largeKVReferenceKey(name_space, large_kv_id), key, true));
Protos::DataModelLargeKVMeta large_kv_meta;
large_kv_meta.set_uuid(large_kv_id);
large_kv_meta.set_subkv_number(subkv_number);
large_kv_meta.set_value_size(value_size);
SinglePutRequest base_req(key, MAGIC_NUMBER + large_kv_meta.SerializeAsString());
base_req.if_not_exists = if_not_exists;
if (!expected.empty())
{
// If the expected value size exceeds the max KV size, construct a DataModelLargeKVMeta to perform CAS.
if (expected.size() > max_allowed_kv_size)
{
Protos::DataModelLargeKVMeta expected_large_kv_model;
expected_large_kv_model.set_uuid(getUUIDForLargeKV(key, expected));
expected_large_kv_model.set_subkv_number(1 + ((expected.size() - 1) / max_allowed_kv_size));
expected_large_kv_model.set_value_size(expected.size());
base_req.expected_value = MAGIC_NUMBER + expected_large_kv_model.SerializeAsString();
}
else
base_req.expected_value = expected;
}
LargeKVWrapperPtr wrapper = std::make_shared<LargeKVWrapper>(std::move(base_req));
wrapper->sub_requests.swap(puts);
return wrapper;
}
else
{
SinglePutRequest base_req(key, value);
base_req.if_not_exists = if_not_exists;
if (!expected.empty())
base_req.expected_value = expected;
LargeKVWrapperPtr wrapper = std::make_shared<LargeKVWrapper>(std::move(base_req));
return wrapper;
}
}
void addPotentialLargeKVToBatchwrite(
const std::shared_ptr<IMetaStore> & metastore,
BatchCommitRequest & batch_request,
const String & name_space,
const String & key,
const String & value,
bool if_not_exists,
const String & expected)
{
LargeKVWrapperPtr largekv_wrapper = tryGetLargeKVWrapper(metastore, name_space, key, value, if_not_exists, expected);
for (auto & sub_req : largekv_wrapper->sub_requests)
batch_request.AddPut(sub_req);
batch_request.AddPut(largekv_wrapper->base_request);
}
}
}
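A minimal standalone sketch (not part of the commit) of the split arithmetic above, assuming the FDB limits added later in this MR and a hypothetical 1 MiB table model:

#include <cstddef>
#include <iostream>
int main()
{
// getMaxKVSize() for the FDB metastore in this MR: MAX_FDB_KV_SIZE (100000) minus a 200-byte margin.
const std::size_t max_kv_size = 100000 - 200;
// Hypothetical input: a 1 MiB serialized table model.
const std::size_t value_size = 1 << 20;
// The same ceiling division tryGetLargeKVWrapper applies when sizing the expected value.
const std::size_t subkv_number = 1 + (value_size - 1) / max_kv_size;
std::cout << subkv_number << std::endl; // prints 11; sub-keys run from _00000 to _00010
return 0;
}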

View File

@ -0,0 +1,53 @@
#pragma once
#include <Catalog/MetastoreCommon.h>
#include <Catalog/IMetastore.h>
#include <Protos/data_models.pb.h>
namespace DB
{
namespace Catalog
{
static const char * MAGIC_NUMBER = "LGKV";
struct LargeKVWrapper
{
LargeKVWrapper(SinglePutRequest && base)
: base_request(std::move(base))
{
}
SinglePutRequest base_request;
std::vector<SinglePutRequest> sub_requests;
bool isLargeKV() { return sub_requests.size() > 0; }
};
using LargeKVWrapperPtr = std::shared_ptr<LargeKVWrapper>;
LargeKVWrapperPtr tryGetLargeKVWrapper(
const std::shared_ptr<IMetaStore> & metastore,
const String & name_space,
const String & key,
const String & value,
bool if_not_exists = false,
const String & expected = "");
bool tryParseLargeKVMetaModel(const String & serialized, Protos::DataModelLargeKVMeta & model);
void tryGetLargeValue(const std::shared_ptr<IMetaStore> & metastore, const String & name_space, const String & key, String & value);
void addPotentialLargeKVToBatchwrite(
const std::shared_ptr<IMetaStore> & metastore,
BatchCommitRequest & batch_request,
const String & name_space,
const String & key,
const String & value,
bool if_not_exists = false,
const String & expected = "");
}
}

View File

@ -122,6 +122,9 @@ public:
// leave some margin
uint32_t getMaxBatchSize() final { return MAX_BYTEKV_BATCH_SIZE - 1000; }
// leave some margin
uint32_t getMaxKVSize() final { return MAX_BYTEKV_KV_SIZE - 200; }
public:
std::shared_ptr<ByteKVClient> client;

View File

@ -33,7 +33,7 @@ namespace Catalog
class MetastoreFDBImpl : public IMetaStore
{
// Limitations of FDB (in bytes)
#define MAX_FDB_KV_SIZE 10000
#define MAX_FDB_KV_SIZE 100000 // Hard limit: keys cannot exceed 10,000 bytes; values cannot exceed 100,000 bytes.
#define MAX_FDB_TRANSACTION_SIZE 10000000
public:
@ -105,6 +105,9 @@ public:
// leave some margin
uint32_t getMaxBatchSize() final { return MAX_FDB_TRANSACTION_SIZE - 1000; }
// leave some margin
uint32_t getMaxKVSize() final { return MAX_FDB_KV_SIZE - 200; }
private:
/// convert metastore specific error code to Clickhouse error code for processing convenience in upper layer.
static int toCommonErrorCode(const fdb_error_t & error_t);

View File

@ -20,6 +20,7 @@
#include <string.h>
#include <Catalog/MetastoreCommon.h>
#include <Catalog/MetastoreProxy.h>
#include <Catalog/LargeKVHandler.h>
#include <CloudServices/CnchBGThreadCommon.h>
#include <DaemonManager/BGJobStatusInCatalog.h>
#include <Databases/MySQL/MaterializedMySQLCommon.h>
@ -342,8 +343,16 @@ void MetastoreProxy::createTable(const String & name_space, const UUID & db_uuid
BatchCommitRequest batch_write;
batch_write.AddPut(SinglePutRequest(nonHostUpdateKey(name_space, uuid), "0", true));
// insert table meta
batch_write.AddPut(SinglePutRequest(tableStoreKey(name_space, uuid, table_data.commit_time()), serialized_meta, true));
// Insert table meta. Route through the large KV handler in case the table meta exceeds the KV size limit.
addPotentialLargeKVToBatchwrite(
metastore_ptr,
batch_write,
name_space,
tableStoreKey(name_space, uuid, table_data.commit_time()),
serialized_meta,
true/*if_not_exists*/);
/// add dependency mapping if needed
for (const String & dependency : dependencies)
batch_write.AddPut(SinglePutRequest(viewDependencyKey(name_space, dependency, uuid), uuid));
@ -419,14 +428,34 @@ void MetastoreProxy::dropUDF(const String & name_space, const String &resolved_n
void MetastoreProxy::updateTable(const String & name_space, const String & table_uuid, const String & table_info_new, const UInt64 & ts)
{
metastore_ptr->put(tableStoreKey(name_space, table_uuid, ts), table_info_new);
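// A large table meta must go through a batch write (meta record plus sub-KVs); small metas keep the single-put fast path.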
if (table_info_new.size() > metastore_ptr->getMaxKVSize())
{
BatchCommitRequest batch_write;
addPotentialLargeKVToBatchwrite(
metastore_ptr,
batch_write,
name_space,
tableStoreKey(name_space, table_uuid, ts),
table_info_new);
BatchCommitResponse resp;
metastore_ptr->batchWrite(batch_write, resp);
}
else
metastore_ptr->put(tableStoreKey(name_space, table_uuid, ts), table_info_new);
}
void MetastoreProxy::updateTableWithID(const String & name_space, const Protos::TableIdentifier & table_id, const DB::Protos::DataModelTable & table_data)
{
BatchCommitRequest batch_write;
batch_write.AddPut(SinglePutRequest(tableUUIDMappingKey(name_space, table_id.database(), table_id.name()), table_id.SerializeAsString()));
batch_write.AddPut(SinglePutRequest(tableStoreKey(name_space, table_id.uuid(), table_data.commit_time()), table_data.SerializeAsString()));
addPotentialLargeKVToBatchwrite(
metastore_ptr,
batch_write,
name_space,
tableStoreKey(name_space, table_id.uuid(), table_data.commit_time()),
table_data.SerializeAsString());
BatchCommitResponse resp;
metastore_ptr->batchWrite(batch_write, resp);
}
@ -436,7 +465,10 @@ void MetastoreProxy::getTableByUUID(const String & name_space, const String & ta
auto it = metastore_ptr->getByPrefix(tableStorePrefix(name_space, table_uuid));
while(it->next())
{
tables_info.emplace_back(it->value());
String table_meta = it->value();
/// NOTE: Too many large KVs would cause a severe performance regression, but this rarely happens.
tryGetLargeValue(metastore_ptr, name_space, it->key(), table_meta);
tables_info.emplace_back(std::move(table_meta));
}
}
@ -830,10 +862,14 @@ void MetastoreProxy::prepareRenameTable(const String & name_space,
RPCHelpers::fillUUID(to_db_uuid, *identifier.mutable_db_uuid());
batch_write.AddPut(SinglePutRequest(tableUUIDMappingKey(name_space, to_table.database(), to_table.name()), identifier.SerializeAsString(), true));
String meta_data;
to_table.SerializeToString(&meta_data);
/// add new table meta data with new name
batch_write.AddPut(SinglePutRequest(tableStoreKey(name_space, table_uuid, to_table.commit_time()), meta_data, true));
addPotentialLargeKVToBatchwrite(
metastore_ptr,
batch_write,
name_space,
tableStoreKey(name_space, table_uuid, to_table.commit_time()),
to_table.SerializeAsString(),
true/*if_not_exists*/);
}
bool MetastoreProxy::alterTable(const String & name_space, const Protos::DataModelTable & table, const Strings & masks_to_remove, const Strings & masks_to_add)
@ -841,7 +877,14 @@ bool MetastoreProxy::alterTable(const String & name_space, const Protos::DataMod
BatchCommitRequest batch_write;
String table_uuid = UUIDHelpers::UUIDToString(RPCHelpers::createUUID(table.uuid()));
batch_write.AddPut(SinglePutRequest(tableStoreKey(name_space, table_uuid, table.commit_time()), table.SerializeAsString(), true));
addPotentialLargeKVToBatchwrite(
metastore_ptr,
batch_write,
name_space,
tableStoreKey(name_space, table_uuid, table.commit_time()),
table.SerializeAsString(),
true/*if_not_exists*/);
Protos::TableIdentifier identifier;
identifier.set_database(table.database());
@ -3371,7 +3414,9 @@ std::shared_ptr<Protos::DataModelSensitiveDatabase> MetastoreProxy::getSensitive
String MetastoreProxy::getAccessEntity(EntityType type, const String & name_space, const String & name) const
{
String data;
metastore_ptr->get(accessEntityKey(type, name_space, name), data);
String access_entity_key = accessEntityKey(type, name_space, name);
metastore_ptr->get(access_entity_key, data);
tryGetLargeValue(metastore_ptr, name_space, access_entity_key, data);
return data;
}
@ -3394,7 +3439,16 @@ std::vector<std::pair<String, UInt64>> MetastoreProxy::getEntities(EntityType ty
requests.push_back(accessEntityKey(type, name_space, s));
}
return metastore_ptr->multiGet(requests);
auto res = metastore_ptr->multiGet(requests);
for (size_t i=0; i<res.size(); i++)
{
// try to resolve the large value, if any
String & value = res[i].first;
tryGetLargeValue(metastore_ptr, name_space, requests[i], value);
}
return res;
}
Strings MetastoreProxy::getAllAccessEntities(EntityType type, const String & name_space) const
@ -3403,7 +3457,10 @@ Strings MetastoreProxy::getAllAccessEntities(EntityType type, const String & nam
auto it = metastore_ptr->getByPrefix(accessEntityPrefix(type, name_space));
while (it->next())
{
models.push_back(it->value());
String value = it->value();
/// NOTE: Too many large KVs would cause a severe performance regression.
tryGetLargeValue(metastore_ptr, name_space, it->key(), value);
models.push_back(std::move(value));
}
return models;
}
@ -3431,12 +3488,30 @@ bool MetastoreProxy::putAccessEntity(EntityType type, const String & name_space,
BatchCommitRequest batch_write;
BatchCommitResponse resp;
auto is_rename = !old_access_entity.name().empty() && new_access_entity.name() != old_access_entity.name();
auto put_access_entity_request = SinglePutRequest(accessEntityKey(type, name_space, new_access_entity.name()), new_access_entity.SerializeAsString(), !replace_if_exists);
String uuid = UUIDHelpers::UUIDToString(RPCHelpers::createUUID(new_access_entity.uuid()));
String serialized_old_access_entity = old_access_entity.SerializeAsString();
if (!serialized_old_access_entity.empty() && !is_rename)
put_access_entity_request.expected_value = serialized_old_access_entity;
batch_write.AddPut(put_access_entity_request);
{
addPotentialLargeKVToBatchwrite(
metastore_ptr,
batch_write,
name_space,
accessEntityKey(type, name_space, new_access_entity.name()),
new_access_entity.SerializeAsString(),
!replace_if_exists,
serialized_old_access_entity);
}
else
{
addPotentialLargeKVToBatchwrite(
metastore_ptr,
batch_write,
name_space,
accessEntityKey(type, name_space, new_access_entity.name()),
new_access_entity.SerializeAsString(),
!replace_if_exists);
}
batch_write.AddPut(SinglePutRequest(accessEntityUUIDNameMappingKey(name_space, uuid), new_access_entity.name(), !replace_if_exists));
if (is_rename)
batch_write.AddDelete(accessEntityKey(type, name_space, old_access_entity.name())); // delete old one in case of rename
@ -3446,21 +3521,22 @@ bool MetastoreProxy::putAccessEntity(EntityType type, const String & name_space,
}
catch (Exception & e)
{
auto puts_size = batch_write.puts.size();
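// The access-entity put may now be preceded by sub-KV puts, so it sits at index puts_size - 2 and the uuid-name mapping at puts_size - 1.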
if (e.code() == ErrorCodes::METASTORE_COMMIT_CAS_FAILURE)
{
if (resp.puts.count(0) && replace_if_exists && !serialized_old_access_entity.empty())
if (resp.puts.count(puts_size-2) && replace_if_exists && !serialized_old_access_entity.empty())
{
throw Exception(
"Access Entity has recently been changed in catalog. Please try the request again.",
ErrorCodes::METASTORE_ACCESS_ENTITY_CAS_ERROR);
}
else if (resp.puts.count(0) && !replace_if_exists)
else if (resp.puts.count(puts_size-2) && !replace_if_exists)
{
throw Exception(
"Access Entity with the same name already exists in catalog. Please use another name and try again.",
ErrorCodes::METASTORE_ACCESS_ENTITY_EXISTS_ERROR);
}
else if (resp.puts.count(1) && !replace_if_exists)
else if (resp.puts.count(puts_size-1) && !replace_if_exists)
{
throw Exception(
"Access Entity with the same UUID already exists in catalog. Please use another name and try again.",

View File

@ -132,6 +132,9 @@ namespace DB::Catalog
#define MANIFEST_DATA_PREFIX "MFST_"
#define MANIFEST_LIST_PREFIX "MFSTS_"
#define LARGE_KV_DATA_PREFIX "LGKV_"
#define LARGE_KV_REFERENCE "LGKVRF_"
using EntityType = IAccessEntity::Type;
struct EntityMetastorePrefix
{
@ -954,6 +957,29 @@ public:
return manifestListPrefix(name_space, uuid) + toString(table_version);
}
static String largeKVDataPrefix(const String & name_space, const String & uuid)
{
return escapeString(name_space) + '_' + LARGE_KV_DATA_PREFIX + uuid + '_';
}
static String largeKVDataKey(const String & name_space, const String & uuid, UInt64 index)
{
// Keep records in the KV storage in the same order as the index. The 5-digit zero-padded index supports at most 100k sub-KVs.
std::ostringstream oss;
oss << std::setw(5) << std::setfill('0') << index;
return largeKVDataPrefix(name_space, uuid) + oss.str();
}
static String largeKVReferencePrefix(const String & name_space)
{
return escapeString(name_space) + '_' + LARGE_KV_REFERENCE;
}
static String largeKVReferenceKey(const String & name_space, const String & uuid)
{
return largeKVReferencePrefix(name_space) + uuid;
}
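// Example layout for one large KV under namespace 'ns' (uuid shortened for illustration):
// base key: <original key> -> "LGKV" + serialized DataModelLargeKVMeta{uuid, subkv_number, value_size}
// sub-KV data: ns_LGKV_<uuid>_00000 ... ns_LGKV_<uuid>_00010 -> value chunks, concatenated in index order
// GC reference: ns_LGKVRF_<uuid> -> <original key>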
// parse the first key in format of '{prefix}{escapedString(first_key)}_postfix'
// note that the prefix should contain _, like TCS_
// return [first_key, postfix]
@ -1037,7 +1063,7 @@ public:
void updateTableWithID(const String & name_space, const Protos::TableIdentifier & table_id, const DB::Protos::DataModelTable & table_data);
void getTableByUUID(const String & name_space, const String & table_uuid, Strings & tables_info);
void clearTableMeta(const String & name_space, const String & database, const String & table, const String & uuid, const Strings & dependencies, const UInt64 & ts = 0);
static void prepareRenameTable(const String & name_space, const String & table_uuid, const String & from_db, const String & from_table, const UUID & to_db_uuid, Protos::DataModelTable & to_table, BatchCommitRequest & batch_write);
void prepareRenameTable(const String & name_space, const String & table_uuid, const String & from_db, const String & from_table, const UUID & to_db_uuid, Protos::DataModelTable & to_table, BatchCommitRequest & batch_write);
bool alterTable(const String & name_space, const Protos::DataModelTable & table, const Strings & masks_to_remove, const Strings & masks_to_add);
Strings getAllTablesInDB(const String & name_space, const String & database);
IMetaStore::IteratorPtr getAllTablesMeta(const String & name_space);

View File

@ -1842,7 +1842,8 @@ void CnchServerServiceImpl::notifyTableCreated(
catch (...)
{
tryLogCurrentException(log, __PRETTY_FUNCTION__);
RPCHelpers::handleException(response->mutable_exception());
(void)response;
//RPCHelpers::handleException(response->mutable_exception());
}
});
}

View File

@ -593,6 +593,8 @@ struct ContextSharedPart
if (worker_status_manager)
worker_status_manager->shutdown();
if (cnch_catalog)
cnch_catalog->shutDown();
std::unique_ptr<SystemLogs> delete_system_logs;
std::unique_ptr<CnchSystemLogs> delete_cnch_system_logs;

View File

@ -811,3 +811,10 @@ message ManifestListModel
repeated uint64 txn_ids = 2;
optional bool checkpoint = 3;
}
message DataModelLargeKVMeta
{
required bytes uuid = 1; //uuid of the large KV
required uint64 subkv_number = 2;
optional uint64 value_size = 3; // record the value size of the large KV
}
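// Illustrative example: a 1 MiB table meta written through the large KV path would carry
// { uuid: <hex SHA-1 of key + value>, subkv_number: 11, value_size: 1048576 }.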

View File

@ -2142,7 +2142,7 @@ void StorageCnchMergeTree::alter(const AlterCommands & commands, ContextPtr loca
StorageInMemoryMetadata new_metadata = getInMemoryMetadata();
TransactionCnchPtr txn = local_context->getCurrentTransaction();
auto action = txn->createAction<DDLAlterAction>(shared_from_this(), local_context->getSettingsRef(), local_context->getCurrentQueryId());
auto action = txn->createActionWithLocalContext<DDLAlterAction>(local_context, shared_from_this(), local_context->getSettingsRef(), local_context->getCurrentQueryId());
auto & alter_act = action->as<DDLAlterAction &>();
alter_act.setMutationCommands(mutation_commands);

View File

@ -89,7 +89,7 @@ void DDLAlterAction::executeV1(TxnTimestamp commit_time)
// updateTsCache(table->getStorageUUID(), commit_time);
if (!new_schema.empty() && new_schema!=old_schema)
{
catalog->alterTable(global_context, query_settings, table, new_schema, table->commit_time, txn_id, commit_time, is_modify_cluster_by);
catalog->alterTable(*getContext(), query_settings, table, new_schema, table->commit_time, txn_id, commit_time, is_modify_cluster_by);
LOG_DEBUG(log, "Successfully change schema in catalog.");
}
else

View File

@ -122,6 +122,13 @@ public:
{
return std::make_shared<TAction>(global_context, txn_record.txnID(), std::forward<Args>(args)...);
}
template <typename TAction, typename... Args>
ActionPtr createActionWithLocalContext(const ContextPtr & local_context, Args &&... args) const
{
return std::make_shared<TAction>(local_context, txn_record.txnID(), std::forward<Args>(args)...);
}
template <typename... Args>
IntentLockPtr createIntentLock(const String & lock_prefix, Args &&... args) const
{

View File

@ -0,0 +1,2 @@
1
1

View File

@ -0,0 +1,33 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
. $CURDIR/../shell_config.sh
COL_NUMBER=5000
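# Each generated column carries a long COMMENT so that the serialized table meta exceeds the metastore's single-KV size limit.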
$CLICKHOUSE_CLIENT --multiquery <<EOF
SET max_query_size=0;
CREATE DATABASE IF NOT EXISTS test;
DROP TABLE IF EXISTS test.big_table;
EOF
QUERY=$(
echo "CREATE TABLE test.big_table(p_date Date, id Int32, "
for i in $(seq 1 $COL_NUMBER); do
printf %s, "col_$i String COMMENT 'Add some column comment to make the column definition surprisingly loooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooong' "
done
echo "last_column String) ENGINE=CnchMergeTree PARTITION BY p_date ORDER BY id"
)
echo $QUERY | $CLICKHOUSE_CLIENT -n --max_query_size 0
$CLICKHOUSE_CLIENT --multiquery <<EOF
SET max_query_size=0;
SELECT countIf(length(definition) > 1000000) FROM system.cnch_tables WHERE database='test' AND name='big_table';
ALTER TABLE test.big_table ADD COLUMN extra String;
SELECT count() FROM system.cnch_columns WHERE database='test' AND table='big_table' AND name='extra';
DROP TABLE test.big_table;
EOF
rm -f $TEMP_QUERY_FILE