Merge with master

This commit is contained in:
Viktor Klang 2012-07-12 21:05:23 +02:00
commit 68848f6631
12 changed files with 909 additions and 13 deletions

View file

@ -27,4 +27,41 @@ object Consumers {
}
//#Consumer2
}
{
//#Consumer3
import akka.camel.{ CamelMessage, Consumer }
import akka.camel.Ack
import akka.actor.Status.Failure
class Consumer3 extends Consumer {
override def autoack = false
def endpointUri = "jms:queue:test"
def receive = {
case msg: CamelMessage
sender ! Ack
// on success
// ..
val someException = new Exception("e1")
// on failure
sender ! Failure(someException)
}
}
//#Consumer3
}
{
//#Consumer4
import akka.camel.{ CamelMessage, Consumer }
import akka.util.duration._
class Consumer4 extends Consumer {
def endpointUri = "jetty:http://localhost:8877/camel/default"
override def replyTimeout = 500 millis
def receive = {
case msg: CamelMessage sender ! ("Hello %s" format msg.bodyAs[String])
}
}
//#Consumer4
}
}

View file

@ -0,0 +1,57 @@
package docs.camel
import akka.camel.CamelMessage
import akka.actor.Status.Failure
object CustomRoute {
{
//#CustomRoute
import akka.actor.{ Props, ActorSystem, Actor, ActorRef }
import akka.camel.{ CamelMessage, CamelExtension }
import org.apache.camel.builder.RouteBuilder
import akka.camel._
class Responder extends Actor {
def receive = {
case msg: CamelMessage
sender ! (msg.mapBody {
body: String "received %s" format body
})
}
}
class CustomRouteBuilder(system: ActorSystem, responder: ActorRef) extends RouteBuilder {
def configure {
from("jetty:http://localhost:8877/camel/custom").to(responder)
}
}
val system = ActorSystem("some-system")
val camel = CamelExtension(system)
val responder = system.actorOf(Props[Responder], name = "TestResponder")
camel.context.addRoutes(new CustomRouteBuilder(system, responder))
//#CustomRoute
}
{
//#ErrorThrowingConsumer
import akka.camel.Consumer
import org.apache.camel.builder.Builder
import org.apache.camel.model.RouteDefinition
class ErrorThrowingConsumer(override val endpointUri: String) extends Consumer {
def receive = {
case msg: CamelMessage throw new Exception("error: %s" format msg.body)
}
override def onRouteDefinition(rd: RouteDefinition) = {
// Catch any exception and handle it by returning the exception message as response
rd.onException(classOf[Exception]).handled(true).transform(Builder.exceptionMessage).end
}
final override def preRestart(reason: Throwable, message: Option[Any]) {
sender ! Failure(reason)
}
}
//#ErrorThrowingConsumer
}
}

View file

@ -0,0 +1,50 @@
package docs.camel
object CustomRouteExample {
{
//#CustomRouteExample
import akka.actor.{ Actor, ActorRef, Props, ActorSystem }
import akka.camel.{ CamelMessage, Consumer, Producer, CamelExtension }
import org.apache.camel.builder.RouteBuilder
import org.apache.camel.{ Exchange, Processor }
class Consumer3(transformer: ActorRef) extends Actor with Consumer {
def endpointUri = "jetty:http://0.0.0.0:8877/camel/welcome"
def receive = {
// Forward a string representation of the message body to transformer
case msg: CamelMessage transformer.forward(msg.bodyAs[String])
}
}
class Transformer(producer: ActorRef) extends Actor {
def receive = {
// example: transform message body "foo" to "- foo -" and forward result to producer
case msg: CamelMessage producer.forward(msg.mapBody((body: String) "- %s -" format body))
}
}
class Producer1 extends Actor with Producer {
def endpointUri = "direct:welcome"
}
class CustomRouteBuilder extends RouteBuilder {
def configure {
from("direct:welcome").process(new Processor() {
def process(exchange: Exchange) {
// Create a 'welcome' message from the input message
exchange.getOut.setBody("Welcome %s" format exchange.getIn.getBody)
}
})
}
}
// the below lines can be added to a Boot class, so that you can run the example from a MicroKernel
val system = ActorSystem("some-system")
val producer = system.actorOf(Props[Producer1])
val mediator = system.actorOf(Props(new Transformer(producer)))
val consumer = system.actorOf(Props(new Consumer3(mediator)))
CamelExtension(system).context.addRoutes(new CustomRouteBuilder)
//#CustomRouteExample
}
}

View file

