Attempting to hunt down and find the race in the RoutingSpec

This commit is contained in:
Viktor Klang 2012-11-13 20:07:33 +01:00
parent 6a348e3c76
commit cbd301247a
2 changed files with 69 additions and 79 deletions

View file

@ -5,11 +5,11 @@
package akka.actor
import java.io.ObjectStreamException
import java.util.{ LinkedList JLinkedList, Queue JQueue }
import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.ReentrantLock
import scala.annotation.tailrec
import scala.collection.mutable.Queue
import scala.concurrent.forkjoin.ThreadLocalRandom
import akka.actor.dungeon.ChildrenContainer
@ -122,24 +122,26 @@ private[akka] class RepointableActorRef(
protected def writeReplace(): AnyRef = SerializedActorRef(path)
}
private[akka] class UnstartedCell(val systemImpl: ActorSystemImpl, val self: RepointableActorRef, val props: Props, val supervisor: InternalActorRef, val uid: Int)
extends Cell {
private[akka] class UnstartedCell(val systemImpl: ActorSystemImpl,
val self: RepointableActorRef,
val props: Props,
val supervisor: InternalActorRef,
val uid: Int) extends Cell {
/*
* This lock protects all accesses to this cells queues. It also ensures
* safe switching to the started ActorCell.
*/
val lock = new ReentrantLock
private[this] final val lock = new ReentrantLock
// use Envelope to keep on-send checks in the same place
val queue: Queue[Envelope] = Queue()
val systemQueue: Queue[SystemMessage] = Queue()
var suspendCount: Int = 0
// use Envelope to keep on-send checks in the same place ACCESS MUST BE PROTECTED BY THE LOCK
private[this] final val queue: JQueue[Envelope] = new JLinkedList()
private[this] final val systemQueue: JQueue[SystemMessage] = new JLinkedList()
private[this] var suspendCount: Int = 0
private def timeout = system.settings.UnstartedPushTimeout.duration.toMillis
import systemImpl.settings.UnstartedPushTimeout.{ duration timeout }
def replaceWith(cell: Cell): Unit = {
lock.lock()
def replaceWith(cell: Cell): Unit = locked {
try {
/*
* The CallingThreadDispatcher nicely dives under the ReentrantLock and
@ -149,13 +151,13 @@ private[akka] class UnstartedCell(val systemImpl: ActorSystemImpl, val self: Rep
* lock, double-tap (well, N-tap, really); concurrent modification is
* still not possible because were the only thread accessing the queues.
*/
while (systemQueue.nonEmpty || queue.nonEmpty) {
while (systemQueue.nonEmpty) {
val msg = systemQueue.dequeue()
while (!systemQueue.isEmpty || !queue.isEmpty) {
while (!systemQueue.isEmpty) {
val msg = systemQueue.poll()
cell.sendSystemMessage(msg)
}
if (queue.nonEmpty) {
val envelope = queue.dequeue()
if (!queue.isEmpty) {
val envelope = queue.poll()
cell.tell(envelope.message, envelope.sender)
}
}
@ -163,76 +165,67 @@ private[akka] class UnstartedCell(val systemImpl: ActorSystemImpl, val self: Rep
self.swapCell(cell)
finally try
for (_ 1 to suspendCount) cell.suspend()
finally
lock.unlock()
}
def system: ActorSystem = systemImpl
def suspend(): Unit = {
lock.lock()
try suspendCount += 1
finally lock.unlock()
}
def resume(causedByFailure: Throwable): Unit = {
lock.lock()
try suspendCount -= 1
finally lock.unlock()
}
def restart(cause: Throwable): Unit = {
lock.lock()
try suspendCount -= 1
finally lock.unlock()
}
def suspend(): Unit = locked { suspendCount += 1 }
def resume(causedByFailure: Throwable): Unit = locked { suspendCount -= 1 }
def restart(cause: Throwable): Unit = locked { suspendCount -= 1 }
def stop(): Unit = sendSystemMessage(Terminate())
def isTerminated: Boolean = false
def parent: InternalActorRef = supervisor
def childrenRefs: ChildrenContainer = ChildrenContainer.EmptyChildrenContainer
def getChildByName(name: String): Option[ChildRestartStats] = None
def tell(message: Any, sender: ActorRef): Unit = {
val useSender = if (sender eq Actor.noSender) system.deadLetters else sender
if (lock.tryLock(timeout, TimeUnit.MILLISECONDS)) {
if (lock.tryLock(timeout.length, timeout.unit)) {
try {
if (self.underlying eq this) queue enqueue Envelope(message, useSender, system)
else self.underlying.tell(message, useSender)
} finally {
lock.unlock()
}
val cell = self.underlying
if (cell ne this) {
cell.tell(message, useSender)
} else if (!queue.offer(Envelope(message, useSender, system))) {
system.eventStream.publish(Warning(self.path.toString, getClass, "dropping message of type " + message.getClass + " due to enqueue failure"))
system.deadLetters ! DeadLetter(message, useSender, self)
}
} finally lock.unlock()
} else {
system.eventStream.publish(Warning(self.path.toString, getClass, "dropping message of type" + message.getClass + " due to lock timeout"))
system.deadLetters ! DeadLetter(message, useSender, self)
}
}
def sendSystemMessage(msg: SystemMessage): Unit = {
if (lock.tryLock(timeout, TimeUnit.MILLISECONDS)) {
// FIXME: once we have guaranteed delivery of system messages, hook this in!
def sendSystemMessage(msg: SystemMessage): Unit =
if (lock.tryLock(timeout.length, timeout.unit)) {
try {
if (self.underlying eq this) systemQueue enqueue msg
else self.underlying.sendSystemMessage(msg)
} finally {
lock.unlock()
}
val cell = self.underlying
if (cell ne this) {
cell.sendSystemMessage(msg)
} else if (!systemQueue.offer(msg)) {
system.eventStream.publish(Warning(self.path.toString, getClass, "dropping system message " + msg + " due to enqueue failure"))
system.deadLetters ! DeadLetter(msg, self, self)
}
} finally lock.unlock()
} else {
// FIXME: once we have guaranteed delivery of system messages, hook this in!
system.eventStream.publish(Warning(self.path.toString, getClass, "dropping system message " + msg + " due to lock timeout"))
system.deadLetters ! DeadLetter(msg, self, self)
}
}
def isLocal = true
def hasMessages: Boolean = {
lock.lock()
try {
if (self.underlying eq this) !queue.isEmpty
else self.underlying.hasMessages
} finally {
lock.unlock()
}
def hasMessages: Boolean = locked {
val cell = self.underlying
if (cell eq this) !queue.isEmpty else cell.hasMessages
}
def numberOfMessages: Int = {
def numberOfMessages: Int = locked {
val cell = self.underlying
if (cell eq this) queue.size else cell.numberOfMessages
}
private[this] final def locked[T](body: T): T = {
lock.lock()
try {
if (self.underlying eq this) queue.size
else self.underlying.numberOfMessages
} finally {
lock.unlock()
}
try body finally lock.unlock()
}
}

View file

@ -91,7 +91,7 @@ private[akka] class RoutedActorCell(_system: ActorSystemImpl, _ref: InternalActo
* `RouterConfig.createRoute` and `Resizer.resize`
*/
private[akka] def addRoutees(newRoutees: immutable.Iterable[ActorRef]): Unit = {
_routees = _routees ++ newRoutees
_routees ++= newRoutees
// subscribe to Terminated messages for all route destinations, to be handled by Router actor
newRoutees foreach watch
}
@ -107,30 +107,27 @@ private[akka] class RoutedActorCell(_system: ActorSystemImpl, _ref: InternalActo
}
override def tell(message: Any, sender: ActorRef): Unit = {
resize()
resize() // Mucho importante
val s = if (sender eq null) system.deadLetters else sender
val msg = message match {
case wrapped: RouterEnvelope wrapped.message
case m m
}
applyRoute(s, message) match {
case Destination(_, x) :: Nil if x == self super.tell(message, s)
case refs
refs foreach (p
if (p.recipient == self) super.tell(msg, p.sender)
else p.recipient.!(msg)(p.sender))
case Destination(_, x) :: Nil if x == self
super.tell(message, s)
case refs refs foreach { p
val msg = message match {
case wrapped: RouterEnvelope wrapped.message
case m m
}
if (p.recipient == self) super.tell(msg, p.sender)
else p.recipient.!(msg)(p.sender)
}
}
}
def resize(): Unit = {
def resize(): Unit =
for (r routerConfig.resizer) {
if (r.isTimeForResize(resizeCounter.getAndIncrement()) && resizeInProgress.compareAndSet(false, true))
super.tell(Router.Resize, self)
}
}
}
/**