Removing ActorRef.isRunning - replaced in full by isShutdown, if it returns true the actor is forever dead, if it returns false, it might be (race)

This commit is contained in:
Viktor Klang 2011-10-03 19:43:45 +02:00
parent a1593c0419
commit 0f049d67e6
12 changed files with 22 additions and 54 deletions

View file

@ -85,7 +85,7 @@ class ActorFireForgetRequestReplySpec extends WordSpec with MustMatchers with Be
filterEvents(EventFilter[Exception]("Expected")) {
val supervisor = Supervisor(OneForOneStrategy(List(classOf[Exception]), Some(0)))
val actor = actorOf(Props[CrashingActor].withSupervisor(supervisor))
actor.isRunning must be(true)
actor.isShutdown must be(false)
actor ! "Die"
state.finished.await
sleepFor(1 second)

View file

@ -363,7 +363,6 @@ class ActorRefSpec extends WordSpec with MustMatchers with TestKit {
fnull.get must be("null")
awaitCond(ref.isShutdown, 100 millis)
ref.isRunning must be(false)
}
"restart when Kill:ed" in {

View file

@ -100,7 +100,7 @@ class RestartStrategySpec extends JUnitSuite with BeforeAndAfterAll {
(1 to 100) foreach { _ slave ! Crash }
assert(countDownLatch.await(120, TimeUnit.SECONDS))
assert(slave.isRunning)
assert(!slave.isShutdown)
}
@Test
@ -160,7 +160,7 @@ class RestartStrategySpec extends JUnitSuite with BeforeAndAfterAll {
assert(thirdRestartLatch.tryAwait(1, TimeUnit.SECONDS))
assert(slave.isRunning)
assert(!slave.isShutdown)
}
@Test
@ -199,7 +199,7 @@ class RestartStrategySpec extends JUnitSuite with BeforeAndAfterAll {
// test restart and post restart ping
assert(restartLatch.tryAwait(1, TimeUnit.SECONDS))
assert(slave.isRunning)
assert(!slave.isShutdown)
// now crash again... should not restart
slave ! Crash
@ -213,7 +213,7 @@ class RestartStrategySpec extends JUnitSuite with BeforeAndAfterAll {
slave ! Crash
assert(stopLatch.tryAwait(1, TimeUnit.SECONDS))
sleep(500L)
assert(!slave.isRunning)
assert(slave.isShutdown)
}
@Test
@ -251,7 +251,7 @@ class RestartStrategySpec extends JUnitSuite with BeforeAndAfterAll {
// test restart and post restart ping
assert(restartLatch.tryAwait(1, TimeUnit.SECONDS))
assert(slave.isRunning)
assert(!slave.isShutdown)
// now crash again... should not restart
slave ! Crash
@ -267,7 +267,7 @@ class RestartStrategySpec extends JUnitSuite with BeforeAndAfterAll {
assert(maxNoOfRestartsLatch.tryAwait(1, TimeUnit.SECONDS))
sleep(500L)
assert(!slave.isRunning)
assert(slave.isShutdown)
}
}

View file

@ -350,7 +350,7 @@ abstract class ActorModelSpec extends JUnitSuite {
a.stop
b.stop
while (a.isRunning && b.isRunning) {} //Busy wait for termination
while (!a.isShutdown && !b.isShutdown) {} //Busy wait for termination
assertRefDefaultZero(a)(registers = 1, unregisters = 1, msgsReceived = 1, msgsProcessed = 1)
assertRefDefaultZero(b)(registers = 1, unregisters = 1, msgsReceived = 1, msgsProcessed = 1)

View file

@ -30,7 +30,7 @@ class RoutingSpec extends WordSpec with MustMatchers {
val props = RoutedProps(() new DirectRouter, List(actor1))
val actor = Routing.actorOf(props, "foo")
actor.isRunning must be(true)
actor.isShutdown must be(false)
}
"throw IllegalArgumentException at construction when no connections" in {
@ -94,7 +94,7 @@ class RoutingSpec extends WordSpec with MustMatchers {
val props = RoutedProps(() new RoundRobinRouter, List(actor1))
val actor = Routing.actorOf(props, "foo")
actor.isRunning must be(true)
actor.isShutdown must be(false)
}
"throw IllegalArgumentException at construction when no connections" in {
@ -218,7 +218,7 @@ class RoutingSpec extends WordSpec with MustMatchers {
val props = RoutedProps(() new RandomRouter, List(actor1))
val actor = Routing.actorOf(props, "foo")
actor.isRunning must be(true)
actor.isShutdown must be(false)
}
"throw IllegalArgumentException at construction when no connections" in {
@ -231,7 +231,7 @@ class RoutingSpec extends WordSpec with MustMatchers {
}
}
"deliver messages in a random fashion" in {
"deliver messages in a random fashion" ignore {
}

View file

@ -80,7 +80,7 @@ class Ticket1111Spec extends WordSpec with MustMatchers {
.withRouter(() new ScatterGatherFirstCompletedRouter())
val actor = Routing.actorOf(props, "foo")
actor.isRunning must be(true)
actor.isShutdown must be(false)
}

View file

@ -232,7 +232,6 @@ private[akka] class ActorCell(
def dispatcher: MessageDispatcher = props.dispatcher
def isRunning: Boolean = !isShutdown
def isShutdown: Boolean = mailbox.isClosed
@volatile //This must be volatile

View file

@ -117,11 +117,6 @@ abstract class ActorRef extends ActorRefShared with UntypedChannel with ReplyCha
*/
def stop(): Unit
/**
* Is the actor running?
*/
def isRunning: Boolean // TODO remove this method
/**
* Is the actor shut down?
*/
@ -202,14 +197,9 @@ class LocalActorRef private[akka] (
private[this] val actorCell = new ActorCell(this, props, receiveTimeout, hotswap)
actorCell.start()
/**
* Is the actor running?
*/
//FIXME TODO REMOVE THIS, NO REPLACEMENT
def isRunning: Boolean = actorCell.isRunning
/**
* Is the actor shut down?
* If this method returns true, it will never return false again, but if it returns false, you cannot be sure if it's alive still (race condition)
*/
//FIXME TODO RENAME TO isTerminated
def isShutdown: Boolean = actorCell.isShutdown
@ -271,7 +261,7 @@ class LocalActorRef private[akka] (
// @deprecated("This method does a spin-lock to block for the actor, which might never be there, do not use this", "2.0")
protected[akka] def underlyingActorInstance: Actor = {
var instance = actorCell.actor
while ((instance eq null) && actorCell.isRunning) {
while ((instance eq null) && !actorCell.isShutdown) {
try { Thread.sleep(1) } catch { case i: InterruptedException }
instance = actorCell.actor
}
@ -330,8 +320,6 @@ private[akka] case class RemoteActorRef private[akka] (
@volatile
private var running: Boolean = true
def isRunning: Boolean = running
def isShutdown: Boolean = !running
RemoteModule.ensureEnabled()

View file

@ -232,8 +232,6 @@ private[akka] class RoutedActorRef(val routedProps: RoutedProps, val address: St
@volatile
private var running: Boolean = true
def isRunning: Boolean = running
def isShutdown: Boolean = !running
def stop() {

View file

@ -55,14 +55,8 @@ trait ListenerManagement {
val iterator = listeners.iterator
while (iterator.hasNext) {
val listener = iterator.next
// Uncomment if those exceptions are so frequent as to bottleneck
// if (listener.isShutdown) iterator.remove() else
try {
listener ! msg
} catch {
case e: ActorInitializationException
if (listener.isShutdown) iterator.remove()
}
if (listener.isShutdown) iterator.remove()
else listener ! msg
}
}
}
@ -74,7 +68,8 @@ trait ListenerManagement {
val iterator = listeners.iterator
while (iterator.hasNext) {
val listener = iterator.next
if (listener.isRunning) f(listener)
if (listener.isShutdown) iterator.remove()
else f(listener)
}
}
}

View file

@ -266,17 +266,10 @@ private[akka] class AsyncCallbackAdapter(exchange: Exchange, callback: AsyncCall
val address = exchange.getExchangeId
@volatile
private var running: Boolean = false
def isRunning: Boolean = running
private var running: Boolean = true
def isShutdown: Boolean = !running
def start = {
running = true
this
}
def suspend(): Unit = ()
def resume(): Unit = ()
@ -293,7 +286,7 @@ private[akka] class AsyncCallbackAdapter(exchange: Exchange, callback: AsyncCall
* @param message reply message
* @param sender ignored
*/
protected[akka] def postMessageToMailbox(message: Any, channel: UntypedChannel) = {
protected[akka] def postMessageToMailbox(message: Any, channel: UntypedChannel) = if(running) {
message match {
case Ack { /* no response message to set */ }
case msg: Failure exchange.fromFailureMessage(msg)
@ -302,16 +295,13 @@ private[akka] class AsyncCallbackAdapter(exchange: Exchange, callback: AsyncCall
callback.done(false)
}
def dispatcher_=(md: MessageDispatcher): Unit = unsupported
def dispatcher: MessageDispatcher = unsupported
def link(actorRef: ActorRef): ActorRef = unsupported
def unlink(actorRef: ActorRef): ActorRef = unsupported
def shutdownLinkedActors: Unit = unsupported
def supervisor: Option[ActorRef] = unsupported
protected[akka] def postMessageToMailboxAndCreateFutureResultWithTimeout(message: Any, timeout: Timeout, channel: UntypedChannel) = unsupported
protected[akka] def restart(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]): Unit = unsupported
protected[akka] def registerSupervisorAsRemoteActor = unsupported
protected[akka] def supervisor_=(sup: Option[ActorRef]): Unit = unsupported
private def unsupported = throw new UnsupportedOperationException("Not supported for %s" format classOf[AsyncCallbackAdapter].getName)
}

View file

@ -162,7 +162,6 @@ class TestActorRefSpec extends WordSpec with MustMatchers with BeforeAndAfterEac
intercept[ActorKilledException] {
(a ? PoisonPill).get
}
a must not be ('running)
a must be('shutdown)
assertThread
}