@ -0,0 +1,47 @@
package docs.camel
object HttpExample {
{
//#HttpExample
import org.apache.camel.Exchange
import akka.actor.{ Actor, ActorRef, Props, ActorSystem }
import akka.camel.{ Producer, CamelMessage, Consumer }
import akka.actor.Status.Failure
class HttpConsumer(producer: ActorRef) extends Consumer {
def endpointUri = "jetty:http://0.0.0.0:8875/"
def receive = {
case msg producer forward msg
}
}
class HttpProducer(transformer: ActorRef) extends Actor with Producer {
def endpointUri = "jetty://http://akka.io/?bridgeEndpoint=true"
override def transformOutgoingMessage(msg: Any) = msg match {
case msg: CamelMessage msg.addHeaders(msg.headers(Set(Exchange.HTTP_PATH)))
}
override def routeResponse(msg: Any) { transformer forward msg }
}
class HttpTransformer extends Actor {
def receive = {
case msg: CamelMessage sender ! (msg.mapBody { body: String body replaceAll ("Akka ", "AKKA ") })
case msg: Failure sender ! msg
}
}
// Create the actors. this can be done in a Boot class so you can
// run the example in the MicroKernel. just add the below three lines to your boot class.
val system = ActorSystem("some-system")
val httpTransformer = system.actorOf(Props[HttpTransformer])
val httpProducer = system.actorOf(Props(new HttpProducer(httpTransformer)))
val httpConsumer = system.actorOf(Props(new HttpConsumer(httpProducer)))
//#HttpExample
}
}

View file

