Fixing bug where a dispatcher would shut down the executor service before all tasks were executed, also taking the opportunity to decrease the size per mailbox by atleast 4 bytes
This commit is contained in:
parent
0307a655d3
commit
e40b7cd8d6
7 changed files with 22 additions and 36 deletions
|
|
@ -353,14 +353,8 @@ abstract class ActorModelSpec extends AkkaSpec {
|
|||
val stopLatch = new CountDownLatch(num)
|
||||
val waitTime = (30 seconds).dilated.toMillis
|
||||
val boss = actorOf(Props(context ⇒ {
|
||||
case "run" ⇒
|
||||
for (_ ← 1 to num) {
|
||||
val child = context.actorOf(props)
|
||||
context.self startsMonitoring child
|
||||
child ! cachedMessage
|
||||
}
|
||||
case Terminated(child) ⇒
|
||||
stopLatch.countDown()
|
||||
case "run" ⇒ for (_ ← 1 to num) (context.self startsMonitoring context.actorOf(props)) ! cachedMessage
|
||||
case Terminated(child) ⇒ stopLatch.countDown()
|
||||
}).withDispatcher(wavesSupervisorDispatcher(dispatcher)))
|
||||
boss ! "run"
|
||||
assertCountDown(cachedMessage.latch, waitTime, "Counting down from " + num)
|
||||
|
|
|
|||
|
|
@ -141,8 +141,8 @@ abstract class MailboxSpec extends AkkaSpec with BeforeAndAfterAll with BeforeAn
|
|||
class DefaultMailboxSpec extends MailboxSpec {
|
||||
lazy val name = "The default mailbox implementation"
|
||||
def factory = {
|
||||
case u: UnboundedMailbox ⇒ u.create(null, null)
|
||||
case b: BoundedMailbox ⇒ b.create(null, null)
|
||||
case u: UnboundedMailbox ⇒ u.create(null)
|
||||
case b: BoundedMailbox ⇒ b.create(null)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -150,7 +150,7 @@ class PriorityMailboxSpec extends MailboxSpec {
|
|||
val comparator = PriorityGenerator(_.##)
|
||||
lazy val name = "The priority mailbox implementation"
|
||||
def factory = {
|
||||
case UnboundedMailbox() ⇒ UnboundedPriorityMailbox(comparator).create(null, null)
|
||||
case BoundedMailbox(capacity, pushTimeOut) ⇒ BoundedPriorityMailbox(comparator, capacity, pushTimeOut).create(null, null)
|
||||
case UnboundedMailbox() ⇒ UnboundedPriorityMailbox(comparator).create(null)
|
||||
case BoundedMailbox(capacity, pushTimeOut) ⇒ BoundedPriorityMailbox(comparator, capacity, pushTimeOut).create(null)
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -212,7 +212,6 @@ class ActorSystem(val name: String, val config: Configuration) extends ActorRefF
|
|||
val deadLetters = new DeadLetterActorRef(this)
|
||||
val deadLetterMailbox = new Mailbox(null) {
|
||||
becomeClosed()
|
||||
override def dispatcher = null //MessageDispatcher.this
|
||||
override def enqueue(receiver: ActorRef, envelope: Envelope) { deadLetters ! DeadLetter(envelope.message, envelope.sender, receiver) }
|
||||
override def dequeue() = null
|
||||
override def systemEnqueue(receiver: ActorRef, handle: SystemMessage) { deadLetters ! DeadLetter(handle, receiver, receiver) }
|
||||
|
|
|
|||
|
|
@ -64,8 +64,6 @@ class BalancingDispatcher(
|
|||
final def numberOfMessages: Int = messageQueue.numberOfMessages
|
||||
|
||||
final def hasMessages: Boolean = messageQueue.hasMessages
|
||||
|
||||
final val dispatcher = BalancingDispatcher.this
|
||||
}
|
||||
|
||||
protected[akka] override def register(actor: ActorCell) = {
|
||||
|
|
|
|||
|
|
@ -98,14 +98,15 @@ class Dispatcher(
|
|||
}
|
||||
}
|
||||
|
||||
protected[akka] def createMailbox(actor: ActorCell): Mailbox = mailboxType.create(this, actor)
|
||||
protected[akka] def createMailbox(actor: ActorCell): Mailbox = mailboxType.create(actor)
|
||||
|
||||
protected[akka] def start {}
|
||||
|
||||
protected[akka] def shutdown {
|
||||
val old = executorService.getAndSet(new LazyExecutorServiceWrapper(executorServiceFactory.createExecutorService))
|
||||
if (old ne null)
|
||||
old.shutdownNow()
|
||||
if (old ne null) {
|
||||
old.shutdown()
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -4,12 +4,10 @@
|
|||
package akka.dispatch
|
||||
|
||||
import akka.AkkaException
|
||||
import java.util.{ Comparator, PriorityQueue }
|
||||
import java.util.{ Comparator, PriorityQueue, Queue }
|
||||
import akka.util._
|
||||
import java.util.Queue
|
||||
import akka.actor.{ ActorContext, ActorCell, ActorRef }
|
||||
import akka.actor.{ ActorCell, ActorRef }
|
||||
import java.util.concurrent._
|
||||
import atomic.{ AtomicInteger, AtomicReferenceFieldUpdater }
|
||||
import annotation.tailrec
|
||||
import akka.event.Logging.Error
|
||||
|
||||
|
|
@ -31,7 +29,7 @@ object Mailbox {
|
|||
final val Scheduled = 4
|
||||
|
||||
// mailbox debugging helper using println (see below)
|
||||
// TODO take this out before release
|
||||
// FIXME TODO take this out before release
|
||||
final val debug = false
|
||||
}
|
||||
|
||||
|
|
@ -175,7 +173,6 @@ abstract class Mailbox(val actor: ActorCell) extends AbstractMailbox with Messag
|
|||
do {
|
||||
if (debug) println(actor.self + " processing message " + nextMessage)
|
||||
actor invoke nextMessage
|
||||
|
||||
processAllSystemMessages() //After we're done, process all system messages
|
||||
|
||||
nextMessage = if (isActive) { // If we aren't suspended, we need to make sure we're not overstepping our boundaries
|
||||
|
|
@ -210,7 +207,8 @@ abstract class Mailbox(val actor: ActorCell) extends AbstractMailbox with Messag
|
|||
}
|
||||
}
|
||||
|
||||
def dispatcher: MessageDispatcher
|
||||
@inline
|
||||
final def dispatcher: MessageDispatcher = actor.dispatcher
|
||||
}
|
||||
|
||||
trait MessageQueue {
|
||||
|
|
@ -299,17 +297,16 @@ trait QueueBasedMessageQueue extends MessageQueue {
|
|||
* Mailbox configuration.
|
||||
*/
|
||||
trait MailboxType {
|
||||
def create(dispatcher: MessageDispatcher, receiver: ActorCell): Mailbox
|
||||
def create(receiver: ActorCell): Mailbox
|
||||
}
|
||||
|
||||
/**
|
||||
* It's a case class for Java (new UnboundedMailbox)
|
||||
*/
|
||||
case class UnboundedMailbox() extends MailboxType {
|
||||
override def create(_dispatcher: MessageDispatcher, receiver: ActorCell) =
|
||||
override def create(receiver: ActorCell) =
|
||||
new Mailbox(receiver) with QueueBasedMessageQueue with UnboundedMessageQueueSemantics with DefaultSystemMessageQueue {
|
||||
final val queue = new ConcurrentLinkedQueue[Envelope]()
|
||||
final val dispatcher = _dispatcher
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -318,19 +315,17 @@ case class BoundedMailbox( final val capacity: Int, final val pushTimeOut: Durat
|
|||
if (capacity < 0) throw new IllegalArgumentException("The capacity for BoundedMailbox can not be negative")
|
||||
if (pushTimeOut eq null) throw new IllegalArgumentException("The push time-out for BoundedMailbox can not be null")
|
||||
|
||||
override def create(_dispatcher: MessageDispatcher, receiver: ActorCell) =
|
||||
override def create(receiver: ActorCell) =
|
||||
new Mailbox(receiver) with QueueBasedMessageQueue with BoundedMessageQueueSemantics with DefaultSystemMessageQueue {
|
||||
final val queue = new LinkedBlockingQueue[Envelope](capacity)
|
||||
final val pushTimeOut = BoundedMailbox.this.pushTimeOut
|
||||
final val dispatcher = _dispatcher
|
||||
}
|
||||
}
|
||||
|
||||
case class UnboundedPriorityMailbox( final val cmp: Comparator[Envelope]) extends MailboxType {
|
||||
override def create(_dispatcher: MessageDispatcher, receiver: ActorCell) =
|
||||
override def create(receiver: ActorCell) =
|
||||
new Mailbox(receiver) with QueueBasedMessageQueue with UnboundedMessageQueueSemantics with DefaultSystemMessageQueue {
|
||||
final val queue = new PriorityBlockingQueue[Envelope](11, cmp)
|
||||
final val dispatcher = _dispatcher
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -339,11 +334,10 @@ case class BoundedPriorityMailbox( final val cmp: Comparator[Envelope], final va
|
|||
if (capacity < 0) throw new IllegalArgumentException("The capacity for BoundedMailbox can not be negative")
|
||||
if (pushTimeOut eq null) throw new IllegalArgumentException("The push time-out for BoundedMailbox can not be null")
|
||||
|
||||
override def create(_dispatcher: MessageDispatcher, receiver: ActorCell) =
|
||||
override def create(receiver: ActorCell) =
|
||||
new Mailbox(receiver) with QueueBasedMessageQueue with BoundedMessageQueueSemantics with DefaultSystemMessageQueue {
|
||||
final val queue = new BoundedBlockingQueue[Envelope](capacity, new PriorityQueue[Envelope](11, cmp))
|
||||
final val pushTimeOut = BoundedPriorityMailbox.this.pushTimeOut
|
||||
final val dispatcher = _dispatcher
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -106,7 +106,7 @@ private[testkit] object CallingThreadDispatcher {
|
|||
class CallingThreadDispatcher(_app: ActorSystem, val name: String = "calling-thread") extends MessageDispatcher(_app) {
|
||||
import CallingThreadDispatcher._
|
||||
|
||||
protected[akka] override def createMailbox(actor: ActorCell) = new CallingThreadMailbox(this, actor)
|
||||
protected[akka] override def createMailbox(actor: ActorCell) = new CallingThreadMailbox(actor)
|
||||
|
||||
private def getMailbox(actor: ActorCell): Option[CallingThreadMailbox] = actor.mailbox match {
|
||||
case m: CallingThreadMailbox ⇒ Some(m)
|
||||
|
|
@ -257,7 +257,7 @@ class NestingQueue {
|
|||
def isActive = active
|
||||
}
|
||||
|
||||
class CallingThreadMailbox(val dispatcher: MessageDispatcher, _receiver: ActorCell) extends Mailbox(_receiver) with DefaultSystemMessageQueue {
|
||||
class CallingThreadMailbox(_receiver: ActorCell) extends Mailbox(_receiver) with DefaultSystemMessageQueue {
|
||||
|
||||
private val q = new ThreadLocal[NestingQueue]() {
|
||||
override def initialValue = {
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue