upgraded to latest version of Cassandra, some API changes

commit a153ece1f5
parent 49f433b012

39 changed files with 117 additions and 97 deletions
@@ -5,6 +5,7 @@
 package se.scalablesolutions.akka.kernel

 import java.io.File
+import java.lang.reflect.Constructor

 import org.apache.cassandra.config.DatabaseDescriptor
 import org.apache.cassandra.service._
@@ -18,9 +19,16 @@ final object CassandraNode extends Logging {
   val ACTOR_KEY_PREFIX = "actor"
   val ACTOR_MAP_COLUMN_FAMILY = "map"

+  // TODO: make pluggable (JSON, Thrift, Protobuf etc.)
+  private[this] var serializer: Serializer = new JavaSerializationSerializer
+
   // TODO: is this server thread-safe or needed to be wrapped up in an actor?
-  private[this] val server = new CassandraServer
+  private[this] val server = {
+    val ctor = classOf[CassandraServer].getConstructor(Array[Class[_]]():_*)
+    ctor.setAccessible(true)
+    ctor.newInstance(Array[AnyRef]():_*).asInstanceOf[CassandraServer]
+  }

   def start = {
     try {
       server.start
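The new `server` field above instantiates CassandraServer through java.lang.reflect.Constructor instead of `new CassandraServer`. As a self-contained illustration of that pattern, here is a minimal sketch using a hypothetical class Foo (not part of the commit):

    // Sketch of reflective construction, mirroring the pattern in the diff above.
    class Foo {
      def hello = "hello"
    }

    object ReflectiveConstructionExample {
      def main(args: Array[String]): Unit = {
        // Look up the no-arg constructor, relax access checks, and invoke it.
        val ctor = classOf[Foo].getConstructor(Array[Class[_]](): _*)
        ctor.setAccessible(true)
        val foo = ctor.newInstance(Array[AnyRef](): _*).asInstanceOf[Foo]
        println(foo.hello) // prints "hello"
      }
    }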
@@ -32,45 +40,44 @@ final object CassandraNode extends Logging {
     }
   }

-  def stop = server.shutdown
+  def stop = {}

-  def insertActorStorageEntry(actorName: String, entry: String, content: String) = {
+  def insertActorStorageEntry(actorName: String, entry: String, content: AnyRef) = {
     server.insert(
       TABLE_NAME,
       ACTOR_KEY_PREFIX + ":" + actorName,
       ACTOR_MAP_COLUMN_FAMILY + ":" + entry,
-      content,
+      serializer.out(content),
       System.currentTimeMillis)
   }

-  def insertActorStorageEntries(actorName: String, entries: List[Tuple2[String, String]]) = {
+  def insertActorStorageEntries(actorName: String, entries: List[Tuple2[String, AnyRef]]) = {
     import java.util.{Map, HashMap, List, ArrayList}
     val columns: Map[String, List[column_t]] = new HashMap
     for (entry <- entries) {
       val cls: List[column_t] = new ArrayList
-      cls.add(new column_t(entry._1, entry._2, System.currentTimeMillis))
+      cls.add(new column_t(entry._1, serializer.out(entry._2), System.currentTimeMillis))
       columns.put(ACTOR_MAP_COLUMN_FAMILY, cls)
     }
     server.batch_insert_blocking(new batch_mutation_t(
       TABLE_NAME,
       ACTOR_KEY_PREFIX + ":" + actorName,
-      columns,
-      new HashMap[String, List[column_t]]))
+      columns))
   }

-  def getActorStorageEntryFor(actorName: String, entry: String): Option[String] = {
+  def getActorStorageEntryFor(actorName: String, entry: AnyRef): Option[AnyRef] = {
     try {
       val column = server.get_column(TABLE_NAME, ACTOR_KEY_PREFIX + ":" + actorName, ACTOR_MAP_COLUMN_FAMILY + ":" + entry)
-      Some(column.value)
+      Some(serializer.in(column.value))
     } catch { case e => None }
   }

-  def getActorStorageFor(actorName: String): List[Tuple2[String, String]] = {
+  def getActorStorageFor(actorName: String): List[Tuple2[String, AnyRef]] = {
     val columns = server.get_columns_since(TABLE_NAME, ACTOR_KEY_PREFIX, ACTOR_MAP_COLUMN_FAMILY, -1)
       .toArray.toList.asInstanceOf[List[org.apache.cassandra.service.column_t]]
     for {
       column <- columns
-      col = (column.columnName, column.value)
+      col = (column.columnName, serializer.in(column.value))
     } yield col
   }

@@ -78,11 +85,11 @@ final object CassandraNode extends Logging {
     server.get_column_count(TABLE_NAME, ACTOR_KEY_PREFIX + ":" + actorName, ACTOR_MAP_COLUMN_FAMILY)

   def removeActorStorageFor(actorName: String) =
-    server.remove(TABLE_NAME, ACTOR_KEY_PREFIX + ":" + actorName, ACTOR_MAP_COLUMN_FAMILY)
+    server.remove(TABLE_NAME, ACTOR_KEY_PREFIX + ":" + actorName, ACTOR_MAP_COLUMN_FAMILY, System.currentTimeMillis, false)

-  def getActorStorageRange(actorName: String, start: Int, count: Int): List[Tuple2[String, String]] =
+  def getActorStorageRange(actorName: String, start: Int, count: Int): List[Tuple2[String, AnyRef]] =
     server.get_slice(TABLE_NAME, ACTOR_KEY_PREFIX + ":" + actorName, ACTOR_MAP_COLUMN_FAMILY, start, count)
-      .toArray.toList.asInstanceOf[List[Tuple2[String, String]]]
+      .toArray.toList.asInstanceOf[List[Tuple2[String, AnyRef]]]
 }

 /*
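Taken together, the CassandraNode changes above switch the storage API from plain String values to AnyRef values that are pushed through the serializer on write (serializer.out) and read (serializer.in). A hypothetical round-trip sketch, assuming the node has been started; the Balance payload class is illustrative and not part of the commit:

    // Hypothetical usage sketch; Balance is an example payload, not from the commit.
    // Explicitly marked serializable for the Java serializer.
    case class Balance(amount: Int) extends java.io.Serializable

    CassandraNode.insertActorStorageEntry("my-actor", "balance", Balance(100))

    // Stored bytes are produced by serializer.out and decoded again by serializer.in,
    // so the Option should hold Balance(100) if the entry exists and deserializes cleanly.
    val restored: Option[AnyRef] = CassandraNode.getActorStorageEntryFor("my-actor", "balance")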
@@ -89,7 +89,8 @@ class GenericServerContainer(
   private[kernel] var lifeCycle: Option[LifeCycle] = None
   private[kernel] val lock = new ReadWriteLock
   private[kernel] val txItemsLock = new ReadWriteLock
+  private[kernel] val serializer = new JavaSerializationSerializer

   private var server: GenericServer = _
   private var currentConfig: Option[AnyRef] = None
   private var timeout = 5000
@@ -315,7 +316,7 @@ class GenericServerContainer(

   private[kernel] def cloneServerAndReturnOldVersion: GenericServer = lock.withWriteLock {
     val oldServer = server
-    server = Serializer.deepClone(server)
+    server = serializer.deepClone(server)
     oldServer
   }

@@ -9,7 +9,15 @@ import java.io.{ObjectOutputStream, ByteArrayOutputStream, ObjectInputStream, By
 /**
  * @author <a href="http://jonasboner.com">Jonas Bonér</a>
  */
-object Serializer {
+trait Serializer {
+  def out(obj: AnyRef): Array[Byte]
+  def in(bytes: Array[Byte]): AnyRef
+}
+
+/**
+ * @author <a href="http://jonasboner.com">Jonas Bonér</a>
+ */
+class JavaSerializationSerializer extends Serializer {
   def deepClone[T <: AnyRef](obj: T): T = in(out(obj)).asInstanceOf[T]

   def out(obj: AnyRef): Array[Byte] = {
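The hunk is cut off at the opening of `def out`. For reference, a Java-serialization implementation of the trait's two methods would look roughly like the sketch below; the method bodies and the completed import (the hunk header truncates it after "By") are assumptions consistent with the ObjectOutputStream/ObjectInputStream imports shown, not necessarily the literal committed code:

    // Sketch only: plausible bodies for out/in, assuming ByteArrayInputStream completes the import.
    import java.io.{ObjectOutputStream, ByteArrayOutputStream, ObjectInputStream, ByteArrayInputStream}

    class JavaSerializationSerializer extends Serializer {
      def deepClone[T <: AnyRef](obj: T): T = in(out(obj)).asInstanceOf[T]

      // Serialize an object graph to a byte array using standard Java serialization.
      def out(obj: AnyRef): Array[Byte] = {
        val bos = new ByteArrayOutputStream
        val oos = new ObjectOutputStream(bos)
        oos.writeObject(obj)
        oos.close()
        bos.toByteArray
      }

      // Rebuild the object graph from the byte array.
      def in(bytes: Array[Byte]): AnyRef = {
        val ois = new ObjectInputStream(new ByteArrayInputStream(bytes))
        val obj = ois.readObject()
        ois.close()
        obj
      }
    }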
@@ -83,7 +83,7 @@ class InMemoryTransactionalMap[K, V] extends TransactionalMap[K, V] {
  *
  * @author <a href="http://jonasboner.com">Jonas Bonér</a>
  */
-class CassandraPersistentTransactionalMap(actorNameInstance: AnyRef) extends PersistentTransactionalMap[String, String] {
+class CassandraPersistentTransactionalMap(actorNameInstance: AnyRef) extends PersistentTransactionalMap[String, AnyRef] {
   val actorName = actorNameInstance.getClass.getName
   override def begin = {}
   override def rollback = {}
@@ -96,7 +96,7 @@ class CassandraPersistentTransactionalMap(actorNameInstance: AnyRef) extends Per
     }
   }

-  override def get(key: String): String = CassandraNode.getActorStorageEntryFor(actorName, key)
+  override def get(key: String): AnyRef = CassandraNode.getActorStorageEntryFor(actorName, key)
     .getOrElse(throw new NoSuchElementException("Could not find element for key [" + key + "]"))

   override def contains(key: String): Boolean = CassandraNode.getActorStorageEntryFor(actorName, key).isDefined
@@ -107,12 +107,12 @@ class CassandraPersistentTransactionalMap(actorNameInstance: AnyRef) extends Per

   override def getRange(start: Int, count: Int) = CassandraNode.getActorStorageRange(actorName, start, count)

-  override def elements: Iterator[Tuple2[String, String]] = {
-    new Iterator[Tuple2[String, String]] {
-      private val originalList: List[Tuple2[String, String]] = CassandraNode.getActorStorageFor(actorName)
+  override def elements: Iterator[Tuple2[String, AnyRef]] = {
+    new Iterator[Tuple2[String, AnyRef]] {
+      private val originalList: List[Tuple2[String, AnyRef]] = CassandraNode.getActorStorageFor(actorName)
       private var elements = originalList.reverse

-      override def next: Tuple2[String, String] = synchronized {
+      override def next: Tuple2[String, AnyRef] = synchronized {
         val element = elements.head
         elements = elements.tail
         element
@@ -141,6 +141,8 @@ class Supervisor(faultHandler: FaultHandlingStrategy) extends Actor with Logging
     }
   }

+  def stop = Actor.self ! Stop
+
   def act = {
     self.trapExit = true
     loop {
@@ -93,16 +93,18 @@ class Transaction extends Logging {
   private def ensureIsActive = if (status != TransactionStatus.Active)
     throw new IllegalStateException("Expected ACTIVE transaction - current status [" + status + "]")

-  private def ensureIsActiveOrAborted =
-    if (!(status == TransactionStatus.Active || status == TransactionStatus.Aborted))
-      throw new IllegalStateException("Expected ACTIVE or ABORTED transaction - current status [" + status + "]")
+  private def ensureIsActiveOrAborted = if (!(status == TransactionStatus.Active || status == TransactionStatus.Aborted))
+    throw new IllegalStateException("Expected ACTIVE or ABORTED transaction - current status [" + status + "]")

-  override def equals(that: Any): Boolean =
+  override def equals(that: Any): Boolean = synchronized {
     that != null &&
     that.isInstanceOf[Transaction] &&
     that.asInstanceOf[Transaction].id == this.id
+  }

   override def hashCode(): Int = id.toInt

-  override def toString(): String = "Transaction[" + id + ", " + status + "]"
-}
+  override def toString(): String = synchronized {
+    "Transaction[" + id + ", " + status + "]"
+  }
+}