diff --git a/akka-actor/src/main/scala/actor/ActorRef.scala b/akka-actor/src/main/scala/actor/ActorRef.scala index 4905e62670..cddbb92fc3 100644 --- a/akka-actor/src/main/scala/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/actor/ActorRef.scala @@ -28,6 +28,17 @@ import java.lang.reflect.Field import scala.reflect.BeanProperty + +object ActorRefStatus { + /** LifeCycles for ActorRefs + */ + private[akka] sealed trait StatusType + object UNSTARTED extends StatusType + object RUNNING extends StatusType + object BEING_RESTARTED extends StatusType + object SHUTDOWN extends StatusType +} + /** * ActorRef is an immutable and serializable handle to an Actor. *

@@ -68,9 +79,7 @@ trait ActorRef extends // Only mutable for RemoteServer in order to maintain identity across nodes @volatile protected[akka] var _uuid = UUID.newUuid.toString - @volatile protected[this] var _isRunning = false - @volatile protected[this] var _isShutDown = false - @volatile protected[akka] var _isBeingRestarted = false + @volatile protected[this] var _status: ActorRefStatus.StatusType = ActorRefStatus.UNSTARTED @volatile protected[akka] var _homeAddress = new InetSocketAddress(RemoteServerModule.HOSTNAME, RemoteServerModule.PORT) @volatile protected[akka] var _futureTimeout: Option[ScheduledFuture[AnyRef]] = None @volatile protected[akka] var registeredInRemoteNodeDuringSerialization = false @@ -229,17 +238,25 @@ trait ActorRef extends /** * Is the actor being restarted? */ - def isBeingRestarted: Boolean = _isBeingRestarted + def isBeingRestarted: Boolean = _status == ActorRefStatus.BEING_RESTARTED /** * Is the actor running? */ - def isRunning: Boolean = _isRunning + def isRunning: Boolean = _status match { + case ActorRefStatus.BEING_RESTARTED | ActorRefStatus.RUNNING => true + case _ => false + } /** * Is the actor shut down? */ - def isShutdown: Boolean = _isShutDown + def isShutdown: Boolean = _status == ActorRefStatus.SHUTDOWN + + /** + * Is the actor ever started? + */ + def isUnstarted: Boolean = _status == ActorRefStatus.UNSTARTED /** * Is the actor able to handle the message passed in as arguments? @@ -800,7 +817,7 @@ class LocalActorRef private[akka]( if (isTransactor) { _transactionFactory = Some(TransactionFactory(_transactionConfig, id)) } - _isRunning = true + _status = ActorRefStatus.RUNNING if (!isInInitialization) initializeActorInstance else runActorInitialization = true } @@ -815,8 +832,7 @@ class LocalActorRef private[akka]( cancelReceiveTimeout dispatcher.unregister(this) _transactionFactory = None - _isRunning = false - _isShutDown = true + _status = ActorRefStatus.SHUTDOWN actor.postStop ActorRegistry.unregister(this) if (isRemotingEnabled) { @@ -1000,7 +1016,7 @@ class LocalActorRef private[akka]( } /** - * Callback for the dispatcher. This is the ingle entry point to the user Actor implementation. + * Callback for the dispatcher. This is the single entry point to the user Actor implementation. */ protected[akka] def invoke(messageHandle: MessageInvocation): Unit = guard.withGuard { if (isShutdown) @@ -1067,7 +1083,7 @@ class LocalActorRef private[akka]( stop } else { - _isBeingRestarted = true + _status = ActorRefStatus.BEING_RESTARTED val failedActor = actorInstance.get guard.withGuard { lifeCycle match { @@ -1077,10 +1093,12 @@ class LocalActorRef private[akka]( Actor.log.info("Restarting actor [%s] configured as PERMANENT.", id) Actor.log.debug("Restarting linked actors for actor [%s].", id) restartLinkedActors(reason, maxNrOfRetries, withinTimeRange) + Actor.log.debug("Invoking 'preRestart' for failed actor instance [%s].", id) if (isProxyableDispatcher(failedActor)) restartProxyableDispatcher(failedActor, reason) - else restartActor(failedActor, reason) - _isBeingRestarted = false + else restartActor(failedActor, reason) + + _status = ActorRefStatus.RUNNING } } } @@ -1236,7 +1254,7 @@ class LocalActorRef private[akka]( private def handleExceptionInDispatch(reason: Throwable, message: Any, topLevelTransaction: Boolean) = { Actor.log.error(reason, "Exception when invoking \n\tactor [%s] \n\twith message [%s]", this, message) - _isBeingRestarted = true + _status = ActorRefStatus.BEING_RESTARTED // abort transaction set if (isTransactionSetInScope) { val txSet = getTransactionSetInScope @@ -1376,13 +1394,12 @@ private[akka] case class RemoteActorRef private[akka] ( } def start: ActorRef = { - _isRunning = true + _status = ActorRefStatus.RUNNING this } def stop: Unit = { - _isRunning = false - _isShutDown = true + _status = ActorRefStatus.SHUTDOWN postMessageToMailbox(RemoteActorSystemMessage.Stop, None) } diff --git a/akka-camel/src/main/scala/component/ActorComponent.scala b/akka-camel/src/main/scala/component/ActorComponent.scala index 6c1c5902fa..89cc0d4d3e 100644 --- a/akka-camel/src/main/scala/component/ActorComponent.scala +++ b/akka-camel/src/main/scala/component/ActorComponent.scala @@ -18,10 +18,10 @@ import se.scalablesolutions.akka.camel.{Failure, CamelMessageConversion, Message import CamelMessageConversion.toExchangeAdapter import se.scalablesolutions.akka.dispatch.{CompletableFuture, MessageInvocation, MessageDispatcher} import se.scalablesolutions.akka.stm.TransactionConfig -import se.scalablesolutions.akka.actor.{ScalaActorRef, ActorRegistry, Actor, ActorRef} import se.scalablesolutions.akka.AkkaException import scala.reflect.BeanProperty +import se.scalablesolutions.akka.actor._ /** * Camel component for sending messages to and receiving replies from (untyped) actors. @@ -199,13 +199,12 @@ private[akka] object AsyncCallbackAdapter { private[akka] class AsyncCallbackAdapter(exchange: Exchange, callback: AsyncCallback) extends ActorRef with ScalaActorRef { def start = { - _isRunning = true + _status = ActorRefStatus.RUNNING this } def stop() = { - _isRunning = false - _isShutDown = true + _status = ActorRefStatus.SHUTDOWN } /** diff --git a/akka-remote/src/main/scala/remote/RemoteServer.scala b/akka-remote/src/main/scala/remote/RemoteServer.scala index 0c8b305364..b258c4867d 100644 --- a/akka-remote/src/main/scala/remote/RemoteServer.scala +++ b/akka-remote/src/main/scala/remote/RemoteServer.scala @@ -383,9 +383,9 @@ class RemoteServer extends Logging with ListenerManagement { protected[akka] override def notifyListeners(message: => Any): Unit = super.notifyListeners(message) - private[akka] def actors() = RemoteServer.actorsFor(address).actors + private[akka] def actors() = RemoteServer.actorsFor(address).actors private[akka] def actorsByUuid() = RemoteServer.actorsFor(address).actorsByUuid - private[akka] def typedActors() = RemoteServer.actorsFor(address).typedActors + private[akka] def typedActors() = RemoteServer.actorsFor(address).typedActors private[akka] def typedActorsByUuid() = RemoteServer.actorsFor(address).typedActorsByUuid } @@ -508,11 +508,12 @@ class RemoteServerHandler( private def handleRemoteRequestProtocol(request: RemoteRequestProtocol, channel: Channel) = { log.debug("Received RemoteRequestProtocol[\n%s]", request.toString) - val actorType = request.getActorInfo.getActorType - if (actorType == SCALA_ACTOR) dispatchToActor(request, channel) - else if (actorType == JAVA_ACTOR) throw new IllegalActorStateException("ActorType JAVA_ACTOR is currently not supported") - else if (actorType == TYPED_ACTOR) dispatchToTypedActor(request, channel) - else throw new IllegalActorStateException("Unknown ActorType [" + actorType + "]") + request.getActorInfo.getActorType match { + case SCALA_ACTOR => dispatchToActor(request, channel) + case TYPED_ACTOR => dispatchToTypedActor(request, channel) + case JAVA_ACTOR => throw new IllegalActorStateException("ActorType JAVA_ACTOR is currently not supported") + case other => throw new IllegalActorStateException("Unknown ActorType [" + other + "]") + } } private def dispatchToActor(request: RemoteRequestProtocol, channel: Channel) = {