diff --git a/akka-samples/akka-sample-camel/config/akka.conf b/akka-samples/akka-sample-camel/config/akka.conf
new file mode 100644
index 0000000000..0bd7bd16a2
--- /dev/null
+++ b/akka-samples/akka-sample-camel/config/akka.conf
@@ -0,0 +1,20 @@
+####################
+# Akka Config File #
+####################
+
+akka {
+ version = "2.0-SNAPSHOT"
+
+ enabled-modules = ["camel", "http"]
+
+ time-unit = "seconds"
+
+ event-handlers = ["akka.event.EventHandler$DefaultListener"]
+
+ boot = ["sample.camel.Boot"]
+
+ http {
+ hostname = "localhost"
+ port = 9998
+ }
+}
diff --git a/akka-samples/akka-sample-camel/config/microkernel-server.xml b/akka-samples/akka-sample-camel/config/microkernel-server.xml
new file mode 100644
index 0000000000..6be6beec33
--- /dev/null
+++ b/akka-samples/akka-sample-camel/config/microkernel-server.xml
@@ -0,0 +1,65 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ 300000
+ 2
+ false
+ 8443
+ 20000
+ 5000
+
+
+
+
+
+
+
+
+
+
+
+ -
+
+ /
+
+ akka.http.AkkaRestServlet
+ /*
+
+
+
+ -
+
+
+
+
+
+
+
+
+
+
+ true
+ true
+ true
+ 1000
+
+
diff --git a/akka-samples/akka-sample-camel/src/main/java/sample/camel/BeanImpl.java b/akka-samples/akka-sample-camel/src/main/java/sample/camel/BeanImpl.java
new file mode 100644
index 0000000000..9ceba85d64
--- /dev/null
+++ b/akka-samples/akka-sample-camel/src/main/java/sample/camel/BeanImpl.java
@@ -0,0 +1,13 @@
+package sample.camel;
+
+import akka.actor.TypedActor;
+/**
+ * @author Martin Krasser
+ */
+public class BeanImpl extends TypedActor implements BeanIntf {
+
+ public String foo(String s) {
+ return "hello " + s;
+ }
+
+}
diff --git a/akka-samples/akka-sample-camel/src/main/java/sample/camel/BeanIntf.java b/akka-samples/akka-sample-camel/src/main/java/sample/camel/BeanIntf.java
new file mode 100644
index 0000000000..a7b2e6e6a4
--- /dev/null
+++ b/akka-samples/akka-sample-camel/src/main/java/sample/camel/BeanIntf.java
@@ -0,0 +1,10 @@
+package sample.camel;
+
+/**
+ * @author Martin Krasser
+ */
+public interface BeanIntf {
+
+ public String foo(String s);
+
+}
diff --git a/akka-samples/akka-sample-camel/src/main/java/sample/camel/RemoteTypedConsumer1.java b/akka-samples/akka-sample-camel/src/main/java/sample/camel/RemoteTypedConsumer1.java
new file mode 100644
index 0000000000..3e8ce1e20f
--- /dev/null
+++ b/akka-samples/akka-sample-camel/src/main/java/sample/camel/RemoteTypedConsumer1.java
@@ -0,0 +1,15 @@
+package sample.camel;
+
+import org.apache.camel.Body;
+import org.apache.camel.Header;
+
+import akka.camel.consume;
+
+/**
+ * @author Martin Krasser
+ */
+public interface RemoteTypedConsumer1 {
+
+ @consume("jetty:http://localhost:6644/camel/remote-typed-actor-1")
+ public String foo(@Body String body, @Header("name") String header);
+}
diff --git a/akka-samples/akka-sample-camel/src/main/java/sample/camel/RemoteTypedConsumer1Impl.java b/akka-samples/akka-sample-camel/src/main/java/sample/camel/RemoteTypedConsumer1Impl.java
new file mode 100644
index 0000000000..522db0e4a7
--- /dev/null
+++ b/akka-samples/akka-sample-camel/src/main/java/sample/camel/RemoteTypedConsumer1Impl.java
@@ -0,0 +1,13 @@
+package sample.camel;
+
+import akka.actor.TypedActor;
+
+/**
+ * @author Martin Krasser
+ */
+public class RemoteTypedConsumer1Impl extends TypedActor implements RemoteTypedConsumer1 {
+
+ public String foo(String body, String header) {
+ return String.format("remote1: body=%s header=%s", body, header);
+ }
+}
diff --git a/akka-samples/akka-sample-camel/src/main/java/sample/camel/RemoteTypedConsumer2.java b/akka-samples/akka-sample-camel/src/main/java/sample/camel/RemoteTypedConsumer2.java
new file mode 100644
index 0000000000..ba093a1d96
--- /dev/null
+++ b/akka-samples/akka-sample-camel/src/main/java/sample/camel/RemoteTypedConsumer2.java
@@ -0,0 +1,15 @@
+package sample.camel;
+
+import org.apache.camel.Body;
+import org.apache.camel.Header;
+import akka.camel.consume;
+
+/**
+ * @author Martin Krasser
+ */
+public interface RemoteTypedConsumer2 {
+
+ @consume("jetty:http://localhost:6644/camel/remote-typed-actor-2")
+ public String foo(@Body String body, @Header("name") String header);
+
+}
diff --git a/akka-samples/akka-sample-camel/src/main/java/sample/camel/RemoteTypedConsumer2Impl.java b/akka-samples/akka-sample-camel/src/main/java/sample/camel/RemoteTypedConsumer2Impl.java
new file mode 100644
index 0000000000..b3475ad2d6
--- /dev/null
+++ b/akka-samples/akka-sample-camel/src/main/java/sample/camel/RemoteTypedConsumer2Impl.java
@@ -0,0 +1,14 @@
+package sample.camel;
+
+import akka.actor.TypedActor;
+
+/**
+ * @author Martin Krasser
+ */
+public class RemoteTypedConsumer2Impl extends TypedActor implements RemoteTypedConsumer2 {
+
+ public String foo(String body, String header) {
+ return String.format("remote2: body=%s header=%s", body, header);
+ }
+
+}
diff --git a/akka-samples/akka-sample-camel/src/main/java/sample/camel/TypedConsumer1.java b/akka-samples/akka-sample-camel/src/main/java/sample/camel/TypedConsumer1.java
new file mode 100644
index 0000000000..6213fb8f09
--- /dev/null
+++ b/akka-samples/akka-sample-camel/src/main/java/sample/camel/TypedConsumer1.java
@@ -0,0 +1,17 @@
+package sample.camel;
+
+import org.apache.camel.Body;
+import org.apache.camel.Header;
+
+import akka.camel.consume;
+
+/**
+ * @author Martin Krasser
+ */
+public interface TypedConsumer1 {
+ @consume("file:data/input/typed-actor")
+ public void foo(String body);
+
+ @consume("jetty:http://0.0.0.0:8877/camel/typed-actor")
+ public String bar(@Body String body, @Header("name") String header);
+}
diff --git a/akka-samples/akka-sample-camel/src/main/java/sample/camel/TypedConsumer1Impl.java b/akka-samples/akka-sample-camel/src/main/java/sample/camel/TypedConsumer1Impl.java
new file mode 100644
index 0000000000..bd735fe14b
--- /dev/null
+++ b/akka-samples/akka-sample-camel/src/main/java/sample/camel/TypedConsumer1Impl.java
@@ -0,0 +1,21 @@
+package sample.camel;
+
+import org.apache.camel.Body;
+import org.apache.camel.Header;
+
+import akka.actor.TypedActor;
+
+/**
+ * @author Martin Krasser
+ */
+public class TypedConsumer1Impl extends TypedActor implements TypedConsumer1 {
+
+ public void foo(String body) {
+ System.out.println("Received message:");
+ System.out.println(body);
+ }
+
+ public String bar(@Body String body, @Header("name") String header) {
+ return String.format("body=%s header=%s", body, header);
+ }
+}
diff --git a/akka-samples/akka-sample-camel/src/main/java/sample/camel/TypedConsumer2.java b/akka-samples/akka-sample-camel/src/main/java/sample/camel/TypedConsumer2.java
new file mode 100644
index 0000000000..9a39b534b5
--- /dev/null
+++ b/akka-samples/akka-sample-camel/src/main/java/sample/camel/TypedConsumer2.java
@@ -0,0 +1,14 @@
+package sample.camel;
+
+import org.apache.camel.Body;
+import org.apache.camel.Header;
+import akka.camel.consume;
+
+/**
+ * @author Martin Krasser
+ */
+public interface TypedConsumer2 {
+
+ @consume("direct:default")
+ public String foo(String body);
+}
diff --git a/akka-samples/akka-sample-camel/src/main/java/sample/camel/TypedConsumer2Impl.java b/akka-samples/akka-sample-camel/src/main/java/sample/camel/TypedConsumer2Impl.java
new file mode 100644
index 0000000000..ed82810c10
--- /dev/null
+++ b/akka-samples/akka-sample-camel/src/main/java/sample/camel/TypedConsumer2Impl.java
@@ -0,0 +1,13 @@
+package sample.camel;
+
+import akka.actor.TypedActor;
+
+/**
+ * @author Martin Krasser
+ */
+public class TypedConsumer2Impl extends TypedActor implements TypedConsumer2 {
+
+ public String foo(String body) {
+ return String.format("default: %s", body);
+ }
+}
diff --git a/akka-samples/akka-sample-camel/src/main/java/sample/camel/UntypedConsumer1.java b/akka-samples/akka-sample-camel/src/main/java/sample/camel/UntypedConsumer1.java
new file mode 100644
index 0000000000..39d910fc28
--- /dev/null
+++ b/akka-samples/akka-sample-camel/src/main/java/sample/camel/UntypedConsumer1.java
@@ -0,0 +1,20 @@
+package sample.camel;
+
+import akka.camel.Message;
+import akka.camel.UntypedConsumerActor;
+
+/**
+ * @author Martin Krasser
+ */
+public class UntypedConsumer1 extends UntypedConsumerActor {
+
+ public String getEndpointUri() {
+ return "direct:untyped-consumer-1";
+ }
+
+ public void onReceive(Object message) {
+ Message msg = (Message)message;
+ String body = msg.getBodyAs(String.class);
+ getContext().replySafe(String.format("received %s", body));
+ }
+}
diff --git a/akka-samples/akka-sample-camel/src/main/resources/context-jms.xml b/akka-samples/akka-sample-camel/src/main/resources/context-jms.xml
new file mode 100644
index 0000000000..12e4541be3
--- /dev/null
+++ b/akka-samples/akka-sample-camel/src/main/resources/context-jms.xml
@@ -0,0 +1,27 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/akka-samples/akka-sample-camel/src/main/resources/context-standalone.xml b/akka-samples/akka-sample-camel/src/main/resources/context-standalone.xml
new file mode 100644
index 0000000000..e4edcbc350
--- /dev/null
+++ b/akka-samples/akka-sample-camel/src/main/resources/context-standalone.xml
@@ -0,0 +1,26 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/akka-samples/akka-sample-camel/src/main/scala/sample/camel/Actors.scala b/akka-samples/akka-sample-camel/src/main/scala/sample/camel/Actors.scala
new file mode 100644
index 0000000000..5b176b0888
--- /dev/null
+++ b/akka-samples/akka-sample-camel/src/main/scala/sample/camel/Actors.scala
@@ -0,0 +1,162 @@
+package sample.camel
+
+import org.apache.camel.Exchange
+
+import akka.actor.{Actor, ActorRef, ActorRegistry}
+import akka.camel.{Ack, Failure, Producer, Message, Consumer}
+
+/**
+ * Client-initiated remote actor.
+ */
+class RemoteActor1 extends Actor with Consumer {
+ def endpointUri = "jetty:http://localhost:6644/camel/remote-actor-1"
+
+ protected def receive = {
+ case msg: Message => self.reply(Message("hello %s" format msg.bodyAs[String], Map("sender" -> "remote1")))
+ }
+}
+
+/**
+ * Server-initiated remote actor.
+ */
+class RemoteActor2 extends Actor with Consumer {
+ def endpointUri = "jetty:http://localhost:6644/camel/remote-actor-2"
+
+ protected def receive = {
+ case msg: Message => self.reply(Message("hello %s" format msg.bodyAs[String], Map("sender" -> "remote2")))
+ }
+}
+
+class Producer1 extends Actor with Producer {
+ def endpointUri = "direct:welcome"
+ override def oneway = false // default
+}
+
+class Consumer1 extends Actor with Consumer {
+ def endpointUri = "file:data/input/actor"
+
+ def receive = {
+ case msg: Message => println("received %s" format msg.bodyAs[String])
+ }
+}
+
+class Consumer2 extends Actor with Consumer {
+ def endpointUri = "jetty:http://0.0.0.0:8877/camel/default"
+
+ def receive = {
+ case msg: Message => self.reply("Hello %s" format msg.bodyAs[String])
+ }
+}
+
+class Consumer3(transformer: ActorRef) extends Actor with Consumer {
+ def endpointUri = "jetty:http://0.0.0.0:8877/camel/welcome"
+
+ def receive = {
+ case msg: Message => transformer.forward(msg.setBodyAs[String])
+ }
+}
+
+class Consumer4 extends Actor with Consumer {
+ def endpointUri = "jetty:http://0.0.0.0:8877/camel/stop"
+
+ def receive = {
+ case msg: Message => msg.bodyAs[String] match {
+ case "stop" => {
+ self.reply("Consumer4 stopped")
+ self.stop
+ }
+ case body => self.reply(body)
+ }
+ }
+}
+
+class Consumer5 extends Actor with Consumer {
+ def endpointUri = "jetty:http://0.0.0.0:8877/camel/start"
+
+ def receive = {
+ case _ => {
+ Actor.actorOf[Consumer4].start
+ self.reply("Consumer4 started")
+ }
+ }
+}
+
+class Transformer(producer: ActorRef) extends Actor {
+ protected def receive = {
+ case msg: Message => producer.forward(msg.transformBody( (body: String) => "- %s -" format body))
+ }
+}
+
+class Subscriber(name:String, uri: String) extends Actor with Consumer {
+ def endpointUri = uri
+
+ protected def receive = {
+ case msg: Message => println("%s received: %s" format (name, msg.body))
+ }
+}
+
+class Publisher(name: String, uri: String) extends Actor with Producer {
+ self.id = name
+ def endpointUri = uri
+ override def oneway = true
+}
+
+class PublisherBridge(uri: String, publisher: ActorRef) extends Actor with Consumer {
+ def endpointUri = uri
+
+ protected def receive = {
+ case msg: Message => {
+ publisher ! msg.bodyAs[String]
+ self.reply("message published")
+ }
+ }
+}
+
+class HttpConsumer(producer: ActorRef) extends Actor with Consumer {
+ def endpointUri = "jetty:http://0.0.0.0:8875/"
+
+ protected def receive = {
+ case msg => producer forward msg
+ }
+}
+
+class HttpProducer(transformer: ActorRef) extends Actor with Producer {
+ def endpointUri = "jetty://http://akka.io/?bridgeEndpoint=true"
+
+ override protected def receiveBeforeProduce = {
+ // only keep Exchange.HTTP_PATH message header (which needed by bridge endpoint)
+ case msg: Message => msg.setHeaders(msg.headers(Set(Exchange.HTTP_PATH)))
+ }
+
+ override protected def receiveAfterProduce = {
+ // do not reply but forward result to transformer
+ case msg => transformer forward msg
+ }
+}
+
+class HttpTransformer extends Actor {
+ protected def receive = {
+ case msg: Message => self.reply(msg.transformBody {body: String => body replaceAll ("Akka ", "AKKA ")})
+ case msg: Failure => self.reply(msg)
+ }
+}
+
+class FileConsumer extends Actor with Consumer {
+ def endpointUri = "file:data/input/actor?delete=true"
+ override def autoack = false
+
+ var counter = 0
+
+ def receive = {
+ case msg: Message => {
+ if (counter == 2) {
+ println("received %s" format msg.bodyAs[String])
+ self.reply(Ack)
+ } else {
+ println("rejected %s" format msg.bodyAs[String])
+ counter += 1
+ self.reply(Failure(new Exception("message number %s not accepted" format counter)))
+ }
+ }
+ }
+}
diff --git a/akka-samples/akka-sample-camel/src/main/scala/sample/camel/Boot.scala b/akka-samples/akka-sample-camel/src/main/scala/sample/camel/Boot.scala
new file mode 100644
index 0000000000..a6065a004d
--- /dev/null
+++ b/akka-samples/akka-sample-camel/src/main/scala/sample/camel/Boot.scala
@@ -0,0 +1,108 @@
+package sample.camel
+
+import org.apache.camel.{Exchange, Processor}
+import org.apache.camel.builder.RouteBuilder
+import org.apache.camel.impl.DefaultCamelContext
+import org.apache.camel.spring.spi.ApplicationContextRegistry
+import org.springframework.context.support.ClassPathXmlApplicationContext
+
+import akka.actor.Actor._
+import akka.actor.{TypedActor, Supervisor}
+import akka.camel.CamelContextManager
+import akka.config.Supervision._
+
+/**
+ * @author Martin Krasser
+ */
+class Boot {
+
+ // -----------------------------------------------------------------------
+ // Basic example
+ // -----------------------------------------------------------------------
+
+ actorOf[Consumer1].start
+ actorOf[Consumer2].start
+
+ // Alternatively, use a supervisor for these actors
+ //val supervisor = Supervisor(
+ // SupervisorConfig(
+ // RestartStrategy(OneForOne, 3, 100, List(classOf[Exception])),
+ // Supervise(actorOf[Consumer1], Permanent) ::
+ // Supervise(actorOf[Consumer2], Permanent) :: Nil))
+
+ // -----------------------------------------------------------------------
+ // Custom Camel route example
+ // -----------------------------------------------------------------------
+
+ // Create CamelContext and a Spring-based registry
+ val context = new ClassPathXmlApplicationContext("/context-jms.xml", getClass)
+ val registry = new ApplicationContextRegistry(context)
+
+ // Use a custom Camel context and a custom touter builder
+ CamelContextManager.init(new DefaultCamelContext(registry))
+ CamelContextManager.mandatoryContext.addRoutes(new CustomRouteBuilder)
+
+ val producer = actorOf[Producer1]
+ val mediator = actorOf(new Transformer(producer))
+ val consumer = actorOf(new Consumer3(mediator))
+
+ producer.start
+ mediator.start
+ consumer.start
+
+ // -----------------------------------------------------------------------
+ // Asynchronous consumer-producer example (Akka homepage transformation)
+ // -----------------------------------------------------------------------
+
+ val httpTransformer = actorOf(new HttpTransformer).start
+ val httpProducer = actorOf(new HttpProducer(httpTransformer)).start
+ val httpConsumer = actorOf(new HttpConsumer(httpProducer)).start
+
+ // -----------------------------------------------------------------------
+ // Publish subscribe examples
+ // -----------------------------------------------------------------------
+
+ //
+ // Cometd example commented out because camel-cometd is broken since Camel 2.3
+ //
+
+ //val cometdUri = "cometd://localhost:8111/test/abc?baseResource=file:target"
+ //val cometdSubscriber = actorOf(new Subscriber("cometd-subscriber", cometdUri)).start
+ //val cometdPublisher = actorOf(new Publisher("cometd-publisher", cometdUri)).start
+
+ val jmsUri = "jms:topic:test"
+ val jmsSubscriber1 = actorOf(new Subscriber("jms-subscriber-1", jmsUri)).start
+ val jmsSubscriber2 = actorOf(new Subscriber("jms-subscriber-2", jmsUri)).start
+ val jmsPublisher = actorOf(new Publisher("jms-publisher", jmsUri)).start
+
+ //val cometdPublisherBridge = actorOf(new PublisherBridge("jetty:http://0.0.0.0:8877/camel/pub/cometd", cometdPublisher)).start
+ val jmsPublisherBridge = actorOf(new PublisherBridge("jetty:http://0.0.0.0:8877/camel/pub/jms", jmsPublisher)).start
+
+ // -----------------------------------------------------------------------
+ // Actor un-publishing and re-publishing example
+ // -----------------------------------------------------------------------
+
+ actorOf[Consumer4].start // POSTing "stop" to http://0.0.0.0:8877/camel/stop stops and unpublishes this actor
+ actorOf[Consumer5].start // POSTing any msg to http://0.0.0.0:8877/camel/start starts and published Consumer4 again.
+
+ // -----------------------------------------------------------------------
+ // Active object example
+ // -----------------------------------------------------------------------
+
+ //TypedActor.newInstance(classOf[TypedConsumer1], classOf[TypedConsumer1Impl])
+}
+
+/**
+ * @author Martin Krasser
+ */
+class CustomRouteBuilder extends RouteBuilder {
+ def configure {
+ val actorUri = "actor:%s" format classOf[Consumer2].getName
+ from("jetty:http://0.0.0.0:8877/camel/custom").to(actorUri)
+ from("direct:welcome").process(new Processor() {
+ def process(exchange: Exchange) {
+ exchange.getOut.setBody("Welcome %s" format exchange.getIn.getBody)
+ }
+ })
+ }
+}
diff --git a/akka-samples/akka-sample-camel/src/main/scala/sample/camel/ClientApplication.scala b/akka-samples/akka-sample-camel/src/main/scala/sample/camel/ClientApplication.scala
new file mode 100644
index 0000000000..b5bfe56232
--- /dev/null
+++ b/akka-samples/akka-sample-camel/src/main/scala/sample/camel/ClientApplication.scala
@@ -0,0 +1,26 @@
+package sample.camel
+
+import akka.actor.Actor._
+import akka.actor.TypedActor
+import akka.camel.Message
+
+/**
+ * @author Martin Krasser
+ */
+object ClientApplication extends App {
+
+ val actor1 = remote.actorOf[RemoteActor1]("localhost", 7777).start
+ val actor2 = remote.actorFor("remote2", "localhost", 7777)
+
+ val typedActor1 =
+ TypedActor.newRemoteInstance(classOf[RemoteTypedConsumer1],classOf[RemoteTypedConsumer1Impl], "localhost", 7777)
+
+ val typedActor2 = remote.typedActorFor(classOf[RemoteTypedConsumer2], "remote3", "localhost", 7777)
+
+ println(actor1 !! Message("actor1")) // activates and publishes actor remotely
+ println(actor2 !! Message("actor2")) // actor already activated and published remotely
+
+ println(typedActor1.foo("x1", "y1")) // activates and publishes typed actor methods remotely
+ println(typedActor2.foo("x2", "y2")) // typed actor methods already activated and published remotely
+
+}
diff --git a/akka-samples/akka-sample-camel/src/main/scala/sample/camel/ServerApplication.scala b/akka-samples/akka-sample-camel/src/main/scala/sample/camel/ServerApplication.scala
new file mode 100644
index 0000000000..971416f64a
--- /dev/null
+++ b/akka-samples/akka-sample-camel/src/main/scala/sample/camel/ServerApplication.scala
@@ -0,0 +1,23 @@
+package sample.camel
+
+import akka.actor.Actor._
+import akka.camel.CamelServiceManager
+import akka.actor.{TypedActor}
+
+/**
+ * @author Martin Krasser
+ */
+object ServerApplication extends App {
+ import CamelServiceManager._
+
+ startCamelService
+
+ val ua = actorOf[RemoteActor2].start
+ val ta = TypedActor.newInstance(
+ classOf[RemoteTypedConsumer2],
+ classOf[RemoteTypedConsumer2Impl], 2000)
+
+ remote.start("localhost", 7777)
+ remote.register("remote2", ua)
+ remote.registerTypedActor("remote3", ta)
+}
diff --git a/akka-samples/akka-sample-camel/src/main/scala/sample/camel/StandaloneApplication.scala b/akka-samples/akka-sample-camel/src/main/scala/sample/camel/StandaloneApplication.scala
new file mode 100644
index 0000000000..ff7fb5c9da
--- /dev/null
+++ b/akka-samples/akka-sample-camel/src/main/scala/sample/camel/StandaloneApplication.scala
@@ -0,0 +1,128 @@
+package sample.camel
+
+import org.apache.camel.impl.{DefaultCamelContext, SimpleRegistry}
+import org.apache.camel.builder.RouteBuilder
+import org.apache.camel.spring.spi.ApplicationContextRegistry
+import org.springframework.context.support.ClassPathXmlApplicationContext
+
+import akka.actor.{Actor, ActorRegistry, TypedActor}
+import akka.camel._
+
+/**
+ * @author Martin Krasser
+ */
+object StandaloneApplication extends App {
+ import CamelContextManager._
+ import CamelServiceManager._
+
+ // 'externally' register typed actors
+ val registry = new SimpleRegistry
+ registry.put("sample", TypedActor.newInstance(classOf[BeanIntf], classOf[BeanImpl]))
+
+ // customize CamelContext
+ CamelContextManager.init(new DefaultCamelContext(registry))
+ CamelContextManager.mandatoryContext.addRoutes(new StandaloneApplicationRoute)
+
+ startCamelService
+
+ // access 'externally' registered typed actors
+ assert("hello msg1" == mandatoryContext.createProducerTemplate.requestBody("direct:test", "msg1"))
+
+ mandatoryService.awaitEndpointActivation(1) {
+ // 'internally' register typed actor (requires CamelService)
+ TypedActor.newInstance(classOf[TypedConsumer2], classOf[TypedConsumer2Impl])
+ }
+
+ // access 'internally' (automatically) registered typed-actors
+ // (see @consume annotation value at TypedConsumer2.foo method)
+ assert("default: msg3" == mandatoryContext.createProducerTemplate.requestBody("direct:default", "msg3"))
+
+ stopCamelService
+
+ Actor.registry.shutdownAll
+}
+
+class StandaloneApplicationRoute extends RouteBuilder {
+ def configure = {
+ // route to typed actors (in SimpleRegistry)
+ from("direct:test").to("typed-actor:sample?method=foo")
+ }
+}
+
+object StandaloneSpringApplication extends App {
+ import CamelContextManager._
+
+ // load Spring application context
+ val appctx = new ClassPathXmlApplicationContext("/context-standalone.xml")
+
+ // We cannot use the CamelServiceManager to wait for endpoint activation
+ // because CamelServiceManager is started by the Spring application context.
+ // (and hence is not available for setting expectations on activations). This
+ // will be improved/enabled in upcoming releases.
+ Thread.sleep(1000)
+
+ // access 'externally' registered typed actors with typed-actor component
+ assert("hello msg3" == mandatoryTemplate.requestBody("direct:test3", "msg3"))
+
+ // access auto-started untyped consumer
+ assert("received msg3" == mandatoryTemplate.requestBody("direct:untyped-consumer-1", "msg3"))
+
+ appctx.close
+
+ Actor.registry.shutdownAll
+}
+
+class StandaloneSpringApplicationRoute extends RouteBuilder {
+ def configure = {
+ // routes to typed actor (in ApplicationContextRegistry)
+ from("direct:test3").to("typed-actor:ta?method=foo")
+ }
+}
+
+object StandaloneJmsApplication extends App {
+ import CamelServiceManager._
+
+ val context = new ClassPathXmlApplicationContext("/context-jms.xml")
+ val registry = new ApplicationContextRegistry(context)
+
+ // Init CamelContextManager with custom CamelContext
+ CamelContextManager.init(new DefaultCamelContext(registry))
+
+ startCamelService
+
+ val jmsUri = "jms:topic:test"
+ val jmsPublisher = Actor.actorOf(new Publisher("jms-publisher", jmsUri)).start
+
+ mandatoryService.awaitEndpointActivation(2) {
+ Actor.actorOf(new Subscriber("jms-subscriber-1", jmsUri)).start
+ Actor.actorOf(new Subscriber("jms-subscriber-2", jmsUri)).start
+ }
+
+ // Send 10 messages to via publisher actor
+ for(i <- 1 to 10) {
+ jmsPublisher ! ("Akka rocks (%d)" format i)
+ }
+
+ // Send 10 messages to JMS topic directly
+ for(i <- 1 to 10) {
+ CamelContextManager.mandatoryTemplate.sendBody(jmsUri, "Camel rocks (%d)" format i)
+ }
+
+ // Wait a bit for subscribes to receive messages
+ Thread.sleep(1000)
+
+ stopCamelService
+ Actor.registry.shutdownAll
+}
+
+object StandaloneFileApplication {
+ import CamelServiceManager._
+
+ def main(args: Array[String]) {
+ startCamelService
+ mandatoryService.awaitEndpointActivation(1) {
+ Actor.actorOf(new FileConsumer).start
+ }
+ }
+}
+
diff --git a/akka-samples/akka-sample-camel/src/test/java/sample/camel/SampleRemoteUntypedConsumer.java b/akka-samples/akka-sample-camel/src/test/java/sample/camel/SampleRemoteUntypedConsumer.java
new file mode 100644
index 0000000000..5dea328e59
--- /dev/null
+++ b/akka-samples/akka-sample-camel/src/test/java/sample/camel/SampleRemoteUntypedConsumer.java
@@ -0,0 +1,21 @@
+package sample.camel;
+
+import akka.camel.Message;
+import akka.camel.UntypedConsumerActor;
+
+/**
+ * @author Martin Krasser
+ */
+public class SampleRemoteUntypedConsumer extends UntypedConsumerActor {
+ public String getEndpointUri() {
+ return "direct:remote-untyped-consumer";
+ }
+
+ public void onReceive(Object message) {
+ Message msg = (Message)message;
+ String body = msg.getBodyAs(String.class);
+ String header = msg.getHeaderAs("test", String.class);
+ getContext().replySafe(String.format("%s %s", body, header));
+ }
+
+}
diff --git a/akka-samples/akka-sample-camel/src/test/scala/sample/camel/HttpConcurrencyTestStress.scala b/akka-samples/akka-sample-camel/src/test/scala/sample/camel/HttpConcurrencyTestStress.scala
new file mode 100644
index 0000000000..6568840e19
--- /dev/null
+++ b/akka-samples/akka-sample-camel/src/test/scala/sample/camel/HttpConcurrencyTestStress.scala
@@ -0,0 +1,99 @@
+package sample.camel
+
+import collection.mutable.Set
+
+import java.util.concurrent.CountDownLatch
+
+import org.junit._
+import org.scalatest.junit.JUnitSuite
+
+import akka.actor.Actor._
+import akka.actor.{ActorRegistry, ActorRef, Actor}
+import akka.camel._
+import akka.camel.CamelServiceManager._
+import akka.routing.CyclicIterator
+import akka.routing.Routing._
+
+/**
+ * @author Martin Krasser
+ */
+class HttpConcurrencyTestStress extends JUnitSuite {
+ import HttpConcurrencyTestStress._
+
+ @Test def shouldProcessMessagesConcurrently = {
+ val num = 50
+ val latch1 = new CountDownLatch(num)
+ val latch2 = new CountDownLatch(num)
+ val latch3 = new CountDownLatch(num)
+ val client1 = actorOf(new HttpClientActor("client1", latch1)).start
+ val client2 = actorOf(new HttpClientActor("client2", latch2)).start
+ val client3 = actorOf(new HttpClientActor("client3", latch3)).start
+ for (i <- 1 to num) {
+ client1 ! Message("client1", Map(Message.MessageExchangeId -> i))
+ client2 ! Message("client2", Map(Message.MessageExchangeId -> i))
+ client3 ! Message("client3", Map(Message.MessageExchangeId -> i))
+ }
+ latch1.await
+ latch2.await
+ latch3.await
+ assert(num == (client1 !! "getCorrelationIdCount").as[Int].get)
+ assert(num == (client2 !! "getCorrelationIdCount").as[Int].get)
+ assert(num == (client3 !! "getCorrelationIdCount").as[Int].get)
+ }
+}
+
+object HttpConcurrencyTestStress {
+ @BeforeClass
+ def beforeClass: Unit = {
+ startCamelService
+
+ val workers = for (i <- 1 to 8) yield actorOf[HttpServerWorker].start
+ val balancer = loadBalancerActor(new CyclicIterator(workers.toList))
+
+ service.get.awaitEndpointActivation(1) {
+ actorOf(new HttpServerActor(balancer)).start
+ }
+ }
+
+ @AfterClass
+ def afterClass = {
+ stopCamelService
+ Actor.registry.shutdownAll
+ }
+
+ class HttpClientActor(label: String, latch: CountDownLatch) extends Actor with Producer {
+ def endpointUri = "jetty:http://0.0.0.0:8855/echo"
+ var correlationIds = Set[Any]()
+
+ override protected def receive = {
+ case "getCorrelationIdCount" => self.reply(correlationIds.size)
+ case msg => super.receive(msg)
+ }
+
+ override protected def receiveAfterProduce = {
+ case msg: Message => {
+ val corr = msg.headers(Message.MessageExchangeId)
+ val body = msg.bodyAs[String]
+ correlationIds += corr
+ assert(label == body)
+ latch.countDown
+ print(".")
+ }
+ }
+ }
+
+ class HttpServerActor(balancer: ActorRef) extends Actor with Consumer {
+ def endpointUri = "jetty:http://0.0.0.0:8855/echo"
+ var counter = 0
+
+ def receive = {
+ case msg => balancer forward msg
+ }
+ }
+
+ class HttpServerWorker extends Actor {
+ protected def receive = {
+ case msg => self.reply(msg)
+ }
+ }
+}
diff --git a/akka-samples/akka-sample-camel/src/test/scala/sample/camel/RemoteConsumerTest.scala b/akka-samples/akka-sample-camel/src/test/scala/sample/camel/RemoteConsumerTest.scala
new file mode 100644
index 0000000000..087b46b9b1
--- /dev/null
+++ b/akka-samples/akka-sample-camel/src/test/scala/sample/camel/RemoteConsumerTest.scala
@@ -0,0 +1,99 @@
+package sample.camel
+
+import org.scalatest.{GivenWhenThen, BeforeAndAfterAll, FeatureSpec}
+
+import akka.actor.Actor._
+import akka.actor._
+import akka.camel._
+import akka.remote.netty.NettyRemoteSupport
+import akka.remoteinterface.RemoteServerModule
+
+/**
+ * @author Martin Krasser
+ */
+class RemoteConsumerTest extends FeatureSpec with BeforeAndAfterAll with GivenWhenThen {
+ import CamelServiceManager._
+ import RemoteConsumerTest._
+
+ var server: RemoteServerModule = _
+
+ override protected def beforeAll = {
+ registry.shutdownAll
+
+ startCamelService
+
+ remote.shutdown
+ remote.asInstanceOf[NettyRemoteSupport].optimizeLocal.set(false)
+
+ server = remote.start(host,port)
+ }
+
+ override protected def afterAll = {
+ remote.shutdown
+
+ stopCamelService
+
+ registry.shutdownAll
+ remote.asInstanceOf[NettyRemoteSupport].optimizeLocal.set(true)
+ }
+
+ feature("Publish consumer on remote node") {
+ scenario("access published remote consumer") {
+ given("a consumer actor")
+ val consumer = Actor.actorOf[RemoteConsumer]
+
+ when("registered at the server")
+ assert(mandatoryService.awaitEndpointActivation(1) {
+ remote.register(consumer)
+ })
+
+ then("the published consumer is accessible via its endpoint URI")
+ val response = CamelContextManager.mandatoryTemplate.requestBody("direct:remote-consumer", "test")
+ assert(response === "remote actor: test")
+ }
+ }
+
+ feature("Publish typed consumer on remote node") {
+ scenario("access published remote consumer method") {
+ given("a typed consumer actor")
+ when("registered at the server")
+ assert(mandatoryService.awaitEndpointActivation(1) {
+ remote.registerTypedActor("whatever", TypedActor.newInstance(
+ classOf[SampleRemoteTypedConsumer],
+ classOf[SampleRemoteTypedConsumerImpl]))
+ })
+ then("the published method is accessible via its endpoint URI")
+ val response = CamelContextManager.mandatoryTemplate.requestBody("direct:remote-typed-consumer", "test")
+ assert(response === "remote typed actor: test")
+ }
+ }
+
+ feature("Publish untyped consumer on remote node") {
+ scenario("access published remote untyped consumer") {
+ given("an untyped consumer actor")
+ val consumer = Actor.actorOf(classOf[SampleRemoteUntypedConsumer])
+
+ when("registered at the server")
+ assert(mandatoryService.awaitEndpointActivation(1) {
+ remote.register(consumer)
+ })
+ then("the published untyped consumer is accessible via its endpoint URI")
+ val response = CamelContextManager.mandatoryTemplate.requestBodyAndHeader("direct:remote-untyped-consumer", "a", "test", "b")
+ assert(response === "a b")
+ }
+ }
+}
+
+object RemoteConsumerTest {
+ val host = "localhost"
+ val port = 7774
+
+ class RemoteConsumer extends Actor with Consumer {
+ def endpointUri = "direct:remote-consumer"
+
+ protected def receive = {
+ case "init" => self.reply("done")
+ case m: Message => self.reply("remote actor: %s" format m.body)
+ }
+ }
+}
diff --git a/project/build/AkkaProject.scala b/project/build/AkkaProject.scala
index ec0a765aa1..844be68fe6 100644
--- a/project/build/AkkaProject.scala
+++ b/project/build/AkkaProject.scala
@@ -52,6 +52,7 @@ class AkkaParentProject(info: ProjectInfo) extends ParentProject(info) with Exec
import Repositories._
lazy val jettyModuleConfig = ModuleConfiguration("org.eclipse.jetty", sbt.DefaultMavenRepository)
+ lazy val camelJettyModuleConfig = ModuleConfiguration("org.apache.camel", "camel-jetty", AkkaRepo)
lazy val guiceyFruitModuleConfig = ModuleConfiguration("org.guiceyfruit", GuiceyFruitRepo)
lazy val glassfishModuleConfig = ModuleConfiguration("org.glassfish", GlassfishRepo)
lazy val jbossModuleConfig = ModuleConfiguration("org.jboss", JBossRepo)
@@ -99,12 +100,14 @@ class AkkaParentProject(info: ProjectInfo) extends ParentProject(info) with Exec
// Compile
+ lazy val activemq = "org.apache.activemq" % "activemq-core" % "5.4.2" % "compile" // ApacheV2
lazy val aopalliance = "aopalliance" % "aopalliance" % "1.0" % "compile" //Public domain
lazy val aspectwerkz = "org.codehaus.aspectwerkz" % "aspectwerkz" % "2.2.3" % "compile" //ApacheV2
lazy val beanstalk = "beanstalk" % "beanstalk_client" % "1.4.5" //New BSD
lazy val bookkeeper = "org.apache.hadoop.zookeeper" % "bookkeeper" % ZOOKEEPER_VERSION //ApacheV2
lazy val camel_core = "org.apache.camel" % "camel-core" % CAMEL_VERSION % "compile" //ApacheV2
-
+ lazy val camel_jetty = "org.apache.camel" % "camel-jetty" % "2.7.1.1" % "compile"
+ lazy val camel_jms = "org.apache.camel" % "camel-jms" % CAMEL_VERSION % "compile" //ApacheV2
lazy val commons_codec = "commons-codec" % "commons-codec" % "1.4" % "compile" //ApacheV2
lazy val commons_io = "commons-io" % "commons-io" % "2.0.1" % "compile" //ApacheV2
lazy val javax_servlet_30 = "org.glassfish" % "javax.servlet" % JAVAX_SERVLET_VERSION % "provided" //CDDL v1
@@ -134,7 +137,7 @@ class AkkaParentProject(info: ProjectInfo) extends ParentProject(info) with Exec
lazy val slf4j = "org.slf4j" % "slf4j-api" % SLF4J_VERSION // MIT
lazy val spring_beans = "org.springframework" % "spring-beans" % SPRING_VERSION % "compile" //ApacheV2
lazy val spring_context = "org.springframework" % "spring-context" % SPRING_VERSION % "compile" //ApacheV2
-
+ lazy val spring_jms = "org.springframework" % "spring-jms" % SPRING_VERSION % "compile" //ApacheV2
lazy val stax_api = "javax.xml.stream" % "stax-api" % "1.0-2" % "compile" //ApacheV2
lazy val logback = "ch.qos.logback" % "logback-classic" % "0.9.28" % "runtime" //MIT
lazy val log4j = "log4j" % "log4j" % "1.2.15" //ApacheV2
@@ -604,6 +607,22 @@ class AkkaParentProject(info: ProjectInfo) extends ParentProject(info) with Exec
class AkkaSampleFSMProject(info: ProjectInfo) extends AkkaDefaultProject(info)
+ class AkkaSampleCamelProject(info: ProjectInfo) extends AkkaDefaultProject(info) {
+ val activemq = Dependencies.activemq
+ val camel_jetty = Dependencies.camel_jetty
+ val camel_jms = Dependencies.camel_jms
+ val spring_jms = Dependencies.spring_jms
+ val commons_codec = Dependencies.commons_codec
+
+ override def ivyXML = {
+
+
+
+ }
+
+ override def testOptions = createTestFilter( _.endsWith("Test"))
+ }
+
class AkkaSampleOsgiProject(info: ProjectInfo) extends AkkaDefaultProject(info) with BNDPlugin {
val osgiCore = Dependencies.osgi_core
override protected def bndPrivatePackage = List("sample.osgi.*")
@@ -624,6 +643,10 @@ class AkkaParentProject(info: ProjectInfo) extends ParentProject(info) with Exec
lazy val akka_sample_osgi = project("akka-sample-osgi", "akka-sample-osgi",
new AkkaSampleOsgiProject(_), akka_actor)
+ lazy val akka_sample_camel = project("akka-sample-camel", "akka-sample-camel",
+ new AkkaSampleCamelProject(_), akka_actor, akka_kernel)
+
+
lazy val publishRelease = {
val releaseConfiguration = new DefaultPublishConfiguration(localReleaseRepository, "release", false)
publishTask(publishIvyModule, releaseConfiguration) dependsOn (deliver, publishLocal, makePom)