diff --git a/akka-cluster-sharding/src/main/resources/reference.conf b/akka-cluster-sharding/src/main/resources/reference.conf
index e4c8b9b94a..3ec3819ac3 100644
--- a/akka-cluster-sharding/src/main/resources/reference.conf
+++ b/akka-cluster-sharding/src/main/resources/reference.conf
@@ -78,6 +78,13 @@ akka.cluster.sharding {
   # Only used when state-store-mode=persistence
   snapshot-after = 1000
 
+  # After a successful snapshot the shard deletes persistent events (messages and
+  # snapshots), keeping this number of old batches. A batch is of size `snapshot-after`.
+  # When set to 0, all messages with an equal or lower sequence number are deleted as
+  # soon as the snapshot has completed successfully. The default of 2 leaves at most
+  # 2 * `snapshot-after` messages and 3 snapshots (2 old ones + the fresh snapshot).
+  keep-nr-of-batches = 2
+
   # Setting for the default shard allocation strategy
   least-shard-allocation-strategy {
     # Threshold of how large the difference between most and least number of
diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterShardingSettings.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterShardingSettings.scala
index 5e31ce23a4..91675e7353 100644
--- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterShardingSettings.scala
+++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterShardingSettings.scala
@@ -37,6 +37,7 @@ object ClusterShardingSettings {
       entityRestartBackoff = config.getDuration("entity-restart-backoff", MILLISECONDS).millis,
       rebalanceInterval = config.getDuration("rebalance-interval", MILLISECONDS).millis,
       snapshotAfter = config.getInt("snapshot-after"),
+      keepNrOfBatches = config.getInt("keep-nr-of-batches"),
       leastShardAllocationRebalanceThreshold =
         config.getInt("least-shard-allocation-strategy.rebalance-threshold"),
       leastShardAllocationMaxSimultaneousRebalance =
@@ -87,6 +88,7 @@ object ClusterShardingSettings {
   val entityRestartBackoff: FiniteDuration,
   val rebalanceInterval: FiniteDuration,
   val snapshotAfter: Int,
+  val keepNrOfBatches: Int,
   val leastShardAllocationRebalanceThreshold: Int,
   val leastShardAllocationMaxSimultaneousRebalance: Int,
   val waitingForStateTimeout: FiniteDuration,
@@ -99,6 +101,44 @@ object ClusterShardingSettings {
     entityRecoveryStrategy == "all" || entityRecoveryStrategy == "constant",
     s"Unknown 'entity-recovery-strategy' [$entityRecoveryStrategy], valid values are 'all' or 'constant'")
 
+  // included for binary compatibility
+  def this(
+    coordinatorFailureBackoff: FiniteDuration,
+    retryInterval: FiniteDuration,
+    bufferSize: Int,
+    handOffTimeout: FiniteDuration,
+    shardStartTimeout: FiniteDuration,
+    shardFailureBackoff: FiniteDuration,
+    entityRestartBackoff: FiniteDuration,
+    rebalanceInterval: FiniteDuration,
+    snapshotAfter: Int,
+    leastShardAllocationRebalanceThreshold: Int,
+    leastShardAllocationMaxSimultaneousRebalance: Int,
+    waitingForStateTimeout: FiniteDuration,
+    updatingStateTimeout: FiniteDuration,
+    entityRecoveryStrategy: String,
+    entityRecoveryConstantRateStrategyFrequency: FiniteDuration,
+    entityRecoveryConstantRateStrategyNumberOfEntities: Int) = {
+    this(
+      coordinatorFailureBackoff,
+      retryInterval,
+      bufferSize,
+      handOffTimeout,
+      shardStartTimeout,
+      shardFailureBackoff,
+      entityRestartBackoff,
+      rebalanceInterval,
+      snapshotAfter,
+      2, // keepNrOfBatches: default from reference.conf
+      leastShardAllocationRebalanceThreshold,
+      leastShardAllocationMaxSimultaneousRebalance,
+      waitingForStateTimeout,
+      updatingStateTimeout,
+      entityRecoveryStrategy,
+      entityRecoveryConstantRateStrategyFrequency,
+      entityRecoveryConstantRateStrategyNumberOfEntities)
+  }
+
   // included for binary compatibility
   def this(
     coordinatorFailureBackoff: FiniteDuration,
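To make the retention arithmetic above concrete, here is a small self-contained sketch using the default values from reference.conf (illustrative only; the object name is hypothetical and not part of the patch):

object KeepNrOfBatchesArithmetic extends App {
  val snapshotAfter = 1000 // default from reference.conf
  val keepNrOfBatches = 2  // default from reference.conf

  // Suppose a snapshot completes at sequence number 5000. Events up to
  // 5000 - 2 * 1000 = 3000 become eligible for deletion, so the last two
  // batches of events (3001..5000) survive, together with the snapshots
  // that cover them.
  val snapshotSequenceNr = 5000L
  val deleteToSequenceNr = snapshotSequenceNr - keepNrOfBatches * snapshotAfter
  println(s"events up to sequenceNr $deleteToSequenceNr may be deleted") // 3000
}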
diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala
index 323a45d87b..c8155758f0 100644
--- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala
+++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala
@@ -4,6 +4,7 @@
 package akka.cluster.sharding
 
 import java.net.URLEncoder
+
 import akka.actor.ActorLogging
 import akka.actor.ActorRef
 import akka.actor.ActorSystem
@@ -11,12 +12,9 @@ import akka.actor.Deploy
 import akka.actor.Props
 import akka.actor.Terminated
 import akka.cluster.sharding.Shard.ShardCommand
-import akka.persistence.PersistentActor
-import akka.persistence.SnapshotOffer
+import akka.persistence._
 import akka.actor.Actor
-import akka.persistence.RecoveryCompleted
-import akka.persistence.SaveSnapshotFailure
-import akka.persistence.SaveSnapshotSuccess
+
 import scala.concurrent.Future
 import scala.concurrent.duration.FiniteDuration
 import akka.cluster.Cluster
@@ -210,7 +208,7 @@ private[akka] class Shard(
   }
 
   def receiveTerminated(ref: ActorRef): Unit = {
-    if (handOffStopper.exists(_ == ref))
+    if (handOffStopper.contains(ref))
       context stop self
     else if (idByRef.contains(ref) && handOffStopper.isEmpty)
       entityTerminated(ref)
@@ -401,7 +399,7 @@ private[akka] class PersistentShard(
   import Shard._
   import settings.tuningParameters._
 
-  override def persistenceId = s"/sharding/${typeName}Shard/${shardId}"
+  override def persistenceId = s"/sharding/${typeName}Shard/$shardId"
 
   override def journalPluginId: String = settings.journalPluginId
@@ -435,10 +433,38 @@
   override def receiveCommand: Receive = ({
-    case _: SaveSnapshotSuccess ⇒
+    case SaveSnapshotSuccess(m) ⇒
       log.debug("PersistentShard snapshot saved successfully")
+      /*
+       * delete old events but keep the latest around because
+       *
+       * it's not safe to delete all events immediately because snapshots are typically stored with a weaker
+       * consistency level, which means that a replay might "see" the deleted events before it sees the stored
+       * snapshot, i.e. it will use an older snapshot and then not replay the full sequence of events
+       *
+       * for debugging, if something goes wrong in production it's very useful to be able to inspect the events
+       */
+      val deleteToSequenceNr = m.sequenceNr - keepNrOfBatches * snapshotAfter
+      if (deleteToSequenceNr > 0) {
+        deleteMessages(deleteToSequenceNr)
+      }
+
     case SaveSnapshotFailure(_, reason) ⇒
       log.warning("PersistentShard snapshot failure: {}", reason.getMessage)
+
+    case DeleteMessagesSuccess(toSequenceNr) ⇒
+      log.debug("PersistentShard messages to {} deleted successfully", toSequenceNr)
+      deleteSnapshots(SnapshotSelectionCriteria(maxSequenceNr = toSequenceNr - 1))
+
+    case DeleteMessagesFailure(reason, toSequenceNr) ⇒
+      log.warning("PersistentShard messages to {} deletion failure: {}", toSequenceNr, reason.getMessage)
+
+    case DeleteSnapshotSuccess(m) ⇒
+      log.debug("PersistentShard snapshots matching {} deleted successfully", m)
+
+    case DeleteSnapshotFailure(m, reason) ⇒
+      log.warning("PersistentShard snapshots matching {} deletion failure: {}", m, reason.getMessage)
+
   }: Receive).orElse(super.receiveCommand)
 
 }
@@ -617,8 +643,8 @@ final class AllAtOnceEntityRecoveryStrategy extends EntityRecoveryStrategy {
 final class ConstantRateEntityRecoveryStrategy(actorSystem: ActorSystem,
                                                frequency: FiniteDuration, numberOfEntities: Int) extends EntityRecoveryStrategy {
   import ShardRegion.EntityId
-  import akka.pattern.after
   import actorSystem.dispatcher
+  import akka.pattern.after
 
   override def recoverEntities(entities: Set[EntityId]): Set[Future[Set[EntityId]]] =
     entities.grouped(numberOfEntities).foldLeft((frequency, Set[Future[Set[EntityId]]]())) {
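The handler added above follows a general pruning pattern for any PersistentActor: delete events only after the snapshot has been confirmed, and delete the snapshots that covered those events only after the events themselves are confirmed gone. A minimal sketch of that pattern, under the assumption of hypothetical names (PruningExample, its persistenceId, and the hard-coded tuning values are not from the patch):

import akka.persistence._

class PruningExample extends PersistentActor {
  override def persistenceId = "pruning-example" // hypothetical id
  val snapshotAfter = 1000
  val keepNrOfBatches = 2

  override def receiveRecover: Receive = { case _ ⇒ }

  override def receiveCommand: Receive = {
    case SaveSnapshotSuccess(m) ⇒
      // Keep the newest keepNrOfBatches * snapshotAfter events: snapshot
      // stores are often weaker-consistency than the journal, so a replay
      // may observe the deletes before the fresh snapshot and must still
      // recover from an older snapshot plus the surviving events.
      val deleteToSequenceNr = m.sequenceNr - keepNrOfBatches * snapshotAfter
      if (deleteToSequenceNr > 0) deleteMessages(deleteToSequenceNr)

    case DeleteMessagesSuccess(toSequenceNr) ⇒
      // Events are confirmed gone; now drop the snapshots that covered
      // them, keeping the snapshot at toSequenceNr itself.
      deleteSnapshots(SnapshotSelectionCriteria(maxSequenceNr = toSequenceNr - 1))
  }
}

In the patch the corresponding failure messages (DeleteMessagesFailure, DeleteSnapshotFailure) are only logged; a missed cleanup is effectively retried after the next successful snapshot, which deletes up to a higher sequence number.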
diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala
index 003ea15771..585e9e0087 100644
--- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala
+++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala
@@ -803,11 +803,37 @@ class PersistentShardCoordinator(typeName: String, settings: ClusterShardingSett
   }: Receive).orElse[Any, Unit](receiveTerminated).orElse[Any, Unit](receiveSnapshotResult)
 
   def receiveSnapshotResult: Receive = {
-    case SaveSnapshotSuccess(_) ⇒
+    case SaveSnapshotSuccess(m) ⇒
       log.debug("Persistent snapshot saved successfully")
+      /*
+       * delete old events but keep the latest around because
+       *
+       * it's not safe to delete all events immediately because snapshots are typically stored with a weaker
+       * consistency level, which means that a replay might "see" the deleted events before it sees the stored
+       * snapshot, i.e. it will use an older snapshot and then not replay the full sequence of events
+       *
+       * for debugging, if something goes wrong in production it's very useful to be able to inspect the events
+       */
+      val deleteToSequenceNr = m.sequenceNr - keepNrOfBatches * snapshotAfter
+      if (deleteToSequenceNr > 0) {
+        deleteMessages(deleteToSequenceNr)
+      }
+
     case SaveSnapshotFailure(_, reason) ⇒
       log.warning("Persistent snapshot failure: {}", reason.getMessage)
+
+    case DeleteMessagesSuccess(toSequenceNr) ⇒
+      log.debug("Persistent messages to {} deleted successfully", toSequenceNr)
+      deleteSnapshots(SnapshotSelectionCriteria(maxSequenceNr = toSequenceNr - 1))
+
+    case DeleteMessagesFailure(reason, toSequenceNr) ⇒
+      log.warning("Persistent messages to {} deletion failure: {}", toSequenceNr, reason.getMessage)
+
+    case DeleteSnapshotSuccess(m) ⇒
+      log.debug("Persistent snapshots matching {} deleted successfully", m)
+
+    case DeleteSnapshotFailure(m, reason) ⇒
+      log.warning("Persistent snapshots matching {} deletion failure: {}", m, reason.getMessage)
   }
 
   def update[E <: DomainEvent](evt: E)(f: E ⇒ Unit): Unit = {
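The spec below pins keep-nr-of-batches = 0, so every successful snapshot deletes all messages with an equal or lower sequence number and the journal stays minimal between snapshots. For comparison, an application-level override could look like the following sketch (the object name and values are illustrative, not recommendations from the patch):

import akka.actor.ActorSystem
import com.typesafe.config.ConfigFactory

object TunedShardingConfig extends App {
  // Hypothetical tuning: snapshot more often, keep a single old batch.
  val tuned = ConfigFactory.parseString(
    """
    akka.cluster.sharding.snapshot-after = 500
    akka.cluster.sharding.keep-nr-of-batches = 1
    """).withFallback(ConfigFactory.load())

  val system = ActorSystem("Sharding", tuned)
}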
diff --git a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/RemoveInternalClusterShardingDataSpec.scala b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/RemoveInternalClusterShardingDataSpec.scala
index 2d9e246703..1e6745be0f 100644
--- a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/RemoveInternalClusterShardingDataSpec.scala
+++ b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/RemoveInternalClusterShardingDataSpec.scala
@@ -39,6 +39,7 @@ object RemoveInternalClusterShardingDataSpec {
     akka.persistence.snapshot-store.local.dir = "target/snapshots-RemoveInternalClusterShardingDataSpec"
     akka.cluster.sharding.snapshot-after = 5
     akka.cluster.sharding.state-store-mode = persistence
+    akka.cluster.sharding.keep-nr-of-batches = 0
     """
 
   val extractEntityId: ShardRegion.ExtractEntityId = {
@@ -97,11 +98,11 @@ class RemoveInternalClusterShardingDataSpec extends AkkaSpec(RemoveInternalClust
     "akka.persistence.journal.leveldb.dir",
     "akka.persistence.snapshot-store.local.dir").map(s ⇒ new File(system.settings.config.getString(s)))
 
-  override protected def atStartup() {
+  override protected def atStartup(): Unit = {
     storageLocations.foreach(dir ⇒ if (dir.exists) FileUtils.deleteDirectory(dir))
   }
 
-  override protected def afterTermination() {
+  override protected def afterTermination(): Unit = {
     storageLocations.foreach(dir ⇒ if (dir.exists) FileUtils.deleteDirectory(dir))
   }
 
@@ -122,8 +123,8 @@ class RemoveInternalClusterShardingDataSpec extends AkkaSpec(RemoveInternalClust
     "setup sharding" in {
       Cluster(system).join(Cluster(system).selfAddress)
       val settings = ClusterShardingSettings(system)
-      ClusterSharding(system).start("type1", Props[EchoActor], settings, extractEntityId, extractShardId)
-      ClusterSharding(system).start("type2", Props[EchoActor], settings, extractEntityId, extractShardId)
+      ClusterSharding(system).start("type1", Props[EchoActor](), settings, extractEntityId, extractShardId)
+      ClusterSharding(system).start("type2", Props[EchoActor](), settings, extractEntityId, extractShardId)
     }
 
     "work when no data" in within(10.seconds) {
@@ -132,7 +133,7 @@ class RemoveInternalClusterShardingDataSpec extends AkkaSpec(RemoveInternalClust
       val rm = system.actorOf(RemoveInternalClusterShardingData.RemoveOnePersistenceId.props(
         journalPluginId = "", persistenceId("type1"), testActor))
       watch(rm)
-      expectMsg(Result(Success(Removals(false, false))))
+      expectMsg(Result(Success(Removals(events = false, snapshots = false))))
       expectTerminated(rm)
     }
 
@@ -146,7 +147,7 @@
       val rm = system.actorOf(RemoveInternalClusterShardingData.RemoveOnePersistenceId.props(
         journalPluginId = "", persistenceId("type1"), testActor))
       watch(rm)
-      expectMsg(Result(Success(Removals(true, false))))
+      expectMsg(Result(Success(Removals(events = true, snapshots = false))))
       expectTerminated(rm)
       hasSnapshots("type1") should ===(false)
       hasEvents("type1") should ===(false)
@@ -165,7 +166,7 @@
       val rm = system.actorOf(RemoveInternalClusterShardingData.RemoveOnePersistenceId.props(
         journalPluginId = "", persistenceId("type2"), testActor))
       watch(rm)
-      expectMsg(Result(Success(Removals(true, true))))
+      expectMsg(Result(Success(Removals(events = true, snapshots = true))))
       expectTerminated(rm)
       hasSnapshots("type2") should ===(false)
       hasEvents("type2") should ===(false)
@@ -179,7 +180,7 @@
       Cluster(system).join(Cluster(system).selfAddress)
       val settings = ClusterShardingSettings(system)
       typeNames.foreach { typeName ⇒
-        ClusterSharding(system).start(typeName, Props[EchoActor], settings, extractEntityId, extractShardId)
+        ClusterSharding(system).start(typeName, Props[EchoActor](), settings, extractEntityId, extractShardId)
       }