ActiveObjectComponent now written in Scala

This commit is contained in:
Martin Krasser 2010-06-03 07:15:38 +02:00
parent 9af36d6496
commit bfa3ee2a2b
6 changed files with 98 additions and 131 deletions

View file

@ -1,46 +0,0 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.camel.component;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.camel.Endpoint;
import org.apache.camel.Processor;
import org.apache.camel.component.bean.BeanComponent;
import org.apache.camel.component.bean.BeanEndpoint;
import org.apache.camel.component.bean.BeanHolder;
/**
* Camel component for accessing active objects.
*
* @author Martin Krasser
*/
public class ActiveObjectComponent extends BeanComponent {
public static final String DEFAULT_SCHEMA = "actobj";
private Map<String, Object> registry = new ConcurrentHashMap<String, Object>();
public Map<String, Object> getActiveObjectRegistry() {
return registry;
}
protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
BeanEndpoint beanEndpoint = new BeanEndpoint(uri, this);
beanEndpoint.setBeanName(remaining);
beanEndpoint.setBeanHolder(createBeanHolder(remaining));
Processor processor = beanEndpoint.getProcessor();
setProperties(processor, parameters);
return beanEndpoint;
}
private BeanHolder createBeanHolder(String beanName) throws Exception {
BeanHolder holder = new ActiveObjectHolder(registry, getCamelContext(), beanName).createCacheHolder();
registry.remove(beanName);
return holder;
}
}

View file

@ -1,36 +0,0 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.camel.component;
import java.util.Map;
import org.apache.camel.CamelContext;
import org.apache.camel.NoSuchBeanException;
import org.apache.camel.component.bean.BeanInfo;
import org.apache.camel.component.bean.RegistryBean;
/**
* @author Martin Krasser
*/
public class ActiveObjectHolder extends RegistryBean {
private Map<String, Object> activeObjectRegistry;
public ActiveObjectHolder(Map<String, Object> activeObjectRegistry, CamelContext context, String name) {
super(context, name);
this.activeObjectRegistry = activeObjectRegistry;
}
@Override
public BeanInfo getBeanInfo() {
return new BeanInfo(getContext(), getBean().getClass(), getParameterMappingStrategy());
}
@Override
public Object getBean() throws NoSuchBeanException {
return activeObjectRegistry.get(getName());
}
}

View file

@ -4,6 +4,8 @@
package se.scalablesolutions.akka.camel
import java.util.Map
import org.apache.camel.{ProducerTemplate, CamelContext}
import org.apache.camel.impl.DefaultCamelContext
@ -35,7 +37,7 @@ trait CamelContextLifecycle extends Logging {
* Registry in which active objects are TEMPORARILY registered during
* creation of Camel routes to active objects.
*/
private[camel] var activeObjectRegistry: java.util.Map[String, AnyRef] = _
private[camel] var activeObjectRegistry: Map[String, AnyRef] = _
/**
* Returns the managed CamelContext.
@ -93,10 +95,10 @@ trait CamelContextLifecycle extends Logging {
*/
def init(context: CamelContext) {
this.activeObjectComponent = new ActiveObjectComponent
this.activeObjectRegistry = activeObjectComponent.getActiveObjectRegistry
this.activeObjectRegistry = activeObjectComponent.activeObjectRegistry
this.context = context
this.context.setStreamCaching(true)
this.context.addComponent(ActiveObjectComponent.DEFAULT_SCHEMA, activeObjectComponent)
this.context.addComponent(ActiveObjectComponent.DefaultSchema, activeObjectComponent)
this.template = context.createProducerTemplate
_initialized = true
log.info("Camel context initialized")

View file

@ -0,0 +1,67 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.camel.component
import java.util.Map
import java.util.concurrent.ConcurrentHashMap
import org.apache.camel.CamelContext
import org.apache.camel.component.bean._
/**
* @author Martin Krasser
*/
object ActiveObjectComponent {
val DefaultSchema = "actobj"
}
/**
* @author Martin Krasser
*/
class ActiveObjectComponent extends BeanComponent {
val activeObjectRegistry = new ConcurrentHashMap[String, AnyRef]
override def createEndpoint(uri: String, remaining: String, parameters: Map[String, AnyRef]) = {
val endpoint = new BeanEndpoint(uri, this)
endpoint.setBeanName(remaining)
endpoint.setBeanHolder(createBeanHolder(remaining))
setProperties(endpoint.getProcessor, parameters)
endpoint
}
private def createBeanHolder(beanName: String) =
new ActiveObjectHolder(activeObjectRegistry, getCamelContext, beanName).createCacheHolder
}
/**
* @author Martin Krasser
*/
class ActiveObjectHolder(activeObjectRegistry: Map[String, AnyRef], context: CamelContext, name: String)
extends RegistryBean(context, name) {
override def getBeanInfo: BeanInfo =
new ActiveObjectInfo(getContext, getBean.getClass, getParameterMappingStrategy)
override def getBean: AnyRef =
activeObjectRegistry.get(getName)
}
/**
* @author Martin Krasser
*/
class ActiveObjectInfo(context: CamelContext, clazz: Class[_], strategy: ParameterMappingStrategy)
extends BeanInfo(context, clazz, strategy) {
protected override def introspect(clazz: Class[_]): Unit = {
for (method <- clazz.getDeclaredMethods) {
if (isValidMethod(clazz, method)) {
introspect(clazz, method)
}
}
val superclass = clazz.getSuperclass
if (superclass != null && !superclass.equals(classOf[Any])) {
introspect(superclass)
}
}
}

View file

@ -3,6 +3,8 @@
*/
package se.scalablesolutions.akka.camel.service
import collection.mutable.ListBuffer
import java.io.InputStream
import java.lang.reflect.Method
import java.util.concurrent.CountDownLatch
@ -12,9 +14,8 @@ import org.apache.camel.builder.RouteBuilder
import se.scalablesolutions.akka.actor._
import se.scalablesolutions.akka.actor.annotation.consume
import se.scalablesolutions.akka.camel.{Consumer, CamelContextManager}
import se.scalablesolutions.akka.util.Logging
import se.scalablesolutions.akka.camel.component.ActiveObjectComponent
import collection.mutable.ListBuffer
import se.scalablesolutions.akka.util.Logging
/**
* Actor that publishes consumer actors as Camel endpoints at the CamelContext managed
@ -78,7 +79,7 @@ class ConsumerPublisher extends Actor with Logging {
* Creates a route to the registered consumer actor.
*/
private def handleConsumerRegistered(event: ConsumerRegistered) {
CamelContextManager.context.addRoutes(new ConsumerRoute(event.uri, event.id, event.uuid))
CamelContextManager.context.addRoutes(new ConsumerActorRoute(event.uri, event.id, event.uuid))
log.info("published actor %s (%s) at endpoint %s" format (event.clazz, event.id, event.uri))
}
@ -102,6 +103,23 @@ class ConsumerPublisher extends Actor with Logging {
}
}
abstract class ConsumerRoute(endpointUri: String, id: String) extends RouteBuilder {
// TODO: make conversions configurable
private val bodyConversions = Map(
"file" -> classOf[InputStream]
)
def configure = {
val schema = endpointUri take endpointUri.indexOf(":") // e.g. "http" from "http://whatever/..."
bodyConversions.get(schema) match {
case Some(clazz) => from(endpointUri).routeId(id).convertBodyTo(clazz).to(targetUri)
case None => from(endpointUri).routeId(id).to(targetUri)
}
}
protected def targetUri: String
}
/**
* Defines the route to a consumer actor.
*
@ -112,50 +130,12 @@ class ConsumerPublisher extends Actor with Logging {
*
* @author Martin Krasser
*/
class ConsumerRoute(val endpointUri: String, id: String, uuid: Boolean) extends RouteBuilder {
//
//
// TODO: factor out duplicated code from ConsumerRoute and ConsumerMethodRoute
//
//
// TODO: make conversions configurable
private val bodyConversions = Map(
"file" -> classOf[InputStream]
)
def configure = {
val schema = endpointUri take endpointUri.indexOf(":") // e.g. "http" from "http://whatever/..."
bodyConversions.get(schema) match {
case Some(clazz) => from(endpointUri).routeId(id).convertBodyTo(clazz).to(actorUri)
case None => from(endpointUri).routeId(id).to(actorUri)
}
}
private def actorUri = (if (uuid) "actor:uuid:%s" else "actor:id:%s") format id
class ConsumerActorRoute(endpointUri: String, id: String, uuid: Boolean) extends ConsumerRoute(endpointUri, id) {
protected override def targetUri = (if (uuid) "actor:uuid:%s" else "actor:id:%s") format id
}
class ConsumerMethodRoute(val endpointUri: String, id: String, method: String) extends RouteBuilder {
//
//
// TODO: factor out duplicated code from ConsumerRoute and ConsumerMethodRoute
//
//
// TODO: make conversions configurable
private val bodyConversions = Map(
"file" -> classOf[InputStream]
)
def configure = {
val schema = endpointUri take endpointUri.indexOf(":") // e.g. "http" from "http://whatever/..."
bodyConversions.get(schema) match {
case Some(clazz) => from(endpointUri).convertBodyTo(clazz).to(activeObjectUri)
case None => from(endpointUri).to(activeObjectUri)
}
}
private def activeObjectUri = "%s:%s?method=%s" format (ActiveObjectComponent.DEFAULT_SCHEMA, id, method)
class ConsumerMethodRoute(val endpointUri: String, id: String, method: String) extends ConsumerRoute(endpointUri, id) {
protected override def targetUri = "%s:%s?method=%s" format (ActiveObjectComponent.DefaultSchema, id, method)
}
/**

View file

@ -3,9 +3,9 @@ package se.scalablesolutions.akka.camel.service
import org.apache.camel.builder.RouteBuilder
import org.scalatest.{GivenWhenThen, BeforeAndAfterAll, FeatureSpec}
import se.scalablesolutions.akka.actor.Actor._
import se.scalablesolutions.akka.actor.{Actor, ActorRegistry}
import se.scalablesolutions.akka.camel.{CamelContextManager, Message, Consumer}
import Actor._
object CamelServiceFeatureTest {