Resolve snapshot check skipped for some events (#30225) (#30226)

This commit is contained in:
Jens Neumaier 2021-06-03 17:04:57 +02:00 committed by GitHub
parent e43f2be6cd
commit 7d1b412b22
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 21 additions and 7 deletions

View file

@ -87,7 +87,7 @@ akka.cluster.sharding {
state-store-mode = "ddata"
# The shard saves persistent snapshots after this number of persistent
# events. Snapshots are used to reduce recovery times.
# events. Snapshots are used to reduce recovery times. A snapshot trigger might be delayed if a batch of updates is processed.
# Only used when state-store-mode=persistence
snapshot-after = 1000
@ -95,7 +95,8 @@ akka.cluster.sharding {
# keeping this number of old persistent batches.
# Batch is of size `snapshot-after`.
# When set to 0 after snapshot is successfully done all events with equal or lower sequence number will be deleted.
# Default value of 2 leaves last maximum 2*`snapshot-after` events and 3 snapshots (2 old ones + latest snapshot)
# Default value of 2 leaves last maximum 2*`snapshot-after` events and 3 snapshots (2 old ones + latest snapshot).
# If larger than 0, one additional batch of journal messages is kept when state-store-mode=persistence to include messages from delayed snapshots.
keep-nr-of-batches = 2
# Settings for LeastShardAllocationStrategy.

View file

@ -96,13 +96,17 @@ private[akka] final class EventSourcedRememberEntitiesShardStore(
(if (started.nonEmpty) EntitiesStarted(started) :: Nil else Nil) :::
(if (stopped.nonEmpty) EntitiesStopped(stopped) :: Nil else Nil)
var left = events.size
var saveSnap = false
def persistEventsAndHandleComplete(evts: List[StateChange]): Unit = {
persistAll(evts) { _ =>
left -= 1
saveSnap = saveSnap || isSnapshotNeeded
if (left == 0) {
sender() ! RememberEntitiesShardStore.UpdateDone(started, stopped)
state = state.copy(state.entities.union(started).diff(stopped))
saveSnapshotWhenNeeded()
if (saveSnap) {
saveSnapshot()
}
}
}
}
@ -126,7 +130,9 @@ private[akka] final class EventSourcedRememberEntitiesShardStore(
case DeleteMessagesSuccess(toSequenceNr) =>
val deleteTo = toSequenceNr - 1
val deleteFrom = math.max(0, deleteTo - (keepNrOfBatches * snapshotAfter))
// keeping one additional batch of messages in case snapshotAfter has been delayed to the end of a processed batch
val keepNrOfBatchesWithSafetyBatch = if (keepNrOfBatches == 0) 0 else keepNrOfBatches + 1
val deleteFrom = math.max(0, deleteTo - (keepNrOfBatchesWithSafetyBatch * snapshotAfter))
log.debug(
"Messages to [{}] deleted successfully. Deleting snapshots from [{}] to [{}]",
toSequenceNr,
@ -151,10 +157,17 @@ private[akka] final class EventSourcedRememberEntitiesShardStore(
}
def saveSnapshotWhenNeeded(): Unit = {
if (lastSequenceNr % snapshotAfter == 0 && lastSequenceNr != 0) {
log.debug("Saving snapshot, sequence number [{}]", snapshotSequenceNr)
saveSnapshot(state)
if (isSnapshotNeeded) {
saveSnapshot()
}
}
private def saveSnapshot(): Unit = {
log.debug("Saving snapshot, sequence number [{}]", snapshotSequenceNr)
saveSnapshot(state)
}
private def isSnapshotNeeded = {
lastSequenceNr % snapshotAfter == 0 && lastSequenceNr != 0
}
}