3:d iteration of modularization (all but fun tests done)

This commit is contained in:
jboner 2009-09-03 11:02:21 +02:00
parent 91ad702e52
commit 6015b09bec
33 changed files with 388 additions and 164 deletions

View file

@ -1,7 +1,7 @@
<!DOCTYPE aspectwerkz PUBLIC "-//AspectWerkz//DTD//EN" "http://aspectwerkz.codehaus.org/dtd/aspectwerkz2.dtd">
<aspectwerkz>
<system id="akka">
<package name="se.scalablesolutions.akka.kernel.actor">
<package name="se.scalablesolutions.akka.actor">
<aspect class="ActiveObjectAspect" />
</package>
</system>

1
akka-actors/src/test/scala/AllSuite.scala Executable file → Normal file
View file

@ -12,7 +12,6 @@ import org.scalatest._
class AllSuite extends SuperSuite(
List(
new SupervisorSpec
// new ActiveObjectSpec,
// new RestManagerSpec
)

View file

@ -4,7 +4,7 @@ import junit.framework.Test
import junit.framework.TestCase
import junit.framework.TestSuite
import actor.{ActorSpec, RemoteActorSpec, PersistentActorSpec, InMemoryActorSpec}
import actor.{ActorSpec, RemoteActorSpec, InMemoryActorSpec, SupervisorSpec, RemoteSupervisorSpec}
import reactor.{EventBasedSingleThreadDispatcherTest, EventBasedThreadPoolDispatcherTest}
import util.SchedulerSpec
@ -17,10 +17,8 @@ object AllTest extends TestCase {
suite.addTestSuite(classOf[EventBasedThreadPoolDispatcherTest])
suite.addTestSuite(classOf[ActorSpec])
suite.addTestSuite(classOf[RemoteActorSpec])
//suite.addTestSuite(classOf[PersistentActorSpec])
suite.addTestSuite(classOf[InMemoryActorSpec])
suite.addTestSuite(classOf[SchedulerSpec])
//suite.addTestSuite(classOf[TransactionClasherSpec])
suite
}

View file

@ -2,8 +2,9 @@
* Copyright (C) 2009 Scalable Solutions.
*/
package se.scalablesolutions.akka
package se.scalablesolutions.akka.camel
/*
import config.ActiveObjectGuiceConfigurator
import annotation.oneway
import config.ScalaConfig._
@ -28,16 +29,16 @@ import org.apache.camel.builder.RouteBuilder
import org.apache.camel.impl.DefaultCamelContext
// REQUIRES: -Djava.naming.factory.initial=org.apache.camel.util.jndi.CamelInitialContextFactory
*/
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
*
//@RunWith(classOf[JUnit4Runner])
class CamelSpec extends Spec with ShouldMatchers {
describe("A Camel routing scheme") {
it("should route message from direct:test to actor A using @Bean endpoint") {
/*
val latch = new CountDownLatch(1);
val conf = new ActiveObjectGuiceConfigurator
@ -80,7 +81,6 @@ class CamelSpec extends Spec with ShouldMatchers {
val received = latch.await(5, TimeUnit.SECONDS)
received should equal (true)
conf.stop
*/
}
}
}
@ -98,3 +98,4 @@ class CamelFooImpl extends CamelFoo {
class CamelBarImpl extends CamelBar {
def bar(msg: String) = msg + "return_bar "
}
*/

View file

@ -26,9 +26,9 @@ case class FailureOneWay(key: String, value: String, failer: Actor)
class InMemStatefulActor extends Actor {
timeout = 100000
makeTransactionRequired
private val mapState = TransactionalState.newInMemoryMap[String, String]
private val vectorState = TransactionalState.newInMemoryVector[String]
private val refState = TransactionalState.newInMemoryRef[String]
private val mapState = TransactionalState.newMap[String, String]
private val vectorState = TransactionalState.newVector[String]
private val refState = TransactionalState.newRef[String]
def receive: PartialFunction[Any, Unit] = {
case GetMapState(key) =>

View file

@ -2,6 +2,7 @@ package se.scalablesolutions.akka.actor
import java.util.concurrent.TimeUnit
import junit.framework.TestCase
import nio.{RemoteServer, RemoteClient}
import org.junit.{Test, Before}
import org.junit.Assert._
@ -26,7 +27,7 @@ class RemoteActorSpecActorBidirectional extends Actor {
}
class RemoteActorSpec extends TestCase {
kernel.Kernel.config
akka.Config.config
new Thread(new Runnable() {
def run = {
val server = new RemoteServer

View file

@ -2,11 +2,10 @@
* Copyright (C) 2009 Scalable Solutions.
*/
package se.scalablesolutions.akka
package se.scalablesolutions.akka.actor
import akka.serialization.BinaryString
import nio.{RemoteClient, RemoteServer}
import actor.{Supervisor, SupervisorFactory, Actor, StartSupervisor}
import config.ScalaConfig._
//import com.jteigen.scalatest.JUnit4Runner
@ -23,7 +22,7 @@ object Log {
//@RunWith(classOf[JUnit4Runner])
class RemoteSupervisorSpec extends junit.framework.TestCase with Suite {
Kernel.config
akka.Config.config
new Thread(new Runnable() {
def run = {
val server = new RemoteServer
@ -248,43 +247,43 @@ class RemoteSupervisorSpec extends junit.framework.TestCase with Suite {
/*
def testOneWayKillSingleActorOneForOne = {
Log.messageLog = ""
Logg.messageLog = ""
val sup = getSingleActorOneForOneSupervisor
sup ! StartSupervisor
Thread.sleep(500)
pingpong1 ! BinaryString("Die")
Thread.sleep(500)
expect("DIE") {
Log.messageLog
Logg.messageLog
}
}
def testOneWayCallKillCallSingleActorOneForOne = {
Log.messageLog = ""
Logg.messageLog = ""
val sup = getSingleActorOneForOneSupervisor
sup ! StartSupervisor
Thread.sleep(500)
pingpong1 ! OneWay
Thread.sleep(500)
expect("oneway") {
Log.oneWayLog
Logg.oneWayLog
}
pingpong1 ! BinaryString("Die")
Thread.sleep(500)
expect("DIE") {
Log.messageLog
Logg.messageLog
}
pingpong1 ! OneWay
Thread.sleep(500)
expect("onewayoneway") {
Log.oneWayLog
Logg.oneWayLog
}
}
*/
/*
def testOneWayKillSingleActorAllForOne = {
Log.messageLog = ""
Logg.messageLog = ""
val sup = getSingleActorAllForOneSupervisor
sup ! StartSupervisor
Thread.sleep(500)
@ -293,12 +292,12 @@ class RemoteSupervisorSpec extends junit.framework.TestCase with Suite {
}
Thread.sleep(500)
expect("DIE") {
Log.messageLog
Logg.messageLog
}
}
def testOneWayCallKillCallSingleActorAllForOne = {
Log.messageLog = ""
Logg.messageLog = ""
val sup = getSingleActorAllForOneSupervisor
sup ! StartSupervisor
Thread.sleep(500)
@ -307,26 +306,26 @@ class RemoteSupervisorSpec extends junit.framework.TestCase with Suite {
}
Thread.sleep(500)
expect("ping") {
Log.messageLog
Logg.messageLog
}
intercept(classOf[RuntimeException]) {
pingpong1 ! BinaryString("Die")
}
Thread.sleep(500)
expect("pingDIE") {
Log.messageLog
Logg.messageLog
}
expect("pong") {
(pingpong1 ! BinaryString("Ping")).getOrElse("nil")
}
Thread.sleep(500)
expect("pingDIEping") {
Log.messageLog
Logg.messageLog
}
}
def testOneWayKillMultipleActorsOneForOne = {
Log.messageLog = ""
Logg.messageLog = ""
val sup = getMultipleActorsOneForOneConf
sup ! StartSupervisor
Thread.sleep(500)
@ -335,12 +334,12 @@ class RemoteSupervisorSpec extends junit.framework.TestCase with Suite {
}
Thread.sleep(500)
expect("DIE") {
Log.messageLog
Logg.messageLog
}
}
def tesOneWayCallKillCallMultipleActorsOneForOne = {
Log.messageLog = ""
Logg.messageLog = ""
val sup = getMultipleActorsOneForOneConf
sup ! StartSupervisor
Thread.sleep(500)
@ -357,14 +356,14 @@ class RemoteSupervisorSpec extends junit.framework.TestCase with Suite {
}
Thread.sleep(500)
expect("pingpingping") {
Log.messageLog
Logg.messageLog
}
intercept(classOf[RuntimeException]) {
pingpong2 ! BinaryString("Die")
}
Thread.sleep(500)
expect("pingpingpingDIE") {
Log.messageLog
Logg.messageLog
}
expect("pong") {
(pingpong1 ! BinaryString("Ping")).getOrElse("nil")
@ -379,12 +378,12 @@ class RemoteSupervisorSpec extends junit.framework.TestCase with Suite {
}
Thread.sleep(500)
expect("pingpingpingDIEpingpingping") {
Log.messageLog
Logg.messageLog
}
}
def testOneWayKillMultipleActorsAllForOne = {
Log.messageLog = ""
Logg.messageLog = ""
val sup = getMultipleActorsAllForOneConf
sup ! StartSupervisor
Thread.sleep(500)
@ -393,12 +392,12 @@ class RemoteSupervisorSpec extends junit.framework.TestCase with Suite {
}
Thread.sleep(500)
expect("DIEDIEDIE") {
Log.messageLog
Logg.messageLog
}
}
def tesOneWayCallKillCallMultipleActorsAllForOne = {
Log.messageLog = ""
Logg.messageLog = ""
val sup = getMultipleActorsAllForOneConf
sup ! StartSupervisor
Thread.sleep(500)
@ -415,14 +414,14 @@ class RemoteSupervisorSpec extends junit.framework.TestCase with Suite {
}
Thread.sleep(500)
expect("pingpingping") {
Log.messageLog
Logg.messageLog
}
intercept(classOf[RuntimeException]) {
pingpong2 ! BinaryString("Die")
}
Thread.sleep(500)
expect("pingpingpingDIEDIEDIE") {
Log.messageLog
Logg.messageLog
}
expect("pong") {
(pingpong1 ! BinaryString("Ping")).getOrElse("nil")
@ -437,14 +436,14 @@ class RemoteSupervisorSpec extends junit.framework.TestCase with Suite {
}
Thread.sleep(500)
expect("pingpingpingDIEDIEDIEpingpingping") {
Log.messageLog
Logg.messageLog
}
}
*/
/*
def testNestedSupervisorsTerminateFirstLevelActorAllForOne = {
Log.messageLog = ""
Logg.messageLog = ""
val sup = getNestedSupervisorsAllForOneConf
sup ! StartSupervisor
intercept(classOf[RuntimeException]) {
@ -452,7 +451,7 @@ class RemoteSupervisorSpec extends junit.framework.TestCase with Suite {
}
Thread.sleep(500)
expect("DIEDIEDIE") {
Log.messageLog
Logg.messageLog
}
}
*/

View file

@ -1,6 +1,6 @@
package se.scalablesolutions.akka.util
import se.scalablesolutions.akka.kernel.actor.Actor
import se.scalablesolutions.akka.actor.Actor
import java.util.concurrent.TimeUnit

View file

@ -4,7 +4,6 @@
package se.scalablesolutions.akka.actor
import actor.{Supervisor, SupervisorFactory, Actor, StartSupervisor}
import config.ScalaConfig._
//import com.jteigen.scalatest.JUnit4Runner

View file

@ -8,10 +8,6 @@ import org.junit.Assert._
import state.TransactionalState
object Log {
var log = ""
}
class TxActor(clasher: Actor) extends Actor {
timeout = 1000000
makeTransactionRequired
@ -24,7 +20,7 @@ class TxActor(clasher: Actor) extends Actor {
}
class TxClasherActor extends Actor {
val vector = TransactionalState.newInMemoryVector[String]
val vector = TransactionalState.newVector[String]
timeout = 1000000
makeTransactionRequired
var count = 0
@ -59,7 +55,7 @@ class TxActorOneWay(clasher: Actor) extends Actor {
}
class TxClasherActorOneWay extends Actor {
val vector = TransactionalState.newInMemoryVector[String]
val vector = TransactionalState.newVector[String]
timeout = 1000000
makeTransactionRequired
var count = 0

View file

@ -2,15 +2,13 @@
* Copyright (C) 2009 Scalable Solutions.
*/
package com.scalablesolutions.akka.amqp
package se.scalablesolutions.akka.amqp
import java.lang.String
import rabbitmq.client.{AMQP => RabbitMQ, _}
import rabbitmq.client.ConnectionFactory
import com.rabbitmq.client.{AMQP => RabbitMQ, _}
import com.rabbitmq.client.ConnectionFactory
import se.scalablesolutions.akka.kernel.Kernel
import se.scalablesolutions.akka.kernel.actor.Actor
import se.scalablesolutions.akka.serialization.Serializer
import actor.Actor
import serialization.Serializer
import java.util.{Timer, TimerTask}

View file

@ -15,13 +15,225 @@
<relativePath>../pom.xml</relativePath>
</parent>
<!-- Core deps -->
<!-- akka deps -->
<dependencies>
<dependency>
<artifactId>akka-actors</artifactId>
<groupId>se.scalablesolutions.akka</groupId>
<version>0.6</version>
</dependency>
<dependency>
<artifactId>akka-persistence</artifactId>
<groupId>se.scalablesolutions.akka</groupId>
<version>0.6</version>
</dependency>
<dependency>
<artifactId>akka-rest</artifactId>
<groupId>se.scalablesolutions.akka</groupId>
<version>0.6</version>
</dependency>
<dependency>
<artifactId>akka-amqp</artifactId>
<groupId>se.scalablesolutions.akka</groupId>
<version>0.6</version>
</dependency>
<dependency>
<artifactId>akka-camel</artifactId>
<groupId>se.scalablesolutions.akka</groupId>
<version>0.6</version>
</dependency>
<dependency>
<artifactId>akka-util-java</artifactId>
<groupId>se.scalablesolutions.akka</groupId>
<version>0.6</version>
</dependency>
<dependency>
<artifactId>akka-util</artifactId>
<groupId>se.scalablesolutions.akka</groupId>
<version>0.6</version>
</dependency>
<!-- Core deps -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.7.5</version>
</dependency>
<dependency>
<groupId>org.codehaus.aspectwerkz</groupId>
<artifactId>aspectwerkz-nodeps-jdk5</artifactId>
<version>2.1</version>
</dependency>
<dependency>
<groupId>org.codehaus.aspectwerkz</groupId>
<artifactId>aspectwerkz-jdk5</artifactId>
<version>2.1</version>
</dependency>
<dependency>
<groupId>net.lag</groupId>
<artifactId>configgy</artifactId>
<version>1.3</version>
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-core</artifactId>
<version>2.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.guiceyfruit</groupId>
<artifactId>guice-core</artifactId>
<version>2.0-beta-4</version>
</dependency>
<dependency>
<groupId>org.jboss.netty</groupId>
<artifactId>netty</artifactId>
<version>3.1.0.GA</version>
</dependency>
<dependency>
<groupId>org.scala-tools</groupId>
<artifactId>javautils</artifactId>
<version>2.7.4-0.1</version>
</dependency>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>rabbitmq-client</artifactId>
<version>0.9.1</version>
</dependency>
<!-- For Protocol/Serialization -->
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-core-asl</artifactId>
<version>1.1.0</version>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-mapper-asl</artifactId>
<version>1.1.0</version>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>2.1.0</version>
</dependency>
<dependency>
<groupId>sbinary</groupId>
<artifactId>sbinary</artifactId>
<version>0.3</version>
</dependency>
<dependency>
<groupId>com.twitter</groupId>
<artifactId>scala-json</artifactId>
<version>1.0</version>
</dependency>
<dependency>
<groupId>dispatch.json</groupId>
<artifactId>dispatch-json</artifactId>
<version>0.5.2</version>
</dependency>
<dependency>
<groupId>dispatch.http</groupId>
<artifactId>dispatch-http</artifactId>
<version>0.5.2</version>
</dependency>
<dependency>
<groupId>sjson.json</groupId>
<artifactId>sjson</artifactId>
<version>0.1</version>
</dependency>
<!-- For Mongo -->
<dependency>
<groupId>com.mongodb</groupId>
<artifactId>mongo</artifactId>
<version>0.6</version>
</dependency>
<!-- For Cassandra -->
<dependency>
<groupId>org.apache.cassandra</groupId>
<artifactId>cassandra</artifactId>
<version>0.4.0-trunk</version>
</dependency>
<dependency>
<groupId>com.facebook</groupId>
<artifactId>thrift</artifactId>
<version>1.0</version>
</dependency>
<dependency>
<groupId>com.facebook</groupId>
<artifactId>fb303</artifactId>
<version>1.0</version>
</dependency>
<dependency>
<groupId>commons-pool</groupId>
<artifactId>commons-pool</artifactId>
<version>1.5.1</version>
</dependency>
<!-- For third-party logging -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.4.3</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.4.3</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.13</version>
</dependency>
<dependency>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
<version>1.0.4</version>
</dependency>
<!-- For Jersey & Atmosphere -->
<dependency>
<groupId>com.sun.grizzly</groupId>
<artifactId>grizzly-comet-webserver</artifactId>
<version>1.8.6.3</version>
</dependency>
<dependency>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-server</artifactId>
<version>1.1.1-ea</version>
</dependency>
<dependency>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-json</artifactId>
<version>1.1.1-ea</version>
</dependency>
<dependency>
<groupId>javax.ws.rs</groupId>
<artifactId>jsr311-api</artifactId>
<version>1.0</version>
</dependency>
<dependency>
<groupId>com.sun.jersey.contribs</groupId>
<artifactId>jersey-scala</artifactId>
<version>1.1.2-ea-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.atmosphere</groupId>
<artifactId>atmosphere-core</artifactId>
<version>0.3</version>
</dependency>
<dependency>
<groupId>org.atmosphere</groupId>
<artifactId>atmosphere-portable-runtime</artifactId>
<version>0.3</version>
</dependency>
<dependency>
<groupId>org.atmosphere</groupId>
<artifactId>atmosphere-compat</artifactId>
<version>0.3</version>
</dependency>
</dependencies>
<build>

View file

@ -32,7 +32,7 @@ class AkkaServlet extends ServletContainer with AtmosphereServletProcessor with
override def initiate(rc: ResourceConfig, wa: WebApplication) = {
akka.Kernel.boot // will boot if not already booted by 'main'
val configurators = ConfiguratorRepository.getConfiguratorsFor(getServletContext)
val configurators = ConfiguratorRepository.getConfigurators
rc.getClasses.addAll(configurators.flatMap(_.getComponentInterfaces))
log.info("Starting AkkaServlet with ResourceFilters: " + rc.getProperty("com.sun.jersey.spi.container.ResourceFilters"));
@ -59,7 +59,7 @@ class AkkaServlet extends ServletContainer with AtmosphereServletProcessor with
event.getResponse.getWriter.write(data)
event.getResponse.getWriter.flush
}
} else log.info("Null event message :/ req[%s] res[%s]", event.getRequest, event.getResponse)
} else log.info("Null event message: req[%s] res[%s]", event.getRequest, event.getResponse)
event
}

View file

@ -12,8 +12,6 @@ import javax.ws.rs.core.UriBuilder
import java.io.File
import java.net.URLClassLoader
import net.lag.configgy.{Config, Configgy, RuntimeEnvironment, ParseException}
import rest.AkkaCometServlet
import nio.RemoteServer
import state.CassandraStorage

View file

@ -2,14 +2,14 @@
* Copyright (C) 2009 Scalable Solutions.
*/
package se.scalablesolutions.akka.kernel.state
package se.scalablesolutions.akka.state
import java.io.{Flushable, Closeable}
import util.Logging
import util.Helpers._
import serialization.Serializer
import kernel.Kernel.config
import akka.Config.config
import org.apache.cassandra.db.ColumnFamily
import org.apache.cassandra.service._

View file

@ -2,14 +2,14 @@
* Copyright (C) 2009 Scalable Solutions.
*/
package se.scalablesolutions.akka.kernel.state
package se.scalablesolutions.akka.state
import java.io.{Flushable, Closeable}
import util.Logging
import util.Helpers._
import serialization.Serializer
import kernel.Kernel.config
import akka.Config.config
import org.apache.cassandra.db.ColumnFamily
import org.apache.cassandra.service._
@ -46,7 +46,7 @@ object CassandraStorage extends MapStorage
*/
private[this] val serializer: Serializer = {
kernel.Kernel.config.getString("akka.storage.cassandra.storage-format", "java") match {
config.getString("akka.storage.cassandra.storage-format", "java") match {
case "scala-json" => Serializer.ScalaJSON
case "java-json" => Serializer.JavaJSON
case "protobuf" => Serializer.Protobuf
@ -231,14 +231,14 @@ val REF_COLUMN_FAMILY = "ref:item"
val IS_ASCENDING = true
val RUN_THRIFT_SERVICE = kernel.Kernel.config.getBool("akka.storage.cassandra.thrift-server.service", false)
val RUN_THRIFT_SERVICE = akka.akka.config.getBool("akka.storage.cassandra.thrift-server.service", false)
val CONSISTENCY_LEVEL = {
if (kernel.Kernel.config.getBool("akka.storage.cassandra.blocking", true)) 0
if (akka.akka.config.getBool("akka.storage.cassandra.blocking", true)) 0
else 1 }
@volatile private[this] var isRunning = false
private[this] val serializer: Serializer = {
kernel.Kernel.config.getString("akka.storage.cassandra.storage-format", "java") match {
akka.akka.config.getString("akka.storage.cassandra.storage-format", "java") match {
case "scala-json" => Serializer.ScalaJSON
case "java-json" => Serializer.JavaJSON
case "protobuf" => Serializer.Protobuf
@ -398,7 +398,7 @@ case object Start
case object Stop
private[this] val serverEngine: TThreadPoolServer = try {
val pidFile = kernel.Kernel.config.getString("akka.storage.cassandra.thrift-server.pidfile", "akka.pid")
val pidFile = akka.akka.config.getString("akka.storage.cassandra.thrift-server.pidfile", "akka.pid")
if (pidFile != null) new File(pidFile).deleteOnExit();
val listenPort = DatabaseDescriptor.getThriftPort

View file

@ -2,7 +2,7 @@
* Copyright (C) 2009 Scalable Solutions.
*/
package se.scalablesolutions.akka.kernel.state
package se.scalablesolutions.akka.state
import scala.actors.Actor
import scala.actors.OutputChannel

View file

@ -1,10 +1,11 @@
package se.scalablesolutions.akka.kernel.state
package se.scalablesolutions.akka.state
import akka.util.Logging
import serialization.{Serializer}
import akka.Config.config
import sjson.json.Serializer._
import com.mongodb._
import se.scalablesolutions.akka.kernel.util.Logging
import serialization.{Serializer}
import kernel.Kernel.config
import sjson.json.Serializer._
import java.util.{Map=>JMap, List=>JList, ArrayList=>JArrayList}
@ -18,8 +19,7 @@ import java.util.{Map=>JMap, List=>JList, ArrayList=>JArrayList}
* <p/>
* @author <a href="http://debasishg.blogspot.com">Debasish Ghosh</a>
*/
object MongoStorage extends MapStorage
with VectorStorage with RefStorage with Logging {
object MongoStorage extends MapStorage with VectorStorage with RefStorage with Logging {
// enrich with null safe findOne
class RichDBCollection(value: DBCollection) {
@ -36,18 +36,15 @@ object MongoStorage extends MapStorage
val KEY = "key"
val VALUE = "value"
val COLLECTION = "akka_coll"
val MONGODB_SERVER_HOSTNAME =
config.getString("akka.storage.mongodb.hostname", "127.0.0.1")
val MONGODB_SERVER_DBNAME =
config.getString("akka.storage.mongodb.dbname", "testdb")
val MONGODB_SERVER_PORT =
config.getInt("akka.storage.mongodb.port", 27017)
val db = new Mongo(MONGODB_SERVER_HOSTNAME,
MONGODB_SERVER_PORT, MONGODB_SERVER_DBNAME)
val MONGODB_SERVER_HOSTNAME = config.getString("akka.storage.mongodb.hostname", "127.0.0.1")
val MONGODB_SERVER_DBNAME = config.getString("akka.storage.mongodb.dbname", "testdb")
val MONGODB_SERVER_PORT = config.getInt("akka.storage.mongodb.port", 27017)
val db = new Mongo(MONGODB_SERVER_HOSTNAME, MONGODB_SERVER_PORT, MONGODB_SERVER_DBNAME)
val coll = db.getCollection(COLLECTION)
// @fixme: make this pluggable
// FIXME: make this pluggable
private[this] val serializer = SJSON
override def insertMapStorageEntryFor(name: String,

View file

@ -2,9 +2,9 @@
* Copyright (C) 2009 Scalable Solutions.
*/
package se.scalablesolutions.akka.kernel.state
package se.scalablesolutions.akka.state
import kernel.stm.TransactionManagement
import stm.TransactionManagement
import akka.collection._
import org.codehaus.aspectwerkz.proxy.Uuid
@ -71,7 +71,7 @@ class PersistentState {
abstract class PersistentTransactionalMap[K, V] extends TransactionalMap[K, V] {
// FIXME: need to handle remove in another changeSet
protected[kernel] val changeSet = new HashMap[K, V]
protected[akka] val changeSet = new HashMap[K, V]
def getRange(start: Option[AnyRef], count: Int)
@ -221,7 +221,7 @@ class MongoPersistentTransactionalMap extends TemplatePersistentTransactionalMap
abstract class PersistentTransactionalVector[T] extends TransactionalVector[T] {
// FIXME: need to handle remove in another changeSet
protected[kernel] val changeSet = new ArrayBuffer[T]
protected[akka] val changeSet = new ArrayBuffer[T]
// ---- For Transactional ----
override def begin = {}

View file

@ -2,7 +2,7 @@
* Copyright (C) 2009 Scalable Solutions.
*/
package se.scalablesolutions.akka.kernel.state
package se.scalablesolutions.akka.state
import org.apache.commons.pool._
import org.apache.commons.pool.impl._

View file

@ -1,8 +1,7 @@
package se.scalablesolutions.akka.kernel.state
package se.scalablesolutions.akka.state
// abstracts persistence storage
trait Storage {
}
trait Storage
// for Maps
trait MapStorage extends Storage {
@ -13,8 +12,7 @@ trait MapStorage extends Storage {
def getMapStorageEntryFor(name: String, key: AnyRef): Option[AnyRef]
def getMapStorageSizeFor(name: String): Int
def getMapStorageFor(name: String): List[Tuple2[AnyRef, AnyRef]]
def getMapStorageRangeFor(name: String, start: Option[AnyRef],
finish: Option[AnyRef], count: Int): List[Tuple2[AnyRef, AnyRef]]
def getMapStorageRangeFor(name: String, start: Option[AnyRef], finish: Option[AnyRef], count: Int): List[Tuple2[AnyRef, AnyRef]]
}
// for vectors

View file

@ -0,0 +1,18 @@
package se.scalablesolutions.akka
import akka.state.{MongoStorageSpec, MongoPersistentActorSpec, CassandraPersistentActorSpec}
import junit.framework.Test
import junit.framework.TestCase
import junit.framework.TestSuite
object AllTest extends TestCase {
def suite(): Test = {
val suite = new TestSuite("All Scala tests")
//suite.addTestSuite(classOf[CassandraPersistentActorSpec])
//suite.addTestSuite(classOf[MongoPersistentActorSpec])
//suite.addTestSuite(classOf[MongoStorageSpec])
suite
}
def main(args: Array[String]) = junit.textui.TestRunner.run(suite)
}

View file

@ -1,22 +1,38 @@
package se.scalablesolutions.akka.kernel.actor
package se.scalablesolutions.akka.state
import akka.actor.Actor
import java.util.concurrent.locks.ReentrantLock
import java.util.concurrent.TimeUnit
import junit.framework.TestCase
import kernel.Kernel
import kernel.reactor._
import reactor._
import kernel.state.{CassandraStorageConfig, TransactionalState}
import org.junit.{Test, Before}
import org.junit.Assert._
class PersistentActor extends Actor {
case class GetMapState(key: String)
case object GetVectorState
case object GetVectorSize
case object GetRefState
case class SetMapState(key: String, value: String)
case class SetVectorState(key: String)
case class SetRefState(key: String)
case class Success(key: String, value: String)
case class Failure(key: String, value: String, failer: Actor)
case class SetMapStateOneWay(key: String, value: String)
case class SetVectorStateOneWay(key: String)
case class SetRefStateOneWay(key: String)
case class SuccessOneWay(key: String, value: String)
case class FailureOneWay(key: String, value: String, failer: Actor)
class CassandraPersistentActor extends Actor {
timeout = 100000
makeTransactionRequired
private val mapState = TransactionalState.newPersistentMap(CassandraStorageConfig())
private val vectorState = TransactionalState.newPersistentVector(CassandraStorageConfig())
private val refState = TransactionalState.newPersistentRef(CassandraStorageConfig())
private val mapState = PersistentState.newMap(CassandraStorageConfig())
private val vectorState = PersistentState.newVector(CassandraStorageConfig())
private val refState = PersistentState.newRef(CassandraStorageConfig())
def receive: PartialFunction[Any, Unit] = {
case GetMapState(key) =>
@ -56,11 +72,11 @@ class PersistentActor extends Actor {
}
}
class PersistentActorSpec extends TestCase {
class CassandraPersistentActorSpec extends TestCase {
@Test
def testMapShouldNotRollbackStateForStatefulServerInCaseOfSuccess = {
val stateful = new PersistentActor
val stateful = new CassandraPersistentActor
stateful.start
stateful !! SetMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "init") // set init state
stateful !! Success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") // transactionrequired
@ -69,7 +85,7 @@ class PersistentActorSpec extends TestCase {
@Test
def testMapShouldRollbackStateForStatefulServerInCaseOfFailure = {
val stateful = new PersistentActor
val stateful = new CassandraPersistentActor
stateful.start
stateful !! SetMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure", "init") // set init state
val failer = new PersistentFailerActor
@ -83,7 +99,7 @@ class PersistentActorSpec extends TestCase {
@Test
def testVectorShouldNotRollbackStateForStatefulServerInCaseOfSuccess = {
val stateful = new PersistentActor
val stateful = new CassandraPersistentActor
stateful.start
stateful !! SetVectorState("init") // set init state
stateful !! Success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") // transactionrequired
@ -92,7 +108,7 @@ class PersistentActorSpec extends TestCase {
@Test
def testVectorShouldRollbackStateForStatefulServerInCaseOfFailure = {
val stateful = new PersistentActor
val stateful = new CassandraPersistentActor
stateful.start
stateful !! SetVectorState("init") // set init state
val failer = new PersistentFailerActor
@ -106,7 +122,7 @@ class PersistentActorSpec extends TestCase {
@Test
def testRefShouldNotRollbackStateForStatefulServerInCaseOfSuccess = {
val stateful = new PersistentActor
val stateful = new CassandraPersistentActor
stateful.start
stateful !! SetRefState("init") // set init state
stateful !! Success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") // transactionrequired
@ -115,7 +131,7 @@ class PersistentActorSpec extends TestCase {
@Test
def testRefShouldRollbackStateForStatefulServerInCaseOfFailure = {
val stateful = new PersistentActor
val stateful = new CassandraPersistentActor
stateful.start
stateful !! SetRefState("init") // set init state
val failer = new PersistentFailerActor

View file

@ -1,14 +1,12 @@
package se.scalablesolutions.akka.kernel.actor
package se.scalablesolutions.akka.state
import akka.actor.Actor
import junit.framework.TestCase
import org.junit.{Test, Before}
import org.junit.Assert._
import dispatch.json._
import dispatch.json.Js._
import kernel.state.{MongoStorageConfig, TransactionalState}
/**
* A persistent actor based on MongoDB storage.
* <p/>
@ -31,9 +29,9 @@ case object LogSize
class BankAccountActor extends Actor {
makeTransactionRequired
private val accountState =
TransactionalState.newPersistentMap(MongoStorageConfig())
PersistentState.newMap(MongoStorageConfig())
private val txnLog =
TransactionalState.newPersistentVector(MongoStorageConfig())
PersistentState.newVector(MongoStorageConfig())
def receive: PartialFunction[Any, Unit] = {
// check balance

View file

@ -1,4 +1,4 @@
package se.scalablesolutions.akka.kernel.state
package se.scalablesolutions.akka.state
import junit.framework.TestCase

View file

@ -22,11 +22,6 @@
<groupId>se.scalablesolutions.akka</groupId>
<version>0.6</version>
</dependency>
<dependency>
<artifactId>akka-kernel</artifactId>
<groupId>se.scalablesolutions.akka</groupId>
<version>0.6</version>
</dependency>
<dependency>
<artifactId>akka-actors</artifactId>
<groupId>se.scalablesolutions.akka</groupId>

View file

@ -1,7 +1,7 @@
package sample.java;
import se.scalablesolutions.akka.kernel.config.ActiveObjectManager;
import static se.scalablesolutions.akka.kernel.config.JavaConfig.*;
import se.scalablesolutions.akka.config.ActiveObjectManager;
import static se.scalablesolutions.akka.config.JavaConfig.*;
public class Boot {
final private ActiveObjectManager manager = new ActiveObjectManager();

View file

@ -11,9 +11,10 @@ import javax.ws.rs.Produces;
import se.scalablesolutions.akka.annotation.transactionrequired;
import se.scalablesolutions.akka.annotation.prerestart;
import se.scalablesolutions.akka.annotation.postrestart;
import se.scalablesolutions.akka.kernel.state.TransactionalState;
import se.scalablesolutions.akka.kernel.state.TransactionalMap;
import se.scalablesolutions.akka.kernel.state.CassandraStorageConfig;
import se.scalablesolutions.akka.state.TransactionalState;
import se.scalablesolutions.akka.state.PersistentState;
import se.scalablesolutions.akka.state.TransactionalMap;
import se.scalablesolutions.akka.state.CassandraStorageConfig;
/**
* Try service out by invoking (multiple times):
@ -28,8 +29,8 @@ public class PersistentSimpleService {
private String KEY = "COUNTER";
private boolean hasStartedTicking = false;
private TransactionalState factory = new TransactionalState();
private TransactionalMap<Object, Object> storage = factory.newPersistentMap(new CassandraStorageConfig());
private PersistentState factory = new PersistentState();
private TransactionalMap<Object, Object> storage = factory.newMap(new CassandraStorageConfig());
@GET
@Produces({"application/html"})

View file

@ -11,9 +11,9 @@ import javax.ws.rs.Produces;
import se.scalablesolutions.akka.annotation.transactionrequired;
import se.scalablesolutions.akka.annotation.prerestart;
import se.scalablesolutions.akka.annotation.postrestart;
import se.scalablesolutions.akka.kernel.state.TransactionalState;
import se.scalablesolutions.akka.kernel.state.TransactionalMap;
import se.scalablesolutions.akka.kernel.state.CassandraStorageConfig;
import se.scalablesolutions.akka.state.TransactionalState;
import se.scalablesolutions.akka.state.TransactionalMap;
import se.scalablesolutions.akka.state.CassandraStorageConfig;
/**
* Try service out by invoking (multiple times):
@ -29,7 +29,7 @@ public class SimpleService {
private boolean hasStartedTicking = false;
private TransactionalState factory = new TransactionalState();
private TransactionalMap storage = factory.newInMemoryMap();
private TransactionalMap storage = factory.newMap();
@GET
@Produces({"application/json"})

View file

@ -1,9 +1,9 @@
package sample.lift
import se.scalablesolutions.akka.kernel.state.{TransactionalState, CassandraStorageConfig}
import se.scalablesolutions.akka.kernel.actor.{SupervisorFactory, Actor}
import se.scalablesolutions.akka.kernel.config.ScalaConfig._
import se.scalablesolutions.akka.kernel.util.Logging
import se.scalablesolutions.akka.state.{PersistentState, TransactionalState, CassandraStorageConfig}
import se.scalablesolutions.akka.actor.{SupervisorFactory, Actor}
import se.scalablesolutions.akka.config.ScalaConfig._
import se.scalablesolutions.akka.util.Logging
import javax.ws.rs.core.MultivaluedMap
import javax.ws.rs.{GET, POST, Path, Produces, WebApplicationException, Consumes}
@ -22,7 +22,7 @@ class SimpleService extends Actor {
case object Tick
private val KEY = "COUNTER";
private var hasStartedTicking = false;
private val storage = TransactionalState.newInMemoryMap[String, Integer]
private val storage = TransactionalState.newMap[String, Integer]
@GET
@Produces(Array("text/html"))
@ -55,7 +55,7 @@ class PersistentSimpleService extends Actor {
case object Tick
private val KEY = "COUNTER";
private var hasStartedTicking = false;
private val storage = TransactionalState.newPersistentMap(CassandraStorageConfig())
private val storage = PersistentState.newMap(CassandraStorageConfig())
@GET
@Produces(Array("text/html"))

View file

@ -7,11 +7,11 @@ import _root_.net.liftweb.sitemap.Loc._
import Helpers._
import _root_.net.liftweb.http.auth._
import se.scalablesolutions.akka.kernel.state.{TransactionalState, CassandraStorageConfig}
import se.scalablesolutions.akka.kernel.actor.{SupervisorFactory, Actor}
import se.scalablesolutions.akka.kernel.config.ScalaConfig._
import se.scalablesolutions.akka.kernel.util.Logging
import sample.lift.SimpleService
import se.scalablesolutions.akka.actor.{SupervisorFactory, Actor}
import se.scalablesolutions.akka.config.ScalaConfig._
import se.scalablesolutions.akka.util.Logging
import sample.lift.{PersistentSimpleService, SimpleService}
/**
* A class that's instantiated early and run. It allows the application
@ -44,7 +44,7 @@ class Boot {
new SimpleService,
LifeCycle(Permanent, 100)) ::
Supervise(
new SimpleService,
new PersistentSimpleService,
LifeCycle(Permanent, 100)) ::
Nil)
}

View file

@ -4,10 +4,10 @@
package sample.scala
import se.scalablesolutions.akka.kernel.state.{TransactionalState, CassandraStorageConfig}
import se.scalablesolutions.akka.kernel.actor.{SupervisorFactory, Actor}
import se.scalablesolutions.akka.kernel.config.ScalaConfig._
import se.scalablesolutions.akka.kernel.util.Logging
import se.scalablesolutions.akka.state.{PersistentState, TransactionalState, CassandraStorageConfig}
import se.scalablesolutions.akka.actor.{SupervisorFactory, Actor}
import se.scalablesolutions.akka.config.ScalaConfig._
import se.scalablesolutions.akka.util.Logging
import javax.ws.rs.core.MultivaluedMap
import javax.ws.rs.{GET, POST, Path, Produces, WebApplicationException, Consumes}
@ -51,7 +51,7 @@ class SimpleService extends Actor {
case object Tick
private val KEY = "COUNTER";
private var hasStartedTicking = false;
private val storage = TransactionalState.newInMemoryMap[String, Integer]
private val storage = TransactionalState.newMap[String, Integer]
@GET
@Produces(Array("text/html"))
@ -84,7 +84,7 @@ class PersistentSimpleService extends Actor {
case object Tick
private val KEY = "COUNTER";
private var hasStartedTicking = false;
private val storage = TransactionalState.newPersistentMap(CassandraStorageConfig())
private val storage = PersistentState.newMap(CassandraStorageConfig())
@GET
@Produces(Array("text/html"))