Merge branch 'ticket440'

This commit is contained in:
Michael Kober 2010-09-29 20:39:45 +02:00
commit 5aaecc4897
2 changed files with 254 additions and 31 deletions

View file

@ -309,6 +309,31 @@ final class TypedActorContext(private val actorRef: ActorRef) {
def getSenderFuture = senderFuture
}
object TypedActorConfiguration {
def apply() : TypedActorConfiguration = {
new TypedActorConfiguration()
}
def apply(timeout: Long) : TypedActorConfiguration = {
new TypedActorConfiguration().timeout(Duration(timeout, "millis"))
}
def apply(host: String, port: Int) : TypedActorConfiguration = {
new TypedActorConfiguration().makeRemote(host, port)
}
def apply(host: String, port: Int, timeout: Long) : TypedActorConfiguration = {
new TypedActorConfiguration().makeRemote(host, port).timeout(Duration(timeout, "millis"))
}
def apply(transactionRequired: Boolean) : TypedActorConfiguration = {
if (transactionRequired) {
new TypedActorConfiguration().makeTransactionRequired
} else new TypedActorConfiguration()
}
}
/**
* Configuration factory for TypedActors.
*
@ -332,8 +357,10 @@ final class TypedActorConfiguration {
this
}
def makeRemote(hostname: String, port: Int) : TypedActorConfiguration = {
_host = Some(new InetSocketAddress(hostname, port))
def makeRemote(hostname: String, port: Int): TypedActorConfiguration = makeRemote(new InetSocketAddress(hostname, port))
def makeRemote(remoteAddress: InetSocketAddress): TypedActorConfiguration = {
_host = Some(remoteAddress)
this
}
@ -352,6 +379,15 @@ final class TypedActorConfiguration {
}
}
/**
* Factory closure for an TypedActor, to be used with 'TypedActor.newInstance(interface, factory)'.
*
* @author michaelkober
*/
trait TypedActorFactory {
def create: TypedActor
}
/**
* Factory class for creating TypedActors out of plain POJOs and/or POJOs with interfaces.
*
@ -366,24 +402,125 @@ object TypedActor extends Logging {
val AKKA_CAMEL_ROUTING_SCHEME = "akka".intern
private[actor] val AW_PROXY_PREFIX = "$$ProxiedByAW".intern
/**
* Factory method for typed actor.
* @param intfClass interface the typed actor implements
* @param targetClass implementation class of the typed actor
*/
def newInstance[T](intfClass: Class[T], targetClass: Class[_]): T = {
newInstance(intfClass, targetClass, None, Actor.TIMEOUT)
newInstance(intfClass, targetClass, TypedActorConfiguration())
}
/**
* Factory method for typed actor.
* @param intfClass interface the typed actor implements
* @param factory factory method that constructs the typed actor
*/
def newInstance[T](intfClass: Class[T], factory: => AnyRef): T = {
newInstance(intfClass, factory, TypedActorConfiguration())
}
/**
* Factory method for remote typed actor.
* @param intfClass interface the typed actor implements
* @param targetClass implementation class of the typed actor
* @param host hostanme of the remote server
* @param port port of the remote server
*/
def newRemoteInstance[T](intfClass: Class[T], targetClass: Class[_], hostname: String, port: Int): T = {
newInstance(intfClass, targetClass, Some(new InetSocketAddress(hostname, port)), Actor.TIMEOUT)
newInstance(intfClass, targetClass, TypedActorConfiguration(hostname, port))
}
def newInstance[T](intfClass: Class[T], targetClass: Class[_], timeout: Long = Actor.TIMEOUT): T = {
newInstance(intfClass, targetClass, None, timeout)
/**
* Factory method for remote typed actor.
* @param intfClass interface the typed actor implements
* @param factory factory method that constructs the typed actor
* @param host hostanme of the remote server
* @param port port of the remote server
*/
def newRemoteInstance[T](intfClass: Class[T], factory: => AnyRef, hostname: String, port: Int): T = {
newInstance(intfClass, factory, TypedActorConfiguration(hostname, port))
}
def newRemoteInstance[T](intfClass: Class[T], targetClass: Class[_], timeout: Long = Actor.TIMEOUT, hostname: String, port: Int): T = {
newInstance(intfClass, targetClass, Some(new InetSocketAddress(hostname, port)), timeout)
/**
* Factory method for typed actor.
* @param intfClass interface the typed actor implements
* @param targetClass implementation class of the typed actor
* @param timeout timeout for future
*/
def newInstance[T](intfClass: Class[T], targetClass: Class[_], timeout: Long) : T = {
newInstance(intfClass, targetClass, TypedActorConfiguration(timeout))
}
/**
* Factory method for typed actor.
* @param intfClass interface the typed actor implements
* @param factory factory method that constructs the typed actor
* @param timeout timeout for future
*/
def newInstance[T](intfClass: Class[T], factory: => AnyRef, timeout: Long) : T = {
newInstance(intfClass, factory, TypedActorConfiguration(timeout))
}
/**
* Factory method for remote typed actor.
* @param intfClass interface the typed actor implements
* @param targetClass implementation class of the typed actor
* @paramm timeout timeout for future
* @param host hostanme of the remote server
* @param port port of the remote server
*/
def newRemoteInstance[T](intfClass: Class[T], targetClass: Class[_], timeout: Long, hostname: String, port: Int): T = {
newInstance(intfClass, targetClass, TypedActorConfiguration(hostname, port, timeout))
}
/**
* Factory method for remote typed actor.
* @param intfClass interface the typed actor implements
* @param factory factory method that constructs the typed actor
* @paramm timeout timeout for future
* @param host hostanme of the remote server
* @param port port of the remote server
*/
def newRemoteInstance[T](intfClass: Class[T], factory: => AnyRef, timeout: Long, hostname: String, port: Int): T = {
newInstance(intfClass, factory, TypedActorConfiguration(hostname, port, timeout))
}
/**
* Factory method for typed actor.
* @param intfClass interface the typed actor implements
* @param factory factory method that constructs the typed actor
* @paramm config configuration object fo the typed actor
*/
def newInstance[T](intfClass: Class[T], factory: => AnyRef, config: TypedActorConfiguration): T = {
val actorRef = actorOf(newTypedActor(factory))
newInstance(intfClass, actorRef, config)
}
/**
* Factory method for typed actor.
* @param intfClass interface the typed actor implements
* @param targetClass implementation class of the typed actor
* @paramm config configuration object fo the typed actor
*/
def newInstance[T](intfClass: Class[T], targetClass: Class[_], config: TypedActorConfiguration): T = {
val actorRef = actorOf(newTypedActor(targetClass))
newInstance(intfClass, actorRef, config)
}
private[akka] def newInstance[T](intfClass: Class[T], actorRef: ActorRef): T = {
if (!actorRef.actorInstance.get.isInstanceOf[TypedActor]) throw new IllegalArgumentException("ActorRef is not a ref to a typed actor")
newInstance(intfClass, actorRef, TypedActorConfiguration())
}
private[akka] def newInstance[T](intfClass: Class[T], targetClass: Class[_],
remoteAddress: Option[InetSocketAddress], timeout: Long): T = {
val config = TypedActorConfiguration(timeout)
if (remoteAddress.isDefined) config.makeRemote(remoteAddress.get)
newInstance(intfClass, targetClass, config)
}
private def newInstance[T](intfClass: Class[T], actorRef: ActorRef, config: TypedActorConfiguration) : T = {
val typedActor = actorRef.actorInstance.get.asInstanceOf[TypedActor]
val proxy = Proxy.newInstance(Array(intfClass), Array(typedActor), true, false)
typedActor.initialize(proxy)
@ -391,33 +528,55 @@ object TypedActor extends Logging {
if (config._threadBasedDispatcher.isDefined) actorRef.dispatcher = Dispatchers.newThreadBasedDispatcher(actorRef)
if (config._host.isDefined) actorRef.makeRemote(config._host.get)
actorRef.timeout = config.timeout
AspectInitRegistry.register(proxy, AspectInit(intfClass, typedActor, actorRef, config._host, config.timeout))
actorRef.start
proxy.asInstanceOf[T]
}
private[akka] def newInstance[T](intfClass: Class[T], actorRef: ActorRef): T = {
if (!actorRef.actorInstance.get.isInstanceOf[TypedActor]) throw new IllegalArgumentException("ActorRef is not a ref to a typed actor")
val typedActor = actorRef.actorInstance.get.asInstanceOf[TypedActor]
val proxy = Proxy.newInstance(Array(intfClass), Array(typedActor), true, false)
typedActor.initialize(proxy)
AspectInitRegistry.register(proxy, AspectInit(intfClass, typedActor, actorRef, actorRef.remoteAddress, actorRef.timeout))
actorRef.start
proxy.asInstanceOf[T]
}
private[akka] def newInstance[T](intfClass: Class[T], targetClass: Class[_],
remoteAddress: Option[InetSocketAddress], timeout: Long): T = {
val actorRef = actorOf(newTypedActor(targetClass))
val typedActor = actorRef.actorInstance.get.asInstanceOf[TypedActor]
val proxy = Proxy.newInstance(Array(intfClass), Array(typedActor), true, false)
typedActor.initialize(proxy)
actorRef.timeout = timeout
if (remoteAddress.isDefined) actorRef.makeRemote(remoteAddress.get)
AspectInitRegistry.register(proxy, AspectInit(intfClass, typedActor, actorRef, remoteAddress, timeout))
actorRef.start
proxy.asInstanceOf[T]
}
/**
* Java API.
* NOTE: Use this convenience method with care, do NOT make it possible to get a reference to the
* TypedActor instance directly, but only through its 'ActorRef' wrapper reference.
* <p/>
* Creates an ActorRef out of the Actor. Allows you to pass in the instance for the TypedActor.
* Only use this method when you need to pass in constructor arguments into the 'TypedActor'.
* <p/>
* You use it by implementing the TypedActorFactory interface.
* Example in Java:
* <pre>
* MyPojo pojo = TypedActor.newInstance(MyPojo.class, new TypedActorFactory() {
* public TypedActor create() {
* return new MyTypedActor("service:name", 5);
* }
* });
* </pre>
*/
def newInstance[T](intfClass: Class[T], factory: TypedActorFactory) : T =
newInstance(intfClass, factory.create)
/**
* Java API.
*/
def newRemoteInstance[T](intfClass: Class[T], factory: TypedActorFactory, hostname: String, port: Int) : T =
newRemoteInstance(intfClass, factory.create, hostname, port)
/**
* Java API.
*/
def newRemoteInstance[T](intfClass: Class[T], factory: TypedActorFactory, timeout: Long, hostname: String, port: Int) : T =
newRemoteInstance(intfClass, factory.create, timeout, hostname, port)
/**
* Java API.
*/
def newInstance[T](intfClass: Class[T], factory: TypedActorFactory, timeout: Long) : T =
newInstance(intfClass, factory.create, timeout)
/**
* Java API.
*/
def newInstance[T](intfClass: Class[T], factory: TypedActorFactory, config: TypedActorConfiguration): T =
newInstance(intfClass, factory.create, config)
/**
* Create a proxy for a RemoteActorRef representing a server managed remote typed actor.
@ -557,6 +716,15 @@ object TypedActor extends Logging {
typedActor
}
private[akka] def newTypedActor(factory: => AnyRef): TypedActor = {
val instance = factory
val typedActor =
if (instance.isInstanceOf[TypedActor]) instance.asInstanceOf[TypedActor]
else throw new IllegalArgumentException("Actor [" + instance.getClass.getName + "] is not a sub class of 'TypedActor'")
typedActor.preStart
typedActor
}
private[akka] def isOneWay(joinPoint: JoinPoint): Boolean =
isOneWay(joinPoint.getRtti.asInstanceOf[MethodRtti])

View file

@ -11,7 +11,46 @@ import org.scalatest.BeforeAndAfterAll
import org.scalatest.junit.JUnitRunner
import org.junit.runner.RunWith
import se.scalablesolutions.akka.dispatch.DefaultCompletableFuture;
import se.scalablesolutions.akka.dispatch.DefaultCompletableFuture
import TypedActorSpec._
object TypedActorSpec {
trait MyTypedActor {
def sendOneWay(msg: String) : Unit
def sendRequestReply(msg: String) : String
}
class MyTypedActorImpl extends TypedActor with MyTypedActor {
self.id = "my-custom-id"
def sendOneWay(msg: String) {
println("got " + msg )
}
def sendRequestReply(msg: String) : String = {
"got " + msg
}
}
class MyTypedActorWithConstructorArgsImpl(aString: String, aLong: Long) extends TypedActor with MyTypedActor {
self.id = "my-custom-id"
def sendOneWay(msg: String) {
println("got " + msg + " " + aString + " " + aLong)
}
def sendRequestReply(msg: String) : String = {
msg + " " + aString + " " + aLong
}
}
class MyActor extends Actor {
self.id = "my-custom-id"
def receive = {
case msg: String => println("got " + msg)
}
}
}
@RunWith(classOf[JUnitRunner])
class TypedActorSpec extends
@ -19,7 +58,12 @@ class TypedActorSpec extends
ShouldMatchers with
BeforeAndAfterAll {
override def afterAll() {
ActorRegistry.shutdownAll
}
describe("TypedActor") {
it("should resolve Future return from method defined to return a Future") {
val pojo = TypedActor.newInstance(classOf[SimpleJavaPojo], classOf[SimpleJavaPojoImpl])
val future = pojo.square(10)
@ -27,5 +71,16 @@ class TypedActorSpec extends
future.result.isDefined should equal (true)
future.result.get should equal (100)
}
it("should accept constructor arguments") {
val pojo1 = TypedActor.newInstance(classOf[MyTypedActor], new MyTypedActorWithConstructorArgsImpl("test", 1L))
assert(pojo1.sendRequestReply("hello") === "hello test 1")
val pojo2 = TypedActor.newInstance(classOf[MyTypedActor], new MyTypedActorWithConstructorArgsImpl("test2", 2L), new TypedActorConfiguration())
assert(pojo2.sendRequestReply("hello") === "hello test2 2")
val pojo3 = TypedActor.newInstance(classOf[MyTypedActor], new MyTypedActorWithConstructorArgsImpl("test3", 3L), 5000L)
assert(pojo3.sendRequestReply("hello") === "hello test3 3")
}
}
}