initial camel integration

This commit is contained in:
Jonas Boner 2009-05-09 17:18:31 +02:00
parent c5062e50f7
commit 46ede93684
21 changed files with 1070 additions and 0 deletions

View file

@ -0,0 +1,62 @@
<?xml version="1.0" encoding="UTF-8"?>
<module relativePaths="true" MavenProjectsManager.isMavenModule="true" type="JAVA_MODULE" version="4">
<component name="NewModuleRootManager" LANGUAGE_LEVEL="JDK_1_5" inherit-compiler-output="false">
<output url="file://$MODULE_DIR$/target/classes" />
<output-test url="file://$MODULE_DIR$/target/test-classes" />
<content url="file://$MODULE_DIR$">
<sourceFolder url="file://$MODULE_DIR$/src/main" isTestSource="false" />
<sourceFolder url="file://$MODULE_DIR$/src/test/java" isTestSource="true" />
<excludeFolder url="file://$MODULE_DIR$/target" />
</content>
<orderEntry type="inheritedJdk" />
<orderEntry type="sourceFolder" forTests="false" />
<orderEntry type="module" module-name="akka-kernel" exported="" />
<orderEntry type="module" module-name="akka-util-java" exported="" />
<orderEntry type="library" exported="" name="Maven: org.guiceyfruit:guice-core:2.0-SNAPSHOT" level="project" />
<orderEntry type="library" exported="" name="Maven: com.google.code.google-collections:google-collect:snapshot-20080530" level="project" />
<orderEntry type="library" exported="" name="Maven: cglib:cglib:2.2" level="project" />
<orderEntry type="library" exported="" name="Maven: asm:asm:3.1" level="project" />
<orderEntry type="library" exported="" name="Maven: aopalliance:aopalliance:1.0" level="project" />
<orderEntry type="library" exported="" name="Maven: org.guiceyfruit:guice-jsr250:2.0-SNAPSHOT" level="project" />
<orderEntry type="library" exported="" name="Maven: javax.annotation:jsr250-api:1.0" level="project" />
<orderEntry type="library" exported="" name="Maven: org.scala-lang:scala-library:2.7.3" level="project" />
<orderEntry type="library" exported="" name="Maven: net.lag:configgy:1.2" level="project" />
<orderEntry type="library" exported="" name="Maven: com.sun.grizzly:grizzly-servlet-webserver:1.8.6.3" level="project" />
<orderEntry type="library" exported="" name="Maven: com.sun.grizzly:grizzly-http:1.8.6.3" level="project" />
<orderEntry type="library" exported="" name="Maven: com.sun.grizzly:grizzly-framework:1.8.6.3" level="project" />
<orderEntry type="library" exported="" name="Maven: com.sun.grizzly:grizzly-http-utils:1.8.6.3" level="project" />
<orderEntry type="library" exported="" name="Maven: com.sun.grizzly:grizzly-rcm:1.8.6.3" level="project" />
<orderEntry type="library" exported="" name="Maven: com.sun.grizzly:grizzly-portunif:1.8.6.3" level="project" />
<orderEntry type="library" exported="" name="Maven: com.sun.grizzly:grizzly-http-servlet:1.8.6.3" level="project" />
<orderEntry type="library" exported="" name="Maven: javax.servlet:servlet-api:2.5" level="project" />
<orderEntry type="library" exported="" name="Maven: com.sun.jersey:jersey-server:1.0.1" level="project" />
<orderEntry type="library" exported="" name="Maven: com.sun.jersey:jersey-core:1.0.1" level="project" />
<orderEntry type="library" exported="" name="Maven: javax.ws.rs:jsr311-api:1.0" level="project" />
<orderEntry type="library" exported="" name="Maven: com.sun.jersey:jersey-json:1.0.1" level="project" />
<orderEntry type="library" exported="" name="Maven: org.codehaus.jettison:jettison:1.0.1" level="project" />
<orderEntry type="library" exported="" name="Maven: stax:stax-api:1.0.1" level="project" />
<orderEntry type="library" exported="" name="Maven: com.sun.xml.bind:jaxb-impl:2.1" level="project" />
<orderEntry type="library" exported="" name="Maven: javax.xml.bind:jaxb-api:2.1" level="project" />
<orderEntry type="library" exported="" name="Maven: javax.xml.stream:stax-api:1.0-2" level="project" />
<orderEntry type="library" exported="" name="Maven: javax.activation:activation:1.1" level="project" />
<orderEntry type="library" exported="" name="Maven: com.sun.jersey:jersey-atom:1.0.1" level="project" />
<orderEntry type="library" exported="" name="Maven: rome:rome:0.9" level="project" />
<orderEntry type="library" exported="" name="Maven: jdom:jdom:1.0" level="project" />
<orderEntry type="library" exported="" name="Maven: org.jboss.netty:netty:3.1.0.BETA2" level="project" />
<orderEntry type="library" exported="" name="Maven: org.apache.cassandra:cassandra:0.3.0-dev" level="project" />
<orderEntry type="library" exported="" name="Maven: com.facebook:thrift:1.0" level="project" />
<orderEntry type="library" exported="" name="Maven: com.facebook:fb303:1.0" level="project" />
<orderEntry type="library" exported="" name="Maven: org.apache.camel:camel-core:2.0-SNAPSHOT" level="project" />
<orderEntry type="library" exported="" name="Maven: commons-logging:commons-logging-api:1.1" level="project" />
<orderEntry type="library" exported="" name="Maven: com.assembla.scala.mina:mina-core:2.0.0-M2-SNAPSHOT" level="project" />
<orderEntry type="library" exported="" name="Maven: org.slf4j:slf4j-api:1.4.3" level="project" />
<orderEntry type="library" exported="" name="Maven: com.assembla.scala.mina:mina-integration-scala:2.0.0-M2-SNAPSHOT" level="project" />
<orderEntry type="library" exported="" name="Maven: org.slf4j:slf4j-log4j12:1.4.3" level="project" />
<orderEntry type="library" exported="" name="Maven: log4j:log4j:1.2.13" level="project" />
<orderEntry type="library" exported="" name="Maven: junit:junit:4.5" level="project" />
<orderEntry type="library" name="Maven: org.jmock:jmock:2.4.0" level="project" />
<orderEntry type="library" name="Maven: org.hamcrest:hamcrest-core:1.1" level="project" />
<orderEntry type="library" name="Maven: org.hamcrest:hamcrest-library:1.1" level="project" />
</component>
</module>

View file

@ -0,0 +1,121 @@
/**
* Copyright (C) 2009 Scalable Solutions.
*/
package se.scalablesolutions.akka.api;
import com.google.inject.*;
import com.google.inject.jsr250.ResourceProviderFactory;
import se.scalablesolutions.akka.kernel.configuration.*;
import se.scalablesolutions.akka.kernel.ActiveObjectFactory;
import se.scalablesolutions.akka.kernel.ActiveObjectProxy;
import se.scalablesolutions.akka.kernel.Supervisor;
import se.scalablesolutions.akka.kernel.Worker;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.util.HashMap;
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
public class ActiveObjectGuiceConfigurator {
private List<Module> modules = new ArrayList<Module>();
private Injector injector;
private Supervisor supervisor;
private RestartStrategy restartStrategy;
private Component[] components;
private Map<Class, Component> configRegistry = new HashMap<Class, Component>(); // TODO is configRegistry needed?
private Map<Class, ActiveObjectProxy> activeObjectRegistry = new HashMap<Class, ActiveObjectProxy>();
private ActiveObjectFactory activeObjectFactory = new ActiveObjectFactory();
public synchronized <T> T getExternalDependency(Class<T> clazz) {
return injector.getInstance(clazz);
}
/**
* Returns the active abject that has been put under supervision for the class specified.
*
* @param clazz the class for the active object
* @return the active object for the class
*/
public synchronized <T> T getActiveObject(Class<T> clazz) {
if (injector == null) throw new IllegalStateException("inject() and supervise() must be called before invoking newInstance(clazz)");
if (activeObjectRegistry.containsKey(clazz)) {
final ActiveObjectProxy activeObjectProxy = activeObjectRegistry.get(clazz);
activeObjectProxy.setTargetInstance(injector.getInstance(clazz));
return (T)activeObjectFactory.newInstance(clazz, activeObjectProxy);
} else throw new IllegalStateException("Class " + clazz.getName() + " has not been put under supervision (by passing in the config to the supervise() method");
}
public synchronized ActiveObjectGuiceConfigurator configureActiveObjects(final RestartStrategy restartStrategy, final Component[] components) {
this.restartStrategy = restartStrategy;
this.components = components;
modules.add(new AbstractModule() {
protected void configure() {
bind(ResourceProviderFactory.class);
for (int i = 0; i < components.length; i++) {
Component c = components[i];
bind((Class) c.intf()).to((Class) c.target()).in(Singleton.class);
}
}
});
return this;
}
public synchronized ActiveObjectGuiceConfigurator inject() {
if (injector != null) throw new IllegalStateException("inject() has already been called on this configurator");
injector = Guice.createInjector(modules);
return this;
}
public synchronized ActiveObjectGuiceConfigurator supervise() {
if (injector == null) inject();
injector = Guice.createInjector(modules);
List<Worker> workers = new ArrayList<Worker>();
for (int i = 0; i < components.length; i++) {
final Component c = components[i];
final ActiveObjectProxy activeObjectProxy = new ActiveObjectProxy(c.intf(), c.target(), c.timeout());
workers.add(c.newWorker(activeObjectProxy));
activeObjectRegistry.put(c.intf(), activeObjectProxy);
}
supervisor = activeObjectFactory.supervise(restartStrategy.transform(), workers);
return this;
}
/**
* Add additional services to be wired in.
* <pre>
* ActiveObjectGuiceConfigurator.addExternalGuiceModule(new AbstractModule {
* protected void configure() {
* bind(Foo.class).to(FooImpl.class).in(Scopes.SINGLETON);
* bind(BarImpl.class);
* link(Bar.class).to(BarImpl.class);
* bindConstant(named("port")).to(8080);
* }})
* </pre>
*/
public synchronized ActiveObjectGuiceConfigurator addExternalGuiceModule(Module module) {
modules.add(module);
return this;
}
public List<Module> getGuiceModules() {
return modules;
}
public synchronized void reset() {
modules = new ArrayList<Module>();
configRegistry = new HashMap<Class, Component>();
activeObjectRegistry = new HashMap<Class, ActiveObjectProxy>();
injector = null;
restartStrategy = null;
}
public synchronized void stop() {
supervisor.stop();
}
}

67
kernel/akka-kernel.iml Normal file
View file

@ -0,0 +1,67 @@
<?xml version="1.0" encoding="UTF-8"?>
<module relativePaths="true" MavenProjectsManager.isMavenModule="true" type="JAVA_MODULE" version="4">
<component name="NewModuleRootManager" inherit-compiler-output="false">
<output url="file://$MODULE_DIR$/target/classes" />
<output-test url="file://$MODULE_DIR$/target/test-classes" />
<content url="file://$MODULE_DIR$">
<sourceFolder url="file://$MODULE_DIR$/src/main/resources" isTestSource="false" />
<sourceFolder url="file://$MODULE_DIR$/src/main/scala" isTestSource="false" />
<sourceFolder url="file://$MODULE_DIR$/src/test/scala" isTestSource="true" />
<excludeFolder url="file://$MODULE_DIR$/target" />
</content>
<orderEntry type="inheritedJdk" />
<orderEntry type="sourceFolder" forTests="false" />
<orderEntry type="module-library">
<library name="Scala 2.7.3">
<CLASSES>
<root url="jar://$MODULE_DIR$/../../../../bin/scala-2.7.3.final/lib/scala-compiler.jar!/" />
<root url="jar://$MODULE_DIR$/../../../../bin/scala-2.7.3.final/lib/scala-library.jar!/" />
</CLASSES>
<JAVADOC />
<SOURCES />
</library>
</orderEntry>
<orderEntry type="library" exported="" name="Maven: se.scalablesolutions.akka:akka-util-java:0.1" level="project" />
<orderEntry type="library" exported="" name="Maven: org.scala-lang:scala-library:2.7.3" level="project" />
<orderEntry type="library" exported="" name="Maven: net.lag:configgy:1.2" level="project" />
<orderEntry type="library" exported="" name="Maven: org.guiceyfruit:guice-core:2.0-SNAPSHOT" level="project" />
<orderEntry type="library" exported="" name="Maven: com.google.code.google-collections:google-collect:snapshot-20080530" level="project" />
<orderEntry type="library" exported="" name="Maven: cglib:cglib:2.2" level="project" />
<orderEntry type="library" exported="" name="Maven: asm:asm:3.1" level="project" />
<orderEntry type="library" exported="" name="Maven: aopalliance:aopalliance:1.0" level="project" />
<orderEntry type="library" exported="" name="Maven: org.guiceyfruit:guice-jsr250:2.0-SNAPSHOT" level="project" />
<orderEntry type="library" exported="" name="Maven: javax.annotation:jsr250-api:1.0" level="project" />
<orderEntry type="library" exported="" name="Maven: com.sun.grizzly:grizzly-servlet-webserver:1.8.6.3" level="project" />
<orderEntry type="library" exported="" name="Maven: com.sun.grizzly:grizzly-http:1.8.6.3" level="project" />
<orderEntry type="library" exported="" name="Maven: com.sun.grizzly:grizzly-framework:1.8.6.3" level="project" />
<orderEntry type="library" exported="" name="Maven: com.sun.grizzly:grizzly-http-utils:1.8.6.3" level="project" />
<orderEntry type="library" exported="" name="Maven: com.sun.grizzly:grizzly-rcm:1.8.6.3" level="project" />
<orderEntry type="library" exported="" name="Maven: com.sun.grizzly:grizzly-portunif:1.8.6.3" level="project" />
<orderEntry type="library" exported="" name="Maven: com.sun.grizzly:grizzly-http-servlet:1.8.6.3" level="project" />
<orderEntry type="library" exported="" name="Maven: javax.servlet:servlet-api:2.5" level="project" />
<orderEntry type="library" exported="" name="Maven: com.sun.jersey:jersey-server:1.0.1" level="project" />
<orderEntry type="library" exported="" name="Maven: com.sun.jersey:jersey-core:1.0.1" level="project" />
<orderEntry type="library" exported="" name="Maven: javax.ws.rs:jsr311-api:1.0" level="project" />
<orderEntry type="library" exported="" name="Maven: com.sun.jersey:jersey-json:1.0.1" level="project" />
<orderEntry type="library" exported="" name="Maven: org.codehaus.jettison:jettison:1.0.1" level="project" />
<orderEntry type="library" exported="" name="Maven: stax:stax-api:1.0.1" level="project" />
<orderEntry type="library" exported="" name="Maven: com.sun.xml.bind:jaxb-impl:2.1" level="project" />
<orderEntry type="library" exported="" name="Maven: javax.xml.bind:jaxb-api:2.1" level="project" />
<orderEntry type="library" exported="" name="Maven: javax.xml.stream:stax-api:1.0-2" level="project" />
<orderEntry type="library" exported="" name="Maven: javax.activation:activation:1.1" level="project" />
<orderEntry type="library" exported="" name="Maven: com.sun.jersey:jersey-atom:1.0.1" level="project" />
<orderEntry type="library" exported="" name="Maven: rome:rome:0.9" level="project" />
<orderEntry type="library" exported="" name="Maven: jdom:jdom:1.0" level="project" />
<orderEntry type="library" exported="" name="Maven: org.jboss.netty:netty:3.1.0.BETA2" level="project" />
<orderEntry type="library" exported="" name="Maven: org.apache.cassandra:cassandra:0.3.0-dev" level="project" />
<orderEntry type="library" exported="" name="Maven: com.facebook:thrift:1.0" level="project" />
<orderEntry type="library" exported="" name="Maven: com.facebook:fb303:1.0" level="project" />
<orderEntry type="library" exported="" name="Maven: com.assembla.scala.mina:mina-core:2.0.0-M2-SNAPSHOT" level="project" />
<orderEntry type="library" exported="" name="Maven: org.slf4j:slf4j-api:1.4.3" level="project" />
<orderEntry type="library" exported="" name="Maven: com.assembla.scala.mina:mina-integration-scala:2.0.0-M2-SNAPSHOT" level="project" />
<orderEntry type="library" exported="" name="Maven: org.slf4j:slf4j-log4j12:1.4.3" level="project" />
<orderEntry type="library" exported="" name="Maven: log4j:log4j:1.2.13" level="project" />
<orderEntry type="library" name="Maven: org.scala-tools.testing:scalatest:0.9.5" level="project" />
</component>
</module>

View file

@ -0,0 +1 @@
class=se.scalablesolutions.akka.kernel.camel.ActiveObjectComponent

View file

@ -0,0 +1,22 @@
/**
* Copyright (C) 2009 Scalable Solutions.
*/
package se.scalablesolutions.akka.kernel.camel
import config.ActiveObjectGuiceConfigurator
import java.util.Map
import java.util.concurrent.{BlockingQueue, LinkedBlockingQueue}
import org.apache.camel.{Endpoint, Exchange}
import org.apache.camel.impl.DefaultComponent
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class ActiveObjectComponent(val conf: ActiveObjectGuiceConfigurator) extends DefaultComponent {
override def createEndpoint(uri: String, remaining: String, parameters: Map[_,_]): Endpoint = {
//val consumers = getAndRemoveParameter(parameters, "concurrentConsumers", classOf[Int], 1)
new ActiveObjectEndpoint(uri, this, conf)
}
}

