Some additional api adjustments, #22273

* rename PersistentEffect to Effect
* change order of parameters, ctx first
* rename onEvent to applyEvent
* persistenceIdFromActorName for Cluster Sharding
* PersistenActor.immutable
This commit is contained in:
Patrik Nordwall 2017-09-22 10:56:52 +02:00
parent 6d94eb426b
commit 0c5440c036
4 changed files with 107 additions and 86 deletions

View file

@ -26,7 +26,7 @@ object PersistentActorCompileOnlyTest {
case class ExampleState(events: List[String] = Nil)
PersistentActor.persistent[MyCommand, MyEvent, ExampleState](
PersistentActor.immutable[MyCommand, MyEvent, ExampleState](
persistenceId = "sample-id-1",
initialState = ExampleState(Nil),
@ -35,7 +35,7 @@ object PersistentActorCompileOnlyTest {
case Cmd(data) Persist(Evt(data))
},
onEvent = {
applyEvent = {
case (Evt(data), state) state.copy(data :: state.events)
})
}
@ -51,7 +51,7 @@ object PersistentActorCompileOnlyTest {
case class ExampleState(events: List[String] = Nil)
PersistentActor.persistent[MyCommand, MyEvent, ExampleState](
PersistentActor.immutable[MyCommand, MyEvent, ExampleState](
persistenceId = "sample-id-1",
initialState = ExampleState(Nil),
@ -62,7 +62,7 @@ object PersistentActorCompileOnlyTest {
.andThen { _ { sender ! Ack } }
},
onEvent = {
applyEvent = {
case (Evt(data), state) state.copy(data :: state.events)
})
}
@ -93,12 +93,12 @@ object PersistentActorCompileOnlyTest {
.foreach(sender ! _)
}
PersistentActor.persistent[Command, Event, EventsInFlight](
PersistentActor.immutable[Command, Event, EventsInFlight](
persistenceId = "recovery-complete-id",
initialState = EventsInFlight(0, Map.empty),
actions = Actions((cmd, state, ctx) cmd match {
actions = Actions((ctx, cmd, state) cmd match {
case DoSideEffect(data)
Persist(IntentRecorded(state.nextCorrelationId, data)).andThen { _
performSideEffect(ctx.self, state.nextCorrelationId, data)
@ -107,7 +107,7 @@ object PersistentActorCompileOnlyTest {
Persist(SideEffectAcknowledged(correlationId))
}),
onEvent = (evt, state) evt match {
applyEvent = (evt, state) evt match {
case IntentRecorded(correlationId, data)
EventsInFlight(
nextCorrelationId = correlationId + 1,
@ -115,7 +115,7 @@ object PersistentActorCompileOnlyTest {
case SideEffectAcknowledged(correlationId)
state.copy(dataByCorrelationId = state.dataByCorrelationId - correlationId)
}).onRecoveryCompleted {
case (state, ctx) {
case (ctx, state) {
state.dataByCorrelationId.foreach {
case (correlationId, data) performSideEffect(ctx.self, correlationId, data)
}
@ -137,7 +137,7 @@ object PersistentActorCompileOnlyTest {
sealed trait Event
case class MoodChanged(to: Mood) extends Event
val b: Behavior[Command] = PersistentActor.persistent[Command, Event, Mood](
val b: Behavior[Command] = PersistentActor.immutable[Command, Event, Mood](
persistenceId = "myPersistenceId",
initialState = Happy,
actions = Actions.byState {
@ -154,7 +154,7 @@ object PersistentActorCompileOnlyTest {
case MoodSwing Persist(MoodChanged(Happy))
}
},
onEvent = {
applyEvent = {
case (MoodChanged(to), _) to
})
@ -178,14 +178,14 @@ object PersistentActorCompileOnlyTest {
case class State(tasksInFlight: List[Task])
PersistentActor.persistent[Command, Event, State](
PersistentActor.immutable[Command, Event, State](
persistenceId = "asdf",
initialState = State(Nil),
actions = Actions.command {
case RegisterTask(task) Persist(TaskRegistered(task))
case TaskDone(task) Persist(TaskRemoved(task))
},
onEvent = (evt, state) evt match {
applyEvent = (evt, state) evt match {
case TaskRegistered(task) State(task :: state.tasksInFlight)
case TaskRemoved(task) State(state.tasksInFlight.filter(_ != task))
}).snapshotOnState(_.tasksInFlight.isEmpty)
@ -205,10 +205,10 @@ object PersistentActorCompileOnlyTest {
def worker(task: Task): Behavior[Nothing] = ???
PersistentActor.persistent[Command, Event, State](
PersistentActor.immutable[Command, Event, State](
persistenceId = "asdf",
initialState = State(Nil),
actions = Actions((cmd, _, ctx) cmd match {
actions = Actions((ctx, cmd, _) cmd match {
case RegisterTask(task) Persist(TaskRegistered(task))
.andThen { _
val child = ctx.spawn[Nothing](worker(task), task)
@ -217,7 +217,7 @@ object PersistentActorCompileOnlyTest {
}
case TaskDone(task) Persist(TaskRemoved(task))
}),
onEvent = (evt, state) evt match {
applyEvent = (evt, state) evt match {
case TaskRegistered(task) State(task :: state.tasksInFlight)
case TaskRemoved(task) State(state.tasksInFlight.filter(_ != task))
})
@ -235,11 +235,11 @@ object PersistentActorCompileOnlyTest {
def worker(task: Task): Behavior[Nothing] = ???
PersistentActor.persistent[RegisterTask, Event, State](
PersistentActor.immutable[RegisterTask, Event, State](
persistenceId = "asdf",
initialState = State(Nil),
// The 'onSignal' seems to break type inference here.. not sure if that can be avoided?
actions = Actions[RegisterTask, Event, State]((cmd, state, ctx) cmd match {
actions = Actions[RegisterTask, Event, State]((ctx, cmd, state) cmd match {
case RegisterTask(task) Persist(TaskRegistered(task))
.andThen { _
val child = ctx.spawn[Nothing](worker(task), task)
@ -247,13 +247,13 @@ object PersistentActorCompileOnlyTest {
ctx.watch(child)
}
}).onSignal {
case (Terminated(actorRef), _, ctx)
case (ctx, Terminated(actorRef), _)
// watchWith (as in the above example) is nicer because it means we don't have to
// 'manually' associate the task and the child actor, but we wanted to demonstrate
// signals here:
Persist(TaskRemoved(actorRef.path.name))
},
onEvent = (evt, state) evt match {
applyEvent = (evt, state) evt match {
case TaskRegistered(task) State(task :: state.tasksInFlight)
case TaskRemoved(task) State(state.tasksInFlight.filter(_ != task))
})
@ -302,12 +302,12 @@ object PersistentActorCompileOnlyTest {
Persist[Event, List[Id]](ItemAdded(id)).andThen(_
metadataRegistry ! GetMetaData(id, adapt))
PersistentActor.persistent[Command, Event, List[Id]](
PersistentActor.immutable[Command, Event, List[Id]](
persistenceId = "basket-1",
initialState = Nil,
actions =
Actions.byState(state
if (isFullyHydrated(basket, state)) Actions { (cmd, state, ctx)
if (isFullyHydrated(basket, state)) Actions { (ctx, cmd, state)
cmd match {
case AddItem(id) addItem(id, ctx.self)
case RemoveItem(id) Persist(ItemRemoved(id))
@ -316,7 +316,7 @@ object PersistentActorCompileOnlyTest {
case GetTotalPrice(sender) sender ! basket.items.map(_.price).sum; PersistNothing()
}
}
else Actions { (cmd, state, ctx)
else Actions { (ctx, cmd, state)
cmd match {
case AddItem(id) addItem(id, ctx.self)
case RemoveItem(id) Persist(ItemRemoved(id))
@ -330,10 +330,10 @@ object PersistentActorCompileOnlyTest {
case cmd: GetTotalPrice stash :+= cmd; PersistNothing()
}
}),
onEvent = (evt, state) evt match {
applyEvent = (evt, state) evt match {
case ItemAdded(id) id +: state
case ItemRemoved(id) state.filter(_ != id)
}).onRecoveryCompleted((state, ctx) {
}).onRecoveryCompleted((ctx, state) {
val ad = ctx.spawnAdapter((m: MetaData) GotMetaData(m))
state.foreach(id metadataRegistry ! GetMetaData(id, ad))
state
@ -357,14 +357,14 @@ object PersistentActorCompileOnlyTest {
case class MoodChanged(to: Mood) extends Event
case class Remembered(memory: String) extends Event
def changeMoodIfNeeded(currentState: Mood, newMood: Mood): PersistentEffect[Event, Mood] =
def changeMoodIfNeeded(currentState: Mood, newMood: Mood): Effect[Event, Mood] =
if (currentState == newMood) PersistNothing()
else Persist(MoodChanged(newMood))
PersistentActor.persistent[Command, Event, Mood](
PersistentActor.immutable[Command, Event, Mood](
persistenceId = "myPersistenceId",
initialState = Sad,
actions = Actions { (cmd, state, _)
actions = Actions { (_, cmd, state)
cmd match {
case Greet(whom)
println(s"Hi there, I'm $state!")
@ -378,12 +378,11 @@ object PersistentActorCompileOnlyTest {
val commonEffects = changeMoodIfNeeded(state, Happy)
CompositeEffect(
PersistAll[Event, Mood](commonEffects.events :+ Remembered(memory)),
commonEffects.sideEffects
)
commonEffects.sideEffects)
}
},
onEvent = {
applyEvent = {
case (MoodChanged(to), _) to
case (Remembered(_), state) state
})
@ -399,10 +398,10 @@ object PersistentActorCompileOnlyTest {
type State = Unit
PersistentActor.persistent[Command, Event, State](
PersistentActor.immutable[Command, Event, State](
persistenceId = "myPersistenceId",
initialState = (),
actions = Actions { (cmd, _, _)
actions = Actions { (_, cmd, _)
cmd match {
case Enough
Persist(Done)
@ -411,7 +410,7 @@ object PersistentActorCompileOnlyTest {
Stop())
}
},
onEvent = {
applyEvent = {
case (Done, _) ()
})
}

View file

@ -42,10 +42,10 @@ object PersistentActorSpec {
case object Tick
def counter(persistenceId: String): Behavior[Command] = {
PersistentActor.persistent[Command, Event, State](
PersistentActor.immutable[Command, Event, State](
persistenceId,
initialState = State(0, Vector.empty),
actions = Actions[Command, Event, State]((cmd, state, ctx) cmd match {
actions = Actions[Command, Event, State]((ctx, cmd, state) cmd match {
case Increment
Persist(Incremented(1))
case GetValue(replyTo)
@ -69,10 +69,10 @@ object PersistentActorSpec {
Persist(Incremented(100))
})
.onSignal {
case (Terminated(_), _, _)
case (_, Terminated(_), _)
Persist(Incremented(10))
},
onEvent = (evt, state) evt match {
applyEvent = (evt, state) evt match {
case Incremented(delta)
State(state.value + delta, state.history :+ state.value)
})

View file

@ -46,13 +46,13 @@ import akka.typed.internal.adapter.ActorRefAdapter
private val log = Logging(context.system, behavior.getClass)
override val persistenceId: String = behavior.persistenceId
override val persistenceId: String = behavior.persistenceIdFromActorName(self.path.name)
private var state: S = behavior.initialState
private val actions: Actions[C, E, S] = behavior.actions
private val eventHandler: (E, S) S = behavior.onEvent
private val eventHandler: (E, S) S = behavior.applyEvent
private val ctxAdapter = new ActorContextAdapter[C](context)
private val ctx = ctxAdapter.asScala
@ -62,7 +62,7 @@ import akka.typed.internal.adapter.ActorRefAdapter
state = snapshot.asInstanceOf[S]
case RecoveryCompleted
state = behavior.recoveryCompleted(state, ctx)
state = behavior.recoveryCompleted(ctx, state)
case event: E @unchecked
state = applyEvent(state, event)
@ -71,7 +71,7 @@ import akka.typed.internal.adapter.ActorRefAdapter
def applyEvent(s: S, event: E): S =
eventHandler.apply(event, s)
private val unhandledSignal: PartialFunction[(Signal, S, ActorContext[C]), PersistentEffect[E, S]] = {
private val unhandledSignal: PartialFunction[(ActorContext[C], Signal, S), Effect[E, S]] = {
case sig Unhandled()
}
@ -81,17 +81,16 @@ import akka.typed.internal.adapter.ActorRefAdapter
case msg
try {
// FIXME sigHandler(state)
val effects = msg match {
case a.Terminated(ref)
val sig = Terminated(ActorRefAdapter(ref))(null)
actions.sigHandler(state).applyOrElse((sig, state, ctx), unhandledSignal)
actions.sigHandler(state).applyOrElse((ctx, sig, state), unhandledSignal)
case a.ReceiveTimeout
actions.commandHandler(ctxAdapter.receiveTimeoutMsg, state, ctx)
actions.commandHandler(ctx, ctxAdapter.receiveTimeoutMsg, state)
// TODO note that PostStop and PreRestart signals are not handled, we wouldn't be able to persist there
case cmd: C @unchecked
// FIXME we could make it more safe by using ClassTag for C
actions.commandHandler(cmd, state, ctx)
actions.commandHandler(ctx, cmd, state)
}
applyEffects(msg, effects)
@ -103,7 +102,7 @@ import akka.typed.internal.adapter.ActorRefAdapter
}
private def applyEffects(msg: Any, effect: PersistentEffect[E, S], sideEffects: Seq[ChainableEffect[_, S]] = Nil): Unit = effect match {
private def applyEffects(msg: Any, effect: Effect[E, S], sideEffects: Seq[ChainableEffect[_, S]] = Nil): Unit = effect match {
case CompositeEffect(Some(persist), sideEffects)
applyEffects(msg, persist, sideEffects)
case CompositeEffect(_, sideEffects)
@ -130,7 +129,7 @@ import akka.typed.internal.adapter.ActorRefAdapter
}
def applySideEffect(effect: ChainableEffect[_, S]): Unit = effect match {
case Stop() // FIXME implement
case Stop() context.stop(self)
case SideEffect(callbacks) callbacks.apply(state)
}
}

View file

@ -3,7 +3,7 @@
*/
package akka.typed.persistence.scaladsl
import scala.collection.immutable
import scala.collection.{ immutable im }
import akka.annotation.DoNotInherit
import akka.annotation.InternalApi
import akka.typed.Behavior.UntypedBehavior
@ -12,58 +12,75 @@ import akka.typed.persistence.internal.PersistentActorImpl
import akka.typed.scaladsl.ActorContext
object PersistentActor {
def persistent[Command, Event, State](
/**
* Create a `Behavior` for a persistent actor.
*/
def immutable[Command, Event, State](
persistenceId: String,
initialState: State,
actions: Actions[Command, Event, State],
onEvent: (Event, State) State): PersistentBehavior[Command, Event, State] =
new PersistentBehavior(persistenceId, initialState, actions, onEvent,
recoveryCompleted = (state, _) state)
applyEvent: (Event, State) State): PersistentBehavior[Command, Event, State] =
persistentEntity(_ persistenceId, initialState, actions, applyEvent)
sealed abstract class PersistentEffect[+Event, State]() {
/**
* Create a `Behavior` for a persistent actor in Cluster Sharding, when the persistenceId is not known
* until the actor is started and typically based on the entityId, which
* is the actor name.
*
* TODO This will not be needed when it can be wrapped in `Actor.deferred`.
*/
def persistentEntity[Command, Event, State](
persistenceIdFromActorName: String String,
initialState: State,
actions: Actions[Command, Event, State],
applyEvent: (Event, State) State): PersistentBehavior[Command, Event, State] =
new PersistentBehavior(persistenceIdFromActorName, initialState, actions, applyEvent,
recoveryCompleted = (_, state) state)
sealed abstract class Effect[+Event, State]() {
/* All events that will be persisted in this effect */
def events: immutable.Seq[Event] = Nil
def events: im.Seq[Event] = Nil
/* All side effects that will be performed in this effect */
def sideEffects: immutable.Seq[ChainableEffect[_, State]] =
if (isInstanceOf[ChainableEffect[_, State]]) immutable.Seq(asInstanceOf[ChainableEffect[_, State]])
def sideEffects: im.Seq[ChainableEffect[_, State]] =
if (isInstanceOf[ChainableEffect[_, State]]) im.Seq(asInstanceOf[ChainableEffect[_, State]])
else Nil
def andThen(sideEffects: ChainableEffect[_, State]*): PersistentEffect[Event, State] =
def andThen(sideEffects: ChainableEffect[_, State]*): Effect[Event, State] =
CompositeEffect(if (events.isEmpty) None else Some(this), sideEffects.toList)
/** Convenience method to register a side effect with just a callback function */
def andThen(callback: State Unit): PersistentEffect[Event, State] =
def andThen(callback: State Unit): Effect[Event, State] =
andThen(SideEffect[Event, State](callback))
}
case class CompositeEffect[Event, State](persistingEffect: Option[PersistentEffect[Event, State]], override val sideEffects: immutable.Seq[ChainableEffect[_, State]]) extends PersistentEffect[Event, State] {
case class CompositeEffect[Event, State](persistingEffect: Option[Effect[Event, State]], override val sideEffects: im.Seq[ChainableEffect[_, State]]) extends Effect[Event, State] {
override val events = persistingEffect.map(_.events).getOrElse(Nil)
override def andThen(additionalSideEffects: ChainableEffect[_, State]*): CompositeEffect[Event, State] =
copy(sideEffects = sideEffects ++ additionalSideEffects)
}
object CompositeEffect {
def apply[Event, State](persistAll: PersistAll[Event, State], sideEffects: immutable.Seq[ChainableEffect[_, State]]): CompositeEffect[Event, State] =
def apply[Event, State](persistAll: PersistAll[Event, State], sideEffects: im.Seq[ChainableEffect[_, State]]): CompositeEffect[Event, State] =
CompositeEffect(Some(persistAll), sideEffects)
}
case class PersistNothing[Event, State]() extends PersistentEffect[Event, State]
case class PersistNothing[Event, State]() extends Effect[Event, State]
case class Persist[Event, State](event: Event) extends PersistentEffect[Event, State] {
case class Persist[Event, State](event: Event) extends Effect[Event, State] {
override val events = event :: Nil
}
case class PersistAll[Event, State](override val events: immutable.Seq[Event]) extends PersistentEffect[Event, State]
case class PersistAll[Event, State](override val events: im.Seq[Event]) extends Effect[Event, State]
trait ChainableEffect[Event, State] {
self: PersistentEffect[Event, State]
self: Effect[Event, State]
}
case class SideEffect[Event, State](effect: State Unit) extends PersistentEffect[Event, State] with ChainableEffect[Event, State]
case class Stop[Event, State]() extends PersistentEffect[Event, State] with ChainableEffect[Event, State]()
case class SideEffect[Event, State](effect: State Unit) extends Effect[Event, State] with ChainableEffect[Event, State]
case class Stop[Event, State]() extends Effect[Event, State] with ChainableEffect[Event, State]()
case class Unhandled[Event, State]() extends PersistentEffect[Event, State]
case class Unhandled[Event, State]() extends Effect[Event, State]
type CommandHandler[Command, Event, State] = Function3[Command, State, ActorContext[Command], PersistentEffect[Event, State]]
type SignalHandler[Command, Event, State] = PartialFunction[(Signal, State, ActorContext[Command]), PersistentEffect[Event, State]]
type CommandHandler[Command, Event, State] = Function3[ActorContext[Command], Command, State, Effect[Event, State]]
type SignalHandler[Command, Event, State] = PartialFunction[(ActorContext[Command], Signal, State), Effect[Event, State]]
/**
* `Actions` defines command handlers and partial function for other signals,
@ -79,8 +96,8 @@ object PersistentActor {
/**
* Convenience for simple commands that don't need the state and context.
*/
def command[Command, Event, State](commandHandler: Command PersistentEffect[Event, State]): Actions[Command, Event, State] =
apply((cmd, _, _) commandHandler(cmd))
def command[Command, Event, State](commandHandler: Command Effect[Event, State]): Actions[Command, Event, State] =
apply((_, cmd, _) commandHandler(cmd))
/**
* Select different actions based on current state.
@ -97,7 +114,7 @@ object PersistentActor {
choice: State Actions[Command, Event, State],
signalHandler: SignalHandler[Command, Event, State])
extends Actions[Command, Event, State](
commandHandler = (cmd, state, ctx) choice(state).commandHandler(cmd, state, ctx),
commandHandler = (ctx, cmd, state) choice(state).commandHandler(ctx, cmd, state),
signalHandler) {
// SignalHandler may be registered in the wrapper or in the wrapped
@ -136,11 +153,11 @@ object PersistentActor {
}
class PersistentBehavior[Command, Event, State](
val persistenceId: String,
val initialState: State,
val actions: PersistentActor.Actions[Command, Event, State],
val onEvent: (Event, State) State,
val recoveryCompleted: (State, ActorContext[Command]) State) extends UntypedBehavior[Command] {
@InternalApi private[akka] val persistenceIdFromActorName: String String,
val initialState: State,
val actions: PersistentActor.Actions[Command, Event, State],
val applyEvent: (Event, State) State,
val recoveryCompleted: (ActorContext[Command], State) State) extends UntypedBehavior[Command] {
import PersistentActor._
/** INTERNAL API */
@ -150,18 +167,24 @@ class PersistentBehavior[Command, Event, State](
* The `callback` function is called to notify the actor that the recovery process
* is finished.
*/
def onRecoveryCompleted(callback: (State, ActorContext[Command]) State): PersistentBehavior[Command, Event, State] =
def onRecoveryCompleted(callback: (ActorContext[Command], State) State): PersistentBehavior[Command, Event, State] =
copy(recoveryCompleted = callback)
def snapshotOnState(predicate: State Boolean): PersistentBehavior[Command, Event, State] = ??? // FIXME
/**
* FIXME snapshots are not implemented yet, this is only an API placeholder
*/
def snapshotOnState(predicate: State Boolean): PersistentBehavior[Command, Event, State] = ???
def snapshotOn(predicate: (State, Event) Boolean): PersistentBehavior[Command, Event, State] = ??? // FIXME
/**
* FIXME snapshots are not implemented yet, this is only an API placeholder
*/
def snapshotOn(predicate: (State, Event) Boolean): PersistentBehavior[Command, Event, State] = ???
private def copy(
persistenceId: String = persistenceId,
initialState: State = initialState,
actions: Actions[Command, Event, State] = actions,
onEvent: (Event, State) State = onEvent,
recoveryCompleted: (State, ActorContext[Command]) State = recoveryCompleted): PersistentBehavior[Command, Event, State] =
new PersistentBehavior(persistenceId, initialState, actions, onEvent, recoveryCompleted)
persistenceIdFromActorName: String String = persistenceIdFromActorName,
initialState: State = initialState,
actions: Actions[Command, Event, State] = actions,
applyEvent: (Event, State) State = applyEvent,
recoveryCompleted: (ActorContext[Command], State) State = recoveryCompleted): PersistentBehavior[Command, Event, State] =
new PersistentBehavior(persistenceIdFromActorName, initialState, actions, applyEvent, recoveryCompleted)
}