diff --git a/akka-camel/src/main/scala/akka/camel/Activation.scala b/akka-camel/src/main/scala/akka/camel/Activation.scala index a23cc923b4..61bbae14f7 100644 --- a/akka-camel/src/main/scala/akka/camel/Activation.scala +++ b/akka-camel/src/main/scala/akka/camel/Activation.scala @@ -6,10 +6,10 @@ package akka.camel import akka.camel.internal._ import akka.util.Timeout -import scala.concurrent.Future import akka.actor.{ ActorSystem, Props, ActorRef } import akka.pattern._ import scala.concurrent.util.Duration +import concurrent.{ ExecutionContext, Future } /** * Activation trait that can be used to wait on activation or de-activation of Camel endpoints. @@ -27,11 +27,11 @@ trait Activation { * @param endpoint the endpoint to be activated * @param timeout the timeout for the Future */ - def activationFutureFor(endpoint: ActorRef)(implicit timeout: Duration): Future[ActorRef] = + def activationFutureFor(endpoint: ActorRef)(implicit timeout: Duration, executor: ExecutionContext): Future[ActorRef] = (activationTracker.ask(AwaitActivation(endpoint))(Timeout(timeout))).map[ActorRef]({ - case EndpointActivated(`endpoint`) ⇒ endpoint - case EndpointFailedToActivate(_, cause) ⇒ throw cause - })(system.dispatcher) + case EndpointActivated(`endpoint`) ⇒ endpoint + case EndpointFailedToActivate(`endpoint`, cause) ⇒ throw cause + }) /** * Produces a Future which will be completed when the given endpoint has been deactivated or @@ -40,9 +40,9 @@ trait Activation { * @param endpoint the endpoint to be deactivated * @param timeout the timeout of the Future */ - def deactivationFutureFor(endpoint: ActorRef)(implicit timeout: Duration): Future[Unit] = - (activationTracker.ask(AwaitDeActivation(endpoint))(Timeout(timeout))).map[Unit]({ - case EndpointDeActivated(`endpoint`) ⇒ () - case EndpointFailedToDeActivate(_, cause) ⇒ throw cause - })(system.dispatcher) + def deactivationFutureFor(endpoint: ActorRef)(implicit timeout: Duration, executor: ExecutionContext): Future[ActorRef] = + (activationTracker.ask(AwaitDeActivation(endpoint))(Timeout(timeout))).map[ActorRef]({ + case EndpointDeActivated(`endpoint`) ⇒ endpoint + case EndpointFailedToDeActivate(`endpoint`, cause) ⇒ throw cause + }) } \ No newline at end of file diff --git a/akka-camel/src/main/scala/akka/camel/internal/ConsumerRegistry.scala b/akka-camel/src/main/scala/akka/camel/internal/ConsumerRegistry.scala index 5531cda109..c69c2f55bf 100644 --- a/akka-camel/src/main/scala/akka/camel/internal/ConsumerRegistry.scala +++ b/akka-camel/src/main/scala/akka/camel/internal/ConsumerRegistry.scala @@ -40,7 +40,7 @@ private[camel] trait ConsumerRegistry { this: Activation ⇒ */ private[camel] def registerConsumer(endpointUri: String, consumer: Consumer, activationTimeout: Duration) = { idempotentRegistry ! RegisterConsumer(endpointUri, consumer.self, consumer) - Await.result(activationFutureFor(consumer.self)(activationTimeout), activationTimeout) + Await.result(activationFutureFor(consumer.self)(activationTimeout, consumer.context.dispatcher), activationTimeout) } } diff --git a/akka-camel/src/test/java/akka/camel/ConsumerJavaTestBase.java b/akka-camel/src/test/java/akka/camel/ConsumerJavaTestBase.java index d83fadc209..b30f541bb1 100644 --- a/akka-camel/src/test/java/akka/camel/ConsumerJavaTestBase.java +++ b/akka-camel/src/test/java/akka/camel/ConsumerJavaTestBase.java @@ -9,6 +9,7 @@ import akka.actor.ActorSystem; import akka.actor.Props; import akka.testkit.JavaTestKit; import scala.concurrent.Await; +import scala.concurrent.ExecutionContext; import scala.concurrent.util.Duration; import org.junit.AfterClass; import org.junit.Test; @@ -25,8 +26,6 @@ import static org.junit.Assert.assertEquals; public class ConsumerJavaTestBase { static ActorSystem system = ActorSystem.create("test", AkkaSpec.testConf()); - static Camel camel = CamelExtension.get(system); - @AfterClass public static void tearDownAfterClass() { @@ -39,9 +38,11 @@ public class ConsumerJavaTestBase { String result = new EventFilter(Exception.class) { protected String run() { Duration timeout = Duration.create(1, TimeUnit.SECONDS); + Camel camel = CamelExtension.get(system); + ExecutionContext executionContext = system.dispatcher(); try { ActorRef ref = Await.result( - camel.activationFutureFor(system.actorOf(new Props(SampleErrorHandlingConsumer.class)), timeout), + camel.activationFutureFor(system.actorOf(new Props(SampleErrorHandlingConsumer.class)), timeout, executionContext), timeout); return camel.template().requestBody("direct:error-handler-test-java", "hello", String.class); } diff --git a/akka-camel/src/test/java/akka/camel/CustomRouteTestBase.java b/akka-camel/src/test/java/akka/camel/CustomRouteTestBase.java index df668b72a1..bb7bf4042f 100644 --- a/akka-camel/src/test/java/akka/camel/CustomRouteTestBase.java +++ b/akka-camel/src/test/java/akka/camel/CustomRouteTestBase.java @@ -5,6 +5,7 @@ import akka.camel.internal.component.CamelPath; import akka.camel.javaapi.UntypedConsumerActor; import akka.camel.javaapi.UntypedProducerActor; import scala.concurrent.Await; +import scala.concurrent.ExecutionContext; import scala.concurrent.util.Duration; import org.apache.camel.CamelExecutionException; import org.apache.camel.Exchange; @@ -59,8 +60,9 @@ public class CustomRouteTestBase { public void testCustomConsumerRoute() throws Exception { MockEndpoint mockEndpoint = camel.context().getEndpoint("mock:mockConsumer", MockEndpoint.class); Duration timeout = Duration.create(10, TimeUnit.SECONDS); + ExecutionContext executionContext = system.dispatcher(); ActorRef consumer = Await.result( - camel.activationFutureFor(system.actorOf(new Props(TestConsumer.class), "testConsumer"), timeout), + camel.activationFutureFor(system.actorOf(new Props(TestConsumer.class), "testConsumer"), timeout, executionContext), timeout); camel.context().addRoutes(new CustomRouteBuilder("direct:testRouteConsumer",consumer)); camel.template().sendBody("direct:testRouteConsumer", "test"); @@ -72,11 +74,12 @@ public class CustomRouteTestBase { public void testCustomAckConsumerRoute() throws Exception { MockEndpoint mockEndpoint = camel.context().getEndpoint("mock:mockAck", MockEndpoint.class); Duration timeout = Duration.create(10, TimeUnit.SECONDS); + ExecutionContext executionContext = system.dispatcher(); ActorRef consumer = Await.result( camel.activationFutureFor( system.actorOf( new Props( new UntypedActorFactory(){ public Actor create() { return new TestAckConsumer("direct:testConsumerAck","mock:mockAck"); } }), "testConsumerAck"), - timeout), + timeout, executionContext), timeout); camel.context().addRoutes(new CustomRouteBuilder("direct:testAck", consumer, false, timeout)); camel.template().sendBody("direct:testAck", "test"); @@ -87,10 +90,11 @@ public class CustomRouteTestBase { @Test public void testCustomAckConsumerRouteFromUri() throws Exception { MockEndpoint mockEndpoint = camel.context().getEndpoint("mock:mockAckUri", MockEndpoint.class); + ExecutionContext executionContext = system.dispatcher(); Duration timeout = Duration.create(10, TimeUnit.SECONDS); ActorRef consumer = Await.result( camel.activationFutureFor(system.actorOf(new Props( new UntypedActorFactory(){ public Actor create() { return new TestAckConsumer("direct:testConsumerAckFromUri","mock:mockAckUri"); } }), "testConsumerAckUri"), - timeout), + timeout, executionContext), timeout); camel.context().addRoutes(new CustomRouteBuilder("direct:testAckFromUri","akka://test/user/testConsumerAckUri?autoAck=false")); camel.template().sendBody("direct:testAckFromUri", "test"); @@ -101,9 +105,10 @@ public class CustomRouteTestBase { @Test(expected=CamelExecutionException.class) public void testCustomTimeoutConsumerRoute() throws Exception { Duration timeout = Duration.create(10, TimeUnit.SECONDS); + ExecutionContext executionContext = system.dispatcher(); ActorRef consumer = Await.result( camel.activationFutureFor(system.actorOf(new Props( new UntypedActorFactory(){ public Actor create() { return new TestAckConsumer("direct:testConsumerException","mock:mockException"); } }), "testConsumerException"), - timeout), + timeout, executionContext), timeout); camel.context().addRoutes(new CustomRouteBuilder("direct:testException", consumer, false, Duration.create(0, TimeUnit.SECONDS))); camel.template().sendBody("direct:testException", "test"); diff --git a/akka-camel/src/test/scala/akka/camel/ActivationIntegrationTest.scala b/akka-camel/src/test/scala/akka/camel/ActivationIntegrationTest.scala index 9d3ea5ef69..fe79bacdd8 100644 --- a/akka-camel/src/test/scala/akka/camel/ActivationIntegrationTest.scala +++ b/akka-camel/src/test/scala/akka/camel/ActivationIntegrationTest.scala @@ -20,6 +20,7 @@ import java.util.concurrent.TimeoutException class ActivationIntegrationTest extends WordSpec with MustMatchers with SharedCamelSystem { implicit val timeout = 10 seconds def template: ProducerTemplate = camel.template + import system.dispatcher "ActivationAware must be notified when endpoint is activated" in { val latch = new TestLatch(0) diff --git a/akka-camel/src/test/scala/akka/camel/ConsumerIntegrationTest.scala b/akka-camel/src/test/scala/akka/camel/ConsumerIntegrationTest.scala index 20fa98b7b8..6ffacf3432 100644 --- a/akka-camel/src/test/scala/akka/camel/ConsumerIntegrationTest.scala +++ b/akka-camel/src/test/scala/akka/camel/ConsumerIntegrationTest.scala @@ -15,14 +15,15 @@ import org.apache.camel.model.RouteDefinition import org.apache.camel.builder.Builder import org.apache.camel.{ FailedToCreateRouteException, CamelExecutionException } import java.util.concurrent.{ ExecutionException, TimeUnit, TimeoutException } -import akka.testkit._ -import scala.concurrent.Await -import scala.concurrent.util.duration._ import akka.actor.Status.Failure +import scala.concurrent.util.duration._ +import concurrent.{ ExecutionContext, Await } +import akka.testkit._ class ConsumerIntegrationTest extends WordSpec with MustMatchers with NonSharedCamelSystem { "ConsumerIntegrationTest" must { implicit val defaultTimeout = 10.seconds + implicit def ec: ExecutionContext = system.dispatcher "Consumer must throw FailedToCreateRouteException, while awaiting activation, if endpoint is invalid" in { filterEvents(EventFilter[ActorInitializationException](occurrences = 1), EventFilter[FailedToCreateRouteException](occurrences = 1)) { diff --git a/akka-camel/src/test/scala/akka/camel/ProducerRegistryTest.scala b/akka-camel/src/test/scala/akka/camel/ProducerRegistryTest.scala index 5fcf409937..4d46544b99 100644 --- a/akka-camel/src/test/scala/akka/camel/ProducerRegistryTest.scala +++ b/akka-camel/src/test/scala/akka/camel/ProducerRegistryTest.scala @@ -16,6 +16,8 @@ import scala.concurrent.Await class ProducerRegistryTest extends WordSpec with MustMatchers with SharedCamelSystem { implicit val timeout = 5.seconds + implicit val ec = system.dispatcher + "A ProducerRegistry" must { def newEmptyActor: ActorRef = system.actorOf(Props.empty) def registerProcessorFor(actorRef: ActorRef) = camel.registerProducer(actorRef, "mock:mock")._2 diff --git a/akka-camel/src/test/scala/akka/camel/TestSupport.scala b/akka-camel/src/test/scala/akka/camel/TestSupport.scala index 38454db1c4..fe22b0e7a0 100644 --- a/akka-camel/src/test/scala/akka/camel/TestSupport.scala +++ b/akka-camel/src/test/scala/akka/camel/TestSupport.scala @@ -22,7 +22,7 @@ private[camel] object TestSupport { def start(actor: ⇒ Actor)(implicit system: ActorSystem): ActorRef = { val actorRef = system.actorOf(Props(actor)) - Await.result(CamelExtension(system).activationFutureFor(actorRef)(10 seconds), 10 seconds) + Await.result(CamelExtension(system).activationFutureFor(actorRef)(10 seconds, system.dispatcher), 10 seconds) actorRef } diff --git a/akka-docs/java/code/docs/camel/ActivationTestBase.java b/akka-docs/java/code/docs/camel/ActivationTestBase.java index d53b66a20b..d75b59b02b 100644 --- a/akka-docs/java/code/docs/camel/ActivationTestBase.java +++ b/akka-docs/java/code/docs/camel/ActivationTestBase.java @@ -29,13 +29,13 @@ public class ActivationTestBase { Camel camel = CamelExtension.get(system); // get a future reference to the activation of the endpoint of the Consumer Actor FiniteDuration duration = Duration.create(10, SECONDS); - Future activationFuture = camel.activationFutureFor(producer, duration); + Future activationFuture = camel.activationFutureFor(producer, duration, system.dispatcher()); //#CamelActivation //#CamelDeactivation // .. system.stop(producer); // get a future reference to the deactivation of the endpoint of the Consumer Actor - Future deactivationFuture = camel.activationFutureFor(producer, duration); + Future deactivationFuture = camel.deactivationFutureFor(producer, duration, system.dispatcher()); //#CamelDeactivation system.shutdown(); } diff --git a/akka-docs/scala/code/docs/camel/Introduction.scala b/akka-docs/scala/code/docs/camel/Introduction.scala index aa9546defb..6abb8c33bc 100644 --- a/akka-docs/scala/code/docs/camel/Introduction.scala +++ b/akka-docs/scala/code/docs/camel/Introduction.scala @@ -92,12 +92,12 @@ object Introduction { val camel = CamelExtension(system) val actorRef = system.actorOf(Props[MyEndpoint]) // get a future reference to the activation of the endpoint of the Consumer Actor - val activationFuture = camel.activationFutureFor(actorRef)(timeout = 10 seconds) + val activationFuture = camel.activationFutureFor(actorRef)(timeout = 10 seconds, executor = system.dispatcher) //#CamelActivation //#CamelDeactivation system.stop(actorRef) // get a future reference to the deactivation of the endpoint of the Consumer Actor - val deactivationFuture = camel.deactivationFutureFor(actorRef)(timeout = 10 seconds) + val deactivationFuture = camel.deactivationFutureFor(actorRef)(timeout = 10 seconds, executor = system.dispatcher) //#CamelDeactivation }