Merge branch 'master' of git@github.com:jboner/akka

This commit is contained in:
Viktor Klang 2009-12-17 21:50:52 +01:00
commit 3ac57d67e0
28 changed files with 27 additions and 423 deletions

View file

@ -70,19 +70,19 @@
<!-- For Protocol/Serialization -->
<dependency>
<groupId>org.h2.compress</groupId>
<groupId>voldemort.store.compress</groupId>
<artifactId>h2-lzf</artifactId>
<version>1.0</version>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-core-asl</artifactId>
<version>1.1.0</version>
<version>1.2.1</version>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-mapper-asl</artifactId>
<version>1.1.0</version>
<version>1.2.1</version>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>

View file

@ -1,48 +0,0 @@
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>akka-camel</artifactId>
<name>Akka Camel Module</name>
<packaging>jar</packaging>
<parent>
<artifactId>akka</artifactId>
<groupId>se.scalablesolutions.akka</groupId>
<version>0.6</version>
<relativePath>../pom.xml</relativePath>
</parent>
<!-- Core deps -->
<dependencies>
<dependency>
<artifactId>akka-util</artifactId>
<groupId>${project.groupId}</groupId>
<version>${project.version}</version>
</dependency>
<dependency>
<artifactId>akka-actors</artifactId>
<groupId>${project.groupId}</groupId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-core</artifactId>
<version>2.0-SNAPSHOT</version>
</dependency>
</dependencies>
<build>
<resources>
<resource>
<filtering>false</filtering>
<directory>src/main/resources</directory>
<includes>
<include>META-INF/*</include>
</includes>
</resource>
</resources>
</build>
</project>

View file

@ -1 +0,0 @@
class=se.scalablesolutions.akka.kernel.camel.ActiveObjectComponent

View file

@ -1,23 +0,0 @@
/**
* Copyright (C) 2009 Scalable Solutions.
*/
package se.scalablesolutions.akka.camel
import config.ActiveObjectConfigurator
import java.util.Map
import java.util.concurrent.{BlockingQueue, LinkedBlockingQueue}
import org.apache.camel.{Endpoint, Exchange}
import org.apache.camel.impl.DefaultComponent
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class ActiveObjectComponent(val conf: ActiveObjectConfigurator) extends DefaultComponent {
override def createEndpoint(uri: String, remaining: String, parameters: Map[_,_]): Endpoint = {
//val consumers = getAndRemoveParameter(parameters, "concurrentConsumers", classOf[Int], 1)
new ActiveObjectEndpoint(uri, this, conf)
}
}

View file

@ -1,36 +0,0 @@
/**
* Copyright (C) 2009 Scalable Solutions.
*/
package se.scalablesolutions.akka.camel
import java.util.concurrent.{BlockingQueue, ExecutorService, Executors, ThreadFactory, TimeUnit}
import util.Logging
import org.apache.camel.{AsyncCallback, AsyncProcessor, Consumer, Exchange, Processor}
import org.apache.camel.impl.ServiceSupport
import org.apache.camel.impl.converter.AsyncProcessorTypeConverter
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class ActiveObjectConsumer(
val endpoint: ActiveObjectEndpoint,
proc: Processor,
val activeObject: AnyRef)
extends ServiceSupport with Consumer with Runnable with Logging {
val processor = AsyncProcessorTypeConverter.convert(proc)
println("------- creating consumer for: "+ endpoint.uri)
override def run = {
}
def doStart() = {
}
def doStop() = {
}
override def toString(): String = "ActiveObjectConsumer [" + endpoint.getEndpointUri + "]"
}

View file

@ -1,48 +0,0 @@
/**
* Copyright (C) 2009 Scalable Solutions.
*/
package se.scalablesolutions.akka.camel
import config.ActiveObjectConfigurator
import util.Logging
import java.util.{ArrayList, HashSet, List, Set}
import java.util.concurrent.{BlockingQueue, CopyOnWriteArraySet, LinkedBlockingQueue}
import org.apache.camel.{Component, Consumer, Exchange, Processor, Producer}
import org.apache.camel.impl.{DefaultEndpoint, DefaultComponent};
import org.apache.camel.spi.BrowsableEndpoint;
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class ActiveObjectEndpoint(val uri: String, val component: DefaultComponent, val conf: ActiveObjectConfigurator) // FIXME: need abstraction trait here
extends DefaultEndpoint(uri) with BrowsableEndpoint with Logging {
val firstSep = uri.indexOf(':')
val lastSep = uri.lastIndexOf( '.')
val scheme = uri.substring(0, firstSep)
val activeObjectName = uri.substring(uri.indexOf(':') + 1, lastSep)
val activeObjectClass = Thread.currentThread.getContextClassLoader.loadClass(activeObjectName)
val methodName = uri.substring(lastSep + 1, uri.length)
val activeObject = conf.getInstance(activeObjectClass).asInstanceOf[MessageDriven]
// val activeObjectProxy = conf.getInstanceProxy(activeObjectName)
// val genericServer = supervisor.getServerOrElse(
// activeObjectName,
// throw new IllegalArgumentException("Can't find active object with name [" + activeObjectName + "] and method [" + methodName + "]"))
log.debug("Creating Camel Endpoint for scheme [%s] and component [%s]", scheme, activeObjectName)
private var queue: BlockingQueue[Exchange] = new LinkedBlockingQueue[Exchange](1000)
override def createProducer: Producer = new ActiveObjectProducer(this, activeObject)
override def createConsumer(processor: Processor): Consumer = new ActiveObjectConsumer(this, processor, activeObject)
override def getExchanges: List[Exchange] = new ArrayList[Exchange](queue)
override def isSingleton = true
}

View file

@ -1,42 +0,0 @@
/**
* Copyright (C) 2009 Scalable Solutions.
*/
package se.scalablesolutions.akka.camel
import java.util.Collection
import util.Logging;
import java.util.concurrent.BlockingQueue;
import org.apache.camel.{Exchange, AsyncProcessor, AsyncCallback}
import org.apache.camel.impl.DefaultProducer
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class ActiveObjectProducer(
val endpoint: ActiveObjectEndpoint,
val activeObject: MessageDriven)
extends DefaultProducer(endpoint) with AsyncProcessor with Logging {
private val actorName = endpoint.activeObjectName
def process(exchange: Exchange) = activeObject.onMessage(exchange) // FIXME: should we not invoke the generic server here?
def process(exchange: Exchange, callback: AsyncCallback): Boolean = {
val copy = exchange.copy
copy.setProperty("CamelAsyncCallback", callback)
activeObject.onMessage(copy)
callback.done(true)
true
}
override def doStart = {
super.doStart
}
override def doStop = {
super.doStop
}
override def toString(): String = "ActiveObjectProducer [" + endpoint.getEndpointUri + "]"
}

View file

@ -1,35 +0,0 @@
/**
* Copyright (C) 2009 Scalable Solutions.
*/
package se.scalablesolutions.akka.config
import org.apache.camel.{Routes, CamelContext, Endpoint}
trait CamelConfigurator {
/**
* Add Camel routes for the active objects.
* <pre>
* activeObjectConfigurator.addRoutes(new RouteBuilder() {
* def configure = {
* from("akka:actor1").to("akka:actor2")
* from("akka:actor2").process(new Processor() {
* def process(e: Exchange) = {
* println("Received exchange: " + e.getIn())
* }
* })
* }
* }).inject().supervise();
* </pre>
*/
def addRoutes(routes: Routes): ActiveObjectConfiguratorBase
def getCamelContext: CamelContext
def getRoutingEndpoint(uri: String): Endpoint
def getRoutingEndpoints: java.util.Collection[Endpoint]
def getRoutingEndpoints(uri: String): java.util.Collection[Endpoint]
}

View file

@ -1,14 +0,0 @@
/**
* Copyright (C) 2009 Scalable Solutions.
*/
package se.scalablesolutions.akka.camel
import org.apache.camel.Exchange
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
trait MessageDriven {
def onMessage(exchange: Exchange)
}

View file

@ -1,16 +0,0 @@
/**
* Copyright (C) 2009 Scalable Solutions.
*/
package se.scalablesolutions.akka.camel
import actor.Supervisor
import util.Logging
import org.apache.camel.impl.{DefaultCamelContext, DefaultEndpoint, DefaultComponent}
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class SupervisorAwareCamelContext extends DefaultCamelContext with Logging {
var supervisor: Supervisor = _
}

View file

@ -1,101 +0,0 @@
/**
* Copyright (C) 2009 Scalable Solutions.
*/
package se.scalablesolutions.akka.camel
/*
import config.ActiveObjectGuiceConfigurator
import annotation.oneway
import config.ScalaConfig._
import com.google.inject.{AbstractModule, Scopes}
//import com.jteigen.scalatest.JUnit4Runner
import org.apache.camel.component.bean.ProxyHelper
import org.junit.runner.RunWith
import org.scalatest._
import org.scalatest.matchers._
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.camel.CamelContext
import org.apache.camel.Endpoint
import org.apache.camel.Exchange
import org.apache.camel.Processor
import org.apache.camel.Producer
import org.apache.camel.builder.RouteBuilder
import org.apache.camel.impl.DefaultCamelContext
// REQUIRES: -Djava.naming.factory.initial=org.apache.camel.util.jndi.CamelInitialContextFactory
*/
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*
//@RunWith(classOf[JUnit4Runner])
class CamelSpec extends Spec with ShouldMatchers {
describe("A Camel routing scheme") {
it("should route message from direct:test to actor A using @Bean endpoint") {
val latch = new CountDownLatch(1);
val conf = new ActiveObjectGuiceConfigurator
conf.configure(
RestartStrategy(AllForOne, 3, 5000),
Component(
"camelfoo",
classOf[CamelFoo],
classOf[CamelFooImpl],
LifeCycle(Permanent),
1000) ::
Nil
).addRoutes(new RouteBuilder() {
def configure = {
from("direct:test").to("bean:camelfoo").process(new Processor() {
def process(e: Exchange) = {
println("Received exchange: " + e.getIn())
latch.countDown
}
})
}}
).supervise
val endpoint = conf.getRoutingEndpoint("direct:test")
val proxy = ProxyHelper.createProxy(endpoint, classOf[CamelFoo])
proxy.foo("hello there")
val exchange = endpoint.createExchange
println("----- " + exchange)
exchange.getIn().setBody("hello there")
val producer = endpoint.createProducer
println("----- " + producer)
producer.process(exchange)
// now lets sleep for a while
val received = latch.await(5, TimeUnit.SECONDS)
received should equal (true)
conf.stop
}
}
}
trait CamelFoo {
@oneway def foo(msg: String)
}
trait CamelBar {
def bar(msg: String): String
}
class CamelFooImpl extends CamelFoo {
def foo(msg: String) = println("CamelFoo.foo:" + msg)
}
class CamelBarImpl extends CamelBar {
def bar(msg: String) = msg + "return_bar "
}
*/

View file

@ -37,11 +37,6 @@
<groupId>${project.groupId}</groupId>
<version>${project.version}</version>
</dependency>
<dependency>
<artifactId>akka-camel</artifactId>
<groupId>${project.groupId}</groupId>
<version>${project.version}</version>
</dependency>
<dependency>
<artifactId>akka-security</artifactId>
<groupId>${project.groupId}</groupId>

View file

@ -28,9 +28,9 @@
<!-- For Mongo -->
<dependency>
<groupId>com.mongodb</groupId>
<artifactId>mongo</artifactId>
<version>1.0</version>
<groupId>org.mongodb</groupId>
<artifactId>mongo-java-driver</artifactId>
<version>1.1</version>
</dependency>
<!-- For Cassandra -->

View file

@ -1,8 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>high-scale-lib</groupId>
<artifactId>high-scale-lib</artifactId>
<version>1.0</version>
<packaging>jar</packaging>
</project>

View file

@ -1,8 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.apache.camel</groupId>
<artifactId>camel-core</artifactId>
<version>2.0-SNAPSHOT</version>
<packaging>jar</packaging>
</project>

View file

@ -1,8 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.apache</groupId>
<artifactId>zookeeper</artifactId>
<version>3.1.0</version>
<packaging>jar</packaging>
</project>

View file

@ -1,8 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-core-asl</artifactId>
<version>1.1.0</version>
<packaging>jar</packaging>
</project>

View file

@ -1,8 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-mapper-asl</artifactId>
<version>1.1.0</version>
<packaging>jar</packaging>
</project>

View file

@ -1,7 +1,7 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.h2.compress</groupId>
<groupId>voldemort.store.compress</groupId>
<artifactId>h2-lzf</artifactId>
<version>1.0</version>
<packaging>jar</packaging>

27
pom.xml
View file

@ -13,12 +13,26 @@
<packaging>pom</packaging>
<description>
Akka implements a unique hybrid of the Actor model and Software Transactional Memory (STM).
Akka gives you you:
* Concurrency (high-level and simple).
* Asynchronous, non-blocking, event-driven and highly performant components.
* Scalability through very performant remote actors.
* Fault-tolerance through supervision hierarchies with “let-it-crash” semantics.
Akka implements a unique hybrid of:
* Actors , which gives you:
* Simple and high-level abstractions for concurrency and parallelism.
* Asynchronous, non-blocking and highly performant event-driven programming model.
* Very lightweight event-driven processes (create ~6.5 million actors on 4 G RAM).
* Supervision hierarchies with let-it-crash semantics. For writing highly fault-tolerant systems that never stops, systems that self-heals.
* Software Transactional Memory (STM). (Distributed transactions coming soon).
* Transactors: combine actors and STM into transactional actors. Allows you to compose atomic message flows with automatic rollback and retry.
* Remoting: highly performant distributed actors with remote supervision and error management.
* Cluster membership management.
Akka also has a set of add-on modules:
* Persistence: A set of pluggable back-end storage modules that works in sync with the STM.
* Cassandra distributed and highly scalable database.
* MongoDB document database.
* Redis data structures database (upcoming)
* REST (JAX-RS): Expose actors as REST services.
* Comet: Expose actors as Comet services.
* Security: Digest and Kerberos based security.
* Microkernel: Run Akka as a stand-alone kernel.
</description>
<properties>
@ -39,7 +53,6 @@
<module>akka-actors</module>
<module>akka-persistence</module>
<module>akka-rest</module>
<module>akka-camel</module>
<module>akka-amqp</module>
<module>akka-security</module>
<module>akka-kernel</module>