diff --git a/akka-camel/src/test/scala/component/ActorComponentFeatureTest.scala b/akka-camel/src/test/scala/component/ActorComponentFeatureTest.scala index 396fceacb8..8e5195cb33 100644 --- a/akka-camel/src/test/scala/component/ActorComponentFeatureTest.scala +++ b/akka-camel/src/test/scala/component/ActorComponentFeatureTest.scala @@ -24,7 +24,7 @@ class ActorComponentFeatureTest extends FeatureSpec with BeforeAndAfterAll with scenario("one-way communication using actor id") { val actor = new Tester with Retain with Countdown[Message] actor.start - template.sendBody("actor:%s" format actor.getId, "Martin") + template.sendBody("actor:%s" format actor.id, "Martin") assert(actor.waitFor) assert(actor.body === "Martin") } @@ -40,7 +40,7 @@ class ActorComponentFeatureTest extends FeatureSpec with BeforeAndAfterAll with scenario("two-way communication using actor id") { val actor = new Tester with Respond actor.start - assert(template.requestBody("actor:%s" format actor.getId, "Martin") === "Hello Martin") + assert(template.requestBody("actor:%s" format actor.id, "Martin") === "Hello Martin") } scenario("two-way communication using actor uuid") { diff --git a/akka-core/src/main/java/se/scalablesolutions/akka/remote/protobuf/RemoteProtocol.proto b/akka-core/src/main/java/se/scalablesolutions/akka/remote/protobuf/RemoteProtocol.proto index 372691cd8b..cfb7464e61 100644 --- a/akka-core/src/main/java/se/scalablesolutions/akka/remote/protobuf/RemoteProtocol.proto +++ b/akka-core/src/main/java/se/scalablesolutions/akka/remote/protobuf/RemoteProtocol.proto @@ -6,7 +6,7 @@ package se.scalablesolutions.akka.remote.protobuf; /* Compile with: - cd ./akka-util-java/src/main/java + cd ./akka-core/src/main/java protoc se/scalablesolutions/akka/remote/protobuf/RemoteProtocol.proto --java_out . */ diff --git a/akka-core/src/main/scala/actor/ActiveObject.scala b/akka-core/src/main/scala/actor/ActiveObject.scala index aac90ce056..7a4656cec8 100644 --- a/akka-core/src/main/scala/actor/ActiveObject.scala +++ b/akka-core/src/main/scala/actor/ActiveObject.scala @@ -374,8 +374,8 @@ object ActiveObject { this } - private[akka] def supervise(restartStrategy: RestartStrategy, components: List[Supervise]): ActorRef = - SupervisorFactory(SupervisorConfig(restartStrategy, components)).newInstance.start + private[akka] def supervise(restartStrategy: RestartStrategy, components: List[Supervise]): Supervisor = + Supervisor(SupervisorConfig(restartStrategy, components)) } private[akka] object AspectInitRegistry { @@ -523,9 +523,6 @@ private[akka] sealed class ActiveObjectAspect { } } -// FIXME Jan Kronquist: started work on issue 121 -private[akka] case class Link(val actor: ActorRef) - object Dispatcher { val ZERO_ITEM_CLASS_ARRAY = Array[Class[_]]() val ZERO_ITEM_OBJECT_ARRAY = Array[Object]() diff --git a/akka-core/src/main/scala/actor/Actor.scala b/akka-core/src/main/scala/actor/Actor.scala index 5cdbb3b1ab..faac8f56bc 100644 --- a/akka-core/src/main/scala/actor/Actor.scala +++ b/akka-core/src/main/scala/actor/Actor.scala @@ -69,6 +69,7 @@ abstract class RemoteActor(hostname: String, port: Int) extends Actor { case class HotSwap(code: Option[PartialFunction[Any, Unit]]) extends LifeCycleMessage case class Restart(reason: Throwable) extends LifeCycleMessage case class Exit(dead: ActorRef, killer: Throwable) extends LifeCycleMessage +case class Link(child: ActorRef) extends LifeCycleMessage case class Unlink(child: ActorRef) extends LifeCycleMessage case class UnlinkAndStop(child: ActorRef) extends LifeCycleMessage case object Kill extends LifeCycleMessage @@ -97,7 +98,7 @@ object Actor extends Logging { } /** - * Creates a new ActorRef out of the Actor with type T. + * Creates a Actor.newActor out of the Actor with type T. *
    *   import Actor._
    *   val actor = newActor[MyActor]
