Merge commit '8733614f632590f67cd52eeb73c800e4a25ab143' into storageserver-pml

This commit is contained in:
Steve Atherton 2022-10-07 22:58:37 -07:00
commit 1847f801b7
6 changed files with 23 additions and 20 deletions

View File

@ -295,7 +295,10 @@ struct CommitTransactionRef {
serializer(ar, report_conflicting_keys);
}
if (ar.protocolVersion().hasResolverPrivateMutations()) {
serializer(ar, lock_aware, spanContext);
serializer(ar, lock_aware);
}
if (ar.protocolVersion().hasOTELSpanContext()) {
serializer(ar, spanContext);
}
}
}

View File

@ -1633,6 +1633,7 @@ ACTOR Future<Void> tLog(IKeyValueStore* persistentData,
TraceEvent("SharedTlog", tlogId).detail("Version", "4.6");
try {
wait(ioTimeoutError(persistentData->init(), SERVER_KNOBS->TLOG_MAX_CREATE_DURATION));
wait(restorePersistentState(&self, locality));
self.sharedActors.send(cleanupPeekTrackers(&self));

View File

@ -1676,7 +1676,6 @@ ACTOR Future<Void> initPersistentState(TLogData* self, Reference<LogData> logDat
// PERSIST: Initial setup of persistentData for a brand new tLog for a new database
state IKeyValueStore* storage = self->persistentData;
wait(ioTimeoutError(storage->init(), SERVER_KNOBS->TLOG_MAX_CREATE_DURATION));
storage->set(persistFormat);
storage->set(
KeyValueRef(BinaryWriter::toValue(logData->logId, Unversioned()).withPrefix(persistCurrentVersionKeys.begin),
@ -2295,7 +2294,6 @@ ACTOR Future<Void> restorePersistentState(TLogData* self,
TraceEvent("TLogRestorePersistentState", self->dbgid).log();
state IKeyValueStore* storage = self->persistentData;
wait(storage->init());
state Future<Optional<Value>> fFormat = storage->readValue(persistFormat.key);
state Future<RangeResult> fVers = storage->readRange(persistCurrentVersionKeys);
state Future<RangeResult> fKnownCommitted = storage->readRange(persistKnownCommittedVersionKeys);
@ -2803,6 +2801,8 @@ ACTOR Future<Void> tLog(IKeyValueStore* persistentData,
TraceEvent("SharedTlog", tlogId).detail("Version", "6.0");
try {
wait(ioTimeoutError(persistentData->init(), SERVER_KNOBS->TLOG_MAX_CREATE_DURATION));
if (restoreFromDisk) {
wait(restorePersistentState(&self, locality, oldLog, recovered, tlogRequests));
} else {

View File

@ -2117,7 +2117,6 @@ ACTOR Future<Void> initPersistentState(TLogData* self, Reference<LogData> logDat
// PERSIST: Initial setup of persistentData for a brand new tLog for a new database
state IKeyValueStore* storage = self->persistentData;
wait(storage->init());
storage->set(persistFormat);
storage->set(
KeyValueRef(BinaryWriter::toValue(logData->logId, Unversioned()).withPrefix(persistCurrentVersionKeys.begin),
@ -2759,7 +2758,6 @@ ACTOR Future<Void> restorePersistentState(TLogData* self,
TraceEvent("TLogRestorePersistentState", self->dbgid).log();
state IKeyValueStore* storage = self->persistentData;
wait(storage->init());
state Future<Optional<Value>> fFormat = storage->readValue(persistFormat.key);
state Future<Optional<Value>> fRecoveryLocation = storage->readValue(persistRecoveryLocationKey);
state Future<RangeResult> fVers = storage->readRange(persistCurrentVersionKeys);
@ -3293,6 +3291,8 @@ ACTOR Future<Void> tLog(IKeyValueStore* persistentData,
TraceEvent("SharedTlog", tlogId).detail("Version", "6.2");
try {
wait(ioTimeoutError(persistentData->init(), SERVER_KNOBS->TLOG_MAX_CREATE_DURATION));
if (restoreFromDisk) {
wait(restorePersistentState(&self, locality, oldLog, recovered, tlogRequests));
} else {

View File

@ -2996,7 +2996,6 @@ ACTOR Future<Void> initPersistentStorage(TLogData* self) {
// PERSIST: Initial setup of persistentData for a brand new tLog for a new database
state IKeyValueStore* storage = self->persistentData;
wait(storage->init());
storage->set(persistFormat);
wait(storage->commit());
@ -3019,7 +3018,6 @@ ACTOR Future<Void> restorePersistentState(TLogData* self,
TraceEvent("TLogRestorePersistentState", self->dbgid).log();
state IKeyValueStore* storage = self->persistentData;
wait(storage->init());
state Future<Optional<Value>> fFormat = storage->readValue(persistFormat.key);
state Future<Optional<Value>> fRecoveryLocation = storage->readValue(persistRecoveryLocationKey);
state Future<Optional<Value>> fClusterId = storage->readValue(persistClusterIdKey);
@ -3587,6 +3585,8 @@ ACTOR Future<Void> tLog(IKeyValueStore* persistentData,
TraceEvent("SharedTlog", tlogId);
try {
try {
wait(ioTimeoutError(persistentData->init(), SERVER_KNOBS->TLOG_MAX_CREATE_DURATION));
if (restoreFromDisk) {
wait(restorePersistentState(&self, locality, oldLog, recovered, tlogRequests));
} else {

View File

@ -65,8 +65,8 @@ static double g_debugStart = 0;
static FILE* g_debugStream = stdout;
#define debug_printf_always(...) \
if (now() >= g_debugStart && \
(!g_network->getLocalAddress().isValid() || g_network->getLocalAddress() == g_debugAddress)) { \
if (now() >= g_debugStart && (!g_network->getLocalAddress().isValid() || !g_debugAddress.isValid() || \
g_network->getLocalAddress() == g_debugAddress)) { \
std::string prefix = format("%s %f %04d ", g_network->getLocalAddress().toString().c_str(), now(), __LINE__); \
std::string msg = format(__VA_ARGS__); \
fputs(addPrefix(prefix, msg).c_str(), g_debugStream); \
@ -5199,7 +5199,11 @@ public:
m_latestCommit.cancel();
}
Future<Void> commit(Version v) { return commit_impl(this, v); }
Future<Void> commit(Version v) {
// Replace latest commit with a new one which waits on the old one
m_latestCommit = commit_impl(this, v, m_latestCommit);
return m_latestCommit;
}
// Clear all btree data, allow pager remap to fully process its queue, and verify final
// page counts in pager and queues.
@ -5474,6 +5478,7 @@ private:
struct CommitBatch {
Version readVersion;
Version writeVersion;
Version newOldestVersion;
std::unique_ptr<MutationBuffer> mutations;
int64_t mutationCount;
Reference<IPagerSnapshot> snapshot;
@ -7431,7 +7436,7 @@ private:
}
}
ACTOR static Future<Void> commit_impl(VersionedBTree* self, Version writeVersion) {
ACTOR static Future<Void> commit_impl(VersionedBTree* self, Version writeVersion, Future<Void> previousCommit) {
// Take ownership of the current mutation buffer and make a new one
state CommitBatch batch;
batch.mutations = std::move(self->m_pBuffer);
@ -7440,11 +7445,7 @@ private:
self->m_mutationCount = 0;
batch.writeVersion = writeVersion;
// Replace the lastCommit future with a new one and then wait on the old one
state Promise<Void> committed;
Future<Void> previousCommit = self->m_latestCommit;
self->m_latestCommit = committed.getFuture();
batch.newOldestVersion = self->m_newOldestVersion;
// Wait for the latest commit to be finished.
wait(previousCommit);
@ -7454,20 +7455,19 @@ private:
if (writeVersion == self->m_pager->getLastCommittedVersion()) {
ASSERT(batch.mutationCount == 0);
debug_printf("%s: Empty commit at repeat version %" PRId64 "\n", self->m_name.c_str(), batch.writeVersion);
committed.send(Void());
return Void();
}
// For this commit, use the latest snapshot that was just committed.
batch.readVersion = self->m_pager->getLastCommittedVersion();
self->m_pager->setOldestReadableVersion(self->m_newOldestVersion);
self->m_pager->setOldestReadableVersion(batch.newOldestVersion);
debug_printf("%s: Beginning commit of version %" PRId64 ", read version %" PRId64
", new oldest version set to %" PRId64 "\n",
self->m_name.c_str(),
batch.writeVersion,
batch.readVersion,
self->m_newOldestVersion);
batch.newOldestVersion);
batch.snapshot = self->m_pager->getReadSnapshot(batch.readVersion);
@ -7525,7 +7525,6 @@ private:
++g_redwoodMetrics.metric.opCommit;
self->m_lazyClearActor = incrementalLazyClear(self);
committed.send(Void());
return Void();
}