diff --git a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala index 46bf609c7a..deafb9cdc1 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala @@ -110,8 +110,9 @@ object ActorModelSpec { val stops = new AtomicLong(0) def getStats(actorRef: ActorRef) = { - stats.putIfAbsent(actorRef, new InterceptorStats) match { - case null ⇒ stats.get(actorRef) + val is = new InterceptorStats + stats.putIfAbsent(actorRef, is) match { + case null ⇒ is case other ⇒ other } } @@ -127,12 +128,12 @@ object ActorModelSpec { } protected[akka] abstract override def register(actor: ActorCell) { - getStats(actor.self).registers.incrementAndGet() + assert(getStats(actor.self).registers.incrementAndGet() == 1) super.register(actor) } protected[akka] abstract override def unregister(actor: ActorCell) { - getStats(actor.self).unregisters.incrementAndGet() + assert(getStats(actor.self).unregisters.incrementAndGet() == 1) super.unregister(actor) } @@ -368,13 +369,18 @@ abstract class ActorModelSpec(config: String) extends AkkaSpec(config) with Defa val buddies = dispatcher.buddies val mq = dispatcher.messageQueue - System.err.println("Buddies left: ") - buddies.toArray foreach { + System.err.println("Buddies left: " + buddies.size + " stopLatch: " + stopLatch.getCount + " inhab:" + dispatcher.inhabitants) + buddies.toArray sorted new Ordering[AnyRef] { + def compare(l: AnyRef, r: AnyRef) = (l, r) match { + case (ll: ActorCell, rr: ActorCell) ⇒ ll.self.path.toString.compareTo(rr.self.path.toString) + } + } foreach { case cell: ActorCell ⇒ System.err.println(" - " + cell.self.path + " " + cell.isTerminated + " " + cell.mailbox.status + " " + cell.mailbox.numberOfMessages + " " + SystemMessage.size(cell.mailbox.systemDrain())) } - System.err.println("Mailbox: " + mq.numberOfMessages + " " + mq.hasMessages + " ") + System.err.println("Mailbox: " + mq.numberOfMessages + " " + mq.hasMessages) + Iterator.continually(mq.dequeue) takeWhile (_ ne null) foreach System.err.println case _ ⇒ } @@ -540,7 +546,8 @@ object BalancingDispatcherModelSpec { Duration(config.getNanoseconds("throughput-deadline-time"), TimeUnit.NANOSECONDS), mailboxType, configureExecutor(), - Duration(config.getMilliseconds("shutdown-timeout"), TimeUnit.MILLISECONDS)) with MessageDispatcherInterceptor + Duration(config.getMilliseconds("shutdown-timeout"), TimeUnit.MILLISECONDS), + config.getBoolean("attempt-teamwork")) with MessageDispatcherInterceptor override def dispatcher(): MessageDispatcher = instance } diff --git a/akka-actor-tests/src/test/scala/akka/config/ConfigSpec.scala b/akka-actor-tests/src/test/scala/akka/config/ConfigSpec.scala index dd5149ad8e..127907412e 100644 --- a/akka-actor-tests/src/test/scala/akka/config/ConfigSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/config/ConfigSpec.scala @@ -53,6 +53,7 @@ class ConfigSpec extends AkkaSpec(ConfigFactory.defaultReference) { c.getMilliseconds("shutdown-timeout") must equal(1 * 1000) c.getInt("throughput") must equal(5) c.getMilliseconds("throughput-deadline-time") must equal(0) + c.getBoolean("attempt-teamwork") must equal(true) } //Fork join executor config diff --git a/akka-actor-tests/src/test/scala/akka/routing/ResizerSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/ResizerSpec.scala index 2130afe107..1f78c64edf 100644 --- a/akka-actor-tests/src/test/scala/akka/routing/ResizerSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/routing/ResizerSpec.scala @@ -11,6 +11,7 @@ import akka.util.duration._ import akka.actor.ActorRef import java.util.concurrent.atomic.AtomicInteger import akka.pattern.ask +import akka.util.Duration object ResizerSpec { @@ -160,53 +161,48 @@ class ResizerSpec extends AkkaSpec(ResizerSpec.config) with DefaultTimeout with // as influenced by the backlog of blocking pooled actors val resizer = DefaultResizer( - lowerBound = 2, - upperBound = 4, + lowerBound = 3, + upperBound = 5, rampupRate = 0.1, + backoffRate = 0.0, pressureThreshold = 1, messagesPerResize = 1, backoffThreshold = 0.0) val router = system.actorOf(Props(new Actor { def receive = { - case (n: Int, latch: TestLatch, count: AtomicInteger) ⇒ - (n millis).dilated.sleep - count.incrementAndGet - latch.countDown() + case d: Duration ⇒ d.dilated.sleep; sender ! "done" + case "echo" ⇒ sender ! "reply" } }).withRouter(RoundRobinRouter(resizer = Some(resizer)))) // first message should create the minimum number of routees - router ! 1 + router ! "echo" + expectMsg("reply") - Await.result(router ? CurrentRoutees, 5 seconds).asInstanceOf[RouterRoutees].routees.size must be(2) + def routees(r: ActorRef): Int = { + r ! CurrentRoutees + expectMsgType[RouterRoutees].routees.size + } - def loop(loops: Int, t: Int, latch: TestLatch, count: AtomicInteger) = { - (100 millis).dilated.sleep - for (m ← 0 until loops) { - router.!((t, latch, count)) - (100 millis).dilated.sleep - } + routees(router) must be(3) + + def loop(loops: Int, d: Duration) = { + for (m ← 0 until loops) router ! d + for (m ← 0 until loops) expectMsg(d * 3, "done") } // 2 more should go thru without triggering more - val count1 = new AtomicInteger - val latch1 = TestLatch(2) - loop(2, 200, latch1, count1) - Await.ready(latch1, TestLatch.DefaultTimeout) - count1.get must be(2) + loop(2, 200 millis) - Await.result(router ? CurrentRoutees, 5 seconds).asInstanceOf[RouterRoutees].routees.size must be(2) + routees(router) must be(3) // a whole bunch should max it out - val count2 = new AtomicInteger - val latch2 = TestLatch(10) - loop(10, 500, latch2, count2) - Await.ready(latch2, TestLatch.DefaultTimeout) - count2.get must be(10) - - Await.result(router ? CurrentRoutees, 5 seconds).asInstanceOf[RouterRoutees].routees.size must be(4) + loop(10, 500 millis) + awaitCond(routees(router) > 3) + loop(10, 500 millis) + awaitCond(routees(router) == 5) } "backoff" in { @@ -239,7 +235,7 @@ class ResizerSpec extends AkkaSpec(ResizerSpec.config) with DefaultTimeout with (300 millis).dilated.sleep // let it cool down - for (m ← 0 to 3) { + for (m ← 0 to 5) { router ! 1 (500 millis).dilated.sleep } diff --git a/akka-actor/src/main/resources/reference.conf b/akka-actor/src/main/resources/reference.conf index 62d49d61cd..8240aee4d7 100644 --- a/akka-actor/src/main/resources/reference.conf +++ b/akka-actor/src/main/resources/reference.conf @@ -245,6 +245,11 @@ akka { # mailbox is used. The Class of the FQCN must have a constructor with a # com.typesafe.config.Config parameter. mailbox-type = "" + + # For BalancingDispatcher: If the balancing dispatcher should attempt to + # schedule idle actors using the same dispatcher when a message comes in, + # and the dispatchers ExecutorService is not fully busy already. + attempt-teamwork = on } debug { diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala index aa718e12c8..c22529b2c8 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala @@ -285,14 +285,18 @@ private[akka] class ActorCell( final def isTerminated: Boolean = mailbox.isClosed final def start(): Unit = { + /* + * Create the mailbox and enqueue the Create() message to ensure that + * this is processed before anything else. + */ mailbox = dispatcher.createMailbox(this) + // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ + mailbox.systemEnqueue(self, Create()) // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ parent.sendSystemMessage(akka.dispatch.Supervise(self)) - // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ - dispatcher.systemDispatch(this, Create()) - + // This call is expected to start off the actor by scheduling its mailbox. dispatcher.attach(this) } diff --git a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala index 1b31be630c..6046e249af 100644 --- a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala @@ -156,7 +156,10 @@ trait ExecutionContext { * log the problem or whatever is appropriate for the implementation. */ def reportFailure(t: Throwable): Unit +} +private[akka] trait LoadMetrics { self: Executor ⇒ + def atFullThrottle(): Boolean } object MessageDispatcher { @@ -185,9 +188,14 @@ abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) ext def id: String /** - * Attaches the specified actor instance to this dispatcher + * Attaches the specified actor instance to this dispatcher, which includes + * scheduling it to run for the first time (Create() is expected to have + * been enqueued by the ActorCell upon mailbox creation). */ - final def attach(actor: ActorCell): Unit = register(actor) + final def attach(actor: ActorCell): Unit = { + register(actor) + registerForExecution(actor.mailbox, false, true) + } /** * Detaches the specified actor instance from this dispatcher @@ -243,7 +251,7 @@ abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) ext () ⇒ if (inhabitantsUpdater.decrementAndGet(this) == 0) ifSensibleToDoSoThenScheduleShutdown() /** - * If you override it, you must call it. But only ever once. See "attach" for only invocation + * If you override it, you must call it. But only ever once. See "attach" for only invocation. */ protected[akka] def register(actor: ActorCell) { inhabitantsUpdater.incrementAndGet(this) @@ -260,6 +268,8 @@ abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) ext mailBox.cleanUp() } + def inhabitants: Long = inhabitantsUpdater.get(this) + private val shutdownAction = new Runnable { @tailrec final def run() { @@ -440,11 +450,13 @@ object ForkJoinExecutorConfigurator { final class AkkaForkJoinPool(parallelism: Int, threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory, unhandledExceptionHandler: Thread.UncaughtExceptionHandler) - extends ForkJoinPool(parallelism, threadFactory, unhandledExceptionHandler, true) { + extends ForkJoinPool(parallelism, threadFactory, unhandledExceptionHandler, true) with LoadMetrics { override def execute(r: Runnable): Unit = r match { case m: Mailbox ⇒ super.execute(new MailboxExecutionTask(m)) case other ⇒ super.execute(other) } + + def atFullThrottle(): Boolean = this.getActiveThreadCount() >= this.getParallelism() } /** diff --git a/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala index 8542ac69c8..d2d978341c 100644 --- a/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala @@ -4,12 +4,11 @@ package akka.dispatch -import util.DynamicVariable import akka.actor.{ ActorCell, ActorRef } -import java.util.concurrent.{ LinkedBlockingQueue, ConcurrentLinkedQueue, ConcurrentSkipListSet } import annotation.tailrec -import java.util.concurrent.atomic.AtomicBoolean -import akka.util.Duration +import akka.util.{ Duration, Helpers } +import java.util.{ Comparator, Iterator } +import java.util.concurrent.{ Executor, LinkedBlockingQueue, ConcurrentLinkedQueue, ConcurrentSkipListSet } /** * An executor based event driven dispatcher which will try to redistribute work from busy actors to idle actors. It is assumed @@ -32,20 +31,27 @@ class BalancingDispatcher( throughputDeadlineTime: Duration, mailboxType: MailboxType, _executorServiceFactoryProvider: ExecutorServiceFactoryProvider, - _shutdownTimeout: Duration) + _shutdownTimeout: Duration, + attemptTeamWork: Boolean) extends Dispatcher(_prerequisites, _id, throughput, throughputDeadlineTime, mailboxType, _executorServiceFactoryProvider, _shutdownTimeout) { - val buddies = new ConcurrentSkipListSet[ActorCell](akka.util.Helpers.IdentityHashComparator) - val rebalance = new AtomicBoolean(false) + val buddies = new ConcurrentSkipListSet[ActorCell]( + Helpers.identityHashComparator(new Comparator[ActorCell] { + def compare(l: ActorCell, r: ActorCell) = l.self.path compareTo r.self.path + })) val messageQueue: MessageQueue = mailboxType match { - case u: UnboundedMailbox ⇒ new QueueBasedMessageQueue with UnboundedMessageQueueSemantics { - final val queue = new ConcurrentLinkedQueue[Envelope] - } - case BoundedMailbox(cap, timeout) ⇒ new QueueBasedMessageQueue with BoundedMessageQueueSemantics { - final val queue = new LinkedBlockingQueue[Envelope](cap) - final val pushTimeOut = timeout - } + case UnboundedMailbox() ⇒ + new QueueBasedMessageQueue with UnboundedMessageQueueSemantics { + final val queue = new ConcurrentLinkedQueue[Envelope] + } + + case BoundedMailbox(cap, timeout) ⇒ + new QueueBasedMessageQueue with BoundedMessageQueueSemantics { + final val queue = new LinkedBlockingQueue[Envelope](cap) + final val pushTimeOut = timeout + } + case other ⇒ throw new IllegalArgumentException("Only handles BoundedMailbox and UnboundedMailbox, but you specified [" + other + "]") } @@ -84,30 +90,20 @@ class BalancingDispatcher( protected[akka] override def unregister(actor: ActorCell) = { buddies.remove(actor) super.unregister(actor) - intoTheFray(except = actor) //When someone leaves, he tosses a friend into the fray + if (messageQueue.hasMessages) scheduleOne() } - def intoTheFray(except: ActorCell): Unit = - if (rebalance.compareAndSet(false, true)) { - try { - val i = buddies.iterator() - - @tailrec - def throwIn(): Unit = { - val n = if (i.hasNext) i.next() else null - if (n eq null) () - else if ((n ne except) && registerForExecution(n.mailbox, false, false)) () - else throwIn() - } - throwIn() - } finally { - rebalance.set(false) - } - } - override protected[akka] def dispatch(receiver: ActorCell, invocation: Envelope) = { messageQueue.enqueue(receiver.self, invocation) - registerForExecution(receiver.mailbox, false, false) - intoTheFray(except = receiver) + if (!registerForExecution(receiver.mailbox, false, false) && doTeamWork) scheduleOne() } -} + + protected def doTeamWork(): Boolean = + attemptTeamWork && (executorService.get().executor match { + case lm: LoadMetrics ⇒ lm.atFullThrottle == false + case other ⇒ true + }) + + @tailrec private def scheduleOne(i: Iterator[ActorCell] = buddies.iterator): Unit = + if (i.hasNext && !registerForExecution(i.next.mailbox, false, false)) scheduleOne(i) +} \ No newline at end of file diff --git a/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala index a735ea367e..2046f02286 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala @@ -32,12 +32,11 @@ class Dispatcher( val shutdownTimeout: Duration) extends MessageDispatcher(_prerequisites) { - protected[akka] val executorServiceFactory: ExecutorServiceFactory = + protected val executorServiceFactory: ExecutorServiceFactory = executorServiceFactoryProvider.createExecutorServiceFactory(id, prerequisites.threadFactory) - protected[akka] val executorService = new AtomicReference[ExecutorService](new ExecutorServiceDelegate { - lazy val executor = executorServiceFactory.createExecutorService - }) + protected val executorService = new AtomicReference[ExecutorServiceDelegate]( + new ExecutorServiceDelegate { lazy val executor = executorServiceFactory.createExecutorService }) protected[akka] def dispatch(receiver: ActorCell, invocation: Envelope) = { val mbox = receiver.mailbox diff --git a/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala b/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala index 8e99e05b06..5f4528146d 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala @@ -189,7 +189,8 @@ class BalancingDispatcherConfigurator(config: Config, prerequisites: DispatcherP config.getInt("throughput"), Duration(config.getNanoseconds("throughput-deadline-time"), TimeUnit.NANOSECONDS), mailboxType, configureExecutor(), - Duration(config.getMilliseconds("shutdown-timeout"), TimeUnit.MILLISECONDS)) + Duration(config.getMilliseconds("shutdown-timeout"), TimeUnit.MILLISECONDS), + config.getBoolean("attempt-teamwork")) /** * Returns the same dispatcher instance for each invocation diff --git a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala index cc15ae2173..4c50cb5c8d 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala @@ -190,7 +190,10 @@ private[akka] abstract class Mailbox(val actor: ActorCell) extends MessageQueue var nextMessage = systemDrain() try { while ((nextMessage ne null) && !isClosed) { - if (debug) println(actor.self + " processing system message " + nextMessage + " with children " + actor.childrenRefs) + if (debug) println(actor.self + " processing system message " + nextMessage + " with " + + (if (actor.childrenRefs.isEmpty) "no children" + else if (actor.childrenRefs.size > 20) actor.childrenRefs.size + " children" + else actor.childrenRefs.mkString("children:\n ", "\n ", ""))) actor systemInvoke nextMessage nextMessage = nextMessage.next // don’t ever execute normal message when system message present! @@ -236,15 +239,26 @@ private[akka] abstract class Mailbox(val actor: ActorCell) extends MessageQueue } trait MessageQueue { - /* - * These method need to be implemented in subclasses; they should not rely on the internal stuff above. + /** + * Try to enqueue the message to this queue, or throw an exception. */ - def enqueue(receiver: ActorRef, handle: Envelope) + def enqueue(receiver: ActorRef, handle: Envelope): Unit // NOTE: receiver is used only in two places, but cannot be removed + /** + * Try to dequeue the next message from this queue, return null failing that. + */ def dequeue(): Envelope + /** + * Should return the current number of messages held in this queue; may + * always return 0 if no other value is available efficiently. Do not use + * this for testing for presence of messages, use `hasMessages` instead. + */ def numberOfMessages: Int + /** + * Indicates whether this queue is non-empty. + */ def hasMessages: Boolean } @@ -292,15 +306,15 @@ trait DefaultSystemMessageQueue { self: Mailbox ⇒ } trait UnboundedMessageQueueSemantics extends QueueBasedMessageQueue { - final def enqueue(receiver: ActorRef, handle: Envelope): Unit = queue add handle - final def dequeue(): Envelope = queue.poll() + def enqueue(receiver: ActorRef, handle: Envelope): Unit = queue add handle + def dequeue(): Envelope = queue.poll() } trait BoundedMessageQueueSemantics extends QueueBasedMessageQueue { def pushTimeOut: Duration override def queue: BlockingQueue[Envelope] - final def enqueue(receiver: ActorRef, handle: Envelope) { + def enqueue(receiver: ActorRef, handle: Envelope) { if (pushTimeOut.length > 0) { queue.offer(handle, pushTimeOut.length, pushTimeOut.unit) || { throw new MessageQueueAppendFailedException("Couldn't enqueue message " + handle + " to " + receiver) @@ -308,13 +322,13 @@ trait BoundedMessageQueueSemantics extends QueueBasedMessageQueue { } else queue put handle } - final def dequeue(): Envelope = queue.poll() + def dequeue(): Envelope = queue.poll() } trait QueueBasedMessageQueue extends MessageQueue { def queue: Queue[Envelope] - final def numberOfMessages = queue.size - final def hasMessages = !queue.isEmpty + def numberOfMessages = queue.size + def hasMessages = !queue.isEmpty } /** diff --git a/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala b/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala index 1c63831013..b6fd432296 100644 --- a/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala +++ b/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala @@ -81,14 +81,16 @@ case class ThreadPoolConfig(allowCorePoolTimeout: Boolean = ThreadPoolConfig.def extends ExecutorServiceFactoryProvider { class ThreadPoolExecutorServiceFactory(val threadFactory: ThreadFactory) extends ExecutorServiceFactory { def createExecutorService: ExecutorService = { - val service = new ThreadPoolExecutor( + val service: ThreadPoolExecutor = new ThreadPoolExecutor( corePoolSize, maxPoolSize, threadTimeout.length, threadTimeout.unit, queueFactory(), threadFactory, - rejectionPolicy) + rejectionPolicy) with LoadMetrics { + def atFullThrottle(): Boolean = this.getActiveCount >= this.getPoolSize + } service.allowCoreThreadTimeOut(allowCorePoolTimeout) service } @@ -182,7 +184,7 @@ case class MonitorableThreadFactory(name: String, protected def wire[T <: Thread](t: T): T = { t.setUncaughtExceptionHandler(exceptionHandler) t.setDaemon(daemonic) - contextClassLoader foreach (t.setContextClassLoader(_)) + contextClassLoader foreach t.setContextClassLoader t } } diff --git a/akka-actor/src/main/scala/akka/routing/Routing.scala b/akka-actor/src/main/scala/akka/routing/Routing.scala index da2c81d1a7..4ff6609255 100644 --- a/akka-actor/src/main/scala/akka/routing/Routing.scala +++ b/akka-actor/src/main/scala/akka/routing/Routing.scala @@ -1028,7 +1028,8 @@ case class DefaultResizer( */ def capacity(routees: IndexedSeq[ActorRef]): Int = { val currentSize = routees.size - val delta = filter(pressure(routees), currentSize) + val press = pressure(routees) + val delta = filter(press, currentSize) val proposed = currentSize + delta if (proposed < lowerBound) delta + (lowerBound - proposed) @@ -1058,7 +1059,7 @@ case class DefaultResizer( case a: LocalActorRef ⇒ val cell = a.underlying pressureThreshold match { - case 1 ⇒ cell.mailbox.isScheduled && cell.currentMessage != null + case 1 ⇒ cell.mailbox.isScheduled && cell.mailbox.hasMessages case i if i < 1 ⇒ cell.mailbox.isScheduled && cell.currentMessage != null case threshold ⇒ cell.mailbox.numberOfMessages >= threshold } diff --git a/akka-actor/src/main/scala/akka/util/Helpers.scala b/akka-actor/src/main/scala/akka/util/Helpers.scala index 60e6be8b65..25cb279f2e 100644 --- a/akka-actor/src/main/scala/akka/util/Helpers.scala +++ b/akka-actor/src/main/scala/akka/util/Helpers.scala @@ -21,8 +21,18 @@ object Helpers { if (diff > 0) 1 else if (diff < 0) -1 else 0 } - val IdentityHashComparator = new Comparator[AnyRef] { - def compare(a: AnyRef, b: AnyRef): Int = compareIdentityHash(a, b) + /** + * Create a comparator which will efficiently use `System.identityHashCode`, + * unless that happens to be the same for two non-equals objects, in which + * case the supplied “real” comparator is used; the comparator must be + * consistent with equals, otherwise it would not be an enhancement over + * the identityHashCode. + */ + def identityHashComparator[T <: AnyRef](comp: Comparator[T]): Comparator[T] = new Comparator[T] { + def compare(a: T, b: T): Int = compareIdentityHash(a, b) match { + case 0 if a != b ⇒ comp.compare(a, b) + case x ⇒ x + } } final val base64chars = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789+~" diff --git a/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala b/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala index 8282ee58f5..8b2d15a079 100644 --- a/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala +++ b/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala @@ -8,7 +8,7 @@ import java.util.concurrent.locks.ReentrantLock import java.util.LinkedList import scala.annotation.tailrec import com.typesafe.config.Config -import akka.actor.{ ExtensionIdProvider, ExtensionId, Extension, ExtendedActorSystem, ActorRef, ActorCell } +import akka.actor.{ ActorInitializationException, ExtensionIdProvider, ExtensionId, Extension, ExtendedActorSystem, ActorRef, ActorCell } import akka.dispatch.{ TaskInvocation, SystemMessage, Suspend, Resume, MessageDispatcherConfigurator, MessageDispatcher, Mailbox, Envelope, DispatcherPrerequisites, DefaultSystemMessageQueue } import akka.util.duration.intToDurationInt import akka.util.{ Switch, Duration } @@ -132,6 +132,17 @@ class CallingThreadDispatcher( protected[akka] override def shutdownTimeout = 1 second + protected[akka] override def register(actor: ActorCell): Unit = { + super.register(actor) + actor.mailbox match { + case mbox: CallingThreadMailbox ⇒ + val queue = mbox.queue + queue.enter + runQueue(mbox, queue) + case x ⇒ throw new ActorInitializationException("expected CallingThreadMailbox, got " + x.getClass) + } + } + override def suspend(actor: ActorCell) { actor.mailbox match { case m: CallingThreadMailbox ⇒ m.suspendSwitch.switchOn