From a6bfe644d5efcfd6b2f7d2de3d1b70dc6d37065f Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Sun, 27 Feb 2011 22:44:37 +0100 Subject: [PATCH] Removing HawtDispatch, the old WorkStealing dispatcher, replace old workstealer with new workstealer based on EBEDD, and remove jsr166x dependency, only 3 more deps to go until 0 deps for akka-actor --- .../src/main/scala/akka/actor/Actor.scala | 4 - .../scala/akka/dispatch/Dispatchers.scala | 27 +- .../ExecutorBasedEventDrivenDispatcher.scala | 13 +- ...sedEventDrivenWorkStealingDispatcher.scala | 303 ++++++------------ .../scala/akka/dispatch/HawtDispatcher.scala | 201 ------------ .../scala/akka/dispatch/MailboxHandling.scala | 2 +- .../actor/supervisor/SupervisorMiscSpec.scala | 2 +- .../scala/akka/dispatch/ActorModelSpec.scala | 6 +- .../scala/akka/dispatch/DispatchersSpec.scala | 5 +- .../dispatch/HawtDispatcherActorSpec.scala | 71 ---- .../dispatch/HawtDispatcherEchoServer.scala | 206 ------------ .../src/main/scala/AkkaProject.scala | 1 - config/akka-reference.conf | 5 +- project/build/AkkaProject.scala | 7 - 14 files changed, 113 insertions(+), 740 deletions(-) delete mode 100644 akka-actor/src/main/scala/akka/dispatch/HawtDispatcher.scala delete mode 100644 akka-actor/src/test/scala/akka/dispatch/HawtDispatcherActorSpec.scala delete mode 100644 akka-actor/src/test/scala/akka/dispatch/HawtDispatcherEchoServer.scala diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala index 8c3e478644..84583c39da 100644 --- a/akka-actor/src/main/scala/akka/actor/Actor.scala +++ b/akka-actor/src/main/scala/akka/actor/Actor.scala @@ -92,10 +92,6 @@ object Actor extends Logging { private[akka] lazy val shutdownHook = { val hook = new Runnable { override def run { - // Shutdown HawtDispatch GlobalQueue - log.slf4j.info("Shutting down Hawt Dispatch global queue") - org.fusesource.hawtdispatch.globalQueue.asInstanceOf[org.fusesource.hawtdispatch.internal.GlobalDispatchQueue].shutdown - // Clear Thread.subclassAudits log.slf4j.info("Clearing subclass audits") val tf = classOf[java.lang.Thread].getDeclaredField("subclassAudits") diff --git a/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala b/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala index bc6dc02ada..dc4d0f5d74 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala @@ -61,19 +61,8 @@ object Dispatchers extends Logging { config.getConfigMap("akka.actor.default-dispatcher").flatMap(from).getOrElse(globalExecutorBasedEventDrivenDispatcher) } - object globalHawtDispatcher extends HawtDispatcher - object globalExecutorBasedEventDrivenDispatcher extends ExecutorBasedEventDrivenDispatcher("global", THROUGHPUT, THROUGHPUT_DEADLINE_TIME_MILLIS, MAILBOX_TYPE) - /** - * Creates an event-driven dispatcher based on the excellent HawtDispatch library. - *

- * Can be beneficial to use the HawtDispatcher.pin(self) to "pin" an actor to a specific thread. - *

- * See the ScalaDoc for the {@link akka.dispatch.HawtDispatcher} for details. - */ - def newHawtDispatcher(aggregate: Boolean) = new HawtDispatcher(aggregate) - /** * Creates an thread based dispatcher serving a single actor through the same single thread. * Uses the default timeout @@ -141,7 +130,7 @@ object Dispatchers extends Logging { * Has a fluent builder interface for configuring its semantics. */ def newExecutorBasedEventDrivenWorkStealingDispatcher(name: String, mailboxType: MailboxType) = - ThreadPoolConfigDispatcherBuilder(config => new ExecutorBasedEventDrivenWorkStealingDispatcher(name,mailboxType,config),ThreadPoolConfig()) + ThreadPoolConfigDispatcherBuilder(config => new ExecutorBasedEventDrivenWorkStealingDispatcher(name, THROUGHPUT, THROUGHPUT_DEADLINE_TIME_MILLIS, mailboxType, config, THROUGHPUT),ThreadPoolConfig()) /** * Utility function that tries to load the specified dispatcher config from the akka.conf @@ -156,7 +145,7 @@ object Dispatchers extends Logging { * default-dispatcher { * type = "GlobalExecutorBasedEventDriven" # Must be one of the following, all "Global*" are non-configurable * # (ExecutorBasedEventDrivenWorkStealing), ExecutorBasedEventDriven, - * # Hawt, GlobalExecutorBasedEventDriven, GlobalHawt + * # GlobalExecutorBasedEventDriven * keep-alive-time = 60 # Keep alive time for threads * core-pool-size-factor = 1.0 # No of core threads ... ceil(available processors * factor) * max-pool-size-factor = 4.0 # Max no of threads ... ceil(available processors * factor) @@ -164,7 +153,6 @@ object Dispatchers extends Logging { * allow-core-timeout = on # Allow core threads to time out * rejection-policy = "caller-runs" # abort, caller-runs, discard-oldest, discard * throughput = 5 # Throughput for ExecutorBasedEventDrivenDispatcher - * aggregate = off # Aggregate on/off for HawtDispatchers * } * ex: from(config.getConfigMap(identifier).get) * @@ -211,11 +199,14 @@ object Dispatchers extends Logging { threadPoolConfig)).build case "ExecutorBasedEventDrivenWorkStealing" => - configureThreadPool(poolCfg => new ExecutorBasedEventDrivenWorkStealingDispatcher(name, mailboxType,poolCfg)).build - - case "Hawt" => new HawtDispatcher(cfg.getBool("aggregate",true)) + configureThreadPool(threadPoolConfig => new ExecutorBasedEventDrivenWorkStealingDispatcher( + name, + cfg.getInt("throughput", THROUGHPUT), + cfg.getInt("throughput-deadline-time", THROUGHPUT_DEADLINE_TIME_MILLIS), + mailboxType, + threadPoolConfig, + cfg.getInt("max-donation", THROUGHPUT))).build case "GlobalExecutorBasedEventDriven" => globalExecutorBasedEventDrivenDispatcher - case "GlobalHawt" => globalHawtDispatcher case unknown => throw new IllegalArgumentException("Unknown dispatcher type [%s]" format unknown) } } diff --git a/akka-actor/src/main/scala/akka/dispatch/ExecutorBasedEventDrivenDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/ExecutorBasedEventDrivenDispatcher.scala index 2fa16eca71..e98603f8a6 100644 --- a/akka-actor/src/main/scala/akka/dispatch/ExecutorBasedEventDrivenDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/ExecutorBasedEventDrivenDispatcher.scala @@ -101,7 +101,7 @@ class ExecutorBasedEventDrivenDispatcher( /** * @return the mailbox associated with the actor */ - private def getMailbox(receiver: ActorRef) = receiver.mailbox.asInstanceOf[MessageQueue with ExecutableMailbox] + protected def getMailbox(receiver: ActorRef) = receiver.mailbox.asInstanceOf[MessageQueue with ExecutableMailbox] override def mailboxSize(actorRef: ActorRef) = getMailbox(actorRef).size @@ -126,7 +126,6 @@ class ExecutorBasedEventDrivenDispatcher( } } - private[akka] def registerForExecution(mbox: MessageQueue with ExecutableMailbox): Unit = if (active.isOn) { if (!mbox.suspended.locked && mbox.dispatcherLock.tryLock()) { try { @@ -137,7 +136,11 @@ class ExecutorBasedEventDrivenDispatcher( throw e } } - } else log.slf4j.warn("{} is shut down,\n\tignoring the rest of the messages in the mailbox of\n\t{}", this, mbox) + } + else log.slf4j.warn("{} is shut down,\n\tignoring the rest of the messages in the mailbox of\n\t{}", this, mbox) + + private[akka] def reRegisterForExecution(mbox: MessageQueue with ExecutableMailbox): Unit = + registerForExecution(mbox) override val toString = getClass.getSimpleName + "[" + name + "]" @@ -150,7 +153,7 @@ class ExecutorBasedEventDrivenDispatcher( log.slf4j.debug("Resuming {}",actorRef.uuid) val mbox = getMailbox(actorRef) mbox.suspended.tryUnlock - registerForExecution(mbox) + reRegisterForExecution(mbox) } } @@ -170,7 +173,7 @@ trait ExecutableMailbox extends Runnable { self: MessageQueue => dispatcherLock.unlock() } if (!self.isEmpty) - dispatcher.registerForExecution(this) + dispatcher.reRegisterForExecution(this) } /** diff --git a/akka-actor/src/main/scala/akka/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala index 54aec2607d..7701562b64 100644 --- a/akka-actor/src/main/scala/akka/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala @@ -4,14 +4,12 @@ package akka.dispatch +import akka.actor.{ActorRef, Actor, IllegalActorStateException} +import akka.util.{ReflectiveAccess, Switch} -import akka.actor.{Actor, ActorRef, IllegalActorStateException} -import akka.util.Switch - -import java.util.concurrent. {ExecutorService, CopyOnWriteArrayList} -import java.util.concurrent.atomic.AtomicReference - -import jsr166x.{Deque, LinkedBlockingDeque} +import java.util.Queue +import java.util.concurrent.atomic.{AtomicReference, AtomicInteger} +import java.util.concurrent.{ TimeUnit, ExecutorService, RejectedExecutionException, ConcurrentLinkedQueue, LinkedBlockingQueue} /** * An executor based event driven dispatcher which will try to redistribute work from busy actors to idle actors. It is assumed @@ -21,235 +19,116 @@ import jsr166x.{Deque, LinkedBlockingDeque} * Although the technique used in this implementation is commonly known as "work stealing", the actual implementation is probably * best described as "work donating" because the actor of which work is being stolen takes the initiative. *

- * This dispatcher attempts to redistribute work between actors each time a message is dispatched on a busy actor. Work - * will not be redistributed when actors are busy, but no new messages are dispatched. - * TODO: it would be nice to be able to redistribute work even when no new messages are being dispatched, without impacting dispatching performance ?! - *

* The preferred way of creating dispatchers is to use * the {@link akka.dispatch.Dispatchers} factory object. * * @see akka.dispatch.ExecutorBasedEventDrivenWorkStealingDispatcher * @see akka.dispatch.Dispatchers * - * @author Jan Van Besien + * @author Viktor Klang */ class ExecutorBasedEventDrivenWorkStealingDispatcher( _name: String, - val mailboxType: MailboxType = Dispatchers.MAILBOX_TYPE, - config: ThreadPoolConfig = ThreadPoolConfig()) extends MessageDispatcher { + throughput: Int = Dispatchers.THROUGHPUT, + throughputDeadlineTime: Int = Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, + mailboxType: MailboxType = Dispatchers.MAILBOX_TYPE, + config: ThreadPoolConfig = ThreadPoolConfig(), + val maxDonationQty: Int = Dispatchers.THROUGHPUT) + extends ExecutorBasedEventDrivenDispatcher(_name, throughput, throughputDeadlineTime, mailboxType, config) { - def this(_name: String, mailboxType: MailboxType) = this(_name, mailboxType,ThreadPoolConfig()) + def this(_name: String, throughput: Int, throughputDeadlineTime: Int, mailboxType: MailboxType) = + this(_name, throughput, throughputDeadlineTime, mailboxType,ThreadPoolConfig()) // Needed for Java API usage - def this(_name: String) = this(_name, Dispatchers.MAILBOX_TYPE,ThreadPoolConfig()) + def this(_name: String, throughput: Int, mailboxType: MailboxType) = + this(_name, throughput, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, mailboxType) // Needed for Java API usage - val name = "akka:event-driven-work-stealing:dispatcher:" + _name + def this(_name: String, throughput: Int) = + this(_name, throughput, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE) // Needed for Java API usage + + def this(_name: String, _config: ThreadPoolConfig) = + this(_name, Dispatchers.THROUGHPUT, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE, _config) + + def this(_name: String, memberType: Class[_ <: Actor]) = + this(_name, Dispatchers.THROUGHPUT, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE) // Needed for Java API usage - /** Type of the actors registered in this dispatcher. */ @volatile private var actorType: Option[Class[_]] = None - private val pooledActors = new CopyOnWriteArrayList[ActorRef] - private[akka] val threadFactory = new MonitorableThreadFactory(name) - private[akka] val executorService = new AtomicReference[ExecutorService](config.createLazyExecutorService(threadFactory)) + @volatile private var members = Vector[ActorRef]() /** The index in the pooled actors list which was last used to steal work */ - @volatile private var lastThiefIndex = 0 - - /** - * @return the mailbox associated with the actor - */ - private def getMailbox(receiver: ActorRef) = receiver.mailbox.asInstanceOf[Deque[MessageInvocation] with MessageQueue with Runnable] - - override def mailboxSize(actorRef: ActorRef) = getMailbox(actorRef).size - - private[akka] def dispatch(invocation: MessageInvocation) { - val mbox = getMailbox(invocation.receiver) - mbox enqueue invocation - executorService.get() execute mbox - } - - /** - * Try processing the mailbox of the given actor. Fails if the dispatching lock on the actor is already held by - * another thread (because then that thread is already processing the mailbox). - * - * @return true if the mailbox was processed, false otherwise - */ - private def tryProcessMailbox(mailbox: MessageQueue): Boolean = { - var mailboxWasProcessed = false - - // this do-wile loop is required to prevent missing new messages between the end of processing - // the mailbox and releasing the lock - do { - if (mailbox.dispatcherLock.tryLock) { - try { - mailboxWasProcessed = processMailbox(mailbox) - } finally { - mailbox.dispatcherLock.unlock - } - } - } while ((mailboxWasProcessed && !mailbox.isEmpty)) - - mailboxWasProcessed - } - - /** - * Process the messages in the mailbox of the given actor. - * @return - */ - private def processMailbox(mailbox: MessageQueue): Boolean = try { - if (mailbox.suspended.locked) - return false - - var messageInvocation = mailbox.dequeue - while (messageInvocation ne null) { - messageInvocation.invoke - if (mailbox.suspended.locked) - return false - messageInvocation = mailbox.dequeue - } - true - } catch { - case ie: InterruptedException => false - } - - private def findThief(receiver: ActorRef): Option[ActorRef] = { - // copy to prevent concurrent modifications having any impact - val actors = pooledActors.toArray(new Array[ActorRef](pooledActors.size)) - val i = if ( lastThiefIndex > actors.size ) 0 else lastThiefIndex - - // we risk to pick a thief which is unregistered from the dispatcher in the meantime, but that typically means - // the dispatcher is being shut down... - val (thief: Option[ActorRef], index: Int) = doFindThief(receiver, actors, i) - lastThiefIndex = (index + 1) % actors.size - thief - } - - /** - * Find a thief to process the receivers messages from the given list of actors. - * - * @param receiver original receiver of the message - * @param actors list of actors to find a thief in - * @param startIndex first index to start looking in the list (i.e. for round robin) - * @return the thief (or None) and the new index to start searching next time - */ - private def doFindThief(receiver: ActorRef, actors: Array[ActorRef], startIndex: Int): (Option[ActorRef], Int) = { - for (i <- 0 to actors.length) { - val index = (i + startIndex) % actors.length - val actor = actors(index) - if (actor != receiver && getMailbox(actor).isEmpty) return (Some(actor), index) - } - (None, startIndex) // nothing found, reuse same start index next time - } - - /** - * Try donating messages to the thief and processing the thiefs mailbox. Doesn't do anything if we can not acquire - * the thiefs dispatching lock, because in that case another thread is already processing the thiefs mailbox. - */ - private def tryDonateAndProcessMessages(receiver: ActorRef, thief: ActorRef) = { - val mailbox = getMailbox(thief) - if (mailbox.dispatcherLock.tryLock) { - try { - while(donateMessage(receiver, thief)) processMailbox(mailbox) - } finally { - mailbox.dispatcherLock.unlock - } - } - } - - /** - * Steal a message from the receiver and give it to the thief. - */ - private def donateMessage(receiver: ActorRef, thief: ActorRef): Boolean = { - val donated = getMailbox(receiver).pollLast - if (donated ne null) { - if (donated.senderFuture.isDefined) thief.postMessageToMailboxAndCreateFutureResultWithTimeout[Any]( - donated.message, receiver.timeout, donated.sender, donated.senderFuture) - else if (donated.sender.isDefined) thief.postMessageToMailbox(donated.message, donated.sender) - else thief.postMessageToMailbox(donated.message, None) - true - } else false - } - - private[akka] def start = log.slf4j.debug("Starting up {}",toString) - - private[akka] def shutdown { - val old = executorService.getAndSet(config.createLazyExecutorService(threadFactory)) - if (old ne null) { - log.slf4j.debug("Shutting down {}", toString) - old.shutdownNow() - } - } - - - def suspend(actorRef: ActorRef) { - getMailbox(actorRef).suspended.tryLock - } - - def resume(actorRef: ActorRef) { - val mbox = getMailbox(actorRef) - mbox.suspended.tryUnlock - executorService.get() execute mbox - } - - override val toString = "ExecutorBasedEventDrivenWorkStealingDispatcher[" + name + "]" - - private[akka] def createMailbox(actorRef: ActorRef): AnyRef = mailboxType match { - case UnboundedMailbox(blockDequeue) => - new LinkedBlockingDeque[MessageInvocation] with MessageQueue with Runnable { - final def enqueue(handle: MessageInvocation) { - this add handle - } - - final def dequeue(): MessageInvocation = { - if (blockDequeue) this.take() - else this.poll() - } - - def run = if (!tryProcessMailbox(this)) { - // we are not able to process our mailbox (another thread is busy with it), so lets donate some of our mailbox - // to another actor and then process his mailbox in stead. - findThief(actorRef).foreach( tryDonateAndProcessMessages(actorRef,_) ) - } - } - case BoundedMailbox(blockDequeue, capacity, pushTimeOut) => - new LinkedBlockingDeque[MessageInvocation](capacity) with MessageQueue with Runnable { - - final def enqueue(handle: MessageInvocation) { - if (pushTimeOut.toMillis > 0) { - if (!this.offer(handle, pushTimeOut.length, pushTimeOut.unit)) - throw new MessageQueueAppendFailedException("Couldn't enqueue message " + handle + " to " + toString) - } else this put handle - } - - final def dequeue(): MessageInvocation = - if (blockDequeue) this.take() - else this.poll() - - def run = if (!tryProcessMailbox(this)) { - // we are not able to process our mailbox (another thread is busy with it), so lets donate some of our mailbox - // to another actor and then process his mailbox in stead. - findThief(actorRef).foreach( tryDonateAndProcessMessages(actorRef, _) ) - } - } - } + private val lastDonorRecipient = new AtomicInteger(0) private[akka] override def register(actorRef: ActorRef) = { - verifyActorsAreOfSameType(actorRef) - pooledActors add actorRef + //Verify actor type conformity + actorType match { + case None => actorType = Some(actorRef.actor.getClass) + case Some(aType) => + if (aType != actorRef.actor.getClass) + throw new IllegalActorStateException(String.format( + "Can't register actor %s in a work stealing dispatcher which already knows actors of type %s", + actorRef, aType)) + } + + synchronized { members :+= actorRef } //Update members super.register(actorRef) } private[akka] override def unregister(actorRef: ActorRef) = { - pooledActors remove actorRef + synchronized { members = members.filterNot(actorRef eq) } //Update members super.unregister(actorRef) } - private def verifyActorsAreOfSameType(actorOfId: ActorRef) = { - actorType match { - case None => actorType = Some(actorOfId.actor.getClass) - case Some(aType) => - if (aType != actorOfId.actor.getClass) - throw new IllegalActorStateException(String.format( - "Can't register actor {} in a work stealing dispatcher which already knows actors of type {}", - actorOfId.actor, aType)) + override private[akka] def reRegisterForExecution(mbox: MessageQueue with ExecutableMailbox): Unit = { + donateFrom(mbox) //When we reregister, first donate messages to another actor + if (!mbox.isEmpty) //If we still have messages left to process, reschedule for execution + super.reRegisterForExecution(mbox) + } + + private[akka] def donateFrom(donorMbox: MessageQueue with ExecutableMailbox): Unit = { + val actors = members // copy to prevent concurrent modifications having any impact + val actorSz = actors.size + val ldr = lastDonorRecipient.get + val i = if ( ldr > actorSz ) 0 else ldr + + def doFindDonorRecipient(donorMbox: MessageQueue with ExecutableMailbox, potentialRecipients: Vector[ActorRef], startIndex: Int): ActorRef = { + val prSz = potentialRecipients.size + var i = 0 + var recipient: ActorRef = null + while((i < prSz) && (recipient eq null)) { + val index = (i + startIndex) % prSz //Wrap-around, one full lap + val actor = potentialRecipients(index) + val mbox = getMailbox(actor) + + if ((mbox ne donorMbox) && mbox.isEmpty) { //Don't donate to yourself + lastDonorRecipient.set((index + 1) % actors.length) + recipient = actor //Found! + } + + i += 1 + } + + lastDonorRecipient.compareAndSet(ldr, (startIndex + 1) % actors.length) + recipient // nothing found, reuse same start index next time + } + + // we risk to pick a thief which is unregistered from the dispatcher in the meantime, but that typically means + // the dispatcher is being shut down... + val recipient = doFindDonorRecipient(donorMbox, actors, i) + if (recipient ne null) { + def tryDonate(): Boolean = { + var organ = donorMbox.dequeue //FIXME switch to something that cannot block + if (organ ne null) { + println("DONATING!!!") + if (organ.senderFuture.isDefined) recipient.postMessageToMailboxAndCreateFutureResultWithTimeout[Any]( + organ.message, recipient.timeout, organ.sender, organ.senderFuture) + else if (organ.sender.isDefined) recipient.postMessageToMailbox(organ.message, organ.sender) + else recipient.postMessageToMailbox(organ.message, None) + true + } else false + } + + var donated = 0 + while(donated < maxDonationQty && tryDonate()) + donated += 1 } } -} +} \ No newline at end of file diff --git a/akka-actor/src/main/scala/akka/dispatch/HawtDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/HawtDispatcher.scala deleted file mode 100644 index e5e54ea0e9..0000000000 --- a/akka-actor/src/main/scala/akka/dispatch/HawtDispatcher.scala +++ /dev/null @@ -1,201 +0,0 @@ -/** - * Copyright (C) 2009-2011 Scalable Solutions AB - */ - -package akka.dispatch - -import akka.actor.ActorRef -import akka.util.Switch - -import org.fusesource.hawtdispatch._ -import org.fusesource.hawtdispatch.DispatchQueue.QueueType - -import java.util.concurrent.atomic.{AtomicInteger, AtomicBoolean} -import java.util.concurrent.CountDownLatch - -/** - * Holds helper methods for working with actors that are using a HawtDispatcher as it's dispatcher. - */ -object HawtDispatcher { - - private val retained = new AtomicInteger() - - @volatile private var shutdownLatch: CountDownLatch = _ - - private def retainNonDaemon = if (retained.getAndIncrement == 0) { - shutdownLatch = new CountDownLatch(1) - new Thread("HawtDispatch Non-Daemon") { - override def run = { - try { - shutdownLatch.await - } catch { - case _ => - } - } - }.start() - } - - private def releaseNonDaemon = if (retained.decrementAndGet == 0) { - shutdownLatch.countDown - shutdownLatch = null - } - - /** - * @return the mailbox associated with the actor - */ - private def mailbox(actorRef: ActorRef) = actorRef.mailbox.asInstanceOf[HawtDispatcherMailbox] - - /** - * @return the dispatch queue associated with the actor - */ - def queue(actorRef: ActorRef) = mailbox(actorRef).queue - - /** - *

- * Pins an actor to a random thread queue. Once pinned the actor will always execute - * on the same thread. - *

- * - *

- * This method can only succeed if the actor it's dispatcher is set to a HawtDispatcher and it has been started - *

- * - * @return true if the actor was pinned - */ - def pin(actorRef: ActorRef) = actorRef.mailbox match { - case x: HawtDispatcherMailbox => - x.queue.setTargetQueue( getRandomThreadQueue ) - true - case _ => false - } - - /** - *

- * Unpins the actor so that all threads in the hawt dispatch thread pool - * compete to execute him. - *

- * - *

- * This method can only succeed if the actor it's dispatcher is set to a HawtDispatcher and it has been started - *

- * @return true if the actor was unpinned - */ - def unpin(actorRef: ActorRef) = target(actorRef, globalQueue) - - /** - * @return true if the actor was pinned to a thread. - */ - def pinned(actorRef: ActorRef):Boolean = actorRef.mailbox match { - case x: HawtDispatcherMailbox => x.queue.getTargetQueue.getQueueType == QueueType.THREAD_QUEUE - case _ => false - } - - /** - *

- * Updates the actor's target dispatch queue to the value specified. This allows - * you to do odd things like targeting another serial queue. - *

- * - *

- * This method can only succeed if the actor it's dispatcher is set to a HawtDispatcher and it has been started - *

- * @return true if the actor was unpinned - */ - def target(actorRef: ActorRef, parent: DispatchQueue) = actorRef.mailbox match { - case x: HawtDispatcherMailbox => - x.queue.setTargetQueue(parent) - true - case _ => false - } -} - -/** - *

- * A HawtDispatch based MessageDispatcher. Actors with this dispatcher are executed - * on the HawtDispatch fixed sized thread pool. The number of of threads will match - * the number of cores available on your system. - * - *

- *

- * Actors using this dispatcher are restricted to only executing non blocking - * operations. The actor cannot synchronously call another actor or call 3rd party - * libraries that can block for a long time. You should use non blocking IO APIs - * instead of blocking IO apis to avoid blocking that actor for an extended amount - * of time. - *

- * - *

- * This dispatcher delivers messages to the actors in the order that they - * were producer at the sender. - *

- * - *

- * HawtDispatch supports processing Non blocking Socket IO in both the reactor - * and proactor styles. For more details, see the HawtDispacherEchoServer.scala - * example. - *

- * - * @author Hiram Chirino - */ -class HawtDispatcher(val aggregate: Boolean = true, val parent: DispatchQueue = globalQueue) extends MessageDispatcher { - import HawtDispatcher._ - - private[akka] def start { retainNonDaemon } - - private[akka] def shutdown { releaseNonDaemon } - - private[akka] def dispatch(invocation: MessageInvocation){ - mailbox(invocation.receiver).dispatch(invocation) - } - - // hawtdispatch does not have a way to get queue sizes, getting an accurate - // size can cause extra contention.. is this really needed? - // TODO: figure out if this can be optional in akka - override def mailboxSize(actorRef: ActorRef) = 0 - - def createMailbox(actorRef: ActorRef): AnyRef = { - val queue = parent.createQueue(actorRef.toString) - if (aggregate) new AggregatingHawtDispatcherMailbox(queue) - else new HawtDispatcherMailbox(queue) - } - - def suspend(actorRef: ActorRef) = mailbox(actorRef).suspend - def resume(actorRef:ActorRef) = mailbox(actorRef).resume - - override def toString = "HawtDispatcher" -} - -class HawtDispatcherMailbox(val queue: DispatchQueue) { - def dispatch(invocation: MessageInvocation) { - queue { - invocation.invoke - } - } - - def suspend = queue.suspend - def resume = queue.resume -} - -class AggregatingHawtDispatcherMailbox(queue:DispatchQueue) extends HawtDispatcherMailbox(queue) { - private val source = createSource(new ListEventAggregator[MessageInvocation](), queue) - source.setEventHandler (^{drain_source} ) - source.resume - - private def drain_source = source.getData.foreach(_.invoke) - - override def suspend = source.suspend - override def resume = source.resume - - override def dispatch(invocation: MessageInvocation) { - if (getCurrentQueue eq null) { - // we are being call from a non hawtdispatch thread, can't aggregate - // it's events - super.dispatch(invocation) - } else { - // we are being call from a hawtdispatch thread, use the dispatch source - // so that multiple invocations issues on this thread will aggregate and then once - // the thread runs out of work, they get transferred as a batch to the other thread. - source.merge(invocation) - } - } -} diff --git a/akka-actor/src/main/scala/akka/dispatch/MailboxHandling.scala b/akka-actor/src/main/scala/akka/dispatch/MailboxHandling.scala index 68e8cf68ce..8fcf688d55 100644 --- a/akka-actor/src/main/scala/akka/dispatch/MailboxHandling.scala +++ b/akka-actor/src/main/scala/akka/dispatch/MailboxHandling.scala @@ -33,7 +33,7 @@ sealed trait MailboxType case class UnboundedMailbox(val blocking: Boolean = false) extends MailboxType case class BoundedMailbox( - val blocking: Boolean = false, + val blocking: Boolean = false, val capacity: Int = { if (Dispatchers.MAILBOX_CAPACITY < 0) Int.MaxValue else Dispatchers.MAILBOX_CAPACITY }, val pushTimeOut: Duration = Dispatchers.MAILBOX_PUSH_TIME_OUT) extends MailboxType { if (capacity < 0) throw new IllegalArgumentException("The capacity for BoundedMailbox can not be negative") diff --git a/akka-actor/src/test/scala/akka/actor/supervisor/SupervisorMiscSpec.scala b/akka-actor/src/test/scala/akka/actor/supervisor/SupervisorMiscSpec.scala index f02b369387..78547b4d19 100644 --- a/akka-actor/src/test/scala/akka/actor/supervisor/SupervisorMiscSpec.scala +++ b/akka-actor/src/test/scala/akka/actor/supervisor/SupervisorMiscSpec.scala @@ -46,7 +46,7 @@ class SupervisorMiscSpec extends WordSpec with MustMatchers { }).start val actor4 = Actor.actorOf(new Actor { - self.dispatcher = Dispatchers.newHawtDispatcher(true) + self.dispatcher = Dispatchers.newThreadBasedDispatcher(self) override def postRestart(cause: Throwable) {countDownLatch.countDown} protected def receive = { diff --git a/akka-actor/src/test/scala/akka/dispatch/ActorModelSpec.scala b/akka-actor/src/test/scala/akka/dispatch/ActorModelSpec.scala index b282e08090..1a5e9753b8 100644 --- a/akka-actor/src/test/scala/akka/dispatch/ActorModelSpec.scala +++ b/akka-actor/src/test/scala/akka/dispatch/ActorModelSpec.scala @@ -303,10 +303,6 @@ class ExecutorBasedEventDrivenDispatcherModelTest extends ActorModelSpec { new ExecutorBasedEventDrivenDispatcher("foo") with MessageDispatcherInterceptor } -class HawtDispatcherModelTest extends ActorModelSpec { - def newInterceptedDispatcher = new HawtDispatcher(false) with MessageDispatcherInterceptor -} - class ExecutorBasedEventDrivenWorkStealingDispatcherModelTest extends ActorModelSpec { def newInterceptedDispatcher = new ExecutorBasedEventDrivenWorkStealingDispatcher("foo") with MessageDispatcherInterceptor -} +} \ No newline at end of file diff --git a/akka-actor/src/test/scala/akka/dispatch/DispatchersSpec.scala b/akka-actor/src/test/scala/akka/dispatch/DispatchersSpec.scala index 826cd72690..d09c088c99 100644 --- a/akka-actor/src/test/scala/akka/dispatch/DispatchersSpec.scala +++ b/akka-actor/src/test/scala/akka/dispatch/DispatchersSpec.scala @@ -22,7 +22,6 @@ object DispatchersSpec { val allowcoretimeout = "allow-core-timeout" val rejectionpolicy = "rejection-policy" // abort, caller-runs, discard-oldest, discard val throughput = "throughput" // Throughput for ExecutorBasedEventDrivenDispatcher - val aggregate = "aggregate" // Aggregate on/off for HawtDispatchers def instance(dispatcher: MessageDispatcher): (MessageDispatcher) => Boolean = _ == dispatcher def ofType[T <: MessageDispatcher : Manifest]: (MessageDispatcher) => Boolean = _.getClass == manifest[T].erasure @@ -30,9 +29,7 @@ object DispatchersSpec { def typesAndValidators: Map[String,(MessageDispatcher) => Boolean] = Map( "ExecutorBasedEventDrivenWorkStealing" -> ofType[ExecutorBasedEventDrivenWorkStealingDispatcher], "ExecutorBasedEventDriven" -> ofType[ExecutorBasedEventDrivenDispatcher], - "Hawt" -> ofType[HawtDispatcher], - "GlobalExecutorBasedEventDriven" -> instance(globalExecutorBasedEventDrivenDispatcher), - "GlobalHawt" -> instance(globalHawtDispatcher) + "GlobalExecutorBasedEventDriven" -> instance(globalExecutorBasedEventDrivenDispatcher) ) def validTypes = typesAndValidators.keys.toList diff --git a/akka-actor/src/test/scala/akka/dispatch/HawtDispatcherActorSpec.scala b/akka-actor/src/test/scala/akka/dispatch/HawtDispatcherActorSpec.scala deleted file mode 100644 index 1d1b2c1e24..0000000000 --- a/akka-actor/src/test/scala/akka/dispatch/HawtDispatcherActorSpec.scala +++ /dev/null @@ -1,71 +0,0 @@ -package akka.actor.dispatch - -import java.util.concurrent.{CountDownLatch, TimeUnit} - -import org.scalatest.junit.JUnitSuite -import org.junit.Test - -import akka.dispatch.{HawtDispatcher, Dispatchers} -import akka.actor.Actor -import Actor._ - -object HawtDispatcherActorSpec { - class TestActor extends Actor { - self.dispatcher = new HawtDispatcher() - def receive = { - case "Hello" => - self.reply("World") - case "Failure" => - throw new RuntimeException("Expected exception; to test fault-tolerance") - } - } - - object OneWayTestActor { - val oneWay = new CountDownLatch(1) - } - class OneWayTestActor extends Actor { - self.dispatcher = new HawtDispatcher() - def receive = { - case "OneWay" => OneWayTestActor.oneWay.countDown - } - } -} - -class HawtDispatcherActorSpec extends JUnitSuite { - import HawtDispatcherActorSpec._ - - private val unit = TimeUnit.MILLISECONDS - - @Test def shouldSendOneWay = { - val actor = actorOf[OneWayTestActor].start - val result = actor ! "OneWay" - assert(OneWayTestActor.oneWay.await(1, TimeUnit.SECONDS)) - actor.stop - } - - @Test def shouldSendReplySync = { - val actor = actorOf[TestActor].start - val result = (actor !! ("Hello", 10000)).as[String] - assert("World" === result.get) - actor.stop - } - - @Test def shouldSendReplyAsync = { - val actor = actorOf[TestActor].start - val result = actor !! "Hello" - assert("World" === result.get.asInstanceOf[String]) - actor.stop - } - - @Test def shouldSendReceiveException = { - val actor = actorOf[TestActor].start - try { - actor !! "Failure" - fail("Should have thrown an exception") - } catch { - case e => - assert("Expected exception; to test fault-tolerance" === e.getMessage()) - } - actor.stop - } -} diff --git a/akka-actor/src/test/scala/akka/dispatch/HawtDispatcherEchoServer.scala b/akka-actor/src/test/scala/akka/dispatch/HawtDispatcherEchoServer.scala deleted file mode 100644 index 047d557e12..0000000000 --- a/akka-actor/src/test/scala/akka/dispatch/HawtDispatcherEchoServer.scala +++ /dev/null @@ -1,206 +0,0 @@ -/** - * Copyright (C) 2009-2011 Scalable Solutions AB - */ - -package akka.actor.dispatch - -import scala.collection.mutable.ListBuffer - -import java.util.concurrent.TimeUnit -import java.net.InetSocketAddress -import java.io.IOException -import java.nio.ByteBuffer -import java.nio.channels.{SocketChannel, SelectionKey, ServerSocketChannel} - -import akka.actor._ -import akka.actor.Actor._ -import akka.dispatch.HawtDispatcher - -import org.fusesource.hawtdispatch._ - -/** - * This is an example of how to crate an Akka actor based TCP echo server using - * the HawtDispatch dispatcher and NIO event sources. - */ -object HawtDispatcherEchoServer { - - private val hawt = new HawtDispatcher - var port=4444; - var useReactorPattern=true - - def main(args:Array[String]):Unit = run - - def run() = { - val server = actorOf(new Server(port)) - server.start - Scheduler.schedule(server, DisplayStats, 1, 5, TimeUnit.SECONDS) - - println("Press enter to shutdown."); - System.in.read - server ! Shutdown - } - - case object Shutdown - case object DisplayStats - case class SessionClosed(session:ActorRef) - - class Server(val port: Int) extends Actor { - - self.dispatcher = hawt - - var channel:ServerSocketChannel = _ - var accept_source:DispatchSource = _ - var sessions = ListBuffer[ActorRef]() - - override def preStart = { - channel = ServerSocketChannel.open(); - channel.socket().bind(new InetSocketAddress(port)); - channel.configureBlocking(false); - - // Setup the accept source, it will callback to the handler methods - // via the actor's mailbox so you don't need to worry about - // synchronizing with the local variables - accept_source = createSource(channel, SelectionKey.OP_ACCEPT, HawtDispatcher.queue(self)); - accept_source.setEventHandler(^{ accept }); - accept_source.setDisposer(^{ - channel.close(); - println("Closed port: "+port); - }); - - accept_source.resume - - println("Listening on port: "+port); - } - - - private def accept() = { - var socket = channel.accept(); - while( socket!=null ) { - try { - socket.configureBlocking(false); - val session = actorOf(new Session(self, socket)) - session.start() - sessions += session - } catch { - case e: Exception => - socket.close - } - socket = channel.accept(); - } - } - - def receive = { - case SessionClosed(session) => - sessions = sessions.filterNot( _ == session ) - session.stop - case DisplayStats => - sessions.foreach { session=> - session ! DisplayStats - } - case Shutdown => - sessions.foreach { session=> - session.stop - } - sessions.clear - accept_source.release - self.stop - } - } - - class Session(val server:ActorRef, val channel: SocketChannel) extends Actor { - - self.dispatcher = hawt - - val buffer = ByteBuffer.allocate(1024); - val remote_address = channel.socket.getRemoteSocketAddress.toString - - var read_source:DispatchSource = _ - var write_source:DispatchSource = _ - - var readCounter = 0L - var writeCounter = 0L - var closed = false - - override def preStart = { - - if(useReactorPattern) { - // Then we will be using the reactor pattern for handling IO: - // Pin this actor to a single thread. The read/write event sources will poll - // a Selector on the pinned thread. Since the IO events are generated on the same - // thread as where the Actor is pinned to, it can avoid a substantial amount - // thread synchronization. Plus your GC will perform better since all the IO - // processing is done on a single thread. - HawtDispatcher.pin(self) - } else { - // Then we will be using sing the proactor pattern for handling IO: - // Then the actor will not be pinned to a specific thread. The read/write - // event sources will poll a Selector and then asynchronously dispatch the - // event's to the actor via the thread pool. - } - - // Setup the sources, they will callback to the handler methods - // via the actor's mailbox so you don't need to worry about - // synchronizing with the local variables - read_source = createSource(channel, SelectionKey.OP_READ, HawtDispatcher.queue(self)); - read_source.setEventHandler(^{ read }) - read_source.setCancelHandler(^{ close }) - - write_source = createSource(channel, SelectionKey.OP_WRITE, HawtDispatcher.queue(self)); - write_source.setEventHandler(^{ write }) - write_source.setCancelHandler(^{ close }) - - read_source.resume - println("Accepted connection from: "+remote_address); - } - - override def postStop = { - closed = true - read_source.release - write_source.release - channel.close - } - - private def catchio(func: =>Unit):Unit = { - try { - func - } catch { - case e:IOException => close - } - } - - def read():Unit = catchio { - channel.read(buffer) match { - case -1 => - close // peer disconnected. - case 0 => - case count:Int => - readCounter += count - buffer.flip; - read_source.suspend - write_source.resume - write() - } - } - - def write() = catchio { - writeCounter += channel.write(buffer) - if (buffer.remaining == 0) { - buffer.clear - write_source.suspend - read_source.resume - } - } - - def close() = { - if( !closed ) { - closed = true - server ! SessionClosed(self) - } - } - - def receive = { - case DisplayStats => - println("connection to %s reads: %,d bytes, writes: %,d".format(remote_address, readCounter, writeCounter)) - } - } -} diff --git a/akka-sbt-plugin/src/main/scala/AkkaProject.scala b/akka-sbt-plugin/src/main/scala/AkkaProject.scala index 7266b1f851..10660639d0 100644 --- a/akka-sbt-plugin/src/main/scala/AkkaProject.scala +++ b/akka-sbt-plugin/src/main/scala/AkkaProject.scala @@ -30,7 +30,6 @@ trait AkkaBaseProject extends BasicScalaProject { val facebookModuleConfig = ModuleConfiguration("com.facebook", AkkaRepo) val h2lzfModuleConfig = ModuleConfiguration("voldemort.store.compress", AkkaRepo) val hbaseModuleConfig = ModuleConfiguration("org.apache.hbase", AkkaRepo) - val jsr166xModuleConfig = ModuleConfiguration("jsr166x", AkkaRepo) val memcachedModuleConfig = ModuleConfiguration("spy", "memcached", AkkaRepo) val netLagModuleConfig = ModuleConfiguration("net.lag", AkkaRepo) val redisModuleConfig = ModuleConfiguration("com.redis", AkkaRepo) diff --git a/config/akka-reference.conf b/config/akka-reference.conf index dda07c8030..150cbc6773 100644 --- a/config/akka-reference.conf +++ b/config/akka-reference.conf @@ -34,10 +34,8 @@ akka { default-dispatcher { type = "GlobalExecutorBasedEventDriven" # Must be one of the following, all "Global*" are non-configurable - # - Hawt # - ExecutorBasedEventDriven # - ExecutorBasedEventDrivenWorkStealing - # - GlobalHawt # - GlobalExecutorBasedEventDriven keep-alive-time = 60 # Keep alive time for threads core-pool-size-factor = 1.0 # No of core threads ... ceil(available processors * factor) @@ -47,7 +45,6 @@ akka { rejection-policy = "caller-runs" # abort, caller-runs, discard-oldest, discard throughput = 5 # Throughput for ExecutorBasedEventDrivenDispatcher, set to 1 for complete fairness throughput-deadline-time = -1 # Throughput deadline for ExecutorBasedEventDrivenDispatcher, set to 0 or negative for no deadline - aggregate = off # Aggregate on/off for HawtDispatchers mailbox-capacity = -1 # If negative (or zero) then an unbounded mailbox is used (default) # If positive then a bounded mailbox is used and the capacity is set using the property # NOTE: setting a mailbox to 'blocking' can be a bit dangerous, @@ -104,7 +101,7 @@ akka { #If you are using akka.http.AkkaMistServlet mist-dispatcher { - #type = "Hawt" # Uncomment if you want to use a different dispatcher than the default one for Comet + #type = "GlobalExecutorBasedEventDriven" # Uncomment if you want to use a different dispatcher than the default one for Comet } connection-close = true # toggles the addition of the "Connection" response header with a "close" value root-actor-id = "_httproot" # the id of the actor to use as the root endpoint diff --git a/project/build/AkkaProject.scala b/project/build/AkkaProject.scala index edbd1315b3..b09e4d60ee 100644 --- a/project/build/AkkaProject.scala +++ b/project/build/AkkaProject.scala @@ -111,7 +111,6 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) { // ------------------------------------------------------------------------------------------------------------------- lazy val DISPATCH_VERSION = "0.7.4" - lazy val HAWT_DISPATCH_VERSION = "1.1" lazy val JACKSON_VERSION = "1.4.3" lazy val JERSEY_VERSION = "1.3" lazy val MULTIVERSE_VERSION = "0.6.2" @@ -159,8 +158,6 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) { lazy val h2_lzf = "voldemort.store.compress" % "h2-lzf" % "1.0" % "compile" //ApacheV2 - lazy val hawtdispatch = "org.fusesource.hawtdispatch" % "hawtdispatch-scala" % HAWT_DISPATCH_VERSION % "compile" //ApacheV2 - lazy val jackson = "org.codehaus.jackson" % "jackson-mapper-asl" % JACKSON_VERSION % "compile" //ApacheV2 lazy val jackson_core = "org.codehaus.jackson" % "jackson-core-asl" % JACKSON_VERSION % "compile" //ApacheV2 @@ -169,8 +166,6 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) { lazy val jersey_server = "com.sun.jersey" % "jersey-server" % JERSEY_VERSION % "compile" //CDDL v1 lazy val jersey_contrib = "com.sun.jersey.contribs" % "jersey-scala" % JERSEY_VERSION % "compile" //CDDL v1 - lazy val jsr166x = "jsr166x" % "jsr166x" % "1.0" % "compile" //CC Public Domain - lazy val jsr250 = "javax.annotation" % "jsr250-api" % "1.0" % "compile" //CDDL v1 lazy val jsr311 = "javax.ws.rs" % "jsr311-api" % "1.1" % "compile" //CDDL v1 @@ -322,8 +317,6 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) { class AkkaActorProject(info: ProjectInfo) extends AkkaDefaultProject(info, distPath) { val uuid = Dependencies.uuid val configgy = Dependencies.configgy - val hawtdispatch = Dependencies.hawtdispatch - val jsr166x = Dependencies.jsr166x val logback = Dependencies.logback // testing