From 1948918322b902ca6959853fd20db5e3547de9fb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Bon=C3=A9r?= Date: Tue, 10 Aug 2010 10:07:51 +0200 Subject: [PATCH] Did some work on improving the Java API (UntypedActor) --- akka-core/src/main/scala/actor/Actor.scala | 31 ++- akka-core/src/main/scala/actor/ActorRef.scala | 2 +- .../src/main/scala/actor/UntypedActor.scala | 198 +++++++++++------- .../src/main/scala/remote/RemoteClient.scala | 7 +- .../akka/actor/ReplyUntypedActor.java | 24 +-- .../akka/actor/SampleUntypedActor.java | 42 ++-- .../akka/actor/SenderUntypedActor.java | 14 +- 7 files changed, 196 insertions(+), 122 deletions(-) diff --git a/akka-core/src/main/scala/actor/Actor.scala b/akka-core/src/main/scala/actor/Actor.scala index 5b736abb70..0227e05d6d 100644 --- a/akka-core/src/main/scala/actor/Actor.scala +++ b/akka-core/src/main/scala/actor/Actor.scala @@ -16,6 +16,8 @@ import com.google.protobuf.Message import java.util.concurrent.TimeUnit import java.net.InetSocketAddress +import scala.reflect.BeanProperty + /** * Implements the Transactor abstraction. E.g. a transactional actor. *

@@ -43,15 +45,34 @@ abstract class RemoteActor(address: InetSocketAddress) extends Actor { * Life-cycle messages for the Actors */ @serializable sealed trait LifeCycleMessage + case class HotSwap(code: Option[Actor.Receive]) extends LifeCycleMessage + case class Restart(reason: Throwable) extends LifeCycleMessage -case class Exit(dead: ActorRef, killer: Throwable) extends LifeCycleMessage -case class Link(child: ActorRef) extends LifeCycleMessage -case class Unlink(child: ActorRef) extends LifeCycleMessage -case class UnlinkAndStop(child: ActorRef) extends LifeCycleMessage + +case class Exit(dead: ActorRef, killer: Throwable) extends LifeCycleMessage { + def this(child: UntypedActorRef, killer: Throwable) = this(child.actorRef, killer) +} + +case class Link(child: ActorRef) extends LifeCycleMessage { + def this(child: UntypedActorRef) = this(child.actorRef) +} + +case class Unlink(child: ActorRef) extends LifeCycleMessage { + def this(child: UntypedActorRef) = this(child.actorRef) +} + +case class UnlinkAndStop(child: ActorRef) extends LifeCycleMessage { + def this(child: UntypedActorRef) = this(child.actorRef) +} + case object ReceiveTimeout extends LifeCycleMessage + case class MaximumNumberOfRestartsWithinTimeRangeReached( - victim: ActorRef, maxNrOfRetries: Int, withinTimeRange: Int, lastExceptionCausingRestart: Throwable) extends LifeCycleMessage + @BeanProperty val victim: ActorRef, + @BeanProperty val maxNrOfRetries: Int, + @BeanProperty val withinTimeRange: Int, + @BeanProperty val lastExceptionCausingRestart: Throwable) extends LifeCycleMessage // Exceptions for Actors class ActorStartException private[akka](message: String) extends RuntimeException(message) diff --git a/akka-core/src/main/scala/actor/ActorRef.scala b/akka-core/src/main/scala/actor/ActorRef.scala index 902943f24d..b9ce02721f 100644 --- a/akka-core/src/main/scala/actor/ActorRef.scala +++ b/akka-core/src/main/scala/actor/ActorRef.scala @@ -75,7 +75,7 @@ trait ActorRef extends TransactionManagement with java.lang.Comparable[ActorRef] @volatile protected[akka] var _timeoutActor: Option[ActorRef] = None @volatile protected[akka] var startOnCreation = false @volatile protected[akka] var registeredInRemoteNodeDuringSerialization = false - protected[this] val guard = new ReentrantGuard + protected[akka] val guard = new ReentrantGuard /** * User overridable callback/setting. diff --git a/akka-core/src/main/scala/actor/UntypedActor.scala b/akka-core/src/main/scala/actor/UntypedActor.scala index 8ea36531e8..b0c4c2ff04 100644 --- a/akka-core/src/main/scala/actor/UntypedActor.scala +++ b/akka-core/src/main/scala/actor/UntypedActor.scala @@ -11,6 +11,8 @@ import se.scalablesolutions.akka.config.ScalaConfig._ import java.net.InetSocketAddress +import scala.reflect.BeanProperty + /** * Subclass this abstract class to create a MDB-style untyped actor. *

