closing ticket440, implemented typed actor with constructor args
This commit is contained in:
parent
94344041a0
commit
ddb6d9edf9
2 changed files with 202 additions and 31 deletions
|
|
@ -309,6 +309,31 @@ final class TypedActorContext(private val actorRef: ActorRef) {
|
|||
def getSenderFuture = senderFuture
|
||||
}
|
||||
|
||||
object TypedActorConfiguration {
|
||||
|
||||
def apply() : TypedActorConfiguration = {
|
||||
new TypedActorConfiguration()
|
||||
}
|
||||
|
||||
def apply(timeout: Long) : TypedActorConfiguration = {
|
||||
new TypedActorConfiguration().timeout(Duration(timeout, "millis"))
|
||||
}
|
||||
|
||||
def apply(host: String, port: Int) : TypedActorConfiguration = {
|
||||
new TypedActorConfiguration().makeRemote(host, port)
|
||||
}
|
||||
|
||||
def apply(host: String, port: Int, timeout: Long) : TypedActorConfiguration = {
|
||||
new TypedActorConfiguration().makeRemote(host, port).timeout(Duration(timeout, "millis"))
|
||||
}
|
||||
|
||||
def apply(transactionRequired: Boolean) : TypedActorConfiguration = {
|
||||
if (transactionRequired) {
|
||||
new TypedActorConfiguration().makeTransactionRequired
|
||||
} else new TypedActorConfiguration()
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Configuration factory for TypedActors.
|
||||
*
|
||||
|
|
@ -332,8 +357,10 @@ final class TypedActorConfiguration {
|
|||
this
|
||||
}
|
||||
|
||||
def makeRemote(hostname: String, port: Int) : TypedActorConfiguration = {
|
||||
_host = Some(new InetSocketAddress(hostname, port))
|
||||
def makeRemote(hostname: String, port: Int): TypedActorConfiguration = makeRemote(new InetSocketAddress(hostname, port))
|
||||
|
||||
def makeRemote(remoteAddress: InetSocketAddress): TypedActorConfiguration = {
|
||||
_host = Some(remoteAddress)
|
||||
this
|
||||
}
|
||||
|
||||
|
|
@ -366,24 +393,125 @@ object TypedActor extends Logging {
|
|||
val AKKA_CAMEL_ROUTING_SCHEME = "akka".intern
|
||||
private[actor] val AW_PROXY_PREFIX = "$$ProxiedByAW".intern
|
||||
|
||||
/**
|
||||
* Factory method for typed actor.
|
||||
* @param intfClass interface the typed actor implements
|
||||
* @param targetClass implementation class of the typed actor
|
||||
*/
|
||||
def newInstance[T](intfClass: Class[T], targetClass: Class[_]): T = {
|
||||
newInstance(intfClass, targetClass, None, Actor.TIMEOUT)
|
||||
newInstance(intfClass, targetClass, TypedActorConfiguration())
|
||||
}
|
||||
|
||||
/**
|
||||
* Factory method for typed actor.
|
||||
* @param intfClass interface the typed actor implements
|
||||
* @param factory factory method that constructs the typed actor
|
||||
*/
|
||||
def newInstance[T](intfClass: Class[T], factory: => AnyRef): T = {
|
||||
newInstance(intfClass, factory, TypedActorConfiguration())
|
||||
}
|
||||
|
||||
/**
|
||||
* Factory method for remote typed actor.
|
||||
* @param intfClass interface the typed actor implements
|
||||
* @param targetClass implementation class of the typed actor
|
||||
* @param host hostanme of the remote server
|
||||
* @param port port of the remote server
|
||||
*/
|
||||
def newRemoteInstance[T](intfClass: Class[T], targetClass: Class[_], hostname: String, port: Int): T = {
|
||||
newInstance(intfClass, targetClass, Some(new InetSocketAddress(hostname, port)), Actor.TIMEOUT)
|
||||
newInstance(intfClass, targetClass, TypedActorConfiguration(hostname, port))
|
||||
}
|
||||
|
||||
def newInstance[T](intfClass: Class[T], targetClass: Class[_], timeout: Long = Actor.TIMEOUT): T = {
|
||||
newInstance(intfClass, targetClass, None, timeout)
|
||||
/**
|
||||
* Factory method for remote typed actor.
|
||||
* @param intfClass interface the typed actor implements
|
||||
* @param factory factory method that constructs the typed actor
|
||||
* @param host hostanme of the remote server
|
||||
* @param port port of the remote server
|
||||
*/
|
||||
def newRemoteInstance[T](intfClass: Class[T], factory: => AnyRef, hostname: String, port: Int): T = {
|
||||
newInstance(intfClass, factory, TypedActorConfiguration(hostname, port))
|
||||
}
|
||||
|
||||
def newRemoteInstance[T](intfClass: Class[T], targetClass: Class[_], timeout: Long = Actor.TIMEOUT, hostname: String, port: Int): T = {
|
||||
newInstance(intfClass, targetClass, Some(new InetSocketAddress(hostname, port)), timeout)
|
||||
/**
|
||||
* Factory method for typed actor.
|
||||
* @param intfClass interface the typed actor implements
|
||||
* @param targetClass implementation class of the typed actor
|
||||
* @param timeout timeout for future
|
||||
*/
|
||||
def newInstance[T](intfClass: Class[T], targetClass: Class[_], timeout: Long) : T = {
|
||||
newInstance(intfClass, targetClass, TypedActorConfiguration(timeout))
|
||||
}
|
||||
|
||||
/**
|
||||
* Factory method for typed actor.
|
||||
* @param intfClass interface the typed actor implements
|
||||
* @param factory factory method that constructs the typed actor
|
||||
* @param timeout timeout for future
|
||||
*/
|
||||
def newInstance[T](intfClass: Class[T], factory: => AnyRef, timeout: Long) : T = {
|
||||
newInstance(intfClass, factory, TypedActorConfiguration(timeout))
|
||||
}
|
||||
|
||||
/**
|
||||
* Factory method for remote typed actor.
|
||||
* @param intfClass interface the typed actor implements
|
||||
* @param targetClass implementation class of the typed actor
|
||||
* @paramm timeout timeout for future
|
||||
* @param host hostanme of the remote server
|
||||
* @param port port of the remote server
|
||||
*/
|
||||
def newRemoteInstance[T](intfClass: Class[T], targetClass: Class[_], timeout: Long, hostname: String, port: Int): T = {
|
||||
newInstance(intfClass, targetClass, TypedActorConfiguration(hostname, port, timeout))
|
||||
}
|
||||
|
||||
/**
|
||||
* Factory method for remote typed actor.
|
||||
* @param intfClass interface the typed actor implements
|
||||
* @param factory factory method that constructs the typed actor
|
||||
* @paramm timeout timeout for future
|
||||
* @param host hostanme of the remote server
|
||||
* @param port port of the remote server
|
||||
*/
|
||||
def newRemoteInstance[T](intfClass: Class[T], factory: => AnyRef, timeout: Long, hostname: String, port: Int): T = {
|
||||
newInstance(intfClass, factory, TypedActorConfiguration(hostname, port, timeout))
|
||||
}
|
||||
|
||||
/**
|
||||
* Factory method for typed actor.
|
||||
* @param intfClass interface the typed actor implements
|
||||
* @param factory factory method that constructs the typed actor
|
||||
* @paramm config configuration object fo the typed actor
|
||||
*/
|
||||
def newInstance[T](intfClass: Class[T], factory: => AnyRef, config: TypedActorConfiguration): T = {
|
||||
val actorRef = actorOf(newTypedActor(factory))
|
||||
newInstance(intfClass, actorRef, config)
|
||||
}
|
||||
|
||||
/**
|
||||
* Factory method for typed actor.
|
||||
* @param intfClass interface the typed actor implements
|
||||
* @param targetClass implementation class of the typed actor
|
||||
* @paramm config configuration object fo the typed actor
|
||||
*/
|
||||
def newInstance[T](intfClass: Class[T], targetClass: Class[_], config: TypedActorConfiguration): T = {
|
||||
val actorRef = actorOf(newTypedActor(targetClass))
|
||||
newInstance(intfClass, actorRef, config)
|
||||
}
|
||||
|
||||
private[akka] def newInstance[T](intfClass: Class[T], actorRef: ActorRef): T = {
|
||||
if (!actorRef.actorInstance.get.isInstanceOf[TypedActor]) throw new IllegalArgumentException("ActorRef is not a ref to a typed actor")
|
||||
newInstance(intfClass, actorRef, TypedActorConfiguration())
|
||||
}
|
||||
|
||||
private[akka] def newInstance[T](intfClass: Class[T], targetClass: Class[_],
|
||||
remoteAddress: Option[InetSocketAddress], timeout: Long): T = {
|
||||
val config = TypedActorConfiguration(timeout)
|
||||
if (remoteAddress.isDefined) config.makeRemote(remoteAddress.get)
|
||||
newInstance(intfClass, targetClass, config)
|
||||
}
|
||||
|
||||
private def newInstance[T](intfClass: Class[T], actorRef: ActorRef, config: TypedActorConfiguration) : T = {
|
||||
val typedActor = actorRef.actorInstance.get.asInstanceOf[TypedActor]
|
||||
val proxy = Proxy.newInstance(Array(intfClass), Array(typedActor), true, false)
|
||||
typedActor.initialize(proxy)
|
||||
|
|
@ -391,33 +519,12 @@ object TypedActor extends Logging {
|
|||
if (config._threadBasedDispatcher.isDefined) actorRef.dispatcher = Dispatchers.newThreadBasedDispatcher(actorRef)
|
||||
if (config._host.isDefined) actorRef.makeRemote(config._host.get)
|
||||
actorRef.timeout = config.timeout
|
||||
AspectInitRegistry.register(proxy, AspectInit(intfClass, typedActor, actorRef, config._host, config.timeout))
|
||||
actorRef.start
|
||||
proxy.asInstanceOf[T]
|
||||
}
|
||||
|
||||
private[akka] def newInstance[T](intfClass: Class[T], actorRef: ActorRef): T = {
|
||||
if (!actorRef.actorInstance.get.isInstanceOf[TypedActor]) throw new IllegalArgumentException("ActorRef is not a ref to a typed actor")
|
||||
val typedActor = actorRef.actorInstance.get.asInstanceOf[TypedActor]
|
||||
val proxy = Proxy.newInstance(Array(intfClass), Array(typedActor), true, false)
|
||||
typedActor.initialize(proxy)
|
||||
AspectInitRegistry.register(proxy, AspectInit(intfClass, typedActor, actorRef, actorRef.remoteAddress, actorRef.timeout))
|
||||
actorRef.start
|
||||
proxy.asInstanceOf[T]
|
||||
}
|
||||
|
||||
private[akka] def newInstance[T](intfClass: Class[T], targetClass: Class[_],
|
||||
remoteAddress: Option[InetSocketAddress], timeout: Long): T = {
|
||||
val actorRef = actorOf(newTypedActor(targetClass))
|
||||
val typedActor = actorRef.actorInstance.get.asInstanceOf[TypedActor]
|
||||
val proxy = Proxy.newInstance(Array(intfClass), Array(typedActor), true, false)
|
||||
typedActor.initialize(proxy)
|
||||
actorRef.timeout = timeout
|
||||
if (remoteAddress.isDefined) actorRef.makeRemote(remoteAddress.get)
|
||||
AspectInitRegistry.register(proxy, AspectInit(intfClass, typedActor, actorRef, remoteAddress, timeout))
|
||||
actorRef.start
|
||||
proxy.asInstanceOf[T]
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Create a proxy for a RemoteActorRef representing a server managed remote typed actor.
|
||||
|
|
@ -557,6 +664,15 @@ object TypedActor extends Logging {
|
|||
typedActor
|
||||
}
|
||||
|
||||
private[akka] def newTypedActor(factory: => AnyRef): TypedActor = {
|
||||
val instance = factory
|
||||
val typedActor =
|
||||
if (instance.isInstanceOf[TypedActor]) instance.asInstanceOf[TypedActor]
|
||||
else throw new IllegalArgumentException("Actor [" + instance.getClass.getName + "] is not a sub class of 'TypedActor'")
|
||||
typedActor.preStart
|
||||
typedActor
|
||||
}
|
||||
|
||||
private[akka] def isOneWay(joinPoint: JoinPoint): Boolean =
|
||||
isOneWay(joinPoint.getRtti.asInstanceOf[MethodRtti])
|
||||
|
||||
|
|
|
|||
|
|
@ -11,7 +11,46 @@ import org.scalatest.BeforeAndAfterAll
|
|||
import org.scalatest.junit.JUnitRunner
|
||||
import org.junit.runner.RunWith
|
||||
|
||||
import se.scalablesolutions.akka.dispatch.DefaultCompletableFuture;
|
||||
import se.scalablesolutions.akka.dispatch.DefaultCompletableFuture
|
||||
import TypedActorSpec._
|
||||
|
||||
|
||||
object TypedActorSpec {
|
||||
trait MyTypedActor {
|
||||
def sendOneWay(msg: String) : Unit
|
||||
def sendRequestReply(msg: String) : String
|
||||
}
|
||||
|
||||
class MyTypedActorImpl extends TypedActor with MyTypedActor {
|
||||
self.id = "my-custom-id"
|
||||
def sendOneWay(msg: String) {
|
||||
println("got " + msg )
|
||||
}
|
||||
def sendRequestReply(msg: String) : String = {
|
||||
"got " + msg
|
||||
}
|
||||
}
|
||||
|
||||
class MyTypedActorWithConstructorArgsImpl(aString: String, aLong: Long) extends TypedActor with MyTypedActor {
|
||||
self.id = "my-custom-id"
|
||||
def sendOneWay(msg: String) {
|
||||
println("got " + msg + " " + aString + " " + aLong)
|
||||
}
|
||||
|
||||
def sendRequestReply(msg: String) : String = {
|
||||
msg + " " + aString + " " + aLong
|
||||
}
|
||||
}
|
||||
|
||||
class MyActor extends Actor {
|
||||
self.id = "my-custom-id"
|
||||
def receive = {
|
||||
case msg: String => println("got " + msg)
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
@RunWith(classOf[JUnitRunner])
|
||||
class TypedActorSpec extends
|
||||
|
|
@ -19,7 +58,12 @@ class TypedActorSpec extends
|
|||
ShouldMatchers with
|
||||
BeforeAndAfterAll {
|
||||
|
||||
override def afterAll() {
|
||||
ActorRegistry.shutdownAll
|
||||
}
|
||||
|
||||
describe("TypedActor") {
|
||||
|
||||
it("should resolve Future return from method defined to return a Future") {
|
||||
val pojo = TypedActor.newInstance(classOf[SimpleJavaPojo], classOf[SimpleJavaPojoImpl])
|
||||
val future = pojo.square(10)
|
||||
|
|
@ -27,5 +71,16 @@ class TypedActorSpec extends
|
|||
future.result.isDefined should equal (true)
|
||||
future.result.get should equal (100)
|
||||
}
|
||||
|
||||
it("should accept constructor arguments") {
|
||||
val pojo1 = TypedActor.newInstance(classOf[MyTypedActor], new MyTypedActorWithConstructorArgsImpl("test", 1L))
|
||||
assert(pojo1.sendRequestReply("hello") === "hello test 1")
|
||||
|
||||
val pojo2 = TypedActor.newInstance(classOf[MyTypedActor], new MyTypedActorWithConstructorArgsImpl("test2", 2L), new TypedActorConfiguration())
|
||||
assert(pojo2.sendRequestReply("hello") === "hello test2 2")
|
||||
|
||||
val pojo3 = TypedActor.newInstance(classOf[MyTypedActor], new MyTypedActorWithConstructorArgsImpl("test3", 3L), 5000L)
|
||||
assert(pojo3.sendRequestReply("hello") === "hello test3 3")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue