foundationdb/fdbserver/LogSystemDiskQueueAdapter.a...

255 lines
9.6 KiB
C++

/*
* LogSystemDiskQueueAdapter.actor.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "fdbserver/IDiskQueue.h"
#include "fdbserver/LogSystem.h"
#include "fdbserver/LogSystemDiskQueueAdapter.h"
#include "fdbserver/Knobs.h"
#include "flow/actorcompiler.h" // has to be last include
// Holds the actor implementation behind LogSystemDiskQueueAdapter::readNext().
class LogSystemDiskQueueAdapterImpl {
public:
	// Reads recovered data from the log system's txs cursor and returns at most `bytes` bytes of it.
	//
	// Messages are pulled from self->cursor into self->recoveryQueue until at least `bytes` bytes are
	// buffered, or until self->recoveryLoc reaches self->logSystem->getEnd() (in which case the reference
	// to the log system is dropped and whatever has been buffered is returned). An empty result is
	// returned when nothing is buffered.
	//
	// While waiting for the cursor, the cursor is re-created when the peek locality changes or when a
	// knob-controlled delay elapses; each timeout advances self->peekTypeSwitches, and its value modulo 3
	// selects the locality used for the new cursor (1: invalid locality, 2: secondary, 0: primary).
	//
	// Throws disk_adapter_reset() after discarding all buffered data when the cursor reports a non-zero
	// popped() version (or when BUGGIFY injects a reset).
	//
	// self->peekLocality is treated as optional throughout: whenever it is unset, the locality arguments
	// fall back to tagLocalityInvalid / invalidVersion and localityChanged is set to Never() rather than
	// dereferencing the null reference.
	ACTOR static Future<Standalone<StringRef>> readNext(LogSystemDiskQueueAdapter* self, int bytes) {
		while (self->recoveryQueueDataSize < bytes) {
			if (self->recoveryLoc == self->logSystem->getEnd()) {
				// Recovery will be complete once the current recoveryQueue is consumed, so we no longer need
				// self->logSystem
				TraceEvent("PeekNextEnd")
				    .detail("Queue", self->recoveryQueue.size())
				    .detail("Bytes", bytes)
				    .detail("Loc", self->recoveryLoc)
				    .detail("End", self->logSystem->getEnd());
				self->logSystem.clear();
				break;
			}

			if (!self->cursor->hasMessage()) {
				// Wait for more data, replacing the cursor if the locality changes or the wait times out.
				loop {
					choose {
						when(wait(self->cursor->getMore())) { break; }
						when(wait(self->localityChanged)) {
							// Locality information changed: peek again using the primary locality.
							self->cursor = self->logSystem->peekTxs(
							    UID(),
							    self->recoveryLoc,
							    self->peekLocality ? self->peekLocality->get().primaryLocality : tagLocalityInvalid,
							    self->peekLocality ? self->peekLocality->get().knownCommittedVersion : invalidVersion,
							    self->totalRecoveredBytes == 0);
							self->localityChanged = self->peekLocality ? self->peekLocality->onChange() : Never();
						}
						when(wait(delay(self->peekTypeSwitches == 0
						                    ? SERVER_KNOBS->DISK_QUEUE_ADAPTER_MIN_SWITCH_TIME
						                    : SERVER_KNOBS->DISK_QUEUE_ADAPTER_MAX_SWITCH_TIME))) {
							// Timed out: rotate to the next peek type. The first switch uses the MIN knob,
							// later ones the MAX knob.
							self->peekTypeSwitches++;
							if (self->peekTypeSwitches % 3 == 1) {
								self->cursor = self->logSystem->peekTxs(UID(),
								                                        self->recoveryLoc,
								                                        tagLocalityInvalid,
								                                        invalidVersion,
								                                        self->totalRecoveredBytes == 0);
								self->localityChanged = Never();
							} else if (self->peekTypeSwitches % 3 == 2) {
								self->cursor = self->logSystem->peekTxs(
								    UID(),
								    self->recoveryLoc,
								    self->peekLocality ? self->peekLocality->get().secondaryLocality
								                       : tagLocalityInvalid,
								    self->peekLocality ? self->peekLocality->get().knownCommittedVersion
								                       : invalidVersion,
								    self->totalRecoveredBytes == 0);
								self->localityChanged = self->peekLocality ? self->peekLocality->onChange() : Never();
							} else {
								self->cursor = self->logSystem->peekTxs(
								    UID(),
								    self->recoveryLoc,
								    self->peekLocality ? self->peekLocality->get().primaryLocality : tagLocalityInvalid,
								    self->peekLocality ? self->peekLocality->get().knownCommittedVersion
								                       : invalidVersion,
								    self->totalRecoveredBytes == 0);
								self->localityChanged = self->peekLocality ? self->peekLocality->onChange() : Never();
							}
						}
					}
				}

				TraceEvent("PeekNextGetMore")
				    .detail("Total", self->totalRecoveredBytes)
				    .detail("Queue", self->recoveryQueue.size())
				    .detail("Bytes", bytes)
				    .detail("Loc", self->recoveryLoc)
				    .detail("End", self->logSystem->getEnd())
				    .detail("HasMessage", self->cursor->hasMessage())
				    .detail("Version", self->cursor->version().version);

				// Inject a reset at most once per adapter (only while no data has been discarded yet).
				bool buggify = !self->hasDiscardedData && BUGGIFY_WITH_PROB(0.01);
				if (self->cursor->popped() != 0 || buggify) {
					TraceEvent(SevWarnAlways, "DiskQueueAdapterReset")
					    .detail("Version", self->cursor->popped())
					    .detail("PeekTypeSwitch", self->peekTypeSwitches % 3);
					CODE_PROBE(true, "disk adapter reset");

					// Restart from the popped version if there is one, otherwise from the start location.
					if (self->cursor->popped() != 0) {
						self->recoveryLoc = self->cursor->popped();
					} else {
						self->recoveryLoc = self->startLoc;
					}

					// Drop everything buffered so far.
					self->recoveryQueue.clear();
					self->recoveryQueueDataSize = 0;
					self->recoveryQueueLoc = self->recoveryLoc;
					self->totalRecoveredBytes = 0;

					// Re-create the cursor with the peek type currently in effect.
					if (self->peekTypeSwitches % 3 == 1) {
						self->cursor = self->logSystem->peekTxs(
						    UID(), self->recoveryLoc, tagLocalityInvalid, invalidVersion, true);
						self->localityChanged = Never();
					} else if (self->peekTypeSwitches % 3 == 2) {
						self->cursor = self->logSystem->peekTxs(
						    UID(),
						    self->recoveryLoc,
						    self->peekLocality ? self->peekLocality->get().secondaryLocality : tagLocalityInvalid,
						    self->peekLocality ? self->peekLocality->get().knownCommittedVersion : invalidVersion,
						    true);
						self->localityChanged = self->peekLocality ? self->peekLocality->onChange() : Never();
					} else {
						self->cursor = self->logSystem->peekTxs(
						    UID(),
						    self->recoveryLoc,
						    self->peekLocality ? self->peekLocality->get().primaryLocality : tagLocalityInvalid,
						    self->peekLocality ? self->peekLocality->get().knownCommittedVersion : invalidVersion,
						    true);
						self->localityChanged = self->peekLocality ? self->peekLocality->onChange() : Never();
					}
					self->hasDiscardedData = true;
					throw disk_adapter_reset();
				}

				// With nothing buffered, the next read location is wherever recovery currently stands.
				if (self->recoveryQueueDataSize == 0) {
					self->recoveryQueueLoc = self->recoveryLoc;
				}
				if (!self->cursor->hasMessage()) {
					// Still no message: advance to the cursor's version and try again.
					self->recoveryLoc = self->cursor->version().version;
					wait(yield());
					continue;
				}
			}

			// Buffer the current message and account for its size.
			self->recoveryQueue.push_back(Standalone<StringRef>(self->cursor->getMessage(), self->cursor->arena()));
			self->recoveryQueueDataSize += self->recoveryQueue.back().size();
			self->totalRecoveredBytes += self->recoveryQueue.back().size();
			self->cursor->nextMessage();
			if (!self->cursor->hasMessage())
				self->recoveryLoc = self->cursor->version().version;
			//TraceEvent("PeekNextResults").detail("From", self->recoveryLoc).detail("Queue", self->recoveryQueue.size()).detail("Bytes", bytes).detail("Has", self->cursor->hasMessage()).detail("End", self->logSystem->getEnd());
		}

		// Collapse the buffered pieces into a single contiguous entry.
		if (self->recoveryQueue.size() > 1) {
			self->recoveryQueue[0] = concatenate(self->recoveryQueue.begin(), self->recoveryQueue.end());
			self->recoveryQueue.resize(1);
		}
		if (self->recoveryQueueDataSize == 0)
			return Standalone<StringRef>();
		ASSERT(self->recoveryQueue[0].size() == self->recoveryQueueDataSize);
		//TraceEvent("PeekNextReturn").detail("Bytes", bytes).detail("QueueSize", self->recoveryQueue.size());

		// Hand out the first `bytes` bytes (or fewer, if less is buffered) and keep the remainder queued.
		bytes = std::min(bytes, self->recoveryQueue[0].size());
		Standalone<StringRef> result(self->recoveryQueue[0].substr(0, bytes), self->recoveryQueue[0].arena());
		self->recoveryQueue[0].contents() = self->recoveryQueue[0].substr(bytes);
		self->recoveryQueueDataSize = self->recoveryQueue[0].size();
		if (self->recoveryQueue[0].size() == 0) {
			self->recoveryQueue.clear();
		}
		return result;
	}
};
// Returns up to `bytes` bytes of recovered data by delegating to the actor implementation.
// When recovery is not enabled for this adapter, an empty string is returned instead.
Future<Standalone<StringRef>> LogSystemDiskQueueAdapter::readNext(int bytes) {
	if (enableRecovery) {
		return LogSystemDiskQueueAdapterImpl::readNext(this, bytes);
	}
	return Standalone<StringRef>();
}
// Reports the location of the next read: the first component is always 0 and the second is the
// current recoveryQueueLoc.
IDiskQueue::location LogSystemDiskQueueAdapter::getNextReadLocation() const {
	IDiskQueue::location nextRead(0, recoveryQueueLoc);
	return nextRead;
}
// Appends `contents` to pushedData, splitting it across blocks whose capacity is reserved as
// SERVER_KNOBS->LOG_SYSTEM_PUSHED_DATA_BLOCK_SIZE. A new block is started whenever there is no block
// yet or the last block has no free capacity left.
// Returns the location (0, nextCommit).
IDiskQueue::location LogSystemDiskQueueAdapter::push(StringRef contents) {
	while (contents.size() != 0) {
		// Free space left in the most recent block, if any block exists.
		int freeBytes = 0;
		if (pushedData.size() != 0) {
			freeBytes = pushedData.back().capacity() - pushedData.back().size();
		}

		if (freeBytes == 0) {
			VectorRef<uint8_t> freshBlock;
			freshBlock.reserve(pushedData.arena(), SERVER_KNOBS->LOG_SYSTEM_PUSHED_DATA_BLOCK_SIZE);
			pushedData.push_back(pushedData.arena(), freshBlock);
			freeBytes = freshBlock.capacity();
		}

		// Copy as much as fits into the current block, then continue with the rest.
		const int chunk = std::min(freeBytes, contents.size());
		pushedData.back().append(pushedData.arena(), contents.begin(), chunk);
		contents = contents.substr(chunk);
	}
	return IDiskQueue::location(0, nextCommit);
}
// Records that data up to `upTo` may be popped. Only the `lo` component is used (`hi` must be 0),
// and poppedUpTo never moves backwards.
void LogSystemDiskQueueAdapter::pop(location upTo) {
	ASSERT(upTo.hi == 0);
	if (poppedUpTo < upTo.lo) {
		poppedUpTo = upTo.lo;
	}
}
// Hands everything pushed so far, together with the current pop position, to the oldest outstanding
// getCommitMessage() request, and resets pushedData to an empty buffer.
// Requires at least one outstanding request (asserted).
// Returns the future of the message's `acknowledge` promise.
Future<Void> LogSystemDiskQueueAdapter::commit() {
	ASSERT(!commitMessages.empty());

	// Take the oldest waiting request off the queue.
	Promise<CommitMessage> waiter = commitMessages.front();
	commitMessages.pop_front();

	CommitMessage outgoing;
	outgoing.messages = pushedData;
	pushedData = Standalone<VectorRef<VectorRef<uint8_t>>>();
	outgoing.popTo = poppedUpTo;

	waiter.send(outgoing);
	return outgoing.acknowledge.getFuture();
}
// Returns a future built directly from a Void value; this adapter does not wait on any underlying
// operation here.
Future<Void> LogSystemDiskQueueAdapter::getError() const {
	return Future<Void>(Void());
}
// Returns a future built directly from a Void value; this adapter does not wait on any underlying
// operation here.
Future<Void> LogSystemDiskQueueAdapter::onClosed() const {
	return Future<Void>(Void());
}
// Destroys this adapter with `delete this`. The object must not be touched after this call returns.
// Performs exactly the same action as close().
void LogSystemDiskQueueAdapter::dispose() {
delete this;
}
// Destroys this adapter with `delete this`. The object must not be touched after this call returns.
// Performs exactly the same action as dispose().
void LogSystemDiskQueueAdapter::close() {
delete this;
}
// Registers a request for the next commit message. The request is queued behind any earlier ones and
// is fulfilled by a later call to commit(), which sends a CommitMessage to the oldest queued request.
Future<LogSystemDiskQueueAdapter::CommitMessage> LogSystemDiskQueueAdapter::getCommitMessage() {
	Promise<CommitMessage> request;
	Future<CommitMessage> pending = request.getFuture();
	commitMessages.push_back(request);
	return pending;
}
// Allocates a LogSystemDiskQueueAdapter with `new` and returns the raw pointer. The three arguments
// are forwarded to the constructor unchanged, followed by a final argument of `true`
// (presumably the flag that enables recovery -- confirm against the constructor declaration).
// The adapter deletes itself in dispose()/close().
LogSystemDiskQueueAdapter* openDiskQueueAdapter(Reference<ILogSystem> logSystem,
                                                Reference<AsyncVar<PeekTxsInfo>> peekLocality,
                                                Version txsPoppedVersion) {
	LogSystemDiskQueueAdapter* adapter =
	    new LogSystemDiskQueueAdapter(logSystem, peekLocality, txsPoppedVersion, true);
	return adapter;
}