diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala index c3e94188b1..618c442b89 100644 --- a/akka-actor/src/main/scala/akka/actor/Actor.scala +++ b/akka-actor/src/main/scala/akka/actor/Actor.scala @@ -115,7 +115,7 @@ object Actor extends Logging { private[actor] val actorRefInCreation = new scala.util.DynamicVariable[Option[ActorRef]](None) /** - * Creates an ActorRef out of the Actor with type T. + * Creates an ActorRef out of the Actor with type T. *
    *   import Actor._
    *   val actor = actorOf[MyActor]
@@ -128,26 +128,7 @@ object Actor extends Logging {
    *   val actor = actorOf[MyActor].start
    * 
*/ - def actorOf[T <: Actor : Manifest]: ActorRef = - ActorRegistry.actorOf[T] - - /** - * Creates a Client-managed ActorRef out of the Actor of the specified Class. - * If the supplied host and port is identical of the configured local node, it will be a local actor - *
-   *   import Actor._
-   *   val actor = actorOf[MyActor]("www.akka.io",2552)
-   *   actor.start
-   *   actor ! message
-   *   actor.stop
-   * 
- * You can create and start the actor in one statement like this: - *
-   *   val actor = actorOf[MyActor]("www.akka.io",2552).start
-   * 
- */ - def actorOf[T <: Actor : Manifest](host: String, port: Int): ActorRef = - ActorRegistry.actorOf[T](host,port) + def actorOf[T <: Actor : Manifest]: ActorRef = actorOf(manifest[T].erasure.asInstanceOf[Class[_ <: Actor]]) /** * Creates an ActorRef out of the Actor of the specified Class. @@ -163,8 +144,15 @@ object Actor extends Logging { * val actor = actorOf(classOf[MyActor]).start * */ - def actorOf(clazz: Class[_ <: Actor]): ActorRef = - ActorRegistry.actorOf(clazz) + def actorOf(clazz: Class[_ <: Actor]): ActorRef = new LocalActorRef(() => { + import ReflectiveAccess.{ createInstance, noParams, noArgs } + createInstance[Actor](clazz.asInstanceOf[Class[_]], noParams, noArgs).getOrElse( + throw new ActorInitializationException( + "Could not instantiate Actor" + + "\nMake sure Actor is NOT defined inside a class/trait," + + "\nif so put it outside the class/trait, f.e. in a companion object," + + "\nOR try to change: 'actorOf[MyActor]' to 'actorOf(new MyActor)'.")) + }, None) /** * Creates a Client-managed ActorRef out of the Actor of the specified Class. @@ -181,8 +169,62 @@ object Actor extends Logging { * val actor = actorOf(classOf[MyActor],"www.akka.io",2552).start * */ - def actorOf(clazz: Class[_ <: Actor], host: String, port: Int, timeout: Long = Actor.TIMEOUT): ActorRef = - ActorRegistry.actorOf(clazz, host, port, timeout) + def actorOf(factory: => Actor, host: String, port: Int): ActorRef = + ActorRegistry.remote.clientManagedActorOf(() => factory, host, port) + + /** + * Creates a Client-managed ActorRef out of the Actor of the specified Class. + * If the supplied host and port is identical of the configured local node, it will be a local actor + *
+   *   import Actor._
+   *   val actor = actorOf(classOf[MyActor],"www.akka.io",2552)
+   *   actor.start
+   *   actor ! message
+   *   actor.stop
+   * 
+ * You can create and start the actor in one statement like this: + *
+   *   val actor = actorOf(classOf[MyActor],"www.akka.io",2552).start
+   * 
+ */ + def actorOf(clazz: Class[_ <: Actor], host: String, port: Int): ActorRef = { + import ReflectiveAccess.{ createInstance, noParams, noArgs } + ActorRegistry.remote.clientManagedActorOf(() => + createInstance[Actor](clazz.asInstanceOf[Class[_]], noParams, noArgs).getOrElse( + throw new ActorInitializationException( + "Could not instantiate Actor" + + "\nMake sure Actor is NOT defined inside a class/trait," + + "\nif so put it outside the class/trait, f.e. in a companion object," + + "\nOR try to change: 'actorOf[MyActor]' to 'actorOf(new MyActor)'.")), + host, port) + } + + /** + * Creates a Client-managed ActorRef out of the Actor of the specified Class. + * If the supplied host and port is identical of the configured local node, it will be a local actor + *
+   *   import Actor._
+   *   val actor = actorOf[MyActor]("www.akka.io",2552)
+   *   actor.start
+   *   actor ! message
+   *   actor.stop
+   * 
+ * You can create and start the actor in one statement like this: + *
+   *   val actor = actorOf[MyActor]("www.akka.io",2552).start
+   * 
+ */ + def actorOf[T <: Actor : Manifest](host: String, port: Int): ActorRef = { + import ReflectiveAccess.{ createInstance, noParams, noArgs } + ActorRegistry.remote.clientManagedActorOf(() => + createInstance[Actor](manifest[T].erasure.asInstanceOf[Class[_]], noParams, noArgs).getOrElse( + throw new ActorInitializationException( + "Could not instantiate Actor" + + "\nMake sure Actor is NOT defined inside a class/trait," + + "\nif so put it outside the class/trait, f.e. in a companion object," + + "\nOR try to change: 'actorOf[MyActor]' to 'actorOf(new MyActor)'.")), + host, port) + } /** * Creates an ActorRef out of the Actor. Allows you to pass in a factory function @@ -202,25 +244,32 @@ object Actor extends Logging { * val actor = actorOf(new MyActor).start * */ - def actorOf(factory: => Actor): ActorRef = ActorRegistry.actorOf(factory) + def actorOf(factory: => Actor): ActorRef = new LocalActorRef(() => factory, None) /** * Use to spawn out a block of code in an event-driven actor. Will shut actor down when * the block has been executed. *

- * NOTE: If used from within an Actor then has to be qualified with 'Actor.spawn' since + * NOTE: If used from within an Actor then has to be qualified with 'ActorRegistry.spawn' since * there is a method 'spawn[ActorType]' in the Actor trait already. * Example: *

-   * import Actor._
+   * import ActorRegistry.{spawn}
    *
    * spawn  {
    *   ... // do stuff
    * }
    * 
*/ - def spawn(body: => Unit)(implicit dispatcher: MessageDispatcher = Dispatchers.defaultGlobalDispatcher): Unit = - ActorRegistry.spawn(body)(dispatcher) + def spawn(body: => Unit)(implicit dispatcher: MessageDispatcher = Dispatchers.defaultGlobalDispatcher): Unit = { + case object Spawn + actorOf(new Actor() { + self.dispatcher = dispatcher + def receive = { + case Spawn => try { body } finally { self.stop } + } + }).start ! Spawn + } /** diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index ff126441e9..cfd1dd9904 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -172,14 +172,14 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] { scal def getDispatcher(): MessageDispatcher = dispatcher /** - * Returns on which node this actor lives + * Returns on which node this actor lives if None it lives in the local ActorRegistry */ - def homeAddress: InetSocketAddress + def homeAddress: Option[InetSocketAddress] /** * Java API */ - def getHomeAddress(): InetSocketAddress = homeAddress + def getHomeAddress(): InetSocketAddress = homeAddress getOrElse null /** * Holds the hot swapped partial function. @@ -451,7 +451,7 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] { scal *

* To be invoked from within the actor itself. */ - def spawnRemote(clazz: Class[_ <: Actor], hostname: String, port: Int): ActorRef + def spawnRemote(clazz: Class[_ <: Actor], hostname: String, port: Int, timeout: Long): ActorRef /** * Atomically create (from actor class), link and start an actor. @@ -465,7 +465,7 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] { scal *

* To be invoked from within the actor itself. */ - def spawnLinkRemote(clazz: Class[_ <: Actor], hostname: String, port: Int): ActorRef + def spawnLinkRemote(clazz: Class[_ <: Actor], hostname: String, port: Int, timeout: Long): ActorRef /** * Returns the mailbox size. @@ -551,7 +551,7 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] { scal */ class LocalActorRef private[akka] ( private[this] val actorFactory: () => Actor, - val homeAddress: InetSocketAddress = Remote.localAddress) + val homeAddress: Option[InetSocketAddress]) extends ActorRef with ScalaActorRef { @volatile @@ -582,7 +582,7 @@ class LocalActorRef private[akka] ( __supervisor: Option[ActorRef], __hotswap: Stack[PartialFunction[Any, Unit]], __factory: () => Actor, - __homeAddress: InetSocketAddress) = { + __homeAddress: Option[InetSocketAddress]) = { this(__factory, __homeAddress) _uuid = __uuid id = __id @@ -595,7 +595,10 @@ class LocalActorRef private[akka] ( start } - private final def isClientManaged_? = (homeAddress ne Remote.localAddress) && homeAddress != Remote.localAddress + /** + * Returns whether this actor ref is client-managed remote or not + */ + private[akka] final def isClientManaged_? = homeAddress.isDefined && isRemotingEnabled // ========= PUBLIC FUNCTIONS ========= @@ -640,8 +643,8 @@ class LocalActorRef private[akka] ( if ((actorInstance ne null) && (actorInstance.get ne null)) initializeActorInstance - if (isRemotingEnabled && isClientManaged_?) - ActorRegistry.remote.registerClientManagedActor(homeAddress.getHostName,homeAddress.getPort, uuid) + if (isClientManaged_?) + ActorRegistry.remote.registerClientManagedActor(homeAddress.get.getHostName,homeAddress.get.getPort, uuid) checkReceiveTimeout //Schedule the initial Receive timeout } @@ -661,7 +664,7 @@ class LocalActorRef private[akka] ( ActorRegistry.unregister(this) if (isRemotingEnabled) { if (isClientManaged_?) - ActorRegistry.remote.registerClientManagedActor(homeAddress.getHostName,homeAddress.getPort, uuid) + ActorRegistry.remote.registerClientManagedActor(homeAddress.get.getHostName,homeAddress.get.getPort, uuid) ActorRegistry.remote.unregister(this) } setActorSelfFields(actorInstance.get,null) @@ -732,9 +735,11 @@ class LocalActorRef private[akka] ( *

* To be invoked from within the actor itself. */ - def spawnRemote(clazz: Class[_ <: Actor], hostname: String, port: Int): ActorRef = guard.withGuard { + def spawnRemote(clazz: Class[_ <: Actor], hostname: String, port: Int, timeout: Long = Actor.TIMEOUT): ActorRef = guard.withGuard { ensureRemotingEnabled - Actor.actorOf(clazz, hostname, port).start + val ref = Actor.actorOf(clazz, hostname, port) + ref.timeout = timeout + ref.start } /** @@ -754,13 +759,15 @@ class LocalActorRef private[akka] ( *

* To be invoked from within the actor itself. */ - def spawnLinkRemote(clazz: Class[_ <: Actor], hostname: String, port: Int): ActorRef = guard.withGuard { - ensureRemotingEnabled - val actor = Actor.actorOf(clazz, hostname, port) - link(actor) - actor.start - actor - } + def spawnLinkRemote(clazz: Class[_ <: Actor], hostname: String, port: Int, timeout: Long = Actor.TIMEOUT): ActorRef = + guard.withGuard { + ensureRemotingEnabled + val actor = Actor.actorOf(clazz, hostname, port) + actor.timeout = timeout + link(actor) + actor.start + actor + } /** * Returns the mailbox. @@ -790,9 +797,9 @@ class LocalActorRef private[akka] ( protected[akka] def supervisor_=(sup: Option[ActorRef]): Unit = _supervisor = sup protected[akka] def postMessageToMailbox(message: Any, senderOption: Option[ActorRef]): Unit = - if (isClientManaged_? && isRemotingEnabled) { + if (isClientManaged_?) { ActorRegistry.remote.send[Any]( - message, senderOption, None, homeAddress, timeout, true, this, None, ActorType.ScalaActor, None) + message, senderOption, None, homeAddress.get, timeout, true, this, None, ActorType.ScalaActor, None) } else dispatcher dispatchMessage new MessageInvocation(this, message, senderOption, None) @@ -801,9 +808,9 @@ class LocalActorRef private[akka] ( timeout: Long, senderOption: Option[ActorRef], senderFuture: Option[CompletableFuture[T]]): CompletableFuture[T] = { - if (isClientManaged_? && isRemotingEnabled) { + if (isClientManaged_?) { val future = ActorRegistry.remote.send[T]( - message, senderOption, senderFuture, homeAddress, timeout, false, this, None, ActorType.ScalaActor, None) + message, senderOption, senderFuture, homeAddress.get, timeout, false, this, None, ActorType.ScalaActor, None) if (future.isDefined) future.get else throw new IllegalActorStateException("Expected a future from remote call to actor " + toString) } else { @@ -970,7 +977,8 @@ class LocalActorRef private[akka] ( protected[akka] def registerSupervisorAsRemoteActor: Option[Uuid] = guard.withGuard { ensureRemotingEnabled if (_supervisor.isDefined) { - ActorRegistry.remote.registerSupervisorForActor(this) + if (homeAddress.isDefined) + ActorRegistry.remote.registerSupervisorForActor(this) Some(_supervisor.get.uuid) } else None } @@ -1108,7 +1116,7 @@ private[akka] case class RemoteActorRef private[akka] ( ensureRemotingEnabled - val homeAddress = new InetSocketAddress(hostname, port) + val homeAddress = Some(new InetSocketAddress(hostname, port)) //protected def clientManaged = classOrServiceName.isEmpty //If no class or service name, it's client managed id = classOrServiceName @@ -1119,14 +1127,14 @@ private[akka] case class RemoteActorRef private[akka] ( start def postMessageToMailbox(message: Any, senderOption: Option[ActorRef]): Unit = - ActorRegistry.remote.send[Any](message, senderOption, None, homeAddress, timeout, true, this, None, actorType, loader) + ActorRegistry.remote.send[Any](message, senderOption, None, homeAddress.get, timeout, true, this, None, actorType, loader) def postMessageToMailboxAndCreateFutureResultWithTimeout[T]( message: Any, timeout: Long, senderOption: Option[ActorRef], senderFuture: Option[CompletableFuture[T]]): CompletableFuture[T] = { - val future = ActorRegistry.remote.send[T](message, senderOption, senderFuture, homeAddress, timeout, false, this, None, actorType, loader) + val future = ActorRegistry.remote.send[T](message, senderOption, senderFuture, homeAddress.get, timeout, false, this, None, actorType, loader) if (future.isDefined) future.get else throw new IllegalActorStateException("Expected a future from remote call to actor " + toString) } @@ -1158,9 +1166,9 @@ private[akka] case class RemoteActorRef private[akka] ( def startLink(actorRef: ActorRef): Unit = unsupported def startLinkRemote(actorRef: ActorRef, hostname: String, port: Int): Unit = unsupported def spawn(clazz: Class[_ <: Actor]): ActorRef = unsupported - def spawnRemote(clazz: Class[_ <: Actor], hostname: String, port: Int): ActorRef = unsupported + def spawnRemote(clazz: Class[_ <: Actor], hostname: String, port: Int, timeout: Long): ActorRef = unsupported def spawnLink(clazz: Class[_ <: Actor]): ActorRef = unsupported - def spawnLinkRemote(clazz: Class[_ <: Actor], hostname: String, port: Int): ActorRef = unsupported + def spawnLinkRemote(clazz: Class[_ <: Actor], hostname: String, port: Int, timeout: Long): ActorRef = unsupported def supervisor: Option[ActorRef] = unsupported def shutdownLinkedActors: Unit = unsupported protected[akka] def mailbox: AnyRef = unsupported @@ -1397,9 +1405,9 @@ trait ScalaActorRef extends ActorRefShared { ref: ActorRef => /** * Atomically create (from actor class), start and make an actor remote. */ - def spawnRemote[T <: Actor: Manifest](hostname: String, port: Int): ActorRef = { + def spawnRemote[T <: Actor: Manifest](hostname: String, port: Int, timeout: Long): ActorRef = { ensureRemotingEnabled - spawnRemote(manifest[T].erasure.asInstanceOf[Class[_ <: Actor]], hostname, port) + spawnRemote(manifest[T].erasure.asInstanceOf[Class[_ <: Actor]], hostname, port, timeout) } /** @@ -1411,9 +1419,9 @@ trait ScalaActorRef extends ActorRefShared { ref: ActorRef => /** * Atomically create (from actor class), start, link and make an actor remote. */ - def spawnLinkRemote[T <: Actor: Manifest](hostname: String, port: Int): ActorRef = { + def spawnLinkRemote[T <: Actor: Manifest](hostname: String, port: Int, timeout: Long): ActorRef = { ensureRemotingEnabled - spawnLinkRemote(manifest[T].erasure.asInstanceOf[Class[_ <: Actor]], hostname, port) + spawnLinkRemote(manifest[T].erasure.asInstanceOf[Class[_ <: Actor]], hostname, port, timeout) } } diff --git a/akka-actor/src/main/scala/akka/actor/ActorRegistry.scala b/akka-actor/src/main/scala/akka/actor/ActorRegistry.scala index e3934f1a58..d17f406e89 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRegistry.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRegistry.scala @@ -234,125 +234,9 @@ object ActorRegistry extends ListenerManagement { lazy val remote: RemoteSupport = remoteBootstrap.map(_()).getOrElse(throw new UnsupportedOperationException("You need to have akka-remote on classpath")) /** - * Creates an ActorRef out of the Actor with type T. - *

-   *   import Actor._
-   *   val actor = actorOf[MyActor]
-   *   actor.start
-   *   actor ! message
-   *   actor.stop
-   * 
- * You can create and start the actor in one statement like this: - *
-   *   val actor = actorOf[MyActor].start
-   * 
+ * Current home address of this ActorRegistry */ - def actorOf[T <: Actor : Manifest]: ActorRef = actorOf(manifest[T].erasure.asInstanceOf[Class[_ <: Actor]]) - - /** - * Creates a Client-managed ActorRef out of the Actor of the specified Class. - * If the supplied host and port is identical of the configured local node, it will be a local actor - *
-   *   import Actor._
-   *   val actor = actorOf[MyActor]("www.akka.io",2552)
-   *   actor.start
-   *   actor ! message
-   *   actor.stop
-   * 
- * You can create and start the actor in one statement like this: - *
-   *   val actor = actorOf[MyActor]("www.akka.io",2552).start
-   * 
- */ - def actorOf[T <: Actor : Manifest](host: String, port: Int): ActorRef = - actorOf(manifest[T].erasure.asInstanceOf[Class[_ <: Actor]], host, port) - - /** - * Creates an ActorRef out of the Actor of the specified Class. - *
-   *   import Actor._
-   *   val actor = actorOf(classOf[MyActor])
-   *   actor.start
-   *   actor ! message
-   *   actor.stop
-   * 
- * You can create and start the actor in one statement like this: - *
-   *   val actor = actorOf(classOf[MyActor]).start
-   * 
- */ - def actorOf(clazz: Class[_ <: Actor]): ActorRef = new LocalActorRef(() => { - import ReflectiveAccess.{ createInstance, noParams, noArgs } - createInstance[Actor](clazz.asInstanceOf[Class[_]], noParams, noArgs).getOrElse( - throw new ActorInitializationException( - "Could not instantiate Actor" + - "\nMake sure Actor is NOT defined inside a class/trait," + - "\nif so put it outside the class/trait, f.e. in a companion object," + - "\nOR try to change: 'actorOf[MyActor]' to 'actorOf(new MyActor)'.")) - }) - - /** - * Creates a Client-managed ActorRef out of the Actor of the specified Class. - * If the supplied host and port is identical of the configured local node, it will be a local actor - *
-   *   import Actor._
-   *   val actor = actorOf(classOf[MyActor],"www.akka.io",2552)
-   *   actor.start
-   *   actor ! message
-   *   actor.stop
-   * 
- * You can create and start the actor in one statement like this: - *
-   *   val actor = actorOf(classOf[MyActor],"www.akka.io",2552).start
-   * 
- */ - def actorOf(clazz: Class[_ <: Actor], host: String, port: Int, timeout: Long = Actor.TIMEOUT): ActorRef = - remote.clientManagedActorOf(clazz, host, port, timeout) - - /** - * Creates an ActorRef out of the Actor. Allows you to pass in a factory function - * that creates the Actor. Please note that this function can be invoked multiple - * times if for example the Actor is supervised and needs to be restarted. - *

- * This function should NOT be used for remote actors. - *

-   *   import Actor._
-   *   val actor = actorOf(new MyActor)
-   *   actor.start
-   *   actor ! message
-   *   actor.stop
-   * 
- * You can create and start the actor in one statement like this: - *
-   *   val actor = actorOf(new MyActor).start
-   * 
- */ - def actorOf(factory: => Actor): ActorRef = new LocalActorRef(() => factory) - - /** - * Use to spawn out a block of code in an event-driven actor. Will shut actor down when - * the block has been executed. - *

- * NOTE: If used from within an Actor then has to be qualified with 'ActorRegistry.spawn' since - * there is a method 'spawn[ActorType]' in the Actor trait already. - * Example: - *

-   * import ActorRegistry.{spawn}
-   *
-   * spawn  {
-   *   ... // do stuff
-   * }
-   * 
- */ - def spawn(body: => Unit)(implicit dispatcher: MessageDispatcher = Dispatchers.defaultGlobalDispatcher): Unit = { - case object Spawn - actorOf(new Actor() { - self.dispatcher = dispatcher - def receive = { - case Spawn => try { body } finally { self.stop } - } - }).start ! Spawn - } + def homeAddress(): InetSocketAddress = if (isRemotingEnabled) remote.address else Remote.configDefaultAddress /** diff --git a/akka-actor/src/main/scala/akka/actor/Supervisor.scala b/akka-actor/src/main/scala/akka/actor/Supervisor.scala index 288bdfda24..daf7a962de 100644 --- a/akka-actor/src/main/scala/akka/actor/Supervisor.scala +++ b/akka-actor/src/main/scala/akka/actor/Supervisor.scala @@ -142,7 +142,7 @@ sealed class Supervisor(handler: FaultHandlingStrategy) { actorRef.lifeCycle = lifeCycle supervisor.link(actorRef) if (registerAsRemoteService) - ActorRegistry.remote.register(actorRef) //TODO: REVISIT: Is this the most sensible approach? other way of obtaining ActorRegistry? + ActorRegistry.remote.register(actorRef) case supervisorConfig @ SupervisorConfig(_, _) => // recursive supervisor configuration val childSupervisor = Supervisor(supervisorConfig) supervisor.link(childSupervisor.supervisor) diff --git a/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala b/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala index d12847e149..f0f2f259d6 100644 --- a/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala +++ b/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala @@ -132,7 +132,7 @@ trait MessageDispatcher extends MailboxFactory with Logging { val i = uuids.iterator while(i.hasNext()) { val uuid = i.next() - ActorRegistry.actorFor(uuid) match { //TODO: REVISIT: How to keep track of which registry? + ActorRegistry.actorFor(uuid) match { case Some(actor) => actor.stop case None => log.slf4j.error("stopAllLinkedActors couldn't find linked actor: " + uuid) diff --git a/akka-actor/src/main/scala/akka/remoteinterface/RemoteInterface.scala b/akka-actor/src/main/scala/akka/remoteinterface/RemoteInterface.scala index 11ed70a08a..751bfbc188 100644 --- a/akka-actor/src/main/scala/akka/remoteinterface/RemoteInterface.scala +++ b/akka-actor/src/main/scala/akka/remoteinterface/RemoteInterface.scala @@ -13,6 +13,8 @@ import akka.config.Config.{config, TIME_UNIT} import java.util.concurrent.ConcurrentHashMap trait RemoteModule extends Logging { + val UUID_PREFIX = "uuid:" + def optimizeLocalScoped_?(): Boolean //Apply optimizations for remote operations in local scope protected[akka] def notifyListeners(message: => Any): Unit @@ -23,6 +25,35 @@ trait RemoteModule extends Logging { private[akka] def typedActors: ConcurrentHashMap[String, AnyRef] private[akka] def typedActorsByUuid: ConcurrentHashMap[String, AnyRef] private[akka] def typedActorsFactories: ConcurrentHashMap[String, () => AnyRef] + + + /** Lookup methods **/ + + private[akka] def findActorById(id: String) : ActorRef = actors.get(id) + + private[akka] def findActorByUuid(uuid: String) : ActorRef = actorsByUuid.get(uuid) + + private[akka] def findActorFactory(id: String) : () => ActorRef = actorsFactories.get(id) + + private[akka] def findTypedActorById(id: String) : AnyRef = typedActors.get(id) + + private[akka] def findTypedActorFactory(id: String) : () => AnyRef = typedActorsFactories.get(id) + + private[akka] def findTypedActorByUuid(uuid: String) : AnyRef = typedActorsByUuid.get(uuid) + + private[akka] def findActorByIdOrUuid(id: String, uuid: String) : ActorRef = { + var actorRefOrNull = if (id.startsWith(UUID_PREFIX)) findActorByUuid(id.substring(UUID_PREFIX.length)) + else findActorById(id) + if (actorRefOrNull eq null) actorRefOrNull = findActorByUuid(uuid) + actorRefOrNull + } + + private[akka] def findTypedActorByIdOrUuid(id: String, uuid: String) : AnyRef = { + var actorRefOrNull = if (id.startsWith(UUID_PREFIX)) findTypedActorByUuid(id.substring(UUID_PREFIX.length)) + else findTypedActorById(id) + if (actorRefOrNull eq null) actorRefOrNull = findTypedActorByUuid(uuid) + actorRefOrNull + } } @@ -35,11 +66,11 @@ abstract class RemoteSupport extends ListenerManagement with RemoteServerModule protected override def manageLifeCycleOfListeners = false protected[akka] override def notifyListeners(message: => Any): Unit = super.notifyListeners(message) - private[akka] val actors = new ConcurrentHashMap[String, ActorRef] - private[akka] val actorsByUuid = new ConcurrentHashMap[String, ActorRef] - private[akka] val actorsFactories = new ConcurrentHashMap[String, () => ActorRef] - private[akka] val typedActors = new ConcurrentHashMap[String, AnyRef] - private[akka] val typedActorsByUuid = new ConcurrentHashMap[String, AnyRef] + private[akka] val actors = new ConcurrentHashMap[String, ActorRef] + private[akka] val actorsByUuid = new ConcurrentHashMap[String, ActorRef] + private[akka] val actorsFactories = new ConcurrentHashMap[String, () => ActorRef] + private[akka] val typedActors = new ConcurrentHashMap[String, AnyRef] + private[akka] val typedActorsByUuid = new ConcurrentHashMap[String, AnyRef] private[akka] val typedActorsFactories = new ConcurrentHashMap[String, () => AnyRef] def clear { @@ -64,19 +95,16 @@ trait RemoteServerModule extends RemoteModule { def name: String /** - * Gets the current hostname of the server instance + * Gets the address of the server instance */ - def hostname: String - - /** - * Gets the current port of the server instance - */ - def port: Int + def address: InetSocketAddress /** * Starts the server up */ - def start(host: String = ReflectiveAccess.Remote.HOSTNAME, port: Int = ReflectiveAccess.Remote.PORT, loader: Option[ClassLoader] = None): RemoteServerModule + def start(host: String = ReflectiveAccess.Remote.configDefaultAddress.getHostName, + port: Int = ReflectiveAccess.Remote.configDefaultAddress.getPort, + loader: Option[ClassLoader] = None): RemoteServerModule /** * Shuts the server down @@ -222,7 +250,7 @@ trait RemoteClientModule extends RemoteModule { self: RemoteModule => def typedActorFor[T](intfClass: Class[T], serviceId: String, implClassName: String, timeout: Long, hostname: String, port: Int, loader: ClassLoader): T = typedActorFor(intfClass, serviceId, implClassName, timeout, hostname, port, Some(loader)) - def clientManagedActorOf(clazz: Class[_ <: Actor], host: String, port: Int, timeout: Long): ActorRef + def clientManagedActorOf(factory: () => Actor, host: String, port: Int): ActorRef /** Methods that needs to be implemented by a transport **/ diff --git a/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala b/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala index 5ba5f513b2..b449e45552 100644 --- a/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala +++ b/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala @@ -21,7 +21,7 @@ object ReflectiveAccess extends Logging { val loader = getClass.getClassLoader - lazy val isRemotingEnabled = Remote.isEnabled + def isRemotingEnabled = Remote.isEnabled lazy val isTypedActorEnabled = TypedActorModule.isEnabled def ensureRemotingEnabled = Remote.ensureEnabled @@ -33,18 +33,18 @@ object ReflectiveAccess extends Logging { * @author Jonas Bonér */ object Remote { - val TRANSPORT = Config.config.getString("akka.remote.transport","akka.remote.NettyRemoteSupport") - val HOSTNAME = Config.config.getString("akka.remote.server.hostname", "localhost") - val PORT = Config.config.getInt("akka.remote.server.port", 2552) + val TRANSPORT = Config.config.getString("akka.remote.layer","akka.remote.NettyRemoteSupport") + + private[akka] val configDefaultAddress = + new InetSocketAddress(Config.config.getString("akka.remote.server.hostname", "localhost"), + Config.config.getInt("akka.remote.server.port", 2552)) - lazy val localAddress = new InetSocketAddress(HOSTNAME,PORT) lazy val isEnabled = remoteSupportClass.isDefined def ensureEnabled = if (!isEnabled) throw new ModuleNotAvailableException( "Can't load the remoting module, make sure that akka-remote.jar is on the classpath") - //TODO: REVISIT: Make class configurable val remoteSupportClass: Option[Class[_ <: RemoteSupport]] = getClassFor(TRANSPORT) protected[akka] val defaultRemoteSupport: Option[() => RemoteSupport] = remoteSupportClass map { diff --git a/akka-remote/src/main/scala/akka/remote/BootableRemoteActorService.scala b/akka-remote/src/main/scala/akka/remote/BootableRemoteActorService.scala index 58892c2ad3..d1d4d954f5 100644 --- a/akka-remote/src/main/scala/akka/remote/BootableRemoteActorService.scala +++ b/akka-remote/src/main/scala/akka/remote/BootableRemoteActorService.scala @@ -17,8 +17,7 @@ trait BootableRemoteActorService extends Bootable with Logging { self: BootableActorLoaderService => protected lazy val remoteServerThread = new Thread(new Runnable() { - import ReflectiveAccess.Remote.{HOSTNAME,PORT} - def run = ActorRegistry.remote.start(HOSTNAME,PORT,loader = self.applicationLoader) + def run = ActorRegistry.remote.start(loader = self.applicationLoader) //Use config host/port }, "Akka Remote Service") def startRemoteService = remoteServerThread.start diff --git a/akka-remote/src/main/scala/akka/remote/NettyRemoteSupport.scala b/akka-remote/src/main/scala/akka/remote/NettyRemoteSupport.scala index c4d2036cc4..da0f326705 100644 --- a/akka-remote/src/main/scala/akka/remote/NettyRemoteSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/NettyRemoteSupport.scala @@ -115,10 +115,10 @@ trait NettyRemoteClientModule extends RemoteClientModule { self: ListenerManagem } private[akka] def registerSupervisorForActor(actorRef: ActorRef): ActorRef = - clientFor(actorRef.homeAddress, None).registerSupervisorForActor(actorRef) + clientFor(actorRef.homeAddress.get, None).registerSupervisorForActor(actorRef) private[akka] def deregisterSupervisorForActor(actorRef: ActorRef): ActorRef = - clientFor(actorRef.homeAddress, None).deregisterSupervisorForActor(actorRef) + clientFor(actorRef.homeAddress.get, None).deregisterSupervisorForActor(actorRef) /** * Clean-up all open connections. @@ -486,8 +486,6 @@ class RemoteClientHandler( */ object RemoteServer { val isRemotingEnabled = config.getList("akka.enabled-modules").exists(_ == "remote") - - val UUID_PREFIX = "uuid:" val MESSAGE_FRAME_SIZE = config.getInt("akka.remote.server.message-frame-size", 1048576) val SECURE_COOKIE = config.getString("akka.remote.secure-cookie") val REQUIRE_COOKIE = { @@ -563,31 +561,22 @@ class NettyRemoteSupport extends RemoteSupport with NettyRemoteServerModule with def optimizeLocalScoped_?() = optimizeLocal.get - protected[akka] def actorFor(serviceId: String, className: String, timeout: Long, hostname: String, port: Int, loader: Option[ClassLoader]): ActorRef = { - //TODO: REVISIT: Possible to optimize server-managed actors in local scope? - //val Host = this.hostname - //val Port = this.port + protected[akka] def actorFor(serviceId: String, className: String, timeout: Long, host: String, port: Int, loader: Option[ClassLoader]): ActorRef = { + if (optimizeLocalScoped_?) { + val home = this.address + if (host == home.getHostName && port == home.getPort) {//TODO: switch to InetSocketAddres.equals? + val localRef = findActorByIdOrUuid(serviceId,serviceId) - //(host,port) match { - // case (Host, Port) if optimizeLocalScoped_? => - //if actor with that servicename or uuid is present locally, return a LocalActorRef to that one - //else return RemoteActorRef(registry, serviceId, className, hostname, port, timeout, false, loader) - // case _ => - // RemoteActorRef(registry, serviceId, className, hostname, port, timeout, false, loader) - //} - RemoteActorRef(serviceId, className, hostname, port, timeout, loader) + if (localRef ne null) return localRef //Code significantly simpler with the return statement + } + } + + RemoteActorRef(serviceId, className, host, port, timeout, loader) } - def clientManagedActorOf(clazz: Class[_ <: Actor], host: String, port: Int, timeout: Long): ActorRef = { - import ReflectiveAccess.{ createInstance, noParams, noArgs } - val ref = new LocalActorRef(() => createInstance[Actor](clazz.asInstanceOf[Class[_]], noParams, noArgs).getOrElse( - throw new ActorInitializationException( - "Could not instantiate Actor" + - "\nMake sure Actor is NOT defined inside a class/trait," + - "\nif so put it outside the class/trait, f.e. in a companion object," + - "\nOR try to change: 'actorOf[MyActor]' to 'actorOf(new MyActor)'.")), - new InetSocketAddress(host, port)) - ref.timeout = timeout + def clientManagedActorOf(factory: () => Actor, host: String, port: Int): ActorRef = { + val ref = new LocalActorRef(factory, Some(new InetSocketAddress(host, port))) + //ref.timeout = timeout //removed because setting default timeout should be done after construction ref } } @@ -595,6 +584,7 @@ class NettyRemoteSupport extends RemoteSupport with NettyRemoteServerModule with class NettyRemoteServer(serverModule: NettyRemoteServerModule, val host: String, val port: Int, val loader: Option[ClassLoader]) { val name = "NettyRemoteServer@" + host + ":" + port + val address = new InetSocketAddress(host,port) private val factory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool,Executors.newCachedThreadPool) @@ -610,7 +600,7 @@ class NettyRemoteServer(serverModule: NettyRemoteServerModule, val host: String, bootstrap.setOption("child.reuseAddress", true) bootstrap.setOption("child.connectTimeoutMillis", RemoteServer.CONNECTION_TIMEOUT_MILLIS.toMillis) - openChannels.add(bootstrap.bind(new InetSocketAddress(host, port))) + openChannels.add(bootstrap.bind(address)) serverModule.notifyListeners(RemoteServerStarted(serverModule)) def shutdown { @@ -630,19 +620,16 @@ trait NettyRemoteServerModule extends RemoteServerModule { self: RemoteModule => import RemoteServer._ private[akka] val currentServer = new AtomicReference[Option[NettyRemoteServer]](None) - def hostname = currentServer.get match { - case Some(s) => s.host - case None => ReflectiveAccess.Remote.HOSTNAME - } - - def port = currentServer.get match { - case Some(s) => s.port - case None => ReflectiveAccess.Remote.PORT + def address = currentServer.get match { + case Some(s) => s.address + case None => ReflectiveAccess.Remote.configDefaultAddress } def name = currentServer.get match { case Some(s) => s.name - case None => "NettyRemoteServer@" + ReflectiveAccess.Remote.HOSTNAME + ":" + ReflectiveAccess.Remote.PORT + case None => + val a = ReflectiveAccess.Remote.configDefaultAddress + "NettyRemoteServer@" + a.getHostName + ":" + a.getPort } private val _isRunning = new Switch(false) @@ -1096,57 +1083,18 @@ class RemoteServerHandler( } } - private def findActorById(id: String) : ActorRef = { - server.actors.get(id) - } - - private def findActorByUuid(uuid: String) : ActorRef = { - log.slf4j.debug("Trying to find actor for uuid '{}' inside {}",uuid,server.actorsByUuid) - server.actorsByUuid.get(uuid) - } - - private def findActorFactory(id: String) : () => ActorRef = { - server.actorsFactories.get(id) - } - private def findSessionActor(id: String, channel: Channel) : ActorRef = { val map = sessionActors.get(channel) if (map ne null) map.get(id) else null } - private def findTypedActorById(id: String) : AnyRef = { - server.typedActors.get(id) - } - - private def findTypedActorFactory(id: String) : () => AnyRef = { - server.typedActorsFactories.get(id) - } - private def findTypedSessionActor(id: String, channel: Channel) : AnyRef = { val map = typedSessionActors.get(channel) if (map ne null) map.get(id) else null } - private def findTypedActorByUuid(uuid: String) : AnyRef = { - server.typedActorsByUuid.get(uuid) - } - - private def findActorByIdOrUuid(id: String, uuid: String) : ActorRef = { - var actorRefOrNull = if (id.startsWith(UUID_PREFIX)) findActorByUuid(id.substring(UUID_PREFIX.length)) - else findActorById(id) - if (actorRefOrNull eq null) actorRefOrNull = findActorByUuid(uuid) - actorRefOrNull - } - - private def findTypedActorByIdOrUuid(id: String, uuid: String) : AnyRef = { - var actorRefOrNull = if (id.startsWith(UUID_PREFIX)) findTypedActorByUuid(id.substring(UUID_PREFIX.length)) - else findTypedActorById(id) - if (actorRefOrNull eq null) actorRefOrNull = findTypedActorByUuid(uuid) - actorRefOrNull - } - /** * gets the actor from the session, or creates one if there is a factory for it */ @@ -1159,7 +1107,7 @@ class RemoteServerHandler( sessionActorRefOrNull } else { // we dont have it in the session either, see if we have a factory for it - val actorFactoryOrNull = findActorFactory(id) + val actorFactoryOrNull = server.findActorFactory(id) if (actorFactoryOrNull ne null) { val actorRef = actorFactoryOrNull() actorRef.uuid = uuidFrom(uuid.getHigh,uuid.getLow) @@ -1185,7 +1133,7 @@ class RemoteServerHandler( log.slf4j.info("Creating a new client-managed remote actor [{}:{}]", name, uuid) val clazz = if (applicationLoader.isDefined) applicationLoader.get.loadClass(name) else Class.forName(name) - val actorRef = ActorRegistry.actorOf(clazz.asInstanceOf[Class[_ <: Actor]]) + val actorRef = Actor.actorOf(clazz.asInstanceOf[Class[_ <: Actor]]) actorRef.uuid = uuidFrom(uuid.getHigh,uuid.getLow) actorRef.id = id actorRef.timeout = timeout @@ -1211,7 +1159,7 @@ class RemoteServerHandler( val uuid = actorInfo.getUuid val id = actorInfo.getId - val actorRefOrNull = findActorByIdOrUuid(id, uuidFrom(uuid.getHigh,uuid.getLow).toString) + val actorRefOrNull = server.findActorByIdOrUuid(id, uuidFrom(uuid.getHigh,uuid.getLow).toString) if (actorRefOrNull ne null) actorRefOrNull @@ -1235,7 +1183,7 @@ class RemoteServerHandler( if (sessionActorRefOrNull ne null) sessionActorRefOrNull else { - val actorFactoryOrNull = findTypedActorFactory(id) + val actorFactoryOrNull = server.findTypedActorFactory(id) if (actorFactoryOrNull ne null) { val newInstance = actorFactoryOrNull() typedSessionActors.get(channel).put(id, newInstance) @@ -1280,7 +1228,7 @@ class RemoteServerHandler( val uuid = actorInfo.getUuid val id = actorInfo.getId - val typedActorOrNull = findTypedActorByIdOrUuid(id, uuidFrom(uuid.getHigh,uuid.getLow).toString) + val typedActorOrNull = server.findTypedActorByIdOrUuid(id, uuidFrom(uuid.getHigh,uuid.getLow).toString) if (typedActorOrNull ne null) typedActorOrNull else diff --git a/akka-remote/src/main/scala/akka/serialization/SerializationProtocol.scala b/akka-remote/src/main/scala/akka/serialization/SerializationProtocol.scala index 92edf9d582..83b75dd5da 100644 --- a/akka-remote/src/main/scala/akka/serialization/SerializationProtocol.scala +++ b/akka-remote/src/main/scala/akka/serialization/SerializationProtocol.scala @@ -17,7 +17,6 @@ import scala.collection.immutable.Stack import com.google.protobuf.ByteString import akka.util.ReflectiveAccess -import akka.util.ReflectiveAccess.Remote.{HOSTNAME,PORT} import java.net.InetSocketAddress /** @@ -91,11 +90,14 @@ object ActorSerialization { def toBinaryJ[T <: Actor](a: ActorRef, format: Format[T], srlMailBox: Boolean = true): Array[Byte] = toBinary(a, srlMailBox)(format) - private[akka] def toAddressProtocol(actorRef: ActorRef) = + private[akka] def toAddressProtocol(actorRef: ActorRef) = { + val address = actorRef.homeAddress.getOrElse(ActorRegistry.remote.address) AddressProtocol.newBuilder - .setHostname(actorRef.homeAddress.getHostName) - .setPort(actorRef.homeAddress.getPort) + .setHostname(address.getHostName) + .setPort(address.getPort) .build + } + private[akka] def toSerializedActorRefProtocol[T <: Actor]( actorRef: ActorRef, format: Format[T], serializeMailBox: Boolean = true): SerializedActorRefProtocol = { @@ -129,7 +131,7 @@ object ActorSerialization { messages.map(m => RemoteActorSerialization.createRemoteMessageProtocolBuilder( Some(actorRef), - Left(actorRef.uuid), //TODO: REVISIT: generate uuid for the request + Left(actorRef.uuid), actorRef.id, actorRef.actorClassName, actorRef.timeout, @@ -201,7 +203,7 @@ object ActorSerialization { supervisor, hotswap, factory, - new InetSocketAddress(protocol.getOriginalAddress.getHostname,protocol.getOriginalAddress.getPort)) + None) //TODO: shouldn't originalAddress be optional? val messages = protocol.getMessagesList.toArray.toList.asInstanceOf[List[RemoteMessageProtocol]] messages.foreach(message => ar ! MessageSerializer.deserialize(message.getMessage)) diff --git a/akka-remote/src/test/scala/remote/AkkaRemoteTest.scala b/akka-remote/src/test/scala/remote/AkkaRemoteTest.scala index b22eaa8b27..2c2ee4ca9a 100644 --- a/akka-remote/src/test/scala/remote/AkkaRemoteTest.scala +++ b/akka-remote/src/test/scala/remote/AkkaRemoteTest.scala @@ -27,8 +27,9 @@ class AkkaRemoteTest extends val remote = ActorRegistry.remote val unit = TimeUnit.SECONDS - val host = remote.hostname - val port = remote.port + + val host = "localhost" + val port = 25520 var optimizeLocal_? = remote.asInstanceOf[NettyRemoteSupport].optimizeLocalScoped_? @@ -41,7 +42,7 @@ class AkkaRemoteTest extends } override def beforeEach { - remote.start() + remote.start(host,port) Thread.sleep(2000) super.beforeEach } diff --git a/akka-remote/src/test/scala/remote/RemoteTypedActorSpec.scala b/akka-remote/src/test/scala/remote/RemoteTypedActorSpec.scala index ce66a3d108..2301d4b253 100644 --- a/akka-remote/src/test/scala/remote/RemoteTypedActorSpec.scala +++ b/akka-remote/src/test/scala/remote/RemoteTypedActorSpec.scala @@ -4,23 +4,11 @@ package akka.actor.remote -import org.scalatest.matchers.ShouldMatchers -import org.scalatest.junit.JUnitRunner -import org.junit.runner.RunWith - import akka.config.Supervision._ import akka.actor._ -import akka.remote.{RemoteServer, RemoteClient} import java.util.concurrent.{LinkedBlockingQueue, TimeUnit, BlockingQueue} -import org.scalatest.{BeforeAndAfterEach, Spec, Assertions, BeforeAndAfterAll} -import akka.config.{Config, TypedActorConfigurator, RemoteAddress} -/* THIS SHOULD BE UNCOMMENTED -object RemoteTypedActorSpec { - val HOSTNAME = "localhost" - val PORT = 9988 - var server: RemoteServer = null -}*/ +import akka.config. {RemoteAddress, Config, TypedActorConfigurator} object RemoteTypedActorLog { val messageLog: BlockingQueue[String] = new LinkedBlockingQueue[String] @@ -31,22 +19,17 @@ object RemoteTypedActorLog { oneWayLog.clear } } -/* THIS SHOULD BE UNCOMMENTED -@RunWith(classOf[JUnitRunner]) -class RemoteTypedActorSpec extends - Spec with - ShouldMatchers with - BeforeAndAfterEach with BeforeAndAfterAll { + +class RemoteTypedActorSpec extends AkkaRemoteTest { import RemoteTypedActorLog._ - import RemoteTypedActorSpec._ - private val conf = new TypedActorConfigurator + private var conf: TypedActorConfigurator = _ - override def beforeAll = { - server = new RemoteServer() - server.start("localhost", 9995) + override def beforeEach { + super.beforeEach Config.config + conf = new TypedActorConfigurator conf.configure( new AllForOneStrategy(List(classOf[Exception]), 3, 5000), List( @@ -55,74 +38,57 @@ class RemoteTypedActorSpec extends classOf[RemoteTypedActorOneImpl], Permanent, 10000, - new RemoteAddress("localhost", 9995)), + RemoteAddress(host,port)), new SuperviseTypedActor( classOf[RemoteTypedActorTwo], classOf[RemoteTypedActorTwoImpl], Permanent, 10000, - new RemoteAddress("localhost", 9995)) + RemoteAddress(host,port)) ).toArray).supervise + } + + override def afterEach { + clearMessageLogs + conf.stop + super.afterEach Thread.sleep(1000) } - override def afterAll = { - conf.stop - try { - server.shutdown - RemoteClient.shutdownAll - Thread.sleep(1000) - } catch { - case e => () - } - ActorRegistry.shutdownAll - } + "Remote Typed Actor " should { - override def afterEach() { - server.typedActors.clear - } - - describe("Remote Typed Actor ") { - - it("should receive one-way message") { - clearMessageLogs + /*"receives one-way message" in { val ta = conf.getInstance(classOf[RemoteTypedActorOne]) - expect("oneway") { - ta.oneWay - oneWayLog.poll(5, TimeUnit.SECONDS) - } + ta.oneWay + oneWayLog.poll(5, TimeUnit.SECONDS) must equal ("oneway") } - it("should respond to request-reply message") { - clearMessageLogs + "responds to request-reply message" in { + val ta = conf.getInstance(classOf[RemoteTypedActorOne]) + ta.requestReply("ping") must equal ("pong") + } */ + + "be restarted on failure" in { val ta = conf.getInstance(classOf[RemoteTypedActorOne]) - expect("pong") { - ta.requestReply("ping") - } - } - - it("should be restarted on failure") { - clearMessageLogs - val ta = conf.getInstance(classOf[RemoteTypedActorOne]) - - intercept[RuntimeException] { + try { ta.requestReply("die") - } - messageLog.poll(5, TimeUnit.SECONDS) should equal ("Expected exception; to test fault-tolerance") + fail("Shouldn't get here") + } catch { case re: RuntimeException if re.getMessage == "Expected exception; to test fault-tolerance" => } + messageLog.poll(5, TimeUnit.SECONDS) must equal ("Expected exception; to test fault-tolerance") } - it("should restart linked friends on failure") { - clearMessageLogs + /* "restarts linked friends on failure" in { val ta1 = conf.getInstance(classOf[RemoteTypedActorOne]) val ta2 = conf.getInstance(classOf[RemoteTypedActorTwo]) - intercept[RuntimeException] { + try { ta1.requestReply("die") - } - messageLog.poll(5, TimeUnit.SECONDS) should equal ("Expected exception; to test fault-tolerance") - messageLog.poll(5, TimeUnit.SECONDS) should equal ("Expected exception; to test fault-tolerance") - } + fail("Shouldn't get here") + } catch { case re: RuntimeException if re.getMessage == "Expected exception; to test fault-tolerance" => } + messageLog.poll(5, TimeUnit.SECONDS) must equal ("Expected exception; to test fault-tolerance") + messageLog.poll(5, TimeUnit.SECONDS) must equal ("Expected exception; to test fault-tolerance") + }*/ } -} */ +} diff --git a/akka-remote/src/test/scala/remote/ServerInitiatedRemoteActorSpec.scala b/akka-remote/src/test/scala/remote/ServerInitiatedRemoteActorSpec.scala index c09e266e62..37b303bead 100644 --- a/akka-remote/src/test/scala/remote/ServerInitiatedRemoteActorSpec.scala +++ b/akka-remote/src/test/scala/remote/ServerInitiatedRemoteActorSpec.scala @@ -1,16 +1,10 @@ package akka.actor.remote import java.util.concurrent.{CountDownLatch, TimeUnit} -import org.scalatest.WordSpec -import org.scalatest.matchers.MustMatchers -import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach} -import org.scalatest.junit.JUnitRunner -import org.junit.runner.RunWith -import akka.util._ import akka.actor.Actor._ import akka.actor.{ActorRegistry, ActorRef, Actor} -import akka.remote. {NettyRemoteSupport, RemoteServer, RemoteClient} +import akka.remote. {NettyRemoteSupport} object ServerInitiatedRemoteActorSpec { case class Send(actor: ActorRef) diff --git a/akka-remote/src/test/scala/remote/ServerInitiatedRemoteSessionActorSpec.scala b/akka-remote/src/test/scala/remote/ServerInitiatedRemoteSessionActorSpec.scala index b0b10e5ada..6cad41e5e7 100644 --- a/akka-remote/src/test/scala/remote/ServerInitiatedRemoteSessionActorSpec.scala +++ b/akka-remote/src/test/scala/remote/ServerInitiatedRemoteSessionActorSpec.scala @@ -4,13 +4,6 @@ package akka.actor.remote -import org.scalatest._ -import org.scalatest.WordSpec -import org.scalatest.matchers.MustMatchers -import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach} -import org.scalatest.junit.JUnitRunner -import org.junit.runner.RunWith - import akka.actor._ import akka.actor.Actor._ import akka.remote.NettyRemoteSupport diff --git a/akka-remote/src/test/scala/remote/ServerInitiatedRemoteTypedActorSpec.scala b/akka-remote/src/test/scala/remote/ServerInitiatedRemoteTypedActorSpec.scala index d504d7c4e3..79de741377 100644 --- a/akka-remote/src/test/scala/remote/ServerInitiatedRemoteTypedActorSpec.scala +++ b/akka-remote/src/test/scala/remote/ServerInitiatedRemoteTypedActorSpec.scala @@ -2,137 +2,99 @@ * Copyright (C) 2009-2010 Scalable Solutions AB */ -/* THIS SHOULD BE UNCOMMENTED package akka.actor.remote -import org.scalatest.Spec -import org.scalatest.matchers.ShouldMatchers -import org.scalatest.BeforeAndAfterAll -import org.scalatest.junit.JUnitRunner -import org.junit.runner.RunWith - import java.util.concurrent.TimeUnit import akka.remote.{RemoteServer, RemoteClient} import akka.actor._ import RemoteTypedActorLog._ -object ServerInitiatedRemoteTypedActorSpec { - val HOSTNAME = "localhost" - val PORT = 9990 - var server: RemoteServer = null +class ServerInitiatedRemoteTypedActorSpec extends AkkaRemoteTest { + + override def beforeEach = { + super.beforeEach + val typedActor = TypedActor.newInstance(classOf[RemoteTypedActorOne], classOf[RemoteTypedActorOneImpl], 1000) + remote.registerTypedActor("typed-actor-service", typedActor) + } + + override def afterEach { + super.afterEach + clearMessageLogs + } + + def createRemoteActorRef = remote.typedActorFor(classOf[RemoteTypedActorOne], "typed-actor-service", 5000L, host, port) + + "Server managed remote typed Actor " should { + + "receive one-way message" in { + val actor = createRemoteActorRef + actor.oneWay + oneWayLog.poll(5, TimeUnit.SECONDS) must equal ("oneway") + } + + "should respond to request-reply message" in { + val actor = createRemoteActorRef + actor.requestReply("ping") must equal ("pong") + } + + "should not recreate registered actors" in { + val actor = createRemoteActorRef + val numberOfActorsInRegistry = ActorRegistry.actors.length + actor.oneWay + oneWayLog.poll(5, TimeUnit.SECONDS) must equal ("oneway") + numberOfActorsInRegistry must be (ActorRegistry.actors.length) + } + + "should support multiple variants to get the actor from client side" in { + var actor = createRemoteActorRef + + actor.oneWay + oneWayLog.poll(5, TimeUnit.SECONDS) must equal ("oneway") + + actor = remote.typedActorFor(classOf[RemoteTypedActorOne], "typed-actor-service", host, port) + + actor.oneWay + oneWayLog.poll(5, TimeUnit.SECONDS) must equal ("oneway") + + actor = remote.typedActorFor(classOf[RemoteTypedActorOne], "typed-actor-service", 5000L, host, port, this.getClass().getClassLoader) + + actor.oneWay + oneWayLog.poll(5, TimeUnit.SECONDS) must equal ("oneway") + } + + "should register and unregister typed actors" in { + val typedActor = TypedActor.newInstance(classOf[RemoteTypedActorOne], classOf[RemoteTypedActorOneImpl], 1000) + remote.registerTypedActor("my-test-service", typedActor) + remote.typedActors.get("my-test-service") must not be (null) + remote.unregisterTypedActor("my-test-service") + remote.typedActors.get("my-test-service") must be (null) + } + + "should register and unregister typed actors by uuid" in { + val typedActor = TypedActor.newInstance(classOf[RemoteTypedActorOne], classOf[RemoteTypedActorOneImpl], 1000) + val init = AspectInitRegistry.initFor(typedActor) + val uuid = "uuid:" + init.actorRef.uuid + + remote.registerTypedActor(uuid, typedActor) + remote.typedActorsByUuid.get(init.actorRef.uuid.toString) must not be (null) + + remote.unregisterTypedActor(uuid) + remote.typedActorsByUuid.get(init.actorRef.uuid.toString) must be (null) + } + + "should find typed actors by uuid" in { + val typedActor = TypedActor.newInstance(classOf[RemoteTypedActorOne], classOf[RemoteTypedActorOneImpl], 1000) + val init = AspectInitRegistry.initFor(typedActor) + val uuid = "uuid:" + init.actorRef.uuid + + remote.registerTypedActor(uuid, typedActor) + remote.typedActorsByUuid.get(init.actorRef.uuid.toString) must not be (null) + + val actor = remote.typedActorFor(classOf[RemoteTypedActorOne], uuid, host, port) + actor.oneWay + oneWayLog.poll(5, TimeUnit.SECONDS) must equal ("oneway") + } + } } -@RunWith(classOf[JUnitRunner]) -class ServerInitiatedRemoteTypedActorSpec extends - Spec with - ShouldMatchers with - BeforeAndAfterAll { - import ServerInitiatedRemoteTypedActorSpec._ - - private val unit = TimeUnit.MILLISECONDS - - - override def beforeAll = { - server = new RemoteServer() - server.start(HOSTNAME, PORT) - - val typedActor = TypedActor.newInstance(classOf[RemoteTypedActorOne], classOf[RemoteTypedActorOneImpl], 1000) - server.registerTypedActor("typed-actor-service", typedActor) - - Thread.sleep(1000) - } - - // make sure the servers shutdown cleanly after the test has finished - override def afterAll = { - try { - server.shutdown - RemoteClient.shutdownAll - Thread.sleep(1000) - } catch { - case e => () - } - } - - describe("Server managed remote typed Actor ") { - - it("should receive one-way message") { - clearMessageLogs - val actor = RemoteClient.typedActorFor(classOf[RemoteTypedActorOne], "typed-actor-service", 5000L, HOSTNAME, PORT) - expect("oneway") { - actor.oneWay - oneWayLog.poll(5, TimeUnit.SECONDS) - } - } - - it("should respond to request-reply message") { - clearMessageLogs - val actor = RemoteClient.typedActorFor(classOf[RemoteTypedActorOne], "typed-actor-service", 5000L, HOSTNAME, PORT) - expect("pong") { - actor.requestReply("ping") - } - } - - it("should not recreate registered actors") { - val actor = RemoteClient.typedActorFor(classOf[RemoteTypedActorOne], "typed-actor-service", 5000L, HOSTNAME, PORT) - val numberOfActorsInRegistry = ActorRegistry.actors.length - expect("oneway") { - actor.oneWay - oneWayLog.poll(5, TimeUnit.SECONDS) - } - assert(numberOfActorsInRegistry === ActorRegistry.actors.length) - } - - it("should support multiple variants to get the actor from client side") { - var actor = RemoteClient.typedActorFor(classOf[RemoteTypedActorOne], "typed-actor-service", 5000L, HOSTNAME, PORT) - expect("oneway") { - actor.oneWay - oneWayLog.poll(5, TimeUnit.SECONDS) - } - actor = RemoteClient.typedActorFor(classOf[RemoteTypedActorOne], "typed-actor-service", HOSTNAME, PORT) - expect("oneway") { - actor.oneWay - oneWayLog.poll(5, TimeUnit.SECONDS) - } - actor = RemoteClient.typedActorFor(classOf[RemoteTypedActorOne], "typed-actor-service", 5000L, HOSTNAME, PORT, this.getClass().getClassLoader) - expect("oneway") { - actor.oneWay - oneWayLog.poll(5, TimeUnit.SECONDS) - } - } - - it("should register and unregister typed actors") { - val typedActor = TypedActor.newInstance(classOf[RemoteTypedActorOne], classOf[RemoteTypedActorOneImpl], 1000) - server.registerTypedActor("my-test-service", typedActor) - assert(server.typedActors.get("my-test-service") ne null, "typed actor registered") - server.unregisterTypedActor("my-test-service") - assert(server.typedActors.get("my-test-service") eq null, "typed actor unregistered") - } - - it("should register and unregister typed actors by uuid") { - val typedActor = TypedActor.newInstance(classOf[RemoteTypedActorOne], classOf[RemoteTypedActorOneImpl], 1000) - val init = AspectInitRegistry.initFor(typedActor) - val uuid = "uuid:" + init.actorRef.uuid - server.registerTypedActor(uuid, typedActor) - assert(server.typedActorsByUuid.get(init.actorRef.uuid.toString) ne null, "typed actor registered") - server.unregisterTypedActor(uuid) - assert(server.typedActorsByUuid.get(init.actorRef.uuid.toString) eq null, "typed actor unregistered") - } - - it("should find typed actors by uuid") { - val typedActor = TypedActor.newInstance(classOf[RemoteTypedActorOne], classOf[RemoteTypedActorOneImpl], 1000) - val init = AspectInitRegistry.initFor(typedActor) - val uuid = "uuid:" + init.actorRef.uuid - server.registerTypedActor(uuid, typedActor) - assert(server.typedActorsByUuid.get(init.actorRef.uuid.toString) ne null, "typed actor registered") - - val actor = RemoteClient.typedActorFor(classOf[RemoteTypedActorOne], uuid, HOSTNAME, PORT) - expect("oneway") { - actor.oneWay - oneWayLog.poll(5, TimeUnit.SECONDS) - } - - } - } -} */ - diff --git a/akka-remote/src/test/scala/remote/ServerInitiatedRemoteTypedSessionActorSpec.scala b/akka-remote/src/test/scala/remote/ServerInitiatedRemoteTypedSessionActorSpec.scala index 0217734745..5bbd0f4a29 100644 --- a/akka-remote/src/test/scala/remote/ServerInitiatedRemoteTypedSessionActorSpec.scala +++ b/akka-remote/src/test/scala/remote/ServerInitiatedRemoteTypedSessionActorSpec.scala @@ -2,108 +2,74 @@ * Copyright (C) 2009-2010 Scalable Solutions AB */ -/* THIS SHOULD BE UNCOMMENTED package akka.actor.remote -import org.scalatest._ -import org.scalatest.matchers.ShouldMatchers -import org.scalatest.BeforeAndAfterAll -import org.scalatest.junit.JUnitRunner -import org.junit.runner.RunWith - -import java.util.concurrent.TimeUnit - -import akka.remote.{RemoteServer, RemoteClient} import akka.actor._ import RemoteTypedActorLog._ -object ServerInitiatedRemoteTypedSessionActorSpec { - val HOSTNAME = "localhost" - val PORT = 9990 - var server: RemoteServer = null -} - -@RunWith(classOf[JUnitRunner]) -class ServerInitiatedRemoteTypedSessionActorSpec extends - FlatSpec with - ShouldMatchers with - BeforeAndAfterEach { - import ServerInitiatedRemoteTypedActorSpec._ - - private val unit = TimeUnit.MILLISECONDS +class ServerInitiatedRemoteTypedSessionActorSpec extends AkkaRemoteTest { override def beforeEach = { - server = new RemoteServer() - server.start(HOSTNAME, PORT) + super.beforeEach - server.registerTypedPerSessionActor("typed-session-actor-service", + remote.registerTypedPerSessionActor("typed-session-actor-service", TypedActor.newInstance(classOf[RemoteTypedSessionActor], classOf[RemoteTypedSessionActorImpl], 1000)) - - Thread.sleep(1000) } // make sure the servers shutdown cleanly after the test has finished override def afterEach = { - try { - server.shutdown - RemoteClient.shutdownAll + super.afterEach + clearMessageLogs + } + + "A remote session Actor" should { + "create a new session actor per connection" in { + + val session1 = remote.typedActorFor(classOf[RemoteTypedSessionActor], "typed-session-actor-service", 5000L, host, port) + + session1.getUser() must equal ("anonymous") + session1.login("session[1]") + session1.getUser() must equal ("session[1]") + + remote.shutdownClientModule + + val session2 = remote.typedActorFor(classOf[RemoteTypedSessionActor], "typed-session-actor-service", 5000L, host, port) + + session2.getUser() must equal ("anonymous") + + } + + "stop the actor when the client disconnects" in { + val session1 = remote.typedActorFor(classOf[RemoteTypedSessionActor], "typed-session-actor-service", 5000L, host, port) + + session1.getUser() must equal ("anonymous") + + RemoteTypedSessionActorImpl.getInstances() must have size (1) + remote.shutdownClientModule Thread.sleep(1000) - } catch { - case e => () + RemoteTypedSessionActorImpl.getInstances() must have size (0) + + } + + "stop the actor when there is an error" in { + val session1 = remote.typedActorFor(classOf[RemoteTypedSessionActor], "typed-session-actor-service", 5000L, host, port) + + session1.doSomethingFunny() + + remote.shutdownClientModule + Thread.sleep(1000) + RemoteTypedSessionActorImpl.getInstances() must have size (0) + } + + + "be able to unregister" in { + remote.registerTypedPerSessionActor("my-service-1",TypedActor.newInstance(classOf[RemoteTypedSessionActor], classOf[RemoteTypedSessionActorImpl], 1000)) + + remote.typedActorsFactories.get("my-service-1") must not be (null) + remote.unregisterTypedPerSessionActor("my-service-1") + remote.typedActorsFactories.get("my-service-1") must be (null) } } - - "A remote session Actor" should "create a new session actor per connection" in { - clearMessageLogs - - val session1 = RemoteClient.typedActorFor(classOf[RemoteTypedSessionActor], "typed-session-actor-service", 5000L, HOSTNAME, PORT) - - session1.getUser() should equal ("anonymous") - session1.login("session[1]") - session1.getUser() should equal ("session[1]") - - RemoteClient.shutdownAll - - val session2 = RemoteClient.typedActorFor(classOf[RemoteTypedSessionActor], "typed-session-actor-service", 5000L, HOSTNAME, PORT) - - session2.getUser() should equal ("anonymous") - - } - - it should "stop the actor when the client disconnects" in { - - val session1 = RemoteClient.typedActorFor(classOf[RemoteTypedSessionActor], "typed-session-actor-service", 5000L, HOSTNAME, PORT) - - session1.getUser() should equal ("anonymous") - - RemoteTypedSessionActorImpl.getInstances() should have size (1) - RemoteClient.shutdownAll - Thread.sleep(1000) - RemoteTypedSessionActorImpl.getInstances() should have size (0) - - } - - it should "stop the actor when there is an error" in { - - val session1 = RemoteClient.typedActorFor(classOf[RemoteTypedSessionActor], "typed-session-actor-service", 5000L, HOSTNAME, PORT) - - session1.doSomethingFunny() - - RemoteClient.shutdownAll - Thread.sleep(1000) - RemoteTypedSessionActorImpl.getInstances() should have size (0) - - } - - - it should "be able to unregister" in { - server.registerTypedPerSessionActor("my-service-1",TypedActor.newInstance(classOf[RemoteTypedSessionActor], classOf[RemoteTypedSessionActorImpl], 1000)) - - server.typedActorsFactories.get("my-service-1") should not be (null) - server.unregisterTypedPerSessionActor("my-service-1") - server.typedActorsFactories.get("my-service-1") should be (null) - } - -}*/ +} diff --git a/akka-remote/src/test/scala/serialization/TypedActorSerializationSpec.scala b/akka-remote/src/test/scala/serialization/TypedActorSerializationSpec.scala index 9333736821..861a77d7e0 100644 --- a/akka-remote/src/test/scala/serialization/TypedActorSerializationSpec.scala +++ b/akka-remote/src/test/scala/serialization/TypedActorSerializationSpec.scala @@ -2,49 +2,30 @@ * Copyright (C) 2009-2010 Scalable Solutions AB */ -/* THIS SHOULD BE UNCOMMENTED -package akka.actor.serialization -import org.scalatest.Spec -import org.scalatest.matchers.ShouldMatchers -import org.scalatest.BeforeAndAfterAll -import org.scalatest.junit.JUnitRunner -import org.junit.runner.RunWith +package akka.actor.serialization import akka.serialization._ import akka.actor._ import TypedActorSerialization._ import Actor._ -import akka.remote.{RemoteClient, RemoteServer} import akka.actor.remote.ServerInitiatedRemoteActorSpec.RemoteActorSpecActorUnidirectional +import akka.actor.remote.AkkaRemoteTest -@RunWith(classOf[JUnitRunner]) -class TypedActorSerializationSpec extends - Spec with - ShouldMatchers with - BeforeAndAfterAll { - - var server1: RemoteServer = null +class TypedActorSerializationSpec extends AkkaRemoteTest { var typedActor: MyTypedActor = null override def beforeAll = { - server1 = new RemoteServer().start("localhost", 9991) + super.beforeAll typedActor = TypedActor.newInstance(classOf[MyTypedActor], classOf[MyTypedActorImpl], 1000) - server1.registerTypedActor("typed-actor-service", typedActor) - Thread.sleep(1000) + remote.registerTypedActor("typed-actor-service", typedActor) } // make sure the servers shutdown cleanly after the test has finished override def afterAll = { - try { - TypedActor.stop(typedActor) - server1.shutdown - RemoteClient.shutdownAll - Thread.sleep(1000) - } catch { - case e => () - } + TypedActor.stop(typedActor) + super.afterAll } object MyTypedStatelessActorFormat extends StatelessActorFormat[MyStatelessTypedActorImpl] @@ -71,48 +52,48 @@ class TypedActorSerializationSpec extends } - describe("Serializable typed actor") { + "Serializable typed actor" should { - it("should be able to serialize and de-serialize a stateless typed actor") { + "should be able to serialize and de-serialize a stateless typed actor" in { val typedActor1 = TypedActor.newInstance(classOf[MyTypedActor], classOf[MyStatelessTypedActorImpl], 1000) - typedActor1.requestReply("hello") should equal("world") - typedActor1.requestReply("hello") should equal("world") + typedActor1.requestReply("hello") must equal("world") + typedActor1.requestReply("hello") must equal("world") val bytes = toBinaryJ(typedActor1, MyTypedStatelessActorFormat) val typedActor2: MyTypedActor = fromBinaryJ(bytes, MyTypedStatelessActorFormat) - typedActor2.requestReply("hello") should equal("world") + typedActor2.requestReply("hello") must equal("world") } - it("should be able to serialize and de-serialize a stateful typed actor") { + "should be able to serialize and de-serialize a stateful typed actor" in { val typedActor1 = TypedActor.newInstance(classOf[MyTypedActor], classOf[MyTypedActorImpl], 1000) - typedActor1.requestReply("hello") should equal("world 1") - typedActor1.requestReply("scala") should equal("hello scala 2") + typedActor1.requestReply("hello") must equal("world 1") + typedActor1.requestReply("scala") must equal("hello scala 2") val f = new MyTypedActorFormat val bytes = toBinaryJ(typedActor1, f) val typedActor2: MyTypedActor = fromBinaryJ(bytes, f) - typedActor2.requestReply("hello") should equal("world 3") + typedActor2.requestReply("hello") must equal("world 3") } - it("should be able to serialize and de-serialize a stateful typed actor with compound state") { + "should be able to serialize and de-serialize a stateful typed actor with compound state" in { val typedActor1 = TypedActor.newInstance(classOf[MyTypedActor], classOf[MyTypedActorWithDualCounter], 1000) - typedActor1.requestReply("hello") should equal("world 1 1") - typedActor1.requestReply("hello") should equal("world 2 2") + typedActor1.requestReply("hello") must equal("world 1 1") + typedActor1.requestReply("hello") must equal("world 2 2") val f = new MyTypedActorWithDualCounterFormat val bytes = toBinaryJ(typedActor1, f) val typedActor2: MyTypedActor = fromBinaryJ(bytes, f) - typedActor2.requestReply("hello") should equal("world 3 3") + typedActor2.requestReply("hello") must equal("world 3 3") } - it("should be able to serialize a local yped actor ref to a remote typed actor ref proxy") { + "should be able to serialize a local yped actor ref to a remote typed actor ref proxy" in { val typedActor1 = TypedActor.newInstance(classOf[MyTypedActor], classOf[MyStatelessTypedActorImpl], 1000) - typedActor1.requestReply("hello") should equal("world") - typedActor1.requestReply("hello") should equal("world") + typedActor1.requestReply("hello") must equal("world") + typedActor1.requestReply("hello") must equal("world") val bytes = RemoteTypedActorSerialization.toBinary(typedActor1) val typedActor2: MyTypedActor = RemoteTypedActorSerialization.fromBinaryToRemoteTypedActorRef(bytes) - typedActor1.requestReply("hello") should equal("world") + typedActor1.requestReply("hello") must equal("world") } } } @@ -165,6 +146,4 @@ class MyStatelessTypedActorImpl extends TypedActor with MyTypedActor { override def requestReply(message: String) : String = { if (message == "hello") "world" else ("hello " + message) } -} - -*/ \ No newline at end of file +} \ No newline at end of file diff --git a/akka-typed-actor/src/main/scala/akka/actor/TypedActor.scala b/akka-typed-actor/src/main/scala/akka/actor/TypedActor.scala index 5adabb3e54..34288f1637 100644 --- a/akka-typed-actor/src/main/scala/akka/actor/TypedActor.scala +++ b/akka-typed-actor/src/main/scala/akka/actor/TypedActor.scala @@ -478,21 +478,29 @@ object TypedActor extends Logging { * @param factory factory method that constructs the typed actor * @paramm config configuration object fo the typed actor */ - def newInstance[T](intfClass: Class[T], factory: => AnyRef, config: TypedActorConfiguration): T = { - val actorRef = actorOf(newTypedActor(factory)) - newInstance(intfClass, actorRef, config) + def newInstance[T](intfClass: Class[T], factory: => AnyRef, config: TypedActorConfiguration): T = + newInstance(intfClass, createActorRef(newTypedActor(factory),config), config) + + /** + * Creates an ActorRef, can be local only or client-managed-remote + */ + private[akka] def createActorRef(typedActor: => TypedActor, config: TypedActorConfiguration): ActorRef = { + config match { + case null => actorOf(typedActor) + case c: TypedActorConfiguration if (c._host.isDefined) => + actorOf(typedActor, c._host.get.getHostName, c._host.get.getPort) + case _ => actorOf(typedActor) + } } /** - * Factory method for typed actor. + * Factory method for typed actor. * @param intfClass interface the typed actor implements * @param targetClass implementation class of the typed actor * @paramm config configuration object fo the typed actor */ - def newInstance[T](intfClass: Class[T], targetClass: Class[_], config: TypedActorConfiguration): T = { - val actorRef = actorOf(newTypedActor(targetClass)) - newInstance(intfClass, actorRef, config) - } + def newInstance[T](intfClass: Class[T], targetClass: Class[_], config: TypedActorConfiguration): T = + newInstance(intfClass, createActorRef(newTypedActor(targetClass),config), config) private[akka] def newInstance[T](intfClass: Class[T], actorRef: ActorRef): T = { if (!actorRef.actorInstance.get.isInstanceOf[TypedActor]) throw new IllegalArgumentException("ActorRef is not a ref to a typed actor") @@ -512,9 +520,9 @@ object TypedActor extends Logging { typedActor.initialize(proxy) if (config._messageDispatcher.isDefined) actorRef.dispatcher = config._messageDispatcher.get if (config._threadBasedDispatcher.isDefined) actorRef.dispatcher = Dispatchers.newThreadBasedDispatcher(actorRef) - if (config._host.isDefined) log.slf4j.warn("Client-managed typed actors are not supported!") //TODO: REVISIT: FIXME actorRef.timeout = config.timeout - AspectInitRegistry.register(proxy, AspectInit(intfClass, typedActor, actorRef, None, actorRef.timeout)) //TODO: REVISIT fix Client managed typed actor + log.slf4j.warn("config._host for {} is {} but homeAddress is {}",intfClass, config._host) + AspectInitRegistry.register(proxy, AspectInit(intfClass, typedActor, actorRef, config._host, actorRef.timeout)) //TODO: REVISIT fix Client managed typed actor actorRef.start proxy.asInstanceOf[T] } @@ -582,7 +590,7 @@ object TypedActor extends Logging { val jProxy = JProxy.newProxyInstance(intfClass.getClassLoader(), interfaces, handler) val awProxy = Proxy.newInstance(interfaces, Array(jProxy, jProxy), true, false) - AspectInitRegistry.register(awProxy, AspectInit(intfClass, null, actorRef, None, 5000L)) + AspectInitRegistry.register(awProxy, AspectInit(intfClass, null, actorRef, None, 5000L)) //TODO: does .homeAddress work here or do we need to check if it's local and then provide None? awProxy.asInstanceOf[T] } diff --git a/akka-typed-actor/src/main/scala/akka/config/TypedActorGuiceConfigurator.scala b/akka-typed-actor/src/main/scala/akka/config/TypedActorGuiceConfigurator.scala index d2a9ebca26..93639460b0 100644 --- a/akka-typed-actor/src/main/scala/akka/config/TypedActorGuiceConfigurator.scala +++ b/akka-typed-actor/src/main/scala/akka/config/TypedActorGuiceConfigurator.scala @@ -108,26 +108,24 @@ private[akka] class TypedActorGuiceConfigurator extends TypedActorConfiguratorBa val implementationClass = component.target val timeout = component.timeout - val actorRef = Actor.actorOf(TypedActor.newTypedActor(implementationClass)) + val (remoteAddress,actorRef) = + component.remoteAddress match { + case Some(a) => + (Some(new InetSocketAddress(a.hostname, a.port)), + Actor.actorOf(TypedActor.newTypedActor(implementationClass), a.hostname, a.port)) + case None => + (None, Actor.actorOf(TypedActor.newTypedActor(implementationClass))) + } + actorRef.timeout = timeout if (component.dispatcher.isDefined) actorRef.dispatcher = component.dispatcher.get val typedActor = actorRef.actorInstance.get.asInstanceOf[TypedActor] val proxy = Proxy.newInstance(Array(interfaceClass), Array(typedActor), true, false) - /* - val remoteAddress = - if (component.remoteAddress.isDefined) - Some(new InetSocketAddress(component.remoteAddress.get.hostname, component.remoteAddress.get.port)) - else None - - remoteAddress.foreach { address => - actorRef.makeRemote(remoteAddress.get) - }*/ - AspectInitRegistry.register( proxy, - AspectInit(interfaceClass, typedActor, actorRef, None, timeout)) //TODO: REVISIT: FIX CLIENT MANAGED ACTORS + AspectInit(interfaceClass, typedActor, actorRef, remoteAddress, timeout)) typedActor.initialize(proxy) actorRef.start