diff --git a/akka-camel/src/test/scala/akka/camel/ProducerFeatureTest.scala b/akka-camel/src/test/scala/akka/camel/ProducerFeatureTest.scala index 8463c185ae..6fd865bfd1 100644 --- a/akka-camel/src/test/scala/akka/camel/ProducerFeatureTest.scala +++ b/akka-camel/src/test/scala/akka/camel/ProducerFeatureTest.scala @@ -44,15 +44,15 @@ class ProducerFeatureTest extends TestKit(ActorSystem("test", AkkaSpec.testConf) override protected def afterEach { mockEndpoint.reset() } "A Producer on a sync Camel route" must { - "produce a message and receive normal response" in { - val producer = system.actorOf(Props(new TestProducer("direct:producer-test-2", true)), name = "direct-producer-2") + + "01 produce a message and receive normal response" in { + val producer = system.actorOf(Props(new TestProducer("direct:producer-test-2", true)), name = "01-direct-producer-2") val message = CamelMessage("test", Map(CamelMessage.MessageExchangeId -> "123")) producer.tell(message, testActor) expectMsg(CamelMessage("received TEST", Map(CamelMessage.MessageExchangeId -> "123"))) - stopGracefully(producer) } - "produce a message and receive failure response" in { + "02 produce a message and receive failure response" in { val latch = TestLatch() var deadActor: Option[ActorRef] = None val supervisor = system.actorOf(Props(new Actor { @@ -70,7 +70,7 @@ class ProducerFeatureTest extends TestKit(ActorSystem("test", AkkaSpec.testConf) override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1 minute) { case _: AkkaCamelException ⇒ Stop } - }), name = "prod-anonymous-supervisor") + }), name = "02-prod-anonymous-supervisor") supervisor.tell(Props(new TestProducer("direct:producer-test-2")), testActor) val producer = receiveOne(timeoutDuration).asInstanceOf[ActorRef] @@ -85,40 +85,36 @@ class ProducerFeatureTest extends TestKit(ActorSystem("test", AkkaSpec.testConf) } Await.ready(latch, timeoutDuration) deadActor must be(Some(producer)) - stopGracefully(producer) } - "produce a message oneway" in { - val producer = system.actorOf(Props(new TestProducer("direct:producer-test-1", true) with Oneway), name = "direct-producer-1-oneway") + "03 produce a message oneway" in { + val producer = system.actorOf(Props(new TestProducer("direct:producer-test-1", true) with Oneway), name = "03-direct-producer-1-oneway") mockEndpoint.expectedBodiesReceived("TEST") producer ! CamelMessage("test", Map()) mockEndpoint.assertIsSatisfied() - stopGracefully(producer) } - "produces message twoway without sender reference" in { + "04 produces message twoway without sender reference" in { // this test causes a dead letter which can be ignored. The producer is two-way but a oneway tell is used // to communicate with it and the response is ignored, which ends up in a dead letter - val producer = system.actorOf(Props(new TestProducer("direct:producer-test-1")), name = "ignore-this-deadletter-direct-producer-test-no-sender") + val producer = system.actorOf(Props(new TestProducer("direct:producer-test-1")), name = "04-ignore-this-deadletter-direct-producer-test-no-sender") mockEndpoint.expectedBodiesReceived("test") producer ! CamelMessage("test", Map()) mockEndpoint.assertIsSatisfied() - stopGracefully(producer) } } "A Producer on an async Camel route" must { - "produce message to direct:producer-test-3 and receive normal response" in { - val producer = system.actorOf(Props(new TestProducer("direct:producer-test-3")), name = "direct-producer-test-3") + "10 produce message to direct:producer-test-3 and receive normal response" in { + val producer = system.actorOf(Props(new TestProducer("direct:producer-test-3")), name = "10-direct-producer-test-3") val message = CamelMessage("test", Map(CamelMessage.MessageExchangeId -> "123")) producer.tell(message, testActor) expectMsg(CamelMessage("received test", Map(CamelMessage.MessageExchangeId -> "123"))) - stopGracefully(producer) } - "produce message to direct:producer-test-3 and receive failure response" in { - val producer = system.actorOf(Props(new TestProducer("direct:producer-test-3")), name = "direct-producer-test-3-receive-failure") + "11 produce message to direct:producer-test-3 and receive failure response" in { + val producer = system.actorOf(Props(new TestProducer("direct:producer-test-3")), name = "11-direct-producer-test-3-receive-failure") val message = CamelMessage("fail", Map(CamelMessage.MessageExchangeId -> "123")) filterEvents(EventFilter[AkkaCamelException](occurrences = 1)) { @@ -129,21 +125,19 @@ class ProducerFeatureTest extends TestKit(ActorSystem("test", AkkaSpec.testConf) e.headers must be(Map(CamelMessage.MessageExchangeId -> "123")) } } - stopGracefully(producer) } - "produce message, forward normal response of direct:producer-test-2 to a replying target actor and receive response" in { - val target = system.actorOf(Props[ReplyingForwardTarget], name = "reply-forwarding-target") - val producer = system.actorOf(Props(new TestForwarder("direct:producer-test-2", target)), name = "direct-producer-test-2-forwarder") + "12 produce message, forward normal response of direct:producer-test-2 to a replying target actor and receive response" in { + val target = system.actorOf(Props[ReplyingForwardTarget], name = "12-reply-forwarding-target") + val producer = system.actorOf(Props(new TestForwarder("direct:producer-test-2", target)), name = "12-direct-producer-test-2-forwarder") val message = CamelMessage("test", Map(CamelMessage.MessageExchangeId -> "123")) producer.tell(message, testActor) expectMsg(CamelMessage("received test", Map(CamelMessage.MessageExchangeId -> "123", "test" -> "result"))) - stopGracefully(target, producer) } - "produce message, forward failure response of direct:producer-test-2 to a replying target actor and receive response" in { - val target = system.actorOf(Props[ReplyingForwardTarget], name = "reply-forwarding-target") - val producer = system.actorOf(Props(new TestForwarder("direct:producer-test-2", target)), name = "direct-producer-test-2-forwarder-failure") + "13 produce message, forward failure response of direct:producer-test-2 to a replying target actor and receive response" in { + val target = system.actorOf(Props[ReplyingForwardTarget], name = "13-reply-forwarding-target") + val producer = system.actorOf(Props(new TestForwarder("direct:producer-test-2", target)), name = "13-direct-producer-test-2-forwarder-failure") val message = CamelMessage("fail", Map(CamelMessage.MessageExchangeId -> "123")) filterEvents(EventFilter[AkkaCamelException](occurrences = 1)) { @@ -154,43 +148,39 @@ class ProducerFeatureTest extends TestKit(ActorSystem("test", AkkaSpec.testConf) e.headers must be(Map(CamelMessage.MessageExchangeId -> "123", "test" -> "failure")) } } - stopGracefully(target, producer) } - "produce message, forward normal response to a producing target actor and produce response to direct:forward-test-1" in { - val target = system.actorOf(Props[ProducingForwardTarget], name = "producer-forwarding-target") - val producer = system.actorOf(Props(new TestForwarder("direct:producer-test-2", target)), name = "direct-producer-test-2-forwarder-to-producing-target") + "14 produce message, forward normal response to a producing target actor and produce response to direct:forward-test-1" in { + val target = system.actorOf(Props[ProducingForwardTarget], name = "14-producer-forwarding-target") + val producer = system.actorOf(Props(new TestForwarder("direct:producer-test-2", target)), name = "14-direct-producer-test-2-forwarder-to-producing-target") mockEndpoint.expectedBodiesReceived("received test") producer.tell(CamelMessage("test", Map()), producer) mockEndpoint.assertIsSatisfied() - stopGracefully(target, producer) } - "produce message, forward failure response to a producing target actor and produce response to direct:forward-test-1" in { - val target = system.actorOf(Props[ProducingForwardTarget], name = "producer-forwarding-target-failure") - val producer = system.actorOf(Props(new TestForwarder("direct:producer-test-2", target)), name = "direct-producer-test-2-forward-failure") + "15 produce message, forward failure response to a producing target actor and produce response to direct:forward-test-1" in { + val target = system.actorOf(Props[ProducingForwardTarget], name = "15-producer-forwarding-target-failure") + val producer = system.actorOf(Props(new TestForwarder("direct:producer-test-2", target)), name = "15-direct-producer-test-2-forward-failure") filterEvents(EventFilter[AkkaCamelException](occurrences = 1)) { mockEndpoint.expectedMessageCount(1) mockEndpoint.message(0).body().isInstanceOf(classOf[akka.actor.Status.Failure]) producer.tell(CamelMessage("fail", Map()), producer) mockEndpoint.assertIsSatisfied() } - stopGracefully(target, producer) } - "produce message, forward normal response from direct:producer-test-3 to a replying target actor and receive response" in { - val target = system.actorOf(Props[ReplyingForwardTarget], name = "reply-forwarding-target") - val producer = system.actorOf(Props(new TestForwarder("direct:producer-test-3", target)), name = "direct-producer-test-3-to-replying-actor") + "16 produce message, forward normal response from direct:producer-test-3 to a replying target actor and receive response" in { + val target = system.actorOf(Props[ReplyingForwardTarget], name = "16-reply-forwarding-target") + val producer = system.actorOf(Props(new TestForwarder("direct:producer-test-3", target)), name = "16-direct-producer-test-3-to-replying-actor") val message = CamelMessage("test", Map(CamelMessage.MessageExchangeId -> "123")) producer.tell(message, testActor) expectMsg(CamelMessage("received test", Map(CamelMessage.MessageExchangeId -> "123", "test" -> "result"))) - stopGracefully(target, producer) } - "produce message, forward failure response from direct:producer-test-3 to a replying target actor and receive response" in { - val target = system.actorOf(Props[ReplyingForwardTarget], name = "reply-forwarding-target") - val producer = system.actorOf(Props(new TestForwarder("direct:producer-test-3", target)), name = "direct-producer-test-3-forward-failure") + "17 produce message, forward failure response from direct:producer-test-3 to a replying target actor and receive response" in { + val target = system.actorOf(Props[ReplyingForwardTarget], name = "17-reply-forwarding-target") + val producer = system.actorOf(Props(new TestForwarder("direct:producer-test-3", target)), name = "17-direct-producer-test-3-forward-failure") val message = CamelMessage("fail", Map(CamelMessage.MessageExchangeId -> "123")) filterEvents(EventFilter[AkkaCamelException](occurrences = 1)) { @@ -201,35 +191,31 @@ class ProducerFeatureTest extends TestKit(ActorSystem("test", AkkaSpec.testConf) e.headers must be(Map(CamelMessage.MessageExchangeId -> "123", "test" -> "failure")) } } - stopGracefully(target, producer) } - "produce message, forward normal response from direct:producer-test-3 to a producing target actor and produce response to direct:forward-test-1" in { - val target = system.actorOf(Props[ProducingForwardTarget], "producing-forward-target-normal") - val producer = system.actorOf(Props(new TestForwarder("direct:producer-test-3", target)), name = "direct-producer-test-3-forward-normal") + "18 produce message, forward normal response from direct:producer-test-3 to a producing target actor and produce response to direct:forward-test-1" in { + val target = system.actorOf(Props[ProducingForwardTarget], "18-producing-forward-target-normal") + val producer = system.actorOf(Props(new TestForwarder("direct:producer-test-3", target)), name = "18-direct-producer-test-3-forward-normal") mockEndpoint.expectedBodiesReceived("received test") producer.tell(CamelMessage("test", Map()), producer) mockEndpoint.assertIsSatisfied() - system.stop(target) - system.stop(producer) } - "produce message, forward failure response from direct:producer-test-3 to a producing target actor and produce response to direct:forward-test-1" in { - val target = system.actorOf(Props[ProducingForwardTarget], "producing-forward-target-failure") - val producer = system.actorOf(Props(new TestForwarder("direct:producer-test-3", target)), name = "direct-producer-test-3-forward-failure-producing-target") + "19 produce message, forward failure response from direct:producer-test-3 to a producing target actor and produce response to direct:forward-test-1" in { + val target = system.actorOf(Props[ProducingForwardTarget], "19-producing-forward-target-failure") + val producer = system.actorOf(Props(new TestForwarder("direct:producer-test-3", target)), name = "19-direct-producer-test-3-forward-failure-producing-target") filterEvents(EventFilter[AkkaCamelException](occurrences = 1)) { mockEndpoint.expectedMessageCount(1) mockEndpoint.message(0).body().isInstanceOf(classOf[akka.actor.Status.Failure]) producer.tell(CamelMessage("fail", Map()), producer) mockEndpoint.assertIsSatisfied() } - stopGracefully(target, producer) } - "keep producing messages after error" in { + "20 keep producing messages after error" in { import TestSupport._ - val consumer = start(new IntermittentErrorConsumer("direct:intermittentTest-1"), "intermittentTest-error-consumer") - val producer = start(new SimpleProducer("direct:intermittentTest-1"), "intermittentTest-producer") + val consumer = start(new IntermittentErrorConsumer("direct:intermittentTest-1"), "20-intermittentTest-error-consumer") + val producer = start(new SimpleProducer("direct:intermittentTest-1"), "20-intermittentTest-producer") filterEvents(EventFilter[AkkaCamelException](occurrences = 1)) { val futureFailed = producer.tell("fail", testActor) expectMsgPF(timeoutDuration) { @@ -242,31 +228,26 @@ class ProducerFeatureTest extends TestKit(ActorSystem("test", AkkaSpec.testConf) stop(consumer) stop(producer) } - "be able to transform outgoing messages and have a valid sender reference" in { + + "21 be able to transform outgoing messages and have a valid sender reference" in { import TestSupport._ filterEvents(EventFilter[Exception](occurrences = 1)) { - val producerSupervisor = system.actorOf(Props(new ProducerSupervisor(Props(new ChildProducer("mock:mock", true)))), "ignore-deadletter-sender-ref-test") + val producerSupervisor = system.actorOf(Props(new ProducerSupervisor(Props(new ChildProducer("mock:mock", true)))), "21-ignore-deadletter-sender-ref-test") mockEndpoint.reset() producerSupervisor.tell(CamelMessage("test", Map()), testActor) producerSupervisor.tell(CamelMessage("err", Map()), testActor) mockEndpoint.expectedMessageCount(1) mockEndpoint.expectedBodiesReceived("TEST") expectMsg("TEST") - system.stop(producerSupervisor) } } } private def mockEndpoint = camel.context.getEndpoint("mock:mock", classOf[MockEndpoint]) - - def stopGracefully(actors: ActorRef*)(implicit timeout: Timeout) { - val deadline = timeout.duration.fromNow - for (a ← actors) - Await.result(gracefulStop(a, deadline.timeLeft), deadline.timeLeft) must be === true - } } object ProducerFeatureTest { + class ProducerSupervisor(childProps: Props) extends Actor { override def supervisorStrategy = SupervisorStrategy.stoppingStrategy val child = context.actorOf(childProps, "producer-supervisor-child") @@ -281,6 +262,7 @@ object ProducerFeatureTest { aref ! msg } } + class ChildProducer(uri: String, upper: Boolean = false) extends Actor with Producer { override def oneway = true