#23695 Remove onSignal from typed persistent actor (#24253)

This commit is contained in:
Martynas Mickevičius 2018-01-12 19:50:49 +07:00 committed by GitHub
parent a3ae494325
commit 930d2e9133
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 34 additions and 123 deletions

View file

@ -42,13 +42,13 @@ object ClusterShardingPersistenceSpec {
PersistentActor.persistentEntity[Command, String, String](
persistenceIdFromActorName = name "Test-" + name,
initialState = "",
commandHandler = CommandHandler((_, state, cmd) cmd match {
commandHandler = (_, state, cmd) cmd match {
case Add(s) Effect.persist(s)
case Get(replyTo)
replyTo ! state
Effect.none
case StopPlz Effect.stop
}),
},
eventHandler = (state, evt) if (state.isEmpty) evt else state + "|" + evt)
val typeKey = EntityTypeKey[Command]("test")

View file

@ -40,13 +40,13 @@ object ClusterSingletonPersistenceSpec {
PersistentActor.immutable[Command, String, String](
persistenceId = "TheSingleton",
initialState = "",
commandHandler = CommandHandler((_, state, cmd) cmd match {
commandHandler = (_, state, cmd) cmd match {
case Add(s) Effect.persist(s)
case Get(replyTo)
replyTo ! state
Effect.none
case StopPlz Effect.stop
}),
},
eventHandler = (state, evt) if (state.isEmpty) evt else state + "|" + evt)
}

View file

@ -80,15 +80,12 @@ import akka.actor.typed.internal.adapter.ActorRefAdapter
case msg
try {
val effects = msg match {
case a.Terminated(ref)
val sig = Terminated(ActorRefAdapter(ref))(null)
commandHandler.sigHandler(state).applyOrElse((ctx, state, sig), unhandledSignal)
case a.ReceiveTimeout
commandHandler.commandHandler(ctx, state, ctxAdapter.receiveTimeoutMsg)
// TODO note that PostStop and PreRestart signals are not handled, we wouldn't be able to persist there
commandHandler(ctx, state, ctxAdapter.receiveTimeoutMsg)
// TODO note that PostStop, PreRestart and Terminated signals are not handled, we wouldn't be able to persist there
case cmd: C @unchecked
// FIXME we could make it more safe by using ClassTag for C
commandHandler.commandHandler(ctx, state, cmd)
commandHandler(ctx, state, cmd)
}
applyEffects(msg, effects)

View file

@ -4,9 +4,8 @@
package akka.persistence.typed.scaladsl
import scala.collection.{ immutable im }
import akka.annotation.{ ApiMayChange, DoNotInherit, InternalApi }
import akka.annotation.{ DoNotInherit, InternalApi }
import akka.actor.typed.Behavior.UntypedBehavior
import akka.actor.typed.Signal
import akka.persistence.typed.internal.PersistentActorImpl
import akka.actor.typed.scaladsl.ActorContext
@ -144,40 +143,29 @@ object PersistentActor {
@InternalApi
private[akka] case object Unhandled extends Effect[Nothing, Nothing]
type CommandToEffect[Command, Event, State] = (ActorContext[Command], State, Command) Effect[Event, State]
type SignalHandler[Command, Event, State] = PartialFunction[(ActorContext[Command], State, Signal), Effect[Event, State]]
type CommandHandler[Command, Event, State] = (ActorContext[Command], State, Command) Effect[Event, State]
/**
* The `CommandHandler` defines how to act on commands and partial function for other signals,
* e.g. `Termination` messages if `watch` is used.
* The `CommandHandler` defines how to act on commands.
*
* Note that you can have different command handlers based on current state by using
* [[CommandHandler#byState]].
*/
object CommandHandler {
/**
* Create a command handler that will be applied for commands.
*
* @see [[Effect]] for possible effects of a command.
*/
// Note: using full parameter type instead of type aliase here to make API more straight forward to figure out in an IDE
def apply[Command, Event, State](commandHandler: (ActorContext[Command], State, Command) Effect[Event, State]): CommandHandler[Command, Event, State] =
new CommandHandler(commandHandler, Map.empty)
/**
* Convenience for simple commands that don't need the state and context.
*
* @see [[Effect]] for possible effects of a command.
*/
def command[Command, Event, State](commandHandler: Command Effect[Event, State]): CommandHandler[Command, Event, State] =
apply((_, _, cmd) commandHandler(cmd))
(_, _, cmd) commandHandler(cmd)
/**
* Select different command handlers based on current state.
*/
def byState[Command, Event, State](choice: State CommandHandler[Command, Event, State]): CommandHandler[Command, Event, State] =
new ByStateCommandHandler(choice, signalHandler = PartialFunction.empty)
new ByStateCommandHandler(choice)
}
@ -185,46 +173,13 @@ object PersistentActor {
* INTERNAL API
*/
@InternalApi private[akka] final class ByStateCommandHandler[Command, Event, State](
choice: State CommandHandler[Command, Event, State],
signalHandler: SignalHandler[Command, Event, State])
extends CommandHandler[Command, Event, State](
commandHandler = (ctx, state, cmd) choice(state).commandHandler(ctx, state, cmd),
signalHandler) {
choice: State CommandHandler[Command, Event, State])
extends CommandHandler[Command, Event, State] {
// SignalHandler may be registered in the wrapper or in the wrapped
private[akka] override def sigHandler(state: State): SignalHandler[Command, Event, State] =
choice(state).sigHandler(state).orElse(signalHandler)
// override to preserve the ByStateCommandHandler
private[akka] override def withSignalHandler(
handler: SignalHandler[Command, Event, State]): CommandHandler[Command, Event, State] =
new ByStateCommandHandler(choice, handler)
override def apply(ctx: ActorContext[Command], state: State, cmd: Command): Effect[Event, State] =
choice(state)(ctx, state, cmd)
}
/**
* `CommandHandler` defines command handlers and partial function for other signals,
* e.g. `Termination` messages if `watch` is used.
* `CommandHandler` is an immutable class.
*/
@DoNotInherit class CommandHandler[Command, Event, State] private[akka] (
val commandHandler: CommandToEffect[Command, Event, State],
val signalHandler: SignalHandler[Command, Event, State]) {
@InternalApi private[akka] def sigHandler(state: State): SignalHandler[Command, Event, State] =
signalHandler
// Note: using full parameter type instead of type alias here to make API more straight forward to figure out in an IDE
def onSignal(handler: PartialFunction[(ActorContext[Command], State, Signal), Effect[Event, State]]): CommandHandler[Command, Event, State] =
withSignalHandler(signalHandler.orElse(handler))
/** INTERNAL API */
@InternalApi private[akka] def withSignalHandler(
handler: SignalHandler[Command, Event, State]): CommandHandler[Command, Event, State] =
new CommandHandler(commandHandler, handler)
}
}
class PersistentBehavior[Command, Event, State](

View file

@ -111,14 +111,14 @@ object PersistentActorCompileOnlyTest {
initialState = EventsInFlight(0, Map.empty),
commandHandler = CommandHandler((ctx, state, cmd) cmd match {
commandHandler = (ctx, state, cmd) cmd match {
case DoSideEffect(data)
Effect.persist(IntentRecorded(state.nextCorrelationId, data)).andThen {
performSideEffect(ctx.self, state.nextCorrelationId, data)
}
case AcknowledgeSideEffect(correlationId)
Effect.persist(SideEffectAcknowledged(correlationId))
}),
},
eventHandler = (state, evt) evt match {
case IntentRecorded(correlationId, data)
@ -220,7 +220,7 @@ object PersistentActorCompileOnlyTest {
PersistentActor.immutable[Command, Event, State](
persistenceId = "asdf",
initialState = State(Nil),
commandHandler = CommandHandler((ctx, _, cmd) cmd match {
commandHandler = (ctx, _, cmd) cmd match {
case RegisterTask(task)
Effect.persist(TaskRegistered(task))
.andThen {
@ -229,42 +229,6 @@ object PersistentActorCompileOnlyTest {
ctx.watchWith(child, TaskDone(task))
}
case TaskDone(task) Effect.persist(TaskRemoved(task))
}),
eventHandler = (state, evt) evt match {
case TaskRegistered(task) State(task :: state.tasksInFlight)
case TaskRemoved(task) State(state.tasksInFlight.filter(_ != task))
})
}
object UsingSignals {
type Task = String
case class RegisterTask(task: Task)
sealed trait Event
case class TaskRegistered(task: Task) extends Event
case class TaskRemoved(task: Task) extends Event
case class State(tasksInFlight: List[Task])
def worker(task: Task): Behavior[Nothing] = ???
PersistentActor.immutable[RegisterTask, Event, State](
persistenceId = "asdf",
initialState = State(Nil),
// The 'onSignal' seems to break type inference here.. not sure if that can be avoided?
commandHandler = CommandHandler[RegisterTask, Event, State]((ctx, state, cmd) cmd match {
case RegisterTask(task) Effect.persist(TaskRegistered(task))
.andThen {
val child = ctx.spawn[Nothing](worker(task), task)
// This assumes *any* termination of the child may trigger a `TaskDone`:
ctx.watch(child)
}
}).onSignal {
case (ctx, _, Terminated(actorRef))
// watchWith (as in the above example) is nicer because it means we don't have to
// 'manually' associate the task and the child actor, but we wanted to demonstrate
// signals here:
Effect.persist(TaskRemoved(actorRef.path.name))
},
eventHandler = (state, evt) evt match {
case TaskRegistered(task) State(task :: state.tasksInFlight)
@ -321,7 +285,7 @@ object PersistentActorCompileOnlyTest {
initialState = Nil,
commandHandler =
CommandHandler.byState(state
if (isFullyHydrated(basket, state)) CommandHandler { (ctx, state, cmd)
if (isFullyHydrated(basket, state)) (ctx, state, cmd)
cmd match {
case AddItem(id) addItem(id, ctx.self)
case RemoveItem(id) Effect.persist(ItemRemoved(id))
@ -332,8 +296,7 @@ object PersistentActorCompileOnlyTest {
sender ! basket.items.map(_.price).sum
Effect.none
}
}
else CommandHandler { (ctx, state, cmd)
else (ctx, state, cmd)
cmd match {
case AddItem(id) addItem(id, ctx.self)
case RemoveItem(id) Effect.persist(ItemRemoved(id))
@ -348,7 +311,7 @@ object PersistentActorCompileOnlyTest {
stash :+= cmd
Effect.none
}
}),
),
eventHandler = (state, evt) evt match {
case ItemAdded(id) id +: state
case ItemRemoved(id) state.filter(_ != id)
@ -382,7 +345,7 @@ object PersistentActorCompileOnlyTest {
PersistentActor.immutable[Command, Event, Mood](
persistenceId = "myPersistenceId",
initialState = Sad,
commandHandler = CommandHandler { (_, state, cmd)
commandHandler = (_, state, cmd)
cmd match {
case Greet(whom)
println(s"Hi there, I'm $state!")
@ -398,8 +361,7 @@ object PersistentActorCompileOnlyTest {
val commonEffects = changeMoodIfNeeded(state, Happy)
Effect.persist(commonEffects.events :+ Remembered(memory), commonEffects.sideEffects)
}
},
},
eventHandler = {
case (_, MoodChanged(to)) to
case (state, Remembered(_)) state

View file

@ -28,6 +28,7 @@ object PersistentActorSpec {
final case object DoNothingAndThenLog extends Command
final case object EmptyEventsListAndThenLog extends Command
final case class GetValue(replyTo: ActorRef[State]) extends Command
final case object DelayFinished extends Command
private case object Timeout extends Command
sealed trait Event
@ -48,7 +49,7 @@ object PersistentActorSpec {
PersistentActor.immutable[Command, Event, State](
persistenceId,
initialState = State(0, Vector.empty),
commandHandler = CommandHandler[Command, Event, State]((ctx, state, cmd) cmd match {
commandHandler = (ctx, state, cmd) cmd match {
case Increment
Effect.persist(Incremented(1))
case GetValue(replyTo)
@ -62,8 +63,10 @@ object PersistentActorSpec {
case Tick Actor.stopped
})
})
ctx.watch(delay)
ctx.watchWith(delay, DelayFinished)
Effect.none
case DelayFinished
Effect.persist(Incremented(10))
case IncrementAfterReceiveTimeout
ctx.setReceiveTimeout(10.millis, Timeout)
Effect.none
@ -94,11 +97,7 @@ object PersistentActorSpec {
.andThen {
loggingActor ! firstLogging
}
})
.onSignal {
case (_, _, Terminated(_))
Effect.persist(Incremented(10))
},
},
eventHandler = (state, evt) evt match {
case Incremented(delta)
State(state.value + delta, state.history :+ state.value)

View file

@ -17,7 +17,7 @@ object BasicPersistentActorSpec {
PersistentActor.immutable[Command, Event, State](
persistenceId = "abc",
initialState = State(),
commandHandler = PersistentActor.CommandHandler { (ctx, state, cmd) ??? },
commandHandler = (ctx, state, cmd) ???,
eventHandler = (state, evt) ???)
//#structure
@ -26,7 +26,7 @@ object BasicPersistentActorSpec {
PersistentActor.immutable[Command, Event, State](
persistenceId = "abc",
initialState = State(),
commandHandler = PersistentActor.CommandHandler { (ctx, state, cmd) ??? },
commandHandler = (ctx, state, cmd) ???,
eventHandler = (state, evt) ???)
.onRecoveryCompleted { (ctx, state)
???

View file

@ -51,7 +51,7 @@ object InDepthPersistentActorSpec {
//#initial-command-handler
private def initial: CommandHandler[BlogCommand, BlogEvent, BlogState] =
CommandHandler { (ctx, state, cmd)
(ctx, state, cmd)
cmd match {
case AddPost(content, replyTo)
val evt = PostAdded(content.postId, content)
@ -64,12 +64,11 @@ object InDepthPersistentActorSpec {
case _
Effect.unhandled
}
}
//#initial-command-handler
//#post-added-command-handler
private def postAdded: CommandHandler[BlogCommand, BlogEvent, BlogState] = {
CommandHandler { (ctx, state, cmd)
(ctx, state, cmd)
cmd match {
case ChangeBody(newBody, replyTo)
val evt = BodyChanged(state.postId, newBody)
@ -89,7 +88,6 @@ object InDepthPersistentActorSpec {
case PassivatePost
Effect.stop
}
}
}
//#post-added-command-handler