unstashAll as terminal Effect, #26489

* Effect.persist(event).thenUnstashAll().thenRun(..) can be misinterpreted
  as the callback of thenRun is invoked when all unstashing has been completed,
  while it is actually running the callback first and the the unstashing process
  follows.
* Unstashing is a process where stashed commands are processed one-by-one
  and waiting for persist effects to complete before processing next.
* Even if we would come up with a way to keep pending callbacks around during
  the unstashing it would probably be complicated for user to reason about it.
  Suddenly a callback is executed from an old command although several other
  commands (that were stashed/unstashed) have been processed inbetween.
* This change makes the unstashAll Effect terminal, meaning that additional
  effects like thenRun can't be added after unstashAll.
* ReplyEffect is also terminal, which makes sense since it's supposed to be
  returned effect. It must still be possible to combine with thenUnstashAll,
  thenReply.thenUnstashAll.
This commit is contained in:
Patrik Nordwall 2019-03-25 20:15:46 +01:00
parent 08abca7956
commit 11a18370c4
5 changed files with 124 additions and 85 deletions

View file

@ -5,17 +5,19 @@
package akka.persistence.typed.internal package akka.persistence.typed.internal
import scala.collection.immutable import scala.collection.immutable
import akka.annotation.InternalApi import akka.annotation.InternalApi
import akka.persistence.typed.ExpectingReply import akka.persistence.typed.ExpectingReply
import akka.persistence.typed.javadsl import akka.persistence.typed.javadsl
import akka.persistence.typed.scaladsl import akka.persistence.typed.scaladsl
import akka.persistence.typed.scaladsl.ReplyEffect
/** INTERNAL API */ /** INTERNAL API */
@InternalApi @InternalApi
private[akka] abstract class EffectImpl[+Event, State] private[akka] abstract class EffectImpl[+Event, State]
extends javadsl.ReplyEffect[Event, State] extends javadsl.EffectBuilder[Event, State]
with scaladsl.ReplyEffect[Event, State] { with javadsl.ReplyEffect[Event, State]
with scaladsl.ReplyEffect[Event, State]
with scaladsl.EffectBuilder[Event, State] {
/* All events that will be persisted in this effect */ /* All events that will be persisted in this effect */
override def events: immutable.Seq[Event] = Nil override def events: immutable.Seq[Event] = Nil
@ -23,7 +25,7 @@ private[akka] abstract class EffectImpl[+Event, State]
CompositeEffect(this, new Callback[State](chainedEffect)) CompositeEffect(this, new Callback[State](chainedEffect))
override def thenReply[ReplyMessage](cmd: ExpectingReply[ReplyMessage])( override def thenReply[ReplyMessage](cmd: ExpectingReply[ReplyMessage])(
replyWithMessage: State => ReplyMessage): ReplyEffect[Event, State] = replyWithMessage: State => ReplyMessage): EffectImpl[Event, State] =
CompositeEffect(this, new ReplyEffectImpl[ReplyMessage, State](cmd.replyTo, replyWithMessage)) CompositeEffect(this, new ReplyEffectImpl[ReplyMessage, State](cmd.replyTo, replyWithMessage))
override def thenUnstashAll(): EffectImpl[Event, State] = override def thenUnstashAll(): EffectImpl[Event, State] =
@ -41,7 +43,7 @@ private[akka] abstract class EffectImpl[+Event, State]
@InternalApi @InternalApi
private[akka] object CompositeEffect { private[akka] object CompositeEffect {
def apply[Event, State]( def apply[Event, State](
effect: scaladsl.Effect[Event, State], effect: scaladsl.EffectBuilder[Event, State],
sideEffects: SideEffect[State]): CompositeEffect[Event, State] = sideEffects: SideEffect[State]): CompositeEffect[Event, State] =
CompositeEffect[Event, State](effect, sideEffects :: Nil) CompositeEffect[Event, State](effect, sideEffects :: Nil)
} }
@ -49,7 +51,7 @@ private[akka] object CompositeEffect {
/** INTERNAL API */ /** INTERNAL API */
@InternalApi @InternalApi
private[akka] final case class CompositeEffect[Event, State]( private[akka] final case class CompositeEffect[Event, State](
persistingEffect: scaladsl.Effect[Event, State], persistingEffect: scaladsl.EffectBuilder[Event, State],
_sideEffects: immutable.Seq[SideEffect[State]]) _sideEffects: immutable.Seq[SideEffect[State]])
extends EffectImpl[Event, State] { extends EffectImpl[Event, State] {

View file

@ -18,7 +18,7 @@ import akka.persistence.typed.internal._
@InternalApi private[akka] object EffectFactories extends EffectFactories[Nothing, Nothing] @InternalApi private[akka] object EffectFactories extends EffectFactories[Nothing, Nothing]
/** /**
* Factory methods for creating [[Effect]] directives - how a persistent actor reacts on a command. * Factory methods for creating [[Effect]] directives - how an event sourced actor reacts on a command.
* Created via [[EventSourcedBehavior.Effect]]. * Created via [[EventSourcedBehavior.Effect]].
* *
* Not for user extension * Not for user extension
@ -28,29 +28,29 @@ import akka.persistence.typed.internal._
/** /**
* Persist a single event * Persist a single event
*/ */
final def persist(event: Event): Effect[Event, State] = Persist(event) final def persist(event: Event): EffectBuilder[Event, State] = Persist(event)
/** /**
* Persist all of a the given events. Each event will be applied through `applyEffect` separately but not until * Persist all of a the given events. Each event will be applied through `applyEffect` separately but not until
* all events has been persisted. If an `afterCallBack` is added through [[Effect#andThen]] that will invoked * all events has been persisted. If `callback` is added through [[Effect#thenRun]] that will invoked
* after all the events has been persisted. * after all the events has been persisted.
*/ */
final def persist(events: java.util.List[Event]): Effect[Event, State] = PersistAll(events.asScala.toVector) final def persist(events: java.util.List[Event]): EffectBuilder[Event, State] = PersistAll(events.asScala.toVector)
/** /**
* Do not persist anything * Do not persist anything
*/ */
def none(): Effect[Event, State] = PersistNothing.asInstanceOf[Effect[Event, State]] def none(): EffectBuilder[Event, State] = PersistNothing.asInstanceOf[EffectBuilder[Event, State]]
/** /**
* Stop this persistent actor * Stop this persistent actor
*/ */
def stop(): Effect[Event, State] = none().thenStop() def stop(): EffectBuilder[Event, State] = none().thenStop()
/** /**
* This command is not handled, but it is not an error that it isn't. * This command is not handled, but it is not an error that it isn't.
*/ */
def unhandled(): Effect[Event, State] = Unhandled.asInstanceOf[Effect[Event, State]] def unhandled(): EffectBuilder[Event, State] = Unhandled.asInstanceOf[EffectBuilder[Event, State]]
/** /**
* Stash the current command. Can be unstashed later with `Effect.thenUnstashAll` * Stash the current command. Can be unstashed later with `Effect.thenUnstashAll`
@ -61,10 +61,10 @@ import akka.persistence.typed.internal._
* thrown from processing a command or side effect after persisting. The stash buffer is preserved for persist * thrown from processing a command or side effect after persisting. The stash buffer is preserved for persist
* failures if an `onPersistFailure` backoff supervisor strategy is defined. * failures if an `onPersistFailure` backoff supervisor strategy is defined.
* *
* Side effects can be chained with `andThen`. * Side effects can be chained with `thenRun`.
*/ */
def stash(): ReplyEffect[Event, State] = def stash(): ReplyEffect[Event, State] =
Stash.asInstanceOf[Effect[Event, State]].thenNoReply() Stash.asInstanceOf[EffectBuilder[Event, State]].thenNoReply()
/** /**
* Unstash the commands that were stashed with `EffectFactories.stash`. * Unstash the commands that were stashed with `EffectFactories.stash`.
@ -73,9 +73,6 @@ import akka.persistence.typed.internal._
* commands will not be processed by this `unstashAll` effect and have to be unstashed * commands will not be processed by this `unstashAll` effect and have to be unstashed
* by another `unstashAll`. * by another `unstashAll`.
* *
* Side effects can be chained with `andThen`, but note that the side effect is run immediately and not after
* processing all unstashed commands.
*
* @see [[Effect.thenUnstashAll]] * @see [[Effect.thenUnstashAll]]
*/ */
def unstashAll(): Effect[Event, State] = def unstashAll(): Effect[Event, State] =
@ -112,13 +109,24 @@ import akka.persistence.typed.internal._
/** /**
* A command handler returns an `Effect` directive that defines what event or events to persist. * A command handler returns an `Effect` directive that defines what event or events to persist.
* *
* Additional side effects can be performed in the callback `andThen` * Instances of `Effect` are available through factories [[EventSourcedBehavior.Effect]].
*
* Not intended for user extension.
*/
@DoNotInherit trait Effect[+Event, State] {
self: EffectImpl[Event, State] =>
}
/**
* A command handler returns an `Effect` directive that defines what event or events to persist.
*
* Additional side effects can be performed in the callback `thenRun`
* *
* Instances of `Effect` are available through factories [[EventSourcedBehavior.Effect]]. * Instances of `Effect` are available through factories [[EventSourcedBehavior.Effect]].
* *
* Not intended for user extension. * Not intended for user extension.
*/ */
@DoNotInherit abstract class Effect[+Event, State] { @DoNotInherit abstract class EffectBuilder[+Event, State] extends Effect[Event, State] {
self: EffectImpl[Event, State] => self: EffectImpl[Event, State] =>
/** /**
@ -130,17 +138,17 @@ import akka.persistence.typed.internal._
* If the state is not of the expected type an [[java.lang.ClassCastException]] is thrown. * If the state is not of the expected type an [[java.lang.ClassCastException]] is thrown.
* *
*/ */
final def thenRun[NewState <: State](callback: function.Procedure[NewState]): Effect[Event, State] = final def thenRun[NewState <: State](callback: function.Procedure[NewState]): EffectBuilder[Event, State] =
CompositeEffect(this, SideEffect[State](s => callback.apply(s.asInstanceOf[NewState]))) CompositeEffect(this, SideEffect[State](s => callback.apply(s.asInstanceOf[NewState])))
/** /**
* Run the given callback. Callbacks are run sequentially. * Run the given callback. Callbacks are run sequentially.
*/ */
final def thenRun(callback: function.Effect): Effect[Event, State] = final def thenRun(callback: function.Effect): EffectBuilder[Event, State] =
CompositeEffect(this, SideEffect[State]((_: State) => callback.apply())) CompositeEffect(this, SideEffect[State]((_: State) => callback.apply()))
/** The side effect is to stop the actor */ /** The side effect is to stop the actor */
def thenStop(): Effect[Event, State] def thenStop(): EffectBuilder[Event, State]
/** /**
* Unstash the commands that were stashed with `EffectFactories.stash`. * Unstash the commands that were stashed with `EffectFactories.stash`.
@ -182,6 +190,15 @@ import akka.persistence.typed.internal._
* Then there will be compilation errors if the returned effect isn't a [[ReplyEffect]], which can be * Then there will be compilation errors if the returned effect isn't a [[ReplyEffect]], which can be
* created with `Effects().reply`, `Effects().noReply`, [[Effect.thenReply]], or [[Effect.thenNoReply]]. * created with `Effects().reply`, `Effects().noReply`, [[Effect.thenReply]], or [[Effect.thenNoReply]].
*/ */
@DoNotInherit abstract class ReplyEffect[+Event, State] extends Effect[Event, State] { @DoNotInherit trait ReplyEffect[+Event, State] extends Effect[Event, State] {
self: EffectImpl[Event, State] => self: EffectImpl[Event, State] =>
/**
* Unstash the commands that were stashed with `EffectFactories.stash`.
*
* It's allowed to stash messages while unstashing. Those newly added
* commands will not be processed by this `unstashAll` effect and have to be unstashed
* by another `unstashAll`.
*/
def thenUnstashAll(): ReplyEffect[Event, State]
} }

View file

@ -11,52 +11,53 @@ import akka.persistence.typed.internal.SideEffect
import akka.persistence.typed.internal._ import akka.persistence.typed.internal._
/** /**
* Factory methods for creating [[Effect]] directives - how a persistent actor reacts on a command. * Factory methods for creating [[Effect]] directives - how an event sourced actor reacts on a command.
*/ */
object Effect { object Effect {
/** /**
* Persist a single event * Persist a single event
* *
* Side effects can be chained with `andThen` * Side effects can be chained with `thenRun`
*/ */
def persist[Event, State](event: Event): Effect[Event, State] = Persist(event) def persist[Event, State](event: Event): EffectBuilder[Event, State] = Persist(event)
/** /**
* Persist multiple events * Persist multiple events
* *
* Side effects can be chained with `andThen` * Side effects can be chained with `thenRun`
*/ */
def persist[Event, A <: Event, B <: Event, State](evt1: A, evt2: B, events: Event*): Effect[Event, State] = def persist[Event, A <: Event, B <: Event, State](evt1: A, evt2: B, events: Event*): EffectBuilder[Event, State] =
persist(evt1 :: evt2 :: events.toList) persist(evt1 :: evt2 :: events.toList)
/** /**
* Persist multiple events * Persist multiple events
* *
* Side effects can be chained with `andThen` * Side effects can be chained with `thenRun`
*/ */
def persist[Event, State](events: im.Seq[Event]): Effect[Event, State] = def persist[Event, State](events: im.Seq[Event]): EffectBuilder[Event, State] =
PersistAll(events) PersistAll(events)
/** /**
* Do not persist anything * Do not persist anything
* *
* Side effects can be chained with `andThen` * Side effects can be chained with `thenRun`
*/ */
def none[Event, State]: Effect[Event, State] = PersistNothing.asInstanceOf[Effect[Event, State]] def none[Event, State]: EffectBuilder[Event, State] = PersistNothing.asInstanceOf[EffectBuilder[Event, State]]
/** /**
* This command is not handled, but it is not an error that it isn't. * This command is not handled, but it is not an error that it isn't.
* *
* Side effects can be chained with `andThen` * Side effects can be chained with `thenRun`
*/ */
def unhandled[Event, State]: Effect[Event, State] = Unhandled.asInstanceOf[Effect[Event, State]] def unhandled[Event, State]: EffectBuilder[Event, State] = Unhandled.asInstanceOf[EffectBuilder[Event, State]]
/** /**
* Stop this persistent actor * Stop this persistent actor
* Side effects can be chained with `andThen` * Side effects can be chained with `thenRun`
*/ */
def stop[Event, State](): Effect[Event, State] = none.thenStop() def stop[Event, State](): EffectBuilder[Event, State] =
none.thenStop()
/** /**
* Stash the current command. Can be unstashed later with [[Effect.unstashAll]]. * Stash the current command. Can be unstashed later with [[Effect.unstashAll]].
@ -66,10 +67,10 @@ object Effect {
* thrown from processing a command or side effect after persisting. The stash buffer is preserved for persist * thrown from processing a command or side effect after persisting. The stash buffer is preserved for persist
* failures if a backoff supervisor strategy is defined with [[EventSourcedBehavior.onPersistFailure]]. * failures if a backoff supervisor strategy is defined with [[EventSourcedBehavior.onPersistFailure]].
* *
* Side effects can be chained with `andThen` * Side effects can be chained with `thenRun`
*/ */
def stash[Event, State](): ReplyEffect[Event, State] = def stash[Event, State](): ReplyEffect[Event, State] =
Stash.asInstanceOf[Effect[Event, State]].thenNoReply() Stash.asInstanceOf[EffectBuilder[Event, State]].thenNoReply()
/** /**
* Unstash the commands that were stashed with [[Effect.stash]]. * Unstash the commands that were stashed with [[Effect.stash]].
@ -78,13 +79,10 @@ object Effect {
* commands will not be processed by this `unstashAll` effect and have to be unstashed * commands will not be processed by this `unstashAll` effect and have to be unstashed
* by another `unstashAll`. * by another `unstashAll`.
* *
* Side effects can be chained with `andThen`, but note that the side effect is run immediately and not after
* processing all unstashed commands.
*
* @see [[Effect.thenUnstashAll]] * @see [[Effect.thenUnstashAll]]
*/ */
def unstashAll[Event, State](): Effect[Event, State] = def unstashAll[Event, State](): Effect[Event, State] =
CompositeEffect(none.asInstanceOf[Effect[Event, State]], SideEffect.unstashAll[State]()) CompositeEffect(none.asInstanceOf[EffectBuilder[Event, State]], SideEffect.unstashAll[State]())
/** /**
* Send a reply message to the command, which implements [[ExpectingReply]]. The type of the * Send a reply message to the command, which implements [[ExpectingReply]]. The type of the
@ -113,22 +111,36 @@ object Effect {
} }
/** /**
* A command handler returns an `Effect` directive that defines what event or events to persist.
*
* Instances are created through the factories in the [[Effect]] companion object. * Instances are created through the factories in the [[Effect]] companion object.
* *
* Not for user extension. * Not for user extension.
*/ */
@DoNotInherit @DoNotInherit
trait Effect[+Event, State] { trait Effect[+Event, State]
/**
* A command handler returns an `Effect` directive that defines what event or events to persist.
*
* Instances are created through the factories in the [[Effect]] companion object.
*
* Additional side effects can be performed in the callback `thenRun`
*
* Not for user extension.
*/
@DoNotInherit
trait EffectBuilder[+Event, State] extends Effect[Event, State] {
/* All events that will be persisted in this effect */ /* All events that will be persisted in this effect */
def events: im.Seq[Event] def events: im.Seq[Event]
/** /**
* Run the given callback. Callbacks are run sequentially. * Run the given callback. Callbacks are run sequentially.
*/ */
def thenRun(callback: State => Unit): Effect[Event, State] def thenRun(callback: State => Unit): EffectBuilder[Event, State]
/** The side effect is to stop the actor */ /** The side effect is to stop the actor */
def thenStop(): Effect[Event, State] def thenStop(): EffectBuilder[Event, State]
/** /**
* Unstash the commands that were stashed with [[Effect.stash]]. * Unstash the commands that were stashed with [[Effect.stash]].
@ -170,4 +182,14 @@ trait Effect[+Event, State] {
* *
* Not intended for user extension. * Not intended for user extension.
*/ */
@DoNotInherit trait ReplyEffect[+Event, State] extends Effect[Event, State] @DoNotInherit trait ReplyEffect[+Event, State] extends Effect[Event, State] {
/**
* Unstash the commands that were stashed with [[Effect.stash]].
*
* It's allowed to stash messages while unstashing. Those newly added
* commands will not be processed by this `unstashAll` effect and have to be unstashed
* by another `unstashAll`.
*/
def thenUnstashAll(): ReplyEffect[Event, State]
}

View file

@ -152,7 +152,7 @@ object EventSourcedBehaviorStashSpec {
// already inactive // already inactive
Effect.reply(cmd)(Ack(cmd.id)) Effect.reply(cmd)(Ack(cmd.id))
case cmd: Activate => case cmd: Activate =>
Effect.persist(Activated).thenUnstashAll().thenReply(cmd)(_ => Ack(cmd.id)) Effect.persist(Activated).thenReply(cmd)((_: State) => Ack(cmd.id)).thenUnstashAll()
case _: Unhandled => case _: Unhandled =>
Effect.unhandled.thenNoReply() Effect.unhandled.thenNoReply()
case Throw(id, t, replyTo) => case Throw(id, t, replyTo) =>
@ -505,40 +505,38 @@ class EventSourcedBehaviorStashSpec
"discard when stash has reached limit with default dropped setting" in { "discard when stash has reached limit with default dropped setting" in {
val probe = TestProbe[AnyRef]() val probe = TestProbe[AnyRef]()
system.toUntyped.eventStream.subscribe(probe.ref.toUntyped, classOf[Dropped]) system.toUntyped.eventStream.subscribe(probe.ref.toUntyped, classOf[Dropped])
val behavior = EventSourcedBehavior[String, String, Boolean]( val behavior = Behaviors.setup[String] { context =>
persistenceId = PersistenceId("stash-is-full-drop"), EventSourcedBehavior[String, String, Boolean](
emptyState = false, persistenceId = PersistenceId("stash-is-full-drop"),
commandHandler = { (state, command) => emptyState = false,
state match { commandHandler = { (state, command) =>
case false => state match {
command match { case false =>
case "ping" => command match {
probe.ref ! "pong" case "ping" =>
Effect.none probe.ref ! "pong"
case "start-stashing" => Effect.none
Effect.persist("start-stashing") case "start-stashing" =>
case msg => Effect.persist("start-stashing")
probe.ref ! msg case msg =>
Effect.none probe.ref ! msg
} Effect.none
}
case true => case true =>
command match { command match {
case "unstash" => case "unstash" =>
Effect Effect.persist("unstash").thenRun((_: Boolean) => context.self ! "done-unstashing").thenUnstashAll()
.persist("unstash") case _ =>
.thenUnstashAll() Effect.stash()
// FIXME #26489: this is run before unstash, so not sequentially as the docs say }
.thenRun(_ => probe.ref ! "done-unstashing") }
case _ => }, {
Effect.stash() case (_, "start-stashing") => true
} case (_, "unstash") => false
} case (_, _) => throw new IllegalArgumentException()
}, { })
case (_, "start-stashing") => true }
case (_, "unstash") => false
case (_, _) => throw new IllegalArgumentException()
})
val c = spawn(behavior) val c = spawn(behavior)
@ -558,10 +556,10 @@ class EventSourcedBehaviorStashSpec
// we can still unstash and continue interacting // we can still unstash and continue interacting
c ! "unstash" c ! "unstash"
probe.expectMessage("done-unstashing") // before actually unstashing, see above (0 until limit).foreach { n =>
(0 to (limit - 1)).foreach { n =>
probe.expectMessage(s"cmd-$n") probe.expectMessage(s"cmd-$n")
} }
probe.expectMessage("done-unstashing") // before actually unstashing, see above
c ! "ping" c ! "ping"
probe.expectMessage("pong") probe.expectMessage("pong")

View file

@ -310,7 +310,7 @@ object PersistentActorCompileOnlyTest {
case class MoodChanged(to: Mood) extends Event case class MoodChanged(to: Mood) extends Event
case class Remembered(memory: String) extends Event case class Remembered(memory: String) extends Event
def changeMoodIfNeeded(currentState: Mood, newMood: Mood): Effect[Event, Mood] = def changeMoodIfNeeded(currentState: Mood, newMood: Mood): EffectBuilder[Event, Mood] =
if (currentState == newMood) Effect.none if (currentState == newMood) Effect.none
else Effect.persist(MoodChanged(newMood)) else Effect.persist(MoodChanged(newMood))
@ -337,7 +337,7 @@ object PersistentActorCompileOnlyTest {
case Remember(memory) => case Remember(memory) =>
// A more elaborate example to show we still have full control over the effects // A more elaborate example to show we still have full control over the effects
// if needed (e.g. when some logic is factored out but you want to add more effects) // if needed (e.g. when some logic is factored out but you want to add more effects)
val commonEffects: Effect[Event, Mood] = changeMoodIfNeeded(state, Happy) val commonEffects: EffectBuilder[Event, Mood] = changeMoodIfNeeded(state, Happy)
Effect.persist(commonEffects.events :+ Remembered(memory)).thenRun(commonChainedEffects) Effect.persist(commonEffects.events :+ Remembered(memory)).thenRun(commonChainedEffects)
} }
} }