Removing HawtDispatch, the old WorkStealing dispatcher, replace old workstealer with new workstealer based on EBEDD, and remove jsr166x dependency, only 3 more deps to go until 0 deps for akka-actor

This commit is contained in:
Viktor Klang 2011-02-27 22:44:37 +01:00
parent a76e62096f
commit a6bfe644d5
14 changed files with 113 additions and 740 deletions

View file

@ -92,10 +92,6 @@ object Actor extends Logging {
private[akka] lazy val shutdownHook = {
val hook = new Runnable {
override def run {
// Shutdown HawtDispatch GlobalQueue
log.slf4j.info("Shutting down Hawt Dispatch global queue")
org.fusesource.hawtdispatch.globalQueue.asInstanceOf[org.fusesource.hawtdispatch.internal.GlobalDispatchQueue].shutdown
// Clear Thread.subclassAudits
log.slf4j.info("Clearing subclass audits")
val tf = classOf[java.lang.Thread].getDeclaredField("subclassAudits")

View file

@ -61,19 +61,8 @@ object Dispatchers extends Logging {
config.getConfigMap("akka.actor.default-dispatcher").flatMap(from).getOrElse(globalExecutorBasedEventDrivenDispatcher)
}
object globalHawtDispatcher extends HawtDispatcher
object globalExecutorBasedEventDrivenDispatcher extends ExecutorBasedEventDrivenDispatcher("global", THROUGHPUT, THROUGHPUT_DEADLINE_TIME_MILLIS, MAILBOX_TYPE)
/**
* Creates an event-driven dispatcher based on the excellent HawtDispatch library.
* <p/>
* Can be beneficial to use the <code>HawtDispatcher.pin(self)</code> to "pin" an actor to a specific thread.
* <p/>
* See the ScalaDoc for the {@link akka.dispatch.HawtDispatcher} for details.
*/
def newHawtDispatcher(aggregate: Boolean) = new HawtDispatcher(aggregate)
/**
* Creates an thread based dispatcher serving a single actor through the same single thread.
* Uses the default timeout
@ -141,7 +130,7 @@ object Dispatchers extends Logging {
* Has a fluent builder interface for configuring its semantics.
*/
def newExecutorBasedEventDrivenWorkStealingDispatcher(name: String, mailboxType: MailboxType) =
ThreadPoolConfigDispatcherBuilder(config => new ExecutorBasedEventDrivenWorkStealingDispatcher(name,mailboxType,config),ThreadPoolConfig())
ThreadPoolConfigDispatcherBuilder(config => new ExecutorBasedEventDrivenWorkStealingDispatcher(name, THROUGHPUT, THROUGHPUT_DEADLINE_TIME_MILLIS, mailboxType, config, THROUGHPUT),ThreadPoolConfig())
/**
* Utility function that tries to load the specified dispatcher config from the akka.conf
@ -156,7 +145,7 @@ object Dispatchers extends Logging {
* default-dispatcher {
* type = "GlobalExecutorBasedEventDriven" # Must be one of the following, all "Global*" are non-configurable
* # (ExecutorBasedEventDrivenWorkStealing), ExecutorBasedEventDriven,
* # Hawt, GlobalExecutorBasedEventDriven, GlobalHawt
* # GlobalExecutorBasedEventDriven
* keep-alive-time = 60 # Keep alive time for threads
* core-pool-size-factor = 1.0 # No of core threads ... ceil(available processors * factor)
* max-pool-size-factor = 4.0 # Max no of threads ... ceil(available processors * factor)
@ -164,7 +153,6 @@ object Dispatchers extends Logging {
* allow-core-timeout = on # Allow core threads to time out
* rejection-policy = "caller-runs" # abort, caller-runs, discard-oldest, discard
* throughput = 5 # Throughput for ExecutorBasedEventDrivenDispatcher
* aggregate = off # Aggregate on/off for HawtDispatchers
* }
* ex: from(config.getConfigMap(identifier).get)
*
@ -211,11 +199,14 @@ object Dispatchers extends Logging {
threadPoolConfig)).build
case "ExecutorBasedEventDrivenWorkStealing" =>
configureThreadPool(poolCfg => new ExecutorBasedEventDrivenWorkStealingDispatcher(name, mailboxType,poolCfg)).build
case "Hawt" => new HawtDispatcher(cfg.getBool("aggregate",true))
configureThreadPool(threadPoolConfig => new ExecutorBasedEventDrivenWorkStealingDispatcher(
name,
cfg.getInt("throughput", THROUGHPUT),
cfg.getInt("throughput-deadline-time", THROUGHPUT_DEADLINE_TIME_MILLIS),
mailboxType,
threadPoolConfig,
cfg.getInt("max-donation", THROUGHPUT))).build
case "GlobalExecutorBasedEventDriven" => globalExecutorBasedEventDrivenDispatcher
case "GlobalHawt" => globalHawtDispatcher
case unknown => throw new IllegalArgumentException("Unknown dispatcher type [%s]" format unknown)
}
}

View file

@ -101,7 +101,7 @@ class ExecutorBasedEventDrivenDispatcher(
/**
* @return the mailbox associated with the actor
*/
private def getMailbox(receiver: ActorRef) = receiver.mailbox.asInstanceOf[MessageQueue with ExecutableMailbox]
protected def getMailbox(receiver: ActorRef) = receiver.mailbox.asInstanceOf[MessageQueue with ExecutableMailbox]
override def mailboxSize(actorRef: ActorRef) = getMailbox(actorRef).size
@ -126,7 +126,6 @@ class ExecutorBasedEventDrivenDispatcher(
}
}
private[akka] def registerForExecution(mbox: MessageQueue with ExecutableMailbox): Unit = if (active.isOn) {
if (!mbox.suspended.locked && mbox.dispatcherLock.tryLock()) {
try {
@ -137,7 +136,11 @@ class ExecutorBasedEventDrivenDispatcher(
throw e
}
}
} else log.slf4j.warn("{} is shut down,\n\tignoring the rest of the messages in the mailbox of\n\t{}", this, mbox)
}
else log.slf4j.warn("{} is shut down,\n\tignoring the rest of the messages in the mailbox of\n\t{}", this, mbox)
private[akka] def reRegisterForExecution(mbox: MessageQueue with ExecutableMailbox): Unit =
registerForExecution(mbox)
override val toString = getClass.getSimpleName + "[" + name + "]"
@ -150,7 +153,7 @@ class ExecutorBasedEventDrivenDispatcher(
log.slf4j.debug("Resuming {}",actorRef.uuid)
val mbox = getMailbox(actorRef)
mbox.suspended.tryUnlock
registerForExecution(mbox)
reRegisterForExecution(mbox)
}
}
@ -170,7 +173,7 @@ trait ExecutableMailbox extends Runnable { self: MessageQueue =>
dispatcherLock.unlock()
}
if (!self.isEmpty)
dispatcher.registerForExecution(this)
dispatcher.reRegisterForExecution(this)
}
/**

View file

@ -4,14 +4,12 @@
package akka.dispatch
import akka.actor.{ActorRef, Actor, IllegalActorStateException}
import akka.util.{ReflectiveAccess, Switch}
import akka.actor.{Actor, ActorRef, IllegalActorStateException}
import akka.util.Switch
import java.util.concurrent. {ExecutorService, CopyOnWriteArrayList}
import java.util.concurrent.atomic.AtomicReference
import jsr166x.{Deque, LinkedBlockingDeque}
import java.util.Queue
import java.util.concurrent.atomic.{AtomicReference, AtomicInteger}
import java.util.concurrent.{ TimeUnit, ExecutorService, RejectedExecutionException, ConcurrentLinkedQueue, LinkedBlockingQueue}
/**
* An executor based event driven dispatcher which will try to redistribute work from busy actors to idle actors. It is assumed
@ -21,235 +19,116 @@ import jsr166x.{Deque, LinkedBlockingDeque}
* Although the technique used in this implementation is commonly known as "work stealing", the actual implementation is probably
* best described as "work donating" because the actor of which work is being stolen takes the initiative.
* <p/>
* This dispatcher attempts to redistribute work between actors each time a message is dispatched on a busy actor. Work
* will not be redistributed when actors are busy, but no new messages are dispatched.
* TODO: it would be nice to be able to redistribute work even when no new messages are being dispatched, without impacting dispatching performance ?!
* <p/>
* The preferred way of creating dispatchers is to use
* the {@link akka.dispatch.Dispatchers} factory object.
*
* @see akka.dispatch.ExecutorBasedEventDrivenWorkStealingDispatcher
* @see akka.dispatch.Dispatchers
*
* @author Jan Van Besien
* @author Viktor Klang
*/
class ExecutorBasedEventDrivenWorkStealingDispatcher(
_name: String,
val mailboxType: MailboxType = Dispatchers.MAILBOX_TYPE,
config: ThreadPoolConfig = ThreadPoolConfig()) extends MessageDispatcher {
throughput: Int = Dispatchers.THROUGHPUT,
throughputDeadlineTime: Int = Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS,
mailboxType: MailboxType = Dispatchers.MAILBOX_TYPE,
config: ThreadPoolConfig = ThreadPoolConfig(),
val maxDonationQty: Int = Dispatchers.THROUGHPUT)
extends ExecutorBasedEventDrivenDispatcher(_name, throughput, throughputDeadlineTime, mailboxType, config) {
def this(_name: String, mailboxType: MailboxType) = this(_name, mailboxType,ThreadPoolConfig())
def this(_name: String, throughput: Int, throughputDeadlineTime: Int, mailboxType: MailboxType) =
this(_name, throughput, throughputDeadlineTime, mailboxType,ThreadPoolConfig()) // Needed for Java API usage
def this(_name: String) = this(_name, Dispatchers.MAILBOX_TYPE,ThreadPoolConfig())
def this(_name: String, throughput: Int, mailboxType: MailboxType) =
this(_name, throughput, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, mailboxType) // Needed for Java API usage
val name = "akka:event-driven-work-stealing:dispatcher:" + _name
def this(_name: String, throughput: Int) =
this(_name, throughput, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE) // Needed for Java API usage
def this(_name: String, _config: ThreadPoolConfig) =
this(_name, Dispatchers.THROUGHPUT, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE, _config)
def this(_name: String, memberType: Class[_ <: Actor]) =
this(_name, Dispatchers.THROUGHPUT, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE) // Needed for Java API usage
/** Type of the actors registered in this dispatcher. */
@volatile private var actorType: Option[Class[_]] = None
private val pooledActors = new CopyOnWriteArrayList[ActorRef]
private[akka] val threadFactory = new MonitorableThreadFactory(name)
private[akka] val executorService = new AtomicReference[ExecutorService](config.createLazyExecutorService(threadFactory))
@volatile private var members = Vector[ActorRef]()
/** The index in the pooled actors list which was last used to steal work */
@volatile private var lastThiefIndex = 0
/**
* @return the mailbox associated with the actor
*/
private def getMailbox(receiver: ActorRef) = receiver.mailbox.asInstanceOf[Deque[MessageInvocation] with MessageQueue with Runnable]
override def mailboxSize(actorRef: ActorRef) = getMailbox(actorRef).size
private[akka] def dispatch(invocation: MessageInvocation) {
val mbox = getMailbox(invocation.receiver)
mbox enqueue invocation
executorService.get() execute mbox
}
/**
* Try processing the mailbox of the given actor. Fails if the dispatching lock on the actor is already held by
* another thread (because then that thread is already processing the mailbox).
*
* @return true if the mailbox was processed, false otherwise
*/
private def tryProcessMailbox(mailbox: MessageQueue): Boolean = {
var mailboxWasProcessed = false
// this do-wile loop is required to prevent missing new messages between the end of processing
// the mailbox and releasing the lock
do {
if (mailbox.dispatcherLock.tryLock) {
try {
mailboxWasProcessed = processMailbox(mailbox)
} finally {
mailbox.dispatcherLock.unlock
}
}
} while ((mailboxWasProcessed && !mailbox.isEmpty))
mailboxWasProcessed
}
/**
* Process the messages in the mailbox of the given actor.
* @return
*/
private def processMailbox(mailbox: MessageQueue): Boolean = try {
if (mailbox.suspended.locked)
return false
var messageInvocation = mailbox.dequeue
while (messageInvocation ne null) {
messageInvocation.invoke
if (mailbox.suspended.locked)
return false
messageInvocation = mailbox.dequeue
}
true
} catch {
case ie: InterruptedException => false
}
private def findThief(receiver: ActorRef): Option[ActorRef] = {
// copy to prevent concurrent modifications having any impact
val actors = pooledActors.toArray(new Array[ActorRef](pooledActors.size))
val i = if ( lastThiefIndex > actors.size ) 0 else lastThiefIndex
// we risk to pick a thief which is unregistered from the dispatcher in the meantime, but that typically means
// the dispatcher is being shut down...
val (thief: Option[ActorRef], index: Int) = doFindThief(receiver, actors, i)
lastThiefIndex = (index + 1) % actors.size
thief
}
/**
* Find a thief to process the receivers messages from the given list of actors.
*
* @param receiver original receiver of the message
* @param actors list of actors to find a thief in
* @param startIndex first index to start looking in the list (i.e. for round robin)
* @return the thief (or None) and the new index to start searching next time
*/
private def doFindThief(receiver: ActorRef, actors: Array[ActorRef], startIndex: Int): (Option[ActorRef], Int) = {
for (i <- 0 to actors.length) {
val index = (i + startIndex) % actors.length
val actor = actors(index)
if (actor != receiver && getMailbox(actor).isEmpty) return (Some(actor), index)
}
(None, startIndex) // nothing found, reuse same start index next time
}
/**
* Try donating messages to the thief and processing the thiefs mailbox. Doesn't do anything if we can not acquire
* the thiefs dispatching lock, because in that case another thread is already processing the thiefs mailbox.
*/
private def tryDonateAndProcessMessages(receiver: ActorRef, thief: ActorRef) = {
val mailbox = getMailbox(thief)
if (mailbox.dispatcherLock.tryLock) {
try {
while(donateMessage(receiver, thief)) processMailbox(mailbox)
} finally {
mailbox.dispatcherLock.unlock
}
}
}
/**
* Steal a message from the receiver and give it to the thief.
*/
private def donateMessage(receiver: ActorRef, thief: ActorRef): Boolean = {
val donated = getMailbox(receiver).pollLast
if (donated ne null) {
if (donated.senderFuture.isDefined) thief.postMessageToMailboxAndCreateFutureResultWithTimeout[Any](
donated.message, receiver.timeout, donated.sender, donated.senderFuture)
else if (donated.sender.isDefined) thief.postMessageToMailbox(donated.message, donated.sender)
else thief.postMessageToMailbox(donated.message, None)
true
} else false
}
private[akka] def start = log.slf4j.debug("Starting up {}",toString)
private[akka] def shutdown {
val old = executorService.getAndSet(config.createLazyExecutorService(threadFactory))
if (old ne null) {
log.slf4j.debug("Shutting down {}", toString)
old.shutdownNow()
}
}
def suspend(actorRef: ActorRef) {
getMailbox(actorRef).suspended.tryLock
}
def resume(actorRef: ActorRef) {
val mbox = getMailbox(actorRef)
mbox.suspended.tryUnlock
executorService.get() execute mbox
}
override val toString = "ExecutorBasedEventDrivenWorkStealingDispatcher[" + name + "]"
private[akka] def createMailbox(actorRef: ActorRef): AnyRef = mailboxType match {
case UnboundedMailbox(blockDequeue) =>
new LinkedBlockingDeque[MessageInvocation] with MessageQueue with Runnable {
final def enqueue(handle: MessageInvocation) {
this add handle
}
final def dequeue(): MessageInvocation = {
if (blockDequeue) this.take()
else this.poll()
}
def run = if (!tryProcessMailbox(this)) {
// we are not able to process our mailbox (another thread is busy with it), so lets donate some of our mailbox
// to another actor and then process his mailbox in stead.
findThief(actorRef).foreach( tryDonateAndProcessMessages(actorRef,_) )
}
}
case BoundedMailbox(blockDequeue, capacity, pushTimeOut) =>
new LinkedBlockingDeque[MessageInvocation](capacity) with MessageQueue with Runnable {
final def enqueue(handle: MessageInvocation) {
if (pushTimeOut.toMillis > 0) {
if (!this.offer(handle, pushTimeOut.length, pushTimeOut.unit))
throw new MessageQueueAppendFailedException("Couldn't enqueue message " + handle + " to " + toString)
} else this put handle
}
final def dequeue(): MessageInvocation =
if (blockDequeue) this.take()
else this.poll()
def run = if (!tryProcessMailbox(this)) {
// we are not able to process our mailbox (another thread is busy with it), so lets donate some of our mailbox
// to another actor and then process his mailbox in stead.
findThief(actorRef).foreach( tryDonateAndProcessMessages(actorRef, _) )
}
}
}
private val lastDonorRecipient = new AtomicInteger(0)
private[akka] override def register(actorRef: ActorRef) = {
verifyActorsAreOfSameType(actorRef)
pooledActors add actorRef
//Verify actor type conformity
actorType match {
case None => actorType = Some(actorRef.actor.getClass)
case Some(aType) =>
if (aType != actorRef.actor.getClass)
throw new IllegalActorStateException(String.format(
"Can't register actor %s in a work stealing dispatcher which already knows actors of type %s",
actorRef, aType))
}
synchronized { members :+= actorRef } //Update members
super.register(actorRef)
}
private[akka] override def unregister(actorRef: ActorRef) = {
pooledActors remove actorRef
synchronized { members = members.filterNot(actorRef eq) } //Update members
super.unregister(actorRef)
}
private def verifyActorsAreOfSameType(actorOfId: ActorRef) = {
actorType match {
case None => actorType = Some(actorOfId.actor.getClass)
case Some(aType) =>
if (aType != actorOfId.actor.getClass)
throw new IllegalActorStateException(String.format(
"Can't register actor {} in a work stealing dispatcher which already knows actors of type {}",
actorOfId.actor, aType))
override private[akka] def reRegisterForExecution(mbox: MessageQueue with ExecutableMailbox): Unit = {
donateFrom(mbox) //When we reregister, first donate messages to another actor
if (!mbox.isEmpty) //If we still have messages left to process, reschedule for execution
super.reRegisterForExecution(mbox)
}
private[akka] def donateFrom(donorMbox: MessageQueue with ExecutableMailbox): Unit = {
val actors = members // copy to prevent concurrent modifications having any impact
val actorSz = actors.size
val ldr = lastDonorRecipient.get
val i = if ( ldr > actorSz ) 0 else ldr
def doFindDonorRecipient(donorMbox: MessageQueue with ExecutableMailbox, potentialRecipients: Vector[ActorRef], startIndex: Int): ActorRef = {
val prSz = potentialRecipients.size
var i = 0
var recipient: ActorRef = null
while((i < prSz) && (recipient eq null)) {
val index = (i + startIndex) % prSz //Wrap-around, one full lap
val actor = potentialRecipients(index)
val mbox = getMailbox(actor)
if ((mbox ne donorMbox) && mbox.isEmpty) { //Don't donate to yourself
lastDonorRecipient.set((index + 1) % actors.length)
recipient = actor //Found!
}
i += 1
}
lastDonorRecipient.compareAndSet(ldr, (startIndex + 1) % actors.length)
recipient // nothing found, reuse same start index next time
}
// we risk to pick a thief which is unregistered from the dispatcher in the meantime, but that typically means
// the dispatcher is being shut down...
val recipient = doFindDonorRecipient(donorMbox, actors, i)
if (recipient ne null) {
def tryDonate(): Boolean = {
var organ = donorMbox.dequeue //FIXME switch to something that cannot block
if (organ ne null) {
println("DONATING!!!")
if (organ.senderFuture.isDefined) recipient.postMessageToMailboxAndCreateFutureResultWithTimeout[Any](
organ.message, recipient.timeout, organ.sender, organ.senderFuture)
else if (organ.sender.isDefined) recipient.postMessageToMailbox(organ.message, organ.sender)
else recipient.postMessageToMailbox(organ.message, None)
true
} else false
}
var donated = 0
while(donated < maxDonationQty && tryDonate())
donated += 1
}
}
}
}

View file

@ -1,201 +0,0 @@
/**
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
*/
package akka.dispatch
import akka.actor.ActorRef
import akka.util.Switch
import org.fusesource.hawtdispatch._
import org.fusesource.hawtdispatch.DispatchQueue.QueueType
import java.util.concurrent.atomic.{AtomicInteger, AtomicBoolean}
import java.util.concurrent.CountDownLatch
/**
* Holds helper methods for working with actors that are using a HawtDispatcher as it's dispatcher.
*/
object HawtDispatcher {
private val retained = new AtomicInteger()
@volatile private var shutdownLatch: CountDownLatch = _
private def retainNonDaemon = if (retained.getAndIncrement == 0) {
shutdownLatch = new CountDownLatch(1)
new Thread("HawtDispatch Non-Daemon") {
override def run = {
try {
shutdownLatch.await
} catch {
case _ =>
}
}
}.start()
}
private def releaseNonDaemon = if (retained.decrementAndGet == 0) {
shutdownLatch.countDown
shutdownLatch = null
}
/**
* @return the mailbox associated with the actor
*/
private def mailbox(actorRef: ActorRef) = actorRef.mailbox.asInstanceOf[HawtDispatcherMailbox]
/**
* @return the dispatch queue associated with the actor
*/
def queue(actorRef: ActorRef) = mailbox(actorRef).queue
/**
* <p>
* Pins an actor to a random thread queue. Once pinned the actor will always execute
* on the same thread.
* </p>
*
* <p>
* This method can only succeed if the actor it's dispatcher is set to a HawtDispatcher and it has been started
* </p>
*
* @return true if the actor was pinned
*/
def pin(actorRef: ActorRef) = actorRef.mailbox match {
case x: HawtDispatcherMailbox =>
x.queue.setTargetQueue( getRandomThreadQueue )
true
case _ => false
}
/**
* <p>
* Unpins the actor so that all threads in the hawt dispatch thread pool
* compete to execute him.
* </p>
*
* <p>
* This method can only succeed if the actor it's dispatcher is set to a HawtDispatcher and it has been started
* </p>
* @return true if the actor was unpinned
*/
def unpin(actorRef: ActorRef) = target(actorRef, globalQueue)
/**
* @return true if the actor was pinned to a thread.
*/
def pinned(actorRef: ActorRef):Boolean = actorRef.mailbox match {
case x: HawtDispatcherMailbox => x.queue.getTargetQueue.getQueueType == QueueType.THREAD_QUEUE
case _ => false
}
/**
* <p>
* Updates the actor's target dispatch queue to the value specified. This allows
* you to do odd things like targeting another serial queue.
* </p>
*
* <p>
* This method can only succeed if the actor it's dispatcher is set to a HawtDispatcher and it has been started
* </p>
* @return true if the actor was unpinned
*/
def target(actorRef: ActorRef, parent: DispatchQueue) = actorRef.mailbox match {
case x: HawtDispatcherMailbox =>
x.queue.setTargetQueue(parent)
true
case _ => false
}
}
/**
* <p>
* A HawtDispatch based MessageDispatcher. Actors with this dispatcher are executed
* on the HawtDispatch fixed sized thread pool. The number of of threads will match
* the number of cores available on your system.
*
* </p>
* <p>
* Actors using this dispatcher are restricted to only executing non blocking
* operations. The actor cannot synchronously call another actor or call 3rd party
* libraries that can block for a long time. You should use non blocking IO APIs
* instead of blocking IO apis to avoid blocking that actor for an extended amount
* of time.
* </p>
*
* <p>
* This dispatcher delivers messages to the actors in the order that they
* were producer at the sender.
* </p>
*
* <p>
* HawtDispatch supports processing Non blocking Socket IO in both the reactor
* and proactor styles. For more details, see the <code>HawtDispacherEchoServer.scala</code>
* example.
* </p>
*
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
class HawtDispatcher(val aggregate: Boolean = true, val parent: DispatchQueue = globalQueue) extends MessageDispatcher {
import HawtDispatcher._
private[akka] def start { retainNonDaemon }
private[akka] def shutdown { releaseNonDaemon }
private[akka] def dispatch(invocation: MessageInvocation){
mailbox(invocation.receiver).dispatch(invocation)
}
// hawtdispatch does not have a way to get queue sizes, getting an accurate
// size can cause extra contention.. is this really needed?
// TODO: figure out if this can be optional in akka
override def mailboxSize(actorRef: ActorRef) = 0
def createMailbox(actorRef: ActorRef): AnyRef = {
val queue = parent.createQueue(actorRef.toString)
if (aggregate) new AggregatingHawtDispatcherMailbox(queue)
else new HawtDispatcherMailbox(queue)
}
def suspend(actorRef: ActorRef) = mailbox(actorRef).suspend
def resume(actorRef:ActorRef) = mailbox(actorRef).resume
override def toString = "HawtDispatcher"
}
class HawtDispatcherMailbox(val queue: DispatchQueue) {
def dispatch(invocation: MessageInvocation) {
queue {
invocation.invoke
}
}
def suspend = queue.suspend
def resume = queue.resume
}
class AggregatingHawtDispatcherMailbox(queue:DispatchQueue) extends HawtDispatcherMailbox(queue) {
private val source = createSource(new ListEventAggregator[MessageInvocation](), queue)
source.setEventHandler (^{drain_source} )
source.resume
private def drain_source = source.getData.foreach(_.invoke)
override def suspend = source.suspend
override def resume = source.resume
override def dispatch(invocation: MessageInvocation) {
if (getCurrentQueue eq null) {
// we are being call from a non hawtdispatch thread, can't aggregate
// it's events
super.dispatch(invocation)
} else {
// we are being call from a hawtdispatch thread, use the dispatch source
// so that multiple invocations issues on this thread will aggregate and then once
// the thread runs out of work, they get transferred as a batch to the other thread.
source.merge(invocation)
}
}
}

View file

@ -33,7 +33,7 @@ sealed trait MailboxType
case class UnboundedMailbox(val blocking: Boolean = false) extends MailboxType
case class BoundedMailbox(
val blocking: Boolean = false,
val blocking: Boolean = false,
val capacity: Int = { if (Dispatchers.MAILBOX_CAPACITY < 0) Int.MaxValue else Dispatchers.MAILBOX_CAPACITY },
val pushTimeOut: Duration = Dispatchers.MAILBOX_PUSH_TIME_OUT) extends MailboxType {
if (capacity < 0) throw new IllegalArgumentException("The capacity for BoundedMailbox can not be negative")

View file

@ -46,7 +46,7 @@ class SupervisorMiscSpec extends WordSpec with MustMatchers {
}).start
val actor4 = Actor.actorOf(new Actor {
self.dispatcher = Dispatchers.newHawtDispatcher(true)
self.dispatcher = Dispatchers.newThreadBasedDispatcher(self)
override def postRestart(cause: Throwable) {countDownLatch.countDown}
protected def receive = {

View file

@ -303,10 +303,6 @@ class ExecutorBasedEventDrivenDispatcherModelTest extends ActorModelSpec {
new ExecutorBasedEventDrivenDispatcher("foo") with MessageDispatcherInterceptor
}
class HawtDispatcherModelTest extends ActorModelSpec {
def newInterceptedDispatcher = new HawtDispatcher(false) with MessageDispatcherInterceptor
}
class ExecutorBasedEventDrivenWorkStealingDispatcherModelTest extends ActorModelSpec {
def newInterceptedDispatcher = new ExecutorBasedEventDrivenWorkStealingDispatcher("foo") with MessageDispatcherInterceptor
}
}

View file

@ -22,7 +22,6 @@ object DispatchersSpec {
val allowcoretimeout = "allow-core-timeout"
val rejectionpolicy = "rejection-policy" // abort, caller-runs, discard-oldest, discard
val throughput = "throughput" // Throughput for ExecutorBasedEventDrivenDispatcher
val aggregate = "aggregate" // Aggregate on/off for HawtDispatchers
def instance(dispatcher: MessageDispatcher): (MessageDispatcher) => Boolean = _ == dispatcher
def ofType[T <: MessageDispatcher : Manifest]: (MessageDispatcher) => Boolean = _.getClass == manifest[T].erasure
@ -30,9 +29,7 @@ object DispatchersSpec {
def typesAndValidators: Map[String,(MessageDispatcher) => Boolean] = Map(
"ExecutorBasedEventDrivenWorkStealing" -> ofType[ExecutorBasedEventDrivenWorkStealingDispatcher],
"ExecutorBasedEventDriven" -> ofType[ExecutorBasedEventDrivenDispatcher],
"Hawt" -> ofType[HawtDispatcher],
"GlobalExecutorBasedEventDriven" -> instance(globalExecutorBasedEventDrivenDispatcher),
"GlobalHawt" -> instance(globalHawtDispatcher)
"GlobalExecutorBasedEventDriven" -> instance(globalExecutorBasedEventDrivenDispatcher)
)
def validTypes = typesAndValidators.keys.toList

View file

@ -1,71 +0,0 @@
package akka.actor.dispatch
import java.util.concurrent.{CountDownLatch, TimeUnit}
import org.scalatest.junit.JUnitSuite
import org.junit.Test
import akka.dispatch.{HawtDispatcher, Dispatchers}
import akka.actor.Actor
import Actor._
object HawtDispatcherActorSpec {
class TestActor extends Actor {
self.dispatcher = new HawtDispatcher()
def receive = {
case "Hello" =>
self.reply("World")
case "Failure" =>
throw new RuntimeException("Expected exception; to test fault-tolerance")
}
}
object OneWayTestActor {
val oneWay = new CountDownLatch(1)
}
class OneWayTestActor extends Actor {
self.dispatcher = new HawtDispatcher()
def receive = {
case "OneWay" => OneWayTestActor.oneWay.countDown
}
}
}
class HawtDispatcherActorSpec extends JUnitSuite {
import HawtDispatcherActorSpec._
private val unit = TimeUnit.MILLISECONDS
@Test def shouldSendOneWay = {
val actor = actorOf[OneWayTestActor].start
val result = actor ! "OneWay"
assert(OneWayTestActor.oneWay.await(1, TimeUnit.SECONDS))
actor.stop
}
@Test def shouldSendReplySync = {
val actor = actorOf[TestActor].start
val result = (actor !! ("Hello", 10000)).as[String]
assert("World" === result.get)
actor.stop
}
@Test def shouldSendReplyAsync = {
val actor = actorOf[TestActor].start
val result = actor !! "Hello"
assert("World" === result.get.asInstanceOf[String])
actor.stop
}
@Test def shouldSendReceiveException = {
val actor = actorOf[TestActor].start
try {
actor !! "Failure"
fail("Should have thrown an exception")
} catch {
case e =>
assert("Expected exception; to test fault-tolerance" === e.getMessage())
}
actor.stop
}
}

View file

@ -1,206 +0,0 @@
/**
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
*/
package akka.actor.dispatch
import scala.collection.mutable.ListBuffer
import java.util.concurrent.TimeUnit
import java.net.InetSocketAddress
import java.io.IOException
import java.nio.ByteBuffer
import java.nio.channels.{SocketChannel, SelectionKey, ServerSocketChannel}
import akka.actor._
import akka.actor.Actor._
import akka.dispatch.HawtDispatcher
import org.fusesource.hawtdispatch._
/**
* This is an example of how to crate an Akka actor based TCP echo server using
* the HawtDispatch dispatcher and NIO event sources.
*/
object HawtDispatcherEchoServer {
private val hawt = new HawtDispatcher
var port=4444;
var useReactorPattern=true
def main(args:Array[String]):Unit = run
def run() = {
val server = actorOf(new Server(port))
server.start
Scheduler.schedule(server, DisplayStats, 1, 5, TimeUnit.SECONDS)
println("Press enter to shutdown.");
System.in.read
server ! Shutdown
}
case object Shutdown
case object DisplayStats
case class SessionClosed(session:ActorRef)
class Server(val port: Int) extends Actor {
self.dispatcher = hawt
var channel:ServerSocketChannel = _
var accept_source:DispatchSource = _
var sessions = ListBuffer[ActorRef]()
override def preStart = {
channel = ServerSocketChannel.open();
channel.socket().bind(new InetSocketAddress(port));
channel.configureBlocking(false);
// Setup the accept source, it will callback to the handler methods
// via the actor's mailbox so you don't need to worry about
// synchronizing with the local variables
accept_source = createSource(channel, SelectionKey.OP_ACCEPT, HawtDispatcher.queue(self));
accept_source.setEventHandler(^{ accept });
accept_source.setDisposer(^{
channel.close();
println("Closed port: "+port);
});
accept_source.resume
println("Listening on port: "+port);
}
private def accept() = {
var socket = channel.accept();
while( socket!=null ) {
try {
socket.configureBlocking(false);
val session = actorOf(new Session(self, socket))
session.start()
sessions += session
} catch {
case e: Exception =>
socket.close
}
socket = channel.accept();
}
}
def receive = {
case SessionClosed(session) =>
sessions = sessions.filterNot( _ == session )
session.stop
case DisplayStats =>
sessions.foreach { session=>
session ! DisplayStats
}
case Shutdown =>
sessions.foreach { session=>
session.stop
}
sessions.clear
accept_source.release
self.stop
}
}
class Session(val server:ActorRef, val channel: SocketChannel) extends Actor {
self.dispatcher = hawt
val buffer = ByteBuffer.allocate(1024);
val remote_address = channel.socket.getRemoteSocketAddress.toString
var read_source:DispatchSource = _
var write_source:DispatchSource = _
var readCounter = 0L
var writeCounter = 0L
var closed = false
override def preStart = {
if(useReactorPattern) {
// Then we will be using the reactor pattern for handling IO:
// Pin this actor to a single thread. The read/write event sources will poll
// a Selector on the pinned thread. Since the IO events are generated on the same
// thread as where the Actor is pinned to, it can avoid a substantial amount
// thread synchronization. Plus your GC will perform better since all the IO
// processing is done on a single thread.
HawtDispatcher.pin(self)
} else {
// Then we will be using sing the proactor pattern for handling IO:
// Then the actor will not be pinned to a specific thread. The read/write
// event sources will poll a Selector and then asynchronously dispatch the
// event's to the actor via the thread pool.
}
// Setup the sources, they will callback to the handler methods
// via the actor's mailbox so you don't need to worry about
// synchronizing with the local variables
read_source = createSource(channel, SelectionKey.OP_READ, HawtDispatcher.queue(self));
read_source.setEventHandler(^{ read })
read_source.setCancelHandler(^{ close })
write_source = createSource(channel, SelectionKey.OP_WRITE, HawtDispatcher.queue(self));
write_source.setEventHandler(^{ write })
write_source.setCancelHandler(^{ close })
read_source.resume
println("Accepted connection from: "+remote_address);
}
override def postStop = {
closed = true
read_source.release
write_source.release
channel.close
}
private def catchio(func: =>Unit):Unit = {
try {
func
} catch {
case e:IOException => close
}
}
def read():Unit = catchio {
channel.read(buffer) match {
case -1 =>
close // peer disconnected.
case 0 =>
case count:Int =>
readCounter += count
buffer.flip;
read_source.suspend
write_source.resume
write()
}
}
def write() = catchio {
writeCounter += channel.write(buffer)
if (buffer.remaining == 0) {
buffer.clear
write_source.suspend
read_source.resume
}
}
def close() = {
if( !closed ) {
closed = true
server ! SessionClosed(self)
}
}
def receive = {
case DisplayStats =>
println("connection to %s reads: %,d bytes, writes: %,d".format(remote_address, readCounter, writeCounter))
}
}
}

View file

@ -30,7 +30,6 @@ trait AkkaBaseProject extends BasicScalaProject {
val facebookModuleConfig = ModuleConfiguration("com.facebook", AkkaRepo)
val h2lzfModuleConfig = ModuleConfiguration("voldemort.store.compress", AkkaRepo)
val hbaseModuleConfig = ModuleConfiguration("org.apache.hbase", AkkaRepo)
val jsr166xModuleConfig = ModuleConfiguration("jsr166x", AkkaRepo)
val memcachedModuleConfig = ModuleConfiguration("spy", "memcached", AkkaRepo)
val netLagModuleConfig = ModuleConfiguration("net.lag", AkkaRepo)
val redisModuleConfig = ModuleConfiguration("com.redis", AkkaRepo)

View file

@ -34,10 +34,8 @@ akka {
default-dispatcher {
type = "GlobalExecutorBasedEventDriven" # Must be one of the following, all "Global*" are non-configurable
# - Hawt
# - ExecutorBasedEventDriven
# - ExecutorBasedEventDrivenWorkStealing
# - GlobalHawt
# - GlobalExecutorBasedEventDriven
keep-alive-time = 60 # Keep alive time for threads
core-pool-size-factor = 1.0 # No of core threads ... ceil(available processors * factor)
@ -47,7 +45,6 @@ akka {
rejection-policy = "caller-runs" # abort, caller-runs, discard-oldest, discard
throughput = 5 # Throughput for ExecutorBasedEventDrivenDispatcher, set to 1 for complete fairness
throughput-deadline-time = -1 # Throughput deadline for ExecutorBasedEventDrivenDispatcher, set to 0 or negative for no deadline
aggregate = off # Aggregate on/off for HawtDispatchers
mailbox-capacity = -1 # If negative (or zero) then an unbounded mailbox is used (default)
# If positive then a bounded mailbox is used and the capacity is set using the property
# NOTE: setting a mailbox to 'blocking' can be a bit dangerous,
@ -104,7 +101,7 @@ akka {
#If you are using akka.http.AkkaMistServlet
mist-dispatcher {
#type = "Hawt" # Uncomment if you want to use a different dispatcher than the default one for Comet
#type = "GlobalExecutorBasedEventDriven" # Uncomment if you want to use a different dispatcher than the default one for Comet
}
connection-close = true # toggles the addition of the "Connection" response header with a "close" value
root-actor-id = "_httproot" # the id of the actor to use as the root endpoint

View file

@ -111,7 +111,6 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
// -------------------------------------------------------------------------------------------------------------------
lazy val DISPATCH_VERSION = "0.7.4"
lazy val HAWT_DISPATCH_VERSION = "1.1"
lazy val JACKSON_VERSION = "1.4.3"
lazy val JERSEY_VERSION = "1.3"
lazy val MULTIVERSE_VERSION = "0.6.2"
@ -159,8 +158,6 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
lazy val h2_lzf = "voldemort.store.compress" % "h2-lzf" % "1.0" % "compile" //ApacheV2
lazy val hawtdispatch = "org.fusesource.hawtdispatch" % "hawtdispatch-scala" % HAWT_DISPATCH_VERSION % "compile" //ApacheV2
lazy val jackson = "org.codehaus.jackson" % "jackson-mapper-asl" % JACKSON_VERSION % "compile" //ApacheV2
lazy val jackson_core = "org.codehaus.jackson" % "jackson-core-asl" % JACKSON_VERSION % "compile" //ApacheV2
@ -169,8 +166,6 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
lazy val jersey_server = "com.sun.jersey" % "jersey-server" % JERSEY_VERSION % "compile" //CDDL v1
lazy val jersey_contrib = "com.sun.jersey.contribs" % "jersey-scala" % JERSEY_VERSION % "compile" //CDDL v1
lazy val jsr166x = "jsr166x" % "jsr166x" % "1.0" % "compile" //CC Public Domain
lazy val jsr250 = "javax.annotation" % "jsr250-api" % "1.0" % "compile" //CDDL v1
lazy val jsr311 = "javax.ws.rs" % "jsr311-api" % "1.1" % "compile" //CDDL v1
@ -322,8 +317,6 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
class AkkaActorProject(info: ProjectInfo) extends AkkaDefaultProject(info, distPath) {
val uuid = Dependencies.uuid
val configgy = Dependencies.configgy
val hawtdispatch = Dependencies.hawtdispatch
val jsr166x = Dependencies.jsr166x
val logback = Dependencies.logback
// testing