diff --git a/akka-camel/src/main/scala/component/ActorComponent.scala b/akka-camel/src/main/scala/component/ActorComponent.scala
index c1421535d0..f5304607b0 100644
--- a/akka-camel/src/main/scala/component/ActorComponent.scala
+++ b/akka-camel/src/main/scala/component/ActorComponent.scala
@@ -22,7 +22,7 @@ import scala.reflect.BeanProperty
import CamelMessageConversion.toExchangeAdapter
import java.lang.Throwable
-import se.scalablesolutions.akka.actor.{ActorRegistry, Actor, ActorRef}
+import se.scalablesolutions.akka.actor.{ScalaActorRef, ActorRegistry, Actor, ActorRef}
/**
* Camel component for sending messages to and receiving replies from actors.
@@ -197,7 +197,7 @@ private[akka] object AsyncCallbackAdapter {
*
* @author Martin Krasser
*/
-private[akka] class AsyncCallbackAdapter(exchange: Exchange, callback: AsyncCallback) extends ActorRef {
+private[akka] class AsyncCallbackAdapter(exchange: Exchange, callback: AsyncCallback) extends ActorRef with ScalaActorRef {
def start = {
_isRunning = true
diff --git a/akka-core/src/main/scala/actor/Actor.scala b/akka-core/src/main/scala/actor/Actor.scala
index 0f1e2c00cc..eaaf60b9f0 100644
--- a/akka-core/src/main/scala/actor/Actor.scala
+++ b/akka-core/src/main/scala/actor/Actor.scala
@@ -104,7 +104,24 @@ object Actor extends Logging {
* val actor = actorOf[MyActor].start
*
*/
- def actorOf[T <: Actor : Manifest]: ActorRef = new LocalActorRef(manifest[T].erasure.asInstanceOf[Class[_ <: Actor]])
+ def actorOf[T <: Actor : Manifest]: ActorRef = actorOf(manifest[T].erasure.asInstanceOf[Class[_ <: Actor]])
+
+ /**
+ * Creates an ActorRef out of the Actor with type T.
+ *
+ * import Actor._
+ * val actor = actorOf[MyActor]
+ * actor.start
+ * actor ! message
+ * actor.stop
+ *
+ * You can create and start the actor in one statement like this:
+ *
+ * val actor = actorOf[MyActor].start
+ *
+ */
+ def actorOf(clazz: Class[_ <: Actor]): ActorRef = new LocalActorRef(clazz)
+
/**
* Creates an ActorRef out of the Actor. Allows you to pass in a factory function
@@ -365,7 +382,7 @@ trait Actor extends Logging {
* self.stop(..)
*
*/
- @transient val self: ActorRef = someSelf.get
+ @transient val self: ScalaActorRef = someSelf.get
/**
* User overridable callback/setting.
diff --git a/akka-core/src/main/scala/actor/ActorRef.scala b/akka-core/src/main/scala/actor/ActorRef.scala
index 2fd04a992b..464b562bee 100644
--- a/akka-core/src/main/scala/actor/ActorRef.scala
+++ b/akka-core/src/main/scala/actor/ActorRef.scala
@@ -65,7 +65,8 @@ import java.util.concurrent.{ScheduledFuture, ConcurrentHashMap, TimeUnit}
*
* @author Jonas Bonér
*/
-trait ActorRef extends TransactionManagement with java.lang.Comparable[ActorRef] {
+trait ActorRef extends ActorRefShared with TransactionManagement with java.lang.Comparable[ActorRef] {
+ scalaRef: ScalaActorRef =>
// Only mutable for RemoteServer in order to maintain identity across nodes
@volatile protected[akka] var _uuid = UUID.newUuid.toString
@@ -110,64 +111,14 @@ trait ActorRef extends TransactionManagement with java.lang.Comparable[ActorRef]
def setReceiveTimeout(timeout: Long) = this.receiveTimeout = Some(timeout)
def getReceiveTimeout(): Option[Long] = receiveTimeout
- /**
- * User overridable callback/setting.
- *
- *
- * Set trapExit to the list of exception classes that the actor should be able to trap
- * from the actor it is supervising. When the supervising actor throws these exceptions
- * then they will trigger a restart.
- *
- *
- * Trap no exceptions:
- *
- * trapExit = Nil
- *
- *
- * Trap all exceptions:
- *
- * trapExit = List(classOf[Throwable])
- *
- *
- * Trap specific exceptions only:
- *
- * trapExit = List(classOf[MyApplicationException], classOf[MyApplicationError])
- *
- */
- @volatile var trapExit: List[Class[_ <: Throwable]] = Nil
-
//Java methods
def setTrapExit(exceptions: Array[Class[_ <: Throwable]]) = trapExit = exceptions.toList
def getTrapExit(): Array[Class[_ <: Throwable]] = trapExit.toArray
-
- /**
- * User overridable callback/setting.
- *
- * If 'trapExit' is set for the actor to act as supervisor, then a faultHandler must be defined.
- *
- * Can be one of:
- *
- * faultHandler = Some(AllForOneStrategy(maxNrOfRetries, withinTimeRange))
- *
- * Or:
- *
- * faultHandler = Some(OneForOneStrategy(maxNrOfRetries, withinTimeRange))
- *
- */
- @volatile var faultHandler: Option[FaultHandlingStrategy] = None
-
//Java Methods
def setFaultHandler(handler: FaultHandlingStrategy) = this.faultHandler = Some(handler)
def getFaultHandler(): Option[FaultHandlingStrategy] = faultHandler
- /**
- * User overridable callback/setting.
- *
- * Defines the life-cycle for a supervised actor.
- */
- @volatile var lifeCycle: Option[LifeCycle] = None
-
//Java Methods
def setLifeCycle(lifeCycle: LifeCycle) = this.lifeCycle = Some(lifeCycle)
def getLifeCycle(): Option[LifeCycle] = lifeCycle
@@ -235,19 +186,9 @@ trait ActorRef extends TransactionManagement with java.lang.Comparable[ActorRef]
/**
* Returns the uuid for the actor.
*/
+ def getUuid() = _uuid
def uuid = _uuid
- /**
- * The reference sender Actor of the last received message.
- * Is defined if the message was sent from another Actor, else None.
- */
- def sender: Option[ActorRef] = {
- // Five lines of map-performance-avoidance, could be just: currentMessage map { _.sender }
- val msg = currentMessage
- if(msg.isEmpty) None
- else msg.get.sender
- }
-
//Java Methods
def getSender(): Option[ActorRef] = sender
@@ -255,12 +196,7 @@ trait ActorRef extends TransactionManagement with java.lang.Comparable[ActorRef]
* The reference sender future of the last received message.
* Is defined if the message was sent with sent with '!!' or '!!!', else None.
*/
- def senderFuture(): Option[CompletableFuture[Any]] = {
- // Five lines of map-performance-avoidance, could be just: currentMessage map { _.senderFuture }
- val msg = currentMessage
- if(msg.isEmpty) None
- else msg.get.senderFuture
- }
+ def getSenderFuture(): Option[CompletableFuture[Any]] = senderFuture
/**
* Is the actor being restarted?
@@ -287,62 +223,10 @@ trait ActorRef extends TransactionManagement with java.lang.Comparable[ActorRef]
*/
protected[akka] def uuid_=(uid: String) = _uuid = uid
- /**
- * Sends a one-way asynchronous message. E.g. fire-and-forget semantics.
- *
- *
- * If invoked from within an actor then the actor reference is implicitly passed on as the implicit 'sender' argument.
- *
- *
- * This actor 'sender' reference is then available in the receiving actor in the 'sender' member variable,
- * if invoked from within an Actor. If not then no sender is available.
- *
- * actor ! message
- *
- *
- */
- def !(message: Any)(implicit sender: Option[ActorRef] = None): Unit = {
- if (isRunning) postMessageToMailbox(message, sender)
- else throw new ActorInitializationException(
- "Actor has not been started, you need to invoke 'actor.start' before using it")
- }
-
//Java Methods
def sendOneWay(message: AnyRef): Unit = sendOneWay(message,null)
def sendOneWay(message: AnyRef, sender: ActorRef): Unit = this.!(message)(Option(sender))
- /**
- * Sends a message asynchronously and waits on a future for a reply message.
- *
- * It waits on the reply either until it receives it (in the form of Some(replyMessage))
- * or until the timeout expires (which will return None). E.g. send-and-receive-eventually semantics.
- *
- * NOTE:
- * Use this method with care. In most cases it is better to use '!' together with the 'sender' member field to
- * implement request/response message exchanges.
- * If you are sending messages using !! then you have to use self.reply(..)
- * to send a reply message to the original sender. If not then the sender will block until the timeout expires.
- */
- def !!(message: Any, timeout: Long = this.timeout)(implicit sender: Option[ActorRef] = None): Option[Any] = {
- if (isRunning) {
- val future = postMessageToMailboxAndCreateFutureResultWithTimeout[Any](message, timeout, sender, None)
- val isTypedActor = message.isInstanceOf[Invocation]
- if (isTypedActor && message.asInstanceOf[Invocation].isVoid) {
- future.asInstanceOf[CompletableFuture[Option[_]]].completeWithResult(None)
- }
- try {
- future.await
- } catch {
- case e: FutureTimeoutException =>
- if (isTypedActor) throw e
- else None
- }
- if (future.exception.isDefined) throw future.exception.get
- else future.result
- } else throw new ActorInitializationException(
- "Actor has not been started, you need to invoke 'actor.start' before using it")
- }
-
//Java Methods
def sendRequestReply(message: AnyRef): AnyRef = sendRequestReply(message,timeout,null)
def sendRequestReply(message: AnyRef, sender: ActorRef): AnyRef = sendRequestReply(message,timeout,sender)
@@ -356,78 +240,20 @@ trait ActorRef extends TransactionManagement with java.lang.Comparable[ActorRef]
.asInstanceOf[AnyRef]
}
- /**
- * Sends a message asynchronously returns a future holding the eventual reply message.
- *
- * NOTE:
- * Use this method with care. In most cases it is better to use '!' together with the 'sender' member field to
- * implement request/response message exchanges.
- * If you are sending messages using !!! then you have to use self.reply(..)
- * to send a reply message to the original sender. If not then the sender will block until the timeout expires.
- */
- def !!(implicit sender: Option[ActorRef] = None): Future[T] = {
- if (isRunning) postMessageToMailboxAndCreateFutureResultWithTimeout[T](message, timeout, sender, None)
- else throw new ActorInitializationException(
- "Actor has not been started, you need to invoke 'actor.start' before using it")
- }
-
//Java Methods
def sendRequestReplyFuture(message: AnyRef): Future[_] = sendRequestReplyFuture(message,timeout,null)
def sendRequestReplyFuture(message: AnyRef, sender: ActorRef): Future[_] = sendRequestReplyFuture(message,timeout,sender)
def sendRequestReplyFuture(message: AnyRef, timeout: Long, sender: ActorRef): Future[_] = !!!(message,timeout)(Option(sender))
- /**
- * Forwards the message and passes the original sender actor as the sender.
- *
- * Works with '!', '!!' and '!!!'.
- */
- def forward(message: Any)(implicit sender: Some[ActorRef]) = {
- if (isRunning) {
- if (sender.get.senderFuture.isDefined) postMessageToMailboxAndCreateFutureResultWithTimeout(
- message, timeout, sender.get.sender, sender.get.senderFuture)
- else if (sender.get.sender.isDefined) postMessageToMailbox(message, Some(sender.get.sender.get))
- else throw new IllegalActorStateException("Can't forward message when initial sender is not an actor")
- } else throw new ActorInitializationException("Actor has not been started, you need to invoke 'actor.start' before using it")
- }
-
//Java Methods
def forward(message: AnyRef, sender: ActorRef): Unit =
if (sender eq null) throw new IllegalArgumentException("The 'sender' argument to 'forward' can't be null")
else forward(message)(Some(sender))
- /**
- * Use self.reply(..) to reply with a message to the original sender of the message currently
- * being processed.
- *
- * Throws an IllegalStateException if unable to determine what to reply to.
- */
- def reply(message: Any) = if(!reply_?(message)) throw new IllegalActorStateException(
- "\n\tNo sender in scope, can't reply. " +
- "\n\tYou have probably: " +
- "\n\t\t1. Sent a message to an Actor from an instance that is NOT an Actor." +
- "\n\t\t2. Invoked a method on an TypedActor from an instance NOT an TypedActor." +
- "\n\tElse you might want to use 'reply_?' which returns Boolean(true) if succes and Boolean(false) if no sender in scope")
-
//Java Methods
def replyUnsafe(message: AnyRef) = reply(message)
- /**
- * Use reply_?(..) to reply with a message to the original sender of the message currently
- * being processed.
- *
- * Returns true if reply was sent, and false if unable to determine what to reply to.
- */
- def reply_?(message: Any): Boolean = {
- if (senderFuture.isDefined) {
- senderFuture.get completeWithResult message
- true
- } else if (sender.isDefined) {
- sender.get ! message
- true
- } else false
- }
-
//Java Methods
def replySafe(message: AnyRef): Boolean = reply_?(message)
@@ -570,39 +396,15 @@ trait ActorRef extends TransactionManagement with java.lang.Comparable[ActorRef]
*/
def startLinkRemote(actorRef: ActorRef, hostname: String, port: Int): Unit
- /**
- * Atomically create (from actor class) and start an actor.
- */
- def spawn[T <: Actor : Manifest]: ActorRef =
- spawn(manifest[T].erasure.asInstanceOf[Class[_ <: Actor]])
-
//Java Methods
def spawn(clazz: Class[_ <: Actor]): ActorRef
- /**
- * Atomically create (from actor class), start and make an actor remote.
- */
- def spawnRemote[T <: Actor: Manifest](hostname: String, port: Int): ActorRef =
- spawnRemote(manifest[T].erasure.asInstanceOf[Class[_ <: Actor]],hostname,port)
-
//Java Methods
def spawnRemote(clazz: Class[_ <: Actor], hostname: String, port: Int): ActorRef
- /**
- * Atomically create (from actor class), start and link an actor.
- */
- def spawnLink[T <: Actor: Manifest]: ActorRef =
- spawnLink(manifest[T].erasure.asInstanceOf[Class[_ <: Actor]])
-
//Java Methods
def spawnLink(clazz: Class[_ <: Actor]): ActorRef
- /**
- * Atomically create (from actor class), start, link and make an actor remote.
- */
- def spawnLinkRemote[T <: Actor : Manifest](hostname: String, port: Int): ActorRef =
- spawnLinkRemote(manifest[T].erasure.asInstanceOf[Class[_ <: Actor]],hostname,port)
-
//Java Methods
def spawnLinkRemote(clazz: Class[_ <: Actor], hostname: String, port: Int): ActorRef
@@ -623,12 +425,6 @@ trait ActorRef extends TransactionManagement with java.lang.Comparable[ActorRef]
//Java Methods
def getSupervisor(): ActorRef = supervisor getOrElse null
-
- /**
- * Shuts down and removes all linked actors.
- */
- def shutdownLinkedActors(): Unit
-
protected[akka] def invoke(messageHandle: MessageInvocation): Unit
protected[akka] def postMessageToMailbox(message: Any, senderOption: Option[ActorRef]): Unit
@@ -694,7 +490,7 @@ trait ActorRef extends TransactionManagement with java.lang.Comparable[ActorRef]
*/
class LocalActorRef private[akka](
private[this] var actorFactory: Either[Option[Class[_ <: Actor]], Option[() => Actor]] = Left(None))
- extends ActorRef {
+ extends ActorRef with ScalaActorRef {
@volatile private[akka] var _remoteAddress: Option[InetSocketAddress] = None // only mutable to maintain identity across nodes
@volatile private[akka] var _linkedActors: Option[ConcurrentHashMap[String, ActorRef]] = None
@@ -1386,7 +1182,7 @@ object RemoteActorSystemMessage {
private[akka] case class RemoteActorRef private[akka] (
uuuid: String, val className: String, val hostname: String, val port: Int, _timeout: Long, loader: Option[ClassLoader])
// uuid: String, className: String, hostname: String, port: Int, timeOut: Long, isOnRemoteHost: Boolean) extends ActorRef {
- extends ActorRef {
+ extends ActorRef with ScalaActorRef {
_uuid = uuuid
timeout = _timeout
@@ -1461,3 +1257,249 @@ private[akka] case class RemoteActorRef private[akka] (
private def unsupported = throw new UnsupportedOperationException("Not supported for RemoteActorRef")
}
+
+/**
+ * This trait represents the common (external) methods for all ActorRefs
+ * Needed because implicit conversions aren't applied when instance imports are used
+ *
+ * i.e.
+ * var self: ScalaActorRef = ...
+ * import self._
+ * //can't call ActorRef methods here unless they are declared in a common
+ * //superclass, which ActorRefShared is.
+ */
+trait ActorRefShared {
+ /**
+ * Returns the uuid for the actor.
+ */
+ def uuid: String
+
+ /**
+ * Shuts down and removes all linked actors.
+ */
+ def shutdownLinkedActors(): Unit
+}
+
+/**
+ * This trait represents the Scala Actor API
+ * There are implicit conversions in ../actor/Implicits.scala
+ * from ActorRef -> ScalaActorRef and back
+ */
+trait ScalaActorRef extends ActorRefShared { ref: ActorRef =>
+
+ def id: String
+ def id_=(id: String):Unit
+
+ /**
+ * User overridable callback/setting.
+ *
+ * Defines the life-cycle for a supervised actor.
+ */
+ @volatile var lifeCycle: Option[LifeCycle] = None
+
+ /**
+ * User overridable callback/setting.
+ *
+ *
+ * Set trapExit to the list of exception classes that the actor should be able to trap
+ * from the actor it is supervising. When the supervising actor throws these exceptions
+ * then they will trigger a restart.
+ *
+ *
+ * Trap no exceptions:
+ *
+ * trapExit = Nil
+ *
+ *
+ * Trap all exceptions:
+ *
+ * trapExit = List(classOf[Throwable])
+ *
+ *
+ * Trap specific exceptions only:
+ *
+ * trapExit = List(classOf[MyApplicationException], classOf[MyApplicationError])
+ *
+ */
+ @volatile var trapExit: List[Class[_ <: Throwable]] = Nil
+
+
+ /**
+ * User overridable callback/setting.
+ *
+ * If 'trapExit' is set for the actor to act as supervisor, then a faultHandler must be defined.
+ *
+ * Can be one of:
+ *
+ * faultHandler = Some(AllForOneStrategy(maxNrOfRetries, withinTimeRange))
+ *
+ * Or:
+ *
+ * faultHandler = Some(OneForOneStrategy(maxNrOfRetries, withinTimeRange))
+ *
+ */
+ @volatile var faultHandler: Option[FaultHandlingStrategy] = None
+
+
+ /**
+ * The reference sender Actor of the last received message.
+ * Is defined if the message was sent from another Actor, else None.
+ */
+ def sender: Option[ActorRef] = {
+ // Five lines of map-performance-avoidance, could be just: currentMessage map { _.sender }
+ val msg = currentMessage
+ if(msg.isEmpty) None
+ else msg.get.sender
+ }
+
+ /**
+ * The reference sender future of the last received message.
+ * Is defined if the message was sent with sent with '!!' or '!!!', else None.
+ */
+ def senderFuture(): Option[CompletableFuture[Any]] = {
+ // Five lines of map-performance-avoidance, could be just: currentMessage map { _.senderFuture }
+ val msg = currentMessage
+ if(msg.isEmpty) None
+ else msg.get.senderFuture
+ }
+
+
+ /**
+ * Sends a one-way asynchronous message. E.g. fire-and-forget semantics.
+ *
+ *
+ * If invoked from within an actor then the actor reference is implicitly passed on as the implicit 'sender' argument.
+ *
+ *
+ * This actor 'sender' reference is then available in the receiving actor in the 'sender' member variable,
+ * if invoked from within an Actor. If not then no sender is available.
+ *
+ * actor ! message
+ *
+ *
+ */
+ def !(message: Any)(implicit sender: Option[ActorRef] = None): Unit = {
+ if (isRunning) postMessageToMailbox(message, sender)
+ else throw new ActorInitializationException(
+ "Actor has not been started, you need to invoke 'actor.start' before using it")
+ }
+
+ /**
+ * Sends a message asynchronously and waits on a future for a reply message.
+ *
+ * It waits on the reply either until it receives it (in the form of Some(replyMessage))
+ * or until the timeout expires (which will return None). E.g. send-and-receive-eventually semantics.
+ *
+ * NOTE:
+ * Use this method with care. In most cases it is better to use '!' together with the 'sender' member field to
+ * implement request/response message exchanges.
+ * If you are sending messages using !! then you have to use self.reply(..)
+ * to send a reply message to the original sender. If not then the sender will block until the timeout expires.
+ */
+ def !!(message: Any, timeout: Long = this.timeout)(implicit sender: Option[ActorRef] = None): Option[Any] = {
+ if (isRunning) {
+ val future = postMessageToMailboxAndCreateFutureResultWithTimeout[Any](message, timeout, sender, None)
+ val isTypedActor = message.isInstanceOf[Invocation]
+ if (isTypedActor && message.asInstanceOf[Invocation].isVoid) {
+ future.asInstanceOf[CompletableFuture[Option[_]]].completeWithResult(None)
+ }
+ try {
+ future.await
+ } catch {
+ case e: FutureTimeoutException =>
+ if (isTypedActor) throw e
+ else None
+ }
+ if (future.exception.isDefined) throw future.exception.get
+ else future.result
+ } else throw new ActorInitializationException(
+ "Actor has not been started, you need to invoke 'actor.start' before using it")
+ }
+
+
+ /**
+ * Sends a message asynchronously returns a future holding the eventual reply message.
+ *
+ * NOTE:
+ * Use this method with care. In most cases it is better to use '!' together with the 'sender' member field to
+ * implement request/response message exchanges.
+ * If you are sending messages using !!! then you have to use self.reply(..)
+ * to send a reply message to the original sender. If not then the sender will block until the timeout expires.
+ */
+ def !!(implicit sender: Option[ActorRef] = None): Future[T] = {
+ if (isRunning) postMessageToMailboxAndCreateFutureResultWithTimeout[T](message, timeout, sender, None)
+ else throw new ActorInitializationException(
+ "Actor has not been started, you need to invoke 'actor.start' before using it")
+ }
+
+ /**
+ * Forwards the message and passes the original sender actor as the sender.
+ *
+ * Works with '!', '!!' and '!!!'.
+ */
+ def forward(message: Any)(implicit sender: Some[ActorRef]) = {
+ if (isRunning) {
+ if (sender.get.senderFuture.isDefined) postMessageToMailboxAndCreateFutureResultWithTimeout(
+ message, timeout, sender.get.sender, sender.get.senderFuture)
+ else if (sender.get.sender.isDefined) postMessageToMailbox(message, Some(sender.get.sender.get))
+ else throw new IllegalActorStateException("Can't forward message when initial sender is not an actor")
+ } else throw new ActorInitializationException("Actor has not been started, you need to invoke 'actor.start' before using it")
+ }
+
+ /**
+ * Use self.reply(..) to reply with a message to the original sender of the message currently
+ * being processed.
+ *
+ * Throws an IllegalStateException if unable to determine what to reply to.
+ */
+ def reply(message: Any) = if(!reply_?(message)) throw new IllegalActorStateException(
+ "\n\tNo sender in scope, can't reply. " +
+ "\n\tYou have probably: " +
+ "\n\t\t1. Sent a message to an Actor from an instance that is NOT an Actor." +
+ "\n\t\t2. Invoked a method on an TypedActor from an instance NOT an TypedActor." +
+ "\n\tElse you might want to use 'reply_?' which returns Boolean(true) if succes and Boolean(false) if no sender in scope")
+
+ /**
+ * Use reply_?(..) to reply with a message to the original sender of the message currently
+ * being processed.
+ *
+ * Returns true if reply was sent, and false if unable to determine what to reply to.
+ */
+ def reply_?(message: Any): Boolean = {
+ if (senderFuture.isDefined) {
+ senderFuture.get completeWithResult message
+ true
+ } else if (sender.isDefined) {
+ sender.get ! message
+ true
+ } else false
+ }
+
+
+
+ /**
+ * Atomically create (from actor class) and start an actor.
+ */
+ def spawn[T <: Actor : Manifest]: ActorRef =
+ spawn(manifest[T].erasure.asInstanceOf[Class[_ <: Actor]])
+
+ /**
+ * Atomically create (from actor class), start and make an actor remote.
+ */
+ def spawnRemote[T <: Actor: Manifest](hostname: String, port: Int): ActorRef =
+ spawnRemote(manifest[T].erasure.asInstanceOf[Class[_ <: Actor]],hostname,port)
+
+
+ /**
+ * Atomically create (from actor class), start and link an actor.
+ */
+ def spawnLink[T <: Actor: Manifest]: ActorRef =
+ spawnLink(manifest[T].erasure.asInstanceOf[Class[_ <: Actor]])
+
+
+ /**
+ * Atomically create (from actor class), start, link and make an actor remote.
+ */
+ def spawnLinkRemote[T <: Actor : Manifest](hostname: String, port: Int): ActorRef =
+ spawnLinkRemote(manifest[T].erasure.asInstanceOf[Class[_ <: Actor]],hostname,port)
+}
\ No newline at end of file
diff --git a/akka-core/src/main/scala/actor/Implicits.scala b/akka-core/src/main/scala/actor/Implicits.scala
new file mode 100644
index 0000000000..16bce4b016
--- /dev/null
+++ b/akka-core/src/main/scala/actor/Implicits.scala
@@ -0,0 +1,15 @@
+/**
+ * Copyright (C) 2009-2010 Scalable Solutions AB
+ */
+
+package se.scalablesolutions.akka
+
+import actor.{ScalaActorRef, ActorRef}
+
+package object actor {
+ implicit def actorRef2Scala(ref: ActorRef): ScalaActorRef =
+ ref.asInstanceOf[ScalaActorRef]
+
+ implicit def scala2ActorRef(ref: ScalaActorRef): ActorRef =
+ ref.asInstanceOf[ActorRef]
+}
\ No newline at end of file
diff --git a/akka-core/src/main/scala/actor/UntypedActor.scala b/akka-core/src/main/scala/actor/UntypedActor.scala
index 580320ec8d..e94ea94b3e 100644
--- a/akka-core/src/main/scala/actor/UntypedActor.scala
+++ b/akka-core/src/main/scala/actor/UntypedActor.scala
@@ -118,11 +118,9 @@ abstract class RemoteUntypedActor(address: InetSocketAddress) extends UntypedAct
* @author Jonas Bonér
*/
object UntypedActor {
-
/**
- * Creates an ActorRef out of the Actor. Allows you to pass in the class for the Actor.
- *
- * Example in Java:
+ * Creates an ActorRef out of the Actor type represented by the class provided.
+ * Example in Java:
*
* ActorRef actor = UntypedActor.actorOf(MyUntypedActor.class);
* actor.start();
@@ -131,13 +129,12 @@ object UntypedActor {
*
* You can create and start the actor in one statement like this:
*
- * ActorRef actor = UntypedActor.actorOf(MyUntypedActor.class).start();
+ * val actor = actorOf(classOf[MyActor]).start
*
*/
- def actorOf(clazz: Class[_ <: UntypedActor]): ActorRef =
- Actor.actorOf(clazz.newInstance)
+ def actorOf[T <: Actor](clazz: Class[T]): ActorRef = Actor.actorOf(clazz)
- /**
+ /**
* NOTE: Use this convenience method with care, do NOT make it possible to get a reference to the
* UntypedActor instance directly, but only through its 'ActorRef' wrapper reference.
*
@@ -148,8 +145,8 @@ object UntypedActor {
* Example in Java:
*
* ActorRef actor = UntypedActor.actorOf(new UntypedActorFactory() {
- * public UntypedActor create() {
- * return new MyUntypedActor("service:name", 5);
+ * public UntypedActor create() {
+ * return new MyUntypedActor("service:name", 5);
* }
* });
* actor.start();
@@ -157,6 +154,5 @@ object UntypedActor {
* actor.stop();
*
*/
- def actorOf(factory: UntypedActorFactory) =
- Actor.actorOf(factory.create)
+ def actorOf(factory: UntypedActorFactory): ActorRef = Actor.actorOf(factory.create)
}
\ No newline at end of file
diff --git a/akka-core/src/test/java/se/scalablesolutions/akka/actor/ReplyUntypedActor.java b/akka-core/src/test/java/se/scalablesolutions/akka/actor/ReplyUntypedActor.java
deleted file mode 100644
index 2e2d5c6189..0000000000
--- a/akka-core/src/test/java/se/scalablesolutions/akka/actor/ReplyUntypedActor.java
+++ /dev/null
@@ -1,33 +0,0 @@
-package se.scalablesolutions.akka.actor;
-
-import se.scalablesolutions.akka.actor.*;
-import se.scalablesolutions.akka.actor.UntypedActor;
-
-public class ReplyUntypedActor extends UntypedActor {
-
- public ReplyUntypedActor() {
- ActorRef actor = UntypedActor.untypedActorOf(ReplyUntypedActor.class)
- actor.setId("JavaNinja");
- actor.start()
- actor.sendOneWay("ReplyToSendOneWayUsingReply");
- }
-
- public void onReceive(Object message) throws Exception {
- if (message instanceof String) {
- String str = (String)message;
- if (str.equals("ReplyToSendOneWayUsingReply")) {
- getContext().replyUnsafe("Reply");
- } else if (str.equals("ReplyToSendOneWayUsingSender")) {
- getContext().getSender().get().sendOneWay("Reply");
- } else if (str.equals("ReplyToSendRequestReplyUsingReply")) {
- getContext().replyUnsafe("Reply");
- } else if (str.equals("ReplyToSendRequestReplyUsingFuture")) {
- getContext().getSenderFuture().get().completeWithResult("Reply");
- } else if (str.equals("ReplyToSendRequestReplyFutureUsingReply")) {
- getContext().replyUnsafe("Reply");
- } else if (str.equals("ReplyToSendRequestReplyFutureUsingFuture")) {
- getContext().getSenderFuture().get().completeWithResult("Reply");
- } else throw new IllegalArgumentException("Unknown message: " + str);
- } else throw new IllegalArgumentException("Unknown message: " + message);
- }
-}
diff --git a/akka-core/src/test/java/se/scalablesolutions/akka/actor/SampleUntypedActor.java b/akka-core/src/test/java/se/scalablesolutions/akka/actor/SampleUntypedActor.java
deleted file mode 100644
index add3b86061..0000000000
--- a/akka-core/src/test/java/se/scalablesolutions/akka/actor/SampleUntypedActor.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/**
- * Copyright (C) 2009-2010 Scalable Solutions AB
- */
-package se.scalablesolutions.akka.actor;
-
-import se.scalablesolutions.akka.actor.*;
-
-/**
- * Here is an example on how to create and use an UntypedActor.
- *
- * @author Jonas Bonér
- */
-public class SampleUntypedActor extends UntypedActor {
-
- public void onReceive(Object message) throws Exception {
- if (message instanceof String) {
- String msg = (String)message;
- System.out.println("Received message: " + msg);
-
- if (msg.equals("UseReply")) {
- // Reply to original sender of message using the 'replyUnsafe' method
- getContext().replyUnsafe(msg + ":" + getContext().getUuid());
-
- } else if (msg.equals("UseSender") && getContext().getSender().isDefined()) {
- // Reply to original sender of message using the sender reference
- // also passing along my own refererence (the context)
- getContext().getSender().get().sendOneWay(msg, getContext());
-
- } else if (msg.equals("UseSenderFuture") && getContext().getSenderFuture().isDefined()) {
- // Reply to original sender of message using the sender future reference
- getContext().getSenderFuture().get().completeWithResult(msg);
-
- } else if (msg.equals("SendToSelf")) {
- // Send fire-forget message to the actor recursively
- getContext().sendOneWay(msg);
-
- } else if (msg.equals("ForwardMessage")) {
- // Retreive an actor from the ActorRegistry by ID and get an ActorRef back
- ActorRef actorRef = ActorRegistry.actorsFor("some-actor-id")[0];
- // Wrap the ActorRef in an UntypedActorRef and forward the message to this actor
- UntypedActorRef.wrap(actorRef).forward(msg, getContext());
-
- } else throw new IllegalArgumentException("Unknown message: " + message);
- } else throw new IllegalArgumentException("Unknown message: " + message);
- }
-
- public static void main(String[] args) {
- UntypedActorRef actor = UntypedActor.actorOf(SampleUntypedActor.class);
- actor.start();
- actor.sendOneWay("SendToSelf");
- actor.stop();
- }
-}
diff --git a/akka-core/src/test/java/se/scalablesolutions/akka/actor/SenderUntypedActor.java b/akka-core/src/test/java/se/scalablesolutions/akka/actor/SenderUntypedActor.java
deleted file mode 100644
index 9c9519de12..0000000000
--- a/akka-core/src/test/java/se/scalablesolutions/akka/actor/SenderUntypedActor.java
+++ /dev/null
@@ -1,44 +0,0 @@
-package se.scalablesolutions.akka.actor;
-
-import se.scalablesolutions.akka.actor.*;
-import se.scalablesolutions.akka.dispatch.CompletableFuture;
-
-public class SenderUntypedActor extends UntypedActor {
- private UntypedActorRef replyActor = null;
-
- public void onReceive(Object message) throws Exception {
- if (message instanceof UntypedActorRef) replyActor = (UntypedActorRef)message;
- else if (message instanceof String) {
- if (replyActor == null) throw new IllegalStateException("Need to receive a ReplyUntypedActor before any other message.");
- String str = (String)message;
-
- if (str.equals("ReplyToSendOneWayUsingReply")) {
- replyActor.sendOneWay("ReplyToSendOneWayUsingReply", getContext());
- } else if (str.equals("ReplyToSendOneWayUsingSender")) {
- replyActor.sendOneWay("ReplyToSendOneWayUsingSender", getContext());
-
- } else if (str.equals("ReplyToSendRequestReplyUsingReply")) {
- UntypedActorTestState.log = (String)replyActor.sendRequestReply("ReplyToSendRequestReplyUsingReply", getContext());
- UntypedActorTestState.finished.await();
- } else if (str.equals("ReplyToSendRequestReplyUsingFuture")) {
- UntypedActorTestState.log = (String)replyActor.sendRequestReply("ReplyToSendRequestReplyUsingFuture", getContext());
- UntypedActorTestState.finished.await();
-
- } else if (str.equals("ReplyToSendRequestReplyFutureUsingReply")) {
- CompletableFuture future = (CompletableFuture)replyActor.sendRequestReplyFuture("ReplyToSendRequestReplyFutureUsingReply", getContext());
- future.await();
- UntypedActorTestState.log = (String)future.result().get();
- UntypedActorTestState.finished.await();
- } else if (str.equals("ReplyToSendRequestReplyFutureUsingFuture")) {
- CompletableFuture future = (CompletableFuture)replyActor.sendRequestReplyFuture("ReplyToSendRequestReplyFutureUsingFuture", getContext());
- future.await();
- UntypedActorTestState.log = (String)future.result().get();
- UntypedActorTestState.finished.await();
-
- } else if (str.equals("Reply")) {
- UntypedActorTestState.log = "Reply";
- UntypedActorTestState.finished.await();
- }
- }
- }
-}
diff --git a/akka-core/src/test/java/se/scalablesolutions/akka/actor/UntypedActorTestState.java b/akka-core/src/test/java/se/scalablesolutions/akka/actor/UntypedActorTestState.java
deleted file mode 100644
index b94c5870fd..0000000000
--- a/akka-core/src/test/java/se/scalablesolutions/akka/actor/UntypedActorTestState.java
+++ /dev/null
@@ -1,10 +0,0 @@
-package se.scalablesolutions.akka.actor;
-
-import se.scalablesolutions.akka.actor.*;
-
-import java.util.concurrent.CyclicBarrier;
-
-public class UntypedActorTestState {
- public static String log = "NIL";
- public static CyclicBarrier finished = null;
-}
diff --git a/akka-core/src/test/scala/actor/untyped-actor/ForwardUntypedActorSpec.scala b/akka-core/src/test/scala/actor/untyped-actor/ForwardUntypedActorSpec.scala
deleted file mode 100644
index 26d3ba370e..0000000000
--- a/akka-core/src/test/scala/actor/untyped-actor/ForwardUntypedActorSpec.scala
+++ /dev/null
@@ -1,80 +0,0 @@
-package se.scalablesolutions.akka.actor
-
-import java.util.concurrent.{TimeUnit, CountDownLatch}
-import org.scalatest.junit.JUnitSuite
-import org.junit.Test
-
-import UntypedActor._
-
-object ForwardUntypedActorSpec {
- object ForwardState {
- var sender: Option[UntypedActorRef] = None
- }
-
- class ReceiverUntypedActor extends UntypedActor {
- //println(getClass + ":" + toString + " => " + getContext)
- val latch = new CountDownLatch(1)
- def onReceive(message: Any){
- println(getClass.getName + " got " + message)
- message match {
- case "SendBang" => {
- ForwardState.sender = getContext.getSender
- latch.countDown
- }
- case "SendBangBang" => getContext.replyUnsafe("SendBangBang")
- case x => throw new IllegalArgumentException("Unknown message: " + x);
- }
- }
- }
-
- class ForwardUntypedActor extends UntypedActor {
- val receiverActor = actorOf(classOf[ReceiverUntypedActor]).start
- def onReceive(message: Any){
- message match {
- case "SendBang" => receiverActor.forward("SendBang",getContext)
- case "SendBangBang" => receiverActor.forward("SendBangBang",getContext)
- }
- }
- }
-
- class BangSenderUntypedActor extends UntypedActor {
- val forwardActor = actorOf(classOf[ForwardUntypedActor]).start
- forwardActor.sendOneWay("SendBang",getContext)
- def onReceive(message: Any) = ()
- }
-
- class BangBangSenderUntypedActor extends UntypedActor {
- val latch: CountDownLatch = new CountDownLatch(1)
- val forwardActor = actorOf(classOf[ForwardUntypedActor]).start
- (forwardActor sendRequestReply "SendBangBang") match {
- case _ => latch.countDown
- }
- def onReceive(message: Any) = ()
- }
-}
-
-class ForwardUntypedActorSpec extends JUnitSuite {
- import ForwardUntypedActorSpec._
-
- @Test
- def shouldForwardUntypedActorReferenceWhenInvokingForwardOnBang {
- val senderActor = actorOf(classOf[BangSenderUntypedActor])
- val latch = senderActor.actorRef.actor.asInstanceOf[BangSenderUntypedActor]
- .forwardActor.actorRef.actor.asInstanceOf[ForwardUntypedActor]
- .receiverActor.actorRef.actor.asInstanceOf[ReceiverUntypedActor]
- .latch
-
- senderActor.start
- assert(latch.await(5L, TimeUnit.SECONDS))
- println(senderActor.actorRef.toString + " " + ForwardState.sender.get.actorRef.toString)
- assert(ForwardState.sender ne null)
- assert(senderActor.actorRef.toString === ForwardState.sender.get.actorRef.toString)
- }
-
- @Test
- def shouldForwardUntypedActorReferenceWhenInvokingForwardOnBangBang {
- val senderActor = actorOf(classOf[BangBangSenderUntypedActor]).start
- val latch = senderActor.actorRef.actor.asInstanceOf[BangBangSenderUntypedActor].latch
- assert(latch.await(1L, TimeUnit.SECONDS))
- }
-}
diff --git a/akka-core/src/test/scala/actor/untyped-actor/UntypedActorFireForgetRequestReplySpec.scala b/akka-core/src/test/scala/actor/untyped-actor/UntypedActorFireForgetRequestReplySpec.scala
deleted file mode 100644
index bb883142d6..0000000000
--- a/akka-core/src/test/scala/actor/untyped-actor/UntypedActorFireForgetRequestReplySpec.scala
+++ /dev/null
@@ -1,102 +0,0 @@
-package se.scalablesolutions.akka.actor
-
-import java.util.concurrent.{TimeUnit, CyclicBarrier, TimeoutException}
-
-import org.scalatest.WordSpec
-import org.scalatest.matchers.MustMatchers
-
-import se.scalablesolutions.akka.dispatch.Dispatchers
-import Actor._
-
-class UntypedActorFireForgetRequestReplySpec extends WordSpec with MustMatchers {
-
- "An UntypedActor" should {
- "be able to be created using the UntypedActor.actorOf(UntypedActorFactory) factory method" in {
- UntypedActorTestState.finished = new CyclicBarrier(2);
- UntypedActorTestState.log = "NIL";
- val replyActor = UntypedActor.actorOf(new UntypedActorFactory() {
- def create: UntypedActor = new ReplyUntypedActor
- }).start
- val senderActor = UntypedActor.actorOf(new UntypedActorFactory() {
- def create: UntypedActor = new SenderUntypedActor
- }).start
- senderActor.sendOneWay(replyActor)
- senderActor.sendOneWay("ReplyToSendOneWayUsingReply")
- try { UntypedActorTestState.finished.await(1L, TimeUnit.SECONDS) }
- catch { case e: TimeoutException => fail("Never got the message") }
- UntypedActorTestState.log must be ("Reply")
- }
-
- "reply to message sent with 'sendOneWay' using 'reply'" in {
- UntypedActorTestState.finished = new CyclicBarrier(2);
- UntypedActorTestState.log = "NIL";
- val replyActor = UntypedActor.actorOf(classOf[ReplyUntypedActor]).start
- val senderActor = UntypedActor.actorOf(classOf[SenderUntypedActor]).start
- senderActor.sendOneWay(replyActor)
- senderActor.sendOneWay("ReplyToSendOneWayUsingReply")
- try { UntypedActorTestState.finished.await(1L, TimeUnit.SECONDS) }
- catch { case e: TimeoutException => fail("Never got the message") }
- UntypedActorTestState.log must be ("Reply")
- }
-
- "reply to message sent with 'sendOneWay' using 'sender' reference" in {
- UntypedActorTestState.finished = new CyclicBarrier(2);
- UntypedActorTestState.log = "NIL";
- val replyActor = UntypedActor.actorOf(classOf[ReplyUntypedActor]).start
- val senderActor = UntypedActor.actorOf(classOf[SenderUntypedActor]).start
- senderActor.sendOneWay(replyActor)
- senderActor.sendOneWay("ReplyToSendOneWayUsingSender")
- try { UntypedActorTestState.finished.await(1L, TimeUnit.SECONDS) }
- catch { case e: TimeoutException => fail("Never got the message") }
- UntypedActorTestState.log must be ("Reply")
- }
-
- "reply to message sent with 'sendRequestReply' using 'reply'" in {
- UntypedActorTestState.finished = new CyclicBarrier(2);
- UntypedActorTestState.log = "NIL";
- val replyActor = UntypedActor.actorOf(classOf[ReplyUntypedActor]).start
- val senderActor = UntypedActor.actorOf(classOf[SenderUntypedActor]).start
- senderActor.sendOneWay(replyActor)
- senderActor.sendOneWay("ReplyToSendRequestReplyUsingReply")
- try { UntypedActorTestState.finished.await(1L, TimeUnit.SECONDS) }
- catch { case e: TimeoutException => fail("Never got the message") }
- UntypedActorTestState.log must be ("Reply")
- }
-
- "reply to message sent with 'sendRequestReply' using 'sender future' reference" in {
- UntypedActorTestState.finished = new CyclicBarrier(2);
- UntypedActorTestState.log = "NIL";
- val replyActor = UntypedActor.actorOf(classOf[ReplyUntypedActor]).start
- val senderActor = UntypedActor.actorOf(classOf[SenderUntypedActor]).start
- senderActor.sendOneWay(replyActor)
- senderActor.sendOneWay("ReplyToSendRequestReplyUsingFuture")
- try { UntypedActorTestState.finished.await(1L, TimeUnit.SECONDS) }
- catch { case e: TimeoutException => fail("Never got the message") }
- UntypedActorTestState.log must be ("Reply")
- }
-
- "reply to message sent with 'sendRequestReplyFuture' using 'reply'" in {
- UntypedActorTestState.finished = new CyclicBarrier(2);
- UntypedActorTestState.log = "NIL";
- val replyActor = UntypedActor.actorOf(classOf[ReplyUntypedActor]).start
- val senderActor = UntypedActor.actorOf(classOf[SenderUntypedActor]).start
- senderActor.sendOneWay(replyActor)
- senderActor.sendOneWay("ReplyToSendRequestReplyFutureUsingReply")
- try { UntypedActorTestState.finished.await(1L, TimeUnit.SECONDS) }
- catch { case e: TimeoutException => fail("Never got the message") }
- UntypedActorTestState.log must be ("Reply")
- }
-
- "reply to message sent with 'sendRequestReplyFuture' using 'sender future' reference" in {
- UntypedActorTestState.finished = new CyclicBarrier(2);
- UntypedActorTestState.log = "NIL";
- val replyActor = UntypedActor.actorOf(classOf[ReplyUntypedActor]).start
- val senderActor = UntypedActor.actorOf(classOf[SenderUntypedActor]).start
- senderActor.sendOneWay(replyActor)
- senderActor.sendOneWay("ReplyToSendRequestReplyFutureUsingFuture")
- try { UntypedActorTestState.finished.await(1L, TimeUnit.SECONDS) }
- catch { case e: TimeoutException => fail("Never got the message") }
- UntypedActorTestState.log must be ("Reply")
- }
- }
-}
diff --git a/akka-core/src/test/scala/actor/untyped-actor/UntypedActorReceiveTimeoutSpec.scala b/akka-core/src/test/scala/actor/untyped-actor/UntypedActorReceiveTimeoutSpec.scala
deleted file mode 100644
index 6ed8184514..0000000000
--- a/akka-core/src/test/scala/actor/untyped-actor/UntypedActorReceiveTimeoutSpec.scala
+++ /dev/null
@@ -1,59 +0,0 @@
-package se.scalablesolutions.akka.actor
-
-import org.scalatest.junit.JUnitSuite
-import org.junit.Test
-
-import java.util.concurrent.TimeUnit
-import org.multiverse.api.latches.StandardLatch
-import UntypedActor._
-
-object UntypedActorReceiveTimeoutSpec {
- object Tick
- class TestReceiveTimeoutActor extends UntypedActor {
- val latch = new StandardLatch
-
- def onReceive(message: Any):Unit = message match {
- case ReceiveTimeout => latch.open
- case Tick =>
- }
- }
-
- class FiveOhOhTestReceiveTimeoutActor extends TestReceiveTimeoutActor {
- getContext.setReceiveTimeout(500L)
- }
-}
-
-class UntypedActorReceiveTimeoutSpec extends JUnitSuite {
- import UntypedActorReceiveTimeoutSpec._
-
- @Test def receiveShouldGetTimeout = {
- val timeoutActor = actorOf(classOf[FiveOhOhTestReceiveTimeoutActor]).start
-
- assert(timeoutActor.actorRef.actor.asInstanceOf[TestReceiveTimeoutActor].latch.tryAwait(3, TimeUnit.SECONDS))
- }
-
- @Test def swappedReceiveShouldAlsoGetTimout = {
- val timeoutActor = actorOf(classOf[FiveOhOhTestReceiveTimeoutActor]).start
-
- assert(timeoutActor.actorRef.actor.asInstanceOf[TestReceiveTimeoutActor].latch.tryAwait(3, TimeUnit.SECONDS))
-
- val swappedLatch = new StandardLatch
- timeoutActor sendOneWay HotSwap(Some{
- case ReceiveTimeout => swappedLatch.open
- })
-
- assert(swappedLatch.tryAwait(3, TimeUnit.SECONDS))
- }
-
- @Test def timeoutShouldBeCancelledAfterRegularReceive = {
- val timeoutActor = actorOf(classOf[FiveOhOhTestReceiveTimeoutActor]).start
- timeoutActor sendOneWay Tick
- assert(timeoutActor.actorRef.actor.asInstanceOf[TestReceiveTimeoutActor].latch.tryAwait(1, TimeUnit.SECONDS) == false)
- }
-
- @Test def timeoutShouldNotBeSentWhenNotSpecified = {
- val timeoutLatch = new StandardLatch
- val timeoutActor = actorOf(classOf[TestReceiveTimeoutActor]).start
- assert(timeoutLatch.tryAwait(2, TimeUnit.SECONDS) == false)
- }
-}
diff --git a/akka-core/src/test/scala/actor/untyped-actor/UntypedTransactorSpec.scala b/akka-core/src/test/scala/actor/untyped-actor/UntypedTransactorSpec.scala
deleted file mode 100644
index 3ec707119c..0000000000
--- a/akka-core/src/test/scala/actor/untyped-actor/UntypedTransactorSpec.scala
+++ /dev/null
@@ -1,239 +0,0 @@
-package se.scalablesolutions.akka.actor
-
-import java.util.concurrent.{TimeUnit, CountDownLatch}
-import org.scalatest.junit.JUnitSuite
-import org.junit.Test
-
-import se.scalablesolutions.akka.stm.{Ref, TransactionalMap, TransactionalVector}
-import UntypedActor._
-
-object UntypedTransactorSpec {
- case class GetMapState(key: String)
- case object GetVectorState
- case object GetVectorSize
- case object GetRefState
-
- case class SetMapState(key: String, value: String)
- case class SetVectorState(key: String)
- case class SetRefState(key: String)
- case class Success(key: String, value: String)
- case class Failure(key: String, value: String, failer: UntypedActorRef)
-
- case class SetMapStateOneWay(key: String, value: String)
- case class SetVectorStateOneWay(key: String)
- case class SetRefStateOneWay(key: String)
- case class SuccessOneWay(key: String, value: String)
- case class FailureOneWay(key: String, value: String, failer: UntypedActorRef)
-
- case object GetNotifier
-}
-import UntypedTransactorSpec._
-
-class StatefulUntypedTransactor(expectedInvocationCount: Int) extends UntypedTransactor {
- def this() = this(0)
- getContext.setTimeout(5000)
-
- val notifier = new CountDownLatch(expectedInvocationCount)
-
- private val mapState = TransactionalMap[String, String]()
- private val vectorState = TransactionalVector[String]()
- private val refState = Ref[String]()
-
- def onReceive(message: Any) = message match {
- case GetNotifier =>
- getContext.replyUnsafe(notifier)
- case GetMapState(key) =>
- getContext.replyUnsafe(mapState.get(key).get)
- notifier.countDown
- case GetVectorSize =>
- getContext.replyUnsafe(vectorState.length.asInstanceOf[AnyRef])
- notifier.countDown
- case GetRefState =>
- getContext.replyUnsafe(refState.get)
- notifier.countDown
- case SetMapState(key, msg) =>
- mapState.put(key, msg)
- getContext.replyUnsafe(msg)
- notifier.countDown
- case SetVectorState(msg) =>
- vectorState.add(msg)
- getContext.replyUnsafe(msg)
- notifier.countDown
- case SetRefState(msg) =>
- refState.swap(msg)
- getContext.replyUnsafe(msg)
- notifier.countDown
- case Success(key, msg) =>
- mapState.put(key, msg)
- vectorState.add(msg)
- refState.swap(msg)
- getContext.replyUnsafe(msg)
- notifier.countDown
- case Failure(key, msg, failer) =>
- mapState.put(key, msg)
- vectorState.add(msg)
- refState.swap(msg)
- failer.sendRequestReply("Failure")
- getContext.replyUnsafe(msg)
- notifier.countDown
- case SetMapStateOneWay(key, msg) =>
- mapState.put(key, msg)
- notifier.countDown
- case SetVectorStateOneWay(msg) =>
- vectorState.add(msg)
- notifier.countDown
- case SetRefStateOneWay(msg) =>
- refState.swap(msg)
- notifier.countDown
- case SuccessOneWay(key, msg) =>
- mapState.put(key, msg)
- vectorState.add(msg)
- refState.swap(msg)
- notifier.countDown
- case FailureOneWay(key, msg, failer) =>
- mapState.put(key, msg)
- vectorState.add(msg)
- refState.swap(msg)
- notifier.countDown
- failer.sendOneWay("Failure",getContext)
- }
-}
-
-class StatefulUntypedTransactorExpectingTwoInvocations extends StatefulUntypedTransactor(2)
-
-@serializable
-class FailerUntypedTransactor extends UntypedTransactor {
-
- def onReceive(message: Any) = message match {
- case "Failure" =>
- throw new RuntimeException("Expected exception; to test fault-tolerance")
- }
-}
-
-class UntypedTransactorSpec extends JUnitSuite {
-
- @Test
- def shouldOneWayMapShouldNotRollbackStateForStatefulServerInCaseOfSuccess = {
- val stateful = actorOf(classOf[StatefulUntypedTransactorExpectingTwoInvocations]).start
- stateful sendOneWay SetMapStateOneWay("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "init") // set init state
- stateful sendOneWay SuccessOneWay("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") // transactionrequired
- val notifier = (stateful sendRequestReply GetNotifier).asInstanceOf[CountDownLatch]
- assert(notifier.await(1, TimeUnit.SECONDS))
- assert("new state" === (stateful sendRequestReply GetMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess")))
- }
-
- @Test
- def shouldMapShouldNotRollbackStateForStatefulServerInCaseOfSuccess = {
- val stateful = actorOf(classOf[StatefulUntypedTransactor]).start
- stateful sendRequestReply SetMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "init") // set init state
- stateful sendRequestReply Success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") // transactionrequired
- assert("new state" === (stateful sendRequestReply GetMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess")))
- }
-
- @Test
- def shouldOneWayMapShouldRollbackStateForStatefulServerInCaseOfFailure = {
- val stateful = actorOf(classOf[StatefulUntypedTransactorExpectingTwoInvocations]).start
- val failer = actorOf(classOf[FailerUntypedTransactor]).start
- stateful sendOneWay SetMapStateOneWay("testShouldRollbackStateForStatefulServerInCaseOfFailure", "init") // set init state
- stateful sendOneWay FailureOneWay("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer) // call failing transactionrequired method
- val notifier = (stateful sendRequestReply GetNotifier).asInstanceOf[CountDownLatch]
- assert(notifier.await(5, TimeUnit.SECONDS))
- assert("init" === (stateful sendRequestReply GetMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure"))) // check that state is == init state
- }
-
- @Test
- def shouldMapShouldRollbackStateForStatefulServerInCaseOfFailure = {
- val stateful = actorOf(classOf[StatefulUntypedTransactor]).start
- stateful sendRequestReply SetMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure", "init") // set init state
- val failer = actorOf(classOf[FailerUntypedTransactor]).start
- try {
- stateful sendRequestReply Failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer) // call failing transactionrequired method
- fail("should have thrown an exception")
- } catch {case e: RuntimeException => {}}
- assert("init" === (stateful sendRequestReply GetMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure"))) // check that state is == init state
- }
-
- @Test
- def shouldOneWayVectorShouldNotRollbackStateForStatefulServerInCaseOfSuccess = {
- val stateful = actorOf(classOf[StatefulUntypedTransactorExpectingTwoInvocations]).start
- stateful sendOneWay SetVectorStateOneWay("init") // set init state
- stateful sendOneWay SuccessOneWay("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") // transactionrequired
- val notifier = (stateful sendRequestReply GetNotifier).asInstanceOf[CountDownLatch]
- assert(notifier.await(1, TimeUnit.SECONDS))
- assert(2 === (stateful sendRequestReply GetVectorSize))
- }
-
- @Test
- def shouldVectorShouldNotRollbackStateForStatefulServerInCaseOfSuccess = {
- val stateful = actorOf(classOf[StatefulUntypedTransactor]).start
- stateful sendRequestReply SetVectorState("init") // set init state
- stateful sendRequestReply Success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") // transactionrequired
- assert(2 === (stateful sendRequestReply GetVectorSize))
- }
-
- @Test
- def shouldOneWayVectorShouldRollbackStateForStatefulServerInCaseOfFailure = {
- val stateful = actorOf(classOf[StatefulUntypedTransactorExpectingTwoInvocations]).start
- stateful sendOneWay SetVectorStateOneWay("init") // set init state
- Thread.sleep(1000)
- val failer = actorOf(classOf[FailerUntypedTransactor]).start
- stateful sendOneWay FailureOneWay("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer) // call failing transactionrequired method
- val notifier = (stateful sendRequestReply GetNotifier).asInstanceOf[CountDownLatch]
- assert(notifier.await(1, TimeUnit.SECONDS))
- assert(1 === (stateful sendRequestReply GetVectorSize))
- }
-
- @Test
- def shouldVectorShouldRollbackStateForStatefulServerInCaseOfFailure = {
- val stateful = actorOf(classOf[StatefulUntypedTransactor]).start
- stateful sendRequestReply SetVectorState("init") // set init state
- val failer = actorOf(classOf[FailerUntypedTransactor]).start
- try {
- stateful sendRequestReply Failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer) // call failing transactionrequired method
- fail("should have thrown an exception")
- } catch {case e: RuntimeException => {}}
- assert(1 === (stateful sendRequestReply GetVectorSize))
- }
-
- @Test
- def shouldOneWayRefShouldNotRollbackStateForStatefulServerInCaseOfSuccess = {
- val stateful = actorOf(classOf[StatefulUntypedTransactorExpectingTwoInvocations]).start
- stateful sendOneWay SetRefStateOneWay("init") // set init state
- stateful sendOneWay SuccessOneWay("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") // transactionrequired
- val notifier = (stateful sendRequestReply GetNotifier).asInstanceOf[CountDownLatch]
- assert(notifier.await(1, TimeUnit.SECONDS))
- assert("new state" === (stateful sendRequestReply GetRefState))
- }
-
- @Test
- def shouldRefShouldNotRollbackStateForStatefulServerInCaseOfSuccess = {
- val stateful = actorOf(classOf[StatefulUntypedTransactor]).start
- stateful sendRequestReply SetRefState("init") // set init state
- stateful sendRequestReply Success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") // transactionrequired
- assert("new state" === (stateful sendRequestReply GetRefState))
- }
-
- @Test
- def shouldOneWayRefShouldRollbackStateForStatefulServerInCaseOfFailure = {
- val stateful = actorOf(classOf[StatefulUntypedTransactorExpectingTwoInvocations]).start
- stateful sendOneWay SetRefStateOneWay("init") // set init state
- Thread.sleep(1000)
- val failer = actorOf(classOf[FailerUntypedTransactor]).start
- stateful sendOneWay FailureOneWay("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer) // call failing transactionrequired method
- val notifier = (stateful sendRequestReply GetNotifier).asInstanceOf[CountDownLatch]
- assert(notifier.await(1, TimeUnit.SECONDS))
- assert("init" === (stateful sendRequestReply (GetRefState, 1000000))) // check that state is == init state
- }
-
- @Test
- def shouldRefShouldRollbackStateForStatefulServerInCaseOfFailure = {
- val stateful = actorOf(classOf[StatefulUntypedTransactor]).start
- stateful sendRequestReply SetRefState("init") // set init state
- val failer = actorOf(classOf[FailerUntypedTransactor]).start
- try {
- stateful sendRequestReply Failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer) // call failing transactionrequired method
- fail("should have thrown an exception")
- } catch {case e: RuntimeException => {}}
- assert("init" === (stateful sendRequestReply GetRefState)) // check that state is == init state
- }
-}
\ No newline at end of file
diff --git a/akka-spring/src/main/scala/ActorFactoryBean.scala b/akka-spring/src/main/scala/ActorFactoryBean.scala
index c0e63397e7..f01f8a4880 100644
--- a/akka-spring/src/main/scala/ActorFactoryBean.scala
+++ b/akka-spring/src/main/scala/ActorFactoryBean.scala
@@ -16,7 +16,7 @@ import org.springframework.context.{ApplicationContext,ApplicationContextAware}
import org.springframework.util.ReflectionUtils
import org.springframework.util.StringUtils
-import se.scalablesolutions.akka.actor.{ActorRef, AspectInitRegistry, TypedActorConfiguration, TypedActor, UntypedActor}
+import se.scalablesolutions.akka.actor.{ActorRef, AspectInitRegistry, TypedActorConfiguration, TypedActor,Actor}
import se.scalablesolutions.akka.dispatch.MessageDispatcher
import se.scalablesolutions.akka.util.{Logging, Duration}
import scala.reflect.BeanProperty
@@ -101,7 +101,7 @@ class ActorFactoryBean extends AbstractFactoryBean[AnyRef] with Logging with App
private[akka] def createUntypedInstance() : ActorRef = {
if (implementation == null || implementation == "") throw new AkkaBeansException(
"The 'implementation' part of the 'akka:untyped-actor' element in the Spring config file can't be null or empty string")
- val actorRef = UntypedActor.actorOf(implementation.toClass)
+ val actorRef = Actor.actorOf(implementation.toClass)
if (timeout > 0) {
actorRef.setTimeout(timeout)
}
diff --git a/akka-spring/src/main/scala/SupervisionFactoryBean.scala b/akka-spring/src/main/scala/SupervisionFactoryBean.scala
index 8a58de8828..39d927a16c 100644
--- a/akka-spring/src/main/scala/SupervisionFactoryBean.scala
+++ b/akka-spring/src/main/scala/SupervisionFactoryBean.scala
@@ -7,7 +7,7 @@ import org.springframework.beans.factory.config.AbstractFactoryBean
import se.scalablesolutions.akka.config.TypedActorConfigurator
import se.scalablesolutions.akka.config.JavaConfig._
import se.scalablesolutions.akka.config.ScalaConfig.{Supervise, Server, SupervisorConfig, RemoteAddress => SRemoteAddress}
-import se.scalablesolutions.akka.actor.{Supervisor, SupervisorFactory, UntypedActor}
+import se.scalablesolutions.akka.actor.{Supervisor, SupervisorFactory, Actor}
import AkkaSpringConfigurationTags._
import reflect.BeanProperty
@@ -82,7 +82,7 @@ class SupervisionFactoryBean extends AbstractFactoryBean[AnyRef] {
import StringReflect._
val lifeCycle = if (!props.lifecycle.isEmpty && props.lifecycle.equalsIgnoreCase(VAL_LIFECYCYLE_TEMPORARY)) new LifeCycle(new Temporary()) else new LifeCycle(new Permanent())
val isRemote = (props.host != null) && (!props.host.isEmpty)
- val actorRef = UntypedActor.actorOf(props.target.toClass)
+ val actorRef = Actor.actorOf(props.target.toClass)
if (props.timeout > 0) {
actorRef.setTimeout(props.timeout)
}
diff --git a/akka-spring/src/test/java/se/scalablesolutions/akka/spring/foo/PingActor.java b/akka-spring/src/test/java/se/scalablesolutions/akka/spring/foo/PingActor.java
index c624d63ecd..98291788e1 100644
--- a/akka-spring/src/test/java/se/scalablesolutions/akka/spring/foo/PingActor.java
+++ b/akka-spring/src/test/java/se/scalablesolutions/akka/spring/foo/PingActor.java
@@ -1,7 +1,7 @@
package se.scalablesolutions.akka.spring.foo;
import se.scalablesolutions.akka.actor.UntypedActor;
-import se.scalablesolutions.akka.actor.UntypedActorRef;
+import se.scalablesolutions.akka.actor.ActorRef;
/**
* test class
@@ -21,7 +21,7 @@ public class PingActor extends UntypedActor {
System.out.println("Ping received String message: " + message);
if (message.equals("longRunning")) {
System.out.println("### starting pong");
- UntypedActorRef pongActor = UntypedActor.actorOf(PongActor.class).start();
+ ActorRef pongActor = UntypedActor.actorOf(PongActor.class).start();
pongActor.sendRequestReply("longRunning", getContext());
}
} else {
diff --git a/akka-spring/src/test/scala/DispatcherSpringFeatureTest.scala b/akka-spring/src/test/scala/DispatcherSpringFeatureTest.scala
index 18dea2abb0..9fbc2c800e 100644
--- a/akka-spring/src/test/scala/DispatcherSpringFeatureTest.scala
+++ b/akka-spring/src/test/scala/DispatcherSpringFeatureTest.scala
@@ -6,7 +6,7 @@ package se.scalablesolutions.akka.spring
import foo.{IMyPojo, MyPojo, PingActor}
import se.scalablesolutions.akka.dispatch._
-import se.scalablesolutions.akka.actor.UntypedActorRef
+import se.scalablesolutions.akka.actor.ActorRef
import org.scalatest.FeatureSpec
import org.scalatest.matchers.ShouldMatchers
@@ -120,7 +120,7 @@ class DispatcherSpringFeatureTest extends FeatureSpec with ShouldMatchers {
scenario("get a thread-based-dispatcher for untyped from context") {
val context = new ClassPathXmlApplicationContext("/dispatcher-config.xml")
- val actorRef = context.getBean("untyped-actor-with-thread-based-dispatcher").asInstanceOf[UntypedActorRef]
+ val actorRef = context.getBean("untyped-actor-with-thread-based-dispatcher").asInstanceOf[ActorRef]
assert(actorRef.getActorClassName() === "se.scalablesolutions.akka.spring.foo.PingActor")
actorRef.start()
actorRef.sendOneWay("Hello")
diff --git a/akka-spring/src/test/scala/UntypedActorSpringFeatureTest.scala b/akka-spring/src/test/scala/UntypedActorSpringFeatureTest.scala
index 677a671d53..cf7d8d9805 100644
--- a/akka-spring/src/test/scala/UntypedActorSpringFeatureTest.scala
+++ b/akka-spring/src/test/scala/UntypedActorSpringFeatureTest.scala
@@ -7,7 +7,7 @@ package se.scalablesolutions.akka.spring
import foo.PingActor
import se.scalablesolutions.akka.dispatch.ExecutorBasedEventDrivenWorkStealingDispatcher
import se.scalablesolutions.akka.remote.RemoteNode
-import se.scalablesolutions.akka.actor.UntypedActorRef
+import se.scalablesolutions.akka.actor.ActorRef
import org.scalatest.FeatureSpec
import org.scalatest.matchers.ShouldMatchers
import org.scalatest.junit.JUnitRunner
@@ -26,16 +26,16 @@ class UntypedActorSpringFeatureTest extends FeatureSpec with ShouldMatchers {
scenario("get a untyped actor") {
val context = new ClassPathXmlApplicationContext("/untyped-actor-config.xml")
- val myactor = context.getBean("simple-untyped-actor").asInstanceOf[UntypedActorRef]
+ val myactor = context.getBean("simple-untyped-actor").asInstanceOf[ActorRef]
assert(myactor.getActorClassName() === "se.scalablesolutions.akka.spring.foo.PingActor")
myactor.start()
myactor.sendOneWay("Hello")
- assert(myactor.actorRef.isDefinedAt("some string message"))
+ assert(myactor.isDefinedAt("some string message"))
}
scenario("untyped-actor with timeout") {
val context = new ClassPathXmlApplicationContext("/untyped-actor-config.xml")
- val myactor = context.getBean("simple-untyped-actor-long-timeout").asInstanceOf[UntypedActorRef]
+ val myactor = context.getBean("simple-untyped-actor-long-timeout").asInstanceOf[ActorRef]
assert(myactor.getActorClassName() === "se.scalablesolutions.akka.spring.foo.PingActor")
myactor.start()
myactor.sendOneWay("Hello")
@@ -44,22 +44,22 @@ class UntypedActorSpringFeatureTest extends FeatureSpec with ShouldMatchers {
scenario("transactional untyped-actor") {
val context = new ClassPathXmlApplicationContext("/untyped-actor-config.xml")
- val myactor = context.getBean("transactional-untyped-actor").asInstanceOf[UntypedActorRef]
+ val myactor = context.getBean("transactional-untyped-actor").asInstanceOf[ActorRef]
assert(myactor.getActorClassName() === "se.scalablesolutions.akka.spring.foo.PingActor")
myactor.start()
myactor.sendOneWay("Hello")
- assert(myactor.actorRef.isDefinedAt("some string message"))
+ assert(myactor.isDefinedAt("some string message"))
}
scenario("get a remote typed-actor") {
RemoteNode.start
Thread.sleep(1000)
val context = new ClassPathXmlApplicationContext("/untyped-actor-config.xml")
- val myactor = context.getBean("remote-untyped-actor").asInstanceOf[UntypedActorRef]
+ val myactor = context.getBean("remote-untyped-actor").asInstanceOf[ActorRef]
assert(myactor.getActorClassName() === "se.scalablesolutions.akka.spring.foo.PingActor")
myactor.start()
myactor.sendOneWay("Hello")
- assert(myactor.actorRef.isDefinedAt("some string message"))
+ assert(myactor.isDefinedAt("some string message"))
assert(myactor.getRemoteAddress().isDefined)
assert(myactor.getRemoteAddress().get.getHostName() === "localhost")
assert(myactor.getRemoteAddress().get.getPort() === 9999)
@@ -67,7 +67,7 @@ class UntypedActorSpringFeatureTest extends FeatureSpec with ShouldMatchers {
scenario("untyped-actor with custom dispatcher") {
val context = new ClassPathXmlApplicationContext("/untyped-actor-config.xml")
- val myactor = context.getBean("untyped-actor-with-dispatcher").asInstanceOf[UntypedActorRef]
+ val myactor = context.getBean("untyped-actor-with-dispatcher").asInstanceOf[ActorRef]
assert(myactor.getActorClassName() === "se.scalablesolutions.akka.spring.foo.PingActor")
myactor.start()
myactor.sendOneWay("Hello")