From bc4523941ccbe30ece41277c411f18d4af1a83e4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johan=20Andr=C3=A9n?= Date: Tue, 26 Mar 2019 15:06:02 +0100 Subject: [PATCH] Typed unstash improvements (#26599) * Handle unhandled while unstashing #26362 * Handle stop while unstashing * unstashing with initial Behaviors.same to refer to current actor behavior * Unstash into deadletter when stopping during unstash * More unhandled test coverage * Avoid expecting ActorContextAdapter for internals currentBehavior and onUnhandled moved to scaladsl.ActorContext as internal methods allowing support for touching them also in the behavior testkit * Scaladoc fixes * Some more periods --- .../typed/internal/BehaviorTestKitImpl.scala | 2 +- .../internal/EffectfulActorContext.scala | 5 +- .../typed/internal/StubbedActorContext.scala | 25 +++- .../typed/scaladsl/StashBufferSpec.scala | 4 +- .../akka/actor/typed/scaladsl/StashSpec.scala | 125 ++++++++++++++++-- .../typed/internal/StashBufferImpl.scala | 47 ++++++- .../adapter/ActorContextAdapter.scala | 8 +- .../actor/typed/javadsl/StashBuffer.scala | 4 + .../actor/typed/scaladsl/ActorContext.scala | 11 ++ .../actor/typed/scaladsl/StashBuffer.scala | 4 + .../typed/scaladsl/EventSourcedBehavior.scala | 14 +- 11 files changed, 212 insertions(+), 37 deletions(-) diff --git a/akka-actor-testkit-typed/src/main/scala/akka/actor/testkit/typed/internal/BehaviorTestKitImpl.scala b/akka-actor-testkit-typed/src/main/scala/akka/actor/testkit/typed/internal/BehaviorTestKitImpl.scala index 634167817a..ff8547c30e 100644 --- a/akka-actor-testkit-typed/src/main/scala/akka/actor/testkit/typed/internal/BehaviorTestKitImpl.scala +++ b/akka-actor-testkit-typed/src/main/scala/akka/actor/testkit/typed/internal/BehaviorTestKitImpl.scala @@ -28,7 +28,7 @@ private[akka] final class BehaviorTestKitImpl[T](_path: ActorPath, _initialBehav with akka.actor.testkit.typed.scaladsl.BehaviorTestKit[T] { // really this should be private, make so when we port out tests that need it - private[akka] val context = new EffectfulActorContext[T](_path) + private[akka] val context = new EffectfulActorContext[T](_path, () => currentBehavior) private[akka] def as[U]: BehaviorTestKitImpl[U] = this.asInstanceOf[BehaviorTestKitImpl[U]] diff --git a/akka-actor-testkit-typed/src/main/scala/akka/actor/testkit/typed/internal/EffectfulActorContext.scala b/akka-actor-testkit-typed/src/main/scala/akka/actor/testkit/typed/internal/EffectfulActorContext.scala index 7aea630c9d..660db77fd1 100644 --- a/akka-actor-testkit-typed/src/main/scala/akka/actor/testkit/typed/internal/EffectfulActorContext.scala +++ b/akka-actor-testkit-typed/src/main/scala/akka/actor/testkit/typed/internal/EffectfulActorContext.scala @@ -20,7 +20,10 @@ import scala.compat.java8.FunctionConverters._ /** * INTERNAL API */ -@InternalApi private[akka] final class EffectfulActorContext[T](path: ActorPath) extends StubbedActorContext[T](path) { +@InternalApi private[akka] final class EffectfulActorContext[T]( + path: ActorPath, + currentBehaviorProvider: () => Behavior[T]) + extends StubbedActorContext[T](path, currentBehaviorProvider) { private[akka] val effectQueue = new ConcurrentLinkedQueue[Effect] diff --git a/akka-actor-testkit-typed/src/main/scala/akka/actor/testkit/typed/internal/StubbedActorContext.scala b/akka-actor-testkit-typed/src/main/scala/akka/actor/testkit/typed/internal/StubbedActorContext.scala index f495e3fdd6..060133a0f2 100644 --- a/akka-actor-testkit-typed/src/main/scala/akka/actor/testkit/typed/internal/StubbedActorContext.scala +++ b/akka-actor-testkit-typed/src/main/scala/akka/actor/testkit/typed/internal/StubbedActorContext.scala @@ -159,10 +159,11 @@ private[akka] final class FunctionRef[-T](override val path: ActorPath, send: (T * provides only stubs for the effects an Actor can perform and replaces * created child Actors by a synchronous Inbox (see `Inbox.sync`). */ -@InternalApi private[akka] class StubbedActorContext[T](val path: ActorPath) extends ActorContextImpl[T] { +@InternalApi private[akka] class StubbedActorContext[T](val path: ActorPath, currentBehaviorProvider: () => Behavior[T]) + extends ActorContextImpl[T] { - def this(name: String) = { - this((TestInbox.address / name).withUid(rnd().nextInt())) + def this(name: String, currentBehaviorProvider: () => Behavior[T]) = { + this((TestInbox.address / name).withUid(rnd().nextInt()), currentBehaviorProvider) } /** @@ -175,6 +176,7 @@ private[akka] final class FunctionRef[-T](override val path: ActorPath, send: (T private var _children = TreeMap.empty[String, BehaviorTestKitImpl[_]] private val childName = Iterator.from(0).map(Helpers.base64(_)) private val loggingAdapter = new StubbedLogger + private var unhandled: List[T] = Nil override def children: Iterable[ActorRef[Nothing]] = _children.values.map(_.context.self) def childrenNames: Iterable[String] = _children.keys @@ -286,7 +288,22 @@ private[akka] final class FunctionRef[-T](override val path: ActorPath, send: (T def logEntries: List[CapturedLogEvent] = loggingAdapter.logEntries /** - * Clear the log entries + * Clear the log entries. */ def clearLog(): Unit = loggingAdapter.clearLog() + + override private[akka] def onUnhandled(msg: T): Unit = + unhandled = msg :: unhandled + + /** + * Messages that are marked as unhandled. + */ + def unhandledMessages: List[T] = unhandled.reverse + + /** + * Clear the list of captured unhandled messages. + */ + def clearUnhandled(): Unit = unhandled = Nil + + override private[akka] def currentBehavior: Behavior[T] = currentBehaviorProvider() } diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/StashBufferSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/StashBufferSpec.scala index 49cfda9374..39221dcec5 100644 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/StashBufferSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/StashBufferSpec.scala @@ -11,7 +11,9 @@ import org.scalatest.{ Matchers, WordSpec } class StashBufferSpec extends WordSpec with Matchers { - val context = new StubbedActorContext[String]("StashBufferSpec") + val context = new StubbedActorContext[String]( + "StashBufferSpec", + () => throw new UnsupportedOperationException("Will never be invoked in this test")) "A StashBuffer" must { diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/StashSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/StashSpec.scala index 39e8518917..b0540c8e0f 100644 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/StashSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/StashSpec.scala @@ -8,8 +8,8 @@ package scaladsl import java.util.concurrent.CountDownLatch import java.util.concurrent.TimeUnit +import akka.actor.DeadLetter import scala.concurrent.duration._ - import akka.actor.testkit.typed.TestException import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit import akka.actor.testkit.typed.scaladsl.TestProbe @@ -562,19 +562,15 @@ class UnstashingSpec extends ScalaTestWithActorTestKit(""" val stash = StashBuffer[String](10) stash.stash("one") - // FIXME #26148 using AbstractBehavior because unstashAll doesn't support Behavior.same - // unstashing is inside setup - new AbstractBehavior[String] { - override def onMessage(msg: String): Behavior[String] = msg match { - case "unstash" => - Behaviors.setup[String] { ctx => - stash.unstashAll(ctx, this) - } - case _ => - probe.ref ! msg - Behavior.same - } + Behaviors.receiveMessage { + case "unstash" => + Behaviors.setup[String] { ctx => + stash.unstashAll(ctx, Behaviors.same) + } + case msg => + probe.ref ! msg + Behavior.same } }) @@ -582,6 +578,107 @@ class UnstashingSpec extends ScalaTestWithActorTestKit(""" probe.expectMessage("one") } - } + "deal with unhandled the same way as normal unhandled" in { + val probe = TestProbe[String]() + val ref = spawn(Behaviors.setup[String] { ctx => + val stash = StashBuffer[String](10) + stash.stash("unhandled") + stash.stash("handled") + stash.stash("handled") + stash.stash("unhandled") + stash.stash("handled") + def unstashing(n: Int): Behavior[String] = + Behaviors.receiveMessage { + case "unhandled" => Behavior.unhandled + case "handled" => + probe.ref ! s"handled $n" + unstashing(n + 1) + } + + Behaviors.receiveMessage { + case "unstash" => + stash.unstashAll(ctx, unstashing(1)) + } + }) + + EventFilter.warning(start = "unhandled message from", occurrences = 2).intercept { + ref ! "unstash" + } + probe.expectMessage("handled 1") + probe.expectMessage("handled 2") + probe.expectMessage("handled 3") + + ref ! "handled" + probe.expectMessage("handled 4") + } + + "fail quick on invalid start behavior" in { + val stash = StashBuffer[String](10) + stash.stash("one") + intercept[IllegalArgumentException](stash.unstashAll(null, Behavior.unhandled)) + } + + "deal with initial stop" in { + val probe = TestProbe[Any] + val ref = spawn(Behaviors.setup[String] { ctx => + val stash = StashBuffer[String](10) + stash.stash("one") + + Behaviors.receiveMessage { + case "unstash" => + stash.unstashAll(ctx, Behaviors.stopped) + } + }) + + ref ! "unstash" + probe.expectTerminated(ref) + } + + "deal with stop" in { + val probe = TestProbe[Any] + import akka.actor.typed.scaladsl.adapter._ + untypedSys.eventStream.subscribe(probe.ref.toUntyped, classOf[DeadLetter]) + val ref = spawn(Behaviors.setup[String] { ctx => + val stash = StashBuffer[String](10) + stash.stash("one") + stash.stash("two") + + Behaviors.receiveMessage { + case "unstash" => + stash.unstashAll(ctx, Behaviors.receiveMessage { + case unstashed => + probe.ref ! unstashed + Behavior.stopped + }) + case _ => + Behavior.same + } + }) + ref ! "unstash" + probe.expectMessage("one") + probe.expectMessageType[DeadLetter].message should equal("two") + probe.expectTerminated(ref) + } + + "work with initial same" in { + val probe = TestProbe[Any] + val ref = spawn(Behaviors.setup[String] { ctx => + val stash = StashBuffer[String](10) + stash.stash("one") + stash.stash("two") + + Behaviors.receiveMessage { + case "unstash" => + stash.unstashAll(ctx, Behaviors.same) + case msg => + probe.ref ! msg + Behavior.same + } + }) + ref ! "unstash" + probe.expectMessage("one") + probe.expectMessage("two") + } + } } diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/StashBufferImpl.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/StashBufferImpl.scala index 3bf2488f89..196281e759 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/StashBufferImpl.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/StashBufferImpl.scala @@ -7,9 +7,10 @@ package akka.actor.typed.internal import java.util.function.Consumer import java.util.function.{ Function => JFunction } +import akka.actor.DeadLetter + import scala.annotation.tailrec import scala.util.control.NonFatal - import akka.actor.typed.Behavior import akka.actor.typed.Signal import akka.actor.typed.TypedActorContext @@ -125,8 +126,9 @@ import akka.util.ConstantFun val b2 = Behavior.start(b, ctx) if (!Behavior.isAlive(b2) || !messages.hasNext) b2 else { - val nextB = try { - messages.next() match { + val message = messages.next() + val interpretResult = try { + message match { case sig: Signal => Behavior.interpretSignal(b2, ctx, sig) case msg => Behavior.interpretMessage(b2, ctx, msg) } @@ -134,11 +136,46 @@ import akka.util.ConstantFun case NonFatal(e) => throw UnstashException(e, b2) } - interpretOne(Behavior.canonicalize(nextB, b2, ctx)) // recursive + val actualNext = + if (interpretResult == Behavior.same) b2 + else if (Behavior.isUnhandled(interpretResult)) { + ctx.asScala.onUnhandled(message) + b2 + } else { + interpretResult + } + + if (Behavior.isAlive(actualNext)) + interpretOne(Behavior.canonicalize(actualNext, b2, ctx)) // recursive + else { + unstashRestToDeadLetters(ctx, messages) + actualNext + } } } - interpretOne(Behavior.start(behavior, ctx)) + val started = Behavior.start(behavior, ctx) + val actualInitialBehavior = + if (Behavior.isUnhandled(started)) + throw new IllegalArgumentException("Cannot unstash with unhandled as starting behavior") + else if (started == Behavior.same) { + ctx.asScala.currentBehavior + } else started + + if (Behavior.isAlive(actualInitialBehavior)) { + interpretOne(actualInitialBehavior) + } else { + unstashRestToDeadLetters(ctx, messages) + started + } + } + + private def unstashRestToDeadLetters(ctx: TypedActorContext[T], messages: Iterator[T]): Unit = { + val scalaCtx = ctx.asScala + import akka.actor.typed.scaladsl.adapter._ + val untypedDeadLetters = scalaCtx.system.deadLetters.toUntyped + messages.foreach(msg => + scalaCtx.system.deadLetters ! DeadLetter(msg, untypedDeadLetters, ctx.asScala.self.toUntyped)) } override def unstash( diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/ActorContextAdapter.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/ActorContextAdapter.scala index 65bed16b88..d477f6772a 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/ActorContextAdapter.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/ActorContextAdapter.scala @@ -25,7 +25,7 @@ import scala.concurrent.duration._ import ActorRefAdapter.toUntyped - private[akka] def currentBehavior: Behavior[T] = adapter.currentBehavior + private[akka] override def currentBehavior: Behavior[T] = adapter.currentBehavior // lazily initialized private var actorLogger: OptionVal[Logger] = OptionVal.None @@ -110,6 +110,12 @@ import scala.concurrent.duration._ override def setLoggerClass(clazz: Class[_]): Unit = { initLoggerWithClass(clazz) } + + /** + * Made accessible to allow stash to deal with unhandled messages as though they were interpreted by + * the adapter itself, even though the unstashing occurs inside the behavior stack. + */ + private[akka] override def onUnhandled(msg: T): Unit = adapter.unhandled(msg) } /** diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/StashBuffer.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/StashBuffer.scala index b1ed2b2444..b8763a73fc 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/StashBuffer.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/StashBuffer.scala @@ -100,6 +100,8 @@ object StashBuffer { * It's allowed to stash messages while unstashing. Those newly added * messages will not be processed by this call and have to be unstashed * in another call. + * + * The `behavior` passed to `unstashAll` must not be `unhandled`. */ def unstashAll(ctx: ActorContext[T], behavior: Behavior[T]): Behavior[T] @@ -121,6 +123,8 @@ object StashBuffer { * It's allowed to stash messages while unstashing. Those newly added * messages will not be processed by this call and have to be unstashed * in another call. + * + * The `behavior` passed to `unstash` must not be `unhandled`. */ def unstash(ctx: ActorContext[T], behavior: Behavior[T], numberOfMessages: Int, wrap: JFunction[T, T]): Behavior[T] diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/ActorContext.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/ActorContext.scala index 25eb404017..4ed2e6c6dc 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/ActorContext.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/ActorContext.scala @@ -299,4 +299,15 @@ trait ActorContext[T] extends TypedActorContext[T] { */ def pipeToSelf[Value](future: Future[Value])(mapResult: Try[Value] => T): Unit + /** + * INTERNAL API + */ + @InternalApi + private[akka] def onUnhandled(msg: T): Unit + + /** + * INTERNAL API + */ + private[akka] def currentBehavior: Behavior[T] + } diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/StashBuffer.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/StashBuffer.scala index 17b44d5cba..a142a2fe02 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/StashBuffer.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/StashBuffer.scala @@ -94,6 +94,8 @@ object StashBuffer { * It's allowed to stash messages while unstashing. Those newly added * messages will not be processed by this call and have to be unstashed * in another call. + * + * The initial `behavior` passed to `unstashAll` must not be `unhandled`. */ def unstashAll(ctx: ActorContext[T], behavior: Behavior[T]): Behavior[T] @@ -115,6 +117,8 @@ object StashBuffer { * It's allowed to stash messages while unstashing. Those newly added * messages will not be processed by this call and have to be unstashed * in another call. + * + * The `behavior` passed to `unstash` must not be `unhandled`. */ def unstash(ctx: ActorContext[T], behavior: Behavior[T], numberOfMessages: Int, wrap: T => T): Behavior[T] diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/EventSourcedBehavior.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/EventSourcedBehavior.scala index d49aaf6d80..f679c3a958 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/EventSourcedBehavior.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/EventSourcedBehavior.scala @@ -12,7 +12,6 @@ import akka.actor.typed.Behavior.DeferredBehavior import akka.actor.typed.Signal import akka.actor.typed.internal.InterceptorImpl import akka.actor.typed.internal.LoggerClass -import akka.actor.typed.internal.adapter.ActorContextAdapter import akka.actor.typed.scaladsl.ActorContext import akka.annotation.DoNotInherit import akka.persistence.typed.EventAdapter @@ -105,15 +104,10 @@ object EventSourcedBehavior { case concrete => concrete } - context match { - case impl: ActorContextAdapter[_] => - extractConcreteBehavior(impl.currentBehavior) match { - case w: Running.WithSeqNrAccessible => w.currentSequenceNumber - case s => - throw new IllegalStateException(s"Cannot extract the lastSequenceNumber in state ${s.getClass.getName}") - } - case c => - throw new IllegalStateException(s"Cannot extract the lastSequenceNumber from context ${c.getClass.getName}") + extractConcreteBehavior(context.currentBehavior) match { + case w: Running.WithSeqNrAccessible => w.currentSequenceNumber + case s => + throw new IllegalStateException(s"Cannot extract the lastSequenceNumber in state ${s.getClass.getName}") } }