From 60d4010421d73d241f809974ab6d3423facf61f1 Mon Sep 17 00:00:00 2001 From: Michael Kober Date: Mon, 6 Sep 2010 10:15:44 +0200 Subject: [PATCH 1/2] started working on ticket 194 --- .../src/main/scala/actor/ActorRef.scala | 7 +- .../src/main/scala/remote/RemoteClient.scala | 16 +- .../src/main/scala/remote/RemoteServer.scala | 44 ++++- .../akka/actor/RemoteTypedActorOneImpl.java | 3 + .../src/test/resources/META-INF/aop.xml | 1 + .../ServerInitiatedRemoteTypedActorSpec.scala | 86 +++++++++ .../src/main/scala/actor/TypedActor.scala | 176 +++++++++++++++--- .../typed-actor/TypedActorLifecycleSpec.scala | 1 + 8 files changed, 299 insertions(+), 35 deletions(-) create mode 100644 akka-remote/src/test/scala/remote/ServerInitiatedRemoteTypedActorSpec.scala diff --git a/akka-actor/src/main/scala/actor/ActorRef.scala b/akka-actor/src/main/scala/actor/ActorRef.scala index a6b42db579..c712a8d408 100644 --- a/akka-actor/src/main/scala/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/actor/ActorRef.scala @@ -1362,7 +1362,8 @@ private[akka] case class RemoteActorRef private[akka] ( val hostname: String, val port: Int, _timeout: Long, - loader: Option[ClassLoader]) + loader: Option[ClassLoader], + val actorType: ActorType = ActorType.ScalaActor ) extends ActorRef with ScalaActorRef { ensureRemotingEnabled @@ -1375,7 +1376,7 @@ private[akka] case class RemoteActorRef private[akka] ( def postMessageToMailbox(message: Any, senderOption: Option[ActorRef]): Unit = RemoteClientModule.send[Any]( - message, senderOption, None, remoteAddress.get, timeout, true, this, None, ActorType.ScalaActor) + message, senderOption, None, remoteAddress.get, timeout, true, this, None, actorType) def postMessageToMailboxAndCreateFutureResultWithTimeout[T]( message: Any, @@ -1383,7 +1384,7 @@ private[akka] case class RemoteActorRef private[akka] ( senderOption: Option[ActorRef], senderFuture: Option[CompletableFuture[T]]): CompletableFuture[T] = { val future = RemoteClientModule.send[T]( - message, senderOption, senderFuture, remoteAddress.get, timeout, false, this, None, ActorType.ScalaActor) + message, senderOption, senderFuture, remoteAddress.get, timeout, false, this, None, actorType) if (future.isDefined) future.get else throw new IllegalActorStateException("Expected a future from remote call to actor " + toString) } diff --git a/akka-remote/src/main/scala/remote/RemoteClient.scala b/akka-remote/src/main/scala/remote/RemoteClient.scala index f61a5d63a1..689f55a049 100644 --- a/akka-remote/src/main/scala/remote/RemoteClient.scala +++ b/akka-remote/src/main/scala/remote/RemoteClient.scala @@ -30,6 +30,7 @@ import java.util.concurrent.atomic.AtomicLong import scala.collection.mutable.{HashSet, HashMap} import scala.reflect.BeanProperty +import se.scalablesolutions.akka.actor._ /** * Atomic remote request/reply message id generator. @@ -76,11 +77,19 @@ object RemoteClient extends Logging { private val remoteClients = new HashMap[String, RemoteClient] private val remoteActors = new HashMap[RemoteServer.Address, HashSet[String]] - // FIXME: simplify overloaded methods when we have Scala 2.8 - def actorFor(classNameOrServiceId: String, hostname: String, port: Int): ActorRef = actorFor(classNameOrServiceId, classNameOrServiceId, 5000L, hostname, port, None) + // FIXME: + def typedActorFor[T](intfClass: Class[T], serviceId: String, implClassName: String, timeout: Long, hostname: String, port: Int) : T = { + + println("### create RemoteActorRef") + val actorRef = RemoteActorRef(serviceId, implClassName, hostname, port, timeout, None, ActorType.TypedActor) + val proxy = TypedActor.createProxyForRemoteActorRef(intfClass, actorRef) + proxy + + } + def actorFor(classNameOrServiceId: String, hostname: String, port: Int, loader: ClassLoader): ActorRef = actorFor(classNameOrServiceId, classNameOrServiceId, 5000L, hostname, port, Some(loader)) @@ -99,6 +108,9 @@ object RemoteClient extends Logging { def actorFor(serviceId: String, className: String, timeout: Long, hostname: String, port: Int): ActorRef = RemoteActorRef(serviceId, className, hostname, port, timeout, None) + + + private[akka] def actorFor(serviceId: String, className: String, timeout: Long, hostname: String, port: Int, loader: ClassLoader): ActorRef = RemoteActorRef(serviceId, className, hostname, port, timeout, Some(loader)) diff --git a/akka-remote/src/main/scala/remote/RemoteServer.scala b/akka-remote/src/main/scala/remote/RemoteServer.scala index 5f24def4f5..af1507c3cb 100644 --- a/akka-remote/src/main/scala/remote/RemoteServer.scala +++ b/akka-remote/src/main/scala/remote/RemoteServer.scala @@ -120,9 +120,12 @@ object RemoteServer { } } - private class RemoteActorSet { - private[RemoteServer] val actors = new ConcurrentHashMap[String, ActorRef] - private[RemoteServer] val typedActors = new ConcurrentHashMap[String, AnyRef] + // FIXME private + class RemoteActorSet { + //private[RemoteServer] val actors = new ConcurrentHashMap[String, ActorRef] + val actors = new ConcurrentHashMap[String, ActorRef] + //private[RemoteServer] val typedActors = new ConcurrentHashMap[String, AnyRef] + val typedActors = new ConcurrentHashMap[String, AnyRef] } private val guard = new ReadWriteGuard @@ -130,11 +133,14 @@ object RemoteServer { private val remoteServers = Map[Address, RemoteServer]() private[akka] def registerActor(address: InetSocketAddress, uuid: String, actor: ActorRef) = guard.withWriteGuard { - actorsFor(RemoteServer.Address(address.getHostName, address.getPort)).actors.put(uuid, actor) + // FIXME + //actorsFor(RemoteServer.Address(address.getHostName, address.getPort)).actors.put(uuid, actor) + val actors = actorsFor(RemoteServer.Address(address.getHostName, address.getPort)).actors + actors.put(uuid, actor) } - private[akka] def registerTypedActor(address: InetSocketAddress, name: String, typedActor: AnyRef) = guard.withWriteGuard { - actorsFor(RemoteServer.Address(address.getHostName, address.getPort)).typedActors.put(name, typedActor) + private[akka] def registerTypedActor(address: InetSocketAddress, uuid: String, typedActor: TypedActor) = guard.withWriteGuard { + actorsFor(RemoteServer.Address(address.getHostName, address.getPort)).typedActors.put(uuid, typedActor) } private[akka] def getOrCreateServer(address: InetSocketAddress): RemoteServer = guard.withWriteGuard { @@ -159,7 +165,9 @@ object RemoteServer { remoteServers.remove(Address(hostname, port)) } - private def actorsFor(remoteServerAddress: RemoteServer.Address): RemoteActorSet = { + // FIXME + def actorsFor(remoteServerAddress: RemoteServer.Address): RemoteActorSet = { + println("##### actorsFor SIZE=" + remoteActorSets.size) remoteActorSets.getOrElseUpdate(remoteServerAddress,new RemoteActorSet) } } @@ -195,7 +203,7 @@ class RemoteServer extends Logging with ListenerManagement { private[akka] var hostname = RemoteServer.HOSTNAME private[akka] var port = RemoteServer.PORT - + @volatile private var _isRunning = false private val factory = new NioServerSocketChannelFactory( @@ -270,7 +278,22 @@ class RemoteServer extends Logging with ListenerManagement { } } - // TODO: register typed actor in RemoteServer as well + // FIXME: register typed actor in RemoteServer as well + def registerTypedActor(id: String, actorRef: AnyRef): Unit = synchronized { + val typedActors = RemoteServer.actorsFor(RemoteServer.Address(hostname, port)).typedActors + if (!typedActors.contains(id)) { + log.debug("Registering server side remote actor [%s] with id [%s] on [%s:%d]", actorRef.getClass.getName, id, hostname, port) + typedActors.put(id, actorRef) + } + } + + private def actors() : ConcurrentHashMap[String, ActorRef] = { + RemoteServer.actorsFor(RemoteServer.Address(hostname, port)).actors + } + + private def typedActors() : ConcurrentHashMap[String, AnyRef] = { + RemoteServer.actorsFor(RemoteServer.Address(hostname, port)).typedActors + } /** * Register Remote Actor by the Actor's 'id' field. It starts the Actor if it is not started already. @@ -285,6 +308,7 @@ class RemoteServer extends Logging with ListenerManagement { def register(id: String, actorRef: ActorRef): Unit = synchronized { if (_isRunning) { val actors = RemoteServer.actorsFor(RemoteServer.Address(hostname, port)).actors + println("___ register ___ " + actors.hashCode + " hostname=" + hostname + " port="+ port) if (!actors.contains(id)) { if (!actorRef.isRunning) actorRef.start log.debug("Registering server side remote actor [%s] with id [%s]", actorRef.actorClass.getName, id) @@ -320,6 +344,8 @@ class RemoteServer extends Logging with ListenerManagement { } } + //FIXME: unregister typed Actor + protected override def manageLifeCycleOfListeners = false protected[akka] override def foreachListener(f: (ActorRef) => Unit): Unit = super.foreachListener(f) diff --git a/akka-remote/src/test/java/se/scalablesolutions/akka/actor/RemoteTypedActorOneImpl.java b/akka-remote/src/test/java/se/scalablesolutions/akka/actor/RemoteTypedActorOneImpl.java index 715e5366a4..a30aa26124 100644 --- a/akka-remote/src/test/java/se/scalablesolutions/akka/actor/RemoteTypedActorOneImpl.java +++ b/akka-remote/src/test/java/se/scalablesolutions/akka/actor/RemoteTypedActorOneImpl.java @@ -18,9 +18,12 @@ public class RemoteTypedActorOneImpl extends TypedActor implements RemoteTypedAc } public void oneWay() throws Exception { + System.out.println("--------> got it!!!!!!"); RemoteTypedActorLog.oneWayLog().put("oneway"); } + + @Override public void preRestart(Throwable e) { try { RemoteTypedActorLog.messageLog().put(e.getMessage()); } catch(Exception ex) {} diff --git a/akka-remote/src/test/resources/META-INF/aop.xml b/akka-remote/src/test/resources/META-INF/aop.xml index bdc167ca54..be133a51b8 100644 --- a/akka-remote/src/test/resources/META-INF/aop.xml +++ b/akka-remote/src/test/resources/META-INF/aop.xml @@ -2,6 +2,7 @@ + diff --git a/akka-remote/src/test/scala/remote/ServerInitiatedRemoteTypedActorSpec.scala b/akka-remote/src/test/scala/remote/ServerInitiatedRemoteTypedActorSpec.scala new file mode 100644 index 0000000000..76c50f7658 --- /dev/null +++ b/akka-remote/src/test/scala/remote/ServerInitiatedRemoteTypedActorSpec.scala @@ -0,0 +1,86 @@ +/** + * Copyright (C) 2009-2010 Scalable Solutions AB + */ + +package se.scalablesolutions.akka.actor.remote + +import java.util.concurrent.{CountDownLatch, TimeUnit} +import org.scalatest.junit.JUnitSuite +import org.junit.{Test, Before, After} + +import se.scalablesolutions.akka.remote.{RemoteServer, RemoteClient} +import se.scalablesolutions.akka.actor._ + +object ServerInitiatedRemoteTypedActorSpec { + val HOSTNAME = "localhost" + val PORT = 9990 + var server: RemoteServer = null + + class SimpleActor extends Actor { + def receive = { + case _ => println("received message") + } + } + + +} + +class ServerInitiatedRemoteTypedActorSpec extends JUnitSuite { + import ServerInitiatedRemoteTypedActorSpec._ + private val unit = TimeUnit.MILLISECONDS + + + @Before + def init { + server = new RemoteServer() + server.start(HOSTNAME, PORT) + Thread.sleep(1000) + } + + // make sure the servers shutdown cleanly after the test has finished + @After + def finished { + try { + server.shutdown + RemoteClient.shutdownAll + Thread.sleep(1000) + } catch { + case e => () + } + } + + + @Test + def shouldSendWithBang { + + /* + val clientManangedTypedActor = TypedActor.newRemoteInstance(classOf[RemoteTypedActorOne], classOf[RemoteTypedActorOneImpl], 1000, HOSTNAME, PORT) + clientManangedTypedActor.requestReply("test-string") + Thread.sleep(2000) + println("###########") + */ + /* + trace() + val actor = Actor.actorOf[SimpleActor].start + server.register("simple-actor", actor) + val typedActor = TypedActor.newInstance(classOf[RemoteTypedActorOne], classOf[RemoteTypedActorOneImpl], 1000) + server.registerTypedActor("typed-actor-service", typedActor) + println("### registered actor") + trace() + + //val actorRef = RemoteActorRef("typed-actor-service", classOf[RemoteTypedActorOneImpl].getName, HOSTNAME, PORT, 5000L, None) + //val myActor = TypedActor.createProxyForRemoteActorRef(classOf[RemoteTypedActorOne], actorRef) + val myActor = RemoteClient.typedActorFor(classOf[RemoteTypedActorOne], "typed-actor-service", classOf[RemoteTypedActorOneImpl].getName, 5000L, HOSTNAME, PORT) + println("### call one way") + myActor.oneWay() + Thread.sleep(3000) + println("### call one way - done") + */ + //assert(RemoteActorSpecActorUnidirectional.latch.await(1, TimeUnit.SECONDS)) + //actor.stop + /* */ + } + + +} + diff --git a/akka-typed-actor/src/main/scala/actor/TypedActor.scala b/akka-typed-actor/src/main/scala/actor/TypedActor.scala index b27f5b4b4d..7f0ee5dbbc 100644 --- a/akka-typed-actor/src/main/scala/actor/TypedActor.scala +++ b/akka-typed-actor/src/main/scala/actor/TypedActor.scala @@ -16,9 +16,8 @@ import org.codehaus.aspectwerkz.proxy.Proxy import org.codehaus.aspectwerkz.annotation.{Aspect, Around} import java.net.InetSocketAddress -import java.lang.reflect.{InvocationTargetException, Method, Field} - import scala.reflect.BeanProperty +import java.lang.reflect.{Method, Field, InvocationHandler, Proxy => JProxy} /** * TypedActor is a type-safe actor made out of a POJO with interface. @@ -183,6 +182,7 @@ abstract class TypedActor extends Actor with Proxyable { case Link(proxy) => self.link(proxy) case Unlink(proxy) => self.unlink(proxy) + case method: String => println("### got method") case unexpected => throw new IllegalActorStateException( "Unexpected message [" + unexpected + "] sent to [" + this + "]") } @@ -408,24 +408,47 @@ object TypedActor extends Logging { proxy.asInstanceOf[T] } -/* - // NOTE: currently not used - but keep it around - private[akka] def newInstance[T <: TypedActor](targetClass: Class[T], - remoteAddress: Option[InetSocketAddress], timeout: Long): T = { - val proxy = { - val instance = Proxy.newInstance(targetClass, true, false) - if (instance.isInstanceOf[TypedActor]) instance.asInstanceOf[TypedActor] - else throw new IllegalActorStateException("Actor [" + targetClass.getName + "] is not a sub class of 'TypedActor'") + // FIXME + def createProxyForRemoteActorRef[T](intfClass: Class[T], actorRef: ActorRef): T = { + + class MyInvocationHandler extends InvocationHandler { + def invoke(proxy: AnyRef, method: Method, args: Array[AnyRef]): AnyRef = { + // do nothing, this is just a dummy + null + } } - val context = injectTypedActorContext(proxy) - actorRef.actor.asInstanceOf[Dispatcher].initialize(targetClass, proxy, proxy, context) - actorRef.timeout = timeout - if (remoteAddress.isDefined) actorRef.makeRemote(remoteAddress.get) - AspectInitRegistry.register(proxy, AspectInit(targetClass, proxy, actorRef, remoteAddress, timeout)) - actorRef.start - proxy.asInstanceOf[T] + val handler = new MyInvocationHandler() + + val interfaces = Array(intfClass, classOf[ServerManagedTypedActor]).asInstanceOf[Array[java.lang.Class[_]]] + val jProxy = JProxy.newProxyInstance(intfClass.getClassLoader(), interfaces, handler) + val awProxy = Proxy.newInstance(interfaces, Array(jProxy, jProxy), true, false) + + // TODO: needed? + //typedActor.initialize(proxy) + // TODO: timeout?? + AspectInitRegistry.register(awProxy, AspectInit(intfClass, null, actorRef, None, 5000L)) + awProxy.asInstanceOf[T] } -*/ + + + /* + // NOTE: currently not used - but keep it around + private[akka] def newInstance[T <: TypedActor](targetClass: Class[T], + remoteAddress: Option[InetSocketAddress], timeout: Long): T = { + val proxy = { + val instance = Proxy.newInstance(targetClass, true, false) + if (instance.isInstanceOf[TypedActor]) instance.asInstanceOf[TypedActor] + else throw new IllegalActorStateException("Actor [" + targetClass.getName + "] is not a sub class of 'TypedActor'") + } + val context = injectTypedActorContext(proxy) + actorRef.actor.asInstanceOf[Dispatcher].initialize(targetClass, proxy, proxy, context) + actorRef.timeout = timeout + if (remoteAddress.isDefined) actorRef.makeRemote(remoteAddress.get) + AspectInitRegistry.register(proxy, AspectInit(targetClass, proxy, actorRef, remoteAddress, timeout)) + actorRef.start + proxy.asInstanceOf[T] + } + */ /** * Stops the current Typed Actor. @@ -546,6 +569,114 @@ object TypedActor extends Logging { private[akka] def isJoinPoint(message: Any): Boolean = message.isInstanceOf[JoinPoint] } +/** + * AspectWerkz Aspect that is turning POJO into proxy to a server managed remote TypedActor. + *

+ * Is deployed on a 'perInstance' basis with the pointcut 'execution(* *.*(..))', + * e.g. all methods on the instance. + * + * @author Jonas Bonér + */ +@Aspect("perInstance") +private[akka] sealed class ServerManagedTypedActorAspect { + @volatile private var isInitialized = false + @volatile private var isStopped = false + private var interfaceClass: Class[_] = _ + private var actorRef: ActorRef = _ + private var timeout: Long = _ + private var uuid: String = _ + private var remoteAddress: Option[InetSocketAddress] = _ + + //FIXME + + @Around("execution(* *.*(..)) && this(se.scalablesolutions.akka.actor.ServerManagedTypedActor)") + def invoke(joinPoint: JoinPoint): AnyRef = { + println("### MyAspect intercepted " + joinPoint.getSignature) + if (!isInitialized) initialize(joinPoint) + remoteDispatch(joinPoint) + } + + + private def remoteDispatch(joinPoint: JoinPoint): AnyRef = { + val methodRtti = joinPoint.getRtti.asInstanceOf[MethodRtti] + val isOneWay = TypedActor.isOneWay(methodRtti) + + val (message: Array[AnyRef], isEscaped) = escapeArguments(methodRtti.getParameterValues) + + println("### remote dispatch...") + + val future = RemoteClientModule.send[AnyRef]( + message, None, None, remoteAddress.get, + timeout, isOneWay, actorRef, + Some((interfaceClass.getName, methodRtti.getMethod.getName)), + ActorType.TypedActor) + + if (isOneWay) null // for void methods + else { + if (future.isDefined) { + future.get.await + val result = getResultOrThrowException(future.get) + if (result.isDefined) result.get + else throw new IllegalActorStateException("No result returned from call to [" + joinPoint + "]") + } else throw new IllegalActorStateException("No future returned from call to [" + joinPoint + "]") + } + } + + /* + private def remoteDispatch(joinPoint: JoinPoint): AnyRef = { + val methodRtti = joinPoint.getRtti.asInstanceOf[MethodRtti] + val isOneWay = TypedActor.isOneWay(methodRtti) + val senderActorRef = Some(SenderContextInfo.senderActorRef.value) + + + if (!actorRef.isRunning && !isStopped) { + isStopped = true + // FIXME - what to do? + // joinPoint.proceed + null + } else if (isOneWay) { + actorRef.!("joinPoint") + //actorRef.!(joinPoint)(senderActorRef) + null.asInstanceOf[AnyRef] + } else if (TypedActor.returnsFuture_?(methodRtti)) { + actorRef.!!!(joinPoint, timeout)(senderActorRef) + } else { + val result = (actorRef.!!(joinPoint, timeout)(senderActorRef)).as[AnyRef] + if (result.isDefined) result.get + else throw new ActorTimeoutException("Invocation to [" + joinPoint + "] timed out.") + } + } + */ + private def getResultOrThrowException[T](future: Future[T]): Option[T] = + if (future.exception.isDefined) throw future.exception.get + else future.result + + private def escapeArguments(args: Array[AnyRef]): Tuple2[Array[AnyRef], Boolean] = { + var isEscaped = false + val escapedArgs = for (arg <- args) yield { + val clazz = arg.getClass + if (clazz.getName.contains(TypedActor.AW_PROXY_PREFIX)) { + isEscaped = true + TypedActor.AW_PROXY_PREFIX + clazz.getSuperclass.getName + } else arg + } + (escapedArgs, isEscaped) + } + + + private def initialize(joinPoint: JoinPoint): Unit = { + val init = AspectInitRegistry.initFor(joinPoint.getThis) + interfaceClass = init.interfaceClass + actorRef = init.actorRef + uuid = actorRef.uuid + remoteAddress = actorRef.remoteAddress + println("### address= " + remoteAddress.get) + timeout = init.timeout + isInitialized = true + } + +} + /** * AspectWerkz Aspect that is turning POJO into TypedActor. *

@@ -564,9 +695,8 @@ private[akka] sealed class TypedActorAspect { private var remoteAddress: Option[InetSocketAddress] = _ private var timeout: Long = _ private var uuid: String = _ - @volatile private var instance: TypedActor = _ - - @Around("execution(* *.*(..))") + + @Around("execution(* *.*(..)) && !this(se.scalablesolutions.akka.actor.ServerManagedTypedActor)") def invoke(joinPoint: JoinPoint): AnyRef = { if (!isInitialized) initialize(joinPoint) dispatch(joinPoint) @@ -653,6 +783,7 @@ private[akka] sealed class TypedActorAspect { } } + /** * Internal helper class to help pass the contextual information between threads. * @@ -704,5 +835,8 @@ private[akka] sealed case class AspectInit( val timeout: Long) { def this(interfaceClass: Class[_], targetInstance: TypedActor, actorRef: ActorRef, timeout: Long) = this(interfaceClass, targetInstance, actorRef, None, timeout) + } + +private[akka] sealed trait ServerManagedTypedActor extends TypedActor \ No newline at end of file diff --git a/akka-typed-actor/src/test/scala/actor/typed-actor/TypedActorLifecycleSpec.scala b/akka-typed-actor/src/test/scala/actor/typed-actor/TypedActorLifecycleSpec.scala index 10fc40493b..1fcbe0c5ef 100644 --- a/akka-typed-actor/src/test/scala/actor/typed-actor/TypedActorLifecycleSpec.scala +++ b/akka-typed-actor/src/test/scala/actor/typed-actor/TypedActorLifecycleSpec.scala @@ -45,6 +45,7 @@ class TypedActorLifecycleSpec extends Spec with ShouldMatchers with BeforeAndAft fail("expected exception not thrown") } catch { case e: RuntimeException => { + println("#failed") cdl.await assert(SamplePojoImpl._pre) assert(SamplePojoImpl._post) From ec61c29f21672f17cd30292192cbbee0d2ea8aae Mon Sep 17 00:00:00 2001 From: Michael Kober Date: Mon, 6 Sep 2010 16:33:55 +0200 Subject: [PATCH 2/2] implemented server managed typed actor --- .../src/main/scala/actor/ActorRef.scala | 2 +- .../src/main/scala/remote/RemoteClient.scala | 28 ++-- .../src/main/scala/remote/RemoteServer.scala | 60 ++++--- .../akka/actor/RemoteTypedActorOneImpl.java | 3 - .../ServerInitiatedRemoteActorSpec.scala | 1 + .../ServerInitiatedRemoteTypedActorSpec.scala | 116 ++++++++------ .../src/main/scala/actor/TypedActor.scala | 146 +++++------------- .../typed-actor/TypedActorLifecycleSpec.scala | 1 - 8 files changed, 155 insertions(+), 202 deletions(-) diff --git a/akka-actor/src/main/scala/actor/ActorRef.scala b/akka-actor/src/main/scala/actor/ActorRef.scala index c712a8d408..5da69ea7c2 100644 --- a/akka-actor/src/main/scala/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/actor/ActorRef.scala @@ -1363,7 +1363,7 @@ private[akka] case class RemoteActorRef private[akka] ( val port: Int, _timeout: Long, loader: Option[ClassLoader], - val actorType: ActorType = ActorType.ScalaActor ) + val actorType: ActorType = ActorType.ScalaActor) extends ActorRef with ScalaActorRef { ensureRemotingEnabled diff --git a/akka-remote/src/main/scala/remote/RemoteClient.scala b/akka-remote/src/main/scala/remote/RemoteClient.scala index 689f55a049..dc3cf6b146 100644 --- a/akka-remote/src/main/scala/remote/RemoteClient.scala +++ b/akka-remote/src/main/scala/remote/RemoteClient.scala @@ -80,16 +80,6 @@ object RemoteClient extends Logging { def actorFor(classNameOrServiceId: String, hostname: String, port: Int): ActorRef = actorFor(classNameOrServiceId, classNameOrServiceId, 5000L, hostname, port, None) - // FIXME: - def typedActorFor[T](intfClass: Class[T], serviceId: String, implClassName: String, timeout: Long, hostname: String, port: Int) : T = { - - println("### create RemoteActorRef") - val actorRef = RemoteActorRef(serviceId, implClassName, hostname, port, timeout, None, ActorType.TypedActor) - val proxy = TypedActor.createProxyForRemoteActorRef(intfClass, actorRef) - proxy - - } - def actorFor(classNameOrServiceId: String, hostname: String, port: Int, loader: ClassLoader): ActorRef = actorFor(classNameOrServiceId, classNameOrServiceId, 5000L, hostname, port, Some(loader)) @@ -108,8 +98,26 @@ object RemoteClient extends Logging { def actorFor(serviceId: String, className: String, timeout: Long, hostname: String, port: Int): ActorRef = RemoteActorRef(serviceId, className, hostname, port, timeout, None) + def typedActorFor[T](intfClass: Class[T], serviceIdOrClassName: String, hostname: String, port: Int) : T = { + typedActorFor(intfClass, serviceIdOrClassName, serviceIdOrClassName, 5000L, hostname, port, None) + } + def typedActorFor[T](intfClass: Class[T], serviceIdOrClassName: String, timeout: Long, hostname: String, port: Int) : T = { + typedActorFor(intfClass, serviceIdOrClassName, serviceIdOrClassName, timeout, hostname, port, None) + } + def typedActorFor[T](intfClass: Class[T], serviceIdOrClassName: String, timeout: Long, hostname: String, port: Int, loader: ClassLoader) : T = { + typedActorFor(intfClass, serviceIdOrClassName, serviceIdOrClassName, timeout, hostname, port, Some(loader)) + } + + def typedActorFor[T](intfClass: Class[T], serviceId: String, implClassName: String, timeout: Long, hostname: String, port: Int, loader: ClassLoader) : T = { + typedActorFor(intfClass, serviceId, implClassName, timeout, hostname, port, Some(loader)) + } + + private[akka] def typedActorFor[T](intfClass: Class[T], serviceId: String, implClassName: String, timeout: Long, hostname: String, port: Int, loader: Option[ClassLoader]) : T = { + val actorRef = RemoteActorRef(serviceId, implClassName, hostname, port, timeout, loader, ActorType.TypedActor) + TypedActor.createProxyForRemoteActorRef(intfClass, actorRef) + } private[akka] def actorFor(serviceId: String, className: String, timeout: Long, hostname: String, port: Int, loader: ClassLoader): ActorRef = RemoteActorRef(serviceId, className, hostname, port, timeout, Some(loader)) diff --git a/akka-remote/src/main/scala/remote/RemoteServer.scala b/akka-remote/src/main/scala/remote/RemoteServer.scala index af1507c3cb..5a7fbd00d9 100644 --- a/akka-remote/src/main/scala/remote/RemoteServer.scala +++ b/akka-remote/src/main/scala/remote/RemoteServer.scala @@ -10,7 +10,7 @@ import java.util.concurrent.{ConcurrentHashMap, Executors} import java.util.{Map => JMap} import se.scalablesolutions.akka.actor.{ - Actor, TypedActor, ActorRef, LocalActorRef, RemoteActorRef, IllegalActorStateException, RemoteActorSystemMessage} + Actor, TypedActor, ActorRef, IllegalActorStateException, RemoteActorSystemMessage} import se.scalablesolutions.akka.actor.Actor._ import se.scalablesolutions.akka.util._ import se.scalablesolutions.akka.remote.protocol.RemoteProtocol._ @@ -120,12 +120,9 @@ object RemoteServer { } } - // FIXME private - class RemoteActorSet { - //private[RemoteServer] val actors = new ConcurrentHashMap[String, ActorRef] - val actors = new ConcurrentHashMap[String, ActorRef] - //private[RemoteServer] val typedActors = new ConcurrentHashMap[String, AnyRef] - val typedActors = new ConcurrentHashMap[String, AnyRef] + private class RemoteActorSet { + private[RemoteServer] val actors = new ConcurrentHashMap[String, ActorRef] + private[RemoteServer] val typedActors = new ConcurrentHashMap[String, AnyRef] } private val guard = new ReadWriteGuard @@ -133,13 +130,10 @@ object RemoteServer { private val remoteServers = Map[Address, RemoteServer]() private[akka] def registerActor(address: InetSocketAddress, uuid: String, actor: ActorRef) = guard.withWriteGuard { - // FIXME - //actorsFor(RemoteServer.Address(address.getHostName, address.getPort)).actors.put(uuid, actor) - val actors = actorsFor(RemoteServer.Address(address.getHostName, address.getPort)).actors - actors.put(uuid, actor) + actorsFor(RemoteServer.Address(address.getHostName, address.getPort)).actors.put(uuid, actor) } - private[akka] def registerTypedActor(address: InetSocketAddress, uuid: String, typedActor: TypedActor) = guard.withWriteGuard { + private[akka] def registerTypedActor(address: InetSocketAddress, uuid: String, typedActor: AnyRef) = guard.withWriteGuard { actorsFor(RemoteServer.Address(address.getHostName, address.getPort)).typedActors.put(uuid, typedActor) } @@ -165,9 +159,7 @@ object RemoteServer { remoteServers.remove(Address(hostname, port)) } - // FIXME - def actorsFor(remoteServerAddress: RemoteServer.Address): RemoteActorSet = { - println("##### actorsFor SIZE=" + remoteActorSets.size) + private def actorsFor(remoteServerAddress: RemoteServer.Address): RemoteActorSet = { remoteActorSets.getOrElseUpdate(remoteServerAddress,new RemoteActorSet) } } @@ -278,23 +270,19 @@ class RemoteServer extends Logging with ListenerManagement { } } - // FIXME: register typed actor in RemoteServer as well - def registerTypedActor(id: String, actorRef: AnyRef): Unit = synchronized { + /** + * Register remote typed actor by a specific id. + * @param id custom actor id + * @param typedActor typed actor to register + */ + def registerTypedActor(id: String, typedActor: AnyRef): Unit = synchronized { val typedActors = RemoteServer.actorsFor(RemoteServer.Address(hostname, port)).typedActors if (!typedActors.contains(id)) { - log.debug("Registering server side remote actor [%s] with id [%s] on [%s:%d]", actorRef.getClass.getName, id, hostname, port) - typedActors.put(id, actorRef) + log.debug("Registering server side remote actor [%s] with id [%s] on [%s:%d]", typedActor.getClass.getName, id, hostname, port) + typedActors.put(id, typedActor) } } - private def actors() : ConcurrentHashMap[String, ActorRef] = { - RemoteServer.actorsFor(RemoteServer.Address(hostname, port)).actors - } - - private def typedActors() : ConcurrentHashMap[String, AnyRef] = { - RemoteServer.actorsFor(RemoteServer.Address(hostname, port)).typedActors - } - /** * Register Remote Actor by the Actor's 'id' field. It starts the Actor if it is not started already. */ @@ -308,7 +296,6 @@ class RemoteServer extends Logging with ListenerManagement { def register(id: String, actorRef: ActorRef): Unit = synchronized { if (_isRunning) { val actors = RemoteServer.actorsFor(RemoteServer.Address(hostname, port)).actors - println("___ register ___ " + actors.hashCode + " hostname=" + hostname + " port="+ port) if (!actors.contains(id)) { if (!actorRef.isRunning) actorRef.start log.debug("Registering server side remote actor [%s] with id [%s]", actorRef.actorClass.getName, id) @@ -344,16 +331,27 @@ class RemoteServer extends Logging with ListenerManagement { } } - //FIXME: unregister typed Actor + /** + * Unregister Remote Typed Actor by specific 'id'. + *

+ * NOTE: You need to call this method if you have registered an actor by a custom ID. + */ + def unregisterTypedActor(id: String):Unit = synchronized { + if (_isRunning) { + log.info("Unregistering server side remote typed actor with id [%s]", id) + val registeredTypedActors = typedActors() + registeredTypedActors.remove(id) + } + } protected override def manageLifeCycleOfListeners = false protected[akka] override def foreachListener(f: (ActorRef) => Unit): Unit = super.foreachListener(f) - private def actors() : ConcurrentHashMap[String, ActorRef] = { + private[akka] def actors() : ConcurrentHashMap[String, ActorRef] = { RemoteServer.actorsFor(RemoteServer.Address(hostname, port)).actors } - private def typedActors() : ConcurrentHashMap[String, AnyRef] = { + private[akka] def typedActors() : ConcurrentHashMap[String, AnyRef] = { RemoteServer.actorsFor(RemoteServer.Address(hostname, port)).typedActors } } diff --git a/akka-remote/src/test/java/se/scalablesolutions/akka/actor/RemoteTypedActorOneImpl.java b/akka-remote/src/test/java/se/scalablesolutions/akka/actor/RemoteTypedActorOneImpl.java index a30aa26124..715e5366a4 100644 --- a/akka-remote/src/test/java/se/scalablesolutions/akka/actor/RemoteTypedActorOneImpl.java +++ b/akka-remote/src/test/java/se/scalablesolutions/akka/actor/RemoteTypedActorOneImpl.java @@ -18,12 +18,9 @@ public class RemoteTypedActorOneImpl extends TypedActor implements RemoteTypedAc } public void oneWay() throws Exception { - System.out.println("--------> got it!!!!!!"); RemoteTypedActorLog.oneWayLog().put("oneway"); } - - @Override public void preRestart(Throwable e) { try { RemoteTypedActorLog.messageLog().put(e.getMessage()); } catch(Exception ex) {} diff --git a/akka-remote/src/test/scala/remote/ServerInitiatedRemoteActorSpec.scala b/akka-remote/src/test/scala/remote/ServerInitiatedRemoteActorSpec.scala index 8b1e0ef765..012d42f92a 100644 --- a/akka-remote/src/test/scala/remote/ServerInitiatedRemoteActorSpec.scala +++ b/akka-remote/src/test/scala/remote/ServerInitiatedRemoteActorSpec.scala @@ -144,5 +144,6 @@ class ServerInitiatedRemoteActorSpec extends JUnitSuite { assert(numberOfActorsInRegistry === ActorRegistry.actors.length) actor.stop } + } diff --git a/akka-remote/src/test/scala/remote/ServerInitiatedRemoteTypedActorSpec.scala b/akka-remote/src/test/scala/remote/ServerInitiatedRemoteTypedActorSpec.scala index 76c50f7658..b800fbf2c3 100644 --- a/akka-remote/src/test/scala/remote/ServerInitiatedRemoteTypedActorSpec.scala +++ b/akka-remote/src/test/scala/remote/ServerInitiatedRemoteTypedActorSpec.scala @@ -4,42 +4,46 @@ package se.scalablesolutions.akka.actor.remote -import java.util.concurrent.{CountDownLatch, TimeUnit} -import org.scalatest.junit.JUnitSuite -import org.junit.{Test, Before, After} +import org.scalatest.Spec +import org.scalatest.matchers.ShouldMatchers +import org.scalatest.BeforeAndAfterAll +import org.scalatest.junit.JUnitRunner +import org.junit.runner.RunWith + +import java.util.concurrent.TimeUnit import se.scalablesolutions.akka.remote.{RemoteServer, RemoteClient} import se.scalablesolutions.akka.actor._ +import RemoteTypedActorLog._ object ServerInitiatedRemoteTypedActorSpec { val HOSTNAME = "localhost" val PORT = 9990 var server: RemoteServer = null - - class SimpleActor extends Actor { - def receive = { - case _ => println("received message") - } - } - - } -class ServerInitiatedRemoteTypedActorSpec extends JUnitSuite { +@RunWith(classOf[JUnitRunner]) +class ServerInitiatedRemoteTypedActorSpec extends + Spec with + ShouldMatchers with + BeforeAndAfterAll { import ServerInitiatedRemoteTypedActorSpec._ + private val unit = TimeUnit.MILLISECONDS - @Before - def init { + override def beforeAll = { server = new RemoteServer() server.start(HOSTNAME, PORT) + + val typedActor = TypedActor.newInstance(classOf[RemoteTypedActorOne], classOf[RemoteTypedActorOneImpl], 1000) + server.registerTypedActor("typed-actor-service", typedActor) + Thread.sleep(1000) } // make sure the servers shutdown cleanly after the test has finished - @After - def finished { + override def afterAll = { try { server.shutdown RemoteClient.shutdownAll @@ -49,38 +53,60 @@ class ServerInitiatedRemoteTypedActorSpec extends JUnitSuite { } } + describe("Server managed remote typed Actor ") { - @Test - def shouldSendWithBang { + it("should receive one-way message") { + clearMessageLogs + val actor = RemoteClient.typedActorFor(classOf[RemoteTypedActorOne], "typed-actor-service", 5000L, HOSTNAME, PORT) + expect("oneway") { + actor.oneWay + oneWayLog.poll(5, TimeUnit.SECONDS) + } + } - /* - val clientManangedTypedActor = TypedActor.newRemoteInstance(classOf[RemoteTypedActorOne], classOf[RemoteTypedActorOneImpl], 1000, HOSTNAME, PORT) - clientManangedTypedActor.requestReply("test-string") - Thread.sleep(2000) - println("###########") - */ - /* - trace() - val actor = Actor.actorOf[SimpleActor].start - server.register("simple-actor", actor) - val typedActor = TypedActor.newInstance(classOf[RemoteTypedActorOne], classOf[RemoteTypedActorOneImpl], 1000) - server.registerTypedActor("typed-actor-service", typedActor) - println("### registered actor") - trace() + it("should respond to request-reply message") { + clearMessageLogs + val actor = RemoteClient.typedActorFor(classOf[RemoteTypedActorOne], "typed-actor-service", 5000L, HOSTNAME, PORT) + expect("pong") { + actor.requestReply("ping") + } + } - //val actorRef = RemoteActorRef("typed-actor-service", classOf[RemoteTypedActorOneImpl].getName, HOSTNAME, PORT, 5000L, None) - //val myActor = TypedActor.createProxyForRemoteActorRef(classOf[RemoteTypedActorOne], actorRef) - val myActor = RemoteClient.typedActorFor(classOf[RemoteTypedActorOne], "typed-actor-service", classOf[RemoteTypedActorOneImpl].getName, 5000L, HOSTNAME, PORT) - println("### call one way") - myActor.oneWay() - Thread.sleep(3000) - println("### call one way - done") - */ - //assert(RemoteActorSpecActorUnidirectional.latch.await(1, TimeUnit.SECONDS)) - //actor.stop - /* */ + it("should not recreate registered actors") { + val actor = RemoteClient.typedActorFor(classOf[RemoteTypedActorOne], "typed-actor-service", 5000L, HOSTNAME, PORT) + val numberOfActorsInRegistry = ActorRegistry.actors.length + expect("oneway") { + actor.oneWay + oneWayLog.poll(5, TimeUnit.SECONDS) + } + assert(numberOfActorsInRegistry === ActorRegistry.actors.length) + } + + it("should support multiple variants to get the actor from client side") { + var actor = RemoteClient.typedActorFor(classOf[RemoteTypedActorOne], "typed-actor-service", 5000L, HOSTNAME, PORT) + expect("oneway") { + actor.oneWay + oneWayLog.poll(5, TimeUnit.SECONDS) + } + actor = RemoteClient.typedActorFor(classOf[RemoteTypedActorOne], "typed-actor-service", HOSTNAME, PORT) + expect("oneway") { + actor.oneWay + oneWayLog.poll(5, TimeUnit.SECONDS) + } + actor = RemoteClient.typedActorFor(classOf[RemoteTypedActorOne], "typed-actor-service", 5000L, HOSTNAME, PORT, this.getClass().getClassLoader) + expect("oneway") { + actor.oneWay + oneWayLog.poll(5, TimeUnit.SECONDS) + } + } + + it("should register and unregister typed actors") { + val typedActor = TypedActor.newInstance(classOf[RemoteTypedActorOne], classOf[RemoteTypedActorOneImpl], 1000) + server.registerTypedActor("my-test-service", typedActor) + assert(server.typedActors().get("my-test-service") != null) + server.unregisterTypedActor("my-test-service") + assert(server.typedActors().get("my-test-service") == null) + } } - - } diff --git a/akka-typed-actor/src/main/scala/actor/TypedActor.scala b/akka-typed-actor/src/main/scala/actor/TypedActor.scala index 7f0ee5dbbc..385c1831a4 100644 --- a/akka-typed-actor/src/main/scala/actor/TypedActor.scala +++ b/akka-typed-actor/src/main/scala/actor/TypedActor.scala @@ -182,7 +182,6 @@ abstract class TypedActor extends Actor with Proxyable { case Link(proxy) => self.link(proxy) case Unlink(proxy) => self.unlink(proxy) - case method: String => println("### got method") case unexpected => throw new IllegalActorStateException( "Unexpected message [" + unexpected + "] sent to [" + this + "]") } @@ -408,8 +407,11 @@ object TypedActor extends Logging { proxy.asInstanceOf[T] } - // FIXME - def createProxyForRemoteActorRef[T](intfClass: Class[T], actorRef: ActorRef): T = { + /** + * Create a proxy for a RemoteActorRef representing a server managed remote typed actor. + * + */ + private[akka] def createProxyForRemoteActorRef[T](intfClass: Class[T], actorRef: ActorRef): T = { class MyInvocationHandler extends InvocationHandler { def invoke(proxy: AnyRef, method: Method, args: Array[AnyRef]): AnyRef = { @@ -423,9 +425,6 @@ object TypedActor extends Logging { val jProxy = JProxy.newProxyInstance(intfClass.getClassLoader(), interfaces, handler) val awProxy = Proxy.newInstance(interfaces, Array(jProxy, jProxy), true, false) - // TODO: needed? - //typedActor.initialize(proxy) - // TODO: timeout?? AspectInitRegistry.register(awProxy, AspectInit(intfClass, null, actorRef, None, 5000L)) awProxy.asInstanceOf[T] } @@ -569,6 +568,7 @@ object TypedActor extends Logging { private[akka] def isJoinPoint(message: Any): Boolean = message.isInstanceOf[JoinPoint] } + /** * AspectWerkz Aspect that is turning POJO into proxy to a server managed remote TypedActor. *

@@ -578,103 +578,18 @@ object TypedActor extends Logging { * @author Jonas Bonér */ @Aspect("perInstance") -private[akka] sealed class ServerManagedTypedActorAspect { - @volatile private var isInitialized = false - @volatile private var isStopped = false - private var interfaceClass: Class[_] = _ - private var actorRef: ActorRef = _ - private var timeout: Long = _ - private var uuid: String = _ - private var remoteAddress: Option[InetSocketAddress] = _ - - //FIXME - +private[akka] sealed class ServerManagedTypedActorAspect extends ActorAspect { + @Around("execution(* *.*(..)) && this(se.scalablesolutions.akka.actor.ServerManagedTypedActor)") def invoke(joinPoint: JoinPoint): AnyRef = { - println("### MyAspect intercepted " + joinPoint.getSignature) if (!isInitialized) initialize(joinPoint) remoteDispatch(joinPoint) } - - private def remoteDispatch(joinPoint: JoinPoint): AnyRef = { - val methodRtti = joinPoint.getRtti.asInstanceOf[MethodRtti] - val isOneWay = TypedActor.isOneWay(methodRtti) - - val (message: Array[AnyRef], isEscaped) = escapeArguments(methodRtti.getParameterValues) - - println("### remote dispatch...") - - val future = RemoteClientModule.send[AnyRef]( - message, None, None, remoteAddress.get, - timeout, isOneWay, actorRef, - Some((interfaceClass.getName, methodRtti.getMethod.getName)), - ActorType.TypedActor) - - if (isOneWay) null // for void methods - else { - if (future.isDefined) { - future.get.await - val result = getResultOrThrowException(future.get) - if (result.isDefined) result.get - else throw new IllegalActorStateException("No result returned from call to [" + joinPoint + "]") - } else throw new IllegalActorStateException("No future returned from call to [" + joinPoint + "]") - } - } - - /* - private def remoteDispatch(joinPoint: JoinPoint): AnyRef = { - val methodRtti = joinPoint.getRtti.asInstanceOf[MethodRtti] - val isOneWay = TypedActor.isOneWay(methodRtti) - val senderActorRef = Some(SenderContextInfo.senderActorRef.value) - - - if (!actorRef.isRunning && !isStopped) { - isStopped = true - // FIXME - what to do? - // joinPoint.proceed - null - } else if (isOneWay) { - actorRef.!("joinPoint") - //actorRef.!(joinPoint)(senderActorRef) - null.asInstanceOf[AnyRef] - } else if (TypedActor.returnsFuture_?(methodRtti)) { - actorRef.!!!(joinPoint, timeout)(senderActorRef) - } else { - val result = (actorRef.!!(joinPoint, timeout)(senderActorRef)).as[AnyRef] - if (result.isDefined) result.get - else throw new ActorTimeoutException("Invocation to [" + joinPoint + "] timed out.") - } - } - */ - private def getResultOrThrowException[T](future: Future[T]): Option[T] = - if (future.exception.isDefined) throw future.exception.get - else future.result - - private def escapeArguments(args: Array[AnyRef]): Tuple2[Array[AnyRef], Boolean] = { - var isEscaped = false - val escapedArgs = for (arg <- args) yield { - val clazz = arg.getClass - if (clazz.getName.contains(TypedActor.AW_PROXY_PREFIX)) { - isEscaped = true - TypedActor.AW_PROXY_PREFIX + clazz.getSuperclass.getName - } else arg - } - (escapedArgs, isEscaped) - } - - - private def initialize(joinPoint: JoinPoint): Unit = { - val init = AspectInitRegistry.initFor(joinPoint.getThis) - interfaceClass = init.interfaceClass - actorRef = init.actorRef - uuid = actorRef.uuid + override def initialize(joinPoint: JoinPoint): Unit = { + super.initialize(joinPoint) remoteAddress = actorRef.remoteAddress - println("### address= " + remoteAddress.get) - timeout = init.timeout - isInitialized = true } - } /** @@ -686,16 +601,8 @@ private[akka] sealed class ServerManagedTypedActorAspect { * @author Jonas Bonér */ @Aspect("perInstance") -private[akka] sealed class TypedActorAspect { - @volatile private var isInitialized = false - @volatile private var isStopped = false - private var interfaceClass: Class[_] = _ - private var typedActor: TypedActor = _ - private var actorRef: ActorRef = _ - private var remoteAddress: Option[InetSocketAddress] = _ - private var timeout: Long = _ - private var uuid: String = _ - +private[akka] sealed class TypedActorAspect extends ActorAspect { + @Around("execution(* *.*(..)) && !this(se.scalablesolutions.akka.actor.ServerManagedTypedActor)") def invoke(joinPoint: JoinPoint): AnyRef = { if (!isInitialized) initialize(joinPoint) @@ -706,12 +613,26 @@ private[akka] sealed class TypedActorAspect { if (remoteAddress.isDefined) remoteDispatch(joinPoint) else localDispatch(joinPoint) } +} - private def localDispatch(joinPoint: JoinPoint): AnyRef = { - val methodRtti = joinPoint.getRtti.asInstanceOf[MethodRtti] - val isOneWay = TypedActor.isOneWay(methodRtti) +/** + * Base class for TypedActorAspect and ServerManagedTypedActorAspect to reduce code duplication. + */ +private[akka] abstract class ActorAspect { + @volatile protected var isInitialized = false + @volatile protected var isStopped = false + protected var interfaceClass: Class[_] = _ + protected var typedActor: TypedActor = _ + protected var actorRef: ActorRef = _ + protected var timeout: Long = _ + protected var uuid: String = _ + protected var remoteAddress: Option[InetSocketAddress] = _ + + protected def localDispatch(joinPoint: JoinPoint): AnyRef = { + val methodRtti = joinPoint.getRtti.asInstanceOf[MethodRtti] + val isOneWay = TypedActor.isOneWay(methodRtti) val senderActorRef = Some(SenderContextInfo.senderActorRef.value) - val senderProxy = Some(SenderContextInfo.senderProxy.value) + val senderProxy = Some(SenderContextInfo.senderProxy.value) typedActor.context._sender = senderProxy if (!actorRef.isRunning && !isStopped) { @@ -732,7 +653,7 @@ private[akka] sealed class TypedActorAspect { } } - private def remoteDispatch(joinPoint: JoinPoint): AnyRef = { + protected def remoteDispatch(joinPoint: JoinPoint): AnyRef = { val methodRtti = joinPoint.getRtti.asInstanceOf[MethodRtti] val isOneWay = TypedActor.isOneWay(methodRtti) @@ -771,7 +692,7 @@ private[akka] sealed class TypedActorAspect { (escapedArgs, isEscaped) } - private def initialize(joinPoint: JoinPoint): Unit = { + protected def initialize(joinPoint: JoinPoint): Unit = { val init = AspectInitRegistry.initFor(joinPoint.getThis) interfaceClass = init.interfaceClass typedActor = init.targetInstance @@ -839,4 +760,7 @@ private[akka] sealed case class AspectInit( } +/** + * Marker interface for server manager typed actors. + */ private[akka] sealed trait ServerManagedTypedActor extends TypedActor \ No newline at end of file diff --git a/akka-typed-actor/src/test/scala/actor/typed-actor/TypedActorLifecycleSpec.scala b/akka-typed-actor/src/test/scala/actor/typed-actor/TypedActorLifecycleSpec.scala index 1fcbe0c5ef..10fc40493b 100644 --- a/akka-typed-actor/src/test/scala/actor/typed-actor/TypedActorLifecycleSpec.scala +++ b/akka-typed-actor/src/test/scala/actor/typed-actor/TypedActorLifecycleSpec.scala @@ -45,7 +45,6 @@ class TypedActorLifecycleSpec extends Spec with ShouldMatchers with BeforeAndAft fail("expected exception not thrown") } catch { case e: RuntimeException => { - println("#failed") cdl.await assert(SamplePojoImpl._pre) assert(SamplePojoImpl._post)