diff --git a/api-java/akka-api-java.iml b/api-java/akka-api-java.iml new file mode 100644 index 0000000000..7b73f73842 --- /dev/null +++ b/api-java/akka-api-java.iml @@ -0,0 +1,62 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/api-java/java/se/scalablesolutions/akka/api/ActiveObjectGuiceConfigurator.java b/api-java/java/se/scalablesolutions/akka/api/ActiveObjectGuiceConfigurator.java new file mode 100755 index 0000000000..e3cc4bb78e --- /dev/null +++ b/api-java/java/se/scalablesolutions/akka/api/ActiveObjectGuiceConfigurator.java @@ -0,0 +1,121 @@ +/** + * Copyright (C) 2009 Scalable Solutions. + */ + +package se.scalablesolutions.akka.api; + +import com.google.inject.*; +import com.google.inject.jsr250.ResourceProviderFactory; + +import se.scalablesolutions.akka.kernel.configuration.*; +import se.scalablesolutions.akka.kernel.ActiveObjectFactory; +import se.scalablesolutions.akka.kernel.ActiveObjectProxy; +import se.scalablesolutions.akka.kernel.Supervisor; +import se.scalablesolutions.akka.kernel.Worker; + +import java.util.List; +import java.util.ArrayList; +import java.util.Map; +import java.util.HashMap; + +/** + * @author Jonas Bonér + */ +public class ActiveObjectGuiceConfigurator { + private List modules = new ArrayList(); + private Injector injector; + private Supervisor supervisor; + private RestartStrategy restartStrategy; + private Component[] components; + private Map configRegistry = new HashMap(); // TODO is configRegistry needed? + private Map activeObjectRegistry = new HashMap(); + private ActiveObjectFactory activeObjectFactory = new ActiveObjectFactory(); + + public synchronized T getExternalDependency(Class clazz) { + return injector.getInstance(clazz); + } + + /** + * Returns the active abject that has been put under supervision for the class specified. + * + * @param clazz the class for the active object + * @return the active object for the class + */ + public synchronized T getActiveObject(Class clazz) { + if (injector == null) throw new IllegalStateException("inject() and supervise() must be called before invoking newInstance(clazz)"); + if (activeObjectRegistry.containsKey(clazz)) { + final ActiveObjectProxy activeObjectProxy = activeObjectRegistry.get(clazz); + activeObjectProxy.setTargetInstance(injector.getInstance(clazz)); + return (T)activeObjectFactory.newInstance(clazz, activeObjectProxy); + } else throw new IllegalStateException("Class " + clazz.getName() + " has not been put under supervision (by passing in the config to the supervise() method"); + } + + public synchronized ActiveObjectGuiceConfigurator configureActiveObjects(final RestartStrategy restartStrategy, final Component[] components) { + this.restartStrategy = restartStrategy; + this.components = components; + modules.add(new AbstractModule() { + protected void configure() { + bind(ResourceProviderFactory.class); + for (int i = 0; i < components.length; i++) { + Component c = components[i]; + bind((Class) c.intf()).to((Class) c.target()).in(Singleton.class); + } + } + }); + return this; + } + + public synchronized ActiveObjectGuiceConfigurator inject() { + if (injector != null) throw new IllegalStateException("inject() has already been called on this configurator"); + injector = Guice.createInjector(modules); + return this; + } + + public synchronized ActiveObjectGuiceConfigurator supervise() { + if (injector == null) inject(); + injector = Guice.createInjector(modules); + List workers = new ArrayList(); + for (int i = 0; i < components.length; i++) { + final Component c = components[i]; + final ActiveObjectProxy activeObjectProxy = new ActiveObjectProxy(c.intf(), c.target(), c.timeout()); + workers.add(c.newWorker(activeObjectProxy)); + activeObjectRegistry.put(c.intf(), activeObjectProxy); + } + supervisor = activeObjectFactory.supervise(restartStrategy.transform(), workers); + return this; + } + + + /** + * Add additional services to be wired in. + *
+   * ActiveObjectGuiceConfigurator.addExternalGuiceModule(new AbstractModule {
+   *   protected void configure() {
+   *     bind(Foo.class).to(FooImpl.class).in(Scopes.SINGLETON);
+   *     bind(BarImpl.class);
+   *     link(Bar.class).to(BarImpl.class);
+   *     bindConstant(named("port")).to(8080);
+   *   }})
+   * 
+ */ + public synchronized ActiveObjectGuiceConfigurator addExternalGuiceModule(Module module) { + modules.add(module); + return this; + } + + public List getGuiceModules() { + return modules; + } + + public synchronized void reset() { + modules = new ArrayList(); + configRegistry = new HashMap(); + activeObjectRegistry = new HashMap(); + injector = null; + restartStrategy = null; + } + + public synchronized void stop() { + supervisor.stop(); + } +} diff --git a/kernel/akka-kernel.iml b/kernel/akka-kernel.iml new file mode 100644 index 0000000000..57185ba999 --- /dev/null +++ b/kernel/akka-kernel.iml @@ -0,0 +1,67 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/kernel/src/main/resources/META-INF/services/org/apache/camel/component/akka b/kernel/src/main/resources/META-INF/services/org/apache/camel/component/akka new file mode 100644 index 0000000000..7c846bc93e --- /dev/null +++ b/kernel/src/main/resources/META-INF/services/org/apache/camel/component/akka @@ -0,0 +1 @@ +class=se.scalablesolutions.akka.kernel.camel.ActiveObjectComponent \ No newline at end of file diff --git a/kernel/src/main/scala/camel/ActiveObjectComponent.scala b/kernel/src/main/scala/camel/ActiveObjectComponent.scala new file mode 100644 index 0000000000..2d2315e943 --- /dev/null +++ b/kernel/src/main/scala/camel/ActiveObjectComponent.scala @@ -0,0 +1,22 @@ +/** + * Copyright (C) 2009 Scalable Solutions. + */ + +package se.scalablesolutions.akka.kernel.camel + +import config.ActiveObjectGuiceConfigurator +import java.util.Map +import java.util.concurrent.{BlockingQueue, LinkedBlockingQueue} + +import org.apache.camel.{Endpoint, Exchange} +import org.apache.camel.impl.DefaultComponent + +/** + * @author Jonas Bonér + */ +class ActiveObjectComponent(val conf: ActiveObjectGuiceConfigurator) extends DefaultComponent { + override def createEndpoint(uri: String, remaining: String, parameters: Map[_,_]): Endpoint = { + //val consumers = getAndRemoveParameter(parameters, "concurrentConsumers", classOf[Int], 1) + new ActiveObjectEndpoint(uri, this, conf) + } +} diff --git a/kernel/src/main/scala/camel/ActiveObjectConsumer.scala b/kernel/src/main/scala/camel/ActiveObjectConsumer.scala new file mode 100644 index 0000000000..a3ac5425a9 --- /dev/null +++ b/kernel/src/main/scala/camel/ActiveObjectConsumer.scala @@ -0,0 +1,36 @@ +/** + * Copyright (C) 2009 Scalable Solutions. + */ + +package se.scalablesolutions.akka.kernel.camel + +import java.util.concurrent.{BlockingQueue, ExecutorService, Executors, ThreadFactory, TimeUnit} + +import se.scalablesolutions.akka.kernel.{Logging, GenericServerContainer} + +import org.apache.camel.{AsyncCallback, AsyncProcessor, Consumer, Exchange, Processor} +import org.apache.camel.impl.ServiceSupport +import org.apache.camel.impl.converter.AsyncProcessorTypeConverter + +/** + * @author Jonas Bonér + */ +class ActiveObjectConsumer( + val endpoint: ActiveObjectEndpoint, + proc: Processor, + val activeObject: AnyRef) + extends ServiceSupport with Consumer with Runnable with Logging { + val processor = AsyncProcessorTypeConverter.convert(proc) + println("------- creating consumer for: "+ endpoint.uri) + + override def run = { + } + + def doStart() = { + } + + def doStop() = { + } + + override def toString(): String = "ActiveObjectConsumer [" + endpoint.getEndpointUri + "]" +} diff --git a/kernel/src/main/scala/camel/ActiveObjectEndpoint.scala b/kernel/src/main/scala/camel/ActiveObjectEndpoint.scala new file mode 100644 index 0000000000..7946f87753 --- /dev/null +++ b/kernel/src/main/scala/camel/ActiveObjectEndpoint.scala @@ -0,0 +1,47 @@ +/** + * Copyright (C) 2009 Scalable Solutions. + */ + +package se.scalablesolutions.akka.kernel.camel + +import config.ActiveObjectGuiceConfigurator +import se.scalablesolutions.akka.kernel.Logging + +import java.util.{ArrayList, HashSet, List, Set} +import java.util.concurrent.{BlockingQueue, CopyOnWriteArraySet, LinkedBlockingQueue} + +import org.apache.camel.{Component, Consumer, Exchange, Processor, Producer} +import org.apache.camel.impl.{DefaultEndpoint, DefaultComponent}; +import org.apache.camel.spi.BrowsableEndpoint; + +/** + * @author Jonas Bonér + */ +class ActiveObjectEndpoint(val uri: String, val component: DefaultComponent, val conf: ActiveObjectGuiceConfigurator) // FIXME: need abstraction trait here + extends DefaultEndpoint(uri) with BrowsableEndpoint with Logging { + + val firstSep = uri.indexOf(':') + val lastSep = uri.lastIndexOf( '.') + + val scheme = uri.substring(0, firstSep) + val activeObjectName = uri.substring(uri.indexOf(':') + 1, lastSep) + val methodName = uri.substring(lastSep + 1, uri.length) + val activeObject = conf.getActiveObject(activeObjectName).asInstanceOf[MessageDriven] +// val activeObjectProxy = conf.getActiveObjectProxy(activeObjectName) + +// val genericServer = supervisor.getServerOrElse( +// activeObjectName, +// throw new IllegalArgumentException("Can't find active object with name [" + activeObjectName + "] and method [" + methodName + "]")) + + log.debug("Creating Camel Endpoint for scheme [%s] and component [%s]", scheme, activeObjectName) + + private var queue: BlockingQueue[Exchange] = new LinkedBlockingQueue[Exchange](1000) + + override def createProducer: Producer = new ActiveObjectProducer(this, activeObject) + + override def createConsumer(processor: Processor): Consumer = new ActiveObjectConsumer(this, processor, activeObject) + + override def getExchanges: List[Exchange] = new ArrayList[Exchange](queue) + + override def isSingleton = true +} diff --git a/kernel/src/main/scala/camel/ActiveObjectProducer.scala b/kernel/src/main/scala/camel/ActiveObjectProducer.scala new file mode 100644 index 0000000000..0c7a6a4d71 --- /dev/null +++ b/kernel/src/main/scala/camel/ActiveObjectProducer.scala @@ -0,0 +1,43 @@ +/** + * Copyright (C) 2009 Scalable Solutions. + */ + +package se.scalablesolutions.akka.kernel.camel + +import java.util.Collection; +import java.util.concurrent.BlockingQueue; + +import se.scalablesolutions.akka.kernel.{Logging, GenericServerContainer} + +import org.apache.camel.{Exchange, AsyncProcessor, AsyncCallback} +import org.apache.camel.impl.DefaultProducer + +/** + * @author Jonas Bonér + */ +class ActiveObjectProducer( + val endpoint: ActiveObjectEndpoint, + val activeObject: MessageDriven) + extends DefaultProducer(endpoint) with AsyncProcessor with Logging { + private val actorName = endpoint.activeObjectName + + def process(exchange: Exchange) = activeObject.onMessage(exchange) // FIXME: should we not invoke the generic server here? + + def process(exchange: Exchange, callback: AsyncCallback): Boolean = { + val copy = exchange.copy + copy.setProperty("CamelAsyncCallback", callback) + activeObject.onMessage(copy) + callback.done(true) + true + } + + override def doStart = { + super.doStart + } + + override def doStop = { + super.doStop + } + + override def toString(): String = "ActiveObjectProducer [" + endpoint.getEndpointUri + "]" +} diff --git a/kernel/src/main/scala/camel/MessageDriven.scala b/kernel/src/main/scala/camel/MessageDriven.scala new file mode 100644 index 0000000000..889ddafeff --- /dev/null +++ b/kernel/src/main/scala/camel/MessageDriven.scala @@ -0,0 +1,14 @@ +/** + * Copyright (C) 2009 Scalable Solutions. + */ + +package se.scalablesolutions.akka.kernel.camel + +import org.apache.camel.Exchange + +/** + * @author Jonas Bonér + */ +trait MessageDriven { + def onMessage(exchange: Exchange) +} \ No newline at end of file diff --git a/kernel/src/main/scala/camel/SupervisorAwareCamelContext.scala b/kernel/src/main/scala/camel/SupervisorAwareCamelContext.scala new file mode 100644 index 0000000000..041bfd7048 --- /dev/null +++ b/kernel/src/main/scala/camel/SupervisorAwareCamelContext.scala @@ -0,0 +1,14 @@ +//** + * Copyright (C) 2009 Scalable Solutions. + */ + +package se.scalablesolutions.akka.kernel.camel + +import org.apache.camel.impl.{DefaultCamelContext, DefaultEndpoint, DefaultComponent} +import se.scalablesolutions.akka.kernel.{Supervisor, Logging} +/** + * @author Jonas Bonér + */ +class SupervisorAwareCamelContext extends DefaultCamelContext with Logging { + var supervisor: Supervisor = _ +} \ No newline at end of file diff --git a/kernel/src/main/scala/config/ActiveObjectGuiceConfigurator.scala b/kernel/src/main/scala/config/ActiveObjectGuiceConfigurator.scala new file mode 100644 index 0000000000..14d8005ca5 --- /dev/null +++ b/kernel/src/main/scala/config/ActiveObjectGuiceConfigurator.scala @@ -0,0 +1,189 @@ +/** + * Copyright (C) 2009 Scalable Solutions. + */ + +package se.scalablesolutions.akka.kernel.config + + +import com.google.inject._ +import com.google.inject.jsr250.ResourceProviderFactory + +import java.lang.reflect.Method +import kernel.camel.ActiveObjectComponent +import org.apache.camel.impl.{JndiRegistry, DefaultCamelContext} +import org.apache.camel.{Endpoint, Routes} +import scala.collection.mutable.HashMap +import se.scalablesolutions.akka.kernel.ActiveObjectFactory +import se.scalablesolutions.akka.kernel.ActiveObjectProxy +import se.scalablesolutions.akka.kernel.Supervisor +import se.scalablesolutions.akka.kernel.config.ScalaConfig._ + + +/** + * @author Jonas Bonér + */ +class ActiveObjectGuiceConfigurator extends Logging { + val AKKA_CAMEL_ROUTING_SCHEME = "akka" + + private var injector: Injector = _ + private var supervisor: Supervisor = _ + private var restartStrategy: RestartStrategy = _ + private var components: List[Component] = _ + private var bindings: List[DependencyBinding] = Nil + private var configRegistry = new HashMap[Class[_], Component] // TODO is configRegistry needed? + private var activeObjectRegistry = new HashMap[String, Tuple3[Class[_], Class[_], ActiveObjectProxy]] + private var activeObjectFactory = new ActiveObjectFactory + private var camelContext = new DefaultCamelContext(); + private var modules = new java.util.ArrayList[Module] + private var methodToUriRegistry = new HashMap[Method, String] + + def getExternalDependency[T](clazz: Class[T]): T = synchronized { + injector.getInstance(clazz).asInstanceOf[T] + } + + def getRoutingEndpoint(uri: String): Endpoint = synchronized { + camelContext.getEndpoint(uri) + } + + def getRoutingEndpoints: java.util.Collection[Endpoint] = synchronized { + camelContext.getEndpoints + } + + def getRoutingEndpoints(uri: String): java.util.Collection[Endpoint] = synchronized { + camelContext.getEndpoints(uri) + } + + /** + * Returns the active abject that has been put under supervision for the class specified. + * + * @param clazz the class for the active object + * @return the active object for the class + */ + def getActiveObject(name: String): AnyRef = synchronized { + //def getActiveObject[T](name: String): T = synchronized { + println("Looking up active object " + name) + log.debug("Looking up active object [%s]", name) + if (injector == null) throw new IllegalStateException("inject() and supervise() must be called before invoking newInstance(clazz)") + val activeObjectOption: Option[Tuple3[Class[_], Class[_], ActiveObjectProxy]] = activeObjectRegistry.get(name) + if (activeObjectOption.isDefined) { + val classInfo = activeObjectOption.get + val intfClass = classInfo._1 + val implClass = classInfo._2 + val activeObjectProxy = classInfo._3 + //activeObjectProxy.setTargetInstance(injector.getInstance(clazz).asInstanceOf[AnyRef]) + val target = implClass.newInstance + injector.injectMembers(target) + activeObjectProxy.setTargetInstance(target.asInstanceOf[AnyRef]) + activeObjectFactory.newInstance(intfClass, activeObjectProxy).asInstanceOf[AnyRef] + } else throw new IllegalStateException("Class " + name + " has not been put under supervision (by passing in the config to the 'supervise') method") + } + + def getActiveObjectProxy(name: String): ActiveObjectProxy = synchronized { + log.debug("Looking up active object proxy [%s]", name) + if (injector == null) throw new IllegalStateException("inject() and supervise() must be called before invoking newInstance(clazz)") + val activeObjectOption: Option[Tuple3[Class[_], Class[_], ActiveObjectProxy]] = activeObjectRegistry.get(name) + if (activeObjectOption.isDefined) activeObjectOption.get._3 + else throw new IllegalStateException("Class " + name + " has not been put under supervision (by passing in the config to the 'supervise') method") + } + + def configureActiveObjects(restartStrategy: RestartStrategy, components: List[Component]): ActiveObjectGuiceConfigurator = synchronized { + this.restartStrategy = restartStrategy + this.components = components.toArray.toList.asInstanceOf[List[Component]] + bindings = for (c <- this.components) + yield new DependencyBinding(c.intf, c.target) // build up the Guice interface class -> impl class bindings + val arrayList = new java.util.ArrayList[DependencyBinding]() + for (b <- bindings) arrayList.add(b) + modules.add(new ActiveObjectGuiceModule(arrayList)) + this + } + + def inject: ActiveObjectGuiceConfigurator = synchronized { + if (injector != null) throw new IllegalStateException("inject() has already been called on this configurator") + injector = Guice.createInjector(modules) + this + } + + def supervise: ActiveObjectGuiceConfigurator = synchronized { + if (injector == null) inject + injector = Guice.createInjector(modules) + var workers = new java.util.ArrayList[Worker] + for (component <- components) { + val activeObjectProxy = new ActiveObjectProxy(component.intf, component.target, component.timeout, this) + workers.add(Worker(activeObjectProxy.server, component.lifeCycle)) + activeObjectRegistry.put(component.name, (component.intf, component.target, activeObjectProxy)) + camelContext.getRegistry.asInstanceOf[JndiRegistry].bind(component.name, activeObjectProxy) + for (method <- component.intf.getDeclaredMethods.toList) { + registerMethodForUri(method, component.name) + } + log.debug("Registering active object in Camel context under the name [%s]", component.target.getName) + } + supervisor = activeObjectFactory.supervise(restartStrategy, workers) + camelContext.addComponent(AKKA_CAMEL_ROUTING_SCHEME, new ActiveObjectComponent(this)) + camelContext.start + this + } + + /** + * Add additional services to be wired in. + *
+   * ActiveObjectGuiceModule.addExternalGuiceModule(new AbstractModule {
+   *   protected void configure() {
+   *     bind(Foo.class).to(FooImpl.class).in(Scopes.SINGLETON);
+   *     bind(BarImpl.class);
+   *     link(Bar.class).to(BarImpl.class);
+   *     bindConstant(named("port")).to(8080);
+   *   }})
+   * 
+ */ + def addExternalGuiceModule(module: Module): ActiveObjectGuiceConfigurator = synchronized { + modules.add(module) + this + } + + /** + * Add Camel routes for the active objects. + *
+   * activeObjectGuiceModule.addRoutes(new RouteBuilder() {
+   *   def configure = {
+   *     from("akka:actor1").to("akka:actor2")
+   *     from("akka:actor2").process(new Processor() {
+   *       def process(e: Exchange) = {
+   *         println("Received exchange: " + e.getIn())
+   *       }
+   *     })
+   *   }
+   * }).inject().supervise();
+   * 
+ */ + def addRoutes(routes: Routes): ActiveObjectGuiceConfigurator = synchronized { + camelContext.addRoutes(routes) + this + } + + def getGuiceModules = modules + + def reset = synchronized { + modules = new java.util.ArrayList[Module] + configRegistry = new HashMap[Class[_], Component] + activeObjectRegistry = new HashMap[String, Tuple3[Class[_], Class[_], ActiveObjectProxy]] + methodToUriRegistry = new HashMap[Method, String] + injector = null + restartStrategy = null + camelContext = new DefaultCamelContext + } + + def stop = synchronized { + camelContext.stop + supervisor.stop + } + + def registerMethodForUri(method: Method, componentName: String) = + methodToUriRegistry += method -> buildUri(method, componentName) + + def lookupUriFor(method: Method): String = + methodToUriRegistry.getOrElse(method, throw new IllegalStateException("Could not find URI for method [" + method.getName + "]")) + + def buildUri(method: Method, componentName: String): String = + AKKA_CAMEL_ROUTING_SCHEME + ":" + componentName + "." + method.getName +} + \ No newline at end of file diff --git a/kernel/src/main/scala/config/ActiveObjectGuiceConfiguratorForJava.scala b/kernel/src/main/scala/config/ActiveObjectGuiceConfiguratorForJava.scala new file mode 100644 index 0000000000..3829645ed4 --- /dev/null +++ b/kernel/src/main/scala/config/ActiveObjectGuiceConfiguratorForJava.scala @@ -0,0 +1,155 @@ +/** + * Copyright (C) 2009 Scalable Solutions. + */ + +package se.scalablesolutions.akka.kernel.config + +import akka.kernel.config.JavaConfig._ +import akka.kernel.{Supervisor, ActiveObjectProxy, ActiveObjectFactory} +import akka.kernel.config.{DependencyBinding, ActiveObjectGuiceModule} + +import com.google.inject._ +import com.google.inject.jsr250.ResourceProviderFactory + +import java.util.{ArrayList, HashMap, Collection} +import org.apache.camel.impl.{JndiRegistry, DefaultCamelContext} +import org.apache.camel.{Endpoint, Routes} + +/** + * @author Jonas Bonér + */ +class ActiveObjectGuiceConfiguratorForJava { + private var modules = new ArrayList[Module] + private var injector: Injector = _ + private var supervisor: Supervisor = _ + private var restartStrategy: RestartStrategy = _ + private var components: List[Component] = _ + private var bindings: List[DependencyBinding] = Nil + private var configRegistry = new HashMap[Class[_], Component] // TODO is configRegistry needed? + private var activeObjectRegistry = new HashMap[String, Tuple2[Class[_], ActiveObjectProxy]] + private var activeObjectFactory = new ActiveObjectFactory + private var camelContext = new DefaultCamelContext(); + + def getExternalDependency[T](clazz: Class[T]): T = synchronized { + injector.getInstance(clazz).asInstanceOf[T] + } + + def getRoutingEndpoint(uri: String): Endpoint = synchronized { + camelContext.getEndpoint(uri) + } + + def getRoutingEndpoints: Collection[Endpoint] = synchronized { + camelContext.getEndpoints + } + + def getRoutingEndpoints(uri: String): Collection[Endpoint] = synchronized { + camelContext.getEndpoints(uri) + } + + /** + * Returns the active abject that has been put under supervision for the class specified. + * + * @param clazz the class for the active object + * @return the active object for the class + */ + def getActiveObject[T](name: String): T = synchronized { + if (injector == null) throw new IllegalStateException("inject() and supervise() must be called before invoking newInstance(clazz)") + if (activeObjectRegistry.containsKey(name)) { + val activeObjectTuple = activeObjectRegistry.get(name) + val clazz = activeObjectTuple._1 + val activeObjectProxy = activeObjectTuple._2 + //activeObjectProxy.setTargetInstance(injector.getInstance(clazz).asInstanceOf[AnyRef]) + val target = clazz.newInstance + injector.injectMembers(target) + activeObjectProxy.setTargetInstance(target.asInstanceOf[AnyRef]) + activeObjectFactory.newInstance(clazz, activeObjectProxy).asInstanceOf[T] + } else throw new IllegalStateException("Class " + name + " has not been put under supervision (by passing in the config to the supervise() method") + } + + def configureActiveObjects(restartStrategy: RestartStrategy, components: Array[Component]): ActiveObjectGuiceConfiguratorForJava = synchronized { + this.restartStrategy = restartStrategy + this.components = components.toArray.toList.asInstanceOf[List[Component]] + bindings = for (c <- this.components) yield { + new DependencyBinding(c.intf, c.target) + } + val arrayList = new ArrayList[DependencyBinding]() + for (b <- bindings) arrayList.add(b) + modules.add(new ActiveObjectGuiceModule(arrayList)) + this + } + + def inject(): ActiveObjectGuiceConfiguratorForJava = synchronized { + if (injector != null) throw new IllegalStateException("inject() has already been called on this configurator") + injector = Guice.createInjector(modules) + this + } + + def supervise: ActiveObjectGuiceConfiguratorForJava = synchronized { + if (injector == null) inject() + injector = Guice.createInjector(modules) + val workers = new java.util.ArrayList[se.scalablesolutions.akka.kernel.config.ScalaConfig.Worker] + for (c <- components) { + val activeObjectProxy = new ActiveObjectProxy(c.intf, c.target, c.timeout) + workers.add(c.newWorker(activeObjectProxy)) + activeObjectRegistry.put(c.name, (c.intf, activeObjectProxy)) + camelContext.getRegistry.asInstanceOf[JndiRegistry].bind(c.intf.getName, activeObjectProxy) + } + supervisor = activeObjectFactory.supervise(restartStrategy.transform, workers) + camelContext.start + this + } + + + /** + * Add additional services to be wired in. + *
+   * ActiveObjectGuiceModule.addExternalGuiceModule(new AbstractModule {
+   *   protected void configure() {
+   *     bind(Foo.class).to(FooImpl.class).in(Scopes.SINGLETON);
+   *     bind(BarImpl.class);
+   *     link(Bar.class).to(BarImpl.class);
+   *     bindConstant(named("port")).to(8080);
+   *   }})
+   * 
+ */ + def addExternalGuiceModule(module: Module): ActiveObjectGuiceConfiguratorForJava = synchronized { + modules.add(module) + this + } + + /** + * Add Camel routes for the active objects. + *
+   * activeObjectGuiceModule.addRoutes(new RouteBuilder() {
+   *   def configure = {
+   *     from("akka:actor1").to("akka:actor2")
+   *     from("akka:actor2").process(new Processor() {
+   *       def process(e: Exchange) = {
+   *         println("Received exchange: " + e.getIn())
+   *       }
+   *     })
+   *   }
+   * }).inject().supervise();
+   * 
+ */ + def addRoutes(routes: Routes): ActiveObjectGuiceConfiguratorForJava = synchronized { + camelContext.addRoutes(routes) + this + } + + def getGuiceModules = modules + + def reset = synchronized { + modules = new ArrayList[Module] + configRegistry = new HashMap[Class[_], Component] + activeObjectRegistry = new HashMap[String, Tuple2[Class[_], ActiveObjectProxy]] + injector = null + restartStrategy = null + camelContext = new DefaultCamelContext + } + + def stop = synchronized { + camelContext.stop + supervisor.stop + } +} diff --git a/kernel/src/main/scala/config/Config.scala b/kernel/src/main/scala/config/Config.scala new file mode 100644 index 0000000000..aa71471799 --- /dev/null +++ b/kernel/src/main/scala/config/Config.scala @@ -0,0 +1,93 @@ +/** + * Copyright (C) 2009 Scalable Solutions. + */ + +package se.scalablesolutions.akka.kernel.config + +import reflect.BeanProperty + +import se.scalablesolutions.akka.kernel.GenericServerContainer + +/** + * Configuration classes - not to be used as messages. + * + * @author Jonas Bonér + */ +object ScalaConfig { + sealed abstract class ConfigElement + + abstract class Server extends ConfigElement + abstract class FailOverScheme extends ConfigElement + abstract class Scope extends ConfigElement + + case class SupervisorConfig(restartStrategy: RestartStrategy, worker: List[Server]) extends Server + case class Worker(serverContainer: GenericServerContainer, lifeCycle: LifeCycle) extends Server + + case class RestartStrategy(scheme: FailOverScheme, maxNrOfRetries: Int, withinTimeRange: Int) extends ConfigElement + + case object AllForOne extends FailOverScheme + case object OneForOne extends FailOverScheme + + case class LifeCycle(scope: Scope, shutdownTime: Int) extends ConfigElement + case object Permanent extends Scope + case object Transient extends Scope + case object Temporary extends Scope + + case class Component(val name: String, + val intf: Class[_], + val target: Class[_], + val lifeCycle: LifeCycle, + val timeout: Int) extends Server +} + +/** + * @author Jonas Bonér + */ +object JavaConfig { + sealed abstract class ConfigElement + + class RestartStrategy( + @BeanProperty val scheme: FailOverScheme, + @BeanProperty val maxNrOfRetries: Int, + @BeanProperty val withinTimeRange: Int) extends ConfigElement { + def transform = se.scalablesolutions.akka.kernel.config.ScalaConfig.RestartStrategy( + scheme.transform, maxNrOfRetries, withinTimeRange) + } + class LifeCycle(@BeanProperty val scope: Scope, @BeanProperty val shutdownTime: Int) extends ConfigElement { + def transform = se.scalablesolutions.akka.kernel.config.ScalaConfig.LifeCycle(scope.transform, shutdownTime) + } + + abstract class Scope extends ConfigElement { + def transform: se.scalablesolutions.akka.kernel.config.ScalaConfig.Scope + } + class Permanent extends Scope { + override def transform = se.scalablesolutions.akka.kernel.config.ScalaConfig.Permanent + } + class Transient extends Scope { + override def transform = se.scalablesolutions.akka.kernel.config.ScalaConfig.Transient + } + class Temporary extends Scope { + override def transform = se.scalablesolutions.akka.kernel.config.ScalaConfig.Temporary + } + + abstract class FailOverScheme extends ConfigElement { + def transform: se.scalablesolutions.akka.kernel.config.ScalaConfig.FailOverScheme + } + class AllForOne extends FailOverScheme { + override def transform = se.scalablesolutions.akka.kernel.config.ScalaConfig.AllForOne + } + class OneForOne extends FailOverScheme { + override def transform = se.scalablesolutions.akka.kernel.config.ScalaConfig.OneForOne + } + + abstract class Server extends ConfigElement + class Component(@BeanProperty val name: String, + @BeanProperty val intf: Class[_], + @BeanProperty val target: Class[_], + @BeanProperty val lifeCycle: LifeCycle, + @BeanProperty val timeout: Int) extends Server { + def newWorker(proxy: ActiveObjectProxy) = + se.scalablesolutions.akka.kernel.config.ScalaConfig.Worker(proxy.server, lifeCycle.transform) + } + +} \ No newline at end of file diff --git a/kernel/src/test/scala/CamelSpec.scala b/kernel/src/test/scala/CamelSpec.scala new file mode 100644 index 0000000000..1be2bd9925 --- /dev/null +++ b/kernel/src/test/scala/CamelSpec.scala @@ -0,0 +1,105 @@ +/** + * Copyright (C) 2009 Scalable Solutions. + */ + +package se.scalablesolutions.akka.kernel + +import akka.kernel.config.ActiveObjectGuiceConfigurator +import annotation.oneway +import kernel.config.ScalaConfig._ + +import com.google.inject.{AbstractModule, Scopes} +import com.jteigen.scalatest.JUnit4Runner + +import org.junit.runner.RunWith +import org.scalatest._ +import org.scalatest.matchers._ + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.apache.camel.CamelContext +import org.apache.camel.Endpoint +import org.apache.camel.Exchange +import org.apache.camel.Processor +import org.apache.camel.Producer +import org.apache.camel.builder.RouteBuilder +import org.apache.camel.impl.DefaultCamelContext + +/** + * @author Jonas Bonér + */ +@RunWith(classOf[JUnit4Runner]) +class CamelSpec extends Spec with ShouldMatchers { + + describe("A Camel routing scheme") { + + it("should route message from actor A to actor B") { + val latch = new CountDownLatch(1); + + val conf = new ActiveObjectGuiceConfigurator + conf.configureActiveObjects( + RestartStrategy(AllForOne, 3, 5000), + Component( + "camelfoo", + classOf[CamelFoo], + classOf[CamelFooImpl], + LifeCycle(Permanent, 1000), + 1000) :: + Component( + "camelbar", + classOf[CamelBar], + classOf[CamelBarImpl], + LifeCycle(Permanent, 1000), + 1000) :: + Nil + ).addRoutes(new RouteBuilder() { + def configure = { + from("akka:camelfoo.foo").to("akka:camelbar.bar") + from("akka:camelbar.bar").process(new Processor() { + def process(e: Exchange) = { + println("Received exchange: " + e.getIn()) + latch.countDown + } + }) + } + }).supervise + + val endpoint = conf.getRoutingEndpoint("akka:camelfoo.foo") +// println("----- " + endpoint) +// val exchange = endpoint.createExchange +// println("----- " + exchange) + + conf.getActiveObject(classOf[CamelFooImpl].getName).asInstanceOf[CamelFoo].foo("Hello Foo") + +// +// exchange.getIn().setHeader("cheese", 123) +// exchange.getIn().setBody("body") +// +// val producer = endpoint.createProducer +// println("----- " + producer) +// +// producer.process(exchange) +// +// // now lets sleep for a while +// val received = latch.await(5, TimeUnit.SECONDS) +// received should equal (true) +// +// conf.stop + } + } +} + +trait CamelFoo { + @oneway def foo(msg: String) +} +trait CamelBar { + def bar(msg: String): String +} + +class CamelFooImpl extends CamelFoo { + def foo(msg: String) = println("CamelFoo.foo:" + msg) +} +class CamelBarImpl extends CamelBar { + def bar(msg: String) = msg + "return_bar " +} diff --git a/lib/camel-core-2.0-SNAPSHOT.jar b/lib/camel-core-2.0-SNAPSHOT.jar new file mode 100644 index 0000000000..61b8015a70 Binary files /dev/null and b/lib/camel-core-2.0-SNAPSHOT.jar differ diff --git a/lib/cassandra-0.3.0-dev.jar b/lib/cassandra-0.3.0-dev.jar new file mode 100644 index 0000000000..584131f521 Binary files /dev/null and b/lib/cassandra-0.3.0-dev.jar differ diff --git a/util-java/.classpath b/util-java/.classpath new file mode 100644 index 0000000000..a8a9c007ea --- /dev/null +++ b/util-java/.classpath @@ -0,0 +1,9 @@ + + + + + + + + + diff --git a/util-java/.project b/util-java/.project new file mode 100644 index 0000000000..ef0174daf3 --- /dev/null +++ b/util-java/.project @@ -0,0 +1,17 @@ + + + util-java + + + + + + org.eclipse.jdt.core.javabuilder + + + + + + org.eclipse.jdt.core.javanature + + diff --git a/util-java/akka-util-java.iml b/util-java/akka-util-java.iml new file mode 100644 index 0000000000..1dd8480230 --- /dev/null +++ b/util-java/akka-util-java.iml @@ -0,0 +1,21 @@ + + + + + + + + + + + + + + + + + + + + + diff --git a/util-java/src/main/java/se/scalablesolutions/akka/kernel/config/ActiveObjectGuiceModule.java b/util-java/src/main/java/se/scalablesolutions/akka/kernel/config/ActiveObjectGuiceModule.java new file mode 100644 index 0000000000..4e135bd887 --- /dev/null +++ b/util-java/src/main/java/se/scalablesolutions/akka/kernel/config/ActiveObjectGuiceModule.java @@ -0,0 +1,30 @@ +/** + * Copyright (C) 2009 Scalable Solutions. + */ + +package se.scalablesolutions.akka.kernel.config; + +import java.util.List; + +import com.google.inject.AbstractModule; +import com.google.inject.Singleton; +import com.google.inject.jsr250.ResourceProviderFactory; + +/** + * @author Jonas Bonér + */ +public class ActiveObjectGuiceModule extends AbstractModule { + private final List bindings; + + public ActiveObjectGuiceModule(final List bindings) { + this.bindings = bindings; + } + + protected void configure() { + bind(ResourceProviderFactory.class); + for (int i = 0; i < bindings.size(); i++) { + final DependencyBinding db = bindings.get(i); + bind((Class) db.getInterface()).to((Class) db.getTarget()).in(Singleton.class); + } + } +} diff --git a/util-java/src/main/java/se/scalablesolutions/akka/kernel/config/DependencyBinding.java b/util-java/src/main/java/se/scalablesolutions/akka/kernel/config/DependencyBinding.java new file mode 100644 index 0000000000..42176be9dc --- /dev/null +++ b/util-java/src/main/java/se/scalablesolutions/akka/kernel/config/DependencyBinding.java @@ -0,0 +1,24 @@ +/** + * Copyright (C) 2009 Scalable Solutions. + */ + +package se.scalablesolutions.akka.kernel.config; + +/** + * @author Jonas Bonér + */ +public class DependencyBinding { + private final Class intf; + private final Class target; + + public DependencyBinding(final Class intf, final Class target) { + this.intf = intf; + this.target = target; + } + public Class getInterface() { + return intf; + } + public Class getTarget() { + return target; + } +}