diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.cpp b/dbms/src/Storages/StorageReplicatedMergeTree.cpp index 48ba57d0c7..1f651f0c3d 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.cpp +++ b/dbms/src/Storages/StorageReplicatedMergeTree.cpp @@ -539,7 +539,7 @@ void StorageReplicatedMergeTree::createReplica() String is_lost_value = last_added_replica.empty() ? "0" : "1"; Coordination::Requests ops; - Coordination::Responses resps; + Coordination::Responses responses; ops.emplace_back(zkutil::makeCreateRequest(replica_path, "", zkutil::CreateMode::Persistent)); ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/host", "", zkutil::CreateMode::Persistent)); ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/log_pointer", "", zkutil::CreateMode::Persistent)); @@ -553,13 +553,13 @@ void StorageReplicatedMergeTree::createReplica() /// Check version of /replicas to see if there are any replicas created at the same moment of time. ops.emplace_back(zkutil::makeSetRequest(zookeeper_path + "/replicas", "last added replica: " + replica_name, replicas_stat.version)); - code = zookeeper->tryMulti(ops, resps); + code = zookeeper->tryMulti(ops, responses); if (code == Coordination::Error::ZNODEEXISTS) throw Exception("Replica " + replica_path + " already exists.", ErrorCodes::REPLICA_IS_ALREADY_EXIST); else if (code == Coordination::Error::ZBADVERSION) LOG_ERROR(log, "Retrying createReplica(), because some other replicas were created at the same time"); else - zkutil::KeeperMultiException::check(code, ops, resps); + zkutil::KeeperMultiException::check(code, ops, responses); } while (code == Coordination::Error::ZBADVERSION); } @@ -1894,37 +1894,72 @@ void StorageReplicatedMergeTree::cloneReplica(const String & source_replica, Coo event->wait(); } - /// The order of the following three actions is important. Entries in the log can be duplicated, but they can not be lost. + /// The order of the following three actions is important. - String raw_log_pointer = zookeeper->get(source_path + "/log_pointer"); - - Coordination::Requests ops; - ops.push_back(zkutil::makeSetRequest(replica_path + "/log_pointer", raw_log_pointer, -1)); - - /// For support old versions CH. - if (source_is_lost_stat.version == -1) + Strings source_queue_names; + /// We are trying to get consistent /log_pointer and /queue state. Otherwise + /// we can possibly duplicate entries in queue of cloned replica. + while (true) { - /// We check that it was not suddenly upgraded to new version. - /// Otherwise it can be upgraded and instantly become lost, but we cannot notice that. - ops.push_back(zkutil::makeCreateRequest(source_path + "/is_lost", "0", zkutil::CreateMode::Persistent)); - ops.push_back(zkutil::makeRemoveRequest(source_path + "/is_lost", -1)); + Coordination::Stat log_pointer_stat; + String raw_log_pointer = zookeeper->get(source_path + "/log_pointer", &log_pointer_stat); + + Coordination::Requests ops; + ops.push_back(zkutil::makeSetRequest(replica_path + "/log_pointer", raw_log_pointer, -1)); + + /// For support old versions CH. + if (source_is_lost_stat.version == -1) + { + /// We check that it was not suddenly upgraded to new version. + /// Otherwise it can be upgraded and instantly become lost, but we cannot notice that. + ops.push_back(zkutil::makeCreateRequest(source_path + "/is_lost", "0", zkutil::CreateMode::Persistent)); + ops.push_back(zkutil::makeRemoveRequest(source_path + "/is_lost", -1)); + } + else /// The replica we clone should not suddenly become lost. + ops.push_back(zkutil::makeCheckRequest(source_path + "/is_lost", source_is_lost_stat.version)); + + Coordination::Responses responses; + + /// Let's remember the queue of the reference/master replica. + source_queue_names = zookeeper->getChildren(source_path + "/queue"); + + /// Check that our log pointer didn't changed while we read queue entries + ops.push_back(zkutil::makeCheckRequest(source_path + "/log_pointer", log_pointer_stat.version)); + + auto rc = zookeeper->tryMulti(ops, responses); + + if (rc == Coordination::ZOK) + { + break; + } + else if (rc == Coordination::Error::ZNODEEXISTS) + { + throw Exception( + "Can not clone replica, because the " + source_replica + " updated to new ClickHouse version", + ErrorCodes::REPLICA_STATUS_CHANGED); + } + else if (responses[1]->error == Coordination::Error::ZBADVERSION) + { + /// If is_lost node version changed than source replica also lost, + /// so we cannot clone from it. + throw Exception( + "Can not clone replica, because the " + source_replica + " became lost", ErrorCodes::REPLICA_STATUS_CHANGED); + } + else if (responses.back()->error == Coordination::Error::ZBADVERSION) + { + /// If source replica's log_pointer changed than we probably read + /// stale state of /queue and have to try one more time. + LOG_WARNING(log, "Log pointer of source replica " << source_replica << " changed while we loading queue nodes. Will retry."); + continue; + } + else + { + zkutil::KeeperMultiException::check(rc, ops, responses); + } } - else /// The replica we clone should not suddenly become lost. - ops.push_back(zkutil::makeCheckRequest(source_path + "/is_lost", source_is_lost_stat.version)); - Coordination::Responses resp; - - auto error = zookeeper->tryMulti(ops, resp); - if (error == Coordination::Error::ZBADVERSION) - throw Exception("Can not clone replica, because the " + source_replica + " became lost", ErrorCodes::REPLICA_STATUS_CHANGED); - else if (error == Coordination::Error::ZNODEEXISTS) - throw Exception("Can not clone replica, because the " + source_replica + " updated to new ClickHouse version", ErrorCodes::REPLICA_STATUS_CHANGED); - else - zkutil::KeeperMultiException::check(error, ops, resp); - - /// Let's remember the queue of the reference/master replica. - Strings source_queue_names = zookeeper->getChildren(source_path + "/queue"); std::sort(source_queue_names.begin(), source_queue_names.end()); + Strings source_queue; for (const String & entry_name : source_queue_names) {