Merge pull request #1283 from akka/wip-mpsc-√

Wip mpsc √
This commit is contained in:
Viktor Klang (√) 2013-04-03 13:34:33 -07:00
commit e01e629ff9
5 changed files with 179 additions and 86 deletions

View file

@ -11,12 +11,10 @@ import akka.actor.{ ActorCell, ActorRef, Cell, ActorSystem, InternalActorRef, De
import akka.util.{ Unsafe, BoundedBlockingQueue }
import akka.event.Logging.Error
import scala.concurrent.duration.Duration
import scala.concurrent.duration.FiniteDuration
import scala.annotation.tailrec
import scala.util.control.NonFatal
import com.typesafe.config.Config
import scala.concurrent.duration.FiniteDuration
import akka.actor.DeadLetter
/**
* INTERNAL API
*/
@ -194,7 +192,6 @@ private[akka] abstract class Mailbox(val messageQueue: MessageQueue)
val s = status
updateStatus(s, s & ~Scheduled) || setAsIdle()
}
/*
* AtomicReferenceFieldUpdater for system queue.
*/
@ -350,6 +347,25 @@ trait MessageQueue {
def cleanUp(owner: ActorRef, deadLetters: MessageQueue): Unit
}
class NodeMessageQueue extends AbstractNodeQueue[Envelope] with MessageQueue {
final def enqueue(receiver: ActorRef, handle: Envelope): Unit = add(handle)
final def dequeue(): Envelope = poll()
final def numberOfMessages: Int = count()
final def hasMessages: Boolean = !isEmpty()
@tailrec final def cleanUp(owner: ActorRef, deadLetters: MessageQueue): Unit = {
val envelope = dequeue()
if (envelope ne null) {
deadLetters.enqueue(owner, envelope)
cleanUp(owner, deadLetters)
}
}
}
/**
* INTERNAL API
*/
@ -520,6 +536,18 @@ case class UnboundedMailbox() extends MailboxType {
}
}
/**
* SingleConsumerOnlyUnboundedMailbox is a high-performance, multiple producersingle consumer, unbounded MailboxType,
* the only drawback is that you can't have multiple consumers,
* which rules out using it with BalancingDispatcher for instance.
*/
case class SingleConsumerOnlyUnboundedMailbox() extends MailboxType {
def this(settings: ActorSystem.Settings, config: Config) = this()
final override def create(owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue = new NodeMessageQueue()
}
/**
* BoundedMailbox is the default bounded MailboxType used by Akka Actors.
*/