From ddb6d9edf99aaa5478fbc517315fecb52d716ada Mon Sep 17 00:00:00 2001 From: Michael Kober Date: Wed, 29 Sep 2010 14:42:05 +0200 Subject: [PATCH] closing ticket440, implemented typed actor with constructor args --- .../src/main/scala/actor/TypedActor.scala | 176 +++++++++++++++--- .../actor/typed-actor/TypedActorSpec.scala | 57 +++++- 2 files changed, 202 insertions(+), 31 deletions(-) diff --git a/akka-typed-actor/src/main/scala/actor/TypedActor.scala b/akka-typed-actor/src/main/scala/actor/TypedActor.scala index 7b35abfa57..015f06289a 100644 --- a/akka-typed-actor/src/main/scala/actor/TypedActor.scala +++ b/akka-typed-actor/src/main/scala/actor/TypedActor.scala @@ -309,6 +309,31 @@ final class TypedActorContext(private val actorRef: ActorRef) { def getSenderFuture = senderFuture } +object TypedActorConfiguration { + + def apply() : TypedActorConfiguration = { + new TypedActorConfiguration() + } + + def apply(timeout: Long) : TypedActorConfiguration = { + new TypedActorConfiguration().timeout(Duration(timeout, "millis")) + } + + def apply(host: String, port: Int) : TypedActorConfiguration = { + new TypedActorConfiguration().makeRemote(host, port) + } + + def apply(host: String, port: Int, timeout: Long) : TypedActorConfiguration = { + new TypedActorConfiguration().makeRemote(host, port).timeout(Duration(timeout, "millis")) + } + + def apply(transactionRequired: Boolean) : TypedActorConfiguration = { + if (transactionRequired) { + new TypedActorConfiguration().makeTransactionRequired + } else new TypedActorConfiguration() + } +} + /** * Configuration factory for TypedActors. * @@ -332,8 +357,10 @@ final class TypedActorConfiguration { this } - def makeRemote(hostname: String, port: Int) : TypedActorConfiguration = { - _host = Some(new InetSocketAddress(hostname, port)) + def makeRemote(hostname: String, port: Int): TypedActorConfiguration = makeRemote(new InetSocketAddress(hostname, port)) + + def makeRemote(remoteAddress: InetSocketAddress): TypedActorConfiguration = { + _host = Some(remoteAddress) this } @@ -366,24 +393,125 @@ object TypedActor extends Logging { val AKKA_CAMEL_ROUTING_SCHEME = "akka".intern private[actor] val AW_PROXY_PREFIX = "$$ProxiedByAW".intern + /** + * Factory method for typed actor. + * @param intfClass interface the typed actor implements + * @param targetClass implementation class of the typed actor + */ def newInstance[T](intfClass: Class[T], targetClass: Class[_]): T = { - newInstance(intfClass, targetClass, None, Actor.TIMEOUT) + newInstance(intfClass, targetClass, TypedActorConfiguration()) } + /** + * Factory method for typed actor. + * @param intfClass interface the typed actor implements + * @param factory factory method that constructs the typed actor + */ + def newInstance[T](intfClass: Class[T], factory: => AnyRef): T = { + newInstance(intfClass, factory, TypedActorConfiguration()) + } + + /** + * Factory method for remote typed actor. + * @param intfClass interface the typed actor implements + * @param targetClass implementation class of the typed actor + * @param host hostanme of the remote server + * @param port port of the remote server + */ def newRemoteInstance[T](intfClass: Class[T], targetClass: Class[_], hostname: String, port: Int): T = { - newInstance(intfClass, targetClass, Some(new InetSocketAddress(hostname, port)), Actor.TIMEOUT) + newInstance(intfClass, targetClass, TypedActorConfiguration(hostname, port)) } - def newInstance[T](intfClass: Class[T], targetClass: Class[_], timeout: Long = Actor.TIMEOUT): T = { - newInstance(intfClass, targetClass, None, timeout) + /** + * Factory method for remote typed actor. + * @param intfClass interface the typed actor implements + * @param factory factory method that constructs the typed actor + * @param host hostanme of the remote server + * @param port port of the remote server + */ + def newRemoteInstance[T](intfClass: Class[T], factory: => AnyRef, hostname: String, port: Int): T = { + newInstance(intfClass, factory, TypedActorConfiguration(hostname, port)) } - def newRemoteInstance[T](intfClass: Class[T], targetClass: Class[_], timeout: Long = Actor.TIMEOUT, hostname: String, port: Int): T = { - newInstance(intfClass, targetClass, Some(new InetSocketAddress(hostname, port)), timeout) + /** + * Factory method for typed actor. + * @param intfClass interface the typed actor implements + * @param targetClass implementation class of the typed actor + * @param timeout timeout for future + */ + def newInstance[T](intfClass: Class[T], targetClass: Class[_], timeout: Long) : T = { + newInstance(intfClass, targetClass, TypedActorConfiguration(timeout)) } + /** + * Factory method for typed actor. + * @param intfClass interface the typed actor implements + * @param factory factory method that constructs the typed actor + * @param timeout timeout for future + */ + def newInstance[T](intfClass: Class[T], factory: => AnyRef, timeout: Long) : T = { + newInstance(intfClass, factory, TypedActorConfiguration(timeout)) + } + + /** + * Factory method for remote typed actor. + * @param intfClass interface the typed actor implements + * @param targetClass implementation class of the typed actor + * @paramm timeout timeout for future + * @param host hostanme of the remote server + * @param port port of the remote server + */ + def newRemoteInstance[T](intfClass: Class[T], targetClass: Class[_], timeout: Long, hostname: String, port: Int): T = { + newInstance(intfClass, targetClass, TypedActorConfiguration(hostname, port, timeout)) + } + + /** + * Factory method for remote typed actor. + * @param intfClass interface the typed actor implements + * @param factory factory method that constructs the typed actor + * @paramm timeout timeout for future + * @param host hostanme of the remote server + * @param port port of the remote server + */ + def newRemoteInstance[T](intfClass: Class[T], factory: => AnyRef, timeout: Long, hostname: String, port: Int): T = { + newInstance(intfClass, factory, TypedActorConfiguration(hostname, port, timeout)) + } + + /** + * Factory method for typed actor. + * @param intfClass interface the typed actor implements + * @param factory factory method that constructs the typed actor + * @paramm config configuration object fo the typed actor + */ + def newInstance[T](intfClass: Class[T], factory: => AnyRef, config: TypedActorConfiguration): T = { + val actorRef = actorOf(newTypedActor(factory)) + newInstance(intfClass, actorRef, config) + } + + /** + * Factory method for typed actor. + * @param intfClass interface the typed actor implements + * @param targetClass implementation class of the typed actor + * @paramm config configuration object fo the typed actor + */ def newInstance[T](intfClass: Class[T], targetClass: Class[_], config: TypedActorConfiguration): T = { val actorRef = actorOf(newTypedActor(targetClass)) + newInstance(intfClass, actorRef, config) + } + + private[akka] def newInstance[T](intfClass: Class[T], actorRef: ActorRef): T = { + if (!actorRef.actorInstance.get.isInstanceOf[TypedActor]) throw new IllegalArgumentException("ActorRef is not a ref to a typed actor") + newInstance(intfClass, actorRef, TypedActorConfiguration()) + } + + private[akka] def newInstance[T](intfClass: Class[T], targetClass: Class[_], + remoteAddress: Option[InetSocketAddress], timeout: Long): T = { + val config = TypedActorConfiguration(timeout) + if (remoteAddress.isDefined) config.makeRemote(remoteAddress.get) + newInstance(intfClass, targetClass, config) + } + + private def newInstance[T](intfClass: Class[T], actorRef: ActorRef, config: TypedActorConfiguration) : T = { val typedActor = actorRef.actorInstance.get.asInstanceOf[TypedActor] val proxy = Proxy.newInstance(Array(intfClass), Array(typedActor), true, false) typedActor.initialize(proxy) @@ -391,33 +519,12 @@ object TypedActor extends Logging { if (config._threadBasedDispatcher.isDefined) actorRef.dispatcher = Dispatchers.newThreadBasedDispatcher(actorRef) if (config._host.isDefined) actorRef.makeRemote(config._host.get) actorRef.timeout = config.timeout - AspectInitRegistry.register(proxy, AspectInit(intfClass, typedActor, actorRef, config._host, config.timeout)) - actorRef.start - proxy.asInstanceOf[T] - } - - private[akka] def newInstance[T](intfClass: Class[T], actorRef: ActorRef): T = { - if (!actorRef.actorInstance.get.isInstanceOf[TypedActor]) throw new IllegalArgumentException("ActorRef is not a ref to a typed actor") - val typedActor = actorRef.actorInstance.get.asInstanceOf[TypedActor] - val proxy = Proxy.newInstance(Array(intfClass), Array(typedActor), true, false) - typedActor.initialize(proxy) AspectInitRegistry.register(proxy, AspectInit(intfClass, typedActor, actorRef, actorRef.remoteAddress, actorRef.timeout)) actorRef.start proxy.asInstanceOf[T] } - private[akka] def newInstance[T](intfClass: Class[T], targetClass: Class[_], - remoteAddress: Option[InetSocketAddress], timeout: Long): T = { - val actorRef = actorOf(newTypedActor(targetClass)) - val typedActor = actorRef.actorInstance.get.asInstanceOf[TypedActor] - val proxy = Proxy.newInstance(Array(intfClass), Array(typedActor), true, false) - typedActor.initialize(proxy) - actorRef.timeout = timeout - if (remoteAddress.isDefined) actorRef.makeRemote(remoteAddress.get) - AspectInitRegistry.register(proxy, AspectInit(intfClass, typedActor, actorRef, remoteAddress, timeout)) - actorRef.start - proxy.asInstanceOf[T] - } + /** * Create a proxy for a RemoteActorRef representing a server managed remote typed actor. @@ -557,6 +664,15 @@ object TypedActor extends Logging { typedActor } + private[akka] def newTypedActor(factory: => AnyRef): TypedActor = { + val instance = factory + val typedActor = + if (instance.isInstanceOf[TypedActor]) instance.asInstanceOf[TypedActor] + else throw new IllegalArgumentException("Actor [" + instance.getClass.getName + "] is not a sub class of 'TypedActor'") + typedActor.preStart + typedActor + } + private[akka] def isOneWay(joinPoint: JoinPoint): Boolean = isOneWay(joinPoint.getRtti.asInstanceOf[MethodRtti]) diff --git a/akka-typed-actor/src/test/scala/actor/typed-actor/TypedActorSpec.scala b/akka-typed-actor/src/test/scala/actor/typed-actor/TypedActorSpec.scala index 7de0a8f5df..761813d4dc 100644 --- a/akka-typed-actor/src/test/scala/actor/typed-actor/TypedActorSpec.scala +++ b/akka-typed-actor/src/test/scala/actor/typed-actor/TypedActorSpec.scala @@ -11,7 +11,46 @@ import org.scalatest.BeforeAndAfterAll import org.scalatest.junit.JUnitRunner import org.junit.runner.RunWith -import se.scalablesolutions.akka.dispatch.DefaultCompletableFuture; +import se.scalablesolutions.akka.dispatch.DefaultCompletableFuture +import TypedActorSpec._ + + +object TypedActorSpec { + trait MyTypedActor { + def sendOneWay(msg: String) : Unit + def sendRequestReply(msg: String) : String + } + + class MyTypedActorImpl extends TypedActor with MyTypedActor { + self.id = "my-custom-id" + def sendOneWay(msg: String) { + println("got " + msg ) + } + def sendRequestReply(msg: String) : String = { + "got " + msg + } + } + + class MyTypedActorWithConstructorArgsImpl(aString: String, aLong: Long) extends TypedActor with MyTypedActor { + self.id = "my-custom-id" + def sendOneWay(msg: String) { + println("got " + msg + " " + aString + " " + aLong) + } + + def sendRequestReply(msg: String) : String = { + msg + " " + aString + " " + aLong + } + } + + class MyActor extends Actor { + self.id = "my-custom-id" + def receive = { + case msg: String => println("got " + msg) + } + } + +} + @RunWith(classOf[JUnitRunner]) class TypedActorSpec extends @@ -19,7 +58,12 @@ class TypedActorSpec extends ShouldMatchers with BeforeAndAfterAll { + override def afterAll() { + ActorRegistry.shutdownAll + } + describe("TypedActor") { + it("should resolve Future return from method defined to return a Future") { val pojo = TypedActor.newInstance(classOf[SimpleJavaPojo], classOf[SimpleJavaPojoImpl]) val future = pojo.square(10) @@ -27,5 +71,16 @@ class TypedActorSpec extends future.result.isDefined should equal (true) future.result.get should equal (100) } + + it("should accept constructor arguments") { + val pojo1 = TypedActor.newInstance(classOf[MyTypedActor], new MyTypedActorWithConstructorArgsImpl("test", 1L)) + assert(pojo1.sendRequestReply("hello") === "hello test 1") + + val pojo2 = TypedActor.newInstance(classOf[MyTypedActor], new MyTypedActorWithConstructorArgsImpl("test2", 2L), new TypedActorConfiguration()) + assert(pojo2.sendRequestReply("hello") === "hello test2 2") + + val pojo3 = TypedActor.newInstance(classOf[MyTypedActor], new MyTypedActorWithConstructorArgsImpl("test3", 3L), 5000L) + assert(pojo3.sendRequestReply("hello") === "hello test3 3") + } } }