2011-12-19 11:07:59 +01:00
|
|
|
/**
|
2013-01-09 01:47:48 +01:00
|
|
|
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
|
2011-12-19 11:07:59 +01:00
|
|
|
*/
|
2012-05-22 11:37:09 +02:00
|
|
|
package docs.dispatcher
|
2011-12-12 21:20:32 +01:00
|
|
|
|
2012-06-28 15:33:49 +02:00
|
|
|
import language.postfixOps
|
|
|
|
|
|
2011-12-12 21:20:32 +01:00
|
|
|
import org.scalatest.{ BeforeAndAfterAll, WordSpec }
|
|
|
|
|
import org.scalatest.matchers.MustMatchers
|
|
|
|
|
import akka.testkit.AkkaSpec
|
|
|
|
|
import akka.event.Logging
|
|
|
|
|
import akka.event.LoggingAdapter
|
2012-09-21 14:50:06 +02:00
|
|
|
import scala.concurrent.duration._
|
2013-03-31 03:04:12 +02:00
|
|
|
import akka.actor._
|
2011-12-12 21:20:32 +01:00
|
|
|
|
|
|
|
|
object DispatcherDocSpec {
|
|
|
|
|
val config = """
|
|
|
|
|
//#my-dispatcher-config
|
|
|
|
|
my-dispatcher {
|
2012-02-07 09:50:03 +01:00
|
|
|
# Dispatcher is the name of the event-based dispatcher
|
|
|
|
|
type = Dispatcher
|
|
|
|
|
# What kind of ExecutionService to use
|
|
|
|
|
executor = "fork-join-executor"
|
|
|
|
|
# Configuration for the fork join pool
|
|
|
|
|
fork-join-executor {
|
|
|
|
|
# Min number of threads to cap factor-based parallelism number to
|
|
|
|
|
parallelism-min = 2
|
|
|
|
|
# Parallelism (threads) ... ceil(available processors * factor)
|
|
|
|
|
parallelism-factor = 2.0
|
|
|
|
|
# Max number of threads to cap factor-based parallelism number to
|
|
|
|
|
parallelism-max = 10
|
|
|
|
|
}
|
2012-02-24 14:28:17 +01:00
|
|
|
# Throughput defines the maximum number of messages to be
|
|
|
|
|
# processed per actor before the thread jumps to the next actor.
|
|
|
|
|
# Set to 1 for as fair as possible.
|
2012-02-07 09:50:03 +01:00
|
|
|
throughput = 100
|
|
|
|
|
}
|
|
|
|
|
//#my-dispatcher-config
|
|
|
|
|
|
|
|
|
|
//#my-thread-pool-dispatcher-config
|
|
|
|
|
my-thread-pool-dispatcher {
|
2011-12-14 15:12:40 +01:00
|
|
|
# Dispatcher is the name of the event-based dispatcher
|
|
|
|
|
type = Dispatcher
|
2012-01-30 16:34:25 +01:00
|
|
|
# What kind of ExecutionService to use
|
|
|
|
|
executor = "thread-pool-executor"
|
|
|
|
|
# Configuration for the thread pool
|
|
|
|
|
thread-pool-executor {
|
|
|
|
|
# minimum number of threads to cap factor-based core number to
|
|
|
|
|
core-pool-size-min = 2
|
|
|
|
|
# No of core threads ... ceil(available processors * factor)
|
|
|
|
|
core-pool-size-factor = 2.0
|
|
|
|
|
# maximum number of threads to cap factor-based number to
|
|
|
|
|
core-pool-size-max = 10
|
|
|
|
|
}
|
2012-02-24 14:28:17 +01:00
|
|
|
# Throughput defines the maximum number of messages to be
|
|
|
|
|
# processed per actor before the thread jumps to the next actor.
|
|
|
|
|
# Set to 1 for as fair as possible.
|
2011-12-14 15:12:40 +01:00
|
|
|
throughput = 100
|
2011-12-12 21:20:32 +01:00
|
|
|
}
|
2012-02-07 09:50:03 +01:00
|
|
|
//#my-thread-pool-dispatcher-config
|
|
|
|
|
|
|
|
|
|
//#my-pinned-dispatcher-config
|
|
|
|
|
my-pinned-dispatcher {
|
|
|
|
|
executor = "thread-pool-executor"
|
|
|
|
|
type = PinnedDispatcher
|
|
|
|
|
}
|
|
|
|
|
//#my-pinned-dispatcher-config
|
2011-12-14 15:12:40 +01:00
|
|
|
|
2011-12-12 21:20:32 +01:00
|
|
|
//#my-bounded-config
|
|
|
|
|
my-dispatcher-bounded-queue {
|
|
|
|
|
type = Dispatcher
|
2012-01-30 16:34:25 +01:00
|
|
|
executor = "thread-pool-executor"
|
|
|
|
|
thread-pool-executor {
|
|
|
|
|
core-pool-size-factor = 8.0
|
|
|
|
|
max-pool-size-factor = 16.0
|
|
|
|
|
}
|
2012-01-03 10:03:58 +01:00
|
|
|
# Specifies the bounded capacity of the mailbox queue
|
|
|
|
|
mailbox-capacity = 100
|
2011-12-12 21:20:32 +01:00
|
|
|
throughput = 3
|
|
|
|
|
}
|
|
|
|
|
//#my-bounded-config
|
2011-12-14 15:12:40 +01:00
|
|
|
|
2011-12-12 21:20:32 +01:00
|
|
|
//#my-balancing-config
|
|
|
|
|
my-balancing-dispatcher {
|
|
|
|
|
type = BalancingDispatcher
|
2012-01-30 16:34:25 +01:00
|
|
|
executor = "thread-pool-executor"
|
|
|
|
|
thread-pool-executor {
|
|
|
|
|
core-pool-size-factor = 8.0
|
|
|
|
|
max-pool-size-factor = 16.0
|
|
|
|
|
}
|
2011-12-12 21:20:32 +01:00
|
|
|
}
|
|
|
|
|
//#my-balancing-config
|
2011-12-21 20:34:46 +01:00
|
|
|
|
|
|
|
|
//#prio-dispatcher-config
|
|
|
|
|
prio-dispatcher {
|
2012-05-22 11:37:09 +02:00
|
|
|
mailbox-type = "docs.dispatcher.DispatcherDocSpec$MyPrioMailbox"
|
2011-12-21 20:34:46 +01:00
|
|
|
}
|
|
|
|
|
//#prio-dispatcher-config
|
|
|
|
|
|
|
|
|
|
//#prio-dispatcher-config-java
|
|
|
|
|
prio-dispatcher-java {
|
2012-05-22 11:37:09 +02:00
|
|
|
mailbox-type = "docs.dispatcher.DispatcherDocTestBase$MyPrioMailbox"
|
2012-02-24 14:28:17 +01:00
|
|
|
//Other dispatcher configuration goes here
|
2011-12-21 20:34:46 +01:00
|
|
|
}
|
|
|
|
|
//#prio-dispatcher-config-java
|
2011-12-12 21:20:32 +01:00
|
|
|
"""
|
|
|
|
|
|
2011-12-21 20:34:46 +01:00
|
|
|
//#prio-mailbox
|
|
|
|
|
import akka.dispatch.PriorityGenerator
|
|
|
|
|
import akka.dispatch.UnboundedPriorityMailbox
|
|
|
|
|
import com.typesafe.config.Config
|
|
|
|
|
|
2012-02-24 14:28:17 +01:00
|
|
|
// We inherit, in this case, from UnboundedPriorityMailbox
|
|
|
|
|
// and seed it with the priority generator
|
2012-10-02 11:09:38 +02:00
|
|
|
class MyPrioMailbox(settings: ActorSystem.Settings, config: Config)
|
|
|
|
|
extends UnboundedPriorityMailbox(
|
|
|
|
|
// Create a new PriorityGenerator, lower prio means more important
|
|
|
|
|
PriorityGenerator {
|
|
|
|
|
// 'highpriority messages should be treated first if possible
|
|
|
|
|
case 'highpriority ⇒ 0
|
|
|
|
|
|
|
|
|
|
// 'lowpriority messages should be treated last if possible
|
|
|
|
|
case 'lowpriority ⇒ 2
|
|
|
|
|
|
|
|
|
|
// PoisonPill when no other left
|
|
|
|
|
case PoisonPill ⇒ 3
|
|
|
|
|
|
|
|
|
|
// We default to 1, which is in between high and low
|
|
|
|
|
case otherwise ⇒ 1
|
|
|
|
|
})
|
2011-12-21 20:34:46 +01:00
|
|
|
//#prio-mailbox
|
|
|
|
|
|
2011-12-12 21:20:32 +01:00
|
|
|
class MyActor extends Actor {
|
|
|
|
|
def receive = {
|
|
|
|
|
case x ⇒
|
|
|
|
|
}
|
|
|
|
|
}
|
2012-02-24 14:28:17 +01:00
|
|
|
|
|
|
|
|
//#mailbox-implementation-example
|
2012-06-19 14:52:02 +02:00
|
|
|
class MyUnboundedMailbox extends akka.dispatch.MailboxType {
|
2012-06-13 17:57:56 +02:00
|
|
|
import akka.actor.{ ActorRef, ActorSystem }
|
2012-02-24 14:28:17 +01:00
|
|
|
import com.typesafe.config.Config
|
|
|
|
|
import java.util.concurrent.ConcurrentLinkedQueue
|
|
|
|
|
import akka.dispatch.{
|
|
|
|
|
Envelope,
|
|
|
|
|
MessageQueue,
|
|
|
|
|
QueueBasedMessageQueue,
|
|
|
|
|
UnboundedMessageQueueSemantics
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// This constructor signature must exist, it will be called by Akka
|
2012-02-26 21:38:56 +01:00
|
|
|
def this(settings: ActorSystem.Settings, config: Config) = this()
|
2012-02-24 14:28:17 +01:00
|
|
|
|
|
|
|
|
// The create method is called to create the MessageQueue
|
2012-10-01 20:35:19 +02:00
|
|
|
final override def create(owner: Option[ActorRef],
|
|
|
|
|
system: Option[ActorSystem]): MessageQueue =
|
2012-02-24 14:28:17 +01:00
|
|
|
new QueueBasedMessageQueue with UnboundedMessageQueueSemantics {
|
|
|
|
|
final val queue = new ConcurrentLinkedQueue[Envelope]()
|
|
|
|
|
}
|
|
|
|
|
}
|
2012-06-19 14:52:02 +02:00
|
|
|
//#mailbox-implementation-example
|
2011-12-12 21:20:32 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
class DispatcherDocSpec extends AkkaSpec(DispatcherDocSpec.config) {
|
|
|
|
|
|
|
|
|
|
import DispatcherDocSpec.MyActor
|
|
|
|
|
|
|
|
|
|
"defining dispatcher" in {
|
2012-02-24 14:28:17 +01:00
|
|
|
val context = system
|
2011-12-12 21:20:32 +01:00
|
|
|
//#defining-dispatcher
|
|
|
|
|
import akka.actor.Props
|
2012-02-24 14:28:17 +01:00
|
|
|
val myActor =
|
|
|
|
|
context.actorOf(Props[MyActor].withDispatcher("my-dispatcher"), "myactor1")
|
2011-12-12 21:20:32 +01:00
|
|
|
//#defining-dispatcher
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
"defining dispatcher with bounded queue" in {
|
2011-12-21 19:07:54 +01:00
|
|
|
val dispatcher = system.dispatchers.lookup("my-dispatcher-bounded-queue")
|
2011-12-12 21:20:32 +01:00
|
|
|
}
|
|
|
|
|
|
2011-12-13 16:18:51 +01:00
|
|
|
"defining pinned dispatcher" in {
|
2012-02-24 14:28:17 +01:00
|
|
|
val context = system
|
2011-12-13 16:18:51 +01:00
|
|
|
//#defining-pinned-dispatcher
|
2012-02-24 14:28:17 +01:00
|
|
|
val myActor =
|
|
|
|
|
context.actorOf(Props[MyActor].withDispatcher("my-pinned-dispatcher"), "myactor2")
|
2011-12-13 16:18:51 +01:00
|
|
|
//#defining-pinned-dispatcher
|
|
|
|
|
}
|
|
|
|
|
|
2012-10-15 21:34:31 +02:00
|
|
|
"looking up a dispatcher" in {
|
|
|
|
|
//#lookup
|
|
|
|
|
// for use with Futures, Scheduler, etc.
|
|
|
|
|
implicit val executionContext = system.dispatchers.lookup("my-dispatcher")
|
|
|
|
|
//#lookup
|
|
|
|
|
}
|
|
|
|
|
|
2011-12-12 21:20:32 +01:00
|
|
|
"defining priority dispatcher" in {
|
|
|
|
|
//#prio-dispatcher
|
|
|
|
|
|
2012-02-24 14:28:17 +01:00
|
|
|
// We create a new Actor that just prints out what it processes
|
|
|
|
|
val a = system.actorOf(
|
2011-12-12 21:20:32 +01:00
|
|
|
Props(new Actor {
|
|
|
|
|
val log: LoggingAdapter = Logging(context.system, this)
|
|
|
|
|
|
|
|
|
|
self ! 'lowpriority
|
|
|
|
|
self ! 'lowpriority
|
|
|
|
|
self ! 'highpriority
|
|
|
|
|
self ! 'pigdog
|
|
|
|
|
self ! 'pigdog2
|
|
|
|
|
self ! 'pigdog3
|
|
|
|
|
self ! 'highpriority
|
|
|
|
|
self ! PoisonPill
|
|
|
|
|
|
|
|
|
|
def receive = {
|
|
|
|
|
case x ⇒ log.info(x.toString)
|
|
|
|
|
}
|
2011-12-21 20:34:46 +01:00
|
|
|
}).withDispatcher("prio-dispatcher"))
|
2011-12-12 21:20:32 +01:00
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
Logs:
|
|
|
|
|
'highpriority
|
|
|
|
|
'highpriority
|
|
|
|
|
'pigdog
|
|
|
|
|
'pigdog2
|
|
|
|
|
'pigdog3
|
|
|
|
|
'lowpriority
|
|
|
|
|
'lowpriority
|
|
|
|
|
*/
|
|
|
|
|
//#prio-dispatcher
|
|
|
|
|
|
2013-03-31 03:04:12 +02:00
|
|
|
watch(a)
|
|
|
|
|
expectMsgPF() { case Terminated(`a`) ⇒ () }
|
2011-12-12 21:20:32 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
"defining balancing dispatcher" in {
|
2011-12-21 19:07:54 +01:00
|
|
|
val dispatcher = system.dispatchers.lookup("my-balancing-dispatcher")
|
2011-12-12 21:20:32 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
}
|