Merge pull request #2481 from alexmiller-apple/dq-pop-fixes
Fix an issue where a peek starting from version 1 could crash a TLog
This commit is contained in:
commit
db953cc275
|
@ -366,20 +366,22 @@ struct LogData : NonCopyable, public ReferenceCounted<LogData> {
|
|||
bool nothingPersistent; // true means tag is *known* to have no messages in persistentData. false means nothing.
|
||||
bool poppedRecently; // `popped` has changed since last updatePersistentData
|
||||
Version popped; // see popped version tracking contract below
|
||||
bool requiresPoppedLocationUpdate; // `popped` has changed since last updatePoppedLocation
|
||||
Version persistentPopped; // The popped version recorded in the btree.
|
||||
Version versionForPoppedLocation; // `poppedLocation` was calculated at this popped version
|
||||
IDiskQueue::location poppedLocation; // The location of the earliest commit with data for this tag.
|
||||
bool unpoppedRecovered;
|
||||
Tag tag;
|
||||
|
||||
TagData( Tag tag, Version popped, IDiskQueue::location poppedLocation, bool nothingPersistent, bool poppedRecently, bool unpoppedRecovered ) : tag(tag), nothingPersistent(nothingPersistent), poppedRecently(poppedRecently), popped(popped), requiresPoppedLocationUpdate(false), poppedLocation(poppedLocation), unpoppedRecovered(unpoppedRecovered) {}
|
||||
TagData( Tag tag, Version popped, IDiskQueue::location poppedLocation, bool nothingPersistent, bool poppedRecently, bool unpoppedRecovered ) : tag(tag), nothingPersistent(nothingPersistent), poppedRecently(poppedRecently), popped(popped), persistentPopped(0), versionForPoppedLocation(0), poppedLocation(poppedLocation), unpoppedRecovered(unpoppedRecovered) {}
|
||||
|
||||
TagData(TagData&& r) BOOST_NOEXCEPT : versionMessages(std::move(r.versionMessages)), nothingPersistent(r.nothingPersistent), poppedRecently(r.poppedRecently), popped(r.popped), requiresPoppedLocationUpdate(r.requiresPoppedLocationUpdate), poppedLocation(r.poppedLocation), tag(r.tag), unpoppedRecovered(r.unpoppedRecovered) {}
|
||||
TagData(TagData&& r) BOOST_NOEXCEPT : versionMessages(std::move(r.versionMessages)), nothingPersistent(r.nothingPersistent), poppedRecently(r.poppedRecently), popped(r.popped), persistentPopped(r.persistentPopped), versionForPoppedLocation(r.versionForPoppedLocation), poppedLocation(r.poppedLocation), tag(r.tag), unpoppedRecovered(r.unpoppedRecovered) {}
|
||||
void operator= (TagData&& r) BOOST_NOEXCEPT {
|
||||
versionMessages = std::move(r.versionMessages);
|
||||
nothingPersistent = r.nothingPersistent;
|
||||
poppedRecently = r.poppedRecently;
|
||||
popped = r.popped;
|
||||
requiresPoppedLocationUpdate = r.requiresPoppedLocationUpdate;
|
||||
persistentPopped = r.persistentPopped;
|
||||
versionForPoppedLocation = r.versionForPoppedLocation;
|
||||
poppedLocation = r.poppedLocation;
|
||||
tag = r.tag;
|
||||
unpoppedRecovered = r.unpoppedRecovered;
|
||||
|
@ -663,6 +665,7 @@ void updatePersistentPopped( TLogData* self, Reference<LogData> logData, Referen
|
|||
if (!data->poppedRecently) return;
|
||||
self->persistentData->set(KeyValueRef( persistTagPoppedKey(logData->logId, data->tag), persistTagPoppedValue(data->popped) ));
|
||||
data->poppedRecently = false;
|
||||
data->persistentPopped = data->popped;
|
||||
|
||||
if (data->nothingPersistent) return;
|
||||
|
||||
|
@ -687,14 +690,16 @@ ACTOR Future<Void> updatePoppedLocation( TLogData* self, Reference<LogData> logD
|
|||
return Void();
|
||||
}
|
||||
|
||||
if (!data->requiresPoppedLocationUpdate) return Void();
|
||||
data->requiresPoppedLocationUpdate = false;
|
||||
if (data->versionForPoppedLocation >= data->persistentPopped) return Void();
|
||||
data->versionForPoppedLocation = data->persistentPopped;
|
||||
|
||||
if (data->popped <= logData->persistentDataVersion) {
|
||||
// Use persistentPopped and not popped, so that a pop update received after spilling doesn't cause
|
||||
// us to remove data that still is pointed to by SpilledData in the btree.
|
||||
if (data->persistentPopped <= logData->persistentDataVersion) {
|
||||
// Recover the next needed location in the Disk Queue from the index.
|
||||
Standalone<VectorRef<KeyValueRef>> kvrefs = wait(
|
||||
self->persistentData->readRange(KeyRangeRef(
|
||||
persistTagMessageRefsKey(logData->logId, data->tag, data->popped),
|
||||
persistTagMessageRefsKey(logData->logId, data->tag, data->persistentPopped),
|
||||
persistTagMessageRefsKey(logData->logId, data->tag, logData->persistentDataVersion + 1)), 1));
|
||||
|
||||
if (kvrefs.empty()) {
|
||||
|
@ -706,19 +711,21 @@ ACTOR Future<Void> updatePoppedLocation( TLogData* self, Reference<LogData> logD
|
|||
r >> spilledData;
|
||||
|
||||
for (const SpilledData& sd : spilledData) {
|
||||
if (sd.version >= data->popped) {
|
||||
if (sd.version >= data->persistentPopped) {
|
||||
data->poppedLocation = sd.start;
|
||||
data->versionForPoppedLocation = sd.version;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (data->popped >= logData->persistentDataVersion || data->nothingPersistent) {
|
||||
if (data->persistentPopped >= logData->persistentDataVersion || data->nothingPersistent) {
|
||||
// Then the location must be in memory.
|
||||
auto locationIter = logData->versionLocation.lower_bound(data->popped);
|
||||
auto locationIter = logData->versionLocation.lower_bound(data->persistentPopped);
|
||||
if (locationIter != logData->versionLocation.end()) {
|
||||
data->poppedLocation = locationIter->value.first;
|
||||
data->versionForPoppedLocation = locationIter->key;
|
||||
} else {
|
||||
// No data on disk and no data in RAM.
|
||||
// This TLog instance will be removed soon anyway, so we temporarily freeze our poppedLocation
|
||||
|
@ -1240,7 +1247,6 @@ ACTOR Future<Void> tLogPopCore( TLogData* self, Tag inputTag, Version to, Refere
|
|||
} else if (upTo > tagData->popped) {
|
||||
tagData->popped = upTo;
|
||||
tagData->poppedRecently = true;
|
||||
tagData->requiresPoppedLocationUpdate = true;
|
||||
|
||||
if(tagData->unpoppedRecovered && upTo > logData->recoveredAt) {
|
||||
tagData->unpoppedRecovered = false;
|
||||
|
@ -2462,6 +2468,7 @@ ACTOR Future<Void> restorePersistentState( TLogData* self, LocalityData locality
|
|||
auto tagData = logData->getTagData(tag);
|
||||
ASSERT( !tagData );
|
||||
logData->createTagData(tag, popped, false, false, false);
|
||||
logData->getTagData(tag)->persistentPopped = popped;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -365,20 +365,22 @@ struct LogData : NonCopyable, public ReferenceCounted<LogData> {
|
|||
bool nothingPersistent; // true means tag is *known* to have no messages in persistentData. false means nothing.
|
||||
bool poppedRecently; // `popped` has changed since last updatePersistentData
|
||||
Version popped; // see popped version tracking contract below
|
||||
bool requiresPoppedLocationUpdate; // `popped` has changed since last updatePoppedLocation
|
||||
Version persistentPopped; // The popped version recorded in the btree.
|
||||
Version versionForPoppedLocation; // `poppedLocation` was calculated at this popped version
|
||||
IDiskQueue::location poppedLocation; // The location of the earliest commit with data for this tag.
|
||||
bool unpoppedRecovered;
|
||||
Tag tag;
|
||||
|
||||
TagData( Tag tag, Version popped, IDiskQueue::location poppedLocation, bool nothingPersistent, bool poppedRecently, bool unpoppedRecovered ) : tag(tag), nothingPersistent(nothingPersistent), poppedRecently(poppedRecently), popped(popped), requiresPoppedLocationUpdate(false), poppedLocation(poppedLocation), unpoppedRecovered(unpoppedRecovered) {}
|
||||
TagData( Tag tag, Version popped, IDiskQueue::location poppedLocation, bool nothingPersistent, bool poppedRecently, bool unpoppedRecovered ) : tag(tag), nothingPersistent(nothingPersistent), poppedRecently(poppedRecently), popped(popped), persistentPopped(0), versionForPoppedLocation(0), poppedLocation(poppedLocation), unpoppedRecovered(unpoppedRecovered) {}
|
||||
|
||||
TagData(TagData&& r) BOOST_NOEXCEPT : versionMessages(std::move(r.versionMessages)), nothingPersistent(r.nothingPersistent), poppedRecently(r.poppedRecently), popped(r.popped), requiresPoppedLocationUpdate(r.requiresPoppedLocationUpdate), poppedLocation(r.poppedLocation), tag(r.tag), unpoppedRecovered(r.unpoppedRecovered) {}
|
||||
TagData(TagData&& r) BOOST_NOEXCEPT : versionMessages(std::move(r.versionMessages)), nothingPersistent(r.nothingPersistent), poppedRecently(r.poppedRecently), popped(r.popped), persistentPopped(r.persistentPopped), versionForPoppedLocation(r.versionForPoppedLocation), poppedLocation(r.poppedLocation), tag(r.tag), unpoppedRecovered(r.unpoppedRecovered) {}
|
||||
void operator= (TagData&& r) BOOST_NOEXCEPT {
|
||||
versionMessages = std::move(r.versionMessages);
|
||||
nothingPersistent = r.nothingPersistent;
|
||||
poppedRecently = r.poppedRecently;
|
||||
popped = r.popped;
|
||||
requiresPoppedLocationUpdate = r.requiresPoppedLocationUpdate;
|
||||
persistentPopped = r.persistentPopped;
|
||||
versionForPoppedLocation = r.versionForPoppedLocation;
|
||||
poppedLocation = r.poppedLocation;
|
||||
tag = r.tag;
|
||||
unpoppedRecovered = r.unpoppedRecovered;
|
||||
|
@ -680,6 +682,7 @@ void updatePersistentPopped( TLogData* self, Reference<LogData> logData, Referen
|
|||
if (!data->poppedRecently) return;
|
||||
self->persistentData->set(KeyValueRef( persistTagPoppedKey(logData->logId, data->tag), persistTagPoppedValue(data->popped) ));
|
||||
data->poppedRecently = false;
|
||||
data->persistentPopped = data->popped;
|
||||
|
||||
if (data->nothingPersistent) return;
|
||||
|
||||
|
@ -704,14 +707,16 @@ ACTOR Future<Void> updatePoppedLocation( TLogData* self, Reference<LogData> logD
|
|||
return Void();
|
||||
}
|
||||
|
||||
if (!data->requiresPoppedLocationUpdate) return Void();
|
||||
data->requiresPoppedLocationUpdate = false;
|
||||
if (data->versionForPoppedLocation >= data->persistentPopped) return Void();
|
||||
data->versionForPoppedLocation = data->persistentPopped;
|
||||
|
||||
if (data->popped <= logData->persistentDataVersion) {
|
||||
// Use persistentPopped and not popped, so that a pop update received after spilling doesn't cause
|
||||
// us to remove data that still is pointed to by SpilledData in the btree.
|
||||
if (data->persistentPopped <= logData->persistentDataVersion) {
|
||||
// Recover the next needed location in the Disk Queue from the index.
|
||||
Standalone<VectorRef<KeyValueRef>> kvrefs = wait(
|
||||
self->persistentData->readRange(KeyRangeRef(
|
||||
persistTagMessageRefsKey(logData->logId, data->tag, data->popped),
|
||||
persistTagMessageRefsKey(logData->logId, data->tag, data->persistentPopped),
|
||||
persistTagMessageRefsKey(logData->logId, data->tag, logData->persistentDataVersion + 1)), 1));
|
||||
|
||||
if (kvrefs.empty()) {
|
||||
|
@ -723,19 +728,21 @@ ACTOR Future<Void> updatePoppedLocation( TLogData* self, Reference<LogData> logD
|
|||
r >> spilledData;
|
||||
|
||||
for (const SpilledData& sd : spilledData) {
|
||||
if (sd.version >= data->popped) {
|
||||
if (sd.version >= data->persistentPopped) {
|
||||
data->poppedLocation = sd.start;
|
||||
data->versionForPoppedLocation = sd.version;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (data->popped >= logData->persistentDataVersion || data->nothingPersistent) {
|
||||
if (data->persistentPopped >= logData->persistentDataVersion || data->nothingPersistent) {
|
||||
// Then the location must be in memory.
|
||||
auto locationIter = logData->versionLocation.lower_bound(data->popped);
|
||||
auto locationIter = logData->versionLocation.lower_bound(data->persistentPopped);
|
||||
if (locationIter != logData->versionLocation.end()) {
|
||||
data->poppedLocation = locationIter->value.first;
|
||||
data->versionForPoppedLocation = locationIter->key;
|
||||
} else {
|
||||
// No data on disk and no data in RAM.
|
||||
// This TLog instance will be removed soon anyway, so we temporarily freeze our poppedLocation
|
||||
|
@ -987,7 +994,6 @@ ACTOR Future<Void> tLogPopCore( TLogData* self, Tag inputTag, Version to, Refere
|
|||
} else if (upTo > tagData->popped) {
|
||||
tagData->popped = upTo;
|
||||
tagData->poppedRecently = true;
|
||||
tagData->requiresPoppedLocationUpdate = true;
|
||||
|
||||
if(tagData->unpoppedRecovered && upTo > logData->recoveredAt) {
|
||||
tagData->unpoppedRecovered = false;
|
||||
|
@ -2484,6 +2490,7 @@ ACTOR Future<Void> restorePersistentState( TLogData* self, LocalityData locality
|
|||
auto tagData = logData->getTagData(tag);
|
||||
ASSERT( !tagData );
|
||||
logData->createTagData(tag, popped, false, false, false);
|
||||
logData->getTagData(tag)->persistentPopped = popped;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue