diff --git a/akka-camel/src/test/java/akka/camel/ConsumerJavaTestBase.java b/akka-camel/src/test/java/akka/camel/ConsumerJavaTestBase.java index 8bb72baf00..d83fadc209 100644 --- a/akka-camel/src/test/java/akka/camel/ConsumerJavaTestBase.java +++ b/akka-camel/src/test/java/akka/camel/ConsumerJavaTestBase.java @@ -7,13 +7,14 @@ package akka.camel; import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.actor.Props; +import akka.testkit.JavaTestKit; import scala.concurrent.Await; import scala.concurrent.util.Duration; import org.junit.AfterClass; import org.junit.Test; - -import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import akka.testkit.AkkaSpec; +import akka.testkit.JavaTestKit.EventFilter; import static org.junit.Assert.assertEquals; @@ -23,7 +24,7 @@ import static org.junit.Assert.assertEquals; */ public class ConsumerJavaTestBase { - static ActorSystem system = ActorSystem.create("test"); + static ActorSystem system = ActorSystem.create("test", AkkaSpec.testConf()); static Camel camel = CamelExtension.get(system); @@ -34,12 +35,22 @@ public class ConsumerJavaTestBase { @Test public void shouldHandleExceptionThrownByActorAndGenerateCustomResponse() throws Exception { - Duration timeout = Duration.create(1, TimeUnit.SECONDS); - ActorRef ref = Await.result( - camel.activationFutureFor(system.actorOf(new Props(SampleErrorHandlingConsumer.class)), timeout), - timeout); - - String result = camel.template().requestBody("direct:error-handler-test-java", "hello", String.class); - assertEquals("error: hello", result); + new JavaTestKit(system) {{ + String result = new EventFilter(Exception.class) { + protected String run() { + Duration timeout = Duration.create(1, TimeUnit.SECONDS); + try { + ActorRef ref = Await.result( + camel.activationFutureFor(system.actorOf(new Props(SampleErrorHandlingConsumer.class)), timeout), + timeout); + return camel.template().requestBody("direct:error-handler-test-java", "hello", String.class); + } + catch (Exception e) { + return e.getMessage(); + } + } + }.occurrences(1).exec(); + assertEquals("error: hello", result); + }}; } } diff --git a/akka-camel/src/test/scala/akka/camel/ConsumerIntegrationTest.scala b/akka-camel/src/test/scala/akka/camel/ConsumerIntegrationTest.scala index 08ee3cf99d..ba2447862e 100644 --- a/akka-camel/src/test/scala/akka/camel/ConsumerIntegrationTest.scala +++ b/akka-camel/src/test/scala/akka/camel/ConsumerIntegrationTest.scala @@ -15,18 +15,24 @@ import org.apache.camel.model.RouteDefinition import org.apache.camel.builder.Builder import org.apache.camel.{ FailedToCreateRouteException, CamelExecutionException } import java.util.concurrent.{ ExecutionException, TimeUnit, TimeoutException } -import akka.testkit.TestLatch +import akka.testkit._ import akka.actor.Status.Failure import scala.concurrent.Await import scala.concurrent.util.duration._ +import akka.actor.Status.Failure +import akka.actor.ActorKilledException class ConsumerIntegrationTest extends WordSpec with MustMatchers with NonSharedCamelSystem { "ConsumerIntegrationTest" must { implicit val defaultTimeout = 10.seconds "Consumer must throw FailedToCreateRouteException, while awaiting activation, if endpoint is invalid" in { - val actorRef = system.actorOf(Props(new TestActor(uri = "some invalid uri"))) - intercept[FailedToCreateRouteException] { Await.result(camel.activationFutureFor(actorRef), defaultTimeout) } + filterEvents(EventFilter[ActorInitializationException](occurrences = 1), EventFilter[FailedToCreateRouteException](occurrences = 1)) { + val actorRef = system.actorOf(Props(new TestActor(uri = "some invalid uri"))) + intercept[FailedToCreateRouteException] { + Await.result(camel.activationFutureFor(actorRef), defaultTimeout) + } + } } "Consumer must support in-out messaging" in { @@ -70,11 +76,13 @@ class ConsumerIntegrationTest extends WordSpec with MustMatchers with NonSharedC restarted.countDown() } }) - consumer ! "throw" - Await.ready(restarted, defaultTimeout) + filterEvents(EventFilter[Exception](occurrences = 1)) { + consumer ! "throw" + Await.ready(restarted, defaultTimeout) - val response = camel.sendTo("direct:a2", msg = "xyz") - response must be("received xyz") + val response = camel.sendTo("direct:a2", msg = "xyz") + response must be("received xyz") + } } "Consumer must unregister itself when stopped" in { @@ -106,7 +114,9 @@ class ConsumerIntegrationTest extends WordSpec with MustMatchers with NonSharedC rd.onException(classOf[Exception]).handled(true).transform(Builder.exceptionMessage).end } }) - camel.sendTo("direct:error-handler-test", msg = "hello") must be("error: hello") + filterEvents(EventFilter[Exception](occurrences = 1)) { + camel.sendTo("direct:error-handler-test", msg = "hello") must be("error: hello") + } } "Error passing consumer supports redelivery through route modification" in { @@ -115,7 +125,9 @@ class ConsumerIntegrationTest extends WordSpec with MustMatchers with NonSharedC rd.onException(classOf[Exception]).maximumRedeliveries(1).end } }) - camel.sendTo("direct:failing-once-concumer", msg = "hello") must be("accepted: hello") + filterEvents(EventFilter[Exception](occurrences = 1)) { + camel.sendTo("direct:failing-once-concumer", msg = "hello") must be("accepted: hello") + } } "Consumer supports manual Ack" in { diff --git a/akka-camel/src/test/scala/akka/camel/ProducerFeatureTest.scala b/akka-camel/src/test/scala/akka/camel/ProducerFeatureTest.scala index 3de8055875..0bd3f4d0a4 100644 --- a/akka-camel/src/test/scala/akka/camel/ProducerFeatureTest.scala +++ b/akka-camel/src/test/scala/akka/camel/ProducerFeatureTest.scala @@ -18,7 +18,7 @@ import akka.pattern._ import scala.concurrent.util.duration._ import akka.util.Timeout import org.scalatest.matchers.MustMatchers -import akka.testkit.TestLatch +import akka.testkit._ /** * Tests the features of the Camel Producer. @@ -70,14 +70,16 @@ class ProducerFeatureTest extends WordSpec with BeforeAndAfterAll with BeforeAnd })) val producer = Await.result[ActorRef](supervisor.ask(Props(new TestProducer("direct:producer-test-2"))).mapTo[ActorRef], timeoutDuration) val message = CamelMessage("fail", Map(CamelMessage.MessageExchangeId -> "123")) - val future = producer.ask(message)(timeoutDuration).failed + filterEvents(EventFilter[AkkaCamelException](occurrences = 1)) { + val future = producer.ask(message)(timeoutDuration).failed - Await.ready(future, timeoutDuration).value match { - case Some(Right(e: AkkaCamelException)) ⇒ - // a failure response must have been returned by the producer - e.getMessage must be("failure") - e.headers must be(Map(CamelMessage.MessageExchangeId -> "123")) - case unexpected ⇒ fail("Actor responded with unexpected message:" + unexpected) + Await.ready(future, timeoutDuration).value match { + case Some(Right(e: AkkaCamelException)) ⇒ + // a failure response must have been returned by the producer + e.getMessage must be("failure") + e.headers must be(Map(CamelMessage.MessageExchangeId -> "123")) + case unexpected ⇒ fail("Actor responded with unexpected message:" + unexpected) + } } Await.ready(latch, timeoutDuration) deadActor must be(Some(producer)) @@ -117,14 +119,17 @@ class ProducerFeatureTest extends WordSpec with BeforeAndAfterAll with BeforeAnd "produce message to direct:producer-test-3 and receive failure response" in { val producer = system.actorOf(Props(new TestProducer("direct:producer-test-3"))) val message = CamelMessage("fail", Map(CamelMessage.MessageExchangeId -> "123")) - val future = producer.ask(message)(timeoutDuration).failed - Await.ready(future, timeoutDuration).value match { - case Some(Right(e: AkkaCamelException)) ⇒ - // a failure response must have been returned by the producer - e.getMessage must be("failure") - e.headers must be(Map(CamelMessage.MessageExchangeId -> "123")) - case unexpected ⇒ fail("Actor responded with unexpected message:" + unexpected) + filterEvents(EventFilter[AkkaCamelException](occurrences = 1)) { + val future = producer.ask(message)(timeoutDuration).failed + + Await.ready(future, timeoutDuration).value match { + case Some(Right(e: AkkaCamelException)) ⇒ + // a failure response must have been returned by the producer + e.getMessage must be("failure") + e.headers must be(Map(CamelMessage.MessageExchangeId -> "123")) + case unexpected ⇒ fail("Actor responded with unexpected message:" + unexpected) + } } } @@ -148,13 +153,15 @@ class ProducerFeatureTest extends WordSpec with BeforeAndAfterAll with BeforeAnd val producer = system.actorOf(Props(new TestForwarder("direct:producer-test-2", target))) val message = CamelMessage("fail", Map(CamelMessage.MessageExchangeId -> "123")) - val future = producer.ask(message)(timeoutDuration).failed - Await.ready(future, timeoutDuration).value match { - case Some(Right(e: AkkaCamelException)) ⇒ - // a failure response must have been returned by the forward target - e.getMessage must be("failure") - e.headers must be(Map(CamelMessage.MessageExchangeId -> "123", "test" -> "failure")) - case unexpected ⇒ fail("Actor responded with unexpected message:" + unexpected) + filterEvents(EventFilter[AkkaCamelException](occurrences = 1)) { + val future = producer.ask(message)(timeoutDuration).failed + Await.ready(future, timeoutDuration).value match { + case Some(Right(e: AkkaCamelException)) ⇒ + // a failure response must have been returned by the forward target + e.getMessage must be("failure") + e.headers must be(Map(CamelMessage.MessageExchangeId -> "123", "test" -> "failure")) + case unexpected ⇒ fail("Actor responded with unexpected message:" + unexpected) + } } } @@ -169,10 +176,12 @@ class ProducerFeatureTest extends WordSpec with BeforeAndAfterAll with BeforeAnd "produce message, forward failure response to a producing target actor and produce response to direct:forward-test-1" in { val target = system.actorOf(Props[ProducingForwardTarget]) val producer = system.actorOf(Props(new TestForwarder("direct:producer-test-2", target))) - mockEndpoint.expectedMessageCount(1) - mockEndpoint.message(0).body().isInstanceOf(classOf[akka.actor.Status.Failure]) - producer.tell(CamelMessage("fail", Map()), producer) - mockEndpoint.assertIsSatisfied() + filterEvents(EventFilter[AkkaCamelException](occurrences = 1)) { + mockEndpoint.expectedMessageCount(1) + mockEndpoint.message(0).body().isInstanceOf(classOf[akka.actor.Status.Failure]) + producer.tell(CamelMessage("fail", Map()), producer) + mockEndpoint.assertIsSatisfied() + } } "produce message, forward normal response from direct:producer-test-3 to a replying target actor and receive response" in { @@ -194,12 +203,14 @@ class ProducerFeatureTest extends WordSpec with BeforeAndAfterAll with BeforeAnd val producer = system.actorOf(Props(new TestForwarder("direct:producer-test-3", target))) val message = CamelMessage("fail", Map(CamelMessage.MessageExchangeId -> "123")) - val future = producer.ask(message)(timeoutDuration).failed - Await.ready(future, timeoutDuration).value match { - case Some(Right(e: AkkaCamelException)) ⇒ - e.getMessage must be("failure") - e.headers must be(Map(CamelMessage.MessageExchangeId -> "123", "test" -> "failure")) - case unexpected ⇒ fail("Actor responded with unexpected message:" + unexpected) + filterEvents(EventFilter[AkkaCamelException](occurrences = 1)) { + val future = producer.ask(message)(timeoutDuration).failed + Await.ready(future, timeoutDuration).value match { + case Some(Right(e: AkkaCamelException)) ⇒ + e.getMessage must be("failure") + e.headers must be(Map(CamelMessage.MessageExchangeId -> "123", "test" -> "failure")) + case unexpected ⇒ fail("Actor responded with unexpected message:" + unexpected) + } } } @@ -214,10 +225,12 @@ class ProducerFeatureTest extends WordSpec with BeforeAndAfterAll with BeforeAnd "produce message, forward failure response from direct:producer-test-3 to a producing target actor and produce response to direct:forward-test-1" in { val target = system.actorOf(Props[ProducingForwardTarget]) val producer = system.actorOf(Props(new TestForwarder("direct:producer-test-3", target))) - mockEndpoint.expectedMessageCount(1) - mockEndpoint.message(0).body().isInstanceOf(classOf[akka.actor.Status.Failure]) - producer.tell(CamelMessage("fail", Map()), producer) - mockEndpoint.assertIsSatisfied() + filterEvents(EventFilter[AkkaCamelException](occurrences = 1)) { + mockEndpoint.expectedMessageCount(1) + mockEndpoint.message(0).body().isInstanceOf(classOf[akka.actor.Status.Failure]) + producer.tell(CamelMessage("fail", Map()), producer) + mockEndpoint.assertIsSatisfied() + } } } diff --git a/akka-camel/src/test/scala/akka/camel/TestSupport.scala b/akka-camel/src/test/scala/akka/camel/TestSupport.scala index 2920e558a0..38454db1c4 100644 --- a/akka-camel/src/test/scala/akka/camel/TestSupport.scala +++ b/akka-camel/src/test/scala/akka/camel/TestSupport.scala @@ -16,6 +16,7 @@ import scala.reflect.ClassTag import akka.actor.{ ActorRef, Props, ActorSystem, Actor } import concurrent.Await import akka.util.Timeout +import akka.testkit.AkkaSpec private[camel] object TestSupport { @@ -47,7 +48,7 @@ private[camel] object TestSupport { } trait SharedCamelSystem extends BeforeAndAfterAll { this: Suite ⇒ - implicit lazy val system = ActorSystem("test") + implicit lazy val system = ActorSystem("test", AkkaSpec.testConf) implicit lazy val camel = CamelExtension(system) abstract override protected def afterAll() { @@ -62,7 +63,7 @@ private[camel] object TestSupport { override protected def beforeEach() { super.beforeEach() - system = ActorSystem("test") + system = ActorSystem("test", AkkaSpec.testConf) camel = CamelExtension(system) } diff --git a/akka-camel/src/test/scala/akka/camel/UntypedProducerTest.scala b/akka-camel/src/test/scala/akka/camel/UntypedProducerTest.scala index 21e9800b87..11178277b9 100644 --- a/akka-camel/src/test/scala/akka/camel/UntypedProducerTest.scala +++ b/akka-camel/src/test/scala/akka/camel/UntypedProducerTest.scala @@ -16,6 +16,7 @@ import akka.pattern._ import scala.concurrent.Await import scala.concurrent.util.duration._ import org.scalatest._ +import akka.testkit._ import matchers.MustMatchers class UntypedProducerTest extends WordSpec with MustMatchers with BeforeAndAfterAll with BeforeAndAfterEach with SharedCamelSystem with GivenWhenThen { @@ -49,13 +50,15 @@ class UntypedProducerTest extends WordSpec with MustMatchers with BeforeAndAfter val producer = system.actorOf(Props[SampleUntypedReplyingProducer]) val message = CamelMessage("fail", Map(CamelMessage.MessageExchangeId -> "123")) - val future = producer.ask(message)(timeout).failed + filterEvents(EventFilter[AkkaCamelException](occurrences = 1)) { + val future = producer.ask(message)(timeout).failed - Await.ready(future, timeout).value match { - case Some(Right(e: AkkaCamelException)) ⇒ - e.getMessage must be("failure") - e.headers must be(Map(CamelMessage.MessageExchangeId -> "123")) - case unexpected ⇒ fail("Actor responded with unexpected message:" + unexpected) + Await.ready(future, timeout).value match { + case Some(Right(e: AkkaCamelException)) ⇒ + e.getMessage must be("failure") + e.headers must be(Map(CamelMessage.MessageExchangeId -> "123")) + case unexpected ⇒ fail("Actor responded with unexpected message:" + unexpected) + } } } } @@ -67,7 +70,6 @@ class UntypedProducerTest extends WordSpec with MustMatchers with BeforeAndAfter mockEndpoint.expectedBodiesReceived("received test") producer.tell(CamelMessage("test", Map[String, Any]()), producer) - mockEndpoint.assertIsSatisfied }