diff --git a/akka-camel/src/main/resources/reference.conf b/akka-camel/src/main/resources/reference.conf
new file mode 100644
index 0000000000..658c729fb3
--- /dev/null
+++ b/akka-camel/src/main/resources/reference.conf
@@ -0,0 +1,16 @@
+####################################
+# Akka Camel Reference Config File #
+####################################
+
+# This is the reference config file that contains all the default settings.
+# Make your edits/overrides in your application.conf.
+
+akka {
+ camel {
+ consumer {
+ autoAck = true
+ replyTimeout = 1m
+ activationTimeout = 10s
+ }
+ }
+}
diff --git a/akka-camel/src/main/scala/akka/camel/ActorRouteDefinition.scala b/akka-camel/src/main/scala/akka/camel/ActorRouteDefinition.scala
index 6286edad87..ebc99c7a92 100644
--- a/akka-camel/src/main/scala/akka/camel/ActorRouteDefinition.scala
+++ b/akka-camel/src/main/scala/akka/camel/ActorRouteDefinition.scala
@@ -4,9 +4,10 @@
package akka.camel
-import internal.component.ActorEndpointPath
+import internal.component.CamelPath
import akka.actor.ActorRef
import org.apache.camel.model.ProcessorDefinition
+import scala.concurrent.util.Duration
/**
* Wraps a [[org.apache.camel.model.ProcessorDefinition]].
@@ -23,22 +24,25 @@ import org.apache.camel.model.ProcessorDefinition
* }}}
* @param definition the processor definition
*/
-class ActorRouteDefinition(definition: ProcessorDefinition[_]) {
+class ActorRouteDefinition[T <: ProcessorDefinition[T]](definition: ProcessorDefinition[T]) {
/**
* Sends the message to an ActorRef endpoint.
- * @param actorRef the consumer with a default configuration.
+ * @param actorRef the actorRef to the actor.
* @return the path to the actor, as a camel uri String
*/
- def to(actorRef: ActorRef) = //FIXME What is the return type of this?
- definition.to(ActorEndpointPath(actorRef).toCamelPath())
+ def to(actorRef: ActorRef): T = definition.to(CamelPath.toUri(actorRef))
/**
* Sends the message to an ActorRef endpoint
* @param actorRef the consumer
- * @param consumerConfig the configuration for the consumer
+ * @param autoAck Determines whether one-way communications between an endpoint and this consumer actor
+ * should be auto-acknowledged or application-acknowledged.
+ * This flag has only effect when exchange is in-only.
+ * @param replyTimeout When endpoint is out-capable (can produce responses) replyTimeout is the maximum time
+ * the endpoint can take to send the response before the message exchange fails. It defaults to 1 minute.
+ * This setting is used for out-capable, in-only, manually acknowledged communication.
* @return the path to the actor, as a camel uri String
*/
- def to(actorRef: ActorRef, consumerConfig: ConsumerConfig) = //FIXME What is the return type of this?
- definition.to(ActorEndpointPath(actorRef).toCamelPath(consumerConfig))
+ def to(actorRef: ActorRef, autoAck: Boolean, replyTimeout: Duration): T = definition.to(CamelPath.toUri(actorRef, autoAck, replyTimeout))
}
diff --git a/akka-camel/src/main/scala/akka/camel/Camel.scala b/akka-camel/src/main/scala/akka/camel/Camel.scala
index 72252212cf..0ba241590b 100644
--- a/akka-camel/src/main/scala/akka/camel/Camel.scala
+++ b/akka-camel/src/main/scala/akka/camel/Camel.scala
@@ -7,6 +7,9 @@ package akka.camel
import internal._
import akka.actor._
import org.apache.camel.{ ProducerTemplate, CamelContext }
+import com.typesafe.config.Config
+import scala.concurrent.util.Duration
+import java.util.concurrent.TimeUnit._
/**
* Camel trait encapsulates the underlying camel machinery.
@@ -30,6 +33,33 @@ trait Camel extends ConsumerRegistry with ProducerRegistry with Extension with A
*/
def template: ProducerTemplate
+ /**
+ * The settings for the CamelExtension
+ */
+ def settings: CamelSettings
+}
+
+/**
+ * Settings for the Camel Extension
+ * @param config the config
+ */
+class CamelSettings(val config: Config) {
+ /**
+ * Configured setting for how long the actor should wait for activation before it fails.
+ */
+ final val activationTimeout: Duration = Duration(config.getMilliseconds("akka.camel.consumer.activationTimeout"), MILLISECONDS)
+ /**
+ * Configured setting, When endpoint is out-capable (can produce responses) replyTimeout is the maximum time
+ * the endpoint can take to send the response before the message exchange fails.
+ * This setting is used for out-capable, in-only, manually acknowledged communication.
+ */
+ final val replyTimeout: Duration = Duration(config.getMilliseconds("akka.camel.consumer.replyTimeout"), MILLISECONDS)
+ /**
+ * Configured setting which determines whether one-way communications between an endpoint and this consumer actor
+ * should be auto-acknowledged or application-acknowledged.
+ * This flag has only effect when exchange is in-only.
+ */
+ final val autoAck: Boolean = config.getBoolean("akka.camel.consumer.autoAck")
}
/**
diff --git a/akka-camel/src/main/scala/akka/camel/CamelMessage.scala b/akka-camel/src/main/scala/akka/camel/CamelMessage.scala
index cb4121189d..fc29e08382 100644
--- a/akka-camel/src/main/scala/akka/camel/CamelMessage.scala
+++ b/akka-camel/src/main/scala/akka/camel/CamelMessage.scala
@@ -229,7 +229,7 @@ object CamelMessage {
/**
* Positive acknowledgement message (used for application-acknowledged message receipts).
- * When `autoack` is set to false in the [[akka.camel.Consumer]], you can send an `Ack` to the sender of the CamelMessage.
+ * When `autoAck` is set to false in the [[akka.camel.Consumer]], you can send an `Ack` to the sender of the CamelMessage.
* @author Martin Krasser
*/
case object Ack {
diff --git a/akka-camel/src/main/scala/akka/camel/CamelSupport.scala b/akka-camel/src/main/scala/akka/camel/CamelSupport.scala
new file mode 100644
index 0000000000..84cd23e339
--- /dev/null
+++ b/akka-camel/src/main/scala/akka/camel/CamelSupport.scala
@@ -0,0 +1,21 @@
+package akka.camel
+
+import akka.actor.Actor
+import com.typesafe.config.Config
+import scala.concurrent.util.Duration
+import java.util.concurrent.TimeUnit._
+
+private[camel] trait CamelSupport { this: Actor ⇒
+
+ /**
+ * For internal use only. Returns a [[akka.camel.Camel]] trait which provides access to the CamelExtension.
+ */
+ protected val camel = CamelExtension(context.system)
+
+ /**
+ * Returns the CamelContext.
+ * The camelContext is defined implicit for simplifying the use of CamelMessage from the Scala API.
+ */
+ protected implicit def camelContext = camel.context
+
+}
diff --git a/akka-camel/src/main/scala/akka/camel/Consumer.scala b/akka-camel/src/main/scala/akka/camel/Consumer.scala
index 93356d100a..72daa89da0 100644
--- a/akka-camel/src/main/scala/akka/camel/Consumer.scala
+++ b/akka-camel/src/main/scala/akka/camel/Consumer.scala
@@ -8,7 +8,6 @@ import language.postfixOps
import internal.component.DurationTypeConverter
import org.apache.camel.model.{ RouteDefinition, ProcessorDefinition }
-
import akka.actor._
import scala.concurrent.util.Duration
import scala.concurrent.util.duration._
@@ -18,37 +17,45 @@ import scala.concurrent.util.duration._
*
* @author Martin Krasser
*/
-trait Consumer extends Actor with ConsumerConfig {
-
+trait Consumer extends Actor with CamelSupport with ConsumerConfig {
+ /**
+ * Must return the Camel endpoint URI that the consumer wants to consume messages from.
+ */
def endpointUri: String
- protected[this] implicit def camel = CamelExtension(context.system)
- protected[this] implicit def camelContext = camel.context
-
- camel.registerConsumer(endpointUri, this, activationTimeout)
+ /**
+ * Registers the consumer endpoint. Note: when overriding this method, be sure to
+ * call 'super.preRestart', otherwise the consumer endpoint will not be registered.
+ */
+ override def preStart() {
+ super.preStart()
+ // Possible FIXME. registering the endpoint here because of problems
+ // with order of execution of trait body in the Java version (UntypedConsumerActor)
+ // where getEndpointUri is called before its constructor (where a uri is set to return from getEndpointUri)
+ // and remains null. CustomRouteTest provides a test to verify this.
+ camel.registerConsumer(endpointUri, this, activationTimeout)
+ }
}
-trait ConsumerConfig {
-
+trait ConsumerConfig { this: CamelSupport ⇒
/**
* How long the actor should wait for activation before it fails.
*/
- def activationTimeout: Duration = 10 seconds // FIXME Should be configured in reference.conf
+ def activationTimeout: Duration = camel.settings.activationTimeout
/**
* When endpoint is out-capable (can produce responses) replyTimeout is the maximum time
* the endpoint can take to send the response before the message exchange fails. It defaults to 1 minute.
* This setting is used for out-capable, in-only, manually acknowledged communication.
- * When the blocking is set to Blocking replyTimeout is ignored.
*/
- def replyTimeout: Duration = 1 minute // FIXME Should be configured in reference.conf
+ def replyTimeout: Duration = camel.settings.replyTimeout
/**
* Determines whether one-way communications between an endpoint and this consumer actor
* should be auto-acknowledged or application-acknowledged.
* This flag has only effect when exchange is in-only.
*/
- def autoack: Boolean = true // FIXME Should be configured in reference.conf
+ def autoAck: Boolean = camel.settings.autoAck
/**
* The route definition handler for creating a custom route to this consumer instance.
@@ -56,9 +63,4 @@ trait ConsumerConfig {
//FIXME: write a test confirming onRouteDefinition gets called
def onRouteDefinition(rd: RouteDefinition): ProcessorDefinition[_] = rd
- /**
- * For internal use only. Converts this ConsumerConfig to camel URI parameters
- * @return
- */
- private[camel] def toCamelParameters: String = "autoack=%s&replyTimeout=%s" format (autoack, DurationTypeConverter.toString(replyTimeout))
}
diff --git a/akka-camel/src/main/scala/akka/camel/Producer.scala b/akka-camel/src/main/scala/akka/camel/Producer.scala
index 5a7262a133..c6e8c6714d 100644
--- a/akka-camel/src/main/scala/akka/camel/Producer.scala
+++ b/akka-camel/src/main/scala/akka/camel/Producer.scala
@@ -15,13 +15,7 @@ import org.apache.camel.processor.SendProcessor
*
* @author Martin Krasser
*/
-trait ProducerSupport { this: Actor ⇒
- protected[this] implicit def camel = CamelExtension(context.system) // FIXME This is duplicated from Consumer, create a common base-trait?
-
- /**
- * camelContext implicit is useful when using advanced methods of CamelMessage.
- */
- protected[this] implicit def camelContext = camel.context // FIXME This is duplicated from Consumer, create a common base-trait?
+trait ProducerSupport extends CamelSupport { this: Actor ⇒
protected[this] lazy val (endpoint: Endpoint, processor: SendProcessor) = camel.registerProducer(self, endpointUri)
@@ -131,10 +125,10 @@ trait ProducerSupport { this: Actor ⇒
trait Producer extends ProducerSupport { this: Actor ⇒
/**
- * Default implementation of Actor.receive. Any messages received by this actors
+ * Implementation of Actor.receive. Any messages received by this actor
* will be produced to the endpoint specified by endpointUri.
*/
- def receive: Actor.Receive = produce
+ final def receive: Actor.Receive = produce
}
/**
diff --git a/akka-camel/src/main/scala/akka/camel/internal/ConsumerRegistry.scala b/akka-camel/src/main/scala/akka/camel/internal/ConsumerRegistry.scala
index b163a54674..e172598b57 100644
--- a/akka-camel/src/main/scala/akka/camel/internal/ConsumerRegistry.scala
+++ b/akka-camel/src/main/scala/akka/camel/internal/ConsumerRegistry.scala
@@ -5,7 +5,7 @@
package akka.camel.internal
import akka.camel._
-import component.ActorEndpointPath
+import component.CamelPath
import java.io.InputStream
import org.apache.camel.builder.RouteBuilder
@@ -122,7 +122,7 @@ private[camel] case class RegisterConsumer(endpointUri: String, actorRef: ActorR
*/
private[camel] class ConsumerActorRouteBuilder(endpointUri: String, consumer: ActorRef, config: ConsumerConfig) extends RouteBuilder {
- protected def targetActorUri = ActorEndpointPath(consumer).toCamelPath(config)
+ protected def targetActorUri = CamelPath.toUri(consumer, config.autoAck, config.replyTimeout)
def configure() {
val scheme = endpointUri take endpointUri.indexOf(":") // e.g. "http" from "http://whatever/..."
diff --git a/akka-camel/src/main/scala/akka/camel/internal/DefaultCamel.scala b/akka-camel/src/main/scala/akka/camel/internal/DefaultCamel.scala
index cae98ce33c..59752526f4 100644
--- a/akka-camel/src/main/scala/akka/camel/internal/DefaultCamel.scala
+++ b/akka-camel/src/main/scala/akka/camel/internal/DefaultCamel.scala
@@ -5,9 +5,10 @@ import akka.camel.internal.component.{ DurationTypeConverter, ActorComponent }
import org.apache.camel.impl.DefaultCamelContext
import scala.Predef._
import akka.event.Logging
-import akka.camel.Camel
+import akka.camel.{ CamelSettings, Camel }
import scala.util.control.NonFatal
import scala.concurrent.util.Duration
+
import org.apache.camel.{ ProducerTemplate, CamelContext }
/**
@@ -29,11 +30,13 @@ private[camel] class DefaultCamel(val system: ActorSystem) extends Camel {
val ctx = new DefaultCamelContext
ctx.setName(system.name)
ctx.setStreamCaching(true)
- ctx.addComponent("actor", new ActorComponent(this))
+ ctx.addComponent("akka", new ActorComponent(this, system))
ctx.getTypeConverterRegistry.addTypeConverter(classOf[Duration], classOf[String], DurationTypeConverter)
ctx
}
+ val settings = new CamelSettings(system.settings.config)
+
lazy val template: ProducerTemplate = context.createProducerTemplate()
/**
diff --git a/akka-camel/src/main/scala/akka/camel/internal/component/ActorComponent.scala b/akka-camel/src/main/scala/akka/camel/internal/component/ActorComponent.scala
index 309e84b032..e12456d2cf 100644
--- a/akka-camel/src/main/scala/akka/camel/internal/component/ActorComponent.scala
+++ b/akka-camel/src/main/scala/akka/camel/internal/component/ActorComponent.scala
@@ -22,8 +22,7 @@ import scala.util.control.NonFatal
import java.util.concurrent.{ TimeoutException, CountDownLatch }
import akka.util.Timeout
import akka.camel.internal.CamelExchangeAdapter
-import akka.camel.{ ActorNotRegisteredException, ConsumerConfig, Camel, Ack, FailureResult, CamelMessage }
-
+import akka.camel.{ ActorNotRegisteredException, Camel, Ack, FailureResult, CamelMessage }
/**
* For internal use only.
* Creates Camel [[org.apache.camel.Endpoint]]s that send messages to [[akka.camel.Consumer]] actors through an [[akka.camel.internal.component.ActorProducer]].
@@ -36,12 +35,12 @@ import akka.camel.{ ActorNotRegisteredException, ConsumerConfig, Camel, Ack, Fai
*
* @author Martin Krasser
*/
-private[camel] class ActorComponent(camel: Camel) extends DefaultComponent {
+private[camel] class ActorComponent(camel: Camel, system: ActorSystem) extends DefaultComponent {
/**
* @see org.apache.camel.Component
*/
def createEndpoint(uri: String, remaining: String, parameters: JMap[String, Object]): ActorEndpoint =
- new ActorEndpoint(uri, this, ActorEndpointPath.fromCamelPath(remaining), camel)
+ new ActorEndpoint(uri, this, ActorEndpointPath.fromCamelPath(uri), camel)
}
/**
@@ -52,7 +51,7 @@ private[camel] class ActorComponent(camel: Camel) extends DefaultComponent {
* The `ActorEndpoint`s are created by the [[akka.camel.internal.component.ActorComponent]].
*
* Actors are referenced using actor endpoint URIs of the following format:
- * actor://path:[actorPath]?[options]%s,
+ * [actorPath]?[options]%s,
* where [actorPath] refers to the actor path to the actor.
*
* @author Martin Krasser
@@ -60,14 +59,14 @@ private[camel] class ActorComponent(camel: Camel) extends DefaultComponent {
private[camel] class ActorEndpoint(uri: String,
comp: ActorComponent,
val path: ActorEndpointPath,
- camel: Camel) extends DefaultEndpoint(uri, comp) with ActorEndpointConfig {
+ val camel: Camel) extends DefaultEndpoint(uri, comp) with ActorEndpointConfig {
/**
* The ActorEndpoint only supports receiving messages from Camel.
* The createProducer method (not to be confused with a producer actor) is used to send messages into the endpoint.
* The ActorComponent is only there to send to actors registered through an actor endpoint URI.
* You can use an actor as an endpoint to send to in a camel route (as in, a Camel Consumer Actor). so from(someuri) to (actoruri), but not 'the other way around'.
- * Supporting createConsumer would mean that messages are consumed from an Actor endpoint in a route, and an Actor is not necessarily a producer of messages
+ * Supporting createConsumer would mean that messages are consumed from an Actor endpoint in a route, and an Actor is not necessarily a producer of messages.
* [[akka.camel.Producer]] Actors can be used for sending messages to some other uri/ component type registered in Camel.
* @throws UnsupportedOperationException this method is not supported
*/
@@ -94,16 +93,11 @@ private[camel] class ActorEndpoint(uri: String,
*/
private[camel] trait ActorEndpointConfig {
def path: ActorEndpointPath
+ def camel: Camel
- @BeanProperty var replyTimeout: Duration = 1 minute // FIXME default should be in config, not code
+ @BeanProperty var replyTimeout: Duration = camel.settings.replyTimeout
- /**
- * Whether to auto-acknowledge one-way message exchanges with (untyped) actors. This is
- * set via the autoack=true|false endpoint URI parameter. Default value is
- * true. When set to false consumer actors need to additionally
- * call Consumer.ack within Actor.receive.
- */
- @BeanProperty var autoack: Boolean = true
+ @BeanProperty var autoAck: Boolean = camel.settings.autoAck
}
/**
@@ -154,7 +148,7 @@ private[camel] class ActorProducer(val endpoint: ActorEndpoint, camel: Camel) ex
* @return (doneSync) true to continue execute synchronously, false to continue being executed asynchronously
*/
private[camel] def processExchangeAdapter(exchange: CamelExchangeAdapter, callback: AsyncCallback): Boolean = {
- if (!exchange.isOutCapable && endpoint.autoack) {
+ if (!exchange.isOutCapable && endpoint.autoAck) {
fireAndForget(messageFor(exchange), exchange)
callback.done(true)
true // done sync
@@ -218,27 +212,56 @@ private[camel] case class ActorEndpointPath private (actorPath: String) {
import ActorEndpointPath._
require(actorPath != null)
require(actorPath.length() > 0)
- def toCamelPath(config: ConsumerConfig = consumerConfig): String = "actor://path:%s?%s" format (actorPath, config.toCamelParameters)
+ require(actorPath.startsWith("akka://"))
def findActorIn(system: ActorSystem): Option[ActorRef] = {
val ref = system.actorFor(actorPath)
if (ref.isTerminated) None else Some(ref)
}
}
+
+/**
+ * Converts ActorRefs and actorPaths to URI's that point to the actor through the Camel Actor Component.
+ * Can also be used in the Java API as a helper for custom route builders. the Scala API has an implicit conversion in the camel package to
+ * directly use `to(actorRef)`. In java you could use `to(CamelPath.toUri(actorRef)`.
+ * The URI to the actor is exactly the same as the string representation of the ActorPath, except that it can also have optional URI parameters to configure the Consumer Actor.
+ */
+object CamelPath {
+ /**
+ * Converts the actorRef to a Camel URI (string) which can be used in custom routes.
+ * The created URI will have no parameters, it is purely the string representation of the actor's path.
+ * @param actorRef the actorRef
+ * @return the Camel URI to the actor.
+ */
+ def toUri(actorRef: ActorRef): String = actorRef.path.toString
+
+ /**
+ * Converts the actorRef to a Camel URI (string) which can be used in custom routes.
+ * Use this version of toUri when you know that the actorRef points to a Consumer Actor and you would like to
+ * set autoAck and replyTimeout parameters to non-default values.
+ *
+ * @param actorRef the actorRef
+ * @param autoAck parameter for a Consumer Actor, see [[akka.camel.ConsumerConfig]]
+ * @param replyTimeout parameter for a Consumer Actor, see [[akka.camel.ConsumerConfig]]
+ * @return the Camel URI to the Consumer Actor, including the parameters for auto acknowledgement and replyTimeout.
+ */
+ def toUri(actorRef: ActorRef, autoAck: Boolean, replyTimeout: Duration): String = "%s?autoAck=%s&replyTimeout=%s".format(actorRef.path.toString, autoAck, replyTimeout.toString)
+}
+
/**
* For internal use only. Companion of `ActorEndpointPath`
*/
-private[camel] object ActorEndpointPath {
- private val consumerConfig: ConsumerConfig = new ConsumerConfig {}
+private[camel] case object ActorEndpointPath {
def apply(actorRef: ActorRef): ActorEndpointPath = new ActorEndpointPath(actorRef.path.toString)
/**
- * Creates an [[akka.camel.internal.component.ActorEndpointPath]] from the remaining part of the endpoint URI (the part after the scheme, without the parameters of the URI).
- * Expects the remaining part of the URI (the actor path) in a format: path:%s
+ * Creates an [[akka.camel.internal.component.ActorEndpointPath]] from the uri
+ * Expects the uri in the akka [[akka.actor.ActorPath]] format, i.e 'akka://system/user/someactor'.
+ * parameters can be optionally added to the actor path to indicate auto-acknowledgement and replyTimeout for a [[akka.camel.Consumer]] actor.
*/
def fromCamelPath(camelPath: String): ActorEndpointPath = camelPath match {
- case id if id startsWith "path:" ⇒ new ActorEndpointPath(id substring 5)
- case _ ⇒ throw new IllegalArgumentException("Invalid path: [%s] - should be path:" format camelPath)
+ case id if id startsWith "akka://" ⇒ new ActorEndpointPath(id.split('?')(0))
+ case _ ⇒ throw new IllegalArgumentException("Invalid path: [%s] - should be an actorPath starting with 'akka://', optionally followed by options" format camelPath)
}
-}
\ No newline at end of file
+}
diff --git a/akka-camel/src/main/scala/akka/camel/javaapi/UntypedConsumer.scala b/akka-camel/src/main/scala/akka/camel/javaapi/UntypedConsumerActor.scala
similarity index 50%
rename from akka-camel/src/main/scala/akka/camel/javaapi/UntypedConsumer.scala
rename to akka-camel/src/main/scala/akka/camel/javaapi/UntypedConsumerActor.scala
index a4671583bb..3cef3d285a 100644
--- a/akka-camel/src/main/scala/akka/camel/javaapi/UntypedConsumer.scala
+++ b/akka-camel/src/main/scala/akka/camel/javaapi/UntypedConsumerActor.scala
@@ -16,19 +16,26 @@ abstract class UntypedConsumerActor extends UntypedActor with Consumer {
final def endpointUri: String = getEndpointUri
/**
- * Returns the Camel endpoint URI to consume messages from.
+ * ''Java API'': Returns the Camel endpoint URI to consume messages from.
*/
def getEndpointUri(): String
/**
- * Returns the [[org.apache.camel.CamelContext]]
+ * ''Java API'': Returns the [[org.apache.camel.CamelContext]]
* @return the CamelContext
*/
- protected def getCamelContext: CamelContext = camelContext
+ protected def getCamelContext(): CamelContext = camelContext
/**
- * Returns the [[org.apache.camel.ProducerTemplate]]
+ * ''Java API'': Returns the [[org.apache.camel.ProducerTemplate]]
* @return the ProducerTemplate
*/
- protected def getProducerTemplate: ProducerTemplate = camel.template
+ protected def getProducerTemplate(): ProducerTemplate = camel.template
+
+ /**
+ * ''Java API'': Returns the [[akka.camel.Activation]] interface
+ * that can be used to wait on activation or de-activation of Camel endpoints.
+ * @return the Activation interface
+ */
+ protected def getActivation(): Activation = camel
}
diff --git a/akka-camel/src/main/scala/akka/camel/javaapi/UntypedProducerActor.scala b/akka-camel/src/main/scala/akka/camel/javaapi/UntypedProducerActor.scala
index f44daf0725..38c4cf276a 100644
--- a/akka-camel/src/main/scala/akka/camel/javaapi/UntypedProducerActor.scala
+++ b/akka-camel/src/main/scala/akka/camel/javaapi/UntypedProducerActor.scala
@@ -47,7 +47,7 @@ abstract class UntypedProducerActor extends UntypedActor with ProducerSupport {
/**
* Default implementation of UntypedActor.onReceive
*/
- def onReceive(message: Any): Unit = produce(message)
+ final def onReceive(message: Any): Unit = produce(message)
/**
* Returns the Camel endpoint URI to produce messages to.
@@ -70,4 +70,11 @@ abstract class UntypedProducerActor extends UntypedActor with ProducerSupport {
* Returns the ProducerTemplate.
*/
def getProducerTemplate(): ProducerTemplate = camel.template
+
+ /**
+ * ''Java API'': Returns the [[akka.camel.Activation]] interface
+ * that can be used to wait on activation or de-activation of Camel endpoints.
+ * @return the Activation interface
+ */
+ def getActivation(): Activation = camel
}
diff --git a/akka-camel/src/main/scala/akka/package.scala b/akka-camel/src/main/scala/akka/package.scala
index c97a8e5b64..e1f2c0756e 100644
--- a/akka-camel/src/main/scala/akka/package.scala
+++ b/akka-camel/src/main/scala/akka/package.scala
@@ -16,5 +16,5 @@ package object camel {
* from("file://data/input/CamelConsumer").to(actor)
* }}}
*/
- implicit def toActorRouteDefinition(definition: ProcessorDefinition[_]) = new ActorRouteDefinition(definition)
+ implicit def toActorRouteDefinition[T <: ProcessorDefinition[T]](definition: ProcessorDefinition[T]) = new ActorRouteDefinition(definition)
}
\ No newline at end of file
diff --git a/akka-camel/src/test/java/akka/camel/CustomRouteTestBase.java b/akka-camel/src/test/java/akka/camel/CustomRouteTestBase.java
new file mode 100644
index 0000000000..517557f0a7
--- /dev/null
+++ b/akka-camel/src/test/java/akka/camel/CustomRouteTestBase.java
@@ -0,0 +1,207 @@
+package akka.camel;
+
+import akka.actor.*;
+import akka.camel.internal.component.CamelPath;
+import akka.camel.javaapi.UntypedConsumerActor;
+import akka.camel.javaapi.UntypedProducerActor;
+import scala.concurrent.util.FiniteDuration;
+import org.apache.camel.CamelExecutionException;
+import org.apache.camel.Exchange;
+import org.apache.camel.Predicate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.Before;
+import org.junit.After;
+import org.junit.Test;
+
+import java.util.concurrent.TimeUnit;
+
+public class CustomRouteTestBase {
+ private static Camel camel;
+ private static ActorSystem system;
+
+ @Before
+ public void before() {
+ system = ActorSystem.create("test");
+ camel = (Camel) CamelExtension.get(system);
+ }
+
+ @After
+ public void after() {
+ system.shutdown();
+ }
+
+ @Test
+ public void testCustomProducerRoute() throws Exception {
+ MockEndpoint mockEndpoint = camel.context().getEndpoint("mock:mockProducer", MockEndpoint.class);
+ ActorRef producer = system.actorOf(new Props(MockEndpointProducer.class), "mockEndpoint");
+ camel.context().addRoutes(new CustomRouteBuilder("direct:test",producer));
+ camel.template().sendBody("direct:test", "test");
+ assertMockEndpoint(mockEndpoint);
+ system.stop(producer);
+ }
+
+ @Test
+ public void testCustomProducerUriRoute() throws Exception {
+ MockEndpoint mockEndpoint = camel.context().getEndpoint("mock:mockProducerUri", MockEndpoint.class);
+ ActorRef producer = system.actorOf(new Props(new UntypedActorFactory(){
+ public Actor create() {
+ return new EndpointProducer("mock:mockProducerUri");
+ }
+ }), "mockEndpointUri");
+ camel.context().addRoutes(new CustomRouteBuilder("direct:test",producer));
+ camel.template().sendBody("direct:test", "test");
+ assertMockEndpoint(mockEndpoint);
+ system.stop(producer);
+ }
+
+ @Test
+ public void testCustomConsumerRoute() throws Exception {
+ MockEndpoint mockEndpoint = camel.context().getEndpoint("mock:mockConsumer", MockEndpoint.class);
+ ActorRef consumer = system.actorOf(new Props(TestConsumer.class), "testConsumer");
+ camel.awaitActivation(consumer,new FiniteDuration(10, TimeUnit.SECONDS));
+ camel.context().addRoutes(new CustomRouteBuilder("direct:testRouteConsumer",consumer));
+ camel.template().sendBody("direct:testRouteConsumer", "test");
+ assertMockEndpoint(mockEndpoint);
+ system.stop(consumer);
+ }
+
+ @Test
+ public void testCustomAckConsumerRoute() throws Exception {
+ MockEndpoint mockEndpoint = camel.context().getEndpoint("mock:mockAck", MockEndpoint.class);
+ ActorRef consumer = system.actorOf(new Props( new UntypedActorFactory(){
+ public Actor create() {
+ return new TestAckConsumer("direct:testConsumerAck","mock:mockAck");
+ }
+ }), "testConsumerAck");
+ camel.awaitActivation(consumer,new FiniteDuration(10, TimeUnit.SECONDS));
+ camel.context().addRoutes(new CustomRouteBuilder("direct:testAck", consumer, false, new FiniteDuration(10, TimeUnit.SECONDS)));
+ camel.template().sendBody("direct:testAck", "test");
+ assertMockEndpoint(mockEndpoint);
+ system.stop(consumer);
+ }
+
+ @Test
+ public void testCustomAckConsumerRouteFromUri() throws Exception {
+ MockEndpoint mockEndpoint = camel.context().getEndpoint("mock:mockAckUri", MockEndpoint.class);
+ ActorRef consumer = system.actorOf(new Props( new UntypedActorFactory(){
+ public Actor create() {
+ return new TestAckConsumer("direct:testConsumerAckFromUri","mock:mockAckUri");
+ }
+ }), "testConsumerAckUri");
+ camel.awaitActivation(consumer,new FiniteDuration(10, TimeUnit.SECONDS));
+ camel.context().addRoutes(new CustomRouteBuilder("direct:testAckFromUri","akka://test/user/testConsumerAckUri?autoAck=false"));
+ camel.template().sendBody("direct:testAckFromUri", "test");
+ assertMockEndpoint(mockEndpoint);
+ system.stop(consumer);
+ }
+
+ @Test(expected=CamelExecutionException.class)
+ public void testCustomTimeoutConsumerRoute() throws Exception {
+ ActorRef consumer = system.actorOf(new Props( new UntypedActorFactory(){
+ public Actor create() {
+ return new TestAckConsumer("direct:testConsumerException","mock:mockException");
+ }
+ }), "testConsumerException");
+ camel.awaitActivation(consumer, new FiniteDuration(10, TimeUnit.SECONDS));
+ camel.context().addRoutes(new CustomRouteBuilder("direct:testException", consumer, false, new FiniteDuration(0, TimeUnit.SECONDS)));
+ camel.template().sendBody("direct:testException", "test");
+ }
+
+ private void assertMockEndpoint(MockEndpoint mockEndpoint) throws InterruptedException {
+ mockEndpoint.expectedMessageCount(1);
+ mockEndpoint.expectedMessagesMatches(new Predicate() {
+ public boolean matches(Exchange exchange) {
+ return exchange.getIn().getBody().equals("test");
+ }
+ });
+ mockEndpoint.assertIsSatisfied();
+ }
+
+ public static class CustomRouteBuilder extends RouteBuilder {
+ private String uri;
+ private String fromUri;
+
+ public CustomRouteBuilder(String from, String to) {
+ fromUri = from;
+ uri = to;
+ }
+
+ public CustomRouteBuilder(String from, ActorRef actor) {
+ fromUri = from;
+ uri = CamelPath.toUri(actor);
+ }
+
+ public CustomRouteBuilder(String from, ActorRef actor, boolean autoAck, FiniteDuration replyTimeout) {
+ fromUri = from;
+ uri = CamelPath.toUri(actor, autoAck, replyTimeout);
+ }
+
+ @Override
+ public void configure() throws Exception {
+ from(fromUri).to(uri);
+ }
+ }
+
+
+ public static class TestConsumer extends UntypedConsumerActor {
+ @Override
+ public String getEndpointUri() {
+ return "direct:testconsumer";
+ }
+
+ @Override
+ public void onReceive(Object message) {
+ this.getProducerTemplate().sendBody("mock:mockConsumer","test");
+ }
+ }
+
+ public static class EndpointProducer extends UntypedProducerActor {
+ private String uri;
+
+ public EndpointProducer(String uri) {
+ this.uri = uri;
+ }
+
+ public String getEndpointUri() {
+ return uri;
+ }
+
+ @Override
+ public boolean isOneway() {
+ return true;
+ }
+ }
+
+ public static class MockEndpointProducer extends UntypedProducerActor {
+ public String getEndpointUri() {
+ return "mock:mockProducer";
+ }
+
+ @Override
+ public boolean isOneway() {
+ return true;
+ }
+ }
+
+ public static class TestAckConsumer extends UntypedConsumerActor {
+ private String myuri;
+ private String to;
+
+ public TestAckConsumer(String uri, String to){
+ myuri = uri;
+ this.to = to;
+ }
+
+ @Override
+ public String getEndpointUri() {
+ return myuri;
+ }
+
+ @Override
+ public void onReceive(Object message) {
+ this.getProducerTemplate().sendBody(to, "test");
+ getSender().tell(Ack.getInstance());
+ }
+ }
+}
diff --git a/akka-camel/src/test/scala/akka/camel/ConsumerIntegrationTest.scala b/akka-camel/src/test/scala/akka/camel/ConsumerIntegrationTest.scala
index a016dadb1a..2d8d780264 100644
--- a/akka-camel/src/test/scala/akka/camel/ConsumerIntegrationTest.scala
+++ b/akka-camel/src/test/scala/akka/camel/ConsumerIntegrationTest.scala
@@ -90,6 +90,18 @@ class ConsumerIntegrationTest extends WordSpec with MustMatchers with NonSharedC
camel.routeCount must be(0)
}
+ "Consumer must register on uri passed in through constructor" in {
+ val consumer = start(new TestActor("direct://test"))
+ camel.awaitActivation(consumer, defaultTimeout seconds)
+
+ camel.routeCount must be > (0)
+ camel.routes.get(0).getEndpoint.getEndpointUri must be("direct://test")
+ system.stop(consumer)
+ camel.awaitDeactivation(consumer, defaultTimeout seconds)
+
+ camel.routeCount must be(0)
+ }
+
"Error passing consumer supports error handling through route modification" in {
start(new ErrorThrowingConsumer("direct:error-handler-test") with ErrorPassing {
override def onRouteDefinition(rd: RouteDefinition) = {
@@ -171,5 +183,5 @@ trait ErrorPassing {
}
trait ManualAckConsumer extends Consumer {
- override def autoack = false
+ override def autoAck = false
}
diff --git a/akka-camel/src/test/scala/akka/camel/CustomRouteTest.scala b/akka-camel/src/test/scala/akka/camel/CustomRouteTest.scala
new file mode 100644
index 0000000000..f9b54eb8e8
--- /dev/null
+++ b/akka-camel/src/test/scala/akka/camel/CustomRouteTest.scala
@@ -0,0 +1,5 @@
+package akka.camel
+
+import org.scalatest.junit.JUnitSuite
+
+class CustomRouteTest extends CustomRouteTestBase with JUnitSuite
diff --git a/akka-camel/src/test/scala/akka/camel/DefaultCamelTest.scala b/akka-camel/src/test/scala/akka/camel/DefaultCamelTest.scala
index abc50b49e8..dab91623d4 100644
--- a/akka-camel/src/test/scala/akka/camel/DefaultCamelTest.scala
+++ b/akka-camel/src/test/scala/akka/camel/DefaultCamelTest.scala
@@ -12,19 +12,26 @@ import akka.actor.ActorSystem
import org.apache.camel.{ CamelContext, ProducerTemplate }
import org.scalatest.WordSpec
import akka.event.LoggingAdapter
+import akka.actor.ActorSystem.Settings
+import com.typesafe.config.ConfigFactory
class DefaultCamelTest extends WordSpec with SharedCamelSystem with MustMatchers with MockitoSugar {
import org.mockito.Mockito.{ when, verify }
+ val sys = mock[ActorSystem]
+ val config = ConfigFactory.defaultReference()
+ when(sys.settings) thenReturn (new Settings(this.getClass.getClassLoader, config, "mocksystem"))
+ when(sys.name) thenReturn ("mocksystem")
- def camelWitMocks = new DefaultCamel(mock[ActorSystem]) {
+ def camelWithMocks = new DefaultCamel(sys) {
override val log = mock[LoggingAdapter]
override lazy val template = mock[ProducerTemplate]
override lazy val context = mock[CamelContext]
+ override val settings = mock[CamelSettings]
}
"during shutdown, when both context and template fail to shutdown" when {
- val camel = camelWitMocks
+ val camel = camelWithMocks
when(camel.context.stop()) thenThrow new RuntimeException("context")
when(camel.template.stop()) thenThrow new RuntimeException("template")
@@ -44,7 +51,7 @@ class DefaultCamelTest extends WordSpec with SharedCamelSystem with MustMatchers
}
"during start, if template fails to start, it will stop the context" in {
- val camel = camelWitMocks
+ val camel = camelWithMocks
when(camel.template.start()) thenThrow new RuntimeException
diff --git a/akka-camel/src/test/scala/akka/camel/TestSupport.scala b/akka-camel/src/test/scala/akka/camel/TestSupport.scala
index 32159891be..045f75e0ef 100644
--- a/akka-camel/src/test/scala/akka/camel/TestSupport.scala
+++ b/akka-camel/src/test/scala/akka/camel/TestSupport.scala
@@ -40,6 +40,7 @@ private[camel] object TestSupport {
}
def routeCount = camel.context.getRoutes().size()
+ def routes = camel.context.getRoutes
}
@deprecated
diff --git a/akka-camel/src/test/scala/akka/camel/internal/component/ActorComponentConfigurationTest.scala b/akka-camel/src/test/scala/akka/camel/internal/component/ActorComponentConfigurationTest.scala
index bcec86b16d..09f9c1aa62 100644
--- a/akka-camel/src/test/scala/akka/camel/internal/component/ActorComponentConfigurationTest.scala
+++ b/akka-camel/src/test/scala/akka/camel/internal/component/ActorComponentConfigurationTest.scala
@@ -14,15 +14,15 @@ import org.scalatest.WordSpec
class ActorComponentConfigurationTest extends WordSpec with MustMatchers with SharedCamelSystem {
- val component: Component = camel.context.getComponent("actor")
+ val component: Component = camel.context.getComponent("akka")
"Endpoint url config must be correctly parsed" in {
- val actorEndpointConfig = component.createEndpoint("actor://path:akka://test/user/$a?autoack=false&replyTimeout=987000000+nanos").asInstanceOf[ActorEndpointConfig]
+ val actorEndpointConfig = component.createEndpoint("akka://test/user/$a?autoAck=false&replyTimeout=987000000+nanos").asInstanceOf[ActorEndpointConfig]
actorEndpointConfig must have(
- 'endpointUri("actor://path:akka://test/user/$a?autoack=false&replyTimeout=987000000+nanos"),
- 'path(ActorEndpointPath.fromCamelPath("path:akka://test/user/$a")),
- 'autoack(false),
+ 'endpointUri("akka://test/user/$a?autoAck=false&replyTimeout=987000000+nanos"),
+ 'path(ActorEndpointPath.fromCamelPath("akka://test/user/$a")),
+ 'autoAck(false),
'replyTimeout(987000000 nanos))
}
diff --git a/akka-camel/src/test/scala/akka/camel/internal/component/ActorEndpointPathTest.scala b/akka-camel/src/test/scala/akka/camel/internal/component/ActorEndpointPathTest.scala
index eddec13b8b..ada4497f29 100644
--- a/akka-camel/src/test/scala/akka/camel/internal/component/ActorEndpointPathTest.scala
+++ b/akka-camel/src/test/scala/akka/camel/internal/component/ActorEndpointPathTest.scala
@@ -12,16 +12,21 @@ import org.scalatest.WordSpec
class ActorEndpointPathTest extends WordSpec with SharedCamelSystem with MustMatchers with MockitoSugar {
- def find(path: String) = ActorEndpointPath.fromCamelPath("path:" + path).findActorIn(system)
+ def find(path: String) = ActorEndpointPath.fromCamelPath(path).findActorIn(system)
"findActorIn returns Some(actor ref) if actor exists" in {
- val path = system.actorOf(Props(behavior = ctx ⇒ { case _ ⇒ {} })).path
+ val path = system.actorOf(Props(behavior = ctx ⇒ { case _ ⇒ {} }), "knownactor").path
find(path.toString) must be('defined)
}
"findActorIn returns None" when {
- "invalid path" in { find("some_invalid_path") must be(None) }
- "non existing valid path" in { find("akka://system/user/$a") must be(None) }
+ "non existing valid path" in { find("akka://system/user/unknownactor") must be(None) }
+ }
+ "fromCamelPath throws IllegalArgumentException" when {
+ "invalid path" in {
+ intercept[IllegalArgumentException] {
+ find("invalidpath")
+ }
+ }
}
-
}
\ No newline at end of file
diff --git a/akka-camel/src/test/scala/akka/camel/internal/component/ActorProducerTest.scala b/akka-camel/src/test/scala/akka/camel/internal/component/ActorProducerTest.scala
index 1a43d041c7..b874849fd5 100644
--- a/akka-camel/src/test/scala/akka/camel/internal/component/ActorProducerTest.scala
+++ b/akka-camel/src/test/scala/akka/camel/internal/component/ActorProducerTest.scala
@@ -7,9 +7,9 @@ package akka.camel.internal.component
import language.postfixOps
import org.scalatest.mock.MockitoSugar
-import org.mockito.Matchers.{ eq ⇒ the, any }
+import org.mockito.Matchers.any
import org.mockito.Mockito._
-import org.apache.camel.AsyncCallback
+import org.apache.camel.{ CamelContext, ProducerTemplate, AsyncCallback }
import java.util.concurrent.atomic.AtomicBoolean
import scala.concurrent.util.duration._
import scala.concurrent.util.Duration
@@ -17,13 +17,16 @@ import akka.testkit.{ TestKit, TestProbe }
import java.lang.String
import akka.actor.{ ActorRef, Props, ActorSystem, Actor }
import akka.camel._
-import internal.CamelExchangeAdapter
+import internal.{ DefaultCamel, CamelExchangeAdapter }
import org.scalatest.{ Suite, WordSpec, BeforeAndAfterAll, BeforeAndAfterEach }
import akka.camel.TestSupport._
import java.util.concurrent.{ TimeoutException, CountDownLatch, TimeUnit }
import org.mockito.{ ArgumentMatcher, Matchers, Mockito }
import org.scalatest.matchers.MustMatchers
import akka.actor.Status.Failure
+import com.typesafe.config.ConfigFactory
+import akka.actor.ActorSystem.Settings
+import akka.event.LoggingAdapter
class ActorProducerTest extends TestKit(ActorSystem("test")) with WordSpec with MustMatchers with ActorProducerFixture {
@@ -271,12 +274,26 @@ trait ActorProducerFixture extends MockitoSugar with BeforeAndAfterAll with Befo
asyncCallback = createAsyncCallback
probe = TestProbe()
- camel = mock[Camel]
+
+ val sys = mock[ActorSystem]
+ val config = ConfigFactory.defaultReference()
+ when(sys.dispatcher) thenReturn system.dispatcher
+ when(sys.settings) thenReturn (new Settings(this.getClass.getClassLoader, config, "mocksystem"))
+ when(sys.name) thenReturn ("mocksystem")
+
+ def camelWithMocks = new DefaultCamel(sys) {
+ override val log = mock[LoggingAdapter]
+ override lazy val template = mock[ProducerTemplate]
+ override lazy val context = mock[CamelContext]
+ override val settings = mock[CamelSettings]
+ }
+ camel = camelWithMocks
+
exchange = mock[CamelExchangeAdapter]
callback = mock[AsyncCallback]
actorEndpointPath = mock[ActorEndpointPath]
actorComponent = mock[ActorComponent]
- producer = new ActorProducer(config(), camel)
+ producer = new ActorProducer(configure(), camel)
message = CamelMessage(null, null)
}
@@ -288,7 +305,7 @@ trait ActorProducerFixture extends MockitoSugar with BeforeAndAfterAll with Befo
def given(actor: ActorRef = probe.ref, outCapable: Boolean = true, autoAck: Boolean = true, replyTimeout: Duration = Int.MaxValue seconds) = {
prepareMocks(actor, outCapable = outCapable)
- new ActorProducer(config(isAutoAck = autoAck, _replyTimeout = replyTimeout), camel)
+ new ActorProducer(configure(isAutoAck = autoAck, _replyTimeout = replyTimeout), camel)
}
def createAsyncCallback = new TestAsyncCallback
@@ -308,29 +325,23 @@ trait ActorProducerFixture extends MockitoSugar with BeforeAndAfterAll with Befo
callbackReceived.countDown()
}
- private[this] def valueWithin(implicit timeout: Duration) = {
+ private[this] def valueWithin(implicit timeout: Duration) =
if (!callbackReceived.await(timeout.toNanos, TimeUnit.NANOSECONDS)) fail("Callback not received!")
- callbackValue.get
- }
+ else callbackValue.get
- def expectDoneSyncWithin(implicit timeout: Duration) {
- if (!valueWithin(timeout)) fail("Expected to be done Synchronously")
- }
- def expectDoneAsyncWithin(implicit timeout: Duration) {
- if (valueWithin(timeout)) fail("Expected to be done Asynchronously")
- }
+ def expectDoneSyncWithin(implicit timeout: Duration): Unit = if (!valueWithin(timeout)) fail("Expected to be done Synchronously")
+ def expectDoneAsyncWithin(implicit timeout: Duration): Unit = if (valueWithin(timeout)) fail("Expected to be done Asynchronously")
}
- def config(endpointUri: String = "test-uri", isAutoAck: Boolean = true, _replyTimeout: Duration = Int.MaxValue seconds) = {
+ def configure(endpointUri: String = "test-uri", isAutoAck: Boolean = true, _replyTimeout: Duration = Int.MaxValue seconds) = {
val endpoint = new ActorEndpoint(endpointUri, actorComponent, actorEndpointPath, camel)
- endpoint.autoack = isAutoAck
+ endpoint.autoAck = isAutoAck
endpoint.replyTimeout = _replyTimeout
endpoint
}
def prepareMocks(actor: ActorRef, message: CamelMessage = message, outCapable: Boolean) {
- when(camel.system) thenReturn system
when(actorEndpointPath.findActorIn(any[ActorSystem])) thenReturn Option(actor)
when(exchange.toRequestMessage(any[Map[String, Any]])) thenReturn message
when(exchange.isOutCapable) thenReturn outCapable
diff --git a/akka-docs/java/camel.rst b/akka-docs/java/camel.rst
new file mode 100644
index 0000000000..429454f25d
--- /dev/null
+++ b/akka-docs/java/camel.rst
@@ -0,0 +1,582 @@
+
+.. _camel-java:
+
+#############
+ Camel (Java)
+#############
+
+Additional Resources
+====================
+For an introduction to akka-camel 2, see also the Peter Gabryanczyk's talk `Migrating akka-camel module to Akka 2.x`_.
+
+For an introduction to akka-camel 1, see also the `Appendix E - Akka and Camel`_
+(pdf) of the book `Camel in Action`_.
+
+.. _Appendix E - Akka and Camel: http://www.manning.com/ibsen/appEsample.pdf
+.. _Camel in Action: http://www.manning.com/ibsen/
+.. _Migrating akka-camel module to Akka 2.x: http://skillsmatter.com/podcast/scala/akka-2-x
+
+Other, more advanced external articles (for version 1) are:
+
+* `Akka Consumer Actors: New Features and Best Practices `_
+* `Akka Producer Actors: New Features and Best Practices `_
+
+Introduction
+============
+
+The akka-camel module allows Untyped Actors to receive
+and send messages over a great variety of protocols and APIs.
+In addition to the native Scala and Java actor API, actors can now exchange messages with other systems over large number
+of protocols and APIs such as HTTP, SOAP, TCP, FTP, SMTP or JMS, to mention a
+few. At the moment, approximately 80 protocols and APIs are supported.
+
+Apache Camel
+------------
+The akka-camel module is based on `Apache Camel`_, a powerful and light-weight
+integration framework for the JVM. For an introduction to Apache Camel you may
+want to read this `Apache Camel article`_. Camel comes with a
+large number of `components`_ that provide bindings to different protocols and
+APIs. The `camel-extra`_ project provides further components.
+
+.. _Apache Camel: http://camel.apache.org/
+.. _Apache Camel article: http://architects.dzone.com/articles/apache-camel-integration
+.. _components: http://camel.apache.org/components.html
+.. _camel-extra: http://code.google.com/p/camel-extra/
+
+Consumer
+--------
+Here's an example of using Camel's integration components in Akka.
+
+.. includecode:: code/docs/camel/MyEndpoint.java#Consumer-mina
+
+The above example exposes an actor over a TCP endpoint via Apache
+Camel's `Mina component`_. The actor implements the `getEndpointUri` method to define
+an endpoint from which it can receive messages. After starting the actor, TCP
+clients can immediately send messages to and receive responses from that
+actor. If the message exchange should go over HTTP (via Camel's `Jetty
+component`_), the actor's `getEndpointUri` method should return a different URI, for instance "jetty:http://localhost:8877/example".
+In the above case an extra constructor is added that can set the endpoint URI, which would result in
+the `getEndpointUri` returning the URI that was set using this constructor.
+
+.. _Mina component: http://camel.apache.org/mina.html
+.. _Jetty component: http://camel.apache.org/jetty.html
+
+Producer
+--------
+Actors can also trigger message exchanges with external systems i.e. produce to
+Camel endpoints.
+
+.. includecode:: code/docs/camel/Orders.java#Producer
+
+In the above example, any message sent to this actor will be sent to
+the JMS queue ``Orders``. Producer actors may choose from the same set of Camel
+components as Consumer actors do.
+Below an example of how to send a message to the Orders producer.
+
+.. includecode:: code/docs/camel/ProducerTestBase.java#TellProducer
+
+CamelMessage
+------------
+The number of Camel components is constantly increasing. The akka-camel module
+can support these in a plug-and-play manner. Just add them to your application's
+classpath, define a component-specific endpoint URI and use it to exchange
+messages over the component-specific protocols or APIs. This is possible because
+Camel components bind protocol-specific message formats to a Camel-specific
+`normalized message format`__. The normalized message format hides
+protocol-specific details from Akka and makes it therefore very easy to support
+a large number of protocols through a uniform Camel component interface. The
+akka-camel module further converts mutable Camel messages into immutable
+representations which are used by Consumer and Producer actors for pattern
+matching, transformation, serialization or storage. In the above example of the Orders Producer,
+the XML message is put in the body of a newly created Camel Message with an empty set of headers.
+You can also create a CamelMessage yourself with the appropriate body and headers as you see fit.
+
+__ https://svn.apache.org/repos/asf/camel/tags/camel-2.8.0/camel-core/src/main/java/org/apache/camel/Message.java
+
+CamelExtension
+--------------
+The akka-camel module is implemented as an Akka Extension, the ``CamelExtension`` object.
+Extensions will only be loaded once per ``ActorSystem``, which will be managed by Akka.
+The ``CamelExtension`` object provides access to the `Camel`_ interface.
+The `Camel`_ interface in turn provides access to two important Apache Camel objects, the `CamelContext`_ and the `ProducerTemplate`_.
+Below you can see how you can get access to these Apache Camel objects.
+
+.. includecode:: code/docs/camel/CamelExtensionTestBase.java#CamelExtension
+
+One ``CamelExtension`` is only loaded once for every one ``ActorSystem``, which makes it safe to call the ``CamelExtension`` at any point in your code to get to the
+Apache Camel objects associated with it. There is one `CamelContext`_ and one `ProducerTemplate`_ for every one ``ActorSystem`` that uses a ``CamelExtension``.
+Below an example on how to add the ActiveMQ component to the `CamelContext`_, which is required when you would like to use the ActiveMQ component.
+
+.. includecode:: code/docs/camel/CamelExtensionTestBase.java#CamelExtensionAddComponent
+
+The `CamelContext`_ joins the lifecycle of the ``ActorSystem`` and ``CamelExtension`` it is associated with; the `CamelContext`_ is started when
+the ``CamelExtension`` is created, and it is shut down when the associated ``ActorSystem`` is shut down. The same is true for the `ProducerTemplate`_.
+
+The ``CamelExtension`` is used by both `Producer` and `Consumer` actors to interact with Apache Camel internally.
+You can access the ``CamelExtension`` inside a `Producer` or a `Consumer` using the ``camel`` method, or get straight at the `CamelContext`
+using the ``getCamelContext`` method or to the `ProducerTemplate` using the `getProducerTemplate` method.
+Actors are created and started asynchronously. When a `Consumer` actor is created, the `Consumer` is published at its Camel endpoint
+(more precisely, the route is added to the `CamelContext`_ from the `Endpoint`_ to the actor).
+When a `Producer` actor is created, a `SendProcessor`_ and `Endpoint`_ are created so that the Producer can send messages to it.
+Publication is done asynchronously; setting up an endpoint may still be in progress after you have
+requested the actor to be created. Some Camel components can take a while to startup, and in some cases you might want to know when the endpoints are activated and ready to be used.
+The `Camel`_ interface allows you to find out when the endpoint is activated or deactivated.
+
+.. includecode:: code/docs/camel/ActivationTestBase.java#CamelActivation
+
+The above code shows that you can get a ``Future`` to the activation of the route from the endpoint to the actor, or you can wait in a blocking fashion on the activation of the route.
+An ``ActivationTimeoutException`` is thrown if the endpoint could not be activated within the specified timeout. Deactivation works in a similar fashion:
+
+.. includecode:: code/docs/camel/ActivationTestBase.java#CamelDeactivation
+
+Deactivation of a Consumer or a Producer actor happens when the actor is terminated. For a Consumer, the route to the actor is stopped. For a Producer, the `SendProcessor`_ is stopped.
+A ``DeActivationTimeoutException`` is thrown if the associated camel objects could not be deactivated within the specified timeout.
+
+.. _Camel: http://github.com/akka/akka/blob/master/akka-camel/src/main/scala/akka/camel/Camel.scala
+.. _CamelContext: https://svn.apache.org/repos/asf/camel/tags/camel-2.8.0/camel-core/src/main/java/org/apache/camel/CamelContext.java
+.. _ProducerTemplate: https://svn.apache.org/repos/asf/camel/tags/camel-2.8.0/camel-core/src/main/java/org/apache/camel/ProducerTemplate.java
+.. _SendProcessor: https://svn.apache.org/repos/asf/camel/tags/camel-2.8.0/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java
+.. _Endpoint: https://svn.apache.org/repos/asf/camel/tags/camel-2.8.0/camel-core/src/main/java/org/apache/camel/Endpoint.java
+
+Consumer Actors
+================
+
+For objects to receive messages, they must inherit from the `UntypedConsumerActor`_
+class. For example, the following actor class (Consumer1) implements the
+`getEndpointUri` method, which is declared in the `UntypedConsumerActor`_ class, in order to receive
+messages from the ``file:data/input/actor`` Camel endpoint.
+
+.. _UntypedConsumerActor: http://github.com/akka/akka/blob/master/akka-camel/src/main/scala/akka/camel/javaapi/UntypedConsumer.scala
+
+.. includecode:: code/docs/camel/Consumer1.java#Consumer1
+
+Whenever a file is put into the data/input/actor directory, its content is
+picked up by the Camel `file component`_ and sent as message to the
+actor. Messages consumed by actors from Camel endpoints are of type
+`CamelMessage`_. These are immutable representations of Camel messages.
+
+.. _file component: http://camel.apache.org/file2.html
+.. _Message: http://github.com/akka/akka/blob/master/akka-camel/src/main/scala/akka/camel/CamelMessage.scala
+
+
+Here's another example that sets the endpointUri to
+``jetty:http://localhost:8877/camel/default``. It causes Camel's `Jetty
+component`_ to start an embedded `Jetty`_ server, accepting HTTP connections
+from localhost on port 8877.
+
+.. _Jetty component: http://camel.apache.org/jetty.html
+.. _Jetty: http://www.eclipse.org/jetty/
+
+.. includecode:: code/docs/camel/Consumer2.java#Consumer2
+
+After starting the actor, clients can send messages to that actor by POSTing to
+``http://localhost:8877/camel/default``. The actor sends a response by using the
+getSender().tell method. For returning a message body and headers to the HTTP
+client the response type should be `CamelMessage`_. For any other response type, a
+new CamelMessage object is created by akka-camel with the actor response as message
+body.
+
+.. _Message: http://github.com/akka/akka/blob/master/akka-camel/src/main/scala/akka/camel/CamelMessage.scala
+
+.. _camel-acknowledgements-java:
+
+Delivery acknowledgements
+-------------------------
+
+With in-out message exchanges, clients usually know that a message exchange is
+done when they receive a reply from a consumer actor. The reply message can be a
+CamelMessage (or any object which is then internally converted to a CamelMessage) on
+success, and a Failure message on failure.
+
+With in-only message exchanges, by default, an exchange is done when a message
+is added to the consumer actor's mailbox. Any failure or exception that occurs
+during processing of that message by the consumer actor cannot be reported back
+to the endpoint in this case. To allow consumer actors to positively or
+negatively acknowledge the receipt of a message from an in-only message
+exchange, they need to override the ``autoAck`` method to return false.
+In this case, consumer actors must reply either with a
+special akka.camel.Ack message (positive acknowledgement) or a akka.actor.Status.Failure (negative
+acknowledgement).
+
+.. includecode:: code/docs/camel/Consumer3.java#Consumer3
+
+.. _camel-timeout-java:
+
+Consumer timeout
+----------------
+
+Camel Exchanges (and their corresponding endpoints) that support two-way communications need to wait for a response from
+an actor before returning it to the initiating client.
+For some endpoint types, timeout values can be defined in an endpoint-specific
+way which is described in the documentation of the individual `Camel
+components`_. Another option is to configure timeouts on the level of consumer actors.
+
+.. _Camel components: http://camel.apache.org/components.html
+
+Two-way communications between a Camel endpoint and an actor are
+initiated by sending the request message to the actor with the `ask`_ pattern
+and the actor replies to the endpoint when the response is ready. The ask request to the actor can timeout, which will
+result in the `Exchange`_ failing with a TimeoutException set on the failure of the `Exchange`_.
+The timeout on the consumer actor can be overridden with the ``replyTimeout``, as shown below.
+
+.. includecode:: code/docs/camel/Consumer4.java#Consumer4
+.. _Exchange: https://svn.apache.org/repos/asf/camel/tags/camel-2.8.0/camel-core/src/main/java/org/apache/camel/Exchange.java
+.. _ask: http://github.com/akka/akka/blob/master/akka-actor/src/main/scala/akka/pattern/Patterns.scala
+
+Producer Actors
+===============
+
+For sending messages to Camel endpoints, actors need to inherit from the `UntypedProducerActor`_ class and implement the getEndpointUri method.
+
+.. includecode:: code/docs/camel/Producer1.java#Producer1
+
+Producer1 inherits a default implementation of the onReceive method from the
+`UntypedProducerActor`_ class. To customize a producer actor's default behavior you must override the `UntypedProducerActor`_.onTransformResponse and
+`UntypedProducerActor`_.onTransformOutgoingMessage methods. This is explained later in more detail.
+Producer Actors cannot override the `UntypedProducerActor`_.onReceive method.
+
+Any message sent to a Producer actor will be sent to
+the associated Camel endpoint, in the above example to
+``http://localhost:8080/news``. The `UntypedProducerActor`_ always sends messages asynchronously. Response messages (if supported by the
+configured endpoint) will, by default, be returned to the original sender. The
+following example uses the ask pattern to send a message to a
+Producer actor and waits for a response.
+
+.. includecode:: code/docs/camel/ProducerTestBase.java#AskProducer
+
+The future contains the response CamelMessage, or an ``AkkaCamelException`` when an error occurred, which contains the headers of the response.
+
+.. _camel-custom-processing-java:
+
+Custom Processing
+-----------------
+
+Instead of replying to the initial sender, producer actors can implement custom
+response processing by overriding the onRouteResponse method. In the following example, the response
+message is forwarded to a target actor instead of being replied to the original
+sender.
+
+.. includecode:: code/docs/camel/ResponseReceiver.java#RouteResponse
+.. includecode:: code/docs/camel/Forwarder.java#RouteResponse
+.. includecode:: code/docs/camel/OnRouteResponseTestBase.java#RouteResponse
+
+Before producing messages to endpoints, producer actors can pre-process them by
+overriding the `UntypedProducerActor`_.onTransformOutgoingMessage method.
+
+.. includecode:: code/docs/camel/Transformer.java#TransformOutgoingMessage
+
+Producer configuration options
+------------------------------
+
+The interaction of producer actors with Camel endpoints can be configured to be
+one-way or two-way (by initiating in-only or in-out message exchanges,
+respectively). By default, the producer initiates an in-out message exchange
+with the endpoint. For initiating an in-only exchange, producer actors have to override the isOneway method to return true.
+
+.. includecode:: code/docs/camel/OnewaySender.java#Oneway
+
+Message correlation
+-------------------
+
+To correlate request with response messages, applications can set the
+`Message.MessageExchangeId` message header.
+
+.. includecode:: code/docs/camel/ProducerTestBase.java#Correlate
+
+ProducerTemplate
+----------------
+
+The `UntypedProducerActor`_ class is a very convenient way for actors to produce messages to Camel endpoints.
+Actors may also use a Camel `ProducerTemplate`_ for producing messages to endpoints.
+
+.. includecode:: code/docs/camel/MyActor.java#ProducerTemplate
+
+For initiating a a two-way message exchange, one of the
+``ProducerTemplate.request*`` methods must be used.
+
+.. includecode:: code/docs/camel/RequestBodyActor.java#RequestProducerTemplate
+
+.. _UntypedProducerActor: http://github.com/akka/akka/blob/master/akka-camel/src/main/scala/akka/camel/javaapi/UntypedProducerActor.scala
+.. _ProducerTemplate: https://svn.apache.org/repos/asf/camel/tags/camel-2.8.0/camel-core/src/main/java/org/apache/camel/ProducerTemplate.java
+
+.. _camel-asynchronous-routing-java:
+
+Asynchronous routing
+====================
+
+In-out message exchanges between endpoints and actors are
+designed to be asynchronous. This is the case for both, consumer and producer
+actors.
+
+* A consumer endpoint sends request messages to its consumer actor using the ``!``
+ (tell) operator and the actor returns responses with ``sender !`` once they are
+ ready.
+
+* A producer actor sends request messages to its endpoint using Camel's
+ asynchronous routing engine. Asynchronous responses are wrapped and added to the
+ producer actor's mailbox for later processing. By default, response messages are
+ returned to the initial sender but this can be overridden by Producer
+ implementations (see also description of the ``onRouteResponse`` method
+ in :ref:`camel-custom-processing-java`).
+
+However, asynchronous two-way message exchanges, without allocating a thread for
+the full duration of exchange, cannot be generically supported by Camel's
+asynchronous routing engine alone. This must be supported by the individual
+`Camel components`_ (from which endpoints are created) as well. They must be
+able to suspend any work started for request processing (thereby freeing threads
+to do other work) and resume processing when the response is ready. This is
+currently the case for a `subset of components`_ such as the `Jetty component`_.
+All other Camel components can still be used, of course, but they will cause
+allocation of a thread for the duration of an in-out message exchange. There's
+also a :ref:`camel-async-example-java` that implements both, an asynchronous
+consumer and an asynchronous producer, with the jetty component.
+
+.. _Camel components: http://camel.apache.org/components.html
+.. _subset of components: http://camel.apache.org/asynchronous-routing-engine.html
+.. _Jetty component: http://camel.apache.org/jetty.html
+
+Custom Camel routes
+===================
+
+In all the examples so far, routes to consumer actors have been automatically
+constructed by akka-camel, when the actor was started. Although the default
+route construction templates, used by akka-camel internally, are sufficient for
+most use cases, some applications may require more specialized routes to actors.
+The akka-camel module provides two mechanisms for customizing routes to actors,
+which will be explained in this section. These are:
+
+* Usage of :ref:`camel-components-java` to access actors.
+ Any Camel route can use these components to access Akka actors.
+
+* :ref:`camel-intercepting-route-construction-java` to actors.
+ This option gives you the ability to change routes that have already been added to Camel.
+ Consumer actors have a hook into the route definition process which can be used to change the route.
+
+
+.. _camel-components-java:
+
+Akka Camel components
+---------------------
+
+Akka actors can be accessed from Camel routes using the `actor`_ Camel component. This component can be used to
+access any Akka actor (not only consumer actors) from Camel routes, as described in the following sections.
+
+.. _actor: http://github.com/akka/akka/blob/master/akka-camel/src/main/scala/akka/camel/internal/component/ActorComponent.scala
+
+.. _access-to-actors-java:
+
+Access to actors
+----------------
+
+To access actors from custom Camel routes, the `actor`_ Camel
+component should be used. It fully supports Camel's `asynchronous routing
+engine`_.
+
+.. _actor: http://github.com/akka/akka/blob/master/akka-camel/src/main/scala/akka/camel/internal/component/ActorComponent.scala
+.. _asynchronous routing engine: http://camel.apache.org/asynchronous-routing-engine.html
+
+This component accepts the following endpoint URI format:
+
+* ``[]?``
+
+where ```` is the ``ActorPath`` to the actor. The ```` are
+name-value pairs separated by ``&`` (i.e. ``name1=value1&name2=value2&...``).
+
+
+URI options
+^^^^^^^^^^^
+
+The following URI options are supported:
+
++--------------+----------+---------+------------------------------------------------+
+| Name | Type | Default | Description |
++==============+==========+=========+================================================+
+| replyTimeout | Duration | false | The reply timeout, specified in the same |
+| | | | way that you use the duration in akka, |
+| | | | for instance ``10 seconds`` except that |
+| | | | in the url it is handy to use a + |
+| | | | between the amount and the unit, like |
+| | | | for example ``200+millis`` |
+| | | | |
+| | | | See also :ref:`camel-timeout-java`. |
++--------------+----------+---------+------------------------------------------------+
+| autoAck | Boolean | true | If set to true, in-only message exchanges |
+| | | | are auto-acknowledged when the message is |
+| | | | added to the actor's mailbox. If set to |
+| | | | false, actors must acknowledge the |
+| | | | receipt of the message. |
+| | | | |
+| | | | See also :ref:`camel-acknowledgements-java`. |
++--------------+----------+---------+------------------------------------------------+
+
+Here's an actor endpoint URI example containing an actor uuid::
+
+ akka://some-system/user/myconsumer?autoAck=false&replyTimeout=100+millis
+
+In the following example, a custom route to an actor is created, using the
+actor's path.
+
+.. includecode:: code/docs/camel/Responder.java#CustomRoute
+.. includecode:: code/docs/camel/CustomRouteBuilder.java#CustomRoute
+.. includecode:: code/docs/camel/CustomRouteTestBase.java#CustomRoute
+
+The `CamelPath.toCamelUri` converts the `ActorRef` to the Camel actor component URI format which points to the actor endpoint as described above.
+When a message is received on the jetty endpoint, it is routed to the Responder actor, which in return replies back to the client of
+the HTTP request.
+
+
+.. _camel-intercepting-route-construction-java:
+
+Intercepting route construction
+-------------------------------
+
+The previous section, :ref:`camel-components-java`, explained how to setup a route to
+an actor manually.
+It was the application's responsibility to define the route and add it to the current CamelContext.
+This section explains a more convenient way to define custom routes: akka-camel is still setting up the routes to consumer actors
+(and adds these routes to the current CamelContext) but applications can define extensions to these routes.
+Extensions can be defined with Camel's `Java DSL`_ or `Scala DSL`_. For example, an extension could be a custom error handler that redelivers messages from an endpoint to an actor's bounded mailbox when the mailbox was full.
+
+.. _Java DSL: http://camel.apache.org/dsl.html
+.. _Scala DSL: http://camel.apache.org/scala-dsl.html
+
+The following examples demonstrate how to extend a route to a consumer actor for
+handling exceptions thrown by that actor.
+
+.. includecode:: code/docs/camel/ErrorThrowingConsumer.java#ErrorThrowingConsumer
+
+The above ErrorThrowingConsumer sends the Failure back to the sender in preRestart
+because the Exception that is thrown in the actor would
+otherwise just crash the actor, by default the actor would be restarted, and the response would never reach the client of the Consumer.
+
+The akka-camel module creates a RouteDefinition instance by calling
+from(endpointUri) on a Camel RouteBuilder (where endpointUri is the endpoint URI
+of the consumer actor) and passes that instance as argument to the route
+definition handler \*). The route definition handler then extends the route and
+returns a ProcessorDefinition (in the above example, the ProcessorDefinition
+returned by the end method. See the `org.apache.camel.model`__ package for
+details). After executing the route definition handler, akka-camel finally calls
+a to(targetActorUri) on the returned ProcessorDefinition to complete the
+route to the consumer actor (where targetActorUri is the actor component URI as described in :ref:`access-to-actors-java`).
+If the actor cannot be found, a `ActorNotRegisteredException` is thrown.
+
+\*) Before passing the RouteDefinition instance to the route definition handler,
+akka-camel may make some further modifications to it.
+
+__ https://svn.apache.org/repos/asf/camel/tags/camel-2.8.0/camel-core/src/main/java/org/apache/camel/model/
+
+.. _camel-examples-java:
+
+Examples
+========
+
+.. _camel-async-example-java:
+
+Asynchronous routing and transformation example
+-----------------------------------------------
+
+This example demonstrates how to implement consumer and producer actors that
+support :ref:`camel-asynchronous-routing-java` with their Camel endpoints. The sample
+application transforms the content of the Akka homepage, http://akka.io, by
+replacing every occurrence of *Akka* with *AKKA*. To run this example, add
+a Boot class that starts the actors. After starting
+the :ref:`microkernel-java`, direct the browser to http://localhost:8875 and the
+transformed Akka homepage should be displayed. Please note that this example
+will probably not work if you're behind an HTTP proxy.
+
+The following figure gives an overview how the example actors interact with
+external systems and with each other. A browser sends a GET request to
+http://localhost:8875 which is the published endpoint of the ``HttpConsumer``
+actor. The ``HttpConsumer`` actor forwards the requests to the ``HttpProducer``
+actor which retrieves the Akka homepage from http://akka.io. The retrieved HTML
+is then forwarded to the ``HttpTransformer`` actor which replaces all occurrences
+of *Akka* with *AKKA*. The transformation result is sent back the HttpConsumer
+which finally returns it to the browser.
+
+.. image:: ../modules/camel-async-interact.png
+
+Implementing the example actor classes and wiring them together is rather easy
+as shown in the following snippet.
+
+.. includecode:: code/docs/camel/sample/http/HttpConsumer.java#HttpExample
+.. includecode:: code/docs/camel/sample/http/HttpProducer.java#HttpExample
+.. includecode:: code/docs/camel/sample/http/HttpTransformer.java#HttpExample
+.. includecode:: code/docs/camel/sample/http/HttpSample.java#HttpExample
+
+The `jetty endpoints`_ of HttpConsumer and HttpProducer support asynchronous
+in-out message exchanges and do not allocate threads for the full duration of
+the exchange. This is achieved by using `Jetty continuations`_ on the
+consumer-side and by using `Jetty's asynchronous HTTP client`_ on the producer
+side. The following high-level sequence diagram illustrates that.
+
+.. _jetty endpoints: http://camel.apache.org/jetty.html
+.. _Jetty continuations: http://wiki.eclipse.org/Jetty/Feature/Continuations
+.. _Jetty's asynchronous HTTP client: http://wiki.eclipse.org/Jetty/Tutorial/HttpClient
+
+.. image:: ../modules/camel-async-sequence.png
+
+Custom Camel route example
+--------------------------
+
+This section also demonstrates the combined usage of a ``Producer`` and a
+``Consumer`` actor as well as the inclusion of a custom Camel route. The
+following figure gives an overview.
+
+.. image:: ../modules/camel-custom-route.png
+
+* A consumer actor receives a message from an HTTP client
+
+* It forwards the message to another actor that transforms the message (encloses
+ the original message into hyphens)
+
+* The transformer actor forwards the transformed message to a producer actor
+
+* The producer actor sends the message to a custom Camel route beginning at the
+ ``direct:welcome`` endpoint
+
+* A processor (transformer) in the custom Camel route prepends "Welcome" to the
+ original message and creates a result message
+
+* The producer actor sends the result back to the consumer actor which returns
+ it to the HTTP client
+
+
+The consumer, transformer and
+producer actor implementations are as follows.
+
+.. includecode:: code/docs/camel/sample/route/Consumer3.java#CustomRouteExample
+.. includecode:: code/docs/camel/sample/route/Transformer.java#CustomRouteExample
+.. includecode:: code/docs/camel/sample/route/Producer1.java#CustomRouteExample
+.. includecode:: code/docs/camel/sample/route/CustomRouteSample.java#CustomRouteExample
+
+The producer actor knows where to reply the message to because the consumer and
+transformer actors have forwarded the original sender reference as well. The
+application configuration and the route starting from direct:welcome are done in the code above.
+
+To run the example, add the lines shown in the example to a Boot class and the start the :ref:`microkernel-java` and POST a message to
+``http://localhost:8877/camel/welcome``.
+
+.. code-block:: none
+
+ curl -H "Content-Type: text/plain" -d "Anke" http://localhost:8877/camel/welcome
+
+The response should be:
+
+.. code-block:: none
+
+ Welcome - Anke -
+
+Quartz Scheduler Example
+------------------------
+
+Here is an example showing how simple is to implement a cron-style scheduler by
+using the Camel Quartz component in Akka.
+
+The following example creates a "timer" actor which fires a message every 2
+seconds:
+
+.. includecode:: code/docs/camel/sample/quartz/MyQuartzActor.java#QuartzExample
+.. includecode:: code/docs/camel/sample/quartz/QuartzSample.java#QuartzExample
+
+For more information about the Camel Quartz component, see here:
+http://camel.apache.org/quartz.html
diff --git a/akka-docs/java/code/docs/camel/ActivationTestBase.java b/akka-docs/java/code/docs/camel/ActivationTestBase.java
new file mode 100644
index 0000000000..96e6ff6af0
--- /dev/null
+++ b/akka-docs/java/code/docs/camel/ActivationTestBase.java
@@ -0,0 +1,55 @@
+package docs.camel;
+//#CamelActivation
+ import akka.actor.ActorRef;
+ import akka.actor.ActorSystem;
+ import akka.actor.Props;
+ import akka.camel.Camel;
+ import akka.camel.CamelExtension;
+ import akka.camel.javaapi.UntypedConsumerActor;
+ import scala.concurrent.Future;
+ import scala.concurrent.util.Duration;
+ import scala.concurrent.util.FiniteDuration;
+ import static java.util.concurrent.TimeUnit.SECONDS;
+//#CamelActivation
+
+import org.junit.Test;
+
+import java.util.concurrent.TimeUnit;
+
+public class ActivationTestBase {
+
+ @Test
+ public void testActivation() {
+ //#CamelActivation
+
+ // ..
+ ActorSystem system = ActorSystem.create("some-system");
+ Props props = new Props(MyConsumer.class);
+ ActorRef producer = system.actorOf(props,"myproducer");
+ Camel camel = CamelExtension.get(system);
+ // get a future reference to the activation of the endpoint of the Consumer Actor
+ FiniteDuration duration = Duration.create(10, SECONDS);
+ Future activationFuture = camel.activationFutureFor(producer, duration);
+ // or, block wait on the activation
+ camel.awaitActivation(producer, duration);
+ //#CamelActivation
+ //#CamelDeactivation
+ // ..
+ system.stop(producer);
+ // get a future reference to the deactivation of the endpoint of the Consumer Actor
+ Future deactivationFuture = camel.activationFutureFor(producer, duration);
+ // or, block wait on the deactivation
+ camel.awaitDeactivation(producer, duration);
+ //#CamelDeactivation
+ system.shutdown();
+ }
+
+ public static class MyConsumer extends UntypedConsumerActor {
+ public String getEndpointUri() {
+ return "direct:test";
+ }
+
+ public void onReceive(Object message) {
+ }
+ }
+}
diff --git a/akka-docs/java/code/docs/camel/CamelExtensionDocTest.scala b/akka-docs/java/code/docs/camel/CamelExtensionDocTest.scala
new file mode 100644
index 0000000000..4408d247e7
--- /dev/null
+++ b/akka-docs/java/code/docs/camel/CamelExtensionDocTest.scala
@@ -0,0 +1,5 @@
+package docs.camel
+
+import org.scalatest.junit.JUnitSuite
+
+class CamelExtensionDocTest extends CamelExtensionTestBase with JUnitSuite
diff --git a/akka-docs/java/code/docs/camel/CamelExtensionTestBase.java b/akka-docs/java/code/docs/camel/CamelExtensionTestBase.java
new file mode 100644
index 0000000000..0a7fc877b2
--- /dev/null
+++ b/akka-docs/java/code/docs/camel/CamelExtensionTestBase.java
@@ -0,0 +1,31 @@
+package docs.camel;
+
+import akka.actor.ActorSystem;
+import akka.camel.Camel;
+import akka.camel.CamelExtension;
+import org.apache.camel.CamelContext;
+import org.apache.camel.ProducerTemplate;
+import org.junit.Test;
+
+public class CamelExtensionTestBase {
+ @Test
+ public void getCamelExtension() {
+ //#CamelExtension
+ ActorSystem system = ActorSystem.create("some-system");
+ Camel camel = CamelExtension.get(system);
+ CamelContext camelContext = camel.context();
+ ProducerTemplate producerTemplate = camel.template();
+ //#CamelExtension
+ system.shutdown();
+ }
+ public void addActiveMQComponent() {
+ //#CamelExtensionAddComponent
+ ActorSystem system = ActorSystem.create("some-system");
+ Camel camel = CamelExtension.get(system);
+ CamelContext camelContext = camel.context();
+ // camelContext.addComponent("activemq", ActiveMQComponent.activeMQComponent("vm://localhost?broker.persistent=false"))
+ //#CamelExtensionAddComponent
+ system.shutdown();
+ }
+
+}
diff --git a/akka-docs/java/code/docs/camel/Consumer1.java b/akka-docs/java/code/docs/camel/Consumer1.java
new file mode 100644
index 0000000000..df21ee3fcc
--- /dev/null
+++ b/akka-docs/java/code/docs/camel/Consumer1.java
@@ -0,0 +1,24 @@
+package docs.camel;
+//#Consumer1
+import akka.camel.CamelMessage;
+import akka.camel.javaapi.UntypedConsumerActor;
+import akka.event.Logging;
+import akka.event.LoggingAdapter;
+
+public class Consumer1 extends UntypedConsumerActor {
+ LoggingAdapter log = Logging.getLogger(getContext().system(), this);
+
+ public String getEndpointUri() {
+ return "file:data/input/actor";
+ }
+
+ public void onReceive(Object message) {
+ if (message instanceof CamelMessage) {
+ CamelMessage camelMessage = (CamelMessage) message;
+ String body = camelMessage.getBodyAs(String.class, getCamelContext());
+ log.info("Received message: {}", body);
+ } else
+ unhandled(message);
+ }
+}
+//#Consumer1
diff --git a/akka-docs/java/code/docs/camel/Consumer2.java b/akka-docs/java/code/docs/camel/Consumer2.java
new file mode 100644
index 0000000000..b22c324877
--- /dev/null
+++ b/akka-docs/java/code/docs/camel/Consumer2.java
@@ -0,0 +1,20 @@
+package docs.camel;
+//#Consumer2
+import akka.camel.CamelMessage;
+import akka.camel.javaapi.UntypedConsumerActor;
+
+public class Consumer2 extends UntypedConsumerActor {
+ public String getEndpointUri() {
+ return "jetty:http://localhost:8877/camel/default";
+ }
+
+ public void onReceive(Object message) {
+ if (message instanceof CamelMessage) {
+ CamelMessage camelMessage = (CamelMessage) message;
+ String body = camelMessage.getBodyAs(String.class, getCamelContext());
+ getSender().tell(String.format("Received message: %s",body));
+ } else
+ unhandled(message);
+ }
+}
+//#Consumer2
diff --git a/akka-docs/java/code/docs/camel/Consumer3.java b/akka-docs/java/code/docs/camel/Consumer3.java
new file mode 100644
index 0000000000..bf661cb8ea
--- /dev/null
+++ b/akka-docs/java/code/docs/camel/Consumer3.java
@@ -0,0 +1,31 @@
+package docs.camel;
+//#Consumer3
+import akka.actor.Status;
+import akka.camel.Ack;
+import akka.camel.CamelMessage;
+import akka.camel.javaapi.UntypedConsumerActor;
+
+public class Consumer3 extends UntypedConsumerActor{
+
+ @Override
+ public boolean autoAck() {
+ return false;
+ }
+
+ public String getEndpointUri() {
+ return "jms:queue:test";
+ }
+
+ public void onReceive(Object message) {
+ if (message instanceof CamelMessage) {
+ getSender().tell(Ack.getInstance());
+ // on success
+ // ..
+ Exception someException = new Exception("e1");
+ // on failure
+ getSender().tell(new Status.Failure(someException));
+ } else
+ unhandled(message);
+ }
+}
+//#Consumer3
diff --git a/akka-docs/java/code/docs/camel/Consumer4.java b/akka-docs/java/code/docs/camel/Consumer4.java
new file mode 100644
index 0000000000..144d79965b
--- /dev/null
+++ b/akka-docs/java/code/docs/camel/Consumer4.java
@@ -0,0 +1,31 @@
+package docs.camel;
+//#Consumer4
+import akka.camel.CamelMessage;
+import akka.camel.javaapi.UntypedConsumerActor;
+import scala.concurrent.util.Duration;
+import scala.concurrent.util.FiniteDuration;
+
+import java.util.concurrent.TimeUnit;
+
+public class Consumer4 extends UntypedConsumerActor {
+ private final static FiniteDuration timeout = Duration.create(500, TimeUnit.MILLISECONDS);
+
+ @Override
+ public Duration replyTimeout() {
+ return timeout;
+ }
+
+ public String getEndpointUri() {
+ return "jetty:http://localhost:8877/camel/default";
+ }
+
+ public void onReceive(Object message) {
+ if (message instanceof CamelMessage) {
+ CamelMessage camelMessage = (CamelMessage) message;
+ String body = camelMessage.getBodyAs(String.class, getCamelContext());
+ getSender().tell(String.format("Hello %s",body));
+ } else
+ unhandled(message);
+ }
+}
+//#Consumer4
\ No newline at end of file
diff --git a/akka-docs/java/code/docs/camel/CustomRouteBuilder.java b/akka-docs/java/code/docs/camel/CustomRouteBuilder.java
new file mode 100644
index 0000000000..fd62d0dbbe
--- /dev/null
+++ b/akka-docs/java/code/docs/camel/CustomRouteBuilder.java
@@ -0,0 +1,18 @@
+package docs.camel;
+//#CustomRoute
+import akka.actor.ActorRef;
+import akka.camel.internal.component.CamelPath;
+import org.apache.camel.builder.RouteBuilder;
+
+public class CustomRouteBuilder extends RouteBuilder{
+ private String uri;
+
+ public CustomRouteBuilder(ActorRef responder) {
+ uri = CamelPath.toUri(responder);
+ }
+
+ public void configure() throws Exception {
+ from("jetty:http://localhost:8877/camel/custom").to(uri);
+ }
+}
+//#CustomRoute
diff --git a/akka-docs/java/code/docs/camel/CustomRouteTestBase.java b/akka-docs/java/code/docs/camel/CustomRouteTestBase.java
new file mode 100644
index 0000000000..8e6b3dc1bf
--- /dev/null
+++ b/akka-docs/java/code/docs/camel/CustomRouteTestBase.java
@@ -0,0 +1,20 @@
+package docs.camel;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+import akka.camel.Camel;
+import akka.camel.CamelExtension;
+
+public class CustomRouteTestBase {
+ public void customRoute() throws Exception{
+ //#CustomRoute
+ ActorSystem system = ActorSystem.create("some-system");
+ Camel camel = CamelExtension.get(system);
+ ActorRef responder = system.actorOf(new Props(Responder.class), "TestResponder");
+ camel.context().addRoutes(new CustomRouteBuilder(responder));
+ //#CustomRoute
+ system.stop(responder);
+ system.shutdown();
+ }
+}
diff --git a/akka-docs/java/code/docs/camel/ErrorThrowingConsumer.java b/akka-docs/java/code/docs/camel/ErrorThrowingConsumer.java
new file mode 100644
index 0000000000..23790021be
--- /dev/null
+++ b/akka-docs/java/code/docs/camel/ErrorThrowingConsumer.java
@@ -0,0 +1,42 @@
+package docs.camel;
+//#ErrorThrowingConsumer
+import akka.actor.Status;
+import akka.camel.CamelMessage;
+import akka.camel.javaapi.UntypedConsumerActor;
+import org.apache.camel.builder.Builder;
+import org.apache.camel.model.ProcessorDefinition;
+import org.apache.camel.model.RouteDefinition;
+import scala.Option;
+
+public class ErrorThrowingConsumer extends UntypedConsumerActor{
+ private String uri;
+
+ public ErrorThrowingConsumer(String uri){
+ this.uri = uri;
+ }
+
+ public String getEndpointUri() {
+ return uri;
+ }
+
+ public void onReceive(Object message) throws Exception{
+ if (message instanceof CamelMessage) {
+ CamelMessage camelMessage = (CamelMessage) message;
+ String body = camelMessage.getBodyAs(String.class, getCamelContext());
+ throw new Exception(String.format("error: %s",body));
+ } else
+ unhandled(message);
+ }
+
+ @Override
+ public ProcessorDefinition> onRouteDefinition(RouteDefinition rd) {
+ // Catch any exception and handle it by returning the exception message as response
+ return rd.onException(Exception.class).handled(true).transform(Builder.exceptionMessage()).end();
+ }
+
+ @Override
+ public void preRestart(Throwable reason, Option