A number of bug fixes of rare correctness errors
parent 76fb345dd1
commit 7cebe743f9
@@ -1490,6 +1490,12 @@ namespace dbBackup {
 				Version bVersion = wait(srcTr->getReadVersion());
 				beginVersionKey = BinaryWriter::toValue(bVersion, Unversioned());
 
+				state Key versionKey = logUidValue.withPrefix(destUidValue).withPrefix(backupLatestVersionsPrefix);
+				Optional<Key> versionRecord = wait( srcTr->get(versionKey) );
+				if(!versionRecord.present()) {
+					srcTr->set(versionKey, beginVersionKey);
+				}
+
 				task->params[BackupAgentBase::destUid] = destUidValue;
 
 				wait(srcTr->commit());
@@ -1539,9 +1545,6 @@ namespace dbBackup {
 				if(v.present() && BinaryReader::fromStringRef<Version>(v.get(), Unversioned()) >= BinaryReader::fromStringRef<Version>(task->params[DatabaseBackupAgent::keyFolderId], Unversioned()))
 					return Void();
 
-				Key versionKey = logUidValue.withPrefix(destUidValue).withPrefix(backupLatestVersionsPrefix);
-				srcTr2->set(versionKey, beginVersionKey);
-
 				srcTr2->set( Subspace(databaseBackupPrefixRange.begin).get(BackupAgentBase::keySourceTagName).pack(task->params[BackupAgentBase::keyTagName]), logUidValue );
 				srcTr2->set( sourceStates.pack(DatabaseBackupAgent::keyFolderId), task->params[DatabaseBackupAgent::keyFolderId] );
 				srcTr2->set( sourceStates.pack(DatabaseBackupAgent::keyStateStatus), StringRef(BackupAgentBase::getStateText(BackupAgentBase::STATE_RUNNING)));
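These two hunks move the write of the backup's latest-versions key out of the later srcTr2 transaction and into the earlier srcTr transaction, where it now commits atomically with task->params[destUid]. The key is also only initialized when no record is present yet, so a retried transaction cannot clobber an existing version. This appears to close a window in which destUid could be committed without a matching latest-versions entry.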
@@ -1019,7 +1019,12 @@ public:
 			return Void();
 		}
 
-		watchFuture = ryw->tr.watch(watch); // throws if there are too many outstanding watches
+		try {
+			watchFuture = ryw->tr.watch(watch); // throws if there are too many outstanding watches
+		} catch( Error &e ) {
+			done.send(Void());
+			throw;
+		}
 		done.send(Void());
 
 		wait(watchFuture);
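The watch call can throw (for example, too_many_watches) before the done promise is fulfilled, which previously left anyone waiting on done hanging forever. Below is a minimal standalone sketch of the fixed pattern, using std::promise in place of flow's Promise and a hypothetical startWatch() in place of ryw->tr.watch(); the point is that done is fulfilled on both the success and the failure path before the error propagates.

    #include <future>
    #include <iostream>
    #include <stdexcept>

    // Hypothetical stand-in for ryw->tr.watch(): may throw (e.g. when the
    // "too many outstanding watches" limit is hit) instead of returning a future.
    std::future<void> startWatch(bool tooManyWatches) {
        if (tooManyWatches)
            throw std::runtime_error("too_many_watches");
        std::promise<void> fired;
        fired.set_value();
        return fired.get_future();
    }

    // Mirrors the fixed control flow: 'done' is fulfilled on both the success
    // path and the failure path, so a waiter can never hang.
    void watchOnce(std::promise<void>& done, bool tooManyWatches) {
        std::future<void> watchFuture;
        try {
            watchFuture = startWatch(tooManyWatches);
        } catch (...) {
            done.set_value(); // unblock the waiter first...
            throw;            // ...then propagate the error, as the fix does
        }
        done.set_value();
        watchFuture.wait();
    }

    int main() {
        std::promise<void> done;
        try {
            watchOnce(done, /*tooManyWatches=*/true);
        } catch (const std::exception& e) {
            std::cout << "watch failed: " << e.what() << "\n";
        }
        done.get_future().wait(); // returns immediately: the promise was fulfilled
    }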
@@ -461,7 +461,7 @@ void ServerKnobs::initialize(bool randomize, ClientKnobs* clientKnobs, bool isSimulated) {
 	init( SPRING_BYTES_TLOG_BATCH, 300e6 ); if( smallTlogTarget ) SPRING_BYTES_TLOG_BATCH = 150e3;
 	init( TLOG_SPILL_THRESHOLD, 1500e6 ); if( smallTlogTarget ) TLOG_SPILL_THRESHOLD = 1500e3; if( randomize && BUGGIFY ) TLOG_SPILL_THRESHOLD = 0;
 	init( REFERENCE_SPILL_UPDATE_STORAGE_BYTE_LIMIT, 20e6 ); if( (randomize && BUGGIFY) || smallTlogTarget ) REFERENCE_SPILL_UPDATE_STORAGE_BYTE_LIMIT = 1e6;
-	init( TLOG_HARD_LIMIT_BYTES, 3000e6 ); if( smallTlogTarget ) TLOG_HARD_LIMIT_BYTES = 3000e3;
+	init( TLOG_HARD_LIMIT_BYTES, 3000e6 ); if( smallTlogTarget ) TLOG_HARD_LIMIT_BYTES = 30e6;
 	init( TLOG_RECOVER_MEMORY_LIMIT, TARGET_BYTES_PER_TLOG + SPRING_BYTES_TLOG );
 
 	init( MAX_TRANSACTIONS_PER_BYTE, 1000 );
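For small-tlog simulation targets the hard limit was 3000e3 bytes (3 MB), only twice the small-target spill threshold of 1500e3 bytes; it is raised to 30e6 (30 MB), presumably so that spill behavior is exercised well before the hard limit aborts the tlog.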
@@ -347,7 +347,7 @@ ACTOR Future<Void> logRouterPeekMessages( LogRouterData* self, TLogPeekRequest req ) {
 		peekId = req.sequence.get().first;
 		sequence = req.sequence.get().second;
 		if (sequence >= SERVER_KNOBS->PARALLEL_GET_MORE_REQUESTS && self->peekTracker.find(peekId) == self->peekTracker.end()) {
-			throw timed_out();
+			throw operation_obsolete();
 		}
 		auto& trackerData = self->peekTracker[peekId];
 		if (sequence == 0 && trackerData.sequence_version.find(0) == trackerData.sequence_version.end()) {
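A peek whose tracker entry has already been discarded is not a transient failure, so retrying after timed_out could never succeed. operation_obsolete signals to the peeking client that its sequence state is permanently gone, which seems to be the intended semantics of this path.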
@@ -1088,6 +1088,10 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Reference<LogData> logData ) {
 	state UID peekId;
 	state double queueStart = now();
 
+	if(req.tag.locality == tagLocalityTxs && req.tag.id >= logData->txsTags && logData->txsTags > 0) {
+		req.tag.id = req.tag.id % logData->txsTags;
+	}
+
 	if(req.sequence.present()) {
 		try {
 			peekId = req.sequence.get().first;
@@ -1391,6 +1391,10 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Reference<LogData> logData ) {
 	state int sequence = -1;
 	state UID peekId;
 	state double queueStart = now();
 
+	if(req.tag.locality == tagLocalityTxs && req.tag.id >= logData->txsTags && logData->txsTags > 0) {
+		req.tag.id = req.tag.id % logData->txsTags;
+	}
+
 	if(req.sequence.present()) {
 		try {
@@ -1404,6 +1404,10 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Reference<LogData> logData ) {
 	state int sequence = -1;
 	state UID peekId;
 	state double queueStart = now();
 
+	if(req.tag.locality == tagLocalityTxs && req.tag.id >= logData->txsTags && logData->txsTags > 0) {
+		req.tag.id = req.tag.id % logData->txsTags;
+	}
+
 	if(req.sequence.present()) {
 		try {
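The same guard is added to tLogPeekMessages in each tlog implementation: a peek for a txs tag whose id is at or beyond the current txsTags count is remapped into range with a modulo, rather than being served from a nonexistent tag. A minimal standalone sketch of the remapping, with a hypothetical remapTxsTag() helper standing in for the inline code:

    #include <cassert>

    // A peek recorded under an older, larger txsTags count is folded back
    // into [0, txsTags) so it still lands on a valid tag after the count shrinks.
    int remapTxsTag(int tagId, int txsTags) {
        if (txsTags > 0 && tagId >= txsTags)
            return tagId % txsTags;
        return tagId;
    }

    int main() {
        assert(remapTxsTag(7, 4) == 3); // out-of-range id 7 maps to tag 3
        assert(remapTxsTag(2, 4) == 2); // in-range ids are unchanged
        assert(remapTxsTag(5, 0) == 5); // txsTags == 0: no remapping
    }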
@@ -827,7 +827,7 @@ struct WriteDuringReadWorkload : TestWorkload {
 			self->addedConflicts.insert(allKeys, false);
 			return Void();
 		}
-		if( e.code() == error_code_not_committed || e.code() == error_code_commit_unknown_result || e.code() == error_code_transaction_too_large || e.code() == error_code_key_too_large || e.code() == error_code_value_too_large || cancelled )
+		if( e.code() == error_code_not_committed || e.code() == error_code_commit_unknown_result || e.code() == error_code_transaction_too_large || e.code() == error_code_key_too_large || e.code() == error_code_value_too_large || e.code() == error_code_too_many_watches || cancelled )
 			throw not_committed();
 		try {
 			wait( tr.onError(e) );
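With the ReadYourWrites change above, too_many_watches can now surface from the watch path, so the WriteDuringRead workload adds it to the set of errors it translates into not_committed rather than treating as test failures.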