diff --git a/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/ClusterShardingPersistenceSpec.scala b/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/ClusterShardingPersistenceSpec.scala index ac9821f92c..27dc19a778 100644 --- a/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/ClusterShardingPersistenceSpec.scala +++ b/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/ClusterShardingPersistenceSpec.scala @@ -42,13 +42,13 @@ object ClusterShardingPersistenceSpec { PersistentActor.persistentEntity[Command, String, String]( persistenceIdFromActorName = name ⇒ "Test-" + name, initialState = "", - commandHandler = CommandHandler((_, state, cmd) ⇒ cmd match { + commandHandler = (_, state, cmd) ⇒ cmd match { case Add(s) ⇒ Effect.persist(s) case Get(replyTo) ⇒ replyTo ! state Effect.none case StopPlz ⇒ Effect.stop - }), + }, eventHandler = (state, evt) ⇒ if (state.isEmpty) evt else state + "|" + evt) val typeKey = EntityTypeKey[Command]("test") diff --git a/akka-cluster-typed/src/test/scala/akka/cluster/typed/ClusterSingletonPersistenceSpec.scala b/akka-cluster-typed/src/test/scala/akka/cluster/typed/ClusterSingletonPersistenceSpec.scala index 470a20bdb5..cdafce169e 100644 --- a/akka-cluster-typed/src/test/scala/akka/cluster/typed/ClusterSingletonPersistenceSpec.scala +++ b/akka-cluster-typed/src/test/scala/akka/cluster/typed/ClusterSingletonPersistenceSpec.scala @@ -40,13 +40,13 @@ object ClusterSingletonPersistenceSpec { PersistentActor.immutable[Command, String, String]( persistenceId = "TheSingleton", initialState = "", - commandHandler = CommandHandler((_, state, cmd) ⇒ cmd match { + commandHandler = (_, state, cmd) ⇒ cmd match { case Add(s) ⇒ Effect.persist(s) case Get(replyTo) ⇒ replyTo ! state Effect.none case StopPlz ⇒ Effect.stop - }), + }, eventHandler = (state, evt) ⇒ if (state.isEmpty) evt else state + "|" + evt) } diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/PersistentActorImpl.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/PersistentActorImpl.scala index c5fd65649f..365344b4e3 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/PersistentActorImpl.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/PersistentActorImpl.scala @@ -80,15 +80,12 @@ import akka.actor.typed.internal.adapter.ActorRefAdapter case msg ⇒ try { val effects = msg match { - case a.Terminated(ref) ⇒ - val sig = Terminated(ActorRefAdapter(ref))(null) - commandHandler.sigHandler(state).applyOrElse((ctx, state, sig), unhandledSignal) case a.ReceiveTimeout ⇒ - commandHandler.commandHandler(ctx, state, ctxAdapter.receiveTimeoutMsg) - // TODO note that PostStop and PreRestart signals are not handled, we wouldn't be able to persist there + commandHandler(ctx, state, ctxAdapter.receiveTimeoutMsg) + // TODO note that PostStop, PreRestart and Terminated signals are not handled, we wouldn't be able to persist there case cmd: C @unchecked ⇒ // FIXME we could make it more safe by using ClassTag for C - commandHandler.commandHandler(ctx, state, cmd) + commandHandler(ctx, state, cmd) } applyEffects(msg, effects) diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/PersistentActor.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/PersistentActor.scala index 685f89a48e..1ea64736eb 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/PersistentActor.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/PersistentActor.scala @@ -4,9 +4,8 @@ package akka.persistence.typed.scaladsl import scala.collection.{ immutable ⇒ im } -import akka.annotation.{ ApiMayChange, DoNotInherit, InternalApi } +import akka.annotation.{ DoNotInherit, InternalApi } import akka.actor.typed.Behavior.UntypedBehavior -import akka.actor.typed.Signal import akka.persistence.typed.internal.PersistentActorImpl import akka.actor.typed.scaladsl.ActorContext @@ -144,40 +143,29 @@ object PersistentActor { @InternalApi private[akka] case object Unhandled extends Effect[Nothing, Nothing] - type CommandToEffect[Command, Event, State] = (ActorContext[Command], State, Command) ⇒ Effect[Event, State] - type SignalHandler[Command, Event, State] = PartialFunction[(ActorContext[Command], State, Signal), Effect[Event, State]] + type CommandHandler[Command, Event, State] = (ActorContext[Command], State, Command) ⇒ Effect[Event, State] /** - * The `CommandHandler` defines how to act on commands and partial function for other signals, - * e.g. `Termination` messages if `watch` is used. + * The `CommandHandler` defines how to act on commands. * * Note that you can have different command handlers based on current state by using * [[CommandHandler#byState]]. */ object CommandHandler { - /** - * Create a command handler that will be applied for commands. - * - * @see [[Effect]] for possible effects of a command. - */ - // Note: using full parameter type instead of type aliase here to make API more straight forward to figure out in an IDE - def apply[Command, Event, State](commandHandler: (ActorContext[Command], State, Command) ⇒ Effect[Event, State]): CommandHandler[Command, Event, State] = - new CommandHandler(commandHandler, Map.empty) - /** * Convenience for simple commands that don't need the state and context. * * @see [[Effect]] for possible effects of a command. */ def command[Command, Event, State](commandHandler: Command ⇒ Effect[Event, State]): CommandHandler[Command, Event, State] = - apply((_, _, cmd) ⇒ commandHandler(cmd)) + (_, _, cmd) ⇒ commandHandler(cmd) /** * Select different command handlers based on current state. */ def byState[Command, Event, State](choice: State ⇒ CommandHandler[Command, Event, State]): CommandHandler[Command, Event, State] = - new ByStateCommandHandler(choice, signalHandler = PartialFunction.empty) + new ByStateCommandHandler(choice) } @@ -185,46 +173,13 @@ object PersistentActor { * INTERNAL API */ @InternalApi private[akka] final class ByStateCommandHandler[Command, Event, State]( - choice: State ⇒ CommandHandler[Command, Event, State], - signalHandler: SignalHandler[Command, Event, State]) - extends CommandHandler[Command, Event, State]( - commandHandler = (ctx, state, cmd) ⇒ choice(state).commandHandler(ctx, state, cmd), - signalHandler) { + choice: State ⇒ CommandHandler[Command, Event, State]) + extends CommandHandler[Command, Event, State] { - // SignalHandler may be registered in the wrapper or in the wrapped - private[akka] override def sigHandler(state: State): SignalHandler[Command, Event, State] = - choice(state).sigHandler(state).orElse(signalHandler) - - // override to preserve the ByStateCommandHandler - private[akka] override def withSignalHandler( - handler: SignalHandler[Command, Event, State]): CommandHandler[Command, Event, State] = - new ByStateCommandHandler(choice, handler) + override def apply(ctx: ActorContext[Command], state: State, cmd: Command): Effect[Event, State] = + choice(state)(ctx, state, cmd) } - - /** - * `CommandHandler` defines command handlers and partial function for other signals, - * e.g. `Termination` messages if `watch` is used. - * `CommandHandler` is an immutable class. - */ - @DoNotInherit class CommandHandler[Command, Event, State] private[akka] ( - val commandHandler: CommandToEffect[Command, Event, State], - val signalHandler: SignalHandler[Command, Event, State]) { - - @InternalApi private[akka] def sigHandler(state: State): SignalHandler[Command, Event, State] = - signalHandler - - // Note: using full parameter type instead of type alias here to make API more straight forward to figure out in an IDE - def onSignal(handler: PartialFunction[(ActorContext[Command], State, Signal), Effect[Event, State]]): CommandHandler[Command, Event, State] = - withSignalHandler(signalHandler.orElse(handler)) - - /** INTERNAL API */ - @InternalApi private[akka] def withSignalHandler( - handler: SignalHandler[Command, Event, State]): CommandHandler[Command, Event, State] = - new CommandHandler(commandHandler, handler) - - } - } class PersistentBehavior[Command, Event, State]( diff --git a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentActorCompileOnlyTest.scala b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentActorCompileOnlyTest.scala index 0b6698f6a2..6837d3512b 100644 --- a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentActorCompileOnlyTest.scala +++ b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentActorCompileOnlyTest.scala @@ -111,14 +111,14 @@ object PersistentActorCompileOnlyTest { initialState = EventsInFlight(0, Map.empty), - commandHandler = CommandHandler((ctx, state, cmd) ⇒ cmd match { + commandHandler = (ctx, state, cmd) ⇒ cmd match { case DoSideEffect(data) ⇒ Effect.persist(IntentRecorded(state.nextCorrelationId, data)).andThen { performSideEffect(ctx.self, state.nextCorrelationId, data) } case AcknowledgeSideEffect(correlationId) ⇒ Effect.persist(SideEffectAcknowledged(correlationId)) - }), + }, eventHandler = (state, evt) ⇒ evt match { case IntentRecorded(correlationId, data) ⇒ @@ -220,7 +220,7 @@ object PersistentActorCompileOnlyTest { PersistentActor.immutable[Command, Event, State]( persistenceId = "asdf", initialState = State(Nil), - commandHandler = CommandHandler((ctx, _, cmd) ⇒ cmd match { + commandHandler = (ctx, _, cmd) ⇒ cmd match { case RegisterTask(task) ⇒ Effect.persist(TaskRegistered(task)) .andThen { @@ -229,42 +229,6 @@ object PersistentActorCompileOnlyTest { ctx.watchWith(child, TaskDone(task)) } case TaskDone(task) ⇒ Effect.persist(TaskRemoved(task)) - }), - eventHandler = (state, evt) ⇒ evt match { - case TaskRegistered(task) ⇒ State(task :: state.tasksInFlight) - case TaskRemoved(task) ⇒ State(state.tasksInFlight.filter(_ != task)) - }) - } - - object UsingSignals { - type Task = String - case class RegisterTask(task: Task) - - sealed trait Event - case class TaskRegistered(task: Task) extends Event - case class TaskRemoved(task: Task) extends Event - - case class State(tasksInFlight: List[Task]) - - def worker(task: Task): Behavior[Nothing] = ??? - - PersistentActor.immutable[RegisterTask, Event, State]( - persistenceId = "asdf", - initialState = State(Nil), - // The 'onSignal' seems to break type inference here.. not sure if that can be avoided? - commandHandler = CommandHandler[RegisterTask, Event, State]((ctx, state, cmd) ⇒ cmd match { - case RegisterTask(task) ⇒ Effect.persist(TaskRegistered(task)) - .andThen { - val child = ctx.spawn[Nothing](worker(task), task) - // This assumes *any* termination of the child may trigger a `TaskDone`: - ctx.watch(child) - } - }).onSignal { - case (ctx, _, Terminated(actorRef)) ⇒ - // watchWith (as in the above example) is nicer because it means we don't have to - // 'manually' associate the task and the child actor, but we wanted to demonstrate - // signals here: - Effect.persist(TaskRemoved(actorRef.path.name)) }, eventHandler = (state, evt) ⇒ evt match { case TaskRegistered(task) ⇒ State(task :: state.tasksInFlight) @@ -321,7 +285,7 @@ object PersistentActorCompileOnlyTest { initialState = Nil, commandHandler = CommandHandler.byState(state ⇒ - if (isFullyHydrated(basket, state)) CommandHandler { (ctx, state, cmd) ⇒ + if (isFullyHydrated(basket, state)) (ctx, state, cmd) ⇒ cmd match { case AddItem(id) ⇒ addItem(id, ctx.self) case RemoveItem(id) ⇒ Effect.persist(ItemRemoved(id)) @@ -332,8 +296,7 @@ object PersistentActorCompileOnlyTest { sender ! basket.items.map(_.price).sum Effect.none } - } - else CommandHandler { (ctx, state, cmd) ⇒ + else (ctx, state, cmd) ⇒ cmd match { case AddItem(id) ⇒ addItem(id, ctx.self) case RemoveItem(id) ⇒ Effect.persist(ItemRemoved(id)) @@ -348,7 +311,7 @@ object PersistentActorCompileOnlyTest { stash :+= cmd Effect.none } - }), + ), eventHandler = (state, evt) ⇒ evt match { case ItemAdded(id) ⇒ id +: state case ItemRemoved(id) ⇒ state.filter(_ != id) @@ -382,7 +345,7 @@ object PersistentActorCompileOnlyTest { PersistentActor.immutable[Command, Event, Mood]( persistenceId = "myPersistenceId", initialState = Sad, - commandHandler = CommandHandler { (_, state, cmd) ⇒ + commandHandler = (_, state, cmd) ⇒ cmd match { case Greet(whom) ⇒ println(s"Hi there, I'm $state!") @@ -398,8 +361,7 @@ object PersistentActorCompileOnlyTest { val commonEffects = changeMoodIfNeeded(state, Happy) Effect.persist(commonEffects.events :+ Remembered(memory), commonEffects.sideEffects) - } - }, + }, eventHandler = { case (_, MoodChanged(to)) ⇒ to case (state, Remembered(_)) ⇒ state diff --git a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentActorSpec.scala b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentActorSpec.scala index 3024c442da..e8797203df 100644 --- a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentActorSpec.scala +++ b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentActorSpec.scala @@ -28,6 +28,7 @@ object PersistentActorSpec { final case object DoNothingAndThenLog extends Command final case object EmptyEventsListAndThenLog extends Command final case class GetValue(replyTo: ActorRef[State]) extends Command + final case object DelayFinished extends Command private case object Timeout extends Command sealed trait Event @@ -48,7 +49,7 @@ object PersistentActorSpec { PersistentActor.immutable[Command, Event, State]( persistenceId, initialState = State(0, Vector.empty), - commandHandler = CommandHandler[Command, Event, State]((ctx, state, cmd) ⇒ cmd match { + commandHandler = (ctx, state, cmd) ⇒ cmd match { case Increment ⇒ Effect.persist(Incremented(1)) case GetValue(replyTo) ⇒ @@ -62,8 +63,10 @@ object PersistentActorSpec { case Tick ⇒ Actor.stopped }) }) - ctx.watch(delay) + ctx.watchWith(delay, DelayFinished) Effect.none + case DelayFinished ⇒ + Effect.persist(Incremented(10)) case IncrementAfterReceiveTimeout ⇒ ctx.setReceiveTimeout(10.millis, Timeout) Effect.none @@ -94,11 +97,7 @@ object PersistentActorSpec { .andThen { loggingActor ! firstLogging } - }) - .onSignal { - case (_, _, Terminated(_)) ⇒ - Effect.persist(Incremented(10)) - }, + }, eventHandler = (state, evt) ⇒ evt match { case Incremented(delta) ⇒ State(state.value + delta, state.history :+ state.value) diff --git a/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BasicPersistentActorSpec.scala b/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BasicPersistentActorSpec.scala index b1162fecce..9678307656 100644 --- a/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BasicPersistentActorSpec.scala +++ b/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BasicPersistentActorSpec.scala @@ -17,7 +17,7 @@ object BasicPersistentActorSpec { PersistentActor.immutable[Command, Event, State]( persistenceId = "abc", initialState = State(), - commandHandler = PersistentActor.CommandHandler { (ctx, state, cmd) ⇒ ??? }, + commandHandler = (ctx, state, cmd) ⇒ ???, eventHandler = (state, evt) ⇒ ???) //#structure @@ -26,7 +26,7 @@ object BasicPersistentActorSpec { PersistentActor.immutable[Command, Event, State]( persistenceId = "abc", initialState = State(), - commandHandler = PersistentActor.CommandHandler { (ctx, state, cmd) ⇒ ??? }, + commandHandler = (ctx, state, cmd) ⇒ ???, eventHandler = (state, evt) ⇒ ???) .onRecoveryCompleted { (ctx, state) ⇒ ??? diff --git a/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/InDepthPersistentActorSpec.scala b/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/InDepthPersistentActorSpec.scala index 7bf2264b3e..b3b5ed1bc5 100644 --- a/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/InDepthPersistentActorSpec.scala +++ b/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/InDepthPersistentActorSpec.scala @@ -51,7 +51,7 @@ object InDepthPersistentActorSpec { //#initial-command-handler private def initial: CommandHandler[BlogCommand, BlogEvent, BlogState] = - CommandHandler { (ctx, state, cmd) ⇒ + (ctx, state, cmd) ⇒ cmd match { case AddPost(content, replyTo) ⇒ val evt = PostAdded(content.postId, content) @@ -64,12 +64,11 @@ object InDepthPersistentActorSpec { case _ ⇒ Effect.unhandled } - } //#initial-command-handler //#post-added-command-handler private def postAdded: CommandHandler[BlogCommand, BlogEvent, BlogState] = { - CommandHandler { (ctx, state, cmd) ⇒ + (ctx, state, cmd) ⇒ cmd match { case ChangeBody(newBody, replyTo) ⇒ val evt = BodyChanged(state.postId, newBody) @@ -89,7 +88,6 @@ object InDepthPersistentActorSpec { case PassivatePost ⇒ Effect.stop } - } } //#post-added-command-handler