All tests passing, still some work to be done though, but thank God for all tests being green ;)

This commit is contained in:
Viktor Klang 2010-12-21 14:36:47 +01:00
parent 1fa105faa8
commit c20aab06eb
19 changed files with 445 additions and 660 deletions

View file

@ -115,7 +115,7 @@ object Actor extends Logging {
private[actor] val actorRefInCreation = new scala.util.DynamicVariable[Option[ActorRef]](None) private[actor] val actorRefInCreation = new scala.util.DynamicVariable[Option[ActorRef]](None)
/** /**
* Creates an ActorRef out of the Actor with type T. * Creates an ActorRef out of the Actor with type T.
* <pre> * <pre>
* import Actor._ * import Actor._
* val actor = actorOf[MyActor] * val actor = actorOf[MyActor]
@ -128,26 +128,7 @@ object Actor extends Logging {
* val actor = actorOf[MyActor].start * val actor = actorOf[MyActor].start
* </pre> * </pre>
*/ */
def actorOf[T <: Actor : Manifest]: ActorRef = def actorOf[T <: Actor : Manifest]: ActorRef = actorOf(manifest[T].erasure.asInstanceOf[Class[_ <: Actor]])
ActorRegistry.actorOf[T]
/**
* Creates a Client-managed ActorRef out of the Actor of the specified Class.
* If the supplied host and port is identical of the configured local node, it will be a local actor
* <pre>
* import Actor._
* val actor = actorOf[MyActor]("www.akka.io",2552)
* actor.start
* actor ! message
* actor.stop
* </pre>
* You can create and start the actor in one statement like this:
* <pre>
* val actor = actorOf[MyActor]("www.akka.io",2552).start
* </pre>
*/
def actorOf[T <: Actor : Manifest](host: String, port: Int): ActorRef =
ActorRegistry.actorOf[T](host,port)
/** /**
* Creates an ActorRef out of the Actor of the specified Class. * Creates an ActorRef out of the Actor of the specified Class.
@ -163,8 +144,15 @@ object Actor extends Logging {
* val actor = actorOf(classOf[MyActor]).start * val actor = actorOf(classOf[MyActor]).start
* </pre> * </pre>
*/ */
def actorOf(clazz: Class[_ <: Actor]): ActorRef = def actorOf(clazz: Class[_ <: Actor]): ActorRef = new LocalActorRef(() => {
ActorRegistry.actorOf(clazz) import ReflectiveAccess.{ createInstance, noParams, noArgs }
createInstance[Actor](clazz.asInstanceOf[Class[_]], noParams, noArgs).getOrElse(
throw new ActorInitializationException(
"Could not instantiate Actor" +
"\nMake sure Actor is NOT defined inside a class/trait," +
"\nif so put it outside the class/trait, f.e. in a companion object," +
"\nOR try to change: 'actorOf[MyActor]' to 'actorOf(new MyActor)'."))
}, None)
/** /**
* Creates a Client-managed ActorRef out of the Actor of the specified Class. * Creates a Client-managed ActorRef out of the Actor of the specified Class.
@ -181,8 +169,62 @@ object Actor extends Logging {
* val actor = actorOf(classOf[MyActor],"www.akka.io",2552).start * val actor = actorOf(classOf[MyActor],"www.akka.io",2552).start
* </pre> * </pre>
*/ */
def actorOf(clazz: Class[_ <: Actor], host: String, port: Int, timeout: Long = Actor.TIMEOUT): ActorRef = def actorOf(factory: => Actor, host: String, port: Int): ActorRef =
ActorRegistry.actorOf(clazz, host, port, timeout) ActorRegistry.remote.clientManagedActorOf(() => factory, host, port)
/**
* Creates a Client-managed ActorRef out of the Actor of the specified Class.
* If the supplied host and port is identical of the configured local node, it will be a local actor
* <pre>
* import Actor._
* val actor = actorOf(classOf[MyActor],"www.akka.io",2552)
* actor.start
* actor ! message
* actor.stop
* </pre>
* You can create and start the actor in one statement like this:
* <pre>
* val actor = actorOf(classOf[MyActor],"www.akka.io",2552).start
* </pre>
*/
def actorOf(clazz: Class[_ <: Actor], host: String, port: Int): ActorRef = {
import ReflectiveAccess.{ createInstance, noParams, noArgs }
ActorRegistry.remote.clientManagedActorOf(() =>
createInstance[Actor](clazz.asInstanceOf[Class[_]], noParams, noArgs).getOrElse(
throw new ActorInitializationException(
"Could not instantiate Actor" +
"\nMake sure Actor is NOT defined inside a class/trait," +
"\nif so put it outside the class/trait, f.e. in a companion object," +
"\nOR try to change: 'actorOf[MyActor]' to 'actorOf(new MyActor)'.")),
host, port)
}
/**
* Creates a Client-managed ActorRef out of the Actor of the specified Class.
* If the supplied host and port is identical of the configured local node, it will be a local actor
* <pre>
* import Actor._
* val actor = actorOf[MyActor]("www.akka.io",2552)
* actor.start
* actor ! message
* actor.stop
* </pre>
* You can create and start the actor in one statement like this:
* <pre>
* val actor = actorOf[MyActor]("www.akka.io",2552).start
* </pre>
*/
def actorOf[T <: Actor : Manifest](host: String, port: Int): ActorRef = {
import ReflectiveAccess.{ createInstance, noParams, noArgs }
ActorRegistry.remote.clientManagedActorOf(() =>
createInstance[Actor](manifest[T].erasure.asInstanceOf[Class[_]], noParams, noArgs).getOrElse(
throw new ActorInitializationException(
"Could not instantiate Actor" +
"\nMake sure Actor is NOT defined inside a class/trait," +
"\nif so put it outside the class/trait, f.e. in a companion object," +
"\nOR try to change: 'actorOf[MyActor]' to 'actorOf(new MyActor)'.")),
host, port)
}
/** /**
* Creates an ActorRef out of the Actor. Allows you to pass in a factory function * Creates an ActorRef out of the Actor. Allows you to pass in a factory function
@ -202,25 +244,32 @@ object Actor extends Logging {
* val actor = actorOf(new MyActor).start * val actor = actorOf(new MyActor).start
* </pre> * </pre>
*/ */
def actorOf(factory: => Actor): ActorRef = ActorRegistry.actorOf(factory) def actorOf(factory: => Actor): ActorRef = new LocalActorRef(() => factory, None)
/** /**
* Use to spawn out a block of code in an event-driven actor. Will shut actor down when * Use to spawn out a block of code in an event-driven actor. Will shut actor down when
* the block has been executed. * the block has been executed.
* <p/> * <p/>
* NOTE: If used from within an Actor then has to be qualified with 'Actor.spawn' since * NOTE: If used from within an Actor then has to be qualified with 'ActorRegistry.spawn' since
* there is a method 'spawn[ActorType]' in the Actor trait already. * there is a method 'spawn[ActorType]' in the Actor trait already.
* Example: * Example:
* <pre> * <pre>
* import Actor._ * import ActorRegistry.{spawn}
* *
* spawn { * spawn {
* ... // do stuff * ... // do stuff
* } * }
* </pre> * </pre>
*/ */
def spawn(body: => Unit)(implicit dispatcher: MessageDispatcher = Dispatchers.defaultGlobalDispatcher): Unit = def spawn(body: => Unit)(implicit dispatcher: MessageDispatcher = Dispatchers.defaultGlobalDispatcher): Unit = {
ActorRegistry.spawn(body)(dispatcher) case object Spawn
actorOf(new Actor() {
self.dispatcher = dispatcher
def receive = {
case Spawn => try { body } finally { self.stop }
}
}).start ! Spawn
}
/** /**

View file

@ -172,14 +172,14 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] { scal
def getDispatcher(): MessageDispatcher = dispatcher def getDispatcher(): MessageDispatcher = dispatcher
/** /**
* Returns on which node this actor lives * Returns on which node this actor lives if None it lives in the local ActorRegistry
*/ */
def homeAddress: InetSocketAddress def homeAddress: Option[InetSocketAddress]
/** /**
* Java API * Java API
*/ */
def getHomeAddress(): InetSocketAddress = homeAddress def getHomeAddress(): InetSocketAddress = homeAddress getOrElse null
/** /**
* Holds the hot swapped partial function. * Holds the hot swapped partial function.
@ -451,7 +451,7 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] { scal
* <p/> * <p/>
* To be invoked from within the actor itself. * To be invoked from within the actor itself.
*/ */
def spawnRemote(clazz: Class[_ <: Actor], hostname: String, port: Int): ActorRef def spawnRemote(clazz: Class[_ <: Actor], hostname: String, port: Int, timeout: Long): ActorRef
/** /**
* Atomically create (from actor class), link and start an actor. * Atomically create (from actor class), link and start an actor.
@ -465,7 +465,7 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] { scal
* <p/> * <p/>
* To be invoked from within the actor itself. * To be invoked from within the actor itself.
*/ */
def spawnLinkRemote(clazz: Class[_ <: Actor], hostname: String, port: Int): ActorRef def spawnLinkRemote(clazz: Class[_ <: Actor], hostname: String, port: Int, timeout: Long): ActorRef
/** /**
* Returns the mailbox size. * Returns the mailbox size.
@ -551,7 +551,7 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] { scal
*/ */
class LocalActorRef private[akka] ( class LocalActorRef private[akka] (
private[this] val actorFactory: () => Actor, private[this] val actorFactory: () => Actor,
val homeAddress: InetSocketAddress = Remote.localAddress) val homeAddress: Option[InetSocketAddress])
extends ActorRef with ScalaActorRef { extends ActorRef with ScalaActorRef {
@volatile @volatile
@ -582,7 +582,7 @@ class LocalActorRef private[akka] (
__supervisor: Option[ActorRef], __supervisor: Option[ActorRef],
__hotswap: Stack[PartialFunction[Any, Unit]], __hotswap: Stack[PartialFunction[Any, Unit]],
__factory: () => Actor, __factory: () => Actor,
__homeAddress: InetSocketAddress) = { __homeAddress: Option[InetSocketAddress]) = {
this(__factory, __homeAddress) this(__factory, __homeAddress)
_uuid = __uuid _uuid = __uuid
id = __id id = __id
@ -595,7 +595,10 @@ class LocalActorRef private[akka] (
start start
} }
private final def isClientManaged_? = (homeAddress ne Remote.localAddress) && homeAddress != Remote.localAddress /**
* Returns whether this actor ref is client-managed remote or not
*/
private[akka] final def isClientManaged_? = homeAddress.isDefined && isRemotingEnabled
// ========= PUBLIC FUNCTIONS ========= // ========= PUBLIC FUNCTIONS =========
@ -640,8 +643,8 @@ class LocalActorRef private[akka] (
if ((actorInstance ne null) && (actorInstance.get ne null)) if ((actorInstance ne null) && (actorInstance.get ne null))
initializeActorInstance initializeActorInstance
if (isRemotingEnabled && isClientManaged_?) if (isClientManaged_?)
ActorRegistry.remote.registerClientManagedActor(homeAddress.getHostName,homeAddress.getPort, uuid) ActorRegistry.remote.registerClientManagedActor(homeAddress.get.getHostName,homeAddress.get.getPort, uuid)
checkReceiveTimeout //Schedule the initial Receive timeout checkReceiveTimeout //Schedule the initial Receive timeout
} }
@ -661,7 +664,7 @@ class LocalActorRef private[akka] (
ActorRegistry.unregister(this) ActorRegistry.unregister(this)
if (isRemotingEnabled) { if (isRemotingEnabled) {
if (isClientManaged_?) if (isClientManaged_?)
ActorRegistry.remote.registerClientManagedActor(homeAddress.getHostName,homeAddress.getPort, uuid) ActorRegistry.remote.registerClientManagedActor(homeAddress.get.getHostName,homeAddress.get.getPort, uuid)
ActorRegistry.remote.unregister(this) ActorRegistry.remote.unregister(this)
} }
setActorSelfFields(actorInstance.get,null) setActorSelfFields(actorInstance.get,null)
@ -732,9 +735,11 @@ class LocalActorRef private[akka] (
* <p/> * <p/>
* To be invoked from within the actor itself. * To be invoked from within the actor itself.
*/ */
def spawnRemote(clazz: Class[_ <: Actor], hostname: String, port: Int): ActorRef = guard.withGuard { def spawnRemote(clazz: Class[_ <: Actor], hostname: String, port: Int, timeout: Long = Actor.TIMEOUT): ActorRef = guard.withGuard {
ensureRemotingEnabled ensureRemotingEnabled
Actor.actorOf(clazz, hostname, port).start val ref = Actor.actorOf(clazz, hostname, port)
ref.timeout = timeout
ref.start
} }
/** /**
@ -754,13 +759,15 @@ class LocalActorRef private[akka] (
* <p/> * <p/>
* To be invoked from within the actor itself. * To be invoked from within the actor itself.
*/ */
def spawnLinkRemote(clazz: Class[_ <: Actor], hostname: String, port: Int): ActorRef = guard.withGuard { def spawnLinkRemote(clazz: Class[_ <: Actor], hostname: String, port: Int, timeout: Long = Actor.TIMEOUT): ActorRef =
ensureRemotingEnabled guard.withGuard {
val actor = Actor.actorOf(clazz, hostname, port) ensureRemotingEnabled
link(actor) val actor = Actor.actorOf(clazz, hostname, port)
actor.start actor.timeout = timeout
actor link(actor)
} actor.start
actor
}
/** /**
* Returns the mailbox. * Returns the mailbox.
@ -790,9 +797,9 @@ class LocalActorRef private[akka] (
protected[akka] def supervisor_=(sup: Option[ActorRef]): Unit = _supervisor = sup protected[akka] def supervisor_=(sup: Option[ActorRef]): Unit = _supervisor = sup
protected[akka] def postMessageToMailbox(message: Any, senderOption: Option[ActorRef]): Unit = protected[akka] def postMessageToMailbox(message: Any, senderOption: Option[ActorRef]): Unit =
if (isClientManaged_? && isRemotingEnabled) { if (isClientManaged_?) {
ActorRegistry.remote.send[Any]( ActorRegistry.remote.send[Any](
message, senderOption, None, homeAddress, timeout, true, this, None, ActorType.ScalaActor, None) message, senderOption, None, homeAddress.get, timeout, true, this, None, ActorType.ScalaActor, None)
} else } else
dispatcher dispatchMessage new MessageInvocation(this, message, senderOption, None) dispatcher dispatchMessage new MessageInvocation(this, message, senderOption, None)
@ -801,9 +808,9 @@ class LocalActorRef private[akka] (
timeout: Long, timeout: Long,
senderOption: Option[ActorRef], senderOption: Option[ActorRef],
senderFuture: Option[CompletableFuture[T]]): CompletableFuture[T] = { senderFuture: Option[CompletableFuture[T]]): CompletableFuture[T] = {
if (isClientManaged_? && isRemotingEnabled) { if (isClientManaged_?) {
val future = ActorRegistry.remote.send[T]( val future = ActorRegistry.remote.send[T](
message, senderOption, senderFuture, homeAddress, timeout, false, this, None, ActorType.ScalaActor, None) message, senderOption, senderFuture, homeAddress.get, timeout, false, this, None, ActorType.ScalaActor, None)
if (future.isDefined) future.get if (future.isDefined) future.get
else throw new IllegalActorStateException("Expected a future from remote call to actor " + toString) else throw new IllegalActorStateException("Expected a future from remote call to actor " + toString)
} else { } else {
@ -970,7 +977,8 @@ class LocalActorRef private[akka] (
protected[akka] def registerSupervisorAsRemoteActor: Option[Uuid] = guard.withGuard { protected[akka] def registerSupervisorAsRemoteActor: Option[Uuid] = guard.withGuard {
ensureRemotingEnabled ensureRemotingEnabled
if (_supervisor.isDefined) { if (_supervisor.isDefined) {
ActorRegistry.remote.registerSupervisorForActor(this) if (homeAddress.isDefined)
ActorRegistry.remote.registerSupervisorForActor(this)
Some(_supervisor.get.uuid) Some(_supervisor.get.uuid)
} else None } else None
} }
@ -1108,7 +1116,7 @@ private[akka] case class RemoteActorRef private[akka] (
ensureRemotingEnabled ensureRemotingEnabled
val homeAddress = new InetSocketAddress(hostname, port) val homeAddress = Some(new InetSocketAddress(hostname, port))
//protected def clientManaged = classOrServiceName.isEmpty //If no class or service name, it's client managed //protected def clientManaged = classOrServiceName.isEmpty //If no class or service name, it's client managed
id = classOrServiceName id = classOrServiceName
@ -1119,14 +1127,14 @@ private[akka] case class RemoteActorRef private[akka] (
start start
def postMessageToMailbox(message: Any, senderOption: Option[ActorRef]): Unit = def postMessageToMailbox(message: Any, senderOption: Option[ActorRef]): Unit =
ActorRegistry.remote.send[Any](message, senderOption, None, homeAddress, timeout, true, this, None, actorType, loader) ActorRegistry.remote.send[Any](message, senderOption, None, homeAddress.get, timeout, true, this, None, actorType, loader)
def postMessageToMailboxAndCreateFutureResultWithTimeout[T]( def postMessageToMailboxAndCreateFutureResultWithTimeout[T](
message: Any, message: Any,
timeout: Long, timeout: Long,
senderOption: Option[ActorRef], senderOption: Option[ActorRef],
senderFuture: Option[CompletableFuture[T]]): CompletableFuture[T] = { senderFuture: Option[CompletableFuture[T]]): CompletableFuture[T] = {
val future = ActorRegistry.remote.send[T](message, senderOption, senderFuture, homeAddress, timeout, false, this, None, actorType, loader) val future = ActorRegistry.remote.send[T](message, senderOption, senderFuture, homeAddress.get, timeout, false, this, None, actorType, loader)
if (future.isDefined) future.get if (future.isDefined) future.get
else throw new IllegalActorStateException("Expected a future from remote call to actor " + toString) else throw new IllegalActorStateException("Expected a future from remote call to actor " + toString)
} }
@ -1158,9 +1166,9 @@ private[akka] case class RemoteActorRef private[akka] (
def startLink(actorRef: ActorRef): Unit = unsupported def startLink(actorRef: ActorRef): Unit = unsupported
def startLinkRemote(actorRef: ActorRef, hostname: String, port: Int): Unit = unsupported def startLinkRemote(actorRef: ActorRef, hostname: String, port: Int): Unit = unsupported
def spawn(clazz: Class[_ <: Actor]): ActorRef = unsupported def spawn(clazz: Class[_ <: Actor]): ActorRef = unsupported
def spawnRemote(clazz: Class[_ <: Actor], hostname: String, port: Int): ActorRef = unsupported def spawnRemote(clazz: Class[_ <: Actor], hostname: String, port: Int, timeout: Long): ActorRef = unsupported
def spawnLink(clazz: Class[_ <: Actor]): ActorRef = unsupported def spawnLink(clazz: Class[_ <: Actor]): ActorRef = unsupported
def spawnLinkRemote(clazz: Class[_ <: Actor], hostname: String, port: Int): ActorRef = unsupported def spawnLinkRemote(clazz: Class[_ <: Actor], hostname: String, port: Int, timeout: Long): ActorRef = unsupported
def supervisor: Option[ActorRef] = unsupported def supervisor: Option[ActorRef] = unsupported
def shutdownLinkedActors: Unit = unsupported def shutdownLinkedActors: Unit = unsupported
protected[akka] def mailbox: AnyRef = unsupported protected[akka] def mailbox: AnyRef = unsupported
@ -1397,9 +1405,9 @@ trait ScalaActorRef extends ActorRefShared { ref: ActorRef =>
/** /**
* Atomically create (from actor class), start and make an actor remote. * Atomically create (from actor class), start and make an actor remote.
*/ */
def spawnRemote[T <: Actor: Manifest](hostname: String, port: Int): ActorRef = { def spawnRemote[T <: Actor: Manifest](hostname: String, port: Int, timeout: Long): ActorRef = {
ensureRemotingEnabled ensureRemotingEnabled
spawnRemote(manifest[T].erasure.asInstanceOf[Class[_ <: Actor]], hostname, port) spawnRemote(manifest[T].erasure.asInstanceOf[Class[_ <: Actor]], hostname, port, timeout)
} }
/** /**
@ -1411,9 +1419,9 @@ trait ScalaActorRef extends ActorRefShared { ref: ActorRef =>
/** /**
* Atomically create (from actor class), start, link and make an actor remote. * Atomically create (from actor class), start, link and make an actor remote.
*/ */
def spawnLinkRemote[T <: Actor: Manifest](hostname: String, port: Int): ActorRef = { def spawnLinkRemote[T <: Actor: Manifest](hostname: String, port: Int, timeout: Long): ActorRef = {
ensureRemotingEnabled ensureRemotingEnabled
spawnLinkRemote(manifest[T].erasure.asInstanceOf[Class[_ <: Actor]], hostname, port) spawnLinkRemote(manifest[T].erasure.asInstanceOf[Class[_ <: Actor]], hostname, port, timeout)
} }
} }

View file

@ -234,125 +234,9 @@ object ActorRegistry extends ListenerManagement {
lazy val remote: RemoteSupport = remoteBootstrap.map(_()).getOrElse(throw new UnsupportedOperationException("You need to have akka-remote on classpath")) lazy val remote: RemoteSupport = remoteBootstrap.map(_()).getOrElse(throw new UnsupportedOperationException("You need to have akka-remote on classpath"))
/** /**
* Creates an ActorRef out of the Actor with type T. * Current home address of this ActorRegistry
* <pre>
* import Actor._
* val actor = actorOf[MyActor]
* actor.start
* actor ! message
* actor.stop
* </pre>
* You can create and start the actor in one statement like this:
* <pre>
* val actor = actorOf[MyActor].start
* </pre>
*/ */
def actorOf[T <: Actor : Manifest]: ActorRef = actorOf(manifest[T].erasure.asInstanceOf[Class[_ <: Actor]]) def homeAddress(): InetSocketAddress = if (isRemotingEnabled) remote.address else Remote.configDefaultAddress
/**
* Creates a Client-managed ActorRef out of the Actor of the specified Class.
* If the supplied host and port is identical of the configured local node, it will be a local actor
* <pre>
* import Actor._
* val actor = actorOf[MyActor]("www.akka.io",2552)
* actor.start
* actor ! message
* actor.stop
* </pre>
* You can create and start the actor in one statement like this:
* <pre>
* val actor = actorOf[MyActor]("www.akka.io",2552).start
* </pre>
*/
def actorOf[T <: Actor : Manifest](host: String, port: Int): ActorRef =
actorOf(manifest[T].erasure.asInstanceOf[Class[_ <: Actor]], host, port)
/**
* Creates an ActorRef out of the Actor of the specified Class.
* <pre>
* import Actor._
* val actor = actorOf(classOf[MyActor])
* actor.start
* actor ! message
* actor.stop
* </pre>
* You can create and start the actor in one statement like this:
* <pre>
* val actor = actorOf(classOf[MyActor]).start
* </pre>
*/
def actorOf(clazz: Class[_ <: Actor]): ActorRef = new LocalActorRef(() => {
import ReflectiveAccess.{ createInstance, noParams, noArgs }
createInstance[Actor](clazz.asInstanceOf[Class[_]], noParams, noArgs).getOrElse(
throw new ActorInitializationException(
"Could not instantiate Actor" +
"\nMake sure Actor is NOT defined inside a class/trait," +
"\nif so put it outside the class/trait, f.e. in a companion object," +
"\nOR try to change: 'actorOf[MyActor]' to 'actorOf(new MyActor)'."))
})
/**
* Creates a Client-managed ActorRef out of the Actor of the specified Class.
* If the supplied host and port is identical of the configured local node, it will be a local actor
* <pre>
* import Actor._
* val actor = actorOf(classOf[MyActor],"www.akka.io",2552)
* actor.start
* actor ! message
* actor.stop
* </pre>
* You can create and start the actor in one statement like this:
* <pre>
* val actor = actorOf(classOf[MyActor],"www.akka.io",2552).start
* </pre>
*/
def actorOf(clazz: Class[_ <: Actor], host: String, port: Int, timeout: Long = Actor.TIMEOUT): ActorRef =
remote.clientManagedActorOf(clazz, host, port, timeout)
/**
* Creates an ActorRef out of the Actor. Allows you to pass in a factory function
* that creates the Actor. Please note that this function can be invoked multiple
* times if for example the Actor is supervised and needs to be restarted.
* <p/>
* This function should <b>NOT</b> be used for remote actors.
* <pre>
* import Actor._
* val actor = actorOf(new MyActor)
* actor.start
* actor ! message
* actor.stop
* </pre>
* You can create and start the actor in one statement like this:
* <pre>
* val actor = actorOf(new MyActor).start
* </pre>
*/
def actorOf(factory: => Actor): ActorRef = new LocalActorRef(() => factory)
/**
* Use to spawn out a block of code in an event-driven actor. Will shut actor down when
* the block has been executed.
* <p/>
* NOTE: If used from within an Actor then has to be qualified with 'ActorRegistry.spawn' since
* there is a method 'spawn[ActorType]' in the Actor trait already.
* Example:
* <pre>
* import ActorRegistry.{spawn}
*
* spawn {
* ... // do stuff
* }
* </pre>
*/
def spawn(body: => Unit)(implicit dispatcher: MessageDispatcher = Dispatchers.defaultGlobalDispatcher): Unit = {
case object Spawn
actorOf(new Actor() {
self.dispatcher = dispatcher
def receive = {
case Spawn => try { body } finally { self.stop }
}
}).start ! Spawn
}
/** /**

View file

@ -142,7 +142,7 @@ sealed class Supervisor(handler: FaultHandlingStrategy) {
actorRef.lifeCycle = lifeCycle actorRef.lifeCycle = lifeCycle
supervisor.link(actorRef) supervisor.link(actorRef)
if (registerAsRemoteService) if (registerAsRemoteService)
ActorRegistry.remote.register(actorRef) //TODO: REVISIT: Is this the most sensible approach? other way of obtaining ActorRegistry? ActorRegistry.remote.register(actorRef)
case supervisorConfig @ SupervisorConfig(_, _) => // recursive supervisor configuration case supervisorConfig @ SupervisorConfig(_, _) => // recursive supervisor configuration
val childSupervisor = Supervisor(supervisorConfig) val childSupervisor = Supervisor(supervisorConfig)
supervisor.link(childSupervisor.supervisor) supervisor.link(childSupervisor.supervisor)

View file

@ -132,7 +132,7 @@ trait MessageDispatcher extends MailboxFactory with Logging {
val i = uuids.iterator val i = uuids.iterator
while(i.hasNext()) { while(i.hasNext()) {
val uuid = i.next() val uuid = i.next()
ActorRegistry.actorFor(uuid) match { //TODO: REVISIT: How to keep track of which registry? ActorRegistry.actorFor(uuid) match {
case Some(actor) => actor.stop case Some(actor) => actor.stop
case None => case None =>
log.slf4j.error("stopAllLinkedActors couldn't find linked actor: " + uuid) log.slf4j.error("stopAllLinkedActors couldn't find linked actor: " + uuid)

View file

@ -13,6 +13,8 @@ import akka.config.Config.{config, TIME_UNIT}
import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.ConcurrentHashMap
trait RemoteModule extends Logging { trait RemoteModule extends Logging {
val UUID_PREFIX = "uuid:"
def optimizeLocalScoped_?(): Boolean //Apply optimizations for remote operations in local scope def optimizeLocalScoped_?(): Boolean //Apply optimizations for remote operations in local scope
protected[akka] def notifyListeners(message: => Any): Unit protected[akka] def notifyListeners(message: => Any): Unit
@ -23,6 +25,35 @@ trait RemoteModule extends Logging {
private[akka] def typedActors: ConcurrentHashMap[String, AnyRef] private[akka] def typedActors: ConcurrentHashMap[String, AnyRef]
private[akka] def typedActorsByUuid: ConcurrentHashMap[String, AnyRef] private[akka] def typedActorsByUuid: ConcurrentHashMap[String, AnyRef]
private[akka] def typedActorsFactories: ConcurrentHashMap[String, () => AnyRef] private[akka] def typedActorsFactories: ConcurrentHashMap[String, () => AnyRef]
/** Lookup methods **/
private[akka] def findActorById(id: String) : ActorRef = actors.get(id)
private[akka] def findActorByUuid(uuid: String) : ActorRef = actorsByUuid.get(uuid)
private[akka] def findActorFactory(id: String) : () => ActorRef = actorsFactories.get(id)
private[akka] def findTypedActorById(id: String) : AnyRef = typedActors.get(id)
private[akka] def findTypedActorFactory(id: String) : () => AnyRef = typedActorsFactories.get(id)
private[akka] def findTypedActorByUuid(uuid: String) : AnyRef = typedActorsByUuid.get(uuid)
private[akka] def findActorByIdOrUuid(id: String, uuid: String) : ActorRef = {
var actorRefOrNull = if (id.startsWith(UUID_PREFIX)) findActorByUuid(id.substring(UUID_PREFIX.length))
else findActorById(id)
if (actorRefOrNull eq null) actorRefOrNull = findActorByUuid(uuid)
actorRefOrNull
}
private[akka] def findTypedActorByIdOrUuid(id: String, uuid: String) : AnyRef = {
var actorRefOrNull = if (id.startsWith(UUID_PREFIX)) findTypedActorByUuid(id.substring(UUID_PREFIX.length))
else findTypedActorById(id)
if (actorRefOrNull eq null) actorRefOrNull = findTypedActorByUuid(uuid)
actorRefOrNull
}
} }
@ -35,11 +66,11 @@ abstract class RemoteSupport extends ListenerManagement with RemoteServerModule
protected override def manageLifeCycleOfListeners = false protected override def manageLifeCycleOfListeners = false
protected[akka] override def notifyListeners(message: => Any): Unit = super.notifyListeners(message) protected[akka] override def notifyListeners(message: => Any): Unit = super.notifyListeners(message)
private[akka] val actors = new ConcurrentHashMap[String, ActorRef] private[akka] val actors = new ConcurrentHashMap[String, ActorRef]
private[akka] val actorsByUuid = new ConcurrentHashMap[String, ActorRef] private[akka] val actorsByUuid = new ConcurrentHashMap[String, ActorRef]
private[akka] val actorsFactories = new ConcurrentHashMap[String, () => ActorRef] private[akka] val actorsFactories = new ConcurrentHashMap[String, () => ActorRef]
private[akka] val typedActors = new ConcurrentHashMap[String, AnyRef] private[akka] val typedActors = new ConcurrentHashMap[String, AnyRef]
private[akka] val typedActorsByUuid = new ConcurrentHashMap[String, AnyRef] private[akka] val typedActorsByUuid = new ConcurrentHashMap[String, AnyRef]
private[akka] val typedActorsFactories = new ConcurrentHashMap[String, () => AnyRef] private[akka] val typedActorsFactories = new ConcurrentHashMap[String, () => AnyRef]
def clear { def clear {
@ -64,19 +95,16 @@ trait RemoteServerModule extends RemoteModule {
def name: String def name: String
/** /**
* Gets the current hostname of the server instance * Gets the address of the server instance
*/ */
def hostname: String def address: InetSocketAddress
/**
* Gets the current port of the server instance
*/
def port: Int
/** /**
* Starts the server up * Starts the server up
*/ */
def start(host: String = ReflectiveAccess.Remote.HOSTNAME, port: Int = ReflectiveAccess.Remote.PORT, loader: Option[ClassLoader] = None): RemoteServerModule def start(host: String = ReflectiveAccess.Remote.configDefaultAddress.getHostName,
port: Int = ReflectiveAccess.Remote.configDefaultAddress.getPort,
loader: Option[ClassLoader] = None): RemoteServerModule
/** /**
* Shuts the server down * Shuts the server down
@ -222,7 +250,7 @@ trait RemoteClientModule extends RemoteModule { self: RemoteModule =>
def typedActorFor[T](intfClass: Class[T], serviceId: String, implClassName: String, timeout: Long, hostname: String, port: Int, loader: ClassLoader): T = def typedActorFor[T](intfClass: Class[T], serviceId: String, implClassName: String, timeout: Long, hostname: String, port: Int, loader: ClassLoader): T =
typedActorFor(intfClass, serviceId, implClassName, timeout, hostname, port, Some(loader)) typedActorFor(intfClass, serviceId, implClassName, timeout, hostname, port, Some(loader))
def clientManagedActorOf(clazz: Class[_ <: Actor], host: String, port: Int, timeout: Long): ActorRef def clientManagedActorOf(factory: () => Actor, host: String, port: Int): ActorRef
/** Methods that needs to be implemented by a transport **/ /** Methods that needs to be implemented by a transport **/

View file

@ -21,7 +21,7 @@ object ReflectiveAccess extends Logging {
val loader = getClass.getClassLoader val loader = getClass.getClassLoader
lazy val isRemotingEnabled = Remote.isEnabled def isRemotingEnabled = Remote.isEnabled
lazy val isTypedActorEnabled = TypedActorModule.isEnabled lazy val isTypedActorEnabled = TypedActorModule.isEnabled
def ensureRemotingEnabled = Remote.ensureEnabled def ensureRemotingEnabled = Remote.ensureEnabled
@ -33,18 +33,18 @@ object ReflectiveAccess extends Logging {
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a> * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/ */
object Remote { object Remote {
val TRANSPORT = Config.config.getString("akka.remote.transport","akka.remote.NettyRemoteSupport") val TRANSPORT = Config.config.getString("akka.remote.layer","akka.remote.NettyRemoteSupport")
val HOSTNAME = Config.config.getString("akka.remote.server.hostname", "localhost")
val PORT = Config.config.getInt("akka.remote.server.port", 2552) private[akka] val configDefaultAddress =
new InetSocketAddress(Config.config.getString("akka.remote.server.hostname", "localhost"),
Config.config.getInt("akka.remote.server.port", 2552))
lazy val localAddress = new InetSocketAddress(HOSTNAME,PORT)
lazy val isEnabled = remoteSupportClass.isDefined lazy val isEnabled = remoteSupportClass.isDefined
def ensureEnabled = if (!isEnabled) throw new ModuleNotAvailableException( def ensureEnabled = if (!isEnabled) throw new ModuleNotAvailableException(
"Can't load the remoting module, make sure that akka-remote.jar is on the classpath") "Can't load the remoting module, make sure that akka-remote.jar is on the classpath")
//TODO: REVISIT: Make class configurable
val remoteSupportClass: Option[Class[_ <: RemoteSupport]] = getClassFor(TRANSPORT) val remoteSupportClass: Option[Class[_ <: RemoteSupport]] = getClassFor(TRANSPORT)
protected[akka] val defaultRemoteSupport: Option[() => RemoteSupport] = remoteSupportClass map { protected[akka] val defaultRemoteSupport: Option[() => RemoteSupport] = remoteSupportClass map {

View file

@ -17,8 +17,7 @@ trait BootableRemoteActorService extends Bootable with Logging {
self: BootableActorLoaderService => self: BootableActorLoaderService =>
protected lazy val remoteServerThread = new Thread(new Runnable() { protected lazy val remoteServerThread = new Thread(new Runnable() {
import ReflectiveAccess.Remote.{HOSTNAME,PORT} def run = ActorRegistry.remote.start(loader = self.applicationLoader) //Use config host/port
def run = ActorRegistry.remote.start(HOSTNAME,PORT,loader = self.applicationLoader)
}, "Akka Remote Service") }, "Akka Remote Service")
def startRemoteService = remoteServerThread.start def startRemoteService = remoteServerThread.start

View file

@ -115,10 +115,10 @@ trait NettyRemoteClientModule extends RemoteClientModule { self: ListenerManagem
} }
private[akka] def registerSupervisorForActor(actorRef: ActorRef): ActorRef = private[akka] def registerSupervisorForActor(actorRef: ActorRef): ActorRef =
clientFor(actorRef.homeAddress, None).registerSupervisorForActor(actorRef) clientFor(actorRef.homeAddress.get, None).registerSupervisorForActor(actorRef)
private[akka] def deregisterSupervisorForActor(actorRef: ActorRef): ActorRef = private[akka] def deregisterSupervisorForActor(actorRef: ActorRef): ActorRef =
clientFor(actorRef.homeAddress, None).deregisterSupervisorForActor(actorRef) clientFor(actorRef.homeAddress.get, None).deregisterSupervisorForActor(actorRef)
/** /**
* Clean-up all open connections. * Clean-up all open connections.
@ -486,8 +486,6 @@ class RemoteClientHandler(
*/ */
object RemoteServer { object RemoteServer {
val isRemotingEnabled = config.getList("akka.enabled-modules").exists(_ == "remote") val isRemotingEnabled = config.getList("akka.enabled-modules").exists(_ == "remote")
val UUID_PREFIX = "uuid:"
val MESSAGE_FRAME_SIZE = config.getInt("akka.remote.server.message-frame-size", 1048576) val MESSAGE_FRAME_SIZE = config.getInt("akka.remote.server.message-frame-size", 1048576)
val SECURE_COOKIE = config.getString("akka.remote.secure-cookie") val SECURE_COOKIE = config.getString("akka.remote.secure-cookie")
val REQUIRE_COOKIE = { val REQUIRE_COOKIE = {
@ -563,31 +561,22 @@ class NettyRemoteSupport extends RemoteSupport with NettyRemoteServerModule with
def optimizeLocalScoped_?() = optimizeLocal.get def optimizeLocalScoped_?() = optimizeLocal.get
protected[akka] def actorFor(serviceId: String, className: String, timeout: Long, hostname: String, port: Int, loader: Option[ClassLoader]): ActorRef = { protected[akka] def actorFor(serviceId: String, className: String, timeout: Long, host: String, port: Int, loader: Option[ClassLoader]): ActorRef = {
//TODO: REVISIT: Possible to optimize server-managed actors in local scope? if (optimizeLocalScoped_?) {
//val Host = this.hostname val home = this.address
//val Port = this.port if (host == home.getHostName && port == home.getPort) {//TODO: switch to InetSocketAddres.equals?
val localRef = findActorByIdOrUuid(serviceId,serviceId)
//(host,port) match { if (localRef ne null) return localRef //Code significantly simpler with the return statement
// case (Host, Port) if optimizeLocalScoped_? => }
//if actor with that servicename or uuid is present locally, return a LocalActorRef to that one }
//else return RemoteActorRef(registry, serviceId, className, hostname, port, timeout, false, loader)
// case _ => RemoteActorRef(serviceId, className, host, port, timeout, loader)
// RemoteActorRef(registry, serviceId, className, hostname, port, timeout, false, loader)
//}
RemoteActorRef(serviceId, className, hostname, port, timeout, loader)
} }
def clientManagedActorOf(clazz: Class[_ <: Actor], host: String, port: Int, timeout: Long): ActorRef = { def clientManagedActorOf(factory: () => Actor, host: String, port: Int): ActorRef = {
import ReflectiveAccess.{ createInstance, noParams, noArgs } val ref = new LocalActorRef(factory, Some(new InetSocketAddress(host, port)))
val ref = new LocalActorRef(() => createInstance[Actor](clazz.asInstanceOf[Class[_]], noParams, noArgs).getOrElse( //ref.timeout = timeout //removed because setting default timeout should be done after construction
throw new ActorInitializationException(
"Could not instantiate Actor" +
"\nMake sure Actor is NOT defined inside a class/trait," +
"\nif so put it outside the class/trait, f.e. in a companion object," +
"\nOR try to change: 'actorOf[MyActor]' to 'actorOf(new MyActor)'.")),
new InetSocketAddress(host, port))
ref.timeout = timeout
ref ref
} }
} }
@ -595,6 +584,7 @@ class NettyRemoteSupport extends RemoteSupport with NettyRemoteServerModule with
class NettyRemoteServer(serverModule: NettyRemoteServerModule, val host: String, val port: Int, val loader: Option[ClassLoader]) { class NettyRemoteServer(serverModule: NettyRemoteServerModule, val host: String, val port: Int, val loader: Option[ClassLoader]) {
val name = "NettyRemoteServer@" + host + ":" + port val name = "NettyRemoteServer@" + host + ":" + port
val address = new InetSocketAddress(host,port)
private val factory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool,Executors.newCachedThreadPool) private val factory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool,Executors.newCachedThreadPool)
@ -610,7 +600,7 @@ class NettyRemoteServer(serverModule: NettyRemoteServerModule, val host: String,
bootstrap.setOption("child.reuseAddress", true) bootstrap.setOption("child.reuseAddress", true)
bootstrap.setOption("child.connectTimeoutMillis", RemoteServer.CONNECTION_TIMEOUT_MILLIS.toMillis) bootstrap.setOption("child.connectTimeoutMillis", RemoteServer.CONNECTION_TIMEOUT_MILLIS.toMillis)
openChannels.add(bootstrap.bind(new InetSocketAddress(host, port))) openChannels.add(bootstrap.bind(address))
serverModule.notifyListeners(RemoteServerStarted(serverModule)) serverModule.notifyListeners(RemoteServerStarted(serverModule))
def shutdown { def shutdown {
@ -630,19 +620,16 @@ trait NettyRemoteServerModule extends RemoteServerModule { self: RemoteModule =>
import RemoteServer._ import RemoteServer._
private[akka] val currentServer = new AtomicReference[Option[NettyRemoteServer]](None) private[akka] val currentServer = new AtomicReference[Option[NettyRemoteServer]](None)
def hostname = currentServer.get match { def address = currentServer.get match {
case Some(s) => s.host case Some(s) => s.address
case None => ReflectiveAccess.Remote.HOSTNAME case None => ReflectiveAccess.Remote.configDefaultAddress
}
def port = currentServer.get match {
case Some(s) => s.port
case None => ReflectiveAccess.Remote.PORT
} }
def name = currentServer.get match { def name = currentServer.get match {
case Some(s) => s.name case Some(s) => s.name
case None => "NettyRemoteServer@" + ReflectiveAccess.Remote.HOSTNAME + ":" + ReflectiveAccess.Remote.PORT case None =>
val a = ReflectiveAccess.Remote.configDefaultAddress
"NettyRemoteServer@" + a.getHostName + ":" + a.getPort
} }
private val _isRunning = new Switch(false) private val _isRunning = new Switch(false)
@ -1096,57 +1083,18 @@ class RemoteServerHandler(
} }
} }
private def findActorById(id: String) : ActorRef = {
server.actors.get(id)
}
private def findActorByUuid(uuid: String) : ActorRef = {
log.slf4j.debug("Trying to find actor for uuid '{}' inside {}",uuid,server.actorsByUuid)
server.actorsByUuid.get(uuid)
}
private def findActorFactory(id: String) : () => ActorRef = {
server.actorsFactories.get(id)
}
private def findSessionActor(id: String, channel: Channel) : ActorRef = { private def findSessionActor(id: String, channel: Channel) : ActorRef = {
val map = sessionActors.get(channel) val map = sessionActors.get(channel)
if (map ne null) map.get(id) if (map ne null) map.get(id)
else null else null
} }
private def findTypedActorById(id: String) : AnyRef = {
server.typedActors.get(id)
}
private def findTypedActorFactory(id: String) : () => AnyRef = {
server.typedActorsFactories.get(id)
}
private def findTypedSessionActor(id: String, channel: Channel) : AnyRef = { private def findTypedSessionActor(id: String, channel: Channel) : AnyRef = {
val map = typedSessionActors.get(channel) val map = typedSessionActors.get(channel)
if (map ne null) map.get(id) if (map ne null) map.get(id)
else null else null
} }
private def findTypedActorByUuid(uuid: String) : AnyRef = {
server.typedActorsByUuid.get(uuid)
}
private def findActorByIdOrUuid(id: String, uuid: String) : ActorRef = {
var actorRefOrNull = if (id.startsWith(UUID_PREFIX)) findActorByUuid(id.substring(UUID_PREFIX.length))
else findActorById(id)
if (actorRefOrNull eq null) actorRefOrNull = findActorByUuid(uuid)
actorRefOrNull
}
private def findTypedActorByIdOrUuid(id: String, uuid: String) : AnyRef = {
var actorRefOrNull = if (id.startsWith(UUID_PREFIX)) findTypedActorByUuid(id.substring(UUID_PREFIX.length))
else findTypedActorById(id)
if (actorRefOrNull eq null) actorRefOrNull = findTypedActorByUuid(uuid)
actorRefOrNull
}
/** /**
* gets the actor from the session, or creates one if there is a factory for it * gets the actor from the session, or creates one if there is a factory for it
*/ */
@ -1159,7 +1107,7 @@ class RemoteServerHandler(
sessionActorRefOrNull sessionActorRefOrNull
} else { } else {
// we dont have it in the session either, see if we have a factory for it // we dont have it in the session either, see if we have a factory for it
val actorFactoryOrNull = findActorFactory(id) val actorFactoryOrNull = server.findActorFactory(id)
if (actorFactoryOrNull ne null) { if (actorFactoryOrNull ne null) {
val actorRef = actorFactoryOrNull() val actorRef = actorFactoryOrNull()
actorRef.uuid = uuidFrom(uuid.getHigh,uuid.getLow) actorRef.uuid = uuidFrom(uuid.getHigh,uuid.getLow)
@ -1185,7 +1133,7 @@ class RemoteServerHandler(
log.slf4j.info("Creating a new client-managed remote actor [{}:{}]", name, uuid) log.slf4j.info("Creating a new client-managed remote actor [{}:{}]", name, uuid)
val clazz = if (applicationLoader.isDefined) applicationLoader.get.loadClass(name) val clazz = if (applicationLoader.isDefined) applicationLoader.get.loadClass(name)
else Class.forName(name) else Class.forName(name)
val actorRef = ActorRegistry.actorOf(clazz.asInstanceOf[Class[_ <: Actor]]) val actorRef = Actor.actorOf(clazz.asInstanceOf[Class[_ <: Actor]])
actorRef.uuid = uuidFrom(uuid.getHigh,uuid.getLow) actorRef.uuid = uuidFrom(uuid.getHigh,uuid.getLow)
actorRef.id = id actorRef.id = id
actorRef.timeout = timeout actorRef.timeout = timeout
@ -1211,7 +1159,7 @@ class RemoteServerHandler(
val uuid = actorInfo.getUuid val uuid = actorInfo.getUuid
val id = actorInfo.getId val id = actorInfo.getId
val actorRefOrNull = findActorByIdOrUuid(id, uuidFrom(uuid.getHigh,uuid.getLow).toString) val actorRefOrNull = server.findActorByIdOrUuid(id, uuidFrom(uuid.getHigh,uuid.getLow).toString)
if (actorRefOrNull ne null) if (actorRefOrNull ne null)
actorRefOrNull actorRefOrNull
@ -1235,7 +1183,7 @@ class RemoteServerHandler(
if (sessionActorRefOrNull ne null) if (sessionActorRefOrNull ne null)
sessionActorRefOrNull sessionActorRefOrNull
else { else {
val actorFactoryOrNull = findTypedActorFactory(id) val actorFactoryOrNull = server.findTypedActorFactory(id)
if (actorFactoryOrNull ne null) { if (actorFactoryOrNull ne null) {
val newInstance = actorFactoryOrNull() val newInstance = actorFactoryOrNull()
typedSessionActors.get(channel).put(id, newInstance) typedSessionActors.get(channel).put(id, newInstance)
@ -1280,7 +1228,7 @@ class RemoteServerHandler(
val uuid = actorInfo.getUuid val uuid = actorInfo.getUuid
val id = actorInfo.getId val id = actorInfo.getId
val typedActorOrNull = findTypedActorByIdOrUuid(id, uuidFrom(uuid.getHigh,uuid.getLow).toString) val typedActorOrNull = server.findTypedActorByIdOrUuid(id, uuidFrom(uuid.getHigh,uuid.getLow).toString)
if (typedActorOrNull ne null) if (typedActorOrNull ne null)
typedActorOrNull typedActorOrNull
else else

View file

@ -17,7 +17,6 @@ import scala.collection.immutable.Stack
import com.google.protobuf.ByteString import com.google.protobuf.ByteString
import akka.util.ReflectiveAccess import akka.util.ReflectiveAccess
import akka.util.ReflectiveAccess.Remote.{HOSTNAME,PORT}
import java.net.InetSocketAddress import java.net.InetSocketAddress
/** /**
@ -91,11 +90,14 @@ object ActorSerialization {
def toBinaryJ[T <: Actor](a: ActorRef, format: Format[T], srlMailBox: Boolean = true): Array[Byte] = def toBinaryJ[T <: Actor](a: ActorRef, format: Format[T], srlMailBox: Boolean = true): Array[Byte] =
toBinary(a, srlMailBox)(format) toBinary(a, srlMailBox)(format)
private[akka] def toAddressProtocol(actorRef: ActorRef) = private[akka] def toAddressProtocol(actorRef: ActorRef) = {
val address = actorRef.homeAddress.getOrElse(ActorRegistry.remote.address)
AddressProtocol.newBuilder AddressProtocol.newBuilder
.setHostname(actorRef.homeAddress.getHostName) .setHostname(address.getHostName)
.setPort(actorRef.homeAddress.getPort) .setPort(address.getPort)
.build .build
}
private[akka] def toSerializedActorRefProtocol[T <: Actor]( private[akka] def toSerializedActorRefProtocol[T <: Actor](
actorRef: ActorRef, format: Format[T], serializeMailBox: Boolean = true): SerializedActorRefProtocol = { actorRef: ActorRef, format: Format[T], serializeMailBox: Boolean = true): SerializedActorRefProtocol = {
@ -129,7 +131,7 @@ object ActorSerialization {
messages.map(m => messages.map(m =>
RemoteActorSerialization.createRemoteMessageProtocolBuilder( RemoteActorSerialization.createRemoteMessageProtocolBuilder(
Some(actorRef), Some(actorRef),
Left(actorRef.uuid), //TODO: REVISIT: generate uuid for the request Left(actorRef.uuid),
actorRef.id, actorRef.id,
actorRef.actorClassName, actorRef.actorClassName,
actorRef.timeout, actorRef.timeout,
@ -201,7 +203,7 @@ object ActorSerialization {
supervisor, supervisor,
hotswap, hotswap,
factory, factory,
new InetSocketAddress(protocol.getOriginalAddress.getHostname,protocol.getOriginalAddress.getPort)) None) //TODO: shouldn't originalAddress be optional?
val messages = protocol.getMessagesList.toArray.toList.asInstanceOf[List[RemoteMessageProtocol]] val messages = protocol.getMessagesList.toArray.toList.asInstanceOf[List[RemoteMessageProtocol]]
messages.foreach(message => ar ! MessageSerializer.deserialize(message.getMessage)) messages.foreach(message => ar ! MessageSerializer.deserialize(message.getMessage))

View file

@ -27,8 +27,9 @@ class AkkaRemoteTest extends
val remote = ActorRegistry.remote val remote = ActorRegistry.remote
val unit = TimeUnit.SECONDS val unit = TimeUnit.SECONDS
val host = remote.hostname
val port = remote.port val host = "localhost"
val port = 25520
var optimizeLocal_? = remote.asInstanceOf[NettyRemoteSupport].optimizeLocalScoped_? var optimizeLocal_? = remote.asInstanceOf[NettyRemoteSupport].optimizeLocalScoped_?
@ -41,7 +42,7 @@ class AkkaRemoteTest extends
} }
override def beforeEach { override def beforeEach {
remote.start() remote.start(host,port)
Thread.sleep(2000) Thread.sleep(2000)
super.beforeEach super.beforeEach
} }

View file

@ -4,23 +4,11 @@
package akka.actor.remote package akka.actor.remote
import org.scalatest.matchers.ShouldMatchers
import org.scalatest.junit.JUnitRunner
import org.junit.runner.RunWith
import akka.config.Supervision._ import akka.config.Supervision._
import akka.actor._ import akka.actor._
import akka.remote.{RemoteServer, RemoteClient}
import java.util.concurrent.{LinkedBlockingQueue, TimeUnit, BlockingQueue} import java.util.concurrent.{LinkedBlockingQueue, TimeUnit, BlockingQueue}
import org.scalatest.{BeforeAndAfterEach, Spec, Assertions, BeforeAndAfterAll} import akka.config. {RemoteAddress, Config, TypedActorConfigurator}
import akka.config.{Config, TypedActorConfigurator, RemoteAddress}
/* THIS SHOULD BE UNCOMMENTED
object RemoteTypedActorSpec {
val HOSTNAME = "localhost"
val PORT = 9988
var server: RemoteServer = null
}*/
object RemoteTypedActorLog { object RemoteTypedActorLog {
val messageLog: BlockingQueue[String] = new LinkedBlockingQueue[String] val messageLog: BlockingQueue[String] = new LinkedBlockingQueue[String]
@ -31,22 +19,17 @@ object RemoteTypedActorLog {
oneWayLog.clear oneWayLog.clear
} }
} }
/* THIS SHOULD BE UNCOMMENTED
@RunWith(classOf[JUnitRunner]) class RemoteTypedActorSpec extends AkkaRemoteTest {
class RemoteTypedActorSpec extends
Spec with
ShouldMatchers with
BeforeAndAfterEach with BeforeAndAfterAll {
import RemoteTypedActorLog._ import RemoteTypedActorLog._
import RemoteTypedActorSpec._
private val conf = new TypedActorConfigurator private var conf: TypedActorConfigurator = _
override def beforeAll = { override def beforeEach {
server = new RemoteServer() super.beforeEach
server.start("localhost", 9995)
Config.config Config.config
conf = new TypedActorConfigurator
conf.configure( conf.configure(
new AllForOneStrategy(List(classOf[Exception]), 3, 5000), new AllForOneStrategy(List(classOf[Exception]), 3, 5000),
List( List(
@ -55,74 +38,57 @@ class RemoteTypedActorSpec extends
classOf[RemoteTypedActorOneImpl], classOf[RemoteTypedActorOneImpl],
Permanent, Permanent,
10000, 10000,
new RemoteAddress("localhost", 9995)), RemoteAddress(host,port)),
new SuperviseTypedActor( new SuperviseTypedActor(
classOf[RemoteTypedActorTwo], classOf[RemoteTypedActorTwo],
classOf[RemoteTypedActorTwoImpl], classOf[RemoteTypedActorTwoImpl],
Permanent, Permanent,
10000, 10000,
new RemoteAddress("localhost", 9995)) RemoteAddress(host,port))
).toArray).supervise ).toArray).supervise
}
override def afterEach {
clearMessageLogs
conf.stop
super.afterEach
Thread.sleep(1000) Thread.sleep(1000)
} }
override def afterAll = { "Remote Typed Actor " should {
conf.stop
try {
server.shutdown
RemoteClient.shutdownAll
Thread.sleep(1000)
} catch {
case e => ()
}
ActorRegistry.shutdownAll
}
override def afterEach() { /*"receives one-way message" in {
server.typedActors.clear
}
describe("Remote Typed Actor ") {
it("should receive one-way message") {
clearMessageLogs
val ta = conf.getInstance(classOf[RemoteTypedActorOne]) val ta = conf.getInstance(classOf[RemoteTypedActorOne])
expect("oneway") { ta.oneWay
ta.oneWay oneWayLog.poll(5, TimeUnit.SECONDS) must equal ("oneway")
oneWayLog.poll(5, TimeUnit.SECONDS)
}
} }
it("should respond to request-reply message") { "responds to request-reply message" in {
clearMessageLogs val ta = conf.getInstance(classOf[RemoteTypedActorOne])
ta.requestReply("ping") must equal ("pong")
} */
"be restarted on failure" in {
val ta = conf.getInstance(classOf[RemoteTypedActorOne]) val ta = conf.getInstance(classOf[RemoteTypedActorOne])
expect("pong") { try {
ta.requestReply("ping")
}
}
it("should be restarted on failure") {
clearMessageLogs
val ta = conf.getInstance(classOf[RemoteTypedActorOne])
intercept[RuntimeException] {
ta.requestReply("die") ta.requestReply("die")
} fail("Shouldn't get here")
messageLog.poll(5, TimeUnit.SECONDS) should equal ("Expected exception; to test fault-tolerance") } catch { case re: RuntimeException if re.getMessage == "Expected exception; to test fault-tolerance" => }
messageLog.poll(5, TimeUnit.SECONDS) must equal ("Expected exception; to test fault-tolerance")
} }
it("should restart linked friends on failure") { /* "restarts linked friends on failure" in {
clearMessageLogs
val ta1 = conf.getInstance(classOf[RemoteTypedActorOne]) val ta1 = conf.getInstance(classOf[RemoteTypedActorOne])
val ta2 = conf.getInstance(classOf[RemoteTypedActorTwo]) val ta2 = conf.getInstance(classOf[RemoteTypedActorTwo])
intercept[RuntimeException] { try {
ta1.requestReply("die") ta1.requestReply("die")
} fail("Shouldn't get here")
messageLog.poll(5, TimeUnit.SECONDS) should equal ("Expected exception; to test fault-tolerance") } catch { case re: RuntimeException if re.getMessage == "Expected exception; to test fault-tolerance" => }
messageLog.poll(5, TimeUnit.SECONDS) should equal ("Expected exception; to test fault-tolerance") messageLog.poll(5, TimeUnit.SECONDS) must equal ("Expected exception; to test fault-tolerance")
} messageLog.poll(5, TimeUnit.SECONDS) must equal ("Expected exception; to test fault-tolerance")
}*/
} }
} */ }

View file

@ -1,16 +1,10 @@
package akka.actor.remote package akka.actor.remote
import java.util.concurrent.{CountDownLatch, TimeUnit} import java.util.concurrent.{CountDownLatch, TimeUnit}
import org.scalatest.WordSpec
import org.scalatest.matchers.MustMatchers
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
import org.scalatest.junit.JUnitRunner
import org.junit.runner.RunWith
import akka.util._
import akka.actor.Actor._ import akka.actor.Actor._
import akka.actor.{ActorRegistry, ActorRef, Actor} import akka.actor.{ActorRegistry, ActorRef, Actor}
import akka.remote. {NettyRemoteSupport, RemoteServer, RemoteClient} import akka.remote. {NettyRemoteSupport}
object ServerInitiatedRemoteActorSpec { object ServerInitiatedRemoteActorSpec {
case class Send(actor: ActorRef) case class Send(actor: ActorRef)

View file

@ -4,13 +4,6 @@
package akka.actor.remote package akka.actor.remote
import org.scalatest._
import org.scalatest.WordSpec
import org.scalatest.matchers.MustMatchers
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
import org.scalatest.junit.JUnitRunner
import org.junit.runner.RunWith
import akka.actor._ import akka.actor._
import akka.actor.Actor._ import akka.actor.Actor._
import akka.remote.NettyRemoteSupport import akka.remote.NettyRemoteSupport

View file

@ -2,137 +2,99 @@
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se> * Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/ */
/* THIS SHOULD BE UNCOMMENTED
package akka.actor.remote package akka.actor.remote
import org.scalatest.Spec
import org.scalatest.matchers.ShouldMatchers
import org.scalatest.BeforeAndAfterAll
import org.scalatest.junit.JUnitRunner
import org.junit.runner.RunWith
import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit
import akka.remote.{RemoteServer, RemoteClient} import akka.remote.{RemoteServer, RemoteClient}
import akka.actor._ import akka.actor._
import RemoteTypedActorLog._ import RemoteTypedActorLog._
object ServerInitiatedRemoteTypedActorSpec { class ServerInitiatedRemoteTypedActorSpec extends AkkaRemoteTest {
val HOSTNAME = "localhost"
val PORT = 9990 override def beforeEach = {
var server: RemoteServer = null super.beforeEach
val typedActor = TypedActor.newInstance(classOf[RemoteTypedActorOne], classOf[RemoteTypedActorOneImpl], 1000)
remote.registerTypedActor("typed-actor-service", typedActor)
}
override def afterEach {
super.afterEach
clearMessageLogs
}
def createRemoteActorRef = remote.typedActorFor(classOf[RemoteTypedActorOne], "typed-actor-service", 5000L, host, port)
"Server managed remote typed Actor " should {
"receive one-way message" in {
val actor = createRemoteActorRef
actor.oneWay
oneWayLog.poll(5, TimeUnit.SECONDS) must equal ("oneway")
}
"should respond to request-reply message" in {
val actor = createRemoteActorRef
actor.requestReply("ping") must equal ("pong")
}
"should not recreate registered actors" in {
val actor = createRemoteActorRef
val numberOfActorsInRegistry = ActorRegistry.actors.length
actor.oneWay
oneWayLog.poll(5, TimeUnit.SECONDS) must equal ("oneway")
numberOfActorsInRegistry must be (ActorRegistry.actors.length)
}
"should support multiple variants to get the actor from client side" in {
var actor = createRemoteActorRef
actor.oneWay
oneWayLog.poll(5, TimeUnit.SECONDS) must equal ("oneway")
actor = remote.typedActorFor(classOf[RemoteTypedActorOne], "typed-actor-service", host, port)
actor.oneWay
oneWayLog.poll(5, TimeUnit.SECONDS) must equal ("oneway")
actor = remote.typedActorFor(classOf[RemoteTypedActorOne], "typed-actor-service", 5000L, host, port, this.getClass().getClassLoader)
actor.oneWay
oneWayLog.poll(5, TimeUnit.SECONDS) must equal ("oneway")
}
"should register and unregister typed actors" in {
val typedActor = TypedActor.newInstance(classOf[RemoteTypedActorOne], classOf[RemoteTypedActorOneImpl], 1000)
remote.registerTypedActor("my-test-service", typedActor)
remote.typedActors.get("my-test-service") must not be (null)
remote.unregisterTypedActor("my-test-service")
remote.typedActors.get("my-test-service") must be (null)
}
"should register and unregister typed actors by uuid" in {
val typedActor = TypedActor.newInstance(classOf[RemoteTypedActorOne], classOf[RemoteTypedActorOneImpl], 1000)
val init = AspectInitRegistry.initFor(typedActor)
val uuid = "uuid:" + init.actorRef.uuid
remote.registerTypedActor(uuid, typedActor)
remote.typedActorsByUuid.get(init.actorRef.uuid.toString) must not be (null)
remote.unregisterTypedActor(uuid)
remote.typedActorsByUuid.get(init.actorRef.uuid.toString) must be (null)
}
"should find typed actors by uuid" in {
val typedActor = TypedActor.newInstance(classOf[RemoteTypedActorOne], classOf[RemoteTypedActorOneImpl], 1000)
val init = AspectInitRegistry.initFor(typedActor)
val uuid = "uuid:" + init.actorRef.uuid
remote.registerTypedActor(uuid, typedActor)
remote.typedActorsByUuid.get(init.actorRef.uuid.toString) must not be (null)
val actor = remote.typedActorFor(classOf[RemoteTypedActorOne], uuid, host, port)
actor.oneWay
oneWayLog.poll(5, TimeUnit.SECONDS) must equal ("oneway")
}
}
} }
@RunWith(classOf[JUnitRunner])
class ServerInitiatedRemoteTypedActorSpec extends
Spec with
ShouldMatchers with
BeforeAndAfterAll {
import ServerInitiatedRemoteTypedActorSpec._
private val unit = TimeUnit.MILLISECONDS
override def beforeAll = {
server = new RemoteServer()
server.start(HOSTNAME, PORT)
val typedActor = TypedActor.newInstance(classOf[RemoteTypedActorOne], classOf[RemoteTypedActorOneImpl], 1000)
server.registerTypedActor("typed-actor-service", typedActor)
Thread.sleep(1000)
}
// make sure the servers shutdown cleanly after the test has finished
override def afterAll = {
try {
server.shutdown
RemoteClient.shutdownAll
Thread.sleep(1000)
} catch {
case e => ()
}
}
describe("Server managed remote typed Actor ") {
it("should receive one-way message") {
clearMessageLogs
val actor = RemoteClient.typedActorFor(classOf[RemoteTypedActorOne], "typed-actor-service", 5000L, HOSTNAME, PORT)
expect("oneway") {
actor.oneWay
oneWayLog.poll(5, TimeUnit.SECONDS)
}
}
it("should respond to request-reply message") {
clearMessageLogs
val actor = RemoteClient.typedActorFor(classOf[RemoteTypedActorOne], "typed-actor-service", 5000L, HOSTNAME, PORT)
expect("pong") {
actor.requestReply("ping")
}
}
it("should not recreate registered actors") {
val actor = RemoteClient.typedActorFor(classOf[RemoteTypedActorOne], "typed-actor-service", 5000L, HOSTNAME, PORT)
val numberOfActorsInRegistry = ActorRegistry.actors.length
expect("oneway") {
actor.oneWay
oneWayLog.poll(5, TimeUnit.SECONDS)
}
assert(numberOfActorsInRegistry === ActorRegistry.actors.length)
}
it("should support multiple variants to get the actor from client side") {
var actor = RemoteClient.typedActorFor(classOf[RemoteTypedActorOne], "typed-actor-service", 5000L, HOSTNAME, PORT)
expect("oneway") {
actor.oneWay
oneWayLog.poll(5, TimeUnit.SECONDS)
}
actor = RemoteClient.typedActorFor(classOf[RemoteTypedActorOne], "typed-actor-service", HOSTNAME, PORT)
expect("oneway") {
actor.oneWay
oneWayLog.poll(5, TimeUnit.SECONDS)
}
actor = RemoteClient.typedActorFor(classOf[RemoteTypedActorOne], "typed-actor-service", 5000L, HOSTNAME, PORT, this.getClass().getClassLoader)
expect("oneway") {
actor.oneWay
oneWayLog.poll(5, TimeUnit.SECONDS)
}
}
it("should register and unregister typed actors") {
val typedActor = TypedActor.newInstance(classOf[RemoteTypedActorOne], classOf[RemoteTypedActorOneImpl], 1000)
server.registerTypedActor("my-test-service", typedActor)
assert(server.typedActors.get("my-test-service") ne null, "typed actor registered")
server.unregisterTypedActor("my-test-service")
assert(server.typedActors.get("my-test-service") eq null, "typed actor unregistered")
}
it("should register and unregister typed actors by uuid") {
val typedActor = TypedActor.newInstance(classOf[RemoteTypedActorOne], classOf[RemoteTypedActorOneImpl], 1000)
val init = AspectInitRegistry.initFor(typedActor)
val uuid = "uuid:" + init.actorRef.uuid
server.registerTypedActor(uuid, typedActor)
assert(server.typedActorsByUuid.get(init.actorRef.uuid.toString) ne null, "typed actor registered")
server.unregisterTypedActor(uuid)
assert(server.typedActorsByUuid.get(init.actorRef.uuid.toString) eq null, "typed actor unregistered")
}
it("should find typed actors by uuid") {
val typedActor = TypedActor.newInstance(classOf[RemoteTypedActorOne], classOf[RemoteTypedActorOneImpl], 1000)
val init = AspectInitRegistry.initFor(typedActor)
val uuid = "uuid:" + init.actorRef.uuid
server.registerTypedActor(uuid, typedActor)
assert(server.typedActorsByUuid.get(init.actorRef.uuid.toString) ne null, "typed actor registered")
val actor = RemoteClient.typedActorFor(classOf[RemoteTypedActorOne], uuid, HOSTNAME, PORT)
expect("oneway") {
actor.oneWay
oneWayLog.poll(5, TimeUnit.SECONDS)
}
}
}
} */

View file

@ -2,108 +2,74 @@
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se> * Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/ */
/* THIS SHOULD BE UNCOMMENTED
package akka.actor.remote package akka.actor.remote
import org.scalatest._
import org.scalatest.matchers.ShouldMatchers
import org.scalatest.BeforeAndAfterAll
import org.scalatest.junit.JUnitRunner
import org.junit.runner.RunWith
import java.util.concurrent.TimeUnit
import akka.remote.{RemoteServer, RemoteClient}
import akka.actor._ import akka.actor._
import RemoteTypedActorLog._ import RemoteTypedActorLog._
object ServerInitiatedRemoteTypedSessionActorSpec { class ServerInitiatedRemoteTypedSessionActorSpec extends AkkaRemoteTest {
val HOSTNAME = "localhost"
val PORT = 9990
var server: RemoteServer = null
}
@RunWith(classOf[JUnitRunner])
class ServerInitiatedRemoteTypedSessionActorSpec extends
FlatSpec with
ShouldMatchers with
BeforeAndAfterEach {
import ServerInitiatedRemoteTypedActorSpec._
private val unit = TimeUnit.MILLISECONDS
override def beforeEach = { override def beforeEach = {
server = new RemoteServer() super.beforeEach
server.start(HOSTNAME, PORT)
server.registerTypedPerSessionActor("typed-session-actor-service", remote.registerTypedPerSessionActor("typed-session-actor-service",
TypedActor.newInstance(classOf[RemoteTypedSessionActor], classOf[RemoteTypedSessionActorImpl], 1000)) TypedActor.newInstance(classOf[RemoteTypedSessionActor], classOf[RemoteTypedSessionActorImpl], 1000))
Thread.sleep(1000)
} }
// make sure the servers shutdown cleanly after the test has finished // make sure the servers shutdown cleanly after the test has finished
override def afterEach = { override def afterEach = {
try { super.afterEach
server.shutdown clearMessageLogs
RemoteClient.shutdownAll }
"A remote session Actor" should {
"create a new session actor per connection" in {
val session1 = remote.typedActorFor(classOf[RemoteTypedSessionActor], "typed-session-actor-service", 5000L, host, port)
session1.getUser() must equal ("anonymous")
session1.login("session[1]")
session1.getUser() must equal ("session[1]")
remote.shutdownClientModule
val session2 = remote.typedActorFor(classOf[RemoteTypedSessionActor], "typed-session-actor-service", 5000L, host, port)
session2.getUser() must equal ("anonymous")
}
"stop the actor when the client disconnects" in {
val session1 = remote.typedActorFor(classOf[RemoteTypedSessionActor], "typed-session-actor-service", 5000L, host, port)
session1.getUser() must equal ("anonymous")
RemoteTypedSessionActorImpl.getInstances() must have size (1)
remote.shutdownClientModule
Thread.sleep(1000) Thread.sleep(1000)
} catch { RemoteTypedSessionActorImpl.getInstances() must have size (0)
case e => ()
}
"stop the actor when there is an error" in {
val session1 = remote.typedActorFor(classOf[RemoteTypedSessionActor], "typed-session-actor-service", 5000L, host, port)
session1.doSomethingFunny()
remote.shutdownClientModule
Thread.sleep(1000)
RemoteTypedSessionActorImpl.getInstances() must have size (0)
}
"be able to unregister" in {
remote.registerTypedPerSessionActor("my-service-1",TypedActor.newInstance(classOf[RemoteTypedSessionActor], classOf[RemoteTypedSessionActorImpl], 1000))
remote.typedActorsFactories.get("my-service-1") must not be (null)
remote.unregisterTypedPerSessionActor("my-service-1")
remote.typedActorsFactories.get("my-service-1") must be (null)
} }
} }
}
"A remote session Actor" should "create a new session actor per connection" in {
clearMessageLogs
val session1 = RemoteClient.typedActorFor(classOf[RemoteTypedSessionActor], "typed-session-actor-service", 5000L, HOSTNAME, PORT)
session1.getUser() should equal ("anonymous")
session1.login("session[1]")
session1.getUser() should equal ("session[1]")
RemoteClient.shutdownAll
val session2 = RemoteClient.typedActorFor(classOf[RemoteTypedSessionActor], "typed-session-actor-service", 5000L, HOSTNAME, PORT)
session2.getUser() should equal ("anonymous")
}
it should "stop the actor when the client disconnects" in {
val session1 = RemoteClient.typedActorFor(classOf[RemoteTypedSessionActor], "typed-session-actor-service", 5000L, HOSTNAME, PORT)
session1.getUser() should equal ("anonymous")
RemoteTypedSessionActorImpl.getInstances() should have size (1)
RemoteClient.shutdownAll
Thread.sleep(1000)
RemoteTypedSessionActorImpl.getInstances() should have size (0)
}
it should "stop the actor when there is an error" in {
val session1 = RemoteClient.typedActorFor(classOf[RemoteTypedSessionActor], "typed-session-actor-service", 5000L, HOSTNAME, PORT)
session1.doSomethingFunny()
RemoteClient.shutdownAll
Thread.sleep(1000)
RemoteTypedSessionActorImpl.getInstances() should have size (0)
}
it should "be able to unregister" in {
server.registerTypedPerSessionActor("my-service-1",TypedActor.newInstance(classOf[RemoteTypedSessionActor], classOf[RemoteTypedSessionActorImpl], 1000))
server.typedActorsFactories.get("my-service-1") should not be (null)
server.unregisterTypedPerSessionActor("my-service-1")
server.typedActorsFactories.get("my-service-1") should be (null)
}
}*/

View file

@ -2,49 +2,30 @@
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se> * Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/ */
/* THIS SHOULD BE UNCOMMENTED
package akka.actor.serialization
import org.scalatest.Spec package akka.actor.serialization
import org.scalatest.matchers.ShouldMatchers
import org.scalatest.BeforeAndAfterAll
import org.scalatest.junit.JUnitRunner
import org.junit.runner.RunWith
import akka.serialization._ import akka.serialization._
import akka.actor._ import akka.actor._
import TypedActorSerialization._ import TypedActorSerialization._
import Actor._ import Actor._
import akka.remote.{RemoteClient, RemoteServer}
import akka.actor.remote.ServerInitiatedRemoteActorSpec.RemoteActorSpecActorUnidirectional import akka.actor.remote.ServerInitiatedRemoteActorSpec.RemoteActorSpecActorUnidirectional
import akka.actor.remote.AkkaRemoteTest
@RunWith(classOf[JUnitRunner]) class TypedActorSerializationSpec extends AkkaRemoteTest {
class TypedActorSerializationSpec extends
Spec with
ShouldMatchers with
BeforeAndAfterAll {
var server1: RemoteServer = null
var typedActor: MyTypedActor = null var typedActor: MyTypedActor = null
override def beforeAll = { override def beforeAll = {
server1 = new RemoteServer().start("localhost", 9991) super.beforeAll
typedActor = TypedActor.newInstance(classOf[MyTypedActor], classOf[MyTypedActorImpl], 1000) typedActor = TypedActor.newInstance(classOf[MyTypedActor], classOf[MyTypedActorImpl], 1000)
server1.registerTypedActor("typed-actor-service", typedActor) remote.registerTypedActor("typed-actor-service", typedActor)
Thread.sleep(1000)
} }
// make sure the servers shutdown cleanly after the test has finished // make sure the servers shutdown cleanly after the test has finished
override def afterAll = { override def afterAll = {
try { TypedActor.stop(typedActor)
TypedActor.stop(typedActor) super.afterAll
server1.shutdown
RemoteClient.shutdownAll
Thread.sleep(1000)
} catch {
case e => ()
}
} }
object MyTypedStatelessActorFormat extends StatelessActorFormat[MyStatelessTypedActorImpl] object MyTypedStatelessActorFormat extends StatelessActorFormat[MyStatelessTypedActorImpl]
@ -71,48 +52,48 @@ class TypedActorSerializationSpec extends
} }
describe("Serializable typed actor") { "Serializable typed actor" should {
it("should be able to serialize and de-serialize a stateless typed actor") { "should be able to serialize and de-serialize a stateless typed actor" in {
val typedActor1 = TypedActor.newInstance(classOf[MyTypedActor], classOf[MyStatelessTypedActorImpl], 1000) val typedActor1 = TypedActor.newInstance(classOf[MyTypedActor], classOf[MyStatelessTypedActorImpl], 1000)
typedActor1.requestReply("hello") should equal("world") typedActor1.requestReply("hello") must equal("world")
typedActor1.requestReply("hello") should equal("world") typedActor1.requestReply("hello") must equal("world")
val bytes = toBinaryJ(typedActor1, MyTypedStatelessActorFormat) val bytes = toBinaryJ(typedActor1, MyTypedStatelessActorFormat)
val typedActor2: MyTypedActor = fromBinaryJ(bytes, MyTypedStatelessActorFormat) val typedActor2: MyTypedActor = fromBinaryJ(bytes, MyTypedStatelessActorFormat)
typedActor2.requestReply("hello") should equal("world") typedActor2.requestReply("hello") must equal("world")
} }
it("should be able to serialize and de-serialize a stateful typed actor") { "should be able to serialize and de-serialize a stateful typed actor" in {
val typedActor1 = TypedActor.newInstance(classOf[MyTypedActor], classOf[MyTypedActorImpl], 1000) val typedActor1 = TypedActor.newInstance(classOf[MyTypedActor], classOf[MyTypedActorImpl], 1000)
typedActor1.requestReply("hello") should equal("world 1") typedActor1.requestReply("hello") must equal("world 1")
typedActor1.requestReply("scala") should equal("hello scala 2") typedActor1.requestReply("scala") must equal("hello scala 2")
val f = new MyTypedActorFormat val f = new MyTypedActorFormat
val bytes = toBinaryJ(typedActor1, f) val bytes = toBinaryJ(typedActor1, f)
val typedActor2: MyTypedActor = fromBinaryJ(bytes, f) val typedActor2: MyTypedActor = fromBinaryJ(bytes, f)
typedActor2.requestReply("hello") should equal("world 3") typedActor2.requestReply("hello") must equal("world 3")
} }
it("should be able to serialize and de-serialize a stateful typed actor with compound state") { "should be able to serialize and de-serialize a stateful typed actor with compound state" in {
val typedActor1 = TypedActor.newInstance(classOf[MyTypedActor], classOf[MyTypedActorWithDualCounter], 1000) val typedActor1 = TypedActor.newInstance(classOf[MyTypedActor], classOf[MyTypedActorWithDualCounter], 1000)
typedActor1.requestReply("hello") should equal("world 1 1") typedActor1.requestReply("hello") must equal("world 1 1")
typedActor1.requestReply("hello") should equal("world 2 2") typedActor1.requestReply("hello") must equal("world 2 2")
val f = new MyTypedActorWithDualCounterFormat val f = new MyTypedActorWithDualCounterFormat
val bytes = toBinaryJ(typedActor1, f) val bytes = toBinaryJ(typedActor1, f)
val typedActor2: MyTypedActor = fromBinaryJ(bytes, f) val typedActor2: MyTypedActor = fromBinaryJ(bytes, f)
typedActor2.requestReply("hello") should equal("world 3 3") typedActor2.requestReply("hello") must equal("world 3 3")
} }
it("should be able to serialize a local yped actor ref to a remote typed actor ref proxy") { "should be able to serialize a local yped actor ref to a remote typed actor ref proxy" in {
val typedActor1 = TypedActor.newInstance(classOf[MyTypedActor], classOf[MyStatelessTypedActorImpl], 1000) val typedActor1 = TypedActor.newInstance(classOf[MyTypedActor], classOf[MyStatelessTypedActorImpl], 1000)
typedActor1.requestReply("hello") should equal("world") typedActor1.requestReply("hello") must equal("world")
typedActor1.requestReply("hello") should equal("world") typedActor1.requestReply("hello") must equal("world")
val bytes = RemoteTypedActorSerialization.toBinary(typedActor1) val bytes = RemoteTypedActorSerialization.toBinary(typedActor1)
val typedActor2: MyTypedActor = RemoteTypedActorSerialization.fromBinaryToRemoteTypedActorRef(bytes) val typedActor2: MyTypedActor = RemoteTypedActorSerialization.fromBinaryToRemoteTypedActorRef(bytes)
typedActor1.requestReply("hello") should equal("world") typedActor1.requestReply("hello") must equal("world")
} }
} }
} }
@ -165,6 +146,4 @@ class MyStatelessTypedActorImpl extends TypedActor with MyTypedActor {
override def requestReply(message: String) : String = { override def requestReply(message: String) : String = {
if (message == "hello") "world" else ("hello " + message) if (message == "hello") "world" else ("hello " + message)
} }
} }
*/

View file

@ -478,21 +478,29 @@ object TypedActor extends Logging {
* @param factory factory method that constructs the typed actor * @param factory factory method that constructs the typed actor
* @paramm config configuration object fo the typed actor * @paramm config configuration object fo the typed actor
*/ */
def newInstance[T](intfClass: Class[T], factory: => AnyRef, config: TypedActorConfiguration): T = { def newInstance[T](intfClass: Class[T], factory: => AnyRef, config: TypedActorConfiguration): T =
val actorRef = actorOf(newTypedActor(factory)) newInstance(intfClass, createActorRef(newTypedActor(factory),config), config)
newInstance(intfClass, actorRef, config)
/**
* Creates an ActorRef, can be local only or client-managed-remote
*/
private[akka] def createActorRef(typedActor: => TypedActor, config: TypedActorConfiguration): ActorRef = {
config match {
case null => actorOf(typedActor)
case c: TypedActorConfiguration if (c._host.isDefined) =>
actorOf(typedActor, c._host.get.getHostName, c._host.get.getPort)
case _ => actorOf(typedActor)
}
} }
/** /**
* Factory method for typed actor. * Factory method for typed actor.
* @param intfClass interface the typed actor implements * @param intfClass interface the typed actor implements
* @param targetClass implementation class of the typed actor * @param targetClass implementation class of the typed actor
* @paramm config configuration object fo the typed actor * @paramm config configuration object fo the typed actor
*/ */
def newInstance[T](intfClass: Class[T], targetClass: Class[_], config: TypedActorConfiguration): T = { def newInstance[T](intfClass: Class[T], targetClass: Class[_], config: TypedActorConfiguration): T =
val actorRef = actorOf(newTypedActor(targetClass)) newInstance(intfClass, createActorRef(newTypedActor(targetClass),config), config)
newInstance(intfClass, actorRef, config)
}
private[akka] def newInstance[T](intfClass: Class[T], actorRef: ActorRef): T = { private[akka] def newInstance[T](intfClass: Class[T], actorRef: ActorRef): T = {
if (!actorRef.actorInstance.get.isInstanceOf[TypedActor]) throw new IllegalArgumentException("ActorRef is not a ref to a typed actor") if (!actorRef.actorInstance.get.isInstanceOf[TypedActor]) throw new IllegalArgumentException("ActorRef is not a ref to a typed actor")
@ -512,9 +520,9 @@ object TypedActor extends Logging {
typedActor.initialize(proxy) typedActor.initialize(proxy)
if (config._messageDispatcher.isDefined) actorRef.dispatcher = config._messageDispatcher.get if (config._messageDispatcher.isDefined) actorRef.dispatcher = config._messageDispatcher.get
if (config._threadBasedDispatcher.isDefined) actorRef.dispatcher = Dispatchers.newThreadBasedDispatcher(actorRef) if (config._threadBasedDispatcher.isDefined) actorRef.dispatcher = Dispatchers.newThreadBasedDispatcher(actorRef)
if (config._host.isDefined) log.slf4j.warn("Client-managed typed actors are not supported!") //TODO: REVISIT: FIXME
actorRef.timeout = config.timeout actorRef.timeout = config.timeout
AspectInitRegistry.register(proxy, AspectInit(intfClass, typedActor, actorRef, None, actorRef.timeout)) //TODO: REVISIT fix Client managed typed actor log.slf4j.warn("config._host for {} is {} but homeAddress is {}",intfClass, config._host)
AspectInitRegistry.register(proxy, AspectInit(intfClass, typedActor, actorRef, config._host, actorRef.timeout)) //TODO: REVISIT fix Client managed typed actor
actorRef.start actorRef.start
proxy.asInstanceOf[T] proxy.asInstanceOf[T]
} }
@ -582,7 +590,7 @@ object TypedActor extends Logging {
val jProxy = JProxy.newProxyInstance(intfClass.getClassLoader(), interfaces, handler) val jProxy = JProxy.newProxyInstance(intfClass.getClassLoader(), interfaces, handler)
val awProxy = Proxy.newInstance(interfaces, Array(jProxy, jProxy), true, false) val awProxy = Proxy.newInstance(interfaces, Array(jProxy, jProxy), true, false)
AspectInitRegistry.register(awProxy, AspectInit(intfClass, null, actorRef, None, 5000L)) AspectInitRegistry.register(awProxy, AspectInit(intfClass, null, actorRef, None, 5000L)) //TODO: does .homeAddress work here or do we need to check if it's local and then provide None?
awProxy.asInstanceOf[T] awProxy.asInstanceOf[T]
} }

View file

@ -108,26 +108,24 @@ private[akka] class TypedActorGuiceConfigurator extends TypedActorConfiguratorBa
val implementationClass = component.target val implementationClass = component.target
val timeout = component.timeout val timeout = component.timeout
val actorRef = Actor.actorOf(TypedActor.newTypedActor(implementationClass)) val (remoteAddress,actorRef) =
component.remoteAddress match {
case Some(a) =>
(Some(new InetSocketAddress(a.hostname, a.port)),
Actor.actorOf(TypedActor.newTypedActor(implementationClass), a.hostname, a.port))
case None =>
(None, Actor.actorOf(TypedActor.newTypedActor(implementationClass)))
}
actorRef.timeout = timeout actorRef.timeout = timeout
if (component.dispatcher.isDefined) actorRef.dispatcher = component.dispatcher.get if (component.dispatcher.isDefined) actorRef.dispatcher = component.dispatcher.get
val typedActor = actorRef.actorInstance.get.asInstanceOf[TypedActor] val typedActor = actorRef.actorInstance.get.asInstanceOf[TypedActor]
val proxy = Proxy.newInstance(Array(interfaceClass), Array(typedActor), true, false) val proxy = Proxy.newInstance(Array(interfaceClass), Array(typedActor), true, false)
/*
val remoteAddress =
if (component.remoteAddress.isDefined)
Some(new InetSocketAddress(component.remoteAddress.get.hostname, component.remoteAddress.get.port))
else None
remoteAddress.foreach { address =>
actorRef.makeRemote(remoteAddress.get)
}*/
AspectInitRegistry.register( AspectInitRegistry.register(
proxy, proxy,
AspectInit(interfaceClass, typedActor, actorRef, None, timeout)) //TODO: REVISIT: FIX CLIENT MANAGED ACTORS AspectInit(interfaceClass, typedActor, actorRef, remoteAddress, timeout))
typedActor.initialize(proxy) typedActor.initialize(proxy)
actorRef.start actorRef.start