#21725 cluster-sharding doesn't delete snapshots and messages (#21777)

* #21725 cluster-sharding doesn't delete snapshots and messages
Fixes #21725
Without deleting messages, persistence is polluted with messages that are no longer needed. A naive but bulletproof flow is: snapshot -> delete messages -> delete snapshots.
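
A minimal sketch of that flow in a plain `akka.persistence.PersistentActor` (hypothetical actor, state, and command names; not part of this change):

```scala
import akka.persistence._

// Hypothetical illustration of the snapshot -> delete messages -> delete snapshots flow.
class CleanupExample extends PersistentActor {
  override def persistenceId: String = "cleanup-example"

  private var state: List[String] = Nil

  override def receiveRecover: Receive = {
    case SnapshotOffer(_, snap: List[String] @unchecked) ⇒ state = snap
    case evt: String                                     ⇒ state = evt :: state
  }

  override def receiveCommand: Receive = {
    case "snap" ⇒
      saveSnapshot(state)                                // 1. take a snapshot
    case SaveSnapshotSuccess(metadata) ⇒
      deleteMessages(metadata.sequenceNr)                // 2. delete the journaled messages it covers
    case DeleteMessagesSuccess(toSequenceNr) ⇒
      deleteSnapshots(                                   // 3. finally drop the older snapshots
        SnapshotSelectionCriteria(maxSequenceNr = toSequenceNr - 1))
    case msg: String ⇒
      persist(msg)(m ⇒ state = m :: state)
  }
}
```

The failure replies are left unhandled in this sketch; the actual change below logs them.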

# Please enter the commit message for your changes. Lines starting
# with '#' will be kept; you may remove them yourself if you want to.
# An empty message aborts the commit.
#
# Date:      Mon Oct 31 23:24:37 2016 +0300
#
# interactive rebase in progress; onto 432b53c
# Last command done (1 command done):
#    edit f86b015 21725 cluster-sharding doesn't delete snapshots and messages Fixes #21725 Without deleting messages, persistence is polluted with messages that are no longer needed. A naive but bulletproof flow is: snapshot -> delete messages -> delete snapshots.
# Next command to do (1 remaining command):
#    pick 56adb40 #21725 keeping N batches (messages and snapshots) using N from configuration
# You are currently editing a commit while rebasing branch 'fix-21725-delete-messages-after-snapshot' on '432b53c'.
#
# Changes to be committed:
#	modified:   akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala
#	modified:   akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala
#

* #21725 keeping N batches (messages and snapshots) using N from configuration
Kirill Plyashkevich 2017-02-21 16:17:19 +03:00 committed by Patrik Nordwall
parent 59a3596360
commit c5ba0a3565
5 changed files with 118 additions and 18 deletions


@@ -78,6 +78,13 @@ akka.cluster.sharding {
# Only used when state-store-mode=persistence
snapshot-after = 1000
# After a snapshot has been taken the shard deletes the persistent events
# (messages and snapshots) that are no longer needed, keeping this number
# of old persisted batches. A batch has the size of `snapshot-after`.
# When set to 0, all messages with a sequence number equal to or lower than
# the snapshot's are deleted as soon as the snapshot has completed successfully.
# The default value of 2 keeps at most the last 2 * `snapshot-after` messages
# and 3 snapshots (2 old ones plus the fresh snapshot).
keep-nr-of-batches = 2
# Setting for the default shard allocation strategy
least-shard-allocation-strategy {
# Threshold of how large the difference between most and least number of
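
The new `keep-nr-of-batches` knob above can be overridden like any other sharding setting; a hedged sketch (hypothetical system name and illustrative value) of reading it back through `ClusterShardingSettings`:

```scala
import akka.actor.ActorSystem
import akka.cluster.sharding.ClusterShardingSettings
import com.typesafe.config.ConfigFactory

// Hypothetical override of the setting introduced above; "Example" is a placeholder system name.
object KeepBatchesExample extends App {
  val config = ConfigFactory
    .parseString("akka.cluster.sharding.keep-nr-of-batches = 3")
    .withFallback(ConfigFactory.load())

  val system   = ActorSystem("Example", config)
  val settings = ClusterShardingSettings(system)

  println(settings.keepNrOfBatches) // 3 instead of the default 2
  system.terminate()
}
```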


@@ -37,6 +37,7 @@ object ClusterShardingSettings {
entityRestartBackoff = config.getDuration("entity-restart-backoff", MILLISECONDS).millis,
rebalanceInterval = config.getDuration("rebalance-interval", MILLISECONDS).millis,
snapshotAfter = config.getInt("snapshot-after"),
keepNrOfBatches = config.getInt("keep-nr-of-batches"),
leastShardAllocationRebalanceThreshold =
config.getInt("least-shard-allocation-strategy.rebalance-threshold"),
leastShardAllocationMaxSimultaneousRebalance =
@@ -87,6 +88,7 @@ object ClusterShardingSettings {
val entityRestartBackoff: FiniteDuration,
val rebalanceInterval: FiniteDuration,
val snapshotAfter: Int,
val keepNrOfBatches: Int,
val leastShardAllocationRebalanceThreshold: Int,
val leastShardAllocationMaxSimultaneousRebalance: Int,
val waitingForStateTimeout: FiniteDuration,
@@ -99,6 +101,44 @@ object ClusterShardingSettings {
entityRecoveryStrategy == "all" || entityRecoveryStrategy == "constant",
s"Unknown 'entity-recovery-strategy' [$entityRecoveryStrategy], valid values are 'all' or 'constant'")
// included for binary compatibility
def this(
coordinatorFailureBackoff: FiniteDuration,
retryInterval: FiniteDuration,
bufferSize: Int,
handOffTimeout: FiniteDuration,
shardStartTimeout: FiniteDuration,
shardFailureBackoff: FiniteDuration,
entityRestartBackoff: FiniteDuration,
rebalanceInterval: FiniteDuration,
snapshotAfter: Int,
leastShardAllocationRebalanceThreshold: Int,
leastShardAllocationMaxSimultaneousRebalance: Int,
waitingForStateTimeout: FiniteDuration,
updatingStateTimeout: FiniteDuration,
entityRecoveryStrategy: String,
entityRecoveryConstantRateStrategyFrequency: FiniteDuration,
entityRecoveryConstantRateStrategyNumberOfEntities: Int) = {
this(
coordinatorFailureBackoff,
retryInterval,
bufferSize,
handOffTimeout,
shardStartTimeout,
shardFailureBackoff,
entityRestartBackoff,
rebalanceInterval,
snapshotAfter,
2,
leastShardAllocationRebalanceThreshold,
leastShardAllocationMaxSimultaneousRebalance,
waitingForStateTimeout,
updatingStateTimeout,
entityRecoveryStrategy,
entityRecoveryConstantRateStrategyFrequency,
entityRecoveryConstantRateStrategyNumberOfEntities)
}
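
The constructor above is kept purely for binary compatibility: the old signature forwards to the new one, filling in the same default of 2 for `keepNrOfBatches` that the configuration uses. A generic sketch of the idiom with illustrative names (not from this diff):

```scala
// Illustrative only: a settings class gains keepNrOfBatches while the old
// constructor signature is preserved so already-compiled callers still link.
class TuningSettings(val snapshotAfter: Int, val keepNrOfBatches: Int) {

  // old signature, forwards to the primary constructor with the default of 2
  def this(snapshotAfter: Int) = this(snapshotAfter, 2)
}

// both compile and link:
//   new TuningSettings(1000)     // keepNrOfBatches = 2
//   new TuningSettings(1000, 5)
```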
// included for binary compatibility
def this(
coordinatorFailureBackoff: FiniteDuration,


@@ -4,6 +4,7 @@
package akka.cluster.sharding
import java.net.URLEncoder
import akka.actor.ActorLogging
import akka.actor.ActorRef
import akka.actor.ActorSystem
@@ -11,12 +12,9 @@ import akka.actor.Deploy
import akka.actor.Props
import akka.actor.Terminated
import akka.cluster.sharding.Shard.ShardCommand
import akka.persistence.PersistentActor
import akka.persistence.SnapshotOffer
import akka.persistence._
import akka.actor.Actor
import akka.persistence.RecoveryCompleted
import akka.persistence.SaveSnapshotFailure
import akka.persistence.SaveSnapshotSuccess
import scala.concurrent.Future
import scala.concurrent.duration.FiniteDuration
import akka.cluster.Cluster
@@ -210,7 +208,7 @@ private[akka] class Shard(
}
def receiveTerminated(ref: ActorRef): Unit = {
if (handOffStopper.exists(_ == ref))
if (handOffStopper.contains(ref))
context stop self
else if (idByRef.contains(ref) && handOffStopper.isEmpty)
entityTerminated(ref)
@@ -401,7 +399,7 @@ private[akka] class PersistentShard(
import Shard._
import settings.tuningParameters._
override def persistenceId = s"/sharding/${typeName}Shard/${shardId}"
override def persistenceId = s"/sharding/${typeName}Shard/$shardId"
override def journalPluginId: String = settings.journalPluginId
@@ -435,10 +433,38 @@ private[akka] class PersistentShard(
}
override def receiveCommand: Receive = ({
case _: SaveSnapshotSuccess ⇒
case SaveSnapshotSuccess(m) ⇒
log.debug("PersistentShard snapshot saved successfully")
/*
* delete old events but keep the latest around because
*
* it's not safe to delete all events immediately because snapshots are typically stored with a weaker consistency
* level, which means that a replay might "see" the deleted events before it sees the stored snapshot,
* i.e. it will use an older snapshot and then not replay the full sequence of events
*
* for debugging: if something goes wrong in production it's very useful to be able to inspect the events
*/
val deleteToSequenceNr = m.sequenceNr - keepNrOfBatches * snapshotAfter
if (deleteToSequenceNr > 0) {
deleteMessages(deleteToSequenceNr)
}
case SaveSnapshotFailure(_, reason) ⇒
log.warning("PersistentShard snapshot failure: {}", reason.getMessage)
case DeleteMessagesSuccess(toSequenceNr) ⇒
log.debug("PersistentShard messages to {} deleted successfully", toSequenceNr)
deleteSnapshots(SnapshotSelectionCriteria(maxSequenceNr = toSequenceNr - 1))
case DeleteMessagesFailure(reason, toSequenceNr) ⇒
log.warning("PersistentShard messages to {} deletion failure: {}", toSequenceNr, reason.getMessage)
case DeleteSnapshotSuccess(m) ⇒
log.debug("PersistentShard snapshots matching {} deleted successfully", m)
case DeleteSnapshotFailure(m, reason) ⇒
log.warning("PersistentShard snapshots matching {} deletion failure: {}", m, reason.getMessage)
}: Receive).orElse(super.receiveCommand)
}
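
As a worked example of the cut-off computed in the `SaveSnapshotSuccess` handler above, with assumed numbers matching the reference.conf defaults:

```scala
// Assumed values: snapshot-after = 1000, keep-nr-of-batches = 2,
// and a snapshot just saved at sequence number 5000.
val snapshotAfter      = 1000
val keepNrOfBatches    = 2
val snapshotSequenceNr = 5000L

val deleteToSequenceNr = snapshotSequenceNr - keepNrOfBatches * snapshotAfter
// deleteToSequenceNr == 3000: messages up to 3000 are deleted, the newest
// 2 * snapshot-after = 2000 events stay around for a safe replay, and the
// subsequent deleteSnapshots call removes only snapshots taken at sequence
// numbers up to 2999, so the snapshot taken at 5000 is never touched.
```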
@@ -617,8 +643,8 @@ final class AllAtOnceEntityRecoveryStrategy extends EntityRecoveryStrategy {
final class ConstantRateEntityRecoveryStrategy(actorSystem: ActorSystem, frequency: FiniteDuration, numberOfEntities: Int) extends EntityRecoveryStrategy {
import ShardRegion.EntityId
import akka.pattern.after
import actorSystem.dispatcher
import akka.pattern.after
override def recoverEntities(entities: Set[EntityId]): Set[Future[Set[EntityId]]] =
entities.grouped(numberOfEntities).foldLeft((frequency, Set[Future[Set[EntityId]]]())) {


@@ -803,11 +803,37 @@ class PersistentShardCoordinator(typeName: String, settings: ClusterShardingSett
}: Receive).orElse[Any, Unit](receiveTerminated).orElse[Any, Unit](receiveSnapshotResult)
def receiveSnapshotResult: Receive = {
case SaveSnapshotSuccess(_) ⇒
case SaveSnapshotSuccess(m) ⇒
log.debug("Persistent snapshot saved successfully")
/*
* delete old events but keep the latest around because
*
* it's not safe to delete all events immediately because snapshots are typically stored with a weaker consistency
* level, which means that a replay might "see" the deleted events before it sees the stored snapshot,
* i.e. it will use an older snapshot and then not replay the full sequence of events
*
* for debugging: if something goes wrong in production it's very useful to be able to inspect the events
*/
val deleteToSequenceNr = m.sequenceNr - keepNrOfBatches * snapshotAfter
if (deleteToSequenceNr > 0) {
deleteMessages(deleteToSequenceNr)
}
case SaveSnapshotFailure(_, reason) ⇒
log.warning("Persistent snapshot failure: {}", reason.getMessage)
case DeleteMessagesSuccess(toSequenceNr) ⇒
log.debug("Persistent messages to {} deleted successfully", toSequenceNr)
deleteSnapshots(SnapshotSelectionCriteria(maxSequenceNr = toSequenceNr - 1))
case DeleteMessagesFailure(reason, toSequenceNr) ⇒
log.warning("Persistent messages to {} deletion failure: {}", toSequenceNr, reason.getMessage)
case DeleteSnapshotSuccess(m) ⇒
log.debug("Persistent snapshots matching {} deleted successfully", m)
case DeleteSnapshotFailure(m, reason) ⇒
log.warning("Persistent snapshots matching {} deletion failure: {}", m, reason.getMessage)
}
def update[E <: DomainEvent](evt: E)(f: E ⇒ Unit): Unit = {


@@ -39,6 +39,7 @@ object RemoveInternalClusterShardingDataSpec {
akka.persistence.snapshot-store.local.dir = "target/snapshots-RemoveInternalClusterShardingDataSpec"
akka.cluster.sharding.snapshot-after = 5
akka.cluster.sharding.state-store-mode = persistence
akka.cluster.sharding.keep-nr-of-batches = 0
"""
val extractEntityId: ShardRegion.ExtractEntityId = {
@@ -97,11 +98,11 @@ class RemoveInternalClusterShardingDataSpec extends AkkaSpec(RemoveInternalClust
"akka.persistence.journal.leveldb.dir",
"akka.persistence.snapshot-store.local.dir").map(s new File(system.settings.config.getString(s)))
override protected def atStartup() {
override protected def atStartup(): Unit = {
storageLocations.foreach(dir ⇒ if (dir.exists) FileUtils.deleteDirectory(dir))
}
override protected def afterTermination() {
override protected def afterTermination(): Unit = {
storageLocations.foreach(dir ⇒ if (dir.exists) FileUtils.deleteDirectory(dir))
}
@@ -122,8 +123,8 @@ class RemoveInternalClusterShardingDataSpec extends AkkaSpec(RemoveInternalClust
"setup sharding" in {
Cluster(system).join(Cluster(system).selfAddress)
val settings = ClusterShardingSettings(system)
ClusterSharding(system).start("type1", Props[EchoActor], settings, extractEntityId, extractShardId)
ClusterSharding(system).start("type2", Props[EchoActor], settings, extractEntityId, extractShardId)
ClusterSharding(system).start("type1", Props[EchoActor](), settings, extractEntityId, extractShardId)
ClusterSharding(system).start("type2", Props[EchoActor](), settings, extractEntityId, extractShardId)
}
"work when no data" in within(10.seconds) {
@@ -132,7 +133,7 @@ class RemoveInternalClusterShardingDataSpec extends AkkaSpec(RemoveInternalClust
val rm = system.actorOf(RemoveInternalClusterShardingData.RemoveOnePersistenceId.props(
journalPluginId = "", persistenceId("type1"), testActor))
watch(rm)
expectMsg(Result(Success(Removals(false, false))))
expectMsg(Result(Success(Removals(events = false, snapshots = false))))
expectTerminated(rm)
}
@@ -146,7 +147,7 @@ class RemoveInternalClusterShardingDataSpec extends AkkaSpec(RemoveInternalClust
val rm = system.actorOf(RemoveInternalClusterShardingData.RemoveOnePersistenceId.props(
journalPluginId = "", persistenceId("type1"), testActor))
watch(rm)
expectMsg(Result(Success(Removals(true, false))))
expectMsg(Result(Success(Removals(events = true, snapshots = false))))
expectTerminated(rm)
hasSnapshots("type1") should ===(false)
hasEvents("type1") should ===(false)
@@ -165,7 +166,7 @@ class RemoveInternalClusterShardingDataSpec extends AkkaSpec(RemoveInternalClust
val rm = system.actorOf(RemoveInternalClusterShardingData.RemoveOnePersistenceId.props(
journalPluginId = "", persistenceId("type2"), testActor))
watch(rm)
expectMsg(Result(Success(Removals(true, true))))
expectMsg(Result(Success(Removals(events = true, snapshots = true))))
expectTerminated(rm)
hasSnapshots("type2") should ===(false)
hasEvents("type2") should ===(false)
@@ -179,7 +180,7 @@ class RemoveInternalClusterShardingDataSpec extends AkkaSpec(RemoveInternalClust
Cluster(system).join(Cluster(system).selfAddress)
val settings = ClusterShardingSettings(system)
typeNames.foreach { typeName ⇒
ClusterSharding(system).start(typeName, Props[EchoActor], settings, extractEntityId, extractShardId)
ClusterSharding(system).start(typeName, Props[EchoActor](), settings, extractEntityId, extractShardId)
}
}