diff --git a/akka-docs/src/main/paradox/typed/persistence-snapshot.md b/akka-docs/src/main/paradox/typed/persistence-snapshot.md index dff463525f..fb3a76dbf1 100644 --- a/akka-docs/src/main/paradox/typed/persistence-snapshot.md +++ b/akka-docs/src/main/paradox/typed/persistence-snapshot.md @@ -7,7 +7,7 @@ prone to accumulating extremely long event logs and experiencing long recovery t may be to split out into a set of shorter lived actors. However, when this is not an option, you can use snapshots to reduce recovery times drastically. -Persistent actors can save snapshots of internal state every N events or when a given predicated of the state +Persistent actors can save snapshots of internal state every N events or when a given predicate of the state is fulfilled. Scala @@ -46,7 +46,9 @@ events. This can be useful if snapshot serialization format has changed in an in not be used when events have been deleted. In order to use snapshots, a default snapshot-store (`akka.persistence.snapshot-store.plugin`) must be configured, -or the @scala[`PersistentActor`]@java[persistent actor] can pick a snapshot store explicitly by overriding @scala[`def snapshotPluginId: String`]@java[`String snapshotPluginId()`]. +or you can pick a snapshot store for for a specific `EventSourcedBehavior by +@scala[defining it with `withSnapshotPluginId` of the `EventSourcedBehavior`]@java[overriding `snapshotPluginId` in +the `EventSourcedBehavior`]. Because some use cases may not benefit from or need snapshots, it is perfectly valid not to not configure a snapshot store. However, Akka will log a warning message when this situation is detected and then continue to operate until @@ -55,10 +57,71 @@ an actor tries to store a snapshot, at which point the operation will fail. ## Snapshot failures Saving snapshots can either succeed or fail – this information is reported back to the persistent actor via -the `onSnapshot` callback. Snapshot failures are, by default, logged but do not cause the actor to stop or -restart. +the `SnapshotCompleted` or `SnapshotFailed` signal. Snapshot failures are logged by default but do not cause +the actor to stop or restart. If there is a problem with recovering the state of the actor from the journal when the actor is -started, `onRecoveryFailure` is called (logging the error by default), and the actor will be stopped. +started, `RecoveryFailed` signal is emitted (logging the error by default), and the actor will be stopped. Note that failure to load snapshot is also treated like this, but you can disable loading of snapshots if you for example know that serialization format has changed in an incompatible way. + +## Snapshot deletion + +To free up space, an event sourced actor can automatically delete older snapshots based on the given `RetentionCriteria`. + +Scala +: @@snip [BasicPersistentBehaviorCompileOnly.scala](/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BasicPersistentBehaviorCompileOnly.scala) { #retentionCriteria } + +Java +: @@snip [BasicPersistentBehaviorTest.java](/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/BasicPersistentBehaviorTest.java) { #retentionCriteria #snapshottingPredicate } + +Snapshot deletion is triggered after saving a new snapshot. + +The above example will save snapshots automatically every `numberOfEvents = 100`. Snapshots that have sequence +number less than the sequence number of the saved snapshot minus `keepNSnapshots * numberOfEvents` (`100 * 2`) are automatically +deleted. + +In addition, it will also save a snapshot when the persisted event is `BookingCompleted`. Automatic snapshotting +based on `numberOfEvents` can be used without specifying @scala[`snapshotWhen`]@java[`shouldSnapshot`]. Snapshots +triggered by the @scala[`snapshotWhen`]@java[`shouldSnapshot`] predicate will not trigger deletion of old snapshots. + +On async deletion, either a `DeleteSnapshotsCompleted` or `DeleteSnapshotsFailed` signal is emitted. +You can react to signal outcomes by using @scala[with `receiveSignal` handler] @java[by overriding `receiveSignal`]. +By default, successful completion is logged by the system at log level `debug`, failures at log level `warning`. + +Scala +: @@snip [BasicPersistentBehaviorCompileOnly.scala](/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BasicPersistentBehaviorCompileOnly.scala) { #retentionCriteriaWithSignals } + +Java +: @@snip [BasicPersistentBehaviorTest.java](/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/BasicPersistentBehaviorTest.java) { #retentionCriteriaWithSignals } + +## Event deletion + +Deleting events in event sourcing based applications is typically either not used at all, or used in conjunction with snapshotting. +By deleting events you will lose the history of how the system changed before it reached current state, which is +one of the main reasons for using event sourcing in the first place. + +If snapshot-based retention is enabled, after a snapshot has been successfully stored, a delete of the events +(journaled by a single event sourced actor) up until the sequence number of the data held by that snapshot can be issued. + +To elect to use this, enable `withDeleteEventsOnSnapshot` of the `RetentionCriteria` which is disabled by default. + +Scala +: @@snip [BasicPersistentBehaviorCompileOnly.scala](/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BasicPersistentBehaviorCompileOnly.scala) { #snapshotAndEventDeletes } + +Java +: @@snip [BasicPersistentBehaviorTest.java](/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/BasicPersistentBehaviorTest.java) { #snapshotAndEventDeletes } + +Event deletion is triggered after saving a new snapshot. Old events would be deleted prior to old snapshots being deleted. + +On async deletion, either a `DeleteEventsCompleted` or `DeleteEventsFailed` signal is emitted. +You can react to signal outcomes by using @scala[with `receiveSignal` handler] @java[by overriding `receiveSignal`]. +By default, successful completion is logged by the system at log level `debug`, failures at log level `warning`. + +Message deletion does not affect the highest sequence number of the journal, even if all messages were deleted from it after a delete occurs. + +@@@ note + +It is up to the journal implementation whether events are actually removed from storage. + +@@@ diff --git a/akka-docs/src/main/paradox/typed/persistence.md b/akka-docs/src/main/paradox/typed/persistence.md index cb23513fa6..3d476fb565 100644 --- a/akka-docs/src/main/paradox/typed/persistence.md +++ b/akka-docs/src/main/paradox/typed/persistence.md @@ -453,54 +453,3 @@ processed. It's allowed to stash messages while unstashing. Those newly added commands will not be processed by the `unstashAll` effect that was in progress and have to be unstashed by another `unstashAll`. - -## Retention - snapshots and events - -Retention of snapshots and events are controlled by a few factors. Deletes to free up space is currently available. - -### Snapshot deletion - -To free up space, an event sourced actor can automatically delete older snapshots -based on a user provided or default `RetentionCriteria` @scala[from `withRetention`] @java[by overriding `retentionCriteria`] -combined with the `snapshotWhen` method. - -Scala -: @@snip [BasicPersistentBehaviorCompileOnly.scala](/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BasicPersistentBehaviorCompileOnly.scala) { #retentionCriteria } - -Java -: @@snip [BasicPersistentBehaviorTest.java](/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/BasicPersistentBehaviorTest.java) { #retentionCriteria } - -On async deletion, either a `SnapshotCompleted` or `SnapshotFailed` is emitted. Successful completion is logged by the system at log level `debug`, failures at log level `warning`. -You can leverage `EventSourcedSignal` to react to outcomes @scala[with `receiveSignal` handler] @java[by overriding `receiveSignal`]. - -Scala -: @@snip [BasicPersistentBehaviorCompileOnly.scala](/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BasicPersistentBehaviorCompileOnly.scala) { #retentionCriteriaWithSignals } - -Java -: @@snip [BasicPersistentBehaviorTest.java](/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/BasicPersistentBehaviorTest.java) { #retentionCriteriaWithSignals } - -## Event deletion - -Deleting events in event sourcing based applications is typically either not used at all, or used in conjunction with snapshotting. -If snapshot-based recovery is enabled, after a snapshot has been successfully stored, a delete (journaled by a single event sourced actor) up until the sequence number of the data held by that snapshot can be issued. - -To elect to use this, enable `RetentionCriteria.deleteEventsOnSnapshot` which is disabled by default. -You can leverage `EventSourcedSignal` to react to outcomes @scala[with `receiveSignal` handler] @java[by overriding `receiveSignal`]. - -Scala -: @@snip [BasicPersistentBehaviorCompileOnly.scala](/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BasicPersistentBehaviorCompileOnly.scala) { #snapshotAndEventDeletes } - -Java -: @@snip [BasicPersistentBehaviorTest.java](/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/BasicPersistentBehaviorTest.java) { #retentionCriteria } - -On `SaveSnapshotSuccess`, old events would be deleted based on `RetentionCriteria` prior to old snapshots being deleted. On async deletion, either `DeleteEventsCompleted` or `DeleteEventsFailed` is emitted. Successful completion is logged by the -system at log level `debug`, failures at log level `warning`. - -Message deletion does not affect the highest sequence number of the journal, even if all messages were deleted from it after a delete occurs. - -@@@ note - -It is up to the journal implementation whether events are actually removed from storage. -Deleting events prevents future replaying of old events to apply new state. - -@@@ note diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/Retention.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/Retention.scala deleted file mode 100644 index 227bb1390d..0000000000 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/Retention.scala +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Copyright (C) 2019 Lightbend Inc. - */ - -package akka.persistence.typed - -/** - * Setup snapshot and event delete/retention behavior. Retention bridges snapshot - * and journal behavior. This defines the retention criteria. - * - * @param snapshotEveryNEvents Snapshots are used to reduce playback/recovery times. - * This defines when a new snapshot is persisted - on every N events. - * `snapshotEveryNEvents` should be greater than 0. - * - * @param keepNSnapshots After a snapshot is successfully completed, - * - if 2: retain last maximum 2 *`snapshot-size` events - * and 3 snapshots (2 old + latest snapshot) - * - if 0: all events with equal or lower sequence number - * will not be retained. - * - * @param deleteEventsOnSnapshot Opt-in ability to delete older events on successful - * save of snapshot. Defaults to disabled. - */ -final case class RetentionCriteria(snapshotEveryNEvents: Long, keepNSnapshots: Long, deleteEventsOnSnapshot: Boolean) { - - /** - * Delete Messages: - * {{{ toSequenceNr - keepNSnapshots * snapshotEveryNEvents }}} - * Delete Snapshots: - * {{{ (toSequenceNr - 1) - (keepNSnapshots * snapshotEveryNEvents) }}} - * - * @param lastSequenceNr the sequence number to delete to if `deleteEventsOnSnapshot` is false - */ - def toSequenceNumber(lastSequenceNr: Long): Long = { - // Delete old events, retain the latest - math.max(0, lastSequenceNr - (keepNSnapshots * snapshotEveryNEvents)) - } - - /** Java API. */ - def withDeleteEventsOnSnapshot(): RetentionCriteria = - copy(deleteEventsOnSnapshot = true) -} - -object RetentionCriteria { - - val disabled: RetentionCriteria = - RetentionCriteria(snapshotEveryNEvents = 0, keepNSnapshots = 0, deleteEventsOnSnapshot = false) - - def snapshotEvery(numberOfEvents: Long, keepNSnapshots: Long): RetentionCriteria = - apply(numberOfEvents, keepNSnapshots, false) - -} diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/BehaviorSetup.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/BehaviorSetup.scala index 9d2d3635c2..f9243fb095 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/BehaviorSetup.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/BehaviorSetup.scala @@ -16,11 +16,21 @@ import akka.annotation.InternalApi import akka.persistence._ import akka.persistence.typed.EventAdapter import akka.persistence.typed.PersistenceId -import akka.persistence.typed.RetentionCriteria import akka.persistence.typed.scaladsl.EventSourcedBehavior +import akka.persistence.typed.scaladsl.RetentionCriteria import akka.util.ConstantFun import akka.util.OptionVal +/** + * INTERNAL API + */ +@InternalApi private[akka] object BehaviorSetup { + sealed trait SnapshotAfterPersist + case object NoSnapshot extends SnapshotAfterPersist + case object SnapshotWithRetention extends SnapshotAfterPersist + case object SnapshotWithoutRetention extends SnapshotAfterPersist +} + /** * INTERNAL API: Carry state for the Persistent behavior implementation behaviors. */ @@ -44,6 +54,7 @@ private[akka] final class BehaviorSetup[C, E, S]( import InternalProtocol.RecoveryTickEvent import akka.actor.typed.scaladsl.adapter._ + import BehaviorSetup._ val persistence: Persistence = Persistence(context.system.toUntyped) @@ -121,6 +132,18 @@ private[akka] final class BehaviorSetup[C, E, S]( } } + def shouldSnapshot(state: S, event: E, sequenceNr: Long): SnapshotAfterPersist = { + retention match { + case DisabledRetentionCriteria => + if (snapshotWhen(state, event, sequenceNr)) SnapshotWithoutRetention + else NoSnapshot + case s: SnapshotCountRetentionCriteriaImpl => + if (s.snapshotWhen(sequenceNr)) SnapshotWithRetention + else if (snapshotWhen(state, event, sequenceNr)) SnapshotWithoutRetention + else NoSnapshot + } + } + } /** diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventSourcedBehaviorImpl.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventSourcedBehaviorImpl.scala index 7759ae7e47..8d6faefdf6 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventSourcedBehaviorImpl.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventSourcedBehaviorImpl.scala @@ -7,8 +7,6 @@ package akka.persistence.typed.internal import java.util.UUID import java.util.concurrent.atomic.AtomicInteger -import scala.util.control.NonFatal - import akka.actor.typed import akka.actor.typed.BackoffSupervisorStrategy import akka.actor.typed.Behavior @@ -31,10 +29,10 @@ import akka.persistence.typed.DeletionTarget import akka.persistence.typed.EventAdapter import akka.persistence.typed.NoOpEventAdapter import akka.persistence.typed.PersistenceId -import akka.persistence.typed.RetentionCriteria import akka.persistence.typed.SnapshotCompleted import akka.persistence.typed.SnapshotFailed import akka.persistence.typed.SnapshotSelectionCriteria +import akka.persistence.typed.scaladsl.RetentionCriteria import akka.persistence.typed.scaladsl._ import akka.util.ConstantFun @@ -164,9 +162,6 @@ private[akka] final case class EventSourcedBehaviorImpl[Command, Event, State]( handler: PartialFunction[(State, Signal), Unit]): EventSourcedBehavior[Command, Event, State] = copy(signalHandler = handler) - override def snapshotWhen(predicate: (State, Event, Long) => Boolean): EventSourcedBehavior[Command, Event, State] = - copy(snapshotWhen = predicate) - override def withJournalPluginId(id: String): EventSourcedBehavior[Command, Event, State] = { require(id != null, "journal plugin id must not be null; use empty string for 'default' journal") copy(journalPluginId = if (id != "") Some(id) else None) @@ -182,12 +177,11 @@ private[akka] final case class EventSourcedBehaviorImpl[Command, Event, State]( copy(recovery = Recovery(selection.toUntyped)) } - override def withRetention(criteria: RetentionCriteria): EventSourcedBehavior[Command, Event, State] = { - require( - criteria.snapshotEveryNEvents > 0, - s"'snapshotEveryNEvents' must be positive but was ${criteria.snapshotEveryNEvents}") - copy(retention = criteria, snapshotWhen = (_, _, seqNr) => seqNr % criteria.snapshotEveryNEvents == 0) - } + override def snapshotWhen(predicate: (State, Event, Long) => Boolean): EventSourcedBehavior[Command, Event, State] = + copy(snapshotWhen = predicate) + + override def withRetention(criteria: RetentionCriteria): EventSourcedBehavior[Command, Event, State] = + copy(retention = criteria) override def withTagger(tagger: Event => Set[String]): EventSourcedBehavior[Command, Event, State] = copy(tagger = tagger) diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventSourcedSettings.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventSourcedSettings.scala index 05327fd4fb..f3f9748d53 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventSourcedSettings.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventSourcedSettings.scala @@ -11,7 +11,6 @@ import scala.concurrent.duration._ import akka.actor.typed.ActorSystem import akka.annotation.InternalApi import akka.persistence.Persistence -import akka.persistence.Persistence.verifyPluginConfigIsDefined import com.typesafe.config.Config /** diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ExternalInteractions.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ExternalInteractions.scala index 2fa2b017a6..84edca5ece 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ExternalInteractions.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ExternalInteractions.scala @@ -105,26 +105,21 @@ private[akka] trait JournalInteractions[C, E, S] { } /** - * On [[akka.persistence.SaveSnapshotSuccess]], if [[akka.persistence.typed.RetentionCriteria.deleteEventsOnSnapshot]] - * is enabled, old messages are deleted based on [[akka.persistence.typed.RetentionCriteria.snapshotEveryNEvents]] + * On [[akka.persistence.SaveSnapshotSuccess]], if `SnapshotCountRetentionCriteria.deleteEventsOnSnapshot` + * is enabled, old messages are deleted based on `SnapshotCountRetentionCriteria.snapshotEveryNEvents` * before old snapshots are deleted. */ - protected def internalDeleteEvents(e: SaveSnapshotSuccess, state: Running.RunningState[S]): Unit = - if (setup.retention.deleteEventsOnSnapshot) { - val toSequenceNr = setup.retention.toSequenceNumber(e.metadata.sequenceNr) + protected def internalDeleteEvents(lastSequenceNr: Long, toSequenceNr: Long): Unit = + if (toSequenceNr > 0) { + val self = setup.selfUntyped - if (toSequenceNr > 0) { - val lastSequenceNr = state.seqNr - val self = setup.selfUntyped - - if (toSequenceNr == Long.MaxValue || toSequenceNr <= lastSequenceNr) - setup.journal ! JournalProtocol.DeleteMessagesTo(e.metadata.persistenceId, toSequenceNr, self) - else - self ! DeleteMessagesFailure( - new RuntimeException( - s"toSequenceNr [$toSequenceNr] must be less than or equal to lastSequenceNr [$lastSequenceNr]"), - toSequenceNr) - } + if (toSequenceNr == Long.MaxValue || toSequenceNr <= lastSequenceNr) + setup.journal ! JournalProtocol.DeleteMessagesTo(setup.persistenceId.id, toSequenceNr, self) + else + self ! DeleteMessagesFailure( + new RuntimeException( + s"toSequenceNr [$toSequenceNr] must be less than or equal to lastSequenceNr [$lastSequenceNr]"), + toSequenceNr) } } @@ -153,12 +148,8 @@ private[akka] trait SnapshotInteractions[C, E, S] { } /** Deletes the snapshots up to and including the `sequenceNr`. */ - protected def internalDeleteSnapshots(toSequenceNr: Long): Unit = { + protected def internalDeleteSnapshots(fromSequenceNr: Long, toSequenceNr: Long): Unit = { if (toSequenceNr > 0) { - // We could use 0 as fromSequenceNr to delete all older snapshots, but that might be inefficient for - // large ranges depending on how it's implemented in the snapshot plugin. Therefore we use the - // same window as defined for how much to keep in the retention criteria - val fromSequenceNr = setup.retention.toSequenceNumber(toSequenceNr) val snapshotCriteria = SnapshotSelectionCriteria(minSequenceNr = fromSequenceNr, maxSequenceNr = toSequenceNr) setup.log.debug("Deleting snapshots from sequenceNr [{}] to [{}]", fromSequenceNr, toSequenceNr) setup.snapshotStore diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/RetentionCriteriaImpl.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/RetentionCriteriaImpl.scala new file mode 100644 index 0000000000..4bdb43c437 --- /dev/null +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/RetentionCriteriaImpl.scala @@ -0,0 +1,55 @@ +/* + * Copyright (C) 2019 Lightbend Inc. + */ + +package akka.persistence.typed.internal + +import akka.annotation.InternalApi +import akka.persistence.typed.scaladsl +import akka.persistence.typed.javadsl + +/** + * INTERNAL API + */ +@InternalApi private[akka] final case class SnapshotCountRetentionCriteriaImpl( + snapshotEveryNEvents: Int, + keepNSnapshots: Int, + deleteEventsOnSnapshot: Boolean) + extends javadsl.SnapshotCountRetentionCriteria + with scaladsl.SnapshotCountRetentionCriteria { + + require(snapshotEveryNEvents > 0, s"'snapshotEveryNEvents' must be greater than 0, was [$snapshotEveryNEvents]") + require(keepNSnapshots > 0, s"'keepNSnapshots' must be greater than 0, was [$keepNSnapshots]") + + def snapshotWhen(currentSequenceNr: Long): Boolean = + currentSequenceNr % snapshotEveryNEvents == 0 + + def deleteUpperSequenceNr(lastSequenceNr: Long): Long = { + // Delete old events, retain the latest + math.max(0, lastSequenceNr - (keepNSnapshots * snapshotEveryNEvents)) + } + + def deleteLowerSequenceNr(upperSequenceNr: Long): Long = { + // We could use 0 as fromSequenceNr to delete all older snapshots, but that might be inefficient for + // large ranges depending on how it's implemented in the snapshot plugin. Therefore we use the + // same window as defined for how much to keep in the retention criteria + math.max(0, upperSequenceNr - (keepNSnapshots * snapshotEveryNEvents)) + } + + override def withDeleteEventsOnSnapshot: SnapshotCountRetentionCriteriaImpl = + copy(deleteEventsOnSnapshot = true) + + override def asScala: scaladsl.RetentionCriteria = this + + override def asJava: javadsl.RetentionCriteria = this +} + +/** + * INTERNAL API + */ +@InternalApi private[akka] case object DisabledRetentionCriteria + extends javadsl.RetentionCriteria + with scaladsl.RetentionCriteria { + override def asScala: scaladsl.RetentionCriteria = this + override def asJava: javadsl.RetentionCriteria = this +} diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/Running.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/Running.scala index 1e8bf6f38f..0db3416109 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/Running.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/Running.scala @@ -94,6 +94,7 @@ private[akka] object Running { with StashManagement[C, E, S] { import InternalProtocol._ import Running.RunningState + import BehaviorSetup._ private val runningCmdsMdc = MDC.create(setup.persistenceId, MDC.RunningCmds) private val persistingEventsMdc = MDC.create(setup.persistenceId, MDC.PersistingEvents) @@ -151,7 +152,7 @@ private[akka] object Running { val newState2 = internalPersist(newState, eventToPersist) - val shouldSnapshotAfterPersist = setup.snapshotWhen(newState2.state, event, newState2.seqNr) + val shouldSnapshotAfterPersist = setup.shouldSnapshot(newState2.state, event, newState2.seqNr) persistingEvents(newState2, state, numberOfEvents = 1, shouldSnapshotAfterPersist, sideEffects) @@ -161,10 +162,11 @@ private[akka] object Running { // the invalid event, in case such validation is implemented in the event handler. // also, ensure that there is an event handler for each single event var seqNr = state.seqNr - val (newState, shouldSnapshotAfterPersist) = events.foldLeft((state, false)) { + val (newState, shouldSnapshotAfterPersist) = events.foldLeft((state, NoSnapshot: SnapshotAfterPersist)) { case ((currentState, snapshot), event) => seqNr += 1 - val shouldSnapshot = snapshot || setup.snapshotWhen(currentState.state, event, seqNr) + val shouldSnapshot = + if (snapshot == NoSnapshot) setup.shouldSnapshot(currentState.state, event, seqNr) else snapshot (currentState.applyEvent(setup, event), shouldSnapshot) } @@ -214,7 +216,7 @@ private[akka] object Running { state: RunningState[S], visibleState: RunningState[S], // previous state until write success numberOfEvents: Int, - shouldSnapshotAfterPersist: Boolean, + shouldSnapshotAfterPersist: SnapshotAfterPersist, sideEffects: immutable.Seq[SideEffect[S]]): Behavior[InternalProtocol] = { setup.setMdc(persistingEventsMdc) new PersistingEvents(state, visibleState, numberOfEvents, shouldSnapshotAfterPersist, sideEffects) @@ -225,7 +227,7 @@ private[akka] object Running { var state: RunningState[S], var visibleState: RunningState[S], // previous state until write success numberOfEvents: Int, - shouldSnapshotAfterPersist: Boolean, + shouldSnapshotAfterPersist: SnapshotAfterPersist, var sideEffects: immutable.Seq[SideEffect[S]]) extends AbstractBehavior[InternalProtocol] with WithSeqNrAccessible { @@ -264,11 +266,12 @@ private[akka] object Running { if (eventCounter < numberOfEvents) this else { visibleState = state - if (shouldSnapshotAfterPersist && state.state != null) { - internalSaveSnapshot(state) - storingSnapshot(state, sideEffects) - } else + if (shouldSnapshotAfterPersist == NoSnapshot || state.state == null) { tryUnstashOne(applySideEffects(sideEffects, state)) + } else { + internalSaveSnapshot(state) + storingSnapshot(state, sideEffects, shouldSnapshotAfterPersist) + } } } @@ -316,7 +319,10 @@ private[akka] object Running { // =============================================== - def storingSnapshot(state: RunningState[S], sideEffects: immutable.Seq[SideEffect[S]]): Behavior[InternalProtocol] = { + def storingSnapshot( + state: RunningState[S], + sideEffects: immutable.Seq[SideEffect[S]], + snapshotReason: SnapshotAfterPersist): Behavior[InternalProtocol] = { setup.setMdc(storingSnapshotMdc) def onCommand(cmd: IncomingCommand[C]): Behavior[InternalProtocol] = { @@ -332,13 +338,22 @@ private[akka] object Running { def onSaveSnapshotResponse(response: SnapshotProtocol.Response): Unit = { val signal = response match { - case e @ SaveSnapshotSuccess(meta) => - // # 24698 The deletion of old events are automatic, snapshots are triggered by the SaveSnapshotSuccess. + case SaveSnapshotSuccess(meta) => setup.log.debug(s"Persistent snapshot [{}] saved successfully", meta) - if (setup.retention.deleteEventsOnSnapshot) - internalDeleteEvents(e, state) - else - internalDeleteSnapshots(setup.retention.toSequenceNumber(meta.sequenceNr)) + if (snapshotReason == SnapshotWithRetention) { + // deletion of old events and snspahots are triggered by the SaveSnapshotSuccess + setup.retention match { + case DisabledRetentionCriteria => // no further actions + case s @ SnapshotCountRetentionCriteriaImpl(_, _, true) => + // deleteEventsOnSnapshot == true, deletion of old events + val deleteEventsToSeqNr = s.deleteUpperSequenceNr(meta.sequenceNr) + internalDeleteEvents(meta.sequenceNr, deleteEventsToSeqNr) + case s @ SnapshotCountRetentionCriteriaImpl(_, _, false) => + // deleteEventsOnSnapshot == false, deletion of old snapshots + val deleteSnapshotsToSeqNr = s.deleteUpperSequenceNr(meta.sequenceNr) + internalDeleteSnapshots(s.deleteLowerSequenceNr(deleteSnapshotsToSeqNr), deleteSnapshotsToSeqNr) + } + } Some(SnapshotCompleted(SnapshotMetadata.fromUntyped(meta))) @@ -374,7 +389,7 @@ private[akka] object Running { .receiveSignal { case (_, PoisonPill) => // wait for snapshot response before stopping - storingSnapshot(state.copy(receivedPoisonPill = true), sideEffects) + storingSnapshot(state.copy(receivedPoisonPill = true), sideEffects, snapshotReason) case (_, signal) => setup.onSignal(state.state, signal, catchAndLog = false) Behaviors.same @@ -430,10 +445,15 @@ private[akka] object Running { val signal = response match { case DeleteMessagesSuccess(toSequenceNr) => setup.log.debug("Persistent events to sequenceNr [{}] deleted successfully.", toSequenceNr) - // The reason for -1 is that a snapshot at the exact toSequenceNr is still useful and the events - // after that can be replayed after that snapshot, but replaying the events after toSequenceNr without - // starting at the snapshot at toSequenceNr would be invalid. - internalDeleteSnapshots(toSequenceNr - 1) + setup.retention match { + case DisabledRetentionCriteria => // no further actions + case s: SnapshotCountRetentionCriteriaImpl => + // The reason for -1 is that a snapshot at the exact toSequenceNr is still useful and the events + // after that can be replayed after that snapshot, but replaying the events after toSequenceNr without + // starting at the snapshot at toSequenceNr would be invalid. + val deleteSnapshotsToSeqNr = toSequenceNr - 1 + internalDeleteSnapshots(s.deleteLowerSequenceNr(deleteSnapshotsToSeqNr), deleteSnapshotsToSeqNr) + } Some(DeleteEventsCompleted(toSequenceNr)) case DeleteMessagesFailure(e, toSequenceNr) => Some(DeleteEventsFailed(toSequenceNr, e)) diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/EventSourcedBehavior.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/EventSourcedBehavior.scala index c889b896c7..26be99c0a1 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/EventSourcedBehavior.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/EventSourcedBehavior.scala @@ -17,6 +17,7 @@ import akka.annotation.InternalApi import akka.persistence.typed.EventAdapter import akka.persistence.typed._ import akka.persistence.typed.internal._ +import akka.util.unused @ApiMayChange abstract class EventSourcedBehavior[Command, Event, State] private[akka] ( @@ -100,18 +101,6 @@ abstract class EventSourcedBehavior[Command, Event, State] private[akka] ( protected final def newEventHandlerBuilder(): EventHandlerBuilder[State, Event] = EventHandlerBuilder.builder[State, Event]() - /** - * Initiates a snapshot if the given function returns true. - * When persisting multiple events at once the snapshot is triggered after all the events have - * been persisted. - * - * receives the State, Event and the sequenceNr used for the Event - * - * @return `true` if snapshot should be saved for the given event - * @see [[EventSourcedBehavior#snapshotEvery]] - */ - def shouldSnapshot(state: State, event: Event, sequenceNr: Long): Boolean = false - /** * Override and define the journal plugin id that this actor should use instead of the default. */ @@ -132,26 +121,43 @@ abstract class EventSourcedBehavior[Command, Event, State] private[akka] ( */ def snapshotSelectionCriteria: SnapshotSelectionCriteria = SnapshotSelectionCriteria.latest + /** + * Initiates a snapshot if the given predicate evaluates to true. + * + * Decide to store a snapshot based on the State, Event and sequenceNr when the event has + * been successfully persisted. + * + * When persisting multiple events at once the snapshot is triggered after all the events have + * been persisted. + * + * Snapshots triggered by `snapshotWhen` will not trigger deletes of old snapshots and events if + * [[EventSourcedBehavior.retentionCriteria]] with [[RetentionCriteria.snapshotEvery]] is used together with + * `shouldSnapshot`. Such deletes are only triggered by snapshots matching the `numberOfEvents` in the + * [[RetentionCriteria]]. + * + * @return `true` if snapshot should be saved at the given `state`, `event` and `sequenceNr` when the event has + * been successfully persisted + */ + def shouldSnapshot(@unused state: State, @unused event: Event, @unused sequenceNr: Long): Boolean = false + + /** + * Criteria for retention/deletion of snapshots and events. + * By default, retention is disabled and snapshots are not saved and deleted automatically. + */ + def retentionCriteria: RetentionCriteria = RetentionCriteria.disabled + /** * The `tagger` function should give event tags, which will be used in persistence query */ - def tagsFor(event: Event): java.util.Set[String] = Collections.emptySet() + def tagsFor(@unused event: Event): java.util.Set[String] = Collections.emptySet() def eventAdapter(): EventAdapter[Event, _] = NoOpEventAdapter.instance[Event] - def retentionCriteria: RetentionCriteria = RetentionCriteria.disabled - /** * INTERNAL API: DeferredBehavior init */ @InternalApi override def apply(context: typed.TypedActorContext[Command]): Behavior[Command] = { - val snapshotWhen: (State, Event, Long) => Boolean = { (state, event, seqNr) => - val n = retentionCriteria.snapshotEveryNEvents - if (n > 0) - seqNr % n == 0 - else - shouldSnapshot(state, event, seqNr) - } + val snapshotWhen: (State, Event, Long) => Boolean = (state, event, seqNr) => shouldSnapshot(state, event, seqNr) val tagger: Event => Set[String] = { event => import scala.collection.JavaConverters._ @@ -167,6 +173,7 @@ abstract class EventSourcedBehavior[Command, Event, State] private[akka] ( eventHandler()(_, _), getClass) .snapshotWhen(snapshotWhen) + .withRetention(retentionCriteria.asScala) .withTagger(tagger) .eventAdapter(eventAdapter()) .withJournalPluginId(journalPluginId) diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/RetentionCriteria.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/RetentionCriteria.scala new file mode 100644 index 0000000000..5a320de9c9 --- /dev/null +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/RetentionCriteria.scala @@ -0,0 +1,50 @@ +/* + * Copyright (C) 2019 Lightbend Inc. + */ + +package akka.persistence.typed.javadsl + +import akka.annotation.DoNotInherit +import akka.persistence.typed.internal.DisabledRetentionCriteria +import akka.persistence.typed.internal.SnapshotCountRetentionCriteriaImpl + +/** + * Criteria for retention/deletion of snapshots and events. + */ +abstract class RetentionCriteria { + def asScala: akka.persistence.typed.scaladsl.RetentionCriteria +} + +/** + * Criteria for retention/deletion of snapshots and events. + */ +object RetentionCriteria { + + /** + * Snapshots are not saved and deleted automatically, events are not deleted. + */ + val disabled: RetentionCriteria = DisabledRetentionCriteria + + /** + * Save snapshots automatically every `numberOfEvents`. Snapshots that have sequence number + * less than the sequence number of the saved snapshot minus `keepNSnapshots * numberOfEvents` are automatically + * deleted. + * + * Use [[SnapshotCountRetentionCriteria.withDeleteEventsOnSnapshot]] to + * delete old events. Events are not deleted by default. + */ + def snapshotEvery(numberOfEvents: Int, keepNSnapshots: Int): SnapshotCountRetentionCriteria = + SnapshotCountRetentionCriteriaImpl(numberOfEvents, keepNSnapshots, deleteEventsOnSnapshot = false) + +} + +@DoNotInherit abstract class SnapshotCountRetentionCriteria extends RetentionCriteria { + + /** + * Delete events after saving snapshot via [[RetentionCriteria.snapshotEvery()]]. + * Events that have sequence number less than the snapshot sequence number minus + * `keepNSnapshots * numberOfEvents` are deleted. + */ + def withDeleteEventsOnSnapshot: SnapshotCountRetentionCriteria + +} diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/EventSourcedBehavior.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/EventSourcedBehavior.scala index 28854ed96a..a64c9c55c4 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/EventSourcedBehavior.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/EventSourcedBehavior.scala @@ -17,7 +17,6 @@ import akka.annotation.DoNotInherit import akka.persistence.typed.EventAdapter import akka.persistence.typed.ExpectingReply import akka.persistence.typed.PersistenceId -import akka.persistence.typed.RetentionCriteria import akka.persistence.typed.SnapshotSelectionCriteria import akka.persistence.typed.internal._ @@ -136,15 +135,6 @@ object EventSourcedBehavior { */ def signalHandler: PartialFunction[(State, Signal), Unit] - /** - * Initiates a snapshot if the given function returns true. - * When persisting multiple events at once the snapshot is triggered after all the events have - * been persisted. - * - * `predicate` receives the State, Event and the sequenceNr used for the Event - */ - def snapshotWhen(predicate: (State, Event, Long) => Boolean): EventSourcedBehavior[Command, Event, State] - /** * Change the journal plugin id that this actor should use. */ @@ -166,7 +156,24 @@ object EventSourcedBehavior { def withSnapshotSelectionCriteria(selection: SnapshotSelectionCriteria): EventSourcedBehavior[Command, Event, State] /** - * Criteria for internal retention/deletion of snapshots and events. + * Initiates a snapshot if the given `predicate` evaluates to true. + * + * Decide to store a snapshot based on the State, Event and sequenceNr when the event has + * been successfully persisted. + * + * When persisting multiple events at once the snapshot is triggered after all the events have + * been persisted. + * + * Snapshots triggered by `snapshotWhen` will not trigger deletes of old snapshots and events if + * [[EventSourcedBehavior.withRetention]] with [[RetentionCriteria.snapshotEvery]] is used together with + * `snapshotWhen`. Such deletes are only triggered by snapshots matching the `numberOfEvents` in the + * [[RetentionCriteria]]. + */ + def snapshotWhen(predicate: (State, Event, Long) => Boolean): EventSourcedBehavior[Command, Event, State] + + /** + * Criteria for retention/deletion of snapshots and events. + * By default, retention is disabled and snapshots are not saved and deleted automatically. */ def withRetention(criteria: RetentionCriteria): EventSourcedBehavior[Command, Event, State] diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/RetentionCriteria.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/RetentionCriteria.scala new file mode 100644 index 0000000000..f81dc1d2a4 --- /dev/null +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/RetentionCriteria.scala @@ -0,0 +1,49 @@ +/* + * Copyright (C) 2019 Lightbend Inc. + */ + +package akka.persistence.typed.scaladsl + +import akka.annotation.DoNotInherit +import akka.persistence.typed.internal.DisabledRetentionCriteria +import akka.persistence.typed.internal.SnapshotCountRetentionCriteriaImpl + +/** + * Criteria for retention/deletion of snapshots and events. + */ +trait RetentionCriteria { + def asJava: akka.persistence.typed.javadsl.RetentionCriteria +} + +/** + * Criteria for retention/deletion of snapshots and events. + */ +object RetentionCriteria { + + /** + * Snapshots are not saved and deleted automatically, events are not deleted. + */ + val disabled: RetentionCriteria = DisabledRetentionCriteria + + /** + * Save snapshots automatically every `numberOfEvents`. Snapshots that have sequence number + * less than sequence number of the saved snapshot minus `keepNSnapshots * numberOfEvents` are + * automatically deleted. + * + * Use [[SnapshotCountRetentionCriteria.withDeleteEventsOnSnapshot]] to + * delete old events. Events are not deleted by default. + */ + def snapshotEvery(numberOfEvents: Int, keepNSnapshots: Int): SnapshotCountRetentionCriteria = + SnapshotCountRetentionCriteriaImpl(numberOfEvents, keepNSnapshots, deleteEventsOnSnapshot = false) + +} + +@DoNotInherit trait SnapshotCountRetentionCriteria extends RetentionCriteria { + + /** + * Delete events after saving snapshot via [[RetentionCriteria.snapshotEvery()]]. + * Events that have sequence number less than the snapshot sequence number minus + * `keepNSnapshots * numberOfEvents` are deleted. + */ + def withDeleteEventsOnSnapshot: SnapshotCountRetentionCriteria +} diff --git a/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorJavaDslTest.java b/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorJavaDslTest.java index 091751eee7..508e3cd007 100644 --- a/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorJavaDslTest.java +++ b/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorJavaDslTest.java @@ -411,6 +411,7 @@ public class PersistentActorJavaDslTest extends JUnitSuite { public boolean shouldSnapshot(State state, Incremented event, long sequenceNr) { return state.value % 2 == 0; } + @Override public SignalHandler signalHandler() { return newSignalHandlerBuilder() diff --git a/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/BasicPersistentBehaviorTest.java b/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/BasicPersistentBehaviorTest.java index 6e5325873f..7c4d678799 100644 --- a/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/BasicPersistentBehaviorTest.java +++ b/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/BasicPersistentBehaviorTest.java @@ -8,19 +8,18 @@ import akka.actor.typed.Behavior; import akka.actor.typed.SupervisorStrategy; import akka.actor.typed.javadsl.ActorContext; import akka.actor.typed.javadsl.Behaviors; +import akka.persistence.typed.DeleteEventsFailed; +import akka.persistence.typed.DeleteSnapshotsFailed; import akka.persistence.typed.PersistenceId; import akka.persistence.typed.RecoveryCompleted; -import akka.persistence.typed.RetentionCriteria; import akka.persistence.typed.SnapshotFailed; -import akka.persistence.typed.DeleteSnapshotsFailed; -import akka.persistence.typed.DeleteEventsFailed; import akka.persistence.typed.javadsl.CommandHandler; import akka.persistence.typed.javadsl.EventHandler; import akka.persistence.typed.javadsl.EventSourcedBehavior; +import akka.persistence.typed.javadsl.RetentionCriteria; import akka.persistence.typed.javadsl.SignalHandler; import java.time.Duration; - import java.util.ArrayList; import java.util.List; import java.util.Set; @@ -278,8 +277,7 @@ public class BasicPersistentBehaviorTest { // #retentionCriteria @Override // override retentionCriteria in EventSourcedBehavior public RetentionCriteria retentionCriteria() { - // to also delete events use `RetentionCriteria.withDeleteEvents()` - return RetentionCriteria.snapshotEvery(1000, 2); + return RetentionCriteria.snapshotEvery(100, 2); } // #retentionCriteria @@ -308,6 +306,19 @@ public class BasicPersistentBehaviorTest { } // #retentionCriteriaWithSignals } + + public static class Snapshotting2 extends Snapshotting { + public Snapshotting2(PersistenceId persistenceId) { + super(persistenceId); + } + + // #snapshotAndEventDeletes + @Override // override retentionCriteria in EventSourcedBehavior + public RetentionCriteria retentionCriteria() { + return RetentionCriteria.snapshotEvery(100, 2).withDeleteEventsOnSnapshot(); + } + // #snapshotAndEventDeletes + } } interface WithActorContext { diff --git a/akka-persistence-typed/src/test/scala/akka/persistence/typed/internal/RetentionCriteriaSpec.scala b/akka-persistence-typed/src/test/scala/akka/persistence/typed/internal/RetentionCriteriaSpec.scala new file mode 100644 index 0000000000..daaa4d8a37 --- /dev/null +++ b/akka-persistence-typed/src/test/scala/akka/persistence/typed/internal/RetentionCriteriaSpec.scala @@ -0,0 +1,71 @@ +/* + * Copyright (C) 2019 Lightbend Inc. + */ + +package akka.persistence.typed.internal + +import akka.persistence.typed.scaladsl.RetentionCriteria +import org.scalatest.Matchers +import org.scalatest.TestSuite +import org.scalatest.WordSpecLike + +class RetentionCriteriaSpec extends TestSuite with Matchers with WordSpecLike { + + "RetentionCriteria" must { + + "snapshotWhen the sequenceNr matches numberOfEvents" in { + val criteria = RetentionCriteria.snapshotEvery(3, 2).asInstanceOf[SnapshotCountRetentionCriteriaImpl] + criteria.snapshotWhen(1) should ===(false) + criteria.snapshotWhen(2) should ===(false) + criteria.snapshotWhen(3) should ===(true) + criteria.snapshotWhen(4) should ===(false) + criteria.snapshotWhen(6) should ===(true) + criteria.snapshotWhen(21) should ===(true) + criteria.snapshotWhen(31) should ===(false) + } + + "have valid sequenceNr range based on keepNSnapshots" in { + val criteria = RetentionCriteria.snapshotEvery(3, 2).asInstanceOf[SnapshotCountRetentionCriteriaImpl] + val expected = List( + 1 -> (0 -> 0), + 3 -> (0 -> 0), + 4 -> (0 -> 0), + 6 -> (0 -> 0), + 7 -> (0 -> 1), + 9 -> (0 -> 3), + 10 -> (0 -> 4), + 12 -> (0 -> 6), + 13 -> (1 -> 7), + 15 -> (3 -> 9), + 18 -> (6 -> 12), + 20 -> (8 -> 14)) + expected.foreach { + case (seqNr, (lower, upper)) => + withClue(s"seqNr=$seqNr:") { + criteria.deleteUpperSequenceNr(seqNr) should ===(upper) + criteria.deleteLowerSequenceNr(upper) should ===(lower) + } + } + } + + "require keepNSnapshots >= 1" in { + RetentionCriteria.snapshotEvery(100, 1) // ok + intercept[IllegalArgumentException] { + RetentionCriteria.snapshotEvery(100, 0) + } + intercept[IllegalArgumentException] { + RetentionCriteria.snapshotEvery(100, -1) + } + } + + "require numberOfEvents >= 1" in { + RetentionCriteria.snapshotEvery(1, 2) // ok + intercept[IllegalArgumentException] { + RetentionCriteria.snapshotEvery(0, 0) + } + intercept[IllegalArgumentException] { + RetentionCriteria.snapshotEvery(-1, -1) + } + } + } +} diff --git a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorRetentionSpec.scala b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorRetentionSpec.scala new file mode 100644 index 0000000000..fb11e9742e --- /dev/null +++ b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorRetentionSpec.scala @@ -0,0 +1,567 @@ +/* + * Copyright (C) 2017-2019 Lightbend Inc. + */ + +package akka.persistence.typed.scaladsl + +import java.util.UUID +import java.util.concurrent.atomic.AtomicInteger + +import scala.util.Failure +import scala.util.Success +import scala.util.Try + +import akka.actor.testkit.typed.scaladsl._ +import akka.actor.typed.ActorRef +import akka.actor.typed.ActorSystem +import akka.actor.typed.Behavior +import akka.actor.typed.Terminated +import akka.actor.typed.scaladsl.ActorContext +import akka.actor.typed.scaladsl.Behaviors +import akka.persistence.typed.DeleteEventsCompleted +import akka.persistence.typed.DeleteSnapshotsCompleted +import akka.persistence.typed.DeletionTarget +import akka.persistence.typed.EventSourcedSignal +import akka.persistence.typed.PersistenceId +import akka.persistence.typed.RecoveryCompleted +import akka.persistence.typed.SnapshotCompleted +import akka.persistence.typed.SnapshotFailed +import akka.persistence.typed.SnapshotMetadata +import akka.persistence.typed.SnapshotSelectionCriteria +import akka.testkit.EventFilter +import akka.testkit.TestEvent.Mute +import akka.util.unused +import com.typesafe.config.Config +import com.typesafe.config.ConfigFactory +import org.scalatest.WordSpecLike + +object EventSourcedBehaviorRetentionSpec { + + def conf: Config = ConfigFactory.parseString(s""" + akka.loglevel = INFO + akka.loggers = [akka.testkit.TestEventListener] + # akka.persistence.typed.log-stashing = on + akka.persistence.journal.leveldb.dir = "target/typed-persistence-${UUID.randomUUID().toString}" + akka.persistence.journal.plugin = "akka.persistence.journal.leveldb" + akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local" + akka.persistence.snapshot-store.local.dir = "target/typed-persistence-${UUID.randomUUID().toString}" + """) + + sealed trait Command + final case object Increment extends Command + final case class IncrementWithPersistAll(nr: Int) extends Command + final case class GetValue(replyTo: ActorRef[State]) extends Command + final case object StopIt extends Command + + sealed trait Event + final case class Incremented(delta: Int) extends Event + + final case class State(value: Int, history: Vector[Int]) + + def counter(persistenceId: PersistenceId)(implicit system: ActorSystem[_]): Behavior[Command] = + Behaviors.setup(ctx => counter(ctx, persistenceId)) + + def counter(ctx: ActorContext[Command], persistenceId: PersistenceId)( + implicit system: ActorSystem[_]): EventSourcedBehavior[Command, Event, State] = + counter( + ctx, + persistenceId, + probe = TestProbe[(State, Event)].ref, + snapshotProbe = TestProbe[Try[SnapshotMetadata]].ref, + retentionProbe = TestProbe[Try[EventSourcedSignal]].ref) + + def counterWithProbe( + ctx: ActorContext[Command], + persistenceId: PersistenceId, + probe: ActorRef[(State, Event)], + snapshotProbe: ActorRef[Try[SnapshotMetadata]])( + implicit system: ActorSystem[_]): EventSourcedBehavior[Command, Event, State] = + counter(ctx, persistenceId, probe, snapshotProbe, TestProbe[Try[EventSourcedSignal]].ref) + + def counterWithProbe(ctx: ActorContext[Command], persistenceId: PersistenceId, probe: ActorRef[(State, Event)])( + implicit system: ActorSystem[_]): EventSourcedBehavior[Command, Event, State] = + counter(ctx, persistenceId, probe, TestProbe[Try[SnapshotMetadata]].ref, TestProbe[Try[EventSourcedSignal]].ref) + + def counterWithSnapshotProbe( + ctx: ActorContext[Command], + persistenceId: PersistenceId, + probe: ActorRef[Try[SnapshotMetadata]])( + implicit system: ActorSystem[_]): EventSourcedBehavior[Command, Event, State] = + counter( + ctx, + persistenceId, + TestProbe[(State, Event)].ref, + snapshotProbe = probe, + TestProbe[Try[EventSourcedSignal]].ref) + + def counterWithSnapshotAndRetentionProbe( + ctx: ActorContext[Command], + persistenceId: PersistenceId, + probeS: ActorRef[Try[SnapshotMetadata]], + probeR: ActorRef[Try[EventSourcedSignal]])( + implicit system: ActorSystem[_]): EventSourcedBehavior[Command, Event, State] = + counter(ctx, persistenceId, TestProbe[(State, Event)].ref, snapshotProbe = probeS, retentionProbe = probeR) + + def counter( + @unused ctx: ActorContext[Command], + persistenceId: PersistenceId, + probe: ActorRef[(State, Event)], + snapshotProbe: ActorRef[Try[SnapshotMetadata]], + retentionProbe: ActorRef[Try[EventSourcedSignal]]): EventSourcedBehavior[Command, Event, State] = { + EventSourcedBehavior[Command, Event, State]( + persistenceId, + emptyState = State(0, Vector.empty), + commandHandler = (state, cmd) => + cmd match { + case Increment => + Effect.persist(Incremented(1)) + + case IncrementWithPersistAll(n) => + Effect.persist((0 until n).map(_ => Incremented(1))) + + case GetValue(replyTo) => + replyTo ! state + Effect.none + + case StopIt => + Effect.none.thenStop() + + }, + eventHandler = (state, evt) ⇒ + evt match { + case Incremented(delta) ⇒ + probe ! ((state, evt)) + State(state.value + delta, state.history :+ state.value) + }).receiveSignal { + case (_, RecoveryCompleted) => () + case (_, SnapshotCompleted(metadata)) ⇒ + snapshotProbe ! Success(metadata) + case (_, SnapshotFailed(_, failure)) ⇒ + snapshotProbe ! Failure(failure) + case (_, e: EventSourcedSignal) => + retentionProbe ! Success(e) + } + } +} + +class EventSourcedBehaviorRetentionSpec + extends ScalaTestWithActorTestKit(EventSourcedBehaviorRetentionSpec.conf) + with WordSpecLike { + + import EventSourcedBehaviorRetentionSpec._ + import akka.actor.typed.scaladsl.adapter._ + + // needed for the untyped event filter + implicit val actorSystem = system.toUntyped + + val pidCounter = new AtomicInteger(0) + private def nextPid(): PersistenceId = PersistenceId(s"c${pidCounter.incrementAndGet()})") + + actorSystem.eventStream.publish(Mute(EventFilter.info(pattern = ".*was not delivered.*", occurrences = 100))) + actorSystem.eventStream.publish(Mute(EventFilter.warning(pattern = ".*received dead letter.*", occurrences = 100))) + + "EventSourcedBehavior with retention" must { + + "snapshot every N sequence nrs" in { + val pid = nextPid() + val c = spawn(Behaviors.setup[Command](ctx => + counter(ctx, pid).withRetention(RetentionCriteria.snapshotEvery(numberOfEvents = 2, keepNSnapshots = 2)))) + val watchProbe = watcher(c) + val replyProbe = TestProbe[State]() + + c ! Increment + c ! GetValue(replyProbe.ref) + replyProbe.expectMessage(State(1, Vector(0))) + c ! StopIt + watchProbe.expectMessage("Terminated") + + // no snapshot should have happened + val probeC2 = TestProbe[(State, Event)]() + val snapshotProbe = TestProbe[Try[SnapshotMetadata]]() + val c2 = spawn( + Behaviors.setup[Command](ctx => + counterWithProbe(ctx, pid, probeC2.ref, snapshotProbe.ref) + .withRetention(RetentionCriteria.snapshotEvery(numberOfEvents = 2, keepNSnapshots = 2)))) + probeC2.expectMessage[(State, Event)]((State(0, Vector()), Incremented(1))) + val watchProbeC2 = watcher(c2) + c2 ! Increment + snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(2) + c2 ! StopIt + watchProbeC2.expectMessage("Terminated") + + val probeC3 = TestProbe[(State, Event)]() + val c3 = spawn( + Behaviors.setup[Command](ctx => + counterWithProbe(ctx, pid, probeC3.ref) + .withRetention(RetentionCriteria.snapshotEvery(numberOfEvents = 2, keepNSnapshots = 2)))) + // this time it should have been snapshotted so no events to replay + probeC3.expectNoMessage() + c3 ! GetValue(replyProbe.ref) + replyProbe.expectMessage(State(2, Vector(0, 1))) + } + + "snapshot every N sequence nrs when persisting multiple events" in { + val pid = nextPid() + val snapshotProbe = TestProbe[Try[SnapshotMetadata]]() + val c = + spawn( + Behaviors.setup[Command](ctx => + counterWithSnapshotProbe(ctx, pid, snapshotProbe.ref) + .withRetention(RetentionCriteria.snapshotEvery(numberOfEvents = 2, keepNSnapshots = 2)))) + val watchProbe = watcher(c) + val replyProbe = TestProbe[State]() + + c ! IncrementWithPersistAll(3) + c ! GetValue(replyProbe.ref) + replyProbe.expectMessage(State(3, Vector(0, 1, 2))) + // snapshot at seqNr 3 because of persistAll + snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(3) + c ! StopIt + watchProbe.expectMessage("Terminated") + + val probeC2 = TestProbe[(State, Event)]() + val c2 = spawn( + Behaviors.setup[Command](ctx => + counterWithProbe(ctx, pid, probeC2.ref) + .withRetention(RetentionCriteria.snapshotEvery(numberOfEvents = 2, keepNSnapshots = 2)))) + probeC2.expectNoMessage() + c2 ! GetValue(replyProbe.ref) + replyProbe.expectMessage(State(3, Vector(0, 1, 2))) + } + + "snapshot via predicate" in { + val pid = nextPid() + val snapshotProbe = TestProbe[Try[SnapshotMetadata]] + val alwaysSnapshot: Behavior[Command] = + Behaviors.setup { ctx => + counterWithSnapshotProbe(ctx, pid, snapshotProbe.ref).snapshotWhen { (_, _, _) => + true + } + } + val c = spawn(alwaysSnapshot) + val watchProbe = watcher(c) + val replyProbe = TestProbe[State]() + + c ! Increment + snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(1) + c ! GetValue(replyProbe.ref) + replyProbe.expectMessage(State(1, Vector(0))) + c ! StopIt + watchProbe.expectMessage("Terminated") + + val probe = TestProbe[(State, Event)]() + val c2 = spawn(Behaviors.setup[Command](ctx => counterWithProbe(ctx, pid, probe.ref))) + // state should be rebuilt from snapshot, no events replayed + // Fails as snapshot is async (i think) + probe.expectNoMessage() + c2 ! Increment + c2 ! GetValue(replyProbe.ref) + replyProbe.expectMessage(State(2, Vector(0, 1))) + } + + "check all events for snapshot in PersistAll" in { + val pid = nextPid() + val snapshotProbe = TestProbe[Try[SnapshotMetadata]] + val snapshotAtTwo = Behaviors.setup[Command](ctx => + counterWithSnapshotProbe(ctx, pid, snapshotProbe.ref).snapshotWhen { (s, _, _) => + s.value == 2 + }) + val c: ActorRef[Command] = spawn(snapshotAtTwo) + val watchProbe = watcher(c) + val replyProbe = TestProbe[State]() + + c ! IncrementWithPersistAll(3) + + c ! GetValue(replyProbe.ref) + replyProbe.expectMessage(State(3, Vector(0, 1, 2))) + // snapshot at seqNr 3 because of persistAll + snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(3) + c ! StopIt + watchProbe.expectMessage("Terminated") + + val probeC2 = TestProbe[(State, Event)]() + val c2 = spawn(Behaviors.setup[Command](ctx => counterWithProbe(ctx, pid, probeC2.ref))) + // middle event triggered all to be snapshot + probeC2.expectNoMessage() + c2 ! GetValue(replyProbe.ref) + replyProbe.expectMessage(State(3, Vector(0, 1, 2))) + } + + def expectDeleteSnapshotCompleted( + retentionProbe: TestProbe[Try[EventSourcedSignal]], + maxSequenceNr: Long, + minSequenceNr: Long): Unit = { + retentionProbe.expectMessageType[Success[DeleteSnapshotsCompleted]].value should ===( + DeleteSnapshotsCompleted(DeletionTarget.Criteria( + SnapshotSelectionCriteria.latest.withMaxSequenceNr(maxSequenceNr).withMinSequenceNr(minSequenceNr)))) + } + + "delete snapshots automatically, based on criteria" in { + val pid = nextPid() + val snapshotProbe = TestProbe[Try[SnapshotMetadata]]() + val retentionProbe = TestProbe[Try[EventSourcedSignal]]() + val replyProbe = TestProbe[State]() + + val persistentActor = spawn( + Behaviors.setup[Command](ctx ⇒ + counterWithSnapshotAndRetentionProbe(ctx, pid, snapshotProbe.ref, retentionProbe.ref) + .withRetention(RetentionCriteria.snapshotEvery(numberOfEvents = 3, keepNSnapshots = 2)))) + + (1 to 10).foreach(_ => persistentActor ! Increment) + persistentActor ! GetValue(replyProbe.ref) + replyProbe.expectMessage(State(10, (0 until 10).toVector)) + snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(3) + snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(6) + snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(9) + expectDeleteSnapshotCompleted(retentionProbe, 3, 0) + + (1 to 10).foreach(_ => persistentActor ! Increment) + persistentActor ! GetValue(replyProbe.ref) + replyProbe.expectMessage(State(20, (0 until 20).toVector)) + snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(12) + snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(15) + snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(18) + expectDeleteSnapshotCompleted(retentionProbe, 6, 0) + expectDeleteSnapshotCompleted(retentionProbe, 9, 3) + expectDeleteSnapshotCompleted(retentionProbe, 12, 6) + + retentionProbe.expectNoMessage() + } + + "optionally delete both old events and snapshots" in { + val pid = nextPid() + val snapshotProbe = TestProbe[Try[SnapshotMetadata]]() + val retentionProbe = TestProbe[Try[EventSourcedSignal]]() + val replyProbe = TestProbe[State]() + + val persistentActor = spawn( + Behaviors.setup[Command]( + ctx ⇒ + counterWithSnapshotAndRetentionProbe(ctx, pid, snapshotProbe.ref, retentionProbe.ref).withRetention( + // tests the Java API as well + RetentionCriteria.snapshotEvery(numberOfEvents = 3, keepNSnapshots = 2).withDeleteEventsOnSnapshot))) + + (1 to 10).foreach(_ => persistentActor ! Increment) + persistentActor ! GetValue(replyProbe.ref) + replyProbe.expectMessage(State(10, (0 until 10).toVector)) + snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(3) + snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(6) + snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(9) + + retentionProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 3 + // Note that when triggering deletion of snapshots from deletion of events it is intentionally "off by one". + // The reason for -1 is that a snapshot at the exact toSequenceNr is still useful and the events + // after that can be replayed after that snapshot, but replaying the events after toSequenceNr without + // starting at the snapshot at toSequenceNr would be invalid. + expectDeleteSnapshotCompleted(retentionProbe, 2, 0) + + (1 to 10).foreach(_ => persistentActor ! Increment) + persistentActor ! GetValue(replyProbe.ref) + replyProbe.expectMessage(State(20, (0 until 20).toVector)) + snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(12) + snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(15) + snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(18) + + retentionProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 6 + expectDeleteSnapshotCompleted(retentionProbe, 5, 0) + + retentionProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 9 + expectDeleteSnapshotCompleted(retentionProbe, 8, 2) + + retentionProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 12 + expectDeleteSnapshotCompleted(retentionProbe, 11, 5) + + retentionProbe.expectNoMessage() + } + + "be possible to combine snapshotWhen and retention criteria" in { + val pid = nextPid() + val snapshotProbe = TestProbe[Try[SnapshotMetadata]]() + val retentionProbe = TestProbe[Try[EventSourcedSignal]]() + val replyProbe = TestProbe[State]() + + val persistentActor = spawn( + Behaviors.setup[Command]( + ctx ⇒ + counterWithSnapshotAndRetentionProbe(ctx, pid, snapshotProbe.ref, retentionProbe.ref) + .snapshotWhen((_, _, seqNr) => seqNr == 3 || seqNr == 13) + .withRetention(RetentionCriteria.snapshotEvery(numberOfEvents = 5, keepNSnapshots = 1)))) + + (1 to 3).foreach(_ => persistentActor ! Increment) + persistentActor ! GetValue(replyProbe.ref) + replyProbe.expectMessage(State(3, (0 until 3).toVector)) + snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(3) + retentionProbe.expectNoMessage() + + (4 to 10).foreach(_ => persistentActor ! Increment) + snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(5) + snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(10) + expectDeleteSnapshotCompleted(retentionProbe, 5, 0) + + (11 to 13).foreach(_ => persistentActor ! Increment) + snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(13) + // no deletes triggered by snapshotWhen + retentionProbe.expectNoMessage() + + (14 to 16).foreach(_ => persistentActor ! Increment) + persistentActor ! GetValue(replyProbe.ref) + replyProbe.expectMessage(State(16, (0 until 16).toVector)) + snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(15) + expectDeleteSnapshotCompleted(retentionProbe, 10, 5) + retentionProbe.expectNoMessage() + } + + "be possible to combine snapshotWhen and retention criteria withDeleteEventsOnSnapshot" in { + val pid = nextPid() + val snapshotProbe = TestProbe[Try[SnapshotMetadata]]() + val retentionProbe = TestProbe[Try[EventSourcedSignal]]() + val replyProbe = TestProbe[State]() + + val persistentActor = spawn( + Behaviors.setup[Command]( + ctx ⇒ + counterWithSnapshotAndRetentionProbe(ctx, pid, snapshotProbe.ref, retentionProbe.ref) + .snapshotWhen((_, _, seqNr) => seqNr == 3 || seqNr == 13) + .withRetention( + RetentionCriteria.snapshotEvery(numberOfEvents = 2, keepNSnapshots = 3).withDeleteEventsOnSnapshot))) + + (1 to 3).foreach(_ => persistentActor ! Increment) + persistentActor ! GetValue(replyProbe.ref) + replyProbe.expectMessage(State(3, (0 until 3).toVector)) + snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(2) + snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(3) + retentionProbe.expectNoMessage() + + (4 to 10).foreach(_ => persistentActor ! Increment) + snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(4) + snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(6) + snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(8) + retentionProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 2 + expectDeleteSnapshotCompleted(retentionProbe, 1, 0) + snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(10) + retentionProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 4 + expectDeleteSnapshotCompleted(retentionProbe, 3, 0) + + (11 to 13).foreach(_ => persistentActor ! Increment) + snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(12) + retentionProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 6 + expectDeleteSnapshotCompleted(retentionProbe, 5, 0) + snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(13) + // no deletes triggered by snapshotWhen + retentionProbe.expectNoMessage() + + (14 to 16).foreach(_ => persistentActor ! Increment) + snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(14) + retentionProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 8 + expectDeleteSnapshotCompleted(retentionProbe, 7, 1) + snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(16) + retentionProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 10 + expectDeleteSnapshotCompleted(retentionProbe, 9, 3) + retentionProbe.expectNoMessage() + } + + "be possible to snapshot every event" in { + // very bad idea to snapshot every event, but technically possible + val pid = nextPid() + val snapshotProbe = TestProbe[Try[SnapshotMetadata]]() + val retentionProbe = TestProbe[Try[EventSourcedSignal]]() + val replyProbe = TestProbe[State]() + + val persistentActor = spawn( + Behaviors.setup[Command](ctx ⇒ + counterWithSnapshotAndRetentionProbe(ctx, pid, snapshotProbe.ref, retentionProbe.ref) + .withRetention(RetentionCriteria.snapshotEvery(numberOfEvents = 1, keepNSnapshots = 3)))) + + (1 to 10).foreach(_ => persistentActor ! Increment) + persistentActor ! GetValue(replyProbe.ref) + replyProbe.expectMessage(State(10, (0 until 10).toVector)) + snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(1) + snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(2) + snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(3) + snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(4) + expectDeleteSnapshotCompleted(retentionProbe, 1, 0) + + snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(5) + expectDeleteSnapshotCompleted(retentionProbe, 2, 0) + + snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(6) + expectDeleteSnapshotCompleted(retentionProbe, 3, 0) + + snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(7) + expectDeleteSnapshotCompleted(retentionProbe, 4, 1) + + snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(8) + expectDeleteSnapshotCompleted(retentionProbe, 5, 2) + + snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(9) + expectDeleteSnapshotCompleted(retentionProbe, 6, 3) + + snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(10) + expectDeleteSnapshotCompleted(retentionProbe, 7, 4) + } + + "be possible to snapshot every event withDeleteEventsOnSnapshot" in { + // very bad idea to snapshot every event, but technically possible + val pid = nextPid() + val snapshotProbe = TestProbe[Try[SnapshotMetadata]]() + val retentionProbe = TestProbe[Try[EventSourcedSignal]]() + val replyProbe = TestProbe[State]() + + val persistentActor = spawn( + Behaviors.setup[Command](ctx ⇒ + counterWithSnapshotAndRetentionProbe(ctx, pid, snapshotProbe.ref, retentionProbe.ref).withRetention( + RetentionCriteria.snapshotEvery(numberOfEvents = 1, keepNSnapshots = 3).withDeleteEventsOnSnapshot))) + + (1 to 10).foreach(_ => persistentActor ! Increment) + persistentActor ! GetValue(replyProbe.ref) + replyProbe.expectMessage(State(10, (0 until 10).toVector)) + snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(1) + snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(2) + snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(3) + snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(4) + retentionProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 1 + + snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(5) + retentionProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 2 + expectDeleteSnapshotCompleted(retentionProbe, 1, 0) + + snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(6) + retentionProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 3 + expectDeleteSnapshotCompleted(retentionProbe, 2, 0) + + snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(7) + retentionProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 4 + expectDeleteSnapshotCompleted(retentionProbe, 3, 0) + + snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(8) + retentionProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 5 + expectDeleteSnapshotCompleted(retentionProbe, 4, 1) + + snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(9) + retentionProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 6 + expectDeleteSnapshotCompleted(retentionProbe, 5, 2) + + snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(10) + retentionProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 7 + expectDeleteSnapshotCompleted(retentionProbe, 6, 3) + } + + def watcher(toWatch: ActorRef[_]): TestProbe[String] = { + val probe = TestProbe[String]() + val w = Behaviors.setup[Any] { ctx => + ctx.watch(toWatch) + Behaviors + .receive[Any] { (_, _) => + Behaviors.same + } + .receiveSignal { + case (_, _: Terminated) => + probe.ref ! "Terminated" + Behaviors.stopped + } + } + spawn(w) + probe + } + } +} diff --git a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorSpec.scala b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorSpec.scala index 5c0b5d48df..42c390a30b 100644 --- a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorSpec.scala +++ b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorSpec.scala @@ -32,19 +32,13 @@ import akka.persistence.query.PersistenceQuery import akka.persistence.query.Sequence import akka.persistence.query.journal.leveldb.scaladsl.LeveldbReadJournal import akka.persistence.snapshot.SnapshotStore -import akka.persistence.typed.DeleteEventsCompleted -import akka.persistence.typed.DeleteSnapshotsCompleted -import akka.persistence.typed.DeletionTarget import akka.persistence.typed.EventAdapter -import akka.persistence.typed.EventSourcedSignal import akka.persistence.typed.ExpectingReply import akka.persistence.typed.PersistenceId import akka.persistence.typed.RecoveryCompleted -import akka.persistence.typed.RetentionCriteria import akka.persistence.typed.SnapshotCompleted import akka.persistence.typed.SnapshotFailed import akka.persistence.typed.SnapshotMetadata -import akka.persistence.typed.SnapshotSelectionCriteria import akka.persistence.{ SnapshotMetadata => UntypedSnapshotMetadata } import akka.persistence.{ SnapshotSelectionCriteria => UntypedSnapshotSelectionCriteria } import akka.stream.ActorMaterializer @@ -153,8 +147,7 @@ object EventSourcedBehaviorSpec { persistenceId, loggingActor = TestProbe[String].ref, probe = TestProbe[(State, Event)].ref, - snapshotProbe = TestProbe[Try[SnapshotMetadata]].ref, - retentionProbe = TestProbe[Try[EventSourcedSignal]].ref) + snapshotProbe = TestProbe[Try[SnapshotMetadata]].ref) def counter(ctx: ActorContext[Command], persistenceId: PersistenceId, logging: ActorRef[String])( implicit system: ActorSystem[_]): EventSourcedBehavior[Command, Event, State] = @@ -163,8 +156,7 @@ object EventSourcedBehaviorSpec { persistenceId, loggingActor = logging, probe = TestProbe[(State, Event)].ref, - TestProbe[Try[SnapshotMetadata]].ref, - TestProbe[Try[EventSourcedSignal]].ref) + TestProbe[Try[SnapshotMetadata]].ref) def counterWithProbe( ctx: ActorContext[Command], @@ -172,52 +164,25 @@ object EventSourcedBehaviorSpec { probe: ActorRef[(State, Event)], snapshotProbe: ActorRef[Try[SnapshotMetadata]])( implicit system: ActorSystem[_]): EventSourcedBehavior[Command, Event, State] = - counter(ctx, persistenceId, TestProbe[String].ref, probe, snapshotProbe, TestProbe[Try[EventSourcedSignal]].ref) + counter(ctx, persistenceId, TestProbe[String].ref, probe, snapshotProbe) def counterWithProbe(ctx: ActorContext[Command], persistenceId: PersistenceId, probe: ActorRef[(State, Event)])( implicit system: ActorSystem[_]): EventSourcedBehavior[Command, Event, State] = - counter( - ctx, - persistenceId, - TestProbe[String].ref, - probe, - TestProbe[Try[SnapshotMetadata]].ref, - TestProbe[Try[EventSourcedSignal]].ref) + counter(ctx, persistenceId, TestProbe[String].ref, probe, TestProbe[Try[SnapshotMetadata]].ref) def counterWithSnapshotProbe( ctx: ActorContext[Command], persistenceId: PersistenceId, probe: ActorRef[Try[SnapshotMetadata]])( implicit system: ActorSystem[_]): EventSourcedBehavior[Command, Event, State] = - counter( - ctx, - persistenceId, - TestProbe[String].ref, - TestProbe[(State, Event)].ref, - snapshotProbe = probe, - TestProbe[Try[EventSourcedSignal]].ref) - - def counterWithSnapshotAndRetentionProbe( - ctx: ActorContext[Command], - persistenceId: PersistenceId, - probeS: ActorRef[Try[SnapshotMetadata]], - probeR: ActorRef[Try[EventSourcedSignal]])( - implicit system: ActorSystem[_]): EventSourcedBehavior[Command, Event, State] = - counter( - ctx, - persistenceId, - TestProbe[String].ref, - TestProbe[(State, Event)].ref, - snapshotProbe = probeS, - retentionProbe = probeR) + counter(ctx, persistenceId, TestProbe[String].ref, TestProbe[(State, Event)].ref, snapshotProbe = probe) def counter( ctx: ActorContext[Command], persistenceId: PersistenceId, loggingActor: ActorRef[String], probe: ActorRef[(State, Event)], - snapshotProbe: ActorRef[Try[SnapshotMetadata]], - retentionProbe: ActorRef[Try[EventSourcedSignal]]): EventSourcedBehavior[Command, Event, State] = { + snapshotProbe: ActorRef[Try[SnapshotMetadata]]): EventSourcedBehavior[Command, Event, State] = { EventSourcedBehavior[Command, Event, State]( persistenceId, emptyState = State(0, Vector.empty), @@ -323,8 +288,6 @@ object EventSourcedBehaviorSpec { snapshotProbe ! Success(metadata) case (_, SnapshotFailed(_, failure)) ⇒ snapshotProbe ! Failure(failure) - case (_, e: EventSourcedSignal) => - retentionProbe ! Success(e) } } } @@ -496,123 +459,6 @@ class EventSourcedBehaviorSpec extends ScalaTestWithActorTestKit(EventSourcedBeh watchProbe.expectMessage("Terminated") } - "snapshot via predicate" in { - val pid = nextPid - val snapshotProbe = TestProbe[Try[SnapshotMetadata]] - val alwaysSnapshot: Behavior[Command] = - Behaviors.setup { ctx => - counterWithSnapshotProbe(ctx, pid, snapshotProbe.ref).snapshotWhen { (_, _, _) => - true - } - } - val c = spawn(alwaysSnapshot) - val watchProbe = watcher(c) - val replyProbe = TestProbe[State]() - - c ! Increment - snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(1) - c ! GetValue(replyProbe.ref) - replyProbe.expectMessage(State(1, Vector(0))) - c ! LogThenStop - watchProbe.expectMessage("Terminated") - - val probe = TestProbe[(State, Event)]() - val c2 = spawn(Behaviors.setup[Command](ctx => counterWithProbe(ctx, pid, probe.ref))) - // state should be rebuilt from snapshot, no events replayed - // Fails as snapshot is async (i think) - probe.expectNoMessage() - c2 ! Increment - c2 ! GetValue(replyProbe.ref) - replyProbe.expectMessage(State(2, Vector(0, 1))) - } - - "check all events for snapshot in PersistAll" in { - val pid = nextPid - val snapshotProbe = TestProbe[Try[SnapshotMetadata]] - val snapshotAtTwo = Behaviors.setup[Command](ctx => - counterWithSnapshotProbe(ctx, pid, snapshotProbe.ref).snapshotWhen { (s, _, _) => - s.value == 2 - }) - val c: ActorRef[Command] = spawn(snapshotAtTwo) - val watchProbe = watcher(c) - val replyProbe = TestProbe[State]() - - c ! IncrementWithPersistAll(3) - - c ! GetValue(replyProbe.ref) - replyProbe.expectMessage(State(3, Vector(0, 1, 2))) - // snapshot at seqNr 3 because of persistAll - snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(3) - c ! LogThenStop - watchProbe.expectMessage("Terminated") - - val probeC2 = TestProbe[(State, Event)]() - val c2 = spawn(Behaviors.setup[Command](ctx => counterWithProbe(ctx, pid, probeC2.ref))) - // middle event triggered all to be snapshot - probeC2.expectNoMessage() - c2 ! GetValue(replyProbe.ref) - replyProbe.expectMessage(State(3, Vector(0, 1, 2))) - } - - "snapshot every N sequence nrs" in { - val pid = nextPid - val c = spawn(Behaviors.setup[Command](ctx => - counter(ctx, pid).withRetention(RetentionCriteria.snapshotEvery(numberOfEvents = 2, keepNSnapshots = 2)))) - val watchProbe = watcher(c) - val replyProbe = TestProbe[State]() - - c ! Increment - c ! GetValue(replyProbe.ref) - replyProbe.expectMessage(State(1, Vector(0))) - c ! LogThenStop - watchProbe.expectMessage("Terminated") - - // no snapshot should have happened - val probeC2 = TestProbe[(State, Event)]() - val snapshotProbe = TestProbe[Try[SnapshotMetadata]]() - val c2 = spawn(Behaviors.setup[Command](ctx => - counterWithProbe(ctx, pid, probeC2.ref, snapshotProbe.ref).withRetention(RetentionCriteria.snapshotEvery(numberOfEvents = 2, keepNSnapshots = 2)))) - probeC2.expectMessage[(State, Event)]((State(0, Vector()), Incremented(1))) - val watchProbeC2 = watcher(c2) - c2 ! Increment - snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(2) - c2 ! LogThenStop - watchProbeC2.expectMessage("Terminated") - - val probeC3 = TestProbe[(State, Event)]() - val c3 = spawn(Behaviors.setup[Command](ctx => - counterWithProbe(ctx, pid, probeC3.ref).withRetention(RetentionCriteria.snapshotEvery(numberOfEvents = 2, keepNSnapshots = 2)))) - // this time it should have been snapshotted so no events to replay - probeC3.expectNoMessage() - c3 ! GetValue(replyProbe.ref) - replyProbe.expectMessage(State(2, Vector(0, 1))) - } - - "snapshot every N sequence nrs when persisting multiple events" in { - val pid = nextPid - val snapshotProbe = TestProbe[Try[SnapshotMetadata]]() - val c = - spawn(Behaviors.setup[Command](ctx => - counterWithSnapshotProbe(ctx, pid, snapshotProbe.ref).withRetention(RetentionCriteria.snapshotEvery(numberOfEvents = 2, keepNSnapshots = 2)))) - val watchProbe = watcher(c) - val replyProbe = TestProbe[State]() - - c ! IncrementWithPersistAll(3) - c ! GetValue(replyProbe.ref) - replyProbe.expectMessage(State(3, Vector(0, 1, 2))) - // snapshot at seqNr 3 because of persistAll - snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(3) - c ! LogThenStop - watchProbe.expectMessage("Terminated") - - val probeC2 = TestProbe[(State, Event)]() - val c2 = spawn(Behaviors.setup[Command](ctx => - counterWithProbe(ctx, pid, probeC2.ref).withRetention(RetentionCriteria.snapshotEvery(numberOfEvents = 2, keepNSnapshots = 2)))) - probeC2.expectNoMessage() - c2 ! GetValue(replyProbe.ref) - replyProbe.expectMessage(State(3, Vector(0, 1, 2))) - } - "wrap persistent behavior in tap" in { val probe = TestProbe[Command] val wrapped: Behavior[Command] = Behaviors.monitor(probe.ref, counter(nextPid)) @@ -766,93 +612,6 @@ class EventSourcedBehaviorSpec extends ScalaTestWithActorTestKit(EventSourcedBeh } } - def expectDeleteSnapshotCompleted( - retentionProbe: TestProbe[Try[EventSourcedSignal]], - maxSequenceNr: Long, - minSequenceNr: Long): Unit = { - retentionProbe.expectMessageType[Success[DeleteSnapshotsCompleted]].value should ===( - DeleteSnapshotsCompleted(DeletionTarget.Criteria( - SnapshotSelectionCriteria.latest.withMaxSequenceNr(maxSequenceNr).withMinSequenceNr(minSequenceNr)))) - } - - "delete snapshots automatically, based on criteria" in { - val pid = nextPid - val snapshotProbe = TestProbe[Try[SnapshotMetadata]]() - val retentionProbe = TestProbe[Try[EventSourcedSignal]]() - val replyProbe = TestProbe[State]() - - val persistentActor = spawn( - Behaviors.setup[Command](ctx ⇒ - counterWithSnapshotAndRetentionProbe(ctx, pid, snapshotProbe.ref, retentionProbe.ref) - .withRetention(RetentionCriteria.snapshotEvery(numberOfEvents = 3, keepNSnapshots = 2)))) - - (1 to 10).foreach(_ => persistentActor ! Increment) - persistentActor ! GetValue(replyProbe.ref) - replyProbe.expectMessage(State(10, (0 until 10).toVector)) - snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(3) - snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(6) - snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(9) - expectDeleteSnapshotCompleted(retentionProbe, 3, 0) - - (1 to 10).foreach(_ => persistentActor ! Increment) - persistentActor ! GetValue(replyProbe.ref) - replyProbe.expectMessage(State(20, (0 until 20).toVector)) - snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(12) - snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(15) - snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(18) - expectDeleteSnapshotCompleted(retentionProbe, 6, 0) - expectDeleteSnapshotCompleted(retentionProbe, 9, 3) - expectDeleteSnapshotCompleted(retentionProbe, 12, 6) - - retentionProbe.expectNoMessage() - } - - "optionally delete both old events and snapshots" in { - val pid = nextPid - val snapshotProbe = TestProbe[Try[SnapshotMetadata]]() - val retentionProbe = TestProbe[Try[EventSourcedSignal]]() - val replyProbe = TestProbe[State]() - - val persistentActor = spawn( - Behaviors.setup[Command]( - ctx ⇒ - counterWithSnapshotAndRetentionProbe(ctx, pid, snapshotProbe.ref, retentionProbe.ref).withRetention( - // tests the Java API as well - RetentionCriteria.snapshotEvery(numberOfEvents = 3, keepNSnapshots = 2).withDeleteEventsOnSnapshot()))) - - (1 to 10).foreach(_ => persistentActor ! Increment) - persistentActor ! GetValue(replyProbe.ref) - replyProbe.expectMessage(State(10, (0 until 10).toVector)) - snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(3) - snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(6) - snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(9) - - retentionProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 3 - // Note that when triggering deletion of snapshots from deletion of events it is intentionally "off by one". - // The reason for -1 is that a snapshot at the exact toSequenceNr is still useful and the events - // after that can be replayed after that snapshot, but replaying the events after toSequenceNr without - // starting at the snapshot at toSequenceNr would be invalid. - expectDeleteSnapshotCompleted(retentionProbe, 2, 0) - - (1 to 10).foreach(_ => persistentActor ! Increment) - persistentActor ! GetValue(replyProbe.ref) - replyProbe.expectMessage(State(20, (0 until 20).toVector)) - snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(12) - snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(15) - snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(18) - - retentionProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 6 - expectDeleteSnapshotCompleted(retentionProbe, 5, 0) - - retentionProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 9 - expectDeleteSnapshotCompleted(retentionProbe, 8, 2) - - retentionProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 12 - expectDeleteSnapshotCompleted(retentionProbe, 11, 5) - - retentionProbe.expectNoMessage() - } - "fail fast if persistenceId is null" in { intercept[IllegalArgumentException] { PersistenceId(null) diff --git a/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BasicPersistentBehaviorCompileOnly.scala b/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BasicPersistentBehaviorCompileOnly.scala index 74c5b88461..5ce7f799be 100644 --- a/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BasicPersistentBehaviorCompileOnly.scala +++ b/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BasicPersistentBehaviorCompileOnly.scala @@ -6,17 +6,20 @@ package docs.akka.persistence.typed import scala.concurrent.duration._ -import akka.actor.typed.{ Behavior, SupervisorStrategy } +import akka.actor.typed.Behavior +import akka.actor.typed.SupervisorStrategy import akka.actor.typed.scaladsl.Behaviors -import akka.persistence.typed.scaladsl.EventSourcedBehavior import akka.persistence.typed.DeleteEventsFailed import akka.persistence.typed.DeleteSnapshotsFailed -import akka.persistence.typed.SnapshotFailed import akka.persistence.typed.PersistenceId import akka.persistence.typed.RecoveryCompleted +import akka.persistence.typed.SnapshotFailed +import akka.persistence.typed.scaladsl.EventSourcedBehavior object BasicPersistentBehaviorCompileOnly { + import akka.persistence.typed.scaladsl.RetentionCriteria + object FirstExample { //#command sealed trait Command @@ -147,17 +150,12 @@ object BasicPersistentBehaviorCompileOnly { final case class BookingCompleted(orderNr: String) extends Event //#snapshottingEveryN - import akka.persistence.typed.RetentionCriteria val snapshottingEveryN = EventSourcedBehavior[Command, Event, State]( persistenceId = PersistenceId("abc"), emptyState = State(), commandHandler = (state, cmd) => throw new RuntimeException("TODO: process the command & return an Effect"), eventHandler = (state, evt) => throw new RuntimeException("TODO: process the event return the next state")) - .snapshotWhen { - case (_, BookingCompleted(_), _) => true - case (_, _, _) => false - } .withRetention(RetentionCriteria.snapshotEvery(numberOfEvents = 1000, keepNSnapshots = 2)) //#snapshottingEveryN @@ -185,7 +183,6 @@ object BasicPersistentBehaviorCompileOnly { //#snapshotSelection //#retentionCriteria - import akka.persistence.typed.RetentionCriteria val snapshotRetention = EventSourcedBehavior[Command, Event, State]( persistenceId = PersistenceId("abc"), @@ -196,41 +193,35 @@ object BasicPersistentBehaviorCompileOnly { case (state, BookingCompleted(_), sequenceNumber) => true case (state, event, sequenceNumber) => false } - .withRetention(RetentionCriteria.snapshotEvery(numberOfEvents = 1000, keepNSnapshots = 5)) + .withRetention(RetentionCriteria.snapshotEvery(numberOfEvents = 100, keepNSnapshots = 2)) //#retentionCriteria //#snapshotAndEventDeletes - import akka.persistence.typed.RetentionCriteria val snapshotAndEventsRetention = EventSourcedBehavior[Command, Event, State]( persistenceId = PersistenceId("abc"), emptyState = State(), commandHandler = (state, cmd) => Effect.noReply, // do something based on a particular command and state eventHandler = (state, evt) => state) // do something based on a particular event and state - .snapshotWhen { - case (state, BookingCompleted(_), sequenceNumber) => true - case (state, event, sequenceNumber) => false + .withRetention(RetentionCriteria.snapshotEvery(numberOfEvents = 100, keepNSnapshots = 2).withDeleteEventsOnSnapshot) + .receiveSignal { // optionally respond to signals + case (state, _: SnapshotFailed) => // react to failure + case (state, _: DeleteSnapshotsFailed) => // react to failure + case (state, _: DeleteEventsFailed) => // react to failure } - .withRetention(RetentionCriteria.snapshotEvery(numberOfEvents = 1000, keepNSnapshots = 2).withDeleteEventsOnSnapshot) //#snapshotAndEventDeletes //#retentionCriteriaWithSignals - import akka.persistence.typed.RetentionCriteria val fullDeletesSampleWithSignals = EventSourcedBehavior[Command, Event, State]( persistenceId = PersistenceId("abc"), emptyState = State(), commandHandler = (state, cmd) => Effect.noReply, // do something based on a particular command and state eventHandler = (state, evt) => state) // do something based on a particular event and state - .snapshotWhen { - case (state, BookingCompleted(_), sequenceNumber) => true - case (state, event, sequenceNumber) => false - } - .withRetention(RetentionCriteria.snapshotEvery(numberOfEvents = 1000, keepNSnapshots = 2)) + .withRetention(RetentionCriteria.snapshotEvery(numberOfEvents = 100, keepNSnapshots = 2)) .receiveSignal { // optionally respond to signals case (state, _: SnapshotFailed) => // react to failure case (state, _: DeleteSnapshotsFailed) => // react to failure - case (state, _: DeleteEventsFailed) => // react to failure } //#retentionCriteriaWithSignals