diff --git a/akka-core/src/main/scala/actor/ActorRef.scala b/akka-core/src/main/scala/actor/ActorRef.scala index cb8e7d410a..ca14b5fc87 100644 --- a/akka-core/src/main/scala/actor/ActorRef.scala +++ b/akka-core/src/main/scala/actor/ActorRef.scala @@ -1605,7 +1605,7 @@ trait ScalaActorRef extends ActorRefShared { ref: ActorRef => * If you are sending messages using !!! then you have to use self.reply(..) * to send a reply message to the original sender. If not then the sender will block until the timeout expires. */ - def !!![T](message: Any)(implicit sender: Option[ActorRef] = None): Future[T] = { + def !!![T](message: Any, timeout: Long = this.timeout)(implicit sender: Option[ActorRef] = None): Future[T] = { if (isRunning) postMessageToMailboxAndCreateFutureResultWithTimeout[T](message, timeout, sender, None) else throw new ActorInitializationException( "Actor has not been started, you need to invoke 'actor.start' before using it") diff --git a/akka-core/src/main/scala/actor/TypedActor.scala b/akka-core/src/main/scala/actor/TypedActor.scala index 78b5740344..eec385d921 100644 --- a/akka-core/src/main/scala/actor/TypedActor.scala +++ b/akka-core/src/main/scala/actor/TypedActor.scala @@ -23,16 +23,25 @@ import java.lang.reflect.{InvocationTargetException, Method, Field} import scala.reflect.BeanProperty /** - * FIXME: document TypedActor + * TypedActor is a type-safe actor made out of a POJO with interface. + * Void methods are turned into fire-forget messages. + * Non-void methods are turned into request-reply messages with the exception of methods returning + * a 'Future' which will be sent using request-reply-with-future semantics and need to return the + * result using the 'future(..)' method: 'return future(... future result ...);'. * * Here is an example of usage (in Java): *
- * class PingImpl extends TypedActor implements Ping {
+ * class TestActorImpl extends TypedActor implements TestActor {
+ * 
  *   public void hit(int count) {
  *     Pong pong = (Pong) getContext().getSender();
  *     pong.hit(count++);
  *   }
  *
+ *   public Future square(int x) {
+ *     return future(x * x);
+ *   }
+ * 
  *   @Override
  *   public void init() {
  *     ... // optional initialization on start
@@ -47,23 +56,31 @@ import scala.reflect.BeanProperty
  * }
  *
  * // create the ping actor
- * Ping ping = TypedActor.newInstance(Ping.class, PingImpl.class);
+ * TestActor actor = TypedActor.newInstance(TestActor.class, TestActorImpl.class);
  *
- * ping.hit(1); // use the actor
- * ping.hit(1);
+ * actor.hit(1); // use the actor
+ * actor.hit(1);
  *
+ * // This method will return immediately when called, caller should wait on the Future for the result
+ * Future future = actor.square(10);
+ * future.await();
+ * Integer result = future.get();
+ * 
  * // stop the actor
- * TypedActor.stop(ping);
+ * TypedActor.stop(actor);
  * 
* * Here is an example of usage (in Scala): *
- * class PingImpl extends TypedActor with Ping {
+ * class TestActorImpl extends TypedActor with TestActor {
+ * 
  *   def hit(count: Int) = {
  *     val pong = context.sender.asInstanceOf[Pong]
  *     pong.hit(count += 1)
  *   }
  *
+ *   def square(x: Int): Future[Integer] = future(x * x)
+ * 
  *   override def init = {
  *     ... // optional initialization on start
  *   }
@@ -81,6 +98,11 @@ import scala.reflect.BeanProperty
  * ping.hit(1) // use the actor
  * ping.hit(1)
  *
+ * // This method will return immediately when called, caller should wait on the Future for the result
+ * val future = actor.square(10)
+ * future.await
+ * val result: Int = future.get
+ * 
  * // stop the actor
  * TypedActor.stop(ping)
  * 
@@ -181,10 +203,39 @@ abstract class TypedActor extends Logging { * Is called during initialization. Can be used to initialize transactional state. Will be invoked within a transaction. */ def initTransactionalState {} + + /** + * This method is used to resolve the Future for TypedActor methods that are defined to return a + * {@link se.scalablesolutions.akka.actor.dispatch.Future }. + *

+ * Here is an example: + *

+   *   class MyTypedActorImpl extends TypedActor implements MyTypedActor {
+   *     public Future square(int x) {
+   *       return future(x * x);
+   *    }
+   *  }
+   *
+   *  MyTypedActor actor = TypedActor.actorOf(MyTypedActor.class, MyTypedActorImpl.class);
+   *
+   *  // This method will return immediately when called, caller should wait on the Future for the result
+   *  Future future = actor.square(10);
+   *  future.await();
+   *  Integer result = future.get();
+   * 
+ */ + def future[T](value: T): Future[T] = { + val fut = context.senderFuture + if (fut.isDefined) { + fut.get.completeWithResult(value) + fut.get.asInstanceOf[Future[T]] + } else throw new IllegalActorStateException("No sender future in scope") + } } /** - * FIXME: document TypedTransactor + * Transactional TypedActor. All messages send to this actor as sent in a transaction. If an enclosing transaction + * exists it will be joined, if not then a new transaction will be created. * * @author Jonas Bonér */ @@ -193,8 +244,6 @@ abstract class TypedTransactor extends TypedActor /** * Configuration factory for TypedActors. * - * FIXDOC: document TypedActorConfiguration - * * @author Jonas Bonér */ final class TypedActorConfiguration { @@ -459,8 +508,8 @@ object TypedActor extends Logging { if (parent != null) injectTypedActorContext0(typedActor, parent) else { log.trace("Can't set 'TypedActorContext' for TypedActor [" + - typedActor.getClass.getName + - "] since no field of this type could be found.") + typedActor.getClass.getName + + "] since no field of this type could be found.") None } } @@ -575,34 +624,37 @@ private[akka] sealed class TypedActorAspect { } private def localDispatch(joinPoint: JoinPoint): AnyRef = { - val rtti = joinPoint.getRtti.asInstanceOf[MethodRtti] - val isOneWay = isVoid(rtti) + val method = joinPoint.getRtti.asInstanceOf[MethodRtti] + val isOneWay = isVoid(method) val sender = TypedActorContext.sender.value - val senderFuture = TypedActorContext.senderFuture.value + val priorSenderFuture = TypedActorContext.senderFuture.value if (!actorRef.isRunning && !isStopped) { isStopped = true joinPoint.proceed } else if (isOneWay) { - actorRef ! Invocation(joinPoint, true, true, sender, senderFuture) + actorRef ! Invocation(joinPoint, true, true, sender, priorSenderFuture) null.asInstanceOf[AnyRef] + } else if (returnsFuture_?(method)) { + actorRef !!! (Invocation(joinPoint, false, false, sender, priorSenderFuture), timeout) + } else { - val result = (actorRef !! (Invocation(joinPoint, false, isOneWay, sender, senderFuture), timeout)).as[AnyRef] + val result = (actorRef !! (Invocation(joinPoint, false, false, sender, priorSenderFuture), timeout)).as[AnyRef] if (result.isDefined) result.get - else throw new IllegalActorStateException("No result defined for invocation [" + joinPoint + "]") + else throw new ActorTimeoutException("Invocation to [" + joinPoint + "] timed out.") } } private def remoteDispatch(joinPoint: JoinPoint): AnyRef = { - val rtti = joinPoint.getRtti.asInstanceOf[MethodRtti] - val isOneWay = isVoid(rtti) - val (message: Array[AnyRef], isEscaped) = escapeArguments(rtti.getParameterValues) + val method = joinPoint.getRtti.asInstanceOf[MethodRtti] + val isOneWay = isVoid(method) + val (message: Array[AnyRef], isEscaped) = escapeArguments(method.getParameterValues) val typedActorInfo = TypedActorInfoProtocol.newBuilder .setInterface(interfaceClass.getName) - .setMethod(rtti.getMethod.getName) + .setMethod(method.getMethod.getName) .build val actorInfo = ActorInfoProtocol.newBuilder @@ -614,10 +666,10 @@ private[akka] sealed class TypedActorAspect { .build val requestBuilder = RemoteRequestProtocol.newBuilder - .setId(RemoteRequestProtocolIdFactory.nextId) - .setMessage(MessageSerializer.serialize(message)) - .setActorInfo(actorInfo) - .setIsOneWay(isOneWay) + .setId(RemoteRequestProtocolIdFactory.nextId) + .setMessage(MessageSerializer.serialize(message)) + .setActorInfo(actorInfo) + .setIsOneWay(isOneWay) val id = actorRef.registerSupervisorAsRemoteActor if (id.isDefined) requestBuilder.setSupervisorUuid(id.get) @@ -643,6 +695,8 @@ private[akka] sealed class TypedActorAspect { private def isVoid(rtti: MethodRtti) = rtti.getMethod.getReturnType == java.lang.Void.TYPE + private def returnsFuture_?(rtti: MethodRtti) = rtti.getMethod.getReturnType.isAssignableFrom(classOf[Future[_]]) + private def escapeArguments(args: Array[AnyRef]): Tuple2[Array[AnyRef], Boolean] = { var isEscaped = false val escapedArgs = for (arg <- args) yield { @@ -666,7 +720,8 @@ private[akka] sealed class TypedActorAspect { override def toString: String = synchronized { "Invocation [" + - "\n\t\tmethod = " + joinPoint.getRtti.asInstanceOf[MethodRtti].getMethod.getName + " @ " + joinPoint.getTarget.getClass.getName + + "\n\t\tmethod = " + joinPoint.getRtti.asInstanceOf[MethodRtti].getMethod.getName + " @ " + + joinPoint.getTarget.getClass.getName + "\n\t\tisOneWay = " + isOneWay + "\n\t\tisVoid = " + isVoid + "\n\t\tsender = " + sender + @@ -716,7 +771,7 @@ private[akka] class Dispatcher(transactionalRequired: Boolean) extends Actor { private[actor] def initialize( targetClass: Class[_], targetInstance: TypedActor, proxy: AnyRef, ctx: Option[TypedActorContext]) = { - if (transactionalRequired || isTransactional(targetClass)) self.makeTransactionRequired + if (transactionalRequired || isTransactional(targetClass)) self.makeTransactionRequired self.id = targetClass.getName this.targetClass = targetClass @@ -739,6 +794,7 @@ private[akka] class Dispatcher(transactionalRequired: Boolean) extends Actor { context.foreach { ctx => if (sender ne null) ctx._sender = sender if (senderFuture ne null) ctx._senderFuture = senderFuture + else if (self.senderFuture.isDefined) ctx._senderFuture = self.senderFuture.get } TypedActorContext.sender.value = joinPoint.getThis // set next sender self.senderFuture.foreach(TypedActorContext.senderFuture.value = _) @@ -749,7 +805,7 @@ private[akka] class Dispatcher(transactionalRequired: Boolean) extends Actor { // Jan Kronquist: started work on issue 121 case Link(proxy) => self.link(proxy) case Unlink(proxy) => self.unlink(proxy) - case unexpected => throw new IllegalActorStateException( + case unexpected => throw new IllegalActorStateException( "Unexpected message [" + unexpected + "] sent to [" + this + "]") } @@ -785,22 +841,20 @@ private[akka] class Dispatcher(transactionalRequired: Boolean) extends Actor { var hasMutableArgument = false for (arg <- args.toList) { if (!arg.isInstanceOf[String] && - !arg.isInstanceOf[Byte] && - !arg.isInstanceOf[Int] && - !arg.isInstanceOf[Long] && - !arg.isInstanceOf[Float] && - !arg.isInstanceOf[Double] && - !arg.isInstanceOf[Boolean] && - !arg.isInstanceOf[Char] && - !arg.isInstanceOf[java.lang.Byte] && - !arg.isInstanceOf[java.lang.Integer] && - !arg.isInstanceOf[java.lang.Long] && - !arg.isInstanceOf[java.lang.Float] && - !arg.isInstanceOf[java.lang.Double] && - !arg.isInstanceOf[java.lang.Boolean] && - !arg.isInstanceOf[java.lang.Character]) { - hasMutableArgument = true - } + !arg.isInstanceOf[Byte] && + !arg.isInstanceOf[Int] && + !arg.isInstanceOf[Long] && + !arg.isInstanceOf[Float] && + !arg.isInstanceOf[Double] && + !arg.isInstanceOf[Boolean] && + !arg.isInstanceOf[Char] && + !arg.isInstanceOf[java.lang.Byte] && + !arg.isInstanceOf[java.lang.Integer] && + !arg.isInstanceOf[java.lang.Long] && + !arg.isInstanceOf[java.lang.Float] && + !arg.isInstanceOf[java.lang.Double] && + !arg.isInstanceOf[java.lang.Boolean] && + !arg.isInstanceOf[java.lang.Character]) hasMutableArgument = true if (arg.getClass.getName.contains(TypedActor.AW_PROXY_PREFIX)) unserializable = true } if (!unserializable && hasMutableArgument) { diff --git a/akka-core/src/test/java/se/scalablesolutions/akka/actor/SimpleJavaPojo.java b/akka-core/src/test/java/se/scalablesolutions/akka/actor/SimpleJavaPojo.java index d7ab60b676..ae63299547 100644 --- a/akka-core/src/test/java/se/scalablesolutions/akka/actor/SimpleJavaPojo.java +++ b/akka-core/src/test/java/se/scalablesolutions/akka/actor/SimpleJavaPojo.java @@ -1,10 +1,13 @@ package se.scalablesolutions.akka.actor; +import se.scalablesolutions.akka.dispatch.Future; import se.scalablesolutions.akka.dispatch.CompletableFuture; +import se.scalablesolutions.akka.dispatch.Future; public interface SimpleJavaPojo { public Object getSender(); public CompletableFuture getSenderFuture(); + public Future square(int value); public void setName(String name); public String getName(); public void throwException(); diff --git a/akka-core/src/test/java/se/scalablesolutions/akka/actor/SimpleJavaPojoImpl.java b/akka-core/src/test/java/se/scalablesolutions/akka/actor/SimpleJavaPojoImpl.java index 3b3508c0ab..4218aa0325 100644 --- a/akka-core/src/test/java/se/scalablesolutions/akka/actor/SimpleJavaPojoImpl.java +++ b/akka-core/src/test/java/se/scalablesolutions/akka/actor/SimpleJavaPojoImpl.java @@ -1,6 +1,7 @@ package se.scalablesolutions.akka.actor; import se.scalablesolutions.akka.actor.*; +import se.scalablesolutions.akka.dispatch.Future; import se.scalablesolutions.akka.dispatch.CompletableFuture; public class SimpleJavaPojoImpl extends TypedActor implements SimpleJavaPojo { @@ -16,6 +17,11 @@ public class SimpleJavaPojoImpl extends TypedActor implements SimpleJavaPojo { private String name; + public Future square(int value) { + System.out.println("------------------------ SQUARE"); + return future(value * value); + } + public Object getSender() { return getContext().getSender(); } diff --git a/akka-core/src/test/scala/actor/typed-actor/TypedActorContextSpec.scala b/akka-core/src/test/scala/actor/typed-actor/TypedActorContextSpec.scala index 11719650d6..a4db260a4e 100644 --- a/akka-core/src/test/scala/actor/typed-actor/TypedActorContextSpec.scala +++ b/akka-core/src/test/scala/actor/typed-actor/TypedActorContextSpec.scala @@ -24,22 +24,14 @@ class TypedActorContextSpec extends val pojo = TypedActor.newInstance(classOf[SimpleJavaPojo], classOf[SimpleJavaPojoImpl]) val pojoCaller = TypedActor.newInstance(classOf[SimpleJavaPojoCaller], classOf[SimpleJavaPojoCallerImpl]) pojoCaller.setPojo(pojo) - try { - pojoCaller.getSenderFromSimpleJavaPojo should equal (pojoCaller) - } catch { - case e => fail("no sender available") - } + pojoCaller.getSenderFromSimpleJavaPojo should equal (pojoCaller) } it("context.senderFuture should return the senderFuture TypedActor reference") { val pojo = TypedActor.newInstance(classOf[SimpleJavaPojo], classOf[SimpleJavaPojoImpl]) val pojoCaller = TypedActor.newInstance(classOf[SimpleJavaPojoCaller], classOf[SimpleJavaPojoCallerImpl]) pojoCaller.setPojo(pojo) - try { - pojoCaller.getSenderFutureFromSimpleJavaPojo.getClass.getName should equal (classOf[DefaultCompletableFuture[_]].getName) - } catch { - case e => fail("no sender future available", e) - } + pojoCaller.getSenderFutureFromSimpleJavaPojo.getClass.getName should equal (classOf[DefaultCompletableFuture[_]].getName) } } } diff --git a/akka-core/src/test/scala/actor/typed-actor/TypedActorSpec.scala b/akka-core/src/test/scala/actor/typed-actor/TypedActorSpec.scala new file mode 100644 index 0000000000..7de0a8f5df --- /dev/null +++ b/akka-core/src/test/scala/actor/typed-actor/TypedActorSpec.scala @@ -0,0 +1,31 @@ +/** + * Copyright (C) 2009-2010 Scalable Solutions AB + */ + +package se.scalablesolutions.akka.actor + +import org.scalatest.Spec +import org.scalatest.Assertions +import org.scalatest.matchers.ShouldMatchers +import org.scalatest.BeforeAndAfterAll +import org.scalatest.junit.JUnitRunner +import org.junit.runner.RunWith + +import se.scalablesolutions.akka.dispatch.DefaultCompletableFuture; + +@RunWith(classOf[JUnitRunner]) +class TypedActorSpec extends + Spec with + ShouldMatchers with + BeforeAndAfterAll { + + describe("TypedActor") { + it("should resolve Future return from method defined to return a Future") { + val pojo = TypedActor.newInstance(classOf[SimpleJavaPojo], classOf[SimpleJavaPojoImpl]) + val future = pojo.square(10) + future.await + future.result.isDefined should equal (true) + future.result.get should equal (100) + } + } +}