View file

@ -0,0 +1,36 @@
/**
* Copyright (C) 2009 Scalable Solutions.
*/
package se.scalablesolutions.akka.kernel.camel
import java.util.concurrent.{BlockingQueue, ExecutorService, Executors, ThreadFactory, TimeUnit}
import se.scalablesolutions.akka.kernel.{Logging, GenericServerContainer}
import org.apache.camel.{AsyncCallback, AsyncProcessor, Consumer, Exchange, Processor}
import org.apache.camel.impl.ServiceSupport
import org.apache.camel.impl.converter.AsyncProcessorTypeConverter
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class ActiveObjectConsumer(
val endpoint: ActiveObjectEndpoint,
proc: Processor,
val activeObject: AnyRef)
extends ServiceSupport with Consumer with Runnable with Logging {
val processor = AsyncProcessorTypeConverter.convert(proc)
println("------- creating consumer for: "+ endpoint.uri)
override def run = {
}
def doStart() = {
}
def doStop() = {
}
override def toString(): String = "ActiveObjectConsumer [" + endpoint.getEndpointUri + "]"
}

View file

@ -0,0 +1,47 @@
/**
* Copyright (C) 2009 Scalable Solutions.
*/
package se.scalablesolutions.akka.kernel.camel
import config.ActiveObjectGuiceConfigurator
import se.scalablesolutions.akka.kernel.Logging
import java.util.{ArrayList, HashSet, List, Set}
import java.util.concurrent.{BlockingQueue, CopyOnWriteArraySet, LinkedBlockingQueue}
import org.apache.camel.{Component, Consumer, Exchange, Processor, Producer}
import org.apache.camel.impl.{DefaultEndpoint, DefaultComponent};
import org.apache.camel.spi.BrowsableEndpoint;
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class ActiveObjectEndpoint(val uri: String, val component: DefaultComponent, val conf: ActiveObjectGuiceConfigurator) // FIXME: need abstraction trait here
extends DefaultEndpoint(uri) with BrowsableEndpoint with Logging {
val firstSep = uri.indexOf(':')
val lastSep = uri.lastIndexOf( '.')
val scheme = uri.substring(0, firstSep)
val activeObjectName = uri.substring(uri.indexOf(':') + 1, lastSep)
val methodName = uri.substring(lastSep + 1, uri.length)
val activeObject = conf.getActiveObject(activeObjectName).asInstanceOf[MessageDriven]
// val activeObjectProxy = conf.getActiveObjectProxy(activeObjectName)
// val genericServer = supervisor.getServerOrElse(
// activeObjectName,
// throw new IllegalArgumentException("Can't find active object with name [" + activeObjectName + "] and method [" + methodName + "]"))
log.debug("Creating Camel Endpoint for scheme [%s] and component [%s]", scheme, activeObjectName)
private var queue: BlockingQueue[Exchange] = new LinkedBlockingQueue[Exchange](1000)
override def createProducer: Producer = new ActiveObjectProducer(this, activeObject)
override def createConsumer(processor: Processor): Consumer = new ActiveObjectConsumer(this, processor, activeObject)
override def getExchanges: List[Exchange] = new ArrayList[Exchange](queue)
override def isSingleton = true
}

View file

@ -0,0 +1,43 @@
/**
* Copyright (C) 2009 Scalable Solutions.
*/
package se.scalablesolutions.akka.kernel.camel
import java.util.Collection;
import java.util.concurrent.BlockingQueue;
import se.scalablesolutions.akka.kernel.{Logging, GenericServerContainer}
import org.apache.camel.{Exchange, AsyncProcessor, AsyncCallback}
import org.apache.camel.impl.DefaultProducer
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class ActiveObjectProducer(
val endpoint: ActiveObjectEndpoint,
val activeObject: MessageDriven)
extends DefaultProducer(endpoint) with AsyncProcessor with Logging {
private val actorName = endpoint.activeObjectName
def process(exchange: Exchange) = activeObject.onMessage(exchange) // FIXME: should we not invoke the generic server here?
def process(exchange: Exchange, callback: AsyncCallback): Boolean = {
val copy = exchange.copy
copy.setProperty("CamelAsyncCallback", callback)
activeObject.onMessage(copy)
callback.done(true)
true
}
override def doStart = {
super.doStart
}
override def doStop = {
super.doStop
}
override def toString(): String = "ActiveObjectProducer [" + endpoint.getEndpointUri + "]"
}

View file

@ -0,0 +1,14 @@
/**
* Copyright (C) 2009 Scalable Solutions.
*/
package se.scalablesolutions.akka.kernel.camel
import org.apache.camel.Exchange
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
trait MessageDriven {
def onMessage(exchange: Exchange)
}

View file

@ -0,0 +1,14 @@
//**
* Copyright (C) 2009 Scalable Solutions.
*/
package se.scalablesolutions.akka.kernel.camel
import org.apache.camel.impl.{DefaultCamelContext, DefaultEndpoint, DefaultComponent}
import se.scalablesolutions.akka.kernel.{Supervisor, Logging}
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class SupervisorAwareCamelContext extends DefaultCamelContext with Logging {
var supervisor: Supervisor = _
}

View file

