Did some work on improving the Java API (UntypedActor)

This commit is contained in:
Jonas Bonér 2010-08-10 10:07:51 +02:00
parent 6d41299c61
commit 1948918322
7 changed files with 196 additions and 122 deletions

View file

@ -16,6 +16,8 @@ import com.google.protobuf.Message
import java.util.concurrent.TimeUnit
import java.net.InetSocketAddress
import scala.reflect.BeanProperty
/**
* Implements the Transactor abstraction. E.g. a transactional actor.
* <p/>
@ -43,15 +45,34 @@ abstract class RemoteActor(address: InetSocketAddress) extends Actor {
* Life-cycle messages for the Actors
*/
@serializable sealed trait LifeCycleMessage
case class HotSwap(code: Option[Actor.Receive]) extends LifeCycleMessage
case class Restart(reason: Throwable) extends LifeCycleMessage
case class Exit(dead: ActorRef, killer: Throwable) extends LifeCycleMessage
case class Link(child: ActorRef) extends LifeCycleMessage
case class Unlink(child: ActorRef) extends LifeCycleMessage
case class UnlinkAndStop(child: ActorRef) extends LifeCycleMessage
case class Exit(dead: ActorRef, killer: Throwable) extends LifeCycleMessage {
def this(child: UntypedActorRef, killer: Throwable) = this(child.actorRef, killer)
}
case class Link(child: ActorRef) extends LifeCycleMessage {
def this(child: UntypedActorRef) = this(child.actorRef)
}
case class Unlink(child: ActorRef) extends LifeCycleMessage {
def this(child: UntypedActorRef) = this(child.actorRef)
}
case class UnlinkAndStop(child: ActorRef) extends LifeCycleMessage {
def this(child: UntypedActorRef) = this(child.actorRef)
}
case object ReceiveTimeout extends LifeCycleMessage
case class MaximumNumberOfRestartsWithinTimeRangeReached(
victim: ActorRef, maxNrOfRetries: Int, withinTimeRange: Int, lastExceptionCausingRestart: Throwable) extends LifeCycleMessage
@BeanProperty val victim: ActorRef,
@BeanProperty val maxNrOfRetries: Int,
@BeanProperty val withinTimeRange: Int,
@BeanProperty val lastExceptionCausingRestart: Throwable) extends LifeCycleMessage
// Exceptions for Actors
class ActorStartException private[akka](message: String) extends RuntimeException(message)

View file

