Turned 'sendRequestReplyFuture(..): Future[_]' into 'sendRequestReplyFuture[T <: AnyRef](..): Future[T]

This commit is contained in:
Jonas Bonér 2011-04-06 12:33:19 +02:00
parent 899144d1d2
commit 10ecd8541c
2 changed files with 43 additions and 42 deletions

View file

@ -32,19 +32,20 @@ private[akka] object ActorRefInternals {
}
/**
* Abstraction for unification of sender and senderFuture for later reply
* Abstraction for unification of sender and senderFuture for later reply.
* Can be stored away and used at a later point in time.
*/
abstract class Channel[T] {
/**
* Sends the specified message to the channel
* Scala API
* Scala API. <p/>
* Sends the specified message to the channel.
*/
def !(msg: T): Unit
/**
* Sends the specified message to the channel
* Java API
* Java API. <p/>
* Sends the specified message to the channel.
*/
def sendOneWay(msg: T): Unit = this.!(msg)
}
@ -125,7 +126,7 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] { scal
var receiveTimeout: Option[Long] = None
/**
* Akka Java API
* Akka Java API. <p/>
* Defines the default timeout for an initial receive invocation.
* When specified, the receive function should be able to handle a 'ReceiveTimeout' message.
*/
@ -133,7 +134,7 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] { scal
def getReceiveTimeout(): Option[Long] = receiveTimeout
/**
* Akka Java API
* Akka Java API. <p/>
* A faultHandler defines what should be done when a linked actor signals an error.
* <p/>
* Can be one of:
@ -150,7 +151,7 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] { scal
/**
* Akka Java API
* Akka Java API. <p/>
* A lifeCycle defines whether the actor will be stopped on error (Temporary) or if it can be restarted (Permanent)
* <p/>
* Can be one of:
@ -168,7 +169,7 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] { scal
def getLifeCycle(): LifeCycle
/**
* Akka Java API
* Akka Java API. <p/>
* The default dispatcher is the <tt>Dispatchers.globalExecutorBasedEventDrivenDispatcher</tt>.
* This means that all actors will share the same event-driven executor based dispatcher.
* <p/>
@ -188,7 +189,7 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] { scal
def homeAddress: Option[InetSocketAddress]
/**
* Java API
* Java API. <p/>
*/
def getHomeAddress(): InetSocketAddress = homeAddress getOrElse null
@ -216,14 +217,14 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] { scal
def uuid = _uuid
/**
* Akka Java API
* Akka Java API. <p/>
* The reference sender Actor of the last received message.
* Is defined if the message was sent from another Actor, else None.
*/
def getSender(): Option[ActorRef] = sender
/**
* Akka Java API
* Akka Java API. <p/>
* The reference sender future of the last received message.
* Is defined if the message was sent with sent with '!!' or '!!!', else None.
*/
@ -263,7 +264,7 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] { scal
protected[akka] def uuid_=(uid: Uuid) = _uuid = uid
/**
* Akka Java API
* Akka Java API. <p/>
* Sends a one-way asynchronous message. E.g. fire-and-forget semantics.
* <p/>
* <pre>
@ -274,7 +275,7 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] { scal
def sendOneWay(message: AnyRef): Unit = sendOneWay(message, null)
/**
* Akka Java API
* Akka Java API. <p/>
* Sends a one-way asynchronous message. E.g. fire-and-forget semantics.
* <p/>
* Allows you to pass along the sender of the messag.
@ -287,21 +288,21 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] { scal
def sendOneWay(message: AnyRef, sender: ActorRef): Unit = this.!(message)(Option(sender))
/**
* Akka Java API
* Akka Java API. <p/>
* @see sendRequestReply(message: AnyRef, timeout: Long, sender: ActorRef)
* Uses the defualt timeout of the Actor (setTimeout()) and omits the sender reference
*/
def sendRequestReply(message: AnyRef): AnyRef = sendRequestReply(message, timeout, null)
/**
* Akka Java API
* Akka Java API. <p/>
* @see sendRequestReply(message: AnyRef, timeout: Long, sender: ActorRef)
* Uses the defualt timeout of the Actor (setTimeout())
*/
def sendRequestReply(message: AnyRef, sender: ActorRef): AnyRef = sendRequestReply(message, timeout, sender)
/**
* Akka Java API
* Akka Java API. <p/>
* Sends a message asynchronously and waits on a future for a reply message under the hood.
* <p/>
* It waits on the reply either until it receives it or until the timeout expires
@ -325,21 +326,21 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] { scal
}
/**
* Akka Java API
* Akka Java API. <p/>
* @see sendRequestReplyFuture(message: AnyRef, sender: ActorRef): Future[_]
* Uses the Actors default timeout (setTimeout()) and omits the sender
*/
def sendRequestReplyFuture[T](message: AnyRef): Future[T] = sendRequestReplyFuture(message, timeout, null).asInstanceOf[Future[T]]
def sendRequestReplyFuture[T <: AnyRef](message: AnyRef): Future[T] = sendRequestReplyFuture(message, timeout, null).asInstanceOf[Future[T]]
/**
* Akka Java API
* Akka Java API. <p/>
* @see sendRequestReplyFuture(message: AnyRef, sender: ActorRef): Future[_]
* Uses the Actors default timeout (setTimeout())
*/
def sendRequestReplyFuture(message: AnyRef, sender: ActorRef): Future[_] = sendRequestReplyFuture(message, timeout, sender)
def sendRequestReplyFuture[T <: AnyRef](message: AnyRef, sender: ActorRef): Future[T] = sendRequestReplyFuture(message, timeout, sender).asInstanceOf[Future[T]]
/**
* Akka Java API
* Akka Java API. <p/>
* Sends a message asynchronously returns a future holding the eventual reply message.
* <p/>
* <b>NOTE:</b>
@ -349,10 +350,10 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] { scal
* If you are sending messages using <code>sendRequestReplyFuture</code> then you <b>have to</b> use <code>getContext().reply(..)</code>
* to send a reply message to the original sender. If not then the sender will block until the timeout expires.
*/
def sendRequestReplyFuture(message: AnyRef, timeout: Long, sender: ActorRef): Future[_] = !!!(message, timeout)(Option(sender))
def sendRequestReplyFuture[T <: AnyRef](message: AnyRef, timeout: Long, sender: ActorRef): Future[T] = !!!(message, timeout)(Option(sender)).asInstanceOf[Future[T]]
/**
* Akka Java API
* Akka Java API. <p/>
* Forwards the message specified to this actor and preserves the original sender of the message
*/
def forward(message: AnyRef, sender: ActorRef): Unit =
@ -360,7 +361,7 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] { scal
else forward(message)(Some(sender))
/**
* Akka Java API
* Akka Java API. <p/>
* Use <code>getContext().replyUnsafe(..)</code> to reply with a message to the original sender of the message currently
* being processed.
* <p/>
@ -369,7 +370,7 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] { scal
def replyUnsafe(message: AnyRef) = reply(message)
/**
* Akka Java API
* Akka Java API. <p/>
* Use <code>getContext().replySafe(..)</code> to reply with a message to the original sender of the message currently
* being processed.
* <p/>
@ -383,7 +384,7 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] { scal
def actorClass: Class[_ <: Actor]
/**
* Akka Java API
* Akka Java API. <p/>
* Returns the class for the Actor instance that is managed by the ActorRef.
*/
def getActorClass(): Class[_ <: Actor] = actorClass
@ -394,7 +395,7 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] { scal
def actorClassName: String
/**
* Akka Java API
* Akka Java API. <p/>
* Returns the class name for the Actor instance that is managed by the ActorRef.
*/
def getActorClassName(): String = actorClassName
@ -479,7 +480,7 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] { scal
def mailboxSize = dispatcher.mailboxSize(this)
/**
* Akka Java API
* Akka Java API. <p/>
* Returns the mailbox size.
*/
def getMailboxSize(): Int = mailboxSize
@ -490,7 +491,7 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] { scal
def supervisor: Option[ActorRef]
/**
* Akka Java API
* Akka Java API. <p/>
* Returns the supervisor, if there is one.
*/
def getSupervisor(): ActorRef = supervisor getOrElse null
@ -502,7 +503,7 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] { scal
def linkedActors: JMap[Uuid, ActorRef]
/**
* Java API
* Java API. <p/>
* Returns an unmodifiable Java Map containing the linked actors,
* please note that the backing map is thread-safe but not immutable
*/
@ -527,7 +528,7 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] { scal
}
/**
* Java API.
* Java API. <p/>
* Abstraction for unification of sender and senderFuture for later reply
*/
def getChannel: Channel[Any] = channel

View file

@ -149,32 +149,32 @@ public class Pi {
}
}).start();
}
@Override
public void preStart() {
become(scatter);
}
// message handler
public void onReceive(Object message) {
throw new IllegalStateException("Should be gatter or scatter");
}
private final Procedure<Object> scatter = new Procedure<Object>() {
public void apply(Object msg) {
// schedule work
for (int arg = 0; arg < nrOfMessages; arg++) {
router.sendOneWay(new Work(arg, nrOfElements), getContext());
}
// TODO would like to use channel instead, wrong docs, channel() not there
// getContext().channel()
CompletableFuture<Object> resultFuture = getContext().getSenderFuture().get();
// Assume the gathering behavior
become(gatter(resultFuture));
}
};
};
private Procedure<Object> gatter(final CompletableFuture<Object> resultFuture) {
return new Procedure<Object>() {
public void apply(Object msg) {
@ -190,7 +190,7 @@ public class Pi {
getContext().stop();
}
}
};
};
}
@Override
@ -213,13 +213,13 @@ public class Pi {
return new Master(nrOfWorkers, nrOfMessages, nrOfElements);
}
}).start();
// start the calculation
long start = currentTimeMillis();
// send calculate message
long timeout = 60000;
Future<Double> replyFuture = (Future<Double>) master.sendRequestReplyFuture(new Calculate(), timeout, null);
Future<Double> replyFuture = master.sendRequestReplyFuture(new Calculate(), timeout, null);
Option<Double> result = replyFuture.await().resultOrException();
if (result.isDefined()) {
double pi = result.get();
@ -231,6 +231,6 @@ public class Pi {
// EventHandler.error(this, "Pi calculation did not complete within the timeout.");
System.out.println("Pi calculation did not complete within the timeout.");
}
}
}