diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala index 58ceaf44b3..d604f3f441 100644 --- a/akka-actor/src/main/scala/akka/actor/Actor.scala +++ b/akka-actor/src/main/scala/akka/actor/Actor.scala @@ -126,7 +126,7 @@ object Actor extends Logging { private[actor] val actorRefInCreation = new scala.util.DynamicVariable[Option[ActorRef]](None) - /** + /** * Creates an ActorRef out of the Actor with type T. *
    *   import Actor._
@@ -140,7 +140,8 @@ object Actor extends Logging {
    *   val actor = actorOf[MyActor].start
    * 
*/ - def actorOf[T <: Actor : Manifest]: ActorRef = actorOf(manifest[T].erasure.asInstanceOf[Class[_ <: Actor]]) + def actorOf[T <: Actor : Manifest]: ActorRef = + ActorRegistry.actorOf[T] /** * Creates a Client-managed ActorRef out of the Actor of the specified Class. @@ -158,7 +159,7 @@ object Actor extends Logging { * */ def actorOf[T <: Actor : Manifest](host: String, port: Int): ActorRef = - actorOf(manifest[T].erasure.asInstanceOf[Class[_ <: Actor]], host, port) + ActorRegistry.actorOf[T](host,port) /** * Creates an ActorRef out of the Actor of the specified Class. @@ -174,15 +175,8 @@ object Actor extends Logging { * val actor = actorOf(classOf[MyActor]).start * */ - def actorOf(clazz: Class[_ <: Actor]): ActorRef = new LocalActorRef(() => { - import ReflectiveAccess.{ createInstance, noParams, noArgs } - createInstance[Actor](clazz.asInstanceOf[Class[_]], noParams, noArgs).getOrElse( - throw new ActorInitializationException( - "Could not instantiate Actor" + - "\nMake sure Actor is NOT defined inside a class/trait," + - "\nif so put it outside the class/trait, f.e. in a companion object," + - "\nOR try to change: 'actorOf[MyActor]' to 'actorOf(new MyActor)'.")) - }) + def actorOf(clazz: Class[_ <: Actor]): ActorRef = + ActorRegistry.actorOf(clazz) /** * Creates a Client-managed ActorRef out of the Actor of the specified Class. @@ -199,23 +193,8 @@ object Actor extends Logging { * val actor = actorOf(classOf[MyActor],"www.akka.io",2552).start * */ - def actorOf(clazz: Class[_ <: Actor], host: String, port: Int, timeout: Long = TIMEOUT): ActorRef = { - import ReflectiveAccess._ - import ReflectiveAccess.RemoteServerModule.{HOSTNAME,PORT} - ensureRemotingEnabled - - (host,port) match { - case null => throw new IllegalArgumentException("No location specified") - case (HOSTNAME, PORT) => actorOf(clazz) //Local - case _ => new RemoteActorRef(clazz.getName, - clazz.getName, - host, - port, - timeout, - true, //Client managed - None) - } - } + def actorOf(clazz: Class[_ <: Actor], host: String, port: Int, timeout: Long = Actor.TIMEOUT): ActorRef = + ActorRegistry.actorOf(clazz, host, port, timeout) /** * Creates an ActorRef out of the Actor. Allows you to pass in a factory function @@ -235,7 +214,7 @@ object Actor extends Logging { * val actor = actorOf(new MyActor).start * */ - def actorOf(factory: => Actor): ActorRef = new LocalActorRef(() => factory) + def actorOf(factory: => Actor): ActorRef = ActorRegistry.actorOf(factory) /** * Use to spawn out a block of code in an event-driven actor. Will shut actor down when @@ -252,15 +231,9 @@ object Actor extends Logging { * } * */ - def spawn(body: => Unit)(implicit dispatcher: MessageDispatcher = Dispatchers.defaultGlobalDispatcher): Unit = { - case object Spawn - actorOf(new Actor() { - self.dispatcher = dispatcher - def receive = { - case Spawn => try { body } finally { self.stop } - } - }).start ! Spawn - } + def spawn(body: => Unit)(implicit dispatcher: MessageDispatcher = Dispatchers.defaultGlobalDispatcher): Unit = + ActorRegistry.spawn(body)(dispatcher) + /** * Implicitly converts the given Option[Any] to a AnyOptionAsTypedOption which offers the method as[T] diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index d938366b62..c97edba37e 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -80,6 +80,8 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] { scal protected[akka] var _futureTimeout: Option[ScheduledFuture[AnyRef]] = None protected[akka] val guard = new ReentrantGuard + private[akka] def registry: ActorRegistryInstance + /** * User overridable callback/setting. *

@@ -541,6 +543,7 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] { scal * @author Jonas Bonér */ class LocalActorRef private[akka] ( + private[akka] val registry: ActorRegistryInstance, private[this] val actorFactory: () => Actor) extends ActorRef with ScalaActorRef { @@ -563,7 +566,9 @@ class LocalActorRef private[akka] ( if (isRunning) initializeActorInstance // used only for deserialization - private[akka] def this(__uuid: Uuid, + private[akka] def this( + __registry: ActorRegistryInstance, + __uuid: Uuid, __id: String, __hostname: String, __port: Int, @@ -573,7 +578,7 @@ class LocalActorRef private[akka] ( __supervisor: Option[ActorRef], __hotswap: Stack[PartialFunction[Any, Unit]], __factory: () => Actor) = { - this(__factory) + this(__registry, __factory) _uuid = __uuid id = __id timeout = __timeout @@ -583,7 +588,7 @@ class LocalActorRef private[akka] ( hotswap = __hotswap setActorSelfFields(actor,this) start - ActorRegistry.register(this) + __registry.register(this) } // ========= PUBLIC FUNCTIONS ========= @@ -644,7 +649,7 @@ class LocalActorRef private[akka] ( dispatcher.detach(this) _status = ActorRefInternals.SHUTDOWN actor.postStop - ActorRegistry.unregister(this) + registry.unregister(this) setActorSelfFields(actorInstance.get,null) } //else if (isBeingRestarted) throw new ActorKilledException("Actor [" + toString + "] is being restarted.") } @@ -942,8 +947,8 @@ class LocalActorRef private[akka] ( } //TODO: REVISIT: REMOVE - /* - protected[akka] def registerSupervisorAsRemoteActor: Option[Uuid] = guard.withGuard { + + /*protected[akka] def registerSupervisorAsRemoteActor: Option[Uuid] = guard.withGuard { ensureRemotingEnabled if (_supervisor.isDefined) { remoteAddress.foreach(address => RemoteClientModule.registerSupervisorForActor(address, this)) @@ -1053,34 +1058,8 @@ class LocalActorRef private[akka] ( private def initializeActorInstance = { actor.preStart // run actor preStart Actor.log.slf4j.trace("[{}] has started", toString) - ActorRegistry.register(this) + registry.register(this) } - - /* - private def serializeMessage(message: AnyRef): AnyRef = if (Actor.SERIALIZE_MESSAGES) { - if (!message.isInstanceOf[String] && - !message.isInstanceOf[Byte] && - !message.isInstanceOf[Int] && - !message.isInstanceOf[Long] && - !message.isInstanceOf[Float] && - !message.isInstanceOf[Double] && - !message.isInstanceOf[Boolean] && - !message.isInstanceOf[Char] && - !message.isInstanceOf[Tuple2[_, _]] && - !message.isInstanceOf[Tuple3[_, _, _]] && - !message.isInstanceOf[Tuple4[_, _, _, _]] && - !message.isInstanceOf[Tuple5[_, _, _, _, _]] && - !message.isInstanceOf[Tuple6[_, _, _, _, _, _]] && - !message.isInstanceOf[Tuple7[_, _, _, _, _, _, _]] && - !message.isInstanceOf[Tuple8[_, _, _, _, _, _, _, _]] && - !message.getClass.isArray && - !message.isInstanceOf[List[_]] && - !message.isInstanceOf[scala.collection.immutable.Map[_, _]] && - !message.isInstanceOf[scala.collection.immutable.Set[_]]) { - Serializer.Java.deepClone(message) - } else message - } else message - */ } /** @@ -1099,12 +1078,13 @@ object RemoteActorSystemMessage { * @author Jonas Bonér */ private[akka] case class RemoteActorRef private[akka] ( + registry: ActorRegistryInstance, classOrServiceName: String, val actorClassName: String, val hostname: String, val port: Int, _timeout: Long, - clientManaged: Boolean, + clientManaged: Boolean, //TODO: REVISIT: ENCODE CLIENT_MANAGED INTO REMOTE PROTOCOL loader: Option[ClassLoader], val actorType: ActorType = ActorType.ScalaActor) extends ActorRef with ScalaActorRef { @@ -1119,16 +1099,14 @@ private[akka] case class RemoteActorRef private[akka] ( start def postMessageToMailbox(message: Any, senderOption: Option[ActorRef]): Unit = - RemoteClientModule.send[Any]( - message, senderOption, None, homeAddress, timeout, true, this, None, actorType) + registry.remote.send[Any](message, senderOption, None, homeAddress, timeout, true, this, None, actorType) def postMessageToMailboxAndCreateFutureResultWithTimeout[T]( message: Any, timeout: Long, senderOption: Option[ActorRef], senderFuture: Option[CompletableFuture[T]]): CompletableFuture[T] = { - val future = RemoteClientModule.send[T]( - message, senderOption, senderFuture, homeAddress, timeout, false, this, None, actorType) + val future = registry.remote.send[T](message, senderOption, senderFuture, homeAddress, timeout, false, this, None, actorType) if (future.isDefined) future.get else throw new IllegalActorStateException("Expected a future from remote call to actor " + toString) } @@ -1136,7 +1114,7 @@ private[akka] case class RemoteActorRef private[akka] ( def start: ActorRef = synchronized { _status = ActorRefInternals.RUNNING if (clientManaged) { - RemoteClientModule.register(homeAddress, uuid) + registry.remote.registerClientManagedActor(homeAddress.getHostName,homeAddress.getPort, uuid) } this } @@ -1146,8 +1124,8 @@ private[akka] case class RemoteActorRef private[akka] ( _status = ActorRefInternals.SHUTDOWN postMessageToMailbox(RemoteActorSystemMessage.Stop, None) if (clientManaged) { - RemoteClientModule.unregister(homeAddress, uuid) - ActorRegistry.remote.unregister(this) //TODO: Why does this need to be deregistered from the server? + registry.remote.unregisterClientManagedActor(homeAddress.getHostName,homeAddress.getPort, uuid) + registry.remote.unregister(this) //TODO: REVISIT: Why does this need to be deregistered from the server? } } } diff --git a/akka-actor/src/main/scala/akka/actor/ActorRegistry.scala b/akka-actor/src/main/scala/akka/actor/ActorRegistry.scala index d80e5273da..f966928965 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRegistry.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRegistry.scala @@ -13,8 +13,9 @@ import java.util.{Set => JSet} import annotation.tailrec import akka.util.ReflectiveAccess._ import java.net.InetSocketAddress -import akka.util. {ReadWriteGuard, Address, ListenerManagement} -import akka.remoteinterface.RemoteServerModule +import akka.util. {ReflectiveAccess, ReadWriteGuard, Address, ListenerManagement} +import akka.dispatch. {MessageDispatcher, Dispatchers} +import akka.remoteinterface. {RemoteSupport, RemoteServerModule} /** * Base trait for ActorRegistry events, allows listen to when an actor is added and removed from the ActorRegistry. @@ -37,7 +38,9 @@ case class ActorUnregistered(actor: ActorRef) extends ActorRegistryEvent * * @author Jonas Bonér */ -object ActorRegistry extends ListenerManagement { +object ActorRegistry extends ActorRegistryInstance(ReflectiveAccess.Remote.defaultRemoteSupport) + +class ActorRegistryInstance(remoteBootstrap: Option[(ActorRegistryInstance) => RemoteSupport]) extends ListenerManagement { private val actorsByUUID = new ConcurrentHashMap[Uuid, ActorRef] private val actorsById = new Index[String,ActorRef] private val remoteActorSets = Map[Address, RemoteActorSet]() @@ -227,11 +230,127 @@ object ActorRegistry extends ListenerManagement { /** * Handy access to the RemoteServer module */ - lazy val remote: RemoteServerModule = getObjectFor("akka.remote.RemoteNode$") match { - case Some(module) => module - case None => - log.slf4j.error("Wanted remote module but didn't exist on classpath") - null + lazy val remote: RemoteSupport = remoteBootstrap.map(_(this)).getOrElse(throw new UnsupportedOperationException("You need to have akka-remote on classpath")) + + /** + * Creates an ActorRef out of the Actor with type T. + *

+   *   import Actor._
+   *   val actor = actorOf[MyActor]
+   *   actor.start
+   *   actor ! message
+   *   actor.stop
+   * 
+ * You can create and start the actor in one statement like this: + *
+   *   val actor = actorOf[MyActor].start
+   * 
+ */ + def actorOf[T <: Actor : Manifest]: ActorRef = actorOf(manifest[T].erasure.asInstanceOf[Class[_ <: Actor]]) + + /** + * Creates a Client-managed ActorRef out of the Actor of the specified Class. + * If the supplied host and port is identical of the configured local node, it will be a local actor + *
+   *   import Actor._
+   *   val actor = actorOf[MyActor]("www.akka.io",2552)
+   *   actor.start
+   *   actor ! message
+   *   actor.stop
+   * 
+ * You can create and start the actor in one statement like this: + *
+   *   val actor = actorOf[MyActor]("www.akka.io",2552).start
+   * 
+ */ + def actorOf[T <: Actor : Manifest](host: String, port: Int): ActorRef = + actorOf(manifest[T].erasure.asInstanceOf[Class[_ <: Actor]], host, port) + + /** + * Creates an ActorRef out of the Actor of the specified Class. + *
+   *   import Actor._
+   *   val actor = actorOf(classOf[MyActor])
+   *   actor.start
+   *   actor ! message
+   *   actor.stop
+   * 
+ * You can create and start the actor in one statement like this: + *
+   *   val actor = actorOf(classOf[MyActor]).start
+   * 
+ */ + def actorOf(clazz: Class[_ <: Actor]): ActorRef = new LocalActorRef(this, () => { + import ReflectiveAccess.{ createInstance, noParams, noArgs } + createInstance[Actor](clazz.asInstanceOf[Class[_]], noParams, noArgs).getOrElse( + throw new ActorInitializationException( + "Could not instantiate Actor" + + "\nMake sure Actor is NOT defined inside a class/trait," + + "\nif so put it outside the class/trait, f.e. in a companion object," + + "\nOR try to change: 'actorOf[MyActor]' to 'actorOf(new MyActor)'.")) + }) + + /** + * Creates a Client-managed ActorRef out of the Actor of the specified Class. + * If the supplied host and port is identical of the configured local node, it will be a local actor + *
+   *   import Actor._
+   *   val actor = actorOf(classOf[MyActor],"www.akka.io",2552)
+   *   actor.start
+   *   actor ! message
+   *   actor.stop
+   * 
+ * You can create and start the actor in one statement like this: + *
+   *   val actor = actorOf(classOf[MyActor],"www.akka.io",2552).start
+   * 
+ */ + def actorOf(clazz: Class[_ <: Actor], host: String, port: Int, timeout: Long = Actor.TIMEOUT): ActorRef = + remote.clientManagedActorOf(clazz, host, port, timeout) + + /** + * Creates an ActorRef out of the Actor. Allows you to pass in a factory function + * that creates the Actor. Please note that this function can be invoked multiple + * times if for example the Actor is supervised and needs to be restarted. + *

+ * This function should NOT be used for remote actors. + *

+   *   import Actor._
+   *   val actor = actorOf(new MyActor)
+   *   actor.start
+   *   actor ! message
+   *   actor.stop
+   * 
+ * You can create and start the actor in one statement like this: + *
+   *   val actor = actorOf(new MyActor).start
+   * 
+ */ + def actorOf(factory: => Actor): ActorRef = new LocalActorRef(this,() => factory) + + /** + * Use to spawn out a block of code in an event-driven actor. Will shut actor down when + * the block has been executed. + *

+ * NOTE: If used from within an Actor then has to be qualified with 'ActorRegistry.spawn' since + * there is a method 'spawn[ActorType]' in the Actor trait already. + * Example: + *

+   * import ActorRegistry.{spawn}
+   *
+   * spawn  {
+   *   ... // do stuff
+   * }
+   * 
+ */ + def spawn(body: => Unit)(implicit dispatcher: MessageDispatcher = Dispatchers.defaultGlobalDispatcher): Unit = { + case object Spawn + actorOf(new Actor() { + self.dispatcher = dispatcher + def receive = { + case Spawn => try { body } finally { self.stop } + } + }).start ! Spawn } @@ -303,12 +422,12 @@ object ActorRegistry extends ListenerManagement { private[akka] def typedActorsFactories(address: Address) = actorsFor(address).typedActorsFactories private[akka] class RemoteActorSet { - private[ActorRegistry] val actors = new ConcurrentHashMap[String, ActorRef] - private[ActorRegistry] val actorsByUuid = new ConcurrentHashMap[String, ActorRef] - private[ActorRegistry] val actorsFactories = new ConcurrentHashMap[String, () => ActorRef] - private[ActorRegistry] val typedActors = new ConcurrentHashMap[String, AnyRef] - private[ActorRegistry] val typedActorsByUuid = new ConcurrentHashMap[String, AnyRef] - private[ActorRegistry] val typedActorsFactories = new ConcurrentHashMap[String, () => AnyRef] + private[ActorRegistryInstance] val actors = new ConcurrentHashMap[String, ActorRef] + private[ActorRegistryInstance] val actorsByUuid = new ConcurrentHashMap[String, ActorRef] + private[ActorRegistryInstance] val actorsFactories = new ConcurrentHashMap[String, () => ActorRef] + private[ActorRegistryInstance] val typedActors = new ConcurrentHashMap[String, AnyRef] + private[ActorRegistryInstance] val typedActorsByUuid = new ConcurrentHashMap[String, AnyRef] + private[ActorRegistryInstance] val typedActorsFactories = new ConcurrentHashMap[String, () => AnyRef] } } diff --git a/akka-actor/src/main/scala/akka/actor/Supervisor.scala b/akka-actor/src/main/scala/akka/actor/Supervisor.scala index daf7a962de..288bdfda24 100644 --- a/akka-actor/src/main/scala/akka/actor/Supervisor.scala +++ b/akka-actor/src/main/scala/akka/actor/Supervisor.scala @@ -142,7 +142,7 @@ sealed class Supervisor(handler: FaultHandlingStrategy) { actorRef.lifeCycle = lifeCycle supervisor.link(actorRef) if (registerAsRemoteService) - ActorRegistry.remote.register(actorRef) + ActorRegistry.remote.register(actorRef) //TODO: REVISIT: Is this the most sensible approach? other way of obtaining ActorRegistry? case supervisorConfig @ SupervisorConfig(_, _) => // recursive supervisor configuration val childSupervisor = Supervisor(supervisorConfig) supervisor.link(childSupervisor.supervisor) diff --git a/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala b/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala index f0f2f259d6..d12847e149 100644 --- a/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala +++ b/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala @@ -132,7 +132,7 @@ trait MessageDispatcher extends MailboxFactory with Logging { val i = uuids.iterator while(i.hasNext()) { val uuid = i.next() - ActorRegistry.actorFor(uuid) match { + ActorRegistry.actorFor(uuid) match { //TODO: REVISIT: How to keep track of which registry? case Some(actor) => actor.stop case None => log.slf4j.error("stopAllLinkedActors couldn't find linked actor: " + uuid) diff --git a/akka-actor/src/main/scala/akka/remoteinterface/RemoteInterface.scala b/akka-actor/src/main/scala/akka/remoteinterface/RemoteInterface.scala index 4f6cd11a67..cf246e4ae4 100644 --- a/akka-actor/src/main/scala/akka/remoteinterface/RemoteInterface.scala +++ b/akka-actor/src/main/scala/akka/remoteinterface/RemoteInterface.scala @@ -5,13 +5,33 @@ package akka.remoteinterface import akka.japi.Creator -import akka.actor.ActorRef -import akka.util.{ReentrantGuard, Logging, ListenerManagement} +import java.net.InetSocketAddress +import akka.actor._ +import akka.util._ +import akka.dispatch.CompletableFuture +import akka.actor. {ActorRegistryInstance, ActorType, RemoteActorRef, ActorRef} +import akka.config.Config.{config, TIME_UNIT} + +trait RemoteModule extends Logging { + def registry: ActorRegistryInstance + def optimizeLocalScoped_?(): Boolean //Apply optimizations for remote operations in local scope + protected[akka] def notifyListeners(message: => Any): Unit +} + + +abstract class RemoteSupport extends ListenerManagement with RemoteServerModule with RemoteClientModule { + def shutdown { + this.shutdownServerModule + this.shutdownClientModule + } + protected override def manageLifeCycleOfListeners = false + protected[akka] override def notifyListeners(message: => Any): Unit = super.notifyListeners(message) +} /** * This is the interface for the RemoteServer functionality, it's used in ActorRegistry.remote */ -trait RemoteServerModule extends ListenerManagement with Logging { +trait RemoteServerModule extends RemoteModule { protected val guard = new ReentrantGuard /** @@ -37,12 +57,12 @@ trait RemoteServerModule extends ListenerManagement with Logging { /** * Starts the server up */ - def start(host: String, port: Int, loader: Option[ClassLoader] = None): RemoteServerModule //TODO possibly hidden + def start(host: String, port: Int, loader: Option[ClassLoader] = None): RemoteServerModule /** * Shuts the server down */ - def shutdown: Unit //TODO possibly hidden + def shutdownServerModule: Unit /** * Register typed actor by interface name. @@ -146,4 +166,72 @@ trait RemoteServerModule extends ListenerManagement with Logging { * NOTE: You need to call this method if you have registered an actor by a custom ID. */ def unregisterTypedPerSessionActor(id: String): Unit +} + +trait RemoteClientModule extends RemoteModule { self: RemoteModule => + + def actorFor(classNameOrServiceId: String, hostname: String, port: Int): ActorRef = + actorFor(classNameOrServiceId, classNameOrServiceId, Actor.TIMEOUT, hostname, port, None) + + def actorFor(classNameOrServiceId: String, hostname: String, port: Int, loader: ClassLoader): ActorRef = + actorFor(classNameOrServiceId, classNameOrServiceId, Actor.TIMEOUT, hostname, port, Some(loader)) + + def actorFor(serviceId: String, className: String, hostname: String, port: Int): ActorRef = + actorFor(serviceId, className, Actor.TIMEOUT, hostname, port, None) + + def actorFor(serviceId: String, className: String, hostname: String, port: Int, loader: ClassLoader): ActorRef = + actorFor(serviceId, className, Actor.TIMEOUT, hostname, port, Some(loader)) + + def actorFor(classNameOrServiceId: String, timeout: Long, hostname: String, port: Int): ActorRef = + actorFor(classNameOrServiceId, classNameOrServiceId, timeout, hostname, port, None) + + def actorFor(classNameOrServiceId: String, timeout: Long, hostname: String, port: Int, loader: ClassLoader): ActorRef = + actorFor(classNameOrServiceId, classNameOrServiceId, timeout, hostname, port, Some(loader)) + + def actorFor(serviceId: String, className: String, timeout: Long, hostname: String, port: Int): ActorRef = + actorFor(serviceId, className, timeout, hostname, port, None) + + def typedActorFor[T](intfClass: Class[T], serviceIdOrClassName: String, hostname: String, port: Int): T = + typedActorFor(intfClass, serviceIdOrClassName, serviceIdOrClassName, Actor.TIMEOUT, hostname, port, None) + + def typedActorFor[T](intfClass: Class[T], serviceIdOrClassName: String, timeout: Long, hostname: String, port: Int): T = + typedActorFor(intfClass, serviceIdOrClassName, serviceIdOrClassName, timeout, hostname, port, None) + + def typedActorFor[T](intfClass: Class[T], serviceIdOrClassName: String, timeout: Long, hostname: String, port: Int, loader: ClassLoader): T = + typedActorFor(intfClass, serviceIdOrClassName, serviceIdOrClassName, timeout, hostname, port, Some(loader)) + + def typedActorFor[T](intfClass: Class[T], serviceId: String, implClassName: String, timeout: Long, hostname: String, port: Int, loader: ClassLoader): T = + typedActorFor(intfClass, serviceId, implClassName, timeout, hostname, port, Some(loader)) + + def clientManagedActorOf(clazz: Class[_ <: Actor], host: String, port: Int, timeout: Long): ActorRef + + /** Methods that needs to be implemented by a transport **/ + + protected[akka] def typedActorFor[T](intfClass: Class[T], serviceId: String, implClassName: String, timeout: Long, host: String, port: Int, loader: Option[ClassLoader]): T + + protected[akka] def actorFor(serviceId: String, className: String, timeout: Long, hostname: String, port: Int, loader: Option[ClassLoader]): ActorRef + + protected[akka] def send[T](message: Any, + senderOption: Option[ActorRef], + senderFuture: Option[CompletableFuture[T]], + remoteAddress: InetSocketAddress, + timeout: Long, + isOneWay: Boolean, + actorRef: ActorRef, + typedActorInfo: Option[Tuple2[String, String]], + actorType: ActorType): Option[CompletableFuture[T]] + + //TODO: REVISIT: IMPLEMENT OR REMOVE + //private[akka] def registerSupervisorForActor(actorRef: ActorRef): ActorRef + + //private[akka] def deregisterSupervisorForActor(actorRef: ActorRef): ActorRef + + /** + * Clean-up all open connections. + */ + def shutdownClientModule: Unit + + private[akka] def registerClientManagedActor(hostname: String, port: Int, uuid: Uuid): Unit + + private[akka] def unregisterClientManagedActor(hostname: String, port: Int, uuid: Uuid): Unit } \ No newline at end of file diff --git a/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala b/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala index 14bde14e89..a923b52dd2 100644 --- a/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala +++ b/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala @@ -4,12 +4,13 @@ package akka.util -import akka.actor.{ActorRef, IllegalActorStateException, ActorType, Uuid} import akka.dispatch.{Future, CompletableFuture, MessageInvocation} import akka.config.{Config, ModuleNotAvailableException} import akka.AkkaException import java.net.InetSocketAddress +import akka.remoteinterface.RemoteSupport +import akka.actor._ /** * Helper class for reflective access to different modules in order to allow optional loading of modules. @@ -20,10 +21,10 @@ object ReflectiveAccess extends Logging { val loader = getClass.getClassLoader - lazy val isRemotingEnabled = RemoteClientModule.isEnabled + lazy val isRemotingEnabled = Remote.isEnabled lazy val isTypedActorEnabled = TypedActorModule.isEnabled - def ensureRemotingEnabled = RemoteClientModule.ensureEnabled + def ensureRemotingEnabled = Remote.ensureEnabled def ensureTypedActorEnabled = TypedActorModule.ensureEnabled /** @@ -31,82 +32,26 @@ object ReflectiveAccess extends Logging { * * @author Jonas Bonér */ - object RemoteClientModule { + object Remote { + val TRANSPORT = Config.config.getString("akka.remote.transport","akka.remote.NettyRemoteSupport") + val HOSTNAME = Config.config.getString("akka.remote.server.hostname", "localhost") + val PORT = Config.config.getInt("akka.remote.server.port", 2552) - type RemoteClient = { - def send[T]( - message: Any, - senderOption: Option[ActorRef], - senderFuture: Option[CompletableFuture[_]], - remoteAddress: InetSocketAddress, - timeout: Long, - isOneWay: Boolean, - actorRef: ActorRef, - typedActorInfo: Option[Tuple2[String, String]], - actorType: ActorType): Option[CompletableFuture[T]] - def registerSupervisorForActor(actorRef: ActorRef) - } - - type RemoteClientObject = { - def register(hostname: String, port: Int, uuid: Uuid): Unit - def unregister(hostname: String, port: Int, uuid: Uuid): Unit - def clientFor(address: InetSocketAddress): RemoteClient - def clientFor(hostname: String, port: Int, loader: Option[ClassLoader]): RemoteClient - } - - lazy val isEnabled = remoteClientObjectInstance.isDefined + lazy val isEnabled = remoteSupportClass.isDefined def ensureEnabled = if (!isEnabled) throw new ModuleNotAvailableException( "Can't load the remoting module, make sure that akka-remote.jar is on the classpath") - val remoteClientObjectInstance: Option[RemoteClientObject] = - getObjectFor("akka.remote.RemoteClient$") + //TODO: REVISIT: Make class configurable + val remoteSupportClass: Option[Class[_ <: RemoteSupport]] = getClassFor(TRANSPORT) - def register(address: InetSocketAddress, uuid: Uuid) = { - ensureEnabled - remoteClientObjectInstance.get.register(address.getHostName, address.getPort, uuid) + protected[akka] val defaultRemoteSupport: Option[ActorRegistryInstance => RemoteSupport] = remoteSupportClass map { + remoteClass => (registry: ActorRegistryInstance) => + createInstance[RemoteSupport](remoteClass,Array[Class[_]](classOf[ActorRegistryInstance]),Array[AnyRef](registry)). + getOrElse(throw new ModuleNotAvailableException("Can't instantiate "+ + remoteClass.getName+ + ", make sure that akka-remote.jar is on the classpath")) } - - def unregister(address: InetSocketAddress, uuid: Uuid) = { - ensureEnabled - remoteClientObjectInstance.get.unregister(address.getHostName, address.getPort, uuid) - } - - def registerSupervisorForActor(remoteAddress: InetSocketAddress, actorRef: ActorRef) = { - ensureEnabled - val remoteClient = remoteClientObjectInstance.get.clientFor(remoteAddress) - remoteClient.registerSupervisorForActor(actorRef) - } - - def clientFor(hostname: String, port: Int, loader: Option[ClassLoader]): RemoteClient = { - ensureEnabled - remoteClientObjectInstance.get.clientFor(hostname, port, loader) - } - - def send[T]( - message: Any, - senderOption: Option[ActorRef], - senderFuture: Option[CompletableFuture[_]], - remoteAddress: InetSocketAddress, - timeout: Long, - isOneWay: Boolean, - actorRef: ActorRef, - typedActorInfo: Option[Tuple2[String, String]], - actorType: ActorType): Option[CompletableFuture[T]] = { - ensureEnabled - clientFor(remoteAddress.getHostName, remoteAddress.getPort, None).send[T]( - message, senderOption, senderFuture, remoteAddress, timeout, isOneWay, actorRef, typedActorInfo, actorType) - } - } - - /** - * Reflective access to the RemoteServer module. - * - * @author Jonas Bonér - */ - object RemoteServerModule { - val HOSTNAME = Config.config.getString("akka.remote.server.hostname", "localhost") - val PORT = Config.config.getInt("akka.remote.server.port", 2552) } /** diff --git a/akka-actor/src/test/java/akka/config/SupervisionConfig.java b/akka-actor/src/test/java/akka/config/SupervisionConfig.java index 271e6c490e..fd71c86bf1 100644 --- a/akka-actor/src/test/java/akka/config/SupervisionConfig.java +++ b/akka-actor/src/test/java/akka/config/SupervisionConfig.java @@ -13,7 +13,7 @@ public class SupervisionConfig { public SupervisorConfig createSupervisorConfig(List toSupervise) { ArrayList targets = new ArrayList(toSupervise.size()); for(ActorRef ref : toSupervise) { - targets.add(new Supervise(ref, permanent(), new RemoteAddress("localhost",2552))); + targets.add(new Supervise(ref, permanent(), true)); } return new SupervisorConfig(new AllForOneStrategy(new Class[] { Exception.class },50,1000), targets.toArray(new Server[0])); diff --git a/akka-remote/src/main/scala/akka/remote/BootableRemoteActorService.scala b/akka-remote/src/main/scala/akka/remote/BootableRemoteActorService.scala index 9c2eeb9c2c..58892c2ad3 100644 --- a/akka-remote/src/main/scala/akka/remote/BootableRemoteActorService.scala +++ b/akka-remote/src/main/scala/akka/remote/BootableRemoteActorService.scala @@ -17,7 +17,7 @@ trait BootableRemoteActorService extends Bootable with Logging { self: BootableActorLoaderService => protected lazy val remoteServerThread = new Thread(new Runnable() { - import ReflectiveAccess.RemoteServerModule.{HOSTNAME,PORT} + import ReflectiveAccess.Remote.{HOSTNAME,PORT} def run = ActorRegistry.remote.start(HOSTNAME,PORT,loader = self.applicationLoader) }, "Akka Remote Service") @@ -34,7 +34,7 @@ trait BootableRemoteActorService extends Bootable with Logging { abstract override def onUnload = { log.slf4j.info("Shutting down Remote Actors Service") - RemoteNode.shutdown + ActorRegistry.remote.shutdown if (remoteServerThread.isAlive) remoteServerThread.join(1000) log.slf4j.info("Remote Actors Service has been shut down") super.onUnload diff --git a/akka-remote/src/main/scala/akka/remote/RemoteServer.scala b/akka-remote/src/main/scala/akka/remote/NettyRemoteSupport.scala similarity index 59% rename from akka-remote/src/main/scala/akka/remote/RemoteServer.scala rename to akka-remote/src/main/scala/akka/remote/NettyRemoteSupport.scala index b8e55c8054..459eebcaf9 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteServer.scala +++ b/akka-remote/src/main/scala/akka/remote/NettyRemoteSupport.scala @@ -4,63 +4,470 @@ package akka.remote -import java.lang.reflect.InvocationTargetException -import java.net.InetSocketAddress -import java.util.concurrent.{ConcurrentHashMap, Executors} -import java.util.{Map => JMap} - -import akka.actor.Actor._ -import akka.actor.{Actor, TypedActor, ActorRef, IllegalActorStateException, RemoteActorSystemMessage, uuidFrom, Uuid, ActorRegistry, LifeCycleMessage, ActorType => AkkaActorType} -import akka.util._ +import akka.remote.protocol.RemoteProtocol.{ActorType => ActorTypeProtocol, _} +import akka.dispatch.{DefaultCompletableFuture, CompletableFuture, Future} import akka.remote.protocol.RemoteProtocol._ import akka.remote.protocol.RemoteProtocol.ActorType._ -import akka.config.Config._ import akka.config.ConfigurationException import akka.serialization.RemoteActorSerialization +import akka.japi.Creator +import akka.actor.{ActorRegistryInstance, Actor, RemoteActorRef, TypedActor, ActorRef, IllegalActorStateException, + RemoteActorSystemMessage, uuidFrom, Uuid, Exit, ActorRegistry, LifeCycleMessage, ActorType => AkkaActorType} +import akka.remoteinterface. {RemoteSupport, RemoteModule, RemoteServerModule, RemoteClientModule} +import akka.config.Config._ import akka.serialization.RemoteActorSerialization._ +import akka.AkkaException +import akka.actor.Actor._ +import akka.util._ -import org.jboss.netty.bootstrap.ServerBootstrap import org.jboss.netty.channel._ -import org.jboss.netty.channel.group.{DefaultChannelGroup, ChannelGroup} +import org.jboss.netty.channel.group.{DefaultChannelGroup,ChannelGroup} +import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory -import org.jboss.netty.handler.codec.frame.{LengthFieldBasedFrameDecoder, LengthFieldPrepender} -import org.jboss.netty.handler.codec.protobuf.{ProtobufDecoder, ProtobufEncoder} -import org.jboss.netty.handler.codec.compression.{ZlibEncoder, ZlibDecoder} +import org.jboss.netty.bootstrap.{ServerBootstrap,ClientBootstrap} +import org.jboss.netty.handler.codec.frame.{ LengthFieldBasedFrameDecoder, LengthFieldPrepender } +import org.jboss.netty.handler.codec.compression.{ ZlibDecoder, ZlibEncoder } +import org.jboss.netty.handler.codec.protobuf.{ ProtobufDecoder, ProtobufEncoder } +import org.jboss.netty.handler.timeout.ReadTimeoutHandler +import org.jboss.netty.util.{ TimerTask, Timeout, HashedWheelTimer } import org.jboss.netty.handler.ssl.SslHandler -import scala.collection.mutable.Map +import java.net.{ SocketAddress, InetSocketAddress } +import java.util.concurrent.{ TimeUnit, Executors, ConcurrentMap, ConcurrentHashMap, ConcurrentSkipListSet } +import java.util.concurrent.atomic.{ AtomicLong, AtomicBoolean } + +import scala.collection.mutable.{ HashSet, HashMap } import scala.reflect.BeanProperty -import akka.dispatch. {Future, DefaultCompletableFuture, CompletableFuture} -import akka.japi.Creator -import akka.remoteinterface.RemoteServerModule +import java.lang.reflect.InvocationTargetException + + /** - * Use this object if you need a single remote server on a specific node. - * - *
- * // takes hostname and port from 'akka.conf'
- * RemoteNode.start
- * 
- * - *
- * RemoteNode.start(hostname, port)
- * 
- * - * You can specify the class loader to use to load the remote actors. - *
- * RemoteNode.start(hostname, port, classLoader)
- * 
- * - * If you need to create more than one, then you can use the RemoteServer: - * - *
- * val server = new RemoteServer
- * server.start(hostname, port)
- * 
+ * Life-cycle events for RemoteClient. + */ +sealed trait RemoteClientLifeCycleEvent +case class RemoteClientError( + @BeanProperty val cause: Throwable, + @BeanProperty val client: RemoteClient) extends RemoteClientLifeCycleEvent +case class RemoteClientDisconnected( + @BeanProperty val client: RemoteClient) extends RemoteClientLifeCycleEvent +case class RemoteClientConnected( + @BeanProperty val client: RemoteClient) extends RemoteClientLifeCycleEvent +case class RemoteClientStarted( + @BeanProperty val client: RemoteClient) extends RemoteClientLifeCycleEvent +case class RemoteClientShutdown( + @BeanProperty val client: RemoteClient) extends RemoteClientLifeCycleEvent + +/** + * Thrown for example when trying to send a message using a RemoteClient that is either not started or shut down. + */ +class RemoteClientException private[akka] (message: String, @BeanProperty val client: RemoteClient) extends AkkaException(message) + +/** + * The RemoteClient object manages RemoteClient instances and gives you an API to lookup remote actor handles. * * @author Jonas Bonér */ -object RemoteNode extends RemoteServer +trait NettyRemoteClientModule extends RemoteClientModule { self: ListenerManagement with Logging => + private val remoteClients = new HashMap[String, RemoteClient] + private val remoteActors = new HashMap[Address, HashSet[Uuid]] + + protected[akka] def typedActorFor[T](intfClass: Class[T], serviceId: String, implClassName: String, timeout: Long, hostname: String, port: Int, loader: Option[ClassLoader]): T = + TypedActor.createProxyForRemoteActorRef(intfClass, RemoteActorRef(registry, serviceId, implClassName, hostname, port, timeout, false, loader, AkkaActorType.TypedActor)) + + protected[akka] def send[T](message: Any, + senderOption: Option[ActorRef], + senderFuture: Option[CompletableFuture[T]], + remoteAddress: InetSocketAddress, + timeout: Long, + isOneWay: Boolean, + actorRef: ActorRef, + typedActorInfo: Option[Tuple2[String, String]], + actorType: AkkaActorType): Option[CompletableFuture[T]] = + clientFor(remoteAddress, None).send[T](message, senderOption, senderFuture, remoteAddress, timeout, isOneWay, actorRef, typedActorInfo, actorType) + + private[akka] def clientFor( + address: InetSocketAddress, loader: Option[ClassLoader]): RemoteClient = synchronized { //TODO: REVIST: synchronized here seems bottlenecky + val hostname = address.getHostName + val port = address.getPort + val hash = hostname + ':' + port + loader.foreach(MessageSerializer.setClassLoader(_))//TODO: REVISIT: THIS SMELLS FUNNY + if (remoteClients.contains(hash)) remoteClients(hash) + else { + val client = new RemoteClient(hostname, port, loader, self.notifyListeners _) + client.connect + remoteClients += hash -> client + client + } + } + + def shutdownClientFor(address: InetSocketAddress) = synchronized { + val hostname = address.getHostName + val port = address.getPort + val hash = hostname + ':' + port + if (remoteClients.contains(hash)) { + val client = remoteClients(hash) + client.shutdown + remoteClients -= hash + } + } + //TODO: REVISIT IMPLEMENT OR REMOVE + /*private[akka] def registerSupervisorForActor(actorRef: ActorRef): ActorRef = + clientFor().registerSupervisorForActor(actorRef) + + private[akka] def deregisterSupervisorForActor(actorRef: ActorRef): ActorRef = + clientFor().deregisterSupervisorForActor(actorRef)*/ + + /** + * Clean-up all open connections. + */ + def shutdownClientModule = synchronized { + remoteClients.foreach({ case (addr, client) => client.shutdown }) + remoteClients.clear + } + + def registerClientManagedActor(hostname: String, port: Int, uuid: Uuid) = synchronized { + actorsFor(Address(hostname, port)) += uuid + } + + private[akka] def unregisterClientManagedActor(hostname: String, port: Int, uuid: Uuid) = synchronized { + val set = actorsFor(Address(hostname, port)) + set -= uuid + if (set.isEmpty) shutdownClientFor(new InetSocketAddress(hostname, port)) + } + + private[akka] def actorsFor(remoteServerAddress: Address): HashSet[Uuid] = { + val set = remoteActors.get(remoteServerAddress) + if (set.isDefined && (set.get ne null)) set.get + else { + val remoteActorSet = new HashSet[Uuid] + remoteActors.put(remoteServerAddress, remoteActorSet) + remoteActorSet + } + } +} + +object RemoteClient { + val SECURE_COOKIE: Option[String] = { + val cookie = config.getString("akka.remote.secure-cookie", "") + if (cookie == "") None else Some(cookie) + } + + val READ_TIMEOUT = Duration(config.getInt("akka.remote.client.read-timeout", 1), TIME_UNIT) + val RECONNECT_DELAY = Duration(config.getInt("akka.remote.client.reconnect-delay", 5), TIME_UNIT) + val MESSAGE_FRAME_SIZE = config.getInt("akka.remote.client.message-frame-size", 1048576) +} +/** + * RemoteClient represents a connection to a RemoteServer. Is used to send messages to remote actors on the RemoteServer. + * + * @author Jonas Bonér + */ +class RemoteClient private[akka] ( + val hostname: String, + val port: Int, + val loader: Option[ClassLoader] = None, + val notifyListeners: (=> Any) => Unit) extends Logging { + val name = "RemoteClient@" + hostname + "::" + port + + //FIXME Should these be clear:ed on postStop? + private val futures = new ConcurrentHashMap[Uuid, CompletableFuture[_]] + private val supervisors = new ConcurrentHashMap[Uuid, ActorRef] + + private val remoteAddress = new InetSocketAddress(hostname, port) + + //FIXME rewrite to a wrapper object (minimize volatile access and maximize encapsulation) + @volatile + private var bootstrap: ClientBootstrap = _ + @volatile + private[remote] var connection: ChannelFuture = _ + @volatile + private[remote] var openChannels: DefaultChannelGroup = _ + @volatile + private var timer: HashedWheelTimer = _ + private[remote] val runSwitch = new Switch() + private[remote] val isAuthenticated = new AtomicBoolean(false) + + private[remote] def isRunning = runSwitch.isOn + + private val reconnectionTimeWindow = Duration(config.getInt( + "akka.remote.client.reconnection-time-window", 600), TIME_UNIT).toMillis + @volatile + private var reconnectionTimeWindowStart = 0L + + def connect = runSwitch switchOn { + openChannels = new DefaultChannelGroup(classOf[RemoteClient].getName) + timer = new HashedWheelTimer + + bootstrap = new ClientBootstrap(new NioClientSocketChannelFactory(Executors.newCachedThreadPool, Executors.newCachedThreadPool)) + bootstrap.setPipelineFactory(new RemoteClientPipelineFactory(name, futures, supervisors, bootstrap, remoteAddress, timer, this)) + bootstrap.setOption("tcpNoDelay", true) + bootstrap.setOption("keepAlive", true) + + log.slf4j.info("Starting remote client connection to [{}:{}]", hostname, port) + + // Wait until the connection attempt succeeds or fails. + connection = bootstrap.connect(remoteAddress) + val channel = connection.awaitUninterruptibly.getChannel + openChannels.add(channel) + + if (!connection.isSuccess) { + notifyListeners(RemoteClientError(connection.getCause, this)) + log.slf4j.error("Remote client connection to [{}:{}] has failed", hostname, port) + log.slf4j.debug("Remote client connection failed", connection.getCause) + } + notifyListeners(RemoteClientStarted(this)) + } + + def shutdown = runSwitch switchOff { + log.slf4j.info("Shutting down {}", name) + notifyListeners(RemoteClientShutdown(this)) + timer.stop + timer = null + openChannels.close.awaitUninterruptibly + openChannels = null + bootstrap.releaseExternalResources + bootstrap = null + connection = null + log.slf4j.info("{} has been shut down", name) + } + + def send[T]( + message: Any, + senderOption: Option[ActorRef], + senderFuture: Option[CompletableFuture[T]], + remoteAddress: InetSocketAddress, + timeout: Long, + isOneWay: Boolean, + actorRef: ActorRef, + typedActorInfo: Option[Tuple2[String, String]], + actorType: AkkaActorType): Option[CompletableFuture[T]] = { + val cookie = if (isAuthenticated.compareAndSet(false, true)) RemoteClient.SECURE_COOKIE + else None + send(createRemoteMessageProtocolBuilder( + Some(actorRef), + Left(actorRef.uuid), + actorRef.id, + actorRef.actorClassName, + actorRef.timeout, + Left(message), + isOneWay, + senderOption, + typedActorInfo, + actorType, + cookie + ).build, senderFuture) + } + + def send[T]( + request: RemoteMessageProtocol, + senderFuture: Option[CompletableFuture[T]]): Option[CompletableFuture[T]] = { + if (isRunning) { + if (request.getOneWay) { + connection.getChannel.write(request) + None + } else { + val futureResult = if (senderFuture.isDefined) senderFuture.get + else new DefaultCompletableFuture[T](request.getActorInfo.getTimeout) + futures.put(uuidFrom(request.getUuid.getHigh, request.getUuid.getLow), futureResult) + connection.getChannel.write(request) + Some(futureResult) + } + } else { + val exception = new RemoteClientException( + "Remote client is not running, make sure you have invoked 'RemoteClient.connect' before using it.", this) + notifyListeners(RemoteClientError(exception, this)) + throw exception + } + } + + private[akka] def registerSupervisorForActor(actorRef: ActorRef): ActorRef = + if (!actorRef.supervisor.isDefined) throw new IllegalActorStateException( + "Can't register supervisor for " + actorRef + " since it is not under supervision") + else supervisors.putIfAbsent(actorRef.supervisor.get.uuid, actorRef) + + private[akka] def deregisterSupervisorForActor(actorRef: ActorRef): ActorRef = + if (!actorRef.supervisor.isDefined) throw new IllegalActorStateException( + "Can't unregister supervisor for " + actorRef + " since it is not under supervision") + else supervisors.remove(actorRef.supervisor.get.uuid) + + private[akka] def isWithinReconnectionTimeWindow: Boolean = { + if (reconnectionTimeWindowStart == 0L) { + reconnectionTimeWindowStart = System.currentTimeMillis + true + } else { + val timeLeft = reconnectionTimeWindow - (System.currentTimeMillis - reconnectionTimeWindowStart) + if (timeLeft > 0) { + log.slf4j.info("Will try to reconnect to remote server for another [{}] milliseconds", timeLeft) + true + } else false + } + } + + private[akka] def resetReconnectionTimeWindow = reconnectionTimeWindowStart = 0L +} + +/** + * @author Jonas Bonér + */ +class RemoteClientPipelineFactory( + name: String, + futures: ConcurrentMap[Uuid, CompletableFuture[_]], + supervisors: ConcurrentMap[Uuid, ActorRef], + bootstrap: ClientBootstrap, + remoteAddress: SocketAddress, + timer: HashedWheelTimer, + client: RemoteClient) extends ChannelPipelineFactory { + + def getPipeline: ChannelPipeline = { + def join(ch: ChannelHandler*) = Array[ChannelHandler](ch: _*) + + lazy val engine = { + val e = RemoteServerSslContext.client.createSSLEngine() + e.setEnabledCipherSuites(e.getSupportedCipherSuites) //TODO is this sensible? + e.setUseClientMode(true) + e + } + + val ssl = if (RemoteServer.SECURE) join(new SslHandler(engine)) else join() + val timeout = new ReadTimeoutHandler(timer, RemoteClient.READ_TIMEOUT.toMillis.toInt) + val lenDec = new LengthFieldBasedFrameDecoder(RemoteClient.MESSAGE_FRAME_SIZE, 0, 4, 0, 4) + val lenPrep = new LengthFieldPrepender(4) + val protobufDec = new ProtobufDecoder(RemoteMessageProtocol.getDefaultInstance) + val protobufEnc = new ProtobufEncoder + val (enc, dec) = RemoteServer.COMPRESSION_SCHEME match { + case "zlib" => (join(new ZlibEncoder(RemoteServer.ZLIB_COMPRESSION_LEVEL)), join(new ZlibDecoder)) + case _ => (join(), join()) + } + + val remoteClient = new RemoteClientHandler(name, futures, supervisors, bootstrap, remoteAddress, timer, client) + val stages = ssl ++ join(timeout) ++ dec ++ join(lenDec, protobufDec) ++ enc ++ join(lenPrep, protobufEnc, remoteClient) + new StaticChannelPipeline(stages: _*) + } +} + +/** + * @author Jonas Bonér + */ +@ChannelHandler.Sharable +class RemoteClientHandler( + val name: String, + val futures: ConcurrentMap[Uuid, CompletableFuture[_]], + val supervisors: ConcurrentMap[Uuid, ActorRef], + val bootstrap: ClientBootstrap, + val remoteAddress: SocketAddress, + val timer: HashedWheelTimer, + val client: RemoteClient) + extends SimpleChannelUpstreamHandler with Logging { + + override def handleUpstream(ctx: ChannelHandlerContext, event: ChannelEvent) = { + if (event.isInstanceOf[ChannelStateEvent] && + event.asInstanceOf[ChannelStateEvent].getState != ChannelState.INTEREST_OPS) { + log.slf4j.debug(event.toString) + } + super.handleUpstream(ctx, event) + } + + override def messageReceived(ctx: ChannelHandlerContext, event: MessageEvent) { + try { + val result = event.getMessage + if (result.isInstanceOf[RemoteMessageProtocol]) { + val reply = result.asInstanceOf[RemoteMessageProtocol] + val replyUuid = uuidFrom(reply.getUuid.getHigh, reply.getUuid.getLow) + log.debug("Remote client received RemoteMessageProtocol[\n{}]",reply) + val future = futures.get(replyUuid).asInstanceOf[CompletableFuture[Any]] + if (reply.hasMessage) { + if (future eq null) throw new IllegalActorStateException("Future mapped to UUID " + replyUuid + " does not exist") + val message = MessageSerializer.deserialize(reply.getMessage) + future.completeWithResult(message) + } else { + if (reply.hasSupervisorUuid()) { + val supervisorUuid = uuidFrom(reply.getSupervisorUuid.getHigh, reply.getSupervisorUuid.getLow) + if (!supervisors.containsKey(supervisorUuid)) throw new IllegalActorStateException( + "Expected a registered supervisor for UUID [" + supervisorUuid + "] but none was found") + val supervisedActor = supervisors.get(supervisorUuid) + if (!supervisedActor.supervisor.isDefined) throw new IllegalActorStateException( + "Can't handle restart for remote actor " + supervisedActor + " since its supervisor has been removed") + else supervisedActor.supervisor.get ! Exit(supervisedActor, parseException(reply, client.loader)) + } + val exception = parseException(reply, client.loader) + future.completeWithException(exception) + } + futures remove replyUuid + } else { + val exception = new RemoteClientException("Unknown message received in remote client handler: " + result, client) + client.notifyListeners(RemoteClientError(exception, client)) + throw exception + } + } catch { + case e: Exception => + client.notifyListeners(RemoteClientError(e, client)) + log.slf4j.error("Unexpected exception in remote client handler: {}", e) + throw e + } + } + + override def channelClosed(ctx: ChannelHandlerContext, event: ChannelStateEvent) = client.runSwitch ifOn { + if (client.isWithinReconnectionTimeWindow) { + timer.newTimeout(new TimerTask() { + def run(timeout: Timeout) = { + client.openChannels.remove(event.getChannel) + client.isAuthenticated.set(false) + log.slf4j.debug("Remote client reconnecting to [{}]", remoteAddress) + client.connection = bootstrap.connect(remoteAddress) + client.connection.awaitUninterruptibly // Wait until the connection attempt succeeds or fails. + if (!client.connection.isSuccess) { + client.notifyListeners(RemoteClientError(client.connection.getCause, client)) + log.slf4j.error("Reconnection to [{}] has failed", remoteAddress) + log.slf4j.debug("Reconnection failed", client.connection.getCause) + } + } + }, RemoteClient.RECONNECT_DELAY.toMillis, TimeUnit.MILLISECONDS) + } else spawn { client.shutdown } + } + + override def channelConnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = { + def connect = { + client.notifyListeners(RemoteClientConnected(client)) + log.slf4j.debug("Remote client connected to [{}]", ctx.getChannel.getRemoteAddress) + client.resetReconnectionTimeWindow + } + + if (RemoteServer.SECURE) { + val sslHandler: SslHandler = ctx.getPipeline.get(classOf[SslHandler]) + sslHandler.handshake.addListener(new ChannelFutureListener { + def operationComplete(future: ChannelFuture): Unit = { + if (future.isSuccess) connect + else throw new RemoteClientException("Could not establish SSL handshake", client) + } + }) + } else connect + } + + override def channelDisconnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = { + client.notifyListeners(RemoteClientDisconnected(client)) + log.slf4j.debug("Remote client disconnected from [{}]", ctx.getChannel.getRemoteAddress) + } + + override def exceptionCaught(ctx: ChannelHandlerContext, event: ExceptionEvent) = { + client.notifyListeners(RemoteClientError(event.getCause, client)) + if (event.getCause ne null) + log.slf4j.error("Unexpected exception from downstream in remote client", event.getCause) + else + log.slf4j.error("Unexpected exception from downstream in remote client: {}", event) + + event.getChannel.close + } + + private def parseException(reply: RemoteMessageProtocol, loader: Option[ClassLoader]): Throwable = { + val exception = reply.getException + val classname = exception.getClassname + val exceptionClass = if (loader.isDefined) loader.get.loadClass(classname) + else Class.forName(classname) + exceptionClass + .getConstructor(Array[Class[_]](classOf[String]): _*) + .newInstance(exception.getMessage).asInstanceOf[Throwable] + } +} /** * For internal use only. Holds configuration variables, remote actors, remote typed actors and remote servers. @@ -118,41 +525,62 @@ object RemoteServer { /** * Life-cycle events for RemoteServer. */ -sealed trait RemoteServerLifeCycleEvent +sealed trait RemoteServerLifeCycleEvent //TODO: REVISIT: Document change from RemoteServer to RemoteServerModule case class RemoteServerStarted( - @BeanProperty val server: RemoteServer) extends RemoteServerLifeCycleEvent + @BeanProperty val server: RemoteServerModule) extends RemoteServerLifeCycleEvent case class RemoteServerShutdown( - @BeanProperty val server: RemoteServer) extends RemoteServerLifeCycleEvent + @BeanProperty val server: RemoteServerModule) extends RemoteServerLifeCycleEvent case class RemoteServerError( @BeanProperty val cause: Throwable, - @BeanProperty val server: RemoteServer) extends RemoteServerLifeCycleEvent + @BeanProperty val server: RemoteServerModule) extends RemoteServerLifeCycleEvent case class RemoteServerClientConnected( - @BeanProperty val server: RemoteServer, + @BeanProperty val server: RemoteServerModule, @BeanProperty val clientAddress: Option[InetSocketAddress]) extends RemoteServerLifeCycleEvent case class RemoteServerClientDisconnected( - @BeanProperty val server: RemoteServer, + @BeanProperty val server: RemoteServerModule, @BeanProperty val clientAddress: Option[InetSocketAddress]) extends RemoteServerLifeCycleEvent case class RemoteServerClientClosed( - @BeanProperty val server: RemoteServer, + @BeanProperty val server: RemoteServerModule, @BeanProperty val clientAddress: Option[InetSocketAddress]) extends RemoteServerLifeCycleEvent + /** - * Use this class if you need a more than one remote server on a specific node. - * - *
- * val server = new RemoteServer
- * server.start
- * 
- * - * If you need to create more than one, then you can use the RemoteServer: - * - *
- * RemoteNode.start
- * 
- * - * @author Jonas Bonér + * Provides the implementation of the Netty remote support */ -class RemoteServer extends RemoteServerModule { +class NettyRemoteSupport(val registry: ActorRegistryInstance) + extends RemoteSupport with NettyRemoteServerModule with NettyRemoteClientModule { + + protected[akka] def actorFor(serviceId: String, className: String, timeout: Long, hostname: String, port: Int, loader: Option[ClassLoader]): ActorRef = { + //TODO: REVISIT: Possible to optimize server-managed actors in local scope? + //val Host = this.hostname + //val Port = this.port + + //(host,port) match { + // case (Host, Port) if optimizeLocalScoped_? => + //if actor with that servicename or uuid is present locally, return a LocalActorRef to that one + //else return RemoteActorRef(registry, serviceId, className, hostname, port, timeout, false, loader) + // case _ => + // RemoteActorRef(registry, serviceId, className, hostname, port, timeout, false, loader) + //} + RemoteActorRef(registry, serviceId, className, hostname, port, timeout, false, loader) + } + + def clientManagedActorOf(clazz: Class[_ <: Actor], host: String, port: Int, timeout: Long): ActorRef = { + val Host = this.hostname + val Port = this.port + + (host,port) match { + case (Host, Port) if optimizeLocalScoped_? => + registry.actorOf(clazz) //Local + case _ => + new RemoteActorRef(registry,clazz.getName,clazz.getName,host,port,timeout,true /*Client managed*/, None) + } + } + + val optimizeLocalScoped_? = true +} + +trait NettyRemoteServerModule extends RemoteServerModule { self: RemoteModule => import RemoteServer._ @volatile private[akka] var address = Address(RemoteServer.HOSTNAME,RemoteServer.PORT) @@ -172,7 +600,7 @@ class RemoteServer extends RemoteServerModule { def isRunning = _isRunning.isOn - def start(_hostname: String, _port: Int, loader: Option[ClassLoader] = None): RemoteServer = guard withGuard { + def start(_hostname: String, _port: Int, loader: Option[ClassLoader] = None): RemoteServerModule = guard withGuard { try { _isRunning switchOn { address = Address(_hostname,_port) @@ -196,7 +624,7 @@ class RemoteServer extends RemoteServerModule { this } - def shutdown = guard withGuard { + def shutdownServerModule = guard withGuard { _isRunning switchOff { try { openChannels.disconnect @@ -336,22 +764,15 @@ class RemoteServer extends RemoteServerModule { *

* NOTE: You need to call this method if you have registered an actor by a custom ID. */ - def unregisterTypedPerSessionActor(id: String): Unit = { - if (_isRunning.isOn) { - typedActorsFactories.remove(id) - } - } + def unregisterTypedPerSessionActor(id: String): Unit = + if (_isRunning.isOn) typedActorsFactories.remove(id) - protected override def manageLifeCycleOfListeners = false - - protected[akka] override def notifyListeners(message: => Any): Unit = super.notifyListeners(message) - - private[akka] def actors = ActorRegistry.actors(address) - private[akka] def actorsByUuid = ActorRegistry.actorsByUuid(address) - private[akka] def actorsFactories = ActorRegistry.actorsFactories(address) - private[akka] def typedActors = ActorRegistry.typedActors(address) - private[akka] def typedActorsByUuid = ActorRegistry.typedActorsByUuid(address) - private[akka] def typedActorsFactories = ActorRegistry.typedActorsFactories(address) + private[akka] def actors = registry.actors(address) + private[akka] def actorsByUuid = registry.actorsByUuid(address) + private[akka] def actorsFactories = registry.actorsFactories(address) + private[akka] def typedActors = registry.typedActors(address) + private[akka] def typedActorsByUuid = registry.typedActorsByUuid(address) + private[akka] def typedActorsFactories = registry.typedActorsFactories(address) } object RemoteServerSslContext { @@ -376,7 +797,7 @@ class RemoteServerPipelineFactory( val name: String, val openChannels: ChannelGroup, val loader: Option[ClassLoader], - val server: RemoteServer) extends ChannelPipelineFactory { + val server: NettyRemoteServerModule) extends ChannelPipelineFactory { import RemoteServer._ def getPipeline: ChannelPipeline = { @@ -413,7 +834,7 @@ class RemoteServerHandler( val name: String, val openChannels: ChannelGroup, val applicationLoader: Option[ClassLoader], - val server: RemoteServer) extends SimpleChannelUpstreamHandler with Logging { + val server: NettyRemoteServerModule) extends SimpleChannelUpstreamHandler with Logging { import RemoteServer._ val AW_PROXY_PREFIX = "$$ProxiedByAW".intern @@ -422,7 +843,7 @@ class RemoteServerHandler( val sessionActors = new ChannelLocal[ConcurrentHashMap[String, ActorRef]]() val typedSessionActors = new ChannelLocal[ConcurrentHashMap[String, AnyRef]]() - applicationLoader.foreach(MessageSerializer.setClassLoader(_)) + applicationLoader.foreach(MessageSerializer.setClassLoader(_)) //TODO: REVISIT: THIS FEELS A BIT DODGY /** * ChannelOpen overridden to store open channels for a clean postStop of a RemoteServer. diff --git a/akka-remote/src/main/scala/akka/remote/RemoteClient.scala b/akka-remote/src/main/scala/akka/remote/RemoteClient.scala deleted file mode 100644 index 0a27f04cb1..0000000000 --- a/akka-remote/src/main/scala/akka/remote/RemoteClient.scala +++ /dev/null @@ -1,515 +0,0 @@ -/** - * Copyright (C) 2009-2010 Scalable Solutions AB - */ - -package akka.remote - -import akka.remote.protocol.RemoteProtocol.{ActorType => ActorTypeProtocol, _} -import akka.actor.{Exit, Actor, ActorRef, ActorType, RemoteActorRef, IllegalActorStateException} -import akka.dispatch.{DefaultCompletableFuture, CompletableFuture} -import akka.actor.{Uuid,newUuid,uuidFrom} -import akka.config.Config._ -import akka.serialization.RemoteActorSerialization._ -import akka.AkkaException -import Actor._ - -import org.jboss.netty.channel._ -import group.DefaultChannelGroup -import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory -import org.jboss.netty.bootstrap.ClientBootstrap -import org.jboss.netty.handler.codec.frame.{ LengthFieldBasedFrameDecoder, LengthFieldPrepender } -import org.jboss.netty.handler.codec.compression.{ ZlibDecoder, ZlibEncoder } -import org.jboss.netty.handler.codec.protobuf.{ ProtobufDecoder, ProtobufEncoder } -import org.jboss.netty.handler.timeout.ReadTimeoutHandler -import org.jboss.netty.util.{ TimerTask, Timeout, HashedWheelTimer } -import org.jboss.netty.handler.ssl.SslHandler - -import java.net.{ SocketAddress, InetSocketAddress } -import java.util.concurrent.{ TimeUnit, Executors, ConcurrentMap, ConcurrentHashMap, ConcurrentSkipListSet } -import java.util.concurrent.atomic.{ AtomicLong, AtomicBoolean } - -import scala.collection.mutable.{ HashSet, HashMap } -import scala.reflect.BeanProperty - -import akka.actor._ -import akka.util._ - - -/** - * Life-cycle events for RemoteClient. - */ -sealed trait RemoteClientLifeCycleEvent -case class RemoteClientError( - @BeanProperty val cause: Throwable, - @BeanProperty val client: RemoteClient) extends RemoteClientLifeCycleEvent -case class RemoteClientDisconnected( - @BeanProperty val client: RemoteClient) extends RemoteClientLifeCycleEvent -case class RemoteClientConnected( - @BeanProperty val client: RemoteClient) extends RemoteClientLifeCycleEvent -case class RemoteClientStarted( - @BeanProperty val client: RemoteClient) extends RemoteClientLifeCycleEvent -case class RemoteClientShutdown( - @BeanProperty val client: RemoteClient) extends RemoteClientLifeCycleEvent - -/** - * Thrown for example when trying to send a message using a RemoteClient that is either not started or shut down. - */ -class RemoteClientException private[akka] (message: String, @BeanProperty val client: RemoteClient) extends AkkaException(message) - -/** - * The RemoteClient object manages RemoteClient instances and gives you an API to lookup remote actor handles. - * - * @author Jonas Bonér - */ -object RemoteClient extends Logging { - - val SECURE_COOKIE: Option[String] = { - val cookie = config.getString("akka.remote.secure-cookie", "") - if (cookie == "") None - else Some(cookie) - } - - val READ_TIMEOUT = Duration(config.getInt("akka.remote.client.read-timeout", 1), TIME_UNIT) - val RECONNECT_DELAY = Duration(config.getInt("akka.remote.client.reconnect-delay", 5), TIME_UNIT) - val MESSAGE_FRAME_SIZE = config.getInt("akka.remote.client.message-frame-size", 1048576) - - private val remoteClients = new HashMap[String, RemoteClient] - private val remoteActors = new HashMap[Address, HashSet[Uuid]] - - def actorFor(classNameOrServiceId: String, hostname: String, port: Int): ActorRef = - actorFor(classNameOrServiceId, classNameOrServiceId, Actor.TIMEOUT, hostname, port, None) - - def actorFor(classNameOrServiceId: String, hostname: String, port: Int, loader: ClassLoader): ActorRef = - actorFor(classNameOrServiceId, classNameOrServiceId, Actor.TIMEOUT, hostname, port, Some(loader)) - - def actorFor(serviceId: String, className: String, hostname: String, port: Int): ActorRef = - actorFor(serviceId, className, Actor.TIMEOUT, hostname, port, None) - - def actorFor(serviceId: String, className: String, hostname: String, port: Int, loader: ClassLoader): ActorRef = - actorFor(serviceId, className, Actor.TIMEOUT, hostname, port, Some(loader)) - - def actorFor(classNameOrServiceId: String, timeout: Long, hostname: String, port: Int): ActorRef = - actorFor(classNameOrServiceId, classNameOrServiceId, timeout, hostname, port, None) - - def actorFor(classNameOrServiceId: String, timeout: Long, hostname: String, port: Int, loader: ClassLoader): ActorRef = - actorFor(classNameOrServiceId, classNameOrServiceId, timeout, hostname, port, Some(loader)) - - def actorFor(serviceId: String, className: String, timeout: Long, hostname: String, port: Int): ActorRef = - RemoteActorRef(serviceId, className, hostname, port, timeout, false, None) - - def typedActorFor[T](intfClass: Class[T], serviceIdOrClassName: String, hostname: String, port: Int): T = { - typedActorFor(intfClass, serviceIdOrClassName, serviceIdOrClassName, Actor.TIMEOUT, hostname, port, None) - } - - def typedActorFor[T](intfClass: Class[T], serviceIdOrClassName: String, timeout: Long, hostname: String, port: Int): T = { - typedActorFor(intfClass, serviceIdOrClassName, serviceIdOrClassName, timeout, hostname, port, None) - } - - def typedActorFor[T](intfClass: Class[T], serviceIdOrClassName: String, timeout: Long, hostname: String, port: Int, loader: ClassLoader): T = { - typedActorFor(intfClass, serviceIdOrClassName, serviceIdOrClassName, timeout, hostname, port, Some(loader)) - } - - def typedActorFor[T](intfClass: Class[T], serviceId: String, implClassName: String, timeout: Long, hostname: String, port: Int, loader: ClassLoader): T = { - typedActorFor(intfClass, serviceId, implClassName, timeout, hostname, port, Some(loader)) - } - - private[akka] def typedActorFor[T](intfClass: Class[T], serviceId: String, implClassName: String, timeout: Long, hostname: String, port: Int, loader: Option[ClassLoader]): T = { - val actorRef = RemoteActorRef(serviceId, implClassName, hostname, port, timeout, false, loader, ActorType.TypedActor) - TypedActor.createProxyForRemoteActorRef(intfClass, actorRef) - } - - private[akka] def actorFor(serviceId: String, className: String, timeout: Long, hostname: String, port: Int, loader: ClassLoader): ActorRef = - RemoteActorRef(serviceId, className, hostname, port, timeout, false, Some(loader)) - - private[akka] def actorFor(serviceId: String, className: String, timeout: Long, hostname: String, port: Int, loader: Option[ClassLoader]): ActorRef = - RemoteActorRef(serviceId, className, hostname, port, timeout, false, loader) - - def clientFor(hostname: String, port: Int): RemoteClient = - clientFor(new InetSocketAddress(hostname, port), None) - - def clientFor(hostname: String, port: Int, loader: ClassLoader): RemoteClient = - clientFor(new InetSocketAddress(hostname, port), Some(loader)) - - def clientFor(address: InetSocketAddress): RemoteClient = - clientFor(address, None) - - def clientFor(address: InetSocketAddress, loader: ClassLoader): RemoteClient = - clientFor(address, Some(loader)) - - private[akka] def clientFor(hostname: String, port: Int, loader: Option[ClassLoader]): RemoteClient = - clientFor(new InetSocketAddress(hostname, port), loader) - - private[akka] def clientFor( - address: InetSocketAddress, loader: Option[ClassLoader]): RemoteClient = synchronized { - val hostname = address.getHostName - val port = address.getPort - val hash = hostname + ':' + port - loader.foreach(MessageSerializer.setClassLoader(_)) - if (remoteClients.contains(hash)) remoteClients(hash) - else { - val client = new RemoteClient(hostname, port, loader) - client.connect - remoteClients += hash -> client - client - } - } - - def shutdownClientFor(address: InetSocketAddress) = synchronized { - val hostname = address.getHostName - val port = address.getPort - val hash = hostname + ':' + port - if (remoteClients.contains(hash)) { - val client = remoteClients(hash) - client.shutdown - remoteClients -= hash - } - } - - /** - * Clean-up all open connections. - */ - def shutdownAll = synchronized { - remoteClients.foreach({ case (addr, client) => client.shutdown }) - remoteClients.clear - } - - def register(hostname: String, port: Int, uuid: Uuid) = synchronized { - actorsFor(Address(hostname, port)) += uuid - } - - private[akka] def unregister(hostname: String, port: Int, uuid: Uuid) = synchronized { - val set = actorsFor(Address(hostname, port)) - set -= uuid - if (set.isEmpty) shutdownClientFor(new InetSocketAddress(hostname, port)) - } - - private[akka] def actorsFor(remoteServerAddress: Address): HashSet[Uuid] = { - val set = remoteActors.get(remoteServerAddress) - if (set.isDefined && (set.get ne null)) set.get - else { - val remoteActorSet = new HashSet[Uuid] - remoteActors.put(remoteServerAddress, remoteActorSet) - remoteActorSet - } - } -} - -/** - * RemoteClient represents a connection to a RemoteServer. Is used to send messages to remote actors on the RemoteServer. - * - * @author Jonas Bonér - */ -class RemoteClient private[akka] ( - val hostname: String, val port: Int, val loader: Option[ClassLoader] = None) - extends Logging with ListenerManagement { - val name = "RemoteClient@" + hostname + "::" + port - - //FIXME Should these be clear:ed on postStop? - private val futures = new ConcurrentHashMap[Uuid, CompletableFuture[_]] - private val supervisors = new ConcurrentHashMap[Uuid, ActorRef] - - private val remoteAddress = new InetSocketAddress(hostname, port) - - //FIXME rewrite to a wrapper object (minimize volatile access and maximize encapsulation) - @volatile - private var bootstrap: ClientBootstrap = _ - @volatile - private[remote] var connection: ChannelFuture = _ - @volatile - private[remote] var openChannels: DefaultChannelGroup = _ - @volatile - private var timer: HashedWheelTimer = _ - private[remote] val runSwitch = new Switch() - private[remote] val isAuthenticated = new AtomicBoolean(false) - - private[remote] def isRunning = runSwitch.isOn - - private val reconnectionTimeWindow = Duration(config.getInt( - "akka.remote.client.reconnection-time-window", 600), TIME_UNIT).toMillis - @volatile - private var reconnectionTimeWindowStart = 0L - - def connect = runSwitch switchOn { - openChannels = new DefaultChannelGroup(classOf[RemoteClient].getName) - timer = new HashedWheelTimer - - bootstrap = new ClientBootstrap(new NioClientSocketChannelFactory(Executors.newCachedThreadPool, Executors.newCachedThreadPool)) - bootstrap.setPipelineFactory(new RemoteClientPipelineFactory(name, futures, supervisors, bootstrap, remoteAddress, timer, this)) - bootstrap.setOption("tcpNoDelay", true) - bootstrap.setOption("keepAlive", true) - - log.slf4j.info("Starting remote client connection to [{}:{}]", hostname, port) - - // Wait until the connection attempt succeeds or fails. - connection = bootstrap.connect(remoteAddress) - val channel = connection.awaitUninterruptibly.getChannel - openChannels.add(channel) - - if (!connection.isSuccess) { - notifyListeners(RemoteClientError(connection.getCause, this)) - log.slf4j.error("Remote client connection to [{}:{}] has failed", hostname, port) - log.slf4j.debug("Remote client connection failed", connection.getCause) - } - notifyListeners(RemoteClientStarted(this)) - } - - def shutdown = runSwitch switchOff { - log.slf4j.info("Shutting down {}", name) - notifyListeners(RemoteClientShutdown(this)) - timer.stop - timer = null - openChannels.close.awaitUninterruptibly - openChannels = null - bootstrap.releaseExternalResources - bootstrap = null - connection = null - log.slf4j.info("{} has been shut down", name) - } - - @deprecated("Use addListener instead") - def registerListener(actorRef: ActorRef) = addListener(actorRef) - - @deprecated("Use removeListener instead") - def deregisterListener(actorRef: ActorRef) = removeListener(actorRef) - - override def notifyListeners(message: => Any): Unit = super.notifyListeners(message) - - protected override def manageLifeCycleOfListeners = false - - def send[T]( - message: Any, - senderOption: Option[ActorRef], - senderFuture: Option[CompletableFuture[T]], - remoteAddress: InetSocketAddress, - timeout: Long, - isOneWay: Boolean, - actorRef: ActorRef, - typedActorInfo: Option[Tuple2[String, String]], - actorType: ActorType): Option[CompletableFuture[T]] = { - val cookie = if (isAuthenticated.compareAndSet(false, true)) RemoteClient.SECURE_COOKIE - else None - send(createRemoteMessageProtocolBuilder( - Some(actorRef), - Left(actorRef.uuid), - actorRef.id, - actorRef.actorClassName, - actorRef.timeout, - Left(message), - isOneWay, - senderOption, - typedActorInfo, - actorType, - cookie - ).build, senderFuture) - } - - def send[T]( - request: RemoteMessageProtocol, - senderFuture: Option[CompletableFuture[T]]): Option[CompletableFuture[T]] = { - if (isRunning) { - if (request.getOneWay) { - connection.getChannel.write(request) - None - } else { - val futureResult = if (senderFuture.isDefined) senderFuture.get - else new DefaultCompletableFuture[T](request.getActorInfo.getTimeout) - futures.put(uuidFrom(request.getUuid.getHigh, request.getUuid.getLow), futureResult) - connection.getChannel.write(request) - Some(futureResult) - } - } else { - val exception = new RemoteClientException( - "Remote client is not running, make sure you have invoked 'RemoteClient.connect' before using it.", this) - notifyListeners(RemoteClientError(exception, this)) - throw exception - } - } - - private[akka] def registerSupervisorForActor(actorRef: ActorRef) = - if (!actorRef.supervisor.isDefined) throw new IllegalActorStateException( - "Can't register supervisor for " + actorRef + " since it is not under supervision") - else supervisors.putIfAbsent(actorRef.supervisor.get.uuid, actorRef) - - private[akka] def deregisterSupervisorForActor(actorRef: ActorRef) = - if (!actorRef.supervisor.isDefined) throw new IllegalActorStateException( - "Can't unregister supervisor for " + actorRef + " since it is not under supervision") - else supervisors.remove(actorRef.supervisor.get.uuid) - - private[akka] def isWithinReconnectionTimeWindow: Boolean = { - if (reconnectionTimeWindowStart == 0L) { - reconnectionTimeWindowStart = System.currentTimeMillis - true - } else { - val timeLeft = reconnectionTimeWindow - (System.currentTimeMillis - reconnectionTimeWindowStart) - if (timeLeft > 0) { - log.slf4j.info("Will try to reconnect to remote server for another [{}] milliseconds", timeLeft) - true - } else false - } - } - - private[akka] def resetReconnectionTimeWindow = reconnectionTimeWindowStart = 0L -} - -/** - * @author Jonas Bonér - */ -class RemoteClientPipelineFactory( - name: String, - futures: ConcurrentMap[Uuid, CompletableFuture[_]], - supervisors: ConcurrentMap[Uuid, ActorRef], - bootstrap: ClientBootstrap, - remoteAddress: SocketAddress, - timer: HashedWheelTimer, - client: RemoteClient) extends ChannelPipelineFactory { - - def getPipeline: ChannelPipeline = { - def join(ch: ChannelHandler*) = Array[ChannelHandler](ch: _*) - - lazy val engine = { - val e = RemoteServerSslContext.client.createSSLEngine() - e.setEnabledCipherSuites(e.getSupportedCipherSuites) //TODO is this sensible? - e.setUseClientMode(true) - e - } - - val ssl = if (RemoteServer.SECURE) join(new SslHandler(engine)) else join() - val timeout = new ReadTimeoutHandler(timer, RemoteClient.READ_TIMEOUT.toMillis.toInt) - val lenDec = new LengthFieldBasedFrameDecoder(RemoteClient.MESSAGE_FRAME_SIZE, 0, 4, 0, 4) - val lenPrep = new LengthFieldPrepender(4) - val protobufDec = new ProtobufDecoder(RemoteMessageProtocol.getDefaultInstance) - val protobufEnc = new ProtobufEncoder - val (enc, dec) = RemoteServer.COMPRESSION_SCHEME match { - case "zlib" => (join(new ZlibEncoder(RemoteServer.ZLIB_COMPRESSION_LEVEL)), join(new ZlibDecoder)) - case _ => (join(), join()) - } - - val remoteClient = new RemoteClientHandler(name, futures, supervisors, bootstrap, remoteAddress, timer, client) - val stages = ssl ++ join(timeout) ++ dec ++ join(lenDec, protobufDec) ++ enc ++ join(lenPrep, protobufEnc, remoteClient) - new StaticChannelPipeline(stages: _*) - } -} - -/** - * @author Jonas Bonér - */ -@ChannelHandler.Sharable -class RemoteClientHandler( - val name: String, - val futures: ConcurrentMap[Uuid, CompletableFuture[_]], - val supervisors: ConcurrentMap[Uuid, ActorRef], - val bootstrap: ClientBootstrap, - val remoteAddress: SocketAddress, - val timer: HashedWheelTimer, - val client: RemoteClient) - extends SimpleChannelUpstreamHandler with Logging { - - override def handleUpstream(ctx: ChannelHandlerContext, event: ChannelEvent) = { - if (event.isInstanceOf[ChannelStateEvent] && - event.asInstanceOf[ChannelStateEvent].getState != ChannelState.INTEREST_OPS) { - log.slf4j.debug(event.toString) - } - super.handleUpstream(ctx, event) - } - - override def messageReceived(ctx: ChannelHandlerContext, event: MessageEvent) { - try { - val result = event.getMessage - if (result.isInstanceOf[RemoteMessageProtocol]) { - val reply = result.asInstanceOf[RemoteMessageProtocol] - val replyUuid = uuidFrom(reply.getUuid.getHigh, reply.getUuid.getLow) - log.debug("Remote client received RemoteMessageProtocol[\n{}]",reply) - val future = futures.get(replyUuid).asInstanceOf[CompletableFuture[Any]] - if (reply.hasMessage) { - if (future eq null) throw new IllegalActorStateException("Future mapped to UUID " + replyUuid + " does not exist") - val message = MessageSerializer.deserialize(reply.getMessage) - future.completeWithResult(message) - } else { - if (reply.hasSupervisorUuid()) { - val supervisorUuid = uuidFrom(reply.getSupervisorUuid.getHigh, reply.getSupervisorUuid.getLow) - if (!supervisors.containsKey(supervisorUuid)) throw new IllegalActorStateException( - "Expected a registered supervisor for UUID [" + supervisorUuid + "] but none was found") - val supervisedActor = supervisors.get(supervisorUuid) - if (!supervisedActor.supervisor.isDefined) throw new IllegalActorStateException( - "Can't handle restart for remote actor " + supervisedActor + " since its supervisor has been removed") - else supervisedActor.supervisor.get ! Exit(supervisedActor, parseException(reply, client.loader)) - } - val exception = parseException(reply, client.loader) - future.completeWithException(exception) - } - futures remove replyUuid - } else { - val exception = new RemoteClientException("Unknown message received in remote client handler: " + result, client) - client.notifyListeners(RemoteClientError(exception, client)) - throw exception - } - } catch { - case e: Exception => - client.notifyListeners(RemoteClientError(e, client)) - log.slf4j.error("Unexpected exception in remote client handler: {}", e) - throw e - } - } - - override def channelClosed(ctx: ChannelHandlerContext, event: ChannelStateEvent) = client.runSwitch ifOn { - if (client.isWithinReconnectionTimeWindow) { - timer.newTimeout(new TimerTask() { - def run(timeout: Timeout) = { - client.openChannels.remove(event.getChannel) - client.isAuthenticated.set(false) - log.slf4j.debug("Remote client reconnecting to [{}]", remoteAddress) - client.connection = bootstrap.connect(remoteAddress) - client.connection.awaitUninterruptibly // Wait until the connection attempt succeeds or fails. - if (!client.connection.isSuccess) { - client.notifyListeners(RemoteClientError(client.connection.getCause, client)) - log.slf4j.error("Reconnection to [{}] has failed", remoteAddress) - log.slf4j.debug("Reconnection failed", client.connection.getCause) - } - } - }, RemoteClient.RECONNECT_DELAY.toMillis, TimeUnit.MILLISECONDS) - } else spawn { client.shutdown } - } - - override def channelConnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = { - def connect = { - client.notifyListeners(RemoteClientConnected(client)) - log.slf4j.debug("Remote client connected to [{}]", ctx.getChannel.getRemoteAddress) - client.resetReconnectionTimeWindow - } - - if (RemoteServer.SECURE) { - val sslHandler: SslHandler = ctx.getPipeline.get(classOf[SslHandler]) - sslHandler.handshake.addListener(new ChannelFutureListener { - def operationComplete(future: ChannelFuture): Unit = { - if (future.isSuccess) connect - else throw new RemoteClientException("Could not establish SSL handshake", client) - } - }) - } else connect - } - - override def channelDisconnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = { - client.notifyListeners(RemoteClientDisconnected(client)) - log.slf4j.debug("Remote client disconnected from [{}]", ctx.getChannel.getRemoteAddress) - } - - override def exceptionCaught(ctx: ChannelHandlerContext, event: ExceptionEvent) = { - client.notifyListeners(RemoteClientError(event.getCause, client)) - if (event.getCause ne null) - log.slf4j.error("Unexpected exception from downstream in remote client", event.getCause) - else - log.slf4j.error("Unexpected exception from downstream in remote client: {}", event) - - event.getChannel.close - } - - private def parseException(reply: RemoteMessageProtocol, loader: Option[ClassLoader]): Throwable = { - val exception = reply.getException - val classname = exception.getClassname - val exceptionClass = if (loader.isDefined) loader.get.loadClass(classname) - else Class.forName(classname) - exceptionClass - .getConstructor(Array[Class[_]](classOf[String]): _*) - .newInstance(exception.getMessage).asInstanceOf[Throwable] - } -} diff --git a/akka-remote/src/main/scala/akka/serialization/SerializationProtocol.scala b/akka-remote/src/main/scala/akka/serialization/SerializationProtocol.scala index 258d210490..10eed2c362 100644 --- a/akka-remote/src/main/scala/akka/serialization/SerializationProtocol.scala +++ b/akka-remote/src/main/scala/akka/serialization/SerializationProtocol.scala @@ -17,7 +17,7 @@ import scala.collection.immutable.Stack import com.google.protobuf.ByteString import akka.util.ReflectiveAccess -import akka.util.ReflectiveAccess.RemoteServerModule.{HOSTNAME,PORT} +import akka.util.ReflectiveAccess.Remote.{HOSTNAME,PORT} /** * Type class definition for Actor Serialization @@ -191,6 +191,7 @@ object ActorSerialization { } val ar = new LocalActorRef( + ActorRegistry,//TODO: REVISIST: Change to an implicit ActorRegistryInstance? uuidFrom(protocol.getUuid.getHigh, protocol.getUuid.getLow), protocol.getId, protocol.getOriginalAddress.getHostname, @@ -230,6 +231,7 @@ object RemoteActorSerialization { private[akka] def fromProtobufToRemoteActorRef(protocol: RemoteActorRefProtocol, loader: Option[ClassLoader]): ActorRef = { Actor.log.slf4j.debug("Deserializing RemoteActorRefProtocol to RemoteActorRef:\n {}", protocol) RemoteActorRef( + ActorRegistry,//TODO: REVISIST: Change to an implicit ActorRegistryInstance? protocol.getClassOrServiceName, protocol.getActorClassname, protocol.getHomeAddress.getHostname, @@ -245,8 +247,10 @@ object RemoteActorSerialization { def toRemoteActorRefProtocol(ar: ActorRef): RemoteActorRefProtocol = { import ar._ - Actor.log.slf4j.debug("Register serialized Actor [{}] as remote @ [{}:{}]",Array[AnyRef](actorClassName, HOSTNAME, PORT.asInstanceOf[AnyRef])) - ActorRegistry.remote.registerByUuid(ar) + Actor.log.slf4j.debug("Register serialized Actor [{}] as remote @ [{}:{}]", + Array[AnyRef](actorClassName, registry.remote.hostname, registry.remote.port.asInstanceOf[AnyRef])) + + registry.remote.registerByUuid(ar) RemoteActorRefProtocol.newBuilder .setClassOrServiceName(uuid.toString) diff --git a/akka-remote/src/test/scala/remote/ClientInitiatedRemoteActorSpec.scala b/akka-remote/src/test/scala/remote/ClientInitiatedRemoteActorSpec.scala index a5e4159366..d5ee009142 100644 --- a/akka-remote/src/test/scala/remote/ClientInitiatedRemoteActorSpec.scala +++ b/akka-remote/src/test/scala/remote/ClientInitiatedRemoteActorSpec.scala @@ -4,10 +4,11 @@ import java.util.concurrent.{CountDownLatch, TimeUnit} import org.scalatest.junit.JUnitSuite import org.junit.{Test, Before, After} -import akka.remote.{RemoteServer, RemoteClient} import akka.dispatch.Dispatchers -import akka.actor.{ActorRef, Actor} -import Actor._ +import akka.remote. {NettyRemoteSupport, RemoteServer, RemoteClient} +import akka.actor. {RemoteActorRef, ActorRegistryInstance, ActorRef, Actor} + +class ExpectedRemoteProblem extends RuntimeException object ClientInitiatedRemoteActorSpec { case class Send(actor: Actor) @@ -28,8 +29,7 @@ object ClientInitiatedRemoteActorSpec { def receive = { case "Hello" => self.reply("World") - case "Failure" => - throw new RuntimeException("Expected exception; to test fault-tolerance") + case "Failure" => throw new ExpectedRemoteProblem } } @@ -40,6 +40,12 @@ object ClientInitiatedRemoteActorSpec { } } + class CountDownActor(latch: CountDownLatch) extends Actor { + def receive = { + case "World" => latch.countDown + } + } + object SendOneWayAndReplySenderActor { val latch = new CountDownLatch(1) } @@ -74,59 +80,54 @@ class ClientInitiatedRemoteActorSpec extends JUnitSuite { val HOSTNAME = "localhost" val PORT1 = 9990 val PORT2 = 9991 - var s1: RemoteServer = null + var s1,s2: ActorRegistryInstance = null private val unit = TimeUnit.MILLISECONDS @Before def init() { - s1 = new RemoteServer() - s1.start(HOSTNAME, PORT1) - Thread.sleep(1000) + s1 = new ActorRegistryInstance(Some(new NettyRemoteSupport(_))) + s2 = new ActorRegistryInstance(Some(new NettyRemoteSupport(_))) + s1.remote.start(HOSTNAME, PORT1) + s2.remote.start(HOSTNAME, PORT2) + Thread.sleep(2000) } @After def finished() { - s1.shutdown - val s2 = RemoteServer.serverFor(HOSTNAME, PORT2) - if (s2.isDefined) s2.get.shutdown - RemoteClient.shutdownAll + s1.remote.shutdown + s2.remote.shutdown + s1.shutdownAll + s2.shutdownAll Thread.sleep(1000) } @Test def shouldSendOneWay = { - val actor = actorOf[RemoteActorSpecActorUnidirectional] - actor.makeRemote(HOSTNAME, PORT1) - actor.start - actor ! "OneWay" + val clientManaged = s1.actorOf[RemoteActorSpecActorUnidirectional](HOSTNAME,PORT2).start + //implicit val self = Some(s2.actorOf[RemoteActorSpecActorUnidirectional].start) + assert(clientManaged ne null) + assert(clientManaged.getClass.equals(classOf[RemoteActorRef])) + clientManaged ! "OneWay" assert(RemoteActorSpecActorUnidirectional.latch.await(1, TimeUnit.SECONDS)) - actor.stop + clientManaged.stop } @Test def shouldSendOneWayAndReceiveReply = { - val actor = actorOf[SendOneWayAndReplyReceiverActor] - actor.makeRemote(HOSTNAME, PORT1) - actor.start - val sender = actorOf[SendOneWayAndReplySenderActor] - sender.homeAddress = (HOSTNAME, PORT2) - sender.actor.asInstanceOf[SendOneWayAndReplySenderActor].sendTo = actor - sender.start - sender.actor.asInstanceOf[SendOneWayAndReplySenderActor].sendOff - assert(SendOneWayAndReplySenderActor.latch.await(3, TimeUnit.SECONDS)) - assert(sender.actor.asInstanceOf[SendOneWayAndReplySenderActor].state.isDefined === true) - assert("World" === sender.actor.asInstanceOf[SendOneWayAndReplySenderActor].state.get.asInstanceOf[String]) - actor.stop - sender.stop + val latch = new CountDownLatch(1) + val actor = s2.actorOf[SendOneWayAndReplyReceiverActor](HOSTNAME, PORT1).start + implicit val sender = Some(s1.actorOf(new CountDownActor(latch)).start) + + actor ! "OneWay" + + assert(latch.await(3,TimeUnit.SECONDS)) } @Test def shouldSendBangBangMessageAndReceiveReply = { - val actor = actorOf[RemoteActorSpecActorBidirectional] - actor.makeRemote(HOSTNAME, PORT1) - actor.start + val actor = s2.actorOf[RemoteActorSpecActorBidirectional](HOSTNAME, PORT1).start val result = actor !! "Hello" assert("World" === result.get.asInstanceOf[String]) actor.stop @@ -134,29 +135,20 @@ class ClientInitiatedRemoteActorSpec extends JUnitSuite { @Test def shouldSendBangBangMessageAndReceiveReplyConcurrently = { - val actors = (1 to 10). - map(num => { - val a = actorOf[RemoteActorSpecActorBidirectional] - a.makeRemote(HOSTNAME, PORT1) - a.start - }).toList + val actors = (1 to 10).map(num => { s2.actorOf[RemoteActorSpecActorBidirectional](HOSTNAME, PORT1).start }).toList actors.map(_ !!! "Hello").foreach(future => assert("World" === future.await.result.asInstanceOf[Option[String]].get)) actors.foreach(_.stop) } @Test def shouldRegisterActorByUuid { - val actor1 = actorOf[MyActorCustomConstructor] - actor1.makeRemote(HOSTNAME, PORT1) - actor1.start + val actor1 = s2.actorOf[MyActorCustomConstructor](HOSTNAME, PORT1).start actor1 ! "incrPrefix" assert((actor1 !! "test").get === "1-test") actor1 ! "incrPrefix" assert((actor1 !! "test").get === "2-test") - val actor2 = actorOf[MyActorCustomConstructor] - actor2.makeRemote(HOSTNAME, PORT1) - actor2.start + val actor2 = s2.actorOf[MyActorCustomConstructor](HOSTNAME, PORT1).start assert((actor2 !! "test").get === "default-test") @@ -164,19 +156,11 @@ class ClientInitiatedRemoteActorSpec extends JUnitSuite { actor2.stop } - @Test + @Test(expected=classOf[ExpectedRemoteProblem]) def shouldSendAndReceiveRemoteException { implicit val timeout = 500000000L - val actor = actorOf[RemoteActorSpecActorBidirectional] - actor.makeRemote(HOSTNAME, PORT1) - actor.start - try { - actor !! "Failure" - fail("Should have thrown an exception") - } catch { - case e => - assert("Expected exception; to test fault-tolerance" === e.getMessage()) - } + val actor = s2.actorOf[RemoteActorSpecActorBidirectional](HOSTNAME, PORT1).start + actor !! "Failure" actor.stop } } diff --git a/akka-remote/src/test/scala/remote/RemoteSupervisorSpec.scala b/akka-remote/src/test/scala/remote/RemoteSupervisorSpec.scala index b340e89f45..017207cce7 100644 --- a/akka-remote/src/test/scala/remote/RemoteSupervisorSpec.scala +++ b/akka-remote/src/test/scala/remote/RemoteSupervisorSpec.scala @@ -76,9 +76,6 @@ object RemoteSupervisorSpec { var server: RemoteServer = null } -/** - * @author Jonas Bonér - */ class RemoteSupervisorSpec extends JUnitSuite { import RemoteSupervisorSpec._ diff --git a/akka-samples/akka-sample-remote/src/main/scala/ServerManagedRemoteActorSample.scala b/akka-samples/akka-sample-remote/src/main/scala/ServerManagedRemoteActorSample.scala index b81bc485da..7dad199ca5 100644 --- a/akka-samples/akka-sample-remote/src/main/scala/ServerManagedRemoteActorSample.scala +++ b/akka-samples/akka-sample-remote/src/main/scala/ServerManagedRemoteActorSample.scala @@ -4,10 +4,9 @@ package sample.remote -import akka.actor.Actor import akka.actor.Actor._ -import akka.remote.{RemoteClient, RemoteNode} import akka.util.Logging +import akka.actor. {ActorRegistry, Actor} class HelloWorldActor extends Actor { def receive = { @@ -20,9 +19,9 @@ class HelloWorldActor extends Actor { object ServerManagedRemoteActorServer extends Logging { def run = { - RemoteNode.start("localhost", 2552) + ActorRegistry.remote.start("localhost", 2552) log.slf4j.info("Remote node started") - RemoteNode.register("hello-service", actorOf[HelloWorldActor]) + ActorRegistry.remote.register("hello-service", actorOf[HelloWorldActor]) log.slf4j.info("Remote actor registered and started") } @@ -32,7 +31,7 @@ object ServerManagedRemoteActorServer extends Logging { object ServerManagedRemoteActorClient extends Logging { def run = { - val actor = RemoteClient.actorFor("hello-service", "localhost", 2552) + val actor = ActorRegistry.remote.actorFor("hello-service", "localhost", 2552) log.slf4j.info("Remote client created") log.slf4j.info("Sending 'Hello' to remote actor") val result = actor !! "Hello" diff --git a/akka-typed-actor/src/main/scala/akka/actor/TypedActor.scala b/akka-typed-actor/src/main/scala/akka/actor/TypedActor.scala index dbdf27b352..1672cdc3a8 100644 --- a/akka-typed-actor/src/main/scala/akka/actor/TypedActor.scala +++ b/akka-typed-actor/src/main/scala/akka/actor/TypedActor.scala @@ -841,8 +841,8 @@ private[akka] abstract class ActorAspect { val isOneWay = TypedActor.isOneWay(methodRtti) val (message: Array[AnyRef], isEscaped) = escapeArguments(methodRtti.getParameterValues) - - val future = RemoteClientModule.send[AnyRef]( + //TODO: REVISIT: MAKE REGISTRY COME FROM ACTORREF + val future = ActorRegistry.remote.send[AnyRef]( message, None, None, remoteAddress.get, timeout, isOneWay, actorRef, Some((interfaceClass.getName, methodRtti.getMethod.getName)),