diff --git a/akka-camel/src/main/scala/CamelService.scala b/akka-camel/src/main/scala/CamelService.scala index 3795b8a7fb..23ff029b5e 100644 --- a/akka-camel/src/main/scala/CamelService.scala +++ b/akka-camel/src/main/scala/CamelService.scala @@ -4,6 +4,7 @@ package se.scalablesolutions.akka.camel import java.util.concurrent.CountDownLatch +import java.util.concurrent.TimeUnit import org.apache.camel.CamelContext @@ -94,12 +95,34 @@ trait CamelService extends Bootable with Logging { CamelContextManager.stop } + /** + * Waits for an expected number (count) of endpoints to be activated + * during execution of f. The wait-timeout is by default 10 seconds. + * Other timeout values can be set via the timeout and timeUnit + * parameters. + */ + def awaitEndpointActivation(count: Int, timeout: Long = 10, timeUnit: TimeUnit = TimeUnit.SECONDS)(f: => Unit) = { + val activation = expectEndpointActivationCount(count) + f; activation.await(timeout, timeUnit) + } + + /** + * Waits for an expected number (count) of endpoints to be de-activated + * during execution of f. The wait-timeout is by default 10 seconds. + * Other timeout values can be set via the timeout and timeUnit + * parameters. + */ + def awaitEndpointDeactivation(count: Int, timeout: Long = 10, timeUnit: TimeUnit = TimeUnit.SECONDS)(f: => Unit) = { + val activation = expectEndpointDeactivationCount(count) + f; activation.await(timeout, timeUnit) + } + /** * Sets an expectation on the number of upcoming endpoint activations and returns - * a CountDownLatch that can be used to wait for the activations to occur. Endpoint + * a CountDownLatch that can be used to wait for the activations to occur. Endpoint * activations that occurred in the past are not considered. */ - def expectEndpointActivationCount(count: Int): CountDownLatch = + private def expectEndpointActivationCount(count: Int): CountDownLatch = (consumerPublisher !! SetExpectedRegistrationCount(count)).as[CountDownLatch].get /** @@ -107,7 +130,7 @@ trait CamelService extends Bootable with Logging { * a CountDownLatch that can be used to wait for the de-activations to occur. Endpoint * de-activations that occurred in the past are not considered. */ - def expectEndpointDeactivationCount(count: Int): CountDownLatch = + private def expectEndpointDeactivationCount(count: Int): CountDownLatch = (consumerPublisher !! SetExpectedUnregistrationCount(count)).as[CountDownLatch].get } diff --git a/akka-camel/src/test/scala/ConsumerTest.scala b/akka-camel/src/test/scala/ConsumerTest.scala index 0af8aec7d5..ec8f6661fc 100644 --- a/akka-camel/src/test/scala/ConsumerTest.scala +++ b/akka-camel/src/test/scala/ConsumerTest.scala @@ -28,12 +28,9 @@ class ConsumerTest extends WordSpec with BeforeAndAfterAll with MustMatchers { // start consumer publisher, otherwise we cannot set message // count expectations in the next step (needed for testing only). service.consumerPublisher.start - // set expectations on publish count - val latch = service.expectEndpointActivationCount(1) - // start the CamelService - service.start - // await publication of first test consumer - latch.await(5000, TimeUnit.MILLISECONDS) must be (true) + service.awaitEndpointActivation(1) { + service.start + } must be (true) } override protected def afterAll = { @@ -55,9 +52,9 @@ class ConsumerTest extends WordSpec with BeforeAndAfterAll with MustMatchers { } "started" must { "support an in-out message exchange via its endpoint" in { - val latch = service.expectEndpointActivationCount(1) - consumer.start - latch.await(5000, TimeUnit.MILLISECONDS) must be (true) + service.awaitEndpointActivation(1) { + consumer.start + } must be (true) mandatoryTemplate.requestBody("direct:publish-test-2", "msg2") must equal ("received msg2") } "have an associated endpoint in the CamelContext" in { @@ -66,9 +63,9 @@ class ConsumerTest extends WordSpec with BeforeAndAfterAll with MustMatchers { } "stopped" must { "not support an in-out message exchange via its endpoint" in { - val latch = service.expectEndpointDeactivationCount(1) - consumer.stop - latch.await(5000, TimeUnit.MILLISECONDS) must be (true) + service.awaitEndpointDeactivation(1) { + consumer.stop + } must be (true) intercept[CamelExecutionException] { mandatoryTemplate.requestBody("direct:publish-test-2", "msg2") } @@ -80,9 +77,9 @@ class ConsumerTest extends WordSpec with BeforeAndAfterAll with MustMatchers { var actor: SampleTypedConsumer = null "started" must { "support in-out message exchanges via its endpoints" in { - val latch = service.expectEndpointActivationCount(3) - actor = TypedActor.newInstance(classOf[SampleTypedConsumer], classOf[SampleTypedConsumerImpl]) - latch.await(5000, TimeUnit.MILLISECONDS) must be (true) + service.awaitEndpointActivation(3) { + actor = TypedActor.newInstance(classOf[SampleTypedConsumer], classOf[SampleTypedConsumerImpl]) + } must be (true) mandatoryTemplate.requestBodyAndHeader("direct:m2", "x", "test", "y") must equal ("m2: x y") mandatoryTemplate.requestBodyAndHeader("direct:m3", "x", "test", "y") must equal ("m3: x y") mandatoryTemplate.requestBodyAndHeader("direct:m4", "x", "test", "y") must equal ("m4: x y") @@ -90,9 +87,9 @@ class ConsumerTest extends WordSpec with BeforeAndAfterAll with MustMatchers { } "stopped" must { "not support in-out message exchanges via its endpoints" in { - val latch = service.expectEndpointDeactivationCount(3) - TypedActor.stop(actor) - latch.await(5000, TimeUnit.MILLISECONDS) must be (true) + service.awaitEndpointDeactivation(3) { + TypedActor.stop(actor) + } must be (true) intercept[CamelExecutionException] { mandatoryTemplate.requestBodyAndHeader("direct:m2", "x", "test", "y") } @@ -110,18 +107,18 @@ class ConsumerTest extends WordSpec with BeforeAndAfterAll with MustMatchers { var actor: TestTypedConsumer = null "started" must { "support in-out message exchanges via its endpoints" in { - val latch = service.expectEndpointActivationCount(2) - actor = TypedActor.newInstance(classOf[TestTypedConsumer], classOf[TestTypedConsumerImpl]) - latch.await(5000, TimeUnit.MILLISECONDS) must be (true) + service.awaitEndpointActivation(2) { + actor = TypedActor.newInstance(classOf[TestTypedConsumer], classOf[TestTypedConsumerImpl]) + } must be (true) mandatoryTemplate.requestBody("direct:publish-test-3", "x") must equal ("foo: x") mandatoryTemplate.requestBody("direct:publish-test-4", "x") must equal ("bar: x") } } "stopped" must { "not support in-out message exchanges via its endpoints" in { - val latch = service.expectEndpointDeactivationCount(2) - TypedActor.stop(actor) - latch.await(5000, TimeUnit.MILLISECONDS) must be (true) + service.awaitEndpointDeactivation(2) { + TypedActor.stop(actor) + } must be (true) intercept[CamelExecutionException] { mandatoryTemplate.requestBody("direct:publish-test-3", "x") } @@ -136,17 +133,17 @@ class ConsumerTest extends WordSpec with BeforeAndAfterAll with MustMatchers { val consumer = UntypedActor.actorOf(classOf[SampleUntypedConsumer]) "started" must { "support an in-out message exchange via its endpoint" in { - val latch = service.expectEndpointActivationCount(1) - consumer.start - latch.await(5000, TimeUnit.MILLISECONDS) must be (true) + service.awaitEndpointActivation(1) { + consumer.start + } must be (true) mandatoryTemplate.requestBodyAndHeader("direct:test-untyped-consumer", "x", "test", "y") must equal ("x y") } } "stopped" must { "not support an in-out message exchange via its endpoint" in { - val latch = service.expectEndpointDeactivationCount(1) - consumer.stop - latch.await(5000, TimeUnit.MILLISECONDS) must be (true) + service.awaitEndpointDeactivation(1) { + consumer.stop + } must be (true) intercept[CamelExecutionException] { mandatoryTemplate.sendBodyAndHeader("direct:test-untyped-consumer", "blah", "test", "blub") } @@ -157,9 +154,9 @@ class ConsumerTest extends WordSpec with BeforeAndAfterAll with MustMatchers { "A non-responding, blocking consumer" when { "receiving an in-out message exchange" must { "lead to a TimeoutException" in { - val latch = service.expectEndpointActivationCount(1) - actorOf(new TestBlocker("direct:publish-test-5")).start - latch.await(5000, TimeUnit.MILLISECONDS) must be (true) + service.awaitEndpointActivation(1) { + actorOf(new TestBlocker("direct:publish-test-5")).start + } must be (true) try { mandatoryTemplate.requestBody("direct:publish-test-5", "msg3") diff --git a/akka-camel/src/test/scala/RemoteConsumerTest.scala b/akka-camel/src/test/scala/RemoteConsumerTest.scala index 2218aac25a..8c469a9379 100644 --- a/akka-camel/src/test/scala/RemoteConsumerTest.scala +++ b/akka-camel/src/test/scala/RemoteConsumerTest.scala @@ -45,9 +45,9 @@ class RemoteConsumerTest extends FeatureSpec with BeforeAndAfterAll with GivenWh val consumer = actorOf[RemoteConsumer].start when("remote consumer publication is triggered") - var latch = mandatoryService.expectEndpointActivationCount(1) - consumer !! "init" - assert(latch.await(5000, TimeUnit.MILLISECONDS)) + assert(mandatoryService.awaitEndpointActivation(1) { + consumer !! "init" + }) then("the published consumer is accessible via its endpoint URI") val response = CamelContextManager.mandatoryTemplate.requestBody("direct:remote-consumer", "test") @@ -61,10 +61,9 @@ class RemoteConsumerTest extends FeatureSpec with BeforeAndAfterAll with GivenWh val consumer = TypedActor.newRemoteInstance(classOf[SampleRemoteTypedConsumer], classOf[SampleRemoteTypedConsumerImpl], host, port) when("remote typed consumer publication is triggered") - var latch = mandatoryService.expectEndpointActivationCount(1) - consumer.foo("init") - assert(latch.await(5000, TimeUnit.MILLISECONDS)) - + assert(mandatoryService.awaitEndpointActivation(1) { + consumer.foo("init") + }) then("the published method is accessible via its endpoint URI") val response = CamelContextManager.mandatoryTemplate.requestBody("direct:remote-typed-consumer", "test") assert(response === "remote typed actor: test") @@ -77,10 +76,9 @@ class RemoteConsumerTest extends FeatureSpec with BeforeAndAfterAll with GivenWh val consumer = UntypedActor.actorOf(classOf[SampleRemoteUntypedConsumer]).start when("remote untyped consumer publication is triggered") - var latch = mandatoryService.expectEndpointActivationCount(1) - consumer.sendRequestReply(Message("init", Map("test" -> "init"))) - assert(latch.await(5000, TimeUnit.MILLISECONDS)) - + assert(mandatoryService.awaitEndpointActivation(1) { + consumer.sendRequestReply(Message("init", Map("test" -> "init"))) + }) then("the published untyped consumer is accessible via its endpoint URI") val response = CamelContextManager.mandatoryTemplate.requestBodyAndHeader("direct:remote-untyped-consumer", "a", "test", "b") assert(response === "a b") diff --git a/akka-samples/akka-sample-camel/src/main/scala/StandaloneApplication.scala b/akka-samples/akka-sample-camel/src/main/scala/StandaloneApplication.scala index 2ecccb1e02..80684be78a 100644 --- a/akka-samples/akka-sample-camel/src/main/scala/StandaloneApplication.scala +++ b/akka-samples/akka-sample-camel/src/main/scala/StandaloneApplication.scala @@ -28,14 +28,10 @@ object StandaloneApplication extends Application { // access 'externally' registered typed actors assert("hello msg1" == mandatoryContext.createProducerTemplate.requestBody("direct:test", "msg1")) - // set expectations on upcoming endpoint activation - val activation = mandatoryService.expectEndpointActivationCount(1) - - // 'internally' register typed actor (requires CamelService) - TypedActor.newInstance(classOf[TypedConsumer2], classOf[TypedConsumer2Impl]) - - // internal registration is done in background. Wait a bit ... - activation.await + mandatoryService.awaitEndpointActivation(1) { + // 'internally' register typed actor (requires CamelService) + TypedActor.newInstance(classOf[TypedConsumer2], classOf[TypedConsumer2Impl]) + } // access 'internally' (automatically) registered typed-actors // (see @consume annotation value at TypedConsumer2.foo method) @@ -85,17 +81,13 @@ object StandaloneJmsApplication extends Application { startCamelService - // Expect two consumer endpoints to be activated - val completion = mandatoryService.expectEndpointActivationCount(2) - val jmsUri = "jms:topic:test" - // Wire publisher and consumer using a JMS topic - val jmsSubscriber1 = Actor.actorOf(new Subscriber("jms-subscriber-1", jmsUri)).start - val jmsSubscriber2 = Actor.actorOf(new Subscriber("jms-subscriber-2", jmsUri)).start val jmsPublisher = Actor.actorOf(new Publisher("jms-publisher", jmsUri)).start - // wait for the consumer (subscriber) endpoint being activated - completion.await + mandatoryService.awaitEndpointActivation(2) { + Actor.actorOf(new Subscriber("jms-subscriber-1", jmsUri)).start + Actor.actorOf(new Subscriber("jms-subscriber-2", jmsUri)).start + } // Send 10 messages to via publisher actor for(i <- 1 to 10) { @@ -108,6 +100,5 @@ object StandaloneJmsApplication extends Application { } stopCamelService - ActorRegistry.shutdownAll } diff --git a/akka-samples/akka-sample-camel/src/test/scala/HttpConcurrencyTestStress.scala b/akka-samples/akka-sample-camel/src/test/scala/HttpConcurrencyTestStress.scala index 76cbc58a8b..e4530160f2 100644 --- a/akka-samples/akka-sample-camel/src/test/scala/HttpConcurrencyTestStress.scala +++ b/akka-samples/akka-sample-camel/src/test/scala/HttpConcurrencyTestStress.scala @@ -44,15 +44,15 @@ class HttpConcurrencyTestStress extends JUnitSuite { object HttpConcurrencyTestStress { @BeforeClass - def beforeClass = { + def beforeClass: Unit = { startCamelService val workers = for (i <- 1 to 8) yield actorOf[HttpServerWorker].start val balancer = loadBalancerActor(new CyclicIterator(workers.toList)) - val completion = service.get.expectEndpointActivationCount(1) - val server = actorOf(new HttpServerActor(balancer)).start - completion.await + service.get.awaitEndpointActivation(1) { + actorOf(new HttpServerActor(balancer)).start + } } @AfterClass