From cb8a0adbb8f30f4c77be8176659c4ac9cb974394 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Tue, 18 Oct 2011 11:26:35 +0200 Subject: [PATCH] Switching to a cached version of Stack.empty, saving 16 bytes per Actor. Switching to purging the Promises in the ActorRefProvider after successful creation to conserve memory. Stopping to clone the props everytime to set the application default dispatcher, and doing a conditional in ActorCell instead. --- .../src/main/scala/akka/actor/ActorCell.scala | 3 +- .../src/main/scala/akka/actor/ActorRef.scala | 13 +- .../scala/akka/actor/ActorRefProvider.scala | 94 +++++----- .../src/main/scala/akka/actor/Props.scala | 2 + .../main/scala/akka/actor/TypedActor.scala | 4 +- .../akka/routing/ConnectionManager.scala | 8 + .../akka/remote/RemoteActorRefProvider.scala | 163 +++++++++--------- .../akka/remote/RemoteConnectionManager.scala | 2 + 8 files changed, 139 insertions(+), 150 deletions(-) diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala index 30f82e975f..23014cd884 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala @@ -231,7 +231,8 @@ private[akka] class ActorCell( def uuid: Uuid = self.uuid - def dispatcher: MessageDispatcher = props.dispatcher + @inline + final def dispatcher: MessageDispatcher = if (props.dispatcher == Props.defaultDispatcher) app.dispatcher else props.dispatcher def isShutdown: Boolean = mailbox.isClosed diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index 6e47f60300..c511847261 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -152,12 +152,12 @@ abstract class ActorRef extends ActorRefShared with UntypedChannel with ReplyCha */ class LocalActorRef private[akka] ( app: AkkaApplication, - private[this] val props: Props, + props: Props, givenAddress: String, val systemService: Boolean = false, override private[akka] val uuid: Uuid = newUuid, receiveTimeout: Option[Long] = None, - hotswap: Stack[PartialFunction[Any, Unit]] = Stack.empty) + hotswap: Stack[PartialFunction[Any, Unit]] = Props.noHotSwap) extends ActorRef with ScalaActorRef { private[this] val actorCell = new ActorCell(app, this, props, receiveTimeout, hotswap) @@ -253,15 +253,6 @@ class LocalActorRef private[akka] ( } } -/** - * System messages for RemoteActorRef. - * - * @author Jonas Bonér - */ -object RemoteActorSystemMessage { - val Stop = "RemoteActorRef:stop".intern -} - /** * This trait represents the common (external) methods for all ActorRefs * Needed because implicit conversions aren't applied when instance imports are used diff --git a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala index 0b94e94809..a8a2465458 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala @@ -8,12 +8,11 @@ import akka.config.ConfigurationException import akka.util.ReflectiveAccess import akka.routing._ import akka.AkkaApplication -import akka.dispatch.MessageDispatcher import java.util.concurrent.ConcurrentHashMap -import akka.dispatch.Promise import com.eaio.uuid.UUID import akka.AkkaException import akka.event.{ ActorClassification, DeathWatch, EventHandler } +import akka.dispatch.{ Future, MessageDispatcher, Promise } /** * Interface for all ActorRef providers to implement. @@ -79,13 +78,14 @@ class ActorRefProviderException(message: String) extends AkkaException(message) */ class LocalActorRefProvider(val app: AkkaApplication) extends ActorRefProvider { - private val actors = new ConcurrentHashMap[String, Promise[ActorRef]] + private val actors = new ConcurrentHashMap[String, AnyRef] def actorOf(props: Props, address: String): ActorRef = actorOf(props, address, false) def actorFor(address: String): Option[ActorRef] = actors.get(address) match { - case null ⇒ None - case future ⇒ Some(future.get) + case null ⇒ None + case actor: ActorRef ⇒ Some(actor) + case future: Future[_] ⇒ Some(future.get.asInstanceOf[ActorRef]) } /** @@ -94,61 +94,53 @@ class LocalActorRefProvider(val app: AkkaApplication) extends ActorRefProvider { private[akka] def evict(address: String): Boolean = actors.remove(address) ne null private[akka] def actorOf(props: Props, address: String, systemService: Boolean): ActorRef = { - Address.validate(address) - - val localProps = - if (props.dispatcher == Props.defaultDispatcher) - props.copy(dispatcher = app.dispatcher) - else - props - - val defaultTimeout = app.AkkaConfig.ActorTimeout val newFuture = Promise[ActorRef](5000)(app.dispatcher) // FIXME is this proper timeout? - val oldFuture = actors.putIfAbsent(address, newFuture) - if (oldFuture eq null) { // we won the race -- create the actor and resolve the future + actors.putIfAbsent(address, newFuture) match { + case null ⇒ + val actor: ActorRef = try { + app.deployer.lookupDeploymentFor(address) match { // see if the deployment already exists, if so use it, if not create actor - val actor: ActorRef = try { - app.deployer.lookupDeploymentFor(address) match { // see if the deployment already exists, if so use it, if not create actor + // create a local actor + case None | Some(DeploymentConfig.Deploy(_, _, DeploymentConfig.Direct, _, _, DeploymentConfig.LocalScope)) ⇒ + new LocalActorRef(app, props, address, systemService) // create a local actor - // create a local actor - case None | Some(DeploymentConfig.Deploy(_, _, DeploymentConfig.Direct, _, _, DeploymentConfig.LocalScope)) ⇒ - new LocalActorRef(app, localProps, address, systemService) // create a local actor + // create a routed actor ref + case deploy @ Some(DeploymentConfig.Deploy(_, _, routerType, nrOfInstances, _, DeploymentConfig.LocalScope)) ⇒ + val routerFactory: () ⇒ Router = DeploymentConfig.routerTypeFor(routerType) match { + case RouterType.Direct ⇒ () ⇒ new DirectRouter + case RouterType.Random ⇒ () ⇒ new RandomRouter + case RouterType.RoundRobin ⇒ () ⇒ new RoundRobinRouter + case RouterType.ScatterGather ⇒ () ⇒ new ScatterGatherFirstCompletedRouter()( + if (props.dispatcher == Props.defaultDispatcher) app.dispatcher else props.dispatcher, app.AkkaConfig.ActorTimeout) + case RouterType.LeastCPU ⇒ sys.error("Router LeastCPU not supported yet") + case RouterType.LeastRAM ⇒ sys.error("Router LeastRAM not supported yet") + case RouterType.LeastMessages ⇒ sys.error("Router LeastMessages not supported yet") + case RouterType.Custom(implClass) ⇒ () ⇒ Routing.createCustomRouter(implClass) + } - // create a routed actor ref - case deploy @ Some(DeploymentConfig.Deploy(_, _, routerType, nrOfInstances, _, DeploymentConfig.LocalScope)) ⇒ - val routerFactory: () ⇒ Router = DeploymentConfig.routerTypeFor(routerType) match { - case RouterType.Direct ⇒ () ⇒ new DirectRouter - case RouterType.Random ⇒ () ⇒ new RandomRouter - case RouterType.RoundRobin ⇒ () ⇒ new RoundRobinRouter - case RouterType.ScatterGather ⇒ () ⇒ new ScatterGatherFirstCompletedRouter()(props.dispatcher, defaultTimeout) - case RouterType.LeastCPU ⇒ sys.error("Router LeastCPU not supported yet") - case RouterType.LeastRAM ⇒ sys.error("Router LeastRAM not supported yet") - case RouterType.LeastMessages ⇒ sys.error("Router LeastMessages not supported yet") - case RouterType.Custom(implClass) ⇒ () ⇒ Routing.createCustomRouter(implClass) - } + val connections: Iterable[ActorRef] = + if (nrOfInstances.factor > 0) Vector.fill(nrOfInstances.factor)(new LocalActorRef(app, props, "", systemService)) else Nil - val connections: Iterable[ActorRef] = - if (nrOfInstances.factor > 0) - Vector.fill(nrOfInstances.factor)(new LocalActorRef(app, localProps, new UUID().toString, systemService)) - else Nil + actorOf(RoutedProps(routerFactory = routerFactory, connectionManager = new LocalConnectionManager(connections)), address) - actorOf(RoutedProps(routerFactory = routerFactory, connectionManager = new LocalConnectionManager(connections)), address) - - case _ ⇒ throw new Exception("Don't know how to create this actor ref! Why?") + case _ ⇒ throw new Exception("Don't know how to create this actor ref! Why?") + } + } catch { + case e: Exception ⇒ + newFuture completeWithException e // so the other threads gets notified of error + //TODO FIXME should we remove the mapping in "actors" here? + throw e } - } catch { - case e: Exception ⇒ - newFuture completeWithException e // so the other threads gets notified of error - throw e - } - newFuture completeWithResult actor - actor - - } else { // we lost the race -- wait for future to complete - oldFuture.await.resultOrException.get + newFuture completeWithResult actor + actors.replace(address, newFuture, actor) + actor + case actor: ActorRef ⇒ + actor + case future: Future[_] ⇒ + future.get.asInstanceOf[ActorRef] } } @@ -161,7 +153,7 @@ class LocalActorRefProvider(val app: AkkaApplication) extends ActorRefProvider { //TODO If address matches an already created actor (Ahead-of-time deployed) return that actor //TODO If address exists in config, it will override the specified Props (should we attempt to merge?) //TODO If the actor deployed uses a different config, then ignore or throw exception? - if (props.connectionManager.size == 0) throw new ConfigurationException("RoutedProps used for creating actor [" + address + "] has zero connections configured; can't create a router") + if (props.connectionManager.isEmpty) throw new ConfigurationException("RoutedProps used for creating actor [" + address + "] has zero connections configured; can't create a router") // val clusteringEnabled = ReflectiveAccess.ClusterModule.isEnabled // val localOnly = props.localOnly // if (clusteringEnabled && !props.localOnly) ReflectiveAccess.ClusterModule.newClusteredActorRef(props) diff --git a/akka-actor/src/main/scala/akka/actor/Props.scala b/akka-actor/src/main/scala/akka/actor/Props.scala index 2aa4eeb8e7..7aedfbdd0a 100644 --- a/akka-actor/src/main/scala/akka/actor/Props.scala +++ b/akka-actor/src/main/scala/akka/actor/Props.scala @@ -7,6 +7,7 @@ package akka.actor import akka.dispatch._ import akka.japi.Creator import akka.util._ +import collection.immutable.Stack /** * ActorRef configuration object, this is threadsafe and fully sharable @@ -20,6 +21,7 @@ object Props { final val defaultTimeout: Timeout = Timeout(Duration.MinusInf) final val defaultFaultHandler: FaultHandlingStrategy = OneForOneStrategy(classOf[Exception] :: Nil, None, None) final val defaultSupervisor: Option[ActorRef] = None + final val noHotSwap: Stack[Actor.Receive] = Stack.empty /** * The default Props instance, uses the settings from the Props object starting with default* diff --git a/akka-actor/src/main/scala/akka/actor/TypedActor.scala b/akka-actor/src/main/scala/akka/actor/TypedActor.scala index 64e6fc64dd..d37b3f8c16 100644 --- a/akka-actor/src/main/scala/akka/actor/TypedActor.scala +++ b/akka-actor/src/main/scala/akka/actor/TypedActor.scala @@ -251,8 +251,8 @@ class TypedActor(val app: AkkaApplication) { //Warning, do not change order of the following statements, it's some elaborate chicken-n-egg handling val actorVar = new AtomVar[ActorRef](null) val timeout = props.timeout match { - case Timeout(Duration.MinusInf) ⇒ app.AkkaConfig.ActorTimeout - case x ⇒ x + case Props.`defaultTimeout` ⇒ app.AkkaConfig.ActorTimeout + case x ⇒ x } val proxy: T = Proxy.newProxyInstance(loader, interfaces, new TypedActorInvocationHandler(actorVar)(timeout)).asInstanceOf[T] proxyVar.set(proxy) // Chicken and egg situation we needed to solve, set the proxy so that we can set the self-reference inside each receive diff --git a/akka-actor/src/main/scala/akka/routing/ConnectionManager.scala b/akka-actor/src/main/scala/akka/routing/ConnectionManager.scala index 2d6d8c549e..80230e73ff 100644 --- a/akka-actor/src/main/scala/akka/routing/ConnectionManager.scala +++ b/akka-actor/src/main/scala/akka/routing/ConnectionManager.scala @@ -40,6 +40,12 @@ trait ConnectionManager { */ def size: Int + /** + * Returns if the number of 'available' is 0 or not. Value could be stale as soon as received, and this method can't be combined (easily) + * with an atomic read of and isEmpty and version. + */ + def isEmpty: Boolean + /** * Shuts the connection manager down, which stops all managed actors */ @@ -90,6 +96,8 @@ class LocalConnectionManager(initialConnections: Iterable[ActorRef]) extends Con def size: Int = state.get.connections.size + def isEmpty: Boolean = state.get.connections.isEmpty + def connections = state.get def shutdown() { diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala index 0cc5efa591..0f4692a6af 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala @@ -34,7 +34,7 @@ class RemoteActorRefProvider(val app: AkkaApplication) extends ActorRefProvider val local = new LocalActorRefProvider(app) val remote = new Remote(app) - private val actors = new ConcurrentHashMap[String, Promise[ActorRef]] + private val actors = new ConcurrentHashMap[String, AnyRef] private val remoteDaemonConnectionManager = new RemoteConnectionManager( app, @@ -47,98 +47,90 @@ class RemoteActorRefProvider(val app: AkkaApplication) extends ActorRefProvider def actorOf(props: Props, address: String): ActorRef = actorOf(props, address, false) def actorOf(props: Props, address: String, systemService: Boolean): ActorRef = { - Address.validate(address) - val newFuture = Promise[ActorRef](5000)(defaultDispatcher) // FIXME is this proper timeout? - val oldFuture = actors.putIfAbsent(address, newFuture) - if (oldFuture eq null) { // we won the race -- create the actor and resolve the future - val actor: ActorRef = try { - app.deployer.lookupDeploymentFor(address) match { - case Some(DeploymentConfig.Deploy(_, _, routerType, nrOfInstances, failureDetectorType, DeploymentConfig.RemoteScope(remoteAddresses))) ⇒ + actors.putIfAbsent(address, newFuture) match { // we won the race -- create the actor and resolve the future + case null ⇒ + val actor: ActorRef = try { + app.deployer.lookupDeploymentFor(address) match { + case Some(DeploymentConfig.Deploy(_, _, routerType, nrOfInstances, failureDetectorType, DeploymentConfig.RemoteScope(remoteAddresses))) ⇒ - val failureDetector = DeploymentConfig.failureDetectorTypeFor(failureDetectorType) match { - case FailureDetectorType.NoOp ⇒ new NoOpFailureDetector - case FailureDetectorType.RemoveConnectionOnFirstFailure ⇒ new RemoveConnectionOnFirstFailureFailureDetector - case FailureDetectorType.BannagePeriod(timeToBan) ⇒ new BannagePeriodFailureDetector(timeToBan) - case FailureDetectorType.Custom(implClass) ⇒ FailureDetector.createCustomFailureDetector(implClass) - } - - val thisHostname = remote.address.getHostName - val thisPort = remote.address.getPort - - def isReplicaNode: Boolean = remoteAddresses exists { remoteAddress ⇒ - remoteAddress.hostname == thisHostname && remoteAddress.port == thisPort - } - - if (isReplicaNode) { - // we are on one of the replica node for this remote actor - val localProps = - if (props.dispatcher == Props.defaultDispatcher) props.copy(dispatcher = app.dispatcher) - else props - new LocalActorRef(app, localProps, address, false) - } else { - - // we are on the single "reference" node uses the remote actors on the replica nodes - val routerFactory: () ⇒ Router = DeploymentConfig.routerTypeFor(routerType) match { - case RouterType.Direct ⇒ - if (remoteAddresses.size != 1) throw new ConfigurationException( - "Actor [%s] configured with Direct router must have exactly 1 remote node configured. Found [%s]" - .format(address, remoteAddresses.mkString(", "))) - () ⇒ new DirectRouter - - case RouterType.Random ⇒ - if (remoteAddresses.size < 1) throw new ConfigurationException( - "Actor [%s] configured with Random router must have at least 1 remote node configured. Found [%s]" - .format(address, remoteAddresses.mkString(", "))) - () ⇒ new RandomRouter - - case RouterType.RoundRobin ⇒ - if (remoteAddresses.size < 1) throw new ConfigurationException( - "Actor [%s] configured with RoundRobin router must have at least 1 remote node configured. Found [%s]" - .format(address, remoteAddresses.mkString(", "))) - () ⇒ new RoundRobinRouter - - case RouterType.ScatterGather ⇒ - if (remoteAddresses.size < 1) throw new ConfigurationException( - "Actor [%s] configured with ScatterGather router must have at least 1 remote node configured. Found [%s]" - .format(address, remoteAddresses.mkString(", "))) - () ⇒ new ScatterGatherFirstCompletedRouter()(defaultDispatcher, defaultTimeout) - - case RouterType.LeastCPU ⇒ sys.error("Router LeastCPU not supported yet") - case RouterType.LeastRAM ⇒ sys.error("Router LeastRAM not supported yet") - case RouterType.LeastMessages ⇒ sys.error("Router LeastMessages not supported yet") - case RouterType.Custom(implClass) ⇒ () ⇒ Routing.createCustomRouter(implClass) + val failureDetector = DeploymentConfig.failureDetectorTypeFor(failureDetectorType) match { + case FailureDetectorType.NoOp ⇒ new NoOpFailureDetector + case FailureDetectorType.RemoveConnectionOnFirstFailure ⇒ new RemoveConnectionOnFirstFailureFailureDetector + case FailureDetectorType.BannagePeriod(timeToBan) ⇒ new BannagePeriodFailureDetector(timeToBan) + case FailureDetectorType.Custom(implClass) ⇒ FailureDetector.createCustomFailureDetector(implClass) } - var connections = Map.empty[InetSocketAddress, ActorRef] - remoteAddresses foreach { remoteAddress: DeploymentConfig.RemoteAddress ⇒ - val inetSocketAddress = new InetSocketAddress(remoteAddress.hostname, remoteAddress.port) - connections += (inetSocketAddress -> RemoteActorRef(remote.server, inetSocketAddress, address, None)) + val thisHostname = remote.address.getHostName + val thisPort = remote.address.getPort + + def isReplicaNode: Boolean = remoteAddresses exists { some ⇒ some.hostname == thisHostname && some.port == thisPort } + + if (isReplicaNode) { + // we are on one of the replica node for this remote actor + new LocalActorRef(app, props, address, false) + } else { + + // we are on the single "reference" node uses the remote actors on the replica nodes + val routerFactory: () ⇒ Router = DeploymentConfig.routerTypeFor(routerType) match { + case RouterType.Direct ⇒ + if (remoteAddresses.size != 1) throw new ConfigurationException( + "Actor [%s] configured with Direct router must have exactly 1 remote node configured. Found [%s]" + .format(address, remoteAddresses.mkString(", "))) + () ⇒ new DirectRouter + + case RouterType.Random ⇒ + if (remoteAddresses.size < 1) throw new ConfigurationException( + "Actor [%s] configured with Random router must have at least 1 remote node configured. Found [%s]" + .format(address, remoteAddresses.mkString(", "))) + () ⇒ new RandomRouter + + case RouterType.RoundRobin ⇒ + if (remoteAddresses.size < 1) throw new ConfigurationException( + "Actor [%s] configured with RoundRobin router must have at least 1 remote node configured. Found [%s]" + .format(address, remoteAddresses.mkString(", "))) + () ⇒ new RoundRobinRouter + + case RouterType.ScatterGather ⇒ + if (remoteAddresses.size < 1) throw new ConfigurationException( + "Actor [%s] configured with ScatterGather router must have at least 1 remote node configured. Found [%s]" + .format(address, remoteAddresses.mkString(", "))) + () ⇒ new ScatterGatherFirstCompletedRouter()(defaultDispatcher, defaultTimeout) + + case RouterType.LeastCPU ⇒ sys.error("Router LeastCPU not supported yet") + case RouterType.LeastRAM ⇒ sys.error("Router LeastRAM not supported yet") + case RouterType.LeastMessages ⇒ sys.error("Router LeastMessages not supported yet") + case RouterType.Custom(implClass) ⇒ () ⇒ Routing.createCustomRouter(implClass) + } + + val connections = (Map.empty[InetSocketAddress, ActorRef] /: remoteAddresses) { (conns, a) ⇒ + val inetAddr = new InetSocketAddress(a.hostname, a.port) + conns + (inetAddr -> RemoteActorRef(remote.server, inetAddr, address, None)) + } + + val connectionManager = new RemoteConnectionManager(app, remote, connections, failureDetector) + + connections.keys foreach { useActorOnNode(_, address, props.creator) } + + actorOf(RoutedProps(routerFactory = routerFactory, connectionManager = connectionManager), address) } - val connectionManager = new RemoteConnectionManager(app, remote, connections, failureDetector) - - connections.keys foreach { useActorOnNode(_, address, props.creator) } - - actorOf(RoutedProps(routerFactory = routerFactory, connectionManager = connectionManager), address) - } - - case deploy ⇒ local.actorOf(props, address, systemService) + case deploy ⇒ local.actorOf(props, address, systemService) + } + } catch { + case e: Exception ⇒ + newFuture completeWithException e // so the other threads gets notified of error + throw e } - } catch { - case e: Exception ⇒ - newFuture completeWithException e // so the other threads gets notified of error - throw e - } - // actor foreach app.registry.register // only for ActorRegistry backward compat, will be removed later + // actor foreach app.registry.register // only for ActorRegistry backward compat, will be removed later - newFuture completeWithResult actor - actor - - } else { // we lost the race -- wait for future to complete - oldFuture.get + newFuture completeWithResult actor + actors.replace(address, newFuture, actor) + actor + case actor: ActorRef ⇒ actor + case future: Future[_] ⇒ future.get.asInstanceOf[ActorRef] } } @@ -151,8 +143,9 @@ class RemoteActorRefProvider(val app: AkkaApplication) extends ActorRefProvider } def actorFor(address: String): Option[ActorRef] = actors.get(address) match { - case null ⇒ None - case future ⇒ Some(future.get) + case null ⇒ None + case actor: ActorRef ⇒ Some(actor) + case future: Future[_] ⇒ Some(future.get.asInstanceOf[ActorRef]) } /** diff --git a/akka-remote/src/main/scala/akka/remote/RemoteConnectionManager.scala b/akka-remote/src/main/scala/akka/remote/RemoteConnectionManager.scala index b89b9310a5..d5a99fe893 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteConnectionManager.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteConnectionManager.scala @@ -52,6 +52,8 @@ class RemoteConnectionManager( def size: Int = connections.connections.size + def isEmpty: Boolean = connections.connections.isEmpty + def shutdown() { state.get.iterable foreach (_.stop()) // shut down all remote connections }