diff --git a/akka.ipr b/akka.ipr
index a22637ec00..eda6fcf859 100644
--- a/akka.ipr
+++ b/akka.ipr
diff --git a/akka.iws b/akka.iws
index a542bf2f14..74b79505fb 100644
--- a/akka.iws
+++ b/akka.iws
@@ -222,7 +318,7 @@
- Inspections
+ AOP
diff --git a/bin/start-akka-server.sh b/bin/start-akka-server.sh
index 04814b7b11..44cdeec8f5 100755
--- a/bin/start-akka-server.sh
+++ b/bin/start-akka-server.sh
@@ -47,6 +47,7 @@ CLASSPATH=$CLASSPATH:$LIB_DIR/guice-jsr250-2.0-SNAPSHOT.jar
CLASSPATH=$CLASSPATH:$LIB_DIR/high-scale-lib.jar
CLASSPATH=$CLASSPATH:$LIB_DIR/jackson-core-asl-1.1.0.jar
CLASSPATH=$CLASSPATH:$LIB_DIR/jackson-mapper-asl-1.1.0.jar
+CLASSPATH=$CLASSPATH:$LIB_DIR/javautils-2.7.4-0.1.jar
CLASSPATH=$CLASSPATH:$LIB_DIR/jersey-client-1.1.1-ea.jar
CLASSPATH=$CLASSPATH:$LIB_DIR/jersey-core-1.1.1-ea.jar
CLASSPATH=$CLASSPATH:$LIB_DIR/jersey-json-1.1.1-ea.jar
@@ -59,7 +60,7 @@ CLASSPATH=$CLASSPATH:$LIB_DIR/libfb303.jar
CLASSPATH=$CLASSPATH:$LIB_DIR/libthrift.jar
CLASSPATH=$CLASSPATH:$LIB_DIR/log4j-1.2.15.jar
CLASSPATH=$CLASSPATH:$LIB_DIR/lucene-core-2.2.0.jar
-CLASSPATH=$CLASSPATH:$LIB_DIR/netty-3.1.0.CR1.jar
+CLASSPATH=$CLASSPATH:$LIB_DIR/netty-3.1.0.GA.jar
CLASSPATH=$CLASSPATH:$LIB_DIR/providerutil.jar
CLASSPATH=$CLASSPATH:$LIB_DIR/protobuf-java-2.1.0.jar
CLASSPATH=$CLASSPATH:$LIB_DIR/scala-library-2.7.5.jar
diff --git a/config/akka-reference.conf b/config/akka-reference.conf
index 539555444a..60aefb99fc 100644
--- a/config/akka-reference.conf
+++ b/config/akka-reference.conf
@@ -52,10 +52,10 @@
service = on
- hostname = "localhost" # ip address or hostname of one of the Cassandra cluster's seeds
+ hostname = "127.0.0.1" # ip address or hostname of one of the Cassandra cluster's seeds
port = 9160
- storage-format = "binary" # Options: binary, json, simple-json
- blocking = false # inserts and queries should be blocking or not
+ storage-format = "java" # Options: java, scala-json, java-json, protobuf
+ consistency-level = 1
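These keys are read back through Configgy further down in this change; a minimal sketch of the lookups, with names and defaults taken verbatim from the CassandraStorage hunks below:

    import kernel.Kernel.config

    // Defaults mirror the constants in CassandraStorage.scala below.
    val hostname         = config.getString("akka.storage.cassandra.hostname", "127.0.0.1")
    val port             = config.getInt("akka.storage.cassandra.port", 9160)
    val consistencyLevel = config.getInt("akka.storage.cassandra.consistency-level", 1)
    val storageFormat    = config.getString("akka.storage.cassandra.storage-format", "java")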
diff --git a/embedded-repo/org/apache/cassandra/cassandra/0.4.0-dev/cassandra-0.4.0-dev.jar b/embedded-repo/org/apache/cassandra/cassandra/0.4.0-dev/cassandra-0.4.0-dev.jar
index d9f762ad2a..706336669f 100644
Binary files a/embedded-repo/org/apache/cassandra/cassandra/0.4.0-dev/cassandra-0.4.0-dev.jar and b/embedded-repo/org/apache/cassandra/cassandra/0.4.0-dev/cassandra-0.4.0-dev.jar differ
diff --git a/fun-test-java/akka-fun-test-java.iml b/fun-test-java/akka-fun-test-java.iml
index 9ef386fdde..93009ab9bd 100644
--- a/fun-test-java/akka-fun-test-java.iml
+++ b/fun-test-java/akka-fun-test-java.iml
diff --git a/kernel/akka-kernel.iml b/kernel/akka-kernel.iml
index 575fe344c0..91e7a458e0 100644
--- a/kernel/akka-kernel.iml
+++ b/kernel/akka-kernel.iml
diff --git a/kernel/pom.xml b/kernel/pom.xml
index 98f460cf2b..5af15b15be 100644
--- a/kernel/pom.xml
+++ b/kernel/pom.xml
@@ -54,13 +54,18 @@
   <dependency>
     <groupId>org.jboss.netty</groupId>
     <artifactId>netty</artifactId>
-    <version>3.1.0.CR1</version>
+    <version>3.1.0.GA</version>
   </dependency>
   <dependency>
     <groupId>org.apache</groupId>
     <artifactId>zookeeper</artifactId>
     <version>3.1.0</version>
   </dependency>
+  <dependency>
+    <groupId>org.scala-tools</groupId>
+    <artifactId>javautils</artifactId>
+    <version>2.7.4-0.1</version>
+  </dependency>
diff --git a/kernel/src/main/scala/Kernel.scala b/kernel/src/main/scala/Kernel.scala
index 8ec3290782..7e07c35a41 100644
--- a/kernel/src/main/scala/Kernel.scala
+++ b/kernel/src/main/scala/Kernel.scala
@@ -12,11 +12,11 @@ import javax.ws.rs.core.UriBuilder
import java.io.File
import java.net.URLClassLoader
-import net.lag.configgy.{Config, Configgy, RuntimeEnvironment}
+import net.lag.configgy.{Config, Configgy, RuntimeEnvironment, ParseException}
import kernel.jersey.AkkaCometServlet
import kernel.nio.RemoteServer
-import kernel.state.EmbeddedCassandraStorage
+import kernel.state.CassandraStorage
import kernel.util.Logging
/**
@@ -82,7 +82,7 @@ object Kernel extends Logging {
Configgy.configureFromResource("akka.conf", getClass.getClassLoader)
log.info("Config loaded from the application classpath.")
} catch {
- case e: Exception =>
+ case e: ParseException =>
try {
if (HOME.isDefined) {
val configFile = HOME.get + "/config/akka.conf"
@@ -90,7 +90,7 @@ object Kernel extends Logging {
Configgy.configure(configFile)
} else throw new IllegalStateException("AKKA_HOME is not defined and no 'akka.conf' can be found on the classpath, aborting")
} catch {
- case e: net.lag.configgy.ParseException => throw new IllegalStateException("AKKA_HOME is not defined and no 'akka.conf' can be found on the classpath, aborting")
+ case e: ParseException => throw new IllegalStateException("AKKA_HOME is not defined and no 'akka.conf' can be found on the classpath, aborting")
}
}
//val runtime = new RuntimeEnvironment(getClass)
@@ -178,7 +178,7 @@ object Kernel extends Logging {
println("=================================================")
var start = System.currentTimeMillis
- for (i <- 1 to NR_ENTRIES) EmbeddedCassandraStorage.insertMapStorageEntryFor("test", i.toString, "data")
+ for (i <- 1 to NR_ENTRIES) CassandraStorage.insertMapStorageEntryFor("test", i.toString, "data")
var end = System.currentTimeMillis
println("Writes per second: " + NR_ENTRIES / ((end - start).toDouble / 1000))
@@ -186,13 +186,13 @@ object Kernel extends Logging {
start = System.currentTimeMillis
val entries = new scala.collection.mutable.ArrayBuffer[Tuple2[String, String]]
for (i <- 1 to NR_ENTRIES) entries += (i.toString, "data")
- EmbeddedCassandraStorage.insertMapStorageEntriesFor("test", entries.toList)
+ CassandraStorage.insertMapStorageEntriesFor("test", entries.toList)
end = System.currentTimeMillis
println("Writes per second - batch: " + NR_ENTRIES / ((end - start).toDouble / 1000))
println("=================================================")
start = System.currentTimeMillis
- for (i <- 1 to NR_ENTRIES) EmbeddedCassandraStorage.getMapStorageEntryFor("test", i.toString)
+ for (i <- 1 to NR_ENTRIES) CassandraStorage.getMapStorageEntryFor("test", i.toString)
end = System.currentTimeMillis
println("Reads per second: " + NR_ENTRIES / ((end - start).toDouble / 1000))
diff --git a/kernel/src/main/scala/serialization/Serializer.scala b/kernel/src/main/scala/serialization/Serializer.scala
index 12284067e0..bfa8bfe011 100644
--- a/kernel/src/main/scala/serialization/Serializer.scala
+++ b/kernel/src/main/scala/serialization/Serializer.scala
@@ -77,7 +77,6 @@ object Serializer {
message.toBuilder().mergeFrom(bytes).build
}
- // For Java
def in(bytes: Array[Byte], clazz: Class[_]): AnyRef = {
if (clazz == null) throw new IllegalArgumentException("Protobuf message can't be null")
in(bytes, Some(clazz))
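The storage code in this change always pairs serializer.out with serializer.in(bytes, None); a minimal round-trip sketch, assuming the Java serializer selected by the default storage-format (Serializer.Java and both signatures appear in the hunks here; the full package path is an assumption based on the new file below):

    import se.scalablesolutions.akka.kernel.serialization.Serializer

    val serializer = Serializer.Java                         // picked via akka.storage.cassandra.storage-format
    val bytes: Array[Byte] = serializer.out("some element")  // object to bytes
    val back: AnyRef = serializer.in(bytes, None)            // bytes back to object; None = no target class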
diff --git a/kernel/src/main/scala/state/CassandraSession.scala b/kernel/src/main/scala/state/CassandraSession.scala
new file mode 100644
index 0000000000..a877b5bce1
--- /dev/null
+++ b/kernel/src/main/scala/state/CassandraSession.scala
@@ -0,0 +1,240 @@
+/**
+ * Copyright (C) 2009 Scalable Solutions.
+ */
+
+package se.scalablesolutions.akka.kernel.state
+
+import java.io.{Flushable, Closeable}
+
+import util.Logging
+import util.Helpers._
+import serialization.Serializer
+import kernel.Kernel.config
+
+import org.apache.cassandra.db.ColumnFamily
+import org.apache.cassandra.service._
+
+import org.apache.thrift.transport._
+import org.apache.thrift.protocol._
+
+/**
+ * @author Jonas Bonér
+ */
+trait CassandraSession extends Closeable with Flushable {
+ import scala.collection.jcl.Conversions._
+ import org.scala_tools.javautils.Imports._
+ import java.util.{Map => JMap}
+
+ protected val client: Cassandra.Client
+ protected val keyspace: String
+
+ val obtainedAt: Long
+ val consistencyLevel: Int
+ val schema: JMap[String, JMap[String, String]]
+
+ /**
+ * Count is always the max number of results to return.
+
+ So it means, starting with `start`, or the first one if start is
+ empty, go until you hit `finish` or `count`, whichever comes first.
+ Empty is not a legal column name so if finish is empty it is ignored
+ and only count is used.
+
+ We don't offer a numeric offset since that can't be supported
+ efficiently with a log-structured merge disk format.
+ */
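+  // The symbolic operators below are aliases for the named methods further down:
+  //   /    slice of columns (getRange)            |     single column (getColumn)
+  //   |#   column count (getColumnCount)          ++|   insert (insertColumn)
+  //   --   remove (removeColumn)                  /^    super-column slice (getSuperRange)
+  //   |^   single super column (getSuperColumn)   ++|^  batch super insert (insertSuperColumn)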
+ def /(key: String, columnParent: ColumnParent, start: Array[Byte], end: Array[Byte], ascending: Boolean, count: Int): List[Column] =
+ /(key, columnParent, start, end, ascending, count, consistencyLevel)
+
+ def /(key: String, columnParent: ColumnParent, start: Array[Byte], end: Array[Byte], ascending: Boolean, count: Int, consistencyLevel: Int): List[Column] =
+ client.get_slice(keyspace, key, columnParent, start, end, ascending, count, consistencyLevel).toList
+
+ def /(key: String, columnParent: ColumnParent, colNames: List[Array[Byte]]): List[Column] =
+ /(key, columnParent, colNames, consistencyLevel)
+
+ def /(key: String, columnParent: ColumnParent, colNames: List[Array[Byte]], consistencyLevel: Int): List[Column] =
+ client.get_slice_by_names(keyspace, key, columnParent, colNames.asJava, consistencyLevel).toList
+
+ def |(key: String, colPath: ColumnPath): Option[Column] =
+ |(key, colPath, consistencyLevel)
+
+ def |(key: String, colPath: ColumnPath, consistencyLevel: Int): Option[Column] =
+ client.get_column(keyspace, key, colPath, consistencyLevel)
+
+ def |#(key: String, columnParent: ColumnParent): Int =
+ |#(key, columnParent, consistencyLevel)
+
+ def |#(key: String, columnParent: ColumnParent, consistencyLevel: Int): Int =
+ client.get_column_count(keyspace, key, columnParent, consistencyLevel)
+
+ def ++|(key: String, colPath: ColumnPath, value: Array[Byte]): Unit =
+ ++|(key, colPath, value, obtainedAt, consistencyLevel)
+
+ def ++|(key: String, colPath: ColumnPath, value: Array[Byte], timestamp: Long): Unit =
+ ++|(key, colPath, value, timestamp, consistencyLevel)
+
+ def ++|(key: String, colPath: ColumnPath, value: Array[Byte], timestamp: Long, consistencyLevel: Int) =
+ client.insert(keyspace, key, colPath, value, timestamp, consistencyLevel)
+
+ def ++|(batch: BatchMutation): Unit =
+ ++|(batch, consistencyLevel)
+
+ def ++|(batch: BatchMutation, consistencyLevel: Int): Unit =
+ client.batch_insert(keyspace, batch, consistencyLevel)
+
+ def --(key: String, columnPathOrParent: ColumnPathOrParent, timestamp: Long): Unit =
+ --(key, columnPathOrParent, timestamp, consistencyLevel)
+
+ def --(key: String, columnPathOrParent: ColumnPathOrParent, timestamp: Long, consistencyLevel: Int): Unit =
+ client.remove(keyspace, key, columnPathOrParent, timestamp, consistencyLevel)
+
+ def /^(key: String, columnFamily: String, start: Array[Byte], end: Array[Byte], ascending: Boolean, count: Int): List[SuperColumn] =
+ /^(key, columnFamily, start, end, ascending, count, consistencyLevel)
+
+ def /^(key: String, columnFamily: String, start: Array[Byte], end: Array[Byte], ascending: Boolean, count: Int, consistencyLevel: Int): List[SuperColumn] =
+ client.get_slice_super(keyspace, key, columnFamily, start, end, ascending, count, consistencyLevel).toList
+
+ def /^(key: String, columnFamily: String, superColNames: List[Array[Byte]]): List[SuperColumn] =
+ /^(key, columnFamily, superColNames, consistencyLevel)
+
+ def /^(key: String, columnFamily: String, superColNames: List[Array[Byte]], consistencyLevel: Int): List[SuperColumn] =
+ client.get_slice_super_by_names(keyspace, key, columnFamily, superColNames.asJava, consistencyLevel).toList
+
+ def |^(key: String, superColumnPath: SuperColumnPath): Option[SuperColumn] =
+ |^(key, superColumnPath, consistencyLevel)
+
+ def |^(key: String, superColumnPath: SuperColumnPath, consistencyLevel: Int): Option[SuperColumn] =
+ client.get_super_column(keyspace, key, superColumnPath, consistencyLevel)
+
+ def ++|^(batch: BatchMutationSuper): Unit =
+ ++|^(batch, consistencyLevel)
+
+ def ++|^(batch: BatchMutationSuper, consistencyLevel: Int): Unit =
+ client.batch_insert_super_column(keyspace, batch, consistencyLevel)
+
+ def getRange(key: String, columnParent: ColumnParent, start: Array[Byte], end: Array[Byte], ascending: Boolean, count: Int): List[Column] =
+ getRange(key, columnParent, start, end, ascending, count, consistencyLevel)
+
+ def getRange(key: String, columnParent: ColumnParent, start: Array[Byte], end: Array[Byte], ascending: Boolean, count: Int, consistencyLevel: Int): List[Column] =
+ client.get_slice(keyspace, key, columnParent, start, end, ascending, count, consistencyLevel).toList
+
+ def getRange(key: String, columnParent: ColumnParent, colNames: List[Array[Byte]]): List[Column] =
+ getRange(key, columnParent, colNames, consistencyLevel)
+
+ def getRange(key: String, columnParent: ColumnParent, colNames: List[Array[Byte]], consistencyLevel: Int): List[Column] =
+ client.get_slice_by_names(keyspace, key, columnParent, colNames.asJava, consistencyLevel).toList
+
+ def getColumn(key: String, colPath: ColumnPath): Option[Column] =
+ getColumn(key, colPath, consistencyLevel)
+
+ def getColumn(key: String, colPath: ColumnPath, consistencyLevel: Int): Option[Column] =
+ client.get_column(keyspace, key, colPath, consistencyLevel)
+
+ def getColumnCount(key: String, columnParent: ColumnParent): Int =
+ getColumnCount(key, columnParent, consistencyLevel)
+
+ def getColumnCount(key: String, columnParent: ColumnParent, consistencyLevel: Int): Int =
+ client.get_column_count(keyspace, key, columnParent, consistencyLevel)
+
+ def insertColumn(key: String, colPath: ColumnPath, value: Array[Byte]): Unit =
+ insertColumn(key, colPath, value, obtainedAt, consistencyLevel)
+
+ def insertColumn(key: String, colPath: ColumnPath, value: Array[Byte], timestamp: Long): Unit =
+ insertColumn(key, colPath, value, timestamp, consistencyLevel)
+
+ def insertColumn(key: String, colPath: ColumnPath, value: Array[Byte], timestamp: Long, consistencyLevel: Int) =
+ client.insert(keyspace, key, colPath, value, timestamp, consistencyLevel)
+
+ def insertColumn(batch: BatchMutation): Unit =
+ insertColumn(batch, consistencyLevel)
+
+ def insertColumn(batch: BatchMutation, consistencyLevel: Int): Unit =
+ client.batch_insert(keyspace, batch, consistencyLevel)
+
+ def removeColumn(key: String, columnPathOrParent: ColumnPathOrParent, timestamp: Long): Unit =
+ removeColumn(key, columnPathOrParent, timestamp, consistencyLevel)
+
+ def removeColumn(key: String, columnPathOrParent: ColumnPathOrParent, timestamp: Long, consistencyLevel: Int): Unit =
+ client.remove(keyspace, key, columnPathOrParent, timestamp, consistencyLevel)
+
+ def getSuperRange(key: String, columnFamily: String, start: Array[Byte], end: Array[Byte], ascending: Boolean, count: Int): List[SuperColumn] =
+ getSuperRange(key, columnFamily, start, end, ascending, count, consistencyLevel)
+
+ def getSuperRange(key: String, columnFamily: String, start: Array[Byte], end: Array[Byte], ascending: Boolean, count: Int, consistencyLevel: Int): List[SuperColumn] =
+ client.get_slice_super(keyspace, key, columnFamily, start, end, ascending, count, consistencyLevel).toList
+
+ def getSuperRange(key: String, columnFamily: String, superColNames: List[Array[Byte]]): List[SuperColumn] =
+ getSuperRange(key, columnFamily, superColNames, consistencyLevel)
+
+ def getSuperRange(key: String, columnFamily: String, superColNames: List[Array[Byte]], consistencyLevel: Int): List[SuperColumn] =
+ client.get_slice_super_by_names(keyspace, key, columnFamily, superColNames.asJava, consistencyLevel).toList
+
+ def getSuperColumn(key: String, superColumnPath: SuperColumnPath): Option[SuperColumn] =
+ getSuperColumn(key, superColumnPath, consistencyLevel)
+
+ def getSuperColumn(key: String, superColumnPath: SuperColumnPath, consistencyLevel: Int): Option[SuperColumn] =
+ client.get_super_column(keyspace, key, superColumnPath, consistencyLevel)
+
+ def insertSuperColumn(batch: BatchMutationSuper): Unit =
+ insertSuperColumn(batch, consistencyLevel)
+
+ def insertSuperColumn(batch: BatchMutationSuper, consistencyLevel: Int): Unit =
+ client.batch_insert_super_column(keyspace, batch, consistencyLevel)
+
+ def keys(columnFamily: String, startsWith: String, stopsAt: String, maxResults: Option[Int]): List[String] =
+ client.get_key_range(keyspace, columnFamily, startsWith, stopsAt, maxResults.getOrElse(-1)).toList
+}
+
+class CassandraSessionPool[T <: TTransport](
+ space: String,
+ transportPool: Pool[T],
+ inputProtocol: Protocol,
+ outputProtocol: Protocol,
+ consistency: Int) extends Closeable with Logging {
+
+ def this(space: String, transportPool: Pool[T], ioProtocol: Protocol, consistency: Int) =
+ this (space, transportPool, ioProtocol, ioProtocol, consistency)
+
+ def newSession: CassandraSession = newSession(consistency)
+
+ def newSession(consistencyLevel: Int): CassandraSession = {
+ val socket = transportPool.borrowObject
+ val cassandraClient = new Cassandra.Client(inputProtocol(socket), outputProtocol(socket))
+ val cassandraSchema = cassandraClient.describe_keyspace(space)
+ new CassandraSession {
+ val keyspace = space
+ val client = cassandraClient
+ val obtainedAt = System.currentTimeMillis
+ val consistencyLevel = consistency
+ val schema = cassandraSchema
+ log.debug("Creating %s", toString)
+
+ def flush = socket.flush
+ def close = transportPool.returnObject(socket)
+ override def toString = "[CassandraSession]\n\tkeyspace = " + keyspace + "\n\tschema = " + schema
+ }
+ }
+
+ def withSession[T](body: CassandraSession => T) = {
+ val session = newSession(consistency)
+ try {
+ val result = body(session)
+ session.flush
+ result
+ } finally {
+ session.close
+ }
+ }
+
+ def close = transportPool.close
+}
+
+sealed abstract class Protocol(val factory: TProtocolFactory) {
+ def apply(transport: TTransport) = factory.getProtocol(transport)
+}
+
+object Protocol {
+ object Binary extends Protocol(new TBinaryProtocol.Factory)
+ object SimpleJSON extends Protocol(new TSimpleJSONProtocol.Factory)
+ object JSON extends Protocol(new TJSONProtocol.Factory)
+}
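Taken together, a minimal usage sketch of the pool above, mirroring how CassandraStorage.start wires it up in the next file (StackPool and SocketProvider come from Pool.scala; the empty start/stop strings for keys are an assumption meaning "unbounded"):

    val sessions = new CassandraSessionPool(
      "akka",                                        // keyspace
      StackPool(SocketProvider("127.0.0.1", 9160)),  // pooled, already-opened Thrift sockets
      Protocol.Binary,
      1)                                             // default consistency level

    val keys = sessions.withSession { session =>
      session.keys("map", "", "", Some(100))         // up to 100 keys in the "map" column family
    }
    sessions.close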
diff --git a/kernel/src/main/scala/state/CassandraStorage.scala b/kernel/src/main/scala/state/CassandraStorage.scala
index 196177c8d5..4622841026 100644
--- a/kernel/src/main/scala/state/CassandraStorage.scala
+++ b/kernel/src/main/scala/state/CassandraStorage.scala
@@ -4,63 +4,68 @@
package se.scalablesolutions.akka.kernel.state
-import java.io.{File, Flushable, Closeable}
+import java.io.{Flushable, Closeable}
-import kernel.util.Logging
-import serialization.{Serializer, Serializable, SerializationProtocol}
+import util.Logging
+import util.Helpers._
+import serialization.Serializer
+import kernel.Kernel.config
-import org.apache.cassandra.db._
-import org.apache.cassandra.config.DatabaseDescriptor
+import org.apache.cassandra.db.ColumnFamily
import org.apache.cassandra.service._
-//import org.apache.thrift.server.TThreadPoolServer
-import org.apache.thrift.TProcessorFactory
-import org.apache.thrift.transport._
-import org.apache.thrift._
import org.apache.thrift.transport._
import org.apache.thrift.protocol._
/**
- * NOTE: requires command line options:
- *
- * -Dcassandra -Dstorage-config=config/ -Dpidfile=akka.pid
- *
* @author Jonas Bonér
*/
object CassandraStorage extends Logging {
- val TABLE_NAME = "akka"
- val MAP_COLUMN_FAMILY = "map"
- val VECTOR_COLUMN_FAMILY = "vector"
- val REF_COLUMN_FAMILY = "ref:item"
-
- val DEFAULT_CONSISTENCY_LEVEL = 10 //What should the default be?
+ val KEYSPACE = "akka"
+ val MAP_COLUMN_PARENT = new ColumnParent("map", null)
+ val VECTOR_COLUMN_PARENT = new ColumnParent("vector", null)
+ val REF_COLUMN_PARENT = new ColumnParent("ref", null)
+ val REF_KEY = "item".getBytes("UTF-8")
+ val CASSANDRA_SERVER_HOSTNAME = config.getString("akka.storage.cassandra.hostname", "127.0.0.1")
+ val CASSANDRA_SERVER_PORT = config.getInt("akka.storage.cassandra.port", 9160)
+ val CONSISTENCY_LEVEL = config.getInt("akka.storage.cassandra.consistency-level", 1)
val IS_ASCENDING = true
- import kernel.Kernel.config
-
- val CASSANDRA_SERVER_HOSTNAME = config.getString("akka.storage.cassandra.hostname", "localhost")
- val CASSANDRA_SERVER_PORT = config.getInt("akka.storage.cassandra.port", 9160)
- val BLOCKING_CALL = if (config.getBool("akka.storage.cassandra.blocking", true)) 0
- else 1
-
@volatile private[this] var isRunning = false
- private[this] val protocol: Protocol = {
- config.getString("akka.storage.cassandra.storage-format", "binary") match {
+ private[this] val protocol: Protocol = Protocol.Binary
+/* {
+ config.getString("akka.storage.cassandra.procotol", "binary") match {
case "binary" => Protocol.Binary
case "json" => Protocol.JSON
case "simple-json" => Protocol.SimpleJSON
case unknown => throw new UnsupportedOperationException("Unknown storage serialization protocol [" + unknown + "]")
}
}
+*/
+ private[this] val serializer: Serializer = {
+ kernel.Kernel.config.getString("akka.storage.cassandra.storage-format", "java") match {
+ case "scala-json" => Serializer.ScalaJSON
+ case "java-json" => Serializer.JavaJSON
+ case "protobuf" => Serializer.Protobuf
+ case "java" => Serializer.Java
+ case "sbinary" => throw new UnsupportedOperationException("SBinary serialization protocol is not yet supported for storage")
+ case "avro" => throw new UnsupportedOperationException("Avro serialization protocol is not yet supported for storage")
+ case unknown => throw new UnsupportedOperationException("Unknown storage serialization protocol [" + unknown + "]")
+ }
+ }
private[this] var sessions: Option[CassandraSessionPool[_]] = None
def start = synchronized {
if (!isRunning) {
try {
- sessions = Some(new CassandraSessionPool(StackPool(SocketProvider(CASSANDRA_SERVER_HOSTNAME, CASSANDRA_SERVER_PORT)), protocol, DEFAULT_CONSISTENCY_LEVEL))
+ sessions = Some(new CassandraSessionPool(
+ KEYSPACE,
+ StackPool(SocketProvider(CASSANDRA_SERVER_HOSTNAME, CASSANDRA_SERVER_PORT)),
+ protocol,
+ CONSISTENCY_LEVEL))
log.info("Cassandra persistent storage has started up successfully");
} catch {
case e =>
@@ -75,36 +80,27 @@ object CassandraStorage extends Logging {
if (isRunning && sessions.isDefined) sessions.get.close
}
- //implicit def strToBytes(s: String) = s.getBytes("UTF-8")
-
- /*
- def insertRefStorageFor(name: String, element: AnyRef) = sessions.withSession { session => {
- val user_id = "1"
- session ++| ("users", user_id, "base_attributes:name", "Lord Foo Bar", false)
- session ++| ("users", user_id, "base_attributes:age", "24", false)
- for( i <- session / ("users", user_id, "base_attributes", None, None).toList) println(i)
- }}
- */
// ===============================================================
// For Ref
// ===============================================================
- def insertRefStorageFor(name: String, element: String) = if (sessions.isDefined) {
+ def insertRefStorageFor(name: String, element: AnyRef) = if (sessions.isDefined) {
sessions.get.withSession {
- _ ++| (
- TABLE_NAME,
- name,
- REF_COLUMN_FAMILY,
- element,
- System.currentTimeMillis,
- BLOCKING_CALL)
+ _ ++| (name,
+ new ColumnPath(REF_COLUMN_PARENT.getColumn_family, null, REF_KEY),
+ serializer.out(element),
+ System.currentTimeMillis,
+ CONSISTENCY_LEVEL)
}
} else throw new IllegalStateException("CassandraStorage is not started")
- def getRefStorageFor(name: String): Option[String] = if (sessions.isDefined) {
+ def getRefStorageFor(name: String): Option[AnyRef] = if (sessions.isDefined) {
try {
- val column = sessions.get.withSession {_ | (TABLE_NAME, name, REF_COLUMN_FAMILY)}
- Some(column.value)
+ val column: Option[Column] = sessions.get.withSession {
+ _ | (name, new ColumnPath(REF_COLUMN_PARENT.getColumn_family, null, REF_KEY))
+ }
+ if (column.isDefined) Some(serializer.in(column.get.value, None))
+ else None
} catch {
case e =>
e.printStackTrace
@@ -116,75 +112,77 @@ object CassandraStorage extends Logging {
// For Vector
// ===============================================================
- def insertVectorStorageEntryFor(name: String, element: String) = if (sessions.isDefined) {
+ def insertVectorStorageEntryFor(name: String, element: AnyRef) = if (sessions.isDefined) {
sessions.get.withSession {
- _ ++| (
- TABLE_NAME,
- name,
- VECTOR_COLUMN_FAMILY + ":" + getVectorStorageSizeFor(name),
- element,
- System.currentTimeMillis,
- BLOCKING_CALL)
+ _ ++| (name,
+ new ColumnPath(VECTOR_COLUMN_PARENT.getColumn_family, null, intToBytes(getVectorStorageSizeFor(name))),
+ serializer.out(element),
+ System.currentTimeMillis,
+ CONSISTENCY_LEVEL)
}
} else throw new IllegalStateException("CassandraStorage is not started")
- def getVectorStorageEntryFor(name: String, index: Int): String = if (sessions.isDefined) {
- try {
- val column = sessions.get.withSession {_ | (TABLE_NAME, name, VECTOR_COLUMN_FAMILY + ":" + index)}
- column.value
- } catch {
- case e =>
- e.printStackTrace
- throw new NoSuchElementException(e.getMessage)
+ def getVectorStorageEntryFor(name: String, index: Int): AnyRef = if (sessions.isDefined) {
+ val column: Option[Column] = sessions.get.withSession {
+ _ | (name, new ColumnPath(VECTOR_COLUMN_PARENT.getColumn_family, null, intToBytes(index)))
}
+ if (column.isDefined) serializer.in(column.get.value, None)
+ else throw new NoSuchElementException("No element for vector [" + name + "] and index [" + index + "]")
} else throw new IllegalStateException("CassandraStorage is not started")
- def getVectorStorageRangeFor(name: String, start: Int, count: Int): List[String] = if (sessions.isDefined) {
- sessions.get.withSession {_ / (TABLE_NAME, name, VECTOR_COLUMN_FAMILY, IS_ASCENDING, count)}.map(_.value)
+ def getVectorStorageRangeFor(name: String, start: Option[Int], finish: Option[Int], count: Int): List[AnyRef] = if (sessions.isDefined) {
+ val startBytes = if (start.isDefined) intToBytes(start.get) else null
+ val finishBytes = if (finish.isDefined) intToBytes(finish.get) else null
+ val columns: List[Column] = sessions.get.withSession {
+ _ / (name,
+ VECTOR_COLUMN_PARENT,
+ startBytes, finishBytes,
+ IS_ASCENDING,
+ count,
+ CONSISTENCY_LEVEL)
+ }
+ columns.map(column => serializer.in(column.value, None))
} else throw new IllegalStateException("CassandraStorage is not started")
def getVectorStorageSizeFor(name: String): Int = if (sessions.isDefined) {
- sessions.get.withSession {_ |# (TABLE_NAME, name, VECTOR_COLUMN_FAMILY)}
+ sessions.get.withSession {
+ _ |# (name, VECTOR_COLUMN_PARENT)
+ }
} else throw new IllegalStateException("CassandraStorage is not started")
// ===============================================================
// For Map
// ===============================================================
- def insertMapStorageEntryFor(name: String, key: String, value: String) = if (sessions.isDefined) {
+ def insertMapStorageEntryFor(name: String, key: AnyRef, element: AnyRef) = if (sessions.isDefined) {
sessions.get.withSession {
- _ ++| (
- TABLE_NAME,
- name,
- MAP_COLUMN_FAMILY + ":" + key,
- value,
- System.currentTimeMillis,
- BLOCKING_CALL)
+ _ ++| (name,
+ new ColumnPath(MAP_COLUMN_PARENT.getColumn_family, null, serializer.out(key)),
+ serializer.out(element),
+ System.currentTimeMillis,
+ CONSISTENCY_LEVEL)
}
} else throw new IllegalStateException("CassandraStorage is not started")
- def insertMapStorageEntriesFor(name: String, entries: List[Tuple2[String, String]]) = if (sessions.isDefined) {
- import java.util.{Map, HashMap, List, ArrayList}
- val columns: Map[String, List[column_t]] = new HashMap
+ def insertMapStorageEntriesFor(name: String, entries: List[Tuple2[AnyRef, AnyRef]]) = if (sessions.isDefined) {
+ val cf2columns: java.util.Map[String, java.util.List[Column]] = new java.util.HashMap
for (entry <- entries) {
- val cls: List[column_t] = new ArrayList
- cls.add(new column_t(entry._1, entry._2, System.currentTimeMillis))
- columns.put(MAP_COLUMN_FAMILY, cls)
+ val columns: java.util.List[Column] = new java.util.ArrayList
+ columns.add(new Column(serializer.out(entry._1), serializer.out(entry._2), System.currentTimeMillis))
+ cf2columns.put(MAP_COLUMN_PARENT.getColumn_family, columns)
}
sessions.get.withSession {
- _ ++| (
- new batch_mutation_t(
- TABLE_NAME,
- name,
- columns),
- BLOCKING_CALL)
+ _ ++| (new BatchMutation(name, cf2columns), CONSISTENCY_LEVEL)
}
} else throw new IllegalStateException("CassandraStorage is not started")
- def getMapStorageEntryFor(name: String, key: String): Option[String] = if (sessions.isDefined) {
+ def getMapStorageEntryFor(name: String, key: AnyRef): Option[AnyRef] = if (sessions.isDefined) {
try {
- val column = sessions.get.withSession {_ | (TABLE_NAME, name, MAP_COLUMN_FAMILY + ":" + key)}
- Some(column.value)
+ val column: Option[Column] = sessions.get.withSession {
+ _ | (name, new ColumnPath(MAP_COLUMN_PARENT.getColumn_family, null, serializer.out(key)))
+ }
+ if (column.isDefined) Some(serializer.in(column.get.value, None))
+ else None
} catch {
case e =>
e.printStackTrace
@@ -192,161 +190,45 @@ object CassandraStorage extends Logging {
}
} else throw new IllegalStateException("CassandraStorage is not started")
- /*
- def getMapStorageFor(name: String): List[Tuple2[String, String]] = if (sessions.isDefined) {
- val columns = server.get_columns_since(TABLE_NAME, name, MAP_COLUMN_FAMILY, -1)
+ def getMapStorageFor(name: String): List[Tuple2[AnyRef, AnyRef]] = if (sessions.isDefined) {
+ throw new UnsupportedOperationException
+ /*
+ val columns = server.get_columns_since(name, MAP_COLUMN_FAMILY, -1)
.toArray.toList.asInstanceOf[List[org.apache.cassandra.service.column_t]]
for {
column <- columns
col = (column.columnName, column.value)
} yield col
- } else throw new IllegalStateException("CassandraStorage is not started")
*/
+ } else throw new IllegalStateException("CassandraStorage is not started")
def getMapStorageSizeFor(name: String): Int = if (sessions.isDefined) {
- sessions.get.withSession {_ |# (TABLE_NAME, name, MAP_COLUMN_FAMILY)}
- } else throw new IllegalStateException("CassandraStorage is not started")
-
- def removeMapStorageFor(name: String) = if (sessions.isDefined) {
- sessions.get.withSession {_ -- (TABLE_NAME, name, MAP_COLUMN_FAMILY, System.currentTimeMillis, BLOCKING_CALL)}
- } else throw new IllegalStateException("CassandraStorage is not started")
-
- def getMapStorageRangeFor(name: String, start: Int, count: Int): List[Tuple2[String, String]] = if (sessions.isDefined) {
- sessions.get.withSession {_ / (TABLE_NAME, name, MAP_COLUMN_FAMILY, IS_ASCENDING, count)}.toArray.toList.asInstanceOf[List[Tuple2[String, String]]]
- } else throw new IllegalStateException("CassandraStorage is not started")
-}
-
-trait CassandraSession extends Closeable with Flushable {
- import scala.collection.jcl.Conversions._
- import org.scala_tools.javautils.Imports._
-
- private implicit def null2Option[T](t: T): Option[T] = if (t != null) Some(t) else None
-
- protected val client: Cassandra.Client
-
- val obtainedAt: Long
- val consistencyLevel: Int
-
- def /(keyspace: String, key: String, columnParent: ColumnParent, start: Array[Byte], end: Array[Byte], ascending: Boolean, count: Int): List[Column] =
- /(keyspace, key, columnParent, start, end, ascending, count, consistencyLevel)
-
- def /(keyspace: String, key: String, columnParent: ColumnParent, start: Array[Byte], end: Array[Byte], ascending: Boolean, count: Int, consistencyLevel: Int): List[Column] =
- client.get_slice(keyspace, key, columnParent, start, end, ascending, count, consistencyLevel).toList
-
- def /(keyspace: String, key: String, columnParent: ColumnParent, colNames: List[Array[Byte]]): List[Column] =
- /(keyspace, key, columnParent, colNames, consistencyLevel)
-
- def /(keyspace: String, key: String, columnParent: ColumnParent, colNames: List[Array[Byte]], consistencyLevel: Int): List[Column] =
- client.get_slice_by_names(keyspace, key, columnParent, colNames.asJava, consistencyLevel).toList
-
- def |(keyspace: String, key: String, colPath: ColumnPath): Option[Column] =
- |(keyspace, key, colPath, consistencyLevel)
-
- def |(keyspace: String, key: String, colPath: ColumnPath, consistencyLevel: Int): Option[Column] =
- client.get_column(keyspace, key, colPath, consistencyLevel)
-
- def |#(keyspace: String, key: String, columnParent: ColumnParent): Int =
- |#(keyspace, key, columnParent, consistencyLevel)
-
- def |#(keyspace: String, key: String, columnParent: ColumnParent, consistencyLevel: Int): Int =
- client.get_column_count(keyspace, key, columnParent, consistencyLevel)
-
- def ++|(keyspace: String, key: String, columnPath: ColumnPath, value: Array[Byte]): Unit =
- ++|(keyspace, key, columnPath, value, obtainedAt, consistencyLevel)
-
- def ++|(keyspace: String, key: String, columnPath: ColumnPath, value: Array[Byte], timestamp: Long): Unit =
- ++|(keyspace, key, columnPath, value, timestamp, consistencyLevel)
-
- def ++|(keyspace: String, key: String, columnPath: ColumnPath, value: Array[Byte], timestamp: Long, consistencyLevel: Int) =
- client.insert(keyspace, key, columnPath, value, timestamp, consistencyLevel)
-
- def ++|(keyspace: String, batch: BatchMutation): Unit =
- ++|(keyspace, batch, consistencyLevel)
-
- def ++|(keyspace: String, batch: BatchMutation, consistencyLevel: Int): Unit =
- client.batch_insert(keyspace, batch, consistencyLevel)
-
- def --(keyspace: String, key: String, columnPathOrParent: ColumnPathOrParent, timestamp: Long): Unit =
- --(keyspace, key, columnPathOrParent, timestamp, consistencyLevel)
-
- def --(keyspace: String, key: String, columnPathOrParent: ColumnPathOrParent, timestamp: Long, consistencyLevel: Int): Unit =
- client.remove(keyspace, key, columnPathOrParent, timestamp, consistencyLevel)
-
- def /^(keyspace: String, key: String, columnFamily: String, start: Array[Byte], end: Array[Byte], ascending: Boolean, count: Int): List[SuperColumn] =
- /^(keyspace, key, columnFamily, start, end, ascending, count, consistencyLevel)
-
- def /^(keyspace: String, key: String, columnFamily: String, start: Array[Byte], end: Array[Byte], ascending: Boolean, count: Int, consistencyLevel: Int): List[SuperColumn] =
- client.get_slice_super(keyspace, key, columnFamily, start, end, ascending, count, consistencyLevel).toList
-
- def /^(keyspace: String, key: String, columnFamily: String, superColNames: List[Array[Byte]]): List[SuperColumn] =
- /^(keyspace, key, columnFamily, superColNames, consistencyLevel)
-
- def /^(keyspace: String, key: String, columnFamily: String, superColNames: List[Array[Byte]], consistencyLevel: Int): List[SuperColumn] =
- client.get_slice_super_by_names(keyspace, key, columnFamily, superColNames.asJava, consistencyLevel).toList
-
- def |^(keyspace: String, key: String, superColumnPath: SuperColumnPath): Option[SuperColumn] =
- |^(keyspace, key, superColumnPath, consistencyLevel)
-
- def |^(keyspace: String, key: String, superColumnPath: SuperColumnPath, consistencyLevel: Int): Option[SuperColumn] =
- client.get_super_column(keyspace, key, superColumnPath, consistencyLevel)
-
- def ++|^(keyspace: String, batch: BatchMutationSuper): Unit =
- ++|^(keyspace, batch, consistencyLevel)
-
- def ++|^(keyspace: String, batch: BatchMutationSuper, consistencyLevel: Int): Unit =
- client.batch_insert_super_column(keyspace, batch, consistencyLevel)
-
- def keys(keyspace: String, columnFamily: String, startsWith: String, stopsAt: String, maxResults: Option[Int]): List[String] =
- client.get_key_range(keyspace, columnFamily, startsWith, stopsAt, maxResults.getOrElse(-1)).toList
-
- def property(name: String): String = client.get_string_property(name)
-
- def properties(name: String): List[String] = client.get_string_list_property(name).toList
-
- def describeTable(keyspace: String) = client.describe_keyspace(keyspace)
-
- def ?(query: String) = client.execute_query(query)
-}
-
-class CassandraSessionPool[T <: TTransport](transportPool: Pool[T], inputProtocol: Protocol, outputProtocol: Protocol, defConsistencyLvl: Int) extends Closeable {
- def this(transportPool: Pool[T], ioProtocol: Protocol, defConsistencyLvl: Int) = this (transportPool, ioProtocol, ioProtocol, defConsistencyLvl)
-
- def newSession: CassandraSession = newSession(defConsistencyLvl)
-
- def newSession(consistencyLvl: Int): CassandraSession = {
- val t = transportPool.borrowObject
- val c = new Cassandra.Client(inputProtocol(t), outputProtocol(t))
- new CassandraSession {
- val client = c
- val obtainedAt = System.currentTimeMillis
- val consistencyLevel = consistencyLvl
- def flush = t.flush
- def close = transportPool.returnObject(t)
+ sessions.get.withSession {
+ _ |# (name, MAP_COLUMN_PARENT)
}
- }
+ } else throw new IllegalStateException("CassandraStorage is not started")
- def withSession[R](body: CassandraSession => R) = {
- val session = newSession(defConsistencyLvl)
- try {
- val result = body(session)
- session.flush
- result
- } finally {
- session.close
+ def removeMapStorageFor(name: String): Unit = removeMapStorageFor(name, null)
+
+ def removeMapStorageFor(name: String, key: AnyRef): Unit = if (sessions.isDefined) {
+ val keyBytes = if (key == null) null else serializer.out(key)
+ sessions.get.withSession {
+ _ -- (name,
+ new ColumnPathOrParent(MAP_COLUMN_PARENT.getColumn_family, null, keyBytes),
+ System.currentTimeMillis,
+ CONSISTENCY_LEVEL)
}
- }
+ } else throw new IllegalStateException("CassandraStorage is not started")
- def close = transportPool.close
-}
-
-sealed abstract class Protocol(val factory: TProtocolFactory) {
- def apply(transport: TTransport) = factory.getProtocol(transport)
-}
-
-object Protocol {
- object Binary extends Protocol(new TBinaryProtocol.Factory)
- object SimpleJSON extends Protocol(new TSimpleJSONProtocol.Factory)
- object JSON extends Protocol(new TJSONProtocol.Factory)
+ def getMapStorageRangeFor(name: String, start: Option[AnyRef], finish: Option[AnyRef], count: Int):
+ List[Tuple2[AnyRef, AnyRef]] = if (sessions.isDefined) {
+ val startBytes = if (start.isDefined) serializer.out(start.get) else null
+ val finishBytes = if (finish.isDefined) serializer.out(finish.get) else null
+ val columns: List[Column] = sessions.get.withSession {
+ _ / (name, MAP_COLUMN_PARENT, startBytes, finishBytes, IS_ASCENDING, count, CONSISTENCY_LEVEL)
+ }
+ columns.map(column => (column.name, serializer.in(column.value, None)))
+ } else throw new IllegalStateException("CassandraStorage is not started")
}
/**
@@ -357,7 +239,7 @@ object Protocol {
* @author Jonas Bonér
*
object EmbeddedCassandraStorage extends Logging {
-val TABLE_NAME = "akka"
+val KEYSPACE = "akka"
val MAP_COLUMN_FAMILY = "map"
val VECTOR_COLUMN_FAMILY = "vector"
val REF_COLUMN_FAMILY = "ref:item"
@@ -365,7 +247,7 @@ val REF_COLUMN_FAMILY = "ref:item"
val IS_ASCENDING = true
val RUN_THRIFT_SERVICE = kernel.Kernel.config.getBool("akka.storage.cassandra.thrift-server.service", false)
-val BLOCKING_CALL = {
+val CONSISTENCY_LEVEL = {
if (kernel.Kernel.config.getBool("akka.storage.cassandra.blocking", true)) 0
else 1 }
@@ -416,17 +298,17 @@ if (RUN_THRIFT_SERVICE) thriftServer.stop
def insertRefStorageFor(name: String, element: AnyRef) = {
server.insert(
-TABLE_NAME,
+KEYSPACE,
name,
REF_COLUMN_FAMILY,
element,
System.currentTimeMillis,
-BLOCKING_CALL)
+CONSISTENCY_LEVEL)
}
def getRefStorageFor(name: String): Option[AnyRef] = {
try {
-val column = server.get_column(TABLE_NAME, name, REF_COLUMN_FAMILY)
+val column = server.get_column(KEYSPACE, name, REF_COLUMN_FAMILY)
Some(serializer.in(column.value, None))
} catch {
case e =>
@@ -440,17 +322,17 @@ None }
def insertVectorStorageEntryFor(name: String, element: AnyRef) = {
server.insert(
-TABLE_NAME,
+KEYSPACE,
name,
VECTOR_COLUMN_FAMILY + ":" + getVectorStorageSizeFor(name),
element,
System.currentTimeMillis,
-BLOCKING_CALL)
+CONSISTENCY_LEVEL)
}
def getVectorStorageEntryFor(name: String, index: Int): AnyRef = {
try {
-val column = server.get_column(TABLE_NAME, name, VECTOR_COLUMN_FAMILY + ":" + index)
+val column = server.get_column(KEYSPACE, name, VECTOR_COLUMN_FAMILY + ":" + index)
serializer.in(column.value, None)
} catch {
case e =>
@@ -460,11 +342,11 @@ throw new Predef.NoSuchElementException(e.getMessage)
}
def getVectorStorageRangeFor(name: String, start: Int, count: Int): List[AnyRef] =
-server.get_slice(TABLE_NAME, name, VECTOR_COLUMN_FAMILY, IS_ASCENDING, count)
+server.get_slice(KEYSPACE, name, VECTOR_COLUMN_FAMILY, IS_ASCENDING, count)
.toArray.toList.asInstanceOf[List[Tuple2[String, AnyRef]]].map(tuple => tuple._2)
def getVectorStorageSizeFor(name: String): Int =
-server.get_column_count(TABLE_NAME, name, VECTOR_COLUMN_FAMILY)
+server.get_column_count(KEYSPACE, name, VECTOR_COLUMN_FAMILY)
// ===============================================================
// For Map
@@ -472,11 +354,11 @@ server.get_column_count(TABLE_NAME, name, VECTOR_COLUMN_FAMILY)
def insertMapStorageEntryFor(name: String, key: String, value: AnyRef) = {
server.insert(
-TABLE_NAME, name,
+KEYSPACE, name,
MAP_COLUMN_FAMILY + ":" + key,
serializer.out(value),
System.currentTimeMillis,
-BLOCKING_CALL)
+CONSISTENCY_LEVEL)
}
def insertMapStorageEntriesFor(name: String, entries: List[Tuple2[String, AnyRef]]) = {
@@ -487,15 +369,15 @@ val cls: List[column_t] = new ArrayList
cls.add(new column_t(entry._1, serializer.out(entry._2), System.currentTimeMillis))
columns.put(MAP_COLUMN_FAMILY, cls)
}
-server.batch_insert(new batch_mutation_t(
-TABLE_NAME, name,
+server.batch_insert(new BatchMutation(
+KEYSPACE, name,
columns),
-BLOCKING_CALL)
+CONSISTENCY_LEVEL)
}
def getMapStorageEntryFor(name: String, key: AnyRef): Option[AnyRef] = {
try {
-val column = server.get_column(TABLE_NAME, name, MAP_COLUMN_FAMILY + ":" + key)
+val column = server.get_column(KEYSPACE, name, MAP_COLUMN_FAMILY + ":" + key)
Some(serializer.in(column.value, None))
} catch {
case e =>
@@ -505,7 +387,7 @@ None
}
def getMapStorageFor(name: String): List[Tuple2[String, AnyRef]] = {
-val columns = server.get_columns_since(TABLE_NAME, name, MAP_COLUMN_FAMILY, -1)
+val columns = server.get_columns_since(KEYSPACE, name, MAP_COLUMN_FAMILY, -1)
.toArray.toList.asInstanceOf[List[org.apache.cassandra.service.column_t]]
for {
column <- columns
@@ -514,13 +396,13 @@ col = (column.columnName, serializer.in(column.value, None))
}
def getMapStorageSizeFor(name: String): Int =
-server.get_column_count(TABLE_NAME, name, MAP_COLUMN_FAMILY)
+server.get_column_count(KEYSPACE, name, MAP_COLUMN_FAMILY)
def removeMapStorageFor(name: String) =
-server.remove(TABLE_NAME, name, MAP_COLUMN_FAMILY, System.currentTimeMillis, BLOCKING_CALL)
+server.remove(KEYSPACE, name, MAP_COLUMN_FAMILY, System.currentTimeMillis, CONSISTENCY_LEVEL)
def getMapStorageRangeFor(name: String, start: Int, count: Int): List[Tuple2[String, AnyRef]] = {
-server.get_slice(TABLE_NAME, name, MAP_COLUMN_FAMILY, IS_ASCENDING, count)
+server.get_slice(KEYSPACE, name, MAP_COLUMN_FAMILY, IS_ASCENDING, count)
.toArray.toList.asInstanceOf[List[Tuple2[String, AnyRef]]]
}
}
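A short sketch of the resulting API, exactly as the benchmark in Kernel.scala above exercises it:

    CassandraStorage.start                                          // opens the session pool
    CassandraStorage.insertMapStorageEntryFor("test", "1", "data")  // key and value pass through the serializer
    val entry: Option[AnyRef] = CassandraStorage.getMapStorageEntryFor("test", "1")
    val size: Int = CassandraStorage.getMapStorageSizeFor("test")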
diff --git a/kernel/src/main/scala/state/Pool.scala b/kernel/src/main/scala/state/Pool.scala
old mode 100755
new mode 100644
index 1ba3ba1b3c..6391645562
--- a/kernel/src/main/scala/state/Pool.scala
+++ b/kernel/src/main/scala/state/Pool.scala
@@ -7,6 +7,8 @@ package se.scalablesolutions.akka.kernel.state
import org.apache.commons.pool._
import org.apache.commons.pool.impl._
+import org.apache.thrift.transport._
+
trait Pool[T] extends java.io.Closeable {
def borrowObject: T
def returnObject(t: T): Unit
@@ -86,7 +88,7 @@ trait TransportFactory[T <: TTransport] extends PoolItemFactory[T] {
case class SocketProvider(val host: String, val port: Int) extends TransportFactory[TSocket] {
def createTransport = {
- val t = new TSocket(host,port)
+ val t = new TSocket(host, port)
t.open
t
}
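SocketProvider opens the TSocket eagerly in createTransport, so a borrowed transport is ready for use; a borrow/return sketch with the StackPool factory that CassandraStorage uses:

    val pool: Pool[TSocket] = StackPool(SocketProvider("127.0.0.1", 9160))
    val socket = pool.borrowObject       // open TSocket from the pool
    try {
      // ... drive a Thrift client over the transport ...
    } finally pool.returnObject(socket)  // hand it back instead of closing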
diff --git a/kernel/src/main/scala/state/State.scala b/kernel/src/main/scala/state/State.scala
index 58c1cbfbe8..46114eb1bc 100644
--- a/kernel/src/main/scala/state/State.scala
+++ b/kernel/src/main/scala/state/State.scala
@@ -37,7 +37,7 @@ object TransactionalState extends TransactionalState
*
*/
class TransactionalState {
- def newPersistentMap(config: PersistentStorageConfig): TransactionalMap[String, AnyRef] = config match {
+ def newPersistentMap(config: PersistentStorageConfig): TransactionalMap[AnyRef, AnyRef] = config match {
case CassandraStorageConfig() => new CassandraPersistentTransactionalMap
case TerracottaStorageConfig() => throw new UnsupportedOperationException
case TokyoCabinetStorageConfig() => throw new UnsupportedOperationException
@@ -174,7 +174,7 @@ abstract class PersistentTransactionalMap[K, V] extends TransactionalMap[K, V] {
// FIXME: need to handle remove in another changeSet
protected[kernel] val changeSet = new HashMap[K, V]
- def getRange(start: Int, count: Int)
+ def getRange(start: Option[AnyRef], count: Int)
// ---- For Transactional ----
override def begin = {}
@@ -188,11 +188,6 @@ abstract class PersistentTransactionalMap[K, V] extends TransactionalMap[K, V] {
None // always return None to speed up writes (else need to go to DB to get
}
- override def remove(key: K) = {
- verifyTransaction
- changeSet -= key
- }
-
override def -=(key: K) = remove(key)
override def update(key: K, value: V) = put(key, value)
@@ -203,12 +198,21 @@ abstract class PersistentTransactionalMap[K, V] extends TransactionalMap[K, V] {
*
* @author Jonas Bonér
*/
-class CassandraPersistentTransactionalMap extends PersistentTransactionalMap[String, AnyRef] {
+class CassandraPersistentTransactionalMap extends PersistentTransactionalMap[AnyRef, AnyRef] {
- override def getRange(start: Int, count: Int) = {
+ override def remove(key: AnyRef) = {
+ verifyTransaction
+ if (changeSet.contains(key)) changeSet -= key
+ else CassandraStorage.removeMapStorageFor(uuid, key)
+ }
+
+ override def getRange(start: Option[AnyRef], count: Int) =
+ getRange(start, None, count)
+
+ def getRange(start: Option[AnyRef], finish: Option[AnyRef], count: Int) = {
verifyTransaction
try {
- CassandraStorage.getMapStorageRangeFor(uuid, start, count)
+ CassandraStorage.getMapStorageRangeFor(uuid, start, finish, count)
} catch {
case e: Exception => Nil
}
@@ -230,7 +234,7 @@ class CassandraPersistentTransactionalMap extends PersistentTransactionalMap[Str
}
}
- override def contains(key: String): Boolean = {
+ override def contains(key: AnyRef): Boolean = {
try {
verifyTransaction
CassandraStorage.getMapStorageEntryFor(uuid, key).isDefined
@@ -249,7 +253,7 @@ class CassandraPersistentTransactionalMap extends PersistentTransactionalMap[Str
}
// ---- For scala.collection.mutable.Map ----
- override def get(key: String): Option[AnyRef] = {
+ override def get(key: AnyRef): Option[AnyRef] = {
verifyTransaction
// if (changeSet.contains(key)) changeSet.get(key)
// else {
@@ -262,16 +266,16 @@ class CassandraPersistentTransactionalMap extends PersistentTransactionalMap[Str
//}
}
- override def elements: Iterator[Tuple2[String, AnyRef]] = {
+ override def elements: Iterator[Tuple2[AnyRef, AnyRef]] = {
//verifyTransaction
- new Iterator[Tuple2[String, AnyRef]] {
- private val originalList: List[Tuple2[String, AnyRef]] = try {
+ new Iterator[Tuple2[AnyRef, AnyRef]] {
+ private val originalList: List[Tuple2[AnyRef, AnyRef]] = try {
CassandraStorage.getMapStorageFor(uuid)
} catch {
case e: Throwable => Nil
}
private var elements = originalList.reverse
- override def next: Tuple2[String, AnyRef]= synchronized {
+ override def next: Tuple2[AnyRef, AnyRef]= synchronized {
val element = elements.head
elements = elements.tail
element
@@ -391,9 +395,12 @@ class CassandraPersistentTransactionalVector extends PersistentTransactionalVect
else CassandraStorage.getVectorStorageEntryFor(uuid, index)
}
- override def getRange(start: Int, count: Int): List[AnyRef] = {
+ override def getRange(start: Int, count: Int): List[AnyRef] =
+ getRange(Some(start), None, count)
+
+ def getRange(start: Option[Int], finish: Option[Int], count: Int): List[AnyRef] = {
verifyTransaction
- CassandraStorage.getVectorStorageRangeFor(uuid, start, count)
+ CassandraStorage.getVectorStorageRangeFor(uuid, start, finish, count)
}
override def length: Int = {
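With the key type widened to AnyRef, a hypothetical use of the persistent map (inside a running transaction, since get and remove call verifyTransaction; the uuid-backed storage name is managed by the map itself):

    val map: TransactionalMap[AnyRef, AnyRef] =
      TransactionalState.newPersistentMap(CassandraStorageConfig())
    map.put("user-1", "Lord Foo Bar")    // buffered in the change set until commit
    val name: Option[AnyRef] = map.get("user-1")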
diff --git a/kernel/src/main/scala/util/Helpers.scala b/kernel/src/main/scala/util/Helpers.scala
index cc629e0801..6d43f06030 100644
--- a/kernel/src/main/scala/util/Helpers.scala
+++ b/kernel/src/main/scala/util/Helpers.scala
@@ -4,15 +4,9 @@
package se.scalablesolutions.akka.kernel.util
-import java.io.UnsupportedEncodingException
-import java.security.{NoSuchAlgorithmException, MessageDigest}
+import java.security.MessageDigest
import java.util.concurrent.locks.ReentrantReadWriteLock
-import scala.actors._
-import scala.actors.Actor._
-
-import net.lag.logging.Logger
-
class SystemFailure(cause: Throwable) extends RuntimeException(cause)
/**
@@ -20,7 +14,18 @@ class SystemFailure(cause: Throwable) extends RuntimeException(cause)
*/
object Helpers extends Logging {
- def getDigestFor(s: String) = {
+ implicit def null2Option[T](t: T): Option[T] = if (t != null) Some(t) else None
+
+ def intToBytes(value: Int): Array[Byte] = {
+ val bytes = new Array[Byte](4)
+ bytes(0) = (value >>> 24).asInstanceOf[Byte]
+ bytes(1) = (value >>> 16).asInstanceOf[Byte]
+ bytes(2) = (value >>> 8).asInstanceOf[Byte]
+ bytes(3) = value.asInstanceOf[Byte]
+ bytes
+ }
+
+ def getMD5For(s: String) = {
val digest = MessageDigest.getInstance("MD5")
digest.update(s.getBytes("ASCII"))
val bytes = digest.digest
@@ -59,51 +64,5 @@ object Helpers extends Logging {
}
}
}
-
- // ================================================
- // implicit conversion between regular actor and actor with a type future
- implicit def actorWithFuture(a: Actor) = new ActorWithTypedFuture(a)
-
- abstract class FutureWithTimeout[T](ch: InputChannel[T]) extends Future[T](ch) {
- def receiveWithin(timeout: Int) : Option[T]
- override def respond(f: T => Unit): Unit = throw new UnsupportedOperationException("Does not support the Responder API")
- }
-
- def receiveOrFail[T](future: => FutureWithTimeout[T], timeout: Int, errorHandler: => T): T = {
- future.receiveWithin(timeout) match {
- case None => errorHandler
- case Some(reply) => reply
- }
- }
-
- class ActorWithTypedFuture(a: Actor) {
- require(a != null)
-
- def !!: FutureWithTimeout[A] = {
- val ftch = new Channel[A](Actor.self)
- a.send(msg, ftch.asInstanceOf[OutputChannel[Any]])
- new FutureWithTimeout[A](ftch) {
- def apply() =
- if (isSet) value.get.asInstanceOf[A]
- else ch.receive {
- case a =>
- value = Some(a)
- value.get.asInstanceOf[A]
- }
- def isSet = receiveWithin(0).isDefined
- def receiveWithin(timeout: Int): Option[A] = value match {
- case None => ch.receiveWithin(timeout) {
- case TIMEOUT =>
- log.debug("Future timed out while waiting for actor [%s]", a)
- None
- case a =>
- value = Some(a)
- value.asInstanceOf[Option[A]]
- }
- case a => a.asInstanceOf[Option[A]]
- }
- }
- }
- }
}
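intToBytes packs the value big-endian (most significant byte first), so for non-negative indices the byte arrays sort in numeric order, which is what the vector column names rely on; a quick sanity check:

    val bytes = Helpers.intToBytes(258)  // 258 = 0x00000102
    assert(bytes(0) == 0 && bytes(1) == 0 && bytes(2) == 1 && bytes(3) == 2)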
diff --git a/lib/cassandra-0.4.0-dev.jar b/lib/cassandra-0.4.0-dev.jar
index d9f762ad2a..706336669f 100644
Binary files a/lib/cassandra-0.4.0-dev.jar and b/lib/cassandra-0.4.0-dev.jar differ
diff --git a/lib/javautils-2.7.4-0.1.jar b/lib/javautils-2.7.4-0.1.jar
new file mode 100644
index 0000000000..a0c51bf7da
Binary files /dev/null and b/lib/javautils-2.7.4-0.1.jar differ
diff --git a/lib/netty-3.1.0.CR1.jar b/lib/netty-3.1.0.GA.jar
similarity index 57%
rename from lib/netty-3.1.0.CR1.jar
rename to lib/netty-3.1.0.GA.jar
index f1180b0477..94c9b24902 100644
Binary files a/lib/netty-3.1.0.CR1.jar and b/lib/netty-3.1.0.GA.jar differ
diff --git a/pom.xml b/pom.xml
index f6097e18de..f321ff0e88 100644
--- a/pom.xml
+++ b/pom.xml
@@ -26,6 +26,38 @@
   <module>samples-java</module>
+  <organization>
+    <name>Scalable Solutions AB</name>
+    <url>http://scalablesolutions.se</url>
+  </organization>
+
+  <scm>
+    <connection>scm:git:git://github.com/jboner/akka.git</connection>
+    <developerConnection>scm:git:git@github.com:jboner/akka.git</developerConnection>
+    <url>http://github.com/jboner/akka</url>
+  </scm>
+
+  <licenses>
+    <license>
+      <name>the Apache License, ASL Version 2.0</name>
+      <url>http://www.apache.org/licenses/LICENSE-2.0</url>
+    </license>
+  </licenses>
+
+  <developers>
+    <developer>
+      <id>jboner</id>
+      <name>Jonas Bonér</name>
+      <timezone>+1</timezone>
+      <email>jonas AT jonasboner DOTCOM</email>
+      <roles>
+        <role>Founder</role>
+        <role>Hacker</role>
+        <role>Despot</role>
+      </roles>
+    </developer>
+  </developers>
+
       <id>repo1.maven</id>
diff --git a/samples-java/akka-samples-java.iml b/samples-java/akka-samples-java.iml
index 4a210fcd93..5871ff71d7 100644
--- a/samples-java/akka-samples-java.iml
+++ b/samples-java/akka-samples-java.iml
diff --git a/samples-lift/akka-samples-lift.iml b/samples-lift/akka-samples-lift.iml
index 7e8bb22ce4..d5189eca42 100644
--- a/samples-lift/akka-samples-lift.iml
+++ b/samples-lift/akka-samples-lift.iml
diff --git a/samples-scala/akka-samples-scala.iml b/samples-scala/akka-samples-scala.iml
index 05d2b682d7..955c18a9ce 100644
--- a/samples-scala/akka-samples-scala.iml
+++ b/samples-scala/akka-samples-scala.iml