2012-01-19 14:38:44 +00:00
|
|
|
/**
|
2013-01-09 01:47:48 +01:00
|
|
|
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
|
2012-01-19 14:38:44 +00:00
|
|
|
*/
|
|
|
|
|
|
2011-05-23 11:37:56 -04:00
|
|
|
package akka.camel
|
|
|
|
|
|
2012-06-25 18:28:38 +02:00
|
|
|
import language.postfixOps
|
|
|
|
|
|
2011-05-23 11:37:56 -04:00
|
|
|
import org.apache.camel.{ Exchange, Processor }
|
|
|
|
|
import org.apache.camel.builder.RouteBuilder
|
|
|
|
|
import org.apache.camel.component.mock.MockEndpoint
|
2012-06-29 16:06:26 +02:00
|
|
|
import scala.concurrent.Await
|
2012-05-07 14:18:06 +02:00
|
|
|
import akka.camel.TestSupport.SharedCamelSystem
|
|
|
|
|
import akka.actor.SupervisorStrategy.Stop
|
2012-05-11 21:37:40 +02:00
|
|
|
import org.scalatest.{ BeforeAndAfterEach, BeforeAndAfterAll, WordSpec }
|
2012-01-19 14:38:44 +00:00
|
|
|
import akka.actor._
|
|
|
|
|
import akka.pattern._
|
2012-09-21 14:50:06 +02:00
|
|
|
import scala.concurrent.duration._
|
2012-05-07 14:18:06 +02:00
|
|
|
import akka.util.Timeout
|
|
|
|
|
import org.scalatest.matchers.MustMatchers
|
2012-08-14 08:04:24 +02:00
|
|
|
import akka.testkit._
|
2012-11-24 18:08:34 +01:00
|
|
|
import akka.actor.Status.Failure
|
2011-05-23 11:37:56 -04:00
|
|
|
|
2012-01-19 14:38:44 +00:00
|
|
|
/**
|
|
|
|
|
* Tests the features of the Camel Producer.
|
|
|
|
|
*/
|
2012-11-24 18:08:34 +01:00
|
|
|
class ProducerFeatureTest extends TestKit(ActorSystem("test", AkkaSpec.testConf)) with WordSpec with BeforeAndAfterAll with BeforeAndAfterEach with MustMatchers {
|
2011-05-23 11:37:56 -04:00
|
|
|
|
|
|
|
|
import ProducerFeatureTest._
|
2012-11-24 18:08:34 +01:00
|
|
|
implicit def camel = CamelExtension(system)
|
|
|
|
|
|
|
|
|
|
override protected def afterAll() {
|
|
|
|
|
super.afterAll()
|
|
|
|
|
system.shutdown()
|
|
|
|
|
}
|
2011-05-23 11:37:56 -04:00
|
|
|
|
2012-01-19 14:38:44 +00:00
|
|
|
val camelContext = camel.context
|
|
|
|
|
// to make testing equality of messages easier, otherwise the breadcrumb shows up in the result.
|
|
|
|
|
camelContext.setUseBreadcrumb(false)
|
2012-05-07 14:18:06 +02:00
|
|
|
val timeoutDuration = 1 second
|
|
|
|
|
implicit val timeout = Timeout(timeoutDuration)
|
2012-01-19 14:38:44 +00:00
|
|
|
override protected def beforeAll { camelContext.addRoutes(new TestRoute(system)) }
|
2011-05-23 11:37:56 -04:00
|
|
|
|
2012-01-19 14:38:44 +00:00
|
|
|
override protected def afterEach { mockEndpoint.reset() }
|
2011-05-23 11:37:56 -04:00
|
|
|
|
2012-03-01 17:32:10 +01:00
|
|
|
"A Producer on a sync Camel route" must {
|
|
|
|
|
"produce a message and receive normal response" in {
|
2012-09-03 12:08:46 +02:00
|
|
|
val producer = system.actorOf(Props(new TestProducer("direct:producer-test-2", true)), name = "direct-producer-2")
|
2012-03-01 17:32:10 +01:00
|
|
|
val message = CamelMessage("test", Map(CamelMessage.MessageExchangeId -> "123"))
|
2012-11-24 18:08:34 +01:00
|
|
|
producer.tell(message, testActor)
|
|
|
|
|
expectMsg(CamelMessage("received TEST", Map(CamelMessage.MessageExchangeId -> "123")))
|
2012-09-26 12:39:15 +02:00
|
|
|
stopGracefully(producer)
|
2011-05-23 11:37:56 -04:00
|
|
|
}
|
|
|
|
|
|
2012-03-01 17:32:10 +01:00
|
|
|
"produce a message and receive failure response" in {
|
2012-05-07 14:18:06 +02:00
|
|
|
val latch = TestLatch()
|
|
|
|
|
var deadActor: Option[ActorRef] = None
|
|
|
|
|
val supervisor = system.actorOf(Props(new Actor {
|
|
|
|
|
def receive = {
|
|
|
|
|
case p: Props ⇒ {
|
|
|
|
|
val producer = context.actorOf(p)
|
|
|
|
|
context.watch(producer)
|
|
|
|
|
sender ! producer
|
|
|
|
|
}
|
|
|
|
|
case Terminated(actorRef) ⇒ {
|
|
|
|
|
deadActor = Some(actorRef)
|
|
|
|
|
latch.countDown()
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1 minute) {
|
|
|
|
|
case _: AkkaCamelException ⇒ Stop
|
|
|
|
|
}
|
2012-09-03 12:08:46 +02:00
|
|
|
}), name = "prod-anonymous-supervisor")
|
2012-11-24 18:08:34 +01:00
|
|
|
|
|
|
|
|
supervisor.tell(Props(new TestProducer("direct:producer-test-2")), testActor)
|
|
|
|
|
val producer = receiveOne(timeoutDuration).asInstanceOf[ActorRef]
|
2012-03-01 17:32:10 +01:00
|
|
|
val message = CamelMessage("fail", Map(CamelMessage.MessageExchangeId -> "123"))
|
2012-08-14 08:04:24 +02:00
|
|
|
filterEvents(EventFilter[AkkaCamelException](occurrences = 1)) {
|
2012-11-24 18:08:34 +01:00
|
|
|
producer.tell(message, testActor)
|
|
|
|
|
expectMsgPF(timeoutDuration) {
|
|
|
|
|
case Failure(e: AkkaCamelException) ⇒
|
|
|
|
|
e.getMessage must be("failure")
|
|
|
|
|
e.headers must be(Map(CamelMessage.MessageExchangeId -> "123"))
|
|
|
|
|
}
|
2012-01-19 14:38:44 +00:00
|
|
|
}
|
2012-05-07 14:18:06 +02:00
|
|
|
Await.ready(latch, timeoutDuration)
|
|
|
|
|
deadActor must be(Some(producer))
|
2012-09-26 12:39:15 +02:00
|
|
|
stopGracefully(producer)
|
2011-05-23 11:37:56 -04:00
|
|
|
}
|
|
|
|
|
|
2012-03-01 17:32:10 +01:00
|
|
|
"produce a message oneway" in {
|
2012-09-03 12:08:46 +02:00
|
|
|
val producer = system.actorOf(Props(new TestProducer("direct:producer-test-1", true) with Oneway), name = "direct-producer-1-oneway")
|
2011-05-23 11:37:56 -04:00
|
|
|
mockEndpoint.expectedBodiesReceived("TEST")
|
2012-03-01 17:32:10 +01:00
|
|
|
producer ! CamelMessage("test", Map())
|
2012-01-19 14:38:44 +00:00
|
|
|
mockEndpoint.assertIsSatisfied()
|
2012-09-26 12:39:15 +02:00
|
|
|
stopGracefully(producer)
|
2011-05-23 11:37:56 -04:00
|
|
|
}
|
|
|
|
|
|
2012-03-01 17:32:10 +01:00
|
|
|
"produces message twoway without sender reference" in {
|
2012-09-03 12:08:46 +02:00
|
|
|
// this test causes a dead letter which can be ignored. The producer is two-way but a oneway tell is used
|
|
|
|
|
// to communicate with it and the response is ignored, which ends up in a dead letter
|
|
|
|
|
val producer = system.actorOf(Props(new TestProducer("direct:producer-test-1")), name = "ignore-this-deadletter-direct-producer-test-no-sender")
|
2011-05-23 11:37:56 -04:00
|
|
|
mockEndpoint.expectedBodiesReceived("test")
|
2012-03-01 17:32:10 +01:00
|
|
|
producer ! CamelMessage("test", Map())
|
2012-01-19 14:38:44 +00:00
|
|
|
mockEndpoint.assertIsSatisfied()
|
2012-09-26 12:39:15 +02:00
|
|
|
stopGracefully(producer)
|
2011-05-23 11:37:56 -04:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2012-03-01 17:32:10 +01:00
|
|
|
"A Producer on an async Camel route" must {
|
2011-05-23 11:37:56 -04:00
|
|
|
|
2012-03-01 17:32:10 +01:00
|
|
|
"produce message to direct:producer-test-3 and receive normal response" in {
|
2012-09-03 12:08:46 +02:00
|
|
|
val producer = system.actorOf(Props(new TestProducer("direct:producer-test-3")), name = "direct-producer-test-3")
|
2012-03-01 17:32:10 +01:00
|
|
|
val message = CamelMessage("test", Map(CamelMessage.MessageExchangeId -> "123"))
|
2012-11-24 18:08:34 +01:00
|
|
|
producer.tell(message, testActor)
|
|
|
|
|
expectMsg(CamelMessage("received test", Map(CamelMessage.MessageExchangeId -> "123")))
|
2012-09-26 12:39:15 +02:00
|
|
|
stopGracefully(producer)
|
2011-05-23 11:37:56 -04:00
|
|
|
}
|
|
|
|
|
|
2012-03-01 17:32:10 +01:00
|
|
|
"produce message to direct:producer-test-3 and receive failure response" in {
|
2012-09-03 12:08:46 +02:00
|
|
|
val producer = system.actorOf(Props(new TestProducer("direct:producer-test-3")), name = "direct-producer-test-3-receive-failure")
|
2012-03-01 17:32:10 +01:00
|
|
|
val message = CamelMessage("fail", Map(CamelMessage.MessageExchangeId -> "123"))
|
2012-05-11 21:37:40 +02:00
|
|
|
|
2012-08-14 08:04:24 +02:00
|
|
|
filterEvents(EventFilter[AkkaCamelException](occurrences = 1)) {
|
2012-11-24 18:08:34 +01:00
|
|
|
producer.tell(message, testActor)
|
|
|
|
|
expectMsgPF(timeoutDuration) {
|
|
|
|
|
case Failure(e: AkkaCamelException) ⇒
|
|
|
|
|
e.getMessage must be("failure")
|
|
|
|
|
e.headers must be(Map(CamelMessage.MessageExchangeId -> "123"))
|
|
|
|
|
}
|
2012-01-19 14:38:44 +00:00
|
|
|
}
|
2012-09-26 12:39:15 +02:00
|
|
|
stopGracefully(producer)
|
2011-05-23 11:37:56 -04:00
|
|
|
}
|
|
|
|
|
|
2012-03-01 17:32:10 +01:00
|
|
|
"produce message, forward normal response of direct:producer-test-2 to a replying target actor and receive response" in {
|
2012-09-03 12:08:46 +02:00
|
|
|
val target = system.actorOf(Props[ReplyingForwardTarget], name = "reply-forwarding-target")
|
|
|
|
|
val producer = system.actorOf(Props(new TestForwarder("direct:producer-test-2", target)), name = "direct-producer-test-2-forwarder")
|
2012-03-01 17:32:10 +01:00
|
|
|
val message = CamelMessage("test", Map(CamelMessage.MessageExchangeId -> "123"))
|
2012-11-24 18:08:34 +01:00
|
|
|
producer.tell(message, testActor)
|
|
|
|
|
expectMsg(CamelMessage("received test", Map(CamelMessage.MessageExchangeId -> "123", "test" -> "result")))
|
2012-09-26 12:39:15 +02:00
|
|
|
stopGracefully(target, producer)
|
2011-05-23 11:37:56 -04:00
|
|
|
}
|
|
|
|
|
|
2012-03-01 17:32:10 +01:00
|
|
|
"produce message, forward failure response of direct:producer-test-2 to a replying target actor and receive response" in {
|
2012-09-03 12:08:46 +02:00
|
|
|
val target = system.actorOf(Props[ReplyingForwardTarget], name = "reply-forwarding-target")
|
|
|
|
|
val producer = system.actorOf(Props(new TestForwarder("direct:producer-test-2", target)), name = "direct-producer-test-2-forwarder-failure")
|
2012-03-01 17:32:10 +01:00
|
|
|
val message = CamelMessage("fail", Map(CamelMessage.MessageExchangeId -> "123"))
|
2012-05-11 21:37:40 +02:00
|
|
|
|
2012-08-14 08:04:24 +02:00
|
|
|
filterEvents(EventFilter[AkkaCamelException](occurrences = 1)) {
|
2012-11-24 18:08:34 +01:00
|
|
|
producer.tell(message, testActor)
|
|
|
|
|
expectMsgPF(timeoutDuration) {
|
|
|
|
|
case Failure(e: AkkaCamelException) ⇒
|
|
|
|
|
e.getMessage must be("failure")
|
|
|
|
|
e.headers must be(Map(CamelMessage.MessageExchangeId -> "123", "test" -> "failure"))
|
|
|
|
|
}
|
2012-01-19 14:38:44 +00:00
|
|
|
}
|
2012-09-26 12:39:15 +02:00
|
|
|
stopGracefully(target, producer)
|
2011-05-23 11:37:56 -04:00
|
|
|
}
|
|
|
|
|
|
2012-03-01 17:32:10 +01:00
|
|
|
"produce message, forward normal response to a producing target actor and produce response to direct:forward-test-1" in {
|
2012-09-03 12:08:46 +02:00
|
|
|
val target = system.actorOf(Props[ProducingForwardTarget], name = "producer-forwarding-target")
|
|
|
|
|
val producer = system.actorOf(Props(new TestForwarder("direct:producer-test-2", target)), name = "direct-producer-test-2-forwarder-to-producing-target")
|
2011-05-23 11:37:56 -04:00
|
|
|
mockEndpoint.expectedBodiesReceived("received test")
|
2012-03-01 17:32:10 +01:00
|
|
|
producer.tell(CamelMessage("test", Map()), producer)
|
2012-01-19 14:38:44 +00:00
|
|
|
mockEndpoint.assertIsSatisfied()
|
2012-09-26 12:39:15 +02:00
|
|
|
stopGracefully(target, producer)
|
2011-05-23 11:37:56 -04:00
|
|
|
}
|
|
|
|
|
|
2012-03-01 17:32:10 +01:00
|
|
|
"produce message, forward failure response to a producing target actor and produce response to direct:forward-test-1" in {
|
2012-09-03 12:08:46 +02:00
|
|
|
val target = system.actorOf(Props[ProducingForwardTarget], name = "producer-forwarding-target-failure")
|
|
|
|
|
val producer = system.actorOf(Props(new TestForwarder("direct:producer-test-2", target)), name = "direct-producer-test-2-forward-failure")
|
2012-08-14 08:04:24 +02:00
|
|
|
filterEvents(EventFilter[AkkaCamelException](occurrences = 1)) {
|
|
|
|
|
mockEndpoint.expectedMessageCount(1)
|
|
|
|
|
mockEndpoint.message(0).body().isInstanceOf(classOf[akka.actor.Status.Failure])
|
|
|
|
|
producer.tell(CamelMessage("fail", Map()), producer)
|
|
|
|
|
mockEndpoint.assertIsSatisfied()
|
|
|
|
|
}
|
2012-09-26 12:39:15 +02:00
|
|
|
stopGracefully(target, producer)
|
2011-05-23 11:37:56 -04:00
|
|
|
}
|
|
|
|
|
|
2012-03-01 17:32:10 +01:00
|
|
|
"produce message, forward normal response from direct:producer-test-3 to a replying target actor and receive response" in {
|
2012-09-03 12:08:46 +02:00
|
|
|
val target = system.actorOf(Props[ReplyingForwardTarget], name = "reply-forwarding-target")
|
|
|
|
|
val producer = system.actorOf(Props(new TestForwarder("direct:producer-test-3", target)), name = "direct-producer-test-3-to-replying-actor")
|
2012-03-01 17:32:10 +01:00
|
|
|
val message = CamelMessage("test", Map(CamelMessage.MessageExchangeId -> "123"))
|
2011-05-23 11:37:56 -04:00
|
|
|
|
2012-11-24 18:08:34 +01:00
|
|
|
producer.tell(message, testActor)
|
|
|
|
|
expectMsg(CamelMessage("received test", Map(CamelMessage.MessageExchangeId -> "123", "test" -> "result")))
|
2012-09-26 12:39:15 +02:00
|
|
|
stopGracefully(target, producer)
|
2011-05-23 11:37:56 -04:00
|
|
|
}
|
|
|
|
|
|
2012-03-01 17:32:10 +01:00
|
|
|
"produce message, forward failure response from direct:producer-test-3 to a replying target actor and receive response" in {
|
2012-09-03 12:08:46 +02:00
|
|
|
val target = system.actorOf(Props[ReplyingForwardTarget], name = "reply-forwarding-target")
|
|
|
|
|
val producer = system.actorOf(Props(new TestForwarder("direct:producer-test-3", target)), name = "direct-producer-test-3-forward-failure")
|
2011-05-23 11:37:56 -04:00
|
|
|
|
2012-03-01 17:32:10 +01:00
|
|
|
val message = CamelMessage("fail", Map(CamelMessage.MessageExchangeId -> "123"))
|
2012-08-14 08:04:24 +02:00
|
|
|
filterEvents(EventFilter[AkkaCamelException](occurrences = 1)) {
|
2012-11-24 18:08:34 +01:00
|
|
|
producer.tell(message, testActor)
|
|
|
|
|
expectMsgPF(timeoutDuration) {
|
|
|
|
|
case Failure(e: AkkaCamelException) ⇒
|
|
|
|
|
e.getMessage must be("failure")
|
|
|
|
|
e.headers must be(Map(CamelMessage.MessageExchangeId -> "123", "test" -> "failure"))
|
|
|
|
|
}
|
2012-01-19 14:38:44 +00:00
|
|
|
}
|
2012-09-26 12:39:15 +02:00
|
|
|
stopGracefully(target, producer)
|
2011-05-23 11:37:56 -04:00
|
|
|
}
|
|
|
|
|
|
2012-03-01 17:32:10 +01:00
|
|
|
"produce message, forward normal response from direct:producer-test-3 to a producing target actor and produce response to direct:forward-test-1" in {
|
2012-09-03 12:08:46 +02:00
|
|
|
val target = system.actorOf(Props[ProducingForwardTarget], "producing-forward-target-normal")
|
|
|
|
|
val producer = system.actorOf(Props(new TestForwarder("direct:producer-test-3", target)), name = "direct-producer-test-3-forward-normal")
|
2011-05-23 11:37:56 -04:00
|
|
|
mockEndpoint.expectedBodiesReceived("received test")
|
2012-03-01 17:32:10 +01:00
|
|
|
producer.tell(CamelMessage("test", Map()), producer)
|
2012-01-19 14:38:44 +00:00
|
|
|
mockEndpoint.assertIsSatisfied()
|
2012-09-03 12:08:46 +02:00
|
|
|
system.stop(target)
|
|
|
|
|
system.stop(producer)
|
2011-05-23 11:37:56 -04:00
|
|
|
}
|
|
|
|
|
|
2012-03-01 17:32:10 +01:00
|
|
|
"produce message, forward failure response from direct:producer-test-3 to a producing target actor and produce response to direct:forward-test-1" in {
|
2012-09-03 12:08:46 +02:00
|
|
|
val target = system.actorOf(Props[ProducingForwardTarget], "producing-forward-target-failure")
|
|
|
|
|
val producer = system.actorOf(Props(new TestForwarder("direct:producer-test-3", target)), name = "direct-producer-test-3-forward-failure-producing-target")
|
2012-08-14 08:04:24 +02:00
|
|
|
filterEvents(EventFilter[AkkaCamelException](occurrences = 1)) {
|
|
|
|
|
mockEndpoint.expectedMessageCount(1)
|
|
|
|
|
mockEndpoint.message(0).body().isInstanceOf(classOf[akka.actor.Status.Failure])
|
|
|
|
|
producer.tell(CamelMessage("fail", Map()), producer)
|
|
|
|
|
mockEndpoint.assertIsSatisfied()
|
|
|
|
|
}
|
2012-09-26 12:39:15 +02:00
|
|
|
stopGracefully(target, producer)
|
2011-05-23 11:37:56 -04:00
|
|
|
}
|
2012-11-24 18:08:34 +01:00
|
|
|
|
|
|
|
|
"keep producing messages after error" in {
|
|
|
|
|
import TestSupport._
|
|
|
|
|
val consumer = start(new IntermittentErrorConsumer("direct:intermittentTest-1"), "intermittentTest-error-consumer")
|
|
|
|
|
val producer = start(new SimpleProducer("direct:intermittentTest-1"), "intermittentTest-producer")
|
|
|
|
|
filterEvents(EventFilter[AkkaCamelException](occurrences = 1)) {
|
|
|
|
|
val futureFailed = producer.tell("fail", testActor)
|
|
|
|
|
expectMsgPF(timeoutDuration) {
|
|
|
|
|
case Failure(e) ⇒
|
|
|
|
|
e.getMessage must be("fail")
|
|
|
|
|
}
|
|
|
|
|
producer.tell("OK", testActor)
|
|
|
|
|
expectMsg("OK")
|
|
|
|
|
}
|
|
|
|
|
stop(consumer)
|
|
|
|
|
stop(producer)
|
|
|
|
|
}
|
2012-12-07 16:31:44 +01:00
|
|
|
"be able to transform outgoing messages and have a valid sender reference" in {
|
|
|
|
|
import TestSupport._
|
|
|
|
|
filterEvents(EventFilter[Exception](occurrences = 1)) {
|
|
|
|
|
val producerSupervisor = system.actorOf(Props(new ProducerSupervisor(Props(new ChildProducer("mock:mock", true)))), "ignore-deadletter-sender-ref-test")
|
|
|
|
|
mockEndpoint.reset()
|
|
|
|
|
producerSupervisor.tell(CamelMessage("test", Map()), testActor)
|
|
|
|
|
producerSupervisor.tell(CamelMessage("err", Map()), testActor)
|
|
|
|
|
mockEndpoint.expectedMessageCount(1)
|
|
|
|
|
mockEndpoint.expectedBodiesReceived("TEST")
|
|
|
|
|
expectMsg("TEST")
|
|
|
|
|
system.stop(producerSupervisor)
|
|
|
|
|
}
|
|
|
|
|
}
|
2011-05-23 11:37:56 -04:00
|
|
|
}
|
|
|
|
|
|
2012-01-19 14:38:44 +00:00
|
|
|
private def mockEndpoint = camel.context.getEndpoint("mock:mock", classOf[MockEndpoint])
|
2012-09-26 12:39:15 +02:00
|
|
|
|
|
|
|
|
def stopGracefully(actors: ActorRef*)(implicit timeout: Timeout) {
|
|
|
|
|
val deadline = timeout.duration.fromNow
|
|
|
|
|
for (a ← actors)
|
2012-10-15 17:17:54 +02:00
|
|
|
Await.result(gracefulStop(a, deadline.timeLeft), deadline.timeLeft) must be === true
|
2012-09-26 12:39:15 +02:00
|
|
|
}
|
2011-05-23 11:37:56 -04:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
object ProducerFeatureTest {
|
2012-12-07 16:31:44 +01:00
|
|
|
class ProducerSupervisor(childProps: Props) extends Actor {
|
|
|
|
|
override def supervisorStrategy = SupervisorStrategy.stoppingStrategy
|
|
|
|
|
val child = context.actorOf(childProps, "producer-supervisor-child")
|
|
|
|
|
val duration = 10 seconds
|
|
|
|
|
implicit val timeout = Timeout(duration)
|
|
|
|
|
implicit val ec = context.system.dispatcher
|
|
|
|
|
Await.ready(CamelExtension(context.system).activationFutureFor(child), timeout.duration)
|
|
|
|
|
def receive = {
|
|
|
|
|
case msg: CamelMessage ⇒
|
|
|
|
|
child forward (msg)
|
|
|
|
|
case (aref: ActorRef, msg: String) ⇒
|
|
|
|
|
aref ! msg
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
class ChildProducer(uri: String, upper: Boolean = false) extends Actor with Producer {
|
|
|
|
|
override def oneway = true
|
|
|
|
|
|
|
|
|
|
var lastSender: Option[ActorRef] = None
|
|
|
|
|
var lastMessage: Option[String] = None
|
|
|
|
|
def endpointUri = uri
|
|
|
|
|
|
|
|
|
|
override def transformOutgoingMessage(msg: Any) = msg match {
|
|
|
|
|
case msg: CamelMessage ⇒ if (upper) msg.mapBody {
|
|
|
|
|
body: String ⇒
|
|
|
|
|
if (body == "err") throw new Exception("Crash!")
|
|
|
|
|
val upperMsg = body.toUpperCase
|
|
|
|
|
lastSender = Some(sender)
|
|
|
|
|
lastMessage = Some(upperMsg)
|
|
|
|
|
}
|
|
|
|
|
else msg
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
override def postStop() {
|
|
|
|
|
for (msg ← lastMessage; aref ← lastSender) context.parent ! (aref, msg)
|
|
|
|
|
super.postStop()
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2011-05-23 11:37:56 -04:00
|
|
|
class TestProducer(uri: String, upper: Boolean = false) extends Actor with Producer {
|
|
|
|
|
def endpointUri = uri
|
2012-01-19 14:38:44 +00:00
|
|
|
|
2012-11-24 18:08:34 +01:00
|
|
|
override def preRestart(reason: Throwable, message: Option[Any]) {
|
|
|
|
|
//overriding on purpose so it doesn't try to deRegister and reRegister at restart,
|
|
|
|
|
// which would cause a deadletter message in the test output.
|
|
|
|
|
}
|
|
|
|
|
|
2012-04-06 11:13:59 +01:00
|
|
|
override protected def transformOutgoingMessage(msg: Any) = msg match {
|
2012-03-01 17:32:10 +01:00
|
|
|
case msg: CamelMessage ⇒ if (upper) msg.mapBody {
|
2012-01-19 14:38:44 +00:00
|
|
|
body: String ⇒ body.toUpperCase
|
|
|
|
|
}
|
|
|
|
|
else msg
|
2011-05-23 11:37:56 -04:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
class TestForwarder(uri: String, target: ActorRef) extends Actor with Producer {
|
|
|
|
|
def endpointUri = uri
|
2012-01-19 14:38:44 +00:00
|
|
|
|
2012-05-11 09:46:49 +02:00
|
|
|
override def headersToCopy = Set(CamelMessage.MessageExchangeId, "test")
|
|
|
|
|
|
|
|
|
|
override def routeResponse(msg: Any): Unit = target forward msg
|
2011-05-23 11:37:56 -04:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
class TestResponder extends Actor {
|
2012-05-21 13:47:48 +02:00
|
|
|
def receive = {
|
2012-03-01 17:32:10 +01:00
|
|
|
case msg: CamelMessage ⇒ msg.body match {
|
2012-05-11 09:46:49 +02:00
|
|
|
case "fail" ⇒ context.sender ! akka.actor.Status.Failure(new AkkaCamelException(new Exception("failure"), msg.headers))
|
2012-03-01 17:32:10 +01:00
|
|
|
case _ ⇒
|
2012-01-19 14:38:44 +00:00
|
|
|
context.sender ! (msg.mapBody {
|
|
|
|
|
body: String ⇒ "received %s" format body
|
|
|
|
|
})
|
2011-05-23 11:37:56 -04:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
class ReplyingForwardTarget extends Actor {
|
2012-05-21 13:47:48 +02:00
|
|
|
def receive = {
|
2012-03-01 17:32:10 +01:00
|
|
|
case msg: CamelMessage ⇒
|
2012-09-03 12:08:46 +02:00
|
|
|
context.sender ! (msg.copy(headers = msg.headers + ("test" -> "result")))
|
2012-05-11 09:46:49 +02:00
|
|
|
case msg: akka.actor.Status.Failure ⇒
|
|
|
|
|
msg.cause match {
|
|
|
|
|
case e: AkkaCamelException ⇒ context.sender ! Status.Failure(new AkkaCamelException(e, e.headers + ("test" -> "failure")))
|
|
|
|
|
}
|
2011-05-23 11:37:56 -04:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
class ProducingForwardTarget extends Actor with Producer with Oneway {
|
|
|
|
|
def endpointUri = "direct:forward-test-1"
|
|
|
|
|
}
|
|
|
|
|
|
2012-01-19 14:38:44 +00:00
|
|
|
class TestRoute(system: ActorSystem) extends RouteBuilder {
|
|
|
|
|
val responder = system.actorOf(Props[TestResponder], name = "TestResponder")
|
|
|
|
|
|
2011-05-23 11:37:56 -04:00
|
|
|
def configure {
|
|
|
|
|
from("direct:forward-test-1").to("mock:mock")
|
|
|
|
|
// for one-way messaging tests
|
|
|
|
|
from("direct:producer-test-1").to("mock:mock")
|
|
|
|
|
// for two-way messaging tests (async)
|
2012-01-19 14:38:44 +00:00
|
|
|
from("direct:producer-test-3").to(responder)
|
2011-05-23 11:37:56 -04:00
|
|
|
// for two-way messaging tests (sync)
|
|
|
|
|
from("direct:producer-test-2").process(new Processor() {
|
|
|
|
|
def process(exchange: Exchange) = {
|
|
|
|
|
exchange.getIn.getBody match {
|
|
|
|
|
case "fail" ⇒ throw new Exception("failure")
|
|
|
|
|
case body ⇒ exchange.getOut.setBody("received %s" format body)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
})
|
|
|
|
|
}
|
|
|
|
|
}
|
2012-01-19 14:38:44 +00:00
|
|
|
|
2012-11-24 18:08:34 +01:00
|
|
|
class SimpleProducer(override val endpointUri: String) extends Producer {
|
|
|
|
|
override protected def transformResponse(msg: Any) = msg match {
|
|
|
|
|
case m: CamelMessage ⇒ m.bodyAs[String]
|
|
|
|
|
case m: Any ⇒ m
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
class IntermittentErrorConsumer(override val endpointUri: String) extends Consumer {
|
|
|
|
|
def receive = {
|
|
|
|
|
case msg: CamelMessage if msg.bodyAs[String] == "fail" ⇒ sender ! Failure(new Exception("fail"))
|
|
|
|
|
case msg: CamelMessage ⇒ sender ! msg
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2011-05-23 11:37:56 -04:00
|
|
|
}
|