@ -1,11 +1,14 @@
package docs.camel
import akka.actor.{ Props, ActorSystem }
import akka.camel.CamelExtension
object Introduction {
def foo = {
//#Consumer-mina
import akka.camel.{ CamelMessage, Consumer }
class MinaClient extends Consumer {
class MyEndpoint extends Consumer {
def endpointUri = "mina:tcp://localhost:6200?textline=true"
def receive = {
@ -17,15 +20,15 @@ object Introduction {
// start and expose actor via tcp
import akka.actor.{ ActorSystem, Props }
val sys = ActorSystem("camel")
val mina = sys.actorOf(Props[MinaClient])
val system = ActorSystem("some-system")
val mina = system.actorOf(Props[MyEndpoint])
//#Consumer-mina
}
def bar = {
//#Consumer
import akka.camel.{ CamelMessage, Consumer }
class JettyAdapter extends Consumer {
class MyEndpoint extends Consumer {
def endpointUri = "jetty:http://localhost:8877/example"
def receive = {
@ -45,10 +48,58 @@ object Introduction {
def endpointUri = "jms:queue:Orders"
}
val sys = ActorSystem("camel")
val sys = ActorSystem("some-system")
val orders = sys.actorOf(Props[Orders])
orders ! <order amount="100" currency="PLN" itemId="12345"/>
//#Producer
}
{
//#CamelExtension
val system = ActorSystem("some-system")
val camel = CamelExtension(system)
val camelContext = camel.context
val producerTemplate = camel.template
//#CamelExtension
}
{
//#CamelExtensionAddComponent
// import org.apache.activemq.camel.component.ActiveMQComponent
val system = ActorSystem("some-system")
val camel = CamelExtension(system)
val camelContext = camel.context
// camelContext.addComponent("activemq", ActiveMQComponent.activeMQComponent("vm://localhost?broker.persistent=false"))
//#CamelExtensionAddComponent
}
{
//#CamelActivation
import akka.camel.{ CamelMessage, Consumer }
import akka.util.duration._
class MyEndpoint extends Consumer {
def endpointUri = "mina:tcp://localhost:6200?textline=true"
def receive = {
case msg: CamelMessage { /* ... */ }
case _ { /* ... */ }
}
}
val system = ActorSystem("some-system")
val camel = CamelExtension(system)
val actorRef = system.actorOf(Props[MyEndpoint])
// get a future reference to the activation of the endpoint of the Consumer Actor
val activationFuture = camel.activationFutureFor(actorRef, 10 seconds)
// or, block wait on the activation
camel.awaitActivation(actorRef, 10 seconds)
//#CamelActivation
//#CamelDeactivation
system.stop(actorRef)
// get a future reference to the deactivation of the endpoint of the Consumer Actor
val deactivationFuture = camel.activationFutureFor(actorRef, 10 seconds)
// or, block wait on the deactivation
camel.awaitDeactivation(actorRef, 10 seconds)
//#CamelDeactivation
}
}

View file

@ -0,0 +1,127 @@
package docs.camel
import akka.camel.CamelExtension
object Producers {
{
//#Producer1
import akka.actor.Actor
import akka.actor.{ Props, ActorSystem }
import akka.camel.{ Producer, CamelMessage }
import akka.util.Timeout
class Producer1 extends Actor with Producer {
def endpointUri = "http://localhost:8080/news"
}
//#Producer1
//#AskProducer
import akka.pattern.ask
import akka.util.duration._
implicit val timeout = Timeout(10 seconds)
val system = ActorSystem("some-system")
val producer = system.actorOf(Props[Producer1])
val future = producer.ask("some request").mapTo[CamelMessage]
//#AskProducer
}
{
//#RouteResponse
import akka.actor.{ Actor, ActorRef }
import akka.camel.{ Producer, CamelMessage }
import akka.actor.{ Props, ActorSystem }
class ResponseReceiver extends Actor {
def receive = {
case msg: CamelMessage
// do something with the forwarded response
}
}
class Forwarder(uri: String, target: ActorRef) extends Actor with Producer {
def endpointUri = uri
override def routeResponse(msg: Any) { target forward msg }
}
val system = ActorSystem("some-system")
val receiver = system.actorOf(Props[ResponseReceiver])
val forwardResponse = system.actorOf(Props(new Forwarder("http://localhost:8080/news/akka", receiver)))
// the Forwarder sends out a request to the web page and forwards the response to
// the ResponseReceiver
forwardResponse ! "some request"
//#RouteResponse
}
{
//#TransformOutgoingMessage
import akka.actor.Actor
import akka.camel.{ Producer, CamelMessage }
class Transformer(uri: String) extends Actor with Producer {
def endpointUri = uri
def upperCase(msg: CamelMessage) = msg.mapBody {
body: String body.toUpperCase
}
override def transformOutgoingMessage(msg: Any) = msg match {
case msg: CamelMessage upperCase(msg)
}
}
//#TransformOutgoingMessage
}
{
//#Oneway
import akka.actor.{ Actor, Props, ActorSystem }
import akka.camel.Producer
class OnewaySender(uri: String) extends Actor with Producer {
def endpointUri = uri
override def oneway: Boolean = true
}
val system = ActorSystem("some-system")
val producer = system.actorOf(Props(new OnewaySender("activemq:FOO.BAR")))
producer ! "Some message"
//#Oneway
}
{
//#Correlate
import akka.camel.{ Producer, CamelMessage }
import akka.actor.Actor
import akka.actor.{ Props, ActorSystem }
class Producer2 extends Actor with Producer {
def endpointUri = "activemg:FOO.BAR"
}
val system = ActorSystem("some-system")
val producer = system.actorOf(Props[Producer2])
producer ! CamelMessage("bar", Map(CamelMessage.MessageExchangeId -> "123"))
//#Correlate
}
{
//#ProducerTemplate
import akka.actor.Actor
class MyActor extends Actor {
def receive = {
case msg
val template = CamelExtension(context.system).template
template.sendBody("direct:news", msg)
}
}
//#ProducerTemplate
}
{
//#RequestProducerTemplate
import akka.actor.Actor
class MyActor extends Actor {
def receive = {
case msg
val template = CamelExtension(context.system).template
sender ! template.requestBody("direct:news", msg)
}
}
//#RequestProducerTemplate
}
}

View file

@ -0,0 +1,47 @@
package docs.camel
object PublishSubscribe {
{
//#PubSub
import akka.actor.{ Actor, ActorRef, ActorSystem, Props }
import akka.camel.{ Producer, CamelMessage, Consumer }
class Subscriber(name: String, uri: String) extends Actor with Consumer {
def endpointUri = uri
def receive = {
case msg: CamelMessage println("%s received: %s" format (name, msg.body))
}
}
class Publisher(name: String, uri: String) extends Actor with Producer {
def endpointUri = uri
// one-way communication with JMS
override def oneway = true
}
class PublisherBridge(uri: String, publisher: ActorRef) extends Actor with Consumer {
def endpointUri = uri
def receive = {
case msg: CamelMessage {
publisher ! msg.bodyAs[String]
sender ! ("message published")
}
}
}
// Add below to a Boot class
// Setup publish/subscribe example
val system = ActorSystem("some-system")
val jmsUri = "jms:topic:test"
val jmsSubscriber1 = system.actorOf(Props(new Subscriber("jms-subscriber-1", jmsUri)))
val jmsSubscriber2 = system.actorOf(Props(new Subscriber("jms-subscriber-2", jmsUri)))
val jmsPublisher = system.actorOf(Props(new Publisher("jms-publisher", jmsUri)))
val jmsPublisherBridge = system.actorOf(Props(new PublisherBridge("jetty:http://0.0.0.0:8877/camel/pub/jms", jmsPublisher)))
//#PubSub
}
}

View file

@ -0,0 +1,34 @@
package docs.camel
object QuartzExample {
{
//#Quartz
import akka.actor.{ ActorSystem, Props }
import akka.camel.{ Consumer }
class MyQuartzActor extends Consumer {
def endpointUri = "quartz://example?cron=0/2+*+*+*+*+?"
def receive = {
case msg println("==============> received %s " format msg)
} // end receive
} // end MyQuartzActor
object MyQuartzActor {
def main(str: Array[String]) {
val system = ActorSystem("my-quartz-system")
system.actorOf(Props[MyQuartzActor])
} // end main
} // end MyQuartzActor
//#Quartz
}
}