diff --git a/akka-camel/src/main/scala/akka/camel/CamelMessage.scala b/akka-camel/src/main/scala/akka/camel/CamelMessage.scala
index 0c1a77bb58..75bc85bb23 100644
--- a/akka-camel/src/main/scala/akka/camel/CamelMessage.scala
+++ b/akka-camel/src/main/scala/akka/camel/CamelMessage.scala
@@ -238,44 +238,6 @@ case object Ack {
def ack = this
}
-/**
- * An immutable representation of a failed Camel exchange. It contains the failure cause
- * obtained from Exchange.getException and the headers from either the Exchange.getIn
- * message or Exchange.getOut message, depending on the exchange pattern.
- *
- * @author Martin Krasser
- */
-case class Failure(val cause: Throwable, val headers: Map[String, Any] = Map.empty) {
-
- /**
- * Creates a Failure with cause body and empty headers map.
- */
- def this(cause: Throwable) = this(cause, Map.empty[String, Any])
-
- /**
- * Creates a Failure with given cause and headers map. A copy of the headers map is made.
- *
- * Java API
- */
- def this(cause: Throwable, headers: JMap[String, Any]) = this(cause, headers.toMap)
-
- /**
- * Returns the cause of this Failure.
- *
- * Java API.
- */
- def getCause = cause
-
- /**
- * Returns all headers from this failure message. The returned headers map is backed up by
- * this message's immutable headers map. Any attempt to modify the returned map will throw
- * an exception.
- *
- * Java API
- */
- def getHeaders: JMap[String, Any] = headers
-}
-
/**
* An exception indicating that the exchange to the camel endpoint failed.
* It contains the failure cause obtained from Exchange.getException and the headers from either the Exchange.getIn
diff --git a/akka-camel/src/main/scala/akka/camel/Producer.scala b/akka-camel/src/main/scala/akka/camel/Producer.scala
index ca16861b1a..80537fda12 100644
--- a/akka-camel/src/main/scala/akka/camel/Producer.scala
+++ b/akka-camel/src/main/scala/akka/camel/Producer.scala
@@ -7,6 +7,7 @@ package akka.camel
import akka.actor.Actor
import internal.CamelExchangeAdapter
import org.apache.camel.{ Exchange, ExchangePattern, AsyncCallback }
+import akka.actor.Status.Failure
/**
* Support trait for producing messages to Camel endpoints.
@@ -75,7 +76,7 @@ trait ProducerSupport { this: Actor ⇒
val originalSender = sender
// Ignoring doneSync, sending back async uniformly.
def done(doneSync: Boolean): Unit = producer.tell(
- if (exchange.isFailed) FailureResult(exchange.toFailureMessage(cmsg.headers(headersToCopy)))
+ if (exchange.isFailed) exchange.toFailureResult(cmsg.headers(headersToCopy))
else MessageResult(exchange.toResponseMessage(cmsg.headers(headersToCopy))), originalSender)
})
}
@@ -90,8 +91,9 @@ trait ProducerSupport { this: Actor ⇒
protected def produce: Receive = {
case res: MessageResult ⇒ routeResponse(res.message)
case res: FailureResult ⇒
- routeResponse(res.failure)
- throw new AkkaCamelException(res.failure.cause, res.failure.headers)
+ val e = new AkkaCamelException(res.cause, res.headers)
+ routeResponse(Failure(e))
+ throw e
case msg ⇒
val exchangePattern = if (oneway) ExchangePattern.InOnly else ExchangePattern.InOut
produce(transformOutgoingMessage(msg), exchangePattern)
@@ -143,7 +145,7 @@ private case class MessageResult(message: CamelMessage)
/**
* @author Martin Krasser
*/
-private case class FailureResult(failure: Failure)
+private case class FailureResult(cause: Throwable, headers: Map[String, Any] = Map.empty)
/**
* A one-way producer.
diff --git a/akka-camel/src/main/scala/akka/camel/internal/CamelExchangeAdapter.scala b/akka-camel/src/main/scala/akka/camel/internal/CamelExchangeAdapter.scala
index ae409e3407..1f2d80e6df 100644
--- a/akka-camel/src/main/scala/akka/camel/internal/CamelExchangeAdapter.scala
+++ b/akka-camel/src/main/scala/akka/camel/internal/CamelExchangeAdapter.scala
@@ -5,7 +5,7 @@ import scala.collection.JavaConversions._
import org.apache.camel.util.ExchangeHelper
import org.apache.camel.{ Exchange, Message ⇒ JCamelMessage }
-import akka.camel.{ Failure, AkkaCamelException, CamelMessage }
+import akka.camel.{ FailureResult, AkkaCamelException, CamelMessage }
/**
* For internal use only.
@@ -40,10 +40,10 @@ private[camel] class CamelExchangeAdapter(exchange: Exchange) {
def setResponse(msg: CamelMessage) { msg.copyContentTo(response) }
/**
- * Sets Exchange.getException from the given Failure message. Headers of the Failure message
+ * Sets Exchange.getException from the given FailureResult message. Headers of the FailureResult message
* are ignored.
*/
- def setFailure(msg: Failure) { exchange.setException(msg.cause) }
+ def setFailure(msg: FailureResult) { exchange.setException(msg.cause) }
/**
* Creates an immutable CamelMessage object from Exchange.getIn so it can be used with Actors.
@@ -87,21 +87,21 @@ private[camel] class CamelExchangeAdapter(exchange: Exchange) {
new AkkaCamelException(exchange.getException, headers ++ response.getHeaders)
/**
- * Creates an immutable Failure object from the adapted Exchange so it can be used with Actors.
+ * Creates an immutable Failure object from the adapted Exchange so it can be used internally between Actors.
*
* @see Failure
*/
- def toFailureMessage: Failure = toFailureMessage(Map.empty)
+ def toFailureMessage: FailureResult = toFailureResult(Map.empty)
/**
- * Creates an immutable Failure object from the adapted Exchange so it can be used with Actors.
+ * Creates an immutable FailureResult object from the adapted Exchange so it can be used internally between Actors.
*
* @param headers additional headers to set on the created CamelMessage in addition to those
* in the Camel message.
*
* @see Failure
*/
- def toFailureMessage(headers: Map[String, Any]): Failure = Failure(exchange.getException, headers ++ response.getHeaders)
+ def toFailureResult(headers: Map[String, Any]): FailureResult = FailureResult(exchange.getException, headers ++ response.getHeaders)
/**
* Creates an immutable CamelMessage object from Exchange.getIn so it can be used with Actors.
diff --git a/akka-camel/src/main/scala/akka/camel/internal/component/ActorComponent.scala b/akka-camel/src/main/scala/akka/camel/internal/component/ActorComponent.scala
index bf284c37c8..e637914a2a 100644
--- a/akka-camel/src/main/scala/akka/camel/internal/component/ActorComponent.scala
+++ b/akka-camel/src/main/scala/akka/camel/internal/component/ActorComponent.scala
@@ -17,7 +17,7 @@ import akka.util.duration._
import java.util.concurrent.{ TimeoutException, CountDownLatch }
import akka.camel.internal.CamelExchangeAdapter
import akka.util.{ NonFatal, Duration, Timeout }
-import akka.camel.{ ActorNotRegisteredException, DefaultConsumerConfig, ConsumerConfig, Camel, Ack, Failure ⇒ CamelFailure, CamelMessage }
+import akka.camel.{ ActorNotRegisteredException, DefaultConsumerConfig, ConsumerConfig, Camel, Ack, FailureResult, CamelMessage }
/**
* For internal use only.
@@ -170,18 +170,18 @@ private[camel] class ActorProducer(val endpoint: ActorEndpoint, camel: Camel) ex
}
private def forwardResponseTo(exchange: CamelExchangeAdapter): PartialFunction[Either[Throwable, Any], Unit] = {
- case Right(failure: CamelFailure) ⇒ exchange.setFailure(failure)
+ case Right(failure: FailureResult) ⇒ exchange.setFailure(failure)
case Right(msg) ⇒ exchange.setResponse(CamelMessage.canonicalize(msg))
- case Left(e: TimeoutException) ⇒ exchange.setFailure(CamelFailure(new TimeoutException("Failed to get response from the actor [%s] within timeout [%s]. Check replyTimeout and blocking settings [%s]" format (endpoint.path, endpoint.replyTimeout, endpoint))))
- case Left(throwable) ⇒ exchange.setFailure(CamelFailure(throwable))
+ case Left(e: TimeoutException) ⇒ exchange.setFailure(FailureResult(new TimeoutException("Failed to get response from the actor [%s] within timeout [%s]. Check replyTimeout and blocking settings [%s]" format (endpoint.path, endpoint.replyTimeout, endpoint))))
+ case Left(throwable) ⇒ exchange.setFailure(FailureResult(throwable))
}
private def forwardAckTo(exchange: CamelExchangeAdapter): PartialFunction[Either[Throwable, Any], Unit] = {
case Right(Ack) ⇒ { /* no response message to set */ }
- case Right(failure: CamelFailure) ⇒ exchange.setFailure(failure)
- case Right(msg) ⇒ exchange.setFailure(CamelFailure(new IllegalArgumentException("Expected Ack or Failure message, but got: [%s] from actor [%s]" format (msg, endpoint.path))))
- case Left(e: TimeoutException) ⇒ exchange.setFailure(CamelFailure(new TimeoutException("Failed to get Ack or Failure response from the actor [%s] within timeout [%s]. Check replyTimeout and blocking settings [%s]" format (endpoint.path, endpoint.replyTimeout, endpoint))))
- case Left(throwable) ⇒ exchange.setFailure(CamelFailure(throwable))
+ case Right(failure: FailureResult) ⇒ exchange.setFailure(failure)
+ case Right(msg) ⇒ exchange.setFailure(FailureResult(new IllegalArgumentException("Expected Ack or Failure message, but got: [%s] from actor [%s]" format (msg, endpoint.path))))
+ case Left(e: TimeoutException) ⇒ exchange.setFailure(FailureResult(new TimeoutException("Failed to get Ack or Failure response from the actor [%s] within timeout [%s]. Check replyTimeout and blocking settings [%s]" format (endpoint.path, endpoint.replyTimeout, endpoint))))
+ case Left(throwable) ⇒ exchange.setFailure(FailureResult(throwable))
}
private def sendAsync(message: CamelMessage, onComplete: PartialFunction[Either[Throwable, Any], Unit]): Boolean = {
@@ -199,7 +199,7 @@ private[camel] class ActorProducer(val endpoint: ActorEndpoint, camel: Camel) ex
try {
actorFor(endpoint.path) ! message
} catch {
- case e ⇒ exchange.setFailure(new CamelFailure(e))
+ case e ⇒ exchange.setFailure(new FailureResult(e))
}
}
diff --git a/akka-camel/src/test/java/akka/camel/SampleErrorHandlingConsumer.java b/akka-camel/src/test/java/akka/camel/SampleErrorHandlingConsumer.java
index 0622a9a51a..5bde5f8976 100644
--- a/akka-camel/src/test/java/akka/camel/SampleErrorHandlingConsumer.java
+++ b/akka-camel/src/test/java/akka/camel/SampleErrorHandlingConsumer.java
@@ -4,6 +4,7 @@
package akka.camel;
+import akka.actor.Status;
import akka.camel.javaapi.UntypedConsumerActor;
import akka.util.Duration;
import org.apache.camel.builder.Builder;
@@ -41,7 +42,7 @@ public class SampleErrorHandlingConsumer extends UntypedConsumerActor {
@Override
public void preRestart(Throwable reason, Option