Closes #333 Allow applications to wait for endpoints being activated

This commit is contained in:
Martin Krasser 2010-07-21 14:32:07 +02:00
parent 122bba2b84
commit c5f30d8948
4 changed files with 49 additions and 32 deletions

View file

@ -1,9 +1,10 @@
/** /**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se> * Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/ */
package se.scalablesolutions.akka.camel package se.scalablesolutions.akka.camel
import java.util.concurrent.CountDownLatch
import se.scalablesolutions.akka.actor.Actor._ import se.scalablesolutions.akka.actor.Actor._
import se.scalablesolutions.akka.actor.{AspectInitRegistry, ActorRegistry} import se.scalablesolutions.akka.actor.{AspectInitRegistry, ActorRegistry}
import se.scalablesolutions.akka.util.{Bootable, Logging} import se.scalablesolutions.akka.util.{Bootable, Logging}
@ -77,6 +78,22 @@ trait CamelService extends Bootable with Logging {
* @see onUnload * @see onUnload
*/ */
def unload = onUnload def unload = onUnload
/**
* Sets an expectation of the number of upcoming endpoint activations and returns
* a {@link CountDownLatch} that can be used to wait for the activations to occur.
* Endpoint activations that occurred in the past are not considered.
*/
def expectEndpointActivationCount(count: Int): CountDownLatch =
(consumerPublisher !! SetExpectedRegistrationCount(count)).as[CountDownLatch].get
/**
* Sets an expectation of the number of upcoming endpoint de-activations and returns
* a {@link CountDownLatch} that can be used to wait for the de-activations to occur.
* Endpoint de-activations that occurred in the past are not considered.
*/
def expectEndpointDeactivationCount(count: Int): CountDownLatch =
(consumerPublisher !! SetExpectedUnregistrationCount(count)).as[CountDownLatch].get
} }
/** /**

View file

@ -65,50 +65,50 @@ private[camel] object ConsumerPublisher extends Logging {
* Actor that publishes consumer actors and active object methods at Camel endpoints. * Actor that publishes consumer actors and active object methods at Camel endpoints.
* The Camel context used for publishing is CamelContextManager.context. This actor * The Camel context used for publishing is CamelContextManager.context. This actor
* accepts messages of type * accepts messages of type
* se.scalablesolutions.akka.camel.service.ConsumerRegistered, * se.scalablesolutions.akka.camel.ConsumerRegistered,
* se.scalablesolutions.akka.camel.service.ConsumerMethodRegistered and * se.scalablesolutions.akka.camel.ConsumerUnregistered.
* se.scalablesolutions.akka.camel.service.ConsumerUnregistered. * se.scalablesolutions.akka.camel.ConsumerMethodRegistered and
* se.scalablesolutions.akka.camel.ConsumerMethodUnregistered.
* *
* @author Martin Krasser * @author Martin Krasser
*/ */
private[camel] class ConsumerPublisher extends Actor { private[camel] class ConsumerPublisher extends Actor {
import ConsumerPublisher._ import ConsumerPublisher._
@volatile private var latch = new CountDownLatch(0) @volatile private var registrationLatch = new CountDownLatch(0)
@volatile private var unregistrationLatch = new CountDownLatch(0)
/**
* Adds a route to the actor identified by a Publish message to the global CamelContext.
*/
protected def receive = { protected def receive = {
case r: ConsumerRegistered => { case r: ConsumerRegistered => {
handleConsumerRegistered(r) handleConsumerRegistered(r)
latch.countDown // needed for testing only. registrationLatch.countDown
} }
case u: ConsumerUnregistered => { case u: ConsumerUnregistered => {
handleConsumerUnregistered(u) handleConsumerUnregistered(u)
latch.countDown // needed for testing only. unregistrationLatch.countDown
} }
case mr: ConsumerMethodRegistered => { case mr: ConsumerMethodRegistered => {
handleConsumerMethodRegistered(mr) handleConsumerMethodRegistered(mr)
latch.countDown // needed for testing only. registrationLatch.countDown
} }
case mu: ConsumerMethodUnregistered => { case mu: ConsumerMethodUnregistered => {
handleConsumerMethodUnregistered(mu) handleConsumerMethodUnregistered(mu)
latch.countDown // needed for testing only. unregistrationLatch.countDown
} }
case SetExpectedMessageCount(num) => { case SetExpectedRegistrationCount(num) => {
// needed for testing only. registrationLatch = new CountDownLatch(num)
latch = new CountDownLatch(num) self.reply(registrationLatch)
self.reply(latch) }
case SetExpectedUnregistrationCount(num) => {
unregistrationLatch = new CountDownLatch(num)
self.reply(unregistrationLatch)
} }
case _ => { /* ignore */} case _ => { /* ignore */}
} }
} }
/** private[camel] case class SetExpectedRegistrationCount(num: Int)
* Command message used For testing-purposes only. private[camel] case class SetExpectedUnregistrationCount(num: Int)
*/
private[camel] case class SetExpectedMessageCount(num: Int)
/** /**
* Defines an abstract route to a target which is either an actor or an active object method.. * Defines an abstract route to a target which is either an actor or an active object method..

View file

@ -27,7 +27,7 @@ class CamelServiceFeatureTest extends FeatureSpec with BeforeAndAfterAll with Gi
// count expectations in the next step (needed for testing only). // count expectations in the next step (needed for testing only).
service.consumerPublisher.start service.consumerPublisher.start
// set expectations on publish count // set expectations on publish count
val latch = (service.consumerPublisher !! SetExpectedMessageCount(1)).as[CountDownLatch].get val latch = service.expectEndpointActivationCount(1)
// start the CamelService // start the CamelService
service.load service.load
// await publication of first test consumer // await publication of first test consumer
@ -44,7 +44,7 @@ class CamelServiceFeatureTest extends FeatureSpec with BeforeAndAfterAll with Gi
scenario("access non-blocking consumer actors via Camel direct-endpoints") { scenario("access non-blocking consumer actors via Camel direct-endpoints") {
given("two consumer actors registered before and after CamelService startup") given("two consumer actors registered before and after CamelService startup")
val latch = (service.consumerPublisher !! SetExpectedMessageCount(1)).as[CountDownLatch].get val latch = service.expectEndpointActivationCount(1)
actorOf(new TestConsumer("direct:publish-test-2")).start actorOf(new TestConsumer("direct:publish-test-2")).start
assert(latch.await(5000, TimeUnit.MILLISECONDS)) assert(latch.await(5000, TimeUnit.MILLISECONDS))
@ -60,7 +60,7 @@ class CamelServiceFeatureTest extends FeatureSpec with BeforeAndAfterAll with Gi
scenario("access blocking, non-responding consumer actor via a Camel direct-endpoint") { scenario("access blocking, non-responding consumer actor via a Camel direct-endpoint") {
given("a consumer actor registered after CamelService startup") given("a consumer actor registered after CamelService startup")
val latch = (service.consumerPublisher !! SetExpectedMessageCount(1)).as[CountDownLatch].get val latch = service.expectEndpointActivationCount(1)
actorOf(new TestBlocker("direct:publish-test-3")).start actorOf(new TestBlocker("direct:publish-test-3")).start
assert(latch.await(5000, TimeUnit.MILLISECONDS)) assert(latch.await(5000, TimeUnit.MILLISECONDS))
@ -84,13 +84,13 @@ class CamelServiceFeatureTest extends FeatureSpec with BeforeAndAfterAll with Gi
given("a consumer actor registered after CamelService startup") given("a consumer actor registered after CamelService startup")
assert(CamelContextManager.context.hasEndpoint(endpointUri) eq null) assert(CamelContextManager.context.hasEndpoint(endpointUri) eq null)
var latch = (service.consumerPublisher !! SetExpectedMessageCount(1)).as[CountDownLatch].get var latch = service.expectEndpointActivationCount(1)
val consumer = actorOf(new TestConsumer(endpointUri)).start val consumer = actorOf(new TestConsumer(endpointUri)).start
assert(latch.await(5000, TimeUnit.MILLISECONDS)) assert(latch.await(5000, TimeUnit.MILLISECONDS))
assert(CamelContextManager.context.hasEndpoint(endpointUri) ne null) assert(CamelContextManager.context.hasEndpoint(endpointUri) ne null)
when("the actor is stopped") when("the actor is stopped")
latch = (service.consumerPublisher !! SetExpectedMessageCount(1)).as[CountDownLatch].get latch = service.expectEndpointDeactivationCount(1)
consumer.stop consumer.stop
assert(latch.await(5000, TimeUnit.MILLISECONDS)) assert(latch.await(5000, TimeUnit.MILLISECONDS))
@ -121,7 +121,7 @@ class CamelServiceFeatureTest extends FeatureSpec with BeforeAndAfterAll with Gi
scenario("access active object methods via Camel direct-endpoints") { scenario("access active object methods via Camel direct-endpoints") {
given("an active object registered after CamelService startup") given("an active object registered after CamelService startup")
var latch = (service.consumerPublisher !! SetExpectedMessageCount(3)).as[CountDownLatch].get var latch = service.expectEndpointActivationCount(3)
val obj = ActiveObject.newInstance(classOf[PojoBase]) val obj = ActiveObject.newInstance(classOf[PojoBase])
assert(latch.await(5000, TimeUnit.MILLISECONDS)) assert(latch.await(5000, TimeUnit.MILLISECONDS))
@ -136,7 +136,7 @@ class CamelServiceFeatureTest extends FeatureSpec with BeforeAndAfterAll with Gi
assert(response3 === "m4base: x y") assert(response3 === "m4base: x y")
// cleanup to avoid conflicts with next test (i.e. avoid multiple consumers on direct-endpoints) // cleanup to avoid conflicts with next test (i.e. avoid multiple consumers on direct-endpoints)
latch = (service.consumerPublisher !! SetExpectedMessageCount(3)).as[CountDownLatch].get latch = service.expectEndpointDeactivationCount(3)
ActiveObject.stop(obj) ActiveObject.stop(obj)
assert(latch.await(5000, TimeUnit.MILLISECONDS)) assert(latch.await(5000, TimeUnit.MILLISECONDS))
} }
@ -144,15 +144,15 @@ class CamelServiceFeatureTest extends FeatureSpec with BeforeAndAfterAll with Gi
feature("Unpublish active object method from the global CamelContext") { feature("Unpublish active object method from the global CamelContext") {
scenario("access to unregistered active object methof via Camel direct-endpoint fails") { scenario("access to unregistered active object method via Camel direct-endpoint fails") {
given("an active object registered after CamelService startup") given("an active object registered after CamelService startup")
var latch = (service.consumerPublisher !! SetExpectedMessageCount(3)).as[CountDownLatch].get var latch = service.expectEndpointActivationCount(3)
val obj = ActiveObject.newInstance(classOf[PojoBase]) val obj = ActiveObject.newInstance(classOf[PojoBase])
assert(latch.await(5000, TimeUnit.MILLISECONDS)) assert(latch.await(5000, TimeUnit.MILLISECONDS))
when("the active object is stopped") when("the active object is stopped")
latch = (service.consumerPublisher !! SetExpectedMessageCount(3)).as[CountDownLatch].get latch = service.expectEndpointDeactivationCount(3)
ActiveObject.stop(obj) ActiveObject.stop(obj)
assert(latch.await(5000, TimeUnit.MILLISECONDS)) assert(latch.await(5000, TimeUnit.MILLISECONDS))

View file

@ -45,7 +45,7 @@ class RemoteConsumerTest extends FeatureSpec with BeforeAndAfterAll with GivenWh
val consumer = actorOf[RemoteConsumer].start val consumer = actorOf[RemoteConsumer].start
when("remote consumer publication is triggered") when("remote consumer publication is triggered")
val latch = (service.consumerPublisher !! SetExpectedMessageCount(1)).as[CountDownLatch].get var latch = service.expectEndpointActivationCount(1)
consumer !! "init" consumer !! "init"
assert(latch.await(5000, TimeUnit.MILLISECONDS)) assert(latch.await(5000, TimeUnit.MILLISECONDS))
@ -61,7 +61,7 @@ class RemoteConsumerTest extends FeatureSpec with BeforeAndAfterAll with GivenWh
val consumer = ActiveObject.newRemoteInstance(classOf[PojoRemote], host, port) val consumer = ActiveObject.newRemoteInstance(classOf[PojoRemote], host, port)
when("remote consumer publication is triggered") when("remote consumer publication is triggered")
val latch = (service.consumerPublisher !! SetExpectedMessageCount(1)).as[CountDownLatch].get var latch = service.expectEndpointActivationCount(1)
consumer.foo("init") consumer.foo("init")
assert(latch.await(5000, TimeUnit.MILLISECONDS)) assert(latch.await(5000, TimeUnit.MILLISECONDS))