Deactivate endpoints of stopped consumer actors (AKKA-183)

This commit is contained in:
Martin Krasser 2010-05-09 15:35:43 +02:00
parent 6b30bc8c4f
commit 7d00a2b48b
7 changed files with 219 additions and 76 deletions

View file

@ -45,7 +45,7 @@ trait CamelService extends Bootable with Logging {
ActorRegistry.addRegistrationListener(publishRequestor.start)
// publish already registered consumer actors
for (publish <- Publish.forConsumers(ActorRegistry.actors)) consumerPublisher ! publish
for (actor <- ActorRegistry.actors; event <- ConsumerRegistered.forConsumer(actor)) consumerPublisher ! event
}
/**

View file

@ -16,36 +16,66 @@ import se.scalablesolutions.akka.util.Logging
/**
* Actor that publishes consumer actors as Camel endpoints at the CamelContext managed
* by se.scalablesolutions.akka.camel.CamelContextManager. It accepts messages of type
* se.scalablesolutions.akka.camel.service.Publish.
* se.scalablesolutions.akka.camel.service.ConsumerRegistered and
* se.scalablesolutions.akka.camel.service.ConsumerUnregistered.
*
* @author Martin Krasser
*/
class ConsumerPublisher extends Actor with Logging {
@volatile private var latch = new CountDownLatch(0)
@volatile private var publishLatch = new CountDownLatch(0)
@volatile private var unpublishLatch = new CountDownLatch(0)
/**
* Adds a route to the actor identified by a Publish message to the global CamelContext.
*/
protected def receive = {
case p: Publish => publish(new ConsumerRoute(p.endpointUri, p.id, p.uuid))
case _ => { /* ignore */}
case r: ConsumerRegistered => {
handleConsumerRegistered(r)
publishLatch.countDown // needed for testing only.
}
case u: ConsumerUnregistered => {
handleConsumerUnregistered(u)
unpublishLatch.countDown // needed for testing only.
}
case _ => { /* ignore */}
}
/**
* Sets the number of expected Publish messages received by this actor. Used for testing
* only.
* Sets the expected number of actors to be published. Used for testing only.
*/
private[camel] def expectPublishCount(count: Int): Unit = latch = new CountDownLatch(count)
private[camel] def expectPublishCount(count: Int): Unit =
publishLatch = new CountDownLatch(count)
/**
* Waits for the number of expected Publish messages to arrive. Used for testing only.
* Sets the expected number of actors to be unpublished. Used for testing only.
*/
private[camel] def awaitPublish = latch.await
private[camel] def expectUnpublishCount(count: Int): Unit =
unpublishLatch = new CountDownLatch(count)
private def publish(route: ConsumerRoute) {
CamelContextManager.context.addRoutes(route)
log.info("published actor via endpoint %s" format route.endpointUri)
latch.countDown // needed for testing only.
/**
* Waits for the expected number of actors to be published. Used for testing only.
*/
private[camel] def awaitPublish = publishLatch.await
/**
* Waits for the expected number of actors to be unpublished. Used for testing only.
*/
private[camel] def awaitUnpublish = unpublishLatch.await
/**
* Creates a route to the registered consumer actor.
*/
private def handleConsumerRegistered(event: ConsumerRegistered) {
CamelContextManager.context.addRoutes(new ConsumerRoute(event.uri, event.id, event.uuid))
log.info("published actor %s (%s) at endpoint %s" format (event.clazz, event.id, event.uri))
}
/**
* Stops route to the already un-registered consumer actor.
*/
private def handleConsumerUnregistered(event: ConsumerUnregistered) {
CamelContextManager.context.stopRoute(event.id)
log.info("unpublished actor %s (%s) from endpoint %s" format (event.clazz, event.id, event.uri))
}
}
@ -68,8 +98,8 @@ class ConsumerRoute(val endpointUri: String, id: String, uuid: Boolean) extends
def configure = {
val schema = endpointUri take endpointUri.indexOf(":") // e.g. "http" from "http://whatever/..."
bodyConversions.get(schema) match {
case Some(clazz) => from(endpointUri).convertBodyTo(clazz).to(actorUri)
case None => from(endpointUri).to(actorUri)
case Some(clazz) => from(endpointUri).routeId(id).convertBodyTo(clazz).to(actorUri)
case None => from(endpointUri).routeId(id).to(actorUri)
}
}
@ -77,59 +107,104 @@ class ConsumerRoute(val endpointUri: String, id: String, uuid: Boolean) extends
}
/**
* A registration listener that publishes consumer actors (and ignores other actors).
* A registration listener that triggers publication and un-publication of consumer actors.
*
* @author Martin Krasser
*/
class PublishRequestor(consumerPublisher: ActorRef) extends Actor {
protected def receive = {
case ActorUnregistered(actor) => { /* ignore */ }
case ActorRegistered(actor) => Publish.forConsumer(actor) match {
case Some(publish) => consumerPublisher ! publish
case None => { /* ignore */ }
}
case ActorRegistered(actor) => for (event <- ConsumerRegistered.forConsumer(actor)) consumerPublisher ! event
case ActorUnregistered(actor) => for (event <- ConsumerUnregistered.forConsumer(actor)) consumerPublisher ! event
}
}
/**
* Request message for publishing a consumer actor.
*
* @param endpointUri endpoint URI of the consumer actor
* @param id actor identifier
* @param uuid <code>true</code> if <code>id</code> refers to Actor.uuid, <code>false</code> if
* <code>id</code> refers to Acotr.getId.
* Consumer actor lifecycle event.
*
* @author Martin Krasser
*/
case class Publish(endpointUri: String, id: String, uuid: Boolean)
sealed trait ConsumerEvent
/**
* Event indicating that a consumer actor has been registered at the actor registry.
*
* @param clazz clazz name of the referenced actor
* @param uri endpoint URI of the consumer actor
* @param id actor identifier
* @param uuid <code>true</code> if <code>id</code> is the actor's uuid, <code>false</code> if
* <code>id</code> is the actor's id.
*
* @author Martin Krasser
*/
case class ConsumerRegistered(clazz: String, uri: String, id: String, uuid: Boolean) extends ConsumerEvent
/**
* Event indicating that a consumer actor has been unregistered from the actor registry.
*
* @param clazz clazz name of the referenced actor
* @param uri endpoint URI of the consumer actor
* @param id actor identifier
* @param uuid <code>true</code> if <code>id</code> is the actor's uuid, <code>false</code> if
* <code>id</code> is the actor's id.
*
* @author Martin Krasser
*/
case class ConsumerUnregistered(clazz: String, uri: String, id: String, uuid: Boolean) extends ConsumerEvent
/**
* @author Martin Krasser
*/
object Publish {
private[camel] object ConsumerRegistered {
/**
* Optionally creates an ConsumerRegistered event message for a consumer actor or None if
* <code>actorRef</code> is not a consumer actor.
*/
def forConsumer(actorRef: ActorRef): Option[ConsumerRegistered] = actorRef match {
case ConsumerDescriptor(clazz, uri, id, uuid) => Some(ConsumerRegistered(clazz, uri, id, uuid))
case _ => None
}
}
/**
* @author Martin Krasser
*/
private[camel] object ConsumerUnregistered {
/**
* Optionally creates an ConsumerUnregistered event message for a consumer actor or None if
* <code>actorRef</code> is not a consumer actor.
*/
def forConsumer(actorRef: ActorRef): Option[ConsumerUnregistered] = actorRef match {
case ConsumerDescriptor(clazz, uri, id, uuid) => Some(ConsumerUnregistered(clazz, uri, id, uuid))
case _ => None
}
}
/**
* Describes a consumer actor with elements that are relevant for publishing an actor at a
* Camel endpoint (or unpublishing an actor from an endpoint).
*
* @author Martin Krasser
*/
private[camel] object ConsumerDescriptor {
/**
* Creates a list of Publish request messages for all consumer actors in the <code>actors</code>
* list.
* An extractor that optionally creates a 4-tuple from a consumer actor reference containing
* the target actor's class name, endpoint URI, identifier and a hint whether the identifier
* is the actor uuid or actor id. If <code>actorRef</code> doesn't reference a consumer actor,
* None is returned.
*/
def forConsumers(actors: List[ActorRef]): List[Publish] =
for (actor <- actors; pub = forConsumer(actor); if pub.isDefined) yield pub.get
def unapply(actorRef: ActorRef): Option[(String, String, String, Boolean)] =
unapplyConsumerInstance(actorRef) orElse unapplyConsumeAnnotated(actorRef)
/**
* Creates a Publish request message if <code>actor</code> is a consumer actor.
*/
def forConsumer(actor: ActorRef): Option[Publish] =
forConsumeAnnotated(actor) orElse forConsumerType(actor)
private def forConsumeAnnotated(actorRef: ActorRef): Option[Publish] = {
private def unapplyConsumeAnnotated(actorRef: ActorRef): Option[(String, String, String, Boolean)] = {
val annotation = actorRef.actorClass.getAnnotation(classOf[consume])
if (annotation eq null) None
else if (actorRef.remoteAddress.isDefined) None // do not publish proxies
else Some(Publish(annotation.value, actorRef.id, false))
else if (actorRef.remoteAddress.isDefined) None
else Some((actorRef.actor.getClass.getName, annotation.value, actorRef.id, false))
}
private def forConsumerType(actorRef: ActorRef): Option[Publish] =
private def unapplyConsumerInstance(actorRef: ActorRef): Option[(String, String, String, Boolean)] =
if (!actorRef.actor.isInstanceOf[Consumer]) None
else if (actorRef.remoteAddress.isDefined) None
else Some(Publish(actorRef.actor.asInstanceOf[Consumer].endpointUri, actorRef.uuid, true))
else Some((actorRef.actor.getClass.getName, actorRef.actor.asInstanceOf[Consumer].endpointUri, actorRef.uuid, true))
}

View file

@ -62,9 +62,9 @@ class CamelServiceFeatureTest extends FeatureSpec with BeforeAndAfterAll with Gi
given("two consumer actors registered before and after CamelService startup")
service.consumerPublisher.actor.asInstanceOf[ConsumerPublisher].expectPublishCount(1)
actorOf(new TestConsumer("direct:publish-test-2")).start
service.consumerPublisher.actor.asInstanceOf[ConsumerPublisher].awaitPublish
when("requests are sent to these actors")
service.consumerPublisher.actor.asInstanceOf[ConsumerPublisher].awaitPublish
val response1 = CamelContextManager.template.requestBody("direct:publish-test-1", "msg1")
val response2 = CamelContextManager.template.requestBody("direct:publish-test-2", "msg2")
@ -74,6 +74,32 @@ class CamelServiceFeatureTest extends FeatureSpec with BeforeAndAfterAll with Gi
}
}
feature("Unpublish registered consumer actor from the global CamelContext") {
scenario("attempt access to unregistered consumer actor via Camel direct-endpoint") {
val endpointUri = "direct:unpublish-test-1"
given("a consumer actor that has been stopped")
assert(CamelContextManager.context.hasEndpoint(endpointUri) eq null)
service.consumerPublisher.actor.asInstanceOf[ConsumerPublisher].expectPublishCount(1)
val consumer = actorOf(new TestConsumer(endpointUri)).start
service.consumerPublisher.actor.asInstanceOf[ConsumerPublisher].awaitPublish
assert(CamelContextManager.context.hasEndpoint(endpointUri) ne null)
service.consumerPublisher.actor.asInstanceOf[ConsumerPublisher].expectUnpublishCount(1)
consumer.stop
service.consumerPublisher.actor.asInstanceOf[ConsumerPublisher].awaitUnpublish
// endpoint is still there but the route has been stopped
assert(CamelContextManager.context.hasEndpoint(endpointUri) ne null)
when("a request is sent to this actor")
val response1 = CamelContextManager.template.requestBody(endpointUri, "msg1")
then("the direct endpoint falls back to its default behaviour and returns the original message")
assert(response1 === "msg1")
}
}
feature("Configure a custom Camel route for the global CamelContext") {
scenario("access an actor from the custom Camel route") {

View file

@ -8,7 +8,7 @@ import se.scalablesolutions.akka.actor.Actor._
import se.scalablesolutions.akka.actor.annotation.consume
import se.scalablesolutions.akka.camel.Consumer
object PublishTest {
object ConsumerRegisteredTest {
@consume("mock:test1")
class ConsumeAnnotatedActor extends Actor {
id = "test"
@ -25,27 +25,28 @@ object PublishTest {
}
}
class PublishTest extends JUnitSuite {
import PublishTest._
class ConsumerRegisteredTest extends JUnitSuite {
import ConsumerRegisteredTest._
@Test def shouldCreatePublishRequestList = {
val publish = Publish.forConsumers(List(actorOf[ConsumeAnnotatedActor]))
assert(publish === List(Publish("mock:test1", "test", false)))
val as = List(actorOf[ConsumeAnnotatedActor])
val events = for (a <- as; e <- ConsumerRegistered.forConsumer(a)) yield e
assert(events === List(ConsumerRegistered(classOf[ConsumeAnnotatedActor].getName, "mock:test1", "test", false)))
}
@Test def shouldCreateSomePublishRequestWithActorId = {
val publish = Publish.forConsumer(actorOf[ConsumeAnnotatedActor])
assert(publish === Some(Publish("mock:test1", "test", false)))
val event = ConsumerRegistered.forConsumer(actorOf[ConsumeAnnotatedActor])
assert(event === Some(ConsumerRegistered(classOf[ConsumeAnnotatedActor].getName, "mock:test1", "test", false)))
}
@Test def shouldCreateSomePublishRequestWithActorUuid = {
val ca = actorOf[ConsumerActor]
val publish = Publish.forConsumer(ca)
assert(publish === Some(Publish("mock:test2", ca.uuid, true)))
val event = ConsumerRegistered.forConsumer(ca)
assert(event === Some(ConsumerRegistered(ca.actor.getClass.getName, "mock:test2", ca.uuid, true)))
}
@Test def shouldCreateNone = {
val publish = Publish.forConsumer(actorOf[PlainActor])
assert(publish === None)
val event = ConsumerRegistered.forConsumer(actorOf[PlainActor])
assert(event === None)
}
}

View file

@ -1,41 +1,55 @@
package se.scalablesolutions.akka.camel.service
import org.junit.{After, Test}
import _root_.org.junit.{Before, After, Test}
import org.scalatest.junit.JUnitSuite
import se.scalablesolutions.akka.camel.Consumer
import se.scalablesolutions.akka.camel.support.{Receive, Countdown}
import se.scalablesolutions.akka.actor.{ActorRegistry, ActorRegistered, Actor}
import se.scalablesolutions.akka.actor.{Actor, ActorRef, ActorRegistry, ActorRegistered, ActorUnregistered}
import Actor._
object PublishRequestorTest {
class PublisherMock extends Actor with Receive[Publish] {
var received: Publish = _
class PublisherMock extends Actor with Receive[ConsumerEvent] {
var received: ConsumerEvent = _
protected def receive = {
case msg: Publish => onMessage(msg)
case msg: ConsumerRegistered => onMessage(msg)
case msg: ConsumerUnregistered => onMessage(msg)
}
def onMessage(msg: Publish) = received = msg
def onMessage(msg: ConsumerEvent) = received = msg
}
}
class PublishRequestorTest extends JUnitSuite {
import PublishRequestorTest._
@After def tearDown = ActorRegistry.shutdownAll
@Test def shouldReceivePublishRequestOnActorRegisteredEvent = {
val consumer = actorOf(new Actor with Consumer {
var publisher: ActorRef = _
var requestor: ActorRef = _
var consumer: ActorRef = _
@Before def setUp = {
publisher = actorOf(new PublisherMock with Countdown[ConsumerEvent]).start
requestor = actorOf(new PublishRequestor(publisher)).start
consumer = actorOf(new Actor with Consumer {
def endpointUri = "mock:test"
protected def receive = null
}).start
val publisher = actorOf(new PublisherMock with Countdown[Publish])
val requestor = actorOf(new PublishRequestor(publisher))
publisher.start
requestor.start
}
@After def tearDown = {
ActorRegistry.shutdownAll
}
@Test def shouldReceiveConsumerRegisteredEvent = {
requestor.!(ActorRegistered(consumer))(None)
publisher.actor.asInstanceOf[Countdown[Publish]].waitFor
assert(publisher.actor.asInstanceOf[PublisherMock].received === Publish("mock:test", consumer.uuid, true))
publisher.stop
requestor.stop
publisher.actor.asInstanceOf[Countdown[ConsumerEvent]].waitFor
assert(publisher.actor.asInstanceOf[PublisherMock].received ===
ConsumerRegistered(consumer.actor.getClass.getName, "mock:test", consumer.uuid, true))
}
@Test def shouldReceiveConsumerUnregisteredEvent = {
requestor.!(ActorUnregistered(consumer))(None)
publisher.actor.asInstanceOf[Countdown[ConsumerRegistered]].waitFor
assert(publisher.actor.asInstanceOf[PublisherMock].received ===
ConsumerUnregistered(consumer.actor.getClass.getName, "mock:test", consumer.uuid, true))
}
}

View file

@ -59,6 +59,31 @@ class Consumer3(transformer: ActorRef) extends Actor with Consumer {
}
}
class Consumer4 extends Actor with Consumer with Logging {
def endpointUri = "jetty:http://0.0.0.0:8877/camel/stop"
def receive = {
case msg: Message => msg.bodyAs(classOf[String]) match {
case "stop" => {
reply("Consumer4 stopped")
stop
}
case body => reply(body)
}
}
}
class Consumer5 extends Actor with Consumer with Logging {
def endpointUri = "jetty:http://0.0.0.0:8877/camel/start"
def receive = {
case _ => {
new Consumer4().start;
reply("Consumer4 started")
}
}
}
class Transformer(producer: ActorRef) extends Actor {
protected def receive = {
case msg: Message => producer.forward(msg.transformBody[String]("- %s -" format _))

View file

@ -61,7 +61,9 @@ class Boot {
//val cometdPublisherBridge = new PublisherBridge("jetty:http://0.0.0.0:8877/camel/pub/cometd", cometdPublisher).start
val jmsPublisherBridge = new PublisherBridge("jetty:http://0.0.0.0:8877/camel/pub/jms", jmsPublisher).start
new Consumer4().start // POSTing "stop" to http://0.0.0.0:8877/camel/stop stops and unpublishes this actor
new Consumer5().start // POSTing any msg to http://0.0.0.0:8877/camel/start starts and published Consumer4 again.
}
class CustomRouteBuilder extends RouteBuilder {