#2384 - Adding implicit EC for activation and deactivation future call

This commit is contained in:
Viktor Klang 2012-08-14 13:16:43 +02:00
parent 9094199011
commit c94d2c7274
10 changed files with 33 additions and 22 deletions

View file

@ -6,10 +6,10 @@ package akka.camel
import akka.camel.internal._ import akka.camel.internal._
import akka.util.Timeout import akka.util.Timeout
import scala.concurrent.Future
import akka.actor.{ ActorSystem, Props, ActorRef } import akka.actor.{ ActorSystem, Props, ActorRef }
import akka.pattern._ import akka.pattern._
import scala.concurrent.util.Duration import scala.concurrent.util.Duration
import concurrent.{ ExecutionContext, Future }
/** /**
* Activation trait that can be used to wait on activation or de-activation of Camel endpoints. * Activation trait that can be used to wait on activation or de-activation of Camel endpoints.
@ -27,11 +27,11 @@ trait Activation {
* @param endpoint the endpoint to be activated * @param endpoint the endpoint to be activated
* @param timeout the timeout for the Future * @param timeout the timeout for the Future
*/ */
def activationFutureFor(endpoint: ActorRef)(implicit timeout: Duration): Future[ActorRef] = def activationFutureFor(endpoint: ActorRef)(implicit timeout: Duration, executor: ExecutionContext): Future[ActorRef] =
(activationTracker.ask(AwaitActivation(endpoint))(Timeout(timeout))).map[ActorRef]({ (activationTracker.ask(AwaitActivation(endpoint))(Timeout(timeout))).map[ActorRef]({
case EndpointActivated(`endpoint`) endpoint case EndpointActivated(`endpoint`) endpoint
case EndpointFailedToActivate(_, cause) throw cause case EndpointFailedToActivate(`endpoint`, cause) throw cause
})(system.dispatcher) })
/** /**
* Produces a Future which will be completed when the given endpoint has been deactivated or * Produces a Future which will be completed when the given endpoint has been deactivated or
@ -40,9 +40,9 @@ trait Activation {
* @param endpoint the endpoint to be deactivated * @param endpoint the endpoint to be deactivated
* @param timeout the timeout of the Future * @param timeout the timeout of the Future
*/ */
def deactivationFutureFor(endpoint: ActorRef)(implicit timeout: Duration): Future[Unit] = def deactivationFutureFor(endpoint: ActorRef)(implicit timeout: Duration, executor: ExecutionContext): Future[ActorRef] =
(activationTracker.ask(AwaitDeActivation(endpoint))(Timeout(timeout))).map[Unit]({ (activationTracker.ask(AwaitDeActivation(endpoint))(Timeout(timeout))).map[ActorRef]({
case EndpointDeActivated(`endpoint`) () case EndpointDeActivated(`endpoint`) endpoint
case EndpointFailedToDeActivate(_, cause) throw cause case EndpointFailedToDeActivate(`endpoint`, cause) throw cause
})(system.dispatcher) })
} }

View file

@ -40,7 +40,7 @@ private[camel] trait ConsumerRegistry { this: Activation ⇒
*/ */
private[camel] def registerConsumer(endpointUri: String, consumer: Consumer, activationTimeout: Duration) = { private[camel] def registerConsumer(endpointUri: String, consumer: Consumer, activationTimeout: Duration) = {
idempotentRegistry ! RegisterConsumer(endpointUri, consumer.self, consumer) idempotentRegistry ! RegisterConsumer(endpointUri, consumer.self, consumer)
Await.result(activationFutureFor(consumer.self)(activationTimeout), activationTimeout) Await.result(activationFutureFor(consumer.self)(activationTimeout, consumer.context.dispatcher), activationTimeout)
} }
} }

View file

@ -8,6 +8,7 @@ import akka.actor.ActorRef;
import akka.actor.ActorSystem; import akka.actor.ActorSystem;
import akka.actor.Props; import akka.actor.Props;
import scala.concurrent.Await; import scala.concurrent.Await;
import scala.concurrent.ExecutionContext;
import scala.concurrent.util.Duration; import scala.concurrent.util.Duration;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.Test; import org.junit.Test;
@ -35,8 +36,9 @@ public class ConsumerJavaTestBase {
@Test @Test
public void shouldHandleExceptionThrownByActorAndGenerateCustomResponse() throws Exception { public void shouldHandleExceptionThrownByActorAndGenerateCustomResponse() throws Exception {
Duration timeout = Duration.create(1, TimeUnit.SECONDS); Duration timeout = Duration.create(1, TimeUnit.SECONDS);
ExecutionContext executionContext = system.dispatcher();
ActorRef ref = Await.result( ActorRef ref = Await.result(
camel.activationFutureFor(system.actorOf(new Props(SampleErrorHandlingConsumer.class)), timeout), camel.activationFutureFor(system.actorOf(new Props(SampleErrorHandlingConsumer.class)), timeout, executionContext),
timeout); timeout);
String result = camel.template().requestBody("direct:error-handler-test-java", "hello", String.class); String result = camel.template().requestBody("direct:error-handler-test-java", "hello", String.class);

View file

@ -5,6 +5,7 @@ import akka.camel.internal.component.CamelPath;
import akka.camel.javaapi.UntypedConsumerActor; import akka.camel.javaapi.UntypedConsumerActor;
import akka.camel.javaapi.UntypedProducerActor; import akka.camel.javaapi.UntypedProducerActor;
import scala.concurrent.Await; import scala.concurrent.Await;
import scala.concurrent.ExecutionContext;
import scala.concurrent.util.Duration; import scala.concurrent.util.Duration;
import org.apache.camel.CamelExecutionException; import org.apache.camel.CamelExecutionException;
import org.apache.camel.Exchange; import org.apache.camel.Exchange;
@ -59,8 +60,9 @@ public class CustomRouteTestBase {
public void testCustomConsumerRoute() throws Exception { public void testCustomConsumerRoute() throws Exception {
MockEndpoint mockEndpoint = camel.context().getEndpoint("mock:mockConsumer", MockEndpoint.class); MockEndpoint mockEndpoint = camel.context().getEndpoint("mock:mockConsumer", MockEndpoint.class);
Duration timeout = Duration.create(10, TimeUnit.SECONDS); Duration timeout = Duration.create(10, TimeUnit.SECONDS);
ExecutionContext executionContext = system.dispatcher();
ActorRef consumer = Await.result( ActorRef consumer = Await.result(
camel.activationFutureFor(system.actorOf(new Props(TestConsumer.class), "testConsumer"), timeout), camel.activationFutureFor(system.actorOf(new Props(TestConsumer.class), "testConsumer"), timeout, executionContext),
timeout); timeout);
camel.context().addRoutes(new CustomRouteBuilder("direct:testRouteConsumer",consumer)); camel.context().addRoutes(new CustomRouteBuilder("direct:testRouteConsumer",consumer));
camel.template().sendBody("direct:testRouteConsumer", "test"); camel.template().sendBody("direct:testRouteConsumer", "test");
@ -72,11 +74,12 @@ public class CustomRouteTestBase {
public void testCustomAckConsumerRoute() throws Exception { public void testCustomAckConsumerRoute() throws Exception {
MockEndpoint mockEndpoint = camel.context().getEndpoint("mock:mockAck", MockEndpoint.class); MockEndpoint mockEndpoint = camel.context().getEndpoint("mock:mockAck", MockEndpoint.class);
Duration timeout = Duration.create(10, TimeUnit.SECONDS); Duration timeout = Duration.create(10, TimeUnit.SECONDS);
ExecutionContext executionContext = system.dispatcher();
ActorRef consumer = Await.result( ActorRef consumer = Await.result(
camel.activationFutureFor( camel.activationFutureFor(
system.actorOf( system.actorOf(
new Props( new UntypedActorFactory(){ public Actor create() { return new TestAckConsumer("direct:testConsumerAck","mock:mockAck"); } }), "testConsumerAck"), new Props( new UntypedActorFactory(){ public Actor create() { return new TestAckConsumer("direct:testConsumerAck","mock:mockAck"); } }), "testConsumerAck"),
timeout), timeout, executionContext),
timeout); timeout);
camel.context().addRoutes(new CustomRouteBuilder("direct:testAck", consumer, false, timeout)); camel.context().addRoutes(new CustomRouteBuilder("direct:testAck", consumer, false, timeout));
camel.template().sendBody("direct:testAck", "test"); camel.template().sendBody("direct:testAck", "test");
@ -87,10 +90,11 @@ public class CustomRouteTestBase {
@Test @Test
public void testCustomAckConsumerRouteFromUri() throws Exception { public void testCustomAckConsumerRouteFromUri() throws Exception {
MockEndpoint mockEndpoint = camel.context().getEndpoint("mock:mockAckUri", MockEndpoint.class); MockEndpoint mockEndpoint = camel.context().getEndpoint("mock:mockAckUri", MockEndpoint.class);
ExecutionContext executionContext = system.dispatcher();
Duration timeout = Duration.create(10, TimeUnit.SECONDS); Duration timeout = Duration.create(10, TimeUnit.SECONDS);
ActorRef consumer = Await.result( ActorRef consumer = Await.result(
camel.activationFutureFor(system.actorOf(new Props( new UntypedActorFactory(){ public Actor create() { return new TestAckConsumer("direct:testConsumerAckFromUri","mock:mockAckUri"); } }), "testConsumerAckUri"), camel.activationFutureFor(system.actorOf(new Props( new UntypedActorFactory(){ public Actor create() { return new TestAckConsumer("direct:testConsumerAckFromUri","mock:mockAckUri"); } }), "testConsumerAckUri"),
timeout), timeout, executionContext),
timeout); timeout);
camel.context().addRoutes(new CustomRouteBuilder("direct:testAckFromUri","akka://test/user/testConsumerAckUri?autoAck=false")); camel.context().addRoutes(new CustomRouteBuilder("direct:testAckFromUri","akka://test/user/testConsumerAckUri?autoAck=false"));
camel.template().sendBody("direct:testAckFromUri", "test"); camel.template().sendBody("direct:testAckFromUri", "test");
@ -101,9 +105,10 @@ public class CustomRouteTestBase {
@Test(expected=CamelExecutionException.class) @Test(expected=CamelExecutionException.class)
public void testCustomTimeoutConsumerRoute() throws Exception { public void testCustomTimeoutConsumerRoute() throws Exception {
Duration timeout = Duration.create(10, TimeUnit.SECONDS); Duration timeout = Duration.create(10, TimeUnit.SECONDS);
ExecutionContext executionContext = system.dispatcher();
ActorRef consumer = Await.result( ActorRef consumer = Await.result(
camel.activationFutureFor(system.actorOf(new Props( new UntypedActorFactory(){ public Actor create() { return new TestAckConsumer("direct:testConsumerException","mock:mockException"); } }), "testConsumerException"), camel.activationFutureFor(system.actorOf(new Props( new UntypedActorFactory(){ public Actor create() { return new TestAckConsumer("direct:testConsumerException","mock:mockException"); } }), "testConsumerException"),
timeout), timeout, executionContext),
timeout); timeout);
camel.context().addRoutes(new CustomRouteBuilder("direct:testException", consumer, false, Duration.create(0, TimeUnit.SECONDS))); camel.context().addRoutes(new CustomRouteBuilder("direct:testException", consumer, false, Duration.create(0, TimeUnit.SECONDS)));
camel.template().sendBody("direct:testException", "test"); camel.template().sendBody("direct:testException", "test");

View file

@ -20,6 +20,7 @@ import java.util.concurrent.TimeoutException
class ActivationIntegrationTest extends WordSpec with MustMatchers with SharedCamelSystem { class ActivationIntegrationTest extends WordSpec with MustMatchers with SharedCamelSystem {
implicit val timeout = 10 seconds implicit val timeout = 10 seconds
def template: ProducerTemplate = camel.template def template: ProducerTemplate = camel.template
import system.dispatcher
"ActivationAware must be notified when endpoint is activated" in { "ActivationAware must be notified when endpoint is activated" in {
val latch = new TestLatch(0) val latch = new TestLatch(0)

View file

@ -17,12 +17,13 @@ import org.apache.camel.{ FailedToCreateRouteException, CamelExecutionException
import java.util.concurrent.{ ExecutionException, TimeUnit, TimeoutException } import java.util.concurrent.{ ExecutionException, TimeUnit, TimeoutException }
import akka.testkit.TestLatch import akka.testkit.TestLatch
import akka.actor.Status.Failure import akka.actor.Status.Failure
import scala.concurrent.Await
import scala.concurrent.util.duration._ import scala.concurrent.util.duration._
import concurrent.{ ExecutionContext, Await }
class ConsumerIntegrationTest extends WordSpec with MustMatchers with NonSharedCamelSystem { class ConsumerIntegrationTest extends WordSpec with MustMatchers with NonSharedCamelSystem {
"ConsumerIntegrationTest" must { "ConsumerIntegrationTest" must {
implicit val defaultTimeout = 10.seconds implicit val defaultTimeout = 10.seconds
implicit def ec: ExecutionContext = system.dispatcher
"Consumer must throw FailedToCreateRouteException, while awaiting activation, if endpoint is invalid" in { "Consumer must throw FailedToCreateRouteException, while awaiting activation, if endpoint is invalid" in {
val actorRef = system.actorOf(Props(new TestActor(uri = "some invalid uri"))) val actorRef = system.actorOf(Props(new TestActor(uri = "some invalid uri")))

View file

@ -16,6 +16,8 @@ import scala.concurrent.Await
class ProducerRegistryTest extends WordSpec with MustMatchers with SharedCamelSystem { class ProducerRegistryTest extends WordSpec with MustMatchers with SharedCamelSystem {
implicit val timeout = 5.seconds implicit val timeout = 5.seconds
implicit val ec = system.dispatcher
"A ProducerRegistry" must { "A ProducerRegistry" must {
def newEmptyActor: ActorRef = system.actorOf(Props.empty) def newEmptyActor: ActorRef = system.actorOf(Props.empty)
def registerProcessorFor(actorRef: ActorRef) = camel.registerProducer(actorRef, "mock:mock")._2 def registerProcessorFor(actorRef: ActorRef) = camel.registerProducer(actorRef, "mock:mock")._2

View file

@ -21,7 +21,7 @@ private[camel] object TestSupport {
def start(actor: Actor)(implicit system: ActorSystem): ActorRef = { def start(actor: Actor)(implicit system: ActorSystem): ActorRef = {
val actorRef = system.actorOf(Props(actor)) val actorRef = system.actorOf(Props(actor))
Await.result(CamelExtension(system).activationFutureFor(actorRef)(10 seconds), 10 seconds) Await.result(CamelExtension(system).activationFutureFor(actorRef)(10 seconds, system.dispatcher), 10 seconds)
actorRef actorRef
} }

View file

@ -29,13 +29,13 @@ public class ActivationTestBase {
Camel camel = CamelExtension.get(system); Camel camel = CamelExtension.get(system);
// get a future reference to the activation of the endpoint of the Consumer Actor // get a future reference to the activation of the endpoint of the Consumer Actor
FiniteDuration duration = Duration.create(10, SECONDS); FiniteDuration duration = Duration.create(10, SECONDS);
Future<ActorRef> activationFuture = camel.activationFutureFor(producer, duration); Future<ActorRef> activationFuture = camel.activationFutureFor(producer, duration, system.dispatcher());
//#CamelActivation //#CamelActivation
//#CamelDeactivation //#CamelDeactivation
// .. // ..
system.stop(producer); system.stop(producer);
// get a future reference to the deactivation of the endpoint of the Consumer Actor // get a future reference to the deactivation of the endpoint of the Consumer Actor
Future<ActorRef> deactivationFuture = camel.activationFutureFor(producer, duration); Future<ActorRef> deactivationFuture = camel.deactivationFutureFor(producer, duration, system.dispatcher());
//#CamelDeactivation //#CamelDeactivation
system.shutdown(); system.shutdown();
} }

View file

@ -92,12 +92,12 @@ object Introduction {
val camel = CamelExtension(system) val camel = CamelExtension(system)
val actorRef = system.actorOf(Props[MyEndpoint]) val actorRef = system.actorOf(Props[MyEndpoint])
// get a future reference to the activation of the endpoint of the Consumer Actor // get a future reference to the activation of the endpoint of the Consumer Actor
val activationFuture = camel.activationFutureFor(actorRef)(timeout = 10 seconds) val activationFuture = camel.activationFutureFor(actorRef)(timeout = 10 seconds, executor = system.dispatcher)
//#CamelActivation //#CamelActivation
//#CamelDeactivation //#CamelDeactivation
system.stop(actorRef) system.stop(actorRef)
// get a future reference to the deactivation of the endpoint of the Consumer Actor // get a future reference to the deactivation of the endpoint of the Consumer Actor
val deactivationFuture = camel.deactivationFutureFor(actorRef)(timeout = 10 seconds) val deactivationFuture = camel.deactivationFutureFor(actorRef)(timeout = 10 seconds, executor = system.dispatcher)
//#CamelDeactivation //#CamelDeactivation
} }