-Dcassandra -Dstorage-config=config/ -Dpidfile=akka.pid
- *
* @author Jonas Bonér
*/
object CassandraStorage extends Logging {
- val TABLE_NAME = "akka"
- val MAP_COLUMN_FAMILY = "map"
- val VECTOR_COLUMN_FAMILY = "vector"
- val REF_COLUMN_FAMILY = "ref:item"
+ val KEYSPACE = "akka"
+ val MAP_COLUMN_PARENT = new ColumnParent("map", null)
+ val VECTOR_COLUMN_PARENT = new ColumnParent("vector", null)
+ val REF_COLUMN_PARENT = new ColumnParent("ref", null)
+ val REF_KEY = "item".getBytes("UTF-8")
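+ // Connection settings come from the akka config, defaulting to a local
+ // single-node setup; the consistency level is passed to every read and write.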
+ val CASSANDRA_SERVER_HOSTNAME = config.getString("akka.storage.cassandra.hostname", "127.0.0.1")
+ val CASSANDRA_SERVER_PORT = config.getInt("akka.storage.cassandra.port", 9160)
+ val CONSISTENCY_LEVEL = config.getInt("akka.storage.cassandra.consistency-level", 1)
val IS_ASCENDING = true
- val RUN_THRIFT_SERVICE = kernel.Kernel.config.getBool("akka.storage.cassandra.thrift-server.service", false)
- val BLOCKING_CALL = {
- if (kernel.Kernel.config.getBool("akka.storage.cassandra.blocking", true)) 0
- else 1
- }
-
@volatile private[this] var isRunning = false
+ private[this] val protocol: Protocol = Protocol.Binary
+/* TODO: make the wire protocol configurable:
+  config.getString("akka.storage.cassandra.protocol", "binary") match {
+    case "binary" => Protocol.Binary
+    case "json" => Protocol.JSON
+    case "simple-json" => Protocol.SimpleJSON
+    case unknown => throw new UnsupportedOperationException("Unknown storage serialization protocol [" + unknown + "]")
+  }
+*/
+
private[this] val serializer: Serializer = {
kernel.Kernel.config.getString("akka.storage.cassandra.storage-format", "java") match {
case "scala-json" => Serializer.ScalaJSON
@@ -51,193 +55,400 @@ object CassandraStorage extends Logging {
case unknown => throw new UnsupportedOperationException("Unknown storage serialization protocol [" + unknown + "]")
}
}
-
- // TODO: is this server thread-safe or needed to be wrapped up in an actor?
- private[this] val server = classOf[CassandraServer].newInstance.asInstanceOf[CassandraServer]
- private[this] var thriftServer: CassandraThriftServer = _
-
+ private[this] var sessions: Option[CassandraSessionPool[_]] = None
+
def start = synchronized {
if (!isRunning) {
try {
- server.start
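+ // Borrow Thrift connections to the Cassandra node from a stack-based
+ // session pool (see Pool.scala) instead of embedding a server in-process.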
+ sessions = Some(new CassandraSessionPool(
+ KEYSPACE,
+ StackPool(SocketProvider(CASSANDRA_SERVER_HOSTNAME, CASSANDRA_SERVER_PORT)),
+ protocol,
+ CONSISTENCY_LEVEL))
log.info("Cassandra persistent storage has started up successfully");
} catch {
case e =>
log.error("Could not start up Cassandra persistent storage")
throw e
}
- if (RUN_THRIFT_SERVICE) {
- thriftServer = new CassandraThriftServer(server)
- thriftServer.start
- }
isRunning = true
}
}
- def stop = if (isRunning) {
- //server.storageService.shutdown
- if (RUN_THRIFT_SERVICE) thriftServer.stop
+ def stop = synchronized {
+   if (isRunning && sessions.isDefined) {
+     sessions.get.close
+     isRunning = false
+   }
+ }
// ===============================================================
// For Ref
// ===============================================================
- def insertRefStorageFor(name: String, element: AnyRef) = {
- server.insert(
- TABLE_NAME,
- name,
- REF_COLUMN_FAMILY,
- serializer.out(element),
- System.currentTimeMillis,
- BLOCKING_CALL)
- }
-
- def getRefStorageFor(name: String): Option[AnyRef] = {
- try {
- val column = server.get_column(TABLE_NAME, name, REF_COLUMN_FAMILY)
- Some(serializer.in(column.value, None))
- } catch {
- case e =>
- e.printStackTrace
- None
+ def insertRefStorageFor(name: String, element: AnyRef) = if (sessions.isDefined) {
+ sessions.get.withSession {
+ _ ++| (name,
+ new ColumnPath(REF_COLUMN_PARENT.getColumn_family, null, REF_KEY),
+ serializer.out(element),
+ System.currentTimeMillis,
+ CONSISTENCY_LEVEL)
}
- }
+ } else throw new IllegalStateException("CassandraStorage is not started")
- // ===============================================================
- // For Vector
- // ===============================================================
-
- def insertVectorStorageEntryFor(name: String, element: AnyRef) = {
- server.insert(
- TABLE_NAME,
- name,
- VECTOR_COLUMN_FAMILY + ":" + getVectorStorageSizeFor(name),
- serializer.out(element),
- System.currentTimeMillis,
- BLOCKING_CALL)
- }
-
- def getVectorStorageEntryFor(name: String, index: Int): AnyRef = {
+ def getRefStorageFor(name: String): Option[AnyRef] = if (sessions.isDefined) {
try {
- val column = server.get_column(TABLE_NAME, name, VECTOR_COLUMN_FAMILY + ":" + index)
- serializer.in(column.value, None)
- } catch {
- case e =>
- e.printStackTrace
- throw new Predef.NoSuchElementException(e.getMessage)
- }
- }
-
- def getVectorStorageRangeFor(name: String, start: Int, count: Int): List[AnyRef] =
- server.get_slice(TABLE_NAME, name, VECTOR_COLUMN_FAMILY, IS_ASCENDING, count)
- .toArray.toList.asInstanceOf[List[Tuple2[String, AnyRef]]].map(tuple => tuple._2)
-
- def getVectorStorageSizeFor(name: String): Int =
- server.get_column_count(TABLE_NAME, name, VECTOR_COLUMN_FAMILY)
-
- // ===============================================================
- // For Map
- // ===============================================================
-
- def insertMapStorageEntryFor(name: String, key: String, value: AnyRef) = {
- server.insert(
- TABLE_NAME,
- name,
- MAP_COLUMN_FAMILY + ":" + key,
- serializer.out(value),
- System.currentTimeMillis,
- BLOCKING_CALL)
- }
-
- def insertMapStorageEntriesFor(name: String, entries: List[Tuple2[String, AnyRef]]) = {
- import java.util.{Map, HashMap, List, ArrayList}
- val columns: Map[String, List[column_t]] = new HashMap
- for (entry <- entries) {
- val cls: List[column_t] = new ArrayList
- cls.add(new column_t(entry._1, serializer.out(entry._2), System.currentTimeMillis))
- columns.put(MAP_COLUMN_FAMILY, cls)
- }
- server.batch_insert(new batch_mutation_t(
- TABLE_NAME,
- name,
- columns),
- BLOCKING_CALL)
- }
-
- def getMapStorageEntryFor(name: String, key: AnyRef): Option[AnyRef] = {
- try {
- val column = server.get_column(TABLE_NAME, name, MAP_COLUMN_FAMILY + ":" + key)
- Some(serializer.in(column.value, None))
+ val column: Option[Column] = sessions.get.withSession {
+ _ | (name, new ColumnPath(REF_COLUMN_PARENT.getColumn_family, null, REF_KEY))
+ }
+ if (column.isDefined) Some(serializer.in(column.get.value, None))
+ else None
} catch {
case e =>
e.printStackTrace
None
}
- }
+ } else throw new IllegalStateException("CassandraStorage is not started")
- def getMapStorageFor(name: String): List[Tuple2[String, AnyRef]] = {
- val columns = server.get_columns_since(TABLE_NAME, name, MAP_COLUMN_FAMILY, -1)
+ // ===============================================================
+ // For Vector
+ // ===============================================================
+
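+ // Each vector element is a column named by its index, encoded big-endian via
+ // intToBytes (see Helpers.scala) so byte-wise column order matches numeric order.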
+ def insertVectorStorageEntryFor(name: String, element: AnyRef) = if (sessions.isDefined) {
+ sessions.get.withSession {
+ _ ++| (name,
+ new ColumnPath(VECTOR_COLUMN_PARENT.getColumn_family, null, intToBytes(getVectorStorageSizeFor(name))),
+ serializer.out(element),
+ System.currentTimeMillis,
+ CONSISTENCY_LEVEL)
+ }
+ } else throw new IllegalStateException("CassandraStorage is not started")
+
+ def getVectorStorageEntryFor(name: String, index: Int): AnyRef = if (sessions.isDefined) {
+ val column: Option[Column] = sessions.get.withSession {
+ _ | (name, new ColumnPath(VECTOR_COLUMN_PARENT.getColumn_family, null, intToBytes(index)))
+ }
+ if (column.isDefined) serializer.in(column.get.value, None)
+ else throw new NoSuchElementException("No element for vector [" + name + "] and index [" + index + "]")
+ } else throw new IllegalStateException("CassandraStorage is not started")
+
+ def getVectorStorageRangeFor(name: String, start: Option[Int], finish: Option[Int], count: Int): List[AnyRef] = if (sessions.isDefined) {
+ val startBytes = if (start.isDefined) intToBytes(start.get) else null
+ val finishBytes = if (finish.isDefined) intToBytes(finish.get) else null
+ val columns: List[Column] = sessions.get.withSession {
+ _ / (name,
+ VECTOR_COLUMN_PARENT,
+ startBytes, finishBytes,
+ IS_ASCENDING,
+ count,
+ CONSISTENCY_LEVEL)
+ }
+ columns.map(column => serializer.in(column.value, None))
+ } else throw new IllegalStateException("CassandraStorage is not started")
+
+ def getVectorStorageSizeFor(name: String): Int = if (sessions.isDefined) {
+ sessions.get.withSession {
+ _ |# (name, VECTOR_COLUMN_PARENT)
+ }
+ } else throw new IllegalStateException("CassandraStorage is not started")
+
+ // ===============================================================
+ // For Map
+ // ===============================================================
+
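+ // Each map entry is a column in the "map" column family: the serialized key
+ // is the column name, the serialized value the column value.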
+ def insertMapStorageEntryFor(name: String, key: AnyRef, element: AnyRef) = if (sessions.isDefined) {
+ sessions.get.withSession {
+ _ ++| (name,
+ new ColumnPath(MAP_COLUMN_PARENT.getColumn_family, null, serializer.out(key)),
+ serializer.out(element),
+ System.currentTimeMillis,
+ CONSISTENCY_LEVEL)
+ }
+ } else throw new IllegalStateException("CassandraStorage is not started")
+
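+ // Batched variant: all entries are written in one BatchMutation round-trip.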
+ def insertMapStorageEntriesFor(name: String, entries: List[Tuple2[AnyRef, AnyRef]]) = if (sessions.isDefined) {
+   val columns: java.util.List[Column] = new java.util.ArrayList
+   for (entry <- entries)
+     columns.add(new Column(serializer.out(entry._1), serializer.out(entry._2), System.currentTimeMillis))
+   val cf2columns: java.util.Map[String, java.util.List[Column]] = new java.util.HashMap
+   cf2columns.put(MAP_COLUMN_PARENT.getColumn_family, columns)
+   sessions.get.withSession {
+     _ ++| (new BatchMutation(name, cf2columns), CONSISTENCY_LEVEL)
+   }
+ } else throw new IllegalStateException("CassandraStorage is not started")
+
+ def getMapStorageEntryFor(name: String, key: AnyRef): Option[AnyRef] = if (sessions.isDefined) {
+ try {
+ val column: Option[Column] = sessions.get.withSession {
+ _ | (name, new ColumnPath(MAP_COLUMN_PARENT.getColumn_family, null, serializer.out(key)))
+ }
+ if (column.isDefined) Some(serializer.in(column.get.value, None))
+ else None
+ } catch {
+ case e =>
+ e.printStackTrace
+ None
+ }
+ } else throw new IllegalStateException("CassandraStorage is not started")
+
+ def getMapStorageFor(name: String): List[Tuple2[AnyRef, AnyRef]] = if (sessions.isDefined) {
+ throw new UnsupportedOperationException
+ /*
+ val columns = server.get_columns_since(name, MAP_COLUMN_FAMILY, -1)
.toArray.toList.asInstanceOf[List[org.apache.cassandra.service.column_t]]
for {
column <- columns
- col = (column.columnName, serializer.in(column.value, None))
+ col = (column.columnName, column.value)
} yield col
- }
-
- def getMapStorageSizeFor(name: String): Int =
- server.get_column_count(TABLE_NAME, name, MAP_COLUMN_FAMILY)
+ */
+ } else throw new IllegalStateException("CassandraStorage is not started")
- def removeMapStorageFor(name: String) =
- server.remove(TABLE_NAME, name, MAP_COLUMN_FAMILY, System.currentTimeMillis, BLOCKING_CALL)
-
- def getMapStorageRangeFor(name: String, start: Int, count: Int): List[Tuple2[String, AnyRef]] = {
- server.get_slice(TABLE_NAME, name, MAP_COLUMN_FAMILY, IS_ASCENDING, count)
- .toArray.toList.asInstanceOf[List[Tuple2[String, AnyRef]]]
- }
-}
-
-class CassandraThriftServer(server: CassandraServer) extends Logging {
- case object Start
- case object Stop
-
- private[this] val serverEngine: TThreadPoolServer = try {
- val pidFile = kernel.Kernel.config.getString("akka.storage.cassandra.thrift-server.pidfile", "akka.pid")
- if (pidFile != null) new File(pidFile).deleteOnExit();
- val listenPort = DatabaseDescriptor.getThriftPort
-
- val processor = new Cassandra.Processor(server)
- val tServerSocket = new TServerSocket(listenPort)
- val tProtocolFactory = new TBinaryProtocol.Factory
-
- val options = new TThreadPoolServer.Options
- options.minWorkerThreads = 64
- new TThreadPoolServer(new TProcessorFactory(processor),
- tServerSocket,
- new TTransportFactory,
- new TTransportFactory,
- tProtocolFactory,
- tProtocolFactory,
- options)
- } catch {
- case e =>
- log.error("Could not start up Cassandra thrift service")
- throw e
- }
-
- import scala.actors.Actor._
- private[this] val serverDaemon = actor {
- receive {
- case Start =>
- serverEngine.serve
- log.info("Cassandra thrift service has starting up successfully")
- case Stop =>
- log.info("Cassandra thrift service is shutting down...")
- serverEngine.stop
+ def getMapStorageSizeFor(name: String): Int = if (sessions.isDefined) {
+ sessions.get.withSession {
+ _ |# (name, MAP_COLUMN_PARENT)
}
- }
+ } else throw new IllegalStateException("CassandraStorage is not started")
- def start = serverDaemon ! Start
- def stop = serverDaemon ! Stop
+ def removeMapStorageFor(name: String): Unit = removeMapStorageFor(name, null)
+
+ def removeMapStorageFor(name: String, key: AnyRef): Unit = if (sessions.isDefined) {
+ val keyBytes = if (key == null) null else serializer.out(key)
+ sessions.get.withSession {
+ _ -- (name,
+ new ColumnPathOrParent(MAP_COLUMN_PARENT.getColumn_family, null, keyBytes),
+ System.currentTimeMillis,
+ CONSISTENCY_LEVEL)
+ }
+ } else throw new IllegalStateException("CassandraStorage is not started")
+
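+ // Range reads map to a Cassandra slice between the serialized bounds; a
+ // missing bound is passed as null to leave that end of the range open.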
+ def getMapStorageRangeFor(name: String, start: Option[AnyRef], finish: Option[AnyRef], count: Int):
+ List[Tuple2[AnyRef, AnyRef]] = if (sessions.isDefined) {
+ val startBytes = if (start.isDefined) serializer.out(start.get) else null
+ val finishBytes = if (finish.isDefined) serializer.out(finish.get) else null
+ val columns: List[Column] = sessions.get.withSession {
+ _ / (name, MAP_COLUMN_PARENT, startBytes, finishBytes, IS_ASCENDING, count, CONSISTENCY_LEVEL)
+ }
+ columns.map(column => (serializer.in(column.name, None), serializer.in(column.value, None)))
+ } else throw new IllegalStateException("CassandraStorage is not started")
}
+
+/**
+ * NOTE: requires command line options:
+ * -Dcassandra -Dstorage-config=config/ -Dpidfile=akka.pid
+ *
+ * @author Jonas Bonér
+ *
+object EmbeddedCassandraStorage extends Logging {
+  val KEYSPACE = "akka"
+  val MAP_COLUMN_FAMILY = "map"
+  val VECTOR_COLUMN_FAMILY = "vector"
+  val REF_COLUMN_FAMILY = "ref:item"
+
+  val IS_ASCENDING = true
+
+  val RUN_THRIFT_SERVICE = kernel.Kernel.config.getBool("akka.storage.cassandra.thrift-server.service", false)
+  val CONSISTENCY_LEVEL = {
+    if (kernel.Kernel.config.getBool("akka.storage.cassandra.blocking", true)) 0
+    else 1
+  }
+
+  @volatile private[this] var isRunning = false
+
+  private[this] val serializer: Serializer = {
+    kernel.Kernel.config.getString("akka.storage.cassandra.storage-format", "java") match {
+      case "scala-json" => Serializer.ScalaJSON
+      case "java-json" => Serializer.JavaJSON
+      case "protobuf" => Serializer.Protobuf
+      case "java" => Serializer.Java
+      case "sbinary" => throw new UnsupportedOperationException("SBinary serialization protocol is not yet supported for storage")
+      case "avro" => throw new UnsupportedOperationException("Avro serialization protocol is not yet supported for storage")
+      case unknown => throw new UnsupportedOperationException("Unknown storage serialization protocol [" + unknown + "]")
+    }
+  }
+
+  // TODO: is this server thread-safe or does it need to be wrapped in an actor?
+  private[this] val server = classOf[CassandraServer].newInstance.asInstanceOf[CassandraServer]
+
+  private[this] var thriftServer: CassandraThriftServer = _
+
+  def start = synchronized {
+    if (!isRunning) {
+      try {
+        server.start
+        log.info("Cassandra persistent storage has started up successfully")
+      } catch {
+        case e =>
+          log.error("Could not start up Cassandra persistent storage")
+          throw e
+      }
+      if (RUN_THRIFT_SERVICE) {
+        thriftServer = new CassandraThriftServer(server)
+        thriftServer.start
+      }
+      isRunning
+    }
+  }
+
+  def stop = if (isRunning) {
+    //server.storageService.shutdown
+    if (RUN_THRIFT_SERVICE) thriftServer.stop
+  }
+
+  // ===============================================================
+  // For Ref
+  // ===============================================================
+
+  def insertRefStorageFor(name: String, element: AnyRef) = {
+    server.insert(
+      KEYSPACE,
+      name,
+      REF_COLUMN_FAMILY,
+      serializer.out(element),
+      System.currentTimeMillis,
+      CONSISTENCY_LEVEL)
+  }
+
+  def getRefStorageFor(name: String): Option[AnyRef] = {
+    try {
+      val column = server.get_column(KEYSPACE, name, REF_COLUMN_FAMILY)
+      Some(serializer.in(column.value, None))
+    } catch {
+      case e =>
+        e.printStackTrace
+        None
+    }
+  }
+
+  // ===============================================================
+  // For Vector
+  // ===============================================================
+
+  def insertVectorStorageEntryFor(name: String, element: AnyRef) = {
+    server.insert(
+      KEYSPACE,
+      name,
+      VECTOR_COLUMN_FAMILY + ":" + getVectorStorageSizeFor(name),
+      serializer.out(element),
+      System.currentTimeMillis,
+      CONSISTENCY_LEVEL)
+  }
+
+  def getVectorStorageEntryFor(name: String, index: Int): AnyRef = {
+    try {
+      val column = server.get_column(KEYSPACE, name, VECTOR_COLUMN_FAMILY + ":" + index)
+      serializer.in(column.value, None)
+    } catch {
+      case e =>
+        e.printStackTrace
+        throw new Predef.NoSuchElementException(e.getMessage)
+    }
+  }
+
+  def getVectorStorageRangeFor(name: String, start: Int, count: Int): List[AnyRef] =
+    server.get_slice(KEYSPACE, name, VECTOR_COLUMN_FAMILY, IS_ASCENDING, count)
+      .toArray.toList.asInstanceOf[List[Tuple2[String, AnyRef]]].map(tuple => tuple._2)
+
+  def getVectorStorageSizeFor(name: String): Int =
+    server.get_column_count(KEYSPACE, name, VECTOR_COLUMN_FAMILY)
+
+  // ===============================================================
+  // For Map
+  // ===============================================================
+
+  def insertMapStorageEntryFor(name: String, key: String, value: AnyRef) = {
+    server.insert(
+      KEYSPACE, name,
+      MAP_COLUMN_FAMILY + ":" + key,
+      serializer.out(value),
+      System.currentTimeMillis,
+      CONSISTENCY_LEVEL)
+  }
+
+  def insertMapStorageEntriesFor(name: String, entries: List[Tuple2[String, AnyRef]]) = {
+    import java.util.{Map, HashMap, List, ArrayList}
+    val columns: Map[String, List[column_t]] = new HashMap
+    for (entry <- entries) {
+      val cls: List[column_t] = new ArrayList
+      cls.add(new column_t(entry._1, serializer.out(entry._2), System.currentTimeMillis))
+      columns.put(MAP_COLUMN_FAMILY, cls)
+    }
+    server.batch_insert(new BatchMutation(
+      KEYSPACE, name,
+      columns),
+      CONSISTENCY_LEVEL)
+  }
+
+  def getMapStorageEntryFor(name: String, key: AnyRef): Option[AnyRef] = {
+    try {
+      val column = server.get_column(KEYSPACE, name, MAP_COLUMN_FAMILY + ":" + key)
+      Some(serializer.in(column.value, None))
+    } catch {
+      case e =>
+        e.printStackTrace
+        None
+    }
+  }
+
+  def getMapStorageFor(name: String): List[Tuple2[String, AnyRef]] = {
+    val columns = server.get_columns_since(KEYSPACE, name, MAP_COLUMN_FAMILY, -1)
+      .toArray.toList.asInstanceOf[List[org.apache.cassandra.service.column_t]]
+    for {
+      column <- columns
+      col = (column.columnName, serializer.in(column.value, None))
+    } yield col
+  }
+
+  def getMapStorageSizeFor(name: String): Int =
+    server.get_column_count(KEYSPACE, name, MAP_COLUMN_FAMILY)
+
+  def removeMapStorageFor(name: String) =
+    server.remove(KEYSPACE, name, MAP_COLUMN_FAMILY, System.currentTimeMillis, CONSISTENCY_LEVEL)
+
+  def getMapStorageRangeFor(name: String, start: Int, count: Int): List[Tuple2[String, AnyRef]] = {
+    server.get_slice(KEYSPACE, name, MAP_COLUMN_FAMILY, IS_ASCENDING, count)
+      .toArray.toList.asInstanceOf[List[Tuple2[String, AnyRef]]]
+  }
+}
+
+class CassandraThriftServer(server: CassandraServer) extends Logging {
+  case object Start
+  case object Stop
+
+  private[this] val serverEngine: TThreadPoolServer = try {
+    val pidFile = kernel.Kernel.config.getString("akka.storage.cassandra.thrift-server.pidfile", "akka.pid")
+    if (pidFile != null) new File(pidFile).deleteOnExit()
+    val listenPort = DatabaseDescriptor.getThriftPort
+
+    val processor = new Cassandra.Processor(server)
+    val tServerSocket = new TServerSocket(listenPort)
+    val tProtocolFactory = new TBinaryProtocol.Factory
+
+    val options = new TThreadPoolServer.Options
+    options.minWorkerThreads = 64
+    new TThreadPoolServer(new TProcessorFactory(processor),
+      tServerSocket,
+      new TTransportFactory,
+      new TTransportFactory,
+      tProtocolFactory,
+      tProtocolFactory,
+      options)
+  } catch {
+    case e =>
+      log.error("Could not start up Cassandra thrift service")
+      throw e
+  }
+
+  import scala.actors.Actor._
+  private[this] val serverDaemon = actor {
+    receive {
+      case Start =>
+        serverEngine.serve
+        log.info("Cassandra thrift service has started up successfully")
+      case Stop =>
+        log.info("Cassandra thrift service is shutting down...")
+        serverEngine.stop
+    }
+  }
+
+  def start = serverDaemon ! Start
+  def stop = serverDaemon ! Stop
+}
+ */
diff --git a/kernel/src/main/scala/state/Pool.scala b/kernel/src/main/scala/state/Pool.scala
new file mode 100644
index 0000000000..6391645562
--- /dev/null
+++ b/kernel/src/main/scala/state/Pool.scala
@@ -0,0 +1,96 @@
+/**
+ * Copyright (C) 2009 Scalable Solutions.
+ */
+
+package se.scalablesolutions.akka.kernel.state
+
+import org.apache.commons.pool._
+import org.apache.commons.pool.impl._
+
+import org.apache.thrift.transport._
+
+trait Pool[T] extends java.io.Closeable {
+ def borrowObject: T
+ def returnObject(t: T): Unit
+ def invalidateObject(t: T): Unit
+ def addObject: Unit
+ def getNumIdle: Int
+ def getNumActive: Int
+ def clear: Unit
+ def setFactory(factory: PoolItemFactory[T]): Unit
+}
+
+trait PoolFactory[T] {
+ def createPool: Pool[T]
+}
+
+trait PoolItemFactory[T] {
+ def makeObject: T
+ def destroyObject(t: T): Unit
+ def validateObject(t: T): Boolean
+ def activateObject(t: T): Unit
+ def passivateObject(t: T): Unit
+}
+
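+// Bridges the typed Pool[T] API onto commons-pool's untyped ObjectPool,
+// casting borrowed objects and adapting PoolItemFactory to PoolableObjectFactory.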
+trait PoolBridge[T, OP <: ObjectPool] extends Pool[T] {
+ val impl: OP
+ override def borrowObject: T = impl.borrowObject.asInstanceOf[T]
+ override def returnObject(t: T) = impl.returnObject(t)
+ override def invalidateObject(t: T) = impl.invalidateObject(t)
+ override def addObject = impl.addObject
+ override def getNumIdle: Int = impl.getNumIdle
+ override def getNumActive: Int = impl.getNumActive
+ override def clear: Unit = impl.clear
+ override def close: Unit = impl.close
+ override def setFactory(factory: PoolItemFactory[T]) = impl.setFactory(toPoolableObjectFactory(factory))
+
+ def toPoolableObjectFactory[T](pif: PoolItemFactory[T]) = new PoolableObjectFactory {
+ def makeObject: Object = pif.makeObject.asInstanceOf[Object]
+ def destroyObject(o: Object): Unit = pif.destroyObject(o.asInstanceOf[T])
+ def validateObject(o: Object): Boolean = pif.validateObject(o.asInstanceOf[T])
+ def activateObject(o: Object): Unit = pif.activateObject(o.asInstanceOf[T])
+ def passivateObject(o: Object): Unit = pif.passivateObject(o.asInstanceOf[T])
+ }
+}
+
+object StackPool {
+ def apply[T](factory: PoolItemFactory[T]) = new PoolBridge[T,StackObjectPool] {
+ val impl = new StackObjectPool(toPoolableObjectFactory(factory))
+ }
+
+ def apply[T](factory: PoolItemFactory[T], maxIdle: Int) = new PoolBridge[T,StackObjectPool] {
+ val impl = new StackObjectPool(toPoolableObjectFactory(factory),maxIdle)
+ }
+
+ def apply[T](factory: PoolItemFactory[T], maxIdle: Int, initIdleCapacity: Int) = new PoolBridge[T,StackObjectPool] {
+ val impl = new StackObjectPool(toPoolableObjectFactory(factory),maxIdle,initIdleCapacity)
+ }
+}
+
+object SoftRefPool {
+ def apply[T](factory: PoolItemFactory[T]) = new PoolBridge[T,SoftReferenceObjectPool] {
+ val impl = new SoftReferenceObjectPool(toPoolableObjectFactory(factory))
+ }
+
+ def apply[T](factory: PoolItemFactory[T], initSize: Int) = new PoolBridge[T,SoftReferenceObjectPool] {
+ val impl = new SoftReferenceObjectPool(toPoolableObjectFactory(factory),initSize)
+ }
+}
+
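+// Pool lifecycle for Thrift transports: a transport is valid while its socket
+// is open, is reopened on activation if needed, and is flushed on passivation.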
+trait TransportFactory[T <: TTransport] extends PoolItemFactory[T] {
+ def createTransport: T
+ def makeObject: T = createTransport
+ def destroyObject(transport: T): Unit = transport.close
+ def validateObject(transport: T) = transport.isOpen
+ def activateObject(transport: T): Unit = if (!transport.isOpen) transport.open
+ def passivateObject(transport: T): Unit = transport.flush
+}
+
+case class SocketProvider(val host: String, val port: Int) extends TransportFactory[TSocket] {
+ def createTransport = {
+ val t = new TSocket(host, port)
+ t.open
+ t
+ }
+}
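+
+// Usage sketch (assumes a Cassandra node listening on localhost:9160):
+//   val pool = StackPool(SocketProvider("127.0.0.1", 9160))
+//   val socket = pool.borrowObject
+//   try { /* speak Thrift over the socket */ } finally { pool.returnObject(socket) }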
+
diff --git a/kernel/src/main/scala/state/State.scala b/kernel/src/main/scala/state/State.scala
index 8967ed3dd8..46114eb1bc 100644
--- a/kernel/src/main/scala/state/State.scala
+++ b/kernel/src/main/scala/state/State.scala
@@ -37,7 +37,7 @@ object TransactionalState extends TransactionalState
*
*/
class TransactionalState {
- def newPersistentMap(config: PersistentStorageConfig): TransactionalMap[String, AnyRef] = config match {
+ def newPersistentMap(config: PersistentStorageConfig): TransactionalMap[AnyRef, AnyRef] = config match {
case CassandraStorageConfig() => new CassandraPersistentTransactionalMap
case TerracottaStorageConfig() => throw new UnsupportedOperationException
case TokyoCabinetStorageConfig() => throw new UnsupportedOperationException
@@ -174,7 +174,7 @@ abstract class PersistentTransactionalMap[K, V] extends TransactionalMap[K, V] {
// FIXME: need to handle remove in another changeSet
protected[kernel] val changeSet = new HashMap[K, V]
- def getRange(start: Int, count: Int)
+ def getRange(start: Option[AnyRef], count: Int)
// ---- For Transactional ----
override def begin = {}
@@ -188,11 +188,6 @@ abstract class PersistentTransactionalMap[K, V] extends TransactionalMap[K, V] {
None // always return None to speed up writes (else we would need to go to the DB to get the old value)
}
- override def remove(key: K) = {
- verifyTransaction
- changeSet -= key
- }
-
override def -=(key: K) = remove(key)
override def update(key: K, value: V) = put(key, value)
@@ -203,12 +198,21 @@ abstract class PersistentTransactionalMap[K, V] extends TransactionalMap[K, V] {
*
* @author Jonas Bonér
*/
-class CassandraPersistentTransactionalMap extends PersistentTransactionalMap[String, AnyRef] {
+class CassandraPersistentTransactionalMap extends PersistentTransactionalMap[AnyRef, AnyRef] {
- override def getRange(start: Int, count: Int) = {
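+ // A remove either cancels a pending write in the local change set or, if the
+ // key is not part of this transaction, deletes it directly from storage.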
+ override def remove(key: AnyRef) = {
+ verifyTransaction
+ if (changeSet.contains(key)) changeSet -= key
+ else CassandraStorage.removeMapStorageFor(uuid, key)
+ }
+
+ override def getRange(start: Option[AnyRef], count: Int) =
+ getRange(start, None, count)
+
+ def getRange(start: Option[AnyRef], finish: Option[AnyRef], count: Int) = {
verifyTransaction
try {
- CassandraStorage.getMapStorageRangeFor(uuid, start, count)
+ CassandraStorage.getMapStorageRangeFor(uuid, start, finish, count)
} catch {
case e: Exception => Nil
}
@@ -230,7 +234,7 @@ class CassandraPersistentTransactionalMap extends PersistentTransactionalMap[Str
}
}
- override def contains(key: String): Boolean = {
+ override def contains(key: AnyRef): Boolean = {
try {
verifyTransaction
CassandraStorage.getMapStorageEntryFor(uuid, key).isDefined
@@ -249,7 +253,7 @@ class CassandraPersistentTransactionalMap extends PersistentTransactionalMap[Str
}
// ---- For scala.collection.mutable.Map ----
- override def get(key: String): Option[AnyRef] = {
+ override def get(key: AnyRef): Option[AnyRef] = {
verifyTransaction
// if (changeSet.contains(key)) changeSet.get(key)
// else {
@@ -262,16 +266,16 @@ class CassandraPersistentTransactionalMap extends PersistentTransactionalMap[Str
//}
}
- override def elements: Iterator[Tuple2[String, AnyRef]] = {
+ override def elements: Iterator[Tuple2[AnyRef, AnyRef]] = {
//verifyTransaction
- new Iterator[Tuple2[String, AnyRef]] {
- private val originalList: List[Tuple2[String, AnyRef]] = try {
+ new Iterator[Tuple2[AnyRef, AnyRef]] {
+ private val originalList: List[Tuple2[AnyRef, AnyRef]] = try {
CassandraStorage.getMapStorageFor(uuid)
} catch {
case e: Throwable => Nil
}
private var elements = originalList.reverse
+ override def next: Tuple2[AnyRef, AnyRef] = synchronized {
+ override def next: Tuple2[AnyRef, AnyRef]= synchronized {
val element = elements.head
elements = elements.tail
element
@@ -391,9 +395,12 @@ class CassandraPersistentTransactionalVector extends PersistentTransactionalVect
else CassandraStorage.getVectorStorageEntryFor(uuid, index)
}
- override def getRange(start: Int, count: Int): List[AnyRef] = {
+ override def getRange(start: Int, count: Int): List[AnyRef] =
+ getRange(Some(start), None, count)
+
+ def getRange(start: Option[Int], finish: Option[Int], count: Int): List[AnyRef] = {
verifyTransaction
- CassandraStorage.getVectorStorageRangeFor(uuid, start, count)
+ CassandraStorage.getVectorStorageRangeFor(uuid, start, finish, count)
}
override def length: Int = {
@@ -478,4 +485,4 @@ class CassandraPersistentTransactionalRef extends TransactionalRef[AnyRef] {
if (ref.isDefined) ref
else default
}
-}
\ No newline at end of file
+}
diff --git a/kernel/src/main/scala/util/Helpers.scala b/kernel/src/main/scala/util/Helpers.scala
index cc629e0801..6d43f06030 100644
--- a/kernel/src/main/scala/util/Helpers.scala
+++ b/kernel/src/main/scala/util/Helpers.scala
@@ -4,15 +4,9 @@
package se.scalablesolutions.akka.kernel.util
-import java.io.UnsupportedEncodingException
-import java.security.{NoSuchAlgorithmException, MessageDigest}
+import java.security.MessageDigest
import java.util.concurrent.locks.ReentrantReadWriteLock
-import scala.actors._
-import scala.actors.Actor._
-
-import net.lag.logging.Logger
-
class SystemFailure(cause: Throwable) extends RuntimeException(cause)
/**
@@ -20,7 +14,18 @@ class SystemFailure(cause: Throwable) extends RuntimeException(cause)
*/
object Helpers extends Logging {
- def getDigestFor(s: String) = {
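+ // Lifts a possibly-null reference into an Option (None when null).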
+ implicit def null2Option[T](t: T): Option[T] = if (t != null) Some(t) else None
+
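+ // Encodes an Int as four big-endian bytes, e.g. intToBytes(258) == Array(0, 0, 1, 2);
+ // used for column names so that byte order agrees with numeric order.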
+ def intToBytes(value: Int): Array[Byte] = {
+ val bytes = new Array[Byte](4)
+ bytes(0) = (value >>> 24).asInstanceOf[Byte]
+ bytes(1) = (value >>> 16).asInstanceOf[Byte]
+ bytes(2) = (value >>> 8).asInstanceOf[Byte]
+ bytes(3) = value.asInstanceOf[Byte]
+ bytes
+ }
+
+ def getMD5For(s: String) = {
val digest = MessageDigest.getInstance("MD5")
digest.update(s.getBytes("ASCII"))
val bytes = digest.digest
@@ -59,51 +64,5 @@ object Helpers extends Logging {
}
}
}
-
- // ================================================
- // implicit conversion between regular actor and actor with a type future
- implicit def actorWithFuture(a: Actor) = new ActorWithTypedFuture(a)
-
- abstract class FutureWithTimeout[T](ch: InputChannel[T]) extends Future[T](ch) {
- def receiveWithin(timeout: Int) : Option[T]
- override def respond(f: T => Unit): Unit = throw new UnsupportedOperationException("Does not support the Responder API")
- }
-
- def receiveOrFail[T](future: => FutureWithTimeout[T], timeout: Int, errorHandler: => T): T = {
- future.receiveWithin(timeout) match {
- case None => errorHandler
- case Some(reply) => reply
- }
- }
-
- class ActorWithTypedFuture(a: Actor) {
- require(a != null)
-
- def !!: FutureWithTimeout[A] = {
- val ftch = new Channel[A](Actor.self)
- a.send(msg, ftch.asInstanceOf[OutputChannel[Any]])
- new FutureWithTimeout[A](ftch) {
- def apply() =
- if (isSet) value.get.asInstanceOf[A]
- else ch.receive {
- case a =>
- value = Some(a)
- value.get.asInstanceOf[A]
- }
- def isSet = receiveWithin(0).isDefined
- def receiveWithin(timeout: Int): Option[A] = value match {
- case None => ch.receiveWithin(timeout) {
- case TIMEOUT =>
- log.debug("Future timed out while waiting for actor [%s]", a)
- None
- case a =>
- value = Some(a)
- value.asInstanceOf[Option[A]]
- }
- case a => a.asInstanceOf[Option[A]]
- }
- }
- }
- }
}
diff --git a/kernel/src/test/scala/AllTest.scala b/kernel/src/test/scala/AllTest.scala
index b0ef909aae..a225bfb080 100644
--- a/kernel/src/test/scala/AllTest.scala
+++ b/kernel/src/test/scala/AllTest.scala
@@ -16,7 +16,7 @@ object AllTest extends TestCase {
suite.addTestSuite(classOf[EventBasedThreadPoolDispatcherTest])
suite.addTestSuite(classOf[ActorSpec])
suite.addTestSuite(classOf[RemoteActorSpec])
- suite.addTestSuite(classOf[PersistentActorSpec])
+ //suite.addTestSuite(classOf[PersistentActorSpec])
suite.addTestSuite(classOf[InMemoryActorSpec])
//suite.addTestSuite(classOf[TransactionClasherSpec])
suite
diff --git a/lib/cassandra-0.4.0-dev.jar b/lib/cassandra-0.4.0-dev.jar
index 8274103dc3..706336669f 100644
Binary files a/lib/cassandra-0.4.0-dev.jar and b/lib/cassandra-0.4.0-dev.jar differ
diff --git a/lib/javautils-2.7.4-0.1.jar b/lib/javautils-2.7.4-0.1.jar
new file mode 100644
index 0000000000..a0c51bf7da
Binary files /dev/null and b/lib/javautils-2.7.4-0.1.jar differ
diff --git a/lib/netty-3.1.0.CR1.jar b/lib/netty-3.1.0.GA.jar
similarity index 57%
rename from lib/netty-3.1.0.CR1.jar
rename to lib/netty-3.1.0.GA.jar
index f1180b0477..94c9b24902 100644
Binary files a/lib/netty-3.1.0.CR1.jar and b/lib/netty-3.1.0.GA.jar differ
diff --git a/pom.xml b/pom.xml
index 4231f8c073..f321ff0e88 100644
--- a/pom.xml
+++ b/pom.xml
@@ -22,9 +22,42 @@