From 10ecd8541c4b7213ca3db015f4e6cfc9a1f10fa5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Bone=CC=81r?= Date: Wed, 6 Apr 2011 12:33:19 +0200 Subject: [PATCH] Turned 'sendRequestReplyFuture(..): Future[_]' into 'sendRequestReplyFuture[T <: AnyRef](..): Future[T] --- .../src/main/scala/akka/actor/ActorRef.scala | 65 ++++++++++--------- .../java/akka/tutorial/java/second/Pi.java | 20 +++--- 2 files changed, 43 insertions(+), 42 deletions(-) diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index 9358d8ea40..b15fdecf24 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -32,19 +32,20 @@ private[akka] object ActorRefInternals { } /** - * Abstraction for unification of sender and senderFuture for later reply + * Abstraction for unification of sender and senderFuture for later reply. + * Can be stored away and used at a later point in time. */ abstract class Channel[T] { /** - * Sends the specified message to the channel - * Scala API + * Scala API.

+ * Sends the specified message to the channel. */ def !(msg: T): Unit /** - * Sends the specified message to the channel - * Java API + * Java API.

+ * Sends the specified message to the channel. */ def sendOneWay(msg: T): Unit = this.!(msg) } @@ -125,7 +126,7 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] { scal var receiveTimeout: Option[Long] = None /** - * Akka Java API + * Akka Java API.

* Defines the default timeout for an initial receive invocation. * When specified, the receive function should be able to handle a 'ReceiveTimeout' message. */ @@ -133,7 +134,7 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] { scal def getReceiveTimeout(): Option[Long] = receiveTimeout /** - * Akka Java API + * Akka Java API.

* A faultHandler defines what should be done when a linked actor signals an error. *

* Can be one of: @@ -150,7 +151,7 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] { scal /** - * Akka Java API + * Akka Java API.

* A lifeCycle defines whether the actor will be stopped on error (Temporary) or if it can be restarted (Permanent) *

* Can be one of: @@ -168,7 +169,7 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] { scal def getLifeCycle(): LifeCycle /** - * Akka Java API + * Akka Java API.

* The default dispatcher is the Dispatchers.globalExecutorBasedEventDrivenDispatcher. * This means that all actors will share the same event-driven executor based dispatcher. *

@@ -188,7 +189,7 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] { scal def homeAddress: Option[InetSocketAddress] /** - * Java API + * Java API.

*/ def getHomeAddress(): InetSocketAddress = homeAddress getOrElse null @@ -216,14 +217,14 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] { scal def uuid = _uuid /** - * Akka Java API + * Akka Java API.

* The reference sender Actor of the last received message. * Is defined if the message was sent from another Actor, else None. */ def getSender(): Option[ActorRef] = sender /** - * Akka Java API + * Akka Java API.

* The reference sender future of the last received message. * Is defined if the message was sent with sent with '!!' or '!!!', else None. */ @@ -263,7 +264,7 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] { scal protected[akka] def uuid_=(uid: Uuid) = _uuid = uid /** - * Akka Java API + * Akka Java API.

* Sends a one-way asynchronous message. E.g. fire-and-forget semantics. *

*

@@ -274,7 +275,7 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] { scal
   def sendOneWay(message: AnyRef): Unit = sendOneWay(message, null)
 
   /**
-   * Akka Java API
+   * Akka Java API. 

* Sends a one-way asynchronous message. E.g. fire-and-forget semantics. *

* Allows you to pass along the sender of the messag. @@ -287,21 +288,21 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] { scal def sendOneWay(message: AnyRef, sender: ActorRef): Unit = this.!(message)(Option(sender)) /** - * Akka Java API + * Akka Java API.

* @see sendRequestReply(message: AnyRef, timeout: Long, sender: ActorRef) * Uses the defualt timeout of the Actor (setTimeout()) and omits the sender reference */ def sendRequestReply(message: AnyRef): AnyRef = sendRequestReply(message, timeout, null) /** - * Akka Java API + * Akka Java API.

* @see sendRequestReply(message: AnyRef, timeout: Long, sender: ActorRef) * Uses the defualt timeout of the Actor (setTimeout()) */ def sendRequestReply(message: AnyRef, sender: ActorRef): AnyRef = sendRequestReply(message, timeout, sender) /** - * Akka Java API + * Akka Java API.

* Sends a message asynchronously and waits on a future for a reply message under the hood. *

* It waits on the reply either until it receives it or until the timeout expires @@ -325,21 +326,21 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] { scal } /** - * Akka Java API + * Akka Java API.

* @see sendRequestReplyFuture(message: AnyRef, sender: ActorRef): Future[_] * Uses the Actors default timeout (setTimeout()) and omits the sender */ - def sendRequestReplyFuture[T](message: AnyRef): Future[T] = sendRequestReplyFuture(message, timeout, null).asInstanceOf[Future[T]] + def sendRequestReplyFuture[T <: AnyRef](message: AnyRef): Future[T] = sendRequestReplyFuture(message, timeout, null).asInstanceOf[Future[T]] /** - * Akka Java API + * Akka Java API.

* @see sendRequestReplyFuture(message: AnyRef, sender: ActorRef): Future[_] * Uses the Actors default timeout (setTimeout()) */ - def sendRequestReplyFuture(message: AnyRef, sender: ActorRef): Future[_] = sendRequestReplyFuture(message, timeout, sender) + def sendRequestReplyFuture[T <: AnyRef](message: AnyRef, sender: ActorRef): Future[T] = sendRequestReplyFuture(message, timeout, sender).asInstanceOf[Future[T]] /** - * Akka Java API + * Akka Java API.

* Sends a message asynchronously returns a future holding the eventual reply message. *

* NOTE: @@ -349,10 +350,10 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] { scal * If you are sending messages using sendRequestReplyFuture then you have to use getContext().reply(..) * to send a reply message to the original sender. If not then the sender will block until the timeout expires. */ - def sendRequestReplyFuture(message: AnyRef, timeout: Long, sender: ActorRef): Future[_] = !!!(message, timeout)(Option(sender)) + def sendRequestReplyFuture[T <: AnyRef](message: AnyRef, timeout: Long, sender: ActorRef): Future[T] = !!!(message, timeout)(Option(sender)).asInstanceOf[Future[T]] /** - * Akka Java API + * Akka Java API.

* Forwards the message specified to this actor and preserves the original sender of the message */ def forward(message: AnyRef, sender: ActorRef): Unit = @@ -360,7 +361,7 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] { scal else forward(message)(Some(sender)) /** - * Akka Java API + * Akka Java API.

* Use getContext().replyUnsafe(..) to reply with a message to the original sender of the message currently * being processed. *

@@ -369,7 +370,7 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] { scal def replyUnsafe(message: AnyRef) = reply(message) /** - * Akka Java API + * Akka Java API.

* Use getContext().replySafe(..) to reply with a message to the original sender of the message currently * being processed. *

@@ -383,7 +384,7 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] { scal def actorClass: Class[_ <: Actor] /** - * Akka Java API + * Akka Java API.

* Returns the class for the Actor instance that is managed by the ActorRef. */ def getActorClass(): Class[_ <: Actor] = actorClass @@ -394,7 +395,7 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] { scal def actorClassName: String /** - * Akka Java API + * Akka Java API.

* Returns the class name for the Actor instance that is managed by the ActorRef. */ def getActorClassName(): String = actorClassName @@ -479,7 +480,7 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] { scal def mailboxSize = dispatcher.mailboxSize(this) /** - * Akka Java API + * Akka Java API.

* Returns the mailbox size. */ def getMailboxSize(): Int = mailboxSize @@ -490,7 +491,7 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] { scal def supervisor: Option[ActorRef] /** - * Akka Java API + * Akka Java API.

* Returns the supervisor, if there is one. */ def getSupervisor(): ActorRef = supervisor getOrElse null @@ -502,7 +503,7 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] { scal def linkedActors: JMap[Uuid, ActorRef] /** - * Java API + * Java API.

* Returns an unmodifiable Java Map containing the linked actors, * please note that the backing map is thread-safe but not immutable */ @@ -527,7 +528,7 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] { scal } /** - * Java API. + * Java API.

* Abstraction for unification of sender and senderFuture for later reply */ def getChannel: Channel[Any] = channel diff --git a/akka-tutorials/akka-tutorial-second/src/main/java/akka/tutorial/java/second/Pi.java b/akka-tutorials/akka-tutorial-second/src/main/java/akka/tutorial/java/second/Pi.java index d3192bced5..20eda5613f 100644 --- a/akka-tutorials/akka-tutorial-second/src/main/java/akka/tutorial/java/second/Pi.java +++ b/akka-tutorials/akka-tutorial-second/src/main/java/akka/tutorial/java/second/Pi.java @@ -149,32 +149,32 @@ public class Pi { } }).start(); } - + @Override public void preStart() { become(scatter); } - + // message handler public void onReceive(Object message) { throw new IllegalStateException("Should be gatter or scatter"); } - + private final Procedure scatter = new Procedure() { public void apply(Object msg) { // schedule work for (int arg = 0; arg < nrOfMessages; arg++) { router.sendOneWay(new Work(arg, nrOfElements), getContext()); } - + // TODO would like to use channel instead, wrong docs, channel() not there // getContext().channel() CompletableFuture resultFuture = getContext().getSenderFuture().get(); // Assume the gathering behavior become(gatter(resultFuture)); } - }; - + }; + private Procedure gatter(final CompletableFuture resultFuture) { return new Procedure() { public void apply(Object msg) { @@ -190,7 +190,7 @@ public class Pi { getContext().stop(); } } - }; + }; } @Override @@ -213,13 +213,13 @@ public class Pi { return new Master(nrOfWorkers, nrOfMessages, nrOfElements); } }).start(); - + // start the calculation long start = currentTimeMillis(); // send calculate message long timeout = 60000; - Future replyFuture = (Future) master.sendRequestReplyFuture(new Calculate(), timeout, null); + Future replyFuture = master.sendRequestReplyFuture(new Calculate(), timeout, null); Option result = replyFuture.await().resultOrException(); if (result.isDefined()) { double pi = result.get(); @@ -231,6 +231,6 @@ public class Pi { // EventHandler.error(this, "Pi calculation did not complete within the timeout."); System.out.println("Pi calculation did not complete within the timeout."); } - + } }