diff --git a/akka-docs/scala/camel.rst b/akka-docs/scala/camel.rst index be5fd589a0..00f7b6eb10 100644 --- a/akka-docs/scala/camel.rst +++ b/akka-docs/scala/camel.rst @@ -201,6 +201,8 @@ body. .. _Message: http://github.com/akka/akka/blob/master/akka-camel/src/main/scala/akka/camel/CamelMessage.scala +.. _camel-acknowledgements: + Acknowledgements ---------------- @@ -221,6 +223,8 @@ acknowledgement). .. includecode:: code/docs/camel/Consumers.scala#Consumer3 +.. _camel-timeout: + Consumer timeout ---------------- @@ -248,60 +252,350 @@ Producer Actors For sending messages to Camel endpoints, actors need to mixin the `Producer`_ trait and implement the endpointUri method. .. includecode:: code/docs/camel/Producers.scala#Producer1 + +Producer1 inherits a default implementation of the receive method from the +Producer trait. To customize a producer actor's default behavior it is +recommended to override the `Producer`_.transformResponse and +`Producer`_.transformOutgoingMessage methods. This is explained later in more detail. +Actors should not override the default `Producer`_.receive method. + +Any message sent to a `Producer`_ actor will be sent to +the associated Camel endpoint, in the above example to +``http://localhost:8080/news``. The `Producer`_ always sends messages asynchronously. Response messages (if supported by the +configured endpoint) will, by default, be returned to the original sender. The +following example uses the ask pattern to send a message to a +Producer actor and waits for a response. + +.. includecode:: code/docs/camel/Producers.scala#AskProducer + +The future contains the response CamelMessage, or an ``AkkaCamelException`` when an error occurred, which contains the headers of the response. + +.. _camel-custom-processing: + +Custom Processing +----------------- + +Instead of replying to the initial sender, producer actors can implement custom +response processing by overriding the routeResponse method. In the following example, the response +message is forwarded to a target actor instead of being replied to the original +sender. + +.. includecode:: code/docs/camel/Producers.scala#RouteResponse + +Before producing messages to endpoints, producer actors can pre-process them by +overriding the `Producer`_.transformOutgoingMessage method. + +.. includecode:: code/docs/camel/Producers.scala#TransformOutgoingMessage + +Producer configuration options +------------------------------ + +The interaction of producer actors with Camel endpoints can be configured to be +one-way or two-way (by initiating in-only or in-out message exchanges, +respectively). By default, the producer initiates an in-out message exchange +with the endpoint. For initiating an in-only exchange, producer actors have to override the oneway method to return true. + +.. includecode:: code/docs/camel/Producers.scala#Oneway + +Message correlation +------------------- + +To correlate request with response messages, applications can set the +Message.MessageExchangeId message header. + +.. includecode:: code/docs/camel/Producers.scala#Correlate + +ProducerTemplate +---------------- + +The `Producer`_ trait is a very +convenient way for actors to produce messages to Camel endpoints. Actors may also use a Camel `ProducerTemplate`_ for producing +messages to endpoints. + +.. includecode:: code/docs/camel/Producers.scala#ProducerTemplate + +For initiating a a two-way message exchange, one of the +``ProducerTemplate.request*`` methods must be used. + +.. includecode:: code/docs/camel/Producers.scala#RequestProducerTemplate + .. _Producer: http://github.com/akka/akka/blob/master/akka-camel/src/main/scala/akka/camel/Producer.scala +.. _ProducerTemplate: https://svn.apache.org/repos/asf/camel/trunk/camel-core/src/main/java/org/apache/camel/ProducerTemplate.java +.. _camel-asynchronous-routing: +Asynchronous routing +==================== +Since Akka 0.10, in-out message exchanges between endpoints and actors are +designed to be asynchronous. This is the case for both, consumer and producer +actors. + +* A consumer endpoint sends request messages to its consumer actor using the ``!`` + (bang) operator and the actor returns responses with ``sender !`` once they are + ready. + +* A producer actor sends request messages to its endpoint using Camel's + asynchronous routing engine. Asynchronous responses are wrapped and added to the + producer actor's mailbox for later processing. By default, response messages are + returned to the initial sender but this can be overridden by Producer + implementations (see also description of the ``routeResponse`` method + in :ref:`camel-custom-processing`). + +However, asynchronous two-way message exchanges, without allocating a thread for +the full duration of exchange, cannot be generically supported by Camel's +asynchronous routing engine alone. This must be supported by the individual +`Camel components`_ (from which endpoints are created) as well. They must be +able to suspend any work started for request processing (thereby freeing threads +to do other work) and resume processing when the response is ready. This is +currently the case for a `subset of components`_ such as the `Jetty component`_. +All other Camel components can still be used, of course, but they will cause +allocation of a thread for the duration of an in-out message exchange. There's +also a :ref:`camel-async-example` that implements both, an asynchronous +consumer and an asynchronous producer, with the jetty component. + +.. _Camel components: http://camel.apache.org/components.html +.. _subset of components: http://camel.apache.org/asynchronous-routing-engine.html +.. _Jetty component: http://camel.apache.org/jetty.html - Actors (untyped) - Consumer publishing - Actors (untyped) - Typed actors - Consumers and the CamelService - Consumer un-publishing - Actors (untyped) - Typed actors - Acknowledgements - Actors (untyped) - Blocking exchanges - Consumer timeout - Typed actors - Actors (untyped) - Remote consumers - Actors (untyped) - Typed actors -Produce messages - Producer trait - Actors (untyped) - Custom Processing - Producer configuration options - Message correlation - Matching responses - ProducerTemplate - Actors (untyped) - Typed actors - Asynchronous routing -Fault tolerance -CamelService configuration -Standalone applications -Standalone Spring applications -Kernel mode Custom Camel routes -Akka Camel components -Access to actors -URI options -Message headers -Access to typed actors -Using Spring -Without Spring -Intercepting route construction -Actors (untyped) -Typed actors -Examples -Asynchronous routing and transformation example -Custom Camel route example -Publish-subcribe example -JMS -Cometd -Quartz Scheduler Example +=================== +In all the examples so far, routes to consumer actors have been automatically +constructed by akka-camel, when the actor was started. Although the default +route construction templates, used by akka-camel internally, are sufficient for +most use cases, some applications may require more specialized routes to actors. +The akka-camel module provides two mechanisms for customizing routes to actors, +which will be explained in this section. These are + +* Usage of :ref:`camel-components` to access actors. + Any Camel route can use these components to access Akka actors. + +* :ref:`camel-intercepting-route-construction` to actors. + Default routes to consumer actors are extended using predefined extension + points. + + +.. _camel-components: + +Akka Camel components +--------------------- + +Akka actors can be access from Camel routes using the `actor`_ Camel components, respectively. These components can be used to +access any Akka actor (not only consumer actors) from Camel routes, as described +in the following sections. + +.. _actor: https://github.com/akka/akka/blob/master/akka-camel/src/main/scala/akka/camel/internal/component/ActorComponent.scala + +.. _access-to-actors: + +Access to actors +---------------- + +To access actors from custom Camel routes, the `actor`_ Camel +component should be used. It fully supports Camel's `asynchronous routing +engine`_. + +.. _actor: https://github.com/akka/akka/blob/master/akka-camel/src/main/scala/akka/camel/internal/component/ActorComponent.scala +.. _asynchronous routing engine: http://camel.apache.org/asynchronous-routing-engine.html + +This component accepts the following endpoint URI format: + +* ``actor://path:[]?`` + +where ```` is the ``ActorPath`` to the actor. The ```` are +name-value pairs separated by ``&`` (i.e. ``name1=value1&name2=value2&...``). + + +URI options +^^^^^^^^^^^ + +The following URI options are supported: + ++--------------+----------+---------+-------------------------------------------+ +| Name | Type | Default | Description | ++==============+==========+=========+===========================================+ +| replyTimeout | Duration | false | The reply timeout, specified in the same | +| | | | way that you use the duration in akka, | +| | | | for instance ``10 seconds`` except that | +| | | | in the url it is handy to use a + | +| | | | between the amount and the unit, like | +| | | | for example ``200+millis`` | +| | | | | +| | | | See also :ref:`camel-timeout`. | ++--------------+----------+---------+-------------------------------------------+ +| autoack | Boolean | true | If set to true, in-only message exchanges | +| | | | are auto-acknowledged when the message is | +| | | | added to the actor's mailbox. If set to | +| | | | false, actors must acknowledge the | +| | | | receipt of the message. | +| | | | | +| | | | See also :ref:`camel-acknowledgements`. | ++--------------+----------+---------+-------------------------------------------+ + +Here's an actor endpoint URI example containing an actor uuid:: + + actor://path:akka://some-system/user/myconsumer?autoack=false&replyTimeout=100+millis + +In the following example, a custom route to an actor is created, using the +actor's path. the akka camel package contains an implicit ``toActorRouteDefinition`` that allows for a route to +reference an ``ActorRef`` directly as shown in the below example, The route starts from a `Jetty`_ endpoint and +ends at the target actor. + +.. includecode:: code/docs/camel/CustomRoute.scala#CustomRoute + +When a message is received on the jetty endpoint, it is routed to the Responder actor, which in return replies back to the client of +the HTTP request. + + +.. _camel-intercepting-route-construction: + +Intercepting route construction +------------------------------- + +The previous section, :ref:`camel-components`, explained how to setup a route to +an actor manually. It was the application's +responsibility to define the route and add it to the current CamelContext. This +section explains a more convenient way to define custom routes: akka-camel is +still setting up the routes to consumer actors (and adds these routes to the +current CamelContext) but applications can define extensions to these routes. +Extensions can be defined with Camel's `Java DSL`_ or `Scala DSL`_. For example, +an extension could be a custom error handler that redelivers messages from an +endpoint to an actor's bounded mailbox when the mailbox was full. + +.. _Java DSL: http://camel.apache.org/dsl.html +.. _Scala DSL: http://camel.apache.org/scala-dsl.html + +The following examples demonstrate how to extend a route to a consumer actor for +handling exceptions thrown by that actor. + +.. includecode:: code/docs/camel/CustomRoute.scala#ErrorThrowingConsumer + +The above ErrorThrowingConsumer sends the Failure back to the sender in preRestart +because the Exception that is thrown in the actor would +otherwise just crash the actor, by default the actor would be restarted, and the response would never reach the client of the Consumer. + +The akka-camel module creates a RouteDefinition instance by calling +from(endpointUri) on a Camel RouteBuilder (where endpointUri is the endpoint URI +of the consumer actor) and passes that instance as argument to the route +definition handler \*). The route definition handler then extends the route and +returns a ProcessorDefinition (in the above example, the ProcessorDefinition +returned by the end method. See the `org.apache.camel.model`__ package for +details). After executing the route definition handler, akka-camel finally calls +a to(targetActorUri) on the returned ProcessorDefinition to complete the +route to the consumer actor (where targetActorUri is the actor component URI as described in :ref:`access-to-actors`). + +\*) Before passing the RouteDefinition instance to the route definition handler, +akka-camel may make some further modifications to it. + +__ https://svn.apache.org/repos/asf/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ + +.. _camel-examples: + +Examples +======== + +.. _camel-async-example: + +Asynchronous routing and transformation example +----------------------------------------------- + +This example demonstrates how to implement consumer and producer actors that +support :ref:`camel-asynchronous-routing` with their Camel endpoints. The sample +application transforms the content of the Akka homepage, http://akka.io, by +replacing every occurrence of *Akka* with *AKKA*. To run this example, add +a Boot class that starts the actors. After starting +the :ref:`microkernel-scala`, direct the browser to http://localhost:8875 and the +transformed Akka homepage should be displayed. Please note that this example +will probably not work if you're behind an HTTP proxy. + +The following figure gives an overview how the example actors interact with +external systems and with each other. A browser sends a GET request to +http://localhost:8875 which is the published endpoint of the ``HttpConsumer`` +actor. The ``HttpConsumer`` actor forwards the requests to the ``HttpProducer`` +actor which retrieves the Akka homepage from http://akka.io. The retrieved HTML +is then forwarded to the ``HttpTransformer`` actor which replaces all occurrences +of *Akka* with *AKKA*. The transformation result is sent back the HttpConsumer +which finally returns it to the browser. + +.. image:: ../modules/camel-async-interact.png + +Implementing the example actor classes and wiring them together is rather easy +as shown in the following snippet. + +.. includecode:: code/docs/camel/HttpExample.scala#HttpExample + +The `jetty endpoints`_ of HttpConsumer and HttpProducer support asynchronous +in-out message exchanges and do not allocate threads for the full duration of +the exchange. This is achieved by using `Jetty continuations`_ on the +consumer-side and by using `Jetty's asynchronous HTTP client`_ on the producer +side. The following high-level sequence diagram illustrates that. + +.. _jetty endpoints: http://camel.apache.org/jetty.html +.. _Jetty continuations: http://wiki.eclipse.org/Jetty/Feature/Continuations +.. _Jetty's asynchronous HTTP client: http://wiki.eclipse.org/Jetty/Tutorial/HttpClient + +.. image:: ../modules/camel-async-sequence.png + +Custom Camel route example +-------------------------- + +This section also demonstrates the combined usage of a ``Producer`` and a +``Consumer`` actor as well as the inclusion of a custom Camel route. The +following figure gives an overview. + +.. image:: ../modules/camel-custom-route.png + +* A consumer actor receives a message from an HTTP client + +* It forwards the message to another actor that transforms the message (encloses + the original message into hyphens) + +* The transformer actor forwards the transformed message to a producer actor + +* The producer actor sends the message to a custom Camel route beginning at the + ``direct:welcome`` endpoint + +* A processor (transformer) in the custom Camel route prepends "Welcome" to the + original message and creates a result message + +* The producer actor sends the result back to the consumer actor which returns + it to the HTTP client + + +The consumer, transformer and +producer actor implementations are as follows. + +.. includecode:: code/docs/camel/CustomRouteExample.scala#CustomRouteExample + + +The producer actor knows where to reply the message to because the consumer and +transformer actors have forwarded the original sender reference as well. The +application configuration and the route starting from direct:welcome are done in the code above. + +To run the example, add the lines shown in the example to a Boot class and the start the :ref:`microkernel-scala` and POST a message to +``http://localhost:8877/camel/welcome``. + +.. code-block:: none + + curl -H "Content-Type: text/plain" -d "Anke" http://localhost:8877/camel/welcome + +The response should be: + +.. code-block:: none + + Welcome - Anke - + +Quartz Scheduler Example +------------------------ + +Here is an example showing how simple is to implement a cron-style scheduler by +using the Camel Quartz component in Akka. + +The following example creates a "timer" actor which fires a message every 2 +seconds: + +.. includecode:: code/docs/camel/QuartzExample.scala#Quartz + +For more information about the Camel Quartz component, see here: +http://camel.apache.org/quartz.html \ No newline at end of file diff --git a/akka-docs/scala/code/docs/camel/CustomRoute.scala b/akka-docs/scala/code/docs/camel/CustomRoute.scala new file mode 100644 index 0000000000..ad268a0a6b --- /dev/null +++ b/akka-docs/scala/code/docs/camel/CustomRoute.scala @@ -0,0 +1,58 @@ +package docs.camel + +import akka.camel.CamelMessage +import akka.actor.Status.Failure + + +object CustomRoute { + { + //#CustomRoute + import akka.actor.{Props, ActorSystem, Actor, ActorRef} + import akka.camel.{CamelMessage, CamelExtension} + import org.apache.camel.builder.RouteBuilder + import akka.camel._ + class Responder extends Actor { + def receive = { + case msg: CamelMessage ⇒ + sender ! (msg.mapBody { + body: String ⇒ "received %s" format body + }) + } + } + + class CustomRouteBuilder(system: ActorSystem, responder:ActorRef) extends RouteBuilder { + def configure { + from("jetty:http://localhost:8877/camel/custom").to(responder) + } + } + val system = ActorSystem("some-system") + val camel = CamelExtension(system) + val responder = system.actorOf(Props[Responder], name = "TestResponder") + camel.context.addRoutes(new CustomRouteBuilder(system, responder)) + //#CustomRoute + + } + { + //#ErrorThrowingConsumer + import akka.camel.Consumer + + import org.apache.camel.builder.Builder + import org.apache.camel.model.RouteDefinition + + class ErrorThrowingConsumer(override val endpointUri: String) extends Consumer { + def receive = { + case msg: CamelMessage ⇒ throw new Exception("error: %s" format msg.body) + } + override def onRouteDefinition(rd: RouteDefinition) = { + // Catch any exception and handle it by returning the exception message as response + rd.onException(classOf[Exception]).handled(true).transform(Builder.exceptionMessage).end + } + + final override def preRestart(reason: Throwable, message: Option[Any]) { + sender ! Failure(reason) + } + } + //#ErrorThrowingConsumer + } + +} diff --git a/akka-docs/scala/code/docs/camel/CustomRouteExample.scala b/akka-docs/scala/code/docs/camel/CustomRouteExample.scala new file mode 100644 index 0000000000..96a9a7f667 --- /dev/null +++ b/akka-docs/scala/code/docs/camel/CustomRouteExample.scala @@ -0,0 +1,51 @@ +package docs.camel + + +object CustomRouteExample { + { + //#CustomRouteExample + import akka.actor.{Actor, ActorRef, Props, ActorSystem} + import akka.camel.{CamelMessage, Consumer, Producer, CamelExtension} + import org.apache.camel.builder.RouteBuilder + import org.apache.camel.{Exchange, Processor} + + class Consumer3(transformer: ActorRef) extends Actor with Consumer { + def endpointUri = "jetty:http://0.0.0.0:8877/camel/welcome" + + def receive = { + // Forward a string representation of the message body to transformer + case msg: CamelMessage => transformer.forward(msg.bodyAs[String]) + } + } + + class Transformer(producer: ActorRef) extends Actor { + def receive = { + // example: transform message body "foo" to "- foo -" and forward result to producer + case msg: CamelMessage => producer.forward(msg.mapBody((body: String) => "- %s -" format body)) + } + } + + class Producer1 extends Actor with Producer { + def endpointUri = "direct:welcome" + } + + class CustomRouteBuilder extends RouteBuilder { + def configure { + from("direct:welcome").process(new Processor() { + def process(exchange: Exchange) { + // Create a 'welcome' message from the input message + exchange.getOut.setBody("Welcome %s" format exchange.getIn.getBody) + } + }) + } + } + // the below lines can be added to a Boot class, so that you can run the example from a MicroKernel + val system = ActorSystem("some-system") + val producer = system.actorOf(Props[Producer1]) + val mediator = system.actorOf(Props(new Transformer(producer))) + val consumer = system.actorOf(Props(new Consumer3(mediator))) + CamelExtension(system).context.addRoutes(new CustomRouteBuilder) + //#CustomRouteExample + } + +} diff --git a/akka-docs/scala/code/docs/camel/HttpExample.scala b/akka-docs/scala/code/docs/camel/HttpExample.scala new file mode 100644 index 0000000000..d57f3707e5 --- /dev/null +++ b/akka-docs/scala/code/docs/camel/HttpExample.scala @@ -0,0 +1,47 @@ +package docs.camel + +object HttpExample { + + { + //#HttpExample + import org.apache.camel.Exchange + import akka.actor.{Actor, ActorRef, Props, ActorSystem} + import akka.camel.{Producer, CamelMessage, Consumer} + import akka.actor.Status.Failure + + class HttpConsumer(producer: ActorRef) extends Consumer { + def endpointUri = "jetty:http://0.0.0.0:8875/" + + def receive = { + case msg => producer forward msg + } + } + + class HttpProducer(transformer: ActorRef) extends Actor with Producer { + def endpointUri = "jetty://http://akka.io/?bridgeEndpoint=true" + + override def transformOutgoingMessage(msg: Any) = msg match { + case msg: CamelMessage => msg.addHeaders(msg.headers(Set(Exchange.HTTP_PATH))) + } + + override def routeResponse(msg: Any) { transformer forward msg } + } + + class HttpTransformer extends Actor { + def receive = { + case msg: CamelMessage => sender ! (msg.mapBody {body: String => body replaceAll ("Akka ", "AKKA ")}) + case msg: Failure => sender ! msg + } + } + + // Create the actors. this can be done in a Boot class so you can + // run the example in the MicroKernel. just add the below three lines to your boot class. + val system = ActorSystem("some-system") + val httpTransformer = system.actorOf(Props[HttpTransformer]) + val httpProducer = system.actorOf(Props(new HttpProducer(httpTransformer))) + val httpConsumer = system.actorOf(Props(new HttpConsumer(httpProducer))) + //#HttpExample + + } + +} diff --git a/akka-docs/scala/code/docs/camel/Producers.scala b/akka-docs/scala/code/docs/camel/Producers.scala index bc3576e93a..5229f83914 100644 --- a/akka-docs/scala/code/docs/camel/Producers.scala +++ b/akka-docs/scala/code/docs/camel/Producers.scala @@ -1,14 +1,127 @@ package docs.camel +import akka.camel.CamelExtension + object Producers { { //#Producer1 - import akka.camel.Producer import akka.actor.Actor + import akka.actor.{Props, ActorSystem} + import akka.camel.{Producer, CamelMessage} + import akka.util.Timeout class Producer1 extends Actor with Producer { def endpointUri = "http://localhost:8080/news" } //#Producer1 + //#AskProducer + import akka.pattern.ask + import akka.util.duration._ + implicit val timeout = Timeout(10 seconds) + + val system = ActorSystem("some-system") + val producer = system.actorOf(Props[Producer1]) + val future = producer.ask("some request").mapTo[CamelMessage] + //#AskProducer } -} + { + //#RouteResponse + import akka.actor.{Actor, ActorRef} + import akka.camel.{Producer, CamelMessage} + import akka.actor.{Props, ActorSystem} + + class ResponseReceiver extends Actor { + def receive = { + case msg:CamelMessage ⇒ + // do something with the forwarded response + } + } + + class Forwarder(uri: String, target: ActorRef) extends Actor with Producer { + def endpointUri = uri + + override def routeResponse(msg: Any) { target forward msg } + } + val system = ActorSystem("some-system") + val receiver = system.actorOf(Props[ResponseReceiver]) + val forwardResponse = system.actorOf(Props(new Forwarder("http://localhost:8080/news/akka",receiver))) + // the Forwarder sends out a request to the web page and forwards the response to + // the ResponseReceiver + forwardResponse ! "some request" + //#RouteResponse + } + { + //#TransformOutgoingMessage + import akka.actor.Actor + import akka.camel.{Producer, CamelMessage} + + class Transformer(uri: String) extends Actor with Producer { + def endpointUri = uri + + def upperCase(msg:CamelMessage) = msg.mapBody { + body: String ⇒ body.toUpperCase + } + + override def transformOutgoingMessage(msg: Any) = msg match { + case msg: CamelMessage ⇒ upperCase(msg) + } + } + //#TransformOutgoingMessage + } + { + //#Oneway + import akka.actor.{Actor, Props, ActorSystem} + import akka.camel.Producer + + class OnewaySender(uri:String) extends Actor with Producer { + def endpointUri = uri + override def oneway: Boolean = true + } + + val system = ActorSystem("some-system") + val producer = system.actorOf(Props(new OnewaySender("activemq:FOO.BAR"))) + producer ! "Some message" + //#Oneway + + } + { + //#Correlate + import akka.camel.{Producer, CamelMessage} + import akka.actor.Actor + import akka.actor.{Props, ActorSystem} + + class Producer2 extends Actor with Producer { + def endpointUri = "activemg:FOO.BAR" + } + val system = ActorSystem("some-system") + val producer = system.actorOf(Props[Producer2]) + + producer ! CamelMessage("bar", Map(CamelMessage.MessageExchangeId -> "123")) + //#Correlate + } + { + //#ProducerTemplate + import akka.actor.Actor + class MyActor extends Actor { + def receive = { + case msg ⇒ + val template = CamelExtension(context.system).template + template.sendBody("direct:news", msg) + } + } + //#ProducerTemplate + } + { + //#RequestProducerTemplate + import akka.actor.Actor + class MyActor extends Actor { + def receive = { + case msg ⇒ + val template = CamelExtension(context.system).template + sender ! template.requestBody("direct:news", msg) + } + } + //#RequestProducerTemplate + } + +} \ No newline at end of file diff --git a/akka-docs/scala/code/docs/camel/PublishSubscribe.scala b/akka-docs/scala/code/docs/camel/PublishSubscribe.scala new file mode 100644 index 0000000000..ab604b44dd --- /dev/null +++ b/akka-docs/scala/code/docs/camel/PublishSubscribe.scala @@ -0,0 +1,48 @@ +package docs.camel + + +object PublishSubscribe { + { + //#PubSub + import akka.actor.{Actor, ActorRef, ActorSystem, Props} + import akka.camel.{Producer, CamelMessage, Consumer} + + class Subscriber(name:String, uri: String) extends Actor with Consumer { + def endpointUri = uri + + protected def receive = { + case msg: CamelMessage => println("%s received: %s" format (name, msg.body)) + } + } + + class Publisher(name: String, uri: String) extends Actor with Producer { + + def endpointUri = uri + + // one-way communication with JMS + override def oneway = true + } + + class PublisherBridge(uri: String, publisher: ActorRef) extends Actor with Consumer { + def endpointUri = uri + + protected def receive = { + case msg: CamelMessage => { + publisher ! msg.bodyAs[String] + sender ! ("message published") + } + } + } + + // Add below to a Boot class + // Setup publish/subscribe example + val system = ActorSystem("some-system") + val jmsUri = "jms:topic:test" + val jmsSubscriber1 = system.actorOf(Props(new Subscriber("jms-subscriber-1", jmsUri))) + val jmsSubscriber2 = system.actorOf(Props(new Subscriber("jms-subscriber-2", jmsUri))) + val jmsPublisher = system.actorOf(Props(new Publisher("jms-publisher", jmsUri))) + val jmsPublisherBridge = system.actorOf(Props(new PublisherBridge("jetty:http://0.0.0.0:8877/camel/pub/jms", jmsPublisher))) + //#PubSub + + } +} diff --git a/akka-docs/scala/code/docs/camel/QuartzExample.scala b/akka-docs/scala/code/docs/camel/QuartzExample.scala new file mode 100644 index 0000000000..b25e38630c --- /dev/null +++ b/akka-docs/scala/code/docs/camel/QuartzExample.scala @@ -0,0 +1,35 @@ +package docs.camel + + +object QuartzExample { + + { + //#Quartz + import akka.actor.{ActorSystem, Props} + + import akka.camel.{Consumer} + + class MyQuartzActor extends Consumer { + + def endpointUri = "quartz://example?cron=0/2+*+*+*+*+?" + + def receive = { + + case msg => println("==============> received %s " format msg) + + } // end receive + + } // end MyQuartzActor + + object MyQuartzActor { + + def main(str: Array[String]) { + val system = ActorSystem("my-quartz-system") + system.actorOf(Props[MyQuartzActor]) + } // end main + + } // end MyQuartzActor + //#Quartz + } + +}