Signals instead of callbacks for eventsourced behavior #25428
This commit is contained in:
parent
2e247001f7
commit
7fc591c182
25 changed files with 482 additions and 293 deletions
|
|
@ -8,17 +8,16 @@ import java.util.concurrent.ConcurrentHashMap
|
|||
import java.util.concurrent.CountDownLatch
|
||||
import java.util.concurrent.TimeUnit
|
||||
|
||||
import scala.concurrent.duration._
|
||||
import scala.concurrent.ExecutionContext
|
||||
import scala.concurrent.Future
|
||||
import scala.concurrent.Promise
|
||||
import scala.concurrent.duration._
|
||||
|
||||
import akka.Done
|
||||
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
|
||||
import akka.actor.testkit.typed.scaladsl.TestProbe
|
||||
import akka.actor.typed.ActorRef
|
||||
import akka.actor.typed.Behavior
|
||||
import akka.actor.typed.PostStop
|
||||
import akka.actor.typed.internal.PoisonPill
|
||||
import akka.actor.typed.scaladsl.Behaviors
|
||||
import akka.cluster.sharding.ShardRegion.CurrentShardRegionState
|
||||
|
|
@ -29,6 +28,7 @@ import akka.cluster.sharding.{ ClusterSharding => UntypedClusterSharding }
|
|||
import akka.cluster.typed.Cluster
|
||||
import akka.cluster.typed.Join
|
||||
import akka.persistence.typed.ExpectingReply
|
||||
import akka.persistence.typed.RecoveryCompleted
|
||||
import akka.persistence.typed.scaladsl.Effect
|
||||
import com.typesafe.config.ConfigFactory
|
||||
import org.scalatest.WordSpecLike
|
||||
|
|
@ -120,25 +120,25 @@ object ClusterShardingPersistenceSpec {
|
|||
stashing = false
|
||||
Effect.unstashAll()
|
||||
|
||||
case UnstashAllAndPassivate =>
|
||||
case UnstashAllAndPassivate ⇒
|
||||
stashing = false
|
||||
shard ! Passivate(ctx.self)
|
||||
Effect.unstashAll()
|
||||
},
|
||||
eventHandler =
|
||||
(state, evt) => if (state.isEmpty) evt else state + "|" + evt)
|
||||
.onRecoveryCompleted { state =>
|
||||
eventHandler = (state, evt) ⇒
|
||||
if (state.isEmpty) evt else state + "|" + evt).receiveSignal {
|
||||
case RecoveryCompleted(state) ⇒
|
||||
ctx.log.debug("onRecoveryCompleted: [{}]", state)
|
||||
lifecycleProbes.get(entityId) match {
|
||||
case null => ctx.log.debug("no lifecycleProbe (onRecoveryCompleted) for [{}]", entityId)
|
||||
case p => p ! s"recoveryCompleted:$state"
|
||||
case null ⇒ ctx.log.debug("no lifecycleProbe (onRecoveryCompleted) for [{}]", entityId)
|
||||
case p ⇒ p ! s"recoveryCompleted:$state"
|
||||
}
|
||||
}
|
||||
.onPostStop(() =>
|
||||
case PostStop ⇒
|
||||
lifecycleProbes.get(entityId) match {
|
||||
case null => ctx.log.debug("no lifecycleProbe (postStop) for [{}]", entityId)
|
||||
case p => p ! "stopped"
|
||||
})
|
||||
case null ⇒ ctx.log.debug("no lifecycleProbe (postStop) for [{}]", entityId)
|
||||
case p ⇒ p ! "stopped"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -336,7 +336,7 @@ Strategies for that can be found in the @ref:[schema evolution](../persistence-s
|
|||
## Recovery
|
||||
|
||||
It is strongly discouraged to perform side effects in `applyEvent`,
|
||||
so side effects should be performed once recovery has completed @scala[in the `onRecoveryCompleted` callback.] @java[by overriding `onRecoveryCompleted`]
|
||||
so side effects should be performed once recovery has completed as a reaction to the `RecoveryCompleted` signal @scala[`receiveSignal` handler] @java[by overriding `receiveSignal`]
|
||||
|
||||
Scala
|
||||
: @@snip [BasicPersistentBehaviorCompileOnly.scala](/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BasicPersistentBehaviorCompileOnly.scala) { #recovery }
|
||||
|
|
@ -344,8 +344,7 @@ Scala
|
|||
Java
|
||||
: @@snip [BasicPersistentBehaviorTest.java](/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/BasicPersistentBehaviorTest.java) { #recovery }
|
||||
|
||||
The `onRecoveryCompleted` takes @scala[an `ActorContext` and] the current `State`,
|
||||
and doesn't return anything.
|
||||
The `RecoveryCompleted` contains the current `State`.
|
||||
|
||||
@ref[Snapshots)[persistence-snapshot.md] can be used for optimizing recovery times.
|
||||
|
||||
|
|
|
|||
|
|
@ -0,0 +1,95 @@
|
|||
/*
|
||||
* Copyright (C) 2009-2019 Lightbend Inc. <https://www.lightbend.com>
|
||||
*/
|
||||
|
||||
package akka.persistence.typed
|
||||
|
||||
import akka.actor.typed.Signal
|
||||
import akka.annotation.DoNotInherit
|
||||
import akka.persistence.SnapshotMetadata
|
||||
import akka.persistence.SnapshotSelectionCriteria
|
||||
|
||||
/**
|
||||
* Supertype for all Akka Persistence Typed specific signals
|
||||
*
|
||||
* Not for user extension
|
||||
*/
|
||||
@DoNotInherit
|
||||
sealed trait EventSourcedSignal extends Signal
|
||||
|
||||
final case class RecoveryCompleted[State](state: State) extends EventSourcedSignal {
|
||||
|
||||
/**
|
||||
* Java API
|
||||
*/
|
||||
def getState(): State = state
|
||||
}
|
||||
final case class RecoveryFailed(failure: Throwable) extends EventSourcedSignal {
|
||||
|
||||
/**
|
||||
* Java API
|
||||
*/
|
||||
def getFailure(): Throwable = failure
|
||||
}
|
||||
|
||||
final case class SnapshotCompleted(metadata: SnapshotMetadata) extends EventSourcedSignal {
|
||||
|
||||
/**
|
||||
* Java API
|
||||
*/
|
||||
def getSnapshotMetadata(): SnapshotMetadata = metadata
|
||||
}
|
||||
final case class SnapshotFailed(metadata: SnapshotMetadata, failure: Throwable) extends EventSourcedSignal {
|
||||
|
||||
/**
|
||||
* Java API
|
||||
*/
|
||||
def getFailure(): Throwable = failure
|
||||
|
||||
/**
|
||||
* Java API
|
||||
*/
|
||||
def getSnapshotMetadata(): SnapshotMetadata = metadata
|
||||
}
|
||||
|
||||
final case class DeleteSnapshotCompleted(target: DeletionTarget) extends EventSourcedSignal {
|
||||
|
||||
/**
|
||||
* Java API
|
||||
*/
|
||||
def getTarget(): DeletionTarget = target
|
||||
}
|
||||
final case class DeleteSnapshotFailed(target: DeletionTarget, failure: Throwable) extends EventSourcedSignal {
|
||||
|
||||
/**
|
||||
* Java API
|
||||
*/
|
||||
def getFailure(): Throwable = failure
|
||||
|
||||
/**
|
||||
* Java API
|
||||
*/
|
||||
def getTarget(): DeletionTarget = target
|
||||
}
|
||||
|
||||
/**
|
||||
* Not for user extension
|
||||
*/
|
||||
@DoNotInherit
|
||||
sealed trait DeletionTarget
|
||||
object DeletionTarget {
|
||||
final case class Individual(metadata: SnapshotMetadata) extends DeletionTarget {
|
||||
|
||||
/**
|
||||
* Java API
|
||||
*/
|
||||
def getSnapshotMetadata(): SnapshotMetadata = metadata
|
||||
}
|
||||
final case class Criteria(selection: SnapshotSelectionCriteria) extends DeletionTarget {
|
||||
|
||||
/**
|
||||
* Java API
|
||||
*/
|
||||
def getSnapshotSelection(): SnapshotSelectionCriteria = selection
|
||||
}
|
||||
}
|
||||
|
|
@ -5,18 +5,18 @@
|
|||
package akka.persistence.typed.internal
|
||||
|
||||
import scala.concurrent.ExecutionContext
|
||||
import scala.util.Try
|
||||
|
||||
import akka.Done
|
||||
import akka.actor.Cancellable
|
||||
import akka.actor.typed.Logger
|
||||
import akka.actor.typed.scaladsl.ActorContext
|
||||
import akka.actor.ActorRef
|
||||
import akka.actor.typed.Signal
|
||||
import akka.annotation.InternalApi
|
||||
import akka.persistence._
|
||||
import akka.persistence.typed.EventAdapter
|
||||
import akka.persistence.typed.PersistenceId
|
||||
import akka.persistence.typed.scaladsl.EventSourcedBehavior
|
||||
import akka.util.ConstantFun
|
||||
import akka.util.OptionVal
|
||||
|
||||
/**
|
||||
|
|
@ -29,12 +29,10 @@ private[akka] final class BehaviorSetup[C, E, S](val context: ActorContext[Inter
|
|||
val commandHandler: EventSourcedBehavior.CommandHandler[C, E, S],
|
||||
val eventHandler: EventSourcedBehavior.EventHandler[S, E],
|
||||
val writerIdentity: EventSourcedBehaviorImpl.WriterIdentity,
|
||||
val recoveryCompleted: S => Unit,
|
||||
val onRecoveryFailure: Throwable => Unit,
|
||||
val onSnapshot: (SnapshotMetadata, Try[Done]) => Unit,
|
||||
val tagger: E => Set[String],
|
||||
private val signalHandler: PartialFunction[Signal, Unit],
|
||||
val tagger: E ⇒ Set[String],
|
||||
val eventAdapter: EventAdapter[E, _],
|
||||
val snapshotWhen: (S, E, Long) => Boolean,
|
||||
val snapshotWhen: (S, E, Long) ⇒ Boolean,
|
||||
val recovery: Recovery,
|
||||
var holdingRecoveryPermit: Boolean,
|
||||
val settings: EventSourcedSettings,
|
||||
|
|
@ -99,6 +97,10 @@ private[akka] final class BehaviorSetup[C, E, S](val context: ActorContext[Inter
|
|||
recoveryTimer = OptionVal.None
|
||||
}
|
||||
|
||||
def onSignal(signal: Signal): Unit = {
|
||||
signalHandler.applyOrElse(signal, ConstantFun.scalaAnyToUnit)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -11,15 +11,12 @@ import scala.util.Failure
|
|||
import scala.util.Success
|
||||
import scala.util.Try
|
||||
import scala.util.control.NonFatal
|
||||
|
||||
import akka.Done
|
||||
import akka.actor.typed
|
||||
import akka.actor.typed.BackoffSupervisorStrategy
|
||||
import akka.actor.typed.Behavior
|
||||
import akka.actor.typed.BehaviorInterceptor
|
||||
import akka.actor.typed.Logger
|
||||
import akka.actor.typed.PostStop
|
||||
import akka.actor.typed.PreRestart
|
||||
import akka.actor.typed.Signal
|
||||
import akka.actor.typed.SupervisorStrategy
|
||||
import akka.actor.typed.scaladsl.ActorContext
|
||||
|
|
@ -29,21 +26,14 @@ import akka.persistence._
|
|||
import akka.persistence.typed.EventAdapter
|
||||
import akka.persistence.typed.NoOpEventAdapter
|
||||
import akka.persistence.typed.PersistenceId
|
||||
import akka.persistence.typed.SnapshotCompleted
|
||||
import akka.persistence.typed.SnapshotFailed
|
||||
import akka.persistence.typed.scaladsl._
|
||||
import akka.util.ConstantFun
|
||||
|
||||
@InternalApi
|
||||
private[akka] object EventSourcedBehaviorImpl {
|
||||
|
||||
def defaultOnSnapshot[A](ctx: ActorContext[A], meta: SnapshotMetadata, result: Try[Done]): Unit = {
|
||||
result match {
|
||||
case Success(_) =>
|
||||
ctx.log.debug("Save snapshot successful, snapshot metadata: [{}]", meta)
|
||||
case Failure(t) =>
|
||||
ctx.log.error(t, "Save snapshot failed, snapshot metadata: [{}]", meta)
|
||||
}
|
||||
}
|
||||
|
||||
object WriterIdentity {
|
||||
|
||||
// ok to wrap around (2*Int.MaxValue restarts will not happen within a journal roundtrip)
|
||||
|
|
@ -68,16 +58,12 @@ private[akka] final case class EventSourcedBehaviorImpl[Command, Event, State](
|
|||
loggerClass: Class[_],
|
||||
journalPluginId: Option[String] = None,
|
||||
snapshotPluginId: Option[String] = None,
|
||||
recoveryCompleted: State => Unit = ConstantFun.scalaAnyToUnit,
|
||||
postStop: () => Unit = ConstantFun.unitToUnit,
|
||||
preRestart: () => Unit = ConstantFun.unitToUnit,
|
||||
tagger: Event => Set[String] = (_: Event) => Set.empty[String],
|
||||
tagger: Event ⇒ Set[String] = (_: Event) ⇒ Set.empty[String],
|
||||
eventAdapter: EventAdapter[Event, Any] = NoOpEventAdapter.instance[Event],
|
||||
snapshotWhen: (State, Event, Long) => Boolean = ConstantFun.scalaAnyThreeToFalse,
|
||||
snapshotWhen: (State, Event, Long) ⇒ Boolean = ConstantFun.scalaAnyThreeToFalse,
|
||||
recovery: Recovery = Recovery(),
|
||||
supervisionStrategy: SupervisorStrategy = SupervisorStrategy.stop,
|
||||
onSnapshot: (SnapshotMetadata, Try[Done]) => Unit = ConstantFun.scalaAnyTwoToUnit,
|
||||
onRecoveryFailure: Throwable => Unit = ConstantFun.scalaAnyToUnit)
|
||||
override val signalHandler: PartialFunction[Signal, Unit] = PartialFunction.empty)
|
||||
extends EventSourcedBehavior[Command, Event, State] {
|
||||
|
||||
import EventSourcedBehaviorImpl.WriterIdentity
|
||||
|
|
@ -90,25 +76,24 @@ private[akka] final case class EventSourcedBehaviorImpl[Command, Event, State](
|
|||
// stashState outside supervise because StashState should survive restarts due to persist failures
|
||||
val stashState = new StashState(settings)
|
||||
|
||||
val actualSignalHandler: PartialFunction[Signal, Unit] = signalHandler.orElse {
|
||||
// default signal handler is always the fallback
|
||||
case SnapshotCompleted(meta: SnapshotMetadata) ⇒
|
||||
ctx.log.debug("Save snapshot successful, snapshot metadata: [{}]", meta)
|
||||
case SnapshotFailed(meta, failure) ⇒
|
||||
ctx.log.error(failure, "Save snapshot failed, snapshot metadata: [{}]", meta)
|
||||
}
|
||||
|
||||
Behaviors
|
||||
.supervise {
|
||||
Behaviors.setup[Command] { _ =>
|
||||
// the default impl needs context which isn't available until here, so we
|
||||
// use the anyTwoToUnit as a marker to use the default
|
||||
val actualOnSnapshot: (SnapshotMetadata, Try[Done]) => Unit =
|
||||
if (onSnapshot == ConstantFun.scalaAnyTwoToUnit)
|
||||
EventSourcedBehaviorImpl.defaultOnSnapshot[Command](ctx, _, _)
|
||||
else onSnapshot
|
||||
|
||||
val eventsourcedSetup = new BehaviorSetup(ctx.asInstanceOf[ActorContext[InternalProtocol]],
|
||||
Behaviors.setup[Command] { _ ⇒
|
||||
val eventSourcedSetup = new BehaviorSetup(ctx.asInstanceOf[ActorContext[InternalProtocol]],
|
||||
persistenceId,
|
||||
emptyState,
|
||||
commandHandler,
|
||||
eventHandler,
|
||||
WriterIdentity.newIdentity(),
|
||||
recoveryCompleted,
|
||||
onRecoveryFailure,
|
||||
actualOnSnapshot,
|
||||
actualSignalHandler,
|
||||
tagger,
|
||||
eventAdapter,
|
||||
snapshotWhen,
|
||||
|
|
@ -132,22 +117,26 @@ private[akka] final case class EventSourcedBehaviorImpl[Command, Event, State](
|
|||
signal: Signal,
|
||||
target: SignalTarget[Any]): Behavior[Any] = {
|
||||
if (signal == PostStop) {
|
||||
eventsourcedSetup.cancelRecoveryTimer()
|
||||
eventSourcedSetup.cancelRecoveryTimer()
|
||||
// clear stash to be GC friendly
|
||||
stashState.clearStashBuffers()
|
||||
signalPostStop(eventsourcedSetup.log)
|
||||
} else if (signal == PreRestart) {
|
||||
signalPreRestart(eventsourcedSetup.log)
|
||||
}
|
||||
target(ctx, signal)
|
||||
val nextBehavior = target(ctx, signal)
|
||||
try {
|
||||
eventSourcedSetup.onSignal(signal)
|
||||
} catch {
|
||||
case NonFatal(ex) ⇒
|
||||
ctx.asScala.log.error(ex, s"Error while processing signal [{}]", signal)
|
||||
}
|
||||
nextBehavior
|
||||
}
|
||||
}
|
||||
val widened = RequestingRecoveryPermit(eventsourcedSetup).widen[Any] {
|
||||
case res: JournalProtocol.Response => InternalProtocol.JournalResponse(res)
|
||||
case res: SnapshotProtocol.Response => InternalProtocol.SnapshotterResponse(res)
|
||||
case RecoveryPermitter.RecoveryPermitGranted => InternalProtocol.RecoveryPermitGranted
|
||||
case internal: InternalProtocol => internal // such as RecoveryTickEvent
|
||||
case cmd: Command @unchecked => InternalProtocol.IncomingCommand(cmd)
|
||||
val widened = RequestingRecoveryPermit(eventSourcedSetup).widen[Any] {
|
||||
case res: JournalProtocol.Response ⇒ InternalProtocol.JournalResponse(res)
|
||||
case res: SnapshotProtocol.Response ⇒ InternalProtocol.SnapshotterResponse(res)
|
||||
case RecoveryPermitter.RecoveryPermitGranted ⇒ InternalProtocol.RecoveryPermitGranted
|
||||
case internal: InternalProtocol ⇒ internal // such as RecoveryTickEvent
|
||||
case cmd: Command @unchecked ⇒ InternalProtocol.IncomingCommand(cmd)
|
||||
}
|
||||
Behaviors.intercept(onStopInterceptor)(widened).narrow[Command]
|
||||
}
|
||||
|
|
@ -156,30 +145,8 @@ private[akka] final case class EventSourcedBehaviorImpl[Command, Event, State](
|
|||
.onFailure[JournalFailureException](supervisionStrategy)
|
||||
}
|
||||
|
||||
def signalPostStop(log: Logger): Unit = {
|
||||
try postStop()
|
||||
catch {
|
||||
case NonFatal(e) =>
|
||||
log.warning("Exception in postStop: {}", e)
|
||||
}
|
||||
}
|
||||
|
||||
def signalPreRestart(log: Logger): Unit = {
|
||||
try preRestart()
|
||||
catch {
|
||||
case NonFatal(e) =>
|
||||
log.warning("Exception in preRestart: {}", e)
|
||||
}
|
||||
}
|
||||
|
||||
override def onRecoveryCompleted(callback: State => Unit): EventSourcedBehavior[Command, Event, State] =
|
||||
copy(recoveryCompleted = callback)
|
||||
|
||||
override def onPostStop(callback: () => Unit): EventSourcedBehavior[Command, Event, State] =
|
||||
copy(postStop = callback)
|
||||
|
||||
override def onPreRestart(callback: () => Unit): EventSourcedBehavior[Command, Event, State] =
|
||||
copy(preRestart = callback)
|
||||
override def receiveSignal(handler: PartialFunction[Signal, Unit]): EventSourcedBehavior[Command, Event, State] =
|
||||
copy(signalHandler = handler)
|
||||
|
||||
override def snapshotWhen(predicate: (State, Event, Long) => Boolean): EventSourcedBehavior[Command, Event, State] =
|
||||
copy(snapshotWhen = predicate)
|
||||
|
|
@ -210,16 +177,10 @@ private[akka] final case class EventSourcedBehaviorImpl[Command, Event, State](
|
|||
override def eventAdapter(adapter: EventAdapter[Event, _]): EventSourcedBehavior[Command, Event, State] =
|
||||
copy(eventAdapter = adapter.asInstanceOf[EventAdapter[Event, Any]])
|
||||
|
||||
override def onSnapshot(
|
||||
callback: (SnapshotMetadata, Try[Done]) => Unit): EventSourcedBehavior[Command, Event, State] =
|
||||
copy(onSnapshot = callback)
|
||||
|
||||
override def onPersistFailure(
|
||||
backoffStrategy: BackoffSupervisorStrategy): EventSourcedBehavior[Command, Event, State] =
|
||||
copy(supervisionStrategy = backoffStrategy)
|
||||
|
||||
override def onRecoveryFailure(callback: Throwable => Unit): EventSourcedBehavior[Command, Event, State] =
|
||||
copy(onRecoveryFailure = callback)
|
||||
}
|
||||
|
||||
/** Protocol used internally by the eventsourced behaviors. */
|
||||
|
|
|
|||
|
|
@ -16,6 +16,8 @@ import akka.annotation.InternalApi
|
|||
import akka.event.Logging
|
||||
import akka.persistence.JournalProtocol._
|
||||
import akka.persistence._
|
||||
import akka.persistence.typed.RecoveryFailed
|
||||
import akka.persistence.typed.RecoveryCompleted
|
||||
import akka.persistence.typed.internal.ReplayingEvents.ReplayingState
|
||||
import akka.persistence.typed.internal.Running.WithSeqNrAccessible
|
||||
|
||||
|
|
@ -158,9 +160,9 @@ private[akka] final class ReplayingEvents[C, E, S](override val setup: BehaviorS
|
|||
sequenceNr: Long,
|
||||
message: Option[Any]): Behavior[InternalProtocol] = {
|
||||
try {
|
||||
setup.onRecoveryFailure(cause)
|
||||
setup.onSignal(RecoveryFailed(cause))
|
||||
} catch {
|
||||
case NonFatal(t) => setup.log.error(t, "onRecoveryFailure threw exception")
|
||||
case NonFatal(t) => setup.log.error(t, "RecoveryFailed signal handler threw exception")
|
||||
}
|
||||
setup.cancelRecoveryTimer()
|
||||
tryReturnRecoveryPermit("on replay failure: " + cause.getMessage)
|
||||
|
|
@ -179,7 +181,7 @@ private[akka] final class ReplayingEvents[C, E, S](override val setup: BehaviorS
|
|||
protected def onRecoveryCompleted(state: ReplayingState[S]): Behavior[InternalProtocol] =
|
||||
try {
|
||||
tryReturnRecoveryPermit("replay completed successfully")
|
||||
setup.recoveryCompleted(state.state)
|
||||
setup.onSignal(RecoveryCompleted(state.state))
|
||||
|
||||
if (state.receivedPoisonPill && isInternalStashEmpty && !isUnstashAllInProgress)
|
||||
Behaviors.stopped
|
||||
|
|
|
|||
|
|
@ -6,10 +6,6 @@ package akka.persistence.typed.internal
|
|||
|
||||
import scala.annotation.tailrec
|
||||
import scala.collection.immutable
|
||||
import scala.util.Failure
|
||||
import scala.util.Success
|
||||
|
||||
import akka.Done
|
||||
import akka.actor.UnhandledMessage
|
||||
import akka.actor.typed.Behavior
|
||||
import akka.actor.typed.Signal
|
||||
|
|
@ -20,10 +16,14 @@ import akka.annotation.InternalApi
|
|||
import akka.persistence.JournalProtocol._
|
||||
import akka.persistence._
|
||||
import akka.persistence.journal.Tagged
|
||||
|
||||
import akka.persistence.typed.Callback
|
||||
import akka.persistence.typed.DeleteSnapshotCompleted
|
||||
import akka.persistence.typed.DeleteSnapshotFailed
|
||||
import akka.persistence.typed.DeletionTarget
|
||||
import akka.persistence.typed.EventRejectedException
|
||||
import akka.persistence.typed.SideEffect
|
||||
import akka.persistence.typed.SnapshotCompleted
|
||||
import akka.persistence.typed.SnapshotFailed
|
||||
import akka.persistence.typed.Stop
|
||||
import akka.persistence.typed.UnstashAll
|
||||
import akka.persistence.typed.internal.Running.WithSeqNrAccessible
|
||||
|
|
@ -310,21 +310,23 @@ private[akka] object Running {
|
|||
}
|
||||
|
||||
def onSnapshotterResponse(response: SnapshotProtocol.Response): Unit = {
|
||||
response match {
|
||||
val signal = response match {
|
||||
case SaveSnapshotSuccess(meta) =>
|
||||
setup.onSnapshot(meta, Success(Done))
|
||||
Some(SnapshotCompleted(meta))
|
||||
case SaveSnapshotFailure(meta, ex) =>
|
||||
setup.onSnapshot(meta, Failure(ex))
|
||||
|
||||
// FIXME #24698 not implemented yet
|
||||
case DeleteSnapshotFailure(_, _) => ???
|
||||
case DeleteSnapshotSuccess(_) => ???
|
||||
case DeleteSnapshotsFailure(_, _) => ???
|
||||
case DeleteSnapshotsSuccess(_) => ???
|
||||
|
||||
// ignore LoadSnapshot messages
|
||||
case _ =>
|
||||
Some(SnapshotFailed(meta, ex))
|
||||
case DeleteSnapshotSuccess(meta) =>
|
||||
Some(DeleteSnapshotCompleted(DeletionTarget.Individual(meta)))
|
||||
case DeleteSnapshotFailure(meta, ex) =>
|
||||
Some(DeleteSnapshotFailed(DeletionTarget.Individual(meta), ex))
|
||||
case DeleteSnapshotsSuccess(criteria) =>
|
||||
Some(DeleteSnapshotCompleted(DeletionTarget.Criteria(criteria)))
|
||||
case DeleteSnapshotsFailure(criteria, failure) =>
|
||||
Some(DeleteSnapshotFailed(DeletionTarget.Criteria(criteria), failure))
|
||||
case _ => None
|
||||
}
|
||||
|
||||
signal.foreach(setup.onSignal _)
|
||||
}
|
||||
|
||||
Behaviors
|
||||
|
|
|
|||
|
|
@ -7,8 +7,6 @@ package akka.persistence.typed.javadsl
|
|||
import java.util.Collections
|
||||
import java.util.Optional
|
||||
|
||||
import scala.util.Failure
|
||||
import scala.util.Success
|
||||
import akka.actor.typed
|
||||
import akka.actor.typed.BackoffSupervisorStrategy
|
||||
import akka.actor.typed.Behavior
|
||||
|
|
@ -16,7 +14,6 @@ import akka.actor.typed.Behavior.DeferredBehavior
|
|||
import akka.actor.typed.javadsl.ActorContext
|
||||
import akka.annotation.ApiMayChange
|
||||
import akka.annotation.InternalApi
|
||||
import akka.persistence.SnapshotMetadata
|
||||
import akka.persistence.typed.EventAdapter
|
||||
import akka.persistence.typed._
|
||||
import akka.persistence.typed.internal._
|
||||
|
|
@ -76,6 +73,19 @@ abstract class EventSourcedBehavior[Command, Event, State >: Null] private[akka]
|
|||
*/
|
||||
protected def eventHandler(): EventHandler[State, Event]
|
||||
|
||||
/**
|
||||
* Override to react on general lifecycle signals and persistence specific signals (subtypes of
|
||||
* [[akka.persistence.typed.EventSourcedSignal]]).
|
||||
*
|
||||
* Use [[EventSourcedBehavior#newSignalHandlerBuilder]] to define the signal handler.
|
||||
*/
|
||||
protected def signalHandler(): SignalHandler = SignalHandler.Empty
|
||||
|
||||
/**
|
||||
* @return A new, mutable signal handler builder
|
||||
*/
|
||||
protected final def newSignalHandlerBuilder(): SignalHandlerBuilder = new SignalHandlerBuilder
|
||||
|
||||
/**
|
||||
* @return A new, mutable, command handler builder
|
||||
*/
|
||||
|
|
@ -89,35 +99,6 @@ abstract class EventSourcedBehavior[Command, Event, State >: Null] private[akka]
|
|||
protected final def newEventHandlerBuilder(): EventHandlerBuilder[State, Event] =
|
||||
EventHandlerBuilder.builder[State, Event]()
|
||||
|
||||
/**
|
||||
* The callback is invoked to notify the actor that the recovery process
|
||||
* is finished.
|
||||
*/
|
||||
def onRecoveryCompleted(state: State): Unit = ()
|
||||
|
||||
/**
|
||||
* The callback is invoked to notify the actor that the recovery process
|
||||
* has failed
|
||||
*/
|
||||
def onRecoveryFailure(failure: Throwable): Unit = ()
|
||||
|
||||
/**
|
||||
* The callback is invoked to notify that the actor has stopped.
|
||||
*/
|
||||
def onPostStop(): Unit = ()
|
||||
|
||||
/**
|
||||
* The callback is invoked to notify that the actor is restarted.
|
||||
*/
|
||||
def onPreRestart(): Unit = ()
|
||||
|
||||
/**
|
||||
* Override to get notified when a snapshot is finished.
|
||||
*
|
||||
* @param result None if successful otherwise contains the exception thrown when snapshotting
|
||||
*/
|
||||
def onSnapshot(meta: SnapshotMetadata, result: Optional[Throwable]): Unit = ()
|
||||
|
||||
/**
|
||||
* Override and define that snapshot should be saved every N events.
|
||||
*
|
||||
|
|
@ -171,38 +152,23 @@ abstract class EventSourcedBehavior[Command, Event, State >: Null] private[akka]
|
|||
emptyState,
|
||||
(state, cmd) => commandHandler()(state, cmd).asInstanceOf[EffectImpl[Event, State]],
|
||||
eventHandler()(_, _),
|
||||
getClass)
|
||||
.onRecoveryCompleted(onRecoveryCompleted)
|
||||
.onPostStop(() => onPostStop())
|
||||
.onPreRestart(() => onPreRestart())
|
||||
.snapshotWhen(snapshotWhen)
|
||||
.withTagger(tagger)
|
||||
.onSnapshot((meta, result) => {
|
||||
result match {
|
||||
case Success(_) =>
|
||||
context.asScala.log.debug("Save snapshot successful, snapshot metadata: [{}]", meta)
|
||||
case Failure(e) =>
|
||||
context.asScala.log.error(e, "Save snapshot failed, snapshot metadata: [{}]", meta)
|
||||
}
|
||||
getClass).snapshotWhen(snapshotWhen).withTagger(tagger).eventAdapter(eventAdapter())
|
||||
|
||||
onSnapshot(meta, result match {
|
||||
case Success(_) => Optional.empty()
|
||||
case Failure(t) => Optional.of(t)
|
||||
})
|
||||
})
|
||||
.eventAdapter(eventAdapter())
|
||||
.onRecoveryFailure(onRecoveryFailure)
|
||||
val handler = signalHandler()
|
||||
val behaviorWithSignalHandler =
|
||||
if (handler.isEmpty) behavior
|
||||
else behavior.receiveSignal(handler.handler)
|
||||
|
||||
if (onPersistFailure.isPresent)
|
||||
behavior.onPersistFailure(onPersistFailure.get)
|
||||
behaviorWithSignalHandler.onPersistFailure(onPersistFailure.get)
|
||||
else
|
||||
behavior
|
||||
behaviorWithSignalHandler
|
||||
}
|
||||
|
||||
/**
|
||||
* The last sequence number that was persisted, can only be called from inside the handlers of an `EventSourcedBehavior`
|
||||
*/
|
||||
def lastSequenceNumber(ctx: ActorContext[_]): Long = {
|
||||
final def lastSequenceNumber(ctx: ActorContext[_]): Long = {
|
||||
scaladsl.EventSourcedBehavior.lastSequenceNumber(ctx.asScala)
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -0,0 +1,66 @@
|
|||
/*
|
||||
* Copyright (C) 2009-2019 Lightbend Inc. <https://www.lightbend.com>
|
||||
*/
|
||||
|
||||
package akka.persistence.typed.javadsl
|
||||
|
||||
import akka.actor.typed.Signal
|
||||
import akka.annotation.InternalApi
|
||||
import akka.japi.function.Procedure
|
||||
import akka.japi.function.{ Effect ⇒ JEffect }
|
||||
|
||||
object SignalHandler {
|
||||
val Empty: SignalHandler = new SignalHandler(PartialFunction.empty)
|
||||
}
|
||||
|
||||
final class SignalHandler(_handler: PartialFunction[Signal, Unit]) {
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
@InternalApi
|
||||
private[akka] def isEmpty: Boolean = _handler eq PartialFunction.empty
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
@InternalApi
|
||||
private[akka] def handler: PartialFunction[Signal, Unit] = _handler
|
||||
}
|
||||
|
||||
/**
|
||||
* Mutable builder for handling signals in [[EventSourcedBehavior]]
|
||||
*
|
||||
* Not for user instantiation, use [[EventSourcedBehavior#newSignalHandlerBuilder()]] to get an instance.
|
||||
*/
|
||||
final class SignalHandlerBuilder {
|
||||
|
||||
private var handler: PartialFunction[Signal, Unit] = PartialFunction.empty
|
||||
|
||||
/**
|
||||
* If the behavior recieves a signal of type `T`, `callback` is invoked with the signal instance as input.
|
||||
*/
|
||||
def onSignal[T <: Signal](signalType: Class[T], callback: Procedure[T]): SignalHandlerBuilder = {
|
||||
val newPF: PartialFunction[Signal, Unit] = {
|
||||
case t if signalType.isInstance(t) ⇒
|
||||
callback(t.asInstanceOf[T])
|
||||
}
|
||||
handler = newPF.orElse(handler)
|
||||
this
|
||||
}
|
||||
|
||||
/**
|
||||
* If the behavior receives exactly the signal `signal`, `callback` is invoked.
|
||||
*/
|
||||
def onSignal[T <: Signal](signal: T, callback: JEffect): SignalHandlerBuilder = {
|
||||
val newPF: PartialFunction[Signal, Unit] = {
|
||||
case `signal` ⇒
|
||||
callback()
|
||||
}
|
||||
handler = newPF.orElse(handler)
|
||||
this
|
||||
}
|
||||
|
||||
def build: SignalHandler = new SignalHandler(handler)
|
||||
|
||||
}
|
||||
|
|
@ -6,11 +6,11 @@ package akka.persistence.typed.scaladsl
|
|||
|
||||
import scala.util.Try
|
||||
import scala.annotation.tailrec
|
||||
|
||||
import akka.Done
|
||||
import akka.actor.typed.BackoffSupervisorStrategy
|
||||
import akka.actor.typed.Behavior
|
||||
import akka.actor.typed.Behavior.DeferredBehavior
|
||||
import akka.actor.typed.Signal
|
||||
import akka.actor.typed.internal.LoggerClass
|
||||
import akka.actor.typed.internal.InterceptorImpl
|
||||
import akka.actor.typed.internal.adapter.ActorContextAdapter
|
||||
|
|
@ -127,30 +127,18 @@ object EventSourcedBehavior {
|
|||
def persistenceId: PersistenceId
|
||||
|
||||
/**
|
||||
* The `callback` function is called to notify that the recovery process has finished.
|
||||
* Allows the event sourced behavior to react on signals.
|
||||
*
|
||||
* The regular lifecycle signals can be handled as well as
|
||||
* Akka Persistence specific signals (snapshot and recovery related). Those are all subtypes of
|
||||
* [[akka.persistence.typed.EventSourcedSignal]]
|
||||
*/
|
||||
def onRecoveryCompleted(callback: State => Unit): EventSourcedBehavior[Command, Event, State]
|
||||
def receiveSignal(signalHandler: PartialFunction[Signal, Unit]): EventSourcedBehavior[Command, Event, State]
|
||||
|
||||
/**
|
||||
* The `callback` function is called to notify that recovery has failed. For setting a supervision
|
||||
* strategy `onPersistFailure`
|
||||
* @return The currently defined signal handler or an empty handler if no custom handler previously defined
|
||||
*/
|
||||
def onRecoveryFailure(callback: Throwable => Unit): EventSourcedBehavior[Command, Event, State]
|
||||
|
||||
/**
|
||||
* The `callback` function is called to notify that the actor has stopped.
|
||||
*/
|
||||
def onPostStop(callback: () => Unit): EventSourcedBehavior[Command, Event, State]
|
||||
|
||||
/**
|
||||
* The `callback` function is called to notify that the actor is restarted.
|
||||
*/
|
||||
def onPreRestart(callback: () => Unit): EventSourcedBehavior[Command, Event, State]
|
||||
|
||||
/**
|
||||
* The `callback` function is called to notify when a snapshot is complete.
|
||||
*/
|
||||
def onSnapshot(callback: (SnapshotMetadata, Try[Done]) => Unit): EventSourcedBehavior[Command, Event, State]
|
||||
def signalHandler: PartialFunction[Signal, Unit]
|
||||
|
||||
/**
|
||||
* Initiates a snapshot if the given function returns true.
|
||||
|
|
|
|||
|
|
@ -9,8 +9,12 @@ import akka.actor.testkit.typed.javadsl.TestKitJunitResource;
|
|||
import akka.actor.testkit.typed.javadsl.TestProbe;
|
||||
import akka.actor.typed.ActorRef;
|
||||
import akka.actor.typed.Behavior;
|
||||
import akka.actor.typed.Signal;
|
||||
import akka.actor.typed.SupervisorStrategy;
|
||||
import akka.japi.function.Effect;
|
||||
import akka.persistence.typed.PersistenceId;
|
||||
import akka.persistence.typed.RecoveryCompleted;
|
||||
import akka.persistence.typed.RecoveryFailed;
|
||||
import com.typesafe.config.Config;
|
||||
import com.typesafe.config.ConfigFactory;
|
||||
import org.junit.ClassRule;
|
||||
|
|
@ -39,13 +43,19 @@ class FailingEventSourcedActor extends EventSourcedBehavior<String, String, Stri
|
|||
}
|
||||
|
||||
@Override
|
||||
public void onRecoveryCompleted(String s) {
|
||||
probe.tell("starting");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onRecoveryFailure(Throwable failure) {
|
||||
recoveryFailureProbe.tell(failure);
|
||||
public SignalHandler signalHandler() {
|
||||
return newSignalHandlerBuilder()
|
||||
.onSignal(
|
||||
RecoveryCompleted.class,
|
||||
(recoveryCompleted) -> {
|
||||
probe.tell("starting");
|
||||
})
|
||||
.onSignal(
|
||||
RecoveryFailed.class,
|
||||
(signal) -> {
|
||||
recoveryFailureProbe.tell(signal.getFailure());
|
||||
})
|
||||
.build();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
|||
|
|
@ -10,6 +10,7 @@ import akka.actor.typed.ActorRef;
|
|||
import akka.actor.typed.Behavior;
|
||||
import akka.actor.typed.javadsl.Behaviors;
|
||||
import akka.persistence.typed.PersistenceId;
|
||||
import akka.persistence.typed.RecoveryCompleted;
|
||||
import com.typesafe.config.Config;
|
||||
import com.typesafe.config.ConfigFactory;
|
||||
import org.junit.ClassRule;
|
||||
|
|
@ -39,8 +40,14 @@ public class NullEmptyStateTest extends JUnitSuite {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void onRecoveryCompleted(String s) {
|
||||
probe.tell("onRecoveryCompleted:" + s);
|
||||
public SignalHandler signalHandler() {
|
||||
return newSignalHandlerBuilder()
|
||||
.onSignal(
|
||||
RecoveryCompleted.class,
|
||||
(completed) -> {
|
||||
probe.tell("onRecoveryCompleted:" + completed.getState());
|
||||
})
|
||||
.build();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
|||
|
|
@ -17,9 +17,7 @@ import akka.persistence.query.NoOffset;
|
|||
import akka.persistence.query.PersistenceQuery;
|
||||
import akka.persistence.query.Sequence;
|
||||
import akka.persistence.query.journal.leveldb.javadsl.LeveldbReadJournal;
|
||||
import akka.persistence.typed.EventAdapter;
|
||||
import akka.persistence.typed.ExpectingReply;
|
||||
import akka.persistence.typed.PersistenceId;
|
||||
import akka.persistence.typed.*;
|
||||
import akka.stream.ActorMaterializer;
|
||||
import akka.stream.javadsl.Sink;
|
||||
import akka.actor.testkit.typed.javadsl.TestKitJunitResource;
|
||||
|
|
@ -416,8 +414,19 @@ public class PersistentActorJavaDslTest extends JUnitSuite {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void onSnapshot(SnapshotMetadata meta, Optional<Throwable> result) {
|
||||
snapshotProbe.ref().tell(result);
|
||||
public SignalHandler signalHandler() {
|
||||
return newSignalHandlerBuilder()
|
||||
.onSignal(
|
||||
SnapshotCompleted.class,
|
||||
(completed) -> {
|
||||
snapshotProbe.ref().tell(Optional.empty());
|
||||
})
|
||||
.onSignal(
|
||||
SnapshotFailed.class,
|
||||
(signal) -> {
|
||||
snapshotProbe.ref().tell(Optional.of(signal.getFailure()));
|
||||
})
|
||||
.build();
|
||||
}
|
||||
});
|
||||
ActorRef<Command> c = testKit.spawn(snapshoter);
|
||||
|
|
@ -464,9 +473,16 @@ public class PersistentActorJavaDslTest extends JUnitSuite {
|
|||
Behaviors.setup(
|
||||
ctx ->
|
||||
new CounterBehavior(new PersistenceId("c5"), ctx) {
|
||||
|
||||
@Override
|
||||
public void onPostStop() {
|
||||
probe.ref().tell("stopped");
|
||||
public SignalHandler signalHandler() {
|
||||
return newSignalHandlerBuilder()
|
||||
.onSignal(
|
||||
PostStop.instance(),
|
||||
() -> {
|
||||
probe.ref().tell("stopped");
|
||||
})
|
||||
.build();
|
||||
}
|
||||
});
|
||||
ActorRef<Command> c = testKit.spawn(counter);
|
||||
|
|
@ -620,8 +636,14 @@ public class PersistentActorJavaDslTest extends JUnitSuite {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void onRecoveryCompleted(Object o) {
|
||||
startedProbe.tell("started!");
|
||||
public SignalHandler signalHandler() {
|
||||
return newSignalHandlerBuilder()
|
||||
.onSignal(
|
||||
RecoveryCompleted.class,
|
||||
(completed) -> {
|
||||
startedProbe.tell("started!");
|
||||
})
|
||||
.build();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
@ -695,8 +717,14 @@ public class PersistentActorJavaDslTest extends JUnitSuite {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void onRecoveryCompleted(String s) {
|
||||
probe.tell(lastSequenceNumber(context) + " onRecoveryCompleted");
|
||||
public SignalHandler signalHandler() {
|
||||
return newSignalHandlerBuilder()
|
||||
.onSignal(
|
||||
RecoveryCompleted.class,
|
||||
(completed) -> {
|
||||
probe.tell(lastSequenceNumber(context) + " onRecoveryCompleted");
|
||||
})
|
||||
.build();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -10,6 +10,7 @@ import akka.actor.typed.ActorRef;
|
|||
import akka.actor.typed.Behavior;
|
||||
import akka.actor.typed.javadsl.Behaviors;
|
||||
import akka.persistence.typed.PersistenceId;
|
||||
import akka.persistence.typed.RecoveryCompleted;
|
||||
import com.typesafe.config.Config;
|
||||
import com.typesafe.config.ConfigFactory;
|
||||
import org.junit.ClassRule;
|
||||
|
|
@ -39,8 +40,14 @@ public class PrimitiveStateTest extends JUnitSuite {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void onRecoveryCompleted(Integer n) {
|
||||
probe.tell("onRecoveryCompleted:" + n);
|
||||
public SignalHandler signalHandler() {
|
||||
return newSignalHandlerBuilder()
|
||||
.onSignal(
|
||||
RecoveryCompleted.class,
|
||||
(completed) -> {
|
||||
probe.tell("onRecoveryCompleted:" + completed.getState());
|
||||
})
|
||||
.build();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
|||
|
|
@ -9,9 +9,12 @@ import akka.actor.typed.SupervisorStrategy;
|
|||
import akka.actor.typed.javadsl.ActorContext;
|
||||
import akka.actor.typed.javadsl.Behaviors;
|
||||
import akka.persistence.typed.PersistenceId;
|
||||
import akka.persistence.typed.RecoveryCompleted;
|
||||
import akka.persistence.typed.javadsl.CommandHandler;
|
||||
import akka.persistence.typed.javadsl.EventHandler;
|
||||
import akka.persistence.typed.javadsl.EventSourcedBehavior;
|
||||
import akka.persistence.typed.javadsl.SignalHandler;
|
||||
|
||||
import java.time.Duration;
|
||||
|
||||
import java.util.ArrayList;
|
||||
|
|
@ -194,9 +197,16 @@ public class BasicPersistentBehaviorTest {
|
|||
}
|
||||
|
||||
// #recovery
|
||||
|
||||
@Override
|
||||
public void onRecoveryCompleted(State state) {
|
||||
throw new RuntimeException("TODO: add some end-of-recovery side-effect here");
|
||||
public SignalHandler signalHandler() {
|
||||
return newSignalHandlerBuilder()
|
||||
.onSignal(
|
||||
RecoveryCompleted.class,
|
||||
(completed) -> {
|
||||
throw new RuntimeException("TODO: add some end-of-recovery side-effect here");
|
||||
})
|
||||
.build();
|
||||
}
|
||||
// #recovery
|
||||
|
||||
|
|
|
|||
|
|
@ -14,11 +14,12 @@ import akka.persistence.RecoveryPermitter.{ RecoveryPermitGranted, RequestRecove
|
|||
import akka.persistence.typed.scaladsl.EventSourcedBehavior.CommandHandler
|
||||
import akka.persistence.typed.scaladsl.{ Effect, EventSourcedBehavior }
|
||||
import akka.testkit.EventFilter
|
||||
|
||||
import scala.concurrent.duration._
|
||||
import scala.util.control.NoStackTrace
|
||||
|
||||
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
|
||||
import akka.persistence.typed.PersistenceId
|
||||
import akka.persistence.typed.RecoveryCompleted
|
||||
import org.scalatest.WordSpecLike
|
||||
|
||||
object RecoveryPermitterSpec {
|
||||
|
|
@ -51,9 +52,10 @@ object RecoveryPermitterSpec {
|
|||
},
|
||||
eventHandler = { (state, event) =>
|
||||
eventProbe.ref ! event; state
|
||||
}).onRecoveryCompleted { _ =>
|
||||
eventProbe.ref ! Recovered
|
||||
if (throwOnRecovery) throw new TE
|
||||
}).receiveSignal {
|
||||
case RecoveryCompleted(state) =>
|
||||
eventProbe.ref ! Recovered
|
||||
if (throwOnRecovery) throw new TE
|
||||
}
|
||||
|
||||
def forwardingBehavior(target: TestProbe[Any]): Behavior[Any] =
|
||||
|
|
|
|||
|
|
@ -8,17 +8,21 @@ import scala.collection.immutable
|
|||
import scala.concurrent.Future
|
||||
import scala.concurrent.duration._
|
||||
import scala.util.Try
|
||||
|
||||
import akka.actor.testkit.typed.TestException
|
||||
import akka.actor.testkit.typed.TestKitSettings
|
||||
import akka.actor.testkit.typed.scaladsl._
|
||||
import akka.actor.typed.ActorRef
|
||||
import akka.actor.typed.PostStop
|
||||
import akka.actor.typed.PreRestart
|
||||
import akka.actor.typed.Signal
|
||||
import akka.actor.typed.SupervisorStrategy
|
||||
import akka.actor.typed.scaladsl.Behaviors
|
||||
import akka.persistence.AtomicWrite
|
||||
import akka.persistence.journal.inmem.InmemJournal
|
||||
import akka.persistence.typed.EventRejectedException
|
||||
import akka.persistence.typed.PersistenceId
|
||||
import akka.persistence.typed.RecoveryCompleted
|
||||
import akka.persistence.typed.RecoveryFailed
|
||||
import com.typesafe.config.Config
|
||||
import com.typesafe.config.ConfigFactory
|
||||
import org.scalatest.WordSpecLike
|
||||
|
|
@ -77,24 +81,29 @@ class EventSourcedBehaviorFailureSpec
|
|||
implicit val testSettings: TestKitSettings = TestKitSettings(system)
|
||||
|
||||
def failingPersistentActor(pid: PersistenceId,
|
||||
probe: ActorRef[String]): EventSourcedBehavior[String, String, String] =
|
||||
probe: ActorRef[String],
|
||||
additionalSignalHandler: PartialFunction[Signal, Unit] = PartialFunction.empty)
|
||||
: EventSourcedBehavior[String, String, String] =
|
||||
EventSourcedBehavior[String, String, String](pid,
|
||||
"",
|
||||
(_, cmd) => {
|
||||
(_, cmd) ⇒ {
|
||||
if (cmd == "wrong")
|
||||
throw new TestException("wrong command")
|
||||
probe.tell("persisting")
|
||||
Effect.persist(cmd)
|
||||
},
|
||||
(state, event) => {
|
||||
(state, event) ⇒ {
|
||||
probe.tell(event)
|
||||
state + event
|
||||
})
|
||||
.onRecoveryCompleted { _ =>
|
||||
probe.tell("starting")
|
||||
}
|
||||
.onPostStop(() => probe.tell("stopped"))
|
||||
.onPreRestart(() => probe.tell("restarting"))
|
||||
.receiveSignal(additionalSignalHandler.orElse {
|
||||
case RecoveryCompleted(_) ⇒
|
||||
probe.tell("starting")
|
||||
case PostStop ⇒
|
||||
probe.tell("stopped")
|
||||
case PreRestart ⇒
|
||||
probe.tell("restarting")
|
||||
})
|
||||
.onPersistFailure(
|
||||
SupervisorStrategy.restartWithBackoff(1.milli, 5.millis, 0.1).withLoggingEnabled(enabled = false))
|
||||
|
||||
|
|
@ -103,7 +112,11 @@ class EventSourcedBehaviorFailureSpec
|
|||
"call onRecoveryFailure when replay fails" in {
|
||||
val probe = TestProbe[String]()
|
||||
val excProbe = TestProbe[Throwable]()
|
||||
spawn(failingPersistentActor(PersistenceId("fail-recovery"), probe.ref).onRecoveryFailure(t => excProbe.ref ! t))
|
||||
spawn(failingPersistentActor(PersistenceId("fail-recovery"), probe.ref, {
|
||||
case RecoveryFailed(t) ⇒
|
||||
println("signal recovery failed")
|
||||
excProbe.ref ! t
|
||||
}))
|
||||
|
||||
excProbe.expectMessageType[TestException].message shouldEqual "Nope"
|
||||
probe.expectMessage("restarting")
|
||||
|
|
@ -111,8 +124,10 @@ class EventSourcedBehaviorFailureSpec
|
|||
|
||||
"handle exceptions in onRecoveryFailure" in {
|
||||
val probe = TestProbe[String]()
|
||||
val pa = spawn(failingPersistentActor(PersistenceId("fail-recovery-twice"), probe.ref).onRecoveryFailure(_ =>
|
||||
throw TestException("recovery call back failure")))
|
||||
val pa = spawn(failingPersistentActor(PersistenceId("fail-recovery-twice"), probe.ref, {
|
||||
case RecoveryFailed(t) ⇒
|
||||
throw TestException("recovery call back failure")
|
||||
}))
|
||||
pa ! "one"
|
||||
probe.expectMessage("starting")
|
||||
probe.expectMessage("persisting")
|
||||
|
|
|
|||
|
|
@ -35,12 +35,17 @@ import akka.persistence.snapshot.SnapshotStore
|
|||
import akka.persistence.typed.EventAdapter
|
||||
import akka.persistence.typed.ExpectingReply
|
||||
import akka.persistence.typed.PersistenceId
|
||||
import akka.persistence.typed.RecoveryCompleted
|
||||
import akka.persistence.typed.SnapshotCompleted
|
||||
import akka.persistence.typed.SnapshotFailed
|
||||
import akka.stream.ActorMaterializer
|
||||
import akka.stream.scaladsl.Sink
|
||||
import com.typesafe.config.Config
|
||||
import com.typesafe.config.ConfigFactory
|
||||
import org.scalatest.WordSpecLike
|
||||
|
||||
import scala.util.Failure
|
||||
|
||||
object EventSourcedBehaviorSpec {
|
||||
|
||||
//#event-wrapper
|
||||
|
|
@ -248,14 +253,17 @@ object EventSourcedBehaviorSpec {
|
|||
Effect.none.thenStop()
|
||||
|
||||
},
|
||||
eventHandler = (state, evt) =>
|
||||
eventHandler = (state, evt) ⇒
|
||||
evt match {
|
||||
case Incremented(delta) =>
|
||||
case Incremented(delta) ⇒
|
||||
probe ! ((state, evt))
|
||||
State(state.value + delta, state.history :+ state.value)
|
||||
}).onRecoveryCompleted(_ => ()).onSnapshot {
|
||||
case (_, result) =>
|
||||
snapshotProbe ! result
|
||||
}).receiveSignal {
|
||||
case RecoveryCompleted(_) ⇒ ()
|
||||
case SnapshotCompleted(_) ⇒
|
||||
snapshotProbe ! Success(Done)
|
||||
case SnapshotFailed(_, failure) ⇒
|
||||
snapshotProbe ! Failure(failure)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -8,6 +8,7 @@ import akka.actor.testkit.typed.scaladsl.{ ScalaTestWithActorTestKit, TestProbe
|
|||
import akka.actor.typed.{ ActorRef, Behavior }
|
||||
import akka.actor.typed.scaladsl.Behaviors
|
||||
import akka.persistence.typed.PersistenceId
|
||||
import akka.persistence.typed.RecoveryCompleted
|
||||
import com.typesafe.config.ConfigFactory
|
||||
import org.scalatest.WordSpecLike
|
||||
|
||||
|
|
@ -24,14 +25,17 @@ class EventSourcedSequenceNumberSpec
|
|||
with WordSpecLike {
|
||||
|
||||
private def behavior(pid: PersistenceId, probe: ActorRef[String]): Behavior[String] =
|
||||
Behaviors.setup(ctx =>
|
||||
Behaviors.setup(ctx ⇒
|
||||
EventSourcedBehavior[String, String, String](pid, "", { (_, command) =>
|
||||
probe ! (EventSourcedBehavior.lastSequenceNumber(ctx) + " onCommand")
|
||||
Effect.persist(command).thenRun(_ => probe ! (EventSourcedBehavior.lastSequenceNumber(ctx) + " thenRun"))
|
||||
}, { (state, evt) =>
|
||||
Effect.persist(command).thenRun(_ ⇒ probe ! (EventSourcedBehavior.lastSequenceNumber(ctx) + " thenRun"))
|
||||
}, { (state, evt) ⇒
|
||||
probe ! (EventSourcedBehavior.lastSequenceNumber(ctx) + " eventHandler")
|
||||
state + evt
|
||||
}).onRecoveryCompleted(_ => probe ! (EventSourcedBehavior.lastSequenceNumber(ctx) + " onRecoveryComplete")))
|
||||
}).receiveSignal {
|
||||
case RecoveryCompleted(_) ⇒
|
||||
probe ! (EventSourcedBehavior.lastSequenceNumber(ctx) + " onRecoveryComplete")
|
||||
})
|
||||
|
||||
"The sequence number" must {
|
||||
|
||||
|
|
|
|||
|
|
@ -9,6 +9,7 @@ import akka.actor.testkit.typed.scaladsl._
|
|||
import akka.actor.typed.ActorRef
|
||||
import akka.actor.typed.Behavior
|
||||
import akka.persistence.typed.PersistenceId
|
||||
import akka.persistence.typed.RecoveryCompleted
|
||||
import com.typesafe.config.ConfigFactory
|
||||
import org.scalatest.WordSpecLike
|
||||
|
||||
|
|
@ -35,8 +36,9 @@ class NullEmptyStateSpec extends ScalaTestWithActorTestKit(NullEmptyStateSpec.co
|
|||
eventHandler = (state, event) => {
|
||||
probe.tell("eventHandler:" + state + ":" + event)
|
||||
if (state == null) event else state + event
|
||||
}).onRecoveryCompleted { s =>
|
||||
probe.tell("onRecoveryCompleted:" + s)
|
||||
}).receiveSignal {
|
||||
case RecoveryCompleted(s) ⇒
|
||||
probe.tell("onRecoveryCompleted:" + s)
|
||||
}
|
||||
|
||||
"A typed persistent actor with primitive state" must {
|
||||
|
|
|
|||
|
|
@ -7,7 +7,6 @@ package akka.persistence.typed.scaladsl
|
|||
import java.util.UUID
|
||||
|
||||
import scala.concurrent.duration._
|
||||
|
||||
import akka.actor.testkit.typed.TestException
|
||||
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
|
||||
import akka.actor.testkit.typed.scaladsl.TestProbe
|
||||
|
|
@ -15,6 +14,7 @@ import akka.actor.typed.ActorRef
|
|||
import akka.actor.typed.SupervisorStrategy
|
||||
import akka.actor.typed.scaladsl.Behaviors
|
||||
import akka.persistence.typed.PersistenceId
|
||||
import akka.persistence.typed.RecoveryCompleted
|
||||
import akka.persistence.typed.scaladsl.EventSourcedBehavior.CommandHandler
|
||||
import com.typesafe.config.ConfigFactory
|
||||
import org.scalatest.WordSpecLike
|
||||
|
|
@ -74,16 +74,17 @@ object PerformanceSpec {
|
|||
EventSourcedBehavior[Command, String, String](persistenceId = PersistenceId(name),
|
||||
"",
|
||||
commandHandler = CommandHandler.command {
|
||||
case StopMeasure =>
|
||||
case StopMeasure ⇒
|
||||
Effect.none.thenRun(_ => probe.ref ! StopMeasure)
|
||||
case FailAt(sequence) =>
|
||||
case FailAt(sequence) ⇒
|
||||
Effect.none.thenRun(_ => parameters.failAt = sequence)
|
||||
case command => other(command, parameters)
|
||||
case command ⇒ other(command, parameters)
|
||||
},
|
||||
eventHandler = {
|
||||
case (state, _) => state
|
||||
}).onRecoveryCompleted { _ =>
|
||||
if (parameters.every(1000)) print("r")
|
||||
}).receiveSignal {
|
||||
case RecoveryCompleted(_) =>
|
||||
if (parameters.every(1000)) print("r")
|
||||
}
|
||||
})
|
||||
.onFailure(SupervisorStrategy.restart)
|
||||
|
|
|
|||
|
|
@ -10,6 +10,7 @@ import akka.actor.typed.{ ActorRef, Behavior }
|
|||
import akka.actor.typed.scaladsl.Behaviors
|
||||
import akka.actor.typed.scaladsl.TimerScheduler
|
||||
import akka.persistence.typed.PersistenceId
|
||||
import akka.persistence.typed.RecoveryCompleted
|
||||
import akka.persistence.typed.SideEffect
|
||||
|
||||
import scala.concurrent.Future
|
||||
|
|
@ -69,37 +70,41 @@ object PersistentActorCompileOnlyTest {
|
|||
response.map(response => AcknowledgeSideEffect(response.correlationId)).foreach(sender ! _)
|
||||
}
|
||||
|
||||
val behavior: Behavior[Command] = Behaviors.setup(
|
||||
ctx =>
|
||||
EventSourcedBehavior[Command, Event, EventsInFlight](persistenceId = PersistenceId("recovery-complete-id"),
|
||||
emptyState = EventsInFlight(0, Map.empty),
|
||||
commandHandler = (state, cmd) =>
|
||||
cmd match {
|
||||
case DoSideEffect(data) =>
|
||||
Effect
|
||||
.persist(
|
||||
IntentRecorded(state.nextCorrelationId, data))
|
||||
.thenRun { _ =>
|
||||
performSideEffect(ctx.self,
|
||||
state.nextCorrelationId,
|
||||
data)
|
||||
}
|
||||
case AcknowledgeSideEffect(correlationId) =>
|
||||
Effect.persist(SideEffectAcknowledged(correlationId))
|
||||
},
|
||||
eventHandler = (state, evt) =>
|
||||
evt match {
|
||||
case IntentRecorded(correlationId, data) =>
|
||||
EventsInFlight(
|
||||
nextCorrelationId = correlationId + 1,
|
||||
dataByCorrelationId = state.dataByCorrelationId + (correlationId -> data))
|
||||
case SideEffectAcknowledged(correlationId) =>
|
||||
state.copy(
|
||||
dataByCorrelationId = state.dataByCorrelationId - correlationId)
|
||||
}).onRecoveryCompleted(state =>
|
||||
state.dataByCorrelationId.foreach {
|
||||
case (correlationId, data) => performSideEffect(ctx.self, correlationId, data)
|
||||
}))
|
||||
val behavior: Behavior[Command] =
|
||||
Behaviors.setup(
|
||||
ctx =>
|
||||
EventSourcedBehavior[Command, Event, EventsInFlight](persistenceId = PersistenceId("recovery-complete-id"),
|
||||
emptyState = EventsInFlight(0, Map.empty),
|
||||
commandHandler = (state, cmd) =>
|
||||
cmd match {
|
||||
case DoSideEffect(data) =>
|
||||
Effect
|
||||
.persist(
|
||||
IntentRecorded(state.nextCorrelationId, data))
|
||||
.thenRun { _ =>
|
||||
performSideEffect(ctx.self,
|
||||
state.nextCorrelationId,
|
||||
data)
|
||||
}
|
||||
case AcknowledgeSideEffect(correlationId) =>
|
||||
Effect.persist(
|
||||
SideEffectAcknowledged(correlationId))
|
||||
},
|
||||
eventHandler = (state, evt) =>
|
||||
evt match {
|
||||
case IntentRecorded(correlationId, data) =>
|
||||
EventsInFlight(
|
||||
nextCorrelationId = correlationId + 1,
|
||||
dataByCorrelationId = state.dataByCorrelationId + (correlationId → data))
|
||||
case SideEffectAcknowledged(correlationId) =>
|
||||
state.copy(
|
||||
dataByCorrelationId = state.dataByCorrelationId - correlationId)
|
||||
}).receiveSignal {
|
||||
case RecoveryCompleted(state: EventsInFlight) =>
|
||||
state.dataByCorrelationId.foreach {
|
||||
case (correlationId, data) => performSideEffect(ctx.self, correlationId, data)
|
||||
}
|
||||
})
|
||||
|
||||
}
|
||||
|
||||
|
|
@ -285,8 +290,10 @@ object PersistentActorCompileOnlyTest {
|
|||
evt match {
|
||||
case ItemAdded(id) => id +: state
|
||||
case ItemRemoved(id) => state.filter(_ != id)
|
||||
}).onRecoveryCompleted(state =>
|
||||
state.foreach(id => metadataRegistry ! GetMetaData(id, adapt)))
|
||||
}).receiveSignal {
|
||||
case RecoveryCompleted(state: List[Id]) =>
|
||||
state.foreach(id => metadataRegistry ! GetMetaData(id, adapt))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -9,6 +9,7 @@ import akka.actor.testkit.typed.scaladsl._
|
|||
import akka.actor.typed.ActorRef
|
||||
import akka.actor.typed.Behavior
|
||||
import akka.persistence.typed.PersistenceId
|
||||
import akka.persistence.typed.RecoveryCompleted
|
||||
import com.typesafe.config.ConfigFactory
|
||||
import org.scalatest.WordSpecLike
|
||||
|
||||
|
|
@ -35,8 +36,9 @@ class PrimitiveStateSpec extends ScalaTestWithActorTestKit(PrimitiveStateSpec.co
|
|||
eventHandler = (state, event) => {
|
||||
probe.tell("eventHandler:" + state + ":" + event)
|
||||
state + event
|
||||
}).onRecoveryCompleted { n =>
|
||||
probe.tell("onRecoveryCompleted:" + n)
|
||||
}).receiveSignal {
|
||||
case RecoveryCompleted(n) =>
|
||||
probe.tell("onRecoveryCompleted:" + n)
|
||||
}
|
||||
|
||||
"A typed persistent actor with primitive state" must {
|
||||
|
|
|
|||
|
|
@ -10,7 +10,6 @@ import java.util.concurrent.atomic.AtomicInteger
|
|||
import scala.concurrent.Future
|
||||
import scala.util.Failure
|
||||
import scala.util.Success
|
||||
|
||||
import akka.actor.testkit.typed.scaladsl._
|
||||
import akka.actor.typed.ActorRef
|
||||
import akka.actor.typed.Behavior
|
||||
|
|
@ -19,6 +18,8 @@ import akka.persistence.SnapshotMetadata
|
|||
import akka.persistence.SnapshotSelectionCriteria
|
||||
import akka.persistence.snapshot.SnapshotStore
|
||||
import akka.persistence.typed.PersistenceId
|
||||
import akka.persistence.typed.SnapshotCompleted
|
||||
import akka.persistence.typed.SnapshotFailed
|
||||
import com.typesafe.config.Config
|
||||
import com.typesafe.config.ConfigFactory
|
||||
import org.scalatest.WordSpecLike
|
||||
|
|
@ -91,9 +92,11 @@ object SnapshotMutableStateSpec {
|
|||
state.value += 1
|
||||
probe ! s"incremented-${state.value}"
|
||||
state
|
||||
}).onSnapshot {
|
||||
case (meta, Success(_)) => probe ! s"snapshot-success-${meta.sequenceNr}"
|
||||
case (meta, Failure(_)) => probe ! s"snapshot-failure-${meta.sequenceNr}"
|
||||
}).receiveSignal {
|
||||
case SnapshotCompleted(meta) =>
|
||||
probe ! s"snapshot-success-${meta.sequenceNr}"
|
||||
case SnapshotFailed(meta, _) =>
|
||||
probe ! s"snapshot-failure-${meta.sequenceNr}"
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -8,9 +8,10 @@ import akka.actor.typed.ActorRef
|
|||
import akka.actor.typed.{ Behavior, SupervisorStrategy }
|
||||
import akka.actor.typed.scaladsl.Behaviors
|
||||
import akka.persistence.typed.scaladsl.EventSourcedBehavior
|
||||
import scala.concurrent.duration._
|
||||
|
||||
import scala.concurrent.duration._
|
||||
import akka.persistence.typed.PersistenceId
|
||||
import akka.persistence.typed.RecoveryCompleted
|
||||
|
||||
object BasicPersistentBehaviorCompileOnly {
|
||||
|
||||
|
|
@ -84,10 +85,10 @@ object BasicPersistentBehaviorCompileOnly {
|
|||
"TODO: process the command & return an Effect"),
|
||||
eventHandler = (state, evt) =>
|
||||
throw new RuntimeException(
|
||||
"TODO: process the event return the next state"))
|
||||
.onRecoveryCompleted { state =>
|
||||
"TODO: process the event return the next state")).receiveSignal {
|
||||
case RecoveryCompleted(state) ⇒
|
||||
throw new RuntimeException("TODO: add some end-of-recovery side-effect here")
|
||||
}
|
||||
}
|
||||
//#recovery
|
||||
|
||||
//#tagging
|
||||
|
|
@ -112,8 +113,9 @@ object BasicPersistentBehaviorCompileOnly {
|
|||
eventHandler = (state, evt) =>
|
||||
throw new RuntimeException(
|
||||
"TODO: process the event return the next state"))
|
||||
.onRecoveryCompleted { state =>
|
||||
throw new RuntimeException("TODO: add some end-of-recovery side-effect here")
|
||||
.receiveSignal {
|
||||
case RecoveryCompleted(state) ⇒
|
||||
throw new RuntimeException("TODO: add some end-of-recovery side-effect here")
|
||||
}
|
||||
|
||||
val debugAlwaysSnapshot: Behavior[Command] = Behaviors.setup { context =>
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue