diff --git a/akka.ipr b/akka.ipr
index ab65121d85..226c8db6c4 100644
--- a/akka.ipr
+++ b/akka.ipr
@@ -1869,6 +1869,17 @@
+
+
+
+
+
+
+
+
+
+
+
@@ -1913,17 +1924,6 @@
-
-
-
-
-
-
-
-
-
-
-
diff --git a/akka.iws b/akka.iws
index d06ec27572..bdf018e8e4 100644
--- a/akka.iws
+++ b/akka.iws
@@ -6,7 +6,12 @@
-
+
+
+
+
+
+
@@ -89,151 +94,15 @@
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
+
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
+
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
@@ -247,22 +116,22 @@
@@ -322,116 +191,6 @@
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
@@ -533,36 +292,6 @@
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
@@ -593,66 +322,6 @@
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
@@ -721,40 +390,78 @@
-
+
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
@@ -765,6 +472,20 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
@@ -1972,16 +1693,16 @@
-
+
-
+
+
-
@@ -1992,7 +1713,7 @@
-
+
@@ -2035,45 +1756,6 @@
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
@@ -2115,27 +1797,6 @@
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
@@ -2164,41 +1825,74 @@
-
-
-
-
-
+
+
+
+
+
+
+
+
-
+
-
+
-
+
+
+
+
+
+
+
+
-
+
+
+
+
+
+
+
+
+
+
+
+
-
+
-
-
-
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistenceManager.java b/fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistenceManager.java
index 8e498368d5..b059c81657 100644
--- a/fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistenceManager.java
+++ b/fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistenceManager.java
@@ -4,9 +4,7 @@ public class PersistenceManager {
private static volatile boolean isRunning = false;
public static void init() {
if (!isRunning) {
- se.scalablesolutions.akka.kernel.Kernel.config();
- se.scalablesolutions.akka.kernel.Kernel.startCassandra();
- se.scalablesolutions.akka.kernel.Kernel.startRemoteService();
+ se.scalablesolutions.akka.kernel.Kernel.startRemoteService();
isRunning = true;
}
}
diff --git a/kernel/src/main/scala/Kernel.scala b/kernel/src/main/scala/Kernel.scala
index ada31e9f91..a77daaa63b 100644
--- a/kernel/src/main/scala/Kernel.scala
+++ b/kernel/src/main/scala/Kernel.scala
@@ -63,17 +63,6 @@ object Kernel extends Logging {
if (RUN_REMOTE_SERVICE) startRemoteService
if (RUN_MANAGEMENT_SERVICE) startManagementService
-
- STORAGE_SYSTEM match {
- case "cassandra" => startCassandra
- case "terracotta" => throw new UnsupportedOperationException("terracotta storage backend is not yet supported")
- case "mongodb" => throw new UnsupportedOperationException("mongodb storage backend is not yet supported")
- case "redis" => throw new UnsupportedOperationException("redis storage backend is not yet supported")
- case "voldemort" => throw new UnsupportedOperationException("voldemort storage backend is not yet supported")
- case "tokyo-cabinet" => throw new UnsupportedOperationException("tokyo-cabinet storage backend is not yet supported")
- case _ => throw new UnsupportedOperationException("Unknown storage system [" + STORAGE_SYSTEM + "]")
- }
-
if (RUN_REST_SERVICE) startREST
Thread.currentThread.setContextClassLoader(getClass.getClassLoader)
@@ -84,7 +73,7 @@ object Kernel extends Logging {
def uptime = (System.currentTimeMillis - startTime) / 1000
- def setupConfig: Config = {
+ private def setupConfig: Config = {
if (HOME.isDefined) {
try {
val configFile = HOME.get + "/config/akka.conf"
@@ -143,14 +132,7 @@ object Kernel extends Logging {
log.info("Management service started successfully.")
}
- private[akka] def startCassandra = if (config.getBool("akka.storage.cassandra.service", true)) {
- System.setProperty("cassandra", "")
- if (HOME.isDefined) System.setProperty("storage-config", HOME.get + "/config/")
- else if (System.getProperty("storage-config", "NIL") == "NIL") throw new IllegalStateException("AKKA_HOME and -Dstorage-config=... is not set. Can't start up Cassandra. Either set AKKA_HOME or set the -Dstorage-config=... variable to the directory with the Cassandra storage-conf.xml file.")
- CassandraStorage.start
- }
-
- private[akka] def startREST = {
+ def startREST = {
val uri = UriBuilder.fromUri(REST_URL).port(REST_PORT).build()
val scheme = uri.getScheme
diff --git a/kernel/src/main/scala/state/CassandraStorage.scala b/kernel/src/main/scala/state/CassandraStorage.scala
index fdc6e607db..10ce3722d7 100644
--- a/kernel/src/main/scala/state/CassandraStorage.scala
+++ b/kernel/src/main/scala/state/CassandraStorage.scala
@@ -57,47 +57,28 @@ object CassandraStorage extends MapStorage
}
}
- private[this] var sessions: Option[CassandraSessionPool[_]] = None
-
- def start = synchronized {
- if (!isRunning) {
- try {
- sessions = Some(new CassandraSessionPool(
- KEYSPACE,
- StackPool(SocketProvider(CASSANDRA_SERVER_HOSTNAME, CASSANDRA_SERVER_PORT)),
- protocol,
- CONSISTENCY_LEVEL))
- log.info("Cassandra persistent storage has started up successfully");
- } catch {
- case e =>
- log.error("Could not start up Cassandra persistent storage")
- throw e
- }
- isRunning
- }
- }
-
- def stop = synchronized {
- if (isRunning && sessions.isDefined) sessions.get.close
- }
-
+ private[this] var sessions = new CassandraSessionPool(
+ KEYSPACE,
+ StackPool(SocketProvider(CASSANDRA_SERVER_HOSTNAME, CASSANDRA_SERVER_PORT)),
+ protocol,
+ CONSISTENCY_LEVEL)
// ===============================================================
// For Ref
// ===============================================================
- override def insertRefStorageFor(name: String, element: AnyRef) = if (sessions.isDefined) {
- sessions.get.withSession {
+ override def insertRefStorageFor(name: String, element: AnyRef) = {
+ sessions.withSession {
_ ++| (name,
new ColumnPath(REF_COLUMN_PARENT.getColumn_family, null, REF_KEY),
serializer.out(element),
System.currentTimeMillis,
CONSISTENCY_LEVEL)
}
- } else throw new IllegalStateException("CassandraStorage is not started")
+ }
- override def getRefStorageFor(name: String): Option[AnyRef] = if (sessions.isDefined) {
+ override def getRefStorageFor(name: String): Option[AnyRef] = {
try {
- val column: Option[Column] = sessions.get.withSession {
+ val column: Option[Column] = sessions.withSession {
_ | (name, new ColumnPath(REF_COLUMN_PARENT.getColumn_family, null, REF_KEY))
}
if (column.isDefined) Some(serializer.in(column.get.value, None))
@@ -107,37 +88,37 @@ object CassandraStorage extends MapStorage
e.printStackTrace
None
}
- } else throw new IllegalStateException("CassandraStorage is not started")
+ }
// ===============================================================
// For Vector
// ===============================================================
- override def insertVectorStorageEntryFor(name: String, element: AnyRef) = if (sessions.isDefined) {
- sessions.get.withSession {
+ override def insertVectorStorageEntryFor(name: String, element: AnyRef) = {
+ sessions.withSession {
_ ++| (name,
new ColumnPath(VECTOR_COLUMN_PARENT.getColumn_family, null, intToBytes(getVectorStorageSizeFor(name))),
serializer.out(element),
System.currentTimeMillis,
CONSISTENCY_LEVEL)
}
- } else throw new IllegalStateException("CassandraStorage is not started")
+ }
override def insertVectorStorageEntriesFor(name: String, elements: List[AnyRef]) = {
}
- override def getVectorStorageEntryFor(name: String, index: Int): AnyRef = if (sessions.isDefined) {
- val column: Option[Column] = sessions.get.withSession {
+ override def getVectorStorageEntryFor(name: String, index: Int): AnyRef = {
+ val column: Option[Column] = sessions.withSession {
_ | (name, new ColumnPath(VECTOR_COLUMN_PARENT.getColumn_family, null, intToBytes(index)))
}
if (column.isDefined) serializer.in(column.get.value, None)
else throw new NoSuchElementException("No element for vector [" + name + "] and index [" + index + "]")
- } else throw new IllegalStateException("CassandraStorage is not started")
+ }
- override def getVectorStorageRangeFor(name: String, start: Option[Int], finish: Option[Int], count: Int): List[AnyRef] = if (sessions.isDefined) {
+ override def getVectorStorageRangeFor(name: String, start: Option[Int], finish: Option[Int], count: Int): List[AnyRef] = {
val startBytes = if (start.isDefined) intToBytes(start.get) else null
val finishBytes = if (finish.isDefined) intToBytes(finish.get) else null
- val columns: List[Column] = sessions.get.withSession {
+ val columns: List[Column] = sessions.withSession {
_ / (name,
VECTOR_COLUMN_PARENT,
startBytes, finishBytes,
@@ -146,43 +127,43 @@ object CassandraStorage extends MapStorage
CONSISTENCY_LEVEL)
}
columns.map(column => serializer.in(column.value, None))
- } else throw new IllegalStateException("CassandraStorage is not started")
+ }
- override def getVectorStorageSizeFor(name: String): Int = if (sessions.isDefined) {
- sessions.get.withSession {
+ override def getVectorStorageSizeFor(name: String): Int = {
+ sessions.withSession {
_ |# (name, VECTOR_COLUMN_PARENT)
}
- } else throw new IllegalStateException("CassandraStorage is not started")
+ }
// ===============================================================
// For Map
// ===============================================================
- override def insertMapStorageEntryFor(name: String, key: AnyRef, element: AnyRef) = if (sessions.isDefined) {
- sessions.get.withSession {
+ override def insertMapStorageEntryFor(name: String, key: AnyRef, element: AnyRef) = {
+ sessions.withSession {
_ ++| (name,
new ColumnPath(MAP_COLUMN_PARENT.getColumn_family, null, serializer.out(key)),
serializer.out(element),
System.currentTimeMillis,
CONSISTENCY_LEVEL)
}
- } else throw new IllegalStateException("CassandraStorage is not started")
+ }
- override def insertMapStorageEntriesFor(name: String, entries: List[Tuple2[AnyRef, AnyRef]]) = if (sessions.isDefined) {
+ override def insertMapStorageEntriesFor(name: String, entries: List[Tuple2[AnyRef, AnyRef]]) = {
val cf2columns: java.util.Map[String, java.util.List[Column]] = new java.util.HashMap
for (entry <- entries) {
val columns: java.util.List[Column] = new java.util.ArrayList
columns.add(new Column(serializer.out(entry._1), serializer.out(entry._2), System.currentTimeMillis))
cf2columns.put(MAP_COLUMN_PARENT.getColumn_family, columns)
}
- sessions.get.withSession {
+ sessions.withSession {
_ ++| (new BatchMutation(name, cf2columns), CONSISTENCY_LEVEL)
}
- } else throw new IllegalStateException("CassandraStorage is not started")
+ }
- override def getMapStorageEntryFor(name: String, key: AnyRef): Option[AnyRef] = if (sessions.isDefined) {
+ override def getMapStorageEntryFor(name: String, key: AnyRef): Option[AnyRef] = {
try {
- val column: Option[Column] = sessions.get.withSession {
+ val column: Option[Column] = sessions.withSession {
_ | (name, new ColumnPath(MAP_COLUMN_PARENT.getColumn_family, null, serializer.out(key)))
}
if (column.isDefined) Some(serializer.in(column.get.value, None))
@@ -192,9 +173,9 @@ object CassandraStorage extends MapStorage
e.printStackTrace
None
}
- } else throw new IllegalStateException("CassandraStorage is not started")
+ }
- override def getMapStorageFor(name: String): List[Tuple2[AnyRef, AnyRef]] = if (sessions.isDefined) {
+ override def getMapStorageFor(name: String): List[Tuple2[AnyRef, AnyRef]] = {
throw new UnsupportedOperationException
/*
val columns = server.get_columns_since(name, MAP_COLUMN_FAMILY, -1)
@@ -204,35 +185,35 @@ object CassandraStorage extends MapStorage
col = (column.columnName, column.value)
} yield col
*/
- } else throw new IllegalStateException("CassandraStorage is not started")
+ }
- override def getMapStorageSizeFor(name: String): Int = if (sessions.isDefined) {
- sessions.get.withSession {
+ override def getMapStorageSizeFor(name: String): Int = {
+ sessions.withSession {
_ |# (name, MAP_COLUMN_PARENT)
}
- } else throw new IllegalStateException("CassandraStorage is not started")
+ }
override def removeMapStorageFor(name: String): Unit = removeMapStorageFor(name, null)
- override def removeMapStorageFor(name: String, key: AnyRef): Unit = if (sessions.isDefined) {
+ override def removeMapStorageFor(name: String, key: AnyRef): Unit = {
val keyBytes = if (key == null) null else serializer.out(key)
- sessions.get.withSession {
+ sessions.withSession {
_ -- (name,
new ColumnPathOrParent(MAP_COLUMN_PARENT.getColumn_family, null, keyBytes),
System.currentTimeMillis,
CONSISTENCY_LEVEL)
}
- } else throw new IllegalStateException("CassandraStorage is not started")
+ }
override def getMapStorageRangeFor(name: String, start: Option[AnyRef], finish: Option[AnyRef], count: Int):
- List[Tuple2[AnyRef, AnyRef]] = if (sessions.isDefined) {
+ List[Tuple2[AnyRef, AnyRef]] = {
val startBytes = if (start.isDefined) serializer.out(start.get) else null
val finishBytes = if (finish.isDefined) serializer.out(finish.get) else null
- val columns: List[Column] = sessions.get.withSession {
+ val columns: List[Column] = sessions.withSession {
_ / (name, MAP_COLUMN_PARENT, startBytes, finishBytes, IS_ASCENDING, count, CONSISTENCY_LEVEL)
}
columns.map(column => (column.name, serializer.in(column.value, None)))
- } else throw new IllegalStateException("CassandraStorage is not started")
+ }
}
/**
diff --git a/kernel/src/test/scala/PersistentActorSpec.scala b/kernel/src/test/scala/PersistentActorSpec.scala
index 0d8b464dd1..07002e27eb 100644
--- a/kernel/src/test/scala/PersistentActorSpec.scala
+++ b/kernel/src/test/scala/PersistentActorSpec.scala
@@ -56,15 +56,7 @@ class PersistentActor extends Actor {
}
}
-object PersistenceManager {
- @volatile var isRunning = false
- def init = if (!isRunning) {
- Kernel.startCassandra
- isRunning = true
- }
-}
class PersistentActorSpec extends TestCase {
- PersistenceManager.init
@Test
def testMapShouldNotRollbackStateForStatefulServerInCaseOfSuccess = {
diff --git a/util-java/akka-util-java.iml b/util-java/akka-util-java.iml
index 646310bd42..a80717f779 100644
--- a/util-java/akka-util-java.iml
+++ b/util-java/akka-util-java.iml
@@ -12,7 +12,6 @@
-