Return Future from TypedActor message send

This commit is contained in:
Jonas Boner 2010-08-16 21:55:00 +02:00
parent 19ac69fda4
commit e53d2200e3
6 changed files with 142 additions and 56 deletions

View file

@ -1605,7 +1605,7 @@ trait ScalaActorRef extends ActorRefShared { ref: ActorRef =>
* If you are sending messages using <code>!!!</code> then you <b>have to</b> use <code>self.reply(..)</code>
* to send a reply message to the original sender. If not then the sender will block until the timeout expires.
*/
def !!![T](message: Any)(implicit sender: Option[ActorRef] = None): Future[T] = {
def !!![T](message: Any, timeout: Long = this.timeout)(implicit sender: Option[ActorRef] = None): Future[T] = {
if (isRunning) postMessageToMailboxAndCreateFutureResultWithTimeout[T](message, timeout, sender, None)
else throw new ActorInitializationException(
"Actor has not been started, you need to invoke 'actor.start' before using it")

View file

@ -23,16 +23,25 @@ import java.lang.reflect.{InvocationTargetException, Method, Field}
import scala.reflect.BeanProperty
/**
* FIXME: document TypedActor
* TypedActor is a type-safe actor made out of a POJO with interface.
* Void methods are turned into fire-forget messages.
* Non-void methods are turned into request-reply messages with the exception of methods returning
* a 'Future' which will be sent using request-reply-with-future semantics and need to return the
* result using the 'future(..)' method: 'return future(... future result ...);'.
*
* Here is an example of usage (in Java):
* <pre>
* class PingImpl extends TypedActor implements Ping {
* class TestActorImpl extends TypedActor implements TestActor {
*
* public void hit(int count) {
* Pong pong = (Pong) getContext().getSender();
* pong.hit(count++);
* }
*
* public Future<Integer> square(int x) {
* return future(x * x);
* }
*
* @Override
* public void init() {
* ... // optional initialization on start
@ -47,23 +56,31 @@ import scala.reflect.BeanProperty
* }
*
* // create the ping actor
* Ping ping = TypedActor.newInstance(Ping.class, PingImpl.class);
* TestActor actor = TypedActor.newInstance(TestActor.class, TestActorImpl.class);
*
* ping.hit(1); // use the actor
* ping.hit(1);
* actor.hit(1); // use the actor
* actor.hit(1);
*
* // This method will return immediately when called, caller should wait on the Future for the result
* Future<Integer> future = actor.square(10);
* future.await();
* Integer result = future.get();
*
* // stop the actor
* TypedActor.stop(ping);
* TypedActor.stop(actor);
* </pre>
*
* Here is an example of usage (in Scala):
* <pre>
* class PingImpl extends TypedActor with Ping {
* class TestActorImpl extends TypedActor with TestActor {
*
* def hit(count: Int) = {
* val pong = context.sender.asInstanceOf[Pong]
* pong.hit(count += 1)
* }
*
* def square(x: Int): Future[Integer] = future(x * x)
*
* override def init = {
* ... // optional initialization on start
* }
@ -81,6 +98,11 @@ import scala.reflect.BeanProperty
* ping.hit(1) // use the actor
* ping.hit(1)
*
* // This method will return immediately when called, caller should wait on the Future for the result
* val future = actor.square(10)
* future.await
* val result: Int = future.get
*
* // stop the actor
* TypedActor.stop(ping)
* </pre>
@ -181,10 +203,39 @@ abstract class TypedActor extends Logging {
* Is called during initialization. Can be used to initialize transactional state. Will be invoked within a transaction.
*/
def initTransactionalState {}
/**
* This method is used to resolve the Future for TypedActor methods that are defined to return a
* {@link se.scalablesolutions.akka.actor.dispatch.Future }.
* <p/>
* Here is an example:
* <pre>
* class MyTypedActorImpl extends TypedActor implements MyTypedActor {
* public Future<Integer> square(int x) {
* return future(x * x);
* }
* }
*
* MyTypedActor actor = TypedActor.actorOf(MyTypedActor.class, MyTypedActorImpl.class);
*
* // This method will return immediately when called, caller should wait on the Future for the result
* Future<Integer> future = actor.square(10);
* future.await();
* Integer result = future.get();
* </pre>
*/
def future[T](value: T): Future[T] = {
val fut = context.senderFuture
if (fut.isDefined) {
fut.get.completeWithResult(value)
fut.get.asInstanceOf[Future[T]]
} else throw new IllegalActorStateException("No sender future in scope")
}
}
/**
* FIXME: document TypedTransactor
* Transactional TypedActor. All messages send to this actor as sent in a transaction. If an enclosing transaction
* exists it will be joined, if not then a new transaction will be created.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
@ -193,8 +244,6 @@ abstract class TypedTransactor extends TypedActor
/**
* Configuration factory for TypedActors.
*
* FIXDOC: document TypedActorConfiguration
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
final class TypedActorConfiguration {
@ -459,8 +508,8 @@ object TypedActor extends Logging {
if (parent != null) injectTypedActorContext0(typedActor, parent)
else {
log.trace("Can't set 'TypedActorContext' for TypedActor [" +
typedActor.getClass.getName +
"] since no field of this type could be found.")
typedActor.getClass.getName +
"] since no field of this type could be found.")
None
}
}
@ -575,34 +624,37 @@ private[akka] sealed class TypedActorAspect {
}
private def localDispatch(joinPoint: JoinPoint): AnyRef = {
val rtti = joinPoint.getRtti.asInstanceOf[MethodRtti]
val isOneWay = isVoid(rtti)
val method = joinPoint.getRtti.asInstanceOf[MethodRtti]
val isOneWay = isVoid(method)
val sender = TypedActorContext.sender.value
val senderFuture = TypedActorContext.senderFuture.value
val priorSenderFuture = TypedActorContext.senderFuture.value
if (!actorRef.isRunning && !isStopped) {
isStopped = true
joinPoint.proceed
} else if (isOneWay) {
actorRef ! Invocation(joinPoint, true, true, sender, senderFuture)
actorRef ! Invocation(joinPoint, true, true, sender, priorSenderFuture)
null.asInstanceOf[AnyRef]
} else if (returnsFuture_?(method)) {
actorRef !!! (Invocation(joinPoint, false, false, sender, priorSenderFuture), timeout)
} else {
val result = (actorRef !! (Invocation(joinPoint, false, isOneWay, sender, senderFuture), timeout)).as[AnyRef]
val result = (actorRef !! (Invocation(joinPoint, false, false, sender, priorSenderFuture), timeout)).as[AnyRef]
if (result.isDefined) result.get
else throw new IllegalActorStateException("No result defined for invocation [" + joinPoint + "]")
else throw new ActorTimeoutException("Invocation to [" + joinPoint + "] timed out.")
}
}
private def remoteDispatch(joinPoint: JoinPoint): AnyRef = {
val rtti = joinPoint.getRtti.asInstanceOf[MethodRtti]
val isOneWay = isVoid(rtti)
val (message: Array[AnyRef], isEscaped) = escapeArguments(rtti.getParameterValues)
val method = joinPoint.getRtti.asInstanceOf[MethodRtti]
val isOneWay = isVoid(method)
val (message: Array[AnyRef], isEscaped) = escapeArguments(method.getParameterValues)
val typedActorInfo = TypedActorInfoProtocol.newBuilder
.setInterface(interfaceClass.getName)
.setMethod(rtti.getMethod.getName)
.setMethod(method.getMethod.getName)
.build
val actorInfo = ActorInfoProtocol.newBuilder
@ -614,10 +666,10 @@ private[akka] sealed class TypedActorAspect {
.build
val requestBuilder = RemoteRequestProtocol.newBuilder
.setId(RemoteRequestProtocolIdFactory.nextId)
.setMessage(MessageSerializer.serialize(message))
.setActorInfo(actorInfo)
.setIsOneWay(isOneWay)
.setId(RemoteRequestProtocolIdFactory.nextId)
.setMessage(MessageSerializer.serialize(message))
.setActorInfo(actorInfo)
.setIsOneWay(isOneWay)
val id = actorRef.registerSupervisorAsRemoteActor
if (id.isDefined) requestBuilder.setSupervisorUuid(id.get)
@ -643,6 +695,8 @@ private[akka] sealed class TypedActorAspect {
private def isVoid(rtti: MethodRtti) = rtti.getMethod.getReturnType == java.lang.Void.TYPE
private def returnsFuture_?(rtti: MethodRtti) = rtti.getMethod.getReturnType.isAssignableFrom(classOf[Future[_]])
private def escapeArguments(args: Array[AnyRef]): Tuple2[Array[AnyRef], Boolean] = {
var isEscaped = false
val escapedArgs = for (arg <- args) yield {
@ -666,7 +720,8 @@ private[akka] sealed class TypedActorAspect {
override def toString: String = synchronized {
"Invocation [" +
"\n\t\tmethod = " + joinPoint.getRtti.asInstanceOf[MethodRtti].getMethod.getName + " @ " + joinPoint.getTarget.getClass.getName +
"\n\t\tmethod = " + joinPoint.getRtti.asInstanceOf[MethodRtti].getMethod.getName + " @ " +
joinPoint.getTarget.getClass.getName +
"\n\t\tisOneWay = " + isOneWay +
"\n\t\tisVoid = " + isVoid +
"\n\t\tsender = " + sender +
@ -716,7 +771,7 @@ private[akka] class Dispatcher(transactionalRequired: Boolean) extends Actor {
private[actor] def initialize(
targetClass: Class[_], targetInstance: TypedActor, proxy: AnyRef, ctx: Option[TypedActorContext]) = {
if (transactionalRequired || isTransactional(targetClass)) self.makeTransactionRequired
if (transactionalRequired || isTransactional(targetClass)) self.makeTransactionRequired
self.id = targetClass.getName
this.targetClass = targetClass
@ -739,6 +794,7 @@ private[akka] class Dispatcher(transactionalRequired: Boolean) extends Actor {
context.foreach { ctx =>
if (sender ne null) ctx._sender = sender
if (senderFuture ne null) ctx._senderFuture = senderFuture
else if (self.senderFuture.isDefined) ctx._senderFuture = self.senderFuture.get
}
TypedActorContext.sender.value = joinPoint.getThis // set next sender
self.senderFuture.foreach(TypedActorContext.senderFuture.value = _)
@ -749,7 +805,7 @@ private[akka] class Dispatcher(transactionalRequired: Boolean) extends Actor {
// Jan Kronquist: started work on issue 121
case Link(proxy) => self.link(proxy)
case Unlink(proxy) => self.unlink(proxy)
case unexpected => throw new IllegalActorStateException(
case unexpected => throw new IllegalActorStateException(
"Unexpected message [" + unexpected + "] sent to [" + this + "]")
}
@ -785,22 +841,20 @@ private[akka] class Dispatcher(transactionalRequired: Boolean) extends Actor {
var hasMutableArgument = false
for (arg <- args.toList) {
if (!arg.isInstanceOf[String] &&
!arg.isInstanceOf[Byte] &&
!arg.isInstanceOf[Int] &&
!arg.isInstanceOf[Long] &&
!arg.isInstanceOf[Float] &&
!arg.isInstanceOf[Double] &&
!arg.isInstanceOf[Boolean] &&
!arg.isInstanceOf[Char] &&
!arg.isInstanceOf[java.lang.Byte] &&
!arg.isInstanceOf[java.lang.Integer] &&
!arg.isInstanceOf[java.lang.Long] &&
!arg.isInstanceOf[java.lang.Float] &&
!arg.isInstanceOf[java.lang.Double] &&
!arg.isInstanceOf[java.lang.Boolean] &&
!arg.isInstanceOf[java.lang.Character]) {
hasMutableArgument = true
}
!arg.isInstanceOf[Byte] &&
!arg.isInstanceOf[Int] &&
!arg.isInstanceOf[Long] &&
!arg.isInstanceOf[Float] &&
!arg.isInstanceOf[Double] &&
!arg.isInstanceOf[Boolean] &&
!arg.isInstanceOf[Char] &&
!arg.isInstanceOf[java.lang.Byte] &&
!arg.isInstanceOf[java.lang.Integer] &&
!arg.isInstanceOf[java.lang.Long] &&
!arg.isInstanceOf[java.lang.Float] &&
!arg.isInstanceOf[java.lang.Double] &&
!arg.isInstanceOf[java.lang.Boolean] &&
!arg.isInstanceOf[java.lang.Character]) hasMutableArgument = true
if (arg.getClass.getName.contains(TypedActor.AW_PROXY_PREFIX)) unserializable = true
}
if (!unserializable && hasMutableArgument) {

View file

@ -1,10 +1,13 @@
package se.scalablesolutions.akka.actor;
import se.scalablesolutions.akka.dispatch.Future;
import se.scalablesolutions.akka.dispatch.CompletableFuture;
import se.scalablesolutions.akka.dispatch.Future;
public interface SimpleJavaPojo {
public Object getSender();
public CompletableFuture<Object> getSenderFuture();
public Future<Integer> square(int value);
public void setName(String name);
public String getName();
public void throwException();

View file

@ -1,6 +1,7 @@
package se.scalablesolutions.akka.actor;
import se.scalablesolutions.akka.actor.*;
import se.scalablesolutions.akka.dispatch.Future;
import se.scalablesolutions.akka.dispatch.CompletableFuture;
public class SimpleJavaPojoImpl extends TypedActor implements SimpleJavaPojo {
@ -16,6 +17,11 @@ public class SimpleJavaPojoImpl extends TypedActor implements SimpleJavaPojo {
private String name;
public Future<Integer> square(int value) {
System.out.println("------------------------ SQUARE");
return future(value * value);
}
public Object getSender() {
return getContext().getSender();
}

View file

@ -24,22 +24,14 @@ class TypedActorContextSpec extends
val pojo = TypedActor.newInstance(classOf[SimpleJavaPojo], classOf[SimpleJavaPojoImpl])
val pojoCaller = TypedActor.newInstance(classOf[SimpleJavaPojoCaller], classOf[SimpleJavaPojoCallerImpl])
pojoCaller.setPojo(pojo)
try {
pojoCaller.getSenderFromSimpleJavaPojo should equal (pojoCaller)
} catch {
case e => fail("no sender available")
}
pojoCaller.getSenderFromSimpleJavaPojo should equal (pojoCaller)
}
it("context.senderFuture should return the senderFuture TypedActor reference") {
val pojo = TypedActor.newInstance(classOf[SimpleJavaPojo], classOf[SimpleJavaPojoImpl])
val pojoCaller = TypedActor.newInstance(classOf[SimpleJavaPojoCaller], classOf[SimpleJavaPojoCallerImpl])
pojoCaller.setPojo(pojo)
try {
pojoCaller.getSenderFutureFromSimpleJavaPojo.getClass.getName should equal (classOf[DefaultCompletableFuture[_]].getName)
} catch {
case e => fail("no sender future available", e)
}
pojoCaller.getSenderFutureFromSimpleJavaPojo.getClass.getName should equal (classOf[DefaultCompletableFuture[_]].getName)
}
}
}

View file

@ -0,0 +1,31 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.actor
import org.scalatest.Spec
import org.scalatest.Assertions
import org.scalatest.matchers.ShouldMatchers
import org.scalatest.BeforeAndAfterAll
import org.scalatest.junit.JUnitRunner
import org.junit.runner.RunWith
import se.scalablesolutions.akka.dispatch.DefaultCompletableFuture;
@RunWith(classOf[JUnitRunner])
class TypedActorSpec extends
Spec with
ShouldMatchers with
BeforeAndAfterAll {
describe("TypedActor") {
it("should resolve Future return from method defined to return a Future") {
val pojo = TypedActor.newInstance(classOf[SimpleJavaPojo], classOf[SimpleJavaPojoImpl])
val future = pojo.square(10)
future.await
future.result.isDefined should equal (true)
future.result.get should equal (100)
}
}
}