mid cassandra rewrite

This commit is contained in:
Jonas Boner 2009-08-02 16:14:12 +02:00
parent 136cb4e334
commit 32ef59c67c
7 changed files with 296 additions and 105 deletions

View file

@ -1,58 +1,53 @@
h1. Akka Actor Kernel: h1. Akka: RESTful Distributed Persistent Transactional Actors
__RESTful Distributed Persistent Transactional Actors__
h1. Introduction h3. "http://akkasource.org":http://akkasource.org
The Akka kernel implements a unique hybrid of: Akka implements a unique hybrid of:
* The Actor model (Actors and Active Objects) * The Actor model (Actors and Active Objects), which gives you:
** Asynchronous, non-blocking highly concurrent components. ** Concurrency (high-level and simple)
** Asynchronous, non-blocking and highly performant components.
** Supervision with "let-it-crash" semantics. Components are loosely coupled and restarted upon failure. ** Supervision with "let-it-crash" semantics. Components are loosely coupled and restarted upon failure.
* Software Transactional Memory (STM). * Software Transactional Memory (STM).
* BASE persistence - Pluggable Eventually Consistent distributed scalable persistent storage. * BASE and ACID persistence - Pluggable Eventually Consistent or ACID distributed scalable persistent storage.
* Remoting - Distributed services. * Remoting - Distributed services with supervision and error management
* REST - JAX-RS binding. * REST (JAX-RS) and Comet bindings.
* Monitoring and Management
h2. Here is a short overview Akka can be used in two different ways:
* As a library: used by a web app, to be put into WEB-INF/lib
* As a kernel: stand-alone kernel, embedding the servlet container
h3. The Actor model and supervisor hierarchies See the "Use-case and Deployment Scenarios":http://wiki.github.com/jboner/akka/use-case-and-deployment-scenarios for details.
"Actors":http://en.wikipedia.org/wiki/Actor_model with "Erlang OTP-style supervisors":http://www.erlang.org/doc/design_principles/sup_princ.html#5 and "embrace failure/let-it-crash" semantics to allow implementation of asynchronous, non-blocking and highly fault-tolerant systems. Sort of "SEDA":http://www.eecs.harvard.edu/~mdw/proj/seda/ in a box with highly configurable and monitorable (JMX and w3c) thread pools and message queues. h1. What's Akka all about? Why should I care?
h3. Software Transactional Memory (STM) If you are new to Akka then I suggest you start with either the:
"Software Transactional Memory (STM)":http://en.wikipedia.org/wiki/Software_transactional_memory for composable message flows. Distributed transactions will come very soon, backed up by "ZooKeeper":http://hadoop.apache.org/zookeeper/. The STM works with both persistent datastructures and in-memory datastructures (see below). * "High Level View":http://wiki.github.com/jboner/akka/modules-the-high-level-view; which is outlining the different modules in Akka.
* "Use-case and Deployment Scenarios":http://wiki.github.com/jboner/akka/use-case-and-deployment-scenarios; outlining how and in which use-case and deployment scenarios can I use Akka?
* "Examples":http://wiki.github.com/jboner/akka/examples; showing how to build a RESTful, transactional, persistent Active Object and Actor.
h3. BASE: Eventually Consistent Distributed persistence After that you can dive into the "Reference Manual":http://wiki.github.com/jboner/akka/akka-reference-manual.
Akka provides a "Eventually Consistent":http://www.allthingsdistributed.com/2008/12/eventually_consistent.html Transactional Persistent Map, Vector and Ref. Backed up by the "Cassandra":http://incubator.apache.org/cassandra/ highly scalable, eventually consistent, distributed, structured key-value store. Akka will add support for "Terracotta":http://terracotta.org, "Redis":http://code.google.com/p/redis/, "Memcached":http://www.danga.com/memcached/, "Voldemort":http://project-voldemort.com/, "Tokyo Cabinet/Tyrant":http://tokyocabinet.sourceforge.net/ and "Hazelcast":http://www.hazelcast.com/ shortly. h1. Documentation
New nodes can be added and removed on the fly to support true scaling of cluster. The addition of Terracotta and Hazelcast will allow for atomic (ACID) transactions (non-BASE).
h3. REST
Actors can be exposed as "REST":http://en.wikipedia.org/wiki/Representational_State_Transfer services through "JAX-RS":https://jersey.dev.java.net/.
h3. Remoting
Actors can be defined and started on remote nodes, supporting both remote failures and supervision/linking. Enabling another dimension of fault-tolerance.
h3. Java and Scala API
Both a Java API through Active Objects and annotations as well as a Scala API with Erlang-style Actors with pattern matching etc.
h3. Microkernel
Akka has a microkernel that embeds the Actor management, Persistence service, REST integration, JMX management and Remote service. Simply drop your application in the /deploy directory and start up the kernel and you should be able to access your Actors through REST.
h2. Documentation
Akka has pretty thorough "reference documentation":https://github.com/jboner/akka/wikis. Covering examples, APIs and configuration. Akka has pretty thorough "reference documentation":https://github.com/jboner/akka/wikis. Covering examples, APIs and configuration.
h2. Distribution h1. Distribution
The latest distribution can be found in the "downloads section":https://github.com/jboner/akka/downloads The latest distribution can be found in the "downloads section":https://github.com/jboner/akka/downloads
h2. License h1. Mailing List
If you have questions and/or feedback: please sign up to the Akka User mailing list:
"http://groups.google.com/group/akka-user":http://groups.google.com/group/akka-user
h1. Professional Support
Scalable Solutions AB is providing a variety of professional support packages for Akka, please visit their website for details:
"http://scalablesolutions.se":http://scalablesolutions.se
h1. License
<pre> <pre>
This software is licensed under the Apache 2 license, quoted below. This software is licensed under the Apache 2 license, quoted below.

