added akka camel docs

This commit is contained in:
RayRoestenburg 2012-06-26 10:43:31 +02:00
parent 92e3ca2629
commit a7a96687e4
7 changed files with 698 additions and 52 deletions

View file

@ -0,0 +1,58 @@
package docs.camel
import akka.camel.CamelMessage
import akka.actor.Status.Failure
object CustomRoute {
{
//#CustomRoute
import akka.actor.{Props, ActorSystem, Actor, ActorRef}
import akka.camel.{CamelMessage, CamelExtension}
import org.apache.camel.builder.RouteBuilder
import akka.camel._
class Responder extends Actor {
def receive = {
case msg: CamelMessage
sender ! (msg.mapBody {
body: String "received %s" format body
})
}
}
class CustomRouteBuilder(system: ActorSystem, responder:ActorRef) extends RouteBuilder {
def configure {
from("jetty:http://localhost:8877/camel/custom").to(responder)
}
}
val system = ActorSystem("some-system")
val camel = CamelExtension(system)
val responder = system.actorOf(Props[Responder], name = "TestResponder")
camel.context.addRoutes(new CustomRouteBuilder(system, responder))
//#CustomRoute
}
{
//#ErrorThrowingConsumer
import akka.camel.Consumer
import org.apache.camel.builder.Builder
import org.apache.camel.model.RouteDefinition
class ErrorThrowingConsumer(override val endpointUri: String) extends Consumer {
def receive = {
case msg: CamelMessage throw new Exception("error: %s" format msg.body)
}
override def onRouteDefinition(rd: RouteDefinition) = {
// Catch any exception and handle it by returning the exception message as response
rd.onException(classOf[Exception]).handled(true).transform(Builder.exceptionMessage).end
}
final override def preRestart(reason: Throwable, message: Option[Any]) {
sender ! Failure(reason)
}
}
//#ErrorThrowingConsumer
}
}

View file

@ -0,0 +1,51 @@
package docs.camel
object CustomRouteExample {
{
//#CustomRouteExample
import akka.actor.{Actor, ActorRef, Props, ActorSystem}
import akka.camel.{CamelMessage, Consumer, Producer, CamelExtension}
import org.apache.camel.builder.RouteBuilder
import org.apache.camel.{Exchange, Processor}
class Consumer3(transformer: ActorRef) extends Actor with Consumer {
def endpointUri = "jetty:http://0.0.0.0:8877/camel/welcome"
def receive = {
// Forward a string representation of the message body to transformer
case msg: CamelMessage => transformer.forward(msg.bodyAs[String])
}
}
class Transformer(producer: ActorRef) extends Actor {
def receive = {
// example: transform message body "foo" to "- foo -" and forward result to producer
case msg: CamelMessage => producer.forward(msg.mapBody((body: String) => "- %s -" format body))
}
}
class Producer1 extends Actor with Producer {
def endpointUri = "direct:welcome"
}
class CustomRouteBuilder extends RouteBuilder {
def configure {
from("direct:welcome").process(new Processor() {
def process(exchange: Exchange) {
// Create a 'welcome' message from the input message
exchange.getOut.setBody("Welcome %s" format exchange.getIn.getBody)
}
})
}
}
// the below lines can be added to a Boot class, so that you can run the example from a MicroKernel
val system = ActorSystem("some-system")
val producer = system.actorOf(Props[Producer1])
val mediator = system.actorOf(Props(new Transformer(producer)))
val consumer = system.actorOf(Props(new Consumer3(mediator)))
CamelExtension(system).context.addRoutes(new CustomRouteBuilder)
//#CustomRouteExample
}
}

View file

@ -0,0 +1,47 @@
package docs.camel
object HttpExample {
{
//#HttpExample
import org.apache.camel.Exchange
import akka.actor.{Actor, ActorRef, Props, ActorSystem}
import akka.camel.{Producer, CamelMessage, Consumer}
import akka.actor.Status.Failure
class HttpConsumer(producer: ActorRef) extends Consumer {
def endpointUri = "jetty:http://0.0.0.0:8875/"
def receive = {
case msg => producer forward msg
}
}
class HttpProducer(transformer: ActorRef) extends Actor with Producer {
def endpointUri = "jetty://http://akka.io/?bridgeEndpoint=true"
override def transformOutgoingMessage(msg: Any) = msg match {
case msg: CamelMessage => msg.addHeaders(msg.headers(Set(Exchange.HTTP_PATH)))
}
override def routeResponse(msg: Any) { transformer forward msg }
}
class HttpTransformer extends Actor {
def receive = {
case msg: CamelMessage => sender ! (msg.mapBody {body: String => body replaceAll ("Akka ", "AKKA ")})
case msg: Failure => sender ! msg
}
}
// Create the actors. this can be done in a Boot class so you can
// run the example in the MicroKernel. just add the below three lines to your boot class.
val system = ActorSystem("some-system")
val httpTransformer = system.actorOf(Props[HttpTransformer])
val httpProducer = system.actorOf(Props(new HttpProducer(httpTransformer)))
val httpConsumer = system.actorOf(Props(new HttpConsumer(httpProducer)))
//#HttpExample
}
}

View file

