From 8426ded00d12f1b84e7c83968a326cbe0969cd38 Mon Sep 17 00:00:00 2001 From: RayRoestenburg Date: Mon, 7 May 2012 14:18:06 +0200 Subject: [PATCH] ticket-1732, return Failure on producer ask _and_ throw AkkaCamelException so supervision can occur on camel failures --- .../main/scala/akka/camel/CamelMessage.scala | 12 +++- .../src/main/scala/akka/camel/Producer.scala | 7 +- .../camel/internal/CamelExchangeAdapter.scala | 41 +++++++---- .../internal/component/ActorComponent.scala | 2 +- .../akka/camel/CamelExchangeAdapterTest.scala | 22 ++++-- .../akka/camel/ProducerFeatureTest.scala | 72 +++++++++++++------ 6 files changed, 112 insertions(+), 44 deletions(-) diff --git a/akka-camel/src/main/scala/akka/camel/CamelMessage.scala b/akka-camel/src/main/scala/akka/camel/CamelMessage.scala index 45787562a7..0c1a77bb58 100644 --- a/akka-camel/src/main/scala/akka/camel/CamelMessage.scala +++ b/akka-camel/src/main/scala/akka/camel/CamelMessage.scala @@ -10,6 +10,7 @@ import scala.collection.JavaConversions._ import akka.japi.{ Function ⇒ JFunction } import org.apache.camel.{ CamelContext, Message ⇒ JCamelMessage } +import akka.AkkaException /** * An immutable representation of a Camel message. @@ -273,4 +274,13 @@ case class Failure(val cause: Throwable, val headers: Map[String, Any] = Map.emp * Java API */ def getHeaders: JMap[String, Any] = headers -} \ No newline at end of file +} + +/** + * An exception indicating that the exchange to the camel endpoint failed. + * It contains the failure cause obtained from Exchange.getException and the headers from either the Exchange.getIn + * message or Exchange.getOut message, depending on the exchange pattern. + * + */ +class AkkaCamelException private[akka] (cause: Throwable, val headers: Map[String, Any] = Map.empty) + extends AkkaException(cause.getMessage, cause) diff --git a/akka-camel/src/main/scala/akka/camel/Producer.scala b/akka-camel/src/main/scala/akka/camel/Producer.scala index d42b78911a..a9990f4ec1 100644 --- a/akka-camel/src/main/scala/akka/camel/Producer.scala +++ b/akka-camel/src/main/scala/akka/camel/Producer.scala @@ -89,13 +89,14 @@ trait ProducerSupport { this: Actor ⇒ */ protected def produce: Receive = { case res: MessageResult ⇒ routeResponse(res.message) - case res: FailureResult ⇒ routeResponse(res.failure) - case msg ⇒ { + case res: FailureResult ⇒ + routeResponse(res.failure) + throw new AkkaCamelException(res.failure.cause, res.failure.headers) + case msg ⇒ if (oneway) produce(transformOutgoingMessage(msg), ExchangePattern.InOnly) else produce(transformOutgoingMessage(msg), ExchangePattern.InOut) - } } /** diff --git a/akka-camel/src/main/scala/akka/camel/internal/CamelExchangeAdapter.scala b/akka-camel/src/main/scala/akka/camel/internal/CamelExchangeAdapter.scala index 51e0ef5ca0..bf09685cca 100644 --- a/akka-camel/src/main/scala/akka/camel/internal/CamelExchangeAdapter.scala +++ b/akka-camel/src/main/scala/akka/camel/internal/CamelExchangeAdapter.scala @@ -5,7 +5,7 @@ import scala.collection.JavaConversions._ import org.apache.camel.util.ExchangeHelper import org.apache.camel.{ Exchange, Message ⇒ JCamelMessage } -import akka.camel.{ Failure, CamelMessage } +import akka.camel.{ Failure, AkkaCamelException, CamelMessage } /** * For internal use only. @@ -53,6 +53,24 @@ private[camel] class CamelExchangeAdapter(exchange: Exchange) { */ def toResponseMessage: CamelMessage = toResponseMessage(Map.empty) + /** + * Creates an AkkaCamelException object from the adapted Exchange. + * + * @see AkkaCamelException + */ + def toAkkaCamelException: AkkaCamelException = toAkkaCamelException(Map.empty) + + /** + * Creates an AkkaCamelException object from the adapted Exchange. + * + * @param headers additional headers to set on the created CamelMessage in addition to those + * in the Camel message. + * + * @see AkkaCamelException + */ + def toAkkaCamelException(headers: Map[String, Any]): AkkaCamelException = + new AkkaCamelException(exchange.getException, headers ++ response.getHeaders) + /** * Creates a Failure object from the adapted Exchange. * @@ -60,6 +78,16 @@ private[camel] class CamelExchangeAdapter(exchange: Exchange) { */ def toFailureMessage: Failure = toFailureMessage(Map.empty) + /** + * Creates a Failure object from the adapted Exchange. + * + * @param headers additional headers to set on the created CamelMessage in addition to those + * in the Camel message. + * + * @see Failure + */ + def toFailureMessage(headers: Map[String, Any]): Failure = Failure(exchange.getException, headers ++ response.getHeaders) + /** * Creates a CamelMessage object from Exchange.getIn. * @@ -77,17 +105,6 @@ private[camel] class CamelExchangeAdapter(exchange: Exchange) { */ def toResponseMessage(headers: Map[String, Any]): CamelMessage = CamelMessage.from(response, headers) - /** - * Creates a Failure object from the adapted Exchange. - * - * @param headers additional headers to set on the created CamelMessage in addition to those - * in the Camel message. - * - * @see Failure - */ - def toFailureMessage(headers: Map[String, Any]): Failure = - Failure(exchange.getException, headers ++ response.getHeaders) - private def request = exchange.getIn private def response: JCamelMessage = ExchangeHelper.getResultMessage(exchange) diff --git a/akka-camel/src/main/scala/akka/camel/internal/component/ActorComponent.scala b/akka-camel/src/main/scala/akka/camel/internal/component/ActorComponent.scala index 31071a5a35..bf284c37c8 100644 --- a/akka-camel/src/main/scala/akka/camel/internal/component/ActorComponent.scala +++ b/akka-camel/src/main/scala/akka/camel/internal/component/ActorComponent.scala @@ -170,7 +170,7 @@ private[camel] class ActorProducer(val endpoint: ActorEndpoint, camel: Camel) ex } private def forwardResponseTo(exchange: CamelExchangeAdapter): PartialFunction[Either[Throwable, Any], Unit] = { - case Right(failure: CamelFailure) ⇒ exchange.setFailure(failure); + case Right(failure: CamelFailure) ⇒ exchange.setFailure(failure) case Right(msg) ⇒ exchange.setResponse(CamelMessage.canonicalize(msg)) case Left(e: TimeoutException) ⇒ exchange.setFailure(CamelFailure(new TimeoutException("Failed to get response from the actor [%s] within timeout [%s]. Check replyTimeout and blocking settings [%s]" format (endpoint.path, endpoint.replyTimeout, endpoint)))) case Left(throwable) ⇒ exchange.setFailure(CamelFailure(throwable)) diff --git a/akka-camel/src/test/scala/akka/camel/CamelExchangeAdapterTest.scala b/akka-camel/src/test/scala/akka/camel/CamelExchangeAdapterTest.scala index 676062a2b7..fd633b85cb 100644 --- a/akka-camel/src/test/scala/akka/camel/CamelExchangeAdapterTest.scala +++ b/akka-camel/src/test/scala/akka/camel/CamelExchangeAdapterTest.scala @@ -64,6 +64,8 @@ class CamelExchangeAdapterTest extends FunSuite with SharedCamelSystem with Mess test("mustCreateFailureMessageFromExceptionAndInMessage") { val e1 = sampleInOnly e1.setException(new Exception("test1")) + assert(e1.toAkkaCamelException.getMessage === "test1") + assert(e1.toAkkaCamelException.headers("key-in") === "val-in") assert(e1.toFailureMessage.cause.getMessage === "test1") assert(e1.toFailureMessage.headers("key-in") === "val-in") } @@ -71,6 +73,8 @@ class CamelExchangeAdapterTest extends FunSuite with SharedCamelSystem with Mess test("mustCreateFailureMessageFromExceptionAndOutMessage") { val e1 = sampleInOut e1.setException(new Exception("test2")) + assert(e1.toAkkaCamelException.getMessage === "test2") + assert(e1.toAkkaCamelException.headers("key-out") === "val-out") assert(e1.toFailureMessage.cause.getMessage === "test2") assert(e1.toFailureMessage.headers("key-out") === "val-out") } @@ -93,19 +97,29 @@ class CamelExchangeAdapterTest extends FunSuite with SharedCamelSystem with Mess test("mustCreateFailureMessageFromExceptionAndInMessageWithAdditionalHeader") { val e1 = sampleInOnly e1.setException(new Exception("test1")) - assert(e1.toFailureMessage.cause.getMessage === "test1") - val headers = e1.toFailureMessage(Map("x" -> "y")).headers + assert(e1.toAkkaCamelException.getMessage === "test1") + val headers = e1.toAkkaCamelException(Map("x" -> "y")).headers assert(headers("key-in") === "val-in") assert(headers("x") === "y") + + assert(e1.toFailureMessage.cause.getMessage === "test1") + val failureHeaders = e1.toFailureMessage(Map("x" -> "y")).headers + assert(failureHeaders("key-in") === "val-in") + assert(failureHeaders("x") === "y") + } test("mustCreateFailureMessageFromExceptionAndOutMessageWithAdditionalHeader") { val e1 = sampleInOut e1.setException(new Exception("test2")) - assert(e1.toFailureMessage.cause.getMessage === "test2") - val headers = e1.toFailureMessage(Map("x" -> "y")).headers + assert(e1.toAkkaCamelException.getMessage === "test2") + val headers = e1.toAkkaCamelException(Map("x" -> "y")).headers assert(headers("key-out") === "val-out") assert(headers("x") === "y") + assert(e1.toFailureMessage.cause.getMessage === "test2") + val failureHeaders = e1.toFailureMessage(Map("x" -> "y")).headers + assert(failureHeaders("key-out") === "val-out") + assert(failureHeaders("x") === "y") } private def sampleInOnly = sampleExchange(ExchangePattern.InOnly) diff --git a/akka-camel/src/test/scala/akka/camel/ProducerFeatureTest.scala b/akka-camel/src/test/scala/akka/camel/ProducerFeatureTest.scala index 44ce2540c8..8c1ef22b92 100644 --- a/akka-camel/src/test/scala/akka/camel/ProducerFeatureTest.scala +++ b/akka-camel/src/test/scala/akka/camel/ProducerFeatureTest.scala @@ -7,17 +7,21 @@ package akka.camel import org.apache.camel.{ Exchange, Processor } import org.apache.camel.builder.RouteBuilder import org.apache.camel.component.mock.MockEndpoint +import akka.dispatch.{ Future, Await } +import akka.camel.TestSupport.SharedCamelSystem +import akka.actor.SupervisorStrategy.Stop +import org.scalatest.{ GivenWhenThen, BeforeAndAfterEach, BeforeAndAfterAll, WordSpec } import akka.actor._ import akka.pattern._ -import akka.dispatch.Await import akka.util.duration._ -import akka.camel.TestSupport.SharedCamelSystem -import org.scalatest._ +import akka.util.Timeout +import akka.testkit.TestLatch +import org.scalatest.matchers.MustMatchers /** * Tests the features of the Camel Producer. */ -class ProducerFeatureTest extends WordSpec with BeforeAndAfterAll with BeforeAndAfterEach with SharedCamelSystem with GivenWhenThen { +class ProducerFeatureTest extends WordSpec with BeforeAndAfterAll with BeforeAndAfterEach with SharedCamelSystem with GivenWhenThen with MustMatchers { import ProducerFeatureTest._ @@ -25,8 +29,8 @@ class ProducerFeatureTest extends WordSpec with BeforeAndAfterAll with BeforeAnd // to make testing equality of messages easier, otherwise the breadcrumb shows up in the result. camelContext.setUseBreadcrumb(false) - val timeout = 1 second - + val timeoutDuration = 1 second + implicit val timeout = Timeout(timeoutDuration) override protected def beforeAll { camelContext.addRoutes(new TestRoute(system)) } override protected def afterEach { mockEndpoint.reset() } @@ -38,10 +42,10 @@ class ProducerFeatureTest extends WordSpec with BeforeAndAfterAll with BeforeAnd val producer = system.actorOf(Props(new TestProducer("direct:producer-test-2", true))) when("a test message is sent to the producer with ?") val message = CamelMessage("test", Map(CamelMessage.MessageExchangeId -> "123")) - val future = producer.ask(message)(timeout) + val future = producer.ask(message)(timeoutDuration) then("a normal response must have been returned by the producer") val expected = CamelMessage("received TEST", Map(CamelMessage.MessageExchangeId -> "123")) - Await.result(future, timeout) match { + Await.result(future, timeoutDuration) match { case result: CamelMessage ⇒ assert(result === expected) case unexpected ⇒ fail("Actor responded with unexpected message:" + unexpected) } @@ -49,12 +53,30 @@ class ProducerFeatureTest extends WordSpec with BeforeAndAfterAll with BeforeAnd "produce a message and receive failure response" in { given("a registered two-way producer") - val producer = system.actorOf(Props(new TestProducer("direct:producer-test-2"))) + val latch = TestLatch() + var deadActor: Option[ActorRef] = None + val supervisor = system.actorOf(Props(new Actor { + def receive = { + case p: Props ⇒ { + val producer = context.actorOf(p) + context.watch(producer) + sender ! producer + } + case Terminated(actorRef) ⇒ { + deadActor = Some(actorRef) + latch.countDown() + } + } + override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1 minute) { + case _: AkkaCamelException ⇒ Stop + } + })) + val producer = Await.result[ActorRef](supervisor.ask(Props(new TestProducer("direct:producer-test-2"))).asInstanceOf[Future[ActorRef]], timeoutDuration) when("a test message causing an exception is sent to the producer with ?") val message = CamelMessage("fail", Map(CamelMessage.MessageExchangeId -> "123")) - val future = producer.ask(message)(timeout) - Await.result(future, timeout) match { + val future = producer.ask(message)(timeoutDuration) + Await.result(future, timeoutDuration) match { case result: Failure ⇒ then("a failure response must have been returned by the producer") val expectedFailureText = result.cause.getMessage @@ -63,6 +85,10 @@ class ProducerFeatureTest extends WordSpec with BeforeAndAfterAll with BeforeAnd assert(expectedHeaders === Map(CamelMessage.MessageExchangeId -> "123")) case unexpected ⇒ fail("Actor responded with unexpected message:" + unexpected) } + then("an AkkaCamelException must have been thrown, which can be used for supervision") + // check that the supervisor stopped the producer and received a Terminated + Await.ready(latch, timeoutDuration) + deadActor must be(Some(producer)) } "produce a message oneway" in { @@ -98,9 +124,9 @@ class ProducerFeatureTest extends WordSpec with BeforeAndAfterAll with BeforeAnd when("a test message is sent to the producer with ?") val message = CamelMessage("test", Map(CamelMessage.MessageExchangeId -> "123")) - val future = producer.ask(message)(timeout) + val future = producer.ask(message)(timeoutDuration) - Await.result(future, timeout) match { + Await.result(future, timeoutDuration) match { case result: CamelMessage ⇒ then("a normal response must have been returned by the producer") val expected = CamelMessage("received test", Map(CamelMessage.MessageExchangeId -> "123")) @@ -115,8 +141,8 @@ class ProducerFeatureTest extends WordSpec with BeforeAndAfterAll with BeforeAnd when("a test message causing an exception is sent to the producer with ?") val message = CamelMessage("fail", Map(CamelMessage.MessageExchangeId -> "123")) - val future = producer.ask(message)(timeout) - Await.result(future, timeout) match { + val future = producer.ask(message)(timeoutDuration) + Await.result(future, timeoutDuration) match { case result: Failure ⇒ then("a failure response must have been returned by the producer") val expectedFailureText = result.cause.getMessage @@ -134,9 +160,9 @@ class ProducerFeatureTest extends WordSpec with BeforeAndAfterAll with BeforeAnd when("a test message is sent to the producer with ?") val message = CamelMessage("test", Map(CamelMessage.MessageExchangeId -> "123")) - val future = producer.ask(message)(timeout) + val future = producer.ask(message)(timeoutDuration) - Await.result(future, timeout) match { + Await.result(future, timeoutDuration) match { case result: CamelMessage ⇒ then("a normal response must have been returned by the forward target") val expected = CamelMessage("received test", Map(CamelMessage.MessageExchangeId -> "123", "test" -> "result")) @@ -152,8 +178,8 @@ class ProducerFeatureTest extends WordSpec with BeforeAndAfterAll with BeforeAnd when("a test message causing an exception is sent to the producer with ?") val message = CamelMessage("fail", Map(CamelMessage.MessageExchangeId -> "123")) - val future = producer.ask(message)(timeout) - Await.result(future, timeout) match { + val future = producer.ask(message)(timeoutDuration) + Await.result(future, timeoutDuration) match { case failure: Failure ⇒ then("a failure response must have been returned by the forward target") val expectedFailureText = failure.cause.getMessage @@ -200,10 +226,10 @@ class ProducerFeatureTest extends WordSpec with BeforeAndAfterAll with BeforeAnd when("a test message is sent to the producer with ?") val message = CamelMessage("test", Map(CamelMessage.MessageExchangeId -> "123")) - val future = producer.ask(message)(timeout) + val future = producer.ask(message)(timeoutDuration) then("a normal response must have been returned by the forward target") - Await.result(future, timeout) match { + Await.result(future, timeoutDuration) match { case message: CamelMessage ⇒ val expected = CamelMessage("received test", Map(CamelMessage.MessageExchangeId -> "123", "test" -> "result")) assert(message === expected) @@ -218,8 +244,8 @@ class ProducerFeatureTest extends WordSpec with BeforeAndAfterAll with BeforeAnd when("a test message causing an exception is sent to the producer with ask") val message = CamelMessage("fail", Map(CamelMessage.MessageExchangeId -> "123")) - val future = producer.ask(message)(timeout) - Await.result(future, timeout) match { + val future = producer.ask(message)(timeoutDuration) + Await.result(future, timeoutDuration) match { case failure: Failure ⇒ then("a failure response must have been returned by the forward target") val expectedFailureText = failure.cause.getMessage