Move ApiTest to PersistentActorCompileOnlyTest

This commit is contained in:
Arnout Engelen 2017-09-21 07:49:47 +02:00 committed by Patrik Nordwall
parent b671d2a47b
commit 25567ea868
2 changed files with 68 additions and 120 deletions

View file

@ -1,78 +1,21 @@
package akka.typed.scaladsl.persistence
import akka.typed
import scala.concurrent.duration._
import akka.typed.{ ActorRef, Behavior, ExtensibleBehavior, Signal, Terminated }
import akka.typed.scaladsl.{ ActorContext, TimerScheduler }
/**
* Copyright (C) 2017 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.typed.persistence.scaladsl
import scala.concurrent.ExecutionContext
import scala.reflect.ClassTag
import scala.concurrent.duration._
class ApiTest {
object TypedPersistentActor {
import akka.typed.ActorRef
import akka.typed.Behavior
import akka.typed.Terminated
import akka.typed.scaladsl.Actor
import akka.typed.scaladsl.ActorContext
import akka.typed.scaladsl.TimerScheduler
sealed abstract class PersistentEffect[+Event, State]() {
def andThen(callback: State Unit): PersistentEffect[Event, State]
}
object PersistentActorCompileOnlyTest {
case class PersistNothing[Event, State](callbacks: List[State Unit] = Nil) extends PersistentEffect[Event, State] {
def andThen(callback: State Unit) = copy(callbacks = callback :: callbacks)
}
case class Persist[Event, State](event: Event, callbacks: List[State Unit] = Nil) extends PersistentEffect[Event, State] {
def andThen(callback: State Unit) = copy(callbacks = callback :: callbacks)
}
case class Unhandled[Event, State](callbacks: List[State Unit] = Nil) extends PersistentEffect[Event, State] {
def andThen(callback: State Unit) = copy(callbacks = callback :: callbacks)
}
class ActionHandler[Command: ClassTag, Event, State](val handler: ((Any, State, ActorContext[Command]) PersistentEffect[Event, State])) {
def onSignal(signalHandler: PartialFunction[(Any, State, ActorContext[Command]), PersistentEffect[Event, State]]): ActionHandler[Command, Event, State] =
ActionHandler {
case (command: Command, state, ctx) handler(command, state, ctx)
case (signal: Signal, state, ctx) signalHandler.orElse(unhandledSignal).apply((signal, state, ctx))
case _ Unhandled()
}
private val unhandledSignal: PartialFunction[(Any, State, ActorContext[Command]), PersistentEffect[Event, State]] = { case _ Unhandled() }
}
object ActionHandler {
def cmd[Command: ClassTag, Event, State](commandHandler: Command PersistentEffect[Event, State]): ActionHandler[Command, Event, State] = ???
def apply[Command: ClassTag, Event, State](commandHandler: ((Command, State, ActorContext[Command]) PersistentEffect[Event, State])): ActionHandler[Command, Event, State] = ???
def byState[Command: ClassTag, Event, State](actionHandler: State ActionHandler[Command, Event, State]): ActionHandler[Command, Event, State] =
new ActionHandler(handler = {
case (action, state, ctx) actionHandler(state).handler(action, state, ctx)
})
}
}
object Actor {
import TypedPersistentActor._
class PersistentBehavior[Command, Event, State] extends ExtensibleBehavior[Command] {
override def receiveSignal(ctx: typed.ActorContext[Command], msg: Signal): Behavior[Command] = ???
override def receiveMessage(ctx: typed.ActorContext[Command], msg: Command): Behavior[Command] = ???
def onRecoveryComplete(callback: (ActorContext[Command], State) Unit): PersistentBehavior[Command, Event, State] = ???
def snapshotOnState(predicate: State Boolean): PersistentBehavior[Command, Event, State] = ???
def snapshotOn(predicate: (State, Event) Boolean): PersistentBehavior[Command, Event, State] = ???
}
def persistent[Command, Event, State](
persistenceId: String,
initialState: State,
commandHandler: ActionHandler[Command, Event, State],
onEvent: (Event, State) State
): PersistentBehavior[Command, Event, State] = ???
}
import TypedPersistentActor._
import akka.typed.scaladsl.AskPattern._
implicit val timeout: akka.util.Timeout = 1.second
implicit val scheduler: akka.actor.Scheduler = ???
implicit val ec: ExecutionContext = ???
import akka.typed.persistence.scaladsl.PersistentActor._
object Simple {
sealed trait MyCommand
@ -83,19 +26,18 @@ class ApiTest {
case class ExampleState(events: List[String] = Nil)
Actor.persistent[MyCommand, MyEvent, ExampleState](
PersistentActor.persistent[MyCommand, MyEvent, ExampleState](
persistenceId = "sample-id-1",
initialState = ExampleState(Nil),
commandHandler = ActionHandler.cmd {
actions = Actions.command {
case Cmd(data) Persist(Evt(data))
},
onEvent = {
case (Evt(data), state) state.copy(data :: state.events)
}
)
})
}
object WithAck {
@ -109,12 +51,12 @@ class ApiTest {
case class ExampleState(events: List[String] = Nil)
Actor.persistent[MyCommand, MyEvent, ExampleState](
PersistentActor.persistent[MyCommand, MyEvent, ExampleState](
persistenceId = "sample-id-1",
initialState = ExampleState(Nil),
commandHandler = ActionHandler.cmd {
actions = Actions.command {
case Cmd(data, sender)
Persist[MyEvent, ExampleState](Evt(data))
.andThen { _ { sender ! Ack } }
@ -122,8 +64,7 @@ class ApiTest {
onEvent = {
case (Evt(data), state) state.copy(data :: state.events)
}
)
})
}
object RecoveryComplete {
@ -142,17 +83,22 @@ class ApiTest {
val sideEffectProcessor: ActorRef[Request] = ???
def performSideEffect(sender: ActorRef[AcknowledgeSideEffect], correlationId: Int, data: String) = {
import akka.typed.scaladsl.AskPattern._
implicit val timeout: akka.util.Timeout = 1.second
implicit val scheduler: akka.actor.Scheduler = ???
implicit val ec: ExecutionContext = ???
(sideEffectProcessor ? (Request(correlationId, data, _: ActorRef[Response])))
.map(response AcknowledgeSideEffect(response.correlationId))
.foreach(sender ! _)
}
Actor.persistent[Command, Event, EventsInFlight](
PersistentActor.persistent[Command, Event, EventsInFlight](
persistenceId = "recovery-complete-id",
initialState = EventsInFlight(0, Map.empty),
commandHandler = ActionHandler((cmd, state, ctx) cmd match {
actions = Actions((cmd, state, ctx) cmd match {
case DoSideEffect(data)
Persist[Event, EventsInFlight](IntentRecorded(state.nextCorrelationId, data)).andThen { _
performSideEffect(ctx.self, state.nextCorrelationId, data)
@ -168,17 +114,17 @@ class ApiTest {
dataByCorrelationId = state.dataByCorrelationId + (correlationId data))
case SideEffectAcknowledged(correlationId)
state.copy(dataByCorrelationId = state.dataByCorrelationId - correlationId)
}).onRecoveryComplete {
case (ctx, state) {
}).onRecoveryCompleted {
case (state, ctx) {
state.dataByCorrelationId.foreach {
case (correlationId, data) performSideEffect(ctx.self, correlationId, data)
}
state
}
}
}
// Example with 'become'
object Become {
sealed trait Mood
case object Happy extends Mood
@ -191,17 +137,17 @@ class ApiTest {
sealed trait Event
case class MoodChanged(to: Mood) extends Event
val b: Behavior[Command] = Actor.persistent[Command, Event, Mood](
val b: Behavior[Command] = PersistentActor.persistent[Command, Event, Mood](
persistenceId = "myPersistenceId",
initialState = Happy,
commandHandler = ActionHandler.byState {
case Happy ActionHandler.cmd {
actions = Actions.byState {
case Happy Actions.command {
case Greet(whom)
println(s"Super happy to meet you $whom!")
PersistNothing()
case MoodSwing Persist(MoodChanged(Sad))
}
case Sad ActionHandler.cmd {
case Sad Actions.command {
case Greet(whom)
println(s"hi $whom")
PersistNothing()
@ -210,16 +156,15 @@ class ApiTest {
},
onEvent = {
case (MoodChanged(to), _) to
}
)
})
akka.typed.scaladsl.Actor.withTimers((timers: TimerScheduler[Command]) {
// FIXME this doesn't work, wrapping is not supported
Actor.withTimers((timers: TimerScheduler[Command]) {
timers.startPeriodicTimer("swing", MoodSwing, 10.seconds)
b
})
}
// explicit snapshots
object ExplicitSnapshots {
type Task = String
@ -233,18 +178,17 @@ class ApiTest {
case class State(tasksInFlight: List[Task])
Actor.persistent[Command, Event, State](
PersistentActor.persistent[Command, Event, State](
persistenceId = "asdf",
initialState = State(Nil),
commandHandler = ActionHandler.cmd {
actions = Actions.command {
case RegisterTask(task) Persist(TaskRegistered(task))
case TaskDone(task) Persist(TaskRemoved(task))
},
onEvent = (evt, state) evt match {
case TaskRegistered(task) State(task :: state.tasksInFlight)
case TaskRemoved(task) State(state.tasksInFlight.filter(_ != task))
}
).snapshotOnState(_.tasksInFlight.isEmpty)
}).snapshotOnState(_.tasksInFlight.isEmpty)
}
object SpawnChild {
@ -261,10 +205,10 @@ class ApiTest {
def worker(task: Task): Behavior[Nothing] = ???
Actor.persistent[Command, Event, State](
PersistentActor.persistent[Command, Event, State](
persistenceId = "asdf",
initialState = State(Nil),
commandHandler = ActionHandler((cmd, _, ctx) cmd match {
actions = Actions((cmd, _, ctx) cmd match {
case RegisterTask(task) Persist[Event, State](TaskRegistered(task))
.andThen { _
val child = ctx.spawn[Nothing](worker(task), task)
@ -276,8 +220,7 @@ class ApiTest {
onEvent = (evt, state) evt match {
case TaskRegistered(task) State(task :: state.tasksInFlight)
case TaskRemoved(task) State(state.tasksInFlight.filter(_ != task))
}
)
})
}
object UsingSignals {
@ -292,11 +235,11 @@ class ApiTest {
def worker(task: Task): Behavior[Nothing] = ???
Actor.persistent[RegisterTask, Event, State](
PersistentActor.persistent[RegisterTask, Event, State](
persistenceId = "asdf",
initialState = State(Nil),
// The 'onSignal' seems to break type inference here.. not sure if that can be avoided?
commandHandler = ActionHandler[RegisterTask, Event, State]((cmd, state, ctx) cmd match {
actions = Actions[RegisterTask, Event, State]((cmd, state, ctx) cmd match {
case RegisterTask(task) Persist[Event, State](TaskRegistered(task))
.andThen { _
val child = ctx.spawn[Nothing](worker(task), task)
@ -313,8 +256,7 @@ class ApiTest {
onEvent = (evt, state) evt match {
case TaskRegistered(task) State(task :: state.tasksInFlight)
case TaskRemoved(task) State(state.tasksInFlight.filter(_ != task))
}
)
})
}
object Rehydrating {
@ -349,22 +291,23 @@ class ApiTest {
def isFullyHydrated(basket: Basket, ids: List[Id]) = basket.items.map(_.id) == ids
akka.typed.scaladsl.Actor.deferred { ctx: ActorContext[Command]
Actor.deferred { ctx: ActorContext[Command]
// FIXME this doesn't work, wrapping not supported
var basket = Basket(Nil)
var stash: Seq[Command] = Nil
val adapt = ctx.spawnAdapter((m: MetaData) GotMetaData(m))
def addItem(id: Id, self: ActorRef[Command]) =
Persist[Event, List[Id]](ItemAdded(id)).andThen(_
metadataRegistry ! GetMetaData(id, adapt)
)
metadataRegistry ! GetMetaData(id, adapt))
Actor.persistent[Command, Event, List[Id]](
PersistentActor.persistent[Command, Event, List[Id]](
persistenceId = "basket-1",
initialState = Nil,
commandHandler =
ActionHandler.byState(state
if (isFullyHydrated(basket, state)) ActionHandler { (cmd, state, ctx)
actions =
Actions.byState(state
if (isFullyHydrated(basket, state)) Actions { (cmd, state, ctx)
cmd match {
case AddItem(id) addItem(id, ctx.self)
case RemoveItem(id) Persist(ItemRemoved(id))
@ -373,7 +316,7 @@ class ApiTest {
case GetTotalPrice(sender) sender ! basket.items.map(_.price).sum; PersistNothing()
}
}
else ActionHandler { (cmd, state, ctx)
else Actions { (cmd, state, ctx)
cmd match {
case AddItem(id) addItem(id, ctx.self)
case RemoveItem(id) Persist(ItemRemoved(id))
@ -386,15 +329,14 @@ class ApiTest {
PersistNothing()
case cmd: GetTotalPrice stash :+= cmd; PersistNothing()
}
}
),
}),
onEvent = (evt, state) evt match {
case ItemAdded(id) id +: state
case ItemRemoved(id) state.filter(_ != id)
}
).onRecoveryComplete((ctx, state) {
}).onRecoveryCompleted((state, ctx) {
val ad = ctx.spawnAdapter((m: MetaData) GotMetaData(m))
state.foreach(id metadataRegistry ! GetMetaData(id, ad))
state
})
}
}
@ -417,10 +359,10 @@ class ApiTest {
if (currentState == newMood) PersistNothing()
else Persist(MoodChanged(newMood))
Actor.persistent[Command, Event, Mood](
PersistentActor.persistent[Command, Event, Mood](
persistenceId = "myPersistenceId",
initialState = Sad,
commandHandler = ActionHandler { (cmd, state, _)
actions = Actions { (cmd, state, _)
cmd match {
case Greet(whom)
println(s"Hi there, I'm $state!")
@ -432,8 +374,8 @@ class ApiTest {
},
onEvent = {
case (MoodChanged(to), _) to
}
)
})
}
}
}

View file

@ -49,6 +49,12 @@ object PersistentActor {
def apply[Command, Event, State](commandHandler: CommandHandler[Command, Event, State]): Actions[Command, Event, State] =
new Actions(commandHandler, Map.empty)
/**
* Convenience for simple commands that don't need the state and context.
*/
def command[Command, Event, State](commandHandler: Command PersistentEffect[Event, State]): Actions[Command, Event, State] =
apply((cmd, _, _) commandHandler(cmd))
/**
* Select different actions based on current state.
*/