@@ -106,10 +107,10 @@ object Actor extends Logging {
    *   actor.stop
    * 
*/ - def newActor[T <: Actor: Manifest]: ActorRef = new ActorRef(manifest[T].erasure.asInstanceOf[Class[_ <: Actor]]) + def newActor[T <: Actor: Manifest]: ActorRef = new LocalActorRef(manifest[T].erasure.asInstanceOf[Class[_ <: Actor]]) /** - * Creates a new ActorRef out of the Actor. Allows you to pass in a factory function + * Creates a Actor.newActor out of the Actor. Allows you to pass in a factory function * that creates the Actor. Please note that this function can be invoked multiple * times if for example the Actor is supervised and needs to be restarted. *

@@ -122,7 +123,7 @@ object Actor extends Logging { * actor.stop * */ - def newActor(factory: () => Actor): ActorRef = new ActorRef(factory) + def newActor(factory: () => Actor): ActorRef = new LocalActorRef(factory) /** * Use to create an anonymous event-driven actor. @@ -301,7 +302,7 @@ trait Actor extends Logging { /** * Holds the hot swapped partial function. */ - private[this] var _hotswap: Option[PartialFunction[Any, Unit]] = None // FIXME: _hotswap should be a stack + private var _hotswap: Option[PartialFunction[Any, Unit]] = None // FIXME: _hotswap should be a stack // ======================================== // ==== CALLBACKS FOR USER TO OVERRIDE ==== @@ -478,7 +479,7 @@ trait Actor extends Logging { /** * Starts the actor. */ - def start = self.start + def start = self.startOnCreation = true /** * Shuts down the actor its dispatcher and message queue. diff --git a/akka-core/src/main/scala/actor/ActorIdProtobufSpec.proto b/akka-core/src/main/scala/actor/ActorIdProtobufSpec.proto deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/akka-core/src/main/scala/actor/ActorRef.scala b/akka-core/src/main/scala/actor/ActorRef.scala index ab3aa0386d..6a32a48ba6 100644 --- a/akka-core/src/main/scala/actor/ActorRef.scala +++ b/akka-core/src/main/scala/actor/ActorRef.scala @@ -24,7 +24,8 @@ import jsr166x.{Deque, ConcurrentLinkedDeque} import java.net.InetSocketAddress import java.util.concurrent.locks.{Lock, ReentrantLock} import java.util.concurrent.atomic.AtomicReference -import java.util.{HashSet => JHashSet} +import java.util.concurrent.ConcurrentHashMap +import java.util.{Map => JMap} /* trait ActorWithNestedReceive extends Actor { @@ -45,7 +46,7 @@ trait ActorWithNestedReceive extends Actor { /** * The ActorRef object can be used to deserialize ActorRef instances from of its binary representation - * or its Protocol Buffers (protobuf) Message representation to a new ActorRef instance. + * or its Protocol Buffers (protobuf) Message representation to a Actor.newActor instance. *

* Binary -> ActorRef: *

@@ -106,21 +107,18 @@ object ActorRef {
  * 
  * @author Jonas Bonér
  */
-sealed class ActorRef private[akka] () extends TransactionManagement {
+trait ActorRef extends TransactionManagement {
   
   // Only mutable for RemoteServer in order to maintain identity across nodes
-  @volatile private[akka] var _uuid = UUID.newUuid.toString
-  @volatile private[this] var _isRunning = false
-  @volatile private[this] var _isSuspended = true
-  @volatile private[this] var _isShutDown = false
-  @volatile private[akka] var _isKilled = false
-  @volatile private[akka] var _registeredInRemoteNodeDuringSerialization = false
-  @volatile private[akka] var _remoteAddress: Option[InetSocketAddress] = None
-  @volatile private[akka] var _linkedActors: Option[JHashSet[ActorRef]] = None // FIXME: turn _linkedActors into a ConcurrentHashSet to avoid ReadWrite lock when touched?
-  @volatile private[akka] var _supervisor: Option[ActorRef] = None
-  @volatile private[akka] var _replyToAddress: Option[InetSocketAddress] = None
-  private[akka] val _mailbox: Deque[MessageInvocation] = new ConcurrentLinkedDeque[MessageInvocation]
-  private[this] val _guard = new ReadWriteLock
+  @volatile protected[akka] var _uuid = UUID.newUuid.toString
+  @volatile protected[this] var _isRunning = false
+  @volatile protected[this] var _isSuspended = true
+  @volatile protected[this] var _isShutDown = false
+  @volatile protected[akka] var _isKilled = false
+
+  @volatile protected[akka] var startOnCreation = false
+  @volatile protected[akka] var registeredInRemoteNodeDuringSerialization = false
+  protected[this] val guard = new ReadWriteLock
 
   /**
    * User overridable callback/setting.
@@ -128,13 +126,13 @@ sealed class ActorRef private[akka] () extends TransactionManagement {
    * Set to true if messages should have REQUIRES_NEW semantics, e.g. a new transaction should
    * start if there is no one running, else it joins the existing transaction.
    */
-  @volatile private[akka] var isTransactor = false
+  @volatile protected[akka] var isTransactor = false
 
   /**v
    * This lock ensures thread safety in the dispatching: only one message can
    * be dispatched at once on the actor.
    */
-  private[akka] val _dispatcherLock: Lock = new ReentrantLock
+  protected[akka] val dispatcherLock: Lock = new ReentrantLock
 
   /**
    * Holds the reference to the sender of the currently processed message.
@@ -142,27 +140,32 @@ sealed class ActorRef private[akka] () extends TransactionManagement {
    * - Is Some(Left(Actor)) if sender is an actor
    * - Is Some(Right(CompletableFuture)) if sender is holding on to a Future for the result
    */
- private[this] var _replyTo: Option[Either[ActorRef, CompletableFuture[Any]]] = None
- private[akka] def replyTo: Option[Either[ActorRef, CompletableFuture[Any]]] = 
-  _guard.withReadLock { _replyTo }
- private[akka] def replyTo_=(rt: Option[Either[ActorRef, CompletableFuture[Any]]]) = 
-  _guard.withWriteLock { _replyTo = rt }
+ protected[this] var _replyTo: Option[Either[ActorRef, CompletableFuture[Any]]] = None
+ protected[akka] def replyTo: Option[Either[ActorRef, CompletableFuture[Any]]] = 
+  guard.withReadLock { _replyTo }
+ protected[akka] def replyTo_=(rt: Option[Either[ActorRef, CompletableFuture[Any]]]) = 
+  guard.withWriteLock { _replyTo = rt }
 
-  private[this] var actorFactory: Either[Option[Class[_ <: Actor]], Option[() => Actor]] = Left(None)
+  /**
+   * Is the actor killed?
+   */
+  def isKilled: Boolean = _isKilled
 
-  private[this] lazy val actorInstance: AtomicReference[Actor] = new AtomicReference[Actor](newActor)
+  /**
+   * Is the actor running?
+   */
+  def isRunning: Boolean = _isRunning
 
-  private[akka] def actor: Actor = actorInstance.get
+  /**
+   * Is the actor shut down?
+   */
+  def isShutdown: Boolean = _isShutDown
 
-  private[akka] def this(clazz: Class[_ <: Actor]) = { 
-    this()
-    actorFactory = Left(Some(clazz))
-  }
-  
-  private[akka] def this(factory: () => Actor) = {
-    this()
-    actorFactory = Right(Some(factory))
-  }    
+  /**
+   * Returns the uuid for the actor.
+   */
+  def uuid = _uuid
+  protected[akka] def uuid_=(uid: String) = _uuid = uid
 
   /**
    * Sends a one-way asynchronous message. E.g. fire-and-forget semantics.
@@ -265,20 +268,258 @@ sealed class ActorRef private[akka] () extends TransactionManagement {
       }
     } else throw new IllegalStateException("Actor has not been started, you need to invoke 'actor.start' before using it")
   }
+  
+  /**
+   * Serializes the ActorRef instance into a byte array (Array[Byte]).
+   */
+  def toBinary: Array[Byte]
+  
+  /**
+   * Returns the class for the Actor instance that is managed by the ActorRef.
+   */
+  def actorClass: Class[_ <: Actor]
+  
+  /**
+   * Sets the dispatcher for this actor. Needs to be invoked before the actor is started.
+   */
+  def dispatcher_=(md: MessageDispatcher): Unit
+
+  /**
+   * Get the dispatcher for this actor.
+   */
+  def dispatcher: MessageDispatcher
+
+  /**
+   * Invoking 'makeRemote' means that an actor will be moved to and invoked on a remote host.
+   */
+  def makeRemote(hostname: String, port: Int): Unit
+  
+  /**
+   * Invoking 'makeRemote' means that an actor will be moved to and invoked on a remote host.
+   */
+  def makeRemote(address: InetSocketAddress): Unit
+  
+  /**
+   * Invoking 'makeTransactionRequired' means that the actor will **start** a new transaction if non exists.
+   * However, it will always participate in an existing transaction.
+   * If transactionality want to be completely turned off then do it by invoking:
+   * 
+   *  TransactionManagement.disableTransactions
+   * 
+ */ + def makeTransactionRequired: Unit + + /** + * Set the contact address for this actor. This is used for replying to messages + * sent asynchronously when no reply channel exists. + */ + def setReplyToAddress(hostname: String, port: Int): Unit = + setReplyToAddress(new InetSocketAddress(hostname, port)) + + /** + * Set the contact address for this actor. This is used for replying to messages + * sent asynchronously when no reply channel exists. + */ + def setReplyToAddress(address: InetSocketAddress): Unit + + /** + * Returns the id for the actor. + */ + def id: String + + /** + * Returns the remote address for the actor, if any, else None. + */ + def remoteAddress: Option[InetSocketAddress] + protected[akka] def remoteAddress_=(addr: Option[InetSocketAddress]): Unit + + /** + * User overridable callback/setting. + *

+ * Defines the default timeout for '!!' and '!!!' invocations, + * e.g. the timeout for the future returned by the call to '!!' and '!!!'. + */ + def timeout: Long + + /** + * Sets the default timeout for '!!' and '!!!' invocations, + * e.g. the timeout for the future returned by the call to '!!' and '!!!'. + */ + def timeout_=(t: Long) + + /** + * Starts up the actor and its message queue. + */ + def start: ActorRef + + /** + * Shuts down the actor its dispatcher and message queue. + * Alias for 'stop'. + */ + def exit = stop + + /** + * Shuts down the actor its dispatcher and message queue. + */ + def stop: Unit + + /** + * Links an other actor to this actor. Links are unidirectional and means that a the linking actor will + * receive a notification if the linked actor has crashed. + *

+ * If the 'trapExit' member field has been set to at contain at least one exception class then it will + * 'trap' these exceptions and automatically restart the linked actors according to the restart strategy + * defined by the 'faultHandler'. + *

+ * To be invoked from within the actor itself. + */ + def link(actorRef: ActorRef): Unit + + /** + * Unlink the actor. + *

+ * To be invoked from within the actor itself. + */ + def unlink(actorRef: ActorRef): Unit + + /** + * Atomically start and link an actor. + *

+ * To be invoked from within the actor itself. + */ + def startLink(actorRef: ActorRef): Unit + + /** + * Atomically start, link and make an actor remote. + *

+ * To be invoked from within the actor itself. + */ + def startLinkRemote(actorRef: ActorRef, hostname: String, port: Int): Unit + + /** + * Atomically create (from actor class) and start an actor. + *

+ * To be invoked from within the actor itself. + */ + def spawn[T <: Actor : Manifest]: ActorRef + + /** + * Atomically create (from actor class), start and make an actor remote. + *

+ * To be invoked from within the actor itself. + */ + def spawnRemote[T <: Actor: Manifest](hostname: String, port: Int): ActorRef + + /** + * Atomically create (from actor class), start and link an actor. + *

+ * To be invoked from within the actor itself. + */ + def spawnLink[T <: Actor: Manifest]: ActorRef + + /** + * Atomically create (from actor class), start, link and make an actor remote. + *

+ * To be invoked from within the actor itself. + */ + def spawnLinkRemote[T <: Actor : Manifest](hostname: String, port: Int): ActorRef + + /** + * Returns the mailbox size. + */ + def mailboxSize: Int + + /** + * Returns the supervisor, if there is one. + */ + def supervisor: Option[ActorRef] + + /** + * Shuts down and removes all linked actors. + */ + def shutdownLinkedActors: Unit + + protected[akka] def toProtocol: ActorRefProtocol + + protected[akka] def invoke(messageHandle: MessageInvocation): Unit + + protected[akka] def postMessageToMailbox(message: Any, senderOption: Option[ActorRef]): Unit + + protected[akka] def postMessageToMailboxAndCreateFutureResultWithTimeout[T]( + message: Any, + timeout: Long, + senderFuture: Option[CompletableFuture[T]]): CompletableFuture[T] + + protected[this] def actorInstance: AtomicReference[Actor] + + protected[akka] def actor: Actor = actorInstance.get + + protected[akka] def supervisor_=(sup: Option[ActorRef]): Unit + + protected[akka] def trapExit: List[Class[_ <: Throwable]] = actor.trapExit + protected[akka] def trapExit_=(exits: List[Class[_ <: Throwable]]) = actor.trapExit = exits + + protected[akka] def lifeCycle: Option[LifeCycle] = actor.lifeCycle + protected[akka] def lifeCycle_=(cycle: Option[LifeCycle]) = actor.lifeCycle = cycle + + protected[akka] def faultHandler: Option[FaultHandlingStrategy] = actor.faultHandler + protected[akka] def faultHandler_=(handler: Option[FaultHandlingStrategy]) = actor.faultHandler = handler + + protected[akka] def mailbox: Deque[MessageInvocation] + + protected[akka] def restart(reason: Throwable): Unit + + protected[akka] def handleTrapExit(dead: ActorRef, reason: Throwable): Unit + + protected[akka] def restartLinkedActors(reason: Throwable): Unit + + protected[akka] def registerSupervisorAsRemoteActor: Option[String] + + protected[akka] def linkedActors: JMap[String, ActorRef] + + protected[akka] def linkedActorsAsList: List[ActorRef] + + override def toString: String + + override def hashCode: Int + + override def equals(that: Any): Boolean +} + +/** + * @author Jonas Bonér + */ +sealed class LocalActorRef private[akka]( + private[this] var actorFactory: Either[Option[Class[_ <: Actor]], Option[() => Actor]] = Left(None)) + extends ActorRef { + + private[akka] def this(clazz: Class[_ <: Actor]) = this(Left(Some(clazz))) + private[akka] def this(factory: () => Actor) = this(Right(Some(factory))) + + // Only mutable for RemoteServer in order to maintain identity across nodes + @volatile private[akka] var _remoteAddress: Option[InetSocketAddress] = None + @volatile private[akka] var _linkedActors: Option[ConcurrentHashMap[String, ActorRef]] = None + @volatile private[akka] var _supervisor: Option[ActorRef] = None + @volatile private[akka] var _replyToAddress: Option[InetSocketAddress] = None + + protected[akka] val _mailbox: Deque[MessageInvocation] = new ConcurrentLinkedDeque[MessageInvocation] + protected[this] val actorInstance = new AtomicReference[Actor](newActor) + + if (startOnCreation) start /** * Serializes the ActorRef instance into a Protocol Buffers (protobuf) Message. */ - private[akka] def toProtocol: ActorRefProtocol = _guard.withWriteLock { + protected[akka] def toProtocol: ActorRefProtocol = guard.withWriteLock { val (host, port) = _replyToAddress.map(address => (address.getHostName, address.getPort)) .getOrElse((Actor.HOSTNAME, Actor.PORT)) - if (!_registeredInRemoteNodeDuringSerialization) { + if (!registeredInRemoteNodeDuringSerialization) { Actor.log.debug("Register serialized Actor [%s] as remote @ [%s:%s]", actorClass.getName, host, port) if (RemoteServer.serverFor(host, port).isEmpty) (new RemoteServer).start(host, port) RemoteServer.actorsFor(RemoteServer.Address(host, port)).actors.put(uuid, this) - _registeredInRemoteNodeDuringSerialization = true + registeredInRemoteNodeDuringSerialization = true } ActorRefProtocol.newBuilder @@ -289,6 +530,11 @@ sealed class ActorRef private[akka] () extends TransactionManagement { .setTimeout(timeout) .build } + + /** + * Returns the mailbox. + */ + protected[akka] def mailbox: Deque[MessageInvocation] = _mailbox /** * Serializes the ActorRef instance into a byte array (Array[Byte]). @@ -303,7 +549,7 @@ sealed class ActorRef private[akka] () extends TransactionManagement { /** * Sets the dispatcher for this actor. Needs to be invoked before the actor is started. */ - def dispatcher_=(md: MessageDispatcher): Unit = _guard.withWriteLock { + def dispatcher_=(md: MessageDispatcher): Unit = guard.withWriteLock { if (!isRunning) actor.dispatcher = md else throw new IllegalArgumentException( "Can not swap dispatcher for " + toString + " after it has been started") @@ -312,7 +558,7 @@ sealed class ActorRef private[akka] () extends TransactionManagement { /** * Get the dispatcher for this actor. */ - def dispatcher: MessageDispatcher = _guard.withReadLock { actor.dispatcher } + def dispatcher: MessageDispatcher = guard.withReadLock { actor.dispatcher } /** * Invoking 'makeRemote' means that an actor will be moved to and invoked on a remote host. @@ -325,7 +571,7 @@ sealed class ActorRef private[akka] () extends TransactionManagement { /** * Invoking 'makeRemote' means that an actor will be moved to and invoked on a remote host. */ - def makeRemote(address: InetSocketAddress): Unit = _guard.withWriteLock { + def makeRemote(address: InetSocketAddress): Unit = guard.withWriteLock { if (isRunning) throw new IllegalStateException( "Can't make a running actor remote. Make sure you call 'makeRemote' before 'start'.") else { @@ -343,40 +589,30 @@ sealed class ActorRef private[akka] () extends TransactionManagement { * TransactionManagement.disableTransactions *

*/ - def makeTransactionRequired = _guard.withWriteLock { + def makeTransactionRequired = guard.withWriteLock { if (isRunning) throw new IllegalArgumentException( "Can not make actor transaction required after it has been started") else isTransactor = true } - /** - * Set the contact address for this actor. This is used for replying to messages - * sent asynchronously when no reply channel exists. - */ - def setReplyToAddress(hostname: String, port: Int): Unit = - setReplyToAddress(new InetSocketAddress(hostname, port)) - /** * Set the contact address for this actor. This is used for replying to messages * sent asynchronously when no reply channel exists. */ def setReplyToAddress(address: InetSocketAddress): Unit = - _guard.withReadLock { _replyToAddress = Some(address) } + guard.withReadLock { _replyToAddress = Some(address) } /** * Returns the id for the actor. */ def id = actor.id - /** - * Returns the uuid for the actor. - */ - def uuid = _uuid - /** * Returns the remote address for the actor, if any, else None. */ - def remoteAddress: Option[InetSocketAddress] = _guard.withReadLock { _remoteAddress } + def remoteAddress: Option[InetSocketAddress] = guard.withReadLock { _remoteAddress } + protected[akka] def remoteAddress_=(addr: Option[InetSocketAddress]): Unit = + guard.withWriteLock { _remoteAddress = addr } /** * User overridable callback/setting. @@ -395,8 +631,8 @@ sealed class ActorRef private[akka] () extends TransactionManagement { /** * Starts up the actor and its message queue. */ - def start: ActorRef = _guard.withWriteLock { - if (_isShutDown) throw new IllegalStateException( + def start: ActorRef = guard.withWriteLock { + if (isShutdown) throw new IllegalStateException( "Can't restart an actor that has been shut down with 'stop' or 'exit'") if (!isRunning) { dispatcher.register(this) @@ -412,21 +648,15 @@ sealed class ActorRef private[akka] () extends TransactionManagement { /** * Shuts down the actor its dispatcher and message queue. - * Alias for 'stop'. */ - def exit = stop - - /** - * Shuts down the actor its dispatcher and message queue. - */ - def stop = _guard.withWriteLock { + def stop = guard.withWriteLock { if (isRunning) { dispatcher.unregister(this) _isRunning = false _isShutDown = true actor.shutdown ActorRegistry.unregister(this) - _remoteAddress.foreach(address => RemoteClient.unregister( + remoteAddress.foreach(address => RemoteClient.unregister( address.getHostName, address.getPort, uuid)) RemoteNode.unregister(this) } @@ -442,10 +672,10 @@ sealed class ActorRef private[akka] () extends TransactionManagement { *

* To be invoked from within the actor itself. */ - def link(actorRef: ActorRef) = _guard.withWriteLock { + def link(actorRef: ActorRef) = guard.withWriteLock { if (actorRef.supervisor.isDefined) throw new IllegalStateException( "Actor can only have one supervisor [" + actorRef + "], e.g. link(actor) fails") - linkedActors.add(actorRef) + linkedActors.put(actorRef.uuid, actorRef) actorRef.supervisor = Some(this) Actor.log.debug("Linking actor [%s] to actor [%s]", actorRef, this) } @@ -455,10 +685,10 @@ sealed class ActorRef private[akka] () extends TransactionManagement { *

* To be invoked from within the actor itself. */ - def unlink(actorRef: ActorRef) = _guard.withWriteLock { - if (!linkedActors.contains(actorRef)) throw new IllegalStateException( + def unlink(actorRef: ActorRef) = guard.withWriteLock { + if (!linkedActors.containsKey(actorRef.uuid)) throw new IllegalStateException( "Actor [" + actorRef + "] is not a linked actor, can't unlink") - linkedActors.remove(actorRef) + linkedActors.remove(actorRef.uuid) actorRef.supervisor = None Actor.log.debug("Unlinking actor [%s] from actor [%s]", actorRef, this) } @@ -468,7 +698,7 @@ sealed class ActorRef private[akka] () extends TransactionManagement { *

* To be invoked from within the actor itself. */ - def startLink(actorRef: ActorRef) = _guard.withWriteLock { + def startLink(actorRef: ActorRef) = guard.withWriteLock { try { actorRef.start } finally { @@ -481,7 +711,7 @@ sealed class ActorRef private[akka] () extends TransactionManagement { *

* To be invoked from within the actor itself. */ - def startLinkRemote(actorRef: ActorRef, hostname: String, port: Int) = _guard.withWriteLock { + def startLinkRemote(actorRef: ActorRef, hostname: String, port: Int) = guard.withWriteLock { try { actorRef.makeRemote(hostname, port) actorRef.start @@ -495,7 +725,7 @@ sealed class ActorRef private[akka] () extends TransactionManagement { *

* To be invoked from within the actor itself. */ - def spawn[T <: Actor : Manifest]: ActorRef = _guard.withWriteLock { + def spawn[T <: Actor : Manifest]: ActorRef = guard.withWriteLock { val actorRef = spawnButDoNotStart[T] actorRef.start actorRef @@ -506,7 +736,7 @@ sealed class ActorRef private[akka] () extends TransactionManagement { *

* To be invoked from within the actor itself. */ - def spawnRemote[T <: Actor: Manifest](hostname: String, port: Int): ActorRef = _guard.withWriteLock { + def spawnRemote[T <: Actor: Manifest](hostname: String, port: Int): ActorRef = guard.withWriteLock { val actor = spawnButDoNotStart[T] actor.makeRemote(hostname, port) actor.start @@ -518,7 +748,7 @@ sealed class ActorRef private[akka] () extends TransactionManagement { *

* To be invoked from within the actor itself. */ - def spawnLink[T <: Actor: Manifest]: ActorRef = _guard.withWriteLock { + def spawnLink[T <: Actor: Manifest]: ActorRef = guard.withWriteLock { val actor = spawnButDoNotStart[T] try { actor.start @@ -533,7 +763,7 @@ sealed class ActorRef private[akka] () extends TransactionManagement { *

* To be invoked from within the actor itself. */ - def spawnLinkRemote[T <: Actor : Manifest](hostname: String, port: Int): ActorRef = _guard.withWriteLock { + def spawnLinkRemote[T <: Actor : Manifest](hostname: String, port: Int): ActorRef = guard.withWriteLock { val actor = spawnButDoNotStart[T] try { actor.makeRemote(hostname, port) @@ -543,21 +773,6 @@ sealed class ActorRef private[akka] () extends TransactionManagement { } } - /** - * Is the actor killed? - */ - def isKilled: Boolean = _isKilled - - /** - * Is the actor running? - */ - def isRunning: Boolean = _isRunning - - /** - * Is the actor shut down? - */ - def isShutdown: Boolean = !_isRunning - /** * Returns the mailbox size. */ @@ -566,8 +781,8 @@ sealed class ActorRef private[akka] () extends TransactionManagement { /** * Shuts down and removes all linked actors. */ - def shutdownLinkedActors: Unit = _guard.withWriteLock { - linkedActors.toArray.toList.asInstanceOf[List[ActorRef]].foreach(_.stop) + def shutdownLinkedActors: Unit = guard.withWriteLock { + linkedActorsAsList.foreach(_.stop) linkedActors.clear } @@ -580,22 +795,13 @@ sealed class ActorRef private[akka] () extends TransactionManagement { /** * Returns the supervisor, if there is one. */ - def supervisor: Option[ActorRef] = _guard.withReadLock { _supervisor } + def supervisor: Option[ActorRef] = guard.withReadLock { _supervisor } - private[akka] def supervisor_=(sup: Option[ActorRef]): Unit = _guard.withWriteLock { _supervisor = sup } + protected[akka] def supervisor_=(sup: Option[ActorRef]): Unit = guard.withWriteLock { _supervisor = sup } - private[akka] def trapExit: List[Class[_ <: Throwable]] = actor.trapExit - private[akka] def trapExit_=(exits: List[Class[_ <: Throwable]]) = actor.trapExit = exits - - private[akka] def lifeCycle: Option[LifeCycle] = actor.lifeCycle - private[akka] def lifeCycle_=(cycle: Option[LifeCycle]) = actor.lifeCycle = cycle - - private[akka] def faultHandler: Option[FaultHandlingStrategy] = actor.faultHandler - private[akka] def faultHandler_=(handler: Option[FaultHandlingStrategy]) = actor.faultHandler = handler - - private def spawnButDoNotStart[T <: Actor: Manifest]: ActorRef = _guard.withWriteLock { + private def spawnButDoNotStart[T <: Actor: Manifest]: ActorRef = guard.withWriteLock { val actor = manifest[T].erasure.asInstanceOf[Class[T]].newInstance - val actorRef = new ActorRef(() => actor) + val actorRef = Actor.newActor(() => actor) if (!dispatcher.isInstanceOf[ThreadBasedDispatcher]) { actorRef.dispatcher = dispatcher } @@ -628,12 +834,12 @@ sealed class ActorRef private[akka] () extends TransactionManagement { protected[akka] def postMessageToMailbox(message: Any, senderOption: Option[ActorRef]): Unit = { joinTransaction(message) - if (_remoteAddress.isDefined) { + if (remoteAddress.isDefined) { val requestBuilder = RemoteRequestProtocol.newBuilder .setId(RemoteRequestProtocolIdFactory.nextId) - .setTarget(this.getClass.getName) - .setTimeout(this.timeout) - .setUuid(this.uuid) + .setTarget(actorClass.getName) + .setTimeout(timeout) + .setUuid(uuid) .setIsActor(true) .setIsOneWay(true) .setIsEscaped(false) @@ -644,7 +850,7 @@ sealed class ActorRef private[akka] () extends TransactionManagement { senderOption.foreach(sender => requestBuilder.setSender(sender.toProtocol)) RemoteProtocolBuilder.setMessage(message, requestBuilder) - RemoteClient.clientFor(_remoteAddress.get).send[Any](requestBuilder.build, None) + RemoteClient.clientFor(remoteAddress.get).send[Any](requestBuilder.build, None) } else { val invocation = new MessageInvocation(this, message, senderOption.map(Left(_)), transactionSet.get) if (dispatcher.usesActorMailbox) { @@ -661,12 +867,12 @@ sealed class ActorRef private[akka] () extends TransactionManagement { senderFuture: Option[CompletableFuture[T]]): CompletableFuture[T] = { joinTransaction(message) - if (_remoteAddress.isDefined) { + if (remoteAddress.isDefined) { val requestBuilder = RemoteRequestProtocol.newBuilder .setId(RemoteRequestProtocolIdFactory.nextId) - .setTarget(this.getClass.getName) - .setTimeout(this.timeout) - .setUuid(this.uuid) + .setTarget(actorClass.getName) + .setTimeout(timeout) + .setUuid(uuid) .setIsActor(true) .setIsOneWay(false) .setIsEscaped(false) @@ -677,7 +883,7 @@ sealed class ActorRef private[akka] () extends TransactionManagement { val id = registerSupervisorAsRemoteActor if (id.isDefined) requestBuilder.setSupervisorUuid(id.get) - val future = RemoteClient.clientFor(_remoteAddress.get).send(requestBuilder.build, senderFuture) + val future = RemoteClient.clientFor(remoteAddress.get).send(requestBuilder.build, senderFuture) if (future.isDefined) future.get else throw new IllegalStateException("Expected a future from remote call to actor " + toString) } else { @@ -691,7 +897,7 @@ sealed class ActorRef private[akka] () extends TransactionManagement { } } - private[akka] def restart(reason: Throwable): Unit = { + protected[akka] def restart(reason: Throwable): Unit = { Actor.log.info("Restarting actor [%s] configured as PERMANENT.", id) val failedActor = actorInstance.get failedActor.synchronized { @@ -721,7 +927,7 @@ sealed class ActorRef private[akka] () extends TransactionManagement { /** * Callback for the dispatcher. E.g. single entry point to the user code and all protected[this] methods. */ - private[akka] def invoke(messageHandle: MessageInvocation) = actor.synchronized { + protected[akka] def invoke(messageHandle: MessageInvocation) = actor.synchronized { try { if (TransactionManagement.isTransactionalityEnabled) transactionalDispatch(messageHandle) else dispatch(messageHandle) @@ -732,7 +938,7 @@ sealed class ActorRef private[akka] () extends TransactionManagement { } } - private def dispatch[T](messageHandle: MessageInvocation) = _guard.withWriteLock { + private def dispatch[T](messageHandle: MessageInvocation) = guard.withWriteLock { setTransactionSet(messageHandle.transactionSet) val message = messageHandle.message //serializeMessage(messageHandle.message) @@ -757,7 +963,7 @@ sealed class ActorRef private[akka] () extends TransactionManagement { } } - private def transactionalDispatch[T](messageHandle: MessageInvocation) = _guard.withWriteLock { + private def transactionalDispatch[T](messageHandle: MessageInvocation) = guard.withWriteLock { var topLevelTransaction = false val txSet: Option[CountDownCommitBarrier] = if (messageHandle.transactionSet.isDefined) messageHandle.transactionSet @@ -814,7 +1020,7 @@ sealed class ActorRef private[akka] () extends TransactionManagement { } } - private[akka] def handleTrapExit(dead: ActorRef, reason: Throwable): Unit = _guard.withReadLock { + protected[akka] def handleTrapExit(dead: ActorRef, reason: Throwable): Unit = guard.withReadLock { if (trapExit.exists(_.isAssignableFrom(reason.getClass))) { if (faultHandler.isDefined) { faultHandler.get match { @@ -828,8 +1034,8 @@ sealed class ActorRef private[akka] () extends TransactionManagement { } else _supervisor.foreach(_ ! Exit(dead, reason)) // if 'trapExit' is not defined then pass the Exit on } - private[akka] def restartLinkedActors(reason: Throwable) = _guard.withWriteLock { - linkedActors.toArray.toList.asInstanceOf[List[ActorRef]].foreach { actorRef => + protected[akka] def restartLinkedActors(reason: Throwable) = guard.withWriteLock { + linkedActorsAsList.foreach { actorRef => if (actorRef.lifeCycle.isEmpty) actorRef.lifeCycle = Some(LifeCycle(Permanent)) actorRef.lifeCycle.get match { case LifeCycle(scope, _) => { @@ -839,7 +1045,7 @@ sealed class ActorRef private[akka] () extends TransactionManagement { case Temporary => Actor.log.info("Actor [%s] configured as TEMPORARY and will not be restarted.", actorRef.id) actorRef.stop - linkedActors.remove(actorRef) // remove the temporary actor + linkedActors.remove(actorRef.uuid) // remove the temporary actor // if last temporary actor is gone, then unlink me from supervisor if (linkedActors.isEmpty) { Actor.log.info( @@ -854,21 +1060,24 @@ sealed class ActorRef private[akka] () extends TransactionManagement { } } - private[akka] def registerSupervisorAsRemoteActor: Option[String] = _guard.withWriteLock { + protected[akka] def registerSupervisorAsRemoteActor: Option[String] = guard.withWriteLock { if (_supervisor.isDefined) { - RemoteClient.clientFor(_remoteAddress.get).registerSupervisorForActor(this) + RemoteClient.clientFor(remoteAddress.get).registerSupervisorForActor(this) Some(_supervisor.get.uuid) } else None } - private[akka] def linkedActors: JHashSet[ActorRef] = _guard.withWriteLock { + protected[akka] def linkedActors: JMap[String, ActorRef] = guard.withWriteLock { if (_linkedActors.isEmpty) { - val set = new JHashSet[ActorRef] - _linkedActors = Some(set) - set + val actors = new ConcurrentHashMap[String, ActorRef] + _linkedActors = Some(actors) + actors } else _linkedActors.get } + protected[akka] def linkedActorsAsList: List[ActorRef] = + linkedActors.values.toArray.toList.asInstanceOf[List[ActorRef]] + private def serializeMessage(message: AnyRef): AnyRef = if (Actor.SERIALIZE_MESSAGES) { if (!message.isInstanceOf[String] && !message.isInstanceOf[Byte] && @@ -900,18 +1109,20 @@ sealed class ActorRef private[akka] () extends TransactionManagement { * * @author Jonas Bonér */ -private[akka] class RemoteActorRef private ( +private[akka] case class RemoteActorRef private[akka] ( // uuid: String, className: String, hostname: String, port: Int, timeOut: Long, isOnRemoteHost: Boolean) extends ActorRef { - uuid: String, className: String, hostname: String, port: Int, timeOut: Long) + uuuid: String, val className: String, val hostname: String, val port: Int, _timeout: Long) extends ActorRef { - + _uuid = uuuid + + start val remoteClient = RemoteClient.clientFor(hostname, port) - override def postMessageToMailbox(message: Any, senderOption: Option[ActorRef]): Unit = { + def postMessageToMailbox(message: Any, senderOption: Option[ActorRef]): Unit = { val requestBuilder = RemoteRequestProtocol.newBuilder .setId(RemoteRequestProtocolIdFactory.nextId) .setTarget(className) - .setTimeout(timeOut) + .setTimeout(timeout) .setUuid(uuid) .setIsActor(true) .setIsOneWay(true) @@ -921,7 +1132,7 @@ private[akka] class RemoteActorRef private ( remoteClient.send[Any](requestBuilder.build, None) } - override def postMessageToMailboxAndCreateFutureResultWithTimeout[T]( + def postMessageToMailboxAndCreateFutureResultWithTimeout[T]( message: Any, timeout: Long, senderFuture: Option[CompletableFuture[T]]): CompletableFuture[T] = { @@ -939,17 +1150,53 @@ private[akka] class RemoteActorRef private ( if (future.isDefined) future.get else throw new IllegalStateException("Expected a future from remote call to actor " + toString) } -} -/** - * Remote Actor proxy factory. - * - * @author Jonas Bonér - */ -private[akka] object RemoteActorRef { - // def apply(uuid: String, className: String, hostname: String, port: Int, timeout: Long, isOnRemoteHost: Boolean): ActorRef = - // (new RemoteActorRef(uuid, className, hostname, port, timeout, isOnRemoteHost)).start - def apply(uuid: String, className: String, hostname: String, port: Int, timeout: Long): ActorRef = - (new RemoteActorRef(uuid, className, hostname, port, timeout)).start -} + def timeout: Long = _timeout + def start: ActorRef = { + _isRunning = true + this + } + + def stop: Unit = { + _isRunning = false + _isShutDown = true + } + + // ==== NOT SUPPORTED ==== + def toBinary: Array[Byte] = unsupported + def actorClass: Class[_ <: Actor] = unsupported + def dispatcher_=(md: MessageDispatcher): Unit = unsupported + def dispatcher: MessageDispatcher = unsupported + def makeRemote(hostname: String, port: Int): Unit = unsupported + def makeRemote(address: InetSocketAddress): Unit = unsupported + def makeTransactionRequired: Unit = unsupported + def setReplyToAddress(address: InetSocketAddress): Unit = unsupported + def id: String = unsupported + def remoteAddress: Option[InetSocketAddress] = unsupported + def timeout_=(t: Long) = unsupported + def link(actorRef: ActorRef): Unit = unsupported + def unlink(actorRef: ActorRef): Unit = unsupported + def startLink(actorRef: ActorRef): Unit = unsupported + def startLinkRemote(actorRef: ActorRef, hostname: String, port: Int): Unit = unsupported + def spawn[T <: Actor : Manifest]: ActorRef = unsupported + def spawnRemote[T <: Actor: Manifest](hostname: String, port: Int): ActorRef = unsupported + def spawnLink[T <: Actor: Manifest]: ActorRef = unsupported + def spawnLinkRemote[T <: Actor : Manifest](hostname: String, port: Int): ActorRef = unsupported + def mailboxSize: Int = unsupported + def supervisor: Option[ActorRef] = unsupported + def shutdownLinkedActors: Unit = unsupported + protected[akka] def toProtocol: ActorRefProtocol = unsupported + protected[akka] def mailbox: Deque[MessageInvocation] = unsupported + protected[akka] def restart(reason: Throwable): Unit = unsupported + protected[akka] def handleTrapExit(dead: ActorRef, reason: Throwable): Unit = unsupported + protected[akka] def restartLinkedActors(reason: Throwable): Unit = unsupported + protected[akka] def registerSupervisorAsRemoteActor: Option[String] = unsupported + protected[akka] def linkedActors: JMap[String, ActorRef] = unsupported + protected[akka] def linkedActorsAsList: List[ActorRef] = unsupported + protected[akka] def invoke(messageHandle: MessageInvocation): Unit = unsupported + protected[akka] def remoteAddress_=(addr: Option[InetSocketAddress]): Unit = unsupported + protected[akka] def supervisor_=(sup: Option[ActorRef]): Unit = unsupported + protected[this] def actorInstance: AtomicReference[Actor] = unsupported + private def unsupported = throw new UnsupportedOperationException("Not supported for RemoteActorRef") +} diff --git a/akka-core/src/main/scala/actor/ActorRegistry.scala b/akka-core/src/main/scala/actor/ActorRegistry.scala index c72c588937..6fa40e8031 100644 --- a/akka-core/src/main/scala/actor/ActorRegistry.scala +++ b/akka-core/src/main/scala/actor/ActorRegistry.scala @@ -92,27 +92,28 @@ object ActorRegistry extends Logging { /** * Registers an actor in the ActorRegistry. */ - def register(actorId: ActorRef) = { + def register(actor: ActorRef) = { // UUID - actorsByUUID.put(actorId.uuid, actorId) + actorsByUUID.put(actor.uuid, actor) // ID - val id = actorId.id - if (id eq null) throw new IllegalStateException("Actor.id is null " + actorId) - if (actorsById.containsKey(id)) actorsById.put(id, actorId :: actorsById.get(id)) - else actorsById.put(id, actorId :: Nil) + val id = actor.id + if (id eq null) throw new IllegalStateException("Actor.id is null " + actor) + if (actorsById.containsKey(id)) actorsById.put(id, actor :: actorsById.get(id)) + else actorsById.put(id, actor :: Nil) // Class name - val className = actorId.actor.getClass.getName + val className = actor.actor.getClass.getName if (actorsByClassName.containsKey(className)) { - actorsByClassName.put(className, actorId :: actorsByClassName.get(className)) - } else actorsByClassName.put(className, actorId :: Nil) + actorsByClassName.put(className, actor :: actorsByClassName.get(className)) + } else actorsByClassName.put(className, actor :: Nil) // notify listeners - foreachListener(_ ! ActorRegistered(actorId)) + foreachListener(_ ! ActorRegistered(actor)) } /** + * FIXME: WRONG - unregisters all actors with the same id and class name, should remove the right one in each list * Unregisters an actor in the ActorRegistry. */ def unregister(actor: ActorRef) = { diff --git a/akka-core/src/main/scala/actor/Supervisor.scala b/akka-core/src/main/scala/actor/Supervisor.scala index 47d295b36f..9e2bbe8e62 100644 --- a/akka-core/src/main/scala/actor/Supervisor.scala +++ b/akka-core/src/main/scala/actor/Supervisor.scala @@ -10,11 +10,91 @@ import se.scalablesolutions.akka.util.Helpers._ import se.scalablesolutions.akka.util.Logging import se.scalablesolutions.akka.dispatch.Dispatchers import se.scalablesolutions.akka.remote.RemoteServer +import Actor._ import java.util.concurrent.ConcurrentHashMap /** - * Abstract base class for all supervisor factories. + * Factory object for creating supervisors declarative. It creates instances of the 'Supervisor' class. + * These are not actors, if you need a supervisor that is an Actor then you have to use the 'SupervisorActor' + * factory object. + *

+ * + * Here is a sample on how to use it: + *

+ *  val supervisor = Supervisor(
+ *    SupervisorConfig(
+ *      RestartStrategy(OneForOne, 3, 10, List(classOf[Exception]),
+ *      Supervise(
+ *        myFirstActor,
+ *        LifeCycle(Permanent)) ::
+ *      Supervise(
+ *        mySecondActor,
+ *        LifeCycle(Permanent)) ::
+ *      Nil))
+ * 
+ * + * You can use the declaratively created Supervisor to link and unlink child children + * dynamically using the 'link' and 'unlink' methods. + * + * @author Jonas Bonér + */ +object Supervisor { + def apply(config: SupervisorConfig): Supervisor = SupervisorFactory(config).newInstance.start +} + +/** + * Factory object for creating supervisors as Actors, it has both a declarative and programatic API. + *

+ * + * Here is a sample on how to use the programmatic API (note that the supervisor is automatically started): + *

+ * val supervisor = SupervisorActor(AllForOneStrategy(maxNrOfRetries, timeRange), Array(classOf[Throwable]))
+ * 
+ * // link and unlink child actors dynamically
+ * supervisor ! Link(child1) // starts the actor if not started yet, starts and links atomically
+ * supervisor ! Unlink(child2)
+ * supervisor ! UnlinkAndStop(child3)
+ * 
+ * + * Here is a sample on how to use the declarative API: + *
+ *  val supervisor = SupervisorActor(
+ *    SupervisorConfig(
+ *      RestartStrategy(OneForOne, 3, 10, List(classOf[Exception]),
+ *      Supervise(
+ *        myFirstActor,
+ *        LifeCycle(Permanent)) ::
+ *      Supervise(
+ *        mySecondActor,
+ *        LifeCycle(Permanent)) ::
+ *      Nil))
+ * 
+ * // link and unlink child actors dynamically
+ * supervisor ! Link(child1) // starts the actor if not started yet, starts and links atomically
+ * supervisor ! Unlink(child2)
+ * supervisor ! UnlinkAndStop(child3)
+ * 
+ * + * You can use the declaratively created Supervisor to link and unlink child children + * dynamically using the 'link' and 'unlink' methods. + * + * @author Jonas Bonér + */ +object SupervisorActor { + def apply(config: SupervisorConfig): ActorRef = { + val (handler, trapExits) = SupervisorFactory.retrieveFaultHandlerAndTrapExitsFrom(config) + newActor(() => new SupervisorActor(handler, trapExits)).start + } + + def apply(handler: FaultHandlingStrategy, trapExceptions: List[Class[_ <: Throwable]]): ActorRef = + newActor(() => new SupervisorActor(handler, trapExceptions)).start +} + +/** + * Use this factory instead of the Supervisor factory object if you want to control + * instantiation and starting of the Supervisor, if not then it is easier and better + * to use the Supervisor factory object. *

* Example usage: *

@@ -39,29 +119,37 @@ import java.util.concurrent.ConcurrentHashMap
  *
  * @author Jonas Bonér
  */
-class SupervisorFactory(val config: SupervisorConfig) extends Logging {
-  type ExceptionList = List[Class[_ <: Throwable]]
-
-  def newInstance: ActorRef = newInstanceFor(config)
-
-  def newInstanceFor(config: SupervisorConfig): ActorRef = config match {
-    case SupervisorConfig(restartStrategy, _) =>
-      val supervisor = create(restartStrategy)
-      supervisor.configure(config, this)
-      Actor.newActor(() => supervisor).start
-  }
-
-  protected def create(strategy: RestartStrategy): Supervisor = strategy match {
-    case RestartStrategy(scheme, maxNrOfRetries, timeRange, trapExceptions: ExceptionList) =>
-      scheme match {
-        case AllForOne => new Supervisor(AllForOneStrategy(maxNrOfRetries, timeRange), trapExceptions)
-        case OneForOne => new Supervisor(OneForOneStrategy(maxNrOfRetries, timeRange), trapExceptions)
-      }
-  }
-}
-
 object SupervisorFactory {
   def apply(config: SupervisorConfig) = new SupervisorFactory(config)
+  
+  private[akka] def retrieveFaultHandlerAndTrapExitsFrom(config: SupervisorConfig): 
+    Tuple2[FaultHandlingStrategy, List[Class[_ <: Throwable]]] = config match { 
+    case SupervisorConfig(RestartStrategy(scheme, maxNrOfRetries, timeRange, trapExceptions), _) =>
+      scheme match {
+        case AllForOne => (AllForOneStrategy(maxNrOfRetries, timeRange), trapExceptions)
+        case OneForOne => (OneForOneStrategy(maxNrOfRetries, timeRange), trapExceptions)
+      }
+    }
+}
+
+
+/**
+ * For internal use only. 
+ * 
+ * @author Jonas Bonér
+ */
+class SupervisorFactory private[akka] (val config: SupervisorConfig) extends Logging {
+  type ExceptionList = List[Class[_ <: Throwable]]
+
+  def newInstance: Supervisor = newInstanceFor(config)
+
+  def newInstanceFor(config: SupervisorConfig): Supervisor = {
+    val (handler, trapExits) = SupervisorFactory.retrieveFaultHandlerAndTrapExitsFrom(config)
+    val supervisor = new Supervisor(handler, trapExits)
+    supervisor.configure(config, this)
+    supervisor.start
+    supervisor
+  }
 }
 
 /**
@@ -69,42 +157,38 @@ object SupervisorFactory {
  * 

* The supervisor class is only used for the configuration system when configuring supervisor * hierarchies declaratively. Should not be used as part of the regular programming API. Instead - * wire the actors together using 'link', 'spawnLink' etc. and set the 'trapExit' flag in the - * actors that should trap error signals and trigger restart. + * wire the children together using 'link', 'spawnLink' etc. and set the 'trapExit' flag in the + * children that should trap error signals and trigger restart. *

- * See the ScalaDoc for the SupervisorFactory for an example on how to declaratively wire up actors. + * See the ScalaDoc for the SupervisorFactory for an example on how to declaratively wire up children. * * @author Jonas Bonér */ sealed class Supervisor private[akka] (handler: FaultHandlingStrategy, trapExceptions: List[Class[_ <: Throwable]]) - extends Actor with Logging with Configurator { + extends Configurator { - trapExit = trapExceptions - faultHandler = Some(handler) + private val children = new ConcurrentHashMap[String, List[ActorRef]] + private val supervisor = SupervisorActor(handler, trapExceptions) + + def uuid = supervisor.uuid + + def start: Supervisor = { + ConfiguratorRepository.registerConfigurator(this) + this + } - // FIXME should Supervisor really havea newThreadBasedDispatcher?? - self.dispatcher = Dispatchers.newThreadBasedDispatcher(this) + def shutdown: Unit = supervisor.stop - private val actors = new ConcurrentHashMap[String, List[ActorRef]] - - // Cheating, should really go through the dispatcher rather than direct access to a CHM - def getInstance[T](clazz: Class[T]): List[T] = actors.get(clazz.getName).asInstanceOf[List[T]] + def link(child: ActorRef) = supervisor ! Link(child) + + def unlink(child: ActorRef) = supervisor ! Unlink(child) + + def getInstance[T](clazz: Class[T]): List[T] = children.get(clazz.getName).asInstanceOf[List[T]] def getComponentInterfaces: List[Class[_]] = - actors.values.toArray.toList.asInstanceOf[List[List[AnyRef]]].flatten.map(_.getClass) + children.values.toArray.toList.asInstanceOf[List[List[AnyRef]]].flatten.map(_.getClass) - def isDefined(clazz: Class[_]): Boolean = actors.containsKey(clazz.getName) - - override def init: Unit = synchronized { - ConfiguratorRepository.registerConfigurator(this) - } - - override def shutdown: Unit = synchronized { self.shutdownLinkedActors } - - def receive = { - case unknown => throw new IllegalArgumentException( - "Supervisor " + toString + " does not respond to any messages. Unknown message [" + unknown + "]") - } + def isDefined(clazz: Class[_]): Boolean = children.containsKey(clazz.getName) def configure(config: SupervisorConfig, factory: SupervisorFactory) = config match { case SupervisorConfig(_, servers) => @@ -113,32 +197,62 @@ sealed class Supervisor private[akka] (handler: FaultHandlingStrategy, trapExcep case Supervise(actorRef, lifeCycle, remoteAddress) => val className = actorRef.actor.getClass.getName val currentActors = { - val list = actors.get(className) + val list = children.get(className) if (list eq null) List[ActorRef]() else list } - actors.put(className, actorRef :: currentActors) + children.put(className, actorRef :: currentActors) actorRef.actor.lifeCycle = Some(lifeCycle) - startLink(actorRef) + supervisor ! Link(actorRef) remoteAddress.foreach(address => RemoteServer.actorsFor( RemoteServer.Address(address.hostname, address.port)) .actors.put(actorRef.id, actorRef)) case supervisorConfig @ SupervisorConfig(_, _) => // recursive supervisor configuration - val supervisor = { - val instance = factory.newInstanceFor(supervisorConfig) - instance.start - instance - } - supervisor.lifeCycle = Some(LifeCycle(Permanent)) - val className = supervisor.actorClass.getName + val childSupervisor = SupervisorActor(supervisorConfig) + childSupervisor.lifeCycle = Some(LifeCycle(Permanent)) + val className = childSupervisor.uuid val currentSupervisors = { - val list = actors.get(className) + val list = children.get(className) if (list eq null) List[ActorRef]() else list } - actors.put(className, supervisor :: currentSupervisors) - link(supervisor) + children.put(className, childSupervisor :: currentSupervisors) + supervisor ! Link(childSupervisor) }) } } + +/** + * Use this class when you want to create a supervisor dynamically that should only + * manage its child children and not have any functionality by itself. + *

+ * Here is a sample on how to use it: + *

+ * val supervisor = Supervisor(AllForOneStrategy(maxNrOfRetries, timeRange), Array(classOf[Throwable]))
+ * supervisor ! Link(child1) // starts the actor if not started yet, starts and links atomically
+ * supervisor ! Unlink(child2)
+ * supervisor ! UnlinkAndStop(child3)
+ * 
+ * + * @author Jonas Bonér + */ +final class SupervisorActor private[akka] ( + handler: FaultHandlingStrategy, + trapExceptions: List[Class[_ <: Throwable]]) + extends Actor { + + trapExit = trapExceptions + faultHandler = Some(handler) + + override def shutdown: Unit = self.shutdownLinkedActors + + def receive = { + case Link(child) => startLink(child) + case Unlink(child) => unlink(child) + case UnlinkAndStop(child) => unlink(child); child.stop + case unknown => throw new IllegalArgumentException( + "Supervisor can only respond to 'Link' and 'Unlink' messages. Unknown message [" + unknown + "]") + } +} + diff --git a/akka-core/src/main/scala/config/ActiveObjectGuiceConfigurator.scala b/akka-core/src/main/scala/config/ActiveObjectGuiceConfigurator.scala index c659751a4e..4e1bd16efb 100644 --- a/akka-core/src/main/scala/config/ActiveObjectGuiceConfigurator.scala +++ b/akka-core/src/main/scala/config/ActiveObjectGuiceConfigurator.scala @@ -7,7 +7,7 @@ package se.scalablesolutions.akka.config import com.google.inject._ import se.scalablesolutions.akka.config.ScalaConfig._ -import se.scalablesolutions.akka.actor.{Supervisor, ActiveObject, Dispatcher, ActorRef} +import se.scalablesolutions.akka.actor.{Supervisor, ActiveObject, Dispatcher, ActorRef, Actor} import se.scalablesolutions.akka.remote.RemoteServer import se.scalablesolutions.akka.util.Logging @@ -24,7 +24,7 @@ import java.lang.reflect.Method */ private[akka] class ActiveObjectGuiceConfigurator extends ActiveObjectConfiguratorBase with Logging { private var injector: Injector = _ - private var supervisor: Option[ActorRef] = None + private var supervisor: Option[Supervisor] = None private var restartStrategy: RestartStrategy = _ private var components: List[Component] = _ private var supervised: List[Supervise] = Nil @@ -82,7 +82,7 @@ private[akka] class ActiveObjectGuiceConfigurator extends ActiveObjectConfigurat private def newSubclassingProxy(component: Component): DependencyBinding = { val targetClass = component.target - val actorRef = new ActorRef(() => new Dispatcher(component.transactionRequired, component.lifeCycle.callbacks)) + val actorRef = Actor.newActor(() => new Dispatcher(component.transactionRequired, component.lifeCycle.callbacks)) if (component.dispatcher.isDefined) actorRef.dispatcher = component.dispatcher.get val remoteAddress = if (component.remoteAddress.isDefined) @@ -103,7 +103,7 @@ private[akka] class ActiveObjectGuiceConfigurator extends ActiveObjectConfigurat val targetClass = component.intf.get val targetInstance = component.target.newInstance.asInstanceOf[AnyRef] // TODO: perhaps need to put in registry component.target.getConstructor(Array[Class[_]](): _*).setAccessible(true) - val actorRef = new ActorRef(() => new Dispatcher(component.transactionRequired, component.lifeCycle.callbacks)) + val actorRef = Actor.newActor(() => new Dispatcher(component.transactionRequired, component.lifeCycle.callbacks)) if (component.dispatcher.isDefined) actorRef.dispatcher = component.dispatcher.get val remoteAddress = if (component.remoteAddress.isDefined) @@ -130,7 +130,6 @@ private[akka] class ActiveObjectGuiceConfigurator extends ActiveObjectConfigurat override def supervise: ActiveObjectConfiguratorBase = synchronized { if (injector eq null) inject supervisor = Some(ActiveObject.supervise(restartStrategy, supervised)) - supervisor.get.start ConfiguratorRepository.registerConfigurator(this) this } @@ -164,7 +163,7 @@ private[akka] class ActiveObjectGuiceConfigurator extends ActiveObjectConfigurat } def stop = synchronized { - if (supervisor.isDefined) supervisor.get.stop + if (supervisor.isDefined) supervisor.get.shutdown } } \ No newline at end of file diff --git a/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala b/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala index eb3a647710..b602dbb83b 100644 --- a/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala +++ b/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala @@ -65,8 +65,8 @@ class ExecutorBasedEventDrivenDispatcher(_name: String) extends MessageDispatche var lockAcquiredOnce = false // this do-wile loop is required to prevent missing new messages between the end of the inner while // loop and releasing the lock - val lock = invocation.receiver._dispatcherLock - val mailbox = invocation.receiver._mailbox + val lock = invocation.receiver.dispatcherLock + val mailbox = invocation.receiver.mailbox do { if (lock.tryLock) { lockAcquiredOnce = true diff --git a/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala b/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala index f8bf772ccb..40372c465d 100644 --- a/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala +++ b/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala @@ -70,7 +70,7 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(_name: String) extends Mess */ private def tryProcessMailbox(receiver: ActorRef): Boolean = { var lockAcquiredOnce = false - val lock = receiver._dispatcherLock + val lock = receiver.dispatcherLock // this do-wile loop is required to prevent missing new messages between the end of processing // the mailbox and releasing the lock do { @@ -82,7 +82,7 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(_name: String) extends Mess lock.unlock } } - } while ((lockAcquiredOnce && !receiver._mailbox.isEmpty)) + } while ((lockAcquiredOnce && !receiver.mailbox.isEmpty)) return lockAcquiredOnce } @@ -91,10 +91,10 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(_name: String) extends Mess * Process the messages in the mailbox of the given actor. */ private def processMailbox(receiver: ActorRef) = { - var messageInvocation = receiver._mailbox.poll + var messageInvocation = receiver.mailbox.poll while (messageInvocation != null) { messageInvocation.invoke - messageInvocation = receiver._mailbox.poll + messageInvocation = receiver.mailbox.poll } } @@ -128,7 +128,7 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(_name: String) extends Mess val index = (i + startIndex) % actors.length val actor = actors(index) if (actor != receiver) { // skip ourselves - if (actor._mailbox.isEmpty) { // only pick actors that will most likely be able to process the messages + if (actor.mailbox.isEmpty) { // only pick actors that will most likely be able to process the messages return (Some(actor), index) } } @@ -141,11 +141,11 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(_name: String) extends Mess * the thiefs dispatching lock, because in that case another thread is already processing the thiefs mailbox. */ private def tryDonateAndProcessMessages(receiver: ActorRef, thief: ActorRef) = { - if (thief._dispatcherLock.tryLock) { + if (thief.dispatcherLock.tryLock) { try { donateAndProcessMessages(receiver, thief) } finally { - thief._dispatcherLock.unlock + thief.dispatcherLock.unlock } } } @@ -170,7 +170,7 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(_name: String) extends Mess * Steal a message from the receiver and give it to the thief. */ private def donateMessage(receiver: ActorRef, thief: ActorRef): Option[MessageInvocation] = { - val donated = receiver._mailbox.pollLast + val donated = receiver.mailbox.pollLast if (donated != null) { thief.self ! donated.message return Some(donated) diff --git a/akka-core/src/main/scala/dispatch/ThreadBasedDispatcher.scala b/akka-core/src/main/scala/dispatch/ThreadBasedDispatcher.scala index db78f41c1e..b5aaa3380c 100644 --- a/akka-core/src/main/scala/dispatch/ThreadBasedDispatcher.scala +++ b/akka-core/src/main/scala/dispatch/ThreadBasedDispatcher.scala @@ -17,7 +17,7 @@ import se.scalablesolutions.akka.actor.{Actor, ActorRef, ActorMessageInvoker} class ThreadBasedDispatcher private[akka] (val name: String, val messageHandler: MessageInvoker) extends MessageDispatcher { - def this(actor: Actor) = this(actor.getClass.getName, new ActorMessageInvoker(new ActorRef(() => actor))) + def this(actor: Actor) = this(actor.getClass.getName, new ActorMessageInvoker(Actor.newActor(() => actor))) private val queue = new BlockingMessageQueue(name) private var selectorThread: Thread = _ diff --git a/akka-core/src/main/scala/remote/Cluster.scala b/akka-core/src/main/scala/remote/Cluster.scala index a1abe9b6b9..5b918e78a3 100644 --- a/akka-core/src/main/scala/remote/Cluster.scala +++ b/akka-core/src/main/scala/remote/Cluster.scala @@ -251,7 +251,7 @@ object Cluster extends Cluster with Logging { fqn => val a = Class.forName(fqn).newInstance.asInstanceOf[ClusterActor] a setSerializer serializer - new ActorRef(() => a) + Actor.newActor(() => a) } } catch { @@ -261,15 +261,11 @@ object Cluster extends Cluster with Logging { } } - private[akka] def createSupervisor(actor: ActorRef): Option[ActorRef] = { - val sup = SupervisorFactory( + private[akka] def createSupervisor(actor: ActorRef): Option[Supervisor] = + Some(Supervisor( SupervisorConfig( RestartStrategy(OneForOne, 5, 1000, List(classOf[Exception])), - Supervise(actor, LifeCycle(Permanent)) :: Nil) - ).newInstance - Some(sup) - } - + Supervise(actor, LifeCycle(Permanent)) :: Nil))) def name = clusterActor.map(_.name).getOrElse("No cluster") @@ -303,7 +299,7 @@ object Cluster extends Cluster with Logging { log.info("Shutting down Cluster Service...") for { c <- clusterActorRef - s <- c._supervisor + s <- c.supervisor } s.stop clusterActor = None } diff --git a/akka-core/src/main/scala/remote/RemoteServer.scala b/akka-core/src/main/scala/remote/RemoteServer.scala index 7fa21fccf6..5dfde04747 100644 --- a/akka-core/src/main/scala/remote/RemoteServer.scala +++ b/akka-core/src/main/scala/remote/RemoteServer.scala @@ -226,7 +226,7 @@ class RemoteServer extends Logging { log.info("Unregistering server side remote actor [%s] with id [%s]", actorRef.actorClass.getName, actorRef.id) val server = RemoteServer.actorsFor(RemoteServer.Address(hostname, port)) server.actors.remove(actorRef.id) - if (actorRef._registeredInRemoteNodeDuringSerialization) server.actors.remove(actorRef.uuid) + if (actorRef.registeredInRemoteNodeDuringSerialization) server.actors.remove(actorRef.uuid) } } @@ -241,7 +241,7 @@ class RemoteServer extends Logging { val server = RemoteServer.actorsFor(RemoteServer.Address(hostname, port)) val actorRef = server.actors.get(id) server.actors.remove(id) - if (actorRef._registeredInRemoteNodeDuringSerialization) server.actors.remove(actorRef.uuid) + if (actorRef.registeredInRemoteNodeDuringSerialization) server.actors.remove(actorRef.uuid) } } } @@ -464,10 +464,10 @@ class RemoteServerHandler( log.info("Creating a new remote actor [%s:%s]", name, uuid) val clazz = if (applicationLoader.isDefined) applicationLoader.get.loadClass(name) else Class.forName(name) - val actorRef = new ActorRef(() => clazz.newInstance.asInstanceOf[Actor]) - actorRef._uuid = uuid + val actorRef = Actor.newActor(() => clazz.newInstance.asInstanceOf[Actor]) + actorRef.uuid = uuid actorRef.timeout = timeout - actorRef._remoteAddress = None + actorRef.remoteAddress = None actors.put(uuid, actorRef) actorRef } catch { diff --git a/akka-core/src/main/scala/util/Helpers.scala b/akka-core/src/main/scala/util/Helpers.scala index 55abf6e7ac..4835a4dd05 100644 --- a/akka-core/src/main/scala/util/Helpers.scala +++ b/akka-core/src/main/scala/util/Helpers.scala @@ -5,7 +5,6 @@ package se.scalablesolutions.akka.util import java.security.MessageDigest -import java.util.concurrent.locks.ReentrantReadWriteLock class SystemFailure(cause: Throwable) extends RuntimeException(cause) @@ -38,30 +37,5 @@ object Helpers extends Logging { }) sb.toString } - - // ================================================ - class ReadWriteLock { - private val rwl = new ReentrantReadWriteLock - private val readLock = rwl.readLock - private val writeLock = rwl.writeLock - - def withWriteLock[T](body: => T): T = { - writeLock.lock - try { - body - } finally { - writeLock.unlock - } - } - - def withReadLock[T](body: => T): T = { - readLock.lock - try { - body - } finally { - readLock.unlock - } - } - } } diff --git a/akka-core/src/main/scala/util/ReadWriteLock.scala b/akka-core/src/main/scala/util/ReadWriteLock.scala new file mode 100644 index 0000000000..9dd7a27b07 --- /dev/null +++ b/akka-core/src/main/scala/util/ReadWriteLock.scala @@ -0,0 +1,35 @@ +/** + * Copyright (C) 2009-2010 Scalable Solutions AB + */ + +package se.scalablesolutions.akka.util + +import java.util.concurrent.locks.ReentrantReadWriteLock + +/** + * @author Jonas Bonér + */ +class ReadWriteLock { + private val rwl = new ReentrantReadWriteLock + private val readLock = rwl.readLock + private val writeLock = rwl.writeLock + + def withWriteLock[T](body: => T): T = { + writeLock.lock + try { + body + } finally { + writeLock.unlock + } + } + + def withReadLock[T](body: => T): T = { + readLock.lock + try { + body + } finally { + readLock.unlock + } + } +} + diff --git a/akka-core/src/test/scala/ActorFireForgetRequestReplySpec.scala b/akka-core/src/test/scala/ActorFireForgetRequestReplySpec.scala index 50e655d97d..ed6517de8b 100644 --- a/akka-core/src/test/scala/ActorFireForgetRequestReplySpec.scala +++ b/akka-core/src/test/scala/ActorFireForgetRequestReplySpec.scala @@ -13,7 +13,7 @@ object ActorFireForgetRequestReplySpec { def receive = { case "Send" => reply("Reply") - case "SendImplicit" => replyTo.get.left.get ! "ReplyImplicit" + case "SendImplicit" => self.replyTo.get.left.get ! "ReplyImplicit" } } diff --git a/akka-core/src/test/scala/ActorRegistrySpec.scala b/akka-core/src/test/scala/ActorRegistrySpec.scala index 024a111525..8027c99655 100644 --- a/akka-core/src/test/scala/ActorRegistrySpec.scala +++ b/akka-core/src/test/scala/ActorRegistrySpec.scala @@ -19,18 +19,18 @@ object ActorRegistrySpec { class ActorRegistrySpec extends JUnitSuite { import ActorRegistrySpec._ - @Test def shouldGetActorByIdFromActorRegistry = { + @Test def shouldGetActorByIdFromActorRegistry { ActorRegistry.shutdownAll val actor = newActor[TestActor] actor.start val actors = ActorRegistry.actorsFor("MyID") assert(actors.size === 1) assert(actors.head.actor.isInstanceOf[TestActor]) - assert(actors.head.actor.asInstanceOf[TestActor].getId === "MyID") + assert(actors.head.actor.asInstanceOf[TestActor].id === "MyID") actor.stop } - @Test def shouldGetActorByUUIDFromActorRegistry = { + @Test def shouldGetActorByUUIDFromActorRegistry { ActorRegistry.shutdownAll val actor = newActor[TestActor] val uuid = actor.uuid @@ -41,29 +41,29 @@ class ActorRegistrySpec extends JUnitSuite { actor.stop } - @Test def shouldGetActorByClassFromActorRegistry = { + @Test def shouldGetActorByClassFromActorRegistry { ActorRegistry.shutdownAll val actor = newActor[TestActor] actor.start val actors = ActorRegistry.actorsFor(classOf[TestActor]) assert(actors.size === 1) assert(actors.head.actor.isInstanceOf[TestActor]) - assert(actors.head.actor.asInstanceOf[TestActor].getId === "MyID") + assert(actors.head.actor.asInstanceOf[TestActor].id === "MyID") actor.stop } - @Test def shouldGetActorByManifestFromActorRegistry = { + @Test def shouldGetActorByManifestFromActorRegistry { ActorRegistry.shutdownAll val actor = newActor[TestActor] actor.start val actors = ActorRegistry.actorsFor[TestActor] assert(actors.size === 1) assert(actors.head.actor.isInstanceOf[TestActor]) - assert(actors.head.actor.asInstanceOf[TestActor].getId === "MyID") + assert(actors.head.actor.asInstanceOf[TestActor].id === "MyID") actor.stop } - @Test def shouldGetActorsByIdFromActorRegistry = { + @Test def shouldGetActorsByIdFromActorRegistry { ActorRegistry.shutdownAll val actor1 = newActor[TestActor] actor1.start @@ -72,14 +72,14 @@ class ActorRegistrySpec extends JUnitSuite { val actors = ActorRegistry.actorsFor("MyID") assert(actors.size === 2) assert(actors.head.actor.isInstanceOf[TestActor]) - assert(actors.head.actor.asInstanceOf[TestActor].getId === "MyID") + assert(actors.head.actor.asInstanceOf[TestActor].id === "MyID") assert(actors.last.actor.isInstanceOf[TestActor]) - assert(actors.last.actor.asInstanceOf[TestActor].getId === "MyID") + assert(actors.last.actor.asInstanceOf[TestActor].id === "MyID") actor1.stop actor2.stop } - @Test def shouldGetActorsByClassFromActorRegistry = { + @Test def shouldGetActorsByClassFromActorRegistry { ActorRegistry.shutdownAll val actor1 = newActor[TestActor] actor1.start @@ -88,14 +88,14 @@ class ActorRegistrySpec extends JUnitSuite { val actors = ActorRegistry.actorsFor(classOf[TestActor]) assert(actors.size === 2) assert(actors.head.actor.isInstanceOf[TestActor]) - assert(actors.head.actor.asInstanceOf[TestActor].getId === "MyID") + assert(actors.head.actor.asInstanceOf[TestActor].id === "MyID") assert(actors.last.actor.isInstanceOf[TestActor]) - assert(actors.last.actor.asInstanceOf[TestActor].getId === "MyID") + assert(actors.last.actor.asInstanceOf[TestActor].id === "MyID") actor1.stop actor2.stop } - @Test def shouldGetActorsByManifestFromActorRegistry = { + @Test def shouldGetActorsByManifestFromActorRegistry { ActorRegistry.shutdownAll val actor1 = newActor[TestActor] actor1.start @@ -104,14 +104,14 @@ class ActorRegistrySpec extends JUnitSuite { val actors = ActorRegistry.actorsFor[TestActor] assert(actors.size === 2) assert(actors.head.actor.isInstanceOf[TestActor]) - assert(actors.head.actor.asInstanceOf[TestActor].getId === "MyID") + assert(actors.head.actor.asInstanceOf[TestActor].id === "MyID") assert(actors.last.actor.isInstanceOf[TestActor]) - assert(actors.last.actor.asInstanceOf[TestActor].getId === "MyID") + assert(actors.last.actor.asInstanceOf[TestActor].id === "MyID") actor1.stop actor2.stop } - @Test def shouldGetAllActorsFromActorRegistry = { + @Test def shouldGetAllActorsFromActorRegistry { ActorRegistry.shutdownAll val actor1 = newActor[TestActor] actor1.start @@ -120,14 +120,14 @@ class ActorRegistrySpec extends JUnitSuite { val actors = ActorRegistry.actors assert(actors.size === 2) assert(actors.head.actor.isInstanceOf[TestActor]) - assert(actors.head.actor.asInstanceOf[TestActor].getId === "MyID") + assert(actors.head.actor.asInstanceOf[TestActor].id === "MyID") assert(actors.last.actor.isInstanceOf[TestActor]) - assert(actors.last.actor.asInstanceOf[TestActor].getId === "MyID") + assert(actors.last.actor.asInstanceOf[TestActor].id === "MyID") actor1.stop actor2.stop } - @Test def shouldGetResponseByAllActorsInActorRegistryWhenInvokingForeach = { + @Test def shouldGetResponseByAllActorsInActorRegistryWhenInvokingForeach { ActorRegistry.shutdownAll val actor1 = newActor[TestActor] actor1.start @@ -140,7 +140,7 @@ class ActorRegistrySpec extends JUnitSuite { actor2.stop } - @Test def shouldShutdownAllActorsInActorRegistry = { + @Test def shouldShutdownAllActorsInActorRegistry { ActorRegistry.shutdownAll val actor1 = newActor[TestActor] actor1.start @@ -150,7 +150,7 @@ class ActorRegistrySpec extends JUnitSuite { assert(ActorRegistry.actors.size === 0) } - @Test def shouldRemoveUnregisterActorInActorRegistry = { + @Test def shouldRemoveUnregisterActorInActorRegistry { ActorRegistry.shutdownAll val actor1 = newActor[TestActor] actor1.start diff --git a/akka-core/src/test/scala/ClientInitiatedRemoteActorSpec.scala b/akka-core/src/test/scala/ClientInitiatedRemoteActorSpec.scala index 9b8b5e03e2..8dcf19ce71 100644 --- a/akka-core/src/test/scala/ClientInitiatedRemoteActorSpec.scala +++ b/akka-core/src/test/scala/ClientInitiatedRemoteActorSpec.scala @@ -89,7 +89,7 @@ class ClientInitiatedRemoteActorSpec extends JUnitSuite { } @Test - def shouldSendOneWay = { + def shouldSendOneWay { val actor = newActor[RemoteActorSpecActorUnidirectional] actor.makeRemote(HOSTNAME, PORT1) actor.start @@ -99,7 +99,7 @@ class ClientInitiatedRemoteActorSpec extends JUnitSuite { } @Test - def shouldSendOneWayAndReceiveReply = { + def shouldSendOneWayAndReceiveReply { val actor = newActor[SendOneWayAndReplyReceiverActor] actor.makeRemote(HOSTNAME, PORT1) actor.start @@ -116,7 +116,7 @@ class ClientInitiatedRemoteActorSpec extends JUnitSuite { } @Test - def shouldSendBangBangMessageAndReceiveReply = { + def shouldSendBangBangMessageAndReceiveReply { val actor = newActor[RemoteActorSpecActorBidirectional] actor.makeRemote(HOSTNAME, PORT1) actor.start @@ -126,7 +126,7 @@ class ClientInitiatedRemoteActorSpec extends JUnitSuite { } @Test - def shouldSendAndReceiveRemoteException = { + def shouldSendAndReceiveRemoteException { implicit val timeout = 500000000L val actor = newActor[RemoteActorSpecActorBidirectional] actor.makeRemote(HOSTNAME, PORT1) diff --git a/akka-core/src/test/scala/ExecutorBasedEventDrivenDispatcherActorSpec.scala b/akka-core/src/test/scala/ExecutorBasedEventDrivenDispatcherActorSpec.scala index 957bad5fe3..e03427cc94 100644 --- a/akka-core/src/test/scala/ExecutorBasedEventDrivenDispatcherActorSpec.scala +++ b/akka-core/src/test/scala/ExecutorBasedEventDrivenDispatcherActorSpec.scala @@ -32,7 +32,7 @@ class ExecutorBasedEventDrivenDispatcherActorSpec extends JUnitSuite { private val unit = TimeUnit.MILLISECONDS - @Test def shouldSendOneWay = { + @Test def shouldSendOneWay { val actor = newActor[OneWayTestActor] actor.start val result = actor ! "OneWay" @@ -40,7 +40,7 @@ class ExecutorBasedEventDrivenDispatcherActorSpec extends JUnitSuite { actor.stop } - @Test def shouldSendReplySync = { + @Test def shouldSendReplySync { val actor = newActor[TestActor] actor.start val result: String = (actor !! ("Hello", 10000)).get @@ -48,7 +48,7 @@ class ExecutorBasedEventDrivenDispatcherActorSpec extends JUnitSuite { actor.stop } - @Test def shouldSendReplyAsync = { + @Test def shouldSendReplyAsync { val actor = newActor[TestActor] actor.start val result = actor !! "Hello" @@ -56,7 +56,7 @@ class ExecutorBasedEventDrivenDispatcherActorSpec extends JUnitSuite { actor.stop } - @Test def shouldSendReceiveException = { + @Test def shouldSendReceiveException { val actor = newActor[TestActor] actor.start try { diff --git a/akka-core/src/test/scala/ExecutorBasedEventDrivenDispatcherActorsSpec.scala b/akka-core/src/test/scala/ExecutorBasedEventDrivenDispatcherActorsSpec.scala index 91ba57d5c7..a019bee3b2 100644 --- a/akka-core/src/test/scala/ExecutorBasedEventDrivenDispatcherActorsSpec.scala +++ b/akka-core/src/test/scala/ExecutorBasedEventDrivenDispatcherActorsSpec.scala @@ -14,7 +14,7 @@ import Actor._ */ class ExecutorBasedEventDrivenDispatcherActorsSpec extends JUnitSuite with MustMatchers { class SlowActor(finishedCounter: CountDownLatch) extends Actor { - messageDispatcher = Dispatchers.globalExecutorBasedEventDrivenDispatcher + dispatcher = Dispatchers.globalExecutorBasedEventDrivenDispatcher id = "SlowActor" def receive = { @@ -26,7 +26,7 @@ class ExecutorBasedEventDrivenDispatcherActorsSpec extends JUnitSuite with MustM } class FastActor(finishedCounter: CountDownLatch) extends Actor { - messageDispatcher = Dispatchers.globalExecutorBasedEventDrivenDispatcher + dispatcher = Dispatchers.globalExecutorBasedEventDrivenDispatcher id = "FastActor" def receive = { @@ -36,7 +36,7 @@ class ExecutorBasedEventDrivenDispatcherActorsSpec extends JUnitSuite with MustM } } - @Test def slowActorShouldntBlockFastActor = { + @Test def slowActorShouldntBlockFastActor { val sFinished = new CountDownLatch(50) val fFinished = new CountDownLatch(10) val s = newActor(() => new SlowActor(sFinished)).start diff --git a/akka-core/src/test/scala/ExecutorBasedEventDrivenWorkStealingDispatcherSpec.scala b/akka-core/src/test/scala/ExecutorBasedEventDrivenWorkStealingDispatcherSpec.scala index 6ab034d8ec..e8ef3a35ff 100644 --- a/akka-core/src/test/scala/ExecutorBasedEventDrivenWorkStealingDispatcherSpec.scala +++ b/akka-core/src/test/scala/ExecutorBasedEventDrivenWorkStealingDispatcherSpec.scala @@ -16,7 +16,7 @@ object ExecutorBasedEventDrivenWorkStealingDispatcherSpec { val parentActorDispatcher = Dispatchers.newExecutorBasedEventDrivenWorkStealingDispatcher("pooled-dispatcher") class DelayableActor(name: String, delay: Int, finishedCounter: CountDownLatch) extends Actor { - messageDispatcher = delayableActorDispatcher + dispatcher = delayableActorDispatcher var invocationCount = 0 id = name @@ -30,17 +30,17 @@ object ExecutorBasedEventDrivenWorkStealingDispatcherSpec { } class FirstActor extends Actor { - messageDispatcher = sharedActorDispatcher + dispatcher = sharedActorDispatcher def receive = {case _ => {}} } class SecondActor extends Actor { - messageDispatcher = sharedActorDispatcher + dispatcher = sharedActorDispatcher def receive = {case _ => {}} } class ParentActor extends Actor { - messageDispatcher = parentActorDispatcher + dispatcher = parentActorDispatcher def receive = {case _ => {}} } @@ -54,7 +54,7 @@ object ExecutorBasedEventDrivenWorkStealingDispatcherSpec { class ExecutorBasedEventDrivenWorkStealingDispatcherSpec extends JUnitSuite with MustMatchers { import ExecutorBasedEventDrivenWorkStealingDispatcherSpec._ - @Test def fastActorShouldStealWorkFromSlowActor = { + @Test def fastActorShouldStealWorkFromSlowActor { val finishedCounter = new CountDownLatch(110) val slow = newActor(() => new DelayableActor("slow", 50, finishedCounter)).start @@ -78,7 +78,8 @@ class ExecutorBasedEventDrivenWorkStealingDispatcherSpec extends JUnitSuite with } finishedCounter.await(5, TimeUnit.SECONDS) - fast.actor.asInstanceOf[DelayableActor].invocationCount must be > (slow.actor.asInstanceOf[DelayableActor].invocationCount) + fast.actor.asInstanceOf[DelayableActor].invocationCount must be > + (slow.actor.asInstanceOf[DelayableActor].invocationCount) slow.stop fast.stop } @@ -94,8 +95,8 @@ class ExecutorBasedEventDrivenWorkStealingDispatcherSpec extends JUnitSuite with } @Test def canNotUseActorsOfDifferentSubTypesInSameDispatcher: Unit = { - val parent = new ParentActor - val child = new ChildActor + val parent = newActor[ParentActor] + val child = newActor[ChildActor] parent.start intercept[IllegalStateException] { diff --git a/akka-core/src/test/scala/ForwardActorSpec.scala b/akka-core/src/test/scala/ForwardActorSpec.scala index 05d06df32b..7722eccc99 100644 --- a/akka-core/src/test/scala/ForwardActorSpec.scala +++ b/akka-core/src/test/scala/ForwardActorSpec.scala @@ -15,7 +15,7 @@ object ForwardActorSpec { val latch = new CountDownLatch(1) def receive = { case "SendBang" => { - ForwardState.sender = Some(replyTo.get.left.get) + ForwardState.sender = Some(self.replyTo.get.left.get) latch.countDown } case "SendBangBang" => reply("SendBangBang") diff --git a/akka-core/src/test/scala/FutureSpec.scala b/akka-core/src/test/scala/FutureSpec.scala index 4f9da6572f..957e845750 100644 --- a/akka-core/src/test/scala/FutureSpec.scala +++ b/akka-core/src/test/scala/FutureSpec.scala @@ -41,7 +41,7 @@ class FutureSpec extends JUnitSuite { } /* - @Test def shouldFutureAwaitEitherLeft = { + @Test def shouldFutureAwaitEitherLeft { val actor1 = newActor[TestActor] actor1.start val actor2 = newActor[TestActor] @@ -55,7 +55,7 @@ class FutureSpec extends JUnitSuite { actor2.stop } - @Test def shouldFutureAwaitEitherRight = { + @Test def shouldFutureAwaitEitherRight { val actor1 = newActor[TestActor] actor1.start val actor2 = newActor[TestActor] @@ -69,7 +69,7 @@ class FutureSpec extends JUnitSuite { actor2.stop } */ - @Test def shouldFutureAwaitOneLeft = { + @Test def shouldFutureAwaitOneLeft { val actor1 = newActor[TestActor] actor1.start val actor2 = newActor[TestActor] @@ -83,7 +83,7 @@ class FutureSpec extends JUnitSuite { actor2.stop } - @Test def shouldFutureAwaitOneRight = { + @Test def shouldFutureAwaitOneRight { val actor1 = newActor[TestActor] actor1.start val actor2 = newActor[TestActor] @@ -97,7 +97,7 @@ class FutureSpec extends JUnitSuite { actor2.stop } - @Test def shouldFutureAwaitAll = { + @Test def shouldFutureAwaitAll { val actor1 = newActor[TestActor] actor1.start val actor2 = newActor[TestActor] diff --git a/akka-core/src/test/scala/ProtobufActorMessageSerializationSpec.scala b/akka-core/src/test/scala/ProtobufActorMessageSerializationSpec.scala index 1af37c7a26..b7ab96b0d3 100644 --- a/akka-core/src/test/scala/ProtobufActorMessageSerializationSpec.scala +++ b/akka-core/src/test/scala/ProtobufActorMessageSerializationSpec.scala @@ -27,7 +27,6 @@ object ProtobufActorMessageSerializationSpec { var server: RemoteServer = null class RemoteActorSpecActorBidirectional extends Actor { - start def receive = { case pojo: ProtobufPOJO => val id = pojo.getId @@ -45,7 +44,7 @@ class ProtobufActorMessageSerializationSpec extends JUnitSuite { def init() { server = new RemoteServer server.start(HOSTNAME, PORT) - server.register("RemoteActorSpecActorBidirectional", newActor[RemoteActorSpecActorBidirectional]) + server.register("RemoteActorSpecActorBidirectional", newActor[RemoteActorSpecActorBidirectional].start) Thread.sleep(1000) } @@ -58,7 +57,7 @@ class ProtobufActorMessageSerializationSpec extends JUnitSuite { } @Test - def shouldSendReplyAsync = { + def shouldSendReplyAsync { val actor = RemoteClient.actorFor("RemoteActorSpecActorBidirectional", 5000L, HOSTNAME, PORT) val result = actor !! ProtobufPOJO.newBuilder .setId(11) diff --git a/akka-core/src/test/scala/ReactorBasedSingleThreadEventDrivenDispatcherActorSpec.scala b/akka-core/src/test/scala/ReactorBasedSingleThreadEventDrivenDispatcherActorSpec.scala index 1c947182e2..acae2ef607 100644 --- a/akka-core/src/test/scala/ReactorBasedSingleThreadEventDrivenDispatcherActorSpec.scala +++ b/akka-core/src/test/scala/ReactorBasedSingleThreadEventDrivenDispatcherActorSpec.scala @@ -35,7 +35,7 @@ class ReactorBasedSingleThreadEventDrivenDispatcherActorSpec extends JUnitSuite private val unit = TimeUnit.MILLISECONDS - @Test def shouldSendOneWay = { + @Test def shouldSendOneWay { val actor = newActor[OneWayTestActor] actor.start val result = actor ! "OneWay" @@ -43,7 +43,7 @@ class ReactorBasedSingleThreadEventDrivenDispatcherActorSpec extends JUnitSuite actor.stop } - @Test def shouldSendReplySync = { + @Test def shouldSendReplySync { val actor = newActor[TestActor] actor.start val result: String = (actor !! ("Hello", 10000)).get @@ -51,7 +51,7 @@ class ReactorBasedSingleThreadEventDrivenDispatcherActorSpec extends JUnitSuite actor.stop } - @Test def shouldSendReplyAsync = { + @Test def shouldSendReplyAsync { val actor = newActor[TestActor] actor.start val result = actor !! "Hello" @@ -59,7 +59,7 @@ class ReactorBasedSingleThreadEventDrivenDispatcherActorSpec extends JUnitSuite actor.stop } - @Test def shouldSendReceiveException = { + @Test def shouldSendReceiveException { val actor = newActor[TestActor] actor.start try { diff --git a/akka-core/src/test/scala/ReactorBasedThreadPoolEventDrivenDispatcherActorSpec.scala b/akka-core/src/test/scala/ReactorBasedThreadPoolEventDrivenDispatcherActorSpec.scala index 4e0b55f91b..322976cc0f 100644 --- a/akka-core/src/test/scala/ReactorBasedThreadPoolEventDrivenDispatcherActorSpec.scala +++ b/akka-core/src/test/scala/ReactorBasedThreadPoolEventDrivenDispatcherActorSpec.scala @@ -24,7 +24,7 @@ class ReactorBasedThreadPoolEventDrivenDispatcherActorSpec extends JUnitSuite { private val unit = TimeUnit.MILLISECONDS - @Test def shouldSendOneWay = { + @Test def shouldSendOneWay { val oneWay = new CountDownLatch(1) val actor = newActor(() => new Actor { dispatcher = Dispatchers.newReactorBasedThreadPoolEventDrivenDispatcher(uuid) @@ -38,7 +38,7 @@ class ReactorBasedThreadPoolEventDrivenDispatcherActorSpec extends JUnitSuite { actor.stop } - @Test def shouldSendReplySync = { + @Test def shouldSendReplySync { val actor = newActor[TestActor] actor.start val result: String = (actor !! ("Hello", 10000)).get @@ -46,7 +46,7 @@ class ReactorBasedThreadPoolEventDrivenDispatcherActorSpec extends JUnitSuite { actor.stop } - @Test def shouldSendReplyAsync = { + @Test def shouldSendReplyAsync { val actor = newActor[TestActor] actor.start val result = actor !! "Hello" @@ -54,7 +54,7 @@ class ReactorBasedThreadPoolEventDrivenDispatcherActorSpec extends JUnitSuite { actor.stop } - @Test def shouldSendReceiveException = { + @Test def shouldSendReceiveException { val actor = newActor[TestActor] actor.start try { diff --git a/akka-core/src/test/scala/RemoteSupervisorSpec.scala b/akka-core/src/test/scala/RemoteSupervisorSpec.scala index e3f06bd555..e01ac11165 100644 --- a/akka-core/src/test/scala/RemoteSupervisorSpec.scala +++ b/akka-core/src/test/scala/RemoteSupervisorSpec.scala @@ -34,7 +34,7 @@ object Log { throw new RuntimeException("DIE") } - override protected def postRestart(reason: Throwable) { + override def postRestart(reason: Throwable) { Log.messageLog.put(reason.getMessage) } } @@ -49,7 +49,7 @@ object Log { throw new RuntimeException("DIE") } - override protected def postRestart(reason: Throwable) { + override def postRestart(reason: Throwable) { Log.messageLog.put(reason.getMessage) } } @@ -64,7 +64,7 @@ object Log { throw new RuntimeException("DIE") } - override protected def postRestart(reason: Throwable) { + override def postRestart(reason: Throwable) { Log.messageLog.put(reason.getMessage) } } @@ -337,6 +337,7 @@ class RemoteSupervisorSpec extends JUnitSuite { pingpong1 = newActor[RemotePingPong1Actor] pingpong1.makeRemote(RemoteServer.HOSTNAME, 9988) + pingpong1.start val factory = SupervisorFactory( SupervisorConfig( @@ -352,6 +353,7 @@ class RemoteSupervisorSpec extends JUnitSuite { def getSingleActorOneForOneSupervisor: Supervisor = { pingpong1 = newActor[RemotePingPong1Actor] pingpong1.makeRemote(RemoteServer.HOSTNAME, 9988) + pingpong1.start val factory = SupervisorFactory( SupervisorConfig( @@ -366,10 +368,13 @@ class RemoteSupervisorSpec extends JUnitSuite { def getMultipleActorsAllForOneConf: Supervisor = { pingpong1 = newActor[RemotePingPong1Actor] pingpong1.makeRemote(RemoteServer.HOSTNAME, 9988) + pingpong1.start pingpong2 = newActor[RemotePingPong2Actor] pingpong2.makeRemote(RemoteServer.HOSTNAME, 9988) + pingpong2.start pingpong3 = newActor[RemotePingPong3Actor] pingpong3.makeRemote(RemoteServer.HOSTNAME, 9988) + pingpong3.start val factory = SupervisorFactory( SupervisorConfig( @@ -392,10 +397,13 @@ class RemoteSupervisorSpec extends JUnitSuite { def getMultipleActorsOneForOneConf: Supervisor = { pingpong1 = newActor[RemotePingPong1Actor] pingpong1.makeRemote(RemoteServer.HOSTNAME, 9988) + pingpong1.start pingpong2 = newActor[RemotePingPong2Actor] pingpong2.makeRemote(RemoteServer.HOSTNAME, 9988) + pingpong2.start pingpong3 = newActor[RemotePingPong3Actor] pingpong3.makeRemote(RemoteServer.HOSTNAME, 9988) + pingpong3.start val factory = SupervisorFactory( SupervisorConfig( @@ -416,11 +424,11 @@ class RemoteSupervisorSpec extends JUnitSuite { } def getNestedSupervisorsAllForOneConf: Supervisor = { - pingpong1 = newActor[RemotePingPong1Actor] + pingpong1 = newActor[RemotePingPong1Actor].start pingpong1.makeRemote(RemoteServer.HOSTNAME, 9988) - pingpong2 = newActor[RemotePingPong2Actor] + pingpong2 = newActor[RemotePingPong2Actor].start pingpong2.makeRemote(RemoteServer.HOSTNAME, 9988) - pingpong3 = newActor[RemotePingPong3Actor] + pingpong3 = newActor[RemotePingPong3Actor].start pingpong3.makeRemote(RemoteServer.HOSTNAME, 9988) val factory = SupervisorFactory( diff --git a/akka-core/src/test/scala/ServerInitiatedRemoteActorSpec.scala b/akka-core/src/test/scala/ServerInitiatedRemoteActorSpec.scala index 51d1e0876f..28262d6fc7 100644 --- a/akka-core/src/test/scala/ServerInitiatedRemoteActorSpec.scala +++ b/akka-core/src/test/scala/ServerInitiatedRemoteActorSpec.scala @@ -83,7 +83,7 @@ class ServerInitiatedRemoteActorSpec extends JUnitSuite { } @Test - def shouldSendOneWay = { + def shouldSendOneWay { val actor = RemoteClient.actorFor( "se.scalablesolutions.akka.actor.ServerInitiatedRemoteActorSpec$RemoteActorSpecActorUnidirectional", 5000L, @@ -94,7 +94,7 @@ class ServerInitiatedRemoteActorSpec extends JUnitSuite { } @Test - def shouldSendReplyAsync = { + def shouldSendReplyAsync { val actor = RemoteClient.actorFor( "se.scalablesolutions.akka.actor.ServerInitiatedRemoteActorSpec$RemoteActorSpecActorBidirectional", 5000L, @@ -105,13 +105,13 @@ class ServerInitiatedRemoteActorSpec extends JUnitSuite { } @Test - def shouldSendRemoteReplyProtocol = { + def shouldSendRemoteReplyProtocol { implicit val timeout = 500000000L val actor = RemoteClient.actorFor( "se.scalablesolutions.akka.actor.ServerInitiatedRemoteActorSpec$RemoteActorSpecActorBidirectional", timeout, HOSTNAME, PORT) - val sender = new RemoteActorSpecActorAsyncSender + val sender = newActor[RemoteActorSpecActorAsyncSender] sender.setReplyToAddress(HOSTNAME, PORT) sender.start sender.send(actor) @@ -120,7 +120,7 @@ class ServerInitiatedRemoteActorSpec extends JUnitSuite { } @Test - def shouldSendReceiveException = { + def shouldSendReceiveException { implicit val timeout = 500000000L val actor = RemoteClient.actorFor( "se.scalablesolutions.akka.actor.ServerInitiatedRemoteActorSpec$RemoteActorSpecActorBidirectional", diff --git a/akka-core/src/test/scala/SupervisorSpec.scala b/akka-core/src/test/scala/SupervisorSpec.scala index b2c634cb85..90781d6f45 100644 --- a/akka-core/src/test/scala/SupervisorSpec.scala +++ b/akka-core/src/test/scala/SupervisorSpec.scala @@ -30,7 +30,7 @@ object SupervisorSpec { case Die => throw new RuntimeException("DIE") } - override protected def postRestart(reason: Throwable) { + override def postRestart(reason: Throwable) { messageLog.put(reason.getMessage) } } @@ -43,7 +43,7 @@ object SupervisorSpec { case Die => throw new RuntimeException("DIE") } - override protected def postRestart(reason: Throwable) { + override def postRestart(reason: Throwable) { messageLog.put(reason.getMessage) } } @@ -57,7 +57,7 @@ object SupervisorSpec { throw new RuntimeException("DIE") } - override protected def postRestart(reason: Throwable) { + override def postRestart(reason: Throwable) { messageLog.put(reason.getMessage) } } @@ -419,7 +419,7 @@ class SupervisorSpec extends JUnitSuite { // Creat some supervisors with different configurations def getSingleActorAllForOneSupervisor: Supervisor = { - pingpong1 = newActor[PingPong1Actor] + pingpong1 = newActor[PingPong1Actor].start val factory = SupervisorFactory( SupervisorConfig( @@ -432,53 +432,75 @@ class SupervisorSpec extends JUnitSuite { } def getSingleActorOneForOneSupervisor: Supervisor = { - pingpong1 = newActor[PingPong1Actor] + pingpong1 = newActor[PingPong1Actor].start - val factory = SupervisorFactory( - SupervisorConfig( - RestartStrategy(OneForOne, 3, 100, List(classOf[Exception])), - Supervise( - pingpong1, - LifeCycle(Permanent)) - :: Nil)) - factory.newInstance + Supervisor( + SupervisorConfig( + RestartStrategy(OneForOne, 3, 100, List(classOf[Exception])), + Supervise( + pingpong1, + LifeCycle(Permanent)) + :: Nil)) } def getMultipleActorsAllForOneConf: Supervisor = { - pingpong1 = newActor[PingPong1Actor] - pingpong2 = newActor[PingPong2Actor] - pingpong3 = newActor[PingPong3Actor] + pingpong1 = newActor[PingPong1Actor].start + pingpong2 = newActor[PingPong2Actor].start + pingpong3 = newActor[PingPong3Actor].start - val factory = SupervisorFactory( - SupervisorConfig( - RestartStrategy(AllForOne, 3, 100, List(classOf[Exception])), - Supervise( - pingpong1, - LifeCycle(Permanent)) - :: - Supervise( - pingpong2, - LifeCycle(Permanent)) - :: - Supervise( - pingpong3, - LifeCycle(Permanent)) - :: Nil)) - factory.newInstance + Supervisor( + SupervisorConfig( + RestartStrategy(AllForOne, 3, 100, List(classOf[Exception])), + Supervise( + pingpong1, + LifeCycle(Permanent)) + :: + Supervise( + pingpong2, + LifeCycle(Permanent)) + :: + Supervise( + pingpong3, + LifeCycle(Permanent)) + :: Nil)) } def getMultipleActorsOneForOneConf: Supervisor = { - pingpong1 = newActor[PingPong1Actor] - pingpong2 = newActor[PingPong2Actor] - pingpong3 = newActor[PingPong3Actor] + pingpong1 = newActor[PingPong1Actor].start + pingpong2 = newActor[PingPong2Actor].start + pingpong3 = newActor[PingPong3Actor].start - val factory = SupervisorFactory( + Supervisor( + SupervisorConfig( + RestartStrategy(OneForOne, 3, 100, List(classOf[Exception])), + Supervise( + pingpong1, + LifeCycle(Permanent)) + :: + Supervise( + pingpong2, + LifeCycle(Permanent)) + :: + Supervise( + pingpong3, + LifeCycle(Permanent)) + :: Nil)) + } + + def getNestedSupervisorsAllForOneConf: Supervisor = { + pingpong1 = newActor[PingPong1Actor].start + pingpong2 = newActor[PingPong2Actor].start + pingpong3 = newActor[PingPong3Actor].start + + Supervisor( + SupervisorConfig( + RestartStrategy(AllForOne, 3, 100, List(classOf[Exception])), + Supervise( + pingpong1, + LifeCycle(Permanent)) + :: SupervisorConfig( - RestartStrategy(OneForOne, 3, 100, List(classOf[Exception])), - Supervise( - pingpong1, - LifeCycle(Permanent)) - :: + RestartStrategy(AllForOne, 3, 100, Nil), Supervise( pingpong2, LifeCycle(Permanent)) @@ -486,33 +508,7 @@ class SupervisorSpec extends JUnitSuite { Supervise( pingpong3, LifeCycle(Permanent)) - :: Nil)) - factory.newInstance + :: Nil) + :: Nil)) } - - def getNestedSupervisorsAllForOneConf: Supervisor = { - pingpong1 = newActor[PingPong1Actor] - pingpong2 = newActor[PingPong2Actor] - pingpong3 = newActor[PingPong3Actor] - - val factory = SupervisorFactory( - SupervisorConfig( - RestartStrategy(AllForOne, 3, 100, List(classOf[Exception])), - Supervise( - pingpong1, - LifeCycle(Permanent)) - :: - SupervisorConfig( - RestartStrategy(AllForOne, 3, 100, Nil), - Supervise( - pingpong2, - LifeCycle(Permanent)) - :: - Supervise( - pingpong3, - LifeCycle(Permanent)) - :: Nil) - :: Nil)) - factory.newInstance - } } diff --git a/akka-core/src/test/scala/ThreadBasedActorSpec.scala b/akka-core/src/test/scala/ThreadBasedActorSpec.scala index 6466e5749b..935d74e872 100644 --- a/akka-core/src/test/scala/ThreadBasedActorSpec.scala +++ b/akka-core/src/test/scala/ThreadBasedActorSpec.scala @@ -25,39 +25,35 @@ class ThreadBasedActorSpec extends JUnitSuite { private val unit = TimeUnit.MILLISECONDS - @Test def shouldSendOneWay = { + @Test def shouldSendOneWay { var oneWay = new CountDownLatch(1) val actor = newActor(() => new Actor { dispatcher = Dispatchers.newThreadBasedDispatcher(this) def receive = { case "OneWay" => oneWay.countDown } - }) - actor.start + }).start val result = actor ! "OneWay" assert(oneWay.await(1, TimeUnit.SECONDS)) actor.stop } - @Test def shouldSendReplySync = { - val actor = newActor[TestActor] - actor.start + @Test def shouldSendReplySync { + val actor = newActor[TestActor].start val result: String = (actor !! ("Hello", 10000)).get assert("World" === result) actor.stop } - @Test def shouldSendReplyAsync = { - val actor = newActor[TestActor] - actor.start + @Test def shouldSendReplyAsync { + val actor = newActor[TestActor].start val result = actor !! "Hello" assert("World" === result.get.asInstanceOf[String]) actor.stop } - @Test def shouldSendReceiveException = { - val actor = newActor[TestActor] - actor.start + @Test def shouldSendReceiveException { + val actor = newActor[TestActor].start try { actor !! "Failure" fail("Should have thrown an exception") diff --git a/akka-samples/akka-sample-chat/src/main/scala/ChatServer.scala b/akka-samples/akka-sample-chat/src/main/scala/ChatServer.scala index c2ce76e1fa..a293381d9c 100644 --- a/akka-samples/akka-sample-chat/src/main/scala/ChatServer.scala +++ b/akka-samples/akka-sample-chat/src/main/scala/ChatServer.scala @@ -205,8 +205,7 @@ class ChatService extends SessionManagement with ChatManagement with RedisChatStorageFactory { - override def start { - super.start + override def init = { RemoteNode.start("localhost", 9999) RemoteNode.register("chat:service", self) } diff --git a/config/akka-reference.conf b/config/akka-reference.conf index f5ac4d724e..7d287344d2 100644 --- a/config/akka-reference.conf +++ b/config/akka-reference.conf @@ -8,7 +8,7 @@ filename = "./logs/akka.log" roll = "daily" # Options: never, hourly, daily, sunday/monday/... - level = "info" # Options: fatal, critical, error, warning, info, debug, trace + level = "debug" # Options: fatal, critical, error, warning, info, debug, trace console = on # syslog_host = "" # syslog_server_name = ""