diff --git a/akka-camel/src/main/scala/akka/camel/Activation.scala b/akka-camel/src/main/scala/akka/camel/Activation.scala index a23cc923b4..61bbae14f7 100644 --- a/akka-camel/src/main/scala/akka/camel/Activation.scala +++ b/akka-camel/src/main/scala/akka/camel/Activation.scala @@ -6,10 +6,10 @@ package akka.camel import akka.camel.internal._ import akka.util.Timeout -import scala.concurrent.Future import akka.actor.{ ActorSystem, Props, ActorRef } import akka.pattern._ import scala.concurrent.util.Duration +import concurrent.{ ExecutionContext, Future } /** * Activation trait that can be used to wait on activation or de-activation of Camel endpoints. @@ -27,11 +27,11 @@ trait Activation { * @param endpoint the endpoint to be activated * @param timeout the timeout for the Future */ - def activationFutureFor(endpoint: ActorRef)(implicit timeout: Duration): Future[ActorRef] = + def activationFutureFor(endpoint: ActorRef)(implicit timeout: Duration, executor: ExecutionContext): Future[ActorRef] = (activationTracker.ask(AwaitActivation(endpoint))(Timeout(timeout))).map[ActorRef]({ - case EndpointActivated(`endpoint`) ⇒ endpoint - case EndpointFailedToActivate(_, cause) ⇒ throw cause - })(system.dispatcher) + case EndpointActivated(`endpoint`) ⇒ endpoint + case EndpointFailedToActivate(`endpoint`, cause) ⇒ throw cause + }) /** * Produces a Future which will be completed when the given endpoint has been deactivated or @@ -40,9 +40,9 @@ trait Activation { * @param endpoint the endpoint to be deactivated * @param timeout the timeout of the Future */ - def deactivationFutureFor(endpoint: ActorRef)(implicit timeout: Duration): Future[Unit] = - (activationTracker.ask(AwaitDeActivation(endpoint))(Timeout(timeout))).map[Unit]({ - case EndpointDeActivated(`endpoint`) ⇒ () - case EndpointFailedToDeActivate(_, cause) ⇒ throw cause - })(system.dispatcher) + def deactivationFutureFor(endpoint: ActorRef)(implicit timeout: Duration, executor: ExecutionContext): Future[ActorRef] = + (activationTracker.ask(AwaitDeActivation(endpoint))(Timeout(timeout))).map[ActorRef]({ + case EndpointDeActivated(`endpoint`) ⇒ endpoint + case EndpointFailedToDeActivate(`endpoint`, cause) ⇒ throw cause + }) } \ No newline at end of file diff --git a/akka-camel/src/main/scala/akka/camel/internal/ConsumerRegistry.scala b/akka-camel/src/main/scala/akka/camel/internal/ConsumerRegistry.scala index 5531cda109..c69c2f55bf 100644 --- a/akka-camel/src/main/scala/akka/camel/internal/ConsumerRegistry.scala +++ b/akka-camel/src/main/scala/akka/camel/internal/ConsumerRegistry.scala @@ -40,7 +40,7 @@ private[camel] trait ConsumerRegistry { this: Activation ⇒ */ private[camel] def registerConsumer(endpointUri: String, consumer: Consumer, activationTimeout: Duration) = { idempotentRegistry ! RegisterConsumer(endpointUri, consumer.self, consumer) - Await.result(activationFutureFor(consumer.self)(activationTimeout), activationTimeout) + Await.result(activationFutureFor(consumer.self)(activationTimeout, consumer.context.dispatcher), activationTimeout) } } diff --git a/akka-camel/src/test/java/akka/camel/ConsumerJavaTestBase.java b/akka-camel/src/test/java/akka/camel/ConsumerJavaTestBase.java index 8bb72baf00..c9872fbddb 100644 --- a/akka-camel/src/test/java/akka/camel/ConsumerJavaTestBase.java +++ b/akka-camel/src/test/java/akka/camel/ConsumerJavaTestBase.java @@ -8,6 +8,7 @@ import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.actor.Props; import scala.concurrent.Await; +import scala.concurrent.ExecutionContext; import scala.concurrent.util.Duration; import org.junit.AfterClass; import org.junit.Test; @@ -35,8 +36,9 @@ public class ConsumerJavaTestBase { @Test public void shouldHandleExceptionThrownByActorAndGenerateCustomResponse() throws Exception { Duration timeout = Duration.create(1, TimeUnit.SECONDS); + ExecutionContext executionContext = system.dispatcher(); ActorRef ref = Await.result( - camel.activationFutureFor(system.actorOf(new Props(SampleErrorHandlingConsumer.class)), timeout), + camel.activationFutureFor(system.actorOf(new Props(SampleErrorHandlingConsumer.class)), timeout, executionContext), timeout); String result = camel.template().requestBody("direct:error-handler-test-java", "hello", String.class); diff --git a/akka-camel/src/test/java/akka/camel/CustomRouteTestBase.java b/akka-camel/src/test/java/akka/camel/CustomRouteTestBase.java index df668b72a1..bb7bf4042f 100644 --- a/akka-camel/src/test/java/akka/camel/CustomRouteTestBase.java +++ b/akka-camel/src/test/java/akka/camel/CustomRouteTestBase.java @@ -5,6 +5,7 @@ import akka.camel.internal.component.CamelPath; import akka.camel.javaapi.UntypedConsumerActor; import akka.camel.javaapi.UntypedProducerActor; import scala.concurrent.Await; +import scala.concurrent.ExecutionContext; import scala.concurrent.util.Duration; import org.apache.camel.CamelExecutionException; import org.apache.camel.Exchange; @@ -59,8 +60,9 @@ public class CustomRouteTestBase { public void testCustomConsumerRoute() throws Exception { MockEndpoint mockEndpoint = camel.context().getEndpoint("mock:mockConsumer", MockEndpoint.class); Duration timeout = Duration.create(10, TimeUnit.SECONDS); + ExecutionContext executionContext = system.dispatcher(); ActorRef consumer = Await.result( - camel.activationFutureFor(system.actorOf(new Props(TestConsumer.class), "testConsumer"), timeout), + camel.activationFutureFor(system.actorOf(new Props(TestConsumer.class), "testConsumer"), timeout, executionContext), timeout); camel.context().addRoutes(new CustomRouteBuilder("direct:testRouteConsumer",consumer)); camel.template().sendBody("direct:testRouteConsumer", "test"); @@ -72,11 +74,12 @@ public class CustomRouteTestBase { public void testCustomAckConsumerRoute() throws Exception { MockEndpoint mockEndpoint = camel.context().getEndpoint("mock:mockAck", MockEndpoint.class); Duration timeout = Duration.create(10, TimeUnit.SECONDS); + ExecutionContext executionContext = system.dispatcher(); ActorRef consumer = Await.result( camel.activationFutureFor( system.actorOf( new Props( new UntypedActorFactory(){ public Actor create() { return new TestAckConsumer("direct:testConsumerAck","mock:mockAck"); } }), "testConsumerAck"), - timeout), + timeout, executionContext), timeout); camel.context().addRoutes(new CustomRouteBuilder("direct:testAck", consumer, false, timeout)); camel.template().sendBody("direct:testAck", "test"); @@ -87,10 +90,11 @@ public class CustomRouteTestBase { @Test public void testCustomAckConsumerRouteFromUri() throws Exception { MockEndpoint mockEndpoint = camel.context().getEndpoint("mock:mockAckUri", MockEndpoint.class); + ExecutionContext executionContext = system.dispatcher(); Duration timeout = Duration.create(10, TimeUnit.SECONDS); ActorRef consumer = Await.result( camel.activationFutureFor(system.actorOf(new Props( new UntypedActorFactory(){ public Actor create() { return new TestAckConsumer("direct:testConsumerAckFromUri","mock:mockAckUri"); } }), "testConsumerAckUri"), - timeout), + timeout, executionContext), timeout); camel.context().addRoutes(new CustomRouteBuilder("direct:testAckFromUri","akka://test/user/testConsumerAckUri?autoAck=false")); camel.template().sendBody("direct:testAckFromUri", "test"); @@ -101,9 +105,10 @@ public class CustomRouteTestBase { @Test(expected=CamelExecutionException.class) public void testCustomTimeoutConsumerRoute() throws Exception { Duration timeout = Duration.create(10, TimeUnit.SECONDS); + ExecutionContext executionContext = system.dispatcher(); ActorRef consumer = Await.result( camel.activationFutureFor(system.actorOf(new Props( new UntypedActorFactory(){ public Actor create() { return new TestAckConsumer("direct:testConsumerException","mock:mockException"); } }), "testConsumerException"), - timeout), + timeout, executionContext), timeout); camel.context().addRoutes(new CustomRouteBuilder("direct:testException", consumer, false, Duration.create(0, TimeUnit.SECONDS))); camel.template().sendBody("direct:testException", "test"); diff --git a/akka-camel/src/test/scala/akka/camel/ActivationIntegrationTest.scala b/akka-camel/src/test/scala/akka/camel/ActivationIntegrationTest.scala index 9d3ea5ef69..fe79bacdd8 100644 --- a/akka-camel/src/test/scala/akka/camel/ActivationIntegrationTest.scala +++ b/akka-camel/src/test/scala/akka/camel/ActivationIntegrationTest.scala @@ -20,6 +20,7 @@ import java.util.concurrent.TimeoutException class ActivationIntegrationTest extends WordSpec with MustMatchers with SharedCamelSystem { implicit val timeout = 10 seconds def template: ProducerTemplate = camel.template + import system.dispatcher "ActivationAware must be notified when endpoint is activated" in { val latch = new TestLatch(0) diff --git a/akka-camel/src/test/scala/akka/camel/ConsumerIntegrationTest.scala b/akka-camel/src/test/scala/akka/camel/ConsumerIntegrationTest.scala index 08ee3cf99d..9f98f5b495 100644 --- a/akka-camel/src/test/scala/akka/camel/ConsumerIntegrationTest.scala +++ b/akka-camel/src/test/scala/akka/camel/ConsumerIntegrationTest.scala @@ -17,12 +17,13 @@ import org.apache.camel.{ FailedToCreateRouteException, CamelExecutionException import java.util.concurrent.{ ExecutionException, TimeUnit, TimeoutException } import akka.testkit.TestLatch import akka.actor.Status.Failure -import scala.concurrent.Await import scala.concurrent.util.duration._ +import concurrent.{ ExecutionContext, Await } class ConsumerIntegrationTest extends WordSpec with MustMatchers with NonSharedCamelSystem { "ConsumerIntegrationTest" must { implicit val defaultTimeout = 10.seconds + implicit def ec: ExecutionContext = system.dispatcher "Consumer must throw FailedToCreateRouteException, while awaiting activation, if endpoint is invalid" in { val actorRef = system.actorOf(Props(new TestActor(uri = "some invalid uri"))) diff --git a/akka-camel/src/test/scala/akka/camel/ProducerRegistryTest.scala b/akka-camel/src/test/scala/akka/camel/ProducerRegistryTest.scala index 5fcf409937..4d46544b99 100644 --- a/akka-camel/src/test/scala/akka/camel/ProducerRegistryTest.scala +++ b/akka-camel/src/test/scala/akka/camel/ProducerRegistryTest.scala @@ -16,6 +16,8 @@ import scala.concurrent.Await class ProducerRegistryTest extends WordSpec with MustMatchers with SharedCamelSystem { implicit val timeout = 5.seconds + implicit val ec = system.dispatcher + "A ProducerRegistry" must { def newEmptyActor: ActorRef = system.actorOf(Props.empty) def registerProcessorFor(actorRef: ActorRef) = camel.registerProducer(actorRef, "mock:mock")._2 diff --git a/akka-camel/src/test/scala/akka/camel/TestSupport.scala b/akka-camel/src/test/scala/akka/camel/TestSupport.scala index 2920e558a0..0e212576b8 100644 --- a/akka-camel/src/test/scala/akka/camel/TestSupport.scala +++ b/akka-camel/src/test/scala/akka/camel/TestSupport.scala @@ -21,7 +21,7 @@ private[camel] object TestSupport { def start(actor: ⇒ Actor)(implicit system: ActorSystem): ActorRef = { val actorRef = system.actorOf(Props(actor)) - Await.result(CamelExtension(system).activationFutureFor(actorRef)(10 seconds), 10 seconds) + Await.result(CamelExtension(system).activationFutureFor(actorRef)(10 seconds, system.dispatcher), 10 seconds) actorRef } diff --git a/akka-docs/java/code/docs/camel/ActivationTestBase.java b/akka-docs/java/code/docs/camel/ActivationTestBase.java index d53b66a20b..d75b59b02b 100644 --- a/akka-docs/java/code/docs/camel/ActivationTestBase.java +++ b/akka-docs/java/code/docs/camel/ActivationTestBase.java @@ -29,13 +29,13 @@ public class ActivationTestBase { Camel camel = CamelExtension.get(system); // get a future reference to the activation of the endpoint of the Consumer Actor FiniteDuration duration = Duration.create(10, SECONDS); - Future activationFuture = camel.activationFutureFor(producer, duration); + Future activationFuture = camel.activationFutureFor(producer, duration, system.dispatcher()); //#CamelActivation //#CamelDeactivation // .. system.stop(producer); // get a future reference to the deactivation of the endpoint of the Consumer Actor - Future deactivationFuture = camel.activationFutureFor(producer, duration); + Future deactivationFuture = camel.deactivationFutureFor(producer, duration, system.dispatcher()); //#CamelDeactivation system.shutdown(); } diff --git a/akka-docs/scala/code/docs/camel/Introduction.scala b/akka-docs/scala/code/docs/camel/Introduction.scala index aa9546defb..6abb8c33bc 100644 --- a/akka-docs/scala/code/docs/camel/Introduction.scala +++ b/akka-docs/scala/code/docs/camel/Introduction.scala @@ -92,12 +92,12 @@ object Introduction { val camel = CamelExtension(system) val actorRef = system.actorOf(Props[MyEndpoint]) // get a future reference to the activation of the endpoint of the Consumer Actor - val activationFuture = camel.activationFutureFor(actorRef)(timeout = 10 seconds) + val activationFuture = camel.activationFutureFor(actorRef)(timeout = 10 seconds, executor = system.dispatcher) //#CamelActivation //#CamelDeactivation system.stop(actorRef) // get a future reference to the deactivation of the endpoint of the Consumer Actor - val deactivationFuture = camel.deactivationFutureFor(actorRef)(timeout = 10 seconds) + val deactivationFuture = camel.deactivationFutureFor(actorRef)(timeout = 10 seconds, executor = system.dispatcher) //#CamelDeactivation }