fix: persistentData->commit() was not protected by the persistentDataCommitLock (#8200)
* fix: persistentData->commit() was not protected by the persistentDataCommitLock, meaning it is possible for inconsistent data to be made durable on the tlog * fixed a compilation error
This commit is contained in:
parent
7b4598a53d
commit
f5161c362e
|
@ -2624,6 +2624,27 @@ ACTOR Future<Void> tLogEnablePopReq(TLogEnablePopRequest enablePopReq, TLogData*
|
|||
return Void();
|
||||
}
|
||||
|
||||
ACTOR Future<Void> updateDurableClusterID(TLogData* self) {
|
||||
loop {
|
||||
// Persist cluster ID once cluster has recovered.
|
||||
if (self->dbInfo->get().recoveryState == RecoveryState::FULLY_RECOVERED) {
|
||||
ASSERT(!self->durableClusterId.isValid());
|
||||
state UID ccClusterId = self->dbInfo->get().client.clusterId;
|
||||
self->durableClusterId = ccClusterId;
|
||||
ASSERT(ccClusterId.isValid());
|
||||
|
||||
wait(self->persistentDataCommitLock.take());
|
||||
state FlowLock::Releaser commitLockReleaser(self->persistentDataCommitLock);
|
||||
self->persistentData->set(
|
||||
KeyValueRef(persistClusterIdKey, BinaryWriter::toValue(ccClusterId, Unversioned())));
|
||||
wait(self->persistentData->commit());
|
||||
|
||||
return Void();
|
||||
}
|
||||
wait(self->dbInfo->onChange());
|
||||
}
|
||||
}
|
||||
|
||||
ACTOR Future<Void> serveTLogInterface(TLogData* self,
|
||||
TLogInterface tli,
|
||||
Reference<LogData> logData,
|
||||
|
@ -2658,17 +2679,6 @@ ACTOR Future<Void> serveTLogInterface(TLogData* self,
|
|||
} else {
|
||||
logData->logSystem->set(Reference<ILogSystem>());
|
||||
}
|
||||
|
||||
// Persist cluster ID once cluster has recovered.
|
||||
auto ccClusterId = self->dbInfo->get().client.clusterId;
|
||||
if (self->dbInfo->get().recoveryState == RecoveryState::FULLY_RECOVERED &&
|
||||
!self->durableClusterId.isValid()) {
|
||||
ASSERT(ccClusterId.isValid());
|
||||
self->durableClusterId = ccClusterId;
|
||||
self->persistentData->set(
|
||||
KeyValueRef(persistClusterIdKey, BinaryWriter::toValue(ccClusterId, Unversioned())));
|
||||
wait(self->persistentData->commit());
|
||||
}
|
||||
}
|
||||
when(TLogPeekStreamRequest req = waitNext(tli.peekStreamMessages.getFuture())) {
|
||||
TraceEvent(SevDebug, "TLogPeekStream", logData->logId)
|
||||
|
@ -3592,6 +3602,9 @@ ACTOR Future<Void> tLog(IKeyValueStore* persistentData,
|
|||
if (recovered.canBeSet())
|
||||
recovered.send(Void());
|
||||
|
||||
if (!self.durableClusterId.isValid()) {
|
||||
self.sharedActors.send(updateDurableClusterID(&self));
|
||||
}
|
||||
self.sharedActors.send(commitQueue(&self));
|
||||
self.sharedActors.send(updateStorageLoop(&self));
|
||||
self.sharedActors.send(traceRole(Role::SHARED_TRANSACTION_LOG, tlogId));
|
||||
|
@ -3600,21 +3613,6 @@ ACTOR Future<Void> tLog(IKeyValueStore* persistentData,
|
|||
loop {
|
||||
choose {
|
||||
when(state InitializeTLogRequest req = waitNext(tlogRequests.getFuture())) {
|
||||
ASSERT(req.clusterId.isValid());
|
||||
// Durably persist the cluster ID if it is not already
|
||||
// durable and the cluster has progressed far enough
|
||||
// through recovery. To avoid different partitions from
|
||||
// persisting different cluster IDs, we need to wait
|
||||
// until a single cluster ID has been persisted in the
|
||||
// txnStateStore before finally writing it to disk.
|
||||
auto recoveryState = self.dbInfo->get().recoveryState;
|
||||
if (!self.durableClusterId.isValid() && recoveryState >= RecoveryState::ACCEPTING_COMMITS) {
|
||||
self.durableClusterId = req.clusterId;
|
||||
// Will let commit loop durably write the cluster ID.
|
||||
self.persistentData->set(
|
||||
KeyValueRef(persistClusterIdKey, BinaryWriter::toValue(req.clusterId, Unversioned())));
|
||||
}
|
||||
|
||||
if (!self.tlogCache.exists(req.recruitmentID)) {
|
||||
self.tlogCache.set(req.recruitmentID, req.reply.getFuture());
|
||||
self.sharedActors.send(
|
||||
|
|
Loading…
Reference in New Issue