diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala index 6be5ee2d52..608fb5e706 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala @@ -14,8 +14,8 @@ import akka.actor.Props import akka.actor.Terminated import akka.cluster.sharding.Shard.ShardCommand import akka.actor.Actor - import akka.util.MessageBufferMap + import scala.concurrent.Future import scala.concurrent.duration.FiniteDuration import akka.cluster.Cluster @@ -23,16 +23,7 @@ import akka.cluster.ddata.ORSet import akka.cluster.ddata.ORSetKey import akka.cluster.ddata.Replicator._ import akka.actor.Stash -import akka.persistence.PersistentActor -import akka.persistence.SnapshotOffer -import akka.persistence.SaveSnapshotSuccess -import akka.persistence.DeleteSnapshotsFailure -import akka.persistence.DeleteMessagesSuccess -import akka.persistence.SaveSnapshotFailure -import akka.persistence.DeleteMessagesFailure -import akka.persistence.DeleteSnapshotsSuccess -import akka.persistence.SnapshotSelectionCriteria -import akka.persistence.RecoveryCompleted +import akka.persistence._ import akka.actor.NoSerializationVerificationNeeded /** @@ -531,7 +522,7 @@ private[akka] class PersistentShard( /* * delete old events but keep the latest around because * - * it's not safe to delete all events immediate because snapshots are typically stored with a weaker consistency + * it's not safe to delete all events immediately because snapshots are typically stored with a weaker consistency * level which means that a replay might "see" the deleted events before it sees the stored snapshot, * i.e. it will use an older snapshot and then not replay the full sequence of events * @@ -543,20 +534,25 @@ private[akka] class PersistentShard( } case SaveSnapshotFailure(_, reason) ⇒ - log.warning("PersistentShard snapshot failure: {}", reason.getMessage) + log.warning("PersistentShard snapshot failure: [{}]", reason.getMessage) case DeleteMessagesSuccess(toSequenceNr) ⇒ - log.debug("PersistentShard messages to {} deleted successfully", toSequenceNr) - deleteSnapshots(SnapshotSelectionCriteria(maxSequenceNr = toSequenceNr - 1)) + val deleteTo = toSequenceNr - 1 + val deleteFrom = math.max(0, deleteTo - (keepNrOfBatches * snapshotAfter)) + log.debug("PersistentShard messages to [{}] deleted successfully. Deleting snapshots from [{}] to [{}]", toSequenceNr, deleteFrom, deleteTo) + deleteSnapshots(SnapshotSelectionCriteria( + minSequenceNr = deleteFrom, + maxSequenceNr = deleteTo + )) case DeleteMessagesFailure(reason, toSequenceNr) ⇒ - log.warning("PersistentShard messages to {} deletion failure: {}", toSequenceNr, reason.getMessage) + log.warning("PersistentShard messages to [{}] deletion failure: [{}]", toSequenceNr, reason.getMessage) case DeleteSnapshotsSuccess(m) ⇒ - log.debug("PersistentShard snapshots matching {} deleted successfully", m) + log.debug("PersistentShard snapshots matching [{}] deleted successfully", m) case DeleteSnapshotsFailure(m, reason) ⇒ - log.warning("PersistentShard snapshots matching {} deletion failure: {}", m, reason.getMessage) + log.warning("PersistentShard snapshots matching [{}] deletion failure: [{}]", m, reason.getMessage) }: Receive).orElse(super.receiveCommand) diff --git a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/RemoveInternalClusterShardingDataSpec.scala b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/RemoveInternalClusterShardingDataSpec.scala index a4055b1df7..ea89c05f44 100644 --- a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/RemoveInternalClusterShardingDataSpec.scala +++ b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/RemoveInternalClusterShardingDataSpec.scala @@ -40,7 +40,7 @@ object RemoveInternalClusterShardingDataSpec { akka.persistence.snapshot-store.local.dir = "target/snapshots-RemoveInternalClusterShardingDataSpec" akka.cluster.sharding.snapshot-after = 5 akka.cluster.sharding.state-store-mode = persistence - |akka.cluster.sharding.keep-nr-of-batches = 0 + akka.cluster.sharding.keep-nr-of-batches = 0 """ val extractEntityId: ShardRegion.ExtractEntityId = { diff --git a/akka-docs/src/main/paradox/persistence.md b/akka-docs/src/main/paradox/persistence.md index fbeef9c6fb..6873e29df5 100644 --- a/akka-docs/src/main/paradox/persistence.md +++ b/akka-docs/src/main/paradox/persistence.md @@ -693,7 +693,9 @@ A persistent actor can delete individual snapshots by calling the `deleteSnapsho when the snapshot was taken. To bulk-delete a range of snapshots matching `SnapshotSelectionCriteria`, -persistent actors should use the `deleteSnapshots` method. +persistent actors should use the `deleteSnapshots` method. Depending on the journal used this might be inefficient. It is +best practice to do specific deletes with `deleteSnapshot` or to include a `minSequenceNr` as well as a `maxSequenceNr` +for the `SnapshotSelectionCriteria`. ### Snapshot status handling