diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EffectImpl.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EffectImpl.scala index bdcfd0ffbf..7fb0ff3d6c 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EffectImpl.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EffectImpl.scala @@ -5,17 +5,19 @@ package akka.persistence.typed.internal import scala.collection.immutable + import akka.annotation.InternalApi import akka.persistence.typed.ExpectingReply import akka.persistence.typed.javadsl import akka.persistence.typed.scaladsl -import akka.persistence.typed.scaladsl.ReplyEffect /** INTERNAL API */ @InternalApi private[akka] abstract class EffectImpl[+Event, State] - extends javadsl.ReplyEffect[Event, State] - with scaladsl.ReplyEffect[Event, State] { + extends javadsl.EffectBuilder[Event, State] + with javadsl.ReplyEffect[Event, State] + with scaladsl.ReplyEffect[Event, State] + with scaladsl.EffectBuilder[Event, State] { /* All events that will be persisted in this effect */ override def events: immutable.Seq[Event] = Nil @@ -23,7 +25,7 @@ private[akka] abstract class EffectImpl[+Event, State] CompositeEffect(this, new Callback[State](chainedEffect)) override def thenReply[ReplyMessage](cmd: ExpectingReply[ReplyMessage])( - replyWithMessage: State => ReplyMessage): ReplyEffect[Event, State] = + replyWithMessage: State => ReplyMessage): EffectImpl[Event, State] = CompositeEffect(this, new ReplyEffectImpl[ReplyMessage, State](cmd.replyTo, replyWithMessage)) override def thenUnstashAll(): EffectImpl[Event, State] = @@ -41,7 +43,7 @@ private[akka] abstract class EffectImpl[+Event, State] @InternalApi private[akka] object CompositeEffect { def apply[Event, State]( - effect: scaladsl.Effect[Event, State], + effect: scaladsl.EffectBuilder[Event, State], sideEffects: SideEffect[State]): CompositeEffect[Event, State] = CompositeEffect[Event, State](effect, sideEffects :: Nil) } @@ -49,7 +51,7 @@ private[akka] object CompositeEffect { /** INTERNAL API */ @InternalApi private[akka] final case class CompositeEffect[Event, State]( - persistingEffect: scaladsl.Effect[Event, State], + persistingEffect: scaladsl.EffectBuilder[Event, State], _sideEffects: immutable.Seq[SideEffect[State]]) extends EffectImpl[Event, State] { diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/Effect.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/Effect.scala index a6d15c4235..b8afded1c9 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/Effect.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/Effect.scala @@ -18,7 +18,7 @@ import akka.persistence.typed.internal._ @InternalApi private[akka] object EffectFactories extends EffectFactories[Nothing, Nothing] /** - * Factory methods for creating [[Effect]] directives - how a persistent actor reacts on a command. + * Factory methods for creating [[Effect]] directives - how an event sourced actor reacts on a command. * Created via [[EventSourcedBehavior.Effect]]. * * Not for user extension @@ -28,29 +28,29 @@ import akka.persistence.typed.internal._ /** * Persist a single event */ - final def persist(event: Event): Effect[Event, State] = Persist(event) + final def persist(event: Event): EffectBuilder[Event, State] = Persist(event) /** * Persist all of a the given events. Each event will be applied through `applyEffect` separately but not until - * all events has been persisted. If an `afterCallBack` is added through [[Effect#andThen]] that will invoked + * all events has been persisted. If `callback` is added through [[Effect#thenRun]] that will invoked * after all the events has been persisted. */ - final def persist(events: java.util.List[Event]): Effect[Event, State] = PersistAll(events.asScala.toVector) + final def persist(events: java.util.List[Event]): EffectBuilder[Event, State] = PersistAll(events.asScala.toVector) /** * Do not persist anything */ - def none(): Effect[Event, State] = PersistNothing.asInstanceOf[Effect[Event, State]] + def none(): EffectBuilder[Event, State] = PersistNothing.asInstanceOf[EffectBuilder[Event, State]] /** * Stop this persistent actor */ - def stop(): Effect[Event, State] = none().thenStop() + def stop(): EffectBuilder[Event, State] = none().thenStop() /** * This command is not handled, but it is not an error that it isn't. */ - def unhandled(): Effect[Event, State] = Unhandled.asInstanceOf[Effect[Event, State]] + def unhandled(): EffectBuilder[Event, State] = Unhandled.asInstanceOf[EffectBuilder[Event, State]] /** * Stash the current command. Can be unstashed later with `Effect.thenUnstashAll` @@ -61,10 +61,10 @@ import akka.persistence.typed.internal._ * thrown from processing a command or side effect after persisting. The stash buffer is preserved for persist * failures if an `onPersistFailure` backoff supervisor strategy is defined. * - * Side effects can be chained with `andThen`. + * Side effects can be chained with `thenRun`. */ def stash(): ReplyEffect[Event, State] = - Stash.asInstanceOf[Effect[Event, State]].thenNoReply() + Stash.asInstanceOf[EffectBuilder[Event, State]].thenNoReply() /** * Unstash the commands that were stashed with `EffectFactories.stash`. @@ -73,9 +73,6 @@ import akka.persistence.typed.internal._ * commands will not be processed by this `unstashAll` effect and have to be unstashed * by another `unstashAll`. * - * Side effects can be chained with `andThen`, but note that the side effect is run immediately and not after - * processing all unstashed commands. - * * @see [[Effect.thenUnstashAll]] */ def unstashAll(): Effect[Event, State] = @@ -112,13 +109,24 @@ import akka.persistence.typed.internal._ /** * A command handler returns an `Effect` directive that defines what event or events to persist. * - * Additional side effects can be performed in the callback `andThen` + * Instances of `Effect` are available through factories [[EventSourcedBehavior.Effect]]. + * + * Not intended for user extension. + */ +@DoNotInherit trait Effect[+Event, State] { + self: EffectImpl[Event, State] => +} + +/** + * A command handler returns an `Effect` directive that defines what event or events to persist. + * + * Additional side effects can be performed in the callback `thenRun` * * Instances of `Effect` are available through factories [[EventSourcedBehavior.Effect]]. * * Not intended for user extension. */ -@DoNotInherit abstract class Effect[+Event, State] { +@DoNotInherit abstract class EffectBuilder[+Event, State] extends Effect[Event, State] { self: EffectImpl[Event, State] => /** @@ -130,17 +138,17 @@ import akka.persistence.typed.internal._ * If the state is not of the expected type an [[java.lang.ClassCastException]] is thrown. * */ - final def thenRun[NewState <: State](callback: function.Procedure[NewState]): Effect[Event, State] = + final def thenRun[NewState <: State](callback: function.Procedure[NewState]): EffectBuilder[Event, State] = CompositeEffect(this, SideEffect[State](s => callback.apply(s.asInstanceOf[NewState]))) /** * Run the given callback. Callbacks are run sequentially. */ - final def thenRun(callback: function.Effect): Effect[Event, State] = + final def thenRun(callback: function.Effect): EffectBuilder[Event, State] = CompositeEffect(this, SideEffect[State]((_: State) => callback.apply())) /** The side effect is to stop the actor */ - def thenStop(): Effect[Event, State] + def thenStop(): EffectBuilder[Event, State] /** * Unstash the commands that were stashed with `EffectFactories.stash`. @@ -182,6 +190,15 @@ import akka.persistence.typed.internal._ * Then there will be compilation errors if the returned effect isn't a [[ReplyEffect]], which can be * created with `Effects().reply`, `Effects().noReply`, [[Effect.thenReply]], or [[Effect.thenNoReply]]. */ -@DoNotInherit abstract class ReplyEffect[+Event, State] extends Effect[Event, State] { +@DoNotInherit trait ReplyEffect[+Event, State] extends Effect[Event, State] { self: EffectImpl[Event, State] => + + /** + * Unstash the commands that were stashed with `EffectFactories.stash`. + * + * It's allowed to stash messages while unstashing. Those newly added + * commands will not be processed by this `unstashAll` effect and have to be unstashed + * by another `unstashAll`. + */ + def thenUnstashAll(): ReplyEffect[Event, State] } diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/Effect.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/Effect.scala index 6036eca239..8b2c201e78 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/Effect.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/Effect.scala @@ -11,52 +11,53 @@ import akka.persistence.typed.internal.SideEffect import akka.persistence.typed.internal._ /** - * Factory methods for creating [[Effect]] directives - how a persistent actor reacts on a command. + * Factory methods for creating [[Effect]] directives - how an event sourced actor reacts on a command. */ object Effect { /** * Persist a single event * - * Side effects can be chained with `andThen` + * Side effects can be chained with `thenRun` */ - def persist[Event, State](event: Event): Effect[Event, State] = Persist(event) + def persist[Event, State](event: Event): EffectBuilder[Event, State] = Persist(event) /** * Persist multiple events * - * Side effects can be chained with `andThen` + * Side effects can be chained with `thenRun` */ - def persist[Event, A <: Event, B <: Event, State](evt1: A, evt2: B, events: Event*): Effect[Event, State] = + def persist[Event, A <: Event, B <: Event, State](evt1: A, evt2: B, events: Event*): EffectBuilder[Event, State] = persist(evt1 :: evt2 :: events.toList) /** * Persist multiple events * - * Side effects can be chained with `andThen` + * Side effects can be chained with `thenRun` */ - def persist[Event, State](events: im.Seq[Event]): Effect[Event, State] = + def persist[Event, State](events: im.Seq[Event]): EffectBuilder[Event, State] = PersistAll(events) /** * Do not persist anything * - * Side effects can be chained with `andThen` + * Side effects can be chained with `thenRun` */ - def none[Event, State]: Effect[Event, State] = PersistNothing.asInstanceOf[Effect[Event, State]] + def none[Event, State]: EffectBuilder[Event, State] = PersistNothing.asInstanceOf[EffectBuilder[Event, State]] /** * This command is not handled, but it is not an error that it isn't. * - * Side effects can be chained with `andThen` + * Side effects can be chained with `thenRun` */ - def unhandled[Event, State]: Effect[Event, State] = Unhandled.asInstanceOf[Effect[Event, State]] + def unhandled[Event, State]: EffectBuilder[Event, State] = Unhandled.asInstanceOf[EffectBuilder[Event, State]] /** * Stop this persistent actor - * Side effects can be chained with `andThen` + * Side effects can be chained with `thenRun` */ - def stop[Event, State](): Effect[Event, State] = none.thenStop() + def stop[Event, State](): EffectBuilder[Event, State] = + none.thenStop() /** * Stash the current command. Can be unstashed later with [[Effect.unstashAll]]. @@ -66,10 +67,10 @@ object Effect { * thrown from processing a command or side effect after persisting. The stash buffer is preserved for persist * failures if a backoff supervisor strategy is defined with [[EventSourcedBehavior.onPersistFailure]]. * - * Side effects can be chained with `andThen` + * Side effects can be chained with `thenRun` */ def stash[Event, State](): ReplyEffect[Event, State] = - Stash.asInstanceOf[Effect[Event, State]].thenNoReply() + Stash.asInstanceOf[EffectBuilder[Event, State]].thenNoReply() /** * Unstash the commands that were stashed with [[Effect.stash]]. @@ -78,13 +79,10 @@ object Effect { * commands will not be processed by this `unstashAll` effect and have to be unstashed * by another `unstashAll`. * - * Side effects can be chained with `andThen`, but note that the side effect is run immediately and not after - * processing all unstashed commands. - * * @see [[Effect.thenUnstashAll]] */ def unstashAll[Event, State](): Effect[Event, State] = - CompositeEffect(none.asInstanceOf[Effect[Event, State]], SideEffect.unstashAll[State]()) + CompositeEffect(none.asInstanceOf[EffectBuilder[Event, State]], SideEffect.unstashAll[State]()) /** * Send a reply message to the command, which implements [[ExpectingReply]]. The type of the @@ -113,22 +111,36 @@ object Effect { } /** + * A command handler returns an `Effect` directive that defines what event or events to persist. + * * Instances are created through the factories in the [[Effect]] companion object. * * Not for user extension. */ @DoNotInherit -trait Effect[+Event, State] { +trait Effect[+Event, State] + +/** + * A command handler returns an `Effect` directive that defines what event or events to persist. + * + * Instances are created through the factories in the [[Effect]] companion object. + * + * Additional side effects can be performed in the callback `thenRun` + * + * Not for user extension. + */ +@DoNotInherit +trait EffectBuilder[+Event, State] extends Effect[Event, State] { /* All events that will be persisted in this effect */ def events: im.Seq[Event] /** * Run the given callback. Callbacks are run sequentially. */ - def thenRun(callback: State => Unit): Effect[Event, State] + def thenRun(callback: State => Unit): EffectBuilder[Event, State] /** The side effect is to stop the actor */ - def thenStop(): Effect[Event, State] + def thenStop(): EffectBuilder[Event, State] /** * Unstash the commands that were stashed with [[Effect.stash]]. @@ -170,4 +182,14 @@ trait Effect[+Event, State] { * * Not intended for user extension. */ -@DoNotInherit trait ReplyEffect[+Event, State] extends Effect[Event, State] +@DoNotInherit trait ReplyEffect[+Event, State] extends Effect[Event, State] { + + /** + * Unstash the commands that were stashed with [[Effect.stash]]. + * + * It's allowed to stash messages while unstashing. Those newly added + * commands will not be processed by this `unstashAll` effect and have to be unstashed + * by another `unstashAll`. + */ + def thenUnstashAll(): ReplyEffect[Event, State] +} diff --git a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorStashSpec.scala b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorStashSpec.scala index 61fe771c5d..a6def1f462 100644 --- a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorStashSpec.scala +++ b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorStashSpec.scala @@ -152,7 +152,7 @@ object EventSourcedBehaviorStashSpec { // already inactive Effect.reply(cmd)(Ack(cmd.id)) case cmd: Activate => - Effect.persist(Activated).thenUnstashAll().thenReply(cmd)(_ => Ack(cmd.id)) + Effect.persist(Activated).thenReply(cmd)((_: State) => Ack(cmd.id)).thenUnstashAll() case _: Unhandled => Effect.unhandled.thenNoReply() case Throw(id, t, replyTo) => @@ -505,40 +505,38 @@ class EventSourcedBehaviorStashSpec "discard when stash has reached limit with default dropped setting" in { val probe = TestProbe[AnyRef]() system.toUntyped.eventStream.subscribe(probe.ref.toUntyped, classOf[Dropped]) - val behavior = EventSourcedBehavior[String, String, Boolean]( - persistenceId = PersistenceId("stash-is-full-drop"), - emptyState = false, - commandHandler = { (state, command) => - state match { - case false => - command match { - case "ping" => - probe.ref ! "pong" - Effect.none - case "start-stashing" => - Effect.persist("start-stashing") - case msg => - probe.ref ! msg - Effect.none - } + val behavior = Behaviors.setup[String] { context => + EventSourcedBehavior[String, String, Boolean]( + persistenceId = PersistenceId("stash-is-full-drop"), + emptyState = false, + commandHandler = { (state, command) => + state match { + case false => + command match { + case "ping" => + probe.ref ! "pong" + Effect.none + case "start-stashing" => + Effect.persist("start-stashing") + case msg => + probe.ref ! msg + Effect.none + } - case true => - command match { - case "unstash" => - Effect - .persist("unstash") - .thenUnstashAll() - // FIXME #26489: this is run before unstash, so not sequentially as the docs say - .thenRun(_ => probe.ref ! "done-unstashing") - case _ => - Effect.stash() - } - } - }, { - case (_, "start-stashing") => true - case (_, "unstash") => false - case (_, _) => throw new IllegalArgumentException() - }) + case true => + command match { + case "unstash" => + Effect.persist("unstash").thenRun((_: Boolean) => context.self ! "done-unstashing").thenUnstashAll() + case _ => + Effect.stash() + } + } + }, { + case (_, "start-stashing") => true + case (_, "unstash") => false + case (_, _) => throw new IllegalArgumentException() + }) + } val c = spawn(behavior) @@ -558,10 +556,10 @@ class EventSourcedBehaviorStashSpec // we can still unstash and continue interacting c ! "unstash" - probe.expectMessage("done-unstashing") // before actually unstashing, see above - (0 to (limit - 1)).foreach { n => + (0 until limit).foreach { n => probe.expectMessage(s"cmd-$n") } + probe.expectMessage("done-unstashing") // before actually unstashing, see above c ! "ping" probe.expectMessage("pong") diff --git a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentActorCompileOnlyTest.scala b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentActorCompileOnlyTest.scala index 8cd3f21f4a..6ee818e221 100644 --- a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentActorCompileOnlyTest.scala +++ b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentActorCompileOnlyTest.scala @@ -310,7 +310,7 @@ object PersistentActorCompileOnlyTest { case class MoodChanged(to: Mood) extends Event case class Remembered(memory: String) extends Event - def changeMoodIfNeeded(currentState: Mood, newMood: Mood): Effect[Event, Mood] = + def changeMoodIfNeeded(currentState: Mood, newMood: Mood): EffectBuilder[Event, Mood] = if (currentState == newMood) Effect.none else Effect.persist(MoodChanged(newMood)) @@ -337,7 +337,7 @@ object PersistentActorCompileOnlyTest { case Remember(memory) => // A more elaborate example to show we still have full control over the effects // if needed (e.g. when some logic is factored out but you want to add more effects) - val commonEffects: Effect[Event, Mood] = changeMoodIfNeeded(state, Happy) + val commonEffects: EffectBuilder[Event, Mood] = changeMoodIfNeeded(state, Happy) Effect.persist(commonEffects.events :+ Remembered(memory)).thenRun(commonChainedEffects) } }