@ -1,14 +1,127 @@
package docs.camel
import akka.camel.CamelExtension
object Producers {
{
//#Producer1
import akka.camel.Producer
import akka.actor.Actor
import akka.actor.{Props, ActorSystem}
import akka.camel.{Producer, CamelMessage}
import akka.util.Timeout
class Producer1 extends Actor with Producer {
def endpointUri = "http://localhost:8080/news"
}
//#Producer1
//#AskProducer
import akka.pattern.ask
import akka.util.duration._
implicit val timeout = Timeout(10 seconds)
val system = ActorSystem("some-system")
val producer = system.actorOf(Props[Producer1])
val future = producer.ask("some request").mapTo[CamelMessage]
//#AskProducer
}
}
{
//#RouteResponse
import akka.actor.{Actor, ActorRef}
import akka.camel.{Producer, CamelMessage}
import akka.actor.{Props, ActorSystem}
class ResponseReceiver extends Actor {
def receive = {
case msg:CamelMessage
// do something with the forwarded response
}
}
class Forwarder(uri: String, target: ActorRef) extends Actor with Producer {
def endpointUri = uri
override def routeResponse(msg: Any) { target forward msg }
}
val system = ActorSystem("some-system")
val receiver = system.actorOf(Props[ResponseReceiver])
val forwardResponse = system.actorOf(Props(new Forwarder("http://localhost:8080/news/akka",receiver)))
// the Forwarder sends out a request to the web page and forwards the response to
// the ResponseReceiver
forwardResponse ! "some request"
//#RouteResponse
}
{
//#TransformOutgoingMessage
import akka.actor.Actor
import akka.camel.{Producer, CamelMessage}
class Transformer(uri: String) extends Actor with Producer {
def endpointUri = uri
def upperCase(msg:CamelMessage) = msg.mapBody {
body: String body.toUpperCase
}
override def transformOutgoingMessage(msg: Any) = msg match {
case msg: CamelMessage upperCase(msg)
}
}
//#TransformOutgoingMessage
}
{
//#Oneway
import akka.actor.{Actor, Props, ActorSystem}
import akka.camel.Producer
class OnewaySender(uri:String) extends Actor with Producer {
def endpointUri = uri
override def oneway: Boolean = true
}
val system = ActorSystem("some-system")
val producer = system.actorOf(Props(new OnewaySender("activemq:FOO.BAR")))
producer ! "Some message"
//#Oneway
}
{
//#Correlate
import akka.camel.{Producer, CamelMessage}
import akka.actor.Actor
import akka.actor.{Props, ActorSystem}
class Producer2 extends Actor with Producer {
def endpointUri = "activemg:FOO.BAR"
}
val system = ActorSystem("some-system")
val producer = system.actorOf(Props[Producer2])
producer ! CamelMessage("bar", Map(CamelMessage.MessageExchangeId -> "123"))
//#Correlate
}
{
//#ProducerTemplate
import akka.actor.Actor
class MyActor extends Actor {
def receive = {
case msg
val template = CamelExtension(context.system).template
template.sendBody("direct:news", msg)
}
}
//#ProducerTemplate
}
{
//#RequestProducerTemplate
import akka.actor.Actor
class MyActor extends Actor {
def receive = {
case msg
val template = CamelExtension(context.system).template
sender ! template.requestBody("direct:news", msg)
}
}
//#RequestProducerTemplate
}
}

View file

@ -0,0 +1,48 @@
package docs.camel
object PublishSubscribe {
{
//#PubSub
import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import akka.camel.{Producer, CamelMessage, Consumer}
class Subscriber(name:String, uri: String) extends Actor with Consumer {
def endpointUri = uri
protected def receive = {
case msg: CamelMessage => println("%s received: %s" format (name, msg.body))
}
}
class Publisher(name: String, uri: String) extends Actor with Producer {
def endpointUri = uri
// one-way communication with JMS
override def oneway = true
}
class PublisherBridge(uri: String, publisher: ActorRef) extends Actor with Consumer {
def endpointUri = uri
protected def receive = {
case msg: CamelMessage => {
publisher ! msg.bodyAs[String]
sender ! ("message published")
}
}
}
// Add below to a Boot class
// Setup publish/subscribe example
val system = ActorSystem("some-system")
val jmsUri = "jms:topic:test"
val jmsSubscriber1 = system.actorOf(Props(new Subscriber("jms-subscriber-1", jmsUri)))
val jmsSubscriber2 = system.actorOf(Props(new Subscriber("jms-subscriber-2", jmsUri)))
val jmsPublisher = system.actorOf(Props(new Publisher("jms-publisher", jmsUri)))
val jmsPublisherBridge = system.actorOf(Props(new PublisherBridge("jetty:http://0.0.0.0:8877/camel/pub/jms", jmsPublisher)))
//#PubSub
}
}

View file

@ -0,0 +1,35 @@
package docs.camel
object QuartzExample {
{
//#Quartz
import akka.actor.{ActorSystem, Props}
import akka.camel.{Consumer}
class MyQuartzActor extends Consumer {
def endpointUri = "quartz://example?cron=0/2+*+*+*+*+?"
def receive = {
case msg => println("==============> received %s " format msg)
} // end receive
} // end MyQuartzActor
object MyQuartzActor {
def main(str: Array[String]) {
val system = ActorSystem("my-quartz-system")
system.actorOf(Props[MyQuartzActor])
} // end main
} // end MyQuartzActor
//#Quartz
}
}