View file

@ -125,6 +125,11 @@
<artifactId>cassidy</artifactId> <artifactId>cassidy</artifactId>
<version>0.1</version> <version>0.1</version>
</dependency> </dependency>
<dependency>
<groupId>commons-pool</groupId>
<artifactId>commons-pool</artifactId>
<version>1.5.1</version>
</dependency>
<!-- For Jersey --> <!-- For Jersey -->
<dependency> <dependency>

View file

@ -16,7 +16,7 @@ import net.lag.configgy.{Config, Configgy, RuntimeEnvironment}
import kernel.jersey.AkkaServlet import kernel.jersey.AkkaServlet
import kernel.nio.RemoteServer import kernel.nio.RemoteServer
import kernel.state.CassandraStorage import kernel.state.EmbeddedCassandraStorage
import kernel.util.Logging import kernel.util.Logging
/** /**
@ -119,7 +119,7 @@ object Kernel extends Logging {
private[akka] def startCassandra = if (config.getBool("akka.storage.cassandra.service", true)) { private[akka] def startCassandra = if (config.getBool("akka.storage.cassandra.service", true)) {
System.setProperty("cassandra", "") System.setProperty("cassandra", "")
System.setProperty("storage-config", akka.Boot.CONFIG + "/") System.setProperty("storage-config", akka.Boot.CONFIG + "/")
CassandraStorage.start EmbeddedCassandraStorage.start
} }
private[akka] def startJersey = { private[akka] def startJersey = {
@ -159,7 +159,7 @@ object Kernel extends Logging {
println("=================================================") println("=================================================")
var start = System.currentTimeMillis var start = System.currentTimeMillis
for (i <- 1 to NR_ENTRIES) CassandraStorage.insertMapStorageEntryFor("test", i.toString, "data") for (i <- 1 to NR_ENTRIES) EmbeddedCassandraStorage.insertMapStorageEntryFor("test", i.toString, "data")
var end = System.currentTimeMillis var end = System.currentTimeMillis
println("Writes per second: " + NR_ENTRIES / ((end - start).toDouble / 1000)) println("Writes per second: " + NR_ENTRIES / ((end - start).toDouble / 1000))
@ -167,13 +167,13 @@ object Kernel extends Logging {
start = System.currentTimeMillis start = System.currentTimeMillis
val entries = new scala.collection.mutable.ArrayBuffer[Tuple2[String, String]] val entries = new scala.collection.mutable.ArrayBuffer[Tuple2[String, String]]
for (i <- 1 to NR_ENTRIES) entries += (i.toString, "data") for (i <- 1 to NR_ENTRIES) entries += (i.toString, "data")
CassandraStorage.insertMapStorageEntriesFor("test", entries.toList) EmbeddedCassandraStorage.insertMapStorageEntriesFor("test", entries.toList)
end = System.currentTimeMillis end = System.currentTimeMillis
println("Writes per second - batch: " + NR_ENTRIES / ((end - start).toDouble / 1000)) println("Writes per second - batch: " + NR_ENTRIES / ((end - start).toDouble / 1000))
println("=================================================") println("=================================================")
start = System.currentTimeMillis start = System.currentTimeMillis
for (i <- 1 to NR_ENTRIES) CassandraStorage.getMapStorageEntryFor("test", i.toString) for (i <- 1 to NR_ENTRIES) EmbeddedCassandraStorage.getMapStorageEntryFor("test", i.toString)
end = System.currentTimeMillis end = System.currentTimeMillis
println("Reads per second: " + NR_ENTRIES / ((end - start).toDouble / 1000)) println("Reads per second: " + NR_ENTRIES / ((end - start).toDouble / 1000))

View file

@ -18,6 +18,185 @@ import org.apache.thrift.transport.TServerSocket
import org.apache.thrift.transport.TTransportFactory import org.apache.thrift.transport.TTransportFactory
import org.apache.thrift.TProcessorFactory import org.apache.thrift.TProcessorFactory
/**
* NOTE: requires command line options:
* <br/>
* <code>-Dcassandra -Dstorage-config=config/ -Dpidfile=akka.pid</code>
* <p/>
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object EmbeddedCassandraStorage extends Logging {
val TABLE_NAME = "akka"
val MAP_COLUMN_FAMILY = "map"
val VECTOR_COLUMN_FAMILY = "vector"
val REF_COLUMN_FAMILY = "ref:item"
val IS_ASCENDING = true
val RUN_THRIFT_SERVICE = kernel.Kernel.config.getBool("akka.storage.cassandra.thrift-server.service", false)
val BLOCKING_CALL = {
if (kernel.Kernel.config.getBool("akka.storage.cassandra.blocking", true)) 0
else 1
}
@volatile private[this] var isRunning = false
private[this] val serializer: Serializer = {
kernel.Kernel.config.getString("akka.storage.cassandra.storage-format", "java") match {
case "scala-json" => Serializer.ScalaJSON
case "java-json" => Serializer.JavaJSON
//case "sbinary" => Serializer.SBinary
case "java" => Serializer.Java
case "avro" => throw new UnsupportedOperationException("Avro serialization protocol is not yet supported")
case unknown => throw new UnsupportedOperationException("Unknown storage serialization protocol [" + unknown + "]")
}
}
// TODO: is this server thread-safe or needed to be wrapped up in an actor?
private[this] val server = classOf[CassandraServer].newInstance.asInstanceOf[CassandraServer]
private[this] var thriftServer: CassandraThriftServer = _
def start = synchronized {
if (!isRunning) {
try {
server.start
log.info("Cassandra persistent storage has started up successfully");
} catch {
case e =>
log.error("Could not start up Cassandra persistent storage")
throw e
}
if (RUN_THRIFT_SERVICE) {
thriftServer = new CassandraThriftServer(server)
thriftServer.start
}
isRunning
}
}
def stop = if (isRunning) {
//server.storageService.shutdown
if (RUN_THRIFT_SERVICE) thriftServer.stop
}
// ===============================================================
// For Ref
// ===============================================================
def insertRefStorageFor(name: String, element: AnyRef) = {
server.insert(
TABLE_NAME,
name,
REF_COLUMN_FAMILY,
serializer.out(element),
System.currentTimeMillis,
BLOCKING_CALL)
}
def getRefStorageFor(name: String): Option[AnyRef] = {
try {
val column = server.get_column(TABLE_NAME, name, REF_COLUMN_FAMILY)
Some(serializer.in(column.value, None))
} catch {
case e =>
e.printStackTrace
None
}
}
// ===============================================================
// For Vector
// ===============================================================
def insertVectorStorageEntryFor(name: String, element: AnyRef) = {
server.insert(
TABLE_NAME,
name,
VECTOR_COLUMN_FAMILY + ":" + getVectorStorageSizeFor(name),
serializer.out(element),
System.currentTimeMillis,
BLOCKING_CALL)
}
def getVectorStorageEntryFor(name: String, index: Int): AnyRef = {
try {
val column = server.get_column(TABLE_NAME, name, VECTOR_COLUMN_FAMILY + ":" + index)
serializer.in(column.value, None)
} catch {
case e =>
e.printStackTrace
throw new Predef.NoSuchElementException(e.getMessage)
}
}
def getVectorStorageRangeFor(name: String, start: Int, count: Int): List[AnyRef] =
server.get_slice(TABLE_NAME, name, VECTOR_COLUMN_FAMILY, IS_ASCENDING, count)
.toArray.toList.asInstanceOf[List[Tuple2[String, AnyRef]]].map(tuple => tuple._2)
def getVectorStorageSizeFor(name: String): Int =
server.get_column_count(TABLE_NAME, name, VECTOR_COLUMN_FAMILY)
// ===============================================================
// For Map
// ===============================================================
def insertMapStorageEntryFor(name: String, key: String, value: AnyRef) = {
server.insert(
TABLE_NAME,
name,
MAP_COLUMN_FAMILY + ":" + key,
serializer.out(value),
System.currentTimeMillis,
BLOCKING_CALL)
}
def insertMapStorageEntriesFor(name: String, entries: List[Tuple2[String, AnyRef]]) = {
import java.util.{Map, HashMap, List, ArrayList}
val columns: Map[String, List[column_t]] = new HashMap
for (entry <- entries) {
val cls: List[column_t] = new ArrayList
cls.add(new column_t(entry._1, serializer.out(entry._2), System.currentTimeMillis))
columns.put(MAP_COLUMN_FAMILY, cls)
}
server.batch_insert(new batch_mutation_t(
TABLE_NAME,
name,
columns),
BLOCKING_CALL)
}
def getMapStorageEntryFor(name: String, key: AnyRef): Option[AnyRef] = {
try {
val column = server.get_column(TABLE_NAME, name, MAP_COLUMN_FAMILY + ":" + key)
Some(serializer.in(column.value, None))
} catch {
case e =>
e.printStackTrace
None
}
}
def getMapStorageFor(name: String): List[Tuple2[String, AnyRef]] = {
val columns = server.get_columns_since(TABLE_NAME, name, MAP_COLUMN_FAMILY, -1)
.toArray.toList.asInstanceOf[List[org.apache.cassandra.service.column_t]]
for {
column <- columns
col = (column.columnName, serializer.in(column.value, None))
} yield col
}
def getMapStorageSizeFor(name: String): Int =
server.get_column_count(TABLE_NAME, name, MAP_COLUMN_FAMILY)
def removeMapStorageFor(name: String) =
server.remove(TABLE_NAME, name, MAP_COLUMN_FAMILY, System.currentTimeMillis, BLOCKING_CALL)
def getMapStorageRangeFor(name: String, start: Int, count: Int): List[Tuple2[String, AnyRef]] = {
server.get_slice(TABLE_NAME, name, MAP_COLUMN_FAMILY, IS_ASCENDING, count)
.toArray.toList.asInstanceOf[List[Tuple2[String, AnyRef]]]
}
}
/** /**
* NOTE: requires command line options: * NOTE: requires command line options:
* <br/> * <br/>
@ -51,6 +230,18 @@ object CassandraStorage extends Logging {
} }
} }
implicit def strToBytes(s : String) = s.getBytes("UTF-8")
import scala.collection.jcl.Conversions._
import se.foldleft.pool._
import se.foldleft.cassidy._
val cassidy = new Cassidy(StackPool(SocketProvider("localhost", 9160)), Protocol.Binary) // or JSON
cassidy.doWork { s => {
val user_id = "1"
s.++|("users", user_id, "base_attributes:name", "Lord Foo Bar", false)
s.++|("users", user_id, "base_attributes:age", "24", false)
for( i <- s./("users", user_id, "base_attributes", None, None).toList) println(i)
}}
// TODO: is this server thread-safe or needed to be wrapped up in an actor? // TODO: is this server thread-safe or needed to be wrapped up in an actor?
private[this] val server = classOf[CassandraServer].newInstance.asInstanceOf[CassandraServer] private[this] val server = classOf[CassandraServer].newInstance.asInstanceOf[CassandraServer]

View file

@ -13,7 +13,7 @@ import scala.collection.mutable.{ArrayBuffer, HashMap}
sealed abstract class TransactionalStateConfig sealed abstract class TransactionalStateConfig
abstract class PersistentStorageConfig extends TransactionalStateConfig abstract class PersistentStorageConfig extends TransactionalStateConfig
case class CassandraStorageConfig extends PersistentStorageConfig case class EmbeddedCassandraStorageConfig extends PersistentStorageConfig
case class TerracottaStorageConfig extends PersistentStorageConfig case class TerracottaStorageConfig extends PersistentStorageConfig
case class TokyoCabinetStorageConfig extends PersistentStorageConfig case class TokyoCabinetStorageConfig extends PersistentStorageConfig
@ -22,7 +22,7 @@ case class TokyoCabinetStorageConfig extends PersistentStorageConfig
* <p/> * <p/>
* Example Scala usage: * Example Scala usage:
* <pre> * <pre>
* val myMap = TransactionalState.newPersistentMap(CassandraStorageConfig) * val myMap = TransactionalState.newPersistentMap(EmbeddedCassandraStorageConfig)
* </pre> * </pre>
*/ */
object TransactionalState extends TransactionalState object TransactionalState extends TransactionalState
@ -33,24 +33,24 @@ object TransactionalState extends TransactionalState
* Example Java usage: * Example Java usage:
* <pre> * <pre>
* TransactionalState state = new TransactionalState(); * TransactionalState state = new TransactionalState();
* TransactionalMap myMap = state.newPersistentMap(new CassandraStorageConfig()); * TransactionalMap myMap = state.newPersistentMap(new EmbeddedCassandraStorageConfig());
* </pre> * </pre>
*/ */
class TransactionalState { class TransactionalState {
def newPersistentMap(config: PersistentStorageConfig): TransactionalMap[String, AnyRef] = config match { def newPersistentMap(config: PersistentStorageConfig): TransactionalMap[String, AnyRef] = config match {
case CassandraStorageConfig() => new CassandraPersistentTransactionalMap case EmbeddedCassandraStorageConfig() => new CassandraPersistentTransactionalMap
case TerracottaStorageConfig() => throw new UnsupportedOperationException case TerracottaStorageConfig() => throw new UnsupportedOperationException
case TokyoCabinetStorageConfig() => throw new UnsupportedOperationException case TokyoCabinetStorageConfig() => throw new UnsupportedOperationException
} }
def newPersistentVector(config: PersistentStorageConfig): TransactionalVector[AnyRef] = config match { def newPersistentVector(config: PersistentStorageConfig): TransactionalVector[AnyRef] = config match {
case CassandraStorageConfig() => new CassandraPersistentTransactionalVector case EmbeddedCassandraStorageConfig() => new CassandraPersistentTransactionalVector
case TerracottaStorageConfig() => throw new UnsupportedOperationException case TerracottaStorageConfig() => throw new UnsupportedOperationException
case TokyoCabinetStorageConfig() => throw new UnsupportedOperationException case TokyoCabinetStorageConfig() => throw new UnsupportedOperationException
} }
def newPersistentRef(config: PersistentStorageConfig): TransactionalRef[AnyRef] = config match { def newPersistentRef(config: PersistentStorageConfig): TransactionalRef[AnyRef] = config match {
case CassandraStorageConfig() => new CassandraPersistentTransactionalRef case EmbeddedCassandraStorageConfig() => new CassandraPersistentTransactionalRef
case TerracottaStorageConfig() => throw new UnsupportedOperationException case TerracottaStorageConfig() => throw new UnsupportedOperationException
case TokyoCabinetStorageConfig() => throw new UnsupportedOperationException case TokyoCabinetStorageConfig() => throw new UnsupportedOperationException
} }
@ -208,7 +208,7 @@ class CassandraPersistentTransactionalMap extends PersistentTransactionalMap[Str
override def getRange(start: Int, count: Int) = { override def getRange(start: Int, count: Int) = {
verifyTransaction verifyTransaction
try { try {
CassandraStorage.getMapStorageRangeFor(uuid, start, count) EmbeddedCassandraStorage.getMapStorageRangeFor(uuid, start, count)
} catch { } catch {
case e: Exception => Nil case e: Exception => Nil
} }
@ -216,7 +216,7 @@ class CassandraPersistentTransactionalMap extends PersistentTransactionalMap[Str
// ---- For Transactional ---- // ---- For Transactional ----
override def commit = { override def commit = {
CassandraStorage.insertMapStorageEntriesFor(uuid, changeSet.toList) EmbeddedCassandraStorage.insertMapStorageEntriesFor(uuid, changeSet.toList)
changeSet.clear changeSet.clear
} }
@ -224,7 +224,7 @@ class CassandraPersistentTransactionalMap extends PersistentTransactionalMap[Str
override def clear = { override def clear = {
verifyTransaction verifyTransaction
try { try {
CassandraStorage.removeMapStorageFor(uuid) EmbeddedCassandraStorage.removeMapStorageFor(uuid)
} catch { } catch {
case e: Exception => {} case e: Exception => {}
} }
@ -233,7 +233,7 @@ class CassandraPersistentTransactionalMap extends PersistentTransactionalMap[Str
override def contains(key: String): Boolean = { override def contains(key: String): Boolean = {
try { try {
verifyTransaction verifyTransaction
CassandraStorage.getMapStorageEntryFor(uuid, key).isDefined EmbeddedCassandraStorage.getMapStorageEntryFor(uuid, key).isDefined
} catch { } catch {
case e: Exception => false case e: Exception => false
} }
@ -242,7 +242,7 @@ class CassandraPersistentTransactionalMap extends PersistentTransactionalMap[Str
override def size: Int = { override def size: Int = {
verifyTransaction verifyTransaction
try { try {
CassandraStorage.getMapStorageSizeFor(uuid) EmbeddedCassandraStorage.getMapStorageSizeFor(uuid)
} catch { } catch {
case e: Exception => 0 case e: Exception => 0
} }
@ -254,7 +254,7 @@ class CassandraPersistentTransactionalMap extends PersistentTransactionalMap[Str
// if (changeSet.contains(key)) changeSet.get(key) // if (changeSet.contains(key)) changeSet.get(key)
// else { // else {
val result = try { val result = try {
CassandraStorage.getMapStorageEntryFor(uuid, key) EmbeddedCassandraStorage.getMapStorageEntryFor(uuid, key)
} catch { } catch {
case e: Exception => None case e: Exception => None
} }
@ -266,7 +266,7 @@ class CassandraPersistentTransactionalMap extends PersistentTransactionalMap[Str
//verifyTransaction //verifyTransaction
new Iterator[Tuple2[String, AnyRef]] { new Iterator[Tuple2[String, AnyRef]] {
private val originalList: List[Tuple2[String, AnyRef]] = try { private val originalList: List[Tuple2[String, AnyRef]] = try {
CassandraStorage.getMapStorageFor(uuid) EmbeddedCassandraStorage.getMapStorageFor(uuid)
} catch { } catch {
case e: Throwable => Nil case e: Throwable => Nil
} }
@ -388,17 +388,17 @@ class CassandraPersistentTransactionalVector extends PersistentTransactionalVect
override def get(index: Int): AnyRef = { override def get(index: Int): AnyRef = {
verifyTransaction verifyTransaction
if (changeSet.size > index) changeSet(index) if (changeSet.size > index) changeSet(index)
else CassandraStorage.getVectorStorageEntryFor(uuid, index) else EmbeddedCassandraStorage.getVectorStorageEntryFor(uuid, index)
} }
override def getRange(start: Int, count: Int): List[AnyRef] = { override def getRange(start: Int, count: Int): List[AnyRef] = {
verifyTransaction verifyTransaction
CassandraStorage.getVectorStorageRangeFor(uuid, start, count) EmbeddedCassandraStorage.getVectorStorageRangeFor(uuid, start, count)
} }
override def length: Int = { override def length: Int = {
verifyTransaction verifyTransaction
CassandraStorage.getVectorStorageSizeFor(uuid) EmbeddedCassandraStorage.getVectorStorageSizeFor(uuid)
} }
override def apply(index: Int): AnyRef = get(index) override def apply(index: Int): AnyRef = get(index)
@ -415,7 +415,7 @@ class CassandraPersistentTransactionalVector extends PersistentTransactionalVect
// ---- For Transactional ---- // ---- For Transactional ----
override def commit = { override def commit = {
// FIXME: should use batch function once the bug is resolved // FIXME: should use batch function once the bug is resolved
for (element <- changeSet) CassandraStorage.insertVectorStorageEntryFor(uuid, element) for (element <- changeSet) EmbeddedCassandraStorage.insertVectorStorageEntryFor(uuid, element)
changeSet.clear changeSet.clear
} }
} }
@ -460,7 +460,7 @@ class TransactionalRef[T] extends Transactional {
class CassandraPersistentTransactionalRef extends TransactionalRef[AnyRef] { class CassandraPersistentTransactionalRef extends TransactionalRef[AnyRef] {
override def commit = if (ref.isDefined) { override def commit = if (ref.isDefined) {
CassandraStorage.insertRefStorageFor(uuid, ref.get) EmbeddedCassandraStorage.insertRefStorageFor(uuid, ref.get)
ref = None ref = None
} }
@ -468,7 +468,7 @@ class CassandraPersistentTransactionalRef extends TransactionalRef[AnyRef] {
override def get: Option[AnyRef] = { override def get: Option[AnyRef] = {
verifyTransaction verifyTransaction
CassandraStorage.getRefStorageFor(uuid) EmbeddedCassandraStorage.getRefStorageFor(uuid)
} }
override def isDefined: Boolean = get.isDefined override def isDefined: Boolean = get.isDefined

View file

@ -7,16 +7,16 @@ import junit.framework.TestCase
import kernel.Kernel import kernel.Kernel
import kernel.reactor._ import kernel.reactor._
import kernel.state.{CassandraStorageConfig, TransactionalState} import kernel.state.{EmbeddedCassandraStorageConfig, TransactionalState}
import org.junit.{Test, Before} import org.junit.{Test, Before}
import org.junit.Assert._ import org.junit.Assert._
class PersistentActor extends Actor { class PersistentActor extends Actor {
timeout = 100000 timeout = 100000
makeTransactionRequired makeTransactionRequired
private val mapState = TransactionalState.newPersistentMap(CassandraStorageConfig()) private val mapState = TransactionalState.newPersistentMap(EmbeddedCassandraStorageConfig())
private val vectorState = TransactionalState.newPersistentVector(CassandraStorageConfig()) private val vectorState = TransactionalState.newPersistentVector(EmbeddedCassandraStorageConfig())
private val refState = TransactionalState.newPersistentRef(CassandraStorageConfig()) private val refState = TransactionalState.newPersistentRef(EmbeddedCassandraStorageConfig())
def receive: PartialFunction[Any, Unit] = { def receive: PartialFunction[Any, Unit] = {
case GetMapState(key) => case GetMapState(key) =>

View file

@ -1,7 +1,7 @@
package sample.scala package sample.scala
import javax.ws.rs.{Path, GET, Produces} import javax.ws.rs.{Path, GET, Produces}
import se.scalablesolutions.akka.kernel.state.{TransactionalState, TransactionalMap, CassandraStorageConfig} import se.scalablesolutions.akka.kernel.state.{TransactionalState, TransactionalMap, EmbeddedCassandraStorageConfig}
import se.scalablesolutions.akka.kernel.actor.{Supervisor, SupervisorFactory, Actor, StartSupervisor} import se.scalablesolutions.akka.kernel.actor.{Supervisor, SupervisorFactory, Actor, StartSupervisor}
import se.scalablesolutions.akka.kernel.config.ScalaConfig._ import se.scalablesolutions.akka.kernel.config.ScalaConfig._
@ -35,7 +35,7 @@ class SimpleService extends Actor {
case object Tick case object Tick
private val KEY = "COUNTER"; private val KEY = "COUNTER";
private var hasStartedTicking = false; private var hasStartedTicking = false;
private val storage = TransactionalState.newPersistentMap(CassandraStorageConfig()) private val storage = TransactionalState.newPersistentMap(EmbeddedCassandraStorageConfig())
@GET @GET
@Produces(Array("application/json")) @Produces(Array("application/json"))