fix some bugs, but probably not the pesky ones
- enqueuing system messages to DeadLetterMailbox was broken in principle, but not in practice for the current default deadLetter implementation - add assert that system messages are not enqueued multiple times - *BusSpec was using incorrect compareSubscribers based on identityHashCode, so moved the proper solution out of BalancingDispatcher and into akka.util.Helpers and reuse that in all places
This commit is contained in:
parent
9a10953219
commit
3808853845
5 changed files with 25 additions and 16 deletions
|
|
@ -173,7 +173,7 @@ object ScanningEventBusSpec {
|
||||||
|
|
||||||
class MyScanningEventBus extends ScanningEventBus[Int, akka.japi.Procedure[Int], String] {
|
class MyScanningEventBus extends ScanningEventBus[Int, akka.japi.Procedure[Int], String] {
|
||||||
protected def compareClassifiers(a: Classifier, b: Classifier): Int = a compareTo b
|
protected def compareClassifiers(a: Classifier, b: Classifier): Int = a compareTo b
|
||||||
protected def compareSubscribers(a: Subscriber, b: Subscriber): Int = System.identityHashCode(a) - System.identityHashCode(b)
|
protected def compareSubscribers(a: Subscriber, b: Subscriber): Int = akka.util.Helpers.compareIdentityHash(a, b)
|
||||||
|
|
||||||
protected def matches(classifier: Classifier, event: Event): Boolean = event.toString == classifier
|
protected def matches(classifier: Classifier, event: Event): Boolean = event.toString == classifier
|
||||||
|
|
||||||
|
|
@ -200,7 +200,7 @@ class ScanningEventBusSpec extends EventBusSpec("ScanningEventBus") {
|
||||||
object LookupEventBusSpec {
|
object LookupEventBusSpec {
|
||||||
class MyLookupEventBus extends akka.event.japi.LookupEventBus[Int, akka.japi.Procedure[Int], String] {
|
class MyLookupEventBus extends akka.event.japi.LookupEventBus[Int, akka.japi.Procedure[Int], String] {
|
||||||
protected def classify(event: Event): Classifier = event.toString
|
protected def classify(event: Event): Classifier = event.toString
|
||||||
protected def compareSubscribers(a: Subscriber, b: Subscriber): Int = System.identityHashCode(a) - System.identityHashCode(b)
|
protected def compareSubscribers(a: Subscriber, b: Subscriber): Int = akka.util.Helpers.compareIdentityHash(a, b)
|
||||||
protected def mapSize = 32
|
protected def mapSize = 32
|
||||||
protected def publish(event: Event, subscriber: Subscriber): Unit = subscriber(event)
|
protected def publish(event: Event, subscriber: Subscriber): Unit = subscriber(event)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -222,8 +222,9 @@ abstract class MessageDispatcher(val app: ActorSystem) extends Serializable {
|
||||||
if (mailBox.hasSystemMessages) {
|
if (mailBox.hasSystemMessages) {
|
||||||
var message = mailBox.systemDrain()
|
var message = mailBox.systemDrain()
|
||||||
while (message ne null) {
|
while (message ne null) {
|
||||||
|
val next = message.next
|
||||||
deadLetterMailbox.systemEnqueue(message)
|
deadLetterMailbox.systemEnqueue(message)
|
||||||
message = message.next
|
message = next
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -37,18 +37,7 @@ class BalancingDispatcher(
|
||||||
_timeoutMs: Long)
|
_timeoutMs: Long)
|
||||||
extends Dispatcher(_app, _name, throughput, throughputDeadlineTime, mailboxType, config, _timeoutMs) {
|
extends Dispatcher(_app, _name, throughput, throughputDeadlineTime, mailboxType, config, _timeoutMs) {
|
||||||
|
|
||||||
private val buddies = new ConcurrentSkipListSet[ActorCell](
|
private val buddies = new ConcurrentSkipListSet[ActorCell](akka.util.Helpers.IdentityHashComparator)
|
||||||
new Comparator[ActorCell] {
|
|
||||||
def compare(a: ActorCell, b: ActorCell): Int = {
|
|
||||||
/*
|
|
||||||
* make sure that there is no overflow or underflow in comparisons, so
|
|
||||||
* that the ordering is actually consistent and you cannot have a
|
|
||||||
* sequence which cyclically is monotone without end.
|
|
||||||
*/
|
|
||||||
val diff = ((System.identityHashCode(a) & 0xffffffffL) - (System.identityHashCode(b) & 0xffffffffL))
|
|
||||||
if (diff > 0) 1 else if (diff < 0) -1 else 0
|
|
||||||
}
|
|
||||||
})
|
|
||||||
|
|
||||||
protected val messageQueue: MessageQueue = mailboxType match {
|
protected val messageQueue: MessageQueue = mailboxType match {
|
||||||
case u: UnboundedMailbox ⇒ new QueueBasedMessageQueue with UnboundedMessageQueueSemantics {
|
case u: UnboundedMailbox ⇒ new QueueBasedMessageQueue with UnboundedMessageQueueSemantics {
|
||||||
|
|
|
||||||
|
|
@ -244,6 +244,7 @@ trait DefaultSystemMessageQueue { self: Mailbox ⇒
|
||||||
|
|
||||||
@tailrec
|
@tailrec
|
||||||
final def systemEnqueue(message: SystemMessage): Unit = {
|
final def systemEnqueue(message: SystemMessage): Unit = {
|
||||||
|
assert(message.next eq null)
|
||||||
if (Mailbox.debug) println(actor + " having enqueued " + message)
|
if (Mailbox.debug) println(actor + " having enqueued " + message)
|
||||||
val head = systemQueueGet
|
val head = systemQueueGet
|
||||||
/*
|
/*
|
||||||
|
|
@ -253,7 +254,10 @@ trait DefaultSystemMessageQueue { self: Mailbox ⇒
|
||||||
* Hence, SystemMessage.next does not need to be volatile.
|
* Hence, SystemMessage.next does not need to be volatile.
|
||||||
*/
|
*/
|
||||||
message.next = head
|
message.next = head
|
||||||
if (!systemQueuePut(head, message)) systemEnqueue(message)
|
if (!systemQueuePut(head, message)) {
|
||||||
|
message.next = null
|
||||||
|
systemEnqueue(message)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@tailrec
|
@tailrec
|
||||||
|
|
|
||||||
|
|
@ -4,6 +4,7 @@
|
||||||
package akka.util
|
package akka.util
|
||||||
|
|
||||||
import java.io.{ PrintWriter, StringWriter }
|
import java.io.{ PrintWriter, StringWriter }
|
||||||
|
import java.util.Comparator
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||||
|
|
@ -12,6 +13,20 @@ object Helpers {
|
||||||
|
|
||||||
implicit def null2Option[T](t: T): Option[T] = Option(t)
|
implicit def null2Option[T](t: T): Option[T] = Option(t)
|
||||||
|
|
||||||
|
def compareIdentityHash(a: AnyRef, b: AnyRef): Int = {
|
||||||
|
/*
|
||||||
|
* make sure that there is no overflow or underflow in comparisons, so
|
||||||
|
* that the ordering is actually consistent and you cannot have a
|
||||||
|
* sequence which cyclically is monotone without end.
|
||||||
|
*/
|
||||||
|
val diff = ((System.identityHashCode(a) & 0xffffffffL) - (System.identityHashCode(b) & 0xffffffffL))
|
||||||
|
if (diff > 0) 1 else if (diff < 0) -1 else 0
|
||||||
|
}
|
||||||
|
|
||||||
|
val IdentityHashComparator = new Comparator[AnyRef] {
|
||||||
|
def compare(a: AnyRef, b: AnyRef): Int = compareIdentityHash(a, b)
|
||||||
|
}
|
||||||
|
|
||||||
def intToBytes(value: Int): Array[Byte] = {
|
def intToBytes(value: Int): Array[Byte] = {
|
||||||
val bytes = new Array[Byte](4)
|
val bytes = new Array[Byte](4)
|
||||||
bytes(0) = (value >>> 24).asInstanceOf[Byte]
|
bytes(0) = (value >>> 24).asInstanceOf[Byte]
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue