Merge branch 'master' of git-proxy:jboner/akka
Conflicts: project/build/AkkaProject.scala
This commit is contained in:
commit
2d0e467dbe
15 changed files with 310 additions and 87 deletions
|
|
@ -1,9 +1,10 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
*/
|
||||
|
||||
package se.scalablesolutions.akka.camel
|
||||
|
||||
import java.util.concurrent.CountDownLatch
|
||||
|
||||
import se.scalablesolutions.akka.actor.Actor._
|
||||
import se.scalablesolutions.akka.actor.{AspectInitRegistry, ActorRegistry}
|
||||
import se.scalablesolutions.akka.util.{Bootable, Logging}
|
||||
|
|
@ -77,6 +78,22 @@ trait CamelService extends Bootable with Logging {
|
|||
* @see onUnload
|
||||
*/
|
||||
def unload = onUnload
|
||||
|
||||
/**
|
||||
* Sets an expectation of the number of upcoming endpoint activations and returns
|
||||
* a {@link CountDownLatch} that can be used to wait for the activations to occur.
|
||||
* Endpoint activations that occurred in the past are not considered.
|
||||
*/
|
||||
def expectEndpointActivationCount(count: Int): CountDownLatch =
|
||||
(consumerPublisher !! SetExpectedRegistrationCount(count)).as[CountDownLatch].get
|
||||
|
||||
/**
|
||||
* Sets an expectation of the number of upcoming endpoint de-activations and returns
|
||||
* a {@link CountDownLatch} that can be used to wait for the de-activations to occur.
|
||||
* Endpoint de-activations that occurred in the past are not considered.
|
||||
*/
|
||||
def expectEndpointDeactivationCount(count: Int): CountDownLatch =
|
||||
(consumerPublisher !! SetExpectedUnregistrationCount(count)).as[CountDownLatch].get
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -65,50 +65,50 @@ private[camel] object ConsumerPublisher extends Logging {
|
|||
* Actor that publishes consumer actors and active object methods at Camel endpoints.
|
||||
* The Camel context used for publishing is CamelContextManager.context. This actor
|
||||
* accepts messages of type
|
||||
* se.scalablesolutions.akka.camel.service.ConsumerRegistered,
|
||||
* se.scalablesolutions.akka.camel.service.ConsumerMethodRegistered and
|
||||
* se.scalablesolutions.akka.camel.service.ConsumerUnregistered.
|
||||
* se.scalablesolutions.akka.camel.ConsumerRegistered,
|
||||
* se.scalablesolutions.akka.camel.ConsumerUnregistered.
|
||||
* se.scalablesolutions.akka.camel.ConsumerMethodRegistered and
|
||||
* se.scalablesolutions.akka.camel.ConsumerMethodUnregistered.
|
||||
*
|
||||
* @author Martin Krasser
|
||||
*/
|
||||
private[camel] class ConsumerPublisher extends Actor {
|
||||
import ConsumerPublisher._
|
||||
|
||||
@volatile private var latch = new CountDownLatch(0)
|
||||
@volatile private var registrationLatch = new CountDownLatch(0)
|
||||
@volatile private var unregistrationLatch = new CountDownLatch(0)
|
||||
|
||||
/**
|
||||
* Adds a route to the actor identified by a Publish message to the global CamelContext.
|
||||
*/
|
||||
protected def receive = {
|
||||
case r: ConsumerRegistered => {
|
||||
handleConsumerRegistered(r)
|
||||
latch.countDown // needed for testing only.
|
||||
registrationLatch.countDown
|
||||
}
|
||||
case u: ConsumerUnregistered => {
|
||||
handleConsumerUnregistered(u)
|
||||
latch.countDown // needed for testing only.
|
||||
unregistrationLatch.countDown
|
||||
}
|
||||
case mr: ConsumerMethodRegistered => {
|
||||
handleConsumerMethodRegistered(mr)
|
||||
latch.countDown // needed for testing only.
|
||||
registrationLatch.countDown
|
||||
}
|
||||
case mu: ConsumerMethodUnregistered => {
|
||||
handleConsumerMethodUnregistered(mu)
|
||||
latch.countDown // needed for testing only.
|
||||
unregistrationLatch.countDown
|
||||
}
|
||||
case SetExpectedMessageCount(num) => {
|
||||
// needed for testing only.
|
||||
latch = new CountDownLatch(num)
|
||||
self.reply(latch)
|
||||
case SetExpectedRegistrationCount(num) => {
|
||||
registrationLatch = new CountDownLatch(num)
|
||||
self.reply(registrationLatch)
|
||||
}
|
||||
case SetExpectedUnregistrationCount(num) => {
|
||||
unregistrationLatch = new CountDownLatch(num)
|
||||
self.reply(unregistrationLatch)
|
||||
}
|
||||
case _ => { /* ignore */}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Command message used For testing-purposes only.
|
||||
*/
|
||||
private[camel] case class SetExpectedMessageCount(num: Int)
|
||||
private[camel] case class SetExpectedRegistrationCount(num: Int)
|
||||
private[camel] case class SetExpectedUnregistrationCount(num: Int)
|
||||
|
||||
/**
|
||||
* Defines an abstract route to a target which is either an actor or an active object method..
|
||||
|
|
|
|||
|
|
@ -27,7 +27,7 @@ class CamelServiceFeatureTest extends FeatureSpec with BeforeAndAfterAll with Gi
|
|||
// count expectations in the next step (needed for testing only).
|
||||
service.consumerPublisher.start
|
||||
// set expectations on publish count
|
||||
val latch = (service.consumerPublisher !! SetExpectedMessageCount(1)).as[CountDownLatch].get
|
||||
val latch = service.expectEndpointActivationCount(1)
|
||||
// start the CamelService
|
||||
service.load
|
||||
// await publication of first test consumer
|
||||
|
|
@ -44,7 +44,7 @@ class CamelServiceFeatureTest extends FeatureSpec with BeforeAndAfterAll with Gi
|
|||
scenario("access non-blocking consumer actors via Camel direct-endpoints") {
|
||||
|
||||
given("two consumer actors registered before and after CamelService startup")
|
||||
val latch = (service.consumerPublisher !! SetExpectedMessageCount(1)).as[CountDownLatch].get
|
||||
val latch = service.expectEndpointActivationCount(1)
|
||||
actorOf(new TestConsumer("direct:publish-test-2")).start
|
||||
assert(latch.await(5000, TimeUnit.MILLISECONDS))
|
||||
|
||||
|
|
@ -60,7 +60,7 @@ class CamelServiceFeatureTest extends FeatureSpec with BeforeAndAfterAll with Gi
|
|||
scenario("access blocking, non-responding consumer actor via a Camel direct-endpoint") {
|
||||
|
||||
given("a consumer actor registered after CamelService startup")
|
||||
val latch = (service.consumerPublisher !! SetExpectedMessageCount(1)).as[CountDownLatch].get
|
||||
val latch = service.expectEndpointActivationCount(1)
|
||||
actorOf(new TestBlocker("direct:publish-test-3")).start
|
||||
assert(latch.await(5000, TimeUnit.MILLISECONDS))
|
||||
|
||||
|
|
@ -84,13 +84,13 @@ class CamelServiceFeatureTest extends FeatureSpec with BeforeAndAfterAll with Gi
|
|||
|
||||
given("a consumer actor registered after CamelService startup")
|
||||
assert(CamelContextManager.context.hasEndpoint(endpointUri) eq null)
|
||||
var latch = (service.consumerPublisher !! SetExpectedMessageCount(1)).as[CountDownLatch].get
|
||||
var latch = service.expectEndpointActivationCount(1)
|
||||
val consumer = actorOf(new TestConsumer(endpointUri)).start
|
||||
assert(latch.await(5000, TimeUnit.MILLISECONDS))
|
||||
assert(CamelContextManager.context.hasEndpoint(endpointUri) ne null)
|
||||
|
||||
when("the actor is stopped")
|
||||
latch = (service.consumerPublisher !! SetExpectedMessageCount(1)).as[CountDownLatch].get
|
||||
latch = service.expectEndpointDeactivationCount(1)
|
||||
consumer.stop
|
||||
assert(latch.await(5000, TimeUnit.MILLISECONDS))
|
||||
|
||||
|
|
@ -121,7 +121,7 @@ class CamelServiceFeatureTest extends FeatureSpec with BeforeAndAfterAll with Gi
|
|||
scenario("access active object methods via Camel direct-endpoints") {
|
||||
|
||||
given("an active object registered after CamelService startup")
|
||||
var latch = (service.consumerPublisher !! SetExpectedMessageCount(3)).as[CountDownLatch].get
|
||||
var latch = service.expectEndpointActivationCount(3)
|
||||
val obj = ActiveObject.newInstance(classOf[PojoBase])
|
||||
assert(latch.await(5000, TimeUnit.MILLISECONDS))
|
||||
|
||||
|
|
@ -136,7 +136,7 @@ class CamelServiceFeatureTest extends FeatureSpec with BeforeAndAfterAll with Gi
|
|||
assert(response3 === "m4base: x y")
|
||||
|
||||
// cleanup to avoid conflicts with next test (i.e. avoid multiple consumers on direct-endpoints)
|
||||
latch = (service.consumerPublisher !! SetExpectedMessageCount(3)).as[CountDownLatch].get
|
||||
latch = service.expectEndpointDeactivationCount(3)
|
||||
ActiveObject.stop(obj)
|
||||
assert(latch.await(5000, TimeUnit.MILLISECONDS))
|
||||
}
|
||||
|
|
@ -144,15 +144,15 @@ class CamelServiceFeatureTest extends FeatureSpec with BeforeAndAfterAll with Gi
|
|||
|
||||
feature("Unpublish active object method from the global CamelContext") {
|
||||
|
||||
scenario("access to unregistered active object methof via Camel direct-endpoint fails") {
|
||||
scenario("access to unregistered active object method via Camel direct-endpoint fails") {
|
||||
|
||||
given("an active object registered after CamelService startup")
|
||||
var latch = (service.consumerPublisher !! SetExpectedMessageCount(3)).as[CountDownLatch].get
|
||||
var latch = service.expectEndpointActivationCount(3)
|
||||
val obj = ActiveObject.newInstance(classOf[PojoBase])
|
||||
assert(latch.await(5000, TimeUnit.MILLISECONDS))
|
||||
|
||||
when("the active object is stopped")
|
||||
latch = (service.consumerPublisher !! SetExpectedMessageCount(3)).as[CountDownLatch].get
|
||||
latch = service.expectEndpointDeactivationCount(3)
|
||||
ActiveObject.stop(obj)
|
||||
assert(latch.await(5000, TimeUnit.MILLISECONDS))
|
||||
|
||||
|
|
|
|||
|
|
@ -45,7 +45,7 @@ class RemoteConsumerTest extends FeatureSpec with BeforeAndAfterAll with GivenWh
|
|||
val consumer = actorOf[RemoteConsumer].start
|
||||
|
||||
when("remote consumer publication is triggered")
|
||||
val latch = (service.consumerPublisher !! SetExpectedMessageCount(1)).as[CountDownLatch].get
|
||||
var latch = service.expectEndpointActivationCount(1)
|
||||
consumer !! "init"
|
||||
assert(latch.await(5000, TimeUnit.MILLISECONDS))
|
||||
|
||||
|
|
@ -61,7 +61,7 @@ class RemoteConsumerTest extends FeatureSpec with BeforeAndAfterAll with GivenWh
|
|||
val consumer = ActiveObject.newRemoteInstance(classOf[PojoRemote], host, port)
|
||||
|
||||
when("remote consumer publication is triggered")
|
||||
val latch = (service.consumerPublisher !! SetExpectedMessageCount(1)).as[CountDownLatch].get
|
||||
var latch = service.expectEndpointActivationCount(1)
|
||||
consumer.foo("init")
|
||||
assert(latch.await(5000, TimeUnit.MILLISECONDS))
|
||||
|
||||
|
|
|
|||
22
akka-karaf/akka-features/src/main/resources/features.xml
Normal file
22
akka-karaf/akka-features/src/main/resources/features.xml
Normal file
|
|
@ -0,0 +1,22 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<features name="akka 0.9">
|
||||
|
||||
<feature name="scala-osgi" version="1.5">
|
||||
<bundle>mvn:com.weiglewilczek.scala-lang-osgi/scala-library/2.8.0.RC2</bundle>
|
||||
<bundle>mvn:org.eclipse.scalamodules/scalamodules-core/2.0-M2</bundle>
|
||||
</feature>
|
||||
|
||||
|
||||
<feature name="sjson" version="0.6-SNAPSHOT">
|
||||
<bundle>mvn:se.scalablesolutions.akka.akka-wrap/dispatch-json_2.8.0.RC3_osgi/0.7.4</bundle>
|
||||
<bundle>mvn:org.objenesis/objenesis/1.2</bundle>
|
||||
<bundle>mvn:sjson.json/sjson/0.6-SNAPSHOT</bundle>
|
||||
</feature>
|
||||
|
||||
<feature name="akka-core" version="0.9-SNAPSHOT">
|
||||
<feature>sjson</feature>
|
||||
<bundle>mvn:se.scalablesolutions.akka.akka-wrap/jgroups-wrapper_2.8.0.RC3_osgi/2.9.0.GA</bundle>
|
||||
<bundle>mvn:org.jboss.netty/netty/3.2.0.CR1</bundle>
|
||||
<bundle>mvn:se.scalablesolutions.akka/akka-core_2.8.0.RC3_osgi/0.9</bundle>
|
||||
</feature>
|
||||
</features>
|
||||
|
|
@ -78,10 +78,8 @@ private[akka] object MongoStorageBackend extends
|
|||
val o = dbo.get(VALUE).asInstanceOf[Map[AnyRef, AnyRef]]
|
||||
o.putAll(m)
|
||||
|
||||
// remove existing reference
|
||||
removeMapStorageFor(name)
|
||||
// and insert
|
||||
coll.insert(new BasicDBObject().append(KEY, name).append(VALUE, o))
|
||||
val newdbo = new BasicDBObject().append(KEY, name).append(VALUE, o)
|
||||
coll.update(new BasicDBObject().append(KEY, name), newdbo, true, false)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -72,7 +72,7 @@ private [akka] object RedisStorageBackend extends
|
|||
// need an explicit definition in akka-conf
|
||||
val nodes = config.getList("akka.storage.redis.cluster")
|
||||
|
||||
val db =
|
||||
def connect() =
|
||||
nodes match {
|
||||
case Seq() =>
|
||||
// no cluster defined
|
||||
|
|
@ -89,6 +89,8 @@ private [akka] object RedisStorageBackend extends
|
|||
}
|
||||
}
|
||||
|
||||
var db = connect()
|
||||
|
||||
/**
|
||||
* Map storage in Redis.
|
||||
* <p/>
|
||||
|
|
@ -411,6 +413,10 @@ private [akka] object RedisStorageBackend extends
|
|||
try {
|
||||
body
|
||||
} catch {
|
||||
case e: RedisConnectionException => {
|
||||
db = connect()
|
||||
body
|
||||
}
|
||||
case e: java.lang.NullPointerException =>
|
||||
throw new StorageException("Could not connect to Redis server")
|
||||
case e =>
|
||||
|
|
|
|||
|
|
@ -117,14 +117,18 @@ class HttpConsumer(producer: ActorRef) extends Actor with Consumer {
|
|||
def endpointUri = "jetty:http://0.0.0.0:8875/"
|
||||
|
||||
protected def receive = {
|
||||
// only keep Exchange.HTTP_PATH message header (which needed by bridge endpoint)
|
||||
case msg: Message => producer forward msg.setHeaders(msg.headers(Set(Exchange.HTTP_PATH)))
|
||||
case msg => producer forward msg
|
||||
}
|
||||
}
|
||||
|
||||
class HttpProducer(transformer: ActorRef) extends Actor with Producer {
|
||||
def endpointUri = "jetty://http://akkasource.org/?bridgeEndpoint=true"
|
||||
|
||||
override protected def receiveBeforeProduce = {
|
||||
// only keep Exchange.HTTP_PATH message header (which needed by bridge endpoint)
|
||||
case msg: Message => msg.setHeaders(msg.headers(Set(Exchange.HTTP_PATH)))
|
||||
}
|
||||
|
||||
override protected def receiveAfterProduce = {
|
||||
// do not reply but forward result to transformer
|
||||
case msg => transformer forward msg
|
||||
|
|
|
|||
|
|
@ -16,16 +16,6 @@ import se.scalablesolutions.akka.config.ScalaConfig._
|
|||
*/
|
||||
class Boot {
|
||||
|
||||
// -----------------------------------------------------------------------
|
||||
// Create CamelContext with Spring-based registry and custom route builder
|
||||
// -----------------------------------------------------------------------
|
||||
|
||||
val context = new ClassPathXmlApplicationContext("/context-boot.xml", getClass)
|
||||
val registry = new ApplicationContextRegistry(context)
|
||||
|
||||
CamelContextManager.init(new DefaultCamelContext(registry))
|
||||
CamelContextManager.context.addRoutes(new CustomRouteBuilder)
|
||||
|
||||
// -----------------------------------------------------------------------
|
||||
// Basic example
|
||||
// -----------------------------------------------------------------------
|
||||
|
|
@ -41,9 +31,17 @@ class Boot {
|
|||
// Supervise(actorOf[Consumer2], LifeCycle(Permanent)) :: Nil))
|
||||
|
||||
// -----------------------------------------------------------------------
|
||||
// Tranformer example
|
||||
// Custom Camel route example
|
||||
// -----------------------------------------------------------------------
|
||||
|
||||
// Create CamelContext and a Spring-based registry
|
||||
val context = new ClassPathXmlApplicationContext("/context-jms.xml", getClass)
|
||||
val registry = new ApplicationContextRegistry(context)
|
||||
|
||||
// Use a custom Camel context and a custom touter builder
|
||||
CamelContextManager.init(new DefaultCamelContext(registry))
|
||||
CamelContextManager.context.addRoutes(new CustomRouteBuilder)
|
||||
|
||||
val producer = actorOf[Producer1]
|
||||
val mediator = actorOf(new Transformer(producer))
|
||||
val consumer = actorOf(new Consumer3(mediator))
|
||||
|
|
@ -52,12 +50,20 @@ class Boot {
|
|||
mediator.start
|
||||
consumer.start
|
||||
|
||||
// -----------------------------------------------------------------------
|
||||
// Asynchronous consumer-producer example (Akka homepage transformation)
|
||||
// -----------------------------------------------------------------------
|
||||
|
||||
val httpTransformer = actorOf(new HttpTransformer).start
|
||||
val httpProducer = actorOf(new HttpProducer(httpTransformer)).start
|
||||
val httpConsumer = actorOf(new HttpConsumer(httpProducer)).start
|
||||
|
||||
// -----------------------------------------------------------------------
|
||||
// Publish subscribe examples
|
||||
// -----------------------------------------------------------------------
|
||||
|
||||
//
|
||||
// Cometd example commented out because camel-cometd is broken in Camel 2.3
|
||||
// Cometd example commented out because camel-cometd is broken since Camel 2.3
|
||||
//
|
||||
|
||||
//val cometdUri = "cometd://localhost:8111/test/abc?baseResource=file:target"
|
||||
|
|
@ -79,14 +85,6 @@ class Boot {
|
|||
actorOf[Consumer4].start // POSTing "stop" to http://0.0.0.0:8877/camel/stop stops and unpublishes this actor
|
||||
actorOf[Consumer5].start // POSTing any msg to http://0.0.0.0:8877/camel/start starts and published Consumer4 again.
|
||||
|
||||
// -----------------------------------------------------------------------
|
||||
// Non-blocking consumer-producer example (Akka homepage transformation)
|
||||
// -----------------------------------------------------------------------
|
||||
|
||||
val nbResponder = actorOf(new HttpTransformer).start
|
||||
val nbProducer = actorOf(new HttpProducer(nbResponder)).start
|
||||
val nbConsumer = actorOf(new HttpConsumer(nbProducer)).start
|
||||
|
||||
// -----------------------------------------------------------------------
|
||||
// Active object example
|
||||
// -----------------------------------------------------------------------
|
||||
|
|
|
|||
|
|
@ -5,8 +5,9 @@ import org.apache.camel.builder.RouteBuilder
|
|||
import org.apache.camel.spring.spi.ApplicationContextRegistry
|
||||
import org.springframework.context.support.ClassPathXmlApplicationContext
|
||||
|
||||
import se.scalablesolutions.akka.camel.{CamelService, CamelContextManager}
|
||||
import se.scalablesolutions.akka.actor.{ActorRegistry, ActiveObject}
|
||||
import se.scalablesolutions.akka.actor.{Actor, ActorRegistry, ActiveObject}
|
||||
import se.scalablesolutions.akka.camel._
|
||||
import se.scalablesolutions.akka.util.Logging
|
||||
|
||||
/**
|
||||
* @author Martin Krasser
|
||||
|
|
@ -81,3 +82,43 @@ class StandaloneSpringApplicationRoute extends RouteBuilder {
|
|||
from("direct:test3").to("active-object:pojo3?method=foo")
|
||||
}
|
||||
}
|
||||
|
||||
object StandaloneJmsApplication {
|
||||
def main(args: Array[String]) = {
|
||||
val context = new ClassPathXmlApplicationContext("/context-jms.xml")
|
||||
val registry = new ApplicationContextRegistry(context)
|
||||
|
||||
// Init CamelContextManager with custom CamelContext
|
||||
CamelContextManager.init(new DefaultCamelContext(registry))
|
||||
|
||||
// Create new instance of CamelService and start it
|
||||
val service = CamelService.newInstance.load
|
||||
// Expect two consumer endpoints to be activated
|
||||
val completion = service.expectEndpointActivationCount(2)
|
||||
|
||||
val jmsUri = "jms:topic:test"
|
||||
// Wire publisher and consumer using a JMS topic
|
||||
val jmsSubscriber1 = Actor.actorOf(new Subscriber("jms-subscriber-1", jmsUri)).start
|
||||
val jmsSubscriber2 = Actor.actorOf(new Subscriber("jms-subscriber-2", jmsUri)).start
|
||||
val jmsPublisher = Actor.actorOf(new Publisher("jms-publisher", jmsUri)).start
|
||||
|
||||
// wait for the consumer (subscriber) endpoint being activated
|
||||
completion.await
|
||||
|
||||
// Send 10 messages to via publisher actor
|
||||
for(i <- 1 to 10) {
|
||||
jmsPublisher ! ("Akka rocks (%d)" format i)
|
||||
}
|
||||
|
||||
// Send 10 messages to JMS topic directly
|
||||
for(i <- 1 to 10) {
|
||||
CamelContextManager.template.sendBody(jmsUri, "Camel rocks (%d)" format i)
|
||||
}
|
||||
|
||||
// Graceful shutdown of all endpoints/routes
|
||||
service.unload
|
||||
|
||||
// Shutdown example actors
|
||||
ActorRegistry.shutdownAll
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,104 @@
|
|||
package sample.camel
|
||||
|
||||
import collection.mutable.Set
|
||||
|
||||
import java.util.concurrent.CountDownLatch
|
||||
|
||||
import org.junit._
|
||||
|
||||
import se.scalablesolutions.akka.actor.Actor._
|
||||
import se.scalablesolutions.akka.actor.{ActorRegistry, ActorRef, Actor}
|
||||
import se.scalablesolutions.akka.camel.{CamelService, Message, Producer, Consumer}
|
||||
import se.scalablesolutions.akka.routing.CyclicIterator
|
||||
import se.scalablesolutions.akka.routing.Routing._
|
||||
import org.scalatest.junit.JUnitSuite
|
||||
|
||||
/**
|
||||
* @author Martin Krasser
|
||||
*/
|
||||
@Ignore
|
||||
class HttpConcurrencyTest extends JUnitSuite {
|
||||
import HttpConcurrencyTest._
|
||||
|
||||
@Test def shouldProcessMessagesConcurrently = {
|
||||
val num = 50
|
||||
val latch1 = new CountDownLatch(num)
|
||||
val latch2 = new CountDownLatch(num)
|
||||
val latch3 = new CountDownLatch(num)
|
||||
val client1 = actorOf(new HttpClientActor("client1", latch1)).start
|
||||
val client2 = actorOf(new HttpClientActor("client2", latch2)).start
|
||||
val client3 = actorOf(new HttpClientActor("client3", latch3)).start
|
||||
for (i <- 1 to num) {
|
||||
client1 ! Message("client1", Map(Message.MessageExchangeId -> i))
|
||||
client2 ! Message("client2", Map(Message.MessageExchangeId -> i))
|
||||
client3 ! Message("client3", Map(Message.MessageExchangeId -> i))
|
||||
}
|
||||
latch1.await
|
||||
latch2.await
|
||||
latch3.await
|
||||
assert(num == (client1 !! "getCorrelationIdCount").as[Int].get)
|
||||
assert(num == (client2 !! "getCorrelationIdCount").as[Int].get)
|
||||
assert(num == (client3 !! "getCorrelationIdCount").as[Int].get)
|
||||
}
|
||||
}
|
||||
|
||||
object HttpConcurrencyTest {
|
||||
var service: CamelService = _
|
||||
|
||||
@BeforeClass
|
||||
def beforeClass = {
|
||||
service = CamelService.newInstance.load
|
||||
|
||||
val workers = for (i <- 1 to 8) yield actorOf[HttpServerWorker].start
|
||||
val balancer = loadBalancerActor(new CyclicIterator(workers.toList))
|
||||
|
||||
val completion = service.expectEndpointActivationCount(1)
|
||||
val server = actorOf(new HttpServerActor(balancer)).start
|
||||
completion.await
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
def afterClass = {
|
||||
service.unload
|
||||
ActorRegistry.shutdownAll
|
||||
}
|
||||
|
||||
class HttpClientActor(label: String, latch: CountDownLatch) extends Actor with Producer {
|
||||
def endpointUri = "jetty:http://0.0.0.0:8855/echo"
|
||||
var correlationIds = Set[Any]()
|
||||
|
||||
override protected def receive = {
|
||||
case "getCorrelationIdCount" => self.reply(correlationIds.size)
|
||||
case msg => super.receive(msg)
|
||||
}
|
||||
|
||||
override protected def receiveAfterProduce = {
|
||||
case msg: Message => {
|
||||
val corr = msg.headers(Message.MessageExchangeId)
|
||||
val body = msg.bodyAs[String]
|
||||
correlationIds += corr
|
||||
assert(label == body)
|
||||
latch.countDown
|
||||
print(".")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
class HttpServerActor(balancer: ActorRef) extends Actor with Consumer {
|
||||
def endpointUri = "jetty:http://0.0.0.0:8855/echo"
|
||||
|
||||
def receive = {
|
||||
case msg => balancer forward msg
|
||||
}
|
||||
}
|
||||
|
||||
class HttpServerWorker extends Actor {
|
||||
protected def receive = {
|
||||
case msg => {
|
||||
// slow processing
|
||||
Thread.sleep(100)
|
||||
self.reply(msg)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,33 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
*/
|
||||
|
||||
package se.scalablesolutions.akka
|
||||
package sample.osgi
|
||||
|
||||
import actor.{ Actor, ActorRegistry }
|
||||
import actor.Actor._
|
||||
|
||||
import org.osgi.framework.{ BundleActivator, BundleContext }
|
||||
|
||||
class Activator extends BundleActivator {
|
||||
|
||||
def start(context: BundleContext) {
|
||||
println("Starting the OSGi example ...")
|
||||
val echo = actorOf[EchoActor].start
|
||||
val answer = (echo !! "OSGi example")
|
||||
println(answer getOrElse "No answer!")
|
||||
}
|
||||
|
||||
def stop(context: BundleContext) {
|
||||
ActorRegistry.shutdownAll()
|
||||
println("Stopped the OSGi example.")
|
||||
}
|
||||
}
|
||||
|
||||
class EchoActor extends Actor {
|
||||
|
||||
override def receive = {
|
||||
case x => self reply x
|
||||
}
|
||||
}
|
||||
Binary file not shown.
|
|
@ -38,17 +38,17 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
|
|||
// -------------------------------------------------------------------------------------------------------------------
|
||||
// All repositories *must* go here! See ModuleConigurations below.
|
||||
// -------------------------------------------------------------------------------------------------------------------
|
||||
// object Repositories {
|
||||
// lazy val AkkaRepo = MavenRepository("Akka Repository", "http://scalablesolutions.se/akka/repository")
|
||||
// lazy val CodehausSnapshotRepo = MavenRepository("Codehaus Snapshots", "http://snapshots.repository.codehaus.org")
|
||||
object Repositories {
|
||||
lazy val AkkaRepo = MavenRepository("Akka Repository", "http://scalablesolutions.se/akka/repository")
|
||||
lazy val CodehausSnapshotRepo = MavenRepository("Codehaus Snapshots", "http://snapshots.repository.codehaus.org")
|
||||
lazy val EmbeddedRepo = MavenRepository("Embedded Repo", (info.projectPath / "embedded-repo").asURL.toString)
|
||||
// lazy val GuiceyFruitRepo = MavenRepository("GuiceyFruit Repo", "http://guiceyfruit.googlecode.com/svn/repo/releases/")
|
||||
// lazy val JBossRepo = MavenRepository("JBoss Repo", "https://repository.jboss.org/nexus/content/groups/public/")
|
||||
// lazy val JavaNetRepo = MavenRepository("java.net Repo", "http://download.java.net/maven/2")
|
||||
// lazy val SonatypeSnapshotRepo = MavenRepository("Sonatype OSS Repo", "http://oss.sonatype.org/content/repositories/releases")
|
||||
// lazy val SunJDMKRepo = MavenRepository("Sun JDMK Repo", "http://wp5.e-taxonomy.eu/cdmlib/mavenrepo")
|
||||
// lazy val ScalazRepo = MavenRepository("Scalaz Repo", "http://scala-tools.org/repo-releases")
|
||||
// }
|
||||
lazy val GuiceyFruitRepo = MavenRepository("GuiceyFruit Repo", "http://guiceyfruit.googlecode.com/svn/repo/releases/")
|
||||
lazy val JBossRepo = MavenRepository("JBoss Repo", "https://repository.jboss.org/nexus/content/groups/public/")
|
||||
lazy val JavaNetRepo = MavenRepository("java.net Repo", "http://download.java.net/maven/2")
|
||||
lazy val SonatypeSnapshotRepo = MavenRepository("Sonatype OSS Repo", "http://oss.sonatype.org/content/repositories/releases")
|
||||
lazy val SunJDMKRepo = MavenRepository("Sun JDMK Repo", "http://wp5.e-taxonomy.eu/cdmlib/mavenrepo")
|
||||
lazy val ScalazRepo = MavenRepository("Scalaz Repo", "http://scala-tools.org/repo-releases")
|
||||
}
|
||||
|
||||
val mavenLocal = "Local Maven Repository" at "file:/e:/maven-repository"
|
||||
|
||||
|
|
@ -61,22 +61,22 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
|
|||
// must be resolved from a ModuleConfiguration. This will result in a significant acceleration of the update action.
|
||||
// Therefore, if repositories are defined, this must happen as def, not as val.
|
||||
// -------------------------------------------------------------------------------------------------------------------
|
||||
// import Repositories._
|
||||
// lazy val atmosphereModuleConfig = ModuleConfiguration("org.atmosphere", SonatypeSnapshotRepo)
|
||||
// lazy val grizzlyModuleConfig = ModuleConfiguration("com.sun.grizzly", JavaNetRepo)
|
||||
// lazy val guiceyFruitModuleConfig = ModuleConfiguration("org.guiceyfruit", GuiceyFruitRepo)
|
||||
// lazy val jbossModuleConfig = ModuleConfiguration("org.jboss", JBossRepo)
|
||||
// lazy val jdmkModuleConfig = ModuleConfiguration("com.sun.jdmk", SunJDMKRepo)
|
||||
// lazy val jerseyContrModuleConfig = ModuleConfiguration("com.sun.jersey.contribs", JavaNetRepo)
|
||||
// lazy val jerseyModuleConfig = ModuleConfiguration("com.sun.jersey", JavaNetRepo)
|
||||
// lazy val jgroupsModuleConfig = ModuleConfiguration("jgroups", JBossRepo)
|
||||
// lazy val jmsModuleConfig = ModuleConfiguration("javax.jms", SunJDMKRepo)
|
||||
// lazy val jmxModuleConfig = ModuleConfiguration("com.sun.jmx", SunJDMKRepo)
|
||||
// lazy val liftModuleConfig = ModuleConfiguration("net.liftweb", ScalaToolsSnapshots)
|
||||
// lazy val multiverseModuleConfig = ModuleConfiguration("org.multiverse", CodehausSnapshotRepo)
|
||||
// lazy val nettyModuleConfig = ModuleConfiguration("org.jboss.netty", JBossRepo)
|
||||
// lazy val scalazModuleConfig = ModuleConfiguration("scalaz", ScalazRepo)
|
||||
// lazy val scalaTestModuleConfig = ModuleConfiguration("org.scalatest", ScalaToolsSnapshots)
|
||||
import Repositories._
|
||||
lazy val atmosphereModuleConfig = ModuleConfiguration("org.atmosphere", SonatypeSnapshotRepo)
|
||||
lazy val grizzlyModuleConfig = ModuleConfiguration("com.sun.grizzly", JavaNetRepo)
|
||||
lazy val guiceyFruitModuleConfig = ModuleConfiguration("org.guiceyfruit", GuiceyFruitRepo)
|
||||
lazy val jbossModuleConfig = ModuleConfiguration("org.jboss", JBossRepo)
|
||||
lazy val jdmkModuleConfig = ModuleConfiguration("com.sun.jdmk", SunJDMKRepo)
|
||||
lazy val jerseyContrModuleConfig = ModuleConfiguration("com.sun.jersey.contribs", JavaNetRepo)
|
||||
lazy val jerseyModuleConfig = ModuleConfiguration("com.sun.jersey", JavaNetRepo)
|
||||
lazy val jgroupsModuleConfig = ModuleConfiguration("jgroups", JBossRepo)
|
||||
lazy val jmsModuleConfig = ModuleConfiguration("javax.jms", SunJDMKRepo)
|
||||
lazy val jmxModuleConfig = ModuleConfiguration("com.sun.jmx", SunJDMKRepo)
|
||||
lazy val liftModuleConfig = ModuleConfiguration("net.liftweb", ScalaToolsSnapshots)
|
||||
lazy val multiverseModuleConfig = ModuleConfiguration("org.multiverse", CodehausSnapshotRepo)
|
||||
lazy val nettyModuleConfig = ModuleConfiguration("org.jboss.netty", JBossRepo)
|
||||
lazy val scalazModuleConfig = ModuleConfiguration("scalaz", ScalazRepo)
|
||||
lazy val scalaTestModuleConfig = ModuleConfiguration("org.scalatest", ScalaToolsSnapshots)
|
||||
lazy val embeddedRepo = EmbeddedRepo // This is the only exception, because the embedded repo is fast!
|
||||
|
||||
// -------------------------------------------------------------------------------------------------------------------
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue