diff --git a/akka-camel/src/test/scala/akka/camel/ConsumerIntegrationTest.scala b/akka-camel/src/test/scala/akka/camel/ConsumerIntegrationTest.scala index ba2447862e..20fa98b7b8 100644 --- a/akka-camel/src/test/scala/akka/camel/ConsumerIntegrationTest.scala +++ b/akka-camel/src/test/scala/akka/camel/ConsumerIntegrationTest.scala @@ -16,11 +16,9 @@ import org.apache.camel.builder.Builder import org.apache.camel.{ FailedToCreateRouteException, CamelExecutionException } import java.util.concurrent.{ ExecutionException, TimeUnit, TimeoutException } import akka.testkit._ -import akka.actor.Status.Failure import scala.concurrent.Await import scala.concurrent.util.duration._ import akka.actor.Status.Failure -import akka.actor.ActorKilledException class ConsumerIntegrationTest extends WordSpec with MustMatchers with NonSharedCamelSystem { "ConsumerIntegrationTest" must { @@ -68,7 +66,7 @@ class ConsumerIntegrationTest extends WordSpec with MustMatchers with NonSharedC def endpointUri = "direct:a2" def receive = { - case "throw" ⇒ throw new Exception + case "throw" ⇒ throw new TestException("") case m: CamelMessage ⇒ sender ! "received " + m.bodyAs[String] } @@ -76,12 +74,11 @@ class ConsumerIntegrationTest extends WordSpec with MustMatchers with NonSharedC restarted.countDown() } }) - filterEvents(EventFilter[Exception](occurrences = 1)) { + filterEvents(EventFilter[TestException](occurrences = 1)) { consumer ! "throw" Await.ready(restarted, defaultTimeout) - val response = camel.sendTo("direct:a2", msg = "xyz") - response must be("received xyz") + camel.sendTo("direct:a2", msg = "xyz") must be("received xyz") } } @@ -111,10 +108,10 @@ class ConsumerIntegrationTest extends WordSpec with MustMatchers with NonSharedC "Error passing consumer supports error handling through route modification" in { start(new ErrorThrowingConsumer("direct:error-handler-test") with ErrorPassing { override def onRouteDefinition(rd: RouteDefinition) = { - rd.onException(classOf[Exception]).handled(true).transform(Builder.exceptionMessage).end + rd.onException(classOf[TestException]).handled(true).transform(Builder.exceptionMessage).end } }) - filterEvents(EventFilter[Exception](occurrences = 1)) { + filterEvents(EventFilter[TestException](occurrences = 1)) { camel.sendTo("direct:error-handler-test", msg = "hello") must be("error: hello") } } @@ -122,10 +119,10 @@ class ConsumerIntegrationTest extends WordSpec with MustMatchers with NonSharedC "Error passing consumer supports redelivery through route modification" in { start(new FailingOnceConsumer("direct:failing-once-concumer") with ErrorPassing { override def onRouteDefinition(rd: RouteDefinition) = { - rd.onException(classOf[Exception]).maximumRedeliveries(1).end + rd.onException(classOf[TestException]).maximumRedeliveries(1).end } }) - filterEvents(EventFilter[Exception](occurrences = 1)) { + filterEvents(EventFilter[TestException](occurrences = 1)) { camel.sendTo("direct:failing-once-concumer", msg = "hello") must be("accepted: hello") } } @@ -166,7 +163,7 @@ class ConsumerIntegrationTest extends WordSpec with MustMatchers with NonSharedC class ErrorThrowingConsumer(override val endpointUri: String) extends Consumer { def receive = { - case msg: CamelMessage ⇒ throw new Exception("error: %s" format msg.body) + case msg: CamelMessage ⇒ throw new TestException("error: %s" format msg.body) } } @@ -177,7 +174,7 @@ class FailingOnceConsumer(override val endpointUri: String) extends Consumer { if (msg.headerAs[Boolean]("CamelRedelivered").getOrElse(false)) sender ! ("accepted: %s" format msg.body) else - throw new Exception("rejected: %s" format msg.body) + throw new TestException("rejected: %s" format msg.body) } } @@ -196,3 +193,5 @@ trait ErrorPassing { trait ManualAckConsumer extends Consumer { override def autoAck = false } + +class TestException(msg: String) extends Exception(msg) diff --git a/akka-camel/src/test/scala/akka/camel/ProducerFeatureTest.scala b/akka-camel/src/test/scala/akka/camel/ProducerFeatureTest.scala index 0bd3f4d0a4..e9d5382843 100644 --- a/akka-camel/src/test/scala/akka/camel/ProducerFeatureTest.scala +++ b/akka-camel/src/test/scala/akka/camel/ProducerFeatureTest.scala @@ -71,15 +71,9 @@ class ProducerFeatureTest extends WordSpec with BeforeAndAfterAll with BeforeAnd val producer = Await.result[ActorRef](supervisor.ask(Props(new TestProducer("direct:producer-test-2"))).mapTo[ActorRef], timeoutDuration) val message = CamelMessage("fail", Map(CamelMessage.MessageExchangeId -> "123")) filterEvents(EventFilter[AkkaCamelException](occurrences = 1)) { - val future = producer.ask(message)(timeoutDuration).failed - - Await.ready(future, timeoutDuration).value match { - case Some(Right(e: AkkaCamelException)) ⇒ - // a failure response must have been returned by the producer - e.getMessage must be("failure") - e.headers must be(Map(CamelMessage.MessageExchangeId -> "123")) - case unexpected ⇒ fail("Actor responded with unexpected message:" + unexpected) - } + val e = intercept[AkkaCamelException] { Await.result(producer.ask(message)(timeoutDuration), timeoutDuration) } + e.getMessage must be("failure") + e.headers must be(Map(CamelMessage.MessageExchangeId -> "123")) } Await.ready(latch, timeoutDuration) deadActor must be(Some(producer)) @@ -121,15 +115,9 @@ class ProducerFeatureTest extends WordSpec with BeforeAndAfterAll with BeforeAnd val message = CamelMessage("fail", Map(CamelMessage.MessageExchangeId -> "123")) filterEvents(EventFilter[AkkaCamelException](occurrences = 1)) { - val future = producer.ask(message)(timeoutDuration).failed - - Await.ready(future, timeoutDuration).value match { - case Some(Right(e: AkkaCamelException)) ⇒ - // a failure response must have been returned by the producer - e.getMessage must be("failure") - e.headers must be(Map(CamelMessage.MessageExchangeId -> "123")) - case unexpected ⇒ fail("Actor responded with unexpected message:" + unexpected) - } + val e = intercept[AkkaCamelException] { Await.result(producer.ask(message)(timeoutDuration), timeoutDuration) } + e.getMessage must be("failure") + e.headers must be(Map(CamelMessage.MessageExchangeId -> "123")) } } @@ -154,14 +142,9 @@ class ProducerFeatureTest extends WordSpec with BeforeAndAfterAll with BeforeAnd val message = CamelMessage("fail", Map(CamelMessage.MessageExchangeId -> "123")) filterEvents(EventFilter[AkkaCamelException](occurrences = 1)) { - val future = producer.ask(message)(timeoutDuration).failed - Await.ready(future, timeoutDuration).value match { - case Some(Right(e: AkkaCamelException)) ⇒ - // a failure response must have been returned by the forward target - e.getMessage must be("failure") - e.headers must be(Map(CamelMessage.MessageExchangeId -> "123", "test" -> "failure")) - case unexpected ⇒ fail("Actor responded with unexpected message:" + unexpected) - } + val e = intercept[AkkaCamelException] { Await.result(producer.ask(message)(timeoutDuration), timeoutDuration) } + e.getMessage must be("failure") + e.headers must be(Map(CamelMessage.MessageExchangeId -> "123", "test" -> "failure")) } } @@ -204,13 +187,9 @@ class ProducerFeatureTest extends WordSpec with BeforeAndAfterAll with BeforeAnd val message = CamelMessage("fail", Map(CamelMessage.MessageExchangeId -> "123")) filterEvents(EventFilter[AkkaCamelException](occurrences = 1)) { - val future = producer.ask(message)(timeoutDuration).failed - Await.ready(future, timeoutDuration).value match { - case Some(Right(e: AkkaCamelException)) ⇒ - e.getMessage must be("failure") - e.headers must be(Map(CamelMessage.MessageExchangeId -> "123", "test" -> "failure")) - case unexpected ⇒ fail("Actor responded with unexpected message:" + unexpected) - } + val e = intercept[AkkaCamelException] { Await.result(producer.ask(message)(timeoutDuration), timeoutDuration) } + e.getMessage must be("failure") + e.headers must be(Map(CamelMessage.MessageExchangeId -> "123", "test" -> "failure")) } }