Renaming EBEDD to Dispatcher, EBEDWSD to BalancingDispatcher, ThreadBasedDispatcher to PinnedDispatcher and PEBEDD to PriorityDispatcher, closing ticket #784
This commit is contained in:
parent
1f024f1c9e
commit
3181905fed
28 changed files with 156 additions and 156 deletions
|
|
@ -264,7 +264,7 @@ class TypedActorSpec extends WordSpec with MustMatchers with BeforeAndAfterEach
|
||||||
"be able to use work-stealing dispatcher" in {
|
"be able to use work-stealing dispatcher" in {
|
||||||
val config = Configuration(
|
val config = Configuration(
|
||||||
Duration(6600, "ms"),
|
Duration(6600, "ms"),
|
||||||
Dispatchers.newExecutorBasedEventDrivenWorkStealingDispatcher("pooled-dispatcher")
|
Dispatchers.newBalancingDispatcher("pooled-dispatcher")
|
||||||
.withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity
|
.withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity
|
||||||
.setCorePoolSize(60)
|
.setCorePoolSize(60)
|
||||||
.setMaxPoolSize(60)
|
.setMaxPoolSize(60)
|
||||||
|
|
|
||||||
|
|
@ -16,7 +16,7 @@ class SupervisorMiscSpec extends WordSpec with MustMatchers {
|
||||||
val countDownLatch = new CountDownLatch(4)
|
val countDownLatch = new CountDownLatch(4)
|
||||||
|
|
||||||
val actor1 = Actor.actorOf(new Actor {
|
val actor1 = Actor.actorOf(new Actor {
|
||||||
self.dispatcher = Dispatchers.newThreadBasedDispatcher(self)
|
self.dispatcher = Dispatchers.newPinnedDispatcher(self)
|
||||||
override def postRestart(cause: Throwable) { countDownLatch.countDown() }
|
override def postRestart(cause: Throwable) { countDownLatch.countDown() }
|
||||||
|
|
||||||
protected def receive = {
|
protected def receive = {
|
||||||
|
|
@ -26,7 +26,7 @@ class SupervisorMiscSpec extends WordSpec with MustMatchers {
|
||||||
}).start()
|
}).start()
|
||||||
|
|
||||||
val actor2 = Actor.actorOf(new Actor {
|
val actor2 = Actor.actorOf(new Actor {
|
||||||
self.dispatcher = Dispatchers.newThreadBasedDispatcher(self)
|
self.dispatcher = Dispatchers.newPinnedDispatcher(self)
|
||||||
override def postRestart(cause: Throwable) { countDownLatch.countDown() }
|
override def postRestart(cause: Throwable) { countDownLatch.countDown() }
|
||||||
|
|
||||||
protected def receive = {
|
protected def receive = {
|
||||||
|
|
@ -36,7 +36,7 @@ class SupervisorMiscSpec extends WordSpec with MustMatchers {
|
||||||
}).start()
|
}).start()
|
||||||
|
|
||||||
val actor3 = Actor.actorOf(new Actor {
|
val actor3 = Actor.actorOf(new Actor {
|
||||||
self.dispatcher = Dispatchers.newExecutorBasedEventDrivenDispatcher("test").build
|
self.dispatcher = Dispatchers.newDispatcher("test").build
|
||||||
override def postRestart(cause: Throwable) { countDownLatch.countDown() }
|
override def postRestart(cause: Throwable) { countDownLatch.countDown() }
|
||||||
|
|
||||||
protected def receive = {
|
protected def receive = {
|
||||||
|
|
@ -46,7 +46,7 @@ class SupervisorMiscSpec extends WordSpec with MustMatchers {
|
||||||
}).start()
|
}).start()
|
||||||
|
|
||||||
val actor4 = Actor.actorOf(new Actor {
|
val actor4 = Actor.actorOf(new Actor {
|
||||||
self.dispatcher = Dispatchers.newThreadBasedDispatcher(self)
|
self.dispatcher = Dispatchers.newPinnedDispatcher(self)
|
||||||
override def postRestart(cause: Throwable) { countDownLatch.countDown() }
|
override def postRestart(cause: Throwable) { countDownLatch.countDown() }
|
||||||
|
|
||||||
protected def receive = {
|
protected def receive = {
|
||||||
|
|
|
||||||
|
|
@ -20,7 +20,7 @@ class ConfigSpec extends WordSpec with MustMatchers {
|
||||||
getString("akka.time-unit") must equal(Some("seconds"))
|
getString("akka.time-unit") must equal(Some("seconds"))
|
||||||
getString("akka.version") must equal(Some("2.0-SNAPSHOT"))
|
getString("akka.version") must equal(Some("2.0-SNAPSHOT"))
|
||||||
|
|
||||||
getString("akka.actor.default-dispatcher.type") must equal(Some("GlobalExecutorBasedEventDriven"))
|
getString("akka.actor.default-dispatcher.type") must equal(Some("GlobalDispatcher"))
|
||||||
getInt("akka.actor.default-dispatcher.keep-alive-time") must equal(Some(60))
|
getInt("akka.actor.default-dispatcher.keep-alive-time") must equal(Some(60))
|
||||||
getDouble("akka.actor.default-dispatcher.core-pool-size-factor") must equal(Some(1.0))
|
getDouble("akka.actor.default-dispatcher.core-pool-size-factor") must equal(Some(1.0))
|
||||||
getDouble("akka.actor.default-dispatcher.max-pool-size-factor") must equal(Some(4.0))
|
getDouble("akka.actor.default-dispatcher.max-pool-size-factor") must equal(Some(4.0))
|
||||||
|
|
|
||||||
|
|
@ -344,12 +344,12 @@ abstract class ActorModelSpec extends JUnitSuite {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
class ExecutorBasedEventDrivenDispatcherModelTest extends ActorModelSpec {
|
class DispatcherModelTest extends ActorModelSpec {
|
||||||
def newInterceptedDispatcher =
|
def newInterceptedDispatcher =
|
||||||
new ExecutorBasedEventDrivenDispatcher("foo") with MessageDispatcherInterceptor
|
new Dispatcher("foo") with MessageDispatcherInterceptor
|
||||||
}
|
}
|
||||||
|
|
||||||
class ExecutorBasedEventDrivenWorkStealingDispatcherModelTest extends ActorModelSpec {
|
class BalancingDispatcherModelTest extends ActorModelSpec {
|
||||||
def newInterceptedDispatcher =
|
def newInterceptedDispatcher =
|
||||||
new ExecutorBasedEventDrivenWorkStealingDispatcher("foo") with MessageDispatcherInterceptor
|
new BalancingDispatcher("foo") with MessageDispatcherInterceptor
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -21,15 +21,15 @@ object DispatchersSpec {
|
||||||
val executorbounds = "executor-bounds"
|
val executorbounds = "executor-bounds"
|
||||||
val allowcoretimeout = "allow-core-timeout"
|
val allowcoretimeout = "allow-core-timeout"
|
||||||
val rejectionpolicy = "rejection-policy" // abort, caller-runs, discard-oldest, discard
|
val rejectionpolicy = "rejection-policy" // abort, caller-runs, discard-oldest, discard
|
||||||
val throughput = "throughput" // Throughput for ExecutorBasedEventDrivenDispatcher
|
val throughput = "throughput" // Throughput for Dispatcher
|
||||||
|
|
||||||
def instance(dispatcher: MessageDispatcher): (MessageDispatcher) ⇒ Boolean = _ == dispatcher
|
def instance(dispatcher: MessageDispatcher): (MessageDispatcher) ⇒ Boolean = _ == dispatcher
|
||||||
def ofType[T <: MessageDispatcher: Manifest]: (MessageDispatcher) ⇒ Boolean = _.getClass == manifest[T].erasure
|
def ofType[T <: MessageDispatcher: Manifest]: (MessageDispatcher) ⇒ Boolean = _.getClass == manifest[T].erasure
|
||||||
|
|
||||||
def typesAndValidators: Map[String, (MessageDispatcher) ⇒ Boolean] = Map(
|
def typesAndValidators: Map[String, (MessageDispatcher) ⇒ Boolean] = Map(
|
||||||
"ExecutorBasedEventDrivenWorkStealing" -> ofType[ExecutorBasedEventDrivenWorkStealingDispatcher],
|
"BalancingDispatcher" -> ofType[BalancingDispatcher],
|
||||||
"ExecutorBasedEventDriven" -> ofType[ExecutorBasedEventDrivenDispatcher],
|
"Dispatcher" -> ofType[Dispatcher],
|
||||||
"GlobalExecutorBasedEventDriven" -> instance(globalExecutorBasedEventDrivenDispatcher))
|
"GlobalDispatcher" -> instance(globalDispatcher))
|
||||||
|
|
||||||
def validTypes = typesAndValidators.keys.toList
|
def validTypes = typesAndValidators.keys.toList
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -3,14 +3,14 @@ package akka.actor.dispatch
|
||||||
import java.util.concurrent.{ CountDownLatch, TimeUnit }
|
import java.util.concurrent.{ CountDownLatch, TimeUnit }
|
||||||
import org.scalatest.junit.JUnitSuite
|
import org.scalatest.junit.JUnitSuite
|
||||||
import org.junit.Test
|
import org.junit.Test
|
||||||
import akka.dispatch.{ Dispatchers, ExecutorBasedEventDrivenDispatcher }
|
import akka.dispatch.{ Dispatchers, Dispatcher }
|
||||||
import akka.actor.Actor
|
import akka.actor.Actor
|
||||||
import Actor._
|
import Actor._
|
||||||
import java.util.concurrent.atomic.{ AtomicBoolean, AtomicInteger }
|
import java.util.concurrent.atomic.{ AtomicBoolean, AtomicInteger }
|
||||||
|
|
||||||
object ExecutorBasedEventDrivenDispatcherActorSpec {
|
object DispatcherActorSpec {
|
||||||
class TestActor extends Actor {
|
class TestActor extends Actor {
|
||||||
self.dispatcher = Dispatchers.newExecutorBasedEventDrivenDispatcher(self.uuid.toString).build
|
self.dispatcher = Dispatchers.newDispatcher(self.uuid.toString).build
|
||||||
def receive = {
|
def receive = {
|
||||||
case "Hello" ⇒
|
case "Hello" ⇒
|
||||||
self.reply("World")
|
self.reply("World")
|
||||||
|
|
@ -23,14 +23,14 @@ object ExecutorBasedEventDrivenDispatcherActorSpec {
|
||||||
val oneWay = new CountDownLatch(1)
|
val oneWay = new CountDownLatch(1)
|
||||||
}
|
}
|
||||||
class OneWayTestActor extends Actor {
|
class OneWayTestActor extends Actor {
|
||||||
self.dispatcher = Dispatchers.newExecutorBasedEventDrivenDispatcher(self.uuid.toString).build
|
self.dispatcher = Dispatchers.newDispatcher(self.uuid.toString).build
|
||||||
def receive = {
|
def receive = {
|
||||||
case "OneWay" ⇒ OneWayTestActor.oneWay.countDown()
|
case "OneWay" ⇒ OneWayTestActor.oneWay.countDown()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
class ExecutorBasedEventDrivenDispatcherActorSpec extends JUnitSuite {
|
class DispatcherActorSpec extends JUnitSuite {
|
||||||
import ExecutorBasedEventDrivenDispatcherActorSpec._
|
import DispatcherActorSpec._
|
||||||
|
|
||||||
private val unit = TimeUnit.MILLISECONDS
|
private val unit = TimeUnit.MILLISECONDS
|
||||||
|
|
||||||
|
|
@ -74,7 +74,7 @@ class ExecutorBasedEventDrivenDispatcherActorSpec extends JUnitSuite {
|
||||||
@Test
|
@Test
|
||||||
def shouldRespectThroughput {
|
def shouldRespectThroughput {
|
||||||
val throughputDispatcher = Dispatchers.
|
val throughputDispatcher = Dispatchers.
|
||||||
newExecutorBasedEventDrivenDispatcher("THROUGHPUT", 101, 0, Dispatchers.MAILBOX_TYPE).
|
newDispatcher("THROUGHPUT", 101, 0, Dispatchers.MAILBOX_TYPE).
|
||||||
setCorePoolSize(1).
|
setCorePoolSize(1).
|
||||||
build
|
build
|
||||||
|
|
||||||
|
|
@ -110,7 +110,7 @@ class ExecutorBasedEventDrivenDispatcherActorSpec extends JUnitSuite {
|
||||||
def shouldRespectThroughputDeadline {
|
def shouldRespectThroughputDeadline {
|
||||||
val deadlineMs = 100
|
val deadlineMs = 100
|
||||||
val throughputDispatcher = Dispatchers.
|
val throughputDispatcher = Dispatchers.
|
||||||
newExecutorBasedEventDrivenDispatcher("THROUGHPUT", 2, deadlineMs, Dispatchers.MAILBOX_TYPE).
|
newDispatcher("THROUGHPUT", 2, deadlineMs, Dispatchers.MAILBOX_TYPE).
|
||||||
setCorePoolSize(1).
|
setCorePoolSize(1).
|
||||||
build
|
build
|
||||||
val works = new AtomicBoolean(true)
|
val works = new AtomicBoolean(true)
|
||||||
|
|
|
||||||
|
|
@ -12,7 +12,7 @@ import Actor._
|
||||||
*
|
*
|
||||||
* @author Jan Van Besien
|
* @author Jan Van Besien
|
||||||
*/
|
*/
|
||||||
class ExecutorBasedEventDrivenDispatcherActorsSpec extends JUnitSuite with MustMatchers {
|
class DispatcherActorsSpec extends JUnitSuite with MustMatchers {
|
||||||
class SlowActor(finishedCounter: CountDownLatch) extends Actor {
|
class SlowActor(finishedCounter: CountDownLatch) extends Actor {
|
||||||
|
|
||||||
def receive = {
|
def receive = {
|
||||||
|
|
|
||||||
|
|
@ -10,9 +10,9 @@ import akka.actor.{ IllegalActorStateException, Actor }
|
||||||
import Actor._
|
import Actor._
|
||||||
import akka.dispatch.{ MessageQueue, Dispatchers }
|
import akka.dispatch.{ MessageQueue, Dispatchers }
|
||||||
|
|
||||||
object ExecutorBasedEventDrivenWorkStealingDispatcherSpec {
|
object BalancingDispatcherSpec {
|
||||||
|
|
||||||
def newWorkStealer() = Dispatchers.newExecutorBasedEventDrivenWorkStealingDispatcher("pooled-dispatcher", 1).build
|
def newWorkStealer() = Dispatchers.newBalancingDispatcher("pooled-dispatcher", 1).build
|
||||||
|
|
||||||
val delayableActorDispatcher, sharedActorDispatcher, parentActorDispatcher = newWorkStealer()
|
val delayableActorDispatcher, sharedActorDispatcher, parentActorDispatcher = newWorkStealer()
|
||||||
|
|
||||||
|
|
@ -52,8 +52,8 @@ object ExecutorBasedEventDrivenWorkStealingDispatcherSpec {
|
||||||
/**
|
/**
|
||||||
* @author Jan Van Besien
|
* @author Jan Van Besien
|
||||||
*/
|
*/
|
||||||
class ExecutorBasedEventDrivenWorkStealingDispatcherSpec extends JUnitSuite with MustMatchers {
|
class BalancingDispatcherSpec extends JUnitSuite with MustMatchers {
|
||||||
import ExecutorBasedEventDrivenWorkStealingDispatcherSpec._
|
import BalancingDispatcherSpec._
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
def fastActorShouldStealWorkFromSlowActor {
|
def fastActorShouldStealWorkFromSlowActor {
|
||||||
|
|
|
||||||
|
|
@ -8,7 +8,7 @@ import java.util.concurrent.CountDownLatch
|
||||||
|
|
||||||
class PriorityDispatcherSpec extends WordSpec with MustMatchers {
|
class PriorityDispatcherSpec extends WordSpec with MustMatchers {
|
||||||
|
|
||||||
"A PriorityExecutorBasedEventDrivenDispatcher" must {
|
"A PriorityDispatcher" must {
|
||||||
"Order it's messages according to the specified comparator using an unbounded mailbox" in {
|
"Order it's messages according to the specified comparator using an unbounded mailbox" in {
|
||||||
testOrdering(UnboundedMailbox())
|
testOrdering(UnboundedMailbox())
|
||||||
}
|
}
|
||||||
|
|
@ -19,7 +19,7 @@ class PriorityDispatcherSpec extends WordSpec with MustMatchers {
|
||||||
}
|
}
|
||||||
|
|
||||||
def testOrdering(mboxType: MailboxType) {
|
def testOrdering(mboxType: MailboxType) {
|
||||||
val dispatcher = new PriorityExecutorBasedEventDrivenDispatcher("Test",
|
val dispatcher = new PriorityDispatcher("Test",
|
||||||
PriorityGenerator({
|
PriorityGenerator({
|
||||||
case i: Int ⇒ i //Reverse order
|
case i: Int ⇒ i //Reverse order
|
||||||
case 'Result ⇒ Int.MaxValue
|
case 'Result ⇒ Int.MaxValue
|
||||||
|
|
|
||||||
|
|
@ -10,7 +10,7 @@ import Actor._
|
||||||
|
|
||||||
object ThreadBasedActorSpec {
|
object ThreadBasedActorSpec {
|
||||||
class TestActor extends Actor {
|
class TestActor extends Actor {
|
||||||
self.dispatcher = Dispatchers.newThreadBasedDispatcher(self)
|
self.dispatcher = Dispatchers.newPinnedDispatcher(self)
|
||||||
|
|
||||||
def receive = {
|
def receive = {
|
||||||
case "Hello" ⇒
|
case "Hello" ⇒
|
||||||
|
|
@ -30,7 +30,7 @@ class ThreadBasedActorSpec extends JUnitSuite {
|
||||||
def shouldSendOneWay {
|
def shouldSendOneWay {
|
||||||
var oneWay = new CountDownLatch(1)
|
var oneWay = new CountDownLatch(1)
|
||||||
val actor = actorOf(new Actor {
|
val actor = actorOf(new Actor {
|
||||||
self.dispatcher = Dispatchers.newThreadBasedDispatcher(self)
|
self.dispatcher = Dispatchers.newPinnedDispatcher(self)
|
||||||
def receive = {
|
def receive = {
|
||||||
case "OneWay" ⇒ oneWay.countDown()
|
case "OneWay" ⇒ oneWay.countDown()
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -160,7 +160,7 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] { scal
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Akka Java API. <p/>
|
* Akka Java API. <p/>
|
||||||
* The default dispatcher is the <tt>Dispatchers.globalExecutorBasedEventDrivenDispatcher</tt>.
|
* The default dispatcher is the <tt>Dispatchers.globalDispatcher</tt>.
|
||||||
* This means that all actors will share the same event-driven executor based dispatcher.
|
* This means that all actors will share the same event-driven executor based dispatcher.
|
||||||
* <p/>
|
* <p/>
|
||||||
* You can override it so it fits the specific use-case that the actor is used for.
|
* You can override it so it fits the specific use-case that the actor is used for.
|
||||||
|
|
|
||||||
|
|
@ -19,18 +19,18 @@ import util.DynamicVariable
|
||||||
* The preferred way of creating dispatchers is to use
|
* The preferred way of creating dispatchers is to use
|
||||||
* the {@link akka.dispatch.Dispatchers} factory object.
|
* the {@link akka.dispatch.Dispatchers} factory object.
|
||||||
*
|
*
|
||||||
* @see akka.dispatch.ExecutorBasedEventDrivenWorkStealingDispatcher
|
* @see akka.dispatch.BalancingDispatcher
|
||||||
* @see akka.dispatch.Dispatchers
|
* @see akka.dispatch.Dispatchers
|
||||||
*
|
*
|
||||||
* @author Viktor Klang
|
* @author Viktor Klang
|
||||||
*/
|
*/
|
||||||
class ExecutorBasedEventDrivenWorkStealingDispatcher(
|
class BalancingDispatcher(
|
||||||
_name: String,
|
_name: String,
|
||||||
throughput: Int = Dispatchers.THROUGHPUT,
|
throughput: Int = Dispatchers.THROUGHPUT,
|
||||||
throughputDeadlineTime: Int = Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS,
|
throughputDeadlineTime: Int = Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS,
|
||||||
mailboxType: MailboxType = Dispatchers.MAILBOX_TYPE,
|
mailboxType: MailboxType = Dispatchers.MAILBOX_TYPE,
|
||||||
config: ThreadPoolConfig = ThreadPoolConfig())
|
config: ThreadPoolConfig = ThreadPoolConfig())
|
||||||
extends ExecutorBasedEventDrivenDispatcher(_name, throughput, throughputDeadlineTime, mailboxType, config) {
|
extends Dispatcher(_name, throughput, throughputDeadlineTime, mailboxType, config) {
|
||||||
|
|
||||||
def this(_name: String, throughput: Int, throughputDeadlineTime: Int, mailboxType: MailboxType) =
|
def this(_name: String, throughput: Int, throughputDeadlineTime: Int, mailboxType: MailboxType) =
|
||||||
this(_name, throughput, throughputDeadlineTime, mailboxType, ThreadPoolConfig()) // Needed for Java API usage
|
this(_name, throughput, throughputDeadlineTime, mailboxType, ThreadPoolConfig()) // Needed for Java API usage
|
||||||
|
|
@ -28,7 +28,7 @@ import java.util.concurrent.{ TimeUnit, ExecutorService, RejectedExecutionExcept
|
||||||
* <p/>
|
* <p/>
|
||||||
* Example usage:
|
* Example usage:
|
||||||
* <pre/>
|
* <pre/>
|
||||||
* val dispatcher = new ExecutorBasedEventDrivenDispatcher("name")
|
* val dispatcher = new Dispatcher("name")
|
||||||
* dispatcher
|
* dispatcher
|
||||||
* .withNewThreadPoolWithBoundedBlockingQueue(100)
|
* .withNewThreadPoolWithBoundedBlockingQueue(100)
|
||||||
* .setCorePoolSize(16)
|
* .setCorePoolSize(16)
|
||||||
|
|
@ -43,7 +43,7 @@ import java.util.concurrent.{ TimeUnit, ExecutorService, RejectedExecutionExcept
|
||||||
* <p/>
|
* <p/>
|
||||||
* Example usage:
|
* Example usage:
|
||||||
* <pre/>
|
* <pre/>
|
||||||
* ExecutorBasedEventDrivenDispatcher dispatcher = new ExecutorBasedEventDrivenDispatcher("name");
|
* Dispatcher dispatcher = new Dispatcher("name");
|
||||||
* dispatcher
|
* dispatcher
|
||||||
* .withNewThreadPoolWithBoundedBlockingQueue(100)
|
* .withNewThreadPoolWithBoundedBlockingQueue(100)
|
||||||
* .setCorePoolSize(16)
|
* .setCorePoolSize(16)
|
||||||
|
|
@ -63,7 +63,7 @@ import java.util.concurrent.{ TimeUnit, ExecutorService, RejectedExecutionExcept
|
||||||
* always continues until the mailbox is empty.
|
* always continues until the mailbox is empty.
|
||||||
* Larger values (or zero or negative) increase throughput, smaller values increase fairness
|
* Larger values (or zero or negative) increase throughput, smaller values increase fairness
|
||||||
*/
|
*/
|
||||||
class ExecutorBasedEventDrivenDispatcher(
|
class Dispatcher(
|
||||||
_name: String,
|
_name: String,
|
||||||
val throughput: Int = Dispatchers.THROUGHPUT,
|
val throughput: Int = Dispatchers.THROUGHPUT,
|
||||||
val throughputDeadlineTime: Int = Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS,
|
val throughputDeadlineTime: Int = Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS,
|
||||||
|
|
@ -117,7 +117,7 @@ class ExecutorBasedEventDrivenDispatcher(
|
||||||
case b: UnboundedMailbox ⇒
|
case b: UnboundedMailbox ⇒
|
||||||
new ConcurrentLinkedQueue[MessageInvocation] with MessageQueue with ExecutableMailbox {
|
new ConcurrentLinkedQueue[MessageInvocation] with MessageQueue with ExecutableMailbox {
|
||||||
@inline
|
@inline
|
||||||
final def dispatcher = ExecutorBasedEventDrivenDispatcher.this
|
final def dispatcher = Dispatcher.this
|
||||||
@inline
|
@inline
|
||||||
final def enqueue(m: MessageInvocation) = this.add(m)
|
final def enqueue(m: MessageInvocation) = this.add(m)
|
||||||
@inline
|
@inline
|
||||||
|
|
@ -126,7 +126,7 @@ class ExecutorBasedEventDrivenDispatcher(
|
||||||
case b: BoundedMailbox ⇒
|
case b: BoundedMailbox ⇒
|
||||||
new DefaultBoundedMessageQueue(b.capacity, b.pushTimeOut) with ExecutableMailbox {
|
new DefaultBoundedMessageQueue(b.capacity, b.pushTimeOut) with ExecutableMailbox {
|
||||||
@inline
|
@inline
|
||||||
final def dispatcher = ExecutorBasedEventDrivenDispatcher.this
|
final def dispatcher = Dispatcher.this
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -173,11 +173,11 @@ class ExecutorBasedEventDrivenDispatcher(
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This is the behavior of an ExecutorBasedEventDrivenDispatchers mailbox.
|
* This is the behavior of an Dispatchers mailbox.
|
||||||
*/
|
*/
|
||||||
trait ExecutableMailbox extends Runnable { self: MessageQueue ⇒
|
trait ExecutableMailbox extends Runnable { self: MessageQueue ⇒
|
||||||
|
|
||||||
def dispatcher: ExecutorBasedEventDrivenDispatcher
|
def dispatcher: Dispatcher
|
||||||
|
|
||||||
final def run = {
|
final def run = {
|
||||||
try {
|
try {
|
||||||
|
|
@ -237,7 +237,7 @@ object PriorityGenerator {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A PriorityGenerator is a convenience API to create a Comparator that orders the messages of a
|
* A PriorityGenerator is a convenience API to create a Comparator that orders the messages of a
|
||||||
* PriorityExecutorBasedEventDrivenDispatcher
|
* PriorityDispatcher
|
||||||
*/
|
*/
|
||||||
abstract class PriorityGenerator extends java.util.Comparator[MessageInvocation] {
|
abstract class PriorityGenerator extends java.util.Comparator[MessageInvocation] {
|
||||||
def gen(message: Any): Int
|
def gen(message: Any): Int
|
||||||
|
|
@ -247,18 +247,18 @@ abstract class PriorityGenerator extends java.util.Comparator[MessageInvocation]
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A version of ExecutorBasedEventDrivenDispatcher that gives all actors registered to it a priority mailbox,
|
* A version of Dispatcher that gives all actors registered to it a priority mailbox,
|
||||||
* prioritized according to the supplied comparator.
|
* prioritized according to the supplied comparator.
|
||||||
*
|
*
|
||||||
* The dispatcher will process the messages with the _lowest_ priority first.
|
* The dispatcher will process the messages with the _lowest_ priority first.
|
||||||
*/
|
*/
|
||||||
class PriorityExecutorBasedEventDrivenDispatcher(
|
class PriorityDispatcher(
|
||||||
name: String,
|
name: String,
|
||||||
val comparator: java.util.Comparator[MessageInvocation],
|
val comparator: java.util.Comparator[MessageInvocation],
|
||||||
throughput: Int = Dispatchers.THROUGHPUT,
|
throughput: Int = Dispatchers.THROUGHPUT,
|
||||||
throughputDeadlineTime: Int = Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS,
|
throughputDeadlineTime: Int = Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS,
|
||||||
mailboxType: MailboxType = Dispatchers.MAILBOX_TYPE,
|
mailboxType: MailboxType = Dispatchers.MAILBOX_TYPE,
|
||||||
config: ThreadPoolConfig = ThreadPoolConfig()) extends ExecutorBasedEventDrivenDispatcher(name, throughput, throughputDeadlineTime, mailboxType, config) with PriorityMailbox {
|
config: ThreadPoolConfig = ThreadPoolConfig()) extends Dispatcher(name, throughput, throughputDeadlineTime, mailboxType, config) with PriorityMailbox {
|
||||||
|
|
||||||
def this(name: String, comparator: java.util.Comparator[MessageInvocation], throughput: Int, throughputDeadlineTime: Int, mailboxType: MailboxType) =
|
def this(name: String, comparator: java.util.Comparator[MessageInvocation], throughput: Int, throughputDeadlineTime: Int, mailboxType: MailboxType) =
|
||||||
this(name, comparator, throughput, throughputDeadlineTime, mailboxType, ThreadPoolConfig()) // Needed for Java API usage
|
this(name, comparator, throughput, throughputDeadlineTime, mailboxType, ThreadPoolConfig()) // Needed for Java API usage
|
||||||
|
|
@ -277,14 +277,14 @@ class PriorityExecutorBasedEventDrivenDispatcher(
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Can be used to give an ExecutorBasedEventDrivenDispatcher's actors priority-enabled mailboxes
|
* Can be used to give an Dispatcher's actors priority-enabled mailboxes
|
||||||
*
|
*
|
||||||
* Usage:
|
* Usage:
|
||||||
* new ExecutorBasedEventDrivenDispatcher(...) with PriorityMailbox {
|
* new Dispatcher(...) with PriorityMailbox {
|
||||||
* val comparator = ...comparator that determines mailbox priority ordering...
|
* val comparator = ...comparator that determines mailbox priority ordering...
|
||||||
* }
|
* }
|
||||||
*/
|
*/
|
||||||
trait PriorityMailbox { self: ExecutorBasedEventDrivenDispatcher ⇒
|
trait PriorityMailbox { self: Dispatcher ⇒
|
||||||
def comparator: java.util.Comparator[MessageInvocation]
|
def comparator: java.util.Comparator[MessageInvocation]
|
||||||
|
|
||||||
override def createMailbox(actorRef: ActorRef): AnyRef = self.mailboxType match {
|
override def createMailbox(actorRef: ActorRef): AnyRef = self.mailboxType match {
|
||||||
|
|
@ -18,7 +18,7 @@ import java.util.concurrent.TimeUnit
|
||||||
* <p/>
|
* <p/>
|
||||||
* Example usage:
|
* Example usage:
|
||||||
* <pre/>
|
* <pre/>
|
||||||
* val dispatcher = Dispatchers.newExecutorBasedEventDrivenDispatcher("name")
|
* val dispatcher = Dispatchers.newDispatcher("name")
|
||||||
* dispatcher
|
* dispatcher
|
||||||
* .withNewThreadPoolWithLinkedBlockingQueueWithCapacity(100)
|
* .withNewThreadPoolWithLinkedBlockingQueueWithCapacity(100)
|
||||||
* .setCorePoolSize(16)
|
* .setCorePoolSize(16)
|
||||||
|
|
@ -32,7 +32,7 @@ import java.util.concurrent.TimeUnit
|
||||||
* <p/>
|
* <p/>
|
||||||
* Example usage:
|
* Example usage:
|
||||||
* <pre/>
|
* <pre/>
|
||||||
* MessageDispatcher dispatcher = Dispatchers.newExecutorBasedEventDrivenDispatcher("name");
|
* MessageDispatcher dispatcher = Dispatchers.newDispatcher("name");
|
||||||
* dispatcher
|
* dispatcher
|
||||||
* .withNewThreadPoolWithLinkedBlockingQueueWithCapacity(100)
|
* .withNewThreadPoolWithLinkedBlockingQueueWithCapacity(100)
|
||||||
* .setCorePoolSize(16)
|
* .setCorePoolSize(16)
|
||||||
|
|
@ -57,10 +57,10 @@ object Dispatchers {
|
||||||
val MAILBOX_TYPE: MailboxType = if (MAILBOX_CAPACITY < 1) UnboundedMailbox() else BoundedMailbox()
|
val MAILBOX_TYPE: MailboxType = if (MAILBOX_CAPACITY < 1) UnboundedMailbox() else BoundedMailbox()
|
||||||
|
|
||||||
lazy val defaultGlobalDispatcher = {
|
lazy val defaultGlobalDispatcher = {
|
||||||
config.getSection("akka.actor.default-dispatcher").flatMap(from).getOrElse(globalExecutorBasedEventDrivenDispatcher)
|
config.getSection("akka.actor.default-dispatcher").flatMap(from).getOrElse(globalDispatcher)
|
||||||
}
|
}
|
||||||
|
|
||||||
object globalExecutorBasedEventDrivenDispatcher extends ExecutorBasedEventDrivenDispatcher("global", THROUGHPUT, THROUGHPUT_DEADLINE_TIME_MILLIS, MAILBOX_TYPE)
|
object globalDispatcher extends Dispatcher("global", THROUGHPUT, THROUGHPUT_DEADLINE_TIME_MILLIS, MAILBOX_TYPE)
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates an thread based dispatcher serving a single actor through the same single thread.
|
* Creates an thread based dispatcher serving a single actor through the same single thread.
|
||||||
|
|
@ -68,9 +68,9 @@ object Dispatchers {
|
||||||
* <p/>
|
* <p/>
|
||||||
* E.g. each actor consumes its own thread.
|
* E.g. each actor consumes its own thread.
|
||||||
*/
|
*/
|
||||||
def newThreadBasedDispatcher(actor: ActorRef) = actor match {
|
def newPinnedDispatcher(actor: ActorRef) = actor match {
|
||||||
case null ⇒ new ThreadBasedDispatcher()
|
case null ⇒ new PinnedDispatcher()
|
||||||
case some ⇒ new ThreadBasedDispatcher(some)
|
case some ⇒ new PinnedDispatcher(some)
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -79,9 +79,9 @@ object Dispatchers {
|
||||||
* <p/>
|
* <p/>
|
||||||
* E.g. each actor consumes its own thread.
|
* E.g. each actor consumes its own thread.
|
||||||
*/
|
*/
|
||||||
def newThreadBasedDispatcher(actor: ActorRef, mailboxType: MailboxType) = actor match {
|
def newPinnedDispatcher(actor: ActorRef, mailboxType: MailboxType) = actor match {
|
||||||
case null ⇒ new ThreadBasedDispatcher(mailboxType)
|
case null ⇒ new PinnedDispatcher(mailboxType)
|
||||||
case some ⇒ new ThreadBasedDispatcher(some, mailboxType)
|
case some ⇒ new PinnedDispatcher(some, mailboxType)
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -89,77 +89,77 @@ object Dispatchers {
|
||||||
* <p/>
|
* <p/>
|
||||||
* E.g. each actor consumes its own thread.
|
* E.g. each actor consumes its own thread.
|
||||||
*/
|
*/
|
||||||
def newThreadBasedDispatcher(name: String, mailboxType: MailboxType) =
|
def newPinnedDispatcher(name: String, mailboxType: MailboxType) =
|
||||||
new ThreadBasedDispatcher(name, mailboxType)
|
new PinnedDispatcher(name, mailboxType)
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates an thread based dispatcher serving a single actor through the same single thread.
|
* Creates an thread based dispatcher serving a single actor through the same single thread.
|
||||||
* <p/>
|
* <p/>
|
||||||
* E.g. each actor consumes its own thread.
|
* E.g. each actor consumes its own thread.
|
||||||
*/
|
*/
|
||||||
def newThreadBasedDispatcher(name: String) =
|
def newPinnedDispatcher(name: String) =
|
||||||
new ThreadBasedDispatcher(name)
|
new PinnedDispatcher(name)
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates a executor-based event-driven dispatcher serving multiple (millions) of actors through a thread pool.
|
* Creates a executor-based event-driven dispatcher serving multiple (millions) of actors through a thread pool.
|
||||||
* <p/>
|
* <p/>
|
||||||
* Has a fluent builder interface for configuring its semantics.
|
* Has a fluent builder interface for configuring its semantics.
|
||||||
*/
|
*/
|
||||||
def newExecutorBasedEventDrivenDispatcher(name: String) =
|
def newDispatcher(name: String) =
|
||||||
ThreadPoolConfigDispatcherBuilder(config ⇒ new ExecutorBasedEventDrivenDispatcher(name, config), ThreadPoolConfig())
|
ThreadPoolConfigDispatcherBuilder(config ⇒ new Dispatcher(name, config), ThreadPoolConfig())
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates a executor-based event-driven dispatcher serving multiple (millions) of actors through a thread pool.
|
* Creates a executor-based event-driven dispatcher serving multiple (millions) of actors through a thread pool.
|
||||||
* <p/>
|
* <p/>
|
||||||
* Has a fluent builder interface for configuring its semantics.
|
* Has a fluent builder interface for configuring its semantics.
|
||||||
*/
|
*/
|
||||||
def newExecutorBasedEventDrivenDispatcher(name: String, throughput: Int, mailboxType: MailboxType) =
|
def newDispatcher(name: String, throughput: Int, mailboxType: MailboxType) =
|
||||||
ThreadPoolConfigDispatcherBuilder(config ⇒
|
ThreadPoolConfigDispatcherBuilder(config ⇒
|
||||||
new ExecutorBasedEventDrivenDispatcher(name, throughput, THROUGHPUT_DEADLINE_TIME_MILLIS, mailboxType, config), ThreadPoolConfig())
|
new Dispatcher(name, throughput, THROUGHPUT_DEADLINE_TIME_MILLIS, mailboxType, config), ThreadPoolConfig())
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates a executor-based event-driven dispatcher serving multiple (millions) of actors through a thread pool.
|
* Creates a executor-based event-driven dispatcher serving multiple (millions) of actors through a thread pool.
|
||||||
* <p/>
|
* <p/>
|
||||||
* Has a fluent builder interface for configuring its semantics.
|
* Has a fluent builder interface for configuring its semantics.
|
||||||
*/
|
*/
|
||||||
def newExecutorBasedEventDrivenDispatcher(name: String, throughput: Int, throughputDeadlineMs: Int, mailboxType: MailboxType) =
|
def newDispatcher(name: String, throughput: Int, throughputDeadlineMs: Int, mailboxType: MailboxType) =
|
||||||
ThreadPoolConfigDispatcherBuilder(config ⇒
|
ThreadPoolConfigDispatcherBuilder(config ⇒
|
||||||
new ExecutorBasedEventDrivenDispatcher(name, throughput, throughputDeadlineMs, mailboxType, config), ThreadPoolConfig())
|
new Dispatcher(name, throughput, throughputDeadlineMs, mailboxType, config), ThreadPoolConfig())
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates a executor-based event-driven dispatcher, with work-stealing, serving multiple (millions) of actors through a thread pool.
|
* Creates a executor-based event-driven dispatcher, with work-stealing, serving multiple (millions) of actors through a thread pool.
|
||||||
* <p/>
|
* <p/>
|
||||||
* Has a fluent builder interface for configuring its semantics.
|
* Has a fluent builder interface for configuring its semantics.
|
||||||
*/
|
*/
|
||||||
def newExecutorBasedEventDrivenWorkStealingDispatcher(name: String) =
|
def newBalancingDispatcher(name: String) =
|
||||||
ThreadPoolConfigDispatcherBuilder(config ⇒ new ExecutorBasedEventDrivenWorkStealingDispatcher(name, config), ThreadPoolConfig())
|
ThreadPoolConfigDispatcherBuilder(config ⇒ new BalancingDispatcher(name, config), ThreadPoolConfig())
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates a executor-based event-driven dispatcher, with work-stealing, serving multiple (millions) of actors through a thread pool.
|
* Creates a executor-based event-driven dispatcher, with work-stealing, serving multiple (millions) of actors through a thread pool.
|
||||||
* <p/>
|
* <p/>
|
||||||
* Has a fluent builder interface for configuring its semantics.
|
* Has a fluent builder interface for configuring its semantics.
|
||||||
*/
|
*/
|
||||||
def newExecutorBasedEventDrivenWorkStealingDispatcher(name: String, throughput: Int) =
|
def newBalancingDispatcher(name: String, throughput: Int) =
|
||||||
ThreadPoolConfigDispatcherBuilder(config ⇒
|
ThreadPoolConfigDispatcherBuilder(config ⇒
|
||||||
new ExecutorBasedEventDrivenWorkStealingDispatcher(name, throughput, THROUGHPUT_DEADLINE_TIME_MILLIS, MAILBOX_TYPE, config), ThreadPoolConfig())
|
new BalancingDispatcher(name, throughput, THROUGHPUT_DEADLINE_TIME_MILLIS, MAILBOX_TYPE, config), ThreadPoolConfig())
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates a executor-based event-driven dispatcher, with work-stealing, serving multiple (millions) of actors through a thread pool.
|
* Creates a executor-based event-driven dispatcher, with work-stealing, serving multiple (millions) of actors through a thread pool.
|
||||||
* <p/>
|
* <p/>
|
||||||
* Has a fluent builder interface for configuring its semantics.
|
* Has a fluent builder interface for configuring its semantics.
|
||||||
*/
|
*/
|
||||||
def newExecutorBasedEventDrivenWorkStealingDispatcher(name: String, throughput: Int, mailboxType: MailboxType) =
|
def newBalancingDispatcher(name: String, throughput: Int, mailboxType: MailboxType) =
|
||||||
ThreadPoolConfigDispatcherBuilder(config ⇒
|
ThreadPoolConfigDispatcherBuilder(config ⇒
|
||||||
new ExecutorBasedEventDrivenWorkStealingDispatcher(name, throughput, THROUGHPUT_DEADLINE_TIME_MILLIS, mailboxType, config), ThreadPoolConfig())
|
new BalancingDispatcher(name, throughput, THROUGHPUT_DEADLINE_TIME_MILLIS, mailboxType, config), ThreadPoolConfig())
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates a executor-based event-driven dispatcher, with work-stealing, serving multiple (millions) of actors through a thread pool.
|
* Creates a executor-based event-driven dispatcher, with work-stealing, serving multiple (millions) of actors through a thread pool.
|
||||||
* <p/>
|
* <p/>
|
||||||
* Has a fluent builder interface for configuring its semantics.
|
* Has a fluent builder interface for configuring its semantics.
|
||||||
*/
|
*/
|
||||||
def newExecutorBasedEventDrivenWorkStealingDispatcher(name: String, throughput: Int, throughputDeadlineMs: Int, mailboxType: MailboxType) =
|
def newBalancingDispatcher(name: String, throughput: Int, throughputDeadlineMs: Int, mailboxType: MailboxType) =
|
||||||
ThreadPoolConfigDispatcherBuilder(config ⇒
|
ThreadPoolConfigDispatcherBuilder(config ⇒
|
||||||
new ExecutorBasedEventDrivenWorkStealingDispatcher(name, throughput, throughputDeadlineMs, mailboxType, config), ThreadPoolConfig())
|
new BalancingDispatcher(name, throughput, throughputDeadlineMs, mailboxType, config), ThreadPoolConfig())
|
||||||
/**
|
/**
|
||||||
* Utility function that tries to load the specified dispatcher config from the akka.conf
|
* Utility function that tries to load the specified dispatcher config from the akka.conf
|
||||||
* or else use the supplied default dispatcher
|
* or else use the supplied default dispatcher
|
||||||
|
|
@ -181,7 +181,7 @@ object Dispatchers {
|
||||||
* executor-bounds = -1 # Makes the Executor bounded, -1 is unbounded
|
* executor-bounds = -1 # Makes the Executor bounded, -1 is unbounded
|
||||||
* allow-core-timeout = on # Allow core threads to time out
|
* allow-core-timeout = on # Allow core threads to time out
|
||||||
* rejection-policy = "caller-runs" # abort, caller-runs, discard-oldest, discard
|
* rejection-policy = "caller-runs" # abort, caller-runs, discard-oldest, discard
|
||||||
* throughput = 5 # Throughput for ExecutorBasedEventDrivenDispatcher
|
* throughput = 5 # Throughput for Dispatcher
|
||||||
* }
|
* }
|
||||||
* ex: from(config.getConfigMap(identifier).get)
|
* ex: from(config.getConfigMap(identifier).get)
|
||||||
*
|
*
|
||||||
|
|
@ -192,9 +192,9 @@ object Dispatchers {
|
||||||
*/
|
*/
|
||||||
def from(cfg: Configuration): Option[MessageDispatcher] = {
|
def from(cfg: Configuration): Option[MessageDispatcher] = {
|
||||||
cfg.getString("type") map {
|
cfg.getString("type") map {
|
||||||
case "ExecutorBasedEventDriven" ⇒ new ExecutorBasedEventDrivenDispatcherConfigurator()
|
case "Dispatcher" ⇒ new DispatcherConfigurator()
|
||||||
case "ExecutorBasedEventDrivenWorkStealing" ⇒ new ExecutorBasedEventDrivenWorkStealingDispatcherConfigurator()
|
case "BalancingDispatcher" ⇒ new BalancingDispatcherConfigurator()
|
||||||
case "GlobalExecutorBasedEventDriven" ⇒ GlobalExecutorBasedEventDrivenDispatcherConfigurator
|
case "GlobalDispatcher" ⇒ GlobalDispatcherConfigurator
|
||||||
case fqn ⇒
|
case fqn ⇒
|
||||||
ReflectiveAccess.getClassFor[MessageDispatcherConfigurator](fqn) match {
|
ReflectiveAccess.getClassFor[MessageDispatcherConfigurator](fqn) match {
|
||||||
case r: Right[_, Class[MessageDispatcherConfigurator]] ⇒
|
case r: Right[_, Class[MessageDispatcherConfigurator]] ⇒
|
||||||
|
|
@ -212,13 +212,13 @@ object Dispatchers {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
object GlobalExecutorBasedEventDrivenDispatcherConfigurator extends MessageDispatcherConfigurator {
|
object GlobalDispatcherConfigurator extends MessageDispatcherConfigurator {
|
||||||
def configure(config: Configuration): MessageDispatcher = Dispatchers.globalExecutorBasedEventDrivenDispatcher
|
def configure(config: Configuration): MessageDispatcher = Dispatchers.globalDispatcher
|
||||||
}
|
}
|
||||||
|
|
||||||
class ExecutorBasedEventDrivenDispatcherConfigurator extends MessageDispatcherConfigurator {
|
class DispatcherConfigurator extends MessageDispatcherConfigurator {
|
||||||
def configure(config: Configuration): MessageDispatcher = {
|
def configure(config: Configuration): MessageDispatcher = {
|
||||||
configureThreadPool(config, threadPoolConfig ⇒ new ExecutorBasedEventDrivenDispatcher(
|
configureThreadPool(config, threadPoolConfig ⇒ new Dispatcher(
|
||||||
config.getString("name", newUuid.toString),
|
config.getString("name", newUuid.toString),
|
||||||
config.getInt("throughput", Dispatchers.THROUGHPUT),
|
config.getInt("throughput", Dispatchers.THROUGHPUT),
|
||||||
config.getInt("throughput-deadline-time", Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS),
|
config.getInt("throughput-deadline-time", Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS),
|
||||||
|
|
@ -227,9 +227,9 @@ class ExecutorBasedEventDrivenDispatcherConfigurator extends MessageDispatcherCo
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
class ExecutorBasedEventDrivenWorkStealingDispatcherConfigurator extends MessageDispatcherConfigurator {
|
class BalancingDispatcherConfigurator extends MessageDispatcherConfigurator {
|
||||||
def configure(config: Configuration): MessageDispatcher = {
|
def configure(config: Configuration): MessageDispatcher = {
|
||||||
configureThreadPool(config, threadPoolConfig ⇒ new ExecutorBasedEventDrivenWorkStealingDispatcher(
|
configureThreadPool(config, threadPoolConfig ⇒ new BalancingDispatcher(
|
||||||
config.getString("name", newUuid.toString),
|
config.getString("name", newUuid.toString),
|
||||||
config.getInt("throughput", Dispatchers.THROUGHPUT),
|
config.getInt("throughput", Dispatchers.THROUGHPUT),
|
||||||
config.getInt("throughput-deadline-time", Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS),
|
config.getInt("throughput-deadline-time", Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS),
|
||||||
|
|
|
||||||
|
|
@ -14,9 +14,9 @@ import akka.actor.{ Actor, ActorRef }
|
||||||
*
|
*
|
||||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||||
*/
|
*/
|
||||||
class ThreadBasedDispatcher(_actor: ActorRef, _name: String, _mailboxType: MailboxType)
|
class PinnedDispatcher(_actor: ActorRef, _name: String, _mailboxType: MailboxType)
|
||||||
extends ExecutorBasedEventDrivenDispatcher(
|
extends Dispatcher(
|
||||||
_name, Dispatchers.THROUGHPUT, -1, _mailboxType, ThreadBasedDispatcher.oneThread) {
|
_name, Dispatchers.THROUGHPUT, -1, _mailboxType, PinnedDispatcher.oneThread) {
|
||||||
|
|
||||||
def this(_name: String, _mailboxType: MailboxType) = this(null, _name, _mailboxType)
|
def this(_name: String, _mailboxType: MailboxType) = this(null, _name, _mailboxType)
|
||||||
|
|
||||||
|
|
@ -47,7 +47,7 @@ class ThreadBasedDispatcher(_actor: ActorRef, _name: String, _mailboxType: Mailb
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
object ThreadBasedDispatcher {
|
object PinnedDispatcher {
|
||||||
val oneThread: ThreadPoolConfig = ThreadPoolConfig(allowCorePoolTimeout = true, corePoolSize = 1, maxPoolSize = 1)
|
val oneThread: ThreadPoolConfig = ThreadPoolConfig(allowCorePoolTimeout = true, corePoolSize = 1, maxPoolSize = 1)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -89,7 +89,7 @@ object EventHandler extends ListenerManagement {
|
||||||
|
|
||||||
class EventHandlerException extends AkkaException
|
class EventHandlerException extends AkkaException
|
||||||
|
|
||||||
lazy val EventHandlerDispatcher = Dispatchers.newExecutorBasedEventDrivenDispatcher("event:handler").build
|
lazy val EventHandlerDispatcher = Dispatchers.newDispatcher("event:handler").build
|
||||||
|
|
||||||
implicit object defaultListenerFormat extends StatelessActorFormat[DefaultListener]
|
implicit object defaultListenerFormat extends StatelessActorFormat[DefaultListener]
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -1580,7 +1580,7 @@ object RemoteClusterDaemon {
|
||||||
val ADDRESS = "akka-cluster-daemon".intern
|
val ADDRESS = "akka-cluster-daemon".intern
|
||||||
|
|
||||||
// FIXME configure functionServerDispatcher to what?
|
// FIXME configure functionServerDispatcher to what?
|
||||||
val functionServerDispatcher = Dispatchers.newExecutorBasedEventDrivenDispatcher("akka:cloud:cluster:function:server").build
|
val functionServerDispatcher = Dispatchers.newDispatcher("akka:cloud:cluster:function:server").build
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -1591,7 +1591,7 @@ class RemoteClusterDaemon(cluster: ClusterNode) extends Actor {
|
||||||
import RemoteClusterDaemon._
|
import RemoteClusterDaemon._
|
||||||
import Cluster._
|
import Cluster._
|
||||||
|
|
||||||
self.dispatcher = Dispatchers.newThreadBasedDispatcher(self)
|
self.dispatcher = Dispatchers.newPinnedDispatcher(self)
|
||||||
|
|
||||||
def receive: Receive = {
|
def receive: Receive = {
|
||||||
case message: RemoteDaemonMessageProtocol ⇒
|
case message: RemoteDaemonMessageProtocol ⇒
|
||||||
|
|
|
||||||
|
|
@ -59,13 +59,13 @@ actor is oblivious to which type of mailbox it is using. Here is an example::
|
||||||
|
|
||||||
or for a thread-based durable dispatcher::
|
or for a thread-based durable dispatcher::
|
||||||
|
|
||||||
self.dispatcher = DurableThreadBasedDispatcher(
|
self.dispatcher = DurablePinnedDispatcher(
|
||||||
self,
|
self,
|
||||||
FileDurableMailboxStorage)
|
FileDurableMailboxStorage)
|
||||||
|
|
||||||
There are 2 different durable dispatchers, ``DurableEventBasedDispatcher`` and
|
There are 2 different durable dispatchers, ``DurableEventBasedDispatcher`` and
|
||||||
``DurableThreadBasedDispatcher``, which are durable versions of
|
``DurablePinnedDispatcher``, which are durable versions of
|
||||||
``ExecutorBasedEventDrivenDispatcher`` and ``ThreadBasedDispatcher``.
|
``Dispatcher`` and ``PinnedDispatcher``.
|
||||||
|
|
||||||
This gives you an excellent way of creating bulkheads in your application, where
|
This gives you an excellent way of creating bulkheads in your application, where
|
||||||
groups of actors sharing the same dispatcher also share the same backing
|
groups of actors sharing the same dispatcher also share the same backing
|
||||||
|
|
@ -120,7 +120,7 @@ Here is an example of how you can configure your dispatcher to use this mailbox:
|
||||||
|
|
||||||
or for a thread-based durable dispatcher::
|
or for a thread-based durable dispatcher::
|
||||||
|
|
||||||
self.dispatcher = DurableThreadBasedDispatcher(
|
self.dispatcher = DurablePinnedDispatcher(
|
||||||
self,
|
self,
|
||||||
RedisDurableMailboxStorage)
|
RedisDurableMailboxStorage)
|
||||||
|
|
||||||
|
|
@ -164,7 +164,7 @@ Here is an example of how you can configure your dispatcher to use this mailbox:
|
||||||
|
|
||||||
or for a thread-based durable dispatcher::
|
or for a thread-based durable dispatcher::
|
||||||
|
|
||||||
self.dispatcher = DurableThreadBasedDispatcher(
|
self.dispatcher = DurablePinnedDispatcher(
|
||||||
self,
|
self,
|
||||||
ZooKeeperDurableMailboxStorage)
|
ZooKeeperDurableMailboxStorage)
|
||||||
|
|
||||||
|
|
@ -202,7 +202,7 @@ Beanstalk documentation on how to do that. ::
|
||||||
|
|
||||||
or for a thread-based durable dispatcher. ::
|
or for a thread-based durable dispatcher. ::
|
||||||
|
|
||||||
self.dispatcher = DurableThreadBasedDispatcher(
|
self.dispatcher = DurablePinnedDispatcher(
|
||||||
self,
|
self,
|
||||||
BeanstalkDurableMailboxStorage)
|
BeanstalkDurableMailboxStorage)
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -58,7 +58,7 @@ A custom ``akka.conf`` might look like this:
|
||||||
"sample.myservice.Boot"]
|
"sample.myservice.Boot"]
|
||||||
|
|
||||||
actor {
|
actor {
|
||||||
throughput = 10 # Throughput for ExecutorBasedEventDrivenDispatcher, set to 1 for complete fairness
|
throughput = 10 # Throughput for Dispatcher, set to 1 for complete fairness
|
||||||
}
|
}
|
||||||
|
|
||||||
remote {
|
remote {
|
||||||
|
|
|
||||||
|
|
@ -18,7 +18,7 @@ The event-based Actors currently consume ~600 bytes per Actor which means that y
|
||||||
Default dispatcher
|
Default dispatcher
|
||||||
------------------
|
------------------
|
||||||
|
|
||||||
For most scenarios the default settings are the best. Here we have one single event-based dispatcher for all Actors created. The dispatcher used is globalExecutorBasedEventDrivenDispatcher in akka.dispatch.Dispatchers.
|
For most scenarios the default settings are the best. Here we have one single event-based dispatcher for all Actors created. The dispatcher used is globalDispatcher in akka.dispatch.Dispatchers.
|
||||||
|
|
||||||
But if you feel that you are starting to contend on the single dispatcher (the 'Executor' and its queue) or want to group a specific set of Actors for a dedicated dispatcher for better flexibility and configurability then you can override the defaults and define your own dispatcher. See below for details on which ones are available and how they can be configured.
|
But if you feel that you are starting to contend on the single dispatcher (the 'Executor' and its queue) or want to group a specific set of Actors for a dedicated dispatcher for better flexibility and configurability then you can override the defaults and define your own dispatcher. See below for details on which ones are available and how they can be configured.
|
||||||
|
|
||||||
|
|
@ -59,11 +59,11 @@ Let's now walk through the different dispatchers in more detail.
|
||||||
Thread-based
|
Thread-based
|
||||||
^^^^^^^^^^^^
|
^^^^^^^^^^^^
|
||||||
|
|
||||||
The 'ThreadBasedDispatcher' binds a dedicated OS thread to each specific Actor. The messages are posted to a 'LinkedBlockingQueue' which feeds the messages to the dispatcher one by one. A 'ThreadBasedDispatcher' cannot be shared between actors. This dispatcher has worse performance and scalability than the event-based dispatcher but works great for creating "daemon" Actors that consumes a low frequency of messages and are allowed to go off and do their own thing for a longer period of time. Another advantage with this dispatcher is that Actors do not block threads for each other.
|
The 'PinnedDispatcher' binds a dedicated OS thread to each specific Actor. The messages are posted to a 'LinkedBlockingQueue' which feeds the messages to the dispatcher one by one. A 'PinnedDispatcher' cannot be shared between actors. This dispatcher has worse performance and scalability than the event-based dispatcher but works great for creating "daemon" Actors that consumes a low frequency of messages and are allowed to go off and do their own thing for a longer period of time. Another advantage with this dispatcher is that Actors do not block threads for each other.
|
||||||
|
|
||||||
.. code-block:: java
|
.. code-block:: java
|
||||||
|
|
||||||
Dispatcher dispatcher = Dispatchers.newThreadBasedDispatcher(actorRef);
|
Dispatcher dispatcher = Dispatchers.newPinnedDispatcher(actorRef);
|
||||||
|
|
||||||
It would normally by used from within the actor like this:
|
It would normally by used from within the actor like this:
|
||||||
|
|
||||||
|
|
@ -71,7 +71,7 @@ It would normally by used from within the actor like this:
|
||||||
|
|
||||||
class MyActor extends UntypedActor {
|
class MyActor extends UntypedActor {
|
||||||
public MyActor() {
|
public MyActor() {
|
||||||
getContext().setDispatcher(Dispatchers.newThreadBasedDispatcher(getContext()));
|
getContext().setDispatcher(Dispatchers.newPinnedDispatcher(getContext()));
|
||||||
}
|
}
|
||||||
...
|
...
|
||||||
}
|
}
|
||||||
|
|
@ -79,7 +79,7 @@ It would normally by used from within the actor like this:
|
||||||
Event-based
|
Event-based
|
||||||
^^^^^^^^^^^
|
^^^^^^^^^^^
|
||||||
|
|
||||||
The 'ExecutorBasedEventDrivenDispatcher' binds a set of Actors to a thread pool backed up by a 'BlockingQueue'. This dispatcher is highly configurable and supports a fluent configuration API to configure the 'BlockingQueue' (type of queue, max items etc.) as well as the thread pool.
|
The 'Dispatcher' binds a set of Actors to a thread pool backed up by a 'BlockingQueue'. This dispatcher is highly configurable and supports a fluent configuration API to configure the 'BlockingQueue' (type of queue, max items etc.) as well as the thread pool.
|
||||||
|
|
||||||
The event-driven dispatchers **must be shared** between multiple Typed Actors and/or Actors. One best practice is to let each top-level Actor, e.g. the Actors you define in the declarative supervisor config, to get their own dispatcher but reuse the dispatcher for each new Actor that the top-level Actor creates. But you can also share dispatcher between multiple top-level Actors. This is very use-case specific and needs to be tried out on a case by case basis. The important thing is that Akka tries to provide you with the freedom you need to design and implement your system in the most efficient way in regards to performance, throughput and latency.
|
The event-driven dispatchers **must be shared** between multiple Typed Actors and/or Actors. One best practice is to let each top-level Actor, e.g. the Actors you define in the declarative supervisor config, to get their own dispatcher but reuse the dispatcher for each new Actor that the top-level Actor creates. But you can also share dispatcher between multiple top-level Actors. This is very use-case specific and needs to be tried out on a case by case basis. The important thing is that Akka tries to provide you with the freedom you need to design and implement your system in the most efficient way in regards to performance, throughput and latency.
|
||||||
|
|
||||||
|
|
@ -109,7 +109,7 @@ Here is an example:
|
||||||
|
|
||||||
class MyActor extends UntypedActor {
|
class MyActor extends UntypedActor {
|
||||||
public MyActor() {
|
public MyActor() {
|
||||||
getContext().setDispatcher(Dispatchers.newExecutorBasedEventDrivenDispatcher(name)
|
getContext().setDispatcher(Dispatchers.newDispatcher(name)
|
||||||
.withNewThreadPoolWithLinkedBlockingQueueWithCapacity(100)
|
.withNewThreadPoolWithLinkedBlockingQueueWithCapacity(100)
|
||||||
.setCorePoolSize(16)
|
.setCorePoolSize(16)
|
||||||
.setMaxPoolSize(128)
|
.setMaxPoolSize(128)
|
||||||
|
|
@ -120,7 +120,7 @@ Here is an example:
|
||||||
...
|
...
|
||||||
}
|
}
|
||||||
|
|
||||||
This 'ExecutorBasedEventDrivenDispatcher' allows you to define the 'throughput' it should have. This defines the number of messages for a specific Actor the dispatcher should process in one single sweep.
|
This 'Dispatcher' allows you to define the 'throughput' it should have. This defines the number of messages for a specific Actor the dispatcher should process in one single sweep.
|
||||||
Setting this to a higher number will increase throughput but lower fairness, and vice versa. If you don't specify it explicitly then it uses the default value defined in the 'akka.conf' configuration file:
|
Setting this to a higher number will increase throughput but lower fairness, and vice versa. If you don't specify it explicitly then it uses the default value defined in the 'akka.conf' configuration file:
|
||||||
|
|
||||||
.. code-block:: xml
|
.. code-block:: xml
|
||||||
|
|
@ -136,10 +136,10 @@ Browse the :ref:`scaladoc` or look at the code for all the options available.
|
||||||
Priority event-based
|
Priority event-based
|
||||||
^^^^^^^^^^^^^^^^^^^^
|
^^^^^^^^^^^^^^^^^^^^
|
||||||
|
|
||||||
Sometimes it's useful to be able to specify priority order of messages, that is done by using PriorityExecutorBasedEventDrivenDispatcher and supply
|
Sometimes it's useful to be able to specify priority order of messages, that is done by using PriorityDispatcher and supply
|
||||||
a java.util.Comparator[MessageInvocation] or use a akka.dispatch.PriorityGenerator (recommended):
|
a java.util.Comparator[MessageInvocation] or use a akka.dispatch.PriorityGenerator (recommended):
|
||||||
|
|
||||||
Creating a PriorityExecutorBasedEventDrivenDispatcher using PriorityGenerator:
|
Creating a PriorityDispatcher using PriorityGenerator:
|
||||||
|
|
||||||
.. code-block:: java
|
.. code-block:: java
|
||||||
|
|
||||||
|
|
@ -168,7 +168,7 @@ Creating a PriorityExecutorBasedEventDrivenDispatcher using PriorityGenerator:
|
||||||
// We create an instance of the actor that will print out the messages it processes
|
// We create an instance of the actor that will print out the messages it processes
|
||||||
ActorRef ref = Actors.actorOf(MyActor.class);
|
ActorRef ref = Actors.actorOf(MyActor.class);
|
||||||
// We create a new Priority dispatcher and seed it with the priority generator
|
// We create a new Priority dispatcher and seed it with the priority generator
|
||||||
ref.setDispatcher(new PriorityExecutorBasedEventDrivenDispatcher("foo", gen));
|
ref.setDispatcher(new PriorityDispatcher("foo", gen));
|
||||||
|
|
||||||
ref.start(); // Start the actor
|
ref.start(); // Start the actor
|
||||||
ref.getDispatcher().suspend(ref); // Suspending the actor so it doesn't start to treat the messages before we have enqueued all of them :-)
|
ref.getDispatcher().suspend(ref); // Suspending the actor so it doesn't start to treat the messages before we have enqueued all of them :-)
|
||||||
|
|
@ -196,14 +196,14 @@ lowpriority
|
||||||
Work-stealing event-based
|
Work-stealing event-based
|
||||||
^^^^^^^^^^^^^^^^^^^^^^^^^
|
^^^^^^^^^^^^^^^^^^^^^^^^^
|
||||||
|
|
||||||
The 'ExecutorBasedEventDrivenWorkStealingDispatcher' is a variation of the 'ExecutorBasedEventDrivenDispatcher' in which Actors of the same type can be set up to share this dispatcher and during execution time the different actors will steal messages from other actors if they have less messages to process. This can be a great way to improve throughput at the cost of a little higher latency.
|
The 'BalancingDispatcher' is a variation of the 'Dispatcher' in which Actors of the same type can be set up to share this dispatcher and during execution time the different actors will steal messages from other actors if they have less messages to process. This can be a great way to improve throughput at the cost of a little higher latency.
|
||||||
|
|
||||||
Normally the way you use it is to define a static field to hold the dispatcher and then set in in the Actor explicitly.
|
Normally the way you use it is to define a static field to hold the dispatcher and then set in in the Actor explicitly.
|
||||||
|
|
||||||
.. code-block:: java
|
.. code-block:: java
|
||||||
|
|
||||||
class MyActor extends UntypedActor {
|
class MyActor extends UntypedActor {
|
||||||
public static MessageDispatcher dispatcher = Dispatchers.newExecutorBasedEventDrivenWorkStealingDispatcher(name).build();
|
public static MessageDispatcher dispatcher = Dispatchers.newBalancingDispatcher(name).build();
|
||||||
|
|
||||||
public MyActor() {
|
public MyActor() {
|
||||||
getContext().setDispatcher(dispatcher);
|
getContext().setDispatcher(dispatcher);
|
||||||
|
|
@ -236,7 +236,7 @@ Per-instance based configuration
|
||||||
|
|
||||||
You can also do it on a specific dispatcher instance.
|
You can also do it on a specific dispatcher instance.
|
||||||
|
|
||||||
For the 'ExecutorBasedEventDrivenDispatcher' and the 'ExecutorBasedWorkStealingDispatcher' you can do it through their constructor
|
For the 'Dispatcher' and the 'ExecutorBasedWorkStealingDispatcher' you can do it through their constructor
|
||||||
|
|
||||||
.. code-block:: java
|
.. code-block:: java
|
||||||
|
|
||||||
|
|
@ -246,13 +246,13 @@ For the 'ExecutorBasedEventDrivenDispatcher' and the 'ExecutorBasedWorkStealingD
|
||||||
Duration pushTimeout = new FiniteDuration(10, TimeUnit.SECONDS);
|
Duration pushTimeout = new FiniteDuration(10, TimeUnit.SECONDS);
|
||||||
MailboxType mailboxCapacity = new BoundedMailbox(false, capacity, pushTimeout);
|
MailboxType mailboxCapacity = new BoundedMailbox(false, capacity, pushTimeout);
|
||||||
MessageDispatcher dispatcher =
|
MessageDispatcher dispatcher =
|
||||||
Dispatchers.newExecutorBasedEventDrivenDispatcher(name, throughput, mailboxCapacity).build();
|
Dispatchers.newDispatcher(name, throughput, mailboxCapacity).build();
|
||||||
getContext().setDispatcher(dispatcher);
|
getContext().setDispatcher(dispatcher);
|
||||||
}
|
}
|
||||||
...
|
...
|
||||||
}
|
}
|
||||||
|
|
||||||
For the 'ThreadBasedDispatcher', it is non-shareable between actors, and associates a dedicated Thread with the actor.
|
For the 'PinnedDispatcher', it is non-shareable between actors, and associates a dedicated Thread with the actor.
|
||||||
Making it bounded (by specifying a capacity) is optional, but if you do, you need to provide a pushTimeout (default is 10 seconds). When trying to send a message to the Actor it will throw a MessageQueueAppendFailedException("BlockingMessageTransferQueue transfer timed out") if the message cannot be added to the mailbox within the time specified by the pushTimeout.
|
Making it bounded (by specifying a capacity) is optional, but if you do, you need to provide a pushTimeout (default is 10 seconds). When trying to send a message to the Actor it will throw a MessageQueueAppendFailedException("BlockingMessageTransferQueue transfer timed out") if the message cannot be added to the mailbox within the time specified by the pushTimeout.
|
||||||
|
|
||||||
.. code-block:: java
|
.. code-block:: java
|
||||||
|
|
@ -261,7 +261,7 @@ Making it bounded (by specifying a capacity) is optional, but if you do, you nee
|
||||||
public MyActor() {
|
public MyActor() {
|
||||||
int mailboxCapacity = 100;
|
int mailboxCapacity = 100;
|
||||||
Duration pushTimeout = new FiniteDuration(10, TimeUnit.SECONDS);
|
Duration pushTimeout = new FiniteDuration(10, TimeUnit.SECONDS);
|
||||||
getContext().setDispatcher(Dispatchers.newThreadBasedDispatcher(getContext(), mailboxCapacity, pushTimeout));
|
getContext().setDispatcher(Dispatchers.newPinnedDispatcher(getContext(), mailboxCapacity, pushTimeout));
|
||||||
}
|
}
|
||||||
...
|
...
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -36,7 +36,7 @@ Configuration
|
||||||
|
|
||||||
actor {
|
actor {
|
||||||
timeout = 5 # default timeout for future based invocations
|
timeout = 5 # default timeout for future based invocations
|
||||||
throughput = 5 # default throughput for ExecutorBasedEventDrivenDispatcher
|
throughput = 5 # default throughput for Dispatcher
|
||||||
}
|
}
|
||||||
...
|
...
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -199,7 +199,7 @@ Release 1.0-MILESTONE1
|
||||||
- **FIX** - #420 REST endpoints should be able to be processed in parallel (Viktor Klang)
|
- **FIX** - #420 REST endpoints should be able to be processed in parallel (Viktor Klang)
|
||||||
- **FIX** - #422 Dispatcher config should work for ThreadPoolBuilder-based dispatchers (Viktor Klang)
|
- **FIX** - #422 Dispatcher config should work for ThreadPoolBuilder-based dispatchers (Viktor Klang)
|
||||||
- **FIX** - #401 ActorRegistry should not leak memory (Viktor Klang)
|
- **FIX** - #401 ActorRegistry should not leak memory (Viktor Klang)
|
||||||
- **FIX** - #250 Performance optimization for ExecutorBasedEventDrivenDispatcher (Viktor Klang)
|
- **FIX** - #250 Performance optimization for Dispatcher (Viktor Klang)
|
||||||
- **FIX** - #419 Rename init and shutdown callbacks to preStart and postStop, and remove initTransactionalState (Viktor Klang)
|
- **FIX** - #419 Rename init and shutdown callbacks to preStart and postStop, and remove initTransactionalState (Viktor Klang)
|
||||||
- **FIX** - #346 Make max no of restarts (and within) are now both optional (Viktor Klang)
|
- **FIX** - #346 Make max no of restarts (and within) are now both optional (Viktor Klang)
|
||||||
- **FIX** - #424 Actors self.supervisor not set by the time init() is called when started by startLink() (Viktor Klang)
|
- **FIX** - #424 Actors self.supervisor not set by the time init() is called when started by startLink() (Viktor Klang)
|
||||||
|
|
@ -210,7 +210,7 @@ Release 1.0-MILESTONE1
|
||||||
- **FIX** - Logger.warn now properly works with varargs (Viktor Klang)
|
- **FIX** - Logger.warn now properly works with varargs (Viktor Klang)
|
||||||
- **FIX** - #450 Removed ActorRef lifeCycle boilerplate: Some(LifeCycle(Permanent)) => Permanent (Viktor Klang)
|
- **FIX** - #450 Removed ActorRef lifeCycle boilerplate: Some(LifeCycle(Permanent)) => Permanent (Viktor Klang)
|
||||||
- **FIX** - Moved ActorRef.trapExit into ActorRef.faultHandler and removed Option-boilerplate from faultHandler (Viktor Klang)
|
- **FIX** - Moved ActorRef.trapExit into ActorRef.faultHandler and removed Option-boilerplate from faultHandler (Viktor Klang)
|
||||||
- **FIX** - ThreadBasedDispatcher cheaper for idling actors, also benefits from all that is ExecutorBasedEventDrivenDispatcher (Viktor Klang)
|
- **FIX** - PinnedDispatcher cheaper for idling actors, also benefits from all that is Dispatcher (Viktor Klang)
|
||||||
- **FIX** - Fixing Futures.future, uses Actor.spawn under the hood, specify dispatcher to control where block is executed (Viktor Klang)
|
- **FIX** - Fixing Futures.future, uses Actor.spawn under the hood, specify dispatcher to control where block is executed (Viktor Klang)
|
||||||
- **FIX** - #469 Akka "dist" now uses a root folder to avoid loitering if unzipped in a folder (Viktor Klang)
|
- **FIX** - #469 Akka "dist" now uses a root folder to avoid loitering if unzipped in a folder (Viktor Klang)
|
||||||
- **FIX** - Removed ScalaConfig, JavaConfig and rewrote Supervision configuration (Viktor Klang)
|
- **FIX** - Removed ScalaConfig, JavaConfig and rewrote Supervision configuration (Viktor Klang)
|
||||||
|
|
@ -224,7 +224,7 @@ Release 1.0-MILESTONE1
|
||||||
- **ADD** - #262 Add Java API for Agent (Viktor Klang)
|
- **ADD** - #262 Add Java API for Agent (Viktor Klang)
|
||||||
- **ADD** - #264 Add Java API for Dataflow (Viktor Klang)
|
- **ADD** - #264 Add Java API for Dataflow (Viktor Klang)
|
||||||
- **ADD** - Using JerseySimpleBroadcaster instead of JerseyBroadcaster in AkkaBroadcaster (Viktor Klang)
|
- **ADD** - Using JerseySimpleBroadcaster instead of JerseyBroadcaster in AkkaBroadcaster (Viktor Klang)
|
||||||
- **ADD** - #433 Throughput deadline added for ExecutorBasedEventDrivenDispatcher (Viktor Klang)
|
- **ADD** - #433 Throughput deadline added for Dispatcher (Viktor Klang)
|
||||||
- **ADD** - Add possibility to set default cometSupport in akka.conf (Viktor Klang)
|
- **ADD** - Add possibility to set default cometSupport in akka.conf (Viktor Klang)
|
||||||
- **ADD** - #451 Added possibility to use akka-http as a standalone REST server (Viktor Klang)
|
- **ADD** - #451 Added possibility to use akka-http as a standalone REST server (Viktor Klang)
|
||||||
- **ADD** - #446 Added support for Erlang-style receiveTimeout (Viktor Klang)
|
- **ADD** - #446 Added support for Erlang-style receiveTimeout (Viktor Klang)
|
||||||
|
|
@ -308,7 +308,7 @@ Release 0.10 - Aug 21 2010
|
||||||
- **ADD** - Java API for the STM (Peter Vlugter)
|
- **ADD** - Java API for the STM (Peter Vlugter)
|
||||||
- **ADD** - #379 Create STM Atomic templates for Java API (Peter Vlugter)
|
- **ADD** - #379 Create STM Atomic templates for Java API (Peter Vlugter)
|
||||||
- **ADD** - #270 SBT plugin for Akka (Peter Vlugter)
|
- **ADD** - #270 SBT plugin for Akka (Peter Vlugter)
|
||||||
- **ADD** - #198 support for ThreadBasedDispatcher in Spring config (Michael Kober)
|
- **ADD** - #198 support for PinnedDispatcher in Spring config (Michael Kober)
|
||||||
- **ADD** - #377 support HawtDispatcher in Spring config (Michael Kober)
|
- **ADD** - #377 support HawtDispatcher in Spring config (Michael Kober)
|
||||||
- **ADD** - #376 support Spring config for untyped actors (Michael Kober)
|
- **ADD** - #376 support Spring config for untyped actors (Michael Kober)
|
||||||
- **ADD** - #200 support WorkStealingDispatcher in Spring config (Michael Kober)
|
- **ADD** - #200 support WorkStealingDispatcher in Spring config (Michael Kober)
|
||||||
|
|
|
||||||
|
|
@ -22,7 +22,7 @@ For most scenarios the default settings are the best. Here we have one single ev
|
||||||
|
|
||||||
.. code-block:: scala
|
.. code-block:: scala
|
||||||
|
|
||||||
Dispatchers.globalExecutorBasedEventDrivenDispatcher
|
Dispatchers.globalDispatcher
|
||||||
|
|
||||||
But if you feel that you are starting to contend on the single dispatcher (the 'Executor' and its queue) or want to group a specific set of Actors for a dedicated dispatcher for better flexibility and configurability then you can override the defaults and define your own dispatcher. See below for details on which ones are available and how they can be configured.
|
But if you feel that you are starting to contend on the single dispatcher (the 'Executor' and its queue) or want to group a specific set of Actors for a dedicated dispatcher for better flexibility and configurability then you can override the defaults and define your own dispatcher. See below for details on which ones are available and how they can be configured.
|
||||||
|
|
||||||
|
|
@ -61,7 +61,7 @@ Let's now walk through the different dispatchers in more detail.
|
||||||
Thread-based
|
Thread-based
|
||||||
^^^^^^^^^^^^
|
^^^^^^^^^^^^
|
||||||
|
|
||||||
The 'ThreadBasedDispatcher' binds a dedicated OS thread to each specific Actor. The messages are posted to a 'LinkedBlockingQueue' which feeds the messages to the dispatcher one by one. A 'ThreadBasedDispatcher' cannot be shared between actors. This dispatcher has worse performance and scalability than the event-based dispatcher but works great for creating "daemon" Actors that consumes a low frequency of messages and are allowed to go off and do their own thing for a longer period of time. Another advantage with this dispatcher is that Actors do not block threads for each other.
|
The 'PinnedDispatcher' binds a dedicated OS thread to each specific Actor. The messages are posted to a 'LinkedBlockingQueue' which feeds the messages to the dispatcher one by one. A 'PinnedDispatcher' cannot be shared between actors. This dispatcher has worse performance and scalability than the event-based dispatcher but works great for creating "daemon" Actors that consumes a low frequency of messages and are allowed to go off and do their own thing for a longer period of time. Another advantage with this dispatcher is that Actors do not block threads for each other.
|
||||||
|
|
||||||
It would normally by used from within the actor like this:
|
It would normally by used from within the actor like this:
|
||||||
|
|
||||||
|
|
@ -69,7 +69,7 @@ It would normally by used from within the actor like this:
|
||||||
|
|
||||||
class MyActor extends Actor {
|
class MyActor extends Actor {
|
||||||
public MyActor() {
|
public MyActor() {
|
||||||
self.dispatcher = Dispatchers.newThreadBasedDispatcher(self)
|
self.dispatcher = Dispatchers.newPinnedDispatcher(self)
|
||||||
}
|
}
|
||||||
...
|
...
|
||||||
}
|
}
|
||||||
|
|
@ -77,7 +77,7 @@ It would normally by used from within the actor like this:
|
||||||
Event-based
|
Event-based
|
||||||
^^^^^^^^^^^
|
^^^^^^^^^^^
|
||||||
|
|
||||||
The 'ExecutorBasedEventDrivenDispatcher' binds a set of Actors to a thread pool backed up by a 'BlockingQueue'. This dispatcher is highly configurable and supports a fluent configuration API to configure the 'BlockingQueue' (type of queue, max items etc.) as well as the thread pool.
|
The 'Dispatcher' binds a set of Actors to a thread pool backed up by a 'BlockingQueue'. This dispatcher is highly configurable and supports a fluent configuration API to configure the 'BlockingQueue' (type of queue, max items etc.) as well as the thread pool.
|
||||||
|
|
||||||
The event-driven dispatchers **must be shared** between multiple Actors. One best practice is to let each top-level Actor, e.g. the Actors you define in the declarative supervisor config, to get their own dispatcher but reuse the dispatcher for each new Actor that the top-level Actor creates. But you can also share dispatcher between multiple top-level Actors. This is very use-case specific and needs to be tried out on a case by case basis. The important thing is that Akka tries to provide you with the freedom you need to design and implement your system in the most efficient way in regards to performance, throughput and latency.
|
The event-driven dispatchers **must be shared** between multiple Actors. One best practice is to let each top-level Actor, e.g. the Actors you define in the declarative supervisor config, to get their own dispatcher but reuse the dispatcher for each new Actor that the top-level Actor creates. But you can also share dispatcher between multiple top-level Actors. This is very use-case specific and needs to be tried out on a case by case basis. The important thing is that Akka tries to provide you with the freedom you need to design and implement your system in the most efficient way in regards to performance, throughput and latency.
|
||||||
|
|
||||||
|
|
@ -106,7 +106,7 @@ Here is an example:
|
||||||
import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy
|
import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy
|
||||||
|
|
||||||
class MyActor extends Actor {
|
class MyActor extends Actor {
|
||||||
self.dispatcher = Dispatchers.newExecutorBasedEventDrivenDispatcher(name)
|
self.dispatcher = Dispatchers.newDispatcher(name)
|
||||||
.withNewThreadPoolWithLinkedBlockingQueueWithCapacity(100)
|
.withNewThreadPoolWithLinkedBlockingQueueWithCapacity(100)
|
||||||
.setCorePoolSize(16)
|
.setCorePoolSize(16)
|
||||||
.setMaxPoolSize(128)
|
.setMaxPoolSize(128)
|
||||||
|
|
@ -116,7 +116,7 @@ Here is an example:
|
||||||
...
|
...
|
||||||
}
|
}
|
||||||
|
|
||||||
This 'ExecutorBasedEventDrivenDispatcher' allows you to define the 'throughput' it should have. This defines the number of messages for a specific Actor the dispatcher should process in one single sweep.
|
This 'Dispatcher' allows you to define the 'throughput' it should have. This defines the number of messages for a specific Actor the dispatcher should process in one single sweep.
|
||||||
Setting this to a higher number will increase throughput but lower fairness, and vice versa. If you don't specify it explicitly then it uses the default value defined in the 'akka.conf' configuration file:
|
Setting this to a higher number will increase throughput but lower fairness, and vice versa. If you don't specify it explicitly then it uses the default value defined in the 'akka.conf' configuration file:
|
||||||
|
|
||||||
.. code-block:: ruby
|
.. code-block:: ruby
|
||||||
|
|
@ -132,10 +132,10 @@ Browse the `ScalaDoc <scaladoc>`_ or look at the code for all the options availa
|
||||||
Priority event-based
|
Priority event-based
|
||||||
^^^^^^^^^^^^^^^^^^^^
|
^^^^^^^^^^^^^^^^^^^^
|
||||||
|
|
||||||
Sometimes it's useful to be able to specify priority order of messages, that is done by using PriorityExecutorBasedEventDrivenDispatcher and supply
|
Sometimes it's useful to be able to specify priority order of messages, that is done by using PriorityDispatcher and supply
|
||||||
a java.util.Comparator[MessageInvocation] or use a akka.dispatch.PriorityGenerator (recommended):
|
a java.util.Comparator[MessageInvocation] or use a akka.dispatch.PriorityGenerator (recommended):
|
||||||
|
|
||||||
Creating a PriorityExecutorBasedEventDrivenDispatcher using PriorityGenerator:
|
Creating a PriorityDispatcher using PriorityGenerator:
|
||||||
|
|
||||||
.. code-block:: scala
|
.. code-block:: scala
|
||||||
|
|
||||||
|
|
@ -156,7 +156,7 @@ Creating a PriorityExecutorBasedEventDrivenDispatcher using PriorityGenerator:
|
||||||
})
|
})
|
||||||
|
|
||||||
// We create a new Priority dispatcher and seed it with the priority generator
|
// We create a new Priority dispatcher and seed it with the priority generator
|
||||||
a.dispatcher = new PriorityExecutorBasedEventDrivenDispatcher("foo", gen)
|
a.dispatcher = new PriorityDispatcher("foo", gen)
|
||||||
a.start // Start the Actor
|
a.start // Start the Actor
|
||||||
|
|
||||||
a.dispatcher.suspend(a) // Suspending the actor so it doesn't start to treat the messages before we have enqueued all of them :-)
|
a.dispatcher.suspend(a) // Suspending the actor so it doesn't start to treat the messages before we have enqueued all of them :-)
|
||||||
|
|
@ -184,14 +184,14 @@ Prints:
|
||||||
Work-stealing event-based
|
Work-stealing event-based
|
||||||
^^^^^^^^^^^^^^^^^^^^^^^^^
|
^^^^^^^^^^^^^^^^^^^^^^^^^
|
||||||
|
|
||||||
The 'ExecutorBasedEventDrivenWorkStealingDispatcher' is a variation of the 'ExecutorBasedEventDrivenDispatcher' in which Actors of the same type can be set up to share this dispatcher and during execution time the different actors will steal messages from other actors if they have less messages to process. This can be a great way to improve throughput at the cost of a little higher latency.
|
The 'BalancingDispatcher' is a variation of the 'Dispatcher' in which Actors of the same type can be set up to share this dispatcher and during execution time the different actors will steal messages from other actors if they have less messages to process. This can be a great way to improve throughput at the cost of a little higher latency.
|
||||||
|
|
||||||
Normally the way you use it is to create an Actor companion object to hold the dispatcher and then set in in the Actor explicitly.
|
Normally the way you use it is to create an Actor companion object to hold the dispatcher and then set in in the Actor explicitly.
|
||||||
|
|
||||||
.. code-block:: scala
|
.. code-block:: scala
|
||||||
|
|
||||||
object MyActor {
|
object MyActor {
|
||||||
val dispatcher = Dispatchers.newExecutorBasedEventDrivenWorkStealingDispatcher(name).build
|
val dispatcher = Dispatchers.newBalancingDispatcher(name).build
|
||||||
}
|
}
|
||||||
|
|
||||||
class MyActor extends Actor {
|
class MyActor extends Actor {
|
||||||
|
|
@ -224,24 +224,24 @@ Per-instance based configuration
|
||||||
|
|
||||||
You can also do it on a specific dispatcher instance.
|
You can also do it on a specific dispatcher instance.
|
||||||
|
|
||||||
For the 'ExecutorBasedEventDrivenDispatcher' and the 'ExecutorBasedWorkStealingDispatcher' you can do it through their constructor
|
For the 'Dispatcher' and the 'ExecutorBasedWorkStealingDispatcher' you can do it through their constructor
|
||||||
|
|
||||||
.. code-block:: scala
|
.. code-block:: scala
|
||||||
|
|
||||||
class MyActor extends Actor {
|
class MyActor extends Actor {
|
||||||
val mailboxCapacity = BoundedMailbox(capacity = 100)
|
val mailboxCapacity = BoundedMailbox(capacity = 100)
|
||||||
self.dispatcher = Dispatchers.newExecutorBasedEventDrivenDispatcher(name, throughput, mailboxCapacity).build
|
self.dispatcher = Dispatchers.newDispatcher(name, throughput, mailboxCapacity).build
|
||||||
...
|
...
|
||||||
}
|
}
|
||||||
|
|
||||||
For the 'ThreadBasedDispatcher', it is non-shareable between actors, and associates a dedicated Thread with the actor.
|
For the 'PinnedDispatcher', it is non-shareable between actors, and associates a dedicated Thread with the actor.
|
||||||
Making it bounded (by specifying a capacity) is optional, but if you do, you need to provide a pushTimeout (default is 10 seconds). When trying to send a message to the Actor it will throw a MessageQueueAppendFailedException("BlockingMessageTransferQueue transfer timed out") if the message cannot be added to the mailbox within the time specified by the pushTimeout.
|
Making it bounded (by specifying a capacity) is optional, but if you do, you need to provide a pushTimeout (default is 10 seconds). When trying to send a message to the Actor it will throw a MessageQueueAppendFailedException("BlockingMessageTransferQueue transfer timed out") if the message cannot be added to the mailbox within the time specified by the pushTimeout.
|
||||||
|
|
||||||
.. code-block:: scala
|
.. code-block:: scala
|
||||||
|
|
||||||
class MyActor extends Actor {
|
class MyActor extends Actor {
|
||||||
import akka.util.duration._
|
import akka.util.duration._
|
||||||
self.dispatcher = Dispatchers.newThreadBasedDispatcher(self, mailboxCapacity = 100,
|
self.dispatcher = Dispatchers.newPinnedDispatcher(self, mailboxCapacity = 100,
|
||||||
pushTimeOut = 10 seconds)
|
pushTimeOut = 10 seconds)
|
||||||
...
|
...
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -52,7 +52,7 @@ case object FileDurableMailboxStorage extends DurableMailboxStorage("akka.a
|
||||||
case object ZooKeeperDurableMailboxStorage extends DurableMailboxStorage("akka.actor.mailbox.ZooKeeperBasedMailbox")
|
case object ZooKeeperDurableMailboxStorage extends DurableMailboxStorage("akka.actor.mailbox.ZooKeeperBasedMailbox")
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The durable equivalent of ExecutorBasedEventDrivenDispatcher
|
* The durable equivalent of Dispatcher
|
||||||
*
|
*
|
||||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||||
*/
|
*/
|
||||||
|
|
@ -62,7 +62,7 @@ case class DurableEventBasedDispatcher(
|
||||||
_throughput: Int = Dispatchers.THROUGHPUT,
|
_throughput: Int = Dispatchers.THROUGHPUT,
|
||||||
_throughputDeadlineTime: Int = Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS,
|
_throughputDeadlineTime: Int = Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS,
|
||||||
_mailboxType: MailboxType = Dispatchers.MAILBOX_TYPE,
|
_mailboxType: MailboxType = Dispatchers.MAILBOX_TYPE,
|
||||||
_config: ThreadPoolConfig = ThreadPoolConfig()) extends ExecutorBasedEventDrivenDispatcher(
|
_config: ThreadPoolConfig = ThreadPoolConfig()) extends Dispatcher(
|
||||||
_name,
|
_name,
|
||||||
_throughput,
|
_throughput,
|
||||||
_throughputDeadlineTime,
|
_throughputDeadlineTime,
|
||||||
|
|
@ -101,14 +101,14 @@ case class DurableEventBasedDispatcher(
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The durable equivalent of ThreadBasedDispatcher
|
* The durable equivalent of PinnedDispatcher
|
||||||
*
|
*
|
||||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||||
*/
|
*/
|
||||||
case class DurableThreadBasedDispatcher(
|
case class DurablePinnedDispatcher(
|
||||||
_actor: ActorRef,
|
_actor: ActorRef,
|
||||||
_storage: DurableMailboxStorage,
|
_storage: DurableMailboxStorage,
|
||||||
_mailboxType: MailboxType) extends ThreadBasedDispatcher(_actor,_mailboxType) {
|
_mailboxType: MailboxType) extends PinnedDispatcher(_actor,_mailboxType) {
|
||||||
|
|
||||||
def this(actor: ActorRef, _storage: DurableMailboxStorage) =
|
def this(actor: ActorRef, _storage: DurableMailboxStorage) =
|
||||||
this(actor, _storage, UnboundedMailbox()) // For Java API
|
this(actor, _storage, UnboundedMailbox()) // For Java API
|
||||||
|
|
|
||||||
|
|
@ -37,8 +37,8 @@ abstract class DurableExecutableMailbox(owner: ActorRef) extends MessageQueue wi
|
||||||
|
|
||||||
EventHandler.debug(this, "Creating %s mailbox [%s]".format(getClass.getName, name))
|
EventHandler.debug(this, "Creating %s mailbox [%s]".format(getClass.getName, name))
|
||||||
|
|
||||||
val dispatcher: ExecutorBasedEventDrivenDispatcher = owner.dispatcher match {
|
val dispatcher: Dispatcher = owner.dispatcher match {
|
||||||
case e: ExecutorBasedEventDrivenDispatcher => e
|
case e: Dispatcher => e
|
||||||
case _ => null
|
case _ => null
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -293,7 +293,7 @@ class AgentUpdater[T](agent: Agent[T]) extends Actor {
|
||||||
* Thread-based agent updater actor. Used internally for `sendOff` actions.
|
* Thread-based agent updater actor. Used internally for `sendOff` actions.
|
||||||
*/
|
*/
|
||||||
class ThreadBasedAgentUpdater[T](agent: Agent[T]) extends Actor {
|
class ThreadBasedAgentUpdater[T](agent: Agent[T]) extends Actor {
|
||||||
self.dispatcher = Dispatchers.newThreadBasedDispatcher(self)
|
self.dispatcher = Dispatchers.newPinnedDispatcher(self)
|
||||||
|
|
||||||
val txFactory = TransactionFactory(familyName = "ThreadBasedAgentUpdater", readonly = false)
|
val txFactory = TransactionFactory(familyName = "ThreadBasedAgentUpdater", readonly = false)
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -30,8 +30,8 @@ akka {
|
||||||
# - UntypedActor: sendRequestReply && sendRequestReplyFuture
|
# - UntypedActor: sendRequestReply && sendRequestReplyFuture
|
||||||
# - TypedActor: methods with non-void return type
|
# - TypedActor: methods with non-void return type
|
||||||
serialize-messages = off # Does a deep clone of (non-primitive) messages to ensure immutability
|
serialize-messages = off # Does a deep clone of (non-primitive) messages to ensure immutability
|
||||||
throughput = 5 # Default throughput for all ExecutorBasedEventDrivenDispatcher, set to 1 for complete fairness
|
throughput = 5 # Default throughput for all Dispatcher, set to 1 for complete fairness
|
||||||
throughput-deadline-time = -1 # Default throughput deadline for all ExecutorBasedEventDrivenDispatcher, set to 0 or negative for no deadline
|
throughput-deadline-time = -1 # Default throughput deadline for all Dispatcher, set to 0 or negative for no deadline
|
||||||
dispatcher-shutdown-timeout = 1 # Using the akka.time-unit, how long dispatchers by default will wait for new actors until they shut down
|
dispatcher-shutdown-timeout = 1 # Using the akka.time-unit, how long dispatchers by default will wait for new actors until they shut down
|
||||||
|
|
||||||
deployment {
|
deployment {
|
||||||
|
|
@ -75,22 +75,22 @@ akka {
|
||||||
}
|
}
|
||||||
|
|
||||||
default-dispatcher {
|
default-dispatcher {
|
||||||
type = "GlobalExecutorBasedEventDriven" # Must be one of the following, all "Global*" are non-configurable
|
type = "GlobalDispatcher" # Must be one of the following, all "Global*" are non-configurable
|
||||||
# - ExecutorBasedEventDriven
|
# - Dispatcher
|
||||||
# - ExecutorBasedEventDrivenWorkStealing
|
# - BalancingDispatcher
|
||||||
# - GlobalExecutorBasedEventDriven
|
# - GlobalDispatcher
|
||||||
keep-alive-time = 60 # Keep alive time for threads
|
keep-alive-time = 60 # Keep alive time for threads
|
||||||
core-pool-size-factor = 1.0 # No of core threads ... ceil(available processors * factor)
|
core-pool-size-factor = 1.0 # No of core threads ... ceil(available processors * factor)
|
||||||
max-pool-size-factor = 4.0 # Max no of threads ... ceil(available processors * factor)
|
max-pool-size-factor = 4.0 # Max no of threads ... ceil(available processors * factor)
|
||||||
executor-bounds = -1 # Makes the Executor bounded, -1 is unbounded
|
executor-bounds = -1 # Makes the Executor bounded, -1 is unbounded
|
||||||
allow-core-timeout = on # Allow core threads to time out
|
allow-core-timeout = on # Allow core threads to time out
|
||||||
rejection-policy = "caller-runs" # abort, caller-runs, discard-oldest, discard
|
rejection-policy = "caller-runs" # abort, caller-runs, discard-oldest, discard
|
||||||
throughput = 5 # Throughput for ExecutorBasedEventDrivenDispatcher, set to 1 for complete fairness
|
throughput = 5 # Throughput for Dispatcher, set to 1 for complete fairness
|
||||||
throughput-deadline-time = -1 # Throughput deadline for ExecutorBasedEventDrivenDispatcher, set to 0 or negative for no deadline
|
throughput-deadline-time = -1 # Throughput deadline for Dispatcher, set to 0 or negative for no deadline
|
||||||
mailbox-capacity = -1 # If negative (or zero) then an unbounded mailbox is used (default)
|
mailbox-capacity = -1 # If negative (or zero) then an unbounded mailbox is used (default)
|
||||||
# If positive then a bounded mailbox is used and the capacity is set using the property
|
# If positive then a bounded mailbox is used and the capacity is set using the property
|
||||||
# NOTE: setting a mailbox to 'blocking' can be a bit dangerous, could lead to deadlock, use with care
|
# NOTE: setting a mailbox to 'blocking' can be a bit dangerous, could lead to deadlock, use with care
|
||||||
# The following are only used for ExecutorBasedEventDriven and only if mailbox-capacity > 0
|
# The following are only used for Dispatcher and only if mailbox-capacity > 0
|
||||||
mailbox-push-timeout-time = 10 # Specifies the timeout to add a new message to a mailbox that is full - negative number means infinite timeout
|
mailbox-push-timeout-time = 10 # Specifies the timeout to add a new message to a mailbox that is full - negative number means infinite timeout
|
||||||
# (in unit defined by the time-unit property)
|
# (in unit defined by the time-unit property)
|
||||||
}
|
}
|
||||||
|
|
@ -197,7 +197,7 @@ akka {
|
||||||
|
|
||||||
# If you are using akka.http.AkkaMistServlet
|
# If you are using akka.http.AkkaMistServlet
|
||||||
mist-dispatcher {
|
mist-dispatcher {
|
||||||
#type = "GlobalExecutorBasedEventDriven" # Uncomment if you want to use a different dispatcher than the default one for Comet
|
#type = "GlobalDispatcher" # Uncomment if you want to use a different dispatcher than the default one for Comet
|
||||||
}
|
}
|
||||||
connection-close = true # toggles the addition of the "Connection" response header with a "close" value
|
connection-close = true # toggles the addition of the "Connection" response header with a "close" value
|
||||||
root-actor-id = "_httproot" # the id of the actor to use as the root endpoint
|
root-actor-id = "_httproot" # the id of the actor to use as the root endpoint
|
||||||
|
|
|
||||||
|
|
@ -19,7 +19,7 @@ class AkkaParentProject(info: ProjectInfo) extends ParentProject(info) with Exec
|
||||||
val scalaCompileSettings =
|
val scalaCompileSettings =
|
||||||
Seq("-deprecation",
|
Seq("-deprecation",
|
||||||
//"-Xmigration",
|
//"-Xmigration",
|
||||||
//"-optimise",
|
"-optimise",
|
||||||
"-encoding", "utf8")
|
"-encoding", "utf8")
|
||||||
|
|
||||||
val javaCompileSettings = Seq("-Xlint:unchecked")
|
val javaCompileSettings = Seq("-Xlint:unchecked")
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue