Merge pull request #9583 from ClickHouse/trying_to_fix_clone_replica

Better clone of lost replica
This commit is contained in:
alexey-milovidov 2020-03-10 20:23:49 +03:00 committed by GitHub
commit 8382af4dd0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed file with 64 additions and 29 deletions

View File

@ -539,7 +539,7 @@ void StorageReplicatedMergeTree::createReplica()
String is_lost_value = last_added_replica.empty() ? "0" : "1";
Coordination::Requests ops;
Coordination::Responses resps;
Coordination::Responses responses;
ops.emplace_back(zkutil::makeCreateRequest(replica_path, "", zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/host", "", zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/log_pointer", "", zkutil::CreateMode::Persistent));
@ -553,13 +553,13 @@ void StorageReplicatedMergeTree::createReplica()
/// Check version of /replicas to see if there are any replicas created at the same moment of time.
ops.emplace_back(zkutil::makeSetRequest(zookeeper_path + "/replicas", "last added replica: " + replica_name, replicas_stat.version));
code = zookeeper->tryMulti(ops, resps);
code = zookeeper->tryMulti(ops, responses);
if (code == Coordination::Error::ZNODEEXISTS)
throw Exception("Replica " + replica_path + " already exists.", ErrorCodes::REPLICA_IS_ALREADY_EXIST);
else if (code == Coordination::Error::ZBADVERSION)
LOG_ERROR(log, "Retrying createReplica(), because some other replicas were created at the same time");
else
zkutil::KeeperMultiException::check(code, ops, resps);
zkutil::KeeperMultiException::check(code, ops, responses);
} while (code == Coordination::Error::ZBADVERSION);
}
@ -1894,9 +1894,15 @@ void StorageReplicatedMergeTree::cloneReplica(const String & source_replica, Coo
event->wait();
}
/// The order of the following three actions is important. Entries in the log can be duplicated, but they can not be lost.
/// The order of the following three actions is important.
String raw_log_pointer = zookeeper->get(source_path + "/log_pointer");
Strings source_queue_names;
/// We are trying to get a consistent state of /log_pointer and /queue. Otherwise
/// we may duplicate entries in the queue of the cloned replica.
while (true)
{
Coordination::Stat log_pointer_stat;
String raw_log_pointer = zookeeper->get(source_path + "/log_pointer", &log_pointer_stat);
Coordination::Requests ops;
ops.push_back(zkutil::makeSetRequest(replica_path + "/log_pointer", raw_log_pointer, -1));
@ -1912,19 +1918,48 @@ void StorageReplicatedMergeTree::cloneReplica(const String & source_replica, Coo
else /// The replica we clone should not suddenly become lost.
ops.push_back(zkutil::makeCheckRequest(source_path + "/is_lost", source_is_lost_stat.version));
Coordination::Responses resp;
auto error = zookeeper->tryMulti(ops, resp);
if (error == Coordination::Error::ZBADVERSION)
throw Exception("Can not clone replica, because the " + source_replica + " became lost", ErrorCodes::REPLICA_STATUS_CHANGED);
else if (error == Coordination::Error::ZNODEEXISTS)
throw Exception("Can not clone replica, because the " + source_replica + " updated to new ClickHouse version", ErrorCodes::REPLICA_STATUS_CHANGED);
else
zkutil::KeeperMultiException::check(error, ops, resp);
Coordination::Responses responses;
/// Let's remember the queue of the reference/master replica.
Strings source_queue_names = zookeeper->getChildren(source_path + "/queue");
source_queue_names = zookeeper->getChildren(source_path + "/queue");
/// Check that the source replica's log pointer didn't change while we were reading queue entries
ops.push_back(zkutil::makeCheckRequest(source_path + "/log_pointer", log_pointer_stat.version));
auto rc = zookeeper->tryMulti(ops, responses);
if (rc == Coordination::ZOK)
{
break;
}
else if (rc == Coordination::Error::ZNODEEXISTS)
{
throw Exception(
"Can not clone replica, because the " + source_replica + " updated to new ClickHouse version",
ErrorCodes::REPLICA_STATUS_CHANGED);
}
else if (responses[1]->error == Coordination::Error::ZBADVERSION)
{
/// If the is_lost node version changed, then the source replica is also lost,
/// so we cannot clone from it.
throw Exception(
"Can not clone replica, because the " + source_replica + " became lost", ErrorCodes::REPLICA_STATUS_CHANGED);
}
else if (responses.back()->error == Coordination::Error::ZBADVERSION)
{
/// If the source replica's log_pointer changed, then we probably read a
/// stale state of /queue and have to try one more time.
LOG_WARNING(log, "Log pointer of source replica " << source_replica << " changed while we loading queue nodes. Will retry.");
continue;
}
else
{
zkutil::KeeperMultiException::check(rc, ops, responses);
}
}
std::sort(source_queue_names.begin(), source_queue_names.end());
Strings source_queue;
for (const String & entry_name : source_queue_names)
{