Removing blocking dequeues from MailboxConfig due to high risk and no gain

This commit is contained in:
Viktor Klang 2011-04-27 12:21:19 +02:00
parent 3ae80d971c
commit 5068f0d48a
6 changed files with 41 additions and 78 deletions

View file

@ -24,7 +24,7 @@ abstract class MailboxSpec extends
name should {
"create a !blockDequeue && unbounded mailbox" in {
val config = UnboundedMailbox(false)
val config = UnboundedMailbox()
val q = factory(config)
ensureInitialMailboxState(config, q)
@ -37,8 +37,8 @@ abstract class MailboxSpec extends
f.await.resultOrException must be === Some(null)
}
"create a !blockDequeue and bounded mailbox with 10 capacity and with push timeout" in {
val config = BoundedMailbox(false, 10, Duration(10,TimeUnit.MILLISECONDS))
"create a bounded mailbox with 10 capacity and with push timeout" in {
val config = BoundedMailbox(10, Duration(10,TimeUnit.MILLISECONDS))
val q = factory(config)
ensureInitialMailboxState(config, q)
@ -59,30 +59,16 @@ abstract class MailboxSpec extends
}
"dequeue what was enqueued properly for unbounded mailboxes" in {
testEnqueueDequeue(UnboundedMailbox(false))
testEnqueueDequeue(UnboundedMailbox())
}
"dequeue what was enqueued properly for bounded mailboxes" in {
testEnqueueDequeue(BoundedMailbox(false, 10000, Duration(-1, TimeUnit.MILLISECONDS)))
testEnqueueDequeue(BoundedMailbox(10000, Duration(-1, TimeUnit.MILLISECONDS)))
}
"dequeue what was enqueued properly for bounded mailboxes with pushTimeout" in {
testEnqueueDequeue(BoundedMailbox(false, 10000, Duration(100, TimeUnit.MILLISECONDS)))
testEnqueueDequeue(BoundedMailbox(10000, Duration(100, TimeUnit.MILLISECONDS)))
}
/** FIXME Adapt test so it works with the last dequeue
"dequeue what was enqueued properly for unbounded mailboxes with blockDeque" in {
testEnqueueDequeue(UnboundedMailbox(true))
}
"dequeue what was enqueued properly for bounded mailboxes with blockDeque" in {
testEnqueueDequeue(BoundedMailbox(true, 1000, Duration(-1, TimeUnit.MILLISECONDS)))
}
"dequeue what was enqueued properly for bounded mailboxes with blockDeque and pushTimeout" in {
testEnqueueDequeue(BoundedMailbox(true, 1000, Duration(100, TimeUnit.MILLISECONDS)))
}*/
}
//CANDIDATE FOR TESTKIT
@ -111,8 +97,8 @@ abstract class MailboxSpec extends
q match {
case aQueue: BlockingQueue[_] =>
config match {
case BoundedMailbox(_,capacity,_) => aQueue.remainingCapacity must be === capacity
case UnboundedMailbox(_) => aQueue.remainingCapacity must be === Int.MaxValue
case BoundedMailbox(capacity,_) => aQueue.remainingCapacity must be === capacity
case UnboundedMailbox() => aQueue.remainingCapacity must be === Int.MaxValue
}
case _ =>
}
@ -165,10 +151,8 @@ abstract class MailboxSpec extends
class DefaultMailboxSpec extends MailboxSpec {
lazy val name = "The default mailbox implementation"
def factory = {
case UnboundedMailbox(blockDequeue) =>
new DefaultUnboundedMessageQueue(blockDequeue)
case BoundedMailbox(blocking, capacity, pushTimeOut) =>
new DefaultBoundedMessageQueue(capacity, pushTimeOut, blocking)
case UnboundedMailbox() => new DefaultUnboundedMessageQueue()
case BoundedMailbox(capacity, pushTimeOut) => new DefaultBoundedMessageQueue(capacity, pushTimeOut)
}
}
@ -176,9 +160,7 @@ class PriorityMailboxSpec extends MailboxSpec {
val comparator = PriorityGenerator(_.##)
lazy val name = "The priority mailbox implementation"
def factory = {
case UnboundedMailbox(blockDequeue) =>
new UnboundedPriorityMessageQueue(blockDequeue, comparator)
case BoundedMailbox(blocking, capacity, pushTimeOut) =>
new BoundedPriorityMessageQueue(capacity, pushTimeOut, blocking, comparator)
case UnboundedMailbox() => new UnboundedPriorityMessageQueue(comparator)
case BoundedMailbox(capacity, pushTimeOut) => new BoundedPriorityMessageQueue(capacity, pushTimeOut, comparator)
}
}

View file

@ -10,11 +10,11 @@ class PriorityDispatcherSpec extends WordSpec with MustMatchers {
"A PriorityExecutorBasedEventDrivenDispatcher" must {
"Order it's messages according to the specified comparator using an unbounded mailbox" in {
testOrdering(UnboundedMailbox(false))
testOrdering(UnboundedMailbox())
}
"Order it's messages according to the specified comparator using a bounded mailbox" in {
testOrdering(BoundedMailbox(false,1000))
testOrdering(BoundedMailbox(1000))
}
}

View file

@ -117,20 +117,14 @@ class ExecutorBasedEventDrivenDispatcher(
def createMailbox(actorRef: ActorRef): AnyRef = mailboxType match {
case b: UnboundedMailbox =>
if (b.blocking) {
new DefaultUnboundedMessageQueue(true) with ExecutableMailbox {
final def dispatcher = ExecutorBasedEventDrivenDispatcher.this
}
} else { //If we have an unbounded, non-blocking mailbox, we can go lockless
new ConcurrentLinkedQueue[MessageInvocation] with MessageQueue with ExecutableMailbox {
final def dispatcher = ExecutorBasedEventDrivenDispatcher.this
final def enqueue(m: MessageInvocation) = this.add(m)
final def dequeue(): MessageInvocation = this.poll()
}
@inline final def dispatcher = ExecutorBasedEventDrivenDispatcher.this
@inline final def enqueue(m: MessageInvocation) = this.add(m)
@inline final def dequeue(): MessageInvocation = this.poll()
}
case b: BoundedMailbox =>
new DefaultBoundedMessageQueue(b.capacity, b.pushTimeOut, b.blocking) with ExecutableMailbox {
final def dispatcher = ExecutorBasedEventDrivenDispatcher.this
new DefaultBoundedMessageQueue(b.capacity, b.pushTimeOut) with ExecutableMailbox {
@inline final def dispatcher = ExecutorBasedEventDrivenDispatcher.this
}
}
@ -294,13 +288,13 @@ trait PriorityMailbox { self: ExecutorBasedEventDrivenDispatcher =>
override def createMailbox(actorRef: ActorRef): AnyRef = self.mailboxType match {
case b: UnboundedMailbox =>
new UnboundedPriorityMessageQueue(b.blocking, comparator) with ExecutableMailbox {
final def dispatcher = self
new UnboundedPriorityMessageQueue(comparator) with ExecutableMailbox {
@inline final def dispatcher = self
}
case b: BoundedMailbox =>
new BoundedPriorityMessageQueue(b.capacity, b.pushTimeOut, b.blocking, comparator) with ExecutableMailbox {
final def dispatcher = self
new BoundedPriorityMessageQueue(b.capacity, b.pushTimeOut, comparator) with ExecutableMailbox {
@inline final def dispatcher = self
}
}
}

View file

@ -30,9 +30,8 @@ trait MessageQueue {
*/
sealed trait MailboxType
case class UnboundedMailbox(val blocking: Boolean = false) extends MailboxType
case class UnboundedMailbox() extends MailboxType
case class BoundedMailbox(
val blocking: Boolean = false,
val capacity: Int = { if (Dispatchers.MAILBOX_CAPACITY < 0) Int.MaxValue else Dispatchers.MAILBOX_CAPACITY },
val pushTimeOut: Duration = Dispatchers.MAILBOX_PUSH_TIME_OUT) extends MailboxType {
if (capacity < 0) throw new IllegalArgumentException("The capacity for BoundedMailbox can not be negative")
@ -40,46 +39,35 @@ case class BoundedMailbox(
}
trait UnboundedMessageQueueSemantics extends MessageQueue { self: BlockingQueue[MessageInvocation] =>
def blockDequeue: Boolean
final def enqueue(handle: MessageInvocation) {
this add handle
}
final def dequeue(): MessageInvocation = {
if (blockDequeue) this.take()
else this.poll()
}
@inline final def enqueue(handle: MessageInvocation): Unit = this add handle
@inline final def dequeue(): MessageInvocation = this.poll()
}
trait BoundedMessageQueueSemantics extends MessageQueue { self: BlockingQueue[MessageInvocation] =>
def blockDequeue: Boolean
def pushTimeOut: Duration
final def enqueue(handle: MessageInvocation) {
if (pushTimeOut.length > 0 && pushTimeOut.toMillis > 0) {
if (!this.offer(handle, pushTimeOut.length, pushTimeOut.unit))
throw new MessageQueueAppendFailedException("Couldn't enqueue message " + handle + " to " + toString)
if (pushTimeOut.length > 0) {
this.offer(handle, pushTimeOut.length, pushTimeOut.unit) || {
throw new MessageQueueAppendFailedException("Couldn't enqueue message " + handle + " to " + toString) }
} else this put handle
}
final def dequeue(): MessageInvocation =
if (blockDequeue) this.take()
else this.poll()
@inline final def dequeue(): MessageInvocation = this.poll()
}
class DefaultUnboundedMessageQueue(val blockDequeue: Boolean) extends
class DefaultUnboundedMessageQueue extends
LinkedBlockingQueue[MessageInvocation] with
UnboundedMessageQueueSemantics
class DefaultBoundedMessageQueue(capacity: Int, val pushTimeOut: Duration, val blockDequeue: Boolean) extends
class DefaultBoundedMessageQueue(capacity: Int, val pushTimeOut: Duration) extends
LinkedBlockingQueue[MessageInvocation](capacity) with
BoundedMessageQueueSemantics
class UnboundedPriorityMessageQueue(val blockDequeue: Boolean, cmp: Comparator[MessageInvocation]) extends
class UnboundedPriorityMessageQueue(cmp: Comparator[MessageInvocation]) extends
PriorityBlockingQueue[MessageInvocation](11, cmp) with
UnboundedMessageQueueSemantics
class BoundedPriorityMessageQueue(capacity: Int, val pushTimeOut: Duration, val blockDequeue: Boolean, cmp: Comparator[MessageInvocation]) extends
class BoundedPriorityMessageQueue(capacity: Int, val pushTimeOut: Duration, cmp: Comparator[MessageInvocation]) extends
BoundedBlockingQueue[MessageInvocation](capacity, new PriorityQueue[MessageInvocation](11, cmp)) with
BoundedMessageQueueSemantics

View file

@ -221,9 +221,8 @@ abstract class MessageDispatcherConfigurator {
def mailboxType(config: Configuration): MailboxType = {
val capacity = config.getInt("mailbox-capacity", Dispatchers.MAILBOX_CAPACITY)
// FIXME how do we read in isBlocking for mailbox? Now set to 'false'.
if (capacity < 1) UnboundedMailbox()
else BoundedMailbox(false, capacity, Duration(config.getInt("mailbox-push-timeout-time", Dispatchers.MAILBOX_PUSH_TIME_OUT.toMillis.toInt), TIME_UNIT))
else BoundedMailbox(capacity, Duration(config.getInt("mailbox-push-timeout-time", Dispatchers.MAILBOX_PUSH_TIME_OUT.toMillis.toInt), TIME_UNIT))
}
def configureThreadPool(config: Configuration, createDispatcher: => (ThreadPoolConfig) => MessageDispatcher): ThreadPoolConfigDispatcherBuilder = {

View file

@ -25,13 +25,13 @@ class ThreadBasedDispatcher(_actor: ActorRef, _mailboxType: MailboxType)
private[akka] val owner = new AtomicReference[ActorRef](_actor)
def this(actor: ActorRef) =
this(actor, UnboundedMailbox(true)) // For Java API
this(actor, UnboundedMailbox()) // For Java API
def this(actor: ActorRef, capacity: Int) =
this(actor, BoundedMailbox(true, capacity)) //For Java API
this(actor, BoundedMailbox(capacity)) //For Java API
def this(actor: ActorRef, capacity: Int, pushTimeOut: Duration) = //For Java API
this(actor, BoundedMailbox(true, capacity, pushTimeOut))
this(actor, BoundedMailbox(capacity, pushTimeOut))
override def register(actorRef: ActorRef) = {
val actor = owner.get()