Non-blocking routing and transformation example with asynchronous HTTP request/reply

This commit is contained in:
Martin Krasser 2010-07-16 17:19:38 +02:00
parent df1de812af
commit 8707bb04ae
2 changed files with 31 additions and 1 deletions

View file

@ -1,7 +1,9 @@
package sample.camel
import org.apache.camel.Exchange
import se.scalablesolutions.akka.actor.{Actor, ActorRef, RemoteActor}
import se.scalablesolutions.akka.camel.{Producer, Message, Consumer}
import se.scalablesolutions.akka.camel.{Failure, Producer, Message, Consumer}
import se.scalablesolutions.akka.util.Logging
/**
@ -110,3 +112,23 @@ class PublisherBridge(uri: String, publisher: ActorRef) extends Actor with Consu
}
}
}
class HttpConsumer(producer: ActorRef) extends Actor with Consumer {
def endpointUri = "jetty:http://0.0.0.0:8875/"
protected def receive = {
// only keep Exchange.HTTP_PATH message header (which needed by bridge endpoint)
case msg: Message => producer forward msg.setHeaders(msg.headers(Set(Exchange.HTTP_PATH)))
}
}
class HttpProducer(transformer: ActorRef) extends Actor with Producer {
def endpointUri = "jetty://http://akkasource.org/?bridgeEndpoint=true"
override def forwardResultTo = Some(transformer)
}
class HttpTransformer extends Actor {
protected def receive = {
case msg: Message => self.reply(msg.transformBody[String] {_ replaceAll ("Akka ", "AKKA ")})
case msg: Failure => self.reply(msg)
}
}

View file

@ -79,6 +79,14 @@ class Boot {
actorOf[Consumer4].start // POSTing "stop" to http://0.0.0.0:8877/camel/stop stops and unpublishes this actor
actorOf[Consumer5].start // POSTing any msg to http://0.0.0.0:8877/camel/start starts and published Consumer4 again.
// -----------------------------------------------------------------------
// Non-blocking consumer-producer example (Akka homepage transformation)
// -----------------------------------------------------------------------
val nbResponder = actorOf(new HttpTransformer).start
val nbProducer = actorOf(new HttpProducer(nbResponder)).start
val nbConsumer = actorOf(new HttpConsumer(nbProducer)).start
// -----------------------------------------------------------------------
// Active object example
// -----------------------------------------------------------------------