diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala index 30f82e975f..23014cd884 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala @@ -231,7 +231,8 @@ private[akka] class ActorCell( def uuid: Uuid = self.uuid - def dispatcher: MessageDispatcher = props.dispatcher + @inline + final def dispatcher: MessageDispatcher = if (props.dispatcher == Props.defaultDispatcher) app.dispatcher else props.dispatcher def isShutdown: Boolean = mailbox.isClosed diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index 6e47f60300..c511847261 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -152,12 +152,12 @@ abstract class ActorRef extends ActorRefShared with UntypedChannel with ReplyCha */ class LocalActorRef private[akka] ( app: AkkaApplication, - private[this] val props: Props, + props: Props, givenAddress: String, val systemService: Boolean = false, override private[akka] val uuid: Uuid = newUuid, receiveTimeout: Option[Long] = None, - hotswap: Stack[PartialFunction[Any, Unit]] = Stack.empty) + hotswap: Stack[PartialFunction[Any, Unit]] = Props.noHotSwap) extends ActorRef with ScalaActorRef { private[this] val actorCell = new ActorCell(app, this, props, receiveTimeout, hotswap) @@ -253,15 +253,6 @@ class LocalActorRef private[akka] ( } } -/** - * System messages for RemoteActorRef. - * - * @author Jonas Bonér - */ -object RemoteActorSystemMessage { - val Stop = "RemoteActorRef:stop".intern -} - /** * This trait represents the common (external) methods for all ActorRefs * Needed because implicit conversions aren't applied when instance imports are used diff --git a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala index 0b94e94809..a8a2465458 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala @@ -8,12 +8,11 @@ import akka.config.ConfigurationException import akka.util.ReflectiveAccess import akka.routing._ import akka.AkkaApplication -import akka.dispatch.MessageDispatcher import java.util.concurrent.ConcurrentHashMap -import akka.dispatch.Promise import com.eaio.uuid.UUID import akka.AkkaException import akka.event.{ ActorClassification, DeathWatch, EventHandler } +import akka.dispatch.{ Future, MessageDispatcher, Promise } /** * Interface for all ActorRef providers to implement. @@ -79,13 +78,14 @@ class ActorRefProviderException(message: String) extends AkkaException(message) */ class LocalActorRefProvider(val app: AkkaApplication) extends ActorRefProvider { - private val actors = new ConcurrentHashMap[String, Promise[ActorRef]] + private val actors = new ConcurrentHashMap[String, AnyRef] def actorOf(props: Props, address: String): ActorRef = actorOf(props, address, false) def actorFor(address: String): Option[ActorRef] = actors.get(address) match { - case null ⇒ None - case future ⇒ Some(future.get) + case null ⇒ None + case actor: ActorRef ⇒ Some(actor) + case future: Future[_] ⇒ Some(future.get.asInstanceOf[ActorRef]) } /** @@ -94,61 +94,53 @@ class LocalActorRefProvider(val app: AkkaApplication) extends ActorRefProvider { private[akka] def evict(address: String): Boolean = actors.remove(address) ne null private[akka] def actorOf(props: Props, address: String, systemService: Boolean): ActorRef = { - Address.validate(address) - - val localProps = - if (props.dispatcher == Props.defaultDispatcher) - props.copy(dispatcher = app.dispatcher) - else - props - - val defaultTimeout = app.AkkaConfig.ActorTimeout val newFuture = Promise[ActorRef](5000)(app.dispatcher) // FIXME is this proper timeout? - val oldFuture = actors.putIfAbsent(address, newFuture) - if (oldFuture eq null) { // we won the race -- create the actor and resolve the future + actors.putIfAbsent(address, newFuture) match { + case null ⇒ + val actor: ActorRef = try { + app.deployer.lookupDeploymentFor(address) match { // see if the deployment already exists, if so use it, if not create actor - val actor: ActorRef = try { - app.deployer.lookupDeploymentFor(address) match { // see if the deployment already exists, if so use it, if not create actor + // create a local actor + case None | Some(DeploymentConfig.Deploy(_, _, DeploymentConfig.Direct, _, _, DeploymentConfig.LocalScope)) ⇒ + new LocalActorRef(app, props, address, systemService) // create a local actor - // create a local actor - case None | Some(DeploymentConfig.Deploy(_, _, DeploymentConfig.Direct, _, _, DeploymentConfig.LocalScope)) ⇒ - new LocalActorRef(app, localProps, address, systemService) // create a local actor + // create a routed actor ref + case deploy @ Some(DeploymentConfig.Deploy(_, _, routerType, nrOfInstances, _, DeploymentConfig.LocalScope)) ⇒ + val routerFactory: () ⇒ Router = DeploymentConfig.routerTypeFor(routerType) match { + case RouterType.Direct ⇒ () ⇒ new DirectRouter + case RouterType.Random ⇒ () ⇒ new RandomRouter + case RouterType.RoundRobin ⇒ () ⇒ new RoundRobinRouter + case RouterType.ScatterGather ⇒ () ⇒ new ScatterGatherFirstCompletedRouter()( + if (props.dispatcher == Props.defaultDispatcher) app.dispatcher else props.dispatcher, app.AkkaConfig.ActorTimeout) + case RouterType.LeastCPU ⇒ sys.error("Router LeastCPU not supported yet") + case RouterType.LeastRAM ⇒ sys.error("Router LeastRAM not supported yet") + case RouterType.LeastMessages ⇒ sys.error("Router LeastMessages not supported yet") + case RouterType.Custom(implClass) ⇒ () ⇒ Routing.createCustomRouter(implClass) + } - // create a routed actor ref - case deploy @ Some(DeploymentConfig.Deploy(_, _, routerType, nrOfInstances, _, DeploymentConfig.LocalScope)) ⇒ - val routerFactory: () ⇒ Router = DeploymentConfig.routerTypeFor(routerType) match { - case RouterType.Direct ⇒ () ⇒ new DirectRouter - case RouterType.Random ⇒ () ⇒ new RandomRouter - case RouterType.RoundRobin ⇒ () ⇒ new RoundRobinRouter - case RouterType.ScatterGather ⇒ () ⇒ new ScatterGatherFirstCompletedRouter()(props.dispatcher, defaultTimeout) - case RouterType.LeastCPU ⇒ sys.error("Router LeastCPU not supported yet") - case RouterType.LeastRAM ⇒ sys.error("Router LeastRAM not supported yet") - case RouterType.LeastMessages ⇒ sys.error("Router LeastMessages not supported yet") - case RouterType.Custom(implClass) ⇒ () ⇒ Routing.createCustomRouter(implClass) - } + val connections: Iterable[ActorRef] = + if (nrOfInstances.factor > 0) Vector.fill(nrOfInstances.factor)(new LocalActorRef(app, props, "", systemService)) else Nil - val connections: Iterable[ActorRef] = - if (nrOfInstances.factor > 0) - Vector.fill(nrOfInstances.factor)(new LocalActorRef(app, localProps, new UUID().toString, systemService)) - else Nil + actorOf(RoutedProps(routerFactory = routerFactory, connectionManager = new LocalConnectionManager(connections)), address) - actorOf(RoutedProps(routerFactory = routerFactory, connectionManager = new LocalConnectionManager(connections)), address) - - case _ ⇒ throw new Exception("Don't know how to create this actor ref! Why?") + case _ ⇒ throw new Exception("Don't know how to create this actor ref! Why?") + } + } catch { + case e: Exception ⇒ + newFuture completeWithException e // so the other threads gets notified of error + //TODO FIXME should we remove the mapping in "actors" here? + throw e } - } catch { - case e: Exception ⇒ - newFuture completeWithException e // so the other threads gets notified of error - throw e - } - newFuture completeWithResult actor - actor - - } else { // we lost the race -- wait for future to complete - oldFuture.await.resultOrException.get + newFuture completeWithResult actor + actors.replace(address, newFuture, actor) + actor + case actor: ActorRef ⇒ + actor + case future: Future[_] ⇒ + future.get.asInstanceOf[ActorRef] } } @@ -161,7 +153,7 @@ class LocalActorRefProvider(val app: AkkaApplication) extends ActorRefProvider { //TODO If address matches an already created actor (Ahead-of-time deployed) return that actor //TODO If address exists in config, it will override the specified Props (should we attempt to merge?) //TODO If the actor deployed uses a different config, then ignore or throw exception? - if (props.connectionManager.size == 0) throw new ConfigurationException("RoutedProps used for creating actor [" + address + "] has zero connections configured; can't create a router") + if (props.connectionManager.isEmpty) throw new ConfigurationException("RoutedProps used for creating actor [" + address + "] has zero connections configured; can't create a router") // val clusteringEnabled = ReflectiveAccess.ClusterModule.isEnabled // val localOnly = props.localOnly // if (clusteringEnabled && !props.localOnly) ReflectiveAccess.ClusterModule.newClusteredActorRef(props) diff --git a/akka-actor/src/main/scala/akka/actor/Props.scala b/akka-actor/src/main/scala/akka/actor/Props.scala index 2aa4eeb8e7..7aedfbdd0a 100644 --- a/akka-actor/src/main/scala/akka/actor/Props.scala +++ b/akka-actor/src/main/scala/akka/actor/Props.scala @@ -7,6 +7,7 @@ package akka.actor import akka.dispatch._ import akka.japi.Creator import akka.util._ +import collection.immutable.Stack /** * ActorRef configuration object, this is threadsafe and fully sharable @@ -20,6 +21,7 @@ object Props { final val defaultTimeout: Timeout = Timeout(Duration.MinusInf) final val defaultFaultHandler: FaultHandlingStrategy = OneForOneStrategy(classOf[Exception] :: Nil, None, None) final val defaultSupervisor: Option[ActorRef] = None + final val noHotSwap: Stack[Actor.Receive] = Stack.empty /** * The default Props instance, uses the settings from the Props object starting with default* diff --git a/akka-actor/src/main/scala/akka/actor/TypedActor.scala b/akka-actor/src/main/scala/akka/actor/TypedActor.scala index 64e6fc64dd..d37b3f8c16 100644 --- a/akka-actor/src/main/scala/akka/actor/TypedActor.scala +++ b/akka-actor/src/main/scala/akka/actor/TypedActor.scala @@ -251,8 +251,8 @@ class TypedActor(val app: AkkaApplication) { //Warning, do not change order of the following statements, it's some elaborate chicken-n-egg handling val actorVar = new AtomVar[ActorRef](null) val timeout = props.timeout match { - case Timeout(Duration.MinusInf) ⇒ app.AkkaConfig.ActorTimeout - case x ⇒ x + case Props.`defaultTimeout` ⇒ app.AkkaConfig.ActorTimeout + case x ⇒ x } val proxy: T = Proxy.newProxyInstance(loader, interfaces, new TypedActorInvocationHandler(actorVar)(timeout)).asInstanceOf[T] proxyVar.set(proxy) // Chicken and egg situation we needed to solve, set the proxy so that we can set the self-reference inside each receive diff --git a/akka-actor/src/main/scala/akka/routing/ConnectionManager.scala b/akka-actor/src/main/scala/akka/routing/ConnectionManager.scala index 2d6d8c549e..80230e73ff 100644 --- a/akka-actor/src/main/scala/akka/routing/ConnectionManager.scala +++ b/akka-actor/src/main/scala/akka/routing/ConnectionManager.scala @@ -40,6 +40,12 @@ trait ConnectionManager { */ def size: Int + /** + * Returns if the number of 'available' is 0 or not. Value could be stale as soon as received, and this method can't be combined (easily) + * with an atomic read of and isEmpty and version. + */ + def isEmpty: Boolean + /** * Shuts the connection manager down, which stops all managed actors */ @@ -90,6 +96,8 @@ class LocalConnectionManager(initialConnections: Iterable[ActorRef]) extends Con def size: Int = state.get.connections.size + def isEmpty: Boolean = state.get.connections.isEmpty + def connections = state.get def shutdown() { diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala index 0cc5efa591..0f4692a6af 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala @@ -34,7 +34,7 @@ class RemoteActorRefProvider(val app: AkkaApplication) extends ActorRefProvider val local = new LocalActorRefProvider(app) val remote = new Remote(app) - private val actors = new ConcurrentHashMap[String, Promise[ActorRef]] + private val actors = new ConcurrentHashMap[String, AnyRef] private val remoteDaemonConnectionManager = new RemoteConnectionManager( app, @@ -47,98 +47,90 @@ class RemoteActorRefProvider(val app: AkkaApplication) extends ActorRefProvider def actorOf(props: Props, address: String): ActorRef = actorOf(props, address, false) def actorOf(props: Props, address: String, systemService: Boolean): ActorRef = { - Address.validate(address) - val newFuture = Promise[ActorRef](5000)(defaultDispatcher) // FIXME is this proper timeout? - val oldFuture = actors.putIfAbsent(address, newFuture) - if (oldFuture eq null) { // we won the race -- create the actor and resolve the future - val actor: ActorRef = try { - app.deployer.lookupDeploymentFor(address) match { - case Some(DeploymentConfig.Deploy(_, _, routerType, nrOfInstances, failureDetectorType, DeploymentConfig.RemoteScope(remoteAddresses))) ⇒ + actors.putIfAbsent(address, newFuture) match { // we won the race -- create the actor and resolve the future + case null ⇒ + val actor: ActorRef = try { + app.deployer.lookupDeploymentFor(address) match { + case Some(DeploymentConfig.Deploy(_, _, routerType, nrOfInstances, failureDetectorType, DeploymentConfig.RemoteScope(remoteAddresses))) ⇒ - val failureDetector = DeploymentConfig.failureDetectorTypeFor(failureDetectorType) match { - case FailureDetectorType.NoOp ⇒ new NoOpFailureDetector - case FailureDetectorType.RemoveConnectionOnFirstFailure ⇒ new RemoveConnectionOnFirstFailureFailureDetector - case FailureDetectorType.BannagePeriod(timeToBan) ⇒ new BannagePeriodFailureDetector(timeToBan) - case FailureDetectorType.Custom(implClass) ⇒ FailureDetector.createCustomFailureDetector(implClass) - } - - val thisHostname = remote.address.getHostName - val thisPort = remote.address.getPort - - def isReplicaNode: Boolean = remoteAddresses exists { remoteAddress ⇒ - remoteAddress.hostname == thisHostname && remoteAddress.port == thisPort - } - - if (isReplicaNode) { - // we are on one of the replica node for this remote actor - val localProps = - if (props.dispatcher == Props.defaultDispatcher) props.copy(dispatcher = app.dispatcher) - else props - new LocalActorRef(app, localProps, address, false) - } else { - - // we are on the single "reference" node uses the remote actors on the replica nodes - val routerFactory: () ⇒ Router = DeploymentConfig.routerTypeFor(routerType) match { - case RouterType.Direct ⇒ - if (remoteAddresses.size != 1) throw new ConfigurationException( - "Actor [%s] configured with Direct router must have exactly 1 remote node configured. Found [%s]" - .format(address, remoteAddresses.mkString(", "))) - () ⇒ new DirectRouter - - case RouterType.Random ⇒ - if (remoteAddresses.size < 1) throw new ConfigurationException( - "Actor [%s] configured with Random router must have at least 1 remote node configured. Found [%s]" - .format(address, remoteAddresses.mkString(", "))) - () ⇒ new RandomRouter - - case RouterType.RoundRobin ⇒ - if (remoteAddresses.size < 1) throw new ConfigurationException( - "Actor [%s] configured with RoundRobin router must have at least 1 remote node configured. Found [%s]" - .format(address, remoteAddresses.mkString(", "))) - () ⇒ new RoundRobinRouter - - case RouterType.ScatterGather ⇒ - if (remoteAddresses.size < 1) throw new ConfigurationException( - "Actor [%s] configured with ScatterGather router must have at least 1 remote node configured. Found [%s]" - .format(address, remoteAddresses.mkString(", "))) - () ⇒ new ScatterGatherFirstCompletedRouter()(defaultDispatcher, defaultTimeout) - - case RouterType.LeastCPU ⇒ sys.error("Router LeastCPU not supported yet") - case RouterType.LeastRAM ⇒ sys.error("Router LeastRAM not supported yet") - case RouterType.LeastMessages ⇒ sys.error("Router LeastMessages not supported yet") - case RouterType.Custom(implClass) ⇒ () ⇒ Routing.createCustomRouter(implClass) + val failureDetector = DeploymentConfig.failureDetectorTypeFor(failureDetectorType) match { + case FailureDetectorType.NoOp ⇒ new NoOpFailureDetector + case FailureDetectorType.RemoveConnectionOnFirstFailure ⇒ new RemoveConnectionOnFirstFailureFailureDetector + case FailureDetectorType.BannagePeriod(timeToBan) ⇒ new BannagePeriodFailureDetector(timeToBan) + case FailureDetectorType.Custom(implClass) ⇒ FailureDetector.createCustomFailureDetector(implClass) } - var connections = Map.empty[InetSocketAddress, ActorRef] - remoteAddresses foreach { remoteAddress: DeploymentConfig.RemoteAddress ⇒ - val inetSocketAddress = new InetSocketAddress(remoteAddress.hostname, remoteAddress.port) - connections += (inetSocketAddress -> RemoteActorRef(remote.server, inetSocketAddress, address, None)) + val thisHostname = remote.address.getHostName + val thisPort = remote.address.getPort + + def isReplicaNode: Boolean = remoteAddresses exists { some ⇒ some.hostname == thisHostname && some.port == thisPort } + + if (isReplicaNode) { + // we are on one of the replica node for this remote actor + new LocalActorRef(app, props, address, false) + } else { + + // we are on the single "reference" node uses the remote actors on the replica nodes + val routerFactory: () ⇒ Router = DeploymentConfig.routerTypeFor(routerType) match { + case RouterType.Direct ⇒ + if (remoteAddresses.size != 1) throw new ConfigurationException( + "Actor [%s] configured with Direct router must have exactly 1 remote node configured. Found [%s]" + .format(address, remoteAddresses.mkString(", "))) + () ⇒ new DirectRouter + + case RouterType.Random ⇒ + if (remoteAddresses.size < 1) throw new ConfigurationException( + "Actor [%s] configured with Random router must have at least 1 remote node configured. Found [%s]" + .format(address, remoteAddresses.mkString(", "))) + () ⇒ new RandomRouter + + case RouterType.RoundRobin ⇒ + if (remoteAddresses.size < 1) throw new ConfigurationException( + "Actor [%s] configured with RoundRobin router must have at least 1 remote node configured. Found [%s]" + .format(address, remoteAddresses.mkString(", "))) + () ⇒ new RoundRobinRouter + + case RouterType.ScatterGather ⇒ + if (remoteAddresses.size < 1) throw new ConfigurationException( + "Actor [%s] configured with ScatterGather router must have at least 1 remote node configured. Found [%s]" + .format(address, remoteAddresses.mkString(", "))) + () ⇒ new ScatterGatherFirstCompletedRouter()(defaultDispatcher, defaultTimeout) + + case RouterType.LeastCPU ⇒ sys.error("Router LeastCPU not supported yet") + case RouterType.LeastRAM ⇒ sys.error("Router LeastRAM not supported yet") + case RouterType.LeastMessages ⇒ sys.error("Router LeastMessages not supported yet") + case RouterType.Custom(implClass) ⇒ () ⇒ Routing.createCustomRouter(implClass) + } + + val connections = (Map.empty[InetSocketAddress, ActorRef] /: remoteAddresses) { (conns, a) ⇒ + val inetAddr = new InetSocketAddress(a.hostname, a.port) + conns + (inetAddr -> RemoteActorRef(remote.server, inetAddr, address, None)) + } + + val connectionManager = new RemoteConnectionManager(app, remote, connections, failureDetector) + + connections.keys foreach { useActorOnNode(_, address, props.creator) } + + actorOf(RoutedProps(routerFactory = routerFactory, connectionManager = connectionManager), address) } - val connectionManager = new RemoteConnectionManager(app, remote, connections, failureDetector) - - connections.keys foreach { useActorOnNode(_, address, props.creator) } - - actorOf(RoutedProps(routerFactory = routerFactory, connectionManager = connectionManager), address) - } - - case deploy ⇒ local.actorOf(props, address, systemService) + case deploy ⇒ local.actorOf(props, address, systemService) + } + } catch { + case e: Exception ⇒ + newFuture completeWithException e // so the other threads gets notified of error + throw e } - } catch { - case e: Exception ⇒ - newFuture completeWithException e // so the other threads gets notified of error - throw e - } - // actor foreach app.registry.register // only for ActorRegistry backward compat, will be removed later + // actor foreach app.registry.register // only for ActorRegistry backward compat, will be removed later - newFuture completeWithResult actor - actor - - } else { // we lost the race -- wait for future to complete - oldFuture.get + newFuture completeWithResult actor + actors.replace(address, newFuture, actor) + actor + case actor: ActorRef ⇒ actor + case future: Future[_] ⇒ future.get.asInstanceOf[ActorRef] } } @@ -151,8 +143,9 @@ class RemoteActorRefProvider(val app: AkkaApplication) extends ActorRefProvider } def actorFor(address: String): Option[ActorRef] = actors.get(address) match { - case null ⇒ None - case future ⇒ Some(future.get) + case null ⇒ None + case actor: ActorRef ⇒ Some(actor) + case future: Future[_] ⇒ Some(future.get.asInstanceOf[ActorRef]) } /** diff --git a/akka-remote/src/main/scala/akka/remote/RemoteConnectionManager.scala b/akka-remote/src/main/scala/akka/remote/RemoteConnectionManager.scala index b89b9310a5..d5a99fe893 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteConnectionManager.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteConnectionManager.scala @@ -52,6 +52,8 @@ class RemoteConnectionManager( def size: Int = connections.connections.size + def isEmpty: Boolean = connections.connections.isEmpty + def shutdown() { state.get.iterable foreach (_.stop()) // shut down all remote connections }