bugfix exception issue 5580 (#5581)

## Why are these changes needed?

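Fixes intermittent bug #5580.

The delayed removal of buffered messages was scheduled as a continuation on `TaskScheduler.Default`, which was the wrong scheduler.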

## Related issue number

closes #5580
This commit is contained in:
Ryan Sweet 2025-02-17 15:46:22 -08:00 committed by GitHub
parent 17888819c2
commit 9a10427a7d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 96 additions and 10 deletions

View File

@ -12,10 +12,14 @@ internal sealed class MessageRegistryGrain(
ILogger<MessageRegistryGrain> logger
) : Grain, IMessageRegistryGrain
{
/// <summary>
/// Helper for managing state writes.
/// </summary>
private readonly StateManager _stateManager = new(state);
/// <summary>
/// The number of times to retry writing the state before giving up.
/// </summary>
private const int _retries = 5;
/// <summary>
/// The time to wait before removing a message from the event buffer.
@ -35,12 +39,9 @@ internal sealed class MessageRegistryGrain(
{
await TryWriteMessageAsync("eb", topic, message).ConfigureAwait(true);
// Schedule the removal task to run in the background after bufferTime
_ = Task.Delay(_bufferTime)
.ContinueWith(
async _ => await RemoveMessage(topic, message),
TaskScheduler.Default
);
RemoveMessageAfterDelay(topic, message).Ignore();
}
/// <summary>
/// remove a specific message from the buffer for a given topic
/// </summary>
@ -54,13 +55,24 @@ internal sealed class MessageRegistryGrain(
if (events != null && events.Remove(message))
{
state.State.EventBuffer.AddOrUpdate(topic, events, (_, _) => events);
await state.WriteStateAsync().ConfigureAwait(true);
await _stateManager.WriteStateAsync().ConfigureAwait(true);
return true;
}
}
return false;
}
/// <summary>
/// Waits for the buffer time to elapse, then removes the given message from the buffer of the given topic.
/// </summary>
/// <param name="topic">The topic whose buffer holds the message.</param>
/// <param name="message">The message to remove once the delay has passed.</param>
/// <returns>A task that completes when the delayed removal attempt has finished.</returns>
private async Task RemoveMessageAfterDelay(string topic, CloudEvent message)
{
    // Let the message sit in the buffer for the configured time first.
    var bufferWindow = Task.Delay(_bufferTime);
    await bufferWindow.ConfigureAwait(true);

    // Then perform the actual removal.
    await RemoveMessage(topic, message).ConfigureAwait(true);
}
/// <summary>
/// Tries to write a message to the given queue in Orleans state.
/// Allows for retries using etag for optimistic concurrency.
@ -114,7 +126,7 @@ internal sealed class MessageRegistryGrain(
default:
throw new ArgumentException($"Invalid queue name: {whichQueue}");
}
await state.WriteStateAsync().ConfigureAwait(true);
await _stateManager.WriteStateAsync().ConfigureAwait(true);
return true;
}
@ -124,7 +136,7 @@ internal sealed class MessageRegistryGrain(
var messages = new List<CloudEvent>();
if (state.State.DeadLetterQueue != null && state.State.DeadLetterQueue.Remove(topic, out List<CloudEvent>? letters))
{
await state.WriteStateAsync().ConfigureAwait(true);
await _stateManager.WriteStateAsync().ConfigureAwait(true);
if (letters != null)
{
messages.AddRange(letters);
@ -132,7 +144,7 @@ internal sealed class MessageRegistryGrain(
}
if (state.State.EventBuffer != null && state.State.EventBuffer.Remove(topic, out List<CloudEvent>? events))
{
await state.WriteStateAsync().ConfigureAwait(true);
await _stateManager.WriteStateAsync().ConfigureAwait(true);
if (events != null)
{
messages.AddRange(events);

View File

@ -0,0 +1,74 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// StateManager.cs
using Orleans.Core;
namespace Microsoft.AutoGen.RuntimeGateway.Grpc;
/// <summary>
/// Wraps a grain state instance and funnels its storage operations so that no more than one of them is in flight at any moment.
/// </summary>
/// <param name="state">The grain state.</param>
internal sealed class StateManager(IStorage state)
{
    /// <summary>
    /// The storage operation currently in flight, or <c>null</c> when none is outstanding.
    /// </summary>
    private Task? _pendingOperation;

    /// <summary>
    /// Writes the wrapped state by routing <c>WriteStateAsync</c> through <see cref="PerformOperationAsync"/>.
    /// </summary>
    /// <remarks>
    /// A reentrant grain calling WriteStateAsync concurrently can run into etag violations.
    /// Serializing and batching the writes, so that only one is outstanding at a time, avoids that.
    /// </remarks>
    public async ValueTask WriteStateAsync()
    {
        await PerformOperationAsync(static storage => storage.WriteStateAsync());
    }

    /// <summary>
    /// Clears the wrapped state by routing <c>ClearStateAsync</c> through <see cref="PerformOperationAsync"/>.
    /// </summary>
    public async ValueTask ClearStateAsync()
    {
        await PerformOperationAsync(static storage => storage.ClearStateAsync());
    }

    /// <summary>
    /// Runs a storage operation while keeping at most one operation outstanding.
    /// </summary>
    /// <param name="performOperation">Starts the operation against the wrapped state and returns its task.</param>
    /// <returns>A task-like that completes when the operation this caller ended up awaiting has finished.</returns>
    public async ValueTask PerformOperationAsync(Func<IStorage, Task> performOperation)
    {
        var inFlight = _pendingOperation;
        if (inFlight is not null)
        {
            // Something is already running. Wait for it, but disregard its outcome (SuppressThrowing):
            // it was started before this request, so it does not carry this caller's changes.
            await inFlight.ConfigureAwait(ConfigureAwaitOptions.ContinueOnCapturedContext | ConfigureAwaitOptions.SuppressThrowing);
            ReleaseIfCurrent(inFlight);
        }

        Task ours;
        if (_pendingOperation is { } alreadyStarted)
        {
            // Another request started a fresh operation while we were waiting. It began after our
            // changes were made, so it covers them too; join it instead of starting yet another one.
            // NOTE(review): the joined operation is awaited regardless of which delegate this caller passed — confirm that is intended when writes and clears are mixed.
            ours = alreadyStarted;
        }
        else
        {
            // Nobody has started a new operation since the previous one finished, so start it here.
            ours = performOperation(state);
            _pendingOperation = ours;
        }

        try
        {
            await ours;
        }
        finally
        {
            ReleaseIfCurrent(ours);
        }
    }

    /// <summary>
    /// Resets the pending-operation slot, but only if it still holds the given task;
    /// otherwise another request has already replaced or cleared it.
    /// </summary>
    /// <param name="completed">The task that the caller has just finished awaiting.</param>
    private void ReleaseIfCurrent(Task completed)
    {
        if (ReferenceEquals(_pendingOperation, completed))
        {
            _pendingOperation = null;
        }
    }
}