Preparing Akka Camel for bin compat

This commit is contained in:
Viktor Klang 2012-05-23 15:17:49 +02:00
parent 5bafa2d3a0
commit b45cec3da4
18 changed files with 109 additions and 125 deletions

View file

@ -214,7 +214,7 @@ private[akka] object MessageDispatcher {
// dispatcher debugging helper using println (see below) // dispatcher debugging helper using println (see below)
// since this is a compile-time constant, scalac will elide code behind if (MessageDispatcher.debug) (RK checked with 2.9.1) // since this is a compile-time constant, scalac will elide code behind if (MessageDispatcher.debug) (RK checked with 2.9.1)
final val debug = false final val debug = false // Deliberately without type ascription to make it a compile-time constant
lazy val actors = new Index[MessageDispatcher, ActorRef](16, _ compareTo _) lazy val actors = new Index[MessageDispatcher, ActorRef](16, _ compareTo _)
def printActors: Unit = if (debug) { def printActors: Unit = if (debug) {
for { for {

View file

@ -31,15 +31,15 @@ private[akka] object Mailbox {
*/ */
// primary status: only first three // primary status: only first three
final val Open = 0 // _status is not initialized in AbstractMailbox, so default must be zero! final val Open = 0 // _status is not initialized in AbstractMailbox, so default must be zero! Deliberately without type ascription to make it a compile-time constant
final val Suspended = 1 final val Suspended = 1 // Deliberately without type ascription to make it a compile-time constant
final val Closed = 2 final val Closed = 2 // Deliberately without type ascription to make it a compile-time constant
// secondary status: Scheduled bit may be added to Open/Suspended // secondary status: Scheduled bit may be added to Open/Suspended
final val Scheduled = 4 final val Scheduled = 4 // Deliberately without type ascription to make it a compile-time constant
// mailbox debugging helper using println (see below) // mailbox debugging helper using println (see below)
// since this is a compile-time constant, scalac will elide code behind if (Mailbox.debug) (RK checked with 2.9.1) // since this is a compile-time constant, scalac will elide code behind if (Mailbox.debug) (RK checked with 2.9.1)
final val debug = false final val debug = false // Deliberately without type ascription to make it a compile-time constant
} }
/** /**

View file

@ -18,9 +18,9 @@ import akka.pattern._
trait Activation { trait Activation {
import akka.dispatch.Await import akka.dispatch.Await
def system: ActorSystem def system: ActorSystem //FIXME Why is this here, what's it needed for and who should use it?
private val activationTracker = system.actorOf(Props[ActivationTracker], "camelActivationTracker") private val activationTracker = system.actorOf(Props[ActivationTracker], "camelActivationTracker") //FIXME Why is this also top level?
/** /**
* Awaits for endpoint to be activated. It blocks until the endpoint is registered in camel context or timeout expires. * Awaits for endpoint to be activated. It blocks until the endpoint is registered in camel context or timeout expires.
@ -29,13 +29,10 @@ trait Activation {
* @throws akka.camel.ActivationTimeoutException if endpoint is not activated within timeout. * @throws akka.camel.ActivationTimeoutException if endpoint is not activated within timeout.
* @return the activated ActorRef * @return the activated ActorRef
*/ */
def awaitActivation(endpoint: ActorRef, timeout: Duration): ActorRef = { def awaitActivation(endpoint: ActorRef, timeout: Duration): ActorRef =
try { try Await.result(activationFutureFor(endpoint, timeout), timeout) catch {
Await.result(activationFutureFor(endpoint, timeout), timeout)
} catch {
case e: TimeoutException throw new ActivationTimeoutException(endpoint, timeout) case e: TimeoutException throw new ActivationTimeoutException(endpoint, timeout)
} }
}
/** /**
* Awaits for endpoint to be de-activated. It is blocking until endpoint is unregistered in camel context or timeout expires. * Awaits for endpoint to be de-activated. It is blocking until endpoint is unregistered in camel context or timeout expires.
@ -43,37 +40,32 @@ trait Activation {
* @param timeout the timeout for the wait * @param timeout the timeout for the wait
* @throws akka.camel.DeActivationTimeoutException if endpoint is not de-activated within timeout. * @throws akka.camel.DeActivationTimeoutException if endpoint is not de-activated within timeout.
*/ */
def awaitDeactivation(endpoint: ActorRef, timeout: Duration) { def awaitDeactivation(endpoint: ActorRef, timeout: Duration): Unit =
try { try Await.result(deactivationFutureFor(endpoint, timeout), timeout) catch {
Await.result(deactivationFutureFor(endpoint, timeout), timeout)
} catch {
case e: TimeoutException throw new DeActivationTimeoutException(endpoint, timeout) case e: TimeoutException throw new DeActivationTimeoutException(endpoint, timeout)
} }
}
/** /**
* Similar to `awaitActivation` but returns a future instead. * Similar to `awaitActivation` but returns a future instead.
* @param endpoint the endpoint to be activated * @param endpoint the endpoint to be activated
* @param timeout the timeout for the Future * @param timeout the timeout for the Future
*/ */
def activationFutureFor(endpoint: ActorRef, timeout: Duration): Future[ActorRef] = { def activationFutureFor(endpoint: ActorRef, timeout: Duration): Future[ActorRef] =
(activationTracker.ask(AwaitActivation(endpoint))(Timeout(timeout))).map[ActorRef] { (activationTracker.ask(AwaitActivation(endpoint))(Timeout(timeout))).map[ActorRef] {
case EndpointActivated(_) endpoint case EndpointActivated(_) endpoint
case EndpointFailedToActivate(_, cause) throw cause case EndpointFailedToActivate(_, cause) throw cause
} }
}
/** /**
* Similar to awaitDeactivation but returns a future instead. * Similar to awaitDeactivation but returns a future instead.
* @param endpoint the endpoint to be deactivated * @param endpoint the endpoint to be deactivated
* @param timeout the timeout of the Future * @param timeout the timeout of the Future
*/ */
def deactivationFutureFor(endpoint: ActorRef, timeout: Duration): Future[Unit] = { def deactivationFutureFor(endpoint: ActorRef, timeout: Duration): Future[Unit] =
(activationTracker.ask(AwaitDeActivation(endpoint))(Timeout(timeout))).map[Unit] { (activationTracker.ask(AwaitDeActivation(endpoint))(Timeout(timeout))).map[Unit] {
case EndpointDeActivated(_) () case EndpointDeActivated(_) ()
case EndpointFailedToDeActivate(_, cause) throw cause case EndpointFailedToDeActivate(_, cause) throw cause
} }
}
} }
/** /**
@ -82,7 +74,7 @@ trait Activation {
* @param timeout the timeout * @param timeout the timeout
*/ */
class DeActivationTimeoutException(endpoint: ActorRef, timeout: Duration) extends TimeoutException { class DeActivationTimeoutException(endpoint: ActorRef, timeout: Duration) extends TimeoutException {
override def getMessage = "Timed out after %s, while waiting for de-activation of %s" format (timeout, endpoint.path) override def getMessage: String = "Timed out after %s, while waiting for de-activation of %s" format (timeout, endpoint.path)
} }
/** /**
@ -91,5 +83,5 @@ class DeActivationTimeoutException(endpoint: ActorRef, timeout: Duration) extend
* @param timeout the timeout * @param timeout the timeout
*/ */
class ActivationTimeoutException(endpoint: ActorRef, timeout: Duration) extends TimeoutException { class ActivationTimeoutException(endpoint: ActorRef, timeout: Duration) extends TimeoutException {
override def getMessage = "Timed out after %s, while waiting for activation of %s" format (timeout, endpoint.path) override def getMessage: String = "Timed out after %s, while waiting for activation of %s" format (timeout, endpoint.path)
} }

View file

@ -6,5 +6,5 @@ package akka.camel
* @author Martin Krasser * @author Martin Krasser
*/ */
class ActorNotRegisteredException(uri: String) extends RuntimeException { class ActorNotRegisteredException(uri: String) extends RuntimeException {
override def getMessage = "Actor [%s] doesn't exist" format uri override def getMessage: String = "Actor [%s] doesn't exist" format uri
} }

View file

@ -29,7 +29,8 @@ class ActorRouteDefinition(definition: ProcessorDefinition[_]) {
* @param actorRef the consumer with a default configuration. * @param actorRef the consumer with a default configuration.
* @return the path to the actor, as a camel uri String * @return the path to the actor, as a camel uri String
*/ */
def to(actorRef: ActorRef) = definition.to(ActorEndpointPath(actorRef).toCamelPath()) def to(actorRef: ActorRef) = //FIXME What is the return type of this?
definition.to(ActorEndpointPath(actorRef).toCamelPath())
/** /**
* Sends the message to an ActorRef endpoint * Sends the message to an ActorRef endpoint
@ -37,6 +38,7 @@ class ActorRouteDefinition(definition: ProcessorDefinition[_]) {
* @param consumerConfig the configuration for the consumer * @param consumerConfig the configuration for the consumer
* @return the path to the actor, as a camel uri String * @return the path to the actor, as a camel uri String
*/ */
def to(actorRef: ActorRef, consumerConfig: ConsumerConfig) = definition.to(ActorEndpointPath(actorRef).toCamelPath(consumerConfig)) def to(actorRef: ActorRef, consumerConfig: ConsumerConfig) = //FIXME What is the return type of this?
definition.to(ActorEndpointPath(actorRef).toCamelPath(consumerConfig))
} }

View file

@ -50,13 +50,13 @@ object CamelExtension extends ExtensionId[Camel] with ExtensionIdProvider {
/** /**
* Creates a new instance of Camel and makes sure it gets stopped when the actor system is shutdown. * Creates a new instance of Camel and makes sure it gets stopped when the actor system is shutdown.
*/ */
def createExtension(system: ExtendedActorSystem) = { override def createExtension(system: ExtendedActorSystem): Camel = {
val camel = new DefaultCamel(system).start val camel = new DefaultCamel(system).start
system.registerOnTermination(camel.shutdown()) system.registerOnTermination(camel.shutdown())
camel camel
} }
def lookup(): ExtensionId[Camel] = CamelExtension override def lookup(): ExtensionId[Camel] = CamelExtension
override def get(system: ActorSystem): Camel = super.get(system) override def get(system: ActorSystem): Camel = super.get(system)
} }

View file

@ -21,12 +21,12 @@ case class CamelMessage(body: Any, headers: Map[String, Any]) {
def this(body: Any, headers: JMap[String, Any]) = this(body, headers.toMap) //for Java def this(body: Any, headers: JMap[String, Any]) = this(body, headers.toMap) //for Java
override def toString = "CamelMessage(%s, %s)" format (body, headers) override def toString: String = "CamelMessage(%s, %s)" format (body, headers)
/** /**
* Returns those headers from this message whose name is contained in <code>names</code>. * Returns those headers from this message whose name is contained in <code>names</code>.
*/ */
def headers(names: Set[String]): Map[String, Any] = headers.filterKeys(names contains _) def headers(names: Set[String]): Map[String, Any] = headers filterKeys names
/** /**
* Returns those headers from this message whose name is contained in <code>names</code>. * Returns those headers from this message whose name is contained in <code>names</code>.
@ -75,7 +75,7 @@ case class CamelMessage(body: Any, headers: Map[String, Any]) {
/** /**
* Creates a CamelMessage with a given <code>body</code>. * Creates a CamelMessage with a given <code>body</code>.
*/ */
def withBody(body: Any) = CamelMessage(body, this.headers) def withBody(body: Any): CamelMessage = CamelMessage(body, this.headers)
/** /**
* Creates a new CamelMessage with given <code>headers</code>. * Creates a new CamelMessage with given <code>headers</code>.
@ -119,9 +119,9 @@ case class CamelMessage(body: Any, headers: Map[String, Any]) {
* Creates a new CamelMessage where the header with given <code>headerName</code> is removed from * Creates a new CamelMessage where the header with given <code>headerName</code> is removed from
* the existing headers. * the existing headers.
*/ */
def withoutHeader(headerName: String) = copy(this.body, this.headers - headerName) def withoutHeader(headerName: String): CamelMessage = copy(this.body, this.headers - headerName)
def copyContentTo(to: JCamelMessage) = { def copyContentTo(to: JCamelMessage): Unit = {
to.setBody(this.body) to.setBody(this.body)
for ((name, value) this.headers) to.getHeaders.put(name, value.asInstanceOf[AnyRef]) for ((name, value) this.headers) to.getHeaders.put(name, value.asInstanceOf[AnyRef])
} }
@ -145,8 +145,7 @@ case class CamelMessage(body: Any, headers: Map[String, Any]) {
* Java API * Java API
* *
*/ */
def getBodyAs[T](clazz: Class[T], camelContext: CamelContext): T = def getBodyAs[T](clazz: Class[T], camelContext: CamelContext): T = camelContext.getTypeConverter.mandatoryConvertTo[T](clazz, body)
camelContext.getTypeConverter.mandatoryConvertTo[T](clazz, body)
/** /**
* Creates a CamelMessage with current <code>body</code> converted to type <code>T</code>. * Creates a CamelMessage with current <code>body</code> converted to type <code>T</code>.
@ -184,7 +183,7 @@ case class CamelMessage(body: Any, headers: Map[String, Any]) {
* <p> * <p>
* Java API * Java API
*/ */
def getHeaderAs[T](name: String, clazz: Class[T], camelContext: CamelContext) = headerAs[T](name)(Manifest.classType(clazz), camelContext).get def getHeaderAs[T](name: String, clazz: Class[T], camelContext: CamelContext): T = headerAs[T](name)(Manifest.classType(clazz), camelContext).get
} }
@ -201,7 +200,7 @@ object CamelMessage {
* so that it can be correlated with an asynchronous response. Messages send to Consumer * so that it can be correlated with an asynchronous response. Messages send to Consumer
* actors have this header already set. * actors have this header already set.
*/ */
val MessageExchangeId = "MessageExchangeId".intern val MessageExchangeId = "MessageExchangeId".intern //Deliberately without type ascription to make it a constant
/** /**
* Creates a canonical form of the given message <code>msg</code>. If <code>msg</code> of type * Creates a canonical form of the given message <code>msg</code>. If <code>msg</code> of type
@ -244,5 +243,7 @@ case object Ack {
* message or Exchange.getOut message, depending on the exchange pattern. * message or Exchange.getOut message, depending on the exchange pattern.
* *
*/ */
class AkkaCamelException private[akka] (cause: Throwable, val headers: Map[String, Any] = Map.empty) class AkkaCamelException private[akka] (cause: Throwable, val headers: Map[String, Any])
extends AkkaException(cause.getMessage, cause) extends AkkaException(cause.getMessage, cause) {
def this(cause: Throwable) = this(cause, Map.empty)
}

View file

@ -31,7 +31,7 @@ trait ConsumerConfig {
/** /**
* How long the actor should wait for activation before it fails. * How long the actor should wait for activation before it fails.
*/ */
def activationTimeout: Duration = 10 seconds def activationTimeout: Duration = 10 seconds // FIXME Should be configured in reference.conf
/** /**
* When endpoint is out-capable (can produce responses) replyTimeout is the maximum time * When endpoint is out-capable (can produce responses) replyTimeout is the maximum time
@ -39,14 +39,14 @@ trait ConsumerConfig {
* This setting is used for out-capable, in-only, manually acknowledged communication. * This setting is used for out-capable, in-only, manually acknowledged communication.
* When the blocking is set to Blocking replyTimeout is ignored. * When the blocking is set to Blocking replyTimeout is ignored.
*/ */
def replyTimeout: Duration = 1 minute def replyTimeout: Duration = 1 minute // FIXME Should be configured in reference.conf
/** /**
* Determines whether one-way communications between an endpoint and this consumer actor * Determines whether one-way communications between an endpoint and this consumer actor
* should be auto-acknowledged or application-acknowledged. * should be auto-acknowledged or application-acknowledged.
* This flag has only effect when exchange is in-only. * This flag has only effect when exchange is in-only.
*/ */
def autoack: Boolean = true def autoack: Boolean = true // FIXME Should be configured in reference.conf
/** /**
* The route definition handler for creating a custom route to this consumer instance. * The route definition handler for creating a custom route to this consumer instance.

View file

@ -6,8 +6,9 @@ package akka.camel
import akka.actor.Actor import akka.actor.Actor
import internal.CamelExchangeAdapter import internal.CamelExchangeAdapter
import org.apache.camel.{ Exchange, ExchangePattern, AsyncCallback }
import akka.actor.Status.Failure import akka.actor.Status.Failure
import org.apache.camel.{ Endpoint, Exchange, ExchangePattern, AsyncCallback }
import org.apache.camel.processor.SendProcessor
/** /**
* Support trait for producing messages to Camel endpoints. * Support trait for producing messages to Camel endpoints.
@ -15,19 +16,19 @@ import akka.actor.Status.Failure
* @author Martin Krasser * @author Martin Krasser
*/ */
trait ProducerSupport { this: Actor trait ProducerSupport { this: Actor
protected[this] implicit def camel = CamelExtension(context.system) protected[this] implicit def camel = CamelExtension(context.system) // FIXME This is duplicated from Consumer, create a common base-trait?
/** /**
* camelContext implicit is useful when using advanced methods of CamelMessage. * camelContext implicit is useful when using advanced methods of CamelMessage.
*/ */
protected[this] implicit def camelContext = camel.context protected[this] implicit def camelContext = camel.context // FIXME This is duplicated from Consumer, create a common base-trait?
protected[this] lazy val (endpoint, processor) = camel.registerProducer(self, endpointUri) protected[this] lazy val (endpoint: Endpoint, processor: SendProcessor) = camel.registerProducer(self, endpointUri)
/** /**
* CamelMessage headers to copy by default from request message to response-message. * CamelMessage headers to copy by default from request message to response-message.
*/ */
private val headersToCopyDefault = Set(CamelMessage.MessageExchangeId) private val headersToCopyDefault: Set[String] = Set(CamelMessage.MessageExchangeId)
/** /**
* If set to false (default), this producer expects a response message from the Camel endpoint. * If set to false (default), this producer expects a response message from the Camel endpoint.
@ -64,20 +65,21 @@ trait ProducerSupport { this: Actor ⇒
* @param pattern exchange pattern * @param pattern exchange pattern
*/ */
protected def produce(msg: Any, pattern: ExchangePattern): Unit = { protected def produce(msg: Any, pattern: ExchangePattern): Unit = {
implicit def toExchangeAdapter(exchange: Exchange): CamelExchangeAdapter = new CamelExchangeAdapter(exchange) // Need copies of sender reference here since the callback could be done
// later by another thread.
val producer = self
val originalSender = sender
val cmsg = CamelMessage.canonicalize(msg) val cmsg = CamelMessage.canonicalize(msg)
val exchange = endpoint.createExchange(pattern) val xchg = new CamelExchangeAdapter(endpoint.createExchange(pattern))
exchange.setRequest(cmsg)
processor.process(exchange, new AsyncCallback { xchg.setRequest(cmsg)
val producer = self
// Need copies of sender reference here since the callback could be done processor.process(xchg.exchange, new AsyncCallback {
// later by another thread.
val originalSender = sender
// Ignoring doneSync, sending back async uniformly. // Ignoring doneSync, sending back async uniformly.
def done(doneSync: Boolean): Unit = producer.tell( def done(doneSync: Boolean): Unit = producer.tell(
if (exchange.isFailed) exchange.toFailureResult(cmsg.headers(headersToCopy)) if (xchg.exchange.isFailed) xchg.toFailureResult(cmsg.headers(headersToCopy))
else MessageResult(exchange.toResponseMessage(cmsg.headers(headersToCopy))), originalSender) else MessageResult(xchg.toResponseMessage(cmsg.headers(headersToCopy))), originalSender)
}) })
} }
@ -94,9 +96,7 @@ trait ProducerSupport { this: Actor ⇒
val e = new AkkaCamelException(res.cause, res.headers) val e = new AkkaCamelException(res.cause, res.headers)
routeResponse(Failure(e)) routeResponse(Failure(e))
throw e throw e
case msg case msg produce(transformOutgoingMessage(msg), if (oneway) ExchangePattern.InOnly else ExchangePattern.InOut)
val exchangePattern = if (oneway) ExchangePattern.InOnly else ExchangePattern.InOut
produce(transformOutgoingMessage(msg), exchangePattern)
} }
/** /**
@ -134,7 +134,7 @@ trait Producer extends ProducerSupport { this: Actor ⇒
* Default implementation of Actor.receive. Any messages received by this actors * Default implementation of Actor.receive. Any messages received by this actors
* will be produced to the endpoint specified by <code>endpointUri</code>. * will be produced to the endpoint specified by <code>endpointUri</code>.
*/ */
def receive = produce def receive: Actor.Receive = produce
} }
/** /**
@ -153,6 +153,6 @@ private case class FailureResult(cause: Throwable, headers: Map[String, Any] = M
* @author Martin Krasser * @author Martin Krasser
*/ */
trait Oneway extends Producer { this: Actor trait Oneway extends Producer { this: Actor
override def oneway = true override def oneway: Boolean = true
} }

