support for remote actors, consumer actor publishing at any time

This commit is contained in:
Martin Krasser 2010-03-11 15:53:17 +01:00
parent f8fab07bc8
commit e056af15fa
14 changed files with 356 additions and 152 deletions

View file

@ -122,8 +122,8 @@ class ActorProducer(val ep: ActorEndpoint) extends DefaultProducer(ep) {
case Some(msg: Failure) => exchange.fromFailureMessage(msg)
case Some(msg) => exchange.fromResponseMessage(Message.canonicalize(msg))
case None => {
throw new TimeoutException("communication with %s timed out after %d ms"
format (ep.getEndpointUri, actor.timeout))
throw new TimeoutException("timeout (%d ms) while waiting response from %s"
format (actor.timeout, ep.getEndpointUri))
}
}
}

View file

@ -4,23 +4,14 @@
package se.scalablesolutions.akka.camel.service
import java.io.InputStream
import org.apache.camel.builder.RouteBuilder
import se.scalablesolutions.akka.actor.{Actor, ActorRegistry}
import se.scalablesolutions.akka.annotation.consume
import se.scalablesolutions.akka.actor.ActorRegistry
import se.scalablesolutions.akka.camel.CamelContextManager
import se.scalablesolutions.akka.util.{Bootable, Logging}
import se.scalablesolutions.akka.camel.{CamelContextManager, Consumer}
/**
* Started by the Kernel to expose certain actors as Camel endpoints. It uses
* se.scalablesolutions.akka.camel.CamelContextManage to create and manage the
* lifecycle of a global CamelContext. This class further uses the
* se.scalablesolutions.akka.camel.service.CamelServiceRouteBuilder to implement
* routes from Camel endpoints to actors.
*
* @see CamelRouteBuilder
* Used by applications (and the Kernel) to publish consumer actors via Camel
* endpoints and to manage the life cycle of a a global CamelContext which can
* be accessed via se.scalablesolutions.akka.camel.CamelContextManager.
*
* @author Martin Krasser
*/
@ -28,28 +19,63 @@ trait CamelService extends Bootable with Logging {
import CamelContextManager._
private[camel] val consumerPublisher = new ConsumerPublisher
private[camel] val publishRequestor = new PublishRequestor(consumerPublisher)
/**
* Starts the CamelService. Any started actor that is a consumer actor will be (asynchronously)
* published as Camel endpoint. Consumer actors that are started after this method returned will
* be published as well. Actor publishing is done asynchronously.
*/
abstract override def onLoad = {
super.onLoad
// Only init and start if not already done by application
if (!initialized) init()
context.addRoutes(new CamelServiceRouteBuilder)
if (!started) start()
// Camel should cache input streams
context.setStreamCaching(true)
start()
// start actor that exposes consumer actors via Camel endpoints
consumerPublisher.start
// add listener for actor registration events
ActorRegistry.addRegistrationListener(publishRequestor.start)
// publish already registered consumer actors
for (publish <- Publish.forConsumers(ActorRegistry.actors)) consumerPublisher.!(publish)(None)
}
/**
* Stops the CamelService.
*/
abstract override def onUnload = {
ActorRegistry.removeRegistrationListener(publishRequestor)
publishRequestor.stop
consumerPublisher.stop
stop()
super.onUnload
}
/**
* Starts the CamelService.
*
* @see onLoad
*/
def load = onLoad
/**
* Stops the CamelService.
*
* @see onUnload
*/
def unload = onUnload
}
/**
* CamelService companion object used by standalone applications to create their own
* CamelService instances.
* CamelService instance.
*
* @author Martin Krasser
*/
@ -59,55 +85,4 @@ object CamelService {
* Creates a new CamelService instance.
*/
def newInstance: CamelService = new CamelService {}
}
/**
* Implements routes from Camel endpoints to actors. It searches the registry for actors
* that are either annotated with @se.scalablesolutions.akka.annotation.consume or mix in
* se.scalablesolutions.akka.camel.Consumer and exposes them as Camel endpoints.
*
* @author Martin Krasser
*/
class CamelServiceRouteBuilder extends RouteBuilder with Logging {
def configure = {
val actors = ActorRegistry.actors
// TODO: avoid redundant registrations
actors.filter(isConsumeAnnotated _).foreach { actor: Actor =>
val fromUri = actor.getClass.getAnnotation(classOf[consume]).value()
configure(fromUri, "actor:id:%s" format actor.getId)
log.debug("registered actor (id=%s) for consuming messages from %s "
format (actor.getId, fromUri))
}
// TODO: avoid redundant registrations
actors.filter(isConsumerInstance _).foreach { actor: Actor =>
val fromUri = actor.asInstanceOf[Consumer].endpointUri
configure(fromUri, "actor:uuid:%s" format actor.uuid)
log.debug("registered actor (uuid=%s) for consuming messages from %s "
format (actor.uuid, fromUri))
}
}
private def configure(fromUri: String, toUri: String) {
val schema = fromUri take fromUri.indexOf(":") // e.g. "http" from "http://whatever/..."
bodyConversions.get(schema) match {
case Some(clazz) => from(fromUri).convertBodyTo(clazz).to(toUri)
case None => from(fromUri).to(toUri)
}
}
// TODO: make conversions configurable
private def bodyConversions = Map(
"file" -> classOf[InputStream]
)
private def isConsumeAnnotated(actor: Actor) =
actor.getClass.getAnnotation(classOf[consume]) ne null
private def isConsumerInstance(actor: Actor) =
actor.isInstanceOf[Consumer]
}

View file

@ -0,0 +1,145 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.camel.service
import java.io.InputStream
import java.util.concurrent.CountDownLatch
import org.apache.camel.builder.RouteBuilder
import se.scalablesolutions.akka.actor.{ActorUnregistered, ActorRegistered, Actor}
import se.scalablesolutions.akka.annotation.consume
import se.scalablesolutions.akka.camel.{Consumer, CamelContextManager}
import se.scalablesolutions.akka.util.Logging
/**
* Actor that publishes consumer actors as Camel endpoints at the CamelContext managed
* by se.scalablesolutions.akka.camel.CamelContextManager. It accepts messages of type
* se.scalablesolutions.akka.camel.service.Publish.
*
* @author Martin Krasser
*/
class ConsumerPublisher extends Actor with Logging {
@volatile private var latch = new CountDownLatch(0)
/**
* Adds a route to the actor identified by a Publish message to the global CamelContext.
*/
protected def receive = {
case p: Publish => publish(new ConsumerRoute(p.endpointUri, p.id, p.uuid))
case _ => { /* ignore */}
}
/**
* Sets the number of expected Publish messages received by this actor. Used for testing
* only.
*/
private[camel] def expectPublishCount(count: Int) {
latch = new CountDownLatch(count)
}
/**
* Waits for the number of expected Publish messages to arrive. Used for testing only.
*/
private[camel] def awaitPublish = latch.await
private def publish(route: ConsumerRoute) {
CamelContextManager.context.addRoutes(route)
log.info("published actor via endpoint %s" format route.endpointUri)
latch.countDown // needed for testing only.
}
}
/**
* Defines the route to a consumer actor.
*
* @param endpointUri endpoint URI of the consumer actor
* @param id actor identifier
* @param uuid <code>true</code> if <code>id</code> refers to Actor.uuid, <code>false</code> if
* <code>id</code> refers to Acotr.getId.
*
* @author Martin Krasser
*/
class ConsumerRoute(val endpointUri: String, id: String, uuid: Boolean) extends RouteBuilder {
// TODO: make conversions configurable
private val bodyConversions = Map(
"file" -> classOf[InputStream]
)
def configure = {
val schema = endpointUri take endpointUri.indexOf(":") // e.g. "http" from "http://whatever/..."
bodyConversions.get(schema) match {
case Some(clazz) => from(endpointUri).convertBodyTo(clazz).to(actorUri)
case None => from(endpointUri).to(actorUri)
}
}
private def actorUri = (if (uuid) "actor:uuid:%s" else "actor:id:%s") format id
}
/**
* A registration listener that publishes consumer actors (and ignores other actors).
*
* @author Martin Krasser
*/
class PublishRequestor(consumerPublisher: Actor) extends Actor {
protected def receive = {
case ActorUnregistered(actor) => { /* ignore */ }
case ActorRegistered(actor) => Publish.forConsumer(actor) match {
case Some(publish) => consumerPublisher ! publish
case None => { /* ignore */ }
}
}
}
/**
* Request message for publishing a consumer actor.
*
* @param endpointUri endpoint URI of the consumer actor
* @param id actor identifier
* @param uuid <code>true</code> if <code>id</code> refers to Actor.uuid, <code>false</code> if
* <code>id</code> refers to Acotr.getId.
*
* @author Martin Krasser
*/
case class Publish(endpointUri: String, id: String, uuid: Boolean)
/**
* @author Martin Krasser
*/
object Publish {
/**
* Creates a list of Publish request messages for all consumer actors in the <code>actors</code>
* list.
*/
def forConsumers(actors: List[Actor]): List[Publish] = {
for (actor <- actors; pub = forConsumer(actor); if pub.isDefined) yield pub.get
}
/**
* Creates a Publish request message if <code>actor</code> is a consumer actor.
*/
def forConsumer(actor: Actor): Option[Publish] = {
forConsumeAnnotated(actor) orElse forConsumerType(actor)
}
private def forConsumeAnnotated(actor: Actor): Option[Publish] = {
val annotation = actor.getClass.getAnnotation(classOf[consume])
if (annotation eq null)
None
else if (actor._remoteAddress.isDefined)
None // do not publish proxies
else
Some(Publish(annotation.value, actor.getId, false))
}
private def forConsumerType(actor: Actor): Option[Publish] = {
if (!actor.isInstanceOf[Consumer])
None
else if (actor._remoteAddress.isDefined)
None
else
Some(Publish(actor.asInstanceOf[Consumer].endpointUri, actor.uuid, true))
}
}

View file

@ -33,6 +33,7 @@ class ProducerTest extends JUnitSuite {
//
// TODO: test replies to messages sent with ! (bang)
// TODO: test copying of custom message headers
//
@Test def shouldProduceMessageSyncAndReceiveResponse = {

View file

@ -2,12 +2,12 @@ package se.scalablesolutions.akka.camel.service
import org.apache.camel.builder.RouteBuilder
import org.junit.Assert._
import org.junit.{Before, After, Test}
import org.scalatest.junit.JUnitSuite
import se.scalablesolutions.akka.actor.Actor
import se.scalablesolutions.akka.annotation.consume
import se.scalablesolutions.akka.camel.{CamelContextManager, Consumer, Message}
import org.junit.{Ignore, Before, After, Test}
class CamelServiceTest extends JUnitSuite with CamelService {
@ -23,26 +23,40 @@ class CamelServiceTest extends JUnitSuite with CamelService {
var actor3: Actor = _
@Before def setUp = {
// register actors before starting the CamelService
actor1 = new TestActor1().start
actor2 = new TestActor2().start
actor3 = new TestActor3().start
init()
// initialize global CamelContext
init
// customize global CamelContext
context.addRoutes(new TestRouteBuilder)
onLoad
consumerPublisher.expectPublishCount(2)
load
consumerPublisher.awaitPublish
}
@After def tearDown = {
onUnload
unload
actor1.stop
actor2.stop
actor3.stop
}
@Test def shouldReceiveResponseViaGeneratedRoute = {
@Test def shouldReceiveResponseViaPreStartGeneratedRoutes = {
assertEquals("Hello Martin (actor1)", template.requestBody("direct:actor1", "Martin"))
assertEquals("Hello Martin (actor2)", template.requestBody("direct:actor2", "Martin"))
}
@Test def shouldReceiveResponseViaPostStartGeneratedRoute = {
consumerPublisher.expectPublishCount(1)
// register actor after starting CamelService
val actor4 = new TestActor4().start
consumerPublisher.awaitPublish
assertEquals("Hello Martin (actor4)", template.requestBody("direct:actor4", "Martin"))
actor4.stop
}
@Test def shouldReceiveResponseViaCustomRoute = {
assertEquals("Hello Tester (actor3)", template.requestBody("direct:actor3", "Martin"))
}
@ -55,7 +69,6 @@ class TestActor1 extends Actor with Consumer {
protected def receive = {
case msg: Message => reply("Hello %s (actor1)" format msg.body)
}
}
@consume("direct:actor2")
@ -73,6 +86,14 @@ class TestActor3 extends Actor {
}
}
class TestActor4 extends Actor with Consumer {
def endpointUri = "direct:actor4"
protected def receive = {
case msg: Message => reply("Hello %s (actor4)" format msg.body)
}
}
class TestRouteBuilder extends RouteBuilder {
def configure {
val actorUri = "actor:%s" format classOf[TestActor3].getName

View file

@ -8,8 +8,7 @@ import se.scalablesolutions.akka.util.Logging
import scala.collection.mutable.ListBuffer
import scala.reflect.Manifest
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.{CopyOnWriteArrayList, ConcurrentHashMap}
/**
* Registry holding all Actor instances in the whole system.
@ -23,9 +22,10 @@ import java.util.concurrent.ConcurrentHashMap
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object ActorRegistry extends Logging {
private val actorsByUUID = new ConcurrentHashMap[String, Actor]
private val actorsById = new ConcurrentHashMap[String, List[Actor]]
private val actorsByClassName = new ConcurrentHashMap[String, List[Actor]]
private val actorsByUUID = new ConcurrentHashMap[String, Actor]
private val actorsById = new ConcurrentHashMap[String, List[Actor]]
private val actorsByClassName = new ConcurrentHashMap[String, List[Actor]]
private val registrationListeners = new CopyOnWriteArrayList[Actor]
/**
* Returns all actors in the system.
@ -103,6 +103,9 @@ object ActorRegistry extends Logging {
if (actorsByClassName.containsKey(className)) {
actorsByClassName.put(className, actor :: actorsByClassName.get(className))
} else actorsByClassName.put(className, actor :: Nil)
// notify listeners
foreachListener(_.!(ActorRegistered(actor))(None))
}
/**
@ -112,6 +115,8 @@ object ActorRegistry extends Logging {
actorsByUUID remove actor.uuid
actorsById remove actor.getId
actorsByClassName remove actor.getClass.getName
// notify listeners
foreachListener(_.!(ActorUnregistered(actor))(None))
}
/**
@ -125,4 +130,26 @@ object ActorRegistry extends Logging {
actorsByClassName.clear
log.info("All actors have been shut down and unregistered from ActorRegistry")
}
/**
* Adds the registration <code>listener</code> this this registry's listener list.
*/
def addRegistrationListener(listener: Actor) = {
registrationListeners.add(listener)
}
/**
* Removes the registration <code>listener</code> this this registry's listener list.
*/
def removeRegistrationListener(listener: Actor) = {
registrationListeners.remove(listener)
}
private def foreachListener(f: (Actor) => Unit) {
val iterator = registrationListeners.iterator
while (iterator.hasNext) f(iterator.next)
}
}
case class ActorRegistered(actor: Actor)
case class ActorUnregistered(actor: Actor)

View file

@ -0,0 +1,57 @@
package sample.camel
import se.scalablesolutions.akka.actor.{Actor, RemoteActor}
import se.scalablesolutions.akka.annotation.consume
import se.scalablesolutions.akka.camel.{Message, Consumer}
import se.scalablesolutions.akka.util.Logging
/**
* Client-initiated remote actor.
*/
class RemoteActor1 extends RemoteActor("localhost", 7777) with Consumer {
def endpointUri = "jetty:http://localhost:6644/remote1"
protected def receive = {
case msg => reply("response from remote actor 1")
}
}
/**
* Server-initiated remote actor.
*/
class RemoteActor2 extends Actor with Consumer {
def endpointUri = "jetty:http://localhost:6644/remote2"
protected def receive = {
case msg => reply("response from remote actor 2")
}
}
class Consumer1 extends Actor with Consumer with Logging {
def endpointUri = "file:data/input"
def receive = {
case msg: Message => log.info("received %s" format msg.bodyAs(classOf[String]))
}
}
@consume("jetty:http://0.0.0.0:8877/camel/test1")
class Consumer2 extends Actor {
def receive = {
case msg: Message => reply("Hello %s" format msg.bodyAs(classOf[String]))
}
}
class Consumer3(transformer: Actor) extends Actor with Consumer {
def endpointUri = "jetty:http://0.0.0.0:8877/camel/welcome"
def receive = {
case msg: Message => transformer.forward(msg.setBodyAs(classOf[String]))
}
}
class Transformer(producer: Actor) extends Actor {
protected def receive = {
case msg: Message => producer.forward(msg.transformBody[String]("- %s -" format _))
}
}

View file

@ -0,0 +1,26 @@
package sample.camel
import se.scalablesolutions.akka.actor.Actor
import se.scalablesolutions.akka.remote.RemoteClient
/**
* @author Martin Krasser
*/
object Application1 {
//
// TODO: completion of example
//
def main(args: Array[String]) {
implicit val sender: Option[Actor] = None
val actor1 = new RemoteActor1
val actor2 = RemoteClient.actorFor("remote2", "localhost", 7777)
actor1.start
actor1 ! "hello"
actor2 ! "hello"
}
}

View file

@ -0,0 +1,22 @@
package sample.camel
import se.scalablesolutions.akka.camel.service.CamelService
import se.scalablesolutions.akka.remote.RemoteNode
/**
* @author Martin Krasser
*/
object Application2 {
//
// TODO: completion of example
//
def main(args: Array[String]) {
val camelService = CamelService.newInstance
camelService.load
RemoteNode.start("localhost", 7777)
RemoteNode.register("remote2", new RemoteActor2().start)
}
}

View file

@ -33,7 +33,6 @@ class Boot {
}
class CustomRouteBuilder extends RouteBuilder {
def configure {
val actorUri = "actor:%s" format classOf[Consumer2].getName
from("jetty:http://0.0.0.0:8877/camel/test2").to(actorUri)
@ -42,7 +41,5 @@ class CustomRouteBuilder extends RouteBuilder {
exchange.getOut.setBody("Welcome %s" format exchange.getIn.getBody)
}
})
}
}

View file

@ -1,18 +0,0 @@
package sample.camel
import se.scalablesolutions.akka.util.Logging
import se.scalablesolutions.akka.actor.Actor
import se.scalablesolutions.akka.camel.{Message, Consumer}
/**
* @author Martin Krasser
*/
class Consumer1 extends Actor with Consumer with Logging {
def endpointUri = "file:data/input"
def receive = {
case msg: Message => log.info("received %s" format msg.bodyAs(classOf[String]))
}
}

View file

@ -1,17 +0,0 @@
package sample.camel
import se.scalablesolutions.akka.actor.Actor
import se.scalablesolutions.akka.annotation.consume
import se.scalablesolutions.akka.camel.Message
/**
* @author Martin Krasser
*/
@consume("jetty:http://0.0.0.0:8877/camel/test1")
class Consumer2 extends Actor {
def receive = {
case msg: Message => reply("Hello %s" format msg.bodyAs(classOf[String]))
}
}

View file

@ -1,17 +0,0 @@
package sample.camel
import se.scalablesolutions.akka.actor.Actor
import se.scalablesolutions.akka.camel.{Message, Consumer}
/**
* @author Martin Krasser
*/
class Consumer3(transformer: Actor) extends Actor with Consumer {
def endpointUri = "jetty:http://0.0.0.0:8877/camel/welcome"
def receive = {
case msg: Message => transformer.forward(msg.setBodyAs(classOf[String]))
}
}

View file

@ -1,15 +0,0 @@
package sample.camel
import se.scalablesolutions.akka.actor.Actor
import se.scalablesolutions.akka.camel.Message
/**
* @author Martin Krasser
*/
class Transformer(producer: Actor) extends Actor {
protected def receive = {
case msg: Message => producer.forward(msg.transformBody[String]("- %s -" format _))
}
}