diff --git a/README.textile b/README.textile index dd4595d2c7..b241c1605d 100644 --- a/README.textile +++ b/README.textile @@ -1,73 +1,68 @@ -h1. Akka Actor Kernel: -__RESTful Distributed Persistent Transactional Actors__ - -h1. Introduction - -The Akka kernel implements a unique hybrid of: -* The Actor model (Actors and Active Objects) -** Asynchronous, non-blocking highly concurrent components. -** Supervision with "let-it-crash" semantics. Components are loosely coupled and restarted upon failure. -* Software Transactional Memory (STM). -* BASE persistence - Pluggable Eventually Consistent distributed scalable persistent storage. -* Remoting - Distributed services. -* REST - JAX-RS binding. - -h2. Here is a short overview - -h3. The Actor model and supervisor hierarchies - -"Actors":http://en.wikipedia.org/wiki/Actor_model with "Erlang OTP-style supervisors":http://www.erlang.org/doc/design_principles/sup_princ.html#5 and "embrace failure/let-it-crash" semantics to allow implementation of asynchronous, non-blocking and highly fault-tolerant systems. Sort of "SEDA":http://www.eecs.harvard.edu/~mdw/proj/seda/ in a box with highly configurable and monitorable (JMX and w3c) thread pools and message queues. - -h3. Software Transactional Memory (STM) - -"Software Transactional Memory (STM)":http://en.wikipedia.org/wiki/Software_transactional_memory for composable message flows. Distributed transactions will come very soon, backed up by "ZooKeeper":http://hadoop.apache.org/zookeeper/. The STM works with both persistent datastructures and in-memory datastructures (see below). - -h3. BASE: Eventually Consistent Distributed persistence - -Akka provides a "Eventually Consistent":http://www.allthingsdistributed.com/2008/12/eventually_consistent.html Transactional Persistent Map, Vector and Ref. Backed up by the "Cassandra":http://incubator.apache.org/cassandra/ highly scalable, eventually consistent, distributed, structured key-value store. Akka will add support for "Terracotta":http://terracotta.org, "Redis":http://code.google.com/p/redis/, "Memcached":http://www.danga.com/memcached/, "Voldemort":http://project-voldemort.com/, "Tokyo Cabinet/Tyrant":http://tokyocabinet.sourceforge.net/ and "Hazelcast":http://www.hazelcast.com/ shortly. - -New nodes can be added and removed on the fly to support true scaling of cluster. The addition of Terracotta and Hazelcast will allow for atomic (ACID) transactions (non-BASE). - -h3. REST - -Actors can be exposed as "REST":http://en.wikipedia.org/wiki/Representational_State_Transfer services through "JAX-RS":https://jersey.dev.java.net/. - -h3. Remoting - -Actors can be defined and started on remote nodes, supporting both remote failures and supervision/linking. Enabling another dimension of fault-tolerance. - -h3. Java and Scala API - -Both a Java API through Active Objects and annotations as well as a Scala API with Erlang-style Actors with pattern matching etc. - -h3. Microkernel - -Akka has a microkernel that embeds the Actor management, Persistence service, REST integration, JMX management and Remote service. Simply drop your application in the /deploy directory and start up the kernel and you should be able to access your Actors through REST. - -h2. Documentation - -Akka has pretty thorough "reference documentation":https://github.com/jboner/akka/wikis. Covering examples, APIs and configuration. - -h2. Distribution - -The latest distribution can be found in the "downloads section":https://github.com/jboner/akka/downloads - -h2. License - -
-This software is licensed under the Apache 2 license, quoted below.
-
-Copyright 2009 Scalable Solutions AB 
-
-Licensed under the Apache License, Version 2.0 (the "License"); you may not
-use this file except in compliance with the License. You may obtain a copy of
-the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-License for the specific language governing permissions and limitations under
-the License.
-
+h1. Akka: RESTful Distributed Persistent Transactional Actors + +h3. "http://akkasource.org":http://akkasource.org + +Akka implements a unique hybrid of: +* The Actor model (Actors and Active Objects), which gives you: +** Concurrency (high-level and simple) +** Asynchronous, non-blocking and highly performant components. +** Supervision with "let-it-crash" semantics. Components are loosely coupled and restarted upon failure. +* Software Transactional Memory (STM). +* BASE and ACID persistence - Pluggable Eventually Consistent or ACID distributed scalable persistent storage. +* Remoting - Distributed services with supervision and error management +* REST (JAX-RS) and Comet bindings. +* Monitoring and Management + +Akka can be used in two different ways: +* As a library: used by a web app, to be put into ‘WEB-INF/lib’ +* As a kernel: stand-alone kernel, embedding the servlet container + +See the "Use-case and Deployment Scenarios":http://wiki.github.com/jboner/akka/use-case-and-deployment-scenarios for details. + +h1. What's Akka all about? Why should I care? + +If you are new to Akka then I suggest you start with either the: + +* "High Level View":http://wiki.github.com/jboner/akka/modules-the-high-level-view; which is outlining the different modules in Akka. +* "Use-case and Deployment Scenarios":http://wiki.github.com/jboner/akka/use-case-and-deployment-scenarios; outlining how and in which use-case and deployment scenarios can I use Akka? +* "Examples":http://wiki.github.com/jboner/akka/examples; showing how to build a RESTful, transactional, persistent Active Object and Actor. + +After that you can dive into the "Reference Manual":http://wiki.github.com/jboner/akka/akka-reference-manual. + +h1. Documentation + +Akka has pretty thorough "reference documentation":https://github.com/jboner/akka/wikis. Covering examples, APIs and configuration. + +h1. Distribution + +The latest distribution can be found in the "downloads section":https://github.com/jboner/akka/downloads + +h1. Mailing List + +If you have questions and/or feedback: please sign up to the Akka User mailing list: +"http://groups.google.com/group/akka-user":http://groups.google.com/group/akka-user + +h1. Professional Support + +Scalable Solutions AB is providing a variety of professional support packages for Akka, please visit their website for details: +"http://scalablesolutions.se":http://scalablesolutions.se + +h1. License + +
+This software is licensed under the Apache 2 license, quoted below.
+
+Copyright 2009 Scalable Solutions AB 
+
+Licensed under the Apache License, Version 2.0 (the "License"); you may not
+use this file except in compliance with the License. You may obtain a copy of
+the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+License for the specific language governing permissions and limitations under
+the License.
+
diff --git a/kernel/pom.xml b/kernel/pom.xml index fd13176fcd..79c20b2a1c 100644 --- a/kernel/pom.xml +++ b/kernel/pom.xml @@ -125,6 +125,11 @@ cassidy 0.1 + + commons-pool + commons-pool + 1.5.1 + diff --git a/kernel/src/main/scala/Kernel.scala b/kernel/src/main/scala/Kernel.scala index 84d87fd2f4..5aef3fe949 100644 --- a/kernel/src/main/scala/Kernel.scala +++ b/kernel/src/main/scala/Kernel.scala @@ -16,7 +16,7 @@ import net.lag.configgy.{Config, Configgy, RuntimeEnvironment} import kernel.jersey.AkkaServlet import kernel.nio.RemoteServer -import kernel.state.CassandraStorage +import kernel.state.EmbeddedCassandraStorage import kernel.util.Logging /** @@ -119,7 +119,7 @@ object Kernel extends Logging { private[akka] def startCassandra = if (config.getBool("akka.storage.cassandra.service", true)) { System.setProperty("cassandra", "") System.setProperty("storage-config", akka.Boot.CONFIG + "/") - CassandraStorage.start + EmbeddedCassandraStorage.start } private[akka] def startJersey = { @@ -159,7 +159,7 @@ object Kernel extends Logging { println("=================================================") var start = System.currentTimeMillis - for (i <- 1 to NR_ENTRIES) CassandraStorage.insertMapStorageEntryFor("test", i.toString, "data") + for (i <- 1 to NR_ENTRIES) EmbeddedCassandraStorage.insertMapStorageEntryFor("test", i.toString, "data") var end = System.currentTimeMillis println("Writes per second: " + NR_ENTRIES / ((end - start).toDouble / 1000)) @@ -167,13 +167,13 @@ object Kernel extends Logging { start = System.currentTimeMillis val entries = new scala.collection.mutable.ArrayBuffer[Tuple2[String, String]] for (i <- 1 to NR_ENTRIES) entries += (i.toString, "data") - CassandraStorage.insertMapStorageEntriesFor("test", entries.toList) + EmbeddedCassandraStorage.insertMapStorageEntriesFor("test", entries.toList) end = System.currentTimeMillis println("Writes per second - batch: " + NR_ENTRIES / ((end - start).toDouble / 1000)) println("=================================================") start = System.currentTimeMillis - for (i <- 1 to NR_ENTRIES) CassandraStorage.getMapStorageEntryFor("test", i.toString) + for (i <- 1 to NR_ENTRIES) EmbeddedCassandraStorage.getMapStorageEntryFor("test", i.toString) end = System.currentTimeMillis println("Reads per second: " + NR_ENTRIES / ((end - start).toDouble / 1000)) diff --git a/kernel/src/main/scala/state/CassandraStorage.scala b/kernel/src/main/scala/state/CassandraStorage.scala index 151d37126a..5cd46ec9a2 100644 --- a/kernel/src/main/scala/state/CassandraStorage.scala +++ b/kernel/src/main/scala/state/CassandraStorage.scala @@ -18,6 +18,185 @@ import org.apache.thrift.transport.TServerSocket import org.apache.thrift.transport.TTransportFactory import org.apache.thrift.TProcessorFactory +/** + * NOTE: requires command line options: + *
+ * -Dcassandra -Dstorage-config=config/ -Dpidfile=akka.pid + *

+ * @author Jonas Bonér + */ +object EmbeddedCassandraStorage extends Logging { + val TABLE_NAME = "akka" + val MAP_COLUMN_FAMILY = "map" + val VECTOR_COLUMN_FAMILY = "vector" + val REF_COLUMN_FAMILY = "ref:item" + + val IS_ASCENDING = true + + val RUN_THRIFT_SERVICE = kernel.Kernel.config.getBool("akka.storage.cassandra.thrift-server.service", false) + val BLOCKING_CALL = { + if (kernel.Kernel.config.getBool("akka.storage.cassandra.blocking", true)) 0 + else 1 + } + + @volatile private[this] var isRunning = false + private[this] val serializer: Serializer = { + kernel.Kernel.config.getString("akka.storage.cassandra.storage-format", "java") match { + case "scala-json" => Serializer.ScalaJSON + case "java-json" => Serializer.JavaJSON + //case "sbinary" => Serializer.SBinary + case "java" => Serializer.Java + case "avro" => throw new UnsupportedOperationException("Avro serialization protocol is not yet supported") + case unknown => throw new UnsupportedOperationException("Unknown storage serialization protocol [" + unknown + "]") + } + } + + // TODO: is this server thread-safe or needed to be wrapped up in an actor? + private[this] val server = classOf[CassandraServer].newInstance.asInstanceOf[CassandraServer] + + private[this] var thriftServer: CassandraThriftServer = _ + + def start = synchronized { + if (!isRunning) { + try { + server.start + log.info("Cassandra persistent storage has started up successfully"); + } catch { + case e => + log.error("Could not start up Cassandra persistent storage") + throw e + } + if (RUN_THRIFT_SERVICE) { + thriftServer = new CassandraThriftServer(server) + thriftServer.start + } + isRunning + } + } + + def stop = if (isRunning) { + //server.storageService.shutdown + if (RUN_THRIFT_SERVICE) thriftServer.stop + } + + // =============================================================== + // For Ref + // =============================================================== + + def insertRefStorageFor(name: String, element: AnyRef) = { + server.insert( + TABLE_NAME, + name, + REF_COLUMN_FAMILY, + serializer.out(element), + System.currentTimeMillis, + BLOCKING_CALL) + } + + def getRefStorageFor(name: String): Option[AnyRef] = { + try { + val column = server.get_column(TABLE_NAME, name, REF_COLUMN_FAMILY) + Some(serializer.in(column.value, None)) + } catch { + case e => + e.printStackTrace + None + } + } + + // =============================================================== + // For Vector + // =============================================================== + + def insertVectorStorageEntryFor(name: String, element: AnyRef) = { + server.insert( + TABLE_NAME, + name, + VECTOR_COLUMN_FAMILY + ":" + getVectorStorageSizeFor(name), + serializer.out(element), + System.currentTimeMillis, + BLOCKING_CALL) + } + + def getVectorStorageEntryFor(name: String, index: Int): AnyRef = { + try { + val column = server.get_column(TABLE_NAME, name, VECTOR_COLUMN_FAMILY + ":" + index) + serializer.in(column.value, None) + } catch { + case e => + e.printStackTrace + throw new Predef.NoSuchElementException(e.getMessage) + } + } + + def getVectorStorageRangeFor(name: String, start: Int, count: Int): List[AnyRef] = + server.get_slice(TABLE_NAME, name, VECTOR_COLUMN_FAMILY, IS_ASCENDING, count) + .toArray.toList.asInstanceOf[List[Tuple2[String, AnyRef]]].map(tuple => tuple._2) + + def getVectorStorageSizeFor(name: String): Int = + server.get_column_count(TABLE_NAME, name, VECTOR_COLUMN_FAMILY) + + // =============================================================== + // For Map + // =============================================================== + + def insertMapStorageEntryFor(name: String, key: String, value: AnyRef) = { + server.insert( + TABLE_NAME, + name, + MAP_COLUMN_FAMILY + ":" + key, + serializer.out(value), + System.currentTimeMillis, + BLOCKING_CALL) + } + + def insertMapStorageEntriesFor(name: String, entries: List[Tuple2[String, AnyRef]]) = { + import java.util.{Map, HashMap, List, ArrayList} + val columns: Map[String, List[column_t]] = new HashMap + for (entry <- entries) { + val cls: List[column_t] = new ArrayList + cls.add(new column_t(entry._1, serializer.out(entry._2), System.currentTimeMillis)) + columns.put(MAP_COLUMN_FAMILY, cls) + } + server.batch_insert(new batch_mutation_t( + TABLE_NAME, + name, + columns), + BLOCKING_CALL) + } + + def getMapStorageEntryFor(name: String, key: AnyRef): Option[AnyRef] = { + try { + val column = server.get_column(TABLE_NAME, name, MAP_COLUMN_FAMILY + ":" + key) + Some(serializer.in(column.value, None)) + } catch { + case e => + e.printStackTrace + None + } + } + + def getMapStorageFor(name: String): List[Tuple2[String, AnyRef]] = { + val columns = server.get_columns_since(TABLE_NAME, name, MAP_COLUMN_FAMILY, -1) + .toArray.toList.asInstanceOf[List[org.apache.cassandra.service.column_t]] + for { + column <- columns + col = (column.columnName, serializer.in(column.value, None)) + } yield col + } + + def getMapStorageSizeFor(name: String): Int = + server.get_column_count(TABLE_NAME, name, MAP_COLUMN_FAMILY) + + def removeMapStorageFor(name: String) = + server.remove(TABLE_NAME, name, MAP_COLUMN_FAMILY, System.currentTimeMillis, BLOCKING_CALL) + + def getMapStorageRangeFor(name: String, start: Int, count: Int): List[Tuple2[String, AnyRef]] = { + server.get_slice(TABLE_NAME, name, MAP_COLUMN_FAMILY, IS_ASCENDING, count) + .toArray.toList.asInstanceOf[List[Tuple2[String, AnyRef]]] + } +} + /** * NOTE: requires command line options: *
@@ -50,6 +229,18 @@ object CassandraStorage extends Logging { case unknown => throw new UnsupportedOperationException("Unknown storage serialization protocol [" + unknown + "]") } } + + implicit def strToBytes(s : String) = s.getBytes("UTF-8") + import scala.collection.jcl.Conversions._ + import se.foldleft.pool._ + import se.foldleft.cassidy._ + val cassidy = new Cassidy(StackPool(SocketProvider("localhost", 9160)), Protocol.Binary) // or JSON + cassidy.doWork { s => { + val user_id = "1" + s.++|("users", user_id, "base_attributes:name", "Lord Foo Bar", false) + s.++|("users", user_id, "base_attributes:age", "24", false) + for( i <- s./("users", user_id, "base_attributes", None, None).toList) println(i) + }} // TODO: is this server thread-safe or needed to be wrapped up in an actor? private[this] val server = classOf[CassandraServer].newInstance.asInstanceOf[CassandraServer] diff --git a/kernel/src/main/scala/state/State.scala b/kernel/src/main/scala/state/State.scala index 8967ed3dd8..32a0ea9888 100644 --- a/kernel/src/main/scala/state/State.scala +++ b/kernel/src/main/scala/state/State.scala @@ -13,7 +13,7 @@ import scala.collection.mutable.{ArrayBuffer, HashMap} sealed abstract class TransactionalStateConfig abstract class PersistentStorageConfig extends TransactionalStateConfig -case class CassandraStorageConfig extends PersistentStorageConfig +case class EmbeddedCassandraStorageConfig extends PersistentStorageConfig case class TerracottaStorageConfig extends PersistentStorageConfig case class TokyoCabinetStorageConfig extends PersistentStorageConfig @@ -22,7 +22,7 @@ case class TokyoCabinetStorageConfig extends PersistentStorageConfig *

* Example Scala usage: *

- * val myMap = TransactionalState.newPersistentMap(CassandraStorageConfig)
+ * val myMap = TransactionalState.newPersistentMap(EmbeddedCassandraStorageConfig)
  * 
*/ object TransactionalState extends TransactionalState @@ -33,24 +33,24 @@ object TransactionalState extends TransactionalState * Example Java usage: *
  * TransactionalState state = new TransactionalState();
- * TransactionalMap myMap = state.newPersistentMap(new CassandraStorageConfig());
+ * TransactionalMap myMap = state.newPersistentMap(new EmbeddedCassandraStorageConfig());
  * 
*/ class TransactionalState { def newPersistentMap(config: PersistentStorageConfig): TransactionalMap[String, AnyRef] = config match { - case CassandraStorageConfig() => new CassandraPersistentTransactionalMap + case EmbeddedCassandraStorageConfig() => new CassandraPersistentTransactionalMap case TerracottaStorageConfig() => throw new UnsupportedOperationException case TokyoCabinetStorageConfig() => throw new UnsupportedOperationException } def newPersistentVector(config: PersistentStorageConfig): TransactionalVector[AnyRef] = config match { - case CassandraStorageConfig() => new CassandraPersistentTransactionalVector + case EmbeddedCassandraStorageConfig() => new CassandraPersistentTransactionalVector case TerracottaStorageConfig() => throw new UnsupportedOperationException case TokyoCabinetStorageConfig() => throw new UnsupportedOperationException } def newPersistentRef(config: PersistentStorageConfig): TransactionalRef[AnyRef] = config match { - case CassandraStorageConfig() => new CassandraPersistentTransactionalRef + case EmbeddedCassandraStorageConfig() => new CassandraPersistentTransactionalRef case TerracottaStorageConfig() => throw new UnsupportedOperationException case TokyoCabinetStorageConfig() => throw new UnsupportedOperationException } @@ -208,7 +208,7 @@ class CassandraPersistentTransactionalMap extends PersistentTransactionalMap[Str override def getRange(start: Int, count: Int) = { verifyTransaction try { - CassandraStorage.getMapStorageRangeFor(uuid, start, count) + EmbeddedCassandraStorage.getMapStorageRangeFor(uuid, start, count) } catch { case e: Exception => Nil } @@ -216,7 +216,7 @@ class CassandraPersistentTransactionalMap extends PersistentTransactionalMap[Str // ---- For Transactional ---- override def commit = { - CassandraStorage.insertMapStorageEntriesFor(uuid, changeSet.toList) + EmbeddedCassandraStorage.insertMapStorageEntriesFor(uuid, changeSet.toList) changeSet.clear } @@ -224,7 +224,7 @@ class CassandraPersistentTransactionalMap extends PersistentTransactionalMap[Str override def clear = { verifyTransaction try { - CassandraStorage.removeMapStorageFor(uuid) + EmbeddedCassandraStorage.removeMapStorageFor(uuid) } catch { case e: Exception => {} } @@ -233,7 +233,7 @@ class CassandraPersistentTransactionalMap extends PersistentTransactionalMap[Str override def contains(key: String): Boolean = { try { verifyTransaction - CassandraStorage.getMapStorageEntryFor(uuid, key).isDefined + EmbeddedCassandraStorage.getMapStorageEntryFor(uuid, key).isDefined } catch { case e: Exception => false } @@ -242,7 +242,7 @@ class CassandraPersistentTransactionalMap extends PersistentTransactionalMap[Str override def size: Int = { verifyTransaction try { - CassandraStorage.getMapStorageSizeFor(uuid) + EmbeddedCassandraStorage.getMapStorageSizeFor(uuid) } catch { case e: Exception => 0 } @@ -254,7 +254,7 @@ class CassandraPersistentTransactionalMap extends PersistentTransactionalMap[Str // if (changeSet.contains(key)) changeSet.get(key) // else { val result = try { - CassandraStorage.getMapStorageEntryFor(uuid, key) + EmbeddedCassandraStorage.getMapStorageEntryFor(uuid, key) } catch { case e: Exception => None } @@ -266,7 +266,7 @@ class CassandraPersistentTransactionalMap extends PersistentTransactionalMap[Str //verifyTransaction new Iterator[Tuple2[String, AnyRef]] { private val originalList: List[Tuple2[String, AnyRef]] = try { - CassandraStorage.getMapStorageFor(uuid) + EmbeddedCassandraStorage.getMapStorageFor(uuid) } catch { case e: Throwable => Nil } @@ -388,17 +388,17 @@ class CassandraPersistentTransactionalVector extends PersistentTransactionalVect override def get(index: Int): AnyRef = { verifyTransaction if (changeSet.size > index) changeSet(index) - else CassandraStorage.getVectorStorageEntryFor(uuid, index) + else EmbeddedCassandraStorage.getVectorStorageEntryFor(uuid, index) } override def getRange(start: Int, count: Int): List[AnyRef] = { verifyTransaction - CassandraStorage.getVectorStorageRangeFor(uuid, start, count) + EmbeddedCassandraStorage.getVectorStorageRangeFor(uuid, start, count) } override def length: Int = { verifyTransaction - CassandraStorage.getVectorStorageSizeFor(uuid) + EmbeddedCassandraStorage.getVectorStorageSizeFor(uuid) } override def apply(index: Int): AnyRef = get(index) @@ -415,7 +415,7 @@ class CassandraPersistentTransactionalVector extends PersistentTransactionalVect // ---- For Transactional ---- override def commit = { // FIXME: should use batch function once the bug is resolved - for (element <- changeSet) CassandraStorage.insertVectorStorageEntryFor(uuid, element) + for (element <- changeSet) EmbeddedCassandraStorage.insertVectorStorageEntryFor(uuid, element) changeSet.clear } } @@ -460,7 +460,7 @@ class TransactionalRef[T] extends Transactional { class CassandraPersistentTransactionalRef extends TransactionalRef[AnyRef] { override def commit = if (ref.isDefined) { - CassandraStorage.insertRefStorageFor(uuid, ref.get) + EmbeddedCassandraStorage.insertRefStorageFor(uuid, ref.get) ref = None } @@ -468,7 +468,7 @@ class CassandraPersistentTransactionalRef extends TransactionalRef[AnyRef] { override def get: Option[AnyRef] = { verifyTransaction - CassandraStorage.getRefStorageFor(uuid) + EmbeddedCassandraStorage.getRefStorageFor(uuid) } override def isDefined: Boolean = get.isDefined @@ -478,4 +478,4 @@ class CassandraPersistentTransactionalRef extends TransactionalRef[AnyRef] { if (ref.isDefined) ref else default } -} \ No newline at end of file +} diff --git a/kernel/src/test/scala/PersistentActorSpec.scala b/kernel/src/test/scala/PersistentActorSpec.scala index 0d8b464dd1..990d7e5d83 100644 --- a/kernel/src/test/scala/PersistentActorSpec.scala +++ b/kernel/src/test/scala/PersistentActorSpec.scala @@ -7,16 +7,16 @@ import junit.framework.TestCase import kernel.Kernel import kernel.reactor._ -import kernel.state.{CassandraStorageConfig, TransactionalState} +import kernel.state.{EmbeddedCassandraStorageConfig, TransactionalState} import org.junit.{Test, Before} import org.junit.Assert._ class PersistentActor extends Actor { timeout = 100000 makeTransactionRequired - private val mapState = TransactionalState.newPersistentMap(CassandraStorageConfig()) - private val vectorState = TransactionalState.newPersistentVector(CassandraStorageConfig()) - private val refState = TransactionalState.newPersistentRef(CassandraStorageConfig()) + private val mapState = TransactionalState.newPersistentMap(EmbeddedCassandraStorageConfig()) + private val vectorState = TransactionalState.newPersistentVector(EmbeddedCassandraStorageConfig()) + private val refState = TransactionalState.newPersistentRef(EmbeddedCassandraStorageConfig()) def receive: PartialFunction[Any, Unit] = { case GetMapState(key) => diff --git a/samples-scala/src/main/scala/SimpleService.scala b/samples-scala/src/main/scala/SimpleService.scala index b29dbef6ec..dca7efdd5c 100644 --- a/samples-scala/src/main/scala/SimpleService.scala +++ b/samples-scala/src/main/scala/SimpleService.scala @@ -1,7 +1,7 @@ package sample.scala import javax.ws.rs.{Path, GET, Produces} -import se.scalablesolutions.akka.kernel.state.{TransactionalState, TransactionalMap, CassandraStorageConfig} +import se.scalablesolutions.akka.kernel.state.{TransactionalState, TransactionalMap, EmbeddedCassandraStorageConfig} import se.scalablesolutions.akka.kernel.actor.{Supervisor, SupervisorFactory, Actor, StartSupervisor} import se.scalablesolutions.akka.kernel.config.ScalaConfig._ @@ -35,7 +35,7 @@ class SimpleService extends Actor { case object Tick private val KEY = "COUNTER"; private var hasStartedTicking = false; - private val storage = TransactionalState.newPersistentMap(CassandraStorageConfig()) + private val storage = TransactionalState.newPersistentMap(EmbeddedCassandraStorageConfig()) @GET @Produces(Array("application/json")) @@ -58,4 +58,4 @@ class SimpleService extends Actor { override protected def postRestart(reason: AnyRef, config: Option[AnyRef]) = { println("Restarting due to: " + reason.asInstanceOf[Exception].getMessage) } -} \ No newline at end of file +}