diff --git a/akka-actor/src/main/scala/actor/ActorRef.scala b/akka-actor/src/main/scala/actor/ActorRef.scala
index b250e16ff3..e78edba96a 100644
--- a/akka-actor/src/main/scala/actor/ActorRef.scala
+++ b/akka-actor/src/main/scala/actor/ActorRef.scala
@@ -1363,7 +1363,8 @@ private[akka] case class RemoteActorRef private[akka] (
val hostname: String,
val port: Int,
_timeout: Long,
- loader: Option[ClassLoader])
+ loader: Option[ClassLoader],
+ val actorType: ActorType = ActorType.ScalaActor)
extends ActorRef with ScalaActorRef {
ensureRemotingEnabled
@@ -1376,7 +1377,7 @@ private[akka] case class RemoteActorRef private[akka] (
def postMessageToMailbox(message: Any, senderOption: Option[ActorRef]): Unit =
RemoteClientModule.send[Any](
- message, senderOption, None, remoteAddress.get, timeout, true, this, None, ActorType.ScalaActor)
+ message, senderOption, None, remoteAddress.get, timeout, true, this, None, actorType)
def postMessageToMailboxAndCreateFutureResultWithTimeout[T](
message: Any,
@@ -1384,7 +1385,7 @@ private[akka] case class RemoteActorRef private[akka] (
senderOption: Option[ActorRef],
senderFuture: Option[CompletableFuture[T]]): CompletableFuture[T] = {
val future = RemoteClientModule.send[T](
- message, senderOption, senderFuture, remoteAddress.get, timeout, false, this, None, ActorType.ScalaActor)
+ message, senderOption, senderFuture, remoteAddress.get, timeout, false, this, None, actorType)
if (future.isDefined) future.get
else throw new IllegalActorStateException("Expected a future from remote call to actor " + toString)
}
diff --git a/akka-remote/src/main/scala/remote/RemoteClient.scala b/akka-remote/src/main/scala/remote/RemoteClient.scala
index f61a5d63a1..dc3cf6b146 100644
--- a/akka-remote/src/main/scala/remote/RemoteClient.scala
+++ b/akka-remote/src/main/scala/remote/RemoteClient.scala
@@ -30,6 +30,7 @@ import java.util.concurrent.atomic.AtomicLong
import scala.collection.mutable.{HashSet, HashMap}
import scala.reflect.BeanProperty
+import se.scalablesolutions.akka.actor._
/**
* Atomic remote request/reply message id generator.
@@ -76,8 +77,6 @@ object RemoteClient extends Logging {
private val remoteClients = new HashMap[String, RemoteClient]
private val remoteActors = new HashMap[RemoteServer.Address, HashSet[String]]
- // FIXME: simplify overloaded methods when we have Scala 2.8
-
def actorFor(classNameOrServiceId: String, hostname: String, port: Int): ActorRef =
actorFor(classNameOrServiceId, classNameOrServiceId, 5000L, hostname, port, None)
@@ -99,6 +98,27 @@ object RemoteClient extends Logging {
def actorFor(serviceId: String, className: String, timeout: Long, hostname: String, port: Int): ActorRef =
RemoteActorRef(serviceId, className, hostname, port, timeout, None)
+ def typedActorFor[T](intfClass: Class[T], serviceIdOrClassName: String, hostname: String, port: Int) : T = {
+ typedActorFor(intfClass, serviceIdOrClassName, serviceIdOrClassName, 5000L, hostname, port, None)
+ }
+
+ def typedActorFor[T](intfClass: Class[T], serviceIdOrClassName: String, timeout: Long, hostname: String, port: Int) : T = {
+ typedActorFor(intfClass, serviceIdOrClassName, serviceIdOrClassName, timeout, hostname, port, None)
+ }
+
+ def typedActorFor[T](intfClass: Class[T], serviceIdOrClassName: String, timeout: Long, hostname: String, port: Int, loader: ClassLoader) : T = {
+ typedActorFor(intfClass, serviceIdOrClassName, serviceIdOrClassName, timeout, hostname, port, Some(loader))
+ }
+
+ def typedActorFor[T](intfClass: Class[T], serviceId: String, implClassName: String, timeout: Long, hostname: String, port: Int, loader: ClassLoader) : T = {
+ typedActorFor(intfClass, serviceId, implClassName, timeout, hostname, port, Some(loader))
+ }
+
+ private[akka] def typedActorFor[T](intfClass: Class[T], serviceId: String, implClassName: String, timeout: Long, hostname: String, port: Int, loader: Option[ClassLoader]) : T = {
+ val actorRef = RemoteActorRef(serviceId, implClassName, hostname, port, timeout, loader, ActorType.TypedActor)
+ TypedActor.createProxyForRemoteActorRef(intfClass, actorRef)
+ }
+
private[akka] def actorFor(serviceId: String, className: String, timeout: Long, hostname: String, port: Int, loader: ClassLoader): ActorRef =
RemoteActorRef(serviceId, className, hostname, port, timeout, Some(loader))
diff --git a/akka-remote/src/main/scala/remote/RemoteServer.scala b/akka-remote/src/main/scala/remote/RemoteServer.scala
index 6337bd9893..bf9b38ca1b 100644
--- a/akka-remote/src/main/scala/remote/RemoteServer.scala
+++ b/akka-remote/src/main/scala/remote/RemoteServer.scala
@@ -10,7 +10,7 @@ import java.util.concurrent.{ConcurrentHashMap, Executors}
import java.util.{Map => JMap}
import se.scalablesolutions.akka.actor.{
- Actor, TypedActor, ActorRef, LocalActorRef, RemoteActorRef, IllegalActorStateException, RemoteActorSystemMessage}
+ Actor, TypedActor, ActorRef, IllegalActorStateException, RemoteActorSystemMessage}
import se.scalablesolutions.akka.actor.Actor._
import se.scalablesolutions.akka.util._
import se.scalablesolutions.akka.remote.protocol.RemoteProtocol._
@@ -133,8 +133,8 @@ object RemoteServer {
actorsFor(RemoteServer.Address(address.getHostName, address.getPort)).actors.put(uuid, actor)
}
- private[akka] def registerTypedActor(address: InetSocketAddress, name: String, typedActor: AnyRef) = guard.withWriteGuard {
- actorsFor(RemoteServer.Address(address.getHostName, address.getPort)).typedActors.put(name, typedActor)
+ private[akka] def registerTypedActor(address: InetSocketAddress, uuid: String, typedActor: AnyRef) = guard.withWriteGuard {
+ actorsFor(RemoteServer.Address(address.getHostName, address.getPort)).typedActors.put(uuid, typedActor)
}
private[akka] def getOrCreateServer(address: InetSocketAddress): RemoteServer = guard.withWriteGuard {
@@ -271,7 +271,18 @@ class RemoteServer extends Logging with ListenerManagement {
}
}
- // TODO: register typed actor in RemoteServer as well
+ /**
+ * Register remote typed actor by a specific id.
+ * @param id custom actor id
+ * @param typedActor typed actor to register
+ */
+ def registerTypedActor(id: String, typedActor: AnyRef): Unit = synchronized {
+ val typedActors = RemoteServer.actorsFor(RemoteServer.Address(hostname, port)).typedActors
+ if (!typedActors.contains(id)) {
+ log.debug("Registering server side remote actor [%s] with id [%s] on [%s:%d]", typedActor.getClass.getName, id, hostname, port)
+ typedActors.put(id, typedActor)
+ }
+ }
/**
* Register Remote Actor by the Actor's 'id' field. It starts the Actor if it is not started already.
@@ -321,11 +332,24 @@ class RemoteServer extends Logging with ListenerManagement {
}
}
+ /**
+ * Unregister Remote Typed Actor by specific 'id'.
+ *
+ * NOTE: You need to call this method if you have registered an actor by a custom ID.
+ */
+ def unregisterTypedActor(id: String):Unit = synchronized {
+ if (_isRunning) {
+ log.info("Unregistering server side remote typed actor with id [%s]", id)
+ val registeredTypedActors = typedActors()
+ registeredTypedActors.remove(id)
+ }
+ }
+
protected override def manageLifeCycleOfListeners = false
protected[akka] override def foreachListener(f: (ActorRef) => Unit): Unit = super.foreachListener(f)
- private[akka] def actors() = RemoteServer.actorsFor(address).actors
+ private[akka] def actors() = RemoteServer.actorsFor(address).actors
private[akka] def typedActors() = RemoteServer.actorsFor(address).typedActors
}
diff --git a/akka-remote/src/test/resources/META-INF/aop.xml b/akka-remote/src/test/resources/META-INF/aop.xml
index bdc167ca54..be133a51b8 100644
--- a/akka-remote/src/test/resources/META-INF/aop.xml
+++ b/akka-remote/src/test/resources/META-INF/aop.xml
@@ -2,6 +2,7 @@
+
diff --git a/akka-remote/src/test/scala/remote/ServerInitiatedRemoteActorSpec.scala b/akka-remote/src/test/scala/remote/ServerInitiatedRemoteActorSpec.scala
index 8b1e0ef765..012d42f92a 100644
--- a/akka-remote/src/test/scala/remote/ServerInitiatedRemoteActorSpec.scala
+++ b/akka-remote/src/test/scala/remote/ServerInitiatedRemoteActorSpec.scala
@@ -144,5 +144,6 @@ class ServerInitiatedRemoteActorSpec extends JUnitSuite {
assert(numberOfActorsInRegistry === ActorRegistry.actors.length)
actor.stop
}
+
}
diff --git a/akka-remote/src/test/scala/remote/ServerInitiatedRemoteTypedActorSpec.scala b/akka-remote/src/test/scala/remote/ServerInitiatedRemoteTypedActorSpec.scala
new file mode 100644
index 0000000000..b800fbf2c3
--- /dev/null
+++ b/akka-remote/src/test/scala/remote/ServerInitiatedRemoteTypedActorSpec.scala
@@ -0,0 +1,112 @@
+/**
+ * Copyright (C) 2009-2010 Scalable Solutions AB
+ */
+
+package se.scalablesolutions.akka.actor.remote
+
+import org.scalatest.Spec
+import org.scalatest.matchers.ShouldMatchers
+import org.scalatest.BeforeAndAfterAll
+import org.scalatest.junit.JUnitRunner
+import org.junit.runner.RunWith
+
+import java.util.concurrent.TimeUnit
+
+import se.scalablesolutions.akka.remote.{RemoteServer, RemoteClient}
+import se.scalablesolutions.akka.actor._
+import RemoteTypedActorLog._
+
+object ServerInitiatedRemoteTypedActorSpec {
+ val HOSTNAME = "localhost"
+ val PORT = 9990
+ var server: RemoteServer = null
+}
+
+@RunWith(classOf[JUnitRunner])
+class ServerInitiatedRemoteTypedActorSpec extends
+ Spec with
+ ShouldMatchers with
+ BeforeAndAfterAll {
+ import ServerInitiatedRemoteTypedActorSpec._
+
+ private val unit = TimeUnit.MILLISECONDS
+
+
+ override def beforeAll = {
+ server = new RemoteServer()
+ server.start(HOSTNAME, PORT)
+
+ val typedActor = TypedActor.newInstance(classOf[RemoteTypedActorOne], classOf[RemoteTypedActorOneImpl], 1000)
+ server.registerTypedActor("typed-actor-service", typedActor)
+
+ Thread.sleep(1000)
+ }
+
+ // make sure the servers shutdown cleanly after the test has finished
+ override def afterAll = {
+ try {
+ server.shutdown
+ RemoteClient.shutdownAll
+ Thread.sleep(1000)
+ } catch {
+ case e => ()
+ }
+ }
+
+ describe("Server managed remote typed Actor ") {
+
+ it("should receive one-way message") {
+ clearMessageLogs
+ val actor = RemoteClient.typedActorFor(classOf[RemoteTypedActorOne], "typed-actor-service", 5000L, HOSTNAME, PORT)
+ expect("oneway") {
+ actor.oneWay
+ oneWayLog.poll(5, TimeUnit.SECONDS)
+ }
+ }
+
+ it("should respond to request-reply message") {
+ clearMessageLogs
+ val actor = RemoteClient.typedActorFor(classOf[RemoteTypedActorOne], "typed-actor-service", 5000L, HOSTNAME, PORT)
+ expect("pong") {
+ actor.requestReply("ping")
+ }
+ }
+
+ it("should not recreate registered actors") {
+ val actor = RemoteClient.typedActorFor(classOf[RemoteTypedActorOne], "typed-actor-service", 5000L, HOSTNAME, PORT)
+ val numberOfActorsInRegistry = ActorRegistry.actors.length
+ expect("oneway") {
+ actor.oneWay
+ oneWayLog.poll(5, TimeUnit.SECONDS)
+ }
+ assert(numberOfActorsInRegistry === ActorRegistry.actors.length)
+ }
+
+ it("should support multiple variants to get the actor from client side") {
+ var actor = RemoteClient.typedActorFor(classOf[RemoteTypedActorOne], "typed-actor-service", 5000L, HOSTNAME, PORT)
+ expect("oneway") {
+ actor.oneWay
+ oneWayLog.poll(5, TimeUnit.SECONDS)
+ }
+ actor = RemoteClient.typedActorFor(classOf[RemoteTypedActorOne], "typed-actor-service", HOSTNAME, PORT)
+ expect("oneway") {
+ actor.oneWay
+ oneWayLog.poll(5, TimeUnit.SECONDS)
+ }
+ actor = RemoteClient.typedActorFor(classOf[RemoteTypedActorOne], "typed-actor-service", 5000L, HOSTNAME, PORT, this.getClass().getClassLoader)
+ expect("oneway") {
+ actor.oneWay
+ oneWayLog.poll(5, TimeUnit.SECONDS)
+ }
+ }
+
+ it("should register and unregister typed actors") {
+ val typedActor = TypedActor.newInstance(classOf[RemoteTypedActorOne], classOf[RemoteTypedActorOneImpl], 1000)
+ server.registerTypedActor("my-test-service", typedActor)
+ assert(server.typedActors().get("my-test-service") != null)
+ server.unregisterTypedActor("my-test-service")
+ assert(server.typedActors().get("my-test-service") == null)
+ }
+ }
+}
+
diff --git a/akka-typed-actor/src/main/scala/actor/TypedActor.scala b/akka-typed-actor/src/main/scala/actor/TypedActor.scala
index b27f5b4b4d..385c1831a4 100644
--- a/akka-typed-actor/src/main/scala/actor/TypedActor.scala
+++ b/akka-typed-actor/src/main/scala/actor/TypedActor.scala
@@ -16,9 +16,8 @@ import org.codehaus.aspectwerkz.proxy.Proxy
import org.codehaus.aspectwerkz.annotation.{Aspect, Around}
import java.net.InetSocketAddress
-import java.lang.reflect.{InvocationTargetException, Method, Field}
-
import scala.reflect.BeanProperty
+import java.lang.reflect.{Method, Field, InvocationHandler, Proxy => JProxy}
/**
* TypedActor is a type-safe actor made out of a POJO with interface.
@@ -408,24 +407,47 @@ object TypedActor extends Logging {
proxy.asInstanceOf[T]
}
-/*
- // NOTE: currently not used - but keep it around
- private[akka] def newInstance[T <: TypedActor](targetClass: Class[T],
- remoteAddress: Option[InetSocketAddress], timeout: Long): T = {
- val proxy = {
- val instance = Proxy.newInstance(targetClass, true, false)
- if (instance.isInstanceOf[TypedActor]) instance.asInstanceOf[TypedActor]
- else throw new IllegalActorStateException("Actor [" + targetClass.getName + "] is not a sub class of 'TypedActor'")
+ /**
+ * Create a proxy for a RemoteActorRef representing a server managed remote typed actor.
+ *
+ */
+ private[akka] def createProxyForRemoteActorRef[T](intfClass: Class[T], actorRef: ActorRef): T = {
+
+ class MyInvocationHandler extends InvocationHandler {
+ def invoke(proxy: AnyRef, method: Method, args: Array[AnyRef]): AnyRef = {
+ // do nothing, this is just a dummy
+ null
+ }
}
- val context = injectTypedActorContext(proxy)
- actorRef.actor.asInstanceOf[Dispatcher].initialize(targetClass, proxy, proxy, context)
- actorRef.timeout = timeout
- if (remoteAddress.isDefined) actorRef.makeRemote(remoteAddress.get)
- AspectInitRegistry.register(proxy, AspectInit(targetClass, proxy, actorRef, remoteAddress, timeout))
- actorRef.start
- proxy.asInstanceOf[T]
+ val handler = new MyInvocationHandler()
+
+ val interfaces = Array(intfClass, classOf[ServerManagedTypedActor]).asInstanceOf[Array[java.lang.Class[_]]]
+ val jProxy = JProxy.newProxyInstance(intfClass.getClassLoader(), interfaces, handler)
+ val awProxy = Proxy.newInstance(interfaces, Array(jProxy, jProxy), true, false)
+
+ AspectInitRegistry.register(awProxy, AspectInit(intfClass, null, actorRef, None, 5000L))
+ awProxy.asInstanceOf[T]
}
-*/
+
+
+ /*
+ // NOTE: currently not used - but keep it around
+ private[akka] def newInstance[T <: TypedActor](targetClass: Class[T],
+ remoteAddress: Option[InetSocketAddress], timeout: Long): T = {
+ val proxy = {
+ val instance = Proxy.newInstance(targetClass, true, false)
+ if (instance.isInstanceOf[TypedActor]) instance.asInstanceOf[TypedActor]
+ else throw new IllegalActorStateException("Actor [" + targetClass.getName + "] is not a sub class of 'TypedActor'")
+ }
+ val context = injectTypedActorContext(proxy)
+ actorRef.actor.asInstanceOf[Dispatcher].initialize(targetClass, proxy, proxy, context)
+ actorRef.timeout = timeout
+ if (remoteAddress.isDefined) actorRef.makeRemote(remoteAddress.get)
+ AspectInitRegistry.register(proxy, AspectInit(targetClass, proxy, actorRef, remoteAddress, timeout))
+ actorRef.start
+ proxy.asInstanceOf[T]
+ }
+ */
/**
* Stops the current Typed Actor.
@@ -546,6 +568,30 @@ object TypedActor extends Logging {
private[akka] def isJoinPoint(message: Any): Boolean = message.isInstanceOf[JoinPoint]
}
+
+/**
+ * AspectWerkz Aspect that is turning POJO into proxy to a server managed remote TypedActor.
+ *
+ * Is deployed on a 'perInstance' basis with the pointcut 'execution(* *.*(..))',
+ * e.g. all methods on the instance.
+ *
+ * @author Jonas Bonér
+ */
+@Aspect("perInstance")
+private[akka] sealed class ServerManagedTypedActorAspect extends ActorAspect {
+
+ @Around("execution(* *.*(..)) && this(se.scalablesolutions.akka.actor.ServerManagedTypedActor)")
+ def invoke(joinPoint: JoinPoint): AnyRef = {
+ if (!isInitialized) initialize(joinPoint)
+ remoteDispatch(joinPoint)
+ }
+
+ override def initialize(joinPoint: JoinPoint): Unit = {
+ super.initialize(joinPoint)
+ remoteAddress = actorRef.remoteAddress
+ }
+}
+
/**
* AspectWerkz Aspect that is turning POJO into TypedActor.
*
@@ -555,18 +601,9 @@ object TypedActor extends Logging {
* @author Jonas Bonér
*/
@Aspect("perInstance")
-private[akka] sealed class TypedActorAspect {
- @volatile private var isInitialized = false
- @volatile private var isStopped = false
- private var interfaceClass: Class[_] = _
- private var typedActor: TypedActor = _
- private var actorRef: ActorRef = _
- private var remoteAddress: Option[InetSocketAddress] = _
- private var timeout: Long = _
- private var uuid: String = _
- @volatile private var instance: TypedActor = _
+private[akka] sealed class TypedActorAspect extends ActorAspect {
- @Around("execution(* *.*(..))")
+ @Around("execution(* *.*(..)) && !this(se.scalablesolutions.akka.actor.ServerManagedTypedActor)")
def invoke(joinPoint: JoinPoint): AnyRef = {
if (!isInitialized) initialize(joinPoint)
dispatch(joinPoint)
@@ -576,12 +613,26 @@ private[akka] sealed class TypedActorAspect {
if (remoteAddress.isDefined) remoteDispatch(joinPoint)
else localDispatch(joinPoint)
}
+}
- private def localDispatch(joinPoint: JoinPoint): AnyRef = {
- val methodRtti = joinPoint.getRtti.asInstanceOf[MethodRtti]
- val isOneWay = TypedActor.isOneWay(methodRtti)
+/**
+ * Base class for TypedActorAspect and ServerManagedTypedActorAspect to reduce code duplication.
+ */
+private[akka] abstract class ActorAspect {
+ @volatile protected var isInitialized = false
+ @volatile protected var isStopped = false
+ protected var interfaceClass: Class[_] = _
+ protected var typedActor: TypedActor = _
+ protected var actorRef: ActorRef = _
+ protected var timeout: Long = _
+ protected var uuid: String = _
+ protected var remoteAddress: Option[InetSocketAddress] = _
+
+ protected def localDispatch(joinPoint: JoinPoint): AnyRef = {
+ val methodRtti = joinPoint.getRtti.asInstanceOf[MethodRtti]
+ val isOneWay = TypedActor.isOneWay(methodRtti)
val senderActorRef = Some(SenderContextInfo.senderActorRef.value)
- val senderProxy = Some(SenderContextInfo.senderProxy.value)
+ val senderProxy = Some(SenderContextInfo.senderProxy.value)
typedActor.context._sender = senderProxy
if (!actorRef.isRunning && !isStopped) {
@@ -602,7 +653,7 @@ private[akka] sealed class TypedActorAspect {
}
}
- private def remoteDispatch(joinPoint: JoinPoint): AnyRef = {
+ protected def remoteDispatch(joinPoint: JoinPoint): AnyRef = {
val methodRtti = joinPoint.getRtti.asInstanceOf[MethodRtti]
val isOneWay = TypedActor.isOneWay(methodRtti)
@@ -641,7 +692,7 @@ private[akka] sealed class TypedActorAspect {
(escapedArgs, isEscaped)
}
- private def initialize(joinPoint: JoinPoint): Unit = {
+ protected def initialize(joinPoint: JoinPoint): Unit = {
val init = AspectInitRegistry.initFor(joinPoint.getThis)
interfaceClass = init.interfaceClass
typedActor = init.targetInstance
@@ -653,6 +704,7 @@ private[akka] sealed class TypedActorAspect {
}
}
+
/**
* Internal helper class to help pass the contextual information between threads.
*
@@ -704,5 +756,11 @@ private[akka] sealed case class AspectInit(
val timeout: Long) {
def this(interfaceClass: Class[_], targetInstance: TypedActor, actorRef: ActorRef, timeout: Long) =
this(interfaceClass, targetInstance, actorRef, None, timeout)
+
}
+
+/**
+ * Marker interface for server manager typed actors.
+ */
+private[akka] sealed trait ServerManagedTypedActor extends TypedActor
\ No newline at end of file