diff --git a/akka-camel/src/main/scala/akka/camel/Producer.scala b/akka-camel/src/main/scala/akka/camel/Producer.scala index 017304ea4d..ca05f7a45d 100644 --- a/akka-camel/src/main/scala/akka/camel/Producer.scala +++ b/akka-camel/src/main/scala/akka/camel/Producer.scala @@ -64,7 +64,7 @@ trait ProducerSupport extends Actor with CamelSupport { for ( child ← producerChild; (sender, msg) ← messages - ) child.tell(msg, sender) + ) child.tell(transformOutgoingMessage(msg), sender) Map() } } @@ -76,7 +76,7 @@ trait ProducerSupport extends Actor with CamelSupport { case msg ⇒ producerChild match { - case Some(child) ⇒ child forward msg + case Some(child) ⇒ child forward transformOutgoingMessage(msg) case None ⇒ messages += (sender -> msg) } } @@ -108,7 +108,7 @@ trait ProducerSupport extends Actor with CamelSupport { private class ProducerChild(endpoint: Endpoint, processor: SendProcessor) extends Actor { def receive = { case msg @ (_: FailureResult | _: MessageResult) ⇒ context.parent forward msg - case msg ⇒ produce(endpoint, processor, transformOutgoingMessage(msg), if (oneway) ExchangePattern.InOnly else ExchangePattern.InOut) + case msg ⇒ produce(endpoint, processor, msg, if (oneway) ExchangePattern.InOnly else ExchangePattern.InOut) } /** * Initiates a message exchange of given pattern with the endpoint specified by diff --git a/akka-camel/src/test/scala/akka/camel/ProducerFeatureTest.scala b/akka-camel/src/test/scala/akka/camel/ProducerFeatureTest.scala index 07a46781c8..58cd0713d6 100644 --- a/akka-camel/src/test/scala/akka/camel/ProducerFeatureTest.scala +++ b/akka-camel/src/test/scala/akka/camel/ProducerFeatureTest.scala @@ -242,6 +242,19 @@ class ProducerFeatureTest extends TestKit(ActorSystem("test", AkkaSpec.testConf) stop(consumer) stop(producer) } + "be able to transform outgoing messages and have a valid sender reference" in { + import TestSupport._ + filterEvents(EventFilter[Exception](occurrences = 1)) { + val producerSupervisor = system.actorOf(Props(new ProducerSupervisor(Props(new ChildProducer("mock:mock", true)))), "ignore-deadletter-sender-ref-test") + mockEndpoint.reset() + producerSupervisor.tell(CamelMessage("test", Map()), testActor) + producerSupervisor.tell(CamelMessage("err", Map()), testActor) + mockEndpoint.expectedMessageCount(1) + mockEndpoint.expectedBodiesReceived("TEST") + expectMsg("TEST") + system.stop(producerSupervisor) + } + } } private def mockEndpoint = camel.context.getEndpoint("mock:mock", classOf[MockEndpoint]) @@ -254,6 +267,44 @@ class ProducerFeatureTest extends TestKit(ActorSystem("test", AkkaSpec.testConf) } object ProducerFeatureTest { + class ProducerSupervisor(childProps: Props) extends Actor { + override def supervisorStrategy = SupervisorStrategy.stoppingStrategy + val child = context.actorOf(childProps, "producer-supervisor-child") + val duration = 10 seconds + implicit val timeout = Timeout(duration) + implicit val ec = context.system.dispatcher + Await.ready(CamelExtension(context.system).activationFutureFor(child), timeout.duration) + def receive = { + case msg: CamelMessage ⇒ + child forward (msg) + case (aref: ActorRef, msg: String) ⇒ + aref ! msg + } + } + class ChildProducer(uri: String, upper: Boolean = false) extends Actor with Producer { + override def oneway = true + + var lastSender: Option[ActorRef] = None + var lastMessage: Option[String] = None + def endpointUri = uri + + override def transformOutgoingMessage(msg: Any) = msg match { + case msg: CamelMessage ⇒ if (upper) msg.mapBody { + body: String ⇒ + if (body == "err") throw new Exception("Crash!") + val upperMsg = body.toUpperCase + lastSender = Some(sender) + lastMessage = Some(upperMsg) + } + else msg + } + + override def postStop() { + for (msg ← lastMessage; aref ← lastSender) context.parent ! (aref, msg) + super.postStop() + } + } + class TestProducer(uri: String, upper: Boolean = false) extends Actor with Producer { def endpointUri = uri