diff --git a/akka-samples/akka-sample-camel/src/main/scala/Actors.scala b/akka-samples/akka-sample-camel/src/main/scala/Actors.scala index b4350e7cc0..68537eb3e3 100644 --- a/akka-samples/akka-sample-camel/src/main/scala/Actors.scala +++ b/akka-samples/akka-sample-camel/src/main/scala/Actors.scala @@ -1,7 +1,9 @@ package sample.camel +import org.apache.camel.Exchange + import se.scalablesolutions.akka.actor.{Actor, ActorRef, RemoteActor} -import se.scalablesolutions.akka.camel.{Producer, Message, Consumer} +import se.scalablesolutions.akka.camel.{Failure, Producer, Message, Consumer} import se.scalablesolutions.akka.util.Logging /** @@ -110,3 +112,23 @@ class PublisherBridge(uri: String, publisher: ActorRef) extends Actor with Consu } } } + +class HttpConsumer(producer: ActorRef) extends Actor with Consumer { + def endpointUri = "jetty:http://0.0.0.0:8875/" + protected def receive = { + // only keep Exchange.HTTP_PATH message header (which needed by bridge endpoint) + case msg: Message => producer forward msg.setHeaders(msg.headers(Set(Exchange.HTTP_PATH))) + } +} + +class HttpProducer(transformer: ActorRef) extends Actor with Producer { + def endpointUri = "jetty://http://akkasource.org/?bridgeEndpoint=true" + override def forwardResultTo = Some(transformer) +} + +class HttpTransformer extends Actor { + protected def receive = { + case msg: Message => self.reply(msg.transformBody[String] {_ replaceAll ("Akka ", "AKKA ")}) + case msg: Failure => self.reply(msg) + } +} diff --git a/akka-samples/akka-sample-camel/src/main/scala/Boot.scala b/akka-samples/akka-sample-camel/src/main/scala/Boot.scala index 2cfb56e64f..3894c24cca 100644 --- a/akka-samples/akka-sample-camel/src/main/scala/Boot.scala +++ b/akka-samples/akka-sample-camel/src/main/scala/Boot.scala @@ -79,6 +79,14 @@ class Boot { actorOf[Consumer4].start // POSTing "stop" to http://0.0.0.0:8877/camel/stop stops and unpublishes this actor actorOf[Consumer5].start // POSTing any msg to http://0.0.0.0:8877/camel/start starts and published Consumer4 again. + // ----------------------------------------------------------------------- + // Non-blocking consumer-producer example (Akka homepage transformation) + // ----------------------------------------------------------------------- + + val nbResponder = actorOf(new HttpTransformer).start + val nbProducer = actorOf(new HttpProducer(nbResponder)).start + val nbConsumer = actorOf(new HttpConsumer(nbProducer)).start + // ----------------------------------------------------------------------- // Active object example // -----------------------------------------------------------------------