ensure unique actor names in camel ProducerFeatureTest, see #3026
This commit is contained in:
parent
9915e3e263
commit
5f25169095
1 changed files with 45 additions and 63 deletions
|
|
@ -44,15 +44,15 @@ class ProducerFeatureTest extends TestKit(ActorSystem("test", AkkaSpec.testConf)
|
||||||
override protected def afterEach { mockEndpoint.reset() }
|
override protected def afterEach { mockEndpoint.reset() }
|
||||||
|
|
||||||
"A Producer on a sync Camel route" must {
|
"A Producer on a sync Camel route" must {
|
||||||
"produce a message and receive normal response" in {
|
|
||||||
val producer = system.actorOf(Props(new TestProducer("direct:producer-test-2", true)), name = "direct-producer-2")
|
"01 produce a message and receive normal response" in {
|
||||||
|
val producer = system.actorOf(Props(new TestProducer("direct:producer-test-2", true)), name = "01-direct-producer-2")
|
||||||
val message = CamelMessage("test", Map(CamelMessage.MessageExchangeId -> "123"))
|
val message = CamelMessage("test", Map(CamelMessage.MessageExchangeId -> "123"))
|
||||||
producer.tell(message, testActor)
|
producer.tell(message, testActor)
|
||||||
expectMsg(CamelMessage("received TEST", Map(CamelMessage.MessageExchangeId -> "123")))
|
expectMsg(CamelMessage("received TEST", Map(CamelMessage.MessageExchangeId -> "123")))
|
||||||
stopGracefully(producer)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
"produce a message and receive failure response" in {
|
"02 produce a message and receive failure response" in {
|
||||||
val latch = TestLatch()
|
val latch = TestLatch()
|
||||||
var deadActor: Option[ActorRef] = None
|
var deadActor: Option[ActorRef] = None
|
||||||
val supervisor = system.actorOf(Props(new Actor {
|
val supervisor = system.actorOf(Props(new Actor {
|
||||||
|
|
@ -70,7 +70,7 @@ class ProducerFeatureTest extends TestKit(ActorSystem("test", AkkaSpec.testConf)
|
||||||
override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1 minute) {
|
override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1 minute) {
|
||||||
case _: AkkaCamelException ⇒ Stop
|
case _: AkkaCamelException ⇒ Stop
|
||||||
}
|
}
|
||||||
}), name = "prod-anonymous-supervisor")
|
}), name = "02-prod-anonymous-supervisor")
|
||||||
|
|
||||||
supervisor.tell(Props(new TestProducer("direct:producer-test-2")), testActor)
|
supervisor.tell(Props(new TestProducer("direct:producer-test-2")), testActor)
|
||||||
val producer = receiveOne(timeoutDuration).asInstanceOf[ActorRef]
|
val producer = receiveOne(timeoutDuration).asInstanceOf[ActorRef]
|
||||||
|
|
@ -85,40 +85,36 @@ class ProducerFeatureTest extends TestKit(ActorSystem("test", AkkaSpec.testConf)
|
||||||
}
|
}
|
||||||
Await.ready(latch, timeoutDuration)
|
Await.ready(latch, timeoutDuration)
|
||||||
deadActor must be(Some(producer))
|
deadActor must be(Some(producer))
|
||||||
stopGracefully(producer)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
"produce a message oneway" in {
|
"03 produce a message oneway" in {
|
||||||
val producer = system.actorOf(Props(new TestProducer("direct:producer-test-1", true) with Oneway), name = "direct-producer-1-oneway")
|
val producer = system.actorOf(Props(new TestProducer("direct:producer-test-1", true) with Oneway), name = "03-direct-producer-1-oneway")
|
||||||
mockEndpoint.expectedBodiesReceived("TEST")
|
mockEndpoint.expectedBodiesReceived("TEST")
|
||||||
producer ! CamelMessage("test", Map())
|
producer ! CamelMessage("test", Map())
|
||||||
mockEndpoint.assertIsSatisfied()
|
mockEndpoint.assertIsSatisfied()
|
||||||
stopGracefully(producer)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
"produces message twoway without sender reference" in {
|
"04 produces message twoway without sender reference" in {
|
||||||
// this test causes a dead letter which can be ignored. The producer is two-way but a oneway tell is used
|
// this test causes a dead letter which can be ignored. The producer is two-way but a oneway tell is used
|
||||||
// to communicate with it and the response is ignored, which ends up in a dead letter
|
// to communicate with it and the response is ignored, which ends up in a dead letter
|
||||||
val producer = system.actorOf(Props(new TestProducer("direct:producer-test-1")), name = "ignore-this-deadletter-direct-producer-test-no-sender")
|
val producer = system.actorOf(Props(new TestProducer("direct:producer-test-1")), name = "04-ignore-this-deadletter-direct-producer-test-no-sender")
|
||||||
mockEndpoint.expectedBodiesReceived("test")
|
mockEndpoint.expectedBodiesReceived("test")
|
||||||
producer ! CamelMessage("test", Map())
|
producer ! CamelMessage("test", Map())
|
||||||
mockEndpoint.assertIsSatisfied()
|
mockEndpoint.assertIsSatisfied()
|
||||||
stopGracefully(producer)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
"A Producer on an async Camel route" must {
|
"A Producer on an async Camel route" must {
|
||||||
|
|
||||||
"produce message to direct:producer-test-3 and receive normal response" in {
|
"10 produce message to direct:producer-test-3 and receive normal response" in {
|
||||||
val producer = system.actorOf(Props(new TestProducer("direct:producer-test-3")), name = "direct-producer-test-3")
|
val producer = system.actorOf(Props(new TestProducer("direct:producer-test-3")), name = "10-direct-producer-test-3")
|
||||||
val message = CamelMessage("test", Map(CamelMessage.MessageExchangeId -> "123"))
|
val message = CamelMessage("test", Map(CamelMessage.MessageExchangeId -> "123"))
|
||||||
producer.tell(message, testActor)
|
producer.tell(message, testActor)
|
||||||
expectMsg(CamelMessage("received test", Map(CamelMessage.MessageExchangeId -> "123")))
|
expectMsg(CamelMessage("received test", Map(CamelMessage.MessageExchangeId -> "123")))
|
||||||
stopGracefully(producer)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
"produce message to direct:producer-test-3 and receive failure response" in {
|
"11 produce message to direct:producer-test-3 and receive failure response" in {
|
||||||
val producer = system.actorOf(Props(new TestProducer("direct:producer-test-3")), name = "direct-producer-test-3-receive-failure")
|
val producer = system.actorOf(Props(new TestProducer("direct:producer-test-3")), name = "11-direct-producer-test-3-receive-failure")
|
||||||
val message = CamelMessage("fail", Map(CamelMessage.MessageExchangeId -> "123"))
|
val message = CamelMessage("fail", Map(CamelMessage.MessageExchangeId -> "123"))
|
||||||
|
|
||||||
filterEvents(EventFilter[AkkaCamelException](occurrences = 1)) {
|
filterEvents(EventFilter[AkkaCamelException](occurrences = 1)) {
|
||||||
|
|
@ -129,21 +125,19 @@ class ProducerFeatureTest extends TestKit(ActorSystem("test", AkkaSpec.testConf)
|
||||||
e.headers must be(Map(CamelMessage.MessageExchangeId -> "123"))
|
e.headers must be(Map(CamelMessage.MessageExchangeId -> "123"))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
stopGracefully(producer)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
"produce message, forward normal response of direct:producer-test-2 to a replying target actor and receive response" in {
|
"12 produce message, forward normal response of direct:producer-test-2 to a replying target actor and receive response" in {
|
||||||
val target = system.actorOf(Props[ReplyingForwardTarget], name = "reply-forwarding-target")
|
val target = system.actorOf(Props[ReplyingForwardTarget], name = "12-reply-forwarding-target")
|
||||||
val producer = system.actorOf(Props(new TestForwarder("direct:producer-test-2", target)), name = "direct-producer-test-2-forwarder")
|
val producer = system.actorOf(Props(new TestForwarder("direct:producer-test-2", target)), name = "12-direct-producer-test-2-forwarder")
|
||||||
val message = CamelMessage("test", Map(CamelMessage.MessageExchangeId -> "123"))
|
val message = CamelMessage("test", Map(CamelMessage.MessageExchangeId -> "123"))
|
||||||
producer.tell(message, testActor)
|
producer.tell(message, testActor)
|
||||||
expectMsg(CamelMessage("received test", Map(CamelMessage.MessageExchangeId -> "123", "test" -> "result")))
|
expectMsg(CamelMessage("received test", Map(CamelMessage.MessageExchangeId -> "123", "test" -> "result")))
|
||||||
stopGracefully(target, producer)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
"produce message, forward failure response of direct:producer-test-2 to a replying target actor and receive response" in {
|
"13 produce message, forward failure response of direct:producer-test-2 to a replying target actor and receive response" in {
|
||||||
val target = system.actorOf(Props[ReplyingForwardTarget], name = "reply-forwarding-target")
|
val target = system.actorOf(Props[ReplyingForwardTarget], name = "13-reply-forwarding-target")
|
||||||
val producer = system.actorOf(Props(new TestForwarder("direct:producer-test-2", target)), name = "direct-producer-test-2-forwarder-failure")
|
val producer = system.actorOf(Props(new TestForwarder("direct:producer-test-2", target)), name = "13-direct-producer-test-2-forwarder-failure")
|
||||||
val message = CamelMessage("fail", Map(CamelMessage.MessageExchangeId -> "123"))
|
val message = CamelMessage("fail", Map(CamelMessage.MessageExchangeId -> "123"))
|
||||||
|
|
||||||
filterEvents(EventFilter[AkkaCamelException](occurrences = 1)) {
|
filterEvents(EventFilter[AkkaCamelException](occurrences = 1)) {
|
||||||
|
|
@ -154,43 +148,39 @@ class ProducerFeatureTest extends TestKit(ActorSystem("test", AkkaSpec.testConf)
|
||||||
e.headers must be(Map(CamelMessage.MessageExchangeId -> "123", "test" -> "failure"))
|
e.headers must be(Map(CamelMessage.MessageExchangeId -> "123", "test" -> "failure"))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
stopGracefully(target, producer)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
"produce message, forward normal response to a producing target actor and produce response to direct:forward-test-1" in {
|
"14 produce message, forward normal response to a producing target actor and produce response to direct:forward-test-1" in {
|
||||||
val target = system.actorOf(Props[ProducingForwardTarget], name = "producer-forwarding-target")
|
val target = system.actorOf(Props[ProducingForwardTarget], name = "14-producer-forwarding-target")
|
||||||
val producer = system.actorOf(Props(new TestForwarder("direct:producer-test-2", target)), name = "direct-producer-test-2-forwarder-to-producing-target")
|
val producer = system.actorOf(Props(new TestForwarder("direct:producer-test-2", target)), name = "14-direct-producer-test-2-forwarder-to-producing-target")
|
||||||
mockEndpoint.expectedBodiesReceived("received test")
|
mockEndpoint.expectedBodiesReceived("received test")
|
||||||
producer.tell(CamelMessage("test", Map()), producer)
|
producer.tell(CamelMessage("test", Map()), producer)
|
||||||
mockEndpoint.assertIsSatisfied()
|
mockEndpoint.assertIsSatisfied()
|
||||||
stopGracefully(target, producer)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
"produce message, forward failure response to a producing target actor and produce response to direct:forward-test-1" in {
|
"15 produce message, forward failure response to a producing target actor and produce response to direct:forward-test-1" in {
|
||||||
val target = system.actorOf(Props[ProducingForwardTarget], name = "producer-forwarding-target-failure")
|
val target = system.actorOf(Props[ProducingForwardTarget], name = "15-producer-forwarding-target-failure")
|
||||||
val producer = system.actorOf(Props(new TestForwarder("direct:producer-test-2", target)), name = "direct-producer-test-2-forward-failure")
|
val producer = system.actorOf(Props(new TestForwarder("direct:producer-test-2", target)), name = "15-direct-producer-test-2-forward-failure")
|
||||||
filterEvents(EventFilter[AkkaCamelException](occurrences = 1)) {
|
filterEvents(EventFilter[AkkaCamelException](occurrences = 1)) {
|
||||||
mockEndpoint.expectedMessageCount(1)
|
mockEndpoint.expectedMessageCount(1)
|
||||||
mockEndpoint.message(0).body().isInstanceOf(classOf[akka.actor.Status.Failure])
|
mockEndpoint.message(0).body().isInstanceOf(classOf[akka.actor.Status.Failure])
|
||||||
producer.tell(CamelMessage("fail", Map()), producer)
|
producer.tell(CamelMessage("fail", Map()), producer)
|
||||||
mockEndpoint.assertIsSatisfied()
|
mockEndpoint.assertIsSatisfied()
|
||||||
}
|
}
|
||||||
stopGracefully(target, producer)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
"produce message, forward normal response from direct:producer-test-3 to a replying target actor and receive response" in {
|
"16 produce message, forward normal response from direct:producer-test-3 to a replying target actor and receive response" in {
|
||||||
val target = system.actorOf(Props[ReplyingForwardTarget], name = "reply-forwarding-target")
|
val target = system.actorOf(Props[ReplyingForwardTarget], name = "16-reply-forwarding-target")
|
||||||
val producer = system.actorOf(Props(new TestForwarder("direct:producer-test-3", target)), name = "direct-producer-test-3-to-replying-actor")
|
val producer = system.actorOf(Props(new TestForwarder("direct:producer-test-3", target)), name = "16-direct-producer-test-3-to-replying-actor")
|
||||||
val message = CamelMessage("test", Map(CamelMessage.MessageExchangeId -> "123"))
|
val message = CamelMessage("test", Map(CamelMessage.MessageExchangeId -> "123"))
|
||||||
|
|
||||||
producer.tell(message, testActor)
|
producer.tell(message, testActor)
|
||||||
expectMsg(CamelMessage("received test", Map(CamelMessage.MessageExchangeId -> "123", "test" -> "result")))
|
expectMsg(CamelMessage("received test", Map(CamelMessage.MessageExchangeId -> "123", "test" -> "result")))
|
||||||
stopGracefully(target, producer)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
"produce message, forward failure response from direct:producer-test-3 to a replying target actor and receive response" in {
|
"17 produce message, forward failure response from direct:producer-test-3 to a replying target actor and receive response" in {
|
||||||
val target = system.actorOf(Props[ReplyingForwardTarget], name = "reply-forwarding-target")
|
val target = system.actorOf(Props[ReplyingForwardTarget], name = "17-reply-forwarding-target")
|
||||||
val producer = system.actorOf(Props(new TestForwarder("direct:producer-test-3", target)), name = "direct-producer-test-3-forward-failure")
|
val producer = system.actorOf(Props(new TestForwarder("direct:producer-test-3", target)), name = "17-direct-producer-test-3-forward-failure")
|
||||||
|
|
||||||
val message = CamelMessage("fail", Map(CamelMessage.MessageExchangeId -> "123"))
|
val message = CamelMessage("fail", Map(CamelMessage.MessageExchangeId -> "123"))
|
||||||
filterEvents(EventFilter[AkkaCamelException](occurrences = 1)) {
|
filterEvents(EventFilter[AkkaCamelException](occurrences = 1)) {
|
||||||
|
|
@ -201,35 +191,31 @@ class ProducerFeatureTest extends TestKit(ActorSystem("test", AkkaSpec.testConf)
|
||||||
e.headers must be(Map(CamelMessage.MessageExchangeId -> "123", "test" -> "failure"))
|
e.headers must be(Map(CamelMessage.MessageExchangeId -> "123", "test" -> "failure"))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
stopGracefully(target, producer)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
"produce message, forward normal response from direct:producer-test-3 to a producing target actor and produce response to direct:forward-test-1" in {
|
"18 produce message, forward normal response from direct:producer-test-3 to a producing target actor and produce response to direct:forward-test-1" in {
|
||||||
val target = system.actorOf(Props[ProducingForwardTarget], "producing-forward-target-normal")
|
val target = system.actorOf(Props[ProducingForwardTarget], "18-producing-forward-target-normal")
|
||||||
val producer = system.actorOf(Props(new TestForwarder("direct:producer-test-3", target)), name = "direct-producer-test-3-forward-normal")
|
val producer = system.actorOf(Props(new TestForwarder("direct:producer-test-3", target)), name = "18-direct-producer-test-3-forward-normal")
|
||||||
mockEndpoint.expectedBodiesReceived("received test")
|
mockEndpoint.expectedBodiesReceived("received test")
|
||||||
producer.tell(CamelMessage("test", Map()), producer)
|
producer.tell(CamelMessage("test", Map()), producer)
|
||||||
mockEndpoint.assertIsSatisfied()
|
mockEndpoint.assertIsSatisfied()
|
||||||
system.stop(target)
|
|
||||||
system.stop(producer)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
"produce message, forward failure response from direct:producer-test-3 to a producing target actor and produce response to direct:forward-test-1" in {
|
"19 produce message, forward failure response from direct:producer-test-3 to a producing target actor and produce response to direct:forward-test-1" in {
|
||||||
val target = system.actorOf(Props[ProducingForwardTarget], "producing-forward-target-failure")
|
val target = system.actorOf(Props[ProducingForwardTarget], "19-producing-forward-target-failure")
|
||||||
val producer = system.actorOf(Props(new TestForwarder("direct:producer-test-3", target)), name = "direct-producer-test-3-forward-failure-producing-target")
|
val producer = system.actorOf(Props(new TestForwarder("direct:producer-test-3", target)), name = "19-direct-producer-test-3-forward-failure-producing-target")
|
||||||
filterEvents(EventFilter[AkkaCamelException](occurrences = 1)) {
|
filterEvents(EventFilter[AkkaCamelException](occurrences = 1)) {
|
||||||
mockEndpoint.expectedMessageCount(1)
|
mockEndpoint.expectedMessageCount(1)
|
||||||
mockEndpoint.message(0).body().isInstanceOf(classOf[akka.actor.Status.Failure])
|
mockEndpoint.message(0).body().isInstanceOf(classOf[akka.actor.Status.Failure])
|
||||||
producer.tell(CamelMessage("fail", Map()), producer)
|
producer.tell(CamelMessage("fail", Map()), producer)
|
||||||
mockEndpoint.assertIsSatisfied()
|
mockEndpoint.assertIsSatisfied()
|
||||||
}
|
}
|
||||||
stopGracefully(target, producer)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
"keep producing messages after error" in {
|
"20 keep producing messages after error" in {
|
||||||
import TestSupport._
|
import TestSupport._
|
||||||
val consumer = start(new IntermittentErrorConsumer("direct:intermittentTest-1"), "intermittentTest-error-consumer")
|
val consumer = start(new IntermittentErrorConsumer("direct:intermittentTest-1"), "20-intermittentTest-error-consumer")
|
||||||
val producer = start(new SimpleProducer("direct:intermittentTest-1"), "intermittentTest-producer")
|
val producer = start(new SimpleProducer("direct:intermittentTest-1"), "20-intermittentTest-producer")
|
||||||
filterEvents(EventFilter[AkkaCamelException](occurrences = 1)) {
|
filterEvents(EventFilter[AkkaCamelException](occurrences = 1)) {
|
||||||
val futureFailed = producer.tell("fail", testActor)
|
val futureFailed = producer.tell("fail", testActor)
|
||||||
expectMsgPF(timeoutDuration) {
|
expectMsgPF(timeoutDuration) {
|
||||||
|
|
@ -242,31 +228,26 @@ class ProducerFeatureTest extends TestKit(ActorSystem("test", AkkaSpec.testConf)
|
||||||
stop(consumer)
|
stop(consumer)
|
||||||
stop(producer)
|
stop(producer)
|
||||||
}
|
}
|
||||||
"be able to transform outgoing messages and have a valid sender reference" in {
|
|
||||||
|
"21 be able to transform outgoing messages and have a valid sender reference" in {
|
||||||
import TestSupport._
|
import TestSupport._
|
||||||
filterEvents(EventFilter[Exception](occurrences = 1)) {
|
filterEvents(EventFilter[Exception](occurrences = 1)) {
|
||||||
val producerSupervisor = system.actorOf(Props(new ProducerSupervisor(Props(new ChildProducer("mock:mock", true)))), "ignore-deadletter-sender-ref-test")
|
val producerSupervisor = system.actorOf(Props(new ProducerSupervisor(Props(new ChildProducer("mock:mock", true)))), "21-ignore-deadletter-sender-ref-test")
|
||||||
mockEndpoint.reset()
|
mockEndpoint.reset()
|
||||||
producerSupervisor.tell(CamelMessage("test", Map()), testActor)
|
producerSupervisor.tell(CamelMessage("test", Map()), testActor)
|
||||||
producerSupervisor.tell(CamelMessage("err", Map()), testActor)
|
producerSupervisor.tell(CamelMessage("err", Map()), testActor)
|
||||||
mockEndpoint.expectedMessageCount(1)
|
mockEndpoint.expectedMessageCount(1)
|
||||||
mockEndpoint.expectedBodiesReceived("TEST")
|
mockEndpoint.expectedBodiesReceived("TEST")
|
||||||
expectMsg("TEST")
|
expectMsg("TEST")
|
||||||
system.stop(producerSupervisor)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private def mockEndpoint = camel.context.getEndpoint("mock:mock", classOf[MockEndpoint])
|
private def mockEndpoint = camel.context.getEndpoint("mock:mock", classOf[MockEndpoint])
|
||||||
|
|
||||||
def stopGracefully(actors: ActorRef*)(implicit timeout: Timeout) {
|
|
||||||
val deadline = timeout.duration.fromNow
|
|
||||||
for (a ← actors)
|
|
||||||
Await.result(gracefulStop(a, deadline.timeLeft), deadline.timeLeft) must be === true
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
object ProducerFeatureTest {
|
object ProducerFeatureTest {
|
||||||
|
|
||||||
class ProducerSupervisor(childProps: Props) extends Actor {
|
class ProducerSupervisor(childProps: Props) extends Actor {
|
||||||
override def supervisorStrategy = SupervisorStrategy.stoppingStrategy
|
override def supervisorStrategy = SupervisorStrategy.stoppingStrategy
|
||||||
val child = context.actorOf(childProps, "producer-supervisor-child")
|
val child = context.actorOf(childProps, "producer-supervisor-child")
|
||||||
|
|
@ -281,6 +262,7 @@ object ProducerFeatureTest {
|
||||||
aref ! msg
|
aref ! msg
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
class ChildProducer(uri: String, upper: Boolean = false) extends Actor with Producer {
|
class ChildProducer(uri: String, upper: Boolean = false) extends Actor with Producer {
|
||||||
override def oneway = true
|
override def oneway = true
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue