From 82d9427ad4fb0c93b6330ab44cd770b774e4c66b Mon Sep 17 00:00:00 2001 From: RayRoestenburg Date: Fri, 11 May 2012 09:46:49 +0200 Subject: [PATCH] Removed akka.camel.Failure, replaced with akka.actor.Status.Failure, fixed tests --- .../main/scala/akka/camel/CamelMessage.scala | 38 ---------- .../src/main/scala/akka/camel/Producer.scala | 10 ++- .../camel/internal/CamelExchangeAdapter.scala | 14 ++-- .../internal/component/ActorComponent.scala | 18 ++--- .../camel/SampleErrorHandlingConsumer.java | 3 +- .../akka/camel/CamelExchangeAdapterTest.scala | 8 +- .../akka/camel/ConsumerIntegrationTest.scala | 1 + .../akka/camel/ProducerFeatureTest.scala | 75 +++++++++---------- .../akka/camel/UntypedProducerTest.scala | 21 +++--- .../component/ActorProducerTest.scala | 22 +++--- 10 files changed, 86 insertions(+), 124 deletions(-) diff --git a/akka-camel/src/main/scala/akka/camel/CamelMessage.scala b/akka-camel/src/main/scala/akka/camel/CamelMessage.scala index 0c1a77bb58..75bc85bb23 100644 --- a/akka-camel/src/main/scala/akka/camel/CamelMessage.scala +++ b/akka-camel/src/main/scala/akka/camel/CamelMessage.scala @@ -238,44 +238,6 @@ case object Ack { def ack = this } -/** - * An immutable representation of a failed Camel exchange. It contains the failure cause - * obtained from Exchange.getException and the headers from either the Exchange.getIn - * message or Exchange.getOut message, depending on the exchange pattern. - * - * @author Martin Krasser - */ -case class Failure(val cause: Throwable, val headers: Map[String, Any] = Map.empty) { - - /** - * Creates a Failure with cause body and empty headers map. - */ - def this(cause: Throwable) = this(cause, Map.empty[String, Any]) - - /** - * Creates a Failure with given cause and headers map. A copy of the headers map is made. - *

- * Java API - */ - def this(cause: Throwable, headers: JMap[String, Any]) = this(cause, headers.toMap) - - /** - * Returns the cause of this Failure. - *

- * Java API. - */ - def getCause = cause - - /** - * Returns all headers from this failure message. The returned headers map is backed up by - * this message's immutable headers map. Any attempt to modify the returned map will throw - * an exception. - *

- * Java API - */ - def getHeaders: JMap[String, Any] = headers -} - /** * An exception indicating that the exchange to the camel endpoint failed. * It contains the failure cause obtained from Exchange.getException and the headers from either the Exchange.getIn diff --git a/akka-camel/src/main/scala/akka/camel/Producer.scala b/akka-camel/src/main/scala/akka/camel/Producer.scala index ca16861b1a..80537fda12 100644 --- a/akka-camel/src/main/scala/akka/camel/Producer.scala +++ b/akka-camel/src/main/scala/akka/camel/Producer.scala @@ -7,6 +7,7 @@ package akka.camel import akka.actor.Actor import internal.CamelExchangeAdapter import org.apache.camel.{ Exchange, ExchangePattern, AsyncCallback } +import akka.actor.Status.Failure /** * Support trait for producing messages to Camel endpoints. @@ -75,7 +76,7 @@ trait ProducerSupport { this: Actor ⇒ val originalSender = sender // Ignoring doneSync, sending back async uniformly. def done(doneSync: Boolean): Unit = producer.tell( - if (exchange.isFailed) FailureResult(exchange.toFailureMessage(cmsg.headers(headersToCopy))) + if (exchange.isFailed) exchange.toFailureResult(cmsg.headers(headersToCopy)) else MessageResult(exchange.toResponseMessage(cmsg.headers(headersToCopy))), originalSender) }) } @@ -90,8 +91,9 @@ trait ProducerSupport { this: Actor ⇒ protected def produce: Receive = { case res: MessageResult ⇒ routeResponse(res.message) case res: FailureResult ⇒ - routeResponse(res.failure) - throw new AkkaCamelException(res.failure.cause, res.failure.headers) + val e = new AkkaCamelException(res.cause, res.headers) + routeResponse(Failure(e)) + throw e case msg ⇒ val exchangePattern = if (oneway) ExchangePattern.InOnly else ExchangePattern.InOut produce(transformOutgoingMessage(msg), exchangePattern) @@ -143,7 +145,7 @@ private case class MessageResult(message: CamelMessage) /** * @author Martin Krasser */ -private case class FailureResult(failure: Failure) +private case class FailureResult(cause: Throwable, headers: Map[String, Any] = Map.empty) /** * A one-way producer. diff --git a/akka-camel/src/main/scala/akka/camel/internal/CamelExchangeAdapter.scala b/akka-camel/src/main/scala/akka/camel/internal/CamelExchangeAdapter.scala index ae409e3407..1f2d80e6df 100644 --- a/akka-camel/src/main/scala/akka/camel/internal/CamelExchangeAdapter.scala +++ b/akka-camel/src/main/scala/akka/camel/internal/CamelExchangeAdapter.scala @@ -5,7 +5,7 @@ import scala.collection.JavaConversions._ import org.apache.camel.util.ExchangeHelper import org.apache.camel.{ Exchange, Message ⇒ JCamelMessage } -import akka.camel.{ Failure, AkkaCamelException, CamelMessage } +import akka.camel.{ FailureResult, AkkaCamelException, CamelMessage } /** * For internal use only. @@ -40,10 +40,10 @@ private[camel] class CamelExchangeAdapter(exchange: Exchange) { def setResponse(msg: CamelMessage) { msg.copyContentTo(response) } /** - * Sets Exchange.getException from the given Failure message. Headers of the Failure message + * Sets Exchange.getException from the given FailureResult message. Headers of the FailureResult message * are ignored. */ - def setFailure(msg: Failure) { exchange.setException(msg.cause) } + def setFailure(msg: FailureResult) { exchange.setException(msg.cause) } /** * Creates an immutable CamelMessage object from Exchange.getIn so it can be used with Actors. @@ -87,21 +87,21 @@ private[camel] class CamelExchangeAdapter(exchange: Exchange) { new AkkaCamelException(exchange.getException, headers ++ response.getHeaders) /** - * Creates an immutable Failure object from the adapted Exchange so it can be used with Actors. + * Creates an immutable Failure object from the adapted Exchange so it can be used internally between Actors. * * @see Failure */ - def toFailureMessage: Failure = toFailureMessage(Map.empty) + def toFailureMessage: FailureResult = toFailureResult(Map.empty) /** - * Creates an immutable Failure object from the adapted Exchange so it can be used with Actors. + * Creates an immutable FailureResult object from the adapted Exchange so it can be used internally between Actors. * * @param headers additional headers to set on the created CamelMessage in addition to those * in the Camel message. * * @see Failure */ - def toFailureMessage(headers: Map[String, Any]): Failure = Failure(exchange.getException, headers ++ response.getHeaders) + def toFailureResult(headers: Map[String, Any]): FailureResult = FailureResult(exchange.getException, headers ++ response.getHeaders) /** * Creates an immutable CamelMessage object from Exchange.getIn so it can be used with Actors. diff --git a/akka-camel/src/main/scala/akka/camel/internal/component/ActorComponent.scala b/akka-camel/src/main/scala/akka/camel/internal/component/ActorComponent.scala index bf284c37c8..e637914a2a 100644 --- a/akka-camel/src/main/scala/akka/camel/internal/component/ActorComponent.scala +++ b/akka-camel/src/main/scala/akka/camel/internal/component/ActorComponent.scala @@ -17,7 +17,7 @@ import akka.util.duration._ import java.util.concurrent.{ TimeoutException, CountDownLatch } import akka.camel.internal.CamelExchangeAdapter import akka.util.{ NonFatal, Duration, Timeout } -import akka.camel.{ ActorNotRegisteredException, DefaultConsumerConfig, ConsumerConfig, Camel, Ack, Failure ⇒ CamelFailure, CamelMessage } +import akka.camel.{ ActorNotRegisteredException, DefaultConsumerConfig, ConsumerConfig, Camel, Ack, FailureResult, CamelMessage } /** * For internal use only. @@ -170,18 +170,18 @@ private[camel] class ActorProducer(val endpoint: ActorEndpoint, camel: Camel) ex } private def forwardResponseTo(exchange: CamelExchangeAdapter): PartialFunction[Either[Throwable, Any], Unit] = { - case Right(failure: CamelFailure) ⇒ exchange.setFailure(failure) + case Right(failure: FailureResult) ⇒ exchange.setFailure(failure) case Right(msg) ⇒ exchange.setResponse(CamelMessage.canonicalize(msg)) - case Left(e: TimeoutException) ⇒ exchange.setFailure(CamelFailure(new TimeoutException("Failed to get response from the actor [%s] within timeout [%s]. Check replyTimeout and blocking settings [%s]" format (endpoint.path, endpoint.replyTimeout, endpoint)))) - case Left(throwable) ⇒ exchange.setFailure(CamelFailure(throwable)) + case Left(e: TimeoutException) ⇒ exchange.setFailure(FailureResult(new TimeoutException("Failed to get response from the actor [%s] within timeout [%s]. Check replyTimeout and blocking settings [%s]" format (endpoint.path, endpoint.replyTimeout, endpoint)))) + case Left(throwable) ⇒ exchange.setFailure(FailureResult(throwable)) } private def forwardAckTo(exchange: CamelExchangeAdapter): PartialFunction[Either[Throwable, Any], Unit] = { case Right(Ack) ⇒ { /* no response message to set */ } - case Right(failure: CamelFailure) ⇒ exchange.setFailure(failure) - case Right(msg) ⇒ exchange.setFailure(CamelFailure(new IllegalArgumentException("Expected Ack or Failure message, but got: [%s] from actor [%s]" format (msg, endpoint.path)))) - case Left(e: TimeoutException) ⇒ exchange.setFailure(CamelFailure(new TimeoutException("Failed to get Ack or Failure response from the actor [%s] within timeout [%s]. Check replyTimeout and blocking settings [%s]" format (endpoint.path, endpoint.replyTimeout, endpoint)))) - case Left(throwable) ⇒ exchange.setFailure(CamelFailure(throwable)) + case Right(failure: FailureResult) ⇒ exchange.setFailure(failure) + case Right(msg) ⇒ exchange.setFailure(FailureResult(new IllegalArgumentException("Expected Ack or Failure message, but got: [%s] from actor [%s]" format (msg, endpoint.path)))) + case Left(e: TimeoutException) ⇒ exchange.setFailure(FailureResult(new TimeoutException("Failed to get Ack or Failure response from the actor [%s] within timeout [%s]. Check replyTimeout and blocking settings [%s]" format (endpoint.path, endpoint.replyTimeout, endpoint)))) + case Left(throwable) ⇒ exchange.setFailure(FailureResult(throwable)) } private def sendAsync(message: CamelMessage, onComplete: PartialFunction[Either[Throwable, Any], Unit]): Boolean = { @@ -199,7 +199,7 @@ private[camel] class ActorProducer(val endpoint: ActorEndpoint, camel: Camel) ex try { actorFor(endpoint.path) ! message } catch { - case e ⇒ exchange.setFailure(new CamelFailure(e)) + case e ⇒ exchange.setFailure(new FailureResult(e)) } } diff --git a/akka-camel/src/test/java/akka/camel/SampleErrorHandlingConsumer.java b/akka-camel/src/test/java/akka/camel/SampleErrorHandlingConsumer.java index 0622a9a51a..5bde5f8976 100644 --- a/akka-camel/src/test/java/akka/camel/SampleErrorHandlingConsumer.java +++ b/akka-camel/src/test/java/akka/camel/SampleErrorHandlingConsumer.java @@ -4,6 +4,7 @@ package akka.camel; +import akka.actor.Status; import akka.camel.javaapi.UntypedConsumerActor; import akka.util.Duration; import org.apache.camel.builder.Builder; @@ -41,7 +42,7 @@ public class SampleErrorHandlingConsumer extends UntypedConsumerActor { @Override public void preRestart(Throwable reason, Option message){ - getSender().tell(new Failure(reason)); + getSender().tell(new Status.Failure(reason)); } } diff --git a/akka-camel/src/test/scala/akka/camel/CamelExchangeAdapterTest.scala b/akka-camel/src/test/scala/akka/camel/CamelExchangeAdapterTest.scala index fd633b85cb..1b08df493c 100644 --- a/akka-camel/src/test/scala/akka/camel/CamelExchangeAdapterTest.scala +++ b/akka-camel/src/test/scala/akka/camel/CamelExchangeAdapterTest.scala @@ -39,10 +39,10 @@ class CamelExchangeAdapterTest extends FunSuite with SharedCamelSystem with Mess test("mustSetExceptionFromFailureMessage") { val e1 = sampleInOnly - e1.setFailure(Failure(new Exception("test1"))) + e1.setFailure(FailureResult(new Exception("test1"))) assert(e1.getException.getMessage === "test1") val e2 = sampleInOut - e2.setFailure(Failure(new Exception("test2"))) + e2.setFailure(FailureResult(new Exception("test2"))) assert(e2.getException.getMessage === "test2") } @@ -103,7 +103,7 @@ class CamelExchangeAdapterTest extends FunSuite with SharedCamelSystem with Mess assert(headers("x") === "y") assert(e1.toFailureMessage.cause.getMessage === "test1") - val failureHeaders = e1.toFailureMessage(Map("x" -> "y")).headers + val failureHeaders = e1.toFailureResult(Map("x" -> "y")).headers assert(failureHeaders("key-in") === "val-in") assert(failureHeaders("x") === "y") @@ -117,7 +117,7 @@ class CamelExchangeAdapterTest extends FunSuite with SharedCamelSystem with Mess assert(headers("key-out") === "val-out") assert(headers("x") === "y") assert(e1.toFailureMessage.cause.getMessage === "test2") - val failureHeaders = e1.toFailureMessage(Map("x" -> "y")).headers + val failureHeaders = e1.toFailureResult(Map("x" -> "y")).headers assert(failureHeaders("key-out") === "val-out") assert(failureHeaders("x") === "y") } diff --git a/akka-camel/src/test/scala/akka/camel/ConsumerIntegrationTest.scala b/akka-camel/src/test/scala/akka/camel/ConsumerIntegrationTest.scala index 46070337c7..5a0d52572f 100644 --- a/akka-camel/src/test/scala/akka/camel/ConsumerIntegrationTest.scala +++ b/akka-camel/src/test/scala/akka/camel/ConsumerIntegrationTest.scala @@ -15,6 +15,7 @@ import org.apache.camel.{ FailedToCreateRouteException, CamelExecutionException import java.util.concurrent.{ ExecutionException, TimeUnit, TimeoutException } import akka.testkit.TestLatch import akka.dispatch.Await +import akka.actor.Status.Failure class ConsumerIntegrationTest extends WordSpec with MustMatchers with NonSharedCamelSystem { private val defaultTimeout = 10 diff --git a/akka-camel/src/test/scala/akka/camel/ProducerFeatureTest.scala b/akka-camel/src/test/scala/akka/camel/ProducerFeatureTest.scala index 8c1ef22b92..477e393026 100644 --- a/akka-camel/src/test/scala/akka/camel/ProducerFeatureTest.scala +++ b/akka-camel/src/test/scala/akka/camel/ProducerFeatureTest.scala @@ -7,7 +7,7 @@ package akka.camel import org.apache.camel.{ Exchange, Processor } import org.apache.camel.builder.RouteBuilder import org.apache.camel.component.mock.MockEndpoint -import akka.dispatch.{ Future, Await } +import akka.dispatch.Await import akka.camel.TestSupport.SharedCamelSystem import akka.actor.SupervisorStrategy.Stop import org.scalatest.{ GivenWhenThen, BeforeAndAfterEach, BeforeAndAfterAll, WordSpec } @@ -71,18 +71,16 @@ class ProducerFeatureTest extends WordSpec with BeforeAndAfterAll with BeforeAnd case _: AkkaCamelException ⇒ Stop } })) - val producer = Await.result[ActorRef](supervisor.ask(Props(new TestProducer("direct:producer-test-2"))).asInstanceOf[Future[ActorRef]], timeoutDuration) + val producer = Await.result[ActorRef](supervisor.ask(Props(new TestProducer("direct:producer-test-2"))).mapTo[ActorRef], timeoutDuration) when("a test message causing an exception is sent to the producer with ?") val message = CamelMessage("fail", Map(CamelMessage.MessageExchangeId -> "123")) - val future = producer.ask(message)(timeoutDuration) - Await.result(future, timeoutDuration) match { - case result: Failure ⇒ + val future = producer.ask(message)(timeoutDuration).failed + Await.ready(future, timeoutDuration).value match { + case Some(Right(e: AkkaCamelException)) ⇒ then("a failure response must have been returned by the producer") - val expectedFailureText = result.cause.getMessage - val expectedHeaders = result.headers - assert(expectedFailureText === "failure") - assert(expectedHeaders === Map(CamelMessage.MessageExchangeId -> "123")) + e.getMessage must be("failure") + e.headers must be(Map(CamelMessage.MessageExchangeId -> "123")) case unexpected ⇒ fail("Actor responded with unexpected message:" + unexpected) } then("an AkkaCamelException must have been thrown, which can be used for supervision") @@ -130,7 +128,7 @@ class ProducerFeatureTest extends WordSpec with BeforeAndAfterAll with BeforeAnd case result: CamelMessage ⇒ then("a normal response must have been returned by the producer") val expected = CamelMessage("received test", Map(CamelMessage.MessageExchangeId -> "123")) - assert(result === expected) + result must be(expected) case unexpected ⇒ fail("Actor responded with unexpected message:" + unexpected) } } @@ -141,14 +139,13 @@ class ProducerFeatureTest extends WordSpec with BeforeAndAfterAll with BeforeAnd when("a test message causing an exception is sent to the producer with ?") val message = CamelMessage("fail", Map(CamelMessage.MessageExchangeId -> "123")) - val future = producer.ask(message)(timeoutDuration) - Await.result(future, timeoutDuration) match { - case result: Failure ⇒ + + val future = producer.ask(message)(timeoutDuration).failed + Await.ready(future, timeoutDuration).value match { + case Some(Right(e: AkkaCamelException)) ⇒ then("a failure response must have been returned by the producer") - val expectedFailureText = result.cause.getMessage - val expectedHeaders = result.headers - assert(expectedFailureText === "failure") - assert(expectedHeaders === Map(CamelMessage.MessageExchangeId -> "123")) + e.getMessage must be("failure") + e.headers must be(Map(CamelMessage.MessageExchangeId -> "123")) case unexpected ⇒ fail("Actor responded with unexpected message:" + unexpected) } } @@ -166,7 +163,7 @@ class ProducerFeatureTest extends WordSpec with BeforeAndAfterAll with BeforeAnd case result: CamelMessage ⇒ then("a normal response must have been returned by the forward target") val expected = CamelMessage("received test", Map(CamelMessage.MessageExchangeId -> "123", "test" -> "result")) - assert(result === expected) + result must be(expected) case unexpected ⇒ fail("Actor responded with unexpected message:" + unexpected) } } @@ -178,14 +175,12 @@ class ProducerFeatureTest extends WordSpec with BeforeAndAfterAll with BeforeAnd when("a test message causing an exception is sent to the producer with ?") val message = CamelMessage("fail", Map(CamelMessage.MessageExchangeId -> "123")) - val future = producer.ask(message)(timeoutDuration) - Await.result(future, timeoutDuration) match { - case failure: Failure ⇒ + val future = producer.ask(message)(timeoutDuration).failed + Await.ready(future, timeoutDuration).value match { + case Some(Right(e: AkkaCamelException)) ⇒ then("a failure response must have been returned by the forward target") - val expectedFailureText = failure.cause.getMessage - val expectedHeaders = failure.headers - assert(expectedFailureText === "failure") - assert(expectedHeaders === Map(CamelMessage.MessageExchangeId -> "123", "test" -> "failure")) + e.getMessage must be("failure") + e.headers must be(Map(CamelMessage.MessageExchangeId -> "123", "test" -> "failure")) case unexpected ⇒ fail("Actor responded with unexpected message:" + unexpected) } } @@ -211,7 +206,7 @@ class ProducerFeatureTest extends WordSpec with BeforeAndAfterAll with BeforeAnd when("a test message causing an exception is sent to the producer with !") mockEndpoint.expectedMessageCount(1) - mockEndpoint.message(0).body().isInstanceOf(classOf[Failure]) + mockEndpoint.message(0).body().isInstanceOf(classOf[akka.actor.Status.Failure]) producer.tell(CamelMessage("fail", Map()), producer) then("a failure response must have been produced by the forward target") @@ -232,7 +227,7 @@ class ProducerFeatureTest extends WordSpec with BeforeAndAfterAll with BeforeAnd Await.result(future, timeoutDuration) match { case message: CamelMessage ⇒ val expected = CamelMessage("received test", Map(CamelMessage.MessageExchangeId -> "123", "test" -> "result")) - assert(message === expected) + message must be(expected) case unexpected ⇒ fail("Actor responded with unexpected message:" + unexpected) } } @@ -244,14 +239,12 @@ class ProducerFeatureTest extends WordSpec with BeforeAndAfterAll with BeforeAnd when("a test message causing an exception is sent to the producer with ask") val message = CamelMessage("fail", Map(CamelMessage.MessageExchangeId -> "123")) - val future = producer.ask(message)(timeoutDuration) - Await.result(future, timeoutDuration) match { - case failure: Failure ⇒ + val future = producer.ask(message)(timeoutDuration).failed + Await.ready(future, timeoutDuration).value match { + case Some(Right(e: AkkaCamelException)) ⇒ then("a failure response must have been returned by the forward target") - val expectedFailureText = failure.cause.getMessage - val expectedHeaders = failure.headers - assert(expectedFailureText === "failure") - assert(expectedHeaders === Map(CamelMessage.MessageExchangeId -> "123", "test" -> "failure")) + e.getMessage must be("failure") + e.headers must be(Map(CamelMessage.MessageExchangeId -> "123", "test" -> "failure")) case unexpected ⇒ fail("Actor responded with unexpected message:" + unexpected) } } @@ -276,7 +269,7 @@ class ProducerFeatureTest extends WordSpec with BeforeAndAfterAll with BeforeAnd when("a test message causing an exception is sent to the producer with !") mockEndpoint.expectedMessageCount(1) - mockEndpoint.message(0).body().isInstanceOf(classOf[Failure]) + mockEndpoint.message(0).body().isInstanceOf(classOf[akka.actor.Status.Failure]) producer.tell(CamelMessage("fail", Map()), producer) then("a failure response must have been produced by the forward target") @@ -303,13 +296,15 @@ object ProducerFeatureTest { class TestForwarder(uri: String, target: ActorRef) extends Actor with Producer { def endpointUri = uri - override protected def routeResponse(msg: Any): Unit = target forward msg + override def headersToCopy = Set(CamelMessage.MessageExchangeId, "test") + + override def routeResponse(msg: Any): Unit = target forward msg } class TestResponder extends Actor { protected def receive = { case msg: CamelMessage ⇒ msg.body match { - case "fail" ⇒ context.sender ! (Failure(new Exception("failure"), msg.headers)) + case "fail" ⇒ context.sender ! akka.actor.Status.Failure(new AkkaCamelException(new Exception("failure"), msg.headers)) case _ ⇒ context.sender ! (msg.mapBody { body: String ⇒ "received %s" format body @@ -322,8 +317,10 @@ object ProducerFeatureTest { protected def receive = { case msg: CamelMessage ⇒ context.sender ! (msg.addHeader("test" -> "result")) - case msg: Failure ⇒ - context.sender ! (Failure(msg.cause, msg.headers + ("test" -> "failure"))) + case msg: akka.actor.Status.Failure ⇒ + msg.cause match { + case e: AkkaCamelException ⇒ context.sender ! Status.Failure(new AkkaCamelException(e, e.headers + ("test" -> "failure"))) + } } } diff --git a/akka-camel/src/test/scala/akka/camel/UntypedProducerTest.scala b/akka-camel/src/test/scala/akka/camel/UntypedProducerTest.scala index aa7b8e3931..411aa0b938 100644 --- a/akka-camel/src/test/scala/akka/camel/UntypedProducerTest.scala +++ b/akka-camel/src/test/scala/akka/camel/UntypedProducerTest.scala @@ -14,8 +14,9 @@ import akka.pattern._ import akka.dispatch.Await import akka.util.duration._ import org.scalatest._ +import matchers.MustMatchers -class UntypedProducerTest extends WordSpec with BeforeAndAfterAll with BeforeAndAfterEach with SharedCamelSystem with GivenWhenThen { +class UntypedProducerTest extends WordSpec with MustMatchers with BeforeAndAfterAll with BeforeAndAfterEach with SharedCamelSystem with GivenWhenThen { import UntypedProducerTest._ val timeout = 1 second override protected def beforeAll = { @@ -38,7 +39,7 @@ class UntypedProducerTest extends WordSpec with BeforeAndAfterAll with BeforeAnd then("a normal response should have been returned by the producer") val expected = CamelMessage("received test", Map(CamelMessage.MessageExchangeId -> "123")) Await.result(future, timeout) match { - case result: CamelMessage ⇒ assert(result === expected) + case result: CamelMessage ⇒ result must be(expected) case unexpected ⇒ fail("Actor responded with unexpected message:" + unexpected) } @@ -50,19 +51,15 @@ class UntypedProducerTest extends WordSpec with BeforeAndAfterAll with BeforeAnd when("a test message causing an exception is sent to the producer with ask") val message = CamelMessage("fail", Map(CamelMessage.MessageExchangeId -> "123")) - val future = producer.ask(message)(timeout) + val future = producer.ask(message)(timeout).failed then("a failure response should have been returned by the producer") - Await.result(future, timeout) match { - case result: Failure ⇒ { - val expectedFailureText = result.cause.getMessage - val expectedHeaders = result.headers - assert(expectedFailureText === "failure") - assert(expectedHeaders === Map(CamelMessage.MessageExchangeId -> "123")) - } + Await.ready(future, timeout).value match { + case Some(Right(e: AkkaCamelException)) ⇒ + e.getMessage must be("failure") + e.headers must be(Map(CamelMessage.MessageExchangeId -> "123")) case unexpected ⇒ fail("Actor responded with unexpected message:" + unexpected) } } - } "An UntypedProducer producing a message to a sync Camel route and then forwarding the response" must { @@ -73,7 +70,7 @@ class UntypedProducerTest extends WordSpec with BeforeAndAfterAll with BeforeAnd when("a test message is sent to the producer with !") mockEndpoint.expectedBodiesReceived("received test") - val result = producer.tell(CamelMessage("test", Map[String, Any]()), producer) + producer.tell(CamelMessage("test", Map[String, Any]()), producer) then("a normal response should have been sent") mockEndpoint.assertIsSatisfied diff --git a/akka-camel/src/test/scala/akka/camel/internal/component/ActorProducerTest.scala b/akka-camel/src/test/scala/akka/camel/internal/component/ActorProducerTest.scala index 0ff385c91c..8146b17399 100644 --- a/akka-camel/src/test/scala/akka/camel/internal/component/ActorProducerTest.scala +++ b/akka-camel/src/test/scala/akka/camel/internal/component/ActorProducerTest.scala @@ -21,6 +21,7 @@ import akka.camel.TestSupport._ import java.util.concurrent.{ TimeoutException, CountDownLatch, TimeUnit } import org.mockito.{ ArgumentMatcher, Matchers, Mockito } import org.scalatest.matchers.MustMatchers +import akka.actor.Status.Failure class ActorProducerTest extends TestKit(ActorSystem("test")) with WordSpec with MustMatchers with ActorProducerFixture { @@ -33,7 +34,7 @@ class ActorProducerTest extends TestKit(ActorSystem("test")) with WordSpec with producer = given(actor = null) producer.processExchangeAdapter(exchange) - verify(exchange).setFailure(any[Failure]) + verify(exchange).setFailure(any[FailureResult]) } } @@ -82,7 +83,7 @@ class ActorProducerTest extends TestKit(ActorSystem("test")) with WordSpec with "set failure message to timeout" in { process() - verify(exchange).setFailure(any[Failure]) + verify(exchange).setFailure(any[FailureResult]) } } @@ -97,7 +98,7 @@ class ActorProducerTest extends TestKit(ActorSystem("test")) with WordSpec with def verifyFailureIsSet { producer.processExchangeAdapter(exchange, asyncCallback) asyncCallback.awaitCalled() - verify(exchange).setFailure(any[Failure]) + verify(exchange).setFailure(any[FailureResult]) } "out-capable" when { @@ -130,7 +131,8 @@ class ActorProducerTest extends TestKit(ActorSystem("test")) with WordSpec with "response is Failure" must { "set an exception on exchange" in { - val failure = Failure(new RuntimeException("some failure")) + val exception = new RuntimeException("some failure") + val failure = Failure(exception) producer = given(outCapable = true) @@ -142,7 +144,7 @@ class ActorProducerTest extends TestKit(ActorSystem("test")) with WordSpec with asyncCallback.awaitCalled(remaining) } - verify(exchange).setFailure(failure) + verify(exchange).setFailure(FailureResult(exception)) } } @@ -151,8 +153,8 @@ class ActorProducerTest extends TestKit(ActorSystem("test")) with WordSpec with producer = given(outCapable = true, replyTimeout = 10 millis) producer.processExchangeAdapter(exchange, asyncCallback) asyncCallback.awaitCalled(100 millis) - verify(exchange).setFailure(Matchers.argThat(new ArgumentMatcher[Failure] { - def matches(failure: AnyRef) = { failure.asInstanceOf[Failure].getCause must be(anInstanceOf[TimeoutException]); true } + verify(exchange).setFailure(Matchers.argThat(new ArgumentMatcher[FailureResult] { + def matches(failure: AnyRef) = { failure.asInstanceOf[FailureResult].cause must be(anInstanceOf[TimeoutException]); true } })) } @@ -213,7 +215,7 @@ class ActorProducerTest extends TestKit(ActorSystem("test")) with WordSpec with asyncCallback.expectDoneAsyncWithin(remaining); info("async callback called") } verify(exchange, never()).setResponse(any[CamelMessage]); info("no response forwarded to exchange") - verify(exchange).setFailure(any[Failure]); info("failure set") + verify(exchange).setFailure(any[FailureResult]); info("failure set") } } @@ -224,7 +226,7 @@ class ActorProducerTest extends TestKit(ActorSystem("test")) with WordSpec with producer.processExchangeAdapter(exchange, asyncCallback) asyncCallback.awaitCalled(100 millis) - verify(exchange).setFailure(any[Failure]) + verify(exchange).setFailure(any[FailureResult]) } } @@ -242,7 +244,7 @@ class ActorProducerTest extends TestKit(ActorSystem("test")) with WordSpec with asyncCallback.awaitCalled(remaining); } verify(exchange, never()).setResponse(any[CamelMessage]); info("no response forwarded to exchange") - verify(exchange).setFailure(any[Failure]); info("failure set") + verify(exchange).setFailure(any[FailureResult]); info("failure set") } } }