diff --git a/akka-camel/src/main/scala/Producer.scala b/akka-camel/src/main/scala/Producer.scala index 22173c97ca..aa37e11978 100644 --- a/akka-camel/src/main/scala/Producer.scala +++ b/akka-camel/src/main/scala/Producer.scala @@ -41,6 +41,8 @@ trait Producer { this: Actor => */ def oneway: Boolean = false + def forwardResultTo: Option[ActorRef] = None + /** * Returns the Camel endpoint URI to produce messages to. */ @@ -91,10 +93,27 @@ trait Producer { this: Actor => val senderFuture = self.senderFuture def done(doneSync: Boolean): Unit = { - val response = - if (exchange.isFailed) exchange.toFailureMessage(cmsg.headers(headersToCopy)) - else exchange.toResponseMessage(cmsg.headers(headersToCopy)) + val response = if (exchange.isFailed) + exchange.toFailureMessage(cmsg.headers(headersToCopy)) + else + exchange.toResponseMessage(cmsg.headers(headersToCopy)) + if (forwardResultTo.isDefined) + forward(response, forwardResultTo.get) + else + reply(response) + } + + private def forward(response: Any, target: ActorRef) = { + // TODO: avoid redundancy to ActorRef.forward + if (target.isRunning) { + if (senderFuture.isDefined) target.postMessageToMailboxAndCreateFutureResultWithTimeout(response, target.timeout, sender, senderFuture) + else target.postMessageToMailbox(response, sender) // initial sender doesn't need be an actor + } + } + + private def reply(response: Any) = { + // TODO: avoid redundancy to ActorRef.reply if (senderFuture.isDefined) senderFuture.get completeWithResult response else if (sender.isDefined) sender.get ! response else log.warning("No destination for sending response") diff --git a/akka-camel/src/test/scala/ProducerFeatureTest.scala b/akka-camel/src/test/scala/ProducerFeatureTest.scala index 6613864790..458250bcac 100644 --- a/akka-camel/src/test/scala/ProducerFeatureTest.scala +++ b/akka-camel/src/test/scala/ProducerFeatureTest.scala @@ -5,8 +5,8 @@ import org.apache.camel.builder.RouteBuilder import org.apache.camel.component.mock.MockEndpoint import org.scalatest.{GivenWhenThen, BeforeAndAfterEach, BeforeAndAfterAll, FeatureSpec} -import se.scalablesolutions.akka.actor.{Actor, ActorRegistry} import se.scalablesolutions.akka.actor.Actor._ +import se.scalablesolutions.akka.actor.{ActorRef, Actor, ActorRegistry} class ProducerFeatureTest extends FeatureSpec with BeforeAndAfterAll with BeforeAndAfterEach with GivenWhenThen { import ProducerFeatureTest._ @@ -27,30 +27,30 @@ class ProducerFeatureTest extends FeatureSpec with BeforeAndAfterAll with Before feature("Produce a message to a Camel endpoint") { - scenario("produce message and receive response") { - given("a registered asynchronous two-way producer for endpoint direct:producer-test-2") + scenario("produce message and receive normal response") { + given("a registered two-way producer") val producer = actorOf(new TestProducer("direct:producer-test-2")) producer.start - when("a test message is sent to the producer") + when("a test message is sent to the producer with !!") val message = Message("test", Map(Message.MessageExchangeId -> "123")) val result = producer !! message - then("the expected result message should be returned including a correlation identifier") + then("a normal response should have been returned by the producer") val expected = Message("received test", Map(Message.MessageExchangeId -> "123")) assert(result === Some(expected)) } - scenario("produce message and receive failure") { - given("a registered asynchronous two-way producer for endpoint direct:producer-test-2") + scenario("produce message and receive failure response") { + given("a registered two-way producer") val producer = actorOf(new TestProducer("direct:producer-test-2")) producer.start - when("a fail message is sent to the producer") + when("a test message causing an exception is sent to the producer with !!") val message = Message("fail", Map(Message.MessageExchangeId -> "123")) val result = (producer !! message).as[Failure] - then("the expected failure message should be returned including a correlation identifier") + then("a failure response should have been returned by the producer") val expectedFailureText = result.get.cause.getMessage val expectedHeaders = result.get.headers assert(expectedFailureText === "failure") @@ -58,24 +58,24 @@ class ProducerFeatureTest extends FeatureSpec with BeforeAndAfterAll with Before } scenario("produce message oneway") { - given("a registered asynchronous one-way producer for endpoint direct:producer-test-1") + given("a registered one-way producer") val producer = actorOf(new TestProducer("direct:producer-test-1") with Oneway) producer.start - when("a test message is sent to the producer") + when("a test message is sent to the producer with !") mockEndpoint.expectedBodiesReceived("test") producer ! Message("test") - then("the expected message should have been sent to mock:mock") + then("the test message should have been sent to mock:mock") mockEndpoint.assertIsSatisfied } scenario("produce message twoway without sender reference") { - given("a registered asynchronous two-way producer for endpoint direct:producer-test-1") + given("a registered two-way producer") val producer = actorOf(new TestProducer("direct:producer-test-1")) producer.start - when("a test message is sent to the producer") + when("a test message is sent to the producer with !") mockEndpoint.expectedBodiesReceived("test") producer ! Message("test") @@ -84,16 +84,91 @@ class ProducerFeatureTest extends FeatureSpec with BeforeAndAfterAll with Before } } + feature("Produce a message to a Camel endpoint and then forward the result") { + + scenario("produce message, forward and receive normal response") { + given("a registered two-way producer configured with a forward target") + val responder = actorOf[ReplyingForwardTarget].start + val producer = actorOf(new TestProducer("direct:producer-test-2", Some(responder))).start + + when("a test message is sent to the producer with !!") + val message = Message("test", Map(Message.MessageExchangeId -> "123")) + val result = producer !! message + + then("a normal response should have been returned by the forward target") + val expected = Message("received test", Map(Message.MessageExchangeId -> "123", "test" -> "result")) + assert(result === Some(expected)) + } + + scenario("produce message, forward and receive failure response") { + given("a registered two-way producer configured with a forward target") + val responder = actorOf[ReplyingForwardTarget].start + val producer = actorOf(new TestProducer("direct:producer-test-2", Some(responder))).start + + when("a test message causing an exception is sent to the producer with !!") + val message = Message("fail", Map(Message.MessageExchangeId -> "123")) + val result = (producer !! message).as[Failure] + + then("a failure response should have been returned by the forward target") + val expectedFailureText = result.get.cause.getMessage + val expectedHeaders = result.get.headers + assert(expectedFailureText === "failure") + assert(expectedHeaders === Map(Message.MessageExchangeId -> "123", "test" -> "failure")) + } + + scenario("produce message, forward and produce normal response") { + given("a registered one-way producer configured with a forward target") + val responder = actorOf[ProducingForwardTarget].start + val producer = actorOf(new TestProducer("direct:producer-test-2", Some(responder))).start + + when("a test message is sent to the producer with !") + mockEndpoint.expectedBodiesReceived("received test") + val result = producer ! Message("test") + + then("a normal response should have been produced by the forward target") + mockEndpoint.assertIsSatisfied + } + + scenario("produce message, forward and produce failure response") { + given("a registered one-way producer configured with a forward target") + val responder = actorOf[ProducingForwardTarget].start + val producer = actorOf(new TestProducer("direct:producer-test-2", Some(responder))).start + + when("a test message causing an exception is sent to the producer with !") + mockEndpoint.expectedMessageCount(1) + mockEndpoint.message(0).body().isInstanceOf(classOf[Failure]) + val result = producer ! Message("fail") + + then("a failure response should have been produced by the forward target") + mockEndpoint.assertIsSatisfied + } + } + private def mockEndpoint = CamelContextManager.context.getEndpoint("mock:mock", classOf[MockEndpoint]) } object ProducerFeatureTest { - class TestProducer(uri: String) extends Actor with Producer { + class TestProducer(uri: String, target: Option[ActorRef] = None) extends Actor with Producer { def endpointUri = uri + override def forwardResultTo = target + } + + class ReplyingForwardTarget extends Actor { + protected def receive = { + case msg: Message => + self.reply(msg.addHeader("test" -> "result")) + case msg: Failure => + self.reply(Failure(msg.cause, msg.headers + ("test" -> "failure"))) + } + } + + class ProducingForwardTarget extends Actor with Producer with Oneway { + def endpointUri = "direct:forward-test-1" } class TestRoute extends RouteBuilder { def configure { + from("direct:forward-test-1").to("mock:mock") // for one-way messaging tests from("direct:producer-test-1").to("mock:mock") // for two-way messaging tests