Separate out Effects and ChainedEffects in typed persistence (#25357)

* Separate out Effects and ChainedEffects in typed persistence

* Document order of execution for ChainedEffects
* Change stop to a just a ChainedEffect rather than both

Closes #25042
Closes #25041

* ChainedEffect renamed to SideEffect
This commit is contained in:
Christopher Batey 2018-08-03 09:15:49 +01:00 committed by Konrad `ktoso` Malawski
parent 729313c66f
commit 131e6d10d6
16 changed files with 214 additions and 102 deletions

View file

@ -54,20 +54,23 @@ The command handler is a function with @java[2 parameters for]@scala[3 parameter
current `State` and `Command`. current `State` and `Command`.
A command handler returns an `Effect` directive that defines what event or events, if any, to persist. A command handler returns an `Effect` directive that defines what event or events, if any, to persist.
@java[Effects are created using a factory that is returned via the `Effect()` method] Effects are created using @java[a factory that is returned via the `Effect()` method] @scala[the `Effect` factory]
@scala[Effects are created using the `Effect` factory] and can be one of:
and can be used to create various effects such as:
* `persist` will persist one single event or several events atomically, i.e. all events * `persist` will persist one single event or several events atomically, i.e. all events
are stored or none of them are stored if there is an error are stored or none of them are stored if there is an error
* `none` no events are to be persisted, for example a read-only command * `none` no events are to be persisted, for example a read-only command
* `unhandled` the command is unhandled (not supported) in current state * `unhandled` the command is unhandled (not supported) in current state
* `stop` stop this actor
External side effects are to be performed after successful persist which is achieved with the `andThen` function e.g @scala[`Effect.persist(..).andThen`]@java[`Effect().persist(..).andThen`]. In addition to returning the primary `Effect` for the command `PersistentBehavior`s can also
chain side effects (`SideEffect`s) are to be performed after successful persist which is achieved with the `andThen` and `thenRun`
function e.g @scala[`Effect.persist(..).andThen`]@java[`Effect().persist(..).andThen`]. The `thenRun` function
is a convenience around creating a `SideEffect`.
In the example below a reply is sent to the `replyTo` ActorRef. Note that the new state after applying In the example below a reply is sent to the `replyTo` ActorRef. Note that the new state after applying
the event is passed as parameter to the `andThen` function. All `andThen*` registered callbacks the event is passed as parameter to the `thenRun` function. All `thenRun` registered callbacks
are executed after successful execution of the persist statement (or immediately, in case of `none` and `unhandled`). are executed sequentially after successful execution of the persist statement (or immediately, in case of `none` and `unhandled`).
### Event handler ### Event handler
@ -126,6 +129,7 @@ The `PersistentBehavior` can then be run as with any plain typed actor as descri
@java[The `ActorContext` can be obtained with `Behaviors.setup` and be passed as a constructor parameter.] @java[The `ActorContext` can be obtained with `Behaviors.setup` and be passed as a constructor parameter.]
## Larger example ## Larger example
After processing a message, plain typed actors are able to return the `Behavior` that is used After processing a message, plain typed actors are able to return the `Behavior` that is used
@ -206,6 +210,34 @@ Scala
Java Java
: @@snip [InDepthPersistentBehaviorTest.java]($akka$/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/InDepthPersistentBehaviorTest.java) { #behavior } : @@snip [InDepthPersistentBehaviorTest.java]($akka$/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/InDepthPersistentBehaviorTest.java) { #behavior }
## Effects and Side Effects
Each command has a single `Effect` which can be:
* Persist events
* None: Accept the comment but no effects
* Unhandled: Don't handle this message
Note that there is only one of these. It is not possible to both persist and say none/unhandled.
These are created using @java[a factory that is returned via the `Effect()` method]
@scala[the `Effect` factory] and once created
additional `SideEffects` can be added.
Most of them time this will be done with the `thenRun` method on the `Effect` above. It is also possible
factor out common `SideEffect`s. For example:
Scala
: @@snip [BasicPersistentBehaviorsCompileOnly.scala]($akka$/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentActorCompileOnlyTest.scala) { #commonChainedEffects }
Java
: @@snip [BasicPersistentBehaviorsCompileOnly.scala]($akka$/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorCompileOnlyTest.java) { #commonChainedEffects }
### Side effects ordering and guarantees
Any `SideEffect`s are executed on an at-once basis and will not be executed if the persist fails.
The `SideEffect`s are executed sequentially, it is not possible to execute `SideEffect`s in parallel.
## Serialization ## Serialization
The same @ref:[serialization](../serialization.md) mechanism as for untyped The same @ref:[serialization](../serialization.md) mechanism as for untyped

View file

@ -0,0 +1,45 @@
/*
* Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.persistence.typed
import akka.japi.function
import akka.annotation.{ DoNotInherit, InternalApi }
/**
* A [[SideEffect]] is an side effect that can be chained after a main effect.
*
* Persist, none and unhandled are main effects. Then any number of
* call backs can be added to these effects with `andThen`.
*
* Not for user extension
*/
sealed abstract class SideEffect[State]
/** INTERNAL API */
@InternalApi
final private[akka] case class Callback[State](effect: State Unit) extends SideEffect[State]
/** INTERNAL API */
@InternalApi
private[akka] case object Stop extends SideEffect[Nothing]
object SideEffect {
/**
* Create a ChainedEffect that can be run after Effects
*/
def apply[State](callback: State Unit): SideEffect[State] =
Callback(callback)
/**
* Java API
*
* Create a ChainedEffect that can be run after Effects
*/
def create[State](callback: function.Procedure[State]): SideEffect[State] =
Callback(s callback.apply(s))
def stop[State](): SideEffect[State] = Stop.asInstanceOf[SideEffect[State]]
}

View file

@ -4,11 +4,11 @@
package akka.persistence.typed.internal package akka.persistence.typed.internal
import akka.persistence.typed.javadsl import akka.persistence.typed.{ SideEffect, javadsl, scaladsl }
import akka.persistence.typed.scaladsl
import scala.collection.{ immutable im } import scala.collection.{ immutable im }
import akka.annotation.{ DoNotInherit, InternalApi } import akka.annotation.InternalApi
import akka.persistence.typed.scaladsl.Effect
/** INTERNAL API */ /** INTERNAL API */
@InternalApi @InternalApi
@ -16,27 +16,26 @@ private[akka] abstract class EffectImpl[+Event, State] extends javadsl.Effect[Ev
/* All events that will be persisted in this effect */ /* All events that will be persisted in this effect */
override def events: im.Seq[Event] = Nil override def events: im.Seq[Event] = Nil
/* All side effects that will be performed in this effect */ override def andThen(chainedEffect: SideEffect[State]): EffectImpl[Event, State] =
override def sideEffects[E >: Event]: im.Seq[ChainableEffect[E, State]] = Nil CompositeEffect(this, chainedEffect)
} }
/** INTERNAL API */ /** INTERNAL API */
@InternalApi @InternalApi
private[akka] object CompositeEffect { private[akka] object CompositeEffect {
def apply[Event, State](effect: EffectImpl[Event, State], sideEffects: ChainableEffect[Event, State]): EffectImpl[Event, State] = def apply[Event, State](effect: Effect[Event, State], sideEffects: SideEffect[State]): EffectImpl[Event, State] =
CompositeEffect[Event, State](effect, sideEffects :: Nil) CompositeEffect[Event, State](effect, sideEffects :: Nil)
} }
/** INTERNAL API */ /** INTERNAL API */
@InternalApi @InternalApi
private[akka] final case class CompositeEffect[Event, State]( private[akka] final case class CompositeEffect[Event, State](
persistingEffect: EffectImpl[Event, State], persistingEffect: Effect[Event, State],
_sideEffects: im.Seq[ChainableEffect[Event, State]]) extends EffectImpl[Event, State] { _sideEffects: im.Seq[SideEffect[State]]) extends EffectImpl[Event, State] {
override val events = persistingEffect.events override val events = persistingEffect.events
override def sideEffects[E >: Event]: im.Seq[ChainableEffect[E, State]] = _sideEffects.asInstanceOf[im.Seq[ChainableEffect[E, State]]]
override def toString: String = override def toString: String =
s"CompositeEffect($persistingEffect, sideEffects: ${_sideEffects.size})" s"CompositeEffect($persistingEffect, sideEffects: ${_sideEffects.size})"
} }
@ -55,21 +54,7 @@ private[akka] case class Persist[Event, State](event: Event) extends EffectImpl[
@InternalApi @InternalApi
private[akka] case class PersistAll[Event, State](override val events: im.Seq[Event]) extends EffectImpl[Event, State] private[akka] case class PersistAll[Event, State](override val events: im.Seq[Event]) extends EffectImpl[Event, State]
/** INTERNAL API */
@InternalApi
private[akka] case class SideEffect[Event, State](effect: State Unit) extends ChainableEffect[Event, State]
/** INTERNAL API */
@InternalApi
private[akka] case object Stop extends ChainableEffect[Nothing, Nothing]
/** INTERNAL API */ /** INTERNAL API */
@InternalApi @InternalApi
private[akka] case object Unhandled extends EffectImpl[Nothing, Nothing] private[akka] case object Unhandled extends EffectImpl[Nothing, Nothing]
/**
* Not for user extension
*/
@DoNotInherit
abstract class ChainableEffect[Event, State] extends EffectImpl[Event, State]

View file

@ -12,9 +12,10 @@ import akka.annotation.InternalApi
import akka.persistence.JournalProtocol._ import akka.persistence.JournalProtocol._
import akka.persistence._ import akka.persistence._
import akka.persistence.journal.Tagged import akka.persistence.journal.Tagged
import akka.persistence.typed.EventRejectedException import akka.persistence.typed.{ Callback, EventRejectedException, SideEffect, Stop }
import akka.persistence.typed.internal.EventsourcedBehavior.{ InternalProtocol, MDC } import akka.persistence.typed.internal.EventsourcedBehavior.{ InternalProtocol, MDC }
import akka.persistence.typed.internal.EventsourcedBehavior.InternalProtocol._ import akka.persistence.typed.internal.EventsourcedBehavior.InternalProtocol._
import akka.persistence.typed.scaladsl.Effect
import scala.annotation.tailrec import scala.annotation.tailrec
import scala.collection.immutable import scala.collection.immutable
@ -84,8 +85,8 @@ private[akka] object EventsourcedRunning {
@tailrec def applyEffects( @tailrec def applyEffects(
msg: Any, msg: Any,
state: EventsourcedState[S], state: EventsourcedState[S],
effect: EffectImpl[E, S], effect: Effect[E, S],
sideEffects: immutable.Seq[ChainableEffect[_, S]] = Nil sideEffects: immutable.Seq[SideEffect[S]] = Nil
): Behavior[InternalProtocol] = { ): Behavior[InternalProtocol] = {
if (setup.log.isDebugEnabled) if (setup.log.isDebugEnabled)
setup.log.debug( setup.log.debug(
@ -135,15 +136,12 @@ private[akka] object EventsourcedRunning {
tryUnstash(applySideEffects(sideEffects, state)) tryUnstash(applySideEffects(sideEffects, state))
} }
case _: PersistNothing.type @unchecked case _: PersistNothing.type
tryUnstash(applySideEffects(sideEffects, state)) tryUnstash(applySideEffects(sideEffects, state))
case _: Unhandled.type @unchecked case _: Unhandled.type
applySideEffects(sideEffects, state) applySideEffects(sideEffects, state)
Behavior.unhandled Behavior.unhandled
case c: ChainableEffect[_, S]
applySideEffect(c, state)
} }
} }
@ -172,7 +170,7 @@ private[akka] object EventsourcedRunning {
state: EventsourcedState[S], state: EventsourcedState[S],
numberOfEvents: Int, numberOfEvents: Int,
shouldSnapshotAfterPersist: Boolean, shouldSnapshotAfterPersist: Boolean,
sideEffects: immutable.Seq[ChainableEffect[_, S]] sideEffects: immutable.Seq[SideEffect[S]]
): Behavior[InternalProtocol] = { ): Behavior[InternalProtocol] = {
setup.setMdc(persistingEventsMdc) setup.setMdc(persistingEventsMdc)
new PersistingEvents(state, numberOfEvents, shouldSnapshotAfterPersist, sideEffects) new PersistingEvents(state, numberOfEvents, shouldSnapshotAfterPersist, sideEffects)
@ -182,7 +180,7 @@ private[akka] object EventsourcedRunning {
var state: EventsourcedState[S], var state: EventsourcedState[S],
numberOfEvents: Int, numberOfEvents: Int,
shouldSnapshotAfterPersist: Boolean, shouldSnapshotAfterPersist: Boolean,
var sideEffects: immutable.Seq[ChainableEffect[_, S]]) var sideEffects: immutable.Seq[SideEffect[S]])
extends MutableBehavior[EventsourcedBehavior.InternalProtocol] { extends MutableBehavior[EventsourcedBehavior.InternalProtocol] {
private var eventCounter = 0 private var eventCounter = 0
@ -277,7 +275,7 @@ private[akka] object EventsourcedRunning {
// -------------------------- // --------------------------
def applySideEffects(effects: immutable.Seq[ChainableEffect[_, S]], state: EventsourcedState[S]): Behavior[InternalProtocol] = { def applySideEffects(effects: immutable.Seq[SideEffect[S]], state: EventsourcedState[S]): Behavior[InternalProtocol] = {
var res: Behavior[InternalProtocol] = handlingCommands(state) var res: Behavior[InternalProtocol] = handlingCommands(state)
val it = effects.iterator val it = effects.iterator
@ -292,16 +290,16 @@ private[akka] object EventsourcedRunning {
res res
} }
def applySideEffect(effect: ChainableEffect[_, S], state: EventsourcedState[S]): Behavior[InternalProtocol] = effect match { def applySideEffect(effect: SideEffect[S], state: EventsourcedState[S]): Behavior[InternalProtocol] = effect match {
case _: Stop.type @unchecked case _: Stop.type @unchecked
Behaviors.stopped Behaviors.stopped
case SideEffect(sideEffects) case Callback(sideEffects)
sideEffects(state.state) sideEffects(state.state)
Behaviors.same Behaviors.same
case _ case _
throw new IllegalArgumentException(s"Not supported effect detected [${effect.getClass.getName}]!") throw new IllegalArgumentException(s"Not supported side effect detected [${effect.getClass.getName}]!")
} }
} }

View file

@ -7,6 +7,7 @@ package akka.persistence.typed.javadsl
import akka.annotation.DoNotInherit import akka.annotation.DoNotInherit
import akka.japi.function import akka.japi.function
import akka.persistence.typed.internal._ import akka.persistence.typed.internal._
import akka.persistence.typed.{ SideEffect, Stop }
import scala.collection.JavaConverters._ import scala.collection.JavaConverters._
@ -33,7 +34,7 @@ object EffectFactory extends EffectFactories[Nothing, Nothing, Nothing]
/** /**
* Stop this persistent actor * Stop this persistent actor
*/ */
def stop: Effect[Event, State] = Stop.asInstanceOf[ChainableEffect[Event, State]] def stop: Effect[Event, State] = none.thenStop()
/** /**
* This command is not handled, but it is not an error that it isn't. * This command is not handled, but it is not an error that it isn't.
@ -54,9 +55,15 @@ object EffectFactory extends EffectFactories[Nothing, Nothing, Nothing]
self: EffectImpl[Event, State] self: EffectImpl[Event, State]
/** Convenience method to register a side effect with just a callback function */ /** Convenience method to register a side effect with just a callback function */
final def andThen(callback: function.Procedure[State]): Effect[Event, State] = final def andThen(callback: function.Procedure[State]): Effect[Event, State] =
CompositeEffect(this, SideEffect[Event, State](s callback.apply(s))) CompositeEffect(this, SideEffect[State](s callback.apply(s)))
/** Convenience method to register a side effect that doesn't need access to state */ /** Convenience method to register a side effect that doesn't need access to state */
final def andThen(callback: function.Effect): Effect[Event, State] = final def andThen(callback: function.Effect): Effect[Event, State] =
CompositeEffect(this, SideEffect[Event, State]((_: State) callback.apply())) CompositeEffect(this, SideEffect[State]((_: State) callback.apply()))
def andThen(chainedEffect: SideEffect[State]): Effect[Event, State]
final def thenStop(): Effect[Event, State] =
CompositeEffect(this, Stop.asInstanceOf[SideEffect[State]])
} }

View file

@ -4,7 +4,9 @@
package akka.persistence.typed.scaladsl package akka.persistence.typed.scaladsl
import akka.japi.function
import akka.annotation.DoNotInherit import akka.annotation.DoNotInherit
import akka.persistence.typed.{ SideEffect, Stop }
import akka.persistence.typed.internal._ import akka.persistence.typed.internal._
import scala.collection.{ immutable im } import scala.collection.{ immutable im }
@ -14,42 +16,48 @@ import scala.collection.{ immutable ⇒ im }
*/ */
object Effect { object Effect {
// TODO docs /**
* Persist a single event
*
* Side effects can be chained with `andThen`
*/
def persist[Event, State](event: Event): Effect[Event, State] = Persist(event) def persist[Event, State](event: Event): Effect[Event, State] = Persist(event)
// TODO docs /**
* Persist multiple events
*
* Side effects can be chained with `andThen`
*/
def persist[Event, A <: Event, B <: Event, State](evt1: A, evt2: B, events: Event*): Effect[Event, State] = def persist[Event, A <: Event, B <: Event, State](evt1: A, evt2: B, events: Event*): Effect[Event, State] =
persist(evt1 :: evt2 :: events.toList) persist(evt1 :: evt2 :: events.toList)
// TODO docs /**
def persist[Event, State](eventOpt: Option[Event]): Effect[Event, State] = * Persist multiple events
eventOpt match { *
case Some(evt) persist[Event, State](evt) * Side effects can be chained with `andThen`
case _ none[Event, State] */
}
// TODO docs
def persist[Event, State](events: im.Seq[Event]): Effect[Event, State] = def persist[Event, State](events: im.Seq[Event]): Effect[Event, State] =
PersistAll(events) PersistAll(events)
// TODO docs
def persist[Event, State](events: im.Seq[Event], sideEffects: im.Seq[ChainableEffect[Event, State]]): Effect[Event, State] =
new CompositeEffect[Event, State](PersistAll[Event, State](events), sideEffects)
/** /**
* Do not persist anything * Do not persist anything
*
* Side effects can be chained with `andThen`
*/ */
def none[Event, State]: Effect[Event, State] = PersistNothing.asInstanceOf[Effect[Event, State]] def none[Event, State]: Effect[Event, State] = PersistNothing.asInstanceOf[Effect[Event, State]]
/** /**
* This command is not handled, but it is not an error that it isn't. * This command is not handled, but it is not an error that it isn't.
*
* Side effects can be chained with `andThen`
*/ */
def unhandled[Event, State]: Effect[Event, State] = Unhandled.asInstanceOf[Effect[Event, State]] def unhandled[Event, State]: Effect[Event, State] = Unhandled.asInstanceOf[Effect[Event, State]]
/** /**
* Stop this persistent actor * Stop this persistent actor
* Side effects can be chained with `andThen`
*/ */
def stop[Event, State]: ChainableEffect[Event, State] = Stop.asInstanceOf[ChainableEffect[Event, State]] def stop[Event, State]: Effect[Event, State] = none.andThenStop()
} }
/** /**
@ -58,17 +66,30 @@ object Effect {
* Not for user extension. * Not for user extension.
*/ */
@DoNotInherit @DoNotInherit
trait Effect[+Event, State] { self: EffectImpl[Event, State] trait Effect[+Event, State] {
/* All events that will be persisted in this effect */ /* All events that will be persisted in this effect */
def events: im.Seq[Event] def events: im.Seq[Event]
def sideEffects[E >: Event]: im.Seq[ChainableEffect[E, State]] /**
* Run the given callback. Callbacks are run sequentially.
*/
final def thenRun(callback: State Unit): Effect[Event, State] =
CompositeEffect(this, SideEffect(callback))
/** Convenience method to register a side effect with just a callback function */ /**
final def andThen(callback: State Unit): Effect[Event, State] = * Run the given callback after the current Effect
CompositeEffect(this, SideEffect[Event, State](callback)) */
def andThen(chainedEffect: SideEffect[State]): Effect[Event, State]
/**
* Run the given callbacks sequentially after the current Effect
*/
final def andThen(chainedEffects: im.Seq[SideEffect[State]]): Effect[Event, State] =
CompositeEffect(this, chainedEffects)
/** The side effect is to stop the actor */ /** The side effect is to stop the actor */
def andThenStop: Effect[Event, State] = def andThenStop(): Effect[Event, State] = {
CompositeEffect(this, Effect.stop[Event, State]) CompositeEffect(this, Stop.asInstanceOf[SideEffect[State]])
}
} }

View file

@ -9,6 +9,7 @@ import akka.actor.typed.javadsl.ActorContext;
import akka.actor.typed.ActorRef; import akka.actor.typed.ActorRef;
import akka.persistence.typed.EventAdapter; import akka.persistence.typed.EventAdapter;
import akka.actor.testkit.typed.javadsl.TestInbox; import akka.actor.testkit.typed.javadsl.TestInbox;
import akka.persistence.typed.SideEffect;
import akka.util.Timeout; import akka.util.Timeout;
import java.util.*; import java.util.*;
@ -149,6 +150,12 @@ public class PersistentActorCompileOnlyTest {
private List<String> events = new ArrayList<>(); private List<String> events = new ArrayList<>();
} }
//#commonChainedEffects
// Factored out Chained effect
static final SideEffect<ExampleState> commonChainedEffect = SideEffect.create(s -> System.out.println("Command handled!"));
//#commonChainedEffects
private PersistentBehavior<MyCommand, MyEvent, ExampleState> pa = new PersistentBehavior<MyCommand, MyEvent, ExampleState>("pa") { private PersistentBehavior<MyCommand, MyEvent, ExampleState> pa = new PersistentBehavior<MyCommand, MyEvent, ExampleState>("pa") {
@Override @Override
public ExampleState emptyState() { public ExampleState emptyState() {
@ -157,10 +164,15 @@ public class PersistentActorCompileOnlyTest {
@Override @Override
public CommandHandler<MyCommand, MyEvent, ExampleState> commandHandler() { public CommandHandler<MyCommand, MyEvent, ExampleState> commandHandler() {
//#commonChainedEffects
return commandHandlerBuilder(ExampleState.class) return commandHandlerBuilder(ExampleState.class)
.matchCommand(Cmd.class, (state, cmd) -> Effect().persist(new Evt(cmd.data)) .matchCommand(Cmd.class, (state, cmd) -> Effect().persist(new Evt(cmd.data))
.andThen(() -> cmd.sender.tell(new Ack()))) .andThen(() -> cmd.sender.tell(new Ack()))
.andThen(commonChainedEffect)
)
.build(); .build();
//#commonChainedEffects
} }
@Override @Override

View file

@ -31,7 +31,7 @@ object ManyRecoveriesSpec {
persistenceId = name, persistenceId = name,
emptyState = "", emptyState = "",
commandHandler = CommandHandler.command { commandHandler = CommandHandler.command {
case Cmd(s) Effect.persist(Evt(s)).andThen(_ probe.ref ! s"$name-$s") case Cmd(s) Effect.persist(Evt(s)).thenRun(_ probe.ref ! s"$name-$s")
}, },
eventHandler = { eventHandler = {
case (state, _) latch.foreach(Await.ready(_, 10.seconds)); state case (state, _) latch.foreach(Await.ready(_, 10.seconds)); state

View file

@ -31,7 +31,7 @@ object OptionalSnapshotStoreSpec {
persistenceId = name, persistenceId = name,
emptyState = State(), emptyState = State(),
commandHandler = CommandHandler.command { commandHandler = CommandHandler.command {
_ Effect.persist(Event()).andThen(probe.ref ! _) _ Effect.persist(Event()).thenRun(probe.ref ! _)
}, },
eventHandler = { eventHandler = {
case (_, _) State() case (_, _) State()

View file

@ -64,8 +64,8 @@ object PerformanceSpec {
persistenceId = name, persistenceId = name,
"", "",
commandHandler = CommandHandler.command { commandHandler = CommandHandler.command {
case StopMeasure Effect.none.andThen(_ probe.ref ! StopMeasure) case StopMeasure Effect.none.thenRun(_ probe.ref ! StopMeasure)
case FailAt(sequence) Effect.none.andThen(_ parameters.failAt = sequence) case FailAt(sequence) Effect.none.thenRun(_ parameters.failAt = sequence)
case command other(command, parameters) case command other(command, parameters)
}, },
eventHandler = { eventHandler = {
@ -80,7 +80,7 @@ object PerformanceSpec {
def eventSourcedTestPersistenceBehavior(name: String, probe: TestProbe[Command]) = def eventSourcedTestPersistenceBehavior(name: String, probe: TestProbe[Command]) =
behavior(name, probe) { behavior(name, probe) {
case (CommandWithEvent(evt), parameters) case (CommandWithEvent(evt), parameters)
Effect.persist(evt).andThen(_ { Effect.persist(evt).thenRun(_ {
parameters.persistCalls += 1 parameters.persistCalls += 1
if (parameters.every(1000)) print(".") if (parameters.every(1000)) print(".")
if (parameters.shouldFail) throw TE("boom") if (parameters.shouldFail) throw TE("boom")

View file

@ -6,11 +6,11 @@ package akka.persistence.typed.scaladsl
import scala.concurrent.ExecutionContext import scala.concurrent.ExecutionContext
import scala.concurrent.duration._ import scala.concurrent.duration._
import akka.actor.typed.{ ActorRef, Behavior } import akka.actor.typed.{ ActorRef, Behavior }
import akka.actor.typed.scaladsl.Behaviors import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl.ActorContext import akka.actor.typed.scaladsl.ActorContext
import akka.actor.typed.scaladsl.TimerScheduler import akka.actor.typed.scaladsl.TimerScheduler
import akka.persistence.typed.SideEffect
object PersistentActorCompileOnlyTest { object PersistentActorCompileOnlyTest {
@ -72,7 +72,7 @@ object PersistentActorCompileOnlyTest {
commandHandler = CommandHandler.command { commandHandler = CommandHandler.command {
case Cmd(data, sender) case Cmd(data, sender)
Effect.persist(Evt(data)) Effect.persist(Evt(data))
.andThen { _ .thenRun { _
sender ! Ack sender ! Ack
} }
}, },
@ -115,7 +115,7 @@ object PersistentActorCompileOnlyTest {
commandHandler = (ctx: ActorContext[Command], state, cmd) cmd match { commandHandler = (ctx: ActorContext[Command], state, cmd) cmd match {
case DoSideEffect(data) case DoSideEffect(data)
Effect.persist(IntentRecorded(state.nextCorrelationId, data)).andThen { _ Effect.persist(IntentRecorded(state.nextCorrelationId, data)).thenRun { _
performSideEffect(ctx.self, state.nextCorrelationId, data) performSideEffect(ctx.self, state.nextCorrelationId, data)
} }
case AcknowledgeSideEffect(correlationId) case AcknowledgeSideEffect(correlationId)
@ -224,7 +224,7 @@ object PersistentActorCompileOnlyTest {
commandHandler = (ctx, _, cmd) cmd match { commandHandler = (ctx, _, cmd) cmd match {
case RegisterTask(task) case RegisterTask(task)
Effect.persist(TaskRegistered(task)) Effect.persist(TaskRegistered(task))
.andThen { _ .thenRun { _
val child = ctx.spawn[Nothing](worker(task), task) val child = ctx.spawn[Nothing](worker(task), task)
// This assumes *any* termination of the child may trigger a `TaskDone`: // This assumes *any* termination of the child may trigger a `TaskDone`:
ctx.watchWith(child, TaskDone(task)) ctx.watchWith(child, TaskDone(task))
@ -279,7 +279,7 @@ object PersistentActorCompileOnlyTest {
def addItem(id: Id, self: ActorRef[Command]) = def addItem(id: Id, self: ActorRef[Command]) =
Effect Effect
.persist[Event, List[Id]](ItemAdded(id)) .persist[Event, List[Id]](ItemAdded(id))
.andThen(_ metadataRegistry ! GetMetaData(id, adapt)) .thenRun(_ metadataRegistry ! GetMetaData(id, adapt))
PersistentBehaviors.receive[Command, Event, List[Id]]( PersistentBehaviors.receive[Command, Event, List[Id]](
persistenceId = "basket-1", persistenceId = "basket-1",
@ -342,6 +342,14 @@ object PersistentActorCompileOnlyTest {
if (currentState == newMood) Effect.none if (currentState == newMood) Effect.none
else Effect.persist(MoodChanged(newMood)) else Effect.persist(MoodChanged(newMood))
//#commonChainedEffects
// Example factoring out a chained effect rather than using `andThen`
val commonChainedEffects = SideEffect[Mood](_ println("Command processed"))
// Then in a command handler:
Effect.persist(Remembered("Yep")) // persist event
.andThen(commonChainedEffects) // add on common chained effect
//#commonChainedEffects
val commandHandler: CommandHandler[Command, Event, Mood] = { (_, state, cmd) val commandHandler: CommandHandler[Command, Event, Mood] = { (_, state, cmd)
cmd match { cmd match {
case Greet(whom) case Greet(whom)
@ -349,15 +357,15 @@ object PersistentActorCompileOnlyTest {
Effect.none Effect.none
case CheerUp(sender) case CheerUp(sender)
changeMoodIfNeeded(state, Happy) changeMoodIfNeeded(state, Happy)
.andThen { _ .thenRun { _
sender ! Ack sender ! Ack
} }.andThen(commonChainedEffects)
case Remember(memory) case Remember(memory)
// A more elaborate example to show we still have full control over the effects // A more elaborate example to show we still have full control over the effects
// if needed (e.g. when some logic is factored out but you want to add more effects) // if needed (e.g. when some logic is factored out but you want to add more effects)
val commonEffects: Effect[Event, Mood] = changeMoodIfNeeded(state, Happy) val commonEffects: Effect[Event, Mood] = changeMoodIfNeeded(state, Happy)
Effect.persist(commonEffects.events :+ Remembered(memory), commonEffects.sideEffects) Effect.persist(commonEffects.events :+ Remembered(memory))
.andThen(commonChainedEffects)
} }
} }
@ -386,7 +394,7 @@ object PersistentActorCompileOnlyTest {
private val commandHandler: CommandHandler[Command, Event, State] = CommandHandler.command { private val commandHandler: CommandHandler[Command, Event, State] = CommandHandler.command {
case Enough case Enough
Effect.persist(Done) Effect.persist(Done)
.andThen((_: State) println("yay")) .thenRun((_: State) println("yay"))
.andThenStop .andThenStop
} }
@ -411,7 +419,7 @@ object PersistentActorCompileOnlyTest {
emptyState = new First, emptyState = new First,
commandHandler = CommandHandler.command { commandHandler = CommandHandler.command {
cmd cmd
Effect.persist(cmd).andThen { Effect.persist(cmd).thenRun {
case _: First println("first") case _: First println("first")
case _: Second println("second") case _: Second println("second")
} }

View file

@ -126,14 +126,14 @@ object PersistentBehaviorSpec {
case IncrementThenLogThenStop case IncrementThenLogThenStop
Effect.persist(Incremented(1)) Effect.persist(Incremented(1))
.andThen { (_: State) .thenRun { (_: State)
loggingActor ! firstLogging loggingActor ! firstLogging
} }
.andThenStop .andThenStop
case IncrementTwiceThenLogThenStop case IncrementTwiceThenLogThenStop
Effect.persist(Incremented(1), Incremented(2)) Effect.persist(Incremented(1), Incremented(2))
.andThen { (_: State) .thenRun { (_: State)
loggingActor ! firstLogging loggingActor ! firstLogging
} }
.andThenStop .andThenStop
@ -170,30 +170,30 @@ object PersistentBehaviorSpec {
case IncrementTwiceAndThenLog case IncrementTwiceAndThenLog
Effect Effect
.persist(Incremented(1), Incremented(1)) .persist(Incremented(1), Incremented(1))
.andThen { (_: State) .thenRun { (_: State)
loggingActor ! firstLogging loggingActor ! firstLogging
} }
.andThen { _ .thenRun { _
loggingActor ! secondLogging loggingActor ! secondLogging
} }
case EmptyEventsListAndThenLog case EmptyEventsListAndThenLog
Effect Effect
.persist(List.empty) // send empty list of events .persist(List.empty) // send empty list of events
.andThen { _ .thenRun { _
loggingActor ! firstLogging loggingActor ! firstLogging
} }
case DoNothingAndThenLog case DoNothingAndThenLog
Effect Effect
.none .none
.andThen { _ .thenRun { _
loggingActor ! firstLogging loggingActor ! firstLogging
} }
case LogThenStop case LogThenStop
Effect.none[Event, State] Effect.none[Event, State]
.andThen { _ .thenRun { _
loggingActor ! firstLogging loggingActor ! firstLogging
} }
.andThenStop .andThenStop

View file

@ -43,7 +43,7 @@ object AccountExample1 {
else { else {
Effect Effect
.persist(Withdrawn(amount)) .persist(Withdrawn(amount))
.andThen { .thenRun {
case Some(OpenedAccount(balance)) case Some(OpenedAccount(balance))
// do some side-effect using balance // do some side-effect using balance
println(balance) println(balance)

View file

@ -61,7 +61,7 @@ object AccountExample2 {
else { else {
Effect Effect
.persist(Withdrawn(amount)) .persist(Withdrawn(amount))
.andThen { .thenRun {
case OpenedAccount(balance) case OpenedAccount(balance)
// do some side-effect using balance // do some side-effect using balance
println(balance) println(balance)

View file

@ -4,6 +4,7 @@
package docs.akka.persistence.typed package docs.akka.persistence.typed
import akka.actor.typed.ActorRef
import akka.actor.typed.{ Behavior, SupervisorStrategy } import akka.actor.typed.{ Behavior, SupervisorStrategy }
import akka.actor.typed.scaladsl.Behaviors import akka.actor.typed.scaladsl.Behaviors
import akka.persistence.typed.scaladsl.PersistentBehaviors import akka.persistence.typed.scaladsl.PersistentBehaviors
@ -30,6 +31,9 @@ object BasicPersistentBehaviorsCompileOnly {
) )
//#structure //#structure
case class CommandWithSender(reply: ActorRef[String]) extends Command
case class VeryImportantEvent() extends Event
//#recovery //#recovery
val recoveryBehavior: Behavior[Command] = val recoveryBehavior: Behavior[Command] =
PersistentBehaviors.receive[Command, Event, State]( PersistentBehaviors.receive[Command, Event, State](
@ -58,7 +62,6 @@ object BasicPersistentBehaviorsCompileOnly {
(state, evt) (state, evt)
throw new RuntimeException("TODO: process the event return the next state") throw new RuntimeException("TODO: process the event return the next state")
).withTagger(_ Set("tag1", "tag2")) ).withTagger(_ Set("tag1", "tag2"))
//#tagging //#tagging
//#wrapPersistentBehavior //#wrapPersistentBehavior
@ -94,4 +97,5 @@ object BasicPersistentBehaviorsCompileOnly {
randomFactor = 0.1 randomFactor = 0.1
)) ))
//#supervision //#supervision
} }

View file

@ -58,7 +58,7 @@ object InDepthPersistentBehaviorSpec {
cmd match { cmd match {
case AddPost(content, replyTo) case AddPost(content, replyTo)
val evt = PostAdded(content.postId, content) val evt = PostAdded(content.postId, content)
Effect.persist(evt).andThen { state2 Effect.persist(evt).thenRun { state2
// After persist is done additional side effects can be performed // After persist is done additional side effects can be performed
replyTo ! AddPostDone(content.postId) replyTo ! AddPostDone(content.postId)
} }
@ -75,11 +75,11 @@ object InDepthPersistentBehaviorSpec {
cmd match { cmd match {
case ChangeBody(newBody, replyTo) case ChangeBody(newBody, replyTo)
val evt = BodyChanged(state.postId, newBody) val evt = BodyChanged(state.postId, newBody)
Effect.persist(evt).andThen { _ Effect.persist(evt).thenRun { _
replyTo ! Done replyTo ! Done
} }
case Publish(replyTo) case Publish(replyTo)
Effect.persist(Published(state.postId)).andThen { _ Effect.persist(Published(state.postId)).thenRun { _
println(s"Blog post ${state.postId} was published") println(s"Blog post ${state.postId} was published")
replyTo ! Done replyTo ! Done
} }