diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala index 8c3e478644..84583c39da 100644 --- a/akka-actor/src/main/scala/akka/actor/Actor.scala +++ b/akka-actor/src/main/scala/akka/actor/Actor.scala @@ -92,10 +92,6 @@ object Actor extends Logging { private[akka] lazy val shutdownHook = { val hook = new Runnable { override def run { - // Shutdown HawtDispatch GlobalQueue - log.slf4j.info("Shutting down Hawt Dispatch global queue") - org.fusesource.hawtdispatch.globalQueue.asInstanceOf[org.fusesource.hawtdispatch.internal.GlobalDispatchQueue].shutdown - // Clear Thread.subclassAudits log.slf4j.info("Clearing subclass audits") val tf = classOf[java.lang.Thread].getDeclaredField("subclassAudits") diff --git a/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala b/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala index bc6dc02ada..dc4d0f5d74 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala @@ -61,19 +61,8 @@ object Dispatchers extends Logging { config.getConfigMap("akka.actor.default-dispatcher").flatMap(from).getOrElse(globalExecutorBasedEventDrivenDispatcher) } - object globalHawtDispatcher extends HawtDispatcher - object globalExecutorBasedEventDrivenDispatcher extends ExecutorBasedEventDrivenDispatcher("global", THROUGHPUT, THROUGHPUT_DEADLINE_TIME_MILLIS, MAILBOX_TYPE) - /** - * Creates an event-driven dispatcher based on the excellent HawtDispatch library. - *
- * Can be beneficial to use theHawtDispatcher.pin(self) to "pin" an actor to a specific thread.
- *
- * See the ScalaDoc for the {@link akka.dispatch.HawtDispatcher} for details.
- */
- def newHawtDispatcher(aggregate: Boolean) = new HawtDispatcher(aggregate)
-
/**
* Creates an thread based dispatcher serving a single actor through the same single thread.
* Uses the default timeout
@@ -141,7 +130,7 @@ object Dispatchers extends Logging {
* Has a fluent builder interface for configuring its semantics.
*/
def newExecutorBasedEventDrivenWorkStealingDispatcher(name: String, mailboxType: MailboxType) =
- ThreadPoolConfigDispatcherBuilder(config => new ExecutorBasedEventDrivenWorkStealingDispatcher(name,mailboxType,config),ThreadPoolConfig())
+ ThreadPoolConfigDispatcherBuilder(config => new ExecutorBasedEventDrivenWorkStealingDispatcher(name, THROUGHPUT, THROUGHPUT_DEADLINE_TIME_MILLIS, mailboxType, config, THROUGHPUT),ThreadPoolConfig())
/**
* Utility function that tries to load the specified dispatcher config from the akka.conf
@@ -156,7 +145,7 @@ object Dispatchers extends Logging {
* default-dispatcher {
* type = "GlobalExecutorBasedEventDriven" # Must be one of the following, all "Global*" are non-configurable
* # (ExecutorBasedEventDrivenWorkStealing), ExecutorBasedEventDriven,
- * # Hawt, GlobalExecutorBasedEventDriven, GlobalHawt
+ * # GlobalExecutorBasedEventDriven
* keep-alive-time = 60 # Keep alive time for threads
* core-pool-size-factor = 1.0 # No of core threads ... ceil(available processors * factor)
* max-pool-size-factor = 4.0 # Max no of threads ... ceil(available processors * factor)
@@ -164,7 +153,6 @@ object Dispatchers extends Logging {
* allow-core-timeout = on # Allow core threads to time out
* rejection-policy = "caller-runs" # abort, caller-runs, discard-oldest, discard
* throughput = 5 # Throughput for ExecutorBasedEventDrivenDispatcher
- * aggregate = off # Aggregate on/off for HawtDispatchers
* }
* ex: from(config.getConfigMap(identifier).get)
*
@@ -211,11 +199,14 @@ object Dispatchers extends Logging {
threadPoolConfig)).build
case "ExecutorBasedEventDrivenWorkStealing" =>
- configureThreadPool(poolCfg => new ExecutorBasedEventDrivenWorkStealingDispatcher(name, mailboxType,poolCfg)).build
-
- case "Hawt" => new HawtDispatcher(cfg.getBool("aggregate",true))
+ configureThreadPool(threadPoolConfig => new ExecutorBasedEventDrivenWorkStealingDispatcher(
+ name,
+ cfg.getInt("throughput", THROUGHPUT),
+ cfg.getInt("throughput-deadline-time", THROUGHPUT_DEADLINE_TIME_MILLIS),
+ mailboxType,
+ threadPoolConfig,
+ cfg.getInt("max-donation", THROUGHPUT))).build
case "GlobalExecutorBasedEventDriven" => globalExecutorBasedEventDrivenDispatcher
- case "GlobalHawt" => globalHawtDispatcher
case unknown => throw new IllegalArgumentException("Unknown dispatcher type [%s]" format unknown)
}
}
diff --git a/akka-actor/src/main/scala/akka/dispatch/ExecutorBasedEventDrivenDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/ExecutorBasedEventDrivenDispatcher.scala
index 2fa16eca71..e98603f8a6 100644
--- a/akka-actor/src/main/scala/akka/dispatch/ExecutorBasedEventDrivenDispatcher.scala
+++ b/akka-actor/src/main/scala/akka/dispatch/ExecutorBasedEventDrivenDispatcher.scala
@@ -101,7 +101,7 @@ class ExecutorBasedEventDrivenDispatcher(
/**
* @return the mailbox associated with the actor
*/
- private def getMailbox(receiver: ActorRef) = receiver.mailbox.asInstanceOf[MessageQueue with ExecutableMailbox]
+ protected def getMailbox(receiver: ActorRef) = receiver.mailbox.asInstanceOf[MessageQueue with ExecutableMailbox]
override def mailboxSize(actorRef: ActorRef) = getMailbox(actorRef).size
@@ -126,7 +126,6 @@ class ExecutorBasedEventDrivenDispatcher(
}
}
-
private[akka] def registerForExecution(mbox: MessageQueue with ExecutableMailbox): Unit = if (active.isOn) {
if (!mbox.suspended.locked && mbox.dispatcherLock.tryLock()) {
try {
@@ -137,7 +136,11 @@ class ExecutorBasedEventDrivenDispatcher(
throw e
}
}
- } else log.slf4j.warn("{} is shut down,\n\tignoring the rest of the messages in the mailbox of\n\t{}", this, mbox)
+ }
+ else log.slf4j.warn("{} is shut down,\n\tignoring the rest of the messages in the mailbox of\n\t{}", this, mbox)
+
+ private[akka] def reRegisterForExecution(mbox: MessageQueue with ExecutableMailbox): Unit =
+ registerForExecution(mbox)
override val toString = getClass.getSimpleName + "[" + name + "]"
@@ -150,7 +153,7 @@ class ExecutorBasedEventDrivenDispatcher(
log.slf4j.debug("Resuming {}",actorRef.uuid)
val mbox = getMailbox(actorRef)
mbox.suspended.tryUnlock
- registerForExecution(mbox)
+ reRegisterForExecution(mbox)
}
}
@@ -170,7 +173,7 @@ trait ExecutableMailbox extends Runnable { self: MessageQueue =>
dispatcherLock.unlock()
}
if (!self.isEmpty)
- dispatcher.registerForExecution(this)
+ dispatcher.reRegisterForExecution(this)
}
/**
diff --git a/akka-actor/src/main/scala/akka/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala
index 54aec2607d..7701562b64 100644
--- a/akka-actor/src/main/scala/akka/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala
+++ b/akka-actor/src/main/scala/akka/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala
@@ -4,14 +4,12 @@
package akka.dispatch
+import akka.actor.{ActorRef, Actor, IllegalActorStateException}
+import akka.util.{ReflectiveAccess, Switch}
-import akka.actor.{Actor, ActorRef, IllegalActorStateException}
-import akka.util.Switch
-
-import java.util.concurrent. {ExecutorService, CopyOnWriteArrayList}
-import java.util.concurrent.atomic.AtomicReference
-
-import jsr166x.{Deque, LinkedBlockingDeque}
+import java.util.Queue
+import java.util.concurrent.atomic.{AtomicReference, AtomicInteger}
+import java.util.concurrent.{ TimeUnit, ExecutorService, RejectedExecutionException, ConcurrentLinkedQueue, LinkedBlockingQueue}
/**
* An executor based event driven dispatcher which will try to redistribute work from busy actors to idle actors. It is assumed
@@ -21,235 +19,116 @@ import jsr166x.{Deque, LinkedBlockingDeque}
* Although the technique used in this implementation is commonly known as "work stealing", the actual implementation is probably
* best described as "work donating" because the actor of which work is being stolen takes the initiative.
*
- * This dispatcher attempts to redistribute work between actors each time a message is dispatched on a busy actor. Work
- * will not be redistributed when actors are busy, but no new messages are dispatched.
- * TODO: it would be nice to be able to redistribute work even when no new messages are being dispatched, without impacting dispatching performance ?!
- *
* The preferred way of creating dispatchers is to use
* the {@link akka.dispatch.Dispatchers} factory object.
*
* @see akka.dispatch.ExecutorBasedEventDrivenWorkStealingDispatcher
* @see akka.dispatch.Dispatchers
*
- * @author Jan Van Besien
+ * @author Viktor Klang
*/
class ExecutorBasedEventDrivenWorkStealingDispatcher(
_name: String,
- val mailboxType: MailboxType = Dispatchers.MAILBOX_TYPE,
- config: ThreadPoolConfig = ThreadPoolConfig()) extends MessageDispatcher {
+ throughput: Int = Dispatchers.THROUGHPUT,
+ throughputDeadlineTime: Int = Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS,
+ mailboxType: MailboxType = Dispatchers.MAILBOX_TYPE,
+ config: ThreadPoolConfig = ThreadPoolConfig(),
+ val maxDonationQty: Int = Dispatchers.THROUGHPUT)
+ extends ExecutorBasedEventDrivenDispatcher(_name, throughput, throughputDeadlineTime, mailboxType, config) {
- def this(_name: String, mailboxType: MailboxType) = this(_name, mailboxType,ThreadPoolConfig())
+ def this(_name: String, throughput: Int, throughputDeadlineTime: Int, mailboxType: MailboxType) =
+ this(_name, throughput, throughputDeadlineTime, mailboxType,ThreadPoolConfig()) // Needed for Java API usage
- def this(_name: String) = this(_name, Dispatchers.MAILBOX_TYPE,ThreadPoolConfig())
+ def this(_name: String, throughput: Int, mailboxType: MailboxType) =
+ this(_name, throughput, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, mailboxType) // Needed for Java API usage
- val name = "akka:event-driven-work-stealing:dispatcher:" + _name
+ def this(_name: String, throughput: Int) =
+ this(_name, throughput, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE) // Needed for Java API usage
+
+ def this(_name: String, _config: ThreadPoolConfig) =
+ this(_name, Dispatchers.THROUGHPUT, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE, _config)
+
+ def this(_name: String, memberType: Class[_ <: Actor]) =
+ this(_name, Dispatchers.THROUGHPUT, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE) // Needed for Java API usage
- /** Type of the actors registered in this dispatcher. */
@volatile private var actorType: Option[Class[_]] = None
- private val pooledActors = new CopyOnWriteArrayList[ActorRef]
- private[akka] val threadFactory = new MonitorableThreadFactory(name)
- private[akka] val executorService = new AtomicReference[ExecutorService](config.createLazyExecutorService(threadFactory))
+ @volatile private var members = Vector[ActorRef]()
/** The index in the pooled actors list which was last used to steal work */
- @volatile private var lastThiefIndex = 0
-
- /**
- * @return the mailbox associated with the actor
- */
- private def getMailbox(receiver: ActorRef) = receiver.mailbox.asInstanceOf[Deque[MessageInvocation] with MessageQueue with Runnable]
-
- override def mailboxSize(actorRef: ActorRef) = getMailbox(actorRef).size
-
- private[akka] def dispatch(invocation: MessageInvocation) {
- val mbox = getMailbox(invocation.receiver)
- mbox enqueue invocation
- executorService.get() execute mbox
- }
-
- /**
- * Try processing the mailbox of the given actor. Fails if the dispatching lock on the actor is already held by
- * another thread (because then that thread is already processing the mailbox).
- *
- * @return true if the mailbox was processed, false otherwise
- */
- private def tryProcessMailbox(mailbox: MessageQueue): Boolean = {
- var mailboxWasProcessed = false
-
- // this do-wile loop is required to prevent missing new messages between the end of processing
- // the mailbox and releasing the lock
- do {
- if (mailbox.dispatcherLock.tryLock) {
- try {
- mailboxWasProcessed = processMailbox(mailbox)
- } finally {
- mailbox.dispatcherLock.unlock
- }
- }
- } while ((mailboxWasProcessed && !mailbox.isEmpty))
-
- mailboxWasProcessed
- }
-
- /**
- * Process the messages in the mailbox of the given actor.
- * @return
- */
- private def processMailbox(mailbox: MessageQueue): Boolean = try {
- if (mailbox.suspended.locked)
- return false
-
- var messageInvocation = mailbox.dequeue
- while (messageInvocation ne null) {
- messageInvocation.invoke
- if (mailbox.suspended.locked)
- return false
- messageInvocation = mailbox.dequeue
- }
- true
- } catch {
- case ie: InterruptedException => false
- }
-
- private def findThief(receiver: ActorRef): Option[ActorRef] = {
- // copy to prevent concurrent modifications having any impact
- val actors = pooledActors.toArray(new Array[ActorRef](pooledActors.size))
- val i = if ( lastThiefIndex > actors.size ) 0 else lastThiefIndex
-
- // we risk to pick a thief which is unregistered from the dispatcher in the meantime, but that typically means
- // the dispatcher is being shut down...
- val (thief: Option[ActorRef], index: Int) = doFindThief(receiver, actors, i)
- lastThiefIndex = (index + 1) % actors.size
- thief
- }
-
- /**
- * Find a thief to process the receivers messages from the given list of actors.
- *
- * @param receiver original receiver of the message
- * @param actors list of actors to find a thief in
- * @param startIndex first index to start looking in the list (i.e. for round robin)
- * @return the thief (or None) and the new index to start searching next time
- */
- private def doFindThief(receiver: ActorRef, actors: Array[ActorRef], startIndex: Int): (Option[ActorRef], Int) = {
- for (i <- 0 to actors.length) {
- val index = (i + startIndex) % actors.length
- val actor = actors(index)
- if (actor != receiver && getMailbox(actor).isEmpty) return (Some(actor), index)
- }
- (None, startIndex) // nothing found, reuse same start index next time
- }
-
- /**
- * Try donating messages to the thief and processing the thiefs mailbox. Doesn't do anything if we can not acquire
- * the thiefs dispatching lock, because in that case another thread is already processing the thiefs mailbox.
- */
- private def tryDonateAndProcessMessages(receiver: ActorRef, thief: ActorRef) = {
- val mailbox = getMailbox(thief)
- if (mailbox.dispatcherLock.tryLock) {
- try {
- while(donateMessage(receiver, thief)) processMailbox(mailbox)
- } finally {
- mailbox.dispatcherLock.unlock
- }
- }
- }
-
- /**
- * Steal a message from the receiver and give it to the thief.
- */
- private def donateMessage(receiver: ActorRef, thief: ActorRef): Boolean = {
- val donated = getMailbox(receiver).pollLast
- if (donated ne null) {
- if (donated.senderFuture.isDefined) thief.postMessageToMailboxAndCreateFutureResultWithTimeout[Any](
- donated.message, receiver.timeout, donated.sender, donated.senderFuture)
- else if (donated.sender.isDefined) thief.postMessageToMailbox(donated.message, donated.sender)
- else thief.postMessageToMailbox(donated.message, None)
- true
- } else false
- }
-
- private[akka] def start = log.slf4j.debug("Starting up {}",toString)
-
- private[akka] def shutdown {
- val old = executorService.getAndSet(config.createLazyExecutorService(threadFactory))
- if (old ne null) {
- log.slf4j.debug("Shutting down {}", toString)
- old.shutdownNow()
- }
- }
-
-
- def suspend(actorRef: ActorRef) {
- getMailbox(actorRef).suspended.tryLock
- }
-
- def resume(actorRef: ActorRef) {
- val mbox = getMailbox(actorRef)
- mbox.suspended.tryUnlock
- executorService.get() execute mbox
- }
-
- override val toString = "ExecutorBasedEventDrivenWorkStealingDispatcher[" + name + "]"
-
- private[akka] def createMailbox(actorRef: ActorRef): AnyRef = mailboxType match {
- case UnboundedMailbox(blockDequeue) =>
- new LinkedBlockingDeque[MessageInvocation] with MessageQueue with Runnable {
- final def enqueue(handle: MessageInvocation) {
- this add handle
- }
-
- final def dequeue(): MessageInvocation = {
- if (blockDequeue) this.take()
- else this.poll()
- }
-
- def run = if (!tryProcessMailbox(this)) {
- // we are not able to process our mailbox (another thread is busy with it), so lets donate some of our mailbox
- // to another actor and then process his mailbox in stead.
- findThief(actorRef).foreach( tryDonateAndProcessMessages(actorRef,_) )
- }
- }
- case BoundedMailbox(blockDequeue, capacity, pushTimeOut) =>
- new LinkedBlockingDeque[MessageInvocation](capacity) with MessageQueue with Runnable {
-
- final def enqueue(handle: MessageInvocation) {
- if (pushTimeOut.toMillis > 0) {
- if (!this.offer(handle, pushTimeOut.length, pushTimeOut.unit))
- throw new MessageQueueAppendFailedException("Couldn't enqueue message " + handle + " to " + toString)
- } else this put handle
- }
-
- final def dequeue(): MessageInvocation =
- if (blockDequeue) this.take()
- else this.poll()
-
- def run = if (!tryProcessMailbox(this)) {
- // we are not able to process our mailbox (another thread is busy with it), so lets donate some of our mailbox
- // to another actor and then process his mailbox in stead.
- findThief(actorRef).foreach( tryDonateAndProcessMessages(actorRef, _) )
- }
- }
- }
+ private val lastDonorRecipient = new AtomicInteger(0)
private[akka] override def register(actorRef: ActorRef) = {
- verifyActorsAreOfSameType(actorRef)
- pooledActors add actorRef
+ //Verify actor type conformity
+ actorType match {
+ case None => actorType = Some(actorRef.actor.getClass)
+ case Some(aType) =>
+ if (aType != actorRef.actor.getClass)
+ throw new IllegalActorStateException(String.format(
+ "Can't register actor %s in a work stealing dispatcher which already knows actors of type %s",
+ actorRef, aType))
+ }
+
+ synchronized { members :+= actorRef } //Update members
super.register(actorRef)
}
private[akka] override def unregister(actorRef: ActorRef) = {
- pooledActors remove actorRef
+ synchronized { members = members.filterNot(actorRef eq) } //Update members
super.unregister(actorRef)
}
- private def verifyActorsAreOfSameType(actorOfId: ActorRef) = {
- actorType match {
- case None => actorType = Some(actorOfId.actor.getClass)
- case Some(aType) =>
- if (aType != actorOfId.actor.getClass)
- throw new IllegalActorStateException(String.format(
- "Can't register actor {} in a work stealing dispatcher which already knows actors of type {}",
- actorOfId.actor, aType))
+ override private[akka] def reRegisterForExecution(mbox: MessageQueue with ExecutableMailbox): Unit = {
+ donateFrom(mbox) //When we reregister, first donate messages to another actor
+ if (!mbox.isEmpty) //If we still have messages left to process, reschedule for execution
+ super.reRegisterForExecution(mbox)
+ }
+
+ private[akka] def donateFrom(donorMbox: MessageQueue with ExecutableMailbox): Unit = {
+ val actors = members // copy to prevent concurrent modifications having any impact
+ val actorSz = actors.size
+ val ldr = lastDonorRecipient.get
+ val i = if ( ldr > actorSz ) 0 else ldr
+
+ def doFindDonorRecipient(donorMbox: MessageQueue with ExecutableMailbox, potentialRecipients: Vector[ActorRef], startIndex: Int): ActorRef = {
+ val prSz = potentialRecipients.size
+ var i = 0
+ var recipient: ActorRef = null
+ while((i < prSz) && (recipient eq null)) {
+ val index = (i + startIndex) % prSz //Wrap-around, one full lap
+ val actor = potentialRecipients(index)
+ val mbox = getMailbox(actor)
+
+ if ((mbox ne donorMbox) && mbox.isEmpty) { //Don't donate to yourself
+ lastDonorRecipient.set((index + 1) % actors.length)
+ recipient = actor //Found!
+ }
+
+ i += 1
+ }
+
+ lastDonorRecipient.compareAndSet(ldr, (startIndex + 1) % actors.length)
+ recipient // nothing found, reuse same start index next time
+ }
+
+ // we risk to pick a thief which is unregistered from the dispatcher in the meantime, but that typically means
+ // the dispatcher is being shut down...
+ val recipient = doFindDonorRecipient(donorMbox, actors, i)
+ if (recipient ne null) {
+ def tryDonate(): Boolean = {
+ var organ = donorMbox.dequeue //FIXME switch to something that cannot block
+ if (organ ne null) {
+ println("DONATING!!!")
+ if (organ.senderFuture.isDefined) recipient.postMessageToMailboxAndCreateFutureResultWithTimeout[Any](
+ organ.message, recipient.timeout, organ.sender, organ.senderFuture)
+ else if (organ.sender.isDefined) recipient.postMessageToMailbox(organ.message, organ.sender)
+ else recipient.postMessageToMailbox(organ.message, None)
+ true
+ } else false
+ }
+
+ var donated = 0
+ while(donated < maxDonationQty && tryDonate())
+ donated += 1
}
}
-}
+}
\ No newline at end of file
diff --git a/akka-actor/src/main/scala/akka/dispatch/HawtDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/HawtDispatcher.scala
deleted file mode 100644
index e5e54ea0e9..0000000000
--- a/akka-actor/src/main/scala/akka/dispatch/HawtDispatcher.scala
+++ /dev/null
@@ -1,201 +0,0 @@
-/**
- * Copyright (C) 2009-2011 Scalable Solutions AB - * Pins an actor to a random thread queue. Once pinned the actor will always execute - * on the same thread. - *
- * - *- * This method can only succeed if the actor it's dispatcher is set to a HawtDispatcher and it has been started - *
- * - * @return true if the actor was pinned - */ - def pin(actorRef: ActorRef) = actorRef.mailbox match { - case x: HawtDispatcherMailbox => - x.queue.setTargetQueue( getRandomThreadQueue ) - true - case _ => false - } - - /** - *- * Unpins the actor so that all threads in the hawt dispatch thread pool - * compete to execute him. - *
- * - *- * This method can only succeed if the actor it's dispatcher is set to a HawtDispatcher and it has been started - *
- * @return true if the actor was unpinned - */ - def unpin(actorRef: ActorRef) = target(actorRef, globalQueue) - - /** - * @return true if the actor was pinned to a thread. - */ - def pinned(actorRef: ActorRef):Boolean = actorRef.mailbox match { - case x: HawtDispatcherMailbox => x.queue.getTargetQueue.getQueueType == QueueType.THREAD_QUEUE - case _ => false - } - - /** - *- * Updates the actor's target dispatch queue to the value specified. This allows - * you to do odd things like targeting another serial queue. - *
- * - *- * This method can only succeed if the actor it's dispatcher is set to a HawtDispatcher and it has been started - *
- * @return true if the actor was unpinned - */ - def target(actorRef: ActorRef, parent: DispatchQueue) = actorRef.mailbox match { - case x: HawtDispatcherMailbox => - x.queue.setTargetQueue(parent) - true - case _ => false - } -} - -/** - *- * A HawtDispatch based MessageDispatcher. Actors with this dispatcher are executed - * on the HawtDispatch fixed sized thread pool. The number of of threads will match - * the number of cores available on your system. - * - *
- *- * Actors using this dispatcher are restricted to only executing non blocking - * operations. The actor cannot synchronously call another actor or call 3rd party - * libraries that can block for a long time. You should use non blocking IO APIs - * instead of blocking IO apis to avoid blocking that actor for an extended amount - * of time. - *
- * - *- * This dispatcher delivers messages to the actors in the order that they - * were producer at the sender. - *
- * - *
- * HawtDispatch supports processing Non blocking Socket IO in both the reactor
- * and proactor styles. For more details, see the HawtDispacherEchoServer.scala
- * example.
- *