@ -0,0 +1,189 @@
/**
* Copyright (C) 2009 Scalable Solutions.
*/
package se.scalablesolutions.akka.kernel.config
import com.google.inject._
import com.google.inject.jsr250.ResourceProviderFactory
import java.lang.reflect.Method
import kernel.camel.ActiveObjectComponent
import org.apache.camel.impl.{JndiRegistry, DefaultCamelContext}
import org.apache.camel.{Endpoint, Routes}
import scala.collection.mutable.HashMap
import se.scalablesolutions.akka.kernel.ActiveObjectFactory
import se.scalablesolutions.akka.kernel.ActiveObjectProxy
import se.scalablesolutions.akka.kernel.Supervisor
import se.scalablesolutions.akka.kernel.config.ScalaConfig._
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class ActiveObjectGuiceConfigurator extends Logging {
val AKKA_CAMEL_ROUTING_SCHEME = "akka"
private var injector: Injector = _
private var supervisor: Supervisor = _
private var restartStrategy: RestartStrategy = _
private var components: List[Component] = _
private var bindings: List[DependencyBinding] = Nil
private var configRegistry = new HashMap[Class[_], Component] // TODO is configRegistry needed?
private var activeObjectRegistry = new HashMap[String, Tuple3[Class[_], Class[_], ActiveObjectProxy]]
private var activeObjectFactory = new ActiveObjectFactory
private var camelContext = new DefaultCamelContext();
private var modules = new java.util.ArrayList[Module]
private var methodToUriRegistry = new HashMap[Method, String]
def getExternalDependency[T](clazz: Class[T]): T = synchronized {
injector.getInstance(clazz).asInstanceOf[T]
}
def getRoutingEndpoint(uri: String): Endpoint = synchronized {
camelContext.getEndpoint(uri)
}
def getRoutingEndpoints: java.util.Collection[Endpoint] = synchronized {
camelContext.getEndpoints
}
def getRoutingEndpoints(uri: String): java.util.Collection[Endpoint] = synchronized {
camelContext.getEndpoints(uri)
}
/**
* Returns the active abject that has been put under supervision for the class specified.
*
* @param clazz the class for the active object
* @return the active object for the class
*/
def getActiveObject(name: String): AnyRef = synchronized {
//def getActiveObject[T](name: String): T = synchronized {
println("Looking up active object " + name)
log.debug("Looking up active object [%s]", name)
if (injector == null) throw new IllegalStateException("inject() and supervise() must be called before invoking newInstance(clazz)")
val activeObjectOption: Option[Tuple3[Class[_], Class[_], ActiveObjectProxy]] = activeObjectRegistry.get(name)
if (activeObjectOption.isDefined) {
val classInfo = activeObjectOption.get
val intfClass = classInfo._1
val implClass = classInfo._2
val activeObjectProxy = classInfo._3
//activeObjectProxy.setTargetInstance(injector.getInstance(clazz).asInstanceOf[AnyRef])
val target = implClass.newInstance
injector.injectMembers(target)
activeObjectProxy.setTargetInstance(target.asInstanceOf[AnyRef])
activeObjectFactory.newInstance(intfClass, activeObjectProxy).asInstanceOf[AnyRef]
} else throw new IllegalStateException("Class " + name + " has not been put under supervision (by passing in the config to the 'supervise') method")
}
def getActiveObjectProxy(name: String): ActiveObjectProxy = synchronized {
log.debug("Looking up active object proxy [%s]", name)
if (injector == null) throw new IllegalStateException("inject() and supervise() must be called before invoking newInstance(clazz)")
val activeObjectOption: Option[Tuple3[Class[_], Class[_], ActiveObjectProxy]] = activeObjectRegistry.get(name)
if (activeObjectOption.isDefined) activeObjectOption.get._3
else throw new IllegalStateException("Class " + name + " has not been put under supervision (by passing in the config to the 'supervise') method")
}
def configureActiveObjects(restartStrategy: RestartStrategy, components: List[Component]): ActiveObjectGuiceConfigurator = synchronized {
this.restartStrategy = restartStrategy
this.components = components.toArray.toList.asInstanceOf[List[Component]]
bindings = for (c <- this.components)
yield new DependencyBinding(c.intf, c.target) // build up the Guice interface class -> impl class bindings
val arrayList = new java.util.ArrayList[DependencyBinding]()
for (b <- bindings) arrayList.add(b)
modules.add(new ActiveObjectGuiceModule(arrayList))
this
}
def inject: ActiveObjectGuiceConfigurator = synchronized {
if (injector != null) throw new IllegalStateException("inject() has already been called on this configurator")
injector = Guice.createInjector(modules)
this
}
def supervise: ActiveObjectGuiceConfigurator = synchronized {
if (injector == null) inject
injector = Guice.createInjector(modules)
var workers = new java.util.ArrayList[Worker]
for (component <- components) {
val activeObjectProxy = new ActiveObjectProxy(component.intf, component.target, component.timeout, this)
workers.add(Worker(activeObjectProxy.server, component.lifeCycle))
activeObjectRegistry.put(component.name, (component.intf, component.target, activeObjectProxy))
camelContext.getRegistry.asInstanceOf[JndiRegistry].bind(component.name, activeObjectProxy)
for (method <- component.intf.getDeclaredMethods.toList) {
registerMethodForUri(method, component.name)
}
log.debug("Registering active object in Camel context under the name [%s]", component.target.getName)
}
supervisor = activeObjectFactory.supervise(restartStrategy, workers)
camelContext.addComponent(AKKA_CAMEL_ROUTING_SCHEME, new ActiveObjectComponent(this))
camelContext.start
this
}
/**
* Add additional services to be wired in.
* <pre>
* ActiveObjectGuiceModule.addExternalGuiceModule(new AbstractModule {
* protected void configure() {
* bind(Foo.class).to(FooImpl.class).in(Scopes.SINGLETON);
* bind(BarImpl.class);
* link(Bar.class).to(BarImpl.class);
* bindConstant(named("port")).to(8080);
* }})
* </pre>
*/
def addExternalGuiceModule(module: Module): ActiveObjectGuiceConfigurator = synchronized {
modules.add(module)
this
}
/**
* Add Camel routes for the active objects.
* <pre>
* activeObjectGuiceModule.addRoutes(new RouteBuilder() {
* def configure = {
* from("akka:actor1").to("akka:actor2")
* from("akka:actor2").process(new Processor() {
* def process(e: Exchange) = {
* println("Received exchange: " + e.getIn())
* }
* })
* }
* }).inject().supervise();
* </pre>
*/
def addRoutes(routes: Routes): ActiveObjectGuiceConfigurator = synchronized {
camelContext.addRoutes(routes)
this
}
def getGuiceModules = modules
def reset = synchronized {
modules = new java.util.ArrayList[Module]
configRegistry = new HashMap[Class[_], Component]
activeObjectRegistry = new HashMap[String, Tuple3[Class[_], Class[_], ActiveObjectProxy]]
methodToUriRegistry = new HashMap[Method, String]
injector = null
restartStrategy = null
camelContext = new DefaultCamelContext
}
def stop = synchronized {
camelContext.stop
supervisor.stop
}
def registerMethodForUri(method: Method, componentName: String) =
methodToUriRegistry += method -> buildUri(method, componentName)
def lookupUriFor(method: Method): String =
methodToUriRegistry.getOrElse(method, throw new IllegalStateException("Could not find URI for method [" + method.getName + "]"))
def buildUri(method: Method, componentName: String): String =
AKKA_CAMEL_ROUTING_SCHEME + ":" + componentName + "." + method.getName
}

View file

@ -0,0 +1,155 @@
/**
* Copyright (C) 2009 Scalable Solutions.
*/
package se.scalablesolutions.akka.kernel.config
import akka.kernel.config.JavaConfig._
import akka.kernel.{Supervisor, ActiveObjectProxy, ActiveObjectFactory}
import akka.kernel.config.{DependencyBinding, ActiveObjectGuiceModule}
import com.google.inject._
import com.google.inject.jsr250.ResourceProviderFactory
import java.util.{ArrayList, HashMap, Collection}
import org.apache.camel.impl.{JndiRegistry, DefaultCamelContext}
import org.apache.camel.{Endpoint, Routes}
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class ActiveObjectGuiceConfiguratorForJava {
private var modules = new ArrayList[Module]
private var injector: Injector = _
private var supervisor: Supervisor = _
private var restartStrategy: RestartStrategy = _
private var components: List[Component] = _
private var bindings: List[DependencyBinding] = Nil
private var configRegistry = new HashMap[Class[_], Component] // TODO is configRegistry needed?
private var activeObjectRegistry = new HashMap[String, Tuple2[Class[_], ActiveObjectProxy]]
private var activeObjectFactory = new ActiveObjectFactory
private var camelContext = new DefaultCamelContext();
def getExternalDependency[T](clazz: Class[T]): T = synchronized {
injector.getInstance(clazz).asInstanceOf[T]
}
def getRoutingEndpoint(uri: String): Endpoint = synchronized {
camelContext.getEndpoint(uri)
}
def getRoutingEndpoints: Collection[Endpoint] = synchronized {
camelContext.getEndpoints
}
def getRoutingEndpoints(uri: String): Collection[Endpoint] = synchronized {
camelContext.getEndpoints(uri)
}
/**
* Returns the active abject that has been put under supervision for the class specified.
*
* @param clazz the class for the active object
* @return the active object for the class
*/
def getActiveObject[T](name: String): T = synchronized {
if (injector == null) throw new IllegalStateException("inject() and supervise() must be called before invoking newInstance(clazz)")
if (activeObjectRegistry.containsKey(name)) {
val activeObjectTuple = activeObjectRegistry.get(name)
val clazz = activeObjectTuple._1
val activeObjectProxy = activeObjectTuple._2
//activeObjectProxy.setTargetInstance(injector.getInstance(clazz).asInstanceOf[AnyRef])
val target = clazz.newInstance
injector.injectMembers(target)
activeObjectProxy.setTargetInstance(target.asInstanceOf[AnyRef])
activeObjectFactory.newInstance(clazz, activeObjectProxy).asInstanceOf[T]
} else throw new IllegalStateException("Class " + name + " has not been put under supervision (by passing in the config to the supervise() method")
}
def configureActiveObjects(restartStrategy: RestartStrategy, components: Array[Component]): ActiveObjectGuiceConfiguratorForJava = synchronized {
this.restartStrategy = restartStrategy
this.components = components.toArray.toList.asInstanceOf[List[Component]]
bindings = for (c <- this.components) yield {
new DependencyBinding(c.intf, c.target)
}
val arrayList = new ArrayList[DependencyBinding]()
for (b <- bindings) arrayList.add(b)
modules.add(new ActiveObjectGuiceModule(arrayList))
this
}
def inject(): ActiveObjectGuiceConfiguratorForJava = synchronized {
if (injector != null) throw new IllegalStateException("inject() has already been called on this configurator")
injector = Guice.createInjector(modules)
this
}
def supervise: ActiveObjectGuiceConfiguratorForJava = synchronized {
if (injector == null) inject()
injector = Guice.createInjector(modules)
val workers = new java.util.ArrayList[se.scalablesolutions.akka.kernel.config.ScalaConfig.Worker]
for (c <- components) {
val activeObjectProxy = new ActiveObjectProxy(c.intf, c.target, c.timeout)
workers.add(c.newWorker(activeObjectProxy))
activeObjectRegistry.put(c.name, (c.intf, activeObjectProxy))
camelContext.getRegistry.asInstanceOf[JndiRegistry].bind(c.intf.getName, activeObjectProxy)
}
supervisor = activeObjectFactory.supervise(restartStrategy.transform, workers)
camelContext.start
this
}
/**
* Add additional services to be wired in.
* <pre>
* ActiveObjectGuiceModule.addExternalGuiceModule(new AbstractModule {
* protected void configure() {
* bind(Foo.class).to(FooImpl.class).in(Scopes.SINGLETON);
* bind(BarImpl.class);
* link(Bar.class).to(BarImpl.class);
* bindConstant(named("port")).to(8080);
* }})
* </pre>
*/
def addExternalGuiceModule(module: Module): ActiveObjectGuiceConfiguratorForJava = synchronized {
modules.add(module)
this
}
/**
* Add Camel routes for the active objects.
* <pre>
* activeObjectGuiceModule.addRoutes(new RouteBuilder() {
* def configure = {
* from("akka:actor1").to("akka:actor2")
* from("akka:actor2").process(new Processor() {
* def process(e: Exchange) = {
* println("Received exchange: " + e.getIn())
* }
* })
* }
* }).inject().supervise();
* </pre>
*/
def addRoutes(routes: Routes): ActiveObjectGuiceConfiguratorForJava = synchronized {
camelContext.addRoutes(routes)
this
}
def getGuiceModules = modules
def reset = synchronized {
modules = new ArrayList[Module]
configRegistry = new HashMap[Class[_], Component]
activeObjectRegistry = new HashMap[String, Tuple2[Class[_], ActiveObjectProxy]]
injector = null
restartStrategy = null
camelContext = new DefaultCamelContext
}
def stop = synchronized {
camelContext.stop
supervisor.stop
}
}

