error handling enhancements

This commit is contained in:
Martin Krasser 2010-03-08 16:38:23 +01:00
parent 48ef898336
commit 2a49a6cefa
2 changed files with 17 additions and 9 deletions

View file

@ -117,7 +117,7 @@ object Message {
*
* @author Martin Krasser
*/
case class Failure(val cause: Throwable, val headers: Map[String, Any])
case class Failure(val cause: Exception, val headers: Map[String, Any])
/**
* Adapter for converting an org.apache.camel.Exchange to and from Message and Failure objects.
@ -140,6 +140,12 @@ class CamelExchangeAdapter(exchange: Exchange) {
*/
def fromResponseMessage(msg: Message): Exchange = { responseMessage.fromMessage(msg); exchange }
/**
* Sets Exchange.getException from the given Failure message. Headers of the Failure message
* are ignored.
*/
def fromFailureMessage(msg: Failure): Exchange = { exchange.setException(msg.cause); exchange }
/**
* Creates a Message object from Exchange.getIn.
*/

View file

@ -6,12 +6,13 @@ package se.scalablesolutions.akka.camel.component
import java.lang.{RuntimeException, String}
import java.util.{Map => JavaMap}
import java.util.concurrent.TimeoutException
import org.apache.camel.{Exchange, Consumer, Processor}
import org.apache.camel.impl.{DefaultProducer, DefaultEndpoint, DefaultComponent}
import se.scalablesolutions.akka.actor.{ActorRegistry, Actor}
import se.scalablesolutions.akka.camel.{CamelMessageConversion, Message}
import se.scalablesolutions.akka.camel.{Failure, CamelMessageConversion, Message}
/**
* Camel component for sending messages to and receiving replies from actors.
@ -52,6 +53,7 @@ class ActorComponent extends DefaultComponent {
* @author Martin Krasser
*/
class ActorEndpoint(uri: String, comp: ActorComponent, val id: Option[String], val uuid: Option[String]) extends DefaultEndpoint(uri, comp) {
/**
* @throws UnsupportedOperationException
*/
@ -113,15 +115,15 @@ class ActorProducer(val ep: ActorEndpoint) extends DefaultProducer(ep) {
* actor is of type se.scalablesolutions.akka.camel.Message.
*/
protected def processInOut(exchange: Exchange, actor: Actor) {
// TODO: support asynchronous communication
val result: Any = actor !! exchange.toRequestMessage(Map(Message.MessageExchangeId -> exchange.getExchangeId))
val header = Map(Message.MessageExchangeId -> exchange.getExchangeId)
val result: Any = actor !! exchange.toRequestMessage(header)
result match {
case Some(msg) => exchange.fromResponseMessage(Message.canonicalize(msg))
case None => {
// TODO: handle timeout properly
// TODO: make timeout configurable
case Some(msg: Failure) => exchange.fromFailureMessage(msg)
case Some(msg) => exchange.fromResponseMessage(Message.canonicalize(msg))
case None => {
throw new TimeoutException("communication with %s timed out after %d ms"
format (ep.getEndpointUri, actor.timeout))
}
}
}