+ */
+
+package se.scalablesolutions.akka.camel
+
+import se.scalablesolutions.akka.actor.Actor._
+import se.scalablesolutions.akka.actor.{AspectInitRegistry, ActorRegistry}
+import se.scalablesolutions.akka.util.{Bootable, Logging}
+
+/**
+ * Used by applications (and the Kernel) to publish consumer actors and active objects via
+ * Camel endpoints and to manage the life cycle of a a global CamelContext which can be
+ * accessed via se.scalablesolutions.akka.camel.CamelContextManager.context.
+ *
+ * @author Martin Krasser
+ */
+trait CamelService extends Bootable with Logging {
+
+ import CamelContextManager._
+
+ private[camel] val consumerPublisher = actorOf[ConsumerPublisher]
+ private[camel] val publishRequestor = actorOf[PublishRequestor]
+
+ // add listener for actor registration events
+ ActorRegistry.addListener(publishRequestor)
+
+ // add listener for AspectInit registration events
+ AspectInitRegistry.addListener(publishRequestor)
+
+ /**
+ * Starts the CamelService. Any started actor that is a consumer actor will be (asynchronously)
+ * published as Camel endpoint. Consumer actors that are started after this method returned will
+ * be published as well. Actor publishing is done asynchronously. A started (loaded) CamelService
+ * also publishes @consume annotated methods of active objects that have been created
+ * with ActiveObject.newInstance(..) (and ActiveObject.newInstance(..)
+ * on a remote node).
+ */
+ abstract override def onLoad = {
+ super.onLoad
+
+ // Only init and start if not already done by application
+ if (!initialized) init
+ if (!started) start
+
+ // start actor that exposes consumer actors and active objects via Camel endpoints
+ consumerPublisher.start
+
+ // init publishRequestor so that buffered and future events are delivered to consumerPublisher
+ publishRequestor ! PublishRequestorInit(consumerPublisher)
+ }
+
+ /**
+ * Stops the CamelService.
+ */
+ abstract override def onUnload = {
+ ActorRegistry.removeListener(publishRequestor)
+ AspectInitRegistry.removeListener(publishRequestor)
+ consumerPublisher.stop
+ stop
+ super.onUnload
+ }
+
+ /**
+ * Starts the CamelService.
+ *
+ * @see onLoad
+ */
+ def load = onLoad
+
+ /**
+ * Stops the CamelService.
+ *
+ * @see onUnload
+ */
+ def unload = onUnload
+}
+
+/**
+ * CamelService companion object used by standalone applications to create their own
+ * CamelService instance.
+ *
+ * @author Martin Krasser
+ */
+object CamelService {
+
+ /**
+ * Creates a new CamelService instance.
+ */
+ def newInstance: CamelService = new DefaultCamelService
+}
+
+/**
+ * Default CamelService implementation to be created in Java applications with
+ *
+ * CamelService service = new DefaultCamelService()
+ *
+ */
+class DefaultCamelService extends CamelService {
+}
\ No newline at end of file
diff --git a/akka-camel/src/main/scala/ConsumerPublisher.scala b/akka-camel/src/main/scala/ConsumerPublisher.scala
new file mode 100644
index 0000000000..9df41a7d92
--- /dev/null
+++ b/akka-camel/src/main/scala/ConsumerPublisher.scala
@@ -0,0 +1,322 @@
+/**
+ * Copyright (C) 2009-2010 Scalable Solutions AB
+ */
+package se.scalablesolutions.akka.camel
+
+import collection.mutable.ListBuffer
+
+import java.io.InputStream
+import java.lang.reflect.Method
+import java.util.concurrent.CountDownLatch
+
+import org.apache.camel.builder.RouteBuilder
+
+import se.scalablesolutions.akka.actor._
+import se.scalablesolutions.akka.actor.annotation.consume
+import se.scalablesolutions.akka.camel.component.ActiveObjectComponent
+import se.scalablesolutions.akka.util.Logging
+
+/**
+ * @author Martin Krasser
+ */
+private[camel] object ConsumerPublisher extends Logging {
+ /**
+ * Creates a route to the registered consumer actor.
+ */
+ def handleConsumerRegistered(event: ConsumerRegistered) {
+ CamelContextManager.context.addRoutes(new ConsumerActorRoute(event.uri, event.id, event.uuid))
+ log.info("published actor %s at endpoint %s" format (event.actorRef, event.uri))
+ }
+
+ /**
+ * Stops route to the already un-registered consumer actor.
+ */
+ def handleConsumerUnregistered(event: ConsumerUnregistered) {
+ CamelContextManager.context.stopRoute(event.id)
+ log.info("unpublished actor %s from endpoint %s" format (event.actorRef, event.uri))
+ }
+
+ /**
+ * Creates a route to an active object method.
+ */
+ def handleConsumerMethodRegistered(event: ConsumerMethodRegistered) {
+ val targetMethod = event.method.getName
+ val objectId = "%s_%s" format (event.init.actorRef.uuid, targetMethod)
+
+ CamelContextManager.activeObjectRegistry.put(objectId, event.activeObject)
+ CamelContextManager.context.addRoutes(new ConsumerMethodRoute(event.uri, objectId, targetMethod))
+ log.info("published method %s of %s at endpoint %s" format (targetMethod, event.activeObject, event.uri))
+ }
+}
+
+/**
+ * Actor that publishes consumer actors and active object methods at Camel endpoints.
+ * The Camel context used for publishing is CamelContextManager.context. This actor
+ * accepts messages of type
+ * se.scalablesolutions.akka.camel.service.ConsumerRegistered,
+ * se.scalablesolutions.akka.camel.service.ConsumerMethodRegistered and
+ * se.scalablesolutions.akka.camel.service.ConsumerUnregistered.
+ *
+ * @author Martin Krasser
+ */
+private[camel] class ConsumerPublisher extends Actor {
+ import ConsumerPublisher._
+
+ @volatile private var latch = new CountDownLatch(0)
+
+ /**
+ * Adds a route to the actor identified by a Publish message to the global CamelContext.
+ */
+ protected def receive = {
+ case r: ConsumerRegistered => {
+ handleConsumerRegistered(r)
+ latch.countDown // needed for testing only.
+ }
+ case u: ConsumerUnregistered => {
+ handleConsumerUnregistered(u)
+ latch.countDown // needed for testing only.
+ }
+ case d: ConsumerMethodRegistered => {
+ handleConsumerMethodRegistered(d)
+ latch.countDown // needed for testing only.
+ }
+ case SetExpectedMessageCount(num) => {
+ // needed for testing only.
+ latch = new CountDownLatch(num)
+ self.reply(latch)
+ }
+ case _ => { /* ignore */}
+ }
+}
+
+/**
+ * Command message used For testing-purposes only.
+ */
+private[camel] case class SetExpectedMessageCount(num: Int)
+
+
+/**
+ * Defines an abstract route to a target which is either an actor or an active object method..
+ *
+ * @param endpointUri endpoint URI of the consumer actor or active object method.
+ * @param id actor identifier or active object identifier (registry key).
+ *
+ * @author Martin Krasser
+ */
+private[camel] abstract class ConsumerRoute(endpointUri: String, id: String) extends RouteBuilder {
+ // TODO: make conversions configurable
+ private val bodyConversions = Map(
+ "file" -> classOf[InputStream]
+ )
+
+ def configure = {
+ val schema = endpointUri take endpointUri.indexOf(":") // e.g. "http" from "http://whatever/..."
+ bodyConversions.get(schema) match {
+ case Some(clazz) => from(endpointUri).routeId(id).convertBodyTo(clazz).to(targetUri)
+ case None => from(endpointUri).routeId(id).to(targetUri)
+ }
+ }
+
+ protected def targetUri: String
+}
+
+/**
+ * Defines the route to a consumer actor.
+ *
+ * @param endpointUri endpoint URI of the consumer actor
+ * @param id actor identifier
+ * @param uuid true if id refers to Actor.uuid, false if
+ * id refers to Actor.getId.
+ *
+ * @author Martin Krasser
+ */
+private[camel] class ConsumerActorRoute(endpointUri: String, id: String, uuid: Boolean) extends ConsumerRoute(endpointUri, id) {
+ protected override def targetUri = (if (uuid) "actor:uuid:%s" else "actor:id:%s") format id
+}
+
+/**
+ * Defines the route to an active object method..
+ *
+ * @param endpointUri endpoint URI of the consumer actor method
+ * @param id active object identifier
+ * @param method name of the method to invoke.
+ *
+ * @author Martin Krasser
+ */
+private[camel] class ConsumerMethodRoute(val endpointUri: String, id: String, method: String) extends ConsumerRoute(endpointUri, id) {
+ protected override def targetUri = "%s:%s?method=%s" format (ActiveObjectComponent.InternalSchema, id, method)
+}
+
+/**
+ * A registration listener that triggers publication of consumer actors and active object
+ * methods as well as un-publication of consumer actors. This actor needs to be initialized
+ * with a PublishRequestorInit command message for obtaining a reference to
+ * a publisher actor. Before initialization it buffers all outbound messages
+ * and delivers them to the publisher when receiving a
+ * PublishRequestorInit message. After initialization, outbound messages are
+ * delivered directly without buffering.
+ *
+ * @see PublishRequestorInit
+ *
+ * @author Martin Krasser
+ */
+private[camel] class PublishRequestor extends Actor {
+ private val events = ListBuffer[ConsumerEvent]()
+ private var publisher: Option[ActorRef] = None
+
+ protected def receive = {
+ case ActorRegistered(actor) =>
+ for (event <- ConsumerRegistered.forConsumer(actor)) deliverCurrentEvent(event)
+ case ActorUnregistered(actor) =>
+ for (event <- ConsumerUnregistered.forConsumer(actor)) deliverCurrentEvent(event)
+ case AspectInitRegistered(proxy, init) =>
+ for (event <- ConsumerMethodRegistered.forConsumer(proxy, init)) deliverCurrentEvent(event)
+ case PublishRequestorInit(pub) => {
+ publisher = Some(pub)
+ deliverBufferedEvents
+ }
+ case _ => { /* ignore */ }
+ }
+
+ private def deliverCurrentEvent(event: ConsumerEvent) = {
+ publisher match {
+ case Some(pub) => pub ! event
+ case None => events += event
+ }
+ }
+
+ private def deliverBufferedEvents = {
+ for (event <- events) deliverCurrentEvent(event)
+ events.clear
+ }
+}
+
+/**
+ * Command message to initialize a PublishRequestor to use consumerPublisher
+ * for publishing actors or active object methods.
+ */
+private[camel] case class PublishRequestorInit(consumerPublisher: ActorRef)
+
+/**
+ * A consumer (un)registration event.
+ *
+ * @author Martin Krasser
+ */
+private[camel] sealed trait ConsumerEvent
+
+/**
+ * Event indicating that a consumer actor has been registered at the actor registry.
+ *
+ * @param actorRef actor reference
+ * @param uri endpoint URI of the consumer actor
+ * @param id actor identifier
+ * @param uuid true if id is the actor's uuid, false if
+ * id is the actor's id.
+ *
+ * @author Martin Krasser
+ */
+private[camel] case class ConsumerRegistered(actorRef: ActorRef, uri: String, id: String, uuid: Boolean) extends ConsumerEvent
+
+/**
+ * Event indicating that a consumer actor has been unregistered from the actor registry.
+ *
+ * @param actorRef actor reference
+ * @param uri endpoint URI of the consumer actor
+ * @param id actor identifier
+ * @param uuid true if id is the actor's uuid, false if
+ * id is the actor's id.
+ *
+ * @author Martin Krasser
+ */
+private[camel] case class ConsumerUnregistered(actorRef: ActorRef, uri: String, id: String, uuid: Boolean) extends ConsumerEvent
+
+/**
+ * Event indicating that an active object proxy has been created for a POJO. For each
+ * @consume annotated POJO method a separate instance of this class is
+ * created.
+ *
+ * @param activeObject active object (proxy).
+ * @param init
+ * @param uri endpoint URI of the active object method
+ * @param method method to be published.
+ *
+ * @author Martin Krasser
+ */
+private[camel] case class ConsumerMethodRegistered(activeObject: AnyRef, init: AspectInit, uri: String, method: Method) extends ConsumerEvent
+
+/**
+ * @author Martin Krasser
+ */
+private[camel] object ConsumerRegistered {
+ /**
+ * Optionally creates an ConsumerRegistered event message for a consumer actor or None if
+ * actorRef is not a consumer actor.
+ */
+ def forConsumer(actorRef: ActorRef): Option[ConsumerRegistered] = actorRef match {
+ case ConsumerDescriptor(ref, uri, id, uuid) => Some(ConsumerRegistered(ref, uri, id, uuid))
+ case _ => None
+ }
+}
+
+/**
+ * @author Martin Krasser
+ */
+private[camel] object ConsumerUnregistered {
+ /**
+ * Optionally creates an ConsumerUnregistered event message for a consumer actor or None if
+ * actorRef is not a consumer actor.
+ */
+ def forConsumer(actorRef: ActorRef): Option[ConsumerUnregistered] = actorRef match {
+ case ConsumerDescriptor(ref, uri, id, uuid) => Some(ConsumerUnregistered(ref, uri, id, uuid))
+ case _ => None
+ }
+}
+
+/**
+ * @author Martin Krasser
+ */
+private[camel] object ConsumerMethodRegistered {
+ /**
+ * Creates a list of ConsumerMethodRegistered event messages for an active object or an empty
+ * list if the active object is a proxy for an remote active object or the active object doesn't
+ * have any @consume annotated methods.
+ */
+ def forConsumer(activeObject: AnyRef, init: AspectInit): List[ConsumerMethodRegistered] = {
+ // TODO: support consumer annotation inheritance
+ // - visit overridden methods in superclasses
+ // - visit implemented method declarations in interfaces
+ if (init.remoteAddress.isDefined) Nil // let remote node publish active object methods on endpoints
+ else for (m <- activeObject.getClass.getMethods.toList; if (m.isAnnotationPresent(classOf[consume])))
+ yield ConsumerMethodRegistered(activeObject, init, m.getAnnotation(classOf[consume]).value, m)
+ }
+}
+
+/**
+ * Describes a consumer actor with elements that are relevant for publishing an actor at a
+ * Camel endpoint (or unpublishing an actor from an endpoint).
+ *
+ * @author Martin Krasser
+ */
+private[camel] object ConsumerDescriptor {
+
+ /**
+ * An extractor that optionally creates a 4-tuple from a consumer actor reference containing
+ * the actor reference itself, endpoint URI, identifier and a hint whether the identifier
+ * is the actor uuid or actor id. If actorRef doesn't reference a consumer actor,
+ * None is returned.
+ */
+ def unapply(actorRef: ActorRef): Option[(ActorRef, String, String, Boolean)] =
+ unapplyConsumerInstance(actorRef) orElse unapplyConsumeAnnotated(actorRef)
+
+ private def unapplyConsumeAnnotated(actorRef: ActorRef): Option[(ActorRef, String, String, Boolean)] = {
+ val annotation = actorRef.actorClass.getAnnotation(classOf[consume])
+ if (annotation eq null) None
+ else if (actorRef.remoteAddress.isDefined) None
+ else Some((actorRef, annotation.value, actorRef.id, false))
+ }
+
+ private def unapplyConsumerInstance(actorRef: ActorRef): Option[(ActorRef, String, String, Boolean)] =
+ if (!actorRef.actor.isInstanceOf[Consumer]) None
+ else if (actorRef.remoteAddress.isDefined) None
+ else Some((actorRef, actorRef.actor.asInstanceOf[Consumer].endpointUri, actorRef.uuid, true))
+}
diff --git a/akka-camel/src/main/scala/Producer.scala b/akka-camel/src/main/scala/Producer.scala
index 634cf60b3d..7aca16f956 100644
--- a/akka-camel/src/main/scala/Producer.scala
+++ b/akka-camel/src/main/scala/Producer.scala
@@ -160,7 +160,7 @@ trait Producer { this: Actor =>
*
* @author Martin Krasser
*/
-class ProducerResponseSender(
+private[camel] class ProducerResponseSender(
headers: Map[String, Any],
sender: Option[ActorRef],
senderFuture: Option[CompletableFuture[Any]],
diff --git a/akka-camel/src/main/scala/component/ActiveObjectComponent.scala b/akka-camel/src/main/scala/component/ActiveObjectComponent.scala
new file mode 100644
index 0000000000..d65c7e6528
--- /dev/null
+++ b/akka-camel/src/main/scala/component/ActiveObjectComponent.scala
@@ -0,0 +1,108 @@
+/**
+ * Copyright (C) 2009-2010 Scalable Solutions AB
+ */
+
+package se.scalablesolutions.akka.camel.component
+
+import java.util.Map
+import java.util.concurrent.ConcurrentHashMap
+import org.apache.camel.CamelContext
+import org.apache.camel.component.bean._
+
+/**
+ * @author Martin Krasser
+ */
+object ActiveObjectComponent {
+ /**
+ * Default schema name for active object endpoint URIs.
+ */
+ val InternalSchema = "active-object-internal"
+}
+
+/**
+ * Camel component for exchanging messages with active objects. This component
+ * tries to obtain the active object from the activeObjectRegistry
+ * first. If it's not there it tries to obtain it from the CamelContext's registry.
+ *
+ * @see org.apache.camel.component.bean.BeanComponent
+ *
+ * @author Martin Krasser
+ */
+class ActiveObjectComponent extends BeanComponent {
+ val activeObjectRegistry = new ConcurrentHashMap[String, AnyRef]
+
+ /**
+ * Creates a {@link org.apache.camel.component.bean.BeanEndpoint} with a custom
+ * bean holder that uses activeObjectRegistry for getting access to
+ * active objects (beans).
+ *
+ * @see se.scalablesolutions.akka.camel.component.ActiveObjectHolder
+ */
+ override def createEndpoint(uri: String, remaining: String, parameters: Map[String, AnyRef]) = {
+ val endpoint = new BeanEndpoint(uri, this)
+ endpoint.setBeanName(remaining)
+ endpoint.setBeanHolder(createBeanHolder(remaining))
+ setProperties(endpoint.getProcessor, parameters)
+ endpoint
+ }
+
+ private def createBeanHolder(beanName: String) =
+ new ActiveObjectHolder(activeObjectRegistry, getCamelContext, beanName).createCacheHolder
+}
+
+/**
+ * {@link org.apache.camel.component.bean.BeanHolder} implementation that uses a custom
+ * registry for getting access to active objects.
+ *
+ * @author Martin Krasser
+ */
+class ActiveObjectHolder(activeObjectRegistry: Map[String, AnyRef], context: CamelContext, name: String)
+ extends RegistryBean(context, name) {
+
+ /**
+ * Returns an {@link se.scalablesolutions.akka.camel.component.ActiveObjectInfo} instance.
+ */
+ override def getBeanInfo: BeanInfo =
+ new ActiveObjectInfo(getContext, getBean.getClass, getParameterMappingStrategy)
+
+ /**
+ * Obtains an active object from activeObjectRegistry.
+ */
+ override def getBean: AnyRef = {
+ val bean = activeObjectRegistry.get(getName)
+ if (bean eq null) super.getBean else bean
+ }
+}
+
+/**
+ * Provides active object meta information.
+ *
+ * @author Martin Krasser
+ */
+class ActiveObjectInfo(context: CamelContext, clazz: Class[_], strategy: ParameterMappingStrategy)
+ extends BeanInfo(context, clazz, strategy) {
+
+ /**
+ * Introspects AspectWerkz proxy classes.
+ *
+ * @param clazz AspectWerkz proxy class.
+ */
+ protected override def introspect(clazz: Class[_]): Unit = {
+
+ // TODO: fix target class detection in BeanInfo.introspect(Class)
+ // Camel assumes that classes containing a '$$' in the class name
+ // are classes generated with CGLIB. This conflicts with proxies
+ // created from interfaces with AspectWerkz. Once the fix is in
+ // place this method can be removed.
+
+ for (method <- clazz.getDeclaredMethods) {
+ if (isValidMethod(clazz, method)) {
+ introspect(clazz, method)
+ }
+ }
+ val superclass = clazz.getSuperclass
+ if (superclass != null && !superclass.equals(classOf[AnyRef])) {
+ introspect(superclass)
+ }
+ }
+}
\ No newline at end of file
diff --git a/akka-camel/src/main/scala/service/CamelService.scala b/akka-camel/src/main/scala/service/CamelService.scala
deleted file mode 100644
index 80fa603897..0000000000
--- a/akka-camel/src/main/scala/service/CamelService.scala
+++ /dev/null
@@ -1,86 +0,0 @@
-/**
- * Copyright (C) 2009-2010 Scalable Solutions AB
- */
-
-package se.scalablesolutions.akka.camel.service
-
-import se.scalablesolutions.akka.actor.ActorRegistry
-import se.scalablesolutions.akka.actor.Actor._
-import se.scalablesolutions.akka.camel.CamelContextManager
-import se.scalablesolutions.akka.util.{Bootable, Logging}
-
-/**
- * Used by applications (and the Kernel) to publish consumer actors via Camel
- * endpoints and to manage the life cycle of a a global CamelContext which can
- * be accessed via se.scalablesolutions.akka.camel.CamelContextManager.
- *
- * @author Martin Krasser
- */
-trait CamelService extends Bootable with Logging {
-
- import CamelContextManager._
-
- private[camel] val consumerPublisher = actorOf[ConsumerPublisher]
- private[camel] val publishRequestor = actorOf(new PublishRequestor(consumerPublisher))
-
- /**
- * Starts the CamelService. Any started actor that is a consumer actor will be (asynchronously)
- * published as Camel endpoint. Consumer actors that are started after this method returned will
- * be published as well. Actor publishing is done asynchronously.
- */
- abstract override def onLoad = {
- super.onLoad
-
- // Only init and start if not already done by application
- if (!initialized) init
- if (!started) start
-
- // start actor that exposes consumer actors via Camel endpoints
- consumerPublisher.start
-
- // add listener for actor registration events
- ActorRegistry.addRegistrationListener(publishRequestor.start)
-
- // publish already registered consumer actors
- for (actor <- ActorRegistry.actors; event <- ConsumerRegistered.forConsumer(actor)) consumerPublisher ! event
- }
-
- /**
- * Stops the CamelService.
- */
- abstract override def onUnload = {
- ActorRegistry.removeRegistrationListener(publishRequestor)
- publishRequestor.stop
- consumerPublisher.stop
- stop
- super.onUnload
- }
-
- /**
- * Starts the CamelService.
- *
- * @see onLoad
- */
- def load = onLoad
-
- /**
- * Stops the CamelService.
- *
- * @see onUnload
- */
- def unload = onUnload
-}
-
-/**
- * CamelService companion object used by standalone applications to create their own
- * CamelService instance.
- *
- * @author Martin Krasser
- */
-object CamelService {
-
- /**
- * Creates a new CamelService instance.
- */
- def newInstance: CamelService = new CamelService {}
-}
diff --git a/akka-camel/src/main/scala/service/ConsumerPublisher.scala b/akka-camel/src/main/scala/service/ConsumerPublisher.scala
deleted file mode 100644
index 167f3acaa1..0000000000
--- a/akka-camel/src/main/scala/service/ConsumerPublisher.scala
+++ /dev/null
@@ -1,210 +0,0 @@
-/**
- * Copyright (C) 2009-2010 Scalable Solutions AB
- */
-package se.scalablesolutions.akka.camel.service
-
-import java.io.InputStream
-import java.util.concurrent.CountDownLatch
-
-import org.apache.camel.builder.RouteBuilder
-
-import se.scalablesolutions.akka.actor.{ActorUnregistered, ActorRegistered, Actor, ActorRef}
-import se.scalablesolutions.akka.actor.annotation.consume
-import se.scalablesolutions.akka.camel.{Consumer, CamelContextManager}
-import se.scalablesolutions.akka.util.Logging
-
-/**
- * Actor that publishes consumer actors as Camel endpoints at the CamelContext managed
- * by se.scalablesolutions.akka.camel.CamelContextManager. It accepts messages of type
- * se.scalablesolutions.akka.camel.service.ConsumerRegistered and
- * se.scalablesolutions.akka.camel.service.ConsumerUnregistered.
- *
- * @author Martin Krasser
- */
-class ConsumerPublisher extends Actor with Logging {
- @volatile private var publishLatch = new CountDownLatch(0)
- @volatile private var unpublishLatch = new CountDownLatch(0)
-
- /**
- * Adds a route to the actor identified by a Publish message to the global CamelContext.
- */
- protected def receive = {
- case r: ConsumerRegistered => {
- handleConsumerRegistered(r)
- publishLatch.countDown // needed for testing only.
- }
- case u: ConsumerUnregistered => {
- handleConsumerUnregistered(u)
- unpublishLatch.countDown // needed for testing only.
- }
- case _ => { /* ignore */}
- }
-
- /**
- * Sets the expected number of actors to be published. Used for testing only.
- */
- private[camel] def expectPublishCount(count: Int): Unit =
- publishLatch = new CountDownLatch(count)
-
- /**
- * Sets the expected number of actors to be unpublished. Used for testing only.
- */
- private[camel] def expectUnpublishCount(count: Int): Unit =
- unpublishLatch = new CountDownLatch(count)
-
- /**
- * Waits for the expected number of actors to be published. Used for testing only.
- */
- private[camel] def awaitPublish = publishLatch.await
-
- /**
- * Waits for the expected number of actors to be unpublished. Used for testing only.
- */
- private[camel] def awaitUnpublish = unpublishLatch.await
-
- /**
- * Creates a route to the registered consumer actor.
- */
- private def handleConsumerRegistered(event: ConsumerRegistered) {
- CamelContextManager.context.addRoutes(new ConsumerRoute(event.uri, event.id, event.uuid))
- log.info("published actor %s (%s) at endpoint %s" format (event.clazz, event.id, event.uri))
- }
-
- /**
- * Stops route to the already un-registered consumer actor.
- */
- private def handleConsumerUnregistered(event: ConsumerUnregistered) {
- CamelContextManager.context.stopRoute(event.id)
- log.info("unpublished actor %s (%s) from endpoint %s" format (event.clazz, event.id, event.uri))
- }
-}
-
-/**
- * Defines the route to a consumer actor.
- *
- * @param endpointUri endpoint URI of the consumer actor
- * @param id actor identifier
- * @param uuid true if id refers to Actor.uuid, false if
- * id refers to Actor.getId.
- *
- * @author Martin Krasser
- */
-class ConsumerRoute(val endpointUri: String, id: String, uuid: Boolean) extends RouteBuilder {
- // TODO: make conversions configurable
- private val bodyConversions = Map(
- "file" -> classOf[InputStream]
- )
-
- def configure = {
- val schema = endpointUri take endpointUri.indexOf(":") // e.g. "http" from "http://whatever/..."
- bodyConversions.get(schema) match {
- case Some(clazz) => from(endpointUri).routeId(id).convertBodyTo(clazz).to(actorUri)
- case None => from(endpointUri).routeId(id).to(actorUri)
- }
- }
-
- private def actorUri = (if (uuid) "actor:uuid:%s" else "actor:id:%s") format id
-}
-
-/**
- * A registration listener that triggers publication and un-publication of consumer actors.
- *
- * @author Martin Krasser
- */
-class PublishRequestor(consumerPublisher: ActorRef) extends Actor {
- protected def receive = {
- case ActorRegistered(actor) => for (event <- ConsumerRegistered.forConsumer(actor)) consumerPublisher ! event
- case ActorUnregistered(actor) => for (event <- ConsumerUnregistered.forConsumer(actor)) consumerPublisher ! event
- }
-}
-
-/**
- * Consumer actor lifecycle event.
- *
- * @author Martin Krasser
- */
-sealed trait ConsumerEvent
-
-/**
- * Event indicating that a consumer actor has been registered at the actor registry.
- *
- * @param clazz clazz name of the referenced actor
- * @param uri endpoint URI of the consumer actor
- * @param id actor identifier
- * @param uuid true if id is the actor's uuid, false if
- * id is the actor's id.
- *
- * @author Martin Krasser
- */
-case class ConsumerRegistered(clazz: String, uri: String, id: String, uuid: Boolean) extends ConsumerEvent
-
-/**
- * Event indicating that a consumer actor has been unregistered from the actor registry.
- *
- * @param clazz clazz name of the referenced actor
- * @param uri endpoint URI of the consumer actor
- * @param id actor identifier
- * @param uuid true if id is the actor's uuid, false if
- * id is the actor's id.
- *
- * @author Martin Krasser
- */
-case class ConsumerUnregistered(clazz: String, uri: String, id: String, uuid: Boolean) extends ConsumerEvent
-
-/**
- * @author Martin Krasser
- */
-private[camel] object ConsumerRegistered {
- /**
- * Optionally creates an ConsumerRegistered event message for a consumer actor or None if
- * actorRef is not a consumer actor.
- */
- def forConsumer(actorRef: ActorRef): Option[ConsumerRegistered] = actorRef match {
- case ConsumerDescriptor(clazz, uri, id, uuid) => Some(ConsumerRegistered(clazz, uri, id, uuid))
- case _ => None
- }
-}
-
-/**
- * @author Martin Krasser
- */
-private[camel] object ConsumerUnregistered {
- /**
- * Optionally creates an ConsumerUnregistered event message for a consumer actor or None if
- * actorRef is not a consumer actor.
- */
- def forConsumer(actorRef: ActorRef): Option[ConsumerUnregistered] = actorRef match {
- case ConsumerDescriptor(clazz, uri, id, uuid) => Some(ConsumerUnregistered(clazz, uri, id, uuid))
- case _ => None
- }
-}
-
-/**
- * Describes a consumer actor with elements that are relevant for publishing an actor at a
- * Camel endpoint (or unpublishing an actor from an endpoint).
- *
- * @author Martin Krasser
- */
-private[camel] object ConsumerDescriptor {
-
- /**
- * An extractor that optionally creates a 4-tuple from a consumer actor reference containing
- * the target actor's class name, endpoint URI, identifier and a hint whether the identifier
- * is the actor uuid or actor id. If actorRef doesn't reference a consumer actor,
- * None is returned.
- */
- def unapply(actorRef: ActorRef): Option[(String, String, String, Boolean)] =
- unapplyConsumerInstance(actorRef) orElse unapplyConsumeAnnotated(actorRef)
-
- private def unapplyConsumeAnnotated(actorRef: ActorRef): Option[(String, String, String, Boolean)] = {
- val annotation = actorRef.actorClass.getAnnotation(classOf[consume])
- if (annotation eq null) None
- else if (actorRef.remoteAddress.isDefined) None
- else Some((actorRef.actor.getClass.getName, annotation.value, actorRef.id, false))
- }
-
- private def unapplyConsumerInstance(actorRef: ActorRef): Option[(String, String, String, Boolean)] =
- if (!actorRef.actor.isInstanceOf[Consumer]) None
- else if (actorRef.remoteAddress.isDefined) None
- else Some((actorRef.actor.getClass.getName, actorRef.actor.asInstanceOf[Consumer].endpointUri, actorRef.uuid, true))
-}
diff --git a/akka-camel/src/test/java/se/scalablesolutions/akka/camel/Pojo.java b/akka-camel/src/test/java/se/scalablesolutions/akka/camel/Pojo.java
new file mode 100644
index 0000000000..d1848c49ee
--- /dev/null
+++ b/akka-camel/src/test/java/se/scalablesolutions/akka/camel/Pojo.java
@@ -0,0 +1,14 @@
+package se.scalablesolutions.akka.camel;
+
+import se.scalablesolutions.akka.actor.annotation.consume;
+
+/**
+ * @author Martin Krasser
+ */
+public class Pojo {
+
+ public String foo(String s) {
+ return String.format("foo: %s", s);
+ }
+
+}
\ No newline at end of file
diff --git a/akka-camel/src/test/java/se/scalablesolutions/akka/camel/PojoBase.java b/akka-camel/src/test/java/se/scalablesolutions/akka/camel/PojoBase.java
new file mode 100644
index 0000000000..05bf1625bb
--- /dev/null
+++ b/akka-camel/src/test/java/se/scalablesolutions/akka/camel/PojoBase.java
@@ -0,0 +1,34 @@
+package se.scalablesolutions.akka.camel;
+
+import org.apache.camel.Body;
+import org.apache.camel.Header;
+
+import se.scalablesolutions.akka.actor.annotation.consume;
+
+/**
+ * @author Martin Krasser
+ */
+public class PojoBase {
+
+ public String m1(String b, String h) {
+ return "m1base: " + b + " " + h;
+ }
+
+ @consume("direct:m2base")
+ public String m2(@Body String b, @Header("test") String h) {
+ return "m2base: " + b + " " + h;
+ }
+
+ @consume("direct:m3base")
+ public String m3(@Body String b, @Header("test") String h) {
+ return "m3base: " + b + " " + h;
+ }
+
+ @consume("direct:m4base")
+ public String m4(@Body String b, @Header("test") String h) {
+ return "m4base: " + b + " " + h;
+ }
+
+ public void m5(@Body String b, @Header("test") String h) {
+ }
+}
diff --git a/akka-camel/src/test/java/se/scalablesolutions/akka/camel/PojoImpl.java b/akka-camel/src/test/java/se/scalablesolutions/akka/camel/PojoImpl.java
new file mode 100644
index 0000000000..b48202d4dc
--- /dev/null
+++ b/akka-camel/src/test/java/se/scalablesolutions/akka/camel/PojoImpl.java
@@ -0,0 +1,23 @@
+package se.scalablesolutions.akka.camel;
+
+import org.apache.camel.Body;
+import org.apache.camel.Header;
+
+import se.scalablesolutions.akka.actor.annotation.consume;
+
+/**
+ * @author Martin Krasser
+ */
+public class PojoImpl implements PojoIntf {
+
+ public String m1(String b, String h) {
+ return "m1impl: " + b + " " + h;
+ }
+
+ @consume("direct:m2impl")
+ public String m2(@Body String b, @Header("test") String h) {
+ return "m2impl: " + b + " " + h;
+ }
+
+
+}
diff --git a/akka-camel/src/test/java/se/scalablesolutions/akka/camel/PojoIntf.java b/akka-camel/src/test/java/se/scalablesolutions/akka/camel/PojoIntf.java
new file mode 100644
index 0000000000..14f63afd2e
--- /dev/null
+++ b/akka-camel/src/test/java/se/scalablesolutions/akka/camel/PojoIntf.java
@@ -0,0 +1,18 @@
+package se.scalablesolutions.akka.camel;
+
+import org.apache.camel.Body;
+import org.apache.camel.Header;
+
+import se.scalablesolutions.akka.actor.annotation.consume;
+
+/**
+ * @author Martin Krasser
+ */
+public interface PojoIntf {
+
+ public String m1(String b, String h);
+
+ @consume("direct:m2intf")
+ public String m2(@Body String b, @Header("test") String h);
+
+}
diff --git a/akka-camel/src/test/java/se/scalablesolutions/akka/camel/PojoRemote.java b/akka-camel/src/test/java/se/scalablesolutions/akka/camel/PojoRemote.java
new file mode 100644
index 0000000000..57b0999b8f
--- /dev/null
+++ b/akka-camel/src/test/java/se/scalablesolutions/akka/camel/PojoRemote.java
@@ -0,0 +1,15 @@
+package se.scalablesolutions.akka.camel;
+
+import se.scalablesolutions.akka.actor.annotation.consume;
+
+/**
+ * @author Martin Krasser
+ */
+public class PojoRemote {
+
+ @consume("direct:remote-active-object")
+ public String foo(String s) {
+ return String.format("remote active object: %s", s);
+ }
+
+}
diff --git a/akka-camel/src/test/java/se/scalablesolutions/akka/camel/PojoSingle.java b/akka-camel/src/test/java/se/scalablesolutions/akka/camel/PojoSingle.java
new file mode 100644
index 0000000000..7d577535b2
--- /dev/null
+++ b/akka-camel/src/test/java/se/scalablesolutions/akka/camel/PojoSingle.java
@@ -0,0 +1,14 @@
+package se.scalablesolutions.akka.camel;
+
+import se.scalablesolutions.akka.actor.annotation.consume;
+
+/**
+ * @author Martin Krasser
+ */
+public class PojoSingle {
+
+ @consume("direct:foo")
+ public void foo(String b) {
+ }
+
+}
diff --git a/akka-camel/src/test/java/se/scalablesolutions/akka/camel/PojoSub.java b/akka-camel/src/test/java/se/scalablesolutions/akka/camel/PojoSub.java
new file mode 100644
index 0000000000..be5b453698
--- /dev/null
+++ b/akka-camel/src/test/java/se/scalablesolutions/akka/camel/PojoSub.java
@@ -0,0 +1,27 @@
+package se.scalablesolutions.akka.camel;
+
+import org.apache.camel.Body;
+import org.apache.camel.Header;
+
+import se.scalablesolutions.akka.actor.annotation.consume;
+
+public class PojoSub extends PojoBase {
+
+ @Override
+ @consume("direct:m1sub")
+ public String m1(@Body String b, @Header("test") String h) {
+ return "m1sub: " + b + " " + h;
+ }
+
+ @Override
+ public String m2(String b, String h) {
+ return "m2sub: " + b + " " + h;
+ }
+
+ @Override
+ @consume("direct:m3sub")
+ public String m3(@Body String b, @Header("test") String h) {
+ return "m3sub: " + b + " " + h;
+ }
+
+}
diff --git a/akka-camel/src/test/scala/service/CamelServiceFeatureTest.scala b/akka-camel/src/test/scala/CamelServiceFeatureTest.scala
similarity index 61%
rename from akka-camel/src/test/scala/service/CamelServiceFeatureTest.scala
rename to akka-camel/src/test/scala/CamelServiceFeatureTest.scala
index fd106f799f..fd57d83457 100644
--- a/akka-camel/src/test/scala/service/CamelServiceFeatureTest.scala
+++ b/akka-camel/src/test/scala/CamelServiceFeatureTest.scala
@@ -1,53 +1,36 @@
-package se.scalablesolutions.akka.camel.service
+package se.scalablesolutions.akka.camel
+
+import java.util.concurrent.{CountDownLatch, TimeUnit}
import org.apache.camel.builder.RouteBuilder
import org.scalatest.{GivenWhenThen, BeforeAndAfterAll, FeatureSpec}
-import se.scalablesolutions.akka.actor.{Actor, ActorRegistry}
-import se.scalablesolutions.akka.camel.{CamelContextManager, Message, Consumer}
-import Actor._
-
-object CamelServiceFeatureTest {
-
- class TestConsumer(uri: String) extends Actor with Consumer {
- def endpointUri = uri
- protected def receive = {
- case msg: Message => self.reply("received %s" format msg.body)
- }
- }
-
- class TestActor extends Actor {
- self.id = "custom-actor-id"
- protected def receive = {
- case msg: Message => self.reply("received %s" format msg.body)
- }
- }
-
- class TestRoute extends RouteBuilder {
- def configure {
- from("direct:custom-route-test-1").to("actor:custom-actor-id")
- }
- }
-}
+import se.scalablesolutions.akka.actor.Actor._
+import se.scalablesolutions.akka.actor.{ActiveObject, Actor, ActorRegistry}
class CamelServiceFeatureTest extends FeatureSpec with BeforeAndAfterAll with GivenWhenThen {
import CamelServiceFeatureTest._
- var service: CamelService = CamelService.newInstance
+ var service: CamelService = _
override protected def beforeAll = {
ActorRegistry.shutdownAll
+ // create new CamelService instance
+ service = CamelService.newInstance
// register test consumer before starting the CamelService
actorOf(new TestConsumer("direct:publish-test-1")).start
- // Consigure a custom camel route
+ // Configure a custom camel route
CamelContextManager.init
CamelContextManager.context.addRoutes(new TestRoute)
- // set expectations for testing purposes
- service.consumerPublisher.actor.asInstanceOf[ConsumerPublisher].expectPublishCount(1)
+ // start consumer publisher, otherwise we cannot set message
+ // count expectations in the next step (needed for testing only).
+ service.consumerPublisher.start
+ // set expectations on publish count
+ val latch = service.consumerPublisher.!).get
// start the CamelService
service.load
// await publication of first test consumer
- service.consumerPublisher.actor.asInstanceOf[ConsumerPublisher].awaitPublish
+ assert(latch.await(5000, TimeUnit.MILLISECONDS))
}
override protected def afterAll = {
@@ -60,9 +43,9 @@ class CamelServiceFeatureTest extends FeatureSpec with BeforeAndAfterAll with Gi
scenario("access registered consumer actors via Camel direct-endpoints") {
given("two consumer actors registered before and after CamelService startup")
- service.consumerPublisher.actor.asInstanceOf[ConsumerPublisher].expectPublishCount(1)
+ val latch = service.consumerPublisher.!).get
actorOf(new TestConsumer("direct:publish-test-2")).start
- service.consumerPublisher.actor.asInstanceOf[ConsumerPublisher].awaitPublish
+ assert(latch.await(5000, TimeUnit.MILLISECONDS))
when("requests are sent to these actors")
val response1 = CamelContextManager.template.requestBody("direct:publish-test-1", "msg1")
@@ -81,14 +64,14 @@ class CamelServiceFeatureTest extends FeatureSpec with BeforeAndAfterAll with Gi
given("a consumer actor that has been stopped")
assert(CamelContextManager.context.hasEndpoint(endpointUri) eq null)
- service.consumerPublisher.actor.asInstanceOf[ConsumerPublisher].expectPublishCount(1)
+ var latch = service.consumerPublisher.!).get
val consumer = actorOf(new TestConsumer(endpointUri)).start
- service.consumerPublisher.actor.asInstanceOf[ConsumerPublisher].awaitPublish
+ assert(latch.await(5000, TimeUnit.MILLISECONDS))
assert(CamelContextManager.context.hasEndpoint(endpointUri) ne null)
- service.consumerPublisher.actor.asInstanceOf[ConsumerPublisher].expectUnpublishCount(1)
+ latch = service.consumerPublisher.!).get
consumer.stop
- service.consumerPublisher.actor.asInstanceOf[ConsumerPublisher].awaitUnpublish
+ assert(latch.await(5000, TimeUnit.MILLISECONDS))
// endpoint is still there but the route has been stopped
assert(CamelContextManager.context.hasEndpoint(endpointUri) ne null)
@@ -114,4 +97,48 @@ class CamelServiceFeatureTest extends FeatureSpec with BeforeAndAfterAll with Gi
assert(response === "received msg3")
}
}
+
+ feature("Publish active object methods in the global CamelContext") {
+
+ scenario("access active object methods via Camel direct-endpoints") {
+
+ given("an active object registered after CamelService startup")
+ val latch = service.consumerPublisher.!).get
+ ActiveObject.newInstance(classOf[PojoBase])
+ assert(latch.await(5000, TimeUnit.MILLISECONDS))
+
+ when("requests are sent to published methods")
+ val response1 = CamelContextManager.template.requestBodyAndHeader("direct:m2base", "x", "test", "y")
+ val response2 = CamelContextManager.template.requestBodyAndHeader("direct:m3base", "x", "test", "y")
+ val response3 = CamelContextManager.template.requestBodyAndHeader("direct:m4base", "x", "test", "y")
+
+ then("each should have returned a different response")
+ assert(response1 === "m2base: x y")
+ assert(response2 === "m3base: x y")
+ assert(response3 === "m4base: x y")
+ }
+ }
+}
+
+object CamelServiceFeatureTest {
+
+ class TestConsumer(uri: String) extends Actor with Consumer {
+ def endpointUri = uri
+ protected def receive = {
+ case msg: Message => self.reply("received %s" format msg.body)
+ }
+ }
+
+ class TestActor extends Actor {
+ self.id = "custom-actor-id"
+ protected def receive = {
+ case msg: Message => self.reply("received %s" format msg.body)
+ }
+ }
+
+ class TestRoute extends RouteBuilder {
+ def configure {
+ from("direct:custom-route-test-1").to("actor:custom-actor-id")
+ }
+ }
}
diff --git a/akka-camel/src/test/scala/ConsumerMethodRegisteredTest.scala b/akka-camel/src/test/scala/ConsumerMethodRegisteredTest.scala
new file mode 100644
index 0000000000..66d63d69e8
--- /dev/null
+++ b/akka-camel/src/test/scala/ConsumerMethodRegisteredTest.scala
@@ -0,0 +1,46 @@
+package se.scalablesolutions.akka.camel
+
+import java.net.InetSocketAddress
+
+import org.junit.Test
+import org.scalatest.junit.JUnitSuite
+
+import se.scalablesolutions.akka.actor.{AspectInit, ActiveObject}
+import se.scalablesolutions.akka.camel.ConsumerMethodRegistered._
+
+class ConsumerMethodRegisteredTest extends JUnitSuite {
+ val remoteAddress = new InetSocketAddress("localhost", 8888);
+ val remoteAspectInit = AspectInit(classOf[String], null, Some(remoteAddress), 1000)
+ val localAspectInit = AspectInit(classOf[String], null, None, 1000)
+
+ val activePojoBase = ActiveObject.newInstance(classOf[PojoBase])
+ val activePojoSub = ActiveObject.newInstance(classOf[PojoSub])
+ val activePojoIntf = ActiveObject.newInstance(classOf[PojoIntf], new PojoImpl)
+
+ val ascendingMethodName = (r1: ConsumerMethodRegistered, r2: ConsumerMethodRegistered) =>
+ r1.method.getName < r2.method.getName
+
+ @Test def shouldSelectPojoBaseMethods234 = {
+ val registered = forConsumer(activePojoBase, localAspectInit).sortWith(ascendingMethodName)
+ assert(registered.size === 3)
+ assert(registered.map(_.method.getName) === List("m2", "m3", "m4"))
+ }
+
+ @Test def shouldSelectPojoSubMethods134 = {
+ val registered = forConsumer(activePojoSub, localAspectInit).sortWith(ascendingMethodName)
+ assert(registered.size === 3)
+ assert(registered.map(_.method.getName) === List("m1", "m3", "m4"))
+ }
+
+ @Test def shouldSelectPojoIntfMethod2 = {
+ val registered = forConsumer(activePojoIntf, localAspectInit)
+ assert(registered.size === 1)
+ assert(registered(0).method.getName === "m2")
+ }
+
+ @Test def shouldIgnoreRemoteProxies = {
+ val registered = forConsumer(activePojoBase, remoteAspectInit)
+ assert(registered.size === 0)
+ }
+
+}
\ No newline at end of file
diff --git a/akka-camel/src/test/scala/service/ConsumerRegisteredTest.scala b/akka-camel/src/test/scala/ConsumerRegisteredTest.scala
similarity index 67%
rename from akka-camel/src/test/scala/service/ConsumerRegisteredTest.scala
rename to akka-camel/src/test/scala/ConsumerRegisteredTest.scala
index e69dd69339..caaa03591b 100644
--- a/akka-camel/src/test/scala/service/ConsumerRegisteredTest.scala
+++ b/akka-camel/src/test/scala/ConsumerRegisteredTest.scala
@@ -1,4 +1,4 @@
-package se.scalablesolutions.akka.camel.service
+package se.scalablesolutions.akka.camel
import org.junit.Test
import org.scalatest.junit.JUnitSuite
@@ -6,7 +6,6 @@ import org.scalatest.junit.JUnitSuite
import se.scalablesolutions.akka.actor.Actor
import se.scalablesolutions.akka.actor.Actor._
import se.scalablesolutions.akka.actor.annotation.consume
-import se.scalablesolutions.akka.camel.Consumer
object ConsumerRegisteredTest {
@consume("mock:test1")
@@ -29,20 +28,22 @@ class ConsumerRegisteredTest extends JUnitSuite {
import ConsumerRegisteredTest._
@Test def shouldCreatePublishRequestList = {
- val as = List(actorOf[ConsumeAnnotatedActor])
+ val a = actorOf[ConsumeAnnotatedActor]
+ val as = List(a)
val events = for (a <- as; e <- ConsumerRegistered.forConsumer(a)) yield e
- assert(events === List(ConsumerRegistered(classOf[ConsumeAnnotatedActor].getName, "mock:test1", "test", false)))
+ assert(events === List(ConsumerRegistered(a, "mock:test1", "test", false)))
}
@Test def shouldCreateSomePublishRequestWithActorId = {
- val event = ConsumerRegistered.forConsumer(actorOf[ConsumeAnnotatedActor])
- assert(event === Some(ConsumerRegistered(classOf[ConsumeAnnotatedActor].getName, "mock:test1", "test", false)))
+ val a = actorOf[ConsumeAnnotatedActor]
+ val event = ConsumerRegistered.forConsumer(a)
+ assert(event === Some(ConsumerRegistered(a, "mock:test1", "test", false)))
}
@Test def shouldCreateSomePublishRequestWithActorUuid = {
val ca = actorOf[ConsumerActor]
val event = ConsumerRegistered.forConsumer(ca)
- assert(event === Some(ConsumerRegistered(ca.actor.getClass.getName, "mock:test2", ca.uuid, true)))
+ assert(event === Some(ConsumerRegistered(ca, "mock:test2", ca.uuid, true)))
}
@Test def shouldCreateNone = {
diff --git a/akka-camel/src/test/scala/PublishRequestorTest.scala b/akka-camel/src/test/scala/PublishRequestorTest.scala
new file mode 100644
index 0000000000..f3c9a899b2
--- /dev/null
+++ b/akka-camel/src/test/scala/PublishRequestorTest.scala
@@ -0,0 +1,69 @@
+package se.scalablesolutions.akka.camel
+
+import java.util.concurrent.{CountDownLatch, TimeUnit}
+
+import org.junit.{Before, After, Test}
+import org.scalatest.junit.JUnitSuite
+
+import se.scalablesolutions.akka.actor._
+import se.scalablesolutions.akka.actor.Actor._
+import se.scalablesolutions.akka.camel.support.{SetExpectedMessageCount => SetExpectedTestMessageCount, _}
+
+class PublishRequestorTest extends JUnitSuite {
+ import PublishRequestorTest._
+
+ var publisher: ActorRef = _
+ var requestor: ActorRef = _
+ var consumer: ActorRef = _
+
+ @Before def setUp = {
+ publisher = actorOf[PublisherMock].start
+ requestor = actorOf[PublishRequestor].start
+ requestor ! PublishRequestorInit(publisher)
+ consumer = actorOf(new Actor with Consumer {
+ def endpointUri = "mock:test"
+ protected def receive = null
+ }).start
+
+ }
+
+ @After def tearDown = {
+ ActorRegistry.shutdownAll
+ }
+
+ @Test def shouldReceiveConsumerMethodRegisteredEvent = {
+ val obj = ActiveObject.newInstance(classOf[PojoSingle])
+ val init = AspectInit(classOf[PojoSingle], null, None, 1000)
+ val latch = publisher.!).get
+ requestor ! AspectInitRegistered(obj, init)
+ assert(latch.await(5000, TimeUnit.MILLISECONDS))
+ val event = (publisher !! GetRetainedMessage).get.asInstanceOf[ConsumerMethodRegistered]
+ assert(event.init === init)
+ assert(event.uri === "direct:foo")
+ assert(event.activeObject === obj)
+ assert(event.method.getName === "foo")
+ }
+
+ @Test def shouldReceiveConsumerRegisteredEvent = {
+ val latch = publisher.!).get
+ requestor ! ActorRegistered(consumer)
+ assert(latch.await(5000, TimeUnit.MILLISECONDS))
+ assert((publisher !! GetRetainedMessage) ===
+ Some(ConsumerRegistered(consumer, "mock:test", consumer.uuid, true)))
+ }
+
+ @Test def shouldReceiveConsumerUnregisteredEvent = {
+ val latch = publisher.!).get
+ requestor ! ActorUnregistered(consumer)
+ assert(latch.await(5000, TimeUnit.MILLISECONDS))
+ assert((publisher !! GetRetainedMessage) ===
+ Some(ConsumerUnregistered(consumer, "mock:test", consumer.uuid, true)))
+ }
+}
+
+object PublishRequestorTest {
+ class PublisherMock extends TestActor with Retain with Countdown {
+ def handler = retain andThen countdown
+ }
+}
+
diff --git a/akka-camel/src/test/scala/RemoteConsumerTest.scala b/akka-camel/src/test/scala/RemoteConsumerTest.scala
new file mode 100644
index 0000000000..e1a7842e0d
--- /dev/null
+++ b/akka-camel/src/test/scala/RemoteConsumerTest.scala
@@ -0,0 +1,89 @@
+package se.scalablesolutions.akka.camel
+
+import java.util.concurrent.{CountDownLatch, TimeUnit}
+
+import org.scalatest.{GivenWhenThen, BeforeAndAfterAll, FeatureSpec}
+
+import se.scalablesolutions.akka.actor.Actor._
+import se.scalablesolutions.akka.actor.{ActiveObject, ActorRegistry, RemoteActor}
+import se.scalablesolutions.akka.remote.{RemoteClient, RemoteServer}
+
+/**
+ * @author Martin Krasser
+ */
+class RemoteConsumerTest extends FeatureSpec with BeforeAndAfterAll with GivenWhenThen {
+ import RemoteConsumerTest._
+
+ var service: CamelService = _
+ var server: RemoteServer = _
+
+ override protected def beforeAll = {
+ ActorRegistry.shutdownAll
+
+ service = CamelService.newInstance
+ service.load
+
+ server = new RemoteServer()
+ server.start(host, port)
+
+ Thread.sleep(1000)
+ }
+
+ override protected def afterAll = {
+ server.shutdown
+ service.unload
+
+ RemoteClient.shutdownAll
+ ActorRegistry.shutdownAll
+
+ Thread.sleep(1000)
+ }
+
+ feature("Client-initiated remote consumer actor") {
+ scenario("access published remote consumer actor") {
+ given("a client-initiated remote consumer actor")
+ val consumer = actorOf[RemoteConsumer].start
+
+ when("remote consumer publication is triggered")
+ val latch = service.consumerPublisher.!).get
+ consumer !! "init"
+ assert(latch.await(5000, TimeUnit.MILLISECONDS))
+
+ then("the published actor is accessible via its endpoint URI")
+ val response = CamelContextManager.template.requestBody("direct:remote-actor", "test")
+ assert(response === "remote actor: test")
+ }
+ }
+
+ /* TODO: enable once issues with remote active objects are resolved
+ feature("Client-initiated remote consumer active object") {
+ scenario("access published remote consumer method") {
+ given("a client-initiated remote consumer active object")
+ val consumer = ActiveObject.newRemoteInstance(classOf[PojoRemote], host, port)
+
+ when("remote consumer publication is triggered")
+ val latch = service.consumerPublisher.!).get
+ consumer.foo("init")
+ assert(latch.await(5000, TimeUnit.MILLISECONDS))
+
+ then("the published method is accessible via its endpoint URI")
+ val response = CamelContextManager.template.requestBody("direct:remote-active-object", "test")
+ assert(response === "remote active object: test")
+ }
+ }
+ */
+}
+
+object RemoteConsumerTest {
+ val host = "localhost"
+ val port = 7774
+
+ class RemoteConsumer extends RemoteActor(host, port) with Consumer {
+ def endpointUri = "direct:remote-actor"
+
+ protected def receive = {
+ case "init" => self.reply("done")
+ case m: Message => self.reply("remote actor: %s" format m.body)
+ }
+ }
+}
\ No newline at end of file
diff --git a/akka-camel/src/test/scala/component/ActiveObjectComponentFeatureTest.scala b/akka-camel/src/test/scala/component/ActiveObjectComponentFeatureTest.scala
new file mode 100644
index 0000000000..d80eedfd7a
--- /dev/null
+++ b/akka-camel/src/test/scala/component/ActiveObjectComponentFeatureTest.scala
@@ -0,0 +1,105 @@
+package se.scalablesolutions.akka.camel.component
+
+import org.scalatest.{BeforeAndAfterEach, BeforeAndAfterAll, FeatureSpec}
+
+import org.apache.camel.builder.RouteBuilder
+import se.scalablesolutions.akka.actor.Actor._
+import se.scalablesolutions.akka.actor.{ActorRegistry, ActiveObject}
+import se.scalablesolutions.akka.camel._
+import org.apache.camel.impl.{DefaultCamelContext, SimpleRegistry}
+import org.apache.camel.{ResolveEndpointFailedException, ExchangePattern, Exchange, Processor}
+
+/**
+ * @author Martin Krasser
+ */
+class ActiveObjectComponentFeatureTest extends FeatureSpec with BeforeAndAfterAll with BeforeAndAfterEach {
+ import ActiveObjectComponentFeatureTest._
+ import CamelContextManager.template
+
+ override protected def beforeAll = {
+ val activePojo = ActiveObject.newInstance(classOf[Pojo]) // not a consumer
+ val activePojoBase = ActiveObject.newInstance(classOf[PojoBase])
+ val activePojoIntf = ActiveObject.newInstance(classOf[PojoIntf], new PojoImpl)
+
+ val registry = new SimpleRegistry
+ registry.put("pojo", activePojo)
+
+ CamelContextManager.init(new DefaultCamelContext(registry))
+ CamelContextManager.context.addRoutes(new CustomRouteBuilder)
+ CamelContextManager.start
+
+ CamelContextManager.activeObjectRegistry.put("base", activePojoBase)
+ CamelContextManager.activeObjectRegistry.put("intf", activePojoIntf)
+ }
+
+ override protected def afterAll = {
+ CamelContextManager.stop
+ ActorRegistry.shutdownAll
+ }
+
+ feature("Communicate with an active object from a Camel application using active object endpoint URIs") {
+ import ActiveObjectComponent.InternalSchema
+ import ExchangePattern._
+
+ scenario("in-out exchange with proxy created from interface and method returning String") {
+ val result = template.requestBodyAndHeader("%s:intf?method=m2" format InternalSchema, "x", "test", "y")
+ assert(result === "m2impl: x y")
+ }
+
+ scenario("in-out exchange with proxy created from class and method returning String") {
+ val result = template.requestBodyAndHeader("%s:base?method=m2" format InternalSchema, "x", "test", "y")
+ assert(result === "m2base: x y")
+ }
+
+ scenario("in-out exchange with proxy created from class and method returning void") {
+ val result = template.requestBodyAndHeader("%s:base?method=m5" format InternalSchema, "x", "test", "y")
+ assert(result === "x") // returns initial body
+ }
+
+ scenario("in-only exchange with proxy created from class and method returning String") {
+ val result = template.send("%s:base?method=m2" format InternalSchema, InOnly, new Processor {
+ def process(exchange: Exchange) = {
+ exchange.getIn.setBody("x")
+ exchange.getIn.setHeader("test", "y")
+ }
+ });
+ assert(result.getPattern === InOnly)
+ assert(result.getIn.getBody === "m2base: x y")
+ assert(result.getOut.getBody === null)
+ }
+
+ scenario("in-only exchange with proxy created from class and method returning void") {
+ val result = template.send("%s:base?method=m5" format InternalSchema, InOnly, new Processor {
+ def process(exchange: Exchange) = {
+ exchange.getIn.setBody("x")
+ exchange.getIn.setHeader("test", "y")
+ }
+ });
+ assert(result.getPattern === InOnly)
+ assert(result.getIn.getBody === "x")
+ assert(result.getOut.getBody === null)
+ }
+ }
+
+ feature("Communicate with an active object from a Camel application from a custom Camel route") {
+
+ scenario("in-out exchange with externally registered active object") {
+ val result = template.requestBody("direct:test", "test")
+ assert(result === "foo: test")
+ }
+
+ scenario("in-out exchange with internally registered active object not possible") {
+ intercept[ResolveEndpointFailedException] {
+ template.requestBodyAndHeader("active-object:intf?method=m2", "x", "test", "y")
+ }
+ }
+ }
+}
+
+object ActiveObjectComponentFeatureTest {
+ class CustomRouteBuilder extends RouteBuilder {
+ def configure = {
+ from("direct:test").to("active-object:pojo?method=foo")
+ }
+ }
+}
diff --git a/akka-camel/src/test/scala/component/ActorComponentFeatureTest.scala b/akka-camel/src/test/scala/component/ActorComponentFeatureTest.scala
index 7db119fa5e..f73a2fcd3e 100644
--- a/akka-camel/src/test/scala/component/ActorComponentFeatureTest.scala
+++ b/akka-camel/src/test/scala/component/ActorComponentFeatureTest.scala
@@ -1,11 +1,14 @@
package se.scalablesolutions.akka.camel.component
+import java.util.concurrent.{TimeUnit, CountDownLatch}
+
import org.apache.camel.RuntimeCamelException
import org.scalatest.{BeforeAndAfterEach, BeforeAndAfterAll, FeatureSpec}
+import se.scalablesolutions.akka.actor.Actor._
import se.scalablesolutions.akka.actor.{ActorRegistry, Actor}
-import se.scalablesolutions.akka.camel.support.{Respond, Countdown, Tester, Retain}
import se.scalablesolutions.akka.camel.{Message, CamelContextManager}
+import se.scalablesolutions.akka.camel.support._
class ActorComponentFeatureTest extends FeatureSpec with BeforeAndAfterAll with BeforeAndAfterEach {
override protected def beforeAll = {
@@ -20,41 +23,37 @@ class ActorComponentFeatureTest extends FeatureSpec with BeforeAndAfterAll with
feature("Communicate with an actor from a Camel application using actor endpoint URIs") {
import CamelContextManager.template
- import Actor._
scenario("one-way communication using actor id") {
- val actor = actorOf(new Tester with Retain with Countdown[Message])
- actor.start
+ val actor = actorOf[Tester1].start
+ val latch = actor.!).get
template.sendBody("actor:%s" format actor.id, "Martin")
- assert(actor.actor.asInstanceOf[Countdown[Message]].waitFor)
- assert(actor.actor.asInstanceOf[Retain].body === "Martin")
+ assert(latch.await(5000, TimeUnit.MILLISECONDS))
+ val reply = (actor !! GetRetainedMessage).get.asInstanceOf[Message]
+ assert(reply.body === "Martin")
}
scenario("one-way communication using actor uuid") {
- val actor = actorOf(new Tester with Retain with Countdown[Message])
- actor.start
+ val actor = actorOf[Tester1].start
+ val latch = actor.!).get
template.sendBody("actor:uuid:%s" format actor.uuid, "Martin")
- assert(actor.actor.asInstanceOf[Countdown[Message]].waitFor)
- assert(actor.actor.asInstanceOf[Retain].body === "Martin")
+ assert(latch.await(5000, TimeUnit.MILLISECONDS))
+ val reply = (actor !! GetRetainedMessage).get.asInstanceOf[Message]
+ assert(reply.body === "Martin")
}
scenario("two-way communication using actor id") {
- val actor = actorOf(new Tester with Respond)
- actor.start
+ val actor = actorOf[Tester2].start
assert(template.requestBody("actor:%s" format actor.id, "Martin") === "Hello Martin")
}
scenario("two-way communication using actor uuid") {
- val actor = actorOf(new Tester with Respond)
- actor.start
+ val actor = actorOf[Tester2].start
assert(template.requestBody("actor:uuid:%s" format actor.uuid, "Martin") === "Hello Martin")
}
scenario("two-way communication with timeout") {
- val actor = actorOf(new Tester {
- self.timeout = 1
- })
- actor.start
+ val actor = actorOf[Tester3].start
intercept[RuntimeCamelException] {
template.requestBody("actor:uuid:%s" format actor.uuid, "Martin")
}
diff --git a/akka-camel/src/test/scala/component/ActorProducerTest.scala b/akka-camel/src/test/scala/component/ActorProducerTest.scala
index d41971c07f..419784681b 100644
--- a/akka-camel/src/test/scala/component/ActorProducerTest.scala
+++ b/akka-camel/src/test/scala/component/ActorProducerTest.scala
@@ -2,41 +2,41 @@ package se.scalablesolutions.akka.camel.component
import ActorComponentTest._
-import java.util.concurrent.TimeoutException
+import java.util.concurrent.{CountDownLatch, TimeoutException, TimeUnit}
import org.apache.camel.ExchangePattern
import org.junit.{After, Test}
import org.scalatest.junit.JUnitSuite
import org.scalatest.BeforeAndAfterAll
-import se.scalablesolutions.akka.actor.ActorRegistry
import se.scalablesolutions.akka.actor.Actor._
-import se.scalablesolutions.akka.camel.support.{Countdown, Retain, Tester, Respond}
+import se.scalablesolutions.akka.actor.ActorRegistry
import se.scalablesolutions.akka.camel.{Failure, Message}
+import se.scalablesolutions.akka.camel.support._
class ActorProducerTest extends JUnitSuite with BeforeAndAfterAll {
@After def tearDown = ActorRegistry.shutdownAll
@Test def shouldSendMessageToActor = {
- val actor = actorOf(new Tester with Retain with Countdown[Message])
+ val actor = actorOf[Tester1].start
+ val latch = actor.!).get
val endpoint = mockEndpoint("actor:uuid:%s" format actor.uuid)
val exchange = endpoint.createExchange(ExchangePattern.InOnly)
- actor.start
exchange.getIn.setBody("Martin")
exchange.getIn.setHeader("k1", "v1")
endpoint.createProducer.process(exchange)
- actor.actor.asInstanceOf[Countdown[Message]].waitFor
- assert(actor.actor.asInstanceOf[Retain].body === "Martin")
- assert(actor.actor.asInstanceOf[Retain].headers === Map(Message.MessageExchangeId -> exchange.getExchangeId, "k1" -> "v1"))
+ assert(latch.await(5000, TimeUnit.MILLISECONDS))
+ val reply = (actor !! GetRetainedMessage).get.asInstanceOf[Message]
+ assert(reply.body === "Martin")
+ assert(reply.headers === Map(Message.MessageExchangeId -> exchange.getExchangeId, "k1" -> "v1"))
}
@Test def shouldSendMessageToActorAndReceiveResponse = {
- val actor = actorOf(new Tester with Respond {
+ val actor = actorOf(new Tester2 {
override def response(msg: Message) = Message(super.response(msg), Map("k2" -> "v2"))
- })
+ }).start
val endpoint = mockEndpoint("actor:uuid:%s" format actor.uuid)
val exchange = endpoint.createExchange(ExchangePattern.InOut)
- actor.start
exchange.getIn.setBody("Martin")
exchange.getIn.setHeader("k1", "v1")
endpoint.createProducer.process(exchange)
@@ -45,12 +45,11 @@ class ActorProducerTest extends JUnitSuite with BeforeAndAfterAll {
}
@Test def shouldSendMessageToActorAndReceiveFailure = {
- val actor = actorOf(new Tester with Respond {
+ val actor = actorOf(new Tester2 {
override def response(msg: Message) = Failure(new Exception("testmsg"), Map("k3" -> "v3"))
- })
+ }).start
val endpoint = mockEndpoint("actor:uuid:%s" format actor.uuid)
val exchange = endpoint.createExchange(ExchangePattern.InOut)
- actor.start
exchange.getIn.setBody("Martin")
exchange.getIn.setHeader("k1", "v1")
endpoint.createProducer.process(exchange)
@@ -60,12 +59,9 @@ class ActorProducerTest extends JUnitSuite with BeforeAndAfterAll {
}
@Test def shouldSendMessageToActorAndTimeout: Unit = {
- val actor = actorOf(new Tester {
- self.timeout = 1
- })
+ val actor = actorOf[Tester3].start
val endpoint = mockEndpoint("actor:uuid:%s" format actor.uuid)
val exchange = endpoint.createExchange(ExchangePattern.InOut)
- actor.start
exchange.getIn.setBody("Martin")
intercept[TimeoutException] {
endpoint.createProducer.process(exchange)
diff --git a/akka-camel/src/test/scala/service/PublishRequestorTest.scala b/akka-camel/src/test/scala/service/PublishRequestorTest.scala
deleted file mode 100644
index a7f1685de1..0000000000
--- a/akka-camel/src/test/scala/service/PublishRequestorTest.scala
+++ /dev/null
@@ -1,55 +0,0 @@
-package se.scalablesolutions.akka.camel.service
-
-import _root_.org.junit.{Before, After, Test}
-import org.scalatest.junit.JUnitSuite
-
-import se.scalablesolutions.akka.camel.Consumer
-import se.scalablesolutions.akka.camel.support.{Receive, Countdown}
-import se.scalablesolutions.akka.actor.{Actor, ActorRef, ActorRegistry, ActorRegistered, ActorUnregistered}
-import Actor._
-
-object PublishRequestorTest {
- class PublisherMock extends Actor with Receive[ConsumerEvent] {
- var received: ConsumerEvent = _
- protected def receive = {
- case msg: ConsumerRegistered => onMessage(msg)
- case msg: ConsumerUnregistered => onMessage(msg)
- }
- def onMessage(msg: ConsumerEvent) = received = msg
- }
-}
-
-class PublishRequestorTest extends JUnitSuite {
- import PublishRequestorTest._
-
- var publisher: ActorRef = _
- var requestor: ActorRef = _
- var consumer: ActorRef = _
-
- @Before def setUp = {
- publisher = actorOf(new PublisherMock with Countdown[ConsumerEvent]).start
- requestor = actorOf(new PublishRequestor(publisher)).start
- consumer = actorOf(new Actor with Consumer {
- def endpointUri = "mock:test"
- protected def receive = null
- }).start
- }
-
- @After def tearDown = {
- ActorRegistry.shutdownAll
- }
-
- @Test def shouldReceiveConsumerRegisteredEvent = {
- requestor.!(ActorRegistered(consumer))(None)
- publisher.actor.asInstanceOf[Countdown[ConsumerEvent]].waitFor
- assert(publisher.actor.asInstanceOf[PublisherMock].received ===
- ConsumerRegistered(consumer.actor.getClass.getName, "mock:test", consumer.uuid, true))
- }
-
- @Test def shouldReceiveConsumerUnregisteredEvent = {
- requestor.!(ActorUnregistered(consumer))(None)
- publisher.actor.asInstanceOf[Countdown[ConsumerRegistered]].waitFor
- assert(publisher.actor.asInstanceOf[PublisherMock].received ===
- ConsumerUnregistered(consumer.actor.getClass.getName, "mock:test", consumer.uuid, true))
- }
-}
diff --git a/akka-camel/src/test/scala/support/TestSupport.scala b/akka-camel/src/test/scala/support/TestSupport.scala
index 98bd23d5ed..a4a0d177ab 100644
--- a/akka-camel/src/test/scala/support/TestSupport.scala
+++ b/akka-camel/src/test/scala/support/TestSupport.scala
@@ -5,45 +5,71 @@ import java.util.concurrent.{TimeUnit, CountDownLatch}
import se.scalablesolutions.akka.camel.Message
import se.scalablesolutions.akka.actor.Actor
-trait Receive[T] {
- def onMessage(msg: T): Unit
+import TestSupport._
+
+object TestSupport {
+ type Handler = PartialFunction[Any, Any]
}
-trait Respond extends Receive[Message] { this: Actor =>
- abstract override def onMessage(msg: Message): Unit = {
- super.onMessage(msg)
- this.self.reply(response(msg))
+trait TestActor extends Actor {
+ def receive = {
+ case msg => {
+ handler(msg)
+ }
}
+
+ def handler: Handler
+}
+
+class Tester1 extends TestActor with Retain with Countdown {
+ def handler = retain andThen countdown
+}
+
+class Tester2 extends TestActor with Respond {
+ def handler = respond
+}
+
+class Tester3 extends TestActor with Noop {
+ self.timeout = 1
+ def handler = noop
+}
+
+trait Countdown { this: Actor =>
+ var latch: CountDownLatch = new CountDownLatch(0)
+ def countdown: Handler = {
+ case SetExpectedMessageCount(num) => {
+ latch = new CountDownLatch(num)
+ self.reply(latch)
+ }
+ case msg => latch.countDown
+ }
+}
+
+trait Respond { this: Actor =>
+ def respond: Handler = {
+ case msg: Message => self.reply(response(msg))
+ }
+
def response(msg: Message): Any = "Hello %s" format msg.body
}
-trait Retain extends Receive[Message] {
- var body: Any = _
- var headers = Map.empty[String, Any]
- abstract override def onMessage(msg: Message): Unit = {
- super.onMessage(msg)
- body = msg.body
- headers = msg.headers
+trait Retain { this: Actor =>
+ var message: Any = _
+
+ def retain: Handler = {
+ case GetRetainedMessage => self.reply(message)
+ case msg => {
+ message = msg
+ msg
+ }
}
}
-trait Countdown[T] extends Receive[T] {
- val count = 1
- val duration = 5000
- val latch = new CountDownLatch(count)
-
- def waitFor = latch.await(duration, TimeUnit.MILLISECONDS)
- def countDown = latch.countDown
-
- abstract override def onMessage(msg: T) = {
- super.onMessage(msg)
- countDown
+trait Noop { this: Actor =>
+ def noop: Handler = {
+ case msg => msg
}
}
-class Tester extends Actor with Receive[Message] {
- def receive = {
- case msg: Message => onMessage(msg)
- }
- def onMessage(msg: Message): Unit = {}
-}
+case class SetExpectedMessageCount(num: Int)
+case class GetRetainedMessage()
diff --git a/akka-core/src/main/java/se/scalablesolutions/akka/annotation/consume.java b/akka-core/src/main/java/se/scalablesolutions/akka/annotation/consume.java
index 11552609f5..1a8e5c8db6 100644
--- a/akka-core/src/main/java/se/scalablesolutions/akka/annotation/consume.java
+++ b/akka-core/src/main/java/se/scalablesolutions/akka/annotation/consume.java
@@ -10,7 +10,7 @@ import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
@Retention(RetentionPolicy.RUNTIME)
-@Target(ElementType.TYPE)
+@Target({ElementType.TYPE, ElementType.METHOD})
public @interface consume {
public abstract String value();
diff --git a/akka-core/src/main/scala/actor/ActiveObject.scala b/akka-core/src/main/scala/actor/ActiveObject.scala
index 0780e355dd..a2d2820cca 100644
--- a/akka-core/src/main/scala/actor/ActiveObject.scala
+++ b/akka-core/src/main/scala/actor/ActiveObject.scala
@@ -476,7 +476,7 @@ object ActiveObject extends Logging {
}
-private[akka] object AspectInitRegistry {
+private[akka] object AspectInitRegistry extends ListenerManagement {
private val initializations = new java.util.concurrent.ConcurrentHashMap[AnyRef, AspectInit]
def initFor(target: AnyRef) = {
@@ -485,9 +485,16 @@ private[akka] object AspectInitRegistry {
init
}
- def register(target: AnyRef, init: AspectInit) = initializations.put(target, init)
+ def register(target: AnyRef, init: AspectInit) = {
+ val res = initializations.put(target, init)
+ foreachListener(_ ! AspectInitRegistered(target, init))
+ res
+ }
}
+private[akka] sealed trait AspectInitRegistryEvent
+private[akka] case class AspectInitRegistered(proxy: AnyRef, init: AspectInit) extends AspectInitRegistryEvent
+
private[akka] sealed case class AspectInit(
val target: Class[_],
val actorRef: ActorRef,
diff --git a/akka-core/src/main/scala/actor/Actor.scala b/akka-core/src/main/scala/actor/Actor.scala
index 3b7d62f97f..4c19e9b115 100644
--- a/akka-core/src/main/scala/actor/Actor.scala
+++ b/akka-core/src/main/scala/actor/Actor.scala
@@ -228,12 +228,11 @@ object Actor extends Logging {
def spawn(body: => Unit): Unit = {
case object Spawn
actorOf(new Actor() {
- self.start
- self ! Spawn
def receive = {
case Spawn => body; self.stop
}
- })
+ }).start ! Spawn
+
}
}
@@ -413,6 +412,22 @@ trait Actor extends Logging {
*/
def initTransactionalState {}
+ /**
+ * Use reply(..) to reply with a message to the original sender of the message currently
+ * being processed.
+ *
+ * Throws an IllegalStateException if unable to determine what to reply to.
+ */
+ def reply(message: Any) = self.reply(message)
+
+ /**
+ * Use reply_?(..) to reply with a message to the original sender of the message currently
+ * being processed.
+ *
+ * Returns true if reply was sent, and false if unable to determine what to reply to.
+ */
+ def reply_?(message: Any): Boolean = self.reply_?(message)
+
// =========================================
// ==== INTERNAL IMPLEMENTATION DETAILS ====
// =========================================
diff --git a/akka-core/src/main/scala/actor/ActorRef.scala b/akka-core/src/main/scala/actor/ActorRef.scala
index 3cece45a18..5f08c7b900 100644
--- a/akka-core/src/main/scala/actor/ActorRef.scala
+++ b/akka-core/src/main/scala/actor/ActorRef.scala
@@ -119,7 +119,7 @@ trait ActorRef extends TransactionManagement {
*
* Identifier for actor, does not have to be a unique one. Default is the 'uuid'.
*
- * This field is used for logging, AspectRegistry.actorsFor, identifier for remote
+ * This field is used for logging, AspectRegistry.actorsFor(id), identifier for remote
* actor in RemoteServer etc.But also as the identifier for persistence, which means
* that you can use a custom name to be able to retrieve the "correct" persisted state
* upon restart, remote restart etc.
@@ -208,8 +208,8 @@ trait ActorRef extends TransactionManagement {
protected[akka] var _sender: Option[ActorRef] = None
protected[akka] var _senderFuture: Option[CompletableFuture[Any]] = None
- protected[akka] def sender_=(s: Option[ActorRef]) = guard.withGuard { _sender = s}
- protected[akka] def senderFuture_=(sf: Option[CompletableFuture[Any]]) = guard.withGuard { _senderFuture = sf}
+ protected[akka] def sender_=(s: Option[ActorRef]) = guard.withGuard { _sender = s }
+ protected[akka] def senderFuture_=(sf: Option[CompletableFuture[Any]]) = guard.withGuard { _senderFuture = sf }
/**
* The reference sender Actor of the last received message.
@@ -243,6 +243,11 @@ trait ActorRef extends TransactionManagement {
*/
def uuid = _uuid
+ /**
+ * Tests if the actor is able to handle the message passed in as arguments.
+ */
+ def isDefinedAt(message: Any): Boolean = actor.base.isDefinedAt(message)
+
/**
* Only for internal use. UUID is effectively final.
*/
@@ -838,6 +843,11 @@ sealed class LocalActorRef private[akka](
*/
def mailboxSize: Int = _mailbox.size
+ /**
+ * Returns a copy of all the messages, put into a List[MessageInvocation].
+ */
+ def messagesInMailbox: List[MessageInvocation] = _mailbox.toArray.toList.asInstanceOf[List[MessageInvocation]]
+
/**
* Shuts down and removes all linked actors.
*/
@@ -886,7 +896,6 @@ sealed class LocalActorRef private[akka](
}
protected[akka] def postMessageToMailbox(message: Any, senderOption: Option[ActorRef]): Unit = {
- sender = senderOption
joinTransaction(message)
if (remoteAddress.isDefined) {
@@ -919,7 +928,6 @@ sealed class LocalActorRef private[akka](
timeout: Long,
senderOption: Option[ActorRef],
senderFuture: Option[CompletableFuture[T]]): CompletableFuture[T] = {
- sender = senderOption
joinTransaction(message)
if (remoteAddress.isDefined) {
@@ -969,9 +977,9 @@ sealed class LocalActorRef private[akka](
Actor.log.warning("Actor [%s] is shut down, ignoring message [%s]", toString, messageHandle)
return
}
+ sender = messageHandle.sender
+ senderFuture = messageHandle.senderFuture
try {
- sender = messageHandle.sender
- senderFuture = messageHandle.senderFuture
if (TransactionManagement.isTransactionalityEnabled) transactionalDispatch(messageHandle)
else dispatch(messageHandle)
} catch {
@@ -985,9 +993,7 @@ sealed class LocalActorRef private[akka](
val message = messageHandle.message //serializeMessage(messageHandle.message)
setTransactionSet(messageHandle.transactionSet)
try {
- if (actor.base.isDefinedAt(message)) actor.base(message) // invoke user actor's receive partial function
- else throw new IllegalArgumentException(
- "No handler matching message [" + message + "] in " + toString)
+ actor.base(message)
} catch {
case e =>
_isBeingRestarted = true
@@ -1016,20 +1022,16 @@ sealed class LocalActorRef private[akka](
}
setTransactionSet(txSet)
- def proceed = {
- if (actor.base.isDefinedAt(message)) actor.base(message) // invoke user actor's receive partial function
- else throw new IllegalArgumentException(
- toString + " could not process message [" + message + "]" +
- "\n\tsince no matching 'case' clause in its 'receive' method could be found")
- setTransactionSet(txSet) // restore transaction set to allow atomic block to do commit
- }
-
try {
if (isTransactor) {
atomic {
- proceed
+ actor.base(message)
+ setTransactionSet(txSet) // restore transaction set to allow atomic block to do commit
}
- } else proceed
+ } else {
+ actor.base(message)
+ setTransactionSet(txSet) // restore transaction set to allow atomic block to do commit
+ }
} catch {
case e: IllegalStateException => {}
case e =>
diff --git a/akka-core/src/main/scala/actor/ActorRegistry.scala b/akka-core/src/main/scala/actor/ActorRegistry.scala
index ee1b6e2b4f..b9827fdb9c 100644
--- a/akka-core/src/main/scala/actor/ActorRegistry.scala
+++ b/akka-core/src/main/scala/actor/ActorRegistry.scala
@@ -4,11 +4,10 @@
package se.scalablesolutions.akka.actor
-import se.scalablesolutions.akka.util.Logging
-
import scala.collection.mutable.ListBuffer
import scala.reflect.Manifest
import java.util.concurrent.{CopyOnWriteArrayList, ConcurrentHashMap}
+import se.scalablesolutions.akka.util.ListenerManagement
sealed trait ActorRegistryEvent
case class ActorRegistered(actor: ActorRef) extends ActorRegistryEvent
@@ -26,11 +25,10 @@ case class ActorUnregistered(actor: ActorRef) extends ActorRegistryEvent
*
* @author Jonas Bonér
*/
-object ActorRegistry extends Logging {
+object ActorRegistry extends ListenerManagement {
private val actorsByUUID = new ConcurrentHashMap[String, ActorRef]
private val actorsById = new ConcurrentHashMap[String, List[ActorRef]]
private val actorsByClassName = new ConcurrentHashMap[String, List[ActorRef]]
- private val registrationListeners = new CopyOnWriteArrayList[ActorRef]
/**
* Returns all actors in the system.
@@ -141,29 +139,4 @@ object ActorRegistry extends Logging {
actorsByClassName.clear
log.info("All actors have been shut down and unregistered from ActorRegistry")
}
-
- /**
- * Adds the registration listener this this registry's listener list.
- */
- def addRegistrationListener(listener: ActorRef) = {
- listener.start
- registrationListeners.add(listener)
- }
-
- /**
- * Removes the registration listener this this registry's listener list.
- */
- def removeRegistrationListener(listener: ActorRef) = {
- listener.stop
- registrationListeners.remove(listener)
- }
-
- private def foreachListener(f: (ActorRef) => Unit) {
- val iterator = registrationListeners.iterator
- while (iterator.hasNext) {
- val listener = iterator.next
- if (listener.isRunning) f(listener)
- else log.warning("Can't send ActorRegistryEvent to [%s] since it is not running.", listener)
- }
- }
}
diff --git a/akka-core/src/main/scala/config/Config.scala b/akka-core/src/main/scala/config/Config.scala
index b5de27a669..68842ad1e3 100644
--- a/akka-core/src/main/scala/config/Config.scala
+++ b/akka-core/src/main/scala/config/Config.scala
@@ -16,7 +16,7 @@ class ConfigurationException(message: String) extends RuntimeException(message)
* @author Jonas Bonér
*/
object Config extends Logging {
- val VERSION = "0.9"
+ val VERSION = "0.10"
// Set Multiverse options for max speed
System.setProperty("org.multiverse.MuliverseConstants.sanityChecks", "false")
diff --git a/akka-core/src/main/scala/dispatch/MessageHandling.scala b/akka-core/src/main/scala/dispatch/MessageHandling.scala
index fbf52224ce..54d627258d 100644
--- a/akka-core/src/main/scala/dispatch/MessageHandling.scala
+++ b/akka-core/src/main/scala/dispatch/MessageHandling.scala
@@ -7,7 +7,7 @@ package se.scalablesolutions.akka.dispatch
import java.util.List
import se.scalablesolutions.akka.util.{HashCode, Logging}
-import se.scalablesolutions.akka.actor.{Actor, ActorRef}
+import se.scalablesolutions.akka.actor.{Actor, ActorRef, ActorInitializationException}
import java.util.concurrent.ConcurrentHashMap
@@ -23,7 +23,12 @@ final class MessageInvocation(val receiver: ActorRef,
val transactionSet: Option[CountDownCommitBarrier]) {
if (receiver eq null) throw new IllegalArgumentException("receiver is null")
- def invoke = receiver.invoke(this)
+ def invoke = try {
+ receiver.invoke(this)
+ } catch {
+ case e: NullPointerException => throw new ActorInitializationException(
+ "Don't call 'self ! message' in the Actor's constructor (e.g. body of the class).")
+ }
def send = receiver.dispatcher.dispatch(this)
diff --git a/akka-core/src/main/scala/remote/RemoteClient.scala b/akka-core/src/main/scala/remote/RemoteClient.scala
index cfb8a9a5ea..da0f9be72b 100644
--- a/akka-core/src/main/scala/remote/RemoteClient.scala
+++ b/akka-core/src/main/scala/remote/RemoteClient.scala
@@ -38,8 +38,11 @@ object RemoteRequestProtocolIdFactory {
def nextId: Long = id.getAndIncrement + nodeId
}
+/**
+ * Life-cycle events for RemoteClient.
+ */
sealed trait RemoteClientLifeCycleEvent
-case class RemoteClientError(cause: Throwable) extends RemoteClientLifeCycleEvent
+case class RemoteClientError(cause: Throwable, host: String, port: Int) extends RemoteClientLifeCycleEvent
case class RemoteClientDisconnected(host: String, port: Int) extends RemoteClientLifeCycleEvent
case class RemoteClientConnected(host: String, port: Int) extends RemoteClientLifeCycleEvent
@@ -186,7 +189,7 @@ class RemoteClient private[akka] (val hostname: String, val port: Int, loader: O
val channel = connection.awaitUninterruptibly.getChannel
openChannels.add(channel)
if (!connection.isSuccess) {
- listeners.toArray.foreach(l => l.asInstanceOf[ActorRef] ! RemoteClientError(connection.getCause))
+ listeners.toArray.foreach(l => l.asInstanceOf[ActorRef] ! RemoteClientError(connection.getCause, hostname, port))
log.error(connection.getCause, "Remote client connection to [%s:%s] has failed", hostname, port)
}
isRunning = true
@@ -222,7 +225,7 @@ class RemoteClient private[akka] (val hostname: String, val port: Int, loader: O
}
} else {
val exception = new IllegalStateException("Remote client is not running, make sure you have invoked 'RemoteClient.connect' before using it.")
- listeners.toArray.foreach(l => l.asInstanceOf[ActorRef] ! RemoteClientError(exception))
+ listeners.toArray.foreach(l => l.asInstanceOf[ActorRef] ! RemoteClientError(exception, hostname, port))
throw exception
}
@@ -311,12 +314,12 @@ class RemoteClientHandler(val name: String,
futures.remove(reply.getId)
} else {
val exception = new IllegalArgumentException("Unknown message received in remote client handler: " + result)
- client.listeners.toArray.foreach(l => l.asInstanceOf[ActorRef] ! RemoteClientError(exception))
+ client.listeners.toArray.foreach(l => l.asInstanceOf[ActorRef] ! RemoteClientError(exception, client.hostname, client.port))
throw exception
}
} catch {
case e: Exception =>
- client.listeners.toArray.foreach(l => l.asInstanceOf[ActorRef] ! RemoteClientError(e))
+ client.listeners.toArray.foreach(l => l.asInstanceOf[ActorRef] ! RemoteClientError(e, client.hostname, client.port))
log.error("Unexpected exception in remote client handler: %s", e)
throw e
}
@@ -331,7 +334,7 @@ class RemoteClientHandler(val name: String,
client.connection.awaitUninterruptibly // Wait until the connection attempt succeeds or fails.
if (!client.connection.isSuccess) {
client.listeners.toArray.foreach(l =>
- l.asInstanceOf[ActorRef] ! RemoteClientError(client.connection.getCause))
+ l.asInstanceOf[ActorRef] ! RemoteClientError(client.connection.getCause, client.hostname, client.port))
log.error(client.connection.getCause, "Reconnection to [%s] has failed", remoteAddress)
}
}
@@ -351,7 +354,7 @@ class RemoteClientHandler(val name: String,
}
override def exceptionCaught(ctx: ChannelHandlerContext, event: ExceptionEvent) = {
- client.listeners.toArray.foreach(l => l.asInstanceOf[ActorRef] ! RemoteClientError(event.getCause))
+ client.listeners.toArray.foreach(l => l.asInstanceOf[ActorRef] ! RemoteClientError(event.getCause, client.hostname, client.port))
log.error(event.getCause, "Unexpected exception from downstream in remote client")
event.getChannel.close
}
diff --git a/akka-core/src/main/scala/util/ListenerManagement.scala b/akka-core/src/main/scala/util/ListenerManagement.scala
new file mode 100644
index 0000000000..bdee9d1c1c
--- /dev/null
+++ b/akka-core/src/main/scala/util/ListenerManagement.scala
@@ -0,0 +1,49 @@
+/**
+ * Copyright (C) 2009-2010 Scalable Solutions AB
+ */
+
+package se.scalablesolutions.akka.util
+
+import java.util.concurrent.CopyOnWriteArrayList
+
+import se.scalablesolutions.akka.actor.ActorRef
+
+/**
+ * A manager for listener actors. Intended for mixin by observables.
+ *
+ * @author Martin Krasser
+ */
+trait ListenerManagement extends Logging {
+
+ private val listeners = new CopyOnWriteArrayList[ActorRef]
+
+ /**
+ * Adds the listener this this registry's listener list.
+ * The listener is started by this method.
+ */
+ def addListener(listener: ActorRef) = {
+ listener.start
+ listeners.add(listener)
+ }
+
+ /**
+ * Removes the listener this this registry's listener list.
+ * The listener is stopped by this method.
+ */
+ def removeListener(listener: ActorRef) = {
+ listener.stop
+ listeners.remove(listener)
+ }
+
+ /**
+ * Execute f with each listener as argument.
+ */
+ protected def foreachListener(f: (ActorRef) => Unit) {
+ val iterator = listeners.iterator
+ while (iterator.hasNext) {
+ val listener = iterator.next
+ if (listener.isRunning) f(listener)
+ else log.warning("Can't notify [%s] since it is not running.", listener)
+ }
+ }
+}
\ No newline at end of file
diff --git a/akka-core/src/test/scala/Bench.scala b/akka-core/src/test/scala/Bench.scala
new file mode 100644
index 0000000000..454fc2b7e9
--- /dev/null
+++ b/akka-core/src/test/scala/Bench.scala
@@ -0,0 +1,119 @@
+/* The Computer Language Benchmarks Game
+ http://shootout.alioth.debian.org/
+ contributed by Julien Gaugaz
+ inspired by the version contributed by Yura Taras and modified by Isaac Gouy
+*/
+package se.scalablesolutions.akka.actor
+
+import se.scalablesolutions.akka.actor.Actor._
+
+object Chameneos {
+
+ sealed trait ChameneosEvent
+ case class Meet(from: ActorRef, colour: Colour) extends ChameneosEvent
+ case class Change(colour: Colour) extends ChameneosEvent
+ case class MeetingCount(count: Int) extends ChameneosEvent
+ case object Exit extends ChameneosEvent
+
+ abstract class Colour
+ case object RED extends Colour
+ case object YELLOW extends Colour
+ case object BLUE extends Colour
+ case object FADED extends Colour
+
+ val colours = Array[Colour](BLUE, RED, YELLOW)
+
+ var start = 0L
+ var end = 0L
+
+ class Chameneo(var mall: ActorRef, var colour: Colour, cid: Int) extends Actor {
+ var meetings = 0
+ self.start
+ mall ! Meet(self, colour)
+
+ def receive = {
+ case Meet(from, otherColour) =>
+ colour = complement(otherColour)
+ meetings = meetings +1
+ from ! Change(colour)
+ mall ! Meet(self, colour)
+
+ case Change(newColour) =>
+ colour = newColour
+ meetings = meetings +1
+ mall ! Meet(self, colour)
+
+ case Exit =>
+ colour = FADED
+ self.sender.get ! MeetingCount(meetings)
+ }
+
+ def complement(otherColour: Colour): Colour = colour match {
+ case RED => otherColour match {
+ case RED => RED
+ case YELLOW => BLUE
+ case BLUE => YELLOW
+ case FADED => FADED
+ }
+ case YELLOW => otherColour match {
+ case RED => BLUE
+ case YELLOW => YELLOW
+ case BLUE => RED
+ case FADED => FADED
+ }
+ case BLUE => otherColour match {
+ case RED => YELLOW
+ case YELLOW => RED
+ case BLUE => BLUE
+ case FADED => FADED
+ }
+ case FADED => FADED
+ }
+
+ override def toString = cid + "(" + colour + ")"
+ }
+
+ class Mall(var n: Int, numChameneos: Int) extends Actor {
+ var waitingChameneo: Option[ActorRef] = None
+ var sumMeetings = 0
+ var numFaded = 0
+
+ override def init = {
+ for (i <- 0 until numChameneos) actorOf(new Chameneo(self, colours(i % 3), i))
+ }
+
+ def receive = {
+ case MeetingCount(i) =>
+ numFaded += 1
+ sumMeetings += i
+ if (numFaded == numChameneos) {
+ Chameneos.end = System.currentTimeMillis
+ self.stop
+ }
+
+ case msg @ Meet(a, c) =>
+ if (n > 0) {
+ waitingChameneo match {
+ case Some(chameneo) =>
+ n -= 1
+ chameneo ! msg
+ waitingChameneo = None
+ case None => waitingChameneo = self.sender
+ }
+ } else {
+ waitingChameneo.foreach(_ ! Exit)
+ self.sender.get ! Exit
+ }
+ }
+ }
+
+ def run {
+// System.setProperty("akka.config", "akka.conf")
+ Chameneos.start = System.currentTimeMillis
+ actorOf(new Mall(1000000, 4)).start
+ Thread.sleep(10000)
+ println("Elapsed: " + (end - start))
+ }
+
+ def main(args : Array[String]): Unit = run
+}
diff --git a/akka-http/src/main/scala/Initializer.scala b/akka-http/src/main/scala/Initializer.scala
index d47357c710..da95a39b77 100644
--- a/akka-http/src/main/scala/Initializer.scala
+++ b/akka-http/src/main/scala/Initializer.scala
@@ -6,7 +6,7 @@ package se.scalablesolutions.akka.servlet
import se.scalablesolutions.akka.remote.BootableRemoteActorService
import se.scalablesolutions.akka.actor.BootableActorLoaderService
-import se.scalablesolutions.akka.camel.service.CamelService
+import se.scalablesolutions.akka.camel.CamelService
import se.scalablesolutions.akka.config.Config
import se.scalablesolutions.akka.util.{Logging, Bootable}
diff --git a/akka-kernel/src/main/scala/Kernel.scala b/akka-kernel/src/main/scala/Kernel.scala
index db460062c8..4b5fa83574 100644
--- a/akka-kernel/src/main/scala/Kernel.scala
+++ b/akka-kernel/src/main/scala/Kernel.scala
@@ -7,7 +7,7 @@ package se.scalablesolutions.akka.kernel
import se.scalablesolutions.akka.servlet.AkkaLoader
import se.scalablesolutions.akka.remote.BootableRemoteActorService
import se.scalablesolutions.akka.actor.BootableActorLoaderService
-import se.scalablesolutions.akka.camel.service.CamelService
+import se.scalablesolutions.akka.camel.CamelService
import se.scalablesolutions.akka.config.Config
object Main {
diff --git a/akka-persistence/akka-persistence-common/src/main/scala/StorageBackend.scala b/akka-persistence/akka-persistence-common/src/main/scala/StorageBackend.scala
index a5226eb1a4..df74040b68 100644
--- a/akka-persistence/akka-persistence-common/src/main/scala/StorageBackend.scala
+++ b/akka-persistence/akka-persistence-common/src/main/scala/StorageBackend.scala
@@ -33,14 +33,6 @@ trait VectorStorageBackend[T] extends StorageBackend {
trait RefStorageBackend[T] extends StorageBackend {
def insertRefStorageFor(name: String, element: T)
def getRefStorageFor(name: String): Option[T]
- def incrementAtomically(name: String): Option[Int] =
- throw new UnsupportedOperationException // only for redis
- def incrementByAtomically(name: String, by: Int): Option[Int] =
- throw new UnsupportedOperationException // only for redis
- def decrementAtomically(name: String): Option[Int] =
- throw new UnsupportedOperationException // only for redis
- def decrementByAtomically(name: String, by: Int): Option[Int] =
- throw new UnsupportedOperationException // only for redis
}
// for Queue
diff --git a/akka-persistence/akka-persistence-redis/src/main/scala/RedisStorageBackend.scala b/akka-persistence/akka-persistence-redis/src/main/scala/RedisStorageBackend.scala
index b1973c3c7b..ad758f9999 100644
--- a/akka-persistence/akka-persistence-redis/src/main/scala/RedisStorageBackend.scala
+++ b/akka-persistence/akka-persistence-redis/src/main/scala/RedisStorageBackend.scala
@@ -11,20 +11,45 @@ import se.scalablesolutions.akka.config.Config.config
import com.redis._
-trait Encoder {
+trait Base64Encoder {
def encode(bytes: Array[Byte]): Array[Byte]
def decode(bytes: Array[Byte]): Array[Byte]
}
-trait CommonsCodecBase64 {
- import org.apache.commons.codec.binary.Base64._
-
- def encode(bytes: Array[Byte]): Array[Byte] = encodeBase64(bytes)
- def decode(bytes: Array[Byte]): Array[Byte] = decodeBase64(bytes)
+trait Base64StringEncoder {
+ def byteArrayToString(bytes: Array[Byte]): String
+ def stringToByteArray(str: String): Array[Byte]
}
-object Base64Encoder extends Encoder with CommonsCodecBase64
-import Base64Encoder._
+trait NullBase64 {
+ def encode(bytes: Array[Byte]): Array[Byte] = bytes
+ def decode(bytes: Array[Byte]): Array[Byte] = bytes
+}
+
+object CommonsCodec {
+ import org.apache.commons.codec.binary.Base64
+ import org.apache.commons.codec.binary.Base64._
+
+ val b64 = new Base64(true)
+
+ trait CommonsCodecBase64 {
+ def encode(bytes: Array[Byte]): Array[Byte] = encodeBase64(bytes)
+ def decode(bytes: Array[Byte]): Array[Byte] = decodeBase64(bytes)
+ }
+
+ object Base64Encoder extends Base64Encoder with CommonsCodecBase64
+
+ trait CommonsCodecBase64StringEncoder {
+ def byteArrayToString(bytes: Array[Byte]) = encodeBase64URLSafeString(bytes)
+ def stringToByteArray(str: String) = b64.decode(str)
+ }
+
+ object Base64StringEncoder extends Base64StringEncoder with CommonsCodecBase64StringEncoder
+}
+
+import CommonsCodec._
+import CommonsCodec.Base64Encoder._
+import CommonsCodec.Base64StringEncoder._
/**
* A module for supporting Redis based persistence.
@@ -95,7 +120,7 @@ private [akka] object RedisStorageBackend extends
def insertMapStorageEntriesFor(name: String, entries: List[Tuple2[Array[Byte], Array[Byte]]]): Unit = withErrorHandling {
mset(entries.map(e =>
- (makeRedisKey(name, e._1), new String(e._2))))
+ (makeRedisKey(name, e._1), byteArrayToString(e._2))))
}
/**
@@ -138,7 +163,7 @@ private [akka] object RedisStorageBackend extends
db.get(makeRedisKey(name, key)) match {
case None =>
throw new NoSuchElementException(new String(key) + " not present")
- case Some(s) => Some(s.getBytes)
+ case Some(s) => Some(stringToByteArray(s))
}
}
@@ -155,7 +180,7 @@ private [akka] object RedisStorageBackend extends
case None =>
throw new NoSuchElementException(name + " not present")
case Some(keys) =>
- keys.map(key => (makeKeyFromRedisKey(key)._2, db.get(key).get.getBytes)).toList
+ keys.map(key => (makeKeyFromRedisKey(key)._2, stringToByteArray(db.get(key).get))).toList
}
}
@@ -207,7 +232,7 @@ private [akka] object RedisStorageBackend extends
}
def insertVectorStorageEntryFor(name: String, element: Array[Byte]): Unit = withErrorHandling {
- db.lpush(new String(encode(name.getBytes)), new String(element))
+ db.lpush(new String(encode(name.getBytes)), byteArrayToString(element))
}
def insertVectorStorageEntriesFor(name: String, elements: List[Array[Byte]]): Unit = withErrorHandling {
@@ -215,14 +240,15 @@ private [akka] object RedisStorageBackend extends
}
def updateVectorStorageEntryFor(name: String, index: Int, elem: Array[Byte]): Unit = withErrorHandling {
- db.lset(new String(encode(name.getBytes)), index, new String(elem))
+ db.lset(new String(encode(name.getBytes)), index, byteArrayToString(elem))
}
def getVectorStorageEntryFor(name: String, index: Int): Array[Byte] = withErrorHandling {
db.lindex(new String(encode(name.getBytes)), index) match {
case None =>
throw new NoSuchElementException(name + " does not have element at " + index)
- case Some(e) => e.getBytes
+ case Some(e) =>
+ stringToByteArray(e)
}
}
@@ -246,75 +272,46 @@ private [akka] object RedisStorageBackend extends
case None =>
throw new NoSuchElementException(name + " does not have elements in the range specified")
case Some(l) =>
- l map (_.get.getBytes)
+ l map ( e => stringToByteArray(e.get))
}
}
- def getVectorStorageSizeFor(name: String): Int = {
+ def getVectorStorageSizeFor(name: String): Int = withErrorHandling {
db.llen(new String(encode(name.getBytes))) match {
case None =>
throw new NoSuchElementException(name + " not present")
- case Some(l) => l
+ case Some(l) =>
+ l
}
}
def insertRefStorageFor(name: String, element: Array[Byte]): Unit = withErrorHandling {
- db.set(new String(encode(name.getBytes)), new String(element))
+ db.set(new String(encode(name.getBytes)), byteArrayToString(element))
+ }
+
+ def insertRefStorageFor(name: String, element: String): Unit = withErrorHandling {
+ db.set(new String(encode(name.getBytes)), element)
}
def getRefStorageFor(name: String): Option[Array[Byte]] = withErrorHandling {
db.get(new String(encode(name.getBytes))) match {
case None =>
throw new NoSuchElementException(name + " not present")
- case Some(s) => Some(s.getBytes)
- }
- }
-
- override def incrementAtomically(name: String): Option[Int] = withErrorHandling {
- db.incr(new String(encode(name.getBytes))) match {
- case Some(i) => Some(i)
- case None =>
- throw new IllegalArgumentException(name + " exception in incr")
- }
- }
-
- override def incrementByAtomically(name: String, by: Int): Option[Int] = withErrorHandling {
- db.incrby(new String(encode(name.getBytes)), by) match {
- case Some(i) => Some(i)
- case None =>
- throw new IllegalArgumentException(name + " exception in incrby")
- }
- }
-
- override def decrementAtomically(name: String): Option[Int] = withErrorHandling {
- db.decr(new String(encode(name.getBytes))) match {
- case Some(i) => Some(i)
- case None =>
- throw new IllegalArgumentException(name + " exception in decr")
- }
- }
-
- override def decrementByAtomically(name: String, by: Int): Option[Int] = withErrorHandling {
- db.decrby(new String(encode(name.getBytes)), by) match {
- case Some(i) => Some(i)
- case None =>
- throw new IllegalArgumentException(name + " exception in decrby")
+ case Some(s) => Some(stringToByteArray(s))
}
}
// add to the end of the queue
def enqueue(name: String, item: Array[Byte]): Boolean = withErrorHandling {
- db.rpush(new String(encode(name.getBytes)), new String(item))
+ db.rpush(new String(encode(name.getBytes)), byteArrayToString(item))
}
-
// pop from the front of the queue
def dequeue(name: String): Option[Array[Byte]] = withErrorHandling {
db.lpop(new String(encode(name.getBytes))) match {
case None =>
throw new NoSuchElementException(name + " not present")
- case Some(s) =>
- Some(s.getBytes)
+ case Some(s) => Some(stringToByteArray(s))
}
}
@@ -336,7 +333,7 @@ private [akka] object RedisStorageBackend extends
case None =>
throw new NoSuchElementException("No element at " + start)
case Some(s) =>
- List(s.getBytes)
+ List(stringToByteArray(s))
}
case n =>
db.lrange(new String(encode(name.getBytes)), start, start + count - 1) match {
@@ -344,7 +341,7 @@ private [akka] object RedisStorageBackend extends
throw new NoSuchElementException(
"No element found between " + start + " and " + (start + count - 1))
case Some(es) =>
- es.map(_.get.getBytes)
+ es.map(e => stringToByteArray(e.get))
}
}
}
@@ -359,7 +356,7 @@ private [akka] object RedisStorageBackend extends
// add item to sorted set identified by name
def zadd(name: String, zscore: String, item: Array[Byte]): Boolean = withErrorHandling {
- db.zadd(new String(encode(name.getBytes)), zscore, new String(item)) match {
+ db.zadd(new String(encode(name.getBytes)), zscore, byteArrayToString(item)) match {
case Some(1) => true
case _ => false
}
@@ -367,7 +364,7 @@ private [akka] object RedisStorageBackend extends
// remove item from sorted set identified by name
def zrem(name: String, item: Array[Byte]): Boolean = withErrorHandling {
- db.zrem(new String(encode(name.getBytes)), new String(item)) match {
+ db.zrem(new String(encode(name.getBytes)), byteArrayToString(item)) match {
case Some(1) => true
case _ => false
}
@@ -383,7 +380,7 @@ private [akka] object RedisStorageBackend extends
}
def zscore(name: String, item: Array[Byte]): Option[Float] = withErrorHandling {
- db.zscore(new String(encode(name.getBytes)), new String(item)) match {
+ db.zscore(new String(encode(name.getBytes)), byteArrayToString(item)) match {
case Some(s) => Some(s.toFloat)
case None => None
}
@@ -394,7 +391,7 @@ private [akka] object RedisStorageBackend extends
case None =>
throw new NoSuchElementException(name + " not present")
case Some(s) =>
- s.map(_.get.getBytes)
+ s.map(e => stringToByteArray(e.get))
}
}
@@ -404,7 +401,7 @@ private [akka] object RedisStorageBackend extends
case None =>
throw new NoSuchElementException(name + " not present")
case Some(l) =>
- l.map{ case (elem, score) => (elem.get.getBytes, score.get.toFloat) }
+ l.map{ case (elem, score) => (stringToByteArray(elem.get), score.get.toFloat) }
}
}
diff --git a/akka-persistence/akka-persistence-redis/src/test/scala/RedisInconsistentSizeBugTest.scala b/akka-persistence/akka-persistence-redis/src/test/scala/RedisInconsistentSizeBugTest.scala
new file mode 100644
index 0000000000..22d294c735
--- /dev/null
+++ b/akka-persistence/akka-persistence-redis/src/test/scala/RedisInconsistentSizeBugTest.scala
@@ -0,0 +1,74 @@
+package se.scalablesolutions.akka.persistence.redis
+
+import sbinary._
+import sbinary.Operations._
+import sbinary.DefaultProtocol._
+
+import se.scalablesolutions.akka.actor.{Actor, ActorRef}
+import se.scalablesolutions.akka.config.OneForOneStrategy
+import Actor._
+import se.scalablesolutions.akka.persistence.common.PersistentVector
+import se.scalablesolutions.akka.stm.Transaction.Global._
+import se.scalablesolutions.akka.config.ScalaConfig._
+import se.scalablesolutions.akka.util.Logging
+
+import java.util.{Calendar, Date}
+
+object Serial {
+ implicit object DateFormat extends Format[Date] {
+ def reads(in : Input) = new Date(read[Long](in))
+ def writes(out: Output, value: Date) = write[Long](out, value.getTime)
+ }
+ case class Name(id: Int, name: String, address: String, dateOfBirth: Date, dateDied: Option[Date])
+ implicit val NameFormat: Format[Name] = asProduct5(Name)(Name.unapply(_).get)
+}
+
+case class GETFOO(s: String)
+case class SETFOO(s: String)
+
+object SampleStorage {
+ class RedisSampleStorage extends Actor {
+ self.lifeCycle = Some(LifeCycle(Permanent))
+ val EVENT_MAP = "akka.sample.map"
+
+ private var eventMap = atomic { RedisStorage.getMap(EVENT_MAP) }
+
+ import sbinary._
+ import DefaultProtocol._
+ import Operations._
+ import Serial._
+ import java.util.Calendar
+
+ val dtb = Calendar.getInstance.getTime
+ val n = Name(100, "debasish ghosh", "kolkata", dtb, Some(dtb))
+
+ def receive = {
+ case SETFOO(str) =>
+ atomic {
+ eventMap += (str.getBytes, toByteArray[Name](n))
+ }
+ self.reply(str)
+
+ case GETFOO(str) =>
+ val ev = atomic {
+ eventMap.keySet.size
+ }
+ println("************* " + ev)
+ self.reply(ev)
+ }
+ }
+}
+
+import Serial._
+import SampleStorage._
+
+object Runner {
+ def run {
+ val proc = actorOf[RedisSampleStorage]
+ proc.start
+ val i: Option[String] = proc !! SETFOO("debasish")
+ println("i = " + i)
+ val ev: Option[Int] = proc !! GETFOO("debasish")
+ println(ev)
+ }
+}
diff --git a/akka-persistence/akka-persistence-redis/src/test/scala/RedisStorageBackendSpec.scala b/akka-persistence/akka-persistence-redis/src/test/scala/RedisStorageBackendSpec.scala
index d8d79d7f2a..8a8021b3c5 100644
--- a/akka-persistence/akka-persistence-redis/src/test/scala/RedisStorageBackendSpec.scala
+++ b/akka-persistence/akka-persistence-redis/src/test/scala/RedisStorageBackendSpec.scala
@@ -9,6 +9,11 @@ import org.junit.runner.RunWith
import se.scalablesolutions.akka.serialization.Serializable
import se.scalablesolutions.akka.serialization.Serializer._
+import sbinary._
+import sbinary.Operations._
+import sbinary.DefaultProtocol._
+import java.util.{Calendar, Date}
+
import RedisStorageBackend._
@RunWith(classOf[JUnitRunner])
@@ -39,15 +44,6 @@ class RedisStorageBackendSpec extends
"T-1", "debasish.language".getBytes).get) should equal("java")
}
- /**
- it("should enter a custom object for transaction T-1") {
- val n = Name(100, "debasish", "kolkata")
- // insertMapStorageEntryFor("T-1", "debasish.identity".getBytes, Java.out(n))
- // insertMapStorageEntryFor("T-1", "debasish.identity".getBytes, n.toBytes)
- getMapStorageSizeFor("T-1") should equal(5)
- }
- **/
-
it("should enter key/values for another transaction T-2") {
insertMapStorageEntryFor("T-2", "debasish.age".getBytes, "49".getBytes)
insertMapStorageEntryFor("T-2", "debasish.spouse".getBytes, "paramita".getBytes)
@@ -61,6 +57,21 @@ class RedisStorageBackendSpec extends
}
}
+ describe("Store and query long value in maps") {
+ it("should enter 4 entries in redis for transaction T-1") {
+ val d = Calendar.getInstance.getTime.getTime
+ insertMapStorageEntryFor("T-11", "debasish".getBytes,
+ toByteArray[Long](d))
+
+ getMapStorageSizeFor("T-11") should equal(1)
+ fromByteArray[Long](getMapStorageEntryFor("T-11", "debasish".getBytes).get) should equal(d)
+ }
+
+ it("should remove map storage for T-1 and T2") {
+ removeMapStorageFor("T-11")
+ }
+ }
+
describe("Range query in maps") {
it("should enter 7 entries in redis for transaction T-5") {
insertMapStorageEntryFor("T-5", "trade.refno".getBytes, "R-123".getBytes)
@@ -93,73 +104,61 @@ class RedisStorageBackendSpec extends
}
}
+ describe("Store and query objects in maps") {
+ import NameSerialization._
+ it("should write a Name object and fetch it properly") {
+ val dtb = Calendar.getInstance.getTime
+ val n = Name(100, "debasish ghosh", "kolkata", dtb, Some(dtb))
+
+ insertMapStorageEntryFor("T-31", "debasish".getBytes, toByteArray[Name](n))
+ getMapStorageSizeFor("T-31") should equal(1)
+ fromByteArray[Name](getMapStorageEntryFor("T-31", "debasish".getBytes).get) should equal(n)
+ }
+ it("should remove map storage for T31") {
+ removeMapStorageFor("T-31")
+ }
+ }
+
describe("Store and query in vectors") {
it("should write 4 entries in a vector for transaction T-3") {
insertVectorStorageEntryFor("T-3", "debasish".getBytes)
insertVectorStorageEntryFor("T-3", "maulindu".getBytes)
- val n = Name(100, "debasish", "kolkata")
- // insertVectorStorageEntryFor("T-3", Java.out(n))
- // insertVectorStorageEntryFor("T-3", n.toBytes)
insertVectorStorageEntryFor("T-3", "1200".getBytes)
- getVectorStorageSizeFor("T-3") should equal(3)
+
+ val dt = Calendar.getInstance.getTime.getTime
+ insertVectorStorageEntryFor("T-3", toByteArray[Long](dt))
+ getVectorStorageSizeFor("T-3") should equal(4)
+ fromByteArray[Long](getVectorStorageEntryFor("T-3", 0)) should equal(dt)
+ getVectorStorageSizeFor("T-3") should equal(4)
+ }
+ }
+
+ describe("Store and query objects in vectors") {
+ import NameSerialization._
+ it("should write a Name object and fetch it properly") {
+ val dtb = Calendar.getInstance.getTime
+ val n = Name(100, "debasish ghosh", "kolkata", dtb, Some(dtb))
+
+ insertVectorStorageEntryFor("T-31", toByteArray[Name](n))
+ getVectorStorageSizeFor("T-31") should equal(1)
+ fromByteArray[Name](getVectorStorageEntryFor("T-31", 0)) should equal(n)
}
}
describe("Store and query in ref") {
+ import NameSerialization._
it("should write 4 entries in 4 refs for transaction T-4") {
insertRefStorageFor("T-4", "debasish".getBytes)
insertRefStorageFor("T-4", "maulindu".getBytes)
insertRefStorageFor("T-4", "1200".getBytes)
new String(getRefStorageFor("T-4").get) should equal("1200")
-
- // val n = Name(100, "debasish", "kolkata")
- // insertRefStorageFor("T-4", Java.out(n))
- // insertRefStorageFor("T-4", n.toBytes)
- // Java.in(getRefStorageFor("T-4").get, Some(classOf[Name])).asInstanceOf[Name] should equal(n)
- // n.fromBytes(getRefStorageFor("T-4").get) should equal(n)
}
- }
-
- describe("atomic increment in ref") {
- it("should increment an existing key value by 1") {
- insertRefStorageFor("T-4-1", "1200".getBytes)
- new String(getRefStorageFor("T-4-1").get) should equal("1200")
- incrementAtomically("T-4-1").get should equal(1201)
- }
- it("should create and increment a non-existing key value by 1") {
- incrementAtomically("T-4-2").get should equal(1)
- new String(getRefStorageFor("T-4-2").get) should equal("1")
- }
- it("should increment an existing key value by the amount specified") {
- insertRefStorageFor("T-4-3", "1200".getBytes)
- new String(getRefStorageFor("T-4-3").get) should equal("1200")
- incrementByAtomically("T-4-3", 50).get should equal(1250)
- }
- it("should create and increment a non-existing key value by the amount specified") {
- incrementByAtomically("T-4-4", 20).get should equal(20)
- new String(getRefStorageFor("T-4-4").get) should equal("20")
- }
- }
-
- describe("atomic decrement in ref") {
- it("should decrement an existing key value by 1") {
- insertRefStorageFor("T-4-5", "1200".getBytes)
- new String(getRefStorageFor("T-4-5").get) should equal("1200")
- decrementAtomically("T-4-5").get should equal(1199)
- }
- it("should create and decrement a non-existing key value by 1") {
- decrementAtomically("T-4-6").get should equal(-1)
- new String(getRefStorageFor("T-4-6").get) should equal("-1")
- }
- it("should decrement an existing key value by the amount specified") {
- insertRefStorageFor("T-4-7", "1200".getBytes)
- new String(getRefStorageFor("T-4-7").get) should equal("1200")
- decrementByAtomically("T-4-7", 50).get should equal(1150)
- }
- it("should create and decrement a non-existing key value by the amount specified") {
- decrementByAtomically("T-4-8", 20).get should equal(-20)
- new String(getRefStorageFor("T-4-8").get) should equal("-20")
+ it("should write a Name object and fetch it properly") {
+ val dtb = Calendar.getInstance.getTime
+ val n = Name(100, "debasish ghosh", "kolkata", dtb, Some(dtb))
+ insertRefStorageFor("T-4", toByteArray[Name](n))
+ fromByteArray[Name](getRefStorageFor("T-4").get) should equal(n)
}
}
@@ -185,6 +184,14 @@ class RedisStorageBackendSpec extends
new String(l(1)) should equal("yukihiro matsumoto")
new String(l(2)) should equal("claude shannon")
}
+ it("should write a Name object and fetch it properly") {
+ import NameSerialization._
+ val dtb = Calendar.getInstance.getTime
+ val n = Name(100, "debasish ghosh", "kolkata", dtb, Some(dtb))
+ enqueue("T-5-1", toByteArray[Name](n))
+ fromByteArray[Name](peek("T-5-1", 0, 1).head) should equal(n)
+ fromByteArray[Name](dequeue("T-5-1").get) should equal(n)
+ }
}
describe("store and query in sorted set") {
@@ -221,27 +228,18 @@ class RedisStorageBackendSpec extends
}
}
-case class Name(id: Int, name: String, address: String)
- extends Serializable.SBinary[Name] {
- import sbinary._
- import sbinary.Operations._
- import sbinary.DefaultProtocol._
+object NameSerialization {
+ implicit object DateFormat extends Format[Date] {
+ def reads(in : Input) =
+ new Date(read[Long](in))
- def this() = this(0, null, null)
-
- implicit object NameFormat extends Format[Name] {
- def reads(in : Input) = Name(
- read[Int](in),
- read[String](in),
- read[String](in))
- def writes(out: Output, value: Name) = {
- write[Int](out, value.id)
- write[String](out, value.name)
- write[String](out, value.address)
- }
+ def writes(out: Output, value: Date) =
+ write[Long](out, value.getTime)
}
- def fromBytes(bytes: Array[Byte]) = fromByteArray[Name](bytes)
+ case class Name(id: Int, name: String,
+ address: String, dateOfBirth: Date, dateDied: Option[Date])
- def toBytes: Array[Byte] = toByteArray(this)
+ implicit val NameFormat: Format[Name] =
+ asProduct5(Name)(Name.unapply(_).get)
}
diff --git a/akka-samples/akka-sample-ants/README.md b/akka-samples/akka-sample-ants/README.md
index 8556abe5b2..3c559834cb 100644
--- a/akka-samples/akka-sample-ants/README.md
+++ b/akka-samples/akka-sample-ants/README.md
@@ -3,38 +3,33 @@ Ants
Ants is written by Peter Vlugter.
-Ants is based on the Clojure [ants simulation][ants.clj] by Rich Hickey, and ported to Scala using [Akka][akka] and [Spde][spde].
-
-[ants.clj]:http://clojure.googlegroups.com/web/ants.clj
-[akka]:http://akkasource.org
-[spde]:http://technically.us/spde/
-
+Ants is roughly based on the Clojure [ants simulation][ants.clj] by Rich Hickey, and ported to Scala using [Akka][akka] and [Spde][spde].
Requirements
------------
To build and run Ants you need [Simple Build Tool][sbt] (sbt).
-[sbt]: http://code.google.com/p/simple-build-tool/
-
-
Running
-------
First time, 'sbt update' to get dependencies, then to run Ants use 'sbt run'.
Here is an example. First type 'sbt' to start SBT interactively, the run 'update' and 'run':
+> cd $AKKA_HOME
> % sbt
-> update
+> > update
-> run
+> > project akka-sample-ants
+
+> > run
Notice
------
-Ants is based on the Clojure ants simulation by Rich Hickey.
+Ants is roughly based on the Clojure ants simulation by Rich Hickey.
Copyright (c) Rich Hickey. All rights reserved.
The use and distribution terms for this software are covered by the
@@ -44,4 +39,8 @@ By using this software in any fashion, you are agreeing to be bound by
the terms of this license.
You must not remove this notice, or any other, from this software.
+[ants.clj]:http://clojure.googlegroups.com/web/ants.clj
+[akka]:http://akkasource.org
+[spde]:http://technically.us/spde/
+[sbt]: http://code.google.com/p/simple-build-tool/
[cpl]: http://opensource.org/licenses/cpl1.0.php
\ No newline at end of file
diff --git a/akka-samples/akka-sample-camel/src/main/java/sample/camel/BeanImpl.java b/akka-samples/akka-sample-camel/src/main/java/sample/camel/BeanImpl.java
new file mode 100644
index 0000000000..10437e7624
--- /dev/null
+++ b/akka-samples/akka-sample-camel/src/main/java/sample/camel/BeanImpl.java
@@ -0,0 +1,12 @@
+package sample.camel;
+
+/**
+ * @author Martin Krasser
+ */
+public class BeanImpl implements BeanIntf {
+
+ public String foo(String s) {
+ return "hello " + s;
+ }
+
+}
diff --git a/akka-samples/akka-sample-camel/src/main/java/sample/camel/BeanIntf.java b/akka-samples/akka-sample-camel/src/main/java/sample/camel/BeanIntf.java
new file mode 100644
index 0000000000..a7b2e6e6a4
--- /dev/null
+++ b/akka-samples/akka-sample-camel/src/main/java/sample/camel/BeanIntf.java
@@ -0,0 +1,10 @@
+package sample.camel;
+
+/**
+ * @author Martin Krasser
+ */
+public interface BeanIntf {
+
+ public String foo(String s);
+
+}
diff --git a/akka-samples/akka-sample-camel/src/main/java/sample/camel/ConsumerPojo1.java b/akka-samples/akka-sample-camel/src/main/java/sample/camel/ConsumerPojo1.java
new file mode 100644
index 0000000000..ed29ac30e6
--- /dev/null
+++ b/akka-samples/akka-sample-camel/src/main/java/sample/camel/ConsumerPojo1.java
@@ -0,0 +1,24 @@
+package sample.camel;
+
+import org.apache.camel.Body;
+import org.apache.camel.Header;
+
+import se.scalablesolutions.akka.actor.annotation.consume;
+
+/**
+ * @author Martin Krasser
+ */
+public class ConsumerPojo1 {
+
+ @consume("file:data/input/pojo")
+ public void foo(String body) {
+ System.out.println("Received message:");
+ System.out.println(body);
+ }
+
+ @consume("jetty:http://0.0.0.0:8877/camel/pojo")
+ public String bar(@Body String body, @Header("name") String header) {
+ return String.format("body=%s header=%s", body, header);
+ }
+
+}
diff --git a/akka-samples/akka-sample-camel/src/main/java/sample/camel/ConsumerPojo2.java b/akka-samples/akka-sample-camel/src/main/java/sample/camel/ConsumerPojo2.java
new file mode 100644
index 0000000000..429e6043ad
--- /dev/null
+++ b/akka-samples/akka-sample-camel/src/main/java/sample/camel/ConsumerPojo2.java
@@ -0,0 +1,17 @@
+package sample.camel;
+
+import org.apache.camel.Body;
+import org.apache.camel.Header;
+import se.scalablesolutions.akka.actor.annotation.consume;
+
+/**
+ * @author Martin Krasser
+ */
+public class ConsumerPojo2 {
+
+ @consume("direct:default")
+ public String foo(String body) {
+ return String.format("default: %s", body);
+ }
+
+}
\ No newline at end of file
diff --git a/akka-samples/akka-sample-camel/src/main/java/sample/camel/RemoteConsumerPojo1.java b/akka-samples/akka-sample-camel/src/main/java/sample/camel/RemoteConsumerPojo1.java
new file mode 100644
index 0000000000..ab7e878b0d
--- /dev/null
+++ b/akka-samples/akka-sample-camel/src/main/java/sample/camel/RemoteConsumerPojo1.java
@@ -0,0 +1,18 @@
+package sample.camel;
+
+import org.apache.camel.Body;
+import org.apache.camel.Header;
+
+import se.scalablesolutions.akka.actor.annotation.consume;
+
+/**
+ * @author Martin Krasser
+ */
+public class RemoteConsumerPojo1 {
+
+ @consume("jetty:http://localhost:6644/camel/remote-active-object-1")
+ public String foo(@Body String body, @Header("name") String header) {
+ return String.format("remote1: body=%s header=%s", body, header);
+ }
+
+}
diff --git a/akka-samples/akka-sample-camel/src/main/java/sample/camel/RemoteConsumerPojo2.java b/akka-samples/akka-sample-camel/src/main/java/sample/camel/RemoteConsumerPojo2.java
new file mode 100644
index 0000000000..e982fe5025
--- /dev/null
+++ b/akka-samples/akka-sample-camel/src/main/java/sample/camel/RemoteConsumerPojo2.java
@@ -0,0 +1,17 @@
+package sample.camel;
+
+import org.apache.camel.Body;
+import org.apache.camel.Header;
+import se.scalablesolutions.akka.actor.annotation.consume;
+
+/**
+ * @author Martin Krasser
+ */
+public class RemoteConsumerPojo2 {
+
+ @consume("jetty:http://localhost:6644/camel/remote-active-object-2")
+ public String foo(@Body String body, @Header("name") String header) {
+ return String.format("remote2: body=%s header=%s", body, header);
+ }
+
+}
\ No newline at end of file
diff --git a/akka-samples/akka-sample-camel/src/main/resources/sample-camel-context.xml b/akka-samples/akka-sample-camel/src/main/resources/context-boot.xml
similarity index 100%
rename from akka-samples/akka-sample-camel/src/main/resources/sample-camel-context.xml
rename to akka-samples/akka-sample-camel/src/main/resources/context-boot.xml
diff --git a/akka-samples/akka-sample-camel/src/main/resources/context-standalone.xml b/akka-samples/akka-sample-camel/src/main/resources/context-standalone.xml
new file mode 100644
index 0000000000..a493678817
--- /dev/null
+++ b/akka-samples/akka-sample-camel/src/main/resources/context-standalone.xml
@@ -0,0 +1,12 @@
+
+
+
+
+
diff --git a/akka-samples/akka-sample-camel/src/main/scala/Actors.scala b/akka-samples/akka-sample-camel/src/main/scala/Actors.scala
index 93aaa08ff6..f9feaa7f4d 100644
--- a/akka-samples/akka-sample-camel/src/main/scala/Actors.scala
+++ b/akka-samples/akka-sample-camel/src/main/scala/Actors.scala
@@ -9,10 +9,10 @@ import se.scalablesolutions.akka.util.Logging
* Client-initiated remote actor.
*/
class RemoteActor1 extends RemoteActor("localhost", 7777) with Consumer {
- def endpointUri = "jetty:http://localhost:6644/remote1"
+ def endpointUri = "jetty:http://localhost:6644/camel/remote-actor-1"
protected def receive = {
- case msg: Message => self.reply(Message("hello %s" format msg.body, Map("sender" -> "remote1")))
+ case msg: Message => self.reply(Message("hello %s" format msg.bodyAs[String], Map("sender" -> "remote1")))
}
}
@@ -20,10 +20,10 @@ class RemoteActor1 extends RemoteActor("localhost", 7777) with Consumer {
* Server-initiated remote actor.
*/
class RemoteActor2 extends Actor with Consumer {
- def endpointUri = "jetty:http://localhost:6644/remote2"
+ def endpointUri = "jetty:http://localhost:6644/camel/remote-actor-2"
protected def receive = {
- case msg: Message => self.reply(Message("hello %s" format msg.body, Map("sender" -> "remote2")))
+ case msg: Message => self.reply(Message("hello %s" format msg.bodyAs[String], Map("sender" -> "remote2")))
}
}
@@ -37,14 +37,14 @@ class Producer1 extends Actor with Producer {
}
class Consumer1 extends Actor with Consumer with Logging {
- def endpointUri = "file:data/input"
+ def endpointUri = "file:data/input/actor"
def receive = {
case msg: Message => log.info("received %s" format msg.bodyAs[String])
}
}
-@consume("jetty:http://0.0.0.0:8877/camel/test1")
+@consume("jetty:http://0.0.0.0:8877/camel/default")
class Consumer2 extends Actor {
def receive = {
case msg: Message => self.reply("Hello %s" format msg.bodyAs[String])
diff --git a/akka-samples/akka-sample-camel/src/main/scala/Application1.scala b/akka-samples/akka-sample-camel/src/main/scala/Application1.scala
deleted file mode 100644
index 6dcb437992..0000000000
--- a/akka-samples/akka-sample-camel/src/main/scala/Application1.scala
+++ /dev/null
@@ -1,29 +0,0 @@
-package sample.camel
-
-import se.scalablesolutions.akka.actor.{Actor, ActorRef}
-import se.scalablesolutions.akka.actor.Actor._
-import se.scalablesolutions.akka.camel.Message
-import se.scalablesolutions.akka.remote.RemoteClient
-
-/**
- * @author Martin Krasser
- */
-object Application1 {
-
- //
- // TODO: completion of example
- //
-
- def main(args: Array[String]) {
- implicit val sender: Option[ActorRef] = None
-
- val actor1 = actorOf[RemoteActor1]
- val actor2 = RemoteClient.actorFor("remote2", "localhost", 7777)
-
- actor1.start
-
- println(actor1 !! Message("actor1"))
- println(actor2 !! Message("actor2"))
- }
-
-}
diff --git a/akka-samples/akka-sample-camel/src/main/scala/Boot.scala b/akka-samples/akka-sample-camel/src/main/scala/Boot.scala
index 206661c8fd..f5335e5ecd 100644
--- a/akka-samples/akka-sample-camel/src/main/scala/Boot.scala
+++ b/akka-samples/akka-sample-camel/src/main/scala/Boot.scala
@@ -6,8 +6,8 @@ import org.apache.camel.impl.DefaultCamelContext
import org.apache.camel.spring.spi.ApplicationContextRegistry
import org.springframework.context.support.ClassPathXmlApplicationContext
-import se.scalablesolutions.akka.actor.SupervisorFactory
import se.scalablesolutions.akka.actor.Actor._
+import se.scalablesolutions.akka.actor.{ActiveObject, Supervisor}
import se.scalablesolutions.akka.camel.CamelContextManager
import se.scalablesolutions.akka.config.ScalaConfig._
@@ -16,23 +16,29 @@ import se.scalablesolutions.akka.config.ScalaConfig._
*/
class Boot {
+ // -----------------------------------------------------------------------
// Create CamelContext with Spring-based registry and custom route builder
+ // -----------------------------------------------------------------------
- val context = new ClassPathXmlApplicationContext("/sample-camel-context.xml", getClass)
+ val context = new ClassPathXmlApplicationContext("/context-boot.xml", getClass)
val registry = new ApplicationContextRegistry(context)
+
CamelContextManager.init(new DefaultCamelContext(registry))
CamelContextManager.context.addRoutes(new CustomRouteBuilder)
- // Basic example
+ // -----------------------------------------------------------------------
+ // Basic example (using a supervisor for consumer actors)
+ // -----------------------------------------------------------------------
- val factory = SupervisorFactory(
+ val supervisor = Supervisor(
SupervisorConfig(
RestartStrategy(OneForOne, 3, 100, List(classOf[Exception])),
Supervise(actorOf[Consumer1], LifeCycle(Permanent)) ::
Supervise(actorOf[Consumer2], LifeCycle(Permanent)) :: Nil))
- factory.newInstance.start
+ // -----------------------------------------------------------------------
// Routing example
+ // -----------------------------------------------------------------------
val producer = actorOf[Producer1]
val mediator = actorOf(new Transformer(producer))
@@ -42,7 +48,9 @@ class Boot {
mediator.start
consumer.start
- // Publish subscribe example
+ // -----------------------------------------------------------------------
+ // Publish subscribe examples
+ // -----------------------------------------------------------------------
//
// Cometd example commented out because camel-cometd is broken in Camel 2.3
@@ -60,14 +68,27 @@ class Boot {
//val cometdPublisherBridge = actorOf(new PublisherBridge("jetty:http://0.0.0.0:8877/camel/pub/cometd", cometdPublisher)).start
val jmsPublisherBridge = actorOf(new PublisherBridge("jetty:http://0.0.0.0:8877/camel/pub/jms", jmsPublisher)).start
+ // -----------------------------------------------------------------------
+ // Actor un-publishing and re-publishing example
+ // -----------------------------------------------------------------------
+
actorOf[Consumer4].start // POSTing "stop" to http://0.0.0.0:8877/camel/stop stops and unpublishes this actor
actorOf[Consumer5].start // POSTing any msg to http://0.0.0.0:8877/camel/start starts and published Consumer4 again.
+
+ // -----------------------------------------------------------------------
+ // Active object example
+ // -----------------------------------------------------------------------
+
+ ActiveObject.newInstance(classOf[ConsumerPojo1])
}
+/**
+ * @author Martin Krasser
+ */
class CustomRouteBuilder extends RouteBuilder {
def configure {
val actorUri = "actor:%s" format classOf[Consumer2].getName
- from("jetty:http://0.0.0.0:8877/camel/test2").to(actorUri)
+ from("jetty:http://0.0.0.0:8877/camel/custom").to(actorUri)
from("direct:welcome").process(new Processor() {
def process(exchange: Exchange) {
exchange.getOut.setBody("Welcome %s" format exchange.getIn.getBody)
diff --git a/akka-samples/akka-sample-camel/src/main/scala/ClientApplication.scala b/akka-samples/akka-sample-camel/src/main/scala/ClientApplication.scala
new file mode 100644
index 0000000000..467d715360
--- /dev/null
+++ b/akka-samples/akka-sample-camel/src/main/scala/ClientApplication.scala
@@ -0,0 +1,33 @@
+package sample.camel
+
+import se.scalablesolutions.akka.actor.Actor._
+import se.scalablesolutions.akka.actor.{ActiveObject, Actor, ActorRef}
+import se.scalablesolutions.akka.camel.Message
+import se.scalablesolutions.akka.remote.RemoteClient
+
+/**
+ * @author Martin Krasser
+ */
+object ClientApplication {
+
+ //
+ // TODO: completion of example
+ //
+
+ def main(args: Array[String]) {
+ val actor1 = actorOf[RemoteActor1]
+ val actor2 = RemoteClient.actorFor("remote2", "localhost", 7777)
+
+ val actobj1 = ActiveObject.newRemoteInstance(classOf[RemoteConsumerPojo1], "localhost", 7777)
+ //val actobj2 = TODO: create reference to server-managed active object (RemoteConsumerPojo2)
+
+ actor1.start
+
+ println(actor1 !! Message("actor1")) // activates and publishes actor remotely
+ println(actor2 !! Message("actor2")) // actor already activated and published remotely
+
+ println(actobj1.foo("x", "y")) // activates and publishes active object methods remotely
+ // ...
+ }
+
+}
diff --git a/akka-samples/akka-sample-camel/src/main/scala/Application2.scala b/akka-samples/akka-sample-camel/src/main/scala/ServerApplication.scala
similarity index 83%
rename from akka-samples/akka-sample-camel/src/main/scala/Application2.scala
rename to akka-samples/akka-sample-camel/src/main/scala/ServerApplication.scala
index 8a789d13bf..8f53bbb866 100644
--- a/akka-samples/akka-sample-camel/src/main/scala/Application2.scala
+++ b/akka-samples/akka-sample-camel/src/main/scala/ServerApplication.scala
@@ -1,13 +1,13 @@
package sample.camel
-import se.scalablesolutions.akka.camel.service.CamelService
-import se.scalablesolutions.akka.remote.RemoteNode
import se.scalablesolutions.akka.actor.Actor._
+import se.scalablesolutions.akka.camel.CamelService
+import se.scalablesolutions.akka.remote.RemoteNode
/**
* @author Martin Krasser
*/
-object Application2 {
+object ServerApplication {
//
// TODO: completion of example
diff --git a/akka-samples/akka-sample-camel/src/main/scala/StandaloneApplication.scala b/akka-samples/akka-sample-camel/src/main/scala/StandaloneApplication.scala
new file mode 100644
index 0000000000..ebfabe9ce2
--- /dev/null
+++ b/akka-samples/akka-sample-camel/src/main/scala/StandaloneApplication.scala
@@ -0,0 +1,60 @@
+package sample.camel
+
+import org.apache.camel.impl.{DefaultCamelContext, SimpleRegistry}
+import org.apache.camel.builder.RouteBuilder
+
+import se.scalablesolutions.akka.camel.{CamelService, CamelContextManager}
+import se.scalablesolutions.akka.actor.{ActorRegistry, ActiveObject}
+
+/**
+ * @author Martin Krasser
+ */
+object PlainApplication {
+ def main(args: Array[String]) {
+ import CamelContextManager.context
+
+ // 'externally' register active objects
+ val registry = new SimpleRegistry
+ registry.put("pojo1", ActiveObject.newInstance(classOf[BeanIntf], new BeanImpl))
+ registry.put("pojo2", ActiveObject.newInstance(classOf[BeanImpl]))
+
+ // customize CamelContext
+ CamelContextManager.init(new DefaultCamelContext(registry))
+ CamelContextManager.context.addRoutes(new PlainApplicationRoute)
+
+ // start CamelService
+ val camelService = CamelService.newInstance
+ camelService.load
+
+ // 'internally' register active object (requires CamelService)
+ ActiveObject.newInstance(classOf[ConsumerPojo2])
+
+ // access 'externally' registered active objects with active-object component
+ assert("hello msg1" == context.createProducerTemplate.requestBody("direct:test1", "msg1"))
+ assert("hello msg2" == context.createProducerTemplate.requestBody("direct:test2", "msg2"))
+
+ // internal registration is done in background. Wait a bit ...
+ Thread.sleep(1000)
+
+ // access 'internally' (automatically) registered active-objects
+ // (see @consume annotation value at ConsumerPojo2.foo method)
+ assert("default: msg3" == context.createProducerTemplate.requestBody("direct:default", "msg3"))
+
+ // shutdown CamelService
+ camelService.unload
+
+ // shutdown all (internally) created actors
+ ActorRegistry.shutdownAll
+ }
+}
+
+class PlainApplicationRoute extends RouteBuilder {
+ def configure = {
+ from("direct:test1").to("active-object:pojo1?method=foo")
+ from("direct:test2").to("active-object:pojo2?method=foo")
+ }
+}
+
+object SpringApplication {
+ // TODO
+}
diff --git a/akka-samples/akka-sample-rest-scala/src/main/scala/SimpleService.scala b/akka-samples/akka-sample-rest-scala/src/main/scala/SimpleService.scala
index f31e10861d..90e208d7e1 100644
--- a/akka-samples/akka-sample-rest-scala/src/main/scala/SimpleService.scala
+++ b/akka-samples/akka-sample-rest-scala/src/main/scala/SimpleService.scala
@@ -123,11 +123,15 @@ class PersistentSimpleServiceActor extends Transactor {
def receive = {
case "Tick" => if (hasStartedTicking) {
val bytes = storage.get(KEY.getBytes).get
- val counter = ByteBuffer.wrap(bytes).getInt
- storage.put(KEY.getBytes, ByteBuffer.allocate(4).putInt(counter + 1).array)
+ val counter = Integer.parseInt(new String(bytes, "UTF8"))
+ storage.put(KEY.getBytes, (counter + 1).toString.getBytes )
+// val bytes = storage.get(KEY.getBytes).get
+// val counter = ByteBuffer.wrap(bytes).getInt
+// storage.put(KEY.getBytes, ByteBuffer.allocate(4).putInt(counter + 1).array)
self.reply(Tick:{counter + 1})
} else {
- storage.put(KEY.getBytes, Array(0.toByte))
+ storage.put(KEY.getBytes, "0".getBytes)
+// storage.put(KEY.getBytes, Array(0.toByte))
hasStartedTicking = true
self.reply(Tick: 0)
}
diff --git a/config/akka-reference.conf b/config/akka-reference.conf
index 25aa0998a3..2ac6bf42f1 100644
--- a/config/akka-reference.conf
+++ b/config/akka-reference.conf
@@ -15,7 +15,7 @@
- version = "0.9"
+ version = "0.10"
# FQN (Fully Qualified Name) to the class doing initial active object/actor
# supervisor bootstrap, should be defined in default constructor
diff --git a/project/build.properties b/project/build.properties
index 30b2ba4ef6..cc8e376f1b 100644
--- a/project/build.properties
+++ b/project/build.properties
@@ -1,6 +1,6 @@
project.organization=se.scalablesolutions.akka
project.name=akka
-project.version=0.9
+project.version=0.10
scala.version=2.8.0.RC3
sbt.version=0.7.4
def.scala.version=2.7.7
diff --git a/project/build/AkkaProject.scala b/project/build/AkkaProject.scala
index 90e9a3124e..0745dcb08a 100644
--- a/project/build/AkkaProject.scala
+++ b/project/build/AkkaProject.scala
@@ -48,7 +48,7 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) with AkkaWrappe
// must be resolved from a ModuleConfiguration. This will result in a significant acceleration of the update action.
// Therefore, if repositories are defined, this must happen as def, not as val.
// -------------------------------------------------------------------------------------------------------------------
- val embeddedRepo = "Embedded Repo" at (info.projectPath / "embedded-repo").asURL.toString // Fast enough => No need for a module configuration here!
+ val embeddedRepo = "Embedded Repo" at (info.projectPath / "embedded-repo").asURL.toString
val scalaTestModuleConfig = ModuleConfiguration("org.scalatest", ScalaToolsSnapshots)
def guiceyFruitRepo = "GuiceyFruit Repo" at "http://guiceyfruit.googlecode.com/svn/repo/releases/"
val guiceyFruitModuleConfig = ModuleConfiguration("org.guiceyfruit", guiceyFruitRepo)
@@ -192,7 +192,7 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) with AkkaWrappe
val guicey = "org.guiceyfruit" % "guice-all" % "2.0" % "compile"
val aopalliance = "aopalliance" % "aopalliance" % "1.0" % "compile"
val protobuf = "com.google.protobuf" % "protobuf-java" % "2.3.0" % "compile"
- val multiverse = "org.multiverse" % "multiverse-alpha" % MULTIVERSE_VERSION % "compile"
+ val multiverse = "org.multiverse" % "multiverse-alpha" % MULTIVERSE_VERSION % "compile" intransitive()
val jgroups = "jgroups" % "jgroups" % "2.9.0.GA" % "compile"
// testing
@@ -243,6 +243,7 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) with AkkaWrappe
class AkkaRedisProject(info: ProjectInfo) extends AkkaDefaultProject(info, distPath) {
val redis = "com.redis" % "redisclient" % "2.8.0.RC3-1.4-SNAPSHOT" % "compile"
+ val commons_codec = "commons-codec" % "commons-codec" % "1.4" % "compile"
override def testOptions = TestFilter((name: String) => name.endsWith("Test")) :: Nil
}
@@ -367,8 +368,8 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) with AkkaWrappe
def removeDupEntries(paths: PathFinder) =
Path.lazyPathFinder {
val mapped = paths.get map { p => (p.relativePath, p) }
- (Map() ++ mapped).values.toList
- }
+ (Map() ++ mapped).values.toList
+ }
def allArtifacts = {
Path.fromFile(buildScalaInstance.libraryJar) +++