View file

@ -20,7 +20,7 @@ private[camel] abstract class ActivationMessage(val actor: ActorRef)
* *
*/ */
private[camel] object ActivationMessage { private[camel] object ActivationMessage {
def unapply(msg: ActivationMessage): Option[ActorRef] = Some(msg.actor) def unapply(msg: ActivationMessage): Option[ActorRef] = Option(msg.actor)
} }
/** /**

View file

@ -96,17 +96,15 @@ private[akka] final class ActivationTracker extends Actor with ActorLogging {
/** /**
* Subscribes self to messages of type <code>ActivationMessage</code> * Subscribes self to messages of type <code>ActivationMessage</code>
*/ */
override def preStart() { override def preStart(): Unit = context.system.eventStream.subscribe(self, classOf[ActivationMessage])
context.system.eventStream.subscribe(self, classOf[ActivationMessage])
}
override def receive = { override def receive = {
case msg @ ActivationMessage(ref) case msg @ ActivationMessage(ref)
val state = activations.getOrElseUpdate(ref, new ActivationStateMachine) (activations.getOrElseUpdate(ref, new ActivationStateMachine).receive orElse logStateWarning(ref))(msg)
(state.receive orElse logStateWarning(ref))(msg)
} }
private[this] def logStateWarning(actorRef: ActorRef): Receive = { case msg log.warning("Message [{}] not expected in current state of actor [{}]", msg, actorRef) } private[this] def logStateWarning(actorRef: ActorRef): Receive =
{ case msg log.warning("Message [{}] not expected in current state of actor [{}]", msg, actorRef) }
} }
/** /**

View file

@ -16,34 +16,34 @@ import akka.camel.{ FailureResult, AkkaCamelException, CamelMessage }
* *
* @author Martin Krasser * @author Martin Krasser
*/ */
private[camel] class CamelExchangeAdapter(exchange: Exchange) { private[camel] class CamelExchangeAdapter(val exchange: Exchange) {
/** /**
* Returns the exchange id * Returns the exchange id
*/ */
def getExchangeId = exchange.getExchangeId def getExchangeId: String = exchange.getExchangeId
/** /**
* Returns if the exchange is out capable. * Returns if the exchange is out capable.
*/ */
def isOutCapable = exchange.getPattern.isOutCapable def isOutCapable: Boolean = exchange.getPattern.isOutCapable
/** /**
* Sets Exchange.getIn from the given CamelMessage object. * Sets Exchange.getIn from the given CamelMessage object.
*/ */
def setRequest(msg: CamelMessage) { msg.copyContentTo(request) } def setRequest(msg: CamelMessage): Unit = msg.copyContentTo(request)
/** /**
* Depending on the exchange pattern, sets Exchange.getIn or Exchange.getOut from the given * Depending on the exchange pattern, sets Exchange.getIn or Exchange.getOut from the given
* CamelMessage object. If the exchange is out-capable then the Exchange.getOut is set, otherwise * CamelMessage object. If the exchange is out-capable then the Exchange.getOut is set, otherwise
* Exchange.getIn. * Exchange.getIn.
*/ */
def setResponse(msg: CamelMessage) { msg.copyContentTo(response) } def setResponse(msg: CamelMessage): Unit = msg.copyContentTo(response)
/** /**
* Sets Exchange.getException from the given FailureResult message. Headers of the FailureResult message * Sets Exchange.getException from the given FailureResult message. Headers of the FailureResult message
* are ignored. * are ignored.
*/ */
def setFailure(msg: FailureResult) { exchange.setException(msg.cause) } def setFailure(msg: FailureResult): Unit = exchange.setException(msg.cause)
/** /**
* Creates an immutable CamelMessage object from Exchange.getIn so it can be used with Actors. * Creates an immutable CamelMessage object from Exchange.getIn so it can be used with Actors.
@ -120,7 +120,7 @@ private[camel] class CamelExchangeAdapter(exchange: Exchange) {
*/ */
def toResponseMessage(headers: Map[String, Any]): CamelMessage = CamelMessage.from(response, headers) def toResponseMessage(headers: Map[String, Any]): CamelMessage = CamelMessage.from(response, headers)
private def request = exchange.getIn private def request: JCamelMessage = exchange.getIn
private def response: JCamelMessage = ExchangeHelper.getResultMessage(exchange) private def response: JCamelMessage = ExchangeHelper.getResultMessage(exchange)

View file

@ -2,12 +2,12 @@ package akka.camel.internal
import akka.actor.ActorSystem import akka.actor.ActorSystem
import component.{ DurationTypeConverter, ActorComponent } import component.{ DurationTypeConverter, ActorComponent }
import org.apache.camel.CamelContext
import org.apache.camel.impl.DefaultCamelContext import org.apache.camel.impl.DefaultCamelContext
import scala.Predef._ import scala.Predef._
import akka.event.Logging import akka.event.Logging
import akka.camel.Camel import akka.camel.Camel
import akka.util.{ NonFatal, Duration } import akka.util.{ NonFatal, Duration }
import org.apache.camel.{ ProducerTemplate, CamelContext }
/** /**
* For internal use only. * For internal use only.
@ -33,14 +33,14 @@ private[camel] class DefaultCamel(val system: ActorSystem) extends Camel {
ctx ctx
} }
lazy val template = context.createProducerTemplate() lazy val template: ProducerTemplate = context.createProducerTemplate()
/** /**
* Starts camel and underlying camel context and template. * Starts camel and underlying camel context and template.
* Only the creator of Camel should start and stop it. * Only the creator of Camel should start and stop it.
* @see akka.camel.DefaultCamel#stop() * @see akka.camel.DefaultCamel#stop()
*/ */
def start = { def start(): this.type = {
context.start() context.start()
try template.start() catch { case NonFatal(e) context.stop(); throw e } try template.start() catch { case NonFatal(e) context.stop(); throw e }
log.debug("Started CamelContext[{}] for ActorSystem[{}]", context.getName, system.name) log.debug("Started CamelContext[{}] for ActorSystem[{}]", context.getName, system.name)
@ -54,9 +54,9 @@ private[camel] class DefaultCamel(val system: ActorSystem) extends Camel {
* *
* @see akka.camel.DefaultCamel#start() * @see akka.camel.DefaultCamel#start()
*/ */
def shutdown() { def shutdown(): Unit = {
try context.stop() finally { try context.stop() finally {
try { template.stop() } catch { case NonFatal(e) log.debug("Swallowing non-fatal exception [{}] on stopping Camel producer template", e) } try template.stop() catch { case NonFatal(e) log.debug("Swallowing non-fatal exception [{}] on stopping Camel producer template", e) }
} }
log.debug("Stopped CamelContext[{}] for ActorSystem[{}]", context.getName, system.name) log.debug("Stopped CamelContext[{}] for ActorSystem[{}]", context.getName, system.name)
} }

View file

@ -11,6 +11,8 @@ import akka.util.NonFatal
* Watches the end of life of <code>Producer</code>s. * Watches the end of life of <code>Producer</code>s.
* Removes a <code>Producer</code> from the <code>ProducerRegistry</code> when it is <code>Terminated</code>, * Removes a <code>Producer</code> from the <code>ProducerRegistry</code> when it is <code>Terminated</code>,
* which in turn stops the <code>SendProcessor</code>. * which in turn stops the <code>SendProcessor</code>.
*
* INTERNAL API
*/ */
private class ProducerWatcher(registry: ProducerRegistry) extends Actor { private class ProducerWatcher(registry: ProducerRegistry) extends Actor {
override def receive = { override def receive = {
@ -19,6 +21,9 @@ private class ProducerWatcher(registry: ProducerRegistry) extends Actor {
} }
} }
/**
* INTERNAL API
*/
private case class RegisterProducer(actorRef: ActorRef) private case class RegisterProducer(actorRef: ActorRef)
/** /**
@ -27,14 +32,11 @@ private case class RegisterProducer(actorRef: ActorRef)
* Every <code>Producer</code> needs an <code>Endpoint</code> and a <code>SendProcessor</code> * Every <code>Producer</code> needs an <code>Endpoint</code> and a <code>SendProcessor</code>
* to produce messages over an <code>Exchange</code>. * to produce messages over an <code>Exchange</code>.
*/ */
private[camel] trait ProducerRegistry { private[camel] trait ProducerRegistry { this: Camel
this: Camel
private val camelObjects = new ConcurrentHashMap[ActorRef, (Endpoint, SendProcessor)]() private val camelObjects = new ConcurrentHashMap[ActorRef, (Endpoint, SendProcessor)]()
private val watcher = system.actorOf(Props(new ProducerWatcher(this))) private val watcher = system.actorOf(Props(new ProducerWatcher(this))) //FIXME should this really be top level?
private def registerWatch(actorRef: ActorRef) { private def registerWatch(actorRef: ActorRef): Unit = watcher ! RegisterProducer(actorRef)
watcher ! RegisterProducer(actorRef)
}
/** /**
* For internal use only. * For internal use only.
@ -77,7 +79,7 @@ private[camel] trait ProducerRegistry {
case NonFatal(e) { case NonFatal(e) {
system.eventStream.publish(EndpointFailedToActivate(actorRef, e)) system.eventStream.publish(EndpointFailedToActivate(actorRef, e))
// can't return null to the producer actor, so blow up actor in initialization. // can't return null to the producer actor, so blow up actor in initialization.
throw e throw e //FIXME I'm not a huge fan of log-rethrow, either log or rethrow
} }
} }
} }

View file

@ -35,10 +35,8 @@ private[camel] class ActorComponent(camel: Camel) extends DefaultComponent {
/** /**
* @see org.apache.camel.Component * @see org.apache.camel.Component
*/ */
def createEndpoint(uri: String, remaining: String, parameters: JMap[String, Object]): ActorEndpoint = { def createEndpoint(uri: String, remaining: String, parameters: JMap[String, Object]): ActorEndpoint =
val path = ActorEndpointPath.fromCamelPath(remaining) new ActorEndpoint(uri, this, ActorEndpointPath.fromCamelPath(remaining), camel)
new ActorEndpoint(uri, this, path, camel)
}
} }
/** /**
@ -92,7 +90,7 @@ private[camel] class ActorEndpoint(uri: String,
private[camel] trait ActorEndpointConfig { private[camel] trait ActorEndpointConfig {
def path: ActorEndpointPath def path: ActorEndpointPath
@BeanProperty var replyTimeout: Duration = 1 minute @BeanProperty var replyTimeout: Duration = 1 minute // FIXME default should be in config, not code
/** /**
* Whether to auto-acknowledge one-way message exchanges with (untyped) actors. This is * Whether to auto-acknowledge one-way message exchanges with (untyped) actors. This is
@ -117,7 +115,7 @@ private[camel] class ActorProducer(val endpoint: ActorEndpoint, camel: Camel) ex
* Calls the asynchronous version of the method and waits for the result (blocking). * Calls the asynchronous version of the method and waits for the result (blocking).
* @param exchange the exchange to process * @param exchange the exchange to process
*/ */
def process(exchange: Exchange) { processExchangeAdapter(new CamelExchangeAdapter(exchange)) } def process(exchange: Exchange): Unit = processExchangeAdapter(new CamelExchangeAdapter(exchange))
/** /**
* Processes the message exchange. the caller supports having the exchange asynchronously processed. * Processes the message exchange. the caller supports having the exchange asynchronously processed.
@ -129,13 +127,15 @@ private[camel] class ActorProducer(val endpoint: ActorEndpoint, camel: Camel) ex
* The callback should therefore be careful of starting recursive loop. * The callback should therefore be careful of starting recursive loop.
* @return (doneSync) true to continue execute synchronously, false to continue being executed asynchronously * @return (doneSync) true to continue execute synchronously, false to continue being executed asynchronously
*/ */
def process(exchange: Exchange, callback: AsyncCallback): Boolean = { processExchangeAdapter(new CamelExchangeAdapter(exchange), callback) } def process(exchange: Exchange, callback: AsyncCallback): Boolean = processExchangeAdapter(new CamelExchangeAdapter(exchange), callback)
/** /**
* For internal use only. Processes the [[akka.camel.internal.CamelExchangeAdapter]] * For internal use only. Processes the [[akka.camel.internal.CamelExchangeAdapter]]
* @param exchange the [[akka.camel.internal.CamelExchangeAdapter]] * @param exchange the [[akka.camel.internal.CamelExchangeAdapter]]
*
* WARNING UNBOUNDED BLOCKING AWAITS
*/ */
private[camel] def processExchangeAdapter(exchange: CamelExchangeAdapter) { private[camel] def processExchangeAdapter(exchange: CamelExchangeAdapter): Unit = {
val isDone = new CountDownLatch(1) val isDone = new CountDownLatch(1)
processExchangeAdapter(exchange, new AsyncCallback { def done(doneSync: Boolean) { isDone.countDown() } }) processExchangeAdapter(exchange, new AsyncCallback { def done(doneSync: Boolean) { isDone.countDown() } })
isDone.await() // this should never wait forever as the process(exchange, callback) method guarantees that. isDone.await() // this should never wait forever as the process(exchange, callback) method guarantees that.
@ -151,10 +151,10 @@ private[camel] class ActorProducer(val endpoint: ActorEndpoint, camel: Camel) ex
private[camel] def processExchangeAdapter(exchange: CamelExchangeAdapter, callback: AsyncCallback): Boolean = { private[camel] def processExchangeAdapter(exchange: CamelExchangeAdapter, callback: AsyncCallback): Boolean = {
// these notify methods are just a syntax sugar // these notify methods are just a syntax sugar
def notifyDoneSynchronously[A](a: A = null) = callback.done(true) def notifyDoneSynchronously[A](a: A = null): Unit = callback.done(true)
def notifyDoneAsynchronously[A](a: A = null) = callback.done(false) def notifyDoneAsynchronously[A](a: A = null): Unit = callback.done(false)
def message = messageFor(exchange) def message: CamelMessage = messageFor(exchange)
if (exchange.isOutCapable) { //InOut if (exchange.isOutCapable) { //InOut
sendAsync(message, onComplete = forwardResponseTo(exchange) andThen notifyDoneAsynchronously) sendAsync(message, onComplete = forwardResponseTo(exchange) andThen notifyDoneAsynchronously)
@ -186,39 +186,29 @@ private[camel] class ActorProducer(val endpoint: ActorEndpoint, camel: Camel) ex
private def sendAsync(message: CamelMessage, onComplete: PartialFunction[Either[Throwable, Any], Unit]): Boolean = { private def sendAsync(message: CamelMessage, onComplete: PartialFunction[Either[Throwable, Any], Unit]): Boolean = {
try { try {
val actor = actorFor(endpoint.path) actorFor(endpoint.path).ask(message)(Timeout(endpoint.replyTimeout)).onComplete(onComplete)
val future = actor.ask(message)(new Timeout(endpoint.replyTimeout))
future.onComplete(onComplete)
} catch { } catch {
case NonFatal(e) onComplete(Left(e)) case NonFatal(e) onComplete(Left(e))
} }
false // Done async false // Done async
} }
private def fireAndForget(message: CamelMessage, exchange: CamelExchangeAdapter) { private def fireAndForget(message: CamelMessage, exchange: CamelExchangeAdapter): Unit =
try { try { actorFor(endpoint.path) ! message } catch { case NonFatal(e) exchange.setFailure(new FailureResult(e)) }
actorFor(endpoint.path) ! message
} catch {
case e exchange.setFailure(new FailureResult(e))
}
}
private[this] def actorFor(path: ActorEndpointPath): ActorRef = private[this] def actorFor(path: ActorEndpointPath): ActorRef =
path.findActorIn(camel.system) getOrElse (throw new ActorNotRegisteredException(path.actorPath)) path.findActorIn(camel.system) getOrElse (throw new ActorNotRegisteredException(path.actorPath))
private[this] def messageFor(exchange: CamelExchangeAdapter) = private[this] def messageFor(exchange: CamelExchangeAdapter) =
exchange.toRequestMessage(Map(CamelMessage.MessageExchangeId -> exchange.getExchangeId)) exchange.toRequestMessage(Map(CamelMessage.MessageExchangeId -> exchange.getExchangeId))
} }
/** /**
* For internal use only. Converts Strings to [[akka.util.Duration]]s * For internal use only. Converts Strings to [[akka.util.Duration]]s
*/ */
private[camel] object DurationTypeConverter extends TypeConverter { private[camel] object DurationTypeConverter extends TypeConverter {
def convertTo[T](`type`: Class[T], value: AnyRef) = { def convertTo[T](`type`: Class[T], value: AnyRef) = Duration(value.toString).asInstanceOf[T] //FIXME WTF
Duration(value.toString).asInstanceOf[T] def convertTo[T](`type`: Class[T], exchange: Exchange, value: AnyRef): T = convertTo(`type`, value)
}
def convertTo[T](`type`: Class[T], exchange: Exchange, value: AnyRef) = convertTo(`type`, value)
def mandatoryConvertTo[T](`type`: Class[T], value: AnyRef) = convertTo(`type`, value) def mandatoryConvertTo[T](`type`: Class[T], value: AnyRef) = convertTo(`type`, value)
def mandatoryConvertTo[T](`type`: Class[T], exchange: Exchange, value: AnyRef) = convertTo(`type`, value) def mandatoryConvertTo[T](`type`: Class[T], exchange: Exchange, value: AnyRef) = convertTo(`type`, value)
def toString(duration: Duration) = duration.toNanos + " nanos" def toString(duration: Duration) = duration.toNanos + " nanos"
@ -243,15 +233,15 @@ private[camel] case class ActorEndpointPath private (actorPath: String) {
* For internal use only. Companion of `ActorEndpointPath` * For internal use only. Companion of `ActorEndpointPath`
*/ */
private[camel] object ActorEndpointPath { private[camel] object ActorEndpointPath {
private val consumerConfig = new ConsumerConfig {} private val consumerConfig: ConsumerConfig = new ConsumerConfig {}
def apply(actorRef: ActorRef) = new ActorEndpointPath(actorRef.path.toString) def apply(actorRef: ActorRef): ActorEndpointPath = new ActorEndpointPath(actorRef.path.toString)
/** /**
* Creates an [[akka.camel.internal.component.ActorEndpointPath]] from the remaining part of the endpoint URI (the part after the scheme, without the parameters of the URI). * Creates an [[akka.camel.internal.component.ActorEndpointPath]] from the remaining part of the endpoint URI (the part after the scheme, without the parameters of the URI).
* Expects the remaining part of the URI (the actor path) in a format: path:%s * Expects the remaining part of the URI (the actor path) in a format: path:%s
*/ */
def fromCamelPath(camelPath: String) = camelPath match { def fromCamelPath(camelPath: String): ActorEndpointPath = camelPath match {
case id if id startsWith "path:" new ActorEndpointPath(id substring 5) case id if id startsWith "path:" new ActorEndpointPath(id substring 5)
case _ throw new IllegalArgumentException("Invalid path: [%s] - should be path:<actorPath>" format camelPath) case _ throw new IllegalArgumentException("Invalid path: [%s] - should be path:<actorPath>" format camelPath)
} }

View file

@ -13,7 +13,7 @@ import org.apache.camel.{ ProducerTemplate, CamelContext }
* class is meant to be used from Java. * class is meant to be used from Java.
*/ */
abstract class UntypedConsumerActor extends UntypedActor with Consumer { abstract class UntypedConsumerActor extends UntypedActor with Consumer {
final def endpointUri = getEndpointUri final def endpointUri: String = getEndpointUri
/** /**
* Returns the Camel endpoint URI to consume messages from. * Returns the Camel endpoint URI to consume messages from.

View file

@ -40,16 +40,14 @@ abstract class UntypedProducerActor extends UntypedActor with ProducerSupport {
final override def transformResponse(msg: Any): AnyRef = onTransformResponse(msg.asInstanceOf[AnyRef]) final override def transformResponse(msg: Any): AnyRef = onTransformResponse(msg.asInstanceOf[AnyRef])
final override def routeResponse(msg: Any): Unit = onRouteResponse(msg.asInstanceOf[AnyRef]) final override def routeResponse(msg: Any): Unit = onRouteResponse(msg.asInstanceOf[AnyRef])
final override def endpointUri = getEndpointUri final override def endpointUri: String = getEndpointUri
final override def oneway = isOneway final override def oneway: Boolean = isOneway
/** /**
* Default implementation of UntypedActor.onReceive * Default implementation of UntypedActor.onReceive
*/ */
def onReceive(message: Any) { def onReceive(message: Any): Unit = produce(message)
produce(message)
}
/** /**
* Returns the Camel endpoint URI to produce messages to. * Returns the Camel endpoint URI to produce messages to.
@ -61,7 +59,7 @@ abstract class UntypedProducerActor extends UntypedActor with ProducerSupport {
* If set to true, this producer communicates with the Camel endpoint with an in-only message * If set to true, this producer communicates with the Camel endpoint with an in-only message
* exchange pattern (fire and forget). * exchange pattern (fire and forget).
*/ */
def isOneway() = super.oneway def isOneway(): Boolean = super.oneway
/** /**
* Returns the <code>CamelContext</code>. * Returns the <code>CamelContext</code>.

View file

@ -7,5 +7,6 @@ package akka
import org.apache.camel.model.ProcessorDefinition import org.apache.camel.model.ProcessorDefinition
package object camel { package object camel {
//TODO Why do I exist?
implicit def toActorRouteDefinition(definition: ProcessorDefinition[_]) = new ActorRouteDefinition(definition) implicit def toActorRouteDefinition(definition: ProcessorDefinition[_]) = new ActorRouteDefinition(definition)
} }