camel: ticket #2781

ProducerSupport.transformOutgoingMessage was called from wrong context.
This commit is contained in:
Raymond Roestenburg 2012-12-07 16:31:44 +01:00 committed by Roland
parent fa3a5c97de
commit 5ad22c8e9c
2 changed files with 54 additions and 3 deletions

View file

@ -64,7 +64,7 @@ trait ProducerSupport extends Actor with CamelSupport {
for (
child producerChild;
(sender, msg) messages
) child.tell(msg, sender)
) child.tell(transformOutgoingMessage(msg), sender)
Map()
}
}
@ -76,7 +76,7 @@ trait ProducerSupport extends Actor with CamelSupport {
case msg
producerChild match {
case Some(child) child forward msg
case Some(child) child forward transformOutgoingMessage(msg)
case None messages += (sender -> msg)
}
}
@ -108,7 +108,7 @@ trait ProducerSupport extends Actor with CamelSupport {
private class ProducerChild(endpoint: Endpoint, processor: SendProcessor) extends Actor {
def receive = {
case msg @ (_: FailureResult | _: MessageResult) context.parent forward msg
case msg produce(endpoint, processor, transformOutgoingMessage(msg), if (oneway) ExchangePattern.InOnly else ExchangePattern.InOut)
case msg produce(endpoint, processor, msg, if (oneway) ExchangePattern.InOnly else ExchangePattern.InOut)
}
/**
* Initiates a message exchange of given <code>pattern</code> with the endpoint specified by

View file

@ -242,6 +242,19 @@ class ProducerFeatureTest extends TestKit(ActorSystem("test", AkkaSpec.testConf)
stop(consumer)
stop(producer)
}
"be able to transform outgoing messages and have a valid sender reference" in {
import TestSupport._
filterEvents(EventFilter[Exception](occurrences = 1)) {
val producerSupervisor = system.actorOf(Props(new ProducerSupervisor(Props(new ChildProducer("mock:mock", true)))), "ignore-deadletter-sender-ref-test")
mockEndpoint.reset()
producerSupervisor.tell(CamelMessage("test", Map()), testActor)
producerSupervisor.tell(CamelMessage("err", Map()), testActor)
mockEndpoint.expectedMessageCount(1)
mockEndpoint.expectedBodiesReceived("TEST")
expectMsg("TEST")
system.stop(producerSupervisor)
}
}
}
private def mockEndpoint = camel.context.getEndpoint("mock:mock", classOf[MockEndpoint])
@ -254,6 +267,44 @@ class ProducerFeatureTest extends TestKit(ActorSystem("test", AkkaSpec.testConf)
}
object ProducerFeatureTest {
class ProducerSupervisor(childProps: Props) extends Actor {
override def supervisorStrategy = SupervisorStrategy.stoppingStrategy
val child = context.actorOf(childProps, "producer-supervisor-child")
val duration = 10 seconds
implicit val timeout = Timeout(duration)
implicit val ec = context.system.dispatcher
Await.ready(CamelExtension(context.system).activationFutureFor(child), timeout.duration)
def receive = {
case msg: CamelMessage
child forward (msg)
case (aref: ActorRef, msg: String)
aref ! msg
}
}
class ChildProducer(uri: String, upper: Boolean = false) extends Actor with Producer {
override def oneway = true
var lastSender: Option[ActorRef] = None
var lastMessage: Option[String] = None
def endpointUri = uri
override def transformOutgoingMessage(msg: Any) = msg match {
case msg: CamelMessage if (upper) msg.mapBody {
body: String
if (body == "err") throw new Exception("Crash!")
val upperMsg = body.toUpperCase
lastSender = Some(sender)
lastMessage = Some(upperMsg)
}
else msg
}
override def postStop() {
for (msg lastMessage; aref lastSender) context.parent ! (aref, msg)
super.postStop()
}
}
class TestProducer(uri: String, upper: Boolean = false) extends Actor with Producer {
def endpointUri = uri