Typed Persistence: deleting snapshots / events #24698

This commit is contained in:
Helena Edelson 2019-03-18 05:22:52 -07:00 committed by Johan Andrén
parent 440a85aef1
commit d358a0c3b5
12 changed files with 380 additions and 54 deletions

View file

@ -52,14 +52,14 @@ final case class SnapshotFailed(metadata: SnapshotMetadata, failure: Throwable)
def getSnapshotMetadata(): SnapshotMetadata = metadata
}
final case class DeleteSnapshotCompleted(target: DeletionTarget) extends EventSourcedSignal {
final case class DeleteSnapshotsCompleted(target: DeletionTarget) extends EventSourcedSignal {
/**
* Java API
*/
def getTarget(): DeletionTarget = target
}
final case class DeleteSnapshotFailed(target: DeletionTarget, failure: Throwable) extends EventSourcedSignal {
final case class DeleteSnapshotsFailed(target: DeletionTarget, failure: Throwable) extends EventSourcedSignal {
/**
* Java API
@ -72,6 +72,26 @@ final case class DeleteSnapshotFailed(target: DeletionTarget, failure: Throwable
def getTarget(): DeletionTarget = target
}
final case class DeleteMessagesCompleted(toSequenceNr: Long) extends EventSourcedSignal {
/**
* Java API
*/
def getToSequenceNr(): Long = toSequenceNr
}
final case class DeleteMessagesFailed(toSequenceNr: Long, failure: Throwable) extends EventSourcedSignal {
/**
* Java API
*/
def getFailure(): Throwable = failure
/**
* Java API
*/
def getToSequenceNr(): Long = toSequenceNr
}
/**
* Not for user extension
*/

View file

@ -0,0 +1,55 @@
/*
* Copyright (C) 2019 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.persistence.typed
/**
* Setup snapshot and event delete/retention behavior. Retention bridges snapshot
* and journal behavior. This defines the retention criteria.
*
* @param snapshotEveryNEvents Snapshots are used to reduce playback/recovery times.
* This defines when a new snapshot is persisted.
*
* @param keepNSnapshots After a snapshot is successfully completed,
* - if 2: retain last maximum 2 *`snapshot-size` events
* and 3 snapshots (2 old + latest snapshot)
* - if 0: all events with equal or lower sequence number
* will not be retained.
*
* @param deleteEventsOnSnapshot Opt-in ability to delete older events on successful
* save of snapshot. Defaults to disabled.
*/
final case class RetentionCriteria(snapshotEveryNEvents: Long, keepNSnapshots: Long, deleteEventsOnSnapshot: Boolean) {
/**
* Delete Messages:
* {{{ toSequenceNr - keepNSnapshots * snapshotEveryNEvents }}}
* Delete Snapshots:
* {{{ (toSequenceNr - 1) - (keepNSnapshots * snapshotEveryNEvents) }}}
*
* @param lastSequenceNr the sequence number to delete to if `deleteEventsOnSnapshot` is false
*/
def toSequenceNumber(lastSequenceNr: Long): Long = {
// Delete old events, retain the latest
lastSequenceNr - (keepNSnapshots * snapshotEveryNEvents)
}
}
object RetentionCriteria {
def apply(): RetentionCriteria =
RetentionCriteria(snapshotEveryNEvents = 1000L, keepNSnapshots = 2L, deleteEventsOnSnapshot = false)
/** Scala API. */
def apply(snapshotEveryNEvents: Long, keepNSnapshots: Long): RetentionCriteria =
RetentionCriteria(snapshotEveryNEvents, keepNSnapshots, deleteEventsOnSnapshot = false)
/** Java API. */
def create(snapshotEveryNEvents: Long, keepNSnapshots: Long): RetentionCriteria =
apply(snapshotEveryNEvents, keepNSnapshots)
/** Java API. */
def create(snapshotEveryNEvents: Long, keepNSnapshots: Long, deleteMessagesOnSnapshot: Boolean): RetentionCriteria =
apply(snapshotEveryNEvents, keepNSnapshots, deleteMessagesOnSnapshot)
}

View file

@ -15,6 +15,7 @@ import akka.annotation.InternalApi
import akka.persistence._
import akka.persistence.typed.EventAdapter
import akka.persistence.typed.PersistenceId
import akka.persistence.typed.RetentionCriteria
import akka.persistence.typed.scaladsl.EventSourcedBehavior
import akka.util.ConstantFun
import akka.util.OptionVal
@ -35,9 +36,11 @@ private[akka] final class BehaviorSetup[C, E, S](
val eventAdapter: EventAdapter[E, _],
val snapshotWhen: (S, E, Long) Boolean,
val recovery: Recovery,
val retention: RetentionCriteria,
var holdingRecoveryPermit: Boolean,
val settings: EventSourcedSettings,
val stashState: StashState) {
import InternalProtocol.RecoveryTickEvent
import akka.actor.typed.scaladsl.adapter._

View file

@ -7,11 +7,8 @@ package akka.persistence.typed.internal
import java.util.UUID
import java.util.concurrent.atomic.AtomicInteger
import scala.util.Failure
import scala.util.Success
import scala.util.Try
import scala.util.control.NonFatal
import akka.Done
import akka.actor.typed
import akka.actor.typed.BackoffSupervisorStrategy
import akka.actor.typed.Behavior
@ -23,9 +20,14 @@ import akka.actor.typed.scaladsl.ActorContext
import akka.actor.typed.scaladsl.Behaviors
import akka.annotation.InternalApi
import akka.persistence._
import akka.persistence.typed.DeleteMessagesFailed
import akka.persistence.typed.DeleteSnapshotsCompleted
import akka.persistence.typed.DeleteSnapshotsFailed
import akka.persistence.typed.DeletionTarget
import akka.persistence.typed.EventAdapter
import akka.persistence.typed.NoOpEventAdapter
import akka.persistence.typed.PersistenceId
import akka.persistence.typed.RetentionCriteria
import akka.persistence.typed.SnapshotCompleted
import akka.persistence.typed.SnapshotFailed
import akka.persistence.typed.scaladsl._
@ -62,6 +64,7 @@ private[akka] final case class EventSourcedBehaviorImpl[Command, Event, State](
eventAdapter: EventAdapter[Event, Any] = NoOpEventAdapter.instance[Event],
snapshotWhen: (State, Event, Long) Boolean = ConstantFun.scalaAnyThreeToFalse,
recovery: Recovery = Recovery(),
retention: RetentionCriteria = RetentionCriteria(),
supervisionStrategy: SupervisorStrategy = SupervisorStrategy.stop,
override val signalHandler: PartialFunction[Signal, Unit] = PartialFunction.empty)
extends EventSourcedBehavior[Command, Event, State] {
@ -79,9 +82,19 @@ private[akka] final case class EventSourcedBehaviorImpl[Command, Event, State](
val actualSignalHandler: PartialFunction[Signal, Unit] = signalHandler.orElse {
// default signal handler is always the fallback
case SnapshotCompleted(meta: SnapshotMetadata)
ctx.log.debug("Save snapshot successful, snapshot metadata: [{}]", meta)
ctx.log.debug("Save snapshot successful, snapshot metadata [{}]", meta)
case SnapshotFailed(meta, failure)
ctx.log.error(failure, "Save snapshot failed, snapshot metadata: [{}]", meta)
ctx.log.error(failure, "Save snapshot failed, snapshot metadata [{}]", meta)
case DeleteSnapshotsCompleted(DeletionTarget.Individual(meta)) =>
ctx.log.debug(s"Persistent snapshot [{}] deleted successfully.", meta)
case DeleteSnapshotsCompleted(DeletionTarget.Criteria(criteria)) =>
ctx.log.debug(s"Persistent snapshots given criteria [{}] deleted successfully.", criteria)
case DeleteSnapshotsFailed(DeletionTarget.Individual(meta), failure) =>
ctx.log.warning("Failed to delete snapshot with meta [{}] due to [{}].", meta, failure)
case DeleteSnapshotsFailed(DeletionTarget.Criteria(criteria), failure) =>
ctx.log.warning("Failed to delete snapshots given criteria [{}] due to [{}].", criteria, failure)
case DeleteMessagesFailed(toSequenceNr, failure) =>
ctx.log.warning("Failed to delete messages toSequenceNr [{}] due to [{}].", toSequenceNr, failure)
}
Behaviors
@ -99,6 +112,7 @@ private[akka] final case class EventSourcedBehaviorImpl[Command, Event, State](
eventAdapter,
snapshotWhen,
recovery,
retention,
holdingRecoveryPermit = false,
settings = settings,
stashState = stashState)
@ -171,6 +185,9 @@ private[akka] final case class EventSourcedBehaviorImpl[Command, Event, State](
copy(recovery = Recovery(selection))
}
override def withRetention(criteria: RetentionCriteria): EventSourcedBehavior[Command, Event, State] =
copy(retention = criteria)
override def withTagger(tagger: Event => Set[String]): EventSourcedBehavior[Command, Event, State] =
copy(tagger = tagger)

View file

@ -106,6 +106,29 @@ private[akka] trait JournalInteractions[C, E, S] {
} // else, no need to return the permit
}
/**
* On [[akka.persistence.SaveSnapshotSuccess]], if [[akka.persistence.typed.RetentionCriteria.deleteEventsOnSnapshot]]
* is enabled, old messages are deleted based on [[akka.persistence.typed.RetentionCriteria.snapshotEveryNEvents]]
* before old snapshots are deleted.
*/
protected def internalDeleteEvents(e: SaveSnapshotSuccess, state: Running.RunningState[S]): Unit =
if (setup.retention.deleteEventsOnSnapshot) {
val toSequenceNr = setup.retention.toSequenceNumber(e.metadata.sequenceNr)
if (toSequenceNr > 0) {
val lastSequenceNr = state.seqNr
val self = setup.selfUntyped
if (toSequenceNr == Long.MaxValue || toSequenceNr <= lastSequenceNr)
setup.journal ! JournalProtocol.DeleteMessagesTo(e.metadata.persistenceId, toSequenceNr, self)
else
self ! DeleteMessagesFailure(
new RuntimeException(
s"toSequenceNr [$toSequenceNr] must be less than or equal to lastSequenceNr [$lastSequenceNr]"),
toSequenceNr)
}
}
// ---------- snapshot store interactions ---------
/**
@ -125,4 +148,16 @@ private[akka] trait JournalInteractions[C, E, S] {
setup.selfUntyped)
}
/** Deletes the snapshot identified by `sequenceNr`. */
protected def internalDeleteSnapshots(toSequenceNr: Long): Unit = {
setup.log.debug("Deleting snapshot to [{}]", toSequenceNr)
val deleteTo = toSequenceNr - 1
val deleteFrom = math.max(0, setup.retention.toSequenceNumber(deleteTo))
val snapshotCriteria = SnapshotSelectionCriteria(minSequenceNr = deleteFrom, maxSequenceNr = deleteTo)
setup.log.debug("Deleting snapshots from [{}] to [{}]", deleteFrom, deleteTo)
setup.snapshotStore
.tell(SnapshotProtocol.DeleteSnapshots(setup.persistenceId.id, snapshotCriteria), setup.selfUntyped)
}
}

View file

@ -6,6 +6,7 @@ package akka.persistence.typed.internal
import scala.annotation.tailrec
import scala.collection.immutable
import akka.actor.UnhandledMessage
import akka.actor.typed.Behavior
import akka.actor.typed.Signal
@ -17,8 +18,10 @@ import akka.persistence.JournalProtocol._
import akka.persistence._
import akka.persistence.journal.Tagged
import akka.persistence.typed.Callback
import akka.persistence.typed.DeleteSnapshotCompleted
import akka.persistence.typed.DeleteSnapshotFailed
import akka.persistence.typed.DeleteSnapshotsCompleted
import akka.persistence.typed.DeleteSnapshotsFailed
import akka.persistence.typed.DeleteMessagesCompleted
import akka.persistence.typed.DeleteMessagesFailed
import akka.persistence.typed.DeletionTarget
import akka.persistence.typed.EventRejectedException
import akka.persistence.typed.SideEffect
@ -93,10 +96,9 @@ private[akka] object Running {
def onMessage(msg: InternalProtocol): Behavior[InternalProtocol] = msg match {
case IncomingCommand(c: C @unchecked) => onCommand(state, c)
case SnapshotterResponse(r) =>
setup.log.warning("Unexpected SnapshotterResponse {}", r)
Behaviors.unhandled
case _ => Behaviors.unhandled
case SnapshotterResponse(r) => onSnapshotterResponse(r)
case JournalResponse(r) => onJournalResponse(r)
case _ => Behaviors.unhandled
}
override def onSignal: PartialFunction[Signal, Behavior[InternalProtocol]] = {
@ -110,6 +112,53 @@ private[akka] object Running {
applyEffects(cmd, state, effect.asInstanceOf[EffectImpl[E, S]]) // TODO can we avoid the cast?
}
/** Handle journal responses for non-persist events workloads. */
def onJournalResponse(response: JournalProtocol.Response): Behavior[InternalProtocol] = {
val signal = response match {
case DeleteMessagesSuccess(toSequenceNr) =>
setup.log.debug("Persistent messages to [{}] deleted successfully.", toSequenceNr)
internalDeleteSnapshots(toSequenceNr)
Some(DeleteMessagesCompleted(toSequenceNr))
case DeleteMessagesFailure(e, toSequenceNr) =>
Some(DeleteMessagesFailed(toSequenceNr, e))
case _ =>
None
}
signal match {
case Some(sig) =>
setup.onSignal(sig)
this
case None =>
Behaviors.unhandled // unexpected journal response
}
}
/** Handle snapshot responses for non-persist events workloads. */
def onSnapshotterResponse(response: SnapshotProtocol.Response): Behavior[InternalProtocol] = {
val signal = response match {
case DeleteSnapshotsSuccess(criteria) =>
Some(DeleteSnapshotsCompleted(DeletionTarget.Criteria(criteria)))
case DeleteSnapshotsFailure(criteria, error) =>
Some(DeleteSnapshotsFailed(DeletionTarget.Criteria(criteria), error))
case DeleteSnapshotSuccess(meta) =>
Some(DeleteSnapshotsCompleted(DeletionTarget.Individual(meta)))
case DeleteSnapshotFailure(meta, error) =>
Some(DeleteSnapshotsFailed(DeletionTarget.Individual(meta), error))
case _ =>
None
}
signal match {
case Some(sig) =>
setup.onSignal(sig)
this
case None =>
Behaviors.unhandled // unexpected snapshot response
}
}
@tailrec def applyEffects(
msg: Any,
state: RunningState[S],
@ -315,21 +364,25 @@ private[akka] object Running {
def onSnapshotterResponse(response: SnapshotProtocol.Response): Unit = {
val signal = response match {
case SaveSnapshotSuccess(meta) =>
case e @ SaveSnapshotSuccess(meta) =>
// # 24698 The deletion of old events are automatic, snapshots are triggered by the SaveSnapshotSuccess.
setup.log.debug(s"Persistent snapshot [{}] saved successfully", meta)
if (setup.retention.deleteEventsOnSnapshot)
internalDeleteEvents(e, state) // if successful, DeleteMessagesSuccess then internalDeleteSnapshots
else
internalDeleteSnapshots(meta.sequenceNr)
Some(SnapshotCompleted(meta))
case SaveSnapshotFailure(meta, ex) =>
Some(SnapshotFailed(meta, ex))
case DeleteSnapshotSuccess(meta) =>
Some(DeleteSnapshotCompleted(DeletionTarget.Individual(meta)))
case DeleteSnapshotFailure(meta, ex) =>
Some(DeleteSnapshotFailed(DeletionTarget.Individual(meta), ex))
case DeleteSnapshotsSuccess(criteria) =>
Some(DeleteSnapshotCompleted(DeletionTarget.Criteria(criteria)))
case DeleteSnapshotsFailure(criteria, failure) =>
Some(DeleteSnapshotFailed(DeletionTarget.Criteria(criteria), failure))
case _ => None
case SaveSnapshotFailure(meta, error) =>
setup.log.warning("Failed to save snapshot given metadata [{}] due to [{}]", meta, error.getMessage)
Some(SnapshotFailed(meta, error))
case _ =>
None
}
setup.log.debug("Received snapshot event [{}], returning signal [{}].", response, signal)
signal.foreach(setup.onSignal _)
}

View file

@ -128,6 +128,8 @@ abstract class EventSourcedBehavior[Command, Event, State >: Null] private[akka]
def eventAdapter(): EventAdapter[Event, _] = NoOpEventAdapter.instance[Event]
def retentionCriteria: RetentionCriteria = RetentionCriteria()
/**
* INTERNAL API: DeferredBehavior init
*/

View file

@ -4,9 +4,8 @@
package akka.persistence.typed.scaladsl
import scala.util.Try
import scala.annotation.tailrec
import akka.Done
import akka.actor.typed.BackoffSupervisorStrategy
import akka.actor.typed.Behavior
import akka.actor.typed.Behavior.DeferredBehavior
@ -20,6 +19,7 @@ import akka.persistence._
import akka.persistence.typed.EventAdapter
import akka.persistence.typed.ExpectingReply
import akka.persistence.typed.PersistenceId
import akka.persistence.typed.RetentionCriteria
import akka.persistence.typed.internal._
object EventSourcedBehavior {
@ -176,6 +176,11 @@ object EventSourcedBehavior {
*/
def withSnapshotSelectionCriteria(selection: SnapshotSelectionCriteria): EventSourcedBehavior[Command, Event, State]
/**
* Criteria for internal retention/deletion of snapshots and events.
*/
def withRetention(criteria: RetentionCriteria): EventSourcedBehavior[Command, Event, State]
/**
* The `tagger` function should give event tags, which will be used in persistence query
*/

View file

@ -10,6 +10,7 @@ import akka.actor.typed.javadsl.ActorContext;
import akka.actor.typed.javadsl.Behaviors;
import akka.persistence.typed.PersistenceId;
import akka.persistence.typed.RecoveryCompleted;
import akka.persistence.typed.RetentionCriteria;
import akka.persistence.typed.javadsl.CommandHandler;
import akka.persistence.typed.javadsl.EventHandler;
import akka.persistence.typed.javadsl.EventSourcedBehavior;
@ -278,6 +279,12 @@ public class BasicPersistentBehaviorTest {
}
// #snapshottingPredicate
// #retentionCriteria
@Override // override snapshotEvery in EventSourcedBehavior
public RetentionCriteria retentionCriteria() {
return RetentionCriteria.create(1000, 2);
}
// #retentionCriteria
}
}

View file

@ -10,8 +10,10 @@ import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.Future
import scala.concurrent.Promise
import scala.concurrent.duration._
import scala.util.Failure
import scala.util.Success
import scala.util.Try
import akka.Done
import akka.testkit.EventFilter
import akka.actor.testkit.typed.{ TestException, TestKitSettings }
@ -23,29 +25,32 @@ import akka.actor.typed.SupervisorStrategy
import akka.actor.typed.Terminated
import akka.actor.typed.scaladsl.ActorContext
import akka.actor.typed.scaladsl.Behaviors
import akka.persistence.SelectedSnapshot
import akka.persistence.SnapshotMetadata
import akka.persistence.SnapshotSelectionCriteria
import akka.persistence.SelectedSnapshot
import akka.persistence.journal.inmem.InmemJournal
import akka.persistence.query.EventEnvelope
import akka.persistence.query.PersistenceQuery
import akka.persistence.query.Sequence
import akka.persistence.query.journal.leveldb.scaladsl.LeveldbReadJournal
import akka.persistence.snapshot.SnapshotStore
import akka.persistence.typed.DeleteSnapshotsCompleted
import akka.persistence.typed.DeleteMessagesCompleted
import akka.persistence.typed.EventAdapter
import akka.persistence.typed.ExpectingReply
import akka.persistence.typed.PersistenceId
import akka.persistence.typed.RecoveryCompleted
import akka.persistence.typed.SnapshotCompleted
import akka.persistence.typed.SnapshotFailed
import akka.persistence.typed.DeletionTarget.Criteria
import akka.persistence.typed.EventSourcedSignal
import akka.persistence.typed.RetentionCriteria
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Sink
import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory
import org.scalatest.WordSpecLike
import scala.util.Failure
object EventSourcedBehaviorSpec {
//#event-wrapper
@ -69,8 +74,16 @@ object EventSourcedBehaviorSpec {
Future.successful(())
}
def deleteAsync(metadata: SnapshotMetadata) = ???
def deleteAsync(persistenceId: String, criteria: SnapshotSelectionCriteria) = ???
override def deleteAsync(metadata: SnapshotMetadata): Future[Unit] = {
state = state.filterNot { case (k, (_, b)) => k == metadata.persistenceId && b.sequenceNr == metadata.sequenceNr }
Future.successful(())
}
override def deleteAsync(persistenceId: String, criteria: SnapshotSelectionCriteria): Future[Unit] = {
val range = criteria.minSequenceNr to criteria.maxSequenceNr
state = state.filterNot { case (k, (_, b)) => k == persistenceId && range.contains(b.sequenceNr) }
Future.successful(())
}
}
// also used from PersistentActorTest
@ -134,11 +147,18 @@ object EventSourcedBehaviorSpec {
persistenceId,
loggingActor = TestProbe[String].ref,
probe = TestProbe[(State, Event)].ref,
TestProbe[Try[Done]].ref)
snapshotProbe = TestProbe[Try[Done]].ref,
retentionProbe = TestProbe[Try[EventSourcedSignal]].ref)
def counter(ctx: ActorContext[Command], persistenceId: PersistenceId, logging: ActorRef[String])(
implicit system: ActorSystem[_]): EventSourcedBehavior[Command, Event, State] =
counter(ctx, persistenceId, loggingActor = logging, probe = TestProbe[(State, Event)].ref, TestProbe[Try[Done]].ref)
counter(
ctx,
persistenceId,
loggingActor = logging,
probe = TestProbe[(State, Event)].ref,
TestProbe[Try[Done]].ref,
TestProbe[Try[EventSourcedSignal]].ref)
def counterWithProbe(
ctx: ActorContext[Command],
@ -146,22 +166,49 @@ object EventSourcedBehaviorSpec {
probe: ActorRef[(State, Event)],
snapshotProbe: ActorRef[Try[Done]])(
implicit system: ActorSystem[_]): EventSourcedBehavior[Command, Event, State] =
counter(ctx, persistenceId, TestProbe[String].ref, probe, snapshotProbe)
counter(ctx, persistenceId, TestProbe[String].ref, probe, snapshotProbe, TestProbe[Try[EventSourcedSignal]].ref)
def counterWithProbe(ctx: ActorContext[Command], persistenceId: PersistenceId, probe: ActorRef[(State, Event)])(
implicit system: ActorSystem[_]): EventSourcedBehavior[Command, Event, State] =
counter(ctx, persistenceId, TestProbe[String].ref, probe, TestProbe[Try[Done]].ref)
counter(
ctx,
persistenceId,
TestProbe[String].ref,
probe,
TestProbe[Try[Done]].ref,
TestProbe[Try[EventSourcedSignal]].ref)
def counterWithSnapshotProbe(ctx: ActorContext[Command], persistenceId: PersistenceId, probe: ActorRef[Try[Done]])(
implicit system: ActorSystem[_]): EventSourcedBehavior[Command, Event, State] =
counter(ctx, persistenceId, TestProbe[String].ref, TestProbe[(State, Event)].ref, snapshotProbe = probe)
counter(
ctx,
persistenceId,
TestProbe[String].ref,
TestProbe[(State, Event)].ref,
snapshotProbe = probe,
TestProbe[Try[EventSourcedSignal]].ref)
def counterWithSnapshotAndRetentionProbe(
ctx: ActorContext[Command],
persistenceId: PersistenceId,
probeS: ActorRef[Try[Done]],
probeR: ActorRef[Try[EventSourcedSignal]])(
implicit system: ActorSystem[_]): EventSourcedBehavior[Command, Event, State] =
counter(
ctx,
persistenceId,
TestProbe[String].ref,
TestProbe[(State, Event)].ref,
snapshotProbe = probeS,
retentionProbe = probeR)
def counter(
ctx: ActorContext[Command],
persistenceId: PersistenceId,
loggingActor: ActorRef[String],
probe: ActorRef[(State, Event)],
snapshotProbe: ActorRef[Try[Done]]): EventSourcedBehavior[Command, Event, State] = {
snapshotProbe: ActorRef[Try[Done]],
retentionProbe: ActorRef[Try[EventSourcedSignal]]): EventSourcedBehavior[Command, Event, State] = {
EventSourcedBehavior[Command, Event, State](
persistenceId,
emptyState = State(0, Vector.empty),
@ -267,9 +314,10 @@ object EventSourcedBehaviorSpec {
snapshotProbe ! Success(Done)
case SnapshotFailed(_, failure)
snapshotProbe ! Failure(failure)
case e: EventSourcedSignal =>
retentionProbe ! Success(e)
}
}
}
class EventSourcedBehaviorSpec extends ScalaTestWithActorTestKit(EventSourcedBehaviorSpec.conf) with WordSpecLike {
@ -701,7 +749,88 @@ class EventSourcedBehaviorSpec extends ScalaTestWithActorTestKit(EventSourcedBeh
c2 ! Fail
probe.expectTerminated(c2) // should fail
}
}
"delete snapshots automatically, based on criteria" in {
val unexpected = (signal: EventSourcedSignal) => fail(s"Unexpected signal [$signal].")
val pid = nextPid
val snapshotProbe = TestProbe[Try[Done]]()
val retentionProbe = TestProbe[Try[EventSourcedSignal]]()
val replyProbe = TestProbe[State]()
val persistentActor = spawn(
Behaviors.setup[Command](
ctx
counterWithSnapshotAndRetentionProbe(ctx, pid, snapshotProbe.ref, retentionProbe.ref)
.snapshotEvery(2)
.withRetention(
RetentionCriteria(snapshotEveryNEvents = 2, keepNSnapshots = 2, deleteEventsOnSnapshot = false))))
persistentActor ! IncrementWithPersistAll(3)
persistentActor ! GetValue(replyProbe.ref)
replyProbe.expectMessage(State(3, Vector(0, 1, 2)))
snapshotProbe.expectMessage(Try(Done))
retentionProbe.expectMessageType[Success[DeleteSnapshotsCompleted]].value match {
case DeleteSnapshotsCompleted(Criteria(SnapshotSelectionCriteria(maxSequenceNr, _, minSequenceNr, _))) =>
maxSequenceNr shouldEqual 2
minSequenceNr shouldEqual 0
case signal => unexpected(signal)
}
persistentActor ! IncrementWithPersistAll(3)
snapshotProbe.expectMessage(Try(Done))
retentionProbe.expectMessageType[Success[DeleteSnapshotsCompleted]].value match {
case DeleteSnapshotsCompleted(Criteria(SnapshotSelectionCriteria(maxSequenceNr, _, minSequenceNr, _))) =>
maxSequenceNr shouldEqual 5
minSequenceNr shouldEqual 1
case signal => unexpected(signal)
}
persistentActor ! GetValue(replyProbe.ref)
replyProbe.expectMessage(State(6, Vector(0, 1, 2, 3, 4, 5)))
}
"optionally delete both old messages and snapshots" in {
val pid = nextPid
val snapshotProbe = TestProbe[Try[Done]]()
val retentionProbe = TestProbe[Try[EventSourcedSignal]]()
val replyProbe = TestProbe[State]()
val persistentActor = spawn(
Behaviors.setup[Command](
ctx
counterWithSnapshotAndRetentionProbe(ctx, pid, snapshotProbe.ref, retentionProbe.ref)
.snapshotEvery(2)
.withRetention(
RetentionCriteria(snapshotEveryNEvents = 2, keepNSnapshots = 2, deleteEventsOnSnapshot = true))))
persistentActor ! IncrementWithPersistAll(10)
persistentActor ! GetValue(replyProbe.ref)
val initialState = replyProbe.expectMessageType[State]
initialState.value shouldEqual 10
initialState.history shouldEqual (0 until initialState.value).toVector
snapshotProbe.expectMessage(Try(Done))
val firstDeleteMessages = retentionProbe.expectMessageType[Success[DeleteMessagesCompleted]].value
firstDeleteMessages.toSequenceNr shouldEqual 6 // 10 - 2 * 2
retentionProbe.expectMessageType[Success[DeleteSnapshotsCompleted]].value match {
case DeleteSnapshotsCompleted(Criteria(SnapshotSelectionCriteria(maxSequenceNr, _, minSequenceNr, _))) =>
maxSequenceNr shouldEqual 5 // 10 / 2
minSequenceNr shouldEqual 1
case signal => fail(s"Unexpected signal [$signal].")
}
persistentActor ! IncrementWithPersistAll(10)
snapshotProbe.expectMessage(Try(Done))
val secondDeleteMessages = retentionProbe.expectMessageType[Success[DeleteMessagesCompleted]].value
secondDeleteMessages.toSequenceNr shouldEqual 16 // 20 - 2 * 2
persistentActor ! GetValue(replyProbe.ref)
val state = replyProbe.expectMessageType[State]
state.value shouldEqual 20
state.history shouldEqual (0 until state.value).toVector
}
def watcher(toWatch: ActorRef[_]): TestProbe[String] = {

View file

@ -144,3 +144,13 @@ private[persistence] object JournalProtocol {
final case class ReplayMessagesFailure(cause: Throwable) extends Response with DeadLetterSuppression
}
/**
* Reply message to a successful [[JournalProtocol.DeleteMessagesTo]] request.
*/
final case class DeleteMessagesSuccess(toSequenceNr: Long) extends JournalProtocol.Response
/**
* Reply message to a failed [[JournalProtocol.DeleteMessagesTo]] request.
*/
final case class DeleteMessagesFailure(cause: Throwable, toSequenceNr: Long) extends JournalProtocol.Response

View file

@ -6,14 +6,14 @@ package akka.persistence
import java.lang.{ Iterable => JIterable }
import akka.actor._
import akka.japi.Procedure
import akka.japi.Util
import com.typesafe.config.Config
import scala.collection.immutable
import scala.util.control.NoStackTrace
import akka.actor._
import akka.annotation.InternalApi
import akka.japi.Procedure
import akka.japi.Util
import com.typesafe.config.Config
abstract class RecoveryCompleted
@ -29,16 +29,6 @@ case object RecoveryCompleted extends RecoveryCompleted {
def getInstance = this
}
/**
* Reply message to a successful [[Eventsourced#deleteMessages]] request.
*/
final case class DeleteMessagesSuccess(toSequenceNr: Long)
/**
* Reply message to a failed [[Eventsourced#deleteMessages]] request.
*/
final case class DeleteMessagesFailure(cause: Throwable, toSequenceNr: Long)
/**
* Recovery mode configuration object to be returned in [[PersistentActor#recovery]].
*