Removing awaitActivation and awaitDeactivation and just retain the future-returning methods as to promote not to block.
This commit is contained in:
parent
52e4a18ebe
commit
7ba74434de
10 changed files with 179 additions and 215 deletions
|
|
@ -7,7 +7,6 @@ package akka.camel
|
|||
import akka.camel.internal._
|
||||
import akka.util.Timeout
|
||||
import scala.concurrent.Future
|
||||
import java.util.concurrent.TimeoutException
|
||||
import akka.actor.{ ActorSystem, Props, ActorRef }
|
||||
import akka.pattern._
|
||||
import scala.concurrent.util.Duration
|
||||
|
|
@ -17,72 +16,33 @@ import scala.concurrent.util.Duration
|
|||
* The Camel endpoints are activated asynchronously. This trait can signal when an endpoint is activated or de-activated.
|
||||
*/
|
||||
trait Activation {
|
||||
import scala.concurrent.Await
|
||||
|
||||
def system: ActorSystem //FIXME Why is this here, what's it needed for and who should use it?
|
||||
|
||||
private val activationTracker = system.actorOf(Props[ActivationTracker], "camelActivationTracker") //FIXME Why is this also top level?
|
||||
|
||||
/**
|
||||
* Awaits for endpoint to be activated. It blocks until the endpoint is registered in camel context or timeout expires.
|
||||
* @param endpoint the endpoint to wait for to be activated
|
||||
* @param timeout the timeout for the wait
|
||||
* @throws akka.camel.ActivationTimeoutException if endpoint is not activated within timeout.
|
||||
* @return the activated ActorRef
|
||||
*/
|
||||
def awaitActivation(endpoint: ActorRef, timeout: Duration): ActorRef =
|
||||
try Await.result(activationFutureFor(endpoint, timeout), timeout) catch {
|
||||
case e: TimeoutException ⇒ throw new ActivationTimeoutException(endpoint, timeout)
|
||||
}
|
||||
|
||||
/**
|
||||
* Awaits for endpoint to be de-activated. It is blocking until endpoint is unregistered in camel context or timeout expires.
|
||||
* @param endpoint the endpoint to wait for to be de-activated
|
||||
* @param timeout the timeout for the wait
|
||||
* @throws akka.camel.DeActivationTimeoutException if endpoint is not de-activated within timeout.
|
||||
*/
|
||||
def awaitDeactivation(endpoint: ActorRef, timeout: Duration): Unit =
|
||||
try Await.result(deactivationFutureFor(endpoint, timeout), timeout) catch {
|
||||
case e: TimeoutException ⇒ throw new DeActivationTimeoutException(endpoint, timeout)
|
||||
}
|
||||
|
||||
/**
|
||||
* Similar to `awaitActivation` but returns a future instead.
|
||||
* Produces a Future with the specified endpoint that will be completed when the endpoint has been activated,
|
||||
* or if it times out, which will happen after the specified Timeout.
|
||||
*
|
||||
* @param endpoint the endpoint to be activated
|
||||
* @param timeout the timeout for the Future
|
||||
*/
|
||||
def activationFutureFor(endpoint: ActorRef, timeout: Duration): Future[ActorRef] =
|
||||
def activationFutureFor(endpoint: ActorRef)(implicit timeout: Duration): Future[ActorRef] =
|
||||
(activationTracker.ask(AwaitActivation(endpoint))(Timeout(timeout))).map[ActorRef]({
|
||||
case EndpointActivated(_) ⇒ endpoint
|
||||
case EndpointActivated(`endpoint`) ⇒ endpoint
|
||||
case EndpointFailedToActivate(_, cause) ⇒ throw cause
|
||||
})(system.dispatcher)
|
||||
|
||||
/**
|
||||
* Similar to awaitDeactivation but returns a future instead.
|
||||
* Produces a Future which will be completed when the given endpoint has been deactivated or
|
||||
* or if it times out, which will happen after the specified Timeout.
|
||||
*
|
||||
* @param endpoint the endpoint to be deactivated
|
||||
* @param timeout the timeout of the Future
|
||||
*/
|
||||
def deactivationFutureFor(endpoint: ActorRef, timeout: Duration): Future[Unit] =
|
||||
def deactivationFutureFor(endpoint: ActorRef)(implicit timeout: Duration): Future[Unit] =
|
||||
(activationTracker.ask(AwaitDeActivation(endpoint))(Timeout(timeout))).map[Unit]({
|
||||
case EndpointDeActivated(_) ⇒ ()
|
||||
case EndpointDeActivated(`endpoint`) ⇒ ()
|
||||
case EndpointFailedToDeActivate(_, cause) ⇒ throw cause
|
||||
})(system.dispatcher)
|
||||
}
|
||||
|
||||
/**
|
||||
* An exception for when a timeout has occurred during deactivation of an endpoint.
|
||||
* @param endpoint the endpoint that could not be de-activated in time
|
||||
* @param timeout the timeout
|
||||
*/
|
||||
class DeActivationTimeoutException(endpoint: ActorRef, timeout: Duration) extends TimeoutException {
|
||||
override def getMessage: String = "Timed out after %s, while waiting for de-activation of %s" format (timeout, endpoint.path)
|
||||
}
|
||||
|
||||
/**
|
||||
* An exception for when a timeout has occurred during the activation of an endpoint.
|
||||
* @param endpoint the endpoint that could not be activated in time
|
||||
* @param timeout the timeout
|
||||
*/
|
||||
class ActivationTimeoutException(endpoint: ActorRef, timeout: Duration) extends TimeoutException {
|
||||
override def getMessage: String = "Timed out after %s, while waiting for activation of %s" format (timeout, endpoint.path)
|
||||
}
|
||||
|
|
@ -15,6 +15,8 @@ import collection.mutable
|
|||
import org.apache.camel.model.RouteDefinition
|
||||
import org.apache.camel.CamelContext
|
||||
import scala.concurrent.util.Duration
|
||||
import concurrent.Await
|
||||
import akka.util.Timeout
|
||||
|
||||
/**
|
||||
* For internal use only.
|
||||
|
|
@ -30,7 +32,7 @@ private[camel] trait ConsumerRegistry { this: Activation ⇒
|
|||
*/
|
||||
private[this] lazy val idempotentRegistry = system.actorOf(Props(new IdempotentCamelConsumerRegistry(context)))
|
||||
/**
|
||||
* For internal use only.
|
||||
* For internal use only. BLOCKING
|
||||
* @param endpointUri the URI to register the consumer on
|
||||
* @param consumer the consumer
|
||||
* @param activationTimeout the timeout for activation
|
||||
|
|
@ -38,7 +40,7 @@ private[camel] trait ConsumerRegistry { this: Activation ⇒
|
|||
*/
|
||||
private[camel] def registerConsumer(endpointUri: String, consumer: Consumer, activationTimeout: Duration) = {
|
||||
idempotentRegistry ! RegisterConsumer(endpointUri, consumer.self, consumer)
|
||||
awaitActivation(consumer.self, activationTimeout)
|
||||
Await.result(activationFutureFor(consumer.self)(activationTimeout), activationTimeout)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -79,6 +81,7 @@ private[camel] class IdempotentCamelConsumerRegistry(camelContext: CamelContext)
|
|||
|
||||
def isAlreadyActivated(ref: ActorRef): Boolean = activated.contains(ref)
|
||||
|
||||
//FIXME Break out
|
||||
class CamelConsumerRegistrator extends Actor with ActorLogging {
|
||||
|
||||
def receive = {
|
||||
|
|
|
|||
|
|
@ -7,10 +7,12 @@ package akka.camel;
|
|||
import akka.actor.ActorRef;
|
||||
import akka.actor.ActorSystem;
|
||||
import akka.actor.Props;
|
||||
import scala.concurrent.util.FiniteDuration;
|
||||
import scala.concurrent.Await;
|
||||
import scala.concurrent.util.Duration;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
|
@ -22,7 +24,7 @@ import static org.junit.Assert.assertEquals;
|
|||
public class ConsumerJavaTestBase {
|
||||
|
||||
static ActorSystem system = ActorSystem.create("test");
|
||||
static Camel camel = (Camel) CamelExtension.get(system);
|
||||
static Camel camel = CamelExtension.get(system);
|
||||
|
||||
|
||||
@AfterClass
|
||||
|
|
@ -31,9 +33,11 @@ public class ConsumerJavaTestBase {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void shouldHandleExceptionThrownByActorAndGenerateCustomResponse() {
|
||||
ActorRef ref = system.actorOf(new Props().withCreator(SampleErrorHandlingConsumer.class));
|
||||
camel.awaitActivation(ref, new FiniteDuration(1, TimeUnit.SECONDS));
|
||||
public void shouldHandleExceptionThrownByActorAndGenerateCustomResponse() throws Exception {
|
||||
Duration timeout = Duration.create(1, TimeUnit.SECONDS);
|
||||
ActorRef ref = Await.result(
|
||||
camel.activationFutureFor(system.actorOf(new Props(SampleErrorHandlingConsumer.class)), timeout),
|
||||
timeout);
|
||||
|
||||
String result = camel.template().requestBody("direct:error-handler-test-java", "hello", String.class);
|
||||
assertEquals("error: hello", result);
|
||||
|
|
|
|||
|
|
@ -4,7 +4,8 @@ import akka.actor.*;
|
|||
import akka.camel.internal.component.CamelPath;
|
||||
import akka.camel.javaapi.UntypedConsumerActor;
|
||||
import akka.camel.javaapi.UntypedProducerActor;
|
||||
import scala.concurrent.util.FiniteDuration;
|
||||
import scala.concurrent.Await;
|
||||
import scala.concurrent.util.Duration;
|
||||
import org.apache.camel.CamelExecutionException;
|
||||
import org.apache.camel.Exchange;
|
||||
import org.apache.camel.Predicate;
|
||||
|
|
@ -13,7 +14,6 @@ import org.apache.camel.component.mock.MockEndpoint;
|
|||
import org.junit.Before;
|
||||
import org.junit.After;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
public class CustomRouteTestBase {
|
||||
|
|
@ -58,8 +58,10 @@ public class CustomRouteTestBase {
|
|||
@Test
|
||||
public void testCustomConsumerRoute() throws Exception {
|
||||
MockEndpoint mockEndpoint = camel.context().getEndpoint("mock:mockConsumer", MockEndpoint.class);
|
||||
ActorRef consumer = system.actorOf(new Props(TestConsumer.class), "testConsumer");
|
||||
camel.awaitActivation(consumer,new FiniteDuration(10, TimeUnit.SECONDS));
|
||||
Duration timeout = Duration.create(10, TimeUnit.SECONDS);
|
||||
ActorRef consumer = Await.result(
|
||||
camel.activationFutureFor(system.actorOf(new Props(TestConsumer.class), "testConsumer"), timeout),
|
||||
timeout);
|
||||
camel.context().addRoutes(new CustomRouteBuilder("direct:testRouteConsumer",consumer));
|
||||
camel.template().sendBody("direct:testRouteConsumer", "test");
|
||||
assertMockEndpoint(mockEndpoint);
|
||||
|
|
@ -69,13 +71,14 @@ public class CustomRouteTestBase {
|
|||
@Test
|
||||
public void testCustomAckConsumerRoute() throws Exception {
|
||||
MockEndpoint mockEndpoint = camel.context().getEndpoint("mock:mockAck", MockEndpoint.class);
|
||||
ActorRef consumer = system.actorOf(new Props( new UntypedActorFactory(){
|
||||
public Actor create() {
|
||||
return new TestAckConsumer("direct:testConsumerAck","mock:mockAck");
|
||||
}
|
||||
}), "testConsumerAck");
|
||||
camel.awaitActivation(consumer,new FiniteDuration(10, TimeUnit.SECONDS));
|
||||
camel.context().addRoutes(new CustomRouteBuilder("direct:testAck", consumer, false, new FiniteDuration(10, TimeUnit.SECONDS)));
|
||||
Duration timeout = Duration.create(10, TimeUnit.SECONDS);
|
||||
ActorRef consumer = Await.result(
|
||||
camel.activationFutureFor(
|
||||
system.actorOf(
|
||||
new Props( new UntypedActorFactory(){ public Actor create() { return new TestAckConsumer("direct:testConsumerAck","mock:mockAck"); } }), "testConsumerAck"),
|
||||
timeout),
|
||||
timeout);
|
||||
camel.context().addRoutes(new CustomRouteBuilder("direct:testAck", consumer, false, timeout));
|
||||
camel.template().sendBody("direct:testAck", "test");
|
||||
assertMockEndpoint(mockEndpoint);
|
||||
system.stop(consumer);
|
||||
|
|
@ -84,12 +87,11 @@ public class CustomRouteTestBase {
|
|||
@Test
|
||||
public void testCustomAckConsumerRouteFromUri() throws Exception {
|
||||
MockEndpoint mockEndpoint = camel.context().getEndpoint("mock:mockAckUri", MockEndpoint.class);
|
||||
ActorRef consumer = system.actorOf(new Props( new UntypedActorFactory(){
|
||||
public Actor create() {
|
||||
return new TestAckConsumer("direct:testConsumerAckFromUri","mock:mockAckUri");
|
||||
}
|
||||
}), "testConsumerAckUri");
|
||||
camel.awaitActivation(consumer,new FiniteDuration(10, TimeUnit.SECONDS));
|
||||
Duration timeout = Duration.create(10, TimeUnit.SECONDS);
|
||||
ActorRef consumer = Await.result(
|
||||
camel.activationFutureFor(system.actorOf(new Props( new UntypedActorFactory(){ public Actor create() { return new TestAckConsumer("direct:testConsumerAckFromUri","mock:mockAckUri"); } }), "testConsumerAckUri"),
|
||||
timeout),
|
||||
timeout);
|
||||
camel.context().addRoutes(new CustomRouteBuilder("direct:testAckFromUri","akka://test/user/testConsumerAckUri?autoAck=false"));
|
||||
camel.template().sendBody("direct:testAckFromUri", "test");
|
||||
assertMockEndpoint(mockEndpoint);
|
||||
|
|
@ -98,13 +100,12 @@ public class CustomRouteTestBase {
|
|||
|
||||
@Test(expected=CamelExecutionException.class)
|
||||
public void testCustomTimeoutConsumerRoute() throws Exception {
|
||||
ActorRef consumer = system.actorOf(new Props( new UntypedActorFactory(){
|
||||
public Actor create() {
|
||||
return new TestAckConsumer("direct:testConsumerException","mock:mockException");
|
||||
}
|
||||
}), "testConsumerException");
|
||||
camel.awaitActivation(consumer, new FiniteDuration(10, TimeUnit.SECONDS));
|
||||
camel.context().addRoutes(new CustomRouteBuilder("direct:testException", consumer, false, new FiniteDuration(0, TimeUnit.SECONDS)));
|
||||
Duration timeout = Duration.create(10, TimeUnit.SECONDS);
|
||||
ActorRef consumer = Await.result(
|
||||
camel.activationFutureFor(system.actorOf(new Props( new UntypedActorFactory(){ public Actor create() { return new TestAckConsumer("direct:testConsumerException","mock:mockException"); } }), "testConsumerException"),
|
||||
timeout),
|
||||
timeout);
|
||||
camel.context().addRoutes(new CustomRouteBuilder("direct:testException", consumer, false, Duration.create(0, TimeUnit.SECONDS)));
|
||||
camel.template().sendBody("direct:testException", "test");
|
||||
}
|
||||
|
||||
|
|
@ -132,7 +133,7 @@ public class CustomRouteTestBase {
|
|||
uri = CamelPath.toUri(actor);
|
||||
}
|
||||
|
||||
public CustomRouteBuilder(String from, ActorRef actor, boolean autoAck, FiniteDuration replyTimeout) {
|
||||
public CustomRouteBuilder(String from, ActorRef actor, boolean autoAck, Duration replyTimeout) {
|
||||
fromUri = from;
|
||||
uri = CamelPath.toUri(actor, autoAck, replyTimeout);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -15,15 +15,16 @@ import TestSupport._
|
|||
import org.scalatest.WordSpec
|
||||
import akka.testkit.TestLatch
|
||||
import scala.concurrent.Await
|
||||
import java.util.concurrent.TimeoutException
|
||||
|
||||
class ActivationIntegrationTest extends WordSpec with MustMatchers with SharedCamelSystem {
|
||||
implicit val timeout = Timeout(10 seconds)
|
||||
implicit val timeout = 10 seconds
|
||||
def template: ProducerTemplate = camel.template
|
||||
|
||||
"ActivationAware must be notified when endpoint is activated" in {
|
||||
val latch = new TestLatch(0)
|
||||
val actor = system.actorOf(Props(new TestConsumer("direct:actor-1", latch)))
|
||||
camel.awaitActivation(actor, 10 second) must be === actor
|
||||
Await.result(camel.activationFutureFor(actor), 10 seconds) must be === actor
|
||||
|
||||
template.requestBody("direct:actor-1", "test") must be("received test")
|
||||
}
|
||||
|
|
@ -39,27 +40,25 @@ class ActivationIntegrationTest extends WordSpec with MustMatchers with SharedCa
|
|||
latch.countDown()
|
||||
}
|
||||
})
|
||||
camel.awaitActivation(actor, 10 second)
|
||||
Await.result(camel.activationFutureFor(actor), timeout)
|
||||
|
||||
system.stop(actor)
|
||||
camel.awaitDeactivation(actor, 10 second)
|
||||
Await.result(camel.deactivationFutureFor(actor), timeout)
|
||||
Await.ready(latch, 10 second)
|
||||
}
|
||||
|
||||
"ActivationAware must time out when waiting for endpoint de-activation for too long" in {
|
||||
val latch = new TestLatch(0)
|
||||
val actor = start(new TestConsumer("direct:a5", latch))
|
||||
camel.awaitActivation(actor, 10 second)
|
||||
intercept[DeActivationTimeoutException] {
|
||||
camel.awaitDeactivation(actor, 1 millis)
|
||||
}
|
||||
Await.result(camel.activationFutureFor(actor), timeout)
|
||||
intercept[TimeoutException] { Await.result(camel.deactivationFutureFor(actor), 1 millis) }
|
||||
}
|
||||
|
||||
"awaitActivation must fail if notification timeout is too short and activation is not complete yet" in {
|
||||
"activationFutureFor must fail if notification timeout is too short and activation is not complete yet" in {
|
||||
val latch = new TestLatch(1)
|
||||
try {
|
||||
val actor = system.actorOf(Props(new TestConsumer("direct:actor-4", latch)))
|
||||
intercept[ActivationTimeoutException] { camel.awaitActivation(actor, 1 millis) }
|
||||
intercept[TimeoutException] { Await.result(camel.activationFutureFor(actor), 1 millis) }
|
||||
} finally latch.countDown()
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -9,147 +9,147 @@ import language.existentials
|
|||
|
||||
import akka.actor._
|
||||
import org.scalatest.matchers.MustMatchers
|
||||
import scala.concurrent.util.duration._
|
||||
import TestSupport._
|
||||
import org.scalatest.WordSpec
|
||||
import akka.camel.TestSupport._
|
||||
import org.apache.camel.model.RouteDefinition
|
||||
import org.apache.camel.builder.Builder
|
||||
import org.apache.camel.{ FailedToCreateRouteException, CamelExecutionException }
|
||||
import java.util.concurrent.{ ExecutionException, TimeUnit, TimeoutException }
|
||||
import akka.testkit.TestLatch
|
||||
import scala.concurrent.Await
|
||||
import akka.actor.Status.Failure
|
||||
import scala.concurrent.Await
|
||||
import scala.concurrent.util.duration._
|
||||
|
||||
class ConsumerIntegrationTest extends WordSpec with MustMatchers with NonSharedCamelSystem {
|
||||
private val defaultTimeout = 10
|
||||
"Consumer must throw FailedToCreateRouteException, while awaiting activation, if endpoint is invalid" in {
|
||||
val actorRef = system.actorOf(Props(new TestActor(uri = "some invalid uri")))
|
||||
"ConsumerIntegrationTest" must {
|
||||
implicit val defaultTimeout = 10.seconds
|
||||
|
||||
intercept[FailedToCreateRouteException] {
|
||||
camel.awaitActivation(actorRef, timeout = defaultTimeout seconds)
|
||||
"Consumer must throw FailedToCreateRouteException, while awaiting activation, if endpoint is invalid" in {
|
||||
val actorRef = system.actorOf(Props(new TestActor(uri = "some invalid uri")))
|
||||
intercept[FailedToCreateRouteException] { Await.result(camel.activationFutureFor(actorRef), defaultTimeout) }
|
||||
}
|
||||
}
|
||||
|
||||
"Consumer must support in-out messaging" in {
|
||||
start(new Consumer {
|
||||
def endpointUri = "direct:a1"
|
||||
def receive = {
|
||||
case m: CamelMessage ⇒ sender ! "received " + m.bodyAs[String]
|
||||
}
|
||||
})
|
||||
camel.sendTo("direct:a1", msg = "some message") must be("received some message")
|
||||
}
|
||||
|
||||
"Consumer must time-out if consumer is slow" in {
|
||||
val SHORT_TIMEOUT = 10 millis
|
||||
val LONG_WAIT = 200 millis
|
||||
|
||||
start(new Consumer {
|
||||
override def replyTimeout = SHORT_TIMEOUT
|
||||
|
||||
def endpointUri = "direct:a3"
|
||||
def receive = { case _ ⇒ { Thread.sleep(LONG_WAIT.toMillis); sender ! "done" } }
|
||||
})
|
||||
|
||||
val exception = intercept[CamelExecutionException] {
|
||||
camel.sendTo("direct:a3", msg = "some msg 3")
|
||||
"Consumer must support in-out messaging" in {
|
||||
start(new Consumer {
|
||||
def endpointUri = "direct:a1"
|
||||
def receive = {
|
||||
case m: CamelMessage ⇒ sender ! "received " + m.bodyAs[String]
|
||||
}
|
||||
})
|
||||
camel.sendTo("direct:a1", msg = "some message") must be("received some message")
|
||||
}
|
||||
exception.getCause.getClass must be(classOf[TimeoutException])
|
||||
}
|
||||
|
||||
"Consumer must process messages even after actor restart" in {
|
||||
val restarted = TestLatch()
|
||||
val consumer = start(new Consumer {
|
||||
def endpointUri = "direct:a2"
|
||||
"Consumer must time-out if consumer is slow" in {
|
||||
val SHORT_TIMEOUT = 10 millis
|
||||
val LONG_WAIT = 200 millis
|
||||
|
||||
def receive = {
|
||||
case "throw" ⇒ throw new Exception
|
||||
case m: CamelMessage ⇒ sender ! "received " + m.bodyAs[String]
|
||||
start(new Consumer {
|
||||
override def replyTimeout = SHORT_TIMEOUT
|
||||
|
||||
def endpointUri = "direct:a3"
|
||||
def receive = { case _ ⇒ { Thread.sleep(LONG_WAIT.toMillis); sender ! "done" } }
|
||||
})
|
||||
|
||||
val exception = intercept[CamelExecutionException] {
|
||||
camel.sendTo("direct:a3", msg = "some msg 3")
|
||||
}
|
||||
exception.getCause.getClass must be(classOf[TimeoutException])
|
||||
}
|
||||
|
||||
override def postRestart(reason: Throwable) {
|
||||
restarted.countDown()
|
||||
}
|
||||
})
|
||||
consumer ! "throw"
|
||||
Await.ready(restarted, defaultTimeout seconds)
|
||||
"Consumer must process messages even after actor restart" in {
|
||||
val restarted = TestLatch()
|
||||
val consumer = start(new Consumer {
|
||||
def endpointUri = "direct:a2"
|
||||
|
||||
val response = camel.sendTo("direct:a2", msg = "xyz")
|
||||
response must be("received xyz")
|
||||
}
|
||||
def receive = {
|
||||
case "throw" ⇒ throw new Exception
|
||||
case m: CamelMessage ⇒ sender ! "received " + m.bodyAs[String]
|
||||
}
|
||||
|
||||
"Consumer must unregister itself when stopped" in {
|
||||
val consumer = start(new TestActor())
|
||||
camel.awaitActivation(consumer, defaultTimeout seconds)
|
||||
override def postRestart(reason: Throwable) {
|
||||
restarted.countDown()
|
||||
}
|
||||
})
|
||||
consumer ! "throw"
|
||||
Await.ready(restarted, defaultTimeout)
|
||||
|
||||
camel.routeCount must be > (0)
|
||||
val response = camel.sendTo("direct:a2", msg = "xyz")
|
||||
response must be("received xyz")
|
||||
}
|
||||
|
||||
system.stop(consumer)
|
||||
camel.awaitDeactivation(consumer, defaultTimeout seconds)
|
||||
"Consumer must unregister itself when stopped" in {
|
||||
val consumer = start(new TestActor())
|
||||
Await.result(camel.activationFutureFor(consumer), defaultTimeout)
|
||||
|
||||
camel.routeCount must be(0)
|
||||
}
|
||||
camel.routeCount must be > (0)
|
||||
|
||||
"Consumer must register on uri passed in through constructor" in {
|
||||
val consumer = start(new TestActor("direct://test"))
|
||||
camel.awaitActivation(consumer, defaultTimeout seconds)
|
||||
system.stop(consumer)
|
||||
Await.result(camel.deactivationFutureFor(consumer), defaultTimeout)
|
||||
|
||||
camel.routeCount must be > (0)
|
||||
camel.routes.get(0).getEndpoint.getEndpointUri must be("direct://test")
|
||||
system.stop(consumer)
|
||||
camel.awaitDeactivation(consumer, defaultTimeout seconds)
|
||||
camel.routeCount must be(0)
|
||||
}
|
||||
|
||||
camel.routeCount must be(0)
|
||||
}
|
||||
"Consumer must register on uri passed in through constructor" in {
|
||||
val consumer = start(new TestActor("direct://test"))
|
||||
Await.result(camel.activationFutureFor(consumer), defaultTimeout)
|
||||
|
||||
"Error passing consumer supports error handling through route modification" in {
|
||||
start(new ErrorThrowingConsumer("direct:error-handler-test") with ErrorPassing {
|
||||
override def onRouteDefinition(rd: RouteDefinition) = {
|
||||
rd.onException(classOf[Exception]).handled(true).transform(Builder.exceptionMessage).end
|
||||
}
|
||||
})
|
||||
camel.sendTo("direct:error-handler-test", msg = "hello") must be("error: hello")
|
||||
}
|
||||
camel.routeCount must be > (0)
|
||||
camel.routes.get(0).getEndpoint.getEndpointUri must be("direct://test")
|
||||
system.stop(consumer)
|
||||
Await.result(camel.deactivationFutureFor(consumer), defaultTimeout)
|
||||
|
||||
"Error passing consumer supports redelivery through route modification" in {
|
||||
start(new FailingOnceConsumer("direct:failing-once-concumer") with ErrorPassing {
|
||||
override def onRouteDefinition(rd: RouteDefinition) = {
|
||||
rd.onException(classOf[Exception]).maximumRedeliveries(1).end
|
||||
}
|
||||
})
|
||||
camel.sendTo("direct:failing-once-concumer", msg = "hello") must be("accepted: hello")
|
||||
}
|
||||
camel.routeCount must be(0)
|
||||
}
|
||||
|
||||
"Consumer supports manual Ack" in {
|
||||
start(new ManualAckConsumer() {
|
||||
def endpointUri = "direct:manual-ack"
|
||||
def receive = { case _ ⇒ sender ! Ack }
|
||||
})
|
||||
camel.template.asyncSendBody("direct:manual-ack", "some message").get(defaultTimeout, TimeUnit.SECONDS) must be(null) //should not timeout
|
||||
}
|
||||
"Error passing consumer supports error handling through route modification" in {
|
||||
start(new ErrorThrowingConsumer("direct:error-handler-test") with ErrorPassing {
|
||||
override def onRouteDefinition(rd: RouteDefinition) = {
|
||||
rd.onException(classOf[Exception]).handled(true).transform(Builder.exceptionMessage).end
|
||||
}
|
||||
})
|
||||
camel.sendTo("direct:error-handler-test", msg = "hello") must be("error: hello")
|
||||
}
|
||||
|
||||
"Consumer handles manual Ack failure" in {
|
||||
val someException = new Exception("e1")
|
||||
start(new ManualAckConsumer() {
|
||||
def endpointUri = "direct:manual-ack"
|
||||
def receive = { case _ ⇒ sender ! Failure(someException) }
|
||||
})
|
||||
"Error passing consumer supports redelivery through route modification" in {
|
||||
start(new FailingOnceConsumer("direct:failing-once-concumer") with ErrorPassing {
|
||||
override def onRouteDefinition(rd: RouteDefinition) = {
|
||||
rd.onException(classOf[Exception]).maximumRedeliveries(1).end
|
||||
}
|
||||
})
|
||||
camel.sendTo("direct:failing-once-concumer", msg = "hello") must be("accepted: hello")
|
||||
}
|
||||
|
||||
intercept[ExecutionException] {
|
||||
camel.template.asyncSendBody("direct:manual-ack", "some message").get(defaultTimeout, TimeUnit.SECONDS)
|
||||
}.getCause.getCause must be(someException)
|
||||
}
|
||||
"Consumer supports manual Ack" in {
|
||||
start(new ManualAckConsumer() {
|
||||
def endpointUri = "direct:manual-ack"
|
||||
def receive = { case _ ⇒ sender ! Ack }
|
||||
})
|
||||
camel.template.asyncSendBody("direct:manual-ack", "some message").get(defaultTimeout.toSeconds, TimeUnit.SECONDS) must be(null) //should not timeout
|
||||
}
|
||||
|
||||
"Consumer should time-out, if manual Ack not received within replyTimeout and should give a human readable error message" in {
|
||||
start(new ManualAckConsumer() {
|
||||
override def replyTimeout = 10 millis
|
||||
def endpointUri = "direct:manual-ack"
|
||||
def receive = { case _ ⇒ }
|
||||
})
|
||||
"Consumer handles manual Ack failure" in {
|
||||
val someException = new Exception("e1")
|
||||
start(new ManualAckConsumer() {
|
||||
def endpointUri = "direct:manual-ack"
|
||||
def receive = { case _ ⇒ sender ! Failure(someException) }
|
||||
})
|
||||
|
||||
intercept[ExecutionException] {
|
||||
camel.template.asyncSendBody("direct:manual-ack", "some message").get(defaultTimeout, TimeUnit.SECONDS)
|
||||
}.getCause.getCause.getMessage must include("Failed to get Ack")
|
||||
intercept[ExecutionException] {
|
||||
camel.template.asyncSendBody("direct:manual-ack", "some message").get(defaultTimeout.toSeconds, TimeUnit.SECONDS)
|
||||
}.getCause.getCause must be(someException)
|
||||
}
|
||||
|
||||
"Consumer should time-out, if manual Ack not received within replyTimeout and should give a human readable error message" in {
|
||||
start(new ManualAckConsumer() {
|
||||
override def replyTimeout = 10 millis
|
||||
def endpointUri = "direct:manual-ack"
|
||||
def receive = { case _ ⇒ }
|
||||
})
|
||||
|
||||
intercept[ExecutionException] {
|
||||
camel.template.asyncSendBody("direct:manual-ack", "some message").get(defaultTimeout.toSeconds, TimeUnit.SECONDS)
|
||||
}.getCause.getCause.getMessage must include("Failed to get Ack")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -11,9 +11,11 @@ import org.scalatest.WordSpec
|
|||
import akka.camel.TestSupport.SharedCamelSystem
|
||||
import scala.concurrent.util.duration._
|
||||
import akka.actor.{ ActorRef, Props }
|
||||
import akka.util.Timeout
|
||||
import scala.concurrent.Await
|
||||
|
||||
class ProducerRegistryTest extends WordSpec with MustMatchers with SharedCamelSystem {
|
||||
|
||||
implicit val timeout = 5.seconds
|
||||
"A ProducerRegistry" must {
|
||||
def newEmptyActor: ActorRef = system.actorOf(Props.empty)
|
||||
def registerProcessorFor(actorRef: ActorRef) = camel.registerProducer(actorRef, "mock:mock")._2
|
||||
|
|
@ -21,10 +23,10 @@ class ProducerRegistryTest extends WordSpec with MustMatchers with SharedCamelSy
|
|||
"register a started SendProcessor for the producer, which is stopped when the actor is stopped" in {
|
||||
val actorRef = newEmptyActor
|
||||
val processor = registerProcessorFor(actorRef)
|
||||
camel.awaitActivation(actorRef, 5 second)
|
||||
Await.result(camel.activationFutureFor(actorRef), timeout)
|
||||
processor.isStarted must be(true)
|
||||
system.stop(actorRef)
|
||||
camel.awaitDeactivation(actorRef, 5 second)
|
||||
Await.result(camel.deactivationFutureFor(actorRef), timeout)
|
||||
(processor.isStopping || processor.isStopped) must be(true)
|
||||
}
|
||||
"remove and stop the SendProcessor if the actorRef is registered" in {
|
||||
|
|
|
|||
|
|
@ -14,12 +14,14 @@ import org.scalatest.matchers.{ BePropertyMatcher, BePropertyMatchResult }
|
|||
import scala.concurrent.util.{ FiniteDuration, Duration }
|
||||
import scala.reflect.ClassTag
|
||||
import akka.actor.{ ActorRef, Props, ActorSystem, Actor }
|
||||
import concurrent.Await
|
||||
import akka.util.Timeout
|
||||
|
||||
private[camel] object TestSupport {
|
||||
|
||||
def start(actor: ⇒ Actor)(implicit system: ActorSystem): ActorRef = {
|
||||
val actorRef = system.actorOf(Props(actor))
|
||||
CamelExtension(system).awaitActivation(actorRef, 10 seconds)
|
||||
Await.result(CamelExtension(system).activationFutureFor(actorRef)(10 seconds), 10 seconds)
|
||||
actorRef
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue