From 5d41b79b277011c4c839f2447541ee970c779a2c Mon Sep 17 00:00:00 2001 From: jboner Date: Wed, 19 Aug 2009 12:56:33 +0200 Subject: [PATCH] removed Cassandra startup procedure --- akka.ipr | 22 +- akka.iws | 596 +++++------------- .../akka/api/PersistenceManager.java | 4 +- kernel/src/main/scala/Kernel.scala | 22 +- .../main/scala/state/CassandraStorage.scala | 105 ++- .../src/test/scala/PersistentActorSpec.scala | 8 - util-java/akka-util-java.iml | 1 - 7 files changed, 202 insertions(+), 556 deletions(-) diff --git a/akka.ipr b/akka.ipr index ab65121d85..226c8db6c4 100644 --- a/akka.ipr +++ b/akka.ipr @@ -1869,6 +1869,17 @@ + + + + + + + + + + + @@ -1913,17 +1924,6 @@ - - - - - - - - - - - diff --git a/akka.iws b/akka.iws index d06ec27572..bdf018e8e4 100644 --- a/akka.iws +++ b/akka.iws @@ -6,7 +6,12 @@ - + + + + + + @@ -89,151 +94,15 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + - - - - - - - - - - - - - - - - - - - - @@ -247,22 +116,22 @@ @@ -322,116 +191,6 @@ @@ -1972,16 +1693,16 @@ - + - + + - @@ -1992,7 +1713,7 @@ - + @@ -2035,45 +1756,6 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - @@ -2115,27 +1797,6 @@ - - - - - - - - - - - - - - - - - - - - - @@ -2164,41 +1825,74 @@ - - - - - + + + + + + + + - + - + - + + + + + + + + - + + + + + + + + + + + + - + - - - - + + + + + + + + + + + + + + + + diff --git a/fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistenceManager.java b/fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistenceManager.java index 8e498368d5..b059c81657 100644 --- a/fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistenceManager.java +++ b/fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistenceManager.java @@ -4,9 +4,7 @@ public class PersistenceManager { private static volatile boolean isRunning = false; public static void init() { if (!isRunning) { - se.scalablesolutions.akka.kernel.Kernel.config(); - se.scalablesolutions.akka.kernel.Kernel.startCassandra(); - se.scalablesolutions.akka.kernel.Kernel.startRemoteService(); + se.scalablesolutions.akka.kernel.Kernel.startRemoteService(); isRunning = true; } } diff --git a/kernel/src/main/scala/Kernel.scala b/kernel/src/main/scala/Kernel.scala index ada31e9f91..a77daaa63b 100644 --- a/kernel/src/main/scala/Kernel.scala +++ b/kernel/src/main/scala/Kernel.scala @@ -63,17 +63,6 @@ object Kernel extends Logging { if (RUN_REMOTE_SERVICE) startRemoteService if (RUN_MANAGEMENT_SERVICE) startManagementService - - STORAGE_SYSTEM match { - case "cassandra" => startCassandra - case "terracotta" => throw new UnsupportedOperationException("terracotta storage backend is not yet supported") - case "mongodb" => throw new UnsupportedOperationException("mongodb storage backend is not yet supported") - case "redis" => throw new UnsupportedOperationException("redis storage backend is not yet supported") - case "voldemort" => throw new UnsupportedOperationException("voldemort storage backend is not yet supported") - case "tokyo-cabinet" => throw new UnsupportedOperationException("tokyo-cabinet storage backend is not yet supported") - case _ => throw new UnsupportedOperationException("Unknown storage system [" + STORAGE_SYSTEM + "]") - } - if (RUN_REST_SERVICE) startREST Thread.currentThread.setContextClassLoader(getClass.getClassLoader) @@ -84,7 +73,7 @@ object Kernel extends Logging { def uptime = (System.currentTimeMillis - startTime) / 1000 - def setupConfig: Config = { + private def setupConfig: Config = { if (HOME.isDefined) { try { val configFile = HOME.get + "/config/akka.conf" @@ -143,14 +132,7 @@ object Kernel extends Logging { log.info("Management service started successfully.") } - private[akka] def startCassandra = if (config.getBool("akka.storage.cassandra.service", true)) { - System.setProperty("cassandra", "") - if (HOME.isDefined) System.setProperty("storage-config", HOME.get + "/config/") - else if (System.getProperty("storage-config", "NIL") == "NIL") throw new IllegalStateException("AKKA_HOME and -Dstorage-config=... is not set. Can't start up Cassandra. Either set AKKA_HOME or set the -Dstorage-config=... variable to the directory with the Cassandra storage-conf.xml file.") - CassandraStorage.start - } - - private[akka] def startREST = { + def startREST = { val uri = UriBuilder.fromUri(REST_URL).port(REST_PORT).build() val scheme = uri.getScheme diff --git a/kernel/src/main/scala/state/CassandraStorage.scala b/kernel/src/main/scala/state/CassandraStorage.scala index fdc6e607db..10ce3722d7 100644 --- a/kernel/src/main/scala/state/CassandraStorage.scala +++ b/kernel/src/main/scala/state/CassandraStorage.scala @@ -57,47 +57,28 @@ object CassandraStorage extends MapStorage } } - private[this] var sessions: Option[CassandraSessionPool[_]] = None - - def start = synchronized { - if (!isRunning) { - try { - sessions = Some(new CassandraSessionPool( - KEYSPACE, - StackPool(SocketProvider(CASSANDRA_SERVER_HOSTNAME, CASSANDRA_SERVER_PORT)), - protocol, - CONSISTENCY_LEVEL)) - log.info("Cassandra persistent storage has started up successfully"); - } catch { - case e => - log.error("Could not start up Cassandra persistent storage") - throw e - } - isRunning - } - } - - def stop = synchronized { - if (isRunning && sessions.isDefined) sessions.get.close - } - + private[this] var sessions = new CassandraSessionPool( + KEYSPACE, + StackPool(SocketProvider(CASSANDRA_SERVER_HOSTNAME, CASSANDRA_SERVER_PORT)), + protocol, + CONSISTENCY_LEVEL) // =============================================================== // For Ref // =============================================================== - override def insertRefStorageFor(name: String, element: AnyRef) = if (sessions.isDefined) { - sessions.get.withSession { + override def insertRefStorageFor(name: String, element: AnyRef) = { + sessions.withSession { _ ++| (name, new ColumnPath(REF_COLUMN_PARENT.getColumn_family, null, REF_KEY), serializer.out(element), System.currentTimeMillis, CONSISTENCY_LEVEL) } - } else throw new IllegalStateException("CassandraStorage is not started") + } - override def getRefStorageFor(name: String): Option[AnyRef] = if (sessions.isDefined) { + override def getRefStorageFor(name: String): Option[AnyRef] = { try { - val column: Option[Column] = sessions.get.withSession { + val column: Option[Column] = sessions.withSession { _ | (name, new ColumnPath(REF_COLUMN_PARENT.getColumn_family, null, REF_KEY)) } if (column.isDefined) Some(serializer.in(column.get.value, None)) @@ -107,37 +88,37 @@ object CassandraStorage extends MapStorage e.printStackTrace None } - } else throw new IllegalStateException("CassandraStorage is not started") + } // =============================================================== // For Vector // =============================================================== - override def insertVectorStorageEntryFor(name: String, element: AnyRef) = if (sessions.isDefined) { - sessions.get.withSession { + override def insertVectorStorageEntryFor(name: String, element: AnyRef) = { + sessions.withSession { _ ++| (name, new ColumnPath(VECTOR_COLUMN_PARENT.getColumn_family, null, intToBytes(getVectorStorageSizeFor(name))), serializer.out(element), System.currentTimeMillis, CONSISTENCY_LEVEL) } - } else throw new IllegalStateException("CassandraStorage is not started") + } override def insertVectorStorageEntriesFor(name: String, elements: List[AnyRef]) = { } - override def getVectorStorageEntryFor(name: String, index: Int): AnyRef = if (sessions.isDefined) { - val column: Option[Column] = sessions.get.withSession { + override def getVectorStorageEntryFor(name: String, index: Int): AnyRef = { + val column: Option[Column] = sessions.withSession { _ | (name, new ColumnPath(VECTOR_COLUMN_PARENT.getColumn_family, null, intToBytes(index))) } if (column.isDefined) serializer.in(column.get.value, None) else throw new NoSuchElementException("No element for vector [" + name + "] and index [" + index + "]") - } else throw new IllegalStateException("CassandraStorage is not started") + } - override def getVectorStorageRangeFor(name: String, start: Option[Int], finish: Option[Int], count: Int): List[AnyRef] = if (sessions.isDefined) { + override def getVectorStorageRangeFor(name: String, start: Option[Int], finish: Option[Int], count: Int): List[AnyRef] = { val startBytes = if (start.isDefined) intToBytes(start.get) else null val finishBytes = if (finish.isDefined) intToBytes(finish.get) else null - val columns: List[Column] = sessions.get.withSession { + val columns: List[Column] = sessions.withSession { _ / (name, VECTOR_COLUMN_PARENT, startBytes, finishBytes, @@ -146,43 +127,43 @@ object CassandraStorage extends MapStorage CONSISTENCY_LEVEL) } columns.map(column => serializer.in(column.value, None)) - } else throw new IllegalStateException("CassandraStorage is not started") + } - override def getVectorStorageSizeFor(name: String): Int = if (sessions.isDefined) { - sessions.get.withSession { + override def getVectorStorageSizeFor(name: String): Int = { + sessions.withSession { _ |# (name, VECTOR_COLUMN_PARENT) } - } else throw new IllegalStateException("CassandraStorage is not started") + } // =============================================================== // For Map // =============================================================== - override def insertMapStorageEntryFor(name: String, key: AnyRef, element: AnyRef) = if (sessions.isDefined) { - sessions.get.withSession { + override def insertMapStorageEntryFor(name: String, key: AnyRef, element: AnyRef) = { + sessions.withSession { _ ++| (name, new ColumnPath(MAP_COLUMN_PARENT.getColumn_family, null, serializer.out(key)), serializer.out(element), System.currentTimeMillis, CONSISTENCY_LEVEL) } - } else throw new IllegalStateException("CassandraStorage is not started") + } - override def insertMapStorageEntriesFor(name: String, entries: List[Tuple2[AnyRef, AnyRef]]) = if (sessions.isDefined) { + override def insertMapStorageEntriesFor(name: String, entries: List[Tuple2[AnyRef, AnyRef]]) = { val cf2columns: java.util.Map[String, java.util.List[Column]] = new java.util.HashMap for (entry <- entries) { val columns: java.util.List[Column] = new java.util.ArrayList columns.add(new Column(serializer.out(entry._1), serializer.out(entry._2), System.currentTimeMillis)) cf2columns.put(MAP_COLUMN_PARENT.getColumn_family, columns) } - sessions.get.withSession { + sessions.withSession { _ ++| (new BatchMutation(name, cf2columns), CONSISTENCY_LEVEL) } - } else throw new IllegalStateException("CassandraStorage is not started") + } - override def getMapStorageEntryFor(name: String, key: AnyRef): Option[AnyRef] = if (sessions.isDefined) { + override def getMapStorageEntryFor(name: String, key: AnyRef): Option[AnyRef] = { try { - val column: Option[Column] = sessions.get.withSession { + val column: Option[Column] = sessions.withSession { _ | (name, new ColumnPath(MAP_COLUMN_PARENT.getColumn_family, null, serializer.out(key))) } if (column.isDefined) Some(serializer.in(column.get.value, None)) @@ -192,9 +173,9 @@ object CassandraStorage extends MapStorage e.printStackTrace None } - } else throw new IllegalStateException("CassandraStorage is not started") + } - override def getMapStorageFor(name: String): List[Tuple2[AnyRef, AnyRef]] = if (sessions.isDefined) { + override def getMapStorageFor(name: String): List[Tuple2[AnyRef, AnyRef]] = { throw new UnsupportedOperationException /* val columns = server.get_columns_since(name, MAP_COLUMN_FAMILY, -1) @@ -204,35 +185,35 @@ object CassandraStorage extends MapStorage col = (column.columnName, column.value) } yield col */ - } else throw new IllegalStateException("CassandraStorage is not started") + } - override def getMapStorageSizeFor(name: String): Int = if (sessions.isDefined) { - sessions.get.withSession { + override def getMapStorageSizeFor(name: String): Int = { + sessions.withSession { _ |# (name, MAP_COLUMN_PARENT) } - } else throw new IllegalStateException("CassandraStorage is not started") + } override def removeMapStorageFor(name: String): Unit = removeMapStorageFor(name, null) - override def removeMapStorageFor(name: String, key: AnyRef): Unit = if (sessions.isDefined) { + override def removeMapStorageFor(name: String, key: AnyRef): Unit = { val keyBytes = if (key == null) null else serializer.out(key) - sessions.get.withSession { + sessions.withSession { _ -- (name, new ColumnPathOrParent(MAP_COLUMN_PARENT.getColumn_family, null, keyBytes), System.currentTimeMillis, CONSISTENCY_LEVEL) } - } else throw new IllegalStateException("CassandraStorage is not started") + } override def getMapStorageRangeFor(name: String, start: Option[AnyRef], finish: Option[AnyRef], count: Int): - List[Tuple2[AnyRef, AnyRef]] = if (sessions.isDefined) { + List[Tuple2[AnyRef, AnyRef]] = { val startBytes = if (start.isDefined) serializer.out(start.get) else null val finishBytes = if (finish.isDefined) serializer.out(finish.get) else null - val columns: List[Column] = sessions.get.withSession { + val columns: List[Column] = sessions.withSession { _ / (name, MAP_COLUMN_PARENT, startBytes, finishBytes, IS_ASCENDING, count, CONSISTENCY_LEVEL) } columns.map(column => (column.name, serializer.in(column.value, None))) - } else throw new IllegalStateException("CassandraStorage is not started") + } } /** diff --git a/kernel/src/test/scala/PersistentActorSpec.scala b/kernel/src/test/scala/PersistentActorSpec.scala index 0d8b464dd1..07002e27eb 100644 --- a/kernel/src/test/scala/PersistentActorSpec.scala +++ b/kernel/src/test/scala/PersistentActorSpec.scala @@ -56,15 +56,7 @@ class PersistentActor extends Actor { } } -object PersistenceManager { - @volatile var isRunning = false - def init = if (!isRunning) { - Kernel.startCassandra - isRunning = true - } -} class PersistentActorSpec extends TestCase { - PersistenceManager.init @Test def testMapShouldNotRollbackStateForStatefulServerInCaseOfSuccess = { diff --git a/util-java/akka-util-java.iml b/util-java/akka-util-java.iml index 646310bd42..a80717f779 100644 --- a/util-java/akka-util-java.iml +++ b/util-java/akka-util-java.iml @@ -12,7 +12,6 @@ -