Removed akka.camel.Failure, replaced with akka.actor.Status.Failure, fixed tests

This commit is contained in:
RayRoestenburg 2012-05-11 09:46:49 +02:00
parent db1ece45e9
commit 82d9427ad4
10 changed files with 86 additions and 124 deletions

View file

@ -238,44 +238,6 @@ case object Ack {
def ack = this def ack = this
} }
/**
* An immutable representation of a failed Camel exchange. It contains the failure cause
* obtained from Exchange.getException and the headers from either the Exchange.getIn
* message or Exchange.getOut message, depending on the exchange pattern.
*
* @author Martin Krasser
*/
case class Failure(val cause: Throwable, val headers: Map[String, Any] = Map.empty) {
/**
* Creates a Failure with cause body and empty headers map.
*/
def this(cause: Throwable) = this(cause, Map.empty[String, Any])
/**
* Creates a Failure with given cause and headers map. A copy of the headers map is made.
* <p>
* Java API
*/
def this(cause: Throwable, headers: JMap[String, Any]) = this(cause, headers.toMap)
/**
* Returns the cause of this Failure.
* <p>
* Java API.
*/
def getCause = cause
/**
* Returns all headers from this failure message. The returned headers map is backed up by
* this message's immutable headers map. Any attempt to modify the returned map will throw
* an exception.
* <p>
* Java API
*/
def getHeaders: JMap[String, Any] = headers
}
/** /**
* An exception indicating that the exchange to the camel endpoint failed. * An exception indicating that the exchange to the camel endpoint failed.
* It contains the failure cause obtained from Exchange.getException and the headers from either the Exchange.getIn * It contains the failure cause obtained from Exchange.getException and the headers from either the Exchange.getIn

View file

@ -7,6 +7,7 @@ package akka.camel
import akka.actor.Actor import akka.actor.Actor
import internal.CamelExchangeAdapter import internal.CamelExchangeAdapter
import org.apache.camel.{ Exchange, ExchangePattern, AsyncCallback } import org.apache.camel.{ Exchange, ExchangePattern, AsyncCallback }
import akka.actor.Status.Failure
/** /**
* Support trait for producing messages to Camel endpoints. * Support trait for producing messages to Camel endpoints.
@ -75,7 +76,7 @@ trait ProducerSupport { this: Actor ⇒
val originalSender = sender val originalSender = sender
// Ignoring doneSync, sending back async uniformly. // Ignoring doneSync, sending back async uniformly.
def done(doneSync: Boolean): Unit = producer.tell( def done(doneSync: Boolean): Unit = producer.tell(
if (exchange.isFailed) FailureResult(exchange.toFailureMessage(cmsg.headers(headersToCopy))) if (exchange.isFailed) exchange.toFailureResult(cmsg.headers(headersToCopy))
else MessageResult(exchange.toResponseMessage(cmsg.headers(headersToCopy))), originalSender) else MessageResult(exchange.toResponseMessage(cmsg.headers(headersToCopy))), originalSender)
}) })
} }
@ -90,8 +91,9 @@ trait ProducerSupport { this: Actor ⇒
protected def produce: Receive = { protected def produce: Receive = {
case res: MessageResult routeResponse(res.message) case res: MessageResult routeResponse(res.message)
case res: FailureResult case res: FailureResult
routeResponse(res.failure) val e = new AkkaCamelException(res.cause, res.headers)
throw new AkkaCamelException(res.failure.cause, res.failure.headers) routeResponse(Failure(e))
throw e
case msg case msg
val exchangePattern = if (oneway) ExchangePattern.InOnly else ExchangePattern.InOut val exchangePattern = if (oneway) ExchangePattern.InOnly else ExchangePattern.InOut
produce(transformOutgoingMessage(msg), exchangePattern) produce(transformOutgoingMessage(msg), exchangePattern)
@ -143,7 +145,7 @@ private case class MessageResult(message: CamelMessage)
/** /**
* @author Martin Krasser * @author Martin Krasser
*/ */
private case class FailureResult(failure: Failure) private case class FailureResult(cause: Throwable, headers: Map[String, Any] = Map.empty)
/** /**
* A one-way producer. * A one-way producer.

View file

@ -5,7 +5,7 @@ import scala.collection.JavaConversions._
import org.apache.camel.util.ExchangeHelper import org.apache.camel.util.ExchangeHelper
import org.apache.camel.{ Exchange, Message JCamelMessage } import org.apache.camel.{ Exchange, Message JCamelMessage }
import akka.camel.{ Failure, AkkaCamelException, CamelMessage } import akka.camel.{ FailureResult, AkkaCamelException, CamelMessage }
/** /**
* For internal use only. * For internal use only.
@ -40,10 +40,10 @@ private[camel] class CamelExchangeAdapter(exchange: Exchange) {
def setResponse(msg: CamelMessage) { msg.copyContentTo(response) } def setResponse(msg: CamelMessage) { msg.copyContentTo(response) }
/** /**
* Sets Exchange.getException from the given Failure message. Headers of the Failure message * Sets Exchange.getException from the given FailureResult message. Headers of the FailureResult message
* are ignored. * are ignored.
*/ */
def setFailure(msg: Failure) { exchange.setException(msg.cause) } def setFailure(msg: FailureResult) { exchange.setException(msg.cause) }
/** /**
* Creates an immutable CamelMessage object from Exchange.getIn so it can be used with Actors. * Creates an immutable CamelMessage object from Exchange.getIn so it can be used with Actors.
@ -87,21 +87,21 @@ private[camel] class CamelExchangeAdapter(exchange: Exchange) {
new AkkaCamelException(exchange.getException, headers ++ response.getHeaders) new AkkaCamelException(exchange.getException, headers ++ response.getHeaders)
/** /**
* Creates an immutable Failure object from the adapted Exchange so it can be used with Actors. * Creates an immutable Failure object from the adapted Exchange so it can be used internally between Actors.
* *
* @see Failure * @see Failure
*/ */
def toFailureMessage: Failure = toFailureMessage(Map.empty) def toFailureMessage: FailureResult = toFailureResult(Map.empty)
/** /**
* Creates an immutable Failure object from the adapted Exchange so it can be used with Actors. * Creates an immutable FailureResult object from the adapted Exchange so it can be used internally between Actors.
* *
* @param headers additional headers to set on the created CamelMessage in addition to those * @param headers additional headers to set on the created CamelMessage in addition to those
* in the Camel message. * in the Camel message.
* *
* @see Failure * @see Failure
*/ */
def toFailureMessage(headers: Map[String, Any]): Failure = Failure(exchange.getException, headers ++ response.getHeaders) def toFailureResult(headers: Map[String, Any]): FailureResult = FailureResult(exchange.getException, headers ++ response.getHeaders)
/** /**
* Creates an immutable CamelMessage object from Exchange.getIn so it can be used with Actors. * Creates an immutable CamelMessage object from Exchange.getIn so it can be used with Actors.

View file

@ -17,7 +17,7 @@ import akka.util.duration._
import java.util.concurrent.{ TimeoutException, CountDownLatch } import java.util.concurrent.{ TimeoutException, CountDownLatch }
import akka.camel.internal.CamelExchangeAdapter import akka.camel.internal.CamelExchangeAdapter
import akka.util.{ NonFatal, Duration, Timeout } import akka.util.{ NonFatal, Duration, Timeout }
import akka.camel.{ ActorNotRegisteredException, DefaultConsumerConfig, ConsumerConfig, Camel, Ack, Failure CamelFailure, CamelMessage } import akka.camel.{ ActorNotRegisteredException, DefaultConsumerConfig, ConsumerConfig, Camel, Ack, FailureResult, CamelMessage }
/** /**
* For internal use only. * For internal use only.
@ -170,18 +170,18 @@ private[camel] class ActorProducer(val endpoint: ActorEndpoint, camel: Camel) ex
} }
private def forwardResponseTo(exchange: CamelExchangeAdapter): PartialFunction[Either[Throwable, Any], Unit] = { private def forwardResponseTo(exchange: CamelExchangeAdapter): PartialFunction[Either[Throwable, Any], Unit] = {
case Right(failure: CamelFailure) exchange.setFailure(failure) case Right(failure: FailureResult) exchange.setFailure(failure)
case Right(msg) exchange.setResponse(CamelMessage.canonicalize(msg)) case Right(msg) exchange.setResponse(CamelMessage.canonicalize(msg))
case Left(e: TimeoutException) exchange.setFailure(CamelFailure(new TimeoutException("Failed to get response from the actor [%s] within timeout [%s]. Check replyTimeout and blocking settings [%s]" format (endpoint.path, endpoint.replyTimeout, endpoint)))) case Left(e: TimeoutException) exchange.setFailure(FailureResult(new TimeoutException("Failed to get response from the actor [%s] within timeout [%s]. Check replyTimeout and blocking settings [%s]" format (endpoint.path, endpoint.replyTimeout, endpoint))))
case Left(throwable) exchange.setFailure(CamelFailure(throwable)) case Left(throwable) exchange.setFailure(FailureResult(throwable))
} }
private def forwardAckTo(exchange: CamelExchangeAdapter): PartialFunction[Either[Throwable, Any], Unit] = { private def forwardAckTo(exchange: CamelExchangeAdapter): PartialFunction[Either[Throwable, Any], Unit] = {
case Right(Ack) { /* no response message to set */ } case Right(Ack) { /* no response message to set */ }
case Right(failure: CamelFailure) exchange.setFailure(failure) case Right(failure: FailureResult) exchange.setFailure(failure)
case Right(msg) exchange.setFailure(CamelFailure(new IllegalArgumentException("Expected Ack or Failure message, but got: [%s] from actor [%s]" format (msg, endpoint.path)))) case Right(msg) exchange.setFailure(FailureResult(new IllegalArgumentException("Expected Ack or Failure message, but got: [%s] from actor [%s]" format (msg, endpoint.path))))
case Left(e: TimeoutException) exchange.setFailure(CamelFailure(new TimeoutException("Failed to get Ack or Failure response from the actor [%s] within timeout [%s]. Check replyTimeout and blocking settings [%s]" format (endpoint.path, endpoint.replyTimeout, endpoint)))) case Left(e: TimeoutException) exchange.setFailure(FailureResult(new TimeoutException("Failed to get Ack or Failure response from the actor [%s] within timeout [%s]. Check replyTimeout and blocking settings [%s]" format (endpoint.path, endpoint.replyTimeout, endpoint))))
case Left(throwable) exchange.setFailure(CamelFailure(throwable)) case Left(throwable) exchange.setFailure(FailureResult(throwable))
} }
private def sendAsync(message: CamelMessage, onComplete: PartialFunction[Either[Throwable, Any], Unit]): Boolean = { private def sendAsync(message: CamelMessage, onComplete: PartialFunction[Either[Throwable, Any], Unit]): Boolean = {
@ -199,7 +199,7 @@ private[camel] class ActorProducer(val endpoint: ActorEndpoint, camel: Camel) ex
try { try {
actorFor(endpoint.path) ! message actorFor(endpoint.path) ! message
} catch { } catch {
case e exchange.setFailure(new CamelFailure(e)) case e exchange.setFailure(new FailureResult(e))
} }
} }

