diff --git a/config/akka-reference.conf b/config/akka-reference.conf index 3f672d1803..98501c8f6f 100644 --- a/config/akka-reference.conf +++ b/config/akka-reference.conf @@ -1,6 +1,6 @@ -################################# -# Akka Actor Kernel Config File # -################################# +#################### +# Akka Config File # +#################### # This file has all the default settings, so all these could be remove with no visible effect. # Modify as needed. @@ -21,9 +21,7 @@ # supervisor bootstrap, should be defined in default constructor timeout = 5000 # default timeout for future based invocations - concurrent-mode = off # if turned on, then the same actor instance is allowed to execute concurrently - - # e.g. departing from the actor model for better performance - serialize-messages = on # does a deep clone of (non-primitive) messages to ensure immutability + serialize-messages = off # does a deep clone of (non-primitive) messages to ensure immutability @@ -49,17 +47,14 @@ - system = "cassandra" # Options: cassandra (coming: terracotta, redis, tokyo-cabinet, tokyo-tyrant, voldemort, memcached, hazelcast) + system = "cassandra" # Options: cassandra (coming: terracotta, mongodb, redis, tokyo-cabinet, voldemort, memcached) service = on - storage-format = "java" # Options: java, scala-json, java-json - blocking = false # inserts and queries should be blocking or not - - - service = on - pidfile = "akka.pid" - + hostname = "localhost" # ip address or hostname of one of the Cassandra cluster's seeds + port = 9160 + storage-format = "binary" # Options: binary, json, simple-json + blocking = false # inserts and queries should be blocking or not diff --git a/embedded-repo/org/apache/cassandra/cassandra/0.4.0-dev/cassandra-0.4.0-dev.jar b/embedded-repo/org/apache/cassandra/cassandra/0.4.0-dev/cassandra-0.4.0-dev.jar index 8274103dc3..d9f762ad2a 100644 Binary files a/embedded-repo/org/apache/cassandra/cassandra/0.4.0-dev/cassandra-0.4.0-dev.jar and b/embedded-repo/org/apache/cassandra/cassandra/0.4.0-dev/cassandra-0.4.0-dev.jar differ diff --git a/kernel/src/main/scala/state/CassandraStorage.scala b/kernel/src/main/scala/state/CassandraStorage.scala index 5cd46ec9a2..872efade73 100644 --- a/kernel/src/main/scala/state/CassandraStorage.scala +++ b/kernel/src/main/scala/state/CassandraStorage.scala @@ -4,7 +4,7 @@ package se.scalablesolutions.akka.kernel.state -import java.io.File +import java.io.{File, Flushable, Closeable} import kernel.util.Logging import serialization.{Serializer, Serializable, SerializationProtocol} @@ -12,11 +12,12 @@ import serialization.{Serializer, Serializable, SerializationProtocol} import org.apache.cassandra.config.DatabaseDescriptor import org.apache.cassandra.service._ -import org.apache.thrift.server.TThreadPoolServer -import org.apache.thrift.protocol.TBinaryProtocol -import org.apache.thrift.transport.TServerSocket -import org.apache.thrift.transport.TTransportFactory +//import org.apache.thrift.server.TThreadPoolServer import org.apache.thrift.TProcessorFactory +import org.apache.thrift.transport._ +import org.apache.thrift._ +import org.apache.thrift.transport._ +import org.apache.thrift.protocol._ /** * NOTE: requires command line options: @@ -25,6 +26,300 @@ import org.apache.thrift.TProcessorFactory *

