diff --git a/akka-docs/src/main/paradox/typed/persistence.md b/akka-docs/src/main/paradox/typed/persistence.md index b8e587c737..2019e569b4 100644 --- a/akka-docs/src/main/paradox/typed/persistence.md +++ b/akka-docs/src/main/paradox/typed/persistence.md @@ -54,20 +54,23 @@ The command handler is a function with @java[2 parameters for]@scala[3 parameter current `State` and `Command`. A command handler returns an `Effect` directive that defines what event or events, if any, to persist. -@java[Effects are created using a factory that is returned via the `Effect()` method] -@scala[Effects are created using the `Effect` factory] -and can be used to create various effects such as: +Effects are created using @java[a factory that is returned via the `Effect()` method] @scala[the `Effect` factory] +and can be one of: * `persist` will persist one single event or several events atomically, i.e. all events are stored or none of them are stored if there is an error * `none` no events are to be persisted, for example a read-only command * `unhandled` the command is unhandled (not supported) in current state +* `stop` stop this actor -External side effects are to be performed after successful persist which is achieved with the `andThen` function e.g @scala[`Effect.persist(..).andThen`]@java[`Effect().persist(..).andThen`]. +In addition to returning the primary `Effect` for the command `PersistentBehavior`s can also +chain side effects (`SideEffect`s) are to be performed after successful persist which is achieved with the `andThen` and `thenRun` +function e.g @scala[`Effect.persist(..).andThen`]@java[`Effect().persist(..).andThen`]. The `thenRun` function +is a convenience around creating a `SideEffect`. In the example below a reply is sent to the `replyTo` ActorRef. Note that the new state after applying -the event is passed as parameter to the `andThen` function. All `andThen*` registered callbacks -are executed after successful execution of the persist statement (or immediately, in case of `none` and `unhandled`). +the event is passed as parameter to the `thenRun` function. All `thenRun` registered callbacks +are executed sequentially after successful execution of the persist statement (or immediately, in case of `none` and `unhandled`). ### Event handler @@ -126,6 +129,7 @@ The `PersistentBehavior` can then be run as with any plain typed actor as descri @java[The `ActorContext` can be obtained with `Behaviors.setup` and be passed as a constructor parameter.] + ## Larger example After processing a message, plain typed actors are able to return the `Behavior` that is used @@ -206,6 +210,34 @@ Scala Java : @@snip [InDepthPersistentBehaviorTest.java]($akka$/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/InDepthPersistentBehaviorTest.java) { #behavior } + +## Effects and Side Effects + +Each command has a single `Effect` which can be: + +* Persist events +* None: Accept the comment but no effects +* Unhandled: Don't handle this message + +Note that there is only one of these. It is not possible to both persist and say none/unhandled. +These are created using @java[a factory that is returned via the `Effect()` method] +@scala[the `Effect` factory] and once created +additional `SideEffects` can be added. + +Most of them time this will be done with the `thenRun` method on the `Effect` above. It is also possible +factor out common `SideEffect`s. For example: + +Scala +: @@snip [BasicPersistentBehaviorsCompileOnly.scala]($akka$/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentActorCompileOnlyTest.scala) { #commonChainedEffects } + +Java +: @@snip [BasicPersistentBehaviorsCompileOnly.scala]($akka$/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorCompileOnlyTest.java) { #commonChainedEffects } + +### Side effects ordering and guarantees + +Any `SideEffect`s are executed on an at-once basis and will not be executed if the persist fails. +The `SideEffect`s are executed sequentially, it is not possible to execute `SideEffect`s in parallel. + ## Serialization The same @ref:[serialization](../serialization.md) mechanism as for untyped diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/SideEffect.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/SideEffect.scala new file mode 100644 index 0000000000..40fecd3243 --- /dev/null +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/SideEffect.scala @@ -0,0 +1,45 @@ +/* + * Copyright (C) 2018 Lightbend Inc. + */ + +package akka.persistence.typed + +import akka.japi.function +import akka.annotation.{ DoNotInherit, InternalApi } + +/** + * A [[SideEffect]] is an side effect that can be chained after a main effect. + * + * Persist, none and unhandled are main effects. Then any number of + * call backs can be added to these effects with `andThen`. + * + * Not for user extension + */ +sealed abstract class SideEffect[State] + +/** INTERNAL API */ +@InternalApi +final private[akka] case class Callback[State](effect: State ⇒ Unit) extends SideEffect[State] + +/** INTERNAL API */ +@InternalApi +private[akka] case object Stop extends SideEffect[Nothing] + +object SideEffect { + /** + * Create a ChainedEffect that can be run after Effects + */ + def apply[State](callback: State ⇒ Unit): SideEffect[State] = + Callback(callback) + + /** + * Java API + * + * Create a ChainedEffect that can be run after Effects + */ + def create[State](callback: function.Procedure[State]): SideEffect[State] = + Callback(s ⇒ callback.apply(s)) + + def stop[State](): SideEffect[State] = Stop.asInstanceOf[SideEffect[State]] +} + diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EffectImpl.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EffectImpl.scala index 67a496a21b..4358db72f3 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EffectImpl.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EffectImpl.scala @@ -4,11 +4,11 @@ package akka.persistence.typed.internal -import akka.persistence.typed.javadsl -import akka.persistence.typed.scaladsl +import akka.persistence.typed.{ SideEffect, javadsl, scaladsl } import scala.collection.{ immutable ⇒ im } -import akka.annotation.{ DoNotInherit, InternalApi } +import akka.annotation.InternalApi +import akka.persistence.typed.scaladsl.Effect /** INTERNAL API */ @InternalApi @@ -16,27 +16,26 @@ private[akka] abstract class EffectImpl[+Event, State] extends javadsl.Effect[Ev /* All events that will be persisted in this effect */ override def events: im.Seq[Event] = Nil - /* All side effects that will be performed in this effect */ - override def sideEffects[E >: Event]: im.Seq[ChainableEffect[E, State]] = Nil + override def andThen(chainedEffect: SideEffect[State]): EffectImpl[Event, State] = + CompositeEffect(this, chainedEffect) + } /** INTERNAL API */ @InternalApi private[akka] object CompositeEffect { - def apply[Event, State](effect: EffectImpl[Event, State], sideEffects: ChainableEffect[Event, State]): EffectImpl[Event, State] = + def apply[Event, State](effect: Effect[Event, State], sideEffects: SideEffect[State]): EffectImpl[Event, State] = CompositeEffect[Event, State](effect, sideEffects :: Nil) } /** INTERNAL API */ @InternalApi private[akka] final case class CompositeEffect[Event, State]( - persistingEffect: EffectImpl[Event, State], - _sideEffects: im.Seq[ChainableEffect[Event, State]]) extends EffectImpl[Event, State] { + persistingEffect: Effect[Event, State], + _sideEffects: im.Seq[SideEffect[State]]) extends EffectImpl[Event, State] { override val events = persistingEffect.events - override def sideEffects[E >: Event]: im.Seq[ChainableEffect[E, State]] = _sideEffects.asInstanceOf[im.Seq[ChainableEffect[E, State]]] - override def toString: String = s"CompositeEffect($persistingEffect, sideEffects: ${_sideEffects.size})" } @@ -55,21 +54,7 @@ private[akka] case class Persist[Event, State](event: Event) extends EffectImpl[ @InternalApi private[akka] case class PersistAll[Event, State](override val events: im.Seq[Event]) extends EffectImpl[Event, State] -/** INTERNAL API */ -@InternalApi -private[akka] case class SideEffect[Event, State](effect: State ⇒ Unit) extends ChainableEffect[Event, State] - -/** INTERNAL API */ -@InternalApi -private[akka] case object Stop extends ChainableEffect[Nothing, Nothing] - /** INTERNAL API */ @InternalApi private[akka] case object Unhandled extends EffectImpl[Nothing, Nothing] -/** - * Not for user extension - */ -@DoNotInherit -abstract class ChainableEffect[Event, State] extends EffectImpl[Event, State] - diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRunning.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRunning.scala index 5dff502a6e..e84a997297 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRunning.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRunning.scala @@ -12,9 +12,10 @@ import akka.annotation.InternalApi import akka.persistence.JournalProtocol._ import akka.persistence._ import akka.persistence.journal.Tagged -import akka.persistence.typed.EventRejectedException +import akka.persistence.typed.{ Callback, EventRejectedException, SideEffect, Stop } import akka.persistence.typed.internal.EventsourcedBehavior.{ InternalProtocol, MDC } import akka.persistence.typed.internal.EventsourcedBehavior.InternalProtocol._ +import akka.persistence.typed.scaladsl.Effect import scala.annotation.tailrec import scala.collection.immutable @@ -84,8 +85,8 @@ private[akka] object EventsourcedRunning { @tailrec def applyEffects( msg: Any, state: EventsourcedState[S], - effect: EffectImpl[E, S], - sideEffects: immutable.Seq[ChainableEffect[_, S]] = Nil + effect: Effect[E, S], + sideEffects: immutable.Seq[SideEffect[S]] = Nil ): Behavior[InternalProtocol] = { if (setup.log.isDebugEnabled) setup.log.debug( @@ -135,15 +136,12 @@ private[akka] object EventsourcedRunning { tryUnstash(applySideEffects(sideEffects, state)) } - case _: PersistNothing.type @unchecked ⇒ + case _: PersistNothing.type ⇒ tryUnstash(applySideEffects(sideEffects, state)) - case _: Unhandled.type @unchecked ⇒ + case _: Unhandled.type ⇒ applySideEffects(sideEffects, state) Behavior.unhandled - - case c: ChainableEffect[_, S] ⇒ - applySideEffect(c, state) } } @@ -172,7 +170,7 @@ private[akka] object EventsourcedRunning { state: EventsourcedState[S], numberOfEvents: Int, shouldSnapshotAfterPersist: Boolean, - sideEffects: immutable.Seq[ChainableEffect[_, S]] + sideEffects: immutable.Seq[SideEffect[S]] ): Behavior[InternalProtocol] = { setup.setMdc(persistingEventsMdc) new PersistingEvents(state, numberOfEvents, shouldSnapshotAfterPersist, sideEffects) @@ -182,7 +180,7 @@ private[akka] object EventsourcedRunning { var state: EventsourcedState[S], numberOfEvents: Int, shouldSnapshotAfterPersist: Boolean, - var sideEffects: immutable.Seq[ChainableEffect[_, S]]) + var sideEffects: immutable.Seq[SideEffect[S]]) extends MutableBehavior[EventsourcedBehavior.InternalProtocol] { private var eventCounter = 0 @@ -277,7 +275,7 @@ private[akka] object EventsourcedRunning { // -------------------------- - def applySideEffects(effects: immutable.Seq[ChainableEffect[_, S]], state: EventsourcedState[S]): Behavior[InternalProtocol] = { + def applySideEffects(effects: immutable.Seq[SideEffect[S]], state: EventsourcedState[S]): Behavior[InternalProtocol] = { var res: Behavior[InternalProtocol] = handlingCommands(state) val it = effects.iterator @@ -292,16 +290,16 @@ private[akka] object EventsourcedRunning { res } - def applySideEffect(effect: ChainableEffect[_, S], state: EventsourcedState[S]): Behavior[InternalProtocol] = effect match { + def applySideEffect(effect: SideEffect[S], state: EventsourcedState[S]): Behavior[InternalProtocol] = effect match { case _: Stop.type @unchecked ⇒ Behaviors.stopped - case SideEffect(sideEffects) ⇒ + case Callback(sideEffects) ⇒ sideEffects(state.state) Behaviors.same case _ ⇒ - throw new IllegalArgumentException(s"Not supported effect detected [${effect.getClass.getName}]!") + throw new IllegalArgumentException(s"Not supported side effect detected [${effect.getClass.getName}]!") } } diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/Effect.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/Effect.scala index 96ca7cb167..751d0087fd 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/Effect.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/Effect.scala @@ -7,6 +7,7 @@ package akka.persistence.typed.javadsl import akka.annotation.DoNotInherit import akka.japi.function import akka.persistence.typed.internal._ +import akka.persistence.typed.{ SideEffect, Stop } import scala.collection.JavaConverters._ @@ -33,7 +34,7 @@ object EffectFactory extends EffectFactories[Nothing, Nothing, Nothing] /** * Stop this persistent actor */ - def stop: Effect[Event, State] = Stop.asInstanceOf[ChainableEffect[Event, State]] + def stop: Effect[Event, State] = none.thenStop() /** * This command is not handled, but it is not an error that it isn't. @@ -54,9 +55,15 @@ object EffectFactory extends EffectFactories[Nothing, Nothing, Nothing] self: EffectImpl[Event, State] ⇒ /** Convenience method to register a side effect with just a callback function */ final def andThen(callback: function.Procedure[State]): Effect[Event, State] = - CompositeEffect(this, SideEffect[Event, State](s ⇒ callback.apply(s))) + CompositeEffect(this, SideEffect[State](s ⇒ callback.apply(s))) /** Convenience method to register a side effect that doesn't need access to state */ final def andThen(callback: function.Effect): Effect[Event, State] = - CompositeEffect(this, SideEffect[Event, State]((_: State) ⇒ callback.apply())) + CompositeEffect(this, SideEffect[State]((_: State) ⇒ callback.apply())) + + def andThen(chainedEffect: SideEffect[State]): Effect[Event, State] + + final def thenStop(): Effect[Event, State] = + CompositeEffect(this, Stop.asInstanceOf[SideEffect[State]]) + } diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/Effect.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/Effect.scala index 049fc380b9..c636f0f346 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/Effect.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/Effect.scala @@ -4,7 +4,9 @@ package akka.persistence.typed.scaladsl +import akka.japi.function import akka.annotation.DoNotInherit +import akka.persistence.typed.{ SideEffect, Stop } import akka.persistence.typed.internal._ import scala.collection.{ immutable ⇒ im } @@ -14,42 +16,48 @@ import scala.collection.{ immutable ⇒ im } */ object Effect { - // TODO docs + /** + * Persist a single event + * + * Side effects can be chained with `andThen` + */ def persist[Event, State](event: Event): Effect[Event, State] = Persist(event) - // TODO docs + /** + * Persist multiple events + * + * Side effects can be chained with `andThen` + */ def persist[Event, A <: Event, B <: Event, State](evt1: A, evt2: B, events: Event*): Effect[Event, State] = persist(evt1 :: evt2 :: events.toList) - // TODO docs - def persist[Event, State](eventOpt: Option[Event]): Effect[Event, State] = - eventOpt match { - case Some(evt) ⇒ persist[Event, State](evt) - case _ ⇒ none[Event, State] - } - - // TODO docs + /** + * Persist multiple events + * + * Side effects can be chained with `andThen` + */ def persist[Event, State](events: im.Seq[Event]): Effect[Event, State] = PersistAll(events) - // TODO docs - def persist[Event, State](events: im.Seq[Event], sideEffects: im.Seq[ChainableEffect[Event, State]]): Effect[Event, State] = - new CompositeEffect[Event, State](PersistAll[Event, State](events), sideEffects) - /** * Do not persist anything + * + * Side effects can be chained with `andThen` */ def none[Event, State]: Effect[Event, State] = PersistNothing.asInstanceOf[Effect[Event, State]] /** * This command is not handled, but it is not an error that it isn't. + * + * Side effects can be chained with `andThen` */ def unhandled[Event, State]: Effect[Event, State] = Unhandled.asInstanceOf[Effect[Event, State]] /** * Stop this persistent actor + * Side effects can be chained with `andThen` */ - def stop[Event, State]: ChainableEffect[Event, State] = Stop.asInstanceOf[ChainableEffect[Event, State]] + def stop[Event, State]: Effect[Event, State] = none.andThenStop() } /** @@ -58,17 +66,30 @@ object Effect { * Not for user extension. */ @DoNotInherit -trait Effect[+Event, State] { self: EffectImpl[Event, State] ⇒ +trait Effect[+Event, State] { /* All events that will be persisted in this effect */ def events: im.Seq[Event] - def sideEffects[E >: Event]: im.Seq[ChainableEffect[E, State]] + /** + * Run the given callback. Callbacks are run sequentially. + */ + final def thenRun(callback: State ⇒ Unit): Effect[Event, State] = + CompositeEffect(this, SideEffect(callback)) - /** Convenience method to register a side effect with just a callback function */ - final def andThen(callback: State ⇒ Unit): Effect[Event, State] = - CompositeEffect(this, SideEffect[Event, State](callback)) + /** + * Run the given callback after the current Effect + */ + def andThen(chainedEffect: SideEffect[State]): Effect[Event, State] + + /** + * Run the given callbacks sequentially after the current Effect + */ + final def andThen(chainedEffects: im.Seq[SideEffect[State]]): Effect[Event, State] = + CompositeEffect(this, chainedEffects) /** The side effect is to stop the actor */ - def andThenStop: Effect[Event, State] = - CompositeEffect(this, Effect.stop[Event, State]) + def andThenStop(): Effect[Event, State] = { + CompositeEffect(this, Stop.asInstanceOf[SideEffect[State]]) + } } + diff --git a/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorCompileOnlyTest.java b/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorCompileOnlyTest.java index c1417ade80..62d5d82f57 100644 --- a/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorCompileOnlyTest.java +++ b/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorCompileOnlyTest.java @@ -9,6 +9,7 @@ import akka.actor.typed.javadsl.ActorContext; import akka.actor.typed.ActorRef; import akka.persistence.typed.EventAdapter; import akka.actor.testkit.typed.javadsl.TestInbox; +import akka.persistence.typed.SideEffect; import akka.util.Timeout; import java.util.*; @@ -149,6 +150,12 @@ public class PersistentActorCompileOnlyTest { private List events = new ArrayList<>(); } + //#commonChainedEffects + // Factored out Chained effect + static final SideEffect commonChainedEffect = SideEffect.create(s -> System.out.println("Command handled!")); + + //#commonChainedEffects + private PersistentBehavior pa = new PersistentBehavior("pa") { @Override public ExampleState emptyState() { @@ -157,10 +164,15 @@ public class PersistentActorCompileOnlyTest { @Override public CommandHandler commandHandler() { - return commandHandlerBuilder(ExampleState.class) - .matchCommand(Cmd.class, (state, cmd) -> Effect().persist(new Evt(cmd.data)) - .andThen(() -> cmd.sender.tell(new Ack()))) - .build(); + + //#commonChainedEffects + return commandHandlerBuilder(ExampleState.class) + .matchCommand(Cmd.class, (state, cmd) -> Effect().persist(new Evt(cmd.data)) + .andThen(() -> cmd.sender.tell(new Ack())) + .andThen(commonChainedEffect) + ) + .build(); + //#commonChainedEffects } @Override diff --git a/akka-persistence-typed/src/test/scala/akka/persistence/typed/ManyRecoveriesSpec.scala b/akka-persistence-typed/src/test/scala/akka/persistence/typed/ManyRecoveriesSpec.scala index 6a8e758e57..e46e76d05f 100644 --- a/akka-persistence-typed/src/test/scala/akka/persistence/typed/ManyRecoveriesSpec.scala +++ b/akka-persistence-typed/src/test/scala/akka/persistence/typed/ManyRecoveriesSpec.scala @@ -31,7 +31,7 @@ object ManyRecoveriesSpec { persistenceId = name, emptyState = "", commandHandler = CommandHandler.command { - case Cmd(s) ⇒ Effect.persist(Evt(s)).andThen(_ ⇒ probe.ref ! s"$name-$s") + case Cmd(s) ⇒ Effect.persist(Evt(s)).thenRun(_ ⇒ probe.ref ! s"$name-$s") }, eventHandler = { case (state, _) ⇒ latch.foreach(Await.ready(_, 10.seconds)); state diff --git a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/OptionalSnapshotStoreSpec.scala b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/OptionalSnapshotStoreSpec.scala index d9795f7dcb..42d07bb6b5 100644 --- a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/OptionalSnapshotStoreSpec.scala +++ b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/OptionalSnapshotStoreSpec.scala @@ -31,7 +31,7 @@ object OptionalSnapshotStoreSpec { persistenceId = name, emptyState = State(), commandHandler = CommandHandler.command { - _ ⇒ Effect.persist(Event()).andThen(probe.ref ! _) + _ ⇒ Effect.persist(Event()).thenRun(probe.ref ! _) }, eventHandler = { case (_, _) ⇒ State() diff --git a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PerformanceSpec.scala b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PerformanceSpec.scala index 92fef2c853..a100fa2465 100644 --- a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PerformanceSpec.scala +++ b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PerformanceSpec.scala @@ -64,8 +64,8 @@ object PerformanceSpec { persistenceId = name, "", commandHandler = CommandHandler.command { - case StopMeasure ⇒ Effect.none.andThen(_ ⇒ probe.ref ! StopMeasure) - case FailAt(sequence) ⇒ Effect.none.andThen(_ ⇒ parameters.failAt = sequence) + case StopMeasure ⇒ Effect.none.thenRun(_ ⇒ probe.ref ! StopMeasure) + case FailAt(sequence) ⇒ Effect.none.thenRun(_ ⇒ parameters.failAt = sequence) case command ⇒ other(command, parameters) }, eventHandler = { @@ -80,7 +80,7 @@ object PerformanceSpec { def eventSourcedTestPersistenceBehavior(name: String, probe: TestProbe[Command]) = behavior(name, probe) { case (CommandWithEvent(evt), parameters) ⇒ - Effect.persist(evt).andThen(_ ⇒ { + Effect.persist(evt).thenRun(_ ⇒ { parameters.persistCalls += 1 if (parameters.every(1000)) print(".") if (parameters.shouldFail) throw TE("boom") diff --git a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentActorCompileOnlyTest.scala b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentActorCompileOnlyTest.scala index 54e2e39d81..6a24a50781 100644 --- a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentActorCompileOnlyTest.scala +++ b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentActorCompileOnlyTest.scala @@ -6,11 +6,11 @@ package akka.persistence.typed.scaladsl import scala.concurrent.ExecutionContext import scala.concurrent.duration._ - import akka.actor.typed.{ ActorRef, Behavior } import akka.actor.typed.scaladsl.Behaviors import akka.actor.typed.scaladsl.ActorContext import akka.actor.typed.scaladsl.TimerScheduler +import akka.persistence.typed.SideEffect object PersistentActorCompileOnlyTest { @@ -72,7 +72,7 @@ object PersistentActorCompileOnlyTest { commandHandler = CommandHandler.command { case Cmd(data, sender) ⇒ Effect.persist(Evt(data)) - .andThen { _ ⇒ + .thenRun { _ ⇒ sender ! Ack } }, @@ -115,7 +115,7 @@ object PersistentActorCompileOnlyTest { commandHandler = (ctx: ActorContext[Command], state, cmd) ⇒ cmd match { case DoSideEffect(data) ⇒ - Effect.persist(IntentRecorded(state.nextCorrelationId, data)).andThen { _ ⇒ + Effect.persist(IntentRecorded(state.nextCorrelationId, data)).thenRun { _ ⇒ performSideEffect(ctx.self, state.nextCorrelationId, data) } case AcknowledgeSideEffect(correlationId) ⇒ @@ -224,7 +224,7 @@ object PersistentActorCompileOnlyTest { commandHandler = (ctx, _, cmd) ⇒ cmd match { case RegisterTask(task) ⇒ Effect.persist(TaskRegistered(task)) - .andThen { _ ⇒ + .thenRun { _ ⇒ val child = ctx.spawn[Nothing](worker(task), task) // This assumes *any* termination of the child may trigger a `TaskDone`: ctx.watchWith(child, TaskDone(task)) @@ -279,7 +279,7 @@ object PersistentActorCompileOnlyTest { def addItem(id: Id, self: ActorRef[Command]) = Effect .persist[Event, List[Id]](ItemAdded(id)) - .andThen(_ ⇒ metadataRegistry ! GetMetaData(id, adapt)) + .thenRun(_ ⇒ metadataRegistry ! GetMetaData(id, adapt)) PersistentBehaviors.receive[Command, Event, List[Id]]( persistenceId = "basket-1", @@ -342,6 +342,14 @@ object PersistentActorCompileOnlyTest { if (currentState == newMood) Effect.none else Effect.persist(MoodChanged(newMood)) + //#commonChainedEffects + // Example factoring out a chained effect rather than using `andThen` + val commonChainedEffects = SideEffect[Mood](_ ⇒ println("Command processed")) + // Then in a command handler: + Effect.persist(Remembered("Yep")) // persist event + .andThen(commonChainedEffects) // add on common chained effect + //#commonChainedEffects + val commandHandler: CommandHandler[Command, Event, Mood] = { (_, state, cmd) ⇒ cmd match { case Greet(whom) ⇒ @@ -349,15 +357,15 @@ object PersistentActorCompileOnlyTest { Effect.none case CheerUp(sender) ⇒ changeMoodIfNeeded(state, Happy) - .andThen { _ ⇒ + .thenRun { _ ⇒ sender ! Ack - } + }.andThen(commonChainedEffects) case Remember(memory) ⇒ // A more elaborate example to show we still have full control over the effects // if needed (e.g. when some logic is factored out but you want to add more effects) val commonEffects: Effect[Event, Mood] = changeMoodIfNeeded(state, Happy) - Effect.persist(commonEffects.events :+ Remembered(memory), commonEffects.sideEffects) - + Effect.persist(commonEffects.events :+ Remembered(memory)) + .andThen(commonChainedEffects) } } @@ -386,7 +394,7 @@ object PersistentActorCompileOnlyTest { private val commandHandler: CommandHandler[Command, Event, State] = CommandHandler.command { case Enough ⇒ Effect.persist(Done) - .andThen((_: State) ⇒ println("yay")) + .thenRun((_: State) ⇒ println("yay")) .andThenStop } @@ -411,7 +419,7 @@ object PersistentActorCompileOnlyTest { emptyState = new First, commandHandler = CommandHandler.command { cmd ⇒ - Effect.persist(cmd).andThen { + Effect.persist(cmd).thenRun { case _: First ⇒ println("first") case _: Second ⇒ println("second") } diff --git a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentBehaviorSpec.scala b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentBehaviorSpec.scala index 8796ebb7c4..a4b5b32404 100644 --- a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentBehaviorSpec.scala +++ b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentBehaviorSpec.scala @@ -126,14 +126,14 @@ object PersistentBehaviorSpec { case IncrementThenLogThenStop ⇒ Effect.persist(Incremented(1)) - .andThen { (_: State) ⇒ + .thenRun { (_: State) ⇒ loggingActor ! firstLogging } .andThenStop case IncrementTwiceThenLogThenStop ⇒ Effect.persist(Incremented(1), Incremented(2)) - .andThen { (_: State) ⇒ + .thenRun { (_: State) ⇒ loggingActor ! firstLogging } .andThenStop @@ -170,30 +170,30 @@ object PersistentBehaviorSpec { case IncrementTwiceAndThenLog ⇒ Effect .persist(Incremented(1), Incremented(1)) - .andThen { (_: State) ⇒ + .thenRun { (_: State) ⇒ loggingActor ! firstLogging } - .andThen { _ ⇒ + .thenRun { _ ⇒ loggingActor ! secondLogging } case EmptyEventsListAndThenLog ⇒ Effect .persist(List.empty) // send empty list of events - .andThen { _ ⇒ + .thenRun { _ ⇒ loggingActor ! firstLogging } case DoNothingAndThenLog ⇒ Effect .none - .andThen { _ ⇒ + .thenRun { _ ⇒ loggingActor ! firstLogging } case LogThenStop ⇒ Effect.none[Event, State] - .andThen { _ ⇒ + .thenRun { _ ⇒ loggingActor ! firstLogging } .andThenStop diff --git a/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/AccountExample1.scala b/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/AccountExample1.scala index 1d9fed3a82..2d464fada7 100644 --- a/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/AccountExample1.scala +++ b/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/AccountExample1.scala @@ -43,7 +43,7 @@ object AccountExample1 { else { Effect .persist(Withdrawn(amount)) - .andThen { + .thenRun { case Some(OpenedAccount(balance)) ⇒ // do some side-effect using balance println(balance) diff --git a/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/AccountExample2.scala b/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/AccountExample2.scala index 91b8a64b3b..7eeec37d95 100644 --- a/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/AccountExample2.scala +++ b/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/AccountExample2.scala @@ -61,7 +61,7 @@ object AccountExample2 { else { Effect .persist(Withdrawn(amount)) - .andThen { + .thenRun { case OpenedAccount(balance) ⇒ // do some side-effect using balance println(balance) diff --git a/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BasicPersistentBehaviorsCompileOnly.scala b/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BasicPersistentBehaviorsCompileOnly.scala index ae5a421c7d..a72b942fdb 100644 --- a/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BasicPersistentBehaviorsCompileOnly.scala +++ b/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BasicPersistentBehaviorsCompileOnly.scala @@ -4,6 +4,7 @@ package docs.akka.persistence.typed +import akka.actor.typed.ActorRef import akka.actor.typed.{ Behavior, SupervisorStrategy } import akka.actor.typed.scaladsl.Behaviors import akka.persistence.typed.scaladsl.PersistentBehaviors @@ -30,6 +31,9 @@ object BasicPersistentBehaviorsCompileOnly { ) //#structure + case class CommandWithSender(reply: ActorRef[String]) extends Command + case class VeryImportantEvent() extends Event + //#recovery val recoveryBehavior: Behavior[Command] = PersistentBehaviors.receive[Command, Event, State]( @@ -58,7 +62,6 @@ object BasicPersistentBehaviorsCompileOnly { (state, evt) ⇒ throw new RuntimeException("TODO: process the event return the next state") ).withTagger(_ ⇒ Set("tag1", "tag2")) - //#tagging //#wrapPersistentBehavior @@ -94,4 +97,5 @@ object BasicPersistentBehaviorsCompileOnly { randomFactor = 0.1 )) //#supervision + } diff --git a/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/InDepthPersistentBehaviorSpec.scala b/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/InDepthPersistentBehaviorSpec.scala index e64f5b1e46..78c0abcfc2 100644 --- a/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/InDepthPersistentBehaviorSpec.scala +++ b/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/InDepthPersistentBehaviorSpec.scala @@ -58,7 +58,7 @@ object InDepthPersistentBehaviorSpec { cmd match { case AddPost(content, replyTo) ⇒ val evt = PostAdded(content.postId, content) - Effect.persist(evt).andThen { state2 ⇒ + Effect.persist(evt).thenRun { state2 ⇒ // After persist is done additional side effects can be performed replyTo ! AddPostDone(content.postId) } @@ -75,11 +75,11 @@ object InDepthPersistentBehaviorSpec { cmd match { case ChangeBody(newBody, replyTo) ⇒ val evt = BodyChanged(state.postId, newBody) - Effect.persist(evt).andThen { _ ⇒ + Effect.persist(evt).thenRun { _ ⇒ replyTo ! Done } case Publish(replyTo) ⇒ - Effect.persist(Published(state.postId)).andThen { _ ⇒ + Effect.persist(Published(state.postId)).thenRun { _ ⇒ println(s"Blog post ${state.postId} was published") replyTo ! Done }