Typed Persistence: docs / sample for deleting snapshots and events (#26603)

#26543
This commit is contained in:
Helena Edelson 2019-03-25 10:26:24 -07:00 committed by GitHub
parent 1fe5505f84
commit 6a46185f87
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 137 additions and 5 deletions

View file

@ -453,3 +453,54 @@ processed.
It's allowed to stash messages while unstashing. Those newly added commands will not be processed by the It's allowed to stash messages while unstashing. Those newly added commands will not be processed by the
`unstashAll` effect that was in progress and have to be unstashed by another `unstashAll`. `unstashAll` effect that was in progress and have to be unstashed by another `unstashAll`.
## Retention - snapshots and events
Retention of snapshots and events are controlled by a few factors. Deletes to free up space is currently available.
### Snapshot deletion
To free up space, an event sourced actor can automatically delete older snapshots
based on a user provided or default `RetentionCriteria` @scala[from `withRetention`] @java[by overriding `retentionCriteria`]
combined with either the `snapshotWhen` or `snapshotEvery` methods.
Scala
: @@snip [BasicPersistentBehaviorCompileOnly.scala](/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BasicPersistentBehaviorCompileOnly.scala) { #snapshotDeletes }
Java
: @@snip [BasicPersistentBehaviorTest.java](/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/BasicPersistentBehaviorTest.java) { #snapshotDeletes }
On async deletion, either a `SnapshotCompleted` or `SnapshotFailed` is emitted. Successful completion is logged by the system at log level `debug`, failures at log level `warning`.
You can leverage `EventSourcedSignal` to react to outcomes @scala[with `receiveSignal` handler] @java[by overriding `receiveSignal`].
Scala
: @@snip [BasicPersistentBehaviorCompileOnly.scala](/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BasicPersistentBehaviorCompileOnly.scala) { #fullDeletesSampleWithSignals }
Java
: @@snip [BasicPersistentBehaviorTest.java](/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/BasicPersistentBehaviorTest.java) { #fullDeletesSampleWithSignals }
## Event deletion
Deleting events in event sourcing based applications is typically either not used at all, or used in conjunction with snapshotting.
If snapshot-based recovery is enabled, after a snapshot has been successfully stored, a delete (journaled by a single event sourced actor) up until the sequence number of the data held by that snapshot can be issued.
To elect to use this, enable `RetentionCriteria.deleteEventsOnSnapshot` which is disabled by default.
You can leverage `EventSourcedSignal` to react to outcomes @scala[with `receiveSignal` handler] @java[by overriding `receiveSignal`].
Scala
: @@snip [BasicPersistentBehaviorCompileOnly.scala](/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BasicPersistentBehaviorCompileOnly.scala) { #snapshotAndEventDeletes }
Java
: @@snip [BasicPersistentBehaviorTest.java](/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/BasicPersistentBehaviorTest.java) { #snapshotDeletes }
On `SaveSnapshotSuccess`, old events would be deleted based on `RetentionCriteria` prior to old snapshots being deleted. On async deletion, either `DeleteEventsCompleted` or `DeleteEventsFailed` is emitted. Successful completion is logged by the
system at log level `debug`, failures at log level `warning`.
Message deletion does not affect the highest sequence number of the journal, even if all messages were deleted from it after a delete occurs.
@@@ note
It is up to the journal implementation whether events are actually removed from storage.
Deleting events prevents future replaying of old events to apply new state.
@@@ note

View file

@ -11,6 +11,9 @@ import akka.actor.typed.javadsl.Behaviors;
import akka.persistence.typed.PersistenceId; import akka.persistence.typed.PersistenceId;
import akka.persistence.typed.RecoveryCompleted; import akka.persistence.typed.RecoveryCompleted;
import akka.persistence.typed.RetentionCriteria; import akka.persistence.typed.RetentionCriteria;
import akka.persistence.typed.SnapshotFailed;
import akka.persistence.typed.DeleteSnapshotsFailed;
import akka.persistence.typed.DeleteEventsFailed;
import akka.persistence.typed.javadsl.CommandHandler; import akka.persistence.typed.javadsl.CommandHandler;
import akka.persistence.typed.javadsl.EventHandler; import akka.persistence.typed.javadsl.EventHandler;
import akka.persistence.typed.javadsl.EventSourcedBehavior; import akka.persistence.typed.javadsl.EventSourcedBehavior;
@ -279,12 +282,38 @@ public class BasicPersistentBehaviorTest {
} }
// #snapshottingPredicate // #snapshottingPredicate
// #retentionCriteria // #snapshotDeletes
@Override // override snapshotEvery in EventSourcedBehavior @Override // override snapshotEvery in EventSourcedBehavior
public RetentionCriteria retentionCriteria() { public RetentionCriteria retentionCriteria() {
// to also delete events, use RetentionCriteria.create(1000, 2, true)
return RetentionCriteria.create(1000, 2); return RetentionCriteria.create(1000, 2);
} }
// #retentionCriteria // #snapshotDeletes
// #fullDeletesSampleWithSignals
@Override
public SignalHandler signalHandler() {
return newSignalHandlerBuilder()
.onSignal(
SnapshotFailed.class,
(completed) -> {
throw new RuntimeException("TODO: add some on-snapshot-failed side-effect here");
})
.onSignal(
DeleteSnapshotsFailed.class,
(completed) -> {
throw new RuntimeException(
"TODO: add some on-delete-snapshot-failed side-effect here");
})
.onSignal(
DeleteEventsFailed.class,
(completed) -> {
throw new RuntimeException(
"TODO: add some on-delete-snapshot-failed side-effect here");
})
.build();
}
// #fullDeletesSampleWithSignals
} }
} }

View file

@ -4,12 +4,14 @@
package docs.akka.persistence.typed package docs.akka.persistence.typed
import akka.actor.typed.ActorRef import scala.concurrent.duration._
import akka.actor.typed.{ Behavior, SupervisorStrategy } import akka.actor.typed.{ Behavior, SupervisorStrategy }
import akka.actor.typed.scaladsl.Behaviors import akka.actor.typed.scaladsl.Behaviors
import akka.persistence.typed.scaladsl.EventSourcedBehavior import akka.persistence.typed.scaladsl.EventSourcedBehavior
import akka.persistence.typed.DeleteEventsFailed
import scala.concurrent.duration._ import akka.persistence.typed.DeleteSnapshotsFailed
import akka.persistence.typed.SnapshotFailed
import akka.persistence.typed.PersistenceId import akka.persistence.typed.PersistenceId
import akka.persistence.typed.RecoveryCompleted import akka.persistence.typed.RecoveryCompleted
@ -175,4 +177,54 @@ object BasicPersistentBehaviorCompileOnly {
.withSnapshotSelectionCriteria(SnapshotSelectionCriteria.none) .withSnapshotSelectionCriteria(SnapshotSelectionCriteria.none)
//#snapshotSelection //#snapshotSelection
//#snapshotDeletes
import akka.persistence.typed.RetentionCriteria
val snapshotRetention = EventSourcedBehavior[Command, Event, State](
persistenceId = PersistenceId("abc"),
emptyState = State(),
commandHandler = (state, cmd) => Effect.noReply, // do something based on a particular command
eventHandler = (state, evt) => state) // do something based on a particular state
.snapshotWhen {
case (state, BookingCompleted(_), sequenceNumber) => true
case (state, event, sequenceNumber) => false
}
.withRetention(RetentionCriteria(snapshotEveryNEvents = 1000, keepNSnapshots = 5))
//#snapshotDeletes
//#snapshotAndEventDeletes
import akka.persistence.typed.RetentionCriteria
val snapshotAndEventsRetention = EventSourcedBehavior[Command, Event, State](
persistenceId = PersistenceId("abc"),
emptyState = State(),
commandHandler = (state, cmd) => Effect.noReply, // do something based on a particular command and state
eventHandler = (state, evt) => state) // do something based on a particular event and state
.snapshotWhen {
case (state, BookingCompleted(_), sequenceNumber) => true
case (state, event, sequenceNumber) => false
}
.withRetention(RetentionCriteria(snapshotEveryNEvents = 1000, keepNSnapshots = 5, deleteEventsOnSnapshot = true))
//#snapshotAndEventDeletes
//#fullDeletesSampleWithSignals
import akka.persistence.typed.RetentionCriteria
val fullDeletesSampleWithSignals = EventSourcedBehavior[Command, Event, State](
persistenceId = PersistenceId("abc"),
emptyState = State(),
commandHandler = (state, cmd) => Effect.noReply, // do something based on a particular command and state
eventHandler = (state, evt) => state) // do something based on a particular event and state
.snapshotWhen {
case (state, BookingCompleted(_), sequenceNumber) => true
case (state, event, sequenceNumber) => false
}
.withRetention(RetentionCriteria(snapshotEveryNEvents = 1000, keepNSnapshots = 5, deleteEventsOnSnapshot = true))
.receiveSignal { // optionally respond to signals
case _: SnapshotFailed => // react to failure
case _: DeleteSnapshotsFailed => // react to failure
case _: DeleteEventsFailed => // react to failure
}
//#fullDeletesSampleWithSignals
} }