This commit is contained in:
Andreas Kollegger 2010-06-17 08:41:28 -04:00
commit 13cc2de82f
61 changed files with 1884 additions and 748 deletions

View file

@ -259,7 +259,7 @@ object AMQP {
case object Stop extends AMQPMessage
private[akka] case class UnregisterMessageConsumerListener(consumer: MessageConsumerListener) extends InternalAMQPMessage
case class UnregisterMessageConsumerListener(consumer: MessageConsumerListener) extends InternalAMQPMessage
private[akka] case class Reconnect(delay: Long) extends InternalAMQPMessage

View file

@ -0,0 +1 @@
class=se.scalablesolutions.akka.camel.component.ActiveObjectComponent

View file

@ -4,9 +4,12 @@
package se.scalablesolutions.akka.camel
import java.util.Map
import org.apache.camel.{ProducerTemplate, CamelContext}
import org.apache.camel.impl.DefaultCamelContext
import se.scalablesolutions.akka.camel.component.ActiveObjectComponent
import se.scalablesolutions.akka.util.Logging
/**
@ -26,7 +29,18 @@ trait CamelContextLifecycle extends Logging {
private var _started = false
/**
* Returns the managed CamelContext.
* Camel component for accessing active objects.
*/
private[camel] var activeObjectComponent: ActiveObjectComponent = _
/**
* Registry in which active objects are TEMPORARILY registered during
* creation of Camel routes to active objects.
*/
private[camel] var activeObjectRegistry: Map[String, AnyRef] = _
/**
* Returns the managed CamelContext.
*/
protected def context: CamelContext = _context
@ -78,10 +92,16 @@ trait CamelContextLifecycle extends Logging {
* Initializes this lifecycle object with the given CamelContext. For the passed
* CamelContext stream-caching is enabled. If applications want to disable stream-
* caching they can do so after this method returned and prior to calling start.
* This method also registers a new
* {@link se.scalablesolutions.akka.camel.component.ActiveObjectComponent} at
* <code>context</code> under a name defined by ActiveObjectComponent.InternalSchema.
*/
def init(context: CamelContext) {
this.activeObjectComponent = new ActiveObjectComponent
this.activeObjectRegistry = activeObjectComponent.activeObjectRegistry
this.context = context
this.context.setStreamCaching(true)
this.context.addComponent(ActiveObjectComponent.InternalSchema, activeObjectComponent)
this.template = context.createProducerTemplate
_initialized = true
log.info("Camel context initialized")
@ -90,7 +110,7 @@ trait CamelContextLifecycle extends Logging {
/**
* Makes a global CamelContext and ProducerTemplate accessible to applications. The lifecycle
* of these objects is managed by se.scalablesolutions.akka.camel.service.CamelService.
* of these objects is managed by se.scalablesolutions.akka.camel.CamelService.
*/
object CamelContextManager extends CamelContextLifecycle {
override def context: CamelContext = super.context

View file

@ -0,0 +1,100 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.camel
import se.scalablesolutions.akka.actor.Actor._
import se.scalablesolutions.akka.actor.{AspectInitRegistry, ActorRegistry}
import se.scalablesolutions.akka.util.{Bootable, Logging}
/**
* Used by applications (and the Kernel) to publish consumer actors and active objects via
* Camel endpoints and to manage the life cycle of a a global CamelContext which can be
* accessed via <code>se.scalablesolutions.akka.camel.CamelContextManager.context</code>.
*
* @author Martin Krasser
*/
trait CamelService extends Bootable with Logging {
import CamelContextManager._
private[camel] val consumerPublisher = actorOf[ConsumerPublisher]
private[camel] val publishRequestor = actorOf[PublishRequestor]
// add listener for actor registration events
ActorRegistry.addListener(publishRequestor)
// add listener for AspectInit registration events
AspectInitRegistry.addListener(publishRequestor)
/**
* Starts the CamelService. Any started actor that is a consumer actor will be (asynchronously)
* published as Camel endpoint. Consumer actors that are started after this method returned will
* be published as well. Actor publishing is done asynchronously. A started (loaded) CamelService
* also publishes <code>@consume</code> annotated methods of active objects that have been created
* with <code>ActiveObject.newInstance(..)</code> (and <code>ActiveObject.newInstance(..)</code>
* on a remote node).
*/
abstract override def onLoad = {
super.onLoad
// Only init and start if not already done by application
if (!initialized) init
if (!started) start
// start actor that exposes consumer actors and active objects via Camel endpoints
consumerPublisher.start
// init publishRequestor so that buffered and future events are delivered to consumerPublisher
publishRequestor ! PublishRequestorInit(consumerPublisher)
}
/**
* Stops the CamelService.
*/
abstract override def onUnload = {
ActorRegistry.removeListener(publishRequestor)
AspectInitRegistry.removeListener(publishRequestor)
consumerPublisher.stop
stop
super.onUnload
}
/**
* Starts the CamelService.
*
* @see onLoad
*/
def load = onLoad
/**
* Stops the CamelService.
*
* @see onUnload
*/
def unload = onUnload
}
/**
* CamelService companion object used by standalone applications to create their own
* CamelService instance.
*
* @author Martin Krasser
*/
object CamelService {
/**
* Creates a new CamelService instance.
*/
def newInstance: CamelService = new DefaultCamelService
}
/**
* Default CamelService implementation to be created in Java applications with
* <pre>
* CamelService service = new DefaultCamelService()
* </pre>
*/
class DefaultCamelService extends CamelService {
}

View file

@ -0,0 +1,322 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.camel
import collection.mutable.ListBuffer
import java.io.InputStream
import java.lang.reflect.Method
import java.util.concurrent.CountDownLatch
import org.apache.camel.builder.RouteBuilder
import se.scalablesolutions.akka.actor._
import se.scalablesolutions.akka.actor.annotation.consume
import se.scalablesolutions.akka.camel.component.ActiveObjectComponent
import se.scalablesolutions.akka.util.Logging
/**
* @author Martin Krasser
*/
private[camel] object ConsumerPublisher extends Logging {
/**
* Creates a route to the registered consumer actor.
*/
def handleConsumerRegistered(event: ConsumerRegistered) {
CamelContextManager.context.addRoutes(new ConsumerActorRoute(event.uri, event.id, event.uuid))
log.info("published actor %s at endpoint %s" format (event.actorRef, event.uri))
}
/**
* Stops route to the already un-registered consumer actor.
*/
def handleConsumerUnregistered(event: ConsumerUnregistered) {
CamelContextManager.context.stopRoute(event.id)
log.info("unpublished actor %s from endpoint %s" format (event.actorRef, event.uri))
}
/**
* Creates a route to an active object method.
*/
def handleConsumerMethodRegistered(event: ConsumerMethodRegistered) {
val targetMethod = event.method.getName
val objectId = "%s_%s" format (event.init.actorRef.uuid, targetMethod)
CamelContextManager.activeObjectRegistry.put(objectId, event.activeObject)
CamelContextManager.context.addRoutes(new ConsumerMethodRoute(event.uri, objectId, targetMethod))
log.info("published method %s of %s at endpoint %s" format (targetMethod, event.activeObject, event.uri))
}
}
/**
* Actor that publishes consumer actors and active object methods at Camel endpoints.
* The Camel context used for publishing is CamelContextManager.context. This actor
* accepts messages of type
* se.scalablesolutions.akka.camel.service.ConsumerRegistered,
* se.scalablesolutions.akka.camel.service.ConsumerMethodRegistered and
* se.scalablesolutions.akka.camel.service.ConsumerUnregistered.
*
* @author Martin Krasser
*/
private[camel] class ConsumerPublisher extends Actor {
import ConsumerPublisher._
@volatile private var latch = new CountDownLatch(0)
/**
* Adds a route to the actor identified by a Publish message to the global CamelContext.
*/
protected def receive = {
case r: ConsumerRegistered => {
handleConsumerRegistered(r)
latch.countDown // needed for testing only.
}
case u: ConsumerUnregistered => {
handleConsumerUnregistered(u)
latch.countDown // needed for testing only.
}
case d: ConsumerMethodRegistered => {
handleConsumerMethodRegistered(d)
latch.countDown // needed for testing only.
}
case SetExpectedMessageCount(num) => {
// needed for testing only.
latch = new CountDownLatch(num)
self.reply(latch)
}
case _ => { /* ignore */}
}
}
/**
* Command message used For testing-purposes only.
*/
private[camel] case class SetExpectedMessageCount(num: Int)
/**
* Defines an abstract route to a target which is either an actor or an active object method..
*
* @param endpointUri endpoint URI of the consumer actor or active object method.
* @param id actor identifier or active object identifier (registry key).
*
* @author Martin Krasser
*/
private[camel] abstract class ConsumerRoute(endpointUri: String, id: String) extends RouteBuilder {
// TODO: make conversions configurable
private val bodyConversions = Map(
"file" -> classOf[InputStream]
)
def configure = {
val schema = endpointUri take endpointUri.indexOf(":") // e.g. "http" from "http://whatever/..."
bodyConversions.get(schema) match {
case Some(clazz) => from(endpointUri).routeId(id).convertBodyTo(clazz).to(targetUri)
case None => from(endpointUri).routeId(id).to(targetUri)
}
}
protected def targetUri: String
}
/**
* Defines the route to a consumer actor.
*
* @param endpointUri endpoint URI of the consumer actor
* @param id actor identifier
* @param uuid <code>true</code> if <code>id</code> refers to Actor.uuid, <code>false</code> if
* <code>id</code> refers to Actor.getId.
*
* @author Martin Krasser
*/
private[camel] class ConsumerActorRoute(endpointUri: String, id: String, uuid: Boolean) extends ConsumerRoute(endpointUri, id) {
protected override def targetUri = (if (uuid) "actor:uuid:%s" else "actor:id:%s") format id
}
/**
* Defines the route to an active object method..
*
* @param endpointUri endpoint URI of the consumer actor method
* @param id active object identifier
* @param method name of the method to invoke.
*
* @author Martin Krasser
*/
private[camel] class ConsumerMethodRoute(val endpointUri: String, id: String, method: String) extends ConsumerRoute(endpointUri, id) {
protected override def targetUri = "%s:%s?method=%s" format (ActiveObjectComponent.InternalSchema, id, method)
}
/**
* A registration listener that triggers publication of consumer actors and active object
* methods as well as un-publication of consumer actors. This actor needs to be initialized
* with a <code>PublishRequestorInit</code> command message for obtaining a reference to
* a <code>publisher</code> actor. Before initialization it buffers all outbound messages
* and delivers them to the <code>publisher</code> when receiving a
* <code>PublishRequestorInit</code> message. After initialization, outbound messages are
* delivered directly without buffering.
*
* @see PublishRequestorInit
*
* @author Martin Krasser
*/
private[camel] class PublishRequestor extends Actor {
private val events = ListBuffer[ConsumerEvent]()
private var publisher: Option[ActorRef] = None
protected def receive = {
case ActorRegistered(actor) =>
for (event <- ConsumerRegistered.forConsumer(actor)) deliverCurrentEvent(event)
case ActorUnregistered(actor) =>
for (event <- ConsumerUnregistered.forConsumer(actor)) deliverCurrentEvent(event)
case AspectInitRegistered(proxy, init) =>
for (event <- ConsumerMethodRegistered.forConsumer(proxy, init)) deliverCurrentEvent(event)
case PublishRequestorInit(pub) => {
publisher = Some(pub)
deliverBufferedEvents
}
case _ => { /* ignore */ }
}
private def deliverCurrentEvent(event: ConsumerEvent) = {
publisher match {
case Some(pub) => pub ! event
case None => events += event
}
}
private def deliverBufferedEvents = {
for (event <- events) deliverCurrentEvent(event)
events.clear
}
}
/**
* Command message to initialize a PublishRequestor to use <code>consumerPublisher</code>
* for publishing actors or active object methods.
*/
private[camel] case class PublishRequestorInit(consumerPublisher: ActorRef)
/**
* A consumer (un)registration event.
*
* @author Martin Krasser
*/
private[camel] sealed trait ConsumerEvent
/**
* Event indicating that a consumer actor has been registered at the actor registry.
*
* @param actorRef actor reference
* @param uri endpoint URI of the consumer actor
* @param id actor identifier
* @param uuid <code>true</code> if <code>id</code> is the actor's uuid, <code>false</code> if
* <code>id</code> is the actor's id.
*
* @author Martin Krasser
*/
private[camel] case class ConsumerRegistered(actorRef: ActorRef, uri: String, id: String, uuid: Boolean) extends ConsumerEvent
/**
* Event indicating that a consumer actor has been unregistered from the actor registry.
*
* @param actorRef actor reference
* @param uri endpoint URI of the consumer actor
* @param id actor identifier
* @param uuid <code>true</code> if <code>id</code> is the actor's uuid, <code>false</code> if
* <code>id</code> is the actor's id.
*
* @author Martin Krasser
*/
private[camel] case class ConsumerUnregistered(actorRef: ActorRef, uri: String, id: String, uuid: Boolean) extends ConsumerEvent
/**
* Event indicating that an active object proxy has been created for a POJO. For each
* <code>@consume</code> annotated POJO method a separate instance of this class is
* created.
*
* @param activeObject active object (proxy).
* @param init
* @param uri endpoint URI of the active object method
* @param method method to be published.
*
* @author Martin Krasser
*/
private[camel] case class ConsumerMethodRegistered(activeObject: AnyRef, init: AspectInit, uri: String, method: Method) extends ConsumerEvent
/**
* @author Martin Krasser
*/
private[camel] object ConsumerRegistered {
/**
* Optionally creates an ConsumerRegistered event message for a consumer actor or None if
* <code>actorRef</code> is not a consumer actor.
*/
def forConsumer(actorRef: ActorRef): Option[ConsumerRegistered] = actorRef match {
case ConsumerDescriptor(ref, uri, id, uuid) => Some(ConsumerRegistered(ref, uri, id, uuid))
case _ => None
}
}
/**
* @author Martin Krasser
*/
private[camel] object ConsumerUnregistered {
/**
* Optionally creates an ConsumerUnregistered event message for a consumer actor or None if
* <code>actorRef</code> is not a consumer actor.
*/
def forConsumer(actorRef: ActorRef): Option[ConsumerUnregistered] = actorRef match {
case ConsumerDescriptor(ref, uri, id, uuid) => Some(ConsumerUnregistered(ref, uri, id, uuid))
case _ => None
}
}
/**
* @author Martin Krasser
*/
private[camel] object ConsumerMethodRegistered {
/**
* Creates a list of ConsumerMethodRegistered event messages for an active object or an empty
* list if the active object is a proxy for an remote active object or the active object doesn't
* have any <code>@consume</code> annotated methods.
*/
def forConsumer(activeObject: AnyRef, init: AspectInit): List[ConsumerMethodRegistered] = {
// TODO: support consumer annotation inheritance
// - visit overridden methods in superclasses
// - visit implemented method declarations in interfaces
if (init.remoteAddress.isDefined) Nil // let remote node publish active object methods on endpoints
else for (m <- activeObject.getClass.getMethods.toList; if (m.isAnnotationPresent(classOf[consume])))
yield ConsumerMethodRegistered(activeObject, init, m.getAnnotation(classOf[consume]).value, m)
}
}
/**
* Describes a consumer actor with elements that are relevant for publishing an actor at a
* Camel endpoint (or unpublishing an actor from an endpoint).
*
* @author Martin Krasser
*/
private[camel] object ConsumerDescriptor {
/**
* An extractor that optionally creates a 4-tuple from a consumer actor reference containing
* the actor reference itself, endpoint URI, identifier and a hint whether the identifier
* is the actor uuid or actor id. If <code>actorRef</code> doesn't reference a consumer actor,
* None is returned.
*/
def unapply(actorRef: ActorRef): Option[(ActorRef, String, String, Boolean)] =
unapplyConsumerInstance(actorRef) orElse unapplyConsumeAnnotated(actorRef)
private def unapplyConsumeAnnotated(actorRef: ActorRef): Option[(ActorRef, String, String, Boolean)] = {
val annotation = actorRef.actorClass.getAnnotation(classOf[consume])
if (annotation eq null) None
else if (actorRef.remoteAddress.isDefined) None
else Some((actorRef, annotation.value, actorRef.id, false))
}
private def unapplyConsumerInstance(actorRef: ActorRef): Option[(ActorRef, String, String, Boolean)] =
if (!actorRef.actor.isInstanceOf[Consumer]) None
else if (actorRef.remoteAddress.isDefined) None
else Some((actorRef, actorRef.actor.asInstanceOf[Consumer].endpointUri, actorRef.uuid, true))
}

View file

@ -160,7 +160,7 @@ trait Producer { this: Actor =>
*
* @author Martin Krasser
*/
class ProducerResponseSender(
private[camel] class ProducerResponseSender(
headers: Map[String, Any],
sender: Option[ActorRef],
senderFuture: Option[CompletableFuture[Any]],

View file

@ -0,0 +1,108 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.camel.component
import java.util.Map
import java.util.concurrent.ConcurrentHashMap
import org.apache.camel.CamelContext
import org.apache.camel.component.bean._
/**
* @author Martin Krasser
*/
object ActiveObjectComponent {
/**
* Default schema name for active object endpoint URIs.
*/
val InternalSchema = "active-object-internal"
}
/**
* Camel component for exchanging messages with active objects. This component
* tries to obtain the active object from the <code>activeObjectRegistry</code>
* first. If it's not there it tries to obtain it from the CamelContext's registry.
*
* @see org.apache.camel.component.bean.BeanComponent
*
* @author Martin Krasser
*/
class ActiveObjectComponent extends BeanComponent {
val activeObjectRegistry = new ConcurrentHashMap[String, AnyRef]
/**
* Creates a {@link org.apache.camel.component.bean.BeanEndpoint} with a custom
* bean holder that uses <code>activeObjectRegistry</code> for getting access to
* active objects (beans).
*
* @see se.scalablesolutions.akka.camel.component.ActiveObjectHolder
*/
override def createEndpoint(uri: String, remaining: String, parameters: Map[String, AnyRef]) = {
val endpoint = new BeanEndpoint(uri, this)
endpoint.setBeanName(remaining)
endpoint.setBeanHolder(createBeanHolder(remaining))
setProperties(endpoint.getProcessor, parameters)
endpoint
}
private def createBeanHolder(beanName: String) =
new ActiveObjectHolder(activeObjectRegistry, getCamelContext, beanName).createCacheHolder
}
/**
* {@link org.apache.camel.component.bean.BeanHolder} implementation that uses a custom
* registry for getting access to active objects.
*
* @author Martin Krasser
*/
class ActiveObjectHolder(activeObjectRegistry: Map[String, AnyRef], context: CamelContext, name: String)
extends RegistryBean(context, name) {
/**
* Returns an {@link se.scalablesolutions.akka.camel.component.ActiveObjectInfo} instance.
*/
override def getBeanInfo: BeanInfo =
new ActiveObjectInfo(getContext, getBean.getClass, getParameterMappingStrategy)
/**
* Obtains an active object from <code>activeObjectRegistry</code>.
*/
override def getBean: AnyRef = {
val bean = activeObjectRegistry.get(getName)
if (bean eq null) super.getBean else bean
}
}
/**
* Provides active object meta information.
*
* @author Martin Krasser
*/
class ActiveObjectInfo(context: CamelContext, clazz: Class[_], strategy: ParameterMappingStrategy)
extends BeanInfo(context, clazz, strategy) {
/**
* Introspects AspectWerkz proxy classes.
*
* @param clazz AspectWerkz proxy class.
*/
protected override def introspect(clazz: Class[_]): Unit = {
// TODO: fix target class detection in BeanInfo.introspect(Class)
// Camel assumes that classes containing a '$$' in the class name
// are classes generated with CGLIB. This conflicts with proxies
// created from interfaces with AspectWerkz. Once the fix is in
// place this method can be removed.
for (method <- clazz.getDeclaredMethods) {
if (isValidMethod(clazz, method)) {
introspect(clazz, method)
}
}
val superclass = clazz.getSuperclass
if (superclass != null && !superclass.equals(classOf[AnyRef])) {
introspect(superclass)
}
}
}

View file

@ -1,86 +0,0 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.camel.service
import se.scalablesolutions.akka.actor.ActorRegistry
import se.scalablesolutions.akka.actor.Actor._
import se.scalablesolutions.akka.camel.CamelContextManager
import se.scalablesolutions.akka.util.{Bootable, Logging}
/**
* Used by applications (and the Kernel) to publish consumer actors via Camel
* endpoints and to manage the life cycle of a a global CamelContext which can
* be accessed via se.scalablesolutions.akka.camel.CamelContextManager.
*
* @author Martin Krasser
*/
trait CamelService extends Bootable with Logging {
import CamelContextManager._
private[camel] val consumerPublisher = actorOf[ConsumerPublisher]
private[camel] val publishRequestor = actorOf(new PublishRequestor(consumerPublisher))
/**
* Starts the CamelService. Any started actor that is a consumer actor will be (asynchronously)
* published as Camel endpoint. Consumer actors that are started after this method returned will
* be published as well. Actor publishing is done asynchronously.
*/
abstract override def onLoad = {
super.onLoad
// Only init and start if not already done by application
if (!initialized) init
if (!started) start
// start actor that exposes consumer actors via Camel endpoints
consumerPublisher.start
// add listener for actor registration events
ActorRegistry.addRegistrationListener(publishRequestor.start)
// publish already registered consumer actors
for (actor <- ActorRegistry.actors; event <- ConsumerRegistered.forConsumer(actor)) consumerPublisher ! event
}
/**
* Stops the CamelService.
*/
abstract override def onUnload = {
ActorRegistry.removeRegistrationListener(publishRequestor)
publishRequestor.stop
consumerPublisher.stop
stop
super.onUnload
}
/**
* Starts the CamelService.
*
* @see onLoad
*/
def load = onLoad
/**
* Stops the CamelService.
*
* @see onUnload
*/
def unload = onUnload
}
/**
* CamelService companion object used by standalone applications to create their own
* CamelService instance.
*
* @author Martin Krasser
*/
object CamelService {
/**
* Creates a new CamelService instance.
*/
def newInstance: CamelService = new CamelService {}
}

View file

@ -1,210 +0,0 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.camel.service
import java.io.InputStream
import java.util.concurrent.CountDownLatch
import org.apache.camel.builder.RouteBuilder
import se.scalablesolutions.akka.actor.{ActorUnregistered, ActorRegistered, Actor, ActorRef}
import se.scalablesolutions.akka.actor.annotation.consume
import se.scalablesolutions.akka.camel.{Consumer, CamelContextManager}
import se.scalablesolutions.akka.util.Logging
/**
* Actor that publishes consumer actors as Camel endpoints at the CamelContext managed
* by se.scalablesolutions.akka.camel.CamelContextManager. It accepts messages of type
* se.scalablesolutions.akka.camel.service.ConsumerRegistered and
* se.scalablesolutions.akka.camel.service.ConsumerUnregistered.
*
* @author Martin Krasser
*/
class ConsumerPublisher extends Actor with Logging {
@volatile private var publishLatch = new CountDownLatch(0)
@volatile private var unpublishLatch = new CountDownLatch(0)
/**
* Adds a route to the actor identified by a Publish message to the global CamelContext.
*/
protected def receive = {
case r: ConsumerRegistered => {
handleConsumerRegistered(r)
publishLatch.countDown // needed for testing only.
}
case u: ConsumerUnregistered => {
handleConsumerUnregistered(u)
unpublishLatch.countDown // needed for testing only.
}
case _ => { /* ignore */}
}
/**
* Sets the expected number of actors to be published. Used for testing only.
*/
private[camel] def expectPublishCount(count: Int): Unit =
publishLatch = new CountDownLatch(count)
/**
* Sets the expected number of actors to be unpublished. Used for testing only.
*/
private[camel] def expectUnpublishCount(count: Int): Unit =
unpublishLatch = new CountDownLatch(count)
/**
* Waits for the expected number of actors to be published. Used for testing only.
*/
private[camel] def awaitPublish = publishLatch.await
/**
* Waits for the expected number of actors to be unpublished. Used for testing only.
*/
private[camel] def awaitUnpublish = unpublishLatch.await
/**
* Creates a route to the registered consumer actor.
*/
private def handleConsumerRegistered(event: ConsumerRegistered) {
CamelContextManager.context.addRoutes(new ConsumerRoute(event.uri, event.id, event.uuid))
log.info("published actor %s (%s) at endpoint %s" format (event.clazz, event.id, event.uri))
}
/**
* Stops route to the already un-registered consumer actor.
*/
private def handleConsumerUnregistered(event: ConsumerUnregistered) {
CamelContextManager.context.stopRoute(event.id)
log.info("unpublished actor %s (%s) from endpoint %s" format (event.clazz, event.id, event.uri))
}
}
/**
* Defines the route to a consumer actor.
*
* @param endpointUri endpoint URI of the consumer actor
* @param id actor identifier
* @param uuid <code>true</code> if <code>id</code> refers to Actor.uuid, <code>false</code> if
* <code>id</code> refers to Actor.getId.
*
* @author Martin Krasser
*/
class ConsumerRoute(val endpointUri: String, id: String, uuid: Boolean) extends RouteBuilder {
// TODO: make conversions configurable
private val bodyConversions = Map(
"file" -> classOf[InputStream]
)
def configure = {
val schema = endpointUri take endpointUri.indexOf(":") // e.g. "http" from "http://whatever/..."
bodyConversions.get(schema) match {
case Some(clazz) => from(endpointUri).routeId(id).convertBodyTo(clazz).to(actorUri)
case None => from(endpointUri).routeId(id).to(actorUri)
}
}
private def actorUri = (if (uuid) "actor:uuid:%s" else "actor:id:%s") format id
}
/**
* A registration listener that triggers publication and un-publication of consumer actors.
*
* @author Martin Krasser
*/
class PublishRequestor(consumerPublisher: ActorRef) extends Actor {
protected def receive = {
case ActorRegistered(actor) => for (event <- ConsumerRegistered.forConsumer(actor)) consumerPublisher ! event
case ActorUnregistered(actor) => for (event <- ConsumerUnregistered.forConsumer(actor)) consumerPublisher ! event
}
}
/**
* Consumer actor lifecycle event.
*
* @author Martin Krasser
*/
sealed trait ConsumerEvent
/**
* Event indicating that a consumer actor has been registered at the actor registry.
*
* @param clazz clazz name of the referenced actor
* @param uri endpoint URI of the consumer actor
* @param id actor identifier
* @param uuid <code>true</code> if <code>id</code> is the actor's uuid, <code>false</code> if
* <code>id</code> is the actor's id.
*
* @author Martin Krasser
*/
case class ConsumerRegistered(clazz: String, uri: String, id: String, uuid: Boolean) extends ConsumerEvent
/**
* Event indicating that a consumer actor has been unregistered from the actor registry.
*
* @param clazz clazz name of the referenced actor
* @param uri endpoint URI of the consumer actor
* @param id actor identifier
* @param uuid <code>true</code> if <code>id</code> is the actor's uuid, <code>false</code> if
* <code>id</code> is the actor's id.
*
* @author Martin Krasser
*/
case class ConsumerUnregistered(clazz: String, uri: String, id: String, uuid: Boolean) extends ConsumerEvent
/**
* @author Martin Krasser
*/
private[camel] object ConsumerRegistered {
/**
* Optionally creates an ConsumerRegistered event message for a consumer actor or None if
* <code>actorRef</code> is not a consumer actor.
*/
def forConsumer(actorRef: ActorRef): Option[ConsumerRegistered] = actorRef match {
case ConsumerDescriptor(clazz, uri, id, uuid) => Some(ConsumerRegistered(clazz, uri, id, uuid))
case _ => None
}
}
/**
* @author Martin Krasser
*/
private[camel] object ConsumerUnregistered {
/**
* Optionally creates an ConsumerUnregistered event message for a consumer actor or None if
* <code>actorRef</code> is not a consumer actor.
*/
def forConsumer(actorRef: ActorRef): Option[ConsumerUnregistered] = actorRef match {
case ConsumerDescriptor(clazz, uri, id, uuid) => Some(ConsumerUnregistered(clazz, uri, id, uuid))
case _ => None
}
}
/**
* Describes a consumer actor with elements that are relevant for publishing an actor at a
* Camel endpoint (or unpublishing an actor from an endpoint).
*
* @author Martin Krasser
*/
private[camel] object ConsumerDescriptor {
/**
* An extractor that optionally creates a 4-tuple from a consumer actor reference containing
* the target actor's class name, endpoint URI, identifier and a hint whether the identifier
* is the actor uuid or actor id. If <code>actorRef</code> doesn't reference a consumer actor,
* None is returned.
*/
def unapply(actorRef: ActorRef): Option[(String, String, String, Boolean)] =
unapplyConsumerInstance(actorRef) orElse unapplyConsumeAnnotated(actorRef)
private def unapplyConsumeAnnotated(actorRef: ActorRef): Option[(String, String, String, Boolean)] = {
val annotation = actorRef.actorClass.getAnnotation(classOf[consume])
if (annotation eq null) None
else if (actorRef.remoteAddress.isDefined) None
else Some((actorRef.actor.getClass.getName, annotation.value, actorRef.id, false))
}
private def unapplyConsumerInstance(actorRef: ActorRef): Option[(String, String, String, Boolean)] =
if (!actorRef.actor.isInstanceOf[Consumer]) None
else if (actorRef.remoteAddress.isDefined) None
else Some((actorRef.actor.getClass.getName, actorRef.actor.asInstanceOf[Consumer].endpointUri, actorRef.uuid, true))
}

View file

@ -0,0 +1,14 @@
package se.scalablesolutions.akka.camel;
import se.scalablesolutions.akka.actor.annotation.consume;
/**
* @author Martin Krasser
*/
public class Pojo {
public String foo(String s) {
return String.format("foo: %s", s);
}
}

View file

@ -0,0 +1,34 @@
package se.scalablesolutions.akka.camel;
import org.apache.camel.Body;
import org.apache.camel.Header;
import se.scalablesolutions.akka.actor.annotation.consume;
/**
* @author Martin Krasser
*/
public class PojoBase {
public String m1(String b, String h) {
return "m1base: " + b + " " + h;
}
@consume("direct:m2base")
public String m2(@Body String b, @Header("test") String h) {
return "m2base: " + b + " " + h;
}
@consume("direct:m3base")
public String m3(@Body String b, @Header("test") String h) {
return "m3base: " + b + " " + h;
}
@consume("direct:m4base")
public String m4(@Body String b, @Header("test") String h) {
return "m4base: " + b + " " + h;
}
public void m5(@Body String b, @Header("test") String h) {
}
}

View file

@ -0,0 +1,23 @@
package se.scalablesolutions.akka.camel;
import org.apache.camel.Body;
import org.apache.camel.Header;
import se.scalablesolutions.akka.actor.annotation.consume;
/**
* @author Martin Krasser
*/
public class PojoImpl implements PojoIntf {
public String m1(String b, String h) {
return "m1impl: " + b + " " + h;
}
@consume("direct:m2impl")
public String m2(@Body String b, @Header("test") String h) {
return "m2impl: " + b + " " + h;
}
}

View file

@ -0,0 +1,18 @@
package se.scalablesolutions.akka.camel;
import org.apache.camel.Body;
import org.apache.camel.Header;
import se.scalablesolutions.akka.actor.annotation.consume;
/**
* @author Martin Krasser
*/
public interface PojoIntf {
public String m1(String b, String h);
@consume("direct:m2intf")
public String m2(@Body String b, @Header("test") String h);
}

View file

@ -0,0 +1,15 @@
package se.scalablesolutions.akka.camel;
import se.scalablesolutions.akka.actor.annotation.consume;
/**
* @author Martin Krasser
*/
public class PojoRemote {
@consume("direct:remote-active-object")
public String foo(String s) {
return String.format("remote active object: %s", s);
}
}

View file

@ -0,0 +1,14 @@
package se.scalablesolutions.akka.camel;
import se.scalablesolutions.akka.actor.annotation.consume;
/**
* @author Martin Krasser
*/
public class PojoSingle {
@consume("direct:foo")
public void foo(String b) {
}
}

View file

@ -0,0 +1,27 @@
package se.scalablesolutions.akka.camel;
import org.apache.camel.Body;
import org.apache.camel.Header;
import se.scalablesolutions.akka.actor.annotation.consume;
public class PojoSub extends PojoBase {
@Override
@consume("direct:m1sub")
public String m1(@Body String b, @Header("test") String h) {
return "m1sub: " + b + " " + h;
}
@Override
public String m2(String b, String h) {
return "m2sub: " + b + " " + h;
}
@Override
@consume("direct:m3sub")
public String m3(@Body String b, @Header("test") String h) {
return "m3sub: " + b + " " + h;
}
}

View file

@ -1,53 +1,36 @@
package se.scalablesolutions.akka.camel.service
package se.scalablesolutions.akka.camel
import java.util.concurrent.{CountDownLatch, TimeUnit}
import org.apache.camel.builder.RouteBuilder
import org.scalatest.{GivenWhenThen, BeforeAndAfterAll, FeatureSpec}
import se.scalablesolutions.akka.actor.{Actor, ActorRegistry}
import se.scalablesolutions.akka.camel.{CamelContextManager, Message, Consumer}
import Actor._
object CamelServiceFeatureTest {
class TestConsumer(uri: String) extends Actor with Consumer {
def endpointUri = uri
protected def receive = {
case msg: Message => self.reply("received %s" format msg.body)
}
}
class TestActor extends Actor {
self.id = "custom-actor-id"
protected def receive = {
case msg: Message => self.reply("received %s" format msg.body)
}
}
class TestRoute extends RouteBuilder {
def configure {
from("direct:custom-route-test-1").to("actor:custom-actor-id")
}
}
}
import se.scalablesolutions.akka.actor.Actor._
import se.scalablesolutions.akka.actor.{ActiveObject, Actor, ActorRegistry}
class CamelServiceFeatureTest extends FeatureSpec with BeforeAndAfterAll with GivenWhenThen {
import CamelServiceFeatureTest._
var service: CamelService = CamelService.newInstance
var service: CamelService = _
override protected def beforeAll = {
ActorRegistry.shutdownAll
// create new CamelService instance
service = CamelService.newInstance
// register test consumer before starting the CamelService
actorOf(new TestConsumer("direct:publish-test-1")).start
// Consigure a custom camel route
// Configure a custom camel route
CamelContextManager.init
CamelContextManager.context.addRoutes(new TestRoute)
// set expectations for testing purposes
service.consumerPublisher.actor.asInstanceOf[ConsumerPublisher].expectPublishCount(1)
// start consumer publisher, otherwise we cannot set message
// count expectations in the next step (needed for testing only).
service.consumerPublisher.start
// set expectations on publish count
val latch = service.consumerPublisher.!![CountDownLatch](SetExpectedMessageCount(1)).get
// start the CamelService
service.load
// await publication of first test consumer
service.consumerPublisher.actor.asInstanceOf[ConsumerPublisher].awaitPublish
assert(latch.await(5000, TimeUnit.MILLISECONDS))
}
override protected def afterAll = {
@ -60,9 +43,9 @@ class CamelServiceFeatureTest extends FeatureSpec with BeforeAndAfterAll with Gi
scenario("access registered consumer actors via Camel direct-endpoints") {
given("two consumer actors registered before and after CamelService startup")
service.consumerPublisher.actor.asInstanceOf[ConsumerPublisher].expectPublishCount(1)
val latch = service.consumerPublisher.!![CountDownLatch](SetExpectedMessageCount(1)).get
actorOf(new TestConsumer("direct:publish-test-2")).start
service.consumerPublisher.actor.asInstanceOf[ConsumerPublisher].awaitPublish
assert(latch.await(5000, TimeUnit.MILLISECONDS))
when("requests are sent to these actors")
val response1 = CamelContextManager.template.requestBody("direct:publish-test-1", "msg1")
@ -81,14 +64,14 @@ class CamelServiceFeatureTest extends FeatureSpec with BeforeAndAfterAll with Gi
given("a consumer actor that has been stopped")
assert(CamelContextManager.context.hasEndpoint(endpointUri) eq null)
service.consumerPublisher.actor.asInstanceOf[ConsumerPublisher].expectPublishCount(1)
var latch = service.consumerPublisher.!![CountDownLatch](SetExpectedMessageCount(1)).get
val consumer = actorOf(new TestConsumer(endpointUri)).start
service.consumerPublisher.actor.asInstanceOf[ConsumerPublisher].awaitPublish
assert(latch.await(5000, TimeUnit.MILLISECONDS))
assert(CamelContextManager.context.hasEndpoint(endpointUri) ne null)
service.consumerPublisher.actor.asInstanceOf[ConsumerPublisher].expectUnpublishCount(1)
latch = service.consumerPublisher.!![CountDownLatch](SetExpectedMessageCount(1)).get
consumer.stop
service.consumerPublisher.actor.asInstanceOf[ConsumerPublisher].awaitUnpublish
assert(latch.await(5000, TimeUnit.MILLISECONDS))
// endpoint is still there but the route has been stopped
assert(CamelContextManager.context.hasEndpoint(endpointUri) ne null)
@ -114,4 +97,48 @@ class CamelServiceFeatureTest extends FeatureSpec with BeforeAndAfterAll with Gi
assert(response === "received msg3")
}
}
feature("Publish active object methods in the global CamelContext") {
scenario("access active object methods via Camel direct-endpoints") {
given("an active object registered after CamelService startup")
val latch = service.consumerPublisher.!![CountDownLatch](SetExpectedMessageCount(3)).get
ActiveObject.newInstance(classOf[PojoBase])
assert(latch.await(5000, TimeUnit.MILLISECONDS))
when("requests are sent to published methods")
val response1 = CamelContextManager.template.requestBodyAndHeader("direct:m2base", "x", "test", "y")
val response2 = CamelContextManager.template.requestBodyAndHeader("direct:m3base", "x", "test", "y")
val response3 = CamelContextManager.template.requestBodyAndHeader("direct:m4base", "x", "test", "y")
then("each should have returned a different response")
assert(response1 === "m2base: x y")
assert(response2 === "m3base: x y")
assert(response3 === "m4base: x y")
}
}
}
object CamelServiceFeatureTest {
class TestConsumer(uri: String) extends Actor with Consumer {
def endpointUri = uri
protected def receive = {
case msg: Message => self.reply("received %s" format msg.body)
}
}
class TestActor extends Actor {
self.id = "custom-actor-id"
protected def receive = {
case msg: Message => self.reply("received %s" format msg.body)
}
}
class TestRoute extends RouteBuilder {
def configure {
from("direct:custom-route-test-1").to("actor:custom-actor-id")
}
}
}

View file

@ -0,0 +1,46 @@
package se.scalablesolutions.akka.camel
import java.net.InetSocketAddress
import org.junit.Test
import org.scalatest.junit.JUnitSuite
import se.scalablesolutions.akka.actor.{AspectInit, ActiveObject}
import se.scalablesolutions.akka.camel.ConsumerMethodRegistered._
class ConsumerMethodRegisteredTest extends JUnitSuite {
val remoteAddress = new InetSocketAddress("localhost", 8888);
val remoteAspectInit = AspectInit(classOf[String], null, Some(remoteAddress), 1000)
val localAspectInit = AspectInit(classOf[String], null, None, 1000)
val activePojoBase = ActiveObject.newInstance(classOf[PojoBase])
val activePojoSub = ActiveObject.newInstance(classOf[PojoSub])
val activePojoIntf = ActiveObject.newInstance(classOf[PojoIntf], new PojoImpl)
val ascendingMethodName = (r1: ConsumerMethodRegistered, r2: ConsumerMethodRegistered) =>
r1.method.getName < r2.method.getName
@Test def shouldSelectPojoBaseMethods234 = {
val registered = forConsumer(activePojoBase, localAspectInit).sortWith(ascendingMethodName)
assert(registered.size === 3)
assert(registered.map(_.method.getName) === List("m2", "m3", "m4"))
}
@Test def shouldSelectPojoSubMethods134 = {
val registered = forConsumer(activePojoSub, localAspectInit).sortWith(ascendingMethodName)
assert(registered.size === 3)
assert(registered.map(_.method.getName) === List("m1", "m3", "m4"))
}
@Test def shouldSelectPojoIntfMethod2 = {
val registered = forConsumer(activePojoIntf, localAspectInit)
assert(registered.size === 1)
assert(registered(0).method.getName === "m2")
}
@Test def shouldIgnoreRemoteProxies = {
val registered = forConsumer(activePojoBase, remoteAspectInit)
assert(registered.size === 0)
}
}

View file

@ -1,4 +1,4 @@
package se.scalablesolutions.akka.camel.service
package se.scalablesolutions.akka.camel
import org.junit.Test
import org.scalatest.junit.JUnitSuite
@ -6,7 +6,6 @@ import org.scalatest.junit.JUnitSuite
import se.scalablesolutions.akka.actor.Actor
import se.scalablesolutions.akka.actor.Actor._
import se.scalablesolutions.akka.actor.annotation.consume
import se.scalablesolutions.akka.camel.Consumer
object ConsumerRegisteredTest {
@consume("mock:test1")
@ -29,20 +28,22 @@ class ConsumerRegisteredTest extends JUnitSuite {
import ConsumerRegisteredTest._
@Test def shouldCreatePublishRequestList = {
val as = List(actorOf[ConsumeAnnotatedActor])
val a = actorOf[ConsumeAnnotatedActor]
val as = List(a)
val events = for (a <- as; e <- ConsumerRegistered.forConsumer(a)) yield e
assert(events === List(ConsumerRegistered(classOf[ConsumeAnnotatedActor].getName, "mock:test1", "test", false)))
assert(events === List(ConsumerRegistered(a, "mock:test1", "test", false)))
}
@Test def shouldCreateSomePublishRequestWithActorId = {
val event = ConsumerRegistered.forConsumer(actorOf[ConsumeAnnotatedActor])
assert(event === Some(ConsumerRegistered(classOf[ConsumeAnnotatedActor].getName, "mock:test1", "test", false)))
val a = actorOf[ConsumeAnnotatedActor]
val event = ConsumerRegistered.forConsumer(a)
assert(event === Some(ConsumerRegistered(a, "mock:test1", "test", false)))
}
@Test def shouldCreateSomePublishRequestWithActorUuid = {
val ca = actorOf[ConsumerActor]
val event = ConsumerRegistered.forConsumer(ca)
assert(event === Some(ConsumerRegistered(ca.actor.getClass.getName, "mock:test2", ca.uuid, true)))
assert(event === Some(ConsumerRegistered(ca, "mock:test2", ca.uuid, true)))
}
@Test def shouldCreateNone = {

View file

@ -0,0 +1,69 @@
package se.scalablesolutions.akka.camel
import java.util.concurrent.{CountDownLatch, TimeUnit}
import org.junit.{Before, After, Test}
import org.scalatest.junit.JUnitSuite
import se.scalablesolutions.akka.actor._
import se.scalablesolutions.akka.actor.Actor._
import se.scalablesolutions.akka.camel.support.{SetExpectedMessageCount => SetExpectedTestMessageCount, _}
class PublishRequestorTest extends JUnitSuite {
import PublishRequestorTest._
var publisher: ActorRef = _
var requestor: ActorRef = _
var consumer: ActorRef = _
@Before def setUp = {
publisher = actorOf[PublisherMock].start
requestor = actorOf[PublishRequestor].start
requestor ! PublishRequestorInit(publisher)
consumer = actorOf(new Actor with Consumer {
def endpointUri = "mock:test"
protected def receive = null
}).start
}
@After def tearDown = {
ActorRegistry.shutdownAll
}
@Test def shouldReceiveConsumerMethodRegisteredEvent = {
val obj = ActiveObject.newInstance(classOf[PojoSingle])
val init = AspectInit(classOf[PojoSingle], null, None, 1000)
val latch = publisher.!![CountDownLatch](SetExpectedTestMessageCount(1)).get
requestor ! AspectInitRegistered(obj, init)
assert(latch.await(5000, TimeUnit.MILLISECONDS))
val event = (publisher !! GetRetainedMessage).get.asInstanceOf[ConsumerMethodRegistered]
assert(event.init === init)
assert(event.uri === "direct:foo")
assert(event.activeObject === obj)
assert(event.method.getName === "foo")
}
@Test def shouldReceiveConsumerRegisteredEvent = {
val latch = publisher.!![CountDownLatch](SetExpectedTestMessageCount(1)).get
requestor ! ActorRegistered(consumer)
assert(latch.await(5000, TimeUnit.MILLISECONDS))
assert((publisher !! GetRetainedMessage) ===
Some(ConsumerRegistered(consumer, "mock:test", consumer.uuid, true)))
}
@Test def shouldReceiveConsumerUnregisteredEvent = {
val latch = publisher.!![CountDownLatch](SetExpectedTestMessageCount(1)).get
requestor ! ActorUnregistered(consumer)
assert(latch.await(5000, TimeUnit.MILLISECONDS))
assert((publisher !! GetRetainedMessage) ===
Some(ConsumerUnregistered(consumer, "mock:test", consumer.uuid, true)))
}
}
object PublishRequestorTest {
class PublisherMock extends TestActor with Retain with Countdown {
def handler = retain andThen countdown
}
}

View file

@ -0,0 +1,89 @@
package se.scalablesolutions.akka.camel
import java.util.concurrent.{CountDownLatch, TimeUnit}
import org.scalatest.{GivenWhenThen, BeforeAndAfterAll, FeatureSpec}
import se.scalablesolutions.akka.actor.Actor._
import se.scalablesolutions.akka.actor.{ActiveObject, ActorRegistry, RemoteActor}
import se.scalablesolutions.akka.remote.{RemoteClient, RemoteServer}
/**
* @author Martin Krasser
*/
class RemoteConsumerTest extends FeatureSpec with BeforeAndAfterAll with GivenWhenThen {
import RemoteConsumerTest._
var service: CamelService = _
var server: RemoteServer = _
override protected def beforeAll = {
ActorRegistry.shutdownAll
service = CamelService.newInstance
service.load
server = new RemoteServer()
server.start(host, port)
Thread.sleep(1000)
}
override protected def afterAll = {
server.shutdown
service.unload
RemoteClient.shutdownAll
ActorRegistry.shutdownAll
Thread.sleep(1000)
}
feature("Client-initiated remote consumer actor") {
scenario("access published remote consumer actor") {
given("a client-initiated remote consumer actor")
val consumer = actorOf[RemoteConsumer].start
when("remote consumer publication is triggered")
val latch = service.consumerPublisher.!![CountDownLatch](SetExpectedMessageCount(1)).get
consumer !! "init"
assert(latch.await(5000, TimeUnit.MILLISECONDS))
then("the published actor is accessible via its endpoint URI")
val response = CamelContextManager.template.requestBody("direct:remote-actor", "test")
assert(response === "remote actor: test")
}
}
/* TODO: enable once issues with remote active objects are resolved
feature("Client-initiated remote consumer active object") {
scenario("access published remote consumer method") {
given("a client-initiated remote consumer active object")
val consumer = ActiveObject.newRemoteInstance(classOf[PojoRemote], host, port)
when("remote consumer publication is triggered")
val latch = service.consumerPublisher.!![CountDownLatch](SetExpectedMessageCount(1)).get
consumer.foo("init")
assert(latch.await(5000, TimeUnit.MILLISECONDS))
then("the published method is accessible via its endpoint URI")
val response = CamelContextManager.template.requestBody("direct:remote-active-object", "test")
assert(response === "remote active object: test")
}
}
*/
}
object RemoteConsumerTest {
val host = "localhost"
val port = 7774
class RemoteConsumer extends RemoteActor(host, port) with Consumer {
def endpointUri = "direct:remote-actor"
protected def receive = {
case "init" => self.reply("done")
case m: Message => self.reply("remote actor: %s" format m.body)
}
}
}

View file

@ -0,0 +1,105 @@
package se.scalablesolutions.akka.camel.component
import org.scalatest.{BeforeAndAfterEach, BeforeAndAfterAll, FeatureSpec}
import org.apache.camel.builder.RouteBuilder
import se.scalablesolutions.akka.actor.Actor._
import se.scalablesolutions.akka.actor.{ActorRegistry, ActiveObject}
import se.scalablesolutions.akka.camel._
import org.apache.camel.impl.{DefaultCamelContext, SimpleRegistry}
import org.apache.camel.{ResolveEndpointFailedException, ExchangePattern, Exchange, Processor}
/**
* @author Martin Krasser
*/
class ActiveObjectComponentFeatureTest extends FeatureSpec with BeforeAndAfterAll with BeforeAndAfterEach {
import ActiveObjectComponentFeatureTest._
import CamelContextManager.template
override protected def beforeAll = {
val activePojo = ActiveObject.newInstance(classOf[Pojo]) // not a consumer
val activePojoBase = ActiveObject.newInstance(classOf[PojoBase])
val activePojoIntf = ActiveObject.newInstance(classOf[PojoIntf], new PojoImpl)
val registry = new SimpleRegistry
registry.put("pojo", activePojo)
CamelContextManager.init(new DefaultCamelContext(registry))
CamelContextManager.context.addRoutes(new CustomRouteBuilder)
CamelContextManager.start
CamelContextManager.activeObjectRegistry.put("base", activePojoBase)
CamelContextManager.activeObjectRegistry.put("intf", activePojoIntf)
}
override protected def afterAll = {
CamelContextManager.stop
ActorRegistry.shutdownAll
}
feature("Communicate with an active object from a Camel application using active object endpoint URIs") {
import ActiveObjectComponent.InternalSchema
import ExchangePattern._
scenario("in-out exchange with proxy created from interface and method returning String") {
val result = template.requestBodyAndHeader("%s:intf?method=m2" format InternalSchema, "x", "test", "y")
assert(result === "m2impl: x y")
}
scenario("in-out exchange with proxy created from class and method returning String") {
val result = template.requestBodyAndHeader("%s:base?method=m2" format InternalSchema, "x", "test", "y")
assert(result === "m2base: x y")
}
scenario("in-out exchange with proxy created from class and method returning void") {
val result = template.requestBodyAndHeader("%s:base?method=m5" format InternalSchema, "x", "test", "y")
assert(result === "x") // returns initial body
}
scenario("in-only exchange with proxy created from class and method returning String") {
val result = template.send("%s:base?method=m2" format InternalSchema, InOnly, new Processor {
def process(exchange: Exchange) = {
exchange.getIn.setBody("x")
exchange.getIn.setHeader("test", "y")
}
});
assert(result.getPattern === InOnly)
assert(result.getIn.getBody === "m2base: x y")
assert(result.getOut.getBody === null)
}
scenario("in-only exchange with proxy created from class and method returning void") {
val result = template.send("%s:base?method=m5" format InternalSchema, InOnly, new Processor {
def process(exchange: Exchange) = {
exchange.getIn.setBody("x")
exchange.getIn.setHeader("test", "y")
}
});
assert(result.getPattern === InOnly)
assert(result.getIn.getBody === "x")
assert(result.getOut.getBody === null)
}
}
feature("Communicate with an active object from a Camel application from a custom Camel route") {
scenario("in-out exchange with externally registered active object") {
val result = template.requestBody("direct:test", "test")
assert(result === "foo: test")
}
scenario("in-out exchange with internally registered active object not possible") {
intercept[ResolveEndpointFailedException] {
template.requestBodyAndHeader("active-object:intf?method=m2", "x", "test", "y")
}
}
}
}
object ActiveObjectComponentFeatureTest {
class CustomRouteBuilder extends RouteBuilder {
def configure = {
from("direct:test").to("active-object:pojo?method=foo")
}
}
}

View file

@ -1,11 +1,14 @@
package se.scalablesolutions.akka.camel.component
import java.util.concurrent.{TimeUnit, CountDownLatch}
import org.apache.camel.RuntimeCamelException
import org.scalatest.{BeforeAndAfterEach, BeforeAndAfterAll, FeatureSpec}
import se.scalablesolutions.akka.actor.Actor._
import se.scalablesolutions.akka.actor.{ActorRegistry, Actor}
import se.scalablesolutions.akka.camel.support.{Respond, Countdown, Tester, Retain}
import se.scalablesolutions.akka.camel.{Message, CamelContextManager}
import se.scalablesolutions.akka.camel.support._
class ActorComponentFeatureTest extends FeatureSpec with BeforeAndAfterAll with BeforeAndAfterEach {
override protected def beforeAll = {
@ -20,41 +23,37 @@ class ActorComponentFeatureTest extends FeatureSpec with BeforeAndAfterAll with
feature("Communicate with an actor from a Camel application using actor endpoint URIs") {
import CamelContextManager.template
import Actor._
scenario("one-way communication using actor id") {
val actor = actorOf(new Tester with Retain with Countdown[Message])
actor.start
val actor = actorOf[Tester1].start
val latch = actor.!![CountDownLatch](SetExpectedMessageCount(1)).get
template.sendBody("actor:%s" format actor.id, "Martin")
assert(actor.actor.asInstanceOf[Countdown[Message]].waitFor)
assert(actor.actor.asInstanceOf[Retain].body === "Martin")
assert(latch.await(5000, TimeUnit.MILLISECONDS))
val reply = (actor !! GetRetainedMessage).get.asInstanceOf[Message]
assert(reply.body === "Martin")
}
scenario("one-way communication using actor uuid") {
val actor = actorOf(new Tester with Retain with Countdown[Message])
actor.start
val actor = actorOf[Tester1].start
val latch = actor.!![CountDownLatch](SetExpectedMessageCount(1)).get
template.sendBody("actor:uuid:%s" format actor.uuid, "Martin")
assert(actor.actor.asInstanceOf[Countdown[Message]].waitFor)
assert(actor.actor.asInstanceOf[Retain].body === "Martin")
assert(latch.await(5000, TimeUnit.MILLISECONDS))
val reply = (actor !! GetRetainedMessage).get.asInstanceOf[Message]
assert(reply.body === "Martin")
}
scenario("two-way communication using actor id") {
val actor = actorOf(new Tester with Respond)
actor.start
val actor = actorOf[Tester2].start
assert(template.requestBody("actor:%s" format actor.id, "Martin") === "Hello Martin")
}
scenario("two-way communication using actor uuid") {
val actor = actorOf(new Tester with Respond)
actor.start
val actor = actorOf[Tester2].start
assert(template.requestBody("actor:uuid:%s" format actor.uuid, "Martin") === "Hello Martin")
}
scenario("two-way communication with timeout") {
val actor = actorOf(new Tester {
self.timeout = 1
})
actor.start
val actor = actorOf[Tester3].start
intercept[RuntimeCamelException] {
template.requestBody("actor:uuid:%s" format actor.uuid, "Martin")
}

View file

@ -2,41 +2,41 @@ package se.scalablesolutions.akka.camel.component
import ActorComponentTest._
import java.util.concurrent.TimeoutException
import java.util.concurrent.{CountDownLatch, TimeoutException, TimeUnit}
import org.apache.camel.ExchangePattern
import org.junit.{After, Test}
import org.scalatest.junit.JUnitSuite
import org.scalatest.BeforeAndAfterAll
import se.scalablesolutions.akka.actor.ActorRegistry
import se.scalablesolutions.akka.actor.Actor._
import se.scalablesolutions.akka.camel.support.{Countdown, Retain, Tester, Respond}
import se.scalablesolutions.akka.actor.ActorRegistry
import se.scalablesolutions.akka.camel.{Failure, Message}
import se.scalablesolutions.akka.camel.support._
class ActorProducerTest extends JUnitSuite with BeforeAndAfterAll {
@After def tearDown = ActorRegistry.shutdownAll
@Test def shouldSendMessageToActor = {
val actor = actorOf(new Tester with Retain with Countdown[Message])
val actor = actorOf[Tester1].start
val latch = actor.!![CountDownLatch](SetExpectedMessageCount(1)).get
val endpoint = mockEndpoint("actor:uuid:%s" format actor.uuid)
val exchange = endpoint.createExchange(ExchangePattern.InOnly)
actor.start
exchange.getIn.setBody("Martin")
exchange.getIn.setHeader("k1", "v1")
endpoint.createProducer.process(exchange)
actor.actor.asInstanceOf[Countdown[Message]].waitFor
assert(actor.actor.asInstanceOf[Retain].body === "Martin")
assert(actor.actor.asInstanceOf[Retain].headers === Map(Message.MessageExchangeId -> exchange.getExchangeId, "k1" -> "v1"))
assert(latch.await(5000, TimeUnit.MILLISECONDS))
val reply = (actor !! GetRetainedMessage).get.asInstanceOf[Message]
assert(reply.body === "Martin")
assert(reply.headers === Map(Message.MessageExchangeId -> exchange.getExchangeId, "k1" -> "v1"))
}
@Test def shouldSendMessageToActorAndReceiveResponse = {
val actor = actorOf(new Tester with Respond {
val actor = actorOf(new Tester2 {
override def response(msg: Message) = Message(super.response(msg), Map("k2" -> "v2"))
})
}).start
val endpoint = mockEndpoint("actor:uuid:%s" format actor.uuid)
val exchange = endpoint.createExchange(ExchangePattern.InOut)
actor.start
exchange.getIn.setBody("Martin")
exchange.getIn.setHeader("k1", "v1")
endpoint.createProducer.process(exchange)
@ -45,12 +45,11 @@ class ActorProducerTest extends JUnitSuite with BeforeAndAfterAll {
}
@Test def shouldSendMessageToActorAndReceiveFailure = {
val actor = actorOf(new Tester with Respond {
val actor = actorOf(new Tester2 {
override def response(msg: Message) = Failure(new Exception("testmsg"), Map("k3" -> "v3"))
})
}).start
val endpoint = mockEndpoint("actor:uuid:%s" format actor.uuid)
val exchange = endpoint.createExchange(ExchangePattern.InOut)
actor.start
exchange.getIn.setBody("Martin")
exchange.getIn.setHeader("k1", "v1")
endpoint.createProducer.process(exchange)
@ -60,12 +59,9 @@ class ActorProducerTest extends JUnitSuite with BeforeAndAfterAll {
}
@Test def shouldSendMessageToActorAndTimeout: Unit = {
val actor = actorOf(new Tester {
self.timeout = 1
})
val actor = actorOf[Tester3].start
val endpoint = mockEndpoint("actor:uuid:%s" format actor.uuid)
val exchange = endpoint.createExchange(ExchangePattern.InOut)
actor.start
exchange.getIn.setBody("Martin")
intercept[TimeoutException] {
endpoint.createProducer.process(exchange)

View file

@ -1,55 +0,0 @@
package se.scalablesolutions.akka.camel.service
import _root_.org.junit.{Before, After, Test}
import org.scalatest.junit.JUnitSuite
import se.scalablesolutions.akka.camel.Consumer
import se.scalablesolutions.akka.camel.support.{Receive, Countdown}
import se.scalablesolutions.akka.actor.{Actor, ActorRef, ActorRegistry, ActorRegistered, ActorUnregistered}
import Actor._
object PublishRequestorTest {
class PublisherMock extends Actor with Receive[ConsumerEvent] {
var received: ConsumerEvent = _
protected def receive = {
case msg: ConsumerRegistered => onMessage(msg)
case msg: ConsumerUnregistered => onMessage(msg)
}
def onMessage(msg: ConsumerEvent) = received = msg
}
}
class PublishRequestorTest extends JUnitSuite {
import PublishRequestorTest._
var publisher: ActorRef = _
var requestor: ActorRef = _
var consumer: ActorRef = _
@Before def setUp = {
publisher = actorOf(new PublisherMock with Countdown[ConsumerEvent]).start
requestor = actorOf(new PublishRequestor(publisher)).start
consumer = actorOf(new Actor with Consumer {
def endpointUri = "mock:test"
protected def receive = null
}).start
}
@After def tearDown = {
ActorRegistry.shutdownAll
}
@Test def shouldReceiveConsumerRegisteredEvent = {
requestor.!(ActorRegistered(consumer))(None)
publisher.actor.asInstanceOf[Countdown[ConsumerEvent]].waitFor
assert(publisher.actor.asInstanceOf[PublisherMock].received ===
ConsumerRegistered(consumer.actor.getClass.getName, "mock:test", consumer.uuid, true))
}
@Test def shouldReceiveConsumerUnregisteredEvent = {
requestor.!(ActorUnregistered(consumer))(None)
publisher.actor.asInstanceOf[Countdown[ConsumerRegistered]].waitFor
assert(publisher.actor.asInstanceOf[PublisherMock].received ===
ConsumerUnregistered(consumer.actor.getClass.getName, "mock:test", consumer.uuid, true))
}
}

View file

@ -5,45 +5,71 @@ import java.util.concurrent.{TimeUnit, CountDownLatch}
import se.scalablesolutions.akka.camel.Message
import se.scalablesolutions.akka.actor.Actor
trait Receive[T] {
def onMessage(msg: T): Unit
import TestSupport._
object TestSupport {
type Handler = PartialFunction[Any, Any]
}
trait Respond extends Receive[Message] { this: Actor =>
abstract override def onMessage(msg: Message): Unit = {
super.onMessage(msg)
this.self.reply(response(msg))
trait TestActor extends Actor {
def receive = {
case msg => {
handler(msg)
}
}
def handler: Handler
}
class Tester1 extends TestActor with Retain with Countdown {
def handler = retain andThen countdown
}
class Tester2 extends TestActor with Respond {
def handler = respond
}
class Tester3 extends TestActor with Noop {
self.timeout = 1
def handler = noop
}
trait Countdown { this: Actor =>
var latch: CountDownLatch = new CountDownLatch(0)
def countdown: Handler = {
case SetExpectedMessageCount(num) => {
latch = new CountDownLatch(num)
self.reply(latch)
}
case msg => latch.countDown
}
}
trait Respond { this: Actor =>
def respond: Handler = {
case msg: Message => self.reply(response(msg))
}
def response(msg: Message): Any = "Hello %s" format msg.body
}
trait Retain extends Receive[Message] {
var body: Any = _
var headers = Map.empty[String, Any]
abstract override def onMessage(msg: Message): Unit = {
super.onMessage(msg)
body = msg.body
headers = msg.headers
trait Retain { this: Actor =>
var message: Any = _
def retain: Handler = {
case GetRetainedMessage => self.reply(message)
case msg => {
message = msg
msg
}
}
}
trait Countdown[T] extends Receive[T] {
val count = 1
val duration = 5000
val latch = new CountDownLatch(count)
def waitFor = latch.await(duration, TimeUnit.MILLISECONDS)
def countDown = latch.countDown
abstract override def onMessage(msg: T) = {
super.onMessage(msg)
countDown
trait Noop { this: Actor =>
def noop: Handler = {
case msg => msg
}
}
class Tester extends Actor with Receive[Message] {
def receive = {
case msg: Message => onMessage(msg)
}
def onMessage(msg: Message): Unit = {}
}
case class SetExpectedMessageCount(num: Int)
case class GetRetainedMessage()

View file

@ -10,7 +10,7 @@ import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@Target({ElementType.TYPE, ElementType.METHOD})
public @interface consume {
public abstract String value();

View file

@ -476,7 +476,7 @@ object ActiveObject extends Logging {
}
private[akka] object AspectInitRegistry {
private[akka] object AspectInitRegistry extends ListenerManagement {
private val initializations = new java.util.concurrent.ConcurrentHashMap[AnyRef, AspectInit]
def initFor(target: AnyRef) = {
@ -485,9 +485,16 @@ private[akka] object AspectInitRegistry {
init
}
def register(target: AnyRef, init: AspectInit) = initializations.put(target, init)
def register(target: AnyRef, init: AspectInit) = {
val res = initializations.put(target, init)
foreachListener(_ ! AspectInitRegistered(target, init))
res
}
}
private[akka] sealed trait AspectInitRegistryEvent
private[akka] case class AspectInitRegistered(proxy: AnyRef, init: AspectInit) extends AspectInitRegistryEvent
private[akka] sealed case class AspectInit(
val target: Class[_],
val actorRef: ActorRef,

View file

@ -228,12 +228,11 @@ object Actor extends Logging {
def spawn(body: => Unit): Unit = {
case object Spawn
actorOf(new Actor() {
self.start
self ! Spawn
def receive = {
case Spawn => body; self.stop
}
})
}).start ! Spawn
}
}
@ -413,6 +412,22 @@ trait Actor extends Logging {
*/
def initTransactionalState {}
/**
* Use <code>reply(..)</code> to reply with a message to the original sender of the message currently
* being processed.
* <p/>
* Throws an IllegalStateException if unable to determine what to reply to.
*/
def reply(message: Any) = self.reply(message)
/**
* Use <code>reply_?(..)</code> to reply with a message to the original sender of the message currently
* being processed.
* <p/>
* Returns true if reply was sent, and false if unable to determine what to reply to.
*/
def reply_?(message: Any): Boolean = self.reply_?(message)
// =========================================
// ==== INTERNAL IMPLEMENTATION DETAILS ====
// =========================================

View file

@ -119,7 +119,7 @@ trait ActorRef extends TransactionManagement {
* <p/>
* Identifier for actor, does not have to be a unique one. Default is the 'uuid'.
* <p/>
* This field is used for logging, AspectRegistry.actorsFor, identifier for remote
* This field is used for logging, AspectRegistry.actorsFor(id), identifier for remote
* actor in RemoteServer etc.But also as the identifier for persistence, which means
* that you can use a custom name to be able to retrieve the "correct" persisted state
* upon restart, remote restart etc.
@ -208,8 +208,8 @@ trait ActorRef extends TransactionManagement {
protected[akka] var _sender: Option[ActorRef] = None
protected[akka] var _senderFuture: Option[CompletableFuture[Any]] = None
protected[akka] def sender_=(s: Option[ActorRef]) = guard.withGuard { _sender = s}
protected[akka] def senderFuture_=(sf: Option[CompletableFuture[Any]]) = guard.withGuard { _senderFuture = sf}
protected[akka] def sender_=(s: Option[ActorRef]) = guard.withGuard { _sender = s }
protected[akka] def senderFuture_=(sf: Option[CompletableFuture[Any]]) = guard.withGuard { _senderFuture = sf }
/**
* The reference sender Actor of the last received message.
@ -243,6 +243,11 @@ trait ActorRef extends TransactionManagement {
*/
def uuid = _uuid
/**
* Tests if the actor is able to handle the message passed in as arguments.
*/
def isDefinedAt(message: Any): Boolean = actor.base.isDefinedAt(message)
/**
* Only for internal use. UUID is effectively final.
*/
@ -838,6 +843,11 @@ sealed class LocalActorRef private[akka](
*/
def mailboxSize: Int = _mailbox.size
/**
* Returns a copy of all the messages, put into a List[MessageInvocation].
*/
def messagesInMailbox: List[MessageInvocation] = _mailbox.toArray.toList.asInstanceOf[List[MessageInvocation]]
/**
* Shuts down and removes all linked actors.
*/
@ -886,7 +896,6 @@ sealed class LocalActorRef private[akka](
}
protected[akka] def postMessageToMailbox(message: Any, senderOption: Option[ActorRef]): Unit = {
sender = senderOption
joinTransaction(message)
if (remoteAddress.isDefined) {
@ -919,7 +928,6 @@ sealed class LocalActorRef private[akka](
timeout: Long,
senderOption: Option[ActorRef],
senderFuture: Option[CompletableFuture[T]]): CompletableFuture[T] = {
sender = senderOption
joinTransaction(message)
if (remoteAddress.isDefined) {
@ -969,9 +977,9 @@ sealed class LocalActorRef private[akka](
Actor.log.warning("Actor [%s] is shut down, ignoring message [%s]", toString, messageHandle)
return
}
sender = messageHandle.sender
senderFuture = messageHandle.senderFuture
try {
sender = messageHandle.sender
senderFuture = messageHandle.senderFuture
if (TransactionManagement.isTransactionalityEnabled) transactionalDispatch(messageHandle)
else dispatch(messageHandle)
} catch {
@ -985,9 +993,7 @@ sealed class LocalActorRef private[akka](
val message = messageHandle.message //serializeMessage(messageHandle.message)
setTransactionSet(messageHandle.transactionSet)
try {
if (actor.base.isDefinedAt(message)) actor.base(message) // invoke user actor's receive partial function
else throw new IllegalArgumentException(
"No handler matching message [" + message + "] in " + toString)
actor.base(message)
} catch {
case e =>
_isBeingRestarted = true
@ -1016,20 +1022,16 @@ sealed class LocalActorRef private[akka](
}
setTransactionSet(txSet)
def proceed = {
if (actor.base.isDefinedAt(message)) actor.base(message) // invoke user actor's receive partial function
else throw new IllegalArgumentException(
toString + " could not process message [" + message + "]" +
"\n\tsince no matching 'case' clause in its 'receive' method could be found")
setTransactionSet(txSet) // restore transaction set to allow atomic block to do commit
}
try {
if (isTransactor) {
atomic {
proceed
actor.base(message)
setTransactionSet(txSet) // restore transaction set to allow atomic block to do commit
}
} else proceed
} else {
actor.base(message)
setTransactionSet(txSet) // restore transaction set to allow atomic block to do commit
}
} catch {
case e: IllegalStateException => {}
case e =>

View file

@ -4,11 +4,10 @@
package se.scalablesolutions.akka.actor
import se.scalablesolutions.akka.util.Logging
import scala.collection.mutable.ListBuffer
import scala.reflect.Manifest
import java.util.concurrent.{CopyOnWriteArrayList, ConcurrentHashMap}
import se.scalablesolutions.akka.util.ListenerManagement
sealed trait ActorRegistryEvent
case class ActorRegistered(actor: ActorRef) extends ActorRegistryEvent
@ -26,11 +25,10 @@ case class ActorUnregistered(actor: ActorRef) extends ActorRegistryEvent
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object ActorRegistry extends Logging {
object ActorRegistry extends ListenerManagement {
private val actorsByUUID = new ConcurrentHashMap[String, ActorRef]
private val actorsById = new ConcurrentHashMap[String, List[ActorRef]]
private val actorsByClassName = new ConcurrentHashMap[String, List[ActorRef]]
private val registrationListeners = new CopyOnWriteArrayList[ActorRef]
/**
* Returns all actors in the system.
@ -141,29 +139,4 @@ object ActorRegistry extends Logging {
actorsByClassName.clear
log.info("All actors have been shut down and unregistered from ActorRegistry")
}
/**
* Adds the registration <code>listener</code> this this registry's listener list.
*/
def addRegistrationListener(listener: ActorRef) = {
listener.start
registrationListeners.add(listener)
}
/**
* Removes the registration <code>listener</code> this this registry's listener list.
*/
def removeRegistrationListener(listener: ActorRef) = {
listener.stop
registrationListeners.remove(listener)
}
private def foreachListener(f: (ActorRef) => Unit) {
val iterator = registrationListeners.iterator
while (iterator.hasNext) {
val listener = iterator.next
if (listener.isRunning) f(listener)
else log.warning("Can't send ActorRegistryEvent to [%s] since it is not running.", listener)
}
}
}

View file

@ -16,7 +16,7 @@ class ConfigurationException(message: String) extends RuntimeException(message)
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object Config extends Logging {
val VERSION = "0.9"
val VERSION = "0.10"
// Set Multiverse options for max speed
System.setProperty("org.multiverse.MuliverseConstants.sanityChecks", "false")

View file

@ -7,7 +7,7 @@ package se.scalablesolutions.akka.dispatch
import java.util.List
import se.scalablesolutions.akka.util.{HashCode, Logging}
import se.scalablesolutions.akka.actor.{Actor, ActorRef}
import se.scalablesolutions.akka.actor.{Actor, ActorRef, ActorInitializationException}
import java.util.concurrent.ConcurrentHashMap
@ -23,7 +23,12 @@ final class MessageInvocation(val receiver: ActorRef,
val transactionSet: Option[CountDownCommitBarrier]) {
if (receiver eq null) throw new IllegalArgumentException("receiver is null")
def invoke = receiver.invoke(this)
def invoke = try {
receiver.invoke(this)
} catch {
case e: NullPointerException => throw new ActorInitializationException(
"Don't call 'self ! message' in the Actor's constructor (e.g. body of the class).")
}
def send = receiver.dispatcher.dispatch(this)

View file

@ -38,8 +38,11 @@ object RemoteRequestProtocolIdFactory {
def nextId: Long = id.getAndIncrement + nodeId
}
/**
* Life-cycle events for RemoteClient.
*/
sealed trait RemoteClientLifeCycleEvent
case class RemoteClientError(cause: Throwable) extends RemoteClientLifeCycleEvent
case class RemoteClientError(cause: Throwable, host: String, port: Int) extends RemoteClientLifeCycleEvent
case class RemoteClientDisconnected(host: String, port: Int) extends RemoteClientLifeCycleEvent
case class RemoteClientConnected(host: String, port: Int) extends RemoteClientLifeCycleEvent
@ -186,7 +189,7 @@ class RemoteClient private[akka] (val hostname: String, val port: Int, loader: O
val channel = connection.awaitUninterruptibly.getChannel
openChannels.add(channel)
if (!connection.isSuccess) {
listeners.toArray.foreach(l => l.asInstanceOf[ActorRef] ! RemoteClientError(connection.getCause))
listeners.toArray.foreach(l => l.asInstanceOf[ActorRef] ! RemoteClientError(connection.getCause, hostname, port))
log.error(connection.getCause, "Remote client connection to [%s:%s] has failed", hostname, port)
}
isRunning = true
@ -222,7 +225,7 @@ class RemoteClient private[akka] (val hostname: String, val port: Int, loader: O
}
} else {
val exception = new IllegalStateException("Remote client is not running, make sure you have invoked 'RemoteClient.connect' before using it.")
listeners.toArray.foreach(l => l.asInstanceOf[ActorRef] ! RemoteClientError(exception))
listeners.toArray.foreach(l => l.asInstanceOf[ActorRef] ! RemoteClientError(exception, hostname, port))
throw exception
}
@ -311,12 +314,12 @@ class RemoteClientHandler(val name: String,
futures.remove(reply.getId)
} else {
val exception = new IllegalArgumentException("Unknown message received in remote client handler: " + result)
client.listeners.toArray.foreach(l => l.asInstanceOf[ActorRef] ! RemoteClientError(exception))
client.listeners.toArray.foreach(l => l.asInstanceOf[ActorRef] ! RemoteClientError(exception, client.hostname, client.port))
throw exception
}
} catch {
case e: Exception =>
client.listeners.toArray.foreach(l => l.asInstanceOf[ActorRef] ! RemoteClientError(e))
client.listeners.toArray.foreach(l => l.asInstanceOf[ActorRef] ! RemoteClientError(e, client.hostname, client.port))
log.error("Unexpected exception in remote client handler: %s", e)
throw e
}
@ -331,7 +334,7 @@ class RemoteClientHandler(val name: String,
client.connection.awaitUninterruptibly // Wait until the connection attempt succeeds or fails.
if (!client.connection.isSuccess) {
client.listeners.toArray.foreach(l =>
l.asInstanceOf[ActorRef] ! RemoteClientError(client.connection.getCause))
l.asInstanceOf[ActorRef] ! RemoteClientError(client.connection.getCause, client.hostname, client.port))
log.error(client.connection.getCause, "Reconnection to [%s] has failed", remoteAddress)
}
}
@ -351,7 +354,7 @@ class RemoteClientHandler(val name: String,
}
override def exceptionCaught(ctx: ChannelHandlerContext, event: ExceptionEvent) = {
client.listeners.toArray.foreach(l => l.asInstanceOf[ActorRef] ! RemoteClientError(event.getCause))
client.listeners.toArray.foreach(l => l.asInstanceOf[ActorRef] ! RemoteClientError(event.getCause, client.hostname, client.port))
log.error(event.getCause, "Unexpected exception from downstream in remote client")
event.getChannel.close
}

View file

@ -0,0 +1,49 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.util
import java.util.concurrent.CopyOnWriteArrayList
import se.scalablesolutions.akka.actor.ActorRef
/**
* A manager for listener actors. Intended for mixin by observables.
*
* @author Martin Krasser
*/
trait ListenerManagement extends Logging {
private val listeners = new CopyOnWriteArrayList[ActorRef]
/**
* Adds the <code>listener</code> this this registry's listener list.
* The <code>listener</code> is started by this method.
*/
def addListener(listener: ActorRef) = {
listener.start
listeners.add(listener)
}
/**
* Removes the <code>listener</code> this this registry's listener list.
* The <code>listener</code> is stopped by this method.
*/
def removeListener(listener: ActorRef) = {
listener.stop
listeners.remove(listener)
}
/**
* Execute <code>f</code> with each listener as argument.
*/
protected def foreachListener(f: (ActorRef) => Unit) {
val iterator = listeners.iterator
while (iterator.hasNext) {
val listener = iterator.next
if (listener.isRunning) f(listener)
else log.warning("Can't notify [%s] since it is not running.", listener)
}
}
}

View file

@ -0,0 +1,119 @@
/* The Computer Language Benchmarks Game
http://shootout.alioth.debian.org/
contributed by Julien Gaugaz
inspired by the version contributed by Yura Taras and modified by Isaac Gouy
*/
package se.scalablesolutions.akka.actor
import se.scalablesolutions.akka.actor.Actor._
object Chameneos {
sealed trait ChameneosEvent
case class Meet(from: ActorRef, colour: Colour) extends ChameneosEvent
case class Change(colour: Colour) extends ChameneosEvent
case class MeetingCount(count: Int) extends ChameneosEvent
case object Exit extends ChameneosEvent
abstract class Colour
case object RED extends Colour
case object YELLOW extends Colour
case object BLUE extends Colour
case object FADED extends Colour
val colours = Array[Colour](BLUE, RED, YELLOW)
var start = 0L
var end = 0L
class Chameneo(var mall: ActorRef, var colour: Colour, cid: Int) extends Actor {
var meetings = 0
self.start
mall ! Meet(self, colour)
def receive = {
case Meet(from, otherColour) =>
colour = complement(otherColour)
meetings = meetings +1
from ! Change(colour)
mall ! Meet(self, colour)
case Change(newColour) =>
colour = newColour
meetings = meetings +1
mall ! Meet(self, colour)
case Exit =>
colour = FADED
self.sender.get ! MeetingCount(meetings)
}
def complement(otherColour: Colour): Colour = colour match {
case RED => otherColour match {
case RED => RED
case YELLOW => BLUE
case BLUE => YELLOW
case FADED => FADED
}
case YELLOW => otherColour match {
case RED => BLUE
case YELLOW => YELLOW
case BLUE => RED
case FADED => FADED
}
case BLUE => otherColour match {
case RED => YELLOW
case YELLOW => RED
case BLUE => BLUE
case FADED => FADED
}
case FADED => FADED
}
override def toString = cid + "(" + colour + ")"
}
class Mall(var n: Int, numChameneos: Int) extends Actor {
var waitingChameneo: Option[ActorRef] = None
var sumMeetings = 0
var numFaded = 0
override def init = {
for (i <- 0 until numChameneos) actorOf(new Chameneo(self, colours(i % 3), i))
}
def receive = {
case MeetingCount(i) =>
numFaded += 1
sumMeetings += i
if (numFaded == numChameneos) {
Chameneos.end = System.currentTimeMillis
self.stop
}
case msg @ Meet(a, c) =>
if (n > 0) {
waitingChameneo match {
case Some(chameneo) =>
n -= 1
chameneo ! msg
waitingChameneo = None
case None => waitingChameneo = self.sender
}
} else {
waitingChameneo.foreach(_ ! Exit)
self.sender.get ! Exit
}
}
}
def run {
// System.setProperty("akka.config", "akka.conf")
Chameneos.start = System.currentTimeMillis
actorOf(new Mall(1000000, 4)).start
Thread.sleep(10000)
println("Elapsed: " + (end - start))
}
def main(args : Array[String]): Unit = run
}

View file

@ -6,7 +6,7 @@ package se.scalablesolutions.akka.servlet
import se.scalablesolutions.akka.remote.BootableRemoteActorService
import se.scalablesolutions.akka.actor.BootableActorLoaderService
import se.scalablesolutions.akka.camel.service.CamelService
import se.scalablesolutions.akka.camel.CamelService
import se.scalablesolutions.akka.config.Config
import se.scalablesolutions.akka.util.{Logging, Bootable}

View file

@ -7,7 +7,7 @@ package se.scalablesolutions.akka.kernel
import se.scalablesolutions.akka.servlet.AkkaLoader
import se.scalablesolutions.akka.remote.BootableRemoteActorService
import se.scalablesolutions.akka.actor.BootableActorLoaderService
import se.scalablesolutions.akka.camel.service.CamelService
import se.scalablesolutions.akka.camel.CamelService
import se.scalablesolutions.akka.config.Config
object Main {

View file

@ -33,14 +33,6 @@ trait VectorStorageBackend[T] extends StorageBackend {
trait RefStorageBackend[T] extends StorageBackend {
def insertRefStorageFor(name: String, element: T)
def getRefStorageFor(name: String): Option[T]
def incrementAtomically(name: String): Option[Int] =
throw new UnsupportedOperationException // only for redis
def incrementByAtomically(name: String, by: Int): Option[Int] =
throw new UnsupportedOperationException // only for redis
def decrementAtomically(name: String): Option[Int] =
throw new UnsupportedOperationException // only for redis
def decrementByAtomically(name: String, by: Int): Option[Int] =
throw new UnsupportedOperationException // only for redis
}
// for Queue

View file

@ -11,20 +11,45 @@ import se.scalablesolutions.akka.config.Config.config
import com.redis._
trait Encoder {
trait Base64Encoder {
def encode(bytes: Array[Byte]): Array[Byte]
def decode(bytes: Array[Byte]): Array[Byte]
}
trait CommonsCodecBase64 {
import org.apache.commons.codec.binary.Base64._
def encode(bytes: Array[Byte]): Array[Byte] = encodeBase64(bytes)
def decode(bytes: Array[Byte]): Array[Byte] = decodeBase64(bytes)
trait Base64StringEncoder {
def byteArrayToString(bytes: Array[Byte]): String
def stringToByteArray(str: String): Array[Byte]
}
object Base64Encoder extends Encoder with CommonsCodecBase64
import Base64Encoder._
trait NullBase64 {
def encode(bytes: Array[Byte]): Array[Byte] = bytes
def decode(bytes: Array[Byte]): Array[Byte] = bytes
}
object CommonsCodec {
import org.apache.commons.codec.binary.Base64
import org.apache.commons.codec.binary.Base64._
val b64 = new Base64(true)
trait CommonsCodecBase64 {
def encode(bytes: Array[Byte]): Array[Byte] = encodeBase64(bytes)
def decode(bytes: Array[Byte]): Array[Byte] = decodeBase64(bytes)
}
object Base64Encoder extends Base64Encoder with CommonsCodecBase64
trait CommonsCodecBase64StringEncoder {
def byteArrayToString(bytes: Array[Byte]) = encodeBase64URLSafeString(bytes)
def stringToByteArray(str: String) = b64.decode(str)
}
object Base64StringEncoder extends Base64StringEncoder with CommonsCodecBase64StringEncoder
}
import CommonsCodec._
import CommonsCodec.Base64Encoder._
import CommonsCodec.Base64StringEncoder._
/**
* A module for supporting Redis based persistence.
@ -95,7 +120,7 @@ private [akka] object RedisStorageBackend extends
def insertMapStorageEntriesFor(name: String, entries: List[Tuple2[Array[Byte], Array[Byte]]]): Unit = withErrorHandling {
mset(entries.map(e =>
(makeRedisKey(name, e._1), new String(e._2))))
(makeRedisKey(name, e._1), byteArrayToString(e._2))))
}
/**
@ -138,7 +163,7 @@ private [akka] object RedisStorageBackend extends
db.get(makeRedisKey(name, key)) match {
case None =>
throw new NoSuchElementException(new String(key) + " not present")
case Some(s) => Some(s.getBytes)
case Some(s) => Some(stringToByteArray(s))
}
}
@ -155,7 +180,7 @@ private [akka] object RedisStorageBackend extends
case None =>
throw new NoSuchElementException(name + " not present")
case Some(keys) =>
keys.map(key => (makeKeyFromRedisKey(key)._2, db.get(key).get.getBytes)).toList
keys.map(key => (makeKeyFromRedisKey(key)._2, stringToByteArray(db.get(key).get))).toList
}
}
@ -207,7 +232,7 @@ private [akka] object RedisStorageBackend extends
}
def insertVectorStorageEntryFor(name: String, element: Array[Byte]): Unit = withErrorHandling {
db.lpush(new String(encode(name.getBytes)), new String(element))
db.lpush(new String(encode(name.getBytes)), byteArrayToString(element))
}
def insertVectorStorageEntriesFor(name: String, elements: List[Array[Byte]]): Unit = withErrorHandling {
@ -215,14 +240,15 @@ private [akka] object RedisStorageBackend extends
}
def updateVectorStorageEntryFor(name: String, index: Int, elem: Array[Byte]): Unit = withErrorHandling {
db.lset(new String(encode(name.getBytes)), index, new String(elem))
db.lset(new String(encode(name.getBytes)), index, byteArrayToString(elem))
}
def getVectorStorageEntryFor(name: String, index: Int): Array[Byte] = withErrorHandling {
db.lindex(new String(encode(name.getBytes)), index) match {
case None =>
throw new NoSuchElementException(name + " does not have element at " + index)
case Some(e) => e.getBytes
case Some(e) =>
stringToByteArray(e)
}
}
@ -246,75 +272,46 @@ private [akka] object RedisStorageBackend extends
case None =>
throw new NoSuchElementException(name + " does not have elements in the range specified")
case Some(l) =>
l map (_.get.getBytes)
l map ( e => stringToByteArray(e.get))
}
}
def getVectorStorageSizeFor(name: String): Int = {
def getVectorStorageSizeFor(name: String): Int = withErrorHandling {
db.llen(new String(encode(name.getBytes))) match {
case None =>
throw new NoSuchElementException(name + " not present")
case Some(l) => l
case Some(l) =>
l
}
}
def insertRefStorageFor(name: String, element: Array[Byte]): Unit = withErrorHandling {
db.set(new String(encode(name.getBytes)), new String(element))
db.set(new String(encode(name.getBytes)), byteArrayToString(element))
}
def insertRefStorageFor(name: String, element: String): Unit = withErrorHandling {
db.set(new String(encode(name.getBytes)), element)
}
def getRefStorageFor(name: String): Option[Array[Byte]] = withErrorHandling {
db.get(new String(encode(name.getBytes))) match {
case None =>
throw new NoSuchElementException(name + " not present")
case Some(s) => Some(s.getBytes)
}
}
override def incrementAtomically(name: String): Option[Int] = withErrorHandling {
db.incr(new String(encode(name.getBytes))) match {
case Some(i) => Some(i)
case None =>
throw new IllegalArgumentException(name + " exception in incr")
}
}
override def incrementByAtomically(name: String, by: Int): Option[Int] = withErrorHandling {
db.incrby(new String(encode(name.getBytes)), by) match {
case Some(i) => Some(i)
case None =>
throw new IllegalArgumentException(name + " exception in incrby")
}
}
override def decrementAtomically(name: String): Option[Int] = withErrorHandling {
db.decr(new String(encode(name.getBytes))) match {
case Some(i) => Some(i)
case None =>
throw new IllegalArgumentException(name + " exception in decr")
}
}
override def decrementByAtomically(name: String, by: Int): Option[Int] = withErrorHandling {
db.decrby(new String(encode(name.getBytes)), by) match {
case Some(i) => Some(i)
case None =>
throw new IllegalArgumentException(name + " exception in decrby")
case Some(s) => Some(stringToByteArray(s))
}
}
// add to the end of the queue
def enqueue(name: String, item: Array[Byte]): Boolean = withErrorHandling {
db.rpush(new String(encode(name.getBytes)), new String(item))
db.rpush(new String(encode(name.getBytes)), byteArrayToString(item))
}
// pop from the front of the queue
def dequeue(name: String): Option[Array[Byte]] = withErrorHandling {
db.lpop(new String(encode(name.getBytes))) match {
case None =>
throw new NoSuchElementException(name + " not present")
case Some(s) =>
Some(s.getBytes)
case Some(s) => Some(stringToByteArray(s))
}
}
@ -336,7 +333,7 @@ private [akka] object RedisStorageBackend extends
case None =>
throw new NoSuchElementException("No element at " + start)
case Some(s) =>
List(s.getBytes)
List(stringToByteArray(s))
}
case n =>
db.lrange(new String(encode(name.getBytes)), start, start + count - 1) match {
@ -344,7 +341,7 @@ private [akka] object RedisStorageBackend extends
throw new NoSuchElementException(
"No element found between " + start + " and " + (start + count - 1))
case Some(es) =>
es.map(_.get.getBytes)
es.map(e => stringToByteArray(e.get))
}
}
}
@ -359,7 +356,7 @@ private [akka] object RedisStorageBackend extends
// add item to sorted set identified by name
def zadd(name: String, zscore: String, item: Array[Byte]): Boolean = withErrorHandling {
db.zadd(new String(encode(name.getBytes)), zscore, new String(item)) match {
db.zadd(new String(encode(name.getBytes)), zscore, byteArrayToString(item)) match {
case Some(1) => true
case _ => false
}
@ -367,7 +364,7 @@ private [akka] object RedisStorageBackend extends
// remove item from sorted set identified by name
def zrem(name: String, item: Array[Byte]): Boolean = withErrorHandling {
db.zrem(new String(encode(name.getBytes)), new String(item)) match {
db.zrem(new String(encode(name.getBytes)), byteArrayToString(item)) match {
case Some(1) => true
case _ => false
}
@ -383,7 +380,7 @@ private [akka] object RedisStorageBackend extends
}
def zscore(name: String, item: Array[Byte]): Option[Float] = withErrorHandling {
db.zscore(new String(encode(name.getBytes)), new String(item)) match {
db.zscore(new String(encode(name.getBytes)), byteArrayToString(item)) match {
case Some(s) => Some(s.toFloat)
case None => None
}
@ -394,7 +391,7 @@ private [akka] object RedisStorageBackend extends
case None =>
throw new NoSuchElementException(name + " not present")
case Some(s) =>
s.map(_.get.getBytes)
s.map(e => stringToByteArray(e.get))
}
}
@ -404,7 +401,7 @@ private [akka] object RedisStorageBackend extends
case None =>
throw new NoSuchElementException(name + " not present")
case Some(l) =>
l.map{ case (elem, score) => (elem.get.getBytes, score.get.toFloat) }
l.map{ case (elem, score) => (stringToByteArray(elem.get), score.get.toFloat) }
}
}

View file

@ -0,0 +1,74 @@
package se.scalablesolutions.akka.persistence.redis
import sbinary._
import sbinary.Operations._
import sbinary.DefaultProtocol._
import se.scalablesolutions.akka.actor.{Actor, ActorRef}
import se.scalablesolutions.akka.config.OneForOneStrategy
import Actor._
import se.scalablesolutions.akka.persistence.common.PersistentVector
import se.scalablesolutions.akka.stm.Transaction.Global._
import se.scalablesolutions.akka.config.ScalaConfig._
import se.scalablesolutions.akka.util.Logging
import java.util.{Calendar, Date}
object Serial {
implicit object DateFormat extends Format[Date] {
def reads(in : Input) = new Date(read[Long](in))
def writes(out: Output, value: Date) = write[Long](out, value.getTime)
}
case class Name(id: Int, name: String, address: String, dateOfBirth: Date, dateDied: Option[Date])
implicit val NameFormat: Format[Name] = asProduct5(Name)(Name.unapply(_).get)
}
case class GETFOO(s: String)
case class SETFOO(s: String)
object SampleStorage {
class RedisSampleStorage extends Actor {
self.lifeCycle = Some(LifeCycle(Permanent))
val EVENT_MAP = "akka.sample.map"
private var eventMap = atomic { RedisStorage.getMap(EVENT_MAP) }
import sbinary._
import DefaultProtocol._
import Operations._
import Serial._
import java.util.Calendar
val dtb = Calendar.getInstance.getTime
val n = Name(100, "debasish ghosh", "kolkata", dtb, Some(dtb))
def receive = {
case SETFOO(str) =>
atomic {
eventMap += (str.getBytes, toByteArray[Name](n))
}
self.reply(str)
case GETFOO(str) =>
val ev = atomic {
eventMap.keySet.size
}
println("************* " + ev)
self.reply(ev)
}
}
}
import Serial._
import SampleStorage._
object Runner {
def run {
val proc = actorOf[RedisSampleStorage]
proc.start
val i: Option[String] = proc !! SETFOO("debasish")
println("i = " + i)
val ev: Option[Int] = proc !! GETFOO("debasish")
println(ev)
}
}

View file

@ -9,6 +9,11 @@ import org.junit.runner.RunWith
import se.scalablesolutions.akka.serialization.Serializable
import se.scalablesolutions.akka.serialization.Serializer._
import sbinary._
import sbinary.Operations._
import sbinary.DefaultProtocol._
import java.util.{Calendar, Date}
import RedisStorageBackend._
@RunWith(classOf[JUnitRunner])
@ -39,15 +44,6 @@ class RedisStorageBackendSpec extends
"T-1", "debasish.language".getBytes).get) should equal("java")
}
/**
it("should enter a custom object for transaction T-1") {
val n = Name(100, "debasish", "kolkata")
// insertMapStorageEntryFor("T-1", "debasish.identity".getBytes, Java.out(n))
// insertMapStorageEntryFor("T-1", "debasish.identity".getBytes, n.toBytes)
getMapStorageSizeFor("T-1") should equal(5)
}
**/
it("should enter key/values for another transaction T-2") {
insertMapStorageEntryFor("T-2", "debasish.age".getBytes, "49".getBytes)
insertMapStorageEntryFor("T-2", "debasish.spouse".getBytes, "paramita".getBytes)
@ -61,6 +57,21 @@ class RedisStorageBackendSpec extends
}
}
describe("Store and query long value in maps") {
it("should enter 4 entries in redis for transaction T-1") {
val d = Calendar.getInstance.getTime.getTime
insertMapStorageEntryFor("T-11", "debasish".getBytes,
toByteArray[Long](d))
getMapStorageSizeFor("T-11") should equal(1)
fromByteArray[Long](getMapStorageEntryFor("T-11", "debasish".getBytes).get) should equal(d)
}
it("should remove map storage for T-1 and T2") {
removeMapStorageFor("T-11")
}
}
describe("Range query in maps") {
it("should enter 7 entries in redis for transaction T-5") {
insertMapStorageEntryFor("T-5", "trade.refno".getBytes, "R-123".getBytes)
@ -93,73 +104,61 @@ class RedisStorageBackendSpec extends
}
}
describe("Store and query objects in maps") {
import NameSerialization._
it("should write a Name object and fetch it properly") {
val dtb = Calendar.getInstance.getTime
val n = Name(100, "debasish ghosh", "kolkata", dtb, Some(dtb))
insertMapStorageEntryFor("T-31", "debasish".getBytes, toByteArray[Name](n))
getMapStorageSizeFor("T-31") should equal(1)
fromByteArray[Name](getMapStorageEntryFor("T-31", "debasish".getBytes).get) should equal(n)
}
it("should remove map storage for T31") {
removeMapStorageFor("T-31")
}
}
describe("Store and query in vectors") {
it("should write 4 entries in a vector for transaction T-3") {
insertVectorStorageEntryFor("T-3", "debasish".getBytes)
insertVectorStorageEntryFor("T-3", "maulindu".getBytes)
val n = Name(100, "debasish", "kolkata")
// insertVectorStorageEntryFor("T-3", Java.out(n))
// insertVectorStorageEntryFor("T-3", n.toBytes)
insertVectorStorageEntryFor("T-3", "1200".getBytes)
getVectorStorageSizeFor("T-3") should equal(3)
val dt = Calendar.getInstance.getTime.getTime
insertVectorStorageEntryFor("T-3", toByteArray[Long](dt))
getVectorStorageSizeFor("T-3") should equal(4)
fromByteArray[Long](getVectorStorageEntryFor("T-3", 0)) should equal(dt)
getVectorStorageSizeFor("T-3") should equal(4)
}
}
describe("Store and query objects in vectors") {
import NameSerialization._
it("should write a Name object and fetch it properly") {
val dtb = Calendar.getInstance.getTime
val n = Name(100, "debasish ghosh", "kolkata", dtb, Some(dtb))
insertVectorStorageEntryFor("T-31", toByteArray[Name](n))
getVectorStorageSizeFor("T-31") should equal(1)
fromByteArray[Name](getVectorStorageEntryFor("T-31", 0)) should equal(n)
}
}
describe("Store and query in ref") {
import NameSerialization._
it("should write 4 entries in 4 refs for transaction T-4") {
insertRefStorageFor("T-4", "debasish".getBytes)
insertRefStorageFor("T-4", "maulindu".getBytes)
insertRefStorageFor("T-4", "1200".getBytes)
new String(getRefStorageFor("T-4").get) should equal("1200")
// val n = Name(100, "debasish", "kolkata")
// insertRefStorageFor("T-4", Java.out(n))
// insertRefStorageFor("T-4", n.toBytes)
// Java.in(getRefStorageFor("T-4").get, Some(classOf[Name])).asInstanceOf[Name] should equal(n)
// n.fromBytes(getRefStorageFor("T-4").get) should equal(n)
}
}
describe("atomic increment in ref") {
it("should increment an existing key value by 1") {
insertRefStorageFor("T-4-1", "1200".getBytes)
new String(getRefStorageFor("T-4-1").get) should equal("1200")
incrementAtomically("T-4-1").get should equal(1201)
}
it("should create and increment a non-existing key value by 1") {
incrementAtomically("T-4-2").get should equal(1)
new String(getRefStorageFor("T-4-2").get) should equal("1")
}
it("should increment an existing key value by the amount specified") {
insertRefStorageFor("T-4-3", "1200".getBytes)
new String(getRefStorageFor("T-4-3").get) should equal("1200")
incrementByAtomically("T-4-3", 50).get should equal(1250)
}
it("should create and increment a non-existing key value by the amount specified") {
incrementByAtomically("T-4-4", 20).get should equal(20)
new String(getRefStorageFor("T-4-4").get) should equal("20")
}
}
describe("atomic decrement in ref") {
it("should decrement an existing key value by 1") {
insertRefStorageFor("T-4-5", "1200".getBytes)
new String(getRefStorageFor("T-4-5").get) should equal("1200")
decrementAtomically("T-4-5").get should equal(1199)
}
it("should create and decrement a non-existing key value by 1") {
decrementAtomically("T-4-6").get should equal(-1)
new String(getRefStorageFor("T-4-6").get) should equal("-1")
}
it("should decrement an existing key value by the amount specified") {
insertRefStorageFor("T-4-7", "1200".getBytes)
new String(getRefStorageFor("T-4-7").get) should equal("1200")
decrementByAtomically("T-4-7", 50).get should equal(1150)
}
it("should create and decrement a non-existing key value by the amount specified") {
decrementByAtomically("T-4-8", 20).get should equal(-20)
new String(getRefStorageFor("T-4-8").get) should equal("-20")
it("should write a Name object and fetch it properly") {
val dtb = Calendar.getInstance.getTime
val n = Name(100, "debasish ghosh", "kolkata", dtb, Some(dtb))
insertRefStorageFor("T-4", toByteArray[Name](n))
fromByteArray[Name](getRefStorageFor("T-4").get) should equal(n)
}
}
@ -185,6 +184,14 @@ class RedisStorageBackendSpec extends
new String(l(1)) should equal("yukihiro matsumoto")
new String(l(2)) should equal("claude shannon")
}
it("should write a Name object and fetch it properly") {
import NameSerialization._
val dtb = Calendar.getInstance.getTime
val n = Name(100, "debasish ghosh", "kolkata", dtb, Some(dtb))
enqueue("T-5-1", toByteArray[Name](n))
fromByteArray[Name](peek("T-5-1", 0, 1).head) should equal(n)
fromByteArray[Name](dequeue("T-5-1").get) should equal(n)
}
}
describe("store and query in sorted set") {
@ -221,27 +228,18 @@ class RedisStorageBackendSpec extends
}
}
case class Name(id: Int, name: String, address: String)
extends Serializable.SBinary[Name] {
import sbinary._
import sbinary.Operations._
import sbinary.DefaultProtocol._
object NameSerialization {
implicit object DateFormat extends Format[Date] {
def reads(in : Input) =
new Date(read[Long](in))
def this() = this(0, null, null)
implicit object NameFormat extends Format[Name] {
def reads(in : Input) = Name(
read[Int](in),
read[String](in),
read[String](in))
def writes(out: Output, value: Name) = {
write[Int](out, value.id)
write[String](out, value.name)
write[String](out, value.address)
}
def writes(out: Output, value: Date) =
write[Long](out, value.getTime)
}
def fromBytes(bytes: Array[Byte]) = fromByteArray[Name](bytes)
case class Name(id: Int, name: String,
address: String, dateOfBirth: Date, dateDied: Option[Date])
def toBytes: Array[Byte] = toByteArray(this)
implicit val NameFormat: Format[Name] =
asProduct5(Name)(Name.unapply(_).get)
}

View file

@ -3,38 +3,33 @@ Ants
Ants is written by Peter Vlugter.
Ants is based on the Clojure [ants simulation][ants.clj] by Rich Hickey, and ported to Scala using [Akka][akka] and [Spde][spde].
[ants.clj]:http://clojure.googlegroups.com/web/ants.clj
[akka]:http://akkasource.org
[spde]:http://technically.us/spde/
Ants is roughly based on the Clojure [ants simulation][ants.clj] by Rich Hickey, and ported to Scala using [Akka][akka] and [Spde][spde].
Requirements
------------
To build and run Ants you need [Simple Build Tool][sbt] (sbt).
[sbt]: http://code.google.com/p/simple-build-tool/
Running
-------
First time, 'sbt update' to get dependencies, then to run Ants use 'sbt run'.
Here is an example. First type 'sbt' to start SBT interactively, the run 'update' and 'run':
> cd $AKKA_HOME
> % sbt
> update
> > update
> run
> > project akka-sample-ants
> > run
Notice
------
Ants is based on the Clojure ants simulation by Rich Hickey.
Ants is roughly based on the Clojure ants simulation by Rich Hickey.
Copyright (c) Rich Hickey. All rights reserved.
The use and distribution terms for this software are covered by the
@ -44,4 +39,8 @@ By using this software in any fashion, you are agreeing to be bound by
the terms of this license.
You must not remove this notice, or any other, from this software.
[ants.clj]:http://clojure.googlegroups.com/web/ants.clj
[akka]:http://akkasource.org
[spde]:http://technically.us/spde/
[sbt]: http://code.google.com/p/simple-build-tool/
[cpl]: http://opensource.org/licenses/cpl1.0.php

View file

@ -0,0 +1,12 @@
package sample.camel;
/**
* @author Martin Krasser
*/
public class BeanImpl implements BeanIntf {
public String foo(String s) {
return "hello " + s;
}
}

View file

@ -0,0 +1,10 @@
package sample.camel;
/**
* @author Martin Krasser
*/
public interface BeanIntf {
public String foo(String s);
}

View file

@ -0,0 +1,24 @@
package sample.camel;
import org.apache.camel.Body;
import org.apache.camel.Header;
import se.scalablesolutions.akka.actor.annotation.consume;
/**
* @author Martin Krasser
*/
public class ConsumerPojo1 {
@consume("file:data/input/pojo")
public void foo(String body) {
System.out.println("Received message:");
System.out.println(body);
}
@consume("jetty:http://0.0.0.0:8877/camel/pojo")
public String bar(@Body String body, @Header("name") String header) {
return String.format("body=%s header=%s", body, header);
}
}

View file

@ -0,0 +1,17 @@
package sample.camel;
import org.apache.camel.Body;
import org.apache.camel.Header;
import se.scalablesolutions.akka.actor.annotation.consume;
/**
* @author Martin Krasser
*/
public class ConsumerPojo2 {
@consume("direct:default")
public String foo(String body) {
return String.format("default: %s", body);
}
}

View file

@ -0,0 +1,18 @@
package sample.camel;
import org.apache.camel.Body;
import org.apache.camel.Header;
import se.scalablesolutions.akka.actor.annotation.consume;
/**
* @author Martin Krasser
*/
public class RemoteConsumerPojo1 {
@consume("jetty:http://localhost:6644/camel/remote-active-object-1")
public String foo(@Body String body, @Header("name") String header) {
return String.format("remote1: body=%s header=%s", body, header);
}
}

View file

@ -0,0 +1,17 @@
package sample.camel;
import org.apache.camel.Body;
import org.apache.camel.Header;
import se.scalablesolutions.akka.actor.annotation.consume;
/**
* @author Martin Krasser
*/
public class RemoteConsumerPojo2 {
@consume("jetty:http://localhost:6644/camel/remote-active-object-2")
public String foo(@Body String body, @Header("name") String header) {
return String.format("remote2: body=%s header=%s", body, header);
}
}

View file

@ -0,0 +1,12 @@
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:akka="http://www.akkasource.org/schema/akka"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
http://www.akkasource.org/schema/akka
http://scalablesolutions.se/akka/akka.xsd">
<akka:active-object id="blah2" target="sample.camel.BeanImpl" timeout="1000" />
</beans>

View file

@ -9,10 +9,10 @@ import se.scalablesolutions.akka.util.Logging
* Client-initiated remote actor.
*/
class RemoteActor1 extends RemoteActor("localhost", 7777) with Consumer {
def endpointUri = "jetty:http://localhost:6644/remote1"
def endpointUri = "jetty:http://localhost:6644/camel/remote-actor-1"
protected def receive = {
case msg: Message => self.reply(Message("hello %s" format msg.body, Map("sender" -> "remote1")))
case msg: Message => self.reply(Message("hello %s" format msg.bodyAs[String], Map("sender" -> "remote1")))
}
}
@ -20,10 +20,10 @@ class RemoteActor1 extends RemoteActor("localhost", 7777) with Consumer {
* Server-initiated remote actor.
*/
class RemoteActor2 extends Actor with Consumer {
def endpointUri = "jetty:http://localhost:6644/remote2"
def endpointUri = "jetty:http://localhost:6644/camel/remote-actor-2"
protected def receive = {
case msg: Message => self.reply(Message("hello %s" format msg.body, Map("sender" -> "remote2")))
case msg: Message => self.reply(Message("hello %s" format msg.bodyAs[String], Map("sender" -> "remote2")))
}
}
@ -37,14 +37,14 @@ class Producer1 extends Actor with Producer {
}
class Consumer1 extends Actor with Consumer with Logging {
def endpointUri = "file:data/input"
def endpointUri = "file:data/input/actor"
def receive = {
case msg: Message => log.info("received %s" format msg.bodyAs[String])
}
}
@consume("jetty:http://0.0.0.0:8877/camel/test1")
@consume("jetty:http://0.0.0.0:8877/camel/default")
class Consumer2 extends Actor {
def receive = {
case msg: Message => self.reply("Hello %s" format msg.bodyAs[String])

View file

@ -1,29 +0,0 @@
package sample.camel
import se.scalablesolutions.akka.actor.{Actor, ActorRef}
import se.scalablesolutions.akka.actor.Actor._
import se.scalablesolutions.akka.camel.Message
import se.scalablesolutions.akka.remote.RemoteClient
/**
* @author Martin Krasser
*/
object Application1 {
//
// TODO: completion of example
//
def main(args: Array[String]) {
implicit val sender: Option[ActorRef] = None
val actor1 = actorOf[RemoteActor1]
val actor2 = RemoteClient.actorFor("remote2", "localhost", 7777)
actor1.start
println(actor1 !! Message("actor1"))
println(actor2 !! Message("actor2"))
}
}

View file

@ -6,8 +6,8 @@ import org.apache.camel.impl.DefaultCamelContext
import org.apache.camel.spring.spi.ApplicationContextRegistry
import org.springframework.context.support.ClassPathXmlApplicationContext
import se.scalablesolutions.akka.actor.SupervisorFactory
import se.scalablesolutions.akka.actor.Actor._
import se.scalablesolutions.akka.actor.{ActiveObject, Supervisor}
import se.scalablesolutions.akka.camel.CamelContextManager
import se.scalablesolutions.akka.config.ScalaConfig._
@ -16,23 +16,29 @@ import se.scalablesolutions.akka.config.ScalaConfig._
*/
class Boot {
// -----------------------------------------------------------------------
// Create CamelContext with Spring-based registry and custom route builder
// -----------------------------------------------------------------------
val context = new ClassPathXmlApplicationContext("/sample-camel-context.xml", getClass)
val context = new ClassPathXmlApplicationContext("/context-boot.xml", getClass)
val registry = new ApplicationContextRegistry(context)
CamelContextManager.init(new DefaultCamelContext(registry))
CamelContextManager.context.addRoutes(new CustomRouteBuilder)
// Basic example
// -----------------------------------------------------------------------
// Basic example (using a supervisor for consumer actors)
// -----------------------------------------------------------------------
val factory = SupervisorFactory(
val supervisor = Supervisor(
SupervisorConfig(
RestartStrategy(OneForOne, 3, 100, List(classOf[Exception])),
Supervise(actorOf[Consumer1], LifeCycle(Permanent)) ::
Supervise(actorOf[Consumer2], LifeCycle(Permanent)) :: Nil))
factory.newInstance.start
// -----------------------------------------------------------------------
// Routing example
// -----------------------------------------------------------------------
val producer = actorOf[Producer1]
val mediator = actorOf(new Transformer(producer))
@ -42,7 +48,9 @@ class Boot {
mediator.start
consumer.start
// Publish subscribe example
// -----------------------------------------------------------------------
// Publish subscribe examples
// -----------------------------------------------------------------------
//
// Cometd example commented out because camel-cometd is broken in Camel 2.3
@ -60,14 +68,27 @@ class Boot {
//val cometdPublisherBridge = actorOf(new PublisherBridge("jetty:http://0.0.0.0:8877/camel/pub/cometd", cometdPublisher)).start
val jmsPublisherBridge = actorOf(new PublisherBridge("jetty:http://0.0.0.0:8877/camel/pub/jms", jmsPublisher)).start
// -----------------------------------------------------------------------
// Actor un-publishing and re-publishing example
// -----------------------------------------------------------------------
actorOf[Consumer4].start // POSTing "stop" to http://0.0.0.0:8877/camel/stop stops and unpublishes this actor
actorOf[Consumer5].start // POSTing any msg to http://0.0.0.0:8877/camel/start starts and published Consumer4 again.
// -----------------------------------------------------------------------
// Active object example
// -----------------------------------------------------------------------
ActiveObject.newInstance(classOf[ConsumerPojo1])
}
/**
* @author Martin Krasser
*/
class CustomRouteBuilder extends RouteBuilder {
def configure {
val actorUri = "actor:%s" format classOf[Consumer2].getName
from("jetty:http://0.0.0.0:8877/camel/test2").to(actorUri)
from("jetty:http://0.0.0.0:8877/camel/custom").to(actorUri)
from("direct:welcome").process(new Processor() {
def process(exchange: Exchange) {
exchange.getOut.setBody("Welcome %s" format exchange.getIn.getBody)

View file

@ -0,0 +1,33 @@
package sample.camel
import se.scalablesolutions.akka.actor.Actor._
import se.scalablesolutions.akka.actor.{ActiveObject, Actor, ActorRef}
import se.scalablesolutions.akka.camel.Message
import se.scalablesolutions.akka.remote.RemoteClient
/**
* @author Martin Krasser
*/
object ClientApplication {
//
// TODO: completion of example
//
def main(args: Array[String]) {
val actor1 = actorOf[RemoteActor1]
val actor2 = RemoteClient.actorFor("remote2", "localhost", 7777)
val actobj1 = ActiveObject.newRemoteInstance(classOf[RemoteConsumerPojo1], "localhost", 7777)
//val actobj2 = TODO: create reference to server-managed active object (RemoteConsumerPojo2)
actor1.start
println(actor1 !! Message("actor1")) // activates and publishes actor remotely
println(actor2 !! Message("actor2")) // actor already activated and published remotely
println(actobj1.foo("x", "y")) // activates and publishes active object methods remotely
// ...
}
}

View file

@ -1,13 +1,13 @@
package sample.camel
import se.scalablesolutions.akka.camel.service.CamelService
import se.scalablesolutions.akka.remote.RemoteNode
import se.scalablesolutions.akka.actor.Actor._
import se.scalablesolutions.akka.camel.CamelService
import se.scalablesolutions.akka.remote.RemoteNode
/**
* @author Martin Krasser
*/
object Application2 {
object ServerApplication {
//
// TODO: completion of example

View file

@ -0,0 +1,60 @@
package sample.camel
import org.apache.camel.impl.{DefaultCamelContext, SimpleRegistry}
import org.apache.camel.builder.RouteBuilder
import se.scalablesolutions.akka.camel.{CamelService, CamelContextManager}
import se.scalablesolutions.akka.actor.{ActorRegistry, ActiveObject}
/**
* @author Martin Krasser
*/
object PlainApplication {
def main(args: Array[String]) {
import CamelContextManager.context
// 'externally' register active objects
val registry = new SimpleRegistry
registry.put("pojo1", ActiveObject.newInstance(classOf[BeanIntf], new BeanImpl))
registry.put("pojo2", ActiveObject.newInstance(classOf[BeanImpl]))
// customize CamelContext
CamelContextManager.init(new DefaultCamelContext(registry))
CamelContextManager.context.addRoutes(new PlainApplicationRoute)
// start CamelService
val camelService = CamelService.newInstance
camelService.load
// 'internally' register active object (requires CamelService)
ActiveObject.newInstance(classOf[ConsumerPojo2])
// access 'externally' registered active objects with active-object component
assert("hello msg1" == context.createProducerTemplate.requestBody("direct:test1", "msg1"))
assert("hello msg2" == context.createProducerTemplate.requestBody("direct:test2", "msg2"))
// internal registration is done in background. Wait a bit ...
Thread.sleep(1000)
// access 'internally' (automatically) registered active-objects
// (see @consume annotation value at ConsumerPojo2.foo method)
assert("default: msg3" == context.createProducerTemplate.requestBody("direct:default", "msg3"))
// shutdown CamelService
camelService.unload
// shutdown all (internally) created actors
ActorRegistry.shutdownAll
}
}
class PlainApplicationRoute extends RouteBuilder {
def configure = {
from("direct:test1").to("active-object:pojo1?method=foo")
from("direct:test2").to("active-object:pojo2?method=foo")
}
}
object SpringApplication {
// TODO
}

View file

@ -123,11 +123,15 @@ class PersistentSimpleServiceActor extends Transactor {
def receive = {
case "Tick" => if (hasStartedTicking) {
val bytes = storage.get(KEY.getBytes).get
val counter = ByteBuffer.wrap(bytes).getInt
storage.put(KEY.getBytes, ByteBuffer.allocate(4).putInt(counter + 1).array)
val counter = Integer.parseInt(new String(bytes, "UTF8"))
storage.put(KEY.getBytes, (counter + 1).toString.getBytes )
// val bytes = storage.get(KEY.getBytes).get
// val counter = ByteBuffer.wrap(bytes).getInt
// storage.put(KEY.getBytes, ByteBuffer.allocate(4).putInt(counter + 1).array)
self.reply(<success>Tick:{counter + 1}</success>)
} else {
storage.put(KEY.getBytes, Array(0.toByte))
storage.put(KEY.getBytes, "0".getBytes)
// storage.put(KEY.getBytes, Array(0.toByte))
hasStartedTicking = true
self.reply(<success>Tick: 0</success>)
}

View file

@ -15,7 +15,7 @@
</log>
<akka>
version = "0.9"
version = "0.10"
# FQN (Fully Qualified Name) to the class doing initial active object/actor
# supervisor bootstrap, should be defined in default constructor

View file

@ -1,6 +1,6 @@
project.organization=se.scalablesolutions.akka
project.name=akka
project.version=0.9
project.version=0.10
scala.version=2.8.0.RC3
sbt.version=0.7.4
def.scala.version=2.7.7

View file

@ -48,7 +48,7 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) with AkkaWrappe
// must be resolved from a ModuleConfiguration. This will result in a significant acceleration of the update action.
// Therefore, if repositories are defined, this must happen as def, not as val.
// -------------------------------------------------------------------------------------------------------------------
val embeddedRepo = "Embedded Repo" at (info.projectPath / "embedded-repo").asURL.toString // Fast enough => No need for a module configuration here!
val embeddedRepo = "Embedded Repo" at (info.projectPath / "embedded-repo").asURL.toString
val scalaTestModuleConfig = ModuleConfiguration("org.scalatest", ScalaToolsSnapshots)
def guiceyFruitRepo = "GuiceyFruit Repo" at "http://guiceyfruit.googlecode.com/svn/repo/releases/"
val guiceyFruitModuleConfig = ModuleConfiguration("org.guiceyfruit", guiceyFruitRepo)
@ -192,7 +192,7 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) with AkkaWrappe
val guicey = "org.guiceyfruit" % "guice-all" % "2.0" % "compile"
val aopalliance = "aopalliance" % "aopalliance" % "1.0" % "compile"
val protobuf = "com.google.protobuf" % "protobuf-java" % "2.3.0" % "compile"
val multiverse = "org.multiverse" % "multiverse-alpha" % MULTIVERSE_VERSION % "compile"
val multiverse = "org.multiverse" % "multiverse-alpha" % MULTIVERSE_VERSION % "compile" intransitive()
val jgroups = "jgroups" % "jgroups" % "2.9.0.GA" % "compile"
// testing
@ -243,6 +243,7 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) with AkkaWrappe
class AkkaRedisProject(info: ProjectInfo) extends AkkaDefaultProject(info, distPath) {
val redis = "com.redis" % "redisclient" % "2.8.0.RC3-1.4-SNAPSHOT" % "compile"
val commons_codec = "commons-codec" % "commons-codec" % "1.4" % "compile"
override def testOptions = TestFilter((name: String) => name.endsWith("Test")) :: Nil
}
@ -367,8 +368,8 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) with AkkaWrappe
def removeDupEntries(paths: PathFinder) =
Path.lazyPathFinder {
val mapped = paths.get map { p => (p.relativePath, p) }
(Map() ++ mapped).values.toList
}
(Map() ++ mapped).values.toList
}
def allArtifacts = {
Path.fromFile(buildScalaInstance.libraryJar) +++