added more docs for camel, couldnt resist to also remove a ; from DefaultCamel
This commit is contained in:
parent
91268365c1
commit
92e3ca2629
5 changed files with 262 additions and 11 deletions
|
|
@ -26,7 +26,7 @@ private[camel] class DefaultCamel(val system: ActorSystem) extends Camel {
|
|||
|
||||
lazy val context: CamelContext = {
|
||||
val ctx = new DefaultCamelContext
|
||||
ctx.setName(system.name);
|
||||
ctx.setName(system.name)
|
||||
ctx.setStreamCaching(true)
|
||||
ctx.addComponent("actor", new ActorComponent(this))
|
||||
ctx.getTypeConverterRegistry.addTypeConverter(classOf[Duration], classOf[String], DurationTypeConverter)
|
||||
|
|
|
|||
|
|
@ -21,14 +21,13 @@ Other, more advanced external articles (for version 1) are:
|
|||
* `Akka Consumer Actors: New Features and Best Practices <http://krasserm.blogspot.com/2011/02/akka-consumer-actors-new-features-and.html>`_
|
||||
* `Akka Producer Actors: New Features and Best Practices <http://krasserm.blogspot.com/2011/02/akka-producer-actor-new-features-and.html>`_
|
||||
|
||||
|
||||
|
||||
Introduction
|
||||
============
|
||||
|
||||
The akka-camel module allows actors to receive
|
||||
and send messages over a great variety of protocols and APIs. This section gives
|
||||
a brief overview of the general ideas behind the akka-camel module, the
|
||||
and send messages over a great variety of protocols and APIs (akka-camel version 1.x also provided
|
||||
this functionality to Typed Actors. In akka-camel 2, support for Typed Actors has been removed.)
|
||||
This section gives a brief overview of the general ideas behind the akka-camel module, the
|
||||
remaining sections go into the details. In addition to the native Scala and Java
|
||||
actor API, actors can now exchange messages with other systems over large number
|
||||
of protocols and APIs such as HTTP, SOAP, TCP, FTP, SMTP or JMS, to mention a
|
||||
|
|
@ -90,10 +89,55 @@ protocol-specific details from Akka and makes it therefore very easy to support
|
|||
a large number of protocols through a uniform Camel component interface. The
|
||||
akka-camel module further converts mutable Camel messages into immutable
|
||||
representations which are used by Consumer and Producer actors for pattern
|
||||
matching, transformation, serialization or storage.
|
||||
matching, transformation, serialization or storage. In the above example of the Orders Producer,
|
||||
the XML message is put in the body of a newly created Camel Message with an empty set of headers.
|
||||
You can also create a CamelMessage yourself with the appropriate body and headers as you see fit.
|
||||
|
||||
__ https://svn.apache.org/repos/asf/camel/trunk/camel-core/src/main/java/org/apache/camel/Message.java
|
||||
|
||||
CamelExtension
|
||||
--------------
|
||||
The akka-camel module is implemented as an Akka Extension, the ``CamelExtension`` object.
|
||||
Extensions will only be loaded once per ``ActorSystem``, which will be managed by Akka. There is a one to one relationship between the ``CamelExtension`` and
|
||||
the ``ActorSystem``, there can be only one ``CamelExtension`` for one ``ActorSystem``.
|
||||
The ``CamelExtension`` object provides access to the `Camel`_ trait.
|
||||
The `Camel`_ trait in turn provides access to two important Apache Camel objects, the `CamelContext`_ and the `ProducerTemplate`_.
|
||||
Below you can see how you can get access to these Apache Camel objects.
|
||||
|
||||
.. includecode:: code/docs/camel/Introduction.scala#CamelExtension
|
||||
|
||||
One ``CamelExtension`` is only loaded once for every one ``ActorSystem``, which makes it safe to call the ``CamelExtension`` at any point in your code to get to the
|
||||
Apache Camel objects associated with it. There is one `CamelContext`_ and one `ProducerTemplate`_ for every one ``ActorSystem`` that uses a ``CamelExtension``.
|
||||
Below an example on how to add the ActiveMQ component to the `CamelContext`_, which is required when you would like to use the ActiveMQ component.
|
||||
|
||||
.. includecode:: code/docs/camel/Introduction.scala#CamelExtensionAddComponent
|
||||
|
||||
The `CamelContext`_ joins the lifecycle of the ``ActorSystem`` and ``CamelExtension`` it is associated with; the `CamelContext`_ is started when
|
||||
the ``CamelExtension`` is created, and it is shutdown when the associated ``ActorSystem`` is shut down. The same is true for the `ProducerTemplate`_.
|
||||
|
||||
The ``CamelExtension`` is used by both `Producer` and `Consumer` actors to interact with Apache Camel internally.
|
||||
When Akka creates a `Consumer` actor, the `Consumer` is published at its
|
||||
Camel endpoint (more precisely, the route is added to the `CamelContext`_ from the `Endpoint`_ to the actor).
|
||||
When Akka creates a `Producer` actor, a `SendProcessor`_ and `Endpoint`_ are created so that the Producer can send messages to it.
|
||||
Publication is done asynchronously; setting up an endpoint may still be in progress after you have
|
||||
requested the actor to be created. Some Camel components can take a while to startup, and in some cases you might want to know when the endpoints are activated and ready to be used.
|
||||
The `Camel`_ trait allows you to find out when the endpoint is activated or deactivated.
|
||||
|
||||
.. includecode:: code/docs/camel/Introduction.scala#CamelActivation
|
||||
|
||||
The above code shows that you can get a ``Future`` to the activation of the route from the endpoint to the actor, or you can wait in a blocking fashion on the activation of the route.
|
||||
An ``ActivationTimeoutException`` is thrown if the endpoint could not be activated within the specified timeout. Deactivation works in a similar fashion:
|
||||
|
||||
.. includecode:: code/docs/camel/Introduction.scala#CamelDeactivation
|
||||
|
||||
Deactivation of a Consumer or a Producer actor happens when the actor is terminated. For a Consumer, the route to the actor is stopped. For a Producer, the `SendProcessor`_ is stopped.
|
||||
A ``DeActivationTimeoutException`` is thrown if the associated camel objects could not be deactivated within the specified timeout.
|
||||
|
||||
.. _Camel: http://github.com/akka/akka/blob/master/akka-camel/src/main/scala/akka/camel/Camel.scala
|
||||
.. _CamelContext: https://svn.apache.org/repos/asf/camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContext.java
|
||||
.. _ProducerTemplate: https://svn.apache.org/repos/asf/camel/trunk/camel-core/src/main/java/org/apache/camel/ProducerTemplate.java
|
||||
.. _SendProcessor: https://svn.apache.org/repos/asf/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java
|
||||
.. _Endpoint: https://svn.apache.org/repos/asf/camel/trunk/camel-core/src/main/java/org/apache/camel/Endpoint.java
|
||||
|
||||
Dependencies
|
||||
============
|
||||
|
|
@ -156,3 +200,108 @@ new Message object is created by akka-camel with the actor response as message
|
|||
body.
|
||||
|
||||
.. _Message: http://github.com/akka/akka/blob/master/akka-camel/src/main/scala/akka/camel/CamelMessage.scala
|
||||
|
||||
Acknowledgements
|
||||
----------------
|
||||
|
||||
With in-out message exchanges, clients usually know that a message exchange is
|
||||
done when they receive a reply from a consumer actor. The reply message can be a
|
||||
CamelMessage (or any object which is then internally converted to a CamelMessage) on
|
||||
success, and a Failure message on failure.
|
||||
|
||||
With in-only message exchanges, by default, an exchange is done when a message
|
||||
is added to the consumer actor's mailbox. Any failure or exception that occurs
|
||||
during processing of that message by the consumer actor cannot be reported back
|
||||
to the endpoint in this case. To allow consumer actors to positively or
|
||||
negatively acknowledge the receipt of a message from an in-only message
|
||||
exchange, they need to override the ``autoack`` method to return false.
|
||||
In this case, consumer actors must reply either with a
|
||||
special Ack message (positive acknowledgement) or a Failure (negative
|
||||
acknowledgement).
|
||||
|
||||
.. includecode:: code/docs/camel/Consumers.scala#Consumer3
|
||||
|
||||
Consumer timeout
|
||||
----------------
|
||||
|
||||
Camel Exchanges (and their corresponding endpoints) that support two-way communications need to wait for a response from
|
||||
an actor before returning it to the initiating client.
|
||||
For some endpoint types, timeout values can be defined in an endpoint-specific
|
||||
way which is described in the documentation of the individual `Camel
|
||||
components`_. Another option is to configure timeouts on the level of consumer
|
||||
actors.
|
||||
|
||||
.. _Camel components: http://camel.apache.org/components.html
|
||||
|
||||
Two-way communications between a Camel endpoint and an actor are
|
||||
initiated by sending the request message to the actor with the ask pattern
|
||||
and the actor replies to the endpoint when the response is ready. The ask request to the actor can timeout, which will
|
||||
result in the `Exchange`_ failing with a TimeoutException set on the failure of the `Exchange`_.
|
||||
The timeout on the consumer actor can be overridden with the ``replyTimeout``, as shown below.
|
||||
|
||||
.. includecode:: code/docs/camel/Consumers.scala#Consumer4
|
||||
.. _Exchange: https://svn.apache.org/repos/asf/camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java
|
||||
|
||||
Producer Actors
|
||||
===============
|
||||
|
||||
For sending messages to Camel endpoints, actors need to mixin the `Producer`_ trait and implement the endpointUri method.
|
||||
|
||||
.. includecode:: code/docs/camel/Producers.scala#Producer1
|
||||
.. _Producer: http://github.com/akka/akka/blob/master/akka-camel/src/main/scala/akka/camel/Producer.scala
|
||||
|
||||
|
||||
|
||||
|
||||
Actors (untyped)
|
||||
Consumer publishing
|
||||
Actors (untyped)
|
||||
Typed actors
|
||||
Consumers and the CamelService
|
||||
Consumer un-publishing
|
||||
Actors (untyped)
|
||||
Typed actors
|
||||
Acknowledgements
|
||||
Actors (untyped)
|
||||
Blocking exchanges
|
||||
Consumer timeout
|
||||
Typed actors
|
||||
Actors (untyped)
|
||||
Remote consumers
|
||||
Actors (untyped)
|
||||
Typed actors
|
||||
Produce messages
|
||||
Producer trait
|
||||
Actors (untyped)
|
||||
Custom Processing
|
||||
Producer configuration options
|
||||
Message correlation
|
||||
Matching responses
|
||||
ProducerTemplate
|
||||
Actors (untyped)
|
||||
Typed actors
|
||||
Asynchronous routing
|
||||
Fault tolerance
|
||||
CamelService configuration
|
||||
Standalone applications
|
||||
Standalone Spring applications
|
||||
Kernel mode
|
||||
Custom Camel routes
|
||||
Akka Camel components
|
||||
Access to actors
|
||||
URI options
|
||||
Message headers
|
||||
Access to typed actors
|
||||
Using Spring
|
||||
Without Spring
|
||||
Intercepting route construction
|
||||
Actors (untyped)
|
||||
Typed actors
|
||||
Examples
|
||||
Asynchronous routing and transformation example
|
||||
Custom Camel route example
|
||||
Publish-subcribe example
|
||||
JMS
|
||||
Cometd
|
||||
Quartz Scheduler Example
|
||||
|
||||
|
|
|
|||
|
|
@ -27,4 +27,41 @@ object Consumers {
|
|||
}
|
||||
//#Consumer2
|
||||
}
|
||||
{
|
||||
//#Consumer3
|
||||
import akka.camel.{ CamelMessage, Consumer }
|
||||
import akka.camel.Ack
|
||||
import akka.actor.Status.Failure
|
||||
|
||||
class Consumer3 extends Consumer {
|
||||
override def autoack = false
|
||||
|
||||
def endpointUri = "jms:queue:test"
|
||||
|
||||
def receive = {
|
||||
case msg:CamelMessage ⇒
|
||||
sender ! Ack
|
||||
// on success
|
||||
// ..
|
||||
val someException = new Exception("e1")
|
||||
// on failure
|
||||
sender ! Failure(someException)
|
||||
}
|
||||
}
|
||||
//#Consumer3
|
||||
}
|
||||
{
|
||||
//#Consumer4
|
||||
import akka.camel.{ CamelMessage, Consumer }
|
||||
import akka.util.duration._
|
||||
|
||||
class Consumer4 extends Consumer {
|
||||
def endpointUri = "jetty:http://localhost:8877/camel/default"
|
||||
override def replyTimeout = 500 millis
|
||||
def receive = {
|
||||
case msg: CamelMessage ⇒ sender ! ("Hello %s" format msg.bodyAs[String])
|
||||
}
|
||||
}
|
||||
//#Consumer4
|
||||
}
|
||||
}
|
||||
|
|
@ -1,11 +1,14 @@
|
|||
package docs.camel
|
||||
|
||||
import akka.actor.{Props, ActorSystem}
|
||||
import akka.camel.CamelExtension
|
||||
|
||||
object Introduction {
|
||||
{
|
||||
//#Consumer-mina
|
||||
import akka.camel.{ CamelMessage, Consumer }
|
||||
|
||||
class MinaClient extends Consumer {
|
||||
class MyEndpoint extends Consumer {
|
||||
def endpointUri = "mina:tcp://localhost:6200?textline=true"
|
||||
|
||||
def receive = {
|
||||
|
|
@ -17,15 +20,15 @@ object Introduction {
|
|||
// start and expose actor via tcp
|
||||
import akka.actor.{ ActorSystem, Props }
|
||||
|
||||
val sys = ActorSystem("camel")
|
||||
val mina = sys.actorOf(Props[MinaClient])
|
||||
val system = ActorSystem("some-system")
|
||||
val mina = system.actorOf(Props[MyEndpoint])
|
||||
//#Consumer-mina
|
||||
}
|
||||
{
|
||||
//#Consumer
|
||||
import akka.camel.{ CamelMessage, Consumer }
|
||||
|
||||
class JettyAdapter extends Consumer {
|
||||
class MyEndpoint extends Consumer {
|
||||
def endpointUri = "jetty:http://localhost:8877/example"
|
||||
|
||||
def receive = {
|
||||
|
|
@ -45,10 +48,58 @@ object Introduction {
|
|||
def endpointUri = "jms:queue:Orders"
|
||||
}
|
||||
|
||||
val sys = ActorSystem("camel")
|
||||
val sys = ActorSystem("some-system")
|
||||
val orders = sys.actorOf(Props[Orders])
|
||||
|
||||
orders ! <order amount="100" currency="PLN" itemId="12345"/>
|
||||
//#Producer
|
||||
}
|
||||
{
|
||||
//#CamelExtension
|
||||
val system = ActorSystem("some-system")
|
||||
val camel = CamelExtension(system)
|
||||
val camelContext = camel.context
|
||||
val producerTemplate = camel.template
|
||||
|
||||
//#CamelExtension
|
||||
}
|
||||
{
|
||||
//#CamelExtensionAddComponent
|
||||
// import org.apache.activemq.camel.component.ActiveMQComponent
|
||||
val system = ActorSystem("some-system")
|
||||
val camel = CamelExtension(system)
|
||||
val camelContext = camel.context
|
||||
// camelContext.addComponent("activemq", ActiveMQComponent.activeMQComponent("vm://localhost?broker.persistent=false"))
|
||||
//#CamelExtensionAddComponent
|
||||
}
|
||||
{
|
||||
//#CamelActivation
|
||||
import akka.camel.{ CamelMessage, Consumer }
|
||||
import akka.util.duration._
|
||||
|
||||
class MyEndpoint extends Consumer {
|
||||
def endpointUri = "mina:tcp://localhost:6200?textline=true"
|
||||
|
||||
def receive = {
|
||||
case msg: CamelMessage ⇒ { /* ... */ }
|
||||
case _ ⇒ { /* ... */ }
|
||||
}
|
||||
}
|
||||
val system = ActorSystem("some-system")
|
||||
val camel = CamelExtension(system)
|
||||
val actorRef = system.actorOf(Props[MyEndpoint])
|
||||
// get a future reference to the activation of the endpoint of the Consumer Actor
|
||||
val activationFuture = camel.activationFutureFor(actorRef,10 seconds)
|
||||
// or, block wait on the activation
|
||||
camel.awaitActivation(actorRef, 10 seconds)
|
||||
//#CamelActivation
|
||||
//#CamelDeactivation
|
||||
system.stop(actorRef)
|
||||
// get a future reference to the deactivation of the endpoint of the Consumer Actor
|
||||
val deactivationFuture = camel.activationFutureFor(actorRef,10 seconds)
|
||||
// or, block wait on the deactivation
|
||||
camel.awaitDeactivation(actorRef, 10 seconds)
|
||||
//#CamelDeactivation
|
||||
}
|
||||
|
||||
}
|
||||
14
akka-docs/scala/code/docs/camel/Producers.scala
Normal file
14
akka-docs/scala/code/docs/camel/Producers.scala
Normal file
|
|
@ -0,0 +1,14 @@
|
|||
package docs.camel
|
||||
|
||||
object Producers {
|
||||
{
|
||||
//#Producer1
|
||||
import akka.camel.Producer
|
||||
import akka.actor.Actor
|
||||
|
||||
class Producer1 extends Actor with Producer {
|
||||
def endpointUri = "http://localhost:8080/news"
|
||||
}
|
||||
//#Producer1
|
||||
}
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue