diff --git a/akka-docs/src/main/paradox/typed/persistence.md b/akka-docs/src/main/paradox/typed/persistence.md index 68499cb573..6c45da19a1 100644 --- a/akka-docs/src/main/paradox/typed/persistence.md +++ b/akka-docs/src/main/paradox/typed/persistence.md @@ -453,3 +453,54 @@ processed. It's allowed to stash messages while unstashing. Those newly added commands will not be processed by the `unstashAll` effect that was in progress and have to be unstashed by another `unstashAll`. + +## Retention - snapshots and events + +Retention of snapshots and events are controlled by a few factors. Deletes to free up space is currently available. + +### Snapshot deletion + +To free up space, an event sourced actor can automatically delete older snapshots +based on a user provided or default `RetentionCriteria` @scala[from `withRetention`] @java[by overriding `retentionCriteria`] +combined with either the `snapshotWhen` or `snapshotEvery` methods. + +Scala +: @@snip [BasicPersistentBehaviorCompileOnly.scala](/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BasicPersistentBehaviorCompileOnly.scala) { #snapshotDeletes } + +Java +: @@snip [BasicPersistentBehaviorTest.java](/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/BasicPersistentBehaviorTest.java) { #snapshotDeletes } + +On async deletion, either a `SnapshotCompleted` or `SnapshotFailed` is emitted. Successful completion is logged by the system at log level `debug`, failures at log level `warning`. +You can leverage `EventSourcedSignal` to react to outcomes @scala[with `receiveSignal` handler] @java[by overriding `receiveSignal`]. + +Scala +: @@snip [BasicPersistentBehaviorCompileOnly.scala](/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BasicPersistentBehaviorCompileOnly.scala) { #fullDeletesSampleWithSignals } + +Java +: @@snip [BasicPersistentBehaviorTest.java](/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/BasicPersistentBehaviorTest.java) { #fullDeletesSampleWithSignals } + +## Event deletion + +Deleting events in event sourcing based applications is typically either not used at all, or used in conjunction with snapshotting. +If snapshot-based recovery is enabled, after a snapshot has been successfully stored, a delete (journaled by a single event sourced actor) up until the sequence number of the data held by that snapshot can be issued. + +To elect to use this, enable `RetentionCriteria.deleteEventsOnSnapshot` which is disabled by default. +You can leverage `EventSourcedSignal` to react to outcomes @scala[with `receiveSignal` handler] @java[by overriding `receiveSignal`]. + +Scala +: @@snip [BasicPersistentBehaviorCompileOnly.scala](/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BasicPersistentBehaviorCompileOnly.scala) { #snapshotAndEventDeletes } + +Java +: @@snip [BasicPersistentBehaviorTest.java](/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/BasicPersistentBehaviorTest.java) { #snapshotDeletes } + +On `SaveSnapshotSuccess`, old events would be deleted based on `RetentionCriteria` prior to old snapshots being deleted. On async deletion, either `DeleteEventsCompleted` or `DeleteEventsFailed` is emitted. Successful completion is logged by the +system at log level `debug`, failures at log level `warning`. + +Message deletion does not affect the highest sequence number of the journal, even if all messages were deleted from it after a delete occurs. + +@@@ note + +It is up to the journal implementation whether events are actually removed from storage. +Deleting events prevents future replaying of old events to apply new state. + +@@@ note \ No newline at end of file diff --git a/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/BasicPersistentBehaviorTest.java b/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/BasicPersistentBehaviorTest.java index 48cbe9fa88..84213094b5 100644 --- a/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/BasicPersistentBehaviorTest.java +++ b/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/BasicPersistentBehaviorTest.java @@ -11,6 +11,9 @@ import akka.actor.typed.javadsl.Behaviors; import akka.persistence.typed.PersistenceId; import akka.persistence.typed.RecoveryCompleted; import akka.persistence.typed.RetentionCriteria; +import akka.persistence.typed.SnapshotFailed; +import akka.persistence.typed.DeleteSnapshotsFailed; +import akka.persistence.typed.DeleteEventsFailed; import akka.persistence.typed.javadsl.CommandHandler; import akka.persistence.typed.javadsl.EventHandler; import akka.persistence.typed.javadsl.EventSourcedBehavior; @@ -279,12 +282,38 @@ public class BasicPersistentBehaviorTest { } // #snapshottingPredicate - // #retentionCriteria + // #snapshotDeletes @Override // override snapshotEvery in EventSourcedBehavior public RetentionCriteria retentionCriteria() { + // to also delete events, use RetentionCriteria.create(1000, 2, true) return RetentionCriteria.create(1000, 2); } - // #retentionCriteria + // #snapshotDeletes + + // #fullDeletesSampleWithSignals + @Override + public SignalHandler signalHandler() { + return newSignalHandlerBuilder() + .onSignal( + SnapshotFailed.class, + (completed) -> { + throw new RuntimeException("TODO: add some on-snapshot-failed side-effect here"); + }) + .onSignal( + DeleteSnapshotsFailed.class, + (completed) -> { + throw new RuntimeException( + "TODO: add some on-delete-snapshot-failed side-effect here"); + }) + .onSignal( + DeleteEventsFailed.class, + (completed) -> { + throw new RuntimeException( + "TODO: add some on-delete-snapshot-failed side-effect here"); + }) + .build(); + } + // #fullDeletesSampleWithSignals } } diff --git a/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BasicPersistentBehaviorCompileOnly.scala b/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BasicPersistentBehaviorCompileOnly.scala index 478e6a327e..f2abbb2f81 100644 --- a/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BasicPersistentBehaviorCompileOnly.scala +++ b/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BasicPersistentBehaviorCompileOnly.scala @@ -4,12 +4,14 @@ package docs.akka.persistence.typed -import akka.actor.typed.ActorRef +import scala.concurrent.duration._ + import akka.actor.typed.{ Behavior, SupervisorStrategy } import akka.actor.typed.scaladsl.Behaviors import akka.persistence.typed.scaladsl.EventSourcedBehavior - -import scala.concurrent.duration._ +import akka.persistence.typed.DeleteEventsFailed +import akka.persistence.typed.DeleteSnapshotsFailed +import akka.persistence.typed.SnapshotFailed import akka.persistence.typed.PersistenceId import akka.persistence.typed.RecoveryCompleted @@ -175,4 +177,54 @@ object BasicPersistentBehaviorCompileOnly { .withSnapshotSelectionCriteria(SnapshotSelectionCriteria.none) //#snapshotSelection + //#snapshotDeletes + import akka.persistence.typed.RetentionCriteria + + val snapshotRetention = EventSourcedBehavior[Command, Event, State]( + persistenceId = PersistenceId("abc"), + emptyState = State(), + commandHandler = (state, cmd) => Effect.noReply, // do something based on a particular command + eventHandler = (state, evt) => state) // do something based on a particular state + .snapshotWhen { + case (state, BookingCompleted(_), sequenceNumber) => true + case (state, event, sequenceNumber) => false + } + .withRetention(RetentionCriteria(snapshotEveryNEvents = 1000, keepNSnapshots = 5)) + //#snapshotDeletes + + //#snapshotAndEventDeletes + import akka.persistence.typed.RetentionCriteria + + val snapshotAndEventsRetention = EventSourcedBehavior[Command, Event, State]( + persistenceId = PersistenceId("abc"), + emptyState = State(), + commandHandler = (state, cmd) => Effect.noReply, // do something based on a particular command and state + eventHandler = (state, evt) => state) // do something based on a particular event and state + .snapshotWhen { + case (state, BookingCompleted(_), sequenceNumber) => true + case (state, event, sequenceNumber) => false + } + .withRetention(RetentionCriteria(snapshotEveryNEvents = 1000, keepNSnapshots = 5, deleteEventsOnSnapshot = true)) + //#snapshotAndEventDeletes + + //#fullDeletesSampleWithSignals + import akka.persistence.typed.RetentionCriteria + + val fullDeletesSampleWithSignals = EventSourcedBehavior[Command, Event, State]( + persistenceId = PersistenceId("abc"), + emptyState = State(), + commandHandler = (state, cmd) => Effect.noReply, // do something based on a particular command and state + eventHandler = (state, evt) => state) // do something based on a particular event and state + .snapshotWhen { + case (state, BookingCompleted(_), sequenceNumber) => true + case (state, event, sequenceNumber) => false + } + .withRetention(RetentionCriteria(snapshotEveryNEvents = 1000, keepNSnapshots = 5, deleteEventsOnSnapshot = true)) + .receiveSignal { // optionally respond to signals + case _: SnapshotFailed => // react to failure + case _: DeleteSnapshotsFailed => // react to failure + case _: DeleteEventsFailed => // react to failure + } + //#fullDeletesSampleWithSignals + }