+ * http://localhost:9998/jmx
+ *   ?service=service:jmx:rmi:///jndi/rmi://localhost:1099/jmxrmi
+ *   &component=se.scalablesolutions.akka:type=Stats
+ *   &attribute=counter_NrOfActors
+ * </pre>
+ */
+@Path("/jmx")
+class RestfulJMX extends Actor with Logging {
+  private case class Request(service: String, component: String, attribute: String)
+
+  private val connectors = new ConcurrentHashMap[String, JMXConnector]
+
+  @GET
+  @Produces(Array("text/plain"))
+  def queryJMX(
+    @QueryParam("service") service: String,
+    @QueryParam("component") component: String,
+    @QueryParam("attribute") attribute: String): String =
+    (this !! Request(service, component, attribute)).getOrElse("Error in REST JMX management service")
+
+  override def receive: PartialFunction[Any, Unit] = {
+    case Request(service, component, attribute) => reply(retrieveAttribute(service, component, attribute))
+  }
+
+  private def retrieveAttribute(service: String, component: String, attribute: String): String = {
+    try {
+      // putIfAbsent returns the previous mapping (null on first insert), so fall
+      // back to the freshly created connector to avoid an NPE on the first call
+      val fresh = JMXConnectorFactory.connect(new JMXServiceURL(service))
+      val previous = connectors.putIfAbsent(service, fresh)
+      val connector = if (previous ne null) previous else fresh
+      connector.getMBeanServerConnection.getAttribute(new ObjectName(component), attribute).toString
+    } catch {
+      case e: Exception =>
+        if (connectors.containsKey(service)) connectors.remove(service) // containsKey, not contains (which tests values)
+        throw e
+    }
+  }
+}
+
+/**
+ * REST interface to Akka's statistics recorder.
+ *
+ * Here is an example that retrieves a statistics report.
+ * <pre>
+ * http://localhost:9998/stats?reset=true
+ * </pre>
+ */
+@Path("/stats")
+class StatisticsReporter extends Actor with Logging {
+  private case class Stats(reset: Boolean)
+
+  @GET
+  @Produces(Array("text/html"))
+  def stats(@QueryParam("reset") reset: String): scala.xml.Elem =
+    (this !! Stats(java.lang.Boolean.valueOf(reset).booleanValue)).getOrElse(
+      <error>Error in REST statistics reporting service</error>)
+
+  override def receive: PartialFunction[Any, Unit] = {
+    case Stats(reset) => reply(<pre>{Management.getStats(reset)}</pre>)
+  }
+}
+
+class RestfulJMXBoot extends Logging {
+ log.info("Booting Restful JMX servivce")
+ object factory extends SupervisorFactory {
+ override def getSupervisorConfig: SupervisorConfig = {
+ SupervisorConfig(
+ RestartStrategy(OneForOne, 3, 100),
+ Supervise(
+ new RestfulJMX,
+ LifeCycle(Permanent, 100)) ::
+ Supervise(
+ new StatisticsReporter,
+ LifeCycle(Permanent, 100)) ::
+ Nil)
+ }
+ }
+ factory.newSupervisor.startSupervisor
+}
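
The JMX facade above is driven by plain HTTP GETs. A minimal client-side sketch, assuming the host, port, and attribute names from the Javadoc example (everything here is illustrative, not part of the patch):

    import java.io.{BufferedReader, InputStreamReader}
    import java.net.{URL, URLEncoder}

    object JmxRestClientExample {
      def main(args: Array[String]) {
        // the JMX service URL contains ':' and '/' and must be URL-encoded
        val service = URLEncoder.encode(
          "service:jmx:rmi:///jndi/rmi://localhost:1099/jmxrmi", "UTF-8")
        val url = new URL("http://localhost:9998/jmx" +
          "?service=" + service +
          "&component=se.scalablesolutions.akka:type=Stats" +
          "&attribute=counter_NrOfActors")
        // the endpoint produces text/plain: the attribute value or an error message
        val in = new BufferedReader(new InputStreamReader(url.openStream))
        var line = in.readLine
        while (line ne null) { println(line); line = in.readLine }
        in.close
      }
    }
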
diff --git a/kernel/src/main/scala/management/ScalaJMX.scala b/kernel/src/main/scala/management/ScalaJMX.scala
new file mode 100755
index 0000000000..b4a7800f88
--- /dev/null
+++ b/kernel/src/main/scala/management/ScalaJMX.scala
@@ -0,0 +1,171 @@
+/**
+ * Copyright (C) 2009 Scalable Solutions.
+ */
+
+package se.scalablesolutions.akka.kernel.management
+
+import javax.management._
+import java.lang.management._
+
+/*
+object ScalaJMX {
+
+ val mbeanServer = ManagementFactory.getPlatformMBeanServer
+
+ def register(t: AnyRef, i: Class[_], name: ObjectName) = mbeanServer.registerMBean(new StandardMBean(t, i), name)
+ def registerBean(bean: DynamicMBean, name: ObjectName): ObjectInstance = mbeanServer.registerMBean(bean, name)
+ def register(t: AnyRef, name: String): ObjectInstance = register(t, beanClass(t), name)
+
+ def info(name: ObjectName): SBean = mbeanServer.getMBeanInfo(name)
+ def bean(name: ObjectName): SBeanInfo = convBeanInfo(name, mbeanServer.getMBeanInfo(name))
+ def invoke(name: ObjectName, operationName: String, params: Array[Object], signature: Array[String]): Object =
+ mbeanServer.invoke(name, operationName, params, signature)
+ def call(name: ObjectName, operationName: String): Object = invoke(name, operationName, Array[Object](), Array[String]())
+
+ def get(name: ObjectName, attribute: String) = mbeanServer.getAttribute(name, attribute)
+ def set(name: ObjectName, attribute: String, value: Object) = mbeanServer.setAttribute(name, new Attribute(attribute, value))
+
+ implicit def instanceToName(oi: ObjectInstance) = oi.getObjectName()
+ implicit def stringToName(name: String) = ObjectName.getInstance(name)
+ implicit def convBean(bi: MBeanInfo):SBean = SBean(bi.getClassName(), bi.getDescription(), bi.getAttributes(), bi.getNotifications(), bi.getOperations(), bi.getConstructors())
+ implicit def seqToArr(seq: Seq[AnyRef]): Array[Object] = seq.toArray
+
+ def convBeanInfo(name: ObjectName, bi: MBeanInfo):SBeanInfo = new SBeanInfo(name, bi.getClassName(), bi.getDescription(), bi.getAttributes(), bi.getNotifications(), bi.getOperations(), bi.getConstructors())
+
+ implicit def convAttrs(attrs: Array[MBeanAttributeInfo]): Seq[SAttr] =
+ for (val a <- attrs) yield a
+ implicit def convParams(params: Array[MBeanParameterInfo]): Seq[SParameter] =
+ for (val p <- params) yield p
+ implicit def convNotes(notes: Array[MBeanNotificationInfo]): Seq[SNotification] =
+ for (val p <- notes) yield p
+ implicit def convCons(cons: Array[MBeanConstructorInfo]): Seq[SConstructor] =
+ for (val p <- cons) yield p
+ implicit def convOps(cons: Array[MBeanOperationInfo]): Seq[SOperation] =
+ for (val p <- cons) yield p
+
+ implicit def convAttr(attr: MBeanAttributeInfo) = SAttr(attr.getName(), attr.getDescription(), attr.getType(), attr.isIs(), attr.isReadable(), attr.isWritable())
+ implicit def convNote(note: MBeanNotificationInfo) = SNotification(note.getName(), note.getDescription(), note.getNotifTypes())
+ implicit def convOp(op: MBeanOperationInfo):SOperation = SOperation(op.getName(), op.getDescription(), op.getImpact(), op.getReturnType(), op.getSignature())
+ implicit def convCon(con: MBeanConstructorInfo):SConstructor = SConstructor(con getName, con getDescription, con getSignature)
+ implicit def convParam(p: MBeanParameterInfo) = SParameter(p getName, p getDescription, p getType)
+
+ private def beanClass(t: AnyRef) = Class.forName(t.getClass().getName() + "MBean")
+}
+
+class MBean(mbeanInterface: String) extends StandardMBean(Class.forName(mbeanInterface))
+
+abstract class SFeature(val name: String, val description: String)
+
+case class SBean(className: String, description: String,
+ attrs: Seq[SAttr], notes: Seq[SNotification],
+ ops: Seq[SOperation], cons: Seq[SConstructor]) {
+ def writable = attrs.toList.filter(sa => sa.writable)
+}
+
+class SBeanInfo(name: ObjectName, className: String, description: String,
+ attrs: Seq[SAttr], notes: Seq[SNotification],
+ ops: Seq[SOperation], cons: Seq[SConstructor])
+extends SBean(className, description, attrs, notes, ops, cons) {
+
+ def get(attribute: String) = ScalaJMX.get(name, attribute)
+ def set(attribute: String, value: Object) = ScalaJMX.set(name, attribute, value)
+ def call(opName: String) = ScalaJMX.call(name, opName)
+}
+
+case class SAttr(
+ override val name: String,
+ override val description: String,
+ jmxType: String, isIs: boolean, readable: boolean, writable: boolean
+) extends SFeature(name, description)
+
+case class SNotification(
+ override val name: String,
+ override val description: String,
+ notifTypes: Array[String]) extends SFeature(name, description)
+
+case class SOperation(
+ override val name: String,
+ override val description: String,
+ impact: int,
+ returnType: String,
+ signature: Seq[SParameter]) extends SFeature(name, description)
+
+case class SParameter(
+ override val name: String,
+ override val description: String,
+ jmxType: String) extends SFeature(name, description)
+
+case class SConstructor(
+ override val name: String,
+ override val description: String,
+ signature: Seq[SParameter]) extends SFeature(name, description)
+
+*/
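
The commented-out DSL above is a thin wrapper over the standard javax.management API. For orientation, a minimal sketch of the calls it abstracts (the Counter bean is a hypothetical example):

    import java.lang.management.ManagementFactory
    import javax.management.ObjectName

    // Standard-MBean convention: implementation Counter, interface CounterMBean;
    // the getter getCount surfaces as an attribute named "Count"
    trait CounterMBean { def getCount: Int }
    class Counter extends CounterMBean { def getCount = 42 }

    object PlainJmxExample {
      def main(args: Array[String]) {
        val server = ManagementFactory.getPlatformMBeanServer
        val name = new ObjectName("se.scalablesolutions.akka:type=Counter")
        server.registerMBean(new Counter, name)
        println(server.getAttribute(name, "Count")) // prints 42
      }
    }
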
+
+/*
+package com.soletta.spipe;
+
+import javax.management.{StandardMBean,ObjectName,MBeanInfo};
+
+class SPipe extends MBean("com.soletta.spipe.SPipeMBean") with SPipeMBean {
+
+ import Console.println;
+ import SJMX._;
+
+ private var desc: String = "Yipe!";
+
+ def go = {
+ val oname: ObjectName = "default:name=SPipe";
+ val instance = SJMX.registerBean(this, oname);
+
+ set(oname, "Factor", "Hello!");
+ println(get(oname, "Factor"));
+
+ val SBean(n, d, Seq(_, a2, a3, _*), _, ops, _) = info(oname);
+ println("Bean name is " + n + ", description is " + d);
+ println("Second attribute is " + a2);
+ println("Third attribute is " + a3);
+ println("Writable attributes are " + info(oname).writable);
+ println("Ops: " + ops);
+
+ val x =
+ -Dcassandra -Dstorage-config=config/ -Dpidfile=akka.pid
- *
* @author Jonas Bonér
*/
-object CassandraStorage extends Logging {
- val TABLE_NAME = "akka"
- val MAP_COLUMN_FAMILY = "map"
- val VECTOR_COLUMN_FAMILY = "vector"
- val REF_COLUMN_FAMILY = "ref:item"
+object CassandraStorage extends MapStorage with VectorStorage with Logging {
+ val KEYSPACE = "akka"
+ val MAP_COLUMN_PARENT = new ColumnParent("map", null)
+ val VECTOR_COLUMN_PARENT = new ColumnParent("vector", null)
+ val REF_COLUMN_PARENT = new ColumnParent("ref", null)
+ val REF_KEY = "item".getBytes("UTF-8")
+ val CASSANDRA_SERVER_HOSTNAME = config.getString("akka.storage.cassandra.hostname", "127.0.0.1")
+ val CASSANDRA_SERVER_PORT = config.getInt("akka.storage.cassandra.port", 9160)
+ val CONSISTENCY_LEVEL = config.getInt("akka.storage.cassandra.consistency-level", 1)
val IS_ASCENDING = true
- val RUN_THRIFT_SERVICE = kernel.Kernel.config.getBool("akka.storage.cassandra.thrift-server.service", false)
- val BLOCKING_CALL = {
- if (kernel.Kernel.config.getBool("akka.storage.cassandra.blocking", true)) 0
- else 1
- }
-
@volatile private[this] var isRunning = false
+ private[this] val protocol: Protocol = Protocol.Binary
+/* {
+ config.getString("akka.storage.cassandra.procotol", "binary") match {
+ case "binary" => Protocol.Binary
+ case "json" => Protocol.JSON
+ case "simple-json" => Protocol.SimpleJSON
+ case unknown => throw new UnsupportedOperationException("Unknown storage serialization protocol [" + unknown + "]")
+ }
+ }
+*/
+
private[this] val serializer: Serializer = {
kernel.Kernel.config.getString("akka.storage.cassandra.storage-format", "java") match {
case "scala-json" => Serializer.ScalaJSON
@@ -51,193 +55,403 @@ object CassandraStorage extends Logging {
case unknown => throw new UnsupportedOperationException("Unknown storage serialization protocol [" + unknown + "]")
}
}
-
- // TODO: is this server thread-safe or needed to be wrapped up in an actor?
- private[this] val server = classOf[CassandraServer].newInstance.asInstanceOf[CassandraServer]
- private[this] var thriftServer: CassandraThriftServer = _
-
+ private[this] var sessions: Option[CassandraSessionPool[_]] = None
+
def start = synchronized {
if (!isRunning) {
try {
- server.start
+ sessions = Some(new CassandraSessionPool(
+ KEYSPACE,
+ StackPool(SocketProvider(CASSANDRA_SERVER_HOSTNAME, CASSANDRA_SERVER_PORT)),
+ protocol,
+ CONSISTENCY_LEVEL))
log.info("Cassandra persistent storage has started up successfully");
} catch {
case e =>
log.error("Could not start up Cassandra persistent storage")
throw e
}
- if (RUN_THRIFT_SERVICE) {
- thriftServer = new CassandraThriftServer(server)
- thriftServer.start
- }
isRunning
}
}
- def stop = if (isRunning) {
- //server.storageService.shutdown
- if (RUN_THRIFT_SERVICE) thriftServer.stop
+ def stop = synchronized {
+ if (isRunning && sessions.isDefined) sessions.get.close
}
// ===============================================================
// For Ref
// ===============================================================
- def insertRefStorageFor(name: String, element: AnyRef) = {
- server.insert(
- TABLE_NAME,
- name,
- REF_COLUMN_FAMILY,
- serializer.out(element),
- System.currentTimeMillis,
- BLOCKING_CALL)
- }
-
- def getRefStorageFor(name: String): Option[AnyRef] = {
- try {
- val column = server.get_column(TABLE_NAME, name, REF_COLUMN_FAMILY)
- Some(serializer.in(column.value, None))
- } catch {
- case e =>
- e.printStackTrace
- None
+ def insertRefStorageFor(name: String, element: AnyRef) = if (sessions.isDefined) {
+ sessions.get.withSession {
+ _ ++| (name,
+ new ColumnPath(REF_COLUMN_PARENT.getColumn_family, null, REF_KEY),
+ serializer.out(element),
+ System.currentTimeMillis,
+ CONSISTENCY_LEVEL)
}
- }
+ } else throw new IllegalStateException("CassandraStorage is not started")
- // ===============================================================
- // For Vector
- // ===============================================================
-
- def insertVectorStorageEntryFor(name: String, element: AnyRef) = {
- server.insert(
- TABLE_NAME,
- name,
- VECTOR_COLUMN_FAMILY + ":" + getVectorStorageSizeFor(name),
- serializer.out(element),
- System.currentTimeMillis,
- BLOCKING_CALL)
- }
-
- def getVectorStorageEntryFor(name: String, index: Int): AnyRef = {
+ def getRefStorageFor(name: String): Option[AnyRef] = if (sessions.isDefined) {
try {
- val column = server.get_column(TABLE_NAME, name, VECTOR_COLUMN_FAMILY + ":" + index)
- serializer.in(column.value, None)
- } catch {
- case e =>
- e.printStackTrace
- throw new Predef.NoSuchElementException(e.getMessage)
- }
- }
-
- def getVectorStorageRangeFor(name: String, start: Int, count: Int): List[AnyRef] =
- server.get_slice(TABLE_NAME, name, VECTOR_COLUMN_FAMILY, IS_ASCENDING, count)
- .toArray.toList.asInstanceOf[List[Tuple2[String, AnyRef]]].map(tuple => tuple._2)
-
- def getVectorStorageSizeFor(name: String): Int =
- server.get_column_count(TABLE_NAME, name, VECTOR_COLUMN_FAMILY)
-
- // ===============================================================
- // For Map
- // ===============================================================
-
- def insertMapStorageEntryFor(name: String, key: String, value: AnyRef) = {
- server.insert(
- TABLE_NAME,
- name,
- MAP_COLUMN_FAMILY + ":" + key,
- serializer.out(value),
- System.currentTimeMillis,
- BLOCKING_CALL)
- }
-
- def insertMapStorageEntriesFor(name: String, entries: List[Tuple2[String, AnyRef]]) = {
- import java.util.{Map, HashMap, List, ArrayList}
- val columns: Map[String, List[column_t]] = new HashMap
- for (entry <- entries) {
- val cls: List[column_t] = new ArrayList
- cls.add(new column_t(entry._1, serializer.out(entry._2), System.currentTimeMillis))
- columns.put(MAP_COLUMN_FAMILY, cls)
- }
- server.batch_insert(new batch_mutation_t(
- TABLE_NAME,
- name,
- columns),
- BLOCKING_CALL)
- }
-
- def getMapStorageEntryFor(name: String, key: AnyRef): Option[AnyRef] = {
- try {
- val column = server.get_column(TABLE_NAME, name, MAP_COLUMN_FAMILY + ":" + key)
- Some(serializer.in(column.value, None))
+ val column: Option[Column] = sessions.get.withSession {
+ _ | (name, new ColumnPath(REF_COLUMN_PARENT.getColumn_family, null, REF_KEY))
+ }
+ if (column.isDefined) Some(serializer.in(column.get.value, None))
+ else None
} catch {
case e =>
e.printStackTrace
None
}
+ } else throw new IllegalStateException("CassandraStorage is not started")
+
+ // ===============================================================
+ // For Vector
+ // ===============================================================
+
+ override def insertVectorStorageEntryFor(name: String, element: AnyRef) = if (sessions.isDefined) {
+ sessions.get.withSession {
+ _ ++| (name,
+ new ColumnPath(VECTOR_COLUMN_PARENT.getColumn_family, null, intToBytes(getVectorStorageSizeFor(name))),
+ serializer.out(element),
+ System.currentTimeMillis,
+ CONSISTENCY_LEVEL)
+ }
+ } else throw new IllegalStateException("CassandraStorage is not started")
+
+ override def insertVectorStorageEntriesFor(name: String, elements: List[AnyRef]) = {
+ // TODO: batch insert not yet implemented; currently a no-op
+ }
- def getMapStorageFor(name: String): List[Tuple2[String, AnyRef]] = {
- val columns = server.get_columns_since(TABLE_NAME, name, MAP_COLUMN_FAMILY, -1)
+ override def getVectorStorageEntryFor(name: String, index: Int): AnyRef = if (sessions.isDefined) {
+ val column: Option[Column] = sessions.get.withSession {
+ _ | (name, new ColumnPath(VECTOR_COLUMN_PARENT.getColumn_family, null, intToBytes(index)))
+ }
+ if (column.isDefined) serializer.in(column.get.value, None)
+ else throw new NoSuchElementException("No element for vector [" + name + "] and index [" + index + "]")
+ } else throw new IllegalStateException("CassandraStorage is not started")
+
+ override def getVectorStorageRangeFor(name: String, start: Option[Int], finish: Option[Int], count: Int): List[AnyRef] = if (sessions.isDefined) {
+ val startBytes = if (start.isDefined) intToBytes(start.get) else null
+ val finishBytes = if (finish.isDefined) intToBytes(finish.get) else null
+ val columns: List[Column] = sessions.get.withSession {
+ _ / (name,
+ VECTOR_COLUMN_PARENT,
+ startBytes, finishBytes,
+ IS_ASCENDING,
+ count,
+ CONSISTENCY_LEVEL)
+ }
+ columns.map(column => serializer.in(column.value, None))
+ } else throw new IllegalStateException("CassandraStorage is not started")
+
+ override def getVectorStorageSizeFor(name: String): Int = if (sessions.isDefined) {
+ sessions.get.withSession {
+ _ |# (name, VECTOR_COLUMN_PARENT)
+ }
+ } else throw new IllegalStateException("CassandraStorage is not started")
+
+ // ===============================================================
+ // For Map
+ // ===============================================================
+
+ override def insertMapStorageEntryFor(name: String, key: AnyRef, element: AnyRef) = if (sessions.isDefined) {
+ sessions.get.withSession {
+ _ ++| (name,
+ new ColumnPath(MAP_COLUMN_PARENT.getColumn_family, null, serializer.out(key)),
+ serializer.out(element),
+ System.currentTimeMillis,
+ CONSISTENCY_LEVEL)
+ }
+ } else throw new IllegalStateException("CassandraStorage is not started")
+
+ override def insertMapStorageEntriesFor(name: String, entries: List[Tuple2[AnyRef, AnyRef]]) = if (sessions.isDefined) {
+ val cf2columns: java.util.Map[String, java.util.List[Column]] = new java.util.HashMap
+ for (entry <- entries) {
+ val columns: java.util.List[Column] = new java.util.ArrayList
+ columns.add(new Column(serializer.out(entry._1), serializer.out(entry._2), System.currentTimeMillis))
+ cf2columns.put(MAP_COLUMN_PARENT.getColumn_family, columns)
+ }
+ sessions.get.withSession {
+ _ ++| (new BatchMutation(name, cf2columns), CONSISTENCY_LEVEL)
+ }
+ } else throw new IllegalStateException("CassandraStorage is not started")
+
+ override def getMapStorageEntryFor(name: String, key: AnyRef): Option[AnyRef] = if (sessions.isDefined) {
+ try {
+ val column: Option[Column] = sessions.get.withSession {
+ _ | (name, new ColumnPath(MAP_COLUMN_PARENT.getColumn_family, null, serializer.out(key)))
+ }
+ if (column.isDefined) Some(serializer.in(column.get.value, None))
+ else None
+ } catch {
+ case e =>
+ e.printStackTrace
+ None
+ }
+ } else throw new IllegalStateException("CassandraStorage is not started")
+
+ override def getMapStorageFor(name: String): List[Tuple2[AnyRef, AnyRef]] = if (sessions.isDefined) {
+ throw new UnsupportedOperationException
+ /*
+ val columns = server.get_columns_since(name, MAP_COLUMN_FAMILY, -1)
.toArray.toList.asInstanceOf[List[org.apache.cassandra.service.column_t]]
for {
column <- columns
- col = (column.columnName, serializer.in(column.value, None))
+ col = (column.columnName, column.value)
} yield col
- }
-
- def getMapStorageSizeFor(name: String): Int =
- server.get_column_count(TABLE_NAME, name, MAP_COLUMN_FAMILY)
+ */
+ } else throw new IllegalStateException("CassandraStorage is not started")
- def removeMapStorageFor(name: String) =
- server.remove(TABLE_NAME, name, MAP_COLUMN_FAMILY, System.currentTimeMillis, BLOCKING_CALL)
-
- def getMapStorageRangeFor(name: String, start: Int, count: Int): List[Tuple2[String, AnyRef]] = {
- server.get_slice(TABLE_NAME, name, MAP_COLUMN_FAMILY, IS_ASCENDING, count)
- .toArray.toList.asInstanceOf[List[Tuple2[String, AnyRef]]]
- }
-}
-
-class CassandraThriftServer(server: CassandraServer) extends Logging {
- case object Start
- case object Stop
-
- private[this] val serverEngine: TThreadPoolServer = try {
- val pidFile = kernel.Kernel.config.getString("akka.storage.cassandra.thrift-server.pidfile", "akka.pid")
- if (pidFile != null) new File(pidFile).deleteOnExit();
- val listenPort = DatabaseDescriptor.getThriftPort
-
- val processor = new Cassandra.Processor(server)
- val tServerSocket = new TServerSocket(listenPort)
- val tProtocolFactory = new TBinaryProtocol.Factory
-
- val options = new TThreadPoolServer.Options
- options.minWorkerThreads = 64
- new TThreadPoolServer(new TProcessorFactory(processor),
- tServerSocket,
- new TTransportFactory,
- new TTransportFactory,
- tProtocolFactory,
- tProtocolFactory,
- options)
- } catch {
- case e =>
- log.error("Could not start up Cassandra thrift service")
- throw e
- }
-
- import scala.actors.Actor._
- private[this] val serverDaemon = actor {
- receive {
- case Start =>
- serverEngine.serve
- log.info("Cassandra thrift service has starting up successfully")
- case Stop =>
- log.info("Cassandra thrift service is shutting down...")
- serverEngine.stop
+ override def getMapStorageSizeFor(name: String): Int = if (sessions.isDefined) {
+ sessions.get.withSession {
+ _ |# (name, MAP_COLUMN_PARENT)
}
- }
+ } else throw new IllegalStateException("CassandraStorage is not started")
- def start = serverDaemon ! Start
- def stop = serverDaemon ! Stop
+ override def removeMapStorageFor(name: String): Unit = removeMapStorageFor(name, null)
+
+ override def removeMapStorageFor(name: String, key: AnyRef): Unit = if (sessions.isDefined) {
+ val keyBytes = if (key == null) null else serializer.out(key)
+ sessions.get.withSession {
+ _ -- (name,
+ new ColumnPathOrParent(MAP_COLUMN_PARENT.getColumn_family, null, keyBytes),
+ System.currentTimeMillis,
+ CONSISTENCY_LEVEL)
+ }
+ } else throw new IllegalStateException("CassandraStorage is not started")
+
+ override def getMapStorageRangeFor(name: String, start: Option[AnyRef], finish: Option[AnyRef], count: Int):
+ List[Tuple2[AnyRef, AnyRef]] = if (sessions.isDefined) {
+ val startBytes = if (start.isDefined) serializer.out(start.get) else null
+ val finishBytes = if (finish.isDefined) serializer.out(finish.get) else null
+ val columns: List[Column] = sessions.get.withSession {
+ _ / (name, MAP_COLUMN_PARENT, startBytes, finishBytes, IS_ASCENDING, count, CONSISTENCY_LEVEL)
+ }
+ columns.map(column => (column.name, serializer.in(column.value, None)))
+ } else throw new IllegalStateException("CassandraStorage is not started")
}
+
+/**
+ * NOTE: requires command line options:
+ * -Dcassandra -Dstorage-config=config/ -Dpidfile=akka.pid
+ *
+ * @author Jonas Bonér
+ *
+object EmbeddedCassandraStorage extends Logging {
+val KEYSPACE = "akka"
+val MAP_COLUMN_FAMILY = "map"
+val VECTOR_COLUMN_FAMILY = "vector"
+val REF_COLUMN_FAMILY = "ref:item"
+
+val IS_ASCENDING = true
+
+val RUN_THRIFT_SERVICE = kernel.Kernel.config.getBool("akka.storage.cassandra.thrift-server.service", false)
+val CONSISTENCY_LEVEL = {
+if (kernel.Kernel.config.getBool("akka.storage.cassandra.blocking", true)) 0
+else 1 }
+
+@volatile private[this] var isRunning = false
+private[this] val serializer: Serializer = {
+kernel.Kernel.config.getString("akka.storage.cassandra.storage-format", "java") match {
+case "scala-json" => Serializer.ScalaJSON
+case "java-json" => Serializer.JavaJSON
+case "protobuf" => Serializer.Protobuf
+case "java" => Serializer.Java
+case "sbinary" => throw new UnsupportedOperationException("SBinary serialization protocol is not yet supported for storage")
+case "avro" => throw new UnsupportedOperationException("Avro serialization protocol is not yet supported for storage")
+case unknown => throw new UnsupportedOperationException("Unknown storage serialization protocol [" + unknown + "]")
+}
+}
+
+// TODO: is this server thread-safe or needed to be wrapped up in an actor?
+private[this] val server = classOf[CassandraServer].newInstance.asInstanceOf[CassandraServer]
+
+private[this] var thriftServer: CassandraThriftServer = _
+
+def start = synchronized {
+if (!isRunning) {
+try {
+server.start
+log.info("Cassandra persistent storage has started up successfully");
+} catch {
+case e =>
+log.error("Could not start up Cassandra persistent storage")
+throw e
+}
+if (RUN_THRIFT_SERVICE) {
+thriftServer = new CassandraThriftServer(server)
+thriftServer.start
+}
+isRunning
+}
+}
+
+def stop = if (isRunning) {
+//server.storageService.shutdown
+if (RUN_THRIFT_SERVICE) thriftServer.stop
+}
+
+// ===============================================================
+// For Ref
+// ===============================================================
+
+def insertRefStorageFor(name: String, element: AnyRef) = {
+server.insert(
+KEYSPACE,
+name,
+REF_COLUMN_FAMILY,
+element,
+System.currentTimeMillis,
+CONSISTENCY_LEVEL)
+}
+
+def getRefStorageFor(name: String): Option[AnyRef] = {
+try {
+val column = server.get_column(KEYSPACE, name, REF_COLUMN_FAMILY)
+Some(serializer.in(column.value, None))
+} catch {
+case e =>
+e.printStackTrace
+None }
+}
+
+// ===============================================================
+// For Vector
+// ===============================================================
+
+def insertVectorStorageEntryFor(name: String, element: AnyRef) = {
+server.insert(
+KEYSPACE,
+name,
+VECTOR_COLUMN_FAMILY + ":" + getVectorStorageSizeFor(name),
+element,
+System.currentTimeMillis,
+CONSISTENCY_LEVEL)
+}
+
+def getVectorStorageEntryFor(name: String, index: Int): AnyRef = {
+try {
+val column = server.get_column(KEYSPACE, name, VECTOR_COLUMN_FAMILY + ":" + index)
+serializer.in(column.value, None)
+} catch {
+case e =>
+e.printStackTrace
+throw new Predef.NoSuchElementException(e.getMessage)
+}
+}
+
+def getVectorStorageRangeFor(name: String, start: Int, count: Int): List[AnyRef] =
+server.get_slice(KEYSPACE, name, VECTOR_COLUMN_FAMILY, IS_ASCENDING, count)
+.toArray.toList.asInstanceOf[List[Tuple2[String, AnyRef]]].map(tuple => tuple._2)
+
+def getVectorStorageSizeFor(name: String): Int =
+server.get_column_count(KEYSPACE, name, VECTOR_COLUMN_FAMILY)
+
+// ===============================================================
+// For Map
+// ===============================================================
+
+def insertMapStorageEntryFor(name: String, key: String, value: AnyRef) = {
+server.insert(
+KEYSPACE, name,
+MAP_COLUMN_FAMILY + ":" + key,
+serializer.out(value),
+System.currentTimeMillis,
+CONSISTENCY_LEVEL)
+}
+
+def insertMapStorageEntriesFor(name: String, entries: List[Tuple2[String, AnyRef]]) = {
+import java.util.{ Map, HashMap, List, ArrayList }
+val columns: Map[String, List[column_t]] = new HashMap
+for (entry <- entries) {
+val cls: List[column_t] = new ArrayList
+cls.add(new column_t(entry._1, serializer.out(entry._2), System.currentTimeMillis))
+columns.put(MAP_COLUMN_FAMILY, cls)
+}
+server.batch_insert(new BatchMutation(
+KEYSPACE, name,
+columns),
+CONSISTENCY_LEVEL)
+}
+
+def getMapStorageEntryFor(name: String, key: AnyRef): Option[AnyRef] = {
+try {
+val column = server.get_column(KEYSPACE, name, MAP_COLUMN_FAMILY + ":" + key)
+Some(serializer.in(column.value, None))
+} catch {
+case e =>
+e.printStackTrace
+None
+}
+}
+
+def getMapStorageFor(name: String): List[Tuple2[String, AnyRef]] = {
+val columns = server.get_columns_since(KEYSPACE, name, MAP_COLUMN_FAMILY, -1)
+.toArray.toList.asInstanceOf[List[org.apache.cassandra.service.column_t]]
+for {
+column <- columns
+col = (column.columnName, serializer.in(column.value, None))
+} yield col
+}
+
+def getMapStorageSizeFor(name: String): Int =
+server.get_column_count(KEYSPACE, name, MAP_COLUMN_FAMILY)
+
+def removeMapStorageFor(name: String) =
+server.remove(KEYSPACE, name, MAP_COLUMN_FAMILY, System.currentTimeMillis, CONSISTENCY_LEVEL)
+
+def getMapStorageRangeFor(name: String, start: Int, count: Int): List[Tuple2[String, AnyRef]] = {
+server.get_slice(KEYSPACE, name, MAP_COLUMN_FAMILY, IS_ASCENDING, count)
+.toArray.toList.asInstanceOf[List[Tuple2[String, AnyRef]]]
+}
+}
+
+
+class CassandraThriftServer(server: CassandraServer) extends Logging {
+case object Start
+case object Stop
+
+private[this] val serverEngine: TThreadPoolServer = try {
+val pidFile = kernel.Kernel.config.getString("akka.storage.cassandra.thrift-server.pidfile", "akka.pid")
+if (pidFile != null) new File(pidFile).deleteOnExit();
+val listenPort = DatabaseDescriptor.getThriftPort
+
+val processor = new Cassandra.Processor(server)
+val tServerSocket = new TServerSocket(listenPort)
+val tProtocolFactory = new TBinaryProtocol.Factory
+
+val options = new TThreadPoolServer.Options
+options.minWorkerThreads = 64
+new TThreadPoolServer(new TProcessorFactory(processor),
+tServerSocket,
+new TTransportFactory,
+new TTransportFactory,
+tProtocolFactory,
+tProtocolFactory,
+options)
+} catch {
+case e =>
+log.error("Could not start up Cassandra thrift service")
+throw e
+}
+
+import scala.actors.Actor._
+private[this] val serverDaemon = actor {
+receive {
+case Start =>
+serverEngine.serve
+log.info("Cassandra thrift service has starting up successfully")
+case Stop =>
+log.info("Cassandra thrift service is shutting down...")
+serverEngine.stop
+}
+}
+
+def start = serverDaemon ! Start
+def stop = serverDaemon ! Stop
+}
+ */
diff --git a/kernel/src/main/scala/state/MongoStorage.scala b/kernel/src/main/scala/state/MongoStorage.scala
new file mode 100644
index 0000000000..657a6ec9fc
--- /dev/null
+++ b/kernel/src/main/scala/state/MongoStorage.scala
@@ -0,0 +1,258 @@
+package se.scalablesolutions.akka.kernel.state
+
+import com.mongodb._
+import se.scalablesolutions.akka.kernel.util.Logging
+import serialization.{Serializer}
+import kernel.Kernel.config
+
+import java.util.{Map=>JMap, List=>JList, ArrayList=>JArrayList}
+
+object MongoStorage extends MapStorage
+ with VectorStorage with Logging {
+
+ // enrich with null safe findOne
+ class RichDBCollection(value: DBCollection) {
+ def findOneNS(o: DBObject): Option[DBObject] = {
+ value.findOne(o) match {
+ case null => None
+ case x => Some(x)
+ }
+ }
+ }
+
+ implicit def enrichDBCollection(c: DBCollection) = new RichDBCollection(c)
+
+ val KEY = "key"
+ val VALUE = "value"
+ val COLLECTION = "akka_coll"
+ val MONGODB_SERVER_HOSTNAME =
+ config.getString("akka.storage.mongodb.hostname", "127.0.0.1")
+ val MONGODB_SERVER_DBNAME =
+ config.getString("akka.storage.mongodb.dbname", "testdb")
+ val MONGODB_SERVER_PORT =
+ config.getInt("akka.storage.mongodb.port", 27017)
+
+ val db = new Mongo(MONGODB_SERVER_HOSTNAME,
+ MONGODB_SERVER_PORT, MONGODB_SERVER_DBNAME)
+ val coll = db.getCollection(COLLECTION)
+
+ // @fixme: make this pluggable
+ private[this] val serializer: Serializer = Serializer.ScalaJSON
+
+ override def insertMapStorageEntryFor(name: String,
+ key: AnyRef, value: AnyRef) {
+ insertMapStorageEntriesFor(name, List((key, value)))
+ }
+
+ override def insertMapStorageEntriesFor(name: String,
+ entries: List[Tuple2[AnyRef, AnyRef]]) {
+ import java.util.{Map, HashMap}
+
+ val m: Map[AnyRef, AnyRef] = new HashMap
+ for ((k, v) <- entries) {
+ m.put(k, serializer.out(v))
+ }
+
+ nullSafeFindOne(name) match {
+ case None =>
+ coll.insert(new BasicDBObject().append(KEY, name).append(VALUE, m))
+ case Some(dbo) => {
+ // collate the maps
+ val o = dbo.get(VALUE).asInstanceOf[Map[AnyRef, AnyRef]]
+ o.putAll(m)
+
+ // remove existing reference
+ removeMapStorageFor(name)
+ // and insert
+ coll.insert(new BasicDBObject().append(KEY, name).append(VALUE, o))
+ }
+ }
+ }
+
+ override def removeMapStorageFor(name: String) = {
+ val q = new BasicDBObject
+ q.put(KEY, name)
+ coll.remove(q)
+ }
+
+ override def removeMapStorageFor(name: String, key: AnyRef) = {
+ nullSafeFindOne(name) match {
+ case None =>
+ case Some(dbo) => {
+ val orig = dbo.get(VALUE).asInstanceOf[DBObject].toMap
+ orig.remove(key.asInstanceOf[String])
+
+ // remove existing reference
+ removeMapStorageFor(name)
+ // and insert
+ coll.insert(new BasicDBObject().append(KEY, name).append(VALUE, orig))
+ }
+ }
+ }
+
+ override def getMapStorageEntryFor(name: String,
+ key: AnyRef): Option[AnyRef] = {
+ getValueForKey(name, key.asInstanceOf[String])
+ }
+
+ override def getMapStorageSizeFor(name: String): Int = {
+ nullSafeFindOne(name) match {
+ case None => 0
+ case Some(dbo) =>
+ dbo.get(VALUE).asInstanceOf[JMap[String, AnyRef]].keySet.size
+ }
+ }
+
+ override def getMapStorageFor(name: String): List[Tuple2[AnyRef, AnyRef]] = {
+ val m =
+ nullSafeFindOne(name) match {
+ case None =>
+ throw new Predef.NoSuchElementException(name + " not present")
+ case Some(dbo) =>
+ dbo.get(VALUE).asInstanceOf[JMap[String, AnyRef]]
+ }
+ val n =
+ List(m.keySet.toArray: _*).asInstanceOf[List[String]]
+ val vals =
+ for(s <- n)
+ yield (s, serializer.in(m.get(s).asInstanceOf[Array[Byte]], None))
+ vals.asInstanceOf[List[Tuple2[String, AnyRef]]]
+ }
+
+ override def getMapStorageRangeFor(name: String, start: Option[AnyRef],
+ finish: Option[AnyRef],
+ count: Int): List[Tuple2[AnyRef, AnyRef]] = {
+ val m =
+ nullSafeFindOne(name) match {
+ case None =>
+ throw new Predef.NoSuchElementException(name + " not present")
+ case Some(dbo) =>
+ dbo.get(VALUE).asInstanceOf[JMap[String, AnyRef]]
+ }
+
+ /**
+ * count is the max number of results to return. Start with
+ * start or 0 (if start is not defined) and go until
+ * you hit finish or count.
+ */
+ val s = if (start.isDefined) start.get.asInstanceOf[Int] else 0
+ val cnt =
+ if (finish.isDefined) {
+ val f = finish.get.asInstanceOf[Int]
+ if (f >= s) Math.min(count, (f - s)) else count
+ }
+ else count
+
+ val n =
+ List(m.keySet.toArray: _*).asInstanceOf[List[String]].sort((e1, e2) => (e1 compareTo e2) < 0).slice(s, s + cnt)
+ val vals =
+ for(s <- n)
+ yield (s, serializer.in(m.get(s).asInstanceOf[Array[Byte]], None))
+ vals.asInstanceOf[List[Tuple2[String, AnyRef]]]
+ }
+
+ private def getValueForKey(name: String, key: String): Option[AnyRef] = {
+ try {
+ nullSafeFindOne(name) match {
+ case None => None
+ case Some(dbo) =>
+ Some(serializer.in(
+ dbo.get(VALUE)
+ .asInstanceOf[JMap[String, AnyRef]]
+ .get(key).asInstanceOf[Array[Byte]], None))
+ }
+ } catch {
+ case e =>
+ throw new Predef.NoSuchElementException(e.getMessage)
+ }
+ }
+
+ override def insertVectorStorageEntriesFor(name: String, elements: List[AnyRef]) = {
+ val q = new BasicDBObject
+ q.put(KEY, name)
+
+ val currentList =
+ coll.findOneNS(q) match {
+ case None =>
+ new JArrayList[AnyRef]
+ case Some(dbo) =>
+ dbo.get(VALUE).asInstanceOf[JArrayList[AnyRef]]
+ }
+ if (!currentList.isEmpty) {
+ // record exists
+ // remove before adding
+ coll.remove(q)
+ }
+
+ // add to the current list
+ elements.map(serializer.out(_)).foreach(currentList.add(_))
+
+ coll.insert(
+ new BasicDBObject()
+ .append(KEY, name)
+ .append(VALUE, currentList)
+ )
+ }
+
+ override def insertVectorStorageEntryFor(name: String, element: AnyRef) = {
+ insertVectorStorageEntriesFor(name, List(element))
+ }
+
+ override def getVectorStorageEntryFor(name: String, index: Int): AnyRef = {
+ try {
+ val o =
+ nullSafeFindOne(name) match {
+ case None =>
+ throw new Predef.NoSuchElementException(name + " not present")
+
+ case Some(dbo) =>
+ dbo.get(VALUE).asInstanceOf[JList[AnyRef]]
+ }
+ serializer.in(
+ o.get(index).asInstanceOf[Array[Byte]],
+ None
+ )
+ } catch {
+ case e =>
+ throw new Predef.NoSuchElementException(e.getMessage)
+ }
+ }
+
+ override def getVectorStorageRangeFor(name: String,
+ start: Option[Int], finish: Option[Int], count: Int): List[AnyRef] = {
+ try {
+ val o =
+ nullSafeFindOne(name) match {
+ case None =>
+ throw new Predef.NoSuchElementException(name + " not present")
+
+ case Some(dbo) =>
+ dbo.get(VALUE).asInstanceOf[JList[AnyRef]]
+ }
+
+ // pick the subrange (start defaults to 0 when not supplied) and make a Scala list
+ val s = if (start.isDefined) start.get else 0
+ val l =
+ List(o.subList(s, s + count).toArray: _*)
+
+ for(e <- l)
+ yield serializer.in(e.asInstanceOf[Array[Byte]], None)
+ } catch {
+ case e =>
+ throw new Predef.NoSuchElementException(e.getMessage)
+ }
+ }
+
+ override def getVectorStorageSizeFor(name: String): Int = {
+ nullSafeFindOne(name) match {
+ case None => 0
+ case Some(dbo) =>
+ dbo.get(VALUE).asInstanceOf[JList[AnyRef]].size
+ }
+ }
+
+ private def nullSafeFindOne(name: String): Option[DBObject] = {
+ val o = new BasicDBObject
+ o.put(KEY, name)
+ coll.findOneNS(o)
+ }
+}
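
A minimal usage sketch for the map API above, assuming mongod is running on the configured host/port (values round-trip through the ScalaJSON serializer):

    // "tx-1" scopes a logical map, typically a transaction or actor uuid
    MongoStorage.insertMapStorageEntryFor("tx-1", "user", "debasish")
    println(MongoStorage.getMapStorageSizeFor("tx-1"))          // 1
    println(MongoStorage.getMapStorageEntryFor("tx-1", "user")) // Some(...)
    MongoStorage.removeMapStorageFor("tx-1")
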
diff --git a/kernel/src/main/scala/state/Pool.scala b/kernel/src/main/scala/state/Pool.scala
new file mode 100644
index 0000000000..6391645562
--- /dev/null
+++ b/kernel/src/main/scala/state/Pool.scala
@@ -0,0 +1,96 @@
+/**
+ * Copyright (C) 2009 Scalable Solutions.
+ */
+
+package se.scalablesolutions.akka.kernel.state
+
+import org.apache.commons.pool._
+import org.apache.commons.pool.impl._
+
+import org.apache.thrift.transport._
+
+trait Pool[T] extends java.io.Closeable {
+ def borrowObject: T
+ def returnObject(t: T): Unit
+ def invalidateObject(t: T): Unit
+ def addObject: Unit
+ def getNumIdle: Int
+ def getNumActive: Int
+ def clear: Unit
+ def setFactory(factory: PoolItemFactory[T]): Unit
+}
+
+trait PoolFactory[T] {
+ def createPool: Pool[T]
+}
+
+trait PoolItemFactory[T] {
+ def makeObject: T
+ def destroyObject(t: T): Unit
+ def validateObject(t: T): Boolean
+ def activateObject(t: T): Unit
+ def passivateObject(t: T): Unit
+}
+
+trait PoolBridge[T, OP <: ObjectPool] extends Pool[T] {
+ val impl: OP
+ override def borrowObject: T = impl.borrowObject.asInstanceOf[T]
+ override def returnObject(t: T) = impl.returnObject(t)
+ override def invalidateObject(t: T) = impl.invalidateObject(t)
+ override def addObject = impl.addObject
+ override def getNumIdle: Int = impl.getNumIdle
+ override def getNumActive: Int = impl.getNumActive
+ override def clear: Unit = impl.clear
+ override def close: Unit = impl.close
+ override def setFactory(factory: PoolItemFactory[T]) = impl.setFactory(toPoolableObjectFactory(factory))
+
+ def toPoolableObjectFactory[T](pif: PoolItemFactory[T]) = new PoolableObjectFactory {
+ def makeObject: Object = pif.makeObject.asInstanceOf[Object]
+ def destroyObject(o: Object): Unit = pif.destroyObject(o.asInstanceOf[T])
+ def validateObject(o: Object): Boolean = pif.validateObject(o.asInstanceOf[T])
+ def activateObject(o: Object): Unit = pif.activateObject(o.asInstanceOf[T])
+ def passivateObject(o: Object): Unit = pif.passivateObject(o.asInstanceOf[T])
+ }
+}
+
+object StackPool {
+ def apply[T](factory: PoolItemFactory[T]) = new PoolBridge[T,StackObjectPool] {
+ val impl = new StackObjectPool(toPoolableObjectFactory(factory))
+ }
+
+ def apply[T](factory: PoolItemFactory[T], maxIdle: Int) = new PoolBridge[T,StackObjectPool] {
+ val impl = new StackObjectPool(toPoolableObjectFactory(factory),maxIdle)
+ }
+
+ def apply[T](factory: PoolItemFactory[T], maxIdle: Int, initIdleCapacity: Int) = new PoolBridge[T,StackObjectPool] {
+ val impl = new StackObjectPool(toPoolableObjectFactory(factory),maxIdle,initIdleCapacity)
+ }
+}
+
+object SoftRefPool {
+ def apply[T](factory: PoolItemFactory[T]) = new PoolBridge[T,SoftReferenceObjectPool] {
+ val impl = new SoftReferenceObjectPool(toPoolableObjectFactory(factory))
+ }
+
+ def apply[T](factory: PoolItemFactory[T], initSize: Int) = new PoolBridge[T,SoftReferenceObjectPool] {
+ val impl = new SoftReferenceObjectPool(toPoolableObjectFactory(factory),initSize)
+ }
+}
+
+trait TransportFactory[T <: TTransport] extends PoolItemFactory[T] {
+ def createTransport: T
+ def makeObject: T = createTransport
+ def destroyObject(transport: T): Unit = transport.close
+ def validateObject(transport: T) = transport.isOpen
+ def activateObject(transport: T): Unit = if( !transport.isOpen ) transport.open else ()
+ def passivateObject(transport: T): Unit = transport.flush
+}
+
+case class SocketProvider(val host: String, val port: Int) extends TransportFactory[TSocket] {
+ def createTransport = {
+ val t = new TSocket(host, port)
+ t.open
+ t
+ }
+}
+
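
This pool bridge is how CassandraStorage obtains Thrift sockets. A borrowing sketch under the same assumptions (a Thrift endpoint listening on the given host/port):

    val pool = StackPool(SocketProvider("127.0.0.1", 9160))
    val socket = pool.borrowObject  // createTransport opens a TSocket
    try {
      // ... use the open transport ...
    } finally {
      pool.returnObject(socket)     // passivateObject flushes it back into the pool
    }
    pool.close
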
diff --git a/kernel/src/main/scala/state/State.scala b/kernel/src/main/scala/state/State.scala
index e49ffb25d6..45c0d7b1dc 100644
--- a/kernel/src/main/scala/state/State.scala
+++ b/kernel/src/main/scala/state/State.scala
@@ -18,6 +18,7 @@ abstract class PersistentStorageConfig extends TransactionalStateConfig
case class CassandraStorageConfig extends PersistentStorageConfig
case class TerracottaStorageConfig extends PersistentStorageConfig
case class TokyoCabinetStorageConfig extends PersistentStorageConfig
+case class MongoStorageConfig extends PersistentStorageConfig
/**
* Scala API.
@@ -39,14 +40,16 @@ object TransactionalState extends TransactionalState
*
*/
class TransactionalState {
- def newPersistentMap(config: PersistentStorageConfig): TransactionalMap[String, AnyRef] = config match {
+ def newPersistentMap(config: PersistentStorageConfig): TransactionalMap[AnyRef, AnyRef] = config match {
case CassandraStorageConfig() => new CassandraPersistentTransactionalMap
+ case MongoStorageConfig() => new MongoPersistentTransactionalMap
case TerracottaStorageConfig() => throw new UnsupportedOperationException
case TokyoCabinetStorageConfig() => throw new UnsupportedOperationException
}
def newPersistentVector(config: PersistentStorageConfig): TransactionalVector[AnyRef] = config match {
case CassandraStorageConfig() => new CassandraPersistentTransactionalVector
+ case MongoStorageConfig() => new MongoPersistentTransactionalVector
case TerracottaStorageConfig() => throw new UnsupportedOperationException
case TokyoCabinetStorageConfig() => throw new UnsupportedOperationException
}
@@ -142,7 +145,7 @@ abstract class PersistentTransactionalMap[K, V] extends TransactionalMap[K, V] {
// FIXME: need to handle remove in another changeSet
protected[kernel] val changeSet = new HashMap[K, V]
- def getRange(start: Int, count: Int)
+ def getRange(start: Option[AnyRef], count: Int)
def begin
def commit
@@ -155,27 +158,38 @@ abstract class PersistentTransactionalMap[K, V] extends TransactionalMap[K, V] {
None // always return None to speed up writes (else we would need to go to the DB to get the old value)
}
- override def remove(key: K) = {
- verifyTransaction
- changeSet -= key
- }
-
override def -=(key: K) = remove(key)
override def update(key: K, value: V) = put(key, value)
}
/**
- * Implements a persistent transactional map based on the Cassandra distributed P2P key-value storage.
+ * Implementation of PersistentTransactionalMap for every concrete
+ * storage will have the same workflow. This abstracts the workflow.
+ *
+ * Subclasses just need to provide the actual concrete instance for the
+ * abstract val storage.
*
* @author Jonas Bonér
*/
-class CassandraPersistentTransactionalMap extends PersistentTransactionalMap[String, AnyRef] {
+abstract class TemplatePersistentTransactionalMap extends PersistentTransactionalMap[AnyRef, AnyRef] {
- override def getRange(start: Int, count: Int) = {
+ // to be concretized in subclasses
+ val storage: MapStorage
+
+ override def remove(key: AnyRef) = {
+ verifyTransaction
+ if (changeSet.contains(key)) changeSet -= key
+ else storage.removeMapStorageFor(uuid, key)
+ }
+
+ override def getRange(start: Option[AnyRef], count: Int) =
+ getRange(start, None, count)
+
+ def getRange(start: Option[AnyRef], finish: Option[AnyRef], count: Int) = {
verifyTransaction
try {
- CassandraStorage.getMapStorageRangeFor(uuid, start, count)
+ storage.getMapStorageRangeFor(uuid, start, finish, count)
} catch {
case e: Exception => Nil
}
@@ -183,7 +197,7 @@ class CassandraPersistentTransactionalMap extends PersistentTransactionalMap[Str
// ---- For Transactional ----
override def commit = {
- CassandraStorage.insertMapStorageEntriesFor(uuid, changeSet.toList)
+ storage.insertMapStorageEntriesFor(uuid, changeSet.toList)
changeSet.clear
}
@@ -191,16 +205,16 @@ class CassandraPersistentTransactionalMap extends PersistentTransactionalMap[Str
override def clear = {
verifyTransaction
try {
- CassandraStorage.removeMapStorageFor(uuid)
+ storage.removeMapStorageFor(uuid)
} catch {
case e: Exception => {}
}
}
- override def contains(key: String): Boolean = {
+ override def contains(key: AnyRef): Boolean = {
try {
verifyTransaction
- CassandraStorage.getMapStorageEntryFor(uuid, key).isDefined
+ storage.getMapStorageEntryFor(uuid, key).isDefined
} catch {
case e: Exception => false
}
@@ -209,19 +223,19 @@ class CassandraPersistentTransactionalMap extends PersistentTransactionalMap[Str
override def size: Int = {
verifyTransaction
try {
- CassandraStorage.getMapStorageSizeFor(uuid)
+ storage.getMapStorageSizeFor(uuid)
} catch {
case e: Exception => 0
}
}
// ---- For scala.collection.mutable.Map ----
- override def get(key: String): Option[AnyRef] = {
+ override def get(key: AnyRef): Option[AnyRef] = {
verifyTransaction
// if (changeSet.contains(key)) changeSet.get(key)
// else {
val result = try {
- CassandraStorage.getMapStorageEntryFor(uuid, key)
+ storage.getMapStorageEntryFor(uuid, key)
} catch {
case e: Exception => None
}
@@ -229,16 +243,16 @@ class CassandraPersistentTransactionalMap extends PersistentTransactionalMap[Str
//}
}
- override def elements: Iterator[Tuple2[String, AnyRef]] = {
+ override def elements: Iterator[Tuple2[AnyRef, AnyRef]] = {
//verifyTransaction
- new Iterator[Tuple2[String, AnyRef]] {
- private val originalList: List[Tuple2[String, AnyRef]] = try {
- CassandraStorage.getMapStorageFor(uuid)
+ new Iterator[Tuple2[AnyRef, AnyRef]] {
+ private val originalList: List[Tuple2[AnyRef, AnyRef]] = try {
+ storage.getMapStorageFor(uuid)
} catch {
case e: Throwable => Nil
}
private var elements = originalList.reverse
- override def next: Tuple2[String, AnyRef]= synchronized {
+ override def next: Tuple2[AnyRef, AnyRef]= synchronized {
val element = elements.head
elements = elements.tail
element
@@ -248,6 +262,25 @@ class CassandraPersistentTransactionalMap extends PersistentTransactionalMap[Str
}
}
+
+/**
+ * Implements a persistent transactional map based on the Cassandra distributed P2P key-value storage.
+ *
+ * @author Debasish Ghosh
+ */
+class CassandraPersistentTransactionalMap extends TemplatePersistentTransactionalMap {
+ val storage = CassandraStorage
+}
+
+/**
+ * Implements a persistent transactional map based on the MongoDB distributed P2P key-value storage.
+ *
+ * @author Debasish Ghosh
+ */
+class MongoPersistentTransactionalMap extends TemplatePersistentTransactionalMap {
+ val storage = MongoStorage
+}
+
/**
* Base for all transactional vector implementations.
*
@@ -344,27 +377,32 @@ abstract class PersistentTransactionalVector[T] extends TransactionalVector[T] {
}
/**
- * Implements a persistent transactional vector based on the Cassandra distributed P2P key-value storage.
+ * Implements a template for a concrete persistent transactional vector based storage.
*
- * @author Jonas Bonér
+ * @author Debasish Ghosh
*/
-class CassandraPersistentTransactionalVector extends PersistentTransactionalVector[AnyRef] {
+abstract class TemplatePersistentTransactionalVector extends PersistentTransactionalVector[AnyRef] {
+
+ val storage: VectorStorage
// ---- For TransactionalVector ----
override def get(index: Int): AnyRef = {
verifyTransaction
if (changeSet.size > index) changeSet(index)
- else CassandraStorage.getVectorStorageEntryFor(uuid, index)
+ else storage.getVectorStorageEntryFor(uuid, index)
}
- override def getRange(start: Int, count: Int): List[AnyRef] = {
+ override def getRange(start: Int, count: Int): List[AnyRef] =
+ getRange(Some(start), None, count)
+
+ def getRange(start: Option[Int], finish: Option[Int], count: Int): List[AnyRef] = {
verifyTransaction
- CassandraStorage.getVectorStorageRangeFor(uuid, start, count)
+ storage.getVectorStorageRangeFor(uuid, start, finish, count)
}
override def length: Int = {
verifyTransaction
- CassandraStorage.getVectorStorageSizeFor(uuid)
+ storage.getVectorStorageSizeFor(uuid)
}
override def apply(index: Int): AnyRef = get(index)
@@ -381,11 +419,29 @@ class CassandraPersistentTransactionalVector extends PersistentTransactionalVect
// ---- For Transactional ----
override def commit = {
// FIXME: should use batch function once the bug is resolved
- for (element <- changeSet) CassandraStorage.insertVectorStorageEntryFor(uuid, element)
+ for (element <- changeSet) storage.insertVectorStorageEntryFor(uuid, element)
changeSet.clear
}
}
+/**
+ * Implements a persistent transactional vector based on the Cassandra distributed P2P key-value storage.
+ *
+ * @author Debasish Ghosh
+ */
+class CassandraPersistentTransactionalVector extends TemplatePersistentTransactionalVector {
+ val storage = CassandraStorage
+}
+
+/**
+ * Implements a persistent transactional vector based on the MongoDB distributed P2P key-value storage.
+ *
+ * @author Debasish Ghosh
+ */
+class MongoPersistentTransactionalVector extends TemplatePersistentTransactionalVector {
+ val storage = MongoStorage
+}
+
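
Under this template scheme, a new backend needs only a storage object plus two one-line subclasses, mirroring the Mongo classes above. A hypothetical TokyoCabinet wiring (TokyoCabinetStorage is assumed here, not part of this patch):

    // hypothetical: TokyoCabinetStorage would implement MapStorage with VectorStorage
    class TokyoCabinetPersistentTransactionalMap extends TemplatePersistentTransactionalMap {
      val storage = TokyoCabinetStorage
    }
    class TokyoCabinetPersistentTransactionalVector extends TemplatePersistentTransactionalVector {
      val storage = TokyoCabinetStorage
    }
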
/**
* Implements a transactional reference.
*
diff --git a/kernel/src/main/scala/state/Storage.scala b/kernel/src/main/scala/state/Storage.scala
new file mode 100644
index 0000000000..2d31695af5
--- /dev/null
+++ b/kernel/src/main/scala/state/Storage.scala
@@ -0,0 +1,27 @@
+package se.scalablesolutions.akka.kernel.state
+
+// abstracts persistence storage
+trait Storage {
+}
+
+// for Maps
+trait MapStorage extends Storage {
+ def insertMapStorageEntriesFor(name: String, entries: List[Tuple2[AnyRef, AnyRef]])
+ def insertMapStorageEntryFor(name: String, key: AnyRef, value: AnyRef)
+ def removeMapStorageFor(name: String)
+ def removeMapStorageFor(name: String, key: AnyRef)
+ def getMapStorageEntryFor(name: String, key: AnyRef): Option[AnyRef]
+ def getMapStorageSizeFor(name: String): Int
+ def getMapStorageFor(name: String): List[Tuple2[AnyRef, AnyRef]]
+ def getMapStorageRangeFor(name: String, start: Option[AnyRef],
+ finish: Option[AnyRef], count: Int): List[Tuple2[AnyRef, AnyRef]]
+}
+
+// for vectors
+trait VectorStorage extends Storage {
+ def insertVectorStorageEntryFor(name: String, element: AnyRef)
+ def insertVectorStorageEntriesFor(name: String, elements: List[AnyRef])
+ def getVectorStorageEntryFor(name: String, index: Int): AnyRef
+ def getVectorStorageRangeFor(name: String, start: Option[Int], finish: Option[Int], count: Int): List[AnyRef]
+ def getVectorStorageSizeFor(name: String): Int
+}
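
To make the contract concrete, here is a minimal in-memory MapStorage sketch (illustrative only, not part of the patch; the range query is reduced to a plain take for brevity):

    object InMemoryMapStorage extends MapStorage {
      import scala.collection.mutable.HashMap
      private val maps = new HashMap[String, HashMap[AnyRef, AnyRef]]
      private def mapFor(name: String) = maps.getOrElseUpdate(name, new HashMap[AnyRef, AnyRef])

      def insertMapStorageEntryFor(name: String, key: AnyRef, value: AnyRef) = mapFor(name).update(key, value)
      def insertMapStorageEntriesFor(name: String, entries: List[Tuple2[AnyRef, AnyRef]]) =
        entries.foreach(e => insertMapStorageEntryFor(name, e._1, e._2))
      def removeMapStorageFor(name: String) = maps -= name
      def removeMapStorageFor(name: String, key: AnyRef) = mapFor(name) -= key
      def getMapStorageEntryFor(name: String, key: AnyRef): Option[AnyRef] = mapFor(name).get(key)
      def getMapStorageSizeFor(name: String): Int = mapFor(name).size
      def getMapStorageFor(name: String): List[Tuple2[AnyRef, AnyRef]] = mapFor(name).toList
      def getMapStorageRangeFor(name: String, start: Option[AnyRef], finish: Option[AnyRef], count: Int) =
        mapFor(name).toList.take(count) // ignores start/finish for brevity
    }
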
diff --git a/kernel/src/main/scala/util/Helpers.scala b/kernel/src/main/scala/util/Helpers.scala
index cc629e0801..6d43f06030 100644
--- a/kernel/src/main/scala/util/Helpers.scala
+++ b/kernel/src/main/scala/util/Helpers.scala
@@ -4,15 +4,9 @@
package se.scalablesolutions.akka.kernel.util
-import java.io.UnsupportedEncodingException
-import java.security.{NoSuchAlgorithmException, MessageDigest}
+import java.security.MessageDigest
import java.util.concurrent.locks.ReentrantReadWriteLock
-import scala.actors._
-import scala.actors.Actor._
-
-import net.lag.logging.Logger
-
class SystemFailure(cause: Throwable) extends RuntimeException(cause)
/**
@@ -20,7 +14,18 @@ class SystemFailure(cause: Throwable) extends RuntimeException(cause)
*/
object Helpers extends Logging {
- def getDigestFor(s: String) = {
+ implicit def null2Option[T](t: T): Option[T] = if (t != null) Some(t) else None
+
+ def intToBytes(value: Int): Array[Byte] = {
+ val bytes = new Array[Byte](4)
+ bytes(0) = (value >>> 24).asInstanceOf[Byte]
+ bytes(1) = (value >>> 16).asInstanceOf[Byte]
+ bytes(2) = (value >>> 8).asInstanceOf[Byte]
+ bytes(3) = value.asInstanceOf[Byte]
+ bytes
+ }
+
+ def getMD5For(s: String) = {
val digest = MessageDigest.getInstance("MD5")
digest.update(s.getBytes("ASCII"))
val bytes = digest.digest
@@ -59,51 +64,5 @@ object Helpers extends Logging {
}
}
}
-
- // ================================================
- // implicit conversion between regular actor and actor with a type future
- implicit def actorWithFuture(a: Actor) = new ActorWithTypedFuture(a)
-
- abstract class FutureWithTimeout[T](ch: InputChannel[T]) extends Future[T](ch) {
- def receiveWithin(timeout: Int) : Option[T]
- override def respond(f: T => Unit): Unit = throw new UnsupportedOperationException("Does not support the Responder API")
- }
-
- def receiveOrFail[T](future: => FutureWithTimeout[T], timeout: Int, errorHandler: => T): T = {
- future.receiveWithin(timeout) match {
- case None => errorHandler
- case Some(reply) => reply
- }
- }
-
- class ActorWithTypedFuture(a: Actor) {
- require(a != null)
-
- def !![A](msg: Any): FutureWithTimeout[A] = {
- val ftch = new Channel[A](Actor.self)
- a.send(msg, ftch.asInstanceOf[OutputChannel[Any]])
- new FutureWithTimeout[A](ftch) {
- def apply() =
- if (isSet) value.get.asInstanceOf[A]
- else ch.receive {
- case a =>
- value = Some(a)
- value.get.asInstanceOf[A]
- }
- def isSet = receiveWithin(0).isDefined
- def receiveWithin(timeout: Int): Option[A] = value match {
- case None => ch.receiveWithin(timeout) {
- case TIMEOUT =>
- log.debug("Future timed out while waiting for actor [%s]", a)
- None
- case a =>
- value = Some(a)
- value.asInstanceOf[Option[A]]
- }
- case a => a.asInstanceOf[Option[A]]
- }
- }
- }
- }
}
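
intToBytes above writes big-endian; the matching decoder (a hypothetical helper, not in this patch) would be:

    def bytesToInt(bytes: Array[Byte]): Int =
      ((bytes(0) & 0xFF) << 24) |
      ((bytes(1) & 0xFF) << 16) |
      ((bytes(2) & 0xFF) <<  8) |
       (bytes(3) & 0xFF)
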
diff --git a/kernel/src/test/scala/AllTest.scala b/kernel/src/test/scala/AllTest.scala
index b0ef909aae..a225bfb080 100644
--- a/kernel/src/test/scala/AllTest.scala
+++ b/kernel/src/test/scala/AllTest.scala
@@ -16,7 +16,7 @@ object AllTest extends TestCase {
suite.addTestSuite(classOf[EventBasedThreadPoolDispatcherTest])
suite.addTestSuite(classOf[ActorSpec])
suite.addTestSuite(classOf[RemoteActorSpec])
- suite.addTestSuite(classOf[PersistentActorSpec])
+ //suite.addTestSuite(classOf[PersistentActorSpec])
suite.addTestSuite(classOf[InMemoryActorSpec])
//suite.addTestSuite(classOf[TransactionClasherSpec])
suite
diff --git a/kernel/src/test/scala/EventBasedSingleThreadDispatcherTest.scala b/kernel/src/test/scala/EventBasedSingleThreadDispatcherTest.scala
index dc272893f5..3ac4eee51a 100644
--- a/kernel/src/test/scala/EventBasedSingleThreadDispatcherTest.scala
+++ b/kernel/src/test/scala/EventBasedSingleThreadDispatcherTest.scala
@@ -55,7 +55,7 @@ class EventBasedSingleThreadDispatcherTest extends TestCase {
val guardLock = new ReentrantLock
val handleLatch = new CountDownLatch(100)
val key = "key"
- val dispatcher = new EventBasedSingleThreadDispatcher
+ val dispatcher = new EventBasedSingleThreadDispatcher("name")
dispatcher.registerHandler(key, new TestMessageHandle(handleLatch))
dispatcher.start
for (i <- 0 until 100) {
@@ -69,7 +69,7 @@ class EventBasedSingleThreadDispatcherTest extends TestCase {
val handleLatch = new CountDownLatch(2)
val key1 = "key1"
val key2 = "key2"
- val dispatcher = new EventBasedSingleThreadDispatcher
+ val dispatcher = new EventBasedSingleThreadDispatcher("name")
dispatcher.registerHandler(key1, new TestMessageHandle(handleLatch))
dispatcher.registerHandler(key2, new TestMessageHandle(handleLatch))
dispatcher.start
@@ -83,7 +83,7 @@ class EventBasedSingleThreadDispatcherTest extends TestCase {
val handleLatch = new CountDownLatch(200)
val key1 = "key1"
val key2 = "key2"
- val dispatcher = new EventBasedSingleThreadDispatcher
+ val dispatcher = new EventBasedSingleThreadDispatcher("name")
dispatcher.registerHandler(key1, new MessageInvoker {
var currentValue = -1;
def invoke(message: MessageInvocation) {
diff --git a/kernel/src/test/scala/EventBasedThreadPoolDispatcherTest.scala b/kernel/src/test/scala/EventBasedThreadPoolDispatcherTest.scala
index 89b908015e..c0b205d6f6 100644
--- a/kernel/src/test/scala/EventBasedThreadPoolDispatcherTest.scala
+++ b/kernel/src/test/scala/EventBasedThreadPoolDispatcherTest.scala
@@ -37,7 +37,7 @@ class EventBasedThreadPoolDispatcherTest extends TestCase {
val guardLock = new ReentrantLock
val handleLatch = new CountDownLatch(10)
val key = "key"
- val dispatcher = Dispatchers.newEventBasedThreadPoolDispatcher
+ val dispatcher = Dispatchers.newEventBasedThreadPoolDispatcher("name")
dispatcher.withNewThreadPoolWithBoundedBlockingQueue(100)
.setCorePoolSize(2)
.setMaxPoolSize(4)
@@ -76,7 +76,7 @@ class EventBasedThreadPoolDispatcherTest extends TestCase {
val handlersBarrier = new CyclicBarrier(3)
val key1 = "key1"
val key2 = "key2"
- val dispatcher = Dispatchers.newEventBasedThreadPoolDispatcher
+ val dispatcher = Dispatchers.newEventBasedThreadPoolDispatcher("name")
dispatcher.withNewThreadPoolWithBoundedBlockingQueue(100)
.setCorePoolSize(2)
.setMaxPoolSize(4)
@@ -121,7 +121,7 @@ class EventBasedThreadPoolDispatcherTest extends TestCase {
val handleLatch = new CountDownLatch(200)
val key1 = "key1"
val key2 = "key2"
- val dispatcher = Dispatchers.newEventBasedThreadPoolDispatcher
+ val dispatcher = Dispatchers.newEventBasedThreadPoolDispatcher("name")
dispatcher.withNewThreadPoolWithBoundedBlockingQueue(100)
.setCorePoolSize(2)
.setMaxPoolSize(4)
diff --git a/kernel/src/test/scala/JerseySpec.scala b/kernel/src/test/scala/JerseySpec.scala
deleted file mode 100644
index 5f0147ea02..0000000000
--- a/kernel/src/test/scala/JerseySpec.scala
+++ /dev/null
@@ -1,91 +0,0 @@
-/**
- * Copyright (C) 2009 Scalable Solutions.
- */
-
-package se.scalablesolutions.akka.kernel
-
-import akka.kernel.config.ActiveObjectGuiceConfigurator
-import kernel.config.ScalaConfig._
-
-import com.sun.grizzly.http.SelectorThread
-import com.sun.jersey.api.client.Client
-import com.sun.jersey.core.header.MediaTypes
-import com.sun.jersey.api.container.grizzly.GrizzlyWebContainerFactory
-import javax.ws.rs.core.UriBuilder
-import javax.ws.rs.{Produces, Path, GET}
-
-import com.google.inject.{AbstractModule, Scopes}
-
-import org.scalatest.Spec
-import org.scalatest.matchers.ShouldMatchers
-
-//simport com.jteigen.scalatest.JUnit4Runner
-import org.junit.runner.RunWith
-import org.junit.Test
-import org.junit.Assert._
-
-/**
- * @author Jonas Bonér
- */
-//@RunWith(classOf[JUnit4Runner])
-class JerseySpec extends Spec with ShouldMatchers {
-
- describe("A Jersey REST service") {
- it("should ...") {
- /*
- val selector = startJersey
- selector.start
- val conf = new ActiveObjectGuiceConfigurator
- conf.configure(
- RestartStrategy(AllForOne, 3, 5000),
- Component(
- classOf[resource.JerseyFoo],
- LifeCycle(Permanent, 1000),
- 1000) ::
- Nil).supervise
-
- conf.getInstance(classOf[resource.JerseyFoo])
- */
-
- /*
- val client = Client.create
- val webResource = client.resource(UriBuilder.fromUri("http://localhost/").port(9998).build)
- //val webResource = client.resource("http://localhost:9998/foo")
- val responseMsg = webResource.get(classOf[String])
- responseMsg should equal ("Hello World")
- selector.stopEndpoint
- */
- }
- }
-
- def startJersey: SelectorThread = {
- val initParams = new java.util.HashMap[String, String]
- initParams.put("com.sun.jersey.config.property.packages", "se.scalablesolutions.akka.kernel")
- GrizzlyWebContainerFactory.create(UriBuilder.fromUri("http://localhost/").port(9998).build(), initParams)
- }
-}
-
-// @GET
-// @Produces("application/json")
-// @Path("/network/{id: [0-9]+}/{nid}")
-// def getUserByNetworkId(@PathParam {val value = "id"} id: Int, @PathParam {val value = "nid"} networkId: String): User = {
-// val q = em.createQuery("SELECT u FROM User u WHERE u.networkId = :id AND u.networkUserId = :nid")
-// q.setParameter("id", id)
-// q.setParameter("nid", networkId)
-// q.getSingleResult.asInstanceOf[User]
-// }
-
-package resource {
- import javax.ws.rs.{Produces, Path, GET}
-
- class JerseyFoo {
- @GET
- @Produces(Array("application/json"))
- def foo: String = { val ret = "JerseyFoo.foo"; println(ret); ret }
- }
- @Path("/foo")
- class JerseyFooSub extends JerseyFoo
- class JerseyBar {
- def bar(msg: String) = msg + "return_bar "
- }
-}
diff --git a/kernel/src/test/scala/MongoStorageSpec.scala b/kernel/src/test/scala/MongoStorageSpec.scala
new file mode 100644
index 0000000000..5baa03baae
--- /dev/null
+++ b/kernel/src/test/scala/MongoStorageSpec.scala
@@ -0,0 +1,272 @@
+package se.scalablesolutions.akka.kernel.state
+
+import junit.framework.TestCase
+
+import org.junit.{Test, Before}
+import org.junit.Assert._
+
+class MongoStorageSpec extends TestCase {
+
+ val changeSetV = new scala.collection.mutable.ArrayBuffer[AnyRef]
+ val changeSetM = new scala.collection.mutable.HashMap[AnyRef, AnyRef]
+
+ override def setUp = {
+ MongoStorage.coll.drop
+ }
+
+ @Test
+ def testVectorInsertForTransactionId = {
+ changeSetV += "debasish" // string
+ changeSetV += List(1, 2, 3) // Scala List
+ changeSetV += List(100, 200)
+ MongoStorage.insertVectorStorageEntriesFor("U-A1", changeSetV.toList)
+ assertEquals(
+ 3,
+ MongoStorage.getVectorStorageSizeFor("U-A1"))
+ changeSetV.clear
+
+ // changeSetV should be reinitialized
+ changeSetV += List(12, 23, 45)
+ changeSetV += "maulindu"
+ MongoStorage.insertVectorStorageEntriesFor("U-A1", changeSetV.toList)
+ assertEquals(
+ 5,
+ MongoStorage.getVectorStorageSizeFor("U-A1"))
+
+ // add more to the same changeSetV
+ changeSetV += "ramanendu"
+ changeSetV += Map(1 -> "dg", 2 -> "mc")
+
+ // add for a different transaction
+ MongoStorage.insertVectorStorageEntriesFor("U-A2", changeSetV.toList)
+ assertEquals(
+ 4,
+ MongoStorage.getVectorStorageSizeFor("U-A2"))
+
+ // previous transaction change set should remain same
+ assertEquals(
+ 5,
+ MongoStorage.getVectorStorageSizeFor("U-A1"))
+
+ // test single element entry
+ MongoStorage.insertVectorStorageEntryFor("U-A1", Map(1->1, 2->4, 3->9))
+ assertEquals(
+ 6,
+ MongoStorage.getVectorStorageSizeFor("U-A1"))
+ }
+
+ @Test
+ def testVectorFetchForKeys = {
+
+ // initially everything 0
+ assertEquals(
+ 0,
+ MongoStorage.getVectorStorageSizeFor("U-A2"))
+
+ assertEquals(
+ 0,
+ MongoStorage.getVectorStorageSizeFor("U-A1"))
+
+ // insert some entries to fetch
+ changeSetV += "debasish"
+ changeSetV += List(12, 13, 14)
+ MongoStorage.insertVectorStorageEntriesFor("U-A1", changeSetV.toList)
+
+ assertEquals(
+ 2,
+ MongoStorage.getVectorStorageSizeFor("U-A1"))
+
+ assertEquals(
+ "debasish",
+ MongoStorage.getVectorStorageEntryFor("U-A1", 0).asInstanceOf[String])
+
+ assertEquals(
+ List(12, 13, 14),
+ MongoStorage.getVectorStorageEntryFor("U-A1", 1).asInstanceOf[List[Int]])
+
+ changeSetV.clear
+ changeSetV += Map(1->1, 2->4, 3->9)
+ changeSetV += BigInt(2310)
+ changeSetV += List(100, 200, 300)
+ MongoStorage.insertVectorStorageEntriesFor("U-A1", changeSetV.toList)
+
+ assertEquals(
+ 5,
+ MongoStorage.getVectorStorageSizeFor("U-A1"))
+
+ val r =
+ MongoStorage.getVectorStorageRangeFor("U-A1", Some(1), None, 3)
+
+ assertEquals(3, r.size)
+ assertEquals(List(12, 13, 14), r(0).asInstanceOf[List[Int]])
+ }
+
+ @Test
+ def testVectorFetchForNonExistentKeys = {
+ try {
+ MongoStorage.getVectorStorageEntryFor("U-A1", 1)
+ fail("should throw an exception")
+ } catch {case e: Predef.NoSuchElementException => {}}
+
+ try {
+ MongoStorage.getVectorStorageRangeFor("U-A1", Some(2), None, 12)
+ fail("should throw an exception")
+ } catch {case e: Predef.NoSuchElementException => {}}
+ }
+
+ @Test
+ def testMapInsertForTransactionId = {
+ case class Foo(no: Int, name: String)
+ fillMap
+
+ // add some more to changeSet
+ changeSetM += "5" -> Foo(12, "dg")
+ changeSetM += "6" -> java.util.Calendar.getInstance.getTime
+
+ // insert all into Mongo
+ MongoStorage.insertMapStorageEntriesFor("U-M1", changeSetM.toList)
+ assertEquals(
+ 6,
+ MongoStorage.getMapStorageSizeFor("U-M1"))
+
+ // individual insert api
+ MongoStorage.insertMapStorageEntryFor("U-M1", "7", "akka")
+ MongoStorage.insertMapStorageEntryFor("U-M1", "8", List(23, 25))
+ assertEquals(
+ 8,
+ MongoStorage.getMapStorageSizeFor("U-M1"))
+
+ // add the same changeSet for another transaction
+ MongoStorage.insertMapStorageEntriesFor("U-M2", changeSetM.toList)
+ assertEquals(
+ 6,
+ MongoStorage.getMapStorageSizeFor("U-M2"))
+
+ // the first transaction should remain the same
+ assertEquals(
+ 8,
+ MongoStorage.getMapStorageSizeFor("U-M1"))
+ changeSetM.clear
+ }
+
+ @Test
+ def testMapContents = {
+ fillMap
+ MongoStorage.insertMapStorageEntriesFor("U-M1", changeSetM.toList)
+ MongoStorage.getMapStorageEntryFor("U-M1", "2") match {
+ case Some(x) => assertEquals("peter", x.asInstanceOf[String])
+ case None => fail("should fetch peter")
+ }
+ MongoStorage.getMapStorageEntryFor("U-M1", "4") match {
+ case Some(x) => assertEquals(3, x.asInstanceOf[List[Int]].size)
+ case None => fail("should fetch list")
+ }
+ MongoStorage.getMapStorageEntryFor("U-M1", "3") match {
+ case Some(x) => assertEquals(2, x.asInstanceOf[List[Int]].size)
+ case None => fail("should fetch list")
+ }
+
+ // get the entire map
+ val l: List[Tuple2[AnyRef, AnyRef]] =
+ MongoStorage.getMapStorageFor("U-M1")
+
+ assertEquals(4, l.size)
+ assertTrue(l.map(_._1).contains("1"))
+ assertTrue(l.map(_._1).contains("2"))
+ assertTrue(l.map(_._1).contains("3"))
+ assertTrue(l.map(_._1).contains("4"))
+
+ assertTrue(l.map(_._2).contains("john"))
+
+ // trying to fetch for a non-existent transaction will throw
+ try {
+ MongoStorage.getMapStorageFor("U-M2")
+ fail("should throw an exception")
+ } catch {case e: Predef.NoSuchElementException => {}}
+
+ changeSetM.clear
+ }
+
+ @Test
+ def testMapContentsByRange = {
+ fillMap
+ changeSetM += "5" -> Map(1 -> "dg", 2 -> "mc")
+ MongoStorage.insertMapStorageEntriesFor("U-M1", changeSetM.toList)
+
+ // specify start and count
+ val l: List[Tuple2[AnyRef, AnyRef]] =
+ MongoStorage.getMapStorageRangeFor(
+ "U-M1", Some(Integer.valueOf(2)), None, 3)
+
+ assertEquals(3, l.size)
+ assertEquals("3", l(0)._1.asInstanceOf[String])
+ assertEquals(List(100, 200), l(0)._2.asInstanceOf[List[Int]])
+ assertEquals("4", l(1)._1.asInstanceOf[String])
+ assertEquals(List(10, 20, 30), l(1)._2.asInstanceOf[List[Int]])
+
+ // specify start, finish and count where finish - start == count
+ assertEquals(3,
+ MongoStorage.getMapStorageRangeFor(
+ "U-M1", Some(Integer.valueOf(2)), Some(Integer.valueOf(5)), 3).size)
+
+ // specify start, finish and count where finish - start > count
+ assertEquals(3,
+ MongoStorage.getMapStorageRangeFor(
+ "U-M1", Some(Integer.valueOf(2)), Some(Integer.valueOf(9)), 3).size)
+
+ // do not specify start or finish
+ assertEquals(3,
+ MongoStorage.getMapStorageRangeFor(
+ "U-M1", None, None, 3).size)
+
+ // specify finish and count
+ assertEquals(3,
+ MongoStorage.getMapStorageRangeFor(
+ "U-M1", None, Some(Integer.valueOf(3)), 3).size)
+
+ // specify start, finish and count where finish < start
+ assertEquals(3,
+ MongoStorage.getMapStorageRangeFor(
+ "U-M1", Some(Integer.valueOf(2)), Some(Integer.valueOf(1)), 3).size)
+
+ changeSetM.clear
+ }
+
+ @Test
+ def testMapStorageRemove = {
+ fillMap
+ changeSetM += "5" -> Map(1 -> "dg", 2 -> "mc")
+
+ MongoStorage.insertMapStorageEntriesFor("U-M1", changeSetM.toList)
+ assertEquals(5,
+ MongoStorage.getMapStorageSizeFor("U-M1"))
+
+ // remove key "3"
+ MongoStorage.removeMapStorageFor("U-M1", "3")
+ assertEquals(4,
+ MongoStorage.getMapStorageSizeFor("U-M1"))
+
+ try {
+ MongoStorage.getMapStorageEntryFor("U-M1", "3")
+ fail("should throw exception")
+ } catch { case e: Exception => {}} // Exception only: a bare 'case e' would also swallow fail()'s AssertionFailedError
+
+ // remove the entire map for the transaction
+ MongoStorage.removeMapStorageFor("U-M1")
+
+ try {
+ MongoStorage.getMapStorageFor("U-M1")
+ fail("should throw exception")
+ } catch { case e: NoSuchElementException => {}}
+
+ changeSetM.clear
+ }
+
+ private def fillMap = {
+ changeSetM += "1" -> "john"
+ changeSetM += "2" -> "peter"
+ changeSetM += "3" -> List(100, 200)
+ changeSetM += "4" -> List(10, 20, 30)
+ changeSetM
+ }
+}
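
The spec above pins down the MongoStorage contract: map and vector entries are scoped to a transaction id, vector entries are addressable by insertion index, and lookups against unknown keys or transactions throw NoSuchElementException. A hypothetical sketch of that API, using only method names that appear in the spec and assuming a reachable MongoDB instance behind MongoStorage.coll:

    val txId = "U-M1"

    // Map storage: (key, value) pairs scoped to a transaction id.
    val entries: List[Tuple2[AnyRef, AnyRef]] =
      List("1" -> "john", "2" -> List(100, 200))
    MongoStorage.insertMapStorageEntriesFor(txId, entries)
    MongoStorage.insertMapStorageEntryFor(txId, "3", "akka")
    MongoStorage.getMapStorageEntryFor(txId, "1") match {
      case Some(v) => println("found " + v)
      case None => println("no such key")
    }

    // Vector storage: entries are indexed by insertion order.
    MongoStorage.insertVectorStorageEntryFor(txId, "first")
    val head = MongoStorage.getVectorStorageEntryFor(txId, 0)

    // Cleanup: drop a single key, or the whole change set for the transaction.
    MongoStorage.removeMapStorageFor(txId, "1")
    MongoStorage.removeMapStorageFor(txId)
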
diff --git a/kernel/src/test/scala/RemoteActorSpec.scala b/kernel/src/test/scala/RemoteActorSpec.scala
index 61397e6018..3387aa8eb0 100644
--- a/kernel/src/test/scala/RemoteActorSpec.scala
+++ b/kernel/src/test/scala/RemoteActorSpec.scala
@@ -26,7 +26,6 @@ class RemoteActorSpecActorBidirectional extends Actor {
}
class RemoteActorSpec extends TestCase {
-
kernel.Kernel.config
new Thread(new Runnable() {
def run = {
diff --git a/kernel/src/test/scala/ThreadBasedDispatcherTest.scala b/kernel/src/test/scala/ThreadBasedDispatcherTest.scala
index 621e3dccfd..1ad5c0b733 100644
--- a/kernel/src/test/scala/ThreadBasedDispatcherTest.scala
+++ b/kernel/src/test/scala/ThreadBasedDispatcherTest.scala
@@ -49,7 +49,7 @@ class ThreadBasedDispatcherTest extends TestCase {
private def internalTestMessagesDispatchedToTheSameHandlerAreExecutedSequentially: Unit = {
val guardLock = new ReentrantLock
val handleLatch = new CountDownLatch(100)
- val dispatcher = new ThreadBasedDispatcher(new TestMessageHandle(handleLatch))
+ val dispatcher = new ThreadBasedDispatcher("name", new TestMessageHandle(handleLatch))
dispatcher.start
for (i <- 0 until 100) {
dispatcher.messageQueue.append(new MessageInvocation("id", new Object, None, None))
@@ -60,7 +60,7 @@ class ThreadBasedDispatcherTest extends TestCase {
private def internalTestMessagesDispatchedToHandlersAreExecutedInFIFOOrder: Unit = {
val handleLatch = new CountDownLatch(100)
- val dispatcher = new ThreadBasedDispatcher(new MessageInvoker {
+ val dispatcher = new ThreadBasedDispatcher("name", new MessageInvoker {
var currentValue = -1;
def invoke(message: MessageInvocation) {
if (threadingIssueDetected.get) return
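
Likewise, ThreadBasedDispatcher now takes a name alongside its single MessageInvoker. A hedged sketch of the updated construction, limited to the calls shown in this test:

    // The name is the new first constructor argument; the invoker is unchanged.
    val dispatcher = new ThreadBasedDispatcher("my-actor", new MessageInvoker {
      def invoke(message: MessageInvocation) = println("handled " + message)
    })
    dispatcher.start
    dispatcher.messageQueue.append(new MessageInvocation("id", new Object, None, None))
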
diff --git a/lib/antlr-3.1.3.jar b/lib/antlr-3.1.3.jar
deleted file mode 100644
index d3bd9cf942..0000000000
Binary files a/lib/antlr-3.1.3.jar and /dev/null differ
diff --git a/lib/aspectwerkz-jdk5-2.1.jar b/lib/aspectwerkz-jdk5-2.1.jar
new file mode 100755
index 0000000000..c258335f67
Binary files /dev/null and b/lib/aspectwerkz-jdk5-2.1.jar differ
diff --git a/lib/aspectwerkz2.dtd b/lib/aspectwerkz2.dtd
new file mode 100755
index 0000000000..5ed29fe769
--- /dev/null
+++ b/lib/aspectwerkz2.dtd
@@ -0,0 +1,252 @@
diff --git a/lib/atmosphere-core-0.3.jar~HEAD b/lib/atmosphere-core-0.3.jar~HEAD
deleted file mode 100644
index aef71c00ba..0000000000
Binary files a/lib/atmosphere-core-0.3.jar~HEAD and /dev/null differ
diff --git a/lib/atmosphere-portable-runtime-0.3.jar~HEAD b/lib/atmosphere-portable-runtime-0.3.jar~HEAD
deleted file mode 100644
index 19875c8c51..0000000000
Binary files a/lib/atmosphere-portable-runtime-0.3.jar~HEAD and /dev/null differ
diff --git a/lib/cassandra-0.4.0-dev.jar b/lib/cassandra-0.4.0-dev.jar
deleted file mode 100644
index 8274103dc3..0000000000
Binary files a/lib/cassandra-0.4.0-dev.jar and /dev/null differ
diff --git a/lib/cassandra-0.4.0-trunk.jar b/lib/cassandra-0.4.0-trunk.jar
new file mode 100755
index 0000000000..44796086be
Binary files /dev/null and b/lib/cassandra-0.4.0-trunk.jar differ
diff --git a/lib/cassidy-0.1.jar b/lib/cassidy-0.1.jar
deleted file mode 100755
index ec7e313eac..0000000000
Binary files a/lib/cassidy-0.1.jar and /dev/null differ
diff --git a/lib/commons-collections-3.2.1.jar b/lib/commons-collections-3.2.1.jar
deleted file mode 100644
index c35fa1fee1..0000000000
Binary files a/lib/commons-collections-3.2.1.jar and /dev/null differ
diff --git a/lib/commons-javaflow-1.0-SNAPSHOT.jar b/lib/commons-javaflow-1.0-SNAPSHOT.jar
deleted file mode 100644
index 199b853307..0000000000
Binary files a/lib/commons-javaflow-1.0-SNAPSHOT.jar and /dev/null differ
diff --git a/lib/commons-lang-2.4.jar b/lib/commons-lang-2.4.jar
deleted file mode 100644
index 532939ecab..0000000000
Binary files a/lib/commons-lang-2.4.jar and /dev/null differ
diff --git a/lib/commons-pool-1.5.1.jar b/lib/commons-pool-1.5.1.jar
new file mode 100755
index 0000000000..c3ff84cfb8
Binary files /dev/null and b/lib/commons-pool-1.5.1.jar differ
diff --git a/lib/guice-jsr250-2.0-SNAPSHOT.jar b/lib/guice-jsr250-2.0-SNAPSHOT.jar
deleted file mode 100644
index f2be8ad48d..0000000000
Binary files a/lib/guice-jsr250-2.0-SNAPSHOT.jar and /dev/null differ
diff --git a/lib/high-scale-lib.jar b/lib/high-scale-lib.jar
deleted file mode 100644
index 421a436eed..0000000000
Binary files a/lib/high-scale-lib.jar and /dev/null differ
diff --git a/lib/javautils-2.7.4-0.1.jar b/lib/javautils-2.7.4-0.1.jar
new file mode 100644
index 0000000000..a0c51bf7da
Binary files /dev/null and b/lib/javautils-2.7.4-0.1.jar differ
diff --git a/lib/jersey-client-1.1.1-ea.jar b/lib/jersey-client-1.1.1-ea.jar
deleted file mode 100755
index fae00c4665..0000000000
Binary files a/lib/jersey-client-1.1.1-ea.jar and /dev/null differ
diff --git a/lib/jersey-core-1.1.1-ea.jar b/lib/jersey-core-1.1.1-ea.jar
old mode 100755
new mode 100644
diff --git a/lib/jersey-server-1.1.1-ea.jar b/lib/jersey-server-1.1.1-ea.jar
old mode 100755
new mode 100644
diff --git a/lib/jsr250-api-1.0.jar b/lib/jsr250-api-1.0.jar
deleted file mode 100644
index c1f29bf844..0000000000
Binary files a/lib/jsr250-api-1.0.jar and /dev/null differ
diff --git a/lib/junit-4.5.jar b/lib/junit-4.5.jar
deleted file mode 100644
index 733921623d..0000000000
Binary files a/lib/junit-4.5.jar and /dev/null differ
diff --git a/lib/junit4runner-1.0.jar b/lib/junit4runner-1.0.jar
deleted file mode 100644
index 6f91bd8044..0000000000
Binary files a/lib/junit4runner-1.0.jar and /dev/null differ
diff --git a/lib/lucene-core-2.2.0.jar b/lib/lucene-core-2.2.0.jar
deleted file mode 100644
index 2469481c38..0000000000
Binary files a/lib/lucene-core-2.2.0.jar and /dev/null differ
diff --git a/lib/netty-3.1.0.CR1.jar b/lib/netty-3.1.0.GA.jar
similarity index 57%
rename from lib/netty-3.1.0.CR1.jar
rename to lib/netty-3.1.0.GA.jar
index f1180b0477..94c9b24902 100644
Binary files a/lib/netty-3.1.0.CR1.jar and b/lib/netty-3.1.0.GA.jar differ
diff --git a/lib/scala-stats-1.0.jar b/lib/scala-stats-1.0.jar
new file mode 100644
index 0000000000..6b1b43bf7b
Binary files /dev/null and b/lib/scala-stats-1.0.jar differ
diff --git a/lib/scalatest-0.9.5.jar b/lib/scalatest-0.9.5.jar
deleted file mode 100644
index adb241a55a..0000000000
Binary files a/lib/scalatest-0.9.5.jar and /dev/null differ
diff --git a/lib/stringtemplate-3.0.jar b/lib/stringtemplate-3.0.jar
deleted file mode 100644
index df5e6e517f..0000000000
Binary files a/lib/stringtemplate-3.0.jar and /dev/null differ
diff --git a/pom.xml b/pom.xml
index 394d898dce..98f19bd07b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1,4 +1,4 @@
-