diff --git a/akka-actor/src/main/scala/akka/AkkaApplication.scala b/akka-actor/src/main/scala/akka/AkkaApplication.scala index b064630107..60e7d78df2 100644 --- a/akka-actor/src/main/scala/akka/AkkaApplication.scala +++ b/akka-actor/src/main/scala/akka/AkkaApplication.scala @@ -176,6 +176,8 @@ class AkkaApplication(val name: String, val config: Configuration) extends Actor // TODO think about memory consistency effects when doing funky stuff inside an ActorRefProvider's constructor val provider: ActorRefProvider = reflective.createProvider + val deathWatch = provider.createDeathWatch() + val typedActor = new TypedActor(this) val serialization = new Serialization(this) diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala index 16a9444349..f95ed69e88 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala @@ -9,7 +9,6 @@ import akka.util._ import scala.annotation.tailrec import scala.collection.immutable.Stack import scala.collection.JavaConverters -import akka.event.InVMMonitoring import java.util.concurrent.{ ScheduledFuture, TimeUnit } import java.util.{ Collection ⇒ JCollection, Collections ⇒ JCollections } import akka.AkkaApplication @@ -207,27 +206,25 @@ private[akka] object ActorCell { } } +//vars don't need volatile since it's protected with the mailbox status +//Make sure that they are not read/written outside of a message processing (systemInvoke/invoke) private[akka] class ActorCell( val app: AkkaApplication, val self: ActorRef with ScalaActorRef, val props: Props, - @volatile var receiveTimeout: Option[Long], - @volatile var hotswap: Stack[PartialFunction[Any, Unit]]) extends ActorContext { + var receiveTimeout: Option[Long], + var hotswap: Stack[PartialFunction[Any, Unit]]) extends ActorContext { import ActorCell._ def provider = app.provider - @volatile var futureTimeout: Option[ScheduledFuture[AnyRef]] = None //FIXME TODO Doesn't need to be volatile either, since it will only ever be accessed when a message is processed - @volatile var _children: Vector[ChildRestartStats] = Vector.empty - @volatile //TODO FIXME Might be able to make this non-volatile since it should be guarded by a mailbox.isShutdown test (which will force volatile piggyback read) var currentMessage: Envelope = null - @volatile //TODO FIXME Might be able to make this non-volatile since it should be guarded by a mailbox.isShutdown test (which will force volatile piggyback read) var actor: Actor = _ //FIXME We can most probably make this just a regular reference to Actor def ref: ActorRef with ScalaActorRef = self @@ -238,7 +235,7 @@ private[akka] class ActorCell( def isShutdown: Boolean = mailbox.isClosed - @volatile //This must be volatile + @volatile //This must be volatile since it isn't protected by the mailbox status var mailbox: Mailbox = _ def start(): Unit = { @@ -404,7 +401,7 @@ private[akka] class ActorCell( if (supervisor.isDefined) supervisor.get ! ChildTerminated(self, cause) - InVMMonitoring.publish(Terminated(self, cause)) + app.deathWatch.publish(Terminated(self, cause)) currentMessage = null clearActorContext() @@ -426,10 +423,10 @@ private[akka] class ActorCell( case Create ⇒ create() case Recreate(cause) ⇒ recreate(cause) case Link(subject) ⇒ - akka.event.InVMMonitoring.subscribe(self, subject) + app.deathWatch.subscribe(self, subject) if (app.AkkaConfig.DebugLifecycle) app.eventHandler.debug(self, "now monitoring " + subject) case Unlink(subject) ⇒ - akka.event.InVMMonitoring.unsubscribe(self, subject) + app.deathWatch.unsubscribe(self, subject) if (app.AkkaConfig.DebugLifecycle) app.eventHandler.debug(self, "stopped monitoring " + subject) case Suspend ⇒ suspend() case Resume ⇒ resume() diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index d58ba2bf85..6e47f60300 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -13,7 +13,7 @@ import java.lang.{ UnsupportedOperationException, IllegalStateException } import akka.AkkaApplication import akka.remote.RemoteSupport import scala.util.DynamicVariable -import akka.event.{ EventHandler, InVMMonitoring } +import akka.event.{ EventHandler } /** * ActorRef is an immutable and serializable handle to an Actor. diff --git a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala index 4904c07d21..0b94e94809 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala @@ -4,7 +4,6 @@ package akka.actor -import akka.event.EventHandler import akka.config.ConfigurationException import akka.util.ReflectiveAccess import akka.routing._ @@ -14,6 +13,7 @@ import java.util.concurrent.ConcurrentHashMap import akka.dispatch.Promise import com.eaio.uuid.UUID import akka.AkkaException +import akka.event.{ ActorClassification, DeathWatch, EventHandler } /** * Interface for all ActorRef providers to implement. @@ -31,6 +31,8 @@ trait ActorRefProvider { private[akka] def evict(address: String): Boolean private[akka] def deserialize(actor: SerializedActorRef): Option[ActorRef] + + private[akka] def createDeathWatch(): DeathWatch } /** @@ -168,4 +170,23 @@ class LocalActorRefProvider(val app: AkkaApplication) extends ActorRefProvider { } private[akka] def deserialize(actor: SerializedActorRef): Option[ActorRef] = actorFor(actor.address) + + private[akka] def createDeathWatch(): DeathWatch = new LocalDeathWatch +} + +class LocalDeathWatch extends DeathWatch with ActorClassification { + + def mapSize = 1024 + + override def publish(event: Event): Unit = { + val monitors = dissociate(classify(event)) + if (monitors.nonEmpty) monitors.foreach(_ ! event) + } + + override def subscribe(subscriber: Subscriber, to: Classifier): Boolean = { + if (!super.subscribe(subscriber, to)) { + subscriber ! Terminated(subscriber, new ActorKilledException("Already terminated when linking")) + false + } else true + } } diff --git a/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala index e44806b206..fd3dd697da 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala @@ -121,8 +121,11 @@ class Dispatcher( true } catch { case e: RejectedExecutionException ⇒ - app.eventHandler.warning(this, e.toString) - mbox.setAsIdle() + try { + app.eventHandler.warning(this, e.toString) + } finally { + mbox.setAsIdle() + } throw e } } else false diff --git a/akka-actor/src/main/scala/akka/event/DeathWatch.scala b/akka-actor/src/main/scala/akka/event/DeathWatch.scala index 31aabc076b..099d45a703 100644 --- a/akka-actor/src/main/scala/akka/event/DeathWatch.scala +++ b/akka-actor/src/main/scala/akka/event/DeathWatch.scala @@ -10,27 +10,10 @@ import akka.actor._ * The contract of DeathWatch is not properly expressed using the type system * Whenever there is a publish, all listeners to the Terminated Actor should be atomically removed * A failed subscribe should also only mean that the Classifier (ActorRef) that is listened to is already shut down - * See InVMMonitoring for semantics + * See LocalDeathWatch for semantics */ trait DeathWatch extends ActorEventBus with ActorClassifier { type Event = Terminated protected final def classify(event: Event): Classifier = event.actor -} - -object InVMMonitoring extends DeathWatch with ActorClassification { - - def mapSize = 1024 - - override def publish(event: Event): Unit = { - val monitors = dissociate(classify(event)) - if (monitors.nonEmpty) monitors.foreach(_ ! event) - } - - override def subscribe(subscriber: Subscriber, to: Classifier): Boolean = { - if (!super.subscribe(subscriber, to)) { - subscriber ! Terminated(subscriber, new ActorKilledException("Already terminated when linking")) - false - } else true - } } \ No newline at end of file diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala index ef7a82cfbe..5fcf8b8fee 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala @@ -9,7 +9,6 @@ import akka.routing._ import akka.actor.Actor._ import akka.actor.Status._ import akka.dispatch._ -import akka.event.EventHandler import akka.util.duration._ import akka.config.ConfigurationException import akka.AkkaException @@ -20,6 +19,7 @@ import Compression.LZF import java.net.InetSocketAddress import com.google.protobuf.ByteString import akka.AkkaApplication +import akka.event.{ DeathWatch, EventHandler } /** * Remote ActorRefProvider. Starts up actor on remote node and creates a RemoteActorRef representing it. @@ -225,6 +225,8 @@ class RemoteActorRefProvider(val app: AkkaApplication) extends ActorRefProvider connection ! command } } + + private[akka] def createDeathWatch(): DeathWatch = local.createDeathWatch() //FIXME Implement Remote DeathWatch } /**