Init TLogPersistent storage for a sharedTLog (#8363)
* Init TLogPersistent storage for a sharedTLog Description diff-1: Address review comments Patch udpates the code to intialize TLogPersistent storage for a shared TLog independent of intializing persistentState for a versioned Tlog data. Appraoch allows initializing Tlog persistent storage as well as writing 'persistFormat' key for a shared TLog earlier in the TLog creation lifecycle. Testing devRunCorrectness - 100K
This commit is contained in:
parent
90ce4053f4
commit
201eac77cf
|
@ -2359,10 +2359,7 @@ ACTOR Future<Void> initPersistentState(TLogData* self, Reference<LogData> logDat
|
|||
wait(self->persistentDataCommitLock.take());
|
||||
state FlowLock::Releaser commitLockReleaser(self->persistentDataCommitLock);
|
||||
|
||||
// PERSIST: Initial setup of persistentData for a brand new tLog for a new database
|
||||
state IKeyValueStore* storage = self->persistentData;
|
||||
wait(storage->init());
|
||||
storage->set(persistFormat);
|
||||
storage->set(
|
||||
KeyValueRef(BinaryWriter::toValue(logData->logId, Unversioned()).withPrefix(persistCurrentVersionKeys.begin),
|
||||
BinaryWriter::toValue(logData->version.get(), Unversioned())));
|
||||
|
@ -2392,7 +2389,7 @@ ACTOR Future<Void> initPersistentState(TLogData* self, Reference<LogData> logDat
|
|||
updatePersistentPopped(self, logData, logData->getTagData(tag));
|
||||
}
|
||||
|
||||
TraceEvent("TLogInitCommit", logData->logId).log();
|
||||
TraceEvent("TLogInitCommit", logData->logId);
|
||||
wait(self->persistentData->commit());
|
||||
return Void();
|
||||
}
|
||||
|
@ -2991,10 +2988,20 @@ ACTOR Future<Void> checkEmptyQueue(TLogData* self) {
|
|||
}
|
||||
}
|
||||
|
||||
ACTOR Future<Void> checkRecovered(TLogData* self) {
|
||||
TraceEvent("TLogCheckRecoveredBegin", self->dbgid).log();
|
||||
Optional<Value> v = wait(self->persistentData->readValue(StringRef()));
|
||||
TraceEvent("TLogCheckRecoveredEnd", self->dbgid).log();
|
||||
ACTOR Future<Void> initPersistentStorage(TLogData* self) {
|
||||
TraceEvent("TLogInitPersistentStorageStart", self->dbgid);
|
||||
|
||||
wait(self->persistentDataCommitLock.take());
|
||||
state FlowLock::Releaser commitLockReleaser(self->persistentDataCommitLock);
|
||||
|
||||
// PERSIST: Initial setup of persistentData for a brand new tLog for a new database
|
||||
state IKeyValueStore* storage = self->persistentData;
|
||||
wait(storage->init());
|
||||
storage->set(persistFormat);
|
||||
|
||||
wait(storage->commit());
|
||||
|
||||
TraceEvent("TLogInitPersistentStorageDone", self->dbgid);
|
||||
return Void();
|
||||
}
|
||||
|
||||
|
@ -3577,13 +3584,13 @@ ACTOR Future<Void> tLog(IKeyValueStore* persistentData,
|
|||
state TLogData self(tlogId, workerID, persistentData, persistentQueue, db, degraded, folder);
|
||||
state Future<Void> error = actorCollection(self.sharedActors.getFuture());
|
||||
|
||||
TraceEvent("SharedTlog", tlogId).log();
|
||||
TraceEvent("SharedTlog", tlogId);
|
||||
try {
|
||||
try {
|
||||
if (restoreFromDisk) {
|
||||
wait(restorePersistentState(&self, locality, oldLog, recovered, tlogRequests));
|
||||
} else {
|
||||
wait(ioTimeoutError(checkEmptyQueue(&self) && checkRecovered(&self),
|
||||
wait(ioTimeoutError(checkEmptyQueue(&self) && initPersistentStorage(&self),
|
||||
SERVER_KNOBS->TLOG_MAX_CREATE_DURATION));
|
||||
}
|
||||
|
||||
|
@ -3593,9 +3600,9 @@ ACTOR Future<Void> tLog(IKeyValueStore* persistentData,
|
|||
if (recovered.canBeSet())
|
||||
recovered.send(Void());
|
||||
|
||||
// if (!self.durableClusterId.isValid()) {
|
||||
// self.sharedActors.send(updateDurableClusterID(&self));
|
||||
// }
|
||||
if (!self.durableClusterId.isValid()) {
|
||||
self.sharedActors.send(updateDurableClusterID(&self));
|
||||
}
|
||||
self.sharedActors.send(commitQueue(&self));
|
||||
self.sharedActors.send(updateStorageLoop(&self));
|
||||
self.sharedActors.send(traceRole(Role::SHARED_TRANSACTION_LOG, tlogId));
|
||||
|
|
Loading…
Reference in New Issue