diff --git a/akka-actor-tests/src/test/scala/akka/event/EventBusSpec.scala b/akka-actor-tests/src/test/scala/akka/event/EventBusSpec.scala index afd255a27c..6e2f672507 100644 --- a/akka-actor-tests/src/test/scala/akka/event/EventBusSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/event/EventBusSpec.scala @@ -173,7 +173,7 @@ object ScanningEventBusSpec { class MyScanningEventBus extends ScanningEventBus[Int, akka.japi.Procedure[Int], String] { protected def compareClassifiers(a: Classifier, b: Classifier): Int = a compareTo b - protected def compareSubscribers(a: Subscriber, b: Subscriber): Int = System.identityHashCode(a) - System.identityHashCode(b) + protected def compareSubscribers(a: Subscriber, b: Subscriber): Int = akka.util.Helpers.compareIdentityHash(a, b) protected def matches(classifier: Classifier, event: Event): Boolean = event.toString == classifier @@ -200,7 +200,7 @@ class ScanningEventBusSpec extends EventBusSpec("ScanningEventBus") { object LookupEventBusSpec { class MyLookupEventBus extends akka.event.japi.LookupEventBus[Int, akka.japi.Procedure[Int], String] { protected def classify(event: Event): Classifier = event.toString - protected def compareSubscribers(a: Subscriber, b: Subscriber): Int = System.identityHashCode(a) - System.identityHashCode(b) + protected def compareSubscribers(a: Subscriber, b: Subscriber): Int = akka.util.Helpers.compareIdentityHash(a, b) protected def mapSize = 32 protected def publish(event: Event, subscriber: Subscriber): Unit = subscriber(event) } diff --git a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala index 144110db4b..2611ae6926 100644 --- a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala @@ -222,8 +222,9 @@ abstract class MessageDispatcher(val app: ActorSystem) extends Serializable { if (mailBox.hasSystemMessages) { var message = mailBox.systemDrain() while (message ne null) { + val next = message.next deadLetterMailbox.systemEnqueue(message) - message = message.next + message = next } } diff --git a/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala index a366a246b6..33545d551f 100644 --- a/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala @@ -37,18 +37,7 @@ class BalancingDispatcher( _timeoutMs: Long) extends Dispatcher(_app, _name, throughput, throughputDeadlineTime, mailboxType, config, _timeoutMs) { - private val buddies = new ConcurrentSkipListSet[ActorCell]( - new Comparator[ActorCell] { - def compare(a: ActorCell, b: ActorCell): Int = { - /* - * make sure that there is no overflow or underflow in comparisons, so - * that the ordering is actually consistent and you cannot have a - * sequence which cyclically is monotone without end. - */ - val diff = ((System.identityHashCode(a) & 0xffffffffL) - (System.identityHashCode(b) & 0xffffffffL)) - if (diff > 0) 1 else if (diff < 0) -1 else 0 - } - }) + private val buddies = new ConcurrentSkipListSet[ActorCell](akka.util.Helpers.IdentityHashComparator) protected val messageQueue: MessageQueue = mailboxType match { case u: UnboundedMailbox ⇒ new QueueBasedMessageQueue with UnboundedMessageQueueSemantics { diff --git a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala index ae54c5bf50..34729914c2 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala @@ -244,6 +244,7 @@ trait DefaultSystemMessageQueue { self: Mailbox ⇒ @tailrec final def systemEnqueue(message: SystemMessage): Unit = { + assert(message.next eq null) if (Mailbox.debug) println(actor + " having enqueued " + message) val head = systemQueueGet /* @@ -253,7 +254,10 @@ trait DefaultSystemMessageQueue { self: Mailbox ⇒ * Hence, SystemMessage.next does not need to be volatile. */ message.next = head - if (!systemQueuePut(head, message)) systemEnqueue(message) + if (!systemQueuePut(head, message)) { + message.next = null + systemEnqueue(message) + } } @tailrec diff --git a/akka-actor/src/main/scala/akka/util/Helpers.scala b/akka-actor/src/main/scala/akka/util/Helpers.scala index 0975081d97..14af35523c 100644 --- a/akka-actor/src/main/scala/akka/util/Helpers.scala +++ b/akka-actor/src/main/scala/akka/util/Helpers.scala @@ -4,6 +4,7 @@ package akka.util import java.io.{ PrintWriter, StringWriter } +import java.util.Comparator /** * @author Jonas Bonér @@ -12,6 +13,20 @@ object Helpers { implicit def null2Option[T](t: T): Option[T] = Option(t) + def compareIdentityHash(a: AnyRef, b: AnyRef): Int = { + /* + * make sure that there is no overflow or underflow in comparisons, so + * that the ordering is actually consistent and you cannot have a + * sequence which cyclically is monotone without end. + */ + val diff = ((System.identityHashCode(a) & 0xffffffffL) - (System.identityHashCode(b) & 0xffffffffL)) + if (diff > 0) 1 else if (diff < 0) -1 else 0 + } + + val IdentityHashComparator = new Comparator[AnyRef] { + def compare(a: AnyRef, b: AnyRef): Int = compareIdentityHash(a, b) + } + def intToBytes(value: Int): Array[Byte] = { val bytes = new Array[Byte](4) bytes(0) = (value >>> 24).asInstanceOf[Byte]