fix race condition in Running.storingSnapshot, #26601
* When storing a snapshot, and waiting for the response, it received a response from a previous/concurrent delete snapshot (DeleteSnapshotSuccess) which it handled as a response for the save snapshot, i.e. tryUnstashOne, applySideEffects, and changing phase to HandlingCommands. Thereby missing the SaveSnapshotSuccess.
This commit is contained in:
parent
03897dd6e0
commit
af2ac0aac8
1 changed files with 10 additions and 6 deletions
|
|
@ -321,11 +321,11 @@ private[akka] object Running {
|
|||
Behaviors.unhandled
|
||||
} else {
|
||||
stashUser(cmd)
|
||||
storingSnapshot(state, sideEffects)
|
||||
Behaviors.same
|
||||
}
|
||||
}
|
||||
|
||||
def onSnapshotterResponse(response: SnapshotProtocol.Response): Unit = {
|
||||
def onSaveSnapshotResponse(response: SnapshotProtocol.Response): Unit = {
|
||||
val signal = response match {
|
||||
case e @ SaveSnapshotSuccess(meta) =>
|
||||
// # 24698 The deletion of old events are automatic, snapshots are triggered by the SaveSnapshotSuccess.
|
||||
|
|
@ -342,7 +342,6 @@ private[akka] object Running {
|
|||
Some(SnapshotFailed(SnapshotMetadata.fromUntyped(meta), error))
|
||||
|
||||
case _ =>
|
||||
onDeleteSnapshotResponse(response)
|
||||
None
|
||||
}
|
||||
|
||||
|
|
@ -356,9 +355,14 @@ private[akka] object Running {
|
|||
onCommand(cmd)
|
||||
case JournalResponse(r) =>
|
||||
onDeleteEventsJournalResponse(r)
|
||||
case SnapshotterResponse(r) =>
|
||||
onSnapshotterResponse(r)
|
||||
tryUnstashOne(applySideEffects(sideEffects, state))
|
||||
case SnapshotterResponse(response) =>
|
||||
response match {
|
||||
case _: SaveSnapshotSuccess | _: SaveSnapshotFailure =>
|
||||
onSaveSnapshotResponse(response)
|
||||
tryUnstashOne(applySideEffects(sideEffects, state))
|
||||
case _ =>
|
||||
onDeleteSnapshotResponse(response)
|
||||
}
|
||||
case _ =>
|
||||
Behaviors.unhandled
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue