diff --git a/akka-camel/src/main/resources/reference.conf b/akka-camel/src/main/resources/reference.conf
index 76be54088d..302a31b754 100644
--- a/akka-camel/src/main/resources/reference.conf
+++ b/akka-camel/src/main/resources/reference.conf
@@ -9,7 +9,8 @@ akka {
camel {
# Whether JMX should be enabled or disabled for the Camel Context
jmx = off
-
+ # enable/disable streaming cache on the Camel Context
+ streamingCache = on
consumer {
# Configured setting which determines whether one-way communications between an endpoint and this consumer actor
# should be auto-acknowledged or application-acknowledged.
diff --git a/akka-camel/src/main/scala/akka/camel/Activation.scala b/akka-camel/src/main/scala/akka/camel/Activation.scala
index 4af600215c..a12abc7d0c 100644
--- a/akka-camel/src/main/scala/akka/camel/Activation.scala
+++ b/akka-camel/src/main/scala/akka/camel/Activation.scala
@@ -10,16 +10,12 @@ import akka.actor.{ ActorSystem, Props, ActorRef }
import akka.pattern._
import scala.concurrent.util.Duration
import concurrent.{ ExecutionContext, Future }
-import scala.concurrent.util.FiniteDuration
/**
* Activation trait that can be used to wait on activation or de-activation of Camel endpoints.
* The Camel endpoints are activated asynchronously. This trait can signal when an endpoint is activated or de-activated.
*/
trait Activation {
- def system: ActorSystem //FIXME Why is this here, what's it needed for and who should use it?
-
- private val activationTracker = system.actorOf(Props[ActivationTracker], "camelActivationTracker") //FIXME Why is this also top level?
/**
* Produces a Future with the specified endpoint that will be completed when the endpoint has been activated,
@@ -28,11 +24,7 @@ trait Activation {
* @param endpoint the endpoint to be activated
* @param timeout the timeout for the Future
*/
- def activationFutureFor(endpoint: ActorRef)(implicit timeout: FiniteDuration, executor: ExecutionContext): Future[ActorRef] =
- (activationTracker.ask(AwaitActivation(endpoint))(Timeout(timeout))).map[ActorRef]({
- case EndpointActivated(`endpoint`) ⇒ endpoint
- case EndpointFailedToActivate(`endpoint`, cause) ⇒ throw cause
- })
+ def activationFutureFor(endpoint: ActorRef)(implicit timeout: Timeout, executor: ExecutionContext): Future[ActorRef]
/**
* Produces a Future which will be completed when the given endpoint has been deactivated or
@@ -41,9 +33,5 @@ trait Activation {
* @param endpoint the endpoint to be deactivated
* @param timeout the timeout of the Future
*/
- def deactivationFutureFor(endpoint: ActorRef)(implicit timeout: FiniteDuration, executor: ExecutionContext): Future[ActorRef] =
- (activationTracker.ask(AwaitDeActivation(endpoint))(Timeout(timeout))).map[ActorRef]({
- case EndpointDeActivated(`endpoint`) ⇒ endpoint
- case EndpointFailedToDeActivate(`endpoint`, cause) ⇒ throw cause
- })
+ def deactivationFutureFor(endpoint: ActorRef)(implicit timeout: Timeout, executor: ExecutionContext): Future[ActorRef]
}
\ No newline at end of file
diff --git a/akka-camel/src/main/scala/akka/camel/Camel.scala b/akka-camel/src/main/scala/akka/camel/Camel.scala
index 258c7ec2ad..43682c49c5 100644
--- a/akka-camel/src/main/scala/akka/camel/Camel.scala
+++ b/akka-camel/src/main/scala/akka/camel/Camel.scala
@@ -6,7 +6,8 @@ package akka.camel
import internal._
import akka.actor._
-import org.apache.camel.{ ProducerTemplate, CamelContext }
+import org.apache.camel.ProducerTemplate
+import org.apache.camel.impl.DefaultCamelContext
import com.typesafe.config.Config
import scala.concurrent.util.Duration
import java.util.concurrent.TimeUnit._
@@ -17,16 +18,16 @@ import scala.concurrent.util.FiniteDuration
* '''Note:''' `CamelContext` and `ProducerTemplate` are stopped when the associated actor system is shut down.
* This trait can be obtained through the [[akka.camel.CamelExtension]] object.
*/
-trait Camel extends ConsumerRegistry with ProducerRegistry with Extension with Activation {
+trait Camel extends Extension with Activation {
/**
* Underlying camel context.
*
* It can be used to configure camel manually, i.e. when the user wants to add new routes or endpoints,
* i.e. {{{camel.context.addRoutes(...)}}}
*
- * @see [[org.apache.camel.CamelContext]]
+ * @see [[org.apache.camel.impl.DefaultCamelContext]]
*/
- def context: CamelContext
+ def context: DefaultCamelContext
/**
* The Camel ProducerTemplate.
@@ -38,6 +39,16 @@ trait Camel extends ConsumerRegistry with ProducerRegistry with Extension with A
* The settings for the CamelExtension
*/
def settings: CamelSettings
+
+ /**
+ * For internal use only. Returns the camel supervisor actor.
+ */
+ private[camel] def supervisor: ActorRef
+
+ /**
+ * For internal use only. Returns the associated ActorSystem.
+ */
+ private[camel] def system: ActorSystem
}
/**
@@ -68,6 +79,12 @@ class CamelSettings private[camel] (config: Config) {
*
*/
final val jmxStatistics: Boolean = config.getBoolean("akka.camel.jmx")
+
+ /**
+ * enables or disables streamingCache on the Camel Context
+ */
+ final val streamingCache: Boolean = config.getBoolean("akka.camel.streamingCache")
+
}
/**
@@ -97,4 +114,4 @@ object CamelExtension extends ExtensionId[Camel] with ExtensionIdProvider {
override def lookup(): ExtensionId[Camel] = CamelExtension
override def get(system: ActorSystem): Camel = super.get(system)
-}
\ No newline at end of file
+}
diff --git a/akka-camel/src/main/scala/akka/camel/CamelMessage.scala b/akka-camel/src/main/scala/akka/camel/CamelMessage.scala
index 6c45a66bf4..3e0f0734b0 100644
--- a/akka-camel/src/main/scala/akka/camel/CamelMessage.scala
+++ b/akka-camel/src/main/scala/akka/camel/CamelMessage.scala
@@ -8,14 +8,14 @@ import java.util.{ Map ⇒ JMap, Set ⇒ JSet }
import scala.collection.JavaConversions._
-import akka.japi.{ Function ⇒ JFunction }
import org.apache.camel.{ CamelContext, Message ⇒ JCamelMessage }
import akka.AkkaException
import scala.reflect.ClassTag
+import akka.dispatch.Mapper
+import util.{ Success, Failure, Try }
/**
* An immutable representation of a Camel message.
- *
* @author Martin Krasser
*/
case class CamelMessage(body: Any, headers: Map[String, Any]) {
@@ -47,86 +47,59 @@ case class CamelMessage(body: Any, headers: Map[String, Any]) {
*/
def getHeaders: JMap[String, Any] = headers
- /**
- * Returns the header with given name. Throws NoSuchElementException
- * if the header doesn't exist.
- */
- def header(name: String): Option[Any] = headers.get(name)
-
- /**
- * Returns the header with given name. Throws NoSuchElementException
- * if the header doesn't exist.
- *
- * Java API
- */
- def getHeader(name: String): Any = headers(name)
-
- /**
- * Creates a CamelMessage with a transformed body using a transformer function.
- */
- def mapBody[A, B](transformer: A ⇒ B): CamelMessage = withBody(transformer(body.asInstanceOf[A]))
-
- /**
- * Creates a CamelMessage with a transformed body using a transformer function.
- *
- * Java API
- */
- def mapBody[A, B](transformer: JFunction[A, B]): CamelMessage = withBody(transformer(body.asInstanceOf[A]))
-
- /**
- * Creates a CamelMessage with a given body.
- */
- def withBody(body: Any): CamelMessage = CamelMessage(body, this.headers)
-
- /**
- * Creates a new CamelMessage with given headers.
- */
- def withHeaders[A](headers: Map[String, A]): CamelMessage = copy(this.body, headers)
-
/**
* Creates a new CamelMessage with given headers. A copy of the headers map is made.
*
* Java API
*/
- def withHeaders[A](headers: JMap[String, A]): CamelMessage = withHeaders(headers.toMap)
+ def withHeaders[A](headers: JMap[String, A]): CamelMessage = copy(this.body, headers.toMap)
/**
- * Creates a new CamelMessage with given headers added to the current headers.
- */
- def addHeaders[A](headers: Map[String, A]): CamelMessage = copy(this.body, this.headers ++ headers)
-
- /**
- * Creates a new CamelMessage with given headers added to the current headers.
- * A copy of the headers map is made.
+ * Returns the header by given name parameter in a [[scala.util.Try]]. The header is converted to type T, which is returned
+ * in a [[scala.util.Success]]. If an exception occurs during the conversion to the type T or when the header cannot be found,
+ * the exception is returned in a [[scala.util.Failure]].
+ *
*
- * Java API
+ * The CamelContext is accessible in a [[akka.camel.javaapi.UntypedConsumerActor]] and [[akka.camel.javaapi.UntypedProducerActor]]
+ * using the `getCamelContext` method, and is available on the [[akka.camel.CamelExtension]].
+ *
*/
- def addHeaders[A](headers: JMap[String, A]): CamelMessage = addHeaders(headers.toMap)
-
- /**
- * Creates a new CamelMessage with the given header added to the current headers.
- */
- def addHeader(header: (String, Any)): CamelMessage = copy(this.body, this.headers + header)
-
- /**
- * Creates a new CamelMessage with the given header, represented by name and
- * value added to the existing headers.
- *
- * Java API
- */
- def addHeader(name: String, value: Any): CamelMessage = addHeader((name, value))
-
- /**
- * Creates a new CamelMessage where the header with given headerName is removed from
- * the existing headers.
- */
- def withoutHeader(headerName: String): CamelMessage = copy(this.body, this.headers - headerName)
-
- def copyContentTo(to: JCamelMessage): Unit = {
- to.setBody(this.body)
- for ((name, value) ← this.headers) to.getHeaders.put(name, value.asInstanceOf[AnyRef])
+ def headerAs[T](name: String)(implicit t: ClassTag[T], camelContext: CamelContext): Try[T] = {
+ def tryHeader = headers.get(name).map(camelContext.getTypeConverter.mandatoryConvertTo[T](t.runtimeClass.asInstanceOf[Class[T]], _)) match {
+ case Some(header) ⇒ header
+ case None ⇒ throw new NoSuchElementException(name)
+ }
+ Try(tryHeader)
}
+ /**
+ * Returns the header by given name parameter. The header is converted to type T as defined by the clazz parameter.
+ * An exception is thrown when the conversion to the type T fails or when the header cannot be found.
+ *
+ * The CamelContext is accessible in a [[akka.camel.javaapi.UntypedConsumerActor]] and [[akka.camel.javaapi.UntypedProducerActor]]
+ * using the `getCamelContext` method, and is available on the [[akka.camel.CamelExtension]].
+ *
+ * Java API
+ */
+ def getHeaderAs[T](name: String, clazz: Class[T], camelContext: CamelContext): T = headerAs[T](name)(ClassTag(clazz), camelContext) match {
+ case Success(header) ⇒ header
+ case Failure(t) ⇒ throw t
+ }
+
+ /**
+ * Returns a new CamelMessage with a transformed body using a transformer function.
+ * This method will throw a [[java.lang.ClassCastException]] if the body cannot be mapped to type A.
+ */
+ def mapBody[A, B](transformer: A ⇒ B): CamelMessage = copy(body = transformer(body.asInstanceOf[A]))
+
+ /**
+ * Returns a new CamelMessage with a transformed body using a transformer function.
+ * This method will throw a [[java.lang.ClassCastException]] if the body cannot be mapped to type A.
+ *
+ * Java API
+ */
+ def mapBody[A, B](transformer: Mapper[A, B]): CamelMessage = copy(body = transformer(body.asInstanceOf[A]))
+
/**
* Returns the body of the message converted to the type T. Conversion is done
* using Camel's type converter. The type converter is obtained from the CamelContext that is passed in.
@@ -148,6 +121,12 @@ case class CamelMessage(body: Any, headers: Map[String, Any]) {
*/
def getBodyAs[T](clazz: Class[T], camelContext: CamelContext): T = camelContext.getTypeConverter.mandatoryConvertTo[T](clazz, body)
+ /**
+ * Returns a new CamelMessage with a new body, while keeping the same headers.
+ *
+ * Java API
+ */
+ def withBody[T](body: T) = this.copy(body = body)
/**
* Creates a CamelMessage with current body converted to type T.
* The CamelContext is accessible in a [[akka.camel.javaapi.UntypedConsumerActor]] and [[akka.camel.javaapi.UntypedProducerActor]]
@@ -163,28 +142,7 @@ case class CamelMessage(body: Any, headers: Map[String, Any]) {
*
* Java API
*/
- def withBodyAs[T](clazz: Class[T])(implicit camelContext: CamelContext): CamelMessage = withBody(getBodyAs(clazz, camelContext))
-
- /**
- * Returns the header with given name converted to type T. Throws
- * NoSuchElementException if the header doesn't exist.
- *
- * The CamelContext is accessible in a [[akka.camel.javaapi.UntypedConsumerActor]] and [[akka.camel.javaapi.UntypedProducerActor]]
- * using the `getCamelContext` method, and is available on the [[akka.camel.CamelExtension]].
- *
- */
- def headerAs[T](name: String)(implicit t: ClassTag[T], camelContext: CamelContext): Option[T] = header(name).map(camelContext.getTypeConverter.mandatoryConvertTo[T](t.runtimeClass.asInstanceOf[Class[T]], _))
-
- /**
- * Returns the header with given name converted to type as given by the clazz
- * parameter. Throws NoSuchElementException if the header doesn't exist.
- *
- * The CamelContext is accessible in a [[akka.camel.javaapi.UntypedConsumerActor]] and [[akka.camel.javaapi.UntypedProducerActor]]
- * using the `getCamelContext` method, and is available on the [[akka.camel.CamelExtension]].
- *
- * Java API
- */
- def getHeaderAs[T](name: String, clazz: Class[T], camelContext: CamelContext): T = headerAs[T](name)(ClassTag(clazz), camelContext).get
+ def withBodyAs[T](clazz: Class[T])(implicit camelContext: CamelContext): CamelMessage = copy(body = getBodyAs(clazz, camelContext))
}
@@ -208,24 +166,28 @@ object CamelMessage {
* CamelMessage then msg is returned, otherwise msg is set as body of a
* newly created CamelMessage object.
*/
- def canonicalize(msg: Any) = msg match {
+ private[camel] def canonicalize(msg: Any) = msg match {
case mobj: CamelMessage ⇒ mobj
case body ⇒ CamelMessage(body, Map.empty)
}
- /**
- * Creates a new CamelMessage object from the Camel message.
- */
- def from(camelMessage: JCamelMessage): CamelMessage = from(camelMessage, Map.empty)
-
/**
* Creates a new CamelMessage object from the Camel message.
*
* @param headers additional headers to set on the created CamelMessage in addition to those
* in the Camel message.
*/
- def from(camelMessage: JCamelMessage, headers: Map[String, Any]): CamelMessage = CamelMessage(camelMessage.getBody, headers ++ camelMessage.getHeaders)
+ private[camel] def from(camelMessage: JCamelMessage, headers: Map[String, Any]): CamelMessage = CamelMessage(camelMessage.getBody, headers ++ camelMessage.getHeaders)
+ /**
+ * For internal use only.
+ * copies the content of this CamelMessage to an Apache Camel Message.
+ *
+ */
+ private[camel] def copyContent(from: CamelMessage, to: JCamelMessage): Unit = {
+ to.setBody(from.body)
+ for ((name, value) ← from.headers) to.getHeaders.put(name, value.asInstanceOf[AnyRef])
+ }
}
/**
@@ -242,7 +204,6 @@ case object Ack {
* An exception indicating that the exchange to the camel endpoint failed.
* It contains the failure cause obtained from Exchange.getException and the headers from either the Exchange.getIn
* message or Exchange.getOut message, depending on the exchange pattern.
- *
*/
class AkkaCamelException private[akka] (cause: Throwable, val headers: Map[String, Any])
extends AkkaException(cause.getMessage, cause) {
diff --git a/akka-camel/src/main/scala/akka/camel/Consumer.scala b/akka-camel/src/main/scala/akka/camel/Consumer.scala
index 89f7719e23..61ba9a8cc4 100644
--- a/akka-camel/src/main/scala/akka/camel/Consumer.scala
+++ b/akka-camel/src/main/scala/akka/camel/Consumer.scala
@@ -4,20 +4,19 @@
package akka.camel
-import language.postfixOps
-import internal.component.DurationTypeConverter
+import akka.camel.internal.CamelSupervisor.Register
import org.apache.camel.model.{ RouteDefinition, ProcessorDefinition }
import akka.actor._
-import scala.concurrent.util.Duration
-import scala.concurrent.util.duration._
import scala.concurrent.util.FiniteDuration
+import akka.dispatch.Mapper
/**
* Mixed in by Actor implementations that consume message from Camel endpoints.
*
* @author Martin Krasser
*/
-trait Consumer extends Actor with CamelSupport with ConsumerConfig {
+trait Consumer extends Actor with CamelSupport {
+ import Consumer._
/**
* Must return the Camel endpoint URI that the consumer wants to consume messages from.
*/
@@ -33,11 +32,13 @@ trait Consumer extends Actor with CamelSupport with ConsumerConfig {
// with order of execution of trait body in the Java version (UntypedConsumerActor)
// where getEndpointUri is called before its constructor (where a uri is set to return from getEndpointUri)
// and remains null. CustomRouteTest provides a test to verify this.
- camel.registerConsumer(endpointUri, this, activationTimeout)
+ register()
+ }
+
+ private[this] def register() {
+ camel.supervisor ! Register(self, endpointUri, Some(ConsumerConfig(activationTimeout, replyTimeout, autoAck, onRouteDefinition)))
}
-}
-trait ConsumerConfig { this: CamelSupport ⇒
/**
* How long the actor should wait for activation before it fails.
*/
@@ -48,7 +49,7 @@ trait ConsumerConfig { this: CamelSupport ⇒
* the endpoint can take to send the response before the message exchange fails. It defaults to 1 minute.
* This setting is used for out-capable, in-only, manually acknowledged communication.
*/
- def replyTimeout: Duration = camel.settings.replyTimeout
+ def replyTimeout: FiniteDuration = camel.settings.replyTimeout
/**
* Determines whether one-way communications between an endpoint and this consumer actor
@@ -58,9 +59,36 @@ trait ConsumerConfig { this: CamelSupport ⇒
def autoAck: Boolean = camel.settings.autoAck
/**
- * The route definition handler for creating a custom route to this consumer instance.
+ * Returns the route definition handler for creating a custom route to this consumer.
+ * By default it returns an identity function, override this method to
+ * return a custom route definition handler. The returned function is not allowed to close over 'this', meaning it is
+ * not allowed to refer to the actor instance itself, since that can easily cause concurrent shared state issues.
*/
- //FIXME: write a test confirming onRouteDefinition gets called
- def onRouteDefinition(rd: RouteDefinition): ProcessorDefinition[_] = rd
+ def onRouteDefinition: RouteDefinition ⇒ ProcessorDefinition[_] = {
+ val mapper = getRouteDefinitionHandler
+ if (mapper != identityRouteMapper) mapper.apply _
+ else identityRouteMapper
+ }
+ /**
+ * Java API. Returns the [[akka.dispatch.Mapper]] function that will be used as a route definition handler
+ * for creating custom route to this consumer. By default it returns an identity function, override this method to
+ * return a custom route definition handler. The [[akka.dispatch.Mapper]] is not allowed to close over 'this', meaning it is
+ * not allowed to refer to the actor instance itself, since that can easily cause concurrent shared state issues.
+ */
+ def getRouteDefinitionHandler: Mapper[RouteDefinition, ProcessorDefinition[_]] = identityRouteMapper
}
+
+/**
+ * Internal use only.
+ */
+private[camel] object Consumer {
+ val identityRouteMapper = new Mapper[RouteDefinition, ProcessorDefinition[_]]() {
+ override def checkedApply(rd: RouteDefinition): ProcessorDefinition[_] = rd
+ }
+}
+/**
+ * For internal use only.
+ * Captures the configuration of the Consumer.
+ */
+private[camel] case class ConsumerConfig(activationTimeout: FiniteDuration, replyTimeout: FiniteDuration, autoAck: Boolean, onRouteDefinition: RouteDefinition ⇒ ProcessorDefinition[_]) extends NoSerializationVerificationNeeded
diff --git a/akka-camel/src/main/scala/akka/camel/Producer.scala b/akka-camel/src/main/scala/akka/camel/Producer.scala
index c6e8c6714d..683ff4f20f 100644
--- a/akka-camel/src/main/scala/akka/camel/Producer.scala
+++ b/akka-camel/src/main/scala/akka/camel/Producer.scala
@@ -4,10 +4,11 @@
package akka.camel
-import akka.actor.Actor
+import akka.actor.{ Props, NoSerializationVerificationNeeded, ActorRef, Actor }
+import internal.CamelSupervisor.{ CamelProducerObjects, Register }
import internal.CamelExchangeAdapter
import akka.actor.Status.Failure
-import org.apache.camel.{ Endpoint, Exchange, ExchangePattern, AsyncCallback }
+import org.apache.camel.{ Endpoint, ExchangePattern, AsyncCallback }
import org.apache.camel.processor.SendProcessor
/**
@@ -15,9 +16,16 @@ import org.apache.camel.processor.SendProcessor
*
* @author Martin Krasser
*/
-trait ProducerSupport extends CamelSupport { this: Actor ⇒
+trait ProducerSupport extends Actor with CamelSupport {
+ private[this] var messages = Map[ActorRef, Any]()
+ private[this] var producerChild: Option[ActorRef] = None
- protected[this] lazy val (endpoint: Endpoint, processor: SendProcessor) = camel.registerProducer(self, endpointUri)
+ override def preStart() {
+ super.preStart()
+ register()
+ }
+
+ private[this] def register() { camel.supervisor ! Register(self, endpointUri) }
/**
* CamelMessage headers to copy by default from request message to response-message.
@@ -43,40 +51,6 @@ trait ProducerSupport extends CamelSupport { this: Actor ⇒
*/
def headersToCopy: Set[String] = headersToCopyDefault
- /**
- * Initiates a message exchange of given pattern with the endpoint specified by
- * endpointUri. The in-message of the initiated exchange is the canonical form
- * of msg. After sending the in-message, the processing result (response) is passed
- * as argument to receiveAfterProduce. If the response is received synchronously from
- * the endpoint then receiveAfterProduce is called synchronously as well. If the
- * response is received asynchronously, the receiveAfterProduce is called
- * asynchronously. The original
- * sender and senderFuture are preserved.
- *
- * @see CamelMessage#canonicalize(Any)
- *
- * @param msg message to produce
- * @param pattern exchange pattern
- */
- protected def produce(msg: Any, pattern: ExchangePattern): Unit = {
- // Need copies of sender reference here since the callback could be done
- // later by another thread.
- val producer = self
- val originalSender = sender
-
- val cmsg = CamelMessage.canonicalize(msg)
- val xchg = new CamelExchangeAdapter(endpoint.createExchange(pattern))
-
- xchg.setRequest(cmsg)
-
- processor.process(xchg.exchange, new AsyncCallback {
- // Ignoring doneSync, sending back async uniformly.
- def done(doneSync: Boolean): Unit = producer.tell(
- if (xchg.exchange.isFailed) xchg.toFailureResult(cmsg.headers(headersToCopy))
- else MessageResult(xchg.toResponseMessage(cmsg.headers(headersToCopy))), originalSender)
- })
- }
-
/**
* Produces msg to the endpoint specified by endpointUri. Before the message is
* actually sent it is pre-processed by calling receiveBeforeProduce. If oneway
@@ -85,12 +59,28 @@ trait ProducerSupport extends CamelSupport { this: Actor ⇒
* @see Producer#produce(Any, ExchangePattern)
*/
protected def produce: Receive = {
+ case CamelProducerObjects(endpoint, processor) ⇒
+ if (producerChild.isEmpty) {
+ producerChild = Some(context.actorOf(Props(new ProducerChild(endpoint, processor))))
+ messages = {
+ for (
+ child ← producerChild;
+ (sender, msg) ← messages
+ ) child.tell(msg, sender)
+ Map()
+ }
+ }
case res: MessageResult ⇒ routeResponse(res.message)
case res: FailureResult ⇒
val e = new AkkaCamelException(res.cause, res.headers)
routeResponse(Failure(e))
throw e
- case msg ⇒ produce(transformOutgoingMessage(msg), if (oneway) ExchangePattern.InOnly else ExchangePattern.InOut)
+
+ case msg ⇒
+ producerChild match {
+ case Some(child) ⇒ child forward msg
+ case None ⇒ messages += (sender -> msg)
+ }
}
/**
@@ -117,8 +107,45 @@ trait ProducerSupport extends CamelSupport { this: Actor ⇒
protected def routeResponse(msg: Any): Unit = if (!oneway) sender ! transformResponse(msg)
-}
+ private class ProducerChild(endpoint: Endpoint, processor: SendProcessor) extends Actor {
+ def receive = {
+ case msg @ (_: FailureResult | _: MessageResult) ⇒ context.parent forward msg
+ case msg ⇒ produce(endpoint, processor, transformOutgoingMessage(msg), if (oneway) ExchangePattern.InOnly else ExchangePattern.InOut)
+ }
+ /**
+ * Initiates a message exchange of given pattern with the endpoint specified by
+ * endpointUri. The in-message of the initiated exchange is the canonical form
+ * of msg. After sending the in-message, the processing result (response) is passed
+ * as argument to receiveAfterProduce. If the response is received synchronously from
+ * the endpoint then receiveAfterProduce is called synchronously as well. If the
+ * response is received asynchronously, the receiveAfterProduce is called
+ * asynchronously. The original
+ * sender and senderFuture are preserved.
+ *
+ * @see CamelMessage#canonicalize(Any)
+ * @param endpoint the endpoint
+ * @param processor the processor
+ * @param msg message to produce
+ * @param pattern exchange pattern
+ */
+ protected def produce(endpoint: Endpoint, processor: SendProcessor, msg: Any, pattern: ExchangePattern): Unit = {
+ // Need copies of sender reference here since the callback could be done
+ // later by another thread.
+ val producer = self
+ val originalSender = sender
+ val xchg = new CamelExchangeAdapter(endpoint.createExchange(pattern))
+ val cmsg = CamelMessage.canonicalize(msg)
+ xchg.setRequest(cmsg)
+ processor.process(xchg.exchange, new AsyncCallback {
+ // Ignoring doneSync, sending back async uniformly.
+ def done(doneSync: Boolean): Unit = producer.tell(
+ if (xchg.exchange.isFailed) xchg.toFailureResult(cmsg.headers(headersToCopy))
+ else MessageResult(xchg.toResponseMessage(cmsg.headers(headersToCopy))), originalSender)
+ })
+ }
+ }
+}
/**
* Mixed in by Actor implementations to produce messages to Camel endpoints.
*/
@@ -132,14 +159,16 @@ trait Producer extends ProducerSupport { this: Actor ⇒
}
/**
+ * For internal use only.
* @author Martin Krasser
*/
-private case class MessageResult(message: CamelMessage)
+private case class MessageResult(message: CamelMessage) extends NoSerializationVerificationNeeded
/**
+ * For internal use only.
* @author Martin Krasser
*/
-private case class FailureResult(cause: Throwable, headers: Map[String, Any] = Map.empty)
+private case class FailureResult(cause: Throwable, headers: Map[String, Any] = Map.empty) extends NoSerializationVerificationNeeded
/**
* A one-way producer.
diff --git a/akka-camel/src/main/scala/akka/camel/internal/ActivationMessage.scala b/akka-camel/src/main/scala/akka/camel/internal/ActivationMessage.scala
index bdd915ff70..7b27dbc789 100644
--- a/akka-camel/src/main/scala/akka/camel/internal/ActivationMessage.scala
+++ b/akka-camel/src/main/scala/akka/camel/internal/ActivationMessage.scala
@@ -6,53 +6,64 @@ package akka.camel.internal
import akka.actor.ActorRef
-/**
- * Super class of all activation messages. Registration of the Camel [[akka.camel.Consumer]]s and [[akka.camel.Producer]]s
- * is done asynchronously. Activation messages are sent in the Camel extension when endpoints are
- * activated, de-activated, failed to activate and failed to de-activate.
- * You can use the [[akka.camel.Activation]] trait which is available on [[akka.camel.Camel]]
- * to await activation or de-activation of endpoints.
- */
-private[camel] abstract class ActivationMessage(val actor: ActorRef)
+private[camel] object ActivationProtocol {
+ /**
+ * Super class of all activation messages. Registration of the Camel [[akka.camel.Consumer]]s and [[akka.camel.Producer]]s
+ * is done asynchronously. Activation messages are sent in the Camel extension when endpoints are
+ * activated, de-activated, failed to activate and failed to de-activate.
+ * You can use the [[akka.camel.Activation]] trait which is available on [[akka.camel.Camel]]
+ * to await activation or de-activation of endpoints.
+ */
+ @SerialVersionUID(1L)
+ private[camel] abstract class ActivationMessage(val actor: ActorRef) extends Serializable
-/**
- * For internal use only. companion object of ActivationMessage
- *
- */
-private[camel] object ActivationMessage {
- def unapply(msg: ActivationMessage): Option[ActorRef] = Option(msg.actor)
-}
+ /**
+ * For internal use only. companion object of ActivationMessage
+ *
+ */
+ private[camel] object ActivationMessage {
+ def unapply(msg: ActivationMessage): Option[ActorRef] = Option(msg.actor)
+ }
-/**
- * Event message indicating that a single endpoint has been activated.
- * You can use the [[akka.camel.Activation]] trait which is available on [[akka.camel.Camel]]
- * to await activation or de-activation of endpoints.
- * @param actorRef the endpoint that was activated
- */
-private[camel] sealed case class EndpointActivated(actorRef: ActorRef) extends ActivationMessage(actorRef)
+ /**
+ * For internal use only.
+ * Event message indicating that a single endpoint has been activated.
+ * You can use the [[akka.camel.Activation]] trait which is available on [[akka.camel.Camel]]
+ * to await activation or de-activation of endpoints.
+ * @param actorRef the endpoint that was activated
+ */
+ @SerialVersionUID(1L)
+ final case class EndpointActivated(actorRef: ActorRef) extends ActivationMessage(actorRef)
-/**
- * Event message indicating that a single endpoint failed to activate.
- * You can use the [[akka.camel.Activation]] trait which is available on [[akka.camel.Camel]]
- * to await activation or de-activation of endpoints.
- * @param actorRef the endpoint that failed to activate
- * @param cause the cause for failure
- */
-private[camel] sealed case class EndpointFailedToActivate(actorRef: ActorRef, cause: Throwable) extends ActivationMessage(actorRef)
+ /**
+ * For internal use only.
+ * Event message indicating that a single endpoint failed to activate.
+ * You can use the [[akka.camel.Activation]] trait which is available on [[akka.camel.Camel]]
+ * to await activation or de-activation of endpoints.
+ * @param actorRef the endpoint that failed to activate
+ * @param cause the cause for failure
+ */
+ @SerialVersionUID(1L)
+ final case class EndpointFailedToActivate(actorRef: ActorRef, cause: Throwable) extends ActivationMessage(actorRef)
-/**
- * Event message indicating that a single endpoint was de-activated.
- * You can use the [[akka.camel.Activation]] trait which is available on [[akka.camel.Camel]]
- * to await activation or de-activation of endpoints.
- * @param actorRef the endpoint that was de-activated
- */
-private[camel] sealed case class EndpointDeActivated(actorRef: ActorRef) extends ActivationMessage(actorRef)
+ /**
+ * For internal use only.
+ * Event message indicating that a single endpoint was de-activated.
+ * You can use the [[akka.camel.Activation]] trait which is available on [[akka.camel.Camel]]
+ * to await activation or de-activation of endpoints.
+ * @param actorRef the endpoint that was de-activated
+ */
+ @SerialVersionUID(1L)
+ final case class EndpointDeActivated(actorRef: ActorRef) extends ActivationMessage(actorRef)
-/**
- * Event message indicating that a single endpoint failed to de-activate.
- * You can use the [[akka.camel.Activation]] trait which is available on [[akka.camel.Camel]]
- * to await activation or de-activation of endpoints.
- * @param actorRef the endpoint that failed to de-activate
- * @param cause the cause for failure
- */
-private[camel] sealed case class EndpointFailedToDeActivate(actorRef: ActorRef, cause: Throwable) extends ActivationMessage(actorRef)
+ /**
+ * For internal use only.
+ * Event message indicating that a single endpoint failed to de-activate.
+ * You can use the [[akka.camel.Activation]] trait which is available on [[akka.camel.Camel]]
+ * to await activation or de-activation of endpoints.
+ * @param actorRef the endpoint that failed to de-activate
+ * @param cause the cause for failure
+ */
+ @SerialVersionUID(1L)
+ final case class EndpointFailedToDeActivate(actorRef: ActorRef, cause: Throwable) extends ActivationMessage(actorRef)
+}
\ No newline at end of file
diff --git a/akka-camel/src/main/scala/akka/camel/internal/ActivationTracker.scala b/akka-camel/src/main/scala/akka/camel/internal/ActivationTracker.scala
index f5a87eff25..43ca2701c6 100644
--- a/akka-camel/src/main/scala/akka/camel/internal/ActivationTracker.scala
+++ b/akka-camel/src/main/scala/akka/camel/internal/ActivationTracker.scala
@@ -7,6 +7,7 @@ package akka.camel.internal
import akka.actor._
import collection.mutable.WeakHashMap
import akka.camel._
+import internal.ActivationProtocol._
/**
* For internal use only. An actor that tracks activation and de-activation of endpoints.
@@ -93,11 +94,6 @@ private[akka] final class ActivationTracker extends Actor with ActorLogging {
}
- /**
- * Subscribes self to messages of type ActivationMessage
- */
- override def preStart(): Unit = context.system.eventStream.subscribe(self, classOf[ActivationMessage])
-
override def receive = {
case msg @ ActivationMessage(ref) ⇒
(activations.getOrElseUpdate(ref, new ActivationStateMachine).receive orElse logStateWarning(ref))(msg)
diff --git a/akka-camel/src/main/scala/akka/camel/internal/CamelExchangeAdapter.scala b/akka-camel/src/main/scala/akka/camel/internal/CamelExchangeAdapter.scala
index 5de9eb447d..5750856b37 100644
--- a/akka-camel/src/main/scala/akka/camel/internal/CamelExchangeAdapter.scala
+++ b/akka-camel/src/main/scala/akka/camel/internal/CamelExchangeAdapter.scala
@@ -30,14 +30,14 @@ private[camel] class CamelExchangeAdapter(val exchange: Exchange) {
/**
* Sets Exchange.getIn from the given CamelMessage object.
*/
- def setRequest(msg: CamelMessage): Unit = msg.copyContentTo(request)
+ def setRequest(msg: CamelMessage): Unit = CamelMessage.copyContent(msg, request)
/**
* Depending on the exchange pattern, sets Exchange.getIn or Exchange.getOut from the given
* CamelMessage object. If the exchange is out-capable then the Exchange.getOut is set, otherwise
* Exchange.getIn.
*/
- def setResponse(msg: CamelMessage): Unit = msg.copyContentTo(response)
+ def setResponse(msg: CamelMessage): Unit = CamelMessage.copyContent(msg, response)
/**
* Sets Exchange.getException from the given FailureResult message. Headers of the FailureResult message
diff --git a/akka-camel/src/main/scala/akka/camel/internal/CamelSupervisor.scala b/akka-camel/src/main/scala/akka/camel/internal/CamelSupervisor.scala
new file mode 100644
index 0000000000..1a7fff6f92
--- /dev/null
+++ b/akka-camel/src/main/scala/akka/camel/internal/CamelSupervisor.scala
@@ -0,0 +1,197 @@
+package akka.camel.internal
+
+import akka.actor._
+import akka.camel.{ CamelSupport, ConsumerConfig }
+import org.apache.camel.Endpoint
+import org.apache.camel.processor.SendProcessor
+import scala.util.control.NonFatal
+import akka.actor.Terminated
+import akka.actor.SupervisorStrategy.Resume
+import akka.camel.internal.CamelSupervisor._
+import akka.AkkaException
+import akka.camel.internal.ActivationProtocol._
+
+/**
+ * For internal use only.
+ * Top level supervisor for internal Camel actors
+ */
+private[camel] class CamelSupervisor extends Actor with CamelSupport {
+ private val activationTracker = context.actorOf(Props[ActivationTracker], "activationTracker")
+ private val registry: ActorRef = context.actorOf(Props(new Registry(activationTracker)), "registry")
+
+ override val supervisorStrategy = OneForOneStrategy() {
+ case NonFatal(e) ⇒
+ Resume
+ }
+
+ def receive = {
+ case AddWatch(actorRef) ⇒ context.watch(actorRef)
+ case Terminated(actorRef) ⇒ registry ! DeRegister(actorRef)
+ case msg: ActivationMessage ⇒ activationTracker forward msg
+ case msg ⇒ registry forward (msg)
+ }
+}
+
+/**
+ * For internal use only.
+ * Messages for the camel supervisor, registrations and de-registrations.
+ */
+private[camel] object CamelSupervisor {
+
+ @SerialVersionUID(1L)
+ sealed trait CamelSupervisorMessage extends Serializable
+
+ /**
+ * For internal use only.
+ * Registers a consumer or a producer.
+ */
+ case class Register(actorRef: ActorRef, endpointUri: String, config: Option[ConsumerConfig] = None) extends NoSerializationVerificationNeeded
+
+ /**
+ * For internal use only.
+ * De-registers a producer or a consumer.
+ */
+ @SerialVersionUID(1L)
+ case class DeRegister(actorRef: ActorRef) extends CamelSupervisorMessage
+
+ /**
+ * For internal use only.
+ * Adds a watch for the actor
+ */
+ @SerialVersionUID(1L)
+ case class AddWatch(actorRef: ActorRef) extends CamelSupervisorMessage
+
+ /**
+ * For internal use only.
+ * Provides a Producer with the required camel objects to function.
+ */
+ case class CamelProducerObjects(endpoint: Endpoint, processor: SendProcessor) extends NoSerializationVerificationNeeded
+}
+
+/**
+ * For internal use only.
+ * Thrown by registrars to indicate that the actor could not be de-activated.
+ */
+private[camel] class ActorDeActivationException(val actorRef: ActorRef, cause: Throwable) extends AkkaException("Actor [%s] failed to de-activate".format(actorRef), cause)
+
+/**
+ * For internal use only.
+ * Thrown by the registrars to indicate that the actor could not be activated.
+ */
+private[camel] class ActorActivationException(val actorRef: ActorRef, cause: Throwable) extends AkkaException("Actor [%s] failed to activate".format(actorRef), cause)
+
+/**
+ * For internal use only.
+ * Registry for Camel Consumers and Producers. Supervises the registrars.
+ */
+private[camel] class Registry(activationTracker: ActorRef) extends Actor with CamelSupport {
+ import context.{ stop, parent }
+
+ private val producerRegistrar = context.actorOf(Props(new ProducerRegistrar(activationTracker)), "producerRegistrar")
+ private val consumerRegistrar = context.actorOf(Props(new ConsumerRegistrar(activationTracker)), "consumerRegistrar")
+ private var producers = Set[ActorRef]()
+ private var consumers = Set[ActorRef]()
+
+ override val supervisorStrategy = OneForOneStrategy() {
+ case e: ActorActivationException ⇒
+ activationTracker ! EndpointFailedToActivate(e.actorRef, e.getCause)
+ stop(e.actorRef)
+ Resume
+ case e: ActorDeActivationException ⇒
+ activationTracker ! EndpointFailedToDeActivate(e.actorRef, e.getCause)
+ stop(e.actorRef)
+ Resume
+ case NonFatal(e) ⇒
+ Resume
+ }
+
+ def receive = {
+ case msg @ Register(consumer, _, Some(_)) ⇒
+ if (!consumers(consumer)) {
+ consumers += consumer
+ consumerRegistrar forward msg
+ parent ! AddWatch(consumer)
+ }
+ case msg @ Register(producer, _, None) ⇒
+ if (!producers(producer)) {
+ producers += producer
+ producerRegistrar forward msg
+ parent ! AddWatch(producer)
+ }
+ case DeRegister(actorRef) ⇒
+ producers.find(_ == actorRef).foreach { p ⇒
+ deRegisterProducer(p)
+ producers -= p
+ }
+ consumers.find(_ == actorRef).foreach { c ⇒
+ deRegisterConsumer(c)
+ consumers -= c
+ }
+ }
+
+ private def deRegisterConsumer(actorRef: ActorRef) { consumerRegistrar ! DeRegister(actorRef) }
+
+ private def deRegisterProducer(actorRef: ActorRef) { producerRegistrar ! DeRegister(actorRef) }
+}
+
+/**
+ * For internal use only.
+ * Registers Producers.
+ */
+private[camel] class ProducerRegistrar(activationTracker: ActorRef) extends Actor with CamelSupport {
+ private var camelObjects = Map[ActorRef, (Endpoint, SendProcessor)]()
+
+ def receive = {
+ case Register(producer, endpointUri, _) ⇒
+ if (!camelObjects.contains(producer)) {
+ try {
+ val endpoint = camelContext.getEndpoint(endpointUri)
+ val processor = new SendProcessor(endpoint)
+ camelObjects += producer -> (endpoint, processor)
+ // if this throws, the supervisor stops the producer and de-registers it on termination
+ processor.start()
+ producer ! CamelProducerObjects(endpoint, processor)
+ activationTracker ! EndpointActivated(producer)
+ } catch {
+ case NonFatal(e) ⇒ throw new ActorActivationException(producer, e)
+ }
+ }
+ case DeRegister(producer) ⇒
+ camelObjects.get(producer).foreach {
+ case (_, processor) ⇒
+ try {
+ camelObjects.get(producer).foreach(_._2.stop())
+ camelObjects -= producer
+ activationTracker ! EndpointDeActivated(producer)
+ } catch {
+ case NonFatal(e) ⇒ throw new ActorDeActivationException(producer, e)
+ }
+ }
+ }
+}
+
+/**
+ * For internal use only.
+ * Registers Consumers.
+ */
+private[camel] class ConsumerRegistrar(activationTracker: ActorRef) extends Actor with CamelSupport {
+ def receive = {
+ case Register(consumer, endpointUri, Some(consumerConfig)) ⇒
+ try {
+ // if this throws, the supervisor stops the consumer and de-registers it on termination
+ camelContext.addRoutes(new ConsumerActorRouteBuilder(endpointUri, consumer, consumerConfig))
+ activationTracker ! EndpointActivated(consumer)
+ } catch {
+ case NonFatal(e) ⇒ throw new ActorActivationException(consumer, e)
+ }
+ case DeRegister(consumer) ⇒
+ try {
+ val route = consumer.path.toString
+ camelContext.stopRoute(route)
+ camelContext.removeRoute(route)
+ activationTracker ! EndpointDeActivated(consumer)
+ } catch {
+ case NonFatal(e) ⇒ throw new ActorDeActivationException(consumer, e)
+ }
+ }
+}
\ No newline at end of file
diff --git a/akka-camel/src/main/scala/akka/camel/internal/ConsumerActorRouteBuilder.scala b/akka-camel/src/main/scala/akka/camel/internal/ConsumerActorRouteBuilder.scala
new file mode 100644
index 0000000000..cd8b70bed5
--- /dev/null
+++ b/akka-camel/src/main/scala/akka/camel/internal/ConsumerActorRouteBuilder.scala
@@ -0,0 +1,39 @@
+/**
+ * Copyright (C) 2009-2012 Typesafe Inc.
+ */
+
+package akka.camel.internal
+
+import akka.camel._
+import component.CamelPath
+import java.io.InputStream
+
+import org.apache.camel.builder.RouteBuilder
+
+import akka.actor._
+import org.apache.camel.model.RouteDefinition
+import akka.serialization.Serializer
+
+/**
+ * For internal use only.
+ * Builder of a route to a target which can be an actor.
+ *
+ * @param endpointUri endpoint URI of the consumer actor.
+ *
+ * @author Martin Krasser
+ */
+private[camel] class ConsumerActorRouteBuilder(endpointUri: String, consumer: ActorRef, config: ConsumerConfig) extends RouteBuilder {
+
+ protected def targetActorUri = CamelPath.toUri(consumer, config.autoAck, config.replyTimeout)
+
+ def configure() {
+ val scheme = endpointUri take endpointUri.indexOf(":") // e.g. "http" from "http://whatever/..."
+ val route = from(endpointUri).routeId(consumer.path.toString)
+ val converted = Conversions(scheme, route)
+ val userCustomized = applyUserRouteCustomization(converted)
+ userCustomized.to(targetActorUri)
+ }
+
+ def applyUserRouteCustomization(rd: RouteDefinition) = config.onRouteDefinition(rd)
+
+}
diff --git a/akka-camel/src/main/scala/akka/camel/internal/ConsumerRegistry.scala b/akka-camel/src/main/scala/akka/camel/internal/ConsumerRegistry.scala
deleted file mode 100644
index 37fec6e1d0..0000000000
--- a/akka-camel/src/main/scala/akka/camel/internal/ConsumerRegistry.scala
+++ /dev/null
@@ -1,151 +0,0 @@
-/**
- * Copyright (C) 2009-2012 Typesafe Inc.
- */
-
-package akka.camel.internal
-
-import akka.camel._
-import component.CamelPath
-import java.io.InputStream
-import org.apache.camel.builder.RouteBuilder
-import akka.actor._
-import collection.mutable
-import org.apache.camel.model.RouteDefinition
-import org.apache.camel.CamelContext
-import scala.concurrent.util.Duration
-import concurrent.Await
-import akka.util.Timeout
-import scala.concurrent.util.FiniteDuration
-
-/**
- * For internal use only.
- * Manages consumer registration. Consumers call registerConsumer method to register themselves when they get created.
- * ActorEndpoint uses it to lookup an actor by its path.
- */
-private[camel] trait ConsumerRegistry { this: Activation ⇒
- def system: ActorSystem
- def context: CamelContext
-
- /**
- * For internal use only.
- */
- private[this] lazy val idempotentRegistry = system.actorOf(Props(new IdempotentCamelConsumerRegistry(context)))
- /**
- * For internal use only. BLOCKING
- * @param endpointUri the URI to register the consumer on
- * @param consumer the consumer
- * @param activationTimeout the timeout for activation
- * @return the actorRef to the consumer
- */
- private[camel] def registerConsumer(endpointUri: String, consumer: Consumer, activationTimeout: FiniteDuration) = {
- idempotentRegistry ! RegisterConsumer(endpointUri, consumer.self, consumer)
- Await.result(activationFutureFor(consumer.self)(activationTimeout, consumer.context.dispatcher), activationTimeout)
- }
-}
-
-/**
- * For internal use only.
- * Guarantees idempotent registration of camel consumer endpoints.
- *
- * Once registered the consumer is watched and unregistered upon termination.
- * It also publishes events to the eventStream so interested parties could subscribe to them.
- * The main consumer of these events is currently the ActivationTracker.
- */
-private[camel] class IdempotentCamelConsumerRegistry(camelContext: CamelContext) extends Actor {
-
- case class UnregisterConsumer(actorRef: ActorRef)
-
- val activated = new mutable.HashSet[ActorRef]
-
- val registrator = context.actorOf(Props(new CamelConsumerRegistrator))
-
- def receive = {
- case msg @ RegisterConsumer(_, consumer, _) ⇒
- if (!isAlreadyActivated(consumer)) {
- activated.add(consumer)
- registrator ! msg
- }
- case msg @ EndpointActivated(consumer) ⇒
- context.watch(consumer)
- context.system.eventStream.publish(msg)
- case msg @ EndpointFailedToActivate(consumer, _) ⇒
- activated.remove(consumer)
- context.system.eventStream.publish(msg)
- case Terminated(ref) ⇒
- activated.remove(ref)
- registrator ! UnregisterConsumer(ref)
- case msg @ EndpointFailedToDeActivate(ref, cause) ⇒ context.system.eventStream.publish(msg)
- case msg: EndpointDeActivated ⇒ context.system.eventStream.publish(msg)
- }
-
- def isAlreadyActivated(ref: ActorRef): Boolean = activated.contains(ref)
-
- //FIXME Break out
- class CamelConsumerRegistrator extends Actor with ActorLogging {
-
- def receive = {
- case RegisterConsumer(endpointUri, consumer, consumerConfig) ⇒
- camelContext.addRoutes(new ConsumerActorRouteBuilder(endpointUri, consumer, consumerConfig))
- context.sender ! EndpointActivated(consumer)
- log.debug("Published actor [{}] at endpoint [{}]", consumerConfig, endpointUri)
- case UnregisterConsumer(consumer) ⇒
- camelContext.stopRoute(consumer.path.toString)
- camelContext.removeRoute(consumer.path.toString)
- context.sender ! EndpointDeActivated(consumer)
- log.debug("Unpublished actor [{}] from endpoint [{}]", consumer, consumer.path)
- }
-
- override def preRestart(reason: Throwable, message: Option[Any]) {
- //FIXME check logic
- super.preStart()
- message match {
- case Some(RegisterConsumer(_, consumer, _)) ⇒ sender ! EndpointFailedToActivate(consumer, reason)
- case Some(UnregisterConsumer(consumer)) ⇒ sender ! EndpointFailedToDeActivate(consumer, reason)
- case _ ⇒
- }
- }
- }
-}
-
-/**
- * For internal use only. A message to register a consumer.
- * @param endpointUri the endpointUri to register to
- * @param actorRef the actorRef to register as a consumer
- * @param config the configuration for the consumer
- */
-private[camel] case class RegisterConsumer(endpointUri: String, actorRef: ActorRef, config: ConsumerConfig)
-
-/**
- * For internal use only.
- * Builder of a route to a target which can be an actor.
- *
- * @param endpointUri endpoint URI of the consumer actor.
- *
- * @author Martin Krasser
- */
-private[camel] class ConsumerActorRouteBuilder(endpointUri: String, consumer: ActorRef, config: ConsumerConfig) extends RouteBuilder {
-
- protected def targetActorUri = CamelPath.toUri(consumer, config.autoAck, config.replyTimeout)
-
- def configure() {
- val scheme = endpointUri take endpointUri.indexOf(":") // e.g. "http" from "http://whatever/..."
-
- val route = from(endpointUri).routeId(consumer.path.toString)
- val converted = Conversions(scheme, route)
- val userCustomized = applyUserRouteCustomization(converted)
- userCustomized.to(targetActorUri)
- }
-
- def applyUserRouteCustomization(rd: RouteDefinition) = config.onRouteDefinition(rd)
-
- object Conversions {
- private val bodyConversions = Map(
- "file" -> classOf[InputStream])
-
- def apply(scheme: String, routeDefinition: RouteDefinition): RouteDefinition = bodyConversions.get(scheme) match {
- case Some(clazz) ⇒ routeDefinition.convertBodyTo(clazz)
- case None ⇒ routeDefinition
- }
- }
-
-}
\ No newline at end of file
diff --git a/akka-camel/src/main/scala/akka/camel/internal/DefaultCamel.scala b/akka-camel/src/main/scala/akka/camel/internal/DefaultCamel.scala
index 41aaacdf8c..016b923bf0 100644
--- a/akka-camel/src/main/scala/akka/camel/internal/DefaultCamel.scala
+++ b/akka-camel/src/main/scala/akka/camel/internal/DefaultCamel.scala
@@ -1,15 +1,21 @@
package akka.camel.internal
-import akka.actor.ActorSystem
+import akka.actor.{ ActorRef, Props, ActorSystem }
import akka.camel.internal.component.{ DurationTypeConverter, ActorComponent }
import org.apache.camel.impl.DefaultCamelContext
import scala.Predef._
import akka.event.Logging
import akka.camel.{ CamelSettings, Camel }
+import akka.camel.internal.ActivationProtocol._
import scala.util.control.NonFatal
import scala.concurrent.util.Duration
-import org.apache.camel.{ ProducerTemplate, CamelContext }
import scala.concurrent.util.FiniteDuration
+import org.apache.camel.ProducerTemplate
+import concurrent.{ Future, ExecutionContext }
+import akka.util.Timeout
+import akka.pattern.ask
+import java.io.InputStream
+import org.apache.camel.model.RouteDefinition
/**
* For internal use only.
@@ -21,16 +27,17 @@ import scala.concurrent.util.FiniteDuration
* Also by not creating extra internal actor system we are conserving resources.
*/
private[camel] class DefaultCamel(val system: ActorSystem) extends Camel {
+ val supervisor = system.actorOf(Props[CamelSupervisor], "camel-supervisor")
/**
* For internal use only.
*/
private[camel] implicit val log = Logging(system, "Camel")
- lazy val context: CamelContext = {
+ lazy val context: DefaultCamelContext = {
val ctx = new DefaultCamelContext
if (!settings.jmxStatistics) ctx.disableJMX()
ctx.setName(system.name)
- ctx.setStreamCaching(true)
+ ctx.setStreamCaching(settings.streamingCache)
ctx.addComponent("akka", new ActorComponent(this, system))
ctx.getTypeConverterRegistry.addTypeConverter(classOf[FiniteDuration], classOf[String], DurationTypeConverter)
ctx
@@ -65,4 +72,46 @@ private[camel] class DefaultCamel(val system: ActorSystem) extends Camel {
}
log.debug("Stopped CamelContext[{}] for ActorSystem[{}]", context.getName, system.name)
}
+
+ /**
+ * Produces a Future with the specified endpoint that will be completed when the endpoint has been activated,
+ * or if it times out, which will happen after the specified Timeout.
+ *
+ * @param endpoint the endpoint to be activated
+ * @param timeout the timeout for the Future
+ */
+ def activationFutureFor(endpoint: ActorRef)(implicit timeout: Timeout, executor: ExecutionContext): Future[ActorRef] =
+
+ (supervisor.ask(AwaitActivation(endpoint))(timeout)).map[ActorRef]({
+ case EndpointActivated(`endpoint`) ⇒ endpoint
+ case EndpointFailedToActivate(`endpoint`, cause) ⇒ throw cause
+ })
+
+ /**
+ * Produces a Future which will be completed when the given endpoint has been deactivated or
+ * or if it times out, which will happen after the specified Timeout.
+ *
+ * @param endpoint the endpoint to be deactivated
+ * @param timeout the timeout of the Future
+ */
+ def deactivationFutureFor(endpoint: ActorRef)(implicit timeout: Timeout, executor: ExecutionContext): Future[ActorRef] =
+ (supervisor.ask(AwaitDeActivation(endpoint))(timeout)).map[ActorRef]({
+ case EndpointDeActivated(`endpoint`) ⇒ endpoint
+ case EndpointFailedToDeActivate(`endpoint`, cause) ⇒ throw cause
+ })
}
+
+/**
+ * For internal use only.
+ */
+private[camel] object Conversions {
+ //FIXME Add this to the configuration, and move this functionality to the Camel Extension.
+ private val bodyConversions = Map(
+ "file" -> classOf[InputStream])
+
+ def apply(scheme: String, routeDefinition: RouteDefinition): RouteDefinition = bodyConversions.get(scheme) match {
+ case Some(clazz) ⇒ routeDefinition.convertBodyTo(clazz)
+ case None ⇒ routeDefinition
+ }
+}
+
diff --git a/akka-camel/src/main/scala/akka/camel/internal/ProducerRegistry.scala b/akka-camel/src/main/scala/akka/camel/internal/ProducerRegistry.scala
deleted file mode 100644
index 8631039a11..0000000000
--- a/akka-camel/src/main/scala/akka/camel/internal/ProducerRegistry.scala
+++ /dev/null
@@ -1,86 +0,0 @@
-package akka.camel.internal
-
-import java.util.concurrent.ConcurrentHashMap
-import org.apache.camel.processor.SendProcessor
-import akka.actor.{ Props, ActorRef, Terminated, Actor }
-import org.apache.camel.Endpoint
-import akka.camel._
-import scala.util.control.NonFatal
-
-/**
- * Watches the end of life of Producers.
- * Removes a Producer from the ProducerRegistry when it is Terminated,
- * which in turn stops the SendProcessor.
- *
- * INTERNAL API
- */
-private class ProducerWatcher(registry: ProducerRegistry) extends Actor {
- override def receive = {
- case RegisterProducer(actorRef) ⇒ context.watch(actorRef)
- case Terminated(actorRef) ⇒ registry.unregisterProducer(actorRef)
- }
-}
-
-/**
- * INTERNAL API
- */
-private case class RegisterProducer(actorRef: ActorRef)
-
-/**
- * For internal use only.
- * Manages the Camel objects for Producers.
- * Every Producer needs an Endpoint and a SendProcessor
- * to produce messages over an Exchange.
- */
-private[camel] trait ProducerRegistry { this: Camel ⇒
- private val camelObjects = new ConcurrentHashMap[ActorRef, (Endpoint, SendProcessor)]()
- private val watcher = system.actorOf(Props(new ProducerWatcher(this))) //FIXME should this really be top level?
-
- private def registerWatch(actorRef: ActorRef): Unit = watcher ! RegisterProducer(actorRef)
-
- /**
- * For internal use only.
- * Unregisters Endpoint and SendProcessor and stops the SendProcessor
- */
- private[camel] def unregisterProducer(actorRef: ActorRef): Unit = {
- // Terminated cannot be sent before the actor is created in the processing of system messages.
- Option(camelObjects.remove(actorRef)).foreach {
- case (_, processor) ⇒
- try {
- processor.stop()
- system.eventStream.publish(EndpointDeActivated(actorRef))
- } catch {
- case NonFatal(e) ⇒ system.eventStream.publish(EndpointFailedToDeActivate(actorRef, e))
- }
- }
- }
-
- /**
- * For internal use only.
- * Creates Endpoint and SendProcessor and associates the actorRef to these.
- * @param actorRef the actorRef of the Producer actor.
- * @param endpointUri the endpoint Uri of the producer
- * @return Endpoint and SendProcessor registered for the actorRef
- */
- private[camel] def registerProducer(actorRef: ActorRef, endpointUri: String): (Endpoint, SendProcessor) = {
- try {
- val endpoint = context.getEndpoint(endpointUri)
- val processor = new SendProcessor(endpoint)
-
- camelObjects.putIfAbsent(actorRef, (endpoint, processor)) match {
- case null ⇒
- processor.start()
- registerWatch(actorRef)
- system.eventStream.publish(EndpointActivated(actorRef))
- (endpoint, processor)
- case prev ⇒ prev
- }
- } catch {
- case NonFatal(e) ⇒ {
- system.eventStream.publish(EndpointFailedToActivate(actorRef, e))
- // can't return null to the producer actor, so blow up actor in initialization.
- throw e //FIXME I'm not a huge fan of log-rethrow, either log or rethrow
- }
- }
- }
-}
\ No newline at end of file
diff --git a/akka-camel/src/main/scala/akka/camel/internal/component/ActorComponent.scala b/akka-camel/src/main/scala/akka/camel/internal/component/ActorComponent.scala
index 61c4cdb1fb..2a9ab2155f 100644
--- a/akka-camel/src/main/scala/akka/camel/internal/component/ActorComponent.scala
+++ b/akka-camel/src/main/scala/akka/camel/internal/component/ActorComponent.scala
@@ -15,7 +15,7 @@ import scala.concurrent.util.duration._
import scala.concurrent.util.Duration
import scala.concurrent.{ ExecutionContext, Future }
import scala.util.control.NonFatal
-import java.util.concurrent.{ TimeoutException, CountDownLatch }
+import java.util.concurrent.{ TimeUnit, TimeoutException, CountDownLatch }
import akka.util.Timeout
import akka.camel.internal.CamelExchangeAdapter
import akka.camel.{ ActorNotRegisteredException, Camel, Ack, FailureResult, CamelMessage }
@@ -111,7 +111,7 @@ private[camel] trait ActorEndpointConfig {
private[camel] class ActorProducer(val endpoint: ActorEndpoint, camel: Camel) extends DefaultProducer(endpoint) with AsyncProcessor {
/**
* Processes the exchange.
- * Calls the asynchronous version of the method and waits for the result (blocking).
+ * Calls the synchronous version of the method and waits for the result (blocking).
* @param exchange the exchange to process
*/
def process(exchange: Exchange): Unit = processExchangeAdapter(new CamelExchangeAdapter(exchange))
@@ -131,13 +131,11 @@ private[camel] class ActorProducer(val endpoint: ActorEndpoint, camel: Camel) ex
/**
* For internal use only. Processes the [[akka.camel.internal.CamelExchangeAdapter]]
* @param exchange the [[akka.camel.internal.CamelExchangeAdapter]]
- *
- * WARNING UNBOUNDED BLOCKING AWAITS
*/
private[camel] def processExchangeAdapter(exchange: CamelExchangeAdapter): Unit = {
val isDone = new CountDownLatch(1)
processExchangeAdapter(exchange, new AsyncCallback { def done(doneSync: Boolean) { isDone.countDown() } })
- isDone.await() // this should never wait forever as the process(exchange, callback) method guarantees that.
+ isDone.await(camel.settings.replyTimeout.toMillis, TimeUnit.MILLISECONDS)
}
/**
diff --git a/akka-camel/src/main/scala/akka/camel/javaapi/UntypedConsumerActor.scala b/akka-camel/src/main/scala/akka/camel/javaapi/UntypedConsumerActor.scala
index 3cef3d285a..77526dab08 100644
--- a/akka-camel/src/main/scala/akka/camel/javaapi/UntypedConsumerActor.scala
+++ b/akka-camel/src/main/scala/akka/camel/javaapi/UntypedConsumerActor.scala
@@ -6,7 +6,8 @@ package akka.camel.javaapi
import akka.actor.UntypedActor
import akka.camel._
-import org.apache.camel.{ ProducerTemplate, CamelContext }
+import org.apache.camel.ProducerTemplate
+import org.apache.camel.impl.DefaultCamelContext
/**
* Subclass this abstract class to create an MDB-style untyped consumer actor. This
@@ -21,10 +22,10 @@ abstract class UntypedConsumerActor extends UntypedActor with Consumer {
def getEndpointUri(): String
/**
- * ''Java API'': Returns the [[org.apache.camel.CamelContext]]
+ * ''Java API'': Returns the [[org.apache.camel.impl.DefaultCamelContext]]
* @return the CamelContext
*/
- protected def getCamelContext(): CamelContext = camelContext
+ protected def getCamelContext(): DefaultCamelContext = camelContext
/**
* ''Java API'': Returns the [[org.apache.camel.ProducerTemplate]]
diff --git a/akka-camel/src/main/scala/akka/camel/javaapi/UntypedProducerActor.scala b/akka-camel/src/main/scala/akka/camel/javaapi/UntypedProducerActor.scala
index 38c4cf276a..cd353e04a0 100644
--- a/akka-camel/src/main/scala/akka/camel/javaapi/UntypedProducerActor.scala
+++ b/akka-camel/src/main/scala/akka/camel/javaapi/UntypedProducerActor.scala
@@ -7,6 +7,7 @@ package akka.camel.javaapi
import akka.actor.UntypedActor
import akka.camel._
import org.apache.camel.{ CamelContext, ProducerTemplate }
+import org.apache.camel.impl.DefaultCamelContext
/**
* Subclass this abstract class to create an untyped producer actor. This class is meant to be used from Java.
@@ -64,7 +65,7 @@ abstract class UntypedProducerActor extends UntypedActor with ProducerSupport {
/**
* Returns the CamelContext.
*/
- def getCamelContext(): CamelContext = camel.context
+ def getCamelContext(): DefaultCamelContext = camel.context
/**
* Returns the ProducerTemplate.
diff --git a/akka-camel/src/main/scala/akka/package.scala b/akka-camel/src/main/scala/akka/package.scala
index e1f2c0756e..9347580e76 100644
--- a/akka-camel/src/main/scala/akka/package.scala
+++ b/akka-camel/src/main/scala/akka/package.scala
@@ -17,4 +17,4 @@ package object camel {
* }}}
*/
implicit def toActorRouteDefinition[T <: ProcessorDefinition[T]](definition: ProcessorDefinition[T]) = new ActorRouteDefinition(definition)
-}
\ No newline at end of file
+}
diff --git a/akka-camel/src/test/java/akka/camel/ConsumerJavaTestBase.java b/akka-camel/src/test/java/akka/camel/ConsumerJavaTestBase.java
index 466f213092..0c9aad7e23 100644
--- a/akka-camel/src/test/java/akka/camel/ConsumerJavaTestBase.java
+++ b/akka-camel/src/test/java/akka/camel/ConsumerJavaTestBase.java
@@ -4,23 +4,20 @@
package akka.camel;
-import static org.junit.Assert.assertEquals;
-
-import java.util.concurrent.TimeUnit;
-
-import org.junit.AfterClass;
-import org.junit.Test;
-
-import scala.concurrent.Await;
-import scala.concurrent.ExecutionContext;
-import scala.concurrent.util.Duration;
-import scala.concurrent.util.FiniteDuration;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
-import akka.testkit.AkkaSpec;
import akka.testkit.JavaTestKit;
-
+import akka.util.Timeout;
+import scala.concurrent.Await;
+import scala.concurrent.ExecutionContext;
+import scala.concurrent.util.Duration;
+import org.junit.AfterClass;
+import org.junit.Test;
+import java.util.concurrent.TimeUnit;
+import akka.testkit.AkkaSpec;
+import scala.concurrent.util.FiniteDuration;
+import static org.junit.Assert.*;
/**
* @author Martin Krasser
*/
@@ -33,30 +30,27 @@ public class ConsumerJavaTestBase {
system.shutdown();
}
- @Test
- public void shouldHandleExceptionThrownByActorAndGenerateCustomResponse()
- throws Exception {
- new JavaTestKit(system) {
- {
- String result = new EventFilter(Exception.class) {
- protected String run() {
- FiniteDuration timeout = Duration.create(1, TimeUnit.SECONDS);
- Camel camel = CamelExtension.get(system);
- ExecutionContext executionContext = system.dispatcher();
- try {
- @SuppressWarnings("unused")
- ActorRef ref = Await.result(camel.activationFutureFor(
- system.actorOf(new Props(SampleErrorHandlingConsumer.class)),
- timeout, executionContext), timeout);
- return camel.template().requestBody(
- "direct:error-handler-test-java", "hello", String.class);
- } catch (Exception e) {
- return e.getMessage();
- }
- }
- }.occurrences(1).exec();
- assertEquals("error: hello", result);
- }
- };
- }
+ @Test
+ public void shouldHandleExceptionThrownByActorAndGenerateCustomResponse() throws Exception {
+ new JavaTestKit(system) {{
+ String result = new EventFilter(Exception.class) {
+ protected String run() {
+ FiniteDuration duration = Duration.create(1, TimeUnit.SECONDS);
+ Timeout timeout = new Timeout(duration);
+ Camel camel = CamelExtension.get(system);
+ ExecutionContext executionContext = system.dispatcher();
+ try {
+ Await.result(
+ camel.activationFutureFor(system.actorOf(new Props(SampleErrorHandlingConsumer.class), "sample-error-handling-consumer"), timeout, executionContext),
+ duration);
+ return camel.template().requestBody("direct:error-handler-test-java", "hello", String.class);
+ }
+ catch (Exception e) {
+ return e.getMessage();
+ }
+ }
+ }.occurrences(1).exec();
+ assertEquals("error: hello", result);
+ }};
+ }
}
diff --git a/akka-camel/src/test/java/akka/camel/CustomRouteTestBase.java b/akka-camel/src/test/java/akka/camel/CustomRouteTestBase.java
index db26c9e8ab..77b0294f60 100644
--- a/akka-camel/src/test/java/akka/camel/CustomRouteTestBase.java
+++ b/akka-camel/src/test/java/akka/camel/CustomRouteTestBase.java
@@ -4,10 +4,10 @@ import akka.actor.*;
import akka.camel.internal.component.CamelPath;
import akka.camel.javaapi.UntypedConsumerActor;
import akka.camel.javaapi.UntypedProducerActor;
+import akka.util.Timeout;
import scala.concurrent.Await;
import scala.concurrent.ExecutionContext;
import scala.concurrent.util.Duration;
-import scala.concurrent.util.FiniteDuration;
import org.apache.camel.CamelExecutionException;
import org.apache.camel.Exchange;
import org.apache.camel.Predicate;
@@ -16,6 +16,8 @@ import org.apache.camel.component.mock.MockEndpoint;
import org.junit.Before;
import org.junit.After;
import org.junit.Test;
+import scala.concurrent.util.FiniteDuration;
+
import java.util.concurrent.TimeUnit;
public class CustomRouteTestBase {
@@ -60,11 +62,12 @@ public class CustomRouteTestBase {
@Test
public void testCustomConsumerRoute() throws Exception {
MockEndpoint mockEndpoint = camel.context().getEndpoint("mock:mockConsumer", MockEndpoint.class);
- FiniteDuration timeout = Duration.create(10, TimeUnit.SECONDS);
+ FiniteDuration duration = Duration.create(10, TimeUnit.SECONDS);
+ Timeout timeout = new Timeout(duration);
ExecutionContext executionContext = system.dispatcher();
ActorRef consumer = Await.result(
camel.activationFutureFor(system.actorOf(new Props(TestConsumer.class), "testConsumer"), timeout, executionContext),
- timeout);
+ duration);
camel.context().addRoutes(new CustomRouteBuilder("direct:testRouteConsumer",consumer));
camel.template().sendBody("direct:testRouteConsumer", "test");
assertMockEndpoint(mockEndpoint);
@@ -74,15 +77,16 @@ public class CustomRouteTestBase {
@Test
public void testCustomAckConsumerRoute() throws Exception {
MockEndpoint mockEndpoint = camel.context().getEndpoint("mock:mockAck", MockEndpoint.class);
- FiniteDuration timeout = Duration.create(10, TimeUnit.SECONDS);
+ FiniteDuration duration = Duration.create(10, TimeUnit.SECONDS);
+ Timeout timeout = new Timeout(duration);
ExecutionContext executionContext = system.dispatcher();
ActorRef consumer = Await.result(
camel.activationFutureFor(
system.actorOf(
new Props( new UntypedActorFactory(){ public Actor create() { return new TestAckConsumer("direct:testConsumerAck","mock:mockAck"); } }), "testConsumerAck"),
timeout, executionContext),
- timeout);
- camel.context().addRoutes(new CustomRouteBuilder("direct:testAck", consumer, false, timeout));
+ duration);
+ camel.context().addRoutes(new CustomRouteBuilder("direct:testAck", consumer, false, duration));
camel.template().sendBody("direct:testAck", "test");
assertMockEndpoint(mockEndpoint);
system.stop(consumer);
@@ -92,11 +96,12 @@ public class CustomRouteTestBase {
public void testCustomAckConsumerRouteFromUri() throws Exception {
MockEndpoint mockEndpoint = camel.context().getEndpoint("mock:mockAckUri", MockEndpoint.class);
ExecutionContext executionContext = system.dispatcher();
- FiniteDuration timeout = Duration.create(10, TimeUnit.SECONDS);
+ FiniteDuration duration = Duration.create(10, TimeUnit.SECONDS);
+ Timeout timeout = new Timeout(duration);
ActorRef consumer = Await.result(
- camel.activationFutureFor(system.actorOf(new Props( new UntypedActorFactory(){ public Actor create() { return new TestAckConsumer("direct:testConsumerAckFromUri","mock:mockAckUri"); } }), "testConsumerAckUri"),
+ camel.activationFutureFor(system.actorOf(new Props( new UntypedActorFactory(){ public Actor create() { return new TestAckConsumer("direct:testConsumerAckFromUri","mock:mockAckUri"); } }), "testConsumerAckUri"),
timeout, executionContext),
- timeout);
+ duration);
camel.context().addRoutes(new CustomRouteBuilder("direct:testAckFromUri","akka://test/user/testConsumerAckUri?autoAck=false"));
camel.template().sendBody("direct:testAckFromUri", "test");
assertMockEndpoint(mockEndpoint);
@@ -105,12 +110,13 @@ public class CustomRouteTestBase {
@Test(expected=CamelExecutionException.class)
public void testCustomTimeoutConsumerRoute() throws Exception {
- FiniteDuration timeout = Duration.create(10, TimeUnit.SECONDS);
+ FiniteDuration duration = Duration.create(10, TimeUnit.SECONDS);
+ Timeout timeout = new Timeout(duration);
ExecutionContext executionContext = system.dispatcher();
ActorRef consumer = Await.result(
camel.activationFutureFor(system.actorOf(new Props( new UntypedActorFactory(){ public Actor create() { return new TestAckConsumer("direct:testConsumerException","mock:mockException"); } }), "testConsumerException"),
timeout, executionContext),
- timeout);
+ duration);
camel.context().addRoutes(new CustomRouteBuilder("direct:testException", consumer, false, Duration.create(0, TimeUnit.SECONDS)));
camel.template().sendBody("direct:testException", "test");
}
diff --git a/akka-camel/src/test/java/akka/camel/MessageJavaTestBase.java b/akka-camel/src/test/java/akka/camel/MessageJavaTestBase.java
index ba483e7a13..95cdc5007b 100644
--- a/akka-camel/src/test/java/akka/camel/MessageJavaTestBase.java
+++ b/akka-camel/src/test/java/akka/camel/MessageJavaTestBase.java
@@ -5,6 +5,7 @@
package akka.camel;
import akka.actor.ActorSystem;
+import akka.dispatch.Mapper;
import akka.japi.Function;
import org.apache.camel.NoTypeConversionAvailableException;
import org.junit.AfterClass;
@@ -48,12 +49,6 @@ public class MessageJavaTestBase {
message(1.4).getBodyAs(InputStream.class,camel.context());
}
-
- @Test public void shouldReturnDoubleHeader() {
- CamelMessage message = message("test" , createMap("test", 1.4));
- assertEquals(1.4, message.getHeader("test"));
- }
-
@Test public void shouldConvertDoubleHeaderToString() {
CamelMessage message = message("test" , createMap("test", 1.4));
assertEquals("1.4", message.getHeaderAs("test", String.class,camel.context()));
@@ -105,24 +100,6 @@ public class MessageJavaTestBase {
message("test1" , createMap("A", "1")).withHeaders(createMap("C", "3")));
}
- @Test public void shouldAddHeaderAndPreserveBodyAndHeaders() {
- assertEquals(
- message("test1" , createMap("A", "1", "B", "2")),
- message("test1" , createMap("A", "1")).addHeader("B", "2"));
- }
-
- @Test public void shouldAddHeadersAndPreserveBodyAndHeaders() {
- assertEquals(
- message("test1" , createMap("A", "1", "B", "2")),
- message("test1" , createMap("A", "1")).addHeaders(createMap("B", "2")));
- }
-
- @Test public void shouldRemoveHeadersAndPreserveBodyAndRemainingHeaders() {
- assertEquals(
- message("test1" , createMap("A", "1")),
- message("test1" , createMap("A", "1", "B", "2")).withoutHeader("B"));
- }
-
private static Set createSet(String... entries) {
HashSet set = new HashSet();
set.addAll(Arrays.asList(entries));
@@ -137,7 +114,8 @@ public class MessageJavaTestBase {
return map;
}
- private static class TestTransformer implements Function {
+ private static class TestTransformer extends Mapper {
+ @Override
public String apply(String param) {
return param + "b";
}
diff --git a/akka-camel/src/test/java/akka/camel/SampleErrorHandlingConsumer.java b/akka-camel/src/test/java/akka/camel/SampleErrorHandlingConsumer.java
index e5603acec1..c654e3958d 100644
--- a/akka-camel/src/test/java/akka/camel/SampleErrorHandlingConsumer.java
+++ b/akka-camel/src/test/java/akka/camel/SampleErrorHandlingConsumer.java
@@ -6,34 +6,38 @@ package akka.camel;
import akka.actor.Status;
import akka.camel.javaapi.UntypedConsumerActor;
+import akka.dispatch.Mapper;
import scala.concurrent.util.Duration;
import org.apache.camel.builder.Builder;
import org.apache.camel.model.ProcessorDefinition;
import org.apache.camel.model.RouteDefinition;
import scala.Option;
+import scala.concurrent.util.FiniteDuration;
/**
* @author Martin Krasser
*/
public class SampleErrorHandlingConsumer extends UntypedConsumerActor {
+ private static Mapper> mapper = new Mapper>() {
+ public ProcessorDefinition> apply(RouteDefinition rd) {
+ return rd.onException(Exception.class).handled(true).transform(Builder.exceptionMessage()).end();
+ }
+ };
public String getEndpointUri() {
return "direct:error-handler-test-java";
}
@Override
- //TODO write test confirming this gets called in java
- public ProcessorDefinition onRouteDefinition(RouteDefinition rd) {
- return rd.onException(Exception.class).handled(true).transform(Builder.exceptionMessage()).end();
+ public Mapper> onRouteDefinition() {
+ return mapper;
}
@Override
- public Duration replyTimeout(){
+ public FiniteDuration replyTimeout(){
return Duration.create(1, "second");
}
-
-
public void onReceive(Object message) throws Exception {
CamelMessage msg = (CamelMessage) message;
String body = msg.getBodyAs(String.class,this.getCamelContext());
diff --git a/akka-camel/src/test/java/akka/camel/SampleUntypedActor.java b/akka-camel/src/test/java/akka/camel/SampleUntypedActor.java
deleted file mode 100644
index e1c17900c3..0000000000
--- a/akka-camel/src/test/java/akka/camel/SampleUntypedActor.java
+++ /dev/null
@@ -1,16 +0,0 @@
-/**
- * Copyright (C) 2009-2012 Typesafe Inc.
- */
-
-package akka.camel;
-
-import akka.actor.UntypedActor;
-
-/**
- * @author Martin Krasser
- */
-public class SampleUntypedActor extends UntypedActor {
- public void onReceive(Object message) {
- System.out.println("Yay! I haz a message!");
- }
-}
diff --git a/akka-camel/src/test/resources/logback-test.xml b/akka-camel/src/test/resources/logback-test.xml
new file mode 100644
index 0000000000..1b571cf387
--- /dev/null
+++ b/akka-camel/src/test/resources/logback-test.xml
@@ -0,0 +1,23 @@
+
+
+
+
+
+
+ %date{ISO8601} %-5level %logger %X{akkaSource} %X{sourceThread} - %msg%n
+
+
+
+
+ target/akka-camel.log
+ true
+
+ %date{ISO8601} %-5level %logger %X{akkaSource} %X{sourceThread} - %msg%n
+
+
+
+
+
+
+
+
diff --git a/akka-camel/src/test/scala/akka/camel/ActivationIntegrationTest.scala b/akka-camel/src/test/scala/akka/camel/ActivationIntegrationTest.scala
index fe79bacdd8..54c671c3b5 100644
--- a/akka-camel/src/test/scala/akka/camel/ActivationIntegrationTest.scala
+++ b/akka-camel/src/test/scala/akka/camel/ActivationIntegrationTest.scala
@@ -10,21 +10,22 @@ import org.scalatest.matchers.MustMatchers
import scala.concurrent.util.duration._
import org.apache.camel.ProducerTemplate
import akka.actor._
-import akka.util.Timeout
import TestSupport._
import org.scalatest.WordSpec
import akka.testkit.TestLatch
-import scala.concurrent.Await
+import concurrent.Await
import java.util.concurrent.TimeoutException
+import akka.util.Timeout
class ActivationIntegrationTest extends WordSpec with MustMatchers with SharedCamelSystem {
- implicit val timeout = 10 seconds
+ val timeoutDuration = 10 seconds
+ implicit val timeout = Timeout(timeoutDuration)
def template: ProducerTemplate = camel.template
import system.dispatcher
"ActivationAware must be notified when endpoint is activated" in {
val latch = new TestLatch(0)
- val actor = system.actorOf(Props(new TestConsumer("direct:actor-1", latch)))
+ val actor = system.actorOf(Props(new TestConsumer("direct:actor-1", latch)), "act-direct-actor-1")
Await.result(camel.activationFutureFor(actor), 10 seconds) must be === actor
template.requestBody("direct:actor-1", "test") must be("received test")
@@ -40,32 +41,39 @@ class ActivationIntegrationTest extends WordSpec with MustMatchers with SharedCa
super.postStop()
latch.countDown()
}
- })
- Await.result(camel.activationFutureFor(actor), timeout)
+ }, name = "direct-a3")
+ Await.result(camel.activationFutureFor(actor), timeoutDuration)
system.stop(actor)
- Await.result(camel.deactivationFutureFor(actor), timeout)
- Await.ready(latch, 10 second)
+ Await.result(camel.deactivationFutureFor(actor), timeoutDuration)
+ Await.ready(latch, timeoutDuration)
}
"ActivationAware must time out when waiting for endpoint de-activation for too long" in {
val latch = new TestLatch(0)
- val actor = start(new TestConsumer("direct:a5", latch))
- Await.result(camel.activationFutureFor(actor), timeout)
+ val actor = start(new TestConsumer("direct:a5", latch), name = "direct-a5")
+ Await.result(camel.activationFutureFor(actor), timeoutDuration)
intercept[TimeoutException] { Await.result(camel.deactivationFutureFor(actor), 1 millis) }
}
"activationFutureFor must fail if notification timeout is too short and activation is not complete yet" in {
val latch = new TestLatch(1)
- try {
- val actor = system.actorOf(Props(new TestConsumer("direct:actor-4", latch)))
- intercept[TimeoutException] { Await.result(camel.activationFutureFor(actor), 1 millis) }
- } finally latch.countDown()
+ val actor = system.actorOf(Props(new TestConsumer("direct:actor-4", latch)), "direct-actor-4")
+ intercept[TimeoutException] { Await.result(camel.activationFutureFor(actor), 1 millis) }
+ latch.countDown()
+ // after the latch is removed, complete the wait for completion so this test does not later on
+ // print errors because of the registerConsumer timing out.
+ Await.result(camel.activationFutureFor(actor), timeoutDuration)
}
class TestConsumer(uri: String, latch: TestLatch) extends Consumer {
def endpointUri = uri
- Await.ready(latch, 60 seconds)
+
+ override def preStart() {
+ Await.ready(latch, 60 seconds)
+ super.preStart()
+ }
+
override def receive = {
case msg: CamelMessage ⇒ sender ! "received " + msg.body
}
diff --git a/akka-camel/src/test/scala/akka/camel/CamelMessageTest.scala b/akka-camel/src/test/scala/akka/camel/CamelMessageTest.scala
index c5dfd01d00..0fd0a73fb6 100644
--- a/akka-camel/src/test/scala/akka/camel/CamelMessageTest.scala
+++ b/akka-camel/src/test/scala/akka/camel/CamelMessageTest.scala
@@ -16,24 +16,11 @@ class CamelMessageTest extends MustMatchers with WordSpec with SharedCamelSystem
"overwrite body and add header" in {
val msg = sampleMessage
- CamelMessage("blah", Map("key" -> "baz")).copyContentTo(msg)
+ CamelMessage.copyContent(CamelMessage("blah", Map("key" -> "baz")), msg)
assert(msg.getBody === "blah")
assert(msg.getHeader("foo") === "bar")
assert(msg.getHeader("key") === "baz")
}
-
- "create message with body and header" in {
- val m = CamelMessage.from(sampleMessage)
- assert(m.body === "test")
- assert(m.headers("foo") === "bar")
- }
-
- "create message with body and header and custom header" in {
- val m = CamelMessage.from(sampleMessage, Map("key" -> "baz"))
- assert(m.body === "test")
- assert(m.headers("foo") === "bar")
- assert(m.headers("key") === "baz")
- }
}
private[camel] def sampleMessage = {
diff --git a/akka-camel/src/test/scala/akka/camel/ConcurrentActivationTest.scala b/akka-camel/src/test/scala/akka/camel/ConcurrentActivationTest.scala
new file mode 100644
index 0000000000..d4983bbf6a
--- /dev/null
+++ b/akka-camel/src/test/scala/akka/camel/ConcurrentActivationTest.scala
@@ -0,0 +1,159 @@
+package akka.camel
+
+import org.scalatest.WordSpec
+import org.scalatest.matchers.MustMatchers
+import akka.camel.TestSupport.NonSharedCamelSystem
+import akka.actor.{ ActorRef, Props, Actor }
+import akka.routing.BroadcastRouter
+import concurrent.{ Promise, Await, Future }
+import scala.concurrent.util.duration._
+import language.postfixOps
+import akka.testkit._
+import akka.util.Timeout
+import org.apache.camel.model.{ProcessorDefinition, RouteDefinition}
+import org.apache.camel.builder.Builder
+
+/**
+ * A test to concurrently register and de-register consumer and producer endpoints
+ */
+class ConcurrentActivationTest extends WordSpec with MustMatchers with NonSharedCamelSystem {
+
+ "Activation" must {
+ "support concurrent registrations and de-registrations" in {
+ implicit val timeout = Timeout(10 seconds)
+ val timeoutDuration = timeout.duration
+ implicit val ec = system.dispatcher
+ val number = 10
+ filterEvents(EventFilter.warning(pattern = "received dead letter from .*producerRegistrar.*", occurrences = number*number)) {
+ // A ConsumerBroadcast creates 'number' amount of ConsumerRegistrars, which will register 'number' amount of endpoints,
+ // in total number*number endpoints, activating and deactivating every endpoint.
+ // a promise to the list of registrars, which have a list of actorRefs each. A tuple of a list of activated refs and a list of deactivated refs
+ val promiseRegistrarLists = Promise[(Future[List[List[ActorRef]]], Future[List[List[ActorRef]]])]()
+ // future to all the futures of activation and deactivation
+ val futureRegistrarLists = promiseRegistrarLists.future
+
+ val ref = system.actorOf(Props(new ConsumerBroadcast(promiseRegistrarLists)), name = "broadcaster")
+ // create the registrars
+ ref ! CreateRegistrars(number)
+ // send a broadcast to all registrars, so that number * number messages are sent
+ // every Register registers a consumer and a producer
+ (1 to number).map(i ⇒ ref ! RegisterConsumersAndProducers("direct:concurrent-"))
+ // de-register all consumers and producers
+ ref ! DeRegisterConsumersAndProducers()
+
+ val promiseAllRefs = Promise[(List[ActorRef], List[ActorRef])]()
+ val allRefsFuture = promiseAllRefs.future
+ // map over all futures, put all futures in one list of activated and deactivated actor refs.
+ futureRegistrarLists.map {
+ case (futureActivations, futureDeactivations) ⇒
+ futureActivations zip futureDeactivations map {
+ case (activations, deactivations) ⇒
+ promiseAllRefs.success( (activations.flatten, deactivations.flatten))
+ }
+ }
+ val (activations, deactivations) = Await.result(allRefsFuture, timeoutDuration)
+ // must be the size of the activated activated producers and consumers
+ activations.size must be (2 * number * number)
+ // must be the size of the activated activated producers and consumers
+ deactivations.size must be ( 2* number * number )
+ def partitionNames(refs:Seq[ActorRef]) = refs.map(_.path.name).partition(_.startsWith("concurrent-test-echo-consumer"))
+ def assertContainsSameElements(lists:(Seq[_], Seq[_])) {
+ val (a,b) = lists
+ a.intersect(b).size must be (a.size)
+ }
+ val (activatedConsumerNames, activatedProducerNames) = partitionNames(activations)
+ val (deactivatedConsumerNames, deactivatedProducerNames) = partitionNames(deactivations)
+ assertContainsSameElements(activatedConsumerNames, deactivatedConsumerNames)
+ assertContainsSameElements(activatedProducerNames, deactivatedProducerNames)
+ }
+ }
+ }
+
+}
+class ConsumerBroadcast(promise: Promise[(Future[List[List[ActorRef]]], Future[List[List[ActorRef]]])]) extends Actor {
+ private var broadcaster: Option[ActorRef] = None
+ private implicit val ec = context.dispatcher
+ def receive = {
+ case CreateRegistrars(number) ⇒
+ var allActivationFutures = List[Future[List[ActorRef]]]()
+ var allDeactivationFutures = List[Future[List[ActorRef]]]()
+
+ val routees = (1 to number).map { i ⇒
+ val activationListPromise = Promise[List[ActorRef]]()
+ val deactivationListPromise = Promise[List[ActorRef]]()
+ val activationListFuture = activationListPromise.future
+ val deactivationListFuture = deactivationListPromise.future
+
+ allActivationFutures = allActivationFutures :+ activationListFuture
+ allDeactivationFutures = allDeactivationFutures :+ deactivationListFuture
+ context.actorOf(Props(new Registrar(i, number, activationListPromise, deactivationListPromise)), s"registrar-$i")
+ }
+ promise.success((Future.sequence(allActivationFutures)), Future.sequence(allDeactivationFutures))
+
+ broadcaster = Some(context.actorOf(Props[Registrar] withRouter (BroadcastRouter(routees)), "registrarRouter"))
+ case reg: Any ⇒
+ broadcaster.foreach(_.forward(reg))
+ }
+}
+
+case class CreateRegistrars(number: Int)
+case class RegisterConsumersAndProducers(endpointUri: String)
+case class DeRegisterConsumersAndProducers()
+case class Activations()
+case class DeActivations()
+
+class Registrar(val start: Int, val number: Int, activationsPromise: Promise[List[ActorRef]],
+ deActivationsPromise: Promise[List[ActorRef]]) extends Actor {
+ private var actorRefs = Set[ActorRef]()
+ private var activations = Set[Future[ActorRef]]()
+ private var deActivations = Set[Future[ActorRef]]()
+ private var index = 0
+ private val camel = CamelExtension(context.system)
+ private implicit val ec = context.dispatcher
+ private implicit val timeout = Timeout(10 seconds)
+
+ def receive = {
+ case reg: RegisterConsumersAndProducers ⇒
+ val i = index
+ add(new EchoConsumer(s"${reg.endpointUri}$start-$i"), s"concurrent-test-echo-consumer$start-$i")
+ add(new TestProducer(s"${reg.endpointUri}$start-$i"), s"concurrent-test-producer-$start-$i")
+ index = index + 1
+ if (activations.size == number * 2) {
+ Future.sequence(activations.toList) map activationsPromise.success
+ }
+ case reg: DeRegisterConsumersAndProducers ⇒
+ actorRefs.foreach { aref ⇒
+ context.stop(aref)
+ deActivations = deActivations + camel.deactivationFutureFor(aref)
+ if (deActivations.size == number * 2) {
+ Future.sequence(deActivations.toList) map deActivationsPromise.success
+ }
+ }
+ }
+
+ def add(actor: =>Actor, name:String) {
+ val ref = context.actorOf(Props(actor), name)
+ actorRefs = actorRefs + ref
+ activations = activations + camel.activationFutureFor(ref)
+ }
+}
+
+class EchoConsumer(endpoint: String) extends Actor with Consumer {
+
+ def endpointUri = endpoint
+
+ def receive = {
+ case msg: CamelMessage ⇒ sender ! msg
+ }
+
+ /**
+ * Returns the route definition handler for creating a custom route to this consumer.
+ * By default it returns an identity function, override this method to
+ * return a custom route definition handler.
+ */
+ override def onRouteDefinition = (rd: RouteDefinition) => rd.onException(classOf[Exception]).handled(true).transform(Builder.exceptionMessage).end
+}
+
+class TestProducer(uri: String) extends Actor with Producer {
+ def endpointUri = uri
+}
diff --git a/akka-camel/src/test/scala/akka/camel/ConsumerIntegrationTest.scala b/akka-camel/src/test/scala/akka/camel/ConsumerIntegrationTest.scala
index 6ffacf3432..0de66ae082 100644
--- a/akka-camel/src/test/scala/akka/camel/ConsumerIntegrationTest.scala
+++ b/akka-camel/src/test/scala/akka/camel/ConsumerIntegrationTest.scala
@@ -4,6 +4,7 @@
package akka.camel
+import internal.ActorActivationException
import language.postfixOps
import language.existentials
@@ -11,7 +12,7 @@ import akka.actor._
import org.scalatest.matchers.MustMatchers
import org.scalatest.WordSpec
import akka.camel.TestSupport._
-import org.apache.camel.model.RouteDefinition
+import org.apache.camel.model.{ ProcessorDefinition, RouteDefinition }
import org.apache.camel.builder.Builder
import org.apache.camel.{ FailedToCreateRouteException, CamelExecutionException }
import java.util.concurrent.{ ExecutionException, TimeUnit, TimeoutException }
@@ -19,17 +20,19 @@ import akka.actor.Status.Failure
import scala.concurrent.util.duration._
import concurrent.{ ExecutionContext, Await }
import akka.testkit._
+import akka.util.Timeout
class ConsumerIntegrationTest extends WordSpec with MustMatchers with NonSharedCamelSystem {
"ConsumerIntegrationTest" must {
- implicit val defaultTimeout = 10.seconds
+ val defaultTimeoutDuration = 10 seconds
+ implicit val defaultTimeout = Timeout(defaultTimeoutDuration)
implicit def ec: ExecutionContext = system.dispatcher
"Consumer must throw FailedToCreateRouteException, while awaiting activation, if endpoint is invalid" in {
- filterEvents(EventFilter[ActorInitializationException](occurrences = 1), EventFilter[FailedToCreateRouteException](occurrences = 1)) {
+ filterEvents(EventFilter[ActorActivationException](occurrences = 1)) {
val actorRef = system.actorOf(Props(new TestActor(uri = "some invalid uri")))
intercept[FailedToCreateRouteException] {
- Await.result(camel.activationFutureFor(actorRef), defaultTimeout)
+ Await.result(camel.activationFutureFor(actorRef), defaultTimeoutDuration)
}
}
}
@@ -40,7 +43,7 @@ class ConsumerIntegrationTest extends WordSpec with MustMatchers with NonSharedC
def receive = {
case m: CamelMessage ⇒ sender ! "received " + m.bodyAs[String]
}
- })
+ }, name = "direct-a1")
camel.sendTo("direct:a1", msg = "some message") must be("received some message")
}
@@ -48,17 +51,18 @@ class ConsumerIntegrationTest extends WordSpec with MustMatchers with NonSharedC
val SHORT_TIMEOUT = 10 millis
val LONG_WAIT = 200 millis
- start(new Consumer {
+ val ref = start(new Consumer {
override def replyTimeout = SHORT_TIMEOUT
def endpointUri = "direct:a3"
def receive = { case _ ⇒ { Thread.sleep(LONG_WAIT.toMillis); sender ! "done" } }
- })
+ }, name = "ignore-this-deadletter-timeout-consumer-reply")
val exception = intercept[CamelExecutionException] {
camel.sendTo("direct:a3", msg = "some msg 3")
}
exception.getCause.getClass must be(classOf[TimeoutException])
+ stop(ref)
}
"Consumer must process messages even after actor restart" in {
@@ -74,90 +78,105 @@ class ConsumerIntegrationTest extends WordSpec with MustMatchers with NonSharedC
override def postRestart(reason: Throwable) {
restarted.countDown()
}
- })
+ }, "direct-a2")
filterEvents(EventFilter[TestException](occurrences = 1)) {
consumer ! "throw"
- Await.ready(restarted, defaultTimeout)
+ Await.ready(restarted, defaultTimeoutDuration)
camel.sendTo("direct:a2", msg = "xyz") must be("received xyz")
}
+ stop(consumer)
}
"Consumer must unregister itself when stopped" in {
- val consumer = start(new TestActor())
- Await.result(camel.activationFutureFor(consumer), defaultTimeout)
+ val consumer = start(new TestActor(), name = "test-actor-unregister")
+ Await.result(camel.activationFutureFor(consumer), defaultTimeoutDuration)
camel.routeCount must be > (0)
system.stop(consumer)
- Await.result(camel.deactivationFutureFor(consumer), defaultTimeout)
+ Await.result(camel.deactivationFutureFor(consumer), defaultTimeoutDuration)
camel.routeCount must be(0)
}
"Consumer must register on uri passed in through constructor" in {
- val consumer = start(new TestActor("direct://test"))
- Await.result(camel.activationFutureFor(consumer), defaultTimeout)
+ val consumer = start(new TestActor("direct://test"), name = "direct-test")
+ Await.result(camel.activationFutureFor(consumer), defaultTimeoutDuration)
camel.routeCount must be > (0)
camel.routes.get(0).getEndpoint.getEndpointUri must be("direct://test")
system.stop(consumer)
- Await.result(camel.deactivationFutureFor(consumer), defaultTimeout)
+ Await.result(camel.deactivationFutureFor(consumer), defaultTimeoutDuration)
camel.routeCount must be(0)
+ stop(consumer)
}
"Error passing consumer supports error handling through route modification" in {
- start(new ErrorThrowingConsumer("direct:error-handler-test") with ErrorPassing {
- override def onRouteDefinition(rd: RouteDefinition) = {
+ val ref = start(new ErrorThrowingConsumer("direct:error-handler-test") {
+ override def onRouteDefinition = (rd: RouteDefinition) ⇒ {
rd.onException(classOf[TestException]).handled(true).transform(Builder.exceptionMessage).end
}
- })
+ }, name = "direct-error-handler-test")
filterEvents(EventFilter[TestException](occurrences = 1)) {
camel.sendTo("direct:error-handler-test", msg = "hello") must be("error: hello")
}
+ stop(ref)
}
"Error passing consumer supports redelivery through route modification" in {
- start(new FailingOnceConsumer("direct:failing-once-concumer") with ErrorPassing {
- override def onRouteDefinition(rd: RouteDefinition) = {
+ val ref = start(new FailingOnceConsumer("direct:failing-once-concumer") {
+ override def onRouteDefinition = (rd: RouteDefinition) ⇒ {
rd.onException(classOf[TestException]).maximumRedeliveries(1).end
}
- })
+ }, name = "direct-failing-once-consumer")
filterEvents(EventFilter[TestException](occurrences = 1)) {
camel.sendTo("direct:failing-once-concumer", msg = "hello") must be("accepted: hello")
}
+ stop(ref)
}
"Consumer supports manual Ack" in {
- start(new ManualAckConsumer() {
+ val ref = start(new ManualAckConsumer() {
def endpointUri = "direct:manual-ack"
def receive = { case _ ⇒ sender ! Ack }
- })
- camel.template.asyncSendBody("direct:manual-ack", "some message").get(defaultTimeout.toSeconds, TimeUnit.SECONDS) must be(null) //should not timeout
+ }, name = "direct-manual-ack-1")
+ camel.template.asyncSendBody("direct:manual-ack", "some message").get(defaultTimeoutDuration.toSeconds, TimeUnit.SECONDS) must be(null) //should not timeout
+ stop(ref)
}
"Consumer handles manual Ack failure" in {
val someException = new Exception("e1")
- start(new ManualAckConsumer() {
+ val ref = start(new ManualAckConsumer() {
def endpointUri = "direct:manual-ack"
def receive = { case _ ⇒ sender ! Failure(someException) }
- })
+ }, name = "direct-manual-ack-2")
intercept[ExecutionException] {
- camel.template.asyncSendBody("direct:manual-ack", "some message").get(defaultTimeout.toSeconds, TimeUnit.SECONDS)
+ camel.template.asyncSendBody("direct:manual-ack", "some message").get(defaultTimeoutDuration.toSeconds, TimeUnit.SECONDS)
}.getCause.getCause must be(someException)
+ stop(ref)
}
"Consumer should time-out, if manual Ack not received within replyTimeout and should give a human readable error message" in {
- start(new ManualAckConsumer() {
+ val ref = start(new ManualAckConsumer() {
override def replyTimeout = 10 millis
def endpointUri = "direct:manual-ack"
def receive = { case _ ⇒ }
- })
+ }, name = "direct-manual-ack-3")
intercept[ExecutionException] {
- camel.template.asyncSendBody("direct:manual-ack", "some message").get(defaultTimeout.toSeconds, TimeUnit.SECONDS)
+ camel.template.asyncSendBody("direct:manual-ack", "some message").get(defaultTimeoutDuration.toSeconds, TimeUnit.SECONDS)
}.getCause.getCause.getMessage must include("Failed to get Ack")
+ stop(ref)
+ }
+ "respond to onRouteDefinition" in {
+ val ref = start(new ErrorRespondingConsumer("direct:error-responding-consumer-1"), "error-responding-consumer")
+ filterEvents(EventFilter[TestException](occurrences = 1)) {
+ val response = camel.sendTo("direct:error-responding-consumer-1", "some body")
+ response must be("some body has an error")
+ }
+ stop(ref)
}
}
}
@@ -166,6 +185,25 @@ class ErrorThrowingConsumer(override val endpointUri: String) extends Consumer {
def receive = {
case msg: CamelMessage ⇒ throw new TestException("error: %s" format msg.body)
}
+ override def preRestart(reason: Throwable, message: Option[Any]) {
+ super.preRestart(reason, message)
+ sender ! Failure(reason)
+ }
+}
+
+class ErrorRespondingConsumer(override val endpointUri: String) extends Consumer {
+ def receive = {
+ case msg: CamelMessage ⇒ throw new TestException("Error!")
+ }
+ override def onRouteDefinition = (rd: RouteDefinition) ⇒ {
+ // Catch TestException and handle it by returning a modified version of the in message
+ rd.onException(classOf[TestException]).handled(true).transform(Builder.body.append(" has an error")).end
+ }
+
+ final override def preRestart(reason: Throwable, message: Option[Any]) {
+ super.preRestart(reason, message)
+ sender ! Failure(reason)
+ }
}
class FailingOnceConsumer(override val endpointUri: String) extends Consumer {
@@ -177,6 +215,11 @@ class FailingOnceConsumer(override val endpointUri: String) extends Consumer {
else
throw new TestException("rejected: %s" format msg.body)
}
+
+ final override def preRestart(reason: Throwable, message: Option[Any]) {
+ super.preRestart(reason, message)
+ sender ! Failure(reason)
+ }
}
class TestActor(uri: String = "file://target/abcde") extends Consumer {
@@ -184,13 +227,6 @@ class TestActor(uri: String = "file://target/abcde") extends Consumer {
def receive = { case _ ⇒ /* do nothing */ }
}
-trait ErrorPassing {
- self: Actor ⇒
- final override def preRestart(reason: Throwable, message: Option[Any]) {
- sender ! Failure(reason)
- }
-}
-
trait ManualAckConsumer extends Consumer {
override def autoAck = false
}
diff --git a/akka-camel/src/test/scala/akka/camel/DefaultCamelTest.scala b/akka-camel/src/test/scala/akka/camel/DefaultCamelTest.scala
index dab91623d4..c649424122 100644
--- a/akka-camel/src/test/scala/akka/camel/DefaultCamelTest.scala
+++ b/akka-camel/src/test/scala/akka/camel/DefaultCamelTest.scala
@@ -14,6 +14,8 @@ import org.scalatest.WordSpec
import akka.event.LoggingAdapter
import akka.actor.ActorSystem.Settings
import com.typesafe.config.ConfigFactory
+import org.apache.camel.impl.DefaultCamelContext
+import org.apache.camel.spi.Registry
class DefaultCamelTest extends WordSpec with SharedCamelSystem with MustMatchers with MockitoSugar {
@@ -26,7 +28,7 @@ class DefaultCamelTest extends WordSpec with SharedCamelSystem with MustMatchers
def camelWithMocks = new DefaultCamel(sys) {
override val log = mock[LoggingAdapter]
override lazy val template = mock[ProducerTemplate]
- override lazy val context = mock[CamelContext]
+ override lazy val context = mock[DefaultCamelContext]
override val settings = mock[CamelSettings]
}
@@ -62,5 +64,4 @@ class DefaultCamelTest extends WordSpec with SharedCamelSystem with MustMatchers
verify(camel.context).stop()
}
-
}
\ No newline at end of file
diff --git a/akka-camel/src/test/scala/akka/camel/MessageScalaTest.scala b/akka-camel/src/test/scala/akka/camel/MessageScalaTest.scala
index 6285f13561..dd73027624 100644
--- a/akka-camel/src/test/scala/akka/camel/MessageScalaTest.scala
+++ b/akka-camel/src/test/scala/akka/camel/MessageScalaTest.scala
@@ -23,11 +23,6 @@ class MessageScalaTest extends FunSuite with MustMatchers with SharedCamelSystem
}
}
- test("mustReturnDoubleHeader") {
- val message = CamelMessage("test", Map("test" -> 1.4))
- message.header("test").get must be(1.4)
- }
-
test("mustConvertDoubleHeaderToString") {
val message = CamelMessage("test", Map("test" -> 1.4))
message.headerAs[String]("test").get must be("1.4")
@@ -47,32 +42,14 @@ class MessageScalaTest extends FunSuite with MustMatchers with SharedCamelSystem
}
test("mustSetBodyAndPreserveHeaders") {
- CamelMessage("test1", Map("A" -> "1")).withBody("test2") must be(
+ CamelMessage("test1", Map("A" -> "1")).copy(body = "test2") must be(
CamelMessage("test2", Map("A" -> "1")))
}
test("mustSetHeadersAndPreserveBody") {
- CamelMessage("test1", Map("A" -> "1")).withHeaders(Map("C" -> "3")) must be(
+ CamelMessage("test1", Map("A" -> "1")).copy(headers = Map("C" -> "3")) must be(
CamelMessage("test1", Map("C" -> "3")))
}
-
- test("mustAddHeaderAndPreserveBodyAndHeaders") {
- CamelMessage("test1", Map("A" -> "1")).addHeader("B" -> "2") must be(
- CamelMessage("test1", Map("A" -> "1", "B" -> "2")))
-
- }
-
- test("mustAddHeadersAndPreserveBodyAndHeaders") {
- CamelMessage("test1", Map("A" -> "1")).addHeaders(Map("B" -> "2")) must be(
- CamelMessage("test1", Map("A" -> "1", "B" -> "2")))
-
- }
-
- test("mustRemoveHeadersAndPreserveBodyAndRemainingHeaders") {
- CamelMessage("test1", Map("A" -> "1", "B" -> "2")).withoutHeader("B") must be(
- CamelMessage("test1", Map("A" -> "1")))
-
- }
}
diff --git a/akka-camel/src/test/scala/akka/camel/ProducerFeatureTest.scala b/akka-camel/src/test/scala/akka/camel/ProducerFeatureTest.scala
index 6510f6fd67..c8e5902093 100644
--- a/akka-camel/src/test/scala/akka/camel/ProducerFeatureTest.scala
+++ b/akka-camel/src/test/scala/akka/camel/ProducerFeatureTest.scala
@@ -31,7 +31,6 @@ class ProducerFeatureTest extends WordSpec with BeforeAndAfterAll with BeforeAnd
val camelContext = camel.context
// to make testing equality of messages easier, otherwise the breadcrumb shows up in the result.
camelContext.setUseBreadcrumb(false)
-
val timeoutDuration = 1 second
implicit val timeout = Timeout(timeoutDuration)
override protected def beforeAll { camelContext.addRoutes(new TestRoute(system)) }
@@ -40,7 +39,7 @@ class ProducerFeatureTest extends WordSpec with BeforeAndAfterAll with BeforeAnd
"A Producer on a sync Camel route" must {
"produce a message and receive normal response" in {
- val producer = system.actorOf(Props(new TestProducer("direct:producer-test-2", true)))
+ val producer = system.actorOf(Props(new TestProducer("direct:producer-test-2", true)), name = "direct-producer-2")
val message = CamelMessage("test", Map(CamelMessage.MessageExchangeId -> "123"))
val future = producer.ask(message)(timeoutDuration)
val expected = CamelMessage("received TEST", Map(CamelMessage.MessageExchangeId -> "123"))
@@ -48,6 +47,7 @@ class ProducerFeatureTest extends WordSpec with BeforeAndAfterAll with BeforeAnd
case result: CamelMessage ⇒ assert(result === expected)
case unexpected ⇒ fail("Actor responded with unexpected message:" + unexpected)
}
+ system.stop(producer)
}
"produce a message and receive failure response" in {
@@ -68,7 +68,7 @@ class ProducerFeatureTest extends WordSpec with BeforeAndAfterAll with BeforeAnd
override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1 minute) {
case _: AkkaCamelException ⇒ Stop
}
- }))
+ }), name = "prod-anonymous-supervisor")
val producer = Await.result[ActorRef](supervisor.ask(Props(new TestProducer("direct:producer-test-2"))).mapTo[ActorRef], timeoutDuration)
val message = CamelMessage("fail", Map(CamelMessage.MessageExchangeId -> "123"))
filterEvents(EventFilter[AkkaCamelException](occurrences = 1)) {
@@ -78,27 +78,32 @@ class ProducerFeatureTest extends WordSpec with BeforeAndAfterAll with BeforeAnd
}
Await.ready(latch, timeoutDuration)
deadActor must be(Some(producer))
+ system.stop(producer)
}
"produce a message oneway" in {
- val producer = system.actorOf(Props(new TestProducer("direct:producer-test-1", true) with Oneway))
+ val producer = system.actorOf(Props(new TestProducer("direct:producer-test-1", true) with Oneway), name = "direct-producer-1-oneway")
mockEndpoint.expectedBodiesReceived("TEST")
producer ! CamelMessage("test", Map())
mockEndpoint.assertIsSatisfied()
+ system.stop(producer)
}
"produces message twoway without sender reference" in {
- val producer = system.actorOf(Props(new TestProducer("direct:producer-test-1")))
+ // this test causes a dead letter which can be ignored. The producer is two-way but a oneway tell is used
+ // to communicate with it and the response is ignored, which ends up in a dead letter
+ val producer = system.actorOf(Props(new TestProducer("direct:producer-test-1")), name = "ignore-this-deadletter-direct-producer-test-no-sender")
mockEndpoint.expectedBodiesReceived("test")
producer ! CamelMessage("test", Map())
mockEndpoint.assertIsSatisfied()
+ system.stop(producer)
}
}
"A Producer on an async Camel route" must {
"produce message to direct:producer-test-3 and receive normal response" in {
- val producer = system.actorOf(Props(new TestProducer("direct:producer-test-3")))
+ val producer = system.actorOf(Props(new TestProducer("direct:producer-test-3")), name = "direct-producer-test-3")
val message = CamelMessage("test", Map(CamelMessage.MessageExchangeId -> "123"))
val future = producer.ask(message)(timeoutDuration)
@@ -109,10 +114,11 @@ class ProducerFeatureTest extends WordSpec with BeforeAndAfterAll with BeforeAnd
result must be(expected)
case unexpected ⇒ fail("Actor responded with unexpected message:" + unexpected)
}
+ system.stop(producer)
}
"produce message to direct:producer-test-3 and receive failure response" in {
- val producer = system.actorOf(Props(new TestProducer("direct:producer-test-3")))
+ val producer = system.actorOf(Props(new TestProducer("direct:producer-test-3")), name = "direct-producer-test-3-receive-failure")
val message = CamelMessage("fail", Map(CamelMessage.MessageExchangeId -> "123"))
filterEvents(EventFilter[AkkaCamelException](occurrences = 1)) {
@@ -120,11 +126,12 @@ class ProducerFeatureTest extends WordSpec with BeforeAndAfterAll with BeforeAnd
e.getMessage must be("failure")
e.headers must be(Map(CamelMessage.MessageExchangeId -> "123"))
}
+ system.stop(producer)
}
"produce message, forward normal response of direct:producer-test-2 to a replying target actor and receive response" in {
- val target = system.actorOf(Props[ReplyingForwardTarget])
- val producer = system.actorOf(Props(new TestForwarder("direct:producer-test-2", target)))
+ val target = system.actorOf(Props[ReplyingForwardTarget], name = "reply-forwarding-target")
+ val producer = system.actorOf(Props(new TestForwarder("direct:producer-test-2", target)), name = "direct-producer-test-2-forwarder")
val message = CamelMessage("test", Map(CamelMessage.MessageExchangeId -> "123"))
val future = producer.ask(message)(timeoutDuration)
@@ -135,11 +142,13 @@ class ProducerFeatureTest extends WordSpec with BeforeAndAfterAll with BeforeAnd
result must be(expected)
case unexpected ⇒ fail("Actor responded with unexpected message:" + unexpected)
}
+ system.stop(target)
+ system.stop(producer)
}
"produce message, forward failure response of direct:producer-test-2 to a replying target actor and receive response" in {
- val target = system.actorOf(Props[ReplyingForwardTarget])
- val producer = system.actorOf(Props(new TestForwarder("direct:producer-test-2", target)))
+ val target = system.actorOf(Props[ReplyingForwardTarget], name = "reply-forwarding-target")
+ val producer = system.actorOf(Props(new TestForwarder("direct:producer-test-2", target)), name = "direct-producer-test-2-forwarder-failure")
val message = CamelMessage("fail", Map(CamelMessage.MessageExchangeId -> "123"))
filterEvents(EventFilter[AkkaCamelException](occurrences = 1)) {
@@ -147,30 +156,36 @@ class ProducerFeatureTest extends WordSpec with BeforeAndAfterAll with BeforeAnd
e.getMessage must be("failure")
e.headers must be(Map(CamelMessage.MessageExchangeId -> "123", "test" -> "failure"))
}
+ system.stop(target)
+ system.stop(producer)
}
"produce message, forward normal response to a producing target actor and produce response to direct:forward-test-1" in {
- val target = system.actorOf(Props[ProducingForwardTarget])
- val producer = system.actorOf(Props(new TestForwarder("direct:producer-test-2", target)))
+ val target = system.actorOf(Props[ProducingForwardTarget], name = "producer-forwarding-target")
+ val producer = system.actorOf(Props(new TestForwarder("direct:producer-test-2", target)), name = "direct-producer-test-2-forwarder-to-producing-target")
mockEndpoint.expectedBodiesReceived("received test")
producer.tell(CamelMessage("test", Map()), producer)
mockEndpoint.assertIsSatisfied()
+ system.stop(target)
+ system.stop(producer)
}
"produce message, forward failure response to a producing target actor and produce response to direct:forward-test-1" in {
- val target = system.actorOf(Props[ProducingForwardTarget])
- val producer = system.actorOf(Props(new TestForwarder("direct:producer-test-2", target)))
+ val target = system.actorOf(Props[ProducingForwardTarget], name = "producer-forwarding-target-failure")
+ val producer = system.actorOf(Props(new TestForwarder("direct:producer-test-2", target)), name = "direct-producer-test-2-forward-failure")
filterEvents(EventFilter[AkkaCamelException](occurrences = 1)) {
mockEndpoint.expectedMessageCount(1)
mockEndpoint.message(0).body().isInstanceOf(classOf[akka.actor.Status.Failure])
producer.tell(CamelMessage("fail", Map()), producer)
mockEndpoint.assertIsSatisfied()
}
+ system.stop(target)
+ system.stop(producer)
}
"produce message, forward normal response from direct:producer-test-3 to a replying target actor and receive response" in {
- val target = system.actorOf(Props[ReplyingForwardTarget])
- val producer = system.actorOf(Props(new TestForwarder("direct:producer-test-3", target)))
+ val target = system.actorOf(Props[ReplyingForwardTarget], name = "reply-forwarding-target")
+ val producer = system.actorOf(Props(new TestForwarder("direct:producer-test-3", target)), name = "direct-producer-test-3-to-replying-actor")
val message = CamelMessage("test", Map(CamelMessage.MessageExchangeId -> "123"))
val future = producer.ask(message)(timeoutDuration)
@@ -180,11 +195,13 @@ class ProducerFeatureTest extends WordSpec with BeforeAndAfterAll with BeforeAnd
message must be(expected)
case unexpected ⇒ fail("Actor responded with unexpected message:" + unexpected)
}
+ system.stop(target)
+ system.stop(producer)
}
"produce message, forward failure response from direct:producer-test-3 to a replying target actor and receive response" in {
- val target = system.actorOf(Props[ReplyingForwardTarget])
- val producer = system.actorOf(Props(new TestForwarder("direct:producer-test-3", target)))
+ val target = system.actorOf(Props[ReplyingForwardTarget], name = "reply-forwarding-target")
+ val producer = system.actorOf(Props(new TestForwarder("direct:producer-test-3", target)), name = "direct-producer-test-3-forward-failure")
val message = CamelMessage("fail", Map(CamelMessage.MessageExchangeId -> "123"))
filterEvents(EventFilter[AkkaCamelException](occurrences = 1)) {
@@ -192,19 +209,23 @@ class ProducerFeatureTest extends WordSpec with BeforeAndAfterAll with BeforeAnd
e.getMessage must be("failure")
e.headers must be(Map(CamelMessage.MessageExchangeId -> "123", "test" -> "failure"))
}
+ system.stop(target)
+ system.stop(producer)
}
"produce message, forward normal response from direct:producer-test-3 to a producing target actor and produce response to direct:forward-test-1" in {
- val target = system.actorOf(Props[ProducingForwardTarget])
- val producer = system.actorOf(Props(new TestForwarder("direct:producer-test-3", target)))
+ val target = system.actorOf(Props[ProducingForwardTarget], "producing-forward-target-normal")
+ val producer = system.actorOf(Props(new TestForwarder("direct:producer-test-3", target)), name = "direct-producer-test-3-forward-normal")
mockEndpoint.expectedBodiesReceived("received test")
producer.tell(CamelMessage("test", Map()), producer)
mockEndpoint.assertIsSatisfied()
+ system.stop(target)
+ system.stop(producer)
}
"produce message, forward failure response from direct:producer-test-3 to a producing target actor and produce response to direct:forward-test-1" in {
- val target = system.actorOf(Props[ProducingForwardTarget])
- val producer = system.actorOf(Props(new TestForwarder("direct:producer-test-3", target)))
+ val target = system.actorOf(Props[ProducingForwardTarget], "producing-forward-target-failure")
+ val producer = system.actorOf(Props(new TestForwarder("direct:producer-test-3", target)), name = "direct-producer-test-3-forward-failure-producing-target")
filterEvents(EventFilter[AkkaCamelException](occurrences = 1)) {
mockEndpoint.expectedMessageCount(1)
mockEndpoint.message(0).body().isInstanceOf(classOf[akka.actor.Status.Failure])
@@ -253,7 +274,7 @@ object ProducerFeatureTest {
class ReplyingForwardTarget extends Actor {
def receive = {
case msg: CamelMessage ⇒
- context.sender ! (msg.addHeader("test" -> "result"))
+ context.sender ! (msg.copy(headers = msg.headers + ("test" -> "result")))
case msg: akka.actor.Status.Failure ⇒
msg.cause match {
case e: AkkaCamelException ⇒ context.sender ! Status.Failure(new AkkaCamelException(e, e.headers + ("test" -> "failure")))
diff --git a/akka-camel/src/test/scala/akka/camel/ProducerRegistryTest.scala b/akka-camel/src/test/scala/akka/camel/ProducerRegistryTest.scala
deleted file mode 100644
index 4d46544b99..0000000000
--- a/akka-camel/src/test/scala/akka/camel/ProducerRegistryTest.scala
+++ /dev/null
@@ -1,53 +0,0 @@
-/**
- * Copyright (C) 2009-2012 Typesafe Inc.
- */
-
-package akka.camel
-
-import language.postfixOps
-
-import org.scalatest.matchers.MustMatchers
-import org.scalatest.WordSpec
-import akka.camel.TestSupport.SharedCamelSystem
-import scala.concurrent.util.duration._
-import akka.actor.{ ActorRef, Props }
-import akka.util.Timeout
-import scala.concurrent.Await
-
-class ProducerRegistryTest extends WordSpec with MustMatchers with SharedCamelSystem {
- implicit val timeout = 5.seconds
- implicit val ec = system.dispatcher
-
- "A ProducerRegistry" must {
- def newEmptyActor: ActorRef = system.actorOf(Props.empty)
- def registerProcessorFor(actorRef: ActorRef) = camel.registerProducer(actorRef, "mock:mock")._2
-
- "register a started SendProcessor for the producer, which is stopped when the actor is stopped" in {
- val actorRef = newEmptyActor
- val processor = registerProcessorFor(actorRef)
- Await.result(camel.activationFutureFor(actorRef), timeout)
- processor.isStarted must be(true)
- system.stop(actorRef)
- Await.result(camel.deactivationFutureFor(actorRef), timeout)
- (processor.isStopping || processor.isStopped) must be(true)
- }
- "remove and stop the SendProcessor if the actorRef is registered" in {
- val actorRef = newEmptyActor
- val processor = registerProcessorFor(actorRef)
- camel.unregisterProducer(actorRef)
- (processor.isStopping || processor.isStopped) must be(true)
- }
- "remove and stop only the SendProcessor for the actorRef that is registered" in {
- val actorRef1 = newEmptyActor
- val actorRef2 = newEmptyActor
- val processor = registerProcessorFor(actorRef2)
- // this generates a warning on the activationTracker.
- camel.unregisterProducer(actorRef1)
- (!processor.isStopped && !processor.isStopping) must be(true)
-
- camel.unregisterProducer(actorRef2)
- (processor.isStopping || processor.isStopped) must be(true)
- }
- }
-
-}
\ No newline at end of file
diff --git a/akka-camel/src/test/scala/akka/camel/TestSupport.scala b/akka-camel/src/test/scala/akka/camel/TestSupport.scala
index fe22b0e7a0..c25ccdab3c 100644
--- a/akka-camel/src/test/scala/akka/camel/TestSupport.scala
+++ b/akka-camel/src/test/scala/akka/camel/TestSupport.scala
@@ -19,11 +19,12 @@ import akka.util.Timeout
import akka.testkit.AkkaSpec
private[camel] object TestSupport {
+ def start(actor: ⇒ Actor, name: String)(implicit system: ActorSystem, timeout: Timeout): ActorRef =
+ Await.result(CamelExtension(system).activationFutureFor(system.actorOf(Props(actor), name))(timeout, system.dispatcher), timeout.duration)
- def start(actor: ⇒ Actor)(implicit system: ActorSystem): ActorRef = {
- val actorRef = system.actorOf(Props(actor))
- Await.result(CamelExtension(system).activationFutureFor(actorRef)(10 seconds, system.dispatcher), 10 seconds)
- actorRef
+ def stop(actorRef: ActorRef)(implicit system: ActorSystem, timeout: Timeout) {
+ system.stop(actorRef)
+ Await.result(CamelExtension(system).deactivationFutureFor(actorRef)(timeout, system.dispatcher), timeout.duration)
}
private[camel] implicit def camelToTestWrapper(camel: Camel) = new CamelTestWrapper(camel)
diff --git a/akka-camel/src/test/scala/akka/camel/UntypedProducerTest.scala b/akka-camel/src/test/scala/akka/camel/UntypedProducerTest.scala
index ba853083bf..a9d097aa10 100644
--- a/akka-camel/src/test/scala/akka/camel/UntypedProducerTest.scala
+++ b/akka-camel/src/test/scala/akka/camel/UntypedProducerTest.scala
@@ -33,7 +33,7 @@ class UntypedProducerTest extends WordSpec with MustMatchers with BeforeAndAfter
"An UntypedProducer producing a message to a sync Camel route" must {
"produce a message and receive a normal response" in {
- val producer = system.actorOf(Props[SampleUntypedReplyingProducer])
+ val producer = system.actorOf(Props[SampleUntypedReplyingProducer], name = "sample-untyped-replying-producer")
val message = CamelMessage("test", Map(CamelMessage.MessageExchangeId -> "123"))
val future = producer.ask(message)(timeout)
@@ -47,7 +47,7 @@ class UntypedProducerTest extends WordSpec with MustMatchers with BeforeAndAfter
}
"produce a message and receive a failure response" in {
- val producer = system.actorOf(Props[SampleUntypedReplyingProducer])
+ val producer = system.actorOf(Props[SampleUntypedReplyingProducer], name = "sample-untyped-replying-producer-failure")
val message = CamelMessage("fail", Map(CamelMessage.MessageExchangeId -> "123"))
filterEvents(EventFilter[AkkaCamelException](occurrences = 1)) {
@@ -66,7 +66,7 @@ class UntypedProducerTest extends WordSpec with MustMatchers with BeforeAndAfter
"An UntypedProducer producing a message to a sync Camel route and then forwarding the response" must {
"produce a message and send a normal response to direct:forward-test-1" in {
- val producer = system.actorOf(Props[SampleUntypedForwardingProducer])
+ val producer = system.actorOf(Props[SampleUntypedForwardingProducer], name = "sample-untyped-forwarding-producer")
mockEndpoint.expectedBodiesReceived("received test")
producer.tell(CamelMessage("test", Map[String, Any]()), producer)
diff --git a/akka-camel/src/test/scala/akka/camel/internal/ActivationTrackerTest.scala b/akka-camel/src/test/scala/akka/camel/internal/ActivationTrackerTest.scala
index ddff6017ec..b290ba254b 100644
--- a/akka-camel/src/test/scala/akka/camel/internal/ActivationTrackerTest.scala
+++ b/akka-camel/src/test/scala/akka/camel/internal/ActivationTrackerTest.scala
@@ -5,9 +5,9 @@ import org.scalatest.matchers.MustMatchers
import scala.concurrent.util.duration._
import org.scalatest.{ GivenWhenThen, BeforeAndAfterEach, BeforeAndAfterAll, WordSpec }
import akka.actor.{ Props, ActorSystem }
-import scala.concurrent.util.Duration
import akka.camel._
import akka.testkit.{ TimingTest, TestProbe, TestKit }
+import akka.camel.internal.ActivationProtocol._
import scala.concurrent.util.FiniteDuration
class ActivationTrackerTest extends TestKit(ActorSystem("test")) with WordSpec with MustMatchers with BeforeAndAfterAll with BeforeAndAfterEach with GivenWhenThen {
@@ -25,7 +25,7 @@ class ActivationTrackerTest extends TestKit(ActorSystem("test")) with WordSpec w
anotherAwaiting = new Awaiting(actor)
}
- val at = system.actorOf(Props[ActivationTracker])
+ val at = system.actorOf(Props[ActivationTracker], name = "activationTrackker")
"ActivationTracker forwards activation message to all awaiting parties" taggedAs TimingTest in {
awaiting.awaitActivation()
diff --git a/akka-camel/src/test/scala/akka/camel/internal/component/ActorProducerTest.scala b/akka-camel/src/test/scala/akka/camel/internal/component/ActorProducerTest.scala
index 09f9431cc9..3a7cca93e2 100644
--- a/akka-camel/src/test/scala/akka/camel/internal/component/ActorProducerTest.scala
+++ b/akka-camel/src/test/scala/akka/camel/internal/component/ActorProducerTest.scala
@@ -11,7 +11,7 @@ import org.mockito.Mockito._
import org.apache.camel.{ CamelContext, ProducerTemplate, AsyncCallback }
import java.util.concurrent.atomic.AtomicBoolean
import scala.concurrent.util.duration._
-import scala.concurrent.util.Duration
+import concurrent.util.{ FiniteDuration, Duration }
import java.lang.String
import akka.actor.{ ActorRef, Props, ActorSystem, Actor }
import akka.camel._
@@ -21,14 +21,17 @@ import akka.camel.TestSupport._
import java.util.concurrent.{ TimeoutException, CountDownLatch, TimeUnit }
import org.mockito.{ ArgumentMatcher, Matchers, Mockito }
import org.scalatest.matchers.MustMatchers
-import akka.actor.Status.Failure
+import akka.actor.Status.{ Success, Failure }
import com.typesafe.config.ConfigFactory
import akka.actor.ActorSystem.Settings
import akka.event.LoggingAdapter
import akka.testkit.{ TimingTest, TestKit, TestProbe }
-import scala.concurrent.util.FiniteDuration
+import org.apache.camel.impl.DefaultCamelContext
+import concurrent.{ Await, Promise, Future }
+import akka.util.Timeout
class ActorProducerTest extends TestKit(ActorSystem("test")) with WordSpec with MustMatchers with ActorProducerFixture {
+ implicit val timeout = Timeout(10 seconds)
"ActorProducer" when {
@@ -54,7 +57,45 @@ class ActorProducerTest extends TestKit(ActorSystem("test")) with WordSpec with
"not expect response and not block" taggedAs TimingTest in {
time(producer.processExchangeAdapter(exchange)) must be < (200 millis)
}
+ }
+ "manualAck" when {
+
+ "response is Ack" must {
+ "process the exchange" in {
+ producer = given(outCapable = false, autoAck = false)
+ import system.dispatcher
+ val future = Future {
+ producer.processExchangeAdapter(exchange)
+ }
+ within(1 second) {
+ probe.expectMsgType[CamelMessage]
+ info("message sent to consumer")
+ probe.sender ! Ack
+ }
+ verify(exchange, never()).setResponse(any[CamelMessage])
+ info("no response forwarded to exchange")
+ Await.ready(future, timeout.duration)
+ }
+ }
+ "the consumer does not respond wit Ack" must {
+ "not block forever" in {
+ producer = given(outCapable = false, autoAck = false)
+ import system.dispatcher
+ val future = Future {
+ producer.processExchangeAdapter(exchange)
+ }
+ within(1 second) {
+ probe.expectMsgType[CamelMessage]
+ info("message sent to consumer")
+ }
+ verify(exchange, never()).setResponse(any[CamelMessage])
+ info("no response forwarded to exchange")
+ intercept[TimeoutException] {
+ Await.ready(future, camel.settings.replyTimeout - (1 seconds))
+ }
+ }
+ }
}
"out capable" when {
@@ -93,9 +134,6 @@ class ActorProducerTest extends TestKit(ActorSystem("test")) with WordSpec with
}
}
-
- //TODO: write more tests for synchronous process(exchange) method
-
}
"asynchronous" when {
@@ -284,8 +322,21 @@ trait ActorProducerFixture extends MockitoSugar with BeforeAndAfterAll with Befo
def camelWithMocks = new DefaultCamel(sys) {
override val log = mock[LoggingAdapter]
override lazy val template = mock[ProducerTemplate]
- override lazy val context = mock[CamelContext]
- override val settings = mock[CamelSettings]
+ override lazy val context = mock[DefaultCamelContext]
+ override val settings = new CamelSettings(ConfigFactory.parseString(
+ """
+ akka {
+ camel {
+ jmx = off
+ streamingCache = on
+ consumer {
+ auto-ack = on
+ reply-timeout = 2s
+ activation-timeout = 10s
+ }
+ }
+ }
+ """))
}
camel = camelWithMocks
@@ -351,6 +402,6 @@ trait ActorProducerFixture extends MockitoSugar with BeforeAndAfterAll with Befo
def receive = {
case msg ⇒ sender ! "received " + msg
}
- }))
+ }), name = "echoActor")
}
diff --git a/akka-camel/src/test/scala/akka/camel/internal/component/DurationConverterTest.scala b/akka-camel/src/test/scala/akka/camel/internal/component/DurationConverterTest.scala
index c66a77c287..e8918a5b67 100644
--- a/akka-camel/src/test/scala/akka/camel/internal/component/DurationConverterTest.scala
+++ b/akka-camel/src/test/scala/akka/camel/internal/component/DurationConverterTest.scala
@@ -10,7 +10,8 @@ import org.scalatest.matchers.MustMatchers
import scala.concurrent.util.duration._
import scala.concurrent.util.Duration
import org.scalatest.WordSpec
-import org.apache.camel.{ TypeConversionException, NoTypeConversionAvailableException }
+import org.apache.camel.TypeConversionException
+import language.postfixOps
class DurationConverterSpec extends WordSpec with MustMatchers {
import DurationTypeConverter._
diff --git a/akka-docs/rst/java/code/docs/camel/ActivationTestBase.java b/akka-docs/rst/java/code/docs/camel/ActivationTestBase.java
index e0b9b60c2b..5c406e0fb4 100644
--- a/akka-docs/rst/java/code/docs/camel/ActivationTestBase.java
+++ b/akka-docs/rst/java/code/docs/camel/ActivationTestBase.java
@@ -6,6 +6,7 @@ package docs.camel;
import akka.camel.Camel;
import akka.camel.CamelExtension;
import akka.camel.javaapi.UntypedConsumerActor;
+ import akka.util.Timeout;
import scala.concurrent.Future;
import scala.concurrent.util.Duration;
import scala.concurrent.util.FiniteDuration;
@@ -26,14 +27,14 @@ public class ActivationTestBase {
ActorRef producer = system.actorOf(props,"myproducer");
Camel camel = CamelExtension.get(system);
// get a future reference to the activation of the endpoint of the Consumer Actor
- FiniteDuration duration = Duration.create(10, SECONDS);
- Future activationFuture = camel.activationFutureFor(producer, duration, system.dispatcher());
+ Timeout timeout = new Timeout(Duration.create(10, SECONDS));
+ Future activationFuture = camel.activationFutureFor(producer, timeout, system.dispatcher());
//#CamelActivation
//#CamelDeactivation
// ..
system.stop(producer);
// get a future reference to the deactivation of the endpoint of the Consumer Actor
- Future deactivationFuture = camel.deactivationFutureFor(producer, duration, system.dispatcher());
+ Future deactivationFuture = camel.deactivationFutureFor(producer, timeout, system.dispatcher());
//#CamelDeactivation
system.shutdown();
}
diff --git a/akka-docs/rst/java/code/docs/camel/Consumer4.java b/akka-docs/rst/java/code/docs/camel/Consumer4.java
index 960b523f3a..d1400b2e2f 100644
--- a/akka-docs/rst/java/code/docs/camel/Consumer4.java
+++ b/akka-docs/rst/java/code/docs/camel/Consumer4.java
@@ -11,7 +11,7 @@ public class Consumer4 extends UntypedConsumerActor {
private final static FiniteDuration timeout = Duration.create(500, TimeUnit.MILLISECONDS);
@Override
- public Duration replyTimeout() {
+ public FiniteDuration replyTimeout() {
return timeout;
}
diff --git a/akka-docs/rst/java/code/docs/camel/ErrorThrowingConsumer.java b/akka-docs/rst/java/code/docs/camel/ErrorThrowingConsumer.java
index 9a01717287..ebf49dab11 100644
--- a/akka-docs/rst/java/code/docs/camel/ErrorThrowingConsumer.java
+++ b/akka-docs/rst/java/code/docs/camel/ErrorThrowingConsumer.java
@@ -3,6 +3,7 @@ package docs.camel;
import akka.actor.Status;
import akka.camel.CamelMessage;
import akka.camel.javaapi.UntypedConsumerActor;
+import akka.dispatch.Mapper;
import org.apache.camel.builder.Builder;
import org.apache.camel.model.ProcessorDefinition;
import org.apache.camel.model.RouteDefinition;
@@ -11,6 +12,13 @@ import scala.Option;
public class ErrorThrowingConsumer extends UntypedConsumerActor{
private String uri;
+ private static Mapper> mapper = new Mapper>() {
+ public ProcessorDefinition> apply(RouteDefinition rd) {
+ // Catch any exception and handle it by returning the exception message as response
+ return rd.onException(Exception.class).handled(true).transform(Builder.exceptionMessage()).end();
+ }
+ };
+
public ErrorThrowingConsumer(String uri){
this.uri = uri;
}
@@ -29,9 +37,8 @@ public class ErrorThrowingConsumer extends UntypedConsumerActor{
}
@Override
- public ProcessorDefinition> onRouteDefinition(RouteDefinition rd) {
- // Catch any exception and handle it by returning the exception message as response
- return rd.onException(Exception.class).handled(true).transform(Builder.exceptionMessage()).end();
+ public Mapper> getRouteDefinitionHandler() {
+ return mapper;
}
@Override
diff --git a/akka-docs/rst/java/code/docs/camel/Responder.java b/akka-docs/rst/java/code/docs/camel/Responder.java
index 12ca8603cf..8750aaf700 100644
--- a/akka-docs/rst/java/code/docs/camel/Responder.java
+++ b/akka-docs/rst/java/code/docs/camel/Responder.java
@@ -2,6 +2,7 @@ package docs.camel;
//#CustomRoute
import akka.actor.UntypedActor;
import akka.camel.CamelMessage;
+import akka.dispatch.Mapper;
import akka.japi.Function;
public class Responder extends UntypedActor{
@@ -15,7 +16,8 @@ public class Responder extends UntypedActor{
}
private CamelMessage createResponse(CamelMessage msg) {
- return msg.mapBody(new Function() {
+ return msg.mapBody(new Mapper() {
+ @Override
public String apply(String body) {
return String.format("received %s", body);
}
diff --git a/akka-docs/rst/java/code/docs/camel/Transformer.java b/akka-docs/rst/java/code/docs/camel/Transformer.java
index be077cf27d..17338f6837 100644
--- a/akka-docs/rst/java/code/docs/camel/Transformer.java
+++ b/akka-docs/rst/java/code/docs/camel/Transformer.java
@@ -2,6 +2,7 @@ package docs.camel;
//#TransformOutgoingMessage
import akka.camel.CamelMessage;
import akka.camel.javaapi.UntypedProducerActor;
+import akka.dispatch.Mapper;
import akka.japi.Function;
public class Transformer extends UntypedProducerActor{
@@ -16,7 +17,8 @@ public class Transformer extends UntypedProducerActor{
}
private CamelMessage upperCase(CamelMessage msg) {
- return msg.mapBody(new Function() {
+ return msg.mapBody(new Mapper() {
+ @Override
public String apply(String body) {
return body.toUpperCase();
}
diff --git a/akka-docs/rst/java/code/docs/camel/sample/http/HttpTransformer.java b/akka-docs/rst/java/code/docs/camel/sample/http/HttpTransformer.java
index 267d828cd7..3f35b5a2d4 100644
--- a/akka-docs/rst/java/code/docs/camel/sample/http/HttpTransformer.java
+++ b/akka-docs/rst/java/code/docs/camel/sample/http/HttpTransformer.java
@@ -3,6 +3,7 @@ package docs.camel.sample.http;
import akka.actor.Status;
import akka.actor.UntypedActor;
import akka.camel.CamelMessage;
+import akka.dispatch.Mapper;
import akka.japi.Function;
//#HttpExample
@@ -10,7 +11,8 @@ public class HttpTransformer extends UntypedActor{
public void onReceive(Object message) {
if (message instanceof CamelMessage) {
CamelMessage camelMessage = (CamelMessage) message;
- CamelMessage replacedMessage = camelMessage.mapBody(new Function