From 9fabd36fb544e5df3bece8f16aabeae68bf1feb0 Mon Sep 17 00:00:00 2001 From: Yury Gribkov Date: Mon, 18 Nov 2019 18:53:42 -0500 Subject: [PATCH] Add interceptors for interpretUnstashedMessage and onReplayingSnapshot. --- .../typed/internal/StashBufferImpl.scala | 35 ++++++++++++------- .../typed/internal/ReplayingSnapshot.scala | 4 +++ .../internal/RequestingRecoveryPermit.scala | 10 ++++-- 3 files changed, 35 insertions(+), 14 deletions(-) diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/StashBufferImpl.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/StashBufferImpl.scala index d70e8c5f6b..f919bde9ca 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/StashBufferImpl.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/StashBufferImpl.scala @@ -91,6 +91,15 @@ import akka.util.{ unused, ConstantFun } message } + @InternalStableApi + private def interpretUnstashedMessage( + behavior: Behavior[T], + ctx: TypedActorContext[T], + wrappedMessage: T, + @unused node: Node[T]): Behavior[T] = { + Behavior.interpretMessage(behavior, ctx, wrappedMessage) + } + private def rawHead: Node[T] = if (nonEmpty) _first else throw new NoSuchElementException("head of empty buffer") @@ -119,31 +128,33 @@ import akka.util.{ unused, ConstantFun } if (isEmpty) behavior // optimization else { - val iter = new Iterator[T] { + val iter = new Iterator[Node[T]] { override def hasNext: Boolean = StashBufferImpl.this.nonEmpty - override def next(): T = { + override def next(): Node[T] = { val next = StashBufferImpl.this.dropHeadForUnstash() unstashed(ctx, next) - wrap(next.message) + next } }.take(math.min(numberOfMessages, size)) - interpretUnstashedMessages(behavior, ctx, iter) + interpretUnstashedMessages(behavior, ctx, iter, wrap) } } private def interpretUnstashedMessages( behavior: Behavior[T], ctx: TypedActorContext[T], - messages: Iterator[T]): Behavior[T] = { + messages: Iterator[Node[T]], + wrap: T => T): Behavior[T] = { @tailrec def interpretOne(b: Behavior[T]): Behavior[T] = { val b2 = Behavior.start(b, ctx) if (!Behavior.isAlive(b2) || !messages.hasNext) b2 else { - val message = messages.next() + val node = messages.next() + val message = wrap(node.message) val interpretResult = try { message match { case sig: Signal => Behavior.interpretSignal(b2, ctx, sig) - case msg => Behavior.interpretMessage(b2, ctx, msg) + case msg => interpretUnstashedMessage(b2, ctx, msg, node) } } catch { case NonFatal(e) => throw UnstashException(e, b2) @@ -161,7 +172,7 @@ import akka.util.{ unused, ConstantFun } if (Behavior.isAlive(actualNext)) interpretOne(Behavior.canonicalize(actualNext, b2, ctx)) // recursive else { - unstashRestToDeadLetters(ctx, messages) + unstashRestToDeadLetters(ctx, messages, wrap) actualNext } } @@ -178,17 +189,17 @@ import akka.util.{ unused, ConstantFun } if (Behavior.isAlive(actualInitialBehavior)) { interpretOne(actualInitialBehavior) } else { - unstashRestToDeadLetters(ctx, messages) + unstashRestToDeadLetters(ctx, messages, wrap) started } } - private def unstashRestToDeadLetters(ctx: TypedActorContext[T], messages: Iterator[T]): Unit = { + private def unstashRestToDeadLetters(ctx: TypedActorContext[T], messages: Iterator[Node[T]], wrap: T => T): Unit = { val scalaCtx = ctx.asScala import akka.actor.typed.scaladsl.adapter._ val classicDeadLetters = scalaCtx.system.deadLetters.toClassic - messages.foreach(msg => - scalaCtx.system.deadLetters ! DeadLetter(msg, classicDeadLetters, ctx.asScala.self.toClassic)) + messages.foreach(node => + scalaCtx.system.deadLetters ! DeadLetter(wrap(node.message), classicDeadLetters, ctx.asScala.self.toClassic)) } override def unstash(behavior: Behavior[T], numberOfMessages: Int, wrap: JFunction[T, T]): Behavior[T] = diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingSnapshot.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingSnapshot.scala index 0e27576356..e5a6112887 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingSnapshot.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingSnapshot.scala @@ -45,6 +45,8 @@ private[akka] class ReplayingSnapshot[C, E, S](override val setup: BehaviorSetup import InternalProtocol._ + onRecoveryStart(setup.context) + def createBehavior(receivedPoisonPillInPreviousPhase: Boolean): Behavior[InternalProtocol] = { // protect against snapshot stalling forever because of journal overloaded and such setup.startRecoveryTimer(snapshot = true) @@ -100,6 +102,8 @@ private[akka] class ReplayingSnapshot[C, E, S](override val setup: BehaviorSetup throw new JournalFailureException(msg, cause) } + @InternalStableApi + def onRecoveryStart(@unused context: ActorContext[_]): Unit = () @InternalStableApi def onRecoveryFailed(@unused context: ActorContext[_], @unused reason: Throwable): Unit = () diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/RequestingRecoveryPermit.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/RequestingRecoveryPermit.scala index 7de1ff24ce..9dc28bb6cd 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/RequestingRecoveryPermit.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/RequestingRecoveryPermit.scala @@ -6,8 +6,9 @@ package akka.persistence.typed.internal import akka.actor.typed.Behavior import akka.actor.typed.internal.PoisonPill -import akka.actor.typed.scaladsl.Behaviors -import akka.annotation.InternalApi +import akka.actor.typed.scaladsl.{ ActorContext, Behaviors } +import akka.annotation.{ InternalApi, InternalStableApi } +import akka.util.unused /** * INTERNAL API @@ -33,6 +34,8 @@ private[akka] class RequestingRecoveryPermit[C, E, S](override val setup: Behavi with JournalInteractions[C, E, S] with SnapshotInteractions[C, E, S] { + onRequestingRecoveryPermit(setup.context) + def createBehavior(): Behavior[InternalProtocol] = { // request a permit, as only once we obtain one we can start replaying requestRecoveryPermit() @@ -65,6 +68,9 @@ private[akka] class RequestingRecoveryPermit[C, E, S](override val setup: Behavi stay(receivedPoisonPill = false) } + @InternalStableApi + def onRequestingRecoveryPermit(@unused context: ActorContext[_]): Unit = () + private def becomeReplaying(receivedPoisonPill: Boolean): Behavior[InternalProtocol] = { setup.log.debug(s"Initializing snapshot recovery: {}", setup.recovery)