Add interceptors for interpretUnstashedMessage and onReplayingSnapshot.

This commit is contained in:
Yury Gribkov 2019-11-18 18:53:42 -05:00
parent 548b3589f2
commit 9fabd36fb5
3 changed files with 35 additions and 14 deletions

View file

@ -91,6 +91,15 @@ import akka.util.{ unused, ConstantFun }
message
}
@InternalStableApi
private def interpretUnstashedMessage(
behavior: Behavior[T],
ctx: TypedActorContext[T],
wrappedMessage: T,
@unused node: Node[T]): Behavior[T] = {
Behavior.interpretMessage(behavior, ctx, wrappedMessage)
}
private def rawHead: Node[T] =
if (nonEmpty) _first
else throw new NoSuchElementException("head of empty buffer")
@ -119,31 +128,33 @@ import akka.util.{ unused, ConstantFun }
if (isEmpty)
behavior // optimization
else {
val iter = new Iterator[T] {
val iter = new Iterator[Node[T]] {
override def hasNext: Boolean = StashBufferImpl.this.nonEmpty
override def next(): T = {
override def next(): Node[T] = {
val next = StashBufferImpl.this.dropHeadForUnstash()
unstashed(ctx, next)
wrap(next.message)
next
}
}.take(math.min(numberOfMessages, size))
interpretUnstashedMessages(behavior, ctx, iter)
interpretUnstashedMessages(behavior, ctx, iter, wrap)
}
}
private def interpretUnstashedMessages(
behavior: Behavior[T],
ctx: TypedActorContext[T],
messages: Iterator[T]): Behavior[T] = {
messages: Iterator[Node[T]],
wrap: T => T): Behavior[T] = {
@tailrec def interpretOne(b: Behavior[T]): Behavior[T] = {
val b2 = Behavior.start(b, ctx)
if (!Behavior.isAlive(b2) || !messages.hasNext) b2
else {
val message = messages.next()
val node = messages.next()
val message = wrap(node.message)
val interpretResult = try {
message match {
case sig: Signal => Behavior.interpretSignal(b2, ctx, sig)
case msg => Behavior.interpretMessage(b2, ctx, msg)
case msg => interpretUnstashedMessage(b2, ctx, msg, node)
}
} catch {
case NonFatal(e) => throw UnstashException(e, b2)
@ -161,7 +172,7 @@ import akka.util.{ unused, ConstantFun }
if (Behavior.isAlive(actualNext))
interpretOne(Behavior.canonicalize(actualNext, b2, ctx)) // recursive
else {
unstashRestToDeadLetters(ctx, messages)
unstashRestToDeadLetters(ctx, messages, wrap)
actualNext
}
}
@ -178,17 +189,17 @@ import akka.util.{ unused, ConstantFun }
if (Behavior.isAlive(actualInitialBehavior)) {
interpretOne(actualInitialBehavior)
} else {
unstashRestToDeadLetters(ctx, messages)
unstashRestToDeadLetters(ctx, messages, wrap)
started
}
}
private def unstashRestToDeadLetters(ctx: TypedActorContext[T], messages: Iterator[T]): Unit = {
private def unstashRestToDeadLetters(ctx: TypedActorContext[T], messages: Iterator[Node[T]], wrap: T => T): Unit = {
val scalaCtx = ctx.asScala
import akka.actor.typed.scaladsl.adapter._
val classicDeadLetters = scalaCtx.system.deadLetters.toClassic
messages.foreach(msg =>
scalaCtx.system.deadLetters ! DeadLetter(msg, classicDeadLetters, ctx.asScala.self.toClassic))
messages.foreach(node =>
scalaCtx.system.deadLetters ! DeadLetter(wrap(node.message), classicDeadLetters, ctx.asScala.self.toClassic))
}
override def unstash(behavior: Behavior[T], numberOfMessages: Int, wrap: JFunction[T, T]): Behavior[T] =

View file

@ -45,6 +45,8 @@ private[akka] class ReplayingSnapshot[C, E, S](override val setup: BehaviorSetup
import InternalProtocol._
onRecoveryStart(setup.context)
def createBehavior(receivedPoisonPillInPreviousPhase: Boolean): Behavior[InternalProtocol] = {
// protect against snapshot stalling forever because of journal overloaded and such
setup.startRecoveryTimer(snapshot = true)
@ -100,6 +102,8 @@ private[akka] class ReplayingSnapshot[C, E, S](override val setup: BehaviorSetup
throw new JournalFailureException(msg, cause)
}
@InternalStableApi
def onRecoveryStart(@unused context: ActorContext[_]): Unit = ()
@InternalStableApi
def onRecoveryFailed(@unused context: ActorContext[_], @unused reason: Throwable): Unit = ()

View file

@ -6,8 +6,9 @@ package akka.persistence.typed.internal
import akka.actor.typed.Behavior
import akka.actor.typed.internal.PoisonPill
import akka.actor.typed.scaladsl.Behaviors
import akka.annotation.InternalApi
import akka.actor.typed.scaladsl.{ ActorContext, Behaviors }
import akka.annotation.{ InternalApi, InternalStableApi }
import akka.util.unused
/**
* INTERNAL API
@ -33,6 +34,8 @@ private[akka] class RequestingRecoveryPermit[C, E, S](override val setup: Behavi
with JournalInteractions[C, E, S]
with SnapshotInteractions[C, E, S] {
onRequestingRecoveryPermit(setup.context)
def createBehavior(): Behavior[InternalProtocol] = {
// request a permit, as only once we obtain one we can start replaying
requestRecoveryPermit()
@ -65,6 +68,9 @@ private[akka] class RequestingRecoveryPermit[C, E, S](override val setup: Behavi
stay(receivedPoisonPill = false)
}
@InternalStableApi
def onRequestingRecoveryPermit(@unused context: ActorContext[_]): Unit = ()
private def becomeReplaying(receivedPoisonPill: Boolean): Behavior[InternalProtocol] = {
setup.log.debug(s"Initializing snapshot recovery: {}", setup.recovery)