The unborkinging
This commit is contained in:
parent
acf9aa6aa4
commit
ebe0cc05c9
8 changed files with 48 additions and 52 deletions
|
|
@ -39,7 +39,7 @@ object Consumers {
|
||||||
def endpointUri = "jms:queue:test"
|
def endpointUri = "jms:queue:test"
|
||||||
|
|
||||||
def receive = {
|
def receive = {
|
||||||
case msg:CamelMessage ⇒
|
case msg: CamelMessage ⇒
|
||||||
sender ! Ack
|
sender ! Ack
|
||||||
// on success
|
// on success
|
||||||
// ..
|
// ..
|
||||||
|
|
|
||||||
|
|
@ -3,12 +3,11 @@ package docs.camel
|
||||||
import akka.camel.CamelMessage
|
import akka.camel.CamelMessage
|
||||||
import akka.actor.Status.Failure
|
import akka.actor.Status.Failure
|
||||||
|
|
||||||
|
|
||||||
object CustomRoute {
|
object CustomRoute {
|
||||||
{
|
{
|
||||||
//#CustomRoute
|
//#CustomRoute
|
||||||
import akka.actor.{Props, ActorSystem, Actor, ActorRef}
|
import akka.actor.{ Props, ActorSystem, Actor, ActorRef }
|
||||||
import akka.camel.{CamelMessage, CamelExtension}
|
import akka.camel.{ CamelMessage, CamelExtension }
|
||||||
import org.apache.camel.builder.RouteBuilder
|
import org.apache.camel.builder.RouteBuilder
|
||||||
import akka.camel._
|
import akka.camel._
|
||||||
class Responder extends Actor {
|
class Responder extends Actor {
|
||||||
|
|
@ -20,7 +19,7 @@ object CustomRoute {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
class CustomRouteBuilder(system: ActorSystem, responder:ActorRef) extends RouteBuilder {
|
class CustomRouteBuilder(system: ActorSystem, responder: ActorRef) extends RouteBuilder {
|
||||||
def configure {
|
def configure {
|
||||||
from("jetty:http://localhost:8877/camel/custom").to(responder)
|
from("jetty:http://localhost:8877/camel/custom").to(responder)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -1,27 +1,26 @@
|
||||||
package docs.camel
|
package docs.camel
|
||||||
|
|
||||||
|
|
||||||
object CustomRouteExample {
|
object CustomRouteExample {
|
||||||
{
|
{
|
||||||
//#CustomRouteExample
|
//#CustomRouteExample
|
||||||
import akka.actor.{Actor, ActorRef, Props, ActorSystem}
|
import akka.actor.{ Actor, ActorRef, Props, ActorSystem }
|
||||||
import akka.camel.{CamelMessage, Consumer, Producer, CamelExtension}
|
import akka.camel.{ CamelMessage, Consumer, Producer, CamelExtension }
|
||||||
import org.apache.camel.builder.RouteBuilder
|
import org.apache.camel.builder.RouteBuilder
|
||||||
import org.apache.camel.{Exchange, Processor}
|
import org.apache.camel.{ Exchange, Processor }
|
||||||
|
|
||||||
class Consumer3(transformer: ActorRef) extends Actor with Consumer {
|
class Consumer3(transformer: ActorRef) extends Actor with Consumer {
|
||||||
def endpointUri = "jetty:http://0.0.0.0:8877/camel/welcome"
|
def endpointUri = "jetty:http://0.0.0.0:8877/camel/welcome"
|
||||||
|
|
||||||
def receive = {
|
def receive = {
|
||||||
// Forward a string representation of the message body to transformer
|
// Forward a string representation of the message body to transformer
|
||||||
case msg: CamelMessage => transformer.forward(msg.bodyAs[String])
|
case msg: CamelMessage ⇒ transformer.forward(msg.bodyAs[String])
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
class Transformer(producer: ActorRef) extends Actor {
|
class Transformer(producer: ActorRef) extends Actor {
|
||||||
def receive = {
|
def receive = {
|
||||||
// example: transform message body "foo" to "- foo -" and forward result to producer
|
// example: transform message body "foo" to "- foo -" and forward result to producer
|
||||||
case msg: CamelMessage => producer.forward(msg.mapBody((body: String) => "- %s -" format body))
|
case msg: CamelMessage ⇒ producer.forward(msg.mapBody((body: String) ⇒ "- %s -" format body))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -5,15 +5,15 @@ object HttpExample {
|
||||||
{
|
{
|
||||||
//#HttpExample
|
//#HttpExample
|
||||||
import org.apache.camel.Exchange
|
import org.apache.camel.Exchange
|
||||||
import akka.actor.{Actor, ActorRef, Props, ActorSystem}
|
import akka.actor.{ Actor, ActorRef, Props, ActorSystem }
|
||||||
import akka.camel.{Producer, CamelMessage, Consumer}
|
import akka.camel.{ Producer, CamelMessage, Consumer }
|
||||||
import akka.actor.Status.Failure
|
import akka.actor.Status.Failure
|
||||||
|
|
||||||
class HttpConsumer(producer: ActorRef) extends Consumer {
|
class HttpConsumer(producer: ActorRef) extends Consumer {
|
||||||
def endpointUri = "jetty:http://0.0.0.0:8875/"
|
def endpointUri = "jetty:http://0.0.0.0:8875/"
|
||||||
|
|
||||||
def receive = {
|
def receive = {
|
||||||
case msg => producer forward msg
|
case msg ⇒ producer forward msg
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -21,7 +21,7 @@ object HttpExample {
|
||||||
def endpointUri = "jetty://http://akka.io/?bridgeEndpoint=true"
|
def endpointUri = "jetty://http://akka.io/?bridgeEndpoint=true"
|
||||||
|
|
||||||
override def transformOutgoingMessage(msg: Any) = msg match {
|
override def transformOutgoingMessage(msg: Any) = msg match {
|
||||||
case msg: CamelMessage => msg.addHeaders(msg.headers(Set(Exchange.HTTP_PATH)))
|
case msg: CamelMessage ⇒ msg.addHeaders(msg.headers(Set(Exchange.HTTP_PATH)))
|
||||||
}
|
}
|
||||||
|
|
||||||
override def routeResponse(msg: Any) { transformer forward msg }
|
override def routeResponse(msg: Any) { transformer forward msg }
|
||||||
|
|
@ -29,8 +29,8 @@ object HttpExample {
|
||||||
|
|
||||||
class HttpTransformer extends Actor {
|
class HttpTransformer extends Actor {
|
||||||
def receive = {
|
def receive = {
|
||||||
case msg: CamelMessage => sender ! (msg.mapBody {body: String => body replaceAll ("Akka ", "AKKA ")})
|
case msg: CamelMessage ⇒ sender ! (msg.mapBody { body: String ⇒ body replaceAll ("Akka ", "AKKA ") })
|
||||||
case msg: Failure => sender ! msg
|
case msg: Failure ⇒ sender ! msg
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -1,6 +1,6 @@
|
||||||
package docs.camel
|
package docs.camel
|
||||||
|
|
||||||
import akka.actor.{Props, ActorSystem}
|
import akka.actor.{ Props, ActorSystem }
|
||||||
import akka.camel.CamelExtension
|
import akka.camel.CamelExtension
|
||||||
|
|
||||||
object Introduction {
|
object Introduction {
|
||||||
|
|
@ -89,14 +89,14 @@ object Introduction {
|
||||||
val camel = CamelExtension(system)
|
val camel = CamelExtension(system)
|
||||||
val actorRef = system.actorOf(Props[MyEndpoint])
|
val actorRef = system.actorOf(Props[MyEndpoint])
|
||||||
// get a future reference to the activation of the endpoint of the Consumer Actor
|
// get a future reference to the activation of the endpoint of the Consumer Actor
|
||||||
val activationFuture = camel.activationFutureFor(actorRef,10 seconds)
|
val activationFuture = camel.activationFutureFor(actorRef, 10 seconds)
|
||||||
// or, block wait on the activation
|
// or, block wait on the activation
|
||||||
camel.awaitActivation(actorRef, 10 seconds)
|
camel.awaitActivation(actorRef, 10 seconds)
|
||||||
//#CamelActivation
|
//#CamelActivation
|
||||||
//#CamelDeactivation
|
//#CamelDeactivation
|
||||||
system.stop(actorRef)
|
system.stop(actorRef)
|
||||||
// get a future reference to the deactivation of the endpoint of the Consumer Actor
|
// get a future reference to the deactivation of the endpoint of the Consumer Actor
|
||||||
val deactivationFuture = camel.activationFutureFor(actorRef,10 seconds)
|
val deactivationFuture = camel.activationFutureFor(actorRef, 10 seconds)
|
||||||
// or, block wait on the deactivation
|
// or, block wait on the deactivation
|
||||||
camel.awaitDeactivation(actorRef, 10 seconds)
|
camel.awaitDeactivation(actorRef, 10 seconds)
|
||||||
//#CamelDeactivation
|
//#CamelDeactivation
|
||||||
|
|
|
||||||
|
|
@ -6,8 +6,8 @@ object Producers {
|
||||||
{
|
{
|
||||||
//#Producer1
|
//#Producer1
|
||||||
import akka.actor.Actor
|
import akka.actor.Actor
|
||||||
import akka.actor.{Props, ActorSystem}
|
import akka.actor.{ Props, ActorSystem }
|
||||||
import akka.camel.{Producer, CamelMessage}
|
import akka.camel.{ Producer, CamelMessage }
|
||||||
import akka.util.Timeout
|
import akka.util.Timeout
|
||||||
|
|
||||||
class Producer1 extends Actor with Producer {
|
class Producer1 extends Actor with Producer {
|
||||||
|
|
@ -26,14 +26,14 @@ object Producers {
|
||||||
}
|
}
|
||||||
{
|
{
|
||||||
//#RouteResponse
|
//#RouteResponse
|
||||||
import akka.actor.{Actor, ActorRef}
|
import akka.actor.{ Actor, ActorRef }
|
||||||
import akka.camel.{Producer, CamelMessage}
|
import akka.camel.{ Producer, CamelMessage }
|
||||||
import akka.actor.{Props, ActorSystem}
|
import akka.actor.{ Props, ActorSystem }
|
||||||
|
|
||||||
class ResponseReceiver extends Actor {
|
class ResponseReceiver extends Actor {
|
||||||
def receive = {
|
def receive = {
|
||||||
case msg:CamelMessage ⇒
|
case msg: CamelMessage ⇒
|
||||||
// do something with the forwarded response
|
// do something with the forwarded response
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -44,7 +44,7 @@ object Producers {
|
||||||
}
|
}
|
||||||
val system = ActorSystem("some-system")
|
val system = ActorSystem("some-system")
|
||||||
val receiver = system.actorOf(Props[ResponseReceiver])
|
val receiver = system.actorOf(Props[ResponseReceiver])
|
||||||
val forwardResponse = system.actorOf(Props(new Forwarder("http://localhost:8080/news/akka",receiver)))
|
val forwardResponse = system.actorOf(Props(new Forwarder("http://localhost:8080/news/akka", receiver)))
|
||||||
// the Forwarder sends out a request to the web page and forwards the response to
|
// the Forwarder sends out a request to the web page and forwards the response to
|
||||||
// the ResponseReceiver
|
// the ResponseReceiver
|
||||||
forwardResponse ! "some request"
|
forwardResponse ! "some request"
|
||||||
|
|
@ -53,12 +53,12 @@ object Producers {
|
||||||
{
|
{
|
||||||
//#TransformOutgoingMessage
|
//#TransformOutgoingMessage
|
||||||
import akka.actor.Actor
|
import akka.actor.Actor
|
||||||
import akka.camel.{Producer, CamelMessage}
|
import akka.camel.{ Producer, CamelMessage }
|
||||||
|
|
||||||
class Transformer(uri: String) extends Actor with Producer {
|
class Transformer(uri: String) extends Actor with Producer {
|
||||||
def endpointUri = uri
|
def endpointUri = uri
|
||||||
|
|
||||||
def upperCase(msg:CamelMessage) = msg.mapBody {
|
def upperCase(msg: CamelMessage) = msg.mapBody {
|
||||||
body: String ⇒ body.toUpperCase
|
body: String ⇒ body.toUpperCase
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -70,10 +70,10 @@ object Producers {
|
||||||
}
|
}
|
||||||
{
|
{
|
||||||
//#Oneway
|
//#Oneway
|
||||||
import akka.actor.{Actor, Props, ActorSystem}
|
import akka.actor.{ Actor, Props, ActorSystem }
|
||||||
import akka.camel.Producer
|
import akka.camel.Producer
|
||||||
|
|
||||||
class OnewaySender(uri:String) extends Actor with Producer {
|
class OnewaySender(uri: String) extends Actor with Producer {
|
||||||
def endpointUri = uri
|
def endpointUri = uri
|
||||||
override def oneway: Boolean = true
|
override def oneway: Boolean = true
|
||||||
}
|
}
|
||||||
|
|
@ -86,9 +86,9 @@ object Producers {
|
||||||
}
|
}
|
||||||
{
|
{
|
||||||
//#Correlate
|
//#Correlate
|
||||||
import akka.camel.{Producer, CamelMessage}
|
import akka.camel.{ Producer, CamelMessage }
|
||||||
import akka.actor.Actor
|
import akka.actor.Actor
|
||||||
import akka.actor.{Props, ActorSystem}
|
import akka.actor.{ Props, ActorSystem }
|
||||||
|
|
||||||
class Producer2 extends Actor with Producer {
|
class Producer2 extends Actor with Producer {
|
||||||
def endpointUri = "activemg:FOO.BAR"
|
def endpointUri = "activemg:FOO.BAR"
|
||||||
|
|
@ -103,11 +103,11 @@ object Producers {
|
||||||
//#ProducerTemplate
|
//#ProducerTemplate
|
||||||
import akka.actor.Actor
|
import akka.actor.Actor
|
||||||
class MyActor extends Actor {
|
class MyActor extends Actor {
|
||||||
def receive = {
|
def receive = {
|
||||||
case msg ⇒
|
case msg ⇒
|
||||||
val template = CamelExtension(context.system).template
|
val template = CamelExtension(context.system).template
|
||||||
template.sendBody("direct:news", msg)
|
template.sendBody("direct:news", msg)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
//#ProducerTemplate
|
//#ProducerTemplate
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -1,17 +1,16 @@
|
||||||
package docs.camel
|
package docs.camel
|
||||||
|
|
||||||
|
|
||||||
object PublishSubscribe {
|
object PublishSubscribe {
|
||||||
{
|
{
|
||||||
//#PubSub
|
//#PubSub
|
||||||
import akka.actor.{Actor, ActorRef, ActorSystem, Props}
|
import akka.actor.{ Actor, ActorRef, ActorSystem, Props }
|
||||||
import akka.camel.{Producer, CamelMessage, Consumer}
|
import akka.camel.{ Producer, CamelMessage, Consumer }
|
||||||
|
|
||||||
class Subscriber(name:String, uri: String) extends Actor with Consumer {
|
class Subscriber(name: String, uri: String) extends Actor with Consumer {
|
||||||
def endpointUri = uri
|
def endpointUri = uri
|
||||||
|
|
||||||
protected def receive = {
|
def receive = {
|
||||||
case msg: CamelMessage => println("%s received: %s" format (name, msg.body))
|
case msg: CamelMessage ⇒ println("%s received: %s" format (name, msg.body))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -26,8 +25,8 @@ object PublishSubscribe {
|
||||||
class PublisherBridge(uri: String, publisher: ActorRef) extends Actor with Consumer {
|
class PublisherBridge(uri: String, publisher: ActorRef) extends Actor with Consumer {
|
||||||
def endpointUri = uri
|
def endpointUri = uri
|
||||||
|
|
||||||
protected def receive = {
|
def receive = {
|
||||||
case msg: CamelMessage => {
|
case msg: CamelMessage ⇒ {
|
||||||
publisher ! msg.bodyAs[String]
|
publisher ! msg.bodyAs[String]
|
||||||
sender ! ("message published")
|
sender ! ("message published")
|
||||||
}
|
}
|
||||||
|
|
@ -40,7 +39,7 @@ object PublishSubscribe {
|
||||||
val jmsUri = "jms:topic:test"
|
val jmsUri = "jms:topic:test"
|
||||||
val jmsSubscriber1 = system.actorOf(Props(new Subscriber("jms-subscriber-1", jmsUri)))
|
val jmsSubscriber1 = system.actorOf(Props(new Subscriber("jms-subscriber-1", jmsUri)))
|
||||||
val jmsSubscriber2 = system.actorOf(Props(new Subscriber("jms-subscriber-2", jmsUri)))
|
val jmsSubscriber2 = system.actorOf(Props(new Subscriber("jms-subscriber-2", jmsUri)))
|
||||||
val jmsPublisher = system.actorOf(Props(new Publisher("jms-publisher", jmsUri)))
|
val jmsPublisher = system.actorOf(Props(new Publisher("jms-publisher", jmsUri)))
|
||||||
val jmsPublisherBridge = system.actorOf(Props(new PublisherBridge("jetty:http://0.0.0.0:8877/camel/pub/jms", jmsPublisher)))
|
val jmsPublisherBridge = system.actorOf(Props(new PublisherBridge("jetty:http://0.0.0.0:8877/camel/pub/jms", jmsPublisher)))
|
||||||
//#PubSub
|
//#PubSub
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -1,13 +1,12 @@
|
||||||
package docs.camel
|
package docs.camel
|
||||||
|
|
||||||
|
|
||||||
object QuartzExample {
|
object QuartzExample {
|
||||||
|
|
||||||
{
|
{
|
||||||
//#Quartz
|
//#Quartz
|
||||||
import akka.actor.{ActorSystem, Props}
|
import akka.actor.{ ActorSystem, Props }
|
||||||
|
|
||||||
import akka.camel.{Consumer}
|
import akka.camel.{ Consumer }
|
||||||
|
|
||||||
class MyQuartzActor extends Consumer {
|
class MyQuartzActor extends Consumer {
|
||||||
|
|
||||||
|
|
@ -15,7 +14,7 @@ object QuartzExample {
|
||||||
|
|
||||||
def receive = {
|
def receive = {
|
||||||
|
|
||||||
case msg => println("==============> received %s " format msg)
|
case msg ⇒ println("==============> received %s " format msg)
|
||||||
|
|
||||||
} // end receive
|
} // end receive
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue