diff --git a/akka-amqp/src/main/scala/AMQP.scala b/akka-amqp/src/main/scala/AMQP.scala index 84f52088e7..1886577fc6 100644 --- a/akka-amqp/src/main/scala/AMQP.scala +++ b/akka-amqp/src/main/scala/AMQP.scala @@ -79,11 +79,12 @@ object AMQP { */ class AMQPSupervisor extends Actor with Logging { import scala.collection.JavaConversions._ - + import self._ + private val connections = new ConcurrentHashMap[ActorRef, ActorRef] - faultHandler = Some(OneForOneStrategy(5, 5000)) - trapExit = List(classOf[Throwable]) + self.faultHandler = Some(OneForOneStrategy(5, 5000)) + self.trapExit = List(classOf[Throwable]) start def newProducer( @@ -362,7 +363,8 @@ object AMQP { extends FaultTolerantConnectionActor { import scala.collection.JavaConversions._ - + import self._ + faultHandler = Some(OneForOneStrategy(5, 5000)) trapExit = List(classOf[Throwable]) diff --git a/akka-amqp/src/main/scala/ExampleSession.scala b/akka-amqp/src/main/scala/ExampleSession.scala index c15ae49b97..119ae07a07 100644 --- a/akka-amqp/src/main/scala/ExampleSession.scala +++ b/akka-amqp/src/main/scala/ExampleSession.scala @@ -5,7 +5,6 @@ package se.scalablesolutions.akka.amqp import se.scalablesolutions.akka.actor.Actor._ -import se.scalablesolutions.akka.actor.Actor.Sender.Self import com.rabbitmq.client.ConnectionParameters diff --git a/akka-camel/src/main/scala/Producer.scala b/akka-camel/src/main/scala/Producer.scala index ae50754e49..6609318900 100644 --- a/akka-camel/src/main/scala/Producer.scala +++ b/akka-camel/src/main/scala/Producer.scala @@ -20,7 +20,7 @@ import se.scalablesolutions.akka.util.Logging * @author Martin Krasser */ trait Producer { this: Actor => - + private val headersToCopyDefault = Set(Message.MessageExchangeId) /** @@ -128,7 +128,7 @@ trait Producer { this: Actor => case msg => { if ( oneway && !async) produceOnewaySync(msg) else if ( oneway && async) produceOnewayAsync(msg) - else if (!oneway && !async) reply(produceSync(msg)) + else if (!oneway && !async) self.reply(produceSync(msg)) else /*(!oneway && async)*/ produceAsync(msg) } } diff --git a/akka-camel/src/test/scala/component/ActorComponentFeatureTest.scala b/akka-camel/src/test/scala/component/ActorComponentFeatureTest.scala index 8e5195cb33..3c100d0bc3 100644 --- a/akka-camel/src/test/scala/component/ActorComponentFeatureTest.scala +++ b/akka-camel/src/test/scala/component/ActorComponentFeatureTest.scala @@ -3,7 +3,7 @@ package se.scalablesolutions.akka.camel.component import org.apache.camel.RuntimeCamelException import org.scalatest.{BeforeAndAfterEach, BeforeAndAfterAll, FeatureSpec} -import se.scalablesolutions.akka.actor.ActorRegistry +import se.scalablesolutions.akka.actor.{ActorRegistry, Actor} import se.scalablesolutions.akka.camel.support.{Respond, Countdown, Tester, Retain} import se.scalablesolutions.akka.camel.{Message, CamelContextManager} @@ -20,39 +20,40 @@ class ActorComponentFeatureTest extends FeatureSpec with BeforeAndAfterAll with feature("Communicate with an actor from a Camel application using actor endpoint URIs") { import CamelContextManager.template - + import Actor._ + scenario("one-way communication using actor id") { - val actor = new Tester with Retain with Countdown[Message] + val actor = newActor(() => new Tester with Retain with Countdown[Message]) actor.start template.sendBody("actor:%s" format actor.id, "Martin") - assert(actor.waitFor) - assert(actor.body === "Martin") + assert(actor.actor.asInstanceOf[Countdown[Message]].waitFor) + assert(actor.actor.asInstanceOf[Retain].body === "Martin") } scenario("one-way communication using actor uuid") { - val actor = new Tester with Retain with Countdown[Message] + val actor = newActor(() => new Tester with Retain with Countdown[Message]) actor.start template.sendBody("actor:uuid:%s" format actor.uuid, "Martin") - assert(actor.waitFor) - assert(actor.body === "Martin") + assert(actor.actor.asInstanceOf[Countdown[Message]].waitFor) + assert(actor.actor.asInstanceOf[Retain].body === "Martin") } scenario("two-way communication using actor id") { - val actor = new Tester with Respond + val actor = newActor(() => new Tester with Respond) actor.start assert(template.requestBody("actor:%s" format actor.id, "Martin") === "Hello Martin") } scenario("two-way communication using actor uuid") { - val actor = new Tester with Respond + val actor = newActor(() => new Tester with Respond) actor.start assert(template.requestBody("actor:uuid:%s" format actor.uuid, "Martin") === "Hello Martin") } scenario("two-way communication with timeout") { - val actor = new Tester { - timeout = 1 - } + val actor = newActor(() => new Tester { + self.timeout = 1 + }) actor.start intercept[RuntimeCamelException] { template.requestBody("actor:uuid:%s" format actor.uuid, "Martin") diff --git a/akka-camel/src/test/scala/component/ActorProducerTest.scala b/akka-camel/src/test/scala/component/ActorProducerTest.scala index 5f7059295f..21a9af4be5 100644 --- a/akka-camel/src/test/scala/component/ActorProducerTest.scala +++ b/akka-camel/src/test/scala/component/ActorProducerTest.scala @@ -61,7 +61,7 @@ class ActorProducerTest extends JUnitSuite with BeforeAndAfterAll { @Test def shouldSendMessageToActorAndTimeout: Unit = { val actor = newActor(() => new Tester { - timeout = 1 + self.timeout = 1 }) val endpoint = mockEndpoint("actor:uuid:%s" format actor.uuid) val exchange = endpoint.createExchange(ExchangePattern.InOut) diff --git a/akka-camel/src/test/scala/service/CamelServiceFeatureTest.scala b/akka-camel/src/test/scala/service/CamelServiceFeatureTest.scala index b876fe14c8..b7c53f42ae 100644 --- a/akka-camel/src/test/scala/service/CamelServiceFeatureTest.scala +++ b/akka-camel/src/test/scala/service/CamelServiceFeatureTest.scala @@ -12,14 +12,14 @@ object CamelServiceFeatureTest { class TestConsumer(uri: String) extends Actor with Consumer { def endpointUri = uri protected def receive = { - case msg: Message => reply("received %s" format msg.body) + case msg: Message => self.reply("received %s" format msg.body) } } class TestActor extends Actor { - id = "custom-actor-id" + self.id = "custom-actor-id" protected def receive = { - case msg: Message => reply("received %s" format msg.body) + case msg: Message => self.reply("received %s" format msg.body) } } diff --git a/akka-camel/src/test/scala/service/PublishTest.scala b/akka-camel/src/test/scala/service/PublishTest.scala index 69910254ec..71ccea65d4 100644 --- a/akka-camel/src/test/scala/service/PublishTest.scala +++ b/akka-camel/src/test/scala/service/PublishTest.scala @@ -11,7 +11,7 @@ import se.scalablesolutions.akka.camel.Consumer object PublishTest { @consume("mock:test1") class ConsumeAnnotatedActor extends Actor { - id = "test" + self.id = "test" protected def receive = null } diff --git a/akka-camel/src/test/scala/support/TestSupport.scala b/akka-camel/src/test/scala/support/TestSupport.scala index 8dc7d4dd04..98bd23d5ed 100644 --- a/akka-camel/src/test/scala/support/TestSupport.scala +++ b/akka-camel/src/test/scala/support/TestSupport.scala @@ -9,10 +9,10 @@ trait Receive[T] { def onMessage(msg: T): Unit } -trait Respond extends Receive[Message] {self: Actor => +trait Respond extends Receive[Message] { this: Actor => abstract override def onMessage(msg: Message): Unit = { super.onMessage(msg) - reply(response(msg)) + this.self.reply(response(msg)) } def response(msg: Message): Any = "Hello %s" format msg.body } diff --git a/akka-core/src/main/scala/actor/ActiveObject.scala b/akka-core/src/main/scala/actor/ActiveObject.scala index 7a4656cec8..bb71179fa2 100644 --- a/akka-core/src/main/scala/actor/ActiveObject.scala +++ b/akka-core/src/main/scala/actor/ActiveObject.scala @@ -284,7 +284,7 @@ object ActiveObject { private[akka] def newInstance[T](target: Class[T], actorRef: ActorRef, remoteAddress: Option[InetSocketAddress], timeout: Long): T = { val proxy = Proxy.newInstance(target, false, false) actorRef.actor.asInstanceOf[Dispatcher].initialize(target, proxy) - actorRef.actor.timeout = timeout + actorRef.timeout = timeout if (remoteAddress.isDefined) actorRef.makeRemote(remoteAddress.get) AspectInitRegistry.register(proxy, AspectInit(target, actorRef, remoteAddress, timeout)) actorRef.start @@ -294,7 +294,7 @@ object ActiveObject { private[akka] def newInstance[T](intf: Class[T], target: AnyRef, actorRef: ActorRef, remoteAddress: Option[InetSocketAddress], timeout: Long): T = { val proxy = Proxy.newInstance(Array(intf), Array(target), false, false) actorRef.actor.asInstanceOf[Dispatcher].initialize(target.getClass, target) - actorRef.actor.timeout = timeout + actorRef.timeout = timeout if (remoteAddress.isDefined) actorRef.makeRemote(remoteAddress.get) AspectInitRegistry.register(proxy, AspectInit(intf, actorRef, remoteAddress, timeout)) actorRef.start @@ -544,8 +544,8 @@ private[akka] class Dispatcher(transactionalRequired: Boolean, val callbacks: Op def this(transactionalRequired: Boolean) = this(transactionalRequired,None) private[actor] def initialize(targetClass: Class[_], targetInstance: AnyRef) = { - if (transactionalRequired || targetClass.isAnnotationPresent(Annotations.transactionrequired)) makeTransactionRequired - id = targetClass.getName + if (transactionalRequired || targetClass.isAnnotationPresent(Annotations.transactionrequired)) self.makeTransactionRequired + self.id = targetClass.getName target = Some(targetInstance) val methods = targetInstance.getClass.getDeclaredMethods.toList @@ -592,10 +592,10 @@ private[akka] class Dispatcher(transactionalRequired: Boolean, val callbacks: Op case Invocation(joinPoint, isOneWay, _) => if (Actor.SERIALIZE_MESSAGES) serializeArguments(joinPoint) if (isOneWay) joinPoint.proceed - else reply(joinPoint.proceed) -// Jan Kronquist: started work on issue 121 - case Link(target) => link(target) - case Unlink(target) => unlink(target) + else self.reply(joinPoint.proceed) + // Jan Kronquist: started work on issue 121 + case Link(target) => self.link(target) + case Unlink(target) => self.unlink(target) case unexpected => throw new IllegalStateException("Unexpected message [" + unexpected + "] sent to [" + this + "]") } diff --git a/akka-core/src/main/scala/actor/Actor.scala b/akka-core/src/main/scala/actor/Actor.scala index faac8f56bc..1d3a37a4b8 100644 --- a/akka-core/src/main/scala/actor/Actor.scala +++ b/akka-core/src/main/scala/actor/Actor.scala @@ -50,7 +50,7 @@ trait ActorWithNestedReceive extends Actor { * @author Jonas Bonér */ trait Transactor extends Actor { - makeTransactionRequired + self.makeTransactionRequired } /** @@ -61,7 +61,7 @@ trait Transactor extends Actor { * @author Jonas Bonér */ abstract class RemoteActor(hostname: String, port: Int) extends Actor { - makeRemote(hostname, port) + self.makeRemote(hostname, port) } // Life-cycle messages for the Actors @@ -85,18 +85,10 @@ class ActorInitializationException private[akka](message: String) extends Runtim */ object Actor extends Logging { val TIMEOUT = config.getInt("akka.actor.timeout", 5000) - val HOSTNAME = config.getString("akka.remote.server.hostname", "localhost") - val PORT = config.getInt("akka.remote.server.port", 9999) val SERIALIZE_MESSAGES = config.getBool("akka.actor.serialize-messages", false) private[actor] val actorRefInCreation = new scala.util.DynamicVariable[Option[ActorRef]](None) - // FIXME remove next release - object Sender { - @deprecated("import Actor.Sender.Self is not needed anymore, just use 'actor ! msg'") - object Self - } - /** * Creates a Actor.newActor out of the Actor with type T. *
@@ -143,7 +135,7 @@ object Actor extends Logging {
    */
   def actor(body: PartialFunction[Any, Unit]): ActorRef =
     newActor(() => new Actor() {
-      lifeCycle = Some(LifeCycle(Permanent))
+      self.lifeCycle = Some(LifeCycle(Permanent))
       def receive: PartialFunction[Any, Unit] = body
     }).start
 
@@ -165,7 +157,7 @@ object Actor extends Logging {
    */
   def transactor(body: PartialFunction[Any, Unit]): ActorRef =
     newActor(() => new Transactor() {
-      lifeCycle = Some(LifeCycle(Permanent))
+      self.lifeCycle = Some(LifeCycle(Permanent))
       def receive: PartialFunction[Any, Unit] = body
     }).start
 
@@ -185,7 +177,7 @@ object Actor extends Logging {
    */
   def temporaryActor(body: PartialFunction[Any, Unit]): ActorRef =
     newActor(() => new Actor() {
-      lifeCycle = Some(LifeCycle(Temporary))
+      self.lifeCycle = Some(LifeCycle(Temporary))
       def receive = body
     }).start
 
@@ -210,7 +202,7 @@ object Actor extends Logging {
     def handler[A](body: => Unit) = new {
       def receive(handler: PartialFunction[Any, Unit]) =
         newActor(() => new Actor() {
-          lifeCycle = Some(LifeCycle(Permanent))
+          self.lifeCycle = Some(LifeCycle(Permanent))
           body
           def receive = handler
         }).start
@@ -238,7 +230,7 @@ object Actor extends Logging {
     newActor(() => new Actor() {
       self ! Spawn
       def receive = {
-        case Spawn => body; stop
+        case Spawn => body; self.stop
       }
     }).start
   }
@@ -258,12 +250,12 @@ object Actor extends Logging {
  * @author Jonas Bonér
  */
 trait Actor extends Logging {
- 
+
    /** 
     * For internal use only, functions as the implicit sender references when invoking 
     * one of the message send functions (!, !!, !!! and forward).
     */
-  protected implicit val _selfSenderRef: Some[ActorRef] = { 
+  implicit val optionSelf: Option[ActorRef] = { 
     val ref = Actor.actorRefInCreation.value
     Actor.actorRefInCreation.value = None
     if (ref.isEmpty) throw new ActorInitializationException(
@@ -273,9 +265,11 @@ trait Actor extends Logging {
        "\n\tEither use:" + 
        "\n\t\t'val actor = Actor.newActor[MyActor]', or" + 
        "\n\t\t'val actor = Actor.newActor(() => new MyActor(..))'")
-    else ref.asInstanceOf[Some[ActorRef]]
+    else ref
   }
 
+  implicit val someSelf: Some[ActorRef] = optionSelf.asInstanceOf[Some[ActorRef]]
+
   /**
    * The 'self' field holds the ActorRef for this actor.
    * 

@@ -284,29 +278,32 @@ trait Actor extends Logging { * self ! message *

*/ - val self: ActorRef = _selfSenderRef.get + val self: ActorRef = optionSelf.get + self.id = getClass.getName + /** - * The default dispatcher is the Dispatchers.globalExecutorBasedEventDrivenDispatcher. - * This means that all actors will share the same event-driven executor based dispatcher. + * User overridable callback/setting. *

- * You can override it so it fits the specific use-case that the actor is used for. - * See the se.scalablesolutions.akka.dispatch.Dispatchers class for the different - * dispatchers available. + * Partial function implementing the actor logic. + * To be implemented by subclassing actor. *

- * The default is also that all actors that are created and spawned from within this actor - * is sharing the same dispatcher as its creator. + * Example code: + *

+   *   def receive = {
+   *     case Ping =>
+   *       println("got a ping")
+   *       self.reply("pong")
+   *
+   *     case OneWay =>
+   *       println("got a oneway")
+   *
+   *     case _ =>
+   *       println("unknown message, ignoring")
+   * }
+   * 
*/ - private[akka] var _messageDispatcher: MessageDispatcher = Dispatchers.globalExecutorBasedEventDrivenDispatcher - - /** - * Holds the hot swapped partial function. - */ - private var _hotswap: Option[PartialFunction[Any, Unit]] = None // FIXME: _hotswap should be a stack - - // ======================================== - // ==== CALLBACKS FOR USER TO OVERRIDE ==== - // ======================================== + protected def receive: PartialFunction[Any, Unit] /** * User overridable callback/setting. @@ -348,282 +345,14 @@ trait Actor extends Logging { */ def shutdown {} - /** - * User overridable callback/setting. - *

- * Partial function implementing the actor logic. - * To be implemented by subclassing actor. - *

- * Example code: - *

-   *   def receive = {
-   *     case Ping =>
-   *       println("got a ping")
-   *       reply("pong")
-   *
-   *     case OneWay =>
-   *       println("got a oneway")
-   *
-   *     case _ =>
-   *       println("unknown message, ignoring")
-   * }
-   * 
- */ - protected def receive: PartialFunction[Any, Unit] - - // ================== - // ==== USER API ==== - // ================== - - /** - * Forwards the message and passes the original sender actor as the sender. - *

- * Works with '!', '!!' and '!!!'. - */ - def forward(message: Any)(implicit sender: Some[ActorRef]) = self.forward(message)(sender) - - /** - * User overridable callback/setting. - *

- * Identifier for actor, does not have to be a unique one. - * Default is the class name. - *

- * This field is used for logging, AspectRegistry.actorsFor, identifier for remote actor in RemoteServer etc. - * But also as the identifier for persistence, which means that you can - * use a custom name to be able to retrieve the "correct" persisted state - * upon restart, remote restart etc. - */ - protected[akka] var id: String = this.getClass.getName - - /** - * User overridable callback/setting. - *

- * Defines the default timeout for '!!' and '!!!' invocations, - * e.g. the timeout for the future returned by the call to '!!' and '!!!'. - */ - @volatile var timeout: Long = Actor.TIMEOUT - - /** - * User overridable callback/setting. - *

- * Set trapExit to the list of exception classes that the actor should be able to trap - * from the actor it is supervising. When the supervising actor throws these exceptions - * then they will trigger a restart. - *

- *

-   * // trap no exceptions
-   * trapExit = Nil
-   *
-   * // trap all exceptions
-   * trapExit = List(classOf[Throwable])
-   *
-   * // trap specific exceptions only
-   * trapExit = List(classOf[MyApplicationException], classOf[MyApplicationError])
-   * 
- */ - protected[akka] var trapExit: List[Class[_ <: Throwable]] = Nil - - /** - * User overridable callback/setting. - *

- * If 'trapExit' is set for the actor to act as supervisor, then a faultHandler must be defined. - * Can be one of: - *

-   *  faultHandler = Some(AllForOneStrategy(maxNrOfRetries, withinTimeRange))
-   *
-   *  faultHandler = Some(OneForOneStrategy(maxNrOfRetries, withinTimeRange))
-   * 
- */ - protected[akka] var faultHandler: Option[FaultHandlingStrategy] = None - - /** - * User overridable callback/setting. - *

- * Defines the life-cycle for a supervised actor. - */ - @volatile var lifeCycle: Option[LifeCycle] = None - - /** - * Use reply(..) to reply with a message to the original sender of the message currently - * being processed. - *

- * Throws an IllegalStateException if unable to determine what to reply to - */ - protected[this] def reply(message: Any) = if(!reply_?(message)) throw new IllegalStateException( - "\n\tNo sender in scope, can't reply. " + - "\n\tYou have probably used the '!' method to either; " + - "\n\t\t1. Send a message to a remote actor which does not have a contact address." + - "\n\t\t2. Send a message from an instance that is *not* an actor" + - "\n\t\t3. Send a message to an Active Object annotated with the '@oneway' annotation? " + - "\n\tIf so, switch to '!!' (or remove '@oneway') which passes on an implicit future" + - "\n\tthat will be bound by the argument passed to 'reply'." + - "\n\tAlternatively, you can use setReplyToAddress to make sure the actor can be contacted over the network.") - - /** - * Use reply_?(..) to reply with a message to the original sender of the message currently - * being processed. - *

- * Returns true if reply was sent, and false if unable to determine what to reply to. - */ - protected[this] def reply_?(message: Any): Boolean = self.replyTo match { - case Some(Left(actor)) => - actor ! message - true - case Some(Right(future: Future[Any])) => - future completeWithResult message - true - case _ => - false - } - - /** - * Starts the actor. - */ - def start = self.startOnCreation = true - - /** - * Shuts down the actor its dispatcher and message queue. - * Alias for 'stop'. - */ - def exit = self.stop - - /** - * Shuts down the actor its dispatcher and message queue. - */ - def stop = self.stop - - /** - * Returns the uuid for the actor. - */ - def uuid = self.uuid - - /** - * Sets the dispatcher for this actor. Needs to be invoked before the actor is started. - *

- * The default dispatcher is the Dispatchers.globalExecutorBasedEventDrivenDispatcher. - * This means that all actors will share the same event-driven executor based dispatcher. - *

- * You can override it so it fits the specific use-case that the actor is used for. - * See the se.scalablesolutions.akka.dispatch.Dispatchers class for the different - * dispatchers available. - *

- * The default is also that all actors that are created and spawned from within this actor - * is sharing the same dispatcher as its creator. - */ - def dispatcher_=(md: MessageDispatcher): Unit = - if (!self.isRunning) _messageDispatcher = md - else throw new IllegalArgumentException( - "Can not swap dispatcher for " + toString + " after it has been started") - - /** - * Get the dispatcher for this actor. - */ - def dispatcher: MessageDispatcher = _messageDispatcher - - /** - * Invoking 'makeTransactionRequired' means that the actor will **start** a new transaction if non exists. - * However, it will always participate in an existing transaction. - * If transactionality want to be completely turned off then do it by invoking: - *

-   *  TransactionManagement.disableTransactions
-   * 
- */ - def makeTransactionRequired = self.makeTransactionRequired - - /** - * Invoking 'makeRemote' means that an actor will be moved to and invoked on a remote host. - */ - def makeRemote(hostname: String, port: Int): Unit = makeRemote(new InetSocketAddress(hostname, port)) - - /** - * Invoking 'makeRemote' means that an actor will be moved to and invoked on a remote host. - */ - def makeRemote(address: InetSocketAddress): Unit = self.makeRemote(address) - - /** - * Set the contact address for this actor. This is used for replying to messages sent - * asynchronously when no reply channel exists. - */ - def setReplyToAddress(hostname: String, port: Int): Unit = self.setReplyToAddress(new InetSocketAddress(hostname, port)) - - /** - * Set the contact address for this actor. This is used for replying to messages sent - * asynchronously when no reply channel exists. - */ - def setReplyToAddress(address: InetSocketAddress): Unit = self.setReplyToAddress(address) - - /** - * Links an other actor to this actor. Links are unidirectional and means that a the linking actor will - * receive a notification if the linked actor has crashed. - *

- * If the 'trapExit' member field has been set to at contain at least one exception class then it will - * 'trap' these exceptions and automatically restart the linked actors according to the restart strategy - * defined by the 'faultHandler'. - *

- * To be invoked from within the actor itself. - */ - protected[this] def link(actorRef: ActorRef) = self.link(actorRef) - - /** - * Unlink the actor. - *

- * To be invoked from within the actor itself. - */ - protected[this] def unlink(actorRef: ActorRef) = self.unlink(actorRef) - - /** - * Atomically start and link an actor. - *

- * To be invoked from within the actor itself. - */ - protected[this] def startLink(actorRef: ActorRef) = self.startLink(actorRef) - - /** - * Atomically start, link and make an actor remote. - *

- * To be invoked from within the actor itself. - */ - protected[this] def startLinkRemote(actorRef: ActorRef, hostname: String, port: Int) = - self.startLinkRemote(actorRef, hostname, port) - - /** - * Atomically create (from actor class) and start an actor. - *

- * To be invoked from within the actor itself. - */ - protected[this] def spawn[T <: Actor : Manifest]: ActorRef = self.spawn[T] - - /** - * Atomically create (from actor class), start and make an actor remote. - *

- * To be invoked from within the actor itself. - */ - protected[this] def spawnRemote[T <: Actor: Manifest](hostname: String, port: Int): ActorRef = - self.spawnRemote[T](hostname, port) - - /** - * Atomically create (from actor class), start and link an actor. - *

- * To be invoked from within the actor itself. - */ - protected[this] def spawnLink[T <: Actor: Manifest]: ActorRef = self.spawnLink[T] - - /** - * User overridable callback/setting. - *

- * Set to true if messages should have REQUIRES_NEW semantics, e.g. a new transaction should - * start if there is no one running, else it joins the existing transaction. - */ - protected[this] def isTransactor_=(flag: Boolean) = self.isTransactor = flag - // ========================================= // ==== INTERNAL IMPLEMENTATION DETAILS ==== // ========================================= - private[akka] def base: PartialFunction[Any, Unit] = lifeCycles orElse (_hotswap getOrElse receive) + private[akka] def base: PartialFunction[Any, Unit] = lifeCycles orElse (self.hotswap getOrElse receive) private val lifeCycles: PartialFunction[Any, Unit] = { - case HotSwap(code) => _hotswap = code + case HotSwap(code) => self.hotswap = code case Restart(reason) => self.restart(reason) case Exit(dead, reason) => self.handleTrapExit(dead, reason) case Unlink(child) => self.unlink(child) @@ -631,15 +360,11 @@ trait Actor extends Logging { case Kill => throw new ActorKilledException("Actor [" + toString + "] was killed by a Kill message") } - override def hashCode(): Int = HashCode.hash(HashCode.SEED, uuid) + override def hashCode: Int = self.hashCode - override def equals(that: Any): Boolean = { - that != null && - that.isInstanceOf[Actor] && - that.asInstanceOf[Actor].uuid == uuid - } + override def equals(that: Any): Boolean = self.equals(that) - override def toString = "Actor[" + id + ":" + uuid + "]" + override def toString = self.toString } /** diff --git a/akka-core/src/main/scala/actor/ActorRef.scala b/akka-core/src/main/scala/actor/ActorRef.scala index 6a32a48ba6..5f8a6d93c8 100644 --- a/akka-core/src/main/scala/actor/ActorRef.scala +++ b/akka-core/src/main/scala/actor/ActorRef.scala @@ -14,7 +14,7 @@ import se.scalablesolutions.akka.stm.TransactionManagement import se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.{RemoteRequestProtocol, RemoteReplyProtocol, ActorRefProtocol} import se.scalablesolutions.akka.remote.{RemoteNode, RemoteServer, RemoteClient, RemoteProtocolBuilder, RemoteRequestProtocolIdFactory} import se.scalablesolutions.akka.serialization.Serializer -import se.scalablesolutions.akka.util.{HashCode, Logging, UUID, ReadWriteLock} +import se.scalablesolutions.akka.util.{HashCode, Logging, UUID, ReentrantGuard} import org.multiverse.api.ThreadLocalTransaction._ import org.multiverse.commitbarriers.CountDownCommitBarrier @@ -22,7 +22,7 @@ import org.multiverse.commitbarriers.CountDownCommitBarrier import jsr166x.{Deque, ConcurrentLinkedDeque} import java.net.InetSocketAddress -import java.util.concurrent.locks.{Lock, ReentrantLock} +import java.util.concurrent.locks.ReentrantLock import java.util.concurrent.atomic.AtomicReference import java.util.concurrent.ConcurrentHashMap import java.util.{Map => JMap} @@ -115,10 +115,89 @@ trait ActorRef extends TransactionManagement { @volatile protected[this] var _isSuspended = true @volatile protected[this] var _isShutDown = false @volatile protected[akka] var _isKilled = false + @volatile protected[akka] var _homeAddress = new InetSocketAddress(RemoteServer.HOSTNAME, RemoteServer.PORT) @volatile protected[akka] var startOnCreation = false @volatile protected[akka] var registeredInRemoteNodeDuringSerialization = false - protected[this] val guard = new ReadWriteLock + protected[this] val guard = new ReentrantGuard + + /** + * User overridable callback/setting. + *

+ * Identifier for actor, does not have to be a unique one. Default is the 'uuid'. + *

+ * This field is used for logging, AspectRegistry.actorsFor, identifier for remote + * actor in RemoteServer etc.But also as the identifier for persistence, which means + * that you can use a custom name to be able to retrieve the "correct" persisted state + * upon restart, remote restart etc. + */ + @volatile var id: String = _uuid + + /** + * User overridable callback/setting. + *

+ * Defines the default timeout for '!!' and '!!!' invocations, + * e.g. the timeout for the future returned by the call to '!!' and '!!!'. + */ + @volatile var timeout: Long = Actor.TIMEOUT + + /** + * User overridable callback/setting. + *

+ * Set trapExit to the list of exception classes that the actor should be able to trap + * from the actor it is supervising. When the supervising actor throws these exceptions + * then they will trigger a restart. + *

+ *

+   * // trap no exceptions
+   * trapExit = Nil
+   *
+   * // trap all exceptions
+   * trapExit = List(classOf[Throwable])
+   *
+   * // trap specific exceptions only
+   * trapExit = List(classOf[MyApplicationException], classOf[MyApplicationError])
+   * 
+ */ + @volatile var trapExit: List[Class[_ <: Throwable]] = Nil + + /** + * User overridable callback/setting. + *

+ * If 'trapExit' is set for the actor to act as supervisor, then a faultHandler must be defined. + * Can be one of: + *

+   *  faultHandler = Some(AllForOneStrategy(maxNrOfRetries, withinTimeRange))
+   *
+   *  faultHandler = Some(OneForOneStrategy(maxNrOfRetries, withinTimeRange))
+   * 
+ */ + @volatile var faultHandler: Option[FaultHandlingStrategy] = None + + /** + * User overridable callback/setting. + *

+ * Defines the life-cycle for a supervised actor. + */ + @volatile var lifeCycle: Option[LifeCycle] = None + + /** + * The default dispatcher is the Dispatchers.globalExecutorBasedEventDrivenDispatcher. + * This means that all actors will share the same event-driven executor based dispatcher. + *

+ * You can override it so it fits the specific use-case that the actor is used for. + * See the se.scalablesolutions.akka.dispatch.Dispatchers class for the different + * dispatchers available. + *

+ * The default is also that all actors that are created and spawned from within this actor + * is sharing the same dispatcher as its creator. + */ + private[akka] var _dispatcher: MessageDispatcher = Dispatchers.globalExecutorBasedEventDrivenDispatcher + + /** + * Holds the hot swapped partial function. + */ + protected[akka] var hotswap: Option[PartialFunction[Any, Unit]] = None // FIXME: _hotswap should be a stack /** * User overridable callback/setting. @@ -132,7 +211,7 @@ trait ActorRef extends TransactionManagement { * This lock ensures thread safety in the dispatching: only one message can * be dispatched at once on the actor. */ - protected[akka] val dispatcherLock: Lock = new ReentrantLock + protected[akka] val dispatcherLock = new ReentrantLock /** * Holds the reference to the sender of the currently processed message. @@ -141,10 +220,8 @@ trait ActorRef extends TransactionManagement { * - Is Some(Right(CompletableFuture)) if sender is holding on to a Future for the result */ protected[this] var _replyTo: Option[Either[ActorRef, CompletableFuture[Any]]] = None - protected[akka] def replyTo: Option[Either[ActorRef, CompletableFuture[Any]]] = - guard.withReadLock { _replyTo } - protected[akka] def replyTo_=(rt: Option[Either[ActorRef, CompletableFuture[Any]]]) = - guard.withWriteLock { _replyTo = rt } + protected[akka] def replyTo: Option[Either[ActorRef, CompletableFuture[Any]]] = guard.withGuard { _replyTo } + protected[akka] def replyTo_=(rt: Option[Either[ActorRef, CompletableFuture[Any]]]) = guard.withGuard { _replyTo = rt } /** * Is the actor killed? @@ -165,6 +242,10 @@ trait ActorRef extends TransactionManagement { * Returns the uuid for the actor. */ def uuid = _uuid + + /** + * Only for internal use. UUID is effectively final. + */ protected[akka] def uuid_=(uid: String) = _uuid = uid /** @@ -196,7 +277,7 @@ trait ActorRef extends TransactionManagement { * NOTE: * Use this method with care. In most cases it is better to use '!' together with the 'sender' member field to * implement request/response message exchanges. - * If you are sending messages using !! then you have to use reply(..) + * If you are sending messages using !! then you have to use self.reply(..) * to send a reply message to the original sender. If not then the sender will block until the timeout expires. */ def !![T](message: Any, timeout: Long): Option[T] = { @@ -204,8 +285,9 @@ trait ActorRef extends TransactionManagement { if (isRunning) { val future = postMessageToMailboxAndCreateFutureResultWithTimeout[T](message, timeout, None) val isActiveObject = message.isInstanceOf[Invocation] - if (isActiveObject && message.asInstanceOf[Invocation].isVoid) + if (isActiveObject && message.asInstanceOf[Invocation].isVoid) { future.asInstanceOf[CompletableFuture[Option[_]]].completeWithResult(None) + } try { future.await } catch { @@ -232,7 +314,7 @@ trait ActorRef extends TransactionManagement { * Use this method with care. In most cases it is better to use '!' together with the 'sender' member field to * implement request/response message exchanges. *

- * If you are sending messages using !! then you have to use reply(..) + * If you are sending messages using !! then you have to use self.reply(..) * to send a reply message to the original sender. If not then the sender will block until the timeout expires. */ def !![T](message: Any)(implicit sender: Option[ActorRef] = None): Option[T] = !![T](message, timeout) @@ -243,7 +325,7 @@ trait ActorRef extends TransactionManagement { * NOTE: * Use this method with care. In most cases it is better to use '!' together with the 'sender' member field to * implement request/response message exchanges. - * If you are sending messages using !!! then you have to use reply(..) + * If you are sending messages using !!! then you have to use self.reply(..) * to send a reply message to the original sender. If not then the sender will block until the timeout expires. */ def !!![T](message: Any): Future[T] = { @@ -269,6 +351,39 @@ trait ActorRef extends TransactionManagement { } else throw new IllegalStateException("Actor has not been started, you need to invoke 'actor.start' before using it") } + /** + * Use self.reply(..) to reply with a message to the original sender of the message currently + * being processed. + *

+ * Throws an IllegalStateException if unable to determine what to reply to + */ + def reply(message: Any) = if(!reply_?(message)) throw new IllegalStateException( + "\n\tNo sender in scope, can't reply. " + + "\n\tYou have probably used the '!' method to either; " + + "\n\t\t1. Send a message to a remote actor which does not have a contact address." + + "\n\t\t2. Send a message from an instance that is *not* an actor" + + "\n\t\t3. Send a message to an Active Object annotated with the '@oneway' annotation? " + + "\n\tIf so, switch to '!!' (or remove '@oneway') which passes on an implicit future" + + "\n\tthat will be bound by the argument passed to 'reply'." + + "\n\tAlternatively, you can use setReplyToAddress to make sure the actor can be contacted over the network.") + + /** + * Use reply_?(..) to reply with a message to the original sender of the message currently + * being processed. + *

+ * Returns true if reply was sent, and false if unable to determine what to reply to. + */ + def reply_?(message: Any): Boolean = replyTo match { + case Some(Left(actor)) => + actor ! message + true + case Some(Right(future: Future[Any])) => + future completeWithResult message + true + case _ => + false + } + /** * Serializes the ActorRef instance into a byte array (Array[Byte]). */ @@ -310,22 +425,20 @@ trait ActorRef extends TransactionManagement { def makeTransactionRequired: Unit /** - * Set the contact address for this actor. This is used for replying to messages - * sent asynchronously when no reply channel exists. + * Returns the home address and port for this actor. */ - def setReplyToAddress(hostname: String, port: Int): Unit = - setReplyToAddress(new InetSocketAddress(hostname, port)) + def homeAddress: InetSocketAddress = _homeAddress /** - * Set the contact address for this actor. This is used for replying to messages - * sent asynchronously when no reply channel exists. + * Set the home address and port for this actor. */ - def setReplyToAddress(address: InetSocketAddress): Unit - + def homeAddress_=(hostnameAndPort: Tuple2[String, Int]): Unit = + homeAddress_=(new InetSocketAddress(hostnameAndPort._1, hostnameAndPort._2)) + /** - * Returns the id for the actor. + * Set the home address and port for this actor. */ - def id: String + def homeAddress_=(address: InetSocketAddress): Unit /** * Returns the remote address for the actor, if any, else None. @@ -333,20 +446,6 @@ trait ActorRef extends TransactionManagement { def remoteAddress: Option[InetSocketAddress] protected[akka] def remoteAddress_=(addr: Option[InetSocketAddress]): Unit - /** - * User overridable callback/setting. - *

- * Defines the default timeout for '!!' and '!!!' invocations, - * e.g. the timeout for the future returned by the call to '!!' and '!!!'. - */ - def timeout: Long - - /** - * Sets the default timeout for '!!' and '!!!' invocations, - * e.g. the timeout for the future returned by the call to '!!' and '!!!'. - */ - def timeout_=(t: Long) - /** * Starts up the actor and its message queue. */ @@ -456,15 +555,6 @@ trait ActorRef extends TransactionManagement { protected[akka] def supervisor_=(sup: Option[ActorRef]): Unit - protected[akka] def trapExit: List[Class[_ <: Throwable]] = actor.trapExit - protected[akka] def trapExit_=(exits: List[Class[_ <: Throwable]]) = actor.trapExit = exits - - protected[akka] def lifeCycle: Option[LifeCycle] = actor.lifeCycle - protected[akka] def lifeCycle_=(cycle: Option[LifeCycle]) = actor.lifeCycle = cycle - - protected[akka] def faultHandler: Option[FaultHandlingStrategy] = actor.faultHandler - protected[akka] def faultHandler_=(handler: Option[FaultHandlingStrategy]) = actor.faultHandler = handler - protected[akka] def mailbox: Deque[MessageInvocation] protected[akka] def restart(reason: Throwable): Unit @@ -479,20 +569,38 @@ trait ActorRef extends TransactionManagement { protected[akka] def linkedActorsAsList: List[ActorRef] - override def toString: String - - override def hashCode: Int - - override def equals(that: Any): Boolean + override def hashCode: Int = HashCode.hash(HashCode.SEED, uuid) + + override def equals(that: Any): Boolean = { + that != null && + that.isInstanceOf[ActorRef] && + that.asInstanceOf[ActorRef].uuid == uuid + } + + override def toString = "Actor[" + id + ":" + uuid + "]" + + protected def processSender(senderOption: Option[ActorRef], requestBuilder: RemoteRequestProtocol.Builder) = { + senderOption.foreach { sender => + val address = sender.homeAddress + val server = RemoteServer.serverFor(address) match { + case Some(server) => server + case None => (new RemoteServer).start(address) + } + server.register(sender.uuid, sender) + requestBuilder.setSender(sender.toProtocol) + } + } } /** + * Local ActorRef that is used when referencing the Actor on its "home" node. + * * @author Jonas Bonér */ sealed class LocalActorRef private[akka]( private[this] var actorFactory: Either[Option[Class[_ <: Actor]], Option[() => Actor]] = Left(None)) extends ActorRef { - + private[akka] def this(clazz: Class[_ <: Actor]) = this(Left(Some(clazz))) private[akka] def this(factory: () => Actor) = this(Right(Some(factory))) @@ -500,20 +608,21 @@ sealed class LocalActorRef private[akka]( @volatile private[akka] var _remoteAddress: Option[InetSocketAddress] = None @volatile private[akka] var _linkedActors: Option[ConcurrentHashMap[String, ActorRef]] = None @volatile private[akka] var _supervisor: Option[ActorRef] = None - @volatile private[akka] var _replyToAddress: Option[InetSocketAddress] = None protected[akka] val _mailbox: Deque[MessageInvocation] = new ConcurrentLinkedDeque[MessageInvocation] protected[this] val actorInstance = new AtomicReference[Actor](newActor) - if (startOnCreation) start - + @volatile private var isInInitialization = false + @volatile private var runActorInitialization = false + + if (runActorInitialization) initializeActorInstance + /** * Serializes the ActorRef instance into a Protocol Buffers (protobuf) Message. */ - protected[akka] def toProtocol: ActorRefProtocol = guard.withWriteLock { - val (host, port) = _replyToAddress.map(address => - (address.getHostName, address.getPort)) - .getOrElse((Actor.HOSTNAME, Actor.PORT)) + protected[akka] def toProtocol: ActorRefProtocol = guard.withGuard { + val host = homeAddress.getHostName + val port = homeAddress.getPort if (!registeredInRemoteNodeDuringSerialization) { Actor.log.debug("Register serialized Actor [%s] as remote @ [%s:%s]", actorClass.getName, host, port) @@ -549,8 +658,8 @@ sealed class LocalActorRef private[akka]( /** * Sets the dispatcher for this actor. Needs to be invoked before the actor is started. */ - def dispatcher_=(md: MessageDispatcher): Unit = guard.withWriteLock { - if (!isRunning) actor.dispatcher = md + def dispatcher_=(md: MessageDispatcher): Unit = guard.withGuard { + if (!isRunning) _dispatcher = md else throw new IllegalArgumentException( "Can not swap dispatcher for " + toString + " after it has been started") } @@ -558,7 +667,7 @@ sealed class LocalActorRef private[akka]( /** * Get the dispatcher for this actor. */ - def dispatcher: MessageDispatcher = guard.withReadLock { actor.dispatcher } + def dispatcher: MessageDispatcher = guard.withGuard { _dispatcher } /** * Invoking 'makeRemote' means that an actor will be moved to and invoked on a remote host. @@ -571,13 +680,13 @@ sealed class LocalActorRef private[akka]( /** * Invoking 'makeRemote' means that an actor will be moved to and invoked on a remote host. */ - def makeRemote(address: InetSocketAddress): Unit = guard.withWriteLock { + def makeRemote(address: InetSocketAddress): Unit = guard.withGuard { if (isRunning) throw new IllegalStateException( "Can't make a running actor remote. Make sure you call 'makeRemote' before 'start'.") else { _remoteAddress = Some(address) RemoteClient.register(address.getHostName, address.getPort, uuid) - if (_replyToAddress.isEmpty) setReplyToAddress(Actor.HOSTNAME, Actor.PORT) + homeAddress = (RemoteServer.HOSTNAME, RemoteServer.PORT) } } @@ -589,7 +698,7 @@ sealed class LocalActorRef private[akka]( * TransactionManagement.disableTransactions * */ - def makeTransactionRequired = guard.withWriteLock { + def makeTransactionRequired = guard.withGuard { if (isRunning) throw new IllegalArgumentException( "Can not make actor transaction required after it has been started") else isTransactor = true @@ -599,57 +708,32 @@ sealed class LocalActorRef private[akka]( * Set the contact address for this actor. This is used for replying to messages * sent asynchronously when no reply channel exists. */ - def setReplyToAddress(address: InetSocketAddress): Unit = - guard.withReadLock { _replyToAddress = Some(address) } - - /** - * Returns the id for the actor. - */ - def id = actor.id + def homeAddress_=(address: InetSocketAddress): Unit = guard.withGuard { _homeAddress = address } /** * Returns the remote address for the actor, if any, else None. */ - def remoteAddress: Option[InetSocketAddress] = guard.withReadLock { _remoteAddress } + def remoteAddress: Option[InetSocketAddress] = guard.withGuard { _remoteAddress } protected[akka] def remoteAddress_=(addr: Option[InetSocketAddress]): Unit = - guard.withWriteLock { _remoteAddress = addr } - - /** - * User overridable callback/setting. - *

- * Defines the default timeout for '!!' and '!!!' invocations, - * e.g. the timeout for the future returned by the call to '!!' and '!!!'. - */ - def timeout: Long = actor.timeout - - /** - * Sets the default timeout for '!!' and '!!!' invocations, - * e.g. the timeout for the future returned by the call to '!!' and '!!!'. - */ - def timeout_=(t: Long) = actor.timeout = t + guard.withGuard { _remoteAddress = addr } /** * Starts up the actor and its message queue. */ - def start: ActorRef = guard.withWriteLock { + def start: ActorRef = guard.withGuard { if (isShutdown) throw new IllegalStateException( "Can't restart an actor that has been shut down with 'stop' or 'exit'") if (!isRunning) { - dispatcher.register(this) - dispatcher.start - _isRunning = true - actor.init - actor.initTransactionalState + if (!isInInitialization) initializeActorInstance + else runActorInitialization = true } - Actor.log.debug("[%s] has started", toString) - ActorRegistry.register(this) this } /** * Shuts down the actor its dispatcher and message queue. */ - def stop = guard.withWriteLock { + def stop = guard.withGuard { if (isRunning) { dispatcher.unregister(this) _isRunning = false @@ -672,7 +756,7 @@ sealed class LocalActorRef private[akka]( *

* To be invoked from within the actor itself. */ - def link(actorRef: ActorRef) = guard.withWriteLock { + def link(actorRef: ActorRef) = guard.withGuard { if (actorRef.supervisor.isDefined) throw new IllegalStateException( "Actor can only have one supervisor [" + actorRef + "], e.g. link(actor) fails") linkedActors.put(actorRef.uuid, actorRef) @@ -685,7 +769,7 @@ sealed class LocalActorRef private[akka]( *

* To be invoked from within the actor itself. */ - def unlink(actorRef: ActorRef) = guard.withWriteLock { + def unlink(actorRef: ActorRef) = guard.withGuard { if (!linkedActors.containsKey(actorRef.uuid)) throw new IllegalStateException( "Actor [" + actorRef + "] is not a linked actor, can't unlink") linkedActors.remove(actorRef.uuid) @@ -698,7 +782,7 @@ sealed class LocalActorRef private[akka]( *

* To be invoked from within the actor itself. */ - def startLink(actorRef: ActorRef) = guard.withWriteLock { + def startLink(actorRef: ActorRef) = guard.withGuard { try { actorRef.start } finally { @@ -711,7 +795,7 @@ sealed class LocalActorRef private[akka]( *

* To be invoked from within the actor itself. */ - def startLinkRemote(actorRef: ActorRef, hostname: String, port: Int) = guard.withWriteLock { + def startLinkRemote(actorRef: ActorRef, hostname: String, port: Int) = guard.withGuard { try { actorRef.makeRemote(hostname, port) actorRef.start @@ -725,7 +809,7 @@ sealed class LocalActorRef private[akka]( *

* To be invoked from within the actor itself. */ - def spawn[T <: Actor : Manifest]: ActorRef = guard.withWriteLock { + def spawn[T <: Actor : Manifest]: ActorRef = guard.withGuard { val actorRef = spawnButDoNotStart[T] actorRef.start actorRef @@ -736,7 +820,7 @@ sealed class LocalActorRef private[akka]( *

* To be invoked from within the actor itself. */ - def spawnRemote[T <: Actor: Manifest](hostname: String, port: Int): ActorRef = guard.withWriteLock { + def spawnRemote[T <: Actor: Manifest](hostname: String, port: Int): ActorRef = guard.withGuard { val actor = spawnButDoNotStart[T] actor.makeRemote(hostname, port) actor.start @@ -748,7 +832,7 @@ sealed class LocalActorRef private[akka]( *

* To be invoked from within the actor itself. */ - def spawnLink[T <: Actor: Manifest]: ActorRef = guard.withWriteLock { + def spawnLink[T <: Actor: Manifest]: ActorRef = guard.withGuard { val actor = spawnButDoNotStart[T] try { actor.start @@ -763,7 +847,7 @@ sealed class LocalActorRef private[akka]( *

* To be invoked from within the actor itself. */ - def spawnLinkRemote[T <: Actor : Manifest](hostname: String, port: Int): ActorRef = guard.withWriteLock { + def spawnLinkRemote[T <: Actor : Manifest](hostname: String, port: Int): ActorRef = guard.withGuard { val actor = spawnButDoNotStart[T] try { actor.makeRemote(hostname, port) @@ -781,25 +865,19 @@ sealed class LocalActorRef private[akka]( /** * Shuts down and removes all linked actors. */ - def shutdownLinkedActors: Unit = guard.withWriteLock { + def shutdownLinkedActors: Unit = guard.withGuard { linkedActorsAsList.foreach(_.stop) linkedActors.clear } - override def toString: String = actor.toString - - override def hashCode: Int = actor.hashCode - - override def equals(that: Any): Boolean = actor.equals(that) - /** * Returns the supervisor, if there is one. */ - def supervisor: Option[ActorRef] = guard.withReadLock { _supervisor } + def supervisor: Option[ActorRef] = guard.withGuard { _supervisor } - protected[akka] def supervisor_=(sup: Option[ActorRef]): Unit = guard.withWriteLock { _supervisor = sup } + protected[akka] def supervisor_=(sup: Option[ActorRef]): Unit = guard.withGuard { _supervisor = sup } - private def spawnButDoNotStart[T <: Actor: Manifest]: ActorRef = guard.withWriteLock { + private def spawnButDoNotStart[T <: Actor: Manifest]: ActorRef = guard.withGuard { val actor = manifest[T].erasure.asInstanceOf[Class[T]].newInstance val actorRef = Actor.newActor(() => actor) if (!dispatcher.isInstanceOf[ThreadBasedDispatcher]) { @@ -809,6 +887,7 @@ sealed class LocalActorRef private[akka]( } private[this] def newActor: Actor = { + isInInitialization = true Actor.actorRefInCreation.value = Some(this) val actor = actorFactory match { case Left(Some(clazz)) => @@ -828,6 +907,7 @@ sealed class LocalActorRef private[akka]( } if (actor eq null) throw new ActorInitializationException( "Actor instance passed to ActorRef can not be 'null'") + isInInitialization = false actor } @@ -846,8 +926,7 @@ sealed class LocalActorRef private[akka]( val id = registerSupervisorAsRemoteActor if (id.isDefined) requestBuilder.setSupervisorUuid(id.get) - - senderOption.foreach(sender => requestBuilder.setSender(sender.toProtocol)) + processSender(senderOption, requestBuilder) RemoteProtocolBuilder.setMessage(message, requestBuilder) RemoteClient.clientFor(remoteAddress.get).send[Any](requestBuilder.build, None) @@ -897,26 +976,6 @@ sealed class LocalActorRef private[akka]( } } - protected[akka] def restart(reason: Throwable): Unit = { - Actor.log.info("Restarting actor [%s] configured as PERMANENT.", id) - val failedActor = actorInstance.get - failedActor.synchronized { - Actor.log.debug("Restarting linked actors for actor [%s].", id) - restartLinkedActors(reason) - Actor.log.debug("Invoking 'preRestart' for failed actor instance [%s].", id) - failedActor.preRestart(reason) - stop - val freshActor = newActor - freshActor.synchronized { - actorInstance.set(freshActor) - start - Actor.log.debug("Invoking 'postRestart' for new actor instance [%s].", id) - freshActor.postRestart(reason) - } - _isKilled = false - } - } - private def joinTransaction(message: Any) = if (isTransactionSetInScope) { // FIXME test to run bench without this trace call Actor.log.trace("Joining transaction set [%s];\n\tactor %s\n\twith message [%s]", @@ -938,11 +997,11 @@ sealed class LocalActorRef private[akka]( } } - private def dispatch[T](messageHandle: MessageInvocation) = guard.withWriteLock { + private def dispatch[T](messageHandle: MessageInvocation) = { setTransactionSet(messageHandle.transactionSet) val message = messageHandle.message //serializeMessage(messageHandle.message) - replyTo = messageHandle.replyTo + _replyTo = messageHandle.replyTo try { if (actor.base.isDefinedAt(message)) actor.base(message) // invoke user actor's receive partial function @@ -963,7 +1022,7 @@ sealed class LocalActorRef private[akka]( } } - private def transactionalDispatch[T](messageHandle: MessageInvocation) = guard.withWriteLock { + private def transactionalDispatch[T](messageHandle: MessageInvocation) = { var topLevelTransaction = false val txSet: Option[CountDownCommitBarrier] = if (messageHandle.transactionSet.isDefined) messageHandle.transactionSet @@ -979,7 +1038,7 @@ sealed class LocalActorRef private[akka]( setTransactionSet(txSet) val message = messageHandle.message //serializeMessage(messageHandle.message) - replyTo = messageHandle.replyTo + _replyTo = messageHandle.replyTo def proceed = { if (actor.base.isDefinedAt(message)) actor.base(message) // invoke user actor's receive partial function @@ -1020,21 +1079,49 @@ sealed class LocalActorRef private[akka]( } } - protected[akka] def handleTrapExit(dead: ActorRef, reason: Throwable): Unit = guard.withReadLock { + protected[akka] def handleTrapExit(dead: ActorRef, reason: Throwable): Unit = { if (trapExit.exists(_.isAssignableFrom(reason.getClass))) { if (faultHandler.isDefined) { faultHandler.get match { // FIXME: implement support for maxNrOfRetries and withinTimeRange in RestartStrategy - case AllForOneStrategy(maxNrOfRetries, withinTimeRange) => restartLinkedActors(reason) - case OneForOneStrategy(maxNrOfRetries, withinTimeRange) => dead.restart(reason) + case AllForOneStrategy(maxNrOfRetries, withinTimeRange) => + restartLinkedActors(reason) + + case OneForOneStrategy(maxNrOfRetries, withinTimeRange) => + dead.restart(reason) } } else throw new IllegalStateException( "No 'faultHandler' defined for an actor with the 'trapExit' member field defined " + "\n\tto non-empty list of exception classes - can't proceed " + toString) - } else _supervisor.foreach(_ ! Exit(dead, reason)) // if 'trapExit' is not defined then pass the Exit on + } else { + _supervisor.foreach(_ ! Exit(dead, reason)) // if 'trapExit' is not defined then pass the Exit on + } } - protected[akka] def restartLinkedActors(reason: Throwable) = guard.withWriteLock { + protected[akka] def restart(reason: Throwable): Unit = { + Actor.log.info("Restarting actor [%s] configured as PERMANENT.", id) + restartLinkedActors(reason) + val failedActor = actorInstance.get + failedActor.synchronized { + Actor.log.debug("Restarting linked actors for actor [%s].", id) + Actor.log.debug("Invoking 'preRestart' for failed actor instance [%s].", id) + failedActor.preRestart(reason) + failedActor.shutdown + _isRunning = false + val freshActor = newActor + freshActor.synchronized { + freshActor.init + freshActor.initTransactionalState + _isRunning = true + actorInstance.set(freshActor) + Actor.log.debug("Invoking 'postRestart' for new actor instance [%s].", id) + freshActor.postRestart(reason) + } + _isKilled = false + } + } + + protected[akka] def restartLinkedActors(reason: Throwable) = guard.withGuard { linkedActorsAsList.foreach { actorRef => if (actorRef.lifeCycle.isEmpty) actorRef.lifeCycle = Some(LifeCycle(Permanent)) actorRef.lifeCycle.get match { @@ -1060,14 +1147,14 @@ sealed class LocalActorRef private[akka]( } } - protected[akka] def registerSupervisorAsRemoteActor: Option[String] = guard.withWriteLock { + protected[akka] def registerSupervisorAsRemoteActor: Option[String] = guard.withGuard { if (_supervisor.isDefined) { RemoteClient.clientFor(remoteAddress.get).registerSupervisorForActor(this) Some(_supervisor.get.uuid) } else None } - protected[akka] def linkedActors: JMap[String, ActorRef] = guard.withWriteLock { + protected[akka] def linkedActors: JMap[String, ActorRef] = guard.withGuard { if (_linkedActors.isEmpty) { val actors = new ConcurrentHashMap[String, ActorRef] _linkedActors = Some(actors) @@ -1078,6 +1165,17 @@ sealed class LocalActorRef private[akka]( protected[akka] def linkedActorsAsList: List[ActorRef] = linkedActors.values.toArray.toList.asInstanceOf[List[ActorRef]] + private def initializeActorInstance = if (!isRunning) { + dispatcher.register(this) + dispatcher.start + actor.init // run actor init and initTransactionalState callbacks + actor.initTransactionalState + Actor.log.debug("[%s] has started", toString) + ActorRegistry.register(this) + if (id == "N/A") id = actorClass.getName // if no name set, then use default name (class name) + _isRunning = true + } + private def serializeMessage(message: AnyRef): AnyRef = if (Actor.SERIALIZE_MESSAGES) { if (!message.isInstanceOf[String] && !message.isInstanceOf[Byte] && @@ -1105,8 +1203,9 @@ sealed class LocalActorRef private[akka]( } /** - * Remote Actor proxy. - * + * Remote ActorRef that is used when referencing the Actor on a different node than its "home" node. + * This reference is network-aware (remembers its origin) and immutable. + * * @author Jonas Bonér */ private[akka] case class RemoteActorRef private[akka] ( @@ -1114,8 +1213,10 @@ private[akka] case class RemoteActorRef private[akka] ( uuuid: String, val className: String, val hostname: String, val port: Int, _timeout: Long) extends ActorRef { _uuid = uuuid + timeout = _timeout start + Thread.sleep(1000) val remoteClient = RemoteClient.clientFor(hostname, port) def postMessageToMailbox(message: Any, senderOption: Option[ActorRef]): Unit = { @@ -1127,7 +1228,7 @@ private[akka] case class RemoteActorRef private[akka] ( .setIsActor(true) .setIsOneWay(true) .setIsEscaped(false) - senderOption.foreach(sender => requestBuilder.setSender(sender.toProtocol)) + processSender(senderOption, requestBuilder) RemoteProtocolBuilder.setMessage(message, requestBuilder) remoteClient.send[Any](requestBuilder.build, None) } @@ -1144,15 +1245,12 @@ private[akka] case class RemoteActorRef private[akka] ( .setIsActor(true) .setIsOneWay(false) .setIsEscaped(false) - //senderOption.foreach(sender => requestBuilder.setSender(sender.toProtocol)) RemoteProtocolBuilder.setMessage(message, requestBuilder) val future = remoteClient.send(requestBuilder.build, senderFuture) if (future.isDefined) future.get else throw new IllegalStateException("Expected a future from remote call to actor " + toString) } - def timeout: Long = _timeout - def start: ActorRef = { _isRunning = true this @@ -1168,13 +1266,11 @@ private[akka] case class RemoteActorRef private[akka] ( def actorClass: Class[_ <: Actor] = unsupported def dispatcher_=(md: MessageDispatcher): Unit = unsupported def dispatcher: MessageDispatcher = unsupported + def makeTransactionRequired: Unit = unsupported def makeRemote(hostname: String, port: Int): Unit = unsupported def makeRemote(address: InetSocketAddress): Unit = unsupported - def makeTransactionRequired: Unit = unsupported - def setReplyToAddress(address: InetSocketAddress): Unit = unsupported - def id: String = unsupported + def homeAddress_=(address: InetSocketAddress): Unit = unsupported def remoteAddress: Option[InetSocketAddress] = unsupported - def timeout_=(t: Long) = unsupported def link(actorRef: ActorRef): Unit = unsupported def unlink(actorRef: ActorRef): Unit = unsupported def startLink(actorRef: ActorRef): Unit = unsupported @@ -1198,5 +1294,6 @@ private[akka] case class RemoteActorRef private[akka] ( protected[akka] def remoteAddress_=(addr: Option[InetSocketAddress]): Unit = unsupported protected[akka] def supervisor_=(sup: Option[ActorRef]): Unit = unsupported protected[this] def actorInstance: AtomicReference[Actor] = unsupported + private def unsupported = throw new UnsupportedOperationException("Not supported for RemoteActorRef") } diff --git a/akka-core/src/main/scala/actor/Agent.scala b/akka-core/src/main/scala/actor/Agent.scala index f949d94777..ac788b213a 100644 --- a/akka-core/src/main/scala/actor/Agent.scala +++ b/akka-core/src/main/scala/actor/Agent.scala @@ -98,39 +98,12 @@ class AgentException private[akka](message: String) extends RuntimeException(mes * @author Viktor Klang * @author Jonas Bonér */ -sealed class Agent[T] private (initialValue: T) extends Transactor { +sealed class Agent[T] private (initialValue: T) { import Agent._ import Actor._ -// _selfSenderRef = Some(newActor(() => this).start) - log.debug("Starting up Agent [%s]", uuid) - - private lazy val value = Ref[T]() - - self ! Value(initialValue) - - /** - * Periodically handles incoming messages. - */ - def receive = { - case Value(v: T) => - swap(v) - case Function(fun: (T => T)) => - swap(fun(value.getOrWait)) - case Procedure(proc: (T => Unit)) => - proc(copyStrategy(value.getOrElse(throw new AgentException("Could not read Agent's value; value is null")))) - } - - /** - * Specifies how a copy of the value is made, defaults to using identity. - */ - protected def copyStrategy(t: T): T = t - - /** - * Performs a CAS operation, atomically swapping the internal state with the value - * provided as a by-name parameter. - */ - private final def swap(newData: => T): Unit = value.swap(newData) + private val dispatcher = newActor(() => new AgentDispatcher[T](initialValue)).start + dispatcher ! Value(initialValue) /** * Submits a request to read the internal state. @@ -140,8 +113,9 @@ sealed class Agent[T] private (initialValue: T) extends Transactor { * method and then waits for its result on a CountDownLatch. */ final def get: T = { - if (self.isTransactionInScope) throw new AgentException( - "Can't call Agent.get within an enclosing transaction.\n\tWould block indefinitely.\n\tPlease refactor your code.") + if (dispatcher.isTransactionInScope) throw new AgentException( + "Can't call Agent.get within an enclosing transaction."+ + "\n\tWould block indefinitely.\n\tPlease refactor your code.") val ref = new AtomicReference[T] val latch = new CountDownLatch(1) sendProc((v: T) => {ref.set(v); latch.countDown}) @@ -159,22 +133,22 @@ sealed class Agent[T] private (initialValue: T) extends Transactor { /** * Submits the provided function for execution against the internal agent's state. */ - final def apply(message: (T => T)): Unit = self ! Function(message) + final def apply(message: (T => T)): Unit = dispatcher ! Function(message) /** * Submits a new value to be set as the new agent's internal state. */ - final def apply(message: T): Unit = self ! Value(message) + final def apply(message: T): Unit = dispatcher ! Value(message) /** * Submits the provided function of type 'T => T' for execution against the internal agent's state. */ - final def send(message: (T) => T): Unit = self ! Function(message) + final def send(message: (T) => T): Unit = dispatcher ! Function(message) /** * Submits a new value to be set as the new agent's internal state. */ - final def send(message: T): Unit = self ! Value(message) + final def send(message: T): Unit = dispatcher ! Value(message) /** * Asynchronously submits a procedure of type 'T => Unit' to read the internal state. @@ -182,7 +156,7 @@ sealed class Agent[T] private (initialValue: T) extends Transactor { * of the internal state will be used, depending on the underlying effective copyStrategy. * Does not change the value of the agent (this). */ - final def sendProc(f: (T) => Unit): Unit = self ! Procedure(f) + final def sendProc(f: (T) => Unit): Unit = dispatcher ! Procedure(f) /** * Applies function with type 'T => B' to the agent's internal state and then returns a new agent with the result. @@ -207,7 +181,7 @@ sealed class Agent[T] private (initialValue: T) extends Transactor { * * A closed agent can never be used again. */ - def close = stop + def close = dispatcher.stop } /** @@ -221,20 +195,44 @@ object Agent { /* * The internal messages for passing around requests. */ - private case class Value[T](value: T) - private case class Function[T](fun: ((T) => T)) - private case class Procedure[T](fun: ((T) => Unit)) + private[akka] case class Value[T](value: T) + private[akka] case class Function[T](fun: ((T) => T)) + private[akka] case class Procedure[T](fun: ((T) => Unit)) /** * Creates a new Agent of type T with the initial value of value. */ def apply[T](value: T): Agent[T] = new Agent(value) +} + +/** + * Agent dispatcher Actor. + * + * @author Jonas Bonér + */ +final class AgentDispatcher[T] private[akka] (initialValue: T) extends Transactor { + import Agent._ + import Actor._ + log.debug("Starting up Agent [%s]", self.uuid) + + private lazy val value = Ref[T]() + + /** + * Periodically handles incoming messages. + */ + def receive = { + case Value(v: T) => + swap(v) + case Function(fun: (T => T)) => + swap(fun(value.getOrWait)) + case Procedure(proc: (T => Unit)) => + proc(value.getOrElse(throw new AgentException("Could not read Agent's value; value is null"))) + } /** - * Creates a new Agent of type T with the initial value of value and with the - * specified copy function. + * Performs a CAS operation, atomically swapping the internal state with the value + * provided as a by-name parameter. */ - def apply[T](value: T, newCopyStrategy: (T) => T) = new Agent(value) { - override def copyStrategy(t: T) = newCopyStrategy(t) - } + private final def swap(newData: => T): Unit = value.swap(newData) } + diff --git a/akka-core/src/main/scala/actor/Scheduler.scala b/akka-core/src/main/scala/actor/Scheduler.scala index 8bb6bbcd19..d3429ef446 100644 --- a/akka-core/src/main/scala/actor/Scheduler.scala +++ b/akka-core/src/main/scala/actor/Scheduler.scala @@ -27,7 +27,7 @@ case class SchedulerException(msg: String, e: Throwable) extends RuntimeExceptio * which is licensed under the Apache 2 License. */ class ScheduleActor(val receiver: ActorRef, val future: ScheduledFuture[AnyRef]) extends Actor with Logging { - lifeCycle = Some(LifeCycle(Permanent)) + self.lifeCycle = Some(LifeCycle(Permanent)) def receive = { case UnSchedule => @@ -42,12 +42,12 @@ object Scheduler extends Actor { private var service = Executors.newSingleThreadScheduledExecutor(SchedulerThreadFactory) private val schedulers = new ConcurrentHashMap[ActorRef, ActorRef] - faultHandler = Some(OneForOneStrategy(5, 5000)) - trapExit = List(classOf[Throwable]) + self.faultHandler = Some(OneForOneStrategy(5, 5000)) + self.trapExit = List(classOf[Throwable]) def schedule(receiver: ActorRef, message: AnyRef, initialDelay: Long, delay: Long, timeUnit: TimeUnit) = { try { - startLink(newActor(() => new ScheduleActor( + self.startLink(newActor(() => new ScheduleActor( receiver, service.scheduleAtFixedRate(new java.lang.Runnable { def run = receiver ! message; @@ -60,7 +60,7 @@ object Scheduler extends Actor { def restart = service = Executors.newSingleThreadScheduledExecutor(SchedulerThreadFactory) def stopSupervising(actorRef: ActorRef) = { - unlink(actorRef) + self.unlink(actorRef) schedulers.remove(actorRef) } diff --git a/akka-core/src/main/scala/actor/Supervisor.scala b/akka-core/src/main/scala/actor/Supervisor.scala index 9e2bbe8e62..e366f102d5 100644 --- a/akka-core/src/main/scala/actor/Supervisor.scala +++ b/akka-core/src/main/scala/actor/Supervisor.scala @@ -164,7 +164,8 @@ class SupervisorFactory private[akka] (val config: SupervisorConfig) extends Log * * @author Jonas Bonér */ -sealed class Supervisor private[akka] (handler: FaultHandlingStrategy, trapExceptions: List[Class[_ <: Throwable]]) +sealed class Supervisor private[akka] ( + handler: FaultHandlingStrategy, trapExceptions: List[Class[_ <: Throwable]]) extends Configurator { private val children = new ConcurrentHashMap[String, List[ActorRef]] @@ -202,7 +203,7 @@ sealed class Supervisor private[akka] (handler: FaultHandlingStrategy, trapExcep else list } children.put(className, actorRef :: currentActors) - actorRef.actor.lifeCycle = Some(lifeCycle) + actorRef.lifeCycle = Some(lifeCycle) supervisor ! Link(actorRef) remoteAddress.foreach(address => RemoteServer.actorsFor( RemoteServer.Address(address.hostname, address.port)) @@ -241,16 +242,16 @@ final class SupervisorActor private[akka] ( handler: FaultHandlingStrategy, trapExceptions: List[Class[_ <: Throwable]]) extends Actor { - - trapExit = trapExceptions - faultHandler = Some(handler) + self.dispatcher = Dispatchers.newThreadBasedDispatcher(self) + self.trapExit = trapExceptions + self.faultHandler = Some(handler) override def shutdown: Unit = self.shutdownLinkedActors def receive = { - case Link(child) => startLink(child) - case Unlink(child) => unlink(child) - case UnlinkAndStop(child) => unlink(child); child.stop + case Link(child) => self.startLink(child) + case Unlink(child) => self.unlink(child) + case UnlinkAndStop(child) => self.unlink(child); child.stop case unknown => throw new IllegalArgumentException( "Supervisor can only respond to 'Link' and 'Unlink' messages. Unknown message [" + unknown + "]") } diff --git a/akka-core/src/main/scala/dispatch/Dispatchers.scala b/akka-core/src/main/scala/dispatch/Dispatchers.scala index 411ab297ea..6fa9ebd418 100644 --- a/akka-core/src/main/scala/dispatch/Dispatchers.scala +++ b/akka-core/src/main/scala/dispatch/Dispatchers.scala @@ -80,5 +80,5 @@ object Dispatchers { *

* E.g. each actor consumes its own thread. */ - def newThreadBasedDispatcher(actor: Actor) = new ThreadBasedDispatcher(actor) + def newThreadBasedDispatcher(actor: ActorRef) = new ThreadBasedDispatcher(actor) } diff --git a/akka-core/src/main/scala/dispatch/ThreadBasedDispatcher.scala b/akka-core/src/main/scala/dispatch/ThreadBasedDispatcher.scala index b5aaa3380c..d13b41e574 100644 --- a/akka-core/src/main/scala/dispatch/ThreadBasedDispatcher.scala +++ b/akka-core/src/main/scala/dispatch/ThreadBasedDispatcher.scala @@ -14,11 +14,10 @@ import se.scalablesolutions.akka.actor.{Actor, ActorRef, ActorMessageInvoker} * * @author Jonas Bonér */ -class ThreadBasedDispatcher private[akka] (val name: String, val messageHandler: MessageInvoker) - extends MessageDispatcher { - - def this(actor: Actor) = this(actor.getClass.getName, new ActorMessageInvoker(Actor.newActor(() => actor))) - +class ThreadBasedDispatcher(actor: ActorRef) extends MessageDispatcher { + private val name = actor.getClass.getName + ":" + actor.uuid + private val threadName = "thread-based:dispatcher:" + name + private val messageHandler = new ActorMessageInvoker(actor) private val queue = new BlockingMessageQueue(name) private var selectorThread: Thread = _ @volatile private var active: Boolean = false @@ -27,7 +26,7 @@ class ThreadBasedDispatcher private[akka] (val name: String, val messageHandler: def start = if (!active) { active = true - selectorThread = new Thread { + selectorThread = new Thread(threadName) { override def run = { while (active) { try { diff --git a/akka-core/src/main/scala/dispatch/ThreadPoolBuilder.scala b/akka-core/src/main/scala/dispatch/ThreadPoolBuilder.scala index 1fedc1a5d7..a5f097a84b 100644 --- a/akka-core/src/main/scala/dispatch/ThreadPoolBuilder.scala +++ b/akka-core/src/main/scala/dispatch/ThreadPoolBuilder.scala @@ -14,7 +14,7 @@ import se.scalablesolutions.akka.util.Logging trait ThreadPoolBuilder { val name: String - private val NR_START_THREADS = 4 + private val NR_START_THREADS = 16 private val NR_MAX_THREADS = 128 private val KEEP_ALIVE_TIME = 60000L // default is one minute private val MILLISECONDS = TimeUnit.MILLISECONDS diff --git a/akka-core/src/main/scala/remote/RemoteClient.scala b/akka-core/src/main/scala/remote/RemoteClient.scala index 6888d2c6c7..57b3eb7048 100644 --- a/akka-core/src/main/scala/remote/RemoteClient.scala +++ b/akka-core/src/main/scala/remote/RemoteClient.scala @@ -182,6 +182,7 @@ class RemoteClient(val hostname: String, val port: Int) extends Logging { futures.synchronized { val futureResult = if (senderFuture.isDefined) senderFuture.get else new DefaultCompletableFuture[T](request.getTimeout) + println("------ SETTING ID: " + request.getId + " " + name) futures.put(request.getId, futureResult) connection.getChannel.write(request) Some(futureResult) @@ -263,6 +264,7 @@ class RemoteClientHandler(val name: String, if (result.isInstanceOf[RemoteReplyProtocol]) { val reply = result.asInstanceOf[RemoteReplyProtocol] log.debug("Remote client received RemoteReplyProtocol[\n%s]", reply.toString) + println("------ GETTING ID: " + reply.getId + " " + name) val future = futures.get(reply.getId).asInstanceOf[CompletableFuture[Any]] if (reply.getIsSuccessful) { val message = RemoteProtocolBuilder.getMessage(reply) diff --git a/akka-core/src/main/scala/remote/RemoteServer.scala b/akka-core/src/main/scala/remote/RemoteServer.scala index 5dfde04747..1d0e67613e 100644 --- a/akka-core/src/main/scala/remote/RemoteServer.scala +++ b/akka-core/src/main/scala/remote/RemoteServer.scala @@ -112,6 +112,9 @@ object RemoteServer { else Some(server) } + private[akka] def serverFor(address: InetSocketAddress): Option[RemoteServer] = + serverFor(address.getHostName, address.getPort) + private[remote] def register(hostname: String, port: Int, server: RemoteServer) = remoteServers.put(Address(hostname, port), server) @@ -154,13 +157,22 @@ class RemoteServer extends Logging { def isRunning = _isRunning - def start: Unit = start(None) + def start: RemoteServer = + start(hostname, port, None) - def start(loader: Option[ClassLoader]): Unit = start(hostname, port, loader) + def start(loader: Option[ClassLoader]): RemoteServer = + start(hostname, port, loader) - def start(_hostname: String, _port: Int): Unit = start(_hostname, _port, None) + def start(address: InetSocketAddress): RemoteServer = + start(address.getHostName, address.getPort, None) - def start(_hostname: String, _port: Int, loader: Option[ClassLoader]): Unit = synchronized { + def start(address: InetSocketAddress, loader: Option[ClassLoader]): RemoteServer = + start(address.getHostName, address.getPort, loader) + + def start(_hostname: String, _port: Int): RemoteServer = + start(_hostname, _port, None) + + def start(_hostname: String, _port: Int, loader: Option[ClassLoader]): RemoteServer = synchronized { try { if (!_isRunning) { hostname = _hostname @@ -182,6 +194,7 @@ class RemoteServer extends Logging { } catch { case e => log.error(e, "Could not start up remote server") } + this } def shutdown = synchronized { @@ -334,7 +347,7 @@ class RemoteServerHandler( val actorRef = createActor(request.getTarget, request.getUuid, request.getTimeout) actorRef.start val message = RemoteProtocolBuilder.getMessage(request) - if (request.getIsOneWay) { + if (request.hasSender) { val sender = request.getSender if (sender ne null) actorRef.!(message)(Some(ActorRef.fromProtocol(sender))) } else { diff --git a/akka-core/src/main/scala/stm/DataFlowVariable.scala b/akka-core/src/main/scala/stm/DataFlowVariable.scala index d5e3901fa7..bc65d9d5cd 100644 --- a/akka-core/src/main/scala/stm/DataFlowVariable.scala +++ b/akka-core/src/main/scala/stm/DataFlowVariable.scala @@ -46,7 +46,7 @@ import se.scalablesolutions.akka.dispatch.CompletableFuture extends Actor { def receive = { case Exit => exit - case message => reply(body(message.asInstanceOf[A])) + case message => self.reply(body(message.asInstanceOf[A])) } } @@ -64,7 +64,7 @@ import se.scalablesolutions.akka.dispatch.CompletableFuture private val blockedReaders = new ConcurrentLinkedQueue[ActorRef] private class In[T <: Any](dataFlow: DataFlowVariable[T]) extends Actor { - timeout = TIME_OUT + self.timeout = TIME_OUT def receive = { case Set(v) => if (dataFlow.value.compareAndSet(None, Some(v.asInstanceOf[T]))) { @@ -78,13 +78,13 @@ import se.scalablesolutions.akka.dispatch.CompletableFuture } private class Out[T <: Any](dataFlow: DataFlowVariable[T]) extends Actor { - timeout = TIME_OUT + self.timeout = TIME_OUT private var readerFuture: Option[CompletableFuture[T]] = None def receive = { case Get => val ref = dataFlow.value.get if (ref.isDefined) - reply(ref.get) + self.reply(ref.get) else { readerFuture = self.replyTo match { case Some(Right(future)) => Some(future.asInstanceOf[CompletableFuture[T]]) @@ -348,7 +348,6 @@ object Test4 extends Application { // ======================================= object Test5 extends Application { - import Actor.Sender.Self import DataFlow._ // create four 'Int' data flow variables diff --git a/akka-core/src/main/scala/util/ReadWriteLock.scala b/akka-core/src/main/scala/util/ReadWriteLock.scala deleted file mode 100644 index 9dd7a27b07..0000000000 --- a/akka-core/src/main/scala/util/ReadWriteLock.scala +++ /dev/null @@ -1,35 +0,0 @@ -/** - * Copyright (C) 2009-2010 Scalable Solutions AB - */ - -package se.scalablesolutions.akka.util - -import java.util.concurrent.locks.ReentrantReadWriteLock - -/** - * @author Jonas Bonér - */ -class ReadWriteLock { - private val rwl = new ReentrantReadWriteLock - private val readLock = rwl.readLock - private val writeLock = rwl.writeLock - - def withWriteLock[T](body: => T): T = { - writeLock.lock - try { - body - } finally { - writeLock.unlock - } - } - - def withReadLock[T](body: => T): T = { - readLock.lock - try { - body - } finally { - readLock.unlock - } - } -} - diff --git a/akka-core/src/test/scala/ActorFireForgetRequestReplySpec.scala b/akka-core/src/test/scala/ActorFireForgetRequestReplySpec.scala index ed6517de8b..5531c2fbe7 100644 --- a/akka-core/src/test/scala/ActorFireForgetRequestReplySpec.scala +++ b/akka-core/src/test/scala/ActorFireForgetRequestReplySpec.scala @@ -9,16 +9,18 @@ import Actor._ object ActorFireForgetRequestReplySpec { class ReplyActor extends Actor { - dispatcher = Dispatchers.newThreadBasedDispatcher(this) + self.dispatcher = Dispatchers.newThreadBasedDispatcher(self) def receive = { - case "Send" => reply("Reply") - case "SendImplicit" => self.replyTo.get.left.get ! "ReplyImplicit" + case "Send" => + self.reply("Reply") + case "SendImplicit" => + self.replyTo.get.left.get ! "ReplyImplicit" } } class SenderActor(replyActor: ActorRef) extends Actor { - dispatcher = Dispatchers.newThreadBasedDispatcher(this) + self.dispatcher = Dispatchers.newThreadBasedDispatcher(self) def receive = { case "Init" => replyActor ! "Send" @@ -42,12 +44,11 @@ object ActorFireForgetRequestReplySpec { class ActorFireForgetRequestReplySpec extends JUnitSuite { import ActorFireForgetRequestReplySpec._ - + @Test def shouldReplyToBangMessageUsingReply = { state.finished.reset - val replyActor = newActor[ReplyActor] - replyActor.start + val replyActor = newActor[ReplyActor].start val senderActor = newActor(() => new SenderActor(replyActor)) senderActor.start senderActor ! "Init" @@ -59,10 +60,8 @@ class ActorFireForgetRequestReplySpec extends JUnitSuite { @Test def shouldReplyToBangMessageUsingImplicitSender = { state.finished.reset - val replyActor = newActor[ReplyActor] - replyActor.start - val senderActor = newActor(() => new SenderActor(replyActor)) - senderActor.start + val replyActor = newActor[ReplyActor].start + val senderActor = newActor(() => new SenderActor(replyActor)).start senderActor ! "InitImplicit" try { state.finished.await(1L, TimeUnit.SECONDS) } catch { case e: TimeoutException => fail("Never got the message") } diff --git a/akka-core/src/test/scala/ActorRegistrySpec.scala b/akka-core/src/test/scala/ActorRegistrySpec.scala index 8027c99655..9dcba44e8c 100644 --- a/akka-core/src/test/scala/ActorRegistrySpec.scala +++ b/akka-core/src/test/scala/ActorRegistrySpec.scala @@ -7,11 +7,11 @@ import Actor._ object ActorRegistrySpec { var record = "" class TestActor extends Actor { - id = "MyID" + self.id = "MyID" def receive = { case "ping" => record = "pong" + record - reply("got ping") + self.reply("got ping") } } } @@ -26,7 +26,7 @@ class ActorRegistrySpec extends JUnitSuite { val actors = ActorRegistry.actorsFor("MyID") assert(actors.size === 1) assert(actors.head.actor.isInstanceOf[TestActor]) - assert(actors.head.actor.asInstanceOf[TestActor].id === "MyID") + assert(actors.head.id === "MyID") actor.stop } @@ -48,7 +48,7 @@ class ActorRegistrySpec extends JUnitSuite { val actors = ActorRegistry.actorsFor(classOf[TestActor]) assert(actors.size === 1) assert(actors.head.actor.isInstanceOf[TestActor]) - assert(actors.head.actor.asInstanceOf[TestActor].id === "MyID") + assert(actors.head.id === "MyID") actor.stop } @@ -59,7 +59,7 @@ class ActorRegistrySpec extends JUnitSuite { val actors = ActorRegistry.actorsFor[TestActor] assert(actors.size === 1) assert(actors.head.actor.isInstanceOf[TestActor]) - assert(actors.head.actor.asInstanceOf[TestActor].id === "MyID") + assert(actors.head.id === "MyID") actor.stop } @@ -72,9 +72,9 @@ class ActorRegistrySpec extends JUnitSuite { val actors = ActorRegistry.actorsFor("MyID") assert(actors.size === 2) assert(actors.head.actor.isInstanceOf[TestActor]) - assert(actors.head.actor.asInstanceOf[TestActor].id === "MyID") + assert(actors.head.id === "MyID") assert(actors.last.actor.isInstanceOf[TestActor]) - assert(actors.last.actor.asInstanceOf[TestActor].id === "MyID") + assert(actors.last.id === "MyID") actor1.stop actor2.stop } @@ -88,9 +88,9 @@ class ActorRegistrySpec extends JUnitSuite { val actors = ActorRegistry.actorsFor(classOf[TestActor]) assert(actors.size === 2) assert(actors.head.actor.isInstanceOf[TestActor]) - assert(actors.head.actor.asInstanceOf[TestActor].id === "MyID") + assert(actors.head.id === "MyID") assert(actors.last.actor.isInstanceOf[TestActor]) - assert(actors.last.actor.asInstanceOf[TestActor].id === "MyID") + assert(actors.last.id === "MyID") actor1.stop actor2.stop } @@ -104,9 +104,9 @@ class ActorRegistrySpec extends JUnitSuite { val actors = ActorRegistry.actorsFor[TestActor] assert(actors.size === 2) assert(actors.head.actor.isInstanceOf[TestActor]) - assert(actors.head.actor.asInstanceOf[TestActor].id === "MyID") + assert(actors.head.id === "MyID") assert(actors.last.actor.isInstanceOf[TestActor]) - assert(actors.last.actor.asInstanceOf[TestActor].id === "MyID") + assert(actors.last.id === "MyID") actor1.stop actor2.stop } @@ -120,9 +120,9 @@ class ActorRegistrySpec extends JUnitSuite { val actors = ActorRegistry.actors assert(actors.size === 2) assert(actors.head.actor.isInstanceOf[TestActor]) - assert(actors.head.actor.asInstanceOf[TestActor].id === "MyID") + assert(actors.head.id === "MyID") assert(actors.last.actor.isInstanceOf[TestActor]) - assert(actors.last.actor.asInstanceOf[TestActor].id === "MyID") + assert(actors.last.id === "MyID") actor1.stop actor2.stop } diff --git a/akka-core/src/test/scala/AgentSpec.scala b/akka-core/src/test/scala/AgentSpec.scala index aa089a9fd4..c4bc3e44d0 100644 --- a/akka-core/src/test/scala/AgentSpec.scala +++ b/akka-core/src/test/scala/AgentSpec.scala @@ -23,7 +23,7 @@ class AgentSpec extends junit.framework.TestCase with Suite with MustMatchers { agent send (_ * 2) val result = agent() result must be(12) - agent.stop + agent.close } @Test def testSendValue = { @@ -31,7 +31,7 @@ class AgentSpec extends junit.framework.TestCase with Suite with MustMatchers { agent send 6 val result = agent() result must be(6) - agent.stop + agent.close } @Test def testSendProc = { @@ -42,7 +42,7 @@ class AgentSpec extends junit.framework.TestCase with Suite with MustMatchers { agent sendProc { e => result += e; latch.countDown } assert(latch.await(5, TimeUnit.SECONDS)) result must be(10) - agent.stop + agent.close } @Test def testOneAgentsendWithinEnlosingTransactionSuccess = { diff --git a/akka-core/src/test/scala/ClientInitiatedRemoteActorSpec.scala b/akka-core/src/test/scala/ClientInitiatedRemoteActorSpec.scala index 8dcf19ce71..e412261b80 100644 --- a/akka-core/src/test/scala/ClientInitiatedRemoteActorSpec.scala +++ b/akka-core/src/test/scala/ClientInitiatedRemoteActorSpec.scala @@ -16,7 +16,7 @@ object RemoteActorSpecActorUnidirectional { val latch = new CountDownLatch(1) } class RemoteActorSpecActorUnidirectional extends Actor { - dispatcher = Dispatchers.newThreadBasedDispatcher(this) + self.dispatcher = Dispatchers.newThreadBasedDispatcher(self) def receive = { case "OneWay" => @@ -27,7 +27,7 @@ class RemoteActorSpecActorUnidirectional extends Actor { class RemoteActorSpecActorBidirectional extends Actor { def receive = { case "Hello" => - reply("World") + self.reply("World") case "Failure" => throw new RuntimeException("expected") } @@ -36,7 +36,7 @@ class RemoteActorSpecActorBidirectional extends Actor { class SendOneWayAndReplyReceiverActor extends Actor { def receive = { case "Hello" => - reply("World") + self.reply("World") } } @@ -64,15 +64,11 @@ class ClientInitiatedRemoteActorSpec extends JUnitSuite { val PORT1 = 9990 val PORT2 = 9991 var s1: RemoteServer = null - var s2: RemoteServer = null @Before def init() { s1 = new RemoteServer() - s2 = new RemoteServer() - s1.start(HOSTNAME, PORT1) - s2.start(HOSTNAME, PORT2) Thread.sleep(1000) } @@ -83,7 +79,6 @@ class ClientInitiatedRemoteActorSpec extends JUnitSuite { @After def finished() { s1.shutdown - s2.shutdown RemoteClient.shutdownAll Thread.sleep(1000) } @@ -104,7 +99,7 @@ class ClientInitiatedRemoteActorSpec extends JUnitSuite { actor.makeRemote(HOSTNAME, PORT1) actor.start val sender = newActor[SendOneWayAndReplySenderActor] - sender.setReplyToAddress(HOSTNAME, PORT2) + sender.homeAddress = (HOSTNAME, PORT2) sender.actor.asInstanceOf[SendOneWayAndReplySenderActor].sendTo = actor sender.start sender.actor.asInstanceOf[SendOneWayAndReplySenderActor].sendOff diff --git a/akka-core/src/test/scala/ExecutorBasedEventDrivenDispatcherActorSpec.scala b/akka-core/src/test/scala/ExecutorBasedEventDrivenDispatcherActorSpec.scala index e03427cc94..aa5a16f0f6 100644 --- a/akka-core/src/test/scala/ExecutorBasedEventDrivenDispatcherActorSpec.scala +++ b/akka-core/src/test/scala/ExecutorBasedEventDrivenDispatcherActorSpec.scala @@ -8,10 +8,10 @@ import Actor._ object ExecutorBasedEventDrivenDispatcherActorSpec { class TestActor extends Actor { - dispatcher = Dispatchers.newExecutorBasedEventDrivenDispatcher(uuid) + self.dispatcher = Dispatchers.newExecutorBasedEventDrivenDispatcher(self.uuid) def receive = { case "Hello" => - reply("World") + self.reply("World") case "Failure" => throw new RuntimeException("expected") } @@ -21,7 +21,7 @@ object ExecutorBasedEventDrivenDispatcherActorSpec { val oneWay = new CountDownLatch(1) } class OneWayTestActor extends Actor { - dispatcher = Dispatchers.newExecutorBasedEventDrivenDispatcher(uuid) + self.dispatcher = Dispatchers.newExecutorBasedEventDrivenDispatcher(self.uuid) def receive = { case "OneWay" => OneWayTestActor.oneWay.countDown } diff --git a/akka-core/src/test/scala/ExecutorBasedEventDrivenDispatcherActorsSpec.scala b/akka-core/src/test/scala/ExecutorBasedEventDrivenDispatcherActorsSpec.scala index a019bee3b2..ac1421dcdd 100644 --- a/akka-core/src/test/scala/ExecutorBasedEventDrivenDispatcherActorsSpec.scala +++ b/akka-core/src/test/scala/ExecutorBasedEventDrivenDispatcherActorsSpec.scala @@ -14,8 +14,8 @@ import Actor._ */ class ExecutorBasedEventDrivenDispatcherActorsSpec extends JUnitSuite with MustMatchers { class SlowActor(finishedCounter: CountDownLatch) extends Actor { - dispatcher = Dispatchers.globalExecutorBasedEventDrivenDispatcher - id = "SlowActor" + self.dispatcher = Dispatchers.globalExecutorBasedEventDrivenDispatcher + self.id = "SlowActor" def receive = { case x: Int => { @@ -26,8 +26,8 @@ class ExecutorBasedEventDrivenDispatcherActorsSpec extends JUnitSuite with MustM } class FastActor(finishedCounter: CountDownLatch) extends Actor { - dispatcher = Dispatchers.globalExecutorBasedEventDrivenDispatcher - id = "FastActor" + self.dispatcher = Dispatchers.globalExecutorBasedEventDrivenDispatcher + self.id = "FastActor" def receive = { case x: Int => { diff --git a/akka-core/src/test/scala/ExecutorBasedEventDrivenWorkStealingDispatcherSpec.scala b/akka-core/src/test/scala/ExecutorBasedEventDrivenWorkStealingDispatcherSpec.scala index e8ef3a35ff..7aee353e38 100644 --- a/akka-core/src/test/scala/ExecutorBasedEventDrivenWorkStealingDispatcherSpec.scala +++ b/akka-core/src/test/scala/ExecutorBasedEventDrivenWorkStealingDispatcherSpec.scala @@ -16,9 +16,9 @@ object ExecutorBasedEventDrivenWorkStealingDispatcherSpec { val parentActorDispatcher = Dispatchers.newExecutorBasedEventDrivenWorkStealingDispatcher("pooled-dispatcher") class DelayableActor(name: String, delay: Int, finishedCounter: CountDownLatch) extends Actor { - dispatcher = delayableActorDispatcher + self.dispatcher = delayableActorDispatcher var invocationCount = 0 - id = name + self.id = name def receive = { case x: Int => { @@ -30,17 +30,17 @@ object ExecutorBasedEventDrivenWorkStealingDispatcherSpec { } class FirstActor extends Actor { - dispatcher = sharedActorDispatcher + self.dispatcher = sharedActorDispatcher def receive = {case _ => {}} } class SecondActor extends Actor { - dispatcher = sharedActorDispatcher + self.dispatcher = sharedActorDispatcher def receive = {case _ => {}} } class ParentActor extends Actor { - dispatcher = parentActorDispatcher + self.dispatcher = parentActorDispatcher def receive = {case _ => {}} } diff --git a/akka-core/src/test/scala/ForwardActorSpec.scala b/akka-core/src/test/scala/ForwardActorSpec.scala index 7722eccc99..f6e3e26e86 100644 --- a/akka-core/src/test/scala/ForwardActorSpec.scala +++ b/akka-core/src/test/scala/ForwardActorSpec.scala @@ -18,7 +18,7 @@ object ForwardActorSpec { ForwardState.sender = Some(self.replyTo.get.left.get) latch.countDown } - case "SendBangBang" => reply("SendBangBang") + case "SendBangBang" => self.reply("SendBangBang") } } diff --git a/akka-core/src/test/scala/FutureSpec.scala b/akka-core/src/test/scala/FutureSpec.scala index 957e845750..2e30689ff9 100644 --- a/akka-core/src/test/scala/FutureSpec.scala +++ b/akka-core/src/test/scala/FutureSpec.scala @@ -9,7 +9,7 @@ object FutureSpec { class TestActor extends Actor { def receive = { case "Hello" => - reply("World") + self.reply("World") case "NoReply" => {} case "Failure" => throw new RuntimeException("expected") diff --git a/akka-core/src/test/scala/InMemoryActorSpec.scala b/akka-core/src/test/scala/InMemoryActorSpec.scala index c9380eb34f..18ca946592 100644 --- a/akka-core/src/test/scala/InMemoryActorSpec.scala +++ b/akka-core/src/test/scala/InMemoryActorSpec.scala @@ -26,11 +26,10 @@ case class FailureOneWay(key: String, value: String, failer: ActorRef) case object GetNotifier -class InMemStatefulActor(expectedInvocationCount: Int) extends Actor { +class InMemStatefulActor(expectedInvocationCount: Int) extends Transactor { def this() = this(0) - timeout = 5000 - makeTransactionRequired - + self.timeout = 5000 + val notifier = new CountDownLatch(expectedInvocationCount) private lazy val mapState = TransactionalState.newMap[String, String] @@ -39,40 +38,40 @@ class InMemStatefulActor(expectedInvocationCount: Int) extends Actor { def receive = { case GetNotifier => - reply(notifier) + self.reply(notifier) case GetMapState(key) => - reply(mapState.get(key).get) + self.reply(mapState.get(key).get) notifier.countDown case GetVectorSize => - reply(vectorState.length.asInstanceOf[AnyRef]) + self.reply(vectorState.length.asInstanceOf[AnyRef]) notifier.countDown case GetRefState => - reply(refState.get.get) + self.reply(refState.get.get) notifier.countDown case SetMapState(key, msg) => mapState.put(key, msg) - reply(msg) + self.reply(msg) notifier.countDown case SetVectorState(msg) => vectorState.add(msg) - reply(msg) + self.reply(msg) notifier.countDown case SetRefState(msg) => refState.swap(msg) - reply(msg) + self.reply(msg) notifier.countDown case Success(key, msg) => mapState.put(key, msg) vectorState.add(msg) refState.swap(msg) - reply(msg) + self.reply(msg) notifier.countDown case Failure(key, msg, failer) => mapState.put(key, msg) vectorState.add(msg) refState.swap(msg) failer !! "Failure" - reply(msg) + self.reply(msg) notifier.countDown case SetMapStateOneWay(key, msg) => @@ -99,8 +98,8 @@ class InMemStatefulActor(expectedInvocationCount: Int) extends Actor { } @serializable -class InMemFailerActor extends Actor { - makeTransactionRequired +class InMemFailerActor extends Transactor { + def receive = { case "Failure" => throw new RuntimeException("expected") diff --git a/akka-core/src/test/scala/ProtobufActorMessageSerializationSpec.scala b/akka-core/src/test/scala/ProtobufActorMessageSerializationSpec.scala index b7ab96b0d3..22fed5f061 100644 --- a/akka-core/src/test/scala/ProtobufActorMessageSerializationSpec.scala +++ b/akka-core/src/test/scala/ProtobufActorMessageSerializationSpec.scala @@ -30,7 +30,7 @@ object ProtobufActorMessageSerializationSpec { def receive = { case pojo: ProtobufPOJO => val id = pojo.getId - reply(id + 1) + self.reply(id + 1) case msg => throw new RuntimeException("Expected a ProtobufPOJO message but got: " + msg) } diff --git a/akka-core/src/test/scala/ReactorBasedSingleThreadEventDrivenDispatcherActorSpec.scala b/akka-core/src/test/scala/ReactorBasedSingleThreadEventDrivenDispatcherActorSpec.scala index acae2ef607..3679736c94 100644 --- a/akka-core/src/test/scala/ReactorBasedSingleThreadEventDrivenDispatcherActorSpec.scala +++ b/akka-core/src/test/scala/ReactorBasedSingleThreadEventDrivenDispatcherActorSpec.scala @@ -9,11 +9,11 @@ import se.scalablesolutions.akka.dispatch.Dispatchers object ReactorBasedSingleThreadEventDrivenDispatcherActorSpec { class TestActor extends Actor { - dispatcher = Dispatchers.newReactorBasedSingleThreadEventDrivenDispatcher(uuid) + self.dispatcher = Dispatchers.newReactorBasedSingleThreadEventDrivenDispatcher(self.uuid) def receive = { case "Hello" => - reply("World") + self.reply("World") case "Failure" => throw new RuntimeException("expected") } @@ -23,7 +23,7 @@ object ReactorBasedSingleThreadEventDrivenDispatcherActorSpec { val oneWay = new CountDownLatch(1) } class OneWayTestActor extends Actor { - dispatcher = Dispatchers.newExecutorBasedEventDrivenDispatcher(uuid) + self.dispatcher = Dispatchers.newExecutorBasedEventDrivenDispatcher(self.uuid) def receive = { case "OneWay" => OneWayTestActor.oneWay.countDown } diff --git a/akka-core/src/test/scala/ReactorBasedThreadPoolEventDrivenDispatcherActorSpec.scala b/akka-core/src/test/scala/ReactorBasedThreadPoolEventDrivenDispatcherActorSpec.scala index 322976cc0f..0681aaa534 100644 --- a/akka-core/src/test/scala/ReactorBasedThreadPoolEventDrivenDispatcherActorSpec.scala +++ b/akka-core/src/test/scala/ReactorBasedThreadPoolEventDrivenDispatcherActorSpec.scala @@ -9,10 +9,10 @@ import Actor._ object ReactorBasedThreadPoolEventDrivenDispatcherActorSpec { class TestActor extends Actor { - dispatcher = Dispatchers.newReactorBasedThreadPoolEventDrivenDispatcher(uuid) + self.dispatcher = Dispatchers.newReactorBasedThreadPoolEventDrivenDispatcher(self.uuid) def receive = { case "Hello" => - reply("World") + self.reply("World") case "Failure" => throw new RuntimeException("expected") } @@ -27,7 +27,7 @@ class ReactorBasedThreadPoolEventDrivenDispatcherActorSpec extends JUnitSuite { @Test def shouldSendOneWay { val oneWay = new CountDownLatch(1) val actor = newActor(() => new Actor { - dispatcher = Dispatchers.newReactorBasedThreadPoolEventDrivenDispatcher(uuid) + self.dispatcher = Dispatchers.newReactorBasedThreadPoolEventDrivenDispatcher(self.uuid) def receive = { case "OneWay" => oneWay.countDown } diff --git a/akka-core/src/test/scala/RemoteSupervisorSpec.scala b/akka-core/src/test/scala/RemoteSupervisorSpec.scala index e01ac11165..23b82ca187 100644 --- a/akka-core/src/test/scala/RemoteSupervisorSpec.scala +++ b/akka-core/src/test/scala/RemoteSupervisorSpec.scala @@ -21,11 +21,11 @@ object Log { } @serializable class RemotePingPong1Actor extends Actor { - dispatcher = Dispatchers.newThreadBasedDispatcher(this) + self.dispatcher = Dispatchers.newThreadBasedDispatcher(self) def receive = { case BinaryString("Ping") => Log.messageLog.put("ping") - reply("pong") + self.reply("pong") case OneWay => Log.oneWayLog += "oneway" @@ -40,11 +40,11 @@ object Log { } @serializable class RemotePingPong2Actor extends Actor { - dispatcher = Dispatchers.newThreadBasedDispatcher(this) + self.dispatcher = Dispatchers.newThreadBasedDispatcher(self) def receive = { case BinaryString("Ping") => Log.messageLog.put("ping") - reply("pong") + self.reply("pong") case BinaryString("Die") => throw new RuntimeException("DIE") } @@ -55,11 +55,11 @@ object Log { } @serializable class RemotePingPong3Actor extends Actor { - dispatcher = Dispatchers.newThreadBasedDispatcher(this) + self.dispatcher = Dispatchers.newThreadBasedDispatcher(self) def receive = { case BinaryString("Ping") => Log.messageLog.put("ping") - reply("pong") + self.reply("pong") case BinaryString("Die") => throw new RuntimeException("DIE") } diff --git a/akka-core/src/test/scala/SerializerSpec.scala b/akka-core/src/test/scala/SerializerSpec.scala index 683d480e08..391af17bc7 100644 --- a/akka-core/src/test/scala/SerializerSpec.scala +++ b/akka-core/src/test/scala/SerializerSpec.scala @@ -6,12 +6,13 @@ import org.scalatest.junit.JUnitSuite import org.junit.{Test, Before, After} import scala.reflect.BeanInfo -@BeanInfo + +//@BeanInfo case class Foo(foo: String) { def this() = this(null) } -@BeanInfo +//@BeanInfo case class MyMessage(val id: String, val value: Tuple2[String, Int]) { private def this() = this(null, null) } diff --git a/akka-core/src/test/scala/ServerInitiatedRemoteActorSample.scala b/akka-core/src/test/scala/ServerInitiatedRemoteActorSample.scala index 7a26bc65f9..f359ead8ae 100644 --- a/akka-core/src/test/scala/ServerInitiatedRemoteActorSample.scala +++ b/akka-core/src/test/scala/ServerInitiatedRemoteActorSample.scala @@ -7,9 +7,10 @@ import se.scalablesolutions.akka.util.Logging import Actor._ class HelloWorldActor extends Actor { - start + self.start + def receive = { - case "Hello" => reply("World") + case "Hello" => self.reply("World") } } diff --git a/akka-core/src/test/scala/ServerInitiatedRemoteActorSpec.scala b/akka-core/src/test/scala/ServerInitiatedRemoteActorSpec.scala index 28262d6fc7..d02f21dfc4 100644 --- a/akka-core/src/test/scala/ServerInitiatedRemoteActorSpec.scala +++ b/akka-core/src/test/scala/ServerInitiatedRemoteActorSpec.scala @@ -19,7 +19,6 @@ object ServerInitiatedRemoteActorSpec { val latch = new CountDownLatch(1) } class RemoteActorSpecActorUnidirectional extends Actor { - start def receive = { case "OneWay" => @@ -28,10 +27,10 @@ object ServerInitiatedRemoteActorSpec { } class RemoteActorSpecActorBidirectional extends Actor { - start + def receive = { case "Hello" => - reply("World") + self.reply("World") case "Failure" => throw new RuntimeException("expected") } @@ -41,17 +40,13 @@ object ServerInitiatedRemoteActorSpec { val latch = new CountDownLatch(1) } class RemoteActorSpecActorAsyncSender extends Actor { - start + def receive = { case Send(actor: ActorRef) => actor ! "Hello" case "World" => RemoteActorSpecActorAsyncSender.latch.countDown } - - def send(actor: ActorRef) { - self ! Send(actor) - } } } @@ -62,7 +57,7 @@ class ServerInitiatedRemoteActorSpec extends JUnitSuite { private val unit = TimeUnit.MILLISECONDS @Before - def init() { + def init { server = new RemoteServer() server.start(HOSTNAME, PORT) @@ -76,14 +71,18 @@ class ServerInitiatedRemoteActorSpec extends JUnitSuite { // make sure the servers shutdown cleanly after the test has finished @After - def finished() { - server.shutdown - RemoteClient.shutdownAll - Thread.sleep(1000) + def finished { + try { + server.shutdown + RemoteClient.shutdownAll + Thread.sleep(1000) + } catch { + case e => () + } } @Test - def shouldSendOneWay { + def shouldSendWithBang { val actor = RemoteClient.actorFor( "se.scalablesolutions.akka.actor.ServerInitiatedRemoteActorSpec$RemoteActorSpecActorUnidirectional", 5000L, @@ -94,7 +93,7 @@ class ServerInitiatedRemoteActorSpec extends JUnitSuite { } @Test - def shouldSendReplyAsync { + def shouldSendWithBangBangAndGetReply { val actor = RemoteClient.actorFor( "se.scalablesolutions.akka.actor.ServerInitiatedRemoteActorSpec$RemoteActorSpecActorBidirectional", 5000L, @@ -105,22 +104,22 @@ class ServerInitiatedRemoteActorSpec extends JUnitSuite { } @Test - def shouldSendRemoteReplyProtocol { + def shouldSendWithBangAndGetReplyThroughSenderRef { implicit val timeout = 500000000L val actor = RemoteClient.actorFor( "se.scalablesolutions.akka.actor.ServerInitiatedRemoteActorSpec$RemoteActorSpecActorBidirectional", timeout, HOSTNAME, PORT) val sender = newActor[RemoteActorSpecActorAsyncSender] - sender.setReplyToAddress(HOSTNAME, PORT) + sender.homeAddress = (HOSTNAME, PORT + 1) sender.start - sender.send(actor) + sender ! Send(actor) assert(RemoteActorSpecActorAsyncSender.latch.await(1, TimeUnit.SECONDS)) actor.stop } @Test - def shouldSendReceiveException { + def shouldSendWithBangBangAndReplyWithException { implicit val timeout = 500000000L val actor = RemoteClient.actorFor( "se.scalablesolutions.akka.actor.ServerInitiatedRemoteActorSpec$RemoteActorSpecActorBidirectional", diff --git a/akka-core/src/test/scala/SupervisorSpec.scala b/akka-core/src/test/scala/SupervisorSpec.scala index 90781d6f45..0c36e0466d 100644 --- a/akka-core/src/test/scala/SupervisorSpec.scala +++ b/akka-core/src/test/scala/SupervisorSpec.scala @@ -4,8 +4,6 @@ package se.scalablesolutions.akka.actor -import java.util.concurrent.{TimeUnit, BlockingQueue, LinkedBlockingQueue} - import se.scalablesolutions.akka.config.ScalaConfig._ import se.scalablesolutions.akka.dispatch.Dispatchers import se.scalablesolutions.akka.{OneWay, Die, Ping} @@ -13,16 +11,23 @@ import Actor._ import org.scalatest.junit.JUnitSuite import org.junit.Test +import java.util.concurrent.{ConcurrentLinkedQueue, TimeUnit, BlockingQueue, LinkedBlockingQueue} object SupervisorSpec { - var messageLog: BlockingQueue[String] = new LinkedBlockingQueue[String] - var oneWayLog: BlockingQueue[String] = new LinkedBlockingQueue[String] + var messageLog = new LinkedBlockingQueue[String] + var oneWayLog = new LinkedBlockingQueue[String] + def clearMessageLogs { + messageLog.clear + oneWayLog.clear + } + class PingPong1Actor extends Actor { + self.timeout = 1000 def receive = { case Ping => messageLog.put("ping") - reply("pong") + self.reply("pong") case OneWay => oneWayLog.put("oneway") @@ -36,10 +41,11 @@ object SupervisorSpec { } class PingPong2Actor extends Actor { + self.timeout = 1000 def receive = { case Ping => messageLog.put("ping") - reply("pong") + self.reply("pong") case Die => throw new RuntimeException("DIE") } @@ -49,10 +55,11 @@ object SupervisorSpec { } class PingPong3Actor extends Actor { + self.timeout = 1000 def receive = { case Ping => messageLog.put("ping") - reply("pong") + self.reply("pong") case Die => throw new RuntimeException("DIE") } @@ -74,31 +81,31 @@ class SupervisorSpec extends JUnitSuite { var pingpong3: ActorRef = _ @Test def shouldStartServer = { - messageLog.clear + clearMessageLogs val sup = getSingleActorAllForOneSupervisor sup.start expect("pong") { - (pingpong1 !! Ping).getOrElse("nil") + (pingpong1 !! (Ping, 100)).getOrElse("nil") } } @Test def shouldStartServerForNestedSupervisorHierarchy = { - messageLog.clear + clearMessageLogs val sup = getNestedSupervisorsAllForOneConf sup.start expect("pong") { - (pingpong1 !! Ping).getOrElse("nil") + (pingpong1 !! (Ping, 100)).getOrElse("nil") } } @Test def shouldKillSingleActorOneForOne = { - messageLog.clear + clearMessageLogs val sup = getSingleActorOneForOneSupervisor sup.start intercept[RuntimeException] { - pingpong1 !! Die + pingpong1 !! (Die, 100) } expect("DIE") { @@ -107,25 +114,25 @@ class SupervisorSpec extends JUnitSuite { } @Test def shouldCallKillCallSingleActorOneForOne = { - messageLog.clear + clearMessageLogs val sup = getSingleActorOneForOneSupervisor sup.start expect("pong") { - (pingpong1 !! Ping).getOrElse("nil") + (pingpong1 !! (Ping, 100)).getOrElse("nil") } expect("ping") { messageLog.poll(1, TimeUnit.SECONDS) } intercept[RuntimeException] { - pingpong1 !! Die + pingpong1 !! (Die, 100) } expect("DIE") { messageLog.poll(1, TimeUnit.SECONDS) } expect("pong") { - (pingpong1 !! Ping).getOrElse("nil") + (pingpong1 !! (Ping, 100)).getOrElse("nil") } expect("ping") { @@ -134,11 +141,11 @@ class SupervisorSpec extends JUnitSuite { } @Test def shouldKillSingleActorAllForOne = { - messageLog.clear + clearMessageLogs val sup = getSingleActorAllForOneSupervisor sup.start intercept[RuntimeException] { - pingpong1 !! Die + pingpong1 !! (Die, 100) } expect("DIE") { @@ -147,25 +154,25 @@ class SupervisorSpec extends JUnitSuite { } @Test def shouldCallKillCallSingleActorAllForOne = { - messageLog.clear + clearMessageLogs val sup = getSingleActorAllForOneSupervisor sup.start expect("pong") { - (pingpong1 !! Ping).getOrElse("nil") + (pingpong1 !! (Ping, 100)).getOrElse("nil") } expect("ping") { messageLog.poll(1, TimeUnit.SECONDS) } intercept[RuntimeException] { - pingpong1 !! Die + pingpong1 !! (Die, 100) } expect("DIE") { messageLog.poll(1, TimeUnit.SECONDS) } expect("pong") { - (pingpong1 !! Ping).getOrElse("nil") + (pingpong1 !! (Ping, 100)).getOrElse("nil") } expect("ping") { @@ -174,11 +181,11 @@ class SupervisorSpec extends JUnitSuite { } @Test def shouldKillMultipleActorsOneForOne = { - messageLog.clear + clearMessageLogs val sup = getMultipleActorsOneForOneConf sup.start intercept[RuntimeException] { - pingpong3 !! Die + pingpong3 !! (Die, 100) } expect("DIE") { @@ -186,20 +193,20 @@ class SupervisorSpec extends JUnitSuite { } } - def tesCallKillCallMultipleActorsOneForOne = { - messageLog.clear + @Test def shouldKillCallMultipleActorsOneForOne = { + clearMessageLogs val sup = getMultipleActorsOneForOneConf sup.start expect("pong") { - (pingpong1 !! Ping).getOrElse("nil") + (pingpong1 !! (Ping, 100)).getOrElse("nil") } expect("pong") { - (pingpong2 !! Ping).getOrElse("nil") + (pingpong2 !! (Ping, 100)).getOrElse("nil") } expect("pong") { - (pingpong3 !! Ping).getOrElse("nil") + (pingpong3 !! (Ping, 100)).getOrElse("nil") } expect("ping") { @@ -212,22 +219,22 @@ class SupervisorSpec extends JUnitSuite { messageLog.poll(1, TimeUnit.SECONDS) } intercept[RuntimeException] { - pingpong2 !! Die + pingpong2 !! (Die, 100) } expect("DIE") { messageLog.poll(1, TimeUnit.SECONDS) } expect("pong") { - (pingpong1 !! Ping).getOrElse("nil") + (pingpong1 !! (Ping, 100)).getOrElse("nil") } expect("pong") { - (pingpong2 !! Ping).getOrElse("nil") + (pingpong2 !! (Ping, 100)).getOrElse("nil") } expect("pong") { - (pingpong3 !! Ping).getOrElse("nil") + (pingpong3 !! (Ping, 100)).getOrElse("nil") } expect("ping") { @@ -242,11 +249,11 @@ class SupervisorSpec extends JUnitSuite { } @Test def shouldKillMultipleActorsAllForOne = { - messageLog.clear + clearMessageLogs val sup = getMultipleActorsAllForOneConf sup.start intercept[RuntimeException] { - pingpong2 !! Die + pingpong2 !! (Die, 100) } expect("DIE") { @@ -260,20 +267,20 @@ class SupervisorSpec extends JUnitSuite { } } - def tesCallKillCallMultipleActorsAllForOne = { - messageLog.clear + @Test def shouldCallKillCallMultipleActorsAllForOne = { + clearMessageLogs val sup = getMultipleActorsAllForOneConf sup.start expect("pong") { - (pingpong1 !! Ping).getOrElse("nil") + (pingpong1 !! (Ping, 100)).getOrElse("nil") } expect("pong") { - (pingpong2 !! Ping).getOrElse("nil") + (pingpong2 !! (Ping, 100)).getOrElse("nil") } expect("pong") { - (pingpong3 !! Ping).getOrElse("nil") + (pingpong3 !! (Ping, 100)).getOrElse("nil") } expect("ping") { @@ -286,7 +293,7 @@ class SupervisorSpec extends JUnitSuite { messageLog.poll(1, TimeUnit.SECONDS) } intercept[RuntimeException] { - pingpong2 !! Die + pingpong2 !! (Die, 100) } expect("DIE") { @@ -299,15 +306,15 @@ class SupervisorSpec extends JUnitSuite { messageLog.poll(1, TimeUnit.SECONDS) } expect("pong") { - (pingpong1 !! Ping).getOrElse("nil") + (pingpong1 !! (Ping, 100)).getOrElse("nil") } expect("pong") { - (pingpong2 !! Ping).getOrElse("nil") + (pingpong2 !! (Ping, 100)).getOrElse("nil") } expect("pong") { - (pingpong3 !! Ping).getOrElse("nil") + (pingpong3 !! (Ping, 100)).getOrElse("nil") } expect("ping") { @@ -322,7 +329,7 @@ class SupervisorSpec extends JUnitSuite { } @Test def shouldOneWayKillSingleActorOneForOne = { - messageLog.clear + clearMessageLogs val sup = getSingleActorOneForOneSupervisor sup.start pingpong1 ! Die @@ -333,7 +340,7 @@ class SupervisorSpec extends JUnitSuite { } @Test def shouldOneWayCallKillCallSingleActorOneForOne = { - messageLog.clear + clearMessageLogs val sup = getSingleActorOneForOneSupervisor sup.start pingpong1 ! OneWay @@ -347,27 +354,28 @@ class SupervisorSpec extends JUnitSuite { messageLog.poll(1, TimeUnit.SECONDS) } pingpong1 ! OneWay - + expect("oneway") { oneWayLog.poll(1, TimeUnit.SECONDS) } } +/* @Test def shouldRestartKilledActorsForNestedSupervisorHierarchy = { - messageLog.clear + clearMessageLogs val sup = getNestedSupervisorsAllForOneConf sup.start expect("pong") { - (pingpong1 !! Ping).getOrElse("nil") + (pingpong1 !! (Ping, 100)).getOrElse("nil") } expect("pong") { - (pingpong2 !! Ping).getOrElse("nil") + (pingpong2 !! (Ping, 100)).getOrElse("nil") } expect("pong") { - (pingpong3 !! Ping).getOrElse("nil") + (pingpong3 !! (Ping, 100)).getOrElse("nil") } expect("ping") { @@ -380,11 +388,11 @@ class SupervisorSpec extends JUnitSuite { messageLog.poll(1, TimeUnit.SECONDS) } intercept[RuntimeException] { - pingpong2 !! Die + pingpong2 !! (Die, 100) } expect("DIE") { - messageLog.poll(1, TimeUnit.SECONDS) + messageLog.poll(1 , TimeUnit.SECONDS) } expect("DIE") { messageLog.poll(1, TimeUnit.SECONDS) @@ -393,15 +401,15 @@ class SupervisorSpec extends JUnitSuite { messageLog.poll(1, TimeUnit.SECONDS) } expect("pong") { - (pingpong1 !! Ping).getOrElse("nil") + (pingpong1 !! (Ping, 100)).getOrElse("nil") } expect("pong") { - (pingpong2 !! Ping).getOrElse("nil") + (pingpong2 !! (Ping, 100)).getOrElse("nil") } expect("pong") { - (pingpong3 !! Ping).getOrElse("nil") + (pingpong3 !! (Ping, 100)).getOrElse("nil") } expect("ping") { @@ -414,9 +422,9 @@ class SupervisorSpec extends JUnitSuite { messageLog.poll(1, TimeUnit.SECONDS) } } - +*/ // ============================================= - // Creat some supervisors with different configurations + // Create some supervisors with different configurations def getSingleActorAllForOneSupervisor: Supervisor = { pingpong1 = newActor[PingPong1Actor].start diff --git a/akka-core/src/test/scala/ThreadBasedActorSpec.scala b/akka-core/src/test/scala/ThreadBasedActorSpec.scala index 935d74e872..c5bb6b1a3a 100644 --- a/akka-core/src/test/scala/ThreadBasedActorSpec.scala +++ b/akka-core/src/test/scala/ThreadBasedActorSpec.scala @@ -9,11 +9,11 @@ import Actor._ object ThreadBasedActorSpec { class TestActor extends Actor { - dispatcher = Dispatchers.newThreadBasedDispatcher(this) + self.dispatcher = Dispatchers.newThreadBasedDispatcher(self) def receive = { case "Hello" => - reply("World") + self.reply("World") case "Failure" => throw new RuntimeException("expected") } @@ -28,7 +28,7 @@ class ThreadBasedActorSpec extends JUnitSuite { @Test def shouldSendOneWay { var oneWay = new CountDownLatch(1) val actor = newActor(() => new Actor { - dispatcher = Dispatchers.newThreadBasedDispatcher(this) + self.dispatcher = Dispatchers.newThreadBasedDispatcher(self) def receive = { case "OneWay" => oneWay.countDown } diff --git a/akka-core/src/test/scala/ThreadBasedDispatcherSpec.scala b/akka-core/src/test/scala/ThreadBasedDispatcherSpec.scala index f5b0235430..d3f635b8ca 100644 --- a/akka-core/src/test/scala/ThreadBasedDispatcherSpec.scala +++ b/akka-core/src/test/scala/ThreadBasedDispatcherSpec.scala @@ -12,6 +12,8 @@ import org.junit.{Test, Before} import se.scalablesolutions.akka.actor.Actor import Actor._ +// FIXME use this test when we have removed the MessageInvoker classes +/* class ThreadBasedDispatcherSpec extends JUnitSuite { private var threadingIssueDetected: AtomicBoolean = null val key1 = newActor(() => new Actor { def receive = { case _ => {}} }) @@ -86,3 +88,4 @@ class ThreadBasedDispatcherSpec extends JUnitSuite { dispatcher.shutdown } } +*/ \ No newline at end of file diff --git a/akka-http/src/main/scala/AkkaClusterBroadcastFilter.scala b/akka-http/src/main/scala/AkkaClusterBroadcastFilter.scala index 8fdd47fddd..210d6e48c4 100644 --- a/akka-http/src/main/scala/AkkaClusterBroadcastFilter.scala +++ b/akka-http/src/main/scala/AkkaClusterBroadcastFilter.scala @@ -24,10 +24,10 @@ class AkkaClusterBroadcastFilter extends Actor with ClusterBroadcastFilter[AnyRe @BeanProperty var clusterName = "" @BeanProperty var broadcaster : Broadcaster = null - override def init : Unit = () - - /** Stops the actor */ - def destroy : Unit = stop + /** + * Stops the actor + */ + def destroy: Unit = self.stop /** * Relays all non ClusterCometBroadcast messages to the other AkkaClusterBroadcastFilters in the cluster @@ -45,10 +45,10 @@ class AkkaClusterBroadcastFilter extends Actor with ClusterBroadcastFilter[AnyRe def receive = { //Only handle messages intended for this particular instance - case b@ClusterCometBroadcast(c,_) if (c == clusterName) && (broadcaster ne null) => broadcaster broadcast b + case b @ ClusterCometBroadcast(c, _) if (c == clusterName) && (broadcaster ne null) => broadcaster broadcast b case _ => } //Since this class is instantiated by Atmosphere, we need to make sure it's started - start + self.start } \ No newline at end of file diff --git a/akka-http/src/main/scala/Security.scala b/akka-http/src/main/scala/Security.scala index 9ea966505e..2d57c9080d 100644 --- a/akka-http/src/main/scala/Security.scala +++ b/akka-http/src/main/scala/Security.scala @@ -192,10 +192,10 @@ trait AuthenticationActor[C <: Credentials] extends Actor { verify(extractCredentials(req)) match { case Some(u: UserInfo) => { req.setSecurityContext(mkSecurityContext(req, u)) - if (roles.exists(req.isUserInRole(_))) reply(OK) - else reply(Response.status(Response.Status.FORBIDDEN).build) + if (roles.exists(req.isUserInRole(_))) self.reply(OK) + else self.reply(Response.status(Response.Status.FORBIDDEN).build) } - case _ => reply(unauthorized) + case _ => self.reply(unauthorized) } } } diff --git a/akka-persistence/akka-persistence-cassandra/src/test/scala/CassandraPersistentActorSpec.scala b/akka-persistence/akka-persistence-cassandra/src/test/scala/CassandraPersistentActorSpec.scala index b47e11c982..66c0286407 100644 --- a/akka-persistence/akka-persistence-cassandra/src/test/scala/CassandraPersistentActorSpec.scala +++ b/akka-persistence/akka-persistence-cassandra/src/test/scala/CassandraPersistentActorSpec.scala @@ -26,7 +26,7 @@ case class SuccessOneWay(key: String, value: String) case class FailureOneWay(key: String, value: String, failer: ActorRef) class CassandraPersistentActor extends Transactor { - timeout = 100000 + self.timeout = 100000 private lazy val mapState = CassandraStorage.newMap private lazy val vectorState = CassandraStorage.newVector @@ -34,31 +34,31 @@ class CassandraPersistentActor extends Transactor { def receive = { case GetMapState(key) => - reply(mapState.get(key.getBytes("UTF-8")).get) + self.reply(mapState.get(key.getBytes("UTF-8")).get) case GetVectorSize => - reply(vectorState.length.asInstanceOf[AnyRef]) + self.reply(vectorState.length.asInstanceOf[AnyRef]) case GetRefState => - reply(refState.get.get) + self.reply(refState.get.get) case SetMapState(key, msg) => mapState.put(key.getBytes("UTF-8"), msg.getBytes("UTF-8")) - reply(msg) + self.reply(msg) case SetVectorState(msg) => vectorState.add(msg.getBytes("UTF-8")) - reply(msg) + self.reply(msg) case SetRefState(msg) => refState.swap(msg.getBytes("UTF-8")) - reply(msg) + self.reply(msg) case Success(key, msg) => mapState.put(key.getBytes("UTF-8"), msg.getBytes("UTF-8")) vectorState.add(msg.getBytes("UTF-8")) refState.swap(msg.getBytes("UTF-8")) - reply(msg) + self.reply(msg) case Failure(key, msg, failer) => mapState.put(key.getBytes("UTF-8"), msg.getBytes("UTF-8")) vectorState.add(msg.getBytes("UTF-8")) refState.swap(msg.getBytes("UTF-8")) failer !! "Failure" - reply(msg) + self.reply(msg) } } diff --git a/akka-persistence/akka-persistence-mongo/src/test/scala/MongoPersistentActorSpec.scala b/akka-persistence/akka-persistence-mongo/src/test/scala/MongoPersistentActorSpec.scala index d2fbc959be..06a8bf381e 100644 --- a/akka-persistence/akka-persistence-mongo/src/test/scala/MongoPersistentActorSpec.scala +++ b/akka-persistence/akka-persistence-mongo/src/test/scala/MongoPersistentActorSpec.scala @@ -39,7 +39,7 @@ class BankAccountActor extends Transactor { // check balance case Balance(accountNo) => txnLog.add("Balance:" + accountNo) - reply(accountState.get(accountNo).get) + self.reply(accountState.get(accountNo).get) // debit amount: can fail case Debit(accountNo, amount, failer) => @@ -54,7 +54,7 @@ class BankAccountActor extends Transactor { accountState.put(accountNo, (m - amount)) if (amount > m) failer !! "Failure" - reply(m - amount) + self.reply(m - amount) // many debits: can fail // demonstrates true rollback even if multiple puts have been done @@ -72,7 +72,7 @@ class BankAccountActor extends Transactor { accountState.put(accountNo, (m - bal)) } if (bal > m) failer !! "Failure" - reply(m - bal) + self.reply(m - bal) // credit amount case Credit(accountNo, amount) => @@ -85,13 +85,13 @@ class BankAccountActor extends Transactor { case None => 0 } accountState.put(accountNo, (m + amount)) - reply(m + amount) + self.reply(m + amount) case LogSize => - reply(txnLog.length.asInstanceOf[AnyRef]) + self.reply(txnLog.length.asInstanceOf[AnyRef]) case Log(start, finish) => - reply(txnLog.slice(start, finish)) + self.reply(txnLog.slice(start, finish)) } } diff --git a/akka-persistence/akka-persistence-redis/src/main/scala/RedisPubSubServer.scala b/akka-persistence/akka-persistence-redis/src/main/scala/RedisPubSubServer.scala index c5621361fb..53205a029d 100644 --- a/akka-persistence/akka-persistence-redis/src/main/scala/RedisPubSubServer.scala +++ b/akka-persistence/akka-persistence-redis/src/main/scala/RedisPubSubServer.scala @@ -16,19 +16,19 @@ class Subscriber(client: RedisClient) extends Actor { def receive = { case Subscribe(channels) => client.subscribe(channels.head, channels.tail: _*)(callback) - reply(true) + self.reply(true) case Register(cb) => callback = cb - reply(true) + self.reply(true) case Unsubscribe(channels) => client.unsubscribe(channels.head, channels.tail: _*) - reply(true) + self.reply(true) case UnsubscribeAll => client.unsubscribe - reply(true) + self.reply(true) } } @@ -36,7 +36,7 @@ class Publisher(client: RedisClient) extends Actor { def receive = { case Publish(channel, message) => client.publish(channel, message) - reply(true) + self.reply(true) } } diff --git a/akka-persistence/akka-persistence-redis/src/test/scala/RedisPersistentActorSpec.scala b/akka-persistence/akka-persistence-redis/src/test/scala/RedisPersistentActorSpec.scala index 889078a544..77e1143bb3 100644 --- a/akka-persistence/akka-persistence-redis/src/test/scala/RedisPersistentActorSpec.scala +++ b/akka-persistence/akka-persistence-redis/src/test/scala/RedisPersistentActorSpec.scala @@ -35,7 +35,7 @@ class AccountActor extends Transactor { // check balance case Balance(accountNo) => txnLog.add("Balance:%s".format(accountNo).getBytes) - reply(BigInt(new String(accountState.get(accountNo.getBytes).get))) + self.reply(BigInt(new String(accountState.get(accountNo.getBytes).get))) // debit amount: can fail case Debit(accountNo, amount, failer) => @@ -49,7 +49,7 @@ class AccountActor extends Transactor { accountState.put(accountNo.getBytes, (m - amount).toString.getBytes) if (amount > m) failer !! "Failure" - reply(m - amount) + self.reply(m - amount) // many debits: can fail // demonstrates true rollback even if multiple puts have been done @@ -67,7 +67,7 @@ class AccountActor extends Transactor { accountState.put(accountNo.getBytes, (m - bal).toString.getBytes) } if (bal > m) failer !! "Failure" - reply(m - bal) + self.reply(m - bal) // credit amount case Credit(accountNo, amount) => @@ -79,13 +79,13 @@ class AccountActor extends Transactor { case None => 0 } accountState.put(accountNo.getBytes, (m + amount).toString.getBytes) - reply(m + amount) + self.reply(m + amount) case LogSize => - reply(txnLog.length.asInstanceOf[AnyRef]) + self.reply(txnLog.length.asInstanceOf[AnyRef]) case Log(start, finish) => - reply(txnLog.slice(start, finish)) + self.reply(txnLog.slice(start, finish)) } } diff --git a/akka-persistence/akka-persistence-redis/src/test/scala/RedisPersistentQSpec.scala b/akka-persistence/akka-persistence-redis/src/test/scala/RedisPersistentQSpec.scala index 6a4b42e3b9..85fde879a6 100644 --- a/akka-persistence/akka-persistence-redis/src/test/scala/RedisPersistentQSpec.scala +++ b/akka-persistence/akka-persistence-redis/src/test/scala/RedisPersistentQSpec.scala @@ -25,12 +25,12 @@ class QueueActor extends Transactor { // enqueue case NQ(accountNo) => accounts.enqueue(accountNo.getBytes) - reply(true) + self.reply(true) // dequeue case DQ => val d = new String(accounts.dequeue) - reply(d) + self.reply(d) // multiple NQ and DQ case MNDQ(enqs, no, failer) => @@ -41,11 +41,11 @@ class QueueActor extends Transactor { case e: Exception => failer !! "Failure" } - reply(true) + self.reply(true) // size case SZ => - reply(accounts.size) + self.reply(accounts.size) } } diff --git a/akka-persistence/akka-persistence-redis/src/test/scala/RedisPersistentSortedSetSpec.scala b/akka-persistence/akka-persistence-redis/src/test/scala/RedisPersistentSortedSetSpec.scala index f970e3fd27..f7a23b6d34 100644 --- a/akka-persistence/akka-persistence-redis/src/test/scala/RedisPersistentSortedSetSpec.scala +++ b/akka-persistence/akka-persistence-redis/src/test/scala/RedisPersistentSortedSetSpec.scala @@ -46,26 +46,26 @@ case class RANGE(start: Int, end: Int) case class MULTI(add: List[Hacker], rem: List[Hacker], failer: ActorRef) class SortedSetActor extends Transactor { - timeout = 100000 + self.timeout = 100000 private lazy val hackers = RedisStorage.newSortedSet def receive = { case ADD(h) => hackers.+(h.name.getBytes, h.zscore) - reply(true) + self.reply(true) case REMOVE(h) => hackers.-(h.name.getBytes) - reply(true) + self.reply(true) case SIZE => - reply(hackers.size) + self.reply(hackers.size) case SCORE(h) => - reply(hackers.zscore(h.name.getBytes)) + self.reply(hackers.zscore(h.name.getBytes)) case RANGE(s, e) => - reply(hackers.zrange(s, e)) + self.reply(hackers.zrange(s, e)) case MULTI(a, r, failer) => a.foreach{ h: Hacker => @@ -81,7 +81,7 @@ class SortedSetActor extends Transactor { case e: Exception => failer !! "Failure" } - reply((a.size, r.size)) + self.reply((a.size, r.size)) } } diff --git a/akka-samples/akka-sample-camel/src/main/scala/Actors.scala b/akka-samples/akka-sample-camel/src/main/scala/Actors.scala index d92f742e6f..be50d898c2 100644 --- a/akka-samples/akka-sample-camel/src/main/scala/Actors.scala +++ b/akka-samples/akka-sample-camel/src/main/scala/Actors.scala @@ -12,7 +12,7 @@ class RemoteActor1 extends RemoteActor("localhost", 7777) with Consumer { def endpointUri = "jetty:http://localhost:6644/remote1" protected def receive = { - case msg: Message => reply(Message("hello %s" format msg.body, Map("sender" -> "remote1"))) + case msg: Message => self.reply(Message("hello %s" format msg.body, Map("sender" -> "remote1"))) } } @@ -23,7 +23,7 @@ class RemoteActor2 extends Actor with Consumer { def endpointUri = "jetty:http://localhost:6644/remote2" protected def receive = { - case msg: Message => reply(Message("hello %s" format msg.body, Map("sender" -> "remote2"))) + case msg: Message => self.reply(Message("hello %s" format msg.body, Map("sender" -> "remote2"))) } } @@ -47,7 +47,7 @@ class Consumer1 extends Actor with Consumer with Logging { @consume("jetty:http://0.0.0.0:8877/camel/test1") class Consumer2 extends Actor { def receive = { - case msg: Message => reply("Hello %s" format msg.bodyAs(classOf[String])) + case msg: Message => self.reply("Hello %s" format msg.bodyAs(classOf[String])) } } @@ -74,7 +74,7 @@ class Subscriber(name:String, uri: String) extends Actor with Consumer with Logg } class Publisher(name: String, uri: String) extends Actor with Producer { - id = name + self.id = name def endpointUri = uri override def oneway = true protected def receive = produce @@ -86,7 +86,7 @@ class PublisherBridge(uri: String, publisher: ActorRef) extends Actor with Consu protected def receive = { case msg: Message => { publisher ! msg.bodyAs(classOf[String]) - reply("message published") + self.reply("message published") } } } diff --git a/akka-samples/akka-sample-camel/src/main/scala/Boot.scala b/akka-samples/akka-sample-camel/src/main/scala/Boot.scala index 81bf83801c..37c1ff651a 100644 --- a/akka-samples/akka-sample-camel/src/main/scala/Boot.scala +++ b/akka-samples/akka-sample-camel/src/main/scala/Boot.scala @@ -59,9 +59,8 @@ class Boot { val jmsSubscriber2 = newActor(() => new Subscriber("jms-subscriber-2", jmsUri)).start val jmsPublisher = newActor(() => new Publisher("jms-publisher", jmsUri)).start - //val cometdPublisherBridge = new PublisherBridge("jetty:http://0.0.0.0:8877/camel/pub/cometd", cometdPublisher).start - val jmsPublisherBridge = new PublisherBridge("jetty:http://0.0.0.0:8877/camel/pub/jms", jmsPublisher).start - + //val cometdPublisherBridge = newActor(() => new PublisherBridge("jetty:http://0.0.0.0:8877/camel/pub/cometd", cometdPublisher)).start + val jmsPublisherBridge = newActor(() => new PublisherBridge("jetty:http://0.0.0.0:8877/camel/pub/jms", jmsPublisher)).start } class CustomRouteBuilder extends RouteBuilder { diff --git a/akka-samples/akka-sample-chat/src/main/scala/ChatServer.scala b/akka-samples/akka-sample-chat/src/main/scala/ChatServer.scala index a293381d9c..01772de6c7 100644 --- a/akka-samples/akka-sample-chat/src/main/scala/ChatServer.scala +++ b/akka-samples/akka-sample-chat/src/main/scala/ChatServer.scala @@ -97,7 +97,7 @@ trait ChatStorage extends Actor * Redis-backed chat storage implementation. */ class RedisChatStorage extends ChatStorage { - lifeCycle = Some(LifeCycle(Permanent)) + self.lifeCycle = Some(LifeCycle(Permanent)) val CHAT_LOG = "akka.chat.log" private var chatLog = atomic { RedisStorage.getVector(CHAT_LOG) } @@ -111,7 +111,7 @@ class RedisChatStorage extends ChatStorage { case GetChatLog(_) => val messageList = atomic { chatLog.map(bytes => new String(bytes, "UTF-8")).toList } - reply(ChatLog(messageList)) + self.reply(ChatLog(messageList)) } override def postRestart(reason: Throwable) = chatLog = RedisStorage.getVector(CHAT_LOG) @@ -163,15 +163,15 @@ trait ChatManagement { this: Actor => * Creates and links a RedisChatStorage. */ trait RedisChatStorageFactory { this: Actor => - val storage = spawnLink[RedisChatStorage] // starts and links ChatStorage + val storage = this.self.spawnLink[RedisChatStorage] // starts and links ChatStorage } /** * Chat server. Manages sessions and redirects all other messages to the Session for the client. */ trait ChatServer extends Actor { - faultHandler = Some(OneForOneStrategy(5, 5000)) - trapExit = List(classOf[Exception]) + self.faultHandler = Some(OneForOneStrategy(5, 5000)) + self.trapExit = List(classOf[Exception]) val storage: ActorRef @@ -188,7 +188,7 @@ trait ChatServer extends Actor { override def shutdown = { log.info("Chat server is shutting down...") shutdownSessions - unlink(storage) + self.unlink(storage) storage.stop } } diff --git a/akka-samples/akka-sample-lift/src/main/scala/akka/SimpleService.scala b/akka-samples/akka-sample-lift/src/main/scala/akka/SimpleService.scala index df832bc45f..33037b8d8b 100644 --- a/akka-samples/akka-sample-lift/src/main/scala/akka/SimpleService.scala +++ b/akka-samples/akka-sample-lift/src/main/scala/akka/SimpleService.scala @@ -32,11 +32,11 @@ class SimpleService extends Transactor { case Tick => if (hasStartedTicking) { val counter = storage.get(KEY).get.asInstanceOf[Integer].intValue storage.put(KEY, new Integer(counter + 1)) - reply(

Tick: {counter + 1}

) + self.reply(

Tick: {counter + 1}

) } else { storage.put(KEY, new Integer(0)) hasStartedTicking = true - reply(

Tick: 0

) + self.reply(

Tick: 0

) } } } @@ -65,11 +65,11 @@ class PersistentSimpleService extends Transactor { val bytes = storage.get(KEY.getBytes).get val counter = ByteBuffer.wrap(bytes).getInt storage.put(KEY.getBytes, ByteBuffer.allocate(4).putInt(counter + 1).array) - reply(Tick:{counter + 1}) + self.reply(Tick:{counter + 1}) } else { storage.put(KEY.getBytes, ByteBuffer.allocate(4).putInt(0).array) hasStartedTicking = true - reply(Tick: 0) + self.reply(Tick: 0) } } } diff --git a/akka-samples/akka-sample-remote/src/main/scala/ClientManagedRemoteActorSample.scala b/akka-samples/akka-sample-remote/src/main/scala/ClientManagedRemoteActorSample.scala index 0f00b29a1c..403a767978 100644 --- a/akka-samples/akka-sample-remote/src/main/scala/ClientManagedRemoteActorSample.scala +++ b/akka-samples/akka-sample-remote/src/main/scala/ClientManagedRemoteActorSample.scala @@ -10,16 +10,14 @@ import se.scalablesolutions.akka.remote.RemoteNode import se.scalablesolutions.akka.util.Logging class RemoteHelloWorldActor extends RemoteActor("localhost", 9999) { - start def receive = { case "Hello" => log.info("Received 'Hello'") - reply("World") + self.reply("World") } } object ClientManagedRemoteActorServer extends Logging { - def run = { RemoteNode.start("localhost", 9999) log.info("Remote node started") @@ -31,7 +29,7 @@ object ClientManagedRemoteActorServer extends Logging { object ClientManagedRemoteActorClient extends Logging { def run = { - val actor = newActor[RemoteHelloWorldActor] + val actor = newActor[RemoteHelloWorldActor].start log.info("Remote actor created, moved to the server") log.info("Sending 'Hello' to remote actor") val result = actor !! "Hello" diff --git a/akka-samples/akka-sample-remote/src/main/scala/ServerManagedRemoteActorSample.scala b/akka-samples/akka-sample-remote/src/main/scala/ServerManagedRemoteActorSample.scala index 62ddf23a53..11960dfbe2 100644 --- a/akka-samples/akka-sample-remote/src/main/scala/ServerManagedRemoteActorSample.scala +++ b/akka-samples/akka-sample-remote/src/main/scala/ServerManagedRemoteActorSample.scala @@ -10,11 +10,10 @@ import se.scalablesolutions.akka.remote.{RemoteClient, RemoteNode} import se.scalablesolutions.akka.util.Logging class HelloWorldActor extends Actor { - start def receive = { case "Hello" => log.info("Received 'Hello'") - reply("World") + self.reply("World") } } @@ -23,7 +22,7 @@ object ServerManagedRemoteActorServer extends Logging { def run = { RemoteNode.start("localhost", 9999) log.info("Remote node started") - RemoteNode.register("hello-service", newActor[HelloWorldActor]) + RemoteNode.register("hello-service", newActor[HelloWorldActor].start) log.info("Remote actor registered and started") } diff --git a/akka-samples/akka-sample-rest-scala/src/main/scala/SimpleService.scala b/akka-samples/akka-sample-rest-scala/src/main/scala/SimpleService.scala index 53f4663e9d..38cf021c99 100644 --- a/akka-samples/akka-sample-rest-scala/src/main/scala/SimpleService.scala +++ b/akka-samples/akka-sample-rest-scala/src/main/scala/SimpleService.scala @@ -65,11 +65,11 @@ class SimpleService extends Transactor { case Tick => if (hasStartedTicking) { val counter = storage.get(KEY).get.asInstanceOf[Integer].intValue storage.put(KEY, new Integer(counter + 1)) - reply(Tick:{counter + 1}) + self.reply(Tick:{counter + 1}) } else { storage.put(KEY, new Integer(0)) hasStartedTicking = true - reply(Tick: 0) + self.reply(Tick: 0) } } } @@ -118,11 +118,11 @@ class PersistentSimpleService extends Transactor { val bytes = storage.get(KEY.getBytes).get val counter = ByteBuffer.wrap(bytes).getInt storage.put(KEY.getBytes, ByteBuffer.allocate(4).putInt(counter + 1).array) - reply(Tick:{counter + 1}) + self.reply(Tick:{counter + 1}) } else { storage.put(KEY.getBytes, Array(0.toByte)) hasStartedTicking = true - reply(Tick: 0) + self.reply(Tick: 0) } } } @@ -139,8 +139,8 @@ class Chat extends Actor with Logging { def receive = { case Chat(who, what, msg) => { what match { - case "login" => reply("System Message__" + who + " has joined.") - case "post" => reply("" + who + "__" + msg) + case "login" => self.reply("System Message__" + who + " has joined.") + case "post" => self.reply("" + who + "__" + msg) case _ => throw new WebApplicationException(422) } } diff --git a/akka-samples/akka-sample-security/src/main/scala/SimpleService.scala b/akka-samples/akka-sample-security/src/main/scala/SimpleService.scala index 6ec40ab7fb..b5e1fffb2e 100644 --- a/akka-samples/akka-sample-security/src/main/scala/SimpleService.scala +++ b/akka-samples/akka-sample-security/src/main/scala/SimpleService.scala @@ -4,7 +4,7 @@ package sample.security -import se.scalablesolutions.akka.actor.{SupervisorFactory, Actor} +import se.scalablesolutions.akka.actor.{SupervisorFactory, Transactor, Actor} import se.scalablesolutions.akka.actor.Actor._ import se.scalablesolutions.akka.config.ScalaConfig._ import se.scalablesolutions.akka.util.Logging @@ -90,8 +90,7 @@ import javax.annotation.security.{RolesAllowed, DenyAll, PermitAll} import javax.ws.rs.{GET, Path, Produces} @Path("/secureticker") -class SecureTickActor extends Actor with Logging { - makeTransactionRequired +class SecureTickActor extends Transactor with Logging { case object Tick private val KEY = "COUNTER" @@ -135,11 +134,11 @@ class SecureTickActor extends Actor with Logging { case Tick => if (hasStartedTicking) { val counter = storage.get(KEY).get.intValue storage.put(KEY, counter + 1) - reply(new Integer(counter + 1)) + self.reply(new Integer(counter + 1)) } else { storage.put(KEY, 0) hasStartedTicking = true - reply(new Integer(0)) + self.reply(new Integer(0)) } } } \ No newline at end of file diff --git a/config/akka-reference.conf b/config/akka-reference.conf index 7d287344d2..dbe3abca56 100644 --- a/config/akka-reference.conf +++ b/config/akka-reference.conf @@ -42,7 +42,7 @@ provider = "from-jndi" # Options: "from-jndi" (means that Akka will try to detect a TransactionManager in the JNDI) # "atomikos" (means that Akka will use the Atomikos based JTA impl in 'akka-jta', # e.g. you need the akka-jta JARs on classpath). - timeout = 60000 + self.timeout = 60000 diff --git a/deploy/root/page.html b/deploy/root/page.html index 80b460108d..59ee11dc46 100644 --- a/deploy/root/page.html +++ b/deploy/root/page.html @@ -31,7 +31,7 @@ else alert(data); } - } + }; $('#send').click(function(e) { var message = $('#msg').val(); diff --git a/project/build/AkkaProject.scala b/project/build/AkkaProject.scala index f96ff598e8..a95e529050 100644 --- a/project/build/AkkaProject.scala +++ b/project/build/AkkaProject.scala @@ -70,7 +70,7 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) { lazy val akka_samples = project("akka-samples", "akka-samples", new AkkaSamplesParentProject(_)) // ------------------------------------------------------------ - // create executable jar + // Run Akka microkernel using 'sbt run' + use for packaging executable JAR override def mainClass = Some("se.scalablesolutions.akka.kernel.Main") override def packageOptions = diff --git a/project/plugins/Plugins.scala b/project/plugins/Plugins.scala index a929827cc6..4b21ea189b 100644 --- a/project/plugins/Plugins.scala +++ b/project/plugins/Plugins.scala @@ -1,5 +1,6 @@ import sbt._ class Plugins(info: ProjectInfo) extends PluginDefinition(info) { -// val surefire = "bryanjswift" % "sbt-surefire-reporting" % "0.0.3-SNAPSHOT" +// val repo = "GH-pages repo" at "http://mpeltonen.github.com/maven/" +// val idea = "com.github.mpeltonen" % "sbt-idea-plugin" % "0.1-SNAPSHOT" } \ No newline at end of file