diff --git a/akka-camel/src/main/resources/META-INF/services/org/apache/camel/component/active-object b/akka-camel/src/main/resources/META-INF/services/org/apache/camel/component/active-object new file mode 100644 index 0000000000..5dd88a0671 --- /dev/null +++ b/akka-camel/src/main/resources/META-INF/services/org/apache/camel/component/active-object @@ -0,0 +1 @@ +class=se.scalablesolutions.akka.camel.component.ActiveObjectComponent \ No newline at end of file diff --git a/akka-camel/src/main/scala/CamelContextLifecycle.scala b/akka-camel/src/main/scala/CamelContextLifecycle.scala index 9aee8f5a1d..66646eed57 100644 --- a/akka-camel/src/main/scala/CamelContextLifecycle.scala +++ b/akka-camel/src/main/scala/CamelContextLifecycle.scala @@ -4,9 +4,12 @@ package se.scalablesolutions.akka.camel +import java.util.Map + import org.apache.camel.{ProducerTemplate, CamelContext} import org.apache.camel.impl.DefaultCamelContext +import se.scalablesolutions.akka.camel.component.ActiveObjectComponent import se.scalablesolutions.akka.util.Logging /** @@ -26,7 +29,18 @@ trait CamelContextLifecycle extends Logging { private var _started = false /** - * Returns the managed CamelContext. + * Camel component for accessing active objects. + */ + private[camel] var activeObjectComponent: ActiveObjectComponent = _ + + /** + * Registry in which active objects are TEMPORARILY registered during + * creation of Camel routes to active objects. + */ + private[camel] var activeObjectRegistry: Map[String, AnyRef] = _ + + /** + * Returns the managed CamelContext. */ protected def context: CamelContext = _context @@ -78,10 +92,16 @@ trait CamelContextLifecycle extends Logging { * Initializes this lifecycle object with the given CamelContext. For the passed * CamelContext stream-caching is enabled. If applications want to disable stream- * caching they can do so after this method returned and prior to calling start. + * This method also registers a new + * {@link se.scalablesolutions.akka.camel.component.ActiveObjectComponent} at + * context under a name defined by ActiveObjectComponent.InternalSchema. */ def init(context: CamelContext) { + this.activeObjectComponent = new ActiveObjectComponent + this.activeObjectRegistry = activeObjectComponent.activeObjectRegistry this.context = context this.context.setStreamCaching(true) + this.context.addComponent(ActiveObjectComponent.InternalSchema, activeObjectComponent) this.template = context.createProducerTemplate _initialized = true log.info("Camel context initialized") @@ -90,7 +110,7 @@ trait CamelContextLifecycle extends Logging { /** * Makes a global CamelContext and ProducerTemplate accessible to applications. The lifecycle - * of these objects is managed by se.scalablesolutions.akka.camel.service.CamelService. + * of these objects is managed by se.scalablesolutions.akka.camel.CamelService. */ object CamelContextManager extends CamelContextLifecycle { override def context: CamelContext = super.context diff --git a/akka-camel/src/main/scala/CamelService.scala b/akka-camel/src/main/scala/CamelService.scala new file mode 100644 index 0000000000..0e7fac4c9f --- /dev/null +++ b/akka-camel/src/main/scala/CamelService.scala @@ -0,0 +1,100 @@ +/** + * Copyright (C) 2009-2010 Scalable Solutions AB + */ + +package se.scalablesolutions.akka.camel + +import se.scalablesolutions.akka.actor.Actor._ +import se.scalablesolutions.akka.actor.{AspectInitRegistry, ActorRegistry} +import se.scalablesolutions.akka.util.{Bootable, Logging} + +/** + * Used by applications (and the Kernel) to publish consumer actors and active objects via + * Camel endpoints and to manage the life cycle of a a global CamelContext which can be + * accessed via se.scalablesolutions.akka.camel.CamelContextManager.context. + * + * @author Martin Krasser + */ +trait CamelService extends Bootable with Logging { + + import CamelContextManager._ + + private[camel] val consumerPublisher = actorOf[ConsumerPublisher] + private[camel] val publishRequestor = actorOf[PublishRequestor] + + // add listener for actor registration events + ActorRegistry.addListener(publishRequestor) + + // add listener for AspectInit registration events + AspectInitRegistry.addListener(publishRequestor) + + /** + * Starts the CamelService. Any started actor that is a consumer actor will be (asynchronously) + * published as Camel endpoint. Consumer actors that are started after this method returned will + * be published as well. Actor publishing is done asynchronously. A started (loaded) CamelService + * also publishes @consume annotated methods of active objects that have been created + * with ActiveObject.newInstance(..) (and ActiveObject.newInstance(..) + * on a remote node). + */ + abstract override def onLoad = { + super.onLoad + + // Only init and start if not already done by application + if (!initialized) init + if (!started) start + + // start actor that exposes consumer actors and active objects via Camel endpoints + consumerPublisher.start + + // init publishRequestor so that buffered and future events are delivered to consumerPublisher + publishRequestor ! PublishRequestorInit(consumerPublisher) + } + + /** + * Stops the CamelService. + */ + abstract override def onUnload = { + ActorRegistry.removeListener(publishRequestor) + AspectInitRegistry.removeListener(publishRequestor) + consumerPublisher.stop + stop + super.onUnload + } + + /** + * Starts the CamelService. + * + * @see onLoad + */ + def load = onLoad + + /** + * Stops the CamelService. + * + * @see onUnload + */ + def unload = onUnload +} + +/** + * CamelService companion object used by standalone applications to create their own + * CamelService instance. + * + * @author Martin Krasser + */ +object CamelService { + + /** + * Creates a new CamelService instance. + */ + def newInstance: CamelService = new DefaultCamelService +} + +/** + * Default CamelService implementation to be created in Java applications with + *
+ * CamelService service = new DefaultCamelService()
+ * 
+ */ +class DefaultCamelService extends CamelService { +} \ No newline at end of file diff --git a/akka-camel/src/main/scala/ConsumerPublisher.scala b/akka-camel/src/main/scala/ConsumerPublisher.scala new file mode 100644 index 0000000000..9df41a7d92 --- /dev/null +++ b/akka-camel/src/main/scala/ConsumerPublisher.scala @@ -0,0 +1,322 @@ +/** + * Copyright (C) 2009-2010 Scalable Solutions AB + */ +package se.scalablesolutions.akka.camel + +import collection.mutable.ListBuffer + +import java.io.InputStream +import java.lang.reflect.Method +import java.util.concurrent.CountDownLatch + +import org.apache.camel.builder.RouteBuilder + +import se.scalablesolutions.akka.actor._ +import se.scalablesolutions.akka.actor.annotation.consume +import se.scalablesolutions.akka.camel.component.ActiveObjectComponent +import se.scalablesolutions.akka.util.Logging + +/** + * @author Martin Krasser + */ +private[camel] object ConsumerPublisher extends Logging { + /** + * Creates a route to the registered consumer actor. + */ + def handleConsumerRegistered(event: ConsumerRegistered) { + CamelContextManager.context.addRoutes(new ConsumerActorRoute(event.uri, event.id, event.uuid)) + log.info("published actor %s at endpoint %s" format (event.actorRef, event.uri)) + } + + /** + * Stops route to the already un-registered consumer actor. + */ + def handleConsumerUnregistered(event: ConsumerUnregistered) { + CamelContextManager.context.stopRoute(event.id) + log.info("unpublished actor %s from endpoint %s" format (event.actorRef, event.uri)) + } + + /** + * Creates a route to an active object method. + */ + def handleConsumerMethodRegistered(event: ConsumerMethodRegistered) { + val targetMethod = event.method.getName + val objectId = "%s_%s" format (event.init.actorRef.uuid, targetMethod) + + CamelContextManager.activeObjectRegistry.put(objectId, event.activeObject) + CamelContextManager.context.addRoutes(new ConsumerMethodRoute(event.uri, objectId, targetMethod)) + log.info("published method %s of %s at endpoint %s" format (targetMethod, event.activeObject, event.uri)) + } +} + +/** + * Actor that publishes consumer actors and active object methods at Camel endpoints. + * The Camel context used for publishing is CamelContextManager.context. This actor + * accepts messages of type + * se.scalablesolutions.akka.camel.service.ConsumerRegistered, + * se.scalablesolutions.akka.camel.service.ConsumerMethodRegistered and + * se.scalablesolutions.akka.camel.service.ConsumerUnregistered. + * + * @author Martin Krasser + */ +private[camel] class ConsumerPublisher extends Actor { + import ConsumerPublisher._ + + @volatile private var latch = new CountDownLatch(0) + + /** + * Adds a route to the actor identified by a Publish message to the global CamelContext. + */ + protected def receive = { + case r: ConsumerRegistered => { + handleConsumerRegistered(r) + latch.countDown // needed for testing only. + } + case u: ConsumerUnregistered => { + handleConsumerUnregistered(u) + latch.countDown // needed for testing only. + } + case d: ConsumerMethodRegistered => { + handleConsumerMethodRegistered(d) + latch.countDown // needed for testing only. + } + case SetExpectedMessageCount(num) => { + // needed for testing only. + latch = new CountDownLatch(num) + self.reply(latch) + } + case _ => { /* ignore */} + } +} + +/** + * Command message used For testing-purposes only. + */ +private[camel] case class SetExpectedMessageCount(num: Int) + + +/** + * Defines an abstract route to a target which is either an actor or an active object method.. + * + * @param endpointUri endpoint URI of the consumer actor or active object method. + * @param id actor identifier or active object identifier (registry key). + * + * @author Martin Krasser + */ +private[camel] abstract class ConsumerRoute(endpointUri: String, id: String) extends RouteBuilder { + // TODO: make conversions configurable + private val bodyConversions = Map( + "file" -> classOf[InputStream] + ) + + def configure = { + val schema = endpointUri take endpointUri.indexOf(":") // e.g. "http" from "http://whatever/..." + bodyConversions.get(schema) match { + case Some(clazz) => from(endpointUri).routeId(id).convertBodyTo(clazz).to(targetUri) + case None => from(endpointUri).routeId(id).to(targetUri) + } + } + + protected def targetUri: String +} + +/** + * Defines the route to a consumer actor. + * + * @param endpointUri endpoint URI of the consumer actor + * @param id actor identifier + * @param uuid true if id refers to Actor.uuid, false if + * id refers to Actor.getId. + * + * @author Martin Krasser + */ +private[camel] class ConsumerActorRoute(endpointUri: String, id: String, uuid: Boolean) extends ConsumerRoute(endpointUri, id) { + protected override def targetUri = (if (uuid) "actor:uuid:%s" else "actor:id:%s") format id +} + +/** + * Defines the route to an active object method.. + * + * @param endpointUri endpoint URI of the consumer actor method + * @param id active object identifier + * @param method name of the method to invoke. + * + * @author Martin Krasser + */ +private[camel] class ConsumerMethodRoute(val endpointUri: String, id: String, method: String) extends ConsumerRoute(endpointUri, id) { + protected override def targetUri = "%s:%s?method=%s" format (ActiveObjectComponent.InternalSchema, id, method) +} + +/** + * A registration listener that triggers publication of consumer actors and active object + * methods as well as un-publication of consumer actors. This actor needs to be initialized + * with a PublishRequestorInit command message for obtaining a reference to + * a publisher actor. Before initialization it buffers all outbound messages + * and delivers them to the publisher when receiving a + * PublishRequestorInit message. After initialization, outbound messages are + * delivered directly without buffering. + * + * @see PublishRequestorInit + * + * @author Martin Krasser + */ +private[camel] class PublishRequestor extends Actor { + private val events = ListBuffer[ConsumerEvent]() + private var publisher: Option[ActorRef] = None + + protected def receive = { + case ActorRegistered(actor) => + for (event <- ConsumerRegistered.forConsumer(actor)) deliverCurrentEvent(event) + case ActorUnregistered(actor) => + for (event <- ConsumerUnregistered.forConsumer(actor)) deliverCurrentEvent(event) + case AspectInitRegistered(proxy, init) => + for (event <- ConsumerMethodRegistered.forConsumer(proxy, init)) deliverCurrentEvent(event) + case PublishRequestorInit(pub) => { + publisher = Some(pub) + deliverBufferedEvents + } + case _ => { /* ignore */ } + } + + private def deliverCurrentEvent(event: ConsumerEvent) = { + publisher match { + case Some(pub) => pub ! event + case None => events += event + } + } + + private def deliverBufferedEvents = { + for (event <- events) deliverCurrentEvent(event) + events.clear + } +} + +/** + * Command message to initialize a PublishRequestor to use consumerPublisher + * for publishing actors or active object methods. + */ +private[camel] case class PublishRequestorInit(consumerPublisher: ActorRef) + +/** + * A consumer (un)registration event. + * + * @author Martin Krasser + */ +private[camel] sealed trait ConsumerEvent + +/** + * Event indicating that a consumer actor has been registered at the actor registry. + * + * @param actorRef actor reference + * @param uri endpoint URI of the consumer actor + * @param id actor identifier + * @param uuid true if id is the actor's uuid, false if + * id is the actor's id. + * + * @author Martin Krasser + */ +private[camel] case class ConsumerRegistered(actorRef: ActorRef, uri: String, id: String, uuid: Boolean) extends ConsumerEvent + +/** + * Event indicating that a consumer actor has been unregistered from the actor registry. + * + * @param actorRef actor reference + * @param uri endpoint URI of the consumer actor + * @param id actor identifier + * @param uuid true if id is the actor's uuid, false if + * id is the actor's id. + * + * @author Martin Krasser + */ +private[camel] case class ConsumerUnregistered(actorRef: ActorRef, uri: String, id: String, uuid: Boolean) extends ConsumerEvent + +/** + * Event indicating that an active object proxy has been created for a POJO. For each + * @consume annotated POJO method a separate instance of this class is + * created. + * + * @param activeObject active object (proxy). + * @param init + * @param uri endpoint URI of the active object method + * @param method method to be published. + * + * @author Martin Krasser + */ +private[camel] case class ConsumerMethodRegistered(activeObject: AnyRef, init: AspectInit, uri: String, method: Method) extends ConsumerEvent + +/** + * @author Martin Krasser + */ +private[camel] object ConsumerRegistered { + /** + * Optionally creates an ConsumerRegistered event message for a consumer actor or None if + * actorRef is not a consumer actor. + */ + def forConsumer(actorRef: ActorRef): Option[ConsumerRegistered] = actorRef match { + case ConsumerDescriptor(ref, uri, id, uuid) => Some(ConsumerRegistered(ref, uri, id, uuid)) + case _ => None + } +} + +/** + * @author Martin Krasser + */ +private[camel] object ConsumerUnregistered { + /** + * Optionally creates an ConsumerUnregistered event message for a consumer actor or None if + * actorRef is not a consumer actor. + */ + def forConsumer(actorRef: ActorRef): Option[ConsumerUnregistered] = actorRef match { + case ConsumerDescriptor(ref, uri, id, uuid) => Some(ConsumerUnregistered(ref, uri, id, uuid)) + case _ => None + } +} + +/** + * @author Martin Krasser + */ +private[camel] object ConsumerMethodRegistered { + /** + * Creates a list of ConsumerMethodRegistered event messages for an active object or an empty + * list if the active object is a proxy for an remote active object or the active object doesn't + * have any @consume annotated methods. + */ + def forConsumer(activeObject: AnyRef, init: AspectInit): List[ConsumerMethodRegistered] = { + // TODO: support consumer annotation inheritance + // - visit overridden methods in superclasses + // - visit implemented method declarations in interfaces + if (init.remoteAddress.isDefined) Nil // let remote node publish active object methods on endpoints + else for (m <- activeObject.getClass.getMethods.toList; if (m.isAnnotationPresent(classOf[consume]))) + yield ConsumerMethodRegistered(activeObject, init, m.getAnnotation(classOf[consume]).value, m) + } +} + +/** + * Describes a consumer actor with elements that are relevant for publishing an actor at a + * Camel endpoint (or unpublishing an actor from an endpoint). + * + * @author Martin Krasser + */ +private[camel] object ConsumerDescriptor { + + /** + * An extractor that optionally creates a 4-tuple from a consumer actor reference containing + * the actor reference itself, endpoint URI, identifier and a hint whether the identifier + * is the actor uuid or actor id. If actorRef doesn't reference a consumer actor, + * None is returned. + */ + def unapply(actorRef: ActorRef): Option[(ActorRef, String, String, Boolean)] = + unapplyConsumerInstance(actorRef) orElse unapplyConsumeAnnotated(actorRef) + + private def unapplyConsumeAnnotated(actorRef: ActorRef): Option[(ActorRef, String, String, Boolean)] = { + val annotation = actorRef.actorClass.getAnnotation(classOf[consume]) + if (annotation eq null) None + else if (actorRef.remoteAddress.isDefined) None + else Some((actorRef, annotation.value, actorRef.id, false)) + } + + private def unapplyConsumerInstance(actorRef: ActorRef): Option[(ActorRef, String, String, Boolean)] = + if (!actorRef.actor.isInstanceOf[Consumer]) None + else if (actorRef.remoteAddress.isDefined) None + else Some((actorRef, actorRef.actor.asInstanceOf[Consumer].endpointUri, actorRef.uuid, true)) +} diff --git a/akka-camel/src/main/scala/Producer.scala b/akka-camel/src/main/scala/Producer.scala index 634cf60b3d..7aca16f956 100644 --- a/akka-camel/src/main/scala/Producer.scala +++ b/akka-camel/src/main/scala/Producer.scala @@ -160,7 +160,7 @@ trait Producer { this: Actor => * * @author Martin Krasser */ -class ProducerResponseSender( +private[camel] class ProducerResponseSender( headers: Map[String, Any], sender: Option[ActorRef], senderFuture: Option[CompletableFuture[Any]], diff --git a/akka-camel/src/main/scala/component/ActiveObjectComponent.scala b/akka-camel/src/main/scala/component/ActiveObjectComponent.scala new file mode 100644 index 0000000000..d65c7e6528 --- /dev/null +++ b/akka-camel/src/main/scala/component/ActiveObjectComponent.scala @@ -0,0 +1,108 @@ +/** + * Copyright (C) 2009-2010 Scalable Solutions AB + */ + +package se.scalablesolutions.akka.camel.component + +import java.util.Map +import java.util.concurrent.ConcurrentHashMap +import org.apache.camel.CamelContext +import org.apache.camel.component.bean._ + +/** + * @author Martin Krasser + */ +object ActiveObjectComponent { + /** + * Default schema name for active object endpoint URIs. + */ + val InternalSchema = "active-object-internal" +} + +/** + * Camel component for exchanging messages with active objects. This component + * tries to obtain the active object from the activeObjectRegistry + * first. If it's not there it tries to obtain it from the CamelContext's registry. + * + * @see org.apache.camel.component.bean.BeanComponent + * + * @author Martin Krasser + */ +class ActiveObjectComponent extends BeanComponent { + val activeObjectRegistry = new ConcurrentHashMap[String, AnyRef] + + /** + * Creates a {@link org.apache.camel.component.bean.BeanEndpoint} with a custom + * bean holder that uses activeObjectRegistry for getting access to + * active objects (beans). + * + * @see se.scalablesolutions.akka.camel.component.ActiveObjectHolder + */ + override def createEndpoint(uri: String, remaining: String, parameters: Map[String, AnyRef]) = { + val endpoint = new BeanEndpoint(uri, this) + endpoint.setBeanName(remaining) + endpoint.setBeanHolder(createBeanHolder(remaining)) + setProperties(endpoint.getProcessor, parameters) + endpoint + } + + private def createBeanHolder(beanName: String) = + new ActiveObjectHolder(activeObjectRegistry, getCamelContext, beanName).createCacheHolder +} + +/** + * {@link org.apache.camel.component.bean.BeanHolder} implementation that uses a custom + * registry for getting access to active objects. + * + * @author Martin Krasser + */ +class ActiveObjectHolder(activeObjectRegistry: Map[String, AnyRef], context: CamelContext, name: String) + extends RegistryBean(context, name) { + + /** + * Returns an {@link se.scalablesolutions.akka.camel.component.ActiveObjectInfo} instance. + */ + override def getBeanInfo: BeanInfo = + new ActiveObjectInfo(getContext, getBean.getClass, getParameterMappingStrategy) + + /** + * Obtains an active object from activeObjectRegistry. + */ + override def getBean: AnyRef = { + val bean = activeObjectRegistry.get(getName) + if (bean eq null) super.getBean else bean + } +} + +/** + * Provides active object meta information. + * + * @author Martin Krasser + */ +class ActiveObjectInfo(context: CamelContext, clazz: Class[_], strategy: ParameterMappingStrategy) + extends BeanInfo(context, clazz, strategy) { + + /** + * Introspects AspectWerkz proxy classes. + * + * @param clazz AspectWerkz proxy class. + */ + protected override def introspect(clazz: Class[_]): Unit = { + + // TODO: fix target class detection in BeanInfo.introspect(Class) + // Camel assumes that classes containing a '$$' in the class name + // are classes generated with CGLIB. This conflicts with proxies + // created from interfaces with AspectWerkz. Once the fix is in + // place this method can be removed. + + for (method <- clazz.getDeclaredMethods) { + if (isValidMethod(clazz, method)) { + introspect(clazz, method) + } + } + val superclass = clazz.getSuperclass + if (superclass != null && !superclass.equals(classOf[AnyRef])) { + introspect(superclass) + } + } +} \ No newline at end of file diff --git a/akka-camel/src/main/scala/service/CamelService.scala b/akka-camel/src/main/scala/service/CamelService.scala deleted file mode 100644 index 80fa603897..0000000000 --- a/akka-camel/src/main/scala/service/CamelService.scala +++ /dev/null @@ -1,86 +0,0 @@ -/** - * Copyright (C) 2009-2010 Scalable Solutions AB - */ - -package se.scalablesolutions.akka.camel.service - -import se.scalablesolutions.akka.actor.ActorRegistry -import se.scalablesolutions.akka.actor.Actor._ -import se.scalablesolutions.akka.camel.CamelContextManager -import se.scalablesolutions.akka.util.{Bootable, Logging} - -/** - * Used by applications (and the Kernel) to publish consumer actors via Camel - * endpoints and to manage the life cycle of a a global CamelContext which can - * be accessed via se.scalablesolutions.akka.camel.CamelContextManager. - * - * @author Martin Krasser - */ -trait CamelService extends Bootable with Logging { - - import CamelContextManager._ - - private[camel] val consumerPublisher = actorOf[ConsumerPublisher] - private[camel] val publishRequestor = actorOf(new PublishRequestor(consumerPublisher)) - - /** - * Starts the CamelService. Any started actor that is a consumer actor will be (asynchronously) - * published as Camel endpoint. Consumer actors that are started after this method returned will - * be published as well. Actor publishing is done asynchronously. - */ - abstract override def onLoad = { - super.onLoad - - // Only init and start if not already done by application - if (!initialized) init - if (!started) start - - // start actor that exposes consumer actors via Camel endpoints - consumerPublisher.start - - // add listener for actor registration events - ActorRegistry.addRegistrationListener(publishRequestor.start) - - // publish already registered consumer actors - for (actor <- ActorRegistry.actors; event <- ConsumerRegistered.forConsumer(actor)) consumerPublisher ! event - } - - /** - * Stops the CamelService. - */ - abstract override def onUnload = { - ActorRegistry.removeRegistrationListener(publishRequestor) - publishRequestor.stop - consumerPublisher.stop - stop - super.onUnload - } - - /** - * Starts the CamelService. - * - * @see onLoad - */ - def load = onLoad - - /** - * Stops the CamelService. - * - * @see onUnload - */ - def unload = onUnload -} - -/** - * CamelService companion object used by standalone applications to create their own - * CamelService instance. - * - * @author Martin Krasser - */ -object CamelService { - - /** - * Creates a new CamelService instance. - */ - def newInstance: CamelService = new CamelService {} -} diff --git a/akka-camel/src/main/scala/service/ConsumerPublisher.scala b/akka-camel/src/main/scala/service/ConsumerPublisher.scala deleted file mode 100644 index 167f3acaa1..0000000000 --- a/akka-camel/src/main/scala/service/ConsumerPublisher.scala +++ /dev/null @@ -1,210 +0,0 @@ -/** - * Copyright (C) 2009-2010 Scalable Solutions AB - */ -package se.scalablesolutions.akka.camel.service - -import java.io.InputStream -import java.util.concurrent.CountDownLatch - -import org.apache.camel.builder.RouteBuilder - -import se.scalablesolutions.akka.actor.{ActorUnregistered, ActorRegistered, Actor, ActorRef} -import se.scalablesolutions.akka.actor.annotation.consume -import se.scalablesolutions.akka.camel.{Consumer, CamelContextManager} -import se.scalablesolutions.akka.util.Logging - -/** - * Actor that publishes consumer actors as Camel endpoints at the CamelContext managed - * by se.scalablesolutions.akka.camel.CamelContextManager. It accepts messages of type - * se.scalablesolutions.akka.camel.service.ConsumerRegistered and - * se.scalablesolutions.akka.camel.service.ConsumerUnregistered. - * - * @author Martin Krasser - */ -class ConsumerPublisher extends Actor with Logging { - @volatile private var publishLatch = new CountDownLatch(0) - @volatile private var unpublishLatch = new CountDownLatch(0) - - /** - * Adds a route to the actor identified by a Publish message to the global CamelContext. - */ - protected def receive = { - case r: ConsumerRegistered => { - handleConsumerRegistered(r) - publishLatch.countDown // needed for testing only. - } - case u: ConsumerUnregistered => { - handleConsumerUnregistered(u) - unpublishLatch.countDown // needed for testing only. - } - case _ => { /* ignore */} - } - - /** - * Sets the expected number of actors to be published. Used for testing only. - */ - private[camel] def expectPublishCount(count: Int): Unit = - publishLatch = new CountDownLatch(count) - - /** - * Sets the expected number of actors to be unpublished. Used for testing only. - */ - private[camel] def expectUnpublishCount(count: Int): Unit = - unpublishLatch = new CountDownLatch(count) - - /** - * Waits for the expected number of actors to be published. Used for testing only. - */ - private[camel] def awaitPublish = publishLatch.await - - /** - * Waits for the expected number of actors to be unpublished. Used for testing only. - */ - private[camel] def awaitUnpublish = unpublishLatch.await - - /** - * Creates a route to the registered consumer actor. - */ - private def handleConsumerRegistered(event: ConsumerRegistered) { - CamelContextManager.context.addRoutes(new ConsumerRoute(event.uri, event.id, event.uuid)) - log.info("published actor %s (%s) at endpoint %s" format (event.clazz, event.id, event.uri)) - } - - /** - * Stops route to the already un-registered consumer actor. - */ - private def handleConsumerUnregistered(event: ConsumerUnregistered) { - CamelContextManager.context.stopRoute(event.id) - log.info("unpublished actor %s (%s) from endpoint %s" format (event.clazz, event.id, event.uri)) - } -} - -/** - * Defines the route to a consumer actor. - * - * @param endpointUri endpoint URI of the consumer actor - * @param id actor identifier - * @param uuid true if id refers to Actor.uuid, false if - * id refers to Actor.getId. - * - * @author Martin Krasser - */ -class ConsumerRoute(val endpointUri: String, id: String, uuid: Boolean) extends RouteBuilder { - // TODO: make conversions configurable - private val bodyConversions = Map( - "file" -> classOf[InputStream] - ) - - def configure = { - val schema = endpointUri take endpointUri.indexOf(":") // e.g. "http" from "http://whatever/..." - bodyConversions.get(schema) match { - case Some(clazz) => from(endpointUri).routeId(id).convertBodyTo(clazz).to(actorUri) - case None => from(endpointUri).routeId(id).to(actorUri) - } - } - - private def actorUri = (if (uuid) "actor:uuid:%s" else "actor:id:%s") format id -} - -/** - * A registration listener that triggers publication and un-publication of consumer actors. - * - * @author Martin Krasser - */ -class PublishRequestor(consumerPublisher: ActorRef) extends Actor { - protected def receive = { - case ActorRegistered(actor) => for (event <- ConsumerRegistered.forConsumer(actor)) consumerPublisher ! event - case ActorUnregistered(actor) => for (event <- ConsumerUnregistered.forConsumer(actor)) consumerPublisher ! event - } -} - -/** - * Consumer actor lifecycle event. - * - * @author Martin Krasser - */ -sealed trait ConsumerEvent - -/** - * Event indicating that a consumer actor has been registered at the actor registry. - * - * @param clazz clazz name of the referenced actor - * @param uri endpoint URI of the consumer actor - * @param id actor identifier - * @param uuid true if id is the actor's uuid, false if - * id is the actor's id. - * - * @author Martin Krasser - */ -case class ConsumerRegistered(clazz: String, uri: String, id: String, uuid: Boolean) extends ConsumerEvent - -/** - * Event indicating that a consumer actor has been unregistered from the actor registry. - * - * @param clazz clazz name of the referenced actor - * @param uri endpoint URI of the consumer actor - * @param id actor identifier - * @param uuid true if id is the actor's uuid, false if - * id is the actor's id. - * - * @author Martin Krasser - */ -case class ConsumerUnregistered(clazz: String, uri: String, id: String, uuid: Boolean) extends ConsumerEvent - -/** - * @author Martin Krasser - */ -private[camel] object ConsumerRegistered { - /** - * Optionally creates an ConsumerRegistered event message for a consumer actor or None if - * actorRef is not a consumer actor. - */ - def forConsumer(actorRef: ActorRef): Option[ConsumerRegistered] = actorRef match { - case ConsumerDescriptor(clazz, uri, id, uuid) => Some(ConsumerRegistered(clazz, uri, id, uuid)) - case _ => None - } -} - -/** - * @author Martin Krasser - */ -private[camel] object ConsumerUnregistered { - /** - * Optionally creates an ConsumerUnregistered event message for a consumer actor or None if - * actorRef is not a consumer actor. - */ - def forConsumer(actorRef: ActorRef): Option[ConsumerUnregistered] = actorRef match { - case ConsumerDescriptor(clazz, uri, id, uuid) => Some(ConsumerUnregistered(clazz, uri, id, uuid)) - case _ => None - } -} - -/** - * Describes a consumer actor with elements that are relevant for publishing an actor at a - * Camel endpoint (or unpublishing an actor from an endpoint). - * - * @author Martin Krasser - */ -private[camel] object ConsumerDescriptor { - - /** - * An extractor that optionally creates a 4-tuple from a consumer actor reference containing - * the target actor's class name, endpoint URI, identifier and a hint whether the identifier - * is the actor uuid or actor id. If actorRef doesn't reference a consumer actor, - * None is returned. - */ - def unapply(actorRef: ActorRef): Option[(String, String, String, Boolean)] = - unapplyConsumerInstance(actorRef) orElse unapplyConsumeAnnotated(actorRef) - - private def unapplyConsumeAnnotated(actorRef: ActorRef): Option[(String, String, String, Boolean)] = { - val annotation = actorRef.actorClass.getAnnotation(classOf[consume]) - if (annotation eq null) None - else if (actorRef.remoteAddress.isDefined) None - else Some((actorRef.actor.getClass.getName, annotation.value, actorRef.id, false)) - } - - private def unapplyConsumerInstance(actorRef: ActorRef): Option[(String, String, String, Boolean)] = - if (!actorRef.actor.isInstanceOf[Consumer]) None - else if (actorRef.remoteAddress.isDefined) None - else Some((actorRef.actor.getClass.getName, actorRef.actor.asInstanceOf[Consumer].endpointUri, actorRef.uuid, true)) -} diff --git a/akka-camel/src/test/java/se/scalablesolutions/akka/camel/PojoBase.java b/akka-camel/src/test/java/se/scalablesolutions/akka/camel/PojoBase.java new file mode 100644 index 0000000000..05bf1625bb --- /dev/null +++ b/akka-camel/src/test/java/se/scalablesolutions/akka/camel/PojoBase.java @@ -0,0 +1,34 @@ +package se.scalablesolutions.akka.camel; + +import org.apache.camel.Body; +import org.apache.camel.Header; + +import se.scalablesolutions.akka.actor.annotation.consume; + +/** + * @author Martin Krasser + */ +public class PojoBase { + + public String m1(String b, String h) { + return "m1base: " + b + " " + h; + } + + @consume("direct:m2base") + public String m2(@Body String b, @Header("test") String h) { + return "m2base: " + b + " " + h; + } + + @consume("direct:m3base") + public String m3(@Body String b, @Header("test") String h) { + return "m3base: " + b + " " + h; + } + + @consume("direct:m4base") + public String m4(@Body String b, @Header("test") String h) { + return "m4base: " + b + " " + h; + } + + public void m5(@Body String b, @Header("test") String h) { + } +} diff --git a/akka-camel/src/test/java/se/scalablesolutions/akka/camel/PojoImpl.java b/akka-camel/src/test/java/se/scalablesolutions/akka/camel/PojoImpl.java new file mode 100644 index 0000000000..b48202d4dc --- /dev/null +++ b/akka-camel/src/test/java/se/scalablesolutions/akka/camel/PojoImpl.java @@ -0,0 +1,23 @@ +package se.scalablesolutions.akka.camel; + +import org.apache.camel.Body; +import org.apache.camel.Header; + +import se.scalablesolutions.akka.actor.annotation.consume; + +/** + * @author Martin Krasser + */ +public class PojoImpl implements PojoIntf { + + public String m1(String b, String h) { + return "m1impl: " + b + " " + h; + } + + @consume("direct:m2impl") + public String m2(@Body String b, @Header("test") String h) { + return "m2impl: " + b + " " + h; + } + + +} diff --git a/akka-camel/src/test/java/se/scalablesolutions/akka/camel/PojoIntf.java b/akka-camel/src/test/java/se/scalablesolutions/akka/camel/PojoIntf.java new file mode 100644 index 0000000000..14f63afd2e --- /dev/null +++ b/akka-camel/src/test/java/se/scalablesolutions/akka/camel/PojoIntf.java @@ -0,0 +1,18 @@ +package se.scalablesolutions.akka.camel; + +import org.apache.camel.Body; +import org.apache.camel.Header; + +import se.scalablesolutions.akka.actor.annotation.consume; + +/** + * @author Martin Krasser + */ +public interface PojoIntf { + + public String m1(String b, String h); + + @consume("direct:m2intf") + public String m2(@Body String b, @Header("test") String h); + +} diff --git a/akka-camel/src/test/java/se/scalablesolutions/akka/camel/PojoSingle.java b/akka-camel/src/test/java/se/scalablesolutions/akka/camel/PojoSingle.java new file mode 100644 index 0000000000..7d577535b2 --- /dev/null +++ b/akka-camel/src/test/java/se/scalablesolutions/akka/camel/PojoSingle.java @@ -0,0 +1,14 @@ +package se.scalablesolutions.akka.camel; + +import se.scalablesolutions.akka.actor.annotation.consume; + +/** + * @author Martin Krasser + */ +public class PojoSingle { + + @consume("direct:foo") + public void foo(String b) { + } + +} diff --git a/akka-camel/src/test/java/se/scalablesolutions/akka/camel/PojoSub.java b/akka-camel/src/test/java/se/scalablesolutions/akka/camel/PojoSub.java new file mode 100644 index 0000000000..be5b453698 --- /dev/null +++ b/akka-camel/src/test/java/se/scalablesolutions/akka/camel/PojoSub.java @@ -0,0 +1,27 @@ +package se.scalablesolutions.akka.camel; + +import org.apache.camel.Body; +import org.apache.camel.Header; + +import se.scalablesolutions.akka.actor.annotation.consume; + +public class PojoSub extends PojoBase { + + @Override + @consume("direct:m1sub") + public String m1(@Body String b, @Header("test") String h) { + return "m1sub: " + b + " " + h; + } + + @Override + public String m2(String b, String h) { + return "m2sub: " + b + " " + h; + } + + @Override + @consume("direct:m3sub") + public String m3(@Body String b, @Header("test") String h) { + return "m3sub: " + b + " " + h; + } + +} diff --git a/akka-camel/src/test/scala/service/CamelServiceFeatureTest.scala b/akka-camel/src/test/scala/CamelServiceFeatureTest.scala similarity index 61% rename from akka-camel/src/test/scala/service/CamelServiceFeatureTest.scala rename to akka-camel/src/test/scala/CamelServiceFeatureTest.scala index fd106f799f..fd57d83457 100644 --- a/akka-camel/src/test/scala/service/CamelServiceFeatureTest.scala +++ b/akka-camel/src/test/scala/CamelServiceFeatureTest.scala @@ -1,53 +1,36 @@ -package se.scalablesolutions.akka.camel.service +package se.scalablesolutions.akka.camel + +import java.util.concurrent.{CountDownLatch, TimeUnit} import org.apache.camel.builder.RouteBuilder import org.scalatest.{GivenWhenThen, BeforeAndAfterAll, FeatureSpec} -import se.scalablesolutions.akka.actor.{Actor, ActorRegistry} -import se.scalablesolutions.akka.camel.{CamelContextManager, Message, Consumer} -import Actor._ - -object CamelServiceFeatureTest { - - class TestConsumer(uri: String) extends Actor with Consumer { - def endpointUri = uri - protected def receive = { - case msg: Message => self.reply("received %s" format msg.body) - } - } - - class TestActor extends Actor { - self.id = "custom-actor-id" - protected def receive = { - case msg: Message => self.reply("received %s" format msg.body) - } - } - - class TestRoute extends RouteBuilder { - def configure { - from("direct:custom-route-test-1").to("actor:custom-actor-id") - } - } -} +import se.scalablesolutions.akka.actor.Actor._ +import se.scalablesolutions.akka.actor.{ActiveObject, Actor, ActorRegistry} class CamelServiceFeatureTest extends FeatureSpec with BeforeAndAfterAll with GivenWhenThen { import CamelServiceFeatureTest._ - var service: CamelService = CamelService.newInstance + var service: CamelService = _ override protected def beforeAll = { ActorRegistry.shutdownAll + // create new CamelService instance + service = CamelService.newInstance // register test consumer before starting the CamelService actorOf(new TestConsumer("direct:publish-test-1")).start - // Consigure a custom camel route + // Configure a custom camel route CamelContextManager.init CamelContextManager.context.addRoutes(new TestRoute) - // set expectations for testing purposes - service.consumerPublisher.actor.asInstanceOf[ConsumerPublisher].expectPublishCount(1) + // start consumer publisher, otherwise we cannot set message + // count expectations in the next step (needed for testing only). + service.consumerPublisher.start + // set expectations on publish count + val latch = service.consumerPublisher.!![CountDownLatch](SetExpectedMessageCount(1)).get // start the CamelService service.load // await publication of first test consumer - service.consumerPublisher.actor.asInstanceOf[ConsumerPublisher].awaitPublish + assert(latch.await(5000, TimeUnit.MILLISECONDS)) } override protected def afterAll = { @@ -60,9 +43,9 @@ class CamelServiceFeatureTest extends FeatureSpec with BeforeAndAfterAll with Gi scenario("access registered consumer actors via Camel direct-endpoints") { given("two consumer actors registered before and after CamelService startup") - service.consumerPublisher.actor.asInstanceOf[ConsumerPublisher].expectPublishCount(1) + val latch = service.consumerPublisher.!![CountDownLatch](SetExpectedMessageCount(1)).get actorOf(new TestConsumer("direct:publish-test-2")).start - service.consumerPublisher.actor.asInstanceOf[ConsumerPublisher].awaitPublish + assert(latch.await(5000, TimeUnit.MILLISECONDS)) when("requests are sent to these actors") val response1 = CamelContextManager.template.requestBody("direct:publish-test-1", "msg1") @@ -81,14 +64,14 @@ class CamelServiceFeatureTest extends FeatureSpec with BeforeAndAfterAll with Gi given("a consumer actor that has been stopped") assert(CamelContextManager.context.hasEndpoint(endpointUri) eq null) - service.consumerPublisher.actor.asInstanceOf[ConsumerPublisher].expectPublishCount(1) + var latch = service.consumerPublisher.!![CountDownLatch](SetExpectedMessageCount(1)).get val consumer = actorOf(new TestConsumer(endpointUri)).start - service.consumerPublisher.actor.asInstanceOf[ConsumerPublisher].awaitPublish + assert(latch.await(5000, TimeUnit.MILLISECONDS)) assert(CamelContextManager.context.hasEndpoint(endpointUri) ne null) - service.consumerPublisher.actor.asInstanceOf[ConsumerPublisher].expectUnpublishCount(1) + latch = service.consumerPublisher.!![CountDownLatch](SetExpectedMessageCount(1)).get consumer.stop - service.consumerPublisher.actor.asInstanceOf[ConsumerPublisher].awaitUnpublish + assert(latch.await(5000, TimeUnit.MILLISECONDS)) // endpoint is still there but the route has been stopped assert(CamelContextManager.context.hasEndpoint(endpointUri) ne null) @@ -114,4 +97,48 @@ class CamelServiceFeatureTest extends FeatureSpec with BeforeAndAfterAll with Gi assert(response === "received msg3") } } + + feature("Publish active object methods in the global CamelContext") { + + scenario("access active object methods via Camel direct-endpoints") { + + given("an active object registered after CamelService startup") + val latch = service.consumerPublisher.!![CountDownLatch](SetExpectedMessageCount(3)).get + ActiveObject.newInstance(classOf[PojoBase]) + assert(latch.await(5000, TimeUnit.MILLISECONDS)) + + when("requests are sent to published methods") + val response1 = CamelContextManager.template.requestBodyAndHeader("direct:m2base", "x", "test", "y") + val response2 = CamelContextManager.template.requestBodyAndHeader("direct:m3base", "x", "test", "y") + val response3 = CamelContextManager.template.requestBodyAndHeader("direct:m4base", "x", "test", "y") + + then("each should have returned a different response") + assert(response1 === "m2base: x y") + assert(response2 === "m3base: x y") + assert(response3 === "m4base: x y") + } + } +} + +object CamelServiceFeatureTest { + + class TestConsumer(uri: String) extends Actor with Consumer { + def endpointUri = uri + protected def receive = { + case msg: Message => self.reply("received %s" format msg.body) + } + } + + class TestActor extends Actor { + self.id = "custom-actor-id" + protected def receive = { + case msg: Message => self.reply("received %s" format msg.body) + } + } + + class TestRoute extends RouteBuilder { + def configure { + from("direct:custom-route-test-1").to("actor:custom-actor-id") + } + } } diff --git a/akka-camel/src/test/scala/ConsumerMethodRegisteredTest.scala b/akka-camel/src/test/scala/ConsumerMethodRegisteredTest.scala new file mode 100644 index 0000000000..66d63d69e8 --- /dev/null +++ b/akka-camel/src/test/scala/ConsumerMethodRegisteredTest.scala @@ -0,0 +1,46 @@ +package se.scalablesolutions.akka.camel + +import java.net.InetSocketAddress + +import org.junit.Test +import org.scalatest.junit.JUnitSuite + +import se.scalablesolutions.akka.actor.{AspectInit, ActiveObject} +import se.scalablesolutions.akka.camel.ConsumerMethodRegistered._ + +class ConsumerMethodRegisteredTest extends JUnitSuite { + val remoteAddress = new InetSocketAddress("localhost", 8888); + val remoteAspectInit = AspectInit(classOf[String], null, Some(remoteAddress), 1000) + val localAspectInit = AspectInit(classOf[String], null, None, 1000) + + val activePojoBase = ActiveObject.newInstance(classOf[PojoBase]) + val activePojoSub = ActiveObject.newInstance(classOf[PojoSub]) + val activePojoIntf = ActiveObject.newInstance(classOf[PojoIntf], new PojoImpl) + + val ascendingMethodName = (r1: ConsumerMethodRegistered, r2: ConsumerMethodRegistered) => + r1.method.getName < r2.method.getName + + @Test def shouldSelectPojoBaseMethods234 = { + val registered = forConsumer(activePojoBase, localAspectInit).sortWith(ascendingMethodName) + assert(registered.size === 3) + assert(registered.map(_.method.getName) === List("m2", "m3", "m4")) + } + + @Test def shouldSelectPojoSubMethods134 = { + val registered = forConsumer(activePojoSub, localAspectInit).sortWith(ascendingMethodName) + assert(registered.size === 3) + assert(registered.map(_.method.getName) === List("m1", "m3", "m4")) + } + + @Test def shouldSelectPojoIntfMethod2 = { + val registered = forConsumer(activePojoIntf, localAspectInit) + assert(registered.size === 1) + assert(registered(0).method.getName === "m2") + } + + @Test def shouldIgnoreRemoteProxies = { + val registered = forConsumer(activePojoBase, remoteAspectInit) + assert(registered.size === 0) + } + +} \ No newline at end of file diff --git a/akka-camel/src/test/scala/service/ConsumerRegisteredTest.scala b/akka-camel/src/test/scala/ConsumerRegisteredTest.scala similarity index 67% rename from akka-camel/src/test/scala/service/ConsumerRegisteredTest.scala rename to akka-camel/src/test/scala/ConsumerRegisteredTest.scala index e69dd69339..caaa03591b 100644 --- a/akka-camel/src/test/scala/service/ConsumerRegisteredTest.scala +++ b/akka-camel/src/test/scala/ConsumerRegisteredTest.scala @@ -1,4 +1,4 @@ -package se.scalablesolutions.akka.camel.service +package se.scalablesolutions.akka.camel import org.junit.Test import org.scalatest.junit.JUnitSuite @@ -6,7 +6,6 @@ import org.scalatest.junit.JUnitSuite import se.scalablesolutions.akka.actor.Actor import se.scalablesolutions.akka.actor.Actor._ import se.scalablesolutions.akka.actor.annotation.consume -import se.scalablesolutions.akka.camel.Consumer object ConsumerRegisteredTest { @consume("mock:test1") @@ -29,20 +28,22 @@ class ConsumerRegisteredTest extends JUnitSuite { import ConsumerRegisteredTest._ @Test def shouldCreatePublishRequestList = { - val as = List(actorOf[ConsumeAnnotatedActor]) + val a = actorOf[ConsumeAnnotatedActor] + val as = List(a) val events = for (a <- as; e <- ConsumerRegistered.forConsumer(a)) yield e - assert(events === List(ConsumerRegistered(classOf[ConsumeAnnotatedActor].getName, "mock:test1", "test", false))) + assert(events === List(ConsumerRegistered(a, "mock:test1", "test", false))) } @Test def shouldCreateSomePublishRequestWithActorId = { - val event = ConsumerRegistered.forConsumer(actorOf[ConsumeAnnotatedActor]) - assert(event === Some(ConsumerRegistered(classOf[ConsumeAnnotatedActor].getName, "mock:test1", "test", false))) + val a = actorOf[ConsumeAnnotatedActor] + val event = ConsumerRegistered.forConsumer(a) + assert(event === Some(ConsumerRegistered(a, "mock:test1", "test", false))) } @Test def shouldCreateSomePublishRequestWithActorUuid = { val ca = actorOf[ConsumerActor] val event = ConsumerRegistered.forConsumer(ca) - assert(event === Some(ConsumerRegistered(ca.actor.getClass.getName, "mock:test2", ca.uuid, true))) + assert(event === Some(ConsumerRegistered(ca, "mock:test2", ca.uuid, true))) } @Test def shouldCreateNone = { diff --git a/akka-camel/src/test/scala/PublishRequestorTest.scala b/akka-camel/src/test/scala/PublishRequestorTest.scala new file mode 100644 index 0000000000..f3c9a899b2 --- /dev/null +++ b/akka-camel/src/test/scala/PublishRequestorTest.scala @@ -0,0 +1,69 @@ +package se.scalablesolutions.akka.camel + +import java.util.concurrent.{CountDownLatch, TimeUnit} + +import org.junit.{Before, After, Test} +import org.scalatest.junit.JUnitSuite + +import se.scalablesolutions.akka.actor._ +import se.scalablesolutions.akka.actor.Actor._ +import se.scalablesolutions.akka.camel.support.{SetExpectedMessageCount => SetExpectedTestMessageCount, _} + +class PublishRequestorTest extends JUnitSuite { + import PublishRequestorTest._ + + var publisher: ActorRef = _ + var requestor: ActorRef = _ + var consumer: ActorRef = _ + + @Before def setUp = { + publisher = actorOf[PublisherMock].start + requestor = actorOf[PublishRequestor].start + requestor ! PublishRequestorInit(publisher) + consumer = actorOf(new Actor with Consumer { + def endpointUri = "mock:test" + protected def receive = null + }).start + + } + + @After def tearDown = { + ActorRegistry.shutdownAll + } + + @Test def shouldReceiveConsumerMethodRegisteredEvent = { + val obj = ActiveObject.newInstance(classOf[PojoSingle]) + val init = AspectInit(classOf[PojoSingle], null, None, 1000) + val latch = publisher.!![CountDownLatch](SetExpectedTestMessageCount(1)).get + requestor ! AspectInitRegistered(obj, init) + assert(latch.await(5000, TimeUnit.MILLISECONDS)) + val event = (publisher !! GetRetainedMessage).get.asInstanceOf[ConsumerMethodRegistered] + assert(event.init === init) + assert(event.uri === "direct:foo") + assert(event.activeObject === obj) + assert(event.method.getName === "foo") + } + + @Test def shouldReceiveConsumerRegisteredEvent = { + val latch = publisher.!![CountDownLatch](SetExpectedTestMessageCount(1)).get + requestor ! ActorRegistered(consumer) + assert(latch.await(5000, TimeUnit.MILLISECONDS)) + assert((publisher !! GetRetainedMessage) === + Some(ConsumerRegistered(consumer, "mock:test", consumer.uuid, true))) + } + + @Test def shouldReceiveConsumerUnregisteredEvent = { + val latch = publisher.!![CountDownLatch](SetExpectedTestMessageCount(1)).get + requestor ! ActorUnregistered(consumer) + assert(latch.await(5000, TimeUnit.MILLISECONDS)) + assert((publisher !! GetRetainedMessage) === + Some(ConsumerUnregistered(consumer, "mock:test", consumer.uuid, true))) + } +} + +object PublishRequestorTest { + class PublisherMock extends TestActor with Retain with Countdown { + def handler = retain andThen countdown + } +} + diff --git a/akka-camel/src/test/scala/component/ActiveObjectComponentFeatureTest.scala b/akka-camel/src/test/scala/component/ActiveObjectComponentFeatureTest.scala new file mode 100644 index 0000000000..1decc24eef --- /dev/null +++ b/akka-camel/src/test/scala/component/ActiveObjectComponentFeatureTest.scala @@ -0,0 +1,74 @@ +package se.scalablesolutions.akka.camel.component + +import org.scalatest.{BeforeAndAfterEach, BeforeAndAfterAll, FeatureSpec} + +import se.scalablesolutions.akka.actor.Actor._ +import se.scalablesolutions.akka.camel._ +import se.scalablesolutions.akka.actor.{ActorRegistry, ActiveObject} +import org.apache.camel.{ExchangePattern, Exchange, Processor} + +/** + * @author Martin Krasser + */ +class ActiveObjectComponentFeatureTest extends FeatureSpec with BeforeAndAfterAll with BeforeAndAfterEach { + override protected def beforeAll = { + val activePojoBase = ActiveObject.newInstance(classOf[PojoBase]) + val activePojoIntf = ActiveObject.newInstance(classOf[PojoIntf], new PojoImpl) + + CamelContextManager.init + CamelContextManager.start + + CamelContextManager.activeObjectRegistry.put("base", activePojoBase) + CamelContextManager.activeObjectRegistry.put("intf", activePojoIntf) + } + + override protected def afterAll = { + CamelContextManager.stop + ActorRegistry.shutdownAll + } + + feature("Communicate with an active object from a Camel application using active object endpoint URIs") { + import ActiveObjectComponent.InternalSchema + import CamelContextManager.template + import ExchangePattern._ + + scenario("in-out exchange with proxy created from interface and method returning String") { + val result = template.requestBodyAndHeader("%s:intf?method=m2" format InternalSchema, "x", "test", "y") + assert(result === "m2impl: x y") + } + + scenario("in-out exchange with proxy created from class and method returning String") { + val result = template.requestBodyAndHeader("%s:base?method=m2" format InternalSchema, "x", "test", "y") + assert(result === "m2base: x y") + } + + scenario("in-out exchange with proxy created from class and method returning void") { + val result = template.requestBodyAndHeader("%s:base?method=m5" format InternalSchema, "x", "test", "y") + assert(result === "x") // returns initial body + } + + scenario("in-only exchange with proxy created from class and method returning String") { + val result = template.send("%s:base?method=m2" format InternalSchema, InOnly, new Processor { + def process(exchange: Exchange) = { + exchange.getIn.setBody("x") + exchange.getIn.setHeader("test", "y") + } + }); + assert(result.getPattern === InOnly) + assert(result.getIn.getBody === "m2base: x y") + assert(result.getOut.getBody === null) + } + + scenario("in-only exchange with proxy created from class and method returning void") { + val result = template.send("%s:base?method=m5" format InternalSchema, InOnly, new Processor { + def process(exchange: Exchange) = { + exchange.getIn.setBody("x") + exchange.getIn.setHeader("test", "y") + } + }); + assert(result.getPattern === InOnly) + assert(result.getIn.getBody === "x") + assert(result.getOut.getBody === null) + } + } +} diff --git a/akka-camel/src/test/scala/component/ActorComponentFeatureTest.scala b/akka-camel/src/test/scala/component/ActorComponentFeatureTest.scala index 7db119fa5e..f73a2fcd3e 100644 --- a/akka-camel/src/test/scala/component/ActorComponentFeatureTest.scala +++ b/akka-camel/src/test/scala/component/ActorComponentFeatureTest.scala @@ -1,11 +1,14 @@ package se.scalablesolutions.akka.camel.component +import java.util.concurrent.{TimeUnit, CountDownLatch} + import org.apache.camel.RuntimeCamelException import org.scalatest.{BeforeAndAfterEach, BeforeAndAfterAll, FeatureSpec} +import se.scalablesolutions.akka.actor.Actor._ import se.scalablesolutions.akka.actor.{ActorRegistry, Actor} -import se.scalablesolutions.akka.camel.support.{Respond, Countdown, Tester, Retain} import se.scalablesolutions.akka.camel.{Message, CamelContextManager} +import se.scalablesolutions.akka.camel.support._ class ActorComponentFeatureTest extends FeatureSpec with BeforeAndAfterAll with BeforeAndAfterEach { override protected def beforeAll = { @@ -20,41 +23,37 @@ class ActorComponentFeatureTest extends FeatureSpec with BeforeAndAfterAll with feature("Communicate with an actor from a Camel application using actor endpoint URIs") { import CamelContextManager.template - import Actor._ scenario("one-way communication using actor id") { - val actor = actorOf(new Tester with Retain with Countdown[Message]) - actor.start + val actor = actorOf[Tester1].start + val latch = actor.!![CountDownLatch](SetExpectedMessageCount(1)).get template.sendBody("actor:%s" format actor.id, "Martin") - assert(actor.actor.asInstanceOf[Countdown[Message]].waitFor) - assert(actor.actor.asInstanceOf[Retain].body === "Martin") + assert(latch.await(5000, TimeUnit.MILLISECONDS)) + val reply = (actor !! GetRetainedMessage).get.asInstanceOf[Message] + assert(reply.body === "Martin") } scenario("one-way communication using actor uuid") { - val actor = actorOf(new Tester with Retain with Countdown[Message]) - actor.start + val actor = actorOf[Tester1].start + val latch = actor.!![CountDownLatch](SetExpectedMessageCount(1)).get template.sendBody("actor:uuid:%s" format actor.uuid, "Martin") - assert(actor.actor.asInstanceOf[Countdown[Message]].waitFor) - assert(actor.actor.asInstanceOf[Retain].body === "Martin") + assert(latch.await(5000, TimeUnit.MILLISECONDS)) + val reply = (actor !! GetRetainedMessage).get.asInstanceOf[Message] + assert(reply.body === "Martin") } scenario("two-way communication using actor id") { - val actor = actorOf(new Tester with Respond) - actor.start + val actor = actorOf[Tester2].start assert(template.requestBody("actor:%s" format actor.id, "Martin") === "Hello Martin") } scenario("two-way communication using actor uuid") { - val actor = actorOf(new Tester with Respond) - actor.start + val actor = actorOf[Tester2].start assert(template.requestBody("actor:uuid:%s" format actor.uuid, "Martin") === "Hello Martin") } scenario("two-way communication with timeout") { - val actor = actorOf(new Tester { - self.timeout = 1 - }) - actor.start + val actor = actorOf[Tester3].start intercept[RuntimeCamelException] { template.requestBody("actor:uuid:%s" format actor.uuid, "Martin") } diff --git a/akka-camel/src/test/scala/component/ActorProducerTest.scala b/akka-camel/src/test/scala/component/ActorProducerTest.scala index d41971c07f..419784681b 100644 --- a/akka-camel/src/test/scala/component/ActorProducerTest.scala +++ b/akka-camel/src/test/scala/component/ActorProducerTest.scala @@ -2,41 +2,41 @@ package se.scalablesolutions.akka.camel.component import ActorComponentTest._ -import java.util.concurrent.TimeoutException +import java.util.concurrent.{CountDownLatch, TimeoutException, TimeUnit} import org.apache.camel.ExchangePattern import org.junit.{After, Test} import org.scalatest.junit.JUnitSuite import org.scalatest.BeforeAndAfterAll -import se.scalablesolutions.akka.actor.ActorRegistry import se.scalablesolutions.akka.actor.Actor._ -import se.scalablesolutions.akka.camel.support.{Countdown, Retain, Tester, Respond} +import se.scalablesolutions.akka.actor.ActorRegistry import se.scalablesolutions.akka.camel.{Failure, Message} +import se.scalablesolutions.akka.camel.support._ class ActorProducerTest extends JUnitSuite with BeforeAndAfterAll { @After def tearDown = ActorRegistry.shutdownAll @Test def shouldSendMessageToActor = { - val actor = actorOf(new Tester with Retain with Countdown[Message]) + val actor = actorOf[Tester1].start + val latch = actor.!![CountDownLatch](SetExpectedMessageCount(1)).get val endpoint = mockEndpoint("actor:uuid:%s" format actor.uuid) val exchange = endpoint.createExchange(ExchangePattern.InOnly) - actor.start exchange.getIn.setBody("Martin") exchange.getIn.setHeader("k1", "v1") endpoint.createProducer.process(exchange) - actor.actor.asInstanceOf[Countdown[Message]].waitFor - assert(actor.actor.asInstanceOf[Retain].body === "Martin") - assert(actor.actor.asInstanceOf[Retain].headers === Map(Message.MessageExchangeId -> exchange.getExchangeId, "k1" -> "v1")) + assert(latch.await(5000, TimeUnit.MILLISECONDS)) + val reply = (actor !! GetRetainedMessage).get.asInstanceOf[Message] + assert(reply.body === "Martin") + assert(reply.headers === Map(Message.MessageExchangeId -> exchange.getExchangeId, "k1" -> "v1")) } @Test def shouldSendMessageToActorAndReceiveResponse = { - val actor = actorOf(new Tester with Respond { + val actor = actorOf(new Tester2 { override def response(msg: Message) = Message(super.response(msg), Map("k2" -> "v2")) - }) + }).start val endpoint = mockEndpoint("actor:uuid:%s" format actor.uuid) val exchange = endpoint.createExchange(ExchangePattern.InOut) - actor.start exchange.getIn.setBody("Martin") exchange.getIn.setHeader("k1", "v1") endpoint.createProducer.process(exchange) @@ -45,12 +45,11 @@ class ActorProducerTest extends JUnitSuite with BeforeAndAfterAll { } @Test def shouldSendMessageToActorAndReceiveFailure = { - val actor = actorOf(new Tester with Respond { + val actor = actorOf(new Tester2 { override def response(msg: Message) = Failure(new Exception("testmsg"), Map("k3" -> "v3")) - }) + }).start val endpoint = mockEndpoint("actor:uuid:%s" format actor.uuid) val exchange = endpoint.createExchange(ExchangePattern.InOut) - actor.start exchange.getIn.setBody("Martin") exchange.getIn.setHeader("k1", "v1") endpoint.createProducer.process(exchange) @@ -60,12 +59,9 @@ class ActorProducerTest extends JUnitSuite with BeforeAndAfterAll { } @Test def shouldSendMessageToActorAndTimeout: Unit = { - val actor = actorOf(new Tester { - self.timeout = 1 - }) + val actor = actorOf[Tester3].start val endpoint = mockEndpoint("actor:uuid:%s" format actor.uuid) val exchange = endpoint.createExchange(ExchangePattern.InOut) - actor.start exchange.getIn.setBody("Martin") intercept[TimeoutException] { endpoint.createProducer.process(exchange) diff --git a/akka-camel/src/test/scala/service/PublishRequestorTest.scala b/akka-camel/src/test/scala/service/PublishRequestorTest.scala deleted file mode 100644 index a7f1685de1..0000000000 --- a/akka-camel/src/test/scala/service/PublishRequestorTest.scala +++ /dev/null @@ -1,55 +0,0 @@ -package se.scalablesolutions.akka.camel.service - -import _root_.org.junit.{Before, After, Test} -import org.scalatest.junit.JUnitSuite - -import se.scalablesolutions.akka.camel.Consumer -import se.scalablesolutions.akka.camel.support.{Receive, Countdown} -import se.scalablesolutions.akka.actor.{Actor, ActorRef, ActorRegistry, ActorRegistered, ActorUnregistered} -import Actor._ - -object PublishRequestorTest { - class PublisherMock extends Actor with Receive[ConsumerEvent] { - var received: ConsumerEvent = _ - protected def receive = { - case msg: ConsumerRegistered => onMessage(msg) - case msg: ConsumerUnregistered => onMessage(msg) - } - def onMessage(msg: ConsumerEvent) = received = msg - } -} - -class PublishRequestorTest extends JUnitSuite { - import PublishRequestorTest._ - - var publisher: ActorRef = _ - var requestor: ActorRef = _ - var consumer: ActorRef = _ - - @Before def setUp = { - publisher = actorOf(new PublisherMock with Countdown[ConsumerEvent]).start - requestor = actorOf(new PublishRequestor(publisher)).start - consumer = actorOf(new Actor with Consumer { - def endpointUri = "mock:test" - protected def receive = null - }).start - } - - @After def tearDown = { - ActorRegistry.shutdownAll - } - - @Test def shouldReceiveConsumerRegisteredEvent = { - requestor.!(ActorRegistered(consumer))(None) - publisher.actor.asInstanceOf[Countdown[ConsumerEvent]].waitFor - assert(publisher.actor.asInstanceOf[PublisherMock].received === - ConsumerRegistered(consumer.actor.getClass.getName, "mock:test", consumer.uuid, true)) - } - - @Test def shouldReceiveConsumerUnregisteredEvent = { - requestor.!(ActorUnregistered(consumer))(None) - publisher.actor.asInstanceOf[Countdown[ConsumerRegistered]].waitFor - assert(publisher.actor.asInstanceOf[PublisherMock].received === - ConsumerUnregistered(consumer.actor.getClass.getName, "mock:test", consumer.uuid, true)) - } -} diff --git a/akka-camel/src/test/scala/support/TestSupport.scala b/akka-camel/src/test/scala/support/TestSupport.scala index 98bd23d5ed..a4a0d177ab 100644 --- a/akka-camel/src/test/scala/support/TestSupport.scala +++ b/akka-camel/src/test/scala/support/TestSupport.scala @@ -5,45 +5,71 @@ import java.util.concurrent.{TimeUnit, CountDownLatch} import se.scalablesolutions.akka.camel.Message import se.scalablesolutions.akka.actor.Actor -trait Receive[T] { - def onMessage(msg: T): Unit +import TestSupport._ + +object TestSupport { + type Handler = PartialFunction[Any, Any] } -trait Respond extends Receive[Message] { this: Actor => - abstract override def onMessage(msg: Message): Unit = { - super.onMessage(msg) - this.self.reply(response(msg)) +trait TestActor extends Actor { + def receive = { + case msg => { + handler(msg) + } } + + def handler: Handler +} + +class Tester1 extends TestActor with Retain with Countdown { + def handler = retain andThen countdown +} + +class Tester2 extends TestActor with Respond { + def handler = respond +} + +class Tester3 extends TestActor with Noop { + self.timeout = 1 + def handler = noop +} + +trait Countdown { this: Actor => + var latch: CountDownLatch = new CountDownLatch(0) + def countdown: Handler = { + case SetExpectedMessageCount(num) => { + latch = new CountDownLatch(num) + self.reply(latch) + } + case msg => latch.countDown + } +} + +trait Respond { this: Actor => + def respond: Handler = { + case msg: Message => self.reply(response(msg)) + } + def response(msg: Message): Any = "Hello %s" format msg.body } -trait Retain extends Receive[Message] { - var body: Any = _ - var headers = Map.empty[String, Any] - abstract override def onMessage(msg: Message): Unit = { - super.onMessage(msg) - body = msg.body - headers = msg.headers +trait Retain { this: Actor => + var message: Any = _ + + def retain: Handler = { + case GetRetainedMessage => self.reply(message) + case msg => { + message = msg + msg + } } } -trait Countdown[T] extends Receive[T] { - val count = 1 - val duration = 5000 - val latch = new CountDownLatch(count) - - def waitFor = latch.await(duration, TimeUnit.MILLISECONDS) - def countDown = latch.countDown - - abstract override def onMessage(msg: T) = { - super.onMessage(msg) - countDown +trait Noop { this: Actor => + def noop: Handler = { + case msg => msg } } -class Tester extends Actor with Receive[Message] { - def receive = { - case msg: Message => onMessage(msg) - } - def onMessage(msg: Message): Unit = {} -} +case class SetExpectedMessageCount(num: Int) +case class GetRetainedMessage() diff --git a/akka-core/src/main/java/se/scalablesolutions/akka/annotation/consume.java b/akka-core/src/main/java/se/scalablesolutions/akka/annotation/consume.java index 11552609f5..1a8e5c8db6 100644 --- a/akka-core/src/main/java/se/scalablesolutions/akka/annotation/consume.java +++ b/akka-core/src/main/java/se/scalablesolutions/akka/annotation/consume.java @@ -10,7 +10,7 @@ import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; @Retention(RetentionPolicy.RUNTIME) -@Target(ElementType.TYPE) +@Target({ElementType.TYPE, ElementType.METHOD}) public @interface consume { public abstract String value(); diff --git a/akka-core/src/main/scala/actor/ActiveObject.scala b/akka-core/src/main/scala/actor/ActiveObject.scala index 0780e355dd..a2d2820cca 100644 --- a/akka-core/src/main/scala/actor/ActiveObject.scala +++ b/akka-core/src/main/scala/actor/ActiveObject.scala @@ -476,7 +476,7 @@ object ActiveObject extends Logging { } -private[akka] object AspectInitRegistry { +private[akka] object AspectInitRegistry extends ListenerManagement { private val initializations = new java.util.concurrent.ConcurrentHashMap[AnyRef, AspectInit] def initFor(target: AnyRef) = { @@ -485,9 +485,16 @@ private[akka] object AspectInitRegistry { init } - def register(target: AnyRef, init: AspectInit) = initializations.put(target, init) + def register(target: AnyRef, init: AspectInit) = { + val res = initializations.put(target, init) + foreachListener(_ ! AspectInitRegistered(target, init)) + res + } } +private[akka] sealed trait AspectInitRegistryEvent +private[akka] case class AspectInitRegistered(proxy: AnyRef, init: AspectInit) extends AspectInitRegistryEvent + private[akka] sealed case class AspectInit( val target: Class[_], val actorRef: ActorRef, diff --git a/akka-core/src/main/scala/actor/ActorRegistry.scala b/akka-core/src/main/scala/actor/ActorRegistry.scala index ee1b6e2b4f..b9827fdb9c 100644 --- a/akka-core/src/main/scala/actor/ActorRegistry.scala +++ b/akka-core/src/main/scala/actor/ActorRegistry.scala @@ -4,11 +4,10 @@ package se.scalablesolutions.akka.actor -import se.scalablesolutions.akka.util.Logging - import scala.collection.mutable.ListBuffer import scala.reflect.Manifest import java.util.concurrent.{CopyOnWriteArrayList, ConcurrentHashMap} +import se.scalablesolutions.akka.util.ListenerManagement sealed trait ActorRegistryEvent case class ActorRegistered(actor: ActorRef) extends ActorRegistryEvent @@ -26,11 +25,10 @@ case class ActorUnregistered(actor: ActorRef) extends ActorRegistryEvent * * @author Jonas Bonér */ -object ActorRegistry extends Logging { +object ActorRegistry extends ListenerManagement { private val actorsByUUID = new ConcurrentHashMap[String, ActorRef] private val actorsById = new ConcurrentHashMap[String, List[ActorRef]] private val actorsByClassName = new ConcurrentHashMap[String, List[ActorRef]] - private val registrationListeners = new CopyOnWriteArrayList[ActorRef] /** * Returns all actors in the system. @@ -141,29 +139,4 @@ object ActorRegistry extends Logging { actorsByClassName.clear log.info("All actors have been shut down and unregistered from ActorRegistry") } - - /** - * Adds the registration listener this this registry's listener list. - */ - def addRegistrationListener(listener: ActorRef) = { - listener.start - registrationListeners.add(listener) - } - - /** - * Removes the registration listener this this registry's listener list. - */ - def removeRegistrationListener(listener: ActorRef) = { - listener.stop - registrationListeners.remove(listener) - } - - private def foreachListener(f: (ActorRef) => Unit) { - val iterator = registrationListeners.iterator - while (iterator.hasNext) { - val listener = iterator.next - if (listener.isRunning) f(listener) - else log.warning("Can't send ActorRegistryEvent to [%s] since it is not running.", listener) - } - } } diff --git a/akka-core/src/main/scala/util/ListenerManagement.scala b/akka-core/src/main/scala/util/ListenerManagement.scala new file mode 100644 index 0000000000..bdee9d1c1c --- /dev/null +++ b/akka-core/src/main/scala/util/ListenerManagement.scala @@ -0,0 +1,49 @@ +/** + * Copyright (C) 2009-2010 Scalable Solutions AB + */ + +package se.scalablesolutions.akka.util + +import java.util.concurrent.CopyOnWriteArrayList + +import se.scalablesolutions.akka.actor.ActorRef + +/** + * A manager for listener actors. Intended for mixin by observables. + * + * @author Martin Krasser + */ +trait ListenerManagement extends Logging { + + private val listeners = new CopyOnWriteArrayList[ActorRef] + + /** + * Adds the listener this this registry's listener list. + * The listener is started by this method. + */ + def addListener(listener: ActorRef) = { + listener.start + listeners.add(listener) + } + + /** + * Removes the listener this this registry's listener list. + * The listener is stopped by this method. + */ + def removeListener(listener: ActorRef) = { + listener.stop + listeners.remove(listener) + } + + /** + * Execute f with each listener as argument. + */ + protected def foreachListener(f: (ActorRef) => Unit) { + val iterator = listeners.iterator + while (iterator.hasNext) { + val listener = iterator.next + if (listener.isRunning) f(listener) + else log.warning("Can't notify [%s] since it is not running.", listener) + } + } +} \ No newline at end of file diff --git a/akka-http/src/main/scala/Initializer.scala b/akka-http/src/main/scala/Initializer.scala index d47357c710..da95a39b77 100644 --- a/akka-http/src/main/scala/Initializer.scala +++ b/akka-http/src/main/scala/Initializer.scala @@ -6,7 +6,7 @@ package se.scalablesolutions.akka.servlet import se.scalablesolutions.akka.remote.BootableRemoteActorService import se.scalablesolutions.akka.actor.BootableActorLoaderService -import se.scalablesolutions.akka.camel.service.CamelService +import se.scalablesolutions.akka.camel.CamelService import se.scalablesolutions.akka.config.Config import se.scalablesolutions.akka.util.{Logging, Bootable} diff --git a/akka-kernel/src/main/scala/Kernel.scala b/akka-kernel/src/main/scala/Kernel.scala index db460062c8..4b5fa83574 100644 --- a/akka-kernel/src/main/scala/Kernel.scala +++ b/akka-kernel/src/main/scala/Kernel.scala @@ -7,7 +7,7 @@ package se.scalablesolutions.akka.kernel import se.scalablesolutions.akka.servlet.AkkaLoader import se.scalablesolutions.akka.remote.BootableRemoteActorService import se.scalablesolutions.akka.actor.BootableActorLoaderService -import se.scalablesolutions.akka.camel.service.CamelService +import se.scalablesolutions.akka.camel.CamelService import se.scalablesolutions.akka.config.Config object Main { diff --git a/akka-samples/akka-sample-camel/src/main/java/sample/camel/Consumer10.java b/akka-samples/akka-sample-camel/src/main/java/sample/camel/Consumer10.java new file mode 100644 index 0000000000..f08c486dac --- /dev/null +++ b/akka-samples/akka-sample-camel/src/main/java/sample/camel/Consumer10.java @@ -0,0 +1,24 @@ +package sample.camel; + +import org.apache.camel.Body; +import org.apache.camel.Header; + +import se.scalablesolutions.akka.actor.annotation.consume; + +/** + * @author Martin Krasser + */ +public class Consumer10 { + + @consume("file:data/input2") + public void foo(String body) { + System.out.println("Received message:"); + System.out.println(body); + } + + @consume("jetty:http://0.0.0.0:8877/camel/active") + public String bar(@Body String body, @Header("name") String header) { + return String.format("body=%s header=%s", body, header); + } + +} diff --git a/akka-samples/akka-sample-camel/src/main/java/sample/camel/RemoteActiveObject1.java b/akka-samples/akka-sample-camel/src/main/java/sample/camel/RemoteActiveObject1.java new file mode 100644 index 0000000000..695aa148f4 --- /dev/null +++ b/akka-samples/akka-sample-camel/src/main/java/sample/camel/RemoteActiveObject1.java @@ -0,0 +1,18 @@ +package sample.camel; + +import org.apache.camel.Body; +import org.apache.camel.Header; + +import se.scalablesolutions.akka.actor.annotation.consume; + +/** + * @author Martin Krasser + */ +public class RemoteActiveObject1 { + + @consume("jetty:http://localhost:6644/remote-active-object-1") + public String foo(@Body String body, @Header("name") String header) { + return String.format("remote1: body=%s header=%s", body, header); + } + +} diff --git a/akka-samples/akka-sample-camel/src/main/java/sample/camel/RemoteActiveObject2.java b/akka-samples/akka-sample-camel/src/main/java/sample/camel/RemoteActiveObject2.java new file mode 100644 index 0000000000..210a72d2f8 --- /dev/null +++ b/akka-samples/akka-sample-camel/src/main/java/sample/camel/RemoteActiveObject2.java @@ -0,0 +1,17 @@ +package sample.camel; + +import org.apache.camel.Body; +import org.apache.camel.Header; +import se.scalablesolutions.akka.actor.annotation.consume; + +/** + * @author Martin Krasser + */ +public class RemoteActiveObject2 { + + @consume("jetty:http://localhost:6644/remote-active-object-2") + public String foo(@Body String body, @Header("name") String header) { + return String.format("remote2: body=%s header=%s", body, header); + } + +} \ No newline at end of file diff --git a/akka-samples/akka-sample-camel/src/main/scala/Actors.scala b/akka-samples/akka-sample-camel/src/main/scala/Actors.scala index 93aaa08ff6..25f7986730 100644 --- a/akka-samples/akka-sample-camel/src/main/scala/Actors.scala +++ b/akka-samples/akka-sample-camel/src/main/scala/Actors.scala @@ -9,10 +9,10 @@ import se.scalablesolutions.akka.util.Logging * Client-initiated remote actor. */ class RemoteActor1 extends RemoteActor("localhost", 7777) with Consumer { - def endpointUri = "jetty:http://localhost:6644/remote1" + def endpointUri = "jetty:http://localhost:6644/remote-actor-1" protected def receive = { - case msg: Message => self.reply(Message("hello %s" format msg.body, Map("sender" -> "remote1"))) + case msg: Message => self.reply(Message("hello %s" format msg.bodyAs[String], Map("sender" -> "remote1"))) } } @@ -20,10 +20,10 @@ class RemoteActor1 extends RemoteActor("localhost", 7777) with Consumer { * Server-initiated remote actor. */ class RemoteActor2 extends Actor with Consumer { - def endpointUri = "jetty:http://localhost:6644/remote2" + def endpointUri = "jetty:http://localhost:6644/remote-actor-2" protected def receive = { - case msg: Message => self.reply(Message("hello %s" format msg.body, Map("sender" -> "remote2"))) + case msg: Message => self.reply(Message("hello %s" format msg.bodyAs[String], Map("sender" -> "remote2"))) } } @@ -37,7 +37,7 @@ class Producer1 extends Actor with Producer { } class Consumer1 extends Actor with Consumer with Logging { - def endpointUri = "file:data/input" + def endpointUri = "file:data/input1" def receive = { case msg: Message => log.info("received %s" format msg.bodyAs[String]) diff --git a/akka-samples/akka-sample-camel/src/main/scala/Application1.scala b/akka-samples/akka-sample-camel/src/main/scala/Application1.scala index 6dcb437992..2b3c7b5db5 100644 --- a/akka-samples/akka-sample-camel/src/main/scala/Application1.scala +++ b/akka-samples/akka-sample-camel/src/main/scala/Application1.scala @@ -1,9 +1,9 @@ package sample.camel -import se.scalablesolutions.akka.actor.{Actor, ActorRef} import se.scalablesolutions.akka.actor.Actor._ import se.scalablesolutions.akka.camel.Message import se.scalablesolutions.akka.remote.RemoteClient +import se.scalablesolutions.akka.actor.{ActiveObject, Actor, ActorRef} /** * @author Martin Krasser @@ -15,15 +15,19 @@ object Application1 { // def main(args: Array[String]) { - implicit val sender: Option[ActorRef] = None - val actor1 = actorOf[RemoteActor1] val actor2 = RemoteClient.actorFor("remote2", "localhost", 7777) + val actobj1 = ActiveObject.newRemoteInstance(classOf[RemoteActiveObject1], "localhost", 7777) + //val actobj2 = TODO: create reference to server-managed active object (RemoteActiveObject2) + actor1.start - println(actor1 !! Message("actor1")) - println(actor2 !! Message("actor2")) + println(actor1 !! Message("actor1")) // activates and publishes actor remotely + println(actor2 !! Message("actor2")) // actor already activated and published remotely + + println(actobj1.foo("x", "y")) // activates and publishes active object methods remotely + // ... } } diff --git a/akka-samples/akka-sample-camel/src/main/scala/Application2.scala b/akka-samples/akka-sample-camel/src/main/scala/Application2.scala index 8a789d13bf..411f7b96b4 100644 --- a/akka-samples/akka-sample-camel/src/main/scala/Application2.scala +++ b/akka-samples/akka-sample-camel/src/main/scala/Application2.scala @@ -1,6 +1,6 @@ package sample.camel -import se.scalablesolutions.akka.camel.service.CamelService +import se.scalablesolutions.akka.camel.CamelService import se.scalablesolutions.akka.remote.RemoteNode import se.scalablesolutions.akka.actor.Actor._ diff --git a/akka-samples/akka-sample-camel/src/main/scala/Boot.scala b/akka-samples/akka-sample-camel/src/main/scala/Boot.scala index 206661c8fd..24e0a41605 100644 --- a/akka-samples/akka-sample-camel/src/main/scala/Boot.scala +++ b/akka-samples/akka-sample-camel/src/main/scala/Boot.scala @@ -6,8 +6,8 @@ import org.apache.camel.impl.DefaultCamelContext import org.apache.camel.spring.spi.ApplicationContextRegistry import org.springframework.context.support.ClassPathXmlApplicationContext -import se.scalablesolutions.akka.actor.SupervisorFactory import se.scalablesolutions.akka.actor.Actor._ +import se.scalablesolutions.akka.actor.{ActiveObject, SupervisorFactory} import se.scalablesolutions.akka.camel.CamelContextManager import se.scalablesolutions.akka.config.ScalaConfig._ @@ -62,6 +62,10 @@ class Boot { actorOf[Consumer4].start // POSTing "stop" to http://0.0.0.0:8877/camel/stop stops and unpublishes this actor actorOf[Consumer5].start // POSTing any msg to http://0.0.0.0:8877/camel/start starts and published Consumer4 again. + + // Active object example + + ActiveObject.newInstance(classOf[Consumer10]) } class CustomRouteBuilder extends RouteBuilder {