Provide minSequenceNr for snapshot deletion (#25590)
* Provide minSequenceNr for snapshot deletion Journals can use this to make the bulk deletion more efficient Use keepNrBatches to delete the last few snapshots in case previous deletes failed.
This commit is contained in:
parent
d7c463e033
commit
23b9266fca
3 changed files with 18 additions and 20 deletions
|
|
@ -14,8 +14,8 @@ import akka.actor.Props
|
||||||
import akka.actor.Terminated
|
import akka.actor.Terminated
|
||||||
import akka.cluster.sharding.Shard.ShardCommand
|
import akka.cluster.sharding.Shard.ShardCommand
|
||||||
import akka.actor.Actor
|
import akka.actor.Actor
|
||||||
|
|
||||||
import akka.util.MessageBufferMap
|
import akka.util.MessageBufferMap
|
||||||
|
|
||||||
import scala.concurrent.Future
|
import scala.concurrent.Future
|
||||||
import scala.concurrent.duration.FiniteDuration
|
import scala.concurrent.duration.FiniteDuration
|
||||||
import akka.cluster.Cluster
|
import akka.cluster.Cluster
|
||||||
|
|
@ -23,16 +23,7 @@ import akka.cluster.ddata.ORSet
|
||||||
import akka.cluster.ddata.ORSetKey
|
import akka.cluster.ddata.ORSetKey
|
||||||
import akka.cluster.ddata.Replicator._
|
import akka.cluster.ddata.Replicator._
|
||||||
import akka.actor.Stash
|
import akka.actor.Stash
|
||||||
import akka.persistence.PersistentActor
|
import akka.persistence._
|
||||||
import akka.persistence.SnapshotOffer
|
|
||||||
import akka.persistence.SaveSnapshotSuccess
|
|
||||||
import akka.persistence.DeleteSnapshotsFailure
|
|
||||||
import akka.persistence.DeleteMessagesSuccess
|
|
||||||
import akka.persistence.SaveSnapshotFailure
|
|
||||||
import akka.persistence.DeleteMessagesFailure
|
|
||||||
import akka.persistence.DeleteSnapshotsSuccess
|
|
||||||
import akka.persistence.SnapshotSelectionCriteria
|
|
||||||
import akka.persistence.RecoveryCompleted
|
|
||||||
import akka.actor.NoSerializationVerificationNeeded
|
import akka.actor.NoSerializationVerificationNeeded
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -531,7 +522,7 @@ private[akka] class PersistentShard(
|
||||||
/*
|
/*
|
||||||
* delete old events but keep the latest around because
|
* delete old events but keep the latest around because
|
||||||
*
|
*
|
||||||
* it's not safe to delete all events immediate because snapshots are typically stored with a weaker consistency
|
* it's not safe to delete all events immediately because snapshots are typically stored with a weaker consistency
|
||||||
* level which means that a replay might "see" the deleted events before it sees the stored snapshot,
|
* level which means that a replay might "see" the deleted events before it sees the stored snapshot,
|
||||||
* i.e. it will use an older snapshot and then not replay the full sequence of events
|
* i.e. it will use an older snapshot and then not replay the full sequence of events
|
||||||
*
|
*
|
||||||
|
|
@ -543,20 +534,25 @@ private[akka] class PersistentShard(
|
||||||
}
|
}
|
||||||
|
|
||||||
case SaveSnapshotFailure(_, reason) ⇒
|
case SaveSnapshotFailure(_, reason) ⇒
|
||||||
log.warning("PersistentShard snapshot failure: {}", reason.getMessage)
|
log.warning("PersistentShard snapshot failure: [{}]", reason.getMessage)
|
||||||
|
|
||||||
case DeleteMessagesSuccess(toSequenceNr) ⇒
|
case DeleteMessagesSuccess(toSequenceNr) ⇒
|
||||||
log.debug("PersistentShard messages to {} deleted successfully", toSequenceNr)
|
val deleteTo = toSequenceNr - 1
|
||||||
deleteSnapshots(SnapshotSelectionCriteria(maxSequenceNr = toSequenceNr - 1))
|
val deleteFrom = math.max(0, deleteTo - (keepNrOfBatches * snapshotAfter))
|
||||||
|
log.debug("PersistentShard messages to [{}] deleted successfully. Deleting snapshots from [{}] to [{}]", toSequenceNr, deleteFrom, deleteTo)
|
||||||
|
deleteSnapshots(SnapshotSelectionCriteria(
|
||||||
|
minSequenceNr = deleteFrom,
|
||||||
|
maxSequenceNr = deleteTo
|
||||||
|
))
|
||||||
|
|
||||||
case DeleteMessagesFailure(reason, toSequenceNr) ⇒
|
case DeleteMessagesFailure(reason, toSequenceNr) ⇒
|
||||||
log.warning("PersistentShard messages to {} deletion failure: {}", toSequenceNr, reason.getMessage)
|
log.warning("PersistentShard messages to [{}] deletion failure: [{}]", toSequenceNr, reason.getMessage)
|
||||||
|
|
||||||
case DeleteSnapshotsSuccess(m) ⇒
|
case DeleteSnapshotsSuccess(m) ⇒
|
||||||
log.debug("PersistentShard snapshots matching {} deleted successfully", m)
|
log.debug("PersistentShard snapshots matching [{}] deleted successfully", m)
|
||||||
|
|
||||||
case DeleteSnapshotsFailure(m, reason) ⇒
|
case DeleteSnapshotsFailure(m, reason) ⇒
|
||||||
log.warning("PersistentShard snapshots matching {} deletion failure: {}", m, reason.getMessage)
|
log.warning("PersistentShard snapshots matching [{}] deletion failure: [{}]", m, reason.getMessage)
|
||||||
|
|
||||||
}: Receive).orElse(super.receiveCommand)
|
}: Receive).orElse(super.receiveCommand)
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -40,7 +40,7 @@ object RemoveInternalClusterShardingDataSpec {
|
||||||
akka.persistence.snapshot-store.local.dir = "target/snapshots-RemoveInternalClusterShardingDataSpec"
|
akka.persistence.snapshot-store.local.dir = "target/snapshots-RemoveInternalClusterShardingDataSpec"
|
||||||
akka.cluster.sharding.snapshot-after = 5
|
akka.cluster.sharding.snapshot-after = 5
|
||||||
akka.cluster.sharding.state-store-mode = persistence
|
akka.cluster.sharding.state-store-mode = persistence
|
||||||
|akka.cluster.sharding.keep-nr-of-batches = 0
|
akka.cluster.sharding.keep-nr-of-batches = 0
|
||||||
"""
|
"""
|
||||||
|
|
||||||
val extractEntityId: ShardRegion.ExtractEntityId = {
|
val extractEntityId: ShardRegion.ExtractEntityId = {
|
||||||
|
|
|
||||||
|
|
@ -693,7 +693,9 @@ A persistent actor can delete individual snapshots by calling the `deleteSnapsho
|
||||||
when the snapshot was taken.
|
when the snapshot was taken.
|
||||||
|
|
||||||
To bulk-delete a range of snapshots matching `SnapshotSelectionCriteria`,
|
To bulk-delete a range of snapshots matching `SnapshotSelectionCriteria`,
|
||||||
persistent actors should use the `deleteSnapshots` method.
|
persistent actors should use the `deleteSnapshots` method. Depending on the journal used this might be inefficient. It is
|
||||||
|
best practice to do specific deletes with `deleteSnapshot` or to include a `minSequenceNr` as well as a `maxSequenceNr`
|
||||||
|
for the `SnapshotSelectionCriteria`.
|
||||||
|
|
||||||
### Snapshot status handling
|
### Snapshot status handling
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue