diff --git a/akka-typed-tests/src/test/scala/akka/typed/cluster/sharding/ClusterShardingPersistenceSpec.scala b/akka-typed-tests/src/test/scala/akka/typed/cluster/sharding/ClusterShardingPersistenceSpec.scala index 258c758c0b..5c55e7ebbf 100644 --- a/akka-typed-tests/src/test/scala/akka/typed/cluster/sharding/ClusterShardingPersistenceSpec.scala +++ b/akka-typed-tests/src/test/scala/akka/typed/cluster/sharding/ClusterShardingPersistenceSpec.scala @@ -53,14 +53,14 @@ object ClusterShardingPersistenceSpec { PersistentActor.persistentEntity[Command, String, String]( persistenceIdFromActorName = name ⇒ "Test-" + name, initialState = "", - actions = Actions((ctx, cmd, state) ⇒ cmd match { - case Add(s) ⇒ Persist(s) + commandHandler = CommandHandler((ctx, state, cmd) ⇒ cmd match { + case Add(s) ⇒ Effect.persist(s) case Get(replyTo) ⇒ replyTo ! state - PersistNothing() - case StopPlz ⇒ Stop() + Effect.done + case StopPlz ⇒ Effect.stop }), - applyEvent = (evt, state) ⇒ if (state.isEmpty) evt else state + "|" + evt) + eventHandler = (state, evt) ⇒ if (state.isEmpty) evt else state + "|" + evt) val typeKey = EntityTypeKey[Command]("test") diff --git a/akka-typed-tests/src/test/scala/akka/typed/persistence/scaladsl/PersistentActorCompileOnlyTest.scala b/akka-typed-tests/src/test/scala/akka/typed/persistence/scaladsl/PersistentActorCompileOnlyTest.scala index 8c9598dd33..9fec9205fe 100644 --- a/akka-typed-tests/src/test/scala/akka/typed/persistence/scaladsl/PersistentActorCompileOnlyTest.scala +++ b/akka-typed-tests/src/test/scala/akka/typed/persistence/scaladsl/PersistentActorCompileOnlyTest.scala @@ -31,12 +31,12 @@ object PersistentActorCompileOnlyTest { initialState = ExampleState(Nil), - actions = Actions.command { - case Cmd(data) ⇒ Persist(Evt(data)) + commandHandler = CommandHandler.command { + case Cmd(data) ⇒ Effect.persist(Evt(data)) }, - applyEvent = { - case (Evt(data), state) ⇒ state.copy(data :: state.events) + eventHandler = { + case (state, Evt(data)) ⇒ state.copy(data :: state.events) }) } @@ -56,14 +56,14 @@ object PersistentActorCompileOnlyTest { initialState = ExampleState(Nil), - actions = Actions.command { + commandHandler = CommandHandler.command { case Cmd(data, sender) ⇒ - Persist(Evt(data)) + Effect.persist(Evt(data)) .andThen { sender ! Ack } }, - applyEvent = { - case (Evt(data), state) ⇒ state.copy(data :: state.events) + eventHandler = { + case (state, Evt(data)) ⇒ state.copy(data :: state.events) }) } @@ -98,16 +98,16 @@ object PersistentActorCompileOnlyTest { initialState = EventsInFlight(0, Map.empty), - actions = Actions((ctx, cmd, state) ⇒ cmd match { + commandHandler = CommandHandler((ctx, state, cmd) ⇒ cmd match { case DoSideEffect(data) ⇒ - Persist(IntentRecorded(state.nextCorrelationId, data)).andThen { + Effect.persist(IntentRecorded(state.nextCorrelationId, data)).andThen { performSideEffect(ctx.self, state.nextCorrelationId, data) } case AcknowledgeSideEffect(correlationId) ⇒ - Persist(SideEffectAcknowledged(correlationId)) + Effect.persist(SideEffectAcknowledged(correlationId)) }), - applyEvent = (evt, state) ⇒ evt match { + eventHandler = (state, evt) ⇒ evt match { case IntentRecorded(correlationId, data) ⇒ EventsInFlight( nextCorrelationId = correlationId + 1, @@ -119,7 +119,6 @@ object PersistentActorCompileOnlyTest { state.dataByCorrelationId.foreach { case (correlationId, data) ⇒ performSideEffect(ctx.self, correlationId, data) } - state } } @@ -140,22 +139,22 @@ object PersistentActorCompileOnlyTest { val b: Behavior[Command] = PersistentActor.immutable[Command, Event, Mood]( persistenceId = "myPersistenceId", initialState = Happy, - actions = Actions.byState { - case Happy ⇒ Actions.command { + commandHandler = CommandHandler.byState { + case Happy ⇒ CommandHandler.command { case Greet(whom) ⇒ println(s"Super happy to meet you $whom!") - PersistNothing() - case MoodSwing ⇒ Persist(MoodChanged(Sad)) + Effect.done + case MoodSwing ⇒ Effect.persist(MoodChanged(Sad)) } - case Sad ⇒ Actions.command { + case Sad ⇒ CommandHandler.command { case Greet(whom) ⇒ println(s"hi $whom") - PersistNothing() - case MoodSwing ⇒ Persist(MoodChanged(Happy)) + Effect.done + case MoodSwing ⇒ Effect.persist(MoodChanged(Happy)) } }, - applyEvent = { - case (MoodChanged(to), _) ⇒ to + eventHandler = { + case (_, MoodChanged(to)) ⇒ to }) // FIXME this doesn't work, wrapping is not supported @@ -181,11 +180,11 @@ object PersistentActorCompileOnlyTest { PersistentActor.immutable[Command, Event, State]( persistenceId = "asdf", initialState = State(Nil), - actions = Actions.command { - case RegisterTask(task) ⇒ Persist(TaskRegistered(task)) - case TaskDone(task) ⇒ Persist(TaskRemoved(task)) + commandHandler = CommandHandler.command { + case RegisterTask(task) ⇒ Effect.persist(TaskRegistered(task)) + case TaskDone(task) ⇒ Effect.persist(TaskRemoved(task)) }, - applyEvent = (evt, state) ⇒ evt match { + eventHandler = (state, evt) ⇒ evt match { case TaskRegistered(task) ⇒ State(task :: state.tasksInFlight) case TaskRemoved(task) ⇒ State(state.tasksInFlight.filter(_ != task)) }).snapshotOnState(_.tasksInFlight.isEmpty) @@ -208,16 +207,17 @@ object PersistentActorCompileOnlyTest { PersistentActor.immutable[Command, Event, State]( persistenceId = "asdf", initialState = State(Nil), - actions = Actions((ctx, cmd, _) ⇒ cmd match { - case RegisterTask(task) ⇒ Persist(TaskRegistered(task)) - .andThen { - val child = ctx.spawn[Nothing](worker(task), task) - // This assumes *any* termination of the child may trigger a `TaskDone`: - ctx.watchWith(child, TaskDone(task)) - } - case TaskDone(task) ⇒ Persist(TaskRemoved(task)) + commandHandler = CommandHandler((ctx, _, cmd) ⇒ cmd match { + case RegisterTask(task) ⇒ + Effect.persist(TaskRegistered(task)) + .andThen { + val child = ctx.spawn[Nothing](worker(task), task) + // This assumes *any* termination of the child may trigger a `TaskDone`: + ctx.watchWith(child, TaskDone(task)) + } + case TaskDone(task) ⇒ Effect.persist(TaskRemoved(task)) }), - applyEvent = (evt, state) ⇒ evt match { + eventHandler = (state, evt) ⇒ evt match { case TaskRegistered(task) ⇒ State(task :: state.tasksInFlight) case TaskRemoved(task) ⇒ State(state.tasksInFlight.filter(_ != task)) }) @@ -239,21 +239,21 @@ object PersistentActorCompileOnlyTest { persistenceId = "asdf", initialState = State(Nil), // The 'onSignal' seems to break type inference here.. not sure if that can be avoided? - actions = Actions[RegisterTask, Event, State]((ctx, cmd, state) ⇒ cmd match { - case RegisterTask(task) ⇒ Persist(TaskRegistered(task)) + commandHandler = CommandHandler[RegisterTask, Event, State]((ctx, state, cmd) ⇒ cmd match { + case RegisterTask(task) ⇒ Effect.persist(TaskRegistered(task)) .andThen { val child = ctx.spawn[Nothing](worker(task), task) // This assumes *any* termination of the child may trigger a `TaskDone`: ctx.watch(child) } }).onSignal { - case (ctx, Terminated(actorRef), _) ⇒ + case (ctx, _, Terminated(actorRef)) ⇒ // watchWith (as in the above example) is nicer because it means we don't have to // 'manually' associate the task and the child actor, but we wanted to demonstrate // signals here: - Persist(TaskRemoved(actorRef.path.name)) + Effect.persist(TaskRemoved(actorRef.path.name)) }, - applyEvent = (evt, state) ⇒ evt match { + eventHandler = (state, evt) ⇒ evt match { case TaskRegistered(task) ⇒ State(task :: state.tasksInFlight) case TaskRemoved(task) ⇒ State(state.tasksInFlight.filter(_ != task)) }) @@ -305,38 +305,42 @@ object PersistentActorCompileOnlyTest { PersistentActor.immutable[Command, Event, List[Id]]( persistenceId = "basket-1", initialState = Nil, - actions = - Actions.byState(state ⇒ - if (isFullyHydrated(basket, state)) Actions { (ctx, cmd, state) ⇒ + commandHandler = + CommandHandler.byState(state ⇒ + if (isFullyHydrated(basket, state)) CommandHandler { (ctx, state, cmd) ⇒ cmd match { case AddItem(id) ⇒ addItem(id, ctx.self) - case RemoveItem(id) ⇒ Persist(ItemRemoved(id)) + case RemoveItem(id) ⇒ Effect.persist(ItemRemoved(id)) case GotMetaData(data) ⇒ - basket = basket.updatedWith(data); PersistNothing() - case GetTotalPrice(sender) ⇒ sender ! basket.items.map(_.price).sum; PersistNothing() + basket = basket.updatedWith(data) + Effect.done + case GetTotalPrice(sender) ⇒ + sender ! basket.items.map(_.price).sum + Effect.done } } - else Actions { (ctx, cmd, state) ⇒ + else CommandHandler { (ctx, state, cmd) ⇒ cmd match { case AddItem(id) ⇒ addItem(id, ctx.self) - case RemoveItem(id) ⇒ Persist(ItemRemoved(id)) + case RemoveItem(id) ⇒ Effect.persist(ItemRemoved(id)) case GotMetaData(data) ⇒ basket = basket.updatedWith(data) if (isFullyHydrated(basket, state)) { stash.foreach(ctx.self ! _) stash = Nil } - PersistNothing() - case cmd: GetTotalPrice ⇒ stash :+= cmd; PersistNothing() + Effect.done + case cmd: GetTotalPrice ⇒ + stash :+= cmd + Effect.done } }), - applyEvent = (evt, state) ⇒ evt match { + eventHandler = (state, evt) ⇒ evt match { case ItemAdded(id) ⇒ id +: state case ItemRemoved(id) ⇒ state.filter(_ != id) }).onRecoveryCompleted((ctx, state) ⇒ { val ad = ctx.spawnAdapter((m: MetaData) ⇒ GotMetaData(m)) state.foreach(id ⇒ metadataRegistry ! GetMetaData(id, ad)) - state }) } } @@ -358,17 +362,17 @@ object PersistentActorCompileOnlyTest { case class Remembered(memory: String) extends Event def changeMoodIfNeeded(currentState: Mood, newMood: Mood): Effect[Event, Mood] = - if (currentState == newMood) PersistNothing() - else Persist(MoodChanged(newMood)) + if (currentState == newMood) Effect.done + else Effect.persist(MoodChanged(newMood)) PersistentActor.immutable[Command, Event, Mood]( persistenceId = "myPersistenceId", initialState = Sad, - actions = Actions { (_, cmd, state) ⇒ + commandHandler = CommandHandler { (_, state, cmd) ⇒ cmd match { case Greet(whom) ⇒ println(s"Hi there, I'm $state!") - PersistNothing() + Effect.done case CheerUp(sender) ⇒ changeMoodIfNeeded(state, Happy) .andThen { sender ! Ack } @@ -376,15 +380,13 @@ object PersistentActorCompileOnlyTest { // A more elaborate example to show we still have full control over the effects // if needed (e.g. when some logic is factored out but you want to add more effects) val commonEffects = changeMoodIfNeeded(state, Happy) - CompositeEffect( - PersistAll[Event, Mood](commonEffects.events :+ Remembered(memory)), - commonEffects.sideEffects) + Effect.persistAll(commonEffects.events :+ Remembered(memory), commonEffects.sideEffects) } }, - applyEvent = { - case (MoodChanged(to), _) ⇒ to - case (Remembered(_), state) ⇒ state + eventHandler = { + case (_, MoodChanged(to)) ⇒ to + case (state, Remembered(_)) ⇒ state }) } @@ -396,22 +398,20 @@ object PersistentActorCompileOnlyTest { sealed trait Event case object Done extends Event - type State = Unit + class State PersistentActor.immutable[Command, Event, State]( persistenceId = "myPersistenceId", - initialState = (), - actions = Actions { (_, cmd, _) ⇒ - cmd match { - case Enough ⇒ - Persist(Done) - .andThen( - SideEffect(_ ⇒ println("yay")), - Stop()) - } + initialState = new State, + commandHandler = CommandHandler.command { + case Enough ⇒ + Effect.persist(Done) + .andThen(println("yay")) + .andThenStop + }, - applyEvent = { - case (Done, _) ⇒ () + eventHandler = { + case (state, Done) ⇒ state }) } diff --git a/akka-typed-tests/src/test/scala/akka/typed/persistence/scaladsl/PersistentActorSpec.scala b/akka-typed-tests/src/test/scala/akka/typed/persistence/scaladsl/PersistentActorSpec.scala index 00730d7908..3bd07189e8 100644 --- a/akka-typed-tests/src/test/scala/akka/typed/persistence/scaladsl/PersistentActorSpec.scala +++ b/akka-typed-tests/src/test/scala/akka/typed/persistence/scaladsl/PersistentActorSpec.scala @@ -45,12 +45,12 @@ object PersistentActorSpec { PersistentActor.immutable[Command, Event, State]( persistenceId, initialState = State(0, Vector.empty), - actions = Actions[Command, Event, State]((ctx, cmd, state) ⇒ cmd match { + commandHandler = CommandHandler[Command, Event, State]((ctx, state, cmd) ⇒ cmd match { case Increment ⇒ - Persist(Incremented(1)) + Effect.persist(Incremented(1)) case GetValue(replyTo) ⇒ replyTo ! state - PersistNothing() + Effect.done case IncrementLater ⇒ // purpose is to test signals val delay = ctx.spawnAnonymous(Actor.withTimers[Tick.type] { timers ⇒ @@ -60,19 +60,19 @@ object PersistentActorSpec { }) }) ctx.watch(delay) - PersistNothing() + Effect.done case IncrementAfterReceiveTimeout ⇒ ctx.setReceiveTimeout(10.millis, Timeout) - PersistNothing() + Effect.done case Timeout ⇒ ctx.cancelReceiveTimeout() - Persist(Incremented(100)) + Effect.persist(Incremented(100)) }) .onSignal { - case (_, Terminated(_), _) ⇒ - Persist(Incremented(10)) + case (_, _, Terminated(_)) ⇒ + Effect.persist(Incremented(10)) }, - applyEvent = (evt, state) ⇒ evt match { + eventHandler = (state, evt) ⇒ evt match { case Incremented(delta) ⇒ State(state.value + delta, state.history :+ state.value) }) diff --git a/akka-typed/src/main/scala/akka/typed/persistence/internal/PersistentActorImpl.scala b/akka-typed/src/main/scala/akka/typed/persistence/internal/PersistentActorImpl.scala index 92c15d093f..1112ddc63e 100644 --- a/akka-typed/src/main/scala/akka/typed/persistence/internal/PersistentActorImpl.scala +++ b/akka-typed/src/main/scala/akka/typed/persistence/internal/PersistentActorImpl.scala @@ -50,9 +50,9 @@ import akka.typed.internal.adapter.ActorRefAdapter private var state: S = behavior.initialState - private val actions: Actions[C, E, S] = behavior.actions + private val commandHandler: CommandHandler[C, E, S] = behavior.commandHandler - private val eventHandler: (E, S) ⇒ S = behavior.applyEvent + private val eventHandler: (S, E) ⇒ S = behavior.eventHandler private val ctxAdapter = new ActorContextAdapter[C](context) private val ctx = ctxAdapter.asScala @@ -62,17 +62,17 @@ import akka.typed.internal.adapter.ActorRefAdapter state = snapshot.asInstanceOf[S] case RecoveryCompleted ⇒ - state = behavior.recoveryCompleted(ctx, state) + behavior.recoveryCompleted(ctx, state) case event: E @unchecked ⇒ state = applyEvent(state, event) } def applyEvent(s: S, event: E): S = - eventHandler.apply(event, s) + eventHandler.apply(s, event) - private val unhandledSignal: PartialFunction[(ActorContext[C], Signal, S), Effect[E, S]] = { - case sig ⇒ Unhandled() + private val unhandledSignal: PartialFunction[(ActorContext[C], S, Signal), Effect[E, S]] = { + case sig ⇒ Effect.unhandled } override def receiveCommand: Receive = { @@ -84,13 +84,13 @@ import akka.typed.internal.adapter.ActorRefAdapter val effects = msg match { case a.Terminated(ref) ⇒ val sig = Terminated(ActorRefAdapter(ref))(null) - actions.sigHandler(state).applyOrElse((ctx, sig, state), unhandledSignal) + commandHandler.sigHandler(state).applyOrElse((ctx, state, sig), unhandledSignal) case a.ReceiveTimeout ⇒ - actions.commandHandler(ctx, ctxAdapter.receiveTimeoutMsg, state) + commandHandler.commandHandler(ctx, state, ctxAdapter.receiveTimeoutMsg) // TODO note that PostStop and PreRestart signals are not handled, we wouldn't be able to persist there case cmd: C @unchecked ⇒ // FIXME we could make it more safe by using ClassTag for C - actions.commandHandler(ctx, cmd, state) + commandHandler.commandHandler(ctx, state, cmd) } applyEffects(msg, effects) @@ -121,16 +121,16 @@ import akka.typed.internal.adapter.ActorRefAdapter persistAll(scala.collection.immutable.Seq(events)) { _ ⇒ sideEffects.foreach(applySideEffect) } - case PersistNothing() ⇒ - case Unhandled() ⇒ + case _: PersistNothing.type @unchecked ⇒ + case _: Unhandled.type @unchecked ⇒ super.unhandled(msg) case c: ChainableEffect[_, S] ⇒ applySideEffect(c) } def applySideEffect(effect: ChainableEffect[_, S]): Unit = effect match { - case Stop() ⇒ context.stop(self) - case SideEffect(callbacks) ⇒ callbacks.apply(state) + case _: Stop.type @unchecked ⇒ context.stop(self) + case SideEffect(callbacks) ⇒ callbacks.apply(state) } } diff --git a/akka-typed/src/main/scala/akka/typed/persistence/scaladsl/PersistentActor.scala b/akka-typed/src/main/scala/akka/typed/persistence/scaladsl/PersistentActor.scala index 124009222c..a51bc6ef00 100644 --- a/akka-typed/src/main/scala/akka/typed/persistence/scaladsl/PersistentActor.scala +++ b/akka-typed/src/main/scala/akka/typed/persistence/scaladsl/PersistentActor.scala @@ -4,23 +4,23 @@ package akka.typed.persistence.scaladsl import scala.collection.{ immutable ⇒ im } -import akka.annotation.DoNotInherit -import akka.annotation.InternalApi +import akka.annotation.{ ApiMayChange, DoNotInherit, InternalApi } import akka.typed.Behavior.UntypedBehavior import akka.typed.Signal import akka.typed.persistence.internal.PersistentActorImpl import akka.typed.scaladsl.ActorContext object PersistentActor { + /** * Create a `Behavior` for a persistent actor. */ def immutable[Command, Event, State]( - persistenceId: String, - initialState: State, - actions: Actions[Command, Event, State], - applyEvent: (Event, State) ⇒ State): PersistentBehavior[Command, Event, State] = - persistentEntity(_ ⇒ persistenceId, initialState, actions, applyEvent) + persistenceId: String, + initialState: State, + commandHandler: CommandHandler[Command, Event, State], + eventHandler: (State, Event) ⇒ State): PersistentBehavior[Command, Event, State] = + persistentEntity(_ ⇒ persistenceId, initialState, commandHandler, eventHandler) /** * Create a `Behavior` for a persistent actor in Cluster Sharding, when the persistenceId is not known @@ -32,125 +32,186 @@ object PersistentActor { def persistentEntity[Command, Event, State]( persistenceIdFromActorName: String ⇒ String, initialState: State, - actions: Actions[Command, Event, State], - applyEvent: (Event, State) ⇒ State): PersistentBehavior[Command, Event, State] = - new PersistentBehavior(persistenceIdFromActorName, initialState, actions, applyEvent, - recoveryCompleted = (_, state) ⇒ state) + commandHandler: CommandHandler[Command, Event, State], + eventHandler: (State, Event) ⇒ State): PersistentBehavior[Command, Event, State] = + new PersistentBehavior(persistenceIdFromActorName, initialState, commandHandler, eventHandler, + recoveryCompleted = (_, _) ⇒ ()) - sealed abstract class Effect[+Event, State]() { + /** + * Factories for effects - how a persitent actor reacts on a command + */ + object Effect { + def persist[Event, State](event: Event): Effect[Event, State] = + new Persist[Event, State](event) + + def persistAll[Event, State](events: im.Seq[Event]): Effect[Event, State] = + new PersistAll[Event, State](events) + + def persistAll[Event, State](events: im.Seq[Event], sideEffects: im.Seq[ChainableEffect[Event, State]]): Effect[Event, State] = + new CompositeEffect[Event, State](Some(new PersistAll[Event, State](events)), sideEffects) + + /** + * Do not persist anything + */ + def done[Event, State]: Effect[Event, State] = PersistNothing.asInstanceOf[Effect[Event, State]] + + /** + * This command is not handled, but it is not an error that it isn't. + */ + def unhandled[Event, State]: Effect[Event, State] = Unhandled.asInstanceOf[Effect[Event, State]] + + /** + * Stop this persistent actor + */ + def stop[Event, State]: ChainableEffect[Event, State] = Stop.asInstanceOf[ChainableEffect[Event, State]] + + } + + /** + * Instances are created through the factories in the [[Effect]] companion object. + * + * Not for user extension. + */ + @DoNotInherit + sealed abstract class Effect[+Event, State] { /* All events that will be persisted in this effect */ def events: im.Seq[Event] = Nil /* All side effects that will be performed in this effect */ - def sideEffects: im.Seq[ChainableEffect[_, State]] = - if (isInstanceOf[ChainableEffect[_, State]]) im.Seq(asInstanceOf[ChainableEffect[_, State]]) - else Nil - - def andThen(sideEffects: ChainableEffect[_, State]*): Effect[Event, State] = - CompositeEffect(if (events.isEmpty) None else Some(this), sideEffects.toList) + def sideEffects[E >: Event]: im.Seq[ChainableEffect[E, State]] = Nil /** Convenience method to register a side effect with just a callback function */ def andThen(callback: State ⇒ Unit): Effect[Event, State] = - andThen(SideEffect[Event, State](callback)) + CompositeEffect(this, SideEffect[Event, State](callback)) /** Convenience method to register a side effect with just a lazy expression */ def andThen(callback: ⇒ Unit): Effect[Event, State] = - andThen(SideEffect[Event, State]((_: State) ⇒ callback)) + CompositeEffect(this, SideEffect[Event, State]((_: State) ⇒ callback)) + + /** The side effect is to stop the actor */ + def andThenStop: Effect[Event, State] = + CompositeEffect(this, Effect.stop[Event, State]) } - case class CompositeEffect[Event, State](persistingEffect: Option[Effect[Event, State]], override val sideEffects: im.Seq[ChainableEffect[_, State]]) extends Effect[Event, State] { + @InternalApi + private[akka] object CompositeEffect { + def apply[Event, State](effect: Effect[Event, State], sideEffects: ChainableEffect[Event, State]): Effect[Event, State] = + CompositeEffect[Event, State]( + if (effect.events.isEmpty) None else Some(effect), + sideEffects :: Nil) + } + + @InternalApi + private[akka] final case class CompositeEffect[Event, State]( + persistingEffect: Option[Effect[Event, State]], + _sideEffects: im.Seq[ChainableEffect[Event, State]]) extends Effect[Event, State] { override val events = persistingEffect.map(_.events).getOrElse(Nil) - override def andThen(additionalSideEffects: ChainableEffect[_, State]*): CompositeEffect[Event, State] = - copy(sideEffects = sideEffects ++ additionalSideEffects) - } - object CompositeEffect { - def apply[Event, State](persistAll: PersistAll[Event, State], sideEffects: im.Seq[ChainableEffect[_, State]]): CompositeEffect[Event, State] = - CompositeEffect(Some(persistAll), sideEffects) + + override def sideEffects[E >: Event]: im.Seq[ChainableEffect[E, State]] = _sideEffects.asInstanceOf[im.Seq[ChainableEffect[E, State]]] + } - case class PersistNothing[Event, State]() extends Effect[Event, State] + @InternalApi + private[akka] case object PersistNothing extends Effect[Nothing, Nothing] - case class Persist[Event, State](event: Event) extends Effect[Event, State] { + @InternalApi + private[akka] case class Persist[Event, State](event: Event) extends Effect[Event, State] { override val events = event :: Nil } - case class PersistAll[Event, State](override val events: im.Seq[Event]) extends Effect[Event, State] - - trait ChainableEffect[Event, State] { - self: Effect[Event, State] ⇒ - } - case class SideEffect[Event, State](effect: State ⇒ Unit) extends Effect[Event, State] with ChainableEffect[Event, State] - case class Stop[Event, State]() extends Effect[Event, State] with ChainableEffect[Event, State]() - - case class Unhandled[Event, State]() extends Effect[Event, State] - - type CommandHandler[Command, Event, State] = Function3[ActorContext[Command], Command, State, Effect[Event, State]] - type SignalHandler[Command, Event, State] = PartialFunction[(ActorContext[Command], Signal, State), Effect[Event, State]] + @InternalApi + private[akka] case class PersistAll[Event, State](override val events: im.Seq[Event]) extends Effect[Event, State] /** - * `Actions` defines command handlers and partial function for other signals, + * Not for user extension + */ + @DoNotInherit + sealed abstract class ChainableEffect[Event, State] extends Effect[Event, State] + @InternalApi + private[akka] case class SideEffect[Event, State](effect: State ⇒ Unit) extends ChainableEffect[Event, State] + @InternalApi + private[akka] case object Stop extends ChainableEffect[Nothing, Nothing] + + @InternalApi + private[akka] case object Unhandled extends Effect[Nothing, Nothing] + + type CommandToEffect[Command, Event, State] = (ActorContext[Command], State, Command) ⇒ Effect[Event, State] + type SignalHandler[Command, Event, State] = PartialFunction[(ActorContext[Command], State, Signal), Effect[Event, State]] + + /** + * The `CommandHandler` defines how to act on commands and partial function for other signals, * e.g. `Termination` messages if `watch` is used. * - * Note that you can have different actions based on current state by using - * [[Actions#byState]]. + * Note that you can have different command handlers based on current state by using + * [[CommandHandler#byState]]. */ - object Actions { - def apply[Command, Event, State](commandHandler: CommandHandler[Command, Event, State]): Actions[Command, Event, State] = - new Actions(commandHandler, Map.empty) + object CommandHandler { + + /** + * Create a command handler that will be applied for commands. + * + * @see [[Effect]] for possible effects of a command. + */ + // Note: using full parameter type instead of type aliase here to make API more straight forward to figure out in an IDE + def apply[Command, Event, State](commandHandler: (ActorContext[Command], State, Command) ⇒ Effect[Event, State]): CommandHandler[Command, Event, State] = + new CommandHandler(commandHandler, Map.empty) /** * Convenience for simple commands that don't need the state and context. + * + * @see [[Effect]] for possible effects of a command. */ - def command[Command, Event, State](commandHandler: Command ⇒ Effect[Event, State]): Actions[Command, Event, State] = - apply((_, cmd, _) ⇒ commandHandler(cmd)) + def command[Command, Event, State](commandHandler: Command ⇒ Effect[Event, State]): CommandHandler[Command, Event, State] = + apply((_, _, cmd) ⇒ commandHandler(cmd)) /** - * Select different actions based on current state. + * Select different command handlers based on current state. */ - def byState[Command, Event, State](choice: State ⇒ Actions[Command, Event, State]): Actions[Command, Event, State] = - new ByStateActions(choice, signalHandler = PartialFunction.empty) + def byState[Command, Event, State](choice: State ⇒ CommandHandler[Command, Event, State]): CommandHandler[Command, Event, State] = + new ByStateCommandHandler(choice, signalHandler = PartialFunction.empty) } /** * INTERNAL API */ - @InternalApi private[akka] final class ByStateActions[Command, Event, State]( - choice: State ⇒ Actions[Command, Event, State], + @InternalApi private[akka] final class ByStateCommandHandler[Command, Event, State]( + choice: State ⇒ CommandHandler[Command, Event, State], signalHandler: SignalHandler[Command, Event, State]) - extends Actions[Command, Event, State]( - commandHandler = (ctx, cmd, state) ⇒ choice(state).commandHandler(ctx, cmd, state), + extends CommandHandler[Command, Event, State]( + commandHandler = (ctx, state, cmd) ⇒ choice(state).commandHandler(ctx, state, cmd), signalHandler) { // SignalHandler may be registered in the wrapper or in the wrapped private[akka] override def sigHandler(state: State): SignalHandler[Command, Event, State] = choice(state).sigHandler(state).orElse(signalHandler) - // override to preserve the ByStateActions + // override to preserve the ByStateCommandHandler private[akka] override def withSignalHandler( - handler: SignalHandler[Command, Event, State]): Actions[Command, Event, State] = - new ByStateActions(choice, handler) + handler: SignalHandler[Command, Event, State]): CommandHandler[Command, Event, State] = + new ByStateCommandHandler(choice, handler) } /** - * `Actions` defines command handlers and partial function for other signals, + * `CommandHandler` defines command handlers and partial function for other signals, * e.g. `Termination` messages if `watch` is used. - * `Actions` is an immutable class. + * `CommandHandler` is an immutable class. */ - @DoNotInherit class Actions[Command, Event, State] private[akka] ( - val commandHandler: CommandHandler[Command, Event, State], + @DoNotInherit class CommandHandler[Command, Event, State] private[akka] ( + val commandHandler: CommandToEffect[Command, Event, State], val signalHandler: SignalHandler[Command, Event, State]) { @InternalApi private[akka] def sigHandler(state: State): SignalHandler[Command, Event, State] = signalHandler - def onSignal(handler: SignalHandler[Command, Event, State]): Actions[Command, Event, State] = + // Note: using full parameter type instead of type alias here to make API more straight forward to figure out in an IDE + def onSignal(handler: PartialFunction[(ActorContext[Command], State, Signal), Effect[Event, State]]): CommandHandler[Command, Event, State] = withSignalHandler(signalHandler.orElse(handler)) /** INTERNAL API */ @InternalApi private[akka] def withSignalHandler( - handler: SignalHandler[Command, Event, State]): Actions[Command, Event, State] = - new Actions(commandHandler, handler) + handler: SignalHandler[Command, Event, State]): CommandHandler[Command, Event, State] = + new CommandHandler(commandHandler, handler) } @@ -159,9 +220,9 @@ object PersistentActor { class PersistentBehavior[Command, Event, State]( @InternalApi private[akka] val persistenceIdFromActorName: String ⇒ String, val initialState: State, - val actions: PersistentActor.Actions[Command, Event, State], - val applyEvent: (Event, State) ⇒ State, - val recoveryCompleted: (ActorContext[Command], State) ⇒ State) extends UntypedBehavior[Command] { + val commandHandler: PersistentActor.CommandHandler[Command, Event, State], + val eventHandler: (State, Event) ⇒ State, + val recoveryCompleted: (ActorContext[Command], State) ⇒ Unit) extends UntypedBehavior[Command] { import PersistentActor._ /** INTERNAL API */ @@ -171,7 +232,7 @@ class PersistentBehavior[Command, Event, State]( * The `callback` function is called to notify the actor that the recovery process * is finished. */ - def onRecoveryCompleted(callback: (ActorContext[Command], State) ⇒ State): PersistentBehavior[Command, Event, State] = + def onRecoveryCompleted(callback: (ActorContext[Command], State) ⇒ Unit): PersistentBehavior[Command, Event, State] = copy(recoveryCompleted = callback) /** @@ -185,10 +246,10 @@ class PersistentBehavior[Command, Event, State]( def snapshotOn(predicate: (State, Event) ⇒ Boolean): PersistentBehavior[Command, Event, State] = ??? private def copy( - persistenceIdFromActorName: String ⇒ String = persistenceIdFromActorName, - initialState: State = initialState, - actions: Actions[Command, Event, State] = actions, - applyEvent: (Event, State) ⇒ State = applyEvent, - recoveryCompleted: (ActorContext[Command], State) ⇒ State = recoveryCompleted): PersistentBehavior[Command, Event, State] = - new PersistentBehavior(persistenceIdFromActorName, initialState, actions, applyEvent, recoveryCompleted) + persistenceIdFromActorName: String ⇒ String = persistenceIdFromActorName, + initialState: State = initialState, + commandHandler: CommandHandler[Command, Event, State] = commandHandler, + eventHandler: (State, Event) ⇒ State = eventHandler, + recoveryCompleted: (ActorContext[Command], State) ⇒ Unit = recoveryCompleted): PersistentBehavior[Command, Event, State] = + new PersistentBehavior(persistenceIdFromActorName, initialState, commandHandler, eventHandler, recoveryCompleted) }