diff --git a/akka-cluster-sharding/src/main/resources/reference.conf b/akka-cluster-sharding/src/main/resources/reference.conf index 5c5088f20c..3ef539382b 100644 --- a/akka-cluster-sharding/src/main/resources/reference.conf +++ b/akka-cluster-sharding/src/main/resources/reference.conf @@ -87,7 +87,7 @@ akka.cluster.sharding { state-store-mode = "ddata" # The shard saves persistent snapshots after this number of persistent - # events. Snapshots are used to reduce recovery times. + # events. Snapshots are used to reduce recovery times. A snapshot trigger might be delayed if a batch of updates is processed. # Only used when state-store-mode=persistence snapshot-after = 1000 @@ -95,7 +95,8 @@ akka.cluster.sharding { # keeping this number of old persistent batches. # Batch is of size `snapshot-after`. # When set to 0 after snapshot is successfully done all events with equal or lower sequence number will be deleted. - # Default value of 2 leaves last maximum 2*`snapshot-after` events and 3 snapshots (2 old ones + latest snapshot) + # Default value of 2 leaves last maximum 2*`snapshot-after` events and 3 snapshots (2 old ones + latest snapshot). + # If larger than 0, one additional batch of journal messages is kept when state-store-mode=persistence to include messages from delayed snapshots. keep-nr-of-batches = 2 # Settings for LeastShardAllocationStrategy. diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/internal/EventSourcedRememberEntitiesShardStore.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/internal/EventSourcedRememberEntitiesShardStore.scala index 445fad6b15..6715edb8e3 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/internal/EventSourcedRememberEntitiesShardStore.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/internal/EventSourcedRememberEntitiesShardStore.scala @@ -96,13 +96,17 @@ private[akka] final class EventSourcedRememberEntitiesShardStore( (if (started.nonEmpty) EntitiesStarted(started) :: Nil else Nil) ::: (if (stopped.nonEmpty) EntitiesStopped(stopped) :: Nil else Nil) var left = events.size + var saveSnap = false def persistEventsAndHandleComplete(evts: List[StateChange]): Unit = { persistAll(evts) { _ => left -= 1 + saveSnap = saveSnap || isSnapshotNeeded if (left == 0) { sender() ! RememberEntitiesShardStore.UpdateDone(started, stopped) state = state.copy(state.entities.union(started).diff(stopped)) - saveSnapshotWhenNeeded() + if (saveSnap) { + saveSnapshot() + } } } } @@ -126,7 +130,9 @@ private[akka] final class EventSourcedRememberEntitiesShardStore( case DeleteMessagesSuccess(toSequenceNr) => val deleteTo = toSequenceNr - 1 - val deleteFrom = math.max(0, deleteTo - (keepNrOfBatches * snapshotAfter)) + // keeping one additional batch of messages in case snapshotAfter has been delayed to the end of a processed batch + val keepNrOfBatchesWithSafetyBatch = if (keepNrOfBatches == 0) 0 else keepNrOfBatches + 1 + val deleteFrom = math.max(0, deleteTo - (keepNrOfBatchesWithSafetyBatch * snapshotAfter)) log.debug( "Messages to [{}] deleted successfully. Deleting snapshots from [{}] to [{}]", toSequenceNr, @@ -151,10 +157,17 @@ private[akka] final class EventSourcedRememberEntitiesShardStore( } def saveSnapshotWhenNeeded(): Unit = { - if (lastSequenceNr % snapshotAfter == 0 && lastSequenceNr != 0) { - log.debug("Saving snapshot, sequence number [{}]", snapshotSequenceNr) - saveSnapshot(state) + if (isSnapshotNeeded) { + saveSnapshot() } } + private def saveSnapshot(): Unit = { + log.debug("Saving snapshot, sequence number [{}]", snapshotSequenceNr) + saveSnapshot(state) + } + + private def isSnapshotNeeded = { + lastSequenceNr % snapshotAfter == 0 && lastSequenceNr != 0 + } }