fix: the tlog would not pop data from the disk queue after a storage server was removed, because the tag still exists in memory on the logs
fix: we could incorrectly make data durable if eraseMessagesFromMemory was in progress while running updatePersistentData the quiet database check now ensure that tlogs have no more than 30 seconds of versions unpopped from the disk queue
This commit is contained in:
parent
b2a5d056d0
commit
4059d68348
|
@ -437,6 +437,7 @@ struct LogData : NonCopyable, public ReferenceCounted<LogData> {
|
|||
specialCounter(cc, "PersistentDataVersion", [this](){ return this->persistentDataVersion; });
|
||||
specialCounter(cc, "PersistentDataDurableVersion", [this](){ return this->persistentDataDurableVersion; });
|
||||
specialCounter(cc, "KnownCommittedVersion", [this](){ return this->knownCommittedVersion; });
|
||||
specialCounter(cc, "QueuePoppedVersion", [this](){ return this->persistentDataDurableVersion; });
|
||||
specialCounter(cc, "SharedBytesInput", [tLogData](){ return tLogData->bytesInput; });
|
||||
specialCounter(cc, "SharedBytesDurable", [tLogData](){ return tLogData->bytesDurable; });
|
||||
specialCounter(cc, "SharedOverheadBytesInput", [tLogData](){ return tLogData->overheadBytesInput; });
|
||||
|
|
|
@ -124,8 +124,18 @@ int64_t getQueueSize( const TraceEventFields& md ) {
|
|||
return inputBytes - durableBytes;
|
||||
}
|
||||
|
||||
//Computes the popped version lag for tlogs
|
||||
int64_t getPoppedVersionLag( const TraceEventFields& md ) {
|
||||
int64_t persistentDataDurableVersion, queuePoppedVersion;
|
||||
|
||||
sscanf(md.getValue("PersistentDataDurableVersion").c_str(), "%lld", &persistentDataDurableVersion);
|
||||
sscanf(md.getValue("QueuePoppedVersion").c_str(), "%lld", &queuePoppedVersion);
|
||||
|
||||
return persistentDataDurableVersion - queuePoppedVersion;
|
||||
}
|
||||
|
||||
// This is not robust in the face of a TLog failure
|
||||
ACTOR Future<int64_t> getMaxTLogQueueSize( Database cx, Reference<AsyncVar<ServerDBInfo>> dbInfo ) {
|
||||
ACTOR Future<std::pair<int64_t,int64_t>> getTLogQueueInfo( Database cx, Reference<AsyncVar<ServerDBInfo>> dbInfo ) {
|
||||
TraceEvent("MaxTLogQueueSize").detail("Stage", "ContactingLogs");
|
||||
|
||||
state std::vector<WorkerDetails> workers = wait(getWorkers(dbInfo));
|
||||
|
@ -150,17 +160,19 @@ ACTOR Future<int64_t> getMaxTLogQueueSize( Database cx, Reference<AsyncVar<Serve
|
|||
TraceEvent("MaxTLogQueueSize").detail("Stage", "ComputingMax").detail("MessageCount", messages.size());
|
||||
|
||||
state int64_t maxQueueSize = 0;
|
||||
state int64_t maxPoppedVersionLag = 0;
|
||||
state int i = 0;
|
||||
for(; i < messages.size(); i++) {
|
||||
try {
|
||||
maxQueueSize = std::max( maxQueueSize, getQueueSize( messages[i].get() ) );
|
||||
maxPoppedVersionLag = std::max( maxPoppedVersionLag, getPoppedVersionLag( messages[i].get() ) );
|
||||
} catch( Error &e ) {
|
||||
TraceEvent("QuietDatabaseFailure").detail("Reason", "Failed to extract MaxTLogQueue").detail("Tlog", tlogs[i].id());
|
||||
throw;
|
||||
}
|
||||
}
|
||||
|
||||
return maxQueueSize;
|
||||
return std::make_pair( maxQueueSize, maxPoppedVersionLag );
|
||||
}
|
||||
|
||||
ACTOR Future<vector<StorageServerInterface>> getStorageServers( Database cx, bool use_system_priority = false) {
|
||||
|
@ -397,7 +409,7 @@ ACTOR Future<Void> reconfigureAfter(Database cx, double time, Reference<AsyncVar
|
|||
}
|
||||
|
||||
ACTOR Future<Void> waitForQuietDatabase( Database cx, Reference<AsyncVar<ServerDBInfo>> dbInfo, std::string phase, int64_t dataInFlightGate = 2e6,
|
||||
int64_t maxTLogQueueGate = 5e6, int64_t maxStorageServerQueueGate = 5e6, int64_t maxDataDistributionQueueSize = 0 ) {
|
||||
int64_t maxTLogQueueGate = 5e6, int64_t maxStorageServerQueueGate = 5e6, int64_t maxDataDistributionQueueSize = 0, int64_t maxPoppedVersionLag = 30e6 ) {
|
||||
state Future<Void> reconfig = reconfigureAfter(cx, 100 + (g_random->random01()*100), dbInfo, "QuietDatabase");
|
||||
|
||||
TraceEvent(("QuietDatabase" + phase + "Begin").c_str());
|
||||
|
@ -417,26 +429,27 @@ ACTOR Future<Void> waitForQuietDatabase( Database cx, Reference<AsyncVar<ServerD
|
|||
TraceEvent("QuietDatabaseGotDataDistributor", distributorUID).detail("Locality", distributorWorker.locality.toString());
|
||||
|
||||
state Future<int64_t> dataInFlight = getDataInFlight( cx, distributorWorker);
|
||||
state Future<int64_t> tLogQueueSize = getMaxTLogQueueSize( cx, dbInfo );
|
||||
state Future<std::pair<int64_t,int64_t>> tLogQueueInfo = getTLogQueueInfo( cx, dbInfo );
|
||||
state Future<int64_t> dataDistributionQueueSize = getDataDistributionQueueSize( cx, distributorWorker, dataInFlightGate == 0);
|
||||
state Future<bool> teamCollectionValid = getTeamCollectionValid(cx, distributorWorker);
|
||||
state Future<int64_t> storageQueueSize = getMaxStorageServerQueueSize( cx, dbInfo );
|
||||
state Future<bool> dataDistributionActive = getDataDistributionActive( cx, distributorWorker );
|
||||
state Future<bool> storageServersRecruiting = getStorageServersRecruiting ( cx, distributorWorker, distributorUID );
|
||||
|
||||
wait(success(dataInFlight) && success(tLogQueueSize) && success(dataDistributionQueueSize) &&
|
||||
wait(success(dataInFlight) && success(tLogQueueInfo) && success(dataDistributionQueueSize) &&
|
||||
success(teamCollectionValid) && success(storageQueueSize) && success(dataDistributionActive) &&
|
||||
success(storageServersRecruiting));
|
||||
TraceEvent(("QuietDatabase" + phase).c_str())
|
||||
.detail("DataInFlight", dataInFlight.get())
|
||||
.detail("MaxTLogQueueSize", tLogQueueSize.get())
|
||||
.detail("MaxTLogQueueSize", tLogQueueInfo.get().first)
|
||||
.detail("MaxTLogPoppedVersionLag", tLogQueueInfo.get().second)
|
||||
.detail("DataDistributionQueueSize", dataDistributionQueueSize.get())
|
||||
.detail("TeamCollectionValid", teamCollectionValid.get())
|
||||
.detail("MaxStorageQueueSize", storageQueueSize.get())
|
||||
.detail("DataDistributionActive", dataDistributionActive.get())
|
||||
.detail("StorageServersRecruiting", storageServersRecruiting.get());
|
||||
|
||||
if (dataInFlight.get() > dataInFlightGate || tLogQueueSize.get() > maxTLogQueueGate ||
|
||||
if (dataInFlight.get() > dataInFlightGate || tLogQueueInfo.get().first > maxTLogQueueGate || tLogQueueInfo.get().second > maxPoppedVersionLag ||
|
||||
dataDistributionQueueSize.get() > maxDataDistributionQueueSize ||
|
||||
storageQueueSize.get() > maxStorageServerQueueGate || dataDistributionActive.get() == false ||
|
||||
storageServersRecruiting.get() == true || teamCollectionValid.get() == false) {
|
||||
|
@ -469,6 +482,6 @@ ACTOR Future<Void> waitForQuietDatabase( Database cx, Reference<AsyncVar<ServerD
|
|||
}
|
||||
|
||||
Future<Void> quietDatabase( Database const& cx, Reference<AsyncVar<ServerDBInfo>> const& dbInfo, std::string phase, int64_t dataInFlightGate,
|
||||
int64_t maxTLogQueueGate, int64_t maxStorageServerQueueGate, int64_t maxDataDistributionQueueSize ) {
|
||||
return waitForQuietDatabase(cx, dbInfo, phase, dataInFlightGate, maxTLogQueueGate, maxStorageServerQueueGate, maxDataDistributionQueueSize);
|
||||
int64_t maxTLogQueueGate, int64_t maxStorageServerQueueGate, int64_t maxDataDistributionQueueSize, int64_t maxPoppedVersionLag ) {
|
||||
return waitForQuietDatabase(cx, dbInfo, phase, dataInFlightGate, maxTLogQueueGate, maxStorageServerQueueGate, maxDataDistributionQueueSize, maxPoppedVersionLag);
|
||||
}
|
||||
|
|
|
@ -29,7 +29,7 @@
|
|||
#include "flow/actorcompiler.h"
|
||||
|
||||
Future<int64_t> getDataInFlight( Database const& cx, Reference<AsyncVar<struct ServerDBInfo>> const& );
|
||||
Future<int64_t> getMaxTLogQueueSize( Database const& cx, Reference<AsyncVar<struct ServerDBInfo>> const& );
|
||||
Future<std::pair<int64_t,int64_t>> getTLogQueueInfo( Database const& cx, Reference<AsyncVar<struct ServerDBInfo>> const& );
|
||||
Future<int64_t> getMaxStorageServerQueueSize( Database const& cx, Reference<AsyncVar<struct ServerDBInfo>> const& );
|
||||
Future<int64_t> getDataDistributionQueueSize( Database const &cx, Reference<AsyncVar<struct ServerDBInfo>> const&, bool const& reportInFlight );
|
||||
Future<bool> getTeamCollectionValid(Database const& cx, WorkerInterface const&);
|
||||
|
|
|
@ -427,6 +427,7 @@ struct LogData : NonCopyable, public ReferenceCounted<LogData> {
|
|||
NotifiedVersion version, queueCommittedVersion;
|
||||
Version queueCommittingVersion;
|
||||
Version knownCommittedVersion, durableKnownCommittedVersion, minKnownCommittedVersion;
|
||||
Version queuePoppedVersion;
|
||||
|
||||
Deque<std::pair<Version, Standalone<VectorRef<uint8_t>>>> messageBlocks;
|
||||
std::vector<std::vector<Reference<TagData>>> tag_data; //tag.locality | tag.id
|
||||
|
@ -481,7 +482,7 @@ struct LogData : NonCopyable, public ReferenceCounted<LogData> {
|
|||
|
||||
explicit LogData(TLogData* tLogData, TLogInterface interf, Tag remoteTag, bool isPrimary, int logRouterTags, UID recruitmentID, uint64_t protocolVersion, std::vector<Tag> tags) : tLogData(tLogData), knownCommittedVersion(0), logId(interf.id()),
|
||||
cc("TLog", interf.id().toString()), bytesInput("BytesInput", cc), bytesDurable("BytesDurable", cc), remoteTag(remoteTag), isPrimary(isPrimary), logRouterTags(logRouterTags), recruitmentID(recruitmentID), protocolVersion(protocolVersion),
|
||||
logSystem(new AsyncVar<Reference<ILogSystem>>()), logRouterPoppedVersion(0), durableKnownCommittedVersion(0), minKnownCommittedVersion(0), allTags(tags.begin(), tags.end()), terminated(tLogData->terminated.getFuture()),
|
||||
logSystem(new AsyncVar<Reference<ILogSystem>>()), logRouterPoppedVersion(0), durableKnownCommittedVersion(0), minKnownCommittedVersion(0), queuePoppedVersion(0), allTags(tags.begin(), tags.end()), terminated(tLogData->terminated.getFuture()),
|
||||
// These are initialized differently on init() or recovery
|
||||
recoveryCount(), stopped(false), initialized(false), queueCommittingVersion(0), newPersistentDataVersion(invalidVersion), unrecoveredBefore(1), recoveredAt(1), unpoppedRecoveredTags(0),
|
||||
logRouterPopToVersion(0), locality(tagLocalityInvalid)
|
||||
|
@ -498,6 +499,7 @@ struct LogData : NonCopyable, public ReferenceCounted<LogData> {
|
|||
specialCounter(cc, "PersistentDataVersion", [this](){ return this->persistentDataVersion; });
|
||||
specialCounter(cc, "PersistentDataDurableVersion", [this](){ return this->persistentDataDurableVersion; });
|
||||
specialCounter(cc, "KnownCommittedVersion", [this](){ return this->knownCommittedVersion; });
|
||||
specialCounter(cc, "QueuePoppedVersion", [this](){ return this->queuePoppedVersion; });
|
||||
specialCounter(cc, "SharedBytesInput", [tLogData](){ return tLogData->bytesInput; });
|
||||
specialCounter(cc, "SharedBytesDurable", [tLogData](){ return tLogData->bytesDurable; });
|
||||
specialCounter(cc, "SharedOverheadBytesInput", [tLogData](){ return tLogData->overheadBytesInput; });
|
||||
|
@ -708,12 +710,18 @@ ACTOR Future<Void> popDiskQueue( TLogData* self, Reference<LogData> logData ) {
|
|||
wait(waitForAll(updates));
|
||||
|
||||
auto lastItem = logData->versionLocation.lastItem();
|
||||
IDiskQueue::location minLocation = lastItem == logData->versionLocation.end() ? 0 : lastItem->value.second;
|
||||
IDiskQueue::location minLocation = 0;
|
||||
Version minVersion = 0;
|
||||
if(lastItem != logData->versionLocation.end()) {
|
||||
minLocation = lastItem->value.second;
|
||||
minVersion = lastItem->key;
|
||||
}
|
||||
for(int tagLocality = 0; tagLocality < logData->tag_data.size(); tagLocality++) {
|
||||
for(int tagId = 0; tagId < logData->tag_data[tagLocality].size(); tagId++) {
|
||||
Reference<LogData::TagData> tagData = logData->tag_data[tagLocality][tagId];
|
||||
if (tagData) {
|
||||
if (tagData && tagData->tag != txsTag && (tagData->versionMessages.size() > 0 || !tagData->nothingPersistent)) {
|
||||
minLocation = std::min(minLocation, tagData->poppedLocation);
|
||||
minVersion = std::min(minVersion, tagData->popped);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -726,6 +734,7 @@ ACTOR Future<Void> popDiskQueue( TLogData* self, Reference<LogData> logData ) {
|
|||
lastCommittedLocation = locationIter->value.first;
|
||||
}
|
||||
self->persistentQueue->pop( std::min(minLocation, lastCommittedLocation) );
|
||||
logData->queuePoppedVersion = std::max(logData->queuePoppedVersion, minVersion);
|
||||
}
|
||||
|
||||
return Void();
|
||||
|
@ -752,6 +761,7 @@ ACTOR Future<Void> updatePersistentData( TLogData* self, Reference<LogData> logD
|
|||
for(tagId = 0; tagId < logData->tag_data[tagLocality].size(); tagId++) {
|
||||
state Reference<LogData::TagData> tagData = logData->tag_data[tagLocality][tagId];
|
||||
if(tagData) {
|
||||
wait(tagData->eraseMessagesBefore( tagData->popped, self, logData, TaskUpdateStorage ));
|
||||
state Version currentVersion = 0;
|
||||
// Clear recently popped versions from persistentData if necessary
|
||||
updatePersistentPopped( self, logData, tagData );
|
||||
|
|
|
@ -115,7 +115,7 @@ struct ConsistencyCheckWorkload : TestWorkload
|
|||
}
|
||||
|
||||
try {
|
||||
wait(timeoutError(quietDatabase(cx, self->dbInfo, "ConsistencyCheckStart", 0, 1e5, 0, 0), self->quiescentWaitTimeout)); // FIXME: should be zero?
|
||||
wait(timeoutError(quietDatabase(cx, self->dbInfo, "ConsistencyCheckStart", 0, 2e6, 0, 0), self->quiescentWaitTimeout)); // FIXME: should be zero?
|
||||
}
|
||||
catch (Error& e) {
|
||||
TraceEvent("ConsistencyCheck_QuietDatabaseError").error(e);
|
||||
|
@ -219,13 +219,19 @@ struct ConsistencyCheckWorkload : TestWorkload
|
|||
}
|
||||
|
||||
//Check that nothing is in the TLog queues
|
||||
int64_t maxTLogQueueSize = wait(getMaxTLogQueueSize(cx, self->dbInfo));
|
||||
if(maxTLogQueueSize > 1e5) // FIXME: Should be zero?
|
||||
std::pair<int64_t,int64_t> maxTLogQueueInfo = wait(getTLogQueueInfo(cx, self->dbInfo));
|
||||
if(maxTLogQueueInfo.first > 1e5) // FIXME: Should be zero?
|
||||
{
|
||||
TraceEvent("ConsistencyCheck_NonZeroTLogQueue").detail("MaxQueueSize", maxTLogQueueSize);
|
||||
TraceEvent("ConsistencyCheck_NonZeroTLogQueue").detail("MaxQueueSize", maxTLogQueueInfo.first);
|
||||
self->testFailure("Non-zero tlog queue size");
|
||||
}
|
||||
|
||||
if(maxTLogQueueInfo.second > 30e6)
|
||||
{
|
||||
TraceEvent("ConsistencyCheck_PoppedVersionLag").detail("PoppedVersionLag", maxTLogQueueInfo.second);
|
||||
self->testFailure("large popped version lag");
|
||||
}
|
||||
|
||||
//Check that nothing is in the storage server queues
|
||||
try
|
||||
{
|
||||
|
|
|
@ -214,7 +214,7 @@ double testKeyToDouble(const KeyRef& p, const KeyRef& prefix);
|
|||
ACTOR Future<Void> databaseWarmer(Database cx);
|
||||
|
||||
Future<Void> quietDatabase( Database const& cx, Reference<AsyncVar<struct ServerDBInfo>> const&, std::string phase, int64_t dataInFlightGate = 2e6, int64_t maxTLogQueueGate = 5e6,
|
||||
int64_t maxStorageServerQueueGate = 5e6, int64_t maxDataDistributionQueueSize = 0);
|
||||
int64_t maxStorageServerQueueGate = 5e6, int64_t maxDataDistributionQueueSize = 0, int64_t maxPoppedVersionLag = 30e6);
|
||||
|
||||
|
||||
#include "flow/unactorcompiler.h"
|
||||
|
|
Loading…
Reference in New Issue