WIP
This commit is contained in:
parent
c20aab06eb
commit
a1d02435b1
9 changed files with 107 additions and 97 deletions
|
|
@ -154,78 +154,6 @@ object Actor extends Logging {
|
|||
"\nOR try to change: 'actorOf[MyActor]' to 'actorOf(new MyActor)'."))
|
||||
}, None)
|
||||
|
||||
/**
|
||||
* Creates a Client-managed ActorRef out of the Actor of the specified Class.
|
||||
* If the supplied host and port is identical of the configured local node, it will be a local actor
|
||||
* <pre>
|
||||
* import Actor._
|
||||
* val actor = actorOf(classOf[MyActor],"www.akka.io",2552)
|
||||
* actor.start
|
||||
* actor ! message
|
||||
* actor.stop
|
||||
* </pre>
|
||||
* You can create and start the actor in one statement like this:
|
||||
* <pre>
|
||||
* val actor = actorOf(classOf[MyActor],"www.akka.io",2552).start
|
||||
* </pre>
|
||||
*/
|
||||
def actorOf(factory: => Actor, host: String, port: Int): ActorRef =
|
||||
ActorRegistry.remote.clientManagedActorOf(() => factory, host, port)
|
||||
|
||||
/**
|
||||
* Creates a Client-managed ActorRef out of the Actor of the specified Class.
|
||||
* If the supplied host and port is identical of the configured local node, it will be a local actor
|
||||
* <pre>
|
||||
* import Actor._
|
||||
* val actor = actorOf(classOf[MyActor],"www.akka.io",2552)
|
||||
* actor.start
|
||||
* actor ! message
|
||||
* actor.stop
|
||||
* </pre>
|
||||
* You can create and start the actor in one statement like this:
|
||||
* <pre>
|
||||
* val actor = actorOf(classOf[MyActor],"www.akka.io",2552).start
|
||||
* </pre>
|
||||
*/
|
||||
def actorOf(clazz: Class[_ <: Actor], host: String, port: Int): ActorRef = {
|
||||
import ReflectiveAccess.{ createInstance, noParams, noArgs }
|
||||
ActorRegistry.remote.clientManagedActorOf(() =>
|
||||
createInstance[Actor](clazz.asInstanceOf[Class[_]], noParams, noArgs).getOrElse(
|
||||
throw new ActorInitializationException(
|
||||
"Could not instantiate Actor" +
|
||||
"\nMake sure Actor is NOT defined inside a class/trait," +
|
||||
"\nif so put it outside the class/trait, f.e. in a companion object," +
|
||||
"\nOR try to change: 'actorOf[MyActor]' to 'actorOf(new MyActor)'.")),
|
||||
host, port)
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a Client-managed ActorRef out of the Actor of the specified Class.
|
||||
* If the supplied host and port is identical of the configured local node, it will be a local actor
|
||||
* <pre>
|
||||
* import Actor._
|
||||
* val actor = actorOf[MyActor]("www.akka.io",2552)
|
||||
* actor.start
|
||||
* actor ! message
|
||||
* actor.stop
|
||||
* </pre>
|
||||
* You can create and start the actor in one statement like this:
|
||||
* <pre>
|
||||
* val actor = actorOf[MyActor]("www.akka.io",2552).start
|
||||
* </pre>
|
||||
*/
|
||||
def actorOf[T <: Actor : Manifest](host: String, port: Int): ActorRef = {
|
||||
import ReflectiveAccess.{ createInstance, noParams, noArgs }
|
||||
ActorRegistry.remote.clientManagedActorOf(() =>
|
||||
createInstance[Actor](manifest[T].erasure.asInstanceOf[Class[_]], noParams, noArgs).getOrElse(
|
||||
throw new ActorInitializationException(
|
||||
"Could not instantiate Actor" +
|
||||
"\nMake sure Actor is NOT defined inside a class/trait," +
|
||||
"\nif so put it outside the class/trait, f.e. in a companion object," +
|
||||
"\nOR try to change: 'actorOf[MyActor]' to 'actorOf(new MyActor)'.")),
|
||||
host, port)
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates an ActorRef out of the Actor. Allows you to pass in a factory function
|
||||
* that creates the Actor. Please note that this function can be invoked multiple
|
||||
|
|
|
|||
|
|
@ -737,7 +737,7 @@ class LocalActorRef private[akka] (
|
|||
*/
|
||||
def spawnRemote(clazz: Class[_ <: Actor], hostname: String, port: Int, timeout: Long = Actor.TIMEOUT): ActorRef = guard.withGuard {
|
||||
ensureRemotingEnabled
|
||||
val ref = Actor.actorOf(clazz, hostname, port)
|
||||
val ref = ActorRegistry.remote.actorOf(clazz, hostname, port)
|
||||
ref.timeout = timeout
|
||||
ref.start
|
||||
}
|
||||
|
|
@ -762,7 +762,7 @@ class LocalActorRef private[akka] (
|
|||
def spawnLinkRemote(clazz: Class[_ <: Actor], hostname: String, port: Int, timeout: Long = Actor.TIMEOUT): ActorRef =
|
||||
guard.withGuard {
|
||||
ensureRemotingEnabled
|
||||
val actor = Actor.actorOf(clazz, hostname, port)
|
||||
val actor = ActorRegistry.remote.actorOf(clazz, hostname, port)
|
||||
actor.timeout = timeout
|
||||
link(actor)
|
||||
actor.start
|
||||
|
|
|
|||
|
|
@ -63,6 +63,80 @@ abstract class RemoteSupport extends ListenerManagement with RemoteServerModule
|
|||
this.shutdownServerModule
|
||||
clear
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Creates a Client-managed ActorRef out of the Actor of the specified Class.
|
||||
* If the supplied host and port is identical of the configured local node, it will be a local actor
|
||||
* <pre>
|
||||
* import Actor._
|
||||
* val actor = actorOf(classOf[MyActor],"www.akka.io",2552)
|
||||
* actor.start
|
||||
* actor ! message
|
||||
* actor.stop
|
||||
* </pre>
|
||||
* You can create and start the actor in one statement like this:
|
||||
* <pre>
|
||||
* val actor = actorOf(classOf[MyActor],"www.akka.io",2552).start
|
||||
* </pre>
|
||||
*/
|
||||
def actorOf(factory: => Actor, host: String, port: Int): ActorRef =
|
||||
ActorRegistry.remote.clientManagedActorOf(() => factory, host, port)
|
||||
|
||||
/**
|
||||
* Creates a Client-managed ActorRef out of the Actor of the specified Class.
|
||||
* If the supplied host and port is identical of the configured local node, it will be a local actor
|
||||
* <pre>
|
||||
* import Actor._
|
||||
* val actor = actorOf(classOf[MyActor],"www.akka.io",2552)
|
||||
* actor.start
|
||||
* actor ! message
|
||||
* actor.stop
|
||||
* </pre>
|
||||
* You can create and start the actor in one statement like this:
|
||||
* <pre>
|
||||
* val actor = actorOf(classOf[MyActor],"www.akka.io",2552).start
|
||||
* </pre>
|
||||
*/
|
||||
def actorOf(clazz: Class[_ <: Actor], host: String, port: Int): ActorRef = {
|
||||
import ReflectiveAccess.{ createInstance, noParams, noArgs }
|
||||
clientManagedActorOf(() =>
|
||||
createInstance[Actor](clazz.asInstanceOf[Class[_]], noParams, noArgs).getOrElse(
|
||||
throw new ActorInitializationException(
|
||||
"Could not instantiate Actor" +
|
||||
"\nMake sure Actor is NOT defined inside a class/trait," +
|
||||
"\nif so put it outside the class/trait, f.e. in a companion object," +
|
||||
"\nOR try to change: 'actorOf[MyActor]' to 'actorOf(new MyActor)'.")),
|
||||
host, port)
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a Client-managed ActorRef out of the Actor of the specified Class.
|
||||
* If the supplied host and port is identical of the configured local node, it will be a local actor
|
||||
* <pre>
|
||||
* import Actor._
|
||||
* val actor = actorOf[MyActor]("www.akka.io",2552)
|
||||
* actor.start
|
||||
* actor ! message
|
||||
* actor.stop
|
||||
* </pre>
|
||||
* You can create and start the actor in one statement like this:
|
||||
* <pre>
|
||||
* val actor = actorOf[MyActor]("www.akka.io",2552).start
|
||||
* </pre>
|
||||
*/
|
||||
def actorOf[T <: Actor : Manifest](host: String, port: Int): ActorRef = {
|
||||
import ReflectiveAccess.{ createInstance, noParams, noArgs }
|
||||
clientManagedActorOf(() =>
|
||||
createInstance[Actor](manifest[T].erasure.asInstanceOf[Class[_]], noParams, noArgs).getOrElse(
|
||||
throw new ActorInitializationException(
|
||||
"Could not instantiate Actor" +
|
||||
"\nMake sure Actor is NOT defined inside a class/trait," +
|
||||
"\nif so put it outside the class/trait, f.e. in a companion object," +
|
||||
"\nOR try to change: 'actorOf[MyActor]' to 'actorOf(new MyActor)'.")),
|
||||
host, port)
|
||||
}
|
||||
|
||||
protected override def manageLifeCycleOfListeners = false
|
||||
protected[akka] override def notifyListeners(message: => Any): Unit = super.notifyListeners(message)
|
||||
|
||||
|
|
|
|||
|
|
@ -564,7 +564,7 @@ class NettyRemoteSupport extends RemoteSupport with NettyRemoteServerModule with
|
|||
protected[akka] def actorFor(serviceId: String, className: String, timeout: Long, host: String, port: Int, loader: Option[ClassLoader]): ActorRef = {
|
||||
if (optimizeLocalScoped_?) {
|
||||
val home = this.address
|
||||
if (host == home.getHostName && port == home.getPort) {//TODO: switch to InetSocketAddres.equals?
|
||||
if (host == home.getHostName && port == home.getPort) {//TODO: switch to InetSocketAddress.equals?
|
||||
val localRef = findActorByIdOrUuid(serviceId,serviceId)
|
||||
|
||||
if (localRef ne null) return localRef //Code significantly simpler with the return statement
|
||||
|
|
@ -575,6 +575,13 @@ class NettyRemoteSupport extends RemoteSupport with NettyRemoteServerModule with
|
|||
}
|
||||
|
||||
def clientManagedActorOf(factory: () => Actor, host: String, port: Int): ActorRef = {
|
||||
|
||||
if (optimizeLocalScoped_?) {
|
||||
val home = this.address
|
||||
if (host == home.getHostName && port == home.getPort)//TODO: switch to InetSocketAddress.equals?
|
||||
return new LocalActorRef(factory, None) // Code is much simpler with return
|
||||
}
|
||||
|
||||
val ref = new LocalActorRef(factory, Some(new InetSocketAddress(host, port)))
|
||||
//ref.timeout = timeout //removed because setting default timeout should be done after construction
|
||||
ref
|
||||
|
|
|
|||
|
|
@ -76,7 +76,7 @@ class MyActorCustomConstructor extends Actor {
|
|||
class ClientInitiatedRemoteActorSpec extends AkkaRemoteTest {
|
||||
"ClientInitiatedRemoteActor" should {
|
||||
"shouldSendOneWay" in {
|
||||
val clientManaged = actorOf[RemoteActorSpecActorUnidirectional](host,port).start
|
||||
val clientManaged = remote.actorOf[RemoteActorSpecActorUnidirectional](host,port).start
|
||||
clientManaged must not be null
|
||||
clientManaged.getClass must be (classOf[LocalActorRef])
|
||||
clientManaged ! "OneWay"
|
||||
|
|
@ -86,7 +86,7 @@ class ClientInitiatedRemoteActorSpec extends AkkaRemoteTest {
|
|||
|
||||
"shouldSendOneWayAndReceiveReply" in {
|
||||
val latch = new CountDownLatch(1)
|
||||
val actor = actorOf[SendOneWayAndReplyReceiverActor](host,port).start
|
||||
val actor = remote.actorOf[SendOneWayAndReplyReceiverActor](host,port).start
|
||||
implicit val sender = Some(actorOf(new CountDownActor(latch)).start)
|
||||
|
||||
actor ! "Hello"
|
||||
|
|
@ -95,14 +95,14 @@ class ClientInitiatedRemoteActorSpec extends AkkaRemoteTest {
|
|||
}
|
||||
|
||||
"shouldSendBangBangMessageAndReceiveReply" in {
|
||||
val actor = actorOf[RemoteActorSpecActorBidirectional](host,port).start
|
||||
val actor = remote.actorOf[RemoteActorSpecActorBidirectional](host,port).start
|
||||
val result = actor !! "Hello"
|
||||
"World" must equal (result.get.asInstanceOf[String])
|
||||
actor.stop
|
||||
}
|
||||
|
||||
"shouldSendBangBangMessageAndReceiveReplyConcurrently" in {
|
||||
val actors = (1 to 10).map(num => { actorOf[RemoteActorSpecActorBidirectional](host,port).start }).toList
|
||||
val actors = (1 to 10).map(num => { remote.actorOf[RemoteActorSpecActorBidirectional](host,port).start }).toList
|
||||
actors.map(_ !!! "Hello") foreach { future =>
|
||||
"World" must equal (future.await.result.asInstanceOf[Option[String]].get)
|
||||
}
|
||||
|
|
@ -110,8 +110,8 @@ class ClientInitiatedRemoteActorSpec extends AkkaRemoteTest {
|
|||
}
|
||||
|
||||
"shouldRegisterActorByUuid" in {
|
||||
val actor1 = actorOf[MyActorCustomConstructor](host, port).start
|
||||
val actor2 = actorOf[MyActorCustomConstructor](host, port).start
|
||||
val actor1 = remote.actorOf[MyActorCustomConstructor](host, port).start
|
||||
val actor2 = remote.actorOf[MyActorCustomConstructor](host, port).start
|
||||
|
||||
actor1 ! "incrPrefix"
|
||||
|
||||
|
|
@ -129,7 +129,7 @@ class ClientInitiatedRemoteActorSpec extends AkkaRemoteTest {
|
|||
|
||||
"shouldSendAndReceiveRemoteException" in {
|
||||
|
||||
val actor = actorOf[RemoteActorSpecActorBidirectional](host, port).start
|
||||
val actor = remote.actorOf[RemoteActorSpecActorBidirectional](host, port).start
|
||||
try {
|
||||
implicit val timeout = 500000000L
|
||||
val f = (actor !!! "Failure").await.resultOrException
|
||||
|
|
|
|||
|
|
@ -228,7 +228,7 @@ class RemoteSupervisorSpec extends AkkaRemoteTest {
|
|||
// Then create a concrete container in which we mix in support for the specific
|
||||
// implementation of the Actors we want to use.
|
||||
|
||||
pingpong1 = actorOf[RemotePingPong1Actor](host,port).start
|
||||
pingpong1 = remote.actorOf[RemotePingPong1Actor](host,port).start
|
||||
|
||||
val factory = SupervisorFactory(
|
||||
SupervisorConfig(
|
||||
|
|
@ -242,7 +242,7 @@ class RemoteSupervisorSpec extends AkkaRemoteTest {
|
|||
}
|
||||
|
||||
def getSingleActorOneForOneSupervisor: Supervisor = {
|
||||
pingpong1 = actorOf[RemotePingPong1Actor](host,port).start
|
||||
pingpong1 = remote.actorOf[RemotePingPong1Actor](host,port).start
|
||||
|
||||
val factory = SupervisorFactory(
|
||||
SupervisorConfig(
|
||||
|
|
@ -255,9 +255,9 @@ class RemoteSupervisorSpec extends AkkaRemoteTest {
|
|||
}
|
||||
|
||||
def getMultipleActorsAllForOneConf: Supervisor = {
|
||||
pingpong1 = actorOf[RemotePingPong1Actor](host,port).start
|
||||
pingpong2 = actorOf[RemotePingPong2Actor](host,port).start
|
||||
pingpong3 = actorOf[RemotePingPong3Actor](host,port).start
|
||||
pingpong1 = remote.actorOf[RemotePingPong1Actor](host,port).start
|
||||
pingpong2 = remote.actorOf[RemotePingPong2Actor](host,port).start
|
||||
pingpong3 = remote.actorOf[RemotePingPong3Actor](host,port).start
|
||||
|
||||
val factory = SupervisorFactory(
|
||||
SupervisorConfig(
|
||||
|
|
@ -278,9 +278,9 @@ class RemoteSupervisorSpec extends AkkaRemoteTest {
|
|||
}
|
||||
|
||||
def getMultipleActorsOneForOneConf: Supervisor = {
|
||||
pingpong1 = actorOf[RemotePingPong1Actor](host,port).start
|
||||
pingpong2 = actorOf[RemotePingPong2Actor](host,port).start
|
||||
pingpong3 = actorOf[RemotePingPong3Actor](host,port).start
|
||||
pingpong1 = remote.actorOf[RemotePingPong1Actor](host,port).start
|
||||
pingpong2 = remote.actorOf[RemotePingPong2Actor](host,port).start
|
||||
pingpong3 = remote.actorOf[RemotePingPong3Actor](host,port).start
|
||||
|
||||
val factory = SupervisorFactory(
|
||||
SupervisorConfig(
|
||||
|
|
@ -301,9 +301,9 @@ class RemoteSupervisorSpec extends AkkaRemoteTest {
|
|||
}
|
||||
|
||||
def getNestedSupervisorsAllForOneConf: Supervisor = {
|
||||
pingpong1 = actorOf[RemotePingPong1Actor](host,port).start
|
||||
pingpong2 = actorOf[RemotePingPong2Actor](host,port).start
|
||||
pingpong3 = actorOf[RemotePingPong3Actor](host,port).start
|
||||
pingpong1 = remote.actorOf[RemotePingPong1Actor](host,port).start
|
||||
pingpong2 = remote.actorOf[RemotePingPong2Actor](host,port).start
|
||||
pingpong3 = remote.actorOf[RemotePingPong3Actor](host,port).start
|
||||
|
||||
val factory = SupervisorFactory(
|
||||
SupervisorConfig(
|
||||
|
|
|
|||
|
|
@ -7,6 +7,7 @@ package sample.remote
|
|||
import akka.actor.Actor._
|
||||
import akka.util.Logging
|
||||
import akka.actor. {ActorRegistry, Actor}
|
||||
import ActorRegistry.remote
|
||||
|
||||
class RemoteHelloWorldActor extends Actor {
|
||||
def receive = {
|
||||
|
|
@ -18,7 +19,7 @@ class RemoteHelloWorldActor extends Actor {
|
|||
|
||||
object ClientManagedRemoteActorServer extends Logging {
|
||||
def run = {
|
||||
ActorRegistry.remote.start("localhost", 2552)
|
||||
remote.start("localhost", 2552)
|
||||
log.slf4j.info("Remote node started")
|
||||
}
|
||||
|
||||
|
|
@ -28,7 +29,7 @@ object ClientManagedRemoteActorServer extends Logging {
|
|||
object ClientManagedRemoteActorClient extends Logging {
|
||||
|
||||
def run = {
|
||||
val actor = actorOf[RemoteHelloWorldActor]("localhost",2552).start
|
||||
val actor = remote.actorOf[RemoteHelloWorldActor]("localhost",2552).start
|
||||
log.slf4j.info("Remote actor created, moved to the server")
|
||||
log.slf4j.info("Sending 'Hello' to remote actor")
|
||||
val result = actor !! "Hello"
|
||||
|
|
|
|||
|
|
@ -488,7 +488,7 @@ object TypedActor extends Logging {
|
|||
config match {
|
||||
case null => actorOf(typedActor)
|
||||
case c: TypedActorConfiguration if (c._host.isDefined) =>
|
||||
actorOf(typedActor, c._host.get.getHostName, c._host.get.getPort)
|
||||
ActorRegistry.remote.actorOf(typedActor, c._host.get.getHostName, c._host.get.getPort)
|
||||
case _ => actorOf(typedActor)
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -112,7 +112,7 @@ private[akka] class TypedActorGuiceConfigurator extends TypedActorConfiguratorBa
|
|||
component.remoteAddress match {
|
||||
case Some(a) =>
|
||||
(Some(new InetSocketAddress(a.hostname, a.port)),
|
||||
Actor.actorOf(TypedActor.newTypedActor(implementationClass), a.hostname, a.port))
|
||||
ActorRegistry.remote.actorOf(TypedActor.newTypedActor(implementationClass), a.hostname, a.port))
|
||||
case None =>
|
||||
(None, Actor.actorOf(TypedActor.newTypedActor(implementationClass)))
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue