Merge pull request #28832 from akka/wip-unstash-one-same-patriknw
Unstash to correct Beahviors.same when unstash in progress, #28831
This commit is contained in:
commit
a55ff87c93
2 changed files with 78 additions and 27 deletions
|
|
@ -321,18 +321,15 @@ class UnstashingSpec extends ScalaTestWithActorTestKit with AnyWordSpecLike with
|
|||
"Unstashing" must {
|
||||
|
||||
"work with initial Behaviors.same" in {
|
||||
// FIXME #26148 unstashAll doesn't support Behavior.same
|
||||
pending
|
||||
|
||||
val probe = TestProbe[String]()
|
||||
// unstashing is inside setup
|
||||
val ref = spawn(Behaviors.receive[String] {
|
||||
case (_, "unstash") =>
|
||||
val ref = spawn(Behaviors.receiveMessage[String] {
|
||||
case "unstash" =>
|
||||
Behaviors.withStash(10) { stash =>
|
||||
stash.stash("one")
|
||||
stash.unstashAll(Behaviors.same)
|
||||
}
|
||||
case (_, msg) =>
|
||||
case msg =>
|
||||
probe.ref ! msg
|
||||
Behaviors.same
|
||||
})
|
||||
|
|
@ -344,8 +341,8 @@ class UnstashingSpec extends ScalaTestWithActorTestKit with AnyWordSpecLike with
|
|||
"work with intermediate Behaviors.same" in {
|
||||
val probe = TestProbe[String]()
|
||||
// unstashing is inside setup
|
||||
val ref = spawn(Behaviors.receivePartial[String] {
|
||||
case (_, "unstash") =>
|
||||
val ref = spawn(Behaviors.receiveMessagePartial[String] {
|
||||
case "unstash" =>
|
||||
Behaviors.withStash(10) { stash =>
|
||||
stash.stash("one")
|
||||
stash.stash("two")
|
||||
|
|
@ -364,20 +361,17 @@ class UnstashingSpec extends ScalaTestWithActorTestKit with AnyWordSpecLike with
|
|||
}
|
||||
|
||||
"work with supervised initial Behaviors.same" in {
|
||||
// FIXME #26148 unstashAll doesn't support Behavior.same
|
||||
pending
|
||||
|
||||
val probe = TestProbe[String]()
|
||||
// unstashing is inside setup
|
||||
val ref = spawn(
|
||||
Behaviors
|
||||
.supervise(Behaviors.receivePartial[String] {
|
||||
case (_, "unstash") =>
|
||||
.supervise(Behaviors.receiveMessagePartial[String] {
|
||||
case "unstash" =>
|
||||
Behaviors.withStash(10) { stash =>
|
||||
stash.stash("one")
|
||||
stash.unstashAll(Behaviors.same)
|
||||
}
|
||||
case (_, msg) =>
|
||||
case msg =>
|
||||
probe.ref ! msg
|
||||
Behaviors.same
|
||||
})
|
||||
|
|
@ -394,8 +388,8 @@ class UnstashingSpec extends ScalaTestWithActorTestKit with AnyWordSpecLike with
|
|||
// unstashing is inside setup
|
||||
val ref = spawn(
|
||||
Behaviors
|
||||
.supervise(Behaviors.receivePartial[String] {
|
||||
case (_, "unstash") =>
|
||||
.supervise(Behaviors.receiveMessagePartial[String] {
|
||||
case "unstash" =>
|
||||
Behaviors.withStash(10) { stash =>
|
||||
stash.stash("one")
|
||||
stash.stash("two")
|
||||
|
|
@ -414,6 +408,47 @@ class UnstashingSpec extends ScalaTestWithActorTestKit with AnyWordSpecLike with
|
|||
probe.expectMessage("three")
|
||||
}
|
||||
|
||||
"work with Behaviors.same when switching Behavior while unstashing one" in {
|
||||
val probe = TestProbe[String]()
|
||||
|
||||
val ref = spawn(Behaviors.receiveMessage[String] {
|
||||
case "unstash" =>
|
||||
Behaviors.withStash(10) {
|
||||
stash =>
|
||||
def expectingA: Behaviors.Receive[String] = Behaviors.receiveMessage {
|
||||
case "a" =>
|
||||
probe.ref ! "a"
|
||||
stash.unstash(expectingB, 1, identity)
|
||||
case other =>
|
||||
probe.ref ! s"unexpected [$other] when expecting [a]"
|
||||
Behaviors.stopped
|
||||
}
|
||||
|
||||
def expectingB: Behaviors.Receive[String] = Behaviors.receiveMessage {
|
||||
case b @ ("b1" | "b2") =>
|
||||
probe.ref ! b
|
||||
stash.unstash(Behaviors.same, 1, identity)
|
||||
case other =>
|
||||
probe.ref ! s"unexpected [$other] when expecting [b]"
|
||||
Behaviors.stopped
|
||||
}
|
||||
|
||||
stash.stash("a")
|
||||
stash.stash("b1")
|
||||
stash.stash("b2")
|
||||
stash.unstash(expectingA, 1, identity)
|
||||
}
|
||||
case other =>
|
||||
probe.ref ! s"unexpected [$other] in first Behavior"
|
||||
Behaviors.stopped
|
||||
})
|
||||
|
||||
ref ! "unstash"
|
||||
probe.expectMessage("a")
|
||||
probe.expectMessage("b1")
|
||||
probe.expectMessage("b2")
|
||||
}
|
||||
|
||||
def testPostStop(probe: TestProbe[String], ref: ActorRef[String]): Unit = {
|
||||
ref ! "stash"
|
||||
ref ! "stash"
|
||||
|
|
|
|||
|
|
@ -7,9 +7,9 @@ package akka.actor.typed.internal
|
|||
import java.util.function.{ Function => JFunction }
|
||||
|
||||
import akka.actor.DeadLetter
|
||||
|
||||
import scala.annotation.tailrec
|
||||
import scala.util.control.NonFatal
|
||||
|
||||
import akka.actor.typed.Behavior
|
||||
import akka.actor.typed.Signal
|
||||
import akka.actor.typed.TypedActorContext
|
||||
|
|
@ -18,6 +18,7 @@ import akka.actor.typed.scaladsl
|
|||
import akka.actor.typed.scaladsl.ActorContext
|
||||
import akka.annotation.{ InternalApi, InternalStableApi }
|
||||
import akka.japi.function.Procedure
|
||||
import akka.util.OptionVal
|
||||
import akka.util.{ unused, ConstantFun }
|
||||
|
||||
/**
|
||||
|
|
@ -47,6 +48,8 @@ import akka.util.{ unused, ConstantFun }
|
|||
|
||||
private var _size: Int = if (_first eq null) 0 else 1
|
||||
|
||||
private var currentBehaviorWhenUnstashInProgress: OptionVal[Behavior[T]] = OptionVal.None
|
||||
|
||||
override def isEmpty: Boolean = _first eq null
|
||||
|
||||
override def nonEmpty: Boolean = !isEmpty
|
||||
|
|
@ -128,15 +131,24 @@ import akka.util.{ unused, ConstantFun }
|
|||
if (isEmpty)
|
||||
behavior // optimization
|
||||
else {
|
||||
val iter = new Iterator[Node[T]] {
|
||||
override def hasNext: Boolean = StashBufferImpl.this.nonEmpty
|
||||
override def next(): Node[T] = {
|
||||
val next = StashBufferImpl.this.dropHeadForUnstash()
|
||||
unstashed(ctx, next)
|
||||
next
|
||||
}
|
||||
}.take(math.min(numberOfMessages, size))
|
||||
interpretUnstashedMessages(behavior, ctx, iter, wrap)
|
||||
// currentBehaviorWhenUnstashInProgress is needed to keep track of current Behavior for Behaviors.same
|
||||
// when unstash is called when a previous unstash is already in progress (in same call stack)
|
||||
val unstashAlreadyInProgress = currentBehaviorWhenUnstashInProgress.isDefined
|
||||
try {
|
||||
val iter = new Iterator[Node[T]] {
|
||||
override def hasNext: Boolean = StashBufferImpl.this.nonEmpty
|
||||
|
||||
override def next(): Node[T] = {
|
||||
val next = StashBufferImpl.this.dropHeadForUnstash()
|
||||
unstashed(ctx, next)
|
||||
next
|
||||
}
|
||||
}.take(math.min(numberOfMessages, size))
|
||||
interpretUnstashedMessages(behavior, ctx, iter, wrap)
|
||||
} finally {
|
||||
if (!unstashAlreadyInProgress)
|
||||
currentBehaviorWhenUnstashInProgress = OptionVal.None
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -147,6 +159,7 @@ import akka.util.{ unused, ConstantFun }
|
|||
wrap: T => T): Behavior[T] = {
|
||||
@tailrec def interpretOne(b: Behavior[T]): Behavior[T] = {
|
||||
val b2 = Behavior.start(b, ctx)
|
||||
currentBehaviorWhenUnstashInProgress = OptionVal.Some(b2)
|
||||
if (!Behavior.isAlive(b2) || !messages.hasNext) b2
|
||||
else {
|
||||
val node = messages.next()
|
||||
|
|
@ -183,7 +196,10 @@ import akka.util.{ unused, ConstantFun }
|
|||
if (Behavior.isUnhandled(started))
|
||||
throw new IllegalArgumentException("Cannot unstash with unhandled as starting behavior")
|
||||
else if (started == BehaviorImpl.same) {
|
||||
ctx.asScala.currentBehavior
|
||||
currentBehaviorWhenUnstashInProgress match {
|
||||
case OptionVal.None => ctx.asScala.currentBehavior
|
||||
case OptionVal.Some(c) => c
|
||||
}
|
||||
} else started
|
||||
|
||||
if (Behavior.isAlive(actualInitialBehavior)) {
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue