diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala
index 3824c329b4..1b99c63158 100644
--- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala
+++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala
@@ -111,6 +111,7 @@ private[akka] class ActorCell(
def start(): Unit = {
if (props.supervisor.isDefined) props.supervisor.get.link(self)
dispatcher.attach(this)
+ Actor.registry.register(self)
dispatcher.systemDispatch(SystemEnvelope(this, Create, NullChannel))
}
@@ -223,17 +224,16 @@ private[akka] class ActorCell(
case msg ⇒ msg.channel
}
- def systemInvoke(envelope: SystemEnvelope): Boolean = {
+ def systemInvoke(envelope: SystemEnvelope): Unit = {
var isTerminated = terminated
- def create(recreation: Boolean): Boolean = try {
+ def create(recreation: Boolean): Unit = try {
actor.get() match {
case null ⇒
val created = newActor(restart = false)
actor.set(created)
created.preStart()
checkReceiveTimeout
- Actor.registry.register(self)
if (Actor.debugLifecycle) EventHandler.debug(created, "started")
case instance if recreation ⇒
restart(new Exception("Restart commanded"), None, None)
@@ -245,21 +245,14 @@ private[akka] class ActorCell(
envelope.channel.sendException(e)
if (supervisor.isDefined) {
supervisor.get ! Failed(self, e, false, maxNrOfRetriesCount, restartTimeWindowStartNanos)
- false // don't continue processing messages right now
} else throw e
}
- def suspend(): Boolean = {
- dispatcher suspend this
- true
- }
+ def suspend(): Unit = dispatcher suspend this
- def resume(): Boolean = {
- dispatcher resume this
- true
- }
+ def resume(): Unit = dispatcher resume this
- def terminate(): Boolean = {
+ def terminate(): Unit = {
receiveTimeout = None
cancelReceiveTimeout
Actor.registry.unregister(self)
@@ -278,10 +271,6 @@ private[akka] class ActorCell(
i.remove()
}
}
-
- // TODO CHECK: stop message dequeuing, which means that mailbox will not be restarted and GCed
- false
-
} finally {
try {
if (supervisor.isDefined)
@@ -305,8 +294,6 @@ private[akka] class ActorCell(
case Resume ⇒ resume()
case Terminate ⇒ terminate()
}
- } else {
- false
}
} catch {
case e ⇒ //Should we really catch everything here?
diff --git a/akka-actor/src/main/scala/akka/actor/ActorRegistry.scala b/akka-actor/src/main/scala/akka/actor/ActorRegistry.scala
index 702fb93a08..69c7727bbe 100644
--- a/akka-actor/src/main/scala/akka/actor/ActorRegistry.scala
+++ b/akka-actor/src/main/scala/akka/actor/ActorRegistry.scala
@@ -9,6 +9,7 @@ import scala.collection.mutable.ListBuffer
import java.util.concurrent.ConcurrentHashMap
import akka.util.ListenerManagement
+import reflect.BeanProperty
/**
* Base trait for ActorRegistry events, allows listen to when an actor is added and removed from the ActorRegistry.
@@ -16,8 +17,10 @@ import akka.util.ListenerManagement
* @author Jonas Bonér
*/
sealed trait ActorRegistryEvent
-case class ActorRegistered(address: String, actor: ActorRef, typedActor: Option[AnyRef]) extends ActorRegistryEvent
-case class ActorUnregistered(address: String, actor: ActorRef, typedActor: Option[AnyRef]) extends ActorRegistryEvent
+case class ActorRegistered(@BeanProperty address: String, @BeanProperty actor: ActorRef) extends ActorRegistryEvent
+case class ActorUnregistered(@BeanProperty address: String, @BeanProperty actor: ActorRef) extends ActorRegistryEvent
+case class TypedActorRegistered(@BeanProperty address: String, @BeanProperty actor: ActorRef, @BeanProperty proxy: AnyRef) extends ActorRegistryEvent
+case class TypedActorUnregistered(@BeanProperty address: String, @BeanProperty actor: ActorRef, @BeanProperty proxy: AnyRef) extends ActorRegistryEvent
/**
* Registry holding all Actor instances in the whole system.
@@ -55,14 +58,18 @@ private[actor] final class ActorRegistry private[actor] () extends ListenerManag
actorsByAddress.put(address, actor)
actorsByUuid.put(actor.uuid, actor)
- notifyListeners(ActorRegistered(address, actor, Option(typedActorsByUuid get actor.uuid)))
+ notifyListeners(ActorRegistered(address, actor))
}
- private[akka] def registerTypedActor(actorRef: ActorRef, interface: AnyRef): Unit =
- typedActorsByUuid.put(actorRef.uuid, interface)
+ private[akka] def registerTypedActor(actorRef: ActorRef, proxy: AnyRef): Unit = {
+ if (typedActorsByUuid.putIfAbsent(actorRef.uuid, proxy) eq null)
+ notifyListeners(TypedActorRegistered(actorRef.address, actorRef, proxy))
+ }
- private[akka] def unregisterTypedActor(actorRef: ActorRef, interface: AnyRef): Unit =
- typedActorsByUuid.remove(actorRef.uuid, interface)
+ private[akka] def unregisterTypedActor(actorRef: ActorRef, proxy: AnyRef): Unit = {
+ if (typedActorsByUuid.remove(actorRef.uuid, proxy))
+ notifyListeners(TypedActorUnregistered(actorRef.address, actorRef, proxy))
+ }
/**
* Unregisters an actor in the ActorRegistry.
@@ -70,7 +77,7 @@ private[actor] final class ActorRegistry private[actor] () extends ListenerManag
private[akka] def unregister(address: String) {
val actor = actorsByAddress remove address
actorsByUuid remove actor.uuid
- notifyListeners(ActorUnregistered(address, actor, None))
+ notifyListeners(ActorUnregistered(address, actor))
}
/**
@@ -80,7 +87,12 @@ private[actor] final class ActorRegistry private[actor] () extends ListenerManag
val address = actor.address
actorsByAddress remove address
actorsByUuid remove actor.uuid
- notifyListeners(ActorUnregistered(address, actor, Option(typedActorsByUuid remove actor.uuid)))
+ notifyListeners(ActorUnregistered(address, actor))
+
+ //Safe cleanup (if called from the outside)
+ val proxy = typedActorsByUuid.remove(actor.uuid)
+ if (proxy ne null)
+ notifyListeners(TypedActorUnregistered(address, actor, proxy))
}
/**
diff --git a/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala
index 362bb45382..da5893e1bf 100644
--- a/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala
+++ b/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala
@@ -146,22 +146,6 @@ class Dispatcher(
protected[akka] def reRegisterForExecution(mbox: Mailbox): Unit =
registerForExecution(mbox)
- protected override def cleanUpMailboxFor(actor: ActorCell) {
- val m = actor.mailbox
- actor.mailbox = deadLetterMailbox //FIXME switch to getAndSet semantics
- if (m.hasMessages) {
- var invocation = m.dequeue
- lazy val exception = new ActorKilledException("Actor has been stopped")
- while (invocation ne null) {
- invocation.channel.sendException(exception)
- invocation = m.dequeue
- }
- }
- while (m.systemDequeue() ne null) {
- //Empty the system messages
- }
- }
-
override val toString = getClass.getSimpleName + "[" + name + "]"
def suspend(actor: ActorCell): Unit =
diff --git a/akka-actor/src/main/scala/akka/dispatch/MailboxHandling.scala b/akka-actor/src/main/scala/akka/dispatch/MailboxHandling.scala
index fa3d542914..6181f4b5d4 100644
--- a/akka-actor/src/main/scala/akka/dispatch/MailboxHandling.scala
+++ b/akka-actor/src/main/scala/akka/dispatch/MailboxHandling.scala
@@ -39,7 +39,9 @@ trait Mailbox extends Runnable {
* @return true if the processing finished before the mailbox was empty, due to the throughput constraint
*/
final def processMailbox() {
- if (processAllSystemMessages() && !suspended.locked) {
+ if (hasSystemMessages)
+ processAllSystemMessages()
+ else if (!suspended.locked) {
var nextMessage = dequeue()
if (nextMessage ne null) { //If we have a message
if (dispatcher.throughput <= 1) //If we only run one message per process
@@ -51,28 +53,29 @@ trait Mailbox extends Runnable {
else 0
do {
nextMessage.invoke
- nextMessage =
- if (!processAllSystemMessages() || suspended.locked) {
- null // If we are suspended, abort
- } else { // If we aren't suspended, we need to make sure we're not overstepping our boundaries
- processedMessages += 1
- if ((processedMessages >= dispatcher.throughput) || (isDeadlineEnabled && System.nanoTime >= deadlineNs)) // If we're throttled, break out
- null //We reached our boundaries, abort
- else dequeue //Dequeue the next message
- }
+ nextMessage = if (hasSystemMessages) {
+ processAllSystemMessages()
+ null
+ } else if (suspended.locked) {
+ null // If we are suspended, abort
+ } else { // If we aren't suspended, we need to make sure we're not overstepping our boundaries
+ processedMessages += 1
+ if ((processedMessages >= dispatcher.throughput) || (isDeadlineEnabled && System.nanoTime >= deadlineNs)) // If we're throttled, break out
+ null //We reached our boundaries, abort
+ else dequeue //Dequeue the next message
+ }
} while (nextMessage ne null)
}
}
}
}
- def processAllSystemMessages(): Boolean = {
+ def processAllSystemMessages(): Unit = {
var nextMessage = systemDequeue()
while (nextMessage ne null) {
- if (!nextMessage.invoke()) return false
+ nextMessage.invoke()
nextMessage = systemDequeue()
}
- true
}
/*
diff --git a/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala b/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala
index b64541cd17..35cd2f91ba 100644
--- a/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala
+++ b/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala
@@ -34,7 +34,7 @@ final case class SystemEnvelope(val receiver: ActorCell, val message: SystemMess
/**
* @return whether to proceed with processing other messages
*/
- final def invoke(): Boolean = receiver systemInvoke this
+ final def invoke(): Unit = receiver systemInvoke this
}
final case class TaskInvocation(function: () ⇒ Unit, cleanup: () ⇒ Unit) extends Runnable {
@@ -174,7 +174,9 @@ abstract class MessageDispatcher extends Serializable {
*/
protected[akka] def unregister(actor: ActorCell) = {
if (uuids remove actor.uuid) {
- cleanUpMailboxFor(actor)
+ val mailBox = actor.mailbox
+ actor.mailbox = deadLetterMailbox //FIXME switch to getAndSet semantics
+ cleanUpMailboxFor(actor, mailBox)
if (uuids.isEmpty && _tasks.get == 0) {
shutdownSchedule match {
case UNSCHEDULED ⇒
@@ -192,7 +194,25 @@ abstract class MessageDispatcher extends Serializable {
* Overridable callback to clean up the mailbox for a given actor,
* called when an actor is unregistered.
*/
- protected def cleanUpMailboxFor(actor: ActorCell) {}
+ protected def cleanUpMailboxFor(actor: ActorCell, mailBox: Mailbox) {
+ val m = mailBox
+
+ if (m.hasSystemMessages) {
+ var envelope = m.systemDequeue()
+ while (envelope ne null) {
+ deadLetterMailbox.systemEnqueue(envelope)
+ envelope = m.systemDequeue()
+ }
+ }
+
+ if (m.hasMessages) {
+ var envelope = m.dequeue
+ while (envelope ne null) {
+ deadLetterMailbox.enqueue(envelope)
+ envelope = m.dequeue
+ }
+ }
+ }
/**
* Traverses the list of actors (uuids) currently being attached to this dispatcher and stops those actors
diff --git a/akka-camel-typed/src/main/scala/akka/camel/TypedConsumerPublisher.scala b/akka-camel-typed/src/main/scala/akka/camel/TypedConsumerPublisher.scala
index 7b30ae5d74..c530ded991 100644
--- a/akka-camel-typed/src/main/scala/akka/camel/TypedConsumerPublisher.scala
+++ b/akka-camel-typed/src/main/scala/akka/camel/TypedConsumerPublisher.scala
@@ -12,16 +12,16 @@ import akka.event.EventHandler
/**
* Concrete publish requestor that requests publication of typed consumer actor methods on
- * ActorRegistered events and unpublication of typed consumer actor methods on
- * ActorUnregistered events.
+ * TypedActorRegistered events and unpublication of typed consumer actor methods on
+ * TypedActorUnregistered events.
*
* @author Martin Krasser
*/
private[camel] class TypedConsumerPublishRequestor extends PublishRequestor {
def receiveActorRegistryEvent = {
- case ActorRegistered(_, actor, typedActor) ⇒ for (event ← ConsumerMethodRegistered.eventsFor(actor, typedActor)) deliverCurrentEvent(event)
- case ActorUnregistered(_, actor, typedActor) ⇒ for (event ← ConsumerMethodUnregistered.eventsFor(actor, typedActor)) deliverCurrentEvent(event)
- case _ ⇒ ()
+ case TypedActorRegistered(_, actor, typedActor) ⇒ for (event ← ConsumerMethodRegistered.eventsFor(actor, Option(typedActor))) deliverCurrentEvent(event)
+ case TypedActorUnregistered(_, actor, typedActor) ⇒ for (event ← ConsumerMethodUnregistered.eventsFor(actor, Option(typedActor))) deliverCurrentEvent(event)
+ case _ ⇒ ()
}
}
diff --git a/akka-camel/src/main/scala/akka/camel/ConsumerPublisher.scala b/akka-camel/src/main/scala/akka/camel/ConsumerPublisher.scala
index aac68fc1e4..b02587f236 100644
--- a/akka-camel/src/main/scala/akka/camel/ConsumerPublisher.scala
+++ b/akka-camel/src/main/scala/akka/camel/ConsumerPublisher.scala
@@ -20,9 +20,9 @@ import akka.event.EventHandler
*/
private[camel] class ConsumerPublishRequestor extends PublishRequestor {
def receiveActorRegistryEvent = {
- case ActorRegistered(_, actor, None) ⇒ for (event ← ConsumerActorRegistered.eventFor(actor)) deliverCurrentEvent(event)
- case ActorUnregistered(_, actor, None) ⇒ for (event ← ConsumerActorUnregistered.eventFor(actor)) deliverCurrentEvent(event)
- case _ ⇒ ()
+ case ActorRegistered(_, actor) ⇒ for (event ← ConsumerActorRegistered.eventFor(actor)) deliverCurrentEvent(event)
+ case ActorUnregistered(_, actor) ⇒ for (event ← ConsumerActorUnregistered.eventFor(actor)) deliverCurrentEvent(event)
+ case _ ⇒ ()
}
}
diff --git a/akka-camel/src/main/scala/akka/camel/PublisherRequestor.scala b/akka-camel/src/main/scala/akka/camel/PublisherRequestor.scala
index 7c1ace2b77..7083cdbe6e 100644
--- a/akka-camel/src/main/scala/akka/camel/PublisherRequestor.scala
+++ b/akka-camel/src/main/scala/akka/camel/PublisherRequestor.scala
@@ -54,7 +54,7 @@ private[camel] abstract class PublishRequestor extends Actor {
* @author Martin Krasser
*/
private[camel] object PublishRequestor {
- def pastActorRegisteredEvents = for (actor ← Actor.registry.local.actors) yield ActorRegistered(actor.address, actor, None)
+ def pastActorRegisteredEvents = for (actor ← Actor.registry.local.actors) yield ActorRegistered(actor.address, actor)
}
/**
diff --git a/akka-camel/src/test/scala/akka/camel/ConsumerPublishRequestorTest.scala b/akka-camel/src/test/scala/akka/camel/ConsumerPublishRequestorTest.scala
index 45ab3a2ccb..7bfb28afb6 100644
--- a/akka-camel/src/test/scala/akka/camel/ConsumerPublishRequestorTest.scala
+++ b/akka-camel/src/test/scala/akka/camel/ConsumerPublishRequestorTest.scala
@@ -36,7 +36,7 @@ class ConsumerPublishRequestorTest extends JUnitSuite {
@Test
def shouldReceiveOneConsumerRegisteredEvent = {
val latch = (publisher ? SetExpectedTestMessageCount(1)).as[CountDownLatch].get
- requestor ! ActorRegistered(consumer.address, consumer, None)
+ requestor ! ActorRegistered(consumer.address, consumer)
assert(latch.await(5000, TimeUnit.MILLISECONDS))
assert((publisher ? GetRetainedMessage).get ===
ConsumerActorRegistered(consumer, consumer.underlyingActorInstance.asInstanceOf[Consumer]))
@@ -45,7 +45,7 @@ class ConsumerPublishRequestorTest extends JUnitSuite {
@Test
def shouldReceiveOneConsumerUnregisteredEvent = {
val latch = (publisher ? SetExpectedTestMessageCount(1)).as[CountDownLatch].get
- requestor ! ActorUnregistered(consumer.address, consumer, None)
+ requestor ! ActorUnregistered(consumer.address, consumer)
assert(latch.await(5000, TimeUnit.MILLISECONDS))
assert((publisher ? GetRetainedMessage).get ===
ConsumerActorUnregistered(consumer, consumer.underlyingActorInstance.asInstanceOf[Consumer]))
diff --git a/akka-docs/java/actor-registry.rst b/akka-docs/java/actor-registry.rst
index 4439c37468..32a5af42c6 100644
--- a/akka-docs/java/actor-registry.rst
+++ b/akka-docs/java/actor-registry.rst
@@ -43,10 +43,25 @@ The messages sent to this Actor are:
.. code-block:: java
public class ActorRegistered {
- ActorRef actor();
+ ActorRef getActor();
+ String getAddress();
}
+
public class ActorUnregistered {
ActorRef actor();
+ String getAddress();
+ }
+
+ public class TypedActorRegistered {
+ ActorRef getActor();
+ String getAddress();
+ Object getProxy();
+ }
+
+ public class TypedActorUnregistered {
+ ActorRef actor();
+ String getAddress();
+ Object getProxy();
}
So your listener Actor needs to be able to handle these two messages. Example:
@@ -55,6 +70,8 @@ So your listener Actor needs to be able to handle these two messages. Example:
import akka.actor.ActorRegistered;
import akka.actor.ActorUnregistered;
+ import akka.actor.TypedActorRegistered;
+ import akka.actor.TypedActorUnregistered;
import akka.actor.UntypedActor;
import akka.event.EventHandler;
diff --git a/akka-docs/scala/actor-registry.rst b/akka-docs/scala/actor-registry.rst
index 762a6b9b96..f5a6290cdf 100644
--- a/akka-docs/scala/actor-registry.rst
+++ b/akka-docs/scala/actor-registry.rst
@@ -73,18 +73,17 @@ The messages sent to this Actor are:
.. code-block:: scala
- case class ActorRegistered(actor: ActorRef)
- case class ActorUnregistered(actor: ActorRef)
+ case class ActorRegistered(@BeanProperty address: String,@BeanProperty actor: ActorRef) extends ActorRegistryEvent
+ case class ActorUnregistered(@BeanProperty address: String, @BeanProperty actor: ActorRef) extends ActorRegistryEvent
+ case class TypedActorRegistered(@BeanProperty address: String, @BeanProperty actor: ActorRef, @BeanProperty proxy: AnyRef) extends ActorRegistryEvent
+ case class TypedActorUnregistered(@BeanProperty address: String, @BeanProperty actor: ActorRef, @BeanProperty proxy: AnyRef) extends ActorRegistryEvent
-So your listener Actor needs to be able to handle these two messages. Example:
+So your listener Actor needs to be able to handle these messages. Example:
.. code-block:: scala
- import akka.actor.Actor
- import akka.actor.ActorRegistered;
- import akka.actor.ActorUnregistered;
- import akka.actor.UntypedActor;
- import akka.event.EventHandler;
+ import akka.actor._
+ import akka.event.EventHandler
class RegistryListener extends Actor {
def receive = {