-
Notifications
You must be signed in to change notification settings - Fork 1.1k
Fixed possible racing conditions in Journal and SnapshotStore #2735
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fixed possible racing conditions in Journal and SnapshotStore #2735
Conversation
.ContinueWith(t => | ||
{ | ||
if (!t.IsFaulted && !t.IsCanceled) | ||
if (t.IsCompleted) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
!t.IsFaulted && !t.IsCanceled
is the same as t.IsCompleted
@@ -253,43 +261,42 @@ private void HandleReplayMessages(ReplayMessages message) | |||
// Send replayed messages and replay result to persistentActor directly. No need | |||
// to resequence replayed messages relative to written and looped messages. | |||
// not possible to use circuit breaker here | |||
ReplayMessagesAsync(context, message.PersistenceId, message.FromSequenceNr, toSequenceNr, | |||
message.Max, p => | |||
ReplayMessagesAsync(context, message.PersistenceId, message.FromSequenceNr, toSequenceNr, message.Max, p => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need the parameter context
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If the context
is being handled asynchronously, then we do need to close over it... however, the context is volatile - so I'm not sure why we'd want to snapshot it. What data are we using from it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@cconstantin @Horusiath As I see - we dont use it in our persistence plugins
{ | ||
var msg = (LoadSnapshot) message; | ||
var senderPersistentActor = Sender; // Sender is PersistentActor | ||
var self = Self; //Self MUST BE CLOSED OVER here, or the code below will be subject to race conditions |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These variables should be closed over here, in order to prevent racing conditions
.ContinueWith(t => | ||
{ | ||
if (_publish) eventStream.Publish(message); | ||
if (t.IsCompleted && _publish) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added an additional check
{ | ||
return (!t.IsFaulted && !t.IsCanceled) | ||
? (object) new DeleteMessagesSuccess(message.ToSequenceNr) | ||
_breaker.WithCircuitBreaker(() => DeleteMessagesToAsync(message.PersistenceId, message.ToSequenceNr)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Circuit Breaker was added here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good to me, same as JVM source
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like good fixes to me. Nice work @alexvaluyskiy
{ | ||
return (!t.IsFaulted && !t.IsCanceled) | ||
? (object) new DeleteMessagesSuccess(message.ToSequenceNr) | ||
_breaker.WithCircuitBreaker(() => DeleteMessagesToAsync(message.PersistenceId, message.ToSequenceNr)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good to me, same as JVM source
@@ -253,43 +261,42 @@ private void HandleReplayMessages(ReplayMessages message) | |||
// Send replayed messages and replay result to persistentActor directly. No need | |||
// to resequence replayed messages relative to written and looped messages. | |||
// not possible to use circuit breaker here | |||
ReplayMessagesAsync(context, message.PersistenceId, message.FromSequenceNr, toSequenceNr, | |||
message.Max, p => | |||
ReplayMessagesAsync(context, message.PersistenceId, message.FromSequenceNr, toSequenceNr, message.Max, p => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If the context
is being handled asynchronously, then we do need to close over it... however, the context is volatile - so I'm not sure why we'd want to snapshot it. What data are we using from it?
.PipeTo(replyTo) | ||
.ContinueWith(t => | ||
{ | ||
if (!t.IsFaulted && CanPublish) context.System.EventStream.Publish(message); | ||
if (t.IsCompleted && CanPublish) eventStream.Publish(message); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good idea to close over the EventStream
👍
No description provided.