diff --git a/akka-camel/src/main/java/se/scalablesolutions/akka/camel/component/ActiveObjectComponent.java b/akka-camel/src/main/java/se/scalablesolutions/akka/camel/component/ActiveObjectComponent.java deleted file mode 100644 index f84788e1fe..0000000000 --- a/akka-camel/src/main/java/se/scalablesolutions/akka/camel/component/ActiveObjectComponent.java +++ /dev/null @@ -1,46 +0,0 @@ -/** - * Copyright (C) 2009-2010 Scalable Solutions AB - */ - -package se.scalablesolutions.akka.camel.component; - -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - -import org.apache.camel.Endpoint; -import org.apache.camel.Processor; -import org.apache.camel.component.bean.BeanComponent; -import org.apache.camel.component.bean.BeanEndpoint; -import org.apache.camel.component.bean.BeanHolder; - -/** - * Camel component for accessing active objects. - * - * @author Martin Krasser - */ -public class ActiveObjectComponent extends BeanComponent { - - public static final String DEFAULT_SCHEMA = "actobj"; - - private Map registry = new ConcurrentHashMap(); - - public Map getActiveObjectRegistry() { - return registry; - } - - protected Endpoint createEndpoint(String uri, String remaining, Map parameters) throws Exception { - BeanEndpoint beanEndpoint = new BeanEndpoint(uri, this); - beanEndpoint.setBeanName(remaining); - beanEndpoint.setBeanHolder(createBeanHolder(remaining)); - Processor processor = beanEndpoint.getProcessor(); - setProperties(processor, parameters); - return beanEndpoint; - } - - private BeanHolder createBeanHolder(String beanName) throws Exception { - BeanHolder holder = new ActiveObjectHolder(registry, getCamelContext(), beanName).createCacheHolder(); - registry.remove(beanName); - return holder; - } - -} diff --git a/akka-camel/src/main/java/se/scalablesolutions/akka/camel/component/ActiveObjectHolder.java b/akka-camel/src/main/java/se/scalablesolutions/akka/camel/component/ActiveObjectHolder.java deleted file mode 100644 index 289faeeac1..0000000000 --- a/akka-camel/src/main/java/se/scalablesolutions/akka/camel/component/ActiveObjectHolder.java +++ /dev/null @@ -1,36 +0,0 @@ -/** - * Copyright (C) 2009-2010 Scalable Solutions AB - */ - -package se.scalablesolutions.akka.camel.component; - -import java.util.Map; - -import org.apache.camel.CamelContext; -import org.apache.camel.NoSuchBeanException; -import org.apache.camel.component.bean.BeanInfo; -import org.apache.camel.component.bean.RegistryBean; - -/** - * @author Martin Krasser - */ -public class ActiveObjectHolder extends RegistryBean { - - private Map activeObjectRegistry; - - public ActiveObjectHolder(Map activeObjectRegistry, CamelContext context, String name) { - super(context, name); - this.activeObjectRegistry = activeObjectRegistry; - } - - @Override - public BeanInfo getBeanInfo() { - return new BeanInfo(getContext(), getBean().getClass(), getParameterMappingStrategy()); - } - - @Override - public Object getBean() throws NoSuchBeanException { - return activeObjectRegistry.get(getName()); - } - -} diff --git a/akka-camel/src/main/scala/CamelContextLifecycle.scala b/akka-camel/src/main/scala/CamelContextLifecycle.scala index 1015b56b64..2285534fc5 100644 --- a/akka-camel/src/main/scala/CamelContextLifecycle.scala +++ b/akka-camel/src/main/scala/CamelContextLifecycle.scala @@ -4,6 +4,8 @@ package se.scalablesolutions.akka.camel +import java.util.Map + import org.apache.camel.{ProducerTemplate, CamelContext} import org.apache.camel.impl.DefaultCamelContext @@ -35,7 +37,7 @@ trait CamelContextLifecycle extends Logging { * Registry in which active objects are TEMPORARILY registered during * creation of Camel routes to active objects. */ - private[camel] var activeObjectRegistry: java.util.Map[String, AnyRef] = _ + private[camel] var activeObjectRegistry: Map[String, AnyRef] = _ /** * Returns the managed CamelContext. @@ -93,10 +95,10 @@ trait CamelContextLifecycle extends Logging { */ def init(context: CamelContext) { this.activeObjectComponent = new ActiveObjectComponent - this.activeObjectRegistry = activeObjectComponent.getActiveObjectRegistry + this.activeObjectRegistry = activeObjectComponent.activeObjectRegistry this.context = context this.context.setStreamCaching(true) - this.context.addComponent(ActiveObjectComponent.DEFAULT_SCHEMA, activeObjectComponent) + this.context.addComponent(ActiveObjectComponent.DefaultSchema, activeObjectComponent) this.template = context.createProducerTemplate _initialized = true log.info("Camel context initialized") diff --git a/akka-camel/src/main/scala/component/ActiveObjectComponent.scala b/akka-camel/src/main/scala/component/ActiveObjectComponent.scala new file mode 100644 index 0000000000..a192b10fc9 --- /dev/null +++ b/akka-camel/src/main/scala/component/ActiveObjectComponent.scala @@ -0,0 +1,67 @@ +/** + * Copyright (C) 2009-2010 Scalable Solutions AB + */ + +package se.scalablesolutions.akka.camel.component + +import java.util.Map +import java.util.concurrent.ConcurrentHashMap +import org.apache.camel.CamelContext +import org.apache.camel.component.bean._ + +/** + * @author Martin Krasser + */ +object ActiveObjectComponent { + val DefaultSchema = "actobj" +} + +/** + * @author Martin Krasser + */ +class ActiveObjectComponent extends BeanComponent { + val activeObjectRegistry = new ConcurrentHashMap[String, AnyRef] + + override def createEndpoint(uri: String, remaining: String, parameters: Map[String, AnyRef]) = { + val endpoint = new BeanEndpoint(uri, this) + endpoint.setBeanName(remaining) + endpoint.setBeanHolder(createBeanHolder(remaining)) + setProperties(endpoint.getProcessor, parameters) + endpoint + } + + private def createBeanHolder(beanName: String) = + new ActiveObjectHolder(activeObjectRegistry, getCamelContext, beanName).createCacheHolder +} + +/** + * @author Martin Krasser + */ +class ActiveObjectHolder(activeObjectRegistry: Map[String, AnyRef], context: CamelContext, name: String) + extends RegistryBean(context, name) { + + override def getBeanInfo: BeanInfo = + new ActiveObjectInfo(getContext, getBean.getClass, getParameterMappingStrategy) + + override def getBean: AnyRef = + activeObjectRegistry.get(getName) +} + +/** + * @author Martin Krasser + */ +class ActiveObjectInfo(context: CamelContext, clazz: Class[_], strategy: ParameterMappingStrategy) + extends BeanInfo(context, clazz, strategy) { + + protected override def introspect(clazz: Class[_]): Unit = { + for (method <- clazz.getDeclaredMethods) { + if (isValidMethod(clazz, method)) { + introspect(clazz, method) + } + } + val superclass = clazz.getSuperclass + if (superclass != null && !superclass.equals(classOf[Any])) { + introspect(superclass) + } + } +} \ No newline at end of file diff --git a/akka-camel/src/main/scala/service/ConsumerPublisher.scala b/akka-camel/src/main/scala/service/ConsumerPublisher.scala index 5d7ca23961..86a2cb1b44 100644 --- a/akka-camel/src/main/scala/service/ConsumerPublisher.scala +++ b/akka-camel/src/main/scala/service/ConsumerPublisher.scala @@ -3,6 +3,8 @@ */ package se.scalablesolutions.akka.camel.service +import collection.mutable.ListBuffer + import java.io.InputStream import java.lang.reflect.Method import java.util.concurrent.CountDownLatch @@ -12,9 +14,8 @@ import org.apache.camel.builder.RouteBuilder import se.scalablesolutions.akka.actor._ import se.scalablesolutions.akka.actor.annotation.consume import se.scalablesolutions.akka.camel.{Consumer, CamelContextManager} -import se.scalablesolutions.akka.util.Logging import se.scalablesolutions.akka.camel.component.ActiveObjectComponent -import collection.mutable.ListBuffer +import se.scalablesolutions.akka.util.Logging /** * Actor that publishes consumer actors as Camel endpoints at the CamelContext managed @@ -78,7 +79,7 @@ class ConsumerPublisher extends Actor with Logging { * Creates a route to the registered consumer actor. */ private def handleConsumerRegistered(event: ConsumerRegistered) { - CamelContextManager.context.addRoutes(new ConsumerRoute(event.uri, event.id, event.uuid)) + CamelContextManager.context.addRoutes(new ConsumerActorRoute(event.uri, event.id, event.uuid)) log.info("published actor %s (%s) at endpoint %s" format (event.clazz, event.id, event.uri)) } @@ -102,6 +103,23 @@ class ConsumerPublisher extends Actor with Logging { } } +abstract class ConsumerRoute(endpointUri: String, id: String) extends RouteBuilder { + // TODO: make conversions configurable + private val bodyConversions = Map( + "file" -> classOf[InputStream] + ) + + def configure = { + val schema = endpointUri take endpointUri.indexOf(":") // e.g. "http" from "http://whatever/..." + bodyConversions.get(schema) match { + case Some(clazz) => from(endpointUri).routeId(id).convertBodyTo(clazz).to(targetUri) + case None => from(endpointUri).routeId(id).to(targetUri) + } + } + + protected def targetUri: String +} + /** * Defines the route to a consumer actor. * @@ -112,50 +130,12 @@ class ConsumerPublisher extends Actor with Logging { * * @author Martin Krasser */ -class ConsumerRoute(val endpointUri: String, id: String, uuid: Boolean) extends RouteBuilder { - // - // - // TODO: factor out duplicated code from ConsumerRoute and ConsumerMethodRoute - // - // - - // TODO: make conversions configurable - private val bodyConversions = Map( - "file" -> classOf[InputStream] - ) - - def configure = { - val schema = endpointUri take endpointUri.indexOf(":") // e.g. "http" from "http://whatever/..." - bodyConversions.get(schema) match { - case Some(clazz) => from(endpointUri).routeId(id).convertBodyTo(clazz).to(actorUri) - case None => from(endpointUri).routeId(id).to(actorUri) - } - } - - private def actorUri = (if (uuid) "actor:uuid:%s" else "actor:id:%s") format id +class ConsumerActorRoute(endpointUri: String, id: String, uuid: Boolean) extends ConsumerRoute(endpointUri, id) { + protected override def targetUri = (if (uuid) "actor:uuid:%s" else "actor:id:%s") format id } -class ConsumerMethodRoute(val endpointUri: String, id: String, method: String) extends RouteBuilder { - // - // - // TODO: factor out duplicated code from ConsumerRoute and ConsumerMethodRoute - // - // - - // TODO: make conversions configurable - private val bodyConversions = Map( - "file" -> classOf[InputStream] - ) - - def configure = { - val schema = endpointUri take endpointUri.indexOf(":") // e.g. "http" from "http://whatever/..." - bodyConversions.get(schema) match { - case Some(clazz) => from(endpointUri).convertBodyTo(clazz).to(activeObjectUri) - case None => from(endpointUri).to(activeObjectUri) - } - } - - private def activeObjectUri = "%s:%s?method=%s" format (ActiveObjectComponent.DEFAULT_SCHEMA, id, method) +class ConsumerMethodRoute(val endpointUri: String, id: String, method: String) extends ConsumerRoute(endpointUri, id) { + protected override def targetUri = "%s:%s?method=%s" format (ActiveObjectComponent.DefaultSchema, id, method) } /** diff --git a/akka-camel/src/test/scala/service/CamelServiceFeatureTest.scala b/akka-camel/src/test/scala/service/CamelServiceFeatureTest.scala index 26b31daffa..adec3735c6 100644 --- a/akka-camel/src/test/scala/service/CamelServiceFeatureTest.scala +++ b/akka-camel/src/test/scala/service/CamelServiceFeatureTest.scala @@ -3,9 +3,9 @@ package se.scalablesolutions.akka.camel.service import org.apache.camel.builder.RouteBuilder import org.scalatest.{GivenWhenThen, BeforeAndAfterAll, FeatureSpec} +import se.scalablesolutions.akka.actor.Actor._ import se.scalablesolutions.akka.actor.{Actor, ActorRegistry} import se.scalablesolutions.akka.camel.{CamelContextManager, Message, Consumer} -import Actor._ object CamelServiceFeatureTest {