diff --git a/akka.iws b/akka.iws
index 5fbfeffb69..a542bf2f14 100644
diff --git a/fun-test-java/akka-fun-test-java.iml b/fun-test-java/akka-fun-test-java.iml
index a906c3d925..9ef386fdde 100644
diff --git a/kernel/akka-kernel.iml b/kernel/akka-kernel.iml
index 2ab816b9b2..575fe344c0 100644
diff --git a/kernel/src/main/scala/state/CassandraStorage.scala b/kernel/src/main/scala/state/CassandraStorage.scala
index b2538722fb..196177c8d5 100644
--- a/kernel/src/main/scala/state/CassandraStorage.scala
+++ b/kernel/src/main/scala/state/CassandraStorage.scala
@@ -9,6 +9,7 @@ import java.io.{File, Flushable, Closeable}
import kernel.util.Logging
import serialization.{Serializer, Serializable, SerializationProtocol}
+import org.apache.cassandra.db._
import org.apache.cassandra.config.DatabaseDescriptor
import org.apache.cassandra.service._
@@ -32,6 +33,8 @@ object CassandraStorage extends Logging {
val VECTOR_COLUMN_FAMILY = "vector"
val REF_COLUMN_FAMILY = "ref:item"
+ val DEFAULT_CONSISTENCY_LEVEL = 10 // TODO: what should the default be?
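+ // Note: the value is passed straight through to the Thrift client calls below,
+ // which take the consistency level as a plain Int; nothing in this file validates it.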
+
val IS_ASCENDING = true
import kernel.Kernel.config
@@ -39,7 +42,7 @@ object CassandraStorage extends Logging {
val CASSANDRA_SERVER_HOSTNAME = config.getString("akka.storage.cassandra.hostname", "localhost")
val CASSANDRA_SERVER_PORT = config.getInt("akka.storage.cassandra.port", 9160)
val BLOCKING_CALL = if (config.getBool("akka.storage.cassandra.blocking", true)) 0
- else 1
+ else 1
@volatile private[this] var isRunning = false
private[this] val protocol: Protocol = {
@@ -51,13 +54,13 @@ object CassandraStorage extends Logging {
}
}
-
+
private[this] var sessions: Option[CassandraSessionPool[_]] = None
def start = synchronized {
if (!isRunning) {
try {
- sessions = Some(new CassandraSessionPool(StackPool(SocketProvider(CASSANDRA_SERVER_HOSTNAME, CASSANDRA_SERVER_PORT)), protocol))
+ sessions = Some(new CassandraSessionPool(StackPool(SocketProvider(CASSANDRA_SERVER_HOSTNAME, CASSANDRA_SERVER_PORT)), protocol, DEFAULT_CONSISTENCY_LEVEL))
log.info("Cassandra persistent storage has started up successfully");
} catch {
case e =>
@@ -67,21 +70,21 @@ object CassandraStorage extends Logging {
isRunning
}
}
-
+
def stop = synchronized {
if (isRunning && sessions.isDefined) sessions.get.close
}
-
+
//implicit def strToBytes(s: String) = s.getBytes("UTF-8")
-/*
- def insertRefStorageFor(name: String, element: AnyRef) = sessions.withSession { session => {
- val user_id = "1"
- session ++| ("users", user_id, "base_attributes:name", "Lord Foo Bar", false)
- session ++| ("users", user_id, "base_attributes:age", "24", false)
- for( i <- session / ("users", user_id, "base_attributes", None, None).toList) println(i)
- }}
-*/
+ /*
+ def insertRefStorageFor(name: String, element: AnyRef) = sessions.withSession { session => {
+ val user_id = "1"
+ session ++| ("users", user_id, "base_attributes:name", "Lord Foo Bar", false)
+ session ++| ("users", user_id, "base_attributes:age", "24", false)
+ for( i <- session / ("users", user_id, "base_attributes", None, None).toList) println(i)
+ }}
+ */
// ===============================================================
// For Ref
// ===============================================================
@@ -89,23 +92,23 @@ object CassandraStorage extends Logging {
def insertRefStorageFor(name: String, element: String) = if (sessions.isDefined) {
sessions.get.withSession {
_ ++| (
- TABLE_NAME,
- name,
- REF_COLUMN_FAMILY,
- element,
- System.currentTimeMillis,
- BLOCKING_CALL)
+ TABLE_NAME,
+ name,
+ REF_COLUMN_FAMILY,
+ element,
+ System.currentTimeMillis,
+ BLOCKING_CALL)
}
} else throw new IllegalStateException("CassandraStorage is not started")
def getRefStorageFor(name: String): Option[String] = if (sessions.isDefined) {
try {
- val column = sessions.get.withSession { _ | (TABLE_NAME, name, REF_COLUMN_FAMILY) }
+ val column = sessions.get.withSession {_ | (TABLE_NAME, name, REF_COLUMN_FAMILY)}
Some(column.value)
} catch {
case e =>
e.printStackTrace
- None
+ None
}
} else throw new IllegalStateException("CassandraStorage is not started")
@@ -116,32 +119,32 @@ object CassandraStorage extends Logging {
def insertVectorStorageEntryFor(name: String, element: String) = if (sessions.isDefined) {
sessions.get.withSession {
_ ++| (
- TABLE_NAME,
- name,
- VECTOR_COLUMN_FAMILY + ":" + getVectorStorageSizeFor(name),
- element,
- System.currentTimeMillis,
- BLOCKING_CALL)
+ TABLE_NAME,
+ name,
+ VECTOR_COLUMN_FAMILY + ":" + getVectorStorageSizeFor(name),
+ element,
+ System.currentTimeMillis,
+ BLOCKING_CALL)
}
} else throw new IllegalStateException("CassandraStorage is not started")
def getVectorStorageEntryFor(name: String, index: Int): String = if (sessions.isDefined) {
try {
- val column = sessions.get.withSession { _ | (TABLE_NAME, name, VECTOR_COLUMN_FAMILY + ":" + index) }
+ val column = sessions.get.withSession {_ | (TABLE_NAME, name, VECTOR_COLUMN_FAMILY + ":" + index)}
column.value
} catch {
case e =>
e.printStackTrace
throw new NoSuchElementException(e.getMessage)
- }
+ }
} else throw new IllegalStateException("CassandraStorage is not started")
def getVectorStorageRangeFor(name: String, start: Int, count: Int): List[String] = if (sessions.isDefined) {
- sessions.get.withSession { _ / (TABLE_NAME, name, VECTOR_COLUMN_FAMILY, IS_ASCENDING, count) }.map(_.value)
+ sessions.get.withSession {_ / (TABLE_NAME, name, VECTOR_COLUMN_FAMILY, IS_ASCENDING, count)}.map(_.value)
} else throw new IllegalStateException("CassandraStorage is not started")
def getVectorStorageSizeFor(name: String): Int = if (sessions.isDefined) {
- sessions.get.withSession { _ |# (TABLE_NAME, name, VECTOR_COLUMN_FAMILY) }
+ sessions.get.withSession {_ |# (TABLE_NAME, name, VECTOR_COLUMN_FAMILY)}
} else throw new IllegalStateException("CassandraStorage is not started")
// ===============================================================
@@ -151,12 +154,12 @@ object CassandraStorage extends Logging {
def insertMapStorageEntryFor(name: String, key: String, value: String) = if (sessions.isDefined) {
sessions.get.withSession {
_ ++| (
- TABLE_NAME,
- name,
- MAP_COLUMN_FAMILY + ":" + key,
- value,
- System.currentTimeMillis,
- BLOCKING_CALL)
+ TABLE_NAME,
+ name,
+ MAP_COLUMN_FAMILY + ":" + key,
+ value,
+ System.currentTimeMillis,
+ BLOCKING_CALL)
}
} else throw new IllegalStateException("CassandraStorage is not started")
@@ -170,17 +173,17 @@ object CassandraStorage extends Logging {
}
sessions.get.withSession {
_ ++| (
- new batch_mutation_t(
- TABLE_NAME,
- name,
- columns),
- BLOCKING_CALL)
+ new batch_mutation_t(
+ TABLE_NAME,
+ name,
+ columns),
+ BLOCKING_CALL)
}
} else throw new IllegalStateException("CassandraStorage is not started")
def getMapStorageEntryFor(name: String, key: String): Option[String] = if (sessions.isDefined) {
try {
- val column = sessions.get.withSession { _ | (TABLE_NAME, name, MAP_COLUMN_FAMILY + ":" + key) }
+ val column = sessions.get.withSession {_ | (TABLE_NAME, name, MAP_COLUMN_FAMILY + ":" + key)}
Some(column.value)
} catch {
case e =>
@@ -201,15 +204,15 @@ object CassandraStorage extends Logging {
*/
def getMapStorageSizeFor(name: String): Int = if (sessions.isDefined) {
- sessions.get.withSession { _ |# (TABLE_NAME, name, MAP_COLUMN_FAMILY) }
+ sessions.get.withSession {_ |# (TABLE_NAME, name, MAP_COLUMN_FAMILY)}
} else throw new IllegalStateException("CassandraStorage is not started")
def removeMapStorageFor(name: String) = if (sessions.isDefined) {
- sessions.get.withSession { _ -- (TABLE_NAME, name, MAP_COLUMN_FAMILY, System.currentTimeMillis, BLOCKING_CALL) }
+ sessions.get.withSession {_ -- (TABLE_NAME, name, MAP_COLUMN_FAMILY, System.currentTimeMillis, BLOCKING_CALL)}
} else throw new IllegalStateException("CassandraStorage is not started")
def getMapStorageRangeFor(name: String, start: Int, count: Int): List[Tuple2[String, String]] = if (sessions.isDefined) {
- sessions.get.withSession { _ / (TABLE_NAME, name, MAP_COLUMN_FAMILY, IS_ASCENDING, count) }.toArray.toList.asInstanceOf[List[Tuple2[String, String]]]
+ sessions.get.withSession {_ / (TABLE_NAME, name, MAP_COLUMN_FAMILY, IS_ASCENDING, count)}.toArray.toList.asInstanceOf[List[Tuple2[String, String]]]
} else throw new IllegalStateException("CassandraStorage is not started")
}
@@ -217,80 +220,113 @@ trait CassandraSession extends Closeable with Flushable {
import scala.collection.jcl.Conversions._
import org.scala_tools.javautils.Imports._
- private implicit def null2Option[T](t: T): Option[T] = if(t != null) Some(t) else None
+ private implicit def null2Option[T](t: T): Option[T] = if (t != null) Some(t) else None
protected val client: Cassandra.Client
val obtainedAt: Long
-
- def /(tableName: String, key: String, columnParent: String, start: Option[Int],end: Option[Int]): List[column_t] =
- client.get_slice(tableName, key, columnParent, start.getOrElse(-1),end.getOrElse(-1)).toList
-
- def /(tableName: String, key: String, columnParent: String, colNames: List[String]): List[column_t] =
- client.get_slice_by_names(tableName, key, columnParent, colNames.asJava ).toList
+ val consistencyLevel: Int
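+
+ // Each operator below comes in two forms: one that falls back to this session-level
+ // default, and an overload that takes an explicit consistencyLevel.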
- def |(tableName: String, key: String, colPath: String): Option[column_t] =
- client.get_column(tableName, key, colPath)
-
- def |#(tableName: String, key: String, columnParent: String): Int =
- client.get_column_count(tableName, key, columnParent)
-
- def ++|(tableName: String, key: String, columnPath: String, cellData: Array[Byte], timestamp: Long, block: Int) =
- client.insert(tableName, key, columnPath, cellData,timestamp,block)
-
- def ++|(tableName: String, key: String, columnPath: String, cellData: Array[Byte], block: Int) =
- client.insert(tableName,key,columnPath,cellData,obtainedAt,block)
-
- def ++|(batch: batch_mutation_t, block: Int) =
- client.batch_insert(batch, block)
-
- def --(tableName: String, key: String, columnPathOrParent: String, timestamp: Long, block: Int) =
- client.remove(tableName, key, columnPathOrParent, timestamp, block)
-
- def --(tableName: String, key: String, columnPathOrParent: String, block: Int) =
- client.remove(tableName, key, columnPathOrParent, obtainedAt, block)
-
- def /@(tableName: String, key: String, columnParent: String, timestamp: Long): List[column_t] =
- client.get_columns_since(tableName, key, columnParent, timestamp).toList
-
- def /^(tableName: String, key: String, columnFamily: String, start: Option[Int], end: Option[Int], count: Int ): List[superColumn_t] =
- client.get_slice_super(tableName, key,columnFamily, start.getOrElse(-1), end.getOrElse(-1)).toList //TODO upgrade thrift interface to support count
+ def /(keyspace: String, key: String, columnParent: ColumnParent, start: Array[Byte], end: Array[Byte], ascending: Boolean, count: Int): List[Column] =
+ /(keyspace, key, columnParent, start, end, ascending, count, consistencyLevel)
- def /^(tableName: String, key: String, columnFamily: String, superColNames: List[String]): List[superColumn_t] =
- client.get_slice_super_by_names(tableName, key, columnFamily, superColNames.asJava).toList
+ def /(keyspace: String, key: String, columnParent: ColumnParent, start: Array[Byte], end: Array[Byte], ascending: Boolean, count: Int, consistencyLevel: Int): List[Column] =
+ client.get_slice(keyspace, key, columnParent, start, end, ascending, count, consistencyLevel).toList
- def |^(tableName: String, key: String, superColumnPath: String): Option[superColumn_t] =
- client.get_superColumn(tableName,key,superColumnPath)
+ def /(keyspace: String, key: String, columnParent: ColumnParent, colNames: List[Array[Byte]]): List[Column] =
+ /(keyspace, key, columnParent, colNames, consistencyLevel)
- def ++|^ (batch: batch_mutation_super_t, block: Int) =
- client.batch_insert_superColumn(batch, block)
+ def /(keyspace: String, key: String, columnParent: ColumnParent, colNames: List[Array[Byte]], consistencyLevel: Int): List[Column] =
+ client.get_slice_by_names(keyspace, key, columnParent, colNames.asJava, consistencyLevel).toList
- def keys(tableName: String, startsWith: String, stopsAt: String, maxResults: Option[Int]): List[String] =
- client.get_key_range(tableName, startsWith, stopsAt, maxResults.getOrElse(-1)).toList
-
- def property(name: String): String = client.getStringProperty(name)
- def properties(name: String): List[String] = client.getStringListProperty(name).toList
- def describeTable(tableName: String) = client.describeTable(tableName)
-
- def ?(query: String) = client.executeQuery(query)
+ def |(keyspace: String, key: String, colPath: ColumnPath): Option[Column] =
+ |(keyspace, key, colPath, consistencyLevel)
+
+ def |(keyspace: String, key: String, colPath: ColumnPath, consistencyLevel: Int): Option[Column] =
+ client.get_column(keyspace, key, colPath, consistencyLevel)
+
+ def |#(keyspace: String, key: String, columnParent: ColumnParent): Int =
+ |#(keyspace, key, columnParent, consistencyLevel)
+
+ def |#(keyspace: String, key: String, columnParent: ColumnParent, consistencyLevel: Int): Int =
+ client.get_column_count(keyspace, key, columnParent, consistencyLevel)
+
+ def ++|(keyspace: String, key: String, columnPath: ColumnPath, value: Array[Byte]): Unit =
+ ++|(keyspace, key, columnPath, value, obtainedAt, consistencyLevel)
+
+ def ++|(keyspace: String, key: String, columnPath: ColumnPath, value: Array[Byte], timestamp: Long): Unit =
+ ++|(keyspace, key, columnPath, value, timestamp, consistencyLevel)
+
+ def ++|(keyspace: String, key: String, columnPath: ColumnPath, value: Array[Byte], timestamp: Long, consistencyLevel: Int) =
+ client.insert(keyspace, key, columnPath, value, timestamp, consistencyLevel)
+
+ def ++|(keyspace: String, batch: BatchMutation): Unit =
+ ++|(keyspace, batch, consistencyLevel)
+
+ def ++|(keyspace: String, batch: BatchMutation, consistencyLevel: Int): Unit =
+ client.batch_insert(keyspace, batch, consistencyLevel)
+
+ def --(keyspace: String, key: String, columnPathOrParent: ColumnPathOrParent, timestamp: Long): Unit =
+ --(keyspace, key, columnPathOrParent, timestamp, consistencyLevel)
+
+ def --(keyspace: String, key: String, columnPathOrParent: ColumnPathOrParent, timestamp: Long, consistencyLevel: Int): Unit =
+ client.remove(keyspace, key, columnPathOrParent, timestamp, consistencyLevel)
+
+ def /^(keyspace: String, key: String, columnFamily: String, start: Array[Byte], end: Array[Byte], ascending: Boolean, count: Int): List[SuperColumn] =
+ /^(keyspace, key, columnFamily, start, end, ascending, count, consistencyLevel)
+
+ def /^(keyspace: String, key: String, columnFamily: String, start: Array[Byte], end: Array[Byte], ascending: Boolean, count: Int, consistencyLevel: Int): List[SuperColumn] =
+ client.get_slice_super(keyspace, key, columnFamily, start, end, ascending, count, consistencyLevel).toList
+
+ def /^(keyspace: String, key: String, columnFamily: String, superColNames: List[Array[Byte]]): List[SuperColumn] =
+ /^(keyspace, key, columnFamily, superColNames, consistencyLevel)
+
+ def /^(keyspace: String, key: String, columnFamily: String, superColNames: List[Array[Byte]], consistencyLevel: Int): List[SuperColumn] =
+ client.get_slice_super_by_names(keyspace, key, columnFamily, superColNames.asJava, consistencyLevel).toList
+
+ def |^(keyspace: String, key: String, superColumnPath: SuperColumnPath): Option[SuperColumn] =
+ |^(keyspace, key, superColumnPath, consistencyLevel)
+
+ def |^(keyspace: String, key: String, superColumnPath: SuperColumnPath, consistencyLevel: Int): Option[SuperColumn] =
+ client.get_super_column(keyspace, key, superColumnPath, consistencyLevel)
+
+ def ++|^(keyspace: String, batch: BatchMutationSuper): Unit =
+ ++|^(keyspace, batch, consistencyLevel)
+
+ def ++|^(keyspace: String, batch: BatchMutationSuper, consistencyLevel: Int): Unit =
+ client.batch_insert_super_column(keyspace, batch, consistencyLevel)
+
+ def keys(keyspace: String, columnFamily: String, startsWith: String, stopsAt: String, maxResults: Option[Int]): List[String] =
+ client.get_key_range(keyspace, columnFamily, startsWith, stopsAt, maxResults.getOrElse(-1)).toList
+
+ def property(name: String): String = client.get_string_property(name)
+
+ def properties(name: String): List[String] = client.get_string_list_property(name).toList
+
+ def describeTable(keyspace: String) = client.describe_keyspace(keyspace)
+
+ def ?(query: String) = client.execute_query(query)
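+
+ // Usage sketch (hypothetical key, column path, and level): a read can rely on the
+ // session default or pass an explicit level, e.g.
+ //   session | ("akka", "someKey", someColumnPath)             // session default
+ //   session | ("akka", "someKey", someColumnPath, someLevel)  // explicit override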
}
-class CassandraSessionPool[T <: TTransport](transportPool: Pool[T], inputProtocol: Protocol, outputProtocol: Protocol) extends Closeable {
- def this(transportPool: Pool[T], ioProtocol: Protocol) = this(transportPool,ioProtocol,ioProtocol)
-
- def newSession: CassandraSession = {
+class CassandraSessionPool[T <: TTransport](transportPool: Pool[T], inputProtocol: Protocol, outputProtocol: Protocol, defConsistencyLvl: Int) extends Closeable {
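+ // The pool owns the default consistency level; newSession(consistencyLvl) below
+ // lets callers override it per session.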
+ def this(transportPool: Pool[T], ioProtocol: Protocol, defConsistencyLvl: Int) = this (transportPool, ioProtocol, ioProtocol, defConsistencyLvl)
+
+ def newSession: CassandraSession = newSession(defConsistencyLvl)
+
+ def newSession(consistencyLvl: Int): CassandraSession = {
val t = transportPool.borrowObject
- val c = new Cassandra.Client(inputProtocol(t),outputProtocol(t))
+ val c = new Cassandra.Client(inputProtocol(t), outputProtocol(t))
new CassandraSession {
val client = c
val obtainedAt = System.currentTimeMillis
+ val consistencyLevel = consistencyLvl
def flush = t.flush
def close = transportPool.returnObject(t)
}
}
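+
+ // Loan pattern: borrow a session for the duration of body, flush it, and close it
+ // afterwards so the underlying transport is returned to the pool.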
def withSession[R](body: CassandraSession => R) = {
- val session = newSession
+ val session = newSession(defConsistencyLvl)
try {
val result = body(session)
session.flush
@@ -299,7 +335,7 @@ class CassandraSessionPool[T <: TTransport](transportPool: Pool[T], inputProtoco
session.close
}
}
-
+
def close = transportPool.close
}
@@ -320,221 +356,217 @@ object Protocol {
*
* @author Jonas Bonér
*
-object EmbeddedCassandraStorage extends Logging {
- val TABLE_NAME = "akka"
- val MAP_COLUMN_FAMILY = "map"
- val VECTOR_COLUMN_FAMILY = "vector"
- val REF_COLUMN_FAMILY = "ref:item"
+object EmbeddedCassandraStorage extends Logging {
+val TABLE_NAME = "akka"
+val MAP_COLUMN_FAMILY = "map"
+val VECTOR_COLUMN_FAMILY = "vector"
+val REF_COLUMN_FAMILY = "ref:item"
- val IS_ASCENDING = true
+val IS_ASCENDING = true
- val RUN_THRIFT_SERVICE = kernel.Kernel.config.getBool("akka.storage.cassandra.thrift-server.service", false)
- val BLOCKING_CALL = {
- if (kernel.Kernel.config.getBool("akka.storage.cassandra.blocking", true)) 0
- else 1
- }
+val RUN_THRIFT_SERVICE = kernel.Kernel.config.getBool("akka.storage.cassandra.thrift-server.service", false)
+val BLOCKING_CALL = {
+if (kernel.Kernel.config.getBool("akka.storage.cassandra.blocking", true)) 0
+else 1 }
- @volatile private[this] var isRunning = false
- private[this] val serializer: Serializer = {
- kernel.Kernel.config.getString("akka.storage.cassandra.storage-format", "java") match {
- case "scala-json" => Serializer.ScalaJSON
- case "java-json" => Serializer.JavaJSON
- case "protobuf" => Serializer.Protobuf
- case "java" => Serializer.Java
- case "sbinary" => throw new UnsupportedOperationException("SBinary serialization protocol is not yet supported for storage")
- case "avro" => throw new UnsupportedOperationException("Avro serialization protocol is not yet supported for storage")
- case unknown => throw new UnsupportedOperationException("Unknown storage serialization protocol [" + unknown + "]")
- }
- }
-
- // TODO: is this server thread-safe or needed to be wrapped up in an actor?
- private[this] val server = classOf[CassandraServer].newInstance.asInstanceOf[CassandraServer]
+@volatile private[this] var isRunning = false
+private[this] val serializer: Serializer = {
+kernel.Kernel.config.getString("akka.storage.cassandra.storage-format", "java") match {
+case "scala-json" => Serializer.ScalaJSON
+case "java-json" => Serializer.JavaJSON
+case "protobuf" => Serializer.Protobuf
+case "java" => Serializer.Java
+case "sbinary" => throw new UnsupportedOperationException("SBinary serialization protocol is not yet supported for storage")
+case "avro" => throw new UnsupportedOperationException("Avro serialization protocol is not yet supported for storage")
+case unknown => throw new UnsupportedOperationException("Unknown storage serialization protocol [" + unknown + "]")
+}
+}
- private[this] var thriftServer: CassandraThriftServer = _
-
- def start = synchronized {
- if (!isRunning) {
- try {
- server.start
- log.info("Cassandra persistent storage has started up successfully");
- } catch {
- case e =>
- log.error("Could not start up Cassandra persistent storage")
- throw e
- }
- if (RUN_THRIFT_SERVICE) {
- thriftServer = new CassandraThriftServer(server)
- thriftServer.start
- }
- isRunning
- }
- }
+// TODO: is this server thread-safe or needed to be wrapped up in an actor?
+private[this] val server = classOf[CassandraServer].newInstance.asInstanceOf[CassandraServer]
- def stop = if (isRunning) {
- //server.storageService.shutdown
- if (RUN_THRIFT_SERVICE) thriftServer.stop
- }
+private[this] var thriftServer: CassandraThriftServer = _
- // ===============================================================
- // For Ref
- // ===============================================================
+def start = synchronized {
+if (!isRunning) {
+try {
+server.start
+log.info("Cassandra persistent storage has started up successfully");
+} catch {
+case e =>
+log.error("Could not start up Cassandra persistent storage")
+throw e
+}
+if (RUN_THRIFT_SERVICE) {
+thriftServer = new CassandraThriftServer(server)
+thriftServer.start
+}
+isRunning
+}
+}
- def insertRefStorageFor(name: String, element: AnyRef) = {
- server.insert(
- TABLE_NAME,
- name,
- REF_COLUMN_FAMILY,
- element,
- System.currentTimeMillis,
- BLOCKING_CALL)
- }
+def stop = if (isRunning) {
+//server.storageService.shutdown
+if (RUN_THRIFT_SERVICE) thriftServer.stop
+}
- def getRefStorageFor(name: String): Option[AnyRef] = {
- try {
- val column = server.get_column(TABLE_NAME, name, REF_COLUMN_FAMILY)
- Some(serializer.in(column.value, None))
- } catch {
- case e =>
- e.printStackTrace
- None
- }
- }
+// ===============================================================
+// For Ref
+// ===============================================================
- // ===============================================================
- // For Vector
- // ===============================================================
+def insertRefStorageFor(name: String, element: AnyRef) = {
+server.insert(
+TABLE_NAME,
+name,
+REF_COLUMN_FAMILY,
+element,
+System.currentTimeMillis,
+BLOCKING_CALL)
+}
- def insertVectorStorageEntryFor(name: String, element: AnyRef) = {
- server.insert(
- TABLE_NAME,
- name,
- VECTOR_COLUMN_FAMILY + ":" + getVectorStorageSizeFor(name),
- element,
- System.currentTimeMillis,
- BLOCKING_CALL)
- }
+def getRefStorageFor(name: String): Option[AnyRef] = {
+try {
+val column = server.get_column(TABLE_NAME, name, REF_COLUMN_FAMILY)
+Some(serializer.in(column.value, None))
+} catch {
+case e =>
+e.printStackTrace
+None }
+}
- def getVectorStorageEntryFor(name: String, index: Int): AnyRef = {
- try {
- val column = server.get_column(TABLE_NAME, name, VECTOR_COLUMN_FAMILY + ":" + index)
- serializer.in(column.value, None)
- } catch {
- case e =>
- e.printStackTrace
- throw new Predef.NoSuchElementException(e.getMessage)
- }
- }
+// ===============================================================
+// For Vector
+// ===============================================================
- def getVectorStorageRangeFor(name: String, start: Int, count: Int): List[AnyRef] =
- server.get_slice(TABLE_NAME, name, VECTOR_COLUMN_FAMILY, IS_ASCENDING, count)
- .toArray.toList.asInstanceOf[List[Tuple2[String, AnyRef]]].map(tuple => tuple._2)
+def insertVectorStorageEntryFor(name: String, element: AnyRef) = {
+server.insert(
+TABLE_NAME,
+name,
+VECTOR_COLUMN_FAMILY + ":" + getVectorStorageSizeFor(name),
+element,
+System.currentTimeMillis,
+BLOCKING_CALL)
+}
- def getVectorStorageSizeFor(name: String): Int =
- server.get_column_count(TABLE_NAME, name, VECTOR_COLUMN_FAMILY)
+def getVectorStorageEntryFor(name: String, index: Int): AnyRef = {
+try {
+val column = server.get_column(TABLE_NAME, name, VECTOR_COLUMN_FAMILY + ":" + index)
+serializer.in(column.value, None)
+} catch {
+case e =>
+e.printStackTrace
+throw new Predef.NoSuchElementException(e.getMessage)
+}
+}
- // ===============================================================
- // For Map
- // ===============================================================
+def getVectorStorageRangeFor(name: String, start: Int, count: Int): List[AnyRef] =
+server.get_slice(TABLE_NAME, name, VECTOR_COLUMN_FAMILY, IS_ASCENDING, count)
+.toArray.toList.asInstanceOf[List[Tuple2[String, AnyRef]]].map(tuple => tuple._2)
- def insertMapStorageEntryFor(name: String, key: String, value: AnyRef) = {
- server.insert(
- TABLE_NAME,
- name,
- MAP_COLUMN_FAMILY + ":" + key,
- serializer.out(value),
- System.currentTimeMillis,
- BLOCKING_CALL)
- }
+def getVectorStorageSizeFor(name: String): Int =
+server.get_column_count(TABLE_NAME, name, VECTOR_COLUMN_FAMILY)
- def insertMapStorageEntriesFor(name: String, entries: List[Tuple2[String, AnyRef]]) = {
- import java.util.{Map, HashMap, List, ArrayList}
- val columns: Map[String, List[column_t]] = new HashMap
- for (entry <- entries) {
- val cls: List[column_t] = new ArrayList
- cls.add(new column_t(entry._1, serializer.out(entry._2), System.currentTimeMillis))
- columns.put(MAP_COLUMN_FAMILY, cls)
- }
- server.batch_insert(new batch_mutation_t(
- TABLE_NAME,
- name,
- columns),
- BLOCKING_CALL)
- }
+// ===============================================================
+// For Map
+// ===============================================================
- def getMapStorageEntryFor(name: String, key: AnyRef): Option[AnyRef] = {
- try {
- val column = server.get_column(TABLE_NAME, name, MAP_COLUMN_FAMILY + ":" + key)
- Some(serializer.in(column.value, None))
- } catch {
- case e =>
- e.printStackTrace
- None
- }
- }
+def insertMapStorageEntryFor(name: String, key: String, value: AnyRef) = {
+server.insert(
+TABLE_NAME, name,
+MAP_COLUMN_FAMILY + ":" + key,
+serializer.out(value),
+System.currentTimeMillis,
+BLOCKING_CALL)
+}
- def getMapStorageFor(name: String): List[Tuple2[String, AnyRef]] = {
- val columns = server.get_columns_since(TABLE_NAME, name, MAP_COLUMN_FAMILY, -1)
- .toArray.toList.asInstanceOf[List[org.apache.cassandra.service.column_t]]
- for {
- column <- columns
- col = (column.columnName, serializer.in(column.value, None))
- } yield col
- }
-
- def getMapStorageSizeFor(name: String): Int =
- server.get_column_count(TABLE_NAME, name, MAP_COLUMN_FAMILY)
+def insertMapStorageEntriesFor(name: String, entries: List[Tuple2[String, AnyRef]]) = {
+import java.util.{ Map, HashMap, List, ArrayList }
+val columns: Map[String, List[column_t]] = new HashMap
+for (entry <- entries) {
+val cls: List[column_t] = new ArrayList
+cls.add(new column_t(entry._1, serializer.out(entry._2), System.currentTimeMillis))
+columns.put(MAP_COLUMN_FAMILY, cls)
+}
+server.batch_insert(new batch_mutation_t(
+TABLE_NAME, name,
+columns),
+BLOCKING_CALL)
+}
- def removeMapStorageFor(name: String) =
- server.remove(TABLE_NAME, name, MAP_COLUMN_FAMILY, System.currentTimeMillis, BLOCKING_CALL)
+def getMapStorageEntryFor(name: String, key: AnyRef): Option[AnyRef] = {
+try {
+val column = server.get_column(TABLE_NAME, name, MAP_COLUMN_FAMILY + ":" + key)
+Some(serializer.in(column.value, None))
+} catch {
+case e =>
+e.printStackTrace
+None
+}
+}
- def getMapStorageRangeFor(name: String, start: Int, count: Int): List[Tuple2[String, AnyRef]] = {
- server.get_slice(TABLE_NAME, name, MAP_COLUMN_FAMILY, IS_ASCENDING, count)
- .toArray.toList.asInstanceOf[List[Tuple2[String, AnyRef]]]
- }
+def getMapStorageFor(name: String): List[Tuple2[String, AnyRef]] = {
+val columns = server.get_columns_since(TABLE_NAME, name, MAP_COLUMN_FAMILY, -1)
+.toArray.toList.asInstanceOf[List[org.apache.cassandra.service.column_t]]
+for {
+column <- columns
+col = (column.columnName, serializer.in(column.value, None))
+} yield col
+}
+
+def getMapStorageSizeFor(name: String): Int =
+server.get_column_count(TABLE_NAME, name, MAP_COLUMN_FAMILY)
+
+def removeMapStorageFor(name: String) =
+server.remove(TABLE_NAME, name, MAP_COLUMN_FAMILY, System.currentTimeMillis, BLOCKING_CALL)
+
+def getMapStorageRangeFor(name: String, start: Int, count: Int): List[Tuple2[String, AnyRef]] = {
+server.get_slice(TABLE_NAME, name, MAP_COLUMN_FAMILY, IS_ASCENDING, count)
+.toArray.toList.asInstanceOf[List[Tuple2[String, AnyRef]]]
+}
}
-class CassandraThriftServer(server: CassandraServer) extends Logging {
- case object Start
- case object Stop
+class CassandraThriftServer(server: CassandraServer) extends Logging {
+case object Start
+case object Stop
- private[this] val serverEngine: TThreadPoolServer = try {
- val pidFile = kernel.Kernel.config.getString("akka.storage.cassandra.thrift-server.pidfile", "akka.pid")
- if (pidFile != null) new File(pidFile).deleteOnExit();
- val listenPort = DatabaseDescriptor.getThriftPort
+private[this] val serverEngine: TThreadPoolServer = try {
+val pidFile = kernel.Kernel.config.getString("akka.storage.cassandra.thrift-server.pidfile", "akka.pid")
+if (pidFile != null) new File(pidFile).deleteOnExit();
+val listenPort = DatabaseDescriptor.getThriftPort
- val processor = new Cassandra.Processor(server)
- val tServerSocket = new TServerSocket(listenPort)
- val tProtocolFactory = new TBinaryProtocol.Factory
+val processor = new Cassandra.Processor(server)
+val tServerSocket = new TServerSocket(listenPort)
+val tProtocolFactory = new TBinaryProtocol.Factory
- val options = new TThreadPoolServer.Options
- options.minWorkerThreads = 64
- new TThreadPoolServer(new TProcessorFactory(processor),
- tServerSocket,
- new TTransportFactory,
- new TTransportFactory,
- tProtocolFactory,
- tProtocolFactory,
- options)
- } catch {
- case e =>
- log.error("Could not start up Cassandra thrift service")
- throw e
- }
-
- import scala.actors.Actor._
- private[this] val serverDaemon = actor {
- receive {
- case Start =>
- serverEngine.serve
- log.info("Cassandra thrift service has starting up successfully")
- case Stop =>
- log.info("Cassandra thrift service is shutting down...")
- serverEngine.stop
- }
- }
-
- def start = serverDaemon ! Start
- def stop = serverDaemon ! Stop
+val options = new TThreadPoolServer.Options
+options.minWorkerThreads = 64
+new TThreadPoolServer(new TProcessorFactory(processor),
+tServerSocket,
+new TTransportFactory,
+new TTransportFactory,
+tProtocolFactory,
+tProtocolFactory,
+options)
+} catch {
+case e =>
+log.error("Could not start up Cassandra thrift service")
+throw e
}
-*/
+
+import scala.actors.Actor._
+private[this] val serverDaemon = actor {
+receive {
+case Start =>
+serverEngine.serve
+log.info("Cassandra thrift service has starting up successfully")
+case Stop =>
+log.info("Cassandra thrift service is shutting down...")
+serverEngine.stop
+}
+}
+
+def start = serverDaemon ! Start
+def stop = serverDaemon ! Stop
+}
+ */
diff --git a/samples-lift/akka-samples-lift.iml b/samples-lift/akka-samples-lift.iml
index 70037ebd8b..7e8bb22ce4 100644
diff --git a/samples-scala/akka-samples-scala.iml b/samples-scala/akka-samples-scala.iml
index ee46aa485c..05d2b682d7 100644