diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/BehaviorSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/BehaviorSpec.scala index 6332519a9e..4530ef9359 100644 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/BehaviorSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/BehaviorSpec.scala @@ -3,8 +3,8 @@ */ package akka.actor.typed -import akka.actor.typed.scaladsl.{ Behaviors ⇒ SActor } -import akka.actor.typed.javadsl.{ ActorContext ⇒ JActorContext, Behaviors ⇒ JActor } +import akka.actor.typed.scaladsl.{ Behaviors ⇒ SBehaviors } +import akka.actor.typed.javadsl.{ ActorContext ⇒ JActorContext, Behaviors ⇒ JBehaviors } import akka.japi.function.{ Function ⇒ F1e, Function2 ⇒ F2, Procedure2 ⇒ P2 } import akka.japi.pf.{ FI, PFBuilder } import java.util.function.{ Function ⇒ F1 } @@ -137,16 +137,16 @@ object BehaviorSpec { } def mkFull(monitor: ActorRef[Event], state: State = StateA): Behavior[Command] = { - SActor.immutable[Command] { + SBehaviors.immutable[Command] { case (ctx, GetSelf) ⇒ monitor ! Self(ctx.self) - SActor.same + SBehaviors.same case (_, Miss) ⇒ monitor ! Missed - SActor.unhandled + SBehaviors.unhandled case (_, Ignore) ⇒ monitor ! Ignored - SActor.same + SBehaviors.same case (_, Ping) ⇒ monitor ! Pong mkFull(monitor, state) @@ -155,13 +155,13 @@ object BehaviorSpec { mkFull(monitor, state.next) case (_, GetState()) ⇒ monitor ! state - SActor.same - case (_, Stop) ⇒ SActor.stopped - case (_, _) ⇒ SActor.unhandled + SBehaviors.same + case (_, Stop) ⇒ SBehaviors.stopped + case (_, _) ⇒ SBehaviors.unhandled } onSignal { case (_, signal) ⇒ monitor ! GotSignal(signal) - SActor.same + SBehaviors.same } } /* @@ -345,16 +345,16 @@ class FullBehaviorSpec extends TypedAkkaSpec with Messages with BecomeWithLifecy class ImmutableBehaviorSpec extends Messages with BecomeWithLifecycle with Stoppable { override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = behv(monitor, StateA) → null private def behv(monitor: ActorRef[Event], state: State): Behavior[Command] = { - SActor.immutable[Command] { + SBehaviors.immutable[Command] { case (ctx, GetSelf) ⇒ monitor ! Self(ctx.self) - SActor.same + SBehaviors.same case (_, Miss) ⇒ monitor ! Missed - SActor.unhandled + SBehaviors.unhandled case (_, Ignore) ⇒ monitor ! Ignored - SActor.same + SBehaviors.same case (_, Ping) ⇒ monitor ! Pong behv(monitor, state) @@ -363,13 +363,13 @@ class ImmutableBehaviorSpec extends Messages with BecomeWithLifecycle with Stopp behv(monitor, state.next) case (_, GetState()) ⇒ monitor ! state - SActor.same - case (_, Stop) ⇒ SActor.stopped - case (_, _: AuxPing) ⇒ SActor.unhandled + SBehaviors.same + case (_, Stop) ⇒ SBehaviors.stopped + case (_, _: AuxPing) ⇒ SBehaviors.unhandled } onSignal { case (_, signal) ⇒ monitor ! GotSignal(signal) - SActor.same + SBehaviors.same } } } @@ -379,18 +379,18 @@ class ImmutableWithSignalScalaBehaviorSpec extends TypedAkkaSpec with Messages w override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = behv(monitor) → null def behv(monitor: ActorRef[Event], state: State = StateA): Behavior[Command] = - SActor.immutable[Command] { + SBehaviors.immutable[Command] { (ctx, msg) ⇒ msg match { case GetSelf ⇒ monitor ! Self(ctx.self) - SActor.same + SBehaviors.same case Miss ⇒ monitor ! Missed - SActor.unhandled + SBehaviors.unhandled case Ignore ⇒ monitor ! Ignored - SActor.same + SBehaviors.same case Ping ⇒ monitor ! Pong behv(monitor, state) @@ -399,14 +399,14 @@ class ImmutableWithSignalScalaBehaviorSpec extends TypedAkkaSpec with Messages w behv(monitor, state.next) case GetState() ⇒ monitor ! state - SActor.same - case Stop ⇒ SActor.stopped - case _: AuxPing ⇒ SActor.unhandled + SBehaviors.same + case Stop ⇒ SBehaviors.stopped + case _: AuxPing ⇒ SBehaviors.unhandled } } onSignal { case (_, sig) ⇒ monitor ! GotSignal(sig) - SActor.same + SBehaviors.same } } @@ -415,17 +415,17 @@ class ImmutableScalaBehaviorSpec extends Messages with Become with Stoppable { override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = behv(monitor, StateA) → null def behv(monitor: ActorRef[Event], state: State): Behavior[Command] = - SActor.immutable[Command] { (ctx, msg) ⇒ + SBehaviors.immutable[Command] { (ctx, msg) ⇒ msg match { case GetSelf ⇒ monitor ! Self(ctx.self) - SActor.same + SBehaviors.same case Miss ⇒ monitor ! Missed - SActor.unhandled + SBehaviors.unhandled case Ignore ⇒ monitor ! Ignored - SActor.same + SBehaviors.same case Ping ⇒ monitor ! Pong behv(monitor, state) @@ -434,9 +434,9 @@ class ImmutableScalaBehaviorSpec extends Messages with Become with Stoppable { behv(monitor, state.next) case GetState() ⇒ monitor ! state - SActor.same - case Stop ⇒ SActor.stopped - case _: AuxPing ⇒ SActor.unhandled + SBehaviors.same + case Stop ⇒ SBehaviors.stopped + case _: AuxPing ⇒ SBehaviors.unhandled } } } @@ -446,8 +446,8 @@ class MutableScalaBehaviorSpec extends Messages with Become with Stoppable { override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = behv(monitor) → null def behv(monitor: ActorRef[Event]): Behavior[Command] = - SActor.mutable[Command] { ctx ⇒ - new SActor.MutableBehavior[Command] { + SBehaviors.mutable[Command] { ctx ⇒ + new SBehaviors.MutableBehavior[Command] { private var state: State = StateA override def onMessage(msg: Command): Behavior[Command] = { @@ -457,10 +457,10 @@ class MutableScalaBehaviorSpec extends Messages with Become with Stoppable { this case Miss ⇒ monitor ! Missed - SActor.unhandled + SBehaviors.unhandled case Ignore ⇒ monitor ! Ignored - SActor.same // this or same works the same way + SBehaviors.same // this or same works the same way case Ping ⇒ monitor ! Pong this @@ -471,8 +471,8 @@ class MutableScalaBehaviorSpec extends Messages with Become with Stoppable { case GetState() ⇒ monitor ! state this - case Stop ⇒ SActor.stopped - case _: AuxPing ⇒ SActor.unhandled + case Stop ⇒ SBehaviors.stopped + case _: AuxPing ⇒ SBehaviors.unhandled } } } @@ -481,7 +481,7 @@ class MutableScalaBehaviorSpec extends Messages with Become with Stoppable { class WidenedScalaBehaviorSpec extends ImmutableWithSignalScalaBehaviorSpec with Reuse with Siphon { - import SActor.BehaviorDecorators + import SBehaviors.BehaviorDecorators override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = { val inbox = TestInbox[Command]("widenedListener") @@ -494,7 +494,7 @@ class DeferredScalaBehaviorSpec extends ImmutableWithSignalScalaBehaviorSpec { override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = { val inbox = TestInbox[Done]("deferredListener") - (SActor.setup(_ ⇒ { + (SBehaviors.setup(_ ⇒ { inbox.ref ! Done super.behavior(monitor)._1 }), inbox) @@ -507,30 +507,30 @@ class DeferredScalaBehaviorSpec extends ImmutableWithSignalScalaBehaviorSpec { class TapScalaBehaviorSpec extends ImmutableWithSignalScalaBehaviorSpec with Reuse with SignalSiphon { override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = { val inbox = TestInbox[Either[Signal, Command]]("tapListener") - (SActor.tap((_, msg) ⇒ inbox.ref ! Right(msg), (_, sig) ⇒ inbox.ref ! Left(sig), super.behavior(monitor)._1), inbox) + (SBehaviors.tap((_, msg) ⇒ inbox.ref ! Right(msg), (_, sig) ⇒ inbox.ref ! Left(sig), super.behavior(monitor)._1), inbox) } } class RestarterScalaBehaviorSpec extends ImmutableWithSignalScalaBehaviorSpec with Reuse { override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = { - SActor.supervise(super.behavior(monitor)._1).onFailure(SupervisorStrategy.restart) → null + SBehaviors.supervise(super.behavior(monitor)._1).onFailure(SupervisorStrategy.restart) → null } } class ImmutableWithSignalJavaBehaviorSpec extends Messages with BecomeWithLifecycle with Stoppable { override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = behv(monitor) → null def behv(monitor: ActorRef[Event], state: State = StateA): Behavior[Command] = - JActor.immutable( + JBehaviors.immutable( fc((ctx, msg) ⇒ msg match { case GetSelf ⇒ monitor ! Self(ctx.getSelf) - SActor.same + SBehaviors.same case Miss ⇒ monitor ! Missed - SActor.unhandled + SBehaviors.unhandled case Ignore ⇒ monitor ! Ignored - SActor.same + SBehaviors.same case Ping ⇒ monitor ! Pong behv(monitor, state) @@ -539,31 +539,31 @@ class ImmutableWithSignalJavaBehaviorSpec extends Messages with BecomeWithLifecy behv(monitor, state.next) case GetState() ⇒ monitor ! state - SActor.same - case Stop ⇒ SActor.stopped - case _: AuxPing ⇒ SActor.unhandled + SBehaviors.same + case Stop ⇒ SBehaviors.stopped + case _: AuxPing ⇒ SBehaviors.unhandled }), fs((_, sig) ⇒ { monitor ! GotSignal(sig) - SActor.same + SBehaviors.same })) } class ImmutableJavaBehaviorSpec extends Messages with Become with Stoppable { override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = behv(monitor, StateA) → null def behv(monitor: ActorRef[Event], state: State): Behavior[Command] = - JActor.immutable { + JBehaviors.immutable { fc((ctx, msg) ⇒ msg match { case GetSelf ⇒ monitor ! Self(ctx.getSelf) - SActor.same + SBehaviors.same case Miss ⇒ monitor ! Missed - SActor.unhandled + SBehaviors.unhandled case Ignore ⇒ monitor ! Ignored - SActor.same + SBehaviors.same case Ping ⇒ monitor ! Pong behv(monitor, state) @@ -572,9 +572,9 @@ class ImmutableJavaBehaviorSpec extends Messages with Become with Stoppable { behv(monitor, state.next) case GetState() ⇒ monitor ! state - SActor.same - case Stop ⇒ SActor.stopped - case _: AuxPing ⇒ SActor.unhandled + SBehaviors.same + case Stop ⇒ SBehaviors.stopped + case _: AuxPing ⇒ SBehaviors.unhandled }) } } @@ -582,7 +582,7 @@ class ImmutableJavaBehaviorSpec extends Messages with Become with Stoppable { class WidenedJavaBehaviorSpec extends ImmutableWithSignalJavaBehaviorSpec with Reuse with Siphon { override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = { val inbox = TestInbox[Command]("widenedListener") - JActor.widened(super.behavior(monitor)._1, pf(_.`match`(classOf[Command], fi(x ⇒ { + JBehaviors.widened(super.behavior(monitor)._1, pf(_.`match`(classOf[Command], fi(x ⇒ { inbox.ref ! x x })))) → inbox @@ -594,7 +594,7 @@ class DeferredJavaBehaviorSpec extends ImmutableWithSignalJavaBehaviorSpec { override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = { val inbox = TestInbox[Done]("deferredListener") - (JActor.setup(df(_ ⇒ { + (JBehaviors.setup(df(_ ⇒ { inbox.ref ! Done super.behavior(monitor)._1 })), inbox) @@ -607,7 +607,7 @@ class DeferredJavaBehaviorSpec extends ImmutableWithSignalJavaBehaviorSpec { class TapJavaBehaviorSpec extends ImmutableWithSignalJavaBehaviorSpec with Reuse with SignalSiphon { override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = { val inbox = TestInbox[Either[Signal, Command]]("tapListener") - (JActor.tap( + (JBehaviors.tap( pc((_, msg) ⇒ inbox.ref ! Right(msg)), ps((_, sig) ⇒ inbox.ref ! Left(sig)), super.behavior(monitor)._1), inbox) @@ -616,7 +616,7 @@ class TapJavaBehaviorSpec extends ImmutableWithSignalJavaBehaviorSpec with Reuse class RestarterJavaBehaviorSpec extends ImmutableWithSignalJavaBehaviorSpec with Reuse { override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = { - JActor.supervise(super.behavior(monitor)._1) + JBehaviors.supervise(super.behavior(monitor)._1) .onFailure(classOf[Exception], SupervisorStrategy.restart) → null } } diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/WatchSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/WatchSpec.scala index 4b510c68b1..0a0bff0a33 100644 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/WatchSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/WatchSpec.scala @@ -5,6 +5,7 @@ package akka.actor.typed import akka.Done import akka.actor.typed.scaladsl.Behaviors +import akka.actor.typed.scaladsl.Behaviors.MutableBehavior import akka.actor.typed.scaladsl.adapter._ import akka.testkit.EventFilter import akka.testkit.typed.scaladsl.{ ActorTestKit, TestProbe } @@ -23,6 +24,12 @@ object WatchSpec { case (_, Stop) ⇒ Behaviors.stopped } + val mutableTerminatorBehavior = new MutableBehavior[Stop.type] { + override def onMessage(msg: Stop.type) = msg match { + case Stop ⇒ Behaviors.stopped + } + } + sealed trait Message sealed trait CustomTerminationMessage extends Message case object CustomTerminationMessage extends CustomTerminationMessage diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/adapter/AdapterSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/adapter/AdapterSpec.scala index fa71c68e65..6b2d2bf2b9 100644 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/adapter/AdapterSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/adapter/AdapterSpec.scala @@ -10,7 +10,7 @@ import akka.actor.{ InvalidMessageException, Props } import akka.actor.typed.scaladsl.Behaviors import akka.{ Done, NotUsed, actor ⇒ untyped } import akka.testkit._ -import akka.actor.typed.Behavior.UntypedBehavior +import akka.actor.typed.Behavior.UntypedPropsBehavior import scala.concurrent.Await @@ -301,8 +301,9 @@ class AdapterSpec extends AkkaSpec { "spawn untyped behavior anonymously" in { val probe = TestProbe() - val untypedBehavior: Behavior[String] = new UntypedBehavior[String] { - override private[akka] def untypedProps: Props = untypedForwarder(probe.ref) + val untypedBehavior: Behavior[String] = new UntypedPropsBehavior[String] { + override def untypedProps(props: akka.actor.typed.Props): akka.actor.Props = + untypedForwarder(probe.ref) } val ref = system.spawnAnonymous(untypedBehavior) ref ! "hello" @@ -311,8 +312,9 @@ class AdapterSpec extends AkkaSpec { "spawn untyped behavior" in { val probe = TestProbe() - val untypedBehavior: Behavior[String] = new UntypedBehavior[String] { - override private[akka] def untypedProps: Props = untypedForwarder(probe.ref) + val untypedBehavior: Behavior[String] = new UntypedPropsBehavior[String] { + override def untypedProps(props: akka.actor.typed.Props): akka.actor.Props = + untypedForwarder(probe.ref) } val ref = system.spawn(untypedBehavior, "test") ref ! "hello" diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/Behavior.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/Behavior.scala index 828a65e7b4..96f6c8c2c4 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/Behavior.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/Behavior.scala @@ -150,7 +150,7 @@ object Behavior { } /** - * INTERNAL API. + * INTERNAL API */ @InternalApi private[akka] object UnhandledBehavior extends Behavior[Nothing] { @@ -158,14 +158,13 @@ object Behavior { } /** - * INTERNAL API. + * INTERNAL API + * Used to create untyped props from behaviours, or directly returning an untyped props that implements this behavior. */ @InternalApi - private[akka] abstract class UntypedBehavior[T] extends Behavior[T] { - /** - * INTERNAL API - */ - @InternalApi private[akka] def untypedProps: akka.actor.Props + private[akka] abstract class UntypedPropsBehavior[T] extends Behavior[T] { + /** INTERNAL API */ + @InternalApi private[akka] def untypedProps(props: Props): akka.actor.Props } /** @@ -180,7 +179,8 @@ object Behavior { * Not placed in internal.BehaviorImpl because Behavior is sealed. */ @InternalApi - private[akka] final case class DeferredBehavior[T](factory: SAC[T] ⇒ Behavior[T]) extends Behavior[T] { + @DoNotInherit + private[akka] class DeferredBehavior[T](val factory: SAC[T] ⇒ Behavior[T]) extends Behavior[T] { /** start the deferred behavior */ @throws(classOf[Exception]) @@ -188,6 +188,10 @@ object Behavior { override def toString: String = s"Deferred(${LineNumbers(factory)})" } + object DeferredBehavior { + def apply[T](factory: SAC[T] ⇒ Behavior[T]) = + new DeferredBehavior[T](factory) + } /** * INTERNAL API. @@ -299,12 +303,12 @@ object Behavior { def interpretSignal[T](behavior: Behavior[T], ctx: ActorContext[T], signal: Signal): Behavior[T] = interpret(behavior, ctx, signal) - private def interpret[T](behavior: Behavior[T], ctx: ActorContext[T], msg: Any): Behavior[T] = + private def interpret[T](behavior: Behavior[T], ctx: ActorContext[T], msg: Any): Behavior[T] = { behavior match { case null ⇒ throw new InvalidMessageException("[null] is not an allowed message") case SameBehavior | UnhandledBehavior ⇒ throw new IllegalArgumentException(s"cannot execute with [$behavior] as behavior") - case _: UntypedBehavior[_] ⇒ + case _: UntypedPropsBehavior[_] ⇒ throw new IllegalArgumentException(s"cannot wrap behavior [$behavior] in " + "Behaviors.setup, Behaviors.supervise or similar") case d: DeferredBehavior[_] ⇒ throw new IllegalArgumentException(s"deferred [$d] should not be passed to interpreter") @@ -318,6 +322,7 @@ object Behavior { } start(possiblyDeferredResult, ctx) } + } /** * INTERNAL API @@ -331,10 +336,8 @@ object Behavior { if (!Behavior.isAlive(b2) || !messages.hasNext) b2 else { val nextB = messages.next() match { - case sig: Signal ⇒ - Behavior.interpretSignal(b2, ctx, sig) - case msg ⇒ - Behavior.interpretMessage(b2, ctx, msg) + case sig: Signal ⇒ Behavior.interpretSignal(b2, ctx, sig) + case msg ⇒ Behavior.interpretMessage(b2, ctx, msg) } interpretOne(Behavior.canonicalize(nextB, b, ctx)) // recursive } diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/ActorContextImpl.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/ActorContextImpl.scala index f88a2b1e85..961ea0121e 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/ActorContextImpl.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/ActorContextImpl.scala @@ -18,7 +18,6 @@ import scala.util.Try import akka.annotation.InternalApi import akka.util.OptionVal -import akka.event.LoggingAdapter import akka.util.Timeout /** diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/BehaviorImpl.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/BehaviorImpl.scala index 8931abb11c..387cf08c1e 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/BehaviorImpl.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/BehaviorImpl.scala @@ -61,7 +61,9 @@ import scala.reflect.ClassTag override def receiveSignal(ctx: AC[T], msg: Signal): Behavior[T] = onSignal.applyOrElse((ctx.asScala, msg), Behavior.unhandledSignal.asInstanceOf[PartialFunction[(SAC[T], Signal), Behavior[T]]]) + override def receiveMessage(ctx: AC[T], msg: T) = onMessage(ctx.asScala, msg) + override def toString = s"Immutable(${LineNumbers(onMessage)})" } diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/StashBufferImpl.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/StashBufferImpl.scala index bb690fe01c..b219eb2e36 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/StashBufferImpl.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/StashBufferImpl.scala @@ -86,7 +86,7 @@ import akka.util.ConstantFun } } - override def forEach(f: Consumer[T]): Unit = foreach(f.accept) + override def forEach(f: Consumer[T]): Unit = foreach(f.accept(_)) override def unstashAll(ctx: scaladsl.ActorContext[T], behavior: Behavior[T]): Behavior[T] = unstash(ctx, behavior, size, ConstantFun.scalaIdentityFunction[T]) @@ -98,8 +98,8 @@ import akka.util.ConstantFun numberOfMessages: Int, wrap: T ⇒ T): Behavior[T] = { val iter = new Iterator[T] { override def hasNext: Boolean = StashBufferImpl.this.nonEmpty - override def next(): T = StashBufferImpl.this.dropHead() - }.take(numberOfMessages).map(wrap) + override def next(): T = wrap(StashBufferImpl.this.dropHead()) + }.take(numberOfMessages) val ctx = scaladslCtx.asInstanceOf[ActorContext[T]] Behavior.interpretMessages[T](behavior, ctx, iter) } @@ -108,5 +108,7 @@ import akka.util.ConstantFun numberOfMessages: Int, wrap: JFunction[T, T]): Behavior[T] = unstash(ctx.asScala, behavior, numberOfMessages, x ⇒ wrap.apply(x)) + override def toString: String = + s"StashBuffer($size/$capacity)" } diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/TimerSchedulerImpl.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/TimerSchedulerImpl.scala index d5ed348a4d..9ccb6eb574 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/TimerSchedulerImpl.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/TimerSchedulerImpl.scala @@ -26,11 +26,13 @@ import scala.reflect.ClassTag final case class TimerMsg(key: Any, generation: Int, owner: AnyRef) def withTimers[T](factory: TimerSchedulerImpl[T] ⇒ Behavior[T]): Behavior[T] = { - scaladsl.Behaviors.setup[T] { ctx ⇒ - val timerScheduler = new TimerSchedulerImpl[T](ctx) - val behavior = factory(timerScheduler) - timerScheduler.intercept(behavior) - } + scaladsl.Behaviors.setup[T](wrapWithTimers(factory)) + } + + def wrapWithTimers[T](factory: TimerSchedulerImpl[T] ⇒ Behavior[T])(ctx: ActorContext[T]): Behavior[T] = { + val timerScheduler = new TimerSchedulerImpl[T](ctx) + val behavior = factory(timerScheduler) + timerScheduler.intercept(behavior) } } diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/ActorContextAdapter.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/ActorContextAdapter.scala index 01003415f2..5b3f1ae9a8 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/ActorContextAdapter.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/ActorContextAdapter.scala @@ -6,7 +6,7 @@ package internal package adapter import akka.actor.ExtendedActorSystem -import akka.actor.typed.Behavior.UntypedBehavior +import akka.actor.typed.Behavior.UntypedPropsBehavior import akka.annotation.InternalApi import akka.util.OptionVal import akka.{ ConfigurationException, actor ⇒ a } @@ -125,9 +125,10 @@ import scala.concurrent.duration._ def spawnAnonymous[T](ctx: akka.actor.ActorContext, behavior: Behavior[T], props: Props): ActorRef[T] = { behavior match { - case b: UntypedBehavior[_] ⇒ + case b: UntypedPropsBehavior[_] ⇒ // TODO dispatcher from props - ActorRefAdapter(ctx.actorOf(b.untypedProps)) + ActorRefAdapter(ctx.actorOf(b.untypedProps(props))) + case _ ⇒ try { Behavior.validateAsInitial(behavior) @@ -141,9 +142,10 @@ import scala.concurrent.duration._ def spawn[T](ctx: akka.actor.ActorContext, behavior: Behavior[T], name: String, props: Props): ActorRef[T] = { behavior match { - case b: UntypedBehavior[_] ⇒ + case b: UntypedPropsBehavior[_] ⇒ // TODO dispatcher from props - ActorRefAdapter(ctx.actorOf(b.untypedProps, name)) + ActorRefAdapter(ctx.actorOf(b.untypedProps(props), name)) + case _ ⇒ try { Behavior.validateAsInitial(behavior) diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/package.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/package.scala index c2d4d6ebef..e4429e6c43 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/package.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/package.scala @@ -1,3 +1,6 @@ +/** + * Copyright (C) 2017-2018 Lightbend Inc. + */ package akka.actor import akka.actor.typed.internal.ActorRefImpl diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/ActorContext.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/ActorContext.scala index 973a2b7e00..1e2de14a56 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/ActorContext.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/ActorContext.scala @@ -273,7 +273,6 @@ trait ActorContext[T] { this: akka.actor.typed.javadsl.ActorContext[T] ⇒ * @tparam Req The request protocol, what the other actor accepts * @tparam Res The response protocol, what the other actor sends back */ - def ask[Req, Res]( - otherActor: ActorRef[Req])(createRequest: ActorRef[Res] ⇒ Req)(mapResponse: Try[Res] ⇒ T)(implicit responseTimeout: Timeout, classTag: ClassTag[Res]): Unit + def ask[Req, Res](otherActor: ActorRef[Req])(createRequest: ActorRef[Res] ⇒ Req)(mapResponse: Try[Res] ⇒ T)(implicit responseTimeout: Timeout, classTag: ClassTag[Res]): Unit } diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/Behaviors.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/Behaviors.scala index 5187d9d8eb..f6641f21ea 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/Behaviors.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/Behaviors.scala @@ -99,7 +99,7 @@ object Behaviors { @throws(classOf[Exception]) override final def receiveSignal(ctx: akka.actor.typed.ActorContext[T], msg: Signal): Behavior[T] = - onSignal.applyOrElse(msg, { case _ ⇒ Behavior.unhandled }: PartialFunction[Signal, Behavior[T]]) + onSignal.applyOrElse(msg, { case msg ⇒ Behavior.unhandled }: PartialFunction[Signal, Behavior[T]]) /** * Override this method to process an incoming [[akka.actor.typed.Signal]] and return the next behavior. diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/adapter/package.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/adapter/package.scala index 506851edd2..e1d20ad33c 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/adapter/package.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/adapter/package.scala @@ -4,7 +4,7 @@ package akka.actor.typed package scaladsl -import akka.actor.typed.Behavior.UntypedBehavior +import akka.actor.typed.Behavior.UntypedPropsBehavior import akka.actor.typed.internal.adapter._ /** @@ -38,8 +38,8 @@ package object adapter { def spawnAnonymous[T](behavior: Behavior[T], props: Props = Props.empty): ActorRef[T] = { behavior match { - case b: UntypedBehavior[_] ⇒ - ActorRefAdapter(sys.actorOf(b.untypedProps)) + case b: UntypedPropsBehavior[_] ⇒ + ActorRefAdapter(sys.actorOf(b.untypedProps(props))) case _ ⇒ ActorRefAdapter(sys.actorOf(PropsAdapter(Behavior.validateAsInitial(behavior), props))) } @@ -47,8 +47,8 @@ package object adapter { def spawn[T](behavior: Behavior[T], name: String, props: Props = Props.empty): ActorRef[T] = { behavior match { - case b: UntypedBehavior[_] ⇒ - ActorRefAdapter(sys.actorOf(b.untypedProps, name)) + case b: UntypedPropsBehavior[_] ⇒ + ActorRefAdapter(sys.actorOf(b.untypedProps(props), name)) case _ ⇒ ActorRefAdapter(sys.actorOf(PropsAdapter(Behavior.validateAsInitial(behavior), props), name)) } diff --git a/akka-actor/src/main/scala/akka/util/ConstantFun.scala b/akka-actor/src/main/scala/akka/util/ConstantFun.scala index 3105d040ad..2a97f764c9 100644 --- a/akka-actor/src/main/scala/akka/util/ConstantFun.scala +++ b/akka-actor/src/main/scala/akka/util/ConstantFun.scala @@ -27,6 +27,8 @@ import akka.japi.{ Pair ⇒ JPair } def scalaAnyToNone[A, B]: A ⇒ Option[B] = none def scalaAnyTwoToNone[A, B, C]: (A, B) ⇒ Option[C] = two2none + def scalaAnyTwoToUnit[A, B]: (A, B) ⇒ Unit = two2unit + def scalaAnyThreeToFalse[A, B, C]: (A, B, C) ⇒ Boolean = three2false def javaAnyToNone[A, B]: A ⇒ Option[B] = none def nullFun[T] = _nullFun.asInstanceOf[Any ⇒ T] @@ -44,4 +46,8 @@ import akka.japi.{ Pair ⇒ JPair } private val two2none = (_: Any, _: Any) ⇒ None + private val two2unit = (_: Any, _: Any) ⇒ () + + private val three2false = (_: Any, _: Any, _: Any) ⇒ false + } diff --git a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ClusterShardingImpl.scala b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ClusterShardingImpl.scala index 76a0ad6fc0..d461456b64 100644 --- a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ClusterShardingImpl.scala +++ b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ClusterShardingImpl.scala @@ -14,7 +14,7 @@ import akka.actor.{ InternalActorRef, Scheduler } import akka.actor.typed.ActorRef import akka.actor.typed.ActorSystem import akka.actor.typed.Behavior -import akka.actor.typed.Behavior.UntypedBehavior +import akka.actor.typed.Behavior.UntypedPropsBehavior import akka.actor.typed.Props import akka.actor.typed.internal.adapter.ActorRefAdapter import akka.actor.typed.internal.adapter.ActorSystemAdapter @@ -142,8 +142,8 @@ import akka.japi.function.{ Function ⇒ JFunction } val untypedEntityPropsFactory: String ⇒ akka.actor.Props = { entityId ⇒ behavior(entityId) match { - case u: UntypedBehavior[_] ⇒ u.untypedProps // PersistentBehavior - case b ⇒ PropsAdapter(b, entityProps) + case u: UntypedPropsBehavior[_] ⇒ u.untypedProps(Props.empty) // PersistentBehavior + case b ⇒ PropsAdapter(b, entityProps) } } diff --git a/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/ClusterShardingPersistenceSpec.scala b/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/ClusterShardingPersistenceSpec.scala index 273a7cef4f..0d6eee4566 100644 --- a/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/ClusterShardingPersistenceSpec.scala +++ b/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/ClusterShardingPersistenceSpec.scala @@ -10,7 +10,7 @@ import akka.actor.typed.TypedAkkaSpecWithShutdown import akka.cluster.sharding.typed.ClusterShardingSettings import akka.cluster.typed.Cluster import akka.cluster.typed.Join -import akka.persistence.typed.scaladsl.PersistentBehaviors +import akka.persistence.typed.scaladsl.{ Effect, PersistentBehaviors } import akka.testkit.typed.scaladsl.{ ActorTestKit, TestProbe } import com.typesafe.config.ConfigFactory diff --git a/akka-cluster-typed/src/main/scala/akka/cluster/typed/internal/AdaptedClusterSingletonImpl.scala b/akka-cluster-typed/src/main/scala/akka/cluster/typed/internal/AdaptedClusterSingletonImpl.scala index 60d1bec605..610e890934 100644 --- a/akka-cluster-typed/src/main/scala/akka/cluster/typed/internal/AdaptedClusterSingletonImpl.scala +++ b/akka-cluster-typed/src/main/scala/akka/cluster/typed/internal/AdaptedClusterSingletonImpl.scala @@ -9,7 +9,7 @@ import java.util.function.{ Function ⇒ JFunction } import akka.actor.{ ExtendedActorSystem, InvalidActorNameException } import akka.annotation.InternalApi import akka.cluster.singleton.{ ClusterSingletonProxy, ClusterSingletonManager ⇒ OldSingletonManager } -import akka.actor.typed.Behavior.UntypedBehavior +import akka.actor.typed.Behavior.UntypedPropsBehavior import akka.cluster.typed.{ Cluster, ClusterSingleton, ClusterSingletonImpl, ClusterSingletonSettings } import akka.actor.typed.internal.adapter.ActorSystemAdapter import akka.actor.typed.scaladsl.adapter._ @@ -40,8 +40,8 @@ private[akka] final class AdaptedClusterSingletonImpl(system: ActorSystem[_]) ex val managerName = managerNameFor(singletonName) // start singleton on this node val untypedProps = behavior match { - case u: UntypedBehavior[_] ⇒ u.untypedProps // PersistentBehavior - case _ ⇒ PropsAdapter(behavior, props) + case u: UntypedPropsBehavior[_] ⇒ u.untypedProps(props) // PersistentBehavior + case _ ⇒ PropsAdapter(behavior, props) } try { untypedSystem.systemActorOf( diff --git a/akka-cluster-typed/src/test/scala/akka/cluster/typed/ClusterSingletonPersistenceSpec.scala b/akka-cluster-typed/src/test/scala/akka/cluster/typed/ClusterSingletonPersistenceSpec.scala index 5f5124151f..4575a19c02 100644 --- a/akka-cluster-typed/src/test/scala/akka/cluster/typed/ClusterSingletonPersistenceSpec.scala +++ b/akka-cluster-typed/src/test/scala/akka/cluster/typed/ClusterSingletonPersistenceSpec.scala @@ -5,8 +5,7 @@ package akka.cluster.typed import akka.actor.typed.{ ActorRef, Behavior, Props, TypedAkkaSpecWithShutdown } -import akka.persistence.typed.scaladsl.PersistentBehaviors -import akka.persistence.typed.scaladsl.PersistentBehaviors.{ CommandHandler, Effect } +import akka.persistence.typed.scaladsl.{ Effect, PersistentBehaviors } import akka.testkit.typed.scaladsl.{ ActorTestKit, TestProbe } import com.typesafe.config.ConfigFactory diff --git a/akka-docs/src/main/paradox/typed/persistence.md b/akka-docs/src/main/paradox/typed/persistence.md index 46c309a79e..19a9ccd473 100644 --- a/akka-docs/src/main/paradox/typed/persistence.md +++ b/akka-docs/src/main/paradox/typed/persistence.md @@ -62,8 +62,10 @@ and can be used to create various effects such as: External side effects can be performed after successful persist with the `andThen` function e.g @scala[`Effect.persist(..).andThen`]@java[`Effect().persist(..).andThen`]. -In the example below a reply is sent to the `replyTo` ActorRef. Note that the new state after applying -the event is passed as parameter to the `andThen` function. + +In the example below a reply is sent to the `replyTo` ActorRef. Note that the new state after applying +the event is passed as parameter to the `andThen` function. All `andThen*` registered callbacks +are executed after successful execution of the persist statement (or immediately, in case of `none` and `unhandled`). ### Event handler diff --git a/akka-persistence-typed/src/main/resources/reference.conf b/akka-persistence-typed/src/main/resources/reference.conf index e69de29bb2..15febc5cb0 100644 --- a/akka-persistence-typed/src/main/resources/reference.conf +++ b/akka-persistence-typed/src/main/resources/reference.conf @@ -0,0 +1,11 @@ +akka.persistence.typed { + + # default stash buffer size for incoming messages to persistent actors + stash-buffer-size = 1024 + + # enables automatic logging of messages stashed automatically by an PersistentBehavior, + # this may happen while it receives commands while it is recovering events or while it is persisting events + # Set to a log-level (debug, info, warn, error) to log stashed messages on given log level; + log-stashing = off + +} diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EffectImpl.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EffectImpl.scala index 315f6eaa55..2e7240d333 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EffectImpl.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EffectImpl.scala @@ -3,17 +3,15 @@ */ package akka.persistence.typed.internal -import akka.persistence.typed.{ javadsl ⇒ j } +import akka.persistence.typed.javadsl +import akka.persistence.typed.scaladsl import scala.collection.{ immutable ⇒ im } -import akka.annotation.InternalApi -import akka.persistence.typed.scaladsl.PersistentBehaviors.{ ChainableEffect, Effect } +import akka.annotation.{ DoNotInherit, InternalApi } -/** - * INTERNAL API - */ +/** INTERNAL API */ @InternalApi -private[akka] abstract class EffectImpl[+Event, State] extends j.Effect[Event, State] with Effect[Event, State] { +private[akka] abstract class EffectImpl[+Event, State] extends javadsl.Effect[Event, State] with scaladsl.Effect[Event, State] { /* All events that will be persisted in this effect */ override def events: im.Seq[Event] = Nil @@ -21,19 +19,14 @@ private[akka] abstract class EffectImpl[+Event, State] extends j.Effect[Event, S override def sideEffects[E >: Event]: im.Seq[ChainableEffect[E, State]] = Nil } -/** - * INTERNAL API - */ +/** INTERNAL API */ @InternalApi private[akka] object CompositeEffect { - def apply[Event, State](effect: EffectImpl[Event, State], sideEffects: ChainableEffect[Event, State]): EffectImpl[Event, State] = { - CompositeEffect[Event, State]( - effect, - sideEffects :: Nil - ) - } + def apply[Event, State](effect: EffectImpl[Event, State], sideEffects: ChainableEffect[Event, State]): EffectImpl[Event, State] = + CompositeEffect[Event, State](effect, sideEffects :: Nil) } +/** INTERNAL API */ @InternalApi private[akka] final case class CompositeEffect[Event, State]( persistingEffect: EffectImpl[Event, State], @@ -43,25 +36,39 @@ private[akka] final case class CompositeEffect[Event, State]( override def sideEffects[E >: Event]: im.Seq[ChainableEffect[E, State]] = _sideEffects.asInstanceOf[im.Seq[ChainableEffect[E, State]]] + override def toString: String = + s"CompositeEffect($persistingEffect, sideEffects: ${_sideEffects.size})" } +/** INTERNAL API */ @InternalApi private[akka] case object PersistNothing extends EffectImpl[Nothing, Nothing] +/** INTERNAL API */ @InternalApi private[akka] case class Persist[Event, State](event: Event) extends EffectImpl[Event, State] { override def events = event :: Nil } +/** INTERNAL API */ @InternalApi private[akka] case class PersistAll[Event, State](override val events: im.Seq[Event]) extends EffectImpl[Event, State] +/** INTERNAL API */ @InternalApi private[akka] case class SideEffect[Event, State](effect: State ⇒ Unit) extends ChainableEffect[Event, State] +/** INTERNAL API */ @InternalApi private[akka] case object Stop extends ChainableEffect[Nothing, Nothing] +/** INTERNAL API */ @InternalApi private[akka] case object Unhandled extends EffectImpl[Nothing, Nothing] +/** + * Not for user extension + */ +@DoNotInherit +abstract class ChainableEffect[Event, State] extends EffectImpl[Event, State] + diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedBehavior.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedBehavior.scala new file mode 100644 index 0000000000..66a19b2e29 --- /dev/null +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedBehavior.scala @@ -0,0 +1,102 @@ +/** + * Copyright (C) 2016-2018 Lightbend Inc. + */ + +package akka.persistence.typed.internal + +import java.util.UUID +import java.util.concurrent.atomic.AtomicInteger + +import akka.actor.NoSerializationVerificationNeeded +import akka.actor.typed.Behavior +import akka.actor.typed.Behavior.StoppedBehavior +import akka.actor.typed.scaladsl.{ ActorContext, TimerScheduler } +import akka.annotation.InternalApi +import akka.event.{ LogSource, Logging } +import akka.persistence.typed.scaladsl.PersistentBehaviors +import akka.persistence.{ JournalProtocol, Persistence, RecoveryPermitter, SnapshotProtocol } +import akka.{ actor ⇒ a } + +/** INTERNAL API */ +@InternalApi +private[akka] object EventsourcedBehavior { + + // ok to wrap around (2*Int.MaxValue restarts will not happen within a journal roundtrip) + private[akka] val instanceIdCounter = new AtomicInteger(1) + + @InternalApi private[akka] object WriterIdentity { + def newIdentity(): WriterIdentity = { + val instanceId: Int = EventsourcedBehavior.instanceIdCounter.getAndIncrement() + val writerUuid: String = UUID.randomUUID.toString + WriterIdentity(instanceId, writerUuid) + } + } + private[akka] final case class WriterIdentity(instanceId: Int, writerUuid: String) + + /** Protocol used internally by the eventsourced behaviors, never exposed to user-land */ + private[akka] sealed trait EventsourcedProtocol + private[akka] case object RecoveryPermitGranted extends EventsourcedProtocol + private[akka] final case class JournalResponse(msg: akka.persistence.JournalProtocol.Response) extends EventsourcedProtocol + private[akka] final case class SnapshotterResponse(msg: akka.persistence.SnapshotProtocol.Response) extends EventsourcedProtocol + private[akka] final case class RecoveryTickEvent(snapshot: Boolean) extends EventsourcedProtocol + private[akka] final case class ReceiveTimeout(timeout: akka.actor.ReceiveTimeout) extends EventsourcedProtocol + + implicit object PersistentBehaviorLogSource extends LogSource[EventsourcedBehavior[_, _, _]] { + override def genString(b: EventsourcedBehavior[_, _, _]): String = { + val behaviorShortName = b match { + case _: EventsourcedRunning[_, _, _] ⇒ "running" + case _: EventsourcedRecoveringEvents[_, _, _] ⇒ "recover-events" + case _: EventsourcedRecoveringSnapshot[_, _, _] ⇒ "recover-snap" + case _: EventsourcedRequestingRecoveryPermit[_, _, _] ⇒ "awaiting-permit" + } + s"PersistentBehavior[id:${b.persistenceId}][${b.context.self.path}][$behaviorShortName]" + } + } + +} + +/** INTERNAL API */ +@InternalApi +private[akka] trait EventsourcedBehavior[Command, Event, State] { + import EventsourcedBehavior._ + import akka.actor.typed.scaladsl.adapter._ + + protected def context: ActorContext[Any] + protected def timers: TimerScheduler[Any] + + type C = Command + type AC = ActorContext[C] + type E = Event + type S = State + + // used for signaling intent in type signatures + type SeqNr = Long + + def persistenceId: String + + protected def callbacks: EventsourcedCallbacks[Command, Event, State] + protected def initialState: State = callbacks.initialState + protected def commandHandler: PersistentBehaviors.CommandHandler[Command, Event, State] = callbacks.commandHandler + protected def eventHandler: (State, Event) ⇒ State = callbacks.eventHandler + protected def snapshotWhen: (State, Event, SeqNr) ⇒ Boolean = callbacks.snapshotWhen + protected def tagger: Event ⇒ Set[String] = callbacks.tagger + + protected def pluginIds: EventsourcedPluginIds + protected final def journalPluginId: String = pluginIds.journalPluginId + protected final def snapshotPluginId: String = pluginIds.snapshotPluginId + + // ------ common ------- + + protected lazy val extension = Persistence(context.system.toUntyped) + protected lazy val journal: a.ActorRef = extension.journalFor(journalPluginId) + protected lazy val snapshotStore: a.ActorRef = extension.snapshotStoreFor(snapshotPluginId) + + protected lazy val selfUntyped: a.ActorRef = context.self.toUntyped + protected lazy val selfUntypedAdapted: a.ActorRef = context.messageAdapter[Any] { + case res: JournalProtocol.Response ⇒ JournalResponse(res) + case RecoveryPermitter.RecoveryPermitGranted ⇒ RecoveryPermitGranted + case res: SnapshotProtocol.Response ⇒ SnapshotterResponse(res) + case cmd: Command @unchecked ⇒ cmd // if it was wrong, we'll realise when trying to onMessage the cmd + }.toUntyped + +} diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedCarriers.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedCarriers.scala new file mode 100644 index 0000000000..864a4e4d70 --- /dev/null +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedCarriers.scala @@ -0,0 +1,20 @@ +package akka.persistence.typed.internal + +import akka.actor.typed.scaladsl.ActorContext +import akka.persistence.typed.scaladsl.PersistentBehaviors.CommandHandler + +/** INTERNAL API: Used to carry user callbacks between behaviors of an Persistent Behavior */ +private[akka] final case class EventsourcedCallbacks[Command, Event, State]( + initialState: State, + commandHandler: CommandHandler[Command, Event, State], + eventHandler: (State, Event) ⇒ State, + snapshotWhen: (State, Event, Long) ⇒ Boolean, + recoveryCompleted: (ActorContext[Command], State) ⇒ Unit, + tagger: Event ⇒ Set[String] +) + +/** INTERNAL API: Used to carry settings between behaviors of an Persistent Behavior */ +private[akka] final case class EventsourcedPluginIds( + journalPluginId: String, + snapshotPluginId: String +) diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRecoveringEvents.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRecoveringEvents.scala new file mode 100644 index 0000000000..f09483187d --- /dev/null +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRecoveringEvents.scala @@ -0,0 +1,222 @@ +/** + * Copyright (C) 2016-2018 Lightbend Inc. + */ +package akka.persistence.typed.internal + +import akka.actor.typed.Behavior +import akka.actor.typed.scaladsl.Behaviors.MutableBehavior +import akka.actor.typed.scaladsl.{ ActorContext, Behaviors, StashBuffer, TimerScheduler } +import akka.annotation.InternalApi +import akka.event.Logging +import akka.persistence.JournalProtocol._ +import akka.persistence._ +import akka.persistence.typed.internal.EventsourcedBehavior.WriterIdentity +import akka.persistence.typed.scaladsl.PersistentBehaviors._ +import akka.util.Helpers._ + +import scala.util.control.NonFatal + +/** + * INTERNAL API + * + * Third (of four) behavior of an PersistentBehavior. + * + * In this behavior we finally start replaying events, beginning from the last applied sequence number + * (i.e. the one up-until-which the snapshot recovery has brought us). + * + * Once recovery is completed, the actor becomes [[EventsourcedRunning]], stashed messages are flushed + * and control is given to the user's handlers to drive the actors behavior from there. + * + */ +@InternalApi +private[akka] class EventsourcedRecoveringEvents[Command, Event, State]( + val persistenceId: String, + override val context: ActorContext[Any], + override val timers: TimerScheduler[Any], + override val internalStash: StashBuffer[Any], + + val recovery: Recovery, + private var sequenceNr: Long, + val writerIdentity: WriterIdentity, + + private var state: State, + + val callbacks: EventsourcedCallbacks[Command, Event, State], + val pluginIds: EventsourcedPluginIds +) extends MutableBehavior[Any] + with EventsourcedBehavior[Command, Event, State] + with EventsourcedStashManagement { + + import Behaviors.same + import EventsourcedBehavior._ + import akka.actor.typed.scaladsl.adapter._ + + protected val log = Logging(context.system.toUntyped, this) + + // -------- initialize -------- + startRecoveryTimer() + + replayEvents(sequenceNr + 1L, recovery.toSequenceNr) + // ---- end of initialize ---- + + private def commandContext: ActorContext[Command] = context.asInstanceOf[ActorContext[Command]] + + // ---------- + + def snapshotSequenceNr: Long = sequenceNr + + private def updateLastSequenceNr(persistent: PersistentRepr): Unit = + if (persistent.sequenceNr > sequenceNr) sequenceNr = persistent.sequenceNr + + private def setLastSequenceNr(value: Long): Unit = + sequenceNr = value + + // ---------- + + // FIXME it's a bit of a pain to have those lazy vals, change everything to constructor parameters + lazy val timeout = extension.journalConfigFor(journalPluginId).getMillisDuration("recovery-event-timeout") + + // protect against snapshot stalling forever because of journal overloaded and such + private val RecoveryTickTimerKey = "recovery-tick" + private def startRecoveryTimer(): Unit = timers.startPeriodicTimer(RecoveryTickTimerKey, RecoveryTickEvent(snapshot = false), timeout) + private def cancelRecoveryTimer(): Unit = timers.cancel(RecoveryTickTimerKey) + + private var eventSeenInInterval = false + + def onCommand(cmd: Command): Behavior[Any] = { + // during recovery, stash all incoming commands + stash(cmd) + same + } + + def onJournalResponse(response: JournalProtocol.Response): Behavior[Any] = try { + response match { + case ReplayedMessage(repr) ⇒ + eventSeenInInterval = true + updateLastSequenceNr(repr) + // TODO we need some state adapters here? + val newState = eventHandler(state, repr.payload.asInstanceOf[Event]) + state = newState + same + + case RecoverySuccess(highestSeqNr) ⇒ + log.debug("Recovery successful, recovered until sequenceNr: {}", highestSeqNr) + cancelRecoveryTimer() + setLastSequenceNr(highestSeqNr) + + try onRecoveryCompleted(state) + catch { case NonFatal(ex) ⇒ onRecoveryFailure(ex, Some(state)) } + + case ReplayMessagesFailure(cause) ⇒ + onRecoveryFailure(cause, event = None) + + case other ⇒ + stash(other) + Behaviors.same + } + } catch { + case NonFatal(e) ⇒ + cancelRecoveryTimer() + onRecoveryFailure(e, None) + } + + def onSnapshotterResponse(response: SnapshotProtocol.Response): Behavior[Any] = { + log.warning("Unexpected [{}] from SnapshotStore, already in recovering events state.", Logging.simpleName(response)) + Behaviors.same // ignore the response + } + + /** + * Called whenever a message replay fails. By default it logs the error. + * + * The actor is always stopped after this method has been invoked. + * + * @param cause failure cause. + * @param event the event that was processed in `receiveRecover`, if the exception was thrown there + */ + protected def onRecoveryFailure(cause: Throwable, event: Option[Any]): Behavior[Any] = { + returnRecoveryPermit("on recovery failure: " + cause.getMessage) + cancelRecoveryTimer() + + event match { + case Some(evt) ⇒ + log.error(cause, "Exception in receiveRecover when replaying event type [{}] with sequence number [{}].", evt.getClass.getName, sequenceNr) + Behaviors.stopped + + case None ⇒ + log.error(cause, "Persistence failure when replaying events. Last known sequence number [{}]", persistenceId, sequenceNr) + Behaviors.stopped + } + } + + protected def onRecoveryCompleted(state: State): Behavior[Any] = { + try { + returnRecoveryPermit("recovery completed successfully") + callbacks.recoveryCompleted(commandContext, state) + + val running = new EventsourcedRunning[Command, Event, State]( + persistenceId, + context, + timers, + internalStash, + + sequenceNr, + writerIdentity, + + state, + + callbacks, + pluginIds + ) + + tryUnstash(context, running) + } finally { + cancelRecoveryTimer() + } + } + + protected def onRecoveryTick(snapshot: Boolean): Behavior[Any] = + if (!snapshot) { + if (!eventSeenInInterval) { + cancelRecoveryTimer() + val msg = s"Recovery timed out, didn't get event within $timeout, highest sequence number seen $sequenceNr" + onRecoveryFailure(new RecoveryTimedOut(msg), event = None) // TODO allow users to hook into this? + } else { + eventSeenInInterval = false + same + } + } else { + // snapshot timeout, but we're already in the events recovery phase + Behavior.unhandled + } + + // ---------- + + override def onMessage(msg: Any): Behavior[Any] = { + msg match { + // TODO explore crazy hashcode hack to make this match quicker...? + case JournalResponse(r) ⇒ onJournalResponse(r) + case RecoveryTickEvent(snapshot) ⇒ onRecoveryTick(snapshot = snapshot) + case SnapshotterResponse(r) ⇒ onSnapshotterResponse(r) + case c: Command @unchecked ⇒ onCommand(c.asInstanceOf[Command]) // explicit cast to fail eagerly + } + } + + // ---------- + + // ---------- journal interactions --------- + + private def replayEvents(fromSeqNr: SeqNr, toSeqNr: SeqNr): Unit = { + log.debug("Replaying messages: from: {}, to: {}", fromSeqNr, toSeqNr) + // reply is sent to `selfUntypedAdapted`, it is important to target that one + journal ! ReplayMessages(fromSeqNr, toSeqNr, recovery.replayMax, persistenceId, selfUntypedAdapted) + } + + private def returnRecoveryPermit(reason: String): Unit = { + log.debug("Returning recovery permit, reason: " + reason) + // IMPORTANT to use selfUntyped, and not an adapter, since recovery permitter watches/unwatches those refs (and adapters are new refs) + extension.recoveryPermitter.tell(RecoveryPermitter.ReturnRecoveryPermit, selfUntyped) + } + + override def toString = s"EventsourcedRecoveringEvents($persistenceId)" + +} diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRecoveringSnapshot.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRecoveringSnapshot.scala new file mode 100644 index 0000000000..232d81e535 --- /dev/null +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRecoveringSnapshot.scala @@ -0,0 +1,217 @@ +/** + * Copyright (C) 2016-2018 Lightbend Inc. + */ +package akka.persistence.typed.internal + +import akka.actor.typed.Behavior +import akka.actor.typed.scaladsl.Behaviors.MutableBehavior +import akka.actor.typed.scaladsl.{ ActorContext, Behaviors, StashBuffer, TimerScheduler } +import akka.annotation.InternalApi +import akka.event.Logging +import akka.persistence.SnapshotProtocol.{ LoadSnapshot, LoadSnapshotFailed, LoadSnapshotResult } +import akka.persistence._ +import akka.persistence.typed.internal.EventsourcedBehavior.WriterIdentity +import akka.util.Helpers._ + +import scala.util.control.NonFatal +import scala.util.{ Failure, Success, Try } + +/** + * INTERNAL API + * + * Second (of four) behavior of an PersistentBehavior. + * + * In this behavior the recovery process is initiated. + * We try to obtain a snapshot from the configured snapshot store, + * and if it exists, we use it instead of the `initialState`. + * + * Once snapshot recovery is done (or no snapshot was selected), + * recovery of events continues in [[EventsourcedRecoveringEvents]]. + */ +@InternalApi +final class EventsourcedRecoveringSnapshot[Command, Event, State]( + val persistenceId: String, + override val context: ActorContext[Any], + override val timers: TimerScheduler[Any], + override val internalStash: StashBuffer[Any], + + val recovery: Recovery, + val writerIdentity: WriterIdentity, + + val callbacks: EventsourcedCallbacks[Command, Event, State], + val pluginIds: EventsourcedPluginIds +) extends MutableBehavior[Any] + with EventsourcedBehavior[Command, Event, State] + with EventsourcedStashManagement { + + import Behaviors.same + import EventsourcedBehavior._ + import akka.actor.typed.scaladsl.adapter._ + + protected val log = Logging(context.system.toUntyped, this) + + // -------- initialize -------- + startRecoveryTimer() + + loadSnapshot(persistenceId, recovery.fromSnapshot, recovery.toSequenceNr) + // ---- end of initialize ---- + + val commandContext: ActorContext[Command] = context.asInstanceOf[ActorContext[Command]] + + // ---------- + + protected var awaitingSnapshot: Boolean = true + + // ---------- + + private var lastSequenceNr: Long = 0L + def snapshotSequenceNr: Long = lastSequenceNr + + // ---------- + + lazy val timeout = extension.journalConfigFor(journalPluginId).getMillisDuration("recovery-event-timeout") + + // protect against snapshot stalling forever because of journal overloaded and such + private val RecoveryTickTimerKey = "recovery-tick" + private def startRecoveryTimer(): Unit = { + timers.startPeriodicTimer(RecoveryTickTimerKey, RecoveryTickEvent(snapshot = false), timeout) + } + private def cancelRecoveryTimer(): Unit = timers.cancel(RecoveryTickTimerKey) + + def onCommand(cmd: Command): Behavior[Any] = { + // during recovery, stash all incoming commands + stash(cmd) + Behavior.same + } + + def onJournalResponse(response: JournalProtocol.Response): Behavior[Any] = try { + throw new Exception("Should not talk to journal yet! But got: " + response) + } catch { + case NonFatal(cause) ⇒ + returnRecoveryPermitOnlyOnFailure(cause) + throw cause + } + + def onSnapshotterResponse(response: SnapshotProtocol.Response): Behavior[Any] = try { + response match { + case LoadSnapshotResult(sso, toSnr) ⇒ + var state: S = initialState + val re: Try[SeqNr] = Try { + sso match { + case Some(SelectedSnapshot(metadata, snapshot)) ⇒ + state = snapshot.asInstanceOf[State] + metadata.sequenceNr + + case None ⇒ + 0 // from the start please + } + } + + re match { + case Success(seqNr) ⇒ + lastSequenceNr = seqNr + replayMessages(state, toSnr) + + case Failure(cause) ⇒ + // FIXME better exception type + val ex = new RuntimeException(s"Failed to recover state for [$persistenceId] from snapshot offer.", cause) + onRecoveryFailure(ex, event = None) // FIXME the failure logs has bad messages... FIXME + } + + case LoadSnapshotFailed(cause) ⇒ + cancelRecoveryTimer() + + onRecoveryFailure(cause, event = None) + + case other ⇒ + stash(other) + same + } + } catch { + case NonFatal(cause) ⇒ + returnRecoveryPermitOnlyOnFailure(cause) + throw cause + } + + private def replayMessages(state: State, toSnr: SeqNr): Behavior[Any] = { + cancelRecoveryTimer() + + val rec = recovery.copy(toSequenceNr = toSnr, fromSnapshot = SnapshotSelectionCriteria.None) // TODO introduce new types + + new EventsourcedRecoveringEvents[Command, Event, State]( + persistenceId, + context, + timers, + internalStash, + + rec, + lastSequenceNr, + writerIdentity, + + state, + callbacks, + pluginIds + ) + } + + /** + * Called whenever a message replay fails. By default it logs the error. + * + * The actor is always stopped after this method has been invoked. + * + * @param cause failure cause. + * @param event the event that was processed in `receiveRecover`, if the exception was thrown there + */ + protected def onRecoveryFailure(cause: Throwable, event: Option[Any]): Behavior[Any] = { + cancelRecoveryTimer() + event match { + case Some(evt) ⇒ + log.error(cause, "Exception in receiveRecover when replaying event type [{}] with sequence number [{}] for " + + "persistenceId [{}].", evt.getClass.getName, lastSequenceNr, persistenceId) + Behaviors.stopped + + case None ⇒ + log.error(cause, "Persistence failure when replaying events for persistenceId [{}]. " + + "Last known sequence number [{}]", persistenceId, lastSequenceNr) + Behaviors.stopped + } + } + + protected def onRecoveryTick(snapshot: Boolean): Behavior[Any] = + // we know we're in snapshotting mode + if (snapshot) onRecoveryFailure(new RecoveryTimedOut(s"Recovery timed out, didn't get snapshot within $timeout"), event = None) + else same // ignore, since we received the snapshot already + + // ---------- + + override def onMessage(msg: Any): Behavior[Any] = { + msg match { + // TODO explore crazy hashcode hack to make this match quicker...? + case SnapshotterResponse(r) ⇒ onSnapshotterResponse(r) + case JournalResponse(r) ⇒ onJournalResponse(r) + case RecoveryTickEvent(snapshot) ⇒ onRecoveryTick(snapshot = snapshot) + case c: Command @unchecked ⇒ onCommand(c.asInstanceOf[Command]) // explicit cast to fail eagerly + } + } + + // ---------- + + // ---------- journal interactions --------- + + /** + * Instructs the snapshot store to load the specified snapshot and send it via an [[SnapshotOffer]] + * to the running [[PersistentActor]]. + */ + private def loadSnapshot(persistenceId: String, criteria: SnapshotSelectionCriteria, toSequenceNr: Long): Unit = { + snapshotStore.tell(LoadSnapshot(persistenceId, criteria, toSequenceNr), selfUntypedAdapted) + } + + private def returnRecoveryPermitOnlyOnFailure(cause: Throwable): Unit = { + log.debug("Returning recovery permit, on failure because: " + cause.getMessage) + // IMPORTANT to use selfUntyped, and not an adapter, since recovery permitter watches/unwatches those refs (and adapters are new refs) + extension.recoveryPermitter.tell(RecoveryPermitter.ReturnRecoveryPermit, selfUntyped) + } + + override def toString = s"EventsourcedRecoveringSnapshot($persistenceId)" + +} diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRequestingRecoveryPermit.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRequestingRecoveryPermit.scala new file mode 100644 index 0000000000..6b1d7517da --- /dev/null +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRequestingRecoveryPermit.scala @@ -0,0 +1,96 @@ +/** + * Copyright (C) 2016-2018 Lightbend Inc. + */ +package akka.persistence.typed.internal + +import akka.actor.typed.Behavior +import akka.actor.typed.scaladsl.Behaviors.MutableBehavior +import akka.actor.typed.scaladsl.{ ActorContext, Behaviors, StashBuffer, TimerScheduler } +import akka.annotation.InternalApi +import akka.event.Logging +import akka.persistence._ +import akka.persistence.typed.internal.EventsourcedBehavior.WriterIdentity + +/** + * INTERNAL API + * + * First (of four) behaviour of an PersistentBehaviour. + * + * Requests a permit to start recovering this actor; this is tone to avoid + * hammering the journal with too many concurrently recovering actors. + */ +@InternalApi +private[akka] final class EventsourcedRequestingRecoveryPermit[Command, Event, State]( + val persistenceId: String, + override val context: ActorContext[Any], + override val timers: TimerScheduler[Any], + + val recovery: Recovery, + + val callbacks: EventsourcedCallbacks[Command, Event, State], + val pluginIds: EventsourcedPluginIds +) extends MutableBehavior[Any] + with EventsourcedBehavior[Command, Event, State] + with EventsourcedStashManagement { + + import akka.actor.typed.scaladsl.adapter._ + + // has to be lazy, since we want to obtain the persistenceId + protected lazy val log = Logging(context.system.toUntyped, this) + + override protected val internalStash: StashBuffer[Any] = { + val stashSize = context.system.settings.config + .getInt("akka.persistence.typed.stash-buffer-size") + StashBuffer[Any](stashSize) + } + + // --- initialization --- + // only once we have a permit, we can become active: + requestRecoveryPermit() + + val writerIdentity: WriterIdentity = WriterIdentity.newIdentity() + + // --- end of initialization --- + + // ---------- + + def becomeRecovering(): Behavior[Any] = { + log.debug(s"Initializing snapshot recovery: {}", recovery) + + new EventsourcedRecoveringSnapshot( + persistenceId, + context, + timers, + internalStash, + + recovery, + writerIdentity, + + callbacks, + pluginIds + ) + } + + // ---------- + + override def onMessage(msg: Any): Behavior[Any] = { + msg match { + case RecoveryPermitter.RecoveryPermitGranted ⇒ + log.debug("Awaiting permit, received: RecoveryPermitGranted") + becomeRecovering() + + case other ⇒ + stash(other) + Behaviors.same + } + } + + // ---------- journal interactions --------- + + private def requestRecoveryPermit(): Unit = { + // IMPORTANT to use selfUntyped, and not an adapter, since recovery permitter watches/unwatches those refs (and adapters are new refs) + extension.recoveryPermitter.tell(RecoveryPermitter.RequestRecoveryPermit, selfUntyped) + } + + override def toString = s"EventsourcedRequestingRecoveryPermit($persistenceId)" +} diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRunning.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRunning.scala new file mode 100644 index 0000000000..f19a0ed886 --- /dev/null +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRunning.scala @@ -0,0 +1,365 @@ +/** + * Copyright (C) 2016-2018 Lightbend Inc. + */ +package akka.persistence.typed.internal + +import akka.actor.typed.Behavior +import akka.actor.typed.Behavior.StoppedBehavior +import akka.actor.typed.scaladsl.Behaviors.MutableBehavior +import akka.actor.typed.scaladsl.{ ActorContext, Behaviors, StashBuffer, TimerScheduler } +import akka.annotation.InternalApi +import akka.event.Logging +import akka.persistence.Eventsourced.{ PendingHandlerInvocation, StashingHandlerInvocation } +import akka.persistence.JournalProtocol._ +import akka.persistence._ +import akka.persistence.journal.Tagged +import akka.persistence.typed.internal.EventsourcedBehavior.WriterIdentity + +import scala.annotation.tailrec +import scala.collection.immutable + +/** + * INTERNAL API + * + * Fourth (of four) -- also known as 'final' or 'ultimate' -- form of PersistentBehavior. + * + * In this phase recovery has completed successfully and we continue handling incoming commands, + * as well as persisting new events as dictated by the user handlers. + * + * This behavior operates in two phases: + * - HandlingCommands - where the command handler is invoked for incoming commands + * - PersistingEvents - where incoming commands are stashed until persistence completes + * + * This is implemented as such to avoid creating many EventsourcedRunning instances, + * which perform the Persistence extension lookup on creation and similar things (config lookup) + * + */ +@InternalApi +class EventsourcedRunning[Command, Event, State]( + val persistenceId: String, + override val context: ActorContext[Any], + override val timers: TimerScheduler[Any], + override val internalStash: StashBuffer[Any], + + private var sequenceNr: Long, + val writerIdentity: WriterIdentity, + + private var state: State, + + val callbacks: EventsourcedCallbacks[Command, Event, State], + val pluginIds: EventsourcedPluginIds +) extends MutableBehavior[Any] + with EventsourcedBehavior[Command, Event, State] + with EventsourcedStashManagement { same ⇒ + + import EventsourcedBehavior._ + import akka.actor.typed.scaladsl.adapter._ + + protected val log = Logging(context.system.toUntyped, this) + + private def commandContext: ActorContext[Command] = context.asInstanceOf[ActorContext[Command]] + + // ---------- + + // Holds callbacks for persist calls (note that we do not implement persistAsync currently) + private def hasNoPendingInvocations: Boolean = pendingInvocations.isEmpty + private val pendingInvocations = new java.util.LinkedList[PendingHandlerInvocation]() // we only append / isEmpty / get(0) on it + + // ---------- + + private def snapshotSequenceNr: Long = sequenceNr + + private def updateLastSequenceNr(persistent: PersistentRepr): Unit = + if (persistent.sequenceNr > sequenceNr) sequenceNr = persistent.sequenceNr + private def nextSequenceNr(): Long = { + sequenceNr += 1L + sequenceNr + } + // ---------- + + private def onSnapshotterResponse(response: SnapshotProtocol.Response): Behavior[Any] = { + response match { + case SaveSnapshotSuccess(meta) ⇒ + log.debug("Save snapshot successful: " + meta) + same + case SaveSnapshotFailure(meta, ex) ⇒ + log.error(ex, "Save snapshot failed! " + meta) + same // FIXME https://github.com/akka/akka/issues/24637 should we provide callback for this? to allow Stop + } + } + + // ---------- + + trait EventsourcedRunningPhase { + def name: String + def onCommand(c: Command): Behavior[Any] + def onJournalResponse(response: JournalProtocol.Response): Behavior[Any] + } + + object HandlingCommands extends EventsourcedRunningPhase { + def name = "HandlingCommands" + + final override def onCommand(command: Command): Behavior[Any] = { + val effect = commandHandler(commandContext, state, command) + applyEffects(command, effect.asInstanceOf[EffectImpl[E, S]]) // TODO can we avoid the cast? + } + final override def onJournalResponse(response: Response): Behavior[Any] = { + // should not happen, what would it reply? + throw new RuntimeException("Received message which should not happen in Running state!") + } + } + + object PersistingEventsNoSideEffects extends PersistingEvents(Nil) + + sealed class PersistingEvents(sideEffects: immutable.Seq[ChainableEffect[_, S]]) extends EventsourcedRunningPhase { + def name = "PersistingEvents" + + final override def onCommand(c: Command): Behavior[Any] = { + stash(c) + same + } + + final override def onJournalResponse(response: Response): Behavior[Any] = { + log.debug("Received Journal response: {}", response) + response match { + case WriteMessageSuccess(p, id) ⇒ + // instanceId mismatch can happen for persistAsync and defer in case of actor restart + // while message is in flight, in that case we ignore the call to the handler + if (id == writerIdentity.instanceId) { + updateLastSequenceNr(p) + popApplyHandler(p.payload) + onWriteMessageComplete() + tryUnstash(context, applySideEffects(sideEffects)) + } else same + + case WriteMessageRejected(p, cause, id) ⇒ + // instanceId mismatch can happen for persistAsync and defer in case of actor restart + // while message is in flight, in that case the handler has already been discarded + if (id == writerIdentity.instanceId) { + updateLastSequenceNr(p) + onPersistRejected(cause, p.payload, p.sequenceNr) // does not stop + tryUnstash(context, applySideEffects(sideEffects)) + } else same + + case WriteMessageFailure(p, cause, id) ⇒ + // instanceId mismatch can happen for persistAsync and defer in case of actor restart + // while message is in flight, in that case the handler has already been discarded + if (id == writerIdentity.instanceId) { + onWriteMessageComplete() + onPersistFailureThenStop(cause, p.payload, p.sequenceNr) + } else same + + case WriteMessagesSuccessful ⇒ + // ignore + same + + case WriteMessagesFailed(_) ⇒ + // ignore + same // it will be stopped by the first WriteMessageFailure message; not applying side effects + + case _: LoopMessageSuccess ⇒ + // ignore, should never happen as there is no persistAsync in typed + same + } + } + + private def onWriteMessageComplete(): Unit = + tryBecomeHandlingCommands() + + private def onPersistRejected(cause: Throwable, event: Any, seqNr: Long): Unit = { + log.error( + cause, + "Rejected to persist event type [{}] with sequence number [{}] for persistenceId [{}] due to [{}].", + event.getClass.getName, seqNr, persistenceId, cause.getMessage) + } + + private def onPersistFailureThenStop(cause: Throwable, event: Any, seqNr: Long): Behavior[Any] = { + log.error(cause, "Failed to persist event type [{}] with sequence number [{}] for persistenceId [{}].", + event.getClass.getName, seqNr, persistenceId) + + // FIXME see #24479 for reconsidering the stopping behaviour + Behaviors.stopped + } + + } + + // the active phase switches between PersistingEvents and HandlingCommands; + // we do this via a var instead of behaviours to keep allocations down as this will be flip/flaping on every Persist effect + private[this] var phase: EventsourcedRunningPhase = HandlingCommands + + override def onMessage(msg: Any): Behavior[Any] = { + msg match { + // TODO explore crazy hashcode hack to make this match quicker...? + case SnapshotterResponse(r) ⇒ onSnapshotterResponse(r) + case JournalResponse(r) ⇒ phase.onJournalResponse(r) + case command: Command @unchecked ⇒ + // the above type-check does nothing, since Command is tun + // we cast explicitly to fail early in case of type mismatch + val c = command.asInstanceOf[Command] + phase.onCommand(c) + } + } + + // ---------- + + def applySideEffects(effects: immutable.Seq[ChainableEffect[_, S]]): Behavior[Any] = { + var res: Behavior[Any] = same + val it = effects.iterator + + // if at least one effect results in a `stop`, we need to stop + // manual loop implementation to avoid allocations and multiple scans + while (it.hasNext) { + val effect = it.next() + applySideEffect(effect) match { + case _: StoppedBehavior[_] ⇒ res = Behaviors.stopped + case _ ⇒ // nothing to do + } + } + + res + } + + def applySideEffect(effect: ChainableEffect[_, S]): Behavior[Any] = effect match { + case _: Stop.type @unchecked ⇒ + Behaviors.stopped + + case SideEffect(sideEffects) ⇒ + sideEffects(state) + same + + case _ ⇒ + throw new IllegalArgumentException(s"Not supported effect detected [${effect.getClass.getName}]!") + } + + def applyEvent(s: S, event: E): S = + eventHandler(s, event) + + @tailrec private def applyEffects(msg: Any, effect: EffectImpl[E, S], sideEffects: immutable.Seq[ChainableEffect[_, S]] = Nil): Behavior[Any] = { + if (log.isDebugEnabled) + log.debug(s"Handled command [{}], resulting effect: [{}], side effects: [{}]", msg.getClass.getName, effect, sideEffects.size) + + effect match { + case CompositeEffect(e, currentSideEffects) ⇒ + // unwrap and accumulate effects + applyEffects(msg, e, currentSideEffects ++ sideEffects) + + case Persist(event) ⇒ + // apply the event before persist so that validation exception is handled before persisting + // the invalid event, in case such validation is implemented in the event handler. + // also, ensure that there is an event handler for each single event + state = applyEvent(state, event) + val tags = tagger(event) + val eventToPersist = if (tags.isEmpty) event else Tagged(event, tags) + + internalPersist(eventToPersist, sideEffects) { _ ⇒ + if (snapshotWhen(state, event, sequenceNr)) + internalSaveSnapshot(state) + } + + case PersistAll(events) ⇒ + if (events.nonEmpty) { + // apply the event before persist so that validation exception is handled before persisting + // the invalid event, in case such validation is implemented in the event handler. + // also, ensure that there is an event handler for each single event + var count = events.size + var seqNr = sequenceNr + val (newState, shouldSnapshotAfterPersist) = events.foldLeft((state, false)) { + case ((currentState, snapshot), event) ⇒ + seqNr += 1 + val shouldSnapshot = snapshot || snapshotWhen(currentState, event, seqNr) + (applyEvent(currentState, event), shouldSnapshot) + } + state = newState + val eventsToPersist = events.map { event ⇒ + val tags = tagger(event) + if (tags.isEmpty) event else Tagged(event, tags) + } + + internalPersistAll(eventsToPersist, sideEffects) { _ ⇒ + count -= 1 + if (count == 0) { + sideEffects.foreach(applySideEffect) + if (shouldSnapshotAfterPersist) + internalSaveSnapshot(state) + } + } + } else { + // run side-effects even when no events are emitted + tryUnstash(context, applySideEffects(sideEffects)) + } + + case e: PersistNothing.type @unchecked ⇒ + tryUnstash(context, applySideEffects(sideEffects)) + + case _: Unhandled.type @unchecked ⇒ + applySideEffects(sideEffects) + Behavior.unhandled + + case c: ChainableEffect[_, S] ⇒ + applySideEffect(c) + } + } + + private def popApplyHandler(payload: Any): Unit = + pendingInvocations.pop().handler(payload) + + private def becomePersistingEvents(sideEffects: immutable.Seq[ChainableEffect[_, S]]): Behavior[Any] = { + if (phase.isInstanceOf[PersistingEvents]) throw new IllegalArgumentException( + "Attempted to become PersistingEvents while already in this phase! Logic error?") + + phase = + if (sideEffects.isEmpty) PersistingEventsNoSideEffects + else new PersistingEvents(sideEffects) + + same + } + + private def tryBecomeHandlingCommands(): Behavior[Any] = { + if (phase == HandlingCommands) throw new IllegalArgumentException( + "Attempted to become HandlingCommands while already in this phase! Logic error?") + + if (hasNoPendingInvocations) { // CAN THIS EVER NOT HAPPEN? + phase = HandlingCommands + } + + same + } + + // ---------- journal interactions --------- + + // Any since can be `E` or `Tagged` + private def internalPersist(event: Any, sideEffects: immutable.Seq[ChainableEffect[_, S]])(handler: Any ⇒ Unit): Behavior[Any] = { + pendingInvocations addLast StashingHandlerInvocation(event, handler.asInstanceOf[Any ⇒ Unit]) + + val senderNotKnownBecauseAkkaTyped = null + val repr = PersistentRepr(event, persistenceId = persistenceId, sequenceNr = nextSequenceNr(), writerUuid = writerIdentity.writerUuid, sender = senderNotKnownBecauseAkkaTyped) + + val eventBatch = AtomicWrite(repr) :: Nil // batching not used, since no persistAsync + journal.tell(JournalProtocol.WriteMessages(eventBatch, selfUntypedAdapted, writerIdentity.instanceId), selfUntypedAdapted) + + becomePersistingEvents(sideEffects) + } + + private def internalPersistAll(events: immutable.Seq[Any], sideEffects: immutable.Seq[ChainableEffect[_, S]])(handler: Any ⇒ Unit): Behavior[Any] = { + if (events.nonEmpty) { + val senderNotKnownBecauseAkkaTyped = null + + events.foreach { event ⇒ + pendingInvocations addLast StashingHandlerInvocation(event, handler.asInstanceOf[Any ⇒ Unit]) + } + + val write = AtomicWrite(events.map(PersistentRepr.apply(_, persistenceId = persistenceId, + sequenceNr = nextSequenceNr(), writerUuid = writerIdentity.writerUuid, sender = senderNotKnownBecauseAkkaTyped))) + + journal.tell(JournalProtocol.WriteMessages(write :: Nil, selfUntypedAdapted, writerIdentity.instanceId), selfUntypedAdapted) + + becomePersistingEvents(sideEffects) + } else same + } + + private def internalSaveSnapshot(snapshot: State): Unit = { + snapshotStore.tell(SnapshotProtocol.SaveSnapshot(SnapshotMetadata(persistenceId, snapshotSequenceNr), snapshot), selfUntypedAdapted) + } + + override def toString = s"EventsourcedRunning($persistenceId,${phase.name})" +} diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedStashManagement.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedStashManagement.scala new file mode 100644 index 0000000000..416ce45c74 --- /dev/null +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedStashManagement.scala @@ -0,0 +1,76 @@ +package akka.persistence.typed.internal + +import java.util.Locale + +import akka.actor.typed.{ ActorSystem, Behavior } +import akka.actor.{ DeadLetter, StashOverflowException } +import akka.actor.typed.scaladsl.{ ActorContext, StashBuffer } +import akka.annotation.InternalApi +import akka.event.Logging.LogLevel +import akka.event.{ Logging, LoggingAdapter } +import akka.persistence._ +import akka.util.ConstantFun +import akka.{ actor ⇒ a } + +/** INTERNAL API: Stash management for persistent behaviors */ +@InternalApi +private[akka] trait EventsourcedStashManagement { + import EventsourcedStashManagement._ + import akka.actor.typed.scaladsl.adapter._ + + protected def log: LoggingAdapter + + protected def extension: Persistence + protected def context: ActorContext[Any] + + protected val internalStash: StashBuffer[Any] + + private lazy val logLevel = { + val configuredLevel = extension.system.settings.config + .getString("akka.persistence.typed.log-stashing") + Logging.levelFor(configuredLevel).getOrElse(OffLevel) // this is OffLevel + } + + /** + * The returned [[StashOverflowStrategy]] object determines how to handle the message failed to stash + * when the internal Stash capacity exceeded. + */ + protected val internalStashOverflowStrategy: StashOverflowStrategy = + extension.defaultInternalStashOverflowStrategy match { + case ReplyToStrategy(_) ⇒ + throw new RuntimeException("ReplyToStrategy is not supported in Akka Typed, since there is no sender()!") + case other ⇒ + other // the other strategies are supported + } + + protected def stash(msg: Any): Unit = { + if (logLevel != OffLevel) log.log(logLevel, "Stashing message: {}", msg) + + try internalStash.stash(msg) catch { + case e: StashOverflowException ⇒ + internalStashOverflowStrategy match { + case DiscardToDeadLetterStrategy ⇒ + val snd: a.ActorRef = a.ActorRef.noSender // FIXME can we improve it somehow? + context.system.deadLetters.tell(DeadLetter(msg, snd, context.self.toUntyped)) + + case ReplyToStrategy(response) ⇒ + throw new RuntimeException("ReplyToStrategy does not make sense at all in Akka Typed, since there is no sender()!") + + case ThrowOverflowExceptionStrategy ⇒ + throw e + } + } + } + + protected def tryUnstash(ctx: ActorContext[Any], behavior: Behavior[Any]): Behavior[Any] = { + if (internalStash.nonEmpty) { + log.debug("Unstashing message: {}", internalStash.head.getClass) + internalStash.unstash(context, behavior, 1, ConstantFun.scalaIdentityFunction) + } else behavior + } + +} + +object EventsourcedStashManagement { + private val OffLevel = LogLevel(Int.MinValue) +} diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/PersistentActorImpl.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/PersistentActorImpl.scala deleted file mode 100644 index a29dcd2d70..0000000000 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/PersistentActorImpl.scala +++ /dev/null @@ -1,157 +0,0 @@ -/* - * Copyright (C) 2017-2018 Lightbend Inc. - */ -package akka.persistence.typed.internal - -import akka.actor.ActorLogging -import akka.actor.typed.internal.adapter.ActorContextAdapter -import akka.annotation.InternalApi -import akka.persistence.journal.Tagged -import akka.persistence.typed.scaladsl.{ PersistentBehavior, PersistentBehaviors } -import akka.persistence.{ RecoveryCompleted, SaveSnapshotFailure, SaveSnapshotSuccess, SnapshotOffer, PersistentActor ⇒ UntypedPersistentActor } -import akka.{ actor ⇒ a } - -/** - * INTERNAL API - */ -@InternalApi private[akka] object PersistentActorImpl { - - /** - * Stop the actor for passivation. `PoisonPill` does not work well - * with persistent actors. - */ - case object StopForPassivation - - def props[C, E, S]( - behaviorFactory: () ⇒ PersistentBehavior[C, E, S]): a.Props = - a.Props(new PersistentActorImpl(behaviorFactory())) - -} - -/** - * INTERNAL API - * The `PersistentActor` that runs a `PersistentBehavior`. - */ -@InternalApi private[akka] class PersistentActorImpl[C, E, S]( - behavior: PersistentBehavior[C, E, S]) extends UntypedPersistentActor with ActorLogging { - - import PersistentBehaviors._ - - override val persistenceId: String = behavior.persistenceIdFromActorName(self.path.name) - - private var state: S = behavior.initialState - - private val commandHandler: CommandHandler[C, E, S] = behavior.commandHandler - - private val eventHandler: (S, E) ⇒ S = behavior.eventHandler - - private val ctxAdapter = new ActorContextAdapter[C](context) - private val ctx = ctxAdapter.asScala - - override def receiveRecover: Receive = { - case SnapshotOffer(_, snapshot) ⇒ - state = snapshot.asInstanceOf[S] - - case RecoveryCompleted ⇒ - behavior.recoveryCompleted(ctx, state) - - case event: E @unchecked ⇒ - state = applyEvent(state, event) - } - - def applyEvent(s: S, event: E): S = - eventHandler.apply(s, event) - - override def receiveCommand: Receive = { - case PersistentActorImpl.StopForPassivation ⇒ - context.stop(self) - - case SaveSnapshotSuccess(meta) ⇒ - log.debug("Snapshot saved: {}", meta) - case SaveSnapshotFailure(meta, thr) ⇒ - log.error(thr, "Snapshot failed: {}", meta) - - case msg ⇒ - try { - val effects = msg match { - case a.ReceiveTimeout ⇒ - commandHandler(ctx, state, ctxAdapter.receiveTimeoutMsg) - // TODO note that PostStop, PreRestart and Terminated signals are not handled, we wouldn't be able to persist there - case cmd: C @unchecked ⇒ - // FIXME we could make it more safe by using ClassTag for C - commandHandler(ctx, state, cmd) - } - applyEffects(msg, effects) - } catch { - case _: MatchError ⇒ throw new IllegalStateException( - s"Undefined state [${state.getClass.getName}] or handler for [${msg.getClass.getName} " + - s"in [${behavior.getClass.getName}] with persistenceId [$persistenceId]") - } - } - - private def applyEffects(msg: Any, effect: Effect[E, S], sideEffects: Seq[ChainableEffect[_, S]] = Nil): Unit = effect match { - case CompositeEffect(persist, currentSideEffects) ⇒ - applyEffects(msg, persist, currentSideEffects ++ sideEffects) - case Persist(event) ⇒ - // apply the event before persist so that validation exception is handled before persisting - // the invalid event, in case such validation is implemented in the event handler. - // also, ensure that there is an event handler for each single event - state = applyEvent(state, event) - val tags = behavior.tagger(event) - val eventToPersist = if (tags.isEmpty) event else Tagged(event, tags) - persist(eventToPersist) { _ ⇒ - sideEffects.foreach(applySideEffect) - if (shouldSnapshot(state, event, lastSequenceNr)) - saveSnapshot(state) - } - case PersistAll(events) ⇒ - if (events.nonEmpty) { - // apply the event before persist so that validation exception is handled before persisting - // the invalid event, in case such validation is implemented in the event handler. - // also, ensure that there is an event handler for each single event - var count = events.size - var seqNr = lastSequenceNr - val (newState, shouldSnapshotAfterPersist) = events.foldLeft((state, false)) { - case ((currentState, snapshot), event) ⇒ - seqNr += 1 - (applyEvent(currentState, event), snapshot || shouldSnapshot(currentState, event, seqNr)) - } - state = newState - val eventsToPersist = events.map { event ⇒ - val tags = behavior.tagger(event) - if (tags.isEmpty) event else Tagged(event, tags) - } - persistAll(eventsToPersist) { _ ⇒ - count -= 1 - if (count == 0) { - sideEffects.foreach(applySideEffect) - if (shouldSnapshotAfterPersist) - saveSnapshot(state) - } - } - } else { - // run side-effects even when no events are emitted - sideEffects.foreach(applySideEffect) - } - case _: PersistNothing.type @unchecked ⇒ - // FIXME: Why don't we do the side effects here?? - sideEffects.foreach(applySideEffect) - case _: Unhandled.type @unchecked ⇒ - // FIXME: Why don't we do the side effects here?? We do allow users to add them - super.unhandled(msg) - case c: ChainableEffect[_, S] ⇒ - applySideEffect(c) - } - - def applySideEffect(effect: ChainableEffect[_, S]): Unit = effect match { - case _: Stop.type @unchecked ⇒ - context.stop(self) - case SideEffect(callbacks) ⇒ callbacks.apply(state) - } - - private def shouldSnapshot(state: S, event: E, sequenceNr: Long): Boolean = { - behavior.snapshotOn(state, event, sequenceNr) - } - -} - diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/Effect.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/Effect.scala index eb1a9b5c7e..6778c3582f 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/Effect.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/Effect.scala @@ -4,8 +4,7 @@ package akka.persistence.typed.javadsl import akka.annotation.DoNotInherit -import akka.persistence.typed.scaladsl.PersistentBehaviors._ -import akka.japi.{ function ⇒ japi } +import akka.japi.function import akka.persistence.typed.internal._ import scala.collection.JavaConverters._ @@ -53,10 +52,10 @@ object EffectFactory extends EffectFactories[Nothing, Nothing, Nothing] @DoNotInherit abstract class Effect[+Event, State] { self: EffectImpl[Event, State] ⇒ /** Convenience method to register a side effect with just a callback function */ - final def andThen(callback: japi.Procedure[State]): Effect[Event, State] = + final def andThen(callback: function.Procedure[State]): Effect[Event, State] = CompositeEffect(this, SideEffect[Event, State](s ⇒ callback.apply(s))) /** Convenience method to register a side effect that doesn't need access to state */ - final def andThen(callback: japi.Effect): Effect[Event, State] = + final def andThen(callback: function.Effect): Effect[Event, State] = CompositeEffect(this, SideEffect[Event, State]((_: State) ⇒ callback.apply())) } diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/PersistentBehavior.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/PersistentBehavior.scala new file mode 100644 index 0000000000..0c335d6623 --- /dev/null +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/PersistentBehavior.scala @@ -0,0 +1,105 @@ +/** + * Copyright (C) 2018 Lightbend Inc. + */ +package akka.persistence.typed.javadsl + +import java.util.Collections + +import akka.actor.typed.Behavior.UntypedPropsBehavior +import akka.actor.typed.internal.adapter.PropsAdapter +import akka.actor.typed.javadsl.ActorContext +import akka.annotation.{ ApiMayChange, InternalApi } +import akka.persistence.typed._ +import akka.persistence.typed.internal._ + +/** Java API */ +@ApiMayChange +abstract class PersistentBehavior[Command, Event, State >: Null](val persistenceId: String) extends UntypedPropsBehavior[Command] { + + /** + * Factory of effects. + * + * Return effects from your handlers in order to instruct persistence on how to act on the incoming message (i.e. persist events). + */ + protected final def Effect: EffectFactories[Command, Event, State] = EffectFactory.asInstanceOf[EffectFactories[Command, Event, State]] + + /** + * Implement by returning the initial state object. + * This object will be passed into this behaviors handlers, until a new state replaces it. + * + * Also known as "zero state" or "neutral state". + */ + protected def initialState: State + + /** + * Implement by handling incoming commands and return an `Effect()` to persist or signal other effects + * of the command handling such as stopping the behavior or others. + * + * This method is only invoked when the actor is running (i.e. not recovering). + * While the actor is persisting events, the incoming messages are stashed and only + * delivered to the handler once persisting them has completed. + */ + protected def commandHandler(): CommandHandler[Command, Event, State] + + /** + * Implement by applying the event to the current state in order to return a new state. + * + * This method invoked during recovery as well as running operation of this behavior, + * in order to keep updating the state state. + * + * For that reason it is strongly discouraged to perform side-effects in this handler; + * Side effects should be executed in `andThen` or `recoveryCompleted` blocks. + */ + protected def eventHandler(): EventHandler[Event, State] + + /** + * @return A new, mutable, by state command handler builder + */ + protected final def commandHandlerBuilder(): CommandHandlerBuilder[Command, Event, State] = + new CommandHandlerBuilder[Command, Event, State]() + + /** + * @return A new, mutable, by state command handler builder + */ + protected final def byStateCommandHandlerBuilder(): ByStateCommandHandlerBuilder[Command, Event, State] = + new ByStateCommandHandlerBuilder[Command, Event, State]() + + /** + * @return A new, mutable, event handler builder + */ + protected final def eventHandlerBuilder(): EventHandlerBuilder[Event, State] = + new EventHandlerBuilder[Event, State]() + + /** + * The `callback` function is called to notify the actor that the recovery process + * is finished. + */ + def onRecoveryCompleted(ctx: ActorContext[Command], state: State): Unit = {} + + /** + * Initiates a snapshot if the given function returns true. + * When persisting multiple events at once the snapshot is triggered after all the events have + * been persisted. + * + * `predicate` receives the State, Event and the sequenceNr used for the Event + */ + def shouldSnapshot(state: State, event: Event, sequenceNr: Long): Boolean = false + /** + * The `tagger` function should give event tags, which will be used in persistence query + */ + def tagsFor(event: Event): java.util.Set[String] = Collections.emptySet() + + /** INTERNAL API */ + @InternalApi private[akka] override def untypedProps(props: akka.actor.typed.Props): akka.actor.Props = { + val behaviorImpl = scaladsl.PersistentBehaviors.immutable[Command, Event, State]( + persistenceId, + initialState, + (c, state, cmd) ⇒ commandHandler()(c.asJava, state, cmd).asInstanceOf[EffectImpl[Event, State]], + eventHandler()(_, _) + ) + + PropsAdapter(() ⇒ behaviorImpl, props) + } + +} + diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/PersistentBehaviors.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/PersistentBehaviors.scala deleted file mode 100644 index 271c0addfa..0000000000 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/PersistentBehaviors.scala +++ /dev/null @@ -1,78 +0,0 @@ -/** - * Copyright (C) 2018 Lightbend Inc. - */ -package akka.persistence.typed.javadsl - -import java.util.Collections - -import akka.actor.typed.Behavior.UntypedBehavior -import akka.actor.typed.javadsl.ActorContext -import akka.annotation.ApiMayChange -import akka.persistence.typed._ -import akka.persistence.typed.internal._ - -import scala.collection.JavaConverters._ - -@ApiMayChange abstract class PersistentBehavior[Command, Event, State >: Null](val persistenceId: String) extends UntypedBehavior[Command] { - - def Effect: EffectFactories[Command, Event, State] = EffectFactory.asInstanceOf[EffectFactories[Command, Event, State]] - - val initialState: State - - def commandHandler(): CommandHandler[Command, Event, State] - - def eventHandler(): EventHandler[Event, State] - - /** - * @return A new, mutable, by state command handler builder - */ - protected final def commandHandlerBuilder(): CommandHandlerBuilder[Command, Event, State] = - new CommandHandlerBuilder[Command, Event, State]() - - /** - * @return A new, mutable, by state command handler builder - */ - protected final def byStateCommandHandlerBuilder(): ByStateCommandHandlerBuilder[Command, Event, State] = - new ByStateCommandHandlerBuilder[Command, Event, State]() - - /** - * @return A new, mutable, builder - */ - protected final def eventHandlerBuilder(): EventHandlerBuilder[Event, State] = - new EventHandlerBuilder[Event, State]() - - /** - * The `callback` function is called to notify the actor that the recovery process - * is finished. - */ - def onRecoveryCompleted(ctx: ActorContext[Command], state: State): Unit = {} - - /** - * Initiates a snapshot if the given function returns true. - * When persisting multiple events at once the snapshot is triggered after all the events have - * been persisted. - * - * `predicate` receives the State, Event and the sequenceNr used for the Event - */ - def shouldSnapshot(state: State, event: Event, sequenceNr: Long): Boolean = false - /** - * The `tagger` function should give event tags, which will be used in persistence query - */ - def tagsFor(event: Event): java.util.Set[String] = Collections.emptySet() - - /** - * INTERNAL API - */ - override private[akka] def untypedProps = { - new scaladsl.PersistentBehavior[Command, Event, State]( - _ ⇒ persistenceId, - initialState, - (ctx, s, e) ⇒ commandHandler.apply(ctx.asJava, s, e).asInstanceOf[EffectImpl[Event, State]], - (s: State, e: Event) ⇒ eventHandler().apply(s, e), - (ctx, s) ⇒ onRecoveryCompleted(ctx.asJava, s), - e ⇒ tagsFor(e).asScala.toSet, - shouldSnapshot - ).untypedProps - } -} - diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/Effect.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/Effect.scala new file mode 100644 index 0000000000..e2d31faa15 --- /dev/null +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/Effect.scala @@ -0,0 +1,78 @@ +/** + * Copyright (C) 2017-2018 Lightbend Inc. + */ +package akka.persistence.typed.scaladsl + +import akka.actor.typed.scaladsl.Behaviors +import akka.annotation.DoNotInherit +import akka.persistence.typed.internal._ + +import scala.collection.{ immutable ⇒ im } + +/** + * Factories for effects - how a persistent actor reacts on a command + */ +object Effect { + + // TODO docs + def persist[Event, State](event: Event): Effect[Event, State] = Persist(event) + + // TODO docs + def persist[Event, A <: Event, B <: Event, State](evt1: A, evt2: B, events: Event*): Effect[Event, State] = + persist(evt1 :: evt2 :: events.toList) + + // TODO docs + def persist[Event, State](eventOpt: Option[Event]): Effect[Event, State] = + eventOpt match { + case Some(evt) ⇒ persist[Event, State](evt) + case _ ⇒ none[Event, State] + } + + // TODO docs + def persist[Event, State](events: im.Seq[Event]): Effect[Event, State] = + PersistAll(events) + + // TODO docs + def persist[Event, State](events: im.Seq[Event], sideEffects: im.Seq[ChainableEffect[Event, State]]): Effect[Event, State] = + new CompositeEffect[Event, State](PersistAll[Event, State](events), sideEffects) + + /** + * Do not persist anything + */ + def none[Event, State]: Effect[Event, State] = PersistNothing.asInstanceOf[Effect[Event, State]] + + /** + * This command is not handled, but it is not an error that it isn't. + */ + def unhandled[Event, State]: Effect[Event, State] = Unhandled.asInstanceOf[Effect[Event, State]] + + /** + * Stop this persistent actor + */ + def stop[Event, State]: ChainableEffect[Event, State] = Stop.asInstanceOf[ChainableEffect[Event, State]] +} + +/** + * Instances are created through the factories in the [[Effect]] companion object. + * + * Not for user extension. + */ +@DoNotInherit +trait Effect[+Event, State] extends akka.persistence.typed.javadsl.Effect[Event, State] { self: EffectImpl[Event, State] ⇒ + /* All events that will be persisted in this effect */ + def events: im.Seq[Event] + + def sideEffects[E >: Event]: im.Seq[ChainableEffect[E, State]] + + /** Convenience method to register a side effect with just a callback function */ + final def andThen(callback: State ⇒ Unit): Effect[Event, State] = + CompositeEffect(this, SideEffect[Event, State](callback)) + + /** Convenience method to register a side effect with just a lazy expression */ + final def andThen(callback: ⇒ Unit): Effect[Event, State] = + CompositeEffect(this, SideEffect[Event, State]((_: State) ⇒ callback)) + + /** The side effect is to stop the actor */ + def andThenStop: Effect[Event, State] = + CompositeEffect(this, Effect.stop[Event, State]) +} diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/PersistentBehaviors.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/PersistentBehaviors.scala index 58e17f4eed..9fe6e2828d 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/PersistentBehaviors.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/PersistentBehaviors.scala @@ -3,102 +3,52 @@ */ package akka.persistence.typed.scaladsl -import akka.actor.typed.Behavior.UntypedBehavior -import akka.actor.typed.scaladsl.ActorContext +import akka.actor.typed.Behavior +import akka.actor.typed.Behavior.DeferredBehavior +import akka.actor.typed.internal.TimerSchedulerImpl +import akka.actor.typed.scaladsl.{ ActorContext, TimerScheduler } import akka.annotation.{ DoNotInherit, InternalApi } +import akka.persistence.{ Recovery, SnapshotSelectionCriteria } import akka.persistence.typed.internal._ +import akka.persistence.typed.scaladsl.PersistentBehaviors.CommandHandler +import akka.util.ConstantFun -import scala.collection.{ immutable ⇒ im } +import scala.language.implicitConversions object PersistentBehaviors { + // we use this type internally, however it's easier for users to understand the function, so we use it in external api + type CommandHandler[Command, Event, State] = (ActorContext[Command], State, Command) ⇒ Effect[Event, State] + /** * Create a `Behavior` for a persistent actor. */ def immutable[Command, Event, State]( persistenceId: String, initialState: State, - commandHandler: CommandHandler[Command, Event, State], - eventHandler: (State, Event) ⇒ State): PersistentBehavior[Command, Event, State] = { - // FIXME remove `persistenceIdFromActorName: String ⇒ String` from PersistentBehavior - new PersistentBehavior(_ ⇒ persistenceId, initialState, commandHandler, eventHandler, - recoveryCompleted = (_, _) ⇒ (), - tagger = _ ⇒ Set.empty, - snapshotOn = (_, _, _) ⇒ false) - } + commandHandler: (ActorContext[Command], State, Command) ⇒ Effect[Event, State], + eventHandler: (State, Event) ⇒ State): PersistentBehavior[Command, Event, State] = + new PersistentBehavior( + persistenceId = persistenceId, + initialState = initialState, + commandHandler = commandHandler, + eventHandler = eventHandler + ) /** - * Factories for effects - how a persistent actor reacts on a command - */ - object Effect { - - def persist[Event, State](event: Event): Effect[Event, State] = - Persist(event) - - def persist[Event, A <: Event, B <: Event, State](evt1: A, evt2: B, events: Event*): Effect[Event, State] = - persist(evt1 :: evt2 :: events.toList) - - def persist[Event, State](eventOpt: Option[Event]): Effect[Event, State] = - eventOpt match { - case Some(evt) ⇒ persist[Event, State](evt) - case _ ⇒ none[Event, State] - } - - def persist[Event, State](events: im.Seq[Event]): Effect[Event, State] = - PersistAll(events) - - def persist[Event, State](events: im.Seq[Event], sideEffects: im.Seq[ChainableEffect[Event, State]]): Effect[Event, State] = - new CompositeEffect[Event, State](PersistAll[Event, State](events), sideEffects) - - /** - * Do not persist anything - */ - def none[Event, State]: Effect[Event, State] = PersistNothing.asInstanceOf[Effect[Event, State]] - - /** - * This command is not handled, but it is not an error that it isn't. - */ - def unhandled[Event, State]: Effect[Event, State] = Unhandled.asInstanceOf[Effect[Event, State]] - - /** - * Stop this persistent actor - */ - def stop[Event, State]: ChainableEffect[Event, State] = Stop.asInstanceOf[ChainableEffect[Event, State]] - } - - /** - * Instances are created through the factories in the [[Effect]] companion object. + * Create a `Behavior` for a persistent actor in Cluster Sharding, when the persistenceId is not known + * until the actor is started and typically based on the entityId, which + * is the actor name. * - * Not for user extension. + * TODO This will not be needed when it can be wrapped in `Actor.deferred`. */ - @DoNotInherit - trait Effect[+Event, State] { - self: EffectImpl[Event, State] ⇒ - /* All events that will be persisted in this effect */ - def events: im.Seq[Event] - - def sideEffects[E >: Event]: im.Seq[ChainableEffect[E, State]] - - /** Convenience method to register a side effect with just a callback function */ - final def andThen(callback: State ⇒ Unit): Effect[Event, State] = - CompositeEffect(this, SideEffect[Event, State](callback)) - - /** Convenience method to register a side effect with just a lazy expression */ - final def andThen(callback: ⇒ Unit): Effect[Event, State] = - CompositeEffect(this, SideEffect[Event, State]((_: State) ⇒ callback)) - - /** The side effect is to stop the actor */ - def andThenStop: Effect[Event, State] = - CompositeEffect(this, Effect.stop[Event, State]) - } - - /** - * Not for user extension - */ - @DoNotInherit - abstract class ChainableEffect[Event, State] extends EffectImpl[Event, State] - - type CommandHandler[Command, Event, State] = (ActorContext[Command], State, Command) ⇒ Effect[Event, State] + @Deprecated // FIXME remove this + def persistentEntity[Command, Event, State]( + persistenceIdFromActorName: String ⇒ String, + initialState: State, + commandHandler: (ActorContext[Command], State, Command) ⇒ Effect[Event, State], + eventHandler: (State, Event) ⇒ State): PersistentBehavior[Command, Event, State] = + ??? /** * The `CommandHandler` defines how to act on commands. @@ -137,19 +87,62 @@ object PersistentBehaviors { } } -class PersistentBehavior[Command, Event, State]( - @InternalApi private[akka] val persistenceIdFromActorName: String ⇒ String, - val initialState: State, - val commandHandler: PersistentBehaviors.CommandHandler[Command, Event, State], - val eventHandler: (State, Event) ⇒ State, - val recoveryCompleted: (ActorContext[Command], State) ⇒ Unit, - val tagger: Event ⇒ Set[String], - val snapshotOn: (State, Event, Long) ⇒ Boolean) extends UntypedBehavior[Command] { +@DoNotInherit +class PersistentBehavior[Command, Event, State] private ( + val persistenceId: String, + val initialState: State, + val commandHandler: PersistentBehaviors.CommandHandler[Command, Event, State], + val eventHandler: (State, Event) ⇒ State, - import PersistentBehaviors._ + val recoveryCompleted: (ActorContext[Command], State) ⇒ Unit, + val tagger: Event ⇒ Set[String], + val journalPluginId: String, + val snapshotPluginId: String, + val snapshotWhen: (State, Event, Long) ⇒ Boolean, + val recovery: Recovery +) extends DeferredBehavior[Command](ctx ⇒ + TimerSchedulerImpl.wrapWithTimers[Command] { timers ⇒ + val callbacks = EventsourcedCallbacks[Command, Event, State]( + initialState, + commandHandler, + eventHandler, + snapshotWhen, + recoveryCompleted, + tagger + ) + val pluginIds = EventsourcedPluginIds( + journalPluginId, + snapshotPluginId + ) + new EventsourcedRequestingRecoveryPermit( + persistenceId, + ctx.asInstanceOf[ActorContext[Any]], // sorry + timers.asInstanceOf[TimerScheduler[Any]], // sorry + recovery, + callbacks, + pluginIds + ).narrow[Command] - /** INTERNAL API */ - @InternalApi private[akka] override def untypedProps: akka.actor.Props = PersistentActorImpl.props(() ⇒ this) + }(ctx)) { + + def this( + persistenceId: String, + initialState: State, + commandHandler: PersistentBehaviors.CommandHandler[Command, Event, State], + eventHandler: (State, Event) ⇒ State) { + this( + persistenceId, + initialState, + commandHandler, + eventHandler, + recoveryCompleted = ConstantFun.scalaAnyTwoToUnit, + tagger = (_: Event) ⇒ Set.empty[String], + journalPluginId = "" /* default plugin */ , + snapshotPluginId = "" /* default plugin */ , + snapshotWhen = ConstantFun.scalaAnyThreeToFalse, + recovery = Recovery() + ) + } /** * The `callback` function is called to notify the actor that the recovery process @@ -165,8 +158,8 @@ class PersistentBehavior[Command, Event, State]( * * `predicate` receives the State, Event and the sequenceNr used for the Event */ - def snapshotOn(predicate: (State, Event, Long) ⇒ Boolean): PersistentBehavior[Command, Event, State] = - copy(snapshotOn = predicate) + def snapshotWhen(predicate: (State, Event, Long) ⇒ Boolean): PersistentBehavior[Command, Event, State] = + copy(snapshotWhen = predicate) /** * Snapshot every N events @@ -175,7 +168,35 @@ class PersistentBehavior[Command, Event, State]( */ def snapshotEvery(numberOfEvents: Long): PersistentBehavior[Command, Event, State] = { require(numberOfEvents > 0, s"numberOfEvents should be positive: Was $numberOfEvents") - copy(snapshotOn = (_, _, seqNr) ⇒ seqNr % numberOfEvents == 0) + copy(snapshotWhen = (_, _, seqNr) ⇒ seqNr % numberOfEvents == 0) + } + + /** + * Change the journal plugin id that this actor should use. + */ + def withPersistencePluginId(id: String): PersistentBehavior[Command, Event, State] = { + require(id != null, "persistence plugin id must not be null; use empty string for 'default' journal") + copy(journalPluginId = id) + } + + /** + * Change the snapshot store plugin id that this actor should use. + */ + def withSnapshotPluginId(id: String): PersistentBehavior[Command, Event, State] = { + require(id != null, "snapshot plugin id must not be null; use empty string for 'default' snapshot store") + copy(snapshotPluginId = id) + } + + /** + * Changes the snapshot selection criteria used by this behavior. + * By default the most recent snapshot is used, and the remaining state updates are recovered by replaying events + * from the sequence number up until which the snapshot reached. + * + * You may configure the behavior to skip recovering snapshots completely, in which case the recovery will be + * performed by replaying all events -- which may take a long time. + */ + def withSnapshotSelectionCriteria(selection: SnapshotSelectionCriteria): PersistentBehavior[Command, Event, State] = { + copy(recovery = Recovery(selection)) } /** @@ -185,12 +206,26 @@ class PersistentBehavior[Command, Event, State]( copy(tagger = tagger) private def copy( - persistenceIdFromActorName: String ⇒ String = persistenceIdFromActorName, - initialState: State = initialState, - commandHandler: CommandHandler[Command, Event, State] = commandHandler, - eventHandler: (State, Event) ⇒ State = eventHandler, - recoveryCompleted: (ActorContext[Command], State) ⇒ Unit = recoveryCompleted, - tagger: Event ⇒ Set[String] = tagger, - snapshotOn: (State, Event, Long) ⇒ Boolean = snapshotOn): PersistentBehavior[Command, Event, State] = - new PersistentBehavior(persistenceIdFromActorName, initialState, commandHandler, eventHandler, recoveryCompleted, tagger, snapshotOn) + initialState: State = initialState, + commandHandler: CommandHandler[Command, Event, State] = commandHandler, + eventHandler: (State, Event) ⇒ State = eventHandler, + recoveryCompleted: (ActorContext[Command], State) ⇒ Unit = recoveryCompleted, + tagger: Event ⇒ Set[String] = tagger, + snapshotWhen: (State, Event, Long) ⇒ Boolean = snapshotWhen, + journalPluginId: String = journalPluginId, + snapshotPluginId: String = snapshotPluginId, + recovery: Recovery = recovery): PersistentBehavior[Command, Event, State] = + new PersistentBehavior[Command, Event, State]( + persistenceId = persistenceId, + initialState = initialState, + commandHandler = commandHandler, + eventHandler = eventHandler, + recoveryCompleted = recoveryCompleted, + tagger = tagger, + journalPluginId = journalPluginId, + snapshotPluginId = snapshotPluginId, + snapshotWhen = snapshotWhen, + recovery = recovery) + } + diff --git a/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorTest.java b/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorTest.java index 8d7ea07599..54db4b9a40 100644 --- a/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorTest.java +++ b/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorTest.java @@ -7,8 +7,8 @@ import akka.actor.typed.ActorRef; import akka.actor.typed.javadsl.Behaviors; import akka.japi.Pair; import akka.japi.function.Function3; -import akka.persistence.typed.scaladsl.PersistentActorSpec; -import akka.persistence.typed.scaladsl.PersistentActorSpec$; +import akka.persistence.typed.scaladsl.PersistentBehaviorSpec; +import akka.persistence.typed.scaladsl.PersistentBehaviorSpec$; import akka.testkit.AkkaJUnitActorSystemResource; import akka.testkit.typed.javadsl.TestKitJunitResource; import akka.testkit.typed.scaladsl.ActorTestKit; @@ -26,7 +26,7 @@ import static org.junit.Assert.assertEquals; public class PersistentActorTest { @ClassRule - public static final TestKitJunitResource testKit = new TestKitJunitResource(PersistentActorSpec$.MODULE$.config()); + public static final TestKitJunitResource testKit = new TestKitJunitResource(PersistentBehaviorSpec$.MODULE$.config()); static final Incremented timeoutEvent = new Incremented(100); static final State emptyState = new State(0, Collections.emptyList()); diff --git a/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/BasicPersistentBehaviorsTest.java b/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/BasicPersistentBehaviorsTest.java index b213035df9..214687193d 100644 --- a/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/BasicPersistentBehaviorsTest.java +++ b/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/BasicPersistentBehaviorsTest.java @@ -30,22 +30,12 @@ public class BasicPersistentBehaviorsTest { @Override public CommandHandler commandHandler() { - return new CommandHandler() { - @Override - public Effect apply(ActorContext ctx, State state, Command command) { - return Effect().none(); - } - }; + return (ctx, state, command) -> Effect().none(); } @Override public EventHandler eventHandler() { - return new EventHandler() { - @Override - public State apply(State state, Event event) { - return state; - } - }; + return (state, event) -> state; } //#recovery diff --git a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentActorCompileOnlyTest.scala b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentActorCompileOnlyTest.scala index 845f43b63c..f0c622d3a8 100644 --- a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentActorCompileOnlyTest.scala +++ b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentActorCompileOnlyTest.scala @@ -199,7 +199,7 @@ object PersistentActorCompileOnlyTest { eventHandler = (state, evt) ⇒ evt match { case TaskRegistered(task) ⇒ State(task :: state.tasksInFlight) case TaskRemoved(task) ⇒ State(state.tasksInFlight.filter(_ != task)) - }).snapshotOn { (state, e, seqNr) ⇒ state.tasksInFlight.isEmpty } + }).snapshotWhen { (state, e, seqNr) ⇒ state.tasksInFlight.isEmpty } } object SpawnChild { diff --git a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentActorSpec.scala b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentBehaviorSpec.scala similarity index 84% rename from akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentActorSpec.scala rename to akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentBehaviorSpec.scala index 746fd64858..e3758dfff7 100644 --- a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentActorSpec.scala +++ b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentBehaviorSpec.scala @@ -3,31 +3,31 @@ */ package akka.persistence.typed.scaladsl +import akka.actor.ActorSystemImpl import akka.actor.typed.scaladsl.Behaviors -import akka.actor.typed.{ ActorRef, ActorSystem, SupervisorStrategy, Terminated, TypedAkkaSpecWithShutdown } +import akka.actor.typed.{ ActorRef, ActorSystem, Behavior, SupervisorStrategy, Terminated, TypedAkkaSpecWithShutdown } import akka.persistence.snapshot.SnapshotStore import akka.persistence.typed.scaladsl.PersistentBehaviors._ import akka.persistence.{ SelectedSnapshot, SnapshotMetadata, SnapshotSelectionCriteria } import akka.testkit.typed.TestKitSettings import akka.testkit.typed.scaladsl._ -import com.typesafe.config.ConfigFactory +import com.typesafe.config.{ Config, ConfigFactory } import org.scalatest.concurrent.Eventually import scala.concurrent.Future import scala.concurrent.duration._ -object PersistentActorSpec { +object PersistentBehaviorSpec { class InMemorySnapshotStore extends SnapshotStore { + private var state = Map.empty[String, (Any, SnapshotMetadata)] def loadAsync(persistenceId: String, criteria: SnapshotSelectionCriteria): Future[Option[SelectedSnapshot]] = { - log.debug("loadAsync: {} {}", persistenceId, criteria) Future.successful(state.get(persistenceId).map(r ⇒ SelectedSnapshot(r._2, r._1))) } def saveAsync(metadata: SnapshotMetadata, snapshot: Any): Future[Unit] = { - log.debug("saveAsync: {} {}", metadata, snapshot) state += (metadata.persistenceId -> (snapshot, metadata)) Future.successful(()) } @@ -39,13 +39,18 @@ object PersistentActorSpec { val config = ConfigFactory.parseString( s""" akka.loglevel = INFO - akka.persistence.snapshot-store.inmem.class = "akka.persistence.typed.scaladsl.PersistentActorSpec$$InMemorySnapshotStore" + # akka.persistence.typed.log-stashing = INFO + + akka.persistence.snapshot-store.inmem.class = "akka.persistence.typed.scaladsl.PersistentBehaviorSpec$$InMemorySnapshotStore" akka.persistence.journal.plugin = "akka.persistence.journal.inmem" akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.inmem" + """) sealed trait Command final case object Increment extends Command + final case object IncrementThenLogThenStop extends Command + final case object IncrementTwiceThenLogThenStop extends Command final case class IncrementWithPersistAll(nr: Int) extends Command final case object IncrementLater extends Command final case object IncrementAfterReceiveTimeout extends Command @@ -86,11 +91,28 @@ object PersistentActorSpec { commandHandler = (ctx, state, cmd) ⇒ cmd match { case Increment ⇒ Effect.persist(Incremented(1)) + + case IncrementThenLogThenStop ⇒ + Effect.persist(Incremented(1)) + .andThen { + loggingActor ! firstLogging + } + .andThenStop + + case IncrementTwiceThenLogThenStop ⇒ + Effect.persist(Incremented(1), Incremented(2)) + .andThen { + loggingActor ! firstLogging + } + .andThenStop + case IncrementWithPersistAll(n) ⇒ Effect.persist((0 until n).map(_ ⇒ Incremented(1))) + case GetValue(replyTo) ⇒ replyTo ! state Effect.none + case IncrementLater ⇒ // purpose is to test signals val delay = ctx.spawnAnonymous(Behaviors.withTimers[Tick.type] { timers ⇒ @@ -101,14 +123,18 @@ object PersistentActorSpec { }) ctx.watchWith(delay, DelayFinished) Effect.none + case DelayFinished ⇒ Effect.persist(Incremented(10)) + case IncrementAfterReceiveTimeout ⇒ ctx.setReceiveTimeout(10.millis, Timeout) Effect.none + case Timeout ⇒ ctx.cancelReceiveTimeout() Effect.persist(Incremented(100)) + case IncrementTwiceAndThenLog ⇒ Effect .persist(Incremented(1), Incremented(1)) @@ -132,25 +158,29 @@ object PersistentActorSpec { .andThen { loggingActor ! firstLogging } + case LogThenStop ⇒ - Effect.none.andThen { - loggingActor ! firstLogging - }.andThenStop + Effect.none + .andThen { + loggingActor ! firstLogging + } + .andThenStop }, eventHandler = (state, evt) ⇒ evt match { case Incremented(delta) ⇒ - probe ! (state, evt) + probe ! ((state, evt)) State(state.value + delta, state.history :+ state.value) }) } } -class PersistentActorSpec extends ActorTestKit with Eventually with TypedAkkaSpecWithShutdown { +class PersistentBehaviorSpec extends ActorTestKit with TypedAkkaSpecWithShutdown with Eventually { + import PersistentBehaviorSpec._ - override def config = PersistentActorSpec.config + override def config: Config = PersistentBehaviorSpec.config - import PersistentActorSpec._ + implicit val testSettings = TestKitSettings(system) "A typed persistent actor" must { @@ -171,7 +201,7 @@ class PersistentActorSpec extends ActorTestKit with Eventually with TypedAkkaSpe c ! Increment c ! Increment c ! GetValue(probe.ref) - probe.expectMessage(State(3, Vector(0, 1, 2))) + probe.expectMessage(10.seconds, State(3, Vector(0, 1, 2))) val c2 = spawn(counter("c2")) c2 ! GetValue(probe.ref) @@ -224,6 +254,27 @@ class PersistentActorSpec extends ActorTestKit with Eventually with TypedAkkaSpe loggingProbe.expectMessage(secondLogging) } + "persist then stop" in { + val loggingProbe = TestProbe[String] + val c = spawn(counter("c5a", loggingProbe.ref)) + val watchProbe = watcher(c) + + c ! IncrementThenLogThenStop + loggingProbe.expectMessage(firstLogging) + watchProbe.expectMessage("Terminated") + } + + "persist(All) then stop" in { + val loggingProbe = TestProbe[String] + val c = spawn(counter("c5b", loggingProbe.ref)) + val watchProbe = watcher(c) + + c ! IncrementTwiceThenLogThenStop + loggingProbe.expectMessage(firstLogging) + watchProbe.expectMessage("Terminated") + + } + /** Proves that side-effects are called when emitting an empty list of events */ "chainable side effects without events" in { val loggingProbe = TestProbe[String] @@ -249,10 +300,6 @@ class PersistentActorSpec extends ActorTestKit with Eventually with TypedAkkaSpe } "work when wrapped in other behavior" in { - // FIXME This is a major problem with current implementation. Since the - // behavior is running as an untyped PersistentActor it's not possible to - // wrap it in Actor.setup or Actor.supervise - pending val probe = TestProbe[State] val behavior = Behaviors.supervise[Command](counter("c13")) .onFailure(SupervisorStrategy.restartWithBackoff(1.second, 10.seconds, 0.1)) @@ -262,7 +309,7 @@ class PersistentActorSpec extends ActorTestKit with Eventually with TypedAkkaSpe probe.expectMessage(State(1, Vector(0))) } - "stop after persisting" in { + "stop after logging (no persisting)" in { val loggingProbe = TestProbe[String] val c: ActorRef[Command] = spawn(counter("c8", loggingProbe.ref)) val watchProbe = watcher(c) @@ -272,7 +319,7 @@ class PersistentActorSpec extends ActorTestKit with Eventually with TypedAkkaSpe } "snapshot via predicate" in { - val alwaysSnapshot = counter("c9").snapshotOn { (_, _, _) ⇒ true } + val alwaysSnapshot = counter("c9").snapshotWhen { (_, _, _) ⇒ true } val c = spawn(alwaysSnapshot) val watchProbe = watcher(c) val replyProbe = TestProbe[State]() @@ -293,9 +340,7 @@ class PersistentActorSpec extends ActorTestKit with Eventually with TypedAkkaSpe } "check all events for snapshot in PersistAll" in { - val snapshotAtTwo = counter("c11").snapshotOn { (s, e, _) ⇒ - s.value == 2 - } + val snapshotAtTwo = counter("c11").snapshotWhen { (s, _, _) ⇒ s.value == 2 } val c: ActorRef[Command] = spawn(snapshotAtTwo) val watchProbe = watcher(c) val replyProbe = TestProbe[State]() @@ -325,7 +370,7 @@ class PersistentActorSpec extends ActorTestKit with Eventually with TypedAkkaSpe c ! LogThenStop watchProbe.expectMessage("Terminated") - // no shapshot should have happened + // no snapshot should have happened val probeC2 = TestProbe[(State, Event)]() val c2 = spawn(counterWithProbe("c10", probeC2.ref).snapshotEvery(2)) probeC2.expectMessage[(State, Event)]((State(0, Vector()), Incremented(1))) diff --git a/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/InDepthPersistentBehaviorSpec.scala b/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/InDepthPersistentBehaviorSpec.scala index 604980a04f..b157af27e6 100644 --- a/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/InDepthPersistentBehaviorSpec.scala +++ b/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/InDepthPersistentBehaviorSpec.scala @@ -6,7 +6,8 @@ package docs.akka.persistence.typed import akka.Done import akka.actor.typed.{ ActorRef, Behavior } import akka.persistence.typed.scaladsl.PersistentBehaviors -import akka.persistence.typed.scaladsl.PersistentBehaviors.{ CommandHandler, Effect } +import akka.persistence.typed.scaladsl.PersistentBehaviors.CommandHandler +import akka.persistence.typed.scaladsl.Effect object InDepthPersistentBehaviorSpec { diff --git a/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala b/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala index 53c3446072..a3abe74318 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala @@ -17,24 +17,24 @@ import scala.collection.immutable import scala.concurrent.duration.FiniteDuration import scala.util.control.NonFatal -/** - * INTERNAL API - */ +/** INTERNAL API */ +@InternalApi private[persistence] object Eventsourced { - // ok to wrap around (2*Int.MaxValue restarts will not happen within a journal roundtrip) + // ok to wrap around (2*Int.MaxValue restarts will not happen within a journal round-trip) private val instanceIdCounter = new AtomicInteger(1) - private sealed trait PendingHandlerInvocation { + /** INTERNAL API */ + private[akka] sealed trait PendingHandlerInvocation { def evt: Any def handler: Any ⇒ Unit } - /** forces actor to stash incoming commands until all these invocations are handled */ - private final case class StashingHandlerInvocation(evt: Any, handler: Any ⇒ Unit) extends PendingHandlerInvocation - /** does not force the actor to stash commands; Originates from either `persistAsync` or `defer` calls */ - private final case class AsyncHandlerInvocation(evt: Any, handler: Any ⇒ Unit) extends PendingHandlerInvocation + /** INTERNAL API: forces actor to stash incoming commands until all these invocations are handled */ + private[akka] final case class StashingHandlerInvocation(evt: Any, handler: Any ⇒ Unit) extends PendingHandlerInvocation + /** INTERNAL API: does not force the actor to stash commands; Originates from either `persistAsync` or `defer` calls */ + private[akka] final case class AsyncHandlerInvocation(evt: Any, handler: Any ⇒ Unit) extends PendingHandlerInvocation - /** message used to detect that recovery timed out */ - private final case class RecoveryTick(snapshot: Boolean) + /** INTERNAL API: message used to detect that recovery timed out */ + private[akka] final case class RecoveryTick(snapshot: Boolean) } /** diff --git a/akka-persistence/src/main/scala/akka/persistence/RecoveryPermitter.scala b/akka-persistence/src/main/scala/akka/persistence/RecoveryPermitter.scala index e49f90a723..fb3aa9869d 100644 --- a/akka-persistence/src/main/scala/akka/persistence/RecoveryPermitter.scala +++ b/akka-persistence/src/main/scala/akka/persistence/RecoveryPermitter.scala @@ -18,9 +18,11 @@ import akka.actor.Terminated def props(maxPermits: Int): Props = Props(new RecoveryPermitter(maxPermits)) - case object RequestRecoveryPermit - case object RecoveryPermitGranted - case object ReturnRecoveryPermit + sealed trait Protocol + sealed trait Reply extends Protocol + case object RequestRecoveryPermit extends Protocol + case object RecoveryPermitGranted extends Reply + case object ReturnRecoveryPermit extends Protocol } @@ -49,18 +51,18 @@ import akka.actor.Terminated } case ReturnRecoveryPermit ⇒ - returnRecoveryPermit(sender()) + onReturnRecoveryPermit(sender()) case Terminated(ref) ⇒ // pre-mature termination should be rare if (!pending.remove(ref)) - returnRecoveryPermit(ref) + onReturnRecoveryPermit(ref) } - private def returnRecoveryPermit(ref: ActorRef): Unit = { + private def onReturnRecoveryPermit(ref: ActorRef): Unit = { usedPermits -= 1 context.unwatch(ref) - if (usedPermits < 0) throw new IllegalStateException("permits must not be negative") + if (usedPermits < 0) throw new IllegalStateException(s"permits must not be negative (returned by: ${ref})") if (!pending.isEmpty) { val ref = pending.poll() recoveryPermitGranted(ref) diff --git a/akka-persistence/src/main/scala/akka/persistence/Snapshotter.scala b/akka-persistence/src/main/scala/akka/persistence/Snapshotter.scala index f8e899205b..3edd80b2c8 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Snapshotter.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Snapshotter.scala @@ -29,7 +29,7 @@ trait Snapshotter extends Actor { * Instructs the snapshot store to load the specified snapshot and send it via an [[SnapshotOffer]] * to the running [[PersistentActor]]. */ - def loadSnapshot(persistenceId: String, criteria: SnapshotSelectionCriteria, toSequenceNr: Long) = + def loadSnapshot(persistenceId: String, criteria: SnapshotSelectionCriteria, toSequenceNr: Long): Unit = snapshotStore ! LoadSnapshot(persistenceId, criteria, toSequenceNr) /** diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/InputStreamSinkStage.scala b/akka-stream/src/main/scala/akka/stream/impl/io/InputStreamSinkStage.scala index c4c637e42c..c89997ac8c 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/InputStreamSinkStage.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/InputStreamSinkStage.scala @@ -3,7 +3,7 @@ */ package akka.stream.impl.io -import java.io.{ IOException, InputStream } +import java.io.{ BufferedOutputStream, ByteArrayOutputStream, IOException, InputStream } import java.util.concurrent.{ BlockingQueue, LinkedBlockingDeque, TimeUnit } import akka.annotation.InternalApi