copied over akka-sample-camel

This commit is contained in:
ticktock 2011-05-25 13:47:36 -07:00
parent 6f1ff4efdb
commit 0770e54e4d
24 changed files with 994 additions and 2 deletions

View file

@ -0,0 +1,20 @@
####################
# Akka Config File #
####################
akka {
version = "2.0-SNAPSHOT"
enabled-modules = ["camel", "http"]
time-unit = "seconds"
event-handlers = ["akka.event.EventHandler$DefaultListener"]
boot = ["sample.camel.Boot"]
http {
hostname = "localhost"
port = 9998
}
}

View file

@ -0,0 +1,65 @@
<?xml version="1.0"?>
<!DOCTYPE Configure PUBLIC "-//Jetty//Configure//EN" "http://www.eclipse.org/jetty/configure.dtd">
<Configure id="Server" class="org.eclipse.jetty.server.Server">
<!-- =========================================================== -->
<!-- Server Thread Pool -->
<!-- =========================================================== -->
<Set name="ThreadPool">
<New class="org.eclipse.jetty.util.thread.ExecutorThreadPool">
</New>
</Set>
<!-- =========================================================== -->
<!-- Set connectors -->
<!-- =========================================================== -->
<Call name="addConnector">
<Arg>
<New class="org.eclipse.jetty.server.nio.SelectChannelConnector">
<Set name="host"><SystemProperty name="jetty.host" /></Set>
<Set name="port"><SystemProperty name="jetty.port" default="8080"/></Set>
<Set name="maxIdleTime">300000</Set>
<Set name="Acceptors">2</Set>
<Set name="statsOn">false</Set>
<Set name="confidentialPort">8443</Set>
<Set name="lowResourcesConnections">20000</Set>
<Set name="lowResourcesMaxIdleTime">5000</Set>
</New>
</Arg>
</Call>
<!-- =========================================================== -->
<!-- Set handler -->
<!-- =========================================================== -->
<Set name="handler">
<New id="Handlers" class="org.eclipse.jetty.server.handler.HandlerCollection">
<Set name="handlers">
<Array type="org.eclipse.jetty.server.Handler">
<Item>
<New id="AkkaRestHandler" class="org.eclipse.jetty.servlet.ServletContextHandler">
<Set name="contextPath">/</Set>
<Call name="addServlet">
<Arg>akka.http.AkkaRestServlet</Arg>
<Arg>/*</Arg>
</Call>
</New>
</Item>
<Item>
<New id="DefaultHandler" class="org.eclipse.jetty.server.handler.DefaultHandler"/>
</Item>
</Array>
</Set>
</New>
</Set>
<!-- =========================================================== -->
<!-- extra options -->
<!-- =========================================================== -->
<Set name="stopAtShutdown">true</Set>
<Set name="sendServerVersion">true</Set>
<Set name="sendDateHeader">true</Set>
<Set name="gracefulShutdown">1000</Set>
</Configure>

View file

@ -0,0 +1,13 @@
package sample.camel;
import akka.actor.TypedActor;
/**
* @author Martin Krasser
*/
public class BeanImpl extends TypedActor implements BeanIntf {
public String foo(String s) {
return "hello " + s;
}
}

View file

@ -0,0 +1,10 @@
package sample.camel;
/**
* @author Martin Krasser
*/
public interface BeanIntf {
public String foo(String s);
}

View file

@ -0,0 +1,15 @@
package sample.camel;
import org.apache.camel.Body;
import org.apache.camel.Header;
import akka.camel.consume;
/**
* @author Martin Krasser
*/
public interface RemoteTypedConsumer1 {
@consume("jetty:http://localhost:6644/camel/remote-typed-actor-1")
public String foo(@Body String body, @Header("name") String header);
}

View file

@ -0,0 +1,13 @@
package sample.camel;
import akka.actor.TypedActor;
/**
* @author Martin Krasser
*/
public class RemoteTypedConsumer1Impl extends TypedActor implements RemoteTypedConsumer1 {
public String foo(String body, String header) {
return String.format("remote1: body=%s header=%s", body, header);
}
}

View file

@ -0,0 +1,15 @@
package sample.camel;
import org.apache.camel.Body;
import org.apache.camel.Header;
import akka.camel.consume;
/**
* @author Martin Krasser
*/
public interface RemoteTypedConsumer2 {
@consume("jetty:http://localhost:6644/camel/remote-typed-actor-2")
public String foo(@Body String body, @Header("name") String header);
}

View file

@ -0,0 +1,14 @@
package sample.camel;
import akka.actor.TypedActor;
/**
* @author Martin Krasser
*/
public class RemoteTypedConsumer2Impl extends TypedActor implements RemoteTypedConsumer2 {
public String foo(String body, String header) {
return String.format("remote2: body=%s header=%s", body, header);
}
}

View file

@ -0,0 +1,17 @@
package sample.camel;
import org.apache.camel.Body;
import org.apache.camel.Header;
import akka.camel.consume;
/**
* @author Martin Krasser
*/
public interface TypedConsumer1 {
@consume("file:data/input/typed-actor")
public void foo(String body);
@consume("jetty:http://0.0.0.0:8877/camel/typed-actor")
public String bar(@Body String body, @Header("name") String header);
}

View file

@ -0,0 +1,21 @@
package sample.camel;
import org.apache.camel.Body;
import org.apache.camel.Header;
import akka.actor.TypedActor;
/**
* @author Martin Krasser
*/
public class TypedConsumer1Impl extends TypedActor implements TypedConsumer1 {
public void foo(String body) {
System.out.println("Received message:");
System.out.println(body);
}
public String bar(@Body String body, @Header("name") String header) {
return String.format("body=%s header=%s", body, header);
}
}

View file

@ -0,0 +1,14 @@
package sample.camel;
import org.apache.camel.Body;
import org.apache.camel.Header;
import akka.camel.consume;
/**
* @author Martin Krasser
*/
public interface TypedConsumer2 {
@consume("direct:default")
public String foo(String body);
}

View file

@ -0,0 +1,13 @@
package sample.camel;
import akka.actor.TypedActor;
/**
* @author Martin Krasser
*/
public class TypedConsumer2Impl extends TypedActor implements TypedConsumer2 {
public String foo(String body) {
return String.format("default: %s", body);
}
}

View file

@ -0,0 +1,20 @@
package sample.camel;
import akka.camel.Message;
import akka.camel.UntypedConsumerActor;
/**
* @author Martin Krasser
*/
public class UntypedConsumer1 extends UntypedConsumerActor {
public String getEndpointUri() {
return "direct:untyped-consumer-1";
}
public void onReceive(Object message) {
Message msg = (Message)message;
String body = msg.getBodyAs(String.class);
getContext().replySafe(String.format("received %s", body));
}
}

View file

@ -0,0 +1,27 @@
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-2.5.xsd">
<!-- ================================================================== -->
<!-- Camel JMS component and ActiveMQ setup -->
<!-- ================================================================== -->
<bean id="jms" class="org.apache.camel.component.jms.JmsComponent">
<property name="configuration" ref="jmsConfig"/>
</bean>
<bean id="jmsConfig" class="org.apache.camel.component.jms.JmsConfiguration">
<property name="connectionFactory" ref="singleConnectionFactory"/>
</bean>
<bean id="singleConnectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
<property name="targetConnectionFactory" ref="jmsConnectionFactory"/>
</bean>
<bean id="jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="vm://testbroker"/>
</bean>
</beans>

View file

@ -0,0 +1,26 @@
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:akka="http://akka.io/schema/akka"
xmlns:camel="http://camel.apache.org/schema/spring"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
http://akka.io/schema/akka
http://akka.io/akka-2.0-SNAPSHOT.xsd
http://camel.apache.org/schema/spring
http://camel.apache.org/schema/spring/camel-spring.xsd">
<bean id="routeBuilder" class="sample.camel.StandaloneSpringApplicationRoute" />
<camel:camelContext id="camelContext">
<camel:routeBuilder ref="routeBuilder" />
</camel:camelContext>
<akka:camel-service id="service">
<akka:camel-context ref="camelContext" />
</akka:camel-service>
<akka:typed-actor id="ta" interface="sample.camel.BeanIntf" implementation="sample.camel.BeanImpl" timeout="1000" />
<akka:untyped-actor id="ua" implementation="sample.camel.UntypedConsumer1" scope="singleton" autostart="true" />
</beans>

View file

@ -0,0 +1,162 @@
package sample.camel
import org.apache.camel.Exchange
import akka.actor.{Actor, ActorRef, ActorRegistry}
import akka.camel.{Ack, Failure, Producer, Message, Consumer}
/**
* Client-initiated remote actor.
*/
class RemoteActor1 extends Actor with Consumer {
def endpointUri = "jetty:http://localhost:6644/camel/remote-actor-1"
protected def receive = {
case msg: Message => self.reply(Message("hello %s" format msg.bodyAs[String], Map("sender" -> "remote1")))
}
}
/**
* Server-initiated remote actor.
*/
class RemoteActor2 extends Actor with Consumer {
def endpointUri = "jetty:http://localhost:6644/camel/remote-actor-2"
protected def receive = {
case msg: Message => self.reply(Message("hello %s" format msg.bodyAs[String], Map("sender" -> "remote2")))
}
}
class Producer1 extends Actor with Producer {
def endpointUri = "direct:welcome"
override def oneway = false // default
}
class Consumer1 extends Actor with Consumer {
def endpointUri = "file:data/input/actor"
def receive = {
case msg: Message => println("received %s" format msg.bodyAs[String])
}
}
class Consumer2 extends Actor with Consumer {
def endpointUri = "jetty:http://0.0.0.0:8877/camel/default"
def receive = {
case msg: Message => self.reply("Hello %s" format msg.bodyAs[String])
}
}
class Consumer3(transformer: ActorRef) extends Actor with Consumer {
def endpointUri = "jetty:http://0.0.0.0:8877/camel/welcome"
def receive = {
case msg: Message => transformer.forward(msg.setBodyAs[String])
}
}
class Consumer4 extends Actor with Consumer {
def endpointUri = "jetty:http://0.0.0.0:8877/camel/stop"
def receive = {
case msg: Message => msg.bodyAs[String] match {
case "stop" => {
self.reply("Consumer4 stopped")
self.stop
}
case body => self.reply(body)
}
}
}
class Consumer5 extends Actor with Consumer {
def endpointUri = "jetty:http://0.0.0.0:8877/camel/start"
def receive = {
case _ => {
Actor.actorOf[Consumer4].start
self.reply("Consumer4 started")
}
}
}
class Transformer(producer: ActorRef) extends Actor {
protected def receive = {
case msg: Message => producer.forward(msg.transformBody( (body: String) => "- %s -" format body))
}
}
class Subscriber(name:String, uri: String) extends Actor with Consumer {
def endpointUri = uri
protected def receive = {
case msg: Message => println("%s received: %s" format (name, msg.body))
}
}
class Publisher(name: String, uri: String) extends Actor with Producer {
self.id = name
def endpointUri = uri
override def oneway = true
}
class PublisherBridge(uri: String, publisher: ActorRef) extends Actor with Consumer {
def endpointUri = uri
protected def receive = {
case msg: Message => {
publisher ! msg.bodyAs[String]
self.reply("message published")
}
}
}
class HttpConsumer(producer: ActorRef) extends Actor with Consumer {
def endpointUri = "jetty:http://0.0.0.0:8875/"
protected def receive = {
case msg => producer forward msg
}
}
class HttpProducer(transformer: ActorRef) extends Actor with Producer {
def endpointUri = "jetty://http://akka.io/?bridgeEndpoint=true"
override protected def receiveBeforeProduce = {
// only keep Exchange.HTTP_PATH message header (which needed by bridge endpoint)
case msg: Message => msg.setHeaders(msg.headers(Set(Exchange.HTTP_PATH)))
}
override protected def receiveAfterProduce = {
// do not reply but forward result to transformer
case msg => transformer forward msg
}
}
class HttpTransformer extends Actor {
protected def receive = {
case msg: Message => self.reply(msg.transformBody {body: String => body replaceAll ("Akka ", "AKKA ")})
case msg: Failure => self.reply(msg)
}
}
class FileConsumer extends Actor with Consumer {
def endpointUri = "file:data/input/actor?delete=true"
override def autoack = false
var counter = 0
def receive = {
case msg: Message => {
if (counter == 2) {
println("received %s" format msg.bodyAs[String])
self.reply(Ack)
} else {
println("rejected %s" format msg.bodyAs[String])
counter += 1
self.reply(Failure(new Exception("message number %s not accepted" format counter)))
}
}
}
}

