From 92e3ca2629a5a586ad83f6fae71315de847b6d17 Mon Sep 17 00:00:00 2001 From: RayRoestenburg Date: Tue, 26 Jun 2012 00:47:06 +0200 Subject: [PATCH] added more docs for camel, couldnt resist to also remove a ; from DefaultCamel --- .../akka/camel/internal/DefaultCamel.scala | 2 +- akka-docs/scala/camel.rst | 159 +++++++++++++++++- .../scala/code/docs/camel/Consumers.scala | 37 ++++ .../scala/code/docs/camel/Introduction.scala | 61 ++++++- .../scala/code/docs/camel/Producers.scala | 14 ++ 5 files changed, 262 insertions(+), 11 deletions(-) create mode 100644 akka-docs/scala/code/docs/camel/Producers.scala diff --git a/akka-camel/src/main/scala/akka/camel/internal/DefaultCamel.scala b/akka-camel/src/main/scala/akka/camel/internal/DefaultCamel.scala index 2ac35fdec2..e390c799e9 100644 --- a/akka-camel/src/main/scala/akka/camel/internal/DefaultCamel.scala +++ b/akka-camel/src/main/scala/akka/camel/internal/DefaultCamel.scala @@ -26,7 +26,7 @@ private[camel] class DefaultCamel(val system: ActorSystem) extends Camel { lazy val context: CamelContext = { val ctx = new DefaultCamelContext - ctx.setName(system.name); + ctx.setName(system.name) ctx.setStreamCaching(true) ctx.addComponent("actor", new ActorComponent(this)) ctx.getTypeConverterRegistry.addTypeConverter(classOf[Duration], classOf[String], DurationTypeConverter) diff --git a/akka-docs/scala/camel.rst b/akka-docs/scala/camel.rst index 886e00c320..be5fd589a0 100644 --- a/akka-docs/scala/camel.rst +++ b/akka-docs/scala/camel.rst @@ -21,14 +21,13 @@ Other, more advanced external articles (for version 1) are: * `Akka Consumer Actors: New Features and Best Practices `_ * `Akka Producer Actors: New Features and Best Practices `_ - - Introduction ============ The akka-camel module allows actors to receive -and send messages over a great variety of protocols and APIs. This section gives -a brief overview of the general ideas behind the akka-camel module, the +and send messages over a great variety of protocols and APIs (akka-camel version 1.x also provided +this functionality to Typed Actors. In akka-camel 2, support for Typed Actors has been removed.) +This section gives a brief overview of the general ideas behind the akka-camel module, the remaining sections go into the details. In addition to the native Scala and Java actor API, actors can now exchange messages with other systems over large number of protocols and APIs such as HTTP, SOAP, TCP, FTP, SMTP or JMS, to mention a @@ -90,10 +89,55 @@ protocol-specific details from Akka and makes it therefore very easy to support a large number of protocols through a uniform Camel component interface. The akka-camel module further converts mutable Camel messages into immutable representations which are used by Consumer and Producer actors for pattern -matching, transformation, serialization or storage. +matching, transformation, serialization or storage. In the above example of the Orders Producer, +the XML message is put in the body of a newly created Camel Message with an empty set of headers. +You can also create a CamelMessage yourself with the appropriate body and headers as you see fit. __ https://svn.apache.org/repos/asf/camel/trunk/camel-core/src/main/java/org/apache/camel/Message.java +CamelExtension +-------------- +The akka-camel module is implemented as an Akka Extension, the ``CamelExtension`` object. +Extensions will only be loaded once per ``ActorSystem``, which will be managed by Akka. There is a one to one relationship between the ``CamelExtension`` and +the ``ActorSystem``, there can be only one ``CamelExtension`` for one ``ActorSystem``. +The ``CamelExtension`` object provides access to the `Camel`_ trait. +The `Camel`_ trait in turn provides access to two important Apache Camel objects, the `CamelContext`_ and the `ProducerTemplate`_. +Below you can see how you can get access to these Apache Camel objects. + +.. includecode:: code/docs/camel/Introduction.scala#CamelExtension + +One ``CamelExtension`` is only loaded once for every one ``ActorSystem``, which makes it safe to call the ``CamelExtension`` at any point in your code to get to the +Apache Camel objects associated with it. There is one `CamelContext`_ and one `ProducerTemplate`_ for every one ``ActorSystem`` that uses a ``CamelExtension``. +Below an example on how to add the ActiveMQ component to the `CamelContext`_, which is required when you would like to use the ActiveMQ component. + +.. includecode:: code/docs/camel/Introduction.scala#CamelExtensionAddComponent + +The `CamelContext`_ joins the lifecycle of the ``ActorSystem`` and ``CamelExtension`` it is associated with; the `CamelContext`_ is started when +the ``CamelExtension`` is created, and it is shutdown when the associated ``ActorSystem`` is shut down. The same is true for the `ProducerTemplate`_. + +The ``CamelExtension`` is used by both `Producer` and `Consumer` actors to interact with Apache Camel internally. +When Akka creates a `Consumer` actor, the `Consumer` is published at its +Camel endpoint (more precisely, the route is added to the `CamelContext`_ from the `Endpoint`_ to the actor). +When Akka creates a `Producer` actor, a `SendProcessor`_ and `Endpoint`_ are created so that the Producer can send messages to it. +Publication is done asynchronously; setting up an endpoint may still be in progress after you have +requested the actor to be created. Some Camel components can take a while to startup, and in some cases you might want to know when the endpoints are activated and ready to be used. +The `Camel`_ trait allows you to find out when the endpoint is activated or deactivated. + +.. includecode:: code/docs/camel/Introduction.scala#CamelActivation + +The above code shows that you can get a ``Future`` to the activation of the route from the endpoint to the actor, or you can wait in a blocking fashion on the activation of the route. +An ``ActivationTimeoutException`` is thrown if the endpoint could not be activated within the specified timeout. Deactivation works in a similar fashion: + +.. includecode:: code/docs/camel/Introduction.scala#CamelDeactivation + +Deactivation of a Consumer or a Producer actor happens when the actor is terminated. For a Consumer, the route to the actor is stopped. For a Producer, the `SendProcessor`_ is stopped. +A ``DeActivationTimeoutException`` is thrown if the associated camel objects could not be deactivated within the specified timeout. + +.. _Camel: http://github.com/akka/akka/blob/master/akka-camel/src/main/scala/akka/camel/Camel.scala +.. _CamelContext: https://svn.apache.org/repos/asf/camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContext.java +.. _ProducerTemplate: https://svn.apache.org/repos/asf/camel/trunk/camel-core/src/main/java/org/apache/camel/ProducerTemplate.java +.. _SendProcessor: https://svn.apache.org/repos/asf/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java +.. _Endpoint: https://svn.apache.org/repos/asf/camel/trunk/camel-core/src/main/java/org/apache/camel/Endpoint.java Dependencies ============ @@ -156,3 +200,108 @@ new Message object is created by akka-camel with the actor response as message body. .. _Message: http://github.com/akka/akka/blob/master/akka-camel/src/main/scala/akka/camel/CamelMessage.scala + +Acknowledgements +---------------- + +With in-out message exchanges, clients usually know that a message exchange is +done when they receive a reply from a consumer actor. The reply message can be a +CamelMessage (or any object which is then internally converted to a CamelMessage) on +success, and a Failure message on failure. + +With in-only message exchanges, by default, an exchange is done when a message +is added to the consumer actor's mailbox. Any failure or exception that occurs +during processing of that message by the consumer actor cannot be reported back +to the endpoint in this case. To allow consumer actors to positively or +negatively acknowledge the receipt of a message from an in-only message +exchange, they need to override the ``autoack`` method to return false. +In this case, consumer actors must reply either with a +special Ack message (positive acknowledgement) or a Failure (negative +acknowledgement). + +.. includecode:: code/docs/camel/Consumers.scala#Consumer3 + +Consumer timeout +---------------- + +Camel Exchanges (and their corresponding endpoints) that support two-way communications need to wait for a response from +an actor before returning it to the initiating client. +For some endpoint types, timeout values can be defined in an endpoint-specific +way which is described in the documentation of the individual `Camel +components`_. Another option is to configure timeouts on the level of consumer +actors. + +.. _Camel components: http://camel.apache.org/components.html + +Two-way communications between a Camel endpoint and an actor are +initiated by sending the request message to the actor with the ask pattern +and the actor replies to the endpoint when the response is ready. The ask request to the actor can timeout, which will +result in the `Exchange`_ failing with a TimeoutException set on the failure of the `Exchange`_. +The timeout on the consumer actor can be overridden with the ``replyTimeout``, as shown below. + +.. includecode:: code/docs/camel/Consumers.scala#Consumer4 +.. _Exchange: https://svn.apache.org/repos/asf/camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java + +Producer Actors +=============== + +For sending messages to Camel endpoints, actors need to mixin the `Producer`_ trait and implement the endpointUri method. + +.. includecode:: code/docs/camel/Producers.scala#Producer1 +.. _Producer: http://github.com/akka/akka/blob/master/akka-camel/src/main/scala/akka/camel/Producer.scala + + + + + Actors (untyped) + Consumer publishing + Actors (untyped) + Typed actors + Consumers and the CamelService + Consumer un-publishing + Actors (untyped) + Typed actors + Acknowledgements + Actors (untyped) + Blocking exchanges + Consumer timeout + Typed actors + Actors (untyped) + Remote consumers + Actors (untyped) + Typed actors +Produce messages + Producer trait + Actors (untyped) + Custom Processing + Producer configuration options + Message correlation + Matching responses + ProducerTemplate + Actors (untyped) + Typed actors + Asynchronous routing +Fault tolerance +CamelService configuration +Standalone applications +Standalone Spring applications +Kernel mode +Custom Camel routes +Akka Camel components +Access to actors +URI options +Message headers +Access to typed actors +Using Spring +Without Spring +Intercepting route construction +Actors (untyped) +Typed actors +Examples +Asynchronous routing and transformation example +Custom Camel route example +Publish-subcribe example +JMS +Cometd +Quartz Scheduler Example + diff --git a/akka-docs/scala/code/docs/camel/Consumers.scala b/akka-docs/scala/code/docs/camel/Consumers.scala index df7161b9e6..289936c4dd 100644 --- a/akka-docs/scala/code/docs/camel/Consumers.scala +++ b/akka-docs/scala/code/docs/camel/Consumers.scala @@ -27,4 +27,41 @@ object Consumers { } //#Consumer2 } + { + //#Consumer3 + import akka.camel.{ CamelMessage, Consumer } + import akka.camel.Ack + import akka.actor.Status.Failure + + class Consumer3 extends Consumer { + override def autoack = false + + def endpointUri = "jms:queue:test" + + def receive = { + case msg:CamelMessage ⇒ + sender ! Ack + // on success + // .. + val someException = new Exception("e1") + // on failure + sender ! Failure(someException) + } + } + //#Consumer3 + } + { + //#Consumer4 + import akka.camel.{ CamelMessage, Consumer } + import akka.util.duration._ + + class Consumer4 extends Consumer { + def endpointUri = "jetty:http://localhost:8877/camel/default" + override def replyTimeout = 500 millis + def receive = { + case msg: CamelMessage ⇒ sender ! ("Hello %s" format msg.bodyAs[String]) + } + } + //#Consumer4 + } } \ No newline at end of file diff --git a/akka-docs/scala/code/docs/camel/Introduction.scala b/akka-docs/scala/code/docs/camel/Introduction.scala index eaf4c400f6..92ddccfc51 100644 --- a/akka-docs/scala/code/docs/camel/Introduction.scala +++ b/akka-docs/scala/code/docs/camel/Introduction.scala @@ -1,11 +1,14 @@ package docs.camel +import akka.actor.{Props, ActorSystem} +import akka.camel.CamelExtension + object Introduction { { //#Consumer-mina import akka.camel.{ CamelMessage, Consumer } - class MinaClient extends Consumer { + class MyEndpoint extends Consumer { def endpointUri = "mina:tcp://localhost:6200?textline=true" def receive = { @@ -17,15 +20,15 @@ object Introduction { // start and expose actor via tcp import akka.actor.{ ActorSystem, Props } - val sys = ActorSystem("camel") - val mina = sys.actorOf(Props[MinaClient]) + val system = ActorSystem("some-system") + val mina = system.actorOf(Props[MyEndpoint]) //#Consumer-mina } { //#Consumer import akka.camel.{ CamelMessage, Consumer } - class JettyAdapter extends Consumer { + class MyEndpoint extends Consumer { def endpointUri = "jetty:http://localhost:8877/example" def receive = { @@ -45,10 +48,58 @@ object Introduction { def endpointUri = "jms:queue:Orders" } - val sys = ActorSystem("camel") + val sys = ActorSystem("some-system") val orders = sys.actorOf(Props[Orders]) orders ! //#Producer } + { + //#CamelExtension + val system = ActorSystem("some-system") + val camel = CamelExtension(system) + val camelContext = camel.context + val producerTemplate = camel.template + + //#CamelExtension + } + { + //#CamelExtensionAddComponent + // import org.apache.activemq.camel.component.ActiveMQComponent + val system = ActorSystem("some-system") + val camel = CamelExtension(system) + val camelContext = camel.context + // camelContext.addComponent("activemq", ActiveMQComponent.activeMQComponent("vm://localhost?broker.persistent=false")) + //#CamelExtensionAddComponent + } + { + //#CamelActivation + import akka.camel.{ CamelMessage, Consumer } + import akka.util.duration._ + + class MyEndpoint extends Consumer { + def endpointUri = "mina:tcp://localhost:6200?textline=true" + + def receive = { + case msg: CamelMessage ⇒ { /* ... */ } + case _ ⇒ { /* ... */ } + } + } + val system = ActorSystem("some-system") + val camel = CamelExtension(system) + val actorRef = system.actorOf(Props[MyEndpoint]) + // get a future reference to the activation of the endpoint of the Consumer Actor + val activationFuture = camel.activationFutureFor(actorRef,10 seconds) + // or, block wait on the activation + camel.awaitActivation(actorRef, 10 seconds) + //#CamelActivation + //#CamelDeactivation + system.stop(actorRef) + // get a future reference to the deactivation of the endpoint of the Consumer Actor + val deactivationFuture = camel.activationFutureFor(actorRef,10 seconds) + // or, block wait on the deactivation + camel.awaitDeactivation(actorRef, 10 seconds) + //#CamelDeactivation + } + } \ No newline at end of file diff --git a/akka-docs/scala/code/docs/camel/Producers.scala b/akka-docs/scala/code/docs/camel/Producers.scala new file mode 100644 index 0000000000..bc3576e93a --- /dev/null +++ b/akka-docs/scala/code/docs/camel/Producers.scala @@ -0,0 +1,14 @@ +package docs.camel + +object Producers { + { + //#Producer1 + import akka.camel.Producer + import akka.actor.Actor + + class Producer1 extends Actor with Producer { + def endpointUri = "http://localhost:8080/news" + } + //#Producer1 + } +}