View file

@ -4,6 +4,7 @@
package akka.camel; package akka.camel;
import akka.actor.Status;
import akka.camel.javaapi.UntypedConsumerActor; import akka.camel.javaapi.UntypedConsumerActor;
import akka.util.Duration; import akka.util.Duration;
import org.apache.camel.builder.Builder; import org.apache.camel.builder.Builder;
@ -41,7 +42,7 @@ public class SampleErrorHandlingConsumer extends UntypedConsumerActor {
@Override @Override
public void preRestart(Throwable reason, Option<Object> message){ public void preRestart(Throwable reason, Option<Object> message){
getSender().tell(new Failure(reason)); getSender().tell(new Status.Failure(reason));
} }
} }

View file

@ -39,10 +39,10 @@ class CamelExchangeAdapterTest extends FunSuite with SharedCamelSystem with Mess
test("mustSetExceptionFromFailureMessage") { test("mustSetExceptionFromFailureMessage") {
val e1 = sampleInOnly val e1 = sampleInOnly
e1.setFailure(Failure(new Exception("test1"))) e1.setFailure(FailureResult(new Exception("test1")))
assert(e1.getException.getMessage === "test1") assert(e1.getException.getMessage === "test1")
val e2 = sampleInOut val e2 = sampleInOut
e2.setFailure(Failure(new Exception("test2"))) e2.setFailure(FailureResult(new Exception("test2")))
assert(e2.getException.getMessage === "test2") assert(e2.getException.getMessage === "test2")
} }
@ -103,7 +103,7 @@ class CamelExchangeAdapterTest extends FunSuite with SharedCamelSystem with Mess
assert(headers("x") === "y") assert(headers("x") === "y")
assert(e1.toFailureMessage.cause.getMessage === "test1") assert(e1.toFailureMessage.cause.getMessage === "test1")
val failureHeaders = e1.toFailureMessage(Map("x" -> "y")).headers val failureHeaders = e1.toFailureResult(Map("x" -> "y")).headers
assert(failureHeaders("key-in") === "val-in") assert(failureHeaders("key-in") === "val-in")
assert(failureHeaders("x") === "y") assert(failureHeaders("x") === "y")
@ -117,7 +117,7 @@ class CamelExchangeAdapterTest extends FunSuite with SharedCamelSystem with Mess
assert(headers("key-out") === "val-out") assert(headers("key-out") === "val-out")
assert(headers("x") === "y") assert(headers("x") === "y")
assert(e1.toFailureMessage.cause.getMessage === "test2") assert(e1.toFailureMessage.cause.getMessage === "test2")
val failureHeaders = e1.toFailureMessage(Map("x" -> "y")).headers val failureHeaders = e1.toFailureResult(Map("x" -> "y")).headers
assert(failureHeaders("key-out") === "val-out") assert(failureHeaders("key-out") === "val-out")
assert(failureHeaders("x") === "y") assert(failureHeaders("x") === "y")
} }

