diff --git a/.gitignore b/.gitignore index 51b42bf7be..91a3be7969 100755 --- a/.gitignore +++ b/.gitignore @@ -39,4 +39,5 @@ run-codefellow .classpath .idea .scala_dependencies -multiverse.log \ No newline at end of file +multiverse.log +.eprj \ No newline at end of file diff --git a/akka-core/src/main/scala/actor/Actor.scala b/akka-core/src/main/scala/actor/Actor.scala index 1de3997c12..562a10a7d5 100644 --- a/akka-core/src/main/scala/actor/Actor.scala +++ b/akka-core/src/main/scala/actor/Actor.scala @@ -14,6 +14,7 @@ import se.scalablesolutions.akka.util.{Logging, Duration} import com.google.protobuf.Message import java.util.concurrent.TimeUnit +import java.net.InetSocketAddress /** * Implements the Transactor abstraction. E.g. a transactional actor. @@ -33,8 +34,9 @@ trait Transactor extends Actor { * * @author Jonas Bonér */ -abstract class RemoteActor(hostname: String, port: Int) extends Actor { - self.makeRemote(hostname, port) +abstract class RemoteActor(address: InetSocketAddress) extends Actor { + def this(hostname: String, port: Int) = this(new InetSocketAddress(hostname, port)) + self.makeRemote(address) } /** @@ -56,6 +58,7 @@ class ActorStartException private[akka](message: String) extends RuntimeExceptio class IllegalActorStateException private[akka](message: String) extends RuntimeException(message) class ActorKilledException private[akka](message: String) extends RuntimeException(message) class ActorInitializationException private[akka](message: String) extends RuntimeException(message) +class ActorTimeoutException private[akka](message: String) extends RuntimeException(message) /** * Actor factory module with factory methods for creating various kinds of Actors. @@ -73,9 +76,9 @@ object Actor extends Logging { type Receive = PartialFunction[Any, Unit] private[actor] val actorRefInCreation = new scala.util.DynamicVariable[Option[ActorRef]](None) - + /** - * Creates a Actor.actorOf out of the Actor with type T. + * Creates an ActorRef out of the Actor with type T. *
    *   import Actor._
    *   val actor = actorOf[MyActor]
@@ -91,7 +94,7 @@ object Actor extends Logging {
   def actorOf[T <: Actor : Manifest]: ActorRef = new LocalActorRef(manifest[T].erasure.asInstanceOf[Class[_ <: Actor]])
 
   /**
-   * Creates a Actor.actorOf out of the Actor. Allows you to pass in a factory function
+   * Creates an ActorRef out of the Actor. Allows you to pass in a factory function
    * that creates the Actor. Please note that this function can be invoked multiple
    * times if for example the Actor is supervised and needs to be restarted.
    * 

diff --git a/akka-core/src/main/scala/actor/ActorRef.scala b/akka-core/src/main/scala/actor/ActorRef.scala index ca55d6c1fa..3cc9878129 100644 --- a/akka-core/src/main/scala/actor/ActorRef.scala +++ b/akka-core/src/main/scala/actor/ActorRef.scala @@ -457,57 +457,41 @@ trait ActorRef extends TransactionManagement { * If the 'trapExit' member field has been set to at contain at least one exception class then it will * 'trap' these exceptions and automatically restart the linked actors according to the restart strategy * defined by the 'faultHandler'. - *

- * To be invoked from within the actor itself. */ def link(actorRef: ActorRef): Unit /** * Unlink the actor. - *

- * To be invoked from within the actor itself. */ def unlink(actorRef: ActorRef): Unit /** * Atomically start and link an actor. - *

- * To be invoked from within the actor itself. */ def startLink(actorRef: ActorRef): Unit /** * Atomically start, link and make an actor remote. - *

- * To be invoked from within the actor itself. */ def startLinkRemote(actorRef: ActorRef, hostname: String, port: Int): Unit /** * Atomically create (from actor class) and start an actor. - *

- * To be invoked from within the actor itself. */ def spawn[T <: Actor : Manifest]: ActorRef /** * Atomically create (from actor class), start and make an actor remote. - *

- * To be invoked from within the actor itself. */ def spawnRemote[T <: Actor: Manifest](hostname: String, port: Int): ActorRef /** * Atomically create (from actor class), start and link an actor. - *

- * To be invoked from within the actor itself. */ def spawnLink[T <: Actor: Manifest]: ActorRef /** * Atomically create (from actor class), start, link and make an actor remote. - *

- * To be invoked from within the actor itself. */ def spawnLinkRemote[T <: Actor : Manifest](hostname: String, port: Int): ActorRef @@ -587,7 +571,7 @@ trait ActorRef extends TransactionManagement { * * @author Jonas Bonér */ -sealed class LocalActorRef private[akka]( +class LocalActorRef private[akka]( private[this] var actorFactory: Either[Option[Class[_ <: Actor]], Option[() => Actor]] = Left(None)) extends ActorRef { diff --git a/akka-core/src/main/scala/actor/UntypedActor.scala b/akka-core/src/main/scala/actor/UntypedActor.scala index 7e45dee40d..3026369202 100644 --- a/akka-core/src/main/scala/actor/UntypedActor.scala +++ b/akka-core/src/main/scala/actor/UntypedActor.scala @@ -4,31 +4,577 @@ package se.scalablesolutions.akka.actor +import se.scalablesolutions.akka.dispatch._ +import se.scalablesolutions.akka.stm.global._ +import se.scalablesolutions.akka.config.{AllForOneStrategy, OneForOneStrategy, FaultHandlingStrategy} +import se.scalablesolutions.akka.config.ScalaConfig._ + +import java.net.InetSocketAddress + /** - * FIXME: document + * Subclass this abstract class to create a MDB-style untyped actor. + *

+ * This class is meant to be used from Java. + *

+ * Here is an example on how to create and use an UntypedActor: + *

+ *  public class SampleUntypedActor extends UntypedActor {
+ *    public void onReceive(Object message, UntypedActorRef context) throws Exception {
+ *      if (message instanceof String) {
+ *        String msg = (String)message;
+ *
+ *	      if (msg.equals("UseReply")) {
+ *	  	    // Reply to original sender of message using the 'replyUnsafe' method
+ *	  	    context.replyUnsafe(msg + ":" + context.getUuid());
+ *
+ *	      } else if (msg.equals("UseSender") && context.getSender().isDefined()) {	
+ *	  	    // Reply to original sender of message using the sender reference
+ *	  	    // also passing along my own refererence (the context)
+ *	  	    context.getSender().get().sendOneWay(msg, context); 
+ *
+ *        } else if (msg.equals("UseSenderFuture") && context.getSenderFuture().isDefined()) {	
+ *	  	    // Reply to original sender of message using the sender future reference
+ *	  	    context.getSenderFuture().get().completeWithResult(msg);
+ *
+ *        } else if (msg.equals("SendToSelf")) {
+ *	  	    // Send message to the actor itself recursively
+ *	  	    context.sendOneWay(msg)
+ *
+ *        } else if (msg.equals("ForwardMessage")) {
+ *          // Retreive an actor from the ActorRegistry by ID and get an ActorRef back
+ *          ActorRef actorRef = ActorRegistry.actorsFor("some-actor-id").head();
+ *          // Wrap the ActorRef in an UntypedActorRef and forward the message to this actor
+ *          UntypedActorRef.wrap(actorRef).forward(msg, context);
+ *
+ *        } else throw new IllegalArgumentException("Unknown message: " + message);
+ *      } else throw new IllegalArgumentException("Unknown message: " + message);
+ *    }
+ *   
+ *    public static void main(String[] args) {
+ *      UntypedActorRef actor = UntypedActor.actorOf(SampleUntypedActor.class);
+ *      actor.start();
+ *      actor.sendOneWay("SendToSelf");
+ *      actor.stop();
+ *    }
+ *  }
+ * 
* * @author Jonas Bonér */ abstract class UntypedActor extends Actor { - protected[akka] var context: Option[ActorContext] = None + protected[akka] var context: Option[UntypedActorRef] = None - protected def receive = { + final protected def receive = { case msg => if (context.isEmpty) { - val ctx = new ActorContext(self) + val ctx = new UntypedActorRef(self) context = Some(ctx) onReceive(msg, ctx) } else onReceive(msg, context.get) } - def onReceive(message: Any, context: ActorContext): Unit + @throws(classOf[Exception]) + def onReceive(message: Any, context: UntypedActorRef): Unit } /** - * FIXME: document + * Implements the Transactor abstraction. E.g. a transactional UntypedActor. * * @author Jonas Bonér */ -class ActorContext(self: ActorRef) { +abstract class UntypedTransactor extends UntypedActor { + self.makeTransactionRequired +} + +/** + * Extend this abstract class to create a remote UntypedActor. + * + * @author Jonas Bonér + */ +abstract class RemoteUntypedActor(address: InetSocketAddress) extends UntypedActor { + def this(hostname: String, port: Int) = this(new InetSocketAddress(hostname, port)) + self.makeRemote(address) +} + +/** + * Factory object for creating and managing 'UntypedActor's. Meant to be used from Java. + *

+ * Example on how to create an actor: + *

+ *   ActorRef actor = UntypedActor.actorOf(MyUntypedActor.class);
+ *   actor.start();
+ *   actor.sendOneWay(message, context)
+ *   actor.stop();
+ * 
+ * You can create and start the actor in one statement like this: + *
+ *   ActorRef actor = UntypedActor.actorOf(MyUntypedActor.class).start();
+ * 
+ * + * @author Jonas Bonér + */ +object UntypedActor { -} \ No newline at end of file + /** + * Creates an ActorRef out of the Actor. Allows you to pass in the class for the Actor. + *

+ * Example in Java: + *

+   *   ActorRef actor = UntypedActor.actorOf(MyUntypedActor.class);
+   *   actor.start();
+   *   actor.sendOneWay(message, context)
+   *   actor.stop();
+   * 
+ * You can create and start the actor in one statement like this: + *
+   *   ActorRef actor = UntypedActor.actorOf(MyUntypedActor.class).start();
+   * 
+ */ + def actorOf(clazz: Class[_]): UntypedActorRef = { + if (!clazz.isInstanceOf[Class[_ <: UntypedActor]]) throw new IllegalArgumentException( + "Class [" + clazz.getName + "] passed into the 'actorOf' factory method needs to be assignable from 'UntypedActor'") + UntypedActorRef.wrap(new LocalActorRef(() => clazz.newInstance.asInstanceOf[Actor])) + } + + /** + * NOTE: Use this convenience method with care, do NOT make it possible to get a reference to the + * UntypedActor instance directly, but only through its 'UntypedActorRef' wrapper reference. + *

+ * Creates an ActorRef out of the Actor. Allows you to pass in the instance for the Actor. Only + * use this method when you need to pass in constructor arguments into the 'UntypedActor'. + *

+ * Example in Java: + *

+   *   ActorRef actor = UntypedActor.actorOf(new MyUntypedActor("service:name", 5));
+   *   actor.start();
+   *   actor.sendOneWay(message, context)
+   *   actor.stop();
+   * 
+ * You can create and start the actor in one statement like this: + *
+   *   ActorRef actor = UntypedActor.actorOf(MyUntypedActor.class).start();
+   * 
+ */ + def actorOf(actorInstance: UntypedActor): UntypedActorRef = UntypedActorRef.wrap(new LocalActorRef(() => actorInstance)) +} + +/** + * Use this class if you need to wrap an 'ActorRef' in the more Java-friendly 'UntypedActorRef'. + * + * @author Jonas Bonér + */ +object UntypedActorRef { + def wrap(actorRef: ActorRef) = new UntypedActorRef(actorRef) +} + +/** + * A Java-friendly wrapper class around the 'ActorRef'. + * + * @author Jonas Bonér + */ +class UntypedActorRef(val actorRef: ActorRef) { + + /** + * Returns the uuid for the actor. + */ + def getUuid(): String = actorRef.uuid + + /** + * Identifier for actor, does not have to be a unique one. Default is the 'uuid'. + *

+ * This field is used for logging, AspectRegistry.actorsFor(id), identifier for remote + * actor in RemoteServer etc.But also as the identifier for persistence, which means + * that you can use a custom name to be able to retrieve the "correct" persisted state + * upon restart, remote restart etc. + */ + def setId(id: String) = actorRef.id = id + def getId(): String = actorRef.id + + /** + * Defines the default timeout for '!!' and '!!!' invocations, + * e.g. the timeout for the future returned by the call to '!!' and '!!!'. + */ + def setTimeout(timeout: Long) = actorRef.timeout = timeout + def getTimeout(): Long = actorRef.timeout + + /** + * Defines the default timeout for an initial receive invocation. + * When specified, the receive function should be able to handle a 'ReceiveTimeout' message. + */ + def setReceiveTimeout(timeout: Long) = actorRef.receiveTimeout = Some(timeout) + def getReceiveTimeout(): Option[Long] = actorRef.receiveTimeout + + /** + * Set 'trapExit' to the list of exception classes that the actor should be able to trap + * from the actor it is supervising. When the supervising actor throws these exceptions + * then they will trigger a restart. + *

+ * + * Trap all exceptions: + *

+   * context.setTrapExit(new Class[]{Throwable.class});
+   * 
+ * + * Trap specific exceptions only: + *
+   * context.setTrapExit(new Class[]{MyApplicationException.class, MyApplicationError.class});
+   * 
+ */ + def setTrapExit(exceptions: Array[Class[_ <: Throwable]]) = actorRef.trapExit = exceptions.toList + def getTrapExit(): Array[Class[_ <: Throwable]] = actorRef.trapExit.toArray + + /** + * If 'trapExit' is set for the actor to act as supervisor, then a 'faultHandler' must be defined. + *

+ * Can be one of: + *

+   *  context.setFaultHandler(new AllForOneStrategy(maxNrOfRetries, withinTimeRange));
+   * 
+ * Or: + *
+   *  context.setFaultHandler(new OneForOneStrategy(maxNrOfRetries, withinTimeRange));
+   * 
+ */ + def setFaultHandler(handler: FaultHandlingStrategy) = actorRef.faultHandler = Some(handler) + def getFaultHandler(): Option[FaultHandlingStrategy] = actorRef.faultHandler + + /** + * Defines the life-cycle for a supervised actor. + */ + def setLifeCycle(lifeCycle: LifeCycle) = actorRef.lifeCycle = Some(lifeCycle) + def getLifeCycle(): Option[LifeCycle] = actorRef.lifeCycle + + /** + * The default dispatcher is the Dispatchers.globalExecutorBasedEventDrivenDispatcher();. + * This means that all actors will share the same event-driven executor based dispatcher. + *

+ * You can override it so it fits the specific use-case that the actor is used for. + * See the se.scalablesolutions.akka.dispatch.Dispatchers class for the different + * dispatchers available. + *

+ * The default is also that all actors that are created and spawned from within this actor + * is sharing the same dispatcher as its creator. + */ + def setDispatcher(dispatcher: MessageDispatcher) = actorRef.dispatcher = dispatcher + def getDispatcher(): MessageDispatcher = actorRef.dispatcher + + /** + * The reference sender Actor of the last received message. + * Is defined if the message was sent from another Actor, else None. + */ + def getSender(): Option[UntypedActorRef] = actorRef.sender match { + case Some(s) => Some(UntypedActorRef.wrap(s)) + case None => None + } + + /** + * The reference sender future of the last received message. + * Is defined if the message was sent with sent with 'sendRequestReply' or 'sendRequestReplyFuture', else None. + */ + def getSenderFuture(): Option[CompletableFuture[Any]] = actorRef.senderFuture + + /** + * Starts up the actor and its message queue. + */ + def start(): UntypedActorRef = UntypedActorRef.wrap(actorRef.start) + + /** + * Shuts down the actor its dispatcher and message queue. + * Alias for 'stop'. + */ + def exit() = stop() + + /** + * Shuts down the actor its dispatcher and message queue. + */ + def stop(): Unit = actorRef.stop() + + /** + * Sends a one-way asynchronous message. E.g. fire-and-forget semantics. + *

+ *

+   *   actor.sendOneWay(message);
+   * 
+ *

+ */ + def sendOneWay(message: AnyRef) = actorRef.!(message)(None) + + /** + * Sends a one-way asynchronous message. E.g. fire-and-forget semantics. + *

+ * Allows you to pass along the sender of the messag. + *

+ *

+   *   actor.sendOneWay(message, context);
+   * 
+ *

+ */ + def sendOneWay(message: AnyRef, sender: UntypedActorRef) = + if (sender eq null) actorRef.!(message)(None) + else actorRef.!(message)(Some(sender.actorRef)) + + /** + * Sends a message asynchronously and waits on a future for a reply message under the hood. The timeout is taken from + * the default timeout in the Actor. + *

+ * It waits on the reply either until it receives it or until the timeout expires + * (which will throw an ActorTimeoutException). E.g. send-and-receive-eventually semantics. + *

+ * NOTE: + * Use this method with care. In most cases it is better to use 'sendOneWay' together with 'context.getSender()' to + * implement request/response message exchanges. + *

+ * If you are sending messages using sendRequestReply then you have to use context.reply(..) + * to send a reply message to the original sender. If not then the sender will block until the timeout expires. + */ + def sendRequestReply(message: AnyRef): AnyRef = + actorRef.!!(message)(None).getOrElse(throw new ActorTimeoutException( + "Message [" + message + + "]\n\tsent to [" + actorRef.actorClassName + + "]\n\twith timeout [" + actorRef.timeout + + "]\n\ttimed out.")) + .asInstanceOf[AnyRef] + + /** + * Sends a message asynchronously and waits on a future for a reply message under the hood. The timeout is taken from + * the default timeout in the Actor. + *

+ * It waits on the reply either until it receives it or until the timeout expires + * (which will throw an ActorTimeoutException). E.g. send-and-receive-eventually semantics. + *

+ * NOTE: + * Use this method with care. In most cases it is better to use 'sendOneWay' together with 'context.getSender()' to + * implement request/response message exchanges. + *

+ * If you are sending messages using sendRequestReply then you have to use context.reply(..) + * to send a reply message to the original sender. If not then the sender will block until the timeout expires. + */ + def sendRequestReply(message: AnyRef, sender: UntypedActorRef): AnyRef = { + val result = if (sender eq null) actorRef.!!(message)(None) + else actorRef.!!(message)(Some(sender.actorRef)) + result.getOrElse(throw new ActorTimeoutException( + "Message [" + message + + "]\n\tsent to [" + actorRef.actorClassName + + "]\n\tfrom [" + sender.actorRef.actorClassName + + "]\n\twith timeout [" + actorRef.timeout + + "]\n\ttimed out.")) + .asInstanceOf[AnyRef] + } + + /** + * Sends a message asynchronously and waits on a future for a reply message under the hood. + *

+ * It waits on the reply either until it receives it or until the timeout expires + * (which will throw an ActorTimeoutException). E.g. send-and-receive-eventually semantics. + *

+ * NOTE: + * Use this method with care. In most cases it is better to use 'sendOneWay' together with 'context.getSender()' to + * implement request/response message exchanges. + *

+ * If you are sending messages using sendRequestReply then you have to use context.reply(..) + * to send a reply message to the original sender. If not then the sender will block until the timeout expires. + */ + def sendRequestReply(message: AnyRef, timeout: Long): AnyRef = + actorRef.!!(message, timeout)(None).getOrElse(throw new ActorTimeoutException( + "Message [" + message + + "]\n\tsent to [" + actorRef.actorClassName + + "]\n\twith timeout [" + timeout + + "]\n\ttimed out.")) + .asInstanceOf[AnyRef] + + /** + * Sends a message asynchronously and waits on a future for a reply message under the hood. + *

+ * It waits on the reply either until it receives it or until the timeout expires + * (which will throw an ActorTimeoutException). E.g. send-and-receive-eventually semantics. + *

+ * NOTE: + * Use this method with care. In most cases it is better to use 'sendOneWay' together with 'context.getSender()' to + * implement request/response message exchanges. + *

+ * If you are sending messages using sendRequestReply then you have to use context.reply(..) + * to send a reply message to the original sender. If not then the sender will block until the timeout expires. + */ + def sendRequestReply(message: AnyRef, timeout: Long, sender: UntypedActorRef): AnyRef = { + val result = if (sender eq null) actorRef.!!(message, timeout)(None) + else actorRef.!!(message)(Some(sender.actorRef)) + result.getOrElse(throw new ActorTimeoutException( + "Message [" + message + + "]\n\tsent to [" + actorRef.actorClassName + + "]\n\tfrom [" + sender.actorRef.actorClassName + + "]\n\twith timeout [" + timeout + + "]\n\ttimed out.")) + .asInstanceOf[AnyRef] + } + + /** + * Sends a message asynchronously returns a future holding the eventual reply message. The timeout is taken from + * the default timeout in the Actor. + *

+ * NOTE: + * Use this method with care. In most cases it is better to use 'sendOneWay' together with the 'context.getSender()' to + * implement request/response message exchanges. + *

+ * If you are sending messages using sendRequestReplyFuture then you have to use context.reply(..) + * to send a reply message to the original sender. If not then the sender will block until the timeout expires. + */ + def sendRequestReplyFuture(message: AnyRef): Future[_] = actorRef.!!!(message)(None) + + /** + * Sends a message asynchronously returns a future holding the eventual reply message. The timeout is taken from + * the default timeout in the Actor. + *

+ * NOTE: + * Use this method with care. In most cases it is better to use 'sendOneWay' together with the 'context.getSender()' to + * implement request/response message exchanges. + *

+ * If you are sending messages using sendRequestReplyFuture then you have to use context.reply(..) + * to send a reply message to the original sender. If not then the sender will block until the timeout expires. + */ + def sendRequestReplyFuture(message: AnyRef, sender: UntypedActorRef): Future[_] = + if (sender eq null) actorRef.!!!(message)(None) + else actorRef.!!!(message)(Some(sender.actorRef)) + + /** + * Sends a message asynchronously returns a future holding the eventual reply message. + *

+ * NOTE: + * Use this method with care. In most cases it is better to use 'sendOneWay' together with the 'context.getSender()' to + * implement request/response message exchanges. + *

+ * If you are sending messages using sendRequestReplyFuture then you have to use context.reply(..) + * to send a reply message to the original sender. If not then the sender will block until the timeout expires. + */ + def sendRequestReplyFuture(message: AnyRef, timeout: Long): Future[_] = actorRef.!!!(message, timeout)(None) + + /** + * Sends a message asynchronously returns a future holding the eventual reply message. + *

+ * NOTE: + * Use this method with care. In most cases it is better to use 'sendOneWay' together with the 'context.getSender()' to + * implement request/response message exchanges. + *

+ * If you are sending messages using sendRequestReplyFuture then you have to use context.reply(..) + * to send a reply message to the original sender. If not then the sender will block until the timeout expires. + */ + def sendRequestReplyFuture(message: AnyRef, timeout: Long, sender: UntypedActorRef): Future[_] = + if (sender eq null) actorRef.!!!(message, timeout)(None) + else actorRef.!!!(message)(Some(sender.actorRef)) + + /** + * Forwards the message and passes the original sender actor as the sender. + *

+ * Works with 'sendOneWay', 'sendRequestReply' and 'sendRequestReplyFuture'. + */ + def forward(message: AnyRef, sender: UntypedActorRef): Unit = + if (sender eq null) throw new IllegalArgumentException("The 'sender' argument to 'forward' can't be null") + else actorRef.forward(message)(Some(sender.actorRef)) + + /** + * Use context.replyUnsafe(..) to reply with a message to the original sender of the message currently + * being processed. + *

+ * Throws an IllegalStateException if unable to determine what to reply to. + */ + def replyUnsafe(message: AnyRef): Unit = actorRef.reply(message) + + /** + * Use context.replySafe(..) to reply with a message to the original sender of the message currently + * being processed. + *

+ * Returns true if reply was sent, and false if unable to determine what to reply to. + */ + def replySafe(message: AnyRef): Boolean = actorRef.reply_?(message) + + /** + * Returns the class for the Actor instance that is managed by the ActorRef. + */ + def getActorClass(): Class[_ <: Actor] = actorRef.actorClass + + /** + * Returns the class name for the Actor instance that is managed by the ActorRef. + */ + def getActorClassName(): String = actorRef.actorClassName + + /** + * Invoking 'makeRemote' means that an actor will be moved to and invoked on a remote host. + */ + def makeRemote(hostname: String, port: Int): Unit = actorRef.makeRemote(hostname, port) + + /** + * Invoking 'makeRemote' means that an actor will be moved to and invoked on a remote host. + */ + def makeRemote(address: InetSocketAddress): Unit = actorRef.makeRemote(address) + + /** + * Invoking 'makeTransactionRequired' means that the actor will **start** a new transaction if non exists. + * However, it will always participate in an existing transaction. + */ + def makeTransactionRequired(): Unit = actorRef.makeTransactionRequired + + /** + * Sets the transaction configuration for this actor. Needs to be invoked before the actor is started. + */ + def setTransactionConfig(config: TransactionConfig): Unit = actorRef.transactionConfig = config + + /** + * Get the transaction configuration for this actor. + */ + def getTransactionConfig(): TransactionConfig = actorRef.transactionConfig + + /** + * Gets the remote address for the actor, if any, else None. + */ + def getRemoteAddress(): Option[InetSocketAddress] = actorRef.remoteAddress + + /** + * Returns the home address and port for this actor. + */ + def getHomeAddress(): InetSocketAddress = actorRef.homeAddress + + /** + * Set the home address and port for this actor. + */ + def setHomeAddress(hostnameAndPort: Tuple2[String, Int]): Unit = actorRef.homeAddress = hostnameAndPort + + /** + * Set the home address and port for this actor. + */ + def setHomeAddress(address: InetSocketAddress): Unit = actorRef.homeAddress = address + + /** + * Links an other actor to this actor. Links are unidirectional and means that a the linking actor will + * receive a notification if the linked actor has crashed. + *

+ * If the 'trapExit' member field has been set to at contain at least one exception class then it will + * 'trap' these exceptions and automatically restart the linked actors according to the restart strategy + * defined by the 'faultHandler'. + */ + def link(actor: UntypedActorRef): Unit = actorRef.link(actor.actorRef) + + /** + * Unlink the actor. + */ + def unlink(actor: UntypedActorRef): Unit = actorRef.unlink(actor.actorRef) + + /** + * Atomically start and link an actor. + */ + def startLink(actor: UntypedActorRef): Unit = actorRef.startLink(actor.actorRef) + + /** + * Atomically start, link and make an actor remote. + */ + def startLinkRemote(actor: UntypedActorRef, hostname: String, port: Int): Unit = + actorRef.startLinkRemote(actor.actorRef, hostname, port) + + /** + * Returns the mailbox size. + */ + def getMailboxSize(): Int = actorRef.mailboxSize + + /** + * Returns the current supervisor if there is one, null if not. + */ + def getSupervisor(): UntypedActorRef = UntypedActorRef.wrap(actorRef.supervisor.getOrElse(null)) +} diff --git a/akka-core/src/test/java/se/scalablesolutions/akka/actor/ReplyUntypedActor.java b/akka-core/src/test/java/se/scalablesolutions/akka/actor/ReplyUntypedActor.java new file mode 100644 index 0000000000..c50e1e8b3d --- /dev/null +++ b/akka-core/src/test/java/se/scalablesolutions/akka/actor/ReplyUntypedActor.java @@ -0,0 +1,28 @@ +package se.scalablesolutions.akka.actor; + +import se.scalablesolutions.akka.actor.*; + +public class ReplyUntypedActor extends UntypedActor { + public void onReceive(Object message, UntypedActorRef context) throws Exception { + if (message instanceof String) { + String str = (String)message; + + if (str.equals("ReplyToSendOneWayUsingReply")) { + context.replyUnsafe("Reply"); + } else if (str.equals("ReplyToSendOneWayUsingSender")) { + context.getSender().get().sendOneWay("Reply"); + + } else if (str.equals("ReplyToSendRequestReplyUsingReply")) { + context.replyUnsafe("Reply"); + } else if (str.equals("ReplyToSendRequestReplyUsingFuture")) { + context.getSenderFuture().get().completeWithResult("Reply"); + + } else if (str.equals("ReplyToSendRequestReplyFutureUsingReply")) { + context.replyUnsafe("Reply"); + } else if (str.equals("ReplyToSendRequestReplyFutureUsingFuture")) { + context.getSenderFuture().get().completeWithResult("Reply"); + + } else throw new IllegalArgumentException("Unknown message: " + str); + } else throw new IllegalArgumentException("Unknown message: " + message); + } +} diff --git a/akka-core/src/test/java/se/scalablesolutions/akka/actor/SampleUntypedActor.java b/akka-core/src/test/java/se/scalablesolutions/akka/actor/SampleUntypedActor.java new file mode 100644 index 0000000000..41f125dd98 --- /dev/null +++ b/akka-core/src/test/java/se/scalablesolutions/akka/actor/SampleUntypedActor.java @@ -0,0 +1,53 @@ +/** + * Copyright (C) 2009-2010 Scalable Solutions AB + */ +package se.scalablesolutions.akka.actor; + +import se.scalablesolutions.akka.actor.*; + +/** + * Here is an example on how to create and use an UntypedActor. + * + * @author Jonas Bonér + */ +public class SampleUntypedActor extends UntypedActor { + + public void onReceive(Object message, UntypedActorRef context) throws Exception { + if (message instanceof String) { + String msg = (String)message; + System.out.println("Received message: " + msg); + + if (msg.equals("UseReply")) { + // Reply to original sender of message using the 'replyUnsafe' method + context.replyUnsafe(msg + ":" + context.getUuid()); + + } else if (msg.equals("UseSender") && context.getSender().isDefined()) { + // Reply to original sender of message using the sender reference + // also passing along my own refererence (the context) + context.getSender().get().sendOneWay(msg, context); + + } else if (msg.equals("UseSenderFuture") && context.getSenderFuture().isDefined()) { + // Reply to original sender of message using the sender future reference + context.getSenderFuture().get().completeWithResult(msg); + + } else if (msg.equals("SendToSelf")) { + // Send fire-forget message to the actor itself recursively + context.sendOneWay(msg); + + } else if (msg.equals("ForwardMessage")) { + // Retreive an actor from the ActorRegistry by ID and get an ActorRef back + ActorRef actorRef = ActorRegistry.actorsFor("some-actor-id").head(); + // Wrap the ActorRef in an UntypedActorRef and forward the message to this actor + UntypedActorRef.wrap(actorRef).forward(msg, context); + + } else throw new IllegalArgumentException("Unknown message: " + message); + } else throw new IllegalArgumentException("Unknown message: " + message); + } + + public static void main(String[] args) { + UntypedActorRef actor = UntypedActor.actorOf(SampleUntypedActor.class); + actor.start(); + actor.sendOneWay("SendToSelf"); + actor.stop(); + } +} diff --git a/akka-core/src/test/java/se/scalablesolutions/akka/actor/SenderUntypedActor.java b/akka-core/src/test/java/se/scalablesolutions/akka/actor/SenderUntypedActor.java new file mode 100644 index 0000000000..84adc5da7a --- /dev/null +++ b/akka-core/src/test/java/se/scalablesolutions/akka/actor/SenderUntypedActor.java @@ -0,0 +1,44 @@ +package se.scalablesolutions.akka.actor; + +import se.scalablesolutions.akka.actor.*; +import se.scalablesolutions.akka.dispatch.CompletableFuture; + +public class SenderUntypedActor extends UntypedActor { + private UntypedActorRef replyActor = null; + + public void onReceive(Object message, UntypedActorRef context) throws Exception { + if (message instanceof UntypedActorRef) replyActor = (UntypedActorRef)message; + else if (message instanceof String) { + if (replyActor == null) throw new IllegalStateException("Need to receive a ReplyUntypedActor before any other message."); + String str = (String)message; + + if (str.equals("ReplyToSendOneWayUsingReply")) { + replyActor.sendOneWay("ReplyToSendOneWayUsingReply", context); + } else if (str.equals("ReplyToSendOneWayUsingSender")) { + replyActor.sendOneWay("ReplyToSendOneWayUsingSender", context); + + } else if (str.equals("ReplyToSendRequestReplyUsingReply")) { + UntypedActorTestState.log = (String)replyActor.sendRequestReply("ReplyToSendRequestReplyUsingReply", context); + UntypedActorTestState.finished.await(); + } else if (str.equals("ReplyToSendRequestReplyUsingFuture")) { + UntypedActorTestState.log = (String)replyActor.sendRequestReply("ReplyToSendRequestReplyUsingFuture", context); + UntypedActorTestState.finished.await(); + + } else if (str.equals("ReplyToSendRequestReplyFutureUsingReply")) { + CompletableFuture future = (CompletableFuture)replyActor.sendRequestReplyFuture("ReplyToSendRequestReplyFutureUsingReply", context); + future.await(); + UntypedActorTestState.log = (String)future.result().get(); + UntypedActorTestState.finished.await(); + } else if (str.equals("ReplyToSendRequestReplyFutureUsingFuture")) { + CompletableFuture future = (CompletableFuture)replyActor.sendRequestReplyFuture("ReplyToSendRequestReplyFutureUsingFuture", context); + future.await(); + UntypedActorTestState.log = (String)future.result().get(); + UntypedActorTestState.finished.await(); + + } else if (str.equals("Reply")) { + UntypedActorTestState.log = "Reply"; + UntypedActorTestState.finished.await(); + } + } + } +} diff --git a/akka-core/src/test/java/se/scalablesolutions/akka/actor/TestUntypedActor.java b/akka-core/src/test/java/se/scalablesolutions/akka/actor/TestUntypedActor.java deleted file mode 100644 index 7a1d2982e9..0000000000 --- a/akka-core/src/test/java/se/scalablesolutions/akka/actor/TestUntypedActor.java +++ /dev/null @@ -1,9 +0,0 @@ -package se.scalablesolutions.akka.actor; - -import se.scalablesolutions.akka.actor.*; - -public class TestUntypedActor extends UntypedActor { - public void onReceive(Object message, ActorContext context) { - System.out.println("TestUntypedActor got " + message); - } -} \ No newline at end of file diff --git a/akka-core/src/test/java/se/scalablesolutions/akka/actor/UntypedActorTestState.java b/akka-core/src/test/java/se/scalablesolutions/akka/actor/UntypedActorTestState.java new file mode 100644 index 0000000000..b94c5870fd --- /dev/null +++ b/akka-core/src/test/java/se/scalablesolutions/akka/actor/UntypedActorTestState.java @@ -0,0 +1,10 @@ +package se.scalablesolutions.akka.actor; + +import se.scalablesolutions.akka.actor.*; + +import java.util.concurrent.CyclicBarrier; + +public class UntypedActorTestState { + public static String log = "NIL"; + public static CyclicBarrier finished = null; +} diff --git a/akka-core/src/test/scala/actor/untyped-actor/UntypedActorFireForgetRequestReplySpec.scala b/akka-core/src/test/scala/actor/untyped-actor/UntypedActorFireForgetRequestReplySpec.scala new file mode 100644 index 0000000000..89a05eca9c --- /dev/null +++ b/akka-core/src/test/scala/actor/untyped-actor/UntypedActorFireForgetRequestReplySpec.scala @@ -0,0 +1,86 @@ +package se.scalablesolutions.akka.actor + +import java.util.concurrent.{TimeUnit, CyclicBarrier, TimeoutException} + +import org.scalatest.WordSpec +import org.scalatest.matchers.MustMatchers + +import se.scalablesolutions.akka.dispatch.Dispatchers +import Actor._ + +class UntypedActorFireForgetRequestReplySpec extends WordSpec with MustMatchers { + + "An UntypedActor" should { + "reply to message sent with 'sendOneWay' using 'reply'" in { + UntypedActorTestState.finished = new CyclicBarrier(2); + UntypedActorTestState.log = "NIL"; + val replyActor = UntypedActor.actorOf(classOf[ReplyUntypedActor]).start + val senderActor = UntypedActor.actorOf(classOf[SenderUntypedActor]).start + senderActor.sendOneWay(replyActor) + senderActor.sendOneWay("ReplyToSendOneWayUsingReply") + try { UntypedActorTestState.finished.await(1L, TimeUnit.SECONDS) } + catch { case e: TimeoutException => fail("Never got the message") } + UntypedActorTestState.log must be ("Reply") + } + + "reply to message sent with 'sendOneWay' using 'sender' reference" in { + UntypedActorTestState.finished = new CyclicBarrier(2); + UntypedActorTestState.log = "NIL"; + val replyActor = UntypedActor.actorOf(classOf[ReplyUntypedActor]).start + val senderActor = UntypedActor.actorOf(classOf[SenderUntypedActor]).start + senderActor.sendOneWay(replyActor) + senderActor.sendOneWay("ReplyToSendOneWayUsingSender") + try { UntypedActorTestState.finished.await(1L, TimeUnit.SECONDS) } + catch { case e: TimeoutException => fail("Never got the message") } + UntypedActorTestState.log must be ("Reply") + } + + "reply to message sent with 'sendRequestReply' using 'reply'" in { + UntypedActorTestState.finished = new CyclicBarrier(2); + UntypedActorTestState.log = "NIL"; + val replyActor = UntypedActor.actorOf(classOf[ReplyUntypedActor]).start + val senderActor = UntypedActor.actorOf(classOf[SenderUntypedActor]).start + senderActor.sendOneWay(replyActor) + senderActor.sendOneWay("ReplyToSendRequestReplyUsingReply") + try { UntypedActorTestState.finished.await(1L, TimeUnit.SECONDS) } + catch { case e: TimeoutException => fail("Never got the message") } + UntypedActorTestState.log must be ("Reply") + } + + "reply to message sent with 'sendRequestReply' using 'sender future' reference" in { + UntypedActorTestState.finished = new CyclicBarrier(2); + UntypedActorTestState.log = "NIL"; + val replyActor = UntypedActor.actorOf(classOf[ReplyUntypedActor]).start + val senderActor = UntypedActor.actorOf(classOf[SenderUntypedActor]).start + senderActor.sendOneWay(replyActor) + senderActor.sendOneWay("ReplyToSendRequestReplyUsingFuture") + try { UntypedActorTestState.finished.await(1L, TimeUnit.SECONDS) } + catch { case e: TimeoutException => fail("Never got the message") } + UntypedActorTestState.log must be ("Reply") + } + + "reply to message sent with 'sendRequestReplyFuture' using 'reply'" in { + UntypedActorTestState.finished = new CyclicBarrier(2); + UntypedActorTestState.log = "NIL"; + val replyActor = UntypedActor.actorOf(classOf[ReplyUntypedActor]).start + val senderActor = UntypedActor.actorOf(classOf[SenderUntypedActor]).start + senderActor.sendOneWay(replyActor) + senderActor.sendOneWay("ReplyToSendRequestReplyFutureUsingReply") + try { UntypedActorTestState.finished.await(1L, TimeUnit.SECONDS) } + catch { case e: TimeoutException => fail("Never got the message") } + UntypedActorTestState.log must be ("Reply") + } + + "reply to message sent with 'sendRequestReplyFuture' using 'sender future' reference" in { + UntypedActorTestState.finished = new CyclicBarrier(2); + UntypedActorTestState.log = "NIL"; + val replyActor = UntypedActor.actorOf(classOf[ReplyUntypedActor]).start + val senderActor = UntypedActor.actorOf(classOf[SenderUntypedActor]).start + senderActor.sendOneWay(replyActor) + senderActor.sendOneWay("ReplyToSendRequestReplyFutureUsingFuture") + try { UntypedActorTestState.finished.await(1L, TimeUnit.SECONDS) } + catch { case e: TimeoutException => fail("Never got the message") } + UntypedActorTestState.log must be ("Reply") + } + } +}