ticket-1732, return Failure on producer ask _and_ throw AkkaCamelException so supervision can occur on camel failures

This commit is contained in:
RayRoestenburg 2012-05-07 14:18:06 +02:00
parent e6513bcb67
commit 8426ded00d
6 changed files with 112 additions and 44 deletions

View file

@ -7,17 +7,21 @@ package akka.camel
import org.apache.camel.{ Exchange, Processor }
import org.apache.camel.builder.RouteBuilder
import org.apache.camel.component.mock.MockEndpoint
import akka.dispatch.{ Future, Await }
import akka.camel.TestSupport.SharedCamelSystem
import akka.actor.SupervisorStrategy.Stop
import org.scalatest.{ GivenWhenThen, BeforeAndAfterEach, BeforeAndAfterAll, WordSpec }
import akka.actor._
import akka.pattern._
import akka.dispatch.Await
import akka.util.duration._
import akka.camel.TestSupport.SharedCamelSystem
import org.scalatest._
import akka.util.Timeout
import akka.testkit.TestLatch
import org.scalatest.matchers.MustMatchers
/**
* Tests the features of the Camel Producer.
*/
class ProducerFeatureTest extends WordSpec with BeforeAndAfterAll with BeforeAndAfterEach with SharedCamelSystem with GivenWhenThen {
class ProducerFeatureTest extends WordSpec with BeforeAndAfterAll with BeforeAndAfterEach with SharedCamelSystem with GivenWhenThen with MustMatchers {
import ProducerFeatureTest._
@ -25,8 +29,8 @@ class ProducerFeatureTest extends WordSpec with BeforeAndAfterAll with BeforeAnd
// to make testing equality of messages easier, otherwise the breadcrumb shows up in the result.
camelContext.setUseBreadcrumb(false)
val timeout = 1 second
val timeoutDuration = 1 second
implicit val timeout = Timeout(timeoutDuration)
override protected def beforeAll { camelContext.addRoutes(new TestRoute(system)) }
override protected def afterEach { mockEndpoint.reset() }
@ -38,10 +42,10 @@ class ProducerFeatureTest extends WordSpec with BeforeAndAfterAll with BeforeAnd
val producer = system.actorOf(Props(new TestProducer("direct:producer-test-2", true)))
when("a test message is sent to the producer with ?")
val message = CamelMessage("test", Map(CamelMessage.MessageExchangeId -> "123"))
val future = producer.ask(message)(timeout)
val future = producer.ask(message)(timeoutDuration)
then("a normal response must have been returned by the producer")
val expected = CamelMessage("received TEST", Map(CamelMessage.MessageExchangeId -> "123"))
Await.result(future, timeout) match {
Await.result(future, timeoutDuration) match {
case result: CamelMessage assert(result === expected)
case unexpected fail("Actor responded with unexpected message:" + unexpected)
}
@ -49,12 +53,30 @@ class ProducerFeatureTest extends WordSpec with BeforeAndAfterAll with BeforeAnd
"produce a message and receive failure response" in {
given("a registered two-way producer")
val producer = system.actorOf(Props(new TestProducer("direct:producer-test-2")))
val latch = TestLatch()
var deadActor: Option[ActorRef] = None
val supervisor = system.actorOf(Props(new Actor {
def receive = {
case p: Props {
val producer = context.actorOf(p)
context.watch(producer)
sender ! producer
}
case Terminated(actorRef) {
deadActor = Some(actorRef)
latch.countDown()
}
}
override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1 minute) {
case _: AkkaCamelException Stop
}
}))
val producer = Await.result[ActorRef](supervisor.ask(Props(new TestProducer("direct:producer-test-2"))).asInstanceOf[Future[ActorRef]], timeoutDuration)
when("a test message causing an exception is sent to the producer with ?")
val message = CamelMessage("fail", Map(CamelMessage.MessageExchangeId -> "123"))
val future = producer.ask(message)(timeout)
Await.result(future, timeout) match {
val future = producer.ask(message)(timeoutDuration)
Await.result(future, timeoutDuration) match {
case result: Failure
then("a failure response must have been returned by the producer")
val expectedFailureText = result.cause.getMessage
@ -63,6 +85,10 @@ class ProducerFeatureTest extends WordSpec with BeforeAndAfterAll with BeforeAnd
assert(expectedHeaders === Map(CamelMessage.MessageExchangeId -> "123"))
case unexpected fail("Actor responded with unexpected message:" + unexpected)
}
then("an AkkaCamelException must have been thrown, which can be used for supervision")
// check that the supervisor stopped the producer and received a Terminated
Await.ready(latch, timeoutDuration)
deadActor must be(Some(producer))
}
"produce a message oneway" in {
@ -98,9 +124,9 @@ class ProducerFeatureTest extends WordSpec with BeforeAndAfterAll with BeforeAnd
when("a test message is sent to the producer with ?")
val message = CamelMessage("test", Map(CamelMessage.MessageExchangeId -> "123"))
val future = producer.ask(message)(timeout)
val future = producer.ask(message)(timeoutDuration)
Await.result(future, timeout) match {
Await.result(future, timeoutDuration) match {
case result: CamelMessage
then("a normal response must have been returned by the producer")
val expected = CamelMessage("received test", Map(CamelMessage.MessageExchangeId -> "123"))
@ -115,8 +141,8 @@ class ProducerFeatureTest extends WordSpec with BeforeAndAfterAll with BeforeAnd
when("a test message causing an exception is sent to the producer with ?")
val message = CamelMessage("fail", Map(CamelMessage.MessageExchangeId -> "123"))
val future = producer.ask(message)(timeout)
Await.result(future, timeout) match {
val future = producer.ask(message)(timeoutDuration)
Await.result(future, timeoutDuration) match {
case result: Failure
then("a failure response must have been returned by the producer")
val expectedFailureText = result.cause.getMessage
@ -134,9 +160,9 @@ class ProducerFeatureTest extends WordSpec with BeforeAndAfterAll with BeforeAnd
when("a test message is sent to the producer with ?")
val message = CamelMessage("test", Map(CamelMessage.MessageExchangeId -> "123"))
val future = producer.ask(message)(timeout)
val future = producer.ask(message)(timeoutDuration)
Await.result(future, timeout) match {
Await.result(future, timeoutDuration) match {
case result: CamelMessage
then("a normal response must have been returned by the forward target")
val expected = CamelMessage("received test", Map(CamelMessage.MessageExchangeId -> "123", "test" -> "result"))
@ -152,8 +178,8 @@ class ProducerFeatureTest extends WordSpec with BeforeAndAfterAll with BeforeAnd
when("a test message causing an exception is sent to the producer with ?")
val message = CamelMessage("fail", Map(CamelMessage.MessageExchangeId -> "123"))
val future = producer.ask(message)(timeout)
Await.result(future, timeout) match {
val future = producer.ask(message)(timeoutDuration)
Await.result(future, timeoutDuration) match {
case failure: Failure
then("a failure response must have been returned by the forward target")
val expectedFailureText = failure.cause.getMessage
@ -200,10 +226,10 @@ class ProducerFeatureTest extends WordSpec with BeforeAndAfterAll with BeforeAnd
when("a test message is sent to the producer with ?")
val message = CamelMessage("test", Map(CamelMessage.MessageExchangeId -> "123"))
val future = producer.ask(message)(timeout)
val future = producer.ask(message)(timeoutDuration)
then("a normal response must have been returned by the forward target")
Await.result(future, timeout) match {
Await.result(future, timeoutDuration) match {
case message: CamelMessage
val expected = CamelMessage("received test", Map(CamelMessage.MessageExchangeId -> "123", "test" -> "result"))
assert(message === expected)
@ -218,8 +244,8 @@ class ProducerFeatureTest extends WordSpec with BeforeAndAfterAll with BeforeAnd
when("a test message causing an exception is sent to the producer with ask")
val message = CamelMessage("fail", Map(CamelMessage.MessageExchangeId -> "123"))
val future = producer.ask(message)(timeout)
Await.result(future, timeout) match {
val future = producer.ask(message)(timeoutDuration)
Await.result(future, timeoutDuration) match {
case failure: Failure
then("a failure response must have been returned by the forward target")
val expectedFailureText = failure.cause.getMessage