From 70e225b734ae0d72378db833905b7802fcdfbbaf Mon Sep 17 00:00:00 2001 From: Konrad Malawski Date: Tue, 23 Jan 2018 19:32:11 +0900 Subject: [PATCH] =per native typed implementation of Eventsourced=>PersistendBehavior timeout is explicitly a message of Command persitAll and chainable side effects work well more tests pasing additional sanity check that mutable behaviors work as expected unstashing needs to "loop through" the AdapterActor otherwise Stopped won't work solve unstashing/stop issue, by not randomly init()ing, but unstashing snapshotting works all tests green rebased nicer log source remove IncomingCommand wrapper, we dont need it no need for shared counter remove not needed methods and state more state cleanup, using Behaviors.same reminder that we DO need that same alias, since stash does not work with the Behavior.same introduce config for stash buffer stopping now works after persisting compile fix cleanup reduced number of adapter styles needed for co-existence of persistence final cleanup done, less passing around 40 objects, carriers provided now --- .../scala/akka/actor/typed/BehaviorSpec.scala | 124 +++--- .../scala/akka/actor/typed/WatchSpec.scala | 7 + .../typed/scaladsl/adapter/AdapterSpec.scala | 12 +- .../scala/akka/actor/typed/Behavior.scala | 31 +- .../typed/internal/ActorContextImpl.scala | 1 - .../actor/typed/internal/BehaviorImpl.scala | 2 + .../typed/internal/StashBufferImpl.scala | 8 +- .../typed/internal/TimerSchedulerImpl.scala | 12 +- .../adapter/ActorContextAdapter.scala | 12 +- .../main/scala/akka/actor/typed/package.scala | 3 + .../actor/typed/scaladsl/ActorContext.scala | 3 +- .../akka/actor/typed/scaladsl/Behaviors.scala | 2 +- .../typed/scaladsl/adapter/package.scala | 10 +- .../main/scala/akka/util/ConstantFun.scala | 6 + .../typed/internal/ClusterShardingImpl.scala | 6 +- .../ClusterShardingPersistenceSpec.scala | 2 +- .../AdaptedClusterSingletonImpl.scala | 6 +- .../ClusterSingletonPersistenceSpec.scala | 3 +- .../src/main/paradox/typed/persistence.md | 6 +- .../src/main/resources/reference.conf | 11 + .../typed/internal/EffectImpl.scala | 39 +- .../typed/internal/EventsourcedBehavior.scala | 102 +++++ .../typed/internal/EventsourcedCarriers.scala | 20 + .../EventsourcedRecoveringEvents.scala | 222 +++++++++++ .../EventsourcedRecoveringSnapshot.scala | 217 +++++++++++ ...EventsourcedRequestingRecoveryPermit.scala | 96 +++++ .../typed/internal/EventsourcedRunning.scala | 365 ++++++++++++++++++ .../EventsourcedStashManagement.scala | 76 ++++ .../typed/internal/PersistentActorImpl.scala | 157 -------- .../persistence/typed/javadsl/Effect.scala | 7 +- .../typed/javadsl/PersistentBehavior.scala | 105 +++++ .../typed/javadsl/PersistentBehaviors.scala | 78 ---- .../persistence/typed/scaladsl/Effect.scala | 78 ++++ .../typed/scaladsl/PersistentBehaviors.scala | 239 +++++++----- .../typed/javadsl/PersistentActorTest.java | 6 +- .../typed/BasicPersistentBehaviorsTest.java | 14 +- .../PersistentActorCompileOnlyTest.scala | 2 +- ...pec.scala => PersistentBehaviorSpec.scala} | 93 +++-- .../typed/InDepthPersistentBehaviorSpec.scala | 3 +- .../scala/akka/persistence/Eventsourced.scala | 22 +- .../akka/persistence/RecoveryPermitter.scala | 16 +- .../scala/akka/persistence/Snapshotter.scala | 2 +- .../stream/impl/io/InputStreamSinkStage.scala | 2 +- 43 files changed, 1696 insertions(+), 532 deletions(-) create mode 100644 akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedBehavior.scala create mode 100644 akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedCarriers.scala create mode 100644 akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRecoveringEvents.scala create mode 100644 akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRecoveringSnapshot.scala create mode 100644 akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRequestingRecoveryPermit.scala create mode 100644 akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRunning.scala create mode 100644 akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedStashManagement.scala delete mode 100644 akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/PersistentActorImpl.scala create mode 100644 akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/PersistentBehavior.scala delete mode 100644 akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/PersistentBehaviors.scala create mode 100644 akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/Effect.scala rename akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/{PersistentActorSpec.scala => PersistentBehaviorSpec.scala} (84%) diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/BehaviorSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/BehaviorSpec.scala index 6332519a9e..4530ef9359 100644 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/BehaviorSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/BehaviorSpec.scala @@ -3,8 +3,8 @@ */ package akka.actor.typed -import akka.actor.typed.scaladsl.{ Behaviors ⇒ SActor } -import akka.actor.typed.javadsl.{ ActorContext ⇒ JActorContext, Behaviors ⇒ JActor } +import akka.actor.typed.scaladsl.{ Behaviors ⇒ SBehaviors } +import akka.actor.typed.javadsl.{ ActorContext ⇒ JActorContext, Behaviors ⇒ JBehaviors } import akka.japi.function.{ Function ⇒ F1e, Function2 ⇒ F2, Procedure2 ⇒ P2 } import akka.japi.pf.{ FI, PFBuilder } import java.util.function.{ Function ⇒ F1 } @@ -137,16 +137,16 @@ object BehaviorSpec { } def mkFull(monitor: ActorRef[Event], state: State = StateA): Behavior[Command] = { - SActor.immutable[Command] { + SBehaviors.immutable[Command] { case (ctx, GetSelf) ⇒ monitor ! Self(ctx.self) - SActor.same + SBehaviors.same case (_, Miss) ⇒ monitor ! Missed - SActor.unhandled + SBehaviors.unhandled case (_, Ignore) ⇒ monitor ! Ignored - SActor.same + SBehaviors.same case (_, Ping) ⇒ monitor ! Pong mkFull(monitor, state) @@ -155,13 +155,13 @@ object BehaviorSpec { mkFull(monitor, state.next) case (_, GetState()) ⇒ monitor ! state - SActor.same - case (_, Stop) ⇒ SActor.stopped - case (_, _) ⇒ SActor.unhandled + SBehaviors.same + case (_, Stop) ⇒ SBehaviors.stopped + case (_, _) ⇒ SBehaviors.unhandled } onSignal { case (_, signal) ⇒ monitor ! GotSignal(signal) - SActor.same + SBehaviors.same } } /* @@ -345,16 +345,16 @@ class FullBehaviorSpec extends TypedAkkaSpec with Messages with BecomeWithLifecy class ImmutableBehaviorSpec extends Messages with BecomeWithLifecycle with Stoppable { override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = behv(monitor, StateA) → null private def behv(monitor: ActorRef[Event], state: State): Behavior[Command] = { - SActor.immutable[Command] { + SBehaviors.immutable[Command] { case (ctx, GetSelf) ⇒ monitor ! Self(ctx.self) - SActor.same + SBehaviors.same case (_, Miss) ⇒ monitor ! Missed - SActor.unhandled + SBehaviors.unhandled case (_, Ignore) ⇒ monitor ! Ignored - SActor.same + SBehaviors.same case (_, Ping) ⇒ monitor ! Pong behv(monitor, state) @@ -363,13 +363,13 @@ class ImmutableBehaviorSpec extends Messages with BecomeWithLifecycle with Stopp behv(monitor, state.next) case (_, GetState()) ⇒ monitor ! state - SActor.same - case (_, Stop) ⇒ SActor.stopped - case (_, _: AuxPing) ⇒ SActor.unhandled + SBehaviors.same + case (_, Stop) ⇒ SBehaviors.stopped + case (_, _: AuxPing) ⇒ SBehaviors.unhandled } onSignal { case (_, signal) ⇒ monitor ! GotSignal(signal) - SActor.same + SBehaviors.same } } } @@ -379,18 +379,18 @@ class ImmutableWithSignalScalaBehaviorSpec extends TypedAkkaSpec with Messages w override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = behv(monitor) → null def behv(monitor: ActorRef[Event], state: State = StateA): Behavior[Command] = - SActor.immutable[Command] { + SBehaviors.immutable[Command] { (ctx, msg) ⇒ msg match { case GetSelf ⇒ monitor ! Self(ctx.self) - SActor.same + SBehaviors.same case Miss ⇒ monitor ! Missed - SActor.unhandled + SBehaviors.unhandled case Ignore ⇒ monitor ! Ignored - SActor.same + SBehaviors.same case Ping ⇒ monitor ! Pong behv(monitor, state) @@ -399,14 +399,14 @@ class ImmutableWithSignalScalaBehaviorSpec extends TypedAkkaSpec with Messages w behv(monitor, state.next) case GetState() ⇒ monitor ! state - SActor.same - case Stop ⇒ SActor.stopped - case _: AuxPing ⇒ SActor.unhandled + SBehaviors.same + case Stop ⇒ SBehaviors.stopped + case _: AuxPing ⇒ SBehaviors.unhandled } } onSignal { case (_, sig) ⇒ monitor ! GotSignal(sig) - SActor.same + SBehaviors.same } } @@ -415,17 +415,17 @@ class ImmutableScalaBehaviorSpec extends Messages with Become with Stoppable { override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = behv(monitor, StateA) → null def behv(monitor: ActorRef[Event], state: State): Behavior[Command] = - SActor.immutable[Command] { (ctx, msg) ⇒ + SBehaviors.immutable[Command] { (ctx, msg) ⇒ msg match { case GetSelf ⇒ monitor ! Self(ctx.self) - SActor.same + SBehaviors.same case Miss ⇒ monitor ! Missed - SActor.unhandled + SBehaviors.unhandled case Ignore ⇒ monitor ! Ignored - SActor.same + SBehaviors.same case Ping ⇒ monitor ! Pong behv(monitor, state) @@ -434,9 +434,9 @@ class ImmutableScalaBehaviorSpec extends Messages with Become with Stoppable { behv(monitor, state.next) case GetState() ⇒ monitor ! state - SActor.same - case Stop ⇒ SActor.stopped - case _: AuxPing ⇒ SActor.unhandled + SBehaviors.same + case Stop ⇒ SBehaviors.stopped + case _: AuxPing ⇒ SBehaviors.unhandled } } } @@ -446,8 +446,8 @@ class MutableScalaBehaviorSpec extends Messages with Become with Stoppable { override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = behv(monitor) → null def behv(monitor: ActorRef[Event]): Behavior[Command] = - SActor.mutable[Command] { ctx ⇒ - new SActor.MutableBehavior[Command] { + SBehaviors.mutable[Command] { ctx ⇒ + new SBehaviors.MutableBehavior[Command] { private var state: State = StateA override def onMessage(msg: Command): Behavior[Command] = { @@ -457,10 +457,10 @@ class MutableScalaBehaviorSpec extends Messages with Become with Stoppable { this case Miss ⇒ monitor ! Missed - SActor.unhandled + SBehaviors.unhandled case Ignore ⇒ monitor ! Ignored - SActor.same // this or same works the same way + SBehaviors.same // this or same works the same way case Ping ⇒ monitor ! Pong this @@ -471,8 +471,8 @@ class MutableScalaBehaviorSpec extends Messages with Become with Stoppable { case GetState() ⇒ monitor ! state this - case Stop ⇒ SActor.stopped - case _: AuxPing ⇒ SActor.unhandled + case Stop ⇒ SBehaviors.stopped + case _: AuxPing ⇒ SBehaviors.unhandled } } } @@ -481,7 +481,7 @@ class MutableScalaBehaviorSpec extends Messages with Become with Stoppable { class WidenedScalaBehaviorSpec extends ImmutableWithSignalScalaBehaviorSpec with Reuse with Siphon { - import SActor.BehaviorDecorators + import SBehaviors.BehaviorDecorators override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = { val inbox = TestInbox[Command]("widenedListener") @@ -494,7 +494,7 @@ class DeferredScalaBehaviorSpec extends ImmutableWithSignalScalaBehaviorSpec { override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = { val inbox = TestInbox[Done]("deferredListener") - (SActor.setup(_ ⇒ { + (SBehaviors.setup(_ ⇒ { inbox.ref ! Done super.behavior(monitor)._1 }), inbox) @@ -507,30 +507,30 @@ class DeferredScalaBehaviorSpec extends ImmutableWithSignalScalaBehaviorSpec { class TapScalaBehaviorSpec extends ImmutableWithSignalScalaBehaviorSpec with Reuse with SignalSiphon { override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = { val inbox = TestInbox[Either[Signal, Command]]("tapListener") - (SActor.tap((_, msg) ⇒ inbox.ref ! Right(msg), (_, sig) ⇒ inbox.ref ! Left(sig), super.behavior(monitor)._1), inbox) + (SBehaviors.tap((_, msg) ⇒ inbox.ref ! Right(msg), (_, sig) ⇒ inbox.ref ! Left(sig), super.behavior(monitor)._1), inbox) } } class RestarterScalaBehaviorSpec extends ImmutableWithSignalScalaBehaviorSpec with Reuse { override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = { - SActor.supervise(super.behavior(monitor)._1).onFailure(SupervisorStrategy.restart) → null + SBehaviors.supervise(super.behavior(monitor)._1).onFailure(SupervisorStrategy.restart) → null } } class ImmutableWithSignalJavaBehaviorSpec extends Messages with BecomeWithLifecycle with Stoppable { override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = behv(monitor) → null def behv(monitor: ActorRef[Event], state: State = StateA): Behavior[Command] = - JActor.immutable( + JBehaviors.immutable( fc((ctx, msg) ⇒ msg match { case GetSelf ⇒ monitor ! Self(ctx.getSelf) - SActor.same + SBehaviors.same case Miss ⇒ monitor ! Missed - SActor.unhandled + SBehaviors.unhandled case Ignore ⇒ monitor ! Ignored - SActor.same + SBehaviors.same case Ping ⇒ monitor ! Pong behv(monitor, state) @@ -539,31 +539,31 @@ class ImmutableWithSignalJavaBehaviorSpec extends Messages with BecomeWithLifecy behv(monitor, state.next) case GetState() ⇒ monitor ! state - SActor.same - case Stop ⇒ SActor.stopped - case _: AuxPing ⇒ SActor.unhandled + SBehaviors.same + case Stop ⇒ SBehaviors.stopped + case _: AuxPing ⇒ SBehaviors.unhandled }), fs((_, sig) ⇒ { monitor ! GotSignal(sig) - SActor.same + SBehaviors.same })) } class ImmutableJavaBehaviorSpec extends Messages with Become with Stoppable { override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = behv(monitor, StateA) → null def behv(monitor: ActorRef[Event], state: State): Behavior[Command] = - JActor.immutable { + JBehaviors.immutable { fc((ctx, msg) ⇒ msg match { case GetSelf ⇒ monitor ! Self(ctx.getSelf) - SActor.same + SBehaviors.same case Miss ⇒ monitor ! Missed - SActor.unhandled + SBehaviors.unhandled case Ignore ⇒ monitor ! Ignored - SActor.same + SBehaviors.same case Ping ⇒ monitor ! Pong behv(monitor, state) @@ -572,9 +572,9 @@ class ImmutableJavaBehaviorSpec extends Messages with Become with Stoppable { behv(monitor, state.next) case GetState() ⇒ monitor ! state - SActor.same - case Stop ⇒ SActor.stopped - case _: AuxPing ⇒ SActor.unhandled + SBehaviors.same + case Stop ⇒ SBehaviors.stopped + case _: AuxPing ⇒ SBehaviors.unhandled }) } } @@ -582,7 +582,7 @@ class ImmutableJavaBehaviorSpec extends Messages with Become with Stoppable { class WidenedJavaBehaviorSpec extends ImmutableWithSignalJavaBehaviorSpec with Reuse with Siphon { override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = { val inbox = TestInbox[Command]("widenedListener") - JActor.widened(super.behavior(monitor)._1, pf(_.`match`(classOf[Command], fi(x ⇒ { + JBehaviors.widened(super.behavior(monitor)._1, pf(_.`match`(classOf[Command], fi(x ⇒ { inbox.ref ! x x })))) → inbox @@ -594,7 +594,7 @@ class DeferredJavaBehaviorSpec extends ImmutableWithSignalJavaBehaviorSpec { override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = { val inbox = TestInbox[Done]("deferredListener") - (JActor.setup(df(_ ⇒ { + (JBehaviors.setup(df(_ ⇒ { inbox.ref ! Done super.behavior(monitor)._1 })), inbox) @@ -607,7 +607,7 @@ class DeferredJavaBehaviorSpec extends ImmutableWithSignalJavaBehaviorSpec { class TapJavaBehaviorSpec extends ImmutableWithSignalJavaBehaviorSpec with Reuse with SignalSiphon { override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = { val inbox = TestInbox[Either[Signal, Command]]("tapListener") - (JActor.tap( + (JBehaviors.tap( pc((_, msg) ⇒ inbox.ref ! Right(msg)), ps((_, sig) ⇒ inbox.ref ! Left(sig)), super.behavior(monitor)._1), inbox) @@ -616,7 +616,7 @@ class TapJavaBehaviorSpec extends ImmutableWithSignalJavaBehaviorSpec with Reuse class RestarterJavaBehaviorSpec extends ImmutableWithSignalJavaBehaviorSpec with Reuse { override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = { - JActor.supervise(super.behavior(monitor)._1) + JBehaviors.supervise(super.behavior(monitor)._1) .onFailure(classOf[Exception], SupervisorStrategy.restart) → null } } diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/WatchSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/WatchSpec.scala index 4b510c68b1..0a0bff0a33 100644 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/WatchSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/WatchSpec.scala @@ -5,6 +5,7 @@ package akka.actor.typed import akka.Done import akka.actor.typed.scaladsl.Behaviors +import akka.actor.typed.scaladsl.Behaviors.MutableBehavior import akka.actor.typed.scaladsl.adapter._ import akka.testkit.EventFilter import akka.testkit.typed.scaladsl.{ ActorTestKit, TestProbe } @@ -23,6 +24,12 @@ object WatchSpec { case (_, Stop) ⇒ Behaviors.stopped } + val mutableTerminatorBehavior = new MutableBehavior[Stop.type] { + override def onMessage(msg: Stop.type) = msg match { + case Stop ⇒ Behaviors.stopped + } + } + sealed trait Message sealed trait CustomTerminationMessage extends Message case object CustomTerminationMessage extends CustomTerminationMessage diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/adapter/AdapterSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/adapter/AdapterSpec.scala index fa71c68e65..6b2d2bf2b9 100644 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/adapter/AdapterSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/adapter/AdapterSpec.scala @@ -10,7 +10,7 @@ import akka.actor.{ InvalidMessageException, Props } import akka.actor.typed.scaladsl.Behaviors import akka.{ Done, NotUsed, actor ⇒ untyped } import akka.testkit._ -import akka.actor.typed.Behavior.UntypedBehavior +import akka.actor.typed.Behavior.UntypedPropsBehavior import scala.concurrent.Await @@ -301,8 +301,9 @@ class AdapterSpec extends AkkaSpec { "spawn untyped behavior anonymously" in { val probe = TestProbe() - val untypedBehavior: Behavior[String] = new UntypedBehavior[String] { - override private[akka] def untypedProps: Props = untypedForwarder(probe.ref) + val untypedBehavior: Behavior[String] = new UntypedPropsBehavior[String] { + override def untypedProps(props: akka.actor.typed.Props): akka.actor.Props = + untypedForwarder(probe.ref) } val ref = system.spawnAnonymous(untypedBehavior) ref ! "hello" @@ -311,8 +312,9 @@ class AdapterSpec extends AkkaSpec { "spawn untyped behavior" in { val probe = TestProbe() - val untypedBehavior: Behavior[String] = new UntypedBehavior[String] { - override private[akka] def untypedProps: Props = untypedForwarder(probe.ref) + val untypedBehavior: Behavior[String] = new UntypedPropsBehavior[String] { + override def untypedProps(props: akka.actor.typed.Props): akka.actor.Props = + untypedForwarder(probe.ref) } val ref = system.spawn(untypedBehavior, "test") ref ! "hello" diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/Behavior.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/Behavior.scala index 828a65e7b4..96f6c8c2c4 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/Behavior.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/Behavior.scala @@ -150,7 +150,7 @@ object Behavior { } /** - * INTERNAL API. + * INTERNAL API */ @InternalApi private[akka] object UnhandledBehavior extends Behavior[Nothing] { @@ -158,14 +158,13 @@ object Behavior { } /** - * INTERNAL API. + * INTERNAL API + * Used to create untyped props from behaviours, or directly returning an untyped props that implements this behavior. */ @InternalApi - private[akka] abstract class UntypedBehavior[T] extends Behavior[T] { - /** - * INTERNAL API - */ - @InternalApi private[akka] def untypedProps: akka.actor.Props + private[akka] abstract class UntypedPropsBehavior[T] extends Behavior[T] { + /** INTERNAL API */ + @InternalApi private[akka] def untypedProps(props: Props): akka.actor.Props } /** @@ -180,7 +179,8 @@ object Behavior { * Not placed in internal.BehaviorImpl because Behavior is sealed. */ @InternalApi - private[akka] final case class DeferredBehavior[T](factory: SAC[T] ⇒ Behavior[T]) extends Behavior[T] { + @DoNotInherit + private[akka] class DeferredBehavior[T](val factory: SAC[T] ⇒ Behavior[T]) extends Behavior[T] { /** start the deferred behavior */ @throws(classOf[Exception]) @@ -188,6 +188,10 @@ object Behavior { override def toString: String = s"Deferred(${LineNumbers(factory)})" } + object DeferredBehavior { + def apply[T](factory: SAC[T] ⇒ Behavior[T]) = + new DeferredBehavior[T](factory) + } /** * INTERNAL API. @@ -299,12 +303,12 @@ object Behavior { def interpretSignal[T](behavior: Behavior[T], ctx: ActorContext[T], signal: Signal): Behavior[T] = interpret(behavior, ctx, signal) - private def interpret[T](behavior: Behavior[T], ctx: ActorContext[T], msg: Any): Behavior[T] = + private def interpret[T](behavior: Behavior[T], ctx: ActorContext[T], msg: Any): Behavior[T] = { behavior match { case null ⇒ throw new InvalidMessageException("[null] is not an allowed message") case SameBehavior | UnhandledBehavior ⇒ throw new IllegalArgumentException(s"cannot execute with [$behavior] as behavior") - case _: UntypedBehavior[_] ⇒ + case _: UntypedPropsBehavior[_] ⇒ throw new IllegalArgumentException(s"cannot wrap behavior [$behavior] in " + "Behaviors.setup, Behaviors.supervise or similar") case d: DeferredBehavior[_] ⇒ throw new IllegalArgumentException(s"deferred [$d] should not be passed to interpreter") @@ -318,6 +322,7 @@ object Behavior { } start(possiblyDeferredResult, ctx) } + } /** * INTERNAL API @@ -331,10 +336,8 @@ object Behavior { if (!Behavior.isAlive(b2) || !messages.hasNext) b2 else { val nextB = messages.next() match { - case sig: Signal ⇒ - Behavior.interpretSignal(b2, ctx, sig) - case msg ⇒ - Behavior.interpretMessage(b2, ctx, msg) + case sig: Signal ⇒ Behavior.interpretSignal(b2, ctx, sig) + case msg ⇒ Behavior.interpretMessage(b2, ctx, msg) } interpretOne(Behavior.canonicalize(nextB, b, ctx)) // recursive } diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/ActorContextImpl.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/ActorContextImpl.scala index f88a2b1e85..961ea0121e 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/ActorContextImpl.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/ActorContextImpl.scala @@ -18,7 +18,6 @@ import scala.util.Try import akka.annotation.InternalApi import akka.util.OptionVal -import akka.event.LoggingAdapter import akka.util.Timeout /** diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/BehaviorImpl.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/BehaviorImpl.scala index 8931abb11c..387cf08c1e 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/BehaviorImpl.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/BehaviorImpl.scala @@ -61,7 +61,9 @@ import scala.reflect.ClassTag override def receiveSignal(ctx: AC[T], msg: Signal): Behavior[T] = onSignal.applyOrElse((ctx.asScala, msg), Behavior.unhandledSignal.asInstanceOf[PartialFunction[(SAC[T], Signal), Behavior[T]]]) + override def receiveMessage(ctx: AC[T], msg: T) = onMessage(ctx.asScala, msg) + override def toString = s"Immutable(${LineNumbers(onMessage)})" } diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/StashBufferImpl.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/StashBufferImpl.scala index bb690fe01c..b219eb2e36 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/StashBufferImpl.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/StashBufferImpl.scala @@ -86,7 +86,7 @@ import akka.util.ConstantFun } } - override def forEach(f: Consumer[T]): Unit = foreach(f.accept) + override def forEach(f: Consumer[T]): Unit = foreach(f.accept(_)) override def unstashAll(ctx: scaladsl.ActorContext[T], behavior: Behavior[T]): Behavior[T] = unstash(ctx, behavior, size, ConstantFun.scalaIdentityFunction[T]) @@ -98,8 +98,8 @@ import akka.util.ConstantFun numberOfMessages: Int, wrap: T ⇒ T): Behavior[T] = { val iter = new Iterator[T] { override def hasNext: Boolean = StashBufferImpl.this.nonEmpty - override def next(): T = StashBufferImpl.this.dropHead() - }.take(numberOfMessages).map(wrap) + override def next(): T = wrap(StashBufferImpl.this.dropHead()) + }.take(numberOfMessages) val ctx = scaladslCtx.asInstanceOf[ActorContext[T]] Behavior.interpretMessages[T](behavior, ctx, iter) } @@ -108,5 +108,7 @@ import akka.util.ConstantFun numberOfMessages: Int, wrap: JFunction[T, T]): Behavior[T] = unstash(ctx.asScala, behavior, numberOfMessages, x ⇒ wrap.apply(x)) + override def toString: String = + s"StashBuffer($size/$capacity)" } diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/TimerSchedulerImpl.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/TimerSchedulerImpl.scala index d5ed348a4d..9ccb6eb574 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/TimerSchedulerImpl.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/TimerSchedulerImpl.scala @@ -26,11 +26,13 @@ import scala.reflect.ClassTag final case class TimerMsg(key: Any, generation: Int, owner: AnyRef) def withTimers[T](factory: TimerSchedulerImpl[T] ⇒ Behavior[T]): Behavior[T] = { - scaladsl.Behaviors.setup[T] { ctx ⇒ - val timerScheduler = new TimerSchedulerImpl[T](ctx) - val behavior = factory(timerScheduler) - timerScheduler.intercept(behavior) - } + scaladsl.Behaviors.setup[T](wrapWithTimers(factory)) + } + + def wrapWithTimers[T](factory: TimerSchedulerImpl[T] ⇒ Behavior[T])(ctx: ActorContext[T]): Behavior[T] = { + val timerScheduler = new TimerSchedulerImpl[T](ctx) + val behavior = factory(timerScheduler) + timerScheduler.intercept(behavior) } } diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/ActorContextAdapter.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/ActorContextAdapter.scala index 01003415f2..5b3f1ae9a8 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/ActorContextAdapter.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/ActorContextAdapter.scala @@ -6,7 +6,7 @@ package internal package adapter import akka.actor.ExtendedActorSystem -import akka.actor.typed.Behavior.UntypedBehavior +import akka.actor.typed.Behavior.UntypedPropsBehavior import akka.annotation.InternalApi import akka.util.OptionVal import akka.{ ConfigurationException, actor ⇒ a } @@ -125,9 +125,10 @@ import scala.concurrent.duration._ def spawnAnonymous[T](ctx: akka.actor.ActorContext, behavior: Behavior[T], props: Props): ActorRef[T] = { behavior match { - case b: UntypedBehavior[_] ⇒ + case b: UntypedPropsBehavior[_] ⇒ // TODO dispatcher from props - ActorRefAdapter(ctx.actorOf(b.untypedProps)) + ActorRefAdapter(ctx.actorOf(b.untypedProps(props))) + case _ ⇒ try { Behavior.validateAsInitial(behavior) @@ -141,9 +142,10 @@ import scala.concurrent.duration._ def spawn[T](ctx: akka.actor.ActorContext, behavior: Behavior[T], name: String, props: Props): ActorRef[T] = { behavior match { - case b: UntypedBehavior[_] ⇒ + case b: UntypedPropsBehavior[_] ⇒ // TODO dispatcher from props - ActorRefAdapter(ctx.actorOf(b.untypedProps, name)) + ActorRefAdapter(ctx.actorOf(b.untypedProps(props), name)) + case _ ⇒ try { Behavior.validateAsInitial(behavior) diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/package.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/package.scala index c2d4d6ebef..e4429e6c43 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/package.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/package.scala @@ -1,3 +1,6 @@ +/** + * Copyright (C) 2017-2018 Lightbend Inc. + */ package akka.actor import akka.actor.typed.internal.ActorRefImpl diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/ActorContext.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/ActorContext.scala index 973a2b7e00..1e2de14a56 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/ActorContext.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/ActorContext.scala @@ -273,7 +273,6 @@ trait ActorContext[T] { this: akka.actor.typed.javadsl.ActorContext[T] ⇒ * @tparam Req The request protocol, what the other actor accepts * @tparam Res The response protocol, what the other actor sends back */ - def ask[Req, Res]( - otherActor: ActorRef[Req])(createRequest: ActorRef[Res] ⇒ Req)(mapResponse: Try[Res] ⇒ T)(implicit responseTimeout: Timeout, classTag: ClassTag[Res]): Unit + def ask[Req, Res](otherActor: ActorRef[Req])(createRequest: ActorRef[Res] ⇒ Req)(mapResponse: Try[Res] ⇒ T)(implicit responseTimeout: Timeout, classTag: ClassTag[Res]): Unit } diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/Behaviors.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/Behaviors.scala index 5187d9d8eb..f6641f21ea 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/Behaviors.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/Behaviors.scala @@ -99,7 +99,7 @@ object Behaviors { @throws(classOf[Exception]) override final def receiveSignal(ctx: akka.actor.typed.ActorContext[T], msg: Signal): Behavior[T] = - onSignal.applyOrElse(msg, { case _ ⇒ Behavior.unhandled }: PartialFunction[Signal, Behavior[T]]) + onSignal.applyOrElse(msg, { case msg ⇒ Behavior.unhandled }: PartialFunction[Signal, Behavior[T]]) /** * Override this method to process an incoming [[akka.actor.typed.Signal]] and return the next behavior. diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/adapter/package.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/adapter/package.scala index 506851edd2..e1d20ad33c 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/adapter/package.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/adapter/package.scala @@ -4,7 +4,7 @@ package akka.actor.typed package scaladsl -import akka.actor.typed.Behavior.UntypedBehavior +import akka.actor.typed.Behavior.UntypedPropsBehavior import akka.actor.typed.internal.adapter._ /** @@ -38,8 +38,8 @@ package object adapter { def spawnAnonymous[T](behavior: Behavior[T], props: Props = Props.empty): ActorRef[T] = { behavior match { - case b: UntypedBehavior[_] ⇒ - ActorRefAdapter(sys.actorOf(b.untypedProps)) + case b: UntypedPropsBehavior[_] ⇒ + ActorRefAdapter(sys.actorOf(b.untypedProps(props))) case _ ⇒ ActorRefAdapter(sys.actorOf(PropsAdapter(Behavior.validateAsInitial(behavior), props))) } @@ -47,8 +47,8 @@ package object adapter { def spawn[T](behavior: Behavior[T], name: String, props: Props = Props.empty): ActorRef[T] = { behavior match { - case b: UntypedBehavior[_] ⇒ - ActorRefAdapter(sys.actorOf(b.untypedProps, name)) + case b: UntypedPropsBehavior[_] ⇒ + ActorRefAdapter(sys.actorOf(b.untypedProps(props), name)) case _ ⇒ ActorRefAdapter(sys.actorOf(PropsAdapter(Behavior.validateAsInitial(behavior), props), name)) } diff --git a/akka-actor/src/main/scala/akka/util/ConstantFun.scala b/akka-actor/src/main/scala/akka/util/ConstantFun.scala index 3105d040ad..2a97f764c9 100644 --- a/akka-actor/src/main/scala/akka/util/ConstantFun.scala +++ b/akka-actor/src/main/scala/akka/util/ConstantFun.scala @@ -27,6 +27,8 @@ import akka.japi.{ Pair ⇒ JPair } def scalaAnyToNone[A, B]: A ⇒ Option[B] = none def scalaAnyTwoToNone[A, B, C]: (A, B) ⇒ Option[C] = two2none + def scalaAnyTwoToUnit[A, B]: (A, B) ⇒ Unit = two2unit + def scalaAnyThreeToFalse[A, B, C]: (A, B, C) ⇒ Boolean = three2false def javaAnyToNone[A, B]: A ⇒ Option[B] = none def nullFun[T] = _nullFun.asInstanceOf[Any ⇒ T] @@ -44,4 +46,8 @@ import akka.japi.{ Pair ⇒ JPair } private val two2none = (_: Any, _: Any) ⇒ None + private val two2unit = (_: Any, _: Any) ⇒ () + + private val three2false = (_: Any, _: Any, _: Any) ⇒ false + } diff --git a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ClusterShardingImpl.scala b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ClusterShardingImpl.scala index 76a0ad6fc0..d461456b64 100644 --- a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ClusterShardingImpl.scala +++ b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ClusterShardingImpl.scala @@ -14,7 +14,7 @@ import akka.actor.{ InternalActorRef, Scheduler } import akka.actor.typed.ActorRef import akka.actor.typed.ActorSystem import akka.actor.typed.Behavior -import akka.actor.typed.Behavior.UntypedBehavior +import akka.actor.typed.Behavior.UntypedPropsBehavior import akka.actor.typed.Props import akka.actor.typed.internal.adapter.ActorRefAdapter import akka.actor.typed.internal.adapter.ActorSystemAdapter @@ -142,8 +142,8 @@ import akka.japi.function.{ Function ⇒ JFunction } val untypedEntityPropsFactory: String ⇒ akka.actor.Props = { entityId ⇒ behavior(entityId) match { - case u: UntypedBehavior[_] ⇒ u.untypedProps // PersistentBehavior - case b ⇒ PropsAdapter(b, entityProps) + case u: UntypedPropsBehavior[_] ⇒ u.untypedProps(Props.empty) // PersistentBehavior + case b ⇒ PropsAdapter(b, entityProps) } } diff --git a/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/ClusterShardingPersistenceSpec.scala b/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/ClusterShardingPersistenceSpec.scala index 273a7cef4f..0d6eee4566 100644 --- a/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/ClusterShardingPersistenceSpec.scala +++ b/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/ClusterShardingPersistenceSpec.scala @@ -10,7 +10,7 @@ import akka.actor.typed.TypedAkkaSpecWithShutdown import akka.cluster.sharding.typed.ClusterShardingSettings import akka.cluster.typed.Cluster import akka.cluster.typed.Join -import akka.persistence.typed.scaladsl.PersistentBehaviors +import akka.persistence.typed.scaladsl.{ Effect, PersistentBehaviors } import akka.testkit.typed.scaladsl.{ ActorTestKit, TestProbe } import com.typesafe.config.ConfigFactory diff --git a/akka-cluster-typed/src/main/scala/akka/cluster/typed/internal/AdaptedClusterSingletonImpl.scala b/akka-cluster-typed/src/main/scala/akka/cluster/typed/internal/AdaptedClusterSingletonImpl.scala index 60d1bec605..610e890934 100644 --- a/akka-cluster-typed/src/main/scala/akka/cluster/typed/internal/AdaptedClusterSingletonImpl.scala +++ b/akka-cluster-typed/src/main/scala/akka/cluster/typed/internal/AdaptedClusterSingletonImpl.scala @@ -9,7 +9,7 @@ import java.util.function.{ Function ⇒ JFunction } import akka.actor.{ ExtendedActorSystem, InvalidActorNameException } import akka.annotation.InternalApi import akka.cluster.singleton.{ ClusterSingletonProxy, ClusterSingletonManager ⇒ OldSingletonManager } -import akka.actor.typed.Behavior.UntypedBehavior +import akka.actor.typed.Behavior.UntypedPropsBehavior import akka.cluster.typed.{ Cluster, ClusterSingleton, ClusterSingletonImpl, ClusterSingletonSettings } import akka.actor.typed.internal.adapter.ActorSystemAdapter import akka.actor.typed.scaladsl.adapter._ @@ -40,8 +40,8 @@ private[akka] final class AdaptedClusterSingletonImpl(system: ActorSystem[_]) ex val managerName = managerNameFor(singletonName) // start singleton on this node val untypedProps = behavior match { - case u: UntypedBehavior[_] ⇒ u.untypedProps // PersistentBehavior - case _ ⇒ PropsAdapter(behavior, props) + case u: UntypedPropsBehavior[_] ⇒ u.untypedProps(props) // PersistentBehavior + case _ ⇒ PropsAdapter(behavior, props) } try { untypedSystem.systemActorOf( diff --git a/akka-cluster-typed/src/test/scala/akka/cluster/typed/ClusterSingletonPersistenceSpec.scala b/akka-cluster-typed/src/test/scala/akka/cluster/typed/ClusterSingletonPersistenceSpec.scala index 5f5124151f..4575a19c02 100644 --- a/akka-cluster-typed/src/test/scala/akka/cluster/typed/ClusterSingletonPersistenceSpec.scala +++ b/akka-cluster-typed/src/test/scala/akka/cluster/typed/ClusterSingletonPersistenceSpec.scala @@ -5,8 +5,7 @@ package akka.cluster.typed import akka.actor.typed.{ ActorRef, Behavior, Props, TypedAkkaSpecWithShutdown } -import akka.persistence.typed.scaladsl.PersistentBehaviors -import akka.persistence.typed.scaladsl.PersistentBehaviors.{ CommandHandler, Effect } +import akka.persistence.typed.scaladsl.{ Effect, PersistentBehaviors } import akka.testkit.typed.scaladsl.{ ActorTestKit, TestProbe } import com.typesafe.config.ConfigFactory diff --git a/akka-docs/src/main/paradox/typed/persistence.md b/akka-docs/src/main/paradox/typed/persistence.md index 46c309a79e..19a9ccd473 100644 --- a/akka-docs/src/main/paradox/typed/persistence.md +++ b/akka-docs/src/main/paradox/typed/persistence.md @@ -62,8 +62,10 @@ and can be used to create various effects such as: External side effects can be performed after successful persist with the `andThen` function e.g @scala[`Effect.persist(..).andThen`]@java[`Effect().persist(..).andThen`]. -In the example below a reply is sent to the `replyTo` ActorRef. Note that the new state after applying -the event is passed as parameter to the `andThen` function. + +In the example below a reply is sent to the `replyTo` ActorRef. Note that the new state after applying +the event is passed as parameter to the `andThen` function. All `andThen*` registered callbacks +are executed after successful execution of the persist statement (or immediately, in case of `none` and `unhandled`). ### Event handler diff --git a/akka-persistence-typed/src/main/resources/reference.conf b/akka-persistence-typed/src/main/resources/reference.conf index e69de29bb2..15febc5cb0 100644 --- a/akka-persistence-typed/src/main/resources/reference.conf +++ b/akka-persistence-typed/src/main/resources/reference.conf @@ -0,0 +1,11 @@ +akka.persistence.typed { + + # default stash buffer size for incoming messages to persistent actors + stash-buffer-size = 1024 + + # enables automatic logging of messages stashed automatically by an PersistentBehavior, + # this may happen while it receives commands while it is recovering events or while it is persisting events + # Set to a log-level (debug, info, warn, error) to log stashed messages on given log level; + log-stashing = off + +} diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EffectImpl.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EffectImpl.scala index 315f6eaa55..2e7240d333 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EffectImpl.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EffectImpl.scala @@ -3,17 +3,15 @@ */ package akka.persistence.typed.internal -import akka.persistence.typed.{ javadsl ⇒ j } +import akka.persistence.typed.javadsl +import akka.persistence.typed.scaladsl import scala.collection.{ immutable ⇒ im } -import akka.annotation.InternalApi -import akka.persistence.typed.scaladsl.PersistentBehaviors.{ ChainableEffect, Effect } +import akka.annotation.{ DoNotInherit, InternalApi } -/** - * INTERNAL API - */ +/** INTERNAL API */ @InternalApi -private[akka] abstract class EffectImpl[+Event, State] extends j.Effect[Event, State] with Effect[Event, State] { +private[akka] abstract class EffectImpl[+Event, State] extends javadsl.Effect[Event, State] with scaladsl.Effect[Event, State] { /* All events that will be persisted in this effect */ override def events: im.Seq[Event] = Nil @@ -21,19 +19,14 @@ private[akka] abstract class EffectImpl[+Event, State] extends j.Effect[Event, S override def sideEffects[E >: Event]: im.Seq[ChainableEffect[E, State]] = Nil } -/** - * INTERNAL API - */ +/** INTERNAL API */ @InternalApi private[akka] object CompositeEffect { - def apply[Event, State](effect: EffectImpl[Event, State], sideEffects: ChainableEffect[Event, State]): EffectImpl[Event, State] = { - CompositeEffect[Event, State]( - effect, - sideEffects :: Nil - ) - } + def apply[Event, State](effect: EffectImpl[Event, State], sideEffects: ChainableEffect[Event, State]): EffectImpl[Event, State] = + CompositeEffect[Event, State](effect, sideEffects :: Nil) } +/** INTERNAL API */ @InternalApi private[akka] final case class CompositeEffect[Event, State]( persistingEffect: EffectImpl[Event, State], @@ -43,25 +36,39 @@ private[akka] final case class CompositeEffect[Event, State]( override def sideEffects[E >: Event]: im.Seq[ChainableEffect[E, State]] = _sideEffects.asInstanceOf[im.Seq[ChainableEffect[E, State]]] + override def toString: String = + s"CompositeEffect($persistingEffect, sideEffects: ${_sideEffects.size})" } +/** INTERNAL API */ @InternalApi private[akka] case object PersistNothing extends EffectImpl[Nothing, Nothing] +/** INTERNAL API */ @InternalApi private[akka] case class Persist[Event, State](event: Event) extends EffectImpl[Event, State] { override def events = event :: Nil } +/** INTERNAL API */ @InternalApi private[akka] case class PersistAll[Event, State](override val events: im.Seq[Event]) extends EffectImpl[Event, State] +/** INTERNAL API */ @InternalApi private[akka] case class SideEffect[Event, State](effect: State ⇒ Unit) extends ChainableEffect[Event, State] +/** INTERNAL API */ @InternalApi private[akka] case object Stop extends ChainableEffect[Nothing, Nothing] +/** INTERNAL API */ @InternalApi private[akka] case object Unhandled extends EffectImpl[Nothing, Nothing] +/** + * Not for user extension + */ +@DoNotInherit +abstract class ChainableEffect[Event, State] extends EffectImpl[Event, State] + diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedBehavior.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedBehavior.scala new file mode 100644 index 0000000000..66a19b2e29 --- /dev/null +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedBehavior.scala @@ -0,0 +1,102 @@ +/** + * Copyright (C) 2016-2018 Lightbend Inc. + */ + +package akka.persistence.typed.internal + +import java.util.UUID +import java.util.concurrent.atomic.AtomicInteger + +import akka.actor.NoSerializationVerificationNeeded +import akka.actor.typed.Behavior +import akka.actor.typed.Behavior.StoppedBehavior +import akka.actor.typed.scaladsl.{ ActorContext, TimerScheduler } +import akka.annotation.InternalApi +import akka.event.{ LogSource, Logging } +import akka.persistence.typed.scaladsl.PersistentBehaviors +import akka.persistence.{ JournalProtocol, Persistence, RecoveryPermitter, SnapshotProtocol } +import akka.{ actor ⇒ a } + +/** INTERNAL API */ +@InternalApi +private[akka] object EventsourcedBehavior { + + // ok to wrap around (2*Int.MaxValue restarts will not happen within a journal roundtrip) + private[akka] val instanceIdCounter = new AtomicInteger(1) + + @InternalApi private[akka] object WriterIdentity { + def newIdentity(): WriterIdentity = { + val instanceId: Int = EventsourcedBehavior.instanceIdCounter.getAndIncrement() + val writerUuid: String = UUID.randomUUID.toString + WriterIdentity(instanceId, writerUuid) + } + } + private[akka] final case class WriterIdentity(instanceId: Int, writerUuid: String) + + /** Protocol used internally by the eventsourced behaviors, never exposed to user-land */ + private[akka] sealed trait EventsourcedProtocol + private[akka] case object RecoveryPermitGranted extends EventsourcedProtocol + private[akka] final case class JournalResponse(msg: akka.persistence.JournalProtocol.Response) extends EventsourcedProtocol + private[akka] final case class SnapshotterResponse(msg: akka.persistence.SnapshotProtocol.Response) extends EventsourcedProtocol + private[akka] final case class RecoveryTickEvent(snapshot: Boolean) extends EventsourcedProtocol + private[akka] final case class ReceiveTimeout(timeout: akka.actor.ReceiveTimeout) extends EventsourcedProtocol + + implicit object PersistentBehaviorLogSource extends LogSource[EventsourcedBehavior[_, _, _]] { + override def genString(b: EventsourcedBehavior[_, _, _]): String = { + val behaviorShortName = b match { + case _: EventsourcedRunning[_, _, _] ⇒ "running" + case _: EventsourcedRecoveringEvents[_, _, _] ⇒ "recover-events" + case _: EventsourcedRecoveringSnapshot[_, _, _] ⇒ "recover-snap" + case _: EventsourcedRequestingRecoveryPermit[_, _, _] ⇒ "awaiting-permit" + } + s"PersistentBehavior[id:${b.persistenceId}][${b.context.self.path}][$behaviorShortName]" + } + } + +} + +/** INTERNAL API */ +@InternalApi +private[akka] trait EventsourcedBehavior[Command, Event, State] { + import EventsourcedBehavior._ + import akka.actor.typed.scaladsl.adapter._ + + protected def context: ActorContext[Any] + protected def timers: TimerScheduler[Any] + + type C = Command + type AC = ActorContext[C] + type E = Event + type S = State + + // used for signaling intent in type signatures + type SeqNr = Long + + def persistenceId: String + + protected def callbacks: EventsourcedCallbacks[Command, Event, State] + protected def initialState: State = callbacks.initialState + protected def commandHandler: PersistentBehaviors.CommandHandler[Command, Event, State] = callbacks.commandHandler + protected def eventHandler: (State, Event) ⇒ State = callbacks.eventHandler + protected def snapshotWhen: (State, Event, SeqNr) ⇒ Boolean = callbacks.snapshotWhen + protected def tagger: Event ⇒ Set[String] = callbacks.tagger + + protected def pluginIds: EventsourcedPluginIds + protected final def journalPluginId: String = pluginIds.journalPluginId + protected final def snapshotPluginId: String = pluginIds.snapshotPluginId + + // ------ common ------- + + protected lazy val extension = Persistence(context.system.toUntyped) + protected lazy val journal: a.ActorRef = extension.journalFor(journalPluginId) + protected lazy val snapshotStore: a.ActorRef = extension.snapshotStoreFor(snapshotPluginId) + + protected lazy val selfUntyped: a.ActorRef = context.self.toUntyped + protected lazy val selfUntypedAdapted: a.ActorRef = context.messageAdapter[Any] { + case res: JournalProtocol.Response ⇒ JournalResponse(res) + case RecoveryPermitter.RecoveryPermitGranted ⇒ RecoveryPermitGranted + case res: SnapshotProtocol.Response ⇒ SnapshotterResponse(res) + case cmd: Command @unchecked ⇒ cmd // if it was wrong, we'll realise when trying to onMessage the cmd + }.toUntyped + +} diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedCarriers.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedCarriers.scala new file mode 100644 index 0000000000..864a4e4d70 --- /dev/null +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedCarriers.scala @@ -0,0 +1,20 @@ +package akka.persistence.typed.internal + +import akka.actor.typed.scaladsl.ActorContext +import akka.persistence.typed.scaladsl.PersistentBehaviors.CommandHandler + +/** INTERNAL API: Used to carry user callbacks between behaviors of an Persistent Behavior */ +private[akka] final case class EventsourcedCallbacks[Command, Event, State]( + initialState: State, + commandHandler: CommandHandler[Command, Event, State], + eventHandler: (State, Event) ⇒ State, + snapshotWhen: (State, Event, Long) ⇒ Boolean, + recoveryCompleted: (ActorContext[Command], State) ⇒ Unit, + tagger: Event ⇒ Set[String] +) + +/** INTERNAL API: Used to carry settings between behaviors of an Persistent Behavior */ +private[akka] final case class EventsourcedPluginIds( + journalPluginId: String, + snapshotPluginId: String +) diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRecoveringEvents.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRecoveringEvents.scala new file mode 100644 index 0000000000..f09483187d --- /dev/null +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRecoveringEvents.scala @@ -0,0 +1,222 @@ +/** + * Copyright (C) 2016-2018 Lightbend Inc. + */ +package akka.persistence.typed.internal + +import akka.actor.typed.Behavior +import akka.actor.typed.scaladsl.Behaviors.MutableBehavior +import akka.actor.typed.scaladsl.{ ActorContext, Behaviors, StashBuffer, TimerScheduler } +import akka.annotation.InternalApi +import akka.event.Logging +import akka.persistence.JournalProtocol._ +import akka.persistence._ +import akka.persistence.typed.internal.EventsourcedBehavior.WriterIdentity +import akka.persistence.typed.scaladsl.PersistentBehaviors._ +import akka.util.Helpers._ + +import scala.util.control.NonFatal + +/** + * INTERNAL API + * + * Third (of four) behavior of an PersistentBehavior. + * + * In this behavior we finally start replaying events, beginning from the last applied sequence number + * (i.e. the one up-until-which the snapshot recovery has brought us). + * + * Once recovery is completed, the actor becomes [[EventsourcedRunning]], stashed messages are flushed + * and control is given to the user's handlers to drive the actors behavior from there. + * + */ +@InternalApi +private[akka] class EventsourcedRecoveringEvents[Command, Event, State]( + val persistenceId: String, + override val context: ActorContext[Any], + override val timers: TimerScheduler[Any], + override val internalStash: StashBuffer[Any], + + val recovery: Recovery, + private var sequenceNr: Long, + val writerIdentity: WriterIdentity, + + private var state: State, + + val callbacks: EventsourcedCallbacks[Command, Event, State], + val pluginIds: EventsourcedPluginIds +) extends MutableBehavior[Any] + with EventsourcedBehavior[Command, Event, State] + with EventsourcedStashManagement { + + import Behaviors.same + import EventsourcedBehavior._ + import akka.actor.typed.scaladsl.adapter._ + + protected val log = Logging(context.system.toUntyped, this) + + // -------- initialize -------- + startRecoveryTimer() + + replayEvents(sequenceNr + 1L, recovery.toSequenceNr) + // ---- end of initialize ---- + + private def commandContext: ActorContext[Command] = context.asInstanceOf[ActorContext[Command]] + + // ---------- + + def snapshotSequenceNr: Long = sequenceNr + + private def updateLastSequenceNr(persistent: PersistentRepr): Unit = + if (persistent.sequenceNr > sequenceNr) sequenceNr = persistent.sequenceNr + + private def setLastSequenceNr(value: Long): Unit = + sequenceNr = value + + // ---------- + + // FIXME it's a bit of a pain to have those lazy vals, change everything to constructor parameters + lazy val timeout = extension.journalConfigFor(journalPluginId).getMillisDuration("recovery-event-timeout") + + // protect against snapshot stalling forever because of journal overloaded and such + private val RecoveryTickTimerKey = "recovery-tick" + private def startRecoveryTimer(): Unit = timers.startPeriodicTimer(RecoveryTickTimerKey, RecoveryTickEvent(snapshot = false), timeout) + private def cancelRecoveryTimer(): Unit = timers.cancel(RecoveryTickTimerKey) + + private var eventSeenInInterval = false + + def onCommand(cmd: Command): Behavior[Any] = { + // during recovery, stash all incoming commands + stash(cmd) + same + } + + def onJournalResponse(response: JournalProtocol.Response): Behavior[Any] = try { + response match { + case ReplayedMessage(repr) ⇒ + eventSeenInInterval = true + updateLastSequenceNr(repr) + // TODO we need some state adapters here? + val newState = eventHandler(state, repr.payload.asInstanceOf[Event]) + state = newState + same + + case RecoverySuccess(highestSeqNr) ⇒ + log.debug("Recovery successful, recovered until sequenceNr: {}", highestSeqNr) + cancelRecoveryTimer() + setLastSequenceNr(highestSeqNr) + + try onRecoveryCompleted(state) + catch { case NonFatal(ex) ⇒ onRecoveryFailure(ex, Some(state)) } + + case ReplayMessagesFailure(cause) ⇒ + onRecoveryFailure(cause, event = None) + + case other ⇒ + stash(other) + Behaviors.same + } + } catch { + case NonFatal(e) ⇒ + cancelRecoveryTimer() + onRecoveryFailure(e, None) + } + + def onSnapshotterResponse(response: SnapshotProtocol.Response): Behavior[Any] = { + log.warning("Unexpected [{}] from SnapshotStore, already in recovering events state.", Logging.simpleName(response)) + Behaviors.same // ignore the response + } + + /** + * Called whenever a message replay fails. By default it logs the error. + * + * The actor is always stopped after this method has been invoked. + * + * @param cause failure cause. + * @param event the event that was processed in `receiveRecover`, if the exception was thrown there + */ + protected def onRecoveryFailure(cause: Throwable, event: Option[Any]): Behavior[Any] = { + returnRecoveryPermit("on recovery failure: " + cause.getMessage) + cancelRecoveryTimer() + + event match { + case Some(evt) ⇒ + log.error(cause, "Exception in receiveRecover when replaying event type [{}] with sequence number [{}].", evt.getClass.getName, sequenceNr) + Behaviors.stopped + + case None ⇒ + log.error(cause, "Persistence failure when replaying events. Last known sequence number [{}]", persistenceId, sequenceNr) + Behaviors.stopped + } + } + + protected def onRecoveryCompleted(state: State): Behavior[Any] = { + try { + returnRecoveryPermit("recovery completed successfully") + callbacks.recoveryCompleted(commandContext, state) + + val running = new EventsourcedRunning[Command, Event, State]( + persistenceId, + context, + timers, + internalStash, + + sequenceNr, + writerIdentity, + + state, + + callbacks, + pluginIds + ) + + tryUnstash(context, running) + } finally { + cancelRecoveryTimer() + } + } + + protected def onRecoveryTick(snapshot: Boolean): Behavior[Any] = + if (!snapshot) { + if (!eventSeenInInterval) { + cancelRecoveryTimer() + val msg = s"Recovery timed out, didn't get event within $timeout, highest sequence number seen $sequenceNr" + onRecoveryFailure(new RecoveryTimedOut(msg), event = None) // TODO allow users to hook into this? + } else { + eventSeenInInterval = false + same + } + } else { + // snapshot timeout, but we're already in the events recovery phase + Behavior.unhandled + } + + // ---------- + + override def onMessage(msg: Any): Behavior[Any] = { + msg match { + // TODO explore crazy hashcode hack to make this match quicker...? + case JournalResponse(r) ⇒ onJournalResponse(r) + case RecoveryTickEvent(snapshot) ⇒ onRecoveryTick(snapshot = snapshot) + case SnapshotterResponse(r) ⇒ onSnapshotterResponse(r) + case c: Command @unchecked ⇒ onCommand(c.asInstanceOf[Command]) // explicit cast to fail eagerly + } + } + + // ---------- + + // ---------- journal interactions --------- + + private def replayEvents(fromSeqNr: SeqNr, toSeqNr: SeqNr): Unit = { + log.debug("Replaying messages: from: {}, to: {}", fromSeqNr, toSeqNr) + // reply is sent to `selfUntypedAdapted`, it is important to target that one + journal ! ReplayMessages(fromSeqNr, toSeqNr, recovery.replayMax, persistenceId, selfUntypedAdapted) + } + + private def returnRecoveryPermit(reason: String): Unit = { + log.debug("Returning recovery permit, reason: " + reason) + // IMPORTANT to use selfUntyped, and not an adapter, since recovery permitter watches/unwatches those refs (and adapters are new refs) + extension.recoveryPermitter.tell(RecoveryPermitter.ReturnRecoveryPermit, selfUntyped) + } + + override def toString = s"EventsourcedRecoveringEvents($persistenceId)" + +} diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRecoveringSnapshot.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRecoveringSnapshot.scala new file mode 100644 index 0000000000..232d81e535 --- /dev/null +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRecoveringSnapshot.scala @@ -0,0 +1,217 @@ +/** + * Copyright (C) 2016-2018 Lightbend Inc. + */ +package akka.persistence.typed.internal + +import akka.actor.typed.Behavior +import akka.actor.typed.scaladsl.Behaviors.MutableBehavior +import akka.actor.typed.scaladsl.{ ActorContext, Behaviors, StashBuffer, TimerScheduler } +import akka.annotation.InternalApi +import akka.event.Logging +import akka.persistence.SnapshotProtocol.{ LoadSnapshot, LoadSnapshotFailed, LoadSnapshotResult } +import akka.persistence._ +import akka.persistence.typed.internal.EventsourcedBehavior.WriterIdentity +import akka.util.Helpers._ + +import scala.util.control.NonFatal +import scala.util.{ Failure, Success, Try } + +/** + * INTERNAL API + * + * Second (of four) behavior of an PersistentBehavior. + * + * In this behavior the recovery process is initiated. + * We try to obtain a snapshot from the configured snapshot store, + * and if it exists, we use it instead of the `initialState`. + * + * Once snapshot recovery is done (or no snapshot was selected), + * recovery of events continues in [[EventsourcedRecoveringEvents]]. + */ +@InternalApi +final class EventsourcedRecoveringSnapshot[Command, Event, State]( + val persistenceId: String, + override val context: ActorContext[Any], + override val timers: TimerScheduler[Any], + override val internalStash: StashBuffer[Any], + + val recovery: Recovery, + val writerIdentity: WriterIdentity, + + val callbacks: EventsourcedCallbacks[Command, Event, State], + val pluginIds: EventsourcedPluginIds +) extends MutableBehavior[Any] + with EventsourcedBehavior[Command, Event, State] + with EventsourcedStashManagement { + + import Behaviors.same + import EventsourcedBehavior._ + import akka.actor.typed.scaladsl.adapter._ + + protected val log = Logging(context.system.toUntyped, this) + + // -------- initialize -------- + startRecoveryTimer() + + loadSnapshot(persistenceId, recovery.fromSnapshot, recovery.toSequenceNr) + // ---- end of initialize ---- + + val commandContext: ActorContext[Command] = context.asInstanceOf[ActorContext[Command]] + + // ---------- + + protected var awaitingSnapshot: Boolean = true + + // ---------- + + private var lastSequenceNr: Long = 0L + def snapshotSequenceNr: Long = lastSequenceNr + + // ---------- + + lazy val timeout = extension.journalConfigFor(journalPluginId).getMillisDuration("recovery-event-timeout") + + // protect against snapshot stalling forever because of journal overloaded and such + private val RecoveryTickTimerKey = "recovery-tick" + private def startRecoveryTimer(): Unit = { + timers.startPeriodicTimer(RecoveryTickTimerKey, RecoveryTickEvent(snapshot = false), timeout) + } + private def cancelRecoveryTimer(): Unit = timers.cancel(RecoveryTickTimerKey) + + def onCommand(cmd: Command): Behavior[Any] = { + // during recovery, stash all incoming commands + stash(cmd) + Behavior.same + } + + def onJournalResponse(response: JournalProtocol.Response): Behavior[Any] = try { + throw new Exception("Should not talk to journal yet! But got: " + response) + } catch { + case NonFatal(cause) ⇒ + returnRecoveryPermitOnlyOnFailure(cause) + throw cause + } + + def onSnapshotterResponse(response: SnapshotProtocol.Response): Behavior[Any] = try { + response match { + case LoadSnapshotResult(sso, toSnr) ⇒ + var state: S = initialState + val re: Try[SeqNr] = Try { + sso match { + case Some(SelectedSnapshot(metadata, snapshot)) ⇒ + state = snapshot.asInstanceOf[State] + metadata.sequenceNr + + case None ⇒ + 0 // from the start please + } + } + + re match { + case Success(seqNr) ⇒ + lastSequenceNr = seqNr + replayMessages(state, toSnr) + + case Failure(cause) ⇒ + // FIXME better exception type + val ex = new RuntimeException(s"Failed to recover state for [$persistenceId] from snapshot offer.", cause) + onRecoveryFailure(ex, event = None) // FIXME the failure logs has bad messages... FIXME + } + + case LoadSnapshotFailed(cause) ⇒ + cancelRecoveryTimer() + + onRecoveryFailure(cause, event = None) + + case other ⇒ + stash(other) + same + } + } catch { + case NonFatal(cause) ⇒ + returnRecoveryPermitOnlyOnFailure(cause) + throw cause + } + + private def replayMessages(state: State, toSnr: SeqNr): Behavior[Any] = { + cancelRecoveryTimer() + + val rec = recovery.copy(toSequenceNr = toSnr, fromSnapshot = SnapshotSelectionCriteria.None) // TODO introduce new types + + new EventsourcedRecoveringEvents[Command, Event, State]( + persistenceId, + context, + timers, + internalStash, + + rec, + lastSequenceNr, + writerIdentity, + + state, + callbacks, + pluginIds + ) + } + + /** + * Called whenever a message replay fails. By default it logs the error. + * + * The actor is always stopped after this method has been invoked. + * + * @param cause failure cause. + * @param event the event that was processed in `receiveRecover`, if the exception was thrown there + */ + protected def onRecoveryFailure(cause: Throwable, event: Option[Any]): Behavior[Any] = { + cancelRecoveryTimer() + event match { + case Some(evt) ⇒ + log.error(cause, "Exception in receiveRecover when replaying event type [{}] with sequence number [{}] for " + + "persistenceId [{}].", evt.getClass.getName, lastSequenceNr, persistenceId) + Behaviors.stopped + + case None ⇒ + log.error(cause, "Persistence failure when replaying events for persistenceId [{}]. " + + "Last known sequence number [{}]", persistenceId, lastSequenceNr) + Behaviors.stopped + } + } + + protected def onRecoveryTick(snapshot: Boolean): Behavior[Any] = + // we know we're in snapshotting mode + if (snapshot) onRecoveryFailure(new RecoveryTimedOut(s"Recovery timed out, didn't get snapshot within $timeout"), event = None) + else same // ignore, since we received the snapshot already + + // ---------- + + override def onMessage(msg: Any): Behavior[Any] = { + msg match { + // TODO explore crazy hashcode hack to make this match quicker...? + case SnapshotterResponse(r) ⇒ onSnapshotterResponse(r) + case JournalResponse(r) ⇒ onJournalResponse(r) + case RecoveryTickEvent(snapshot) ⇒ onRecoveryTick(snapshot = snapshot) + case c: Command @unchecked ⇒ onCommand(c.asInstanceOf[Command]) // explicit cast to fail eagerly + } + } + + // ---------- + + // ---------- journal interactions --------- + + /** + * Instructs the snapshot store to load the specified snapshot and send it via an [[SnapshotOffer]] + * to the running [[PersistentActor]]. + */ + private def loadSnapshot(persistenceId: String, criteria: SnapshotSelectionCriteria, toSequenceNr: Long): Unit = { + snapshotStore.tell(LoadSnapshot(persistenceId, criteria, toSequenceNr), selfUntypedAdapted) + } + + private def returnRecoveryPermitOnlyOnFailure(cause: Throwable): Unit = { + log.debug("Returning recovery permit, on failure because: " + cause.getMessage) + // IMPORTANT to use selfUntyped, and not an adapter, since recovery permitter watches/unwatches those refs (and adapters are new refs) + extension.recoveryPermitter.tell(RecoveryPermitter.ReturnRecoveryPermit, selfUntyped) + } + + override def toString = s"EventsourcedRecoveringSnapshot($persistenceId)" + +} diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRequestingRecoveryPermit.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRequestingRecoveryPermit.scala new file mode 100644 index 0000000000..6b1d7517da --- /dev/null +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRequestingRecoveryPermit.scala @@ -0,0 +1,96 @@ +/** + * Copyright (C) 2016-2018 Lightbend Inc. + */ +package akka.persistence.typed.internal + +import akka.actor.typed.Behavior +import akka.actor.typed.scaladsl.Behaviors.MutableBehavior +import akka.actor.typed.scaladsl.{ ActorContext, Behaviors, StashBuffer, TimerScheduler } +import akka.annotation.InternalApi +import akka.event.Logging +import akka.persistence._ +import akka.persistence.typed.internal.EventsourcedBehavior.WriterIdentity + +/** + * INTERNAL API + * + * First (of four) behaviour of an PersistentBehaviour. + * + * Requests a permit to start recovering this actor; this is tone to avoid + * hammering the journal with too many concurrently recovering actors. + */ +@InternalApi +private[akka] final class EventsourcedRequestingRecoveryPermit[Command, Event, State]( + val persistenceId: String, + override val context: ActorContext[Any], + override val timers: TimerScheduler[Any], + + val recovery: Recovery, + + val callbacks: EventsourcedCallbacks[Command, Event, State], + val pluginIds: EventsourcedPluginIds +) extends MutableBehavior[Any] + with EventsourcedBehavior[Command, Event, State] + with EventsourcedStashManagement { + + import akka.actor.typed.scaladsl.adapter._ + + // has to be lazy, since we want to obtain the persistenceId + protected lazy val log = Logging(context.system.toUntyped, this) + + override protected val internalStash: StashBuffer[Any] = { + val stashSize = context.system.settings.config + .getInt("akka.persistence.typed.stash-buffer-size") + StashBuffer[Any](stashSize) + } + + // --- initialization --- + // only once we have a permit, we can become active: + requestRecoveryPermit() + + val writerIdentity: WriterIdentity = WriterIdentity.newIdentity() + + // --- end of initialization --- + + // ---------- + + def becomeRecovering(): Behavior[Any] = { + log.debug(s"Initializing snapshot recovery: {}", recovery) + + new EventsourcedRecoveringSnapshot( + persistenceId, + context, + timers, + internalStash, + + recovery, + writerIdentity, + + callbacks, + pluginIds + ) + } + + // ---------- + + override def onMessage(msg: Any): Behavior[Any] = { + msg match { + case RecoveryPermitter.RecoveryPermitGranted ⇒ + log.debug("Awaiting permit, received: RecoveryPermitGranted") + becomeRecovering() + + case other ⇒ + stash(other) + Behaviors.same + } + } + + // ---------- journal interactions --------- + + private def requestRecoveryPermit(): Unit = { + // IMPORTANT to use selfUntyped, and not an adapter, since recovery permitter watches/unwatches those refs (and adapters are new refs) + extension.recoveryPermitter.tell(RecoveryPermitter.RequestRecoveryPermit, selfUntyped) + } + + override def toString = s"EventsourcedRequestingRecoveryPermit($persistenceId)" +} diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRunning.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRunning.scala new file mode 100644 index 0000000000..f19a0ed886 --- /dev/null +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRunning.scala @@ -0,0 +1,365 @@ +/** + * Copyright (C) 2016-2018 Lightbend Inc. + */ +package akka.persistence.typed.internal + +import akka.actor.typed.Behavior +import akka.actor.typed.Behavior.StoppedBehavior +import akka.actor.typed.scaladsl.Behaviors.MutableBehavior +import akka.actor.typed.scaladsl.{ ActorContext, Behaviors, StashBuffer, TimerScheduler } +import akka.annotation.InternalApi +import akka.event.Logging +import akka.persistence.Eventsourced.{ PendingHandlerInvocation, StashingHandlerInvocation } +import akka.persistence.JournalProtocol._ +import akka.persistence._ +import akka.persistence.journal.Tagged +import akka.persistence.typed.internal.EventsourcedBehavior.WriterIdentity + +import scala.annotation.tailrec +import scala.collection.immutable + +/** + * INTERNAL API + * + * Fourth (of four) -- also known as 'final' or 'ultimate' -- form of PersistentBehavior. + * + * In this phase recovery has completed successfully and we continue handling incoming commands, + * as well as persisting new events as dictated by the user handlers. + * + * This behavior operates in two phases: + * - HandlingCommands - where the command handler is invoked for incoming commands + * - PersistingEvents - where incoming commands are stashed until persistence completes + * + * This is implemented as such to avoid creating many EventsourcedRunning instances, + * which perform the Persistence extension lookup on creation and similar things (config lookup) + * + */ +@InternalApi +class EventsourcedRunning[Command, Event, State]( + val persistenceId: String, + override val context: ActorContext[Any], + override val timers: TimerScheduler[Any], + override val internalStash: StashBuffer[Any], + + private var sequenceNr: Long, + val writerIdentity: WriterIdentity, + + private var state: State, + + val callbacks: EventsourcedCallbacks[Command, Event, State], + val pluginIds: EventsourcedPluginIds +) extends MutableBehavior[Any] + with EventsourcedBehavior[Command, Event, State] + with EventsourcedStashManagement { same ⇒ + + import EventsourcedBehavior._ + import akka.actor.typed.scaladsl.adapter._ + + protected val log = Logging(context.system.toUntyped, this) + + private def commandContext: ActorContext[Command] = context.asInstanceOf[ActorContext[Command]] + + // ---------- + + // Holds callbacks for persist calls (note that we do not implement persistAsync currently) + private def hasNoPendingInvocations: Boolean = pendingInvocations.isEmpty + private val pendingInvocations = new java.util.LinkedList[PendingHandlerInvocation]() // we only append / isEmpty / get(0) on it + + // ---------- + + private def snapshotSequenceNr: Long = sequenceNr + + private def updateLastSequenceNr(persistent: PersistentRepr): Unit = + if (persistent.sequenceNr > sequenceNr) sequenceNr = persistent.sequenceNr + private def nextSequenceNr(): Long = { + sequenceNr += 1L + sequenceNr + } + // ---------- + + private def onSnapshotterResponse(response: SnapshotProtocol.Response): Behavior[Any] = { + response match { + case SaveSnapshotSuccess(meta) ⇒ + log.debug("Save snapshot successful: " + meta) + same + case SaveSnapshotFailure(meta, ex) ⇒ + log.error(ex, "Save snapshot failed! " + meta) + same // FIXME https://github.com/akka/akka/issues/24637 should we provide callback for this? to allow Stop + } + } + + // ---------- + + trait EventsourcedRunningPhase { + def name: String + def onCommand(c: Command): Behavior[Any] + def onJournalResponse(response: JournalProtocol.Response): Behavior[Any] + } + + object HandlingCommands extends EventsourcedRunningPhase { + def name = "HandlingCommands" + + final override def onCommand(command: Command): Behavior[Any] = { + val effect = commandHandler(commandContext, state, command) + applyEffects(command, effect.asInstanceOf[EffectImpl[E, S]]) // TODO can we avoid the cast? + } + final override def onJournalResponse(response: Response): Behavior[Any] = { + // should not happen, what would it reply? + throw new RuntimeException("Received message which should not happen in Running state!") + } + } + + object PersistingEventsNoSideEffects extends PersistingEvents(Nil) + + sealed class PersistingEvents(sideEffects: immutable.Seq[ChainableEffect[_, S]]) extends EventsourcedRunningPhase { + def name = "PersistingEvents" + + final override def onCommand(c: Command): Behavior[Any] = { + stash(c) + same + } + + final override def onJournalResponse(response: Response): Behavior[Any] = { + log.debug("Received Journal response: {}", response) + response match { + case WriteMessageSuccess(p, id) ⇒ + // instanceId mismatch can happen for persistAsync and defer in case of actor restart + // while message is in flight, in that case we ignore the call to the handler + if (id == writerIdentity.instanceId) { + updateLastSequenceNr(p) + popApplyHandler(p.payload) + onWriteMessageComplete() + tryUnstash(context, applySideEffects(sideEffects)) + } else same + + case WriteMessageRejected(p, cause, id) ⇒ + // instanceId mismatch can happen for persistAsync and defer in case of actor restart + // while message is in flight, in that case the handler has already been discarded + if (id == writerIdentity.instanceId) { + updateLastSequenceNr(p) + onPersistRejected(cause, p.payload, p.sequenceNr) // does not stop + tryUnstash(context, applySideEffects(sideEffects)) + } else same + + case WriteMessageFailure(p, cause, id) ⇒ + // instanceId mismatch can happen for persistAsync and defer in case of actor restart + // while message is in flight, in that case the handler has already been discarded + if (id == writerIdentity.instanceId) { + onWriteMessageComplete() + onPersistFailureThenStop(cause, p.payload, p.sequenceNr) + } else same + + case WriteMessagesSuccessful ⇒ + // ignore + same + + case WriteMessagesFailed(_) ⇒ + // ignore + same // it will be stopped by the first WriteMessageFailure message; not applying side effects + + case _: LoopMessageSuccess ⇒ + // ignore, should never happen as there is no persistAsync in typed + same + } + } + + private def onWriteMessageComplete(): Unit = + tryBecomeHandlingCommands() + + private def onPersistRejected(cause: Throwable, event: Any, seqNr: Long): Unit = { + log.error( + cause, + "Rejected to persist event type [{}] with sequence number [{}] for persistenceId [{}] due to [{}].", + event.getClass.getName, seqNr, persistenceId, cause.getMessage) + } + + private def onPersistFailureThenStop(cause: Throwable, event: Any, seqNr: Long): Behavior[Any] = { + log.error(cause, "Failed to persist event type [{}] with sequence number [{}] for persistenceId [{}].", + event.getClass.getName, seqNr, persistenceId) + + // FIXME see #24479 for reconsidering the stopping behaviour + Behaviors.stopped + } + + } + + // the active phase switches between PersistingEvents and HandlingCommands; + // we do this via a var instead of behaviours to keep allocations down as this will be flip/flaping on every Persist effect + private[this] var phase: EventsourcedRunningPhase = HandlingCommands + + override def onMessage(msg: Any): Behavior[Any] = { + msg match { + // TODO explore crazy hashcode hack to make this match quicker...? + case SnapshotterResponse(r) ⇒ onSnapshotterResponse(r) + case JournalResponse(r) ⇒ phase.onJournalResponse(r) + case command: Command @unchecked ⇒ + // the above type-check does nothing, since Command is tun + // we cast explicitly to fail early in case of type mismatch + val c = command.asInstanceOf[Command] + phase.onCommand(c) + } + } + + // ---------- + + def applySideEffects(effects: immutable.Seq[ChainableEffect[_, S]]): Behavior[Any] = { + var res: Behavior[Any] = same + val it = effects.iterator + + // if at least one effect results in a `stop`, we need to stop + // manual loop implementation to avoid allocations and multiple scans + while (it.hasNext) { + val effect = it.next() + applySideEffect(effect) match { + case _: StoppedBehavior[_] ⇒ res = Behaviors.stopped + case _ ⇒ // nothing to do + } + } + + res + } + + def applySideEffect(effect: ChainableEffect[_, S]): Behavior[Any] = effect match { + case _: Stop.type @unchecked ⇒ + Behaviors.stopped + + case SideEffect(sideEffects) ⇒ + sideEffects(state) + same + + case _ ⇒ + throw new IllegalArgumentException(s"Not supported effect detected [${effect.getClass.getName}]!") + } + + def applyEvent(s: S, event: E): S = + eventHandler(s, event) + + @tailrec private def applyEffects(msg: Any, effect: EffectImpl[E, S], sideEffects: immutable.Seq[ChainableEffect[_, S]] = Nil): Behavior[Any] = { + if (log.isDebugEnabled) + log.debug(s"Handled command [{}], resulting effect: [{}], side effects: [{}]", msg.getClass.getName, effect, sideEffects.size) + + effect match { + case CompositeEffect(e, currentSideEffects) ⇒ + // unwrap and accumulate effects + applyEffects(msg, e, currentSideEffects ++ sideEffects) + + case Persist(event) ⇒ + // apply the event before persist so that validation exception is handled before persisting + // the invalid event, in case such validation is implemented in the event handler. + // also, ensure that there is an event handler for each single event + state = applyEvent(state, event) + val tags = tagger(event) + val eventToPersist = if (tags.isEmpty) event else Tagged(event, tags) + + internalPersist(eventToPersist, sideEffects) { _ ⇒ + if (snapshotWhen(state, event, sequenceNr)) + internalSaveSnapshot(state) + } + + case PersistAll(events) ⇒ + if (events.nonEmpty) { + // apply the event before persist so that validation exception is handled before persisting + // the invalid event, in case such validation is implemented in the event handler. + // also, ensure that there is an event handler for each single event + var count = events.size + var seqNr = sequenceNr + val (newState, shouldSnapshotAfterPersist) = events.foldLeft((state, false)) { + case ((currentState, snapshot), event) ⇒ + seqNr += 1 + val shouldSnapshot = snapshot || snapshotWhen(currentState, event, seqNr) + (applyEvent(currentState, event), shouldSnapshot) + } + state = newState + val eventsToPersist = events.map { event ⇒ + val tags = tagger(event) + if (tags.isEmpty) event else Tagged(event, tags) + } + + internalPersistAll(eventsToPersist, sideEffects) { _ ⇒ + count -= 1 + if (count == 0) { + sideEffects.foreach(applySideEffect) + if (shouldSnapshotAfterPersist) + internalSaveSnapshot(state) + } + } + } else { + // run side-effects even when no events are emitted + tryUnstash(context, applySideEffects(sideEffects)) + } + + case e: PersistNothing.type @unchecked ⇒ + tryUnstash(context, applySideEffects(sideEffects)) + + case _: Unhandled.type @unchecked ⇒ + applySideEffects(sideEffects) + Behavior.unhandled + + case c: ChainableEffect[_, S] ⇒ + applySideEffect(c) + } + } + + private def popApplyHandler(payload: Any): Unit = + pendingInvocations.pop().handler(payload) + + private def becomePersistingEvents(sideEffects: immutable.Seq[ChainableEffect[_, S]]): Behavior[Any] = { + if (phase.isInstanceOf[PersistingEvents]) throw new IllegalArgumentException( + "Attempted to become PersistingEvents while already in this phase! Logic error?") + + phase = + if (sideEffects.isEmpty) PersistingEventsNoSideEffects + else new PersistingEvents(sideEffects) + + same + } + + private def tryBecomeHandlingCommands(): Behavior[Any] = { + if (phase == HandlingCommands) throw new IllegalArgumentException( + "Attempted to become HandlingCommands while already in this phase! Logic error?") + + if (hasNoPendingInvocations) { // CAN THIS EVER NOT HAPPEN? + phase = HandlingCommands + } + + same + } + + // ---------- journal interactions --------- + + // Any since can be `E` or `Tagged` + private def internalPersist(event: Any, sideEffects: immutable.Seq[ChainableEffect[_, S]])(handler: Any ⇒ Unit): Behavior[Any] = { + pendingInvocations addLast StashingHandlerInvocation(event, handler.asInstanceOf[Any ⇒ Unit]) + + val senderNotKnownBecauseAkkaTyped = null + val repr = PersistentRepr(event, persistenceId = persistenceId, sequenceNr = nextSequenceNr(), writerUuid = writerIdentity.writerUuid, sender = senderNotKnownBecauseAkkaTyped) + + val eventBatch = AtomicWrite(repr) :: Nil // batching not used, since no persistAsync + journal.tell(JournalProtocol.WriteMessages(eventBatch, selfUntypedAdapted, writerIdentity.instanceId), selfUntypedAdapted) + + becomePersistingEvents(sideEffects) + } + + private def internalPersistAll(events: immutable.Seq[Any], sideEffects: immutable.Seq[ChainableEffect[_, S]])(handler: Any ⇒ Unit): Behavior[Any] = { + if (events.nonEmpty) { + val senderNotKnownBecauseAkkaTyped = null + + events.foreach { event ⇒ + pendingInvocations addLast StashingHandlerInvocation(event, handler.asInstanceOf[Any ⇒ Unit]) + } + + val write = AtomicWrite(events.map(PersistentRepr.apply(_, persistenceId = persistenceId, + sequenceNr = nextSequenceNr(), writerUuid = writerIdentity.writerUuid, sender = senderNotKnownBecauseAkkaTyped))) + + journal.tell(JournalProtocol.WriteMessages(write :: Nil, selfUntypedAdapted, writerIdentity.instanceId), selfUntypedAdapted) + + becomePersistingEvents(sideEffects) + } else same + } + + private def internalSaveSnapshot(snapshot: State): Unit = { + snapshotStore.tell(SnapshotProtocol.SaveSnapshot(SnapshotMetadata(persistenceId, snapshotSequenceNr), snapshot), selfUntypedAdapted) + } + + override def toString = s"EventsourcedRunning($persistenceId,${phase.name})" +} diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedStashManagement.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedStashManagement.scala new file mode 100644 index 0000000000..416ce45c74 --- /dev/null +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedStashManagement.scala @@ -0,0 +1,76 @@ +package akka.persistence.typed.internal + +import java.util.Locale + +import akka.actor.typed.{ ActorSystem, Behavior } +import akka.actor.{ DeadLetter, StashOverflowException } +import akka.actor.typed.scaladsl.{ ActorContext, StashBuffer } +import akka.annotation.InternalApi +import akka.event.Logging.LogLevel +import akka.event.{ Logging, LoggingAdapter } +import akka.persistence._ +import akka.util.ConstantFun +import akka.{ actor ⇒ a } + +/** INTERNAL API: Stash management for persistent behaviors */ +@InternalApi +private[akka] trait EventsourcedStashManagement { + import EventsourcedStashManagement._ + import akka.actor.typed.scaladsl.adapter._ + + protected def log: LoggingAdapter + + protected def extension: Persistence + protected def context: ActorContext[Any] + + protected val internalStash: StashBuffer[Any] + + private lazy val logLevel = { + val configuredLevel = extension.system.settings.config + .getString("akka.persistence.typed.log-stashing") + Logging.levelFor(configuredLevel).getOrElse(OffLevel) // this is OffLevel + } + + /** + * The returned [[StashOverflowStrategy]] object determines how to handle the message failed to stash + * when the internal Stash capacity exceeded. + */ + protected val internalStashOverflowStrategy: StashOverflowStrategy = + extension.defaultInternalStashOverflowStrategy match { + case ReplyToStrategy(_) ⇒ + throw new RuntimeException("ReplyToStrategy is not supported in Akka Typed, since there is no sender()!") + case other ⇒ + other // the other strategies are supported + } + + protected def stash(msg: Any): Unit = { + if (logLevel != OffLevel) log.log(logLevel, "Stashing message: {}", msg) + + try internalStash.stash(msg) catch { + case e: StashOverflowException ⇒ + internalStashOverflowStrategy match { + case DiscardToDeadLetterStrategy ⇒ + val snd: a.ActorRef = a.ActorRef.noSender // FIXME can we improve it somehow? + context.system.deadLetters.tell(DeadLetter(msg, snd, context.self.toUntyped)) + + case ReplyToStrategy(response) ⇒ + throw new RuntimeException("ReplyToStrategy does not make sense at all in Akka Typed, since there is no sender()!") + + case ThrowOverflowExceptionStrategy ⇒ + throw e + } + } + } + + protected def tryUnstash(ctx: ActorContext[Any], behavior: Behavior[Any]): Behavior[Any] = { + if (internalStash.nonEmpty) { + log.debug("Unstashing message: {}", internalStash.head.getClass) + internalStash.unstash(context, behavior, 1, ConstantFun.scalaIdentityFunction) + } else behavior + } + +} + +object EventsourcedStashManagement { + private val OffLevel = LogLevel(Int.MinValue) +} diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/PersistentActorImpl.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/PersistentActorImpl.scala deleted file mode 100644 index a29dcd2d70..0000000000 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/PersistentActorImpl.scala +++ /dev/null @@ -1,157 +0,0 @@ -/* - * Copyright (C) 2017-2018 Lightbend Inc. - */ -package akka.persistence.typed.internal - -import akka.actor.ActorLogging -import akka.actor.typed.internal.adapter.ActorContextAdapter -import akka.annotation.InternalApi -import akka.persistence.journal.Tagged -import akka.persistence.typed.scaladsl.{ PersistentBehavior, PersistentBehaviors } -import akka.persistence.{ RecoveryCompleted, SaveSnapshotFailure, SaveSnapshotSuccess, SnapshotOffer, PersistentActor ⇒ UntypedPersistentActor } -import akka.{ actor ⇒ a } - -/** - * INTERNAL API - */ -@InternalApi private[akka] object PersistentActorImpl { - - /** - * Stop the actor for passivation. `PoisonPill` does not work well - * with persistent actors. - */ - case object StopForPassivation - - def props[C, E, S]( - behaviorFactory: () ⇒ PersistentBehavior[C, E, S]): a.Props = - a.Props(new PersistentActorImpl(behaviorFactory())) - -} - -/** - * INTERNAL API - * The `PersistentActor` that runs a `PersistentBehavior`. - */ -@InternalApi private[akka] class PersistentActorImpl[C, E, S]( - behavior: PersistentBehavior[C, E, S]) extends UntypedPersistentActor with ActorLogging { - - import PersistentBehaviors._ - - override val persistenceId: String = behavior.persistenceIdFromActorName(self.path.name) - - private var state: S = behavior.initialState - - private val commandHandler: CommandHandler[C, E, S] = behavior.commandHandler - - private val eventHandler: (S, E) ⇒ S = behavior.eventHandler - - private val ctxAdapter = new ActorContextAdapter[C](context) - private val ctx = ctxAdapter.asScala - - override def receiveRecover: Receive = { - case SnapshotOffer(_, snapshot) ⇒ - state = snapshot.asInstanceOf[S] - - case RecoveryCompleted ⇒ - behavior.recoveryCompleted(ctx, state) - - case event: E @unchecked ⇒ - state = applyEvent(state, event) - } - - def applyEvent(s: S, event: E): S = - eventHandler.apply(s, event) - - override def receiveCommand: Receive = { - case PersistentActorImpl.StopForPassivation ⇒ - context.stop(self) - - case SaveSnapshotSuccess(meta) ⇒ - log.debug("Snapshot saved: {}", meta) - case SaveSnapshotFailure(meta, thr) ⇒ - log.error(thr, "Snapshot failed: {}", meta) - - case msg ⇒ - try { - val effects = msg match { - case a.ReceiveTimeout ⇒ - commandHandler(ctx, state, ctxAdapter.receiveTimeoutMsg) - // TODO note that PostStop, PreRestart and Terminated signals are not handled, we wouldn't be able to persist there - case cmd: C @unchecked ⇒ - // FIXME we could make it more safe by using ClassTag for C - commandHandler(ctx, state, cmd) - } - applyEffects(msg, effects) - } catch { - case _: MatchError ⇒ throw new IllegalStateException( - s"Undefined state [${state.getClass.getName}] or handler for [${msg.getClass.getName} " + - s"in [${behavior.getClass.getName}] with persistenceId [$persistenceId]") - } - } - - private def applyEffects(msg: Any, effect: Effect[E, S], sideEffects: Seq[ChainableEffect[_, S]] = Nil): Unit = effect match { - case CompositeEffect(persist, currentSideEffects) ⇒ - applyEffects(msg, persist, currentSideEffects ++ sideEffects) - case Persist(event) ⇒ - // apply the event before persist so that validation exception is handled before persisting - // the invalid event, in case such validation is implemented in the event handler. - // also, ensure that there is an event handler for each single event - state = applyEvent(state, event) - val tags = behavior.tagger(event) - val eventToPersist = if (tags.isEmpty) event else Tagged(event, tags) - persist(eventToPersist) { _ ⇒ - sideEffects.foreach(applySideEffect) - if (shouldSnapshot(state, event, lastSequenceNr)) - saveSnapshot(state) - } - case PersistAll(events) ⇒ - if (events.nonEmpty) { - // apply the event before persist so that validation exception is handled before persisting - // the invalid event, in case such validation is implemented in the event handler. - // also, ensure that there is an event handler for each single event - var count = events.size - var seqNr = lastSequenceNr - val (newState, shouldSnapshotAfterPersist) = events.foldLeft((state, false)) { - case ((currentState, snapshot), event) ⇒ - seqNr += 1 - (applyEvent(currentState, event), snapshot || shouldSnapshot(currentState, event, seqNr)) - } - state = newState - val eventsToPersist = events.map { event ⇒ - val tags = behavior.tagger(event) - if (tags.isEmpty) event else Tagged(event, tags) - } - persistAll(eventsToPersist) { _ ⇒ - count -= 1 - if (count == 0) { - sideEffects.foreach(applySideEffect) - if (shouldSnapshotAfterPersist) - saveSnapshot(state) - } - } - } else { - // run side-effects even when no events are emitted - sideEffects.foreach(applySideEffect) - } - case _: PersistNothing.type @unchecked ⇒ - // FIXME: Why don't we do the side effects here?? - sideEffects.foreach(applySideEffect) - case _: Unhandled.type @unchecked ⇒ - // FIXME: Why don't we do the side effects here?? We do allow users to add them - super.unhandled(msg) - case c: ChainableEffect[_, S] ⇒ - applySideEffect(c) - } - - def applySideEffect(effect: ChainableEffect[_, S]): Unit = effect match { - case _: Stop.type @unchecked ⇒ - context.stop(self) - case SideEffect(callbacks) ⇒ callbacks.apply(state) - } - - private def shouldSnapshot(state: S, event: E, sequenceNr: Long): Boolean = { - behavior.snapshotOn(state, event, sequenceNr) - } - -} - diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/Effect.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/Effect.scala index eb1a9b5c7e..6778c3582f 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/Effect.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/Effect.scala @@ -4,8 +4,7 @@ package akka.persistence.typed.javadsl import akka.annotation.DoNotInherit -import akka.persistence.typed.scaladsl.PersistentBehaviors._ -import akka.japi.{ function ⇒ japi } +import akka.japi.function import akka.persistence.typed.internal._ import scala.collection.JavaConverters._ @@ -53,10 +52,10 @@ object EffectFactory extends EffectFactories[Nothing, Nothing, Nothing] @DoNotInherit abstract class Effect[+Event, State] { self: EffectImpl[Event, State] ⇒ /** Convenience method to register a side effect with just a callback function */ - final def andThen(callback: japi.Procedure[State]): Effect[Event, State] = + final def andThen(callback: function.Procedure[State]): Effect[Event, State] = CompositeEffect(this, SideEffect[Event, State](s ⇒ callback.apply(s))) /** Convenience method to register a side effect that doesn't need access to state */ - final def andThen(callback: japi.Effect): Effect[Event, State] = + final def andThen(callback: function.Effect): Effect[Event, State] = CompositeEffect(this, SideEffect[Event, State]((_: State) ⇒ callback.apply())) } diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/PersistentBehavior.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/PersistentBehavior.scala new file mode 100644 index 0000000000..0c335d6623 --- /dev/null +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/PersistentBehavior.scala @@ -0,0 +1,105 @@ +/** + * Copyright (C) 2018 Lightbend Inc. + */ +package akka.persistence.typed.javadsl + +import java.util.Collections + +import akka.actor.typed.Behavior.UntypedPropsBehavior +import akka.actor.typed.internal.adapter.PropsAdapter +import akka.actor.typed.javadsl.ActorContext +import akka.annotation.{ ApiMayChange, InternalApi } +import akka.persistence.typed._ +import akka.persistence.typed.internal._ + +/** Java API */ +@ApiMayChange +abstract class PersistentBehavior[Command, Event, State >: Null](val persistenceId: String) extends UntypedPropsBehavior[Command] { + + /** + * Factory of effects. + * + * Return effects from your handlers in order to instruct persistence on how to act on the incoming message (i.e. persist events). + */ + protected final def Effect: EffectFactories[Command, Event, State] = EffectFactory.asInstanceOf[EffectFactories[Command, Event, State]] + + /** + * Implement by returning the initial state object. + * This object will be passed into this behaviors handlers, until a new state replaces it. + * + * Also known as "zero state" or "neutral state". + */ + protected def initialState: State + + /** + * Implement by handling incoming commands and return an `Effect()` to persist or signal other effects + * of the command handling such as stopping the behavior or others. + * + * This method is only invoked when the actor is running (i.e. not recovering). + * While the actor is persisting events, the incoming messages are stashed and only + * delivered to the handler once persisting them has completed. + */ + protected def commandHandler(): CommandHandler[Command, Event, State] + + /** + * Implement by applying the event to the current state in order to return a new state. + * + * This method invoked during recovery as well as running operation of this behavior, + * in order to keep updating the state state. + * + * For that reason it is strongly discouraged to perform side-effects in this handler; + * Side effects should be executed in `andThen` or `recoveryCompleted` blocks. + */ + protected def eventHandler(): EventHandler[Event, State] + + /** + * @return A new, mutable, by state command handler builder + */ + protected final def commandHandlerBuilder(): CommandHandlerBuilder[Command, Event, State] = + new CommandHandlerBuilder[Command, Event, State]() + + /** + * @return A new, mutable, by state command handler builder + */ + protected final def byStateCommandHandlerBuilder(): ByStateCommandHandlerBuilder[Command, Event, State] = + new ByStateCommandHandlerBuilder[Command, Event, State]() + + /** + * @return A new, mutable, event handler builder + */ + protected final def eventHandlerBuilder(): EventHandlerBuilder[Event, State] = + new EventHandlerBuilder[Event, State]() + + /** + * The `callback` function is called to notify the actor that the recovery process + * is finished. + */ + def onRecoveryCompleted(ctx: ActorContext[Command], state: State): Unit = {} + + /** + * Initiates a snapshot if the given function returns true. + * When persisting multiple events at once the snapshot is triggered after all the events have + * been persisted. + * + * `predicate` receives the State, Event and the sequenceNr used for the Event + */ + def shouldSnapshot(state: State, event: Event, sequenceNr: Long): Boolean = false + /** + * The `tagger` function should give event tags, which will be used in persistence query + */ + def tagsFor(event: Event): java.util.Set[String] = Collections.emptySet() + + /** INTERNAL API */ + @InternalApi private[akka] override def untypedProps(props: akka.actor.typed.Props): akka.actor.Props = { + val behaviorImpl = scaladsl.PersistentBehaviors.immutable[Command, Event, State]( + persistenceId, + initialState, + (c, state, cmd) ⇒ commandHandler()(c.asJava, state, cmd).asInstanceOf[EffectImpl[Event, State]], + eventHandler()(_, _) + ) + + PropsAdapter(() ⇒ behaviorImpl, props) + } + +} + diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/PersistentBehaviors.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/PersistentBehaviors.scala deleted file mode 100644 index 271c0addfa..0000000000 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/PersistentBehaviors.scala +++ /dev/null @@ -1,78 +0,0 @@ -/** - * Copyright (C) 2018 Lightbend Inc. - */ -package akka.persistence.typed.javadsl - -import java.util.Collections - -import akka.actor.typed.Behavior.UntypedBehavior -import akka.actor.typed.javadsl.ActorContext -import akka.annotation.ApiMayChange -import akka.persistence.typed._ -import akka.persistence.typed.internal._ - -import scala.collection.JavaConverters._ - -@ApiMayChange abstract class PersistentBehavior[Command, Event, State >: Null](val persistenceId: String) extends UntypedBehavior[Command] { - - def Effect: EffectFactories[Command, Event, State] = EffectFactory.asInstanceOf[EffectFactories[Command, Event, State]] - - val initialState: State - - def commandHandler(): CommandHandler[Command, Event, State] - - def eventHandler(): EventHandler[Event, State] - - /** - * @return A new, mutable, by state command handler builder - */ - protected final def commandHandlerBuilder(): CommandHandlerBuilder[Command, Event, State] = - new CommandHandlerBuilder[Command, Event, State]() - - /** - * @return A new, mutable, by state command handler builder - */ - protected final def byStateCommandHandlerBuilder(): ByStateCommandHandlerBuilder[Command, Event, State] = - new ByStateCommandHandlerBuilder[Command, Event, State]() - - /** - * @return A new, mutable, builder - */ - protected final def eventHandlerBuilder(): EventHandlerBuilder[Event, State] = - new EventHandlerBuilder[Event, State]() - - /** - * The `callback` function is called to notify the actor that the recovery process - * is finished. - */ - def onRecoveryCompleted(ctx: ActorContext[Command], state: State): Unit = {} - - /** - * Initiates a snapshot if the given function returns true. - * When persisting multiple events at once the snapshot is triggered after all the events have - * been persisted. - * - * `predicate` receives the State, Event and the sequenceNr used for the Event - */ - def shouldSnapshot(state: State, event: Event, sequenceNr: Long): Boolean = false - /** - * The `tagger` function should give event tags, which will be used in persistence query - */ - def tagsFor(event: Event): java.util.Set[String] = Collections.emptySet() - - /** - * INTERNAL API - */ - override private[akka] def untypedProps = { - new scaladsl.PersistentBehavior[Command, Event, State]( - _ ⇒ persistenceId, - initialState, - (ctx, s, e) ⇒ commandHandler.apply(ctx.asJava, s, e).asInstanceOf[EffectImpl[Event, State]], - (s: State, e: Event) ⇒ eventHandler().apply(s, e), - (ctx, s) ⇒ onRecoveryCompleted(ctx.asJava, s), - e ⇒ tagsFor(e).asScala.toSet, - shouldSnapshot - ).untypedProps - } -} - diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/Effect.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/Effect.scala new file mode 100644 index 0000000000..e2d31faa15 --- /dev/null +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/Effect.scala @@ -0,0 +1,78 @@ +/** + * Copyright (C) 2017-2018 Lightbend Inc. + */ +package akka.persistence.typed.scaladsl + +import akka.actor.typed.scaladsl.Behaviors +import akka.annotation.DoNotInherit +import akka.persistence.typed.internal._ + +import scala.collection.{ immutable ⇒ im } + +/** + * Factories for effects - how a persistent actor reacts on a command + */ +object Effect { + + // TODO docs + def persist[Event, State](event: Event): Effect[Event, State] = Persist(event) + + // TODO docs + def persist[Event, A <: Event, B <: Event, State](evt1: A, evt2: B, events: Event*): Effect[Event, State] = + persist(evt1 :: evt2 :: events.toList) + + // TODO docs + def persist[Event, State](eventOpt: Option[Event]): Effect[Event, State] = + eventOpt match { + case Some(evt) ⇒ persist[Event, State](evt) + case _ ⇒ none[Event, State] + } + + // TODO docs + def persist[Event, State](events: im.Seq[Event]): Effect[Event, State] = + PersistAll(events) + + // TODO docs + def persist[Event, State](events: im.Seq[Event], sideEffects: im.Seq[ChainableEffect[Event, State]]): Effect[Event, State] = + new CompositeEffect[Event, State](PersistAll[Event, State](events), sideEffects) + + /** + * Do not persist anything + */ + def none[Event, State]: Effect[Event, State] = PersistNothing.asInstanceOf[Effect[Event, State]] + + /** + * This command is not handled, but it is not an error that it isn't. + */ + def unhandled[Event, State]: Effect[Event, State] = Unhandled.asInstanceOf[Effect[Event, State]] + + /** + * Stop this persistent actor + */ + def stop[Event, State]: ChainableEffect[Event, State] = Stop.asInstanceOf[ChainableEffect[Event, State]] +} + +/** + * Instances are created through the factories in the [[Effect]] companion object. + * + * Not for user extension. + */ +@DoNotInherit +trait Effect[+Event, State] extends akka.persistence.typed.javadsl.Effect[Event, State] { self: EffectImpl[Event, State] ⇒ + /* All events that will be persisted in this effect */ + def events: im.Seq[Event] + + def sideEffects[E >: Event]: im.Seq[ChainableEffect[E, State]] + + /** Convenience method to register a side effect with just a callback function */ + final def andThen(callback: State ⇒ Unit): Effect[Event, State] = + CompositeEffect(this, SideEffect[Event, State](callback)) + + /** Convenience method to register a side effect with just a lazy expression */ + final def andThen(callback: ⇒ Unit): Effect[Event, State] = + CompositeEffect(this, SideEffect[Event, State]((_: State) ⇒ callback)) + + /** The side effect is to stop the actor */ + def andThenStop: Effect[Event, State] = + CompositeEffect(this, Effect.stop[Event, State]) +} diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/PersistentBehaviors.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/PersistentBehaviors.scala index 58e17f4eed..9fe6e2828d 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/PersistentBehaviors.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/PersistentBehaviors.scala @@ -3,102 +3,52 @@ */ package akka.persistence.typed.scaladsl -import akka.actor.typed.Behavior.UntypedBehavior -import akka.actor.typed.scaladsl.ActorContext +import akka.actor.typed.Behavior +import akka.actor.typed.Behavior.DeferredBehavior +import akka.actor.typed.internal.TimerSchedulerImpl +import akka.actor.typed.scaladsl.{ ActorContext, TimerScheduler } import akka.annotation.{ DoNotInherit, InternalApi } +import akka.persistence.{ Recovery, SnapshotSelectionCriteria } import akka.persistence.typed.internal._ +import akka.persistence.typed.scaladsl.PersistentBehaviors.CommandHandler +import akka.util.ConstantFun -import scala.collection.{ immutable ⇒ im } +import scala.language.implicitConversions object PersistentBehaviors { + // we use this type internally, however it's easier for users to understand the function, so we use it in external api + type CommandHandler[Command, Event, State] = (ActorContext[Command], State, Command) ⇒ Effect[Event, State] + /** * Create a `Behavior` for a persistent actor. */ def immutable[Command, Event, State]( persistenceId: String, initialState: State, - commandHandler: CommandHandler[Command, Event, State], - eventHandler: (State, Event) ⇒ State): PersistentBehavior[Command, Event, State] = { - // FIXME remove `persistenceIdFromActorName: String ⇒ String` from PersistentBehavior - new PersistentBehavior(_ ⇒ persistenceId, initialState, commandHandler, eventHandler, - recoveryCompleted = (_, _) ⇒ (), - tagger = _ ⇒ Set.empty, - snapshotOn = (_, _, _) ⇒ false) - } + commandHandler: (ActorContext[Command], State, Command) ⇒ Effect[Event, State], + eventHandler: (State, Event) ⇒ State): PersistentBehavior[Command, Event, State] = + new PersistentBehavior( + persistenceId = persistenceId, + initialState = initialState, + commandHandler = commandHandler, + eventHandler = eventHandler + ) /** - * Factories for effects - how a persistent actor reacts on a command - */ - object Effect { - - def persist[Event, State](event: Event): Effect[Event, State] = - Persist(event) - - def persist[Event, A <: Event, B <: Event, State](evt1: A, evt2: B, events: Event*): Effect[Event, State] = - persist(evt1 :: evt2 :: events.toList) - - def persist[Event, State](eventOpt: Option[Event]): Effect[Event, State] = - eventOpt match { - case Some(evt) ⇒ persist[Event, State](evt) - case _ ⇒ none[Event, State] - } - - def persist[Event, State](events: im.Seq[Event]): Effect[Event, State] = - PersistAll(events) - - def persist[Event, State](events: im.Seq[Event], sideEffects: im.Seq[ChainableEffect[Event, State]]): Effect[Event, State] = - new CompositeEffect[Event, State](PersistAll[Event, State](events), sideEffects) - - /** - * Do not persist anything - */ - def none[Event, State]: Effect[Event, State] = PersistNothing.asInstanceOf[Effect[Event, State]] - - /** - * This command is not handled, but it is not an error that it isn't. - */ - def unhandled[Event, State]: Effect[Event, State] = Unhandled.asInstanceOf[Effect[Event, State]] - - /** - * Stop this persistent actor - */ - def stop[Event, State]: ChainableEffect[Event, State] = Stop.asInstanceOf[ChainableEffect[Event, State]] - } - - /** - * Instances are created through the factories in the [[Effect]] companion object. + * Create a `Behavior` for a persistent actor in Cluster Sharding, when the persistenceId is not known + * until the actor is started and typically based on the entityId, which + * is the actor name. * - * Not for user extension. + * TODO This will not be needed when it can be wrapped in `Actor.deferred`. */ - @DoNotInherit - trait Effect[+Event, State] { - self: EffectImpl[Event, State] ⇒ - /* All events that will be persisted in this effect */ - def events: im.Seq[Event] - - def sideEffects[E >: Event]: im.Seq[ChainableEffect[E, State]] - - /** Convenience method to register a side effect with just a callback function */ - final def andThen(callback: State ⇒ Unit): Effect[Event, State] = - CompositeEffect(this, SideEffect[Event, State](callback)) - - /** Convenience method to register a side effect with just a lazy expression */ - final def andThen(callback: ⇒ Unit): Effect[Event, State] = - CompositeEffect(this, SideEffect[Event, State]((_: State) ⇒ callback)) - - /** The side effect is to stop the actor */ - def andThenStop: Effect[Event, State] = - CompositeEffect(this, Effect.stop[Event, State]) - } - - /** - * Not for user extension - */ - @DoNotInherit - abstract class ChainableEffect[Event, State] extends EffectImpl[Event, State] - - type CommandHandler[Command, Event, State] = (ActorContext[Command], State, Command) ⇒ Effect[Event, State] + @Deprecated // FIXME remove this + def persistentEntity[Command, Event, State]( + persistenceIdFromActorName: String ⇒ String, + initialState: State, + commandHandler: (ActorContext[Command], State, Command) ⇒ Effect[Event, State], + eventHandler: (State, Event) ⇒ State): PersistentBehavior[Command, Event, State] = + ??? /** * The `CommandHandler` defines how to act on commands. @@ -137,19 +87,62 @@ object PersistentBehaviors { } } -class PersistentBehavior[Command, Event, State]( - @InternalApi private[akka] val persistenceIdFromActorName: String ⇒ String, - val initialState: State, - val commandHandler: PersistentBehaviors.CommandHandler[Command, Event, State], - val eventHandler: (State, Event) ⇒ State, - val recoveryCompleted: (ActorContext[Command], State) ⇒ Unit, - val tagger: Event ⇒ Set[String], - val snapshotOn: (State, Event, Long) ⇒ Boolean) extends UntypedBehavior[Command] { +@DoNotInherit +class PersistentBehavior[Command, Event, State] private ( + val persistenceId: String, + val initialState: State, + val commandHandler: PersistentBehaviors.CommandHandler[Command, Event, State], + val eventHandler: (State, Event) ⇒ State, - import PersistentBehaviors._ + val recoveryCompleted: (ActorContext[Command], State) ⇒ Unit, + val tagger: Event ⇒ Set[String], + val journalPluginId: String, + val snapshotPluginId: String, + val snapshotWhen: (State, Event, Long) ⇒ Boolean, + val recovery: Recovery +) extends DeferredBehavior[Command](ctx ⇒ + TimerSchedulerImpl.wrapWithTimers[Command] { timers ⇒ + val callbacks = EventsourcedCallbacks[Command, Event, State]( + initialState, + commandHandler, + eventHandler, + snapshotWhen, + recoveryCompleted, + tagger + ) + val pluginIds = EventsourcedPluginIds( + journalPluginId, + snapshotPluginId + ) + new EventsourcedRequestingRecoveryPermit( + persistenceId, + ctx.asInstanceOf[ActorContext[Any]], // sorry + timers.asInstanceOf[TimerScheduler[Any]], // sorry + recovery, + callbacks, + pluginIds + ).narrow[Command] - /** INTERNAL API */ - @InternalApi private[akka] override def untypedProps: akka.actor.Props = PersistentActorImpl.props(() ⇒ this) + }(ctx)) { + + def this( + persistenceId: String, + initialState: State, + commandHandler: PersistentBehaviors.CommandHandler[Command, Event, State], + eventHandler: (State, Event) ⇒ State) { + this( + persistenceId, + initialState, + commandHandler, + eventHandler, + recoveryCompleted = ConstantFun.scalaAnyTwoToUnit, + tagger = (_: Event) ⇒ Set.empty[String], + journalPluginId = "" /* default plugin */ , + snapshotPluginId = "" /* default plugin */ , + snapshotWhen = ConstantFun.scalaAnyThreeToFalse, + recovery = Recovery() + ) + } /** * The `callback` function is called to notify the actor that the recovery process @@ -165,8 +158,8 @@ class PersistentBehavior[Command, Event, State]( * * `predicate` receives the State, Event and the sequenceNr used for the Event */ - def snapshotOn(predicate: (State, Event, Long) ⇒ Boolean): PersistentBehavior[Command, Event, State] = - copy(snapshotOn = predicate) + def snapshotWhen(predicate: (State, Event, Long) ⇒ Boolean): PersistentBehavior[Command, Event, State] = + copy(snapshotWhen = predicate) /** * Snapshot every N events @@ -175,7 +168,35 @@ class PersistentBehavior[Command, Event, State]( */ def snapshotEvery(numberOfEvents: Long): PersistentBehavior[Command, Event, State] = { require(numberOfEvents > 0, s"numberOfEvents should be positive: Was $numberOfEvents") - copy(snapshotOn = (_, _, seqNr) ⇒ seqNr % numberOfEvents == 0) + copy(snapshotWhen = (_, _, seqNr) ⇒ seqNr % numberOfEvents == 0) + } + + /** + * Change the journal plugin id that this actor should use. + */ + def withPersistencePluginId(id: String): PersistentBehavior[Command, Event, State] = { + require(id != null, "persistence plugin id must not be null; use empty string for 'default' journal") + copy(journalPluginId = id) + } + + /** + * Change the snapshot store plugin id that this actor should use. + */ + def withSnapshotPluginId(id: String): PersistentBehavior[Command, Event, State] = { + require(id != null, "snapshot plugin id must not be null; use empty string for 'default' snapshot store") + copy(snapshotPluginId = id) + } + + /** + * Changes the snapshot selection criteria used by this behavior. + * By default the most recent snapshot is used, and the remaining state updates are recovered by replaying events + * from the sequence number up until which the snapshot reached. + * + * You may configure the behavior to skip recovering snapshots completely, in which case the recovery will be + * performed by replaying all events -- which may take a long time. + */ + def withSnapshotSelectionCriteria(selection: SnapshotSelectionCriteria): PersistentBehavior[Command, Event, State] = { + copy(recovery = Recovery(selection)) } /** @@ -185,12 +206,26 @@ class PersistentBehavior[Command, Event, State]( copy(tagger = tagger) private def copy( - persistenceIdFromActorName: String ⇒ String = persistenceIdFromActorName, - initialState: State = initialState, - commandHandler: CommandHandler[Command, Event, State] = commandHandler, - eventHandler: (State, Event) ⇒ State = eventHandler, - recoveryCompleted: (ActorContext[Command], State) ⇒ Unit = recoveryCompleted, - tagger: Event ⇒ Set[String] = tagger, - snapshotOn: (State, Event, Long) ⇒ Boolean = snapshotOn): PersistentBehavior[Command, Event, State] = - new PersistentBehavior(persistenceIdFromActorName, initialState, commandHandler, eventHandler, recoveryCompleted, tagger, snapshotOn) + initialState: State = initialState, + commandHandler: CommandHandler[Command, Event, State] = commandHandler, + eventHandler: (State, Event) ⇒ State = eventHandler, + recoveryCompleted: (ActorContext[Command], State) ⇒ Unit = recoveryCompleted, + tagger: Event ⇒ Set[String] = tagger, + snapshotWhen: (State, Event, Long) ⇒ Boolean = snapshotWhen, + journalPluginId: String = journalPluginId, + snapshotPluginId: String = snapshotPluginId, + recovery: Recovery = recovery): PersistentBehavior[Command, Event, State] = + new PersistentBehavior[Command, Event, State]( + persistenceId = persistenceId, + initialState = initialState, + commandHandler = commandHandler, + eventHandler = eventHandler, + recoveryCompleted = recoveryCompleted, + tagger = tagger, + journalPluginId = journalPluginId, + snapshotPluginId = snapshotPluginId, + snapshotWhen = snapshotWhen, + recovery = recovery) + } + diff --git a/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorTest.java b/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorTest.java index 8d7ea07599..54db4b9a40 100644 --- a/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorTest.java +++ b/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorTest.java @@ -7,8 +7,8 @@ import akka.actor.typed.ActorRef; import akka.actor.typed.javadsl.Behaviors; import akka.japi.Pair; import akka.japi.function.Function3; -import akka.persistence.typed.scaladsl.PersistentActorSpec; -import akka.persistence.typed.scaladsl.PersistentActorSpec$; +import akka.persistence.typed.scaladsl.PersistentBehaviorSpec; +import akka.persistence.typed.scaladsl.PersistentBehaviorSpec$; import akka.testkit.AkkaJUnitActorSystemResource; import akka.testkit.typed.javadsl.TestKitJunitResource; import akka.testkit.typed.scaladsl.ActorTestKit; @@ -26,7 +26,7 @@ import static org.junit.Assert.assertEquals; public class PersistentActorTest { @ClassRule - public static final TestKitJunitResource testKit = new TestKitJunitResource(PersistentActorSpec$.MODULE$.config()); + public static final TestKitJunitResource testKit = new TestKitJunitResource(PersistentBehaviorSpec$.MODULE$.config()); static final Incremented timeoutEvent = new Incremented(100); static final State emptyState = new State(0, Collections.emptyList()); diff --git a/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/BasicPersistentBehaviorsTest.java b/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/BasicPersistentBehaviorsTest.java index b213035df9..214687193d 100644 --- a/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/BasicPersistentBehaviorsTest.java +++ b/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/BasicPersistentBehaviorsTest.java @@ -30,22 +30,12 @@ public class BasicPersistentBehaviorsTest { @Override public CommandHandler commandHandler() { - return new CommandHandler() { - @Override - public Effect apply(ActorContext ctx, State state, Command command) { - return Effect().none(); - } - }; + return (ctx, state, command) -> Effect().none(); } @Override public EventHandler eventHandler() { - return new EventHandler() { - @Override - public State apply(State state, Event event) { - return state; - } - }; + return (state, event) -> state; } //#recovery diff --git a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentActorCompileOnlyTest.scala b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentActorCompileOnlyTest.scala index 845f43b63c..f0c622d3a8 100644 --- a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentActorCompileOnlyTest.scala +++ b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentActorCompileOnlyTest.scala @@ -199,7 +199,7 @@ object PersistentActorCompileOnlyTest { eventHandler = (state, evt) ⇒ evt match { case TaskRegistered(task) ⇒ State(task :: state.tasksInFlight) case TaskRemoved(task) ⇒ State(state.tasksInFlight.filter(_ != task)) - }).snapshotOn { (state, e, seqNr) ⇒ state.tasksInFlight.isEmpty } + }).snapshotWhen { (state, e, seqNr) ⇒ state.tasksInFlight.isEmpty } } object SpawnChild { diff --git a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentActorSpec.scala b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentBehaviorSpec.scala similarity index 84% rename from akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentActorSpec.scala rename to akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentBehaviorSpec.scala index 746fd64858..e3758dfff7 100644 --- a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentActorSpec.scala +++ b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentBehaviorSpec.scala @@ -3,31 +3,31 @@ */ package akka.persistence.typed.scaladsl +import akka.actor.ActorSystemImpl import akka.actor.typed.scaladsl.Behaviors -import akka.actor.typed.{ ActorRef, ActorSystem, SupervisorStrategy, Terminated, TypedAkkaSpecWithShutdown } +import akka.actor.typed.{ ActorRef, ActorSystem, Behavior, SupervisorStrategy, Terminated, TypedAkkaSpecWithShutdown } import akka.persistence.snapshot.SnapshotStore import akka.persistence.typed.scaladsl.PersistentBehaviors._ import akka.persistence.{ SelectedSnapshot, SnapshotMetadata, SnapshotSelectionCriteria } import akka.testkit.typed.TestKitSettings import akka.testkit.typed.scaladsl._ -import com.typesafe.config.ConfigFactory +import com.typesafe.config.{ Config, ConfigFactory } import org.scalatest.concurrent.Eventually import scala.concurrent.Future import scala.concurrent.duration._ -object PersistentActorSpec { +object PersistentBehaviorSpec { class InMemorySnapshotStore extends SnapshotStore { + private var state = Map.empty[String, (Any, SnapshotMetadata)] def loadAsync(persistenceId: String, criteria: SnapshotSelectionCriteria): Future[Option[SelectedSnapshot]] = { - log.debug("loadAsync: {} {}", persistenceId, criteria) Future.successful(state.get(persistenceId).map(r ⇒ SelectedSnapshot(r._2, r._1))) } def saveAsync(metadata: SnapshotMetadata, snapshot: Any): Future[Unit] = { - log.debug("saveAsync: {} {}", metadata, snapshot) state += (metadata.persistenceId -> (snapshot, metadata)) Future.successful(()) } @@ -39,13 +39,18 @@ object PersistentActorSpec { val config = ConfigFactory.parseString( s""" akka.loglevel = INFO - akka.persistence.snapshot-store.inmem.class = "akka.persistence.typed.scaladsl.PersistentActorSpec$$InMemorySnapshotStore" + # akka.persistence.typed.log-stashing = INFO + + akka.persistence.snapshot-store.inmem.class = "akka.persistence.typed.scaladsl.PersistentBehaviorSpec$$InMemorySnapshotStore" akka.persistence.journal.plugin = "akka.persistence.journal.inmem" akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.inmem" + """) sealed trait Command final case object Increment extends Command + final case object IncrementThenLogThenStop extends Command + final case object IncrementTwiceThenLogThenStop extends Command final case class IncrementWithPersistAll(nr: Int) extends Command final case object IncrementLater extends Command final case object IncrementAfterReceiveTimeout extends Command @@ -86,11 +91,28 @@ object PersistentActorSpec { commandHandler = (ctx, state, cmd) ⇒ cmd match { case Increment ⇒ Effect.persist(Incremented(1)) + + case IncrementThenLogThenStop ⇒ + Effect.persist(Incremented(1)) + .andThen { + loggingActor ! firstLogging + } + .andThenStop + + case IncrementTwiceThenLogThenStop ⇒ + Effect.persist(Incremented(1), Incremented(2)) + .andThen { + loggingActor ! firstLogging + } + .andThenStop + case IncrementWithPersistAll(n) ⇒ Effect.persist((0 until n).map(_ ⇒ Incremented(1))) + case GetValue(replyTo) ⇒ replyTo ! state Effect.none + case IncrementLater ⇒ // purpose is to test signals val delay = ctx.spawnAnonymous(Behaviors.withTimers[Tick.type] { timers ⇒ @@ -101,14 +123,18 @@ object PersistentActorSpec { }) ctx.watchWith(delay, DelayFinished) Effect.none + case DelayFinished ⇒ Effect.persist(Incremented(10)) + case IncrementAfterReceiveTimeout ⇒ ctx.setReceiveTimeout(10.millis, Timeout) Effect.none + case Timeout ⇒ ctx.cancelReceiveTimeout() Effect.persist(Incremented(100)) + case IncrementTwiceAndThenLog ⇒ Effect .persist(Incremented(1), Incremented(1)) @@ -132,25 +158,29 @@ object PersistentActorSpec { .andThen { loggingActor ! firstLogging } + case LogThenStop ⇒ - Effect.none.andThen { - loggingActor ! firstLogging - }.andThenStop + Effect.none + .andThen { + loggingActor ! firstLogging + } + .andThenStop }, eventHandler = (state, evt) ⇒ evt match { case Incremented(delta) ⇒ - probe ! (state, evt) + probe ! ((state, evt)) State(state.value + delta, state.history :+ state.value) }) } } -class PersistentActorSpec extends ActorTestKit with Eventually with TypedAkkaSpecWithShutdown { +class PersistentBehaviorSpec extends ActorTestKit with TypedAkkaSpecWithShutdown with Eventually { + import PersistentBehaviorSpec._ - override def config = PersistentActorSpec.config + override def config: Config = PersistentBehaviorSpec.config - import PersistentActorSpec._ + implicit val testSettings = TestKitSettings(system) "A typed persistent actor" must { @@ -171,7 +201,7 @@ class PersistentActorSpec extends ActorTestKit with Eventually with TypedAkkaSpe c ! Increment c ! Increment c ! GetValue(probe.ref) - probe.expectMessage(State(3, Vector(0, 1, 2))) + probe.expectMessage(10.seconds, State(3, Vector(0, 1, 2))) val c2 = spawn(counter("c2")) c2 ! GetValue(probe.ref) @@ -224,6 +254,27 @@ class PersistentActorSpec extends ActorTestKit with Eventually with TypedAkkaSpe loggingProbe.expectMessage(secondLogging) } + "persist then stop" in { + val loggingProbe = TestProbe[String] + val c = spawn(counter("c5a", loggingProbe.ref)) + val watchProbe = watcher(c) + + c ! IncrementThenLogThenStop + loggingProbe.expectMessage(firstLogging) + watchProbe.expectMessage("Terminated") + } + + "persist(All) then stop" in { + val loggingProbe = TestProbe[String] + val c = spawn(counter("c5b", loggingProbe.ref)) + val watchProbe = watcher(c) + + c ! IncrementTwiceThenLogThenStop + loggingProbe.expectMessage(firstLogging) + watchProbe.expectMessage("Terminated") + + } + /** Proves that side-effects are called when emitting an empty list of events */ "chainable side effects without events" in { val loggingProbe = TestProbe[String] @@ -249,10 +300,6 @@ class PersistentActorSpec extends ActorTestKit with Eventually with TypedAkkaSpe } "work when wrapped in other behavior" in { - // FIXME This is a major problem with current implementation. Since the - // behavior is running as an untyped PersistentActor it's not possible to - // wrap it in Actor.setup or Actor.supervise - pending val probe = TestProbe[State] val behavior = Behaviors.supervise[Command](counter("c13")) .onFailure(SupervisorStrategy.restartWithBackoff(1.second, 10.seconds, 0.1)) @@ -262,7 +309,7 @@ class PersistentActorSpec extends ActorTestKit with Eventually with TypedAkkaSpe probe.expectMessage(State(1, Vector(0))) } - "stop after persisting" in { + "stop after logging (no persisting)" in { val loggingProbe = TestProbe[String] val c: ActorRef[Command] = spawn(counter("c8", loggingProbe.ref)) val watchProbe = watcher(c) @@ -272,7 +319,7 @@ class PersistentActorSpec extends ActorTestKit with Eventually with TypedAkkaSpe } "snapshot via predicate" in { - val alwaysSnapshot = counter("c9").snapshotOn { (_, _, _) ⇒ true } + val alwaysSnapshot = counter("c9").snapshotWhen { (_, _, _) ⇒ true } val c = spawn(alwaysSnapshot) val watchProbe = watcher(c) val replyProbe = TestProbe[State]() @@ -293,9 +340,7 @@ class PersistentActorSpec extends ActorTestKit with Eventually with TypedAkkaSpe } "check all events for snapshot in PersistAll" in { - val snapshotAtTwo = counter("c11").snapshotOn { (s, e, _) ⇒ - s.value == 2 - } + val snapshotAtTwo = counter("c11").snapshotWhen { (s, _, _) ⇒ s.value == 2 } val c: ActorRef[Command] = spawn(snapshotAtTwo) val watchProbe = watcher(c) val replyProbe = TestProbe[State]() @@ -325,7 +370,7 @@ class PersistentActorSpec extends ActorTestKit with Eventually with TypedAkkaSpe c ! LogThenStop watchProbe.expectMessage("Terminated") - // no shapshot should have happened + // no snapshot should have happened val probeC2 = TestProbe[(State, Event)]() val c2 = spawn(counterWithProbe("c10", probeC2.ref).snapshotEvery(2)) probeC2.expectMessage[(State, Event)]((State(0, Vector()), Incremented(1))) diff --git a/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/InDepthPersistentBehaviorSpec.scala b/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/InDepthPersistentBehaviorSpec.scala index 604980a04f..b157af27e6 100644 --- a/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/InDepthPersistentBehaviorSpec.scala +++ b/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/InDepthPersistentBehaviorSpec.scala @@ -6,7 +6,8 @@ package docs.akka.persistence.typed import akka.Done import akka.actor.typed.{ ActorRef, Behavior } import akka.persistence.typed.scaladsl.PersistentBehaviors -import akka.persistence.typed.scaladsl.PersistentBehaviors.{ CommandHandler, Effect } +import akka.persistence.typed.scaladsl.PersistentBehaviors.CommandHandler +import akka.persistence.typed.scaladsl.Effect object InDepthPersistentBehaviorSpec { diff --git a/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala b/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala index 53c3446072..a3abe74318 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala @@ -17,24 +17,24 @@ import scala.collection.immutable import scala.concurrent.duration.FiniteDuration import scala.util.control.NonFatal -/** - * INTERNAL API - */ +/** INTERNAL API */ +@InternalApi private[persistence] object Eventsourced { - // ok to wrap around (2*Int.MaxValue restarts will not happen within a journal roundtrip) + // ok to wrap around (2*Int.MaxValue restarts will not happen within a journal round-trip) private val instanceIdCounter = new AtomicInteger(1) - private sealed trait PendingHandlerInvocation { + /** INTERNAL API */ + private[akka] sealed trait PendingHandlerInvocation { def evt: Any def handler: Any ⇒ Unit } - /** forces actor to stash incoming commands until all these invocations are handled */ - private final case class StashingHandlerInvocation(evt: Any, handler: Any ⇒ Unit) extends PendingHandlerInvocation - /** does not force the actor to stash commands; Originates from either `persistAsync` or `defer` calls */ - private final case class AsyncHandlerInvocation(evt: Any, handler: Any ⇒ Unit) extends PendingHandlerInvocation + /** INTERNAL API: forces actor to stash incoming commands until all these invocations are handled */ + private[akka] final case class StashingHandlerInvocation(evt: Any, handler: Any ⇒ Unit) extends PendingHandlerInvocation + /** INTERNAL API: does not force the actor to stash commands; Originates from either `persistAsync` or `defer` calls */ + private[akka] final case class AsyncHandlerInvocation(evt: Any, handler: Any ⇒ Unit) extends PendingHandlerInvocation - /** message used to detect that recovery timed out */ - private final case class RecoveryTick(snapshot: Boolean) + /** INTERNAL API: message used to detect that recovery timed out */ + private[akka] final case class RecoveryTick(snapshot: Boolean) } /** diff --git a/akka-persistence/src/main/scala/akka/persistence/RecoveryPermitter.scala b/akka-persistence/src/main/scala/akka/persistence/RecoveryPermitter.scala index e49f90a723..fb3aa9869d 100644 --- a/akka-persistence/src/main/scala/akka/persistence/RecoveryPermitter.scala +++ b/akka-persistence/src/main/scala/akka/persistence/RecoveryPermitter.scala @@ -18,9 +18,11 @@ import akka.actor.Terminated def props(maxPermits: Int): Props = Props(new RecoveryPermitter(maxPermits)) - case object RequestRecoveryPermit - case object RecoveryPermitGranted - case object ReturnRecoveryPermit + sealed trait Protocol + sealed trait Reply extends Protocol + case object RequestRecoveryPermit extends Protocol + case object RecoveryPermitGranted extends Reply + case object ReturnRecoveryPermit extends Protocol } @@ -49,18 +51,18 @@ import akka.actor.Terminated } case ReturnRecoveryPermit ⇒ - returnRecoveryPermit(sender()) + onReturnRecoveryPermit(sender()) case Terminated(ref) ⇒ // pre-mature termination should be rare if (!pending.remove(ref)) - returnRecoveryPermit(ref) + onReturnRecoveryPermit(ref) } - private def returnRecoveryPermit(ref: ActorRef): Unit = { + private def onReturnRecoveryPermit(ref: ActorRef): Unit = { usedPermits -= 1 context.unwatch(ref) - if (usedPermits < 0) throw new IllegalStateException("permits must not be negative") + if (usedPermits < 0) throw new IllegalStateException(s"permits must not be negative (returned by: ${ref})") if (!pending.isEmpty) { val ref = pending.poll() recoveryPermitGranted(ref) diff --git a/akka-persistence/src/main/scala/akka/persistence/Snapshotter.scala b/akka-persistence/src/main/scala/akka/persistence/Snapshotter.scala index f8e899205b..3edd80b2c8 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Snapshotter.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Snapshotter.scala @@ -29,7 +29,7 @@ trait Snapshotter extends Actor { * Instructs the snapshot store to load the specified snapshot and send it via an [[SnapshotOffer]] * to the running [[PersistentActor]]. */ - def loadSnapshot(persistenceId: String, criteria: SnapshotSelectionCriteria, toSequenceNr: Long) = + def loadSnapshot(persistenceId: String, criteria: SnapshotSelectionCriteria, toSequenceNr: Long): Unit = snapshotStore ! LoadSnapshot(persistenceId, criteria, toSequenceNr) /** diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/InputStreamSinkStage.scala b/akka-stream/src/main/scala/akka/stream/impl/io/InputStreamSinkStage.scala index c4c637e42c..c89997ac8c 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/InputStreamSinkStage.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/InputStreamSinkStage.scala @@ -3,7 +3,7 @@ */ package akka.stream.impl.io -import java.io.{ IOException, InputStream } +import java.io.{ BufferedOutputStream, ByteArrayOutputStream, IOException, InputStream } import java.util.concurrent.{ BlockingQueue, LinkedBlockingDeque, TimeUnit } import akka.annotation.InternalApi