@ -75,7 +75,7 @@ trait ActorRef extends TransactionManagement with java.lang.Comparable[ActorRef]
@volatile protected[akka] var _timeoutActor: Option[ActorRef] = None
@volatile protected[akka] var startOnCreation = false
@volatile protected[akka] var registeredInRemoteNodeDuringSerialization = false
protected[this] val guard = new ReentrantGuard
protected[akka] val guard = new ReentrantGuard
/**
* User overridable callback/setting.

View file

@ -11,6 +11,8 @@ import se.scalablesolutions.akka.config.ScalaConfig._
import java.net.InetSocketAddress
import scala.reflect.BeanProperty
/**
* Subclass this abstract class to create a MDB-style untyped actor.
* <p/>
@ -19,32 +21,32 @@ import java.net.InetSocketAddress
* Here is an example on how to create and use an UntypedActor:
* <pre>
* public class SampleUntypedActor extends UntypedActor {
* public void onReceive(Object message, UntypedActorRef self) throws Exception {
* public void onReceive(Object message) throws Exception {
* if (message instanceof String) {
* String msg = (String)message;
*
* if (msg.equals("UseReply")) {
* // Reply to original sender of message using the 'replyUnsafe' method
* self.replyUnsafe(msg + ":" + self.getUuid());
* if (msg.equals("UseReply")) {
* // Reply to original sender of message using the 'replyUnsafe' method
* getContext().replyUnsafe(msg + ":" + getContext().getUuid());
*
* } else if (msg.equals("UseSender") && self.getSender().isDefined()) {
* // Reply to original sender of message using the sender reference
* // also passing along my own refererence (the self)
* self.getSender().get().sendOneWay(msg, self);
* } else if (msg.equals("UseSender") && getContext().getSender().isDefined()) {
* // Reply to original sender of message using the sender reference
* // also passing along my own refererence (the context)
* getContext().getSender().get().sendOneWay(msg, context);
*
* } else if (msg.equals("UseSenderFuture") && self.getSenderFuture().isDefined()) {
* // Reply to original sender of message using the sender future reference
* self.getSenderFuture().get().completeWithResult(msg);
* } else if (msg.equals("UseSenderFuture") && getContext().getSenderFuture().isDefined()) {
* // Reply to original sender of message using the sender future reference
* getContext().getSenderFuture().get().completeWithResult(msg);
*
* } else if (msg.equals("SendToSelf")) {
* // Send message to the actor itself recursively
* self.sendOneWay(msg)
* // Send message to the actor itself recursively
* getContext().sendOneWay(msg)
*
* } else if (msg.equals("ForwardMessage")) {
* // Retreive an actor from the ActorRegistry by ID and get an ActorRef back
* ActorRef actorRef = ActorRegistry.actorsFor("some-actor-id").head();
* // Wrap the ActorRef in an UntypedActorRef and forward the message to this actor
* UntypedActorRef.wrap(actorRef).forward(msg, self);
* UntypedActorRef.wrap(actorRef).forward(msg, context);
*
* } else throw new IllegalArgumentException("Unknown message: " + message);
* } else throw new IllegalArgumentException("Unknown message: " + message);
@ -62,19 +64,14 @@ import java.net.InetSocketAddress
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
abstract class UntypedActor extends Actor {
protected[akka] var context: Option[UntypedActorRef] = None
@BeanProperty val context = UntypedActorRef.wrap(self)
final protected def receive = {
case msg =>
if (context.isEmpty) {
val ctx = new UntypedActorRef(self)
context = Some(ctx)
onReceive(msg, ctx)
} else onReceive(msg, context.get)
case msg => onReceive(msg)
}
@throws(classOf[Exception])
def onReceive(message: Any, context: UntypedActorRef): Unit
def onReceive(message: Any): Unit
}
/**
@ -212,12 +209,12 @@ class UntypedActorRef(val actorRef: ActorRef) {
*
* Trap all exceptions:
* <pre>
* context.setTrapExit(new Class[]{Throwable.class});
* getContext().setTrapExit(new Class[]{Throwable.class});
* </pre>
*
* Trap specific exceptions only:
* <pre>
* context.setTrapExit(new Class[]{MyApplicationException.class, MyApplicationError.class});
* getContext().setTrapExit(new Class[]{MyApplicationException.class, MyApplicationError.class});
* </pre>
*/
def setTrapExit(exceptions: Array[Class[_ <: Throwable]]) = actorRef.trapExit = exceptions.toList
@ -228,11 +225,11 @@ class UntypedActorRef(val actorRef: ActorRef) {
* <p/>
* Can be one of:
* <pre>
* context.setFaultHandler(new AllForOneStrategy(maxNrOfRetries, withinTimeRange));
* getContext().setFaultHandler(new AllForOneStrategy(maxNrOfRetries, withinTimeRange));
* </pre>
* Or:
* <pre>
* context.setFaultHandler(new OneForOneStrategy(maxNrOfRetries, withinTimeRange));
* getContext().setFaultHandler(new OneForOneStrategy(maxNrOfRetries, withinTimeRange));
* </pre>
*/
def setFaultHandler(handler: FaultHandlingStrategy) = actorRef.faultHandler = Some(handler)
@ -263,8 +260,8 @@ class UntypedActorRef(val actorRef: ActorRef) {
* Is defined if the message was sent from another Actor, else None.
*/
def getSender(): Option[UntypedActorRef] = actorRef.sender match {
case Some(s) => Some(UntypedActorRef.wrap(s))
case None => None
case Some(s) => Some(UntypedActorRef.wrap(s))
case None => None
}
/**
@ -310,8 +307,8 @@ class UntypedActorRef(val actorRef: ActorRef) {
* <p/>
*/
def sendOneWay(message: AnyRef, sender: UntypedActorRef) =
if (sender eq null) actorRef.!(message)(None)
else actorRef.!(message)(Some(sender.actorRef))
if (sender eq null) actorRef.!(message)(None)
else actorRef.!(message)(Some(sender.actorRef))
/**
* Sends a message asynchronously and waits on a future for a reply message under the hood. The timeout is taken from
@ -321,14 +318,14 @@ class UntypedActorRef(val actorRef: ActorRef) {
* (which will throw an ActorTimeoutException). E.g. send-and-receive-eventually semantics.
* <p/>
* <b>NOTE:</b>
* Use this method with care. In most cases it is better to use 'sendOneWay' together with 'context.getSender()' to
* Use this method with care. In most cases it is better to use 'sendOneWay' together with 'getContext().getSender()' to
* implement request/response message exchanges.
* <p/>
* If you are sending messages using <code>sendRequestReply</code> then you <b>have to</b> use <code>context.reply(..)</code>
* If you are sending messages using <code>sendRequestReply</code> then you <b>have to</b> use <code>getContext().reply(..)</code>
* to send a reply message to the original sender. If not then the sender will block until the timeout expires.
*/
def sendRequestReply(message: AnyRef): AnyRef =
actorRef.!!(message)(None).getOrElse(throw new ActorTimeoutException(
actorRef.!!(message)(None).getOrElse(throw new ActorTimeoutException(
"Message [" + message +
"]\n\tsent to [" + actorRef.actorClassName +
"]\n\twith timeout [" + actorRef.timeout +
@ -343,16 +340,16 @@ class UntypedActorRef(val actorRef: ActorRef) {
* (which will throw an ActorTimeoutException). E.g. send-and-receive-eventually semantics.
* <p/>
* <b>NOTE:</b>
* Use this method with care. In most cases it is better to use 'sendOneWay' together with 'context.getSender()' to
* Use this method with care. In most cases it is better to use 'sendOneWay' together with 'getContext().getSender()' to
* implement request/response message exchanges.
* <p/>
* If you are sending messages using <code>sendRequestReply</code> then you <b>have to</b> use <code>context.reply(..)</code>
* If you are sending messages using <code>sendRequestReply</code> then you <b>have to</b> use <code>getContext().reply(..)</code>
* to send a reply message to the original sender. If not then the sender will block until the timeout expires.
*/
def sendRequestReply(message: AnyRef, sender: UntypedActorRef): AnyRef = {
val result = if (sender eq null) actorRef.!!(message)(None)
else actorRef.!!(message)(Some(sender.actorRef))
result.getOrElse(throw new ActorTimeoutException(
val result = if (sender eq null) actorRef.!!(message)(None)
else actorRef.!!(message)(Some(sender.actorRef))
result.getOrElse(throw new ActorTimeoutException(
"Message [" + message +
"]\n\tsent to [" + actorRef.actorClassName +
"]\n\tfrom [" + sender.actorRef.actorClassName +
@ -368,14 +365,14 @@ class UntypedActorRef(val actorRef: ActorRef) {
* (which will throw an ActorTimeoutException). E.g. send-and-receive-eventually semantics.
* <p/>
* <b>NOTE:</b>
* Use this method with care. In most cases it is better to use 'sendOneWay' together with 'context.getSender()' to
* Use this method with care. In most cases it is better to use 'sendOneWay' together with 'getContext().getSender()' to
* implement request/response message exchanges.
* <p/>
* If you are sending messages using <code>sendRequestReply</code> then you <b>have to</b> use <code>context.reply(..)</code>
* If you are sending messages using <code>sendRequestReply</code> then you <b>have to</b> use <code>getContext().reply(..)</code>
* to send a reply message to the original sender. If not then the sender will block until the timeout expires.
*/
def sendRequestReply(message: AnyRef, timeout: Long): AnyRef =
actorRef.!!(message, timeout)(None).getOrElse(throw new ActorTimeoutException(
actorRef.!!(message, timeout)(None).getOrElse(throw new ActorTimeoutException(
"Message [" + message +
"]\n\tsent to [" + actorRef.actorClassName +
"]\n\twith timeout [" + timeout +
@ -389,16 +386,16 @@ class UntypedActorRef(val actorRef: ActorRef) {
* (which will throw an ActorTimeoutException). E.g. send-and-receive-eventually semantics.
* <p/>
* <b>NOTE:</b>
* Use this method with care. In most cases it is better to use 'sendOneWay' together with 'context.getSender()' to
* Use this method with care. In most cases it is better to use 'sendOneWay' together with 'getContext().getSender()' to
* implement request/response message exchanges.
* <p/>
* If you are sending messages using <code>sendRequestReply</code> then you <b>have to</b> use <code>context.reply(..)</code>
* If you are sending messages using <code>sendRequestReply</code> then you <b>have to</b> use <code>getContext().reply(..)</code>
* to send a reply message to the original sender. If not then the sender will block until the timeout expires.
*/
def sendRequestReply(message: AnyRef, timeout: Long, sender: UntypedActorRef): AnyRef = {
val result = if (sender eq null) actorRef.!!(message, timeout)(None)
else actorRef.!!(message)(Some(sender.actorRef))
result.getOrElse(throw new ActorTimeoutException(
val result = if (sender eq null) actorRef.!!(message, timeout)(None)
else actorRef.!!(message)(Some(sender.actorRef))
result.getOrElse(throw new ActorTimeoutException(
"Message [" + message +
"]\n\tsent to [" + actorRef.actorClassName +
"]\n\tfrom [" + sender.actorRef.actorClassName +
@ -412,10 +409,10 @@ class UntypedActorRef(val actorRef: ActorRef) {
* the default timeout in the Actor.
* <p/>
* <b>NOTE:</b>
* Use this method with care. In most cases it is better to use 'sendOneWay' together with the 'context.getSender()' to
* Use this method with care. In most cases it is better to use 'sendOneWay' together with the 'getContext().getSender()' to
* implement request/response message exchanges.
* <p/>
* If you are sending messages using <code>sendRequestReplyFuture</code> then you <b>have to</b> use <code>context.reply(..)</code>
* If you are sending messages using <code>sendRequestReplyFuture</code> then you <b>have to</b> use <code>getContext().reply(..)</code>
* to send a reply message to the original sender. If not then the sender will block until the timeout expires.
*/
def sendRequestReplyFuture(message: AnyRef): Future[_] = actorRef.!!!(message)(None)
@ -425,24 +422,24 @@ class UntypedActorRef(val actorRef: ActorRef) {
* the default timeout in the Actor.
* <p/>
* <b>NOTE:</b>
* Use this method with care. In most cases it is better to use 'sendOneWay' together with the 'context.getSender()' to
* Use this method with care. In most cases it is better to use 'sendOneWay' together with the 'getContext().getSender()' to
* implement request/response message exchanges.
* <p/>
* If you are sending messages using <code>sendRequestReplyFuture</code> then you <b>have to</b> use <code>context.reply(..)</code>
* If you are sending messages using <code>sendRequestReplyFuture</code> then you <b>have to</b> use <code>getContext().reply(..)</code>
* to send a reply message to the original sender. If not then the sender will block until the timeout expires.
*/
def sendRequestReplyFuture(message: AnyRef, sender: UntypedActorRef): Future[_] =
if (sender eq null) actorRef.!!!(message)(None)
else actorRef.!!!(message)(Some(sender.actorRef))
if (sender eq null) actorRef.!!!(message)(None)
else actorRef.!!!(message)(Some(sender.actorRef))
/**
* Sends a message asynchronously returns a future holding the eventual reply message.
* <p/>
* <b>NOTE:</b>
* Use this method with care. In most cases it is better to use 'sendOneWay' together with the 'context.getSender()' to
* Use this method with care. In most cases it is better to use 'sendOneWay' together with the 'getContext().getSender()' to
* implement request/response message exchanges.
* <p/>
* If you are sending messages using <code>sendRequestReplyFuture</code> then you <b>have to</b> use <code>context.reply(..)</code>
* If you are sending messages using <code>sendRequestReplyFuture</code> then you <b>have to</b> use <code>getContext().reply(..)</code>
* to send a reply message to the original sender. If not then the sender will block until the timeout expires.
*/
def sendRequestReplyFuture(message: AnyRef, timeout: Long): Future[_] = actorRef.!!!(message, timeout)(None)
@ -451,15 +448,15 @@ class UntypedActorRef(val actorRef: ActorRef) {
* Sends a message asynchronously returns a future holding the eventual reply message.
* <p/>
* <b>NOTE:</b>
* Use this method with care. In most cases it is better to use 'sendOneWay' together with the 'context.getSender()' to
* Use this method with care. In most cases it is better to use 'sendOneWay' together with the 'getContext().getSender()' to
* implement request/response message exchanges.
* <p/>
* If you are sending messages using <code>sendRequestReplyFuture</code> then you <b>have to</b> use <code>context.reply(..)</code>
* If you are sending messages using <code>sendRequestReplyFuture</code> then you <b>have to</b> use <code>getContext().reply(..)</code>
* to send a reply message to the original sender. If not then the sender will block until the timeout expires.
*/
def sendRequestReplyFuture(message: AnyRef, timeout: Long, sender: UntypedActorRef): Future[_] =
if (sender eq null) actorRef.!!!(message, timeout)(None)
else actorRef.!!!(message)(Some(sender.actorRef))
if (sender eq null) actorRef.!!!(message, timeout)(None)
else actorRef.!!!(message)(Some(sender.actorRef))
/**
* Forwards the message and passes the original sender actor as the sender.
@ -467,11 +464,11 @@ class UntypedActorRef(val actorRef: ActorRef) {
* Works with 'sendOneWay', 'sendRequestReply' and 'sendRequestReplyFuture'.
*/
def forward(message: AnyRef, sender: UntypedActorRef): Unit =
if (sender eq null) throw new IllegalArgumentException("The 'sender' argument to 'forward' can't be null")
else actorRef.forward(message)(Some(sender.actorRef))
if (sender eq null) throw new IllegalArgumentException("The 'sender' argument to 'forward' can't be null")
else actorRef.forward(message)(Some(sender.actorRef))
/**
* Use <code>context.replyUnsafe(..)</code> to reply with a message to the original sender of the message currently
* Use <code>getContext().replyUnsafe(..)</code> to reply with a message to the original sender of the message currently
* being processed.
* <p/>
* Throws an IllegalStateException if unable to determine what to reply to.
@ -479,7 +476,7 @@ class UntypedActorRef(val actorRef: ActorRef) {
def replyUnsafe(message: AnyRef): Unit = actorRef.reply(message)
/**
* Use <code>context.replySafe(..)</code> to reply with a message to the original sender of the message currently
* Use <code>getContext().replySafe(..)</code> to reply with a message to the original sender of the message currently
* being processed.
* <p/>
* Returns true if reply was sent, and false if unable to determine what to reply to.
@ -499,18 +496,18 @@ class UntypedActorRef(val actorRef: ActorRef) {
/**
* Invoking 'makeRemote' means that an actor will be moved to and invoked on a remote host.
*/
def makeRemote(hostname: String, port: Int): Unit = actorRef.makeRemote(hostname, port)
def makeRemote(hostname: String, port: Int): Unit = actorRef.makeRemote(hostname, port)
/**
* Invoking 'makeRemote' means that an actor will be moved to and invoked on a remote host.
*/
def makeRemote(address: InetSocketAddress): Unit = actorRef.makeRemote(address)
def makeRemote(address: InetSocketAddress): Unit = actorRef.makeRemote(address)
/**
* Invoking 'makeTransactionRequired' means that the actor will **start** a new transaction if non exists.
* However, it will always participate in an existing transaction.
*/
def makeTransactionRequired(): Unit = actorRef.makeTransactionRequired
def makeTransactionRequired(): Unit = actorRef.makeTransactionRequired
/**
* Sets the transaction configuration for this actor. Needs to be invoked before the actor is started.
@ -550,31 +547,90 @@ class UntypedActorRef(val actorRef: ActorRef) {
* 'trap' these exceptions and automatically restart the linked actors according to the restart strategy
* defined by the 'faultHandler'.
*/
def link(actor: UntypedActorRef): Unit = actorRef.link(actor.actorRef)
def link(actor: UntypedActorRef): Unit = actorRef.link(actor.actorRef)
/**
* Unlink the actor.
*/
def unlink(actor: UntypedActorRef): Unit = actorRef.unlink(actor.actorRef)
def unlink(actor: UntypedActorRef): Unit = actorRef.unlink(actor.actorRef)
/**
* Atomically start and link an actor.
*/
def startLink(actor: UntypedActorRef): Unit = actorRef.startLink(actor.actorRef)
def startLink(actor: UntypedActorRef): Unit = actorRef.startLink(actor.actorRef)
/**
* Atomically start, link and make an actor remote.
*/
def startLinkRemote(actor: UntypedActorRef, hostname: String, port: Int): Unit =
actorRef.startLinkRemote(actor.actorRef, hostname, port)
def startLinkRemote(actor: UntypedActorRef, hostname: String, port: Int): Unit =
actorRef.startLinkRemote(actor.actorRef, hostname, port)
/**
* Atomically create (from actor class) and start an actor.
* <p/>
* To be invoked from within the actor itself.
*/
def spawn(clazz: Class[_]): ActorRef = actorRef.guard.withGuard {
val actorRef = spawnButDoNotStart(clazz)
actorRef.start
actorRef
}
/**
* Atomically create (from actor class), start and make an actor remote.
* <p/>
* To be invoked from within the actor itself.
*/
def spawnRemote(clazz: Class[_], hostname: String, port: Int): ActorRef = actorRef.guard.withGuard {
val actor = spawnButDoNotStart(clazz)
actor.makeRemote(hostname, port)
actor.start
actor
}
/**
* Atomically create (from actor class), start and link an actor.
* <p/>
* To be invoked from within the actor itself.
*/
def spawnLink(clazz: Class[_]): ActorRef = actorRef.guard.withGuard {
val actor = spawnButDoNotStart(clazz)
try {
actor.start
} finally {
actorRef.link(actor)
}
actor
}
/**
* Atomically create (from actor class), start, link and make an actor remote.
* <p/>
* To be invoked from within the actor itself.
*/
def spawnLinkRemote(clazz: Class[_], hostname: String, port: Int): ActorRef = actorRef.guard.withGuard {
val actor = spawnButDoNotStart(clazz)
try {
actor.makeRemote(hostname, port)
actor.start
} finally {
actorRef.link(actor)
}
}
/**
* Returns the mailbox size.
*/
def getMailboxSize(): Int = actorRef.mailboxSize
/**
* Returns the current supervisor if there is one, null if not.
*/
def getSupervisor(): UntypedActorRef = UntypedActorRef.wrap(actorRef.supervisor.getOrElse(null))
private def spawnButDoNotStart(clazz: Class[_]): ActorRef = actorRef.guard.withGuard {
val actor = UntypedActor.actorOf(clazz)
if (!actorRef.dispatcher.isInstanceOf[ThreadBasedDispatcher]) actor.actorRef.dispatcher = actorRef.dispatcher
actorRef
}
}

View file

@ -26,6 +26,7 @@ import java.util.concurrent.{TimeUnit, Executors, ConcurrentMap, ConcurrentHashM
import java.util.concurrent.atomic.AtomicLong
import scala.collection.mutable.{HashSet, HashMap}
import scala.reflect.BeanProperty
/**
* Atomic remote request/reply message id generator.
@ -43,9 +44,9 @@ object RemoteRequestProtocolIdFactory {
* Life-cycle events for RemoteClient.
*/
sealed trait RemoteClientLifeCycleEvent
case class RemoteClientError(cause: Throwable, host: String, port: Int) extends RemoteClientLifeCycleEvent
case class RemoteClientDisconnected(host: String, port: Int) extends RemoteClientLifeCycleEvent
case class RemoteClientConnected(host: String, port: Int) extends RemoteClientLifeCycleEvent
case class RemoteClientError(@BeanProperty val cause: Throwable, @BeanProperty val host: String, @BeanProperty val port: Int) extends RemoteClientLifeCycleEvent
case class RemoteClientDisconnected(@BeanProperty val host: String, @BeanProperty val port: Int) extends RemoteClientLifeCycleEvent
case class RemoteClientConnected(@BeanProperty val host: String, @BeanProperty val port: Int) extends RemoteClientLifeCycleEvent
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>

View file

@ -3,26 +3,22 @@ package se.scalablesolutions.akka.actor;
import se.scalablesolutions.akka.actor.*;
public class ReplyUntypedActor extends UntypedActor {
public void onReceive(Object message, UntypedActorRef context) throws Exception {
if (message instanceof String) {
public void onReceive(Object message) throws Exception {
if (message instanceof String) {
String str = (String)message;
if (str.equals("ReplyToSendOneWayUsingReply")) {
context.replyUnsafe("Reply");
if (str.equals("ReplyToSendOneWayUsingReply")) {
getContext().replyUnsafe("Reply");
} else if (str.equals("ReplyToSendOneWayUsingSender")) {
context.getSender().get().sendOneWay("Reply");
getContext().getSender().get().sendOneWay("Reply");
} else if (str.equals("ReplyToSendRequestReplyUsingReply")) {
context.replyUnsafe("Reply");
getContext().replyUnsafe("Reply");
} else if (str.equals("ReplyToSendRequestReplyUsingFuture")) {
context.getSenderFuture().get().completeWithResult("Reply");
getContext().getSenderFuture().get().completeWithResult("Reply");
} else if (str.equals("ReplyToSendRequestReplyFutureUsingReply")) {
context.replyUnsafe("Reply");
getContext().replyUnsafe("Reply");
} else if (str.equals("ReplyToSendRequestReplyFutureUsingFuture")) {
context.getSenderFuture().get().completeWithResult("Reply");
getContext().getSenderFuture().get().completeWithResult("Reply");
} else throw new IllegalArgumentException("Unknown message: " + str);
} else throw new IllegalArgumentException("Unknown message: " + message);
} else throw new IllegalArgumentException("Unknown message: " + message);
}
}

View file

@ -12,33 +12,33 @@ import se.scalablesolutions.akka.actor.*;
*/
public class SampleUntypedActor extends UntypedActor {
public void onReceive(Object message, UntypedActorRef self) throws Exception {
if (message instanceof String) {
public void onReceive(Object message) throws Exception {
if (message instanceof String) {
String msg = (String)message;
System.out.println("Received message: " + msg);
System.out.println("Received message: " + msg);
if (msg.equals("UseReply")) {
// Reply to original sender of message using the 'replyUnsafe' method
self.replyUnsafe(msg + ":" + self.getUuid());
if (msg.equals("UseReply")) {
// Reply to original sender of message using the 'replyUnsafe' method
getContext().replyUnsafe(msg + ":" + getContext().getUuid());
} else if (msg.equals("UseSender") && self.getSender().isDefined()) {
// Reply to original sender of message using the sender reference
// also passing along my own refererence (the self)
self.getSender().get().sendOneWay(msg, self);
} else if (msg.equals("UseSender") && getContext().getSender().isDefined()) {
// Reply to original sender of message using the sender reference
// also passing along my own refererence (the context)
getContext().getSender().get().sendOneWay(msg, getContext());
} else if (msg.equals("UseSenderFuture") && self.getSenderFuture().isDefined()) {
// Reply to original sender of message using the sender future reference
self.getSenderFuture().get().completeWithResult(msg);
} else if (msg.equals("UseSenderFuture") && getContext().getSenderFuture().isDefined()) {
// Reply to original sender of message using the sender future reference
getContext().getSenderFuture().get().completeWithResult(msg);
} else if (msg.equals("SendToSelf")) {
// Send fire-forget message to the actor itself recursively
self.sendOneWay(msg);
} else if (msg.equals("SendToSelf")) {
// Send fire-forget message to the actor recursively
getContext().sendOneWay(msg);
} else if (msg.equals("ForwardMessage")) {
// Retreive an actor from the ActorRegistry by ID and get an ActorRef back
ActorRef actorRef = ActorRegistry.actorsFor("some-actor-id")[0];
// Wrap the ActorRef in an UntypedActorRef and forward the message to this actor
UntypedActorRef.wrap(actorRef).forward(msg, self);
} else if (msg.equals("ForwardMessage")) {
// Retreive an actor from the ActorRegistry by ID and get an ActorRef back
ActorRef actorRef = ActorRegistry.actorsFor("some-actor-id")[0];
// Wrap the ActorRef in an UntypedActorRef and forward the message to this actor
UntypedActorRef.wrap(actorRef).forward(msg, getContext());
} else throw new IllegalArgumentException("Unknown message: " + message);
} else throw new IllegalArgumentException("Unknown message: " + message);

View file

@ -6,31 +6,31 @@ import se.scalablesolutions.akka.dispatch.CompletableFuture;
public class SenderUntypedActor extends UntypedActor {
private UntypedActorRef replyActor = null;
public void onReceive(Object message, UntypedActorRef context) throws Exception {
public void onReceive(Object message) throws Exception {
if (message instanceof UntypedActorRef) replyActor = (UntypedActorRef)message;
else if (message instanceof String) {
if (replyActor == null) throw new IllegalStateException("Need to receive a ReplyUntypedActor before any other message.");
String str = (String)message;
if (str.equals("ReplyToSendOneWayUsingReply")) {
replyActor.sendOneWay("ReplyToSendOneWayUsingReply", context);
replyActor.sendOneWay("ReplyToSendOneWayUsingReply", getContext());
} else if (str.equals("ReplyToSendOneWayUsingSender")) {
replyActor.sendOneWay("ReplyToSendOneWayUsingSender", context);
replyActor.sendOneWay("ReplyToSendOneWayUsingSender", getContext());
} else if (str.equals("ReplyToSendRequestReplyUsingReply")) {
UntypedActorTestState.log = (String)replyActor.sendRequestReply("ReplyToSendRequestReplyUsingReply", context);
UntypedActorTestState.log = (String)replyActor.sendRequestReply("ReplyToSendRequestReplyUsingReply", getContext());
UntypedActorTestState.finished.await();
} else if (str.equals("ReplyToSendRequestReplyUsingFuture")) {
UntypedActorTestState.log = (String)replyActor.sendRequestReply("ReplyToSendRequestReplyUsingFuture", context);
UntypedActorTestState.log = (String)replyActor.sendRequestReply("ReplyToSendRequestReplyUsingFuture", getContext());
UntypedActorTestState.finished.await();
} else if (str.equals("ReplyToSendRequestReplyFutureUsingReply")) {
CompletableFuture future = (CompletableFuture)replyActor.sendRequestReplyFuture("ReplyToSendRequestReplyFutureUsingReply", context);
CompletableFuture future = (CompletableFuture)replyActor.sendRequestReplyFuture("ReplyToSendRequestReplyFutureUsingReply", getContext());
future.await();
UntypedActorTestState.log = (String)future.result().get();
UntypedActorTestState.finished.await();
} else if (str.equals("ReplyToSendRequestReplyFutureUsingFuture")) {
CompletableFuture future = (CompletableFuture)replyActor.sendRequestReplyFuture("ReplyToSendRequestReplyFutureUsingFuture", context);
CompletableFuture future = (CompletableFuture)replyActor.sendRequestReplyFuture("ReplyToSendRequestReplyFutureUsingFuture", getContext());
future.await();
UntypedActorTestState.log = (String)future.result().get();
UntypedActorTestState.finished.await();