From 6e0e9910e93945a200adf6c2b43ddcea4c99c5f3 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Thu, 22 Sep 2011 17:15:51 +0200 Subject: [PATCH] Fixing the camel tests for real this time by introducing separate registered/unregistered events for actors and typed actors --- .../src/main/scala/akka/actor/ActorCell.scala | 25 ++++------------ .../main/scala/akka/actor/ActorRegistry.scala | 30 +++++++++++++------ .../main/scala/akka/dispatch/Dispatcher.scala | 16 ---------- .../scala/akka/dispatch/MailboxHandling.scala | 29 ++++++++++-------- .../scala/akka/dispatch/MessageHandling.scala | 26 ++++++++++++++-- .../akka/camel/TypedConsumerPublisher.scala | 10 +++---- .../scala/akka/camel/ConsumerPublisher.scala | 6 ++-- .../scala/akka/camel/PublisherRequestor.scala | 2 +- .../camel/ConsumerPublishRequestorTest.scala | 4 +-- akka-docs/java/actor-registry.rst | 19 +++++++++++- akka-docs/scala/actor-registry.rst | 15 +++++----- 11 files changed, 102 insertions(+), 80 deletions(-) diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala index 3824c329b4..1b99c63158 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala @@ -111,6 +111,7 @@ private[akka] class ActorCell( def start(): Unit = { if (props.supervisor.isDefined) props.supervisor.get.link(self) dispatcher.attach(this) + Actor.registry.register(self) dispatcher.systemDispatch(SystemEnvelope(this, Create, NullChannel)) } @@ -223,17 +224,16 @@ private[akka] class ActorCell( case msg ⇒ msg.channel } - def systemInvoke(envelope: SystemEnvelope): Boolean = { + def systemInvoke(envelope: SystemEnvelope): Unit = { var isTerminated = terminated - def create(recreation: Boolean): Boolean = try { + def create(recreation: Boolean): Unit = try { actor.get() match { case null ⇒ val created = newActor(restart = false) actor.set(created) created.preStart() checkReceiveTimeout - Actor.registry.register(self) if (Actor.debugLifecycle) EventHandler.debug(created, "started") case instance if recreation ⇒ restart(new Exception("Restart commanded"), None, None) @@ -245,21 +245,14 @@ private[akka] class ActorCell( envelope.channel.sendException(e) if (supervisor.isDefined) { supervisor.get ! Failed(self, e, false, maxNrOfRetriesCount, restartTimeWindowStartNanos) - false // don't continue processing messages right now } else throw e } - def suspend(): Boolean = { - dispatcher suspend this - true - } + def suspend(): Unit = dispatcher suspend this - def resume(): Boolean = { - dispatcher resume this - true - } + def resume(): Unit = dispatcher resume this - def terminate(): Boolean = { + def terminate(): Unit = { receiveTimeout = None cancelReceiveTimeout Actor.registry.unregister(self) @@ -278,10 +271,6 @@ private[akka] class ActorCell( i.remove() } } - - // TODO CHECK: stop message dequeuing, which means that mailbox will not be restarted and GCed - false - } finally { try { if (supervisor.isDefined) @@ -305,8 +294,6 @@ private[akka] class ActorCell( case Resume ⇒ resume() case Terminate ⇒ terminate() } - } else { - false } } catch { case e ⇒ //Should we really catch everything here? diff --git a/akka-actor/src/main/scala/akka/actor/ActorRegistry.scala b/akka-actor/src/main/scala/akka/actor/ActorRegistry.scala index 702fb93a08..69c7727bbe 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRegistry.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRegistry.scala @@ -9,6 +9,7 @@ import scala.collection.mutable.ListBuffer import java.util.concurrent.ConcurrentHashMap import akka.util.ListenerManagement +import reflect.BeanProperty /** * Base trait for ActorRegistry events, allows listen to when an actor is added and removed from the ActorRegistry. @@ -16,8 +17,10 @@ import akka.util.ListenerManagement * @author Jonas Bonér */ sealed trait ActorRegistryEvent -case class ActorRegistered(address: String, actor: ActorRef, typedActor: Option[AnyRef]) extends ActorRegistryEvent -case class ActorUnregistered(address: String, actor: ActorRef, typedActor: Option[AnyRef]) extends ActorRegistryEvent +case class ActorRegistered(@BeanProperty address: String, @BeanProperty actor: ActorRef) extends ActorRegistryEvent +case class ActorUnregistered(@BeanProperty address: String, @BeanProperty actor: ActorRef) extends ActorRegistryEvent +case class TypedActorRegistered(@BeanProperty address: String, @BeanProperty actor: ActorRef, @BeanProperty proxy: AnyRef) extends ActorRegistryEvent +case class TypedActorUnregistered(@BeanProperty address: String, @BeanProperty actor: ActorRef, @BeanProperty proxy: AnyRef) extends ActorRegistryEvent /** * Registry holding all Actor instances in the whole system. @@ -55,14 +58,18 @@ private[actor] final class ActorRegistry private[actor] () extends ListenerManag actorsByAddress.put(address, actor) actorsByUuid.put(actor.uuid, actor) - notifyListeners(ActorRegistered(address, actor, Option(typedActorsByUuid get actor.uuid))) + notifyListeners(ActorRegistered(address, actor)) } - private[akka] def registerTypedActor(actorRef: ActorRef, interface: AnyRef): Unit = - typedActorsByUuid.put(actorRef.uuid, interface) + private[akka] def registerTypedActor(actorRef: ActorRef, proxy: AnyRef): Unit = { + if (typedActorsByUuid.putIfAbsent(actorRef.uuid, proxy) eq null) + notifyListeners(TypedActorRegistered(actorRef.address, actorRef, proxy)) + } - private[akka] def unregisterTypedActor(actorRef: ActorRef, interface: AnyRef): Unit = - typedActorsByUuid.remove(actorRef.uuid, interface) + private[akka] def unregisterTypedActor(actorRef: ActorRef, proxy: AnyRef): Unit = { + if (typedActorsByUuid.remove(actorRef.uuid, proxy)) + notifyListeners(TypedActorUnregistered(actorRef.address, actorRef, proxy)) + } /** * Unregisters an actor in the ActorRegistry. @@ -70,7 +77,7 @@ private[actor] final class ActorRegistry private[actor] () extends ListenerManag private[akka] def unregister(address: String) { val actor = actorsByAddress remove address actorsByUuid remove actor.uuid - notifyListeners(ActorUnregistered(address, actor, None)) + notifyListeners(ActorUnregistered(address, actor)) } /** @@ -80,7 +87,12 @@ private[actor] final class ActorRegistry private[actor] () extends ListenerManag val address = actor.address actorsByAddress remove address actorsByUuid remove actor.uuid - notifyListeners(ActorUnregistered(address, actor, Option(typedActorsByUuid remove actor.uuid))) + notifyListeners(ActorUnregistered(address, actor)) + + //Safe cleanup (if called from the outside) + val proxy = typedActorsByUuid.remove(actor.uuid) + if (proxy ne null) + notifyListeners(TypedActorUnregistered(address, actor, proxy)) } /** diff --git a/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala index 362bb45382..da5893e1bf 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala @@ -146,22 +146,6 @@ class Dispatcher( protected[akka] def reRegisterForExecution(mbox: Mailbox): Unit = registerForExecution(mbox) - protected override def cleanUpMailboxFor(actor: ActorCell) { - val m = actor.mailbox - actor.mailbox = deadLetterMailbox //FIXME switch to getAndSet semantics - if (m.hasMessages) { - var invocation = m.dequeue - lazy val exception = new ActorKilledException("Actor has been stopped") - while (invocation ne null) { - invocation.channel.sendException(exception) - invocation = m.dequeue - } - } - while (m.systemDequeue() ne null) { - //Empty the system messages - } - } - override val toString = getClass.getSimpleName + "[" + name + "]" def suspend(actor: ActorCell): Unit = diff --git a/akka-actor/src/main/scala/akka/dispatch/MailboxHandling.scala b/akka-actor/src/main/scala/akka/dispatch/MailboxHandling.scala index fa3d542914..6181f4b5d4 100644 --- a/akka-actor/src/main/scala/akka/dispatch/MailboxHandling.scala +++ b/akka-actor/src/main/scala/akka/dispatch/MailboxHandling.scala @@ -39,7 +39,9 @@ trait Mailbox extends Runnable { * @return true if the processing finished before the mailbox was empty, due to the throughput constraint */ final def processMailbox() { - if (processAllSystemMessages() && !suspended.locked) { + if (hasSystemMessages) + processAllSystemMessages() + else if (!suspended.locked) { var nextMessage = dequeue() if (nextMessage ne null) { //If we have a message if (dispatcher.throughput <= 1) //If we only run one message per process @@ -51,28 +53,29 @@ trait Mailbox extends Runnable { else 0 do { nextMessage.invoke - nextMessage = - if (!processAllSystemMessages() || suspended.locked) { - null // If we are suspended, abort - } else { // If we aren't suspended, we need to make sure we're not overstepping our boundaries - processedMessages += 1 - if ((processedMessages >= dispatcher.throughput) || (isDeadlineEnabled && System.nanoTime >= deadlineNs)) // If we're throttled, break out - null //We reached our boundaries, abort - else dequeue //Dequeue the next message - } + nextMessage = if (hasSystemMessages) { + processAllSystemMessages() + null + } else if (suspended.locked) { + null // If we are suspended, abort + } else { // If we aren't suspended, we need to make sure we're not overstepping our boundaries + processedMessages += 1 + if ((processedMessages >= dispatcher.throughput) || (isDeadlineEnabled && System.nanoTime >= deadlineNs)) // If we're throttled, break out + null //We reached our boundaries, abort + else dequeue //Dequeue the next message + } } while (nextMessage ne null) } } } } - def processAllSystemMessages(): Boolean = { + def processAllSystemMessages(): Unit = { var nextMessage = systemDequeue() while (nextMessage ne null) { - if (!nextMessage.invoke()) return false + nextMessage.invoke() nextMessage = systemDequeue() } - true } /* diff --git a/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala b/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala index b64541cd17..35cd2f91ba 100644 --- a/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala +++ b/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala @@ -34,7 +34,7 @@ final case class SystemEnvelope(val receiver: ActorCell, val message: SystemMess /** * @return whether to proceed with processing other messages */ - final def invoke(): Boolean = receiver systemInvoke this + final def invoke(): Unit = receiver systemInvoke this } final case class TaskInvocation(function: () ⇒ Unit, cleanup: () ⇒ Unit) extends Runnable { @@ -174,7 +174,9 @@ abstract class MessageDispatcher extends Serializable { */ protected[akka] def unregister(actor: ActorCell) = { if (uuids remove actor.uuid) { - cleanUpMailboxFor(actor) + val mailBox = actor.mailbox + actor.mailbox = deadLetterMailbox //FIXME switch to getAndSet semantics + cleanUpMailboxFor(actor, mailBox) if (uuids.isEmpty && _tasks.get == 0) { shutdownSchedule match { case UNSCHEDULED ⇒ @@ -192,7 +194,25 @@ abstract class MessageDispatcher extends Serializable { * Overridable callback to clean up the mailbox for a given actor, * called when an actor is unregistered. */ - protected def cleanUpMailboxFor(actor: ActorCell) {} + protected def cleanUpMailboxFor(actor: ActorCell, mailBox: Mailbox) { + val m = mailBox + + if (m.hasSystemMessages) { + var envelope = m.systemDequeue() + while (envelope ne null) { + deadLetterMailbox.systemEnqueue(envelope) + envelope = m.systemDequeue() + } + } + + if (m.hasMessages) { + var envelope = m.dequeue + while (envelope ne null) { + deadLetterMailbox.enqueue(envelope) + envelope = m.dequeue + } + } + } /** * Traverses the list of actors (uuids) currently being attached to this dispatcher and stops those actors diff --git a/akka-camel-typed/src/main/scala/akka/camel/TypedConsumerPublisher.scala b/akka-camel-typed/src/main/scala/akka/camel/TypedConsumerPublisher.scala index 7b30ae5d74..c530ded991 100644 --- a/akka-camel-typed/src/main/scala/akka/camel/TypedConsumerPublisher.scala +++ b/akka-camel-typed/src/main/scala/akka/camel/TypedConsumerPublisher.scala @@ -12,16 +12,16 @@ import akka.event.EventHandler /** * Concrete publish requestor that requests publication of typed consumer actor methods on - * ActorRegistered events and unpublication of typed consumer actor methods on - * ActorUnregistered events. + * TypedActorRegistered events and unpublication of typed consumer actor methods on + * TypedActorUnregistered events. * * @author Martin Krasser */ private[camel] class TypedConsumerPublishRequestor extends PublishRequestor { def receiveActorRegistryEvent = { - case ActorRegistered(_, actor, typedActor) ⇒ for (event ← ConsumerMethodRegistered.eventsFor(actor, typedActor)) deliverCurrentEvent(event) - case ActorUnregistered(_, actor, typedActor) ⇒ for (event ← ConsumerMethodUnregistered.eventsFor(actor, typedActor)) deliverCurrentEvent(event) - case _ ⇒ () + case TypedActorRegistered(_, actor, typedActor) ⇒ for (event ← ConsumerMethodRegistered.eventsFor(actor, Option(typedActor))) deliverCurrentEvent(event) + case TypedActorUnregistered(_, actor, typedActor) ⇒ for (event ← ConsumerMethodUnregistered.eventsFor(actor, Option(typedActor))) deliverCurrentEvent(event) + case _ ⇒ () } } diff --git a/akka-camel/src/main/scala/akka/camel/ConsumerPublisher.scala b/akka-camel/src/main/scala/akka/camel/ConsumerPublisher.scala index aac68fc1e4..b02587f236 100644 --- a/akka-camel/src/main/scala/akka/camel/ConsumerPublisher.scala +++ b/akka-camel/src/main/scala/akka/camel/ConsumerPublisher.scala @@ -20,9 +20,9 @@ import akka.event.EventHandler */ private[camel] class ConsumerPublishRequestor extends PublishRequestor { def receiveActorRegistryEvent = { - case ActorRegistered(_, actor, None) ⇒ for (event ← ConsumerActorRegistered.eventFor(actor)) deliverCurrentEvent(event) - case ActorUnregistered(_, actor, None) ⇒ for (event ← ConsumerActorUnregistered.eventFor(actor)) deliverCurrentEvent(event) - case _ ⇒ () + case ActorRegistered(_, actor) ⇒ for (event ← ConsumerActorRegistered.eventFor(actor)) deliverCurrentEvent(event) + case ActorUnregistered(_, actor) ⇒ for (event ← ConsumerActorUnregistered.eventFor(actor)) deliverCurrentEvent(event) + case _ ⇒ () } } diff --git a/akka-camel/src/main/scala/akka/camel/PublisherRequestor.scala b/akka-camel/src/main/scala/akka/camel/PublisherRequestor.scala index 7c1ace2b77..7083cdbe6e 100644 --- a/akka-camel/src/main/scala/akka/camel/PublisherRequestor.scala +++ b/akka-camel/src/main/scala/akka/camel/PublisherRequestor.scala @@ -54,7 +54,7 @@ private[camel] abstract class PublishRequestor extends Actor { * @author Martin Krasser */ private[camel] object PublishRequestor { - def pastActorRegisteredEvents = for (actor ← Actor.registry.local.actors) yield ActorRegistered(actor.address, actor, None) + def pastActorRegisteredEvents = for (actor ← Actor.registry.local.actors) yield ActorRegistered(actor.address, actor) } /** diff --git a/akka-camel/src/test/scala/akka/camel/ConsumerPublishRequestorTest.scala b/akka-camel/src/test/scala/akka/camel/ConsumerPublishRequestorTest.scala index 45ab3a2ccb..7bfb28afb6 100644 --- a/akka-camel/src/test/scala/akka/camel/ConsumerPublishRequestorTest.scala +++ b/akka-camel/src/test/scala/akka/camel/ConsumerPublishRequestorTest.scala @@ -36,7 +36,7 @@ class ConsumerPublishRequestorTest extends JUnitSuite { @Test def shouldReceiveOneConsumerRegisteredEvent = { val latch = (publisher ? SetExpectedTestMessageCount(1)).as[CountDownLatch].get - requestor ! ActorRegistered(consumer.address, consumer, None) + requestor ! ActorRegistered(consumer.address, consumer) assert(latch.await(5000, TimeUnit.MILLISECONDS)) assert((publisher ? GetRetainedMessage).get === ConsumerActorRegistered(consumer, consumer.underlyingActorInstance.asInstanceOf[Consumer])) @@ -45,7 +45,7 @@ class ConsumerPublishRequestorTest extends JUnitSuite { @Test def shouldReceiveOneConsumerUnregisteredEvent = { val latch = (publisher ? SetExpectedTestMessageCount(1)).as[CountDownLatch].get - requestor ! ActorUnregistered(consumer.address, consumer, None) + requestor ! ActorUnregistered(consumer.address, consumer) assert(latch.await(5000, TimeUnit.MILLISECONDS)) assert((publisher ? GetRetainedMessage).get === ConsumerActorUnregistered(consumer, consumer.underlyingActorInstance.asInstanceOf[Consumer])) diff --git a/akka-docs/java/actor-registry.rst b/akka-docs/java/actor-registry.rst index 4439c37468..32a5af42c6 100644 --- a/akka-docs/java/actor-registry.rst +++ b/akka-docs/java/actor-registry.rst @@ -43,10 +43,25 @@ The messages sent to this Actor are: .. code-block:: java public class ActorRegistered { - ActorRef actor(); + ActorRef getActor(); + String getAddress(); } + public class ActorUnregistered { ActorRef actor(); + String getAddress(); + } + + public class TypedActorRegistered { + ActorRef getActor(); + String getAddress(); + Object getProxy(); + } + + public class TypedActorUnregistered { + ActorRef actor(); + String getAddress(); + Object getProxy(); } So your listener Actor needs to be able to handle these two messages. Example: @@ -55,6 +70,8 @@ So your listener Actor needs to be able to handle these two messages. Example: import akka.actor.ActorRegistered; import akka.actor.ActorUnregistered; + import akka.actor.TypedActorRegistered; + import akka.actor.TypedActorUnregistered; import akka.actor.UntypedActor; import akka.event.EventHandler; diff --git a/akka-docs/scala/actor-registry.rst b/akka-docs/scala/actor-registry.rst index 762a6b9b96..f5a6290cdf 100644 --- a/akka-docs/scala/actor-registry.rst +++ b/akka-docs/scala/actor-registry.rst @@ -73,18 +73,17 @@ The messages sent to this Actor are: .. code-block:: scala - case class ActorRegistered(actor: ActorRef) - case class ActorUnregistered(actor: ActorRef) + case class ActorRegistered(@BeanProperty address: String,@BeanProperty actor: ActorRef) extends ActorRegistryEvent + case class ActorUnregistered(@BeanProperty address: String, @BeanProperty actor: ActorRef) extends ActorRegistryEvent + case class TypedActorRegistered(@BeanProperty address: String, @BeanProperty actor: ActorRef, @BeanProperty proxy: AnyRef) extends ActorRegistryEvent + case class TypedActorUnregistered(@BeanProperty address: String, @BeanProperty actor: ActorRef, @BeanProperty proxy: AnyRef) extends ActorRegistryEvent -So your listener Actor needs to be able to handle these two messages. Example: +So your listener Actor needs to be able to handle these messages. Example: .. code-block:: scala - import akka.actor.Actor - import akka.actor.ActorRegistered; - import akka.actor.ActorUnregistered; - import akka.actor.UntypedActor; - import akka.event.EventHandler; + import akka.actor._ + import akka.event.EventHandler class RegistryListener extends Actor { def receive = {