diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala
index c87d1460f9..58ceaf44b3 100644
--- a/akka-actor/src/main/scala/akka/actor/Actor.scala
+++ b/akka-actor/src/main/scala/akka/actor/Actor.scala
@@ -17,18 +17,6 @@ import scala.reflect.BeanProperty
import akka.util. {ReflectiveAccess, Logging, Duration}
import akka.japi.Procedure
-/**
- * Extend this abstract class to create a remote actor.
- *
- * Equivalent to invoking the makeRemote(..) method in the body of the ActorJonas Bonér
- */
-abstract class RemoteActor(address: InetSocketAddress) extends Actor {
- def this(hostname: String, port: Int) = this(new InetSocketAddress(hostname, port))
- self.makeRemote(address)
-}
-
/**
* Life-cycle messages for the Actors
*/
@@ -155,17 +143,35 @@ object Actor extends Logging {
def actorOf[T <: Actor : Manifest]: ActorRef = actorOf(manifest[T].erasure.asInstanceOf[Class[_ <: Actor]])
/**
- * Creates an ActorRef out of the Actor with type T.
+ * Creates a Client-managed ActorRef out of the Actor of the specified Class.
+ * If the supplied host and port is identical of the configured local node, it will be a local actor
*
* import Actor._
- * val actor = actorOf[MyActor]
+ * val actor = actorOf[MyActor]("www.akka.io",2552)
* actor.start
* actor ! message
* actor.stop
*
* You can create and start the actor in one statement like this:
*
- * val actor = actorOf[MyActor].start
+ * val actor = actorOf[MyActor]("www.akka.io",2552).start
+ *
+ */
+ def actorOf[T <: Actor : Manifest](host: String, port: Int): ActorRef =
+ actorOf(manifest[T].erasure.asInstanceOf[Class[_ <: Actor]], host, port)
+
+ /**
+ * Creates an ActorRef out of the Actor of the specified Class.
+ *
+ * import Actor._
+ * val actor = actorOf(classOf[MyActor])
+ * actor.start
+ * actor ! message
+ * actor.stop
+ *
+ * You can create and start the actor in one statement like this:
+ *
+ * val actor = actorOf(classOf[MyActor]).start
*
*/
def actorOf(clazz: Class[_ <: Actor]): ActorRef = new LocalActorRef(() => {
@@ -178,6 +184,39 @@ object Actor extends Logging {
"\nOR try to change: 'actorOf[MyActor]' to 'actorOf(new MyActor)'."))
})
+ /**
+ * Creates a Client-managed ActorRef out of the Actor of the specified Class.
+ * If the supplied host and port is identical of the configured local node, it will be a local actor
+ *
+ * import Actor._
+ * val actor = actorOf(classOf[MyActor],"www.akka.io",2552)
+ * actor.start
+ * actor ! message
+ * actor.stop
+ *
+ * You can create and start the actor in one statement like this:
+ *
+ * val actor = actorOf(classOf[MyActor],"www.akka.io",2552).start
+ *
+ */
+ def actorOf(clazz: Class[_ <: Actor], host: String, port: Int, timeout: Long = TIMEOUT): ActorRef = {
+ import ReflectiveAccess._
+ import ReflectiveAccess.RemoteServerModule.{HOSTNAME,PORT}
+ ensureRemotingEnabled
+
+ (host,port) match {
+ case null => throw new IllegalArgumentException("No location specified")
+ case (HOSTNAME, PORT) => actorOf(clazz) //Local
+ case _ => new RemoteActorRef(clazz.getName,
+ clazz.getName,
+ host,
+ port,
+ timeout,
+ true, //Client managed
+ None)
+ }
+ }
+
/**
* Creates an ActorRef out of the Actor. Allows you to pass in a factory function
* that creates the Actor. Please note that this function can be invoked multiple
diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala
index dbaf013705..d938366b62 100644
--- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala
+++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala
@@ -77,8 +77,6 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] { scal
@volatile
protected[this] var _status: ActorRefInternals.StatusType = ActorRefInternals.UNSTARTED
@volatile
- protected[akka] var _homeAddress = new InetSocketAddress(RemoteServerModule.HOSTNAME, RemoteServerModule.PORT)
- @volatile
protected[akka] var _futureTimeout: Option[ScheduledFuture[AnyRef]] = None
protected[akka] val guard = new ReentrantGuard
@@ -390,62 +388,6 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] { scal
*/
def dispatcher: MessageDispatcher
- /**
- * Invoking 'makeRemote' means that an actor will be moved to and invoked on a remote host.
- */
- def makeRemote(hostname: String, port: Int): Unit
-
- /**
- * Invoking 'makeRemote' means that an actor will be moved to and invoked on a remote host.
- */
- def makeRemote(address: InetSocketAddress): Unit
-
- /**
- * Returns the home address and port for this actor.
- */
- def homeAddress: InetSocketAddress = _homeAddress
-
- /**
- * Akka Java API
- * Returns the home address and port for this actor.
- */
- def getHomeAddress(): InetSocketAddress = homeAddress
-
- /**
- * Set the home address and port for this actor.
- */
- def homeAddress_=(hostnameAndPort: Tuple2[String, Int]): Unit =
- homeAddress_=(new InetSocketAddress(hostnameAndPort._1, hostnameAndPort._2))
-
- /**
- * Akka Java API
- * Set the home address and port for this actor.
- */
- def setHomeAddress(hostname: String, port: Int): Unit = homeAddress = (hostname, port)
-
- /**
- * Set the home address and port for this actor.
- */
- def homeAddress_=(address: InetSocketAddress): Unit
-
- /**
- * Akka Java API
- * Set the home address and port for this actor.
- */
- def setHomeAddress(address: InetSocketAddress): Unit = homeAddress = address
-
- /**
- * Returns the remote address for the actor, if any, else None.
- */
- def remoteAddress: Option[InetSocketAddress]
- protected[akka] def remoteAddress_=(addr: Option[InetSocketAddress]): Unit
-
- /**
- * Akka Java API
- * Gets the remote address for the actor, if any, else None.
- */
- def getRemoteAddress(): Option[InetSocketAddress] = remoteAddress
-
/**
* Starts up the actor and its message queue.
*/
@@ -562,7 +504,8 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] { scal
protected[akka] def restartLinkedActors(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]): Unit
- protected[akka] def registerSupervisorAsRemoteActor: Option[Uuid]
+ //TODO: REVISIT: REMOVE
+ //protected[akka] def registerSupervisorAsRemoteActor: Option[Uuid]
protected[akka] def linkedActors: JMap[Uuid, ActorRef]
@@ -601,8 +544,6 @@ class LocalActorRef private[akka] (
private[this] val actorFactory: () => Actor)
extends ActorRef with ScalaActorRef {
- @volatile
- private[akka] var _remoteAddress: Option[InetSocketAddress] = None // only mutable to maintain identity across nodes
@volatile
private[akka] lazy val _linkedActors = new ConcurrentHashMap[Uuid, ActorRef]
@volatile
@@ -635,7 +576,6 @@ class LocalActorRef private[akka] (
this(__factory)
_uuid = __uuid
id = __id
- homeAddress = (__hostname, __port)
timeout = __timeout
receiveTimeout = __receiveTimeout
lifeCycle = __lifeCycle
@@ -674,41 +614,6 @@ class LocalActorRef private[akka] (
*/
def dispatcher: MessageDispatcher = _dispatcher
- /**
- * Invoking 'makeRemote' means that an actor will be moved to and invoked on a remote host.
- */
- def makeRemote(hostname: String, port: Int): Unit = {
- ensureRemotingEnabled
- if (!isRunning || isBeingRestarted) makeRemote(new InetSocketAddress(hostname, port))
- else throw new ActorInitializationException(
- "Can't make a running actor remote. Make sure you call 'makeRemote' before 'start'.")
- }
-
- /**
- * Invoking 'makeRemote' means that an actor will be moved to and invoked on a remote host.
- */
- def makeRemote(address: InetSocketAddress): Unit = guard.withGuard {
- ensureRemotingEnabled
- if (!isRunning || isBeingRestarted) {
- _remoteAddress = Some(address)
- RemoteClientModule.register(address, uuid)
- homeAddress = (RemoteServerModule.HOSTNAME, RemoteServerModule.PORT)
- } else throw new ActorInitializationException(
- "Can't make a running actor remote. Make sure you call 'makeRemote' before 'start'.")
- }
-
- /**
- * Set the contact address for this actor. This is used for replying to messages
- * sent asynchronously when no reply channel exists.
- */
- def homeAddress_=(address: InetSocketAddress): Unit = _homeAddress = address
-
- /**
- * Returns the remote address for the actor, if any, else None.
- */
- def remoteAddress: Option[InetSocketAddress] = _remoteAddress
- protected[akka] def remoteAddress_=(addr: Option[InetSocketAddress]): Unit = _remoteAddress = addr
-
/**
* Starts up the actor and its message queue.
*/
@@ -740,11 +645,6 @@ class LocalActorRef private[akka] (
_status = ActorRefInternals.SHUTDOWN
actor.postStop
ActorRegistry.unregister(this)
- if (isRemotingEnabled) {
- if (remoteAddress.isDefined)
- RemoteClientModule.unregister(remoteAddress.get, uuid)
- RemoteServerModule.unregister(this)
- }
setActorSelfFields(actorInstance.get,null)
} //else if (isBeingRestarted) throw new ActorKilledException("Actor [" + toString + "] is being restarted.")
}
@@ -795,11 +695,8 @@ class LocalActorRef private[akka] (
*
* To be invoked from within the actor itself.
*/
- def startLinkRemote(actorRef: ActorRef, hostname: String, port: Int): Unit = guard.withGuard {
- ensureRemotingEnabled
- actorRef.makeRemote(hostname, port)
- link(actorRef)
- actorRef.start
+ def startLinkRemote(actorRef: ActorRef, hostname: String, port: Int): Unit = {
+ //TODO: REVISIT: REMOVED
}
/**
@@ -818,10 +715,7 @@ class LocalActorRef private[akka] (
*/
def spawnRemote(clazz: Class[_ <: Actor], hostname: String, port: Int): ActorRef = guard.withGuard {
ensureRemotingEnabled
- val actor = Actor.actorOf(clazz)
- actor.makeRemote(hostname, port)
- actor.start
- actor
+ Actor.actorOf(clazz, hostname, port).start
}
/**
@@ -843,8 +737,7 @@ class LocalActorRef private[akka] (
*/
def spawnLinkRemote(clazz: Class[_ <: Actor], hostname: String, port: Int): ActorRef = guard.withGuard {
ensureRemotingEnabled
- val actor = Actor.actorOf(clazz)
- actor.makeRemote(hostname, port)
+ val actor = Actor.actorOf(clazz, hostname, port)
link(actor)
actor.start
actor
@@ -878,13 +771,8 @@ class LocalActorRef private[akka] (
protected[akka] def supervisor_=(sup: Option[ActorRef]): Unit = _supervisor = sup
protected[akka] def postMessageToMailbox(message: Any, senderOption: Option[ActorRef]): Unit = {
- if (remoteAddress.isDefined && isRemotingEnabled) {
- RemoteClientModule.send[Any](
- message, senderOption, None, remoteAddress.get, timeout, true, this, None, ActorType.ScalaActor)
- } else {
- val invocation = new MessageInvocation(this, message, senderOption, None)
- dispatcher dispatchMessage invocation
- }
+ val invocation = new MessageInvocation(this, message, senderOption, None)
+ dispatcher dispatchMessage invocation
}
protected[akka] def postMessageToMailboxAndCreateFutureResultWithTimeout[T](
@@ -892,19 +780,11 @@ class LocalActorRef private[akka] (
timeout: Long,
senderOption: Option[ActorRef],
senderFuture: Option[CompletableFuture[T]]): CompletableFuture[T] = {
-
- if (remoteAddress.isDefined && isRemotingEnabled) {
- val future = RemoteClientModule.send[T](
- message, senderOption, senderFuture, remoteAddress.get, timeout, false, this, None, ActorType.ScalaActor)
- if (future.isDefined) future.get
- else throw new IllegalActorStateException("Expected a future from remote call to actor " + toString)
- } else {
val future = if (senderFuture.isDefined) senderFuture else Some(new DefaultCompletableFuture[T](timeout))
val invocation = new MessageInvocation(
this, message, senderOption, future.asInstanceOf[Some[CompletableFuture[Any]]])
dispatcher dispatchMessage invocation
future.get
- }
}
/**
@@ -1061,13 +941,15 @@ class LocalActorRef private[akka] (
}
}
+ //TODO: REVISIT: REMOVE
+ /*
protected[akka] def registerSupervisorAsRemoteActor: Option[Uuid] = guard.withGuard {
ensureRemotingEnabled
if (_supervisor.isDefined) {
remoteAddress.foreach(address => RemoteClientModule.registerSupervisorForActor(address, this))
Some(_supervisor.get.uuid)
} else None
- }
+ }*/
protected[akka] def linkedActors: JMap[Uuid, ActorRef] = _linkedActors
@@ -1222,13 +1104,14 @@ private[akka] case class RemoteActorRef private[akka] (
val hostname: String,
val port: Int,
_timeout: Long,
+ clientManaged: Boolean,
loader: Option[ClassLoader],
val actorType: ActorType = ActorType.ScalaActor)
extends ActorRef with ScalaActorRef {
ensureRemotingEnabled
- val remoteAddress: Option[InetSocketAddress] = Some(new InetSocketAddress(hostname, port))
+ val homeAddress = new InetSocketAddress(hostname, port)
id = classOrServiceName
timeout = _timeout
@@ -1237,7 +1120,7 @@ private[akka] case class RemoteActorRef private[akka] (
def postMessageToMailbox(message: Any, senderOption: Option[ActorRef]): Unit =
RemoteClientModule.send[Any](
- message, senderOption, None, remoteAddress.get, timeout, true, this, None, actorType)
+ message, senderOption, None, homeAddress, timeout, true, this, None, actorType)
def postMessageToMailboxAndCreateFutureResultWithTimeout[T](
message: Any,
@@ -1245,13 +1128,16 @@ private[akka] case class RemoteActorRef private[akka] (
senderOption: Option[ActorRef],
senderFuture: Option[CompletableFuture[T]]): CompletableFuture[T] = {
val future = RemoteClientModule.send[T](
- message, senderOption, senderFuture, remoteAddress.get, timeout, false, this, None, actorType)
+ message, senderOption, senderFuture, homeAddress, timeout, false, this, None, actorType)
if (future.isDefined) future.get
else throw new IllegalActorStateException("Expected a future from remote call to actor " + toString)
}
def start: ActorRef = synchronized {
_status = ActorRefInternals.RUNNING
+ if (clientManaged) {
+ RemoteClientModule.register(homeAddress, uuid)
+ }
this
}
@@ -1259,18 +1145,20 @@ private[akka] case class RemoteActorRef private[akka] (
if (_status == ActorRefInternals.RUNNING) {
_status = ActorRefInternals.SHUTDOWN
postMessageToMailbox(RemoteActorSystemMessage.Stop, None)
+ if (clientManaged) {
+ RemoteClientModule.unregister(homeAddress, uuid)
+ ActorRegistry.remote.unregister(this) //TODO: Why does this need to be deregistered from the server?
+ }
}
}
- protected[akka] def registerSupervisorAsRemoteActor: Option[Uuid] = None
+ //TODO: REVISIT: REMOVE
+ //protected[akka] def registerSupervisorAsRemoteActor: Option[Uuid] = None
// ==== NOT SUPPORTED ====
def actorClass: Class[_ <: Actor] = unsupported
def dispatcher_=(md: MessageDispatcher): Unit = unsupported
def dispatcher: MessageDispatcher = unsupported
- def makeRemote(hostname: String, port: Int): Unit = unsupported
- def makeRemote(address: InetSocketAddress): Unit = unsupported
- def homeAddress_=(address: InetSocketAddress): Unit = unsupported
def link(actorRef: ActorRef): Unit = unsupported
def unlink(actorRef: ActorRef): Unit = unsupported
def startLink(actorRef: ActorRef): Unit = unsupported
@@ -1288,7 +1176,6 @@ private[akka] case class RemoteActorRef private[akka] (
protected[akka] def restartLinkedActors(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]): Unit = unsupported
protected[akka] def linkedActors: JMap[Uuid, ActorRef] = unsupported
protected[akka] def invoke(messageHandle: MessageInvocation): Unit = unsupported
- protected[akka] def remoteAddress_=(addr: Option[InetSocketAddress]): Unit = unsupported
protected[akka] def supervisor_=(sup: Option[ActorRef]): Unit = unsupported
protected[akka] def actorInstance: AtomicReference[Actor] = unsupported
private def unsupported = throw new UnsupportedOperationException("Not supported for RemoteActorRef")
diff --git a/akka-actor/src/main/scala/akka/actor/ActorRegistry.scala b/akka-actor/src/main/scala/akka/actor/ActorRegistry.scala
index 68fdb50d85..d80e5273da 100644
--- a/akka-actor/src/main/scala/akka/actor/ActorRegistry.scala
+++ b/akka-actor/src/main/scala/akka/actor/ActorRegistry.scala
@@ -12,8 +12,9 @@ import java.util.{Set => JSet}
import annotation.tailrec
import akka.util.ReflectiveAccess._
-import akka.util.{ReadWriteGuard, Address, ListenerManagement}
import java.net.InetSocketAddress
+import akka.util. {ReadWriteGuard, Address, ListenerManagement}
+import akka.remoteinterface.RemoteServerModule
/**
* Base trait for ActorRegistry events, allows listen to when an actor is added and removed from the ActorRegistry.
@@ -223,9 +224,19 @@ object ActorRegistry extends ListenerManagement {
TypedActorModule.typedActorObjectInstance.get.proxyFor(actorRef)
}
+ /**
+ * Handy access to the RemoteServer module
+ */
+ lazy val remote: RemoteServerModule = getObjectFor("akka.remote.RemoteNode$") match {
+ case Some(module) => module
+ case None =>
+ log.slf4j.error("Wanted remote module but didn't exist on classpath")
+ null
+ }
+
/**
- * Registers an actor in the ActorRegistry.
+ * Registers an actor in the ActorRegistry.
*/
private[akka] def register(actor: ActorRef) = {
// ID
diff --git a/akka-actor/src/main/scala/akka/actor/Supervisor.scala b/akka-actor/src/main/scala/akka/actor/Supervisor.scala
index 351cdecf89..daf7a962de 100644
--- a/akka-actor/src/main/scala/akka/actor/Supervisor.scala
+++ b/akka-actor/src/main/scala/akka/actor/Supervisor.scala
@@ -130,7 +130,7 @@ sealed class Supervisor(handler: FaultHandlingStrategy) {
case SupervisorConfig(_, servers) =>
servers.map(server =>
server match {
- case Supervise(actorRef, lifeCycle, remoteAddress) =>
+ case Supervise(actorRef, lifeCycle, registerAsRemoteService) =>
actorRef.start
val className = actorRef.actor.getClass.getName
val currentActors = {
@@ -141,10 +141,8 @@ sealed class Supervisor(handler: FaultHandlingStrategy) {
_childActors.put(className, actorRef :: currentActors)
actorRef.lifeCycle = lifeCycle
supervisor.link(actorRef)
- if (remoteAddress.isDefined) {
- val address = remoteAddress.get
- RemoteServerModule.registerActor(new InetSocketAddress(address.hostname, address.port), actorRef)
- }
+ if (registerAsRemoteService)
+ ActorRegistry.remote.register(actorRef)
case supervisorConfig @ SupervisorConfig(_, _) => // recursive supervisor configuration
val childSupervisor = Supervisor(supervisorConfig)
supervisor.link(childSupervisor.supervisor)
diff --git a/akka-actor/src/main/scala/akka/actor/UntypedActor.scala b/akka-actor/src/main/scala/akka/actor/UntypedActor.scala
index 9eec8cbb5d..668dad7f86 100644
--- a/akka-actor/src/main/scala/akka/actor/UntypedActor.scala
+++ b/akka-actor/src/main/scala/akka/actor/UntypedActor.scala
@@ -92,16 +92,6 @@ abstract class UntypedActor extends Actor {
*/
trait UntypedActorFactory extends Creator[Actor]
-/**
- * Extend this abstract class to create a remote UntypedActor.
- *
- * @author Jonas Bonér
- */
-abstract class RemoteUntypedActor(address: InetSocketAddress) extends UntypedActor {
- def this(hostname: String, port: Int) = this(new InetSocketAddress(hostname, port))
- self.makeRemote(address)
-}
-
/**
* Factory object for creating and managing 'UntypedActor's. Meant to be used from Java.
*
diff --git a/akka-actor/src/main/scala/akka/config/SupervisionConfig.scala b/akka-actor/src/main/scala/akka/config/SupervisionConfig.scala
index f3d6e1ada9..f7771d80b5 100644
--- a/akka-actor/src/main/scala/akka/config/SupervisionConfig.scala
+++ b/akka-actor/src/main/scala/akka/config/SupervisionConfig.scala
@@ -26,20 +26,16 @@ object Supervision {
def this(restartStrategy: FaultHandlingStrategy, worker: Array[Server]) = this(restartStrategy,worker.toList)
}
- class Supervise(val actorRef: ActorRef, val lifeCycle: LifeCycle, val remoteAddress: Option[RemoteAddress]) extends Server {
- //Java API
- def this(actorRef: ActorRef, lifeCycle: LifeCycle, remoteAddress: RemoteAddress) =
- this(actorRef, lifeCycle, Option(remoteAddress))
-
+ class Supervise(val actorRef: ActorRef, val lifeCycle: LifeCycle, val registerAsRemoteService: Boolean = false) extends Server {
//Java API
def this(actorRef: ActorRef, lifeCycle: LifeCycle) =
- this(actorRef, lifeCycle, None)
+ this(actorRef, lifeCycle, false)
}
object Supervise {
- def apply(actorRef: ActorRef, lifeCycle: LifeCycle, remoteAddress: RemoteAddress) = new Supervise(actorRef, lifeCycle, remoteAddress)
- def apply(actorRef: ActorRef, lifeCycle: LifeCycle) = new Supervise(actorRef, lifeCycle, None)
- def unapply(supervise: Supervise) = Some((supervise.actorRef, supervise.lifeCycle, supervise.remoteAddress))
+ def apply(actorRef: ActorRef, lifeCycle: LifeCycle, registerAsRemoteService: Boolean = false) = new Supervise(actorRef, lifeCycle, registerAsRemoteService)
+ def apply(actorRef: ActorRef, lifeCycle: LifeCycle) = new Supervise(actorRef, lifeCycle, false)
+ def unapply(supervise: Supervise) = Some((supervise.actorRef, supervise.lifeCycle, supervise.registerAsRemoteService))
}
object AllForOneStrategy {
diff --git a/akka-actor/src/main/scala/akka/remoteinterface/RemoteInterface.scala b/akka-actor/src/main/scala/akka/remoteinterface/RemoteInterface.scala
new file mode 100644
index 0000000000..4f6cd11a67
--- /dev/null
+++ b/akka-actor/src/main/scala/akka/remoteinterface/RemoteInterface.scala
@@ -0,0 +1,149 @@
+/**
+ * Copyright (C) 2009-2010 Scalable Solutions AB
+ */
+
+package akka.remoteinterface
+
+import akka.japi.Creator
+import akka.actor.ActorRef
+import akka.util.{ReentrantGuard, Logging, ListenerManagement}
+
+/**
+ * This is the interface for the RemoteServer functionality, it's used in ActorRegistry.remote
+ */
+trait RemoteServerModule extends ListenerManagement with Logging {
+ protected val guard = new ReentrantGuard
+
+ /**
+ * Signals whether the server is up and running or not
+ */
+ def isRunning: Boolean
+
+ /**
+ * Gets the name of the server instance
+ */
+ def name: String
+
+ /**
+ * Gets the current hostname of the server instance
+ */
+ def hostname: String
+
+ /**
+ * Gets the current port of the server instance
+ */
+ def port: Int
+
+ /**
+ * Starts the server up
+ */
+ def start(host: String, port: Int, loader: Option[ClassLoader] = None): RemoteServerModule //TODO possibly hidden
+
+ /**
+ * Shuts the server down
+ */
+ def shutdown: Unit //TODO possibly hidden
+
+ /**
+ * Register typed actor by interface name.
+ */
+ def registerTypedActor(intfClass: Class[_], typedActor: AnyRef) : Unit = registerTypedActor(intfClass.getName, typedActor)
+
+ /**
+ * Register remote typed actor by a specific id.
+ * @param id custom actor id
+ * @param typedActor typed actor to register
+ */
+ def registerTypedActor(id: String, typedActor: AnyRef): Unit
+
+ /**
+ * Register typed actor by interface name.
+ */
+ def registerTypedPerSessionActor(intfClass: Class[_], factory: => AnyRef) : Unit = registerTypedActor(intfClass.getName, factory)
+
+ /**
+ * Register typed actor by interface name.
+ * Java API
+ */
+ def registerTypedPerSessionActor(intfClass: Class[_], factory: Creator[AnyRef]) : Unit = registerTypedActor(intfClass.getName, factory)
+
+ /**
+ * Register remote typed actor by a specific id.
+ * @param id custom actor id
+ * @param typedActor typed actor to register
+ */
+ def registerTypedPerSessionActor(id: String, factory: => AnyRef): Unit
+
+ /**
+ * Register remote typed actor by a specific id.
+ * @param id custom actor id
+ * @param typedActor typed actor to register
+ * Java API
+ */
+ def registerTypedPerSessionActor(id: String, factory: Creator[AnyRef]): Unit = registerTypedPerSessionActor(id, factory.create)
+
+ /**
+ * Register Remote Actor by the Actor's 'id' field. It starts the Actor if it is not started already.
+ */
+ def register(actorRef: ActorRef): Unit = register(actorRef.id, actorRef)
+
+ /**
+ * Register Remote Actor by the Actor's uuid field. It starts the Actor if it is not started already.
+ */
+ def registerByUuid(actorRef: ActorRef): Unit
+
+ /**
+ * Register Remote Actor by a specific 'id' passed as argument.
+ *
+ * NOTE: If you use this method to register your remote actor then you must unregister the actor by this ID yourself.
+ */
+ def register(id: String, actorRef: ActorRef): Unit
+
+ /**
+ * Register Remote Session Actor by a specific 'id' passed as argument.
+ *
+ * NOTE: If you use this method to register your remote actor then you must unregister the actor by this ID yourself.
+ */
+ def registerPerSession(id: String, factory: => ActorRef): Unit
+
+ /**
+ * Register Remote Session Actor by a specific 'id' passed as argument.
+ *
+ * NOTE: If you use this method to register your remote actor then you must unregister the actor by this ID yourself.
+ * Java API
+ */
+ def registerPerSession(id: String, factory: Creator[ActorRef]): Unit = registerPerSession(id, factory.create)
+
+ /**
+ * Unregister Remote Actor that is registered using its 'id' field (not custom ID).
+ */
+ def unregister(actorRef: ActorRef): Unit
+
+ /**
+ * Unregister Remote Actor by specific 'id'.
+ *
+ * NOTE: You need to call this method if you have registered an actor by a custom ID.
+ */
+ def unregister(id: String): Unit
+
+ /**
+ * Unregister Remote Actor by specific 'id'.
+ *
+ * NOTE: You need to call this method if you have registered an actor by a custom ID.
+ */
+ def unregisterPerSession(id: String): Unit
+
+ /**
+ * Unregister Remote Typed Actor by specific 'id'.
+ *
+ * NOTE: You need to call this method if you have registered an actor by a custom ID.
+ */
+ def unregisterTypedActor(id: String): Unit
+
+ /**
+ * Unregister Remote Typed Actor by specific 'id'.
+ *
+ * NOTE: You need to call this method if you have registered an actor by a custom ID.
+ */
+ def unregisterTypedPerSessionActor(id: String): Unit
+}
\ No newline at end of file
diff --git a/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala b/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala
index 45e1609b79..14bde14e89 100644
--- a/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala
+++ b/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala
@@ -107,36 +107,6 @@ object ReflectiveAccess extends Logging {
object RemoteServerModule {
val HOSTNAME = Config.config.getString("akka.remote.server.hostname", "localhost")
val PORT = Config.config.getInt("akka.remote.server.port", 2552)
-
- type RemoteServerObject = {
- def registerActor(address: InetSocketAddress, actor: ActorRef): Unit
- def registerTypedActor(address: InetSocketAddress, name: String, typedActor: AnyRef): Unit
- }
-
- type RemoteNodeObject = {
- def unregister(actorRef: ActorRef): Unit
- }
-
- val remoteServerObjectInstance: Option[RemoteServerObject] =
- getObjectFor("akka.remote.RemoteServer$")
-
- val remoteNodeObjectInstance: Option[RemoteNodeObject] =
- getObjectFor("akka.remote.RemoteNode$")
-
- def registerActor(address: InetSocketAddress, actorRef: ActorRef) = {
- RemoteClientModule.ensureEnabled
- remoteServerObjectInstance.get.registerActor(address, actorRef)
- }
-
- def registerTypedActor(address: InetSocketAddress, implementationClassName: String, proxy: AnyRef) = {
- RemoteClientModule.ensureEnabled
- remoteServerObjectInstance.get.registerTypedActor(address, implementationClassName, proxy)
- }
-
- def unregister(actorRef: ActorRef) = {
- RemoteClientModule.ensureEnabled
- remoteNodeObjectInstance.get.unregister(actorRef)
- }
}
/**
diff --git a/akka-remote/src/main/scala/akka/remote/BootableRemoteActorService.scala b/akka-remote/src/main/scala/akka/remote/BootableRemoteActorService.scala
index 70b6154fda..9c2eeb9c2c 100644
--- a/akka-remote/src/main/scala/akka/remote/BootableRemoteActorService.scala
+++ b/akka-remote/src/main/scala/akka/remote/BootableRemoteActorService.scala
@@ -4,9 +4,9 @@
package akka.remote
-import akka.actor.BootableActorLoaderService
-import akka.util.{Bootable, Logging}
import akka.config.Config.config
+import akka.actor. {ActorRegistry, BootableActorLoaderService}
+import akka.util. {ReflectiveAccess, Bootable, Logging}
/**
* This bundle/service is responsible for booting up and shutting down the remote actors facility
@@ -17,10 +17,8 @@ trait BootableRemoteActorService extends Bootable with Logging {
self: BootableActorLoaderService =>
protected lazy val remoteServerThread = new Thread(new Runnable() {
- def run = {
- if (self.applicationLoader.isDefined) RemoteNode.start(self.applicationLoader.get)
- else RemoteNode.start
- }
+ import ReflectiveAccess.RemoteServerModule.{HOSTNAME,PORT}
+ def run = ActorRegistry.remote.start(HOSTNAME,PORT,loader = self.applicationLoader)
}, "Akka Remote Service")
def startRemoteService = remoteServerThread.start
diff --git a/akka-remote/src/main/scala/akka/remote/RemoteClient.scala b/akka-remote/src/main/scala/akka/remote/RemoteClient.scala
index 2b137d8788..0a27f04cb1 100644
--- a/akka-remote/src/main/scala/akka/remote/RemoteClient.scala
+++ b/akka-remote/src/main/scala/akka/remote/RemoteClient.scala
@@ -95,7 +95,7 @@ object RemoteClient extends Logging {
actorFor(classNameOrServiceId, classNameOrServiceId, timeout, hostname, port, Some(loader))
def actorFor(serviceId: String, className: String, timeout: Long, hostname: String, port: Int): ActorRef =
- RemoteActorRef(serviceId, className, hostname, port, timeout, None)
+ RemoteActorRef(serviceId, className, hostname, port, timeout, false, None)
def typedActorFor[T](intfClass: Class[T], serviceIdOrClassName: String, hostname: String, port: Int): T = {
typedActorFor(intfClass, serviceIdOrClassName, serviceIdOrClassName, Actor.TIMEOUT, hostname, port, None)
@@ -114,15 +114,15 @@ object RemoteClient extends Logging {
}
private[akka] def typedActorFor[T](intfClass: Class[T], serviceId: String, implClassName: String, timeout: Long, hostname: String, port: Int, loader: Option[ClassLoader]): T = {
- val actorRef = RemoteActorRef(serviceId, implClassName, hostname, port, timeout, loader, ActorType.TypedActor)
+ val actorRef = RemoteActorRef(serviceId, implClassName, hostname, port, timeout, false, loader, ActorType.TypedActor)
TypedActor.createProxyForRemoteActorRef(intfClass, actorRef)
}
private[akka] def actorFor(serviceId: String, className: String, timeout: Long, hostname: String, port: Int, loader: ClassLoader): ActorRef =
- RemoteActorRef(serviceId, className, hostname, port, timeout, Some(loader))
+ RemoteActorRef(serviceId, className, hostname, port, timeout, false, Some(loader))
private[akka] def actorFor(serviceId: String, className: String, timeout: Long, hostname: String, port: Int, loader: Option[ClassLoader]): ActorRef =
- RemoteActorRef(serviceId, className, hostname, port, timeout, loader)
+ RemoteActorRef(serviceId, className, hostname, port, timeout, false, loader)
def clientFor(hostname: String, port: Int): RemoteClient =
clientFor(new InetSocketAddress(hostname, port), None)
diff --git a/akka-remote/src/main/scala/akka/remote/RemoteServer.scala b/akka-remote/src/main/scala/akka/remote/RemoteServer.scala
index 9fa86e25df..1ec427d0ab 100644
--- a/akka-remote/src/main/scala/akka/remote/RemoteServer.scala
+++ b/akka-remote/src/main/scala/akka/remote/RemoteServer.scala
@@ -32,6 +32,7 @@ import scala.collection.mutable.Map
import scala.reflect.BeanProperty
import akka.dispatch. {Future, DefaultCompletableFuture, CompletableFuture}
import akka.japi.Creator
+import akka.remoteinterface.RemoteServerModule
/**
* Use this object if you need a single remote server on a specific node.
@@ -112,45 +113,6 @@ object RemoteServer {
true
} else */false
}
-
- private val guard = new ReadWriteGuard
- private val remoteServers = Map[Address, RemoteServer]()
-
- def serverFor(address: InetSocketAddress): Option[RemoteServer] =
- serverFor(address.getHostName, address.getPort)
-
- def serverFor(hostname: String, port: Int): Option[RemoteServer] = guard.withReadGuard {
- remoteServers.get(Address(hostname, port))
- }
-
- private[akka] def getOrCreateServer(address: InetSocketAddress): RemoteServer = guard.withWriteGuard {
- serverFor(address) match {
- case Some(server) => server
- case None => (new RemoteServer).start(address)
- }
- }
-
- private[akka] def register(hostname: String, port: Int, server: RemoteServer) = guard.withWriteGuard {
- remoteServers.put(Address(hostname, port), server)
- }
-
- private[akka] def unregister(hostname: String, port: Int) = guard.withWriteGuard {
- remoteServers.remove(Address(hostname, port))
- }
-
- /**
- * Used in REflectiveAccess
- */
- private[akka] def registerActor(address: InetSocketAddress, actorRef: ActorRef) {
- serverFor(address) foreach { _.register(actorRef) }
- }
-
- /**
- * Used in Reflective
- */
- private[akka] def registerTypedActor(address: InetSocketAddress, implementationClassName: String, proxy: AnyRef) {
- serverFor(address) foreach { _.registerTypedActor(implementationClassName,proxy)}
- }
}
/**
@@ -190,52 +152,31 @@ case class RemoteServerClientClosed(
*
* @author Jonas Bonér
*/
-class RemoteServer extends Logging with ListenerManagement {
+class RemoteServer extends RemoteServerModule {
import RemoteServer._
- def name = "RemoteServer@" + hostname + ":" + port
- private[akka] var address = Address(RemoteServer.HOSTNAME,RemoteServer.PORT)
+ @volatile private[akka] var address = Address(RemoteServer.HOSTNAME,RemoteServer.PORT)
def hostname = address.hostname
def port = address.port
+ def name = "RemoteServer@" + hostname + ":" + port
- @volatile private var _isRunning = false
+ private val _isRunning = new Switch(false)
- private val factory = new NioServerSocketChannelFactory(
- Executors.newCachedThreadPool,
- Executors.newCachedThreadPool)
+ private val factory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool,Executors.newCachedThreadPool)
private val bootstrap = new ServerBootstrap(factory)
// group of open channels, used for clean-up
private val openChannels: ChannelGroup = new DefaultChannelGroup("akka-remote-server")
- def isRunning = _isRunning
+ def isRunning = _isRunning.isOn
- def start: RemoteServer =
- start(hostname, port, None)
-
- def start(loader: ClassLoader): RemoteServer =
- start(hostname, port, Some(loader))
-
- def start(address: InetSocketAddress): RemoteServer =
- start(address.getHostName, address.getPort, None)
-
- def start(address: InetSocketAddress, loader: ClassLoader): RemoteServer =
- start(address.getHostName, address.getPort, Some(loader))
-
- def start(_hostname: String, _port: Int): RemoteServer =
- start(_hostname, _port, None)
-
- private def start(_hostname: String, _port: Int, loader: ClassLoader): RemoteServer =
- start(_hostname, _port, Some(loader))
-
- private def start(_hostname: String, _port: Int, loader: Option[ClassLoader]): RemoteServer = synchronized {
+ def start(_hostname: String, _port: Int, loader: Option[ClassLoader] = None): RemoteServer = guard withGuard {
try {
- if (!_isRunning) {
+ _isRunning switchOn {
address = Address(_hostname,_port)
log.slf4j.info("Starting remote server at [{}:{}]", hostname, port)
- RemoteServer.register(hostname, port, this)
val pipelineFactory = new RemoteServerPipelineFactory(name, openChannels, loader, this)
bootstrap.setPipelineFactory(pipelineFactory)
@@ -245,7 +186,6 @@ class RemoteServer extends Logging with ListenerManagement {
bootstrap.setOption("child.connectTimeoutMillis", RemoteServer.CONNECTION_TIMEOUT_MILLIS.toMillis)
openChannels.add(bootstrap.bind(new InetSocketAddress(hostname, port)))
- _isRunning = true
notifyListeners(RemoteServerStarted(this))
}
} catch {
@@ -256,10 +196,9 @@ class RemoteServer extends Logging with ListenerManagement {
this
}
- def shutdown = synchronized {
- if (_isRunning) {
+ def shutdown = guard withGuard {
+ _isRunning switchOff {
try {
- RemoteServer.unregister(hostname, port)
openChannels.disconnect
openChannels.close.awaitUninterruptibly
bootstrap.releaseExternalResources
@@ -271,70 +210,49 @@ class RemoteServer extends Logging with ListenerManagement {
}
}
- /**
- * Register typed actor by interface name.
- */
- def registerTypedActor(intfClass: Class[_], typedActor: AnyRef) : Unit = registerTypedActor(intfClass.getName, typedActor)
-
/**
* Register remote typed actor by a specific id.
* @param id custom actor id
* @param typedActor typed actor to register
*/
- def registerTypedActor(id: String, typedActor: AnyRef): Unit = synchronized {
+ def registerTypedActor(id: String, typedActor: AnyRef): Unit = guard withGuard {
log.slf4j.debug("Registering server side remote typed actor [{}] with id [{}]", typedActor.getClass.getName, id)
if (id.startsWith(UUID_PREFIX)) registerTypedActor(id.substring(UUID_PREFIX.length), typedActor, typedActorsByUuid)
else registerTypedActor(id, typedActor, typedActors)
}
- /**
- * Register typed actor by interface name.
- */
- def registerTypedPerSessionActor(intfClass: Class[_], factory: => AnyRef) : Unit = registerTypedActor(intfClass.getName, factory)
-
- /**
- * Register typed actor by interface name.
- * Java API
- */
- def registerTypedPerSessionActor(intfClass: Class[_], factory: Creator[AnyRef]) : Unit = registerTypedActor(intfClass.getName, factory)
-
/**
* Register remote typed actor by a specific id.
* @param id custom actor id
* @param typedActor typed actor to register
*/
- def registerTypedPerSessionActor(id: String, factory: => AnyRef): Unit = synchronized {
+ def registerTypedPerSessionActor(id: String, factory: => AnyRef): Unit = guard withGuard {
log.slf4j.debug("Registering server side typed remote session actor with id [{}]", id)
registerTypedPerSessionActor(id, () => factory, typedActorsFactories)
}
- /**
- * Register remote typed actor by a specific id.
- * @param id custom actor id
- * @param typedActor typed actor to register
- * Java API
- */
- def registerTypedPerSessionActor(id: String, factory: Creator[AnyRef]): Unit = synchronized {
- log.slf4j.debug("Registering server side typed remote session actor with id [{}]", id)
- registerTypedPerSessionActor(id, factory.create _, typedActorsFactories)
- }
-
- /**
- * Register Remote Actor by the Actor's 'id' field. It starts the Actor if it is not started already.
- */
- def register(actorRef: ActorRef): Unit = register(actorRef.id, actorRef)
-
/**
* Register Remote Actor by a specific 'id' passed as argument.
*
* NOTE: If you use this method to register your remote actor then you must unregister the actor by this ID yourself.
*/
- def register(id: String, actorRef: ActorRef): Unit = synchronized {
+ def register(id: String, actorRef: ActorRef): Unit = guard withGuard {
log.slf4j.debug("Registering server side remote actor [{}] with id [{}]", actorRef.actorClass.getName, id)
if (id.startsWith(UUID_PREFIX)) register(id.substring(UUID_PREFIX.length), actorRef, actorsByUuid)
else register(id, actorRef, actors)
}
+ def registerByUuid(actorRef: ActorRef): Unit = guard withGuard {
+ register(actorRef.uuid.toString, actorRef, actorsByUuid)
+ }
+
+ private def register[Key](id: Key, actorRef: ActorRef, registry: ConcurrentHashMap[Key, ActorRef]) {
+ if (_isRunning.isOn) {
+ registry.put(id, actorRef) //TODO change to putIfAbsent
+ if (!actorRef.isRunning) actorRef.start
+ }
+ }
+
/**
* Register Remote Session Actor by a specific 'id' passed as argument.
*
@@ -345,44 +263,26 @@ class RemoteServer extends Logging with ListenerManagement {
registerPerSession(id, () => factory, actorsFactories)
}
- /**
- * Register Remote Session Actor by a specific 'id' passed as argument.
- *
- * NOTE: If you use this method to register your remote actor then you must unregister the actor by this ID yourself.
- * Java API
- */
- def registerPerSession(id: String, factory: Creator[ActorRef]): Unit = synchronized {
- log.slf4j.debug("Registering server side remote session actor with id [{}]", id)
- registerPerSession(id, factory.create _, actorsFactories)
- }
-
- private def register[Key](id: Key, actorRef: ActorRef, registry: ConcurrentHashMap[Key, ActorRef]) {
- if (_isRunning) {
- registry.put(id, actorRef) //TODO change to putIfAbsent
- if (!actorRef.isRunning) actorRef.start
- }
- }
-
private def registerPerSession[Key](id: Key, factory: () => ActorRef, registry: ConcurrentHashMap[Key,() => ActorRef]) {
- if (_isRunning)
+ if (_isRunning.isOn)
registry.put(id, factory) //TODO change to putIfAbsent
}
private def registerTypedActor[Key](id: Key, typedActor: AnyRef, registry: ConcurrentHashMap[Key, AnyRef]) {
- if (_isRunning)
+ if (_isRunning.isOn)
registry.put(id, typedActor) //TODO change to putIfAbsent
}
private def registerTypedPerSessionActor[Key](id: Key, factory: () => AnyRef, registry: ConcurrentHashMap[Key,() => AnyRef]) {
- if (_isRunning)
+ if (_isRunning.isOn)
registry.put(id, factory) //TODO change to putIfAbsent
}
/**
* Unregister Remote Actor that is registered using its 'id' field (not custom ID).
*/
- def unregister(actorRef: ActorRef):Unit = synchronized {
- if (_isRunning) {
+ def unregister(actorRef: ActorRef): Unit = guard withGuard {
+ if (_isRunning.isOn) {
log.slf4j.debug("Unregistering server side remote actor [{}] with id [{}:{}]", Array[AnyRef](actorRef.actorClass.getName, actorRef.id, actorRef.uuid))
actors.remove(actorRef.id, actorRef)
actorsByUuid.remove(actorRef.uuid, actorRef)
@@ -394,8 +294,8 @@ class RemoteServer extends Logging with ListenerManagement {
*
* NOTE: You need to call this method if you have registered an actor by a custom ID.
*/
- def unregister(id: String):Unit = synchronized {
- if (_isRunning) {
+ def unregister(id: String): Unit = guard withGuard {
+ if (_isRunning.isOn) {
log.slf4j.info("Unregistering server side remote actor with id [{}]", id)
if (id.startsWith(UUID_PREFIX)) actorsByUuid.remove(id.substring(UUID_PREFIX.length))
else {
@@ -411,8 +311,8 @@ class RemoteServer extends Logging with ListenerManagement {
*
* NOTE: You need to call this method if you have registered an actor by a custom ID.
*/
- def unregisterPerSession(id: String):Unit = {
- if (_isRunning) {
+ def unregisterPerSession(id: String): Unit = {
+ if (_isRunning.isOn) {
log.slf4j.info("Unregistering server side remote session actor with id [{}]", id)
actorsFactories.remove(id)
}
@@ -423,8 +323,8 @@ class RemoteServer extends Logging with ListenerManagement {
*
* NOTE: You need to call this method if you have registered an actor by a custom ID.
*/
- def unregisterTypedActor(id: String):Unit = synchronized {
- if (_isRunning) {
+ def unregisterTypedActor(id: String):Unit = guard withGuard {
+ if (_isRunning.isOn) {
log.slf4j.info("Unregistering server side remote typed actor with id [{}]", id)
if (id.startsWith(UUID_PREFIX)) typedActorsByUuid.remove(id.substring(UUID_PREFIX.length))
else typedActors.remove(id)
@@ -436,8 +336,8 @@ class RemoteServer extends Logging with ListenerManagement {
*
* NOTE: You need to call this method if you have registered an actor by a custom ID.
*/
- def unregisterTypedPerSessionActor(id: String):Unit = {
- if (_isRunning) {
+ def unregisterTypedPerSessionActor(id: String): Unit = {
+ if (_isRunning.isOn) {
typedActorsFactories.remove(id)
}
}
@@ -446,7 +346,6 @@ class RemoteServer extends Logging with ListenerManagement {
protected[akka] override def notifyListeners(message: => Any): Unit = super.notifyListeners(message)
-
private[akka] def actors = ActorRegistry.actors(address)
private[akka] def actorsByUuid = ActorRegistry.actorsByUuid(address)
private[akka] def actorsFactories = ActorRegistry.actorsFactories(address)
@@ -838,7 +737,6 @@ class RemoteServerHandler(
actorRef.uuid = uuidFrom(uuid.getHigh,uuid.getLow)
actorRef.id = id
actorRef.timeout = timeout
- actorRef.remoteAddress = None
server.actorsByUuid.put(actorRef.uuid.toString, actorRef) // register by uuid
actorRef
} catch {
diff --git a/akka-remote/src/main/scala/akka/serialization/SerializationProtocol.scala b/akka-remote/src/main/scala/akka/serialization/SerializationProtocol.scala
index b93b472f51..258d210490 100644
--- a/akka-remote/src/main/scala/akka/serialization/SerializationProtocol.scala
+++ b/akka-remote/src/main/scala/akka/serialization/SerializationProtocol.scala
@@ -16,6 +16,8 @@ import akka.actor._
import scala.collection.immutable.Stack
import com.google.protobuf.ByteString
+import akka.util.ReflectiveAccess
+import akka.util.ReflectiveAccess.RemoteServerModule.{HOSTNAME,PORT}
/**
* Type class definition for Actor Serialization
@@ -88,6 +90,11 @@ object ActorSerialization {
def toBinaryJ[T <: Actor](a: ActorRef, format: Format[T], srlMailBox: Boolean = true): Array[Byte] =
toBinary(a, srlMailBox)(format)
+ val localAddress = AddressProtocol.newBuilder
+ .setHostname(HOSTNAME)
+ .setPort(PORT)
+ .build
+
private[akka] def toSerializedActorRefProtocol[T <: Actor](
actorRef: ActorRef, format: Format[T], serializeMailBox: Boolean = true): SerializedActorRefProtocol = {
val lifeCycleProtocol: Option[LifeCycleProtocol] = {
@@ -98,16 +105,11 @@ object ActorSerialization {
}
}
- val originalAddress = AddressProtocol.newBuilder
- .setHostname(actorRef.homeAddress.getHostName)
- .setPort(actorRef.homeAddress.getPort)
- .build
-
val builder = SerializedActorRefProtocol.newBuilder
.setUuid(UuidProtocol.newBuilder.setHigh(actorRef.uuid.getTime).setLow(actorRef.uuid.getClockSeqAndNode).build)
.setId(actorRef.id)
.setActorClassname(actorRef.actorClass.getName)
- .setOriginalAddress(originalAddress)
+ .setOriginalAddress(localAddress)
.setTimeout(actorRef.timeout)
@@ -233,6 +235,7 @@ object RemoteActorSerialization {
protocol.getHomeAddress.getHostname,
protocol.getHomeAddress.getPort,
protocol.getTimeout,
+ false,
loader)
}
@@ -241,18 +244,14 @@ object RemoteActorSerialization {
*/
def toRemoteActorRefProtocol(ar: ActorRef): RemoteActorRefProtocol = {
import ar._
- val home = homeAddress
- val host = home.getHostName
- val port = home.getPort
- Actor.log.slf4j.debug("Register serialized Actor [{}] as remote @ [{}]",actorClassName, home)
- RemoteServer.getOrCreateServer(homeAddress)
- ActorRegistry.registerActorByUuid(homeAddress, uuid.toString, ar)
+ Actor.log.slf4j.debug("Register serialized Actor [{}] as remote @ [{}:{}]",Array[AnyRef](actorClassName, HOSTNAME, PORT.asInstanceOf[AnyRef]))
+ ActorRegistry.remote.registerByUuid(ar)
RemoteActorRefProtocol.newBuilder
.setClassOrServiceName(uuid.toString)
.setActorClassname(actorClassName)
- .setHomeAddress(AddressProtocol.newBuilder.setHostname(host).setPort(port).build)
+ .setHomeAddress(ActorSerialization.localAddress)
.setTimeout(timeout)
.build
}
@@ -311,6 +310,8 @@ object RemoteActorSerialization {
secureCookie.foreach(messageBuilder.setCookie(_))
+ //TODO: REVISIT: REMOVE
+ /**
actorRef.foreach { ref =>
ref.registerSupervisorAsRemoteActor.foreach { id =>
messageBuilder.setSupervisorUuid(
@@ -319,13 +320,11 @@ object RemoteActorSerialization {
.setLow(id.getClockSeqAndNode)
.build)
}
- }
+ }*/
- senderOption.foreach { sender =>
- RemoteServer.getOrCreateServer(sender.homeAddress).register(sender.uuid.toString, sender)
- messageBuilder.setSender(toRemoteActorRefProtocol(sender))
+ if( senderOption.isDefined)
+ messageBuilder.setSender(toRemoteActorRefProtocol(senderOption.get))
- }
messageBuilder
}
}
diff --git a/akka-samples/akka-sample-remote/src/main/scala/ClientManagedRemoteActorSample.scala b/akka-samples/akka-sample-remote/src/main/scala/ClientManagedRemoteActorSample.scala
index 1f85c73100..76ab88c7fb 100644
--- a/akka-samples/akka-sample-remote/src/main/scala/ClientManagedRemoteActorSample.scala
+++ b/akka-samples/akka-sample-remote/src/main/scala/ClientManagedRemoteActorSample.scala
@@ -5,11 +5,10 @@
package sample.remote
import akka.actor.Actor._
-import akka.actor.RemoteActor
-import akka.remote.RemoteNode
import akka.util.Logging
+import akka.actor. {ActorRegistry, Actor}
-class RemoteHelloWorldActor extends RemoteActor("localhost", 2552) {
+class RemoteHelloWorldActor extends Actor {
def receive = {
case "Hello" =>
log.slf4j.info("Received 'Hello'")
@@ -19,7 +18,7 @@ class RemoteHelloWorldActor extends RemoteActor("localhost", 2552) {
object ClientManagedRemoteActorServer extends Logging {
def run = {
- RemoteNode.start("localhost", 2552)
+ ActorRegistry.remote.start("localhost", 2552)
log.slf4j.info("Remote node started")
}
@@ -29,7 +28,7 @@ object ClientManagedRemoteActorServer extends Logging {
object ClientManagedRemoteActorClient extends Logging {
def run = {
- val actor = actorOf[RemoteHelloWorldActor].start
+ val actor = actorOf[RemoteHelloWorldActor]("localhost",2552).start
log.slf4j.info("Remote actor created, moved to the server")
log.slf4j.info("Sending 'Hello' to remote actor")
val result = actor !! "Hello"
diff --git a/akka-typed-actor/src/main/scala/akka/actor/TypedActor.scala b/akka-typed-actor/src/main/scala/akka/actor/TypedActor.scala
index 1a39eab01d..dbdf27b352 100644
--- a/akka-typed-actor/src/main/scala/akka/actor/TypedActor.scala
+++ b/akka-typed-actor/src/main/scala/akka/actor/TypedActor.scala
@@ -512,9 +512,9 @@ object TypedActor extends Logging {
typedActor.initialize(proxy)
if (config._messageDispatcher.isDefined) actorRef.dispatcher = config._messageDispatcher.get
if (config._threadBasedDispatcher.isDefined) actorRef.dispatcher = Dispatchers.newThreadBasedDispatcher(actorRef)
- if (config._host.isDefined) actorRef.makeRemote(config._host.get)
+ if (config._host.isDefined) log.slf4j.warn("Client-managed typed actors are not supported!") //TODO: REVISIT: FIXME
actorRef.timeout = config.timeout
- AspectInitRegistry.register(proxy, AspectInit(intfClass, typedActor, actorRef, actorRef.remoteAddress, actorRef.timeout))
+ AspectInitRegistry.register(proxy, AspectInit(intfClass, typedActor, actorRef, None, actorRef.timeout)) //TODO: REVISIT fix Client managed typed actor
actorRef.start
proxy.asInstanceOf[T]
}
@@ -751,7 +751,7 @@ private[akka] sealed class ServerManagedTypedActorAspect extends ActorAspect {
override def initialize(joinPoint: JoinPoint): Unit = {
super.initialize(joinPoint)
- remoteAddress = actorRef.remoteAddress
+ //remoteAddress = actorRef.remoteAddress //TODO: REVISIT: Fix Server managed Typed Actor
}
}
diff --git a/akka-typed-actor/src/main/scala/akka/config/TypedActorGuiceConfigurator.scala b/akka-typed-actor/src/main/scala/akka/config/TypedActorGuiceConfigurator.scala
index fc2f83c5d9..d2a9ebca26 100644
--- a/akka-typed-actor/src/main/scala/akka/config/TypedActorGuiceConfigurator.scala
+++ b/akka-typed-actor/src/main/scala/akka/config/TypedActorGuiceConfigurator.scala
@@ -115,6 +115,7 @@ private[akka] class TypedActorGuiceConfigurator extends TypedActorConfiguratorBa
val proxy = Proxy.newInstance(Array(interfaceClass), Array(typedActor), true, false)
+ /*
val remoteAddress =
if (component.remoteAddress.isDefined)
Some(new InetSocketAddress(component.remoteAddress.get.hostname, component.remoteAddress.get.port))
@@ -122,11 +123,11 @@ private[akka] class TypedActorGuiceConfigurator extends TypedActorConfiguratorBa
remoteAddress.foreach { address =>
actorRef.makeRemote(remoteAddress.get)
- }
+ }*/
AspectInitRegistry.register(
proxy,
- AspectInit(interfaceClass, typedActor, actorRef, remoteAddress, timeout))
+ AspectInit(interfaceClass, typedActor, actorRef, None, timeout)) //TODO: REVISIT: FIX CLIENT MANAGED ACTORS
typedActor.initialize(proxy)
actorRef.start