Fix merge conflicts
This commit is contained in:
commit
13488ecd1c
52 changed files with 808 additions and 328 deletions
|
|
@ -191,7 +191,10 @@ private[akka] abstract class Mailbox(val actor: ActorCell) extends MessageQueue
|
|||
var nextMessage = systemDrain()
|
||||
try {
|
||||
while ((nextMessage ne null) && !isClosed) {
|
||||
if (debug) println(actor.self + " processing system message " + nextMessage + " with children " + actor.childrenRefs)
|
||||
if (debug) println(actor.self + " processing system message " + nextMessage + " with " +
|
||||
(if (actor.childrenRefs.isEmpty) "no children"
|
||||
else if (actor.childrenRefs.size > 20) actor.childrenRefs.size + " children"
|
||||
else actor.childrenRefs.mkString("children:\n ", "\n ", "")))
|
||||
actor systemInvoke nextMessage
|
||||
nextMessage = nextMessage.next
|
||||
// don’t ever execute normal message when system message present!
|
||||
|
|
@ -237,15 +240,26 @@ private[akka] abstract class Mailbox(val actor: ActorCell) extends MessageQueue
|
|||
}
|
||||
|
||||
trait MessageQueue {
|
||||
/*
|
||||
* These method need to be implemented in subclasses; they should not rely on the internal stuff above.
|
||||
/**
|
||||
* Try to enqueue the message to this queue, or throw an exception.
|
||||
*/
|
||||
def enqueue(receiver: ActorRef, handle: Envelope)
|
||||
def enqueue(receiver: ActorRef, handle: Envelope): Unit // NOTE: receiver is used only in two places, but cannot be removed
|
||||
|
||||
/**
|
||||
* Try to dequeue the next message from this queue, return null failing that.
|
||||
*/
|
||||
def dequeue(): Envelope
|
||||
|
||||
/**
|
||||
* Should return the current number of messages held in this queue; may
|
||||
* always return 0 if no other value is available efficiently. Do not use
|
||||
* this for testing for presence of messages, use `hasMessages` instead.
|
||||
*/
|
||||
def numberOfMessages: Int
|
||||
|
||||
/**
|
||||
* Indicates whether this queue is non-empty.
|
||||
*/
|
||||
def hasMessages: Boolean
|
||||
}
|
||||
|
||||
|
|
@ -299,22 +313,23 @@ trait QueueBasedMessageQueue extends MessageQueue {
|
|||
}
|
||||
|
||||
trait UnboundedMessageQueueSemantics extends QueueBasedMessageQueue {
|
||||
final def enqueue(receiver: ActorRef, handle: Envelope): Unit = queue add handle
|
||||
final def dequeue(): Envelope = queue.poll()
|
||||
def enqueue(receiver: ActorRef, handle: Envelope): Unit = queue add handle
|
||||
def dequeue(): Envelope = queue.poll()
|
||||
}
|
||||
|
||||
trait BoundedMessageQueueSemantics extends QueueBasedMessageQueue {
|
||||
def pushTimeOut: Duration
|
||||
override def queue: BlockingQueue[Envelope]
|
||||
|
||||
final def enqueue(receiver: ActorRef, handle: Envelope): Unit =
|
||||
if (pushTimeOut.length > 0)
|
||||
def enqueue(receiver: ActorRef, handle: Envelope) {
|
||||
if (pushTimeOut.length > 0) {
|
||||
queue.offer(handle, pushTimeOut.length, pushTimeOut.unit) || {
|
||||
throw new MessageQueueAppendFailedException("Couldn't enqueue message " + handle + " to " + receiver)
|
||||
}
|
||||
else queue put handle
|
||||
} else queue put handle
|
||||
}
|
||||
|
||||
final def dequeue(): Envelope = queue.poll()
|
||||
def dequeue(): Envelope = queue.poll()
|
||||
}
|
||||
|
||||
trait DequeBasedMessageQueue extends QueueBasedMessageQueue {
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue