From 0770e54e4d3e22f23c727a47ccb9e947e87b07ef Mon Sep 17 00:00:00 2001 From: ticktock Date: Wed, 25 May 2011 13:47:36 -0700 Subject: [PATCH] copied over akka-sample-camel --- .../akka-sample-camel/config/akka.conf | 20 +++ .../config/microkernel-server.xml | 65 +++++++ .../src/main/java/sample/camel/BeanImpl.java | 13 ++ .../src/main/java/sample/camel/BeanIntf.java | 10 ++ .../sample/camel/RemoteTypedConsumer1.java | 15 ++ .../camel/RemoteTypedConsumer1Impl.java | 13 ++ .../sample/camel/RemoteTypedConsumer2.java | 15 ++ .../camel/RemoteTypedConsumer2Impl.java | 14 ++ .../java/sample/camel/TypedConsumer1.java | 17 ++ .../java/sample/camel/TypedConsumer1Impl.java | 21 +++ .../java/sample/camel/TypedConsumer2.java | 14 ++ .../java/sample/camel/TypedConsumer2Impl.java | 13 ++ .../java/sample/camel/UntypedConsumer1.java | 20 +++ .../src/main/resources/context-jms.xml | 27 +++ .../src/main/resources/context-standalone.xml | 26 +++ .../src/main/scala/sample/camel/Actors.scala | 162 ++++++++++++++++++ .../src/main/scala/sample/camel/Boot.scala | 108 ++++++++++++ .../sample/camel/ClientApplication.scala | 26 +++ .../sample/camel/ServerApplication.scala | 23 +++ .../sample/camel/StandaloneApplication.scala | 128 ++++++++++++++ .../camel/SampleRemoteUntypedConsumer.java | 21 +++ .../camel/HttpConcurrencyTestStress.scala | 99 +++++++++++ .../sample/camel/RemoteConsumerTest.scala | 99 +++++++++++ project/build/AkkaProject.scala | 27 ++- 24 files changed, 994 insertions(+), 2 deletions(-) create mode 100644 akka-samples/akka-sample-camel/config/akka.conf create mode 100644 akka-samples/akka-sample-camel/config/microkernel-server.xml create mode 100644 akka-samples/akka-sample-camel/src/main/java/sample/camel/BeanImpl.java create mode 100644 akka-samples/akka-sample-camel/src/main/java/sample/camel/BeanIntf.java create mode 100644 akka-samples/akka-sample-camel/src/main/java/sample/camel/RemoteTypedConsumer1.java create mode 100644 akka-samples/akka-sample-camel/src/main/java/sample/camel/RemoteTypedConsumer1Impl.java create mode 100644 akka-samples/akka-sample-camel/src/main/java/sample/camel/RemoteTypedConsumer2.java create mode 100644 akka-samples/akka-sample-camel/src/main/java/sample/camel/RemoteTypedConsumer2Impl.java create mode 100644 akka-samples/akka-sample-camel/src/main/java/sample/camel/TypedConsumer1.java create mode 100644 akka-samples/akka-sample-camel/src/main/java/sample/camel/TypedConsumer1Impl.java create mode 100644 akka-samples/akka-sample-camel/src/main/java/sample/camel/TypedConsumer2.java create mode 100644 akka-samples/akka-sample-camel/src/main/java/sample/camel/TypedConsumer2Impl.java create mode 100644 akka-samples/akka-sample-camel/src/main/java/sample/camel/UntypedConsumer1.java create mode 100644 akka-samples/akka-sample-camel/src/main/resources/context-jms.xml create mode 100644 akka-samples/akka-sample-camel/src/main/resources/context-standalone.xml create mode 100644 akka-samples/akka-sample-camel/src/main/scala/sample/camel/Actors.scala create mode 100644 akka-samples/akka-sample-camel/src/main/scala/sample/camel/Boot.scala create mode 100644 akka-samples/akka-sample-camel/src/main/scala/sample/camel/ClientApplication.scala create mode 100644 akka-samples/akka-sample-camel/src/main/scala/sample/camel/ServerApplication.scala create mode 100644 akka-samples/akka-sample-camel/src/main/scala/sample/camel/StandaloneApplication.scala create mode 100644 akka-samples/akka-sample-camel/src/test/java/sample/camel/SampleRemoteUntypedConsumer.java create mode 100644 akka-samples/akka-sample-camel/src/test/scala/sample/camel/HttpConcurrencyTestStress.scala create mode 100644 akka-samples/akka-sample-camel/src/test/scala/sample/camel/RemoteConsumerTest.scala diff --git a/akka-samples/akka-sample-camel/config/akka.conf b/akka-samples/akka-sample-camel/config/akka.conf new file mode 100644 index 0000000000..0bd7bd16a2 --- /dev/null +++ b/akka-samples/akka-sample-camel/config/akka.conf @@ -0,0 +1,20 @@ +#################### +# Akka Config File # +#################### + +akka { + version = "2.0-SNAPSHOT" + + enabled-modules = ["camel", "http"] + + time-unit = "seconds" + + event-handlers = ["akka.event.EventHandler$DefaultListener"] + + boot = ["sample.camel.Boot"] + + http { + hostname = "localhost" + port = 9998 + } +} diff --git a/akka-samples/akka-sample-camel/config/microkernel-server.xml b/akka-samples/akka-sample-camel/config/microkernel-server.xml new file mode 100644 index 0000000000..6be6beec33 --- /dev/null +++ b/akka-samples/akka-sample-camel/config/microkernel-server.xml @@ -0,0 +1,65 @@ + + + + + + + + + + + + + + + + + + + + + + + 300000 + 2 + false + 8443 + 20000 + 5000 + + + + + + + + + + + + + + / + + akka.http.AkkaRestServlet + /* + + + + + + + + + + + + + + + true + true + true + 1000 + + diff --git a/akka-samples/akka-sample-camel/src/main/java/sample/camel/BeanImpl.java b/akka-samples/akka-sample-camel/src/main/java/sample/camel/BeanImpl.java new file mode 100644 index 0000000000..9ceba85d64 --- /dev/null +++ b/akka-samples/akka-sample-camel/src/main/java/sample/camel/BeanImpl.java @@ -0,0 +1,13 @@ +package sample.camel; + +import akka.actor.TypedActor; +/** + * @author Martin Krasser + */ +public class BeanImpl extends TypedActor implements BeanIntf { + + public String foo(String s) { + return "hello " + s; + } + +} diff --git a/akka-samples/akka-sample-camel/src/main/java/sample/camel/BeanIntf.java b/akka-samples/akka-sample-camel/src/main/java/sample/camel/BeanIntf.java new file mode 100644 index 0000000000..a7b2e6e6a4 --- /dev/null +++ b/akka-samples/akka-sample-camel/src/main/java/sample/camel/BeanIntf.java @@ -0,0 +1,10 @@ +package sample.camel; + +/** + * @author Martin Krasser + */ +public interface BeanIntf { + + public String foo(String s); + +} diff --git a/akka-samples/akka-sample-camel/src/main/java/sample/camel/RemoteTypedConsumer1.java b/akka-samples/akka-sample-camel/src/main/java/sample/camel/RemoteTypedConsumer1.java new file mode 100644 index 0000000000..3e8ce1e20f --- /dev/null +++ b/akka-samples/akka-sample-camel/src/main/java/sample/camel/RemoteTypedConsumer1.java @@ -0,0 +1,15 @@ +package sample.camel; + +import org.apache.camel.Body; +import org.apache.camel.Header; + +import akka.camel.consume; + +/** + * @author Martin Krasser + */ +public interface RemoteTypedConsumer1 { + + @consume("jetty:http://localhost:6644/camel/remote-typed-actor-1") + public String foo(@Body String body, @Header("name") String header); +} diff --git a/akka-samples/akka-sample-camel/src/main/java/sample/camel/RemoteTypedConsumer1Impl.java b/akka-samples/akka-sample-camel/src/main/java/sample/camel/RemoteTypedConsumer1Impl.java new file mode 100644 index 0000000000..522db0e4a7 --- /dev/null +++ b/akka-samples/akka-sample-camel/src/main/java/sample/camel/RemoteTypedConsumer1Impl.java @@ -0,0 +1,13 @@ +package sample.camel; + +import akka.actor.TypedActor; + +/** + * @author Martin Krasser + */ +public class RemoteTypedConsumer1Impl extends TypedActor implements RemoteTypedConsumer1 { + + public String foo(String body, String header) { + return String.format("remote1: body=%s header=%s", body, header); + } +} diff --git a/akka-samples/akka-sample-camel/src/main/java/sample/camel/RemoteTypedConsumer2.java b/akka-samples/akka-sample-camel/src/main/java/sample/camel/RemoteTypedConsumer2.java new file mode 100644 index 0000000000..ba093a1d96 --- /dev/null +++ b/akka-samples/akka-sample-camel/src/main/java/sample/camel/RemoteTypedConsumer2.java @@ -0,0 +1,15 @@ +package sample.camel; + +import org.apache.camel.Body; +import org.apache.camel.Header; +import akka.camel.consume; + +/** + * @author Martin Krasser + */ +public interface RemoteTypedConsumer2 { + + @consume("jetty:http://localhost:6644/camel/remote-typed-actor-2") + public String foo(@Body String body, @Header("name") String header); + +} diff --git a/akka-samples/akka-sample-camel/src/main/java/sample/camel/RemoteTypedConsumer2Impl.java b/akka-samples/akka-sample-camel/src/main/java/sample/camel/RemoteTypedConsumer2Impl.java new file mode 100644 index 0000000000..b3475ad2d6 --- /dev/null +++ b/akka-samples/akka-sample-camel/src/main/java/sample/camel/RemoteTypedConsumer2Impl.java @@ -0,0 +1,14 @@ +package sample.camel; + +import akka.actor.TypedActor; + +/** + * @author Martin Krasser + */ +public class RemoteTypedConsumer2Impl extends TypedActor implements RemoteTypedConsumer2 { + + public String foo(String body, String header) { + return String.format("remote2: body=%s header=%s", body, header); + } + +} diff --git a/akka-samples/akka-sample-camel/src/main/java/sample/camel/TypedConsumer1.java b/akka-samples/akka-sample-camel/src/main/java/sample/camel/TypedConsumer1.java new file mode 100644 index 0000000000..6213fb8f09 --- /dev/null +++ b/akka-samples/akka-sample-camel/src/main/java/sample/camel/TypedConsumer1.java @@ -0,0 +1,17 @@ +package sample.camel; + +import org.apache.camel.Body; +import org.apache.camel.Header; + +import akka.camel.consume; + +/** + * @author Martin Krasser + */ +public interface TypedConsumer1 { + @consume("file:data/input/typed-actor") + public void foo(String body); + + @consume("jetty:http://0.0.0.0:8877/camel/typed-actor") + public String bar(@Body String body, @Header("name") String header); +} diff --git a/akka-samples/akka-sample-camel/src/main/java/sample/camel/TypedConsumer1Impl.java b/akka-samples/akka-sample-camel/src/main/java/sample/camel/TypedConsumer1Impl.java new file mode 100644 index 0000000000..bd735fe14b --- /dev/null +++ b/akka-samples/akka-sample-camel/src/main/java/sample/camel/TypedConsumer1Impl.java @@ -0,0 +1,21 @@ +package sample.camel; + +import org.apache.camel.Body; +import org.apache.camel.Header; + +import akka.actor.TypedActor; + +/** + * @author Martin Krasser + */ +public class TypedConsumer1Impl extends TypedActor implements TypedConsumer1 { + + public void foo(String body) { + System.out.println("Received message:"); + System.out.println(body); + } + + public String bar(@Body String body, @Header("name") String header) { + return String.format("body=%s header=%s", body, header); + } +} diff --git a/akka-samples/akka-sample-camel/src/main/java/sample/camel/TypedConsumer2.java b/akka-samples/akka-sample-camel/src/main/java/sample/camel/TypedConsumer2.java new file mode 100644 index 0000000000..9a39b534b5 --- /dev/null +++ b/akka-samples/akka-sample-camel/src/main/java/sample/camel/TypedConsumer2.java @@ -0,0 +1,14 @@ +package sample.camel; + +import org.apache.camel.Body; +import org.apache.camel.Header; +import akka.camel.consume; + +/** + * @author Martin Krasser + */ +public interface TypedConsumer2 { + + @consume("direct:default") + public String foo(String body); +} diff --git a/akka-samples/akka-sample-camel/src/main/java/sample/camel/TypedConsumer2Impl.java b/akka-samples/akka-sample-camel/src/main/java/sample/camel/TypedConsumer2Impl.java new file mode 100644 index 0000000000..ed82810c10 --- /dev/null +++ b/akka-samples/akka-sample-camel/src/main/java/sample/camel/TypedConsumer2Impl.java @@ -0,0 +1,13 @@ +package sample.camel; + +import akka.actor.TypedActor; + +/** + * @author Martin Krasser + */ +public class TypedConsumer2Impl extends TypedActor implements TypedConsumer2 { + + public String foo(String body) { + return String.format("default: %s", body); + } +} diff --git a/akka-samples/akka-sample-camel/src/main/java/sample/camel/UntypedConsumer1.java b/akka-samples/akka-sample-camel/src/main/java/sample/camel/UntypedConsumer1.java new file mode 100644 index 0000000000..39d910fc28 --- /dev/null +++ b/akka-samples/akka-sample-camel/src/main/java/sample/camel/UntypedConsumer1.java @@ -0,0 +1,20 @@ +package sample.camel; + +import akka.camel.Message; +import akka.camel.UntypedConsumerActor; + +/** + * @author Martin Krasser + */ +public class UntypedConsumer1 extends UntypedConsumerActor { + + public String getEndpointUri() { + return "direct:untyped-consumer-1"; + } + + public void onReceive(Object message) { + Message msg = (Message)message; + String body = msg.getBodyAs(String.class); + getContext().replySafe(String.format("received %s", body)); + } +} diff --git a/akka-samples/akka-sample-camel/src/main/resources/context-jms.xml b/akka-samples/akka-sample-camel/src/main/resources/context-jms.xml new file mode 100644 index 0000000000..12e4541be3 --- /dev/null +++ b/akka-samples/akka-sample-camel/src/main/resources/context-jms.xml @@ -0,0 +1,27 @@ + + + + + + + + + + + + + + + + + + + + + + + diff --git a/akka-samples/akka-sample-camel/src/main/resources/context-standalone.xml b/akka-samples/akka-sample-camel/src/main/resources/context-standalone.xml new file mode 100644 index 0000000000..e4edcbc350 --- /dev/null +++ b/akka-samples/akka-sample-camel/src/main/resources/context-standalone.xml @@ -0,0 +1,26 @@ + + + + + + + + + + + + + + + + diff --git a/akka-samples/akka-sample-camel/src/main/scala/sample/camel/Actors.scala b/akka-samples/akka-sample-camel/src/main/scala/sample/camel/Actors.scala new file mode 100644 index 0000000000..5b176b0888 --- /dev/null +++ b/akka-samples/akka-sample-camel/src/main/scala/sample/camel/Actors.scala @@ -0,0 +1,162 @@ +package sample.camel + +import org.apache.camel.Exchange + +import akka.actor.{Actor, ActorRef, ActorRegistry} +import akka.camel.{Ack, Failure, Producer, Message, Consumer} + +/** + * Client-initiated remote actor. + */ +class RemoteActor1 extends Actor with Consumer { + def endpointUri = "jetty:http://localhost:6644/camel/remote-actor-1" + + protected def receive = { + case msg: Message => self.reply(Message("hello %s" format msg.bodyAs[String], Map("sender" -> "remote1"))) + } +} + +/** + * Server-initiated remote actor. + */ +class RemoteActor2 extends Actor with Consumer { + def endpointUri = "jetty:http://localhost:6644/camel/remote-actor-2" + + protected def receive = { + case msg: Message => self.reply(Message("hello %s" format msg.bodyAs[String], Map("sender" -> "remote2"))) + } +} + +class Producer1 extends Actor with Producer { + def endpointUri = "direct:welcome" + override def oneway = false // default +} + +class Consumer1 extends Actor with Consumer { + def endpointUri = "file:data/input/actor" + + def receive = { + case msg: Message => println("received %s" format msg.bodyAs[String]) + } +} + +class Consumer2 extends Actor with Consumer { + def endpointUri = "jetty:http://0.0.0.0:8877/camel/default" + + def receive = { + case msg: Message => self.reply("Hello %s" format msg.bodyAs[String]) + } +} + +class Consumer3(transformer: ActorRef) extends Actor with Consumer { + def endpointUri = "jetty:http://0.0.0.0:8877/camel/welcome" + + def receive = { + case msg: Message => transformer.forward(msg.setBodyAs[String]) + } +} + +class Consumer4 extends Actor with Consumer { + def endpointUri = "jetty:http://0.0.0.0:8877/camel/stop" + + def receive = { + case msg: Message => msg.bodyAs[String] match { + case "stop" => { + self.reply("Consumer4 stopped") + self.stop + } + case body => self.reply(body) + } + } +} + +class Consumer5 extends Actor with Consumer { + def endpointUri = "jetty:http://0.0.0.0:8877/camel/start" + + def receive = { + case _ => { + Actor.actorOf[Consumer4].start + self.reply("Consumer4 started") + } + } +} + +class Transformer(producer: ActorRef) extends Actor { + protected def receive = { + case msg: Message => producer.forward(msg.transformBody( (body: String) => "- %s -" format body)) + } +} + +class Subscriber(name:String, uri: String) extends Actor with Consumer { + def endpointUri = uri + + protected def receive = { + case msg: Message => println("%s received: %s" format (name, msg.body)) + } +} + +class Publisher(name: String, uri: String) extends Actor with Producer { + self.id = name + def endpointUri = uri + override def oneway = true +} + +class PublisherBridge(uri: String, publisher: ActorRef) extends Actor with Consumer { + def endpointUri = uri + + protected def receive = { + case msg: Message => { + publisher ! msg.bodyAs[String] + self.reply("message published") + } + } +} + +class HttpConsumer(producer: ActorRef) extends Actor with Consumer { + def endpointUri = "jetty:http://0.0.0.0:8875/" + + protected def receive = { + case msg => producer forward msg + } +} + +class HttpProducer(transformer: ActorRef) extends Actor with Producer { + def endpointUri = "jetty://http://akka.io/?bridgeEndpoint=true" + + override protected def receiveBeforeProduce = { + // only keep Exchange.HTTP_PATH message header (which needed by bridge endpoint) + case msg: Message => msg.setHeaders(msg.headers(Set(Exchange.HTTP_PATH))) + } + + override protected def receiveAfterProduce = { + // do not reply but forward result to transformer + case msg => transformer forward msg + } +} + +class HttpTransformer extends Actor { + protected def receive = { + case msg: Message => self.reply(msg.transformBody {body: String => body replaceAll ("Akka ", "AKKA ")}) + case msg: Failure => self.reply(msg) + } +} + +class FileConsumer extends Actor with Consumer { + def endpointUri = "file:data/input/actor?delete=true" + override def autoack = false + + var counter = 0 + + def receive = { + case msg: Message => { + if (counter == 2) { + println("received %s" format msg.bodyAs[String]) + self.reply(Ack) + } else { + println("rejected %s" format msg.bodyAs[String]) + counter += 1 + self.reply(Failure(new Exception("message number %s not accepted" format counter))) + } + } + } +} diff --git a/akka-samples/akka-sample-camel/src/main/scala/sample/camel/Boot.scala b/akka-samples/akka-sample-camel/src/main/scala/sample/camel/Boot.scala new file mode 100644 index 0000000000..a6065a004d --- /dev/null +++ b/akka-samples/akka-sample-camel/src/main/scala/sample/camel/Boot.scala @@ -0,0 +1,108 @@ +package sample.camel + +import org.apache.camel.{Exchange, Processor} +import org.apache.camel.builder.RouteBuilder +import org.apache.camel.impl.DefaultCamelContext +import org.apache.camel.spring.spi.ApplicationContextRegistry +import org.springframework.context.support.ClassPathXmlApplicationContext + +import akka.actor.Actor._ +import akka.actor.{TypedActor, Supervisor} +import akka.camel.CamelContextManager +import akka.config.Supervision._ + +/** + * @author Martin Krasser + */ +class Boot { + + // ----------------------------------------------------------------------- + // Basic example + // ----------------------------------------------------------------------- + + actorOf[Consumer1].start + actorOf[Consumer2].start + + // Alternatively, use a supervisor for these actors + //val supervisor = Supervisor( + // SupervisorConfig( + // RestartStrategy(OneForOne, 3, 100, List(classOf[Exception])), + // Supervise(actorOf[Consumer1], Permanent) :: + // Supervise(actorOf[Consumer2], Permanent) :: Nil)) + + // ----------------------------------------------------------------------- + // Custom Camel route example + // ----------------------------------------------------------------------- + + // Create CamelContext and a Spring-based registry + val context = new ClassPathXmlApplicationContext("/context-jms.xml", getClass) + val registry = new ApplicationContextRegistry(context) + + // Use a custom Camel context and a custom touter builder + CamelContextManager.init(new DefaultCamelContext(registry)) + CamelContextManager.mandatoryContext.addRoutes(new CustomRouteBuilder) + + val producer = actorOf[Producer1] + val mediator = actorOf(new Transformer(producer)) + val consumer = actorOf(new Consumer3(mediator)) + + producer.start + mediator.start + consumer.start + + // ----------------------------------------------------------------------- + // Asynchronous consumer-producer example (Akka homepage transformation) + // ----------------------------------------------------------------------- + + val httpTransformer = actorOf(new HttpTransformer).start + val httpProducer = actorOf(new HttpProducer(httpTransformer)).start + val httpConsumer = actorOf(new HttpConsumer(httpProducer)).start + + // ----------------------------------------------------------------------- + // Publish subscribe examples + // ----------------------------------------------------------------------- + + // + // Cometd example commented out because camel-cometd is broken since Camel 2.3 + // + + //val cometdUri = "cometd://localhost:8111/test/abc?baseResource=file:target" + //val cometdSubscriber = actorOf(new Subscriber("cometd-subscriber", cometdUri)).start + //val cometdPublisher = actorOf(new Publisher("cometd-publisher", cometdUri)).start + + val jmsUri = "jms:topic:test" + val jmsSubscriber1 = actorOf(new Subscriber("jms-subscriber-1", jmsUri)).start + val jmsSubscriber2 = actorOf(new Subscriber("jms-subscriber-2", jmsUri)).start + val jmsPublisher = actorOf(new Publisher("jms-publisher", jmsUri)).start + + //val cometdPublisherBridge = actorOf(new PublisherBridge("jetty:http://0.0.0.0:8877/camel/pub/cometd", cometdPublisher)).start + val jmsPublisherBridge = actorOf(new PublisherBridge("jetty:http://0.0.0.0:8877/camel/pub/jms", jmsPublisher)).start + + // ----------------------------------------------------------------------- + // Actor un-publishing and re-publishing example + // ----------------------------------------------------------------------- + + actorOf[Consumer4].start // POSTing "stop" to http://0.0.0.0:8877/camel/stop stops and unpublishes this actor + actorOf[Consumer5].start // POSTing any msg to http://0.0.0.0:8877/camel/start starts and published Consumer4 again. + + // ----------------------------------------------------------------------- + // Active object example + // ----------------------------------------------------------------------- + + //TypedActor.newInstance(classOf[TypedConsumer1], classOf[TypedConsumer1Impl]) +} + +/** + * @author Martin Krasser + */ +class CustomRouteBuilder extends RouteBuilder { + def configure { + val actorUri = "actor:%s" format classOf[Consumer2].getName + from("jetty:http://0.0.0.0:8877/camel/custom").to(actorUri) + from("direct:welcome").process(new Processor() { + def process(exchange: Exchange) { + exchange.getOut.setBody("Welcome %s" format exchange.getIn.getBody) + } + }) + } +} diff --git a/akka-samples/akka-sample-camel/src/main/scala/sample/camel/ClientApplication.scala b/akka-samples/akka-sample-camel/src/main/scala/sample/camel/ClientApplication.scala new file mode 100644 index 0000000000..b5bfe56232 --- /dev/null +++ b/akka-samples/akka-sample-camel/src/main/scala/sample/camel/ClientApplication.scala @@ -0,0 +1,26 @@ +package sample.camel + +import akka.actor.Actor._ +import akka.actor.TypedActor +import akka.camel.Message + +/** + * @author Martin Krasser + */ +object ClientApplication extends App { + + val actor1 = remote.actorOf[RemoteActor1]("localhost", 7777).start + val actor2 = remote.actorFor("remote2", "localhost", 7777) + + val typedActor1 = + TypedActor.newRemoteInstance(classOf[RemoteTypedConsumer1],classOf[RemoteTypedConsumer1Impl], "localhost", 7777) + + val typedActor2 = remote.typedActorFor(classOf[RemoteTypedConsumer2], "remote3", "localhost", 7777) + + println(actor1 !! Message("actor1")) // activates and publishes actor remotely + println(actor2 !! Message("actor2")) // actor already activated and published remotely + + println(typedActor1.foo("x1", "y1")) // activates and publishes typed actor methods remotely + println(typedActor2.foo("x2", "y2")) // typed actor methods already activated and published remotely + +} diff --git a/akka-samples/akka-sample-camel/src/main/scala/sample/camel/ServerApplication.scala b/akka-samples/akka-sample-camel/src/main/scala/sample/camel/ServerApplication.scala new file mode 100644 index 0000000000..971416f64a --- /dev/null +++ b/akka-samples/akka-sample-camel/src/main/scala/sample/camel/ServerApplication.scala @@ -0,0 +1,23 @@ +package sample.camel + +import akka.actor.Actor._ +import akka.camel.CamelServiceManager +import akka.actor.{TypedActor} + +/** + * @author Martin Krasser + */ +object ServerApplication extends App { + import CamelServiceManager._ + + startCamelService + + val ua = actorOf[RemoteActor2].start + val ta = TypedActor.newInstance( + classOf[RemoteTypedConsumer2], + classOf[RemoteTypedConsumer2Impl], 2000) + + remote.start("localhost", 7777) + remote.register("remote2", ua) + remote.registerTypedActor("remote3", ta) +} diff --git a/akka-samples/akka-sample-camel/src/main/scala/sample/camel/StandaloneApplication.scala b/akka-samples/akka-sample-camel/src/main/scala/sample/camel/StandaloneApplication.scala new file mode 100644 index 0000000000..ff7fb5c9da --- /dev/null +++ b/akka-samples/akka-sample-camel/src/main/scala/sample/camel/StandaloneApplication.scala @@ -0,0 +1,128 @@ +package sample.camel + +import org.apache.camel.impl.{DefaultCamelContext, SimpleRegistry} +import org.apache.camel.builder.RouteBuilder +import org.apache.camel.spring.spi.ApplicationContextRegistry +import org.springframework.context.support.ClassPathXmlApplicationContext + +import akka.actor.{Actor, ActorRegistry, TypedActor} +import akka.camel._ + +/** + * @author Martin Krasser + */ +object StandaloneApplication extends App { + import CamelContextManager._ + import CamelServiceManager._ + + // 'externally' register typed actors + val registry = new SimpleRegistry + registry.put("sample", TypedActor.newInstance(classOf[BeanIntf], classOf[BeanImpl])) + + // customize CamelContext + CamelContextManager.init(new DefaultCamelContext(registry)) + CamelContextManager.mandatoryContext.addRoutes(new StandaloneApplicationRoute) + + startCamelService + + // access 'externally' registered typed actors + assert("hello msg1" == mandatoryContext.createProducerTemplate.requestBody("direct:test", "msg1")) + + mandatoryService.awaitEndpointActivation(1) { + // 'internally' register typed actor (requires CamelService) + TypedActor.newInstance(classOf[TypedConsumer2], classOf[TypedConsumer2Impl]) + } + + // access 'internally' (automatically) registered typed-actors + // (see @consume annotation value at TypedConsumer2.foo method) + assert("default: msg3" == mandatoryContext.createProducerTemplate.requestBody("direct:default", "msg3")) + + stopCamelService + + Actor.registry.shutdownAll +} + +class StandaloneApplicationRoute extends RouteBuilder { + def configure = { + // route to typed actors (in SimpleRegistry) + from("direct:test").to("typed-actor:sample?method=foo") + } +} + +object StandaloneSpringApplication extends App { + import CamelContextManager._ + + // load Spring application context + val appctx = new ClassPathXmlApplicationContext("/context-standalone.xml") + + // We cannot use the CamelServiceManager to wait for endpoint activation + // because CamelServiceManager is started by the Spring application context. + // (and hence is not available for setting expectations on activations). This + // will be improved/enabled in upcoming releases. + Thread.sleep(1000) + + // access 'externally' registered typed actors with typed-actor component + assert("hello msg3" == mandatoryTemplate.requestBody("direct:test3", "msg3")) + + // access auto-started untyped consumer + assert("received msg3" == mandatoryTemplate.requestBody("direct:untyped-consumer-1", "msg3")) + + appctx.close + + Actor.registry.shutdownAll +} + +class StandaloneSpringApplicationRoute extends RouteBuilder { + def configure = { + // routes to typed actor (in ApplicationContextRegistry) + from("direct:test3").to("typed-actor:ta?method=foo") + } +} + +object StandaloneJmsApplication extends App { + import CamelServiceManager._ + + val context = new ClassPathXmlApplicationContext("/context-jms.xml") + val registry = new ApplicationContextRegistry(context) + + // Init CamelContextManager with custom CamelContext + CamelContextManager.init(new DefaultCamelContext(registry)) + + startCamelService + + val jmsUri = "jms:topic:test" + val jmsPublisher = Actor.actorOf(new Publisher("jms-publisher", jmsUri)).start + + mandatoryService.awaitEndpointActivation(2) { + Actor.actorOf(new Subscriber("jms-subscriber-1", jmsUri)).start + Actor.actorOf(new Subscriber("jms-subscriber-2", jmsUri)).start + } + + // Send 10 messages to via publisher actor + for(i <- 1 to 10) { + jmsPublisher ! ("Akka rocks (%d)" format i) + } + + // Send 10 messages to JMS topic directly + for(i <- 1 to 10) { + CamelContextManager.mandatoryTemplate.sendBody(jmsUri, "Camel rocks (%d)" format i) + } + + // Wait a bit for subscribes to receive messages + Thread.sleep(1000) + + stopCamelService + Actor.registry.shutdownAll +} + +object StandaloneFileApplication { + import CamelServiceManager._ + + def main(args: Array[String]) { + startCamelService + mandatoryService.awaitEndpointActivation(1) { + Actor.actorOf(new FileConsumer).start + } + } +} + diff --git a/akka-samples/akka-sample-camel/src/test/java/sample/camel/SampleRemoteUntypedConsumer.java b/akka-samples/akka-sample-camel/src/test/java/sample/camel/SampleRemoteUntypedConsumer.java new file mode 100644 index 0000000000..5dea328e59 --- /dev/null +++ b/akka-samples/akka-sample-camel/src/test/java/sample/camel/SampleRemoteUntypedConsumer.java @@ -0,0 +1,21 @@ +package sample.camel; + +import akka.camel.Message; +import akka.camel.UntypedConsumerActor; + +/** + * @author Martin Krasser + */ +public class SampleRemoteUntypedConsumer extends UntypedConsumerActor { + public String getEndpointUri() { + return "direct:remote-untyped-consumer"; + } + + public void onReceive(Object message) { + Message msg = (Message)message; + String body = msg.getBodyAs(String.class); + String header = msg.getHeaderAs("test", String.class); + getContext().replySafe(String.format("%s %s", body, header)); + } + +} diff --git a/akka-samples/akka-sample-camel/src/test/scala/sample/camel/HttpConcurrencyTestStress.scala b/akka-samples/akka-sample-camel/src/test/scala/sample/camel/HttpConcurrencyTestStress.scala new file mode 100644 index 0000000000..6568840e19 --- /dev/null +++ b/akka-samples/akka-sample-camel/src/test/scala/sample/camel/HttpConcurrencyTestStress.scala @@ -0,0 +1,99 @@ +package sample.camel + +import collection.mutable.Set + +import java.util.concurrent.CountDownLatch + +import org.junit._ +import org.scalatest.junit.JUnitSuite + +import akka.actor.Actor._ +import akka.actor.{ActorRegistry, ActorRef, Actor} +import akka.camel._ +import akka.camel.CamelServiceManager._ +import akka.routing.CyclicIterator +import akka.routing.Routing._ + +/** + * @author Martin Krasser + */ +class HttpConcurrencyTestStress extends JUnitSuite { + import HttpConcurrencyTestStress._ + + @Test def shouldProcessMessagesConcurrently = { + val num = 50 + val latch1 = new CountDownLatch(num) + val latch2 = new CountDownLatch(num) + val latch3 = new CountDownLatch(num) + val client1 = actorOf(new HttpClientActor("client1", latch1)).start + val client2 = actorOf(new HttpClientActor("client2", latch2)).start + val client3 = actorOf(new HttpClientActor("client3", latch3)).start + for (i <- 1 to num) { + client1 ! Message("client1", Map(Message.MessageExchangeId -> i)) + client2 ! Message("client2", Map(Message.MessageExchangeId -> i)) + client3 ! Message("client3", Map(Message.MessageExchangeId -> i)) + } + latch1.await + latch2.await + latch3.await + assert(num == (client1 !! "getCorrelationIdCount").as[Int].get) + assert(num == (client2 !! "getCorrelationIdCount").as[Int].get) + assert(num == (client3 !! "getCorrelationIdCount").as[Int].get) + } +} + +object HttpConcurrencyTestStress { + @BeforeClass + def beforeClass: Unit = { + startCamelService + + val workers = for (i <- 1 to 8) yield actorOf[HttpServerWorker].start + val balancer = loadBalancerActor(new CyclicIterator(workers.toList)) + + service.get.awaitEndpointActivation(1) { + actorOf(new HttpServerActor(balancer)).start + } + } + + @AfterClass + def afterClass = { + stopCamelService + Actor.registry.shutdownAll + } + + class HttpClientActor(label: String, latch: CountDownLatch) extends Actor with Producer { + def endpointUri = "jetty:http://0.0.0.0:8855/echo" + var correlationIds = Set[Any]() + + override protected def receive = { + case "getCorrelationIdCount" => self.reply(correlationIds.size) + case msg => super.receive(msg) + } + + override protected def receiveAfterProduce = { + case msg: Message => { + val corr = msg.headers(Message.MessageExchangeId) + val body = msg.bodyAs[String] + correlationIds += corr + assert(label == body) + latch.countDown + print(".") + } + } + } + + class HttpServerActor(balancer: ActorRef) extends Actor with Consumer { + def endpointUri = "jetty:http://0.0.0.0:8855/echo" + var counter = 0 + + def receive = { + case msg => balancer forward msg + } + } + + class HttpServerWorker extends Actor { + protected def receive = { + case msg => self.reply(msg) + } + } +} diff --git a/akka-samples/akka-sample-camel/src/test/scala/sample/camel/RemoteConsumerTest.scala b/akka-samples/akka-sample-camel/src/test/scala/sample/camel/RemoteConsumerTest.scala new file mode 100644 index 0000000000..087b46b9b1 --- /dev/null +++ b/akka-samples/akka-sample-camel/src/test/scala/sample/camel/RemoteConsumerTest.scala @@ -0,0 +1,99 @@ +package sample.camel + +import org.scalatest.{GivenWhenThen, BeforeAndAfterAll, FeatureSpec} + +import akka.actor.Actor._ +import akka.actor._ +import akka.camel._ +import akka.remote.netty.NettyRemoteSupport +import akka.remoteinterface.RemoteServerModule + +/** + * @author Martin Krasser + */ +class RemoteConsumerTest extends FeatureSpec with BeforeAndAfterAll with GivenWhenThen { + import CamelServiceManager._ + import RemoteConsumerTest._ + + var server: RemoteServerModule = _ + + override protected def beforeAll = { + registry.shutdownAll + + startCamelService + + remote.shutdown + remote.asInstanceOf[NettyRemoteSupport].optimizeLocal.set(false) + + server = remote.start(host,port) + } + + override protected def afterAll = { + remote.shutdown + + stopCamelService + + registry.shutdownAll + remote.asInstanceOf[NettyRemoteSupport].optimizeLocal.set(true) + } + + feature("Publish consumer on remote node") { + scenario("access published remote consumer") { + given("a consumer actor") + val consumer = Actor.actorOf[RemoteConsumer] + + when("registered at the server") + assert(mandatoryService.awaitEndpointActivation(1) { + remote.register(consumer) + }) + + then("the published consumer is accessible via its endpoint URI") + val response = CamelContextManager.mandatoryTemplate.requestBody("direct:remote-consumer", "test") + assert(response === "remote actor: test") + } + } + + feature("Publish typed consumer on remote node") { + scenario("access published remote consumer method") { + given("a typed consumer actor") + when("registered at the server") + assert(mandatoryService.awaitEndpointActivation(1) { + remote.registerTypedActor("whatever", TypedActor.newInstance( + classOf[SampleRemoteTypedConsumer], + classOf[SampleRemoteTypedConsumerImpl])) + }) + then("the published method is accessible via its endpoint URI") + val response = CamelContextManager.mandatoryTemplate.requestBody("direct:remote-typed-consumer", "test") + assert(response === "remote typed actor: test") + } + } + + feature("Publish untyped consumer on remote node") { + scenario("access published remote untyped consumer") { + given("an untyped consumer actor") + val consumer = Actor.actorOf(classOf[SampleRemoteUntypedConsumer]) + + when("registered at the server") + assert(mandatoryService.awaitEndpointActivation(1) { + remote.register(consumer) + }) + then("the published untyped consumer is accessible via its endpoint URI") + val response = CamelContextManager.mandatoryTemplate.requestBodyAndHeader("direct:remote-untyped-consumer", "a", "test", "b") + assert(response === "a b") + } + } +} + +object RemoteConsumerTest { + val host = "localhost" + val port = 7774 + + class RemoteConsumer extends Actor with Consumer { + def endpointUri = "direct:remote-consumer" + + protected def receive = { + case "init" => self.reply("done") + case m: Message => self.reply("remote actor: %s" format m.body) + } + } +} diff --git a/project/build/AkkaProject.scala b/project/build/AkkaProject.scala index ec0a765aa1..844be68fe6 100644 --- a/project/build/AkkaProject.scala +++ b/project/build/AkkaProject.scala @@ -52,6 +52,7 @@ class AkkaParentProject(info: ProjectInfo) extends ParentProject(info) with Exec import Repositories._ lazy val jettyModuleConfig = ModuleConfiguration("org.eclipse.jetty", sbt.DefaultMavenRepository) + lazy val camelJettyModuleConfig = ModuleConfiguration("org.apache.camel", "camel-jetty", AkkaRepo) lazy val guiceyFruitModuleConfig = ModuleConfiguration("org.guiceyfruit", GuiceyFruitRepo) lazy val glassfishModuleConfig = ModuleConfiguration("org.glassfish", GlassfishRepo) lazy val jbossModuleConfig = ModuleConfiguration("org.jboss", JBossRepo) @@ -99,12 +100,14 @@ class AkkaParentProject(info: ProjectInfo) extends ParentProject(info) with Exec // Compile + lazy val activemq = "org.apache.activemq" % "activemq-core" % "5.4.2" % "compile" // ApacheV2 lazy val aopalliance = "aopalliance" % "aopalliance" % "1.0" % "compile" //Public domain lazy val aspectwerkz = "org.codehaus.aspectwerkz" % "aspectwerkz" % "2.2.3" % "compile" //ApacheV2 lazy val beanstalk = "beanstalk" % "beanstalk_client" % "1.4.5" //New BSD lazy val bookkeeper = "org.apache.hadoop.zookeeper" % "bookkeeper" % ZOOKEEPER_VERSION //ApacheV2 lazy val camel_core = "org.apache.camel" % "camel-core" % CAMEL_VERSION % "compile" //ApacheV2 - + lazy val camel_jetty = "org.apache.camel" % "camel-jetty" % "2.7.1.1" % "compile" + lazy val camel_jms = "org.apache.camel" % "camel-jms" % CAMEL_VERSION % "compile" //ApacheV2 lazy val commons_codec = "commons-codec" % "commons-codec" % "1.4" % "compile" //ApacheV2 lazy val commons_io = "commons-io" % "commons-io" % "2.0.1" % "compile" //ApacheV2 lazy val javax_servlet_30 = "org.glassfish" % "javax.servlet" % JAVAX_SERVLET_VERSION % "provided" //CDDL v1 @@ -134,7 +137,7 @@ class AkkaParentProject(info: ProjectInfo) extends ParentProject(info) with Exec lazy val slf4j = "org.slf4j" % "slf4j-api" % SLF4J_VERSION // MIT lazy val spring_beans = "org.springframework" % "spring-beans" % SPRING_VERSION % "compile" //ApacheV2 lazy val spring_context = "org.springframework" % "spring-context" % SPRING_VERSION % "compile" //ApacheV2 - + lazy val spring_jms = "org.springframework" % "spring-jms" % SPRING_VERSION % "compile" //ApacheV2 lazy val stax_api = "javax.xml.stream" % "stax-api" % "1.0-2" % "compile" //ApacheV2 lazy val logback = "ch.qos.logback" % "logback-classic" % "0.9.28" % "runtime" //MIT lazy val log4j = "log4j" % "log4j" % "1.2.15" //ApacheV2 @@ -604,6 +607,22 @@ class AkkaParentProject(info: ProjectInfo) extends ParentProject(info) with Exec class AkkaSampleFSMProject(info: ProjectInfo) extends AkkaDefaultProject(info) + class AkkaSampleCamelProject(info: ProjectInfo) extends AkkaDefaultProject(info) { + val activemq = Dependencies.activemq + val camel_jetty = Dependencies.camel_jetty + val camel_jms = Dependencies.camel_jms + val spring_jms = Dependencies.spring_jms + val commons_codec = Dependencies.commons_codec + + override def ivyXML = { + + + + } + + override def testOptions = createTestFilter( _.endsWith("Test")) + } + class AkkaSampleOsgiProject(info: ProjectInfo) extends AkkaDefaultProject(info) with BNDPlugin { val osgiCore = Dependencies.osgi_core override protected def bndPrivatePackage = List("sample.osgi.*") @@ -624,6 +643,10 @@ class AkkaParentProject(info: ProjectInfo) extends ParentProject(info) with Exec lazy val akka_sample_osgi = project("akka-sample-osgi", "akka-sample-osgi", new AkkaSampleOsgiProject(_), akka_actor) + lazy val akka_sample_camel = project("akka-sample-camel", "akka-sample-camel", + new AkkaSampleCamelProject(_), akka_actor, akka_kernel) + + lazy val publishRelease = { val releaseConfiguration = new DefaultPublishConfiguration(localReleaseRepository, "release", false) publishTask(publishIvyModule, releaseConfiguration) dependsOn (deliver, publishLocal, makePom)