@@ -19,32 +21,32 @@ import java.net.InetSocketAddress * Here is an example on how to create and use an UntypedActor: *

  *  public class SampleUntypedActor extends UntypedActor {
- *    public void onReceive(Object message, UntypedActorRef self) throws Exception {
+ *    public void onReceive(Object message) throws Exception {
  *      if (message instanceof String) {
  *        String msg = (String)message;
  *
- *            if (msg.equals("UseReply")) {
- *                  // Reply to original sender of message using the 'replyUnsafe' method
- *                  self.replyUnsafe(msg + ":" + self.getUuid());
+ *        if (msg.equals("UseReply")) {
+ *          // Reply to original sender of message using the 'replyUnsafe' method
+ *          getContext().replyUnsafe(msg + ":" + getContext().getUuid());
  *
- *            } else if (msg.equals("UseSender") && self.getSender().isDefined()) {     
- *                  // Reply to original sender of message using the sender reference
- *                  // also passing along my own refererence (the self)
- *                  self.getSender().get().sendOneWay(msg, self); 
+ *        } else if (msg.equals("UseSender") && getContext().getSender().isDefined()) {     
+ *          // Reply to original sender of message using the sender reference
+ *          // also passing along my own refererence (the context)
+ *          getContext().getSender().get().sendOneWay(msg, context); 
  *
- *        } else if (msg.equals("UseSenderFuture") && self.getSenderFuture().isDefined()) {     
- *                  // Reply to original sender of message using the sender future reference
- *                  self.getSenderFuture().get().completeWithResult(msg);
+ *        } else if (msg.equals("UseSenderFuture") && getContext().getSenderFuture().isDefined()) {     
+ *          // Reply to original sender of message using the sender future reference
+ *          getContext().getSenderFuture().get().completeWithResult(msg);
  *
  *        } else if (msg.equals("SendToSelf")) {
- *                  // Send message to the actor itself recursively
- *                  self.sendOneWay(msg)
+ *          // Send message to the actor itself recursively
+ *          getContext().sendOneWay(msg)
  *
  *        } else if (msg.equals("ForwardMessage")) {
  *          // Retreive an actor from the ActorRegistry by ID and get an ActorRef back
  *          ActorRef actorRef = ActorRegistry.actorsFor("some-actor-id").head();
  *          // Wrap the ActorRef in an UntypedActorRef and forward the message to this actor
- *          UntypedActorRef.wrap(actorRef).forward(msg, self);
+ *          UntypedActorRef.wrap(actorRef).forward(msg, context);
  *
  *        } else throw new IllegalArgumentException("Unknown message: " + message);
  *      } else throw new IllegalArgumentException("Unknown message: " + message);
@@ -62,19 +64,14 @@ import java.net.InetSocketAddress
  * @author Jonas Bonér
  */
 abstract class UntypedActor extends Actor {
-  protected[akka] var context: Option[UntypedActorRef] = None
+  @BeanProperty val context = UntypedActorRef.wrap(self)
 
   final protected def receive = {
-    case msg => 
-      if (context.isEmpty) {
-        val ctx = new UntypedActorRef(self)
-        context = Some(ctx)
-        onReceive(msg, ctx)
-      } else onReceive(msg, context.get)
+    case msg => onReceive(msg)
   }
 
   @throws(classOf[Exception])
-  def onReceive(message: Any, context: UntypedActorRef): Unit
+  def onReceive(message: Any): Unit
 }
 
 /**
@@ -212,12 +209,12 @@ class UntypedActorRef(val actorRef: ActorRef) {
    *
    * Trap all exceptions:
    * 
-   * context.setTrapExit(new Class[]{Throwable.class});
+   * getContext().setTrapExit(new Class[]{Throwable.class});
    * 
* * Trap specific exceptions only: *
-   * context.setTrapExit(new Class[]{MyApplicationException.class, MyApplicationError.class});
+   * getContext().setTrapExit(new Class[]{MyApplicationException.class, MyApplicationError.class});
    * 
*/ def setTrapExit(exceptions: Array[Class[_ <: Throwable]]) = actorRef.trapExit = exceptions.toList @@ -228,11 +225,11 @@ class UntypedActorRef(val actorRef: ActorRef) { *

* Can be one of: *

-   *  context.setFaultHandler(new AllForOneStrategy(maxNrOfRetries, withinTimeRange));
+   *  getContext().setFaultHandler(new AllForOneStrategy(maxNrOfRetries, withinTimeRange));
    * 
* Or: *
-   *  context.setFaultHandler(new OneForOneStrategy(maxNrOfRetries, withinTimeRange));
+   *  getContext().setFaultHandler(new OneForOneStrategy(maxNrOfRetries, withinTimeRange));
    * 
*/ def setFaultHandler(handler: FaultHandlingStrategy) = actorRef.faultHandler = Some(handler) @@ -263,8 +260,8 @@ class UntypedActorRef(val actorRef: ActorRef) { * Is defined if the message was sent from another Actor, else None. */ def getSender(): Option[UntypedActorRef] = actorRef.sender match { - case Some(s) => Some(UntypedActorRef.wrap(s)) - case None => None + case Some(s) => Some(UntypedActorRef.wrap(s)) + case None => None } /** @@ -310,8 +307,8 @@ class UntypedActorRef(val actorRef: ActorRef) { *

*/ def sendOneWay(message: AnyRef, sender: UntypedActorRef) = - if (sender eq null) actorRef.!(message)(None) - else actorRef.!(message)(Some(sender.actorRef)) + if (sender eq null) actorRef.!(message)(None) + else actorRef.!(message)(Some(sender.actorRef)) /** * Sends a message asynchronously and waits on a future for a reply message under the hood. The timeout is taken from @@ -321,14 +318,14 @@ class UntypedActorRef(val actorRef: ActorRef) { * (which will throw an ActorTimeoutException). E.g. send-and-receive-eventually semantics. *

* NOTE: - * Use this method with care. In most cases it is better to use 'sendOneWay' together with 'context.getSender()' to + * Use this method with care. In most cases it is better to use 'sendOneWay' together with 'getContext().getSender()' to * implement request/response message exchanges. *

- * If you are sending messages using sendRequestReply then you have to use context.reply(..) + * If you are sending messages using sendRequestReply then you have to use getContext().reply(..) * to send a reply message to the original sender. If not then the sender will block until the timeout expires. */ def sendRequestReply(message: AnyRef): AnyRef = - actorRef.!!(message)(None).getOrElse(throw new ActorTimeoutException( + actorRef.!!(message)(None).getOrElse(throw new ActorTimeoutException( "Message [" + message + "]\n\tsent to [" + actorRef.actorClassName + "]\n\twith timeout [" + actorRef.timeout + @@ -343,16 +340,16 @@ class UntypedActorRef(val actorRef: ActorRef) { * (which will throw an ActorTimeoutException). E.g. send-and-receive-eventually semantics. *

* NOTE: - * Use this method with care. In most cases it is better to use 'sendOneWay' together with 'context.getSender()' to + * Use this method with care. In most cases it is better to use 'sendOneWay' together with 'getContext().getSender()' to * implement request/response message exchanges. *

- * If you are sending messages using sendRequestReply then you have to use context.reply(..) + * If you are sending messages using sendRequestReply then you have to use getContext().reply(..) * to send a reply message to the original sender. If not then the sender will block until the timeout expires. */ def sendRequestReply(message: AnyRef, sender: UntypedActorRef): AnyRef = { - val result = if (sender eq null) actorRef.!!(message)(None) - else actorRef.!!(message)(Some(sender.actorRef)) - result.getOrElse(throw new ActorTimeoutException( + val result = if (sender eq null) actorRef.!!(message)(None) + else actorRef.!!(message)(Some(sender.actorRef)) + result.getOrElse(throw new ActorTimeoutException( "Message [" + message + "]\n\tsent to [" + actorRef.actorClassName + "]\n\tfrom [" + sender.actorRef.actorClassName + @@ -368,14 +365,14 @@ class UntypedActorRef(val actorRef: ActorRef) { * (which will throw an ActorTimeoutException). E.g. send-and-receive-eventually semantics. *

* NOTE: - * Use this method with care. In most cases it is better to use 'sendOneWay' together with 'context.getSender()' to + * Use this method with care. In most cases it is better to use 'sendOneWay' together with 'getContext().getSender()' to * implement request/response message exchanges. *

- * If you are sending messages using sendRequestReply then you have to use context.reply(..) + * If you are sending messages using sendRequestReply then you have to use getContext().reply(..) * to send a reply message to the original sender. If not then the sender will block until the timeout expires. */ def sendRequestReply(message: AnyRef, timeout: Long): AnyRef = - actorRef.!!(message, timeout)(None).getOrElse(throw new ActorTimeoutException( + actorRef.!!(message, timeout)(None).getOrElse(throw new ActorTimeoutException( "Message [" + message + "]\n\tsent to [" + actorRef.actorClassName + "]\n\twith timeout [" + timeout + @@ -389,16 +386,16 @@ class UntypedActorRef(val actorRef: ActorRef) { * (which will throw an ActorTimeoutException). E.g. send-and-receive-eventually semantics. *

* NOTE: - * Use this method with care. In most cases it is better to use 'sendOneWay' together with 'context.getSender()' to + * Use this method with care. In most cases it is better to use 'sendOneWay' together with 'getContext().getSender()' to * implement request/response message exchanges. *

- * If you are sending messages using sendRequestReply then you have to use context.reply(..) + * If you are sending messages using sendRequestReply then you have to use getContext().reply(..) * to send a reply message to the original sender. If not then the sender will block until the timeout expires. */ def sendRequestReply(message: AnyRef, timeout: Long, sender: UntypedActorRef): AnyRef = { - val result = if (sender eq null) actorRef.!!(message, timeout)(None) - else actorRef.!!(message)(Some(sender.actorRef)) - result.getOrElse(throw new ActorTimeoutException( + val result = if (sender eq null) actorRef.!!(message, timeout)(None) + else actorRef.!!(message)(Some(sender.actorRef)) + result.getOrElse(throw new ActorTimeoutException( "Message [" + message + "]\n\tsent to [" + actorRef.actorClassName + "]\n\tfrom [" + sender.actorRef.actorClassName + @@ -412,10 +409,10 @@ class UntypedActorRef(val actorRef: ActorRef) { * the default timeout in the Actor. *

* NOTE: - * Use this method with care. In most cases it is better to use 'sendOneWay' together with the 'context.getSender()' to + * Use this method with care. In most cases it is better to use 'sendOneWay' together with the 'getContext().getSender()' to * implement request/response message exchanges. *

- * If you are sending messages using sendRequestReplyFuture then you have to use context.reply(..) + * If you are sending messages using sendRequestReplyFuture then you have to use getContext().reply(..) * to send a reply message to the original sender. If not then the sender will block until the timeout expires. */ def sendRequestReplyFuture(message: AnyRef): Future[_] = actorRef.!!!(message)(None) @@ -425,24 +422,24 @@ class UntypedActorRef(val actorRef: ActorRef) { * the default timeout in the Actor. *

* NOTE: - * Use this method with care. In most cases it is better to use 'sendOneWay' together with the 'context.getSender()' to + * Use this method with care. In most cases it is better to use 'sendOneWay' together with the 'getContext().getSender()' to * implement request/response message exchanges. *

- * If you are sending messages using sendRequestReplyFuture then you have to use context.reply(..) + * If you are sending messages using sendRequestReplyFuture then you have to use getContext().reply(..) * to send a reply message to the original sender. If not then the sender will block until the timeout expires. */ def sendRequestReplyFuture(message: AnyRef, sender: UntypedActorRef): Future[_] = - if (sender eq null) actorRef.!!!(message)(None) - else actorRef.!!!(message)(Some(sender.actorRef)) + if (sender eq null) actorRef.!!!(message)(None) + else actorRef.!!!(message)(Some(sender.actorRef)) /** * Sends a message asynchronously returns a future holding the eventual reply message. *

* NOTE: - * Use this method with care. In most cases it is better to use 'sendOneWay' together with the 'context.getSender()' to + * Use this method with care. In most cases it is better to use 'sendOneWay' together with the 'getContext().getSender()' to * implement request/response message exchanges. *

- * If you are sending messages using sendRequestReplyFuture then you have to use context.reply(..) + * If you are sending messages using sendRequestReplyFuture then you have to use getContext().reply(..) * to send a reply message to the original sender. If not then the sender will block until the timeout expires. */ def sendRequestReplyFuture(message: AnyRef, timeout: Long): Future[_] = actorRef.!!!(message, timeout)(None) @@ -451,15 +448,15 @@ class UntypedActorRef(val actorRef: ActorRef) { * Sends a message asynchronously returns a future holding the eventual reply message. *

* NOTE: - * Use this method with care. In most cases it is better to use 'sendOneWay' together with the 'context.getSender()' to + * Use this method with care. In most cases it is better to use 'sendOneWay' together with the 'getContext().getSender()' to * implement request/response message exchanges. *

- * If you are sending messages using sendRequestReplyFuture then you have to use context.reply(..) + * If you are sending messages using sendRequestReplyFuture then you have to use getContext().reply(..) * to send a reply message to the original sender. If not then the sender will block until the timeout expires. */ def sendRequestReplyFuture(message: AnyRef, timeout: Long, sender: UntypedActorRef): Future[_] = - if (sender eq null) actorRef.!!!(message, timeout)(None) - else actorRef.!!!(message)(Some(sender.actorRef)) + if (sender eq null) actorRef.!!!(message, timeout)(None) + else actorRef.!!!(message)(Some(sender.actorRef)) /** * Forwards the message and passes the original sender actor as the sender. @@ -467,11 +464,11 @@ class UntypedActorRef(val actorRef: ActorRef) { * Works with 'sendOneWay', 'sendRequestReply' and 'sendRequestReplyFuture'. */ def forward(message: AnyRef, sender: UntypedActorRef): Unit = - if (sender eq null) throw new IllegalArgumentException("The 'sender' argument to 'forward' can't be null") - else actorRef.forward(message)(Some(sender.actorRef)) + if (sender eq null) throw new IllegalArgumentException("The 'sender' argument to 'forward' can't be null") + else actorRef.forward(message)(Some(sender.actorRef)) /** - * Use context.replyUnsafe(..) to reply with a message to the original sender of the message currently + * Use getContext().replyUnsafe(..) to reply with a message to the original sender of the message currently * being processed. *

* Throws an IllegalStateException if unable to determine what to reply to. @@ -479,7 +476,7 @@ class UntypedActorRef(val actorRef: ActorRef) { def replyUnsafe(message: AnyRef): Unit = actorRef.reply(message) /** - * Use context.replySafe(..) to reply with a message to the original sender of the message currently + * Use getContext().replySafe(..) to reply with a message to the original sender of the message currently * being processed. *

* Returns true if reply was sent, and false if unable to determine what to reply to. @@ -499,18 +496,18 @@ class UntypedActorRef(val actorRef: ActorRef) { /** * Invoking 'makeRemote' means that an actor will be moved to and invoked on a remote host. */ - def makeRemote(hostname: String, port: Int): Unit = actorRef.makeRemote(hostname, port) + def makeRemote(hostname: String, port: Int): Unit = actorRef.makeRemote(hostname, port) /** * Invoking 'makeRemote' means that an actor will be moved to and invoked on a remote host. */ - def makeRemote(address: InetSocketAddress): Unit = actorRef.makeRemote(address) + def makeRemote(address: InetSocketAddress): Unit = actorRef.makeRemote(address) /** * Invoking 'makeTransactionRequired' means that the actor will **start** a new transaction if non exists. * However, it will always participate in an existing transaction. */ - def makeTransactionRequired(): Unit = actorRef.makeTransactionRequired + def makeTransactionRequired(): Unit = actorRef.makeTransactionRequired /** * Sets the transaction configuration for this actor. Needs to be invoked before the actor is started. @@ -550,31 +547,90 @@ class UntypedActorRef(val actorRef: ActorRef) { * 'trap' these exceptions and automatically restart the linked actors according to the restart strategy * defined by the 'faultHandler'. */ - def link(actor: UntypedActorRef): Unit = actorRef.link(actor.actorRef) + def link(actor: UntypedActorRef): Unit = actorRef.link(actor.actorRef) /** * Unlink the actor. */ - def unlink(actor: UntypedActorRef): Unit = actorRef.unlink(actor.actorRef) + def unlink(actor: UntypedActorRef): Unit = actorRef.unlink(actor.actorRef) /** * Atomically start and link an actor. */ - def startLink(actor: UntypedActorRef): Unit = actorRef.startLink(actor.actorRef) + def startLink(actor: UntypedActorRef): Unit = actorRef.startLink(actor.actorRef) /** * Atomically start, link and make an actor remote. */ - def startLinkRemote(actor: UntypedActorRef, hostname: String, port: Int): Unit = - actorRef.startLinkRemote(actor.actorRef, hostname, port) + def startLinkRemote(actor: UntypedActorRef, hostname: String, port: Int): Unit = + actorRef.startLinkRemote(actor.actorRef, hostname, port) + + /** + * Atomically create (from actor class) and start an actor. + *

+ * To be invoked from within the actor itself. + */ + def spawn(clazz: Class[_]): ActorRef = actorRef.guard.withGuard { + val actorRef = spawnButDoNotStart(clazz) + actorRef.start + actorRef + } + + /** + * Atomically create (from actor class), start and make an actor remote. + *

+ * To be invoked from within the actor itself. + */ + def spawnRemote(clazz: Class[_], hostname: String, port: Int): ActorRef = actorRef.guard.withGuard { + val actor = spawnButDoNotStart(clazz) + actor.makeRemote(hostname, port) + actor.start + actor + } + + /** + * Atomically create (from actor class), start and link an actor. + *

+ * To be invoked from within the actor itself. + */ + def spawnLink(clazz: Class[_]): ActorRef = actorRef.guard.withGuard { + val actor = spawnButDoNotStart(clazz) + try { + actor.start + } finally { + actorRef.link(actor) + } + actor + } + + /** + * Atomically create (from actor class), start, link and make an actor remote. + *

+ * To be invoked from within the actor itself. + */ + def spawnLinkRemote(clazz: Class[_], hostname: String, port: Int): ActorRef = actorRef.guard.withGuard { + val actor = spawnButDoNotStart(clazz) + try { + actor.makeRemote(hostname, port) + actor.start + } finally { + actorRef.link(actor) + } + } /** * Returns the mailbox size. */ def getMailboxSize(): Int = actorRef.mailboxSize - + /** * Returns the current supervisor if there is one, null if not. */ def getSupervisor(): UntypedActorRef = UntypedActorRef.wrap(actorRef.supervisor.getOrElse(null)) + + private def spawnButDoNotStart(clazz: Class[_]): ActorRef = actorRef.guard.withGuard { + val actor = UntypedActor.actorOf(clazz) + if (!actorRef.dispatcher.isInstanceOf[ThreadBasedDispatcher]) actor.actorRef.dispatcher = actorRef.dispatcher + actorRef + } } diff --git a/akka-core/src/main/scala/remote/RemoteClient.scala b/akka-core/src/main/scala/remote/RemoteClient.scala index e0212572b2..cc32e96998 100644 --- a/akka-core/src/main/scala/remote/RemoteClient.scala +++ b/akka-core/src/main/scala/remote/RemoteClient.scala @@ -26,6 +26,7 @@ import java.util.concurrent.{TimeUnit, Executors, ConcurrentMap, ConcurrentHashM import java.util.concurrent.atomic.AtomicLong import scala.collection.mutable.{HashSet, HashMap} +import scala.reflect.BeanProperty /** * Atomic remote request/reply message id generator. @@ -43,9 +44,9 @@ object RemoteRequestProtocolIdFactory { * Life-cycle events for RemoteClient. */ sealed trait RemoteClientLifeCycleEvent -case class RemoteClientError(cause: Throwable, host: String, port: Int) extends RemoteClientLifeCycleEvent -case class RemoteClientDisconnected(host: String, port: Int) extends RemoteClientLifeCycleEvent -case class RemoteClientConnected(host: String, port: Int) extends RemoteClientLifeCycleEvent +case class RemoteClientError(@BeanProperty val cause: Throwable, @BeanProperty val host: String, @BeanProperty val port: Int) extends RemoteClientLifeCycleEvent +case class RemoteClientDisconnected(@BeanProperty val host: String, @BeanProperty val port: Int) extends RemoteClientLifeCycleEvent +case class RemoteClientConnected(@BeanProperty val host: String, @BeanProperty val port: Int) extends RemoteClientLifeCycleEvent /** * @author Jonas Bonér diff --git a/akka-core/src/test/java/se/scalablesolutions/akka/actor/ReplyUntypedActor.java b/akka-core/src/test/java/se/scalablesolutions/akka/actor/ReplyUntypedActor.java index 8510c3889b..aeedd010fc 100644 --- a/akka-core/src/test/java/se/scalablesolutions/akka/actor/ReplyUntypedActor.java +++ b/akka-core/src/test/java/se/scalablesolutions/akka/actor/ReplyUntypedActor.java @@ -3,26 +3,22 @@ package se.scalablesolutions.akka.actor; import se.scalablesolutions.akka.actor.*; public class ReplyUntypedActor extends UntypedActor { - public void onReceive(Object message, UntypedActorRef context) throws Exception { - if (message instanceof String) { + public void onReceive(Object message) throws Exception { + if (message instanceof String) { String str = (String)message; - - if (str.equals("ReplyToSendOneWayUsingReply")) { - context.replyUnsafe("Reply"); + if (str.equals("ReplyToSendOneWayUsingReply")) { + getContext().replyUnsafe("Reply"); } else if (str.equals("ReplyToSendOneWayUsingSender")) { - context.getSender().get().sendOneWay("Reply"); - + getContext().getSender().get().sendOneWay("Reply"); } else if (str.equals("ReplyToSendRequestReplyUsingReply")) { - context.replyUnsafe("Reply"); + getContext().replyUnsafe("Reply"); } else if (str.equals("ReplyToSendRequestReplyUsingFuture")) { - context.getSenderFuture().get().completeWithResult("Reply"); - + getContext().getSenderFuture().get().completeWithResult("Reply"); } else if (str.equals("ReplyToSendRequestReplyFutureUsingReply")) { - context.replyUnsafe("Reply"); + getContext().replyUnsafe("Reply"); } else if (str.equals("ReplyToSendRequestReplyFutureUsingFuture")) { - context.getSenderFuture().get().completeWithResult("Reply"); - + getContext().getSenderFuture().get().completeWithResult("Reply"); } else throw new IllegalArgumentException("Unknown message: " + str); - } else throw new IllegalArgumentException("Unknown message: " + message); + } else throw new IllegalArgumentException("Unknown message: " + message); } } diff --git a/akka-core/src/test/java/se/scalablesolutions/akka/actor/SampleUntypedActor.java b/akka-core/src/test/java/se/scalablesolutions/akka/actor/SampleUntypedActor.java index ed8a67ab13..add3b86061 100644 --- a/akka-core/src/test/java/se/scalablesolutions/akka/actor/SampleUntypedActor.java +++ b/akka-core/src/test/java/se/scalablesolutions/akka/actor/SampleUntypedActor.java @@ -12,33 +12,33 @@ import se.scalablesolutions.akka.actor.*; */ public class SampleUntypedActor extends UntypedActor { - public void onReceive(Object message, UntypedActorRef self) throws Exception { - if (message instanceof String) { + public void onReceive(Object message) throws Exception { + if (message instanceof String) { String msg = (String)message; - System.out.println("Received message: " + msg); + System.out.println("Received message: " + msg); - if (msg.equals("UseReply")) { - // Reply to original sender of message using the 'replyUnsafe' method - self.replyUnsafe(msg + ":" + self.getUuid()); + if (msg.equals("UseReply")) { + // Reply to original sender of message using the 'replyUnsafe' method + getContext().replyUnsafe(msg + ":" + getContext().getUuid()); - } else if (msg.equals("UseSender") && self.getSender().isDefined()) { - // Reply to original sender of message using the sender reference - // also passing along my own refererence (the self) - self.getSender().get().sendOneWay(msg, self); + } else if (msg.equals("UseSender") && getContext().getSender().isDefined()) { + // Reply to original sender of message using the sender reference + // also passing along my own refererence (the context) + getContext().getSender().get().sendOneWay(msg, getContext()); - } else if (msg.equals("UseSenderFuture") && self.getSenderFuture().isDefined()) { - // Reply to original sender of message using the sender future reference - self.getSenderFuture().get().completeWithResult(msg); + } else if (msg.equals("UseSenderFuture") && getContext().getSenderFuture().isDefined()) { + // Reply to original sender of message using the sender future reference + getContext().getSenderFuture().get().completeWithResult(msg); - } else if (msg.equals("SendToSelf")) { - // Send fire-forget message to the actor itself recursively - self.sendOneWay(msg); + } else if (msg.equals("SendToSelf")) { + // Send fire-forget message to the actor recursively + getContext().sendOneWay(msg); - } else if (msg.equals("ForwardMessage")) { - // Retreive an actor from the ActorRegistry by ID and get an ActorRef back - ActorRef actorRef = ActorRegistry.actorsFor("some-actor-id")[0]; - // Wrap the ActorRef in an UntypedActorRef and forward the message to this actor - UntypedActorRef.wrap(actorRef).forward(msg, self); + } else if (msg.equals("ForwardMessage")) { + // Retreive an actor from the ActorRegistry by ID and get an ActorRef back + ActorRef actorRef = ActorRegistry.actorsFor("some-actor-id")[0]; + // Wrap the ActorRef in an UntypedActorRef and forward the message to this actor + UntypedActorRef.wrap(actorRef).forward(msg, getContext()); } else throw new IllegalArgumentException("Unknown message: " + message); } else throw new IllegalArgumentException("Unknown message: " + message); diff --git a/akka-core/src/test/java/se/scalablesolutions/akka/actor/SenderUntypedActor.java b/akka-core/src/test/java/se/scalablesolutions/akka/actor/SenderUntypedActor.java index 7234ff27a4..9c9519de12 100644 --- a/akka-core/src/test/java/se/scalablesolutions/akka/actor/SenderUntypedActor.java +++ b/akka-core/src/test/java/se/scalablesolutions/akka/actor/SenderUntypedActor.java @@ -6,31 +6,31 @@ import se.scalablesolutions.akka.dispatch.CompletableFuture; public class SenderUntypedActor extends UntypedActor { private UntypedActorRef replyActor = null; - public void onReceive(Object message, UntypedActorRef context) throws Exception { + public void onReceive(Object message) throws Exception { if (message instanceof UntypedActorRef) replyActor = (UntypedActorRef)message; else if (message instanceof String) { if (replyActor == null) throw new IllegalStateException("Need to receive a ReplyUntypedActor before any other message."); String str = (String)message; if (str.equals("ReplyToSendOneWayUsingReply")) { - replyActor.sendOneWay("ReplyToSendOneWayUsingReply", context); + replyActor.sendOneWay("ReplyToSendOneWayUsingReply", getContext()); } else if (str.equals("ReplyToSendOneWayUsingSender")) { - replyActor.sendOneWay("ReplyToSendOneWayUsingSender", context); + replyActor.sendOneWay("ReplyToSendOneWayUsingSender", getContext()); } else if (str.equals("ReplyToSendRequestReplyUsingReply")) { - UntypedActorTestState.log = (String)replyActor.sendRequestReply("ReplyToSendRequestReplyUsingReply", context); + UntypedActorTestState.log = (String)replyActor.sendRequestReply("ReplyToSendRequestReplyUsingReply", getContext()); UntypedActorTestState.finished.await(); } else if (str.equals("ReplyToSendRequestReplyUsingFuture")) { - UntypedActorTestState.log = (String)replyActor.sendRequestReply("ReplyToSendRequestReplyUsingFuture", context); + UntypedActorTestState.log = (String)replyActor.sendRequestReply("ReplyToSendRequestReplyUsingFuture", getContext()); UntypedActorTestState.finished.await(); } else if (str.equals("ReplyToSendRequestReplyFutureUsingReply")) { - CompletableFuture future = (CompletableFuture)replyActor.sendRequestReplyFuture("ReplyToSendRequestReplyFutureUsingReply", context); + CompletableFuture future = (CompletableFuture)replyActor.sendRequestReplyFuture("ReplyToSendRequestReplyFutureUsingReply", getContext()); future.await(); UntypedActorTestState.log = (String)future.result().get(); UntypedActorTestState.finished.await(); } else if (str.equals("ReplyToSendRequestReplyFutureUsingFuture")) { - CompletableFuture future = (CompletableFuture)replyActor.sendRequestReplyFuture("ReplyToSendRequestReplyFutureUsingFuture", context); + CompletableFuture future = (CompletableFuture)replyActor.sendRequestReplyFuture("ReplyToSendRequestReplyFutureUsingFuture", getContext()); future.await(); UntypedActorTestState.log = (String)future.result().get(); UntypedActorTestState.finished.await();