diff --git a/akka-actor/src/main/scala/akka/actor/RepointableActorRef.scala b/akka-actor/src/main/scala/akka/actor/RepointableActorRef.scala index 8ecb1cbb72..1f1c9cae0a 100644 --- a/akka-actor/src/main/scala/akka/actor/RepointableActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/RepointableActorRef.scala @@ -5,11 +5,11 @@ package akka.actor import java.io.ObjectStreamException +import java.util.{ LinkedList ⇒ JLinkedList, Queue ⇒ JQueue } import java.util.concurrent.TimeUnit import java.util.concurrent.locks.ReentrantLock import scala.annotation.tailrec -import scala.collection.mutable.Queue import scala.concurrent.forkjoin.ThreadLocalRandom import akka.actor.dungeon.ChildrenContainer @@ -122,24 +122,26 @@ private[akka] class RepointableActorRef( protected def writeReplace(): AnyRef = SerializedActorRef(path) } -private[akka] class UnstartedCell(val systemImpl: ActorSystemImpl, val self: RepointableActorRef, val props: Props, val supervisor: InternalActorRef, val uid: Int) - extends Cell { +private[akka] class UnstartedCell(val systemImpl: ActorSystemImpl, + val self: RepointableActorRef, + val props: Props, + val supervisor: InternalActorRef, + val uid: Int) extends Cell { /* * This lock protects all accesses to this cell’s queues. It also ensures * safe switching to the started ActorCell. */ - val lock = new ReentrantLock + private[this] final val lock = new ReentrantLock - // use Envelope to keep on-send checks in the same place - val queue: Queue[Envelope] = Queue() - val systemQueue: Queue[SystemMessage] = Queue() - var suspendCount: Int = 0 + // use Envelope to keep on-send checks in the same place ACCESS MUST BE PROTECTED BY THE LOCK + private[this] final val queue: JQueue[Envelope] = new JLinkedList() + private[this] final val systemQueue: JQueue[SystemMessage] = new JLinkedList() + private[this] var suspendCount: Int = 0 - private def timeout = system.settings.UnstartedPushTimeout.duration.toMillis + import systemImpl.settings.UnstartedPushTimeout.{ duration ⇒ timeout } - def replaceWith(cell: Cell): Unit = { - lock.lock() + def replaceWith(cell: Cell): Unit = locked { try { /* * The CallingThreadDispatcher nicely dives under the ReentrantLock and @@ -149,13 +151,13 @@ private[akka] class UnstartedCell(val systemImpl: ActorSystemImpl, val self: Rep * lock, double-tap (well, N-tap, really); concurrent modification is * still not possible because we’re the only thread accessing the queues. */ - while (systemQueue.nonEmpty || queue.nonEmpty) { - while (systemQueue.nonEmpty) { - val msg = systemQueue.dequeue() + while (!systemQueue.isEmpty || !queue.isEmpty) { + while (!systemQueue.isEmpty) { + val msg = systemQueue.poll() cell.sendSystemMessage(msg) } - if (queue.nonEmpty) { - val envelope = queue.dequeue() + if (!queue.isEmpty) { + val envelope = queue.poll() cell.tell(envelope.message, envelope.sender) } } @@ -163,76 +165,67 @@ private[akka] class UnstartedCell(val systemImpl: ActorSystemImpl, val self: Rep self.swapCell(cell) finally try for (_ ← 1 to suspendCount) cell.suspend() - finally - lock.unlock() } def system: ActorSystem = systemImpl - def suspend(): Unit = { - lock.lock() - try suspendCount += 1 - finally lock.unlock() - } - def resume(causedByFailure: Throwable): Unit = { - lock.lock() - try suspendCount -= 1 - finally lock.unlock() - } - def restart(cause: Throwable): Unit = { - lock.lock() - try suspendCount -= 1 - finally lock.unlock() - } + def suspend(): Unit = locked { suspendCount += 1 } + def resume(causedByFailure: Throwable): Unit = locked { suspendCount -= 1 } + def restart(cause: Throwable): Unit = locked { suspendCount -= 1 } def stop(): Unit = sendSystemMessage(Terminate()) def isTerminated: Boolean = false def parent: InternalActorRef = supervisor def childrenRefs: ChildrenContainer = ChildrenContainer.EmptyChildrenContainer def getChildByName(name: String): Option[ChildRestartStats] = None + def tell(message: Any, sender: ActorRef): Unit = { val useSender = if (sender eq Actor.noSender) system.deadLetters else sender - if (lock.tryLock(timeout, TimeUnit.MILLISECONDS)) { + if (lock.tryLock(timeout.length, timeout.unit)) { try { - if (self.underlying eq this) queue enqueue Envelope(message, useSender, system) - else self.underlying.tell(message, useSender) - } finally { - lock.unlock() - } + val cell = self.underlying + if (cell ne this) { + cell.tell(message, useSender) + } else if (!queue.offer(Envelope(message, useSender, system))) { + system.eventStream.publish(Warning(self.path.toString, getClass, "dropping message of type " + message.getClass + " due to enqueue failure")) + system.deadLetters ! DeadLetter(message, useSender, self) + } + } finally lock.unlock() } else { + system.eventStream.publish(Warning(self.path.toString, getClass, "dropping message of type" + message.getClass + " due to lock timeout")) system.deadLetters ! DeadLetter(message, useSender, self) } } - def sendSystemMessage(msg: SystemMessage): Unit = { - if (lock.tryLock(timeout, TimeUnit.MILLISECONDS)) { + + // FIXME: once we have guaranteed delivery of system messages, hook this in! + def sendSystemMessage(msg: SystemMessage): Unit = + if (lock.tryLock(timeout.length, timeout.unit)) { try { - if (self.underlying eq this) systemQueue enqueue msg - else self.underlying.sendSystemMessage(msg) - } finally { - lock.unlock() - } + val cell = self.underlying + if (cell ne this) { + cell.sendSystemMessage(msg) + } else if (!systemQueue.offer(msg)) { + system.eventStream.publish(Warning(self.path.toString, getClass, "dropping system message " + msg + " due to enqueue failure")) + system.deadLetters ! DeadLetter(msg, self, self) + } + } finally lock.unlock() } else { - // FIXME: once we have guaranteed delivery of system messages, hook this in! system.eventStream.publish(Warning(self.path.toString, getClass, "dropping system message " + msg + " due to lock timeout")) system.deadLetters ! DeadLetter(msg, self, self) } - } + def isLocal = true - def hasMessages: Boolean = { - lock.lock() - try { - if (self.underlying eq this) !queue.isEmpty - else self.underlying.hasMessages - } finally { - lock.unlock() - } + def hasMessages: Boolean = locked { + val cell = self.underlying + if (cell eq this) !queue.isEmpty else cell.hasMessages } - def numberOfMessages: Int = { + + def numberOfMessages: Int = locked { + val cell = self.underlying + if (cell eq this) queue.size else cell.numberOfMessages + } + + private[this] final def locked[T](body: ⇒ T): T = { lock.lock() - try { - if (self.underlying eq this) queue.size - else self.underlying.numberOfMessages - } finally { - lock.unlock() - } + try body finally lock.unlock() } } diff --git a/akka-actor/src/main/scala/akka/routing/Routing.scala b/akka-actor/src/main/scala/akka/routing/Routing.scala index 8c3f059a40..d460bed7f3 100644 --- a/akka-actor/src/main/scala/akka/routing/Routing.scala +++ b/akka-actor/src/main/scala/akka/routing/Routing.scala @@ -91,7 +91,7 @@ private[akka] class RoutedActorCell(_system: ActorSystemImpl, _ref: InternalActo * `RouterConfig.createRoute` and `Resizer.resize` */ private[akka] def addRoutees(newRoutees: immutable.Iterable[ActorRef]): Unit = { - _routees = _routees ++ newRoutees + _routees ++= newRoutees // subscribe to Terminated messages for all route destinations, to be handled by Router actor newRoutees foreach watch } @@ -107,30 +107,27 @@ private[akka] class RoutedActorCell(_system: ActorSystemImpl, _ref: InternalActo } override def tell(message: Any, sender: ActorRef): Unit = { - resize() - + resize() // Mucho importante val s = if (sender eq null) system.deadLetters else sender - - val msg = message match { - case wrapped: RouterEnvelope ⇒ wrapped.message - case m ⇒ m - } - applyRoute(s, message) match { - case Destination(_, x) :: Nil if x == self ⇒ super.tell(message, s) - case refs ⇒ - refs foreach (p ⇒ - if (p.recipient == self) super.tell(msg, p.sender) - else p.recipient.!(msg)(p.sender)) + case Destination(_, x) :: Nil if x == self ⇒ + super.tell(message, s) + case refs ⇒ refs foreach { p ⇒ + val msg = message match { + case wrapped: RouterEnvelope ⇒ wrapped.message + case m ⇒ m + } + if (p.recipient == self) super.tell(msg, p.sender) + else p.recipient.!(msg)(p.sender) + } } } - def resize(): Unit = { + def resize(): Unit = for (r ← routerConfig.resizer) { if (r.isTimeForResize(resizeCounter.getAndIncrement()) && resizeInProgress.compareAndSet(false, true)) super.tell(Router.Resize, self) } - } } /**