diff --git a/akka-camel/src/main/scala/CamelService.scala b/akka-camel/src/main/scala/CamelService.scala index 65a6a44fe5..11f2907c4d 100644 --- a/akka-camel/src/main/scala/CamelService.scala +++ b/akka-camel/src/main/scala/CamelService.scala @@ -1,9 +1,10 @@ /** * Copyright (C) 2009-2010 Scalable Solutions AB */ - package se.scalablesolutions.akka.camel +import java.util.concurrent.CountDownLatch + import se.scalablesolutions.akka.actor.Actor._ import se.scalablesolutions.akka.actor.{AspectInitRegistry, ActorRegistry} import se.scalablesolutions.akka.util.{Bootable, Logging} @@ -77,6 +78,22 @@ trait CamelService extends Bootable with Logging { * @see onUnload */ def unload = onUnload + + /** + * Sets an expectation of the number of upcoming endpoint activations and returns + * a {@link CountDownLatch} that can be used to wait for the activations to occur. + * Endpoint activations that occurred in the past are not considered. + */ + def expectEndpointActivationCount(count: Int): CountDownLatch = + (consumerPublisher !! SetExpectedRegistrationCount(count)).as[CountDownLatch].get + + /** + * Sets an expectation of the number of upcoming endpoint de-activations and returns + * a {@link CountDownLatch} that can be used to wait for the de-activations to occur. + * Endpoint de-activations that occurred in the past are not considered. + */ + def expectEndpointDeactivationCount(count: Int): CountDownLatch = + (consumerPublisher !! SetExpectedUnregistrationCount(count)).as[CountDownLatch].get } /** diff --git a/akka-camel/src/main/scala/ConsumerPublisher.scala b/akka-camel/src/main/scala/ConsumerPublisher.scala index 298c70c2b7..e50c625639 100644 --- a/akka-camel/src/main/scala/ConsumerPublisher.scala +++ b/akka-camel/src/main/scala/ConsumerPublisher.scala @@ -65,50 +65,50 @@ private[camel] object ConsumerPublisher extends Logging { * Actor that publishes consumer actors and active object methods at Camel endpoints. * The Camel context used for publishing is CamelContextManager.context. This actor * accepts messages of type - * se.scalablesolutions.akka.camel.service.ConsumerRegistered, - * se.scalablesolutions.akka.camel.service.ConsumerMethodRegistered and - * se.scalablesolutions.akka.camel.service.ConsumerUnregistered. + * se.scalablesolutions.akka.camel.ConsumerRegistered, + * se.scalablesolutions.akka.camel.ConsumerUnregistered. + * se.scalablesolutions.akka.camel.ConsumerMethodRegistered and + * se.scalablesolutions.akka.camel.ConsumerMethodUnregistered. * * @author Martin Krasser */ private[camel] class ConsumerPublisher extends Actor { import ConsumerPublisher._ - @volatile private var latch = new CountDownLatch(0) + @volatile private var registrationLatch = new CountDownLatch(0) + @volatile private var unregistrationLatch = new CountDownLatch(0) - /** - * Adds a route to the actor identified by a Publish message to the global CamelContext. - */ protected def receive = { case r: ConsumerRegistered => { handleConsumerRegistered(r) - latch.countDown // needed for testing only. + registrationLatch.countDown } case u: ConsumerUnregistered => { handleConsumerUnregistered(u) - latch.countDown // needed for testing only. + unregistrationLatch.countDown } case mr: ConsumerMethodRegistered => { handleConsumerMethodRegistered(mr) - latch.countDown // needed for testing only. + registrationLatch.countDown } case mu: ConsumerMethodUnregistered => { handleConsumerMethodUnregistered(mu) - latch.countDown // needed for testing only. + unregistrationLatch.countDown } - case SetExpectedMessageCount(num) => { - // needed for testing only. - latch = new CountDownLatch(num) - self.reply(latch) + case SetExpectedRegistrationCount(num) => { + registrationLatch = new CountDownLatch(num) + self.reply(registrationLatch) + } + case SetExpectedUnregistrationCount(num) => { + unregistrationLatch = new CountDownLatch(num) + self.reply(unregistrationLatch) } case _ => { /* ignore */} } } -/** - * Command message used For testing-purposes only. - */ -private[camel] case class SetExpectedMessageCount(num: Int) +private[camel] case class SetExpectedRegistrationCount(num: Int) +private[camel] case class SetExpectedUnregistrationCount(num: Int) /** * Defines an abstract route to a target which is either an actor or an active object method.. diff --git a/akka-camel/src/test/scala/CamelServiceFeatureTest.scala b/akka-camel/src/test/scala/CamelServiceFeatureTest.scala index df42237a61..09d386c383 100644 --- a/akka-camel/src/test/scala/CamelServiceFeatureTest.scala +++ b/akka-camel/src/test/scala/CamelServiceFeatureTest.scala @@ -27,7 +27,7 @@ class CamelServiceFeatureTest extends FeatureSpec with BeforeAndAfterAll with Gi // count expectations in the next step (needed for testing only). service.consumerPublisher.start // set expectations on publish count - val latch = (service.consumerPublisher !! SetExpectedMessageCount(1)).as[CountDownLatch].get + val latch = service.expectEndpointActivationCount(1) // start the CamelService service.load // await publication of first test consumer @@ -44,7 +44,7 @@ class CamelServiceFeatureTest extends FeatureSpec with BeforeAndAfterAll with Gi scenario("access non-blocking consumer actors via Camel direct-endpoints") { given("two consumer actors registered before and after CamelService startup") - val latch = (service.consumerPublisher !! SetExpectedMessageCount(1)).as[CountDownLatch].get + val latch = service.expectEndpointActivationCount(1) actorOf(new TestConsumer("direct:publish-test-2")).start assert(latch.await(5000, TimeUnit.MILLISECONDS)) @@ -60,7 +60,7 @@ class CamelServiceFeatureTest extends FeatureSpec with BeforeAndAfterAll with Gi scenario("access blocking, non-responding consumer actor via a Camel direct-endpoint") { given("a consumer actor registered after CamelService startup") - val latch = (service.consumerPublisher !! SetExpectedMessageCount(1)).as[CountDownLatch].get + val latch = service.expectEndpointActivationCount(1) actorOf(new TestBlocker("direct:publish-test-3")).start assert(latch.await(5000, TimeUnit.MILLISECONDS)) @@ -84,13 +84,13 @@ class CamelServiceFeatureTest extends FeatureSpec with BeforeAndAfterAll with Gi given("a consumer actor registered after CamelService startup") assert(CamelContextManager.context.hasEndpoint(endpointUri) eq null) - var latch = (service.consumerPublisher !! SetExpectedMessageCount(1)).as[CountDownLatch].get + var latch = service.expectEndpointActivationCount(1) val consumer = actorOf(new TestConsumer(endpointUri)).start assert(latch.await(5000, TimeUnit.MILLISECONDS)) assert(CamelContextManager.context.hasEndpoint(endpointUri) ne null) when("the actor is stopped") - latch = (service.consumerPublisher !! SetExpectedMessageCount(1)).as[CountDownLatch].get + latch = service.expectEndpointDeactivationCount(1) consumer.stop assert(latch.await(5000, TimeUnit.MILLISECONDS)) @@ -121,7 +121,7 @@ class CamelServiceFeatureTest extends FeatureSpec with BeforeAndAfterAll with Gi scenario("access active object methods via Camel direct-endpoints") { given("an active object registered after CamelService startup") - var latch = (service.consumerPublisher !! SetExpectedMessageCount(3)).as[CountDownLatch].get + var latch = service.expectEndpointActivationCount(3) val obj = ActiveObject.newInstance(classOf[PojoBase]) assert(latch.await(5000, TimeUnit.MILLISECONDS)) @@ -136,7 +136,7 @@ class CamelServiceFeatureTest extends FeatureSpec with BeforeAndAfterAll with Gi assert(response3 === "m4base: x y") // cleanup to avoid conflicts with next test (i.e. avoid multiple consumers on direct-endpoints) - latch = (service.consumerPublisher !! SetExpectedMessageCount(3)).as[CountDownLatch].get + latch = service.expectEndpointDeactivationCount(3) ActiveObject.stop(obj) assert(latch.await(5000, TimeUnit.MILLISECONDS)) } @@ -144,15 +144,15 @@ class CamelServiceFeatureTest extends FeatureSpec with BeforeAndAfterAll with Gi feature("Unpublish active object method from the global CamelContext") { - scenario("access to unregistered active object methof via Camel direct-endpoint fails") { + scenario("access to unregistered active object method via Camel direct-endpoint fails") { given("an active object registered after CamelService startup") - var latch = (service.consumerPublisher !! SetExpectedMessageCount(3)).as[CountDownLatch].get + var latch = service.expectEndpointActivationCount(3) val obj = ActiveObject.newInstance(classOf[PojoBase]) assert(latch.await(5000, TimeUnit.MILLISECONDS)) when("the active object is stopped") - latch = (service.consumerPublisher !! SetExpectedMessageCount(3)).as[CountDownLatch].get + latch = service.expectEndpointDeactivationCount(3) ActiveObject.stop(obj) assert(latch.await(5000, TimeUnit.MILLISECONDS)) diff --git a/akka-camel/src/test/scala/RemoteConsumerTest.scala b/akka-camel/src/test/scala/RemoteConsumerTest.scala index 7e3b666590..25c6d6b975 100644 --- a/akka-camel/src/test/scala/RemoteConsumerTest.scala +++ b/akka-camel/src/test/scala/RemoteConsumerTest.scala @@ -45,7 +45,7 @@ class RemoteConsumerTest extends FeatureSpec with BeforeAndAfterAll with GivenWh val consumer = actorOf[RemoteConsumer].start when("remote consumer publication is triggered") - val latch = (service.consumerPublisher !! SetExpectedMessageCount(1)).as[CountDownLatch].get + var latch = service.expectEndpointActivationCount(1) consumer !! "init" assert(latch.await(5000, TimeUnit.MILLISECONDS)) @@ -61,7 +61,7 @@ class RemoteConsumerTest extends FeatureSpec with BeforeAndAfterAll with GivenWh val consumer = ActiveObject.newRemoteInstance(classOf[PojoRemote], host, port) when("remote consumer publication is triggered") - val latch = (service.consumerPublisher !! SetExpectedMessageCount(1)).as[CountDownLatch].get + var latch = service.expectEndpointActivationCount(1) consumer.foo("init") assert(latch.await(5000, TimeUnit.MILLISECONDS))