From 92e3ca2629a5a586ad83f6fae71315de847b6d17 Mon Sep 17 00:00:00 2001 From: RayRoestenburg Date: Tue, 26 Jun 2012 00:47:06 +0200 Subject: [PATCH 1/6] added more docs for camel, couldnt resist to also remove a ; from DefaultCamel --- .../akka/camel/internal/DefaultCamel.scala | 2 +- akka-docs/scala/camel.rst | 159 +++++++++++++++++- .../scala/code/docs/camel/Consumers.scala | 37 ++++ .../scala/code/docs/camel/Introduction.scala | 61 ++++++- .../scala/code/docs/camel/Producers.scala | 14 ++ 5 files changed, 262 insertions(+), 11 deletions(-) create mode 100644 akka-docs/scala/code/docs/camel/Producers.scala diff --git a/akka-camel/src/main/scala/akka/camel/internal/DefaultCamel.scala b/akka-camel/src/main/scala/akka/camel/internal/DefaultCamel.scala index 2ac35fdec2..e390c799e9 100644 --- a/akka-camel/src/main/scala/akka/camel/internal/DefaultCamel.scala +++ b/akka-camel/src/main/scala/akka/camel/internal/DefaultCamel.scala @@ -26,7 +26,7 @@ private[camel] class DefaultCamel(val system: ActorSystem) extends Camel { lazy val context: CamelContext = { val ctx = new DefaultCamelContext - ctx.setName(system.name); + ctx.setName(system.name) ctx.setStreamCaching(true) ctx.addComponent("actor", new ActorComponent(this)) ctx.getTypeConverterRegistry.addTypeConverter(classOf[Duration], classOf[String], DurationTypeConverter) diff --git a/akka-docs/scala/camel.rst b/akka-docs/scala/camel.rst index 886e00c320..be5fd589a0 100644 --- a/akka-docs/scala/camel.rst +++ b/akka-docs/scala/camel.rst @@ -21,14 +21,13 @@ Other, more advanced external articles (for version 1) are: * `Akka Consumer Actors: New Features and Best Practices `_ * `Akka Producer Actors: New Features and Best Practices `_ - - Introduction ============ The akka-camel module allows actors to receive -and send messages over a great variety of protocols and APIs. This section gives -a brief overview of the general ideas behind the akka-camel module, the +and send messages over a great variety of protocols and APIs (akka-camel version 1.x also provided +this functionality to Typed Actors. In akka-camel 2, support for Typed Actors has been removed.) +This section gives a brief overview of the general ideas behind the akka-camel module, the remaining sections go into the details. In addition to the native Scala and Java actor API, actors can now exchange messages with other systems over large number of protocols and APIs such as HTTP, SOAP, TCP, FTP, SMTP or JMS, to mention a @@ -90,10 +89,55 @@ protocol-specific details from Akka and makes it therefore very easy to support a large number of protocols through a uniform Camel component interface. The akka-camel module further converts mutable Camel messages into immutable representations which are used by Consumer and Producer actors for pattern -matching, transformation, serialization or storage. +matching, transformation, serialization or storage. In the above example of the Orders Producer, +the XML message is put in the body of a newly created Camel Message with an empty set of headers. +You can also create a CamelMessage yourself with the appropriate body and headers as you see fit. __ https://svn.apache.org/repos/asf/camel/trunk/camel-core/src/main/java/org/apache/camel/Message.java +CamelExtension +-------------- +The akka-camel module is implemented as an Akka Extension, the ``CamelExtension`` object. +Extensions will only be loaded once per ``ActorSystem``, which will be managed by Akka. There is a one to one relationship between the ``CamelExtension`` and +the ``ActorSystem``, there can be only one ``CamelExtension`` for one ``ActorSystem``. +The ``CamelExtension`` object provides access to the `Camel`_ trait. +The `Camel`_ trait in turn provides access to two important Apache Camel objects, the `CamelContext`_ and the `ProducerTemplate`_. +Below you can see how you can get access to these Apache Camel objects. + +.. includecode:: code/docs/camel/Introduction.scala#CamelExtension + +One ``CamelExtension`` is only loaded once for every one ``ActorSystem``, which makes it safe to call the ``CamelExtension`` at any point in your code to get to the +Apache Camel objects associated with it. There is one `CamelContext`_ and one `ProducerTemplate`_ for every one ``ActorSystem`` that uses a ``CamelExtension``. +Below an example on how to add the ActiveMQ component to the `CamelContext`_, which is required when you would like to use the ActiveMQ component. + +.. includecode:: code/docs/camel/Introduction.scala#CamelExtensionAddComponent + +The `CamelContext`_ joins the lifecycle of the ``ActorSystem`` and ``CamelExtension`` it is associated with; the `CamelContext`_ is started when +the ``CamelExtension`` is created, and it is shutdown when the associated ``ActorSystem`` is shut down. The same is true for the `ProducerTemplate`_. + +The ``CamelExtension`` is used by both `Producer` and `Consumer` actors to interact with Apache Camel internally. +When Akka creates a `Consumer` actor, the `Consumer` is published at its +Camel endpoint (more precisely, the route is added to the `CamelContext`_ from the `Endpoint`_ to the actor). +When Akka creates a `Producer` actor, a `SendProcessor`_ and `Endpoint`_ are created so that the Producer can send messages to it. +Publication is done asynchronously; setting up an endpoint may still be in progress after you have +requested the actor to be created. Some Camel components can take a while to startup, and in some cases you might want to know when the endpoints are activated and ready to be used. +The `Camel`_ trait allows you to find out when the endpoint is activated or deactivated. + +.. includecode:: code/docs/camel/Introduction.scala#CamelActivation + +The above code shows that you can get a ``Future`` to the activation of the route from the endpoint to the actor, or you can wait in a blocking fashion on the activation of the route. +An ``ActivationTimeoutException`` is thrown if the endpoint could not be activated within the specified timeout. Deactivation works in a similar fashion: + +.. includecode:: code/docs/camel/Introduction.scala#CamelDeactivation + +Deactivation of a Consumer or a Producer actor happens when the actor is terminated. For a Consumer, the route to the actor is stopped. For a Producer, the `SendProcessor`_ is stopped. +A ``DeActivationTimeoutException`` is thrown if the associated camel objects could not be deactivated within the specified timeout. + +.. _Camel: http://github.com/akka/akka/blob/master/akka-camel/src/main/scala/akka/camel/Camel.scala +.. _CamelContext: https://svn.apache.org/repos/asf/camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContext.java +.. _ProducerTemplate: https://svn.apache.org/repos/asf/camel/trunk/camel-core/src/main/java/org/apache/camel/ProducerTemplate.java +.. _SendProcessor: https://svn.apache.org/repos/asf/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java +.. _Endpoint: https://svn.apache.org/repos/asf/camel/trunk/camel-core/src/main/java/org/apache/camel/Endpoint.java Dependencies ============ @@ -156,3 +200,108 @@ new Message object is created by akka-camel with the actor response as message body. .. _Message: http://github.com/akka/akka/blob/master/akka-camel/src/main/scala/akka/camel/CamelMessage.scala + +Acknowledgements +---------------- + +With in-out message exchanges, clients usually know that a message exchange is +done when they receive a reply from a consumer actor. The reply message can be a +CamelMessage (or any object which is then internally converted to a CamelMessage) on +success, and a Failure message on failure. + +With in-only message exchanges, by default, an exchange is done when a message +is added to the consumer actor's mailbox. Any failure or exception that occurs +during processing of that message by the consumer actor cannot be reported back +to the endpoint in this case. To allow consumer actors to positively or +negatively acknowledge the receipt of a message from an in-only message +exchange, they need to override the ``autoack`` method to return false. +In this case, consumer actors must reply either with a +special Ack message (positive acknowledgement) or a Failure (negative +acknowledgement). + +.. includecode:: code/docs/camel/Consumers.scala#Consumer3 + +Consumer timeout +---------------- + +Camel Exchanges (and their corresponding endpoints) that support two-way communications need to wait for a response from +an actor before returning it to the initiating client. +For some endpoint types, timeout values can be defined in an endpoint-specific +way which is described in the documentation of the individual `Camel +components`_. Another option is to configure timeouts on the level of consumer +actors. + +.. _Camel components: http://camel.apache.org/components.html + +Two-way communications between a Camel endpoint and an actor are +initiated by sending the request message to the actor with the ask pattern +and the actor replies to the endpoint when the response is ready. The ask request to the actor can timeout, which will +result in the `Exchange`_ failing with a TimeoutException set on the failure of the `Exchange`_. +The timeout on the consumer actor can be overridden with the ``replyTimeout``, as shown below. + +.. includecode:: code/docs/camel/Consumers.scala#Consumer4 +.. _Exchange: https://svn.apache.org/repos/asf/camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java + +Producer Actors +=============== + +For sending messages to Camel endpoints, actors need to mixin the `Producer`_ trait and implement the endpointUri method. + +.. includecode:: code/docs/camel/Producers.scala#Producer1 +.. _Producer: http://github.com/akka/akka/blob/master/akka-camel/src/main/scala/akka/camel/Producer.scala + + + + + Actors (untyped) + Consumer publishing + Actors (untyped) + Typed actors + Consumers and the CamelService + Consumer un-publishing + Actors (untyped) + Typed actors + Acknowledgements + Actors (untyped) + Blocking exchanges + Consumer timeout + Typed actors + Actors (untyped) + Remote consumers + Actors (untyped) + Typed actors +Produce messages + Producer trait + Actors (untyped) + Custom Processing + Producer configuration options + Message correlation + Matching responses + ProducerTemplate + Actors (untyped) + Typed actors + Asynchronous routing +Fault tolerance +CamelService configuration +Standalone applications +Standalone Spring applications +Kernel mode +Custom Camel routes +Akka Camel components +Access to actors +URI options +Message headers +Access to typed actors +Using Spring +Without Spring +Intercepting route construction +Actors (untyped) +Typed actors +Examples +Asynchronous routing and transformation example +Custom Camel route example +Publish-subcribe example +JMS +Cometd +Quartz Scheduler Example + diff --git a/akka-docs/scala/code/docs/camel/Consumers.scala b/akka-docs/scala/code/docs/camel/Consumers.scala index df7161b9e6..289936c4dd 100644 --- a/akka-docs/scala/code/docs/camel/Consumers.scala +++ b/akka-docs/scala/code/docs/camel/Consumers.scala @@ -27,4 +27,41 @@ object Consumers { } //#Consumer2 } + { + //#Consumer3 + import akka.camel.{ CamelMessage, Consumer } + import akka.camel.Ack + import akka.actor.Status.Failure + + class Consumer3 extends Consumer { + override def autoack = false + + def endpointUri = "jms:queue:test" + + def receive = { + case msg:CamelMessage ⇒ + sender ! Ack + // on success + // .. + val someException = new Exception("e1") + // on failure + sender ! Failure(someException) + } + } + //#Consumer3 + } + { + //#Consumer4 + import akka.camel.{ CamelMessage, Consumer } + import akka.util.duration._ + + class Consumer4 extends Consumer { + def endpointUri = "jetty:http://localhost:8877/camel/default" + override def replyTimeout = 500 millis + def receive = { + case msg: CamelMessage ⇒ sender ! ("Hello %s" format msg.bodyAs[String]) + } + } + //#Consumer4 + } } \ No newline at end of file diff --git a/akka-docs/scala/code/docs/camel/Introduction.scala b/akka-docs/scala/code/docs/camel/Introduction.scala index eaf4c400f6..92ddccfc51 100644 --- a/akka-docs/scala/code/docs/camel/Introduction.scala +++ b/akka-docs/scala/code/docs/camel/Introduction.scala @@ -1,11 +1,14 @@ package docs.camel +import akka.actor.{Props, ActorSystem} +import akka.camel.CamelExtension + object Introduction { { //#Consumer-mina import akka.camel.{ CamelMessage, Consumer } - class MinaClient extends Consumer { + class MyEndpoint extends Consumer { def endpointUri = "mina:tcp://localhost:6200?textline=true" def receive = { @@ -17,15 +20,15 @@ object Introduction { // start and expose actor via tcp import akka.actor.{ ActorSystem, Props } - val sys = ActorSystem("camel") - val mina = sys.actorOf(Props[MinaClient]) + val system = ActorSystem("some-system") + val mina = system.actorOf(Props[MyEndpoint]) //#Consumer-mina } { //#Consumer import akka.camel.{ CamelMessage, Consumer } - class JettyAdapter extends Consumer { + class MyEndpoint extends Consumer { def endpointUri = "jetty:http://localhost:8877/example" def receive = { @@ -45,10 +48,58 @@ object Introduction { def endpointUri = "jms:queue:Orders" } - val sys = ActorSystem("camel") + val sys = ActorSystem("some-system") val orders = sys.actorOf(Props[Orders]) orders ! //#Producer } + { + //#CamelExtension + val system = ActorSystem("some-system") + val camel = CamelExtension(system) + val camelContext = camel.context + val producerTemplate = camel.template + + //#CamelExtension + } + { + //#CamelExtensionAddComponent + // import org.apache.activemq.camel.component.ActiveMQComponent + val system = ActorSystem("some-system") + val camel = CamelExtension(system) + val camelContext = camel.context + // camelContext.addComponent("activemq", ActiveMQComponent.activeMQComponent("vm://localhost?broker.persistent=false")) + //#CamelExtensionAddComponent + } + { + //#CamelActivation + import akka.camel.{ CamelMessage, Consumer } + import akka.util.duration._ + + class MyEndpoint extends Consumer { + def endpointUri = "mina:tcp://localhost:6200?textline=true" + + def receive = { + case msg: CamelMessage ⇒ { /* ... */ } + case _ ⇒ { /* ... */ } + } + } + val system = ActorSystem("some-system") + val camel = CamelExtension(system) + val actorRef = system.actorOf(Props[MyEndpoint]) + // get a future reference to the activation of the endpoint of the Consumer Actor + val activationFuture = camel.activationFutureFor(actorRef,10 seconds) + // or, block wait on the activation + camel.awaitActivation(actorRef, 10 seconds) + //#CamelActivation + //#CamelDeactivation + system.stop(actorRef) + // get a future reference to the deactivation of the endpoint of the Consumer Actor + val deactivationFuture = camel.activationFutureFor(actorRef,10 seconds) + // or, block wait on the deactivation + camel.awaitDeactivation(actorRef, 10 seconds) + //#CamelDeactivation + } + } \ No newline at end of file diff --git a/akka-docs/scala/code/docs/camel/Producers.scala b/akka-docs/scala/code/docs/camel/Producers.scala new file mode 100644 index 0000000000..bc3576e93a --- /dev/null +++ b/akka-docs/scala/code/docs/camel/Producers.scala @@ -0,0 +1,14 @@ +package docs.camel + +object Producers { + { + //#Producer1 + import akka.camel.Producer + import akka.actor.Actor + + class Producer1 extends Actor with Producer { + def endpointUri = "http://localhost:8080/news" + } + //#Producer1 + } +} From a7a96687e4b5a4c95b4b78d4549b1bfd607c0db5 Mon Sep 17 00:00:00 2001 From: RayRoestenburg Date: Tue, 26 Jun 2012 10:43:31 +0200 Subject: [PATCH 2/6] added akka camel docs --- akka-docs/scala/camel.rst | 394 +++++++++++++++--- .../scala/code/docs/camel/CustomRoute.scala | 58 +++ .../code/docs/camel/CustomRouteExample.scala | 51 +++ .../scala/code/docs/camel/HttpExample.scala | 47 +++ .../scala/code/docs/camel/Producers.scala | 117 +++++- .../code/docs/camel/PublishSubscribe.scala | 48 +++ .../scala/code/docs/camel/QuartzExample.scala | 35 ++ 7 files changed, 698 insertions(+), 52 deletions(-) create mode 100644 akka-docs/scala/code/docs/camel/CustomRoute.scala create mode 100644 akka-docs/scala/code/docs/camel/CustomRouteExample.scala create mode 100644 akka-docs/scala/code/docs/camel/HttpExample.scala create mode 100644 akka-docs/scala/code/docs/camel/PublishSubscribe.scala create mode 100644 akka-docs/scala/code/docs/camel/QuartzExample.scala diff --git a/akka-docs/scala/camel.rst b/akka-docs/scala/camel.rst index be5fd589a0..00f7b6eb10 100644 --- a/akka-docs/scala/camel.rst +++ b/akka-docs/scala/camel.rst @@ -201,6 +201,8 @@ body. .. _Message: http://github.com/akka/akka/blob/master/akka-camel/src/main/scala/akka/camel/CamelMessage.scala +.. _camel-acknowledgements: + Acknowledgements ---------------- @@ -221,6 +223,8 @@ acknowledgement). .. includecode:: code/docs/camel/Consumers.scala#Consumer3 +.. _camel-timeout: + Consumer timeout ---------------- @@ -248,60 +252,350 @@ Producer Actors For sending messages to Camel endpoints, actors need to mixin the `Producer`_ trait and implement the endpointUri method. .. includecode:: code/docs/camel/Producers.scala#Producer1 + +Producer1 inherits a default implementation of the receive method from the +Producer trait. To customize a producer actor's default behavior it is +recommended to override the `Producer`_.transformResponse and +`Producer`_.transformOutgoingMessage methods. This is explained later in more detail. +Actors should not override the default `Producer`_.receive method. + +Any message sent to a `Producer`_ actor will be sent to +the associated Camel endpoint, in the above example to +``http://localhost:8080/news``. The `Producer`_ always sends messages asynchronously. Response messages (if supported by the +configured endpoint) will, by default, be returned to the original sender. The +following example uses the ask pattern to send a message to a +Producer actor and waits for a response. + +.. includecode:: code/docs/camel/Producers.scala#AskProducer + +The future contains the response CamelMessage, or an ``AkkaCamelException`` when an error occurred, which contains the headers of the response. + +.. _camel-custom-processing: + +Custom Processing +----------------- + +Instead of replying to the initial sender, producer actors can implement custom +response processing by overriding the routeResponse method. In the following example, the response +message is forwarded to a target actor instead of being replied to the original +sender. + +.. includecode:: code/docs/camel/Producers.scala#RouteResponse + +Before producing messages to endpoints, producer actors can pre-process them by +overriding the `Producer`_.transformOutgoingMessage method. + +.. includecode:: code/docs/camel/Producers.scala#TransformOutgoingMessage + +Producer configuration options +------------------------------ + +The interaction of producer actors with Camel endpoints can be configured to be +one-way or two-way (by initiating in-only or in-out message exchanges, +respectively). By default, the producer initiates an in-out message exchange +with the endpoint. For initiating an in-only exchange, producer actors have to override the oneway method to return true. + +.. includecode:: code/docs/camel/Producers.scala#Oneway + +Message correlation +------------------- + +To correlate request with response messages, applications can set the +Message.MessageExchangeId message header. + +.. includecode:: code/docs/camel/Producers.scala#Correlate + +ProducerTemplate +---------------- + +The `Producer`_ trait is a very +convenient way for actors to produce messages to Camel endpoints. Actors may also use a Camel `ProducerTemplate`_ for producing +messages to endpoints. + +.. includecode:: code/docs/camel/Producers.scala#ProducerTemplate + +For initiating a a two-way message exchange, one of the +``ProducerTemplate.request*`` methods must be used. + +.. includecode:: code/docs/camel/Producers.scala#RequestProducerTemplate + .. _Producer: http://github.com/akka/akka/blob/master/akka-camel/src/main/scala/akka/camel/Producer.scala +.. _ProducerTemplate: https://svn.apache.org/repos/asf/camel/trunk/camel-core/src/main/java/org/apache/camel/ProducerTemplate.java +.. _camel-asynchronous-routing: +Asynchronous routing +==================== +Since Akka 0.10, in-out message exchanges between endpoints and actors are +designed to be asynchronous. This is the case for both, consumer and producer +actors. + +* A consumer endpoint sends request messages to its consumer actor using the ``!`` + (bang) operator and the actor returns responses with ``sender !`` once they are + ready. + +* A producer actor sends request messages to its endpoint using Camel's + asynchronous routing engine. Asynchronous responses are wrapped and added to the + producer actor's mailbox for later processing. By default, response messages are + returned to the initial sender but this can be overridden by Producer + implementations (see also description of the ``routeResponse`` method + in :ref:`camel-custom-processing`). + +However, asynchronous two-way message exchanges, without allocating a thread for +the full duration of exchange, cannot be generically supported by Camel's +asynchronous routing engine alone. This must be supported by the individual +`Camel components`_ (from which endpoints are created) as well. They must be +able to suspend any work started for request processing (thereby freeing threads +to do other work) and resume processing when the response is ready. This is +currently the case for a `subset of components`_ such as the `Jetty component`_. +All other Camel components can still be used, of course, but they will cause +allocation of a thread for the duration of an in-out message exchange. There's +also a :ref:`camel-async-example` that implements both, an asynchronous +consumer and an asynchronous producer, with the jetty component. + +.. _Camel components: http://camel.apache.org/components.html +.. _subset of components: http://camel.apache.org/asynchronous-routing-engine.html +.. _Jetty component: http://camel.apache.org/jetty.html - Actors (untyped) - Consumer publishing - Actors (untyped) - Typed actors - Consumers and the CamelService - Consumer un-publishing - Actors (untyped) - Typed actors - Acknowledgements - Actors (untyped) - Blocking exchanges - Consumer timeout - Typed actors - Actors (untyped) - Remote consumers - Actors (untyped) - Typed actors -Produce messages - Producer trait - Actors (untyped) - Custom Processing - Producer configuration options - Message correlation - Matching responses - ProducerTemplate - Actors (untyped) - Typed actors - Asynchronous routing -Fault tolerance -CamelService configuration -Standalone applications -Standalone Spring applications -Kernel mode Custom Camel routes -Akka Camel components -Access to actors -URI options -Message headers -Access to typed actors -Using Spring -Without Spring -Intercepting route construction -Actors (untyped) -Typed actors -Examples -Asynchronous routing and transformation example -Custom Camel route example -Publish-subcribe example -JMS -Cometd -Quartz Scheduler Example +=================== +In all the examples so far, routes to consumer actors have been automatically +constructed by akka-camel, when the actor was started. Although the default +route construction templates, used by akka-camel internally, are sufficient for +most use cases, some applications may require more specialized routes to actors. +The akka-camel module provides two mechanisms for customizing routes to actors, +which will be explained in this section. These are + +* Usage of :ref:`camel-components` to access actors. + Any Camel route can use these components to access Akka actors. + +* :ref:`camel-intercepting-route-construction` to actors. + Default routes to consumer actors are extended using predefined extension + points. + + +.. _camel-components: + +Akka Camel components +--------------------- + +Akka actors can be access from Camel routes using the `actor`_ Camel components, respectively. These components can be used to +access any Akka actor (not only consumer actors) from Camel routes, as described +in the following sections. + +.. _actor: https://github.com/akka/akka/blob/master/akka-camel/src/main/scala/akka/camel/internal/component/ActorComponent.scala + +.. _access-to-actors: + +Access to actors +---------------- + +To access actors from custom Camel routes, the `actor`_ Camel +component should be used. It fully supports Camel's `asynchronous routing +engine`_. + +.. _actor: https://github.com/akka/akka/blob/master/akka-camel/src/main/scala/akka/camel/internal/component/ActorComponent.scala +.. _asynchronous routing engine: http://camel.apache.org/asynchronous-routing-engine.html + +This component accepts the following endpoint URI format: + +* ``actor://path:[]?`` + +where ```` is the ``ActorPath`` to the actor. The ```` are +name-value pairs separated by ``&`` (i.e. ``name1=value1&name2=value2&...``). + + +URI options +^^^^^^^^^^^ + +The following URI options are supported: + ++--------------+----------+---------+-------------------------------------------+ +| Name | Type | Default | Description | ++==============+==========+=========+===========================================+ +| replyTimeout | Duration | false | The reply timeout, specified in the same | +| | | | way that you use the duration in akka, | +| | | | for instance ``10 seconds`` except that | +| | | | in the url it is handy to use a + | +| | | | between the amount and the unit, like | +| | | | for example ``200+millis`` | +| | | | | +| | | | See also :ref:`camel-timeout`. | ++--------------+----------+---------+-------------------------------------------+ +| autoack | Boolean | true | If set to true, in-only message exchanges | +| | | | are auto-acknowledged when the message is | +| | | | added to the actor's mailbox. If set to | +| | | | false, actors must acknowledge the | +| | | | receipt of the message. | +| | | | | +| | | | See also :ref:`camel-acknowledgements`. | ++--------------+----------+---------+-------------------------------------------+ + +Here's an actor endpoint URI example containing an actor uuid:: + + actor://path:akka://some-system/user/myconsumer?autoack=false&replyTimeout=100+millis + +In the following example, a custom route to an actor is created, using the +actor's path. the akka camel package contains an implicit ``toActorRouteDefinition`` that allows for a route to +reference an ``ActorRef`` directly as shown in the below example, The route starts from a `Jetty`_ endpoint and +ends at the target actor. + +.. includecode:: code/docs/camel/CustomRoute.scala#CustomRoute + +When a message is received on the jetty endpoint, it is routed to the Responder actor, which in return replies back to the client of +the HTTP request. + + +.. _camel-intercepting-route-construction: + +Intercepting route construction +------------------------------- + +The previous section, :ref:`camel-components`, explained how to setup a route to +an actor manually. It was the application's +responsibility to define the route and add it to the current CamelContext. This +section explains a more convenient way to define custom routes: akka-camel is +still setting up the routes to consumer actors (and adds these routes to the +current CamelContext) but applications can define extensions to these routes. +Extensions can be defined with Camel's `Java DSL`_ or `Scala DSL`_. For example, +an extension could be a custom error handler that redelivers messages from an +endpoint to an actor's bounded mailbox when the mailbox was full. + +.. _Java DSL: http://camel.apache.org/dsl.html +.. _Scala DSL: http://camel.apache.org/scala-dsl.html + +The following examples demonstrate how to extend a route to a consumer actor for +handling exceptions thrown by that actor. + +.. includecode:: code/docs/camel/CustomRoute.scala#ErrorThrowingConsumer + +The above ErrorThrowingConsumer sends the Failure back to the sender in preRestart +because the Exception that is thrown in the actor would +otherwise just crash the actor, by default the actor would be restarted, and the response would never reach the client of the Consumer. + +The akka-camel module creates a RouteDefinition instance by calling +from(endpointUri) on a Camel RouteBuilder (where endpointUri is the endpoint URI +of the consumer actor) and passes that instance as argument to the route +definition handler \*). The route definition handler then extends the route and +returns a ProcessorDefinition (in the above example, the ProcessorDefinition +returned by the end method. See the `org.apache.camel.model`__ package for +details). After executing the route definition handler, akka-camel finally calls +a to(targetActorUri) on the returned ProcessorDefinition to complete the +route to the consumer actor (where targetActorUri is the actor component URI as described in :ref:`access-to-actors`). + +\*) Before passing the RouteDefinition instance to the route definition handler, +akka-camel may make some further modifications to it. + +__ https://svn.apache.org/repos/asf/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ + +.. _camel-examples: + +Examples +======== + +.. _camel-async-example: + +Asynchronous routing and transformation example +----------------------------------------------- + +This example demonstrates how to implement consumer and producer actors that +support :ref:`camel-asynchronous-routing` with their Camel endpoints. The sample +application transforms the content of the Akka homepage, http://akka.io, by +replacing every occurrence of *Akka* with *AKKA*. To run this example, add +a Boot class that starts the actors. After starting +the :ref:`microkernel-scala`, direct the browser to http://localhost:8875 and the +transformed Akka homepage should be displayed. Please note that this example +will probably not work if you're behind an HTTP proxy. + +The following figure gives an overview how the example actors interact with +external systems and with each other. A browser sends a GET request to +http://localhost:8875 which is the published endpoint of the ``HttpConsumer`` +actor. The ``HttpConsumer`` actor forwards the requests to the ``HttpProducer`` +actor which retrieves the Akka homepage from http://akka.io. The retrieved HTML +is then forwarded to the ``HttpTransformer`` actor which replaces all occurrences +of *Akka* with *AKKA*. The transformation result is sent back the HttpConsumer +which finally returns it to the browser. + +.. image:: ../modules/camel-async-interact.png + +Implementing the example actor classes and wiring them together is rather easy +as shown in the following snippet. + +.. includecode:: code/docs/camel/HttpExample.scala#HttpExample + +The `jetty endpoints`_ of HttpConsumer and HttpProducer support asynchronous +in-out message exchanges and do not allocate threads for the full duration of +the exchange. This is achieved by using `Jetty continuations`_ on the +consumer-side and by using `Jetty's asynchronous HTTP client`_ on the producer +side. The following high-level sequence diagram illustrates that. + +.. _jetty endpoints: http://camel.apache.org/jetty.html +.. _Jetty continuations: http://wiki.eclipse.org/Jetty/Feature/Continuations +.. _Jetty's asynchronous HTTP client: http://wiki.eclipse.org/Jetty/Tutorial/HttpClient + +.. image:: ../modules/camel-async-sequence.png + +Custom Camel route example +-------------------------- + +This section also demonstrates the combined usage of a ``Producer`` and a +``Consumer`` actor as well as the inclusion of a custom Camel route. The +following figure gives an overview. + +.. image:: ../modules/camel-custom-route.png + +* A consumer actor receives a message from an HTTP client + +* It forwards the message to another actor that transforms the message (encloses + the original message into hyphens) + +* The transformer actor forwards the transformed message to a producer actor + +* The producer actor sends the message to a custom Camel route beginning at the + ``direct:welcome`` endpoint + +* A processor (transformer) in the custom Camel route prepends "Welcome" to the + original message and creates a result message + +* The producer actor sends the result back to the consumer actor which returns + it to the HTTP client + + +The consumer, transformer and +producer actor implementations are as follows. + +.. includecode:: code/docs/camel/CustomRouteExample.scala#CustomRouteExample + + +The producer actor knows where to reply the message to because the consumer and +transformer actors have forwarded the original sender reference as well. The +application configuration and the route starting from direct:welcome are done in the code above. + +To run the example, add the lines shown in the example to a Boot class and the start the :ref:`microkernel-scala` and POST a message to +``http://localhost:8877/camel/welcome``. + +.. code-block:: none + + curl -H "Content-Type: text/plain" -d "Anke" http://localhost:8877/camel/welcome + +The response should be: + +.. code-block:: none + + Welcome - Anke - + +Quartz Scheduler Example +------------------------ + +Here is an example showing how simple is to implement a cron-style scheduler by +using the Camel Quartz component in Akka. + +The following example creates a "timer" actor which fires a message every 2 +seconds: + +.. includecode:: code/docs/camel/QuartzExample.scala#Quartz + +For more information about the Camel Quartz component, see here: +http://camel.apache.org/quartz.html \ No newline at end of file diff --git a/akka-docs/scala/code/docs/camel/CustomRoute.scala b/akka-docs/scala/code/docs/camel/CustomRoute.scala new file mode 100644 index 0000000000..ad268a0a6b --- /dev/null +++ b/akka-docs/scala/code/docs/camel/CustomRoute.scala @@ -0,0 +1,58 @@ +package docs.camel + +import akka.camel.CamelMessage +import akka.actor.Status.Failure + + +object CustomRoute { + { + //#CustomRoute + import akka.actor.{Props, ActorSystem, Actor, ActorRef} + import akka.camel.{CamelMessage, CamelExtension} + import org.apache.camel.builder.RouteBuilder + import akka.camel._ + class Responder extends Actor { + def receive = { + case msg: CamelMessage ⇒ + sender ! (msg.mapBody { + body: String ⇒ "received %s" format body + }) + } + } + + class CustomRouteBuilder(system: ActorSystem, responder:ActorRef) extends RouteBuilder { + def configure { + from("jetty:http://localhost:8877/camel/custom").to(responder) + } + } + val system = ActorSystem("some-system") + val camel = CamelExtension(system) + val responder = system.actorOf(Props[Responder], name = "TestResponder") + camel.context.addRoutes(new CustomRouteBuilder(system, responder)) + //#CustomRoute + + } + { + //#ErrorThrowingConsumer + import akka.camel.Consumer + + import org.apache.camel.builder.Builder + import org.apache.camel.model.RouteDefinition + + class ErrorThrowingConsumer(override val endpointUri: String) extends Consumer { + def receive = { + case msg: CamelMessage ⇒ throw new Exception("error: %s" format msg.body) + } + override def onRouteDefinition(rd: RouteDefinition) = { + // Catch any exception and handle it by returning the exception message as response + rd.onException(classOf[Exception]).handled(true).transform(Builder.exceptionMessage).end + } + + final override def preRestart(reason: Throwable, message: Option[Any]) { + sender ! Failure(reason) + } + } + //#ErrorThrowingConsumer + } + +} diff --git a/akka-docs/scala/code/docs/camel/CustomRouteExample.scala b/akka-docs/scala/code/docs/camel/CustomRouteExample.scala new file mode 100644 index 0000000000..96a9a7f667 --- /dev/null +++ b/akka-docs/scala/code/docs/camel/CustomRouteExample.scala @@ -0,0 +1,51 @@ +package docs.camel + + +object CustomRouteExample { + { + //#CustomRouteExample + import akka.actor.{Actor, ActorRef, Props, ActorSystem} + import akka.camel.{CamelMessage, Consumer, Producer, CamelExtension} + import org.apache.camel.builder.RouteBuilder + import org.apache.camel.{Exchange, Processor} + + class Consumer3(transformer: ActorRef) extends Actor with Consumer { + def endpointUri = "jetty:http://0.0.0.0:8877/camel/welcome" + + def receive = { + // Forward a string representation of the message body to transformer + case msg: CamelMessage => transformer.forward(msg.bodyAs[String]) + } + } + + class Transformer(producer: ActorRef) extends Actor { + def receive = { + // example: transform message body "foo" to "- foo -" and forward result to producer + case msg: CamelMessage => producer.forward(msg.mapBody((body: String) => "- %s -" format body)) + } + } + + class Producer1 extends Actor with Producer { + def endpointUri = "direct:welcome" + } + + class CustomRouteBuilder extends RouteBuilder { + def configure { + from("direct:welcome").process(new Processor() { + def process(exchange: Exchange) { + // Create a 'welcome' message from the input message + exchange.getOut.setBody("Welcome %s" format exchange.getIn.getBody) + } + }) + } + } + // the below lines can be added to a Boot class, so that you can run the example from a MicroKernel + val system = ActorSystem("some-system") + val producer = system.actorOf(Props[Producer1]) + val mediator = system.actorOf(Props(new Transformer(producer))) + val consumer = system.actorOf(Props(new Consumer3(mediator))) + CamelExtension(system).context.addRoutes(new CustomRouteBuilder) + //#CustomRouteExample + } + +} diff --git a/akka-docs/scala/code/docs/camel/HttpExample.scala b/akka-docs/scala/code/docs/camel/HttpExample.scala new file mode 100644 index 0000000000..d57f3707e5 --- /dev/null +++ b/akka-docs/scala/code/docs/camel/HttpExample.scala @@ -0,0 +1,47 @@ +package docs.camel + +object HttpExample { + + { + //#HttpExample + import org.apache.camel.Exchange + import akka.actor.{Actor, ActorRef, Props, ActorSystem} + import akka.camel.{Producer, CamelMessage, Consumer} + import akka.actor.Status.Failure + + class HttpConsumer(producer: ActorRef) extends Consumer { + def endpointUri = "jetty:http://0.0.0.0:8875/" + + def receive = { + case msg => producer forward msg + } + } + + class HttpProducer(transformer: ActorRef) extends Actor with Producer { + def endpointUri = "jetty://http://akka.io/?bridgeEndpoint=true" + + override def transformOutgoingMessage(msg: Any) = msg match { + case msg: CamelMessage => msg.addHeaders(msg.headers(Set(Exchange.HTTP_PATH))) + } + + override def routeResponse(msg: Any) { transformer forward msg } + } + + class HttpTransformer extends Actor { + def receive = { + case msg: CamelMessage => sender ! (msg.mapBody {body: String => body replaceAll ("Akka ", "AKKA ")}) + case msg: Failure => sender ! msg + } + } + + // Create the actors. this can be done in a Boot class so you can + // run the example in the MicroKernel. just add the below three lines to your boot class. + val system = ActorSystem("some-system") + val httpTransformer = system.actorOf(Props[HttpTransformer]) + val httpProducer = system.actorOf(Props(new HttpProducer(httpTransformer))) + val httpConsumer = system.actorOf(Props(new HttpConsumer(httpProducer))) + //#HttpExample + + } + +} diff --git a/akka-docs/scala/code/docs/camel/Producers.scala b/akka-docs/scala/code/docs/camel/Producers.scala index bc3576e93a..5229f83914 100644 --- a/akka-docs/scala/code/docs/camel/Producers.scala +++ b/akka-docs/scala/code/docs/camel/Producers.scala @@ -1,14 +1,127 @@ package docs.camel +import akka.camel.CamelExtension + object Producers { { //#Producer1 - import akka.camel.Producer import akka.actor.Actor + import akka.actor.{Props, ActorSystem} + import akka.camel.{Producer, CamelMessage} + import akka.util.Timeout class Producer1 extends Actor with Producer { def endpointUri = "http://localhost:8080/news" } //#Producer1 + //#AskProducer + import akka.pattern.ask + import akka.util.duration._ + implicit val timeout = Timeout(10 seconds) + + val system = ActorSystem("some-system") + val producer = system.actorOf(Props[Producer1]) + val future = producer.ask("some request").mapTo[CamelMessage] + //#AskProducer } -} + { + //#RouteResponse + import akka.actor.{Actor, ActorRef} + import akka.camel.{Producer, CamelMessage} + import akka.actor.{Props, ActorSystem} + + class ResponseReceiver extends Actor { + def receive = { + case msg:CamelMessage ⇒ + // do something with the forwarded response + } + } + + class Forwarder(uri: String, target: ActorRef) extends Actor with Producer { + def endpointUri = uri + + override def routeResponse(msg: Any) { target forward msg } + } + val system = ActorSystem("some-system") + val receiver = system.actorOf(Props[ResponseReceiver]) + val forwardResponse = system.actorOf(Props(new Forwarder("http://localhost:8080/news/akka",receiver))) + // the Forwarder sends out a request to the web page and forwards the response to + // the ResponseReceiver + forwardResponse ! "some request" + //#RouteResponse + } + { + //#TransformOutgoingMessage + import akka.actor.Actor + import akka.camel.{Producer, CamelMessage} + + class Transformer(uri: String) extends Actor with Producer { + def endpointUri = uri + + def upperCase(msg:CamelMessage) = msg.mapBody { + body: String ⇒ body.toUpperCase + } + + override def transformOutgoingMessage(msg: Any) = msg match { + case msg: CamelMessage ⇒ upperCase(msg) + } + } + //#TransformOutgoingMessage + } + { + //#Oneway + import akka.actor.{Actor, Props, ActorSystem} + import akka.camel.Producer + + class OnewaySender(uri:String) extends Actor with Producer { + def endpointUri = uri + override def oneway: Boolean = true + } + + val system = ActorSystem("some-system") + val producer = system.actorOf(Props(new OnewaySender("activemq:FOO.BAR"))) + producer ! "Some message" + //#Oneway + + } + { + //#Correlate + import akka.camel.{Producer, CamelMessage} + import akka.actor.Actor + import akka.actor.{Props, ActorSystem} + + class Producer2 extends Actor with Producer { + def endpointUri = "activemg:FOO.BAR" + } + val system = ActorSystem("some-system") + val producer = system.actorOf(Props[Producer2]) + + producer ! CamelMessage("bar", Map(CamelMessage.MessageExchangeId -> "123")) + //#Correlate + } + { + //#ProducerTemplate + import akka.actor.Actor + class MyActor extends Actor { + def receive = { + case msg ⇒ + val template = CamelExtension(context.system).template + template.sendBody("direct:news", msg) + } + } + //#ProducerTemplate + } + { + //#RequestProducerTemplate + import akka.actor.Actor + class MyActor extends Actor { + def receive = { + case msg ⇒ + val template = CamelExtension(context.system).template + sender ! template.requestBody("direct:news", msg) + } + } + //#RequestProducerTemplate + } + +} \ No newline at end of file diff --git a/akka-docs/scala/code/docs/camel/PublishSubscribe.scala b/akka-docs/scala/code/docs/camel/PublishSubscribe.scala new file mode 100644 index 0000000000..ab604b44dd --- /dev/null +++ b/akka-docs/scala/code/docs/camel/PublishSubscribe.scala @@ -0,0 +1,48 @@ +package docs.camel + + +object PublishSubscribe { + { + //#PubSub + import akka.actor.{Actor, ActorRef, ActorSystem, Props} + import akka.camel.{Producer, CamelMessage, Consumer} + + class Subscriber(name:String, uri: String) extends Actor with Consumer { + def endpointUri = uri + + protected def receive = { + case msg: CamelMessage => println("%s received: %s" format (name, msg.body)) + } + } + + class Publisher(name: String, uri: String) extends Actor with Producer { + + def endpointUri = uri + + // one-way communication with JMS + override def oneway = true + } + + class PublisherBridge(uri: String, publisher: ActorRef) extends Actor with Consumer { + def endpointUri = uri + + protected def receive = { + case msg: CamelMessage => { + publisher ! msg.bodyAs[String] + sender ! ("message published") + } + } + } + + // Add below to a Boot class + // Setup publish/subscribe example + val system = ActorSystem("some-system") + val jmsUri = "jms:topic:test" + val jmsSubscriber1 = system.actorOf(Props(new Subscriber("jms-subscriber-1", jmsUri))) + val jmsSubscriber2 = system.actorOf(Props(new Subscriber("jms-subscriber-2", jmsUri))) + val jmsPublisher = system.actorOf(Props(new Publisher("jms-publisher", jmsUri))) + val jmsPublisherBridge = system.actorOf(Props(new PublisherBridge("jetty:http://0.0.0.0:8877/camel/pub/jms", jmsPublisher))) + //#PubSub + + } +} diff --git a/akka-docs/scala/code/docs/camel/QuartzExample.scala b/akka-docs/scala/code/docs/camel/QuartzExample.scala new file mode 100644 index 0000000000..b25e38630c --- /dev/null +++ b/akka-docs/scala/code/docs/camel/QuartzExample.scala @@ -0,0 +1,35 @@ +package docs.camel + + +object QuartzExample { + + { + //#Quartz + import akka.actor.{ActorSystem, Props} + + import akka.camel.{Consumer} + + class MyQuartzActor extends Consumer { + + def endpointUri = "quartz://example?cron=0/2+*+*+*+*+?" + + def receive = { + + case msg => println("==============> received %s " format msg) + + } // end receive + + } // end MyQuartzActor + + object MyQuartzActor { + + def main(str: Array[String]) { + val system = ActorSystem("my-quartz-system") + system.actorOf(Props[MyQuartzActor]) + } // end main + + } // end MyQuartzActor + //#Quartz + } + +} From 869ce07c741f5a61680cee210d15eeb85fad79e1 Mon Sep 17 00:00:00 2001 From: RayRoestenburg Date: Thu, 12 Jul 2012 13:55:56 +0200 Subject: [PATCH 3/6] processed review --- akka-docs/scala/camel.rst | 34 ++++++++++++++++------------------ 1 file changed, 16 insertions(+), 18 deletions(-) diff --git a/akka-docs/scala/camel.rst b/akka-docs/scala/camel.rst index 00f7b6eb10..acffeadd90 100644 --- a/akka-docs/scala/camel.rst +++ b/akka-docs/scala/camel.rst @@ -93,13 +93,12 @@ matching, transformation, serialization or storage. In the above example of the the XML message is put in the body of a newly created Camel Message with an empty set of headers. You can also create a CamelMessage yourself with the appropriate body and headers as you see fit. -__ https://svn.apache.org/repos/asf/camel/trunk/camel-core/src/main/java/org/apache/camel/Message.java +__ https://svn.apache.org/repos/asf/camel/tags/camel-2.8.0/camel-core/src/main/java/org/apache/camel/Message.java CamelExtension -------------- The akka-camel module is implemented as an Akka Extension, the ``CamelExtension`` object. -Extensions will only be loaded once per ``ActorSystem``, which will be managed by Akka. There is a one to one relationship between the ``CamelExtension`` and -the ``ActorSystem``, there can be only one ``CamelExtension`` for one ``ActorSystem``. +Extensions will only be loaded once per ``ActorSystem``, which will be managed by Akka. The ``CamelExtension`` object provides access to the `Camel`_ trait. The `Camel`_ trait in turn provides access to two important Apache Camel objects, the `CamelContext`_ and the `ProducerTemplate`_. Below you can see how you can get access to these Apache Camel objects. @@ -113,12 +112,12 @@ Below an example on how to add the ActiveMQ component to the `CamelContext`_, wh .. includecode:: code/docs/camel/Introduction.scala#CamelExtensionAddComponent The `CamelContext`_ joins the lifecycle of the ``ActorSystem`` and ``CamelExtension`` it is associated with; the `CamelContext`_ is started when -the ``CamelExtension`` is created, and it is shutdown when the associated ``ActorSystem`` is shut down. The same is true for the `ProducerTemplate`_. +the ``CamelExtension`` is created, and it is shut down when the associated ``ActorSystem`` is shut down. The same is true for the `ProducerTemplate`_. The ``CamelExtension`` is used by both `Producer` and `Consumer` actors to interact with Apache Camel internally. -When Akka creates a `Consumer` actor, the `Consumer` is published at its -Camel endpoint (more precisely, the route is added to the `CamelContext`_ from the `Endpoint`_ to the actor). -When Akka creates a `Producer` actor, a `SendProcessor`_ and `Endpoint`_ are created so that the Producer can send messages to it. +You can access the ``CamelExtension`` inside a `Producer` or a `Consumer` using the ``camel`` definition, or get straight at the `CamelContext` using the ``camelContext`` definition. +Actors are created and started asynchronously. When a `Consumer` actor is created, the `Consumer` is published at its Camel endpoint (more precisely, the route is added to the `CamelContext`_ from the `Endpoint`_ to the actor). +When a `Producer` actor is created, a `SendProcessor`_ and `Endpoint`_ are created so that the Producer can send messages to it. Publication is done asynchronously; setting up an endpoint may still be in progress after you have requested the actor to be created. Some Camel components can take a while to startup, and in some cases you might want to know when the endpoints are activated and ready to be used. The `Camel`_ trait allows you to find out when the endpoint is activated or deactivated. @@ -134,10 +133,10 @@ Deactivation of a Consumer or a Producer actor happens when the actor is termina A ``DeActivationTimeoutException`` is thrown if the associated camel objects could not be deactivated within the specified timeout. .. _Camel: http://github.com/akka/akka/blob/master/akka-camel/src/main/scala/akka/camel/Camel.scala -.. _CamelContext: https://svn.apache.org/repos/asf/camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContext.java -.. _ProducerTemplate: https://svn.apache.org/repos/asf/camel/trunk/camel-core/src/main/java/org/apache/camel/ProducerTemplate.java -.. _SendProcessor: https://svn.apache.org/repos/asf/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java -.. _Endpoint: https://svn.apache.org/repos/asf/camel/trunk/camel-core/src/main/java/org/apache/camel/Endpoint.java +.. _CamelContext: https://svn.apache.org/repos/asf/camel/tags/camel-2.8.0/camel-core/src/main/java/org/apache/camel/CamelContext.java +.. _ProducerTemplate: https://svn.apache.org/repos/asf/camel/tags/camel-2.8.0/camel-core/src/main/java/org/apache/camel/ProducerTemplate.java +.. _SendProcessor: https://svn.apache.org/repos/asf/camel/tags/camel-2.8.0/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java +.. _Endpoint: https://svn.apache.org/repos/asf/camel/tags/camel-2.8.0/camel-core/src/main/java/org/apache/camel/Endpoint.java Dependencies ============ @@ -203,7 +202,7 @@ body. .. _camel-acknowledgements: -Acknowledgements +Delivery acknowledgements ---------------- With in-out message exchanges, clients usually know that a message exchange is @@ -232,8 +231,7 @@ Camel Exchanges (and their corresponding endpoints) that support two-way communi an actor before returning it to the initiating client. For some endpoint types, timeout values can be defined in an endpoint-specific way which is described in the documentation of the individual `Camel -components`_. Another option is to configure timeouts on the level of consumer -actors. +components`_. Another option is to configure timeouts on the level of consumer actors. .. _Camel components: http://camel.apache.org/components.html @@ -244,7 +242,7 @@ result in the `Exchange`_ failing with a TimeoutException set on the failure of The timeout on the consumer actor can be overridden with the ``replyTimeout``, as shown below. .. includecode:: code/docs/camel/Consumers.scala#Consumer4 -.. _Exchange: https://svn.apache.org/repos/asf/camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java +.. _Exchange: https://svn.apache.org/repos/asf/camel/tags/camel-2.8.0/camel-core/src/main/java/org/apache/camel/Exchange.java Producer Actors =============== @@ -320,7 +318,7 @@ For initiating a a two-way message exchange, one of the .. includecode:: code/docs/camel/Producers.scala#RequestProducerTemplate .. _Producer: http://github.com/akka/akka/blob/master/akka-camel/src/main/scala/akka/camel/Producer.scala -.. _ProducerTemplate: https://svn.apache.org/repos/asf/camel/trunk/camel-core/src/main/java/org/apache/camel/ProducerTemplate.java +.. _ProducerTemplate: https://svn.apache.org/repos/asf/camel/tags/camel-2.8.0/camel-core/src/main/java/org/apache/camel/ProducerTemplate.java .. _camel-asynchronous-routing: @@ -366,7 +364,7 @@ constructed by akka-camel, when the actor was started. Although the default route construction templates, used by akka-camel internally, are sufficient for most use cases, some applications may require more specialized routes to actors. The akka-camel module provides two mechanisms for customizing routes to actors, -which will be explained in this section. These are +which will be explained in this section. These are: * Usage of :ref:`camel-components` to access actors. Any Camel route can use these components to access Akka actors. @@ -488,7 +486,7 @@ route to the consumer actor (where targetActorUri is the actor component URI as \*) Before passing the RouteDefinition instance to the route definition handler, akka-camel may make some further modifications to it. -__ https://svn.apache.org/repos/asf/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ +__ https://svn.apache.org/repos/asf/camel/tags/camel-2.8.0/camel-core/src/main/java/org/apache/camel/model/ .. _camel-examples: From ebe0cc05c9358d737f952ea9f66b5b4ccbb9a72b Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Thu, 12 Jul 2012 15:00:11 +0200 Subject: [PATCH 4/6] The unborkinging --- .../scala/code/docs/camel/Consumers.scala | 2 +- .../scala/code/docs/camel/CustomRoute.scala | 7 ++-- .../code/docs/camel/CustomRouteExample.scala | 11 +++--- .../scala/code/docs/camel/HttpExample.scala | 12 +++--- .../scala/code/docs/camel/Introduction.scala | 6 +-- .../scala/code/docs/camel/Producers.scala | 38 +++++++++---------- .../code/docs/camel/PublishSubscribe.scala | 17 ++++----- .../scala/code/docs/camel/QuartzExample.scala | 7 ++-- 8 files changed, 48 insertions(+), 52 deletions(-) diff --git a/akka-docs/scala/code/docs/camel/Consumers.scala b/akka-docs/scala/code/docs/camel/Consumers.scala index 289936c4dd..85b46646e2 100644 --- a/akka-docs/scala/code/docs/camel/Consumers.scala +++ b/akka-docs/scala/code/docs/camel/Consumers.scala @@ -39,7 +39,7 @@ object Consumers { def endpointUri = "jms:queue:test" def receive = { - case msg:CamelMessage ⇒ + case msg: CamelMessage ⇒ sender ! Ack // on success // .. diff --git a/akka-docs/scala/code/docs/camel/CustomRoute.scala b/akka-docs/scala/code/docs/camel/CustomRoute.scala index ad268a0a6b..c51d3e1fc4 100644 --- a/akka-docs/scala/code/docs/camel/CustomRoute.scala +++ b/akka-docs/scala/code/docs/camel/CustomRoute.scala @@ -3,12 +3,11 @@ package docs.camel import akka.camel.CamelMessage import akka.actor.Status.Failure - object CustomRoute { { //#CustomRoute - import akka.actor.{Props, ActorSystem, Actor, ActorRef} - import akka.camel.{CamelMessage, CamelExtension} + import akka.actor.{ Props, ActorSystem, Actor, ActorRef } + import akka.camel.{ CamelMessage, CamelExtension } import org.apache.camel.builder.RouteBuilder import akka.camel._ class Responder extends Actor { @@ -20,7 +19,7 @@ object CustomRoute { } } - class CustomRouteBuilder(system: ActorSystem, responder:ActorRef) extends RouteBuilder { + class CustomRouteBuilder(system: ActorSystem, responder: ActorRef) extends RouteBuilder { def configure { from("jetty:http://localhost:8877/camel/custom").to(responder) } diff --git a/akka-docs/scala/code/docs/camel/CustomRouteExample.scala b/akka-docs/scala/code/docs/camel/CustomRouteExample.scala index 96a9a7f667..15b93defaf 100644 --- a/akka-docs/scala/code/docs/camel/CustomRouteExample.scala +++ b/akka-docs/scala/code/docs/camel/CustomRouteExample.scala @@ -1,27 +1,26 @@ package docs.camel - object CustomRouteExample { { //#CustomRouteExample - import akka.actor.{Actor, ActorRef, Props, ActorSystem} - import akka.camel.{CamelMessage, Consumer, Producer, CamelExtension} + import akka.actor.{ Actor, ActorRef, Props, ActorSystem } + import akka.camel.{ CamelMessage, Consumer, Producer, CamelExtension } import org.apache.camel.builder.RouteBuilder - import org.apache.camel.{Exchange, Processor} + import org.apache.camel.{ Exchange, Processor } class Consumer3(transformer: ActorRef) extends Actor with Consumer { def endpointUri = "jetty:http://0.0.0.0:8877/camel/welcome" def receive = { // Forward a string representation of the message body to transformer - case msg: CamelMessage => transformer.forward(msg.bodyAs[String]) + case msg: CamelMessage ⇒ transformer.forward(msg.bodyAs[String]) } } class Transformer(producer: ActorRef) extends Actor { def receive = { // example: transform message body "foo" to "- foo -" and forward result to producer - case msg: CamelMessage => producer.forward(msg.mapBody((body: String) => "- %s -" format body)) + case msg: CamelMessage ⇒ producer.forward(msg.mapBody((body: String) ⇒ "- %s -" format body)) } } diff --git a/akka-docs/scala/code/docs/camel/HttpExample.scala b/akka-docs/scala/code/docs/camel/HttpExample.scala index d57f3707e5..55f200d5ef 100644 --- a/akka-docs/scala/code/docs/camel/HttpExample.scala +++ b/akka-docs/scala/code/docs/camel/HttpExample.scala @@ -5,15 +5,15 @@ object HttpExample { { //#HttpExample import org.apache.camel.Exchange - import akka.actor.{Actor, ActorRef, Props, ActorSystem} - import akka.camel.{Producer, CamelMessage, Consumer} + import akka.actor.{ Actor, ActorRef, Props, ActorSystem } + import akka.camel.{ Producer, CamelMessage, Consumer } import akka.actor.Status.Failure class HttpConsumer(producer: ActorRef) extends Consumer { def endpointUri = "jetty:http://0.0.0.0:8875/" def receive = { - case msg => producer forward msg + case msg ⇒ producer forward msg } } @@ -21,7 +21,7 @@ object HttpExample { def endpointUri = "jetty://http://akka.io/?bridgeEndpoint=true" override def transformOutgoingMessage(msg: Any) = msg match { - case msg: CamelMessage => msg.addHeaders(msg.headers(Set(Exchange.HTTP_PATH))) + case msg: CamelMessage ⇒ msg.addHeaders(msg.headers(Set(Exchange.HTTP_PATH))) } override def routeResponse(msg: Any) { transformer forward msg } @@ -29,8 +29,8 @@ object HttpExample { class HttpTransformer extends Actor { def receive = { - case msg: CamelMessage => sender ! (msg.mapBody {body: String => body replaceAll ("Akka ", "AKKA ")}) - case msg: Failure => sender ! msg + case msg: CamelMessage ⇒ sender ! (msg.mapBody { body: String ⇒ body replaceAll ("Akka ", "AKKA ") }) + case msg: Failure ⇒ sender ! msg } } diff --git a/akka-docs/scala/code/docs/camel/Introduction.scala b/akka-docs/scala/code/docs/camel/Introduction.scala index 92ddccfc51..3c9561d22c 100644 --- a/akka-docs/scala/code/docs/camel/Introduction.scala +++ b/akka-docs/scala/code/docs/camel/Introduction.scala @@ -1,6 +1,6 @@ package docs.camel -import akka.actor.{Props, ActorSystem} +import akka.actor.{ Props, ActorSystem } import akka.camel.CamelExtension object Introduction { @@ -89,14 +89,14 @@ object Introduction { val camel = CamelExtension(system) val actorRef = system.actorOf(Props[MyEndpoint]) // get a future reference to the activation of the endpoint of the Consumer Actor - val activationFuture = camel.activationFutureFor(actorRef,10 seconds) + val activationFuture = camel.activationFutureFor(actorRef, 10 seconds) // or, block wait on the activation camel.awaitActivation(actorRef, 10 seconds) //#CamelActivation //#CamelDeactivation system.stop(actorRef) // get a future reference to the deactivation of the endpoint of the Consumer Actor - val deactivationFuture = camel.activationFutureFor(actorRef,10 seconds) + val deactivationFuture = camel.activationFutureFor(actorRef, 10 seconds) // or, block wait on the deactivation camel.awaitDeactivation(actorRef, 10 seconds) //#CamelDeactivation diff --git a/akka-docs/scala/code/docs/camel/Producers.scala b/akka-docs/scala/code/docs/camel/Producers.scala index 5229f83914..58ee243d7b 100644 --- a/akka-docs/scala/code/docs/camel/Producers.scala +++ b/akka-docs/scala/code/docs/camel/Producers.scala @@ -6,8 +6,8 @@ object Producers { { //#Producer1 import akka.actor.Actor - import akka.actor.{Props, ActorSystem} - import akka.camel.{Producer, CamelMessage} + import akka.actor.{ Props, ActorSystem } + import akka.camel.{ Producer, CamelMessage } import akka.util.Timeout class Producer1 extends Actor with Producer { @@ -26,14 +26,14 @@ object Producers { } { //#RouteResponse - import akka.actor.{Actor, ActorRef} - import akka.camel.{Producer, CamelMessage} - import akka.actor.{Props, ActorSystem} + import akka.actor.{ Actor, ActorRef } + import akka.camel.{ Producer, CamelMessage } + import akka.actor.{ Props, ActorSystem } class ResponseReceiver extends Actor { def receive = { - case msg:CamelMessage ⇒ - // do something with the forwarded response + case msg: CamelMessage ⇒ + // do something with the forwarded response } } @@ -44,7 +44,7 @@ object Producers { } val system = ActorSystem("some-system") val receiver = system.actorOf(Props[ResponseReceiver]) - val forwardResponse = system.actorOf(Props(new Forwarder("http://localhost:8080/news/akka",receiver))) + val forwardResponse = system.actorOf(Props(new Forwarder("http://localhost:8080/news/akka", receiver))) // the Forwarder sends out a request to the web page and forwards the response to // the ResponseReceiver forwardResponse ! "some request" @@ -53,12 +53,12 @@ object Producers { { //#TransformOutgoingMessage import akka.actor.Actor - import akka.camel.{Producer, CamelMessage} + import akka.camel.{ Producer, CamelMessage } class Transformer(uri: String) extends Actor with Producer { def endpointUri = uri - def upperCase(msg:CamelMessage) = msg.mapBody { + def upperCase(msg: CamelMessage) = msg.mapBody { body: String ⇒ body.toUpperCase } @@ -70,10 +70,10 @@ object Producers { } { //#Oneway - import akka.actor.{Actor, Props, ActorSystem} + import akka.actor.{ Actor, Props, ActorSystem } import akka.camel.Producer - class OnewaySender(uri:String) extends Actor with Producer { + class OnewaySender(uri: String) extends Actor with Producer { def endpointUri = uri override def oneway: Boolean = true } @@ -86,9 +86,9 @@ object Producers { } { //#Correlate - import akka.camel.{Producer, CamelMessage} + import akka.camel.{ Producer, CamelMessage } import akka.actor.Actor - import akka.actor.{Props, ActorSystem} + import akka.actor.{ Props, ActorSystem } class Producer2 extends Actor with Producer { def endpointUri = "activemg:FOO.BAR" @@ -103,11 +103,11 @@ object Producers { //#ProducerTemplate import akka.actor.Actor class MyActor extends Actor { - def receive = { - case msg ⇒ - val template = CamelExtension(context.system).template - template.sendBody("direct:news", msg) - } + def receive = { + case msg ⇒ + val template = CamelExtension(context.system).template + template.sendBody("direct:news", msg) + } } //#ProducerTemplate } diff --git a/akka-docs/scala/code/docs/camel/PublishSubscribe.scala b/akka-docs/scala/code/docs/camel/PublishSubscribe.scala index ab604b44dd..430fcdbb3e 100644 --- a/akka-docs/scala/code/docs/camel/PublishSubscribe.scala +++ b/akka-docs/scala/code/docs/camel/PublishSubscribe.scala @@ -1,17 +1,16 @@ package docs.camel - object PublishSubscribe { { //#PubSub - import akka.actor.{Actor, ActorRef, ActorSystem, Props} - import akka.camel.{Producer, CamelMessage, Consumer} + import akka.actor.{ Actor, ActorRef, ActorSystem, Props } + import akka.camel.{ Producer, CamelMessage, Consumer } - class Subscriber(name:String, uri: String) extends Actor with Consumer { + class Subscriber(name: String, uri: String) extends Actor with Consumer { def endpointUri = uri - protected def receive = { - case msg: CamelMessage => println("%s received: %s" format (name, msg.body)) + def receive = { + case msg: CamelMessage ⇒ println("%s received: %s" format (name, msg.body)) } } @@ -26,8 +25,8 @@ object PublishSubscribe { class PublisherBridge(uri: String, publisher: ActorRef) extends Actor with Consumer { def endpointUri = uri - protected def receive = { - case msg: CamelMessage => { + def receive = { + case msg: CamelMessage ⇒ { publisher ! msg.bodyAs[String] sender ! ("message published") } @@ -40,7 +39,7 @@ object PublishSubscribe { val jmsUri = "jms:topic:test" val jmsSubscriber1 = system.actorOf(Props(new Subscriber("jms-subscriber-1", jmsUri))) val jmsSubscriber2 = system.actorOf(Props(new Subscriber("jms-subscriber-2", jmsUri))) - val jmsPublisher = system.actorOf(Props(new Publisher("jms-publisher", jmsUri))) + val jmsPublisher = system.actorOf(Props(new Publisher("jms-publisher", jmsUri))) val jmsPublisherBridge = system.actorOf(Props(new PublisherBridge("jetty:http://0.0.0.0:8877/camel/pub/jms", jmsPublisher))) //#PubSub diff --git a/akka-docs/scala/code/docs/camel/QuartzExample.scala b/akka-docs/scala/code/docs/camel/QuartzExample.scala index b25e38630c..2735511d3c 100644 --- a/akka-docs/scala/code/docs/camel/QuartzExample.scala +++ b/akka-docs/scala/code/docs/camel/QuartzExample.scala @@ -1,13 +1,12 @@ package docs.camel - object QuartzExample { { //#Quartz - import akka.actor.{ActorSystem, Props} + import akka.actor.{ ActorSystem, Props } - import akka.camel.{Consumer} + import akka.camel.{ Consumer } class MyQuartzActor extends Consumer { @@ -15,7 +14,7 @@ object QuartzExample { def receive = { - case msg => println("==============> received %s " format msg) + case msg ⇒ println("==============> received %s " format msg) } // end receive From c32d22d7b840f5352e99f0519577a881c146e9af Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Thu, 12 Jul 2012 15:59:58 +0200 Subject: [PATCH 5/6] Fixing erronous docs for Future.reduce --- akka-actor/src/main/scala/akka/dispatch/Future.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala index e3c7f8348c..f799322b10 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala @@ -127,7 +127,7 @@ object Futures { /** * Java API. - * Initiates a fold over the supplied futures where the fold-zero is the result value of the Future that's completed first + Reduces the results of the supplied futures and binary function. */ def reduce[T <: AnyRef, R >: T](futures: JIterable[Future[T]], fun: akka.japi.Function2[R, T, R], executor: ExecutionContext): Future[R] = Future.reduce[T, R](scala.collection.JavaConversions.iterableAsScalaIterable(futures))(fun.apply)(executor) @@ -266,7 +266,7 @@ object Future { } /** - * Initiates a fold over the supplied futures where the fold-zero is the result value of the Future that's completed first + * Reduces the results of the supplied futures and binary operation. * Example: *
    *   val result = Await.result(Futures.reduce(futures)(_ + _), 5 seconds)

From 448687d692bc885785d0d12d1d716cabbb4d68f0 Mon Sep 17 00:00:00 2001
From: Viktor Klang 
Date: Thu, 12 Jul 2012 17:16:19 +0200
Subject: [PATCH 6/6] Adding the OKResponse definition to the IO reST docs.

---
 akka-docs/scala/io.rst | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/akka-docs/scala/io.rst b/akka-docs/scala/io.rst
index 9063e010f5..ba12324aa5 100644
--- a/akka-docs/scala/io.rst
+++ b/akka-docs/scala/io.rst
@@ -234,6 +234,11 @@ And it's companion object:
 .. includecode:: code/docs/io/HTTPServer.scala
    :include: actor-companion
 
+And the OKResponse:
+
+.. includecode:: code/docs/io/HTTPServer.scala
+   :include: ok-response
+
 A ``main`` method to start everything up:
 
 .. includecode:: code/docs/io/HTTPServer.scala