From b602a86eed0312f9b29c840e68be82bc79e44065 Mon Sep 17 00:00:00 2001 From: Jonas Boner Date: Mon, 11 May 2009 13:48:32 +0200 Subject: [PATCH] init impl of camel bean:actor routing --- akka.iws | 74 +++++++++---------- buildfile | 9 ++- .../ActiveObjectGuiceConfigurator.scala | 68 +++++++++-------- kernel/src/test/scala/CamelSpec.scala | 57 ++++++-------- 4 files changed, 101 insertions(+), 107 deletions(-) diff --git a/akka.iws b/akka.iws index 9df14e5052..e16ecdfba3 100644 --- a/akka.iws +++ b/akka.iws @@ -106,7 +106,7 @@ - + @@ -124,7 +124,7 @@ - + @@ -378,19 +378,19 @@ - + - + - - + + @@ -402,12 +402,12 @@ - + - + @@ -535,7 +535,7 @@ - + - + - + - + @@ -733,23 +733,9 @@ - + - - - - - - - - - - - - - - - + @@ -768,9 +754,23 @@ + + + + + + + + + + + + + + - + diff --git a/buildfile b/buildfile index 95e4d2512c..3247d607da 100644 --- a/buildfile +++ b/buildfile @@ -67,14 +67,19 @@ define 'akka' do desc 'Akka Actor Kernel core implementation' define 'kernel' do - compile.with(AKKA_UTIL_JAVA, GUICEYFRUIT, AOPALLIANCE, NETTY, JERSEY, GRIZZLY, CASSANDRA, THRIFT, FB303, CAMEL, SLF4J, GOOGLE_COLLECT, CGLIB, JSR_250, COMMONS_LOGGING, CONFIGGY, JUNIT4RUNNER, JUNIT4, SCALATEST) + compile.with( + AKKA_UTIL_JAVA, GUICEYFRUIT, AOPALLIANCE, NETTY, JERSEY, GRIZZLY, + CASSANDRA, THRIFT, FB303, CAMEL, SLF4J, GOOGLE_COLLECT, CGLIB, JSR_250, + COMMONS_LOGGING, CONFIGGY, JUNIT4RUNNER, JUNIT4, SCALATEST) test.using :junit package :jar end desc 'Akka Java API' define 'api-java' do - compile.with(AKKA_KERNEL, AKKA_UTIL_JAVA, NETTY, JERSEY, GRIZZLY, CASSANDRA, THRIFT, FB303, CAMEL, SLF4J, CONFIGGY, GUICEYFRUIT, SCALA, GOOGLE_COLLECT, AOPALLIANCE, CGLIB, JSR_250) + compile.with(AKKA_KERNEL, AKKA_UTIL_JAVA, NETTY, JERSEY, GRIZZLY, + CASSANDRA, THRIFT, FB303, CAMEL, SLF4J, CONFIGGY, GUICEYFRUIT, SCALA, + GOOGLE_COLLECT, AOPALLIANCE, CGLIB, JSR_250) test.using :junit package :jar end diff --git a/kernel/src/main/scala/config/ActiveObjectGuiceConfigurator.scala b/kernel/src/main/scala/config/ActiveObjectGuiceConfigurator.scala index 4dde99f6e4..4d9ca0c0e1 100644 --- a/kernel/src/main/scala/config/ActiveObjectGuiceConfigurator.scala +++ b/kernel/src/main/scala/config/ActiveObjectGuiceConfigurator.scala @@ -10,7 +10,7 @@ import com.google.inject.jsr250.ResourceProviderFactory import java.lang.reflect.Method import kernel.camel.ActiveObjectComponent import org.apache.camel.impl.{JndiRegistry, DefaultCamelContext} -import org.apache.camel.{Endpoint, Routes} +import org.apache.camel.{CamelContext, Endpoint, Routes} import scala.collection.mutable.HashMap import se.scalablesolutions.akka.kernel.ActiveObjectFactory import se.scalablesolutions.akka.kernel.ActiveObjectProxy @@ -31,27 +31,10 @@ class ActiveObjectGuiceConfigurator extends Logging { private var configRegistry = new HashMap[Class[_], Component] // TODO is configRegistry needed? private var activeObjectRegistry = new HashMap[String, Tuple3[Class[_], Class[_], ActiveObjectProxy]] private var activeObjectFactory = new ActiveObjectFactory -// private var camelContext = new DefaultCamelContext(); + private var camelContext = new DefaultCamelContext private var modules = new java.util.ArrayList[Module] private var methodToUriRegistry = new HashMap[Method, String] - def getExternalDependency[T](clazz: Class[T]): T = synchronized { - injector.getInstance(clazz).asInstanceOf[T] - } - -/* - def getRoutingEndpoint(uri: String): Endpoint = synchronized { - camelContext.getEndpoint(uri) - } - - def getRoutingEndpoints: java.util.Collection[Endpoint] = synchronized { - camelContext.getEndpoints - } - - def getRoutingEndpoints(uri: String): java.util.Collection[Endpoint] = synchronized { - camelContext.getEndpoints(uri) - } -*/ /** * Returns the active abject that has been put under supervision for the class specified. * @@ -60,16 +43,14 @@ class ActiveObjectGuiceConfigurator extends Logging { */ def getActiveObject(name: String): AnyRef = synchronized { //def getActiveObject[T](name: String): T = synchronized { - println("Looking up active object " + name) log.debug("Looking up active object [%s]", name) - if (injector == null) throw new IllegalStateException("inject() and supervise() must be called before invoking newInstance(clazz)") + if (injector == null) throw new IllegalStateException("inject() and/or supervise() must be called before invoking newInstance(clazz)") val activeObjectOption: Option[Tuple3[Class[_], Class[_], ActiveObjectProxy]] = activeObjectRegistry.get(name) if (activeObjectOption.isDefined) { val classInfo = activeObjectOption.get val intfClass = classInfo._1 val implClass = classInfo._2 val activeObjectProxy = classInfo._3 - //activeObjectProxy.setTargetInstance(injector.getInstance(clazz).asInstanceOf[AnyRef]) val target = implClass.newInstance injector.injectMembers(target) activeObjectProxy.setTargetInstance(target.asInstanceOf[AnyRef]) @@ -85,6 +66,22 @@ class ActiveObjectGuiceConfigurator extends Logging { else throw new IllegalStateException("Class " + name + " has not been put under supervision (by passing in the config to the 'supervise') method") } + def getExternalDependency[T](clazz: Class[T]): T = synchronized { + injector.getInstance(clazz).asInstanceOf[T] + } + + def getRoutingEndpoint(uri: String): Endpoint = synchronized { + camelContext.getEndpoint(uri) + } + + def getRoutingEndpoints: java.util.Collection[Endpoint] = synchronized { + camelContext.getEndpoints + } + + def getRoutingEndpoints(uri: String): java.util.Collection[Endpoint] = synchronized { + camelContext.getEndpoints(uri) + } + def configureActiveObjects(restartStrategy: RestartStrategy, components: List[Component]): ActiveObjectGuiceConfigurator = synchronized { this.restartStrategy = restartStrategy this.components = components.toArray.toList.asInstanceOf[List[Component]] @@ -104,21 +101,20 @@ class ActiveObjectGuiceConfigurator extends Logging { def supervise: ActiveObjectGuiceConfigurator = synchronized { if (injector == null) inject - injector = Guice.createInjector(modules) var workers = new java.util.ArrayList[Worker] for (component <- components) { val activeObjectProxy = new ActiveObjectProxy(component.intf, component.target, component.timeout) workers.add(Worker(activeObjectProxy.server, component.lifeCycle)) activeObjectRegistry.put(component.name, (component.intf, component.target, activeObjectProxy)) -// camelContext.getRegistry.asInstanceOf[JndiRegistry].bind(component.name, activeObjectProxy) -// for (method <- component.intf.getDeclaredMethods.toList) { -// registerMethodForUri(method, component.name) -// } -// log.debug("Registering active object in Camel context under the name [%s]", component.target.getName) + camelContext.getRegistry.asInstanceOf[JndiRegistry].bind(component.name, activeObjectProxy) + for (method <- component.intf.getDeclaredMethods.toList) { + registerMethodForUri(method, component.name) + } + log.debug("Registering active object in Camel context under the name [%s]", component.target.getName) } supervisor = activeObjectFactory.supervise(restartStrategy, workers) -// camelContext.addComponent(AKKA_CAMEL_ROUTING_SCHEME, new ActiveObjectComponent(this)) -// camelContext.start + camelContext.addComponent(AKKA_CAMEL_ROUTING_SCHEME, new ActiveObjectComponent(this)) + camelContext.start this } @@ -153,13 +149,15 @@ class ActiveObjectGuiceConfigurator extends Logging { * } * }).inject().supervise(); * - * + */ def addRoutes(routes: Routes): ActiveObjectGuiceConfigurator = synchronized { camelContext.addRoutes(routes) this } - */ - def getGuiceModules = modules + + def getCamelContext: CamelContext = camelContext + + def getGuiceModules: java.util.List[Module] = modules def reset = synchronized { modules = new java.util.ArrayList[Module] @@ -168,11 +166,11 @@ class ActiveObjectGuiceConfigurator extends Logging { methodToUriRegistry = new HashMap[Method, String] injector = null restartStrategy = null -// camelContext = new DefaultCamelContext + camelContext = new DefaultCamelContext } def stop = synchronized { -// camelContext.stop + camelContext.stop supervisor.stop } diff --git a/kernel/src/test/scala/CamelSpec.scala b/kernel/src/test/scala/CamelSpec.scala index b9a0cd5601..89a7d441ff 100644 --- a/kernel/src/test/scala/CamelSpec.scala +++ b/kernel/src/test/scala/CamelSpec.scala @@ -11,6 +11,7 @@ import kernel.config.ScalaConfig._ import com.google.inject.{AbstractModule, Scopes} import com.jteigen.scalatest.JUnit4Runner +import org.apache.camel.component.bean.ProxyHelper import org.junit.runner.RunWith import org.scalatest._ import org.scalatest.matchers._ @@ -26,6 +27,8 @@ import org.apache.camel.Producer import org.apache.camel.builder.RouteBuilder import org.apache.camel.impl.DefaultCamelContext +// REQUIRES: -Djava.naming.factory.initial=org.apache.camel.util.jndi.CamelInitialContextFactory + /** * @author Jonas Bonér */ @@ -33,10 +36,7 @@ import org.apache.camel.impl.DefaultCamelContext class CamelSpec extends Spec with ShouldMatchers { describe("A Camel routing scheme") { - it("dummy") { - } -/* - it("should route message from actor A to actor B") { + it("should route message from direct:test to actor A using @Bean endpoint") { val latch = new CountDownLatch(1); val conf = new ActiveObjectGuiceConfigurator @@ -48,17 +48,10 @@ class CamelSpec extends Spec with ShouldMatchers { classOf[CamelFooImpl], LifeCycle(Permanent, 1000), 1000) :: - Component( - "camelbar", - classOf[CamelBar], - classOf[CamelBarImpl], - LifeCycle(Permanent, 1000), - 1000) :: Nil ).addRoutes(new RouteBuilder() { def configure = { - from("akka:camelfoo.foo").to("akka:camelbar.bar") - from("akka:camelbar.bar").process(new Processor() { + from("direct:test").to("bean:camelfoo").process(new Processor() { def process(e: Exchange) = { println("Received exchange: " + e.getIn()) latch.countDown @@ -67,29 +60,27 @@ class CamelSpec extends Spec with ShouldMatchers { }} ).supervise - //val endpoint = conf.getRoutingEndpoint("akka:camelfoo.foo") -// println("----- " + endpoint) -// val exchange = endpoint.createExchange -// println("----- " + exchange) + val endpoint = conf.getRoutingEndpoint("direct:test") + val proxy = ProxyHelper.createProxy(endpoint, classOf[CamelFoo]) - conf.getActiveObject(classOf[CamelFooImpl].getName).asInstanceOf[CamelFoo].foo("Hello Foo") - -// -// exchange.getIn().setHeader("cheese", 123) -// exchange.getIn().setBody("body") -// -// val producer = endpoint.createProducer -// println("----- " + producer) -// -// producer.process(exchange) -// -// // now lets sleep for a while -// val received = latch.await(5, TimeUnit.SECONDS) -// received should equal (true) -// -// conf.stop + proxy.foo("hello there") +/* + val exchange = endpoint.createExchange + println("----- " + exchange) + + exchange.getIn().setBody("hello there") + + val producer = endpoint.createProducer + println("----- " + producer) + + producer.process(exchange) + + // now lets sleep for a while + val received = latch.await(5, TimeUnit.SECONDS) + received should equal (true) +*/ + conf.stop } - */ } }