2018-10-29 17:19:37 +08:00
|
|
|
/*
|
2021-01-08 17:55:38 +01:00
|
|
|
* Copyright (C) 2015-2021 Lightbend Inc. <https://www.lightbend.com>
|
2015-01-10 14:44:25 +00:00
|
|
|
*/
|
2018-03-13 23:45:55 +09:00
|
|
|
|
2015-01-10 14:44:25 +00:00
|
|
|
package akka.dispatch
|
|
|
|
|
|
2020-04-27 20:32:18 +08:00
|
|
|
import scala.concurrent.duration._
|
|
|
|
|
|
2015-01-10 14:44:25 +00:00
|
|
|
import com.typesafe.config.Config
|
2020-04-27 20:32:18 +08:00
|
|
|
import language.postfixOps
|
|
|
|
|
|
2019-03-11 10:38:24 +01:00
|
|
|
import akka.actor.{ Actor, ActorSystem, Props }
|
|
|
|
|
import akka.testkit.{ AkkaSpec, DefaultTimeout }
|
2019-04-15 09:54:16 +01:00
|
|
|
import akka.util.unused
|
|
|
|
|
|
2015-01-10 14:44:25 +00:00
|
|
|
object StablePriorityDispatcherSpec {
|
2019-05-24 08:11:50 +02:00
|
|
|
case object Result
|
|
|
|
|
|
2015-01-10 14:44:25 +00:00
|
|
|
val config = """
|
|
|
|
|
unbounded-stable-prio-dispatcher {
|
|
|
|
|
mailbox-type = "akka.dispatch.StablePriorityDispatcherSpec$Unbounded"
|
|
|
|
|
}
|
|
|
|
|
bounded-stable-prio-dispatcher {
|
|
|
|
|
mailbox-type = "akka.dispatch.StablePriorityDispatcherSpec$Bounded"
|
|
|
|
|
}
|
|
|
|
|
"""
|
|
|
|
|
|
2019-04-15 09:54:16 +01:00
|
|
|
class Unbounded(@unused settings: ActorSystem.Settings, @unused config: Config)
|
2019-03-11 10:38:24 +01:00
|
|
|
extends UnboundedStablePriorityMailbox(PriorityGenerator({
|
|
|
|
|
case i: Int if i <= 100 => i // Small integers have high priority
|
2019-04-15 09:54:16 +01:00
|
|
|
case _: Int => 101 // Don't care for other integers
|
2019-05-24 08:11:50 +02:00
|
|
|
case Result => Int.MaxValue
|
2019-03-11 10:38:24 +01:00
|
|
|
}: Any => Int))
|
2015-01-10 14:44:25 +00:00
|
|
|
|
2019-04-15 09:54:16 +01:00
|
|
|
class Bounded(@unused settings: ActorSystem.Settings, @unused config: Config)
|
2019-03-11 10:38:24 +01:00
|
|
|
extends BoundedStablePriorityMailbox(PriorityGenerator({
|
|
|
|
|
case i: Int if i <= 100 => i // Small integers have high priority
|
2019-04-15 09:54:16 +01:00
|
|
|
case _: Int => 101 // Don't care for other integers
|
2019-05-24 08:11:50 +02:00
|
|
|
case Result => Int.MaxValue
|
2019-03-11 10:38:24 +01:00
|
|
|
}: Any => Int), 1000, 10 seconds)
|
2015-01-10 14:44:25 +00:00
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
class StablePriorityDispatcherSpec extends AkkaSpec(StablePriorityDispatcherSpec.config) with DefaultTimeout {
|
2019-05-24 08:11:50 +02:00
|
|
|
import StablePriorityDispatcherSpec._
|
2015-01-10 14:44:25 +00:00
|
|
|
|
|
|
|
|
"A StablePriorityDispatcher" must {
|
|
|
|
|
"Order its messages according to the specified comparator while preserving FIFO for equal priority messages, " +
|
2019-03-11 10:38:24 +01:00
|
|
|
"using an unbounded mailbox" in {
|
|
|
|
|
val dispatcherKey = "unbounded-stable-prio-dispatcher"
|
|
|
|
|
testOrdering(dispatcherKey)
|
|
|
|
|
}
|
2015-01-10 14:44:25 +00:00
|
|
|
|
|
|
|
|
"Order its messages according to the specified comparator while preserving FIFO for equal priority messages, " +
|
2019-03-11 10:38:24 +01:00
|
|
|
"using a bounded mailbox" in {
|
|
|
|
|
val dispatcherKey = "bounded-stable-prio-dispatcher"
|
|
|
|
|
testOrdering(dispatcherKey)
|
|
|
|
|
}
|
2015-01-10 14:44:25 +00:00
|
|
|
|
2018-07-25 20:38:27 +09:00
|
|
|
def testOrdering(dispatcherKey: String): Unit = {
|
2015-01-10 14:44:25 +00:00
|
|
|
val msgs = (1 to 200) toList
|
|
|
|
|
val shuffled = scala.util.Random.shuffle(msgs)
|
|
|
|
|
|
|
|
|
|
// It's important that the actor under test is not a top level actor
|
|
|
|
|
// with RepointableActorRef, since messages might be queued in
|
|
|
|
|
// UnstartedCell and then sent to the StablePriorityQueue and consumed immediately
|
|
|
|
|
// without the ordering taking place.
|
2019-04-15 09:54:16 +01:00
|
|
|
system.actorOf(Props(new Actor {
|
2015-01-10 14:44:25 +00:00
|
|
|
context.actorOf(Props(new Actor {
|
|
|
|
|
|
|
|
|
|
val acc = scala.collection.mutable.ListBuffer[Int]()
|
|
|
|
|
|
2019-03-11 10:38:24 +01:00
|
|
|
shuffled.foreach { m =>
|
|
|
|
|
self ! m
|
|
|
|
|
}
|
2015-01-10 14:44:25 +00:00
|
|
|
|
2019-05-24 08:11:50 +02:00
|
|
|
self.tell(Result, testActor)
|
2015-01-10 14:44:25 +00:00
|
|
|
|
|
|
|
|
def receive = {
|
2019-05-24 08:11:50 +02:00
|
|
|
case i: Int => acc += i
|
|
|
|
|
case Result => sender() ! acc.toList
|
2015-01-10 14:44:25 +00:00
|
|
|
}
|
|
|
|
|
}).withDispatcher(dispatcherKey))
|
|
|
|
|
|
|
|
|
|
def receive = Actor.emptyBehavior
|
|
|
|
|
|
|
|
|
|
}))
|
|
|
|
|
|
|
|
|
|
// Low messages should come out first, and in priority order. High messages follow - they are equal priority and
|
|
|
|
|
// should come out in the same order in which they were sent.
|
|
|
|
|
val lo = (1 to 100) toList
|
2019-03-11 10:38:24 +01:00
|
|
|
val hi = shuffled.filter { _ > 100 }
|
2015-01-16 11:09:59 +01:00
|
|
|
expectMsgType[List[Int]] should ===(lo ++ hi)
|
2015-01-10 14:44:25 +00:00
|
|
|
}
|
|
|
|
|
}
|
2015-01-16 11:09:59 +01:00
|
|
|
}
|