diff --git a/akka-typed-tests/src/test/scala/akka/typed/scaladsl/persistence/ApiTest.scala b/akka-typed-tests/src/test/scala/akka/typed/persistence/scaladsl/PersistentActorCompileOnlyTest.scala similarity index 66% rename from akka-typed-tests/src/test/scala/akka/typed/scaladsl/persistence/ApiTest.scala rename to akka-typed-tests/src/test/scala/akka/typed/persistence/scaladsl/PersistentActorCompileOnlyTest.scala index 3005507be4..ae71f11c65 100644 --- a/akka-typed-tests/src/test/scala/akka/typed/scaladsl/persistence/ApiTest.scala +++ b/akka-typed-tests/src/test/scala/akka/typed/persistence/scaladsl/PersistentActorCompileOnlyTest.scala @@ -1,78 +1,21 @@ -package akka.typed.scaladsl.persistence - -import akka.typed - -import scala.concurrent.duration._ -import akka.typed.{ ActorRef, Behavior, ExtensibleBehavior, Signal, Terminated } -import akka.typed.scaladsl.{ ActorContext, TimerScheduler } +/** + * Copyright (C) 2017 Lightbend Inc. + */ +package akka.typed.persistence.scaladsl import scala.concurrent.ExecutionContext -import scala.reflect.ClassTag +import scala.concurrent.duration._ -class ApiTest { - object TypedPersistentActor { +import akka.typed.ActorRef +import akka.typed.Behavior +import akka.typed.Terminated +import akka.typed.scaladsl.Actor +import akka.typed.scaladsl.ActorContext +import akka.typed.scaladsl.TimerScheduler - sealed abstract class PersistentEffect[+Event, State]() { - def andThen(callback: State ⇒ Unit): PersistentEffect[Event, State] - } +object PersistentActorCompileOnlyTest { - case class PersistNothing[Event, State](callbacks: List[State ⇒ Unit] = Nil) extends PersistentEffect[Event, State] { - def andThen(callback: State ⇒ Unit) = copy(callbacks = callback :: callbacks) - } - - case class Persist[Event, State](event: Event, callbacks: List[State ⇒ Unit] = Nil) extends PersistentEffect[Event, State] { - def andThen(callback: State ⇒ Unit) = copy(callbacks = callback :: callbacks) - } - - case class Unhandled[Event, State](callbacks: List[State ⇒ Unit] = Nil) extends PersistentEffect[Event, State] { - def andThen(callback: State ⇒ Unit) = copy(callbacks = callback :: callbacks) - } - - class ActionHandler[Command: ClassTag, Event, State](val handler: ((Any, State, ActorContext[Command]) ⇒ PersistentEffect[Event, State])) { - def onSignal(signalHandler: PartialFunction[(Any, State, ActorContext[Command]), PersistentEffect[Event, State]]): ActionHandler[Command, Event, State] = - ActionHandler { - case (command: Command, state, ctx) ⇒ handler(command, state, ctx) - case (signal: Signal, state, ctx) ⇒ signalHandler.orElse(unhandledSignal).apply((signal, state, ctx)) - case _ ⇒ Unhandled() - } - private val unhandledSignal: PartialFunction[(Any, State, ActorContext[Command]), PersistentEffect[Event, State]] = { case _ ⇒ Unhandled() } - } - object ActionHandler { - def cmd[Command: ClassTag, Event, State](commandHandler: Command ⇒ PersistentEffect[Event, State]): ActionHandler[Command, Event, State] = ??? - def apply[Command: ClassTag, Event, State](commandHandler: ((Command, State, ActorContext[Command]) ⇒ PersistentEffect[Event, State])): ActionHandler[Command, Event, State] = ??? - def byState[Command: ClassTag, Event, State](actionHandler: State ⇒ ActionHandler[Command, Event, State]): ActionHandler[Command, Event, State] = - new ActionHandler(handler = { - case (action, state, ctx) ⇒ actionHandler(state).handler(action, state, ctx) - }) - } - } - - object Actor { - import TypedPersistentActor._ - - class PersistentBehavior[Command, Event, State] extends ExtensibleBehavior[Command] { - override def receiveSignal(ctx: typed.ActorContext[Command], msg: Signal): Behavior[Command] = ??? - override def receiveMessage(ctx: typed.ActorContext[Command], msg: Command): Behavior[Command] = ??? - - def onRecoveryComplete(callback: (ActorContext[Command], State) ⇒ Unit): PersistentBehavior[Command, Event, State] = ??? - def snapshotOnState(predicate: State ⇒ Boolean): PersistentBehavior[Command, Event, State] = ??? - def snapshotOn(predicate: (State, Event) ⇒ Boolean): PersistentBehavior[Command, Event, State] = ??? - } - - def persistent[Command, Event, State]( - persistenceId: String, - initialState: State, - commandHandler: ActionHandler[Command, Event, State], - onEvent: (Event, State) ⇒ State - ): PersistentBehavior[Command, Event, State] = ??? - } - - import TypedPersistentActor._ - - import akka.typed.scaladsl.AskPattern._ - implicit val timeout: akka.util.Timeout = 1.second - implicit val scheduler: akka.actor.Scheduler = ??? - implicit val ec: ExecutionContext = ??? + import akka.typed.persistence.scaladsl.PersistentActor._ object Simple { sealed trait MyCommand @@ -83,19 +26,18 @@ class ApiTest { case class ExampleState(events: List[String] = Nil) - Actor.persistent[MyCommand, MyEvent, ExampleState]( + PersistentActor.persistent[MyCommand, MyEvent, ExampleState]( persistenceId = "sample-id-1", initialState = ExampleState(Nil), - commandHandler = ActionHandler.cmd { + actions = Actions.command { case Cmd(data) ⇒ Persist(Evt(data)) }, onEvent = { case (Evt(data), state) ⇒ state.copy(data :: state.events) - } - ) + }) } object WithAck { @@ -109,12 +51,12 @@ class ApiTest { case class ExampleState(events: List[String] = Nil) - Actor.persistent[MyCommand, MyEvent, ExampleState]( + PersistentActor.persistent[MyCommand, MyEvent, ExampleState]( persistenceId = "sample-id-1", initialState = ExampleState(Nil), - commandHandler = ActionHandler.cmd { + actions = Actions.command { case Cmd(data, sender) ⇒ Persist[MyEvent, ExampleState](Evt(data)) .andThen { _ ⇒ { sender ! Ack } } @@ -122,8 +64,7 @@ class ApiTest { onEvent = { case (Evt(data), state) ⇒ state.copy(data :: state.events) - } - ) + }) } object RecoveryComplete { @@ -142,17 +83,22 @@ class ApiTest { val sideEffectProcessor: ActorRef[Request] = ??? def performSideEffect(sender: ActorRef[AcknowledgeSideEffect], correlationId: Int, data: String) = { + import akka.typed.scaladsl.AskPattern._ + implicit val timeout: akka.util.Timeout = 1.second + implicit val scheduler: akka.actor.Scheduler = ??? + implicit val ec: ExecutionContext = ??? + (sideEffectProcessor ? (Request(correlationId, data, _: ActorRef[Response]))) .map(response ⇒ AcknowledgeSideEffect(response.correlationId)) .foreach(sender ! _) } - Actor.persistent[Command, Event, EventsInFlight]( + PersistentActor.persistent[Command, Event, EventsInFlight]( persistenceId = "recovery-complete-id", initialState = EventsInFlight(0, Map.empty), - commandHandler = ActionHandler((cmd, state, ctx) ⇒ cmd match { + actions = Actions((cmd, state, ctx) ⇒ cmd match { case DoSideEffect(data) ⇒ Persist[Event, EventsInFlight](IntentRecorded(state.nextCorrelationId, data)).andThen { _ ⇒ performSideEffect(ctx.self, state.nextCorrelationId, data) @@ -168,17 +114,17 @@ class ApiTest { dataByCorrelationId = state.dataByCorrelationId + (correlationId → data)) case SideEffectAcknowledged(correlationId) ⇒ state.copy(dataByCorrelationId = state.dataByCorrelationId - correlationId) - }).onRecoveryComplete { - case (ctx, state) ⇒ { + }).onRecoveryCompleted { + case (state, ctx) ⇒ { state.dataByCorrelationId.foreach { case (correlationId, data) ⇒ performSideEffect(ctx.self, correlationId, data) } + state } } } - // Example with 'become' object Become { sealed trait Mood case object Happy extends Mood @@ -191,17 +137,17 @@ class ApiTest { sealed trait Event case class MoodChanged(to: Mood) extends Event - val b: Behavior[Command] = Actor.persistent[Command, Event, Mood]( + val b: Behavior[Command] = PersistentActor.persistent[Command, Event, Mood]( persistenceId = "myPersistenceId", initialState = Happy, - commandHandler = ActionHandler.byState { - case Happy ⇒ ActionHandler.cmd { + actions = Actions.byState { + case Happy ⇒ Actions.command { case Greet(whom) ⇒ println(s"Super happy to meet you $whom!") PersistNothing() case MoodSwing ⇒ Persist(MoodChanged(Sad)) } - case Sad ⇒ ActionHandler.cmd { + case Sad ⇒ Actions.command { case Greet(whom) ⇒ println(s"hi $whom") PersistNothing() @@ -210,16 +156,15 @@ class ApiTest { }, onEvent = { case (MoodChanged(to), _) ⇒ to - } - ) + }) - akka.typed.scaladsl.Actor.withTimers((timers: TimerScheduler[Command]) ⇒ { + // FIXME this doesn't work, wrapping is not supported + Actor.withTimers((timers: TimerScheduler[Command]) ⇒ { timers.startPeriodicTimer("swing", MoodSwing, 10.seconds) b }) } - // explicit snapshots object ExplicitSnapshots { type Task = String @@ -233,18 +178,17 @@ class ApiTest { case class State(tasksInFlight: List[Task]) - Actor.persistent[Command, Event, State]( + PersistentActor.persistent[Command, Event, State]( persistenceId = "asdf", initialState = State(Nil), - commandHandler = ActionHandler.cmd { + actions = Actions.command { case RegisterTask(task) ⇒ Persist(TaskRegistered(task)) case TaskDone(task) ⇒ Persist(TaskRemoved(task)) }, onEvent = (evt, state) ⇒ evt match { case TaskRegistered(task) ⇒ State(task :: state.tasksInFlight) case TaskRemoved(task) ⇒ State(state.tasksInFlight.filter(_ != task)) - } - ).snapshotOnState(_.tasksInFlight.isEmpty) + }).snapshotOnState(_.tasksInFlight.isEmpty) } object SpawnChild { @@ -261,10 +205,10 @@ class ApiTest { def worker(task: Task): Behavior[Nothing] = ??? - Actor.persistent[Command, Event, State]( + PersistentActor.persistent[Command, Event, State]( persistenceId = "asdf", initialState = State(Nil), - commandHandler = ActionHandler((cmd, _, ctx) ⇒ cmd match { + actions = Actions((cmd, _, ctx) ⇒ cmd match { case RegisterTask(task) ⇒ Persist[Event, State](TaskRegistered(task)) .andThen { _ ⇒ val child = ctx.spawn[Nothing](worker(task), task) @@ -276,8 +220,7 @@ class ApiTest { onEvent = (evt, state) ⇒ evt match { case TaskRegistered(task) ⇒ State(task :: state.tasksInFlight) case TaskRemoved(task) ⇒ State(state.tasksInFlight.filter(_ != task)) - } - ) + }) } object UsingSignals { @@ -292,11 +235,11 @@ class ApiTest { def worker(task: Task): Behavior[Nothing] = ??? - Actor.persistent[RegisterTask, Event, State]( + PersistentActor.persistent[RegisterTask, Event, State]( persistenceId = "asdf", initialState = State(Nil), // The 'onSignal' seems to break type inference here.. not sure if that can be avoided? - commandHandler = ActionHandler[RegisterTask, Event, State]((cmd, state, ctx) ⇒ cmd match { + actions = Actions[RegisterTask, Event, State]((cmd, state, ctx) ⇒ cmd match { case RegisterTask(task) ⇒ Persist[Event, State](TaskRegistered(task)) .andThen { _ ⇒ val child = ctx.spawn[Nothing](worker(task), task) @@ -313,8 +256,7 @@ class ApiTest { onEvent = (evt, state) ⇒ evt match { case TaskRegistered(task) ⇒ State(task :: state.tasksInFlight) case TaskRemoved(task) ⇒ State(state.tasksInFlight.filter(_ != task)) - } - ) + }) } object Rehydrating { @@ -349,22 +291,23 @@ class ApiTest { def isFullyHydrated(basket: Basket, ids: List[Id]) = basket.items.map(_.id) == ids - akka.typed.scaladsl.Actor.deferred { ctx: ActorContext[Command] ⇒ + Actor.deferred { ctx: ActorContext[Command] ⇒ + // FIXME this doesn't work, wrapping not supported + var basket = Basket(Nil) var stash: Seq[Command] = Nil val adapt = ctx.spawnAdapter((m: MetaData) ⇒ GotMetaData(m)) def addItem(id: Id, self: ActorRef[Command]) = Persist[Event, List[Id]](ItemAdded(id)).andThen(_ ⇒ - metadataRegistry ! GetMetaData(id, adapt) - ) + metadataRegistry ! GetMetaData(id, adapt)) - Actor.persistent[Command, Event, List[Id]]( + PersistentActor.persistent[Command, Event, List[Id]]( persistenceId = "basket-1", initialState = Nil, - commandHandler = - ActionHandler.byState(state ⇒ - if (isFullyHydrated(basket, state)) ActionHandler { (cmd, state, ctx) ⇒ + actions = + Actions.byState(state ⇒ + if (isFullyHydrated(basket, state)) Actions { (cmd, state, ctx) ⇒ cmd match { case AddItem(id) ⇒ addItem(id, ctx.self) case RemoveItem(id) ⇒ Persist(ItemRemoved(id)) @@ -373,7 +316,7 @@ class ApiTest { case GetTotalPrice(sender) ⇒ sender ! basket.items.map(_.price).sum; PersistNothing() } } - else ActionHandler { (cmd, state, ctx) ⇒ + else Actions { (cmd, state, ctx) ⇒ cmd match { case AddItem(id) ⇒ addItem(id, ctx.self) case RemoveItem(id) ⇒ Persist(ItemRemoved(id)) @@ -386,15 +329,14 @@ class ApiTest { PersistNothing() case cmd: GetTotalPrice ⇒ stash :+= cmd; PersistNothing() } - } - ), + }), onEvent = (evt, state) ⇒ evt match { case ItemAdded(id) ⇒ id +: state case ItemRemoved(id) ⇒ state.filter(_ != id) - } - ).onRecoveryComplete((ctx, state) ⇒ { + }).onRecoveryCompleted((state, ctx) ⇒ { val ad = ctx.spawnAdapter((m: MetaData) ⇒ GotMetaData(m)) state.foreach(id ⇒ metadataRegistry ! GetMetaData(id, ad)) + state }) } } @@ -417,10 +359,10 @@ class ApiTest { if (currentState == newMood) PersistNothing() else Persist(MoodChanged(newMood)) - Actor.persistent[Command, Event, Mood]( + PersistentActor.persistent[Command, Event, Mood]( persistenceId = "myPersistenceId", initialState = Sad, - commandHandler = ActionHandler { (cmd, state, _) ⇒ + actions = Actions { (cmd, state, _) ⇒ cmd match { case Greet(whom) ⇒ println(s"Hi there, I'm $state!") @@ -432,8 +374,8 @@ class ApiTest { }, onEvent = { case (MoodChanged(to), _) ⇒ to - } - ) + }) } -} \ No newline at end of file + +} diff --git a/akka-typed/src/main/scala/akka/typed/persistence/scaladsl/PersistentActor.scala b/akka-typed/src/main/scala/akka/typed/persistence/scaladsl/PersistentActor.scala index 524884b3b6..80d5891b80 100644 --- a/akka-typed/src/main/scala/akka/typed/persistence/scaladsl/PersistentActor.scala +++ b/akka-typed/src/main/scala/akka/typed/persistence/scaladsl/PersistentActor.scala @@ -49,6 +49,12 @@ object PersistentActor { def apply[Command, Event, State](commandHandler: CommandHandler[Command, Event, State]): Actions[Command, Event, State] = new Actions(commandHandler, Map.empty) + /** + * Convenience for simple commands that don't need the state and context. + */ + def command[Command, Event, State](commandHandler: Command ⇒ PersistentEffect[Event, State]): Actions[Command, Event, State] = + apply((cmd, _, _) ⇒ commandHandler(cmd)) + /** * Select different actions based on current state. */