DiskQueue should only pop based off of persisted popped tag versions

This commit is to fix a bug where popping a tag between
updatePersistentData and popDiskQueue can cause the TLog to recover to
an incorrect understanding of what data it has available.

The following series of events need to happen to trigger this bug:

    Tag 1:1 is popped to version 10
    updatePersistentData is run...
      updatePersistentPopped runs and we persistentData stores 1:1 as popped to 10
      A mutation is spilled for 1:1 at version 11 at location 1000
      A mutation is spilled for 1:1 at version 21 at location 5000
    updatePersistentData finishes and commits the btree changes
    Tag 1:1 is popped to version 20
    popDiskQueue runs
      The btree is read for spilled mutations with version >=20
      The minimum location required for the disk queue is found to be location 5000
      The disk queue is popped to location 5000

    The TLog crashes

    The worker restarts, and reloads the TLog files from disk
    restorePersistentPopped restores tag 1:1 as having been popped to version 10
    Parallel peeks are received for tag 1:1 starting at version 0
      The first peek is less than the popped version, so we respond with no data, and an end version of 10
      The second peek starts at version 10, which is greater than the popped version
      The btree is read for spilled mutations, and we find that there is a mutation at version 11 at location 1000
      Location 1000 is read in the DiskQueue

The resulting page read at Location 1000 was popped pre-crash, and thus
might either (a) be corrupt or (b) have an incorrect sequence number.

The fix to this is to force popDiskQueue/updatePoppedLocation to use the
popped version that was persisted to disk, and not the most recently
popped version for the given tag.

This bug doesn't manifest in simulation, because we don't have any code
that peeks at a lower version than what has been popped.
This commit is contained in:
Alex Miller 2019-12-12 17:16:22 -08:00
parent 05ac4a38f8
commit b36062a509
2 changed files with 26 additions and 14 deletions

View File