View file

@ -15,6 +15,7 @@ import org.apache.camel.{ FailedToCreateRouteException, CamelExecutionException
import java.util.concurrent.{ ExecutionException, TimeUnit, TimeoutException } import java.util.concurrent.{ ExecutionException, TimeUnit, TimeoutException }
import akka.testkit.TestLatch import akka.testkit.TestLatch
import akka.dispatch.Await import akka.dispatch.Await
import akka.actor.Status.Failure
class ConsumerIntegrationTest extends WordSpec with MustMatchers with NonSharedCamelSystem { class ConsumerIntegrationTest extends WordSpec with MustMatchers with NonSharedCamelSystem {
private val defaultTimeout = 10 private val defaultTimeout = 10

View file

@ -7,7 +7,7 @@ package akka.camel
import org.apache.camel.{ Exchange, Processor } import org.apache.camel.{ Exchange, Processor }
import org.apache.camel.builder.RouteBuilder import org.apache.camel.builder.RouteBuilder
import org.apache.camel.component.mock.MockEndpoint import org.apache.camel.component.mock.MockEndpoint
import akka.dispatch.{ Future, Await } import akka.dispatch.Await
import akka.camel.TestSupport.SharedCamelSystem import akka.camel.TestSupport.SharedCamelSystem
import akka.actor.SupervisorStrategy.Stop import akka.actor.SupervisorStrategy.Stop
import org.scalatest.{ GivenWhenThen, BeforeAndAfterEach, BeforeAndAfterAll, WordSpec } import org.scalatest.{ GivenWhenThen, BeforeAndAfterEach, BeforeAndAfterAll, WordSpec }
@ -71,18 +71,16 @@ class ProducerFeatureTest extends WordSpec with BeforeAndAfterAll with BeforeAnd
case _: AkkaCamelException Stop case _: AkkaCamelException Stop
} }
})) }))
val producer = Await.result[ActorRef](supervisor.ask(Props(new TestProducer("direct:producer-test-2"))).asInstanceOf[Future[ActorRef]], timeoutDuration) val producer = Await.result[ActorRef](supervisor.ask(Props(new TestProducer("direct:producer-test-2"))).mapTo[ActorRef], timeoutDuration)
when("a test message causing an exception is sent to the producer with ?") when("a test message causing an exception is sent to the producer with ?")
val message = CamelMessage("fail", Map(CamelMessage.MessageExchangeId -> "123")) val message = CamelMessage("fail", Map(CamelMessage.MessageExchangeId -> "123"))
val future = producer.ask(message)(timeoutDuration) val future = producer.ask(message)(timeoutDuration).failed
Await.result(future, timeoutDuration) match { Await.ready(future, timeoutDuration).value match {
case result: Failure case Some(Right(e: AkkaCamelException))
then("a failure response must have been returned by the producer") then("a failure response must have been returned by the producer")
val expectedFailureText = result.cause.getMessage e.getMessage must be("failure")
val expectedHeaders = result.headers e.headers must be(Map(CamelMessage.MessageExchangeId -> "123"))
assert(expectedFailureText === "failure")
assert(expectedHeaders === Map(CamelMessage.MessageExchangeId -> "123"))
case unexpected fail("Actor responded with unexpected message:" + unexpected) case unexpected fail("Actor responded with unexpected message:" + unexpected)
} }
then("an AkkaCamelException must have been thrown, which can be used for supervision") then("an AkkaCamelException must have been thrown, which can be used for supervision")
@ -130,7 +128,7 @@ class ProducerFeatureTest extends WordSpec with BeforeAndAfterAll with BeforeAnd
case result: CamelMessage case result: CamelMessage
then("a normal response must have been returned by the producer") then("a normal response must have been returned by the producer")
val expected = CamelMessage("received test", Map(CamelMessage.MessageExchangeId -> "123")) val expected = CamelMessage("received test", Map(CamelMessage.MessageExchangeId -> "123"))
assert(result === expected) result must be(expected)
case unexpected fail("Actor responded with unexpected message:" + unexpected) case unexpected fail("Actor responded with unexpected message:" + unexpected)
} }
} }
@ -141,14 +139,13 @@ class ProducerFeatureTest extends WordSpec with BeforeAndAfterAll with BeforeAnd
when("a test message causing an exception is sent to the producer with ?") when("a test message causing an exception is sent to the producer with ?")
val message = CamelMessage("fail", Map(CamelMessage.MessageExchangeId -> "123")) val message = CamelMessage("fail", Map(CamelMessage.MessageExchangeId -> "123"))
val future = producer.ask(message)(timeoutDuration)
Await.result(future, timeoutDuration) match { val future = producer.ask(message)(timeoutDuration).failed
case result: Failure Await.ready(future, timeoutDuration).value match {
case Some(Right(e: AkkaCamelException))
then("a failure response must have been returned by the producer") then("a failure response must have been returned by the producer")
val expectedFailureText = result.cause.getMessage e.getMessage must be("failure")
val expectedHeaders = result.headers e.headers must be(Map(CamelMessage.MessageExchangeId -> "123"))
assert(expectedFailureText === "failure")
assert(expectedHeaders === Map(CamelMessage.MessageExchangeId -> "123"))
case unexpected fail("Actor responded with unexpected message:" + unexpected) case unexpected fail("Actor responded with unexpected message:" + unexpected)
} }
} }
@ -166,7 +163,7 @@ class ProducerFeatureTest extends WordSpec with BeforeAndAfterAll with BeforeAnd
case result: CamelMessage case result: CamelMessage
then("a normal response must have been returned by the forward target") then("a normal response must have been returned by the forward target")
val expected = CamelMessage("received test", Map(CamelMessage.MessageExchangeId -> "123", "test" -> "result")) val expected = CamelMessage("received test", Map(CamelMessage.MessageExchangeId -> "123", "test" -> "result"))
assert(result === expected) result must be(expected)
case unexpected fail("Actor responded with unexpected message:" + unexpected) case unexpected fail("Actor responded with unexpected message:" + unexpected)
} }
} }
@ -178,14 +175,12 @@ class ProducerFeatureTest extends WordSpec with BeforeAndAfterAll with BeforeAnd
when("a test message causing an exception is sent to the producer with ?") when("a test message causing an exception is sent to the producer with ?")
val message = CamelMessage("fail", Map(CamelMessage.MessageExchangeId -> "123")) val message = CamelMessage("fail", Map(CamelMessage.MessageExchangeId -> "123"))
val future = producer.ask(message)(timeoutDuration) val future = producer.ask(message)(timeoutDuration).failed
Await.result(future, timeoutDuration) match { Await.ready(future, timeoutDuration).value match {
case failure: Failure case Some(Right(e: AkkaCamelException))
then("a failure response must have been returned by the forward target") then("a failure response must have been returned by the forward target")
val expectedFailureText = failure.cause.getMessage e.getMessage must be("failure")
val expectedHeaders = failure.headers e.headers must be(Map(CamelMessage.MessageExchangeId -> "123", "test" -> "failure"))
assert(expectedFailureText === "failure")
assert(expectedHeaders === Map(CamelMessage.MessageExchangeId -> "123", "test" -> "failure"))
case unexpected fail("Actor responded with unexpected message:" + unexpected) case unexpected fail("Actor responded with unexpected message:" + unexpected)
} }
} }
@ -211,7 +206,7 @@ class ProducerFeatureTest extends WordSpec with BeforeAndAfterAll with BeforeAnd
when("a test message causing an exception is sent to the producer with !") when("a test message causing an exception is sent to the producer with !")
mockEndpoint.expectedMessageCount(1) mockEndpoint.expectedMessageCount(1)
mockEndpoint.message(0).body().isInstanceOf(classOf[Failure]) mockEndpoint.message(0).body().isInstanceOf(classOf[akka.actor.Status.Failure])
producer.tell(CamelMessage("fail", Map()), producer) producer.tell(CamelMessage("fail", Map()), producer)
then("a failure response must have been produced by the forward target") then("a failure response must have been produced by the forward target")
@ -232,7 +227,7 @@ class ProducerFeatureTest extends WordSpec with BeforeAndAfterAll with BeforeAnd
Await.result(future, timeoutDuration) match { Await.result(future, timeoutDuration) match {
case message: CamelMessage case message: CamelMessage
val expected = CamelMessage("received test", Map(CamelMessage.MessageExchangeId -> "123", "test" -> "result")) val expected = CamelMessage("received test", Map(CamelMessage.MessageExchangeId -> "123", "test" -> "result"))
assert(message === expected) message must be(expected)
case unexpected fail("Actor responded with unexpected message:" + unexpected) case unexpected fail("Actor responded with unexpected message:" + unexpected)
} }
} }
@ -244,14 +239,12 @@ class ProducerFeatureTest extends WordSpec with BeforeAndAfterAll with BeforeAnd
when("a test message causing an exception is sent to the producer with ask") when("a test message causing an exception is sent to the producer with ask")
val message = CamelMessage("fail", Map(CamelMessage.MessageExchangeId -> "123")) val message = CamelMessage("fail", Map(CamelMessage.MessageExchangeId -> "123"))
val future = producer.ask(message)(timeoutDuration) val future = producer.ask(message)(timeoutDuration).failed
Await.result(future, timeoutDuration) match { Await.ready(future, timeoutDuration).value match {
case failure: Failure case Some(Right(e: AkkaCamelException))
then("a failure response must have been returned by the forward target") then("a failure response must have been returned by the forward target")
val expectedFailureText = failure.cause.getMessage e.getMessage must be("failure")
val expectedHeaders = failure.headers e.headers must be(Map(CamelMessage.MessageExchangeId -> "123", "test" -> "failure"))
assert(expectedFailureText === "failure")
assert(expectedHeaders === Map(CamelMessage.MessageExchangeId -> "123", "test" -> "failure"))
case unexpected fail("Actor responded with unexpected message:" + unexpected) case unexpected fail("Actor responded with unexpected message:" + unexpected)
} }
} }
@ -276,7 +269,7 @@ class ProducerFeatureTest extends WordSpec with BeforeAndAfterAll with BeforeAnd
when("a test message causing an exception is sent to the producer with !") when("a test message causing an exception is sent to the producer with !")
mockEndpoint.expectedMessageCount(1) mockEndpoint.expectedMessageCount(1)
mockEndpoint.message(0).body().isInstanceOf(classOf[Failure]) mockEndpoint.message(0).body().isInstanceOf(classOf[akka.actor.Status.Failure])
producer.tell(CamelMessage("fail", Map()), producer) producer.tell(CamelMessage("fail", Map()), producer)
then("a failure response must have been produced by the forward target") then("a failure response must have been produced by the forward target")
@ -303,13 +296,15 @@ object ProducerFeatureTest {
class TestForwarder(uri: String, target: ActorRef) extends Actor with Producer { class TestForwarder(uri: String, target: ActorRef) extends Actor with Producer {
def endpointUri = uri def endpointUri = uri
override protected def routeResponse(msg: Any): Unit = target forward msg override def headersToCopy = Set(CamelMessage.MessageExchangeId, "test")
override def routeResponse(msg: Any): Unit = target forward msg
} }
class TestResponder extends Actor { class TestResponder extends Actor {
protected def receive = { protected def receive = {
case msg: CamelMessage msg.body match { case msg: CamelMessage msg.body match {
case "fail" context.sender ! (Failure(new Exception("failure"), msg.headers)) case "fail" context.sender ! akka.actor.Status.Failure(new AkkaCamelException(new Exception("failure"), msg.headers))
case _ case _
context.sender ! (msg.mapBody { context.sender ! (msg.mapBody {
body: String "received %s" format body body: String "received %s" format body
@ -322,8 +317,10 @@ object ProducerFeatureTest {
protected def receive = { protected def receive = {
case msg: CamelMessage case msg: CamelMessage
context.sender ! (msg.addHeader("test" -> "result")) context.sender ! (msg.addHeader("test" -> "result"))
case msg: Failure case msg: akka.actor.Status.Failure
context.sender ! (Failure(msg.cause, msg.headers + ("test" -> "failure"))) msg.cause match {
case e: AkkaCamelException context.sender ! Status.Failure(new AkkaCamelException(e, e.headers + ("test" -> "failure")))
}
} }
} }

View file

@ -14,8 +14,9 @@ import akka.pattern._
import akka.dispatch.Await import akka.dispatch.Await
import akka.util.duration._ import akka.util.duration._
import org.scalatest._ import org.scalatest._
import matchers.MustMatchers
class UntypedProducerTest extends WordSpec with BeforeAndAfterAll with BeforeAndAfterEach with SharedCamelSystem with GivenWhenThen { class UntypedProducerTest extends WordSpec with MustMatchers with BeforeAndAfterAll with BeforeAndAfterEach with SharedCamelSystem with GivenWhenThen {
import UntypedProducerTest._ import UntypedProducerTest._
val timeout = 1 second val timeout = 1 second
override protected def beforeAll = { override protected def beforeAll = {
@ -38,7 +39,7 @@ class UntypedProducerTest extends WordSpec with BeforeAndAfterAll with BeforeAnd
then("a normal response should have been returned by the producer") then("a normal response should have been returned by the producer")
val expected = CamelMessage("received test", Map(CamelMessage.MessageExchangeId -> "123")) val expected = CamelMessage("received test", Map(CamelMessage.MessageExchangeId -> "123"))
Await.result(future, timeout) match { Await.result(future, timeout) match {
case result: CamelMessage assert(result === expected) case result: CamelMessage result must be(expected)
case unexpected fail("Actor responded with unexpected message:" + unexpected) case unexpected fail("Actor responded with unexpected message:" + unexpected)
} }
@ -50,19 +51,15 @@ class UntypedProducerTest extends WordSpec with BeforeAndAfterAll with BeforeAnd
when("a test message causing an exception is sent to the producer with ask") when("a test message causing an exception is sent to the producer with ask")
val message = CamelMessage("fail", Map(CamelMessage.MessageExchangeId -> "123")) val message = CamelMessage("fail", Map(CamelMessage.MessageExchangeId -> "123"))
val future = producer.ask(message)(timeout) val future = producer.ask(message)(timeout).failed
then("a failure response should have been returned by the producer") then("a failure response should have been returned by the producer")
Await.result(future, timeout) match { Await.ready(future, timeout).value match {
case result: Failure { case Some(Right(e: AkkaCamelException))
val expectedFailureText = result.cause.getMessage e.getMessage must be("failure")
val expectedHeaders = result.headers e.headers must be(Map(CamelMessage.MessageExchangeId -> "123"))
assert(expectedFailureText === "failure")
assert(expectedHeaders === Map(CamelMessage.MessageExchangeId -> "123"))
}
case unexpected fail("Actor responded with unexpected message:" + unexpected) case unexpected fail("Actor responded with unexpected message:" + unexpected)
} }
} }
} }
"An UntypedProducer producing a message to a sync Camel route and then forwarding the response" must { "An UntypedProducer producing a message to a sync Camel route and then forwarding the response" must {
@ -73,7 +70,7 @@ class UntypedProducerTest extends WordSpec with BeforeAndAfterAll with BeforeAnd
when("a test message is sent to the producer with !") when("a test message is sent to the producer with !")
mockEndpoint.expectedBodiesReceived("received test") mockEndpoint.expectedBodiesReceived("received test")
val result = producer.tell(CamelMessage("test", Map[String, Any]()), producer) producer.tell(CamelMessage("test", Map[String, Any]()), producer)
then("a normal response should have been sent") then("a normal response should have been sent")
mockEndpoint.assertIsSatisfied mockEndpoint.assertIsSatisfied

View file

@ -21,6 +21,7 @@ import akka.camel.TestSupport._
import java.util.concurrent.{ TimeoutException, CountDownLatch, TimeUnit } import java.util.concurrent.{ TimeoutException, CountDownLatch, TimeUnit }
import org.mockito.{ ArgumentMatcher, Matchers, Mockito } import org.mockito.{ ArgumentMatcher, Matchers, Mockito }
import org.scalatest.matchers.MustMatchers import org.scalatest.matchers.MustMatchers
import akka.actor.Status.Failure
class ActorProducerTest extends TestKit(ActorSystem("test")) with WordSpec with MustMatchers with ActorProducerFixture { class ActorProducerTest extends TestKit(ActorSystem("test")) with WordSpec with MustMatchers with ActorProducerFixture {
@ -33,7 +34,7 @@ class ActorProducerTest extends TestKit(ActorSystem("test")) with WordSpec with
producer = given(actor = null) producer = given(actor = null)
producer.processExchangeAdapter(exchange) producer.processExchangeAdapter(exchange)
verify(exchange).setFailure(any[Failure]) verify(exchange).setFailure(any[FailureResult])
} }
} }
@ -82,7 +83,7 @@ class ActorProducerTest extends TestKit(ActorSystem("test")) with WordSpec with
"set failure message to timeout" in { "set failure message to timeout" in {
process() process()
verify(exchange).setFailure(any[Failure]) verify(exchange).setFailure(any[FailureResult])
} }
} }
@ -97,7 +98,7 @@ class ActorProducerTest extends TestKit(ActorSystem("test")) with WordSpec with
def verifyFailureIsSet { def verifyFailureIsSet {
producer.processExchangeAdapter(exchange, asyncCallback) producer.processExchangeAdapter(exchange, asyncCallback)
asyncCallback.awaitCalled() asyncCallback.awaitCalled()
verify(exchange).setFailure(any[Failure]) verify(exchange).setFailure(any[FailureResult])
} }
"out-capable" when { "out-capable" when {
@ -130,7 +131,8 @@ class ActorProducerTest extends TestKit(ActorSystem("test")) with WordSpec with
"response is Failure" must { "response is Failure" must {
"set an exception on exchange" in { "set an exception on exchange" in {
val failure = Failure(new RuntimeException("some failure")) val exception = new RuntimeException("some failure")
val failure = Failure(exception)
producer = given(outCapable = true) producer = given(outCapable = true)
@ -142,7 +144,7 @@ class ActorProducerTest extends TestKit(ActorSystem("test")) with WordSpec with
asyncCallback.awaitCalled(remaining) asyncCallback.awaitCalled(remaining)
} }
verify(exchange).setFailure(failure) verify(exchange).setFailure(FailureResult(exception))
} }
} }
@ -151,8 +153,8 @@ class ActorProducerTest extends TestKit(ActorSystem("test")) with WordSpec with
producer = given(outCapable = true, replyTimeout = 10 millis) producer = given(outCapable = true, replyTimeout = 10 millis)
producer.processExchangeAdapter(exchange, asyncCallback) producer.processExchangeAdapter(exchange, asyncCallback)
asyncCallback.awaitCalled(100 millis) asyncCallback.awaitCalled(100 millis)
verify(exchange).setFailure(Matchers.argThat(new ArgumentMatcher[Failure] { verify(exchange).setFailure(Matchers.argThat(new ArgumentMatcher[FailureResult] {
def matches(failure: AnyRef) = { failure.asInstanceOf[Failure].getCause must be(anInstanceOf[TimeoutException]); true } def matches(failure: AnyRef) = { failure.asInstanceOf[FailureResult].cause must be(anInstanceOf[TimeoutException]); true }
})) }))
} }
@ -213,7 +215,7 @@ class ActorProducerTest extends TestKit(ActorSystem("test")) with WordSpec with
asyncCallback.expectDoneAsyncWithin(remaining); info("async callback called") asyncCallback.expectDoneAsyncWithin(remaining); info("async callback called")
} }
verify(exchange, never()).setResponse(any[CamelMessage]); info("no response forwarded to exchange") verify(exchange, never()).setResponse(any[CamelMessage]); info("no response forwarded to exchange")
verify(exchange).setFailure(any[Failure]); info("failure set") verify(exchange).setFailure(any[FailureResult]); info("failure set")
} }
} }
@ -224,7 +226,7 @@ class ActorProducerTest extends TestKit(ActorSystem("test")) with WordSpec with
producer.processExchangeAdapter(exchange, asyncCallback) producer.processExchangeAdapter(exchange, asyncCallback)
asyncCallback.awaitCalled(100 millis) asyncCallback.awaitCalled(100 millis)
verify(exchange).setFailure(any[Failure]) verify(exchange).setFailure(any[FailureResult])
} }
} }
@ -242,7 +244,7 @@ class ActorProducerTest extends TestKit(ActorSystem("test")) with WordSpec with
asyncCallback.awaitCalled(remaining); asyncCallback.awaitCalled(remaining);
} }
verify(exchange, never()).setResponse(any[CamelMessage]); info("no response forwarded to exchange") verify(exchange, never()).setResponse(any[CamelMessage]); info("no response forwarded to exchange")
verify(exchange).setFailure(any[Failure]); info("failure set") verify(exchange).setFailure(any[FailureResult]); info("failure set")
} }
} }
} }