Typed unstash improvements (#26599)

* Handle unhandled while unstashing #26362

* Handle stop while unstashing

* unstashing with initial Behaviors.same to refer to current actor behavior

* Unstash into deadletter when stopping during unstash

* More unhandled test coverage

* Avoid expecting ActorContextAdapter for internals

currentBehavior and onUnhandled moved to scaladsl.ActorContext as internal methods
allowing support for touching them also in the behavior testkit

* Scaladoc fixes

* Some more periods
This commit is contained in:
Johan Andrén 2019-03-26 15:06:02 +01:00 committed by Patrik Nordwall
parent 9ef11c9bfc
commit bc4523941c
11 changed files with 212 additions and 37 deletions

View file

@ -28,7 +28,7 @@ private[akka] final class BehaviorTestKitImpl[T](_path: ActorPath, _initialBehav
with akka.actor.testkit.typed.scaladsl.BehaviorTestKit[T] {
// really this should be private, make so when we port out tests that need it
private[akka] val context = new EffectfulActorContext[T](_path)
private[akka] val context = new EffectfulActorContext[T](_path, () => currentBehavior)
private[akka] def as[U]: BehaviorTestKitImpl[U] = this.asInstanceOf[BehaviorTestKitImpl[U]]

View file

@ -20,7 +20,10 @@ import scala.compat.java8.FunctionConverters._
/**
* INTERNAL API
*/
@InternalApi private[akka] final class EffectfulActorContext[T](path: ActorPath) extends StubbedActorContext[T](path) {
@InternalApi private[akka] final class EffectfulActorContext[T](
path: ActorPath,
currentBehaviorProvider: () => Behavior[T])
extends StubbedActorContext[T](path, currentBehaviorProvider) {
private[akka] val effectQueue = new ConcurrentLinkedQueue[Effect]

View file

@ -159,10 +159,11 @@ private[akka] final class FunctionRef[-T](override val path: ActorPath, send: (T
* provides only stubs for the effects an Actor can perform and replaces
* created child Actors by a synchronous Inbox (see `Inbox.sync`).
*/
@InternalApi private[akka] class StubbedActorContext[T](val path: ActorPath) extends ActorContextImpl[T] {
@InternalApi private[akka] class StubbedActorContext[T](val path: ActorPath, currentBehaviorProvider: () => Behavior[T])
extends ActorContextImpl[T] {
def this(name: String) = {
this((TestInbox.address / name).withUid(rnd().nextInt()))
def this(name: String, currentBehaviorProvider: () => Behavior[T]) = {
this((TestInbox.address / name).withUid(rnd().nextInt()), currentBehaviorProvider)
}
/**
@ -175,6 +176,7 @@ private[akka] final class FunctionRef[-T](override val path: ActorPath, send: (T
private var _children = TreeMap.empty[String, BehaviorTestKitImpl[_]]
private val childName = Iterator.from(0).map(Helpers.base64(_))
private val loggingAdapter = new StubbedLogger
private var unhandled: List[T] = Nil
override def children: Iterable[ActorRef[Nothing]] = _children.values.map(_.context.self)
def childrenNames: Iterable[String] = _children.keys
@ -286,7 +288,22 @@ private[akka] final class FunctionRef[-T](override val path: ActorPath, send: (T
def logEntries: List[CapturedLogEvent] = loggingAdapter.logEntries
/**
* Clear the log entries
* Clear the log entries.
*/
def clearLog(): Unit = loggingAdapter.clearLog()
override private[akka] def onUnhandled(msg: T): Unit =
unhandled = msg :: unhandled
/**
* Messages that are marked as unhandled.
*/
def unhandledMessages: List[T] = unhandled.reverse
/**
* Clear the list of captured unhandled messages.
*/
def clearUnhandled(): Unit = unhandled = Nil
override private[akka] def currentBehavior: Behavior[T] = currentBehaviorProvider()
}

View file

@ -11,7 +11,9 @@ import org.scalatest.{ Matchers, WordSpec }
class StashBufferSpec extends WordSpec with Matchers {
val context = new StubbedActorContext[String]("StashBufferSpec")
val context = new StubbedActorContext[String](
"StashBufferSpec",
() => throw new UnsupportedOperationException("Will never be invoked in this test"))
"A StashBuffer" must {

View file

@ -8,8 +8,8 @@ package scaladsl
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
import akka.actor.DeadLetter
import scala.concurrent.duration._
import akka.actor.testkit.typed.TestException
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import akka.actor.testkit.typed.scaladsl.TestProbe
@ -562,26 +562,123 @@ class UnstashingSpec extends ScalaTestWithActorTestKit("""
val stash = StashBuffer[String](10)
stash.stash("one")
// FIXME #26148 using AbstractBehavior because unstashAll doesn't support Behavior.same
// unstashing is inside setup
new AbstractBehavior[String] {
override def onMessage(msg: String): Behavior[String] = msg match {
Behaviors.receiveMessage {
case "unstash" =>
Behaviors.setup[String] { ctx =>
stash.unstashAll(ctx, this)
stash.unstashAll(ctx, Behaviors.same)
}
case _ =>
case msg =>
probe.ref ! msg
Behavior.same
}
}
})
ref ! "unstash"
probe.expectMessage("one")
}
"deal with unhandled the same way as normal unhandled" in {
val probe = TestProbe[String]()
val ref = spawn(Behaviors.setup[String] { ctx =>
val stash = StashBuffer[String](10)
stash.stash("unhandled")
stash.stash("handled")
stash.stash("handled")
stash.stash("unhandled")
stash.stash("handled")
def unstashing(n: Int): Behavior[String] =
Behaviors.receiveMessage {
case "unhandled" => Behavior.unhandled
case "handled" =>
probe.ref ! s"handled $n"
unstashing(n + 1)
}
Behaviors.receiveMessage {
case "unstash" =>
stash.unstashAll(ctx, unstashing(1))
}
})
EventFilter.warning(start = "unhandled message from", occurrences = 2).intercept {
ref ! "unstash"
}
probe.expectMessage("handled 1")
probe.expectMessage("handled 2")
probe.expectMessage("handled 3")
ref ! "handled"
probe.expectMessage("handled 4")
}
"fail quick on invalid start behavior" in {
val stash = StashBuffer[String](10)
stash.stash("one")
intercept[IllegalArgumentException](stash.unstashAll(null, Behavior.unhandled))
}
"deal with initial stop" in {
val probe = TestProbe[Any]
val ref = spawn(Behaviors.setup[String] { ctx =>
val stash = StashBuffer[String](10)
stash.stash("one")
Behaviors.receiveMessage {
case "unstash" =>
stash.unstashAll(ctx, Behaviors.stopped)
}
})
ref ! "unstash"
probe.expectTerminated(ref)
}
"deal with stop" in {
val probe = TestProbe[Any]
import akka.actor.typed.scaladsl.adapter._
untypedSys.eventStream.subscribe(probe.ref.toUntyped, classOf[DeadLetter])
val ref = spawn(Behaviors.setup[String] { ctx =>
val stash = StashBuffer[String](10)
stash.stash("one")
stash.stash("two")
Behaviors.receiveMessage {
case "unstash" =>
stash.unstashAll(ctx, Behaviors.receiveMessage {
case unstashed =>
probe.ref ! unstashed
Behavior.stopped
})
case _ =>
Behavior.same
}
})
ref ! "unstash"
probe.expectMessage("one")
probe.expectMessageType[DeadLetter].message should equal("two")
probe.expectTerminated(ref)
}
"work with initial same" in {
val probe = TestProbe[Any]
val ref = spawn(Behaviors.setup[String] { ctx =>
val stash = StashBuffer[String](10)
stash.stash("one")
stash.stash("two")
Behaviors.receiveMessage {
case "unstash" =>
stash.unstashAll(ctx, Behaviors.same)
case msg =>
probe.ref ! msg
Behavior.same
}
})
ref ! "unstash"
probe.expectMessage("one")
probe.expectMessage("two")
}
}
}

View file

@ -7,9 +7,10 @@ package akka.actor.typed.internal
import java.util.function.Consumer
import java.util.function.{ Function => JFunction }
import akka.actor.DeadLetter
import scala.annotation.tailrec
import scala.util.control.NonFatal
import akka.actor.typed.Behavior
import akka.actor.typed.Signal
import akka.actor.typed.TypedActorContext
@ -125,8 +126,9 @@ import akka.util.ConstantFun
val b2 = Behavior.start(b, ctx)
if (!Behavior.isAlive(b2) || !messages.hasNext) b2
else {
val nextB = try {
messages.next() match {
val message = messages.next()
val interpretResult = try {
message match {
case sig: Signal => Behavior.interpretSignal(b2, ctx, sig)
case msg => Behavior.interpretMessage(b2, ctx, msg)
}
@ -134,11 +136,46 @@ import akka.util.ConstantFun
case NonFatal(e) => throw UnstashException(e, b2)
}
interpretOne(Behavior.canonicalize(nextB, b2, ctx)) // recursive
val actualNext =
if (interpretResult == Behavior.same) b2
else if (Behavior.isUnhandled(interpretResult)) {
ctx.asScala.onUnhandled(message)
b2
} else {
interpretResult
}
if (Behavior.isAlive(actualNext))
interpretOne(Behavior.canonicalize(actualNext, b2, ctx)) // recursive
else {
unstashRestToDeadLetters(ctx, messages)
actualNext
}
}
}
interpretOne(Behavior.start(behavior, ctx))
val started = Behavior.start(behavior, ctx)
val actualInitialBehavior =
if (Behavior.isUnhandled(started))
throw new IllegalArgumentException("Cannot unstash with unhandled as starting behavior")
else if (started == Behavior.same) {
ctx.asScala.currentBehavior
} else started
if (Behavior.isAlive(actualInitialBehavior)) {
interpretOne(actualInitialBehavior)
} else {
unstashRestToDeadLetters(ctx, messages)
started
}
}
private def unstashRestToDeadLetters(ctx: TypedActorContext[T], messages: Iterator[T]): Unit = {
val scalaCtx = ctx.asScala
import akka.actor.typed.scaladsl.adapter._
val untypedDeadLetters = scalaCtx.system.deadLetters.toUntyped
messages.foreach(msg =>
scalaCtx.system.deadLetters ! DeadLetter(msg, untypedDeadLetters, ctx.asScala.self.toUntyped))
}
override def unstash(

View file

@ -25,7 +25,7 @@ import scala.concurrent.duration._
import ActorRefAdapter.toUntyped
private[akka] def currentBehavior: Behavior[T] = adapter.currentBehavior
private[akka] override def currentBehavior: Behavior[T] = adapter.currentBehavior
// lazily initialized
private var actorLogger: OptionVal[Logger] = OptionVal.None
@ -110,6 +110,12 @@ import scala.concurrent.duration._
override def setLoggerClass(clazz: Class[_]): Unit = {
initLoggerWithClass(clazz)
}
/**
* Made accessible to allow stash to deal with unhandled messages as though they were interpreted by
* the adapter itself, even though the unstashing occurs inside the behavior stack.
*/
private[akka] override def onUnhandled(msg: T): Unit = adapter.unhandled(msg)
}
/**

View file

@ -100,6 +100,8 @@ object StashBuffer {
* It's allowed to stash messages while unstashing. Those newly added
* messages will not be processed by this call and have to be unstashed
* in another call.
*
* The `behavior` passed to `unstashAll` must not be `unhandled`.
*/
def unstashAll(ctx: ActorContext[T], behavior: Behavior[T]): Behavior[T]
@ -121,6 +123,8 @@ object StashBuffer {
* It's allowed to stash messages while unstashing. Those newly added
* messages will not be processed by this call and have to be unstashed
* in another call.
*
* The `behavior` passed to `unstash` must not be `unhandled`.
*/
def unstash(ctx: ActorContext[T], behavior: Behavior[T], numberOfMessages: Int, wrap: JFunction[T, T]): Behavior[T]

View file

@ -299,4 +299,15 @@ trait ActorContext[T] extends TypedActorContext[T] {
*/
def pipeToSelf[Value](future: Future[Value])(mapResult: Try[Value] => T): Unit
/**
* INTERNAL API
*/
@InternalApi
private[akka] def onUnhandled(msg: T): Unit
/**
* INTERNAL API
*/
private[akka] def currentBehavior: Behavior[T]
}

View file

@ -94,6 +94,8 @@ object StashBuffer {
* It's allowed to stash messages while unstashing. Those newly added
* messages will not be processed by this call and have to be unstashed
* in another call.
*
* The initial `behavior` passed to `unstashAll` must not be `unhandled`.
*/
def unstashAll(ctx: ActorContext[T], behavior: Behavior[T]): Behavior[T]
@ -115,6 +117,8 @@ object StashBuffer {
* It's allowed to stash messages while unstashing. Those newly added
* messages will not be processed by this call and have to be unstashed
* in another call.
*
* The `behavior` passed to `unstash` must not be `unhandled`.
*/
def unstash(ctx: ActorContext[T], behavior: Behavior[T], numberOfMessages: Int, wrap: T => T): Behavior[T]

View file

@ -12,7 +12,6 @@ import akka.actor.typed.Behavior.DeferredBehavior
import akka.actor.typed.Signal
import akka.actor.typed.internal.InterceptorImpl
import akka.actor.typed.internal.LoggerClass
import akka.actor.typed.internal.adapter.ActorContextAdapter
import akka.actor.typed.scaladsl.ActorContext
import akka.annotation.DoNotInherit
import akka.persistence.typed.EventAdapter
@ -105,16 +104,11 @@ object EventSourcedBehavior {
case concrete => concrete
}
context match {
case impl: ActorContextAdapter[_] =>
extractConcreteBehavior(impl.currentBehavior) match {
extractConcreteBehavior(context.currentBehavior) match {
case w: Running.WithSeqNrAccessible => w.currentSequenceNumber
case s =>
throw new IllegalStateException(s"Cannot extract the lastSequenceNumber in state ${s.getClass.getName}")
}
case c =>
throw new IllegalStateException(s"Cannot extract the lastSequenceNumber from context ${c.getClass.getName}")
}
}
}