View file

@ -0,0 +1,108 @@
package sample.camel
import org.apache.camel.{Exchange, Processor}
import org.apache.camel.builder.RouteBuilder
import org.apache.camel.impl.DefaultCamelContext
import org.apache.camel.spring.spi.ApplicationContextRegistry
import org.springframework.context.support.ClassPathXmlApplicationContext
import akka.actor.Actor._
import akka.actor.{TypedActor, Supervisor}
import akka.camel.CamelContextManager
import akka.config.Supervision._
/**
* @author Martin Krasser
*/
class Boot {
// -----------------------------------------------------------------------
// Basic example
// -----------------------------------------------------------------------
actorOf[Consumer1].start
actorOf[Consumer2].start
// Alternatively, use a supervisor for these actors
//val supervisor = Supervisor(
// SupervisorConfig(
// RestartStrategy(OneForOne, 3, 100, List(classOf[Exception])),
// Supervise(actorOf[Consumer1], Permanent) ::
// Supervise(actorOf[Consumer2], Permanent) :: Nil))
// -----------------------------------------------------------------------
// Custom Camel route example
// -----------------------------------------------------------------------
// Create CamelContext and a Spring-based registry
val context = new ClassPathXmlApplicationContext("/context-jms.xml", getClass)
val registry = new ApplicationContextRegistry(context)
// Use a custom Camel context and a custom touter builder
CamelContextManager.init(new DefaultCamelContext(registry))
CamelContextManager.mandatoryContext.addRoutes(new CustomRouteBuilder)
val producer = actorOf[Producer1]
val mediator = actorOf(new Transformer(producer))
val consumer = actorOf(new Consumer3(mediator))
producer.start
mediator.start
consumer.start
// -----------------------------------------------------------------------
// Asynchronous consumer-producer example (Akka homepage transformation)
// -----------------------------------------------------------------------
val httpTransformer = actorOf(new HttpTransformer).start
val httpProducer = actorOf(new HttpProducer(httpTransformer)).start
val httpConsumer = actorOf(new HttpConsumer(httpProducer)).start
// -----------------------------------------------------------------------
// Publish subscribe examples
// -----------------------------------------------------------------------
//
// Cometd example commented out because camel-cometd is broken since Camel 2.3
//
//val cometdUri = "cometd://localhost:8111/test/abc?baseResource=file:target"
//val cometdSubscriber = actorOf(new Subscriber("cometd-subscriber", cometdUri)).start
//val cometdPublisher = actorOf(new Publisher("cometd-publisher", cometdUri)).start
val jmsUri = "jms:topic:test"
val jmsSubscriber1 = actorOf(new Subscriber("jms-subscriber-1", jmsUri)).start
val jmsSubscriber2 = actorOf(new Subscriber("jms-subscriber-2", jmsUri)).start
val jmsPublisher = actorOf(new Publisher("jms-publisher", jmsUri)).start
//val cometdPublisherBridge = actorOf(new PublisherBridge("jetty:http://0.0.0.0:8877/camel/pub/cometd", cometdPublisher)).start
val jmsPublisherBridge = actorOf(new PublisherBridge("jetty:http://0.0.0.0:8877/camel/pub/jms", jmsPublisher)).start
// -----------------------------------------------------------------------
// Actor un-publishing and re-publishing example
// -----------------------------------------------------------------------
actorOf[Consumer4].start // POSTing "stop" to http://0.0.0.0:8877/camel/stop stops and unpublishes this actor
actorOf[Consumer5].start // POSTing any msg to http://0.0.0.0:8877/camel/start starts and published Consumer4 again.
// -----------------------------------------------------------------------
// Active object example
// -----------------------------------------------------------------------
//TypedActor.newInstance(classOf[TypedConsumer1], classOf[TypedConsumer1Impl])
}
/**
* @author Martin Krasser
*/
class CustomRouteBuilder extends RouteBuilder {
def configure {
val actorUri = "actor:%s" format classOf[Consumer2].getName
from("jetty:http://0.0.0.0:8877/camel/custom").to(actorUri)
from("direct:welcome").process(new Processor() {
def process(exchange: Exchange) {
exchange.getOut.setBody("Welcome %s" format exchange.getIn.getBody)
}
})
}
}

View file

@ -0,0 +1,26 @@
package sample.camel
import akka.actor.Actor._
import akka.actor.TypedActor
import akka.camel.Message
/**
* @author Martin Krasser
*/
object ClientApplication extends App {
val actor1 = remote.actorOf[RemoteActor1]("localhost", 7777).start
val actor2 = remote.actorFor("remote2", "localhost", 7777)
val typedActor1 =
TypedActor.newRemoteInstance(classOf[RemoteTypedConsumer1],classOf[RemoteTypedConsumer1Impl], "localhost", 7777)
val typedActor2 = remote.typedActorFor(classOf[RemoteTypedConsumer2], "remote3", "localhost", 7777)
println(actor1 !! Message("actor1")) // activates and publishes actor remotely
println(actor2 !! Message("actor2")) // actor already activated and published remotely
println(typedActor1.foo("x1", "y1")) // activates and publishes typed actor methods remotely
println(typedActor2.foo("x2", "y2")) // typed actor methods already activated and published remotely
}

View file

@ -0,0 +1,23 @@
package sample.camel
import akka.actor.Actor._
import akka.camel.CamelServiceManager
import akka.actor.{TypedActor}
/**
* @author Martin Krasser
*/
object ServerApplication extends App {
import CamelServiceManager._
startCamelService
val ua = actorOf[RemoteActor2].start
val ta = TypedActor.newInstance(
classOf[RemoteTypedConsumer2],
classOf[RemoteTypedConsumer2Impl], 2000)
remote.start("localhost", 7777)
remote.register("remote2", ua)
remote.registerTypedActor("remote3", ta)
}

View file

@ -0,0 +1,128 @@
package sample.camel
import org.apache.camel.impl.{DefaultCamelContext, SimpleRegistry}
import org.apache.camel.builder.RouteBuilder
import org.apache.camel.spring.spi.ApplicationContextRegistry
import org.springframework.context.support.ClassPathXmlApplicationContext
import akka.actor.{Actor, ActorRegistry, TypedActor}
import akka.camel._
/**
* @author Martin Krasser
*/
object StandaloneApplication extends App {
import CamelContextManager._
import CamelServiceManager._
// 'externally' register typed actors
val registry = new SimpleRegistry
registry.put("sample", TypedActor.newInstance(classOf[BeanIntf], classOf[BeanImpl]))
// customize CamelContext
CamelContextManager.init(new DefaultCamelContext(registry))
CamelContextManager.mandatoryContext.addRoutes(new StandaloneApplicationRoute)
startCamelService
// access 'externally' registered typed actors
assert("hello msg1" == mandatoryContext.createProducerTemplate.requestBody("direct:test", "msg1"))
mandatoryService.awaitEndpointActivation(1) {
// 'internally' register typed actor (requires CamelService)
TypedActor.newInstance(classOf[TypedConsumer2], classOf[TypedConsumer2Impl])
}
// access 'internally' (automatically) registered typed-actors
// (see @consume annotation value at TypedConsumer2.foo method)
assert("default: msg3" == mandatoryContext.createProducerTemplate.requestBody("direct:default", "msg3"))
stopCamelService
Actor.registry.shutdownAll
}
class StandaloneApplicationRoute extends RouteBuilder {
def configure = {
// route to typed actors (in SimpleRegistry)
from("direct:test").to("typed-actor:sample?method=foo")
}
}
object StandaloneSpringApplication extends App {
import CamelContextManager._
// load Spring application context
val appctx = new ClassPathXmlApplicationContext("/context-standalone.xml")
// We cannot use the CamelServiceManager to wait for endpoint activation
// because CamelServiceManager is started by the Spring application context.
// (and hence is not available for setting expectations on activations). This
// will be improved/enabled in upcoming releases.
Thread.sleep(1000)
// access 'externally' registered typed actors with typed-actor component
assert("hello msg3" == mandatoryTemplate.requestBody("direct:test3", "msg3"))
// access auto-started untyped consumer
assert("received msg3" == mandatoryTemplate.requestBody("direct:untyped-consumer-1", "msg3"))
appctx.close
Actor.registry.shutdownAll
}
class StandaloneSpringApplicationRoute extends RouteBuilder {
def configure = {
// routes to typed actor (in ApplicationContextRegistry)
from("direct:test3").to("typed-actor:ta?method=foo")
}
}
object StandaloneJmsApplication extends App {
import CamelServiceManager._
val context = new ClassPathXmlApplicationContext("/context-jms.xml")
val registry = new ApplicationContextRegistry(context)
// Init CamelContextManager with custom CamelContext
CamelContextManager.init(new DefaultCamelContext(registry))
startCamelService
val jmsUri = "jms:topic:test"
val jmsPublisher = Actor.actorOf(new Publisher("jms-publisher", jmsUri)).start
mandatoryService.awaitEndpointActivation(2) {
Actor.actorOf(new Subscriber("jms-subscriber-1", jmsUri)).start
Actor.actorOf(new Subscriber("jms-subscriber-2", jmsUri)).start
}
// Send 10 messages to via publisher actor
for(i <- 1 to 10) {
jmsPublisher ! ("Akka rocks (%d)" format i)
}
// Send 10 messages to JMS topic directly
for(i <- 1 to 10) {
CamelContextManager.mandatoryTemplate.sendBody(jmsUri, "Camel rocks (%d)" format i)
}
// Wait a bit for subscribes to receive messages
Thread.sleep(1000)
stopCamelService
Actor.registry.shutdownAll
}
object StandaloneFileApplication {
import CamelServiceManager._
def main(args: Array[String]) {
startCamelService
mandatoryService.awaitEndpointActivation(1) {
Actor.actorOf(new FileConsumer).start
}
}
}

View file

@ -0,0 +1,21 @@
package sample.camel;
import akka.camel.Message;
import akka.camel.UntypedConsumerActor;
/**
* @author Martin Krasser
*/
public class SampleRemoteUntypedConsumer extends UntypedConsumerActor {
public String getEndpointUri() {
return "direct:remote-untyped-consumer";
}
public void onReceive(Object message) {
Message msg = (Message)message;
String body = msg.getBodyAs(String.class);
String header = msg.getHeaderAs("test", String.class);
getContext().replySafe(String.format("%s %s", body, header));
}
}

View file

@ -0,0 +1,99 @@
package sample.camel
import collection.mutable.Set
import java.util.concurrent.CountDownLatch
import org.junit._
import org.scalatest.junit.JUnitSuite
import akka.actor.Actor._
import akka.actor.{ActorRegistry, ActorRef, Actor}
import akka.camel._
import akka.camel.CamelServiceManager._
import akka.routing.CyclicIterator
import akka.routing.Routing._
/**
* @author Martin Krasser
*/
class HttpConcurrencyTestStress extends JUnitSuite {
import HttpConcurrencyTestStress._
@Test def shouldProcessMessagesConcurrently = {
val num = 50
val latch1 = new CountDownLatch(num)
val latch2 = new CountDownLatch(num)
val latch3 = new CountDownLatch(num)
val client1 = actorOf(new HttpClientActor("client1", latch1)).start
val client2 = actorOf(new HttpClientActor("client2", latch2)).start
val client3 = actorOf(new HttpClientActor("client3", latch3)).start
for (i <- 1 to num) {
client1 ! Message("client1", Map(Message.MessageExchangeId -> i))
client2 ! Message("client2", Map(Message.MessageExchangeId -> i))
client3 ! Message("client3", Map(Message.MessageExchangeId -> i))
}
latch1.await
latch2.await
latch3.await
assert(num == (client1 !! "getCorrelationIdCount").as[Int].get)
assert(num == (client2 !! "getCorrelationIdCount").as[Int].get)
assert(num == (client3 !! "getCorrelationIdCount").as[Int].get)
}
}
object HttpConcurrencyTestStress {
@BeforeClass
def beforeClass: Unit = {
startCamelService
val workers = for (i <- 1 to 8) yield actorOf[HttpServerWorker].start
val balancer = loadBalancerActor(new CyclicIterator(workers.toList))
service.get.awaitEndpointActivation(1) {
actorOf(new HttpServerActor(balancer)).start
}
}
@AfterClass
def afterClass = {
stopCamelService
Actor.registry.shutdownAll
}
class HttpClientActor(label: String, latch: CountDownLatch) extends Actor with Producer {
def endpointUri = "jetty:http://0.0.0.0:8855/echo"
var correlationIds = Set[Any]()
override protected def receive = {
case "getCorrelationIdCount" => self.reply(correlationIds.size)
case msg => super.receive(msg)
}
override protected def receiveAfterProduce = {
case msg: Message => {
val corr = msg.headers(Message.MessageExchangeId)
val body = msg.bodyAs[String]
correlationIds += corr
assert(label == body)
latch.countDown
print(".")
}
}
}
class HttpServerActor(balancer: ActorRef) extends Actor with Consumer {
def endpointUri = "jetty:http://0.0.0.0:8855/echo"
var counter = 0
def receive = {
case msg => balancer forward msg
}
}
class HttpServerWorker extends Actor {
protected def receive = {
case msg => self.reply(msg)
}
}
}

View file

@ -0,0 +1,99 @@
package sample.camel
import org.scalatest.{GivenWhenThen, BeforeAndAfterAll, FeatureSpec}
import akka.actor.Actor._
import akka.actor._
import akka.camel._
import akka.remote.netty.NettyRemoteSupport
import akka.remoteinterface.RemoteServerModule
/**
* @author Martin Krasser
*/
class RemoteConsumerTest extends FeatureSpec with BeforeAndAfterAll with GivenWhenThen {
import CamelServiceManager._
import RemoteConsumerTest._
var server: RemoteServerModule = _
override protected def beforeAll = {
registry.shutdownAll
startCamelService
remote.shutdown
remote.asInstanceOf[NettyRemoteSupport].optimizeLocal.set(false)
server = remote.start(host,port)
}
override protected def afterAll = {
remote.shutdown
stopCamelService
registry.shutdownAll
remote.asInstanceOf[NettyRemoteSupport].optimizeLocal.set(true)
}
feature("Publish consumer on remote node") {
scenario("access published remote consumer") {
given("a consumer actor")
val consumer = Actor.actorOf[RemoteConsumer]
when("registered at the server")
assert(mandatoryService.awaitEndpointActivation(1) {
remote.register(consumer)
})
then("the published consumer is accessible via its endpoint URI")
val response = CamelContextManager.mandatoryTemplate.requestBody("direct:remote-consumer", "test")
assert(response === "remote actor: test")
}
}
feature("Publish typed consumer on remote node") {
scenario("access published remote consumer method") {
given("a typed consumer actor")
when("registered at the server")
assert(mandatoryService.awaitEndpointActivation(1) {
remote.registerTypedActor("whatever", TypedActor.newInstance(
classOf[SampleRemoteTypedConsumer],
classOf[SampleRemoteTypedConsumerImpl]))
})
then("the published method is accessible via its endpoint URI")
val response = CamelContextManager.mandatoryTemplate.requestBody("direct:remote-typed-consumer", "test")
assert(response === "remote typed actor: test")
}
}
feature("Publish untyped consumer on remote node") {
scenario("access published remote untyped consumer") {
given("an untyped consumer actor")
val consumer = Actor.actorOf(classOf[SampleRemoteUntypedConsumer])
when("registered at the server")
assert(mandatoryService.awaitEndpointActivation(1) {
remote.register(consumer)
})
then("the published untyped consumer is accessible via its endpoint URI")
val response = CamelContextManager.mandatoryTemplate.requestBodyAndHeader("direct:remote-untyped-consumer", "a", "test", "b")
assert(response === "a b")
}
}
}
object RemoteConsumerTest {
val host = "localhost"
val port = 7774
class RemoteConsumer extends Actor with Consumer {
def endpointUri = "direct:remote-consumer"
protected def receive = {
case "init" => self.reply("done")
case m: Message => self.reply("remote actor: %s" format m.body)
}
}
}

View file

@ -52,6 +52,7 @@ class AkkaParentProject(info: ProjectInfo) extends ParentProject(info) with Exec
import Repositories._
lazy val jettyModuleConfig = ModuleConfiguration("org.eclipse.jetty", sbt.DefaultMavenRepository)
lazy val camelJettyModuleConfig = ModuleConfiguration("org.apache.camel", "camel-jetty", AkkaRepo)
lazy val guiceyFruitModuleConfig = ModuleConfiguration("org.guiceyfruit", GuiceyFruitRepo)
lazy val glassfishModuleConfig = ModuleConfiguration("org.glassfish", GlassfishRepo)
lazy val jbossModuleConfig = ModuleConfiguration("org.jboss", JBossRepo)
@ -99,12 +100,14 @@ class AkkaParentProject(info: ProjectInfo) extends ParentProject(info) with Exec
// Compile
lazy val activemq = "org.apache.activemq" % "activemq-core" % "5.4.2" % "compile" // ApacheV2
lazy val aopalliance = "aopalliance" % "aopalliance" % "1.0" % "compile" //Public domain
lazy val aspectwerkz = "org.codehaus.aspectwerkz" % "aspectwerkz" % "2.2.3" % "compile" //ApacheV2
lazy val beanstalk = "beanstalk" % "beanstalk_client" % "1.4.5" //New BSD
lazy val bookkeeper = "org.apache.hadoop.zookeeper" % "bookkeeper" % ZOOKEEPER_VERSION //ApacheV2
lazy val camel_core = "org.apache.camel" % "camel-core" % CAMEL_VERSION % "compile" //ApacheV2
lazy val camel_jetty = "org.apache.camel" % "camel-jetty" % "2.7.1.1" % "compile"
lazy val camel_jms = "org.apache.camel" % "camel-jms" % CAMEL_VERSION % "compile" //ApacheV2
lazy val commons_codec = "commons-codec" % "commons-codec" % "1.4" % "compile" //ApacheV2
lazy val commons_io = "commons-io" % "commons-io" % "2.0.1" % "compile" //ApacheV2
lazy val javax_servlet_30 = "org.glassfish" % "javax.servlet" % JAVAX_SERVLET_VERSION % "provided" //CDDL v1
@ -134,7 +137,7 @@ class AkkaParentProject(info: ProjectInfo) extends ParentProject(info) with Exec
lazy val slf4j = "org.slf4j" % "slf4j-api" % SLF4J_VERSION // MIT
lazy val spring_beans = "org.springframework" % "spring-beans" % SPRING_VERSION % "compile" //ApacheV2
lazy val spring_context = "org.springframework" % "spring-context" % SPRING_VERSION % "compile" //ApacheV2
lazy val spring_jms = "org.springframework" % "spring-jms" % SPRING_VERSION % "compile" //ApacheV2
lazy val stax_api = "javax.xml.stream" % "stax-api" % "1.0-2" % "compile" //ApacheV2
lazy val logback = "ch.qos.logback" % "logback-classic" % "0.9.28" % "runtime" //MIT
lazy val log4j = "log4j" % "log4j" % "1.2.15" //ApacheV2
@ -604,6 +607,22 @@ class AkkaParentProject(info: ProjectInfo) extends ParentProject(info) with Exec
class AkkaSampleFSMProject(info: ProjectInfo) extends AkkaDefaultProject(info)
class AkkaSampleCamelProject(info: ProjectInfo) extends AkkaDefaultProject(info) {
val activemq = Dependencies.activemq
val camel_jetty = Dependencies.camel_jetty
val camel_jms = Dependencies.camel_jms
val spring_jms = Dependencies.spring_jms
val commons_codec = Dependencies.commons_codec
override def ivyXML = {
<dependencies>
<exclude module="slf4j-api" />
</dependencies>
}
override def testOptions = createTestFilter( _.endsWith("Test"))
}
class AkkaSampleOsgiProject(info: ProjectInfo) extends AkkaDefaultProject(info) with BNDPlugin {
val osgiCore = Dependencies.osgi_core
override protected def bndPrivatePackage = List("sample.osgi.*")
@ -624,6 +643,10 @@ class AkkaParentProject(info: ProjectInfo) extends ParentProject(info) with Exec
lazy val akka_sample_osgi = project("akka-sample-osgi", "akka-sample-osgi",
new AkkaSampleOsgiProject(_), akka_actor)
lazy val akka_sample_camel = project("akka-sample-camel", "akka-sample-camel",
new AkkaSampleCamelProject(_), akka_actor, akka_kernel)
lazy val publishRelease = {
val releaseConfiguration = new DefaultPublishConfiguration(localReleaseRepository, "release", false)
publishTask(publishIvyModule, releaseConfiguration) dependsOn (deliver, publishLocal, makePom)