diff --git a/akka.ipr b/akka.ipr
index a102f6ba6e..a22637ec00 100644
--- a/akka.ipr
+++ b/akka.ipr
@@ -679,7 +679,6 @@
-
@@ -1799,6 +1798,17 @@
+
+
+
+
+
+
+
+
+
+
+
diff --git a/akka.iws b/akka.iws
index 8b2c663fcf..5fbfeffb69 100644
--- a/akka.iws
+++ b/akka.iws
@@ -6,16 +6,17 @@
-
+
-
-
-
+
+
-
-
+
+
+
+
@@ -75,7 +76,7 @@
-
+
@@ -154,7 +155,7 @@
-
+
@@ -163,7 +164,7 @@
-
+
@@ -181,7 +182,7 @@
-
+
@@ -199,8 +200,10 @@
-
-
+
+
+
+
@@ -895,40 +898,6 @@
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
@@ -2472,21 +2441,23 @@
-
-
+
+
+
+
-
+
-
+
@@ -2514,7 +2485,7 @@
-
+
diff --git a/config/akka-reference.conf b/config/akka-reference.conf
index f8f6c48105..539555444a 100644
--- a/config/akka-reference.conf
+++ b/config/akka-reference.conf
@@ -1,6 +1,6 @@
-#################################
-# Akka Actor Kernel Config File #
-#################################
+####################
+# Akka Config File #
+####################
# This file has all the default settings, so all of these could be removed with no visible effect.
# Modify as needed.
@@ -22,9 +22,7 @@
# supervisor bootstrap, should be defined in default constructor
timeout = 5000 # default timeout for future based invocations
- concurrent-mode = off # if turned on, then the same actor instance is allowed to execute concurrently -
- # e.g. departing from the actor model for better performance
- serialize-messages = on # does a deep clone of (non-primitive) messages to ensure immutability
+ serialize-messages = off # does a deep clone of (non-primitive) messages to ensure immutability
@@ -50,17 +48,14 @@
- system = "cassandra" # Options: cassandra (coming: terracotta, redis, tokyo-cabinet, tokyo-tyrant, voldemort, memcached, hazelcast)
+ system = "cassandra" # Options: cassandra (coming: terracotta, mongodb, redis, tokyo-cabinet, voldemort, memcached)
service = on
- storage-format = "java" # Options: java, scala-json, java-json
- blocking = false # inserts and queries should be blocking or not
-
-
- service = on
- pidfile = "akka.pid"
-
+ hostname = "localhost" # ip address or hostname of one of the Cassandra cluster's seeds
+ port = 9160
+ storage-format = "binary" # Options: binary, json, simple-json
+ blocking = false # inserts and queries should be blocking or not
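
For reference, a minimal sketch of how the new akka.storage.cassandra.* keys
are read via Configgy, mirroring the lookups added in CassandraStorage.scala
below (illustration only, not part of this patch):

    import net.lag.configgy.Configgy

    object CassandraSettings {
      // Configgy.config is populated at kernel boot; each getter takes a
      // default that applies when the key is absent from the config file.
      val config   = Configgy.config
      val hostname = config.getString("akka.storage.cassandra.hostname", "localhost")
      val port     = config.getInt("akka.storage.cassandra.port", 9160)
      val format   = config.getString("akka.storage.cassandra.storage-format", "binary")
      val blocking = config.getBool("akka.storage.cassandra.blocking", true)
    }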
diff --git a/embedded-repo/org/apache/cassandra/cassandra/0.4.0-dev/cassandra-0.4.0-dev.jar b/embedded-repo/org/apache/cassandra/cassandra/0.4.0-dev/cassandra-0.4.0-dev.jar
index 8274103dc3..d9f762ad2a 100644
Binary files a/embedded-repo/org/apache/cassandra/cassandra/0.4.0-dev/cassandra-0.4.0-dev.jar and b/embedded-repo/org/apache/cassandra/cassandra/0.4.0-dev/cassandra-0.4.0-dev.jar differ
diff --git a/kernel/pom.xml b/kernel/pom.xml
index 7f0970db32..98f460cf2b 100644
--- a/kernel/pom.xml
+++ b/kernel/pom.xml
@@ -125,6 +125,11 @@
      <artifactId>cassidy</artifactId>
      <version>0.1</version>
    </dependency>
+    <dependency>
+      <groupId>commons-pool</groupId>
+      <artifactId>commons-pool</artifactId>
+      <version>1.5.1</version>
+    </dependency>
diff --git a/kernel/src/main/scala/Kernel.scala b/kernel/src/main/scala/Kernel.scala
index 0dfb90d1b2..8ec3290782 100644
--- a/kernel/src/main/scala/Kernel.scala
+++ b/kernel/src/main/scala/Kernel.scala
@@ -16,7 +16,7 @@ import net.lag.configgy.{Config, Configgy, RuntimeEnvironment}
import kernel.jersey.AkkaCometServlet
import kernel.nio.RemoteServer
-import kernel.state.CassandraStorage
+import kernel.state.EmbeddedCassandraStorage
import kernel.util.Logging
/**
@@ -178,7 +178,7 @@ object Kernel extends Logging {
println("=================================================")
var start = System.currentTimeMillis
- for (i <- 1 to NR_ENTRIES) CassandraStorage.insertMapStorageEntryFor("test", i.toString, "data")
+ for (i <- 1 to NR_ENTRIES) EmbeddedCassandraStorage.insertMapStorageEntryFor("test", i.toString, "data")
var end = System.currentTimeMillis
println("Writes per second: " + NR_ENTRIES / ((end - start).toDouble / 1000))
@@ -186,13 +186,13 @@ object Kernel extends Logging {
start = System.currentTimeMillis
val entries = new scala.collection.mutable.ArrayBuffer[Tuple2[String, String]]
for (i <- 1 to NR_ENTRIES) entries += (i.toString, "data")
- CassandraStorage.insertMapStorageEntriesFor("test", entries.toList)
+ EmbeddedCassandraStorage.insertMapStorageEntriesFor("test", entries.toList)
end = System.currentTimeMillis
println("Writes per second - batch: " + NR_ENTRIES / ((end - start).toDouble / 1000))
println("=================================================")
start = System.currentTimeMillis
- for (i <- 1 to NR_ENTRIES) CassandraStorage.getMapStorageEntryFor("test", i.toString)
+ for (i <- 1 to NR_ENTRIES) EmbeddedCassandraStorage.getMapStorageEntryFor("test", i.toString)
end = System.currentTimeMillis
println("Reads per second: " + NR_ENTRIES / ((end - start).toDouble / 1000))
diff --git a/kernel/src/main/scala/state/CassandraStorage.scala b/kernel/src/main/scala/state/CassandraStorage.scala
index 498c04615f..b2538722fb 100644
--- a/kernel/src/main/scala/state/CassandraStorage.scala
+++ b/kernel/src/main/scala/state/CassandraStorage.scala
@@ -4,7 +4,7 @@
package se.scalablesolutions.akka.kernel.state
-import java.io.File
+import java.io.{File, Flushable, Closeable}
import kernel.util.Logging
import serialization.{Serializer, Serializable, SerializationProtocol}
@@ -12,11 +12,12 @@ import serialization.{Serializer, Serializable, SerializationProtocol}
import org.apache.cassandra.config.DatabaseDescriptor
import org.apache.cassandra.service._
-import org.apache.thrift.server.TThreadPoolServer
-import org.apache.thrift.protocol.TBinaryProtocol
-import org.apache.thrift.transport.TServerSocket
-import org.apache.thrift.transport.TTransportFactory
+//import org.apache.thrift.server.TThreadPoolServer
import org.apache.thrift.TProcessorFactory
+import org.apache.thrift._
+import org.apache.thrift.transport._
+import org.apache.thrift.protocol._
/**
* NOTE: requires command line options:
@@ -33,6 +34,300 @@ object CassandraStorage extends Logging {
val IS_ASCENDING = true
+ import kernel.Kernel.config
+
+ val CASSANDRA_SERVER_HOSTNAME = config.getString("akka.storage.cassandra.hostname", "localhost")
+ val CASSANDRA_SERVER_PORT = config.getInt("akka.storage.cassandra.port", 9160)
+ val BLOCKING_CALL = if (config.getBool("akka.storage.cassandra.blocking", true)) 0 else 1
+
+ @volatile private[this] var isRunning = false
+ private[this] val protocol: Protocol = {
+ config.getString("akka.storage.cassandra.storage-format", "binary") match {
+ case "binary" => Protocol.Binary
+ case "json" => Protocol.JSON
+ case "simple-json" => Protocol.SimpleJSON
+ case unknown => throw new UnsupportedOperationException("Unknown storage serialization protocol [" + unknown + "]")
+ }
+ }
+
+ private[this] var sessions: Option[CassandraSessionPool[_]] = None
+
+ def start = synchronized {
+ if (!isRunning) {
+ try {
+ sessions = Some(new CassandraSessionPool(StackPool(SocketProvider(CASSANDRA_SERVER_HOSTNAME, CASSANDRA_SERVER_PORT)), protocol))
+ log.info("Cassandra persistent storage has started up successfully");
+ } catch {
+ case e =>
+ log.error("Could not start up Cassandra persistent storage")
+ throw e
+ }
+ isRunning = true
+ }
+ }
+
+ def stop = synchronized {
+ if (isRunning && sessions.isDefined) {
+ sessions.get.close
+ isRunning = false
+ }
+ }
+
+ implicit def strToBytes(s: String): Array[Byte] = s.getBytes("UTF-8") // the storage methods below pass String values to ++|, which takes Array[Byte]
+
+/*
+ def insertRefStorageFor(name: String, element: AnyRef) = sessions.withSession { session => {
+ val user_id = "1"
+ session ++| ("users", user_id, "base_attributes:name", "Lord Foo Bar", false)
+ session ++| ("users", user_id, "base_attributes:age", "24", false)
+ for( i <- session / ("users", user_id, "base_attributes", None, None).toList) println(i)
+ }}
+*/
+ // ===============================================================
+ // For Ref
+ // ===============================================================
+
+ def insertRefStorageFor(name: String, element: String) = if (sessions.isDefined) {
+ sessions.get.withSession {
+ _ ++| (
+ TABLE_NAME,
+ name,
+ REF_COLUMN_FAMILY,
+ element,
+ System.currentTimeMillis,
+ BLOCKING_CALL)
+ }
+ } else throw new IllegalStateException("CassandraStorage is not started")
+
+ def getRefStorageFor(name: String): Option[String] = if (sessions.isDefined) {
+ try {
+ val column = sessions.get.withSession { _ | (TABLE_NAME, name, REF_COLUMN_FAMILY) }
+ Some(column.value)
+ } catch {
+ case e =>
+ e.printStackTrace
+ None
+ }
+ } else throw new IllegalStateException("CassandraStorage is not started")
+
+ // ===============================================================
+ // For Vector
+ // ===============================================================
+
+ def insertVectorStorageEntryFor(name: String, element: String) = if (sessions.isDefined) {
+ sessions.get.withSession {
+ _ ++| (
+ TABLE_NAME,
+ name,
+ VECTOR_COLUMN_FAMILY + ":" + getVectorStorageSizeFor(name),
+ element,
+ System.currentTimeMillis,
+ BLOCKING_CALL)
+ }
+ } else throw new IllegalStateException("CassandraStorage is not started")
+
+ def getVectorStorageEntryFor(name: String, index: Int): String = if (sessions.isDefined) {
+ try {
+ val column = sessions.get.withSession { _ | (TABLE_NAME, name, VECTOR_COLUMN_FAMILY + ":" + index) }
+ column.value
+ } catch {
+ case e =>
+ e.printStackTrace
+ throw new NoSuchElementException(e.getMessage)
+ }
+ } else throw new IllegalStateException("CassandraStorage is not started")
+
+ def getVectorStorageRangeFor(name: String, start: Int, count: Int): List[String] = if (sessions.isDefined) {
+ sessions.get.withSession { _ / (TABLE_NAME, name, VECTOR_COLUMN_FAMILY, IS_ASCENDING, count) }.map(_.value)
+ } else throw new IllegalStateException("CassandraStorage is not started")
+
+ def getVectorStorageSizeFor(name: String): Int = if (sessions.isDefined) {
+ sessions.get.withSession { _ |# (TABLE_NAME, name, VECTOR_COLUMN_FAMILY) }
+ } else throw new IllegalStateException("CassandraStorage is not started")
+
+ // ===============================================================
+ // For Map
+ // ===============================================================
+
+ def insertMapStorageEntryFor(name: String, key: String, value: String) = if (sessions.isDefined) {
+ sessions.get.withSession {
+ _ ++| (
+ TABLE_NAME,
+ name,
+ MAP_COLUMN_FAMILY + ":" + key,
+ value,
+ System.currentTimeMillis,
+ BLOCKING_CALL)
+ }
+ } else throw new IllegalStateException("CassandraStorage is not started")
+
+ def insertMapStorageEntriesFor(name: String, entries: List[Tuple2[String, String]]) = if (sessions.isDefined) {
+ import java.util.{Map, HashMap, List, ArrayList}
+ val columns: Map[String, List[column_t]] = new HashMap
+ for (entry <- entries) {
+ val cls: List[column_t] = new ArrayList
+ cls.add(new column_t(entry._1, entry._2, System.currentTimeMillis))
+ columns.put(MAP_COLUMN_FAMILY, cls)
+ }
+ sessions.get.withSession {
+ _ ++| (
+ new batch_mutation_t(
+ TABLE_NAME,
+ name,
+ columns),
+ BLOCKING_CALL)
+ }
+ } else throw new IllegalStateException("CassandraStorage is not started")
+
+ def getMapStorageEntryFor(name: String, key: String): Option[String] = if (sessions.isDefined) {
+ try {
+ val column = sessions.get.withSession { _ | (TABLE_NAME, name, MAP_COLUMN_FAMILY + ":" + key) }
+ Some(column.value)
+ } catch {
+ case e =>
+ e.printStackTrace
+ None
+ }
+ } else throw new IllegalStateException("CassandraStorage is not started")
+
+ /*
+ def getMapStorageFor(name: String): List[Tuple2[String, String]] = if (sessions.isDefined) {
+ val columns = server.get_columns_since(TABLE_NAME, name, MAP_COLUMN_FAMILY, -1)
+ .toArray.toList.asInstanceOf[List[org.apache.cassandra.service.column_t]]
+ for {
+ column <- columns
+ col = (column.columnName, column.value)
+ } yield col
+ } else throw new IllegalStateException("CassandraStorage is not started")
+ */
+
+ def getMapStorageSizeFor(name: String): Int = if (sessions.isDefined) {
+ sessions.get.withSession { _ |# (TABLE_NAME, name, MAP_COLUMN_FAMILY) }
+ } else throw new IllegalStateException("CassandraStorage is not started")
+
+ def removeMapStorageFor(name: String) = if (sessions.isDefined) {
+ sessions.get.withSession { _ -- (TABLE_NAME, name, MAP_COLUMN_FAMILY, System.currentTimeMillis, BLOCKING_CALL) }
+ } else throw new IllegalStateException("CassandraStorage is not started")
+
+ def getMapStorageRangeFor(name: String, start: Int, count: Int): List[Tuple2[String, String]] = if (sessions.isDefined) {
+ sessions.get.withSession { _ / (TABLE_NAME, name, MAP_COLUMN_FAMILY, IS_ASCENDING, count) }.map(column => (column.columnName, column.value))
+ } else throw new IllegalStateException("CassandraStorage is not started")
+}
+
+trait CassandraSession extends Closeable with Flushable {
+ import scala.collection.jcl.Conversions._
+ import org.scala_tools.javautils.Imports._
+
+ private implicit def null2Option[T](t: T): Option[T] = if (t != null) Some(t) else None
+
+ protected val client: Cassandra.Client
+
+ val obtainedAt: Long
+
+ def /(tableName: String, key: String, columnParent: String, start: Option[Int], end: Option[Int]): List[column_t] =
+ client.get_slice(tableName, key, columnParent, start.getOrElse(-1), end.getOrElse(-1)).toList
+
+ def /(tableName: String, key: String, columnParent: String, colNames: List[String]): List[column_t] =
+ client.get_slice_by_names(tableName, key, columnParent, colNames.asJava).toList
+
+ def |(tableName: String, key: String, colPath: String): Option[column_t] =
+ client.get_column(tableName, key, colPath)
+
+ def |#(tableName: String, key: String, columnParent: String): Int =
+ client.get_column_count(tableName, key, columnParent)
+
+ def ++|(tableName: String, key: String, columnPath: String, cellData: Array[Byte], timestamp: Long, block: Int) =
+ client.insert(tableName, key, columnPath, cellData, timestamp, block)
+
+ def ++|(tableName: String, key: String, columnPath: String, cellData: Array[Byte], block: Int) =
+ client.insert(tableName, key, columnPath, cellData, obtainedAt, block)
+
+ def ++|(batch: batch_mutation_t, block: Int) =
+ client.batch_insert(batch, block)
+
+ def --(tableName: String, key: String, columnPathOrParent: String, timestamp: Long, block: Int) =
+ client.remove(tableName, key, columnPathOrParent, timestamp, block)
+
+ def --(tableName: String, key: String, columnPathOrParent: String, block: Int) =
+ client.remove(tableName, key, columnPathOrParent, obtainedAt, block)
+
+ def /@(tableName: String, key: String, columnParent: String, timestamp: Long): List[column_t] =
+ client.get_columns_since(tableName, key, columnParent, timestamp).toList
+
+ def /^(tableName: String, key: String, columnFamily: String, start: Option[Int], end: Option[Int], count: Int): List[superColumn_t] =
+ client.get_slice_super(tableName, key, columnFamily, start.getOrElse(-1), end.getOrElse(-1)).toList // TODO: upgrade the thrift interface to support count
+
+ def /^(tableName: String, key: String, columnFamily: String, superColNames: List[String]): List[superColumn_t] =
+ client.get_slice_super_by_names(tableName, key, columnFamily, superColNames.asJava).toList
+
+ def |^(tableName: String, key: String, superColumnPath: String): Option[superColumn_t] =
+ client.get_superColumn(tableName, key, superColumnPath)
+
+ def ++|^(batch: batch_mutation_super_t, block: Int) =
+ client.batch_insert_superColumn(batch, block)
+
+ def keys(tableName: String, startsWith: String, stopsAt: String, maxResults: Option[Int]): List[String] =
+ client.get_key_range(tableName, startsWith, stopsAt, maxResults.getOrElse(-1)).toList
+
+ def property(name: String): String = client.getStringProperty(name)
+ def properties(name: String): List[String] = client.getStringListProperty(name).toList
+ def describeTable(tableName: String) = client.describeTable(tableName)
+
+ def ?(query: String) = client.executeQuery(query)
+}
+
+class CassandraSessionPool[T <: TTransport](transportPool: Pool[T], inputProtocol: Protocol, outputProtocol: Protocol) extends Closeable {
+ def this(transportPool: Pool[T], ioProtocol: Protocol) = this(transportPool, ioProtocol, ioProtocol)
+
+ def newSession: CassandraSession = {
+ val t = transportPool.borrowObject
+ val c = new Cassandra.Client(inputProtocol(t), outputProtocol(t))
+ new CassandraSession {
+ val client = c
+ val obtainedAt = System.currentTimeMillis
+ def flush = t.flush
+ def close = transportPool.returnObject(t)
+ }
+ }
+
+ def withSession[R](body: CassandraSession => R) = {
+ val session = newSession
+ try {
+ val result = body(session)
+ session.flush
+ result
+ } finally {
+ session.close
+ }
+ }
+
+ def close = transportPool.close
+}
+
+sealed abstract class Protocol(val factory: TProtocolFactory) {
+ def apply(transport: TTransport) = factory.getProtocol(transport)
+}
+
+object Protocol {
+ object Binary extends Protocol(new TBinaryProtocol.Factory)
+ object SimpleJSON extends Protocol(new TSimpleJSONProtocol.Factory)
+ object JSON extends Protocol(new TJSONProtocol.Factory)
+}
+
+/**
+ * NOTE: requires command line options:
+ *
+ * -Dcassandra -Dstorage-config=config/ -Dpidfile=akka.pid
+ *
+ * @author Jonas Bonér
+ *
+object EmbeddedCassandraStorage extends Logging {
+ val TABLE_NAME = "akka"
+ val MAP_COLUMN_FAMILY = "map"
+ val VECTOR_COLUMN_FAMILY = "vector"
+ val REF_COLUMN_FAMILY = "ref:item"
+
+ val IS_ASCENDING = true
+
val RUN_THRIFT_SERVICE = kernel.Kernel.config.getBool("akka.storage.cassandra.thrift-server.service", false)
val BLOCKING_CALL = {
if (kernel.Kernel.config.getBool("akka.storage.cassandra.blocking", true)) 0
@@ -89,7 +384,7 @@ object CassandraStorage extends Logging {
TABLE_NAME,
name,
REF_COLUMN_FAMILY,
- serializer.out(element),
+ element,
System.currentTimeMillis,
BLOCKING_CALL)
}
@@ -114,7 +409,7 @@ object CassandraStorage extends Logging {
TABLE_NAME,
name,
VECTOR_COLUMN_FAMILY + ":" + getVectorStorageSizeFor(name),
- serializer.out(element),
+ element,
System.currentTimeMillis,
BLOCKING_CALL)
}
@@ -198,6 +493,7 @@ object CassandraStorage extends Logging {
}
}
+
class CassandraThriftServer(server: CassandraServer) extends Logging {
case object Start
case object Stop
@@ -241,3 +537,4 @@ class CassandraThriftServer(server: CassandraServer) extends Logging {
def start = serverDaemon ! Start
def stop = serverDaemon ! Stop
}
+*/
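
Taken together, the new code replaces the embedded, in-process Cassandra
instance with a thrift client that talks to an external Cassandra node
through a pool of sockets. A sketch of the intended usage, assuming the
names introduced above (illustration only, not part of this patch):

    // Build a session pool over pooled TSockets using the binary protocol.
    val pool = new CassandraSessionPool(
      StackPool(SocketProvider("localhost", 9160)), Protocol.Binary)

    // withSession is a loan pattern: it borrows a transport, wraps it in a
    // Cassandra.Client, flushes on success and always returns the transport.
    pool.withSession { session =>
      session ++| ("akka", "some-key", "map:name",
                   "some value".getBytes("UTF-8"), System.currentTimeMillis, 0)
    }
    val column = pool.withSession { _ | ("akka", "some-key", "map:name") }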
diff --git a/kernel/src/main/scala/state/Pool.scala b/kernel/src/main/scala/state/Pool.scala
new file mode 100755
index 0000000000..1ba3ba1b3c
--- /dev/null
+++ b/kernel/src/main/scala/state/Pool.scala
@@ -0,0 +1,94 @@
+/**
+ * Copyright (C) 2009 Scalable Solutions.
+ */
+
+package se.scalablesolutions.akka.kernel.state
+
+import org.apache.commons.pool._
+import org.apache.commons.pool.impl._
+
+import org.apache.thrift.transport.{TSocket, TTransport}
+
+trait Pool[T] extends java.io.Closeable {
+ def borrowObject: T
+ def returnObject(t: T): Unit
+ def invalidateObject(t: T): Unit
+ def addObject: Unit
+ def getNumIdle: Int
+ def getNumActive: Int
+ def clear: Unit
+ def setFactory(factory: PoolItemFactory[T]): Unit
+}
+
+trait PoolFactory[T] {
+ def createPool: Pool[T]
+}
+
+trait PoolItemFactory[T] {
+ def makeObject: T
+ def destroyObject(t: T): Unit
+ def validateObject(t: T): Boolean
+ def activateObject(t: T): Unit
+ def passivateObject(t: T): Unit
+}
+
+trait PoolBridge[T, OP <: ObjectPool] extends Pool[T] {
+ val impl: OP
+ override def borrowObject: T = impl.borrowObject.asInstanceOf[T]
+ override def returnObject(t: T) = impl.returnObject(t)
+ override def invalidateObject(t: T) = impl.invalidateObject(t)
+ override def addObject = impl.addObject
+ override def getNumIdle: Int = impl.getNumIdle
+ override def getNumActive: Int = impl.getNumActive
+ override def clear: Unit = impl.clear
+ override def close: Unit = impl.close
+ override def setFactory(factory: PoolItemFactory[T]) = impl.setFactory(toPoolableObjectFactory(factory))
+
+ def toPoolableObjectFactory[T](pif: PoolItemFactory[T]) = new PoolableObjectFactory {
+ def makeObject: Object = pif.makeObject.asInstanceOf[Object]
+ def destroyObject(o: Object): Unit = pif.destroyObject(o.asInstanceOf[T])
+ def validateObject(o: Object): Boolean = pif.validateObject(o.asInstanceOf[T])
+ def activateObject(o: Object): Unit = pif.activateObject(o.asInstanceOf[T])
+ def passivateObject(o: Object): Unit = pif.passivateObject(o.asInstanceOf[T])
+ }
+}
+
+object StackPool {
+ def apply[T](factory: PoolItemFactory[T]) = new PoolBridge[T,StackObjectPool] {
+ val impl = new StackObjectPool(toPoolableObjectFactory(factory))
+ }
+
+ def apply[T](factory: PoolItemFactory[T], maxIdle: Int) = new PoolBridge[T,StackObjectPool] {
+ val impl = new StackObjectPool(toPoolableObjectFactory(factory), maxIdle)
+ }
+
+ def apply[T](factory: PoolItemFactory[T], maxIdle: Int, initIdleCapacity: Int) = new PoolBridge[T,StackObjectPool] {
+ val impl = new StackObjectPool(toPoolableObjectFactory(factory), maxIdle, initIdleCapacity)
+ }
+}
+
+object SoftRefPool {
+ def apply[T](factory: PoolItemFactory[T]) = new PoolBridge[T,SoftReferenceObjectPool] {
+ val impl = new SoftReferenceObjectPool(toPoolableObjectFactory(factory))
+ }
+
+ def apply[T](factory: PoolItemFactory[T], initSize: Int) = new PoolBridge[T,SoftReferenceObjectPool] {
+ val impl = new SoftReferenceObjectPool(toPoolableObjectFactory(factory), initSize)
+ }
+}
+
+trait TransportFactory[T <: TTransport] extends PoolItemFactory[T] {
+ def createTransport: T
+ def makeObject: T = createTransport
+ def destroyObject(transport: T): Unit = transport.close
+ def validateObject(transport: T) = transport.isOpen
+ def activateObject(transport: T): Unit = if (!transport.isOpen) transport.open
+ def passivateObject(transport: T): Unit = transport.flush
+}
+
+case class SocketProvider(host: String, port: Int) extends TransportFactory[TSocket] {
+ def createTransport = {
+ val t = new TSocket(host, port)
+ t.open
+ t
+ }
+}
+
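
Pool.scala adapts Apache commons-pool (hence the new kernel/pom.xml
dependency) behind a typed Scala facade: commons-pool 1.x pools are
Object-based, so PoolBridge performs the unavoidable casts in one place
rather than at every call site. A usage sketch under those assumptions
(illustration only, not part of this patch):

    // A LIFO pool of open thrift sockets; makeObject opens the transport via
    // createTransport, and passivateObject flushes it on return to the pool.
    val sockets: Pool[TSocket] = StackPool(SocketProvider("localhost", 9160))

    val t = sockets.borrowObject
    try {
      // ... use the transport ...
    } finally {
      sockets.returnObject(t)
    }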
diff --git a/kernel/src/main/scala/state/State.scala b/kernel/src/main/scala/state/State.scala
index 8967ed3dd8..58c1cbfbe8 100644
--- a/kernel/src/main/scala/state/State.scala
+++ b/kernel/src/main/scala/state/State.scala
@@ -478,4 +478,4 @@ class CassandraPersistentTransactionalRef extends TransactionalRef[AnyRef] {
if (ref.isDefined) ref
else default
}
-}
\ No newline at end of file
+}
diff --git a/lib/cassandra-0.4.0-dev.jar b/lib/cassandra-0.4.0-dev.jar
index 8274103dc3..d9f762ad2a 100644
Binary files a/lib/cassandra-0.4.0-dev.jar and b/lib/cassandra-0.4.0-dev.jar differ
diff --git a/samples-java/akka-samples-java.iml b/samples-java/akka-samples-java.iml
index 28437e4328..4a210fcd93 100644
--- a/samples-java/akka-samples-java.iml
+++ b/samples-java/akka-samples-java.iml
@@ -32,6 +32,11 @@
+
+
+
+
+
@@ -45,6 +50,7 @@
+
@@ -61,10 +67,6 @@
-
-
-
-
diff --git a/samples-scala/src/main/scala/SimpleService.scala b/samples-scala/src/main/scala/SimpleService.scala
index 79eaaad468..59d28e52e6 100644
--- a/samples-scala/src/main/scala/SimpleService.scala
+++ b/samples-scala/src/main/scala/SimpleService.scala
@@ -114,4 +114,4 @@ class JsonpFilter extends BroadcastFilter[String] with Logging {
message +
"\" }); \n\n")
}
-}
\ No newline at end of file
+}