Quiet down the camel tests. See #2339
This commit is contained in:
parent
9094199011
commit
4d114eb2c6
5 changed files with 103 additions and 64 deletions
|
|
@ -7,13 +7,14 @@ package akka.camel;
|
||||||
import akka.actor.ActorRef;
|
import akka.actor.ActorRef;
|
||||||
import akka.actor.ActorSystem;
|
import akka.actor.ActorSystem;
|
||||||
import akka.actor.Props;
|
import akka.actor.Props;
|
||||||
|
import akka.testkit.JavaTestKit;
|
||||||
import scala.concurrent.Await;
|
import scala.concurrent.Await;
|
||||||
import scala.concurrent.util.Duration;
|
import scala.concurrent.util.Duration;
|
||||||
import org.junit.AfterClass;
|
import org.junit.AfterClass;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
import java.util.concurrent.Future;
|
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import akka.testkit.AkkaSpec;
|
||||||
|
import akka.testkit.JavaTestKit.EventFilter;
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
|
|
||||||
|
|
@ -23,7 +24,7 @@ import static org.junit.Assert.assertEquals;
|
||||||
*/
|
*/
|
||||||
public class ConsumerJavaTestBase {
|
public class ConsumerJavaTestBase {
|
||||||
|
|
||||||
static ActorSystem system = ActorSystem.create("test");
|
static ActorSystem system = ActorSystem.create("test", AkkaSpec.testConf());
|
||||||
static Camel camel = CamelExtension.get(system);
|
static Camel camel = CamelExtension.get(system);
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -34,12 +35,22 @@ public class ConsumerJavaTestBase {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void shouldHandleExceptionThrownByActorAndGenerateCustomResponse() throws Exception {
|
public void shouldHandleExceptionThrownByActorAndGenerateCustomResponse() throws Exception {
|
||||||
Duration timeout = Duration.create(1, TimeUnit.SECONDS);
|
new JavaTestKit(system) {{
|
||||||
ActorRef ref = Await.result(
|
String result = new EventFilter<String>(Exception.class) {
|
||||||
camel.activationFutureFor(system.actorOf(new Props(SampleErrorHandlingConsumer.class)), timeout),
|
protected String run() {
|
||||||
timeout);
|
Duration timeout = Duration.create(1, TimeUnit.SECONDS);
|
||||||
|
try {
|
||||||
String result = camel.template().requestBody("direct:error-handler-test-java", "hello", String.class);
|
ActorRef ref = Await.result(
|
||||||
assertEquals("error: hello", result);
|
camel.activationFutureFor(system.actorOf(new Props(SampleErrorHandlingConsumer.class)), timeout),
|
||||||
|
timeout);
|
||||||
|
return camel.template().requestBody("direct:error-handler-test-java", "hello", String.class);
|
||||||
|
}
|
||||||
|
catch (Exception e) {
|
||||||
|
return e.getMessage();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}.occurrences(1).exec();
|
||||||
|
assertEquals("error: hello", result);
|
||||||
|
}};
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -15,18 +15,24 @@ import org.apache.camel.model.RouteDefinition
|
||||||
import org.apache.camel.builder.Builder
|
import org.apache.camel.builder.Builder
|
||||||
import org.apache.camel.{ FailedToCreateRouteException, CamelExecutionException }
|
import org.apache.camel.{ FailedToCreateRouteException, CamelExecutionException }
|
||||||
import java.util.concurrent.{ ExecutionException, TimeUnit, TimeoutException }
|
import java.util.concurrent.{ ExecutionException, TimeUnit, TimeoutException }
|
||||||
import akka.testkit.TestLatch
|
import akka.testkit._
|
||||||
import akka.actor.Status.Failure
|
import akka.actor.Status.Failure
|
||||||
import scala.concurrent.Await
|
import scala.concurrent.Await
|
||||||
import scala.concurrent.util.duration._
|
import scala.concurrent.util.duration._
|
||||||
|
import akka.actor.Status.Failure
|
||||||
|
import akka.actor.ActorKilledException
|
||||||
|
|
||||||
class ConsumerIntegrationTest extends WordSpec with MustMatchers with NonSharedCamelSystem {
|
class ConsumerIntegrationTest extends WordSpec with MustMatchers with NonSharedCamelSystem {
|
||||||
"ConsumerIntegrationTest" must {
|
"ConsumerIntegrationTest" must {
|
||||||
implicit val defaultTimeout = 10.seconds
|
implicit val defaultTimeout = 10.seconds
|
||||||
|
|
||||||
"Consumer must throw FailedToCreateRouteException, while awaiting activation, if endpoint is invalid" in {
|
"Consumer must throw FailedToCreateRouteException, while awaiting activation, if endpoint is invalid" in {
|
||||||
val actorRef = system.actorOf(Props(new TestActor(uri = "some invalid uri")))
|
filterEvents(EventFilter[ActorInitializationException](occurrences = 1), EventFilter[FailedToCreateRouteException](occurrences = 1)) {
|
||||||
intercept[FailedToCreateRouteException] { Await.result(camel.activationFutureFor(actorRef), defaultTimeout) }
|
val actorRef = system.actorOf(Props(new TestActor(uri = "some invalid uri")))
|
||||||
|
intercept[FailedToCreateRouteException] {
|
||||||
|
Await.result(camel.activationFutureFor(actorRef), defaultTimeout)
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
"Consumer must support in-out messaging" in {
|
"Consumer must support in-out messaging" in {
|
||||||
|
|
@ -70,11 +76,13 @@ class ConsumerIntegrationTest extends WordSpec with MustMatchers with NonSharedC
|
||||||
restarted.countDown()
|
restarted.countDown()
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
consumer ! "throw"
|
filterEvents(EventFilter[Exception](occurrences = 1)) {
|
||||||
Await.ready(restarted, defaultTimeout)
|
consumer ! "throw"
|
||||||
|
Await.ready(restarted, defaultTimeout)
|
||||||
|
|
||||||
val response = camel.sendTo("direct:a2", msg = "xyz")
|
val response = camel.sendTo("direct:a2", msg = "xyz")
|
||||||
response must be("received xyz")
|
response must be("received xyz")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
"Consumer must unregister itself when stopped" in {
|
"Consumer must unregister itself when stopped" in {
|
||||||
|
|
@ -106,7 +114,9 @@ class ConsumerIntegrationTest extends WordSpec with MustMatchers with NonSharedC
|
||||||
rd.onException(classOf[Exception]).handled(true).transform(Builder.exceptionMessage).end
|
rd.onException(classOf[Exception]).handled(true).transform(Builder.exceptionMessage).end
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
camel.sendTo("direct:error-handler-test", msg = "hello") must be("error: hello")
|
filterEvents(EventFilter[Exception](occurrences = 1)) {
|
||||||
|
camel.sendTo("direct:error-handler-test", msg = "hello") must be("error: hello")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
"Error passing consumer supports redelivery through route modification" in {
|
"Error passing consumer supports redelivery through route modification" in {
|
||||||
|
|
@ -115,7 +125,9 @@ class ConsumerIntegrationTest extends WordSpec with MustMatchers with NonSharedC
|
||||||
rd.onException(classOf[Exception]).maximumRedeliveries(1).end
|
rd.onException(classOf[Exception]).maximumRedeliveries(1).end
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
camel.sendTo("direct:failing-once-concumer", msg = "hello") must be("accepted: hello")
|
filterEvents(EventFilter[Exception](occurrences = 1)) {
|
||||||
|
camel.sendTo("direct:failing-once-concumer", msg = "hello") must be("accepted: hello")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
"Consumer supports manual Ack" in {
|
"Consumer supports manual Ack" in {
|
||||||
|
|
|
||||||
|
|
@ -18,7 +18,7 @@ import akka.pattern._
|
||||||
import scala.concurrent.util.duration._
|
import scala.concurrent.util.duration._
|
||||||
import akka.util.Timeout
|
import akka.util.Timeout
|
||||||
import org.scalatest.matchers.MustMatchers
|
import org.scalatest.matchers.MustMatchers
|
||||||
import akka.testkit.TestLatch
|
import akka.testkit._
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Tests the features of the Camel Producer.
|
* Tests the features of the Camel Producer.
|
||||||
|
|
@ -70,14 +70,16 @@ class ProducerFeatureTest extends WordSpec with BeforeAndAfterAll with BeforeAnd
|
||||||
}))
|
}))
|
||||||
val producer = Await.result[ActorRef](supervisor.ask(Props(new TestProducer("direct:producer-test-2"))).mapTo[ActorRef], timeoutDuration)
|
val producer = Await.result[ActorRef](supervisor.ask(Props(new TestProducer("direct:producer-test-2"))).mapTo[ActorRef], timeoutDuration)
|
||||||
val message = CamelMessage("fail", Map(CamelMessage.MessageExchangeId -> "123"))
|
val message = CamelMessage("fail", Map(CamelMessage.MessageExchangeId -> "123"))
|
||||||
val future = producer.ask(message)(timeoutDuration).failed
|
filterEvents(EventFilter[AkkaCamelException](occurrences = 1)) {
|
||||||
|
val future = producer.ask(message)(timeoutDuration).failed
|
||||||
|
|
||||||
Await.ready(future, timeoutDuration).value match {
|
Await.ready(future, timeoutDuration).value match {
|
||||||
case Some(Right(e: AkkaCamelException)) ⇒
|
case Some(Right(e: AkkaCamelException)) ⇒
|
||||||
// a failure response must have been returned by the producer
|
// a failure response must have been returned by the producer
|
||||||
e.getMessage must be("failure")
|
e.getMessage must be("failure")
|
||||||
e.headers must be(Map(CamelMessage.MessageExchangeId -> "123"))
|
e.headers must be(Map(CamelMessage.MessageExchangeId -> "123"))
|
||||||
case unexpected ⇒ fail("Actor responded with unexpected message:" + unexpected)
|
case unexpected ⇒ fail("Actor responded with unexpected message:" + unexpected)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
Await.ready(latch, timeoutDuration)
|
Await.ready(latch, timeoutDuration)
|
||||||
deadActor must be(Some(producer))
|
deadActor must be(Some(producer))
|
||||||
|
|
@ -117,14 +119,17 @@ class ProducerFeatureTest extends WordSpec with BeforeAndAfterAll with BeforeAnd
|
||||||
"produce message to direct:producer-test-3 and receive failure response" in {
|
"produce message to direct:producer-test-3 and receive failure response" in {
|
||||||
val producer = system.actorOf(Props(new TestProducer("direct:producer-test-3")))
|
val producer = system.actorOf(Props(new TestProducer("direct:producer-test-3")))
|
||||||
val message = CamelMessage("fail", Map(CamelMessage.MessageExchangeId -> "123"))
|
val message = CamelMessage("fail", Map(CamelMessage.MessageExchangeId -> "123"))
|
||||||
val future = producer.ask(message)(timeoutDuration).failed
|
|
||||||
|
|
||||||
Await.ready(future, timeoutDuration).value match {
|
filterEvents(EventFilter[AkkaCamelException](occurrences = 1)) {
|
||||||
case Some(Right(e: AkkaCamelException)) ⇒
|
val future = producer.ask(message)(timeoutDuration).failed
|
||||||
// a failure response must have been returned by the producer
|
|
||||||
e.getMessage must be("failure")
|
Await.ready(future, timeoutDuration).value match {
|
||||||
e.headers must be(Map(CamelMessage.MessageExchangeId -> "123"))
|
case Some(Right(e: AkkaCamelException)) ⇒
|
||||||
case unexpected ⇒ fail("Actor responded with unexpected message:" + unexpected)
|
// a failure response must have been returned by the producer
|
||||||
|
e.getMessage must be("failure")
|
||||||
|
e.headers must be(Map(CamelMessage.MessageExchangeId -> "123"))
|
||||||
|
case unexpected ⇒ fail("Actor responded with unexpected message:" + unexpected)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -148,13 +153,15 @@ class ProducerFeatureTest extends WordSpec with BeforeAndAfterAll with BeforeAnd
|
||||||
val producer = system.actorOf(Props(new TestForwarder("direct:producer-test-2", target)))
|
val producer = system.actorOf(Props(new TestForwarder("direct:producer-test-2", target)))
|
||||||
val message = CamelMessage("fail", Map(CamelMessage.MessageExchangeId -> "123"))
|
val message = CamelMessage("fail", Map(CamelMessage.MessageExchangeId -> "123"))
|
||||||
|
|
||||||
val future = producer.ask(message)(timeoutDuration).failed
|
filterEvents(EventFilter[AkkaCamelException](occurrences = 1)) {
|
||||||
Await.ready(future, timeoutDuration).value match {
|
val future = producer.ask(message)(timeoutDuration).failed
|
||||||
case Some(Right(e: AkkaCamelException)) ⇒
|
Await.ready(future, timeoutDuration).value match {
|
||||||
// a failure response must have been returned by the forward target
|
case Some(Right(e: AkkaCamelException)) ⇒
|
||||||
e.getMessage must be("failure")
|
// a failure response must have been returned by the forward target
|
||||||
e.headers must be(Map(CamelMessage.MessageExchangeId -> "123", "test" -> "failure"))
|
e.getMessage must be("failure")
|
||||||
case unexpected ⇒ fail("Actor responded with unexpected message:" + unexpected)
|
e.headers must be(Map(CamelMessage.MessageExchangeId -> "123", "test" -> "failure"))
|
||||||
|
case unexpected ⇒ fail("Actor responded with unexpected message:" + unexpected)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -169,10 +176,12 @@ class ProducerFeatureTest extends WordSpec with BeforeAndAfterAll with BeforeAnd
|
||||||
"produce message, forward failure response to a producing target actor and produce response to direct:forward-test-1" in {
|
"produce message, forward failure response to a producing target actor and produce response to direct:forward-test-1" in {
|
||||||
val target = system.actorOf(Props[ProducingForwardTarget])
|
val target = system.actorOf(Props[ProducingForwardTarget])
|
||||||
val producer = system.actorOf(Props(new TestForwarder("direct:producer-test-2", target)))
|
val producer = system.actorOf(Props(new TestForwarder("direct:producer-test-2", target)))
|
||||||
mockEndpoint.expectedMessageCount(1)
|
filterEvents(EventFilter[AkkaCamelException](occurrences = 1)) {
|
||||||
mockEndpoint.message(0).body().isInstanceOf(classOf[akka.actor.Status.Failure])
|
mockEndpoint.expectedMessageCount(1)
|
||||||
producer.tell(CamelMessage("fail", Map()), producer)
|
mockEndpoint.message(0).body().isInstanceOf(classOf[akka.actor.Status.Failure])
|
||||||
mockEndpoint.assertIsSatisfied()
|
producer.tell(CamelMessage("fail", Map()), producer)
|
||||||
|
mockEndpoint.assertIsSatisfied()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
"produce message, forward normal response from direct:producer-test-3 to a replying target actor and receive response" in {
|
"produce message, forward normal response from direct:producer-test-3 to a replying target actor and receive response" in {
|
||||||
|
|
@ -194,12 +203,14 @@ class ProducerFeatureTest extends WordSpec with BeforeAndAfterAll with BeforeAnd
|
||||||
val producer = system.actorOf(Props(new TestForwarder("direct:producer-test-3", target)))
|
val producer = system.actorOf(Props(new TestForwarder("direct:producer-test-3", target)))
|
||||||
|
|
||||||
val message = CamelMessage("fail", Map(CamelMessage.MessageExchangeId -> "123"))
|
val message = CamelMessage("fail", Map(CamelMessage.MessageExchangeId -> "123"))
|
||||||
val future = producer.ask(message)(timeoutDuration).failed
|
filterEvents(EventFilter[AkkaCamelException](occurrences = 1)) {
|
||||||
Await.ready(future, timeoutDuration).value match {
|
val future = producer.ask(message)(timeoutDuration).failed
|
||||||
case Some(Right(e: AkkaCamelException)) ⇒
|
Await.ready(future, timeoutDuration).value match {
|
||||||
e.getMessage must be("failure")
|
case Some(Right(e: AkkaCamelException)) ⇒
|
||||||
e.headers must be(Map(CamelMessage.MessageExchangeId -> "123", "test" -> "failure"))
|
e.getMessage must be("failure")
|
||||||
case unexpected ⇒ fail("Actor responded with unexpected message:" + unexpected)
|
e.headers must be(Map(CamelMessage.MessageExchangeId -> "123", "test" -> "failure"))
|
||||||
|
case unexpected ⇒ fail("Actor responded with unexpected message:" + unexpected)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -214,10 +225,12 @@ class ProducerFeatureTest extends WordSpec with BeforeAndAfterAll with BeforeAnd
|
||||||
"produce message, forward failure response from direct:producer-test-3 to a producing target actor and produce response to direct:forward-test-1" in {
|
"produce message, forward failure response from direct:producer-test-3 to a producing target actor and produce response to direct:forward-test-1" in {
|
||||||
val target = system.actorOf(Props[ProducingForwardTarget])
|
val target = system.actorOf(Props[ProducingForwardTarget])
|
||||||
val producer = system.actorOf(Props(new TestForwarder("direct:producer-test-3", target)))
|
val producer = system.actorOf(Props(new TestForwarder("direct:producer-test-3", target)))
|
||||||
mockEndpoint.expectedMessageCount(1)
|
filterEvents(EventFilter[AkkaCamelException](occurrences = 1)) {
|
||||||
mockEndpoint.message(0).body().isInstanceOf(classOf[akka.actor.Status.Failure])
|
mockEndpoint.expectedMessageCount(1)
|
||||||
producer.tell(CamelMessage("fail", Map()), producer)
|
mockEndpoint.message(0).body().isInstanceOf(classOf[akka.actor.Status.Failure])
|
||||||
mockEndpoint.assertIsSatisfied()
|
producer.tell(CamelMessage("fail", Map()), producer)
|
||||||
|
mockEndpoint.assertIsSatisfied()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -16,6 +16,7 @@ import scala.reflect.ClassTag
|
||||||
import akka.actor.{ ActorRef, Props, ActorSystem, Actor }
|
import akka.actor.{ ActorRef, Props, ActorSystem, Actor }
|
||||||
import concurrent.Await
|
import concurrent.Await
|
||||||
import akka.util.Timeout
|
import akka.util.Timeout
|
||||||
|
import akka.testkit.AkkaSpec
|
||||||
|
|
||||||
private[camel] object TestSupport {
|
private[camel] object TestSupport {
|
||||||
|
|
||||||
|
|
@ -47,7 +48,7 @@ private[camel] object TestSupport {
|
||||||
}
|
}
|
||||||
|
|
||||||
trait SharedCamelSystem extends BeforeAndAfterAll { this: Suite ⇒
|
trait SharedCamelSystem extends BeforeAndAfterAll { this: Suite ⇒
|
||||||
implicit lazy val system = ActorSystem("test")
|
implicit lazy val system = ActorSystem("test", AkkaSpec.testConf)
|
||||||
implicit lazy val camel = CamelExtension(system)
|
implicit lazy val camel = CamelExtension(system)
|
||||||
|
|
||||||
abstract override protected def afterAll() {
|
abstract override protected def afterAll() {
|
||||||
|
|
@ -62,7 +63,7 @@ private[camel] object TestSupport {
|
||||||
|
|
||||||
override protected def beforeEach() {
|
override protected def beforeEach() {
|
||||||
super.beforeEach()
|
super.beforeEach()
|
||||||
system = ActorSystem("test")
|
system = ActorSystem("test", AkkaSpec.testConf)
|
||||||
camel = CamelExtension(system)
|
camel = CamelExtension(system)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -16,6 +16,7 @@ import akka.pattern._
|
||||||
import scala.concurrent.Await
|
import scala.concurrent.Await
|
||||||
import scala.concurrent.util.duration._
|
import scala.concurrent.util.duration._
|
||||||
import org.scalatest._
|
import org.scalatest._
|
||||||
|
import akka.testkit._
|
||||||
import matchers.MustMatchers
|
import matchers.MustMatchers
|
||||||
|
|
||||||
class UntypedProducerTest extends WordSpec with MustMatchers with BeforeAndAfterAll with BeforeAndAfterEach with SharedCamelSystem with GivenWhenThen {
|
class UntypedProducerTest extends WordSpec with MustMatchers with BeforeAndAfterAll with BeforeAndAfterEach with SharedCamelSystem with GivenWhenThen {
|
||||||
|
|
@ -49,13 +50,15 @@ class UntypedProducerTest extends WordSpec with MustMatchers with BeforeAndAfter
|
||||||
val producer = system.actorOf(Props[SampleUntypedReplyingProducer])
|
val producer = system.actorOf(Props[SampleUntypedReplyingProducer])
|
||||||
|
|
||||||
val message = CamelMessage("fail", Map(CamelMessage.MessageExchangeId -> "123"))
|
val message = CamelMessage("fail", Map(CamelMessage.MessageExchangeId -> "123"))
|
||||||
val future = producer.ask(message)(timeout).failed
|
filterEvents(EventFilter[AkkaCamelException](occurrences = 1)) {
|
||||||
|
val future = producer.ask(message)(timeout).failed
|
||||||
|
|
||||||
Await.ready(future, timeout).value match {
|
Await.ready(future, timeout).value match {
|
||||||
case Some(Right(e: AkkaCamelException)) ⇒
|
case Some(Right(e: AkkaCamelException)) ⇒
|
||||||
e.getMessage must be("failure")
|
e.getMessage must be("failure")
|
||||||
e.headers must be(Map(CamelMessage.MessageExchangeId -> "123"))
|
e.headers must be(Map(CamelMessage.MessageExchangeId -> "123"))
|
||||||
case unexpected ⇒ fail("Actor responded with unexpected message:" + unexpected)
|
case unexpected ⇒ fail("Actor responded with unexpected message:" + unexpected)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -67,7 +70,6 @@ class UntypedProducerTest extends WordSpec with MustMatchers with BeforeAndAfter
|
||||||
|
|
||||||
mockEndpoint.expectedBodiesReceived("received test")
|
mockEndpoint.expectedBodiesReceived("received test")
|
||||||
producer.tell(CamelMessage("test", Map[String, Any]()), producer)
|
producer.tell(CamelMessage("test", Map[String, Any]()), producer)
|
||||||
|
|
||||||
mockEndpoint.assertIsSatisfied
|
mockEndpoint.assertIsSatisfied
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue