Allow changing the recovery strategy in a typed persistence actor #25216 (#28932)

* Allow changing the recovery strategy in a typed persistence actor #25216

* scalafmt fix

* added mima exclusion

* added typed.Recovery allowing only default or disabled recovery strategy

* typed.Recovery takes SnapshotSelectionCriteria
deprecated withSnapshotSelectionCriteria

* updated docs
This commit is contained in:
Andreas Gabor 2020-04-28 08:18:50 +02:00 committed by GitHub
parent dc9f907caa
commit 30e79c6231
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
12 changed files with 283 additions and 8 deletions

View file

@ -516,6 +516,20 @@ akka.persistence.journal.leveldb.replay-filter {
} }
``` ```
### Disable recovery
You can also completely disable the recovery of events and snapshots:
Scala
: @@snip [BasicPersistentBehaviorCompileOnly.scala](/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BasicPersistentBehaviorCompileOnly.scala) { #recovery-disabled }
Java
: @@snip [BasicPersistentBehaviorTest.java](/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/BasicPersistentBehaviorTest.java) { #recovery-disabled }
Please refer to @ref[snapshots](persistence-snapshot.md#snapshots) if you need to disable only the snapshot recovery, or you need to select specific snapshots.
In any case, the highest sequence number will always be recovered so you can keep persisting new events without corrupting your event log.
## Tagging ## Tagging
Persistence allows you to use event tags without using an @ref[`EventAdapter`](../persistence.md#event-adapters): Persistence allows you to use event tags without using an @ref[`EventAdapter`](../persistence.md#event-adapters):

View file

@ -0,0 +1,3 @@
# #25216 customize recovery strategy for typed persistence behaviors
# abstract method withRecovery(akka.persistence.Recovery)akka.persistence.typed.scaladsl.EventSourcedBehavior in interface akka.persistence.typed.scaladsl.EventSourcedBehavior is present only in current version
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.persistence.typed.scaladsl.EventSourcedBehavior.withRecovery")

View file

@ -30,6 +30,7 @@ import akka.persistence.typed.DeleteSnapshotsFailed
import akka.persistence.typed.DeletionTarget import akka.persistence.typed.DeletionTarget
import akka.persistence.typed.EventAdapter import akka.persistence.typed.EventAdapter
import akka.persistence.typed.NoOpEventAdapter import akka.persistence.typed.NoOpEventAdapter
import akka.persistence.typed.scaladsl.{ Recovery => TypedRecovery }
import akka.persistence.typed.PersistenceId import akka.persistence.typed.PersistenceId
import akka.persistence.typed.SnapshotAdapter import akka.persistence.typed.SnapshotAdapter
import akka.persistence.typed.SnapshotCompleted import akka.persistence.typed.SnapshotCompleted
@ -220,6 +221,9 @@ private[akka] final case class EventSourcedBehaviorImpl[Command, Event, State](
backoffStrategy: BackoffSupervisorStrategy): EventSourcedBehavior[Command, Event, State] = backoffStrategy: BackoffSupervisorStrategy): EventSourcedBehavior[Command, Event, State] =
copy(supervisionStrategy = backoffStrategy) copy(supervisionStrategy = backoffStrategy)
override def withRecovery(recovery: TypedRecovery): EventSourcedBehavior[Command, Event, State] = {
copy(recovery = recovery.toClassic)
}
} }
/** Protocol used internally by the eventsourced behaviors. */ /** Protocol used internally by the eventsourced behaviors. */

View file

@ -0,0 +1,50 @@
/*
* Copyright (C) 2019-2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.persistence.typed.internal
import akka.annotation.InternalApi
import akka.persistence.typed.{ javadsl, scaladsl, SnapshotSelectionCriteria }
/**
* INTERNAL API
*/
@InternalApi private[akka] case object DefaultRecovery extends javadsl.Recovery with scaladsl.Recovery {
override def asScala: scaladsl.Recovery = this
override def asJava: javadsl.Recovery = this
/**
* INTERNAL API
*/
override private[akka] def toClassic = akka.persistence.Recovery()
}
/**
* INTERNAL API
*/
@InternalApi private[akka] case object DisabledRecovery extends javadsl.Recovery with scaladsl.Recovery {
override def asScala: scaladsl.Recovery = this
override def asJava: javadsl.Recovery = this
/**
* INTERNAL API
*/
override private[akka] def toClassic = akka.persistence.Recovery.none
}
/**
* INTERNAL API
*/
@InternalApi private[akka] case class RecoveryWithSnapshotSelectionCriteria(
snapshotSelectionCriteria: SnapshotSelectionCriteria)
extends javadsl.Recovery
with scaladsl.Recovery {
override def asScala: scaladsl.Recovery = this
override def asJava: javadsl.Recovery = this
/**
* INTERNAL API
*/
override private[akka] def toClassic = akka.persistence.Recovery(snapshotSelectionCriteria.toClassic)
}

View file

@ -124,6 +124,7 @@ abstract class EventSourcedBehavior[Command, Event, State] private[akka] (
* You may configure the behavior to skip replaying snapshots completely, in which case the recovery will be * You may configure the behavior to skip replaying snapshots completely, in which case the recovery will be
* performed by replaying all events -- which may take a long time. * performed by replaying all events -- which may take a long time.
*/ */
@deprecated("override recovery instead", "2.6.5")
def snapshotSelectionCriteria: SnapshotSelectionCriteria = SnapshotSelectionCriteria.latest def snapshotSelectionCriteria: SnapshotSelectionCriteria = SnapshotSelectionCriteria.latest
/** /**
@ -151,6 +152,12 @@ abstract class EventSourcedBehavior[Command, Event, State] private[akka] (
*/ */
def retentionCriteria: RetentionCriteria = RetentionCriteria.disabled def retentionCriteria: RetentionCriteria = RetentionCriteria.disabled
/**
* Override to change the strategy for recovery of snapshots and events.
* By default, snapshots and events are recovered.
*/
def recovery: Recovery = Recovery.default
/** /**
* The `tagger` function should give event tags, which will be used in persistence query * The `tagger` function should give event tags, which will be used in persistence query
*/ */
@ -194,7 +201,7 @@ abstract class EventSourcedBehavior[Command, Event, State] private[akka] (
.snapshotAdapter(snapshotAdapter()) .snapshotAdapter(snapshotAdapter())
.withJournalPluginId(journalPluginId) .withJournalPluginId(journalPluginId)
.withSnapshotPluginId(snapshotPluginId) .withSnapshotPluginId(snapshotPluginId)
.withSnapshotSelectionCriteria(snapshotSelectionCriteria) .withRecovery(recovery.asScala)
val handler = signalHandler() val handler = signalHandler()
val behaviorWithSignalHandler = val behaviorWithSignalHandler =

View file

@ -0,0 +1,50 @@
/*
* Copyright (C) 2019-2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.persistence.typed.javadsl
import akka.annotation.InternalApi
import akka.persistence.typed.SnapshotSelectionCriteria
import akka.persistence.typed.internal.{ DefaultRecovery, DisabledRecovery, RecoveryWithSnapshotSelectionCriteria }
/**
* Strategy for recovery of snapshots and events.
*/
abstract class Recovery {
def asScala: akka.persistence.typed.scaladsl.Recovery
/**
* INTERNAL API
*/
@InternalApi private[akka] def toClassic: akka.persistence.Recovery
}
/**
* Strategy for recovery of snapshots and events.
*/
object Recovery {
/**
* Snapshots and events are recovered
*/
val default: Recovery = DefaultRecovery
/**
* Neither snapshots nor events are recovered
*/
val disabled: Recovery = DisabledRecovery
/**
* Changes the snapshot selection criteria used for the recovery.
*
* By default the most recent snapshot is used, and the remaining state updates are recovered by replaying events
* from the sequence number up until which the snapshot reached.
*
* You may configure the behavior to skip replaying snapshots completely, in which case the recovery will be
* performed by replaying all events -- which may take a long time.
*/
def withSnapshotSelectionCriteria(snapshotSelectionCriteria: SnapshotSelectionCriteria) =
RecoveryWithSnapshotSelectionCriteria(snapshotSelectionCriteria)
}

View file

@ -46,9 +46,9 @@ object EventSourcedBehavior {
* Create a `Behavior` for a persistent actor. * Create a `Behavior` for a persistent actor.
* *
* @param persistenceId stable unique identifier for the event sourced behavior * @param persistenceId stable unique identifier for the event sourced behavior
* @param emtpyState the intial state for the entity before any events have been processed * @param emptyState the intial state for the entity before any events have been processed
* @param commandHandler map commands to effects e.g. persisting events, replying to commands * @param commandHandler map commands to effects e.g. persisting events, replying to commands
* @param evnetHandler compute the new state given the current state when an event has been persisted * @param eventHandler compute the new state given the current state when an event has been persisted
*/ */
def apply[Command, Event, State]( def apply[Command, Event, State](
persistenceId: PersistenceId, persistenceId: PersistenceId,
@ -158,6 +158,7 @@ object EventSourcedBehavior {
* You may configure the behavior to skip replaying snapshots completely, in which case the recovery will be * You may configure the behavior to skip replaying snapshots completely, in which case the recovery will be
* performed by replaying all events -- which may take a long time. * performed by replaying all events -- which may take a long time.
*/ */
@deprecated("use withRecovery(Recovery.withSnapshotSelectionCriteria(...))", "2.6.5")
def withSnapshotSelectionCriteria(selection: SnapshotSelectionCriteria): EventSourcedBehavior[Command, Event, State] def withSnapshotSelectionCriteria(selection: SnapshotSelectionCriteria): EventSourcedBehavior[Command, Event, State]
/** /**
@ -208,4 +209,10 @@ object EventSourcedBehavior {
* If not specified the actor will be stopped on failure. * If not specified the actor will be stopped on failure.
*/ */
def onPersistFailure(backoffStrategy: BackoffSupervisorStrategy): EventSourcedBehavior[Command, Event, State] def onPersistFailure(backoffStrategy: BackoffSupervisorStrategy): EventSourcedBehavior[Command, Event, State]
/**
* Change the recovery strategy.
* By default, snapshots and events are recovered.
*/
def withRecovery(recovery: Recovery): EventSourcedBehavior[Command, Event, State]
} }

View file

@ -0,0 +1,51 @@
/*
* Copyright (C) 2019-2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.persistence.typed.scaladsl
import akka.annotation.InternalApi
import akka.persistence.typed.SnapshotSelectionCriteria
import akka.persistence.typed.internal.{ DefaultRecovery, DisabledRecovery, RecoveryWithSnapshotSelectionCriteria }
/**
* Strategy for recovery of snapshots and events.
*/
trait Recovery {
def asJava: akka.persistence.typed.javadsl.Recovery
/**
* INTERNAL API
*/
@InternalApi private[akka] def toClassic: akka.persistence.Recovery
}
/**
* Strategy for recovery of snapshots and events.
*/
object Recovery {
/**
* Snapshots and events are recovered
*/
val default: Recovery = DefaultRecovery
/**
* Neither snapshots nor events are recovered
*/
val disabled: Recovery = DisabledRecovery
/**
* Changes the snapshot selection criteria used for the recovery.
*
* By default the most recent snapshot is used, and the remaining state updates are recovered by replaying events
* from the sequence number up until which the snapshot reached.
*
* You may configure the behavior to skip replaying snapshots completely, in which case the recovery will be
* performed by replaying all events -- which may take a long time.
*/
def withSnapshotSelectionCriteria(snapshotSelectionCriteria: SnapshotSelectionCriteria) =
RecoveryWithSnapshotSelectionCriteria(snapshotSelectionCriteria)
}

View file

@ -16,6 +16,7 @@ import akka.persistence.typed.SnapshotFailed;
import akka.persistence.typed.SnapshotSelectionCriteria; import akka.persistence.typed.SnapshotSelectionCriteria;
import akka.persistence.typed.javadsl.CommandHandler; import akka.persistence.typed.javadsl.CommandHandler;
import akka.persistence.typed.javadsl.Effect; import akka.persistence.typed.javadsl.Effect;
import akka.persistence.typed.javadsl.Recovery;
import akka.persistence.typed.javadsl.EventHandler; import akka.persistence.typed.javadsl.EventHandler;
// #behavior // #behavior
import akka.persistence.typed.javadsl.EventSourcedBehavior; import akka.persistence.typed.javadsl.EventSourcedBehavior;
@ -344,6 +345,13 @@ public class BasicPersistentBehaviorTest {
} }
// #recovery // #recovery
// #recovery-disabled
@Override
public Recovery recovery() {
return Recovery.disabled();
}
// #recovery-disabled
// #tagging // #tagging
@Override @Override
public Set<String> tagsFor(Event event) { public Set<String> tagsFor(Event event) {
@ -555,8 +563,8 @@ public class BasicPersistentBehaviorTest {
// #snapshotSelection // #snapshotSelection
@Override @Override
public SnapshotSelectionCriteria snapshotSelectionCriteria() { public Recovery recovery() {
return SnapshotSelectionCriteria.none(); return Recovery.withSnapshotSelectionCriteria(SnapshotSelectionCriteria.none());
} }
// #snapshotSelection // #snapshotSelection
} }

View file

@ -32,6 +32,7 @@ import akka.actor.typed.scaladsl.Behaviors
import akka.persistence.{ SnapshotMetadata => ClassicSnapshotMetadata } import akka.persistence.{ SnapshotMetadata => ClassicSnapshotMetadata }
import akka.persistence.{ SnapshotSelectionCriteria => ClassicSnapshotSelectionCriteria } import akka.persistence.{ SnapshotSelectionCriteria => ClassicSnapshotSelectionCriteria }
import akka.persistence.SelectedSnapshot import akka.persistence.SelectedSnapshot
import akka.persistence.typed.SnapshotSelectionCriteria
import akka.persistence.journal.inmem.InmemJournal import akka.persistence.journal.inmem.InmemJournal
import akka.persistence.query.EventEnvelope import akka.persistence.query.EventEnvelope
import akka.persistence.query.PersistenceQuery import akka.persistence.query.PersistenceQuery
@ -345,6 +346,73 @@ class EventSourcedBehaviorSpec
} }
} }
"adhere default and disabled Recovery strategies" in {
val pid = nextPid()
val probe = TestProbe[State]
def counterWithRecoveryStrategy(recoveryStrategy: Recovery) =
Behaviors.setup[Command](counter(_, pid).withRecovery(recoveryStrategy))
val counterSetup = spawn(counterWithRecoveryStrategy(Recovery.default))
counterSetup ! Increment
counterSetup ! Increment
counterSetup ! Increment
counterSetup ! GetValue(probe.ref)
probe.expectMessage(State(3, Vector(0, 1, 2)))
val counterDefaultRecoveryStrategy = spawn(counterWithRecoveryStrategy(Recovery.default))
counterSetup ! Increment
counterDefaultRecoveryStrategy ! GetValue(probe.ref)
probe.expectMessage(State(4, Vector(0, 1, 2, 3)))
val counterDisabledRecoveryStrategy = spawn(counterWithRecoveryStrategy(Recovery.disabled))
counterDisabledRecoveryStrategy ! Increment
counterDisabledRecoveryStrategy ! Increment
counterDisabledRecoveryStrategy ! GetValue(probe.ref)
probe.expectMessage(State(2, Vector(0, 1)))
}
"adhere Recovery strategy with SnapshotSelectionCriteria" in {
val pid = nextPid()
val eventProbe = TestProbe[(State, Event)]
val commandProbe = TestProbe[State]
val snapshotProbe = TestProbe[Try[SnapshotMetadata]]
def counterWithSnapshotSelectionCriteria(recoveryStrategy: Recovery) =
Behaviors.setup[Command](
counterWithProbe(_, pid, eventProbe.ref, snapshotProbe.ref).withRecovery(recoveryStrategy).snapshotWhen {
case (_, _, _) => true
})
val counterSetup = spawn(counterWithSnapshotSelectionCriteria(Recovery.default))
counterSetup ! Increment
counterSetup ! Increment
counterSetup ! Increment
eventProbe.receiveMessages(3)
snapshotProbe.receiveMessages(3)
counterSetup ! GetValue(commandProbe.ref)
commandProbe.expectMessage(State(3, Vector(0, 1, 2)))
val counterWithSnapshotSelectionCriteriaNone = spawn(
counterWithSnapshotSelectionCriteria(Recovery.withSnapshotSelectionCriteria(SnapshotSelectionCriteria.none)))
// replay all events, no snapshot
eventProbe.expectMessage(State(0, Vector.empty) -> Incremented(1))
eventProbe.expectMessage(State(1, Vector(0)) -> Incremented(1))
eventProbe.expectMessage(State(2, Vector(0, 1)) -> Incremented(1))
counterWithSnapshotSelectionCriteriaNone ! Increment
eventProbe.expectMessage(State(3, Vector(0, 1, 2)) -> Incremented(1))
counterWithSnapshotSelectionCriteriaNone ! GetValue(commandProbe.ref)
commandProbe.expectMessage(State(4, Vector(0, 1, 2, 3)))
val counterWithSnapshotSelectionCriteriaLatest = spawn(
counterWithSnapshotSelectionCriteria(Recovery.withSnapshotSelectionCriteria(SnapshotSelectionCriteria.latest)))
// replay no events, only latest snapshot
eventProbe.expectNoMessage()
counterWithSnapshotSelectionCriteriaLatest ! Increment
counterWithSnapshotSelectionCriteriaLatest ! GetValue(commandProbe.ref)
commandProbe.expectMessage(State(5, Vector(0, 1, 2, 3, 4)))
}
/** /**
* Verify that all side-effects callbacks are called (in order) and only once. * Verify that all side-effects callbacks are called (in order) and only once.
* The [[IncrementTwiceAndThenLog]] command will emit two Increment events * The [[IncrementTwiceAndThenLog]] command will emit two Increment events

View file

@ -12,7 +12,7 @@ import akka.actor.testkit.typed.TestException
import akka.actor.testkit.typed.scaladsl.{ LogCapturing, LoggingTestKit, ScalaTestWithActorTestKit, TestProbe } import akka.actor.testkit.typed.scaladsl.{ LogCapturing, LoggingTestKit, ScalaTestWithActorTestKit, TestProbe }
import akka.actor.typed._ import akka.actor.typed._
import akka.actor.typed.scaladsl.{ ActorContext, Behaviors } import akka.actor.typed.scaladsl.{ ActorContext, Behaviors }
import akka.persistence.Recovery import akka.persistence.{ Recovery => ClassicRecovery }
import akka.persistence.typed.{ NoOpEventAdapter, PersistenceId, RecoveryCompleted } import akka.persistence.typed.{ NoOpEventAdapter, PersistenceId, RecoveryCompleted }
import akka.persistence.typed.internal.{ import akka.persistence.typed.internal.{
BehaviorSetup, BehaviorSetup,
@ -62,7 +62,7 @@ class EventSourcedBehaviorWatchSpec
NoOpEventAdapter.instance[String], NoOpEventAdapter.instance[String],
NoOpSnapshotAdapter.instance[String], NoOpSnapshotAdapter.instance[String],
snapshotWhen = ConstantFun.scalaAnyThreeToFalse, snapshotWhen = ConstantFun.scalaAnyThreeToFalse,
Recovery(), ClassicRecovery(),
RetentionCriteria.disabled, RetentionCriteria.disabled,
holdingRecoveryPermit = false, holdingRecoveryPermit = false,
settings = settings, settings = settings,

View file

@ -15,6 +15,7 @@ import akka.persistence.typed.DeleteEventsFailed
import akka.persistence.typed.DeleteSnapshotsFailed import akka.persistence.typed.DeleteSnapshotsFailed
import akka.persistence.typed.EventAdapter import akka.persistence.typed.EventAdapter
import akka.persistence.typed.EventSeq import akka.persistence.typed.EventSeq
import akka.persistence.typed.scaladsl.Recovery
//#structure //#structure
//#behavior //#behavior
import akka.persistence.typed.scaladsl.EventSourcedBehavior import akka.persistence.typed.scaladsl.EventSourcedBehavior
@ -121,6 +122,18 @@ object BasicPersistentBehaviorCompileOnly {
//#recovery //#recovery
} }
object RecoveryDisabledBehavior {
def apply(): Behavior[Command] =
//#recovery-disabled
EventSourcedBehavior[Command, Event, State](
persistenceId = PersistenceId.ofUniqueId("abc"),
emptyState = State(),
commandHandler = (state, cmd) => throw new NotImplementedError("TODO: process the command & return an Effect"),
eventHandler = (state, evt) => throw new NotImplementedError("TODO: process the event return the next state"))
.withRecovery(Recovery.disabled)
//#recovery-disabled
}
object TaggingBehavior { object TaggingBehavior {
def apply(): Behavior[Command] = def apply(): Behavior[Command] =
//#tagging //#tagging
@ -241,7 +254,7 @@ object BasicPersistentBehaviorCompileOnly {
emptyState = State(), emptyState = State(),
commandHandler = (state, cmd) => throw new NotImplementedError("TODO: process the command & return an Effect"), commandHandler = (state, cmd) => throw new NotImplementedError("TODO: process the command & return an Effect"),
eventHandler = (state, evt) => throw new NotImplementedError("TODO: process the event return the next state")) eventHandler = (state, evt) => throw new NotImplementedError("TODO: process the event return the next state"))
.withSnapshotSelectionCriteria(SnapshotSelectionCriteria.none) .withRecovery(Recovery.withSnapshotSelectionCriteria(SnapshotSelectionCriteria.none))
//#snapshotSelection //#snapshotSelection
//#retentionCriteria //#retentionCriteria