Fixing the camel tests for real this time by introducing separate registered/unregistered events for actors and typed actors

This commit is contained in:
Viktor Klang 2011-09-22 17:15:51 +02:00
parent 6109a17af4
commit 6e0e9910e9
11 changed files with 102 additions and 80 deletions

View file

@ -111,6 +111,7 @@ private[akka] class ActorCell(
def start(): Unit = {
if (props.supervisor.isDefined) props.supervisor.get.link(self)
dispatcher.attach(this)
Actor.registry.register(self)
dispatcher.systemDispatch(SystemEnvelope(this, Create, NullChannel))
}
@ -223,17 +224,16 @@ private[akka] class ActorCell(
case msg msg.channel
}
def systemInvoke(envelope: SystemEnvelope): Boolean = {
def systemInvoke(envelope: SystemEnvelope): Unit = {
var isTerminated = terminated
def create(recreation: Boolean): Boolean = try {
def create(recreation: Boolean): Unit = try {
actor.get() match {
case null
val created = newActor(restart = false)
actor.set(created)
created.preStart()
checkReceiveTimeout
Actor.registry.register(self)
if (Actor.debugLifecycle) EventHandler.debug(created, "started")
case instance if recreation
restart(new Exception("Restart commanded"), None, None)
@ -245,21 +245,14 @@ private[akka] class ActorCell(
envelope.channel.sendException(e)
if (supervisor.isDefined) {
supervisor.get ! Failed(self, e, false, maxNrOfRetriesCount, restartTimeWindowStartNanos)
false // don't continue processing messages right now
} else throw e
}
def suspend(): Boolean = {
dispatcher suspend this
true
}
def suspend(): Unit = dispatcher suspend this
def resume(): Boolean = {
dispatcher resume this
true
}
def resume(): Unit = dispatcher resume this
def terminate(): Boolean = {
def terminate(): Unit = {
receiveTimeout = None
cancelReceiveTimeout
Actor.registry.unregister(self)
@ -278,10 +271,6 @@ private[akka] class ActorCell(
i.remove()
}
}
// TODO CHECK: stop message dequeuing, which means that mailbox will not be restarted and GCed
false
} finally {
try {
if (supervisor.isDefined)
@ -305,8 +294,6 @@ private[akka] class ActorCell(
case Resume resume()
case Terminate terminate()
}
} else {
false
}
} catch {
case e //Should we really catch everything here?

View file

@ -9,6 +9,7 @@ import scala.collection.mutable.ListBuffer
import java.util.concurrent.ConcurrentHashMap
import akka.util.ListenerManagement
import reflect.BeanProperty
/**
* Base trait for ActorRegistry events, allows listen to when an actor is added and removed from the ActorRegistry.
@ -16,8 +17,10 @@ import akka.util.ListenerManagement
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
sealed trait ActorRegistryEvent
case class ActorRegistered(address: String, actor: ActorRef, typedActor: Option[AnyRef]) extends ActorRegistryEvent
case class ActorUnregistered(address: String, actor: ActorRef, typedActor: Option[AnyRef]) extends ActorRegistryEvent
case class ActorRegistered(@BeanProperty address: String, @BeanProperty actor: ActorRef) extends ActorRegistryEvent
case class ActorUnregistered(@BeanProperty address: String, @BeanProperty actor: ActorRef) extends ActorRegistryEvent
case class TypedActorRegistered(@BeanProperty address: String, @BeanProperty actor: ActorRef, @BeanProperty proxy: AnyRef) extends ActorRegistryEvent
case class TypedActorUnregistered(@BeanProperty address: String, @BeanProperty actor: ActorRef, @BeanProperty proxy: AnyRef) extends ActorRegistryEvent
/**
* Registry holding all Actor instances in the whole system.
@ -55,14 +58,18 @@ private[actor] final class ActorRegistry private[actor] () extends ListenerManag
actorsByAddress.put(address, actor)
actorsByUuid.put(actor.uuid, actor)
notifyListeners(ActorRegistered(address, actor, Option(typedActorsByUuid get actor.uuid)))
notifyListeners(ActorRegistered(address, actor))
}
private[akka] def registerTypedActor(actorRef: ActorRef, interface: AnyRef): Unit =
typedActorsByUuid.put(actorRef.uuid, interface)
private[akka] def registerTypedActor(actorRef: ActorRef, proxy: AnyRef): Unit = {
if (typedActorsByUuid.putIfAbsent(actorRef.uuid, proxy) eq null)
notifyListeners(TypedActorRegistered(actorRef.address, actorRef, proxy))
}
private[akka] def unregisterTypedActor(actorRef: ActorRef, interface: AnyRef): Unit =
typedActorsByUuid.remove(actorRef.uuid, interface)
private[akka] def unregisterTypedActor(actorRef: ActorRef, proxy: AnyRef): Unit = {
if (typedActorsByUuid.remove(actorRef.uuid, proxy))
notifyListeners(TypedActorUnregistered(actorRef.address, actorRef, proxy))
}
/**
* Unregisters an actor in the ActorRegistry.
@ -70,7 +77,7 @@ private[actor] final class ActorRegistry private[actor] () extends ListenerManag
private[akka] def unregister(address: String) {
val actor = actorsByAddress remove address
actorsByUuid remove actor.uuid
notifyListeners(ActorUnregistered(address, actor, None))
notifyListeners(ActorUnregistered(address, actor))
}
/**
@ -80,7 +87,12 @@ private[actor] final class ActorRegistry private[actor] () extends ListenerManag
val address = actor.address
actorsByAddress remove address
actorsByUuid remove actor.uuid
notifyListeners(ActorUnregistered(address, actor, Option(typedActorsByUuid remove actor.uuid)))
notifyListeners(ActorUnregistered(address, actor))
//Safe cleanup (if called from the outside)
val proxy = typedActorsByUuid.remove(actor.uuid)
if (proxy ne null)
notifyListeners(TypedActorUnregistered(address, actor, proxy))
}
/**

View file

@ -146,22 +146,6 @@ class Dispatcher(
protected[akka] def reRegisterForExecution(mbox: Mailbox): Unit =
registerForExecution(mbox)
protected override def cleanUpMailboxFor(actor: ActorCell) {
val m = actor.mailbox
actor.mailbox = deadLetterMailbox //FIXME switch to getAndSet semantics
if (m.hasMessages) {
var invocation = m.dequeue
lazy val exception = new ActorKilledException("Actor has been stopped")
while (invocation ne null) {
invocation.channel.sendException(exception)
invocation = m.dequeue
}
}
while (m.systemDequeue() ne null) {
//Empty the system messages
}
}
override val toString = getClass.getSimpleName + "[" + name + "]"
def suspend(actor: ActorCell): Unit =

View file

@ -39,7 +39,9 @@ trait Mailbox extends Runnable {
* @return true if the processing finished before the mailbox was empty, due to the throughput constraint
*/
final def processMailbox() {
if (processAllSystemMessages() && !suspended.locked) {
if (hasSystemMessages)
processAllSystemMessages()
else if (!suspended.locked) {
var nextMessage = dequeue()
if (nextMessage ne null) { //If we have a message
if (dispatcher.throughput <= 1) //If we only run one message per process
@ -51,8 +53,10 @@ trait Mailbox extends Runnable {
else 0
do {
nextMessage.invoke
nextMessage =
if (!processAllSystemMessages() || suspended.locked) {
nextMessage = if (hasSystemMessages) {
processAllSystemMessages()
null
} else if (suspended.locked) {
null // If we are suspended, abort
} else { // If we aren't suspended, we need to make sure we're not overstepping our boundaries
processedMessages += 1
@ -66,13 +70,12 @@ trait Mailbox extends Runnable {
}
}
def processAllSystemMessages(): Boolean = {
def processAllSystemMessages(): Unit = {
var nextMessage = systemDequeue()
while (nextMessage ne null) {
if (!nextMessage.invoke()) return false
nextMessage.invoke()
nextMessage = systemDequeue()
}
true
}
/*

View file

@ -34,7 +34,7 @@ final case class SystemEnvelope(val receiver: ActorCell, val message: SystemMess
/**
* @return whether to proceed with processing other messages
*/
final def invoke(): Boolean = receiver systemInvoke this
final def invoke(): Unit = receiver systemInvoke this
}
final case class TaskInvocation(function: () Unit, cleanup: () Unit) extends Runnable {
@ -174,7 +174,9 @@ abstract class MessageDispatcher extends Serializable {
*/
protected[akka] def unregister(actor: ActorCell) = {
if (uuids remove actor.uuid) {
cleanUpMailboxFor(actor)
val mailBox = actor.mailbox
actor.mailbox = deadLetterMailbox //FIXME switch to getAndSet semantics
cleanUpMailboxFor(actor, mailBox)
if (uuids.isEmpty && _tasks.get == 0) {
shutdownSchedule match {
case UNSCHEDULED
@ -192,7 +194,25 @@ abstract class MessageDispatcher extends Serializable {
* Overridable callback to clean up the mailbox for a given actor,
* called when an actor is unregistered.
*/
protected def cleanUpMailboxFor(actor: ActorCell) {}
protected def cleanUpMailboxFor(actor: ActorCell, mailBox: Mailbox) {
val m = mailBox
if (m.hasSystemMessages) {
var envelope = m.systemDequeue()
while (envelope ne null) {
deadLetterMailbox.systemEnqueue(envelope)
envelope = m.systemDequeue()
}
}
if (m.hasMessages) {
var envelope = m.dequeue
while (envelope ne null) {
deadLetterMailbox.enqueue(envelope)
envelope = m.dequeue
}
}
}
/**
* Traverses the list of actors (uuids) currently being attached to this dispatcher and stops those actors

View file

@ -12,15 +12,15 @@ import akka.event.EventHandler
/**
* Concrete publish requestor that requests publication of typed consumer actor methods on
* <code>ActorRegistered</code> events and unpublication of typed consumer actor methods on
* <code>ActorUnregistered</code> events.
* <code>TypedActorRegistered</code> events and unpublication of typed consumer actor methods on
* <code>TypedActorUnregistered</code> events.
*
* @author Martin Krasser
*/
private[camel] class TypedConsumerPublishRequestor extends PublishRequestor {
def receiveActorRegistryEvent = {
case ActorRegistered(_, actor, typedActor) for (event ConsumerMethodRegistered.eventsFor(actor, typedActor)) deliverCurrentEvent(event)
case ActorUnregistered(_, actor, typedActor) for (event ConsumerMethodUnregistered.eventsFor(actor, typedActor)) deliverCurrentEvent(event)
case TypedActorRegistered(_, actor, typedActor) for (event ConsumerMethodRegistered.eventsFor(actor, Option(typedActor))) deliverCurrentEvent(event)
case TypedActorUnregistered(_, actor, typedActor) for (event ConsumerMethodUnregistered.eventsFor(actor, Option(typedActor))) deliverCurrentEvent(event)
case _ ()
}
}

View file

@ -20,8 +20,8 @@ import akka.event.EventHandler
*/
private[camel] class ConsumerPublishRequestor extends PublishRequestor {
def receiveActorRegistryEvent = {
case ActorRegistered(_, actor, None) for (event ConsumerActorRegistered.eventFor(actor)) deliverCurrentEvent(event)
case ActorUnregistered(_, actor, None) for (event ConsumerActorUnregistered.eventFor(actor)) deliverCurrentEvent(event)
case ActorRegistered(_, actor) for (event ConsumerActorRegistered.eventFor(actor)) deliverCurrentEvent(event)
case ActorUnregistered(_, actor) for (event ConsumerActorUnregistered.eventFor(actor)) deliverCurrentEvent(event)
case _ ()
}
}

View file

@ -54,7 +54,7 @@ private[camel] abstract class PublishRequestor extends Actor {
* @author Martin Krasser
*/
private[camel] object PublishRequestor {
def pastActorRegisteredEvents = for (actor Actor.registry.local.actors) yield ActorRegistered(actor.address, actor, None)
def pastActorRegisteredEvents = for (actor Actor.registry.local.actors) yield ActorRegistered(actor.address, actor)
}
/**

View file

@ -36,7 +36,7 @@ class ConsumerPublishRequestorTest extends JUnitSuite {
@Test
def shouldReceiveOneConsumerRegisteredEvent = {
val latch = (publisher ? SetExpectedTestMessageCount(1)).as[CountDownLatch].get
requestor ! ActorRegistered(consumer.address, consumer, None)
requestor ! ActorRegistered(consumer.address, consumer)
assert(latch.await(5000, TimeUnit.MILLISECONDS))
assert((publisher ? GetRetainedMessage).get ===
ConsumerActorRegistered(consumer, consumer.underlyingActorInstance.asInstanceOf[Consumer]))
@ -45,7 +45,7 @@ class ConsumerPublishRequestorTest extends JUnitSuite {
@Test
def shouldReceiveOneConsumerUnregisteredEvent = {
val latch = (publisher ? SetExpectedTestMessageCount(1)).as[CountDownLatch].get
requestor ! ActorUnregistered(consumer.address, consumer, None)
requestor ! ActorUnregistered(consumer.address, consumer)
assert(latch.await(5000, TimeUnit.MILLISECONDS))
assert((publisher ? GetRetainedMessage).get ===
ConsumerActorUnregistered(consumer, consumer.underlyingActorInstance.asInstanceOf[Consumer]))

View file

@ -43,10 +43,25 @@ The messages sent to this Actor are:
.. code-block:: java
public class ActorRegistered {
ActorRef actor();
ActorRef getActor();
String getAddress();
}
public class ActorUnregistered {
ActorRef actor();
String getAddress();
}
public class TypedActorRegistered {
ActorRef getActor();
String getAddress();
Object getProxy();
}
public class TypedActorUnregistered {
ActorRef actor();
String getAddress();
Object getProxy();
}
So your listener Actor needs to be able to handle these two messages. Example:
@ -55,6 +70,8 @@ So your listener Actor needs to be able to handle these two messages. Example:
import akka.actor.ActorRegistered;
import akka.actor.ActorUnregistered;
import akka.actor.TypedActorRegistered;
import akka.actor.TypedActorUnregistered;
import akka.actor.UntypedActor;
import akka.event.EventHandler;

View file

@ -73,18 +73,17 @@ The messages sent to this Actor are:
.. code-block:: scala
case class ActorRegistered(actor: ActorRef)
case class ActorUnregistered(actor: ActorRef)
case class ActorRegistered(@BeanProperty address: String,@BeanProperty actor: ActorRef) extends ActorRegistryEvent
case class ActorUnregistered(@BeanProperty address: String, @BeanProperty actor: ActorRef) extends ActorRegistryEvent
case class TypedActorRegistered(@BeanProperty address: String, @BeanProperty actor: ActorRef, @BeanProperty proxy: AnyRef) extends ActorRegistryEvent
case class TypedActorUnregistered(@BeanProperty address: String, @BeanProperty actor: ActorRef, @BeanProperty proxy: AnyRef) extends ActorRegistryEvent
So your listener Actor needs to be able to handle these two messages. Example:
So your listener Actor needs to be able to handle these messages. Example:
.. code-block:: scala
import akka.actor.Actor
import akka.actor.ActorRegistered;
import akka.actor.ActorUnregistered;
import akka.actor.UntypedActor;
import akka.event.EventHandler;
import akka.actor._
import akka.event.EventHandler
class RegistryListener extends Actor {
def receive = {