From b671d2a47b8b9b9c97fe288ef2e49e68cd197a61 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Thu, 21 Sep 2017 07:11:26 +0200 Subject: [PATCH] first impl of Typed PersistentActor, #22273 * illustrate problem with wrapping behavior --- .../scaladsl/PersistentActorSpec.scala | 104 ++++++++++++++ .../src/main/scala/akka/typed/Behavior.scala | 25 +++- .../scala/akka/typed/internal/ActorCell.scala | 3 + .../adapter/ActorContextAdapter.scala | 21 ++- .../internal/PersistentActorImpl.scala | 114 +++++++++++++++ .../scaladsl/PersistentActor.scala | 135 +++++++++++++----- build.sbt | 3 +- 7 files changed, 362 insertions(+), 43 deletions(-) create mode 100644 akka-typed-tests/src/test/scala/akka/typed/persistence/scaladsl/PersistentActorSpec.scala create mode 100644 akka-typed/src/main/scala/akka/typed/persistence/internal/PersistentActorImpl.scala diff --git a/akka-typed-tests/src/test/scala/akka/typed/persistence/scaladsl/PersistentActorSpec.scala b/akka-typed-tests/src/test/scala/akka/typed/persistence/scaladsl/PersistentActorSpec.scala new file mode 100644 index 0000000000..c1cc7f5ea8 --- /dev/null +++ b/akka-typed-tests/src/test/scala/akka/typed/persistence/scaladsl/PersistentActorSpec.scala @@ -0,0 +1,104 @@ +/** + * Copyright (C) 2017 Lightbend Inc. + */ +package akka.typed.persistence.scaladsl + +import scala.concurrent.duration._ + +import akka.typed.ActorRef +import akka.typed.ActorSystem +import akka.typed.Behavior +import akka.typed.TypedSpec +import akka.typed.scaladsl.Actor +import akka.typed.scaladsl.AskPattern._ +import akka.typed.scaladsl.adapter._ +import akka.typed.testkit.TestKitSettings +import akka.typed.testkit.scaladsl._ +import com.typesafe.config.ConfigFactory +import org.scalatest.concurrent.Eventually +import akka.util.Timeout +import akka.typed.persistence.scaladsl.PersistentActor._ +import akka.typed.SupervisorStrategy + +object PersistentActorSpec { + + val config = ConfigFactory.parseString(""" + akka.persistence.journal.plugin = "akka.persistence.journal.inmem" + """) + + sealed trait Command + final case object Increment extends Command + final case class GetValue(replyTo: ActorRef[State]) extends Command + + sealed trait Event + final case class Incremented(delta: Int) extends Event + + final case class State(value: Int, history: Vector[Int]) + + def counter(persistenceId: String): Behavior[Command] = { + PersistentActor.persistent[Command, Event, State]( + persistenceId, + initialState = State(0, Vector.empty), + actions = Actions((cmd, state, ctx) ⇒ cmd match { + case Increment ⇒ + Persist(Incremented(1)) + case GetValue(replyTo) ⇒ + replyTo ! state + PersistNothing() + }), + onEvent = (evt, state) ⇒ evt match { + case Incremented(delta) ⇒ + State(state.value + delta, state.history :+ state.value) + }) + } + +} + +class PersistentActorSpec extends TypedSpec(PersistentActorSpec.config) with Eventually { + import PersistentActorSpec._ + + trait RealTests extends StartSupport { + implicit def system: ActorSystem[TypedSpec.Command] + implicit val testSettings = TestKitSettings(system) + + def `persist an event`(): Unit = { + val c = start(counter("c1")) + + val probe = TestProbe[State] + c ! Increment + c ! GetValue(probe.ref) + probe.expectMsg(State(1, Vector(0))) + } + + def `replay stored events`(): Unit = { + val c = start(counter("c2")) + + val probe = TestProbe[State] + c ! Increment + c ! Increment + c ! Increment + c ! GetValue(probe.ref) + probe.expectMsg(State(3, Vector(0, 1, 2))) + + val c2 = start(counter("c2")) + c2 ! GetValue(probe.ref) + probe.expectMsg(State(3, Vector(0, 1, 2))) + c ! Increment + c ! GetValue(probe.ref) + probe.expectMsg(State(4, Vector(0, 1, 2, 3))) + } + + def `work when wrapped in other behavior`(): Unit = { + // FIXME This is a major problem with current implementation. Since the + // behavior is running as an untyped PersistentActor it's not possible to + // wrap it in Actor.deferred or Actor.supervise + pending + val behavior = Actor.supervise[Command](counter("c3")) + .onFailure(SupervisorStrategy.restartWithBackoff(1.second, 10.seconds, 0.1)) + val c = start(behavior) + } + + } + + object `A PersistentActor (real, adapted)` extends RealTests with AdaptedSystem +} diff --git a/akka-typed/src/main/scala/akka/typed/Behavior.scala b/akka-typed/src/main/scala/akka/typed/Behavior.scala index 777248fa59..5888bedf38 100644 --- a/akka-typed/src/main/scala/akka/typed/Behavior.scala +++ b/akka-typed/src/main/scala/akka/typed/Behavior.scala @@ -153,6 +153,17 @@ object Behavior { override def toString = "Unhandled" } + /** + * INTERNAL API. + */ + @InternalApi + private[akka] abstract class UntypedBehavior[T] extends Behavior[T] { + /** + * INTERNAL API + */ + @InternalApi private[akka] def untypedProps: akka.actor.Props + } + /** * INTERNAL API */ @@ -264,11 +275,15 @@ object Behavior { private def interpret[T](behavior: Behavior[T], ctx: ActorContext[T], msg: Any): Behavior[T] = behavior match { - case SameBehavior | UnhandledBehavior ⇒ throw new IllegalArgumentException(s"cannot execute with [$behavior] as behavior") - case d: DeferredBehavior[_] ⇒ throw new IllegalArgumentException(s"deferred [$d] should not be passed to interpreter") - case IgnoreBehavior ⇒ SameBehavior.asInstanceOf[Behavior[T]] - case s: StoppedBehavior[T] ⇒ s - case EmptyBehavior ⇒ UnhandledBehavior.asInstanceOf[Behavior[T]] + case SameBehavior | UnhandledBehavior ⇒ + throw new IllegalArgumentException(s"cannot execute with [$behavior] as behavior") + case _: UntypedBehavior[_] ⇒ + throw new IllegalArgumentException(s"cannot wrap behavior [$behavior] in " + + "Actor.deferred, Actor.supervise or similar") + case d: DeferredBehavior[_] ⇒ throw new IllegalArgumentException(s"deferred [$d] should not be passed to interpreter") + case IgnoreBehavior ⇒ SameBehavior.asInstanceOf[Behavior[T]] + case s: StoppedBehavior[T] ⇒ s + case EmptyBehavior ⇒ UnhandledBehavior.asInstanceOf[Behavior[T]] case ext: ExtensibleBehavior[T] ⇒ val possiblyDeferredResult = msg match { case signal: Signal ⇒ ext.receiveSignal(ctx, signal) diff --git a/akka-typed/src/main/scala/akka/typed/internal/ActorCell.scala b/akka-typed/src/main/scala/akka/typed/internal/ActorCell.scala index f5fbd3632b..2bf0f8511a 100644 --- a/akka-typed/src/main/scala/akka/typed/internal/ActorCell.scala +++ b/akka-typed/src/main/scala/akka/typed/internal/ActorCell.scala @@ -20,6 +20,7 @@ import akka.event.Logging.Error import akka.event.Logging import akka.typed.Behavior.StoppedBehavior import akka.util.OptionVal +import akka.typed.Behavior.UntypedBehavior /** * INTERNAL API @@ -104,6 +105,8 @@ private[typed] class ActorCell[T]( protected def ctx: ActorContext[T] = this override def spawn[U](behavior: Behavior[U], name: String, props: Props): ActorRef[U] = { + if (behavior.isInstanceOf[UntypedBehavior[_]]) + throw new IllegalArgumentException(s"${behavior.getClass.getName} requires untyped ActorSystem") if (childrenMap contains name) throw InvalidActorNameException(s"actor name [$name] is not unique") if (terminatingMap contains name) throw InvalidActorNameException(s"actor name [$name] is not yet free") val dispatcher = props.firstOrElse[DispatcherSelector](DispatcherFromExecutionContext(executionContext)) diff --git a/akka-typed/src/main/scala/akka/typed/internal/adapter/ActorContextAdapter.scala b/akka-typed/src/main/scala/akka/typed/internal/adapter/ActorContextAdapter.scala index 4fb98a9734..6629ddc02f 100644 --- a/akka-typed/src/main/scala/akka/typed/internal/adapter/ActorContextAdapter.scala +++ b/akka-typed/src/main/scala/akka/typed/internal/adapter/ActorContextAdapter.scala @@ -9,6 +9,7 @@ import akka.{ actor ⇒ a } import scala.concurrent.duration._ import scala.concurrent.ExecutionContextExecutor import akka.annotation.InternalApi +import akka.typed.Behavior.UntypedBehavior /** * INTERNAL API. Wrapping an [[akka.actor.ActorContext]] as an [[ActorContext]]. @@ -97,12 +98,24 @@ import akka.annotation.InternalApi } def spawnAnonymous[T](ctx: akka.actor.ActorContext, behavior: Behavior[T], props: Props): ActorRef[T] = { - Behavior.validateAsInitial(behavior) - ActorRefAdapter(ctx.actorOf(PropsAdapter(() ⇒ behavior, props))) + behavior match { + case b: UntypedBehavior[_] ⇒ + // TODO dispatcher from props + ActorRefAdapter(ctx.actorOf(b.untypedProps)) + case _ ⇒ + Behavior.validateAsInitial(behavior) + ActorRefAdapter(ctx.actorOf(PropsAdapter(() ⇒ behavior, props))) + } } def spawn[T](ctx: akka.actor.ActorContext, behavior: Behavior[T], name: String, props: Props): ActorRef[T] = { - Behavior.validateAsInitial(behavior) - ActorRefAdapter(ctx.actorOf(PropsAdapter(() ⇒ behavior, props), name)) + behavior match { + case b: UntypedBehavior[_] ⇒ + // TODO dispatcher from props + ActorRefAdapter(ctx.actorOf(b.untypedProps, name)) + case _ ⇒ + Behavior.validateAsInitial(behavior) + ActorRefAdapter(ctx.actorOf(PropsAdapter(() ⇒ behavior, props), name)) + } } } diff --git a/akka-typed/src/main/scala/akka/typed/persistence/internal/PersistentActorImpl.scala b/akka-typed/src/main/scala/akka/typed/persistence/internal/PersistentActorImpl.scala new file mode 100644 index 0000000000..6bed7937e3 --- /dev/null +++ b/akka-typed/src/main/scala/akka/typed/persistence/internal/PersistentActorImpl.scala @@ -0,0 +1,114 @@ +/* + * Copyright (C) 2017 Lightbend Inc. + */ +package akka.typed.persistence.internal + +import akka.actor.Props +import akka.annotation.InternalApi +import akka.event.Logging +import akka.persistence.{ PersistentActor ⇒ UntypedPersistentActor } +import akka.persistence.RecoveryCompleted +import akka.persistence.SnapshotOffer +import akka.typed.Signal +import akka.typed.internal.adapter.ActorContextAdapter +import akka.typed.persistence.scaladsl.PersistentActor +import akka.typed.persistence.scaladsl.PersistentBehavior +import akka.typed.scaladsl.ActorContext + +/** + * INTERNAL API + */ +@InternalApi private[akka] object PersistentActorImpl { + + /** + * Stop the actor for passivation. `PoisonPill` does not work well + * with persistent actors. + */ + case object Stop + + def props[C, E, S]( + behaviorFactory: () ⇒ PersistentBehavior[C, E, S]): Props = + Props(new PersistentActorImpl(behaviorFactory())) + +} + +/** + * INTERNAL API + * The `PersistentActor` that runs a `PersistentBehavior`. + */ +@InternalApi private[akka] class PersistentActorImpl[C, E, S]( + behavior: PersistentBehavior[C, E, S]) extends UntypedPersistentActor { + + import PersistentActorImpl._ + import PersistentActor._ + + private val log = Logging(context.system, behavior.getClass) + + override val persistenceId: String = behavior.persistenceId + + private var state: S = behavior.initialState + + private val actions: Actions[C, E, S] = behavior.actions + + private val eventHandler: (E, S) ⇒ S = behavior.onEvent + + private val ctx = new ActorContextAdapter[C](context).asScala + + override def receiveRecover: Receive = { + case SnapshotOffer(_, snapshot) ⇒ + state = snapshot.asInstanceOf[S] + + case RecoveryCompleted ⇒ + state = behavior.recoveryCompleted(state, ctx) + + case event: E @unchecked ⇒ + state = applyEvent(state, event) + } + + def applyEvent(s: S, event: E): S = + eventHandler.apply(event, s) + + private val unhandledSignal: PartialFunction[(Signal, S, ActorContext[C]), PersistentEffect[E, S]] = { + case sig ⇒ Unhandled() + } + + override def receiveCommand: Receive = { + case PersistentActorImpl.Stop ⇒ + context.stop(self) + + case msg ⇒ + try { + // FIXME sigHandler(state) + val effect = msg match { + case sig: Signal ⇒ + actions.sigHandler(state).applyOrElse((sig, state, ctx), unhandledSignal) + case cmd: C @unchecked ⇒ + // FIXME we could make it more safe by using ClassTag for C + actions.commandHandler(cmd, state, ctx) + } + + effect match { + case Persist(event, callbacks) ⇒ + // apply the event before persist so that validation exception is handled before persisting + // the invalid event, in case such validation is implemented in the event handler. + state = applyEvent(state, event) + persist(event) { _ ⇒ + callbacks.foreach(_.apply(state)) + } + // FIXME PersistAll + case PersistNothing(callbacks) ⇒ + callbacks.foreach(_.apply(state)) + case Unhandled(callbacks) ⇒ + super.unhandled(msg) + callbacks.foreach(_.apply(state)) + } + } catch { + case e: MatchError ⇒ throw new IllegalStateException( + s"Undefined state [${state.getClass.getName}] or handler for [${msg.getClass.getName} " + + s"in [${behavior.getClass.getName}] with persistenceId [${persistenceId}]") + } + + } + +} + diff --git a/akka-typed/src/main/scala/akka/typed/persistence/scaladsl/PersistentActor.scala b/akka-typed/src/main/scala/akka/typed/persistence/scaladsl/PersistentActor.scala index b20ee40930..524884b3b6 100644 --- a/akka-typed/src/main/scala/akka/typed/persistence/scaladsl/PersistentActor.scala +++ b/akka-typed/src/main/scala/akka/typed/persistence/scaladsl/PersistentActor.scala @@ -3,21 +3,21 @@ */ package akka.typed.persistence.scaladsl -import akka.typed -import akka.typed.scaladsl.ActorContext -import akka.typed.ExtensibleBehavior -import akka.typed.Signal -import akka.typed.Behavior -import scala.reflect.ClassTag +import akka.annotation.DoNotInherit +import akka.annotation.InternalApi import akka.typed.Behavior.UntypedBehavior +import akka.typed.Signal +import akka.typed.persistence.internal.PersistentActorImpl +import akka.typed.scaladsl.ActorContext object PersistentActor { def persistent[Command, Event, State]( - persistenceId: String, - initialState: State, - commandHandler: ActionHandler[Command, Event, State], - onEvent: (Event, State) ⇒ State): PersistentBehavior[Command, Event, State] = - new PersistentBehavior + persistenceId: String, + initialState: State, + actions: Actions[Command, Event, State], + onEvent: (Event, State) ⇒ State): PersistentBehavior[Command, Event, State] = + new PersistentBehavior(persistenceId, initialState, actions, onEvent, + recoveryCompleted = (state, _) ⇒ state) sealed abstract class PersistentEffect[+Event, State]() { def andThen(callback: State ⇒ Unit): PersistentEffect[Event, State] @@ -35,31 +35,100 @@ object PersistentActor { def andThen(callback: State ⇒ Unit) = copy(callbacks = callback :: callbacks) } - class ActionHandler[Command: ClassTag, Event, State](val handler: ((Any, State, ActorContext[Command]) ⇒ PersistentEffect[Event, State])) { - def onSignal(signalHandler: PartialFunction[(Any, State, ActorContext[Command]), PersistentEffect[Event, State]]): ActionHandler[Command, Event, State] = - ActionHandler { - case (command: Command, state, ctx) ⇒ handler(command, state, ctx) - case (signal: Signal, state, ctx) ⇒ signalHandler.orElse(unhandledSignal).apply((signal, state, ctx)) - case _ ⇒ Unhandled() - } - private val unhandledSignal: PartialFunction[(Any, State, ActorContext[Command]), PersistentEffect[Event, State]] = { case _ ⇒ Unhandled() } + type CommandHandler[Command, Event, State] = Function3[Command, State, ActorContext[Command], PersistentEffect[Event, State]] + type SignalHandler[Command, Event, State] = PartialFunction[(Signal, State, ActorContext[Command]), PersistentEffect[Event, State]] + + /** + * `Actions` defines command handlers and partial function for other signals, + * e.g. `Termination` messages if `watch` is used. + * + * Note that you can have different actions based on current state by using + * [[Actions#byState]]. + */ + object Actions { + def apply[Command, Event, State](commandHandler: CommandHandler[Command, Event, State]): Actions[Command, Event, State] = + new Actions(commandHandler, Map.empty) + + /** + * Select different actions based on current state. + */ + def byState[Command, Event, State](choice: State ⇒ Actions[Command, Event, State]): Actions[Command, Event, State] = + new ByStateActions(choice, signalHandler = PartialFunction.empty) + } - object ActionHandler { - def cmd[Command: ClassTag, Event, State](commandHandler: Command ⇒ PersistentEffect[Event, State]): ActionHandler[Command, Event, State] = ??? - def apply[Command: ClassTag, Event, State](commandHandler: ((Command, State, ActorContext[Command]) ⇒ PersistentEffect[Event, State])): ActionHandler[Command, Event, State] = - new ActionHandler(commandHandler.asInstanceOf[((Any, State, ActorContext[Command]) ⇒ PersistentEffect[Event, State])]) - def byState[Command: ClassTag, Event, State](actionHandler: State ⇒ ActionHandler[Command, Event, State]): ActionHandler[Command, Event, State] = - new ActionHandler(handler = { - case (action, state, ctx) ⇒ actionHandler(state).handler(action, state, ctx) - }) + + /** + * INTERNAL API + */ + @InternalApi private[akka] final class ByStateActions[Command, Event, State]( + choice: State ⇒ Actions[Command, Event, State], + signalHandler: SignalHandler[Command, Event, State]) + extends Actions[Command, Event, State]( + commandHandler = (cmd, state, ctx) ⇒ choice(state).commandHandler(cmd, state, ctx), + signalHandler) { + + // SignalHandler may be registered in the wrapper or in the wrapped + private[akka] override def sigHandler(state: State): SignalHandler[Command, Event, State] = + choice(state).sigHandler(state).orElse(signalHandler) + + // override to preserve the ByStateActions + private[akka] override def withSignalHandler( + handler: SignalHandler[Command, Event, State]): Actions[Command, Event, State] = + new ByStateActions(choice, handler) + } + + /** + * `Actions` defines command handlers and partial function for other signals, + * e.g. `Termination` messages if `watch` is used. + * `Actions` is an immutable class. + */ + @DoNotInherit class Actions[Command, Event, State] private[akka] ( + val commandHandler: CommandHandler[Command, Event, State], + val signalHandler: SignalHandler[Command, Event, State]) { + + @InternalApi private[akka] def sigHandler(state: State): SignalHandler[Command, Event, State] = + signalHandler + + def onSignal(handler: SignalHandler[Command, Event, State]): Actions[Command, Event, State] = + withSignalHandler(signalHandler.orElse(handler)) + + /** INTERNAL API */ + @InternalApi private[akka] def withSignalHandler( + handler: SignalHandler[Command, Event, State]): Actions[Command, Event, State] = + new Actions(commandHandler, handler) + + } + } -class PersistentBehavior[Command, Event, State] extends ExtensibleBehavior[Command] { - override def receiveSignal(ctx: typed.ActorContext[Command], msg: Signal): Behavior[Command] = ??? - override def receiveMessage(ctx: typed.ActorContext[Command], msg: Command): Behavior[Command] = ??? +class PersistentBehavior[Command, Event, State]( + val persistenceId: String, + val initialState: State, + val actions: PersistentActor.Actions[Command, Event, State], + val onEvent: (Event, State) ⇒ State, + val recoveryCompleted: (State, ActorContext[Command]) ⇒ State) extends UntypedBehavior[Command] { + import PersistentActor._ - def onRecoveryComplete(callback: (ActorContext[Command], State) ⇒ Unit): PersistentBehavior[Command, Event, State] = ??? - def snapshotOnState(predicate: State ⇒ Boolean): PersistentBehavior[Command, Event, State] = ??? - def snapshotOn(predicate: (State, Event) ⇒ Boolean): PersistentBehavior[Command, Event, State] = ??? + /** INTERNAL API */ + @InternalApi private[akka] override def untypedProps: akka.actor.Props = PersistentActorImpl.props(() ⇒ this) + + /** + * The `callback` function is called to notify the actor that the recovery process + * is finished. + */ + def onRecoveryCompleted(callback: (State, ActorContext[Command]) ⇒ State): PersistentBehavior[Command, Event, State] = + copy(recoveryCompleted = callback) + + def snapshotOnState(predicate: State ⇒ Boolean): PersistentBehavior[Command, Event, State] = ??? // FIXME + + def snapshotOn(predicate: (State, Event) ⇒ Boolean): PersistentBehavior[Command, Event, State] = ??? // FIXME + + private def copy( + persistenceId: String = persistenceId, + initialState: State = initialState, + actions: Actions[Command, Event, State] = actions, + onEvent: (Event, State) ⇒ State = onEvent, + recoveryCompleted: (State, ActorContext[Command]) ⇒ State = recoveryCompleted): PersistentBehavior[Command, Event, State] = + new PersistentBehavior(persistenceId, initialState, actions, onEvent, recoveryCompleted) } diff --git a/build.sbt b/build.sbt index e7e1544077..ee9f6c0dd9 100644 --- a/build.sbt +++ b/build.sbt @@ -163,7 +163,8 @@ lazy val typed = akkaModule("akka-typed") cluster % "compile->compile;test->test", clusterTools, clusterSharding, - distributedData) + distributedData, + persistence % "compile->compile;test->test") lazy val typedTests = akkaModule("akka-typed-tests") .dependsOn(typed, typedTestkit % "compile->compile;test->test")