diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala
index 40711bff05..b9ef25fcb7 100644
--- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala
+++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala
@@ -562,21 +562,9 @@ private[akka] class PersistentShard(
   }
 
   override def receiveCommand: Receive = ({
-    case SaveSnapshotSuccess(m) ⇒
+    case e: SaveSnapshotSuccess ⇒
       log.debug("PersistentShard snapshot saved successfully")
-      /*
-       * delete old events but keep the latest around because
-       *
-       * it's not safe to delete all events immediately because snapshots are typically stored with a weaker consistency
-       * level which means that a replay might "see" the deleted events before it sees the stored snapshot,
-       * i.e. it will use an older snapshot and then not replay the full sequence of events
-       *
-       * for debugging if something goes wrong in production it's very useful to be able to inspect the events
-       */
-      val deleteToSequenceNr = m.sequenceNr - keepNrOfBatches * snapshotAfter
-      if (deleteToSequenceNr > 0) {
-        deleteMessages(deleteToSequenceNr)
-      }
+      internalDeleteMessagesBeforeSnapshot(e, keepNrOfBatches, snapshotAfter)
 
     case SaveSnapshotFailure(_, reason) ⇒
       log.warning("PersistentShard snapshot failure: [{}]", reason.getMessage)
diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala
index 8dc81bcdaf..50e47ba2ba 100644
--- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala
+++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala
@@ -874,21 +874,9 @@ class PersistentShardCoordinator(typeName: String, settings: ClusterShardingSett
   }: Receive).orElse[Any, Unit](receiveTerminated).orElse[Any, Unit](receiveSnapshotResult)
 
   def receiveSnapshotResult: Receive = {
-    case SaveSnapshotSuccess(m) ⇒
+    case e: SaveSnapshotSuccess ⇒
       log.debug("Persistent snapshot saved successfully")
-      /*
-       * delete old events but keep the latest around because
-       *
-       * it's not safe to delete all events immediate because snapshots are typically stored with a weaker consistency
-       * level which means that a replay might "see" the deleted events before it sees the stored snapshot,
-       * i.e. it will use an older snapshot and then not replay the full sequence of events
-       *
-       * for debugging if something goes wrong in production it's very useful to be able to inspect the events
-       */
-      val deleteToSequenceNr = m.sequenceNr - keepNrOfBatches * snapshotAfter
-      if (deleteToSequenceNr > 0) {
-        deleteMessages(deleteToSequenceNr)
-      }
+      internalDeleteMessagesBeforeSnapshot(e, keepNrOfBatches, snapshotAfter)
 
     case SaveSnapshotFailure(_, reason) ⇒
       log.warning("Persistent snapshot failure: {}", reason.getMessage)
diff --git a/akka-persistence/src/main/mima-filters/2.5.21.backwards.excludes b/akka-persistence/src/main/mima-filters/2.5.21.backwards.excludes
new file mode 100644
index 0000000000..5e14f18a73
--- /dev/null
+++ b/akka-persistence/src/main/mima-filters/2.5.21.backwards.excludes
@@ -0,0 +1,2 @@
+# #26451 Consolidate duplicate persistence sharding function
+ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.persistence.Eventsourced.internalDeleteMessagesBeforeSnapshot")
diff --git a/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala b/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala
index 533574fb70..d6fe156f79 100644
--- a/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala
+++ b/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala
@@ -442,6 +442,29 @@ private[persistence] trait Eventsourced extends Snapshotter with PersistenceStas
         s"toSequenceNr [$toSequenceNr] must be less than or equal to lastSequenceNr [$lastSequenceNr]"), toSequenceNr)
   }
 
+  /**
+   * INTERNAL API.
+   * An [[Eventsourced]] actor can request cleanup by deleting either a range of, or all, persistent events.
+   * For example, on successful snapshot completion, delete messages within a configurable `snapshotAfter`
+   * range that are less than or equal to the given [[SnapshotMetadata.sequenceNr]]
+   * (provided the [[SnapshotMetadata.sequenceNr]] is <= [[Eventsourced#lastSequenceNr]]).
+   *
+   * Or delete all events by using `Long.MaxValue` as the `toSequenceNr`:
+   * {{{ e.metadata.copy(sequenceNr = Long.MaxValue) }}}
+   */
+  @InternalApi private[akka] def internalDeleteMessagesBeforeSnapshot(
+    e: SaveSnapshotSuccess,
+    keepNrOfBatches: Int,
+    snapshotAfter: Int): Unit = {
+    /* Delete old events but keep the latest around:
+       1. It's not safe to delete all events immediately because snapshots are typically stored with
+          a weaker consistency level. A replay might "see" the deleted events before it sees the stored
+          snapshot, i.e. it could use an older snapshot and then not replay the full sequence of events.
+       2. If there is a production failure, it's useful to be able to inspect the events while debugging. */
+    val sequenceNr = e.metadata.sequenceNr - keepNrOfBatches * snapshotAfter
+    if (sequenceNr > 0) deleteMessages(sequenceNr)
+  }
+
   /**
    * Returns `true` if this persistent actor is currently recovering.
    */
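
For reference, here is a minimal sketch of the same trim-after-snapshot pattern applied from a plain PersistentActor (user code cannot call the internal helper directly, so the arithmetic is inlined). The actor, its event type, and the snapshotAfter/keepNrOfBatches values are illustrative assumptions, not part of this change:

import akka.persistence.{ PersistentActor, SaveSnapshotSuccess, SnapshotOffer }

final case class CounterChanged(delta: Int)

class CounterActor extends PersistentActor {
  override def persistenceId: String = "counter-1"

  // Illustrative values; cluster sharding reads the equivalent settings from its config.
  private val snapshotAfter = 1000
  private val keepNrOfBatches = 2

  private var count = 0

  override def receiveRecover: Receive = {
    case SnapshotOffer(_, snapshot: Int) ⇒ count = snapshot
    case CounterChanged(delta)           ⇒ count += delta
  }

  override def receiveCommand: Receive = {
    case delta: Int ⇒
      persist(CounterChanged(delta)) { evt ⇒
        count += evt.delta
        if (lastSequenceNr % snapshotAfter == 0) saveSnapshot(count)
      }

    case e: SaveSnapshotSuccess ⇒
      // Same arithmetic as internalDeleteMessagesBeforeSnapshot: keep the last
      // keepNrOfBatches * snapshotAfter events and delete everything older.
      // Passing Long.MaxValue instead would delete all events up to the snapshot.
      val deleteTo = e.metadata.sequenceNr - keepNrOfBatches * snapshotAfter
      if (deleteTo > 0) deleteMessages(deleteTo)
  }
}

With this change, PersistentShard and PersistentShardCoordinator delegate exactly that step to the shared internal helper instead of duplicating it.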