From 2a49a6cefacded469c252020b92d25d60698e91e Mon Sep 17 00:00:00 2001 From: Martin Krasser Date: Mon, 8 Mar 2010 16:38:23 +0100 Subject: [PATCH] error handling enhancements --- akka-camel/src/main/scala/Message.scala | 8 +++++++- .../main/scala/component/ActorComponent.scala | 18 ++++++++++-------- 2 files changed, 17 insertions(+), 9 deletions(-) diff --git a/akka-camel/src/main/scala/Message.scala b/akka-camel/src/main/scala/Message.scala index db23868fac..b145ff10cd 100644 --- a/akka-camel/src/main/scala/Message.scala +++ b/akka-camel/src/main/scala/Message.scala @@ -117,7 +117,7 @@ object Message { * * @author Martin Krasser */ -case class Failure(val cause: Throwable, val headers: Map[String, Any]) +case class Failure(val cause: Exception, val headers: Map[String, Any]) /** * Adapter for converting an org.apache.camel.Exchange to and from Message and Failure objects. @@ -140,6 +140,12 @@ class CamelExchangeAdapter(exchange: Exchange) { */ def fromResponseMessage(msg: Message): Exchange = { responseMessage.fromMessage(msg); exchange } + /** + * Sets Exchange.getException from the given Failure message. Headers of the Failure message + * are ignored. + */ + def fromFailureMessage(msg: Failure): Exchange = { exchange.setException(msg.cause); exchange } + /** * Creates a Message object from Exchange.getIn. */ diff --git a/akka-camel/src/main/scala/component/ActorComponent.scala b/akka-camel/src/main/scala/component/ActorComponent.scala index 71db14021a..2fa116926c 100644 --- a/akka-camel/src/main/scala/component/ActorComponent.scala +++ b/akka-camel/src/main/scala/component/ActorComponent.scala @@ -6,12 +6,13 @@ package se.scalablesolutions.akka.camel.component import java.lang.{RuntimeException, String} import java.util.{Map => JavaMap} +import java.util.concurrent.TimeoutException import org.apache.camel.{Exchange, Consumer, Processor} import org.apache.camel.impl.{DefaultProducer, DefaultEndpoint, DefaultComponent} import se.scalablesolutions.akka.actor.{ActorRegistry, Actor} -import se.scalablesolutions.akka.camel.{CamelMessageConversion, Message} +import se.scalablesolutions.akka.camel.{Failure, CamelMessageConversion, Message} /** * Camel component for sending messages to and receiving replies from actors. @@ -52,6 +53,7 @@ class ActorComponent extends DefaultComponent { * @author Martin Krasser */ class ActorEndpoint(uri: String, comp: ActorComponent, val id: Option[String], val uuid: Option[String]) extends DefaultEndpoint(uri, comp) { + /** * @throws UnsupportedOperationException */ @@ -113,15 +115,15 @@ class ActorProducer(val ep: ActorEndpoint) extends DefaultProducer(ep) { * actor is of type se.scalablesolutions.akka.camel.Message. */ protected def processInOut(exchange: Exchange, actor: Actor) { - - // TODO: support asynchronous communication - val result: Any = actor !! exchange.toRequestMessage(Map(Message.MessageExchangeId -> exchange.getExchangeId)) + val header = Map(Message.MessageExchangeId -> exchange.getExchangeId) + val result: Any = actor !! exchange.toRequestMessage(header) result match { - case Some(msg) => exchange.fromResponseMessage(Message.canonicalize(msg)) - case None => { - // TODO: handle timeout properly - // TODO: make timeout configurable + case Some(msg: Failure) => exchange.fromFailureMessage(msg) + case Some(msg) => exchange.fromResponseMessage(Message.canonicalize(msg)) + case None => { + throw new TimeoutException("communication with %s timed out after %d ms" + format (ep.getEndpointUri, actor.timeout)) } } }