Make RetentionCritera evolvable, #26545

* allow combining snapshotWhen and RetentionCriteria
* RetentionCritieria unit test
* move retention and snapshot tests to separate EventSourcedBehaviorRetentionSpec
  * because EventSourcedBehaviorSpec is testing too many different things
* Update Scaldoc and reference documentation
  * moved it to the snapshotting page
This commit is contained in:
Patrik Nordwall 2019-03-28 07:55:02 +01:00
parent a2f65de599
commit 81f11c97dd
19 changed files with 1028 additions and 473 deletions

View file

@ -7,7 +7,7 @@ prone to accumulating extremely long event logs and experiencing long recovery t
may be to split out into a set of shorter lived actors. However, when this is not an option, you can use snapshots
to reduce recovery times drastically.
Persistent actors can save snapshots of internal state every N events or when a given predicated of the state
Persistent actors can save snapshots of internal state every N events or when a given predicate of the state
is fulfilled.
Scala
@ -46,7 +46,9 @@ events. This can be useful if snapshot serialization format has changed in an in
not be used when events have been deleted.
In order to use snapshots, a default snapshot-store (`akka.persistence.snapshot-store.plugin`) must be configured,
or the @scala[`PersistentActor`]@java[persistent actor] can pick a snapshot store explicitly by overriding @scala[`def snapshotPluginId: String`]@java[`String snapshotPluginId()`].
or you can pick a snapshot store for for a specific `EventSourcedBehavior by
@scala[defining it with `withSnapshotPluginId` of the `EventSourcedBehavior`]@java[overriding `snapshotPluginId` in
the `EventSourcedBehavior`].
Because some use cases may not benefit from or need snapshots, it is perfectly valid not to not configure a snapshot store.
However, Akka will log a warning message when this situation is detected and then continue to operate until
@ -55,10 +57,71 @@ an actor tries to store a snapshot, at which point the operation will fail.
## Snapshot failures
Saving snapshots can either succeed or fail this information is reported back to the persistent actor via
the `onSnapshot` callback. Snapshot failures are, by default, logged but do not cause the actor to stop or
restart.
the `SnapshotCompleted` or `SnapshotFailed` signal. Snapshot failures are logged by default but do not cause
the actor to stop or restart.
If there is a problem with recovering the state of the actor from the journal when the actor is
started, `onRecoveryFailure` is called (logging the error by default), and the actor will be stopped.
started, `RecoveryFailed` signal is emitted (logging the error by default), and the actor will be stopped.
Note that failure to load snapshot is also treated like this, but you can disable loading of snapshots
if you for example know that serialization format has changed in an incompatible way.
## Snapshot deletion
To free up space, an event sourced actor can automatically delete older snapshots based on the given `RetentionCriteria`.
Scala
: @@snip [BasicPersistentBehaviorCompileOnly.scala](/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BasicPersistentBehaviorCompileOnly.scala) { #retentionCriteria }
Java
: @@snip [BasicPersistentBehaviorTest.java](/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/BasicPersistentBehaviorTest.java) { #retentionCriteria #snapshottingPredicate }
Snapshot deletion is triggered after saving a new snapshot.
The above example will save snapshots automatically every `numberOfEvents = 100`. Snapshots that have sequence
number less than the sequence number of the saved snapshot minus `keepNSnapshots * numberOfEvents` (`100 * 2`) are automatically
deleted.
In addition, it will also save a snapshot when the persisted event is `BookingCompleted`. Automatic snapshotting
based on `numberOfEvents` can be used without specifying @scala[`snapshotWhen`]@java[`shouldSnapshot`]. Snapshots
triggered by the @scala[`snapshotWhen`]@java[`shouldSnapshot`] predicate will not trigger deletion of old snapshots.
On async deletion, either a `DeleteSnapshotsCompleted` or `DeleteSnapshotsFailed` signal is emitted.
You can react to signal outcomes by using @scala[with `receiveSignal` handler] @java[by overriding `receiveSignal`].
By default, successful completion is logged by the system at log level `debug`, failures at log level `warning`.
Scala
: @@snip [BasicPersistentBehaviorCompileOnly.scala](/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BasicPersistentBehaviorCompileOnly.scala) { #retentionCriteriaWithSignals }
Java
: @@snip [BasicPersistentBehaviorTest.java](/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/BasicPersistentBehaviorTest.java) { #retentionCriteriaWithSignals }
## Event deletion
Deleting events in event sourcing based applications is typically either not used at all, or used in conjunction with snapshotting.
By deleting events you will lose the history of how the system changed before it reached current state, which is
one of the main reasons for using event sourcing in the first place.
If snapshot-based retention is enabled, after a snapshot has been successfully stored, a delete of the events
(journaled by a single event sourced actor) up until the sequence number of the data held by that snapshot can be issued.
To elect to use this, enable `withDeleteEventsOnSnapshot` of the `RetentionCriteria` which is disabled by default.
Scala
: @@snip [BasicPersistentBehaviorCompileOnly.scala](/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BasicPersistentBehaviorCompileOnly.scala) { #snapshotAndEventDeletes }
Java
: @@snip [BasicPersistentBehaviorTest.java](/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/BasicPersistentBehaviorTest.java) { #snapshotAndEventDeletes }
Event deletion is triggered after saving a new snapshot. Old events would be deleted prior to old snapshots being deleted.
On async deletion, either a `DeleteEventsCompleted` or `DeleteEventsFailed` signal is emitted.
You can react to signal outcomes by using @scala[with `receiveSignal` handler] @java[by overriding `receiveSignal`].
By default, successful completion is logged by the system at log level `debug`, failures at log level `warning`.
Message deletion does not affect the highest sequence number of the journal, even if all messages were deleted from it after a delete occurs.
@@@ note
It is up to the journal implementation whether events are actually removed from storage.
@@@

View file

@ -453,54 +453,3 @@ processed.
It's allowed to stash messages while unstashing. Those newly added commands will not be processed by the
`unstashAll` effect that was in progress and have to be unstashed by another `unstashAll`.
## Retention - snapshots and events
Retention of snapshots and events are controlled by a few factors. Deletes to free up space is currently available.
### Snapshot deletion
To free up space, an event sourced actor can automatically delete older snapshots
based on a user provided or default `RetentionCriteria` @scala[from `withRetention`] @java[by overriding `retentionCriteria`]
combined with the `snapshotWhen` method.
Scala
: @@snip [BasicPersistentBehaviorCompileOnly.scala](/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BasicPersistentBehaviorCompileOnly.scala) { #retentionCriteria }
Java
: @@snip [BasicPersistentBehaviorTest.java](/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/BasicPersistentBehaviorTest.java) { #retentionCriteria }
On async deletion, either a `SnapshotCompleted` or `SnapshotFailed` is emitted. Successful completion is logged by the system at log level `debug`, failures at log level `warning`.
You can leverage `EventSourcedSignal` to react to outcomes @scala[with `receiveSignal` handler] @java[by overriding `receiveSignal`].
Scala
: @@snip [BasicPersistentBehaviorCompileOnly.scala](/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BasicPersistentBehaviorCompileOnly.scala) { #retentionCriteriaWithSignals }
Java
: @@snip [BasicPersistentBehaviorTest.java](/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/BasicPersistentBehaviorTest.java) { #retentionCriteriaWithSignals }
## Event deletion
Deleting events in event sourcing based applications is typically either not used at all, or used in conjunction with snapshotting.
If snapshot-based recovery is enabled, after a snapshot has been successfully stored, a delete (journaled by a single event sourced actor) up until the sequence number of the data held by that snapshot can be issued.
To elect to use this, enable `RetentionCriteria.deleteEventsOnSnapshot` which is disabled by default.
You can leverage `EventSourcedSignal` to react to outcomes @scala[with `receiveSignal` handler] @java[by overriding `receiveSignal`].
Scala
: @@snip [BasicPersistentBehaviorCompileOnly.scala](/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BasicPersistentBehaviorCompileOnly.scala) { #snapshotAndEventDeletes }
Java
: @@snip [BasicPersistentBehaviorTest.java](/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/BasicPersistentBehaviorTest.java) { #retentionCriteria }
On `SaveSnapshotSuccess`, old events would be deleted based on `RetentionCriteria` prior to old snapshots being deleted. On async deletion, either `DeleteEventsCompleted` or `DeleteEventsFailed` is emitted. Successful completion is logged by the
system at log level `debug`, failures at log level `warning`.
Message deletion does not affect the highest sequence number of the journal, even if all messages were deleted from it after a delete occurs.
@@@ note
It is up to the journal implementation whether events are actually removed from storage.
Deleting events prevents future replaying of old events to apply new state.
@@@ note

View file

@ -1,52 +0,0 @@
/*
* Copyright (C) 2019 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.persistence.typed
/**
* Setup snapshot and event delete/retention behavior. Retention bridges snapshot
* and journal behavior. This defines the retention criteria.
*
* @param snapshotEveryNEvents Snapshots are used to reduce playback/recovery times.
* This defines when a new snapshot is persisted - on every N events.
* `snapshotEveryNEvents` should be greater than 0.
*
* @param keepNSnapshots After a snapshot is successfully completed,
* - if 2: retain last maximum 2 *`snapshot-size` events
* and 3 snapshots (2 old + latest snapshot)
* - if 0: all events with equal or lower sequence number
* will not be retained.
*
* @param deleteEventsOnSnapshot Opt-in ability to delete older events on successful
* save of snapshot. Defaults to disabled.
*/
final case class RetentionCriteria(snapshotEveryNEvents: Long, keepNSnapshots: Long, deleteEventsOnSnapshot: Boolean) {
/**
* Delete Messages:
* {{{ toSequenceNr - keepNSnapshots * snapshotEveryNEvents }}}
* Delete Snapshots:
* {{{ (toSequenceNr - 1) - (keepNSnapshots * snapshotEveryNEvents) }}}
*
* @param lastSequenceNr the sequence number to delete to if `deleteEventsOnSnapshot` is false
*/
def toSequenceNumber(lastSequenceNr: Long): Long = {
// Delete old events, retain the latest
math.max(0, lastSequenceNr - (keepNSnapshots * snapshotEveryNEvents))
}
/** Java API. */
def withDeleteEventsOnSnapshot(): RetentionCriteria =
copy(deleteEventsOnSnapshot = true)
}
object RetentionCriteria {
val disabled: RetentionCriteria =
RetentionCriteria(snapshotEveryNEvents = 0, keepNSnapshots = 0, deleteEventsOnSnapshot = false)
def snapshotEvery(numberOfEvents: Long, keepNSnapshots: Long): RetentionCriteria =
apply(numberOfEvents, keepNSnapshots, false)
}

View file

@ -16,11 +16,21 @@ import akka.annotation.InternalApi
import akka.persistence._
import akka.persistence.typed.EventAdapter
import akka.persistence.typed.PersistenceId
import akka.persistence.typed.RetentionCriteria
import akka.persistence.typed.scaladsl.EventSourcedBehavior
import akka.persistence.typed.scaladsl.RetentionCriteria
import akka.util.ConstantFun
import akka.util.OptionVal
/**
* INTERNAL API
*/
@InternalApi private[akka] object BehaviorSetup {
sealed trait SnapshotAfterPersist
case object NoSnapshot extends SnapshotAfterPersist
case object SnapshotWithRetention extends SnapshotAfterPersist
case object SnapshotWithoutRetention extends SnapshotAfterPersist
}
/**
* INTERNAL API: Carry state for the Persistent behavior implementation behaviors.
*/
@ -44,6 +54,7 @@ private[akka] final class BehaviorSetup[C, E, S](
import InternalProtocol.RecoveryTickEvent
import akka.actor.typed.scaladsl.adapter._
import BehaviorSetup._
val persistence: Persistence = Persistence(context.system.toUntyped)
@ -121,6 +132,18 @@ private[akka] final class BehaviorSetup[C, E, S](
}
}
def shouldSnapshot(state: S, event: E, sequenceNr: Long): SnapshotAfterPersist = {
retention match {
case DisabledRetentionCriteria =>
if (snapshotWhen(state, event, sequenceNr)) SnapshotWithoutRetention
else NoSnapshot
case s: SnapshotCountRetentionCriteriaImpl =>
if (s.snapshotWhen(sequenceNr)) SnapshotWithRetention
else if (snapshotWhen(state, event, sequenceNr)) SnapshotWithoutRetention
else NoSnapshot
}
}
}
/**

View file

@ -7,8 +7,6 @@ package akka.persistence.typed.internal
import java.util.UUID
import java.util.concurrent.atomic.AtomicInteger
import scala.util.control.NonFatal
import akka.actor.typed
import akka.actor.typed.BackoffSupervisorStrategy
import akka.actor.typed.Behavior
@ -31,10 +29,10 @@ import akka.persistence.typed.DeletionTarget
import akka.persistence.typed.EventAdapter
import akka.persistence.typed.NoOpEventAdapter
import akka.persistence.typed.PersistenceId
import akka.persistence.typed.RetentionCriteria
import akka.persistence.typed.SnapshotCompleted
import akka.persistence.typed.SnapshotFailed
import akka.persistence.typed.SnapshotSelectionCriteria
import akka.persistence.typed.scaladsl.RetentionCriteria
import akka.persistence.typed.scaladsl._
import akka.util.ConstantFun
@ -164,9 +162,6 @@ private[akka] final case class EventSourcedBehaviorImpl[Command, Event, State](
handler: PartialFunction[(State, Signal), Unit]): EventSourcedBehavior[Command, Event, State] =
copy(signalHandler = handler)
override def snapshotWhen(predicate: (State, Event, Long) => Boolean): EventSourcedBehavior[Command, Event, State] =
copy(snapshotWhen = predicate)
override def withJournalPluginId(id: String): EventSourcedBehavior[Command, Event, State] = {
require(id != null, "journal plugin id must not be null; use empty string for 'default' journal")
copy(journalPluginId = if (id != "") Some(id) else None)
@ -182,12 +177,11 @@ private[akka] final case class EventSourcedBehaviorImpl[Command, Event, State](
copy(recovery = Recovery(selection.toUntyped))
}
override def withRetention(criteria: RetentionCriteria): EventSourcedBehavior[Command, Event, State] = {
require(
criteria.snapshotEveryNEvents > 0,
s"'snapshotEveryNEvents' must be positive but was ${criteria.snapshotEveryNEvents}")
copy(retention = criteria, snapshotWhen = (_, _, seqNr) => seqNr % criteria.snapshotEveryNEvents == 0)
}
override def snapshotWhen(predicate: (State, Event, Long) => Boolean): EventSourcedBehavior[Command, Event, State] =
copy(snapshotWhen = predicate)
override def withRetention(criteria: RetentionCriteria): EventSourcedBehavior[Command, Event, State] =
copy(retention = criteria)
override def withTagger(tagger: Event => Set[String]): EventSourcedBehavior[Command, Event, State] =
copy(tagger = tagger)

View file

@ -11,7 +11,6 @@ import scala.concurrent.duration._
import akka.actor.typed.ActorSystem
import akka.annotation.InternalApi
import akka.persistence.Persistence
import akka.persistence.Persistence.verifyPluginConfigIsDefined
import com.typesafe.config.Config
/**

View file

@ -105,27 +105,22 @@ private[akka] trait JournalInteractions[C, E, S] {
}
/**
* On [[akka.persistence.SaveSnapshotSuccess]], if [[akka.persistence.typed.RetentionCriteria.deleteEventsOnSnapshot]]
* is enabled, old messages are deleted based on [[akka.persistence.typed.RetentionCriteria.snapshotEveryNEvents]]
* On [[akka.persistence.SaveSnapshotSuccess]], if `SnapshotCountRetentionCriteria.deleteEventsOnSnapshot`
* is enabled, old messages are deleted based on `SnapshotCountRetentionCriteria.snapshotEveryNEvents`
* before old snapshots are deleted.
*/
protected def internalDeleteEvents(e: SaveSnapshotSuccess, state: Running.RunningState[S]): Unit =
if (setup.retention.deleteEventsOnSnapshot) {
val toSequenceNr = setup.retention.toSequenceNumber(e.metadata.sequenceNr)
protected def internalDeleteEvents(lastSequenceNr: Long, toSequenceNr: Long): Unit =
if (toSequenceNr > 0) {
val lastSequenceNr = state.seqNr
val self = setup.selfUntyped
if (toSequenceNr == Long.MaxValue || toSequenceNr <= lastSequenceNr)
setup.journal ! JournalProtocol.DeleteMessagesTo(e.metadata.persistenceId, toSequenceNr, self)
setup.journal ! JournalProtocol.DeleteMessagesTo(setup.persistenceId.id, toSequenceNr, self)
else
self ! DeleteMessagesFailure(
new RuntimeException(
s"toSequenceNr [$toSequenceNr] must be less than or equal to lastSequenceNr [$lastSequenceNr]"),
toSequenceNr)
}
}
}
/** INTERNAL API */
@ -153,12 +148,8 @@ private[akka] trait SnapshotInteractions[C, E, S] {
}
/** Deletes the snapshots up to and including the `sequenceNr`. */
protected def internalDeleteSnapshots(toSequenceNr: Long): Unit = {
protected def internalDeleteSnapshots(fromSequenceNr: Long, toSequenceNr: Long): Unit = {
if (toSequenceNr > 0) {
// We could use 0 as fromSequenceNr to delete all older snapshots, but that might be inefficient for
// large ranges depending on how it's implemented in the snapshot plugin. Therefore we use the
// same window as defined for how much to keep in the retention criteria
val fromSequenceNr = setup.retention.toSequenceNumber(toSequenceNr)
val snapshotCriteria = SnapshotSelectionCriteria(minSequenceNr = fromSequenceNr, maxSequenceNr = toSequenceNr)
setup.log.debug("Deleting snapshots from sequenceNr [{}] to [{}]", fromSequenceNr, toSequenceNr)
setup.snapshotStore

View file

@ -0,0 +1,55 @@
/*
* Copyright (C) 2019 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.persistence.typed.internal
import akka.annotation.InternalApi
import akka.persistence.typed.scaladsl
import akka.persistence.typed.javadsl
/**
* INTERNAL API
*/
@InternalApi private[akka] final case class SnapshotCountRetentionCriteriaImpl(
snapshotEveryNEvents: Int,
keepNSnapshots: Int,
deleteEventsOnSnapshot: Boolean)
extends javadsl.SnapshotCountRetentionCriteria
with scaladsl.SnapshotCountRetentionCriteria {
require(snapshotEveryNEvents > 0, s"'snapshotEveryNEvents' must be greater than 0, was [$snapshotEveryNEvents]")
require(keepNSnapshots > 0, s"'keepNSnapshots' must be greater than 0, was [$keepNSnapshots]")
def snapshotWhen(currentSequenceNr: Long): Boolean =
currentSequenceNr % snapshotEveryNEvents == 0
def deleteUpperSequenceNr(lastSequenceNr: Long): Long = {
// Delete old events, retain the latest
math.max(0, lastSequenceNr - (keepNSnapshots * snapshotEveryNEvents))
}
def deleteLowerSequenceNr(upperSequenceNr: Long): Long = {
// We could use 0 as fromSequenceNr to delete all older snapshots, but that might be inefficient for
// large ranges depending on how it's implemented in the snapshot plugin. Therefore we use the
// same window as defined for how much to keep in the retention criteria
math.max(0, upperSequenceNr - (keepNSnapshots * snapshotEveryNEvents))
}
override def withDeleteEventsOnSnapshot: SnapshotCountRetentionCriteriaImpl =
copy(deleteEventsOnSnapshot = true)
override def asScala: scaladsl.RetentionCriteria = this
override def asJava: javadsl.RetentionCriteria = this
}
/**
* INTERNAL API
*/
@InternalApi private[akka] case object DisabledRetentionCriteria
extends javadsl.RetentionCriteria
with scaladsl.RetentionCriteria {
override def asScala: scaladsl.RetentionCriteria = this
override def asJava: javadsl.RetentionCriteria = this
}

View file

@ -94,6 +94,7 @@ private[akka] object Running {
with StashManagement[C, E, S] {
import InternalProtocol._
import Running.RunningState
import BehaviorSetup._
private val runningCmdsMdc = MDC.create(setup.persistenceId, MDC.RunningCmds)
private val persistingEventsMdc = MDC.create(setup.persistenceId, MDC.PersistingEvents)
@ -151,7 +152,7 @@ private[akka] object Running {
val newState2 = internalPersist(newState, eventToPersist)
val shouldSnapshotAfterPersist = setup.snapshotWhen(newState2.state, event, newState2.seqNr)
val shouldSnapshotAfterPersist = setup.shouldSnapshot(newState2.state, event, newState2.seqNr)
persistingEvents(newState2, state, numberOfEvents = 1, shouldSnapshotAfterPersist, sideEffects)
@ -161,10 +162,11 @@ private[akka] object Running {
// the invalid event, in case such validation is implemented in the event handler.
// also, ensure that there is an event handler for each single event
var seqNr = state.seqNr
val (newState, shouldSnapshotAfterPersist) = events.foldLeft((state, false)) {
val (newState, shouldSnapshotAfterPersist) = events.foldLeft((state, NoSnapshot: SnapshotAfterPersist)) {
case ((currentState, snapshot), event) =>
seqNr += 1
val shouldSnapshot = snapshot || setup.snapshotWhen(currentState.state, event, seqNr)
val shouldSnapshot =
if (snapshot == NoSnapshot) setup.shouldSnapshot(currentState.state, event, seqNr) else snapshot
(currentState.applyEvent(setup, event), shouldSnapshot)
}
@ -214,7 +216,7 @@ private[akka] object Running {
state: RunningState[S],
visibleState: RunningState[S], // previous state until write success
numberOfEvents: Int,
shouldSnapshotAfterPersist: Boolean,
shouldSnapshotAfterPersist: SnapshotAfterPersist,
sideEffects: immutable.Seq[SideEffect[S]]): Behavior[InternalProtocol] = {
setup.setMdc(persistingEventsMdc)
new PersistingEvents(state, visibleState, numberOfEvents, shouldSnapshotAfterPersist, sideEffects)
@ -225,7 +227,7 @@ private[akka] object Running {
var state: RunningState[S],
var visibleState: RunningState[S], // previous state until write success
numberOfEvents: Int,
shouldSnapshotAfterPersist: Boolean,
shouldSnapshotAfterPersist: SnapshotAfterPersist,
var sideEffects: immutable.Seq[SideEffect[S]])
extends AbstractBehavior[InternalProtocol]
with WithSeqNrAccessible {
@ -264,11 +266,12 @@ private[akka] object Running {
if (eventCounter < numberOfEvents) this
else {
visibleState = state
if (shouldSnapshotAfterPersist && state.state != null) {
internalSaveSnapshot(state)
storingSnapshot(state, sideEffects)
} else
if (shouldSnapshotAfterPersist == NoSnapshot || state.state == null) {
tryUnstashOne(applySideEffects(sideEffects, state))
} else {
internalSaveSnapshot(state)
storingSnapshot(state, sideEffects, shouldSnapshotAfterPersist)
}
}
}
@ -316,7 +319,10 @@ private[akka] object Running {
// ===============================================
def storingSnapshot(state: RunningState[S], sideEffects: immutable.Seq[SideEffect[S]]): Behavior[InternalProtocol] = {
def storingSnapshot(
state: RunningState[S],
sideEffects: immutable.Seq[SideEffect[S]],
snapshotReason: SnapshotAfterPersist): Behavior[InternalProtocol] = {
setup.setMdc(storingSnapshotMdc)
def onCommand(cmd: IncomingCommand[C]): Behavior[InternalProtocol] = {
@ -332,13 +338,22 @@ private[akka] object Running {
def onSaveSnapshotResponse(response: SnapshotProtocol.Response): Unit = {
val signal = response match {
case e @ SaveSnapshotSuccess(meta) =>
// # 24698 The deletion of old events are automatic, snapshots are triggered by the SaveSnapshotSuccess.
case SaveSnapshotSuccess(meta) =>
setup.log.debug(s"Persistent snapshot [{}] saved successfully", meta)
if (setup.retention.deleteEventsOnSnapshot)
internalDeleteEvents(e, state)
else
internalDeleteSnapshots(setup.retention.toSequenceNumber(meta.sequenceNr))
if (snapshotReason == SnapshotWithRetention) {
// deletion of old events and snspahots are triggered by the SaveSnapshotSuccess
setup.retention match {
case DisabledRetentionCriteria => // no further actions
case s @ SnapshotCountRetentionCriteriaImpl(_, _, true) =>
// deleteEventsOnSnapshot == true, deletion of old events
val deleteEventsToSeqNr = s.deleteUpperSequenceNr(meta.sequenceNr)
internalDeleteEvents(meta.sequenceNr, deleteEventsToSeqNr)
case s @ SnapshotCountRetentionCriteriaImpl(_, _, false) =>
// deleteEventsOnSnapshot == false, deletion of old snapshots
val deleteSnapshotsToSeqNr = s.deleteUpperSequenceNr(meta.sequenceNr)
internalDeleteSnapshots(s.deleteLowerSequenceNr(deleteSnapshotsToSeqNr), deleteSnapshotsToSeqNr)
}
}
Some(SnapshotCompleted(SnapshotMetadata.fromUntyped(meta)))
@ -374,7 +389,7 @@ private[akka] object Running {
.receiveSignal {
case (_, PoisonPill) =>
// wait for snapshot response before stopping
storingSnapshot(state.copy(receivedPoisonPill = true), sideEffects)
storingSnapshot(state.copy(receivedPoisonPill = true), sideEffects, snapshotReason)
case (_, signal) =>
setup.onSignal(state.state, signal, catchAndLog = false)
Behaviors.same
@ -430,10 +445,15 @@ private[akka] object Running {
val signal = response match {
case DeleteMessagesSuccess(toSequenceNr) =>
setup.log.debug("Persistent events to sequenceNr [{}] deleted successfully.", toSequenceNr)
setup.retention match {
case DisabledRetentionCriteria => // no further actions
case s: SnapshotCountRetentionCriteriaImpl =>
// The reason for -1 is that a snapshot at the exact toSequenceNr is still useful and the events
// after that can be replayed after that snapshot, but replaying the events after toSequenceNr without
// starting at the snapshot at toSequenceNr would be invalid.
internalDeleteSnapshots(toSequenceNr - 1)
val deleteSnapshotsToSeqNr = toSequenceNr - 1
internalDeleteSnapshots(s.deleteLowerSequenceNr(deleteSnapshotsToSeqNr), deleteSnapshotsToSeqNr)
}
Some(DeleteEventsCompleted(toSequenceNr))
case DeleteMessagesFailure(e, toSequenceNr) =>
Some(DeleteEventsFailed(toSequenceNr, e))

View file

@ -17,6 +17,7 @@ import akka.annotation.InternalApi
import akka.persistence.typed.EventAdapter
import akka.persistence.typed._
import akka.persistence.typed.internal._
import akka.util.unused
@ApiMayChange
abstract class EventSourcedBehavior[Command, Event, State] private[akka] (
@ -100,18 +101,6 @@ abstract class EventSourcedBehavior[Command, Event, State] private[akka] (
protected final def newEventHandlerBuilder(): EventHandlerBuilder[State, Event] =
EventHandlerBuilder.builder[State, Event]()
/**
* Initiates a snapshot if the given function returns true.
* When persisting multiple events at once the snapshot is triggered after all the events have
* been persisted.
*
* receives the State, Event and the sequenceNr used for the Event
*
* @return `true` if snapshot should be saved for the given event
* @see [[EventSourcedBehavior#snapshotEvery]]
*/
def shouldSnapshot(state: State, event: Event, sequenceNr: Long): Boolean = false
/**
* Override and define the journal plugin id that this actor should use instead of the default.
*/
@ -132,26 +121,43 @@ abstract class EventSourcedBehavior[Command, Event, State] private[akka] (
*/
def snapshotSelectionCriteria: SnapshotSelectionCriteria = SnapshotSelectionCriteria.latest
/**
* Initiates a snapshot if the given predicate evaluates to true.
*
* Decide to store a snapshot based on the State, Event and sequenceNr when the event has
* been successfully persisted.
*
* When persisting multiple events at once the snapshot is triggered after all the events have
* been persisted.
*
* Snapshots triggered by `snapshotWhen` will not trigger deletes of old snapshots and events if
* [[EventSourcedBehavior.retentionCriteria]] with [[RetentionCriteria.snapshotEvery]] is used together with
* `shouldSnapshot`. Such deletes are only triggered by snapshots matching the `numberOfEvents` in the
* [[RetentionCriteria]].
*
* @return `true` if snapshot should be saved at the given `state`, `event` and `sequenceNr` when the event has
* been successfully persisted
*/
def shouldSnapshot(@unused state: State, @unused event: Event, @unused sequenceNr: Long): Boolean = false
/**
* Criteria for retention/deletion of snapshots and events.
* By default, retention is disabled and snapshots are not saved and deleted automatically.
*/
def retentionCriteria: RetentionCriteria = RetentionCriteria.disabled
/**
* The `tagger` function should give event tags, which will be used in persistence query
*/
def tagsFor(event: Event): java.util.Set[String] = Collections.emptySet()
def tagsFor(@unused event: Event): java.util.Set[String] = Collections.emptySet()
def eventAdapter(): EventAdapter[Event, _] = NoOpEventAdapter.instance[Event]
def retentionCriteria: RetentionCriteria = RetentionCriteria.disabled
/**
* INTERNAL API: DeferredBehavior init
*/
@InternalApi override def apply(context: typed.TypedActorContext[Command]): Behavior[Command] = {
val snapshotWhen: (State, Event, Long) => Boolean = { (state, event, seqNr) =>
val n = retentionCriteria.snapshotEveryNEvents
if (n > 0)
seqNr % n == 0
else
shouldSnapshot(state, event, seqNr)
}
val snapshotWhen: (State, Event, Long) => Boolean = (state, event, seqNr) => shouldSnapshot(state, event, seqNr)
val tagger: Event => Set[String] = { event =>
import scala.collection.JavaConverters._
@ -167,6 +173,7 @@ abstract class EventSourcedBehavior[Command, Event, State] private[akka] (
eventHandler()(_, _),
getClass)
.snapshotWhen(snapshotWhen)
.withRetention(retentionCriteria.asScala)
.withTagger(tagger)
.eventAdapter(eventAdapter())
.withJournalPluginId(journalPluginId)

View file

@ -0,0 +1,50 @@
/*
* Copyright (C) 2019 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.persistence.typed.javadsl
import akka.annotation.DoNotInherit
import akka.persistence.typed.internal.DisabledRetentionCriteria
import akka.persistence.typed.internal.SnapshotCountRetentionCriteriaImpl
/**
* Criteria for retention/deletion of snapshots and events.
*/
abstract class RetentionCriteria {
def asScala: akka.persistence.typed.scaladsl.RetentionCriteria
}
/**
* Criteria for retention/deletion of snapshots and events.
*/
object RetentionCriteria {
/**
* Snapshots are not saved and deleted automatically, events are not deleted.
*/
val disabled: RetentionCriteria = DisabledRetentionCriteria
/**
* Save snapshots automatically every `numberOfEvents`. Snapshots that have sequence number
* less than the sequence number of the saved snapshot minus `keepNSnapshots * numberOfEvents` are automatically
* deleted.
*
* Use [[SnapshotCountRetentionCriteria.withDeleteEventsOnSnapshot]] to
* delete old events. Events are not deleted by default.
*/
def snapshotEvery(numberOfEvents: Int, keepNSnapshots: Int): SnapshotCountRetentionCriteria =
SnapshotCountRetentionCriteriaImpl(numberOfEvents, keepNSnapshots, deleteEventsOnSnapshot = false)
}
@DoNotInherit abstract class SnapshotCountRetentionCriteria extends RetentionCriteria {
/**
* Delete events after saving snapshot via [[RetentionCriteria.snapshotEvery()]].
* Events that have sequence number less than the snapshot sequence number minus
* `keepNSnapshots * numberOfEvents` are deleted.
*/
def withDeleteEventsOnSnapshot: SnapshotCountRetentionCriteria
}

View file

@ -17,7 +17,6 @@ import akka.annotation.DoNotInherit
import akka.persistence.typed.EventAdapter
import akka.persistence.typed.ExpectingReply
import akka.persistence.typed.PersistenceId
import akka.persistence.typed.RetentionCriteria
import akka.persistence.typed.SnapshotSelectionCriteria
import akka.persistence.typed.internal._
@ -136,15 +135,6 @@ object EventSourcedBehavior {
*/
def signalHandler: PartialFunction[(State, Signal), Unit]
/**
* Initiates a snapshot if the given function returns true.
* When persisting multiple events at once the snapshot is triggered after all the events have
* been persisted.
*
* `predicate` receives the State, Event and the sequenceNr used for the Event
*/
def snapshotWhen(predicate: (State, Event, Long) => Boolean): EventSourcedBehavior[Command, Event, State]
/**
* Change the journal plugin id that this actor should use.
*/
@ -166,7 +156,24 @@ object EventSourcedBehavior {
def withSnapshotSelectionCriteria(selection: SnapshotSelectionCriteria): EventSourcedBehavior[Command, Event, State]
/**
* Criteria for internal retention/deletion of snapshots and events.
* Initiates a snapshot if the given `predicate` evaluates to true.
*
* Decide to store a snapshot based on the State, Event and sequenceNr when the event has
* been successfully persisted.
*
* When persisting multiple events at once the snapshot is triggered after all the events have
* been persisted.
*
* Snapshots triggered by `snapshotWhen` will not trigger deletes of old snapshots and events if
* [[EventSourcedBehavior.withRetention]] with [[RetentionCriteria.snapshotEvery]] is used together with
* `snapshotWhen`. Such deletes are only triggered by snapshots matching the `numberOfEvents` in the
* [[RetentionCriteria]].
*/
def snapshotWhen(predicate: (State, Event, Long) => Boolean): EventSourcedBehavior[Command, Event, State]
/**
* Criteria for retention/deletion of snapshots and events.
* By default, retention is disabled and snapshots are not saved and deleted automatically.
*/
def withRetention(criteria: RetentionCriteria): EventSourcedBehavior[Command, Event, State]

View file

@ -0,0 +1,49 @@
/*
* Copyright (C) 2019 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.persistence.typed.scaladsl
import akka.annotation.DoNotInherit
import akka.persistence.typed.internal.DisabledRetentionCriteria
import akka.persistence.typed.internal.SnapshotCountRetentionCriteriaImpl
/**
* Criteria for retention/deletion of snapshots and events.
*/
trait RetentionCriteria {
def asJava: akka.persistence.typed.javadsl.RetentionCriteria
}
/**
* Criteria for retention/deletion of snapshots and events.
*/
object RetentionCriteria {
/**
* Snapshots are not saved and deleted automatically, events are not deleted.
*/
val disabled: RetentionCriteria = DisabledRetentionCriteria
/**
* Save snapshots automatically every `numberOfEvents`. Snapshots that have sequence number
* less than sequence number of the saved snapshot minus `keepNSnapshots * numberOfEvents` are
* automatically deleted.
*
* Use [[SnapshotCountRetentionCriteria.withDeleteEventsOnSnapshot]] to
* delete old events. Events are not deleted by default.
*/
def snapshotEvery(numberOfEvents: Int, keepNSnapshots: Int): SnapshotCountRetentionCriteria =
SnapshotCountRetentionCriteriaImpl(numberOfEvents, keepNSnapshots, deleteEventsOnSnapshot = false)
}
@DoNotInherit trait SnapshotCountRetentionCriteria extends RetentionCriteria {
/**
* Delete events after saving snapshot via [[RetentionCriteria.snapshotEvery()]].
* Events that have sequence number less than the snapshot sequence number minus
* `keepNSnapshots * numberOfEvents` are deleted.
*/
def withDeleteEventsOnSnapshot: SnapshotCountRetentionCriteria
}

View file

@ -411,6 +411,7 @@ public class PersistentActorJavaDslTest extends JUnitSuite {
public boolean shouldSnapshot(State state, Incremented event, long sequenceNr) {
return state.value % 2 == 0;
}
@Override
public SignalHandler signalHandler() {
return newSignalHandlerBuilder()

View file

@ -8,19 +8,18 @@ import akka.actor.typed.Behavior;
import akka.actor.typed.SupervisorStrategy;
import akka.actor.typed.javadsl.ActorContext;
import akka.actor.typed.javadsl.Behaviors;
import akka.persistence.typed.DeleteEventsFailed;
import akka.persistence.typed.DeleteSnapshotsFailed;
import akka.persistence.typed.PersistenceId;
import akka.persistence.typed.RecoveryCompleted;
import akka.persistence.typed.RetentionCriteria;
import akka.persistence.typed.SnapshotFailed;
import akka.persistence.typed.DeleteSnapshotsFailed;
import akka.persistence.typed.DeleteEventsFailed;
import akka.persistence.typed.javadsl.CommandHandler;
import akka.persistence.typed.javadsl.EventHandler;
import akka.persistence.typed.javadsl.EventSourcedBehavior;
import akka.persistence.typed.javadsl.RetentionCriteria;
import akka.persistence.typed.javadsl.SignalHandler;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
@ -278,8 +277,7 @@ public class BasicPersistentBehaviorTest {
// #retentionCriteria
@Override // override retentionCriteria in EventSourcedBehavior
public RetentionCriteria retentionCriteria() {
// to also delete events use `RetentionCriteria.withDeleteEvents()`
return RetentionCriteria.snapshotEvery(1000, 2);
return RetentionCriteria.snapshotEvery(100, 2);
}
// #retentionCriteria
@ -308,6 +306,19 @@ public class BasicPersistentBehaviorTest {
}
// #retentionCriteriaWithSignals
}
public static class Snapshotting2 extends Snapshotting {
public Snapshotting2(PersistenceId persistenceId) {
super(persistenceId);
}
// #snapshotAndEventDeletes
@Override // override retentionCriteria in EventSourcedBehavior
public RetentionCriteria retentionCriteria() {
return RetentionCriteria.snapshotEvery(100, 2).withDeleteEventsOnSnapshot();
}
// #snapshotAndEventDeletes
}
}
interface WithActorContext {

View file

@ -0,0 +1,71 @@
/*
* Copyright (C) 2019 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.persistence.typed.internal
import akka.persistence.typed.scaladsl.RetentionCriteria
import org.scalatest.Matchers
import org.scalatest.TestSuite
import org.scalatest.WordSpecLike
class RetentionCriteriaSpec extends TestSuite with Matchers with WordSpecLike {
"RetentionCriteria" must {
"snapshotWhen the sequenceNr matches numberOfEvents" in {
val criteria = RetentionCriteria.snapshotEvery(3, 2).asInstanceOf[SnapshotCountRetentionCriteriaImpl]
criteria.snapshotWhen(1) should ===(false)
criteria.snapshotWhen(2) should ===(false)
criteria.snapshotWhen(3) should ===(true)
criteria.snapshotWhen(4) should ===(false)
criteria.snapshotWhen(6) should ===(true)
criteria.snapshotWhen(21) should ===(true)
criteria.snapshotWhen(31) should ===(false)
}
"have valid sequenceNr range based on keepNSnapshots" in {
val criteria = RetentionCriteria.snapshotEvery(3, 2).asInstanceOf[SnapshotCountRetentionCriteriaImpl]
val expected = List(
1 -> (0 -> 0),
3 -> (0 -> 0),
4 -> (0 -> 0),
6 -> (0 -> 0),
7 -> (0 -> 1),
9 -> (0 -> 3),
10 -> (0 -> 4),
12 -> (0 -> 6),
13 -> (1 -> 7),
15 -> (3 -> 9),
18 -> (6 -> 12),
20 -> (8 -> 14))
expected.foreach {
case (seqNr, (lower, upper)) =>
withClue(s"seqNr=$seqNr:") {
criteria.deleteUpperSequenceNr(seqNr) should ===(upper)
criteria.deleteLowerSequenceNr(upper) should ===(lower)
}
}
}
"require keepNSnapshots >= 1" in {
RetentionCriteria.snapshotEvery(100, 1) // ok
intercept[IllegalArgumentException] {
RetentionCriteria.snapshotEvery(100, 0)
}
intercept[IllegalArgumentException] {
RetentionCriteria.snapshotEvery(100, -1)
}
}
"require numberOfEvents >= 1" in {
RetentionCriteria.snapshotEvery(1, 2) // ok
intercept[IllegalArgumentException] {
RetentionCriteria.snapshotEvery(0, 0)
}
intercept[IllegalArgumentException] {
RetentionCriteria.snapshotEvery(-1, -1)
}
}
}
}

View file

@ -0,0 +1,567 @@
/*
* Copyright (C) 2017-2019 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.persistence.typed.scaladsl
import java.util.UUID
import java.util.concurrent.atomic.AtomicInteger
import scala.util.Failure
import scala.util.Success
import scala.util.Try
import akka.actor.testkit.typed.scaladsl._
import akka.actor.typed.ActorRef
import akka.actor.typed.ActorSystem
import akka.actor.typed.Behavior
import akka.actor.typed.Terminated
import akka.actor.typed.scaladsl.ActorContext
import akka.actor.typed.scaladsl.Behaviors
import akka.persistence.typed.DeleteEventsCompleted
import akka.persistence.typed.DeleteSnapshotsCompleted
import akka.persistence.typed.DeletionTarget
import akka.persistence.typed.EventSourcedSignal
import akka.persistence.typed.PersistenceId
import akka.persistence.typed.RecoveryCompleted
import akka.persistence.typed.SnapshotCompleted
import akka.persistence.typed.SnapshotFailed
import akka.persistence.typed.SnapshotMetadata
import akka.persistence.typed.SnapshotSelectionCriteria
import akka.testkit.EventFilter
import akka.testkit.TestEvent.Mute
import akka.util.unused
import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory
import org.scalatest.WordSpecLike
object EventSourcedBehaviorRetentionSpec {
def conf: Config = ConfigFactory.parseString(s"""
akka.loglevel = INFO
akka.loggers = [akka.testkit.TestEventListener]
# akka.persistence.typed.log-stashing = on
akka.persistence.journal.leveldb.dir = "target/typed-persistence-${UUID.randomUUID().toString}"
akka.persistence.journal.plugin = "akka.persistence.journal.leveldb"
akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local"
akka.persistence.snapshot-store.local.dir = "target/typed-persistence-${UUID.randomUUID().toString}"
""")
sealed trait Command
final case object Increment extends Command
final case class IncrementWithPersistAll(nr: Int) extends Command
final case class GetValue(replyTo: ActorRef[State]) extends Command
final case object StopIt extends Command
sealed trait Event
final case class Incremented(delta: Int) extends Event
final case class State(value: Int, history: Vector[Int])
def counter(persistenceId: PersistenceId)(implicit system: ActorSystem[_]): Behavior[Command] =
Behaviors.setup(ctx => counter(ctx, persistenceId))
def counter(ctx: ActorContext[Command], persistenceId: PersistenceId)(
implicit system: ActorSystem[_]): EventSourcedBehavior[Command, Event, State] =
counter(
ctx,
persistenceId,
probe = TestProbe[(State, Event)].ref,
snapshotProbe = TestProbe[Try[SnapshotMetadata]].ref,
retentionProbe = TestProbe[Try[EventSourcedSignal]].ref)
def counterWithProbe(
ctx: ActorContext[Command],
persistenceId: PersistenceId,
probe: ActorRef[(State, Event)],
snapshotProbe: ActorRef[Try[SnapshotMetadata]])(
implicit system: ActorSystem[_]): EventSourcedBehavior[Command, Event, State] =
counter(ctx, persistenceId, probe, snapshotProbe, TestProbe[Try[EventSourcedSignal]].ref)
def counterWithProbe(ctx: ActorContext[Command], persistenceId: PersistenceId, probe: ActorRef[(State, Event)])(
implicit system: ActorSystem[_]): EventSourcedBehavior[Command, Event, State] =
counter(ctx, persistenceId, probe, TestProbe[Try[SnapshotMetadata]].ref, TestProbe[Try[EventSourcedSignal]].ref)
def counterWithSnapshotProbe(
ctx: ActorContext[Command],
persistenceId: PersistenceId,
probe: ActorRef[Try[SnapshotMetadata]])(
implicit system: ActorSystem[_]): EventSourcedBehavior[Command, Event, State] =
counter(
ctx,
persistenceId,
TestProbe[(State, Event)].ref,
snapshotProbe = probe,
TestProbe[Try[EventSourcedSignal]].ref)
def counterWithSnapshotAndRetentionProbe(
ctx: ActorContext[Command],
persistenceId: PersistenceId,
probeS: ActorRef[Try[SnapshotMetadata]],
probeR: ActorRef[Try[EventSourcedSignal]])(
implicit system: ActorSystem[_]): EventSourcedBehavior[Command, Event, State] =
counter(ctx, persistenceId, TestProbe[(State, Event)].ref, snapshotProbe = probeS, retentionProbe = probeR)
def counter(
@unused ctx: ActorContext[Command],
persistenceId: PersistenceId,
probe: ActorRef[(State, Event)],
snapshotProbe: ActorRef[Try[SnapshotMetadata]],
retentionProbe: ActorRef[Try[EventSourcedSignal]]): EventSourcedBehavior[Command, Event, State] = {
EventSourcedBehavior[Command, Event, State](
persistenceId,
emptyState = State(0, Vector.empty),
commandHandler = (state, cmd) =>
cmd match {
case Increment =>
Effect.persist(Incremented(1))
case IncrementWithPersistAll(n) =>
Effect.persist((0 until n).map(_ => Incremented(1)))
case GetValue(replyTo) =>
replyTo ! state
Effect.none
case StopIt =>
Effect.none.thenStop()
},
eventHandler = (state, evt)
evt match {
case Incremented(delta)
probe ! ((state, evt))
State(state.value + delta, state.history :+ state.value)
}).receiveSignal {
case (_, RecoveryCompleted) => ()
case (_, SnapshotCompleted(metadata))
snapshotProbe ! Success(metadata)
case (_, SnapshotFailed(_, failure))
snapshotProbe ! Failure(failure)
case (_, e: EventSourcedSignal) =>
retentionProbe ! Success(e)
}
}
}
class EventSourcedBehaviorRetentionSpec
extends ScalaTestWithActorTestKit(EventSourcedBehaviorRetentionSpec.conf)
with WordSpecLike {
import EventSourcedBehaviorRetentionSpec._
import akka.actor.typed.scaladsl.adapter._
// needed for the untyped event filter
implicit val actorSystem = system.toUntyped
val pidCounter = new AtomicInteger(0)
private def nextPid(): PersistenceId = PersistenceId(s"c${pidCounter.incrementAndGet()})")
actorSystem.eventStream.publish(Mute(EventFilter.info(pattern = ".*was not delivered.*", occurrences = 100)))
actorSystem.eventStream.publish(Mute(EventFilter.warning(pattern = ".*received dead letter.*", occurrences = 100)))
"EventSourcedBehavior with retention" must {
"snapshot every N sequence nrs" in {
val pid = nextPid()
val c = spawn(Behaviors.setup[Command](ctx =>
counter(ctx, pid).withRetention(RetentionCriteria.snapshotEvery(numberOfEvents = 2, keepNSnapshots = 2))))
val watchProbe = watcher(c)
val replyProbe = TestProbe[State]()
c ! Increment
c ! GetValue(replyProbe.ref)
replyProbe.expectMessage(State(1, Vector(0)))
c ! StopIt
watchProbe.expectMessage("Terminated")
// no snapshot should have happened
val probeC2 = TestProbe[(State, Event)]()
val snapshotProbe = TestProbe[Try[SnapshotMetadata]]()
val c2 = spawn(
Behaviors.setup[Command](ctx =>
counterWithProbe(ctx, pid, probeC2.ref, snapshotProbe.ref)
.withRetention(RetentionCriteria.snapshotEvery(numberOfEvents = 2, keepNSnapshots = 2))))
probeC2.expectMessage[(State, Event)]((State(0, Vector()), Incremented(1)))
val watchProbeC2 = watcher(c2)
c2 ! Increment
snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(2)
c2 ! StopIt
watchProbeC2.expectMessage("Terminated")
val probeC3 = TestProbe[(State, Event)]()
val c3 = spawn(
Behaviors.setup[Command](ctx =>
counterWithProbe(ctx, pid, probeC3.ref)
.withRetention(RetentionCriteria.snapshotEvery(numberOfEvents = 2, keepNSnapshots = 2))))
// this time it should have been snapshotted so no events to replay
probeC3.expectNoMessage()
c3 ! GetValue(replyProbe.ref)
replyProbe.expectMessage(State(2, Vector(0, 1)))
}
"snapshot every N sequence nrs when persisting multiple events" in {
val pid = nextPid()
val snapshotProbe = TestProbe[Try[SnapshotMetadata]]()
val c =
spawn(
Behaviors.setup[Command](ctx =>
counterWithSnapshotProbe(ctx, pid, snapshotProbe.ref)
.withRetention(RetentionCriteria.snapshotEvery(numberOfEvents = 2, keepNSnapshots = 2))))
val watchProbe = watcher(c)
val replyProbe = TestProbe[State]()
c ! IncrementWithPersistAll(3)
c ! GetValue(replyProbe.ref)
replyProbe.expectMessage(State(3, Vector(0, 1, 2)))
// snapshot at seqNr 3 because of persistAll
snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(3)
c ! StopIt
watchProbe.expectMessage("Terminated")
val probeC2 = TestProbe[(State, Event)]()
val c2 = spawn(
Behaviors.setup[Command](ctx =>
counterWithProbe(ctx, pid, probeC2.ref)
.withRetention(RetentionCriteria.snapshotEvery(numberOfEvents = 2, keepNSnapshots = 2))))
probeC2.expectNoMessage()
c2 ! GetValue(replyProbe.ref)
replyProbe.expectMessage(State(3, Vector(0, 1, 2)))
}
"snapshot via predicate" in {
val pid = nextPid()
val snapshotProbe = TestProbe[Try[SnapshotMetadata]]
val alwaysSnapshot: Behavior[Command] =
Behaviors.setup { ctx =>
counterWithSnapshotProbe(ctx, pid, snapshotProbe.ref).snapshotWhen { (_, _, _) =>
true
}
}
val c = spawn(alwaysSnapshot)
val watchProbe = watcher(c)
val replyProbe = TestProbe[State]()
c ! Increment
snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(1)
c ! GetValue(replyProbe.ref)
replyProbe.expectMessage(State(1, Vector(0)))
c ! StopIt
watchProbe.expectMessage("Terminated")
val probe = TestProbe[(State, Event)]()
val c2 = spawn(Behaviors.setup[Command](ctx => counterWithProbe(ctx, pid, probe.ref)))
// state should be rebuilt from snapshot, no events replayed
// Fails as snapshot is async (i think)
probe.expectNoMessage()
c2 ! Increment
c2 ! GetValue(replyProbe.ref)
replyProbe.expectMessage(State(2, Vector(0, 1)))
}
"check all events for snapshot in PersistAll" in {
val pid = nextPid()
val snapshotProbe = TestProbe[Try[SnapshotMetadata]]
val snapshotAtTwo = Behaviors.setup[Command](ctx =>
counterWithSnapshotProbe(ctx, pid, snapshotProbe.ref).snapshotWhen { (s, _, _) =>
s.value == 2
})
val c: ActorRef[Command] = spawn(snapshotAtTwo)
val watchProbe = watcher(c)
val replyProbe = TestProbe[State]()
c ! IncrementWithPersistAll(3)
c ! GetValue(replyProbe.ref)
replyProbe.expectMessage(State(3, Vector(0, 1, 2)))
// snapshot at seqNr 3 because of persistAll
snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(3)
c ! StopIt
watchProbe.expectMessage("Terminated")
val probeC2 = TestProbe[(State, Event)]()
val c2 = spawn(Behaviors.setup[Command](ctx => counterWithProbe(ctx, pid, probeC2.ref)))
// middle event triggered all to be snapshot
probeC2.expectNoMessage()
c2 ! GetValue(replyProbe.ref)
replyProbe.expectMessage(State(3, Vector(0, 1, 2)))
}
def expectDeleteSnapshotCompleted(
retentionProbe: TestProbe[Try[EventSourcedSignal]],
maxSequenceNr: Long,
minSequenceNr: Long): Unit = {
retentionProbe.expectMessageType[Success[DeleteSnapshotsCompleted]].value should ===(
DeleteSnapshotsCompleted(DeletionTarget.Criteria(
SnapshotSelectionCriteria.latest.withMaxSequenceNr(maxSequenceNr).withMinSequenceNr(minSequenceNr))))
}
"delete snapshots automatically, based on criteria" in {
val pid = nextPid()
val snapshotProbe = TestProbe[Try[SnapshotMetadata]]()
val retentionProbe = TestProbe[Try[EventSourcedSignal]]()
val replyProbe = TestProbe[State]()
val persistentActor = spawn(
Behaviors.setup[Command](ctx
counterWithSnapshotAndRetentionProbe(ctx, pid, snapshotProbe.ref, retentionProbe.ref)
.withRetention(RetentionCriteria.snapshotEvery(numberOfEvents = 3, keepNSnapshots = 2))))
(1 to 10).foreach(_ => persistentActor ! Increment)
persistentActor ! GetValue(replyProbe.ref)
replyProbe.expectMessage(State(10, (0 until 10).toVector))
snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(3)
snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(6)
snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(9)
expectDeleteSnapshotCompleted(retentionProbe, 3, 0)
(1 to 10).foreach(_ => persistentActor ! Increment)
persistentActor ! GetValue(replyProbe.ref)
replyProbe.expectMessage(State(20, (0 until 20).toVector))
snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(12)
snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(15)
snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(18)
expectDeleteSnapshotCompleted(retentionProbe, 6, 0)
expectDeleteSnapshotCompleted(retentionProbe, 9, 3)
expectDeleteSnapshotCompleted(retentionProbe, 12, 6)
retentionProbe.expectNoMessage()
}
"optionally delete both old events and snapshots" in {
val pid = nextPid()
val snapshotProbe = TestProbe[Try[SnapshotMetadata]]()
val retentionProbe = TestProbe[Try[EventSourcedSignal]]()
val replyProbe = TestProbe[State]()
val persistentActor = spawn(
Behaviors.setup[Command](
ctx
counterWithSnapshotAndRetentionProbe(ctx, pid, snapshotProbe.ref, retentionProbe.ref).withRetention(
// tests the Java API as well
RetentionCriteria.snapshotEvery(numberOfEvents = 3, keepNSnapshots = 2).withDeleteEventsOnSnapshot)))
(1 to 10).foreach(_ => persistentActor ! Increment)
persistentActor ! GetValue(replyProbe.ref)
replyProbe.expectMessage(State(10, (0 until 10).toVector))
snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(3)
snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(6)
snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(9)
retentionProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 3
// Note that when triggering deletion of snapshots from deletion of events it is intentionally "off by one".
// The reason for -1 is that a snapshot at the exact toSequenceNr is still useful and the events
// after that can be replayed after that snapshot, but replaying the events after toSequenceNr without
// starting at the snapshot at toSequenceNr would be invalid.
expectDeleteSnapshotCompleted(retentionProbe, 2, 0)
(1 to 10).foreach(_ => persistentActor ! Increment)
persistentActor ! GetValue(replyProbe.ref)
replyProbe.expectMessage(State(20, (0 until 20).toVector))
snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(12)
snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(15)
snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(18)
retentionProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 6
expectDeleteSnapshotCompleted(retentionProbe, 5, 0)
retentionProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 9
expectDeleteSnapshotCompleted(retentionProbe, 8, 2)
retentionProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 12
expectDeleteSnapshotCompleted(retentionProbe, 11, 5)
retentionProbe.expectNoMessage()
}
"be possible to combine snapshotWhen and retention criteria" in {
val pid = nextPid()
val snapshotProbe = TestProbe[Try[SnapshotMetadata]]()
val retentionProbe = TestProbe[Try[EventSourcedSignal]]()
val replyProbe = TestProbe[State]()
val persistentActor = spawn(
Behaviors.setup[Command](
ctx
counterWithSnapshotAndRetentionProbe(ctx, pid, snapshotProbe.ref, retentionProbe.ref)
.snapshotWhen((_, _, seqNr) => seqNr == 3 || seqNr == 13)
.withRetention(RetentionCriteria.snapshotEvery(numberOfEvents = 5, keepNSnapshots = 1))))
(1 to 3).foreach(_ => persistentActor ! Increment)
persistentActor ! GetValue(replyProbe.ref)
replyProbe.expectMessage(State(3, (0 until 3).toVector))
snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(3)
retentionProbe.expectNoMessage()
(4 to 10).foreach(_ => persistentActor ! Increment)
snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(5)
snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(10)
expectDeleteSnapshotCompleted(retentionProbe, 5, 0)
(11 to 13).foreach(_ => persistentActor ! Increment)
snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(13)
// no deletes triggered by snapshotWhen
retentionProbe.expectNoMessage()
(14 to 16).foreach(_ => persistentActor ! Increment)
persistentActor ! GetValue(replyProbe.ref)
replyProbe.expectMessage(State(16, (0 until 16).toVector))
snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(15)
expectDeleteSnapshotCompleted(retentionProbe, 10, 5)
retentionProbe.expectNoMessage()
}
"be possible to combine snapshotWhen and retention criteria withDeleteEventsOnSnapshot" in {
val pid = nextPid()
val snapshotProbe = TestProbe[Try[SnapshotMetadata]]()
val retentionProbe = TestProbe[Try[EventSourcedSignal]]()
val replyProbe = TestProbe[State]()
val persistentActor = spawn(
Behaviors.setup[Command](
ctx
counterWithSnapshotAndRetentionProbe(ctx, pid, snapshotProbe.ref, retentionProbe.ref)
.snapshotWhen((_, _, seqNr) => seqNr == 3 || seqNr == 13)
.withRetention(
RetentionCriteria.snapshotEvery(numberOfEvents = 2, keepNSnapshots = 3).withDeleteEventsOnSnapshot)))
(1 to 3).foreach(_ => persistentActor ! Increment)
persistentActor ! GetValue(replyProbe.ref)
replyProbe.expectMessage(State(3, (0 until 3).toVector))
snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(2)
snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(3)
retentionProbe.expectNoMessage()
(4 to 10).foreach(_ => persistentActor ! Increment)
snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(4)
snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(6)
snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(8)
retentionProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 2
expectDeleteSnapshotCompleted(retentionProbe, 1, 0)
snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(10)
retentionProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 4
expectDeleteSnapshotCompleted(retentionProbe, 3, 0)
(11 to 13).foreach(_ => persistentActor ! Increment)
snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(12)
retentionProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 6
expectDeleteSnapshotCompleted(retentionProbe, 5, 0)
snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(13)
// no deletes triggered by snapshotWhen
retentionProbe.expectNoMessage()
(14 to 16).foreach(_ => persistentActor ! Increment)
snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(14)
retentionProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 8
expectDeleteSnapshotCompleted(retentionProbe, 7, 1)
snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(16)
retentionProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 10
expectDeleteSnapshotCompleted(retentionProbe, 9, 3)
retentionProbe.expectNoMessage()
}
"be possible to snapshot every event" in {
// very bad idea to snapshot every event, but technically possible
val pid = nextPid()
val snapshotProbe = TestProbe[Try[SnapshotMetadata]]()
val retentionProbe = TestProbe[Try[EventSourcedSignal]]()
val replyProbe = TestProbe[State]()
val persistentActor = spawn(
Behaviors.setup[Command](ctx
counterWithSnapshotAndRetentionProbe(ctx, pid, snapshotProbe.ref, retentionProbe.ref)
.withRetention(RetentionCriteria.snapshotEvery(numberOfEvents = 1, keepNSnapshots = 3))))
(1 to 10).foreach(_ => persistentActor ! Increment)
persistentActor ! GetValue(replyProbe.ref)
replyProbe.expectMessage(State(10, (0 until 10).toVector))
snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(1)
snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(2)
snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(3)
snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(4)
expectDeleteSnapshotCompleted(retentionProbe, 1, 0)
snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(5)
expectDeleteSnapshotCompleted(retentionProbe, 2, 0)
snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(6)
expectDeleteSnapshotCompleted(retentionProbe, 3, 0)
snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(7)
expectDeleteSnapshotCompleted(retentionProbe, 4, 1)
snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(8)
expectDeleteSnapshotCompleted(retentionProbe, 5, 2)
snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(9)
expectDeleteSnapshotCompleted(retentionProbe, 6, 3)
snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(10)
expectDeleteSnapshotCompleted(retentionProbe, 7, 4)
}
"be possible to snapshot every event withDeleteEventsOnSnapshot" in {
// very bad idea to snapshot every event, but technically possible
val pid = nextPid()
val snapshotProbe = TestProbe[Try[SnapshotMetadata]]()
val retentionProbe = TestProbe[Try[EventSourcedSignal]]()
val replyProbe = TestProbe[State]()
val persistentActor = spawn(
Behaviors.setup[Command](ctx
counterWithSnapshotAndRetentionProbe(ctx, pid, snapshotProbe.ref, retentionProbe.ref).withRetention(
RetentionCriteria.snapshotEvery(numberOfEvents = 1, keepNSnapshots = 3).withDeleteEventsOnSnapshot)))
(1 to 10).foreach(_ => persistentActor ! Increment)
persistentActor ! GetValue(replyProbe.ref)
replyProbe.expectMessage(State(10, (0 until 10).toVector))
snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(1)
snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(2)
snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(3)
snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(4)
retentionProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 1
snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(5)
retentionProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 2
expectDeleteSnapshotCompleted(retentionProbe, 1, 0)
snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(6)
retentionProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 3
expectDeleteSnapshotCompleted(retentionProbe, 2, 0)
snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(7)
retentionProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 4
expectDeleteSnapshotCompleted(retentionProbe, 3, 0)
snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(8)
retentionProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 5
expectDeleteSnapshotCompleted(retentionProbe, 4, 1)
snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(9)
retentionProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 6
expectDeleteSnapshotCompleted(retentionProbe, 5, 2)
snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(10)
retentionProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 7
expectDeleteSnapshotCompleted(retentionProbe, 6, 3)
}
def watcher(toWatch: ActorRef[_]): TestProbe[String] = {
val probe = TestProbe[String]()
val w = Behaviors.setup[Any] { ctx =>
ctx.watch(toWatch)
Behaviors
.receive[Any] { (_, _) =>
Behaviors.same
}
.receiveSignal {
case (_, _: Terminated) =>
probe.ref ! "Terminated"
Behaviors.stopped
}
}
spawn(w)
probe
}
}
}

View file

@ -32,19 +32,13 @@ import akka.persistence.query.PersistenceQuery
import akka.persistence.query.Sequence
import akka.persistence.query.journal.leveldb.scaladsl.LeveldbReadJournal
import akka.persistence.snapshot.SnapshotStore
import akka.persistence.typed.DeleteEventsCompleted
import akka.persistence.typed.DeleteSnapshotsCompleted
import akka.persistence.typed.DeletionTarget
import akka.persistence.typed.EventAdapter
import akka.persistence.typed.EventSourcedSignal
import akka.persistence.typed.ExpectingReply
import akka.persistence.typed.PersistenceId
import akka.persistence.typed.RecoveryCompleted
import akka.persistence.typed.RetentionCriteria
import akka.persistence.typed.SnapshotCompleted
import akka.persistence.typed.SnapshotFailed
import akka.persistence.typed.SnapshotMetadata
import akka.persistence.typed.SnapshotSelectionCriteria
import akka.persistence.{ SnapshotMetadata => UntypedSnapshotMetadata }
import akka.persistence.{ SnapshotSelectionCriteria => UntypedSnapshotSelectionCriteria }
import akka.stream.ActorMaterializer
@ -153,8 +147,7 @@ object EventSourcedBehaviorSpec {
persistenceId,
loggingActor = TestProbe[String].ref,
probe = TestProbe[(State, Event)].ref,
snapshotProbe = TestProbe[Try[SnapshotMetadata]].ref,
retentionProbe = TestProbe[Try[EventSourcedSignal]].ref)
snapshotProbe = TestProbe[Try[SnapshotMetadata]].ref)
def counter(ctx: ActorContext[Command], persistenceId: PersistenceId, logging: ActorRef[String])(
implicit system: ActorSystem[_]): EventSourcedBehavior[Command, Event, State] =
@ -163,8 +156,7 @@ object EventSourcedBehaviorSpec {
persistenceId,
loggingActor = logging,
probe = TestProbe[(State, Event)].ref,
TestProbe[Try[SnapshotMetadata]].ref,
TestProbe[Try[EventSourcedSignal]].ref)
TestProbe[Try[SnapshotMetadata]].ref)
def counterWithProbe(
ctx: ActorContext[Command],
@ -172,52 +164,25 @@ object EventSourcedBehaviorSpec {
probe: ActorRef[(State, Event)],
snapshotProbe: ActorRef[Try[SnapshotMetadata]])(
implicit system: ActorSystem[_]): EventSourcedBehavior[Command, Event, State] =
counter(ctx, persistenceId, TestProbe[String].ref, probe, snapshotProbe, TestProbe[Try[EventSourcedSignal]].ref)
counter(ctx, persistenceId, TestProbe[String].ref, probe, snapshotProbe)
def counterWithProbe(ctx: ActorContext[Command], persistenceId: PersistenceId, probe: ActorRef[(State, Event)])(
implicit system: ActorSystem[_]): EventSourcedBehavior[Command, Event, State] =
counter(
ctx,
persistenceId,
TestProbe[String].ref,
probe,
TestProbe[Try[SnapshotMetadata]].ref,
TestProbe[Try[EventSourcedSignal]].ref)
counter(ctx, persistenceId, TestProbe[String].ref, probe, TestProbe[Try[SnapshotMetadata]].ref)
def counterWithSnapshotProbe(
ctx: ActorContext[Command],
persistenceId: PersistenceId,
probe: ActorRef[Try[SnapshotMetadata]])(
implicit system: ActorSystem[_]): EventSourcedBehavior[Command, Event, State] =
counter(
ctx,
persistenceId,
TestProbe[String].ref,
TestProbe[(State, Event)].ref,
snapshotProbe = probe,
TestProbe[Try[EventSourcedSignal]].ref)
def counterWithSnapshotAndRetentionProbe(
ctx: ActorContext[Command],
persistenceId: PersistenceId,
probeS: ActorRef[Try[SnapshotMetadata]],
probeR: ActorRef[Try[EventSourcedSignal]])(
implicit system: ActorSystem[_]): EventSourcedBehavior[Command, Event, State] =
counter(
ctx,
persistenceId,
TestProbe[String].ref,
TestProbe[(State, Event)].ref,
snapshotProbe = probeS,
retentionProbe = probeR)
counter(ctx, persistenceId, TestProbe[String].ref, TestProbe[(State, Event)].ref, snapshotProbe = probe)
def counter(
ctx: ActorContext[Command],
persistenceId: PersistenceId,
loggingActor: ActorRef[String],
probe: ActorRef[(State, Event)],
snapshotProbe: ActorRef[Try[SnapshotMetadata]],
retentionProbe: ActorRef[Try[EventSourcedSignal]]): EventSourcedBehavior[Command, Event, State] = {
snapshotProbe: ActorRef[Try[SnapshotMetadata]]): EventSourcedBehavior[Command, Event, State] = {
EventSourcedBehavior[Command, Event, State](
persistenceId,
emptyState = State(0, Vector.empty),
@ -323,8 +288,6 @@ object EventSourcedBehaviorSpec {
snapshotProbe ! Success(metadata)
case (_, SnapshotFailed(_, failure))
snapshotProbe ! Failure(failure)
case (_, e: EventSourcedSignal) =>
retentionProbe ! Success(e)
}
}
}
@ -496,123 +459,6 @@ class EventSourcedBehaviorSpec extends ScalaTestWithActorTestKit(EventSourcedBeh
watchProbe.expectMessage("Terminated")
}
"snapshot via predicate" in {
val pid = nextPid
val snapshotProbe = TestProbe[Try[SnapshotMetadata]]
val alwaysSnapshot: Behavior[Command] =
Behaviors.setup { ctx =>
counterWithSnapshotProbe(ctx, pid, snapshotProbe.ref).snapshotWhen { (_, _, _) =>
true
}
}
val c = spawn(alwaysSnapshot)
val watchProbe = watcher(c)
val replyProbe = TestProbe[State]()
c ! Increment
snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(1)
c ! GetValue(replyProbe.ref)
replyProbe.expectMessage(State(1, Vector(0)))
c ! LogThenStop
watchProbe.expectMessage("Terminated")
val probe = TestProbe[(State, Event)]()
val c2 = spawn(Behaviors.setup[Command](ctx => counterWithProbe(ctx, pid, probe.ref)))
// state should be rebuilt from snapshot, no events replayed
// Fails as snapshot is async (i think)
probe.expectNoMessage()
c2 ! Increment
c2 ! GetValue(replyProbe.ref)
replyProbe.expectMessage(State(2, Vector(0, 1)))
}
"check all events for snapshot in PersistAll" in {
val pid = nextPid
val snapshotProbe = TestProbe[Try[SnapshotMetadata]]
val snapshotAtTwo = Behaviors.setup[Command](ctx =>
counterWithSnapshotProbe(ctx, pid, snapshotProbe.ref).snapshotWhen { (s, _, _) =>
s.value == 2
})
val c: ActorRef[Command] = spawn(snapshotAtTwo)
val watchProbe = watcher(c)
val replyProbe = TestProbe[State]()
c ! IncrementWithPersistAll(3)
c ! GetValue(replyProbe.ref)
replyProbe.expectMessage(State(3, Vector(0, 1, 2)))
// snapshot at seqNr 3 because of persistAll
snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(3)
c ! LogThenStop
watchProbe.expectMessage("Terminated")
val probeC2 = TestProbe[(State, Event)]()
val c2 = spawn(Behaviors.setup[Command](ctx => counterWithProbe(ctx, pid, probeC2.ref)))
// middle event triggered all to be snapshot
probeC2.expectNoMessage()
c2 ! GetValue(replyProbe.ref)
replyProbe.expectMessage(State(3, Vector(0, 1, 2)))
}
"snapshot every N sequence nrs" in {
val pid = nextPid
val c = spawn(Behaviors.setup[Command](ctx =>
counter(ctx, pid).withRetention(RetentionCriteria.snapshotEvery(numberOfEvents = 2, keepNSnapshots = 2))))
val watchProbe = watcher(c)
val replyProbe = TestProbe[State]()
c ! Increment
c ! GetValue(replyProbe.ref)
replyProbe.expectMessage(State(1, Vector(0)))
c ! LogThenStop
watchProbe.expectMessage("Terminated")
// no snapshot should have happened
val probeC2 = TestProbe[(State, Event)]()
val snapshotProbe = TestProbe[Try[SnapshotMetadata]]()
val c2 = spawn(Behaviors.setup[Command](ctx =>
counterWithProbe(ctx, pid, probeC2.ref, snapshotProbe.ref).withRetention(RetentionCriteria.snapshotEvery(numberOfEvents = 2, keepNSnapshots = 2))))
probeC2.expectMessage[(State, Event)]((State(0, Vector()), Incremented(1)))
val watchProbeC2 = watcher(c2)
c2 ! Increment
snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(2)
c2 ! LogThenStop
watchProbeC2.expectMessage("Terminated")
val probeC3 = TestProbe[(State, Event)]()
val c3 = spawn(Behaviors.setup[Command](ctx =>
counterWithProbe(ctx, pid, probeC3.ref).withRetention(RetentionCriteria.snapshotEvery(numberOfEvents = 2, keepNSnapshots = 2))))
// this time it should have been snapshotted so no events to replay
probeC3.expectNoMessage()
c3 ! GetValue(replyProbe.ref)
replyProbe.expectMessage(State(2, Vector(0, 1)))
}
"snapshot every N sequence nrs when persisting multiple events" in {
val pid = nextPid
val snapshotProbe = TestProbe[Try[SnapshotMetadata]]()
val c =
spawn(Behaviors.setup[Command](ctx =>
counterWithSnapshotProbe(ctx, pid, snapshotProbe.ref).withRetention(RetentionCriteria.snapshotEvery(numberOfEvents = 2, keepNSnapshots = 2))))
val watchProbe = watcher(c)
val replyProbe = TestProbe[State]()
c ! IncrementWithPersistAll(3)
c ! GetValue(replyProbe.ref)
replyProbe.expectMessage(State(3, Vector(0, 1, 2)))
// snapshot at seqNr 3 because of persistAll
snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(3)
c ! LogThenStop
watchProbe.expectMessage("Terminated")
val probeC2 = TestProbe[(State, Event)]()
val c2 = spawn(Behaviors.setup[Command](ctx =>
counterWithProbe(ctx, pid, probeC2.ref).withRetention(RetentionCriteria.snapshotEvery(numberOfEvents = 2, keepNSnapshots = 2))))
probeC2.expectNoMessage()
c2 ! GetValue(replyProbe.ref)
replyProbe.expectMessage(State(3, Vector(0, 1, 2)))
}
"wrap persistent behavior in tap" in {
val probe = TestProbe[Command]
val wrapped: Behavior[Command] = Behaviors.monitor(probe.ref, counter(nextPid))
@ -766,93 +612,6 @@ class EventSourcedBehaviorSpec extends ScalaTestWithActorTestKit(EventSourcedBeh
}
}
def expectDeleteSnapshotCompleted(
retentionProbe: TestProbe[Try[EventSourcedSignal]],
maxSequenceNr: Long,
minSequenceNr: Long): Unit = {
retentionProbe.expectMessageType[Success[DeleteSnapshotsCompleted]].value should ===(
DeleteSnapshotsCompleted(DeletionTarget.Criteria(
SnapshotSelectionCriteria.latest.withMaxSequenceNr(maxSequenceNr).withMinSequenceNr(minSequenceNr))))
}
"delete snapshots automatically, based on criteria" in {
val pid = nextPid
val snapshotProbe = TestProbe[Try[SnapshotMetadata]]()
val retentionProbe = TestProbe[Try[EventSourcedSignal]]()
val replyProbe = TestProbe[State]()
val persistentActor = spawn(
Behaviors.setup[Command](ctx
counterWithSnapshotAndRetentionProbe(ctx, pid, snapshotProbe.ref, retentionProbe.ref)
.withRetention(RetentionCriteria.snapshotEvery(numberOfEvents = 3, keepNSnapshots = 2))))
(1 to 10).foreach(_ => persistentActor ! Increment)
persistentActor ! GetValue(replyProbe.ref)
replyProbe.expectMessage(State(10, (0 until 10).toVector))
snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(3)
snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(6)
snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(9)
expectDeleteSnapshotCompleted(retentionProbe, 3, 0)
(1 to 10).foreach(_ => persistentActor ! Increment)
persistentActor ! GetValue(replyProbe.ref)
replyProbe.expectMessage(State(20, (0 until 20).toVector))
snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(12)
snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(15)
snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(18)
expectDeleteSnapshotCompleted(retentionProbe, 6, 0)
expectDeleteSnapshotCompleted(retentionProbe, 9, 3)
expectDeleteSnapshotCompleted(retentionProbe, 12, 6)
retentionProbe.expectNoMessage()
}
"optionally delete both old events and snapshots" in {
val pid = nextPid
val snapshotProbe = TestProbe[Try[SnapshotMetadata]]()
val retentionProbe = TestProbe[Try[EventSourcedSignal]]()
val replyProbe = TestProbe[State]()
val persistentActor = spawn(
Behaviors.setup[Command](
ctx
counterWithSnapshotAndRetentionProbe(ctx, pid, snapshotProbe.ref, retentionProbe.ref).withRetention(
// tests the Java API as well
RetentionCriteria.snapshotEvery(numberOfEvents = 3, keepNSnapshots = 2).withDeleteEventsOnSnapshot())))
(1 to 10).foreach(_ => persistentActor ! Increment)
persistentActor ! GetValue(replyProbe.ref)
replyProbe.expectMessage(State(10, (0 until 10).toVector))
snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(3)
snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(6)
snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(9)
retentionProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 3
// Note that when triggering deletion of snapshots from deletion of events it is intentionally "off by one".
// The reason for -1 is that a snapshot at the exact toSequenceNr is still useful and the events
// after that can be replayed after that snapshot, but replaying the events after toSequenceNr without
// starting at the snapshot at toSequenceNr would be invalid.
expectDeleteSnapshotCompleted(retentionProbe, 2, 0)
(1 to 10).foreach(_ => persistentActor ! Increment)
persistentActor ! GetValue(replyProbe.ref)
replyProbe.expectMessage(State(20, (0 until 20).toVector))
snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(12)
snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(15)
snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(18)
retentionProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 6
expectDeleteSnapshotCompleted(retentionProbe, 5, 0)
retentionProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 9
expectDeleteSnapshotCompleted(retentionProbe, 8, 2)
retentionProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 12
expectDeleteSnapshotCompleted(retentionProbe, 11, 5)
retentionProbe.expectNoMessage()
}
"fail fast if persistenceId is null" in {
intercept[IllegalArgumentException] {
PersistenceId(null)

View file

@ -6,17 +6,20 @@ package docs.akka.persistence.typed
import scala.concurrent.duration._
import akka.actor.typed.{ Behavior, SupervisorStrategy }
import akka.actor.typed.Behavior
import akka.actor.typed.SupervisorStrategy
import akka.actor.typed.scaladsl.Behaviors
import akka.persistence.typed.scaladsl.EventSourcedBehavior
import akka.persistence.typed.DeleteEventsFailed
import akka.persistence.typed.DeleteSnapshotsFailed
import akka.persistence.typed.SnapshotFailed
import akka.persistence.typed.PersistenceId
import akka.persistence.typed.RecoveryCompleted
import akka.persistence.typed.SnapshotFailed
import akka.persistence.typed.scaladsl.EventSourcedBehavior
object BasicPersistentBehaviorCompileOnly {
import akka.persistence.typed.scaladsl.RetentionCriteria
object FirstExample {
//#command
sealed trait Command
@ -147,17 +150,12 @@ object BasicPersistentBehaviorCompileOnly {
final case class BookingCompleted(orderNr: String) extends Event
//#snapshottingEveryN
import akka.persistence.typed.RetentionCriteria
val snapshottingEveryN = EventSourcedBehavior[Command, Event, State](
persistenceId = PersistenceId("abc"),
emptyState = State(),
commandHandler = (state, cmd) => throw new RuntimeException("TODO: process the command & return an Effect"),
eventHandler = (state, evt) => throw new RuntimeException("TODO: process the event return the next state"))
.snapshotWhen {
case (_, BookingCompleted(_), _) => true
case (_, _, _) => false
}
.withRetention(RetentionCriteria.snapshotEvery(numberOfEvents = 1000, keepNSnapshots = 2))
//#snapshottingEveryN
@ -185,7 +183,6 @@ object BasicPersistentBehaviorCompileOnly {
//#snapshotSelection
//#retentionCriteria
import akka.persistence.typed.RetentionCriteria
val snapshotRetention = EventSourcedBehavior[Command, Event, State](
persistenceId = PersistenceId("abc"),
@ -196,41 +193,35 @@ object BasicPersistentBehaviorCompileOnly {
case (state, BookingCompleted(_), sequenceNumber) => true
case (state, event, sequenceNumber) => false
}
.withRetention(RetentionCriteria.snapshotEvery(numberOfEvents = 1000, keepNSnapshots = 5))
.withRetention(RetentionCriteria.snapshotEvery(numberOfEvents = 100, keepNSnapshots = 2))
//#retentionCriteria
//#snapshotAndEventDeletes
import akka.persistence.typed.RetentionCriteria
val snapshotAndEventsRetention = EventSourcedBehavior[Command, Event, State](
persistenceId = PersistenceId("abc"),
emptyState = State(),
commandHandler = (state, cmd) => Effect.noReply, // do something based on a particular command and state
eventHandler = (state, evt) => state) // do something based on a particular event and state
.snapshotWhen {
case (state, BookingCompleted(_), sequenceNumber) => true
case (state, event, sequenceNumber) => false
.withRetention(RetentionCriteria.snapshotEvery(numberOfEvents = 100, keepNSnapshots = 2).withDeleteEventsOnSnapshot)
.receiveSignal { // optionally respond to signals
case (state, _: SnapshotFailed) => // react to failure
case (state, _: DeleteSnapshotsFailed) => // react to failure
case (state, _: DeleteEventsFailed) => // react to failure
}
.withRetention(RetentionCriteria.snapshotEvery(numberOfEvents = 1000, keepNSnapshots = 2).withDeleteEventsOnSnapshot)
//#snapshotAndEventDeletes
//#retentionCriteriaWithSignals
import akka.persistence.typed.RetentionCriteria
val fullDeletesSampleWithSignals = EventSourcedBehavior[Command, Event, State](
persistenceId = PersistenceId("abc"),
emptyState = State(),
commandHandler = (state, cmd) => Effect.noReply, // do something based on a particular command and state
eventHandler = (state, evt) => state) // do something based on a particular event and state
.snapshotWhen {
case (state, BookingCompleted(_), sequenceNumber) => true
case (state, event, sequenceNumber) => false
}
.withRetention(RetentionCriteria.snapshotEvery(numberOfEvents = 1000, keepNSnapshots = 2))
.withRetention(RetentionCriteria.snapshotEvery(numberOfEvents = 100, keepNSnapshots = 2))
.receiveSignal { // optionally respond to signals
case (state, _: SnapshotFailed) => // react to failure
case (state, _: DeleteSnapshotsFailed) => // react to failure
case (state, _: DeleteEventsFailed) => // react to failure
}
//#retentionCriteriaWithSignals