Fix racy ActorWithBoundedStashSpec, see #2554
* Several things were racy in ActorWithBoundedStashSpec * pushTimeout could come in to play * shared mailbox between the two test cases * didn't verify order of messages * mailbox capacity could fill up in not intended way, e.g. 11 messages could be sent before any of them was processed * etc
This commit is contained in:
parent
616f8be730
commit
289c161baa
1 changed files with 46 additions and 16 deletions
|
|
@ -21,8 +21,18 @@ object ActorWithBoundedStashSpec {
|
|||
|
||||
class StashingActor extends Actor with Stash {
|
||||
def receive = {
|
||||
case "hello1" ⇒ stash()
|
||||
case "world" ⇒ unstashAll()
|
||||
case msg: String if msg.startsWith("hello") ⇒
|
||||
stash()
|
||||
sender ! "ok"
|
||||
|
||||
case "world" ⇒
|
||||
context.become(afterWorldBehaviour)
|
||||
unstashAll()
|
||||
|
||||
}
|
||||
|
||||
def afterWorldBehaviour: Receive = {
|
||||
case _ ⇒ stash()
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -30,29 +40,38 @@ object ActorWithBoundedStashSpec {
|
|||
var numStashed = 0
|
||||
|
||||
def receive = {
|
||||
case "hello2" ⇒
|
||||
case msg: String if msg.startsWith("hello") ⇒
|
||||
numStashed += 1
|
||||
try stash() catch {
|
||||
try { stash(); sender ! "ok" } catch {
|
||||
case _: StashOverflowException ⇒
|
||||
if (numStashed == 21) {
|
||||
sender ! "STASHOVERFLOW"
|
||||
context stop self
|
||||
} else {
|
||||
sender ! "Unexpected StashOverflowException: " + numStashed
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// bounded deque-based mailbox with capacity 10
|
||||
class Bounded(settings: Settings, config: Config) extends BoundedDequeBasedMailbox(10, 10 millis)
|
||||
class Bounded10(settings: Settings, config: Config) extends BoundedDequeBasedMailbox(10, 500 millis)
|
||||
|
||||
val dispatcherId = "my-dispatcher"
|
||||
class Bounded100(settings: Settings, config: Config) extends BoundedDequeBasedMailbox(100, 500 millis)
|
||||
|
||||
val dispatcherId1 = "my-dispatcher-1"
|
||||
val dispatcherId2 = "my-dispatcher-2"
|
||||
|
||||
val testConf: Config = ConfigFactory.parseString("""
|
||||
%s {
|
||||
mailbox-type = "%s"
|
||||
stash-capacity = 20
|
||||
}
|
||||
""".format(dispatcherId, classOf[Bounded].getName))
|
||||
%s {
|
||||
mailbox-type = "%s"
|
||||
stash-capacity = 20
|
||||
}
|
||||
""".format(dispatcherId1, classOf[Bounded10].getName, dispatcherId2, classOf[Bounded100].getName))
|
||||
}
|
||||
|
||||
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
|
||||
|
|
@ -60,8 +79,7 @@ class ActorWithBoundedStashSpec extends AkkaSpec(ActorWithBoundedStashSpec.testC
|
|||
import ActorWithBoundedStashSpec._
|
||||
|
||||
override def atStartup: Unit = {
|
||||
system.eventStream.publish(Mute(EventFilter.warning(pattern = ".*received dead letter from.*hello1")))
|
||||
system.eventStream.publish(Mute(EventFilter.warning(pattern = ".*received dead letter from.*hello2")))
|
||||
system.eventStream.publish(Mute(EventFilter.warning(pattern = ".*received dead letter from.*hello.*")))
|
||||
}
|
||||
|
||||
override def beforeEach(): Unit =
|
||||
|
|
@ -73,23 +91,35 @@ class ActorWithBoundedStashSpec extends AkkaSpec(ActorWithBoundedStashSpec.testC
|
|||
"An Actor with Stash" must {
|
||||
|
||||
"end up in DeadLetters in case of a capacity violation" in {
|
||||
val stasher = system.actorOf(Props[StashingActor].withDispatcher(dispatcherId))
|
||||
val stasher = system.actorOf(Props[StashingActor].withDispatcher(dispatcherId1))
|
||||
// fill up stash
|
||||
(1 to 11) foreach { _ ⇒ stasher ! "hello1" }
|
||||
for (n ← 1 to 11) {
|
||||
stasher ! "hello" + n
|
||||
expectMsg("ok")
|
||||
}
|
||||
|
||||
// cause unstashAll with capacity violation
|
||||
stasher ! "world"
|
||||
expectMsg(DeadLetter("hello1", testActor, stasher))
|
||||
system stop stasher
|
||||
(1 to 10) foreach { _ ⇒ expectMsg(DeadLetter("hello1", testActor, stasher)) }
|
||||
|
||||
stasher ! PoisonPill
|
||||
// stashed messages are sent to deadletters when stasher is stopped
|
||||
for (n ← 2 to 11) expectMsg(DeadLetter("hello" + n, testActor, stasher))
|
||||
}
|
||||
|
||||
"throw a StashOverflowException in case of a stash capacity violation" in {
|
||||
val stasher = system.actorOf(Props[StashingActorWithOverflow].withDispatcher(dispatcherId))
|
||||
val stasher = system.actorOf(Props[StashingActorWithOverflow].withDispatcher(dispatcherId2))
|
||||
// fill up stash
|
||||
(1 to 21) foreach { _ ⇒ stasher ! "hello2" }
|
||||
for (n ← 1 to 20) {
|
||||
stasher ! "hello" + n
|
||||
expectMsg("ok")
|
||||
}
|
||||
|
||||
stasher ! "hello21"
|
||||
expectMsg("STASHOVERFLOW")
|
||||
(1 to 20) foreach { _ ⇒ expectMsg(DeadLetter("hello2", testActor, stasher)) }
|
||||
|
||||
// stashed messages are sent to deadletters when stasher is stopped,
|
||||
for (n ← 1 to 20) expectMsg(DeadLetter("hello" + n, testActor, stasher))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue