diff --git a/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/ClusterShardingPersistenceSpec.scala b/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/ClusterShardingPersistenceSpec.scala index 33e394764d..9f438c595a 100644 --- a/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/ClusterShardingPersistenceSpec.scala +++ b/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/ClusterShardingPersistenceSpec.scala @@ -8,17 +8,16 @@ import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.CountDownLatch import java.util.concurrent.TimeUnit -import scala.concurrent.duration._ import scala.concurrent.ExecutionContext import scala.concurrent.Future import scala.concurrent.Promise import scala.concurrent.duration._ - import akka.Done import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit import akka.actor.testkit.typed.scaladsl.TestProbe import akka.actor.typed.ActorRef import akka.actor.typed.Behavior +import akka.actor.typed.PostStop import akka.actor.typed.internal.PoisonPill import akka.actor.typed.scaladsl.Behaviors import akka.cluster.sharding.ShardRegion.CurrentShardRegionState @@ -29,6 +28,7 @@ import akka.cluster.sharding.{ ClusterSharding => UntypedClusterSharding } import akka.cluster.typed.Cluster import akka.cluster.typed.Join import akka.persistence.typed.ExpectingReply +import akka.persistence.typed.RecoveryCompleted import akka.persistence.typed.scaladsl.Effect import com.typesafe.config.ConfigFactory import org.scalatest.WordSpecLike @@ -120,25 +120,25 @@ object ClusterShardingPersistenceSpec { stashing = false Effect.unstashAll() - case UnstashAllAndPassivate => + case UnstashAllAndPassivate ⇒ stashing = false shard ! Passivate(ctx.self) Effect.unstashAll() }, - eventHandler = - (state, evt) => if (state.isEmpty) evt else state + "|" + evt) - .onRecoveryCompleted { state => + eventHandler = (state, evt) ⇒ + if (state.isEmpty) evt else state + "|" + evt).receiveSignal { + case RecoveryCompleted(state) ⇒ ctx.log.debug("onRecoveryCompleted: [{}]", state) lifecycleProbes.get(entityId) match { - case null => ctx.log.debug("no lifecycleProbe (onRecoveryCompleted) for [{}]", entityId) - case p => p ! s"recoveryCompleted:$state" + case null ⇒ ctx.log.debug("no lifecycleProbe (onRecoveryCompleted) for [{}]", entityId) + case p ⇒ p ! s"recoveryCompleted:$state" } - } - .onPostStop(() => + case PostStop ⇒ lifecycleProbes.get(entityId) match { - case null => ctx.log.debug("no lifecycleProbe (postStop) for [{}]", entityId) - case p => p ! "stopped" - }) + case null ⇒ ctx.log.debug("no lifecycleProbe (postStop) for [{}]", entityId) + case p ⇒ p ! "stopped" + } + } } } diff --git a/akka-docs/src/main/paradox/typed/persistence.md b/akka-docs/src/main/paradox/typed/persistence.md index 4e44e07219..5dcfdf9f16 100644 --- a/akka-docs/src/main/paradox/typed/persistence.md +++ b/akka-docs/src/main/paradox/typed/persistence.md @@ -336,7 +336,7 @@ Strategies for that can be found in the @ref:[schema evolution](../persistence-s ## Recovery It is strongly discouraged to perform side effects in `applyEvent`, -so side effects should be performed once recovery has completed @scala[in the `onRecoveryCompleted` callback.] @java[by overriding `onRecoveryCompleted`] +so side effects should be performed once recovery has completed as a reaction to the `RecoveryCompleted` signal @scala[`receiveSignal` handler] @java[by overriding `receiveSignal`] Scala : @@snip [BasicPersistentBehaviorCompileOnly.scala](/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BasicPersistentBehaviorCompileOnly.scala) { #recovery } @@ -344,8 +344,7 @@ Scala Java : @@snip [BasicPersistentBehaviorTest.java](/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/BasicPersistentBehaviorTest.java) { #recovery } -The `onRecoveryCompleted` takes @scala[an `ActorContext` and] the current `State`, -and doesn't return anything. +The `RecoveryCompleted` contains the current `State`. @ref[Snapshots)[persistence-snapshot.md] can be used for optimizing recovery times. diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/EventSourcedSignal.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/EventSourcedSignal.scala new file mode 100644 index 0000000000..317910645a --- /dev/null +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/EventSourcedSignal.scala @@ -0,0 +1,95 @@ +/* + * Copyright (C) 2009-2019 Lightbend Inc. + */ + +package akka.persistence.typed + +import akka.actor.typed.Signal +import akka.annotation.DoNotInherit +import akka.persistence.SnapshotMetadata +import akka.persistence.SnapshotSelectionCriteria + +/** + * Supertype for all Akka Persistence Typed specific signals + * + * Not for user extension + */ +@DoNotInherit +sealed trait EventSourcedSignal extends Signal + +final case class RecoveryCompleted[State](state: State) extends EventSourcedSignal { + + /** + * Java API + */ + def getState(): State = state +} +final case class RecoveryFailed(failure: Throwable) extends EventSourcedSignal { + + /** + * Java API + */ + def getFailure(): Throwable = failure +} + +final case class SnapshotCompleted(metadata: SnapshotMetadata) extends EventSourcedSignal { + + /** + * Java API + */ + def getSnapshotMetadata(): SnapshotMetadata = metadata +} +final case class SnapshotFailed(metadata: SnapshotMetadata, failure: Throwable) extends EventSourcedSignal { + + /** + * Java API + */ + def getFailure(): Throwable = failure + + /** + * Java API + */ + def getSnapshotMetadata(): SnapshotMetadata = metadata +} + +final case class DeleteSnapshotCompleted(target: DeletionTarget) extends EventSourcedSignal { + + /** + * Java API + */ + def getTarget(): DeletionTarget = target +} +final case class DeleteSnapshotFailed(target: DeletionTarget, failure: Throwable) extends EventSourcedSignal { + + /** + * Java API + */ + def getFailure(): Throwable = failure + + /** + * Java API + */ + def getTarget(): DeletionTarget = target +} + +/** + * Not for user extension + */ +@DoNotInherit +sealed trait DeletionTarget +object DeletionTarget { + final case class Individual(metadata: SnapshotMetadata) extends DeletionTarget { + + /** + * Java API + */ + def getSnapshotMetadata(): SnapshotMetadata = metadata + } + final case class Criteria(selection: SnapshotSelectionCriteria) extends DeletionTarget { + + /** + * Java API + */ + def getSnapshotSelection(): SnapshotSelectionCriteria = selection + } +} diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/BehaviorSetup.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/BehaviorSetup.scala index 3cc5f56d97..1596d66aec 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/BehaviorSetup.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/BehaviorSetup.scala @@ -5,18 +5,18 @@ package akka.persistence.typed.internal import scala.concurrent.ExecutionContext -import scala.util.Try -import akka.Done import akka.actor.Cancellable import akka.actor.typed.Logger import akka.actor.typed.scaladsl.ActorContext import akka.actor.ActorRef +import akka.actor.typed.Signal import akka.annotation.InternalApi import akka.persistence._ import akka.persistence.typed.EventAdapter import akka.persistence.typed.PersistenceId import akka.persistence.typed.scaladsl.EventSourcedBehavior +import akka.util.ConstantFun import akka.util.OptionVal /** @@ -29,12 +29,10 @@ private[akka] final class BehaviorSetup[C, E, S](val context: ActorContext[Inter val commandHandler: EventSourcedBehavior.CommandHandler[C, E, S], val eventHandler: EventSourcedBehavior.EventHandler[S, E], val writerIdentity: EventSourcedBehaviorImpl.WriterIdentity, - val recoveryCompleted: S => Unit, - val onRecoveryFailure: Throwable => Unit, - val onSnapshot: (SnapshotMetadata, Try[Done]) => Unit, - val tagger: E => Set[String], + private val signalHandler: PartialFunction[Signal, Unit], + val tagger: E ⇒ Set[String], val eventAdapter: EventAdapter[E, _], - val snapshotWhen: (S, E, Long) => Boolean, + val snapshotWhen: (S, E, Long) ⇒ Boolean, val recovery: Recovery, var holdingRecoveryPermit: Boolean, val settings: EventSourcedSettings, @@ -99,6 +97,10 @@ private[akka] final class BehaviorSetup[C, E, S](val context: ActorContext[Inter recoveryTimer = OptionVal.None } + def onSignal(signal: Signal): Unit = { + signalHandler.applyOrElse(signal, ConstantFun.scalaAnyToUnit) + } + } /** diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventSourcedBehaviorImpl.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventSourcedBehaviorImpl.scala index 076d313880..e2616c199f 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventSourcedBehaviorImpl.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventSourcedBehaviorImpl.scala @@ -11,15 +11,12 @@ import scala.util.Failure import scala.util.Success import scala.util.Try import scala.util.control.NonFatal - import akka.Done import akka.actor.typed import akka.actor.typed.BackoffSupervisorStrategy import akka.actor.typed.Behavior import akka.actor.typed.BehaviorInterceptor -import akka.actor.typed.Logger import akka.actor.typed.PostStop -import akka.actor.typed.PreRestart import akka.actor.typed.Signal import akka.actor.typed.SupervisorStrategy import akka.actor.typed.scaladsl.ActorContext @@ -29,21 +26,14 @@ import akka.persistence._ import akka.persistence.typed.EventAdapter import akka.persistence.typed.NoOpEventAdapter import akka.persistence.typed.PersistenceId +import akka.persistence.typed.SnapshotCompleted +import akka.persistence.typed.SnapshotFailed import akka.persistence.typed.scaladsl._ import akka.util.ConstantFun @InternalApi private[akka] object EventSourcedBehaviorImpl { - def defaultOnSnapshot[A](ctx: ActorContext[A], meta: SnapshotMetadata, result: Try[Done]): Unit = { - result match { - case Success(_) => - ctx.log.debug("Save snapshot successful, snapshot metadata: [{}]", meta) - case Failure(t) => - ctx.log.error(t, "Save snapshot failed, snapshot metadata: [{}]", meta) - } - } - object WriterIdentity { // ok to wrap around (2*Int.MaxValue restarts will not happen within a journal roundtrip) @@ -68,16 +58,12 @@ private[akka] final case class EventSourcedBehaviorImpl[Command, Event, State]( loggerClass: Class[_], journalPluginId: Option[String] = None, snapshotPluginId: Option[String] = None, - recoveryCompleted: State => Unit = ConstantFun.scalaAnyToUnit, - postStop: () => Unit = ConstantFun.unitToUnit, - preRestart: () => Unit = ConstantFun.unitToUnit, - tagger: Event => Set[String] = (_: Event) => Set.empty[String], + tagger: Event ⇒ Set[String] = (_: Event) ⇒ Set.empty[String], eventAdapter: EventAdapter[Event, Any] = NoOpEventAdapter.instance[Event], - snapshotWhen: (State, Event, Long) => Boolean = ConstantFun.scalaAnyThreeToFalse, + snapshotWhen: (State, Event, Long) ⇒ Boolean = ConstantFun.scalaAnyThreeToFalse, recovery: Recovery = Recovery(), supervisionStrategy: SupervisorStrategy = SupervisorStrategy.stop, - onSnapshot: (SnapshotMetadata, Try[Done]) => Unit = ConstantFun.scalaAnyTwoToUnit, - onRecoveryFailure: Throwable => Unit = ConstantFun.scalaAnyToUnit) + override val signalHandler: PartialFunction[Signal, Unit] = PartialFunction.empty) extends EventSourcedBehavior[Command, Event, State] { import EventSourcedBehaviorImpl.WriterIdentity @@ -90,25 +76,24 @@ private[akka] final case class EventSourcedBehaviorImpl[Command, Event, State]( // stashState outside supervise because StashState should survive restarts due to persist failures val stashState = new StashState(settings) + val actualSignalHandler: PartialFunction[Signal, Unit] = signalHandler.orElse { + // default signal handler is always the fallback + case SnapshotCompleted(meta: SnapshotMetadata) ⇒ + ctx.log.debug("Save snapshot successful, snapshot metadata: [{}]", meta) + case SnapshotFailed(meta, failure) ⇒ + ctx.log.error(failure, "Save snapshot failed, snapshot metadata: [{}]", meta) + } + Behaviors .supervise { - Behaviors.setup[Command] { _ => - // the default impl needs context which isn't available until here, so we - // use the anyTwoToUnit as a marker to use the default - val actualOnSnapshot: (SnapshotMetadata, Try[Done]) => Unit = - if (onSnapshot == ConstantFun.scalaAnyTwoToUnit) - EventSourcedBehaviorImpl.defaultOnSnapshot[Command](ctx, _, _) - else onSnapshot - - val eventsourcedSetup = new BehaviorSetup(ctx.asInstanceOf[ActorContext[InternalProtocol]], + Behaviors.setup[Command] { _ ⇒ + val eventSourcedSetup = new BehaviorSetup(ctx.asInstanceOf[ActorContext[InternalProtocol]], persistenceId, emptyState, commandHandler, eventHandler, WriterIdentity.newIdentity(), - recoveryCompleted, - onRecoveryFailure, - actualOnSnapshot, + actualSignalHandler, tagger, eventAdapter, snapshotWhen, @@ -132,22 +117,26 @@ private[akka] final case class EventSourcedBehaviorImpl[Command, Event, State]( signal: Signal, target: SignalTarget[Any]): Behavior[Any] = { if (signal == PostStop) { - eventsourcedSetup.cancelRecoveryTimer() + eventSourcedSetup.cancelRecoveryTimer() // clear stash to be GC friendly stashState.clearStashBuffers() - signalPostStop(eventsourcedSetup.log) - } else if (signal == PreRestart) { - signalPreRestart(eventsourcedSetup.log) } - target(ctx, signal) + val nextBehavior = target(ctx, signal) + try { + eventSourcedSetup.onSignal(signal) + } catch { + case NonFatal(ex) ⇒ + ctx.asScala.log.error(ex, s"Error while processing signal [{}]", signal) + } + nextBehavior } } - val widened = RequestingRecoveryPermit(eventsourcedSetup).widen[Any] { - case res: JournalProtocol.Response => InternalProtocol.JournalResponse(res) - case res: SnapshotProtocol.Response => InternalProtocol.SnapshotterResponse(res) - case RecoveryPermitter.RecoveryPermitGranted => InternalProtocol.RecoveryPermitGranted - case internal: InternalProtocol => internal // such as RecoveryTickEvent - case cmd: Command @unchecked => InternalProtocol.IncomingCommand(cmd) + val widened = RequestingRecoveryPermit(eventSourcedSetup).widen[Any] { + case res: JournalProtocol.Response ⇒ InternalProtocol.JournalResponse(res) + case res: SnapshotProtocol.Response ⇒ InternalProtocol.SnapshotterResponse(res) + case RecoveryPermitter.RecoveryPermitGranted ⇒ InternalProtocol.RecoveryPermitGranted + case internal: InternalProtocol ⇒ internal // such as RecoveryTickEvent + case cmd: Command @unchecked ⇒ InternalProtocol.IncomingCommand(cmd) } Behaviors.intercept(onStopInterceptor)(widened).narrow[Command] } @@ -156,30 +145,8 @@ private[akka] final case class EventSourcedBehaviorImpl[Command, Event, State]( .onFailure[JournalFailureException](supervisionStrategy) } - def signalPostStop(log: Logger): Unit = { - try postStop() - catch { - case NonFatal(e) => - log.warning("Exception in postStop: {}", e) - } - } - - def signalPreRestart(log: Logger): Unit = { - try preRestart() - catch { - case NonFatal(e) => - log.warning("Exception in preRestart: {}", e) - } - } - - override def onRecoveryCompleted(callback: State => Unit): EventSourcedBehavior[Command, Event, State] = - copy(recoveryCompleted = callback) - - override def onPostStop(callback: () => Unit): EventSourcedBehavior[Command, Event, State] = - copy(postStop = callback) - - override def onPreRestart(callback: () => Unit): EventSourcedBehavior[Command, Event, State] = - copy(preRestart = callback) + override def receiveSignal(handler: PartialFunction[Signal, Unit]): EventSourcedBehavior[Command, Event, State] = + copy(signalHandler = handler) override def snapshotWhen(predicate: (State, Event, Long) => Boolean): EventSourcedBehavior[Command, Event, State] = copy(snapshotWhen = predicate) @@ -210,16 +177,10 @@ private[akka] final case class EventSourcedBehaviorImpl[Command, Event, State]( override def eventAdapter(adapter: EventAdapter[Event, _]): EventSourcedBehavior[Command, Event, State] = copy(eventAdapter = adapter.asInstanceOf[EventAdapter[Event, Any]]) - override def onSnapshot( - callback: (SnapshotMetadata, Try[Done]) => Unit): EventSourcedBehavior[Command, Event, State] = - copy(onSnapshot = callback) - override def onPersistFailure( backoffStrategy: BackoffSupervisorStrategy): EventSourcedBehavior[Command, Event, State] = copy(supervisionStrategy = backoffStrategy) - override def onRecoveryFailure(callback: Throwable => Unit): EventSourcedBehavior[Command, Event, State] = - copy(onRecoveryFailure = callback) } /** Protocol used internally by the eventsourced behaviors. */ diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingEvents.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingEvents.scala index e2922140a5..c71930a00a 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingEvents.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingEvents.scala @@ -16,6 +16,8 @@ import akka.annotation.InternalApi import akka.event.Logging import akka.persistence.JournalProtocol._ import akka.persistence._ +import akka.persistence.typed.RecoveryFailed +import akka.persistence.typed.RecoveryCompleted import akka.persistence.typed.internal.ReplayingEvents.ReplayingState import akka.persistence.typed.internal.Running.WithSeqNrAccessible @@ -158,9 +160,9 @@ private[akka] final class ReplayingEvents[C, E, S](override val setup: BehaviorS sequenceNr: Long, message: Option[Any]): Behavior[InternalProtocol] = { try { - setup.onRecoveryFailure(cause) + setup.onSignal(RecoveryFailed(cause)) } catch { - case NonFatal(t) => setup.log.error(t, "onRecoveryFailure threw exception") + case NonFatal(t) => setup.log.error(t, "RecoveryFailed signal handler threw exception") } setup.cancelRecoveryTimer() tryReturnRecoveryPermit("on replay failure: " + cause.getMessage) @@ -179,7 +181,7 @@ private[akka] final class ReplayingEvents[C, E, S](override val setup: BehaviorS protected def onRecoveryCompleted(state: ReplayingState[S]): Behavior[InternalProtocol] = try { tryReturnRecoveryPermit("replay completed successfully") - setup.recoveryCompleted(state.state) + setup.onSignal(RecoveryCompleted(state.state)) if (state.receivedPoisonPill && isInternalStashEmpty && !isUnstashAllInProgress) Behaviors.stopped diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/Running.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/Running.scala index e6bc6fcb87..f87f855919 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/Running.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/Running.scala @@ -6,10 +6,6 @@ package akka.persistence.typed.internal import scala.annotation.tailrec import scala.collection.immutable -import scala.util.Failure -import scala.util.Success - -import akka.Done import akka.actor.UnhandledMessage import akka.actor.typed.Behavior import akka.actor.typed.Signal @@ -20,10 +16,14 @@ import akka.annotation.InternalApi import akka.persistence.JournalProtocol._ import akka.persistence._ import akka.persistence.journal.Tagged - import akka.persistence.typed.Callback +import akka.persistence.typed.DeleteSnapshotCompleted +import akka.persistence.typed.DeleteSnapshotFailed +import akka.persistence.typed.DeletionTarget import akka.persistence.typed.EventRejectedException import akka.persistence.typed.SideEffect +import akka.persistence.typed.SnapshotCompleted +import akka.persistence.typed.SnapshotFailed import akka.persistence.typed.Stop import akka.persistence.typed.UnstashAll import akka.persistence.typed.internal.Running.WithSeqNrAccessible @@ -310,21 +310,23 @@ private[akka] object Running { } def onSnapshotterResponse(response: SnapshotProtocol.Response): Unit = { - response match { + val signal = response match { case SaveSnapshotSuccess(meta) => - setup.onSnapshot(meta, Success(Done)) + Some(SnapshotCompleted(meta)) case SaveSnapshotFailure(meta, ex) => - setup.onSnapshot(meta, Failure(ex)) - - // FIXME #24698 not implemented yet - case DeleteSnapshotFailure(_, _) => ??? - case DeleteSnapshotSuccess(_) => ??? - case DeleteSnapshotsFailure(_, _) => ??? - case DeleteSnapshotsSuccess(_) => ??? - - // ignore LoadSnapshot messages - case _ => + Some(SnapshotFailed(meta, ex)) + case DeleteSnapshotSuccess(meta) => + Some(DeleteSnapshotCompleted(DeletionTarget.Individual(meta))) + case DeleteSnapshotFailure(meta, ex) => + Some(DeleteSnapshotFailed(DeletionTarget.Individual(meta), ex)) + case DeleteSnapshotsSuccess(criteria) => + Some(DeleteSnapshotCompleted(DeletionTarget.Criteria(criteria))) + case DeleteSnapshotsFailure(criteria, failure) => + Some(DeleteSnapshotFailed(DeletionTarget.Criteria(criteria), failure)) + case _ => None } + + signal.foreach(setup.onSignal _) } Behaviors diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/EventSourcedBehavior.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/EventSourcedBehavior.scala index 9f730aad22..12855b7e99 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/EventSourcedBehavior.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/EventSourcedBehavior.scala @@ -7,8 +7,6 @@ package akka.persistence.typed.javadsl import java.util.Collections import java.util.Optional -import scala.util.Failure -import scala.util.Success import akka.actor.typed import akka.actor.typed.BackoffSupervisorStrategy import akka.actor.typed.Behavior @@ -16,7 +14,6 @@ import akka.actor.typed.Behavior.DeferredBehavior import akka.actor.typed.javadsl.ActorContext import akka.annotation.ApiMayChange import akka.annotation.InternalApi -import akka.persistence.SnapshotMetadata import akka.persistence.typed.EventAdapter import akka.persistence.typed._ import akka.persistence.typed.internal._ @@ -76,6 +73,19 @@ abstract class EventSourcedBehavior[Command, Event, State >: Null] private[akka] */ protected def eventHandler(): EventHandler[State, Event] + /** + * Override to react on general lifecycle signals and persistence specific signals (subtypes of + * [[akka.persistence.typed.EventSourcedSignal]]). + * + * Use [[EventSourcedBehavior#newSignalHandlerBuilder]] to define the signal handler. + */ + protected def signalHandler(): SignalHandler = SignalHandler.Empty + + /** + * @return A new, mutable signal handler builder + */ + protected final def newSignalHandlerBuilder(): SignalHandlerBuilder = new SignalHandlerBuilder + /** * @return A new, mutable, command handler builder */ @@ -89,35 +99,6 @@ abstract class EventSourcedBehavior[Command, Event, State >: Null] private[akka] protected final def newEventHandlerBuilder(): EventHandlerBuilder[State, Event] = EventHandlerBuilder.builder[State, Event]() - /** - * The callback is invoked to notify the actor that the recovery process - * is finished. - */ - def onRecoveryCompleted(state: State): Unit = () - - /** - * The callback is invoked to notify the actor that the recovery process - * has failed - */ - def onRecoveryFailure(failure: Throwable): Unit = () - - /** - * The callback is invoked to notify that the actor has stopped. - */ - def onPostStop(): Unit = () - - /** - * The callback is invoked to notify that the actor is restarted. - */ - def onPreRestart(): Unit = () - - /** - * Override to get notified when a snapshot is finished. - * - * @param result None if successful otherwise contains the exception thrown when snapshotting - */ - def onSnapshot(meta: SnapshotMetadata, result: Optional[Throwable]): Unit = () - /** * Override and define that snapshot should be saved every N events. * @@ -171,38 +152,23 @@ abstract class EventSourcedBehavior[Command, Event, State >: Null] private[akka] emptyState, (state, cmd) => commandHandler()(state, cmd).asInstanceOf[EffectImpl[Event, State]], eventHandler()(_, _), - getClass) - .onRecoveryCompleted(onRecoveryCompleted) - .onPostStop(() => onPostStop()) - .onPreRestart(() => onPreRestart()) - .snapshotWhen(snapshotWhen) - .withTagger(tagger) - .onSnapshot((meta, result) => { - result match { - case Success(_) => - context.asScala.log.debug("Save snapshot successful, snapshot metadata: [{}]", meta) - case Failure(e) => - context.asScala.log.error(e, "Save snapshot failed, snapshot metadata: [{}]", meta) - } + getClass).snapshotWhen(snapshotWhen).withTagger(tagger).eventAdapter(eventAdapter()) - onSnapshot(meta, result match { - case Success(_) => Optional.empty() - case Failure(t) => Optional.of(t) - }) - }) - .eventAdapter(eventAdapter()) - .onRecoveryFailure(onRecoveryFailure) + val handler = signalHandler() + val behaviorWithSignalHandler = + if (handler.isEmpty) behavior + else behavior.receiveSignal(handler.handler) if (onPersistFailure.isPresent) - behavior.onPersistFailure(onPersistFailure.get) + behaviorWithSignalHandler.onPersistFailure(onPersistFailure.get) else - behavior + behaviorWithSignalHandler } /** * The last sequence number that was persisted, can only be called from inside the handlers of an `EventSourcedBehavior` */ - def lastSequenceNumber(ctx: ActorContext[_]): Long = { + final def lastSequenceNumber(ctx: ActorContext[_]): Long = { scaladsl.EventSourcedBehavior.lastSequenceNumber(ctx.asScala) } diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/SignalHandler.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/SignalHandler.scala new file mode 100644 index 0000000000..ff92c25dee --- /dev/null +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/SignalHandler.scala @@ -0,0 +1,66 @@ +/* + * Copyright (C) 2009-2019 Lightbend Inc. + */ + +package akka.persistence.typed.javadsl + +import akka.actor.typed.Signal +import akka.annotation.InternalApi +import akka.japi.function.Procedure +import akka.japi.function.{ Effect ⇒ JEffect } + +object SignalHandler { + val Empty: SignalHandler = new SignalHandler(PartialFunction.empty) +} + +final class SignalHandler(_handler: PartialFunction[Signal, Unit]) { + + /** + * INTERNAL API + */ + @InternalApi + private[akka] def isEmpty: Boolean = _handler eq PartialFunction.empty + + /** + * INTERNAL API + */ + @InternalApi + private[akka] def handler: PartialFunction[Signal, Unit] = _handler +} + +/** + * Mutable builder for handling signals in [[EventSourcedBehavior]] + * + * Not for user instantiation, use [[EventSourcedBehavior#newSignalHandlerBuilder()]] to get an instance. + */ +final class SignalHandlerBuilder { + + private var handler: PartialFunction[Signal, Unit] = PartialFunction.empty + + /** + * If the behavior recieves a signal of type `T`, `callback` is invoked with the signal instance as input. + */ + def onSignal[T <: Signal](signalType: Class[T], callback: Procedure[T]): SignalHandlerBuilder = { + val newPF: PartialFunction[Signal, Unit] = { + case t if signalType.isInstance(t) ⇒ + callback(t.asInstanceOf[T]) + } + handler = newPF.orElse(handler) + this + } + + /** + * If the behavior receives exactly the signal `signal`, `callback` is invoked. + */ + def onSignal[T <: Signal](signal: T, callback: JEffect): SignalHandlerBuilder = { + val newPF: PartialFunction[Signal, Unit] = { + case `signal` ⇒ + callback() + } + handler = newPF.orElse(handler) + this + } + + def build: SignalHandler = new SignalHandler(handler) + +} diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/EventSourcedBehavior.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/EventSourcedBehavior.scala index 9173d6788a..aabc48964b 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/EventSourcedBehavior.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/EventSourcedBehavior.scala @@ -6,11 +6,11 @@ package akka.persistence.typed.scaladsl import scala.util.Try import scala.annotation.tailrec - import akka.Done import akka.actor.typed.BackoffSupervisorStrategy import akka.actor.typed.Behavior import akka.actor.typed.Behavior.DeferredBehavior +import akka.actor.typed.Signal import akka.actor.typed.internal.LoggerClass import akka.actor.typed.internal.InterceptorImpl import akka.actor.typed.internal.adapter.ActorContextAdapter @@ -127,30 +127,18 @@ object EventSourcedBehavior { def persistenceId: PersistenceId /** - * The `callback` function is called to notify that the recovery process has finished. + * Allows the event sourced behavior to react on signals. + * + * The regular lifecycle signals can be handled as well as + * Akka Persistence specific signals (snapshot and recovery related). Those are all subtypes of + * [[akka.persistence.typed.EventSourcedSignal]] */ - def onRecoveryCompleted(callback: State => Unit): EventSourcedBehavior[Command, Event, State] + def receiveSignal(signalHandler: PartialFunction[Signal, Unit]): EventSourcedBehavior[Command, Event, State] /** - * The `callback` function is called to notify that recovery has failed. For setting a supervision - * strategy `onPersistFailure` + * @return The currently defined signal handler or an empty handler if no custom handler previously defined */ - def onRecoveryFailure(callback: Throwable => Unit): EventSourcedBehavior[Command, Event, State] - - /** - * The `callback` function is called to notify that the actor has stopped. - */ - def onPostStop(callback: () => Unit): EventSourcedBehavior[Command, Event, State] - - /** - * The `callback` function is called to notify that the actor is restarted. - */ - def onPreRestart(callback: () => Unit): EventSourcedBehavior[Command, Event, State] - - /** - * The `callback` function is called to notify when a snapshot is complete. - */ - def onSnapshot(callback: (SnapshotMetadata, Try[Done]) => Unit): EventSourcedBehavior[Command, Event, State] + def signalHandler: PartialFunction[Signal, Unit] /** * Initiates a snapshot if the given function returns true. diff --git a/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/EventSourcedActorFailureTest.java b/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/EventSourcedActorFailureTest.java index 63c4d5fa39..ec50543547 100644 --- a/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/EventSourcedActorFailureTest.java +++ b/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/EventSourcedActorFailureTest.java @@ -9,8 +9,12 @@ import akka.actor.testkit.typed.javadsl.TestKitJunitResource; import akka.actor.testkit.typed.javadsl.TestProbe; import akka.actor.typed.ActorRef; import akka.actor.typed.Behavior; +import akka.actor.typed.Signal; import akka.actor.typed.SupervisorStrategy; +import akka.japi.function.Effect; import akka.persistence.typed.PersistenceId; +import akka.persistence.typed.RecoveryCompleted; +import akka.persistence.typed.RecoveryFailed; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; import org.junit.ClassRule; @@ -39,13 +43,19 @@ class FailingEventSourcedActor extends EventSourcedBehavior { + probe.tell("starting"); + }) + .onSignal( + RecoveryFailed.class, + (signal) -> { + recoveryFailureProbe.tell(signal.getFailure()); + }) + .build(); } @Override diff --git a/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/NullEmptyStateTest.java b/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/NullEmptyStateTest.java index 7afc6c6734..f215a284f1 100644 --- a/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/NullEmptyStateTest.java +++ b/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/NullEmptyStateTest.java @@ -10,6 +10,7 @@ import akka.actor.typed.ActorRef; import akka.actor.typed.Behavior; import akka.actor.typed.javadsl.Behaviors; import akka.persistence.typed.PersistenceId; +import akka.persistence.typed.RecoveryCompleted; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; import org.junit.ClassRule; @@ -39,8 +40,14 @@ public class NullEmptyStateTest extends JUnitSuite { } @Override - public void onRecoveryCompleted(String s) { - probe.tell("onRecoveryCompleted:" + s); + public SignalHandler signalHandler() { + return newSignalHandlerBuilder() + .onSignal( + RecoveryCompleted.class, + (completed) -> { + probe.tell("onRecoveryCompleted:" + completed.getState()); + }) + .build(); } @Override diff --git a/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorJavaDslTest.java b/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorJavaDslTest.java index b56d68d64b..5e89bf088c 100644 --- a/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorJavaDslTest.java +++ b/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorJavaDslTest.java @@ -17,9 +17,7 @@ import akka.persistence.query.NoOffset; import akka.persistence.query.PersistenceQuery; import akka.persistence.query.Sequence; import akka.persistence.query.journal.leveldb.javadsl.LeveldbReadJournal; -import akka.persistence.typed.EventAdapter; -import akka.persistence.typed.ExpectingReply; -import akka.persistence.typed.PersistenceId; +import akka.persistence.typed.*; import akka.stream.ActorMaterializer; import akka.stream.javadsl.Sink; import akka.actor.testkit.typed.javadsl.TestKitJunitResource; @@ -416,8 +414,19 @@ public class PersistentActorJavaDslTest extends JUnitSuite { } @Override - public void onSnapshot(SnapshotMetadata meta, Optional result) { - snapshotProbe.ref().tell(result); + public SignalHandler signalHandler() { + return newSignalHandlerBuilder() + .onSignal( + SnapshotCompleted.class, + (completed) -> { + snapshotProbe.ref().tell(Optional.empty()); + }) + .onSignal( + SnapshotFailed.class, + (signal) -> { + snapshotProbe.ref().tell(Optional.of(signal.getFailure())); + }) + .build(); } }); ActorRef c = testKit.spawn(snapshoter); @@ -464,9 +473,16 @@ public class PersistentActorJavaDslTest extends JUnitSuite { Behaviors.setup( ctx -> new CounterBehavior(new PersistenceId("c5"), ctx) { + @Override - public void onPostStop() { - probe.ref().tell("stopped"); + public SignalHandler signalHandler() { + return newSignalHandlerBuilder() + .onSignal( + PostStop.instance(), + () -> { + probe.ref().tell("stopped"); + }) + .build(); } }); ActorRef c = testKit.spawn(counter); @@ -620,8 +636,14 @@ public class PersistentActorJavaDslTest extends JUnitSuite { } @Override - public void onRecoveryCompleted(Object o) { - startedProbe.tell("started!"); + public SignalHandler signalHandler() { + return newSignalHandlerBuilder() + .onSignal( + RecoveryCompleted.class, + (completed) -> { + startedProbe.tell("started!"); + }) + .build(); } @Override @@ -695,8 +717,14 @@ public class PersistentActorJavaDslTest extends JUnitSuite { } @Override - public void onRecoveryCompleted(String s) { - probe.tell(lastSequenceNumber(context) + " onRecoveryCompleted"); + public SignalHandler signalHandler() { + return newSignalHandlerBuilder() + .onSignal( + RecoveryCompleted.class, + (completed) -> { + probe.tell(lastSequenceNumber(context) + " onRecoveryCompleted"); + }) + .build(); } } diff --git a/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PrimitiveStateTest.java b/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PrimitiveStateTest.java index 545f14a7ee..322db5625d 100644 --- a/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PrimitiveStateTest.java +++ b/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PrimitiveStateTest.java @@ -10,6 +10,7 @@ import akka.actor.typed.ActorRef; import akka.actor.typed.Behavior; import akka.actor.typed.javadsl.Behaviors; import akka.persistence.typed.PersistenceId; +import akka.persistence.typed.RecoveryCompleted; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; import org.junit.ClassRule; @@ -39,8 +40,14 @@ public class PrimitiveStateTest extends JUnitSuite { } @Override - public void onRecoveryCompleted(Integer n) { - probe.tell("onRecoveryCompleted:" + n); + public SignalHandler signalHandler() { + return newSignalHandlerBuilder() + .onSignal( + RecoveryCompleted.class, + (completed) -> { + probe.tell("onRecoveryCompleted:" + completed.getState()); + }) + .build(); } @Override diff --git a/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/BasicPersistentBehaviorTest.java b/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/BasicPersistentBehaviorTest.java index 3be760d6cf..fbb354faca 100644 --- a/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/BasicPersistentBehaviorTest.java +++ b/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/BasicPersistentBehaviorTest.java @@ -9,9 +9,12 @@ import akka.actor.typed.SupervisorStrategy; import akka.actor.typed.javadsl.ActorContext; import akka.actor.typed.javadsl.Behaviors; import akka.persistence.typed.PersistenceId; +import akka.persistence.typed.RecoveryCompleted; import akka.persistence.typed.javadsl.CommandHandler; import akka.persistence.typed.javadsl.EventHandler; import akka.persistence.typed.javadsl.EventSourcedBehavior; +import akka.persistence.typed.javadsl.SignalHandler; + import java.time.Duration; import java.util.ArrayList; @@ -194,9 +197,16 @@ public class BasicPersistentBehaviorTest { } // #recovery + @Override - public void onRecoveryCompleted(State state) { - throw new RuntimeException("TODO: add some end-of-recovery side-effect here"); + public SignalHandler signalHandler() { + return newSignalHandlerBuilder() + .onSignal( + RecoveryCompleted.class, + (completed) -> { + throw new RuntimeException("TODO: add some end-of-recovery side-effect here"); + }) + .build(); } // #recovery diff --git a/akka-persistence-typed/src/test/scala/akka/persistence/typed/internal/RecoveryPermitterSpec.scala b/akka-persistence-typed/src/test/scala/akka/persistence/typed/internal/RecoveryPermitterSpec.scala index 2197fa7579..0e2ef0179f 100644 --- a/akka-persistence-typed/src/test/scala/akka/persistence/typed/internal/RecoveryPermitterSpec.scala +++ b/akka-persistence-typed/src/test/scala/akka/persistence/typed/internal/RecoveryPermitterSpec.scala @@ -14,11 +14,12 @@ import akka.persistence.RecoveryPermitter.{ RecoveryPermitGranted, RequestRecove import akka.persistence.typed.scaladsl.EventSourcedBehavior.CommandHandler import akka.persistence.typed.scaladsl.{ Effect, EventSourcedBehavior } import akka.testkit.EventFilter + import scala.concurrent.duration._ import scala.util.control.NoStackTrace - import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit import akka.persistence.typed.PersistenceId +import akka.persistence.typed.RecoveryCompleted import org.scalatest.WordSpecLike object RecoveryPermitterSpec { @@ -51,9 +52,10 @@ object RecoveryPermitterSpec { }, eventHandler = { (state, event) => eventProbe.ref ! event; state - }).onRecoveryCompleted { _ => - eventProbe.ref ! Recovered - if (throwOnRecovery) throw new TE + }).receiveSignal { + case RecoveryCompleted(state) => + eventProbe.ref ! Recovered + if (throwOnRecovery) throw new TE } def forwardingBehavior(target: TestProbe[Any]): Behavior[Any] = diff --git a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorFailureSpec.scala b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorFailureSpec.scala index fd00006004..b9fb323f03 100644 --- a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorFailureSpec.scala +++ b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorFailureSpec.scala @@ -8,17 +8,21 @@ import scala.collection.immutable import scala.concurrent.Future import scala.concurrent.duration._ import scala.util.Try - import akka.actor.testkit.typed.TestException import akka.actor.testkit.typed.TestKitSettings import akka.actor.testkit.typed.scaladsl._ import akka.actor.typed.ActorRef +import akka.actor.typed.PostStop +import akka.actor.typed.PreRestart +import akka.actor.typed.Signal import akka.actor.typed.SupervisorStrategy import akka.actor.typed.scaladsl.Behaviors import akka.persistence.AtomicWrite import akka.persistence.journal.inmem.InmemJournal import akka.persistence.typed.EventRejectedException import akka.persistence.typed.PersistenceId +import akka.persistence.typed.RecoveryCompleted +import akka.persistence.typed.RecoveryFailed import com.typesafe.config.Config import com.typesafe.config.ConfigFactory import org.scalatest.WordSpecLike @@ -77,24 +81,29 @@ class EventSourcedBehaviorFailureSpec implicit val testSettings: TestKitSettings = TestKitSettings(system) def failingPersistentActor(pid: PersistenceId, - probe: ActorRef[String]): EventSourcedBehavior[String, String, String] = + probe: ActorRef[String], + additionalSignalHandler: PartialFunction[Signal, Unit] = PartialFunction.empty) + : EventSourcedBehavior[String, String, String] = EventSourcedBehavior[String, String, String](pid, "", - (_, cmd) => { + (_, cmd) ⇒ { if (cmd == "wrong") throw new TestException("wrong command") probe.tell("persisting") Effect.persist(cmd) }, - (state, event) => { + (state, event) ⇒ { probe.tell(event) state + event }) - .onRecoveryCompleted { _ => - probe.tell("starting") - } - .onPostStop(() => probe.tell("stopped")) - .onPreRestart(() => probe.tell("restarting")) + .receiveSignal(additionalSignalHandler.orElse { + case RecoveryCompleted(_) ⇒ + probe.tell("starting") + case PostStop ⇒ + probe.tell("stopped") + case PreRestart ⇒ + probe.tell("restarting") + }) .onPersistFailure( SupervisorStrategy.restartWithBackoff(1.milli, 5.millis, 0.1).withLoggingEnabled(enabled = false)) @@ -103,7 +112,11 @@ class EventSourcedBehaviorFailureSpec "call onRecoveryFailure when replay fails" in { val probe = TestProbe[String]() val excProbe = TestProbe[Throwable]() - spawn(failingPersistentActor(PersistenceId("fail-recovery"), probe.ref).onRecoveryFailure(t => excProbe.ref ! t)) + spawn(failingPersistentActor(PersistenceId("fail-recovery"), probe.ref, { + case RecoveryFailed(t) ⇒ + println("signal recovery failed") + excProbe.ref ! t + })) excProbe.expectMessageType[TestException].message shouldEqual "Nope" probe.expectMessage("restarting") @@ -111,8 +124,10 @@ class EventSourcedBehaviorFailureSpec "handle exceptions in onRecoveryFailure" in { val probe = TestProbe[String]() - val pa = spawn(failingPersistentActor(PersistenceId("fail-recovery-twice"), probe.ref).onRecoveryFailure(_ => - throw TestException("recovery call back failure"))) + val pa = spawn(failingPersistentActor(PersistenceId("fail-recovery-twice"), probe.ref, { + case RecoveryFailed(t) ⇒ + throw TestException("recovery call back failure") + })) pa ! "one" probe.expectMessage("starting") probe.expectMessage("persisting") diff --git a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorSpec.scala b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorSpec.scala index 057315fb06..0c7fc3665a 100644 --- a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorSpec.scala +++ b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorSpec.scala @@ -35,12 +35,17 @@ import akka.persistence.snapshot.SnapshotStore import akka.persistence.typed.EventAdapter import akka.persistence.typed.ExpectingReply import akka.persistence.typed.PersistenceId +import akka.persistence.typed.RecoveryCompleted +import akka.persistence.typed.SnapshotCompleted +import akka.persistence.typed.SnapshotFailed import akka.stream.ActorMaterializer import akka.stream.scaladsl.Sink import com.typesafe.config.Config import com.typesafe.config.ConfigFactory import org.scalatest.WordSpecLike +import scala.util.Failure + object EventSourcedBehaviorSpec { //#event-wrapper @@ -248,14 +253,17 @@ object EventSourcedBehaviorSpec { Effect.none.thenStop() }, - eventHandler = (state, evt) => + eventHandler = (state, evt) ⇒ evt match { - case Incremented(delta) => + case Incremented(delta) ⇒ probe ! ((state, evt)) State(state.value + delta, state.history :+ state.value) - }).onRecoveryCompleted(_ => ()).onSnapshot { - case (_, result) => - snapshotProbe ! result + }).receiveSignal { + case RecoveryCompleted(_) ⇒ () + case SnapshotCompleted(_) ⇒ + snapshotProbe ! Success(Done) + case SnapshotFailed(_, failure) ⇒ + snapshotProbe ! Failure(failure) } } diff --git a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedSequenceNumberSpec.scala b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedSequenceNumberSpec.scala index 07056a7eb8..1451ca16a4 100644 --- a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedSequenceNumberSpec.scala +++ b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedSequenceNumberSpec.scala @@ -8,6 +8,7 @@ import akka.actor.testkit.typed.scaladsl.{ ScalaTestWithActorTestKit, TestProbe import akka.actor.typed.{ ActorRef, Behavior } import akka.actor.typed.scaladsl.Behaviors import akka.persistence.typed.PersistenceId +import akka.persistence.typed.RecoveryCompleted import com.typesafe.config.ConfigFactory import org.scalatest.WordSpecLike @@ -24,14 +25,17 @@ class EventSourcedSequenceNumberSpec with WordSpecLike { private def behavior(pid: PersistenceId, probe: ActorRef[String]): Behavior[String] = - Behaviors.setup(ctx => + Behaviors.setup(ctx ⇒ EventSourcedBehavior[String, String, String](pid, "", { (_, command) => probe ! (EventSourcedBehavior.lastSequenceNumber(ctx) + " onCommand") - Effect.persist(command).thenRun(_ => probe ! (EventSourcedBehavior.lastSequenceNumber(ctx) + " thenRun")) - }, { (state, evt) => + Effect.persist(command).thenRun(_ ⇒ probe ! (EventSourcedBehavior.lastSequenceNumber(ctx) + " thenRun")) + }, { (state, evt) ⇒ probe ! (EventSourcedBehavior.lastSequenceNumber(ctx) + " eventHandler") state + evt - }).onRecoveryCompleted(_ => probe ! (EventSourcedBehavior.lastSequenceNumber(ctx) + " onRecoveryComplete"))) + }).receiveSignal { + case RecoveryCompleted(_) ⇒ + probe ! (EventSourcedBehavior.lastSequenceNumber(ctx) + " onRecoveryComplete") + }) "The sequence number" must { diff --git a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/NullEmptyStateSpec.scala b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/NullEmptyStateSpec.scala index 2114c22ce8..4b948f4299 100644 --- a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/NullEmptyStateSpec.scala +++ b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/NullEmptyStateSpec.scala @@ -9,6 +9,7 @@ import akka.actor.testkit.typed.scaladsl._ import akka.actor.typed.ActorRef import akka.actor.typed.Behavior import akka.persistence.typed.PersistenceId +import akka.persistence.typed.RecoveryCompleted import com.typesafe.config.ConfigFactory import org.scalatest.WordSpecLike @@ -35,8 +36,9 @@ class NullEmptyStateSpec extends ScalaTestWithActorTestKit(NullEmptyStateSpec.co eventHandler = (state, event) => { probe.tell("eventHandler:" + state + ":" + event) if (state == null) event else state + event - }).onRecoveryCompleted { s => - probe.tell("onRecoveryCompleted:" + s) + }).receiveSignal { + case RecoveryCompleted(s) ⇒ + probe.tell("onRecoveryCompleted:" + s) } "A typed persistent actor with primitive state" must { diff --git a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PerformanceSpec.scala b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PerformanceSpec.scala index 63bf098a58..f5d5e0b06c 100644 --- a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PerformanceSpec.scala +++ b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PerformanceSpec.scala @@ -7,7 +7,6 @@ package akka.persistence.typed.scaladsl import java.util.UUID import scala.concurrent.duration._ - import akka.actor.testkit.typed.TestException import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit import akka.actor.testkit.typed.scaladsl.TestProbe @@ -15,6 +14,7 @@ import akka.actor.typed.ActorRef import akka.actor.typed.SupervisorStrategy import akka.actor.typed.scaladsl.Behaviors import akka.persistence.typed.PersistenceId +import akka.persistence.typed.RecoveryCompleted import akka.persistence.typed.scaladsl.EventSourcedBehavior.CommandHandler import com.typesafe.config.ConfigFactory import org.scalatest.WordSpecLike @@ -74,16 +74,17 @@ object PerformanceSpec { EventSourcedBehavior[Command, String, String](persistenceId = PersistenceId(name), "", commandHandler = CommandHandler.command { - case StopMeasure => + case StopMeasure ⇒ Effect.none.thenRun(_ => probe.ref ! StopMeasure) - case FailAt(sequence) => + case FailAt(sequence) ⇒ Effect.none.thenRun(_ => parameters.failAt = sequence) - case command => other(command, parameters) + case command ⇒ other(command, parameters) }, eventHandler = { case (state, _) => state - }).onRecoveryCompleted { _ => - if (parameters.every(1000)) print("r") + }).receiveSignal { + case RecoveryCompleted(_) => + if (parameters.every(1000)) print("r") } }) .onFailure(SupervisorStrategy.restart) diff --git a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentActorCompileOnlyTest.scala b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentActorCompileOnlyTest.scala index 7d04002dd3..0cdc7373da 100644 --- a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentActorCompileOnlyTest.scala +++ b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentActorCompileOnlyTest.scala @@ -10,6 +10,7 @@ import akka.actor.typed.{ ActorRef, Behavior } import akka.actor.typed.scaladsl.Behaviors import akka.actor.typed.scaladsl.TimerScheduler import akka.persistence.typed.PersistenceId +import akka.persistence.typed.RecoveryCompleted import akka.persistence.typed.SideEffect import scala.concurrent.Future @@ -69,37 +70,41 @@ object PersistentActorCompileOnlyTest { response.map(response => AcknowledgeSideEffect(response.correlationId)).foreach(sender ! _) } - val behavior: Behavior[Command] = Behaviors.setup( - ctx => - EventSourcedBehavior[Command, Event, EventsInFlight](persistenceId = PersistenceId("recovery-complete-id"), - emptyState = EventsInFlight(0, Map.empty), - commandHandler = (state, cmd) => - cmd match { - case DoSideEffect(data) => - Effect - .persist( - IntentRecorded(state.nextCorrelationId, data)) - .thenRun { _ => - performSideEffect(ctx.self, - state.nextCorrelationId, - data) - } - case AcknowledgeSideEffect(correlationId) => - Effect.persist(SideEffectAcknowledged(correlationId)) - }, - eventHandler = (state, evt) => - evt match { - case IntentRecorded(correlationId, data) => - EventsInFlight( - nextCorrelationId = correlationId + 1, - dataByCorrelationId = state.dataByCorrelationId + (correlationId -> data)) - case SideEffectAcknowledged(correlationId) => - state.copy( - dataByCorrelationId = state.dataByCorrelationId - correlationId) - }).onRecoveryCompleted(state => - state.dataByCorrelationId.foreach { - case (correlationId, data) => performSideEffect(ctx.self, correlationId, data) - })) + val behavior: Behavior[Command] = + Behaviors.setup( + ctx => + EventSourcedBehavior[Command, Event, EventsInFlight](persistenceId = PersistenceId("recovery-complete-id"), + emptyState = EventsInFlight(0, Map.empty), + commandHandler = (state, cmd) => + cmd match { + case DoSideEffect(data) => + Effect + .persist( + IntentRecorded(state.nextCorrelationId, data)) + .thenRun { _ => + performSideEffect(ctx.self, + state.nextCorrelationId, + data) + } + case AcknowledgeSideEffect(correlationId) => + Effect.persist( + SideEffectAcknowledged(correlationId)) + }, + eventHandler = (state, evt) => + evt match { + case IntentRecorded(correlationId, data) => + EventsInFlight( + nextCorrelationId = correlationId + 1, + dataByCorrelationId = state.dataByCorrelationId + (correlationId → data)) + case SideEffectAcknowledged(correlationId) => + state.copy( + dataByCorrelationId = state.dataByCorrelationId - correlationId) + }).receiveSignal { + case RecoveryCompleted(state: EventsInFlight) => + state.dataByCorrelationId.foreach { + case (correlationId, data) => performSideEffect(ctx.self, correlationId, data) + } + }) } @@ -285,8 +290,10 @@ object PersistentActorCompileOnlyTest { evt match { case ItemAdded(id) => id +: state case ItemRemoved(id) => state.filter(_ != id) - }).onRecoveryCompleted(state => - state.foreach(id => metadataRegistry ! GetMetaData(id, adapt))) + }).receiveSignal { + case RecoveryCompleted(state: List[Id]) => + state.foreach(id => metadataRegistry ! GetMetaData(id, adapt)) + } } } diff --git a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PrimitiveStateSpec.scala b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PrimitiveStateSpec.scala index 63f97b2595..7ac59fd3b0 100644 --- a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PrimitiveStateSpec.scala +++ b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PrimitiveStateSpec.scala @@ -9,6 +9,7 @@ import akka.actor.testkit.typed.scaladsl._ import akka.actor.typed.ActorRef import akka.actor.typed.Behavior import akka.persistence.typed.PersistenceId +import akka.persistence.typed.RecoveryCompleted import com.typesafe.config.ConfigFactory import org.scalatest.WordSpecLike @@ -35,8 +36,9 @@ class PrimitiveStateSpec extends ScalaTestWithActorTestKit(PrimitiveStateSpec.co eventHandler = (state, event) => { probe.tell("eventHandler:" + state + ":" + event) state + event - }).onRecoveryCompleted { n => - probe.tell("onRecoveryCompleted:" + n) + }).receiveSignal { + case RecoveryCompleted(n) => + probe.tell("onRecoveryCompleted:" + n) } "A typed persistent actor with primitive state" must { diff --git a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/SnapshotMutableStateSpec.scala b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/SnapshotMutableStateSpec.scala index 40d88d9040..9ba38a8f5f 100644 --- a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/SnapshotMutableStateSpec.scala +++ b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/SnapshotMutableStateSpec.scala @@ -10,7 +10,6 @@ import java.util.concurrent.atomic.AtomicInteger import scala.concurrent.Future import scala.util.Failure import scala.util.Success - import akka.actor.testkit.typed.scaladsl._ import akka.actor.typed.ActorRef import akka.actor.typed.Behavior @@ -19,6 +18,8 @@ import akka.persistence.SnapshotMetadata import akka.persistence.SnapshotSelectionCriteria import akka.persistence.snapshot.SnapshotStore import akka.persistence.typed.PersistenceId +import akka.persistence.typed.SnapshotCompleted +import akka.persistence.typed.SnapshotFailed import com.typesafe.config.Config import com.typesafe.config.ConfigFactory import org.scalatest.WordSpecLike @@ -91,9 +92,11 @@ object SnapshotMutableStateSpec { state.value += 1 probe ! s"incremented-${state.value}" state - }).onSnapshot { - case (meta, Success(_)) => probe ! s"snapshot-success-${meta.sequenceNr}" - case (meta, Failure(_)) => probe ! s"snapshot-failure-${meta.sequenceNr}" + }).receiveSignal { + case SnapshotCompleted(meta) => + probe ! s"snapshot-success-${meta.sequenceNr}" + case SnapshotFailed(meta, _) => + probe ! s"snapshot-failure-${meta.sequenceNr}" } } diff --git a/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BasicPersistentBehaviorCompileOnly.scala b/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BasicPersistentBehaviorCompileOnly.scala index 70b5f2274e..e1e7d8d666 100644 --- a/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BasicPersistentBehaviorCompileOnly.scala +++ b/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BasicPersistentBehaviorCompileOnly.scala @@ -8,9 +8,10 @@ import akka.actor.typed.ActorRef import akka.actor.typed.{ Behavior, SupervisorStrategy } import akka.actor.typed.scaladsl.Behaviors import akka.persistence.typed.scaladsl.EventSourcedBehavior -import scala.concurrent.duration._ +import scala.concurrent.duration._ import akka.persistence.typed.PersistenceId +import akka.persistence.typed.RecoveryCompleted object BasicPersistentBehaviorCompileOnly { @@ -84,10 +85,10 @@ object BasicPersistentBehaviorCompileOnly { "TODO: process the command & return an Effect"), eventHandler = (state, evt) => throw new RuntimeException( - "TODO: process the event return the next state")) - .onRecoveryCompleted { state => + "TODO: process the event return the next state")).receiveSignal { + case RecoveryCompleted(state) ⇒ throw new RuntimeException("TODO: add some end-of-recovery side-effect here") - } + } //#recovery //#tagging @@ -112,8 +113,9 @@ object BasicPersistentBehaviorCompileOnly { eventHandler = (state, evt) => throw new RuntimeException( "TODO: process the event return the next state")) - .onRecoveryCompleted { state => - throw new RuntimeException("TODO: add some end-of-recovery side-effect here") + .receiveSignal { + case RecoveryCompleted(state) ⇒ + throw new RuntimeException("TODO: add some end-of-recovery side-effect here") } val debugAlwaysSnapshot: Behavior[Command] = Behaviors.setup { context =>