diff --git a/akka-camel/src/main/scala/component/ActorComponent.scala b/akka-camel/src/main/scala/component/ActorComponent.scala index 2fa116926c..5788fd9028 100644 --- a/akka-camel/src/main/scala/component/ActorComponent.scala +++ b/akka-camel/src/main/scala/component/ActorComponent.scala @@ -122,8 +122,8 @@ class ActorProducer(val ep: ActorEndpoint) extends DefaultProducer(ep) { case Some(msg: Failure) => exchange.fromFailureMessage(msg) case Some(msg) => exchange.fromResponseMessage(Message.canonicalize(msg)) case None => { - throw new TimeoutException("communication with %s timed out after %d ms" - format (ep.getEndpointUri, actor.timeout)) + throw new TimeoutException("timeout (%d ms) while waiting response from %s" + format (actor.timeout, ep.getEndpointUri)) } } } diff --git a/akka-camel/src/main/scala/service/CamelService.scala b/akka-camel/src/main/scala/service/CamelService.scala index 0b77e56796..0f61f2c0f3 100644 --- a/akka-camel/src/main/scala/service/CamelService.scala +++ b/akka-camel/src/main/scala/service/CamelService.scala @@ -4,23 +4,14 @@ package se.scalablesolutions.akka.camel.service -import java.io.InputStream - -import org.apache.camel.builder.RouteBuilder - -import se.scalablesolutions.akka.actor.{Actor, ActorRegistry} -import se.scalablesolutions.akka.annotation.consume +import se.scalablesolutions.akka.actor.ActorRegistry +import se.scalablesolutions.akka.camel.CamelContextManager import se.scalablesolutions.akka.util.{Bootable, Logging} -import se.scalablesolutions.akka.camel.{CamelContextManager, Consumer} /** - * Started by the Kernel to expose certain actors as Camel endpoints. It uses - * se.scalablesolutions.akka.camel.CamelContextManage to create and manage the - * lifecycle of a global CamelContext. This class further uses the - * se.scalablesolutions.akka.camel.service.CamelServiceRouteBuilder to implement - * routes from Camel endpoints to actors. - * - * @see CamelRouteBuilder + * Used by applications (and the Kernel) to publish consumer actors via Camel + * endpoints and to manage the life cycle of a a global CamelContext which can + * be accessed via se.scalablesolutions.akka.camel.CamelContextManager. * * @author Martin Krasser */ @@ -28,28 +19,63 @@ trait CamelService extends Bootable with Logging { import CamelContextManager._ + private[camel] val consumerPublisher = new ConsumerPublisher + private[camel] val publishRequestor = new PublishRequestor(consumerPublisher) + + /** + * Starts the CamelService. Any started actor that is a consumer actor will be (asynchronously) + * published as Camel endpoint. Consumer actors that are started after this method returned will + * be published as well. Actor publishing is done asynchronously. + */ abstract override def onLoad = { super.onLoad + + // Only init and start if not already done by application if (!initialized) init() - context.addRoutes(new CamelServiceRouteBuilder) + if (!started) start() + + // Camel should cache input streams context.setStreamCaching(true) - start() + + // start actor that exposes consumer actors via Camel endpoints + consumerPublisher.start + + // add listener for actor registration events + ActorRegistry.addRegistrationListener(publishRequestor.start) + + // publish already registered consumer actors + for (publish <- Publish.forConsumers(ActorRegistry.actors)) consumerPublisher.!(publish)(None) } + /** + * Stops the CamelService. + */ abstract override def onUnload = { + ActorRegistry.removeRegistrationListener(publishRequestor) + publishRequestor.stop + consumerPublisher.stop stop() super.onUnload } + /** + * Starts the CamelService. + * + * @see onLoad + */ def load = onLoad + /** + * Stops the CamelService. + * + * @see onUnload + */ def unload = onUnload - } /** * CamelService companion object used by standalone applications to create their own - * CamelService instances. + * CamelService instance. * * @author Martin Krasser */ @@ -59,55 +85,4 @@ object CamelService { * Creates a new CamelService instance. */ def newInstance: CamelService = new CamelService {} - -} - -/** - * Implements routes from Camel endpoints to actors. It searches the registry for actors - * that are either annotated with @se.scalablesolutions.akka.annotation.consume or mix in - * se.scalablesolutions.akka.camel.Consumer and exposes them as Camel endpoints. - * - * @author Martin Krasser - */ -class CamelServiceRouteBuilder extends RouteBuilder with Logging { - - def configure = { - val actors = ActorRegistry.actors - - // TODO: avoid redundant registrations - actors.filter(isConsumeAnnotated _).foreach { actor: Actor => - val fromUri = actor.getClass.getAnnotation(classOf[consume]).value() - configure(fromUri, "actor:id:%s" format actor.getId) - log.debug("registered actor (id=%s) for consuming messages from %s " - format (actor.getId, fromUri)) - } - - // TODO: avoid redundant registrations - actors.filter(isConsumerInstance _).foreach { actor: Actor => - val fromUri = actor.asInstanceOf[Consumer].endpointUri - configure(fromUri, "actor:uuid:%s" format actor.uuid) - log.debug("registered actor (uuid=%s) for consuming messages from %s " - format (actor.uuid, fromUri)) - } - } - - private def configure(fromUri: String, toUri: String) { - val schema = fromUri take fromUri.indexOf(":") // e.g. "http" from "http://whatever/..." - bodyConversions.get(schema) match { - case Some(clazz) => from(fromUri).convertBodyTo(clazz).to(toUri) - case None => from(fromUri).to(toUri) - } - } - - // TODO: make conversions configurable - private def bodyConversions = Map( - "file" -> classOf[InputStream] - ) - - private def isConsumeAnnotated(actor: Actor) = - actor.getClass.getAnnotation(classOf[consume]) ne null - - private def isConsumerInstance(actor: Actor) = - actor.isInstanceOf[Consumer] - } diff --git a/akka-camel/src/main/scala/service/ConsumerPublisher.scala b/akka-camel/src/main/scala/service/ConsumerPublisher.scala new file mode 100644 index 0000000000..4e5adea0b8 --- /dev/null +++ b/akka-camel/src/main/scala/service/ConsumerPublisher.scala @@ -0,0 +1,145 @@ +/** + * Copyright (C) 2009-2010 Scalable Solutions AB + */ +package se.scalablesolutions.akka.camel.service + +import java.io.InputStream +import java.util.concurrent.CountDownLatch + +import org.apache.camel.builder.RouteBuilder + +import se.scalablesolutions.akka.actor.{ActorUnregistered, ActorRegistered, Actor} +import se.scalablesolutions.akka.annotation.consume +import se.scalablesolutions.akka.camel.{Consumer, CamelContextManager} +import se.scalablesolutions.akka.util.Logging + +/** + * Actor that publishes consumer actors as Camel endpoints at the CamelContext managed + * by se.scalablesolutions.akka.camel.CamelContextManager. It accepts messages of type + * se.scalablesolutions.akka.camel.service.Publish. + * + * @author Martin Krasser + */ +class ConsumerPublisher extends Actor with Logging { + @volatile private var latch = new CountDownLatch(0) + + /** + * Adds a route to the actor identified by a Publish message to the global CamelContext. + */ + protected def receive = { + case p: Publish => publish(new ConsumerRoute(p.endpointUri, p.id, p.uuid)) + case _ => { /* ignore */} + } + + /** + * Sets the number of expected Publish messages received by this actor. Used for testing + * only. + */ + private[camel] def expectPublishCount(count: Int) { + latch = new CountDownLatch(count) + } + + /** + * Waits for the number of expected Publish messages to arrive. Used for testing only. + */ + private[camel] def awaitPublish = latch.await + + private def publish(route: ConsumerRoute) { + CamelContextManager.context.addRoutes(route) + log.info("published actor via endpoint %s" format route.endpointUri) + latch.countDown // needed for testing only. + } +} + +/** + * Defines the route to a consumer actor. + * + * @param endpointUri endpoint URI of the consumer actor + * @param id actor identifier + * @param uuid true if id refers to Actor.uuid, false if + * id refers to Acotr.getId. + * + * @author Martin Krasser + */ +class ConsumerRoute(val endpointUri: String, id: String, uuid: Boolean) extends RouteBuilder { + // TODO: make conversions configurable + private val bodyConversions = Map( + "file" -> classOf[InputStream] + ) + + def configure = { + val schema = endpointUri take endpointUri.indexOf(":") // e.g. "http" from "http://whatever/..." + bodyConversions.get(schema) match { + case Some(clazz) => from(endpointUri).convertBodyTo(clazz).to(actorUri) + case None => from(endpointUri).to(actorUri) + } + } + + private def actorUri = (if (uuid) "actor:uuid:%s" else "actor:id:%s") format id +} + +/** + * A registration listener that publishes consumer actors (and ignores other actors). + * + * @author Martin Krasser + */ +class PublishRequestor(consumerPublisher: Actor) extends Actor { + protected def receive = { + case ActorUnregistered(actor) => { /* ignore */ } + case ActorRegistered(actor) => Publish.forConsumer(actor) match { + case Some(publish) => consumerPublisher ! publish + case None => { /* ignore */ } + } + } +} + +/** + * Request message for publishing a consumer actor. + * + * @param endpointUri endpoint URI of the consumer actor + * @param id actor identifier + * @param uuid true if id refers to Actor.uuid, false if + * id refers to Acotr.getId. + * + * @author Martin Krasser + */ +case class Publish(endpointUri: String, id: String, uuid: Boolean) + +/** + * @author Martin Krasser + */ +object Publish { + /** + * Creates a list of Publish request messages for all consumer actors in the actors + * list. + */ + def forConsumers(actors: List[Actor]): List[Publish] = { + for (actor <- actors; pub = forConsumer(actor); if pub.isDefined) yield pub.get + } + + /** + * Creates a Publish request message if actor is a consumer actor. + */ + def forConsumer(actor: Actor): Option[Publish] = { + forConsumeAnnotated(actor) orElse forConsumerType(actor) + } + + private def forConsumeAnnotated(actor: Actor): Option[Publish] = { + val annotation = actor.getClass.getAnnotation(classOf[consume]) + if (annotation eq null) + None + else if (actor._remoteAddress.isDefined) + None // do not publish proxies + else + Some(Publish(annotation.value, actor.getId, false)) + } + + private def forConsumerType(actor: Actor): Option[Publish] = { + if (!actor.isInstanceOf[Consumer]) + None + else if (actor._remoteAddress.isDefined) + None + else + Some(Publish(actor.asInstanceOf[Consumer].endpointUri, actor.uuid, true)) + } +} diff --git a/akka-camel/src/test/scala/ProducerTest.scala b/akka-camel/src/test/scala/ProducerTest.scala index 1a69316836..268c8c6a6a 100644 --- a/akka-camel/src/test/scala/ProducerTest.scala +++ b/akka-camel/src/test/scala/ProducerTest.scala @@ -33,6 +33,7 @@ class ProducerTest extends JUnitSuite { // // TODO: test replies to messages sent with ! (bang) + // TODO: test copying of custom message headers // @Test def shouldProduceMessageSyncAndReceiveResponse = { diff --git a/akka-camel/src/test/scala/service/CamelServiceTest.scala b/akka-camel/src/test/scala/service/CamelServiceTest.scala index 58ccc76273..8fafea4687 100644 --- a/akka-camel/src/test/scala/service/CamelServiceTest.scala +++ b/akka-camel/src/test/scala/service/CamelServiceTest.scala @@ -2,12 +2,12 @@ package se.scalablesolutions.akka.camel.service import org.apache.camel.builder.RouteBuilder import org.junit.Assert._ -import org.junit.{Before, After, Test} import org.scalatest.junit.JUnitSuite import se.scalablesolutions.akka.actor.Actor import se.scalablesolutions.akka.annotation.consume import se.scalablesolutions.akka.camel.{CamelContextManager, Consumer, Message} +import org.junit.{Ignore, Before, After, Test} class CamelServiceTest extends JUnitSuite with CamelService { @@ -23,26 +23,40 @@ class CamelServiceTest extends JUnitSuite with CamelService { var actor3: Actor = _ @Before def setUp = { + // register actors before starting the CamelService actor1 = new TestActor1().start actor2 = new TestActor2().start actor3 = new TestActor3().start - init() + // initialize global CamelContext + init + // customize global CamelContext context.addRoutes(new TestRouteBuilder) - onLoad + consumerPublisher.expectPublishCount(2) + load + consumerPublisher.awaitPublish } @After def tearDown = { - onUnload + unload actor1.stop actor2.stop actor3.stop } - @Test def shouldReceiveResponseViaGeneratedRoute = { + @Test def shouldReceiveResponseViaPreStartGeneratedRoutes = { assertEquals("Hello Martin (actor1)", template.requestBody("direct:actor1", "Martin")) assertEquals("Hello Martin (actor2)", template.requestBody("direct:actor2", "Martin")) } + @Test def shouldReceiveResponseViaPostStartGeneratedRoute = { + consumerPublisher.expectPublishCount(1) + // register actor after starting CamelService + val actor4 = new TestActor4().start + consumerPublisher.awaitPublish + assertEquals("Hello Martin (actor4)", template.requestBody("direct:actor4", "Martin")) + actor4.stop + } + @Test def shouldReceiveResponseViaCustomRoute = { assertEquals("Hello Tester (actor3)", template.requestBody("direct:actor3", "Martin")) } @@ -55,7 +69,6 @@ class TestActor1 extends Actor with Consumer { protected def receive = { case msg: Message => reply("Hello %s (actor1)" format msg.body) } - } @consume("direct:actor2") @@ -73,6 +86,14 @@ class TestActor3 extends Actor { } } +class TestActor4 extends Actor with Consumer { + def endpointUri = "direct:actor4" + + protected def receive = { + case msg: Message => reply("Hello %s (actor4)" format msg.body) + } +} + class TestRouteBuilder extends RouteBuilder { def configure { val actorUri = "actor:%s" format classOf[TestActor3].getName diff --git a/akka-core/src/main/scala/actor/ActorRegistry.scala b/akka-core/src/main/scala/actor/ActorRegistry.scala index 9e0b1cba08..6db4d0375a 100644 --- a/akka-core/src/main/scala/actor/ActorRegistry.scala +++ b/akka-core/src/main/scala/actor/ActorRegistry.scala @@ -8,8 +8,7 @@ import se.scalablesolutions.akka.util.Logging import scala.collection.mutable.ListBuffer import scala.reflect.Manifest - -import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.{CopyOnWriteArrayList, ConcurrentHashMap} /** * Registry holding all Actor instances in the whole system. @@ -23,9 +22,10 @@ import java.util.concurrent.ConcurrentHashMap * @author Jonas Bonér */ object ActorRegistry extends Logging { - private val actorsByUUID = new ConcurrentHashMap[String, Actor] - private val actorsById = new ConcurrentHashMap[String, List[Actor]] - private val actorsByClassName = new ConcurrentHashMap[String, List[Actor]] + private val actorsByUUID = new ConcurrentHashMap[String, Actor] + private val actorsById = new ConcurrentHashMap[String, List[Actor]] + private val actorsByClassName = new ConcurrentHashMap[String, List[Actor]] + private val registrationListeners = new CopyOnWriteArrayList[Actor] /** * Returns all actors in the system. @@ -103,6 +103,9 @@ object ActorRegistry extends Logging { if (actorsByClassName.containsKey(className)) { actorsByClassName.put(className, actor :: actorsByClassName.get(className)) } else actorsByClassName.put(className, actor :: Nil) + + // notify listeners + foreachListener(_.!(ActorRegistered(actor))(None)) } /** @@ -112,6 +115,8 @@ object ActorRegistry extends Logging { actorsByUUID remove actor.uuid actorsById remove actor.getId actorsByClassName remove actor.getClass.getName + // notify listeners + foreachListener(_.!(ActorUnregistered(actor))(None)) } /** @@ -125,4 +130,26 @@ object ActorRegistry extends Logging { actorsByClassName.clear log.info("All actors have been shut down and unregistered from ActorRegistry") } + + /** + * Adds the registration listener this this registry's listener list. + */ + def addRegistrationListener(listener: Actor) = { + registrationListeners.add(listener) + } + + /** + * Removes the registration listener this this registry's listener list. + */ + def removeRegistrationListener(listener: Actor) = { + registrationListeners.remove(listener) + } + + private def foreachListener(f: (Actor) => Unit) { + val iterator = registrationListeners.iterator + while (iterator.hasNext) f(iterator.next) + } } + +case class ActorRegistered(actor: Actor) +case class ActorUnregistered(actor: Actor) \ No newline at end of file diff --git a/akka-samples/akka-sample-camel/src/main/scala/Actors.scala b/akka-samples/akka-sample-camel/src/main/scala/Actors.scala new file mode 100644 index 0000000000..51c3940991 --- /dev/null +++ b/akka-samples/akka-sample-camel/src/main/scala/Actors.scala @@ -0,0 +1,57 @@ +package sample.camel + +import se.scalablesolutions.akka.actor.{Actor, RemoteActor} +import se.scalablesolutions.akka.annotation.consume +import se.scalablesolutions.akka.camel.{Message, Consumer} +import se.scalablesolutions.akka.util.Logging + +/** + * Client-initiated remote actor. + */ +class RemoteActor1 extends RemoteActor("localhost", 7777) with Consumer { + def endpointUri = "jetty:http://localhost:6644/remote1" + + protected def receive = { + case msg => reply("response from remote actor 1") + } +} + +/** + * Server-initiated remote actor. + */ +class RemoteActor2 extends Actor with Consumer { + def endpointUri = "jetty:http://localhost:6644/remote2" + + protected def receive = { + case msg => reply("response from remote actor 2") + } +} + +class Consumer1 extends Actor with Consumer with Logging { + def endpointUri = "file:data/input" + + def receive = { + case msg: Message => log.info("received %s" format msg.bodyAs(classOf[String])) + } +} + +@consume("jetty:http://0.0.0.0:8877/camel/test1") +class Consumer2 extends Actor { + def receive = { + case msg: Message => reply("Hello %s" format msg.bodyAs(classOf[String])) + } +} + +class Consumer3(transformer: Actor) extends Actor with Consumer { + def endpointUri = "jetty:http://0.0.0.0:8877/camel/welcome" + + def receive = { + case msg: Message => transformer.forward(msg.setBodyAs(classOf[String])) + } +} + +class Transformer(producer: Actor) extends Actor { + protected def receive = { + case msg: Message => producer.forward(msg.transformBody[String]("- %s -" format _)) + } +} \ No newline at end of file diff --git a/akka-samples/akka-sample-camel/src/main/scala/Application1.scala b/akka-samples/akka-sample-camel/src/main/scala/Application1.scala new file mode 100644 index 0000000000..5b708bfac5 --- /dev/null +++ b/akka-samples/akka-sample-camel/src/main/scala/Application1.scala @@ -0,0 +1,26 @@ +package sample.camel + +import se.scalablesolutions.akka.actor.Actor +import se.scalablesolutions.akka.remote.RemoteClient +/** + * @author Martin Krasser + */ +object Application1 { + + // + // TODO: completion of example + // + + def main(args: Array[String]) { + implicit val sender: Option[Actor] = None + + val actor1 = new RemoteActor1 + val actor2 = RemoteClient.actorFor("remote2", "localhost", 7777) + + actor1.start + + actor1 ! "hello" + actor2 ! "hello" + } + +} \ No newline at end of file diff --git a/akka-samples/akka-sample-camel/src/main/scala/Application2.scala b/akka-samples/akka-sample-camel/src/main/scala/Application2.scala new file mode 100644 index 0000000000..83c6e8c439 --- /dev/null +++ b/akka-samples/akka-sample-camel/src/main/scala/Application2.scala @@ -0,0 +1,22 @@ +package sample.camel + +import se.scalablesolutions.akka.camel.service.CamelService +import se.scalablesolutions.akka.remote.RemoteNode + +/** + * @author Martin Krasser + */ +object Application2 { + + // + // TODO: completion of example + // + + def main(args: Array[String]) { + val camelService = CamelService.newInstance + camelService.load + RemoteNode.start("localhost", 7777) + RemoteNode.register("remote2", new RemoteActor2().start) + } + +} \ No newline at end of file diff --git a/akka-samples/akka-sample-camel/src/main/scala/Boot.scala b/akka-samples/akka-sample-camel/src/main/scala/Boot.scala index b23c99dafa..81af9775cb 100644 --- a/akka-samples/akka-sample-camel/src/main/scala/Boot.scala +++ b/akka-samples/akka-sample-camel/src/main/scala/Boot.scala @@ -33,7 +33,6 @@ class Boot { } class CustomRouteBuilder extends RouteBuilder { - def configure { val actorUri = "actor:%s" format classOf[Consumer2].getName from("jetty:http://0.0.0.0:8877/camel/test2").to(actorUri) @@ -42,7 +41,5 @@ class CustomRouteBuilder extends RouteBuilder { exchange.getOut.setBody("Welcome %s" format exchange.getIn.getBody) } }) - } - } \ No newline at end of file diff --git a/akka-samples/akka-sample-camel/src/main/scala/Consumer1.scala b/akka-samples/akka-sample-camel/src/main/scala/Consumer1.scala deleted file mode 100644 index b292d6e186..0000000000 --- a/akka-samples/akka-sample-camel/src/main/scala/Consumer1.scala +++ /dev/null @@ -1,18 +0,0 @@ -package sample.camel - -import se.scalablesolutions.akka.util.Logging -import se.scalablesolutions.akka.actor.Actor -import se.scalablesolutions.akka.camel.{Message, Consumer} - -/** - * @author Martin Krasser - */ -class Consumer1 extends Actor with Consumer with Logging { - - def endpointUri = "file:data/input" - - def receive = { - case msg: Message => log.info("received %s" format msg.bodyAs(classOf[String])) - } - -} \ No newline at end of file diff --git a/akka-samples/akka-sample-camel/src/main/scala/Consumer2.scala b/akka-samples/akka-sample-camel/src/main/scala/Consumer2.scala deleted file mode 100644 index 4940c46f0d..0000000000 --- a/akka-samples/akka-sample-camel/src/main/scala/Consumer2.scala +++ /dev/null @@ -1,17 +0,0 @@ -package sample.camel - -import se.scalablesolutions.akka.actor.Actor -import se.scalablesolutions.akka.annotation.consume -import se.scalablesolutions.akka.camel.Message - -/** - * @author Martin Krasser - */ -@consume("jetty:http://0.0.0.0:8877/camel/test1") -class Consumer2 extends Actor { - - def receive = { - case msg: Message => reply("Hello %s" format msg.bodyAs(classOf[String])) - } - -} \ No newline at end of file diff --git a/akka-samples/akka-sample-camel/src/main/scala/Consumer3.scala b/akka-samples/akka-sample-camel/src/main/scala/Consumer3.scala deleted file mode 100644 index 39cf1f0652..0000000000 --- a/akka-samples/akka-sample-camel/src/main/scala/Consumer3.scala +++ /dev/null @@ -1,17 +0,0 @@ -package sample.camel - -import se.scalablesolutions.akka.actor.Actor -import se.scalablesolutions.akka.camel.{Message, Consumer} - -/** - * @author Martin Krasser - */ -class Consumer3(transformer: Actor) extends Actor with Consumer { - - def endpointUri = "jetty:http://0.0.0.0:8877/camel/welcome" - - def receive = { - case msg: Message => transformer.forward(msg.setBodyAs(classOf[String])) - } - -} diff --git a/akka-samples/akka-sample-camel/src/main/scala/Transformer.scala b/akka-samples/akka-sample-camel/src/main/scala/Transformer.scala deleted file mode 100644 index 0df05c594c..0000000000 --- a/akka-samples/akka-sample-camel/src/main/scala/Transformer.scala +++ /dev/null @@ -1,15 +0,0 @@ -package sample.camel - -import se.scalablesolutions.akka.actor.Actor -import se.scalablesolutions.akka.camel.Message - -/** - * @author Martin Krasser - */ -class Transformer(producer: Actor) extends Actor { - - protected def receive = { - case msg: Message => producer.forward(msg.transformBody[String]("- %s -" format _)) - } - -} \ No newline at end of file