first impl of Typed PersistentActor, #22273

* illustrate problem with wrapping behavior
This commit is contained in:
Patrik Nordwall 2017-09-21 07:11:26 +02:00
parent be26edeb04
commit b671d2a47b
7 changed files with 362 additions and 43 deletions

View file

@ -0,0 +1,104 @@
/**
* Copyright (C) 2017 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.typed.persistence.scaladsl
import scala.concurrent.duration._
import akka.typed.ActorRef
import akka.typed.ActorSystem
import akka.typed.Behavior
import akka.typed.TypedSpec
import akka.typed.scaladsl.Actor
import akka.typed.scaladsl.AskPattern._
import akka.typed.scaladsl.adapter._
import akka.typed.testkit.TestKitSettings
import akka.typed.testkit.scaladsl._
import com.typesafe.config.ConfigFactory
import org.scalatest.concurrent.Eventually
import akka.util.Timeout
import akka.typed.persistence.scaladsl.PersistentActor._
import akka.typed.SupervisorStrategy
object PersistentActorSpec {
val config = ConfigFactory.parseString("""
akka.persistence.journal.plugin = "akka.persistence.journal.inmem"
""")
sealed trait Command
final case object Increment extends Command
final case class GetValue(replyTo: ActorRef[State]) extends Command
sealed trait Event
final case class Incremented(delta: Int) extends Event
final case class State(value: Int, history: Vector[Int])
def counter(persistenceId: String): Behavior[Command] = {
PersistentActor.persistent[Command, Event, State](
persistenceId,
initialState = State(0, Vector.empty),
actions = Actions((cmd, state, ctx) cmd match {
case Increment
Persist(Incremented(1))
case GetValue(replyTo)
replyTo ! state
PersistNothing()
}),
onEvent = (evt, state) evt match {
case Incremented(delta)
State(state.value + delta, state.history :+ state.value)
})
}
}
class PersistentActorSpec extends TypedSpec(PersistentActorSpec.config) with Eventually {
import PersistentActorSpec._
trait RealTests extends StartSupport {
implicit def system: ActorSystem[TypedSpec.Command]
implicit val testSettings = TestKitSettings(system)
def `persist an event`(): Unit = {
val c = start(counter("c1"))
val probe = TestProbe[State]
c ! Increment
c ! GetValue(probe.ref)
probe.expectMsg(State(1, Vector(0)))
}
def `replay stored events`(): Unit = {
val c = start(counter("c2"))
val probe = TestProbe[State]
c ! Increment
c ! Increment
c ! Increment
c ! GetValue(probe.ref)
probe.expectMsg(State(3, Vector(0, 1, 2)))
val c2 = start(counter("c2"))
c2 ! GetValue(probe.ref)
probe.expectMsg(State(3, Vector(0, 1, 2)))
c ! Increment
c ! GetValue(probe.ref)
probe.expectMsg(State(4, Vector(0, 1, 2, 3)))
}
def `work when wrapped in other behavior`(): Unit = {
// FIXME This is a major problem with current implementation. Since the
// behavior is running as an untyped PersistentActor it's not possible to
// wrap it in Actor.deferred or Actor.supervise
pending
val behavior = Actor.supervise[Command](counter("c3"))
.onFailure(SupervisorStrategy.restartWithBackoff(1.second, 10.seconds, 0.1))
val c = start(behavior)
}
}
object `A PersistentActor (real, adapted)` extends RealTests with AdaptedSystem
}

View file

@ -153,6 +153,17 @@ object Behavior {
override def toString = "Unhandled"
}
/**
* INTERNAL API.
*/
@InternalApi
private[akka] abstract class UntypedBehavior[T] extends Behavior[T] {
/**
* INTERNAL API
*/
@InternalApi private[akka] def untypedProps: akka.actor.Props
}
/**
* INTERNAL API
*/
@ -264,11 +275,15 @@ object Behavior {
private def interpret[T](behavior: Behavior[T], ctx: ActorContext[T], msg: Any): Behavior[T] =
behavior match {
case SameBehavior | UnhandledBehavior throw new IllegalArgumentException(s"cannot execute with [$behavior] as behavior")
case d: DeferredBehavior[_] throw new IllegalArgumentException(s"deferred [$d] should not be passed to interpreter")
case IgnoreBehavior SameBehavior.asInstanceOf[Behavior[T]]
case s: StoppedBehavior[T] s
case EmptyBehavior UnhandledBehavior.asInstanceOf[Behavior[T]]
case SameBehavior | UnhandledBehavior
throw new IllegalArgumentException(s"cannot execute with [$behavior] as behavior")
case _: UntypedBehavior[_]
throw new IllegalArgumentException(s"cannot wrap behavior [$behavior] in " +
"Actor.deferred, Actor.supervise or similar")
case d: DeferredBehavior[_] throw new IllegalArgumentException(s"deferred [$d] should not be passed to interpreter")
case IgnoreBehavior SameBehavior.asInstanceOf[Behavior[T]]
case s: StoppedBehavior[T] s
case EmptyBehavior UnhandledBehavior.asInstanceOf[Behavior[T]]
case ext: ExtensibleBehavior[T]
val possiblyDeferredResult = msg match {
case signal: Signal ext.receiveSignal(ctx, signal)

View file

@ -20,6 +20,7 @@ import akka.event.Logging.Error
import akka.event.Logging
import akka.typed.Behavior.StoppedBehavior
import akka.util.OptionVal
import akka.typed.Behavior.UntypedBehavior
/**
* INTERNAL API
@ -104,6 +105,8 @@ private[typed] class ActorCell[T](
protected def ctx: ActorContext[T] = this
override def spawn[U](behavior: Behavior[U], name: String, props: Props): ActorRef[U] = {
if (behavior.isInstanceOf[UntypedBehavior[_]])
throw new IllegalArgumentException(s"${behavior.getClass.getName} requires untyped ActorSystem")
if (childrenMap contains name) throw InvalidActorNameException(s"actor name [$name] is not unique")
if (terminatingMap contains name) throw InvalidActorNameException(s"actor name [$name] is not yet free")
val dispatcher = props.firstOrElse[DispatcherSelector](DispatcherFromExecutionContext(executionContext))

View file

@ -9,6 +9,7 @@ import akka.{ actor ⇒ a }
import scala.concurrent.duration._
import scala.concurrent.ExecutionContextExecutor
import akka.annotation.InternalApi
import akka.typed.Behavior.UntypedBehavior
/**
* INTERNAL API. Wrapping an [[akka.actor.ActorContext]] as an [[ActorContext]].
@ -97,12 +98,24 @@ import akka.annotation.InternalApi
}
def spawnAnonymous[T](ctx: akka.actor.ActorContext, behavior: Behavior[T], props: Props): ActorRef[T] = {
Behavior.validateAsInitial(behavior)
ActorRefAdapter(ctx.actorOf(PropsAdapter(() behavior, props)))
behavior match {
case b: UntypedBehavior[_]
// TODO dispatcher from props
ActorRefAdapter(ctx.actorOf(b.untypedProps))
case _
Behavior.validateAsInitial(behavior)
ActorRefAdapter(ctx.actorOf(PropsAdapter(() behavior, props)))
}
}
def spawn[T](ctx: akka.actor.ActorContext, behavior: Behavior[T], name: String, props: Props): ActorRef[T] = {
Behavior.validateAsInitial(behavior)
ActorRefAdapter(ctx.actorOf(PropsAdapter(() behavior, props), name))
behavior match {
case b: UntypedBehavior[_]
// TODO dispatcher from props
ActorRefAdapter(ctx.actorOf(b.untypedProps, name))
case _
Behavior.validateAsInitial(behavior)
ActorRefAdapter(ctx.actorOf(PropsAdapter(() behavior, props), name))
}
}
}

View file

@ -0,0 +1,114 @@
/*
* Copyright (C) 2017 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.typed.persistence.internal
import akka.actor.Props
import akka.annotation.InternalApi
import akka.event.Logging
import akka.persistence.{ PersistentActor UntypedPersistentActor }
import akka.persistence.RecoveryCompleted
import akka.persistence.SnapshotOffer
import akka.typed.Signal
import akka.typed.internal.adapter.ActorContextAdapter
import akka.typed.persistence.scaladsl.PersistentActor
import akka.typed.persistence.scaladsl.PersistentBehavior
import akka.typed.scaladsl.ActorContext
/**
* INTERNAL API
*/
@InternalApi private[akka] object PersistentActorImpl {
/**
* Stop the actor for passivation. `PoisonPill` does not work well
* with persistent actors.
*/
case object Stop
def props[C, E, S](
behaviorFactory: () PersistentBehavior[C, E, S]): Props =
Props(new PersistentActorImpl(behaviorFactory()))
}
/**
* INTERNAL API
* The `PersistentActor` that runs a `PersistentBehavior`.
*/
@InternalApi private[akka] class PersistentActorImpl[C, E, S](
behavior: PersistentBehavior[C, E, S]) extends UntypedPersistentActor {
import PersistentActorImpl._
import PersistentActor._
private val log = Logging(context.system, behavior.getClass)
override val persistenceId: String = behavior.persistenceId
private var state: S = behavior.initialState
private val actions: Actions[C, E, S] = behavior.actions
private val eventHandler: (E, S) S = behavior.onEvent
private val ctx = new ActorContextAdapter[C](context).asScala
override def receiveRecover: Receive = {
case SnapshotOffer(_, snapshot)
state = snapshot.asInstanceOf[S]
case RecoveryCompleted
state = behavior.recoveryCompleted(state, ctx)
case event: E @unchecked
state = applyEvent(state, event)
}
def applyEvent(s: S, event: E): S =
eventHandler.apply(event, s)
private val unhandledSignal: PartialFunction[(Signal, S, ActorContext[C]), PersistentEffect[E, S]] = {
case sig Unhandled()
}
override def receiveCommand: Receive = {
case PersistentActorImpl.Stop
context.stop(self)
case msg
try {
// FIXME sigHandler(state)
val effect = msg match {
case sig: Signal
actions.sigHandler(state).applyOrElse((sig, state, ctx), unhandledSignal)
case cmd: C @unchecked
// FIXME we could make it more safe by using ClassTag for C
actions.commandHandler(cmd, state, ctx)
}
effect match {
case Persist(event, callbacks)
// apply the event before persist so that validation exception is handled before persisting
// the invalid event, in case such validation is implemented in the event handler.
state = applyEvent(state, event)
persist(event) { _
callbacks.foreach(_.apply(state))
}
// FIXME PersistAll
case PersistNothing(callbacks)
callbacks.foreach(_.apply(state))
case Unhandled(callbacks)
super.unhandled(msg)
callbacks.foreach(_.apply(state))
}
} catch {
case e: MatchError throw new IllegalStateException(
s"Undefined state [${state.getClass.getName}] or handler for [${msg.getClass.getName} " +
s"in [${behavior.getClass.getName}] with persistenceId [${persistenceId}]")
}
}
}

View file

@ -3,21 +3,21 @@
*/
package akka.typed.persistence.scaladsl
import akka.typed
import akka.typed.scaladsl.ActorContext
import akka.typed.ExtensibleBehavior
import akka.typed.Signal
import akka.typed.Behavior
import scala.reflect.ClassTag
import akka.annotation.DoNotInherit
import akka.annotation.InternalApi
import akka.typed.Behavior.UntypedBehavior
import akka.typed.Signal
import akka.typed.persistence.internal.PersistentActorImpl
import akka.typed.scaladsl.ActorContext
object PersistentActor {
def persistent[Command, Event, State](
persistenceId: String,
initialState: State,
commandHandler: ActionHandler[Command, Event, State],
onEvent: (Event, State) State): PersistentBehavior[Command, Event, State] =
new PersistentBehavior
persistenceId: String,
initialState: State,
actions: Actions[Command, Event, State],
onEvent: (Event, State) State): PersistentBehavior[Command, Event, State] =
new PersistentBehavior(persistenceId, initialState, actions, onEvent,
recoveryCompleted = (state, _) state)
sealed abstract class PersistentEffect[+Event, State]() {
def andThen(callback: State Unit): PersistentEffect[Event, State]
@ -35,31 +35,100 @@ object PersistentActor {
def andThen(callback: State Unit) = copy(callbacks = callback :: callbacks)
}
class ActionHandler[Command: ClassTag, Event, State](val handler: ((Any, State, ActorContext[Command]) PersistentEffect[Event, State])) {
def onSignal(signalHandler: PartialFunction[(Any, State, ActorContext[Command]), PersistentEffect[Event, State]]): ActionHandler[Command, Event, State] =
ActionHandler {
case (command: Command, state, ctx) handler(command, state, ctx)
case (signal: Signal, state, ctx) signalHandler.orElse(unhandledSignal).apply((signal, state, ctx))
case _ Unhandled()
}
private val unhandledSignal: PartialFunction[(Any, State, ActorContext[Command]), PersistentEffect[Event, State]] = { case _ Unhandled() }
type CommandHandler[Command, Event, State] = Function3[Command, State, ActorContext[Command], PersistentEffect[Event, State]]
type SignalHandler[Command, Event, State] = PartialFunction[(Signal, State, ActorContext[Command]), PersistentEffect[Event, State]]
/**
* `Actions` defines command handlers and partial function for other signals,
* e.g. `Termination` messages if `watch` is used.
*
* Note that you can have different actions based on current state by using
* [[Actions#byState]].
*/
object Actions {
def apply[Command, Event, State](commandHandler: CommandHandler[Command, Event, State]): Actions[Command, Event, State] =
new Actions(commandHandler, Map.empty)
/**
* Select different actions based on current state.
*/
def byState[Command, Event, State](choice: State Actions[Command, Event, State]): Actions[Command, Event, State] =
new ByStateActions(choice, signalHandler = PartialFunction.empty)
}
object ActionHandler {
def cmd[Command: ClassTag, Event, State](commandHandler: Command PersistentEffect[Event, State]): ActionHandler[Command, Event, State] = ???
def apply[Command: ClassTag, Event, State](commandHandler: ((Command, State, ActorContext[Command]) PersistentEffect[Event, State])): ActionHandler[Command, Event, State] =
new ActionHandler(commandHandler.asInstanceOf[((Any, State, ActorContext[Command]) PersistentEffect[Event, State])])
def byState[Command: ClassTag, Event, State](actionHandler: State ActionHandler[Command, Event, State]): ActionHandler[Command, Event, State] =
new ActionHandler(handler = {
case (action, state, ctx) actionHandler(state).handler(action, state, ctx)
})
/**
* INTERNAL API
*/
@InternalApi private[akka] final class ByStateActions[Command, Event, State](
choice: State Actions[Command, Event, State],
signalHandler: SignalHandler[Command, Event, State])
extends Actions[Command, Event, State](
commandHandler = (cmd, state, ctx) choice(state).commandHandler(cmd, state, ctx),
signalHandler) {
// SignalHandler may be registered in the wrapper or in the wrapped
private[akka] override def sigHandler(state: State): SignalHandler[Command, Event, State] =
choice(state).sigHandler(state).orElse(signalHandler)
// override to preserve the ByStateActions
private[akka] override def withSignalHandler(
handler: SignalHandler[Command, Event, State]): Actions[Command, Event, State] =
new ByStateActions(choice, handler)
}
/**
* `Actions` defines command handlers and partial function for other signals,
* e.g. `Termination` messages if `watch` is used.
* `Actions` is an immutable class.
*/
@DoNotInherit class Actions[Command, Event, State] private[akka] (
val commandHandler: CommandHandler[Command, Event, State],
val signalHandler: SignalHandler[Command, Event, State]) {
@InternalApi private[akka] def sigHandler(state: State): SignalHandler[Command, Event, State] =
signalHandler
def onSignal(handler: SignalHandler[Command, Event, State]): Actions[Command, Event, State] =
withSignalHandler(signalHandler.orElse(handler))
/** INTERNAL API */
@InternalApi private[akka] def withSignalHandler(
handler: SignalHandler[Command, Event, State]): Actions[Command, Event, State] =
new Actions(commandHandler, handler)
}
}
class PersistentBehavior[Command, Event, State] extends ExtensibleBehavior[Command] {
override def receiveSignal(ctx: typed.ActorContext[Command], msg: Signal): Behavior[Command] = ???
override def receiveMessage(ctx: typed.ActorContext[Command], msg: Command): Behavior[Command] = ???
class PersistentBehavior[Command, Event, State](
val persistenceId: String,
val initialState: State,
val actions: PersistentActor.Actions[Command, Event, State],
val onEvent: (Event, State) State,
val recoveryCompleted: (State, ActorContext[Command]) State) extends UntypedBehavior[Command] {
import PersistentActor._
def onRecoveryComplete(callback: (ActorContext[Command], State) Unit): PersistentBehavior[Command, Event, State] = ???
def snapshotOnState(predicate: State Boolean): PersistentBehavior[Command, Event, State] = ???
def snapshotOn(predicate: (State, Event) Boolean): PersistentBehavior[Command, Event, State] = ???
/** INTERNAL API */
@InternalApi private[akka] override def untypedProps: akka.actor.Props = PersistentActorImpl.props(() this)
/**
* The `callback` function is called to notify the actor that the recovery process
* is finished.
*/
def onRecoveryCompleted(callback: (State, ActorContext[Command]) State): PersistentBehavior[Command, Event, State] =
copy(recoveryCompleted = callback)
def snapshotOnState(predicate: State Boolean): PersistentBehavior[Command, Event, State] = ??? // FIXME
def snapshotOn(predicate: (State, Event) Boolean): PersistentBehavior[Command, Event, State] = ??? // FIXME
private def copy(
persistenceId: String = persistenceId,
initialState: State = initialState,
actions: Actions[Command, Event, State] = actions,
onEvent: (Event, State) State = onEvent,
recoveryCompleted: (State, ActorContext[Command]) State = recoveryCompleted): PersistentBehavior[Command, Event, State] =
new PersistentBehavior(persistenceId, initialState, actions, onEvent, recoveryCompleted)
}

View file

@ -163,7 +163,8 @@ lazy val typed = akkaModule("akka-typed")
cluster % "compile->compile;test->test",
clusterTools,
clusterSharding,
distributedData)
distributedData,
persistence % "compile->compile;test->test")
lazy val typedTests = akkaModule("akka-typed-tests")
.dependsOn(typed, typedTestkit % "compile->compile;test->test")