Merge pull request #6635 from sfc-gh-etschannen/fix-txn-recovery
Do not reply to a txnStateRequest for the final sequence unless we have processes the txnStateStore
This commit is contained in:
commit
44bce12216
|
@ -1955,6 +1955,8 @@ struct TransactionStateResolveContext {
|
||||||
// Pointer to transaction state store, shortcut for commitData.txnStateStore
|
// Pointer to transaction state store, shortcut for commitData.txnStateStore
|
||||||
IKeyValueStore* pTxnStateStore = nullptr;
|
IKeyValueStore* pTxnStateStore = nullptr;
|
||||||
|
|
||||||
|
Future<Void> txnRecovery;
|
||||||
|
|
||||||
// Actor streams
|
// Actor streams
|
||||||
PromiseStream<Future<Void>>* pActors = nullptr;
|
PromiseStream<Future<Void>>* pActors = nullptr;
|
||||||
|
|
||||||
|
@ -2070,6 +2072,9 @@ ACTOR Future<Void> processTransactionStateRequestPart(TransactionStateResolveCon
|
||||||
ASSERT(pContext->pActors != nullptr);
|
ASSERT(pContext->pActors != nullptr);
|
||||||
|
|
||||||
if (pContext->receivedSequences.count(request.sequence)) {
|
if (pContext->receivedSequences.count(request.sequence)) {
|
||||||
|
if (pContext->receivedSequences.size() == pContext->maxSequence) {
|
||||||
|
wait(pContext->txnRecovery);
|
||||||
|
}
|
||||||
// This part is already received. Still we will re-broadcast it to other CommitProxies
|
// This part is already received. Still we will re-broadcast it to other CommitProxies
|
||||||
pContext->pActors->send(broadcastTxnRequest(request, SERVER_KNOBS->TXN_STATE_SEND_AMOUNT, true));
|
pContext->pActors->send(broadcastTxnRequest(request, SERVER_KNOBS->TXN_STATE_SEND_AMOUNT, true));
|
||||||
wait(yield());
|
wait(yield());
|
||||||
|
@ -2095,7 +2100,8 @@ ACTOR Future<Void> processTransactionStateRequestPart(TransactionStateResolveCon
|
||||||
if (pContext->receivedSequences.size() == pContext->maxSequence) {
|
if (pContext->receivedSequences.size() == pContext->maxSequence) {
|
||||||
// Received all components of the txnStateRequest
|
// Received all components of the txnStateRequest
|
||||||
ASSERT(!pContext->processed);
|
ASSERT(!pContext->processed);
|
||||||
wait(processCompleteTransactionStateRequest(pContext));
|
pContext->txnRecovery = processCompleteTransactionStateRequest(pContext);
|
||||||
|
wait(pContext->txnRecovery);
|
||||||
pContext->processed = true;
|
pContext->processed = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue