Merge branch 'master' of git@github.com:jboner/akka

This commit is contained in:
Jonas Bonér 2010-06-08 20:47:35 +02:00
commit 411038abcb
35 changed files with 1161 additions and 511 deletions

View file

@ -0,0 +1 @@
class=se.scalablesolutions.akka.camel.component.ActiveObjectComponent

View file

@ -4,9 +4,12 @@
package se.scalablesolutions.akka.camel
import java.util.Map
import org.apache.camel.{ProducerTemplate, CamelContext}
import org.apache.camel.impl.DefaultCamelContext
import se.scalablesolutions.akka.camel.component.ActiveObjectComponent
import se.scalablesolutions.akka.util.Logging
/**
@ -26,7 +29,18 @@ trait CamelContextLifecycle extends Logging {
private var _started = false
/**
* Returns the managed CamelContext.
* Camel component for accessing active objects.
*/
private[camel] var activeObjectComponent: ActiveObjectComponent = _
/**
* Registry in which active objects are TEMPORARILY registered during
* creation of Camel routes to active objects.
*/
private[camel] var activeObjectRegistry: Map[String, AnyRef] = _
/**
* Returns the managed CamelContext.
*/
protected def context: CamelContext = _context
@ -78,10 +92,16 @@ trait CamelContextLifecycle extends Logging {
* Initializes this lifecycle object with the given CamelContext. For the passed
* CamelContext stream-caching is enabled. If applications want to disable stream-
* caching they can do so after this method returned and prior to calling start.
* This method also registers a new
* {@link se.scalablesolutions.akka.camel.component.ActiveObjectComponent} at
* <code>context</code> under a name defined by ActiveObjectComponent.InternalSchema.
*/
def init(context: CamelContext) {
this.activeObjectComponent = new ActiveObjectComponent
this.activeObjectRegistry = activeObjectComponent.activeObjectRegistry
this.context = context
this.context.setStreamCaching(true)
this.context.addComponent(ActiveObjectComponent.InternalSchema, activeObjectComponent)
this.template = context.createProducerTemplate
_initialized = true
log.info("Camel context initialized")
@ -90,7 +110,7 @@ trait CamelContextLifecycle extends Logging {
/**
* Makes a global CamelContext and ProducerTemplate accessible to applications. The lifecycle
* of these objects is managed by se.scalablesolutions.akka.camel.service.CamelService.
* of these objects is managed by se.scalablesolutions.akka.camel.CamelService.
*/
object CamelContextManager extends CamelContextLifecycle {
override def context: CamelContext = super.context

View file

@ -0,0 +1,100 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.camel
import se.scalablesolutions.akka.actor.Actor._
import se.scalablesolutions.akka.actor.{AspectInitRegistry, ActorRegistry}
import se.scalablesolutions.akka.util.{Bootable, Logging}
/**
* Used by applications (and the Kernel) to publish consumer actors and active objects via
* Camel endpoints and to manage the life cycle of a a global CamelContext which can be
* accessed via <code>se.scalablesolutions.akka.camel.CamelContextManager.context</code>.
*
* @author Martin Krasser
*/
trait CamelService extends Bootable with Logging {
import CamelContextManager._
private[camel] val consumerPublisher = actorOf[ConsumerPublisher]
private[camel] val publishRequestor = actorOf[PublishRequestor]
// add listener for actor registration events
ActorRegistry.addListener(publishRequestor)
// add listener for AspectInit registration events
AspectInitRegistry.addListener(publishRequestor)
/**
* Starts the CamelService. Any started actor that is a consumer actor will be (asynchronously)
* published as Camel endpoint. Consumer actors that are started after this method returned will
* be published as well. Actor publishing is done asynchronously. A started (loaded) CamelService
* also publishes <code>@consume</code> annotated methods of active objects that have been created
* with <code>ActiveObject.newInstance(..)</code> (and <code>ActiveObject.newInstance(..)</code>
* on a remote node).
*/
abstract override def onLoad = {
super.onLoad
// Only init and start if not already done by application
if (!initialized) init
if (!started) start
// start actor that exposes consumer actors and active objects via Camel endpoints
consumerPublisher.start
// init publishRequestor so that buffered and future events are delivered to consumerPublisher
publishRequestor ! PublishRequestorInit(consumerPublisher)
}
/**
* Stops the CamelService.
*/
abstract override def onUnload = {
ActorRegistry.removeListener(publishRequestor)
AspectInitRegistry.removeListener(publishRequestor)
consumerPublisher.stop
stop
super.onUnload
}
/**
* Starts the CamelService.
*
* @see onLoad
*/
def load = onLoad
/**
* Stops the CamelService.
*
* @see onUnload
*/
def unload = onUnload
}
/**
* CamelService companion object used by standalone applications to create their own
* CamelService instance.
*
* @author Martin Krasser
*/
object CamelService {
/**
* Creates a new CamelService instance.
*/
def newInstance: CamelService = new DefaultCamelService
}
/**
* Default CamelService implementation to be created in Java applications with
* <pre>
* CamelService service = new DefaultCamelService()
* </pre>
*/
class DefaultCamelService extends CamelService {
}

View file

@ -0,0 +1,322 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.camel
import collection.mutable.ListBuffer
import java.io.InputStream
import java.lang.reflect.Method
import java.util.concurrent.CountDownLatch
import org.apache.camel.builder.RouteBuilder
import se.scalablesolutions.akka.actor._
import se.scalablesolutions.akka.actor.annotation.consume
import se.scalablesolutions.akka.camel.component.ActiveObjectComponent
import se.scalablesolutions.akka.util.Logging
/**
* @author Martin Krasser
*/
private[camel] object ConsumerPublisher extends Logging {
/**
* Creates a route to the registered consumer actor.
*/
def handleConsumerRegistered(event: ConsumerRegistered) {
CamelContextManager.context.addRoutes(new ConsumerActorRoute(event.uri, event.id, event.uuid))
log.info("published actor %s at endpoint %s" format (event.actorRef, event.uri))
}
/**
* Stops route to the already un-registered consumer actor.
*/
def handleConsumerUnregistered(event: ConsumerUnregistered) {
CamelContextManager.context.stopRoute(event.id)
log.info("unpublished actor %s from endpoint %s" format (event.actorRef, event.uri))
}
/**
* Creates a route to an active object method.
*/
def handleConsumerMethodRegistered(event: ConsumerMethodRegistered) {
val targetMethod = event.method.getName
val objectId = "%s_%s" format (event.init.actorRef.uuid, targetMethod)
CamelContextManager.activeObjectRegistry.put(objectId, event.activeObject)
CamelContextManager.context.addRoutes(new ConsumerMethodRoute(event.uri, objectId, targetMethod))
log.info("published method %s of %s at endpoint %s" format (targetMethod, event.activeObject, event.uri))
}
}
/**
* Actor that publishes consumer actors and active object methods at Camel endpoints.
* The Camel context used for publishing is CamelContextManager.context. This actor
* accepts messages of type
* se.scalablesolutions.akka.camel.service.ConsumerRegistered,
* se.scalablesolutions.akka.camel.service.ConsumerMethodRegistered and
* se.scalablesolutions.akka.camel.service.ConsumerUnregistered.
*
* @author Martin Krasser
*/
private[camel] class ConsumerPublisher extends Actor {
import ConsumerPublisher._
@volatile private var latch = new CountDownLatch(0)
/**
* Adds a route to the actor identified by a Publish message to the global CamelContext.
*/
protected def receive = {
case r: ConsumerRegistered => {
handleConsumerRegistered(r)
latch.countDown // needed for testing only.
}
case u: ConsumerUnregistered => {
handleConsumerUnregistered(u)
latch.countDown // needed for testing only.
}
case d: ConsumerMethodRegistered => {
handleConsumerMethodRegistered(d)
latch.countDown // needed for testing only.
}
case SetExpectedMessageCount(num) => {
// needed for testing only.
latch = new CountDownLatch(num)
self.reply(latch)
}
case _ => { /* ignore */}
}
}
/**
* Command message used For testing-purposes only.
*/
private[camel] case class SetExpectedMessageCount(num: Int)
/**
* Defines an abstract route to a target which is either an actor or an active object method..
*
* @param endpointUri endpoint URI of the consumer actor or active object method.
* @param id actor identifier or active object identifier (registry key).
*
* @author Martin Krasser
*/
private[camel] abstract class ConsumerRoute(endpointUri: String, id: String) extends RouteBuilder {
// TODO: make conversions configurable
private val bodyConversions = Map(
"file" -> classOf[InputStream]
)
def configure = {
val schema = endpointUri take endpointUri.indexOf(":") // e.g. "http" from "http://whatever/..."
bodyConversions.get(schema) match {
case Some(clazz) => from(endpointUri).routeId(id).convertBodyTo(clazz).to(targetUri)
case None => from(endpointUri).routeId(id).to(targetUri)
}
}
protected def targetUri: String
}
/**
* Defines the route to a consumer actor.
*
* @param endpointUri endpoint URI of the consumer actor
* @param id actor identifier
* @param uuid <code>true</code> if <code>id</code> refers to Actor.uuid, <code>false</code> if
* <code>id</code> refers to Actor.getId.
*
* @author Martin Krasser
*/
private[camel] class ConsumerActorRoute(endpointUri: String, id: String, uuid: Boolean) extends ConsumerRoute(endpointUri, id) {
protected override def targetUri = (if (uuid) "actor:uuid:%s" else "actor:id:%s") format id
}
/**
* Defines the route to an active object method..
*
* @param endpointUri endpoint URI of the consumer actor method
* @param id active object identifier
* @param method name of the method to invoke.
*
* @author Martin Krasser
*/
private[camel] class ConsumerMethodRoute(val endpointUri: String, id: String, method: String) extends ConsumerRoute(endpointUri, id) {
protected override def targetUri = "%s:%s?method=%s" format (ActiveObjectComponent.InternalSchema, id, method)
}
/**
* A registration listener that triggers publication of consumer actors and active object
* methods as well as un-publication of consumer actors. This actor needs to be initialized
* with a <code>PublishRequestorInit</code> command message for obtaining a reference to
* a <code>publisher</code> actor. Before initialization it buffers all outbound messages
* and delivers them to the <code>publisher</code> when receiving a
* <code>PublishRequestorInit</code> message. After initialization, outbound messages are
* delivered directly without buffering.
*
* @see PublishRequestorInit
*
* @author Martin Krasser
*/
private[camel] class PublishRequestor extends Actor {
private val events = ListBuffer[ConsumerEvent]()
private var publisher: Option[ActorRef] = None
protected def receive = {
case ActorRegistered(actor) =>
for (event <- ConsumerRegistered.forConsumer(actor)) deliverCurrentEvent(event)
case ActorUnregistered(actor) =>
for (event <- ConsumerUnregistered.forConsumer(actor)) deliverCurrentEvent(event)
case AspectInitRegistered(proxy, init) =>
for (event <- ConsumerMethodRegistered.forConsumer(proxy, init)) deliverCurrentEvent(event)
case PublishRequestorInit(pub) => {
publisher = Some(pub)
deliverBufferedEvents
}
case _ => { /* ignore */ }
}
private def deliverCurrentEvent(event: ConsumerEvent) = {
publisher match {
case Some(pub) => pub ! event
case None => events += event
}
}
private def deliverBufferedEvents = {
for (event <- events) deliverCurrentEvent(event)
events.clear
}
}
/**
* Command message to initialize a PublishRequestor to use <code>consumerPublisher</code>
* for publishing actors or active object methods.
*/
private[camel] case class PublishRequestorInit(consumerPublisher: ActorRef)
/**
* A consumer (un)registration event.
*
* @author Martin Krasser
*/
private[camel] sealed trait ConsumerEvent
/**
* Event indicating that a consumer actor has been registered at the actor registry.
*
* @param actorRef actor reference
* @param uri endpoint URI of the consumer actor
* @param id actor identifier
* @param uuid <code>true</code> if <code>id</code> is the actor's uuid, <code>false</code> if
* <code>id</code> is the actor's id.
*
* @author Martin Krasser
*/
private[camel] case class ConsumerRegistered(actorRef: ActorRef, uri: String, id: String, uuid: Boolean) extends ConsumerEvent
/**
* Event indicating that a consumer actor has been unregistered from the actor registry.
*
* @param actorRef actor reference
* @param uri endpoint URI of the consumer actor
* @param id actor identifier
* @param uuid <code>true</code> if <code>id</code> is the actor's uuid, <code>false</code> if
* <code>id</code> is the actor's id.
*
* @author Martin Krasser
*/
private[camel] case class ConsumerUnregistered(actorRef: ActorRef, uri: String, id: String, uuid: Boolean) extends ConsumerEvent
/**
* Event indicating that an active object proxy has been created for a POJO. For each
* <code>@consume</code> annotated POJO method a separate instance of this class is
* created.
*
* @param activeObject active object (proxy).
* @param init
* @param uri endpoint URI of the active object method
* @param method method to be published.
*
* @author Martin Krasser
*/
private[camel] case class ConsumerMethodRegistered(activeObject: AnyRef, init: AspectInit, uri: String, method: Method) extends ConsumerEvent
/**
* @author Martin Krasser
*/
private[camel] object ConsumerRegistered {
/**
* Optionally creates an ConsumerRegistered event message for a consumer actor or None if
* <code>actorRef</code> is not a consumer actor.
*/
def forConsumer(actorRef: ActorRef): Option[ConsumerRegistered] = actorRef match {
case ConsumerDescriptor(ref, uri, id, uuid) => Some(ConsumerRegistered(ref, uri, id, uuid))
case _ => None
}
}
/**
* @author Martin Krasser
*/
private[camel] object ConsumerUnregistered {
/**
* Optionally creates an ConsumerUnregistered event message for a consumer actor or None if
* <code>actorRef</code> is not a consumer actor.
*/
def forConsumer(actorRef: ActorRef): Option[ConsumerUnregistered] = actorRef match {
case ConsumerDescriptor(ref, uri, id, uuid) => Some(ConsumerUnregistered(ref, uri, id, uuid))
case _ => None
}
}
/**
* @author Martin Krasser
*/
private[camel] object ConsumerMethodRegistered {
/**
* Creates a list of ConsumerMethodRegistered event messages for an active object or an empty
* list if the active object is a proxy for an remote active object or the active object doesn't
* have any <code>@consume</code> annotated methods.
*/
def forConsumer(activeObject: AnyRef, init: AspectInit): List[ConsumerMethodRegistered] = {
// TODO: support consumer annotation inheritance
// - visit overridden methods in superclasses
// - visit implemented method declarations in interfaces
if (init.remoteAddress.isDefined) Nil // let remote node publish active object methods on endpoints
else for (m <- activeObject.getClass.getMethods.toList; if (m.isAnnotationPresent(classOf[consume])))
yield ConsumerMethodRegistered(activeObject, init, m.getAnnotation(classOf[consume]).value, m)
}
}
/**
* Describes a consumer actor with elements that are relevant for publishing an actor at a
* Camel endpoint (or unpublishing an actor from an endpoint).
*
* @author Martin Krasser
*/
private[camel] object ConsumerDescriptor {
/**
* An extractor that optionally creates a 4-tuple from a consumer actor reference containing
* the actor reference itself, endpoint URI, identifier and a hint whether the identifier
* is the actor uuid or actor id. If <code>actorRef</code> doesn't reference a consumer actor,
* None is returned.
*/
def unapply(actorRef: ActorRef): Option[(ActorRef, String, String, Boolean)] =
unapplyConsumerInstance(actorRef) orElse unapplyConsumeAnnotated(actorRef)
private def unapplyConsumeAnnotated(actorRef: ActorRef): Option[(ActorRef, String, String, Boolean)] = {
val annotation = actorRef.actorClass.getAnnotation(classOf[consume])
if (annotation eq null) None
else if (actorRef.remoteAddress.isDefined) None
else Some((actorRef, annotation.value, actorRef.id, false))
}
private def unapplyConsumerInstance(actorRef: ActorRef): Option[(ActorRef, String, String, Boolean)] =
if (!actorRef.actor.isInstanceOf[Consumer]) None
else if (actorRef.remoteAddress.isDefined) None
else Some((actorRef, actorRef.actor.asInstanceOf[Consumer].endpointUri, actorRef.uuid, true))
}

View file

@ -160,7 +160,7 @@ trait Producer { this: Actor =>
*
* @author Martin Krasser
*/
class ProducerResponseSender(
private[camel] class ProducerResponseSender(
headers: Map[String, Any],
sender: Option[ActorRef],
senderFuture: Option[CompletableFuture[Any]],

View file

@ -0,0 +1,108 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.camel.component
import java.util.Map
import java.util.concurrent.ConcurrentHashMap
import org.apache.camel.CamelContext
import org.apache.camel.component.bean._
/**
* @author Martin Krasser
*/
object ActiveObjectComponent {
/**
* Default schema name for active object endpoint URIs.
*/
val InternalSchema = "active-object-internal"
}
/**
* Camel component for exchanging messages with active objects. This component
* tries to obtain the active object from the <code>activeObjectRegistry</code>
* first. If it's not there it tries to obtain it from the CamelContext's registry.
*
* @see org.apache.camel.component.bean.BeanComponent
*
* @author Martin Krasser
*/
class ActiveObjectComponent extends BeanComponent {
val activeObjectRegistry = new ConcurrentHashMap[String, AnyRef]
/**
* Creates a {@link org.apache.camel.component.bean.BeanEndpoint} with a custom
* bean holder that uses <code>activeObjectRegistry</code> for getting access to
* active objects (beans).
*
* @see se.scalablesolutions.akka.camel.component.ActiveObjectHolder
*/
override def createEndpoint(uri: String, remaining: String, parameters: Map[String, AnyRef]) = {
val endpoint = new BeanEndpoint(uri, this)
endpoint.setBeanName(remaining)
endpoint.setBeanHolder(createBeanHolder(remaining))
setProperties(endpoint.getProcessor, parameters)
endpoint
}
private def createBeanHolder(beanName: String) =
new ActiveObjectHolder(activeObjectRegistry, getCamelContext, beanName).createCacheHolder
}
/**
* {@link org.apache.camel.component.bean.BeanHolder} implementation that uses a custom
* registry for getting access to active objects.
*
* @author Martin Krasser
*/
class ActiveObjectHolder(activeObjectRegistry: Map[String, AnyRef], context: CamelContext, name: String)
extends RegistryBean(context, name) {
/**
* Returns an {@link se.scalablesolutions.akka.camel.component.ActiveObjectInfo} instance.
*/
override def getBeanInfo: BeanInfo =
new ActiveObjectInfo(getContext, getBean.getClass, getParameterMappingStrategy)
/**
* Obtains an active object from <code>activeObjectRegistry</code>.
*/
override def getBean: AnyRef = {
val bean = activeObjectRegistry.get(getName)
if (bean eq null) super.getBean else bean
}
}
/**
* Provides active object meta information.
*
* @author Martin Krasser
*/
class ActiveObjectInfo(context: CamelContext, clazz: Class[_], strategy: ParameterMappingStrategy)
extends BeanInfo(context, clazz, strategy) {
/**
* Introspects AspectWerkz proxy classes.
*
* @param clazz AspectWerkz proxy class.
*/
protected override def introspect(clazz: Class[_]): Unit = {
// TODO: fix target class detection in BeanInfo.introspect(Class)
// Camel assumes that classes containing a '$$' in the class name
// are classes generated with CGLIB. This conflicts with proxies
// created from interfaces with AspectWerkz. Once the fix is in
// place this method can be removed.
for (method <- clazz.getDeclaredMethods) {
if (isValidMethod(clazz, method)) {
introspect(clazz, method)
}
}
val superclass = clazz.getSuperclass
if (superclass != null && !superclass.equals(classOf[AnyRef])) {
introspect(superclass)
}
}
}

View file

@ -1,86 +0,0 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.camel.service
import se.scalablesolutions.akka.actor.ActorRegistry
import se.scalablesolutions.akka.actor.Actor._
import se.scalablesolutions.akka.camel.CamelContextManager
import se.scalablesolutions.akka.util.{Bootable, Logging}
/**
* Used by applications (and the Kernel) to publish consumer actors via Camel
* endpoints and to manage the life cycle of a a global CamelContext which can
* be accessed via se.scalablesolutions.akka.camel.CamelContextManager.
*
* @author Martin Krasser
*/
trait CamelService extends Bootable with Logging {
import CamelContextManager._
private[camel] val consumerPublisher = actorOf[ConsumerPublisher]
private[camel] val publishRequestor = actorOf(new PublishRequestor(consumerPublisher))
/**
* Starts the CamelService. Any started actor that is a consumer actor will be (asynchronously)
* published as Camel endpoint. Consumer actors that are started after this method returned will
* be published as well. Actor publishing is done asynchronously.
*/
abstract override def onLoad = {
super.onLoad
// Only init and start if not already done by application
if (!initialized) init
if (!started) start
// start actor that exposes consumer actors via Camel endpoints
consumerPublisher.start
// add listener for actor registration events
ActorRegistry.addRegistrationListener(publishRequestor.start)
// publish already registered consumer actors
for (actor <- ActorRegistry.actors; event <- ConsumerRegistered.forConsumer(actor)) consumerPublisher ! event
}
/**
* Stops the CamelService.
*/
abstract override def onUnload = {
ActorRegistry.removeRegistrationListener(publishRequestor)
publishRequestor.stop
consumerPublisher.stop
stop
super.onUnload
}
/**
* Starts the CamelService.
*
* @see onLoad
*/
def load = onLoad
/**
* Stops the CamelService.
*
* @see onUnload
*/
def unload = onUnload
}
/**
* CamelService companion object used by standalone applications to create their own
* CamelService instance.
*
* @author Martin Krasser
*/
object CamelService {
/**
* Creates a new CamelService instance.
*/
def newInstance: CamelService = new CamelService {}
}

View file

@ -1,210 +0,0 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.camel.service
import java.io.InputStream
import java.util.concurrent.CountDownLatch
import org.apache.camel.builder.RouteBuilder
import se.scalablesolutions.akka.actor.{ActorUnregistered, ActorRegistered, Actor, ActorRef}
import se.scalablesolutions.akka.actor.annotation.consume
import se.scalablesolutions.akka.camel.{Consumer, CamelContextManager}
import se.scalablesolutions.akka.util.Logging
/**
* Actor that publishes consumer actors as Camel endpoints at the CamelContext managed
* by se.scalablesolutions.akka.camel.CamelContextManager. It accepts messages of type
* se.scalablesolutions.akka.camel.service.ConsumerRegistered and
* se.scalablesolutions.akka.camel.service.ConsumerUnregistered.
*
* @author Martin Krasser
*/
class ConsumerPublisher extends Actor with Logging {
@volatile private var publishLatch = new CountDownLatch(0)
@volatile private var unpublishLatch = new CountDownLatch(0)
/**
* Adds a route to the actor identified by a Publish message to the global CamelContext.
*/
protected def receive = {
case r: ConsumerRegistered => {
handleConsumerRegistered(r)
publishLatch.countDown // needed for testing only.
}
case u: ConsumerUnregistered => {
handleConsumerUnregistered(u)
unpublishLatch.countDown // needed for testing only.
}
case _ => { /* ignore */}
}
/**
* Sets the expected number of actors to be published. Used for testing only.
*/
private[camel] def expectPublishCount(count: Int): Unit =
publishLatch = new CountDownLatch(count)
/**
* Sets the expected number of actors to be unpublished. Used for testing only.
*/
private[camel] def expectUnpublishCount(count: Int): Unit =
unpublishLatch = new CountDownLatch(count)
/**
* Waits for the expected number of actors to be published. Used for testing only.
*/
private[camel] def awaitPublish = publishLatch.await
/**
* Waits for the expected number of actors to be unpublished. Used for testing only.
*/
private[camel] def awaitUnpublish = unpublishLatch.await
/**
* Creates a route to the registered consumer actor.
*/
private def handleConsumerRegistered(event: ConsumerRegistered) {
CamelContextManager.context.addRoutes(new ConsumerRoute(event.uri, event.id, event.uuid))
log.info("published actor %s (%s) at endpoint %s" format (event.clazz, event.id, event.uri))
}
/**
* Stops route to the already un-registered consumer actor.
*/
private def handleConsumerUnregistered(event: ConsumerUnregistered) {
CamelContextManager.context.stopRoute(event.id)
log.info("unpublished actor %s (%s) from endpoint %s" format (event.clazz, event.id, event.uri))
}
}
/**
* Defines the route to a consumer actor.
*
* @param endpointUri endpoint URI of the consumer actor
* @param id actor identifier
* @param uuid <code>true</code> if <code>id</code> refers to Actor.uuid, <code>false</code> if
* <code>id</code> refers to Actor.getId.
*
* @author Martin Krasser
*/
class ConsumerRoute(val endpointUri: String, id: String, uuid: Boolean) extends RouteBuilder {
// TODO: make conversions configurable
private val bodyConversions = Map(
"file" -> classOf[InputStream]
)
def configure = {
val schema = endpointUri take endpointUri.indexOf(":") // e.g. "http" from "http://whatever/..."
bodyConversions.get(schema) match {
case Some(clazz) => from(endpointUri).routeId(id).convertBodyTo(clazz).to(actorUri)
case None => from(endpointUri).routeId(id).to(actorUri)
}
}
private def actorUri = (if (uuid) "actor:uuid:%s" else "actor:id:%s") format id
}
/**
* A registration listener that triggers publication and un-publication of consumer actors.
*
* @author Martin Krasser
*/
class PublishRequestor(consumerPublisher: ActorRef) extends Actor {
protected def receive = {
case ActorRegistered(actor) => for (event <- ConsumerRegistered.forConsumer(actor)) consumerPublisher ! event
case ActorUnregistered(actor) => for (event <- ConsumerUnregistered.forConsumer(actor)) consumerPublisher ! event
}
}
/**
* Consumer actor lifecycle event.
*
* @author Martin Krasser
*/
sealed trait ConsumerEvent
/**
* Event indicating that a consumer actor has been registered at the actor registry.
*
* @param clazz clazz name of the referenced actor
* @param uri endpoint URI of the consumer actor
* @param id actor identifier
* @param uuid <code>true</code> if <code>id</code> is the actor's uuid, <code>false</code> if
* <code>id</code> is the actor's id.
*
* @author Martin Krasser
*/
case class ConsumerRegistered(clazz: String, uri: String, id: String, uuid: Boolean) extends ConsumerEvent
/**
* Event indicating that a consumer actor has been unregistered from the actor registry.
*
* @param clazz clazz name of the referenced actor
* @param uri endpoint URI of the consumer actor
* @param id actor identifier
* @param uuid <code>true</code> if <code>id</code> is the actor's uuid, <code>false</code> if
* <code>id</code> is the actor's id.
*
* @author Martin Krasser
*/
case class ConsumerUnregistered(clazz: String, uri: String, id: String, uuid: Boolean) extends ConsumerEvent
/**
* @author Martin Krasser
*/
private[camel] object ConsumerRegistered {
/**
* Optionally creates an ConsumerRegistered event message for a consumer actor or None if
* <code>actorRef</code> is not a consumer actor.
*/
def forConsumer(actorRef: ActorRef): Option[ConsumerRegistered] = actorRef match {
case ConsumerDescriptor(clazz, uri, id, uuid) => Some(ConsumerRegistered(clazz, uri, id, uuid))
case _ => None
}
}
/**
* @author Martin Krasser
*/
private[camel] object ConsumerUnregistered {
/**
* Optionally creates an ConsumerUnregistered event message for a consumer actor or None if
* <code>actorRef</code> is not a consumer actor.
*/
def forConsumer(actorRef: ActorRef): Option[ConsumerUnregistered] = actorRef match {
case ConsumerDescriptor(clazz, uri, id, uuid) => Some(ConsumerUnregistered(clazz, uri, id, uuid))
case _ => None
}
}
/**
* Describes a consumer actor with elements that are relevant for publishing an actor at a
* Camel endpoint (or unpublishing an actor from an endpoint).
*
* @author Martin Krasser
*/
private[camel] object ConsumerDescriptor {
/**
* An extractor that optionally creates a 4-tuple from a consumer actor reference containing
* the target actor's class name, endpoint URI, identifier and a hint whether the identifier
* is the actor uuid or actor id. If <code>actorRef</code> doesn't reference a consumer actor,
* None is returned.
*/
def unapply(actorRef: ActorRef): Option[(String, String, String, Boolean)] =
unapplyConsumerInstance(actorRef) orElse unapplyConsumeAnnotated(actorRef)
private def unapplyConsumeAnnotated(actorRef: ActorRef): Option[(String, String, String, Boolean)] = {
val annotation = actorRef.actorClass.getAnnotation(classOf[consume])
if (annotation eq null) None
else if (actorRef.remoteAddress.isDefined) None
else Some((actorRef.actor.getClass.getName, annotation.value, actorRef.id, false))
}
private def unapplyConsumerInstance(actorRef: ActorRef): Option[(String, String, String, Boolean)] =
if (!actorRef.actor.isInstanceOf[Consumer]) None
else if (actorRef.remoteAddress.isDefined) None
else Some((actorRef.actor.getClass.getName, actorRef.actor.asInstanceOf[Consumer].endpointUri, actorRef.uuid, true))
}

View file

@ -0,0 +1,34 @@
package se.scalablesolutions.akka.camel;
import org.apache.camel.Body;
import org.apache.camel.Header;
import se.scalablesolutions.akka.actor.annotation.consume;
/**
* @author Martin Krasser
*/
public class PojoBase {
public String m1(String b, String h) {
return "m1base: " + b + " " + h;
}
@consume("direct:m2base")
public String m2(@Body String b, @Header("test") String h) {
return "m2base: " + b + " " + h;
}
@consume("direct:m3base")
public String m3(@Body String b, @Header("test") String h) {
return "m3base: " + b + " " + h;
}
@consume("direct:m4base")
public String m4(@Body String b, @Header("test") String h) {
return "m4base: " + b + " " + h;
}
public void m5(@Body String b, @Header("test") String h) {
}
}

View file

@ -0,0 +1,23 @@
package se.scalablesolutions.akka.camel;
import org.apache.camel.Body;
import org.apache.camel.Header;
import se.scalablesolutions.akka.actor.annotation.consume;
/**
* @author Martin Krasser
*/
public class PojoImpl implements PojoIntf {
public String m1(String b, String h) {
return "m1impl: " + b + " " + h;
}
@consume("direct:m2impl")
public String m2(@Body String b, @Header("test") String h) {
return "m2impl: " + b + " " + h;
}
}

View file

@ -0,0 +1,18 @@
package se.scalablesolutions.akka.camel;
import org.apache.camel.Body;
import org.apache.camel.Header;
import se.scalablesolutions.akka.actor.annotation.consume;
/**
* @author Martin Krasser
*/
public interface PojoIntf {
public String m1(String b, String h);
@consume("direct:m2intf")
public String m2(@Body String b, @Header("test") String h);
}

View file

@ -0,0 +1,14 @@
package se.scalablesolutions.akka.camel;
import se.scalablesolutions.akka.actor.annotation.consume;
/**
* @author Martin Krasser
*/
public class PojoSingle {
@consume("direct:foo")
public void foo(String b) {
}
}

View file

@ -0,0 +1,27 @@
package se.scalablesolutions.akka.camel;
import org.apache.camel.Body;
import org.apache.camel.Header;
import se.scalablesolutions.akka.actor.annotation.consume;
public class PojoSub extends PojoBase {
@Override
@consume("direct:m1sub")
public String m1(@Body String b, @Header("test") String h) {
return "m1sub: " + b + " " + h;
}
@Override
public String m2(String b, String h) {
return "m2sub: " + b + " " + h;
}
@Override
@consume("direct:m3sub")
public String m3(@Body String b, @Header("test") String h) {
return "m3sub: " + b + " " + h;
}
}

View file

@ -1,53 +1,36 @@
package se.scalablesolutions.akka.camel.service
package se.scalablesolutions.akka.camel
import java.util.concurrent.{CountDownLatch, TimeUnit}
import org.apache.camel.builder.RouteBuilder
import org.scalatest.{GivenWhenThen, BeforeAndAfterAll, FeatureSpec}
import se.scalablesolutions.akka.actor.{Actor, ActorRegistry}
import se.scalablesolutions.akka.camel.{CamelContextManager, Message, Consumer}
import Actor._
object CamelServiceFeatureTest {
class TestConsumer(uri: String) extends Actor with Consumer {
def endpointUri = uri
protected def receive = {
case msg: Message => self.reply("received %s" format msg.body)
}
}
class TestActor extends Actor {
self.id = "custom-actor-id"
protected def receive = {
case msg: Message => self.reply("received %s" format msg.body)
}
}
class TestRoute extends RouteBuilder {
def configure {
from("direct:custom-route-test-1").to("actor:custom-actor-id")
}
}
}
import se.scalablesolutions.akka.actor.Actor._
import se.scalablesolutions.akka.actor.{ActiveObject, Actor, ActorRegistry}
class CamelServiceFeatureTest extends FeatureSpec with BeforeAndAfterAll with GivenWhenThen {
import CamelServiceFeatureTest._
var service: CamelService = CamelService.newInstance
var service: CamelService = _
override protected def beforeAll = {
ActorRegistry.shutdownAll
// create new CamelService instance
service = CamelService.newInstance
// register test consumer before starting the CamelService
actorOf(new TestConsumer("direct:publish-test-1")).start
// Consigure a custom camel route
// Configure a custom camel route
CamelContextManager.init
CamelContextManager.context.addRoutes(new TestRoute)
// set expectations for testing purposes
service.consumerPublisher.actor.asInstanceOf[ConsumerPublisher].expectPublishCount(1)
// start consumer publisher, otherwise we cannot set message
// count expectations in the next step (needed for testing only).
service.consumerPublisher.start
// set expectations on publish count
val latch = service.consumerPublisher.!![CountDownLatch](SetExpectedMessageCount(1)).get
// start the CamelService
service.load
// await publication of first test consumer
service.consumerPublisher.actor.asInstanceOf[ConsumerPublisher].awaitPublish
assert(latch.await(5000, TimeUnit.MILLISECONDS))
}
override protected def afterAll = {
@ -60,9 +43,9 @@ class CamelServiceFeatureTest extends FeatureSpec with BeforeAndAfterAll with Gi
scenario("access registered consumer actors via Camel direct-endpoints") {
given("two consumer actors registered before and after CamelService startup")
service.consumerPublisher.actor.asInstanceOf[ConsumerPublisher].expectPublishCount(1)
val latch = service.consumerPublisher.!![CountDownLatch](SetExpectedMessageCount(1)).get
actorOf(new TestConsumer("direct:publish-test-2")).start
service.consumerPublisher.actor.asInstanceOf[ConsumerPublisher].awaitPublish
assert(latch.await(5000, TimeUnit.MILLISECONDS))
when("requests are sent to these actors")
val response1 = CamelContextManager.template.requestBody("direct:publish-test-1", "msg1")
@ -81,14 +64,14 @@ class CamelServiceFeatureTest extends FeatureSpec with BeforeAndAfterAll with Gi
given("a consumer actor that has been stopped")
assert(CamelContextManager.context.hasEndpoint(endpointUri) eq null)
service.consumerPublisher.actor.asInstanceOf[ConsumerPublisher].expectPublishCount(1)
var latch = service.consumerPublisher.!![CountDownLatch](SetExpectedMessageCount(1)).get
val consumer = actorOf(new TestConsumer(endpointUri)).start
service.consumerPublisher.actor.asInstanceOf[ConsumerPublisher].awaitPublish
assert(latch.await(5000, TimeUnit.MILLISECONDS))
assert(CamelContextManager.context.hasEndpoint(endpointUri) ne null)
service.consumerPublisher.actor.asInstanceOf[ConsumerPublisher].expectUnpublishCount(1)
latch = service.consumerPublisher.!![CountDownLatch](SetExpectedMessageCount(1)).get
consumer.stop
service.consumerPublisher.actor.asInstanceOf[ConsumerPublisher].awaitUnpublish
assert(latch.await(5000, TimeUnit.MILLISECONDS))
// endpoint is still there but the route has been stopped
assert(CamelContextManager.context.hasEndpoint(endpointUri) ne null)
@ -114,4 +97,48 @@ class CamelServiceFeatureTest extends FeatureSpec with BeforeAndAfterAll with Gi
assert(response === "received msg3")
}
}
feature("Publish active object methods in the global CamelContext") {
scenario("access active object methods via Camel direct-endpoints") {
given("an active object registered after CamelService startup")
val latch = service.consumerPublisher.!![CountDownLatch](SetExpectedMessageCount(3)).get
ActiveObject.newInstance(classOf[PojoBase])
assert(latch.await(5000, TimeUnit.MILLISECONDS))
when("requests are sent to published methods")
val response1 = CamelContextManager.template.requestBodyAndHeader("direct:m2base", "x", "test", "y")
val response2 = CamelContextManager.template.requestBodyAndHeader("direct:m3base", "x", "test", "y")
val response3 = CamelContextManager.template.requestBodyAndHeader("direct:m4base", "x", "test", "y")
then("each should have returned a different response")
assert(response1 === "m2base: x y")
assert(response2 === "m3base: x y")
assert(response3 === "m4base: x y")
}
}
}
object CamelServiceFeatureTest {
class TestConsumer(uri: String) extends Actor with Consumer {
def endpointUri = uri
protected def receive = {
case msg: Message => self.reply("received %s" format msg.body)
}
}
class TestActor extends Actor {
self.id = "custom-actor-id"
protected def receive = {
case msg: Message => self.reply("received %s" format msg.body)
}
}
class TestRoute extends RouteBuilder {
def configure {
from("direct:custom-route-test-1").to("actor:custom-actor-id")
}
}
}

View file

@ -0,0 +1,46 @@
package se.scalablesolutions.akka.camel
import java.net.InetSocketAddress
import org.junit.Test
import org.scalatest.junit.JUnitSuite
import se.scalablesolutions.akka.actor.{AspectInit, ActiveObject}
import se.scalablesolutions.akka.camel.ConsumerMethodRegistered._
class ConsumerMethodRegisteredTest extends JUnitSuite {
val remoteAddress = new InetSocketAddress("localhost", 8888);
val remoteAspectInit = AspectInit(classOf[String], null, Some(remoteAddress), 1000)
val localAspectInit = AspectInit(classOf[String], null, None, 1000)
val activePojoBase = ActiveObject.newInstance(classOf[PojoBase])
val activePojoSub = ActiveObject.newInstance(classOf[PojoSub])
val activePojoIntf = ActiveObject.newInstance(classOf[PojoIntf], new PojoImpl)
val ascendingMethodName = (r1: ConsumerMethodRegistered, r2: ConsumerMethodRegistered) =>
r1.method.getName < r2.method.getName
@Test def shouldSelectPojoBaseMethods234 = {
val registered = forConsumer(activePojoBase, localAspectInit).sortWith(ascendingMethodName)
assert(registered.size === 3)
assert(registered.map(_.method.getName) === List("m2", "m3", "m4"))
}
@Test def shouldSelectPojoSubMethods134 = {
val registered = forConsumer(activePojoSub, localAspectInit).sortWith(ascendingMethodName)
assert(registered.size === 3)
assert(registered.map(_.method.getName) === List("m1", "m3", "m4"))
}
@Test def shouldSelectPojoIntfMethod2 = {
val registered = forConsumer(activePojoIntf, localAspectInit)
assert(registered.size === 1)
assert(registered(0).method.getName === "m2")
}
@Test def shouldIgnoreRemoteProxies = {
val registered = forConsumer(activePojoBase, remoteAspectInit)
assert(registered.size === 0)
}
}

View file

@ -1,4 +1,4 @@
package se.scalablesolutions.akka.camel.service
package se.scalablesolutions.akka.camel
import org.junit.Test
import org.scalatest.junit.JUnitSuite
@ -6,7 +6,6 @@ import org.scalatest.junit.JUnitSuite
import se.scalablesolutions.akka.actor.Actor
import se.scalablesolutions.akka.actor.Actor._
import se.scalablesolutions.akka.actor.annotation.consume
import se.scalablesolutions.akka.camel.Consumer
object ConsumerRegisteredTest {
@consume("mock:test1")
@ -29,20 +28,22 @@ class ConsumerRegisteredTest extends JUnitSuite {
import ConsumerRegisteredTest._
@Test def shouldCreatePublishRequestList = {
val as = List(actorOf[ConsumeAnnotatedActor])
val a = actorOf[ConsumeAnnotatedActor]
val as = List(a)
val events = for (a <- as; e <- ConsumerRegistered.forConsumer(a)) yield e
assert(events === List(ConsumerRegistered(classOf[ConsumeAnnotatedActor].getName, "mock:test1", "test", false)))
assert(events === List(ConsumerRegistered(a, "mock:test1", "test", false)))
}
@Test def shouldCreateSomePublishRequestWithActorId = {
val event = ConsumerRegistered.forConsumer(actorOf[ConsumeAnnotatedActor])
assert(event === Some(ConsumerRegistered(classOf[ConsumeAnnotatedActor].getName, "mock:test1", "test", false)))
val a = actorOf[ConsumeAnnotatedActor]
val event = ConsumerRegistered.forConsumer(a)
assert(event === Some(ConsumerRegistered(a, "mock:test1", "test", false)))
}
@Test def shouldCreateSomePublishRequestWithActorUuid = {
val ca = actorOf[ConsumerActor]
val event = ConsumerRegistered.forConsumer(ca)
assert(event === Some(ConsumerRegistered(ca.actor.getClass.getName, "mock:test2", ca.uuid, true)))
assert(event === Some(ConsumerRegistered(ca, "mock:test2", ca.uuid, true)))
}
@Test def shouldCreateNone = {

View file

@ -0,0 +1,69 @@
package se.scalablesolutions.akka.camel
import java.util.concurrent.{CountDownLatch, TimeUnit}
import org.junit.{Before, After, Test}
import org.scalatest.junit.JUnitSuite
import se.scalablesolutions.akka.actor._
import se.scalablesolutions.akka.actor.Actor._
import se.scalablesolutions.akka.camel.support.{SetExpectedMessageCount => SetExpectedTestMessageCount, _}
class PublishRequestorTest extends JUnitSuite {
import PublishRequestorTest._
var publisher: ActorRef = _
var requestor: ActorRef = _
var consumer: ActorRef = _
@Before def setUp = {
publisher = actorOf[PublisherMock].start
requestor = actorOf[PublishRequestor].start
requestor ! PublishRequestorInit(publisher)
consumer = actorOf(new Actor with Consumer {
def endpointUri = "mock:test"
protected def receive = null
}).start
}
@After def tearDown = {
ActorRegistry.shutdownAll
}
@Test def shouldReceiveConsumerMethodRegisteredEvent = {
val obj = ActiveObject.newInstance(classOf[PojoSingle])
val init = AspectInit(classOf[PojoSingle], null, None, 1000)
val latch = publisher.!![CountDownLatch](SetExpectedTestMessageCount(1)).get
requestor ! AspectInitRegistered(obj, init)
assert(latch.await(5000, TimeUnit.MILLISECONDS))
val event = (publisher !! GetRetainedMessage).get.asInstanceOf[ConsumerMethodRegistered]
assert(event.init === init)
assert(event.uri === "direct:foo")
assert(event.activeObject === obj)
assert(event.method.getName === "foo")
}
@Test def shouldReceiveConsumerRegisteredEvent = {
val latch = publisher.!![CountDownLatch](SetExpectedTestMessageCount(1)).get
requestor ! ActorRegistered(consumer)
assert(latch.await(5000, TimeUnit.MILLISECONDS))
assert((publisher !! GetRetainedMessage) ===
Some(ConsumerRegistered(consumer, "mock:test", consumer.uuid, true)))
}
@Test def shouldReceiveConsumerUnregisteredEvent = {
val latch = publisher.!![CountDownLatch](SetExpectedTestMessageCount(1)).get
requestor ! ActorUnregistered(consumer)
assert(latch.await(5000, TimeUnit.MILLISECONDS))
assert((publisher !! GetRetainedMessage) ===
Some(ConsumerUnregistered(consumer, "mock:test", consumer.uuid, true)))
}
}
object PublishRequestorTest {
class PublisherMock extends TestActor with Retain with Countdown {
def handler = retain andThen countdown
}
}

View file

@ -0,0 +1,74 @@
package se.scalablesolutions.akka.camel.component
import org.scalatest.{BeforeAndAfterEach, BeforeAndAfterAll, FeatureSpec}
import se.scalablesolutions.akka.actor.Actor._
import se.scalablesolutions.akka.camel._
import se.scalablesolutions.akka.actor.{ActorRegistry, ActiveObject}
import org.apache.camel.{ExchangePattern, Exchange, Processor}
/**
* @author Martin Krasser
*/
class ActiveObjectComponentFeatureTest extends FeatureSpec with BeforeAndAfterAll with BeforeAndAfterEach {
override protected def beforeAll = {
val activePojoBase = ActiveObject.newInstance(classOf[PojoBase])
val activePojoIntf = ActiveObject.newInstance(classOf[PojoIntf], new PojoImpl)
CamelContextManager.init
CamelContextManager.start
CamelContextManager.activeObjectRegistry.put("base", activePojoBase)
CamelContextManager.activeObjectRegistry.put("intf", activePojoIntf)
}
override protected def afterAll = {
CamelContextManager.stop
ActorRegistry.shutdownAll
}
feature("Communicate with an active object from a Camel application using active object endpoint URIs") {
import ActiveObjectComponent.InternalSchema
import CamelContextManager.template
import ExchangePattern._
scenario("in-out exchange with proxy created from interface and method returning String") {
val result = template.requestBodyAndHeader("%s:intf?method=m2" format InternalSchema, "x", "test", "y")
assert(result === "m2impl: x y")
}
scenario("in-out exchange with proxy created from class and method returning String") {
val result = template.requestBodyAndHeader("%s:base?method=m2" format InternalSchema, "x", "test", "y")
assert(result === "m2base: x y")
}
scenario("in-out exchange with proxy created from class and method returning void") {
val result = template.requestBodyAndHeader("%s:base?method=m5" format InternalSchema, "x", "test", "y")
assert(result === "x") // returns initial body
}
scenario("in-only exchange with proxy created from class and method returning String") {
val result = template.send("%s:base?method=m2" format InternalSchema, InOnly, new Processor {
def process(exchange: Exchange) = {
exchange.getIn.setBody("x")
exchange.getIn.setHeader("test", "y")
}
});
assert(result.getPattern === InOnly)
assert(result.getIn.getBody === "m2base: x y")
assert(result.getOut.getBody === null)
}
scenario("in-only exchange with proxy created from class and method returning void") {
val result = template.send("%s:base?method=m5" format InternalSchema, InOnly, new Processor {
def process(exchange: Exchange) = {
exchange.getIn.setBody("x")
exchange.getIn.setHeader("test", "y")
}
});
assert(result.getPattern === InOnly)
assert(result.getIn.getBody === "x")
assert(result.getOut.getBody === null)
}
}
}

View file

@ -1,11 +1,14 @@
package se.scalablesolutions.akka.camel.component
import java.util.concurrent.{TimeUnit, CountDownLatch}
import org.apache.camel.RuntimeCamelException
import org.scalatest.{BeforeAndAfterEach, BeforeAndAfterAll, FeatureSpec}
import se.scalablesolutions.akka.actor.Actor._
import se.scalablesolutions.akka.actor.{ActorRegistry, Actor}
import se.scalablesolutions.akka.camel.support.{Respond, Countdown, Tester, Retain}
import se.scalablesolutions.akka.camel.{Message, CamelContextManager}
import se.scalablesolutions.akka.camel.support._
class ActorComponentFeatureTest extends FeatureSpec with BeforeAndAfterAll with BeforeAndAfterEach {
override protected def beforeAll = {
@ -20,41 +23,37 @@ class ActorComponentFeatureTest extends FeatureSpec with BeforeAndAfterAll with
feature("Communicate with an actor from a Camel application using actor endpoint URIs") {
import CamelContextManager.template
import Actor._
scenario("one-way communication using actor id") {
val actor = actorOf(new Tester with Retain with Countdown[Message])
actor.start
val actor = actorOf[Tester1].start
val latch = actor.!![CountDownLatch](SetExpectedMessageCount(1)).get
template.sendBody("actor:%s" format actor.id, "Martin")
assert(actor.actor.asInstanceOf[Countdown[Message]].waitFor)
assert(actor.actor.asInstanceOf[Retain].body === "Martin")
assert(latch.await(5000, TimeUnit.MILLISECONDS))
val reply = (actor !! GetRetainedMessage).get.asInstanceOf[Message]
assert(reply.body === "Martin")
}
scenario("one-way communication using actor uuid") {
val actor = actorOf(new Tester with Retain with Countdown[Message])
actor.start
val actor = actorOf[Tester1].start
val latch = actor.!![CountDownLatch](SetExpectedMessageCount(1)).get
template.sendBody("actor:uuid:%s" format actor.uuid, "Martin")
assert(actor.actor.asInstanceOf[Countdown[Message]].waitFor)
assert(actor.actor.asInstanceOf[Retain].body === "Martin")
assert(latch.await(5000, TimeUnit.MILLISECONDS))
val reply = (actor !! GetRetainedMessage).get.asInstanceOf[Message]
assert(reply.body === "Martin")
}
scenario("two-way communication using actor id") {
val actor = actorOf(new Tester with Respond)
actor.start
val actor = actorOf[Tester2].start
assert(template.requestBody("actor:%s" format actor.id, "Martin") === "Hello Martin")
}
scenario("two-way communication using actor uuid") {
val actor = actorOf(new Tester with Respond)
actor.start
val actor = actorOf[Tester2].start
assert(template.requestBody("actor:uuid:%s" format actor.uuid, "Martin") === "Hello Martin")
}
scenario("two-way communication with timeout") {
val actor = actorOf(new Tester {
self.timeout = 1
})
actor.start
val actor = actorOf[Tester3].start
intercept[RuntimeCamelException] {
template.requestBody("actor:uuid:%s" format actor.uuid, "Martin")
}

View file

@ -2,41 +2,41 @@ package se.scalablesolutions.akka.camel.component
import ActorComponentTest._
import java.util.concurrent.TimeoutException
import java.util.concurrent.{CountDownLatch, TimeoutException, TimeUnit}
import org.apache.camel.ExchangePattern
import org.junit.{After, Test}
import org.scalatest.junit.JUnitSuite
import org.scalatest.BeforeAndAfterAll
import se.scalablesolutions.akka.actor.ActorRegistry
import se.scalablesolutions.akka.actor.Actor._
import se.scalablesolutions.akka.camel.support.{Countdown, Retain, Tester, Respond}
import se.scalablesolutions.akka.actor.ActorRegistry
import se.scalablesolutions.akka.camel.{Failure, Message}
import se.scalablesolutions.akka.camel.support._
class ActorProducerTest extends JUnitSuite with BeforeAndAfterAll {
@After def tearDown = ActorRegistry.shutdownAll
@Test def shouldSendMessageToActor = {
val actor = actorOf(new Tester with Retain with Countdown[Message])
val actor = actorOf[Tester1].start
val latch = actor.!![CountDownLatch](SetExpectedMessageCount(1)).get
val endpoint = mockEndpoint("actor:uuid:%s" format actor.uuid)
val exchange = endpoint.createExchange(ExchangePattern.InOnly)
actor.start
exchange.getIn.setBody("Martin")
exchange.getIn.setHeader("k1", "v1")
endpoint.createProducer.process(exchange)
actor.actor.asInstanceOf[Countdown[Message]].waitFor
assert(actor.actor.asInstanceOf[Retain].body === "Martin")
assert(actor.actor.asInstanceOf[Retain].headers === Map(Message.MessageExchangeId -> exchange.getExchangeId, "k1" -> "v1"))
assert(latch.await(5000, TimeUnit.MILLISECONDS))
val reply = (actor !! GetRetainedMessage).get.asInstanceOf[Message]
assert(reply.body === "Martin")
assert(reply.headers === Map(Message.MessageExchangeId -> exchange.getExchangeId, "k1" -> "v1"))
}
@Test def shouldSendMessageToActorAndReceiveResponse = {
val actor = actorOf(new Tester with Respond {
val actor = actorOf(new Tester2 {
override def response(msg: Message) = Message(super.response(msg), Map("k2" -> "v2"))
})
}).start
val endpoint = mockEndpoint("actor:uuid:%s" format actor.uuid)
val exchange = endpoint.createExchange(ExchangePattern.InOut)
actor.start
exchange.getIn.setBody("Martin")
exchange.getIn.setHeader("k1", "v1")
endpoint.createProducer.process(exchange)
@ -45,12 +45,11 @@ class ActorProducerTest extends JUnitSuite with BeforeAndAfterAll {
}
@Test def shouldSendMessageToActorAndReceiveFailure = {
val actor = actorOf(new Tester with Respond {
val actor = actorOf(new Tester2 {
override def response(msg: Message) = Failure(new Exception("testmsg"), Map("k3" -> "v3"))
})
}).start
val endpoint = mockEndpoint("actor:uuid:%s" format actor.uuid)
val exchange = endpoint.createExchange(ExchangePattern.InOut)
actor.start
exchange.getIn.setBody("Martin")
exchange.getIn.setHeader("k1", "v1")
endpoint.createProducer.process(exchange)
@ -60,12 +59,9 @@ class ActorProducerTest extends JUnitSuite with BeforeAndAfterAll {
}
@Test def shouldSendMessageToActorAndTimeout: Unit = {
val actor = actorOf(new Tester {
self.timeout = 1
})
val actor = actorOf[Tester3].start
val endpoint = mockEndpoint("actor:uuid:%s" format actor.uuid)
val exchange = endpoint.createExchange(ExchangePattern.InOut)
actor.start
exchange.getIn.setBody("Martin")
intercept[TimeoutException] {
endpoint.createProducer.process(exchange)

View file

@ -1,55 +0,0 @@
package se.scalablesolutions.akka.camel.service
import _root_.org.junit.{Before, After, Test}
import org.scalatest.junit.JUnitSuite
import se.scalablesolutions.akka.camel.Consumer
import se.scalablesolutions.akka.camel.support.{Receive, Countdown}
import se.scalablesolutions.akka.actor.{Actor, ActorRef, ActorRegistry, ActorRegistered, ActorUnregistered}
import Actor._
object PublishRequestorTest {
class PublisherMock extends Actor with Receive[ConsumerEvent] {
var received: ConsumerEvent = _
protected def receive = {
case msg: ConsumerRegistered => onMessage(msg)
case msg: ConsumerUnregistered => onMessage(msg)
}
def onMessage(msg: ConsumerEvent) = received = msg
}
}
class PublishRequestorTest extends JUnitSuite {
import PublishRequestorTest._
var publisher: ActorRef = _
var requestor: ActorRef = _
var consumer: ActorRef = _
@Before def setUp = {
publisher = actorOf(new PublisherMock with Countdown[ConsumerEvent]).start
requestor = actorOf(new PublishRequestor(publisher)).start
consumer = actorOf(new Actor with Consumer {
def endpointUri = "mock:test"
protected def receive = null
}).start
}
@After def tearDown = {
ActorRegistry.shutdownAll
}
@Test def shouldReceiveConsumerRegisteredEvent = {
requestor.!(ActorRegistered(consumer))(None)
publisher.actor.asInstanceOf[Countdown[ConsumerEvent]].waitFor
assert(publisher.actor.asInstanceOf[PublisherMock].received ===
ConsumerRegistered(consumer.actor.getClass.getName, "mock:test", consumer.uuid, true))
}
@Test def shouldReceiveConsumerUnregisteredEvent = {
requestor.!(ActorUnregistered(consumer))(None)
publisher.actor.asInstanceOf[Countdown[ConsumerRegistered]].waitFor
assert(publisher.actor.asInstanceOf[PublisherMock].received ===
ConsumerUnregistered(consumer.actor.getClass.getName, "mock:test", consumer.uuid, true))
}
}

View file

@ -5,45 +5,71 @@ import java.util.concurrent.{TimeUnit, CountDownLatch}
import se.scalablesolutions.akka.camel.Message
import se.scalablesolutions.akka.actor.Actor
trait Receive[T] {
def onMessage(msg: T): Unit
import TestSupport._
object TestSupport {
type Handler = PartialFunction[Any, Any]
}
trait Respond extends Receive[Message] { this: Actor =>
abstract override def onMessage(msg: Message): Unit = {
super.onMessage(msg)
this.self.reply(response(msg))
trait TestActor extends Actor {
def receive = {
case msg => {
handler(msg)
}
}
def handler: Handler
}
class Tester1 extends TestActor with Retain with Countdown {
def handler = retain andThen countdown
}
class Tester2 extends TestActor with Respond {
def handler = respond
}
class Tester3 extends TestActor with Noop {
self.timeout = 1
def handler = noop
}
trait Countdown { this: Actor =>
var latch: CountDownLatch = new CountDownLatch(0)
def countdown: Handler = {
case SetExpectedMessageCount(num) => {
latch = new CountDownLatch(num)
self.reply(latch)
}
case msg => latch.countDown
}
}
trait Respond { this: Actor =>
def respond: Handler = {
case msg: Message => self.reply(response(msg))
}
def response(msg: Message): Any = "Hello %s" format msg.body
}
trait Retain extends Receive[Message] {
var body: Any = _
var headers = Map.empty[String, Any]
abstract override def onMessage(msg: Message): Unit = {
super.onMessage(msg)
body = msg.body
headers = msg.headers
trait Retain { this: Actor =>
var message: Any = _
def retain: Handler = {
case GetRetainedMessage => self.reply(message)
case msg => {
message = msg
msg
}
}
}
trait Countdown[T] extends Receive[T] {
val count = 1
val duration = 5000
val latch = new CountDownLatch(count)
def waitFor = latch.await(duration, TimeUnit.MILLISECONDS)
def countDown = latch.countDown
abstract override def onMessage(msg: T) = {
super.onMessage(msg)
countDown
trait Noop { this: Actor =>
def noop: Handler = {
case msg => msg
}
}
class Tester extends Actor with Receive[Message] {
def receive = {
case msg: Message => onMessage(msg)
}
def onMessage(msg: Message): Unit = {}
}
case class SetExpectedMessageCount(num: Int)
case class GetRetainedMessage()

View file

@ -10,7 +10,7 @@ import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@Target({ElementType.TYPE, ElementType.METHOD})
public @interface consume {
public abstract String value();

View file

@ -476,7 +476,7 @@ object ActiveObject extends Logging {
}
private[akka] object AspectInitRegistry {
private[akka] object AspectInitRegistry extends ListenerManagement {
private val initializations = new java.util.concurrent.ConcurrentHashMap[AnyRef, AspectInit]
def initFor(target: AnyRef) = {
@ -485,9 +485,16 @@ private[akka] object AspectInitRegistry {
init
}
def register(target: AnyRef, init: AspectInit) = initializations.put(target, init)
def register(target: AnyRef, init: AspectInit) = {
val res = initializations.put(target, init)
foreachListener(_ ! AspectInitRegistered(target, init))
res
}
}
private[akka] sealed trait AspectInitRegistryEvent
private[akka] case class AspectInitRegistered(proxy: AnyRef, init: AspectInit) extends AspectInitRegistryEvent
private[akka] sealed case class AspectInit(
val target: Class[_],
val actorRef: ActorRef,

View file

@ -4,11 +4,10 @@
package se.scalablesolutions.akka.actor
import se.scalablesolutions.akka.util.Logging
import scala.collection.mutable.ListBuffer
import scala.reflect.Manifest
import java.util.concurrent.{CopyOnWriteArrayList, ConcurrentHashMap}
import se.scalablesolutions.akka.util.ListenerManagement
sealed trait ActorRegistryEvent
case class ActorRegistered(actor: ActorRef) extends ActorRegistryEvent
@ -26,11 +25,10 @@ case class ActorUnregistered(actor: ActorRef) extends ActorRegistryEvent
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object ActorRegistry extends Logging {
object ActorRegistry extends ListenerManagement {
private val actorsByUUID = new ConcurrentHashMap[String, ActorRef]
private val actorsById = new ConcurrentHashMap[String, List[ActorRef]]
private val actorsByClassName = new ConcurrentHashMap[String, List[ActorRef]]
private val registrationListeners = new CopyOnWriteArrayList[ActorRef]
/**
* Returns all actors in the system.
@ -141,29 +139,4 @@ object ActorRegistry extends Logging {
actorsByClassName.clear
log.info("All actors have been shut down and unregistered from ActorRegistry")
}
/**
* Adds the registration <code>listener</code> this this registry's listener list.
*/
def addRegistrationListener(listener: ActorRef) = {
listener.start
registrationListeners.add(listener)
}
/**
* Removes the registration <code>listener</code> this this registry's listener list.
*/
def removeRegistrationListener(listener: ActorRef) = {
listener.stop
registrationListeners.remove(listener)
}
private def foreachListener(f: (ActorRef) => Unit) {
val iterator = registrationListeners.iterator
while (iterator.hasNext) {
val listener = iterator.next
if (listener.isRunning) f(listener)
else log.warning("Can't send ActorRegistryEvent to [%s] since it is not running.", listener)
}
}
}

View file

@ -0,0 +1,49 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.util
import java.util.concurrent.CopyOnWriteArrayList
import se.scalablesolutions.akka.actor.ActorRef
/**
* A manager for listener actors. Intended for mixin by observables.
*
* @author Martin Krasser
*/
trait ListenerManagement extends Logging {
private val listeners = new CopyOnWriteArrayList[ActorRef]
/**
* Adds the <code>listener</code> this this registry's listener list.
* The <code>listener</code> is started by this method.
*/
def addListener(listener: ActorRef) = {
listener.start
listeners.add(listener)
}
/**
* Removes the <code>listener</code> this this registry's listener list.
* The <code>listener</code> is stopped by this method.
*/
def removeListener(listener: ActorRef) = {
listener.stop
listeners.remove(listener)
}
/**
* Execute <code>f</code> with each listener as argument.
*/
protected def foreachListener(f: (ActorRef) => Unit) {
val iterator = listeners.iterator
while (iterator.hasNext) {
val listener = iterator.next
if (listener.isRunning) f(listener)
else log.warning("Can't notify [%s] since it is not running.", listener)
}
}
}

View file

@ -6,7 +6,7 @@ package se.scalablesolutions.akka.servlet
import se.scalablesolutions.akka.remote.BootableRemoteActorService
import se.scalablesolutions.akka.actor.BootableActorLoaderService
import se.scalablesolutions.akka.camel.service.CamelService
import se.scalablesolutions.akka.camel.CamelService
import se.scalablesolutions.akka.config.Config
import se.scalablesolutions.akka.util.{Logging, Bootable}

View file

@ -7,7 +7,7 @@ package se.scalablesolutions.akka.kernel
import se.scalablesolutions.akka.servlet.AkkaLoader
import se.scalablesolutions.akka.remote.BootableRemoteActorService
import se.scalablesolutions.akka.actor.BootableActorLoaderService
import se.scalablesolutions.akka.camel.service.CamelService
import se.scalablesolutions.akka.camel.CamelService
import se.scalablesolutions.akka.config.Config
object Main {

View file

@ -0,0 +1,24 @@
package sample.camel;
import org.apache.camel.Body;
import org.apache.camel.Header;
import se.scalablesolutions.akka.actor.annotation.consume;
/**
* @author Martin Krasser
*/
public class Consumer10 {
@consume("file:data/input2")
public void foo(String body) {
System.out.println("Received message:");
System.out.println(body);
}
@consume("jetty:http://0.0.0.0:8877/camel/active")
public String bar(@Body String body, @Header("name") String header) {
return String.format("body=%s header=%s", body, header);
}
}

View file

@ -0,0 +1,18 @@
package sample.camel;
import org.apache.camel.Body;
import org.apache.camel.Header;
import se.scalablesolutions.akka.actor.annotation.consume;
/**
* @author Martin Krasser
*/
public class RemoteActiveObject1 {
@consume("jetty:http://localhost:6644/remote-active-object-1")
public String foo(@Body String body, @Header("name") String header) {
return String.format("remote1: body=%s header=%s", body, header);
}
}

View file

@ -0,0 +1,17 @@
package sample.camel;
import org.apache.camel.Body;
import org.apache.camel.Header;
import se.scalablesolutions.akka.actor.annotation.consume;
/**
* @author Martin Krasser
*/
public class RemoteActiveObject2 {
@consume("jetty:http://localhost:6644/remote-active-object-2")
public String foo(@Body String body, @Header("name") String header) {
return String.format("remote2: body=%s header=%s", body, header);
}
}

View file

@ -9,10 +9,10 @@ import se.scalablesolutions.akka.util.Logging
* Client-initiated remote actor.
*/
class RemoteActor1 extends RemoteActor("localhost", 7777) with Consumer {
def endpointUri = "jetty:http://localhost:6644/remote1"
def endpointUri = "jetty:http://localhost:6644/remote-actor-1"
protected def receive = {
case msg: Message => self.reply(Message("hello %s" format msg.body, Map("sender" -> "remote1")))
case msg: Message => self.reply(Message("hello %s" format msg.bodyAs[String], Map("sender" -> "remote1")))
}
}
@ -20,10 +20,10 @@ class RemoteActor1 extends RemoteActor("localhost", 7777) with Consumer {
* Server-initiated remote actor.
*/
class RemoteActor2 extends Actor with Consumer {
def endpointUri = "jetty:http://localhost:6644/remote2"
def endpointUri = "jetty:http://localhost:6644/remote-actor-2"
protected def receive = {
case msg: Message => self.reply(Message("hello %s" format msg.body, Map("sender" -> "remote2")))
case msg: Message => self.reply(Message("hello %s" format msg.bodyAs[String], Map("sender" -> "remote2")))
}
}
@ -37,7 +37,7 @@ class Producer1 extends Actor with Producer {
}
class Consumer1 extends Actor with Consumer with Logging {
def endpointUri = "file:data/input"
def endpointUri = "file:data/input1"
def receive = {
case msg: Message => log.info("received %s" format msg.bodyAs[String])

View file

@ -1,9 +1,9 @@
package sample.camel
import se.scalablesolutions.akka.actor.{Actor, ActorRef}
import se.scalablesolutions.akka.actor.Actor._
import se.scalablesolutions.akka.camel.Message
import se.scalablesolutions.akka.remote.RemoteClient
import se.scalablesolutions.akka.actor.{ActiveObject, Actor, ActorRef}
/**
* @author Martin Krasser
@ -15,15 +15,19 @@ object Application1 {
//
def main(args: Array[String]) {
implicit val sender: Option[ActorRef] = None
val actor1 = actorOf[RemoteActor1]
val actor2 = RemoteClient.actorFor("remote2", "localhost", 7777)
val actobj1 = ActiveObject.newRemoteInstance(classOf[RemoteActiveObject1], "localhost", 7777)
//val actobj2 = TODO: create reference to server-managed active object (RemoteActiveObject2)
actor1.start
println(actor1 !! Message("actor1"))
println(actor2 !! Message("actor2"))
println(actor1 !! Message("actor1")) // activates and publishes actor remotely
println(actor2 !! Message("actor2")) // actor already activated and published remotely
println(actobj1.foo("x", "y")) // activates and publishes active object methods remotely
// ...
}
}

View file

@ -1,6 +1,6 @@
package sample.camel
import se.scalablesolutions.akka.camel.service.CamelService
import se.scalablesolutions.akka.camel.CamelService
import se.scalablesolutions.akka.remote.RemoteNode
import se.scalablesolutions.akka.actor.Actor._

View file

@ -6,8 +6,8 @@ import org.apache.camel.impl.DefaultCamelContext
import org.apache.camel.spring.spi.ApplicationContextRegistry
import org.springframework.context.support.ClassPathXmlApplicationContext
import se.scalablesolutions.akka.actor.SupervisorFactory
import se.scalablesolutions.akka.actor.Actor._
import se.scalablesolutions.akka.actor.{ActiveObject, SupervisorFactory}
import se.scalablesolutions.akka.camel.CamelContextManager
import se.scalablesolutions.akka.config.ScalaConfig._
@ -62,6 +62,10 @@ class Boot {
actorOf[Consumer4].start // POSTing "stop" to http://0.0.0.0:8877/camel/stop stops and unpublishes this actor
actorOf[Consumer5].start // POSTing any msg to http://0.0.0.0:8877/camel/start starts and published Consumer4 again.
// Active object example
ActiveObject.newInstance(classOf[Consumer10])
}
class CustomRouteBuilder extends RouteBuilder {