@ -366,19 +366,21 @@ struct LogData : NonCopyable, public ReferenceCounted<LogData> {
bool nothingPersistent; // true means tag is *known* to have no messages in persistentData. false means nothing.
bool poppedRecently; // `popped` has changed since last updatePersistentData
Version popped; // see popped version tracking contract below
Version persistentPopped; // The popped version recorded in the btree.
bool requiresPoppedLocationUpdate; // `popped` has changed since last updatePoppedLocation
IDiskQueue::location poppedLocation; // The location of the earliest commit with data for this tag.
bool unpoppedRecovered;
Tag tag;
TagData( Tag tag, Version popped, IDiskQueue::location poppedLocation, bool nothingPersistent, bool poppedRecently, bool unpoppedRecovered ) : tag(tag), nothingPersistent(nothingPersistent), poppedRecently(poppedRecently), popped(popped), requiresPoppedLocationUpdate(false), poppedLocation(poppedLocation), unpoppedRecovered(unpoppedRecovered) {}
TagData( Tag tag, Version popped, IDiskQueue::location poppedLocation, bool nothingPersistent, bool poppedRecently, bool unpoppedRecovered ) : tag(tag), nothingPersistent(nothingPersistent), poppedRecently(poppedRecently), popped(popped), persistentPopped(0), requiresPoppedLocationUpdate(false), poppedLocation(poppedLocation), unpoppedRecovered(unpoppedRecovered) {}
TagData(TagData&& r) BOOST_NOEXCEPT : versionMessages(std::move(r.versionMessages)), nothingPersistent(r.nothingPersistent), poppedRecently(r.poppedRecently), popped(r.popped), requiresPoppedLocationUpdate(r.requiresPoppedLocationUpdate), poppedLocation(r.poppedLocation), tag(r.tag), unpoppedRecovered(r.unpoppedRecovered) {}
TagData(TagData&& r) BOOST_NOEXCEPT : versionMessages(std::move(r.versionMessages)), nothingPersistent(r.nothingPersistent), poppedRecently(r.poppedRecently), popped(r.popped), persistentPopped(r.persistentPopped), requiresPoppedLocationUpdate(r.requiresPoppedLocationUpdate), poppedLocation(r.poppedLocation), tag(r.tag), unpoppedRecovered(r.unpoppedRecovered) {}
void operator= (TagData&& r) BOOST_NOEXCEPT {
versionMessages = std::move(r.versionMessages);
nothingPersistent = r.nothingPersistent;
poppedRecently = r.poppedRecently;
popped = r.popped;
persistentPopped = r.persistentPopped;
requiresPoppedLocationUpdate = r.requiresPoppedLocationUpdate;
poppedLocation = r.poppedLocation;
tag = r.tag;
@ -663,6 +665,7 @@ void updatePersistentPopped( TLogData* self, Reference<LogData> logData, Referen
if (!data->poppedRecently) return;
self->persistentData->set(KeyValueRef( persistTagPoppedKey(logData->logId, data->tag), persistTagPoppedValue(data->popped) ));
data->poppedRecently = false;
data->persistentPopped = data->popped;
if (data->nothingPersistent) return;
@ -690,11 +693,13 @@ ACTOR Future<Void> updatePoppedLocation( TLogData* self, Reference<LogData> logD
if (!data->requiresPoppedLocationUpdate) return Void();
data->requiresPoppedLocationUpdate = false;
if (data->popped <= logData->persistentDataVersion) {
// Use persistentPopped and not popped, so that a pop update received after spilling doesn't cause
// us to remove data that still is pointed to by SpilledData in the btree.
if (data->persistentPopped <= logData->persistentDataVersion) {
// Recover the next needed location in the Disk Queue from the index.
Standalone<VectorRef<KeyValueRef>> kvrefs = wait(
self->persistentData->readRange(KeyRangeRef(
persistTagMessageRefsKey(logData->logId, data->tag, data->popped),
persistTagMessageRefsKey(logData->logId, data->tag, data->persistentPopped),
persistTagMessageRefsKey(logData->logId, data->tag, logData->persistentDataVersion + 1)), 1));
if (kvrefs.empty()) {
@ -706,7 +711,7 @@ ACTOR Future<Void> updatePoppedLocation( TLogData* self, Reference<LogData> logD
r >> spilledData;
for (const SpilledData& sd : spilledData) {
if (sd.version >= data->popped) {
if (sd.version >= data->persistentPopped) {
data->poppedLocation = sd.start;
break;
}
@ -714,9 +719,9 @@ ACTOR Future<Void> updatePoppedLocation( TLogData* self, Reference<LogData> logD
}
}
if (data->popped >= logData->persistentDataVersion || data->nothingPersistent) {
if (data->persistentPopped >= logData->persistentDataVersion || data->nothingPersistent) {
// Then the location must be in memory.
auto locationIter = logData->versionLocation.lower_bound(data->popped);
auto locationIter = logData->versionLocation.lower_bound(data->persistentPopped);
if (locationIter != logData->versionLocation.end()) {
data->poppedLocation = locationIter->value.first;
} else {
@ -2462,6 +2467,7 @@ ACTOR Future<Void> restorePersistentState( TLogData* self, LocalityData locality
auto tagData = logData->getTagData(tag);
ASSERT( !tagData );
logData->createTagData(tag, popped, false, false, false);
logData->getTagData(tag)->persistentPopped = popped;
}
}
}

View File

@ -365,19 +365,21 @@ struct LogData : NonCopyable, public ReferenceCounted<LogData> {
bool nothingPersistent; // true means tag is *known* to have no messages in persistentData. false means nothing.
bool poppedRecently; // `popped` has changed since last updatePersistentData
Version popped; // see popped version tracking contract below
Version persistentPopped; // The popped version recorded in the btree.
bool requiresPoppedLocationUpdate; // `popped` has changed since last updatePoppedLocation
IDiskQueue::location poppedLocation; // The location of the earliest commit with data for this tag.
bool unpoppedRecovered;
Tag tag;
TagData( Tag tag, Version popped, IDiskQueue::location poppedLocation, bool nothingPersistent, bool poppedRecently, bool unpoppedRecovered ) : tag(tag), nothingPersistent(nothingPersistent), poppedRecently(poppedRecently), popped(popped), requiresPoppedLocationUpdate(false), poppedLocation(poppedLocation), unpoppedRecovered(unpoppedRecovered) {}
TagData( Tag tag, Version popped, IDiskQueue::location poppedLocation, bool nothingPersistent, bool poppedRecently, bool unpoppedRecovered ) : tag(tag), nothingPersistent(nothingPersistent), poppedRecently(poppedRecently), popped(popped), persistentPopped(0), requiresPoppedLocationUpdate(false), poppedLocation(poppedLocation), unpoppedRecovered(unpoppedRecovered) {}
TagData(TagData&& r) BOOST_NOEXCEPT : versionMessages(std::move(r.versionMessages)), nothingPersistent(r.nothingPersistent), poppedRecently(r.poppedRecently), popped(r.popped), requiresPoppedLocationUpdate(r.requiresPoppedLocationUpdate), poppedLocation(r.poppedLocation), tag(r.tag), unpoppedRecovered(r.unpoppedRecovered) {}
TagData(TagData&& r) BOOST_NOEXCEPT : versionMessages(std::move(r.versionMessages)), nothingPersistent(r.nothingPersistent), poppedRecently(r.poppedRecently), popped(r.popped), persistentPopped(r.persistentPopped), requiresPoppedLocationUpdate(r.requiresPoppedLocationUpdate), poppedLocation(r.poppedLocation), tag(r.tag), unpoppedRecovered(r.unpoppedRecovered) {}
void operator= (TagData&& r) BOOST_NOEXCEPT {
versionMessages = std::move(r.versionMessages);
nothingPersistent = r.nothingPersistent;
poppedRecently = r.poppedRecently;
popped = r.popped;
persistentPopped = r.persistentPopped;
requiresPoppedLocationUpdate = r.requiresPoppedLocationUpdate;
poppedLocation = r.poppedLocation;
tag = r.tag;
@ -680,6 +682,7 @@ void updatePersistentPopped( TLogData* self, Reference<LogData> logData, Referen
if (!data->poppedRecently) return;
self->persistentData->set(KeyValueRef( persistTagPoppedKey(logData->logId, data->tag), persistTagPoppedValue(data->popped) ));
data->poppedRecently = false;
data->persistentPopped = data->popped;
if (data->nothingPersistent) return;
@ -707,11 +710,13 @@ ACTOR Future<Void> updatePoppedLocation( TLogData* self, Reference<LogData> logD
if (!data->requiresPoppedLocationUpdate) return Void();
data->requiresPoppedLocationUpdate = false;
if (data->popped <= logData->persistentDataVersion) {
// Use persistentPopped and not popped, so that a pop update received after spilling doesn't cause
// us to remove data that still is pointed to by SpilledData in the btree.
if (data->persistentPopped <= logData->persistentDataVersion) {
// Recover the next needed location in the Disk Queue from the index.
Standalone<VectorRef<KeyValueRef>> kvrefs = wait(
self->persistentData->readRange(KeyRangeRef(
persistTagMessageRefsKey(logData->logId, data->tag, data->popped),
persistTagMessageRefsKey(logData->logId, data->tag, data->persistentPopped),
persistTagMessageRefsKey(logData->logId, data->tag, logData->persistentDataVersion + 1)), 1));
if (kvrefs.empty()) {
@ -723,7 +728,7 @@ ACTOR Future<Void> updatePoppedLocation( TLogData* self, Reference<LogData> logD
r >> spilledData;
for (const SpilledData& sd : spilledData) {
if (sd.version >= data->popped) {
if (sd.version >= data->persistentPopped) {
data->poppedLocation = sd.start;
break;
}
@ -731,9 +736,9 @@ ACTOR Future<Void> updatePoppedLocation( TLogData* self, Reference<LogData> logD
}
}
if (data->popped >= logData->persistentDataVersion || data->nothingPersistent) {
if (data->persistentPopped >= logData->persistentDataVersion || data->nothingPersistent) {
// Then the location must be in memory.
auto locationIter = logData->versionLocation.lower_bound(data->popped);
auto locationIter = logData->versionLocation.lower_bound(data->persistentPopped);
if (locationIter != logData->versionLocation.end()) {
data->poppedLocation = locationIter->value.first;
} else {
@ -2484,6 +2489,7 @@ ACTOR Future<Void> restorePersistentState( TLogData* self, LocalityData locality
auto tagData = logData->getTagData(tag);
ASSERT( !tagData );
logData->createTagData(tag, popped, false, false, false);
logData->getTagData(tag)->persistentPopped = popped;
}
}
}