Changed unlimited blocking to block up untill the replyTimeout, and added a test for it, where manual Ack is never received ticket #1926

removed unnecesary methods
This commit is contained in:
RayRoestenburg 2012-09-03 12:08:46 +02:00
parent 112e02e965
commit d0a50f66e7
47 changed files with 1056 additions and 784 deletions

View file

@ -9,7 +9,8 @@ akka {
camel {
# Whether JMX should be enabled or disabled for the Camel Context
jmx = off
# enable/disable streaming cache on the Camel Context
streamingCache = on
consumer {
# Configured setting which determines whether one-way communications between an endpoint and this consumer actor
# should be auto-acknowledged or application-acknowledged.

View file

@ -10,16 +10,12 @@ import akka.actor.{ ActorSystem, Props, ActorRef }
import akka.pattern._
import scala.concurrent.util.Duration
import concurrent.{ ExecutionContext, Future }
import scala.concurrent.util.FiniteDuration
/**
* Activation trait that can be used to wait on activation or de-activation of Camel endpoints.
* The Camel endpoints are activated asynchronously. This trait can signal when an endpoint is activated or de-activated.
*/
trait Activation {
def system: ActorSystem //FIXME Why is this here, what's it needed for and who should use it?
private val activationTracker = system.actorOf(Props[ActivationTracker], "camelActivationTracker") //FIXME Why is this also top level?
/**
* Produces a Future with the specified endpoint that will be completed when the endpoint has been activated,
@ -28,11 +24,7 @@ trait Activation {
* @param endpoint the endpoint to be activated
* @param timeout the timeout for the Future
*/
def activationFutureFor(endpoint: ActorRef)(implicit timeout: FiniteDuration, executor: ExecutionContext): Future[ActorRef] =
(activationTracker.ask(AwaitActivation(endpoint))(Timeout(timeout))).map[ActorRef]({
case EndpointActivated(`endpoint`) endpoint
case EndpointFailedToActivate(`endpoint`, cause) throw cause
})
def activationFutureFor(endpoint: ActorRef)(implicit timeout: Timeout, executor: ExecutionContext): Future[ActorRef]
/**
* Produces a Future which will be completed when the given endpoint has been deactivated or
@ -41,9 +33,5 @@ trait Activation {
* @param endpoint the endpoint to be deactivated
* @param timeout the timeout of the Future
*/
def deactivationFutureFor(endpoint: ActorRef)(implicit timeout: FiniteDuration, executor: ExecutionContext): Future[ActorRef] =
(activationTracker.ask(AwaitDeActivation(endpoint))(Timeout(timeout))).map[ActorRef]({
case EndpointDeActivated(`endpoint`) endpoint
case EndpointFailedToDeActivate(`endpoint`, cause) throw cause
})
def deactivationFutureFor(endpoint: ActorRef)(implicit timeout: Timeout, executor: ExecutionContext): Future[ActorRef]
}

View file

@ -6,7 +6,8 @@ package akka.camel
import internal._
import akka.actor._
import org.apache.camel.{ ProducerTemplate, CamelContext }
import org.apache.camel.ProducerTemplate
import org.apache.camel.impl.DefaultCamelContext
import com.typesafe.config.Config
import scala.concurrent.util.Duration
import java.util.concurrent.TimeUnit._
@ -17,16 +18,16 @@ import scala.concurrent.util.FiniteDuration
* '''Note:''' `CamelContext` and `ProducerTemplate` are stopped when the associated actor system is shut down.
* This trait can be obtained through the [[akka.camel.CamelExtension]] object.
*/
trait Camel extends ConsumerRegistry with ProducerRegistry with Extension with Activation {
trait Camel extends Extension with Activation {
/**
* Underlying camel context.
*
* It can be used to configure camel manually, i.e. when the user wants to add new routes or endpoints,
* i.e. {{{camel.context.addRoutes(...)}}}
*
* @see [[org.apache.camel.CamelContext]]
* @see [[org.apache.camel.impl.DefaultCamelContext]]
*/
def context: CamelContext
def context: DefaultCamelContext
/**
* The Camel ProducerTemplate.
@ -38,6 +39,16 @@ trait Camel extends ConsumerRegistry with ProducerRegistry with Extension with A
* The settings for the CamelExtension
*/
def settings: CamelSettings
/**
* For internal use only. Returns the camel supervisor actor.
*/
private[camel] def supervisor: ActorRef
/**
* For internal use only. Returns the associated ActorSystem.
*/
private[camel] def system: ActorSystem
}
/**
@ -68,6 +79,12 @@ class CamelSettings private[camel] (config: Config) {
*
*/
final val jmxStatistics: Boolean = config.getBoolean("akka.camel.jmx")
/**
* enables or disables streamingCache on the Camel Context
*/
final val streamingCache: Boolean = config.getBoolean("akka.camel.streamingCache")
}
/**
@ -97,4 +114,4 @@ object CamelExtension extends ExtensionId[Camel] with ExtensionIdProvider {
override def lookup(): ExtensionId[Camel] = CamelExtension
override def get(system: ActorSystem): Camel = super.get(system)
}
}

View file

@ -8,14 +8,14 @@ import java.util.{ Map ⇒ JMap, Set ⇒ JSet }
import scala.collection.JavaConversions._
import akka.japi.{ Function JFunction }
import org.apache.camel.{ CamelContext, Message JCamelMessage }
import akka.AkkaException
import scala.reflect.ClassTag
import akka.dispatch.Mapper
import util.{ Success, Failure, Try }
/**
* An immutable representation of a Camel message.
*
* @author Martin Krasser
*/
case class CamelMessage(body: Any, headers: Map[String, Any]) {
@ -47,86 +47,59 @@ case class CamelMessage(body: Any, headers: Map[String, Any]) {
*/
def getHeaders: JMap[String, Any] = headers
/**
* Returns the header with given <code>name</code>. Throws <code>NoSuchElementException</code>
* if the header doesn't exist.
*/
def header(name: String): Option[Any] = headers.get(name)
/**
* Returns the header with given <code>name</code>. Throws <code>NoSuchElementException</code>
* if the header doesn't exist.
* <p>
* Java API
*/
def getHeader(name: String): Any = headers(name)
/**
* Creates a CamelMessage with a transformed body using a <code>transformer</code> function.
*/
def mapBody[A, B](transformer: A B): CamelMessage = withBody(transformer(body.asInstanceOf[A]))
/**
* Creates a CamelMessage with a transformed body using a <code>transformer</code> function.
* <p>
* Java API
*/
def mapBody[A, B](transformer: JFunction[A, B]): CamelMessage = withBody(transformer(body.asInstanceOf[A]))
/**
* Creates a CamelMessage with a given <code>body</code>.
*/
def withBody(body: Any): CamelMessage = CamelMessage(body, this.headers)
/**
* Creates a new CamelMessage with given <code>headers</code>.
*/
def withHeaders[A](headers: Map[String, A]): CamelMessage = copy(this.body, headers)
/**
* Creates a new CamelMessage with given <code>headers</code>. A copy of the headers map is made.
* <p>
* Java API
*/
def withHeaders[A](headers: JMap[String, A]): CamelMessage = withHeaders(headers.toMap)
def withHeaders[A](headers: JMap[String, A]): CamelMessage = copy(this.body, headers.toMap)
/**
* Creates a new CamelMessage with given <code>headers</code> added to the current headers.
*/
def addHeaders[A](headers: Map[String, A]): CamelMessage = copy(this.body, this.headers ++ headers)
/**
* Creates a new CamelMessage with given <code>headers</code> added to the current headers.
* A copy of the headers map is made.
* Returns the header by given <code>name</code> parameter in a [[scala.util.Try]]. The header is converted to type <code>T</code>, which is returned
* in a [[scala.util.Success]]. If an exception occurs during the conversion to the type <code>T</code> or when the header cannot be found,
* the exception is returned in a [[scala.util.Failure]].
*
* <p>
* Java API
* The CamelContext is accessible in a [[akka.camel.javaapi.UntypedConsumerActor]] and [[akka.camel.javaapi.UntypedProducerActor]]
* using the `getCamelContext` method, and is available on the [[akka.camel.CamelExtension]].
*
*/
def addHeaders[A](headers: JMap[String, A]): CamelMessage = addHeaders(headers.toMap)
/**
* Creates a new CamelMessage with the given <code>header</code> added to the current headers.
*/
def addHeader(header: (String, Any)): CamelMessage = copy(this.body, this.headers + header)
/**
* Creates a new CamelMessage with the given header, represented by <code>name</code> and
* <code>value</code> added to the existing headers.
* <p>
* Java API
*/
def addHeader(name: String, value: Any): CamelMessage = addHeader((name, value))
/**
* Creates a new CamelMessage where the header with given <code>headerName</code> is removed from
* the existing headers.
*/
def withoutHeader(headerName: String): CamelMessage = copy(this.body, this.headers - headerName)
def copyContentTo(to: JCamelMessage): Unit = {
to.setBody(this.body)
for ((name, value) this.headers) to.getHeaders.put(name, value.asInstanceOf[AnyRef])
def headerAs[T](name: String)(implicit t: ClassTag[T], camelContext: CamelContext): Try[T] = {
def tryHeader = headers.get(name).map(camelContext.getTypeConverter.mandatoryConvertTo[T](t.runtimeClass.asInstanceOf[Class[T]], _)) match {
case Some(header) header
case None throw new NoSuchElementException(name)
}
Try(tryHeader)
}
/**
* Returns the header by given <code>name</code> parameter. The header is converted to type <code>T</code> as defined by the <code>clazz</code> parameter.
* An exception is thrown when the conversion to the type <code>T</code> fails or when the header cannot be found.
* <p>
* The CamelContext is accessible in a [[akka.camel.javaapi.UntypedConsumerActor]] and [[akka.camel.javaapi.UntypedProducerActor]]
* using the `getCamelContext` method, and is available on the [[akka.camel.CamelExtension]].
* <p>
* Java API
*/
def getHeaderAs[T](name: String, clazz: Class[T], camelContext: CamelContext): T = headerAs[T](name)(ClassTag(clazz), camelContext) match {
case Success(header) header
case Failure(t) throw t
}
/**
* Returns a new CamelMessage with a transformed body using a <code>transformer</code> function.
* This method will throw a [[java.lang.ClassCastException]] if the body cannot be mapped to type A.
*/
def mapBody[A, B](transformer: A B): CamelMessage = copy(body = transformer(body.asInstanceOf[A]))
/**
* Returns a new CamelMessage with a transformed body using a <code>transformer</code> function.
* This method will throw a [[java.lang.ClassCastException]] if the body cannot be mapped to type A.
* <p>
* Java API
*/
def mapBody[A, B](transformer: Mapper[A, B]): CamelMessage = copy(body = transformer(body.asInstanceOf[A]))
/**
* Returns the body of the message converted to the type <code>T</code>. Conversion is done
* using Camel's type converter. The type converter is obtained from the CamelContext that is passed in.
@ -148,6 +121,12 @@ case class CamelMessage(body: Any, headers: Map[String, Any]) {
*/
def getBodyAs[T](clazz: Class[T], camelContext: CamelContext): T = camelContext.getTypeConverter.mandatoryConvertTo[T](clazz, body)
/**
* Returns a new CamelMessage with a new body, while keeping the same headers.
* <p>
* Java API
*/
def withBody[T](body: T) = this.copy(body = body)
/**
* Creates a CamelMessage with current <code>body</code> converted to type <code>T</code>.
* The CamelContext is accessible in a [[akka.camel.javaapi.UntypedConsumerActor]] and [[akka.camel.javaapi.UntypedProducerActor]]
@ -163,28 +142,7 @@ case class CamelMessage(body: Any, headers: Map[String, Any]) {
* <p>
* Java API
*/
def withBodyAs[T](clazz: Class[T])(implicit camelContext: CamelContext): CamelMessage = withBody(getBodyAs(clazz, camelContext))
/**
* Returns the header with given <code>name</code> converted to type <code>T</code>. Throws
* <code>NoSuchElementException</code> if the header doesn't exist.
* <p>
* The CamelContext is accessible in a [[akka.camel.javaapi.UntypedConsumerActor]] and [[akka.camel.javaapi.UntypedProducerActor]]
* using the `getCamelContext` method, and is available on the [[akka.camel.CamelExtension]].
*
*/
def headerAs[T](name: String)(implicit t: ClassTag[T], camelContext: CamelContext): Option[T] = header(name).map(camelContext.getTypeConverter.mandatoryConvertTo[T](t.runtimeClass.asInstanceOf[Class[T]], _))
/**
* Returns the header with given <code>name</code> converted to type as given by the <code>clazz</code>
* parameter. Throws <code>NoSuchElementException</code> if the header doesn't exist.
* <p>
* The CamelContext is accessible in a [[akka.camel.javaapi.UntypedConsumerActor]] and [[akka.camel.javaapi.UntypedProducerActor]]
* using the `getCamelContext` method, and is available on the [[akka.camel.CamelExtension]].
* <p>
* Java API
*/
def getHeaderAs[T](name: String, clazz: Class[T], camelContext: CamelContext): T = headerAs[T](name)(ClassTag(clazz), camelContext).get
def withBodyAs[T](clazz: Class[T])(implicit camelContext: CamelContext): CamelMessage = copy(body = getBodyAs(clazz, camelContext))
}
@ -208,24 +166,28 @@ object CamelMessage {
* CamelMessage then <code>msg</code> is returned, otherwise <code>msg</code> is set as body of a
* newly created CamelMessage object.
*/
def canonicalize(msg: Any) = msg match {
private[camel] def canonicalize(msg: Any) = msg match {
case mobj: CamelMessage mobj
case body CamelMessage(body, Map.empty)
}
/**
* Creates a new CamelMessage object from the Camel message.
*/
def from(camelMessage: JCamelMessage): CamelMessage = from(camelMessage, Map.empty)
/**
* Creates a new CamelMessage object from the Camel message.
*
* @param headers additional headers to set on the created CamelMessage in addition to those
* in the Camel message.
*/
def from(camelMessage: JCamelMessage, headers: Map[String, Any]): CamelMessage = CamelMessage(camelMessage.getBody, headers ++ camelMessage.getHeaders)
private[camel] def from(camelMessage: JCamelMessage, headers: Map[String, Any]): CamelMessage = CamelMessage(camelMessage.getBody, headers ++ camelMessage.getHeaders)
/**
* For internal use only.
* copies the content of this CamelMessage to an Apache Camel Message.
*
*/
private[camel] def copyContent(from: CamelMessage, to: JCamelMessage): Unit = {
to.setBody(from.body)
for ((name, value) from.headers) to.getHeaders.put(name, value.asInstanceOf[AnyRef])
}
}
/**
@ -242,7 +204,6 @@ case object Ack {
* An exception indicating that the exchange to the camel endpoint failed.
* It contains the failure cause obtained from Exchange.getException and the headers from either the Exchange.getIn
* message or Exchange.getOut message, depending on the exchange pattern.
*
*/
class AkkaCamelException private[akka] (cause: Throwable, val headers: Map[String, Any])
extends AkkaException(cause.getMessage, cause) {

View file

@ -4,20 +4,19 @@
package akka.camel
import language.postfixOps
import internal.component.DurationTypeConverter
import akka.camel.internal.CamelSupervisor.Register
import org.apache.camel.model.{ RouteDefinition, ProcessorDefinition }
import akka.actor._
import scala.concurrent.util.Duration
import scala.concurrent.util.duration._
import scala.concurrent.util.FiniteDuration
import akka.dispatch.Mapper
/**
* Mixed in by Actor implementations that consume message from Camel endpoints.
*
* @author Martin Krasser
*/
trait Consumer extends Actor with CamelSupport with ConsumerConfig {
trait Consumer extends Actor with CamelSupport {
import Consumer._
/**
* Must return the Camel endpoint URI that the consumer wants to consume messages from.
*/
@ -33,11 +32,13 @@ trait Consumer extends Actor with CamelSupport with ConsumerConfig {
// with order of execution of trait body in the Java version (UntypedConsumerActor)
// where getEndpointUri is called before its constructor (where a uri is set to return from getEndpointUri)
// and remains null. CustomRouteTest provides a test to verify this.
camel.registerConsumer(endpointUri, this, activationTimeout)
register()
}
private[this] def register() {
camel.supervisor ! Register(self, endpointUri, Some(ConsumerConfig(activationTimeout, replyTimeout, autoAck, onRouteDefinition)))
}
}
trait ConsumerConfig { this: CamelSupport
/**
* How long the actor should wait for activation before it fails.
*/
@ -48,7 +49,7 @@ trait ConsumerConfig { this: CamelSupport ⇒
* the endpoint can take to send the response before the message exchange fails. It defaults to 1 minute.
* This setting is used for out-capable, in-only, manually acknowledged communication.
*/
def replyTimeout: Duration = camel.settings.replyTimeout
def replyTimeout: FiniteDuration = camel.settings.replyTimeout
/**
* Determines whether one-way communications between an endpoint and this consumer actor
@ -58,9 +59,36 @@ trait ConsumerConfig { this: CamelSupport ⇒
def autoAck: Boolean = camel.settings.autoAck
/**
* The route definition handler for creating a custom route to this consumer instance.
* Returns the route definition handler for creating a custom route to this consumer.
* By default it returns an identity function, override this method to
* return a custom route definition handler. The returned function is not allowed to close over 'this', meaning it is
* not allowed to refer to the actor instance itself, since that can easily cause concurrent shared state issues.
*/
//FIXME: write a test confirming onRouteDefinition gets called
def onRouteDefinition(rd: RouteDefinition): ProcessorDefinition[_] = rd
def onRouteDefinition: RouteDefinition ProcessorDefinition[_] = {
val mapper = getRouteDefinitionHandler
if (mapper != identityRouteMapper) mapper.apply _
else identityRouteMapper
}
/**
* Java API. Returns the [[akka.dispatch.Mapper]] function that will be used as a route definition handler
* for creating custom route to this consumer. By default it returns an identity function, override this method to
* return a custom route definition handler. The [[akka.dispatch.Mapper]] is not allowed to close over 'this', meaning it is
* not allowed to refer to the actor instance itself, since that can easily cause concurrent shared state issues.
*/
def getRouteDefinitionHandler: Mapper[RouteDefinition, ProcessorDefinition[_]] = identityRouteMapper
}
/**
* Internal use only.
*/
private[camel] object Consumer {
val identityRouteMapper = new Mapper[RouteDefinition, ProcessorDefinition[_]]() {
override def checkedApply(rd: RouteDefinition): ProcessorDefinition[_] = rd
}
}
/**
* For internal use only.
* Captures the configuration of the Consumer.
*/
private[camel] case class ConsumerConfig(activationTimeout: FiniteDuration, replyTimeout: FiniteDuration, autoAck: Boolean, onRouteDefinition: RouteDefinition ProcessorDefinition[_]) extends NoSerializationVerificationNeeded

View file

@ -4,10 +4,11 @@
package akka.camel
import akka.actor.Actor
import akka.actor.{ Props, NoSerializationVerificationNeeded, ActorRef, Actor }
import internal.CamelSupervisor.{ CamelProducerObjects, Register }
import internal.CamelExchangeAdapter
import akka.actor.Status.Failure
import org.apache.camel.{ Endpoint, Exchange, ExchangePattern, AsyncCallback }
import org.apache.camel.{ Endpoint, ExchangePattern, AsyncCallback }
import org.apache.camel.processor.SendProcessor
/**
@ -15,9 +16,16 @@ import org.apache.camel.processor.SendProcessor
*
* @author Martin Krasser
*/
trait ProducerSupport extends CamelSupport { this: Actor
trait ProducerSupport extends Actor with CamelSupport {
private[this] var messages = Map[ActorRef, Any]()
private[this] var producerChild: Option[ActorRef] = None
protected[this] lazy val (endpoint: Endpoint, processor: SendProcessor) = camel.registerProducer(self, endpointUri)
override def preStart() {
super.preStart()
register()
}
private[this] def register() { camel.supervisor ! Register(self, endpointUri) }
/**
* CamelMessage headers to copy by default from request message to response-message.
@ -43,40 +51,6 @@ trait ProducerSupport extends CamelSupport { this: Actor ⇒
*/
def headersToCopy: Set[String] = headersToCopyDefault
/**
* Initiates a message exchange of given <code>pattern</code> with the endpoint specified by
* <code>endpointUri</code>. The in-message of the initiated exchange is the canonical form
* of <code>msg</code>. After sending the in-message, the processing result (response) is passed
* as argument to <code>receiveAfterProduce</code>. If the response is received synchronously from
* the endpoint then <code>receiveAfterProduce</code> is called synchronously as well. If the
* response is received asynchronously, the <code>receiveAfterProduce</code> is called
* asynchronously. The original
* sender and senderFuture are preserved.
*
* @see CamelMessage#canonicalize(Any)
*
* @param msg message to produce
* @param pattern exchange pattern
*/
protected def produce(msg: Any, pattern: ExchangePattern): Unit = {
// Need copies of sender reference here since the callback could be done
// later by another thread.
val producer = self
val originalSender = sender
val cmsg = CamelMessage.canonicalize(msg)
val xchg = new CamelExchangeAdapter(endpoint.createExchange(pattern))
xchg.setRequest(cmsg)
processor.process(xchg.exchange, new AsyncCallback {
// Ignoring doneSync, sending back async uniformly.
def done(doneSync: Boolean): Unit = producer.tell(
if (xchg.exchange.isFailed) xchg.toFailureResult(cmsg.headers(headersToCopy))
else MessageResult(xchg.toResponseMessage(cmsg.headers(headersToCopy))), originalSender)
})
}
/**
* Produces <code>msg</code> to the endpoint specified by <code>endpointUri</code>. Before the message is
* actually sent it is pre-processed by calling <code>receiveBeforeProduce</code>. If <code>oneway</code>
@ -85,12 +59,28 @@ trait ProducerSupport extends CamelSupport { this: Actor ⇒
* @see Producer#produce(Any, ExchangePattern)
*/
protected def produce: Receive = {
case CamelProducerObjects(endpoint, processor)
if (producerChild.isEmpty) {
producerChild = Some(context.actorOf(Props(new ProducerChild(endpoint, processor))))
messages = {
for (
child producerChild;
(sender, msg) messages
) child.tell(msg, sender)
Map()
}
}
case res: MessageResult routeResponse(res.message)
case res: FailureResult
val e = new AkkaCamelException(res.cause, res.headers)
routeResponse(Failure(e))
throw e
case msg produce(transformOutgoingMessage(msg), if (oneway) ExchangePattern.InOnly else ExchangePattern.InOut)
case msg
producerChild match {
case Some(child) child forward msg
case None messages += (sender -> msg)
}
}
/**
@ -117,8 +107,45 @@ trait ProducerSupport extends CamelSupport { this: Actor ⇒
protected def routeResponse(msg: Any): Unit = if (!oneway) sender ! transformResponse(msg)
}
private class ProducerChild(endpoint: Endpoint, processor: SendProcessor) extends Actor {
def receive = {
case msg @ (_: FailureResult | _: MessageResult) context.parent forward msg
case msg produce(endpoint, processor, transformOutgoingMessage(msg), if (oneway) ExchangePattern.InOnly else ExchangePattern.InOut)
}
/**
* Initiates a message exchange of given <code>pattern</code> with the endpoint specified by
* <code>endpointUri</code>. The in-message of the initiated exchange is the canonical form
* of <code>msg</code>. After sending the in-message, the processing result (response) is passed
* as argument to <code>receiveAfterProduce</code>. If the response is received synchronously from
* the endpoint then <code>receiveAfterProduce</code> is called synchronously as well. If the
* response is received asynchronously, the <code>receiveAfterProduce</code> is called
* asynchronously. The original
* sender and senderFuture are preserved.
*
* @see CamelMessage#canonicalize(Any)
* @param endpoint the endpoint
* @param processor the processor
* @param msg message to produce
* @param pattern exchange pattern
*/
protected def produce(endpoint: Endpoint, processor: SendProcessor, msg: Any, pattern: ExchangePattern): Unit = {
// Need copies of sender reference here since the callback could be done
// later by another thread.
val producer = self
val originalSender = sender
val xchg = new CamelExchangeAdapter(endpoint.createExchange(pattern))
val cmsg = CamelMessage.canonicalize(msg)
xchg.setRequest(cmsg)
processor.process(xchg.exchange, new AsyncCallback {
// Ignoring doneSync, sending back async uniformly.
def done(doneSync: Boolean): Unit = producer.tell(
if (xchg.exchange.isFailed) xchg.toFailureResult(cmsg.headers(headersToCopy))
else MessageResult(xchg.toResponseMessage(cmsg.headers(headersToCopy))), originalSender)
})
}
}
}
/**
* Mixed in by Actor implementations to produce messages to Camel endpoints.
*/
@ -132,14 +159,16 @@ trait Producer extends ProducerSupport { this: Actor ⇒
}
/**
* For internal use only.
* @author Martin Krasser
*/
private case class MessageResult(message: CamelMessage)
private case class MessageResult(message: CamelMessage) extends NoSerializationVerificationNeeded
/**
* For internal use only.
* @author Martin Krasser
*/
private case class FailureResult(cause: Throwable, headers: Map[String, Any] = Map.empty)
private case class FailureResult(cause: Throwable, headers: Map[String, Any] = Map.empty) extends NoSerializationVerificationNeeded
/**
* A one-way producer.

View file

@ -6,53 +6,64 @@ package akka.camel.internal
import akka.actor.ActorRef
/**
* Super class of all activation messages. Registration of the Camel [[akka.camel.Consumer]]s and [[akka.camel.Producer]]s
* is done asynchronously. Activation messages are sent in the Camel extension when endpoints are
* activated, de-activated, failed to activate and failed to de-activate.
* You can use the [[akka.camel.Activation]] trait which is available on [[akka.camel.Camel]]
* to await activation or de-activation of endpoints.
*/
private[camel] abstract class ActivationMessage(val actor: ActorRef)
private[camel] object ActivationProtocol {
/**
* Super class of all activation messages. Registration of the Camel [[akka.camel.Consumer]]s and [[akka.camel.Producer]]s
* is done asynchronously. Activation messages are sent in the Camel extension when endpoints are
* activated, de-activated, failed to activate and failed to de-activate.
* You can use the [[akka.camel.Activation]] trait which is available on [[akka.camel.Camel]]
* to await activation or de-activation of endpoints.
*/
@SerialVersionUID(1L)
private[camel] abstract class ActivationMessage(val actor: ActorRef) extends Serializable
/**
* For internal use only. companion object of <code>ActivationMessage</code>
*
*/
private[camel] object ActivationMessage {
def unapply(msg: ActivationMessage): Option[ActorRef] = Option(msg.actor)
}
/**
* For internal use only. companion object of <code>ActivationMessage</code>
*
*/
private[camel] object ActivationMessage {
def unapply(msg: ActivationMessage): Option[ActorRef] = Option(msg.actor)
}
/**
* Event message indicating that a single endpoint has been activated.
* You can use the [[akka.camel.Activation]] trait which is available on [[akka.camel.Camel]]
* to await activation or de-activation of endpoints.
* @param actorRef the endpoint that was activated
*/
private[camel] sealed case class EndpointActivated(actorRef: ActorRef) extends ActivationMessage(actorRef)
/**
* For internal use only.
* Event message indicating that a single endpoint has been activated.
* You can use the [[akka.camel.Activation]] trait which is available on [[akka.camel.Camel]]
* to await activation or de-activation of endpoints.
* @param actorRef the endpoint that was activated
*/
@SerialVersionUID(1L)
final case class EndpointActivated(actorRef: ActorRef) extends ActivationMessage(actorRef)
/**
* Event message indicating that a single endpoint failed to activate.
* You can use the [[akka.camel.Activation]] trait which is available on [[akka.camel.Camel]]
* to await activation or de-activation of endpoints.
* @param actorRef the endpoint that failed to activate
* @param cause the cause for failure
*/
private[camel] sealed case class EndpointFailedToActivate(actorRef: ActorRef, cause: Throwable) extends ActivationMessage(actorRef)
/**
* For internal use only.
* Event message indicating that a single endpoint failed to activate.
* You can use the [[akka.camel.Activation]] trait which is available on [[akka.camel.Camel]]
* to await activation or de-activation of endpoints.
* @param actorRef the endpoint that failed to activate
* @param cause the cause for failure
*/
@SerialVersionUID(1L)
final case class EndpointFailedToActivate(actorRef: ActorRef, cause: Throwable) extends ActivationMessage(actorRef)
/**
* Event message indicating that a single endpoint was de-activated.
* You can use the [[akka.camel.Activation]] trait which is available on [[akka.camel.Camel]]
* to await activation or de-activation of endpoints.
* @param actorRef the endpoint that was de-activated
*/
private[camel] sealed case class EndpointDeActivated(actorRef: ActorRef) extends ActivationMessage(actorRef)
/**
* For internal use only.
* Event message indicating that a single endpoint was de-activated.
* You can use the [[akka.camel.Activation]] trait which is available on [[akka.camel.Camel]]
* to await activation or de-activation of endpoints.
* @param actorRef the endpoint that was de-activated
*/
@SerialVersionUID(1L)
final case class EndpointDeActivated(actorRef: ActorRef) extends ActivationMessage(actorRef)
/**
* Event message indicating that a single endpoint failed to de-activate.
* You can use the [[akka.camel.Activation]] trait which is available on [[akka.camel.Camel]]
* to await activation or de-activation of endpoints.
* @param actorRef the endpoint that failed to de-activate
* @param cause the cause for failure
*/
private[camel] sealed case class EndpointFailedToDeActivate(actorRef: ActorRef, cause: Throwable) extends ActivationMessage(actorRef)
/**
* For internal use only.
* Event message indicating that a single endpoint failed to de-activate.
* You can use the [[akka.camel.Activation]] trait which is available on [[akka.camel.Camel]]
* to await activation or de-activation of endpoints.
* @param actorRef the endpoint that failed to de-activate
* @param cause the cause for failure
*/
@SerialVersionUID(1L)
final case class EndpointFailedToDeActivate(actorRef: ActorRef, cause: Throwable) extends ActivationMessage(actorRef)
}

View file

@ -7,6 +7,7 @@ package akka.camel.internal
import akka.actor._
import collection.mutable.WeakHashMap
import akka.camel._
import internal.ActivationProtocol._
/**
* For internal use only. An actor that tracks activation and de-activation of endpoints.
@ -93,11 +94,6 @@ private[akka] final class ActivationTracker extends Actor with ActorLogging {
}
/**
* Subscribes self to messages of type <code>ActivationMessage</code>
*/
override def preStart(): Unit = context.system.eventStream.subscribe(self, classOf[ActivationMessage])
override def receive = {
case msg @ ActivationMessage(ref)
(activations.getOrElseUpdate(ref, new ActivationStateMachine).receive orElse logStateWarning(ref))(msg)

View file

@ -30,14 +30,14 @@ private[camel] class CamelExchangeAdapter(val exchange: Exchange) {
/**
* Sets Exchange.getIn from the given CamelMessage object.
*/
def setRequest(msg: CamelMessage): Unit = msg.copyContentTo(request)
def setRequest(msg: CamelMessage): Unit = CamelMessage.copyContent(msg, request)
/**
* Depending on the exchange pattern, sets Exchange.getIn or Exchange.getOut from the given
* CamelMessage object. If the exchange is out-capable then the Exchange.getOut is set, otherwise
* Exchange.getIn.
*/
def setResponse(msg: CamelMessage): Unit = msg.copyContentTo(response)
def setResponse(msg: CamelMessage): Unit = CamelMessage.copyContent(msg, response)
/**
* Sets Exchange.getException from the given FailureResult message. Headers of the FailureResult message

View file

@ -0,0 +1,197 @@
package akka.camel.internal
import akka.actor._
import akka.camel.{ CamelSupport, ConsumerConfig }
import org.apache.camel.Endpoint
import org.apache.camel.processor.SendProcessor
import scala.util.control.NonFatal
import akka.actor.Terminated
import akka.actor.SupervisorStrategy.Resume
import akka.camel.internal.CamelSupervisor._
import akka.AkkaException
import akka.camel.internal.ActivationProtocol._
/**
* For internal use only.
* Top level supervisor for internal Camel actors
*/
private[camel] class CamelSupervisor extends Actor with CamelSupport {
private val activationTracker = context.actorOf(Props[ActivationTracker], "activationTracker")
private val registry: ActorRef = context.actorOf(Props(new Registry(activationTracker)), "registry")
override val supervisorStrategy = OneForOneStrategy() {
case NonFatal(e)
Resume
}
def receive = {
case AddWatch(actorRef) context.watch(actorRef)
case Terminated(actorRef) registry ! DeRegister(actorRef)
case msg: ActivationMessage activationTracker forward msg
case msg registry forward (msg)
}
}
/**
* For internal use only.
* Messages for the camel supervisor, registrations and de-registrations.
*/
private[camel] object CamelSupervisor {
@SerialVersionUID(1L)
sealed trait CamelSupervisorMessage extends Serializable
/**
* For internal use only.
* Registers a consumer or a producer.
*/
case class Register(actorRef: ActorRef, endpointUri: String, config: Option[ConsumerConfig] = None) extends NoSerializationVerificationNeeded
/**
* For internal use only.
* De-registers a producer or a consumer.
*/
@SerialVersionUID(1L)
case class DeRegister(actorRef: ActorRef) extends CamelSupervisorMessage
/**
* For internal use only.
* Adds a watch for the actor
*/
@SerialVersionUID(1L)
case class AddWatch(actorRef: ActorRef) extends CamelSupervisorMessage
/**
* For internal use only.
* Provides a Producer with the required camel objects to function.
*/
case class CamelProducerObjects(endpoint: Endpoint, processor: SendProcessor) extends NoSerializationVerificationNeeded
}
/**
* For internal use only.
* Thrown by registrars to indicate that the actor could not be de-activated.
*/
private[camel] class ActorDeActivationException(val actorRef: ActorRef, cause: Throwable) extends AkkaException("Actor [%s] failed to de-activate".format(actorRef), cause)
/**
* For internal use only.
* Thrown by the registrars to indicate that the actor could not be activated.
*/
private[camel] class ActorActivationException(val actorRef: ActorRef, cause: Throwable) extends AkkaException("Actor [%s] failed to activate".format(actorRef), cause)
/**
* For internal use only.
* Registry for Camel Consumers and Producers. Supervises the registrars.
*/
private[camel] class Registry(activationTracker: ActorRef) extends Actor with CamelSupport {
import context.{ stop, parent }
private val producerRegistrar = context.actorOf(Props(new ProducerRegistrar(activationTracker)), "producerRegistrar")
private val consumerRegistrar = context.actorOf(Props(new ConsumerRegistrar(activationTracker)), "consumerRegistrar")
private var producers = Set[ActorRef]()
private var consumers = Set[ActorRef]()
override val supervisorStrategy = OneForOneStrategy() {
case e: ActorActivationException
activationTracker ! EndpointFailedToActivate(e.actorRef, e.getCause)
stop(e.actorRef)
Resume
case e: ActorDeActivationException
activationTracker ! EndpointFailedToDeActivate(e.actorRef, e.getCause)
stop(e.actorRef)
Resume
case NonFatal(e)
Resume
}
def receive = {
case msg @ Register(consumer, _, Some(_))
if (!consumers(consumer)) {
consumers += consumer
consumerRegistrar forward msg
parent ! AddWatch(consumer)
}
case msg @ Register(producer, _, None)
if (!producers(producer)) {
producers += producer
producerRegistrar forward msg
parent ! AddWatch(producer)
}
case DeRegister(actorRef)
producers.find(_ == actorRef).foreach { p
deRegisterProducer(p)
producers -= p
}
consumers.find(_ == actorRef).foreach { c
deRegisterConsumer(c)
consumers -= c
}
}
private def deRegisterConsumer(actorRef: ActorRef) { consumerRegistrar ! DeRegister(actorRef) }
private def deRegisterProducer(actorRef: ActorRef) { producerRegistrar ! DeRegister(actorRef) }
}
/**
* For internal use only.
* Registers Producers.
*/
private[camel] class ProducerRegistrar(activationTracker: ActorRef) extends Actor with CamelSupport {
private var camelObjects = Map[ActorRef, (Endpoint, SendProcessor)]()
def receive = {
case Register(producer, endpointUri, _)
if (!camelObjects.contains(producer)) {
try {
val endpoint = camelContext.getEndpoint(endpointUri)
val processor = new SendProcessor(endpoint)
camelObjects += producer -> (endpoint, processor)
// if this throws, the supervisor stops the producer and de-registers it on termination
processor.start()
producer ! CamelProducerObjects(endpoint, processor)
activationTracker ! EndpointActivated(producer)
} catch {
case NonFatal(e) throw new ActorActivationException(producer, e)
}
}
case DeRegister(producer)
camelObjects.get(producer).foreach {
case (_, processor)
try {
camelObjects.get(producer).foreach(_._2.stop())
camelObjects -= producer
activationTracker ! EndpointDeActivated(producer)
} catch {
case NonFatal(e) throw new ActorDeActivationException(producer, e)
}
}
}
}
/**
* For internal use only.
* Registers Consumers.
*/
private[camel] class ConsumerRegistrar(activationTracker: ActorRef) extends Actor with CamelSupport {
def receive = {
case Register(consumer, endpointUri, Some(consumerConfig))
try {
// if this throws, the supervisor stops the consumer and de-registers it on termination
camelContext.addRoutes(new ConsumerActorRouteBuilder(endpointUri, consumer, consumerConfig))
activationTracker ! EndpointActivated(consumer)
} catch {
case NonFatal(e) throw new ActorActivationException(consumer, e)
}
case DeRegister(consumer)
try {
val route = consumer.path.toString
camelContext.stopRoute(route)
camelContext.removeRoute(route)
activationTracker ! EndpointDeActivated(consumer)
} catch {
case NonFatal(e) throw new ActorDeActivationException(consumer, e)
}
}
}

View file

@ -0,0 +1,39 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.camel.internal
import akka.camel._
import component.CamelPath
import java.io.InputStream
import org.apache.camel.builder.RouteBuilder
import akka.actor._
import org.apache.camel.model.RouteDefinition
import akka.serialization.Serializer
/**
* For internal use only.
* Builder of a route to a target which can be an actor.
*
* @param endpointUri endpoint URI of the consumer actor.
*
* @author Martin Krasser
*/
private[camel] class ConsumerActorRouteBuilder(endpointUri: String, consumer: ActorRef, config: ConsumerConfig) extends RouteBuilder {
protected def targetActorUri = CamelPath.toUri(consumer, config.autoAck, config.replyTimeout)
def configure() {
val scheme = endpointUri take endpointUri.indexOf(":") // e.g. "http" from "http://whatever/..."
val route = from(endpointUri).routeId(consumer.path.toString)
val converted = Conversions(scheme, route)
val userCustomized = applyUserRouteCustomization(converted)
userCustomized.to(targetActorUri)
}
def applyUserRouteCustomization(rd: RouteDefinition) = config.onRouteDefinition(rd)
}

View file

@ -1,151 +0,0 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.camel.internal
import akka.camel._
import component.CamelPath
import java.io.InputStream
import org.apache.camel.builder.RouteBuilder
import akka.actor._
import collection.mutable
import org.apache.camel.model.RouteDefinition
import org.apache.camel.CamelContext
import scala.concurrent.util.Duration
import concurrent.Await
import akka.util.Timeout
import scala.concurrent.util.FiniteDuration
/**
* For internal use only.
* Manages consumer registration. Consumers call registerConsumer method to register themselves when they get created.
* ActorEndpoint uses it to lookup an actor by its path.
*/
private[camel] trait ConsumerRegistry { this: Activation
def system: ActorSystem
def context: CamelContext
/**
* For internal use only.
*/
private[this] lazy val idempotentRegistry = system.actorOf(Props(new IdempotentCamelConsumerRegistry(context)))
/**
* For internal use only. BLOCKING
* @param endpointUri the URI to register the consumer on
* @param consumer the consumer
* @param activationTimeout the timeout for activation
* @return the actorRef to the consumer
*/
private[camel] def registerConsumer(endpointUri: String, consumer: Consumer, activationTimeout: FiniteDuration) = {
idempotentRegistry ! RegisterConsumer(endpointUri, consumer.self, consumer)
Await.result(activationFutureFor(consumer.self)(activationTimeout, consumer.context.dispatcher), activationTimeout)
}
}
/**
* For internal use only.
* Guarantees idempotent registration of camel consumer endpoints.
*
* Once registered the consumer is watched and unregistered upon termination.
* It also publishes events to the eventStream so interested parties could subscribe to them.
* The main consumer of these events is currently the ActivationTracker.
*/
private[camel] class IdempotentCamelConsumerRegistry(camelContext: CamelContext) extends Actor {
case class UnregisterConsumer(actorRef: ActorRef)
val activated = new mutable.HashSet[ActorRef]
val registrator = context.actorOf(Props(new CamelConsumerRegistrator))
def receive = {
case msg @ RegisterConsumer(_, consumer, _)
if (!isAlreadyActivated(consumer)) {
activated.add(consumer)
registrator ! msg
}
case msg @ EndpointActivated(consumer)
context.watch(consumer)
context.system.eventStream.publish(msg)
case msg @ EndpointFailedToActivate(consumer, _)
activated.remove(consumer)
context.system.eventStream.publish(msg)
case Terminated(ref)
activated.remove(ref)
registrator ! UnregisterConsumer(ref)
case msg @ EndpointFailedToDeActivate(ref, cause) context.system.eventStream.publish(msg)
case msg: EndpointDeActivated context.system.eventStream.publish(msg)
}
def isAlreadyActivated(ref: ActorRef): Boolean = activated.contains(ref)
//FIXME Break out
class CamelConsumerRegistrator extends Actor with ActorLogging {
def receive = {
case RegisterConsumer(endpointUri, consumer, consumerConfig)
camelContext.addRoutes(new ConsumerActorRouteBuilder(endpointUri, consumer, consumerConfig))
context.sender ! EndpointActivated(consumer)
log.debug("Published actor [{}] at endpoint [{}]", consumerConfig, endpointUri)
case UnregisterConsumer(consumer)
camelContext.stopRoute(consumer.path.toString)
camelContext.removeRoute(consumer.path.toString)
context.sender ! EndpointDeActivated(consumer)
log.debug("Unpublished actor [{}] from endpoint [{}]", consumer, consumer.path)
}
override def preRestart(reason: Throwable, message: Option[Any]) {
//FIXME check logic
super.preStart()
message match {
case Some(RegisterConsumer(_, consumer, _)) sender ! EndpointFailedToActivate(consumer, reason)
case Some(UnregisterConsumer(consumer)) sender ! EndpointFailedToDeActivate(consumer, reason)
case _
}
}
}
}
/**
* For internal use only. A message to register a consumer.
* @param endpointUri the endpointUri to register to
* @param actorRef the actorRef to register as a consumer
* @param config the configuration for the consumer
*/
private[camel] case class RegisterConsumer(endpointUri: String, actorRef: ActorRef, config: ConsumerConfig)
/**
* For internal use only.
* Builder of a route to a target which can be an actor.
*
* @param endpointUri endpoint URI of the consumer actor.
*
* @author Martin Krasser
*/
private[camel] class ConsumerActorRouteBuilder(endpointUri: String, consumer: ActorRef, config: ConsumerConfig) extends RouteBuilder {
protected def targetActorUri = CamelPath.toUri(consumer, config.autoAck, config.replyTimeout)
def configure() {
val scheme = endpointUri take endpointUri.indexOf(":") // e.g. "http" from "http://whatever/..."
val route = from(endpointUri).routeId(consumer.path.toString)
val converted = Conversions(scheme, route)
val userCustomized = applyUserRouteCustomization(converted)
userCustomized.to(targetActorUri)
}
def applyUserRouteCustomization(rd: RouteDefinition) = config.onRouteDefinition(rd)
object Conversions {
private val bodyConversions = Map(
"file" -> classOf[InputStream])
def apply(scheme: String, routeDefinition: RouteDefinition): RouteDefinition = bodyConversions.get(scheme) match {
case Some(clazz) routeDefinition.convertBodyTo(clazz)
case None routeDefinition
}
}
}

View file

@ -1,15 +1,21 @@
package akka.camel.internal
import akka.actor.ActorSystem
import akka.actor.{ ActorRef, Props, ActorSystem }
import akka.camel.internal.component.{ DurationTypeConverter, ActorComponent }
import org.apache.camel.impl.DefaultCamelContext
import scala.Predef._
import akka.event.Logging
import akka.camel.{ CamelSettings, Camel }
import akka.camel.internal.ActivationProtocol._
import scala.util.control.NonFatal
import scala.concurrent.util.Duration
import org.apache.camel.{ ProducerTemplate, CamelContext }
import scala.concurrent.util.FiniteDuration
import org.apache.camel.ProducerTemplate
import concurrent.{ Future, ExecutionContext }
import akka.util.Timeout
import akka.pattern.ask
import java.io.InputStream
import org.apache.camel.model.RouteDefinition
/**
* For internal use only.
@ -21,16 +27,17 @@ import scala.concurrent.util.FiniteDuration
* Also by not creating extra internal actor system we are conserving resources.
*/
private[camel] class DefaultCamel(val system: ActorSystem) extends Camel {
val supervisor = system.actorOf(Props[CamelSupervisor], "camel-supervisor")
/**
* For internal use only.
*/
private[camel] implicit val log = Logging(system, "Camel")
lazy val context: CamelContext = {
lazy val context: DefaultCamelContext = {
val ctx = new DefaultCamelContext
if (!settings.jmxStatistics) ctx.disableJMX()
ctx.setName(system.name)
ctx.setStreamCaching(true)
ctx.setStreamCaching(settings.streamingCache)
ctx.addComponent("akka", new ActorComponent(this, system))
ctx.getTypeConverterRegistry.addTypeConverter(classOf[FiniteDuration], classOf[String], DurationTypeConverter)
ctx
@ -65,4 +72,46 @@ private[camel] class DefaultCamel(val system: ActorSystem) extends Camel {
}
log.debug("Stopped CamelContext[{}] for ActorSystem[{}]", context.getName, system.name)
}
/**
* Produces a Future with the specified endpoint that will be completed when the endpoint has been activated,
* or if it times out, which will happen after the specified Timeout.
*
* @param endpoint the endpoint to be activated
* @param timeout the timeout for the Future
*/
def activationFutureFor(endpoint: ActorRef)(implicit timeout: Timeout, executor: ExecutionContext): Future[ActorRef] =
(supervisor.ask(AwaitActivation(endpoint))(timeout)).map[ActorRef]({
case EndpointActivated(`endpoint`) endpoint
case EndpointFailedToActivate(`endpoint`, cause) throw cause
})
/**
* Produces a Future which will be completed when the given endpoint has been deactivated or
* or if it times out, which will happen after the specified Timeout.
*
* @param endpoint the endpoint to be deactivated
* @param timeout the timeout of the Future
*/
def deactivationFutureFor(endpoint: ActorRef)(implicit timeout: Timeout, executor: ExecutionContext): Future[ActorRef] =
(supervisor.ask(AwaitDeActivation(endpoint))(timeout)).map[ActorRef]({
case EndpointDeActivated(`endpoint`) endpoint
case EndpointFailedToDeActivate(`endpoint`, cause) throw cause
})
}
/**
* For internal use only.
*/
private[camel] object Conversions {
//FIXME Add this to the configuration, and move this functionality to the Camel Extension.
private val bodyConversions = Map(
"file" -> classOf[InputStream])
def apply(scheme: String, routeDefinition: RouteDefinition): RouteDefinition = bodyConversions.get(scheme) match {
case Some(clazz) routeDefinition.convertBodyTo(clazz)
case None routeDefinition
}
}

View file

@ -1,86 +0,0 @@
package akka.camel.internal
import java.util.concurrent.ConcurrentHashMap
import org.apache.camel.processor.SendProcessor
import akka.actor.{ Props, ActorRef, Terminated, Actor }
import org.apache.camel.Endpoint
import akka.camel._
import scala.util.control.NonFatal
/**
* Watches the end of life of <code>Producer</code>s.
* Removes a <code>Producer</code> from the <code>ProducerRegistry</code> when it is <code>Terminated</code>,
* which in turn stops the <code>SendProcessor</code>.
*
* INTERNAL API
*/
private class ProducerWatcher(registry: ProducerRegistry) extends Actor {
override def receive = {
case RegisterProducer(actorRef) context.watch(actorRef)
case Terminated(actorRef) registry.unregisterProducer(actorRef)
}
}
/**
* INTERNAL API
*/
private case class RegisterProducer(actorRef: ActorRef)
/**
* For internal use only.
* Manages the Camel objects for <code>Producer</code>s.
* Every <code>Producer</code> needs an <code>Endpoint</code> and a <code>SendProcessor</code>
* to produce messages over an <code>Exchange</code>.
*/
private[camel] trait ProducerRegistry { this: Camel
private val camelObjects = new ConcurrentHashMap[ActorRef, (Endpoint, SendProcessor)]()
private val watcher = system.actorOf(Props(new ProducerWatcher(this))) //FIXME should this really be top level?
private def registerWatch(actorRef: ActorRef): Unit = watcher ! RegisterProducer(actorRef)
/**
* For internal use only.
* Unregisters <code>Endpoint</code> and <code>SendProcessor</code> and stops the SendProcessor
*/
private[camel] def unregisterProducer(actorRef: ActorRef): Unit = {
// Terminated cannot be sent before the actor is created in the processing of system messages.
Option(camelObjects.remove(actorRef)).foreach {
case (_, processor)
try {
processor.stop()
system.eventStream.publish(EndpointDeActivated(actorRef))
} catch {
case NonFatal(e) system.eventStream.publish(EndpointFailedToDeActivate(actorRef, e))
}
}
}
/**
* For internal use only.
* Creates <code>Endpoint</code> and <code>SendProcessor</code> and associates the actorRef to these.
* @param actorRef the actorRef of the <code>Producer</code> actor.
* @param endpointUri the endpoint Uri of the producer
* @return <code>Endpoint</code> and <code>SendProcessor</code> registered for the actorRef
*/
private[camel] def registerProducer(actorRef: ActorRef, endpointUri: String): (Endpoint, SendProcessor) = {
try {
val endpoint = context.getEndpoint(endpointUri)
val processor = new SendProcessor(endpoint)
camelObjects.putIfAbsent(actorRef, (endpoint, processor)) match {
case null
processor.start()
registerWatch(actorRef)
system.eventStream.publish(EndpointActivated(actorRef))
(endpoint, processor)
case prev prev
}
} catch {
case NonFatal(e) {
system.eventStream.publish(EndpointFailedToActivate(actorRef, e))
// can't return null to the producer actor, so blow up actor in initialization.
throw e //FIXME I'm not a huge fan of log-rethrow, either log or rethrow
}
}
}
}

View file

@ -15,7 +15,7 @@ import scala.concurrent.util.duration._
import scala.concurrent.util.Duration
import scala.concurrent.{ ExecutionContext, Future }
import scala.util.control.NonFatal
import java.util.concurrent.{ TimeoutException, CountDownLatch }
import java.util.concurrent.{ TimeUnit, TimeoutException, CountDownLatch }
import akka.util.Timeout
import akka.camel.internal.CamelExchangeAdapter
import akka.camel.{ ActorNotRegisteredException, Camel, Ack, FailureResult, CamelMessage }
@ -111,7 +111,7 @@ private[camel] trait ActorEndpointConfig {
private[camel] class ActorProducer(val endpoint: ActorEndpoint, camel: Camel) extends DefaultProducer(endpoint) with AsyncProcessor {
/**
* Processes the exchange.
* Calls the asynchronous version of the method and waits for the result (blocking).
* Calls the synchronous version of the method and waits for the result (blocking).
* @param exchange the exchange to process
*/
def process(exchange: Exchange): Unit = processExchangeAdapter(new CamelExchangeAdapter(exchange))
@ -131,13 +131,11 @@ private[camel] class ActorProducer(val endpoint: ActorEndpoint, camel: Camel) ex
/**
* For internal use only. Processes the [[akka.camel.internal.CamelExchangeAdapter]]
* @param exchange the [[akka.camel.internal.CamelExchangeAdapter]]
*
* WARNING UNBOUNDED BLOCKING AWAITS
*/
private[camel] def processExchangeAdapter(exchange: CamelExchangeAdapter): Unit = {
val isDone = new CountDownLatch(1)
processExchangeAdapter(exchange, new AsyncCallback { def done(doneSync: Boolean) { isDone.countDown() } })
isDone.await() // this should never wait forever as the process(exchange, callback) method guarantees that.
isDone.await(camel.settings.replyTimeout.toMillis, TimeUnit.MILLISECONDS)
}
/**

View file

@ -6,7 +6,8 @@ package akka.camel.javaapi
import akka.actor.UntypedActor
import akka.camel._
import org.apache.camel.{ ProducerTemplate, CamelContext }
import org.apache.camel.ProducerTemplate
import org.apache.camel.impl.DefaultCamelContext
/**
* Subclass this abstract class to create an MDB-style untyped consumer actor. This
@ -21,10 +22,10 @@ abstract class UntypedConsumerActor extends UntypedActor with Consumer {
def getEndpointUri(): String
/**
* ''Java API'': Returns the [[org.apache.camel.CamelContext]]
* ''Java API'': Returns the [[org.apache.camel.impl.DefaultCamelContext]]
* @return the CamelContext
*/
protected def getCamelContext(): CamelContext = camelContext
protected def getCamelContext(): DefaultCamelContext = camelContext
/**
* ''Java API'': Returns the [[org.apache.camel.ProducerTemplate]]

View file

@ -7,6 +7,7 @@ package akka.camel.javaapi
import akka.actor.UntypedActor
import akka.camel._
import org.apache.camel.{ CamelContext, ProducerTemplate }
import org.apache.camel.impl.DefaultCamelContext
/**
* Subclass this abstract class to create an untyped producer actor. This class is meant to be used from Java.
@ -64,7 +65,7 @@ abstract class UntypedProducerActor extends UntypedActor with ProducerSupport {
/**
* Returns the <code>CamelContext</code>.
*/
def getCamelContext(): CamelContext = camel.context
def getCamelContext(): DefaultCamelContext = camel.context
/**
* Returns the <code>ProducerTemplate</code>.

View file

@ -17,4 +17,4 @@ package object camel {
* }}}
*/
implicit def toActorRouteDefinition[T <: ProcessorDefinition[T]](definition: ProcessorDefinition[T]) = new ActorRouteDefinition(definition)
}
}

View file

@ -4,23 +4,20 @@
package akka.camel;
import static org.junit.Assert.assertEquals;
import java.util.concurrent.TimeUnit;
import org.junit.AfterClass;
import org.junit.Test;
import scala.concurrent.Await;
import scala.concurrent.ExecutionContext;
import scala.concurrent.util.Duration;
import scala.concurrent.util.FiniteDuration;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.testkit.AkkaSpec;
import akka.testkit.JavaTestKit;
import akka.util.Timeout;
import scala.concurrent.Await;
import scala.concurrent.ExecutionContext;
import scala.concurrent.util.Duration;
import org.junit.AfterClass;
import org.junit.Test;
import java.util.concurrent.TimeUnit;
import akka.testkit.AkkaSpec;
import scala.concurrent.util.FiniteDuration;
import static org.junit.Assert.*;
/**
* @author Martin Krasser
*/
@ -33,30 +30,27 @@ public class ConsumerJavaTestBase {
system.shutdown();
}
@Test
public void shouldHandleExceptionThrownByActorAndGenerateCustomResponse()
throws Exception {
new JavaTestKit(system) {
{
String result = new EventFilter<String>(Exception.class) {
protected String run() {
FiniteDuration timeout = Duration.create(1, TimeUnit.SECONDS);
Camel camel = CamelExtension.get(system);
ExecutionContext executionContext = system.dispatcher();
try {
@SuppressWarnings("unused")
ActorRef ref = Await.result(camel.activationFutureFor(
system.actorOf(new Props(SampleErrorHandlingConsumer.class)),
timeout, executionContext), timeout);
return camel.template().requestBody(
"direct:error-handler-test-java", "hello", String.class);
} catch (Exception e) {
return e.getMessage();
}
}
}.occurrences(1).exec();
assertEquals("error: hello", result);
}
};
}
@Test
public void shouldHandleExceptionThrownByActorAndGenerateCustomResponse() throws Exception {
new JavaTestKit(system) {{
String result = new EventFilter<String>(Exception.class) {
protected String run() {
FiniteDuration duration = Duration.create(1, TimeUnit.SECONDS);
Timeout timeout = new Timeout(duration);
Camel camel = CamelExtension.get(system);
ExecutionContext executionContext = system.dispatcher();
try {
Await.result(
camel.activationFutureFor(system.actorOf(new Props(SampleErrorHandlingConsumer.class), "sample-error-handling-consumer"), timeout, executionContext),
duration);
return camel.template().requestBody("direct:error-handler-test-java", "hello", String.class);
}
catch (Exception e) {
return e.getMessage();
}
}
}.occurrences(1).exec();
assertEquals("error: hello", result);
}};
}
}

View file

@ -4,10 +4,10 @@ import akka.actor.*;
import akka.camel.internal.component.CamelPath;
import akka.camel.javaapi.UntypedConsumerActor;
import akka.camel.javaapi.UntypedProducerActor;
import akka.util.Timeout;
import scala.concurrent.Await;
import scala.concurrent.ExecutionContext;
import scala.concurrent.util.Duration;
import scala.concurrent.util.FiniteDuration;
import org.apache.camel.CamelExecutionException;
import org.apache.camel.Exchange;
import org.apache.camel.Predicate;
@ -16,6 +16,8 @@ import org.apache.camel.component.mock.MockEndpoint;
import org.junit.Before;
import org.junit.After;
import org.junit.Test;
import scala.concurrent.util.FiniteDuration;
import java.util.concurrent.TimeUnit;
public class CustomRouteTestBase {
@ -60,11 +62,12 @@ public class CustomRouteTestBase {
@Test
public void testCustomConsumerRoute() throws Exception {
MockEndpoint mockEndpoint = camel.context().getEndpoint("mock:mockConsumer", MockEndpoint.class);
FiniteDuration timeout = Duration.create(10, TimeUnit.SECONDS);
FiniteDuration duration = Duration.create(10, TimeUnit.SECONDS);
Timeout timeout = new Timeout(duration);
ExecutionContext executionContext = system.dispatcher();
ActorRef consumer = Await.result(
camel.activationFutureFor(system.actorOf(new Props(TestConsumer.class), "testConsumer"), timeout, executionContext),
timeout);
duration);
camel.context().addRoutes(new CustomRouteBuilder("direct:testRouteConsumer",consumer));
camel.template().sendBody("direct:testRouteConsumer", "test");
assertMockEndpoint(mockEndpoint);
@ -74,15 +77,16 @@ public class CustomRouteTestBase {
@Test
public void testCustomAckConsumerRoute() throws Exception {
MockEndpoint mockEndpoint = camel.context().getEndpoint("mock:mockAck", MockEndpoint.class);
FiniteDuration timeout = Duration.create(10, TimeUnit.SECONDS);
FiniteDuration duration = Duration.create(10, TimeUnit.SECONDS);
Timeout timeout = new Timeout(duration);
ExecutionContext executionContext = system.dispatcher();
ActorRef consumer = Await.result(
camel.activationFutureFor(
system.actorOf(
new Props( new UntypedActorFactory(){ public Actor create() { return new TestAckConsumer("direct:testConsumerAck","mock:mockAck"); } }), "testConsumerAck"),
timeout, executionContext),
timeout);
camel.context().addRoutes(new CustomRouteBuilder("direct:testAck", consumer, false, timeout));
duration);
camel.context().addRoutes(new CustomRouteBuilder("direct:testAck", consumer, false, duration));
camel.template().sendBody("direct:testAck", "test");
assertMockEndpoint(mockEndpoint);
system.stop(consumer);
@ -92,11 +96,12 @@ public class CustomRouteTestBase {
public void testCustomAckConsumerRouteFromUri() throws Exception {
MockEndpoint mockEndpoint = camel.context().getEndpoint("mock:mockAckUri", MockEndpoint.class);
ExecutionContext executionContext = system.dispatcher();
FiniteDuration timeout = Duration.create(10, TimeUnit.SECONDS);
FiniteDuration duration = Duration.create(10, TimeUnit.SECONDS);
Timeout timeout = new Timeout(duration);
ActorRef consumer = Await.result(
camel.activationFutureFor(system.actorOf(new Props( new UntypedActorFactory(){ public Actor create() { return new TestAckConsumer("direct:testConsumerAckFromUri","mock:mockAckUri"); } }), "testConsumerAckUri"),
camel.activationFutureFor(system.actorOf(new Props( new UntypedActorFactory(){ public Actor create() { return new TestAckConsumer("direct:testConsumerAckFromUri","mock:mockAckUri"); } }), "testConsumerAckUri"),
timeout, executionContext),
timeout);
duration);
camel.context().addRoutes(new CustomRouteBuilder("direct:testAckFromUri","akka://test/user/testConsumerAckUri?autoAck=false"));
camel.template().sendBody("direct:testAckFromUri", "test");
assertMockEndpoint(mockEndpoint);
@ -105,12 +110,13 @@ public class CustomRouteTestBase {
@Test(expected=CamelExecutionException.class)
public void testCustomTimeoutConsumerRoute() throws Exception {
FiniteDuration timeout = Duration.create(10, TimeUnit.SECONDS);
FiniteDuration duration = Duration.create(10, TimeUnit.SECONDS);
Timeout timeout = new Timeout(duration);
ExecutionContext executionContext = system.dispatcher();
ActorRef consumer = Await.result(
camel.activationFutureFor(system.actorOf(new Props( new UntypedActorFactory(){ public Actor create() { return new TestAckConsumer("direct:testConsumerException","mock:mockException"); } }), "testConsumerException"),
timeout, executionContext),
timeout);
duration);
camel.context().addRoutes(new CustomRouteBuilder("direct:testException", consumer, false, Duration.create(0, TimeUnit.SECONDS)));
camel.template().sendBody("direct:testException", "test");
}

View file

@ -5,6 +5,7 @@
package akka.camel;
import akka.actor.ActorSystem;
import akka.dispatch.Mapper;
import akka.japi.Function;
import org.apache.camel.NoTypeConversionAvailableException;
import org.junit.AfterClass;
@ -48,12 +49,6 @@ public class MessageJavaTestBase {
message(1.4).getBodyAs(InputStream.class,camel.context());
}
@Test public void shouldReturnDoubleHeader() {
CamelMessage message = message("test" , createMap("test", 1.4));
assertEquals(1.4, message.getHeader("test"));
}
@Test public void shouldConvertDoubleHeaderToString() {
CamelMessage message = message("test" , createMap("test", 1.4));
assertEquals("1.4", message.getHeaderAs("test", String.class,camel.context()));
@ -105,24 +100,6 @@ public class MessageJavaTestBase {
message("test1" , createMap("A", "1")).withHeaders(createMap("C", "3")));
}
@Test public void shouldAddHeaderAndPreserveBodyAndHeaders() {
assertEquals(
message("test1" , createMap("A", "1", "B", "2")),
message("test1" , createMap("A", "1")).addHeader("B", "2"));
}
@Test public void shouldAddHeadersAndPreserveBodyAndHeaders() {
assertEquals(
message("test1" , createMap("A", "1", "B", "2")),
message("test1" , createMap("A", "1")).addHeaders(createMap("B", "2")));
}
@Test public void shouldRemoveHeadersAndPreserveBodyAndRemainingHeaders() {
assertEquals(
message("test1" , createMap("A", "1")),
message("test1" , createMap("A", "1", "B", "2")).withoutHeader("B"));
}
private static Set<String> createSet(String... entries) {
HashSet<String> set = new HashSet<String>();
set.addAll(Arrays.asList(entries));
@ -137,7 +114,8 @@ public class MessageJavaTestBase {
return map;
}
private static class TestTransformer implements Function<String, String> {
private static class TestTransformer extends Mapper<String, String> {
@Override
public String apply(String param) {
return param + "b";
}

View file

@ -6,34 +6,38 @@ package akka.camel;
import akka.actor.Status;
import akka.camel.javaapi.UntypedConsumerActor;
import akka.dispatch.Mapper;
import scala.concurrent.util.Duration;
import org.apache.camel.builder.Builder;
import org.apache.camel.model.ProcessorDefinition;
import org.apache.camel.model.RouteDefinition;
import scala.Option;
import scala.concurrent.util.FiniteDuration;
/**
* @author Martin Krasser
*/
public class SampleErrorHandlingConsumer extends UntypedConsumerActor {
private static Mapper<RouteDefinition, ProcessorDefinition<?>> mapper = new Mapper<RouteDefinition, ProcessorDefinition<?>>() {
public ProcessorDefinition<?> apply(RouteDefinition rd) {
return rd.onException(Exception.class).handled(true).transform(Builder.exceptionMessage()).end();
}
};
public String getEndpointUri() {
return "direct:error-handler-test-java";
}
@Override
//TODO write test confirming this gets called in java
public ProcessorDefinition onRouteDefinition(RouteDefinition rd) {
return rd.onException(Exception.class).handled(true).transform(Builder.exceptionMessage()).end();
public Mapper<RouteDefinition, ProcessorDefinition<?>> onRouteDefinition() {
return mapper;
}
@Override
public Duration replyTimeout(){
public FiniteDuration replyTimeout(){
return Duration.create(1, "second");
}
public void onReceive(Object message) throws Exception {
CamelMessage msg = (CamelMessage) message;
String body = msg.getBodyAs(String.class,this.getCamelContext());

View file

@ -1,16 +0,0 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.camel;
import akka.actor.UntypedActor;
/**
* @author Martin Krasser
*/
public class SampleUntypedActor extends UntypedActor {
public void onReceive(Object message) {
System.out.println("Yay! I haz a message!");
}
}

View file

@ -0,0 +1,23 @@
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%date{ISO8601} %-5level %logger %X{akkaSource} %X{sourceThread} - %msg%n</pattern>
</encoder>
</appender>
<appender name="FILE" class="ch.qos.logback.core.FileAppender">
<file>target/akka-camel.log</file>
<append>true</append>
<encoder>
<pattern>%date{ISO8601} %-5level %logger %X{akkaSource} %X{sourceThread} - %msg%n</pattern>
</encoder>
</appender>
<root level="fatal">
<appender-ref ref="FILE" />
</root>
</configuration>

View file

@ -10,21 +10,22 @@ import org.scalatest.matchers.MustMatchers
import scala.concurrent.util.duration._
import org.apache.camel.ProducerTemplate
import akka.actor._
import akka.util.Timeout
import TestSupport._
import org.scalatest.WordSpec
import akka.testkit.TestLatch
import scala.concurrent.Await
import concurrent.Await
import java.util.concurrent.TimeoutException
import akka.util.Timeout
class ActivationIntegrationTest extends WordSpec with MustMatchers with SharedCamelSystem {
implicit val timeout = 10 seconds
val timeoutDuration = 10 seconds
implicit val timeout = Timeout(timeoutDuration)
def template: ProducerTemplate = camel.template
import system.dispatcher
"ActivationAware must be notified when endpoint is activated" in {
val latch = new TestLatch(0)
val actor = system.actorOf(Props(new TestConsumer("direct:actor-1", latch)))
val actor = system.actorOf(Props(new TestConsumer("direct:actor-1", latch)), "act-direct-actor-1")
Await.result(camel.activationFutureFor(actor), 10 seconds) must be === actor
template.requestBody("direct:actor-1", "test") must be("received test")
@ -40,32 +41,39 @@ class ActivationIntegrationTest extends WordSpec with MustMatchers with SharedCa
super.postStop()
latch.countDown()
}
})
Await.result(camel.activationFutureFor(actor), timeout)
}, name = "direct-a3")
Await.result(camel.activationFutureFor(actor), timeoutDuration)
system.stop(actor)
Await.result(camel.deactivationFutureFor(actor), timeout)
Await.ready(latch, 10 second)
Await.result(camel.deactivationFutureFor(actor), timeoutDuration)
Await.ready(latch, timeoutDuration)
}
"ActivationAware must time out when waiting for endpoint de-activation for too long" in {
val latch = new TestLatch(0)
val actor = start(new TestConsumer("direct:a5", latch))
Await.result(camel.activationFutureFor(actor), timeout)
val actor = start(new TestConsumer("direct:a5", latch), name = "direct-a5")
Await.result(camel.activationFutureFor(actor), timeoutDuration)
intercept[TimeoutException] { Await.result(camel.deactivationFutureFor(actor), 1 millis) }
}
"activationFutureFor must fail if notification timeout is too short and activation is not complete yet" in {
val latch = new TestLatch(1)
try {
val actor = system.actorOf(Props(new TestConsumer("direct:actor-4", latch)))
intercept[TimeoutException] { Await.result(camel.activationFutureFor(actor), 1 millis) }
} finally latch.countDown()
val actor = system.actorOf(Props(new TestConsumer("direct:actor-4", latch)), "direct-actor-4")
intercept[TimeoutException] { Await.result(camel.activationFutureFor(actor), 1 millis) }
latch.countDown()
// after the latch is removed, complete the wait for completion so this test does not later on
// print errors because of the registerConsumer timing out.
Await.result(camel.activationFutureFor(actor), timeoutDuration)
}
class TestConsumer(uri: String, latch: TestLatch) extends Consumer {
def endpointUri = uri
Await.ready(latch, 60 seconds)
override def preStart() {
Await.ready(latch, 60 seconds)
super.preStart()
}
override def receive = {
case msg: CamelMessage sender ! "received " + msg.body
}

View file

@ -16,24 +16,11 @@ class CamelMessageTest extends MustMatchers with WordSpec with SharedCamelSystem
"overwrite body and add header" in {
val msg = sampleMessage
CamelMessage("blah", Map("key" -> "baz")).copyContentTo(msg)
CamelMessage.copyContent(CamelMessage("blah", Map("key" -> "baz")), msg)
assert(msg.getBody === "blah")
assert(msg.getHeader("foo") === "bar")
assert(msg.getHeader("key") === "baz")
}
"create message with body and header" in {
val m = CamelMessage.from(sampleMessage)
assert(m.body === "test")
assert(m.headers("foo") === "bar")
}
"create message with body and header and custom header" in {
val m = CamelMessage.from(sampleMessage, Map("key" -> "baz"))
assert(m.body === "test")
assert(m.headers("foo") === "bar")
assert(m.headers("key") === "baz")
}
}
private[camel] def sampleMessage = {

View file

@ -0,0 +1,159 @@
package akka.camel
import org.scalatest.WordSpec
import org.scalatest.matchers.MustMatchers
import akka.camel.TestSupport.NonSharedCamelSystem
import akka.actor.{ ActorRef, Props, Actor }
import akka.routing.BroadcastRouter
import concurrent.{ Promise, Await, Future }
import scala.concurrent.util.duration._
import language.postfixOps
import akka.testkit._
import akka.util.Timeout
import org.apache.camel.model.{ProcessorDefinition, RouteDefinition}
import org.apache.camel.builder.Builder
/**
* A test to concurrently register and de-register consumer and producer endpoints
*/
class ConcurrentActivationTest extends WordSpec with MustMatchers with NonSharedCamelSystem {
"Activation" must {
"support concurrent registrations and de-registrations" in {
implicit val timeout = Timeout(10 seconds)
val timeoutDuration = timeout.duration
implicit val ec = system.dispatcher
val number = 10
filterEvents(EventFilter.warning(pattern = "received dead letter from .*producerRegistrar.*", occurrences = number*number)) {
// A ConsumerBroadcast creates 'number' amount of ConsumerRegistrars, which will register 'number' amount of endpoints,
// in total number*number endpoints, activating and deactivating every endpoint.
// a promise to the list of registrars, which have a list of actorRefs each. A tuple of a list of activated refs and a list of deactivated refs
val promiseRegistrarLists = Promise[(Future[List[List[ActorRef]]], Future[List[List[ActorRef]]])]()
// future to all the futures of activation and deactivation
val futureRegistrarLists = promiseRegistrarLists.future
val ref = system.actorOf(Props(new ConsumerBroadcast(promiseRegistrarLists)), name = "broadcaster")
// create the registrars
ref ! CreateRegistrars(number)
// send a broadcast to all registrars, so that number * number messages are sent
// every Register registers a consumer and a producer
(1 to number).map(i ref ! RegisterConsumersAndProducers("direct:concurrent-"))
// de-register all consumers and producers
ref ! DeRegisterConsumersAndProducers()
val promiseAllRefs = Promise[(List[ActorRef], List[ActorRef])]()
val allRefsFuture = promiseAllRefs.future
// map over all futures, put all futures in one list of activated and deactivated actor refs.
futureRegistrarLists.map {
case (futureActivations, futureDeactivations)
futureActivations zip futureDeactivations map {
case (activations, deactivations)
promiseAllRefs.success( (activations.flatten, deactivations.flatten))
}
}
val (activations, deactivations) = Await.result(allRefsFuture, timeoutDuration)
// must be the size of the activated activated producers and consumers
activations.size must be (2 * number * number)
// must be the size of the activated activated producers and consumers
deactivations.size must be ( 2* number * number )
def partitionNames(refs:Seq[ActorRef]) = refs.map(_.path.name).partition(_.startsWith("concurrent-test-echo-consumer"))
def assertContainsSameElements(lists:(Seq[_], Seq[_])) {
val (a,b) = lists
a.intersect(b).size must be (a.size)
}
val (activatedConsumerNames, activatedProducerNames) = partitionNames(activations)
val (deactivatedConsumerNames, deactivatedProducerNames) = partitionNames(deactivations)
assertContainsSameElements(activatedConsumerNames, deactivatedConsumerNames)
assertContainsSameElements(activatedProducerNames, deactivatedProducerNames)
}
}
}
}
class ConsumerBroadcast(promise: Promise[(Future[List[List[ActorRef]]], Future[List[List[ActorRef]]])]) extends Actor {
private var broadcaster: Option[ActorRef] = None
private implicit val ec = context.dispatcher
def receive = {
case CreateRegistrars(number)
var allActivationFutures = List[Future[List[ActorRef]]]()
var allDeactivationFutures = List[Future[List[ActorRef]]]()
val routees = (1 to number).map { i
val activationListPromise = Promise[List[ActorRef]]()
val deactivationListPromise = Promise[List[ActorRef]]()
val activationListFuture = activationListPromise.future
val deactivationListFuture = deactivationListPromise.future
allActivationFutures = allActivationFutures :+ activationListFuture
allDeactivationFutures = allDeactivationFutures :+ deactivationListFuture
context.actorOf(Props(new Registrar(i, number, activationListPromise, deactivationListPromise)), s"registrar-$i")
}
promise.success((Future.sequence(allActivationFutures)), Future.sequence(allDeactivationFutures))
broadcaster = Some(context.actorOf(Props[Registrar] withRouter (BroadcastRouter(routees)), "registrarRouter"))
case reg: Any
broadcaster.foreach(_.forward(reg))
}
}
case class CreateRegistrars(number: Int)
case class RegisterConsumersAndProducers(endpointUri: String)
case class DeRegisterConsumersAndProducers()
case class Activations()
case class DeActivations()
class Registrar(val start: Int, val number: Int, activationsPromise: Promise[List[ActorRef]],
deActivationsPromise: Promise[List[ActorRef]]) extends Actor {
private var actorRefs = Set[ActorRef]()
private var activations = Set[Future[ActorRef]]()
private var deActivations = Set[Future[ActorRef]]()
private var index = 0
private val camel = CamelExtension(context.system)
private implicit val ec = context.dispatcher
private implicit val timeout = Timeout(10 seconds)
def receive = {
case reg: RegisterConsumersAndProducers
val i = index
add(new EchoConsumer(s"${reg.endpointUri}$start-$i"), s"concurrent-test-echo-consumer$start-$i")
add(new TestProducer(s"${reg.endpointUri}$start-$i"), s"concurrent-test-producer-$start-$i")
index = index + 1
if (activations.size == number * 2) {
Future.sequence(activations.toList) map activationsPromise.success
}
case reg: DeRegisterConsumersAndProducers
actorRefs.foreach { aref
context.stop(aref)
deActivations = deActivations + camel.deactivationFutureFor(aref)
if (deActivations.size == number * 2) {
Future.sequence(deActivations.toList) map deActivationsPromise.success
}
}
}
def add(actor: =>Actor, name:String) {
val ref = context.actorOf(Props(actor), name)
actorRefs = actorRefs + ref
activations = activations + camel.activationFutureFor(ref)
}
}
class EchoConsumer(endpoint: String) extends Actor with Consumer {
def endpointUri = endpoint
def receive = {
case msg: CamelMessage sender ! msg
}
/**
* Returns the route definition handler for creating a custom route to this consumer.
* By default it returns an identity function, override this method to
* return a custom route definition handler.
*/
override def onRouteDefinition = (rd: RouteDefinition) => rd.onException(classOf[Exception]).handled(true).transform(Builder.exceptionMessage).end
}
class TestProducer(uri: String) extends Actor with Producer {
def endpointUri = uri
}

View file

@ -4,6 +4,7 @@
package akka.camel
import internal.ActorActivationException
import language.postfixOps
import language.existentials
@ -11,7 +12,7 @@ import akka.actor._
import org.scalatest.matchers.MustMatchers
import org.scalatest.WordSpec
import akka.camel.TestSupport._
import org.apache.camel.model.RouteDefinition
import org.apache.camel.model.{ ProcessorDefinition, RouteDefinition }
import org.apache.camel.builder.Builder
import org.apache.camel.{ FailedToCreateRouteException, CamelExecutionException }
import java.util.concurrent.{ ExecutionException, TimeUnit, TimeoutException }
@ -19,17 +20,19 @@ import akka.actor.Status.Failure
import scala.concurrent.util.duration._
import concurrent.{ ExecutionContext, Await }
import akka.testkit._
import akka.util.Timeout
class ConsumerIntegrationTest extends WordSpec with MustMatchers with NonSharedCamelSystem {
"ConsumerIntegrationTest" must {
implicit val defaultTimeout = 10.seconds
val defaultTimeoutDuration = 10 seconds
implicit val defaultTimeout = Timeout(defaultTimeoutDuration)
implicit def ec: ExecutionContext = system.dispatcher
"Consumer must throw FailedToCreateRouteException, while awaiting activation, if endpoint is invalid" in {
filterEvents(EventFilter[ActorInitializationException](occurrences = 1), EventFilter[FailedToCreateRouteException](occurrences = 1)) {
filterEvents(EventFilter[ActorActivationException](occurrences = 1)) {
val actorRef = system.actorOf(Props(new TestActor(uri = "some invalid uri")))
intercept[FailedToCreateRouteException] {
Await.result(camel.activationFutureFor(actorRef), defaultTimeout)
Await.result(camel.activationFutureFor(actorRef), defaultTimeoutDuration)
}
}
}
@ -40,7 +43,7 @@ class ConsumerIntegrationTest extends WordSpec with MustMatchers with NonSharedC
def receive = {
case m: CamelMessage sender ! "received " + m.bodyAs[String]
}
})
}, name = "direct-a1")
camel.sendTo("direct:a1", msg = "some message") must be("received some message")
}
@ -48,17 +51,18 @@ class ConsumerIntegrationTest extends WordSpec with MustMatchers with NonSharedC
val SHORT_TIMEOUT = 10 millis
val LONG_WAIT = 200 millis
start(new Consumer {
val ref = start(new Consumer {
override def replyTimeout = SHORT_TIMEOUT
def endpointUri = "direct:a3"
def receive = { case _ { Thread.sleep(LONG_WAIT.toMillis); sender ! "done" } }
})
}, name = "ignore-this-deadletter-timeout-consumer-reply")
val exception = intercept[CamelExecutionException] {
camel.sendTo("direct:a3", msg = "some msg 3")
}
exception.getCause.getClass must be(classOf[TimeoutException])
stop(ref)
}
"Consumer must process messages even after actor restart" in {
@ -74,90 +78,105 @@ class ConsumerIntegrationTest extends WordSpec with MustMatchers with NonSharedC
override def postRestart(reason: Throwable) {
restarted.countDown()
}
})
}, "direct-a2")
filterEvents(EventFilter[TestException](occurrences = 1)) {
consumer ! "throw"
Await.ready(restarted, defaultTimeout)
Await.ready(restarted, defaultTimeoutDuration)
camel.sendTo("direct:a2", msg = "xyz") must be("received xyz")
}
stop(consumer)
}
"Consumer must unregister itself when stopped" in {
val consumer = start(new TestActor())
Await.result(camel.activationFutureFor(consumer), defaultTimeout)
val consumer = start(new TestActor(), name = "test-actor-unregister")
Await.result(camel.activationFutureFor(consumer), defaultTimeoutDuration)
camel.routeCount must be > (0)
system.stop(consumer)
Await.result(camel.deactivationFutureFor(consumer), defaultTimeout)
Await.result(camel.deactivationFutureFor(consumer), defaultTimeoutDuration)
camel.routeCount must be(0)
}
"Consumer must register on uri passed in through constructor" in {
val consumer = start(new TestActor("direct://test"))
Await.result(camel.activationFutureFor(consumer), defaultTimeout)
val consumer = start(new TestActor("direct://test"), name = "direct-test")
Await.result(camel.activationFutureFor(consumer), defaultTimeoutDuration)
camel.routeCount must be > (0)
camel.routes.get(0).getEndpoint.getEndpointUri must be("direct://test")
system.stop(consumer)
Await.result(camel.deactivationFutureFor(consumer), defaultTimeout)
Await.result(camel.deactivationFutureFor(consumer), defaultTimeoutDuration)
camel.routeCount must be(0)
stop(consumer)
}
"Error passing consumer supports error handling through route modification" in {
start(new ErrorThrowingConsumer("direct:error-handler-test") with ErrorPassing {
override def onRouteDefinition(rd: RouteDefinition) = {
val ref = start(new ErrorThrowingConsumer("direct:error-handler-test") {
override def onRouteDefinition = (rd: RouteDefinition) {
rd.onException(classOf[TestException]).handled(true).transform(Builder.exceptionMessage).end
}
})
}, name = "direct-error-handler-test")
filterEvents(EventFilter[TestException](occurrences = 1)) {
camel.sendTo("direct:error-handler-test", msg = "hello") must be("error: hello")
}
stop(ref)
}
"Error passing consumer supports redelivery through route modification" in {
start(new FailingOnceConsumer("direct:failing-once-concumer") with ErrorPassing {
override def onRouteDefinition(rd: RouteDefinition) = {
val ref = start(new FailingOnceConsumer("direct:failing-once-concumer") {
override def onRouteDefinition = (rd: RouteDefinition) {
rd.onException(classOf[TestException]).maximumRedeliveries(1).end
}
})
}, name = "direct-failing-once-consumer")
filterEvents(EventFilter[TestException](occurrences = 1)) {
camel.sendTo("direct:failing-once-concumer", msg = "hello") must be("accepted: hello")
}
stop(ref)
}
"Consumer supports manual Ack" in {
start(new ManualAckConsumer() {
val ref = start(new ManualAckConsumer() {
def endpointUri = "direct:manual-ack"
def receive = { case _ sender ! Ack }
})
camel.template.asyncSendBody("direct:manual-ack", "some message").get(defaultTimeout.toSeconds, TimeUnit.SECONDS) must be(null) //should not timeout
}, name = "direct-manual-ack-1")
camel.template.asyncSendBody("direct:manual-ack", "some message").get(defaultTimeoutDuration.toSeconds, TimeUnit.SECONDS) must be(null) //should not timeout
stop(ref)
}
"Consumer handles manual Ack failure" in {
val someException = new Exception("e1")
start(new ManualAckConsumer() {
val ref = start(new ManualAckConsumer() {
def endpointUri = "direct:manual-ack"
def receive = { case _ sender ! Failure(someException) }
})
}, name = "direct-manual-ack-2")
intercept[ExecutionException] {
camel.template.asyncSendBody("direct:manual-ack", "some message").get(defaultTimeout.toSeconds, TimeUnit.SECONDS)
camel.template.asyncSendBody("direct:manual-ack", "some message").get(defaultTimeoutDuration.toSeconds, TimeUnit.SECONDS)
}.getCause.getCause must be(someException)
stop(ref)
}
"Consumer should time-out, if manual Ack not received within replyTimeout and should give a human readable error message" in {
start(new ManualAckConsumer() {
val ref = start(new ManualAckConsumer() {
override def replyTimeout = 10 millis
def endpointUri = "direct:manual-ack"
def receive = { case _ }
})
}, name = "direct-manual-ack-3")
intercept[ExecutionException] {
camel.template.asyncSendBody("direct:manual-ack", "some message").get(defaultTimeout.toSeconds, TimeUnit.SECONDS)
camel.template.asyncSendBody("direct:manual-ack", "some message").get(defaultTimeoutDuration.toSeconds, TimeUnit.SECONDS)
}.getCause.getCause.getMessage must include("Failed to get Ack")
stop(ref)
}
"respond to onRouteDefinition" in {
val ref = start(new ErrorRespondingConsumer("direct:error-responding-consumer-1"), "error-responding-consumer")
filterEvents(EventFilter[TestException](occurrences = 1)) {
val response = camel.sendTo("direct:error-responding-consumer-1", "some body")
response must be("some body has an error")
}
stop(ref)
}
}
}
@ -166,6 +185,25 @@ class ErrorThrowingConsumer(override val endpointUri: String) extends Consumer {
def receive = {
case msg: CamelMessage throw new TestException("error: %s" format msg.body)
}
override def preRestart(reason: Throwable, message: Option[Any]) {
super.preRestart(reason, message)
sender ! Failure(reason)
}
}
class ErrorRespondingConsumer(override val endpointUri: String) extends Consumer {
def receive = {
case msg: CamelMessage throw new TestException("Error!")
}
override def onRouteDefinition = (rd: RouteDefinition) {
// Catch TestException and handle it by returning a modified version of the in message
rd.onException(classOf[TestException]).handled(true).transform(Builder.body.append(" has an error")).end
}
final override def preRestart(reason: Throwable, message: Option[Any]) {
super.preRestart(reason, message)
sender ! Failure(reason)
}
}
class FailingOnceConsumer(override val endpointUri: String) extends Consumer {
@ -177,6 +215,11 @@ class FailingOnceConsumer(override val endpointUri: String) extends Consumer {
else
throw new TestException("rejected: %s" format msg.body)
}
final override def preRestart(reason: Throwable, message: Option[Any]) {
super.preRestart(reason, message)
sender ! Failure(reason)
}
}
class TestActor(uri: String = "file://target/abcde") extends Consumer {
@ -184,13 +227,6 @@ class TestActor(uri: String = "file://target/abcde") extends Consumer {
def receive = { case _ /* do nothing */ }
}
trait ErrorPassing {
self: Actor
final override def preRestart(reason: Throwable, message: Option[Any]) {
sender ! Failure(reason)
}
}
trait ManualAckConsumer extends Consumer {
override def autoAck = false
}

View file

@ -14,6 +14,8 @@ import org.scalatest.WordSpec
import akka.event.LoggingAdapter
import akka.actor.ActorSystem.Settings
import com.typesafe.config.ConfigFactory
import org.apache.camel.impl.DefaultCamelContext
import org.apache.camel.spi.Registry
class DefaultCamelTest extends WordSpec with SharedCamelSystem with MustMatchers with MockitoSugar {
@ -26,7 +28,7 @@ class DefaultCamelTest extends WordSpec with SharedCamelSystem with MustMatchers
def camelWithMocks = new DefaultCamel(sys) {
override val log = mock[LoggingAdapter]
override lazy val template = mock[ProducerTemplate]
override lazy val context = mock[CamelContext]
override lazy val context = mock[DefaultCamelContext]
override val settings = mock[CamelSettings]
}
@ -62,5 +64,4 @@ class DefaultCamelTest extends WordSpec with SharedCamelSystem with MustMatchers
verify(camel.context).stop()
}
}

View file

@ -23,11 +23,6 @@ class MessageScalaTest extends FunSuite with MustMatchers with SharedCamelSystem
}
}
test("mustReturnDoubleHeader") {
val message = CamelMessage("test", Map("test" -> 1.4))
message.header("test").get must be(1.4)
}
test("mustConvertDoubleHeaderToString") {
val message = CamelMessage("test", Map("test" -> 1.4))
message.headerAs[String]("test").get must be("1.4")
@ -47,32 +42,14 @@ class MessageScalaTest extends FunSuite with MustMatchers with SharedCamelSystem
}
test("mustSetBodyAndPreserveHeaders") {
CamelMessage("test1", Map("A" -> "1")).withBody("test2") must be(
CamelMessage("test1", Map("A" -> "1")).copy(body = "test2") must be(
CamelMessage("test2", Map("A" -> "1")))
}
test("mustSetHeadersAndPreserveBody") {
CamelMessage("test1", Map("A" -> "1")).withHeaders(Map("C" -> "3")) must be(
CamelMessage("test1", Map("A" -> "1")).copy(headers = Map("C" -> "3")) must be(
CamelMessage("test1", Map("C" -> "3")))
}
test("mustAddHeaderAndPreserveBodyAndHeaders") {
CamelMessage("test1", Map("A" -> "1")).addHeader("B" -> "2") must be(
CamelMessage("test1", Map("A" -> "1", "B" -> "2")))
}
test("mustAddHeadersAndPreserveBodyAndHeaders") {
CamelMessage("test1", Map("A" -> "1")).addHeaders(Map("B" -> "2")) must be(
CamelMessage("test1", Map("A" -> "1", "B" -> "2")))
}
test("mustRemoveHeadersAndPreserveBodyAndRemainingHeaders") {
CamelMessage("test1", Map("A" -> "1", "B" -> "2")).withoutHeader("B") must be(
CamelMessage("test1", Map("A" -> "1")))
}
}

View file

@ -31,7 +31,6 @@ class ProducerFeatureTest extends WordSpec with BeforeAndAfterAll with BeforeAnd
val camelContext = camel.context
// to make testing equality of messages easier, otherwise the breadcrumb shows up in the result.
camelContext.setUseBreadcrumb(false)
val timeoutDuration = 1 second
implicit val timeout = Timeout(timeoutDuration)
override protected def beforeAll { camelContext.addRoutes(new TestRoute(system)) }
@ -40,7 +39,7 @@ class ProducerFeatureTest extends WordSpec with BeforeAndAfterAll with BeforeAnd
"A Producer on a sync Camel route" must {
"produce a message and receive normal response" in {
val producer = system.actorOf(Props(new TestProducer("direct:producer-test-2", true)))
val producer = system.actorOf(Props(new TestProducer("direct:producer-test-2", true)), name = "direct-producer-2")
val message = CamelMessage("test", Map(CamelMessage.MessageExchangeId -> "123"))
val future = producer.ask(message)(timeoutDuration)
val expected = CamelMessage("received TEST", Map(CamelMessage.MessageExchangeId -> "123"))
@ -48,6 +47,7 @@ class ProducerFeatureTest extends WordSpec with BeforeAndAfterAll with BeforeAnd
case result: CamelMessage assert(result === expected)
case unexpected fail("Actor responded with unexpected message:" + unexpected)
}
system.stop(producer)
}
"produce a message and receive failure response" in {
@ -68,7 +68,7 @@ class ProducerFeatureTest extends WordSpec with BeforeAndAfterAll with BeforeAnd
override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1 minute) {
case _: AkkaCamelException Stop
}
}))
}), name = "prod-anonymous-supervisor")
val producer = Await.result[ActorRef](supervisor.ask(Props(new TestProducer("direct:producer-test-2"))).mapTo[ActorRef], timeoutDuration)
val message = CamelMessage("fail", Map(CamelMessage.MessageExchangeId -> "123"))
filterEvents(EventFilter[AkkaCamelException](occurrences = 1)) {
@ -78,27 +78,32 @@ class ProducerFeatureTest extends WordSpec with BeforeAndAfterAll with BeforeAnd
}
Await.ready(latch, timeoutDuration)
deadActor must be(Some(producer))
system.stop(producer)
}
"produce a message oneway" in {
val producer = system.actorOf(Props(new TestProducer("direct:producer-test-1", true) with Oneway))
val producer = system.actorOf(Props(new TestProducer("direct:producer-test-1", true) with Oneway), name = "direct-producer-1-oneway")
mockEndpoint.expectedBodiesReceived("TEST")
producer ! CamelMessage("test", Map())
mockEndpoint.assertIsSatisfied()
system.stop(producer)
}
"produces message twoway without sender reference" in {
val producer = system.actorOf(Props(new TestProducer("direct:producer-test-1")))
// this test causes a dead letter which can be ignored. The producer is two-way but a oneway tell is used
// to communicate with it and the response is ignored, which ends up in a dead letter
val producer = system.actorOf(Props(new TestProducer("direct:producer-test-1")), name = "ignore-this-deadletter-direct-producer-test-no-sender")
mockEndpoint.expectedBodiesReceived("test")
producer ! CamelMessage("test", Map())
mockEndpoint.assertIsSatisfied()
system.stop(producer)
}
}
"A Producer on an async Camel route" must {
"produce message to direct:producer-test-3 and receive normal response" in {
val producer = system.actorOf(Props(new TestProducer("direct:producer-test-3")))
val producer = system.actorOf(Props(new TestProducer("direct:producer-test-3")), name = "direct-producer-test-3")
val message = CamelMessage("test", Map(CamelMessage.MessageExchangeId -> "123"))
val future = producer.ask(message)(timeoutDuration)
@ -109,10 +114,11 @@ class ProducerFeatureTest extends WordSpec with BeforeAndAfterAll with BeforeAnd
result must be(expected)
case unexpected fail("Actor responded with unexpected message:" + unexpected)
}
system.stop(producer)
}
"produce message to direct:producer-test-3 and receive failure response" in {
val producer = system.actorOf(Props(new TestProducer("direct:producer-test-3")))
val producer = system.actorOf(Props(new TestProducer("direct:producer-test-3")), name = "direct-producer-test-3-receive-failure")
val message = CamelMessage("fail", Map(CamelMessage.MessageExchangeId -> "123"))
filterEvents(EventFilter[AkkaCamelException](occurrences = 1)) {
@ -120,11 +126,12 @@ class ProducerFeatureTest extends WordSpec with BeforeAndAfterAll with BeforeAnd
e.getMessage must be("failure")
e.headers must be(Map(CamelMessage.MessageExchangeId -> "123"))
}
system.stop(producer)
}
"produce message, forward normal response of direct:producer-test-2 to a replying target actor and receive response" in {
val target = system.actorOf(Props[ReplyingForwardTarget])
val producer = system.actorOf(Props(new TestForwarder("direct:producer-test-2", target)))
val target = system.actorOf(Props[ReplyingForwardTarget], name = "reply-forwarding-target")
val producer = system.actorOf(Props(new TestForwarder("direct:producer-test-2", target)), name = "direct-producer-test-2-forwarder")
val message = CamelMessage("test", Map(CamelMessage.MessageExchangeId -> "123"))
val future = producer.ask(message)(timeoutDuration)
@ -135,11 +142,13 @@ class ProducerFeatureTest extends WordSpec with BeforeAndAfterAll with BeforeAnd
result must be(expected)
case unexpected fail("Actor responded with unexpected message:" + unexpected)
}
system.stop(target)
system.stop(producer)
}
"produce message, forward failure response of direct:producer-test-2 to a replying target actor and receive response" in {
val target = system.actorOf(Props[ReplyingForwardTarget])
val producer = system.actorOf(Props(new TestForwarder("direct:producer-test-2", target)))
val target = system.actorOf(Props[ReplyingForwardTarget], name = "reply-forwarding-target")
val producer = system.actorOf(Props(new TestForwarder("direct:producer-test-2", target)), name = "direct-producer-test-2-forwarder-failure")
val message = CamelMessage("fail", Map(CamelMessage.MessageExchangeId -> "123"))
filterEvents(EventFilter[AkkaCamelException](occurrences = 1)) {
@ -147,30 +156,36 @@ class ProducerFeatureTest extends WordSpec with BeforeAndAfterAll with BeforeAnd
e.getMessage must be("failure")
e.headers must be(Map(CamelMessage.MessageExchangeId -> "123", "test" -> "failure"))
}
system.stop(target)
system.stop(producer)
}
"produce message, forward normal response to a producing target actor and produce response to direct:forward-test-1" in {
val target = system.actorOf(Props[ProducingForwardTarget])
val producer = system.actorOf(Props(new TestForwarder("direct:producer-test-2", target)))
val target = system.actorOf(Props[ProducingForwardTarget], name = "producer-forwarding-target")
val producer = system.actorOf(Props(new TestForwarder("direct:producer-test-2", target)), name = "direct-producer-test-2-forwarder-to-producing-target")
mockEndpoint.expectedBodiesReceived("received test")
producer.tell(CamelMessage("test", Map()), producer)
mockEndpoint.assertIsSatisfied()
system.stop(target)
system.stop(producer)
}
"produce message, forward failure response to a producing target actor and produce response to direct:forward-test-1" in {
val target = system.actorOf(Props[ProducingForwardTarget])
val producer = system.actorOf(Props(new TestForwarder("direct:producer-test-2", target)))
val target = system.actorOf(Props[ProducingForwardTarget], name = "producer-forwarding-target-failure")
val producer = system.actorOf(Props(new TestForwarder("direct:producer-test-2", target)), name = "direct-producer-test-2-forward-failure")
filterEvents(EventFilter[AkkaCamelException](occurrences = 1)) {
mockEndpoint.expectedMessageCount(1)
mockEndpoint.message(0).body().isInstanceOf(classOf[akka.actor.Status.Failure])
producer.tell(CamelMessage("fail", Map()), producer)
mockEndpoint.assertIsSatisfied()
}
system.stop(target)
system.stop(producer)
}
"produce message, forward normal response from direct:producer-test-3 to a replying target actor and receive response" in {
val target = system.actorOf(Props[ReplyingForwardTarget])
val producer = system.actorOf(Props(new TestForwarder("direct:producer-test-3", target)))
val target = system.actorOf(Props[ReplyingForwardTarget], name = "reply-forwarding-target")
val producer = system.actorOf(Props(new TestForwarder("direct:producer-test-3", target)), name = "direct-producer-test-3-to-replying-actor")
val message = CamelMessage("test", Map(CamelMessage.MessageExchangeId -> "123"))
val future = producer.ask(message)(timeoutDuration)
@ -180,11 +195,13 @@ class ProducerFeatureTest extends WordSpec with BeforeAndAfterAll with BeforeAnd
message must be(expected)
case unexpected fail("Actor responded with unexpected message:" + unexpected)
}
system.stop(target)
system.stop(producer)
}
"produce message, forward failure response from direct:producer-test-3 to a replying target actor and receive response" in {
val target = system.actorOf(Props[ReplyingForwardTarget])
val producer = system.actorOf(Props(new TestForwarder("direct:producer-test-3", target)))
val target = system.actorOf(Props[ReplyingForwardTarget], name = "reply-forwarding-target")
val producer = system.actorOf(Props(new TestForwarder("direct:producer-test-3", target)), name = "direct-producer-test-3-forward-failure")
val message = CamelMessage("fail", Map(CamelMessage.MessageExchangeId -> "123"))
filterEvents(EventFilter[AkkaCamelException](occurrences = 1)) {
@ -192,19 +209,23 @@ class ProducerFeatureTest extends WordSpec with BeforeAndAfterAll with BeforeAnd
e.getMessage must be("failure")
e.headers must be(Map(CamelMessage.MessageExchangeId -> "123", "test" -> "failure"))
}
system.stop(target)
system.stop(producer)
}
"produce message, forward normal response from direct:producer-test-3 to a producing target actor and produce response to direct:forward-test-1" in {
val target = system.actorOf(Props[ProducingForwardTarget])
val producer = system.actorOf(Props(new TestForwarder("direct:producer-test-3", target)))
val target = system.actorOf(Props[ProducingForwardTarget], "producing-forward-target-normal")
val producer = system.actorOf(Props(new TestForwarder("direct:producer-test-3", target)), name = "direct-producer-test-3-forward-normal")
mockEndpoint.expectedBodiesReceived("received test")
producer.tell(CamelMessage("test", Map()), producer)
mockEndpoint.assertIsSatisfied()
system.stop(target)
system.stop(producer)
}
"produce message, forward failure response from direct:producer-test-3 to a producing target actor and produce response to direct:forward-test-1" in {
val target = system.actorOf(Props[ProducingForwardTarget])
val producer = system.actorOf(Props(new TestForwarder("direct:producer-test-3", target)))
val target = system.actorOf(Props[ProducingForwardTarget], "producing-forward-target-failure")
val producer = system.actorOf(Props(new TestForwarder("direct:producer-test-3", target)), name = "direct-producer-test-3-forward-failure-producing-target")
filterEvents(EventFilter[AkkaCamelException](occurrences = 1)) {
mockEndpoint.expectedMessageCount(1)
mockEndpoint.message(0).body().isInstanceOf(classOf[akka.actor.Status.Failure])
@ -253,7 +274,7 @@ object ProducerFeatureTest {
class ReplyingForwardTarget extends Actor {
def receive = {
case msg: CamelMessage
context.sender ! (msg.addHeader("test" -> "result"))
context.sender ! (msg.copy(headers = msg.headers + ("test" -> "result")))
case msg: akka.actor.Status.Failure
msg.cause match {
case e: AkkaCamelException context.sender ! Status.Failure(new AkkaCamelException(e, e.headers + ("test" -> "failure")))

View file

@ -1,53 +0,0 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.camel
import language.postfixOps
import org.scalatest.matchers.MustMatchers
import org.scalatest.WordSpec
import akka.camel.TestSupport.SharedCamelSystem
import scala.concurrent.util.duration._
import akka.actor.{ ActorRef, Props }
import akka.util.Timeout
import scala.concurrent.Await
class ProducerRegistryTest extends WordSpec with MustMatchers with SharedCamelSystem {
implicit val timeout = 5.seconds
implicit val ec = system.dispatcher
"A ProducerRegistry" must {
def newEmptyActor: ActorRef = system.actorOf(Props.empty)
def registerProcessorFor(actorRef: ActorRef) = camel.registerProducer(actorRef, "mock:mock")._2
"register a started SendProcessor for the producer, which is stopped when the actor is stopped" in {
val actorRef = newEmptyActor
val processor = registerProcessorFor(actorRef)
Await.result(camel.activationFutureFor(actorRef), timeout)
processor.isStarted must be(true)
system.stop(actorRef)
Await.result(camel.deactivationFutureFor(actorRef), timeout)
(processor.isStopping || processor.isStopped) must be(true)
}
"remove and stop the SendProcessor if the actorRef is registered" in {
val actorRef = newEmptyActor
val processor = registerProcessorFor(actorRef)
camel.unregisterProducer(actorRef)
(processor.isStopping || processor.isStopped) must be(true)
}
"remove and stop only the SendProcessor for the actorRef that is registered" in {
val actorRef1 = newEmptyActor
val actorRef2 = newEmptyActor
val processor = registerProcessorFor(actorRef2)
// this generates a warning on the activationTracker.
camel.unregisterProducer(actorRef1)
(!processor.isStopped && !processor.isStopping) must be(true)
camel.unregisterProducer(actorRef2)
(processor.isStopping || processor.isStopped) must be(true)
}
}
}

View file

@ -19,11 +19,12 @@ import akka.util.Timeout
import akka.testkit.AkkaSpec
private[camel] object TestSupport {
def start(actor: Actor, name: String)(implicit system: ActorSystem, timeout: Timeout): ActorRef =
Await.result(CamelExtension(system).activationFutureFor(system.actorOf(Props(actor), name))(timeout, system.dispatcher), timeout.duration)
def start(actor: Actor)(implicit system: ActorSystem): ActorRef = {
val actorRef = system.actorOf(Props(actor))
Await.result(CamelExtension(system).activationFutureFor(actorRef)(10 seconds, system.dispatcher), 10 seconds)
actorRef
def stop(actorRef: ActorRef)(implicit system: ActorSystem, timeout: Timeout) {
system.stop(actorRef)
Await.result(CamelExtension(system).deactivationFutureFor(actorRef)(timeout, system.dispatcher), timeout.duration)
}
private[camel] implicit def camelToTestWrapper(camel: Camel) = new CamelTestWrapper(camel)

View file

@ -33,7 +33,7 @@ class UntypedProducerTest extends WordSpec with MustMatchers with BeforeAndAfter
"An UntypedProducer producing a message to a sync Camel route" must {
"produce a message and receive a normal response" in {
val producer = system.actorOf(Props[SampleUntypedReplyingProducer])
val producer = system.actorOf(Props[SampleUntypedReplyingProducer], name = "sample-untyped-replying-producer")
val message = CamelMessage("test", Map(CamelMessage.MessageExchangeId -> "123"))
val future = producer.ask(message)(timeout)
@ -47,7 +47,7 @@ class UntypedProducerTest extends WordSpec with MustMatchers with BeforeAndAfter
}
"produce a message and receive a failure response" in {
val producer = system.actorOf(Props[SampleUntypedReplyingProducer])
val producer = system.actorOf(Props[SampleUntypedReplyingProducer], name = "sample-untyped-replying-producer-failure")
val message = CamelMessage("fail", Map(CamelMessage.MessageExchangeId -> "123"))
filterEvents(EventFilter[AkkaCamelException](occurrences = 1)) {
@ -66,7 +66,7 @@ class UntypedProducerTest extends WordSpec with MustMatchers with BeforeAndAfter
"An UntypedProducer producing a message to a sync Camel route and then forwarding the response" must {
"produce a message and send a normal response to direct:forward-test-1" in {
val producer = system.actorOf(Props[SampleUntypedForwardingProducer])
val producer = system.actorOf(Props[SampleUntypedForwardingProducer], name = "sample-untyped-forwarding-producer")
mockEndpoint.expectedBodiesReceived("received test")
producer.tell(CamelMessage("test", Map[String, Any]()), producer)

View file

@ -5,9 +5,9 @@ import org.scalatest.matchers.MustMatchers
import scala.concurrent.util.duration._
import org.scalatest.{ GivenWhenThen, BeforeAndAfterEach, BeforeAndAfterAll, WordSpec }
import akka.actor.{ Props, ActorSystem }
import scala.concurrent.util.Duration
import akka.camel._
import akka.testkit.{ TimingTest, TestProbe, TestKit }
import akka.camel.internal.ActivationProtocol._
import scala.concurrent.util.FiniteDuration
class ActivationTrackerTest extends TestKit(ActorSystem("test")) with WordSpec with MustMatchers with BeforeAndAfterAll with BeforeAndAfterEach with GivenWhenThen {
@ -25,7 +25,7 @@ class ActivationTrackerTest extends TestKit(ActorSystem("test")) with WordSpec w
anotherAwaiting = new Awaiting(actor)
}
val at = system.actorOf(Props[ActivationTracker])
val at = system.actorOf(Props[ActivationTracker], name = "activationTrackker")
"ActivationTracker forwards activation message to all awaiting parties" taggedAs TimingTest in {
awaiting.awaitActivation()

View file

@ -11,7 +11,7 @@ import org.mockito.Mockito._
import org.apache.camel.{ CamelContext, ProducerTemplate, AsyncCallback }
import java.util.concurrent.atomic.AtomicBoolean
import scala.concurrent.util.duration._
import scala.concurrent.util.Duration
import concurrent.util.{ FiniteDuration, Duration }
import java.lang.String
import akka.actor.{ ActorRef, Props, ActorSystem, Actor }
import akka.camel._
@ -21,14 +21,17 @@ import akka.camel.TestSupport._
import java.util.concurrent.{ TimeoutException, CountDownLatch, TimeUnit }
import org.mockito.{ ArgumentMatcher, Matchers, Mockito }
import org.scalatest.matchers.MustMatchers
import akka.actor.Status.Failure
import akka.actor.Status.{ Success, Failure }
import com.typesafe.config.ConfigFactory
import akka.actor.ActorSystem.Settings
import akka.event.LoggingAdapter
import akka.testkit.{ TimingTest, TestKit, TestProbe }
import scala.concurrent.util.FiniteDuration
import org.apache.camel.impl.DefaultCamelContext
import concurrent.{ Await, Promise, Future }
import akka.util.Timeout
class ActorProducerTest extends TestKit(ActorSystem("test")) with WordSpec with MustMatchers with ActorProducerFixture {
implicit val timeout = Timeout(10 seconds)
"ActorProducer" when {
@ -54,7 +57,45 @@ class ActorProducerTest extends TestKit(ActorSystem("test")) with WordSpec with
"not expect response and not block" taggedAs TimingTest in {
time(producer.processExchangeAdapter(exchange)) must be < (200 millis)
}
}
"manualAck" when {
"response is Ack" must {
"process the exchange" in {
producer = given(outCapable = false, autoAck = false)
import system.dispatcher
val future = Future {
producer.processExchangeAdapter(exchange)
}
within(1 second) {
probe.expectMsgType[CamelMessage]
info("message sent to consumer")
probe.sender ! Ack
}
verify(exchange, never()).setResponse(any[CamelMessage])
info("no response forwarded to exchange")
Await.ready(future, timeout.duration)
}
}
"the consumer does not respond wit Ack" must {
"not block forever" in {
producer = given(outCapable = false, autoAck = false)
import system.dispatcher
val future = Future {
producer.processExchangeAdapter(exchange)
}
within(1 second) {
probe.expectMsgType[CamelMessage]
info("message sent to consumer")
}
verify(exchange, never()).setResponse(any[CamelMessage])
info("no response forwarded to exchange")
intercept[TimeoutException] {
Await.ready(future, camel.settings.replyTimeout - (1 seconds))
}
}
}
}
"out capable" when {
@ -93,9 +134,6 @@ class ActorProducerTest extends TestKit(ActorSystem("test")) with WordSpec with
}
}
//TODO: write more tests for synchronous process(exchange) method
}
"asynchronous" when {
@ -284,8 +322,21 @@ trait ActorProducerFixture extends MockitoSugar with BeforeAndAfterAll with Befo
def camelWithMocks = new DefaultCamel(sys) {
override val log = mock[LoggingAdapter]
override lazy val template = mock[ProducerTemplate]
override lazy val context = mock[CamelContext]
override val settings = mock[CamelSettings]
override lazy val context = mock[DefaultCamelContext]
override val settings = new CamelSettings(ConfigFactory.parseString(
"""
akka {
camel {
jmx = off
streamingCache = on
consumer {
auto-ack = on
reply-timeout = 2s
activation-timeout = 10s
}
}
}
"""))
}
camel = camelWithMocks
@ -351,6 +402,6 @@ trait ActorProducerFixture extends MockitoSugar with BeforeAndAfterAll with Befo
def receive = {
case msg sender ! "received " + msg
}
}))
}), name = "echoActor")
}

View file

@ -10,7 +10,8 @@ import org.scalatest.matchers.MustMatchers
import scala.concurrent.util.duration._
import scala.concurrent.util.Duration
import org.scalatest.WordSpec
import org.apache.camel.{ TypeConversionException, NoTypeConversionAvailableException }
import org.apache.camel.TypeConversionException
import language.postfixOps
class DurationConverterSpec extends WordSpec with MustMatchers {
import DurationTypeConverter._

View file

@ -6,6 +6,7 @@ package docs.camel;
import akka.camel.Camel;
import akka.camel.CamelExtension;
import akka.camel.javaapi.UntypedConsumerActor;
import akka.util.Timeout;
import scala.concurrent.Future;
import scala.concurrent.util.Duration;
import scala.concurrent.util.FiniteDuration;
@ -26,14 +27,14 @@ public class ActivationTestBase {
ActorRef producer = system.actorOf(props,"myproducer");
Camel camel = CamelExtension.get(system);
// get a future reference to the activation of the endpoint of the Consumer Actor
FiniteDuration duration = Duration.create(10, SECONDS);
Future<ActorRef> activationFuture = camel.activationFutureFor(producer, duration, system.dispatcher());
Timeout timeout = new Timeout(Duration.create(10, SECONDS));
Future<ActorRef> activationFuture = camel.activationFutureFor(producer, timeout, system.dispatcher());
//#CamelActivation
//#CamelDeactivation
// ..
system.stop(producer);
// get a future reference to the deactivation of the endpoint of the Consumer Actor
Future<ActorRef> deactivationFuture = camel.deactivationFutureFor(producer, duration, system.dispatcher());
Future<ActorRef> deactivationFuture = camel.deactivationFutureFor(producer, timeout, system.dispatcher());
//#CamelDeactivation
system.shutdown();
}

View file

@ -11,7 +11,7 @@ public class Consumer4 extends UntypedConsumerActor {
private final static FiniteDuration timeout = Duration.create(500, TimeUnit.MILLISECONDS);
@Override
public Duration replyTimeout() {
public FiniteDuration replyTimeout() {
return timeout;
}

View file

@ -3,6 +3,7 @@ package docs.camel;
import akka.actor.Status;
import akka.camel.CamelMessage;
import akka.camel.javaapi.UntypedConsumerActor;
import akka.dispatch.Mapper;
import org.apache.camel.builder.Builder;
import org.apache.camel.model.ProcessorDefinition;
import org.apache.camel.model.RouteDefinition;
@ -11,6 +12,13 @@ import scala.Option;
public class ErrorThrowingConsumer extends UntypedConsumerActor{
private String uri;
private static Mapper<RouteDefinition, ProcessorDefinition<?>> mapper = new Mapper<RouteDefinition, ProcessorDefinition<?>>() {
public ProcessorDefinition<?> apply(RouteDefinition rd) {
// Catch any exception and handle it by returning the exception message as response
return rd.onException(Exception.class).handled(true).transform(Builder.exceptionMessage()).end();
}
};
public ErrorThrowingConsumer(String uri){
this.uri = uri;
}
@ -29,9 +37,8 @@ public class ErrorThrowingConsumer extends UntypedConsumerActor{
}
@Override
public ProcessorDefinition<?> onRouteDefinition(RouteDefinition rd) {
// Catch any exception and handle it by returning the exception message as response
return rd.onException(Exception.class).handled(true).transform(Builder.exceptionMessage()).end();
public Mapper<RouteDefinition, ProcessorDefinition<?>> getRouteDefinitionHandler() {
return mapper;
}
@Override

View file

@ -2,6 +2,7 @@ package docs.camel;
//#CustomRoute
import akka.actor.UntypedActor;
import akka.camel.CamelMessage;
import akka.dispatch.Mapper;
import akka.japi.Function;
public class Responder extends UntypedActor{
@ -15,7 +16,8 @@ public class Responder extends UntypedActor{
}
private CamelMessage createResponse(CamelMessage msg) {
return msg.mapBody(new Function<String,String>() {
return msg.mapBody(new Mapper<String,String>() {
@Override
public String apply(String body) {
return String.format("received %s", body);
}

View file

@ -2,6 +2,7 @@ package docs.camel;
//#TransformOutgoingMessage
import akka.camel.CamelMessage;
import akka.camel.javaapi.UntypedProducerActor;
import akka.dispatch.Mapper;
import akka.japi.Function;
public class Transformer extends UntypedProducerActor{
@ -16,7 +17,8 @@ public class Transformer extends UntypedProducerActor{
}
private CamelMessage upperCase(CamelMessage msg) {
return msg.mapBody(new Function<String,String>() {
return msg.mapBody(new Mapper<String,String>() {
@Override
public String apply(String body) {
return body.toUpperCase();
}

View file

@ -3,6 +3,7 @@ package docs.camel.sample.http;
import akka.actor.Status;
import akka.actor.UntypedActor;
import akka.camel.CamelMessage;
import akka.dispatch.Mapper;
import akka.japi.Function;
//#HttpExample
@ -10,7 +11,8 @@ public class HttpTransformer extends UntypedActor{
public void onReceive(Object message) {
if (message instanceof CamelMessage) {
CamelMessage camelMessage = (CamelMessage) message;
CamelMessage replacedMessage = camelMessage.mapBody(new Function<Object, String>(){
CamelMessage replacedMessage = camelMessage.mapBody(new Mapper<Object, String>(){
@Override
public String apply(Object body) {
String text = new String((byte[])body);
return text.replaceAll("Akka ", "AKKA ");

View file

@ -3,6 +3,7 @@ package docs.camel.sample.route;
import akka.actor.ActorRef;
import akka.actor.UntypedActor;
import akka.camel.CamelMessage;
import akka.dispatch.Mapper;
import akka.japi.Function;
public class Transformer extends UntypedActor {
@ -16,7 +17,8 @@ public class Transformer extends UntypedActor {
if (message instanceof CamelMessage) {
// example: transform message body "foo" to "- foo -" and forward result to producer
CamelMessage camelMessage = (CamelMessage) message;
CamelMessage transformedMessage = camelMessage.mapBody(new Function<String, String>(){
CamelMessage transformedMessage = camelMessage.mapBody(new Mapper<String, String>(){
@Override
public String apply(String body) {
return String.format("- %s -",body);
}

View file

@ -48,10 +48,7 @@ object CustomRoute {
def receive = {
case msg: CamelMessage throw new Exception("error: %s" format msg.body)
}
override def onRouteDefinition(rd: RouteDefinition) = {
// Catch any exception and handle it by returning the exception message as response
rd.onException(classOf[Exception]).handled(true).transform(Builder.exceptionMessage).end
}
override def onRouteDefinition = (rd) rd.onException(classOf[Exception]).handled(true).transform(Builder.exceptionMessage).end
final override def preRestart(reason: Throwable, message: Option[Any]) {
sender ! Failure(reason)

View file

@ -21,7 +21,7 @@ object HttpExample {
def endpointUri = "jetty://http://akka.io/?bridgeEndpoint=true"
override def transformOutgoingMessage(msg: Any) = msg match {
case msg: CamelMessage msg.addHeaders(msg.headers(Set(Exchange.HTTP_PATH)))
case msg: CamelMessage msg.copy(headers = msg.headers ++ msg.headers(Set(Exchange.HTTP_PATH)))
}
override def routeResponse(msg: Any) { transformer forward msg }

View file

@ -639,7 +639,7 @@ object Dependencies {
val kernel = Seq(Test.scalatest, Test.junit)
val camel = Seq(camelCore, Test.scalatest, Test.junit, Test.mockito)
val camel = Seq(camelCore, Test.scalatest, Test.junit, Test.mockito, Test.logback, Test.commonsIo)
val camelSample = Seq(CamelSample.camelJetty)
@ -655,10 +655,12 @@ object Dependencies {
val multiNodeSample = Seq(Test.scalatest)
}
object V {
val Camel = "2.10.0"
}
object Dependency {
// Compile
val camelCore = "org.apache.camel" % "camel-core" % "2.10.0" exclude("org.slf4j", "slf4j-api") // ApacheV2
val camelCore = "org.apache.camel" % "camel-core" % V.Camel exclude("org.slf4j", "slf4j-api") // ApacheV2
val config = "com.typesafe" % "config" % "0.5.2" // ApacheV2
val netty = "io.netty" % "netty" % "3.5.4.Final" // ApacheV2
val protobuf = "com.google.protobuf" % "protobuf-java" % "2.4.1" // New BSD
@ -689,7 +691,7 @@ object Dependency {
// Camel Sample
object CamelSample {
val camelJetty = "org.apache.camel" % "camel-jetty" % "2.10.0" // ApacheV2
val camelJetty = "org.apache.camel" % "camel-jetty" % V.Camel // ApacheV2
}
}