Refactored code into ActorRef, LocalActorRef and RemoteActorRef

This commit is contained in:
Jonas Bonér 2010-05-13 15:40:49 +02:00
parent b1d9897e22
commit 21e6085864
33 changed files with 806 additions and 443 deletions

View file

@ -24,7 +24,7 @@ class ActorComponentFeatureTest extends FeatureSpec with BeforeAndAfterAll with
scenario("one-way communication using actor id") { scenario("one-way communication using actor id") {
val actor = new Tester with Retain with Countdown[Message] val actor = new Tester with Retain with Countdown[Message]
actor.start actor.start
template.sendBody("actor:%s" format actor.getId, "Martin") template.sendBody("actor:%s" format actor.id, "Martin")
assert(actor.waitFor) assert(actor.waitFor)
assert(actor.body === "Martin") assert(actor.body === "Martin")
} }
@ -40,7 +40,7 @@ class ActorComponentFeatureTest extends FeatureSpec with BeforeAndAfterAll with
scenario("two-way communication using actor id") { scenario("two-way communication using actor id") {
val actor = new Tester with Respond val actor = new Tester with Respond
actor.start actor.start
assert(template.requestBody("actor:%s" format actor.getId, "Martin") === "Hello Martin") assert(template.requestBody("actor:%s" format actor.id, "Martin") === "Hello Martin")
} }
scenario("two-way communication using actor uuid") { scenario("two-way communication using actor uuid") {

View file

@ -6,7 +6,7 @@ package se.scalablesolutions.akka.remote.protobuf;
/* /*
Compile with: Compile with:
cd ./akka-util-java/src/main/java cd ./akka-core/src/main/java
protoc se/scalablesolutions/akka/remote/protobuf/RemoteProtocol.proto --java_out . protoc se/scalablesolutions/akka/remote/protobuf/RemoteProtocol.proto --java_out .
*/ */

View file

@ -374,8 +374,8 @@ object ActiveObject {
this this
} }
private[akka] def supervise(restartStrategy: RestartStrategy, components: List[Supervise]): ActorRef = private[akka] def supervise(restartStrategy: RestartStrategy, components: List[Supervise]): Supervisor =
SupervisorFactory(SupervisorConfig(restartStrategy, components)).newInstance.start Supervisor(SupervisorConfig(restartStrategy, components))
} }
private[akka] object AspectInitRegistry { private[akka] object AspectInitRegistry {
@ -523,9 +523,6 @@ private[akka] sealed class ActiveObjectAspect {
} }
} }
// FIXME Jan Kronquist: started work on issue 121
private[akka] case class Link(val actor: ActorRef)
object Dispatcher { object Dispatcher {
val ZERO_ITEM_CLASS_ARRAY = Array[Class[_]]() val ZERO_ITEM_CLASS_ARRAY = Array[Class[_]]()
val ZERO_ITEM_OBJECT_ARRAY = Array[Object]() val ZERO_ITEM_OBJECT_ARRAY = Array[Object]()

View file

@ -69,6 +69,7 @@ abstract class RemoteActor(hostname: String, port: Int) extends Actor {
case class HotSwap(code: Option[PartialFunction[Any, Unit]]) extends LifeCycleMessage case class HotSwap(code: Option[PartialFunction[Any, Unit]]) extends LifeCycleMessage
case class Restart(reason: Throwable) extends LifeCycleMessage case class Restart(reason: Throwable) extends LifeCycleMessage
case class Exit(dead: ActorRef, killer: Throwable) extends LifeCycleMessage case class Exit(dead: ActorRef, killer: Throwable) extends LifeCycleMessage
case class Link(child: ActorRef) extends LifeCycleMessage
case class Unlink(child: ActorRef) extends LifeCycleMessage case class Unlink(child: ActorRef) extends LifeCycleMessage
case class UnlinkAndStop(child: ActorRef) extends LifeCycleMessage case class UnlinkAndStop(child: ActorRef) extends LifeCycleMessage
case object Kill extends LifeCycleMessage case object Kill extends LifeCycleMessage
@ -97,7 +98,7 @@ object Actor extends Logging {
} }
/** /**
* Creates a new ActorRef out of the Actor with type T. * Creates a Actor.newActor out of the Actor with type T.
* <pre> * <pre>
* import Actor._ * import Actor._
* val actor = newActor[MyActor] * val actor = newActor[MyActor]
@ -106,10 +107,10 @@ object Actor extends Logging {
* actor.stop * actor.stop
* </pre> * </pre>
*/ */
def newActor[T <: Actor: Manifest]: ActorRef = new ActorRef(manifest[T].erasure.asInstanceOf[Class[_ <: Actor]]) def newActor[T <: Actor: Manifest]: ActorRef = new LocalActorRef(manifest[T].erasure.asInstanceOf[Class[_ <: Actor]])
/** /**
* Creates a new ActorRef out of the Actor. Allows you to pass in a factory function * Creates a Actor.newActor out of the Actor. Allows you to pass in a factory function
* that creates the Actor. Please note that this function can be invoked multiple * that creates the Actor. Please note that this function can be invoked multiple
* times if for example the Actor is supervised and needs to be restarted. * times if for example the Actor is supervised and needs to be restarted.
* <p/> * <p/>
@ -122,7 +123,7 @@ object Actor extends Logging {
* actor.stop * actor.stop
* </pre> * </pre>
*/ */
def newActor(factory: () => Actor): ActorRef = new ActorRef(factory) def newActor(factory: () => Actor): ActorRef = new LocalActorRef(factory)
/** /**
* Use to create an anonymous event-driven actor. * Use to create an anonymous event-driven actor.
@ -301,7 +302,7 @@ trait Actor extends Logging {
/** /**
* Holds the hot swapped partial function. * Holds the hot swapped partial function.
*/ */
private[this] var _hotswap: Option[PartialFunction[Any, Unit]] = None // FIXME: _hotswap should be a stack private var _hotswap: Option[PartialFunction[Any, Unit]] = None // FIXME: _hotswap should be a stack
// ======================================== // ========================================
// ==== CALLBACKS FOR USER TO OVERRIDE ==== // ==== CALLBACKS FOR USER TO OVERRIDE ====
@ -478,7 +479,7 @@ trait Actor extends Logging {
/** /**
* Starts the actor. * Starts the actor.
*/ */
def start = self.start def start = self.startOnCreation = true
/** /**
* Shuts down the actor its dispatcher and message queue. * Shuts down the actor its dispatcher and message queue.

View file

@ -24,7 +24,8 @@ import jsr166x.{Deque, ConcurrentLinkedDeque}
import java.net.InetSocketAddress import java.net.InetSocketAddress
import java.util.concurrent.locks.{Lock, ReentrantLock} import java.util.concurrent.locks.{Lock, ReentrantLock}
import java.util.concurrent.atomic.AtomicReference import java.util.concurrent.atomic.AtomicReference
import java.util.{HashSet => JHashSet} import java.util.concurrent.ConcurrentHashMap
import java.util.{Map => JMap}
/* /*
trait ActorWithNestedReceive extends Actor { trait ActorWithNestedReceive extends Actor {
@ -45,7 +46,7 @@ trait ActorWithNestedReceive extends Actor {
/** /**
* The ActorRef object can be used to deserialize ActorRef instances from of its binary representation * The ActorRef object can be used to deserialize ActorRef instances from of its binary representation
* or its Protocol Buffers (protobuf) Message representation to a new ActorRef instance. * or its Protocol Buffers (protobuf) Message representation to a Actor.newActor instance.
* <p/> * <p/>
* Binary -> ActorRef: * Binary -> ActorRef:
* <pre> * <pre>
@ -106,21 +107,18 @@ object ActorRef {
* *
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a> * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/ */
sealed class ActorRef private[akka] () extends TransactionManagement { trait ActorRef extends TransactionManagement {
// Only mutable for RemoteServer in order to maintain identity across nodes // Only mutable for RemoteServer in order to maintain identity across nodes
@volatile private[akka] var _uuid = UUID.newUuid.toString @volatile protected[akka] var _uuid = UUID.newUuid.toString
@volatile private[this] var _isRunning = false @volatile protected[this] var _isRunning = false
@volatile private[this] var _isSuspended = true @volatile protected[this] var _isSuspended = true
@volatile private[this] var _isShutDown = false @volatile protected[this] var _isShutDown = false
@volatile private[akka] var _isKilled = false @volatile protected[akka] var _isKilled = false
@volatile private[akka] var _registeredInRemoteNodeDuringSerialization = false
@volatile private[akka] var _remoteAddress: Option[InetSocketAddress] = None @volatile protected[akka] var startOnCreation = false
@volatile private[akka] var _linkedActors: Option[JHashSet[ActorRef]] = None // FIXME: turn _linkedActors into a ConcurrentHashSet to avoid ReadWrite lock when touched? @volatile protected[akka] var registeredInRemoteNodeDuringSerialization = false
@volatile private[akka] var _supervisor: Option[ActorRef] = None protected[this] val guard = new ReadWriteLock
@volatile private[akka] var _replyToAddress: Option[InetSocketAddress] = None
private[akka] val _mailbox: Deque[MessageInvocation] = new ConcurrentLinkedDeque[MessageInvocation]
private[this] val _guard = new ReadWriteLock
/** /**
* User overridable callback/setting. * User overridable callback/setting.
@ -128,13 +126,13 @@ sealed class ActorRef private[akka] () extends TransactionManagement {
* Set to true if messages should have REQUIRES_NEW semantics, e.g. a new transaction should * Set to true if messages should have REQUIRES_NEW semantics, e.g. a new transaction should
* start if there is no one running, else it joins the existing transaction. * start if there is no one running, else it joins the existing transaction.
*/ */
@volatile private[akka] var isTransactor = false @volatile protected[akka] var isTransactor = false
/**v /**v
* This lock ensures thread safety in the dispatching: only one message can * This lock ensures thread safety in the dispatching: only one message can
* be dispatched at once on the actor. * be dispatched at once on the actor.
*/ */
private[akka] val _dispatcherLock: Lock = new ReentrantLock protected[akka] val dispatcherLock: Lock = new ReentrantLock
/** /**
* Holds the reference to the sender of the currently processed message. * Holds the reference to the sender of the currently processed message.
@ -142,27 +140,32 @@ sealed class ActorRef private[akka] () extends TransactionManagement {
* - Is Some(Left(Actor)) if sender is an actor * - Is Some(Left(Actor)) if sender is an actor
* - Is Some(Right(CompletableFuture)) if sender is holding on to a Future for the result * - Is Some(Right(CompletableFuture)) if sender is holding on to a Future for the result
*/ */
private[this] var _replyTo: Option[Either[ActorRef, CompletableFuture[Any]]] = None protected[this] var _replyTo: Option[Either[ActorRef, CompletableFuture[Any]]] = None
private[akka] def replyTo: Option[Either[ActorRef, CompletableFuture[Any]]] = protected[akka] def replyTo: Option[Either[ActorRef, CompletableFuture[Any]]] =
_guard.withReadLock { _replyTo } guard.withReadLock { _replyTo }
private[akka] def replyTo_=(rt: Option[Either[ActorRef, CompletableFuture[Any]]]) = protected[akka] def replyTo_=(rt: Option[Either[ActorRef, CompletableFuture[Any]]]) =
_guard.withWriteLock { _replyTo = rt } guard.withWriteLock { _replyTo = rt }
private[this] var actorFactory: Either[Option[Class[_ <: Actor]], Option[() => Actor]] = Left(None) /**
* Is the actor killed?
*/
def isKilled: Boolean = _isKilled
private[this] lazy val actorInstance: AtomicReference[Actor] = new AtomicReference[Actor](newActor) /**
* Is the actor running?
*/
def isRunning: Boolean = _isRunning
private[akka] def actor: Actor = actorInstance.get /**
* Is the actor shut down?
*/
def isShutdown: Boolean = _isShutDown
private[akka] def this(clazz: Class[_ <: Actor]) = { /**
this() * Returns the uuid for the actor.
actorFactory = Left(Some(clazz)) */
} def uuid = _uuid
protected[akka] def uuid_=(uid: String) = _uuid = uid
private[akka] def this(factory: () => Actor) = {
this()
actorFactory = Right(Some(factory))
}
/** /**
* Sends a one-way asynchronous message. E.g. fire-and-forget semantics. * Sends a one-way asynchronous message. E.g. fire-and-forget semantics.
@ -265,20 +268,258 @@ sealed class ActorRef private[akka] () extends TransactionManagement {
} }
} else throw new IllegalStateException("Actor has not been started, you need to invoke 'actor.start' before using it") } else throw new IllegalStateException("Actor has not been started, you need to invoke 'actor.start' before using it")
} }
/**
* Serializes the ActorRef instance into a byte array (Array[Byte]).
*/
def toBinary: Array[Byte]
/**
* Returns the class for the Actor instance that is managed by the ActorRef.
*/
def actorClass: Class[_ <: Actor]
/**
* Sets the dispatcher for this actor. Needs to be invoked before the actor is started.
*/
def dispatcher_=(md: MessageDispatcher): Unit
/**
* Get the dispatcher for this actor.
*/
def dispatcher: MessageDispatcher
/**
* Invoking 'makeRemote' means that an actor will be moved to and invoked on a remote host.
*/
def makeRemote(hostname: String, port: Int): Unit
/**
* Invoking 'makeRemote' means that an actor will be moved to and invoked on a remote host.
*/
def makeRemote(address: InetSocketAddress): Unit
/**
* Invoking 'makeTransactionRequired' means that the actor will **start** a new transaction if non exists.
* However, it will always participate in an existing transaction.
* If transactionality want to be completely turned off then do it by invoking:
* <pre/>
* TransactionManagement.disableTransactions
* </pre>
*/
def makeTransactionRequired: Unit
/**
* Set the contact address for this actor. This is used for replying to messages
* sent asynchronously when no reply channel exists.
*/
def setReplyToAddress(hostname: String, port: Int): Unit =
setReplyToAddress(new InetSocketAddress(hostname, port))
/**
* Set the contact address for this actor. This is used for replying to messages
* sent asynchronously when no reply channel exists.
*/
def setReplyToAddress(address: InetSocketAddress): Unit
/**
* Returns the id for the actor.
*/
def id: String
/**
* Returns the remote address for the actor, if any, else None.
*/
def remoteAddress: Option[InetSocketAddress]
protected[akka] def remoteAddress_=(addr: Option[InetSocketAddress]): Unit
/**
* User overridable callback/setting.
* <p/>
* Defines the default timeout for '!!' and '!!!' invocations,
* e.g. the timeout for the future returned by the call to '!!' and '!!!'.
*/
def timeout: Long
/**
* Sets the default timeout for '!!' and '!!!' invocations,
* e.g. the timeout for the future returned by the call to '!!' and '!!!'.
*/
def timeout_=(t: Long)
/**
* Starts up the actor and its message queue.
*/
def start: ActorRef
/**
* Shuts down the actor its dispatcher and message queue.
* Alias for 'stop'.
*/
def exit = stop
/**
* Shuts down the actor its dispatcher and message queue.
*/
def stop: Unit
/**
* Links an other actor to this actor. Links are unidirectional and means that a the linking actor will
* receive a notification if the linked actor has crashed.
* <p/>
* If the 'trapExit' member field has been set to at contain at least one exception class then it will
* 'trap' these exceptions and automatically restart the linked actors according to the restart strategy
* defined by the 'faultHandler'.
* <p/>
* To be invoked from within the actor itself.
*/
def link(actorRef: ActorRef): Unit
/**
* Unlink the actor.
* <p/>
* To be invoked from within the actor itself.
*/
def unlink(actorRef: ActorRef): Unit
/**
* Atomically start and link an actor.
* <p/>
* To be invoked from within the actor itself.
*/
def startLink(actorRef: ActorRef): Unit
/**
* Atomically start, link and make an actor remote.
* <p/>
* To be invoked from within the actor itself.
*/
def startLinkRemote(actorRef: ActorRef, hostname: String, port: Int): Unit
/**
* Atomically create (from actor class) and start an actor.
* <p/>
* To be invoked from within the actor itself.
*/
def spawn[T <: Actor : Manifest]: ActorRef
/**
* Atomically create (from actor class), start and make an actor remote.
* <p/>
* To be invoked from within the actor itself.
*/
def spawnRemote[T <: Actor: Manifest](hostname: String, port: Int): ActorRef
/**
* Atomically create (from actor class), start and link an actor.
* <p/>
* To be invoked from within the actor itself.
*/
def spawnLink[T <: Actor: Manifest]: ActorRef
/**
* Atomically create (from actor class), start, link and make an actor remote.
* <p/>
* To be invoked from within the actor itself.
*/
def spawnLinkRemote[T <: Actor : Manifest](hostname: String, port: Int): ActorRef
/**
* Returns the mailbox size.
*/
def mailboxSize: Int
/**
* Returns the supervisor, if there is one.
*/
def supervisor: Option[ActorRef]
/**
* Shuts down and removes all linked actors.
*/
def shutdownLinkedActors: Unit
protected[akka] def toProtocol: ActorRefProtocol
protected[akka] def invoke(messageHandle: MessageInvocation): Unit
protected[akka] def postMessageToMailbox(message: Any, senderOption: Option[ActorRef]): Unit
protected[akka] def postMessageToMailboxAndCreateFutureResultWithTimeout[T](
message: Any,
timeout: Long,
senderFuture: Option[CompletableFuture[T]]): CompletableFuture[T]
protected[this] def actorInstance: AtomicReference[Actor]
protected[akka] def actor: Actor = actorInstance.get
protected[akka] def supervisor_=(sup: Option[ActorRef]): Unit
protected[akka] def trapExit: List[Class[_ <: Throwable]] = actor.trapExit
protected[akka] def trapExit_=(exits: List[Class[_ <: Throwable]]) = actor.trapExit = exits
protected[akka] def lifeCycle: Option[LifeCycle] = actor.lifeCycle
protected[akka] def lifeCycle_=(cycle: Option[LifeCycle]) = actor.lifeCycle = cycle
protected[akka] def faultHandler: Option[FaultHandlingStrategy] = actor.faultHandler
protected[akka] def faultHandler_=(handler: Option[FaultHandlingStrategy]) = actor.faultHandler = handler
protected[akka] def mailbox: Deque[MessageInvocation]
protected[akka] def restart(reason: Throwable): Unit
protected[akka] def handleTrapExit(dead: ActorRef, reason: Throwable): Unit
protected[akka] def restartLinkedActors(reason: Throwable): Unit
protected[akka] def registerSupervisorAsRemoteActor: Option[String]
protected[akka] def linkedActors: JMap[String, ActorRef]
protected[akka] def linkedActorsAsList: List[ActorRef]
override def toString: String
override def hashCode: Int
override def equals(that: Any): Boolean
}
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
sealed class LocalActorRef private[akka](
private[this] var actorFactory: Either[Option[Class[_ <: Actor]], Option[() => Actor]] = Left(None))
extends ActorRef {
private[akka] def this(clazz: Class[_ <: Actor]) = this(Left(Some(clazz)))
private[akka] def this(factory: () => Actor) = this(Right(Some(factory)))
// Only mutable for RemoteServer in order to maintain identity across nodes
@volatile private[akka] var _remoteAddress: Option[InetSocketAddress] = None
@volatile private[akka] var _linkedActors: Option[ConcurrentHashMap[String, ActorRef]] = None
@volatile private[akka] var _supervisor: Option[ActorRef] = None
@volatile private[akka] var _replyToAddress: Option[InetSocketAddress] = None
protected[akka] val _mailbox: Deque[MessageInvocation] = new ConcurrentLinkedDeque[MessageInvocation]
protected[this] val actorInstance = new AtomicReference[Actor](newActor)
if (startOnCreation) start
/** /**
* Serializes the ActorRef instance into a Protocol Buffers (protobuf) Message. * Serializes the ActorRef instance into a Protocol Buffers (protobuf) Message.
*/ */
private[akka] def toProtocol: ActorRefProtocol = _guard.withWriteLock { protected[akka] def toProtocol: ActorRefProtocol = guard.withWriteLock {
val (host, port) = _replyToAddress.map(address => val (host, port) = _replyToAddress.map(address =>
(address.getHostName, address.getPort)) (address.getHostName, address.getPort))
.getOrElse((Actor.HOSTNAME, Actor.PORT)) .getOrElse((Actor.HOSTNAME, Actor.PORT))
if (!_registeredInRemoteNodeDuringSerialization) { if (!registeredInRemoteNodeDuringSerialization) {
Actor.log.debug("Register serialized Actor [%s] as remote @ [%s:%s]", actorClass.getName, host, port) Actor.log.debug("Register serialized Actor [%s] as remote @ [%s:%s]", actorClass.getName, host, port)
if (RemoteServer.serverFor(host, port).isEmpty) (new RemoteServer).start(host, port) if (RemoteServer.serverFor(host, port).isEmpty) (new RemoteServer).start(host, port)
RemoteServer.actorsFor(RemoteServer.Address(host, port)).actors.put(uuid, this) RemoteServer.actorsFor(RemoteServer.Address(host, port)).actors.put(uuid, this)
_registeredInRemoteNodeDuringSerialization = true registeredInRemoteNodeDuringSerialization = true
} }
ActorRefProtocol.newBuilder ActorRefProtocol.newBuilder
@ -289,6 +530,11 @@ sealed class ActorRef private[akka] () extends TransactionManagement {
.setTimeout(timeout) .setTimeout(timeout)
.build .build
} }
/**
* Returns the mailbox.
*/
protected[akka] def mailbox: Deque[MessageInvocation] = _mailbox
/** /**
* Serializes the ActorRef instance into a byte array (Array[Byte]). * Serializes the ActorRef instance into a byte array (Array[Byte]).
@ -303,7 +549,7 @@ sealed class ActorRef private[akka] () extends TransactionManagement {
/** /**
* Sets the dispatcher for this actor. Needs to be invoked before the actor is started. * Sets the dispatcher for this actor. Needs to be invoked before the actor is started.
*/ */
def dispatcher_=(md: MessageDispatcher): Unit = _guard.withWriteLock { def dispatcher_=(md: MessageDispatcher): Unit = guard.withWriteLock {
if (!isRunning) actor.dispatcher = md if (!isRunning) actor.dispatcher = md
else throw new IllegalArgumentException( else throw new IllegalArgumentException(
"Can not swap dispatcher for " + toString + " after it has been started") "Can not swap dispatcher for " + toString + " after it has been started")
@ -312,7 +558,7 @@ sealed class ActorRef private[akka] () extends TransactionManagement {
/** /**
* Get the dispatcher for this actor. * Get the dispatcher for this actor.
*/ */
def dispatcher: MessageDispatcher = _guard.withReadLock { actor.dispatcher } def dispatcher: MessageDispatcher = guard.withReadLock { actor.dispatcher }
/** /**
* Invoking 'makeRemote' means that an actor will be moved to and invoked on a remote host. * Invoking 'makeRemote' means that an actor will be moved to and invoked on a remote host.
@ -325,7 +571,7 @@ sealed class ActorRef private[akka] () extends TransactionManagement {
/** /**
* Invoking 'makeRemote' means that an actor will be moved to and invoked on a remote host. * Invoking 'makeRemote' means that an actor will be moved to and invoked on a remote host.
*/ */
def makeRemote(address: InetSocketAddress): Unit = _guard.withWriteLock { def makeRemote(address: InetSocketAddress): Unit = guard.withWriteLock {
if (isRunning) throw new IllegalStateException( if (isRunning) throw new IllegalStateException(
"Can't make a running actor remote. Make sure you call 'makeRemote' before 'start'.") "Can't make a running actor remote. Make sure you call 'makeRemote' before 'start'.")
else { else {
@ -343,40 +589,30 @@ sealed class ActorRef private[akka] () extends TransactionManagement {
* TransactionManagement.disableTransactions * TransactionManagement.disableTransactions
* </pre> * </pre>
*/ */
def makeTransactionRequired = _guard.withWriteLock { def makeTransactionRequired = guard.withWriteLock {
if (isRunning) throw new IllegalArgumentException( if (isRunning) throw new IllegalArgumentException(
"Can not make actor transaction required after it has been started") "Can not make actor transaction required after it has been started")
else isTransactor = true else isTransactor = true
} }
/**
* Set the contact address for this actor. This is used for replying to messages
* sent asynchronously when no reply channel exists.
*/
def setReplyToAddress(hostname: String, port: Int): Unit =
setReplyToAddress(new InetSocketAddress(hostname, port))
/** /**
* Set the contact address for this actor. This is used for replying to messages * Set the contact address for this actor. This is used for replying to messages
* sent asynchronously when no reply channel exists. * sent asynchronously when no reply channel exists.
*/ */
def setReplyToAddress(address: InetSocketAddress): Unit = def setReplyToAddress(address: InetSocketAddress): Unit =
_guard.withReadLock { _replyToAddress = Some(address) } guard.withReadLock { _replyToAddress = Some(address) }
/** /**
* Returns the id for the actor. * Returns the id for the actor.
*/ */
def id = actor.id def id = actor.id
/**
* Returns the uuid for the actor.
*/
def uuid = _uuid
/** /**
* Returns the remote address for the actor, if any, else None. * Returns the remote address for the actor, if any, else None.
*/ */
def remoteAddress: Option[InetSocketAddress] = _guard.withReadLock { _remoteAddress } def remoteAddress: Option[InetSocketAddress] = guard.withReadLock { _remoteAddress }
protected[akka] def remoteAddress_=(addr: Option[InetSocketAddress]): Unit =
guard.withWriteLock { _remoteAddress = addr }
/** /**
* User overridable callback/setting. * User overridable callback/setting.
@ -395,8 +631,8 @@ sealed class ActorRef private[akka] () extends TransactionManagement {
/** /**
* Starts up the actor and its message queue. * Starts up the actor and its message queue.
*/ */
def start: ActorRef = _guard.withWriteLock { def start: ActorRef = guard.withWriteLock {
if (_isShutDown) throw new IllegalStateException( if (isShutdown) throw new IllegalStateException(
"Can't restart an actor that has been shut down with 'stop' or 'exit'") "Can't restart an actor that has been shut down with 'stop' or 'exit'")
if (!isRunning) { if (!isRunning) {
dispatcher.register(this) dispatcher.register(this)
@ -412,21 +648,15 @@ sealed class ActorRef private[akka] () extends TransactionManagement {
/** /**
* Shuts down the actor its dispatcher and message queue. * Shuts down the actor its dispatcher and message queue.
* Alias for 'stop'.
*/ */
def exit = stop def stop = guard.withWriteLock {
/**
* Shuts down the actor its dispatcher and message queue.
*/
def stop = _guard.withWriteLock {
if (isRunning) { if (isRunning) {
dispatcher.unregister(this) dispatcher.unregister(this)
_isRunning = false _isRunning = false
_isShutDown = true _isShutDown = true
actor.shutdown actor.shutdown
ActorRegistry.unregister(this) ActorRegistry.unregister(this)
_remoteAddress.foreach(address => RemoteClient.unregister( remoteAddress.foreach(address => RemoteClient.unregister(
address.getHostName, address.getPort, uuid)) address.getHostName, address.getPort, uuid))
RemoteNode.unregister(this) RemoteNode.unregister(this)
} }
@ -442,10 +672,10 @@ sealed class ActorRef private[akka] () extends TransactionManagement {
* <p/> * <p/>
* To be invoked from within the actor itself. * To be invoked from within the actor itself.
*/ */
def link(actorRef: ActorRef) = _guard.withWriteLock { def link(actorRef: ActorRef) = guard.withWriteLock {
if (actorRef.supervisor.isDefined) throw new IllegalStateException( if (actorRef.supervisor.isDefined) throw new IllegalStateException(
"Actor can only have one supervisor [" + actorRef + "], e.g. link(actor) fails") "Actor can only have one supervisor [" + actorRef + "], e.g. link(actor) fails")
linkedActors.add(actorRef) linkedActors.put(actorRef.uuid, actorRef)
actorRef.supervisor = Some(this) actorRef.supervisor = Some(this)
Actor.log.debug("Linking actor [%s] to actor [%s]", actorRef, this) Actor.log.debug("Linking actor [%s] to actor [%s]", actorRef, this)
} }
@ -455,10 +685,10 @@ sealed class ActorRef private[akka] () extends TransactionManagement {
* <p/> * <p/>
* To be invoked from within the actor itself. * To be invoked from within the actor itself.
*/ */
def unlink(actorRef: ActorRef) = _guard.withWriteLock { def unlink(actorRef: ActorRef) = guard.withWriteLock {
if (!linkedActors.contains(actorRef)) throw new IllegalStateException( if (!linkedActors.containsKey(actorRef.uuid)) throw new IllegalStateException(
"Actor [" + actorRef + "] is not a linked actor, can't unlink") "Actor [" + actorRef + "] is not a linked actor, can't unlink")
linkedActors.remove(actorRef) linkedActors.remove(actorRef.uuid)
actorRef.supervisor = None actorRef.supervisor = None
Actor.log.debug("Unlinking actor [%s] from actor [%s]", actorRef, this) Actor.log.debug("Unlinking actor [%s] from actor [%s]", actorRef, this)
} }
@ -468,7 +698,7 @@ sealed class ActorRef private[akka] () extends TransactionManagement {
* <p/> * <p/>
* To be invoked from within the actor itself. * To be invoked from within the actor itself.
*/ */
def startLink(actorRef: ActorRef) = _guard.withWriteLock { def startLink(actorRef: ActorRef) = guard.withWriteLock {
try { try {
actorRef.start actorRef.start
} finally { } finally {
@ -481,7 +711,7 @@ sealed class ActorRef private[akka] () extends TransactionManagement {
* <p/> * <p/>
* To be invoked from within the actor itself. * To be invoked from within the actor itself.
*/ */
def startLinkRemote(actorRef: ActorRef, hostname: String, port: Int) = _guard.withWriteLock { def startLinkRemote(actorRef: ActorRef, hostname: String, port: Int) = guard.withWriteLock {
try { try {
actorRef.makeRemote(hostname, port) actorRef.makeRemote(hostname, port)
actorRef.start actorRef.start
@ -495,7 +725,7 @@ sealed class ActorRef private[akka] () extends TransactionManagement {
* <p/> * <p/>
* To be invoked from within the actor itself. * To be invoked from within the actor itself.
*/ */
def spawn[T <: Actor : Manifest]: ActorRef = _guard.withWriteLock { def spawn[T <: Actor : Manifest]: ActorRef = guard.withWriteLock {
val actorRef = spawnButDoNotStart[T] val actorRef = spawnButDoNotStart[T]
actorRef.start actorRef.start
actorRef actorRef
@ -506,7 +736,7 @@ sealed class ActorRef private[akka] () extends TransactionManagement {
* <p/> * <p/>
* To be invoked from within the actor itself. * To be invoked from within the actor itself.
*/ */
def spawnRemote[T <: Actor: Manifest](hostname: String, port: Int): ActorRef = _guard.withWriteLock { def spawnRemote[T <: Actor: Manifest](hostname: String, port: Int): ActorRef = guard.withWriteLock {
val actor = spawnButDoNotStart[T] val actor = spawnButDoNotStart[T]
actor.makeRemote(hostname, port) actor.makeRemote(hostname, port)
actor.start actor.start
@ -518,7 +748,7 @@ sealed class ActorRef private[akka] () extends TransactionManagement {
* <p/> * <p/>
* To be invoked from within the actor itself. * To be invoked from within the actor itself.
*/ */
def spawnLink[T <: Actor: Manifest]: ActorRef = _guard.withWriteLock { def spawnLink[T <: Actor: Manifest]: ActorRef = guard.withWriteLock {
val actor = spawnButDoNotStart[T] val actor = spawnButDoNotStart[T]
try { try {
actor.start actor.start
@ -533,7 +763,7 @@ sealed class ActorRef private[akka] () extends TransactionManagement {
* <p/> * <p/>
* To be invoked from within the actor itself. * To be invoked from within the actor itself.
*/ */
def spawnLinkRemote[T <: Actor : Manifest](hostname: String, port: Int): ActorRef = _guard.withWriteLock { def spawnLinkRemote[T <: Actor : Manifest](hostname: String, port: Int): ActorRef = guard.withWriteLock {
val actor = spawnButDoNotStart[T] val actor = spawnButDoNotStart[T]
try { try {
actor.makeRemote(hostname, port) actor.makeRemote(hostname, port)
@ -543,21 +773,6 @@ sealed class ActorRef private[akka] () extends TransactionManagement {
} }
} }
/**
* Is the actor killed?
*/
def isKilled: Boolean = _isKilled
/**
* Is the actor running?
*/
def isRunning: Boolean = _isRunning
/**
* Is the actor shut down?
*/
def isShutdown: Boolean = !_isRunning
/** /**
* Returns the mailbox size. * Returns the mailbox size.
*/ */
@ -566,8 +781,8 @@ sealed class ActorRef private[akka] () extends TransactionManagement {
/** /**
* Shuts down and removes all linked actors. * Shuts down and removes all linked actors.
*/ */
def shutdownLinkedActors: Unit = _guard.withWriteLock { def shutdownLinkedActors: Unit = guard.withWriteLock {
linkedActors.toArray.toList.asInstanceOf[List[ActorRef]].foreach(_.stop) linkedActorsAsList.foreach(_.stop)
linkedActors.clear linkedActors.clear
} }
@ -580,22 +795,13 @@ sealed class ActorRef private[akka] () extends TransactionManagement {
/** /**
* Returns the supervisor, if there is one. * Returns the supervisor, if there is one.
*/ */
def supervisor: Option[ActorRef] = _guard.withReadLock { _supervisor } def supervisor: Option[ActorRef] = guard.withReadLock { _supervisor }
private[akka] def supervisor_=(sup: Option[ActorRef]): Unit = _guard.withWriteLock { _supervisor = sup } protected[akka] def supervisor_=(sup: Option[ActorRef]): Unit = guard.withWriteLock { _supervisor = sup }
private[akka] def trapExit: List[Class[_ <: Throwable]] = actor.trapExit private def spawnButDoNotStart[T <: Actor: Manifest]: ActorRef = guard.withWriteLock {
private[akka] def trapExit_=(exits: List[Class[_ <: Throwable]]) = actor.trapExit = exits
private[akka] def lifeCycle: Option[LifeCycle] = actor.lifeCycle
private[akka] def lifeCycle_=(cycle: Option[LifeCycle]) = actor.lifeCycle = cycle
private[akka] def faultHandler: Option[FaultHandlingStrategy] = actor.faultHandler
private[akka] def faultHandler_=(handler: Option[FaultHandlingStrategy]) = actor.faultHandler = handler
private def spawnButDoNotStart[T <: Actor: Manifest]: ActorRef = _guard.withWriteLock {
val actor = manifest[T].erasure.asInstanceOf[Class[T]].newInstance val actor = manifest[T].erasure.asInstanceOf[Class[T]].newInstance
val actorRef = new ActorRef(() => actor) val actorRef = Actor.newActor(() => actor)
if (!dispatcher.isInstanceOf[ThreadBasedDispatcher]) { if (!dispatcher.isInstanceOf[ThreadBasedDispatcher]) {
actorRef.dispatcher = dispatcher actorRef.dispatcher = dispatcher
} }
@ -628,12 +834,12 @@ sealed class ActorRef private[akka] () extends TransactionManagement {
protected[akka] def postMessageToMailbox(message: Any, senderOption: Option[ActorRef]): Unit = { protected[akka] def postMessageToMailbox(message: Any, senderOption: Option[ActorRef]): Unit = {
joinTransaction(message) joinTransaction(message)
if (_remoteAddress.isDefined) { if (remoteAddress.isDefined) {
val requestBuilder = RemoteRequestProtocol.newBuilder val requestBuilder = RemoteRequestProtocol.newBuilder
.setId(RemoteRequestProtocolIdFactory.nextId) .setId(RemoteRequestProtocolIdFactory.nextId)
.setTarget(this.getClass.getName) .setTarget(actorClass.getName)
.setTimeout(this.timeout) .setTimeout(timeout)
.setUuid(this.uuid) .setUuid(uuid)
.setIsActor(true) .setIsActor(true)
.setIsOneWay(true) .setIsOneWay(true)
.setIsEscaped(false) .setIsEscaped(false)
@ -644,7 +850,7 @@ sealed class ActorRef private[akka] () extends TransactionManagement {
senderOption.foreach(sender => requestBuilder.setSender(sender.toProtocol)) senderOption.foreach(sender => requestBuilder.setSender(sender.toProtocol))
RemoteProtocolBuilder.setMessage(message, requestBuilder) RemoteProtocolBuilder.setMessage(message, requestBuilder)
RemoteClient.clientFor(_remoteAddress.get).send[Any](requestBuilder.build, None) RemoteClient.clientFor(remoteAddress.get).send[Any](requestBuilder.build, None)
} else { } else {
val invocation = new MessageInvocation(this, message, senderOption.map(Left(_)), transactionSet.get) val invocation = new MessageInvocation(this, message, senderOption.map(Left(_)), transactionSet.get)
if (dispatcher.usesActorMailbox) { if (dispatcher.usesActorMailbox) {
@ -661,12 +867,12 @@ sealed class ActorRef private[akka] () extends TransactionManagement {
senderFuture: Option[CompletableFuture[T]]): CompletableFuture[T] = { senderFuture: Option[CompletableFuture[T]]): CompletableFuture[T] = {
joinTransaction(message) joinTransaction(message)
if (_remoteAddress.isDefined) { if (remoteAddress.isDefined) {
val requestBuilder = RemoteRequestProtocol.newBuilder val requestBuilder = RemoteRequestProtocol.newBuilder
.setId(RemoteRequestProtocolIdFactory.nextId) .setId(RemoteRequestProtocolIdFactory.nextId)
.setTarget(this.getClass.getName) .setTarget(actorClass.getName)
.setTimeout(this.timeout) .setTimeout(timeout)
.setUuid(this.uuid) .setUuid(uuid)
.setIsActor(true) .setIsActor(true)
.setIsOneWay(false) .setIsOneWay(false)
.setIsEscaped(false) .setIsEscaped(false)
@ -677,7 +883,7 @@ sealed class ActorRef private[akka] () extends TransactionManagement {
val id = registerSupervisorAsRemoteActor val id = registerSupervisorAsRemoteActor
if (id.isDefined) requestBuilder.setSupervisorUuid(id.get) if (id.isDefined) requestBuilder.setSupervisorUuid(id.get)
val future = RemoteClient.clientFor(_remoteAddress.get).send(requestBuilder.build, senderFuture) val future = RemoteClient.clientFor(remoteAddress.get).send(requestBuilder.build, senderFuture)
if (future.isDefined) future.get if (future.isDefined) future.get
else throw new IllegalStateException("Expected a future from remote call to actor " + toString) else throw new IllegalStateException("Expected a future from remote call to actor " + toString)
} else { } else {
@ -691,7 +897,7 @@ sealed class ActorRef private[akka] () extends TransactionManagement {
} }
} }
private[akka] def restart(reason: Throwable): Unit = { protected[akka] def restart(reason: Throwable): Unit = {
Actor.log.info("Restarting actor [%s] configured as PERMANENT.", id) Actor.log.info("Restarting actor [%s] configured as PERMANENT.", id)
val failedActor = actorInstance.get val failedActor = actorInstance.get
failedActor.synchronized { failedActor.synchronized {
@ -721,7 +927,7 @@ sealed class ActorRef private[akka] () extends TransactionManagement {
/** /**
* Callback for the dispatcher. E.g. single entry point to the user code and all protected[this] methods. * Callback for the dispatcher. E.g. single entry point to the user code and all protected[this] methods.
*/ */
private[akka] def invoke(messageHandle: MessageInvocation) = actor.synchronized { protected[akka] def invoke(messageHandle: MessageInvocation) = actor.synchronized {
try { try {
if (TransactionManagement.isTransactionalityEnabled) transactionalDispatch(messageHandle) if (TransactionManagement.isTransactionalityEnabled) transactionalDispatch(messageHandle)
else dispatch(messageHandle) else dispatch(messageHandle)
@ -732,7 +938,7 @@ sealed class ActorRef private[akka] () extends TransactionManagement {
} }
} }
private def dispatch[T](messageHandle: MessageInvocation) = _guard.withWriteLock { private def dispatch[T](messageHandle: MessageInvocation) = guard.withWriteLock {
setTransactionSet(messageHandle.transactionSet) setTransactionSet(messageHandle.transactionSet)
val message = messageHandle.message //serializeMessage(messageHandle.message) val message = messageHandle.message //serializeMessage(messageHandle.message)
@ -757,7 +963,7 @@ sealed class ActorRef private[akka] () extends TransactionManagement {
} }
} }
private def transactionalDispatch[T](messageHandle: MessageInvocation) = _guard.withWriteLock { private def transactionalDispatch[T](messageHandle: MessageInvocation) = guard.withWriteLock {
var topLevelTransaction = false var topLevelTransaction = false
val txSet: Option[CountDownCommitBarrier] = val txSet: Option[CountDownCommitBarrier] =
if (messageHandle.transactionSet.isDefined) messageHandle.transactionSet if (messageHandle.transactionSet.isDefined) messageHandle.transactionSet
@ -814,7 +1020,7 @@ sealed class ActorRef private[akka] () extends TransactionManagement {
} }
} }
private[akka] def handleTrapExit(dead: ActorRef, reason: Throwable): Unit = _guard.withReadLock { protected[akka] def handleTrapExit(dead: ActorRef, reason: Throwable): Unit = guard.withReadLock {
if (trapExit.exists(_.isAssignableFrom(reason.getClass))) { if (trapExit.exists(_.isAssignableFrom(reason.getClass))) {
if (faultHandler.isDefined) { if (faultHandler.isDefined) {
faultHandler.get match { faultHandler.get match {
@ -828,8 +1034,8 @@ sealed class ActorRef private[akka] () extends TransactionManagement {
} else _supervisor.foreach(_ ! Exit(dead, reason)) // if 'trapExit' is not defined then pass the Exit on } else _supervisor.foreach(_ ! Exit(dead, reason)) // if 'trapExit' is not defined then pass the Exit on
} }
private[akka] def restartLinkedActors(reason: Throwable) = _guard.withWriteLock { protected[akka] def restartLinkedActors(reason: Throwable) = guard.withWriteLock {
linkedActors.toArray.toList.asInstanceOf[List[ActorRef]].foreach { actorRef => linkedActorsAsList.foreach { actorRef =>
if (actorRef.lifeCycle.isEmpty) actorRef.lifeCycle = Some(LifeCycle(Permanent)) if (actorRef.lifeCycle.isEmpty) actorRef.lifeCycle = Some(LifeCycle(Permanent))
actorRef.lifeCycle.get match { actorRef.lifeCycle.get match {
case LifeCycle(scope, _) => { case LifeCycle(scope, _) => {
@ -839,7 +1045,7 @@ sealed class ActorRef private[akka] () extends TransactionManagement {
case Temporary => case Temporary =>
Actor.log.info("Actor [%s] configured as TEMPORARY and will not be restarted.", actorRef.id) Actor.log.info("Actor [%s] configured as TEMPORARY and will not be restarted.", actorRef.id)
actorRef.stop actorRef.stop
linkedActors.remove(actorRef) // remove the temporary actor linkedActors.remove(actorRef.uuid) // remove the temporary actor
// if last temporary actor is gone, then unlink me from supervisor // if last temporary actor is gone, then unlink me from supervisor
if (linkedActors.isEmpty) { if (linkedActors.isEmpty) {
Actor.log.info( Actor.log.info(
@ -854,21 +1060,24 @@ sealed class ActorRef private[akka] () extends TransactionManagement {
} }
} }
private[akka] def registerSupervisorAsRemoteActor: Option[String] = _guard.withWriteLock { protected[akka] def registerSupervisorAsRemoteActor: Option[String] = guard.withWriteLock {
if (_supervisor.isDefined) { if (_supervisor.isDefined) {
RemoteClient.clientFor(_remoteAddress.get).registerSupervisorForActor(this) RemoteClient.clientFor(remoteAddress.get).registerSupervisorForActor(this)
Some(_supervisor.get.uuid) Some(_supervisor.get.uuid)
} else None } else None
} }
private[akka] def linkedActors: JHashSet[ActorRef] = _guard.withWriteLock { protected[akka] def linkedActors: JMap[String, ActorRef] = guard.withWriteLock {
if (_linkedActors.isEmpty) { if (_linkedActors.isEmpty) {
val set = new JHashSet[ActorRef] val actors = new ConcurrentHashMap[String, ActorRef]
_linkedActors = Some(set) _linkedActors = Some(actors)
set actors
} else _linkedActors.get } else _linkedActors.get
} }
protected[akka] def linkedActorsAsList: List[ActorRef] =
linkedActors.values.toArray.toList.asInstanceOf[List[ActorRef]]
private def serializeMessage(message: AnyRef): AnyRef = if (Actor.SERIALIZE_MESSAGES) { private def serializeMessage(message: AnyRef): AnyRef = if (Actor.SERIALIZE_MESSAGES) {
if (!message.isInstanceOf[String] && if (!message.isInstanceOf[String] &&
!message.isInstanceOf[Byte] && !message.isInstanceOf[Byte] &&
@ -900,18 +1109,20 @@ sealed class ActorRef private[akka] () extends TransactionManagement {
* *
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a> * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/ */
private[akka] class RemoteActorRef private ( private[akka] case class RemoteActorRef private[akka] (
// uuid: String, className: String, hostname: String, port: Int, timeOut: Long, isOnRemoteHost: Boolean) extends ActorRef { // uuid: String, className: String, hostname: String, port: Int, timeOut: Long, isOnRemoteHost: Boolean) extends ActorRef {
uuid: String, className: String, hostname: String, port: Int, timeOut: Long) uuuid: String, val className: String, val hostname: String, val port: Int, _timeout: Long)
extends ActorRef { extends ActorRef {
_uuid = uuuid
start
val remoteClient = RemoteClient.clientFor(hostname, port) val remoteClient = RemoteClient.clientFor(hostname, port)
override def postMessageToMailbox(message: Any, senderOption: Option[ActorRef]): Unit = { def postMessageToMailbox(message: Any, senderOption: Option[ActorRef]): Unit = {
val requestBuilder = RemoteRequestProtocol.newBuilder val requestBuilder = RemoteRequestProtocol.newBuilder
.setId(RemoteRequestProtocolIdFactory.nextId) .setId(RemoteRequestProtocolIdFactory.nextId)
.setTarget(className) .setTarget(className)
.setTimeout(timeOut) .setTimeout(timeout)
.setUuid(uuid) .setUuid(uuid)
.setIsActor(true) .setIsActor(true)
.setIsOneWay(true) .setIsOneWay(true)
@ -921,7 +1132,7 @@ private[akka] class RemoteActorRef private (
remoteClient.send[Any](requestBuilder.build, None) remoteClient.send[Any](requestBuilder.build, None)
} }
override def postMessageToMailboxAndCreateFutureResultWithTimeout[T]( def postMessageToMailboxAndCreateFutureResultWithTimeout[T](
message: Any, message: Any,
timeout: Long, timeout: Long,
senderFuture: Option[CompletableFuture[T]]): CompletableFuture[T] = { senderFuture: Option[CompletableFuture[T]]): CompletableFuture[T] = {
@ -939,17 +1150,53 @@ private[akka] class RemoteActorRef private (
if (future.isDefined) future.get if (future.isDefined) future.get
else throw new IllegalStateException("Expected a future from remote call to actor " + toString) else throw new IllegalStateException("Expected a future from remote call to actor " + toString)
} }
}
/** def timeout: Long = _timeout
* Remote Actor proxy factory.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
private[akka] object RemoteActorRef {
// def apply(uuid: String, className: String, hostname: String, port: Int, timeout: Long, isOnRemoteHost: Boolean): ActorRef =
// (new RemoteActorRef(uuid, className, hostname, port, timeout, isOnRemoteHost)).start
def apply(uuid: String, className: String, hostname: String, port: Int, timeout: Long): ActorRef =
(new RemoteActorRef(uuid, className, hostname, port, timeout)).start
}
def start: ActorRef = {
_isRunning = true
this
}
def stop: Unit = {
_isRunning = false
_isShutDown = true
}
// ==== NOT SUPPORTED ====
def toBinary: Array[Byte] = unsupported
def actorClass: Class[_ <: Actor] = unsupported
def dispatcher_=(md: MessageDispatcher): Unit = unsupported
def dispatcher: MessageDispatcher = unsupported
def makeRemote(hostname: String, port: Int): Unit = unsupported
def makeRemote(address: InetSocketAddress): Unit = unsupported
def makeTransactionRequired: Unit = unsupported
def setReplyToAddress(address: InetSocketAddress): Unit = unsupported
def id: String = unsupported
def remoteAddress: Option[InetSocketAddress] = unsupported
def timeout_=(t: Long) = unsupported
def link(actorRef: ActorRef): Unit = unsupported
def unlink(actorRef: ActorRef): Unit = unsupported
def startLink(actorRef: ActorRef): Unit = unsupported
def startLinkRemote(actorRef: ActorRef, hostname: String, port: Int): Unit = unsupported
def spawn[T <: Actor : Manifest]: ActorRef = unsupported
def spawnRemote[T <: Actor: Manifest](hostname: String, port: Int): ActorRef = unsupported
def spawnLink[T <: Actor: Manifest]: ActorRef = unsupported
def spawnLinkRemote[T <: Actor : Manifest](hostname: String, port: Int): ActorRef = unsupported
def mailboxSize: Int = unsupported
def supervisor: Option[ActorRef] = unsupported
def shutdownLinkedActors: Unit = unsupported
protected[akka] def toProtocol: ActorRefProtocol = unsupported
protected[akka] def mailbox: Deque[MessageInvocation] = unsupported
protected[akka] def restart(reason: Throwable): Unit = unsupported
protected[akka] def handleTrapExit(dead: ActorRef, reason: Throwable): Unit = unsupported
protected[akka] def restartLinkedActors(reason: Throwable): Unit = unsupported
protected[akka] def registerSupervisorAsRemoteActor: Option[String] = unsupported
protected[akka] def linkedActors: JMap[String, ActorRef] = unsupported
protected[akka] def linkedActorsAsList: List[ActorRef] = unsupported
protected[akka] def invoke(messageHandle: MessageInvocation): Unit = unsupported
protected[akka] def remoteAddress_=(addr: Option[InetSocketAddress]): Unit = unsupported
protected[akka] def supervisor_=(sup: Option[ActorRef]): Unit = unsupported
protected[this] def actorInstance: AtomicReference[Actor] = unsupported
private def unsupported = throw new UnsupportedOperationException("Not supported for RemoteActorRef")
}

View file

@ -92,27 +92,28 @@ object ActorRegistry extends Logging {
/** /**
* Registers an actor in the ActorRegistry. * Registers an actor in the ActorRegistry.
*/ */
def register(actorId: ActorRef) = { def register(actor: ActorRef) = {
// UUID // UUID
actorsByUUID.put(actorId.uuid, actorId) actorsByUUID.put(actor.uuid, actor)
// ID // ID
val id = actorId.id val id = actor.id
if (id eq null) throw new IllegalStateException("Actor.id is null " + actorId) if (id eq null) throw new IllegalStateException("Actor.id is null " + actor)
if (actorsById.containsKey(id)) actorsById.put(id, actorId :: actorsById.get(id)) if (actorsById.containsKey(id)) actorsById.put(id, actor :: actorsById.get(id))
else actorsById.put(id, actorId :: Nil) else actorsById.put(id, actor :: Nil)
// Class name // Class name
val className = actorId.actor.getClass.getName val className = actor.actor.getClass.getName
if (actorsByClassName.containsKey(className)) { if (actorsByClassName.containsKey(className)) {
actorsByClassName.put(className, actorId :: actorsByClassName.get(className)) actorsByClassName.put(className, actor :: actorsByClassName.get(className))
} else actorsByClassName.put(className, actorId :: Nil) } else actorsByClassName.put(className, actor :: Nil)
// notify listeners // notify listeners
foreachListener(_ ! ActorRegistered(actorId)) foreachListener(_ ! ActorRegistered(actor))
} }
/** /**
* FIXME: WRONG - unregisters all actors with the same id and class name, should remove the right one in each list
* Unregisters an actor in the ActorRegistry. * Unregisters an actor in the ActorRegistry.
*/ */
def unregister(actor: ActorRef) = { def unregister(actor: ActorRef) = {

View file

@ -10,11 +10,91 @@ import se.scalablesolutions.akka.util.Helpers._
import se.scalablesolutions.akka.util.Logging import se.scalablesolutions.akka.util.Logging
import se.scalablesolutions.akka.dispatch.Dispatchers import se.scalablesolutions.akka.dispatch.Dispatchers
import se.scalablesolutions.akka.remote.RemoteServer import se.scalablesolutions.akka.remote.RemoteServer
import Actor._
import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.ConcurrentHashMap
/** /**
* Abstract base class for all supervisor factories. * Factory object for creating supervisors declarative. It creates instances of the 'Supervisor' class.
* These are not actors, if you need a supervisor that is an Actor then you have to use the 'SupervisorActor'
* factory object.
* <p/>
*
* Here is a sample on how to use it:
* <pre>
* val supervisor = Supervisor(
* SupervisorConfig(
* RestartStrategy(OneForOne, 3, 10, List(classOf[Exception]),
* Supervise(
* myFirstActor,
* LifeCycle(Permanent)) ::
* Supervise(
* mySecondActor,
* LifeCycle(Permanent)) ::
* Nil))
* </pre>
*
* You can use the declaratively created Supervisor to link and unlink child children
* dynamically using the 'link' and 'unlink' methods.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object Supervisor {
def apply(config: SupervisorConfig): Supervisor = SupervisorFactory(config).newInstance.start
}
/**
* Factory object for creating supervisors as Actors, it has both a declarative and programatic API.
* <p/>
*
* Here is a sample on how to use the programmatic API (note that the supervisor is automatically started):
* <pre>
* val supervisor = SupervisorActor(AllForOneStrategy(maxNrOfRetries, timeRange), Array(classOf[Throwable]))
*
* // link and unlink child actors dynamically
* supervisor ! Link(child1) // starts the actor if not started yet, starts and links atomically
* supervisor ! Unlink(child2)
* supervisor ! UnlinkAndStop(child3)
* </pre>
*
* Here is a sample on how to use the declarative API:
* <pre>
* val supervisor = SupervisorActor(
* SupervisorConfig(
* RestartStrategy(OneForOne, 3, 10, List(classOf[Exception]),
* Supervise(
* myFirstActor,
* LifeCycle(Permanent)) ::
* Supervise(
* mySecondActor,
* LifeCycle(Permanent)) ::
* Nil))
*
* // link and unlink child actors dynamically
* supervisor ! Link(child1) // starts the actor if not started yet, starts and links atomically
* supervisor ! Unlink(child2)
* supervisor ! UnlinkAndStop(child3)
* </pre>
*
* You can use the declaratively created Supervisor to link and unlink child children
* dynamically using the 'link' and 'unlink' methods.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object SupervisorActor {
def apply(config: SupervisorConfig): ActorRef = {
val (handler, trapExits) = SupervisorFactory.retrieveFaultHandlerAndTrapExitsFrom(config)
newActor(() => new SupervisorActor(handler, trapExits)).start
}
def apply(handler: FaultHandlingStrategy, trapExceptions: List[Class[_ <: Throwable]]): ActorRef =
newActor(() => new SupervisorActor(handler, trapExceptions)).start
}
/**
* Use this factory instead of the Supervisor factory object if you want to control
* instantiation and starting of the Supervisor, if not then it is easier and better
* to use the Supervisor factory object.
* <p> * <p>
* Example usage: * Example usage:
* <pre> * <pre>
@ -39,29 +119,37 @@ import java.util.concurrent.ConcurrentHashMap
* *
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a> * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/ */
class SupervisorFactory(val config: SupervisorConfig) extends Logging {
type ExceptionList = List[Class[_ <: Throwable]]
def newInstance: ActorRef = newInstanceFor(config)
def newInstanceFor(config: SupervisorConfig): ActorRef = config match {
case SupervisorConfig(restartStrategy, _) =>
val supervisor = create(restartStrategy)
supervisor.configure(config, this)
Actor.newActor(() => supervisor).start
}
protected def create(strategy: RestartStrategy): Supervisor = strategy match {
case RestartStrategy(scheme, maxNrOfRetries, timeRange, trapExceptions: ExceptionList) =>
scheme match {
case AllForOne => new Supervisor(AllForOneStrategy(maxNrOfRetries, timeRange), trapExceptions)
case OneForOne => new Supervisor(OneForOneStrategy(maxNrOfRetries, timeRange), trapExceptions)
}
}
}
object SupervisorFactory { object SupervisorFactory {
def apply(config: SupervisorConfig) = new SupervisorFactory(config) def apply(config: SupervisorConfig) = new SupervisorFactory(config)
private[akka] def retrieveFaultHandlerAndTrapExitsFrom(config: SupervisorConfig):
Tuple2[FaultHandlingStrategy, List[Class[_ <: Throwable]]] = config match {
case SupervisorConfig(RestartStrategy(scheme, maxNrOfRetries, timeRange, trapExceptions), _) =>
scheme match {
case AllForOne => (AllForOneStrategy(maxNrOfRetries, timeRange), trapExceptions)
case OneForOne => (OneForOneStrategy(maxNrOfRetries, timeRange), trapExceptions)
}
}
}
/**
* For internal use only.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class SupervisorFactory private[akka] (val config: SupervisorConfig) extends Logging {
type ExceptionList = List[Class[_ <: Throwable]]
def newInstance: Supervisor = newInstanceFor(config)
def newInstanceFor(config: SupervisorConfig): Supervisor = {
val (handler, trapExits) = SupervisorFactory.retrieveFaultHandlerAndTrapExitsFrom(config)
val supervisor = new Supervisor(handler, trapExits)
supervisor.configure(config, this)
supervisor.start
supervisor
}
} }
/** /**
@ -69,42 +157,38 @@ object SupervisorFactory {
* <p/> * <p/>
* The supervisor class is only used for the configuration system when configuring supervisor * The supervisor class is only used for the configuration system when configuring supervisor
* hierarchies declaratively. Should not be used as part of the regular programming API. Instead * hierarchies declaratively. Should not be used as part of the regular programming API. Instead
* wire the actors together using 'link', 'spawnLink' etc. and set the 'trapExit' flag in the * wire the children together using 'link', 'spawnLink' etc. and set the 'trapExit' flag in the
* actors that should trap error signals and trigger restart. * children that should trap error signals and trigger restart.
* <p/> * <p/>
* See the ScalaDoc for the SupervisorFactory for an example on how to declaratively wire up actors. * See the ScalaDoc for the SupervisorFactory for an example on how to declaratively wire up children.
* *
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a> * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/ */
sealed class Supervisor private[akka] (handler: FaultHandlingStrategy, trapExceptions: List[Class[_ <: Throwable]]) sealed class Supervisor private[akka] (handler: FaultHandlingStrategy, trapExceptions: List[Class[_ <: Throwable]])
extends Actor with Logging with Configurator { extends Configurator {
trapExit = trapExceptions private val children = new ConcurrentHashMap[String, List[ActorRef]]
faultHandler = Some(handler) private val supervisor = SupervisorActor(handler, trapExceptions)
def uuid = supervisor.uuid
def start: Supervisor = {
ConfiguratorRepository.registerConfigurator(this)
this
}
// FIXME should Supervisor really havea newThreadBasedDispatcher?? def shutdown: Unit = supervisor.stop
self.dispatcher = Dispatchers.newThreadBasedDispatcher(this)
private val actors = new ConcurrentHashMap[String, List[ActorRef]] def link(child: ActorRef) = supervisor ! Link(child)
// Cheating, should really go through the dispatcher rather than direct access to a CHM def unlink(child: ActorRef) = supervisor ! Unlink(child)
def getInstance[T](clazz: Class[T]): List[T] = actors.get(clazz.getName).asInstanceOf[List[T]]
def getInstance[T](clazz: Class[T]): List[T] = children.get(clazz.getName).asInstanceOf[List[T]]
def getComponentInterfaces: List[Class[_]] = def getComponentInterfaces: List[Class[_]] =
actors.values.toArray.toList.asInstanceOf[List[List[AnyRef]]].flatten.map(_.getClass) children.values.toArray.toList.asInstanceOf[List[List[AnyRef]]].flatten.map(_.getClass)
def isDefined(clazz: Class[_]): Boolean = actors.containsKey(clazz.getName) def isDefined(clazz: Class[_]): Boolean = children.containsKey(clazz.getName)
override def init: Unit = synchronized {
ConfiguratorRepository.registerConfigurator(this)
}
override def shutdown: Unit = synchronized { self.shutdownLinkedActors }
def receive = {
case unknown => throw new IllegalArgumentException(
"Supervisor " + toString + " does not respond to any messages. Unknown message [" + unknown + "]")
}
def configure(config: SupervisorConfig, factory: SupervisorFactory) = config match { def configure(config: SupervisorConfig, factory: SupervisorFactory) = config match {
case SupervisorConfig(_, servers) => case SupervisorConfig(_, servers) =>
@ -113,32 +197,62 @@ sealed class Supervisor private[akka] (handler: FaultHandlingStrategy, trapExcep
case Supervise(actorRef, lifeCycle, remoteAddress) => case Supervise(actorRef, lifeCycle, remoteAddress) =>
val className = actorRef.actor.getClass.getName val className = actorRef.actor.getClass.getName
val currentActors = { val currentActors = {
val list = actors.get(className) val list = children.get(className)
if (list eq null) List[ActorRef]() if (list eq null) List[ActorRef]()
else list else list
} }
actors.put(className, actorRef :: currentActors) children.put(className, actorRef :: currentActors)
actorRef.actor.lifeCycle = Some(lifeCycle) actorRef.actor.lifeCycle = Some(lifeCycle)
startLink(actorRef) supervisor ! Link(actorRef)
remoteAddress.foreach(address => RemoteServer.actorsFor( remoteAddress.foreach(address => RemoteServer.actorsFor(
RemoteServer.Address(address.hostname, address.port)) RemoteServer.Address(address.hostname, address.port))
.actors.put(actorRef.id, actorRef)) .actors.put(actorRef.id, actorRef))
case supervisorConfig @ SupervisorConfig(_, _) => // recursive supervisor configuration case supervisorConfig @ SupervisorConfig(_, _) => // recursive supervisor configuration
val supervisor = { val childSupervisor = SupervisorActor(supervisorConfig)
val instance = factory.newInstanceFor(supervisorConfig) childSupervisor.lifeCycle = Some(LifeCycle(Permanent))
instance.start val className = childSupervisor.uuid
instance
}
supervisor.lifeCycle = Some(LifeCycle(Permanent))
val className = supervisor.actorClass.getName
val currentSupervisors = { val currentSupervisors = {
val list = actors.get(className) val list = children.get(className)
if (list eq null) List[ActorRef]() if (list eq null) List[ActorRef]()
else list else list
} }
actors.put(className, supervisor :: currentSupervisors) children.put(className, childSupervisor :: currentSupervisors)
link(supervisor) supervisor ! Link(childSupervisor)
}) })
} }
} }
/**
* Use this class when you want to create a supervisor dynamically that should only
* manage its child children and not have any functionality by itself.
* <p/>
* Here is a sample on how to use it:
* <pre>
* val supervisor = Supervisor(AllForOneStrategy(maxNrOfRetries, timeRange), Array(classOf[Throwable]))
* supervisor ! Link(child1) // starts the actor if not started yet, starts and links atomically
* supervisor ! Unlink(child2)
* supervisor ! UnlinkAndStop(child3)
* </pre>
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
final class SupervisorActor private[akka] (
handler: FaultHandlingStrategy,
trapExceptions: List[Class[_ <: Throwable]])
extends Actor {
trapExit = trapExceptions
faultHandler = Some(handler)
override def shutdown: Unit = self.shutdownLinkedActors
def receive = {
case Link(child) => startLink(child)
case Unlink(child) => unlink(child)
case UnlinkAndStop(child) => unlink(child); child.stop
case unknown => throw new IllegalArgumentException(
"Supervisor can only respond to 'Link' and 'Unlink' messages. Unknown message [" + unknown + "]")
}
}

View file

@ -7,7 +7,7 @@ package se.scalablesolutions.akka.config
import com.google.inject._ import com.google.inject._
import se.scalablesolutions.akka.config.ScalaConfig._ import se.scalablesolutions.akka.config.ScalaConfig._
import se.scalablesolutions.akka.actor.{Supervisor, ActiveObject, Dispatcher, ActorRef} import se.scalablesolutions.akka.actor.{Supervisor, ActiveObject, Dispatcher, ActorRef, Actor}
import se.scalablesolutions.akka.remote.RemoteServer import se.scalablesolutions.akka.remote.RemoteServer
import se.scalablesolutions.akka.util.Logging import se.scalablesolutions.akka.util.Logging
@ -24,7 +24,7 @@ import java.lang.reflect.Method
*/ */
private[akka] class ActiveObjectGuiceConfigurator extends ActiveObjectConfiguratorBase with Logging { private[akka] class ActiveObjectGuiceConfigurator extends ActiveObjectConfiguratorBase with Logging {
private var injector: Injector = _ private var injector: Injector = _
private var supervisor: Option[ActorRef] = None private var supervisor: Option[Supervisor] = None
private var restartStrategy: RestartStrategy = _ private var restartStrategy: RestartStrategy = _
private var components: List[Component] = _ private var components: List[Component] = _
private var supervised: List[Supervise] = Nil private var supervised: List[Supervise] = Nil
@ -82,7 +82,7 @@ private[akka] class ActiveObjectGuiceConfigurator extends ActiveObjectConfigurat
private def newSubclassingProxy(component: Component): DependencyBinding = { private def newSubclassingProxy(component: Component): DependencyBinding = {
val targetClass = component.target val targetClass = component.target
val actorRef = new ActorRef(() => new Dispatcher(component.transactionRequired, component.lifeCycle.callbacks)) val actorRef = Actor.newActor(() => new Dispatcher(component.transactionRequired, component.lifeCycle.callbacks))
if (component.dispatcher.isDefined) actorRef.dispatcher = component.dispatcher.get if (component.dispatcher.isDefined) actorRef.dispatcher = component.dispatcher.get
val remoteAddress = val remoteAddress =
if (component.remoteAddress.isDefined) if (component.remoteAddress.isDefined)
@ -103,7 +103,7 @@ private[akka] class ActiveObjectGuiceConfigurator extends ActiveObjectConfigurat
val targetClass = component.intf.get val targetClass = component.intf.get
val targetInstance = component.target.newInstance.asInstanceOf[AnyRef] // TODO: perhaps need to put in registry val targetInstance = component.target.newInstance.asInstanceOf[AnyRef] // TODO: perhaps need to put in registry
component.target.getConstructor(Array[Class[_]](): _*).setAccessible(true) component.target.getConstructor(Array[Class[_]](): _*).setAccessible(true)
val actorRef = new ActorRef(() => new Dispatcher(component.transactionRequired, component.lifeCycle.callbacks)) val actorRef = Actor.newActor(() => new Dispatcher(component.transactionRequired, component.lifeCycle.callbacks))
if (component.dispatcher.isDefined) actorRef.dispatcher = component.dispatcher.get if (component.dispatcher.isDefined) actorRef.dispatcher = component.dispatcher.get
val remoteAddress = val remoteAddress =
if (component.remoteAddress.isDefined) if (component.remoteAddress.isDefined)
@ -130,7 +130,6 @@ private[akka] class ActiveObjectGuiceConfigurator extends ActiveObjectConfigurat
override def supervise: ActiveObjectConfiguratorBase = synchronized { override def supervise: ActiveObjectConfiguratorBase = synchronized {
if (injector eq null) inject if (injector eq null) inject
supervisor = Some(ActiveObject.supervise(restartStrategy, supervised)) supervisor = Some(ActiveObject.supervise(restartStrategy, supervised))
supervisor.get.start
ConfiguratorRepository.registerConfigurator(this) ConfiguratorRepository.registerConfigurator(this)
this this
} }
@ -164,7 +163,7 @@ private[akka] class ActiveObjectGuiceConfigurator extends ActiveObjectConfigurat
} }
def stop = synchronized { def stop = synchronized {
if (supervisor.isDefined) supervisor.get.stop if (supervisor.isDefined) supervisor.get.shutdown
} }
} }

View file

@ -65,8 +65,8 @@ class ExecutorBasedEventDrivenDispatcher(_name: String) extends MessageDispatche
var lockAcquiredOnce = false var lockAcquiredOnce = false
// this do-wile loop is required to prevent missing new messages between the end of the inner while // this do-wile loop is required to prevent missing new messages between the end of the inner while
// loop and releasing the lock // loop and releasing the lock
val lock = invocation.receiver._dispatcherLock val lock = invocation.receiver.dispatcherLock
val mailbox = invocation.receiver._mailbox val mailbox = invocation.receiver.mailbox
do { do {
if (lock.tryLock) { if (lock.tryLock) {
lockAcquiredOnce = true lockAcquiredOnce = true

View file

@ -70,7 +70,7 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(_name: String) extends Mess
*/ */
private def tryProcessMailbox(receiver: ActorRef): Boolean = { private def tryProcessMailbox(receiver: ActorRef): Boolean = {
var lockAcquiredOnce = false var lockAcquiredOnce = false
val lock = receiver._dispatcherLock val lock = receiver.dispatcherLock
// this do-wile loop is required to prevent missing new messages between the end of processing // this do-wile loop is required to prevent missing new messages between the end of processing
// the mailbox and releasing the lock // the mailbox and releasing the lock
do { do {
@ -82,7 +82,7 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(_name: String) extends Mess
lock.unlock lock.unlock
} }
} }
} while ((lockAcquiredOnce && !receiver._mailbox.isEmpty)) } while ((lockAcquiredOnce && !receiver.mailbox.isEmpty))
return lockAcquiredOnce return lockAcquiredOnce
} }
@ -91,10 +91,10 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(_name: String) extends Mess
* Process the messages in the mailbox of the given actor. * Process the messages in the mailbox of the given actor.
*/ */
private def processMailbox(receiver: ActorRef) = { private def processMailbox(receiver: ActorRef) = {
var messageInvocation = receiver._mailbox.poll var messageInvocation = receiver.mailbox.poll
while (messageInvocation != null) { while (messageInvocation != null) {
messageInvocation.invoke messageInvocation.invoke
messageInvocation = receiver._mailbox.poll messageInvocation = receiver.mailbox.poll
} }
} }
@ -128,7 +128,7 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(_name: String) extends Mess
val index = (i + startIndex) % actors.length val index = (i + startIndex) % actors.length
val actor = actors(index) val actor = actors(index)
if (actor != receiver) { // skip ourselves if (actor != receiver) { // skip ourselves
if (actor._mailbox.isEmpty) { // only pick actors that will most likely be able to process the messages if (actor.mailbox.isEmpty) { // only pick actors that will most likely be able to process the messages
return (Some(actor), index) return (Some(actor), index)
} }
} }
@ -141,11 +141,11 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(_name: String) extends Mess
* the thiefs dispatching lock, because in that case another thread is already processing the thiefs mailbox. * the thiefs dispatching lock, because in that case another thread is already processing the thiefs mailbox.
*/ */
private def tryDonateAndProcessMessages(receiver: ActorRef, thief: ActorRef) = { private def tryDonateAndProcessMessages(receiver: ActorRef, thief: ActorRef) = {
if (thief._dispatcherLock.tryLock) { if (thief.dispatcherLock.tryLock) {
try { try {
donateAndProcessMessages(receiver, thief) donateAndProcessMessages(receiver, thief)
} finally { } finally {
thief._dispatcherLock.unlock thief.dispatcherLock.unlock
} }
} }
} }
@ -170,7 +170,7 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(_name: String) extends Mess
* Steal a message from the receiver and give it to the thief. * Steal a message from the receiver and give it to the thief.
*/ */
private def donateMessage(receiver: ActorRef, thief: ActorRef): Option[MessageInvocation] = { private def donateMessage(receiver: ActorRef, thief: ActorRef): Option[MessageInvocation] = {
val donated = receiver._mailbox.pollLast val donated = receiver.mailbox.pollLast
if (donated != null) { if (donated != null) {
thief.self ! donated.message thief.self ! donated.message
return Some(donated) return Some(donated)

View file

@ -17,7 +17,7 @@ import se.scalablesolutions.akka.actor.{Actor, ActorRef, ActorMessageInvoker}
class ThreadBasedDispatcher private[akka] (val name: String, val messageHandler: MessageInvoker) class ThreadBasedDispatcher private[akka] (val name: String, val messageHandler: MessageInvoker)
extends MessageDispatcher { extends MessageDispatcher {
def this(actor: Actor) = this(actor.getClass.getName, new ActorMessageInvoker(new ActorRef(() => actor))) def this(actor: Actor) = this(actor.getClass.getName, new ActorMessageInvoker(Actor.newActor(() => actor)))
private val queue = new BlockingMessageQueue(name) private val queue = new BlockingMessageQueue(name)
private var selectorThread: Thread = _ private var selectorThread: Thread = _

View file

@ -251,7 +251,7 @@ object Cluster extends Cluster with Logging {
fqn => fqn =>
val a = Class.forName(fqn).newInstance.asInstanceOf[ClusterActor] val a = Class.forName(fqn).newInstance.asInstanceOf[ClusterActor]
a setSerializer serializer a setSerializer serializer
new ActorRef(() => a) Actor.newActor(() => a)
} }
} }
catch { catch {
@ -261,15 +261,11 @@ object Cluster extends Cluster with Logging {
} }
} }
private[akka] def createSupervisor(actor: ActorRef): Option[ActorRef] = { private[akka] def createSupervisor(actor: ActorRef): Option[Supervisor] =
val sup = SupervisorFactory( Some(Supervisor(
SupervisorConfig( SupervisorConfig(
RestartStrategy(OneForOne, 5, 1000, List(classOf[Exception])), RestartStrategy(OneForOne, 5, 1000, List(classOf[Exception])),
Supervise(actor, LifeCycle(Permanent)) :: Nil) Supervise(actor, LifeCycle(Permanent)) :: Nil)))
).newInstance
Some(sup)
}
def name = clusterActor.map(_.name).getOrElse("No cluster") def name = clusterActor.map(_.name).getOrElse("No cluster")
@ -303,7 +299,7 @@ object Cluster extends Cluster with Logging {
log.info("Shutting down Cluster Service...") log.info("Shutting down Cluster Service...")
for { for {
c <- clusterActorRef c <- clusterActorRef
s <- c._supervisor s <- c.supervisor
} s.stop } s.stop
clusterActor = None clusterActor = None
} }

View file

@ -226,7 +226,7 @@ class RemoteServer extends Logging {
log.info("Unregistering server side remote actor [%s] with id [%s]", actorRef.actorClass.getName, actorRef.id) log.info("Unregistering server side remote actor [%s] with id [%s]", actorRef.actorClass.getName, actorRef.id)
val server = RemoteServer.actorsFor(RemoteServer.Address(hostname, port)) val server = RemoteServer.actorsFor(RemoteServer.Address(hostname, port))
server.actors.remove(actorRef.id) server.actors.remove(actorRef.id)
if (actorRef._registeredInRemoteNodeDuringSerialization) server.actors.remove(actorRef.uuid) if (actorRef.registeredInRemoteNodeDuringSerialization) server.actors.remove(actorRef.uuid)
} }
} }
@ -241,7 +241,7 @@ class RemoteServer extends Logging {
val server = RemoteServer.actorsFor(RemoteServer.Address(hostname, port)) val server = RemoteServer.actorsFor(RemoteServer.Address(hostname, port))
val actorRef = server.actors.get(id) val actorRef = server.actors.get(id)
server.actors.remove(id) server.actors.remove(id)
if (actorRef._registeredInRemoteNodeDuringSerialization) server.actors.remove(actorRef.uuid) if (actorRef.registeredInRemoteNodeDuringSerialization) server.actors.remove(actorRef.uuid)
} }
} }
} }
@ -464,10 +464,10 @@ class RemoteServerHandler(
log.info("Creating a new remote actor [%s:%s]", name, uuid) log.info("Creating a new remote actor [%s:%s]", name, uuid)
val clazz = if (applicationLoader.isDefined) applicationLoader.get.loadClass(name) val clazz = if (applicationLoader.isDefined) applicationLoader.get.loadClass(name)
else Class.forName(name) else Class.forName(name)
val actorRef = new ActorRef(() => clazz.newInstance.asInstanceOf[Actor]) val actorRef = Actor.newActor(() => clazz.newInstance.asInstanceOf[Actor])
actorRef._uuid = uuid actorRef.uuid = uuid
actorRef.timeout = timeout actorRef.timeout = timeout
actorRef._remoteAddress = None actorRef.remoteAddress = None
actors.put(uuid, actorRef) actors.put(uuid, actorRef)
actorRef actorRef
} catch { } catch {

View file

@ -5,7 +5,6 @@
package se.scalablesolutions.akka.util package se.scalablesolutions.akka.util
import java.security.MessageDigest import java.security.MessageDigest
import java.util.concurrent.locks.ReentrantReadWriteLock
class SystemFailure(cause: Throwable) extends RuntimeException(cause) class SystemFailure(cause: Throwable) extends RuntimeException(cause)
@ -38,30 +37,5 @@ object Helpers extends Logging {
}) })
sb.toString sb.toString
} }
// ================================================
class ReadWriteLock {
private val rwl = new ReentrantReadWriteLock
private val readLock = rwl.readLock
private val writeLock = rwl.writeLock
def withWriteLock[T](body: => T): T = {
writeLock.lock
try {
body
} finally {
writeLock.unlock
}
}
def withReadLock[T](body: => T): T = {
readLock.lock
try {
body
} finally {
readLock.unlock
}
}
}
} }

View file

@ -0,0 +1,35 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.util
import java.util.concurrent.locks.ReentrantReadWriteLock
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class ReadWriteLock {
private val rwl = new ReentrantReadWriteLock
private val readLock = rwl.readLock
private val writeLock = rwl.writeLock
def withWriteLock[T](body: => T): T = {
writeLock.lock
try {
body
} finally {
writeLock.unlock
}
}
def withReadLock[T](body: => T): T = {
readLock.lock
try {
body
} finally {
readLock.unlock
}
}
}

View file

@ -13,7 +13,7 @@ object ActorFireForgetRequestReplySpec {
def receive = { def receive = {
case "Send" => reply("Reply") case "Send" => reply("Reply")
case "SendImplicit" => replyTo.get.left.get ! "ReplyImplicit" case "SendImplicit" => self.replyTo.get.left.get ! "ReplyImplicit"
} }
} }

View file

@ -19,18 +19,18 @@ object ActorRegistrySpec {
class ActorRegistrySpec extends JUnitSuite { class ActorRegistrySpec extends JUnitSuite {
import ActorRegistrySpec._ import ActorRegistrySpec._
@Test def shouldGetActorByIdFromActorRegistry = { @Test def shouldGetActorByIdFromActorRegistry {
ActorRegistry.shutdownAll ActorRegistry.shutdownAll
val actor = newActor[TestActor] val actor = newActor[TestActor]
actor.start actor.start
val actors = ActorRegistry.actorsFor("MyID") val actors = ActorRegistry.actorsFor("MyID")
assert(actors.size === 1) assert(actors.size === 1)
assert(actors.head.actor.isInstanceOf[TestActor]) assert(actors.head.actor.isInstanceOf[TestActor])
assert(actors.head.actor.asInstanceOf[TestActor].getId === "MyID") assert(actors.head.actor.asInstanceOf[TestActor].id === "MyID")
actor.stop actor.stop
} }
@Test def shouldGetActorByUUIDFromActorRegistry = { @Test def shouldGetActorByUUIDFromActorRegistry {
ActorRegistry.shutdownAll ActorRegistry.shutdownAll
val actor = newActor[TestActor] val actor = newActor[TestActor]
val uuid = actor.uuid val uuid = actor.uuid
@ -41,29 +41,29 @@ class ActorRegistrySpec extends JUnitSuite {
actor.stop actor.stop
} }
@Test def shouldGetActorByClassFromActorRegistry = { @Test def shouldGetActorByClassFromActorRegistry {
ActorRegistry.shutdownAll ActorRegistry.shutdownAll
val actor = newActor[TestActor] val actor = newActor[TestActor]
actor.start actor.start
val actors = ActorRegistry.actorsFor(classOf[TestActor]) val actors = ActorRegistry.actorsFor(classOf[TestActor])
assert(actors.size === 1) assert(actors.size === 1)
assert(actors.head.actor.isInstanceOf[TestActor]) assert(actors.head.actor.isInstanceOf[TestActor])
assert(actors.head.actor.asInstanceOf[TestActor].getId === "MyID") assert(actors.head.actor.asInstanceOf[TestActor].id === "MyID")
actor.stop actor.stop
} }
@Test def shouldGetActorByManifestFromActorRegistry = { @Test def shouldGetActorByManifestFromActorRegistry {
ActorRegistry.shutdownAll ActorRegistry.shutdownAll
val actor = newActor[TestActor] val actor = newActor[TestActor]
actor.start actor.start
val actors = ActorRegistry.actorsFor[TestActor] val actors = ActorRegistry.actorsFor[TestActor]
assert(actors.size === 1) assert(actors.size === 1)
assert(actors.head.actor.isInstanceOf[TestActor]) assert(actors.head.actor.isInstanceOf[TestActor])
assert(actors.head.actor.asInstanceOf[TestActor].getId === "MyID") assert(actors.head.actor.asInstanceOf[TestActor].id === "MyID")
actor.stop actor.stop
} }
@Test def shouldGetActorsByIdFromActorRegistry = { @Test def shouldGetActorsByIdFromActorRegistry {
ActorRegistry.shutdownAll ActorRegistry.shutdownAll
val actor1 = newActor[TestActor] val actor1 = newActor[TestActor]
actor1.start actor1.start
@ -72,14 +72,14 @@ class ActorRegistrySpec extends JUnitSuite {
val actors = ActorRegistry.actorsFor("MyID") val actors = ActorRegistry.actorsFor("MyID")
assert(actors.size === 2) assert(actors.size === 2)
assert(actors.head.actor.isInstanceOf[TestActor]) assert(actors.head.actor.isInstanceOf[TestActor])
assert(actors.head.actor.asInstanceOf[TestActor].getId === "MyID") assert(actors.head.actor.asInstanceOf[TestActor].id === "MyID")
assert(actors.last.actor.isInstanceOf[TestActor]) assert(actors.last.actor.isInstanceOf[TestActor])
assert(actors.last.actor.asInstanceOf[TestActor].getId === "MyID") assert(actors.last.actor.asInstanceOf[TestActor].id === "MyID")
actor1.stop actor1.stop
actor2.stop actor2.stop
} }
@Test def shouldGetActorsByClassFromActorRegistry = { @Test def shouldGetActorsByClassFromActorRegistry {
ActorRegistry.shutdownAll ActorRegistry.shutdownAll
val actor1 = newActor[TestActor] val actor1 = newActor[TestActor]
actor1.start actor1.start
@ -88,14 +88,14 @@ class ActorRegistrySpec extends JUnitSuite {
val actors = ActorRegistry.actorsFor(classOf[TestActor]) val actors = ActorRegistry.actorsFor(classOf[TestActor])
assert(actors.size === 2) assert(actors.size === 2)
assert(actors.head.actor.isInstanceOf[TestActor]) assert(actors.head.actor.isInstanceOf[TestActor])
assert(actors.head.actor.asInstanceOf[TestActor].getId === "MyID") assert(actors.head.actor.asInstanceOf[TestActor].id === "MyID")
assert(actors.last.actor.isInstanceOf[TestActor]) assert(actors.last.actor.isInstanceOf[TestActor])
assert(actors.last.actor.asInstanceOf[TestActor].getId === "MyID") assert(actors.last.actor.asInstanceOf[TestActor].id === "MyID")
actor1.stop actor1.stop
actor2.stop actor2.stop
} }
@Test def shouldGetActorsByManifestFromActorRegistry = { @Test def shouldGetActorsByManifestFromActorRegistry {
ActorRegistry.shutdownAll ActorRegistry.shutdownAll
val actor1 = newActor[TestActor] val actor1 = newActor[TestActor]
actor1.start actor1.start
@ -104,14 +104,14 @@ class ActorRegistrySpec extends JUnitSuite {
val actors = ActorRegistry.actorsFor[TestActor] val actors = ActorRegistry.actorsFor[TestActor]
assert(actors.size === 2) assert(actors.size === 2)
assert(actors.head.actor.isInstanceOf[TestActor]) assert(actors.head.actor.isInstanceOf[TestActor])
assert(actors.head.actor.asInstanceOf[TestActor].getId === "MyID") assert(actors.head.actor.asInstanceOf[TestActor].id === "MyID")
assert(actors.last.actor.isInstanceOf[TestActor]) assert(actors.last.actor.isInstanceOf[TestActor])
assert(actors.last.actor.asInstanceOf[TestActor].getId === "MyID") assert(actors.last.actor.asInstanceOf[TestActor].id === "MyID")
actor1.stop actor1.stop
actor2.stop actor2.stop
} }
@Test def shouldGetAllActorsFromActorRegistry = { @Test def shouldGetAllActorsFromActorRegistry {
ActorRegistry.shutdownAll ActorRegistry.shutdownAll
val actor1 = newActor[TestActor] val actor1 = newActor[TestActor]
actor1.start actor1.start
@ -120,14 +120,14 @@ class ActorRegistrySpec extends JUnitSuite {
val actors = ActorRegistry.actors val actors = ActorRegistry.actors
assert(actors.size === 2) assert(actors.size === 2)
assert(actors.head.actor.isInstanceOf[TestActor]) assert(actors.head.actor.isInstanceOf[TestActor])
assert(actors.head.actor.asInstanceOf[TestActor].getId === "MyID") assert(actors.head.actor.asInstanceOf[TestActor].id === "MyID")
assert(actors.last.actor.isInstanceOf[TestActor]) assert(actors.last.actor.isInstanceOf[TestActor])
assert(actors.last.actor.asInstanceOf[TestActor].getId === "MyID") assert(actors.last.actor.asInstanceOf[TestActor].id === "MyID")
actor1.stop actor1.stop
actor2.stop actor2.stop
} }
@Test def shouldGetResponseByAllActorsInActorRegistryWhenInvokingForeach = { @Test def shouldGetResponseByAllActorsInActorRegistryWhenInvokingForeach {
ActorRegistry.shutdownAll ActorRegistry.shutdownAll
val actor1 = newActor[TestActor] val actor1 = newActor[TestActor]
actor1.start actor1.start
@ -140,7 +140,7 @@ class ActorRegistrySpec extends JUnitSuite {
actor2.stop actor2.stop
} }
@Test def shouldShutdownAllActorsInActorRegistry = { @Test def shouldShutdownAllActorsInActorRegistry {
ActorRegistry.shutdownAll ActorRegistry.shutdownAll
val actor1 = newActor[TestActor] val actor1 = newActor[TestActor]
actor1.start actor1.start
@ -150,7 +150,7 @@ class ActorRegistrySpec extends JUnitSuite {
assert(ActorRegistry.actors.size === 0) assert(ActorRegistry.actors.size === 0)
} }
@Test def shouldRemoveUnregisterActorInActorRegistry = { @Test def shouldRemoveUnregisterActorInActorRegistry {
ActorRegistry.shutdownAll ActorRegistry.shutdownAll
val actor1 = newActor[TestActor] val actor1 = newActor[TestActor]
actor1.start actor1.start

View file

@ -89,7 +89,7 @@ class ClientInitiatedRemoteActorSpec extends JUnitSuite {
} }
@Test @Test
def shouldSendOneWay = { def shouldSendOneWay {
val actor = newActor[RemoteActorSpecActorUnidirectional] val actor = newActor[RemoteActorSpecActorUnidirectional]
actor.makeRemote(HOSTNAME, PORT1) actor.makeRemote(HOSTNAME, PORT1)
actor.start actor.start
@ -99,7 +99,7 @@ class ClientInitiatedRemoteActorSpec extends JUnitSuite {
} }
@Test @Test
def shouldSendOneWayAndReceiveReply = { def shouldSendOneWayAndReceiveReply {
val actor = newActor[SendOneWayAndReplyReceiverActor] val actor = newActor[SendOneWayAndReplyReceiverActor]
actor.makeRemote(HOSTNAME, PORT1) actor.makeRemote(HOSTNAME, PORT1)
actor.start actor.start
@ -116,7 +116,7 @@ class ClientInitiatedRemoteActorSpec extends JUnitSuite {
} }
@Test @Test
def shouldSendBangBangMessageAndReceiveReply = { def shouldSendBangBangMessageAndReceiveReply {
val actor = newActor[RemoteActorSpecActorBidirectional] val actor = newActor[RemoteActorSpecActorBidirectional]
actor.makeRemote(HOSTNAME, PORT1) actor.makeRemote(HOSTNAME, PORT1)
actor.start actor.start
@ -126,7 +126,7 @@ class ClientInitiatedRemoteActorSpec extends JUnitSuite {
} }
@Test @Test
def shouldSendAndReceiveRemoteException = { def shouldSendAndReceiveRemoteException {
implicit val timeout = 500000000L implicit val timeout = 500000000L
val actor = newActor[RemoteActorSpecActorBidirectional] val actor = newActor[RemoteActorSpecActorBidirectional]
actor.makeRemote(HOSTNAME, PORT1) actor.makeRemote(HOSTNAME, PORT1)

View file

@ -32,7 +32,7 @@ class ExecutorBasedEventDrivenDispatcherActorSpec extends JUnitSuite {
private val unit = TimeUnit.MILLISECONDS private val unit = TimeUnit.MILLISECONDS
@Test def shouldSendOneWay = { @Test def shouldSendOneWay {
val actor = newActor[OneWayTestActor] val actor = newActor[OneWayTestActor]
actor.start actor.start
val result = actor ! "OneWay" val result = actor ! "OneWay"
@ -40,7 +40,7 @@ class ExecutorBasedEventDrivenDispatcherActorSpec extends JUnitSuite {
actor.stop actor.stop
} }
@Test def shouldSendReplySync = { @Test def shouldSendReplySync {
val actor = newActor[TestActor] val actor = newActor[TestActor]
actor.start actor.start
val result: String = (actor !! ("Hello", 10000)).get val result: String = (actor !! ("Hello", 10000)).get
@ -48,7 +48,7 @@ class ExecutorBasedEventDrivenDispatcherActorSpec extends JUnitSuite {
actor.stop actor.stop
} }
@Test def shouldSendReplyAsync = { @Test def shouldSendReplyAsync {
val actor = newActor[TestActor] val actor = newActor[TestActor]
actor.start actor.start
val result = actor !! "Hello" val result = actor !! "Hello"
@ -56,7 +56,7 @@ class ExecutorBasedEventDrivenDispatcherActorSpec extends JUnitSuite {
actor.stop actor.stop
} }
@Test def shouldSendReceiveException = { @Test def shouldSendReceiveException {
val actor = newActor[TestActor] val actor = newActor[TestActor]
actor.start actor.start
try { try {

View file

@ -14,7 +14,7 @@ import Actor._
*/ */
class ExecutorBasedEventDrivenDispatcherActorsSpec extends JUnitSuite with MustMatchers { class ExecutorBasedEventDrivenDispatcherActorsSpec extends JUnitSuite with MustMatchers {
class SlowActor(finishedCounter: CountDownLatch) extends Actor { class SlowActor(finishedCounter: CountDownLatch) extends Actor {
messageDispatcher = Dispatchers.globalExecutorBasedEventDrivenDispatcher dispatcher = Dispatchers.globalExecutorBasedEventDrivenDispatcher
id = "SlowActor" id = "SlowActor"
def receive = { def receive = {
@ -26,7 +26,7 @@ class ExecutorBasedEventDrivenDispatcherActorsSpec extends JUnitSuite with MustM
} }
class FastActor(finishedCounter: CountDownLatch) extends Actor { class FastActor(finishedCounter: CountDownLatch) extends Actor {
messageDispatcher = Dispatchers.globalExecutorBasedEventDrivenDispatcher dispatcher = Dispatchers.globalExecutorBasedEventDrivenDispatcher
id = "FastActor" id = "FastActor"
def receive = { def receive = {
@ -36,7 +36,7 @@ class ExecutorBasedEventDrivenDispatcherActorsSpec extends JUnitSuite with MustM
} }
} }
@Test def slowActorShouldntBlockFastActor = { @Test def slowActorShouldntBlockFastActor {
val sFinished = new CountDownLatch(50) val sFinished = new CountDownLatch(50)
val fFinished = new CountDownLatch(10) val fFinished = new CountDownLatch(10)
val s = newActor(() => new SlowActor(sFinished)).start val s = newActor(() => new SlowActor(sFinished)).start

View file

@ -16,7 +16,7 @@ object ExecutorBasedEventDrivenWorkStealingDispatcherSpec {
val parentActorDispatcher = Dispatchers.newExecutorBasedEventDrivenWorkStealingDispatcher("pooled-dispatcher") val parentActorDispatcher = Dispatchers.newExecutorBasedEventDrivenWorkStealingDispatcher("pooled-dispatcher")
class DelayableActor(name: String, delay: Int, finishedCounter: CountDownLatch) extends Actor { class DelayableActor(name: String, delay: Int, finishedCounter: CountDownLatch) extends Actor {
messageDispatcher = delayableActorDispatcher dispatcher = delayableActorDispatcher
var invocationCount = 0 var invocationCount = 0
id = name id = name
@ -30,17 +30,17 @@ object ExecutorBasedEventDrivenWorkStealingDispatcherSpec {
} }
class FirstActor extends Actor { class FirstActor extends Actor {
messageDispatcher = sharedActorDispatcher dispatcher = sharedActorDispatcher
def receive = {case _ => {}} def receive = {case _ => {}}
} }
class SecondActor extends Actor { class SecondActor extends Actor {
messageDispatcher = sharedActorDispatcher dispatcher = sharedActorDispatcher
def receive = {case _ => {}} def receive = {case _ => {}}
} }
class ParentActor extends Actor { class ParentActor extends Actor {
messageDispatcher = parentActorDispatcher dispatcher = parentActorDispatcher
def receive = {case _ => {}} def receive = {case _ => {}}
} }
@ -54,7 +54,7 @@ object ExecutorBasedEventDrivenWorkStealingDispatcherSpec {
class ExecutorBasedEventDrivenWorkStealingDispatcherSpec extends JUnitSuite with MustMatchers { class ExecutorBasedEventDrivenWorkStealingDispatcherSpec extends JUnitSuite with MustMatchers {
import ExecutorBasedEventDrivenWorkStealingDispatcherSpec._ import ExecutorBasedEventDrivenWorkStealingDispatcherSpec._
@Test def fastActorShouldStealWorkFromSlowActor = { @Test def fastActorShouldStealWorkFromSlowActor {
val finishedCounter = new CountDownLatch(110) val finishedCounter = new CountDownLatch(110)
val slow = newActor(() => new DelayableActor("slow", 50, finishedCounter)).start val slow = newActor(() => new DelayableActor("slow", 50, finishedCounter)).start
@ -78,7 +78,8 @@ class ExecutorBasedEventDrivenWorkStealingDispatcherSpec extends JUnitSuite with
} }
finishedCounter.await(5, TimeUnit.SECONDS) finishedCounter.await(5, TimeUnit.SECONDS)
fast.actor.asInstanceOf[DelayableActor].invocationCount must be > (slow.actor.asInstanceOf[DelayableActor].invocationCount) fast.actor.asInstanceOf[DelayableActor].invocationCount must be >
(slow.actor.asInstanceOf[DelayableActor].invocationCount)
slow.stop slow.stop
fast.stop fast.stop
} }
@ -94,8 +95,8 @@ class ExecutorBasedEventDrivenWorkStealingDispatcherSpec extends JUnitSuite with
} }
@Test def canNotUseActorsOfDifferentSubTypesInSameDispatcher: Unit = { @Test def canNotUseActorsOfDifferentSubTypesInSameDispatcher: Unit = {
val parent = new ParentActor val parent = newActor[ParentActor]
val child = new ChildActor val child = newActor[ChildActor]
parent.start parent.start
intercept[IllegalStateException] { intercept[IllegalStateException] {

View file

@ -15,7 +15,7 @@ object ForwardActorSpec {
val latch = new CountDownLatch(1) val latch = new CountDownLatch(1)
def receive = { def receive = {
case "SendBang" => { case "SendBang" => {
ForwardState.sender = Some(replyTo.get.left.get) ForwardState.sender = Some(self.replyTo.get.left.get)
latch.countDown latch.countDown
} }
case "SendBangBang" => reply("SendBangBang") case "SendBangBang" => reply("SendBangBang")

View file

@ -41,7 +41,7 @@ class FutureSpec extends JUnitSuite {
} }
/* /*
@Test def shouldFutureAwaitEitherLeft = { @Test def shouldFutureAwaitEitherLeft {
val actor1 = newActor[TestActor] val actor1 = newActor[TestActor]
actor1.start actor1.start
val actor2 = newActor[TestActor] val actor2 = newActor[TestActor]
@ -55,7 +55,7 @@ class FutureSpec extends JUnitSuite {
actor2.stop actor2.stop
} }
@Test def shouldFutureAwaitEitherRight = { @Test def shouldFutureAwaitEitherRight {
val actor1 = newActor[TestActor] val actor1 = newActor[TestActor]
actor1.start actor1.start
val actor2 = newActor[TestActor] val actor2 = newActor[TestActor]
@ -69,7 +69,7 @@ class FutureSpec extends JUnitSuite {
actor2.stop actor2.stop
} }
*/ */
@Test def shouldFutureAwaitOneLeft = { @Test def shouldFutureAwaitOneLeft {
val actor1 = newActor[TestActor] val actor1 = newActor[TestActor]
actor1.start actor1.start
val actor2 = newActor[TestActor] val actor2 = newActor[TestActor]
@ -83,7 +83,7 @@ class FutureSpec extends JUnitSuite {
actor2.stop actor2.stop
} }
@Test def shouldFutureAwaitOneRight = { @Test def shouldFutureAwaitOneRight {
val actor1 = newActor[TestActor] val actor1 = newActor[TestActor]
actor1.start actor1.start
val actor2 = newActor[TestActor] val actor2 = newActor[TestActor]
@ -97,7 +97,7 @@ class FutureSpec extends JUnitSuite {
actor2.stop actor2.stop
} }
@Test def shouldFutureAwaitAll = { @Test def shouldFutureAwaitAll {
val actor1 = newActor[TestActor] val actor1 = newActor[TestActor]
actor1.start actor1.start
val actor2 = newActor[TestActor] val actor2 = newActor[TestActor]

View file

@ -27,7 +27,6 @@ object ProtobufActorMessageSerializationSpec {
var server: RemoteServer = null var server: RemoteServer = null
class RemoteActorSpecActorBidirectional extends Actor { class RemoteActorSpecActorBidirectional extends Actor {
start
def receive = { def receive = {
case pojo: ProtobufPOJO => case pojo: ProtobufPOJO =>
val id = pojo.getId val id = pojo.getId
@ -45,7 +44,7 @@ class ProtobufActorMessageSerializationSpec extends JUnitSuite {
def init() { def init() {
server = new RemoteServer server = new RemoteServer
server.start(HOSTNAME, PORT) server.start(HOSTNAME, PORT)
server.register("RemoteActorSpecActorBidirectional", newActor[RemoteActorSpecActorBidirectional]) server.register("RemoteActorSpecActorBidirectional", newActor[RemoteActorSpecActorBidirectional].start)
Thread.sleep(1000) Thread.sleep(1000)
} }
@ -58,7 +57,7 @@ class ProtobufActorMessageSerializationSpec extends JUnitSuite {
} }
@Test @Test
def shouldSendReplyAsync = { def shouldSendReplyAsync {
val actor = RemoteClient.actorFor("RemoteActorSpecActorBidirectional", 5000L, HOSTNAME, PORT) val actor = RemoteClient.actorFor("RemoteActorSpecActorBidirectional", 5000L, HOSTNAME, PORT)
val result = actor !! ProtobufPOJO.newBuilder val result = actor !! ProtobufPOJO.newBuilder
.setId(11) .setId(11)

View file

@ -35,7 +35,7 @@ class ReactorBasedSingleThreadEventDrivenDispatcherActorSpec extends JUnitSuite
private val unit = TimeUnit.MILLISECONDS private val unit = TimeUnit.MILLISECONDS
@Test def shouldSendOneWay = { @Test def shouldSendOneWay {
val actor = newActor[OneWayTestActor] val actor = newActor[OneWayTestActor]
actor.start actor.start
val result = actor ! "OneWay" val result = actor ! "OneWay"
@ -43,7 +43,7 @@ class ReactorBasedSingleThreadEventDrivenDispatcherActorSpec extends JUnitSuite
actor.stop actor.stop
} }
@Test def shouldSendReplySync = { @Test def shouldSendReplySync {
val actor = newActor[TestActor] val actor = newActor[TestActor]
actor.start actor.start
val result: String = (actor !! ("Hello", 10000)).get val result: String = (actor !! ("Hello", 10000)).get
@ -51,7 +51,7 @@ class ReactorBasedSingleThreadEventDrivenDispatcherActorSpec extends JUnitSuite
actor.stop actor.stop
} }
@Test def shouldSendReplyAsync = { @Test def shouldSendReplyAsync {
val actor = newActor[TestActor] val actor = newActor[TestActor]
actor.start actor.start
val result = actor !! "Hello" val result = actor !! "Hello"
@ -59,7 +59,7 @@ class ReactorBasedSingleThreadEventDrivenDispatcherActorSpec extends JUnitSuite
actor.stop actor.stop
} }
@Test def shouldSendReceiveException = { @Test def shouldSendReceiveException {
val actor = newActor[TestActor] val actor = newActor[TestActor]
actor.start actor.start
try { try {

View file

@ -24,7 +24,7 @@ class ReactorBasedThreadPoolEventDrivenDispatcherActorSpec extends JUnitSuite {
private val unit = TimeUnit.MILLISECONDS private val unit = TimeUnit.MILLISECONDS
@Test def shouldSendOneWay = { @Test def shouldSendOneWay {
val oneWay = new CountDownLatch(1) val oneWay = new CountDownLatch(1)
val actor = newActor(() => new Actor { val actor = newActor(() => new Actor {
dispatcher = Dispatchers.newReactorBasedThreadPoolEventDrivenDispatcher(uuid) dispatcher = Dispatchers.newReactorBasedThreadPoolEventDrivenDispatcher(uuid)
@ -38,7 +38,7 @@ class ReactorBasedThreadPoolEventDrivenDispatcherActorSpec extends JUnitSuite {
actor.stop actor.stop
} }
@Test def shouldSendReplySync = { @Test def shouldSendReplySync {
val actor = newActor[TestActor] val actor = newActor[TestActor]
actor.start actor.start
val result: String = (actor !! ("Hello", 10000)).get val result: String = (actor !! ("Hello", 10000)).get
@ -46,7 +46,7 @@ class ReactorBasedThreadPoolEventDrivenDispatcherActorSpec extends JUnitSuite {
actor.stop actor.stop
} }
@Test def shouldSendReplyAsync = { @Test def shouldSendReplyAsync {
val actor = newActor[TestActor] val actor = newActor[TestActor]
actor.start actor.start
val result = actor !! "Hello" val result = actor !! "Hello"
@ -54,7 +54,7 @@ class ReactorBasedThreadPoolEventDrivenDispatcherActorSpec extends JUnitSuite {
actor.stop actor.stop
} }
@Test def shouldSendReceiveException = { @Test def shouldSendReceiveException {
val actor = newActor[TestActor] val actor = newActor[TestActor]
actor.start actor.start
try { try {

View file

@ -34,7 +34,7 @@ object Log {
throw new RuntimeException("DIE") throw new RuntimeException("DIE")
} }
override protected def postRestart(reason: Throwable) { override def postRestart(reason: Throwable) {
Log.messageLog.put(reason.getMessage) Log.messageLog.put(reason.getMessage)
} }
} }
@ -49,7 +49,7 @@ object Log {
throw new RuntimeException("DIE") throw new RuntimeException("DIE")
} }
override protected def postRestart(reason: Throwable) { override def postRestart(reason: Throwable) {
Log.messageLog.put(reason.getMessage) Log.messageLog.put(reason.getMessage)
} }
} }
@ -64,7 +64,7 @@ object Log {
throw new RuntimeException("DIE") throw new RuntimeException("DIE")
} }
override protected def postRestart(reason: Throwable) { override def postRestart(reason: Throwable) {
Log.messageLog.put(reason.getMessage) Log.messageLog.put(reason.getMessage)
} }
} }
@ -337,6 +337,7 @@ class RemoteSupervisorSpec extends JUnitSuite {
pingpong1 = newActor[RemotePingPong1Actor] pingpong1 = newActor[RemotePingPong1Actor]
pingpong1.makeRemote(RemoteServer.HOSTNAME, 9988) pingpong1.makeRemote(RemoteServer.HOSTNAME, 9988)
pingpong1.start
val factory = SupervisorFactory( val factory = SupervisorFactory(
SupervisorConfig( SupervisorConfig(
@ -352,6 +353,7 @@ class RemoteSupervisorSpec extends JUnitSuite {
def getSingleActorOneForOneSupervisor: Supervisor = { def getSingleActorOneForOneSupervisor: Supervisor = {
pingpong1 = newActor[RemotePingPong1Actor] pingpong1 = newActor[RemotePingPong1Actor]
pingpong1.makeRemote(RemoteServer.HOSTNAME, 9988) pingpong1.makeRemote(RemoteServer.HOSTNAME, 9988)
pingpong1.start
val factory = SupervisorFactory( val factory = SupervisorFactory(
SupervisorConfig( SupervisorConfig(
@ -366,10 +368,13 @@ class RemoteSupervisorSpec extends JUnitSuite {
def getMultipleActorsAllForOneConf: Supervisor = { def getMultipleActorsAllForOneConf: Supervisor = {
pingpong1 = newActor[RemotePingPong1Actor] pingpong1 = newActor[RemotePingPong1Actor]
pingpong1.makeRemote(RemoteServer.HOSTNAME, 9988) pingpong1.makeRemote(RemoteServer.HOSTNAME, 9988)
pingpong1.start
pingpong2 = newActor[RemotePingPong2Actor] pingpong2 = newActor[RemotePingPong2Actor]
pingpong2.makeRemote(RemoteServer.HOSTNAME, 9988) pingpong2.makeRemote(RemoteServer.HOSTNAME, 9988)
pingpong2.start
pingpong3 = newActor[RemotePingPong3Actor] pingpong3 = newActor[RemotePingPong3Actor]
pingpong3.makeRemote(RemoteServer.HOSTNAME, 9988) pingpong3.makeRemote(RemoteServer.HOSTNAME, 9988)
pingpong3.start
val factory = SupervisorFactory( val factory = SupervisorFactory(
SupervisorConfig( SupervisorConfig(
@ -392,10 +397,13 @@ class RemoteSupervisorSpec extends JUnitSuite {
def getMultipleActorsOneForOneConf: Supervisor = { def getMultipleActorsOneForOneConf: Supervisor = {
pingpong1 = newActor[RemotePingPong1Actor] pingpong1 = newActor[RemotePingPong1Actor]
pingpong1.makeRemote(RemoteServer.HOSTNAME, 9988) pingpong1.makeRemote(RemoteServer.HOSTNAME, 9988)
pingpong1.start
pingpong2 = newActor[RemotePingPong2Actor] pingpong2 = newActor[RemotePingPong2Actor]
pingpong2.makeRemote(RemoteServer.HOSTNAME, 9988) pingpong2.makeRemote(RemoteServer.HOSTNAME, 9988)
pingpong2.start
pingpong3 = newActor[RemotePingPong3Actor] pingpong3 = newActor[RemotePingPong3Actor]
pingpong3.makeRemote(RemoteServer.HOSTNAME, 9988) pingpong3.makeRemote(RemoteServer.HOSTNAME, 9988)
pingpong3.start
val factory = SupervisorFactory( val factory = SupervisorFactory(
SupervisorConfig( SupervisorConfig(
@ -416,11 +424,11 @@ class RemoteSupervisorSpec extends JUnitSuite {
} }
def getNestedSupervisorsAllForOneConf: Supervisor = { def getNestedSupervisorsAllForOneConf: Supervisor = {
pingpong1 = newActor[RemotePingPong1Actor] pingpong1 = newActor[RemotePingPong1Actor].start
pingpong1.makeRemote(RemoteServer.HOSTNAME, 9988) pingpong1.makeRemote(RemoteServer.HOSTNAME, 9988)
pingpong2 = newActor[RemotePingPong2Actor] pingpong2 = newActor[RemotePingPong2Actor].start
pingpong2.makeRemote(RemoteServer.HOSTNAME, 9988) pingpong2.makeRemote(RemoteServer.HOSTNAME, 9988)
pingpong3 = newActor[RemotePingPong3Actor] pingpong3 = newActor[RemotePingPong3Actor].start
pingpong3.makeRemote(RemoteServer.HOSTNAME, 9988) pingpong3.makeRemote(RemoteServer.HOSTNAME, 9988)
val factory = SupervisorFactory( val factory = SupervisorFactory(

View file

@ -83,7 +83,7 @@ class ServerInitiatedRemoteActorSpec extends JUnitSuite {
} }
@Test @Test
def shouldSendOneWay = { def shouldSendOneWay {
val actor = RemoteClient.actorFor( val actor = RemoteClient.actorFor(
"se.scalablesolutions.akka.actor.ServerInitiatedRemoteActorSpec$RemoteActorSpecActorUnidirectional", "se.scalablesolutions.akka.actor.ServerInitiatedRemoteActorSpec$RemoteActorSpecActorUnidirectional",
5000L, 5000L,
@ -94,7 +94,7 @@ class ServerInitiatedRemoteActorSpec extends JUnitSuite {
} }
@Test @Test
def shouldSendReplyAsync = { def shouldSendReplyAsync {
val actor = RemoteClient.actorFor( val actor = RemoteClient.actorFor(
"se.scalablesolutions.akka.actor.ServerInitiatedRemoteActorSpec$RemoteActorSpecActorBidirectional", "se.scalablesolutions.akka.actor.ServerInitiatedRemoteActorSpec$RemoteActorSpecActorBidirectional",
5000L, 5000L,
@ -105,13 +105,13 @@ class ServerInitiatedRemoteActorSpec extends JUnitSuite {
} }
@Test @Test
def shouldSendRemoteReplyProtocol = { def shouldSendRemoteReplyProtocol {
implicit val timeout = 500000000L implicit val timeout = 500000000L
val actor = RemoteClient.actorFor( val actor = RemoteClient.actorFor(
"se.scalablesolutions.akka.actor.ServerInitiatedRemoteActorSpec$RemoteActorSpecActorBidirectional", "se.scalablesolutions.akka.actor.ServerInitiatedRemoteActorSpec$RemoteActorSpecActorBidirectional",
timeout, timeout,
HOSTNAME, PORT) HOSTNAME, PORT)
val sender = new RemoteActorSpecActorAsyncSender val sender = newActor[RemoteActorSpecActorAsyncSender]
sender.setReplyToAddress(HOSTNAME, PORT) sender.setReplyToAddress(HOSTNAME, PORT)
sender.start sender.start
sender.send(actor) sender.send(actor)
@ -120,7 +120,7 @@ class ServerInitiatedRemoteActorSpec extends JUnitSuite {
} }
@Test @Test
def shouldSendReceiveException = { def shouldSendReceiveException {
implicit val timeout = 500000000L implicit val timeout = 500000000L
val actor = RemoteClient.actorFor( val actor = RemoteClient.actorFor(
"se.scalablesolutions.akka.actor.ServerInitiatedRemoteActorSpec$RemoteActorSpecActorBidirectional", "se.scalablesolutions.akka.actor.ServerInitiatedRemoteActorSpec$RemoteActorSpecActorBidirectional",

View file

@ -30,7 +30,7 @@ object SupervisorSpec {
case Die => case Die =>
throw new RuntimeException("DIE") throw new RuntimeException("DIE")
} }
override protected def postRestart(reason: Throwable) { override def postRestart(reason: Throwable) {
messageLog.put(reason.getMessage) messageLog.put(reason.getMessage)
} }
} }
@ -43,7 +43,7 @@ object SupervisorSpec {
case Die => case Die =>
throw new RuntimeException("DIE") throw new RuntimeException("DIE")
} }
override protected def postRestart(reason: Throwable) { override def postRestart(reason: Throwable) {
messageLog.put(reason.getMessage) messageLog.put(reason.getMessage)
} }
} }
@ -57,7 +57,7 @@ object SupervisorSpec {
throw new RuntimeException("DIE") throw new RuntimeException("DIE")
} }
override protected def postRestart(reason: Throwable) { override def postRestart(reason: Throwable) {
messageLog.put(reason.getMessage) messageLog.put(reason.getMessage)
} }
} }
@ -419,7 +419,7 @@ class SupervisorSpec extends JUnitSuite {
// Creat some supervisors with different configurations // Creat some supervisors with different configurations
def getSingleActorAllForOneSupervisor: Supervisor = { def getSingleActorAllForOneSupervisor: Supervisor = {
pingpong1 = newActor[PingPong1Actor] pingpong1 = newActor[PingPong1Actor].start
val factory = SupervisorFactory( val factory = SupervisorFactory(
SupervisorConfig( SupervisorConfig(
@ -432,53 +432,75 @@ class SupervisorSpec extends JUnitSuite {
} }
def getSingleActorOneForOneSupervisor: Supervisor = { def getSingleActorOneForOneSupervisor: Supervisor = {
pingpong1 = newActor[PingPong1Actor] pingpong1 = newActor[PingPong1Actor].start
val factory = SupervisorFactory( Supervisor(
SupervisorConfig( SupervisorConfig(
RestartStrategy(OneForOne, 3, 100, List(classOf[Exception])), RestartStrategy(OneForOne, 3, 100, List(classOf[Exception])),
Supervise( Supervise(
pingpong1, pingpong1,
LifeCycle(Permanent)) LifeCycle(Permanent))
:: Nil)) :: Nil))
factory.newInstance
} }
def getMultipleActorsAllForOneConf: Supervisor = { def getMultipleActorsAllForOneConf: Supervisor = {
pingpong1 = newActor[PingPong1Actor] pingpong1 = newActor[PingPong1Actor].start
pingpong2 = newActor[PingPong2Actor] pingpong2 = newActor[PingPong2Actor].start
pingpong3 = newActor[PingPong3Actor] pingpong3 = newActor[PingPong3Actor].start
val factory = SupervisorFactory( Supervisor(
SupervisorConfig( SupervisorConfig(
RestartStrategy(AllForOne, 3, 100, List(classOf[Exception])), RestartStrategy(AllForOne, 3, 100, List(classOf[Exception])),
Supervise( Supervise(
pingpong1, pingpong1,
LifeCycle(Permanent)) LifeCycle(Permanent))
:: ::
Supervise( Supervise(
pingpong2, pingpong2,
LifeCycle(Permanent)) LifeCycle(Permanent))
:: ::
Supervise( Supervise(
pingpong3, pingpong3,
LifeCycle(Permanent)) LifeCycle(Permanent))
:: Nil)) :: Nil))
factory.newInstance
} }
def getMultipleActorsOneForOneConf: Supervisor = { def getMultipleActorsOneForOneConf: Supervisor = {
pingpong1 = newActor[PingPong1Actor] pingpong1 = newActor[PingPong1Actor].start
pingpong2 = newActor[PingPong2Actor] pingpong2 = newActor[PingPong2Actor].start
pingpong3 = newActor[PingPong3Actor] pingpong3 = newActor[PingPong3Actor].start
val factory = SupervisorFactory( Supervisor(
SupervisorConfig(
RestartStrategy(OneForOne, 3, 100, List(classOf[Exception])),
Supervise(
pingpong1,
LifeCycle(Permanent))
::
Supervise(
pingpong2,
LifeCycle(Permanent))
::
Supervise(
pingpong3,
LifeCycle(Permanent))
:: Nil))
}
def getNestedSupervisorsAllForOneConf: Supervisor = {
pingpong1 = newActor[PingPong1Actor].start
pingpong2 = newActor[PingPong2Actor].start
pingpong3 = newActor[PingPong3Actor].start
Supervisor(
SupervisorConfig(
RestartStrategy(AllForOne, 3, 100, List(classOf[Exception])),
Supervise(
pingpong1,
LifeCycle(Permanent))
::
SupervisorConfig( SupervisorConfig(
RestartStrategy(OneForOne, 3, 100, List(classOf[Exception])), RestartStrategy(AllForOne, 3, 100, Nil),
Supervise(
pingpong1,
LifeCycle(Permanent))
::
Supervise( Supervise(
pingpong2, pingpong2,
LifeCycle(Permanent)) LifeCycle(Permanent))
@ -486,33 +508,7 @@ class SupervisorSpec extends JUnitSuite {
Supervise( Supervise(
pingpong3, pingpong3,
LifeCycle(Permanent)) LifeCycle(Permanent))
:: Nil)) :: Nil)
factory.newInstance :: Nil))
} }
def getNestedSupervisorsAllForOneConf: Supervisor = {
pingpong1 = newActor[PingPong1Actor]
pingpong2 = newActor[PingPong2Actor]
pingpong3 = newActor[PingPong3Actor]
val factory = SupervisorFactory(
SupervisorConfig(
RestartStrategy(AllForOne, 3, 100, List(classOf[Exception])),
Supervise(
pingpong1,
LifeCycle(Permanent))
::
SupervisorConfig(
RestartStrategy(AllForOne, 3, 100, Nil),
Supervise(
pingpong2,
LifeCycle(Permanent))
::
Supervise(
pingpong3,
LifeCycle(Permanent))
:: Nil)
:: Nil))
factory.newInstance
}
} }

View file

@ -25,39 +25,35 @@ class ThreadBasedActorSpec extends JUnitSuite {
private val unit = TimeUnit.MILLISECONDS private val unit = TimeUnit.MILLISECONDS
@Test def shouldSendOneWay = { @Test def shouldSendOneWay {
var oneWay = new CountDownLatch(1) var oneWay = new CountDownLatch(1)
val actor = newActor(() => new Actor { val actor = newActor(() => new Actor {
dispatcher = Dispatchers.newThreadBasedDispatcher(this) dispatcher = Dispatchers.newThreadBasedDispatcher(this)
def receive = { def receive = {
case "OneWay" => oneWay.countDown case "OneWay" => oneWay.countDown
} }
}) }).start
actor.start
val result = actor ! "OneWay" val result = actor ! "OneWay"
assert(oneWay.await(1, TimeUnit.SECONDS)) assert(oneWay.await(1, TimeUnit.SECONDS))
actor.stop actor.stop
} }
@Test def shouldSendReplySync = { @Test def shouldSendReplySync {
val actor = newActor[TestActor] val actor = newActor[TestActor].start
actor.start
val result: String = (actor !! ("Hello", 10000)).get val result: String = (actor !! ("Hello", 10000)).get
assert("World" === result) assert("World" === result)
actor.stop actor.stop
} }
@Test def shouldSendReplyAsync = { @Test def shouldSendReplyAsync {
val actor = newActor[TestActor] val actor = newActor[TestActor].start
actor.start
val result = actor !! "Hello" val result = actor !! "Hello"
assert("World" === result.get.asInstanceOf[String]) assert("World" === result.get.asInstanceOf[String])
actor.stop actor.stop
} }
@Test def shouldSendReceiveException = { @Test def shouldSendReceiveException {
val actor = newActor[TestActor] val actor = newActor[TestActor].start
actor.start
try { try {
actor !! "Failure" actor !! "Failure"
fail("Should have thrown an exception") fail("Should have thrown an exception")

View file

@ -205,8 +205,7 @@ class ChatService extends
SessionManagement with SessionManagement with
ChatManagement with ChatManagement with
RedisChatStorageFactory { RedisChatStorageFactory {
override def start { override def init = {
super.start
RemoteNode.start("localhost", 9999) RemoteNode.start("localhost", 9999)
RemoteNode.register("chat:service", self) RemoteNode.register("chat:service", self)
} }

View file

@ -8,7 +8,7 @@
<log> <log>
filename = "./logs/akka.log" filename = "./logs/akka.log"
roll = "daily" # Options: never, hourly, daily, sunday/monday/... roll = "daily" # Options: never, hourly, daily, sunday/monday/...
level = "info" # Options: fatal, critical, error, warning, info, debug, trace level = "debug" # Options: fatal, critical, error, warning, info, debug, trace
console = on console = on
# syslog_host = "" # syslog_host = ""
# syslog_server_name = "" # syslog_server_name = ""