diff --git a/akka-amqp/src/main/scala/AMQP.scala b/akka-amqp/src/main/scala/AMQP.scala index 84f52088e7..1886577fc6 100644 --- a/akka-amqp/src/main/scala/AMQP.scala +++ b/akka-amqp/src/main/scala/AMQP.scala @@ -79,11 +79,12 @@ object AMQP { */ class AMQPSupervisor extends Actor with Logging { import scala.collection.JavaConversions._ - + import self._ + private val connections = new ConcurrentHashMap[ActorRef, ActorRef] - faultHandler = Some(OneForOneStrategy(5, 5000)) - trapExit = List(classOf[Throwable]) + self.faultHandler = Some(OneForOneStrategy(5, 5000)) + self.trapExit = List(classOf[Throwable]) start def newProducer( @@ -362,7 +363,8 @@ object AMQP { extends FaultTolerantConnectionActor { import scala.collection.JavaConversions._ - + import self._ + faultHandler = Some(OneForOneStrategy(5, 5000)) trapExit = List(classOf[Throwable]) diff --git a/akka-amqp/src/main/scala/ExampleSession.scala b/akka-amqp/src/main/scala/ExampleSession.scala index c15ae49b97..119ae07a07 100644 --- a/akka-amqp/src/main/scala/ExampleSession.scala +++ b/akka-amqp/src/main/scala/ExampleSession.scala @@ -5,7 +5,6 @@ package se.scalablesolutions.akka.amqp import se.scalablesolutions.akka.actor.Actor._ -import se.scalablesolutions.akka.actor.Actor.Sender.Self import com.rabbitmq.client.ConnectionParameters diff --git a/akka-camel/src/main/scala/Producer.scala b/akka-camel/src/main/scala/Producer.scala index ae50754e49..6609318900 100644 --- a/akka-camel/src/main/scala/Producer.scala +++ b/akka-camel/src/main/scala/Producer.scala @@ -20,7 +20,7 @@ import se.scalablesolutions.akka.util.Logging * @author Martin Krasser */ trait Producer { this: Actor => - + private val headersToCopyDefault = Set(Message.MessageExchangeId) /** @@ -128,7 +128,7 @@ trait Producer { this: Actor => case msg => { if ( oneway && !async) produceOnewaySync(msg) else if ( oneway && async) produceOnewayAsync(msg) - else if (!oneway && !async) reply(produceSync(msg)) + else if (!oneway && !async) self.reply(produceSync(msg)) else /*(!oneway && async)*/ produceAsync(msg) } } diff --git a/akka-camel/src/test/scala/component/ActorComponentFeatureTest.scala b/akka-camel/src/test/scala/component/ActorComponentFeatureTest.scala index 8e5195cb33..3c100d0bc3 100644 --- a/akka-camel/src/test/scala/component/ActorComponentFeatureTest.scala +++ b/akka-camel/src/test/scala/component/ActorComponentFeatureTest.scala @@ -3,7 +3,7 @@ package se.scalablesolutions.akka.camel.component import org.apache.camel.RuntimeCamelException import org.scalatest.{BeforeAndAfterEach, BeforeAndAfterAll, FeatureSpec} -import se.scalablesolutions.akka.actor.ActorRegistry +import se.scalablesolutions.akka.actor.{ActorRegistry, Actor} import se.scalablesolutions.akka.camel.support.{Respond, Countdown, Tester, Retain} import se.scalablesolutions.akka.camel.{Message, CamelContextManager} @@ -20,39 +20,40 @@ class ActorComponentFeatureTest extends FeatureSpec with BeforeAndAfterAll with feature("Communicate with an actor from a Camel application using actor endpoint URIs") { import CamelContextManager.template - + import Actor._ + scenario("one-way communication using actor id") { - val actor = new Tester with Retain with Countdown[Message] + val actor = newActor(() => new Tester with Retain with Countdown[Message]) actor.start template.sendBody("actor:%s" format actor.id, "Martin") - assert(actor.waitFor) - assert(actor.body === "Martin") + assert(actor.actor.asInstanceOf[Countdown[Message]].waitFor) + assert(actor.actor.asInstanceOf[Retain].body === "Martin") } scenario("one-way communication using actor uuid") { - val actor = new Tester with Retain with Countdown[Message] + val actor = newActor(() => new Tester with Retain with Countdown[Message]) actor.start template.sendBody("actor:uuid:%s" format actor.uuid, "Martin") - assert(actor.waitFor) - assert(actor.body === "Martin") + assert(actor.actor.asInstanceOf[Countdown[Message]].waitFor) + assert(actor.actor.asInstanceOf[Retain].body === "Martin") } scenario("two-way communication using actor id") { - val actor = new Tester with Respond + val actor = newActor(() => new Tester with Respond) actor.start assert(template.requestBody("actor:%s" format actor.id, "Martin") === "Hello Martin") } scenario("two-way communication using actor uuid") { - val actor = new Tester with Respond + val actor = newActor(() => new Tester with Respond) actor.start assert(template.requestBody("actor:uuid:%s" format actor.uuid, "Martin") === "Hello Martin") } scenario("two-way communication with timeout") { - val actor = new Tester { - timeout = 1 - } + val actor = newActor(() => new Tester { + self.timeout = 1 + }) actor.start intercept[RuntimeCamelException] { template.requestBody("actor:uuid:%s" format actor.uuid, "Martin") diff --git a/akka-camel/src/test/scala/component/ActorProducerTest.scala b/akka-camel/src/test/scala/component/ActorProducerTest.scala index 5f7059295f..21a9af4be5 100644 --- a/akka-camel/src/test/scala/component/ActorProducerTest.scala +++ b/akka-camel/src/test/scala/component/ActorProducerTest.scala @@ -61,7 +61,7 @@ class ActorProducerTest extends JUnitSuite with BeforeAndAfterAll { @Test def shouldSendMessageToActorAndTimeout: Unit = { val actor = newActor(() => new Tester { - timeout = 1 + self.timeout = 1 }) val endpoint = mockEndpoint("actor:uuid:%s" format actor.uuid) val exchange = endpoint.createExchange(ExchangePattern.InOut) diff --git a/akka-camel/src/test/scala/service/CamelServiceFeatureTest.scala b/akka-camel/src/test/scala/service/CamelServiceFeatureTest.scala index b876fe14c8..b7c53f42ae 100644 --- a/akka-camel/src/test/scala/service/CamelServiceFeatureTest.scala +++ b/akka-camel/src/test/scala/service/CamelServiceFeatureTest.scala @@ -12,14 +12,14 @@ object CamelServiceFeatureTest { class TestConsumer(uri: String) extends Actor with Consumer { def endpointUri = uri protected def receive = { - case msg: Message => reply("received %s" format msg.body) + case msg: Message => self.reply("received %s" format msg.body) } } class TestActor extends Actor { - id = "custom-actor-id" + self.id = "custom-actor-id" protected def receive = { - case msg: Message => reply("received %s" format msg.body) + case msg: Message => self.reply("received %s" format msg.body) } } diff --git a/akka-camel/src/test/scala/service/PublishTest.scala b/akka-camel/src/test/scala/service/PublishTest.scala index 69910254ec..71ccea65d4 100644 --- a/akka-camel/src/test/scala/service/PublishTest.scala +++ b/akka-camel/src/test/scala/service/PublishTest.scala @@ -11,7 +11,7 @@ import se.scalablesolutions.akka.camel.Consumer object PublishTest { @consume("mock:test1") class ConsumeAnnotatedActor extends Actor { - id = "test" + self.id = "test" protected def receive = null } diff --git a/akka-camel/src/test/scala/support/TestSupport.scala b/akka-camel/src/test/scala/support/TestSupport.scala index 8dc7d4dd04..98bd23d5ed 100644 --- a/akka-camel/src/test/scala/support/TestSupport.scala +++ b/akka-camel/src/test/scala/support/TestSupport.scala @@ -9,10 +9,10 @@ trait Receive[T] { def onMessage(msg: T): Unit } -trait Respond extends Receive[Message] {self: Actor => +trait Respond extends Receive[Message] { this: Actor => abstract override def onMessage(msg: Message): Unit = { super.onMessage(msg) - reply(response(msg)) + this.self.reply(response(msg)) } def response(msg: Message): Any = "Hello %s" format msg.body } diff --git a/akka-core/src/main/scala/actor/ActiveObject.scala b/akka-core/src/main/scala/actor/ActiveObject.scala index 7a4656cec8..bb71179fa2 100644 --- a/akka-core/src/main/scala/actor/ActiveObject.scala +++ b/akka-core/src/main/scala/actor/ActiveObject.scala @@ -284,7 +284,7 @@ object ActiveObject { private[akka] def newInstance[T](target: Class[T], actorRef: ActorRef, remoteAddress: Option[InetSocketAddress], timeout: Long): T = { val proxy = Proxy.newInstance(target, false, false) actorRef.actor.asInstanceOf[Dispatcher].initialize(target, proxy) - actorRef.actor.timeout = timeout + actorRef.timeout = timeout if (remoteAddress.isDefined) actorRef.makeRemote(remoteAddress.get) AspectInitRegistry.register(proxy, AspectInit(target, actorRef, remoteAddress, timeout)) actorRef.start @@ -294,7 +294,7 @@ object ActiveObject { private[akka] def newInstance[T](intf: Class[T], target: AnyRef, actorRef: ActorRef, remoteAddress: Option[InetSocketAddress], timeout: Long): T = { val proxy = Proxy.newInstance(Array(intf), Array(target), false, false) actorRef.actor.asInstanceOf[Dispatcher].initialize(target.getClass, target) - actorRef.actor.timeout = timeout + actorRef.timeout = timeout if (remoteAddress.isDefined) actorRef.makeRemote(remoteAddress.get) AspectInitRegistry.register(proxy, AspectInit(intf, actorRef, remoteAddress, timeout)) actorRef.start @@ -544,8 +544,8 @@ private[akka] class Dispatcher(transactionalRequired: Boolean, val callbacks: Op def this(transactionalRequired: Boolean) = this(transactionalRequired,None) private[actor] def initialize(targetClass: Class[_], targetInstance: AnyRef) = { - if (transactionalRequired || targetClass.isAnnotationPresent(Annotations.transactionrequired)) makeTransactionRequired - id = targetClass.getName + if (transactionalRequired || targetClass.isAnnotationPresent(Annotations.transactionrequired)) self.makeTransactionRequired + self.id = targetClass.getName target = Some(targetInstance) val methods = targetInstance.getClass.getDeclaredMethods.toList @@ -592,10 +592,10 @@ private[akka] class Dispatcher(transactionalRequired: Boolean, val callbacks: Op case Invocation(joinPoint, isOneWay, _) => if (Actor.SERIALIZE_MESSAGES) serializeArguments(joinPoint) if (isOneWay) joinPoint.proceed - else reply(joinPoint.proceed) -// Jan Kronquist: started work on issue 121 - case Link(target) => link(target) - case Unlink(target) => unlink(target) + else self.reply(joinPoint.proceed) + // Jan Kronquist: started work on issue 121 + case Link(target) => self.link(target) + case Unlink(target) => self.unlink(target) case unexpected => throw new IllegalStateException("Unexpected message [" + unexpected + "] sent to [" + this + "]") } diff --git a/akka-core/src/main/scala/actor/Actor.scala b/akka-core/src/main/scala/actor/Actor.scala index faac8f56bc..1d3a37a4b8 100644 --- a/akka-core/src/main/scala/actor/Actor.scala +++ b/akka-core/src/main/scala/actor/Actor.scala @@ -50,7 +50,7 @@ trait ActorWithNestedReceive extends Actor { * @author Jonas Bonér */ trait Transactor extends Actor { - makeTransactionRequired + self.makeTransactionRequired } /** @@ -61,7 +61,7 @@ trait Transactor extends Actor { * @author Jonas Bonér */ abstract class RemoteActor(hostname: String, port: Int) extends Actor { - makeRemote(hostname, port) + self.makeRemote(hostname, port) } // Life-cycle messages for the Actors @@ -85,18 +85,10 @@ class ActorInitializationException private[akka](message: String) extends Runtim */ object Actor extends Logging { val TIMEOUT = config.getInt("akka.actor.timeout", 5000) - val HOSTNAME = config.getString("akka.remote.server.hostname", "localhost") - val PORT = config.getInt("akka.remote.server.port", 9999) val SERIALIZE_MESSAGES = config.getBool("akka.actor.serialize-messages", false) private[actor] val actorRefInCreation = new scala.util.DynamicVariable[Option[ActorRef]](None) - // FIXME remove next release - object Sender { - @deprecated("import Actor.Sender.Self is not needed anymore, just use 'actor ! msg'") - object Self - } - /** * Creates a Actor.newActor out of the Actor with type T. *
@@ -143,7 +135,7 @@ object Actor extends Logging {
*/
def actor(body: PartialFunction[Any, Unit]): ActorRef =
newActor(() => new Actor() {
- lifeCycle = Some(LifeCycle(Permanent))
+ self.lifeCycle = Some(LifeCycle(Permanent))
def receive: PartialFunction[Any, Unit] = body
}).start
@@ -165,7 +157,7 @@ object Actor extends Logging {
*/
def transactor(body: PartialFunction[Any, Unit]): ActorRef =
newActor(() => new Transactor() {
- lifeCycle = Some(LifeCycle(Permanent))
+ self.lifeCycle = Some(LifeCycle(Permanent))
def receive: PartialFunction[Any, Unit] = body
}).start
@@ -185,7 +177,7 @@ object Actor extends Logging {
*/
def temporaryActor(body: PartialFunction[Any, Unit]): ActorRef =
newActor(() => new Actor() {
- lifeCycle = Some(LifeCycle(Temporary))
+ self.lifeCycle = Some(LifeCycle(Temporary))
def receive = body
}).start
@@ -210,7 +202,7 @@ object Actor extends Logging {
def handler[A](body: => Unit) = new {
def receive(handler: PartialFunction[Any, Unit]) =
newActor(() => new Actor() {
- lifeCycle = Some(LifeCycle(Permanent))
+ self.lifeCycle = Some(LifeCycle(Permanent))
body
def receive = handler
}).start
@@ -238,7 +230,7 @@ object Actor extends Logging {
newActor(() => new Actor() {
self ! Spawn
def receive = {
- case Spawn => body; stop
+ case Spawn => body; self.stop
}
}).start
}
@@ -258,12 +250,12 @@ object Actor extends Logging {
* @author Jonas Bonér
*/
trait Actor extends Logging {
-
+
/**
* For internal use only, functions as the implicit sender references when invoking
* one of the message send functions (!, !!, !!! and forward).
*/
- protected implicit val _selfSenderRef: Some[ActorRef] = {
+ implicit val optionSelf: Option[ActorRef] = {
val ref = Actor.actorRefInCreation.value
Actor.actorRefInCreation.value = None
if (ref.isEmpty) throw new ActorInitializationException(
@@ -273,9 +265,11 @@ trait Actor extends Logging {
"\n\tEither use:" +
"\n\t\t'val actor = Actor.newActor[MyActor]', or" +
"\n\t\t'val actor = Actor.newActor(() => new MyActor(..))'")
- else ref.asInstanceOf[Some[ActorRef]]
+ else ref
}
+ implicit val someSelf: Some[ActorRef] = optionSelf.asInstanceOf[Some[ActorRef]]
+
/**
* The 'self' field holds the ActorRef for this actor.
*
@@ -284,29 +278,32 @@ trait Actor extends Logging {
* self ! message
*
*/
- val self: ActorRef = _selfSenderRef.get
+ val self: ActorRef = optionSelf.get
+ self.id = getClass.getName
+
/**
- * The default dispatcher is the Dispatchers.globalExecutorBasedEventDrivenDispatcher.
- * This means that all actors will share the same event-driven executor based dispatcher.
+ * User overridable callback/setting.
*
- * You can override it so it fits the specific use-case that the actor is used for.
- * See the se.scalablesolutions.akka.dispatch.Dispatchers class for the different
- * dispatchers available.
+ * Partial function implementing the actor logic.
+ * To be implemented by subclassing actor.
*
- * The default is also that all actors that are created and spawned from within this actor
- * is sharing the same dispatcher as its creator.
+ * Example code:
+ *
+ * def receive = {
+ * case Ping =>
+ * println("got a ping")
+ * self.reply("pong")
+ *
+ * case OneWay =>
+ * println("got a oneway")
+ *
+ * case _ =>
+ * println("unknown message, ignoring")
+ * }
+ *
*/
- private[akka] var _messageDispatcher: MessageDispatcher = Dispatchers.globalExecutorBasedEventDrivenDispatcher
-
- /**
- * Holds the hot swapped partial function.
- */
- private var _hotswap: Option[PartialFunction[Any, Unit]] = None // FIXME: _hotswap should be a stack
-
- // ========================================
- // ==== CALLBACKS FOR USER TO OVERRIDE ====
- // ========================================
+ protected def receive: PartialFunction[Any, Unit]
/**
* User overridable callback/setting.
@@ -348,282 +345,14 @@ trait Actor extends Logging {
*/
def shutdown {}
- /**
- * User overridable callback/setting.
- *
- * Partial function implementing the actor logic.
- * To be implemented by subclassing actor.
- *
- * Example code:
- *
- * def receive = {
- * case Ping =>
- * println("got a ping")
- * reply("pong")
- *
- * case OneWay =>
- * println("got a oneway")
- *
- * case _ =>
- * println("unknown message, ignoring")
- * }
- *
- */
- protected def receive: PartialFunction[Any, Unit]
-
- // ==================
- // ==== USER API ====
- // ==================
-
- /**
- * Forwards the message and passes the original sender actor as the sender.
- *
- * Works with '!', '!!' and '!!!'.
- */
- def forward(message: Any)(implicit sender: Some[ActorRef]) = self.forward(message)(sender)
-
- /**
- * User overridable callback/setting.
- *
- * Identifier for actor, does not have to be a unique one.
- * Default is the class name.
- *
- * This field is used for logging, AspectRegistry.actorsFor, identifier for remote actor in RemoteServer etc.
- * But also as the identifier for persistence, which means that you can
- * use a custom name to be able to retrieve the "correct" persisted state
- * upon restart, remote restart etc.
- */
- protected[akka] var id: String = this.getClass.getName
-
- /**
- * User overridable callback/setting.
- *
- * Defines the default timeout for '!!' and '!!!' invocations,
- * e.g. the timeout for the future returned by the call to '!!' and '!!!'.
- */
- @volatile var timeout: Long = Actor.TIMEOUT
-
- /**
- * User overridable callback/setting.
- *
- * Set trapExit to the list of exception classes that the actor should be able to trap
- * from the actor it is supervising. When the supervising actor throws these exceptions
- * then they will trigger a restart.
- *
- * - * // trap no exceptions - * trapExit = Nil - * - * // trap all exceptions - * trapExit = List(classOf[Throwable]) - * - * // trap specific exceptions only - * trapExit = List(classOf[MyApplicationException], classOf[MyApplicationError]) - *- */ - protected[akka] var trapExit: List[Class[_ <: Throwable]] = Nil - - /** - * User overridable callback/setting. - * - * If 'trapExit' is set for the actor to act as supervisor, then a faultHandler must be defined. - * Can be one of: - * - * faultHandler = Some(AllForOneStrategy(maxNrOfRetries, withinTimeRange)) - * - * faultHandler = Some(OneForOneStrategy(maxNrOfRetries, withinTimeRange)) - * - */ - protected[akka] var faultHandler: Option[FaultHandlingStrategy] = None - - /** - * User overridable callback/setting. - * - * Defines the life-cycle for a supervised actor. - */ - @volatile var lifeCycle: Option[LifeCycle] = None - - /** - * Use
reply(..) to reply with a message to the original sender of the message currently
- * being processed.
- *
- * Throws an IllegalStateException if unable to determine what to reply to
- */
- protected[this] def reply(message: Any) = if(!reply_?(message)) throw new IllegalStateException(
- "\n\tNo sender in scope, can't reply. " +
- "\n\tYou have probably used the '!' method to either; " +
- "\n\t\t1. Send a message to a remote actor which does not have a contact address." +
- "\n\t\t2. Send a message from an instance that is *not* an actor" +
- "\n\t\t3. Send a message to an Active Object annotated with the '@oneway' annotation? " +
- "\n\tIf so, switch to '!!' (or remove '@oneway') which passes on an implicit future" +
- "\n\tthat will be bound by the argument passed to 'reply'." +
- "\n\tAlternatively, you can use setReplyToAddress to make sure the actor can be contacted over the network.")
-
- /**
- * Use reply_?(..) to reply with a message to the original sender of the message currently
- * being processed.
- *
- * Returns true if reply was sent, and false if unable to determine what to reply to.
- */
- protected[this] def reply_?(message: Any): Boolean = self.replyTo match {
- case Some(Left(actor)) =>
- actor ! message
- true
- case Some(Right(future: Future[Any])) =>
- future completeWithResult message
- true
- case _ =>
- false
- }
-
- /**
- * Starts the actor.
- */
- def start = self.startOnCreation = true
-
- /**
- * Shuts down the actor its dispatcher and message queue.
- * Alias for 'stop'.
- */
- def exit = self.stop
-
- /**
- * Shuts down the actor its dispatcher and message queue.
- */
- def stop = self.stop
-
- /**
- * Returns the uuid for the actor.
- */
- def uuid = self.uuid
-
- /**
- * Sets the dispatcher for this actor. Needs to be invoked before the actor is started.
- *
- * The default dispatcher is the Dispatchers.globalExecutorBasedEventDrivenDispatcher.
- * This means that all actors will share the same event-driven executor based dispatcher.
- *
- * You can override it so it fits the specific use-case that the actor is used for.
- * See the se.scalablesolutions.akka.dispatch.Dispatchers class for the different
- * dispatchers available.
- *
- * The default is also that all actors that are created and spawned from within this actor
- * is sharing the same dispatcher as its creator.
- */
- def dispatcher_=(md: MessageDispatcher): Unit =
- if (!self.isRunning) _messageDispatcher = md
- else throw new IllegalArgumentException(
- "Can not swap dispatcher for " + toString + " after it has been started")
-
- /**
- * Get the dispatcher for this actor.
- */
- def dispatcher: MessageDispatcher = _messageDispatcher
-
- /**
- * Invoking 'makeTransactionRequired' means that the actor will **start** a new transaction if non exists.
- * However, it will always participate in an existing transaction.
- * If transactionality want to be completely turned off then do it by invoking:
- *
- * TransactionManagement.disableTransactions
- *
- */
- def makeTransactionRequired = self.makeTransactionRequired
-
- /**
- * Invoking 'makeRemote' means that an actor will be moved to and invoked on a remote host.
- */
- def makeRemote(hostname: String, port: Int): Unit = makeRemote(new InetSocketAddress(hostname, port))
-
- /**
- * Invoking 'makeRemote' means that an actor will be moved to and invoked on a remote host.
- */
- def makeRemote(address: InetSocketAddress): Unit = self.makeRemote(address)
-
- /**
- * Set the contact address for this actor. This is used for replying to messages sent
- * asynchronously when no reply channel exists.
- */
- def setReplyToAddress(hostname: String, port: Int): Unit = self.setReplyToAddress(new InetSocketAddress(hostname, port))
-
- /**
- * Set the contact address for this actor. This is used for replying to messages sent
- * asynchronously when no reply channel exists.
- */
- def setReplyToAddress(address: InetSocketAddress): Unit = self.setReplyToAddress(address)
-
- /**
- * Links an other actor to this actor. Links are unidirectional and means that a the linking actor will
- * receive a notification if the linked actor has crashed.
- *
- * If the 'trapExit' member field has been set to at contain at least one exception class then it will
- * 'trap' these exceptions and automatically restart the linked actors according to the restart strategy
- * defined by the 'faultHandler'.
- *
- * To be invoked from within the actor itself.
- */
- protected[this] def link(actorRef: ActorRef) = self.link(actorRef)
-
- /**
- * Unlink the actor.
- *
- * To be invoked from within the actor itself.
- */
- protected[this] def unlink(actorRef: ActorRef) = self.unlink(actorRef)
-
- /**
- * Atomically start and link an actor.
- *
- * To be invoked from within the actor itself.
- */
- protected[this] def startLink(actorRef: ActorRef) = self.startLink(actorRef)
-
- /**
- * Atomically start, link and make an actor remote.
- *
- * To be invoked from within the actor itself.
- */
- protected[this] def startLinkRemote(actorRef: ActorRef, hostname: String, port: Int) =
- self.startLinkRemote(actorRef, hostname, port)
-
- /**
- * Atomically create (from actor class) and start an actor.
- *
- * To be invoked from within the actor itself.
- */
- protected[this] def spawn[T <: Actor : Manifest]: ActorRef = self.spawn[T]
-
- /**
- * Atomically create (from actor class), start and make an actor remote.
- *
- * To be invoked from within the actor itself.
- */
- protected[this] def spawnRemote[T <: Actor: Manifest](hostname: String, port: Int): ActorRef =
- self.spawnRemote[T](hostname, port)
-
- /**
- * Atomically create (from actor class), start and link an actor.
- *
- * To be invoked from within the actor itself.
- */
- protected[this] def spawnLink[T <: Actor: Manifest]: ActorRef = self.spawnLink[T]
-
- /**
- * User overridable callback/setting.
- *
- * Set to true if messages should have REQUIRES_NEW semantics, e.g. a new transaction should
- * start if there is no one running, else it joins the existing transaction.
- */
- protected[this] def isTransactor_=(flag: Boolean) = self.isTransactor = flag
-
// =========================================
// ==== INTERNAL IMPLEMENTATION DETAILS ====
// =========================================
- private[akka] def base: PartialFunction[Any, Unit] = lifeCycles orElse (_hotswap getOrElse receive)
+ private[akka] def base: PartialFunction[Any, Unit] = lifeCycles orElse (self.hotswap getOrElse receive)
private val lifeCycles: PartialFunction[Any, Unit] = {
- case HotSwap(code) => _hotswap = code
+ case HotSwap(code) => self.hotswap = code
case Restart(reason) => self.restart(reason)
case Exit(dead, reason) => self.handleTrapExit(dead, reason)
case Unlink(child) => self.unlink(child)
@@ -631,15 +360,11 @@ trait Actor extends Logging {
case Kill => throw new ActorKilledException("Actor [" + toString + "] was killed by a Kill message")
}
- override def hashCode(): Int = HashCode.hash(HashCode.SEED, uuid)
+ override def hashCode: Int = self.hashCode
- override def equals(that: Any): Boolean = {
- that != null &&
- that.isInstanceOf[Actor] &&
- that.asInstanceOf[Actor].uuid == uuid
- }
+ override def equals(that: Any): Boolean = self.equals(that)
- override def toString = "Actor[" + id + ":" + uuid + "]"
+ override def toString = self.toString
}
/**
diff --git a/akka-core/src/main/scala/actor/ActorRef.scala b/akka-core/src/main/scala/actor/ActorRef.scala
index 6a32a48ba6..5f8a6d93c8 100644
--- a/akka-core/src/main/scala/actor/ActorRef.scala
+++ b/akka-core/src/main/scala/actor/ActorRef.scala
@@ -14,7 +14,7 @@ import se.scalablesolutions.akka.stm.TransactionManagement
import se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.{RemoteRequestProtocol, RemoteReplyProtocol, ActorRefProtocol}
import se.scalablesolutions.akka.remote.{RemoteNode, RemoteServer, RemoteClient, RemoteProtocolBuilder, RemoteRequestProtocolIdFactory}
import se.scalablesolutions.akka.serialization.Serializer
-import se.scalablesolutions.akka.util.{HashCode, Logging, UUID, ReadWriteLock}
+import se.scalablesolutions.akka.util.{HashCode, Logging, UUID, ReentrantGuard}
import org.multiverse.api.ThreadLocalTransaction._
import org.multiverse.commitbarriers.CountDownCommitBarrier
@@ -22,7 +22,7 @@ import org.multiverse.commitbarriers.CountDownCommitBarrier
import jsr166x.{Deque, ConcurrentLinkedDeque}
import java.net.InetSocketAddress
-import java.util.concurrent.locks.{Lock, ReentrantLock}
+import java.util.concurrent.locks.ReentrantLock
import java.util.concurrent.atomic.AtomicReference
import java.util.concurrent.ConcurrentHashMap
import java.util.{Map => JMap}
@@ -115,10 +115,89 @@ trait ActorRef extends TransactionManagement {
@volatile protected[this] var _isSuspended = true
@volatile protected[this] var _isShutDown = false
@volatile protected[akka] var _isKilled = false
+ @volatile protected[akka] var _homeAddress = new InetSocketAddress(RemoteServer.HOSTNAME, RemoteServer.PORT)
@volatile protected[akka] var startOnCreation = false
@volatile protected[akka] var registeredInRemoteNodeDuringSerialization = false
- protected[this] val guard = new ReadWriteLock
+ protected[this] val guard = new ReentrantGuard
+
+ /**
+ * User overridable callback/setting.
+ *
+ * Identifier for actor, does not have to be a unique one. Default is the 'uuid'.
+ *
+ * This field is used for logging, AspectRegistry.actorsFor, identifier for remote
+ * actor in RemoteServer etc.But also as the identifier for persistence, which means
+ * that you can use a custom name to be able to retrieve the "correct" persisted state
+ * upon restart, remote restart etc.
+ */
+ @volatile var id: String = _uuid
+
+ /**
+ * User overridable callback/setting.
+ *
+ * Defines the default timeout for '!!' and '!!!' invocations,
+ * e.g. the timeout for the future returned by the call to '!!' and '!!!'.
+ */
+ @volatile var timeout: Long = Actor.TIMEOUT
+
+ /**
+ * User overridable callback/setting.
+ *
+ * Set trapExit to the list of exception classes that the actor should be able to trap
+ * from the actor it is supervising. When the supervising actor throws these exceptions
+ * then they will trigger a restart.
+ *
+ * + * // trap no exceptions + * trapExit = Nil + * + * // trap all exceptions + * trapExit = List(classOf[Throwable]) + * + * // trap specific exceptions only + * trapExit = List(classOf[MyApplicationException], classOf[MyApplicationError]) + *+ */ + @volatile var trapExit: List[Class[_ <: Throwable]] = Nil + + /** + * User overridable callback/setting. + * + * If 'trapExit' is set for the actor to act as supervisor, then a faultHandler must be defined. + * Can be one of: + * + * faultHandler = Some(AllForOneStrategy(maxNrOfRetries, withinTimeRange)) + * + * faultHandler = Some(OneForOneStrategy(maxNrOfRetries, withinTimeRange)) + * + */ + @volatile var faultHandler: Option[FaultHandlingStrategy] = None + + /** + * User overridable callback/setting. + * + * Defines the life-cycle for a supervised actor. + */ + @volatile var lifeCycle: Option[LifeCycle] = None + + /** + * The default dispatcher is the Dispatchers.globalExecutorBasedEventDrivenDispatcher. + * This means that all actors will share the same event-driven executor based dispatcher. + * + * You can override it so it fits the specific use-case that the actor is used for. + * See the se.scalablesolutions.akka.dispatch.Dispatchers class for the different + * dispatchers available. + * + * The default is also that all actors that are created and spawned from within this actor + * is sharing the same dispatcher as its creator. + */ + private[akka] var _dispatcher: MessageDispatcher = Dispatchers.globalExecutorBasedEventDrivenDispatcher + + /** + * Holds the hot swapped partial function. + */ + protected[akka] var hotswap: Option[PartialFunction[Any, Unit]] = None // FIXME: _hotswap should be a stack /** * User overridable callback/setting. @@ -132,7 +211,7 @@ trait ActorRef extends TransactionManagement { * This lock ensures thread safety in the dispatching: only one message can * be dispatched at once on the actor. */ - protected[akka] val dispatcherLock: Lock = new ReentrantLock + protected[akka] val dispatcherLock = new ReentrantLock /** * Holds the reference to the sender of the currently processed message. @@ -141,10 +220,8 @@ trait ActorRef extends TransactionManagement { * - Is Some(Right(CompletableFuture)) if sender is holding on to a Future for the result */ protected[this] var _replyTo: Option[Either[ActorRef, CompletableFuture[Any]]] = None - protected[akka] def replyTo: Option[Either[ActorRef, CompletableFuture[Any]]] = - guard.withReadLock { _replyTo } - protected[akka] def replyTo_=(rt: Option[Either[ActorRef, CompletableFuture[Any]]]) = - guard.withWriteLock { _replyTo = rt } + protected[akka] def replyTo: Option[Either[ActorRef, CompletableFuture[Any]]] = guard.withGuard { _replyTo } + protected[akka] def replyTo_=(rt: Option[Either[ActorRef, CompletableFuture[Any]]]) = guard.withGuard { _replyTo = rt } /** * Is the actor killed? @@ -165,6 +242,10 @@ trait ActorRef extends TransactionManagement { * Returns the uuid for the actor. */ def uuid = _uuid + + /** + * Only for internal use. UUID is effectively final. + */ protected[akka] def uuid_=(uid: String) = _uuid = uid /** @@ -196,7 +277,7 @@ trait ActorRef extends TransactionManagement { * NOTE: * Use this method with care. In most cases it is better to use '!' together with the 'sender' member field to * implement request/response message exchanges. - * If you are sending messages using
!! then you have to use reply(..)
+ * If you are sending messages using !! then you have to use self.reply(..)
* to send a reply message to the original sender. If not then the sender will block until the timeout expires.
*/
def !: Option[T] = {
@@ -204,8 +285,9 @@ trait ActorRef extends TransactionManagement {
if (isRunning) {
val future = postMessageToMailboxAndCreateFutureResultWithTimeout[T](message, timeout, None)
val isActiveObject = message.isInstanceOf[Invocation]
- if (isActiveObject && message.asInstanceOf[Invocation].isVoid)
+ if (isActiveObject && message.asInstanceOf[Invocation].isVoid) {
future.asInstanceOf[CompletableFuture[Option[_]]].completeWithResult(None)
+ }
try {
future.await
} catch {
@@ -232,7 +314,7 @@ trait ActorRef extends TransactionManagement {
* Use this method with care. In most cases it is better to use '!' together with the 'sender' member field to
* implement request/response message exchanges.
*
- * If you are sending messages using !! then you have to use reply(..)
+ * If you are sending messages using !! then you have to use self.reply(..)
* to send a reply message to the original sender. If not then the sender will block until the timeout expires.
*/
def !(implicit sender: Option[ActorRef] = None): Option[T] = !
@@ -243,7 +325,7 @@ trait ActorRef extends TransactionManagement {
* NOTE:
* Use this method with care. In most cases it is better to use '!' together with the 'sender' member field to
* implement request/response message exchanges.
- * If you are sending messages using !!! then you have to use reply(..)
+ * If you are sending messages using !!! then you have to use self.reply(..)
* to send a reply message to the original sender. If not then the sender will block until the timeout expires.
*/
def !!: Future[T] = {
@@ -269,6 +351,39 @@ trait ActorRef extends TransactionManagement {
} else throw new IllegalStateException("Actor has not been started, you need to invoke 'actor.start' before using it")
}
+ /**
+ * Use self.reply(..) to reply with a message to the original sender of the message currently
+ * being processed.
+ *
+ * Throws an IllegalStateException if unable to determine what to reply to
+ */
+ def reply(message: Any) = if(!reply_?(message)) throw new IllegalStateException(
+ "\n\tNo sender in scope, can't reply. " +
+ "\n\tYou have probably used the '!' method to either; " +
+ "\n\t\t1. Send a message to a remote actor which does not have a contact address." +
+ "\n\t\t2. Send a message from an instance that is *not* an actor" +
+ "\n\t\t3. Send a message to an Active Object annotated with the '@oneway' annotation? " +
+ "\n\tIf so, switch to '!!' (or remove '@oneway') which passes on an implicit future" +
+ "\n\tthat will be bound by the argument passed to 'reply'." +
+ "\n\tAlternatively, you can use setReplyToAddress to make sure the actor can be contacted over the network.")
+
+ /**
+ * Use reply_?(..) to reply with a message to the original sender of the message currently
+ * being processed.
+ *
+ * Returns true if reply was sent, and false if unable to determine what to reply to.
+ */
+ def reply_?(message: Any): Boolean = replyTo match {
+ case Some(Left(actor)) =>
+ actor ! message
+ true
+ case Some(Right(future: Future[Any])) =>
+ future completeWithResult message
+ true
+ case _ =>
+ false
+ }
+
/**
* Serializes the ActorRef instance into a byte array (Array[Byte]).
*/
@@ -310,22 +425,20 @@ trait ActorRef extends TransactionManagement {
def makeTransactionRequired: Unit
/**
- * Set the contact address for this actor. This is used for replying to messages
- * sent asynchronously when no reply channel exists.
+ * Returns the home address and port for this actor.
*/
- def setReplyToAddress(hostname: String, port: Int): Unit =
- setReplyToAddress(new InetSocketAddress(hostname, port))
+ def homeAddress: InetSocketAddress = _homeAddress
/**
- * Set the contact address for this actor. This is used for replying to messages
- * sent asynchronously when no reply channel exists.
+ * Set the home address and port for this actor.
*/
- def setReplyToAddress(address: InetSocketAddress): Unit
-
+ def homeAddress_=(hostnameAndPort: Tuple2[String, Int]): Unit =
+ homeAddress_=(new InetSocketAddress(hostnameAndPort._1, hostnameAndPort._2))
+
/**
- * Returns the id for the actor.
+ * Set the home address and port for this actor.
*/
- def id: String
+ def homeAddress_=(address: InetSocketAddress): Unit
/**
* Returns the remote address for the actor, if any, else None.
@@ -333,20 +446,6 @@ trait ActorRef extends TransactionManagement {
def remoteAddress: Option[InetSocketAddress]
protected[akka] def remoteAddress_=(addr: Option[InetSocketAddress]): Unit
- /**
- * User overridable callback/setting.
- *
- * Defines the default timeout for '!!' and '!!!' invocations,
- * e.g. the timeout for the future returned by the call to '!!' and '!!!'.
- */
- def timeout: Long
-
- /**
- * Sets the default timeout for '!!' and '!!!' invocations,
- * e.g. the timeout for the future returned by the call to '!!' and '!!!'.
- */
- def timeout_=(t: Long)
-
/**
* Starts up the actor and its message queue.
*/
@@ -456,15 +555,6 @@ trait ActorRef extends TransactionManagement {
protected[akka] def supervisor_=(sup: Option[ActorRef]): Unit
- protected[akka] def trapExit: List[Class[_ <: Throwable]] = actor.trapExit
- protected[akka] def trapExit_=(exits: List[Class[_ <: Throwable]]) = actor.trapExit = exits
-
- protected[akka] def lifeCycle: Option[LifeCycle] = actor.lifeCycle
- protected[akka] def lifeCycle_=(cycle: Option[LifeCycle]) = actor.lifeCycle = cycle
-
- protected[akka] def faultHandler: Option[FaultHandlingStrategy] = actor.faultHandler
- protected[akka] def faultHandler_=(handler: Option[FaultHandlingStrategy]) = actor.faultHandler = handler
-
protected[akka] def mailbox: Deque[MessageInvocation]
protected[akka] def restart(reason: Throwable): Unit
@@ -479,20 +569,38 @@ trait ActorRef extends TransactionManagement {
protected[akka] def linkedActorsAsList: List[ActorRef]
- override def toString: String
-
- override def hashCode: Int
-
- override def equals(that: Any): Boolean
+ override def hashCode: Int = HashCode.hash(HashCode.SEED, uuid)
+
+ override def equals(that: Any): Boolean = {
+ that != null &&
+ that.isInstanceOf[ActorRef] &&
+ that.asInstanceOf[ActorRef].uuid == uuid
+ }
+
+ override def toString = "Actor[" + id + ":" + uuid + "]"
+
+ protected def processSender(senderOption: Option[ActorRef], requestBuilder: RemoteRequestProtocol.Builder) = {
+ senderOption.foreach { sender =>
+ val address = sender.homeAddress
+ val server = RemoteServer.serverFor(address) match {
+ case Some(server) => server
+ case None => (new RemoteServer).start(address)
+ }
+ server.register(sender.uuid, sender)
+ requestBuilder.setSender(sender.toProtocol)
+ }
+ }
}
/**
+ * Local ActorRef that is used when referencing the Actor on its "home" node.
+ *
* @author Jonas Bonér
*/
sealed class LocalActorRef private[akka](
private[this] var actorFactory: Either[Option[Class[_ <: Actor]], Option[() => Actor]] = Left(None))
extends ActorRef {
-
+
private[akka] def this(clazz: Class[_ <: Actor]) = this(Left(Some(clazz)))
private[akka] def this(factory: () => Actor) = this(Right(Some(factory)))
@@ -500,20 +608,21 @@ sealed class LocalActorRef private[akka](
@volatile private[akka] var _remoteAddress: Option[InetSocketAddress] = None
@volatile private[akka] var _linkedActors: Option[ConcurrentHashMap[String, ActorRef]] = None
@volatile private[akka] var _supervisor: Option[ActorRef] = None
- @volatile private[akka] var _replyToAddress: Option[InetSocketAddress] = None
protected[akka] val _mailbox: Deque[MessageInvocation] = new ConcurrentLinkedDeque[MessageInvocation]
protected[this] val actorInstance = new AtomicReference[Actor](newActor)
- if (startOnCreation) start
-
+ @volatile private var isInInitialization = false
+ @volatile private var runActorInitialization = false
+
+ if (runActorInitialization) initializeActorInstance
+
/**
* Serializes the ActorRef instance into a Protocol Buffers (protobuf) Message.
*/
- protected[akka] def toProtocol: ActorRefProtocol = guard.withWriteLock {
- val (host, port) = _replyToAddress.map(address =>
- (address.getHostName, address.getPort))
- .getOrElse((Actor.HOSTNAME, Actor.PORT))
+ protected[akka] def toProtocol: ActorRefProtocol = guard.withGuard {
+ val host = homeAddress.getHostName
+ val port = homeAddress.getPort
if (!registeredInRemoteNodeDuringSerialization) {
Actor.log.debug("Register serialized Actor [%s] as remote @ [%s:%s]", actorClass.getName, host, port)
@@ -549,8 +658,8 @@ sealed class LocalActorRef private[akka](
/**
* Sets the dispatcher for this actor. Needs to be invoked before the actor is started.
*/
- def dispatcher_=(md: MessageDispatcher): Unit = guard.withWriteLock {
- if (!isRunning) actor.dispatcher = md
+ def dispatcher_=(md: MessageDispatcher): Unit = guard.withGuard {
+ if (!isRunning) _dispatcher = md
else throw new IllegalArgumentException(
"Can not swap dispatcher for " + toString + " after it has been started")
}
@@ -558,7 +667,7 @@ sealed class LocalActorRef private[akka](
/**
* Get the dispatcher for this actor.
*/
- def dispatcher: MessageDispatcher = guard.withReadLock { actor.dispatcher }
+ def dispatcher: MessageDispatcher = guard.withGuard { _dispatcher }
/**
* Invoking 'makeRemote' means that an actor will be moved to and invoked on a remote host.
@@ -571,13 +680,13 @@ sealed class LocalActorRef private[akka](
/**
* Invoking 'makeRemote' means that an actor will be moved to and invoked on a remote host.
*/
- def makeRemote(address: InetSocketAddress): Unit = guard.withWriteLock {
+ def makeRemote(address: InetSocketAddress): Unit = guard.withGuard {
if (isRunning) throw new IllegalStateException(
"Can't make a running actor remote. Make sure you call 'makeRemote' before 'start'.")
else {
_remoteAddress = Some(address)
RemoteClient.register(address.getHostName, address.getPort, uuid)
- if (_replyToAddress.isEmpty) setReplyToAddress(Actor.HOSTNAME, Actor.PORT)
+ homeAddress = (RemoteServer.HOSTNAME, RemoteServer.PORT)
}
}
@@ -589,7 +698,7 @@ sealed class LocalActorRef private[akka](
* TransactionManagement.disableTransactions
*
*/
- def makeTransactionRequired = guard.withWriteLock {
+ def makeTransactionRequired = guard.withGuard {
if (isRunning) throw new IllegalArgumentException(
"Can not make actor transaction required after it has been started")
else isTransactor = true
@@ -599,57 +708,32 @@ sealed class LocalActorRef private[akka](
* Set the contact address for this actor. This is used for replying to messages
* sent asynchronously when no reply channel exists.
*/
- def setReplyToAddress(address: InetSocketAddress): Unit =
- guard.withReadLock { _replyToAddress = Some(address) }
-
- /**
- * Returns the id for the actor.
- */
- def id = actor.id
+ def homeAddress_=(address: InetSocketAddress): Unit = guard.withGuard { _homeAddress = address }
/**
* Returns the remote address for the actor, if any, else None.
*/
- def remoteAddress: Option[InetSocketAddress] = guard.withReadLock { _remoteAddress }
+ def remoteAddress: Option[InetSocketAddress] = guard.withGuard { _remoteAddress }
protected[akka] def remoteAddress_=(addr: Option[InetSocketAddress]): Unit =
- guard.withWriteLock { _remoteAddress = addr }
-
- /**
- * User overridable callback/setting.
- *
- * Defines the default timeout for '!!' and '!!!' invocations,
- * e.g. the timeout for the future returned by the call to '!!' and '!!!'.
- */
- def timeout: Long = actor.timeout
-
- /**
- * Sets the default timeout for '!!' and '!!!' invocations,
- * e.g. the timeout for the future returned by the call to '!!' and '!!!'.
- */
- def timeout_=(t: Long) = actor.timeout = t
+ guard.withGuard { _remoteAddress = addr }
/**
* Starts up the actor and its message queue.
*/
- def start: ActorRef = guard.withWriteLock {
+ def start: ActorRef = guard.withGuard {
if (isShutdown) throw new IllegalStateException(
"Can't restart an actor that has been shut down with 'stop' or 'exit'")
if (!isRunning) {
- dispatcher.register(this)
- dispatcher.start
- _isRunning = true
- actor.init
- actor.initTransactionalState
+ if (!isInInitialization) initializeActorInstance
+ else runActorInitialization = true
}
- Actor.log.debug("[%s] has started", toString)
- ActorRegistry.register(this)
this
}
/**
* Shuts down the actor its dispatcher and message queue.
*/
- def stop = guard.withWriteLock {
+ def stop = guard.withGuard {
if (isRunning) {
dispatcher.unregister(this)
_isRunning = false
@@ -672,7 +756,7 @@ sealed class LocalActorRef private[akka](
*
* To be invoked from within the actor itself.
*/
- def link(actorRef: ActorRef) = guard.withWriteLock {
+ def link(actorRef: ActorRef) = guard.withGuard {
if (actorRef.supervisor.isDefined) throw new IllegalStateException(
"Actor can only have one supervisor [" + actorRef + "], e.g. link(actor) fails")
linkedActors.put(actorRef.uuid, actorRef)
@@ -685,7 +769,7 @@ sealed class LocalActorRef private[akka](
*
* To be invoked from within the actor itself.
*/
- def unlink(actorRef: ActorRef) = guard.withWriteLock {
+ def unlink(actorRef: ActorRef) = guard.withGuard {
if (!linkedActors.containsKey(actorRef.uuid)) throw new IllegalStateException(
"Actor [" + actorRef + "] is not a linked actor, can't unlink")
linkedActors.remove(actorRef.uuid)
@@ -698,7 +782,7 @@ sealed class LocalActorRef private[akka](
*
* To be invoked from within the actor itself.
*/
- def startLink(actorRef: ActorRef) = guard.withWriteLock {
+ def startLink(actorRef: ActorRef) = guard.withGuard {
try {
actorRef.start
} finally {
@@ -711,7 +795,7 @@ sealed class LocalActorRef private[akka](
*
* To be invoked from within the actor itself.
*/
- def startLinkRemote(actorRef: ActorRef, hostname: String, port: Int) = guard.withWriteLock {
+ def startLinkRemote(actorRef: ActorRef, hostname: String, port: Int) = guard.withGuard {
try {
actorRef.makeRemote(hostname, port)
actorRef.start
@@ -725,7 +809,7 @@ sealed class LocalActorRef private[akka](
*
* To be invoked from within the actor itself.
*/
- def spawn[T <: Actor : Manifest]: ActorRef = guard.withWriteLock {
+ def spawn[T <: Actor : Manifest]: ActorRef = guard.withGuard {
val actorRef = spawnButDoNotStart[T]
actorRef.start
actorRef
@@ -736,7 +820,7 @@ sealed class LocalActorRef private[akka](
*
* To be invoked from within the actor itself.
*/
- def spawnRemote[T <: Actor: Manifest](hostname: String, port: Int): ActorRef = guard.withWriteLock {
+ def spawnRemote[T <: Actor: Manifest](hostname: String, port: Int): ActorRef = guard.withGuard {
val actor = spawnButDoNotStart[T]
actor.makeRemote(hostname, port)
actor.start
@@ -748,7 +832,7 @@ sealed class LocalActorRef private[akka](
*
* To be invoked from within the actor itself.
*/
- def spawnLink[T <: Actor: Manifest]: ActorRef = guard.withWriteLock {
+ def spawnLink[T <: Actor: Manifest]: ActorRef = guard.withGuard {
val actor = spawnButDoNotStart[T]
try {
actor.start
@@ -763,7 +847,7 @@ sealed class LocalActorRef private[akka](
*
* To be invoked from within the actor itself.
*/
- def spawnLinkRemote[T <: Actor : Manifest](hostname: String, port: Int): ActorRef = guard.withWriteLock {
+ def spawnLinkRemote[T <: Actor : Manifest](hostname: String, port: Int): ActorRef = guard.withGuard {
val actor = spawnButDoNotStart[T]
try {
actor.makeRemote(hostname, port)
@@ -781,25 +865,19 @@ sealed class LocalActorRef private[akka](
/**
* Shuts down and removes all linked actors.
*/
- def shutdownLinkedActors: Unit = guard.withWriteLock {
+ def shutdownLinkedActors: Unit = guard.withGuard {
linkedActorsAsList.foreach(_.stop)
linkedActors.clear
}
- override def toString: String = actor.toString
-
- override def hashCode: Int = actor.hashCode
-
- override def equals(that: Any): Boolean = actor.equals(that)
-
/**
* Returns the supervisor, if there is one.
*/
- def supervisor: Option[ActorRef] = guard.withReadLock { _supervisor }
+ def supervisor: Option[ActorRef] = guard.withGuard { _supervisor }
- protected[akka] def supervisor_=(sup: Option[ActorRef]): Unit = guard.withWriteLock { _supervisor = sup }
+ protected[akka] def supervisor_=(sup: Option[ActorRef]): Unit = guard.withGuard { _supervisor = sup }
- private def spawnButDoNotStart[T <: Actor: Manifest]: ActorRef = guard.withWriteLock {
+ private def spawnButDoNotStart[T <: Actor: Manifest]: ActorRef = guard.withGuard {
val actor = manifest[T].erasure.asInstanceOf[Class[T]].newInstance
val actorRef = Actor.newActor(() => actor)
if (!dispatcher.isInstanceOf[ThreadBasedDispatcher]) {
@@ -809,6 +887,7 @@ sealed class LocalActorRef private[akka](
}
private[this] def newActor: Actor = {
+ isInInitialization = true
Actor.actorRefInCreation.value = Some(this)
val actor = actorFactory match {
case Left(Some(clazz)) =>
@@ -828,6 +907,7 @@ sealed class LocalActorRef private[akka](
}
if (actor eq null) throw new ActorInitializationException(
"Actor instance passed to ActorRef can not be 'null'")
+ isInInitialization = false
actor
}
@@ -846,8 +926,7 @@ sealed class LocalActorRef private[akka](
val id = registerSupervisorAsRemoteActor
if (id.isDefined) requestBuilder.setSupervisorUuid(id.get)
-
- senderOption.foreach(sender => requestBuilder.setSender(sender.toProtocol))
+ processSender(senderOption, requestBuilder)
RemoteProtocolBuilder.setMessage(message, requestBuilder)
RemoteClient.clientFor(remoteAddress.get).send[Any](requestBuilder.build, None)
@@ -897,26 +976,6 @@ sealed class LocalActorRef private[akka](
}
}
- protected[akka] def restart(reason: Throwable): Unit = {
- Actor.log.info("Restarting actor [%s] configured as PERMANENT.", id)
- val failedActor = actorInstance.get
- failedActor.synchronized {
- Actor.log.debug("Restarting linked actors for actor [%s].", id)
- restartLinkedActors(reason)
- Actor.log.debug("Invoking 'preRestart' for failed actor instance [%s].", id)
- failedActor.preRestart(reason)
- stop
- val freshActor = newActor
- freshActor.synchronized {
- actorInstance.set(freshActor)
- start
- Actor.log.debug("Invoking 'postRestart' for new actor instance [%s].", id)
- freshActor.postRestart(reason)
- }
- _isKilled = false
- }
- }
-
private def joinTransaction(message: Any) = if (isTransactionSetInScope) {
// FIXME test to run bench without this trace call
Actor.log.trace("Joining transaction set [%s];\n\tactor %s\n\twith message [%s]",
@@ -938,11 +997,11 @@ sealed class LocalActorRef private[akka](
}
}
- private def dispatch[T](messageHandle: MessageInvocation) = guard.withWriteLock {
+ private def dispatch[T](messageHandle: MessageInvocation) = {
setTransactionSet(messageHandle.transactionSet)
val message = messageHandle.message //serializeMessage(messageHandle.message)
- replyTo = messageHandle.replyTo
+ _replyTo = messageHandle.replyTo
try {
if (actor.base.isDefinedAt(message)) actor.base(message) // invoke user actor's receive partial function
@@ -963,7 +1022,7 @@ sealed class LocalActorRef private[akka](
}
}
- private def transactionalDispatch[T](messageHandle: MessageInvocation) = guard.withWriteLock {
+ private def transactionalDispatch[T](messageHandle: MessageInvocation) = {
var topLevelTransaction = false
val txSet: Option[CountDownCommitBarrier] =
if (messageHandle.transactionSet.isDefined) messageHandle.transactionSet
@@ -979,7 +1038,7 @@ sealed class LocalActorRef private[akka](
setTransactionSet(txSet)
val message = messageHandle.message //serializeMessage(messageHandle.message)
- replyTo = messageHandle.replyTo
+ _replyTo = messageHandle.replyTo
def proceed = {
if (actor.base.isDefinedAt(message)) actor.base(message) // invoke user actor's receive partial function
@@ -1020,21 +1079,49 @@ sealed class LocalActorRef private[akka](
}
}
- protected[akka] def handleTrapExit(dead: ActorRef, reason: Throwable): Unit = guard.withReadLock {
+ protected[akka] def handleTrapExit(dead: ActorRef, reason: Throwable): Unit = {
if (trapExit.exists(_.isAssignableFrom(reason.getClass))) {
if (faultHandler.isDefined) {
faultHandler.get match {
// FIXME: implement support for maxNrOfRetries and withinTimeRange in RestartStrategy
- case AllForOneStrategy(maxNrOfRetries, withinTimeRange) => restartLinkedActors(reason)
- case OneForOneStrategy(maxNrOfRetries, withinTimeRange) => dead.restart(reason)
+ case AllForOneStrategy(maxNrOfRetries, withinTimeRange) =>
+ restartLinkedActors(reason)
+
+ case OneForOneStrategy(maxNrOfRetries, withinTimeRange) =>
+ dead.restart(reason)
}
} else throw new IllegalStateException(
"No 'faultHandler' defined for an actor with the 'trapExit' member field defined " +
"\n\tto non-empty list of exception classes - can't proceed " + toString)
- } else _supervisor.foreach(_ ! Exit(dead, reason)) // if 'trapExit' is not defined then pass the Exit on
+ } else {
+ _supervisor.foreach(_ ! Exit(dead, reason)) // if 'trapExit' is not defined then pass the Exit on
+ }
}
- protected[akka] def restartLinkedActors(reason: Throwable) = guard.withWriteLock {
+ protected[akka] def restart(reason: Throwable): Unit = {
+ Actor.log.info("Restarting actor [%s] configured as PERMANENT.", id)
+ restartLinkedActors(reason)
+ val failedActor = actorInstance.get
+ failedActor.synchronized {
+ Actor.log.debug("Restarting linked actors for actor [%s].", id)
+ Actor.log.debug("Invoking 'preRestart' for failed actor instance [%s].", id)
+ failedActor.preRestart(reason)
+ failedActor.shutdown
+ _isRunning = false
+ val freshActor = newActor
+ freshActor.synchronized {
+ freshActor.init
+ freshActor.initTransactionalState
+ _isRunning = true
+ actorInstance.set(freshActor)
+ Actor.log.debug("Invoking 'postRestart' for new actor instance [%s].", id)
+ freshActor.postRestart(reason)
+ }
+ _isKilled = false
+ }
+ }
+
+ protected[akka] def restartLinkedActors(reason: Throwable) = guard.withGuard {
linkedActorsAsList.foreach { actorRef =>
if (actorRef.lifeCycle.isEmpty) actorRef.lifeCycle = Some(LifeCycle(Permanent))
actorRef.lifeCycle.get match {
@@ -1060,14 +1147,14 @@ sealed class LocalActorRef private[akka](
}
}
- protected[akka] def registerSupervisorAsRemoteActor: Option[String] = guard.withWriteLock {
+ protected[akka] def registerSupervisorAsRemoteActor: Option[String] = guard.withGuard {
if (_supervisor.isDefined) {
RemoteClient.clientFor(remoteAddress.get).registerSupervisorForActor(this)
Some(_supervisor.get.uuid)
} else None
}
- protected[akka] def linkedActors: JMap[String, ActorRef] = guard.withWriteLock {
+ protected[akka] def linkedActors: JMap[String, ActorRef] = guard.withGuard {
if (_linkedActors.isEmpty) {
val actors = new ConcurrentHashMap[String, ActorRef]
_linkedActors = Some(actors)
@@ -1078,6 +1165,17 @@ sealed class LocalActorRef private[akka](
protected[akka] def linkedActorsAsList: List[ActorRef] =
linkedActors.values.toArray.toList.asInstanceOf[List[ActorRef]]
+ private def initializeActorInstance = if (!isRunning) {
+ dispatcher.register(this)
+ dispatcher.start
+ actor.init // run actor init and initTransactionalState callbacks
+ actor.initTransactionalState
+ Actor.log.debug("[%s] has started", toString)
+ ActorRegistry.register(this)
+ if (id == "N/A") id = actorClass.getName // if no name set, then use default name (class name)
+ _isRunning = true
+ }
+
private def serializeMessage(message: AnyRef): AnyRef = if (Actor.SERIALIZE_MESSAGES) {
if (!message.isInstanceOf[String] &&
!message.isInstanceOf[Byte] &&
@@ -1105,8 +1203,9 @@ sealed class LocalActorRef private[akka](
}
/**
- * Remote Actor proxy.
- *
+ * Remote ActorRef that is used when referencing the Actor on a different node than its "home" node.
+ * This reference is network-aware (remembers its origin) and immutable.
+ *
* @author Jonas Bonér
*/
private[akka] case class RemoteActorRef private[akka] (
@@ -1114,8 +1213,10 @@ private[akka] case class RemoteActorRef private[akka] (
uuuid: String, val className: String, val hostname: String, val port: Int, _timeout: Long)
extends ActorRef {
_uuid = uuuid
+ timeout = _timeout
start
+ Thread.sleep(1000)
val remoteClient = RemoteClient.clientFor(hostname, port)
def postMessageToMailbox(message: Any, senderOption: Option[ActorRef]): Unit = {
@@ -1127,7 +1228,7 @@ private[akka] case class RemoteActorRef private[akka] (
.setIsActor(true)
.setIsOneWay(true)
.setIsEscaped(false)
- senderOption.foreach(sender => requestBuilder.setSender(sender.toProtocol))
+ processSender(senderOption, requestBuilder)
RemoteProtocolBuilder.setMessage(message, requestBuilder)
remoteClient.send[Any](requestBuilder.build, None)
}
@@ -1144,15 +1245,12 @@ private[akka] case class RemoteActorRef private[akka] (
.setIsActor(true)
.setIsOneWay(false)
.setIsEscaped(false)
- //senderOption.foreach(sender => requestBuilder.setSender(sender.toProtocol))
RemoteProtocolBuilder.setMessage(message, requestBuilder)
val future = remoteClient.send(requestBuilder.build, senderFuture)
if (future.isDefined) future.get
else throw new IllegalStateException("Expected a future from remote call to actor " + toString)
}
- def timeout: Long = _timeout
-
def start: ActorRef = {
_isRunning = true
this
@@ -1168,13 +1266,11 @@ private[akka] case class RemoteActorRef private[akka] (
def actorClass: Class[_ <: Actor] = unsupported
def dispatcher_=(md: MessageDispatcher): Unit = unsupported
def dispatcher: MessageDispatcher = unsupported
+ def makeTransactionRequired: Unit = unsupported
def makeRemote(hostname: String, port: Int): Unit = unsupported
def makeRemote(address: InetSocketAddress): Unit = unsupported
- def makeTransactionRequired: Unit = unsupported
- def setReplyToAddress(address: InetSocketAddress): Unit = unsupported
- def id: String = unsupported
+ def homeAddress_=(address: InetSocketAddress): Unit = unsupported
def remoteAddress: Option[InetSocketAddress] = unsupported
- def timeout_=(t: Long) = unsupported
def link(actorRef: ActorRef): Unit = unsupported
def unlink(actorRef: ActorRef): Unit = unsupported
def startLink(actorRef: ActorRef): Unit = unsupported
@@ -1198,5 +1294,6 @@ private[akka] case class RemoteActorRef private[akka] (
protected[akka] def remoteAddress_=(addr: Option[InetSocketAddress]): Unit = unsupported
protected[akka] def supervisor_=(sup: Option[ActorRef]): Unit = unsupported
protected[this] def actorInstance: AtomicReference[Actor] = unsupported
+
private def unsupported = throw new UnsupportedOperationException("Not supported for RemoteActorRef")
}
diff --git a/akka-core/src/main/scala/actor/Agent.scala b/akka-core/src/main/scala/actor/Agent.scala
index f949d94777..ac788b213a 100644
--- a/akka-core/src/main/scala/actor/Agent.scala
+++ b/akka-core/src/main/scala/actor/Agent.scala
@@ -98,39 +98,12 @@ class AgentException private[akka](message: String) extends RuntimeException(mes
* @author Viktor Klang
* @author Jonas Bonér
*/
-sealed class Agent[T] private (initialValue: T) extends Transactor {
+sealed class Agent[T] private (initialValue: T) {
import Agent._
import Actor._
-// _selfSenderRef = Some(newActor(() => this).start)
- log.debug("Starting up Agent [%s]", uuid)
-
- private lazy val value = Ref[T]()
-
- self ! Value(initialValue)
-
- /**
- * Periodically handles incoming messages.
- */
- def receive = {
- case Value(v: T) =>
- swap(v)
- case Function(fun: (T => T)) =>
- swap(fun(value.getOrWait))
- case Procedure(proc: (T => Unit)) =>
- proc(copyStrategy(value.getOrElse(throw new AgentException("Could not read Agent's value; value is null"))))
- }
-
- /**
- * Specifies how a copy of the value is made, defaults to using identity.
- */
- protected def copyStrategy(t: T): T = t
-
- /**
- * Performs a CAS operation, atomically swapping the internal state with the value
- * provided as a by-name parameter.
- */
- private final def swap(newData: => T): Unit = value.swap(newData)
+ private val dispatcher = newActor(() => new AgentDispatcher[T](initialValue)).start
+ dispatcher ! Value(initialValue)
/**
* Submits a request to read the internal state.
@@ -140,8 +113,9 @@ sealed class Agent[T] private (initialValue: T) extends Transactor {
* method and then waits for its result on a CountDownLatch.
*/
final def get: T = {
- if (self.isTransactionInScope) throw new AgentException(
- "Can't call Agent.get within an enclosing transaction.\n\tWould block indefinitely.\n\tPlease refactor your code.")
+ if (dispatcher.isTransactionInScope) throw new AgentException(
+ "Can't call Agent.get within an enclosing transaction."+
+ "\n\tWould block indefinitely.\n\tPlease refactor your code.")
val ref = new AtomicReference[T]
val latch = new CountDownLatch(1)
sendProc((v: T) => {ref.set(v); latch.countDown})
@@ -159,22 +133,22 @@ sealed class Agent[T] private (initialValue: T) extends Transactor {
/**
* Submits the provided function for execution against the internal agent's state.
*/
- final def apply(message: (T => T)): Unit = self ! Function(message)
+ final def apply(message: (T => T)): Unit = dispatcher ! Function(message)
/**
* Submits a new value to be set as the new agent's internal state.
*/
- final def apply(message: T): Unit = self ! Value(message)
+ final def apply(message: T): Unit = dispatcher ! Value(message)
/**
* Submits the provided function of type 'T => T' for execution against the internal agent's state.
*/
- final def send(message: (T) => T): Unit = self ! Function(message)
+ final def send(message: (T) => T): Unit = dispatcher ! Function(message)
/**
* Submits a new value to be set as the new agent's internal state.
*/
- final def send(message: T): Unit = self ! Value(message)
+ final def send(message: T): Unit = dispatcher ! Value(message)
/**
* Asynchronously submits a procedure of type 'T => Unit' to read the internal state.
@@ -182,7 +156,7 @@ sealed class Agent[T] private (initialValue: T) extends Transactor {
* of the internal state will be used, depending on the underlying effective copyStrategy.
* Does not change the value of the agent (this).
*/
- final def sendProc(f: (T) => Unit): Unit = self ! Procedure(f)
+ final def sendProc(f: (T) => Unit): Unit = dispatcher ! Procedure(f)
/**
* Applies function with type 'T => B' to the agent's internal state and then returns a new agent with the result.
@@ -207,7 +181,7 @@ sealed class Agent[T] private (initialValue: T) extends Transactor {
*
* A closed agent can never be used again.
*/
- def close = stop
+ def close = dispatcher.stop
}
/**
@@ -221,20 +195,44 @@ object Agent {
/*
* The internal messages for passing around requests.
*/
- private case class Value[T](value: T)
- private case class Function[T](fun: ((T) => T))
- private case class Procedure[T](fun: ((T) => Unit))
+ private[akka] case class Value[T](value: T)
+ private[akka] case class Function[T](fun: ((T) => T))
+ private[akka] case class Procedure[T](fun: ((T) => Unit))
/**
* Creates a new Agent of type T with the initial value of value.
*/
def apply[T](value: T): Agent[T] = new Agent(value)
+}
+
+/**
+ * Agent dispatcher Actor.
+ *
+ * @author Jonas Bonér
+ */
+final class AgentDispatcher[T] private[akka] (initialValue: T) extends Transactor {
+ import Agent._
+ import Actor._
+ log.debug("Starting up Agent [%s]", self.uuid)
+
+ private lazy val value = Ref[T]()
+
+ /**
+ * Periodically handles incoming messages.
+ */
+ def receive = {
+ case Value(v: T) =>
+ swap(v)
+ case Function(fun: (T => T)) =>
+ swap(fun(value.getOrWait))
+ case Procedure(proc: (T => Unit)) =>
+ proc(value.getOrElse(throw new AgentException("Could not read Agent's value; value is null")))
+ }
/**
- * Creates a new Agent of type T with the initial value of value and with the
- * specified copy function.
+ * Performs a CAS operation, atomically swapping the internal state with the value
+ * provided as a by-name parameter.
*/
- def apply[T](value: T, newCopyStrategy: (T) => T) = new Agent(value) {
- override def copyStrategy(t: T) = newCopyStrategy(t)
- }
+ private final def swap(newData: => T): Unit = value.swap(newData)
}
+
diff --git a/akka-core/src/main/scala/actor/Scheduler.scala b/akka-core/src/main/scala/actor/Scheduler.scala
index 8bb6bbcd19..d3429ef446 100644
--- a/akka-core/src/main/scala/actor/Scheduler.scala
+++ b/akka-core/src/main/scala/actor/Scheduler.scala
@@ -27,7 +27,7 @@ case class SchedulerException(msg: String, e: Throwable) extends RuntimeExceptio
* which is licensed under the Apache 2 License.
*/
class ScheduleActor(val receiver: ActorRef, val future: ScheduledFuture[AnyRef]) extends Actor with Logging {
- lifeCycle = Some(LifeCycle(Permanent))
+ self.lifeCycle = Some(LifeCycle(Permanent))
def receive = {
case UnSchedule =>
@@ -42,12 +42,12 @@ object Scheduler extends Actor {
private var service = Executors.newSingleThreadScheduledExecutor(SchedulerThreadFactory)
private val schedulers = new ConcurrentHashMap[ActorRef, ActorRef]
- faultHandler = Some(OneForOneStrategy(5, 5000))
- trapExit = List(classOf[Throwable])
+ self.faultHandler = Some(OneForOneStrategy(5, 5000))
+ self.trapExit = List(classOf[Throwable])
def schedule(receiver: ActorRef, message: AnyRef, initialDelay: Long, delay: Long, timeUnit: TimeUnit) = {
try {
- startLink(newActor(() => new ScheduleActor(
+ self.startLink(newActor(() => new ScheduleActor(
receiver,
service.scheduleAtFixedRate(new java.lang.Runnable {
def run = receiver ! message;
@@ -60,7 +60,7 @@ object Scheduler extends Actor {
def restart = service = Executors.newSingleThreadScheduledExecutor(SchedulerThreadFactory)
def stopSupervising(actorRef: ActorRef) = {
- unlink(actorRef)
+ self.unlink(actorRef)
schedulers.remove(actorRef)
}
diff --git a/akka-core/src/main/scala/actor/Supervisor.scala b/akka-core/src/main/scala/actor/Supervisor.scala
index 9e2bbe8e62..e366f102d5 100644
--- a/akka-core/src/main/scala/actor/Supervisor.scala
+++ b/akka-core/src/main/scala/actor/Supervisor.scala
@@ -164,7 +164,8 @@ class SupervisorFactory private[akka] (val config: SupervisorConfig) extends Log
*
* @author Jonas Bonér
*/
-sealed class Supervisor private[akka] (handler: FaultHandlingStrategy, trapExceptions: List[Class[_ <: Throwable]])
+sealed class Supervisor private[akka] (
+ handler: FaultHandlingStrategy, trapExceptions: List[Class[_ <: Throwable]])
extends Configurator {
private val children = new ConcurrentHashMap[String, List[ActorRef]]
@@ -202,7 +203,7 @@ sealed class Supervisor private[akka] (handler: FaultHandlingStrategy, trapExcep
else list
}
children.put(className, actorRef :: currentActors)
- actorRef.actor.lifeCycle = Some(lifeCycle)
+ actorRef.lifeCycle = Some(lifeCycle)
supervisor ! Link(actorRef)
remoteAddress.foreach(address => RemoteServer.actorsFor(
RemoteServer.Address(address.hostname, address.port))
@@ -241,16 +242,16 @@ final class SupervisorActor private[akka] (
handler: FaultHandlingStrategy,
trapExceptions: List[Class[_ <: Throwable]])
extends Actor {
-
- trapExit = trapExceptions
- faultHandler = Some(handler)
+ self.dispatcher = Dispatchers.newThreadBasedDispatcher(self)
+ self.trapExit = trapExceptions
+ self.faultHandler = Some(handler)
override def shutdown: Unit = self.shutdownLinkedActors
def receive = {
- case Link(child) => startLink(child)
- case Unlink(child) => unlink(child)
- case UnlinkAndStop(child) => unlink(child); child.stop
+ case Link(child) => self.startLink(child)
+ case Unlink(child) => self.unlink(child)
+ case UnlinkAndStop(child) => self.unlink(child); child.stop
case unknown => throw new IllegalArgumentException(
"Supervisor can only respond to 'Link' and 'Unlink' messages. Unknown message [" + unknown + "]")
}
diff --git a/akka-core/src/main/scala/dispatch/Dispatchers.scala b/akka-core/src/main/scala/dispatch/Dispatchers.scala
index 411ab297ea..6fa9ebd418 100644
--- a/akka-core/src/main/scala/dispatch/Dispatchers.scala
+++ b/akka-core/src/main/scala/dispatch/Dispatchers.scala
@@ -80,5 +80,5 @@ object Dispatchers {
*
* E.g. each actor consumes its own thread.
*/
- def newThreadBasedDispatcher(actor: Actor) = new ThreadBasedDispatcher(actor)
+ def newThreadBasedDispatcher(actor: ActorRef) = new ThreadBasedDispatcher(actor)
}
diff --git a/akka-core/src/main/scala/dispatch/ThreadBasedDispatcher.scala b/akka-core/src/main/scala/dispatch/ThreadBasedDispatcher.scala
index b5aaa3380c..d13b41e574 100644
--- a/akka-core/src/main/scala/dispatch/ThreadBasedDispatcher.scala
+++ b/akka-core/src/main/scala/dispatch/ThreadBasedDispatcher.scala
@@ -14,11 +14,10 @@ import se.scalablesolutions.akka.actor.{Actor, ActorRef, ActorMessageInvoker}
*
* @author Jonas Bonér
*/
-class ThreadBasedDispatcher private[akka] (val name: String, val messageHandler: MessageInvoker)
- extends MessageDispatcher {
-
- def this(actor: Actor) = this(actor.getClass.getName, new ActorMessageInvoker(Actor.newActor(() => actor)))
-
+class ThreadBasedDispatcher(actor: ActorRef) extends MessageDispatcher {
+ private val name = actor.getClass.getName + ":" + actor.uuid
+ private val threadName = "thread-based:dispatcher:" + name
+ private val messageHandler = new ActorMessageInvoker(actor)
private val queue = new BlockingMessageQueue(name)
private var selectorThread: Thread = _
@volatile private var active: Boolean = false
@@ -27,7 +26,7 @@ class ThreadBasedDispatcher private[akka] (val name: String, val messageHandler:
def start = if (!active) {
active = true
- selectorThread = new Thread {
+ selectorThread = new Thread(threadName) {
override def run = {
while (active) {
try {
diff --git a/akka-core/src/main/scala/dispatch/ThreadPoolBuilder.scala b/akka-core/src/main/scala/dispatch/ThreadPoolBuilder.scala
index 1fedc1a5d7..a5f097a84b 100644
--- a/akka-core/src/main/scala/dispatch/ThreadPoolBuilder.scala
+++ b/akka-core/src/main/scala/dispatch/ThreadPoolBuilder.scala
@@ -14,7 +14,7 @@ import se.scalablesolutions.akka.util.Logging
trait ThreadPoolBuilder {
val name: String
- private val NR_START_THREADS = 4
+ private val NR_START_THREADS = 16
private val NR_MAX_THREADS = 128
private val KEEP_ALIVE_TIME = 60000L // default is one minute
private val MILLISECONDS = TimeUnit.MILLISECONDS
diff --git a/akka-core/src/main/scala/remote/RemoteClient.scala b/akka-core/src/main/scala/remote/RemoteClient.scala
index 6888d2c6c7..57b3eb7048 100644
--- a/akka-core/src/main/scala/remote/RemoteClient.scala
+++ b/akka-core/src/main/scala/remote/RemoteClient.scala
@@ -182,6 +182,7 @@ class RemoteClient(val hostname: String, val port: Int) extends Logging {
futures.synchronized {
val futureResult = if (senderFuture.isDefined) senderFuture.get
else new DefaultCompletableFuture[T](request.getTimeout)
+ println("------ SETTING ID: " + request.getId + " " + name)
futures.put(request.getId, futureResult)
connection.getChannel.write(request)
Some(futureResult)
@@ -263,6 +264,7 @@ class RemoteClientHandler(val name: String,
if (result.isInstanceOf[RemoteReplyProtocol]) {
val reply = result.asInstanceOf[RemoteReplyProtocol]
log.debug("Remote client received RemoteReplyProtocol[\n%s]", reply.toString)
+ println("------ GETTING ID: " + reply.getId + " " + name)
val future = futures.get(reply.getId).asInstanceOf[CompletableFuture[Any]]
if (reply.getIsSuccessful) {
val message = RemoteProtocolBuilder.getMessage(reply)
diff --git a/akka-core/src/main/scala/remote/RemoteServer.scala b/akka-core/src/main/scala/remote/RemoteServer.scala
index 5dfde04747..1d0e67613e 100644
--- a/akka-core/src/main/scala/remote/RemoteServer.scala
+++ b/akka-core/src/main/scala/remote/RemoteServer.scala
@@ -112,6 +112,9 @@ object RemoteServer {
else Some(server)
}
+ private[akka] def serverFor(address: InetSocketAddress): Option[RemoteServer] =
+ serverFor(address.getHostName, address.getPort)
+
private[remote] def register(hostname: String, port: Int, server: RemoteServer) =
remoteServers.put(Address(hostname, port), server)
@@ -154,13 +157,22 @@ class RemoteServer extends Logging {
def isRunning = _isRunning
- def start: Unit = start(None)
+ def start: RemoteServer =
+ start(hostname, port, None)
- def start(loader: Option[ClassLoader]): Unit = start(hostname, port, loader)
+ def start(loader: Option[ClassLoader]): RemoteServer =
+ start(hostname, port, loader)
- def start(_hostname: String, _port: Int): Unit = start(_hostname, _port, None)
+ def start(address: InetSocketAddress): RemoteServer =
+ start(address.getHostName, address.getPort, None)
- def start(_hostname: String, _port: Int, loader: Option[ClassLoader]): Unit = synchronized {
+ def start(address: InetSocketAddress, loader: Option[ClassLoader]): RemoteServer =
+ start(address.getHostName, address.getPort, loader)
+
+ def start(_hostname: String, _port: Int): RemoteServer =
+ start(_hostname, _port, None)
+
+ def start(_hostname: String, _port: Int, loader: Option[ClassLoader]): RemoteServer = synchronized {
try {
if (!_isRunning) {
hostname = _hostname
@@ -182,6 +194,7 @@ class RemoteServer extends Logging {
} catch {
case e => log.error(e, "Could not start up remote server")
}
+ this
}
def shutdown = synchronized {
@@ -334,7 +347,7 @@ class RemoteServerHandler(
val actorRef = createActor(request.getTarget, request.getUuid, request.getTimeout)
actorRef.start
val message = RemoteProtocolBuilder.getMessage(request)
- if (request.getIsOneWay) {
+ if (request.hasSender) {
val sender = request.getSender
if (sender ne null) actorRef.!(message)(Some(ActorRef.fromProtocol(sender)))
} else {
diff --git a/akka-core/src/main/scala/stm/DataFlowVariable.scala b/akka-core/src/main/scala/stm/DataFlowVariable.scala
index d5e3901fa7..bc65d9d5cd 100644
--- a/akka-core/src/main/scala/stm/DataFlowVariable.scala
+++ b/akka-core/src/main/scala/stm/DataFlowVariable.scala
@@ -46,7 +46,7 @@ import se.scalablesolutions.akka.dispatch.CompletableFuture
extends Actor {
def receive = {
case Exit => exit
- case message => reply(body(message.asInstanceOf[A]))
+ case message => self.reply(body(message.asInstanceOf[A]))
}
}
@@ -64,7 +64,7 @@ import se.scalablesolutions.akka.dispatch.CompletableFuture
private val blockedReaders = new ConcurrentLinkedQueue[ActorRef]
private class In[T <: Any](dataFlow: DataFlowVariable[T]) extends Actor {
- timeout = TIME_OUT
+ self.timeout = TIME_OUT
def receive = {
case Set(v) =>
if (dataFlow.value.compareAndSet(None, Some(v.asInstanceOf[T]))) {
@@ -78,13 +78,13 @@ import se.scalablesolutions.akka.dispatch.CompletableFuture
}
private class Out[T <: Any](dataFlow: DataFlowVariable[T]) extends Actor {
- timeout = TIME_OUT
+ self.timeout = TIME_OUT
private var readerFuture: Option[CompletableFuture[T]] = None
def receive = {
case Get =>
val ref = dataFlow.value.get
if (ref.isDefined)
- reply(ref.get)
+ self.reply(ref.get)
else {
readerFuture = self.replyTo match {
case Some(Right(future)) => Some(future.asInstanceOf[CompletableFuture[T]])
@@ -348,7 +348,6 @@ object Test4 extends Application {
// =======================================
object Test5 extends Application {
- import Actor.Sender.Self
import DataFlow._
// create four 'Int' data flow variables
diff --git a/akka-core/src/main/scala/util/ReadWriteLock.scala b/akka-core/src/main/scala/util/ReadWriteLock.scala
deleted file mode 100644
index 9dd7a27b07..0000000000
--- a/akka-core/src/main/scala/util/ReadWriteLock.scala
+++ /dev/null
@@ -1,35 +0,0 @@
-/**
- * Copyright (C) 2009-2010 Scalable Solutions AB