diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/StashSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/StashSpec.scala index 90edf7cbf6..fefb802478 100644 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/StashSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/StashSpec.scala @@ -321,18 +321,15 @@ class UnstashingSpec extends ScalaTestWithActorTestKit with AnyWordSpecLike with "Unstashing" must { "work with initial Behaviors.same" in { - // FIXME #26148 unstashAll doesn't support Behavior.same - pending - val probe = TestProbe[String]() // unstashing is inside setup - val ref = spawn(Behaviors.receive[String] { - case (_, "unstash") => + val ref = spawn(Behaviors.receiveMessage[String] { + case "unstash" => Behaviors.withStash(10) { stash => stash.stash("one") stash.unstashAll(Behaviors.same) } - case (_, msg) => + case msg => probe.ref ! msg Behaviors.same }) @@ -344,8 +341,8 @@ class UnstashingSpec extends ScalaTestWithActorTestKit with AnyWordSpecLike with "work with intermediate Behaviors.same" in { val probe = TestProbe[String]() // unstashing is inside setup - val ref = spawn(Behaviors.receivePartial[String] { - case (_, "unstash") => + val ref = spawn(Behaviors.receiveMessagePartial[String] { + case "unstash" => Behaviors.withStash(10) { stash => stash.stash("one") stash.stash("two") @@ -364,20 +361,17 @@ class UnstashingSpec extends ScalaTestWithActorTestKit with AnyWordSpecLike with } "work with supervised initial Behaviors.same" in { - // FIXME #26148 unstashAll doesn't support Behavior.same - pending - val probe = TestProbe[String]() // unstashing is inside setup val ref = spawn( Behaviors - .supervise(Behaviors.receivePartial[String] { - case (_, "unstash") => + .supervise(Behaviors.receiveMessagePartial[String] { + case "unstash" => Behaviors.withStash(10) { stash => stash.stash("one") stash.unstashAll(Behaviors.same) } - case (_, msg) => + case msg => probe.ref ! msg Behaviors.same }) @@ -394,8 +388,8 @@ class UnstashingSpec extends ScalaTestWithActorTestKit with AnyWordSpecLike with // unstashing is inside setup val ref = spawn( Behaviors - .supervise(Behaviors.receivePartial[String] { - case (_, "unstash") => + .supervise(Behaviors.receiveMessagePartial[String] { + case "unstash" => Behaviors.withStash(10) { stash => stash.stash("one") stash.stash("two") @@ -414,6 +408,47 @@ class UnstashingSpec extends ScalaTestWithActorTestKit with AnyWordSpecLike with probe.expectMessage("three") } + "work with Behaviors.same when switching Behavior while unstashing one" in { + val probe = TestProbe[String]() + + val ref = spawn(Behaviors.receiveMessage[String] { + case "unstash" => + Behaviors.withStash(10) { + stash => + def expectingA: Behaviors.Receive[String] = Behaviors.receiveMessage { + case "a" => + probe.ref ! "a" + stash.unstash(expectingB, 1, identity) + case other => + probe.ref ! s"unexpected [$other] when expecting [a]" + Behaviors.stopped + } + + def expectingB: Behaviors.Receive[String] = Behaviors.receiveMessage { + case b @ ("b1" | "b2") => + probe.ref ! b + stash.unstash(Behaviors.same, 1, identity) + case other => + probe.ref ! s"unexpected [$other] when expecting [b]" + Behaviors.stopped + } + + stash.stash("a") + stash.stash("b1") + stash.stash("b2") + stash.unstash(expectingA, 1, identity) + } + case other => + probe.ref ! s"unexpected [$other] in first Behavior" + Behaviors.stopped + }) + + ref ! "unstash" + probe.expectMessage("a") + probe.expectMessage("b1") + probe.expectMessage("b2") + } + def testPostStop(probe: TestProbe[String], ref: ActorRef[String]): Unit = { ref ! "stash" ref ! "stash" diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/StashBufferImpl.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/StashBufferImpl.scala index 0e5d16dcbe..6857b01e68 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/StashBufferImpl.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/StashBufferImpl.scala @@ -7,9 +7,9 @@ package akka.actor.typed.internal import java.util.function.{ Function => JFunction } import akka.actor.DeadLetter - import scala.annotation.tailrec import scala.util.control.NonFatal + import akka.actor.typed.Behavior import akka.actor.typed.Signal import akka.actor.typed.TypedActorContext @@ -18,6 +18,7 @@ import akka.actor.typed.scaladsl import akka.actor.typed.scaladsl.ActorContext import akka.annotation.{ InternalApi, InternalStableApi } import akka.japi.function.Procedure +import akka.util.OptionVal import akka.util.{ unused, ConstantFun } /** @@ -47,6 +48,8 @@ import akka.util.{ unused, ConstantFun } private var _size: Int = if (_first eq null) 0 else 1 + private var currentBehaviorWhenUnstashInProgress: OptionVal[Behavior[T]] = OptionVal.None + override def isEmpty: Boolean = _first eq null override def nonEmpty: Boolean = !isEmpty @@ -128,15 +131,24 @@ import akka.util.{ unused, ConstantFun } if (isEmpty) behavior // optimization else { - val iter = new Iterator[Node[T]] { - override def hasNext: Boolean = StashBufferImpl.this.nonEmpty - override def next(): Node[T] = { - val next = StashBufferImpl.this.dropHeadForUnstash() - unstashed(ctx, next) - next - } - }.take(math.min(numberOfMessages, size)) - interpretUnstashedMessages(behavior, ctx, iter, wrap) + // currentBehaviorWhenUnstashInProgress is needed to keep track of current Behavior for Behaviors.same + // when unstash is called when a previous unstash is already in progress (in same call stack) + val unstashAlreadyInProgress = currentBehaviorWhenUnstashInProgress.isDefined + try { + val iter = new Iterator[Node[T]] { + override def hasNext: Boolean = StashBufferImpl.this.nonEmpty + + override def next(): Node[T] = { + val next = StashBufferImpl.this.dropHeadForUnstash() + unstashed(ctx, next) + next + } + }.take(math.min(numberOfMessages, size)) + interpretUnstashedMessages(behavior, ctx, iter, wrap) + } finally { + if (!unstashAlreadyInProgress) + currentBehaviorWhenUnstashInProgress = OptionVal.None + } } } @@ -147,6 +159,7 @@ import akka.util.{ unused, ConstantFun } wrap: T => T): Behavior[T] = { @tailrec def interpretOne(b: Behavior[T]): Behavior[T] = { val b2 = Behavior.start(b, ctx) + currentBehaviorWhenUnstashInProgress = OptionVal.Some(b2) if (!Behavior.isAlive(b2) || !messages.hasNext) b2 else { val node = messages.next() @@ -183,7 +196,10 @@ import akka.util.{ unused, ConstantFun } if (Behavior.isUnhandled(started)) throw new IllegalArgumentException("Cannot unstash with unhandled as starting behavior") else if (started == BehaviorImpl.same) { - ctx.asScala.currentBehavior + currentBehaviorWhenUnstashInProgress match { + case OptionVal.None => ctx.asScala.currentBehavior + case OptionVal.Some(c) => c + } } else started if (Behavior.isAlive(actualInitialBehavior)) {