Unstash to correct Beahviors.same when unstash in progress, #28831

* When unstashing one message the currentBehavior from the context
  is stale.
* While unstash is in progress we can keep track of currentBehavior
  inside StashBufferImpl.
This commit is contained in:
Patrik Nordwall 2020-03-27 17:01:59 +01:00
parent 37d87811b5
commit fa04d8efe3
2 changed files with 78 additions and 27 deletions

View file

@ -321,18 +321,15 @@ class UnstashingSpec extends ScalaTestWithActorTestKit with AnyWordSpecLike with
"Unstashing" must {
"work with initial Behaviors.same" in {
// FIXME #26148 unstashAll doesn't support Behavior.same
pending
val probe = TestProbe[String]()
// unstashing is inside setup
val ref = spawn(Behaviors.receive[String] {
case (_, "unstash") =>
val ref = spawn(Behaviors.receiveMessage[String] {
case "unstash" =>
Behaviors.withStash(10) { stash =>
stash.stash("one")
stash.unstashAll(Behaviors.same)
}
case (_, msg) =>
case msg =>
probe.ref ! msg
Behaviors.same
})
@ -344,8 +341,8 @@ class UnstashingSpec extends ScalaTestWithActorTestKit with AnyWordSpecLike with
"work with intermediate Behaviors.same" in {
val probe = TestProbe[String]()
// unstashing is inside setup
val ref = spawn(Behaviors.receivePartial[String] {
case (_, "unstash") =>
val ref = spawn(Behaviors.receiveMessagePartial[String] {
case "unstash" =>
Behaviors.withStash(10) { stash =>
stash.stash("one")
stash.stash("two")
@ -364,20 +361,17 @@ class UnstashingSpec extends ScalaTestWithActorTestKit with AnyWordSpecLike with
}
"work with supervised initial Behaviors.same" in {
// FIXME #26148 unstashAll doesn't support Behavior.same
pending
val probe = TestProbe[String]()
// unstashing is inside setup
val ref = spawn(
Behaviors
.supervise(Behaviors.receivePartial[String] {
case (_, "unstash") =>
.supervise(Behaviors.receiveMessagePartial[String] {
case "unstash" =>
Behaviors.withStash(10) { stash =>
stash.stash("one")
stash.unstashAll(Behaviors.same)
}
case (_, msg) =>
case msg =>
probe.ref ! msg
Behaviors.same
})
@ -394,8 +388,8 @@ class UnstashingSpec extends ScalaTestWithActorTestKit with AnyWordSpecLike with
// unstashing is inside setup
val ref = spawn(
Behaviors
.supervise(Behaviors.receivePartial[String] {
case (_, "unstash") =>
.supervise(Behaviors.receiveMessagePartial[String] {
case "unstash" =>
Behaviors.withStash(10) { stash =>
stash.stash("one")
stash.stash("two")
@ -414,6 +408,47 @@ class UnstashingSpec extends ScalaTestWithActorTestKit with AnyWordSpecLike with
probe.expectMessage("three")
}
"work with Behaviors.same when switching Behavior while unstashing one" in {
val probe = TestProbe[String]()
val ref = spawn(Behaviors.receiveMessage[String] {
case "unstash" =>
Behaviors.withStash(10) {
stash =>
def expectingA: Behaviors.Receive[String] = Behaviors.receiveMessage {
case "a" =>
probe.ref ! "a"
stash.unstash(expectingB, 1, identity)
case other =>
probe.ref ! s"unexpected [$other] when expecting [a]"
Behaviors.stopped
}
def expectingB: Behaviors.Receive[String] = Behaviors.receiveMessage {
case b @ ("b1" | "b2") =>
probe.ref ! b
stash.unstash(Behaviors.same, 1, identity)
case other =>
probe.ref ! s"unexpected [$other] when expecting [b]"
Behaviors.stopped
}
stash.stash("a")
stash.stash("b1")
stash.stash("b2")
stash.unstash(expectingA, 1, identity)
}
case other =>
probe.ref ! s"unexpected [$other] in first Behavior"
Behaviors.stopped
})
ref ! "unstash"
probe.expectMessage("a")
probe.expectMessage("b1")
probe.expectMessage("b2")
}
def testPostStop(probe: TestProbe[String], ref: ActorRef[String]): Unit = {
ref ! "stash"
ref ! "stash"

View file

@ -7,9 +7,9 @@ package akka.actor.typed.internal
import java.util.function.{ Function => JFunction }
import akka.actor.DeadLetter
import scala.annotation.tailrec
import scala.util.control.NonFatal
import akka.actor.typed.Behavior
import akka.actor.typed.Signal
import akka.actor.typed.TypedActorContext
@ -18,6 +18,7 @@ import akka.actor.typed.scaladsl
import akka.actor.typed.scaladsl.ActorContext
import akka.annotation.{ InternalApi, InternalStableApi }
import akka.japi.function.Procedure
import akka.util.OptionVal
import akka.util.{ unused, ConstantFun }
/**
@ -47,6 +48,8 @@ import akka.util.{ unused, ConstantFun }
private var _size: Int = if (_first eq null) 0 else 1
private var currentBehaviorWhenUnstashInProgress: OptionVal[Behavior[T]] = OptionVal.None
override def isEmpty: Boolean = _first eq null
override def nonEmpty: Boolean = !isEmpty
@ -128,15 +131,24 @@ import akka.util.{ unused, ConstantFun }
if (isEmpty)
behavior // optimization
else {
val iter = new Iterator[Node[T]] {
override def hasNext: Boolean = StashBufferImpl.this.nonEmpty
override def next(): Node[T] = {
val next = StashBufferImpl.this.dropHeadForUnstash()
unstashed(ctx, next)
next
}
}.take(math.min(numberOfMessages, size))
interpretUnstashedMessages(behavior, ctx, iter, wrap)
// currentBehaviorWhenUnstashInProgress is needed to keep track of current Behavior for Behaviors.same
// when unstash is called when a previous unstash is already in progress (in same call stack)
val unstashAlreadyInProgress = currentBehaviorWhenUnstashInProgress.isDefined
try {
val iter = new Iterator[Node[T]] {
override def hasNext: Boolean = StashBufferImpl.this.nonEmpty
override def next(): Node[T] = {
val next = StashBufferImpl.this.dropHeadForUnstash()
unstashed(ctx, next)
next
}
}.take(math.min(numberOfMessages, size))
interpretUnstashedMessages(behavior, ctx, iter, wrap)
} finally {
if (!unstashAlreadyInProgress)
currentBehaviorWhenUnstashInProgress = OptionVal.None
}
}
}
@ -147,6 +159,7 @@ import akka.util.{ unused, ConstantFun }
wrap: T => T): Behavior[T] = {
@tailrec def interpretOne(b: Behavior[T]): Behavior[T] = {
val b2 = Behavior.start(b, ctx)
currentBehaviorWhenUnstashInProgress = OptionVal.Some(b2)
if (!Behavior.isAlive(b2) || !messages.hasNext) b2
else {
val node = messages.next()
@ -183,7 +196,10 @@ import akka.util.{ unused, ConstantFun }
if (Behavior.isUnhandled(started))
throw new IllegalArgumentException("Cannot unstash with unhandled as starting behavior")
else if (started == BehaviorImpl.same) {
ctx.asScala.currentBehavior
currentBehaviorWhenUnstashInProgress match {
case OptionVal.None => ctx.asScala.currentBehavior
case OptionVal.Some(c) => c
}
} else started
if (Behavior.isAlive(actualInitialBehavior)) {