diff --git a/akka-camel/src/main/scala/akka/camel/Activation.scala b/akka-camel/src/main/scala/akka/camel/Activation.scala index 768ea22b3a..a23cc923b4 100644 --- a/akka-camel/src/main/scala/akka/camel/Activation.scala +++ b/akka-camel/src/main/scala/akka/camel/Activation.scala @@ -7,7 +7,6 @@ package akka.camel import akka.camel.internal._ import akka.util.Timeout import scala.concurrent.Future -import java.util.concurrent.TimeoutException import akka.actor.{ ActorSystem, Props, ActorRef } import akka.pattern._ import scala.concurrent.util.Duration @@ -17,72 +16,33 @@ import scala.concurrent.util.Duration * The Camel endpoints are activated asynchronously. This trait can signal when an endpoint is activated or de-activated. */ trait Activation { - import scala.concurrent.Await - def system: ActorSystem //FIXME Why is this here, what's it needed for and who should use it? private val activationTracker = system.actorOf(Props[ActivationTracker], "camelActivationTracker") //FIXME Why is this also top level? /** - * Awaits for endpoint to be activated. It blocks until the endpoint is registered in camel context or timeout expires. - * @param endpoint the endpoint to wait for to be activated - * @param timeout the timeout for the wait - * @throws akka.camel.ActivationTimeoutException if endpoint is not activated within timeout. - * @return the activated ActorRef - */ - def awaitActivation(endpoint: ActorRef, timeout: Duration): ActorRef = - try Await.result(activationFutureFor(endpoint, timeout), timeout) catch { - case e: TimeoutException ⇒ throw new ActivationTimeoutException(endpoint, timeout) - } - - /** - * Awaits for endpoint to be de-activated. It is blocking until endpoint is unregistered in camel context or timeout expires. - * @param endpoint the endpoint to wait for to be de-activated - * @param timeout the timeout for the wait - * @throws akka.camel.DeActivationTimeoutException if endpoint is not de-activated within timeout. - */ - def awaitDeactivation(endpoint: ActorRef, timeout: Duration): Unit = - try Await.result(deactivationFutureFor(endpoint, timeout), timeout) catch { - case e: TimeoutException ⇒ throw new DeActivationTimeoutException(endpoint, timeout) - } - - /** - * Similar to `awaitActivation` but returns a future instead. + * Produces a Future with the specified endpoint that will be completed when the endpoint has been activated, + * or if it times out, which will happen after the specified Timeout. + * * @param endpoint the endpoint to be activated * @param timeout the timeout for the Future */ - def activationFutureFor(endpoint: ActorRef, timeout: Duration): Future[ActorRef] = + def activationFutureFor(endpoint: ActorRef)(implicit timeout: Duration): Future[ActorRef] = (activationTracker.ask(AwaitActivation(endpoint))(Timeout(timeout))).map[ActorRef]({ - case EndpointActivated(_) ⇒ endpoint + case EndpointActivated(`endpoint`) ⇒ endpoint case EndpointFailedToActivate(_, cause) ⇒ throw cause })(system.dispatcher) /** - * Similar to awaitDeactivation but returns a future instead. + * Produces a Future which will be completed when the given endpoint has been deactivated or + * or if it times out, which will happen after the specified Timeout. + * * @param endpoint the endpoint to be deactivated * @param timeout the timeout of the Future */ - def deactivationFutureFor(endpoint: ActorRef, timeout: Duration): Future[Unit] = + def deactivationFutureFor(endpoint: ActorRef)(implicit timeout: Duration): Future[Unit] = (activationTracker.ask(AwaitDeActivation(endpoint))(Timeout(timeout))).map[Unit]({ - case EndpointDeActivated(_) ⇒ () + case EndpointDeActivated(`endpoint`) ⇒ () case EndpointFailedToDeActivate(_, cause) ⇒ throw cause })(system.dispatcher) -} - -/** - * An exception for when a timeout has occurred during deactivation of an endpoint. - * @param endpoint the endpoint that could not be de-activated in time - * @param timeout the timeout - */ -class DeActivationTimeoutException(endpoint: ActorRef, timeout: Duration) extends TimeoutException { - override def getMessage: String = "Timed out after %s, while waiting for de-activation of %s" format (timeout, endpoint.path) -} - -/** - * An exception for when a timeout has occurred during the activation of an endpoint. - * @param endpoint the endpoint that could not be activated in time - * @param timeout the timeout - */ -class ActivationTimeoutException(endpoint: ActorRef, timeout: Duration) extends TimeoutException { - override def getMessage: String = "Timed out after %s, while waiting for activation of %s" format (timeout, endpoint.path) } \ No newline at end of file diff --git a/akka-camel/src/main/scala/akka/camel/internal/ConsumerRegistry.scala b/akka-camel/src/main/scala/akka/camel/internal/ConsumerRegistry.scala index e172598b57..e8cfdbd609 100644 --- a/akka-camel/src/main/scala/akka/camel/internal/ConsumerRegistry.scala +++ b/akka-camel/src/main/scala/akka/camel/internal/ConsumerRegistry.scala @@ -15,6 +15,8 @@ import collection.mutable import org.apache.camel.model.RouteDefinition import org.apache.camel.CamelContext import scala.concurrent.util.Duration +import concurrent.Await +import akka.util.Timeout /** * For internal use only. @@ -30,7 +32,7 @@ private[camel] trait ConsumerRegistry { this: Activation ⇒ */ private[this] lazy val idempotentRegistry = system.actorOf(Props(new IdempotentCamelConsumerRegistry(context))) /** - * For internal use only. + * For internal use only. BLOCKING * @param endpointUri the URI to register the consumer on * @param consumer the consumer * @param activationTimeout the timeout for activation @@ -38,7 +40,7 @@ private[camel] trait ConsumerRegistry { this: Activation ⇒ */ private[camel] def registerConsumer(endpointUri: String, consumer: Consumer, activationTimeout: Duration) = { idempotentRegistry ! RegisterConsumer(endpointUri, consumer.self, consumer) - awaitActivation(consumer.self, activationTimeout) + Await.result(activationFutureFor(consumer.self)(activationTimeout), activationTimeout) } } @@ -79,6 +81,7 @@ private[camel] class IdempotentCamelConsumerRegistry(camelContext: CamelContext) def isAlreadyActivated(ref: ActorRef): Boolean = activated.contains(ref) + //FIXME Break out class CamelConsumerRegistrator extends Actor with ActorLogging { def receive = { diff --git a/akka-camel/src/test/java/akka/camel/ConsumerJavaTestBase.java b/akka-camel/src/test/java/akka/camel/ConsumerJavaTestBase.java index 6a1eec27de..8bb72baf00 100644 --- a/akka-camel/src/test/java/akka/camel/ConsumerJavaTestBase.java +++ b/akka-camel/src/test/java/akka/camel/ConsumerJavaTestBase.java @@ -7,10 +7,12 @@ package akka.camel; import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.actor.Props; -import scala.concurrent.util.FiniteDuration; +import scala.concurrent.Await; +import scala.concurrent.util.Duration; import org.junit.AfterClass; import org.junit.Test; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import static org.junit.Assert.assertEquals; @@ -22,7 +24,7 @@ import static org.junit.Assert.assertEquals; public class ConsumerJavaTestBase { static ActorSystem system = ActorSystem.create("test"); - static Camel camel = (Camel) CamelExtension.get(system); + static Camel camel = CamelExtension.get(system); @AfterClass @@ -31,9 +33,11 @@ public class ConsumerJavaTestBase { } @Test - public void shouldHandleExceptionThrownByActorAndGenerateCustomResponse() { - ActorRef ref = system.actorOf(new Props().withCreator(SampleErrorHandlingConsumer.class)); - camel.awaitActivation(ref, new FiniteDuration(1, TimeUnit.SECONDS)); + public void shouldHandleExceptionThrownByActorAndGenerateCustomResponse() throws Exception { + Duration timeout = Duration.create(1, TimeUnit.SECONDS); + ActorRef ref = Await.result( + camel.activationFutureFor(system.actorOf(new Props(SampleErrorHandlingConsumer.class)), timeout), + timeout); String result = camel.template().requestBody("direct:error-handler-test-java", "hello", String.class); assertEquals("error: hello", result); diff --git a/akka-camel/src/test/java/akka/camel/CustomRouteTestBase.java b/akka-camel/src/test/java/akka/camel/CustomRouteTestBase.java index 517557f0a7..df668b72a1 100644 --- a/akka-camel/src/test/java/akka/camel/CustomRouteTestBase.java +++ b/akka-camel/src/test/java/akka/camel/CustomRouteTestBase.java @@ -4,7 +4,8 @@ import akka.actor.*; import akka.camel.internal.component.CamelPath; import akka.camel.javaapi.UntypedConsumerActor; import akka.camel.javaapi.UntypedProducerActor; -import scala.concurrent.util.FiniteDuration; +import scala.concurrent.Await; +import scala.concurrent.util.Duration; import org.apache.camel.CamelExecutionException; import org.apache.camel.Exchange; import org.apache.camel.Predicate; @@ -13,7 +14,6 @@ import org.apache.camel.component.mock.MockEndpoint; import org.junit.Before; import org.junit.After; import org.junit.Test; - import java.util.concurrent.TimeUnit; public class CustomRouteTestBase { @@ -58,8 +58,10 @@ public class CustomRouteTestBase { @Test public void testCustomConsumerRoute() throws Exception { MockEndpoint mockEndpoint = camel.context().getEndpoint("mock:mockConsumer", MockEndpoint.class); - ActorRef consumer = system.actorOf(new Props(TestConsumer.class), "testConsumer"); - camel.awaitActivation(consumer,new FiniteDuration(10, TimeUnit.SECONDS)); + Duration timeout = Duration.create(10, TimeUnit.SECONDS); + ActorRef consumer = Await.result( + camel.activationFutureFor(system.actorOf(new Props(TestConsumer.class), "testConsumer"), timeout), + timeout); camel.context().addRoutes(new CustomRouteBuilder("direct:testRouteConsumer",consumer)); camel.template().sendBody("direct:testRouteConsumer", "test"); assertMockEndpoint(mockEndpoint); @@ -69,13 +71,14 @@ public class CustomRouteTestBase { @Test public void testCustomAckConsumerRoute() throws Exception { MockEndpoint mockEndpoint = camel.context().getEndpoint("mock:mockAck", MockEndpoint.class); - ActorRef consumer = system.actorOf(new Props( new UntypedActorFactory(){ - public Actor create() { - return new TestAckConsumer("direct:testConsumerAck","mock:mockAck"); - } - }), "testConsumerAck"); - camel.awaitActivation(consumer,new FiniteDuration(10, TimeUnit.SECONDS)); - camel.context().addRoutes(new CustomRouteBuilder("direct:testAck", consumer, false, new FiniteDuration(10, TimeUnit.SECONDS))); + Duration timeout = Duration.create(10, TimeUnit.SECONDS); + ActorRef consumer = Await.result( + camel.activationFutureFor( + system.actorOf( + new Props( new UntypedActorFactory(){ public Actor create() { return new TestAckConsumer("direct:testConsumerAck","mock:mockAck"); } }), "testConsumerAck"), + timeout), + timeout); + camel.context().addRoutes(new CustomRouteBuilder("direct:testAck", consumer, false, timeout)); camel.template().sendBody("direct:testAck", "test"); assertMockEndpoint(mockEndpoint); system.stop(consumer); @@ -84,12 +87,11 @@ public class CustomRouteTestBase { @Test public void testCustomAckConsumerRouteFromUri() throws Exception { MockEndpoint mockEndpoint = camel.context().getEndpoint("mock:mockAckUri", MockEndpoint.class); - ActorRef consumer = system.actorOf(new Props( new UntypedActorFactory(){ - public Actor create() { - return new TestAckConsumer("direct:testConsumerAckFromUri","mock:mockAckUri"); - } - }), "testConsumerAckUri"); - camel.awaitActivation(consumer,new FiniteDuration(10, TimeUnit.SECONDS)); + Duration timeout = Duration.create(10, TimeUnit.SECONDS); + ActorRef consumer = Await.result( + camel.activationFutureFor(system.actorOf(new Props( new UntypedActorFactory(){ public Actor create() { return new TestAckConsumer("direct:testConsumerAckFromUri","mock:mockAckUri"); } }), "testConsumerAckUri"), + timeout), + timeout); camel.context().addRoutes(new CustomRouteBuilder("direct:testAckFromUri","akka://test/user/testConsumerAckUri?autoAck=false")); camel.template().sendBody("direct:testAckFromUri", "test"); assertMockEndpoint(mockEndpoint); @@ -98,13 +100,12 @@ public class CustomRouteTestBase { @Test(expected=CamelExecutionException.class) public void testCustomTimeoutConsumerRoute() throws Exception { - ActorRef consumer = system.actorOf(new Props( new UntypedActorFactory(){ - public Actor create() { - return new TestAckConsumer("direct:testConsumerException","mock:mockException"); - } - }), "testConsumerException"); - camel.awaitActivation(consumer, new FiniteDuration(10, TimeUnit.SECONDS)); - camel.context().addRoutes(new CustomRouteBuilder("direct:testException", consumer, false, new FiniteDuration(0, TimeUnit.SECONDS))); + Duration timeout = Duration.create(10, TimeUnit.SECONDS); + ActorRef consumer = Await.result( + camel.activationFutureFor(system.actorOf(new Props( new UntypedActorFactory(){ public Actor create() { return new TestAckConsumer("direct:testConsumerException","mock:mockException"); } }), "testConsumerException"), + timeout), + timeout); + camel.context().addRoutes(new CustomRouteBuilder("direct:testException", consumer, false, Duration.create(0, TimeUnit.SECONDS))); camel.template().sendBody("direct:testException", "test"); } @@ -132,7 +133,7 @@ public class CustomRouteTestBase { uri = CamelPath.toUri(actor); } - public CustomRouteBuilder(String from, ActorRef actor, boolean autoAck, FiniteDuration replyTimeout) { + public CustomRouteBuilder(String from, ActorRef actor, boolean autoAck, Duration replyTimeout) { fromUri = from; uri = CamelPath.toUri(actor, autoAck, replyTimeout); } diff --git a/akka-camel/src/test/scala/akka/camel/ActivationIntegrationTest.scala b/akka-camel/src/test/scala/akka/camel/ActivationIntegrationTest.scala index e6e34440c5..9d3ea5ef69 100644 --- a/akka-camel/src/test/scala/akka/camel/ActivationIntegrationTest.scala +++ b/akka-camel/src/test/scala/akka/camel/ActivationIntegrationTest.scala @@ -15,15 +15,16 @@ import TestSupport._ import org.scalatest.WordSpec import akka.testkit.TestLatch import scala.concurrent.Await +import java.util.concurrent.TimeoutException class ActivationIntegrationTest extends WordSpec with MustMatchers with SharedCamelSystem { - implicit val timeout = Timeout(10 seconds) + implicit val timeout = 10 seconds def template: ProducerTemplate = camel.template "ActivationAware must be notified when endpoint is activated" in { val latch = new TestLatch(0) val actor = system.actorOf(Props(new TestConsumer("direct:actor-1", latch))) - camel.awaitActivation(actor, 10 second) must be === actor + Await.result(camel.activationFutureFor(actor), 10 seconds) must be === actor template.requestBody("direct:actor-1", "test") must be("received test") } @@ -39,27 +40,25 @@ class ActivationIntegrationTest extends WordSpec with MustMatchers with SharedCa latch.countDown() } }) - camel.awaitActivation(actor, 10 second) + Await.result(camel.activationFutureFor(actor), timeout) system.stop(actor) - camel.awaitDeactivation(actor, 10 second) + Await.result(camel.deactivationFutureFor(actor), timeout) Await.ready(latch, 10 second) } "ActivationAware must time out when waiting for endpoint de-activation for too long" in { val latch = new TestLatch(0) val actor = start(new TestConsumer("direct:a5", latch)) - camel.awaitActivation(actor, 10 second) - intercept[DeActivationTimeoutException] { - camel.awaitDeactivation(actor, 1 millis) - } + Await.result(camel.activationFutureFor(actor), timeout) + intercept[TimeoutException] { Await.result(camel.deactivationFutureFor(actor), 1 millis) } } - "awaitActivation must fail if notification timeout is too short and activation is not complete yet" in { + "activationFutureFor must fail if notification timeout is too short and activation is not complete yet" in { val latch = new TestLatch(1) try { val actor = system.actorOf(Props(new TestConsumer("direct:actor-4", latch))) - intercept[ActivationTimeoutException] { camel.awaitActivation(actor, 1 millis) } + intercept[TimeoutException] { Await.result(camel.activationFutureFor(actor), 1 millis) } } finally latch.countDown() } diff --git a/akka-camel/src/test/scala/akka/camel/ConsumerIntegrationTest.scala b/akka-camel/src/test/scala/akka/camel/ConsumerIntegrationTest.scala index 2d8d780264..5e471c2947 100644 --- a/akka-camel/src/test/scala/akka/camel/ConsumerIntegrationTest.scala +++ b/akka-camel/src/test/scala/akka/camel/ConsumerIntegrationTest.scala @@ -9,147 +9,147 @@ import language.existentials import akka.actor._ import org.scalatest.matchers.MustMatchers -import scala.concurrent.util.duration._ -import TestSupport._ import org.scalatest.WordSpec +import akka.camel.TestSupport._ import org.apache.camel.model.RouteDefinition import org.apache.camel.builder.Builder import org.apache.camel.{ FailedToCreateRouteException, CamelExecutionException } import java.util.concurrent.{ ExecutionException, TimeUnit, TimeoutException } import akka.testkit.TestLatch -import scala.concurrent.Await import akka.actor.Status.Failure +import scala.concurrent.Await +import scala.concurrent.util.duration._ class ConsumerIntegrationTest extends WordSpec with MustMatchers with NonSharedCamelSystem { - private val defaultTimeout = 10 - "Consumer must throw FailedToCreateRouteException, while awaiting activation, if endpoint is invalid" in { - val actorRef = system.actorOf(Props(new TestActor(uri = "some invalid uri"))) + "ConsumerIntegrationTest" must { + implicit val defaultTimeout = 10.seconds - intercept[FailedToCreateRouteException] { - camel.awaitActivation(actorRef, timeout = defaultTimeout seconds) + "Consumer must throw FailedToCreateRouteException, while awaiting activation, if endpoint is invalid" in { + val actorRef = system.actorOf(Props(new TestActor(uri = "some invalid uri"))) + intercept[FailedToCreateRouteException] { Await.result(camel.activationFutureFor(actorRef), defaultTimeout) } } - } - "Consumer must support in-out messaging" in { - start(new Consumer { - def endpointUri = "direct:a1" - def receive = { - case m: CamelMessage ⇒ sender ! "received " + m.bodyAs[String] - } - }) - camel.sendTo("direct:a1", msg = "some message") must be("received some message") - } - - "Consumer must time-out if consumer is slow" in { - val SHORT_TIMEOUT = 10 millis - val LONG_WAIT = 200 millis - - start(new Consumer { - override def replyTimeout = SHORT_TIMEOUT - - def endpointUri = "direct:a3" - def receive = { case _ ⇒ { Thread.sleep(LONG_WAIT.toMillis); sender ! "done" } } - }) - - val exception = intercept[CamelExecutionException] { - camel.sendTo("direct:a3", msg = "some msg 3") + "Consumer must support in-out messaging" in { + start(new Consumer { + def endpointUri = "direct:a1" + def receive = { + case m: CamelMessage ⇒ sender ! "received " + m.bodyAs[String] + } + }) + camel.sendTo("direct:a1", msg = "some message") must be("received some message") } - exception.getCause.getClass must be(classOf[TimeoutException]) - } - "Consumer must process messages even after actor restart" in { - val restarted = TestLatch() - val consumer = start(new Consumer { - def endpointUri = "direct:a2" + "Consumer must time-out if consumer is slow" in { + val SHORT_TIMEOUT = 10 millis + val LONG_WAIT = 200 millis - def receive = { - case "throw" ⇒ throw new Exception - case m: CamelMessage ⇒ sender ! "received " + m.bodyAs[String] + start(new Consumer { + override def replyTimeout = SHORT_TIMEOUT + + def endpointUri = "direct:a3" + def receive = { case _ ⇒ { Thread.sleep(LONG_WAIT.toMillis); sender ! "done" } } + }) + + val exception = intercept[CamelExecutionException] { + camel.sendTo("direct:a3", msg = "some msg 3") } + exception.getCause.getClass must be(classOf[TimeoutException]) + } - override def postRestart(reason: Throwable) { - restarted.countDown() - } - }) - consumer ! "throw" - Await.ready(restarted, defaultTimeout seconds) + "Consumer must process messages even after actor restart" in { + val restarted = TestLatch() + val consumer = start(new Consumer { + def endpointUri = "direct:a2" - val response = camel.sendTo("direct:a2", msg = "xyz") - response must be("received xyz") - } + def receive = { + case "throw" ⇒ throw new Exception + case m: CamelMessage ⇒ sender ! "received " + m.bodyAs[String] + } - "Consumer must unregister itself when stopped" in { - val consumer = start(new TestActor()) - camel.awaitActivation(consumer, defaultTimeout seconds) + override def postRestart(reason: Throwable) { + restarted.countDown() + } + }) + consumer ! "throw" + Await.ready(restarted, defaultTimeout) - camel.routeCount must be > (0) + val response = camel.sendTo("direct:a2", msg = "xyz") + response must be("received xyz") + } - system.stop(consumer) - camel.awaitDeactivation(consumer, defaultTimeout seconds) + "Consumer must unregister itself when stopped" in { + val consumer = start(new TestActor()) + Await.result(camel.activationFutureFor(consumer), defaultTimeout) - camel.routeCount must be(0) - } + camel.routeCount must be > (0) - "Consumer must register on uri passed in through constructor" in { - val consumer = start(new TestActor("direct://test")) - camel.awaitActivation(consumer, defaultTimeout seconds) + system.stop(consumer) + Await.result(camel.deactivationFutureFor(consumer), defaultTimeout) - camel.routeCount must be > (0) - camel.routes.get(0).getEndpoint.getEndpointUri must be("direct://test") - system.stop(consumer) - camel.awaitDeactivation(consumer, defaultTimeout seconds) + camel.routeCount must be(0) + } - camel.routeCount must be(0) - } + "Consumer must register on uri passed in through constructor" in { + val consumer = start(new TestActor("direct://test")) + Await.result(camel.activationFutureFor(consumer), defaultTimeout) - "Error passing consumer supports error handling through route modification" in { - start(new ErrorThrowingConsumer("direct:error-handler-test") with ErrorPassing { - override def onRouteDefinition(rd: RouteDefinition) = { - rd.onException(classOf[Exception]).handled(true).transform(Builder.exceptionMessage).end - } - }) - camel.sendTo("direct:error-handler-test", msg = "hello") must be("error: hello") - } + camel.routeCount must be > (0) + camel.routes.get(0).getEndpoint.getEndpointUri must be("direct://test") + system.stop(consumer) + Await.result(camel.deactivationFutureFor(consumer), defaultTimeout) - "Error passing consumer supports redelivery through route modification" in { - start(new FailingOnceConsumer("direct:failing-once-concumer") with ErrorPassing { - override def onRouteDefinition(rd: RouteDefinition) = { - rd.onException(classOf[Exception]).maximumRedeliveries(1).end - } - }) - camel.sendTo("direct:failing-once-concumer", msg = "hello") must be("accepted: hello") - } + camel.routeCount must be(0) + } - "Consumer supports manual Ack" in { - start(new ManualAckConsumer() { - def endpointUri = "direct:manual-ack" - def receive = { case _ ⇒ sender ! Ack } - }) - camel.template.asyncSendBody("direct:manual-ack", "some message").get(defaultTimeout, TimeUnit.SECONDS) must be(null) //should not timeout - } + "Error passing consumer supports error handling through route modification" in { + start(new ErrorThrowingConsumer("direct:error-handler-test") with ErrorPassing { + override def onRouteDefinition(rd: RouteDefinition) = { + rd.onException(classOf[Exception]).handled(true).transform(Builder.exceptionMessage).end + } + }) + camel.sendTo("direct:error-handler-test", msg = "hello") must be("error: hello") + } - "Consumer handles manual Ack failure" in { - val someException = new Exception("e1") - start(new ManualAckConsumer() { - def endpointUri = "direct:manual-ack" - def receive = { case _ ⇒ sender ! Failure(someException) } - }) + "Error passing consumer supports redelivery through route modification" in { + start(new FailingOnceConsumer("direct:failing-once-concumer") with ErrorPassing { + override def onRouteDefinition(rd: RouteDefinition) = { + rd.onException(classOf[Exception]).maximumRedeliveries(1).end + } + }) + camel.sendTo("direct:failing-once-concumer", msg = "hello") must be("accepted: hello") + } - intercept[ExecutionException] { - camel.template.asyncSendBody("direct:manual-ack", "some message").get(defaultTimeout, TimeUnit.SECONDS) - }.getCause.getCause must be(someException) - } + "Consumer supports manual Ack" in { + start(new ManualAckConsumer() { + def endpointUri = "direct:manual-ack" + def receive = { case _ ⇒ sender ! Ack } + }) + camel.template.asyncSendBody("direct:manual-ack", "some message").get(defaultTimeout.toSeconds, TimeUnit.SECONDS) must be(null) //should not timeout + } - "Consumer should time-out, if manual Ack not received within replyTimeout and should give a human readable error message" in { - start(new ManualAckConsumer() { - override def replyTimeout = 10 millis - def endpointUri = "direct:manual-ack" - def receive = { case _ ⇒ } - }) + "Consumer handles manual Ack failure" in { + val someException = new Exception("e1") + start(new ManualAckConsumer() { + def endpointUri = "direct:manual-ack" + def receive = { case _ ⇒ sender ! Failure(someException) } + }) - intercept[ExecutionException] { - camel.template.asyncSendBody("direct:manual-ack", "some message").get(defaultTimeout, TimeUnit.SECONDS) - }.getCause.getCause.getMessage must include("Failed to get Ack") + intercept[ExecutionException] { + camel.template.asyncSendBody("direct:manual-ack", "some message").get(defaultTimeout.toSeconds, TimeUnit.SECONDS) + }.getCause.getCause must be(someException) + } + + "Consumer should time-out, if manual Ack not received within replyTimeout and should give a human readable error message" in { + start(new ManualAckConsumer() { + override def replyTimeout = 10 millis + def endpointUri = "direct:manual-ack" + def receive = { case _ ⇒ } + }) + + intercept[ExecutionException] { + camel.template.asyncSendBody("direct:manual-ack", "some message").get(defaultTimeout.toSeconds, TimeUnit.SECONDS) + }.getCause.getCause.getMessage must include("Failed to get Ack") + } } } diff --git a/akka-camel/src/test/scala/akka/camel/ProducerRegistryTest.scala b/akka-camel/src/test/scala/akka/camel/ProducerRegistryTest.scala index 43f9498bdd..5fcf409937 100644 --- a/akka-camel/src/test/scala/akka/camel/ProducerRegistryTest.scala +++ b/akka-camel/src/test/scala/akka/camel/ProducerRegistryTest.scala @@ -11,9 +11,11 @@ import org.scalatest.WordSpec import akka.camel.TestSupport.SharedCamelSystem import scala.concurrent.util.duration._ import akka.actor.{ ActorRef, Props } +import akka.util.Timeout +import scala.concurrent.Await class ProducerRegistryTest extends WordSpec with MustMatchers with SharedCamelSystem { - + implicit val timeout = 5.seconds "A ProducerRegistry" must { def newEmptyActor: ActorRef = system.actorOf(Props.empty) def registerProcessorFor(actorRef: ActorRef) = camel.registerProducer(actorRef, "mock:mock")._2 @@ -21,10 +23,10 @@ class ProducerRegistryTest extends WordSpec with MustMatchers with SharedCamelSy "register a started SendProcessor for the producer, which is stopped when the actor is stopped" in { val actorRef = newEmptyActor val processor = registerProcessorFor(actorRef) - camel.awaitActivation(actorRef, 5 second) + Await.result(camel.activationFutureFor(actorRef), timeout) processor.isStarted must be(true) system.stop(actorRef) - camel.awaitDeactivation(actorRef, 5 second) + Await.result(camel.deactivationFutureFor(actorRef), timeout) (processor.isStopping || processor.isStopped) must be(true) } "remove and stop the SendProcessor if the actorRef is registered" in { diff --git a/akka-camel/src/test/scala/akka/camel/TestSupport.scala b/akka-camel/src/test/scala/akka/camel/TestSupport.scala index 0bea9d6781..2920e558a0 100644 --- a/akka-camel/src/test/scala/akka/camel/TestSupport.scala +++ b/akka-camel/src/test/scala/akka/camel/TestSupport.scala @@ -14,12 +14,14 @@ import org.scalatest.matchers.{ BePropertyMatcher, BePropertyMatchResult } import scala.concurrent.util.{ FiniteDuration, Duration } import scala.reflect.ClassTag import akka.actor.{ ActorRef, Props, ActorSystem, Actor } +import concurrent.Await +import akka.util.Timeout private[camel] object TestSupport { def start(actor: ⇒ Actor)(implicit system: ActorSystem): ActorRef = { val actorRef = system.actorOf(Props(actor)) - CamelExtension(system).awaitActivation(actorRef, 10 seconds) + Await.result(CamelExtension(system).activationFutureFor(actorRef)(10 seconds), 10 seconds) actorRef } diff --git a/akka-docs/java/code/docs/camel/ActivationTestBase.java b/akka-docs/java/code/docs/camel/ActivationTestBase.java index 96e6ff6af0..d53b66a20b 100644 --- a/akka-docs/java/code/docs/camel/ActivationTestBase.java +++ b/akka-docs/java/code/docs/camel/ActivationTestBase.java @@ -30,16 +30,12 @@ public class ActivationTestBase { // get a future reference to the activation of the endpoint of the Consumer Actor FiniteDuration duration = Duration.create(10, SECONDS); Future activationFuture = camel.activationFutureFor(producer, duration); - // or, block wait on the activation - camel.awaitActivation(producer, duration); //#CamelActivation //#CamelDeactivation // .. system.stop(producer); // get a future reference to the deactivation of the endpoint of the Consumer Actor Future deactivationFuture = camel.activationFutureFor(producer, duration); - // or, block wait on the deactivation - camel.awaitDeactivation(producer, duration); //#CamelDeactivation system.shutdown(); } diff --git a/akka-docs/scala/code/docs/camel/Introduction.scala b/akka-docs/scala/code/docs/camel/Introduction.scala index ad4fbadf1a..aa9546defb 100644 --- a/akka-docs/scala/code/docs/camel/Introduction.scala +++ b/akka-docs/scala/code/docs/camel/Introduction.scala @@ -4,6 +4,7 @@ import akka.actor.{ Props, ActorSystem } import akka.camel.CamelExtension import language.postfixOps +import akka.util.Timeout object Introduction { def foo = { @@ -91,16 +92,12 @@ object Introduction { val camel = CamelExtension(system) val actorRef = system.actorOf(Props[MyEndpoint]) // get a future reference to the activation of the endpoint of the Consumer Actor - val activationFuture = camel.activationFutureFor(actorRef, 10 seconds) - // or, block wait on the activation - camel.awaitActivation(actorRef, 10 seconds) + val activationFuture = camel.activationFutureFor(actorRef)(timeout = 10 seconds) //#CamelActivation //#CamelDeactivation system.stop(actorRef) // get a future reference to the deactivation of the endpoint of the Consumer Actor - val deactivationFuture = camel.activationFutureFor(actorRef, 10 seconds) - // or, block wait on the deactivation - camel.awaitDeactivation(actorRef, 10 seconds) + val deactivationFuture = camel.deactivationFutureFor(actorRef)(timeout = 10 seconds) //#CamelDeactivation }