* @author Jonas Bonér */ +object CassandraStorage extends Logging { + val TABLE_NAME = "akka" + val MAP_COLUMN_FAMILY = "map" + val VECTOR_COLUMN_FAMILY = "vector" + val REF_COLUMN_FAMILY = "ref:item" + + val IS_ASCENDING = true + + import kernel.Kernel.config + + val CASSANDRA_SERVER_HOSTNAME = config.getString("akka.storage.cassandra.hostname", "localhost") + val CASSANDRA_SERVER_PORT = config.getInt("akka.storage.cassandra.port", 9160) + val BLOCKING_CALL = if (config.getBool("akka.storage.cassandra.blocking", true)) 0 + else 1 + + @volatile private[this] var isRunning = false + private[this] val protocol: Protocol = { + config.getString("akka.storage.cassandra.storage-format", "binary") match { + case "binary" => Protocol.Binary + case "json" => Protocol.JSON + case "simple-json" => Protocol.SimpleJSON + case unknown => throw new UnsupportedOperationException("Unknown storage serialization protocol [" + unknown + "]") + } + } + + + private[this] var sessions: Option[CassandraSessionPool[_]] = None + + def start = synchronized { + if (!isRunning) { + try { + sessions = Some(new CassandraSessionPool(StackPool(SocketProvider(CASSANDRA_SERVER_HOSTNAME, CASSANDRA_SERVER_PORT)), protocol)) + log.info("Cassandra persistent storage has started up successfully"); + } catch { + case e => + log.error("Could not start up Cassandra persistent storage") + throw e + } + isRunning + } + } + + def stop = synchronized { + if (isRunning && sessions.isDefined) sessions.get.close + } + + //implicit def strToBytes(s: String) = s.getBytes("UTF-8") + +/* + def insertRefStorageFor(name: String, element: AnyRef) = sessions.withSession { session => { + val user_id = "1" + session ++| ("users", user_id, "base_attributes:name", "Lord Foo Bar", false) + session ++| ("users", user_id, "base_attributes:age", "24", false) + for( i <- session / ("users", user_id, "base_attributes", None, None).toList) println(i) + }} +*/ + // =============================================================== + // For Ref + // =============================================================== + + def insertRefStorageFor(name: String, element: String) = if (sessions.isDefined) { + sessions.get.withSession { + _ ++| ( + TABLE_NAME, + name, + REF_COLUMN_FAMILY, + element, + System.currentTimeMillis, + BLOCKING_CALL) + } + } else throw new IllegalStateException("CassandraStorage is not started") + + def getRefStorageFor(name: String): Option[String] = if (sessions.isDefined) { + try { + val column = sessions.get.withSession { _ | (TABLE_NAME, name, REF_COLUMN_FAMILY) } + Some(column.value) + } catch { + case e => + e.printStackTrace + None + } + } else throw new IllegalStateException("CassandraStorage is not started") + + // =============================================================== + // For Vector + // =============================================================== + + def insertVectorStorageEntryFor(name: String, element: String) = if (sessions.isDefined) { + sessions.get.withSession { + _ ++| ( + TABLE_NAME, + name, + VECTOR_COLUMN_FAMILY + ":" + getVectorStorageSizeFor(name), + element, + System.currentTimeMillis, + BLOCKING_CALL) + } + } else throw new IllegalStateException("CassandraStorage is not started") + + def getVectorStorageEntryFor(name: String, index: Int): String = if (sessions.isDefined) { + try { + val column = sessions.get.withSession { _ | (TABLE_NAME, name, VECTOR_COLUMN_FAMILY + ":" + index) } + column.value + } catch { + case e => + e.printStackTrace + throw new NoSuchElementException(e.getMessage) + } + } else throw new IllegalStateException("CassandraStorage is not started") + + def getVectorStorageRangeFor(name: String, start: Int, count: Int): List[String] = if (sessions.isDefined) { + sessions.get.withSession { _ / (TABLE_NAME, name, VECTOR_COLUMN_FAMILY, IS_ASCENDING, count) }.map(_.value) + } else throw new IllegalStateException("CassandraStorage is not started") + + def getVectorStorageSizeFor(name: String): Int = if (sessions.isDefined) { + sessions.get.withSession { _ |# (TABLE_NAME, name, VECTOR_COLUMN_FAMILY) } + } else throw new IllegalStateException("CassandraStorage is not started") + + // =============================================================== + // For Map + // =============================================================== + + def insertMapStorageEntryFor(name: String, key: String, value: String) = if (sessions.isDefined) { + sessions.get.withSession { + _ ++| ( + TABLE_NAME, + name, + MAP_COLUMN_FAMILY + ":" + key, + value, + System.currentTimeMillis, + BLOCKING_CALL) + } + } else throw new IllegalStateException("CassandraStorage is not started") + + def insertMapStorageEntriesFor(name: String, entries: List[Tuple2[String, String]]) = if (sessions.isDefined) { + import java.util.{Map, HashMap, List, ArrayList} + val columns: Map[String, List[column_t]] = new HashMap + for (entry <- entries) { + val cls: List[column_t] = new ArrayList + cls.add(new column_t(entry._1, entry._2, System.currentTimeMillis)) + columns.put(MAP_COLUMN_FAMILY, cls) + } + sessions.get.withSession { + _ ++| ( + new batch_mutation_t( + TABLE_NAME, + name, + columns), + BLOCKING_CALL) + } + } else throw new IllegalStateException("CassandraStorage is not started") + + def getMapStorageEntryFor(name: String, key: String): Option[String] = if (sessions.isDefined) { + try { + val column = sessions.get.withSession { _ | (TABLE_NAME, name, MAP_COLUMN_FAMILY + ":" + key) } + Some(column.value) + } catch { + case e => + e.printStackTrace + None + } + } else throw new IllegalStateException("CassandraStorage is not started") + + /* + def getMapStorageFor(name: String): List[Tuple2[String, String]] = if (sessions.isDefined) { + val columns = server.get_columns_since(TABLE_NAME, name, MAP_COLUMN_FAMILY, -1) + .toArray.toList.asInstanceOf[List[org.apache.cassandra.service.column_t]] + for { + column <- columns + col = (column.columnName, column.value) + } yield col + } else throw new IllegalStateException("CassandraStorage is not started") + */ + + def getMapStorageSizeFor(name: String): Int = if (sessions.isDefined) { + sessions.get.withSession { _ |# (TABLE_NAME, name, MAP_COLUMN_FAMILY) } + } else throw new IllegalStateException("CassandraStorage is not started") + + def removeMapStorageFor(name: String) = if (sessions.isDefined) { + sessions.get.withSession { _ -- (TABLE_NAME, name, MAP_COLUMN_FAMILY, System.currentTimeMillis, BLOCKING_CALL) } + } else throw new IllegalStateException("CassandraStorage is not started") + + def getMapStorageRangeFor(name: String, start: Int, count: Int): List[Tuple2[String, String]] = if (sessions.isDefined) { + sessions.get.withSession { _ / (TABLE_NAME, name, MAP_COLUMN_FAMILY, IS_ASCENDING, count) }.toArray.toList.asInstanceOf[List[Tuple2[String, String]]] + } else throw new IllegalStateException("CassandraStorage is not started") +} + +trait CassandraSession extends Closeable with Flushable { + import scala.collection.jcl.Conversions._ + import org.scala_tools.javautils.Imports._ + + private implicit def null2Option[T](t: T): Option[T] = if(t != null) Some(t) else None + + protected val client: Cassandra.Client + + val obtainedAt: Long + + def /(tableName: String, key: String, columnParent: String, start: Option[Int],end: Option[Int]): List[column_t] = + client.get_slice(tableName, key, columnParent, start.getOrElse(-1),end.getOrElse(-1)).toList + + def /(tableName: String, key: String, columnParent: String, colNames: List[String]): List[column_t] = + client.get_slice_by_names(tableName, key, columnParent, colNames.asJava ).toList + + def |(tableName: String, key: String, colPath: String): Option[column_t] = + client.get_column(tableName, key, colPath) + + def |#(tableName: String, key: String, columnParent: String): Int = + client.get_column_count(tableName, key, columnParent) + + def ++|(tableName: String, key: String, columnPath: String, cellData: Array[Byte], timestamp: Long, block: Int) = + client.insert(tableName, key, columnPath, cellData,timestamp,block) + + def ++|(tableName: String, key: String, columnPath: String, cellData: Array[Byte], block: Int) = + client.insert(tableName,key,columnPath,cellData,obtainedAt,block) + + def ++|(batch: batch_mutation_t, block: Int) = + client.batch_insert(batch, block) + + def --(tableName: String, key: String, columnPathOrParent: String, timestamp: Long, block: Int) = + client.remove(tableName, key, columnPathOrParent, timestamp, block) + + def --(tableName: String, key: String, columnPathOrParent: String, block: Int) = + client.remove(tableName, key, columnPathOrParent, obtainedAt, block) + + def /@(tableName: String, key: String, columnParent: String, timestamp: Long): List[column_t] = + client.get_columns_since(tableName, key, columnParent, timestamp).toList + + def /^(tableName: String, key: String, columnFamily: String, start: Option[Int], end: Option[Int], count: Int ): List[superColumn_t] = + client.get_slice_super(tableName, key,columnFamily, start.getOrElse(-1), end.getOrElse(-1)).toList //TODO upgrade thrift interface to support count + + def /^(tableName: String, key: String, columnFamily: String, superColNames: List[String]): List[superColumn_t] = + client.get_slice_super_by_names(tableName, key, columnFamily, superColNames.asJava).toList + + def |^(tableName: String, key: String, superColumnPath: String): Option[superColumn_t] = + client.get_superColumn(tableName,key,superColumnPath) + + def ++|^ (batch: batch_mutation_super_t, block: Int) = + client.batch_insert_superColumn(batch, block) + + def keys(tableName: String, startsWith: String, stopsAt: String, maxResults: Option[Int]): List[String] = + client.get_key_range(tableName, startsWith, stopsAt, maxResults.getOrElse(-1)).toList + + def property(name: String): String = client.getStringProperty(name) + def properties(name: String): List[String] = client.getStringListProperty(name).toList + def describeTable(tableName: String) = client.describeTable(tableName) + + def ?(query: String) = client.executeQuery(query) +} + +class CassandraSessionPool[T <: TTransport](transportPool: Pool[T], inputProtocol: Protocol, outputProtocol: Protocol) extends Closeable { + def this(transportPool: Pool[T], ioProtocol: Protocol) = this(transportPool,ioProtocol,ioProtocol) + + def newSession: CassandraSession = { + val t = transportPool.borrowObject + val c = new Cassandra.Client(inputProtocol(t),outputProtocol(t)) + new CassandraSession { + val client = c + val obtainedAt = System.currentTimeMillis + def flush = t.flush + def close = transportPool.returnObject(t) + } + } + + def withSession[R](body: CassandraSession => R) = { + val session = newSession + try { + val result = body(session) + session.flush + result + } finally { + session.close + } + } + + def close = transportPool.close +} + +sealed abstract class Protocol(val factory: TProtocolFactory) { + def apply(transport: TTransport) = factory.getProtocol(transport) +} + +object Protocol { + object Binary extends Protocol(new TBinaryProtocol.Factory) + object SimpleJSON extends Protocol(new TSimpleJSONProtocol.Factory) + object JSON extends Protocol(new TJSONProtocol.Factory) +} + +/** + * NOTE: requires command line options: + *
+ * -Dcassandra -Dstorage-config=config/ -Dpidfile=akka.pid + *

+ * @author Jonas Bonér + * object EmbeddedCassandraStorage extends Logging { val TABLE_NAME = "akka" val MAP_COLUMN_FAMILY = "map" @@ -88,7 +383,7 @@ object EmbeddedCassandraStorage extends Logging { TABLE_NAME, name, REF_COLUMN_FAMILY, - serializer.out(element), + element, System.currentTimeMillis, BLOCKING_CALL) } @@ -113,7 +408,7 @@ object EmbeddedCassandraStorage extends Logging { TABLE_NAME, name, VECTOR_COLUMN_FAMILY + ":" + getVectorStorageSizeFor(name), - serializer.out(element), + element, System.currentTimeMillis, BLOCKING_CALL) } @@ -197,196 +492,6 @@ object EmbeddedCassandraStorage extends Logging { } } -/** - * NOTE: requires command line options: - *
- * -Dcassandra -Dstorage-config=config/ -Dpidfile=akka.pid - *

- * @author Jonas Bonér - */ -object CassandraStorage extends Logging { - val TABLE_NAME = "akka" - val MAP_COLUMN_FAMILY = "map" - val VECTOR_COLUMN_FAMILY = "vector" - val REF_COLUMN_FAMILY = "ref:item" - - val IS_ASCENDING = true - - val RUN_THRIFT_SERVICE = kernel.Kernel.config.getBool("akka.storage.cassandra.thrift-server.service", false) - val BLOCKING_CALL = { - if (kernel.Kernel.config.getBool("akka.storage.cassandra.blocking", true)) 0 - else 1 - } - - @volatile private[this] var isRunning = false - private[this] val serializer: Serializer = { - kernel.Kernel.config.getString("akka.storage.cassandra.storage-format", "java") match { - case "scala-json" => Serializer.ScalaJSON - case "java-json" => Serializer.JavaJSON - //case "sbinary" => Serializer.SBinary - case "java" => Serializer.Java - case "avro" => throw new UnsupportedOperationException("Avro serialization protocol is not yet supported") - case unknown => throw new UnsupportedOperationException("Unknown storage serialization protocol [" + unknown + "]") - } - } - - implicit def strToBytes(s : String) = s.getBytes("UTF-8") - import scala.collection.jcl.Conversions._ - import se.foldleft.pool._ - import se.foldleft.cassidy._ - val cassidy = new Cassidy(StackPool(SocketProvider("localhost", 9160)), Protocol.Binary) // or JSON - cassidy.doWork { s => { - val user_id = "1" - s.++|("users", user_id, "base_attributes:name", "Lord Foo Bar", false) - s.++|("users", user_id, "base_attributes:age", "24", false) - for( i <- s./("users", user_id, "base_attributes", None, None).toList) println(i) - }} - - // TODO: is this server thread-safe or needed to be wrapped up in an actor? - private[this] val server = classOf[CassandraServer].newInstance.asInstanceOf[CassandraServer] - - private[this] var thriftServer: CassandraThriftServer = _ - - def start = synchronized { - if (!isRunning) { - try { - server.start - log.info("Cassandra persistent storage has started up successfully"); - } catch { - case e => - log.error("Could not start up Cassandra persistent storage") - throw e - } - if (RUN_THRIFT_SERVICE) { - thriftServer = new CassandraThriftServer(server) - thriftServer.start - } - isRunning - } - } - - def stop = if (isRunning) { - //server.storageService.shutdown - if (RUN_THRIFT_SERVICE) thriftServer.stop - } - - // =============================================================== - // For Ref - // =============================================================== - - def insertRefStorageFor(name: String, element: AnyRef) = { - server.insert( - TABLE_NAME, - name, - REF_COLUMN_FAMILY, - serializer.out(element), - System.currentTimeMillis, - BLOCKING_CALL) - } - - def getRefStorageFor(name: String): Option[AnyRef] = { - try { - val column = server.get_column(TABLE_NAME, name, REF_COLUMN_FAMILY) - Some(serializer.in(column.value, None)) - } catch { - case e => - e.printStackTrace - None - } - } - - // =============================================================== - // For Vector - // =============================================================== - - def insertVectorStorageEntryFor(name: String, element: AnyRef) = { - server.insert( - TABLE_NAME, - name, - VECTOR_COLUMN_FAMILY + ":" + getVectorStorageSizeFor(name), - serializer.out(element), - System.currentTimeMillis, - BLOCKING_CALL) - } - - def getVectorStorageEntryFor(name: String, index: Int): AnyRef = { - try { - val column = server.get_column(TABLE_NAME, name, VECTOR_COLUMN_FAMILY + ":" + index) - serializer.in(column.value, None) - } catch { - case e => - e.printStackTrace - throw new Predef.NoSuchElementException(e.getMessage) - } - } - - def getVectorStorageRangeFor(name: String, start: Int, count: Int): List[AnyRef] = - server.get_slice(TABLE_NAME, name, VECTOR_COLUMN_FAMILY, IS_ASCENDING, count) - .toArray.toList.asInstanceOf[List[Tuple2[String, AnyRef]]].map(tuple => tuple._2) - - def getVectorStorageSizeFor(name: String): Int = - server.get_column_count(TABLE_NAME, name, VECTOR_COLUMN_FAMILY) - - // =============================================================== - // For Map - // =============================================================== - - def insertMapStorageEntryFor(name: String, key: String, value: AnyRef) = { - server.insert( - TABLE_NAME, - name, - MAP_COLUMN_FAMILY + ":" + key, - serializer.out(value), - System.currentTimeMillis, - BLOCKING_CALL) - } - - def insertMapStorageEntriesFor(name: String, entries: List[Tuple2[String, AnyRef]]) = { - import java.util.{Map, HashMap, List, ArrayList} - val columns: Map[String, List[column_t]] = new HashMap - for (entry <- entries) { - val cls: List[column_t] = new ArrayList - cls.add(new column_t(entry._1, serializer.out(entry._2), System.currentTimeMillis)) - columns.put(MAP_COLUMN_FAMILY, cls) - } - server.batch_insert(new batch_mutation_t( - TABLE_NAME, - name, - columns), - BLOCKING_CALL) - } - - def getMapStorageEntryFor(name: String, key: AnyRef): Option[AnyRef] = { - try { - val column = server.get_column(TABLE_NAME, name, MAP_COLUMN_FAMILY + ":" + key) - Some(serializer.in(column.value, None)) - } catch { - case e => - e.printStackTrace - None - } - } - - def getMapStorageFor(name: String): List[Tuple2[String, AnyRef]] = { - val columns = server.get_columns_since(TABLE_NAME, name, MAP_COLUMN_FAMILY, -1) - .toArray.toList.asInstanceOf[List[org.apache.cassandra.service.column_t]] - for { - column <- columns - col = (column.columnName, serializer.in(column.value, None)) - } yield col - } - - def getMapStorageSizeFor(name: String): Int = - server.get_column_count(TABLE_NAME, name, MAP_COLUMN_FAMILY) - - def removeMapStorageFor(name: String) = - server.remove(TABLE_NAME, name, MAP_COLUMN_FAMILY, System.currentTimeMillis, BLOCKING_CALL) - - def getMapStorageRangeFor(name: String, start: Int, count: Int): List[Tuple2[String, AnyRef]] = { - server.get_slice(TABLE_NAME, name, MAP_COLUMN_FAMILY, IS_ASCENDING, count) - .toArray.toList.asInstanceOf[List[Tuple2[String, AnyRef]]] - } -} class CassandraThriftServer(server: CassandraServer) extends Logging { case object Start @@ -431,3 +536,4 @@ class CassandraThriftServer(server: CassandraServer) extends Logging { def start = serverDaemon ! Start def stop = serverDaemon ! Stop } +*/ diff --git a/kernel/src/main/scala/state/Pool.scala b/kernel/src/main/scala/state/Pool.scala new file mode 100755 index 0000000000..1ba3ba1b3c --- /dev/null +++ b/kernel/src/main/scala/state/Pool.scala @@ -0,0 +1,94 @@ +/** + * Copyright (C) 2009 Scalable Solutions. + */ + +package se.scalablesolutions.akka.kernel.state + +import org.apache.commons.pool._ +import org.apache.commons.pool.impl._ + +trait Pool[T] extends java.io.Closeable { + def borrowObject: T + def returnObject(t: T): Unit + def invalidateObject(t: T): Unit + def addObject: Unit + def getNumIdle: Int + def getNumActive: Int + def clear: Unit + def setFactory(factory: PoolItemFactory[T]): Unit +} + +trait PoolFactory[T] { + def createPool: Pool[T] +} + +trait PoolItemFactory[T] { + def makeObject: T + def destroyObject(t: T): Unit + def validateObject(t: T): Boolean + def activateObject(t: T): Unit + def passivateObject(t: T): Unit +} + +trait PoolBridge[T, OP <: ObjectPool] extends Pool[T] { + val impl: OP + override def borrowObject: T = impl.borrowObject.asInstanceOf[T] + override def returnObject(t: T) = impl.returnObject(t) + override def invalidateObject(t: T) = impl.invalidateObject(t) + override def addObject = impl.addObject + override def getNumIdle: Int = impl.getNumIdle + override def getNumActive: Int = impl.getNumActive + override def clear: Unit = impl.clear + override def close: Unit = impl.close + override def setFactory(factory: PoolItemFactory[T]) = impl.setFactory(toPoolableObjectFactory(factory)) + + def toPoolableObjectFactory[T](pif: PoolItemFactory[T]) = new PoolableObjectFactory { + def makeObject: Object = pif.makeObject.asInstanceOf[Object] + def destroyObject(o: Object): Unit = pif.destroyObject(o.asInstanceOf[T]) + def validateObject(o: Object): Boolean = pif.validateObject(o.asInstanceOf[T]) + def activateObject(o: Object): Unit = pif.activateObject(o.asInstanceOf[T]) + def passivateObject(o: Object): Unit = pif.passivateObject(o.asInstanceOf[T]) + } +} + +object StackPool { + def apply[T](factory: PoolItemFactory[T]) = new PoolBridge[T,StackObjectPool] { + val impl = new StackObjectPool(toPoolableObjectFactory(factory)) + } + + def apply[T](factory: PoolItemFactory[T], maxIdle: Int) = new PoolBridge[T,StackObjectPool] { + val impl = new StackObjectPool(toPoolableObjectFactory(factory),maxIdle) + } + + def apply[T](factory: PoolItemFactory[T], maxIdle: Int, initIdleCapacity: Int) = new PoolBridge[T,StackObjectPool] { + val impl = new StackObjectPool(toPoolableObjectFactory(factory),maxIdle,initIdleCapacity) + } +} + +object SoftRefPool { + def apply[T](factory: PoolItemFactory[T]) = new PoolBridge[T,SoftReferenceObjectPool] { + val impl = new SoftReferenceObjectPool(toPoolableObjectFactory(factory)) + } + + def apply[T](factory: PoolItemFactory[T], initSize: Int) = new PoolBridge[T,SoftReferenceObjectPool] { + val impl = new SoftReferenceObjectPool(toPoolableObjectFactory(factory),initSize) + } +} + +trait TransportFactory[T <: TTransport] extends PoolItemFactory[T] { + def createTransport: T + def makeObject: T = createTransport + def destroyObject(transport: T): Unit = transport.close + def validateObject(transport: T) = transport.isOpen + def activateObject(transport: T): Unit = if( !transport.isOpen ) transport.open else () + def passivateObject(transport: T): Unit = transport.flush +} + +case class SocketProvider(val host: String, val port: Int) extends TransportFactory[TSocket] { + def createTransport = { + val t = new TSocket(host,port) + t.open + t + } +} + diff --git a/kernel/src/main/scala/state/State.scala b/kernel/src/main/scala/state/State.scala index 32a0ea9888..58c1cbfbe8 100644 --- a/kernel/src/main/scala/state/State.scala +++ b/kernel/src/main/scala/state/State.scala @@ -13,7 +13,7 @@ import scala.collection.mutable.{ArrayBuffer, HashMap} sealed abstract class TransactionalStateConfig abstract class PersistentStorageConfig extends TransactionalStateConfig -case class EmbeddedCassandraStorageConfig extends PersistentStorageConfig +case class CassandraStorageConfig extends PersistentStorageConfig case class TerracottaStorageConfig extends PersistentStorageConfig case class TokyoCabinetStorageConfig extends PersistentStorageConfig @@ -22,7 +22,7 @@ case class TokyoCabinetStorageConfig extends PersistentStorageConfig *

* Example Scala usage: *

- * val myMap = TransactionalState.newPersistentMap(EmbeddedCassandraStorageConfig)
+ * val myMap = TransactionalState.newPersistentMap(CassandraStorageConfig)
  * 
*/ object TransactionalState extends TransactionalState @@ -33,24 +33,24 @@ object TransactionalState extends TransactionalState * Example Java usage: *
  * TransactionalState state = new TransactionalState();
- * TransactionalMap myMap = state.newPersistentMap(new EmbeddedCassandraStorageConfig());
+ * TransactionalMap myMap = state.newPersistentMap(new CassandraStorageConfig());
  * 
*/ class TransactionalState { def newPersistentMap(config: PersistentStorageConfig): TransactionalMap[String, AnyRef] = config match { - case EmbeddedCassandraStorageConfig() => new CassandraPersistentTransactionalMap + case CassandraStorageConfig() => new CassandraPersistentTransactionalMap case TerracottaStorageConfig() => throw new UnsupportedOperationException case TokyoCabinetStorageConfig() => throw new UnsupportedOperationException } def newPersistentVector(config: PersistentStorageConfig): TransactionalVector[AnyRef] = config match { - case EmbeddedCassandraStorageConfig() => new CassandraPersistentTransactionalVector + case CassandraStorageConfig() => new CassandraPersistentTransactionalVector case TerracottaStorageConfig() => throw new UnsupportedOperationException case TokyoCabinetStorageConfig() => throw new UnsupportedOperationException } def newPersistentRef(config: PersistentStorageConfig): TransactionalRef[AnyRef] = config match { - case EmbeddedCassandraStorageConfig() => new CassandraPersistentTransactionalRef + case CassandraStorageConfig() => new CassandraPersistentTransactionalRef case TerracottaStorageConfig() => throw new UnsupportedOperationException case TokyoCabinetStorageConfig() => throw new UnsupportedOperationException } @@ -208,7 +208,7 @@ class CassandraPersistentTransactionalMap extends PersistentTransactionalMap[Str override def getRange(start: Int, count: Int) = { verifyTransaction try { - EmbeddedCassandraStorage.getMapStorageRangeFor(uuid, start, count) + CassandraStorage.getMapStorageRangeFor(uuid, start, count) } catch { case e: Exception => Nil } @@ -216,7 +216,7 @@ class CassandraPersistentTransactionalMap extends PersistentTransactionalMap[Str // ---- For Transactional ---- override def commit = { - EmbeddedCassandraStorage.insertMapStorageEntriesFor(uuid, changeSet.toList) + CassandraStorage.insertMapStorageEntriesFor(uuid, changeSet.toList) changeSet.clear } @@ -224,7 +224,7 @@ class CassandraPersistentTransactionalMap extends PersistentTransactionalMap[Str override def clear = { verifyTransaction try { - EmbeddedCassandraStorage.removeMapStorageFor(uuid) + CassandraStorage.removeMapStorageFor(uuid) } catch { case e: Exception => {} } @@ -233,7 +233,7 @@ class CassandraPersistentTransactionalMap extends PersistentTransactionalMap[Str override def contains(key: String): Boolean = { try { verifyTransaction - EmbeddedCassandraStorage.getMapStorageEntryFor(uuid, key).isDefined + CassandraStorage.getMapStorageEntryFor(uuid, key).isDefined } catch { case e: Exception => false } @@ -242,7 +242,7 @@ class CassandraPersistentTransactionalMap extends PersistentTransactionalMap[Str override def size: Int = { verifyTransaction try { - EmbeddedCassandraStorage.getMapStorageSizeFor(uuid) + CassandraStorage.getMapStorageSizeFor(uuid) } catch { case e: Exception => 0 } @@ -254,7 +254,7 @@ class CassandraPersistentTransactionalMap extends PersistentTransactionalMap[Str // if (changeSet.contains(key)) changeSet.get(key) // else { val result = try { - EmbeddedCassandraStorage.getMapStorageEntryFor(uuid, key) + CassandraStorage.getMapStorageEntryFor(uuid, key) } catch { case e: Exception => None } @@ -266,7 +266,7 @@ class CassandraPersistentTransactionalMap extends PersistentTransactionalMap[Str //verifyTransaction new Iterator[Tuple2[String, AnyRef]] { private val originalList: List[Tuple2[String, AnyRef]] = try { - EmbeddedCassandraStorage.getMapStorageFor(uuid) + CassandraStorage.getMapStorageFor(uuid) } catch { case e: Throwable => Nil } @@ -388,17 +388,17 @@ class CassandraPersistentTransactionalVector extends PersistentTransactionalVect override def get(index: Int): AnyRef = { verifyTransaction if (changeSet.size > index) changeSet(index) - else EmbeddedCassandraStorage.getVectorStorageEntryFor(uuid, index) + else CassandraStorage.getVectorStorageEntryFor(uuid, index) } override def getRange(start: Int, count: Int): List[AnyRef] = { verifyTransaction - EmbeddedCassandraStorage.getVectorStorageRangeFor(uuid, start, count) + CassandraStorage.getVectorStorageRangeFor(uuid, start, count) } override def length: Int = { verifyTransaction - EmbeddedCassandraStorage.getVectorStorageSizeFor(uuid) + CassandraStorage.getVectorStorageSizeFor(uuid) } override def apply(index: Int): AnyRef = get(index) @@ -415,7 +415,7 @@ class CassandraPersistentTransactionalVector extends PersistentTransactionalVect // ---- For Transactional ---- override def commit = { // FIXME: should use batch function once the bug is resolved - for (element <- changeSet) EmbeddedCassandraStorage.insertVectorStorageEntryFor(uuid, element) + for (element <- changeSet) CassandraStorage.insertVectorStorageEntryFor(uuid, element) changeSet.clear } } @@ -460,7 +460,7 @@ class TransactionalRef[T] extends Transactional { class CassandraPersistentTransactionalRef extends TransactionalRef[AnyRef] { override def commit = if (ref.isDefined) { - EmbeddedCassandraStorage.insertRefStorageFor(uuid, ref.get) + CassandraStorage.insertRefStorageFor(uuid, ref.get) ref = None } @@ -468,7 +468,7 @@ class CassandraPersistentTransactionalRef extends TransactionalRef[AnyRef] { override def get: Option[AnyRef] = { verifyTransaction - EmbeddedCassandraStorage.getRefStorageFor(uuid) + CassandraStorage.getRefStorageFor(uuid) } override def isDefined: Boolean = get.isDefined diff --git a/kernel/src/test/scala/PersistentActorSpec.scala b/kernel/src/test/scala/PersistentActorSpec.scala index 990d7e5d83..0d8b464dd1 100644 --- a/kernel/src/test/scala/PersistentActorSpec.scala +++ b/kernel/src/test/scala/PersistentActorSpec.scala @@ -7,16 +7,16 @@ import junit.framework.TestCase import kernel.Kernel import kernel.reactor._ -import kernel.state.{EmbeddedCassandraStorageConfig, TransactionalState} +import kernel.state.{CassandraStorageConfig, TransactionalState} import org.junit.{Test, Before} import org.junit.Assert._ class PersistentActor extends Actor { timeout = 100000 makeTransactionRequired - private val mapState = TransactionalState.newPersistentMap(EmbeddedCassandraStorageConfig()) - private val vectorState = TransactionalState.newPersistentVector(EmbeddedCassandraStorageConfig()) - private val refState = TransactionalState.newPersistentRef(EmbeddedCassandraStorageConfig()) + private val mapState = TransactionalState.newPersistentMap(CassandraStorageConfig()) + private val vectorState = TransactionalState.newPersistentVector(CassandraStorageConfig()) + private val refState = TransactionalState.newPersistentRef(CassandraStorageConfig()) def receive: PartialFunction[Any, Unit] = { case GetMapState(key) => diff --git a/lib/cassandra-0.4.0-dev.jar b/lib/cassandra-0.4.0-dev.jar index 8274103dc3..d9f762ad2a 100644 Binary files a/lib/cassandra-0.4.0-dev.jar and b/lib/cassandra-0.4.0-dev.jar differ diff --git a/samples-scala/src/main/scala/SimpleService.scala b/samples-scala/src/main/scala/SimpleService.scala index dca7efdd5c..bead8ea6e9 100644 --- a/samples-scala/src/main/scala/SimpleService.scala +++ b/samples-scala/src/main/scala/SimpleService.scala @@ -1,7 +1,7 @@ package sample.scala import javax.ws.rs.{Path, GET, Produces} -import se.scalablesolutions.akka.kernel.state.{TransactionalState, TransactionalMap, EmbeddedCassandraStorageConfig} +import se.scalablesolutions.akka.kernel.state.{TransactionalState, TransactionalMap, CassandraStorageConfig} import se.scalablesolutions.akka.kernel.actor.{Supervisor, SupervisorFactory, Actor, StartSupervisor} import se.scalablesolutions.akka.kernel.config.ScalaConfig._ @@ -35,7 +35,7 @@ class SimpleService extends Actor { case object Tick private val KEY = "COUNTER"; private var hasStartedTicking = false; - private val storage = TransactionalState.newPersistentMap(EmbeddedCassandraStorageConfig()) + private val storage = TransactionalState.newPersistentMap(CassandraStorageConfig()) @GET @Produces(Array("application/json"))