View file

@ -0,0 +1,93 @@
/**
* Copyright (C) 2009 Scalable Solutions.
*/
package se.scalablesolutions.akka.kernel.config
import reflect.BeanProperty
import se.scalablesolutions.akka.kernel.GenericServerContainer
/**
* Configuration classes - not to be used as messages.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object ScalaConfig {
sealed abstract class ConfigElement
abstract class Server extends ConfigElement
abstract class FailOverScheme extends ConfigElement
abstract class Scope extends ConfigElement
case class SupervisorConfig(restartStrategy: RestartStrategy, worker: List[Server]) extends Server
case class Worker(serverContainer: GenericServerContainer, lifeCycle: LifeCycle) extends Server
case class RestartStrategy(scheme: FailOverScheme, maxNrOfRetries: Int, withinTimeRange: Int) extends ConfigElement
case object AllForOne extends FailOverScheme
case object OneForOne extends FailOverScheme
case class LifeCycle(scope: Scope, shutdownTime: Int) extends ConfigElement
case object Permanent extends Scope
case object Transient extends Scope
case object Temporary extends Scope
case class Component(val name: String,
val intf: Class[_],
val target: Class[_],
val lifeCycle: LifeCycle,
val timeout: Int) extends Server
}
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object JavaConfig {
sealed abstract class ConfigElement
class RestartStrategy(
@BeanProperty val scheme: FailOverScheme,
@BeanProperty val maxNrOfRetries: Int,
@BeanProperty val withinTimeRange: Int) extends ConfigElement {
def transform = se.scalablesolutions.akka.kernel.config.ScalaConfig.RestartStrategy(
scheme.transform, maxNrOfRetries, withinTimeRange)
}
class LifeCycle(@BeanProperty val scope: Scope, @BeanProperty val shutdownTime: Int) extends ConfigElement {
def transform = se.scalablesolutions.akka.kernel.config.ScalaConfig.LifeCycle(scope.transform, shutdownTime)
}
abstract class Scope extends ConfigElement {
def transform: se.scalablesolutions.akka.kernel.config.ScalaConfig.Scope
}
class Permanent extends Scope {
override def transform = se.scalablesolutions.akka.kernel.config.ScalaConfig.Permanent
}
class Transient extends Scope {
override def transform = se.scalablesolutions.akka.kernel.config.ScalaConfig.Transient
}
class Temporary extends Scope {
override def transform = se.scalablesolutions.akka.kernel.config.ScalaConfig.Temporary
}
abstract class FailOverScheme extends ConfigElement {
def transform: se.scalablesolutions.akka.kernel.config.ScalaConfig.FailOverScheme
}
class AllForOne extends FailOverScheme {
override def transform = se.scalablesolutions.akka.kernel.config.ScalaConfig.AllForOne
}
class OneForOne extends FailOverScheme {
override def transform = se.scalablesolutions.akka.kernel.config.ScalaConfig.OneForOne
}
abstract class Server extends ConfigElement
class Component(@BeanProperty val name: String,
@BeanProperty val intf: Class[_],
@BeanProperty val target: Class[_],
@BeanProperty val lifeCycle: LifeCycle,
@BeanProperty val timeout: Int) extends Server {
def newWorker(proxy: ActiveObjectProxy) =
se.scalablesolutions.akka.kernel.config.ScalaConfig.Worker(proxy.server, lifeCycle.transform)
}
}

View file

@ -0,0 +1,105 @@
/**
* Copyright (C) 2009 Scalable Solutions.
*/
package se.scalablesolutions.akka.kernel
import akka.kernel.config.ActiveObjectGuiceConfigurator
import annotation.oneway
import kernel.config.ScalaConfig._
import com.google.inject.{AbstractModule, Scopes}
import com.jteigen.scalatest.JUnit4Runner
import org.junit.runner.RunWith
import org.scalatest._
import org.scalatest.matchers._
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.camel.CamelContext
import org.apache.camel.Endpoint
import org.apache.camel.Exchange
import org.apache.camel.Processor
import org.apache.camel.Producer
import org.apache.camel.builder.RouteBuilder
import org.apache.camel.impl.DefaultCamelContext
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
@RunWith(classOf[JUnit4Runner])
class CamelSpec extends Spec with ShouldMatchers {
describe("A Camel routing scheme") {
it("should route message from actor A to actor B") {
val latch = new CountDownLatch(1);
val conf = new ActiveObjectGuiceConfigurator
conf.configureActiveObjects(
RestartStrategy(AllForOne, 3, 5000),
Component(
"camelfoo",
classOf[CamelFoo],
classOf[CamelFooImpl],
LifeCycle(Permanent, 1000),
1000) ::
Component(
"camelbar",
classOf[CamelBar],
classOf[CamelBarImpl],
LifeCycle(Permanent, 1000),
1000) ::
Nil
).addRoutes(new RouteBuilder() {
def configure = {
from("akka:camelfoo.foo").to("akka:camelbar.bar")
from("akka:camelbar.bar").process(new Processor() {
def process(e: Exchange) = {
println("Received exchange: " + e.getIn())
latch.countDown
}
})
}
}).supervise
val endpoint = conf.getRoutingEndpoint("akka:camelfoo.foo")
// println("----- " + endpoint)
// val exchange = endpoint.createExchange
// println("----- " + exchange)
conf.getActiveObject(classOf[CamelFooImpl].getName).asInstanceOf[CamelFoo].foo("Hello Foo")
//
// exchange.getIn().setHeader("cheese", 123)
// exchange.getIn().setBody("body")
//
// val producer = endpoint.createProducer
// println("----- " + producer)
//
// producer.process(exchange)
//
// // now lets sleep for a while
// val received = latch.await(5, TimeUnit.SECONDS)
// received should equal (true)
//
// conf.stop
}
}
}
trait CamelFoo {
@oneway def foo(msg: String)
}
trait CamelBar {
def bar(msg: String): String
}
class CamelFooImpl extends CamelFoo {
def foo(msg: String) = println("CamelFoo.foo:" + msg)
}
class CamelBarImpl extends CamelBar {
def bar(msg: String) = msg + "return_bar "
}

Binary file not shown.

BIN
lib/cassandra-0.3.0-dev.jar Normal file

Binary file not shown.

9
util-java/.classpath Normal file
View file

@ -0,0 +1,9 @@
<?xml version="1.0" encoding="UTF-8"?>
<classpath>
<classpathentry kind="src" path="src/main/java"/>
<classpathentry kind="lib" path="target/akka-util-java-0.1.jar"/>
<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER"/>
<classpathentry kind="lib" path="/Users/jboner/src/scala/akka/lib/guice-core-2.0-SNAPSHOT.jar"/>
<classpathentry kind="lib" path="/Users/jboner/src/scala/akka/lib/guice-jsr250-2.0-SNAPSHOT.jar"/>
<classpathentry kind="output" path="target/classes"/>
</classpath>

17
util-java/.project Normal file
View file

@ -0,0 +1,17 @@
<?xml version="1.0" encoding="UTF-8"?>
<projectDescription>
<name>util-java</name>
<comment></comment>
<projects>
</projects>
<buildSpec>
<buildCommand>
<name>org.eclipse.jdt.core.javabuilder</name>
<arguments>
</arguments>
</buildCommand>
</buildSpec>
<natures>
<nature>org.eclipse.jdt.core.javanature</nature>
</natures>
</projectDescription>

View file

@ -0,0 +1,21 @@
<?xml version="1.0" encoding="UTF-8"?>
<module relativePaths="true" MavenProjectsManager.isMavenModule="true" type="JAVA_MODULE" version="4">
<component name="NewModuleRootManager" LANGUAGE_LEVEL="JDK_1_5" inherit-compiler-output="false">
<output url="file://$MODULE_DIR$/target/classes" />
<output-test url="file://$MODULE_DIR$/target/test-classes" />
<content url="file://$MODULE_DIR$">
<sourceFolder url="file://$MODULE_DIR$/src/main/java" isTestSource="false" />
<excludeFolder url="file://$MODULE_DIR$/target" />
</content>
<orderEntry type="inheritedJdk" />
<orderEntry type="sourceFolder" forTests="false" />
<orderEntry type="library" exported="" name="Maven: org.guiceyfruit:guice-core:2.0-SNAPSHOT" level="project" />
<orderEntry type="library" exported="" name="Maven: com.google.code.google-collections:google-collect:snapshot-20080530" level="project" />
<orderEntry type="library" exported="" name="Maven: cglib:cglib:2.2" level="project" />
<orderEntry type="library" exported="" name="Maven: asm:asm:3.1" level="project" />
<orderEntry type="library" exported="" name="Maven: aopalliance:aopalliance:1.0" level="project" />
<orderEntry type="library" exported="" name="Maven: org.guiceyfruit:guice-jsr250:2.0-SNAPSHOT" level="project" />
<orderEntry type="library" exported="" name="Maven: javax.annotation:jsr250-api:1.0" level="project" />
</component>
</module>

View file

@ -0,0 +1,30 @@
/**
* Copyright (C) 2009 Scalable Solutions.
*/
package se.scalablesolutions.akka.kernel.config;
import java.util.List;
import com.google.inject.AbstractModule;
import com.google.inject.Singleton;
import com.google.inject.jsr250.ResourceProviderFactory;
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
public class ActiveObjectGuiceModule extends AbstractModule {
private final List<DependencyBinding> bindings;
public ActiveObjectGuiceModule(final List<DependencyBinding> bindings) {
this.bindings = bindings;
}
protected void configure() {
bind(ResourceProviderFactory.class);
for (int i = 0; i < bindings.size(); i++) {
final DependencyBinding db = bindings.get(i);
bind((Class) db.getInterface()).to((Class) db.getTarget()).in(Singleton.class);
}
}
}

View file

@ -0,0 +1,24 @@
/**
* Copyright (C) 2009 Scalable Solutions.
*/
package se.scalablesolutions.akka.kernel.config;
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
public class DependencyBinding {
private final Class intf;
private final Class target;
public DependencyBinding(final Class intf, final Class target) {
this.intf = intf;
this.target = target;
}
public Class getInterface() {
return intf;
}
public Class getTarget() {
return target;
}
}