diff --git a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala index 450c201a75..40013a1c80 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala @@ -232,9 +232,6 @@ abstract class ActorModelSpec extends AkkaSpec { protected def newInterceptedDispatcher: MessageDispatcherInterceptor protected def dispatcherType: String - // BalancingDispatcher of course does not work when another actor is in the pool, so overridden below - protected def wavesSupervisorDispatcher(dispatcher: MessageDispatcher) = dispatcher - "A " + dispatcherType must { "must dynamically handle its own life cycle" in { @@ -347,9 +344,25 @@ abstract class ActorModelSpec extends AkkaSpec { val boss = actorOf(Props(context ⇒ { case "run" ⇒ for (_ ← 1 to num) (context.self startsWatching context.actorOf(props)) ! cachedMessage case Terminated(child) ⇒ stopLatch.countDown() - }).withDispatcher(wavesSupervisorDispatcher(dispatcher))) + }).withDispatcher(system.dispatcherFactory.newPinnedDispatcher("boss"))) boss ! "run" - assertCountDown(cachedMessage.latch, waitTime, "Counting down from " + num) + try { + assertCountDown(cachedMessage.latch, waitTime, "Counting down from " + num) + } catch { + case e ⇒ + val buddies = dispatcher.asInstanceOf[BalancingDispatcher].buddies + val mq = dispatcher.asInstanceOf[BalancingDispatcher].messageQueue + + System.err.println("Buddies left: ") + buddies.toArray foreach { + case cell: ActorCell ⇒ + System.err.println(" - " + cell.self.path + " " + cell.isShutdown + " " + cell.mailbox.status + " " + cell.mailbox.numberOfMessages + " " + SystemMessage.size(cell.mailbox.systemDrain())) + } + + System.err.println("Mailbox: " + mq.numberOfMessages + " " + mq.hasMessages + " ") + + throw e + } assertCountDown(stopLatch, waitTime, "Expected all children to stop") boss.stop() } @@ -451,8 +464,6 @@ class BalancingDispatcherModelSpec extends ActorModelSpec { def dispatcherType = "Balancing Dispatcher" - override def wavesSupervisorDispatcher(dispatcher: MessageDispatcher) = system.dispatcher - "A " + dispatcherType must { "process messages in parallel" in { implicit val dispatcher = newInterceptedDispatcher diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala index 8a5f0eff75..f43c8fd9f0 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala @@ -167,6 +167,7 @@ private[akka] class ActorCell( } } + //Memory consistency is handled by the Mailbox (reading mailbox status then processing messages, then writing mailbox status final def systemInvoke(message: SystemMessage) { def create(): Unit = try { @@ -244,26 +245,23 @@ private[akka] class ActorCell( } try { - val isClosed = mailbox.isClosed //Fence plus volatile read - if (!isClosed) { - if (stopping) message match { - case Terminate() ⇒ terminate() // to allow retry - case _ ⇒ - } - else message match { - case Create() ⇒ create() - case Recreate(cause) ⇒ recreate(cause) - case Link(subject) ⇒ - system.deathWatch.subscribe(self, subject) - if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.toString, "now monitoring " + subject)) - case Unlink(subject) ⇒ - system.deathWatch.unsubscribe(self, subject) - if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.toString, "stopped monitoring " + subject)) - case Suspend() ⇒ suspend() - case Resume() ⇒ resume() - case Terminate() ⇒ terminate() - case Supervise(child) ⇒ supervise(child) - } + if (stopping) message match { + case Terminate() ⇒ terminate() // to allow retry + case _ ⇒ + } + else message match { + case Create() ⇒ create() + case Recreate(cause) ⇒ recreate(cause) + case Link(subject) ⇒ + system.deathWatch.subscribe(self, subject) + if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.toString, "now monitoring " + subject)) + case Unlink(subject) ⇒ + system.deathWatch.unsubscribe(self, subject) + if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.toString, "stopped monitoring " + subject)) + case Suspend() ⇒ suspend() + case Resume() ⇒ resume() + case Terminate() ⇒ terminate() + case Supervise(child) ⇒ supervise(child) } } catch { case e ⇒ //Should we really catch everything here? @@ -273,50 +271,48 @@ private[akka] class ActorCell( } } + //Memory consistency is handled by the Mailbox (reading mailbox status then processing messages, then writing mailbox status final def invoke(messageHandle: Envelope) { try { - val isClosed = mailbox.isClosed //Fence plus volatile read - if (!isClosed) { - currentMessage = messageHandle + currentMessage = messageHandle + try { try { - try { - cancelReceiveTimeout() // FIXME: leave this here? - messageHandle.message match { - case msg: AutoReceivedMessage ⇒ autoReceiveMessage(messageHandle) - case msg ⇒ - if (stopping) { - // receiving Terminated in response to stopping children is too common to generate noise - if (!msg.isInstanceOf[Terminated]) system.deadLetterMailbox.enqueue(self, messageHandle) - } else { - actor(msg) - } - } - currentMessage = null // reset current message after successful invocation - } catch { - case e ⇒ - system.eventStream.publish(Error(e, self.toString, e.getMessage)) - - // prevent any further messages to be processed until the actor has been restarted - dispatcher.suspend(this) - - // make sure that InterruptedException does not leave this thread - if (e.isInstanceOf[InterruptedException]) { - val ex = ActorInterruptedException(e) - props.faultHandler.handleSupervisorFailing(self, children) - parent.tell(Failed(ex), self) - throw e //Re-throw InterruptedExceptions as expected + cancelReceiveTimeout() // FIXME: leave this here? + messageHandle.message match { + case msg: AutoReceivedMessage ⇒ autoReceiveMessage(messageHandle) + case msg ⇒ + if (stopping) { + // receiving Terminated in response to stopping children is too common to generate noise + if (!msg.isInstanceOf[Terminated]) system.deadLetterMailbox.enqueue(self, messageHandle) } else { - props.faultHandler.handleSupervisorFailing(self, children) - parent.tell(Failed(e), self) + actor(msg) } - } finally { - checkReceiveTimeout // Reschedule receive timeout } + currentMessage = null // reset current message after successful invocation } catch { case e ⇒ system.eventStream.publish(Error(e, self.toString, e.getMessage)) - throw e + + // prevent any further messages to be processed until the actor has been restarted + dispatcher.suspend(this) + + // make sure that InterruptedException does not leave this thread + if (e.isInstanceOf[InterruptedException]) { + val ex = ActorInterruptedException(e) + props.faultHandler.handleSupervisorFailing(self, children) + parent.tell(Failed(ex), self) + throw e //Re-throw InterruptedExceptions as expected + } else { + props.faultHandler.handleSupervisorFailing(self, children) + parent.tell(Failed(e), self) + } + } finally { + checkReceiveTimeout // Reschedule receive timeout } + } catch { + case e ⇒ + system.eventStream.publish(Error(e, self.toString, e.getMessage)) + throw e } } } diff --git a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala index aa3edd69ac..6eb5b2c609 100644 --- a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala @@ -236,10 +236,8 @@ abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) ext */ def resume(actor: ActorCell): Unit = { val mbox = actor.mailbox - if (mbox.dispatcher eq this) { - mbox.becomeOpen() + if ((mbox.dispatcher eq this) && mbox.becomeOpen()) registerForExecution(mbox, false, false) - } } /** diff --git a/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala index 1266dcde96..b30de4a102 100644 --- a/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala @@ -12,6 +12,7 @@ import annotation.tailrec import akka.actor.ActorSystem import akka.event.EventStream import akka.actor.Scheduler +import java.util.concurrent.atomic.AtomicBoolean /** * An executor based event driven dispatcher which will try to redistribute work from busy actors to idle actors. It is assumed @@ -39,9 +40,10 @@ class BalancingDispatcher( _timeoutMs: Long) extends Dispatcher(_prerequisites, _name, throughput, throughputDeadlineTime, mailboxType, config, _timeoutMs) { - private val buddies = new ConcurrentSkipListSet[ActorCell](akka.util.Helpers.IdentityHashComparator) + val buddies = new ConcurrentSkipListSet[ActorCell](akka.util.Helpers.IdentityHashComparator) + val rebalance = new AtomicBoolean(false) - protected val messageQueue: MessageQueue = mailboxType match { + val messageQueue: MessageQueue = mailboxType match { case u: UnboundedMailbox ⇒ new QueueBasedMessageQueue with UnboundedMessageQueueSemantics { final val queue = new ConcurrentLinkedQueue[Envelope] } @@ -66,13 +68,13 @@ class BalancingDispatcher( protected[akka] override def register(actor: ActorCell) = { super.register(actor) - registerForExecution(actor.mailbox, false, false) //Allow newcomers to be productive from the first moment + buddies.add(actor) } protected[akka] override def unregister(actor: ActorCell) = { - super.unregister(actor) buddies.remove(actor) - intoTheFray(except = actor) + super.unregister(actor) + intoTheFray(except = actor) //When someone leaves, he tosses a friend into the fray } protected override def cleanUpMailboxFor(actor: ActorCell, mailBox: Mailbox) { @@ -88,29 +90,27 @@ class BalancingDispatcher( } } - protected[akka] override def registerForExecution(mbox: Mailbox, hasMessagesHint: Boolean, hasSystemMessagesHint: Boolean): Boolean = { - if (!super.registerForExecution(mbox, hasMessagesHint, hasSystemMessagesHint)) { - mbox match { - case share: SharingMailbox if !share.isClosed ⇒ buddies.add(share.actor); false - case _ ⇒ false - } - } else true - } + def intoTheFray(except: ActorCell): Unit = + if (rebalance.compareAndSet(false, true)) { + try { + val i = buddies.iterator() - def intoTheFray(except: ActorCell): Unit = { - var buddy = buddies.pollFirst() - while (buddy ne null) { - val mbox = buddy.mailbox - buddy = if ((buddy eq except) || (!registerForExecution(mbox, false, false) && mbox.isClosed)) buddies.pollFirst() else null + @tailrec + def throwIn(): Unit = { + val n = if (i.hasNext) i.next() else null + if (n eq null) () + else if ((n ne except) && registerForExecution(n.mailbox, false, false)) () + else throwIn() + } + throwIn() + } finally { + rebalance.set(false) + } } - } override protected[akka] def dispatch(receiver: ActorCell, invocation: Envelope) = { messageQueue.enqueue(receiver.self, invocation) - + registerForExecution(receiver.mailbox, false, false) intoTheFray(except = receiver) - - if (!registerForExecution(receiver.mailbox, false, false)) - intoTheFray(except = receiver) } } diff --git a/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala index e8f799c414..5bda68850e 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala @@ -107,20 +107,16 @@ class Dispatcher( protected[akka] def createMailbox(actor: ActorCell): Mailbox = mailboxType.create(actor) - protected[akka] def shutdown { - executorService.getAndSet(new ExecutorServiceDelegate { + protected[akka] def shutdown: Unit = + Option(executorService.getAndSet(new ExecutorServiceDelegate { lazy val executor = executorServiceFactory.createExecutorService - }) match { - case null ⇒ - case some ⇒ some.shutdown() - } - } + })) foreach { _.shutdown() } /** * Returns if it was registered */ protected[akka] override def registerForExecution(mbox: Mailbox, hasMessageHint: Boolean, hasSystemMessageHint: Boolean): Boolean = { - if (mbox.shouldBeScheduledForExecution(hasMessageHint, hasSystemMessageHint)) { //This needs to be here to ensure thread safety and no races + if (mbox.canBeScheduledForExecution(hasMessageHint, hasSystemMessageHint)) { //This needs to be here to ensure thread safety and no races if (mbox.setAsScheduled()) { try { executorService.get() execute mbox diff --git a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala index ddbebdf3ef..6f268f7e9a 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala @@ -128,15 +128,20 @@ abstract class Mailbox(val actor: ActorCell) extends AbstractMailbox with Messag protected final def systemQueueGet: SystemMessage = AbstractMailbox.systemQueueUpdater.get(this) protected final def systemQueuePut(_old: SystemMessage, _new: SystemMessage): Boolean = AbstractMailbox.systemQueueUpdater.compareAndSet(this, _old, _new) - def shouldBeScheduledForExecution(hasMessageHint: Boolean, hasSystemMessageHint: Boolean): Boolean = status match { + final def canBeScheduledForExecution(hasMessageHint: Boolean, hasSystemMessageHint: Boolean): Boolean = status match { case Open | Scheduled ⇒ hasMessageHint || hasSystemMessageHint || hasSystemMessages || hasMessages case Closed ⇒ false case _ ⇒ hasSystemMessageHint || hasSystemMessages } final def run = { - try processMailbox() finally { - setAsIdle() + try { + if (!isClosed) { //Volatile read, needed here + processAllSystemMessages() //First, deal with any system messages + processMailbox() //Then deal with messages + } + } finally { + setAsIdle() //Volatile write, needed here dispatcher.registerForExecution(this, false, false) } } @@ -146,9 +151,7 @@ abstract class Mailbox(val actor: ActorCell) extends AbstractMailbox with Messag * * @return true if the processing finished before the mailbox was empty, due to the throughput constraint */ - final def processMailbox() { - processAllSystemMessages() //First, process all system messages - + private final def processMailbox() { if (shouldProcessMessage) { var nextMessage = dequeue() if (nextMessage ne null) { //If we have a message @@ -175,7 +178,7 @@ abstract class Mailbox(val actor: ActorCell) extends AbstractMailbox with Messag } } - def processAllSystemMessages() { + final def processAllSystemMessages() { var nextMessage = systemDrain() try { while (nextMessage ne null) { diff --git a/akka-actor/src/main/scala/akka/util/Helpers.scala b/akka-actor/src/main/scala/akka/util/Helpers.scala index 9cba2c5f3d..2b9f59d757 100644 --- a/akka-actor/src/main/scala/akka/util/Helpers.scala +++ b/akka-actor/src/main/scala/akka/util/Helpers.scala @@ -26,19 +26,6 @@ object Helpers { def compare(a: AnyRef, b: AnyRef): Int = compareIdentityHash(a, b) } - def intToBytes(value: Int): Array[Byte] = { - val bytes = new Array[Byte](4) - bytes(0) = (value >>> 24).asInstanceOf[Byte] - bytes(1) = (value >>> 16).asInstanceOf[Byte] - bytes(2) = (value >>> 8).asInstanceOf[Byte] - bytes(3) = value.asInstanceOf[Byte] - bytes - } - - def bytesToInt(bytes: Array[Byte], offset: Int): Int = { - (0 until 4).foldLeft(0)((value, index) ⇒ value + ((bytes(index + offset) & 0x000000FF) << ((4 - 1 - index) * 8))) - } - final val base64chars = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789*?" @tailrec