Holy crap, it actually works!
This commit is contained in:
parent
1e9883d6be
commit
c293420003
18 changed files with 313 additions and 863 deletions
|
|
@ -22,7 +22,7 @@ import scala.reflect.BeanProperty
|
|||
|
||||
import CamelMessageConversion.toExchangeAdapter
|
||||
import java.lang.Throwable
|
||||
import se.scalablesolutions.akka.actor.{ActorRegistry, Actor, ActorRef}
|
||||
import se.scalablesolutions.akka.actor.{ScalaActorRef, ActorRegistry, Actor, ActorRef}
|
||||
|
||||
/**
|
||||
* Camel component for sending messages to and receiving replies from actors.
|
||||
|
|
@ -197,7 +197,7 @@ private[akka] object AsyncCallbackAdapter {
|
|||
*
|
||||
* @author Martin Krasser
|
||||
*/
|
||||
private[akka] class AsyncCallbackAdapter(exchange: Exchange, callback: AsyncCallback) extends ActorRef {
|
||||
private[akka] class AsyncCallbackAdapter(exchange: Exchange, callback: AsyncCallback) extends ActorRef with ScalaActorRef {
|
||||
|
||||
def start = {
|
||||
_isRunning = true
|
||||
|
|
|
|||
|
|
@ -104,7 +104,24 @@ object Actor extends Logging {
|
|||
* val actor = actorOf[MyActor].start
|
||||
* </pre>
|
||||
*/
|
||||
def actorOf[T <: Actor : Manifest]: ActorRef = new LocalActorRef(manifest[T].erasure.asInstanceOf[Class[_ <: Actor]])
|
||||
def actorOf[T <: Actor : Manifest]: ActorRef = actorOf(manifest[T].erasure.asInstanceOf[Class[_ <: Actor]])
|
||||
|
||||
/**
|
||||
* Creates an ActorRef out of the Actor with type T.
|
||||
* <pre>
|
||||
* import Actor._
|
||||
* val actor = actorOf[MyActor]
|
||||
* actor.start
|
||||
* actor ! message
|
||||
* actor.stop
|
||||
* </pre>
|
||||
* You can create and start the actor in one statement like this:
|
||||
* <pre>
|
||||
* val actor = actorOf[MyActor].start
|
||||
* </pre>
|
||||
*/
|
||||
def actorOf(clazz: Class[_ <: Actor]): ActorRef = new LocalActorRef(clazz)
|
||||
|
||||
|
||||
/**
|
||||
* Creates an ActorRef out of the Actor. Allows you to pass in a factory function
|
||||
|
|
@ -365,7 +382,7 @@ trait Actor extends Logging {
|
|||
* self.stop(..)
|
||||
* </pre>
|
||||
*/
|
||||
@transient val self: ActorRef = someSelf.get
|
||||
@transient val self: ScalaActorRef = someSelf.get
|
||||
|
||||
/**
|
||||
* User overridable callback/setting.
|
||||
|
|
|
|||
|
|
@ -65,7 +65,8 @@ import java.util.concurrent.{ScheduledFuture, ConcurrentHashMap, TimeUnit}
|
|||
*
|
||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
trait ActorRef extends TransactionManagement with java.lang.Comparable[ActorRef] {
|
||||
trait ActorRef extends ActorRefShared with TransactionManagement with java.lang.Comparable[ActorRef] {
|
||||
scalaRef: ScalaActorRef =>
|
||||
|
||||
// Only mutable for RemoteServer in order to maintain identity across nodes
|
||||
@volatile protected[akka] var _uuid = UUID.newUuid.toString
|
||||
|
|
@ -110,64 +111,14 @@ trait ActorRef extends TransactionManagement with java.lang.Comparable[ActorRef]
|
|||
def setReceiveTimeout(timeout: Long) = this.receiveTimeout = Some(timeout)
|
||||
def getReceiveTimeout(): Option[Long] = receiveTimeout
|
||||
|
||||
/**
|
||||
* User overridable callback/setting.
|
||||
*
|
||||
* <p/>
|
||||
* Set trapExit to the list of exception classes that the actor should be able to trap
|
||||
* from the actor it is supervising. When the supervising actor throws these exceptions
|
||||
* then they will trigger a restart.
|
||||
* <p/>
|
||||
*
|
||||
* Trap no exceptions:
|
||||
* <pre>
|
||||
* trapExit = Nil
|
||||
* </pre>
|
||||
*
|
||||
* Trap all exceptions:
|
||||
* <pre>
|
||||
* trapExit = List(classOf[Throwable])
|
||||
* </pre>
|
||||
*
|
||||
* Trap specific exceptions only:
|
||||
* <pre>
|
||||
* trapExit = List(classOf[MyApplicationException], classOf[MyApplicationError])
|
||||
* </pre>
|
||||
*/
|
||||
@volatile var trapExit: List[Class[_ <: Throwable]] = Nil
|
||||
|
||||
//Java methods
|
||||
def setTrapExit(exceptions: Array[Class[_ <: Throwable]]) = trapExit = exceptions.toList
|
||||
def getTrapExit(): Array[Class[_ <: Throwable]] = trapExit.toArray
|
||||
|
||||
|
||||
/**
|
||||
* User overridable callback/setting.
|
||||
* <p/>
|
||||
* If 'trapExit' is set for the actor to act as supervisor, then a faultHandler must be defined.
|
||||
* <p/>
|
||||
* Can be one of:
|
||||
* <pre>
|
||||
* faultHandler = Some(AllForOneStrategy(maxNrOfRetries, withinTimeRange))
|
||||
* </pre>
|
||||
* Or:
|
||||
* <pre>
|
||||
* faultHandler = Some(OneForOneStrategy(maxNrOfRetries, withinTimeRange))
|
||||
* </pre>
|
||||
*/
|
||||
@volatile var faultHandler: Option[FaultHandlingStrategy] = None
|
||||
|
||||
//Java Methods
|
||||
def setFaultHandler(handler: FaultHandlingStrategy) = this.faultHandler = Some(handler)
|
||||
def getFaultHandler(): Option[FaultHandlingStrategy] = faultHandler
|
||||
|
||||
/**
|
||||
* User overridable callback/setting.
|
||||
* <p/>
|
||||
* Defines the life-cycle for a supervised actor.
|
||||
*/
|
||||
@volatile var lifeCycle: Option[LifeCycle] = None
|
||||
|
||||
//Java Methods
|
||||
def setLifeCycle(lifeCycle: LifeCycle) = this.lifeCycle = Some(lifeCycle)
|
||||
def getLifeCycle(): Option[LifeCycle] = lifeCycle
|
||||
|
|
@ -235,19 +186,9 @@ trait ActorRef extends TransactionManagement with java.lang.Comparable[ActorRef]
|
|||
/**
|
||||
* Returns the uuid for the actor.
|
||||
*/
|
||||
def getUuid() = _uuid
|
||||
def uuid = _uuid
|
||||
|
||||
/**
|
||||
* The reference sender Actor of the last received message.
|
||||
* Is defined if the message was sent from another Actor, else None.
|
||||
*/
|
||||
def sender: Option[ActorRef] = {
|
||||
// Five lines of map-performance-avoidance, could be just: currentMessage map { _.sender }
|
||||
val msg = currentMessage
|
||||
if(msg.isEmpty) None
|
||||
else msg.get.sender
|
||||
}
|
||||
|
||||
//Java Methods
|
||||
def getSender(): Option[ActorRef] = sender
|
||||
|
||||
|
|
@ -255,12 +196,7 @@ trait ActorRef extends TransactionManagement with java.lang.Comparable[ActorRef]
|
|||
* The reference sender future of the last received message.
|
||||
* Is defined if the message was sent with sent with '!!' or '!!!', else None.
|
||||
*/
|
||||
def senderFuture(): Option[CompletableFuture[Any]] = {
|
||||
// Five lines of map-performance-avoidance, could be just: currentMessage map { _.senderFuture }
|
||||
val msg = currentMessage
|
||||
if(msg.isEmpty) None
|
||||
else msg.get.senderFuture
|
||||
}
|
||||
def getSenderFuture(): Option[CompletableFuture[Any]] = senderFuture
|
||||
|
||||
/**
|
||||
* Is the actor being restarted?
|
||||
|
|
@ -287,62 +223,10 @@ trait ActorRef extends TransactionManagement with java.lang.Comparable[ActorRef]
|
|||
*/
|
||||
protected[akka] def uuid_=(uid: String) = _uuid = uid
|
||||
|
||||
/**
|
||||
* Sends a one-way asynchronous message. E.g. fire-and-forget semantics.
|
||||
* <p/>
|
||||
*
|
||||
* If invoked from within an actor then the actor reference is implicitly passed on as the implicit 'sender' argument.
|
||||
* <p/>
|
||||
*
|
||||
* This actor 'sender' reference is then available in the receiving actor in the 'sender' member variable,
|
||||
* if invoked from within an Actor. If not then no sender is available.
|
||||
* <pre>
|
||||
* actor ! message
|
||||
* </pre>
|
||||
* <p/>
|
||||
*/
|
||||
def !(message: Any)(implicit sender: Option[ActorRef] = None): Unit = {
|
||||
if (isRunning) postMessageToMailbox(message, sender)
|
||||
else throw new ActorInitializationException(
|
||||
"Actor has not been started, you need to invoke 'actor.start' before using it")
|
||||
}
|
||||
|
||||
//Java Methods
|
||||
def sendOneWay(message: AnyRef): Unit = sendOneWay(message,null)
|
||||
def sendOneWay(message: AnyRef, sender: ActorRef): Unit = this.!(message)(Option(sender))
|
||||
|
||||
/**
|
||||
* Sends a message asynchronously and waits on a future for a reply message.
|
||||
* <p/>
|
||||
* It waits on the reply either until it receives it (in the form of <code>Some(replyMessage)</code>)
|
||||
* or until the timeout expires (which will return None). E.g. send-and-receive-eventually semantics.
|
||||
* <p/>
|
||||
* <b>NOTE:</b>
|
||||
* Use this method with care. In most cases it is better to use '!' together with the 'sender' member field to
|
||||
* implement request/response message exchanges.
|
||||
* If you are sending messages using <code>!!</code> then you <b>have to</b> use <code>self.reply(..)</code>
|
||||
* to send a reply message to the original sender. If not then the sender will block until the timeout expires.
|
||||
*/
|
||||
def !!(message: Any, timeout: Long = this.timeout)(implicit sender: Option[ActorRef] = None): Option[Any] = {
|
||||
if (isRunning) {
|
||||
val future = postMessageToMailboxAndCreateFutureResultWithTimeout[Any](message, timeout, sender, None)
|
||||
val isTypedActor = message.isInstanceOf[Invocation]
|
||||
if (isTypedActor && message.asInstanceOf[Invocation].isVoid) {
|
||||
future.asInstanceOf[CompletableFuture[Option[_]]].completeWithResult(None)
|
||||
}
|
||||
try {
|
||||
future.await
|
||||
} catch {
|
||||
case e: FutureTimeoutException =>
|
||||
if (isTypedActor) throw e
|
||||
else None
|
||||
}
|
||||
if (future.exception.isDefined) throw future.exception.get
|
||||
else future.result
|
||||
} else throw new ActorInitializationException(
|
||||
"Actor has not been started, you need to invoke 'actor.start' before using it")
|
||||
}
|
||||
|
||||
//Java Methods
|
||||
def sendRequestReply(message: AnyRef): AnyRef = sendRequestReply(message,timeout,null)
|
||||
def sendRequestReply(message: AnyRef, sender: ActorRef): AnyRef = sendRequestReply(message,timeout,sender)
|
||||
|
|
@ -356,78 +240,20 @@ trait ActorRef extends TransactionManagement with java.lang.Comparable[ActorRef]
|
|||
.asInstanceOf[AnyRef]
|
||||
}
|
||||
|
||||
/**
|
||||
* Sends a message asynchronously returns a future holding the eventual reply message.
|
||||
* <p/>
|
||||
* <b>NOTE:</b>
|
||||
* Use this method with care. In most cases it is better to use '!' together with the 'sender' member field to
|
||||
* implement request/response message exchanges.
|
||||
* If you are sending messages using <code>!!!</code> then you <b>have to</b> use <code>self.reply(..)</code>
|
||||
* to send a reply message to the original sender. If not then the sender will block until the timeout expires.
|
||||
*/
|
||||
def !!(implicit sender: Option[ActorRef] = None): Future[T] = {
|
||||
if (isRunning) postMessageToMailboxAndCreateFutureResultWithTimeout[T](message, timeout, sender, None)
|
||||
else throw new ActorInitializationException(
|
||||
"Actor has not been started, you need to invoke 'actor.start' before using it")
|
||||
}
|
||||
|
||||
//Java Methods
|
||||
def sendRequestReplyFuture(message: AnyRef): Future[_] = sendRequestReplyFuture(message,timeout,null)
|
||||
def sendRequestReplyFuture(message: AnyRef, sender: ActorRef): Future[_] = sendRequestReplyFuture(message,timeout,sender)
|
||||
def sendRequestReplyFuture(message: AnyRef, timeout: Long, sender: ActorRef): Future[_] = !!!(message,timeout)(Option(sender))
|
||||
|
||||
/**
|
||||
* Forwards the message and passes the original sender actor as the sender.
|
||||
* <p/>
|
||||
* Works with '!', '!!' and '!!!'.
|
||||
*/
|
||||
def forward(message: Any)(implicit sender: Some[ActorRef]) = {
|
||||
if (isRunning) {
|
||||
if (sender.get.senderFuture.isDefined) postMessageToMailboxAndCreateFutureResultWithTimeout(
|
||||
message, timeout, sender.get.sender, sender.get.senderFuture)
|
||||
else if (sender.get.sender.isDefined) postMessageToMailbox(message, Some(sender.get.sender.get))
|
||||
else throw new IllegalActorStateException("Can't forward message when initial sender is not an actor")
|
||||
} else throw new ActorInitializationException("Actor has not been started, you need to invoke 'actor.start' before using it")
|
||||
}
|
||||
|
||||
//Java Methods
|
||||
def forward(message: AnyRef, sender: ActorRef): Unit =
|
||||
if (sender eq null) throw new IllegalArgumentException("The 'sender' argument to 'forward' can't be null")
|
||||
else forward(message)(Some(sender))
|
||||
|
||||
|
||||
/**
|
||||
* Use <code>self.reply(..)</code> to reply with a message to the original sender of the message currently
|
||||
* being processed.
|
||||
* <p/>
|
||||
* Throws an IllegalStateException if unable to determine what to reply to.
|
||||
*/
|
||||
def reply(message: Any) = if(!reply_?(message)) throw new IllegalActorStateException(
|
||||
"\n\tNo sender in scope, can't reply. " +
|
||||
"\n\tYou have probably: " +
|
||||
"\n\t\t1. Sent a message to an Actor from an instance that is NOT an Actor." +
|
||||
"\n\t\t2. Invoked a method on an TypedActor from an instance NOT an TypedActor." +
|
||||
"\n\tElse you might want to use 'reply_?' which returns Boolean(true) if succes and Boolean(false) if no sender in scope")
|
||||
|
||||
//Java Methods
|
||||
def replyUnsafe(message: AnyRef) = reply(message)
|
||||
|
||||
/**
|
||||
* Use <code>reply_?(..)</code> to reply with a message to the original sender of the message currently
|
||||
* being processed.
|
||||
* <p/>
|
||||
* Returns true if reply was sent, and false if unable to determine what to reply to.
|
||||
*/
|
||||
def reply_?(message: Any): Boolean = {
|
||||
if (senderFuture.isDefined) {
|
||||
senderFuture.get completeWithResult message
|
||||
true
|
||||
} else if (sender.isDefined) {
|
||||
sender.get ! message
|
||||
true
|
||||
} else false
|
||||
}
|
||||
|
||||
//Java Methods
|
||||
def replySafe(message: AnyRef): Boolean = reply_?(message)
|
||||
|
||||
|
|
@ -570,39 +396,15 @@ trait ActorRef extends TransactionManagement with java.lang.Comparable[ActorRef]
|
|||
*/
|
||||
def startLinkRemote(actorRef: ActorRef, hostname: String, port: Int): Unit
|
||||
|
||||
/**
|
||||
* Atomically create (from actor class) and start an actor.
|
||||
*/
|
||||
def spawn[T <: Actor : Manifest]: ActorRef =
|
||||
spawn(manifest[T].erasure.asInstanceOf[Class[_ <: Actor]])
|
||||
|
||||
//Java Methods
|
||||
def spawn(clazz: Class[_ <: Actor]): ActorRef
|
||||
|
||||
/**
|
||||
* Atomically create (from actor class), start and make an actor remote.
|
||||
*/
|
||||
def spawnRemote[T <: Actor: Manifest](hostname: String, port: Int): ActorRef =
|
||||
spawnRemote(manifest[T].erasure.asInstanceOf[Class[_ <: Actor]],hostname,port)
|
||||
|
||||
//Java Methods
|
||||
def spawnRemote(clazz: Class[_ <: Actor], hostname: String, port: Int): ActorRef
|
||||
|
||||
/**
|
||||
* Atomically create (from actor class), start and link an actor.
|
||||
*/
|
||||
def spawnLink[T <: Actor: Manifest]: ActorRef =
|
||||
spawnLink(manifest[T].erasure.asInstanceOf[Class[_ <: Actor]])
|
||||
|
||||
//Java Methods
|
||||
def spawnLink(clazz: Class[_ <: Actor]): ActorRef
|
||||
|
||||
/**
|
||||
* Atomically create (from actor class), start, link and make an actor remote.
|
||||
*/
|
||||
def spawnLinkRemote[T <: Actor : Manifest](hostname: String, port: Int): ActorRef =
|
||||
spawnLinkRemote(manifest[T].erasure.asInstanceOf[Class[_ <: Actor]],hostname,port)
|
||||
|
||||
//Java Methods
|
||||
def spawnLinkRemote(clazz: Class[_ <: Actor], hostname: String, port: Int): ActorRef
|
||||
|
||||
|
|
@ -623,12 +425,6 @@ trait ActorRef extends TransactionManagement with java.lang.Comparable[ActorRef]
|
|||
//Java Methods
|
||||
def getSupervisor(): ActorRef = supervisor getOrElse null
|
||||
|
||||
|
||||
/**
|
||||
* Shuts down and removes all linked actors.
|
||||
*/
|
||||
def shutdownLinkedActors(): Unit
|
||||
|
||||
protected[akka] def invoke(messageHandle: MessageInvocation): Unit
|
||||
|
||||
protected[akka] def postMessageToMailbox(message: Any, senderOption: Option[ActorRef]): Unit
|
||||
|
|
@ -694,7 +490,7 @@ trait ActorRef extends TransactionManagement with java.lang.Comparable[ActorRef]
|
|||
*/
|
||||
class LocalActorRef private[akka](
|
||||
private[this] var actorFactory: Either[Option[Class[_ <: Actor]], Option[() => Actor]] = Left(None))
|
||||
extends ActorRef {
|
||||
extends ActorRef with ScalaActorRef {
|
||||
|
||||
@volatile private[akka] var _remoteAddress: Option[InetSocketAddress] = None // only mutable to maintain identity across nodes
|
||||
@volatile private[akka] var _linkedActors: Option[ConcurrentHashMap[String, ActorRef]] = None
|
||||
|
|
@ -1386,7 +1182,7 @@ object RemoteActorSystemMessage {
|
|||
private[akka] case class RemoteActorRef private[akka] (
|
||||
uuuid: String, val className: String, val hostname: String, val port: Int, _timeout: Long, loader: Option[ClassLoader])
|
||||
// uuid: String, className: String, hostname: String, port: Int, timeOut: Long, isOnRemoteHost: Boolean) extends ActorRef {
|
||||
extends ActorRef {
|
||||
extends ActorRef with ScalaActorRef {
|
||||
|
||||
_uuid = uuuid
|
||||
timeout = _timeout
|
||||
|
|
@ -1461,3 +1257,249 @@ private[akka] case class RemoteActorRef private[akka] (
|
|||
|
||||
private def unsupported = throw new UnsupportedOperationException("Not supported for RemoteActorRef")
|
||||
}
|
||||
|
||||
/**
|
||||
* This trait represents the common (external) methods for all ActorRefs
|
||||
* Needed because implicit conversions aren't applied when instance imports are used
|
||||
*
|
||||
* i.e.
|
||||
* var self: ScalaActorRef = ...
|
||||
* import self._
|
||||
* //can't call ActorRef methods here unless they are declared in a common
|
||||
* //superclass, which ActorRefShared is.
|
||||
*/
|
||||
trait ActorRefShared {
|
||||
/**
|
||||
* Returns the uuid for the actor.
|
||||
*/
|
||||
def uuid: String
|
||||
|
||||
/**
|
||||
* Shuts down and removes all linked actors.
|
||||
*/
|
||||
def shutdownLinkedActors(): Unit
|
||||
}
|
||||
|
||||
/**
|
||||
* This trait represents the Scala Actor API
|
||||
* There are implicit conversions in ../actor/Implicits.scala
|
||||
* from ActorRef -> ScalaActorRef and back
|
||||
*/
|
||||
trait ScalaActorRef extends ActorRefShared { ref: ActorRef =>
|
||||
|
||||
def id: String
|
||||
def id_=(id: String):Unit
|
||||
|
||||
/**
|
||||
* User overridable callback/setting.
|
||||
* <p/>
|
||||
* Defines the life-cycle for a supervised actor.
|
||||
*/
|
||||
@volatile var lifeCycle: Option[LifeCycle] = None
|
||||
|
||||
/**
|
||||
* User overridable callback/setting.
|
||||
*
|
||||
* <p/>
|
||||
* Set trapExit to the list of exception classes that the actor should be able to trap
|
||||
* from the actor it is supervising. When the supervising actor throws these exceptions
|
||||
* then they will trigger a restart.
|
||||
* <p/>
|
||||
*
|
||||
* Trap no exceptions:
|
||||
* <pre>
|
||||
* trapExit = Nil
|
||||
* </pre>
|
||||
*
|
||||
* Trap all exceptions:
|
||||
* <pre>
|
||||
* trapExit = List(classOf[Throwable])
|
||||
* </pre>
|
||||
*
|
||||
* Trap specific exceptions only:
|
||||
* <pre>
|
||||
* trapExit = List(classOf[MyApplicationException], classOf[MyApplicationError])
|
||||
* </pre>
|
||||
*/
|
||||
@volatile var trapExit: List[Class[_ <: Throwable]] = Nil
|
||||
|
||||
|
||||
/**
|
||||
* User overridable callback/setting.
|
||||
* <p/>
|
||||
* If 'trapExit' is set for the actor to act as supervisor, then a faultHandler must be defined.
|
||||
* <p/>
|
||||
* Can be one of:
|
||||
* <pre>
|
||||
* faultHandler = Some(AllForOneStrategy(maxNrOfRetries, withinTimeRange))
|
||||
* </pre>
|
||||
* Or:
|
||||
* <pre>
|
||||
* faultHandler = Some(OneForOneStrategy(maxNrOfRetries, withinTimeRange))
|
||||
* </pre>
|
||||
*/
|
||||
@volatile var faultHandler: Option[FaultHandlingStrategy] = None
|
||||
|
||||
|
||||
/**
|
||||
* The reference sender Actor of the last received message.
|
||||
* Is defined if the message was sent from another Actor, else None.
|
||||
*/
|
||||
def sender: Option[ActorRef] = {
|
||||
// Five lines of map-performance-avoidance, could be just: currentMessage map { _.sender }
|
||||
val msg = currentMessage
|
||||
if(msg.isEmpty) None
|
||||
else msg.get.sender
|
||||
}
|
||||
|
||||
/**
|
||||
* The reference sender future of the last received message.
|
||||
* Is defined if the message was sent with sent with '!!' or '!!!', else None.
|
||||
*/
|
||||
def senderFuture(): Option[CompletableFuture[Any]] = {
|
||||
// Five lines of map-performance-avoidance, could be just: currentMessage map { _.senderFuture }
|
||||
val msg = currentMessage
|
||||
if(msg.isEmpty) None
|
||||
else msg.get.senderFuture
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Sends a one-way asynchronous message. E.g. fire-and-forget semantics.
|
||||
* <p/>
|
||||
*
|
||||
* If invoked from within an actor then the actor reference is implicitly passed on as the implicit 'sender' argument.
|
||||
* <p/>
|
||||
*
|
||||
* This actor 'sender' reference is then available in the receiving actor in the 'sender' member variable,
|
||||
* if invoked from within an Actor. If not then no sender is available.
|
||||
* <pre>
|
||||
* actor ! message
|
||||
* </pre>
|
||||
* <p/>
|
||||
*/
|
||||
def !(message: Any)(implicit sender: Option[ActorRef] = None): Unit = {
|
||||
if (isRunning) postMessageToMailbox(message, sender)
|
||||
else throw new ActorInitializationException(
|
||||
"Actor has not been started, you need to invoke 'actor.start' before using it")
|
||||
}
|
||||
|
||||
/**
|
||||
* Sends a message asynchronously and waits on a future for a reply message.
|
||||
* <p/>
|
||||
* It waits on the reply either until it receives it (in the form of <code>Some(replyMessage)</code>)
|
||||
* or until the timeout expires (which will return None). E.g. send-and-receive-eventually semantics.
|
||||
* <p/>
|
||||
* <b>NOTE:</b>
|
||||
* Use this method with care. In most cases it is better to use '!' together with the 'sender' member field to
|
||||
* implement request/response message exchanges.
|
||||
* If you are sending messages using <code>!!</code> then you <b>have to</b> use <code>self.reply(..)</code>
|
||||
* to send a reply message to the original sender. If not then the sender will block until the timeout expires.
|
||||
*/
|
||||
def !!(message: Any, timeout: Long = this.timeout)(implicit sender: Option[ActorRef] = None): Option[Any] = {
|
||||
if (isRunning) {
|
||||
val future = postMessageToMailboxAndCreateFutureResultWithTimeout[Any](message, timeout, sender, None)
|
||||
val isTypedActor = message.isInstanceOf[Invocation]
|
||||
if (isTypedActor && message.asInstanceOf[Invocation].isVoid) {
|
||||
future.asInstanceOf[CompletableFuture[Option[_]]].completeWithResult(None)
|
||||
}
|
||||
try {
|
||||
future.await
|
||||
} catch {
|
||||
case e: FutureTimeoutException =>
|
||||
if (isTypedActor) throw e
|
||||
else None
|
||||
}
|
||||
if (future.exception.isDefined) throw future.exception.get
|
||||
else future.result
|
||||
} else throw new ActorInitializationException(
|
||||
"Actor has not been started, you need to invoke 'actor.start' before using it")
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Sends a message asynchronously returns a future holding the eventual reply message.
|
||||
* <p/>
|
||||
* <b>NOTE:</b>
|
||||
* Use this method with care. In most cases it is better to use '!' together with the 'sender' member field to
|
||||
* implement request/response message exchanges.
|
||||
* If you are sending messages using <code>!!!</code> then you <b>have to</b> use <code>self.reply(..)</code>
|
||||
* to send a reply message to the original sender. If not then the sender will block until the timeout expires.
|
||||
*/
|
||||
def !!(implicit sender: Option[ActorRef] = None): Future[T] = {
|
||||
if (isRunning) postMessageToMailboxAndCreateFutureResultWithTimeout[T](message, timeout, sender, None)
|
||||
else throw new ActorInitializationException(
|
||||
"Actor has not been started, you need to invoke 'actor.start' before using it")
|
||||
}
|
||||
|
||||
/**
|
||||
* Forwards the message and passes the original sender actor as the sender.
|
||||
* <p/>
|
||||
* Works with '!', '!!' and '!!!'.
|
||||
*/
|
||||
def forward(message: Any)(implicit sender: Some[ActorRef]) = {
|
||||
if (isRunning) {
|
||||
if (sender.get.senderFuture.isDefined) postMessageToMailboxAndCreateFutureResultWithTimeout(
|
||||
message, timeout, sender.get.sender, sender.get.senderFuture)
|
||||
else if (sender.get.sender.isDefined) postMessageToMailbox(message, Some(sender.get.sender.get))
|
||||
else throw new IllegalActorStateException("Can't forward message when initial sender is not an actor")
|
||||
} else throw new ActorInitializationException("Actor has not been started, you need to invoke 'actor.start' before using it")
|
||||
}
|
||||
|
||||
/**
|
||||
* Use <code>self.reply(..)</code> to reply with a message to the original sender of the message currently
|
||||
* being processed.
|
||||
* <p/>
|
||||
* Throws an IllegalStateException if unable to determine what to reply to.
|
||||
*/
|
||||
def reply(message: Any) = if(!reply_?(message)) throw new IllegalActorStateException(
|
||||
"\n\tNo sender in scope, can't reply. " +
|
||||
"\n\tYou have probably: " +
|
||||
"\n\t\t1. Sent a message to an Actor from an instance that is NOT an Actor." +
|
||||
"\n\t\t2. Invoked a method on an TypedActor from an instance NOT an TypedActor." +
|
||||
"\n\tElse you might want to use 'reply_?' which returns Boolean(true) if succes and Boolean(false) if no sender in scope")
|
||||
|
||||
/**
|
||||
* Use <code>reply_?(..)</code> to reply with a message to the original sender of the message currently
|
||||
* being processed.
|
||||
* <p/>
|
||||
* Returns true if reply was sent, and false if unable to determine what to reply to.
|
||||
*/
|
||||
def reply_?(message: Any): Boolean = {
|
||||
if (senderFuture.isDefined) {
|
||||
senderFuture.get completeWithResult message
|
||||
true
|
||||
} else if (sender.isDefined) {
|
||||
sender.get ! message
|
||||
true
|
||||
} else false
|
||||
}
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* Atomically create (from actor class) and start an actor.
|
||||
*/
|
||||
def spawn[T <: Actor : Manifest]: ActorRef =
|
||||
spawn(manifest[T].erasure.asInstanceOf[Class[_ <: Actor]])
|
||||
|
||||
/**
|
||||
* Atomically create (from actor class), start and make an actor remote.
|
||||
*/
|
||||
def spawnRemote[T <: Actor: Manifest](hostname: String, port: Int): ActorRef =
|
||||
spawnRemote(manifest[T].erasure.asInstanceOf[Class[_ <: Actor]],hostname,port)
|
||||
|
||||
|
||||
/**
|
||||
* Atomically create (from actor class), start and link an actor.
|
||||
*/
|
||||
def spawnLink[T <: Actor: Manifest]: ActorRef =
|
||||
spawnLink(manifest[T].erasure.asInstanceOf[Class[_ <: Actor]])
|
||||
|
||||
|
||||
/**
|
||||
* Atomically create (from actor class), start, link and make an actor remote.
|
||||
*/
|
||||
def spawnLinkRemote[T <: Actor : Manifest](hostname: String, port: Int): ActorRef =
|
||||
spawnLinkRemote(manifest[T].erasure.asInstanceOf[Class[_ <: Actor]],hostname,port)
|
||||
}
|
||||
15
akka-core/src/main/scala/actor/Implicits.scala
Normal file
15
akka-core/src/main/scala/actor/Implicits.scala
Normal file
|
|
@ -0,0 +1,15 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
*/
|
||||
|
||||
package se.scalablesolutions.akka
|
||||
|
||||
import actor.{ScalaActorRef, ActorRef}
|
||||
|
||||
package object actor {
|
||||
implicit def actorRef2Scala(ref: ActorRef): ScalaActorRef =
|
||||
ref.asInstanceOf[ScalaActorRef]
|
||||
|
||||
implicit def scala2ActorRef(ref: ScalaActorRef): ActorRef =
|
||||
ref.asInstanceOf[ActorRef]
|
||||
}
|
||||
|
|
@ -118,11 +118,9 @@ abstract class RemoteUntypedActor(address: InetSocketAddress) extends UntypedAct
|
|||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
object UntypedActor {
|
||||
|
||||
/**
|
||||
* Creates an ActorRef out of the Actor. Allows you to pass in the class for the Actor.
|
||||
* <p/>
|
||||
* Example in Java:
|
||||
* Creates an ActorRef out of the Actor type represented by the class provided.
|
||||
* Example in Java:
|
||||
* <pre>
|
||||
* ActorRef actor = UntypedActor.actorOf(MyUntypedActor.class);
|
||||
* actor.start();
|
||||
|
|
@ -131,13 +129,12 @@ object UntypedActor {
|
|||
* </pre>
|
||||
* You can create and start the actor in one statement like this:
|
||||
* <pre>
|
||||
* ActorRef actor = UntypedActor.actorOf(MyUntypedActor.class).start();
|
||||
* val actor = actorOf(classOf[MyActor]).start
|
||||
* </pre>
|
||||
*/
|
||||
def actorOf(clazz: Class[_ <: UntypedActor]): ActorRef =
|
||||
Actor.actorOf(clazz.newInstance)
|
||||
def actorOf[T <: Actor](clazz: Class[T]): ActorRef = Actor.actorOf(clazz)
|
||||
|
||||
/**
|
||||
/**
|
||||
* NOTE: Use this convenience method with care, do NOT make it possible to get a reference to the
|
||||
* UntypedActor instance directly, but only through its 'ActorRef' wrapper reference.
|
||||
* <p/>
|
||||
|
|
@ -148,8 +145,8 @@ object UntypedActor {
|
|||
* Example in Java:
|
||||
* <pre>
|
||||
* ActorRef actor = UntypedActor.actorOf(new UntypedActorFactory() {
|
||||
* public UntypedActor create() {
|
||||
* return new MyUntypedActor("service:name", 5);
|
||||
* public UntypedActor create() {
|
||||
* return new MyUntypedActor("service:name", 5);
|
||||
* }
|
||||
* });
|
||||
* actor.start();
|
||||
|
|
@ -157,6 +154,5 @@ object UntypedActor {
|
|||
* actor.stop();
|
||||
* </pre>
|
||||
*/
|
||||
def actorOf(factory: UntypedActorFactory) =
|
||||
Actor.actorOf(factory.create)
|
||||
def actorOf(factory: UntypedActorFactory): ActorRef = Actor.actorOf(factory.create)
|
||||
}
|
||||
|
|
@ -1,33 +0,0 @@
|
|||
package se.scalablesolutions.akka.actor;
|
||||
|
||||
import se.scalablesolutions.akka.actor.*;
|
||||
import se.scalablesolutions.akka.actor.UntypedActor;
|
||||
|
||||
public class ReplyUntypedActor extends UntypedActor {
|
||||
|
||||
public ReplyUntypedActor() {
|
||||
ActorRef actor = UntypedActor.untypedActorOf(ReplyUntypedActor.class)
|
||||
actor.setId("JavaNinja");
|
||||
actor.start()
|
||||
actor.sendOneWay("ReplyToSendOneWayUsingReply");
|
||||
}
|
||||
|
||||
public void onReceive(Object message) throws Exception {
|
||||
if (message instanceof String) {
|
||||
String str = (String)message;
|
||||
if (str.equals("ReplyToSendOneWayUsingReply")) {
|
||||
getContext().replyUnsafe("Reply");
|
||||
} else if (str.equals("ReplyToSendOneWayUsingSender")) {
|
||||
getContext().getSender().get().sendOneWay("Reply");
|
||||
} else if (str.equals("ReplyToSendRequestReplyUsingReply")) {
|
||||
getContext().replyUnsafe("Reply");
|
||||
} else if (str.equals("ReplyToSendRequestReplyUsingFuture")) {
|
||||
getContext().getSenderFuture().get().completeWithResult("Reply");
|
||||
} else if (str.equals("ReplyToSendRequestReplyFutureUsingReply")) {
|
||||
getContext().replyUnsafe("Reply");
|
||||
} else if (str.equals("ReplyToSendRequestReplyFutureUsingFuture")) {
|
||||
getContext().getSenderFuture().get().completeWithResult("Reply");
|
||||
} else throw new IllegalArgumentException("Unknown message: " + str);
|
||||
} else throw new IllegalArgumentException("Unknown message: " + message);
|
||||
}
|
||||
}
|
||||
|
|
@ -1,53 +0,0 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
*/
|
||||
package se.scalablesolutions.akka.actor;
|
||||
|
||||
import se.scalablesolutions.akka.actor.*;
|
||||
|
||||
/**
|
||||
* Here is an example on how to create and use an UntypedActor.
|
||||
*
|
||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
public class SampleUntypedActor extends UntypedActor {
|
||||
|
||||
public void onReceive(Object message) throws Exception {
|
||||
if (message instanceof String) {
|
||||
String msg = (String)message;
|
||||
System.out.println("Received message: " + msg);
|
||||
|
||||
if (msg.equals("UseReply")) {
|
||||
// Reply to original sender of message using the 'replyUnsafe' method
|
||||
getContext().replyUnsafe(msg + ":" + getContext().getUuid());
|
||||
|
||||
} else if (msg.equals("UseSender") && getContext().getSender().isDefined()) {
|
||||
// Reply to original sender of message using the sender reference
|
||||
// also passing along my own refererence (the context)
|
||||
getContext().getSender().get().sendOneWay(msg, getContext());
|
||||
|
||||
} else if (msg.equals("UseSenderFuture") && getContext().getSenderFuture().isDefined()) {
|
||||
// Reply to original sender of message using the sender future reference
|
||||
getContext().getSenderFuture().get().completeWithResult(msg);
|
||||
|
||||
} else if (msg.equals("SendToSelf")) {
|
||||
// Send fire-forget message to the actor recursively
|
||||
getContext().sendOneWay(msg);
|
||||
|
||||
} else if (msg.equals("ForwardMessage")) {
|
||||
// Retreive an actor from the ActorRegistry by ID and get an ActorRef back
|
||||
ActorRef actorRef = ActorRegistry.actorsFor("some-actor-id")[0];
|
||||
// Wrap the ActorRef in an UntypedActorRef and forward the message to this actor
|
||||
UntypedActorRef.wrap(actorRef).forward(msg, getContext());
|
||||
|
||||
} else throw new IllegalArgumentException("Unknown message: " + message);
|
||||
} else throw new IllegalArgumentException("Unknown message: " + message);
|
||||
}
|
||||
|
||||
public static void main(String[] args) {
|
||||
UntypedActorRef actor = UntypedActor.actorOf(SampleUntypedActor.class);
|
||||
actor.start();
|
||||
actor.sendOneWay("SendToSelf");
|
||||
actor.stop();
|
||||
}
|
||||
}
|
||||
|
|
@ -1,44 +0,0 @@
|
|||
package se.scalablesolutions.akka.actor;
|
||||
|
||||
import se.scalablesolutions.akka.actor.*;
|
||||
import se.scalablesolutions.akka.dispatch.CompletableFuture;
|
||||
|
||||
public class SenderUntypedActor extends UntypedActor {
|
||||
private UntypedActorRef replyActor = null;
|
||||
|
||||
public void onReceive(Object message) throws Exception {
|
||||
if (message instanceof UntypedActorRef) replyActor = (UntypedActorRef)message;
|
||||
else if (message instanceof String) {
|
||||
if (replyActor == null) throw new IllegalStateException("Need to receive a ReplyUntypedActor before any other message.");
|
||||
String str = (String)message;
|
||||
|
||||
if (str.equals("ReplyToSendOneWayUsingReply")) {
|
||||
replyActor.sendOneWay("ReplyToSendOneWayUsingReply", getContext());
|
||||
} else if (str.equals("ReplyToSendOneWayUsingSender")) {
|
||||
replyActor.sendOneWay("ReplyToSendOneWayUsingSender", getContext());
|
||||
|
||||
} else if (str.equals("ReplyToSendRequestReplyUsingReply")) {
|
||||
UntypedActorTestState.log = (String)replyActor.sendRequestReply("ReplyToSendRequestReplyUsingReply", getContext());
|
||||
UntypedActorTestState.finished.await();
|
||||
} else if (str.equals("ReplyToSendRequestReplyUsingFuture")) {
|
||||
UntypedActorTestState.log = (String)replyActor.sendRequestReply("ReplyToSendRequestReplyUsingFuture", getContext());
|
||||
UntypedActorTestState.finished.await();
|
||||
|
||||
} else if (str.equals("ReplyToSendRequestReplyFutureUsingReply")) {
|
||||
CompletableFuture future = (CompletableFuture)replyActor.sendRequestReplyFuture("ReplyToSendRequestReplyFutureUsingReply", getContext());
|
||||
future.await();
|
||||
UntypedActorTestState.log = (String)future.result().get();
|
||||
UntypedActorTestState.finished.await();
|
||||
} else if (str.equals("ReplyToSendRequestReplyFutureUsingFuture")) {
|
||||
CompletableFuture future = (CompletableFuture)replyActor.sendRequestReplyFuture("ReplyToSendRequestReplyFutureUsingFuture", getContext());
|
||||
future.await();
|
||||
UntypedActorTestState.log = (String)future.result().get();
|
||||
UntypedActorTestState.finished.await();
|
||||
|
||||
} else if (str.equals("Reply")) {
|
||||
UntypedActorTestState.log = "Reply";
|
||||
UntypedActorTestState.finished.await();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -1,10 +0,0 @@
|
|||
package se.scalablesolutions.akka.actor;
|
||||
|
||||
import se.scalablesolutions.akka.actor.*;
|
||||
|
||||
import java.util.concurrent.CyclicBarrier;
|
||||
|
||||
public class UntypedActorTestState {
|
||||
public static String log = "NIL";
|
||||
public static CyclicBarrier finished = null;
|
||||
}
|
||||
|
|
@ -1,80 +0,0 @@
|
|||
package se.scalablesolutions.akka.actor
|
||||
|
||||
import java.util.concurrent.{TimeUnit, CountDownLatch}
|
||||
import org.scalatest.junit.JUnitSuite
|
||||
import org.junit.Test
|
||||
|
||||
import UntypedActor._
|
||||
|
||||
object ForwardUntypedActorSpec {
|
||||
object ForwardState {
|
||||
var sender: Option[UntypedActorRef] = None
|
||||
}
|
||||
|
||||
class ReceiverUntypedActor extends UntypedActor {
|
||||
//println(getClass + ":" + toString + " => " + getContext)
|
||||
val latch = new CountDownLatch(1)
|
||||
def onReceive(message: Any){
|
||||
println(getClass.getName + " got " + message)
|
||||
message match {
|
||||
case "SendBang" => {
|
||||
ForwardState.sender = getContext.getSender
|
||||
latch.countDown
|
||||
}
|
||||
case "SendBangBang" => getContext.replyUnsafe("SendBangBang")
|
||||
case x => throw new IllegalArgumentException("Unknown message: " + x);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
class ForwardUntypedActor extends UntypedActor {
|
||||
val receiverActor = actorOf(classOf[ReceiverUntypedActor]).start
|
||||
def onReceive(message: Any){
|
||||
message match {
|
||||
case "SendBang" => receiverActor.forward("SendBang",getContext)
|
||||
case "SendBangBang" => receiverActor.forward("SendBangBang",getContext)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
class BangSenderUntypedActor extends UntypedActor {
|
||||
val forwardActor = actorOf(classOf[ForwardUntypedActor]).start
|
||||
forwardActor.sendOneWay("SendBang",getContext)
|
||||
def onReceive(message: Any) = ()
|
||||
}
|
||||
|
||||
class BangBangSenderUntypedActor extends UntypedActor {
|
||||
val latch: CountDownLatch = new CountDownLatch(1)
|
||||
val forwardActor = actorOf(classOf[ForwardUntypedActor]).start
|
||||
(forwardActor sendRequestReply "SendBangBang") match {
|
||||
case _ => latch.countDown
|
||||
}
|
||||
def onReceive(message: Any) = ()
|
||||
}
|
||||
}
|
||||
|
||||
class ForwardUntypedActorSpec extends JUnitSuite {
|
||||
import ForwardUntypedActorSpec._
|
||||
|
||||
@Test
|
||||
def shouldForwardUntypedActorReferenceWhenInvokingForwardOnBang {
|
||||
val senderActor = actorOf(classOf[BangSenderUntypedActor])
|
||||
val latch = senderActor.actorRef.actor.asInstanceOf[BangSenderUntypedActor]
|
||||
.forwardActor.actorRef.actor.asInstanceOf[ForwardUntypedActor]
|
||||
.receiverActor.actorRef.actor.asInstanceOf[ReceiverUntypedActor]
|
||||
.latch
|
||||
|
||||
senderActor.start
|
||||
assert(latch.await(5L, TimeUnit.SECONDS))
|
||||
println(senderActor.actorRef.toString + " " + ForwardState.sender.get.actorRef.toString)
|
||||
assert(ForwardState.sender ne null)
|
||||
assert(senderActor.actorRef.toString === ForwardState.sender.get.actorRef.toString)
|
||||
}
|
||||
|
||||
@Test
|
||||
def shouldForwardUntypedActorReferenceWhenInvokingForwardOnBangBang {
|
||||
val senderActor = actorOf(classOf[BangBangSenderUntypedActor]).start
|
||||
val latch = senderActor.actorRef.actor.asInstanceOf[BangBangSenderUntypedActor].latch
|
||||
assert(latch.await(1L, TimeUnit.SECONDS))
|
||||
}
|
||||
}
|
||||
|
|
@ -1,102 +0,0 @@
|
|||
package se.scalablesolutions.akka.actor
|
||||
|
||||
import java.util.concurrent.{TimeUnit, CyclicBarrier, TimeoutException}
|
||||
|
||||
import org.scalatest.WordSpec
|
||||
import org.scalatest.matchers.MustMatchers
|
||||
|
||||
import se.scalablesolutions.akka.dispatch.Dispatchers
|
||||
import Actor._
|
||||
|
||||
class UntypedActorFireForgetRequestReplySpec extends WordSpec with MustMatchers {
|
||||
|
||||
"An UntypedActor" should {
|
||||
"be able to be created using the UntypedActor.actorOf(UntypedActorFactory) factory method" in {
|
||||
UntypedActorTestState.finished = new CyclicBarrier(2);
|
||||
UntypedActorTestState.log = "NIL";
|
||||
val replyActor = UntypedActor.actorOf(new UntypedActorFactory() {
|
||||
def create: UntypedActor = new ReplyUntypedActor
|
||||
}).start
|
||||
val senderActor = UntypedActor.actorOf(new UntypedActorFactory() {
|
||||
def create: UntypedActor = new SenderUntypedActor
|
||||
}).start
|
||||
senderActor.sendOneWay(replyActor)
|
||||
senderActor.sendOneWay("ReplyToSendOneWayUsingReply")
|
||||
try { UntypedActorTestState.finished.await(1L, TimeUnit.SECONDS) }
|
||||
catch { case e: TimeoutException => fail("Never got the message") }
|
||||
UntypedActorTestState.log must be ("Reply")
|
||||
}
|
||||
|
||||
"reply to message sent with 'sendOneWay' using 'reply'" in {
|
||||
UntypedActorTestState.finished = new CyclicBarrier(2);
|
||||
UntypedActorTestState.log = "NIL";
|
||||
val replyActor = UntypedActor.actorOf(classOf[ReplyUntypedActor]).start
|
||||
val senderActor = UntypedActor.actorOf(classOf[SenderUntypedActor]).start
|
||||
senderActor.sendOneWay(replyActor)
|
||||
senderActor.sendOneWay("ReplyToSendOneWayUsingReply")
|
||||
try { UntypedActorTestState.finished.await(1L, TimeUnit.SECONDS) }
|
||||
catch { case e: TimeoutException => fail("Never got the message") }
|
||||
UntypedActorTestState.log must be ("Reply")
|
||||
}
|
||||
|
||||
"reply to message sent with 'sendOneWay' using 'sender' reference" in {
|
||||
UntypedActorTestState.finished = new CyclicBarrier(2);
|
||||
UntypedActorTestState.log = "NIL";
|
||||
val replyActor = UntypedActor.actorOf(classOf[ReplyUntypedActor]).start
|
||||
val senderActor = UntypedActor.actorOf(classOf[SenderUntypedActor]).start
|
||||
senderActor.sendOneWay(replyActor)
|
||||
senderActor.sendOneWay("ReplyToSendOneWayUsingSender")
|
||||
try { UntypedActorTestState.finished.await(1L, TimeUnit.SECONDS) }
|
||||
catch { case e: TimeoutException => fail("Never got the message") }
|
||||
UntypedActorTestState.log must be ("Reply")
|
||||
}
|
||||
|
||||
"reply to message sent with 'sendRequestReply' using 'reply'" in {
|
||||
UntypedActorTestState.finished = new CyclicBarrier(2);
|
||||
UntypedActorTestState.log = "NIL";
|
||||
val replyActor = UntypedActor.actorOf(classOf[ReplyUntypedActor]).start
|
||||
val senderActor = UntypedActor.actorOf(classOf[SenderUntypedActor]).start
|
||||
senderActor.sendOneWay(replyActor)
|
||||
senderActor.sendOneWay("ReplyToSendRequestReplyUsingReply")
|
||||
try { UntypedActorTestState.finished.await(1L, TimeUnit.SECONDS) }
|
||||
catch { case e: TimeoutException => fail("Never got the message") }
|
||||
UntypedActorTestState.log must be ("Reply")
|
||||
}
|
||||
|
||||
"reply to message sent with 'sendRequestReply' using 'sender future' reference" in {
|
||||
UntypedActorTestState.finished = new CyclicBarrier(2);
|
||||
UntypedActorTestState.log = "NIL";
|
||||
val replyActor = UntypedActor.actorOf(classOf[ReplyUntypedActor]).start
|
||||
val senderActor = UntypedActor.actorOf(classOf[SenderUntypedActor]).start
|
||||
senderActor.sendOneWay(replyActor)
|
||||
senderActor.sendOneWay("ReplyToSendRequestReplyUsingFuture")
|
||||
try { UntypedActorTestState.finished.await(1L, TimeUnit.SECONDS) }
|
||||
catch { case e: TimeoutException => fail("Never got the message") }
|
||||
UntypedActorTestState.log must be ("Reply")
|
||||
}
|
||||
|
||||
"reply to message sent with 'sendRequestReplyFuture' using 'reply'" in {
|
||||
UntypedActorTestState.finished = new CyclicBarrier(2);
|
||||
UntypedActorTestState.log = "NIL";
|
||||
val replyActor = UntypedActor.actorOf(classOf[ReplyUntypedActor]).start
|
||||
val senderActor = UntypedActor.actorOf(classOf[SenderUntypedActor]).start
|
||||
senderActor.sendOneWay(replyActor)
|
||||
senderActor.sendOneWay("ReplyToSendRequestReplyFutureUsingReply")
|
||||
try { UntypedActorTestState.finished.await(1L, TimeUnit.SECONDS) }
|
||||
catch { case e: TimeoutException => fail("Never got the message") }
|
||||
UntypedActorTestState.log must be ("Reply")
|
||||
}
|
||||
|
||||
"reply to message sent with 'sendRequestReplyFuture' using 'sender future' reference" in {
|
||||
UntypedActorTestState.finished = new CyclicBarrier(2);
|
||||
UntypedActorTestState.log = "NIL";
|
||||
val replyActor = UntypedActor.actorOf(classOf[ReplyUntypedActor]).start
|
||||
val senderActor = UntypedActor.actorOf(classOf[SenderUntypedActor]).start
|
||||
senderActor.sendOneWay(replyActor)
|
||||
senderActor.sendOneWay("ReplyToSendRequestReplyFutureUsingFuture")
|
||||
try { UntypedActorTestState.finished.await(1L, TimeUnit.SECONDS) }
|
||||
catch { case e: TimeoutException => fail("Never got the message") }
|
||||
UntypedActorTestState.log must be ("Reply")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -1,59 +0,0 @@
|
|||
package se.scalablesolutions.akka.actor
|
||||
|
||||
import org.scalatest.junit.JUnitSuite
|
||||
import org.junit.Test
|
||||
|
||||
import java.util.concurrent.TimeUnit
|
||||
import org.multiverse.api.latches.StandardLatch
|
||||
import UntypedActor._
|
||||
|
||||
object UntypedActorReceiveTimeoutSpec {
|
||||
object Tick
|
||||
class TestReceiveTimeoutActor extends UntypedActor {
|
||||
val latch = new StandardLatch
|
||||
|
||||
def onReceive(message: Any):Unit = message match {
|
||||
case ReceiveTimeout => latch.open
|
||||
case Tick =>
|
||||
}
|
||||
}
|
||||
|
||||
class FiveOhOhTestReceiveTimeoutActor extends TestReceiveTimeoutActor {
|
||||
getContext.setReceiveTimeout(500L)
|
||||
}
|
||||
}
|
||||
|
||||
class UntypedActorReceiveTimeoutSpec extends JUnitSuite {
|
||||
import UntypedActorReceiveTimeoutSpec._
|
||||
|
||||
@Test def receiveShouldGetTimeout = {
|
||||
val timeoutActor = actorOf(classOf[FiveOhOhTestReceiveTimeoutActor]).start
|
||||
|
||||
assert(timeoutActor.actorRef.actor.asInstanceOf[TestReceiveTimeoutActor].latch.tryAwait(3, TimeUnit.SECONDS))
|
||||
}
|
||||
|
||||
@Test def swappedReceiveShouldAlsoGetTimout = {
|
||||
val timeoutActor = actorOf(classOf[FiveOhOhTestReceiveTimeoutActor]).start
|
||||
|
||||
assert(timeoutActor.actorRef.actor.asInstanceOf[TestReceiveTimeoutActor].latch.tryAwait(3, TimeUnit.SECONDS))
|
||||
|
||||
val swappedLatch = new StandardLatch
|
||||
timeoutActor sendOneWay HotSwap(Some{
|
||||
case ReceiveTimeout => swappedLatch.open
|
||||
})
|
||||
|
||||
assert(swappedLatch.tryAwait(3, TimeUnit.SECONDS))
|
||||
}
|
||||
|
||||
@Test def timeoutShouldBeCancelledAfterRegularReceive = {
|
||||
val timeoutActor = actorOf(classOf[FiveOhOhTestReceiveTimeoutActor]).start
|
||||
timeoutActor sendOneWay Tick
|
||||
assert(timeoutActor.actorRef.actor.asInstanceOf[TestReceiveTimeoutActor].latch.tryAwait(1, TimeUnit.SECONDS) == false)
|
||||
}
|
||||
|
||||
@Test def timeoutShouldNotBeSentWhenNotSpecified = {
|
||||
val timeoutLatch = new StandardLatch
|
||||
val timeoutActor = actorOf(classOf[TestReceiveTimeoutActor]).start
|
||||
assert(timeoutLatch.tryAwait(2, TimeUnit.SECONDS) == false)
|
||||
}
|
||||
}
|
||||
|
|
@ -1,239 +0,0 @@
|
|||
package se.scalablesolutions.akka.actor
|
||||
|
||||
import java.util.concurrent.{TimeUnit, CountDownLatch}
|
||||
import org.scalatest.junit.JUnitSuite
|
||||
import org.junit.Test
|
||||
|
||||
import se.scalablesolutions.akka.stm.{Ref, TransactionalMap, TransactionalVector}
|
||||
import UntypedActor._
|
||||
|
||||
object UntypedTransactorSpec {
|
||||
case class GetMapState(key: String)
|
||||
case object GetVectorState
|
||||
case object GetVectorSize
|
||||
case object GetRefState
|
||||
|
||||
case class SetMapState(key: String, value: String)
|
||||
case class SetVectorState(key: String)
|
||||
case class SetRefState(key: String)
|
||||
case class Success(key: String, value: String)
|
||||
case class Failure(key: String, value: String, failer: UntypedActorRef)
|
||||
|
||||
case class SetMapStateOneWay(key: String, value: String)
|
||||
case class SetVectorStateOneWay(key: String)
|
||||
case class SetRefStateOneWay(key: String)
|
||||
case class SuccessOneWay(key: String, value: String)
|
||||
case class FailureOneWay(key: String, value: String, failer: UntypedActorRef)
|
||||
|
||||
case object GetNotifier
|
||||
}
|
||||
import UntypedTransactorSpec._
|
||||
|
||||
class StatefulUntypedTransactor(expectedInvocationCount: Int) extends UntypedTransactor {
|
||||
def this() = this(0)
|
||||
getContext.setTimeout(5000)
|
||||
|
||||
val notifier = new CountDownLatch(expectedInvocationCount)
|
||||
|
||||
private val mapState = TransactionalMap[String, String]()
|
||||
private val vectorState = TransactionalVector[String]()
|
||||
private val refState = Ref[String]()
|
||||
|
||||
def onReceive(message: Any) = message match {
|
||||
case GetNotifier =>
|
||||
getContext.replyUnsafe(notifier)
|
||||
case GetMapState(key) =>
|
||||
getContext.replyUnsafe(mapState.get(key).get)
|
||||
notifier.countDown
|
||||
case GetVectorSize =>
|
||||
getContext.replyUnsafe(vectorState.length.asInstanceOf[AnyRef])
|
||||
notifier.countDown
|
||||
case GetRefState =>
|
||||
getContext.replyUnsafe(refState.get)
|
||||
notifier.countDown
|
||||
case SetMapState(key, msg) =>
|
||||
mapState.put(key, msg)
|
||||
getContext.replyUnsafe(msg)
|
||||
notifier.countDown
|
||||
case SetVectorState(msg) =>
|
||||
vectorState.add(msg)
|
||||
getContext.replyUnsafe(msg)
|
||||
notifier.countDown
|
||||
case SetRefState(msg) =>
|
||||
refState.swap(msg)
|
||||
getContext.replyUnsafe(msg)
|
||||
notifier.countDown
|
||||
case Success(key, msg) =>
|
||||
mapState.put(key, msg)
|
||||
vectorState.add(msg)
|
||||
refState.swap(msg)
|
||||
getContext.replyUnsafe(msg)
|
||||
notifier.countDown
|
||||
case Failure(key, msg, failer) =>
|
||||
mapState.put(key, msg)
|
||||
vectorState.add(msg)
|
||||
refState.swap(msg)
|
||||
failer.sendRequestReply("Failure")
|
||||
getContext.replyUnsafe(msg)
|
||||
notifier.countDown
|
||||
case SetMapStateOneWay(key, msg) =>
|
||||
mapState.put(key, msg)
|
||||
notifier.countDown
|
||||
case SetVectorStateOneWay(msg) =>
|
||||
vectorState.add(msg)
|
||||
notifier.countDown
|
||||
case SetRefStateOneWay(msg) =>
|
||||
refState.swap(msg)
|
||||
notifier.countDown
|
||||
case SuccessOneWay(key, msg) =>
|
||||
mapState.put(key, msg)
|
||||
vectorState.add(msg)
|
||||
refState.swap(msg)
|
||||
notifier.countDown
|
||||
case FailureOneWay(key, msg, failer) =>
|
||||
mapState.put(key, msg)
|
||||
vectorState.add(msg)
|
||||
refState.swap(msg)
|
||||
notifier.countDown
|
||||
failer.sendOneWay("Failure",getContext)
|
||||
}
|
||||
}
|
||||
|
||||
class StatefulUntypedTransactorExpectingTwoInvocations extends StatefulUntypedTransactor(2)
|
||||
|
||||
@serializable
|
||||
class FailerUntypedTransactor extends UntypedTransactor {
|
||||
|
||||
def onReceive(message: Any) = message match {
|
||||
case "Failure" =>
|
||||
throw new RuntimeException("Expected exception; to test fault-tolerance")
|
||||
}
|
||||
}
|
||||
|
||||
class UntypedTransactorSpec extends JUnitSuite {
|
||||
|
||||
@Test
|
||||
def shouldOneWayMapShouldNotRollbackStateForStatefulServerInCaseOfSuccess = {
|
||||
val stateful = actorOf(classOf[StatefulUntypedTransactorExpectingTwoInvocations]).start
|
||||
stateful sendOneWay SetMapStateOneWay("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "init") // set init state
|
||||
stateful sendOneWay SuccessOneWay("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") // transactionrequired
|
||||
val notifier = (stateful sendRequestReply GetNotifier).asInstanceOf[CountDownLatch]
|
||||
assert(notifier.await(1, TimeUnit.SECONDS))
|
||||
assert("new state" === (stateful sendRequestReply GetMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess")))
|
||||
}
|
||||
|
||||
@Test
|
||||
def shouldMapShouldNotRollbackStateForStatefulServerInCaseOfSuccess = {
|
||||
val stateful = actorOf(classOf[StatefulUntypedTransactor]).start
|
||||
stateful sendRequestReply SetMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "init") // set init state
|
||||
stateful sendRequestReply Success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") // transactionrequired
|
||||
assert("new state" === (stateful sendRequestReply GetMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess")))
|
||||
}
|
||||
|
||||
@Test
|
||||
def shouldOneWayMapShouldRollbackStateForStatefulServerInCaseOfFailure = {
|
||||
val stateful = actorOf(classOf[StatefulUntypedTransactorExpectingTwoInvocations]).start
|
||||
val failer = actorOf(classOf[FailerUntypedTransactor]).start
|
||||
stateful sendOneWay SetMapStateOneWay("testShouldRollbackStateForStatefulServerInCaseOfFailure", "init") // set init state
|
||||
stateful sendOneWay FailureOneWay("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer) // call failing transactionrequired method
|
||||
val notifier = (stateful sendRequestReply GetNotifier).asInstanceOf[CountDownLatch]
|
||||
assert(notifier.await(5, TimeUnit.SECONDS))
|
||||
assert("init" === (stateful sendRequestReply GetMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure"))) // check that state is == init state
|
||||
}
|
||||
|
||||
@Test
|
||||
def shouldMapShouldRollbackStateForStatefulServerInCaseOfFailure = {
|
||||
val stateful = actorOf(classOf[StatefulUntypedTransactor]).start
|
||||
stateful sendRequestReply SetMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure", "init") // set init state
|
||||
val failer = actorOf(classOf[FailerUntypedTransactor]).start
|
||||
try {
|
||||
stateful sendRequestReply Failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer) // call failing transactionrequired method
|
||||
fail("should have thrown an exception")
|
||||
} catch {case e: RuntimeException => {}}
|
||||
assert("init" === (stateful sendRequestReply GetMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure"))) // check that state is == init state
|
||||
}
|
||||
|
||||
@Test
|
||||
def shouldOneWayVectorShouldNotRollbackStateForStatefulServerInCaseOfSuccess = {
|
||||
val stateful = actorOf(classOf[StatefulUntypedTransactorExpectingTwoInvocations]).start
|
||||
stateful sendOneWay SetVectorStateOneWay("init") // set init state
|
||||
stateful sendOneWay SuccessOneWay("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") // transactionrequired
|
||||
val notifier = (stateful sendRequestReply GetNotifier).asInstanceOf[CountDownLatch]
|
||||
assert(notifier.await(1, TimeUnit.SECONDS))
|
||||
assert(2 === (stateful sendRequestReply GetVectorSize))
|
||||
}
|
||||
|
||||
@Test
|
||||
def shouldVectorShouldNotRollbackStateForStatefulServerInCaseOfSuccess = {
|
||||
val stateful = actorOf(classOf[StatefulUntypedTransactor]).start
|
||||
stateful sendRequestReply SetVectorState("init") // set init state
|
||||
stateful sendRequestReply Success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") // transactionrequired
|
||||
assert(2 === (stateful sendRequestReply GetVectorSize))
|
||||
}
|
||||
|
||||
@Test
|
||||
def shouldOneWayVectorShouldRollbackStateForStatefulServerInCaseOfFailure = {
|
||||
val stateful = actorOf(classOf[StatefulUntypedTransactorExpectingTwoInvocations]).start
|
||||
stateful sendOneWay SetVectorStateOneWay("init") // set init state
|
||||
Thread.sleep(1000)
|
||||
val failer = actorOf(classOf[FailerUntypedTransactor]).start
|
||||
stateful sendOneWay FailureOneWay("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer) // call failing transactionrequired method
|
||||
val notifier = (stateful sendRequestReply GetNotifier).asInstanceOf[CountDownLatch]
|
||||
assert(notifier.await(1, TimeUnit.SECONDS))
|
||||
assert(1 === (stateful sendRequestReply GetVectorSize))
|
||||
}
|
||||
|
||||
@Test
|
||||
def shouldVectorShouldRollbackStateForStatefulServerInCaseOfFailure = {
|
||||
val stateful = actorOf(classOf[StatefulUntypedTransactor]).start
|
||||
stateful sendRequestReply SetVectorState("init") // set init state
|
||||
val failer = actorOf(classOf[FailerUntypedTransactor]).start
|
||||
try {
|
||||
stateful sendRequestReply Failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer) // call failing transactionrequired method
|
||||
fail("should have thrown an exception")
|
||||
} catch {case e: RuntimeException => {}}
|
||||
assert(1 === (stateful sendRequestReply GetVectorSize))
|
||||
}
|
||||
|
||||
@Test
|
||||
def shouldOneWayRefShouldNotRollbackStateForStatefulServerInCaseOfSuccess = {
|
||||
val stateful = actorOf(classOf[StatefulUntypedTransactorExpectingTwoInvocations]).start
|
||||
stateful sendOneWay SetRefStateOneWay("init") // set init state
|
||||
stateful sendOneWay SuccessOneWay("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") // transactionrequired
|
||||
val notifier = (stateful sendRequestReply GetNotifier).asInstanceOf[CountDownLatch]
|
||||
assert(notifier.await(1, TimeUnit.SECONDS))
|
||||
assert("new state" === (stateful sendRequestReply GetRefState))
|
||||
}
|
||||
|
||||
@Test
|
||||
def shouldRefShouldNotRollbackStateForStatefulServerInCaseOfSuccess = {
|
||||
val stateful = actorOf(classOf[StatefulUntypedTransactor]).start
|
||||
stateful sendRequestReply SetRefState("init") // set init state
|
||||
stateful sendRequestReply Success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") // transactionrequired
|
||||
assert("new state" === (stateful sendRequestReply GetRefState))
|
||||
}
|
||||
|
||||
@Test
|
||||
def shouldOneWayRefShouldRollbackStateForStatefulServerInCaseOfFailure = {
|
||||
val stateful = actorOf(classOf[StatefulUntypedTransactorExpectingTwoInvocations]).start
|
||||
stateful sendOneWay SetRefStateOneWay("init") // set init state
|
||||
Thread.sleep(1000)
|
||||
val failer = actorOf(classOf[FailerUntypedTransactor]).start
|
||||
stateful sendOneWay FailureOneWay("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer) // call failing transactionrequired method
|
||||
val notifier = (stateful sendRequestReply GetNotifier).asInstanceOf[CountDownLatch]
|
||||
assert(notifier.await(1, TimeUnit.SECONDS))
|
||||
assert("init" === (stateful sendRequestReply (GetRefState, 1000000))) // check that state is == init state
|
||||
}
|
||||
|
||||
@Test
|
||||
def shouldRefShouldRollbackStateForStatefulServerInCaseOfFailure = {
|
||||
val stateful = actorOf(classOf[StatefulUntypedTransactor]).start
|
||||
stateful sendRequestReply SetRefState("init") // set init state
|
||||
val failer = actorOf(classOf[FailerUntypedTransactor]).start
|
||||
try {
|
||||
stateful sendRequestReply Failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer) // call failing transactionrequired method
|
||||
fail("should have thrown an exception")
|
||||
} catch {case e: RuntimeException => {}}
|
||||
assert("init" === (stateful sendRequestReply GetRefState)) // check that state is == init state
|
||||
}
|
||||
}
|
||||
|
|
@ -16,7 +16,7 @@ import org.springframework.context.{ApplicationContext,ApplicationContextAware}
|
|||
import org.springframework.util.ReflectionUtils
|
||||
import org.springframework.util.StringUtils
|
||||
|
||||
import se.scalablesolutions.akka.actor.{ActorRef, AspectInitRegistry, TypedActorConfiguration, TypedActor, UntypedActor}
|
||||
import se.scalablesolutions.akka.actor.{ActorRef, AspectInitRegistry, TypedActorConfiguration, TypedActor,Actor}
|
||||
import se.scalablesolutions.akka.dispatch.MessageDispatcher
|
||||
import se.scalablesolutions.akka.util.{Logging, Duration}
|
||||
import scala.reflect.BeanProperty
|
||||
|
|
@ -101,7 +101,7 @@ class ActorFactoryBean extends AbstractFactoryBean[AnyRef] with Logging with App
|
|||
private[akka] def createUntypedInstance() : ActorRef = {
|
||||
if (implementation == null || implementation == "") throw new AkkaBeansException(
|
||||
"The 'implementation' part of the 'akka:untyped-actor' element in the Spring config file can't be null or empty string")
|
||||
val actorRef = UntypedActor.actorOf(implementation.toClass)
|
||||
val actorRef = Actor.actorOf(implementation.toClass)
|
||||
if (timeout > 0) {
|
||||
actorRef.setTimeout(timeout)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -7,7 +7,7 @@ import org.springframework.beans.factory.config.AbstractFactoryBean
|
|||
import se.scalablesolutions.akka.config.TypedActorConfigurator
|
||||
import se.scalablesolutions.akka.config.JavaConfig._
|
||||
import se.scalablesolutions.akka.config.ScalaConfig.{Supervise, Server, SupervisorConfig, RemoteAddress => SRemoteAddress}
|
||||
import se.scalablesolutions.akka.actor.{Supervisor, SupervisorFactory, UntypedActor}
|
||||
import se.scalablesolutions.akka.actor.{Supervisor, SupervisorFactory, Actor}
|
||||
import AkkaSpringConfigurationTags._
|
||||
import reflect.BeanProperty
|
||||
|
||||
|
|
@ -82,7 +82,7 @@ class SupervisionFactoryBean extends AbstractFactoryBean[AnyRef] {
|
|||
import StringReflect._
|
||||
val lifeCycle = if (!props.lifecycle.isEmpty && props.lifecycle.equalsIgnoreCase(VAL_LIFECYCYLE_TEMPORARY)) new LifeCycle(new Temporary()) else new LifeCycle(new Permanent())
|
||||
val isRemote = (props.host != null) && (!props.host.isEmpty)
|
||||
val actorRef = UntypedActor.actorOf(props.target.toClass)
|
||||
val actorRef = Actor.actorOf(props.target.toClass)
|
||||
if (props.timeout > 0) {
|
||||
actorRef.setTimeout(props.timeout)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,7 +1,7 @@
|
|||
package se.scalablesolutions.akka.spring.foo;
|
||||
|
||||
import se.scalablesolutions.akka.actor.UntypedActor;
|
||||
import se.scalablesolutions.akka.actor.UntypedActorRef;
|
||||
import se.scalablesolutions.akka.actor.ActorRef;
|
||||
|
||||
/**
|
||||
* test class
|
||||
|
|
@ -21,7 +21,7 @@ public class PingActor extends UntypedActor {
|
|||
System.out.println("Ping received String message: " + message);
|
||||
if (message.equals("longRunning")) {
|
||||
System.out.println("### starting pong");
|
||||
UntypedActorRef pongActor = UntypedActor.actorOf(PongActor.class).start();
|
||||
ActorRef pongActor = UntypedActor.actorOf(PongActor.class).start();
|
||||
pongActor.sendRequestReply("longRunning", getContext());
|
||||
}
|
||||
} else {
|
||||
|
|
|
|||
|
|
@ -6,7 +6,7 @@ package se.scalablesolutions.akka.spring
|
|||
|
||||
import foo.{IMyPojo, MyPojo, PingActor}
|
||||
import se.scalablesolutions.akka.dispatch._
|
||||
import se.scalablesolutions.akka.actor.UntypedActorRef
|
||||
import se.scalablesolutions.akka.actor.ActorRef
|
||||
|
||||
import org.scalatest.FeatureSpec
|
||||
import org.scalatest.matchers.ShouldMatchers
|
||||
|
|
@ -120,7 +120,7 @@ class DispatcherSpringFeatureTest extends FeatureSpec with ShouldMatchers {
|
|||
|
||||
scenario("get a thread-based-dispatcher for untyped from context") {
|
||||
val context = new ClassPathXmlApplicationContext("/dispatcher-config.xml")
|
||||
val actorRef = context.getBean("untyped-actor-with-thread-based-dispatcher").asInstanceOf[UntypedActorRef]
|
||||
val actorRef = context.getBean("untyped-actor-with-thread-based-dispatcher").asInstanceOf[ActorRef]
|
||||
assert(actorRef.getActorClassName() === "se.scalablesolutions.akka.spring.foo.PingActor")
|
||||
actorRef.start()
|
||||
actorRef.sendOneWay("Hello")
|
||||
|
|
|
|||
|
|
@ -7,7 +7,7 @@ package se.scalablesolutions.akka.spring
|
|||
import foo.PingActor
|
||||
import se.scalablesolutions.akka.dispatch.ExecutorBasedEventDrivenWorkStealingDispatcher
|
||||
import se.scalablesolutions.akka.remote.RemoteNode
|
||||
import se.scalablesolutions.akka.actor.UntypedActorRef
|
||||
import se.scalablesolutions.akka.actor.ActorRef
|
||||
import org.scalatest.FeatureSpec
|
||||
import org.scalatest.matchers.ShouldMatchers
|
||||
import org.scalatest.junit.JUnitRunner
|
||||
|
|
@ -26,16 +26,16 @@ class UntypedActorSpringFeatureTest extends FeatureSpec with ShouldMatchers {
|
|||
|
||||
scenario("get a untyped actor") {
|
||||
val context = new ClassPathXmlApplicationContext("/untyped-actor-config.xml")
|
||||
val myactor = context.getBean("simple-untyped-actor").asInstanceOf[UntypedActorRef]
|
||||
val myactor = context.getBean("simple-untyped-actor").asInstanceOf[ActorRef]
|
||||
assert(myactor.getActorClassName() === "se.scalablesolutions.akka.spring.foo.PingActor")
|
||||
myactor.start()
|
||||
myactor.sendOneWay("Hello")
|
||||
assert(myactor.actorRef.isDefinedAt("some string message"))
|
||||
assert(myactor.isDefinedAt("some string message"))
|
||||
}
|
||||
|
||||
scenario("untyped-actor with timeout") {
|
||||
val context = new ClassPathXmlApplicationContext("/untyped-actor-config.xml")
|
||||
val myactor = context.getBean("simple-untyped-actor-long-timeout").asInstanceOf[UntypedActorRef]
|
||||
val myactor = context.getBean("simple-untyped-actor-long-timeout").asInstanceOf[ActorRef]
|
||||
assert(myactor.getActorClassName() === "se.scalablesolutions.akka.spring.foo.PingActor")
|
||||
myactor.start()
|
||||
myactor.sendOneWay("Hello")
|
||||
|
|
@ -44,22 +44,22 @@ class UntypedActorSpringFeatureTest extends FeatureSpec with ShouldMatchers {
|
|||
|
||||
scenario("transactional untyped-actor") {
|
||||
val context = new ClassPathXmlApplicationContext("/untyped-actor-config.xml")
|
||||
val myactor = context.getBean("transactional-untyped-actor").asInstanceOf[UntypedActorRef]
|
||||
val myactor = context.getBean("transactional-untyped-actor").asInstanceOf[ActorRef]
|
||||
assert(myactor.getActorClassName() === "se.scalablesolutions.akka.spring.foo.PingActor")
|
||||
myactor.start()
|
||||
myactor.sendOneWay("Hello")
|
||||
assert(myactor.actorRef.isDefinedAt("some string message"))
|
||||
assert(myactor.isDefinedAt("some string message"))
|
||||
}
|
||||
|
||||
scenario("get a remote typed-actor") {
|
||||
RemoteNode.start
|
||||
Thread.sleep(1000)
|
||||
val context = new ClassPathXmlApplicationContext("/untyped-actor-config.xml")
|
||||
val myactor = context.getBean("remote-untyped-actor").asInstanceOf[UntypedActorRef]
|
||||
val myactor = context.getBean("remote-untyped-actor").asInstanceOf[ActorRef]
|
||||
assert(myactor.getActorClassName() === "se.scalablesolutions.akka.spring.foo.PingActor")
|
||||
myactor.start()
|
||||
myactor.sendOneWay("Hello")
|
||||
assert(myactor.actorRef.isDefinedAt("some string message"))
|
||||
assert(myactor.isDefinedAt("some string message"))
|
||||
assert(myactor.getRemoteAddress().isDefined)
|
||||
assert(myactor.getRemoteAddress().get.getHostName() === "localhost")
|
||||
assert(myactor.getRemoteAddress().get.getPort() === 9999)
|
||||
|
|
@ -67,7 +67,7 @@ class UntypedActorSpringFeatureTest extends FeatureSpec with ShouldMatchers {
|
|||
|
||||
scenario("untyped-actor with custom dispatcher") {
|
||||
val context = new ClassPathXmlApplicationContext("/untyped-actor-config.xml")
|
||||
val myactor = context.getBean("untyped-actor-with-dispatcher").asInstanceOf[UntypedActorRef]
|
||||
val myactor = context.getBean("untyped-actor-with-dispatcher").asInstanceOf[ActorRef]
|
||||
assert(myactor.getActorClassName() === "se.scalablesolutions.akka.spring.foo.PingActor")
|
||||
myactor.start()
|
||||
myactor.sendOneWay("Hello")
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue