diff --git a/akka-persistence/src/main/scala/CassandraSession.scala b/akka-persistence/src/main/scala/CassandraSession.scala index aeea811382..c3950268d8 100644 --- a/akka-persistence/src/main/scala/CassandraSession.scala +++ b/akka-persistence/src/main/scala/CassandraSession.scala @@ -9,7 +9,6 @@ import java.io.{Flushable, Closeable} import util.Logging import util.Helpers._ import serialization.Serializer -import Config.config import org.apache.cassandra.db.ColumnFamily import org.apache.cassandra.service._ @@ -21,6 +20,18 @@ import org.apache.thrift.transport._ import org.apache.thrift.protocol._ /** + * Usage: + *
+ * // Uses default StackPool, for other pools see the API
+ * val sessions = new CassandraSessionPool(
+ * hostname, port,
+ * keyspace,
+ * Protocol.JSON,
+ * consistencyLevel)
+ *
+ * sessions.withSession { session => ... }
+ *
+ *
* @author Jonas Bonér
*/
trait CassandraSession extends Closeable with Flushable {
@@ -35,8 +46,8 @@ trait CassandraSession extends Closeable with Flushable {
val consistencyLevel: Int
val schema: JMap[String, JMap[String, String]]
- /**
- * Count is always the max number of results to return.
+ /*
+ Count is always the max number of results to return.
So it means, starting with `start`, or the first one if start is
empty, go until you hit `finish` or `count`, whichever comes first.
@@ -188,21 +199,34 @@ trait CassandraSession extends Closeable with Flushable {
client.get_key_range(keyspace, columnFamily, startsWith, stopsAt, maxResults.getOrElse(-1)).toList
}
+/**
+ * Usage:
+ *
+ * val sessions = new CassandraSessionPool(
+ * hostname, port,
+ * keyspace,
+ * Protocol.JSON,
+ * consistencyLevel)
+ * sessions.withSession { session => ... }
+ *
+ */
class CassandraSessionPool[T <: TTransport](
space: String,
transportPool: Pool[T],
- inputProtocol: Protocol,
- outputProtocol: Protocol,
+ protocol: Protocol,
consistency: Int) extends Closeable with Logging {
- def this(space: String, transportPool: Pool[T], ioProtocol: Protocol, consistency: Int) =
- this (space, transportPool, ioProtocol, ioProtocol, consistency)
+ /**
+ * Uses StackPool as session pool.
+ */
+ def this(hostname: String, port: Int, space: String, ioProtocol: Protocol, consistency: Int) =
+ this(space, StackPool(SocketProvider(hostname, port).asInstanceOf[PoolItemFactory[T]]), ioProtocol, consistency)
def newSession: CassandraSession = newSession(consistency)
def newSession(consistencyLevel: Int): CassandraSession = {
val socket = transportPool.borrowObject
- val cassandraClient = new Cassandra.Client(inputProtocol(socket), outputProtocol(socket))
+ val cassandraClient = new Cassandra.Client(protocol(socket), protocol(socket))
val cassandraSchema = cassandraClient.describe_keyspace(space)
new CassandraSession {
val keyspace = space
@@ -287,15 +311,15 @@ trait PoolBridge[T, OP <: ObjectPool] extends Pool[T] {
}
object StackPool {
- def apply[T](factory: PoolItemFactory[T]) = new PoolBridge[T,StackObjectPool] {
+ def apply[T](factory: PoolItemFactory[T]) = new PoolBridge[T, StackObjectPool] {
val impl = new StackObjectPool(toPoolableObjectFactory(factory))
}
- def apply[T](factory: PoolItemFactory[T], maxIdle: Int) = new PoolBridge[T,StackObjectPool] {
+ def apply[T](factory: PoolItemFactory[T], maxIdle: Int) = new PoolBridge[T, StackObjectPool] {
val impl = new StackObjectPool(toPoolableObjectFactory(factory),maxIdle)
}
- def apply[T](factory: PoolItemFactory[T], maxIdle: Int, initIdleCapacity: Int) = new PoolBridge[T,StackObjectPool] {
+ def apply[T](factory: PoolItemFactory[T], maxIdle: Int, initIdleCapacity: Int) = new PoolBridge[T, StackObjectPool] {
val impl = new StackObjectPool(toPoolableObjectFactory(factory),maxIdle,initIdleCapacity)
}
}
diff --git a/akka-persistence/src/main/scala/CassandraStorage.scala b/akka-persistence/src/main/scala/CassandraStorage.scala
index 1ac84b77b1..c425151ad7 100644
--- a/akka-persistence/src/main/scala/CassandraStorage.scala
+++ b/akka-persistence/src/main/scala/CassandraStorage.scala
@@ -9,7 +9,7 @@ import java.io.{Flushable, Closeable}
import util.Logging
import util.Helpers._
import serialization.Serializer
-import akka.Config.config
+import Config.config
import org.apache.cassandra.db.ColumnFamily
import org.apache.cassandra.service._
@@ -107,7 +107,9 @@ object CassandraStorage extends MapStorage
}
}
+ // FIXME implement
def insertVectorStorageEntriesFor(name: String, elements: List[AnyRef]) = {
+ throw new UnsupportedOperationException("insertVectorStorageEntriesFor for CassandraStorage is not implemented yet")
}
def updateVectorStorageEntryFor(name: String, index: Int, elem: AnyRef) = {
@@ -128,7 +130,7 @@ object CassandraStorage extends MapStorage
else throw new NoSuchElementException("No element for vector [" + name + "] and index [" + index + "]")
}
- def getVectorStorageRangeFor(name: String, start: Option[Int], finish: Option[Int], count: Int): RandomAccessSeq[AnyRef] = {
+ def getVectorStorageRangeFor(name: String, start: Option[Int], finish: Option[Int], count: Int): List[AnyRef] = {
val startBytes = if (start.isDefined) intToBytes(start.get) else null
val finishBytes = if (finish.isDefined) intToBytes(finish.get) else null
val columns: List[Column] = sessions.withSession {
@@ -139,9 +141,7 @@ object CassandraStorage extends MapStorage
count,
CONSISTENCY_LEVEL)
}
- val buffer = new ArrayBuffer[AnyRef]
- for (elem <- columns.map(column => serializer.in(column.value, None))) buffer.append(elem)
- buffer
+ columns.map(column => serializer.in(column.value, None))
}
def getVectorStorageSizeFor(name: String): Int = {
diff --git a/akka-persistence/src/main/scala/MongoStorage.scala b/akka-persistence/src/main/scala/MongoStorage.scala
index a8c921a44b..f9f566c92f 100644
--- a/akka-persistence/src/main/scala/MongoStorage.scala
+++ b/akka-persistence/src/main/scala/MongoStorage.scala
@@ -4,15 +4,13 @@
package se.scalablesolutions.akka.state
-import akka.util.Logging
-import serialization.{Serializer}
+import util.Logging
import Config.config
+
import sjson.json.Serializer._
import com.mongodb._
-import scala.collection.mutable.ArrayBuffer
-
import java.util.{Map=>JMap, List=>JList, ArrayList=>JArrayList}
/**
@@ -53,17 +51,17 @@ object MongoStorage extends MapStorage with VectorStorage with RefStorage with L
// FIXME: make this pluggable
private[this] val serializer = SJSON
- def insertMapStorageEntryFor(name: String,
- key: AnyRef, value: AnyRef) {
+ def insertMapStorageEntryFor(name: String, key: AnyRef, value: AnyRef) {
insertMapStorageEntriesFor(name, List((key, value)))
}
- def insertMapStorageEntriesFor(name: String,
- entries: List[Tuple2[AnyRef, AnyRef]]) {
+ def insertMapStorageEntriesFor(name: String, entries: List[Tuple2[AnyRef, AnyRef]]) {
import java.util.{Map, HashMap}
val m: Map[AnyRef, AnyRef] = new HashMap
- for ((k, v) <- entries) m.put(k, serializer.out(v))
+ for ((k, v) <- entries) {
+ m.put(k, serializer.out(v))
+ }
nullSafeFindOne(name) match {
case None =>
@@ -102,10 +100,9 @@ object MongoStorage extends MapStorage with VectorStorage with RefStorage with L
}
}
- def getMapStorageEntryFor(name: String, key: AnyRef): Option[AnyRef] = {
+ def getMapStorageEntryFor(name: String, key: AnyRef): Option[AnyRef] =
getValueForKey(name, key.asInstanceOf[String])
- }
-
+
def getMapStorageSizeFor(name: String): Int = {
nullSafeFindOne(name) match {
case None => 0
@@ -115,21 +112,26 @@ object MongoStorage extends MapStorage with VectorStorage with RefStorage with L
}
def getMapStorageFor(name: String): List[Tuple2[AnyRef, AnyRef]] = {
- val m = nullSafeFindOne(name) match {
+ val m =
+ nullSafeFindOne(name) match {
case None =>
throw new Predef.NoSuchElementException(name + " not present")
case Some(dbo) =>
dbo.get(VALUE).asInstanceOf[JMap[String, AnyRef]]
}
- val n = List(m.keySet.toArray: _*).asInstanceOf[List[String]]
- val vals = for(s <- n) yield (s, serializer.in[AnyRef](m.get(s).asInstanceOf[Array[Byte]]))
+ val n =
+ List(m.keySet.toArray: _*).asInstanceOf[List[String]]
+ val vals =
+ for(s <- n)
+ yield (s, serializer.in[AnyRef](m.get(s).asInstanceOf[Array[Byte]]))
vals.asInstanceOf[List[Tuple2[String, AnyRef]]]
}
def getMapStorageRangeFor(name: String, start: Option[AnyRef],
finish: Option[AnyRef],
count: Int): List[Tuple2[AnyRef, AnyRef]] = {
- val m = nullSafeFindOne(name) match {
+ val m =
+ nullSafeFindOne(name) match {
case None =>
throw new Predef.NoSuchElementException(name + " not present")
case Some(dbo) =>
@@ -149,8 +151,11 @@ object MongoStorage extends MapStorage with VectorStorage with RefStorage with L
}
else count
- val n = List(m.keySet.toArray: _*).asInstanceOf[List[String]].sort((e1, e2) => (e1 compareTo e2) < 0).slice(s, s + cnt)
- val vals = for(s <- n) yield (s, serializer.in[AnyRef](m.get(s).asInstanceOf[Array[Byte]]))
+ val n =
+ List(m.keySet.toArray: _*).asInstanceOf[List[String]].sort((e1, e2) => (e1 compareTo e2) < 0).slice(s, s + cnt)
+ val vals =
+ for(s <- n)
+ yield (s, serializer.in[AnyRef](m.get(s).asInstanceOf[Array[Byte]]))
vals.asInstanceOf[List[Tuple2[String, AnyRef]]]
}
@@ -159,15 +164,17 @@ object MongoStorage extends MapStorage with VectorStorage with RefStorage with L
nullSafeFindOne(name) match {
case None => None
case Some(dbo) =>
- Some(serializer.in[AnyRef](dbo.get(VALUE).asInstanceOf[JMap[String, AnyRef]].get(key).asInstanceOf[Array[Byte]]))
+ Some(serializer.in[AnyRef](
+ dbo.get(VALUE)
+ .asInstanceOf[JMap[String, AnyRef]]
+ .get(key).asInstanceOf[Array[Byte]]))
}
} catch {
- case e => throw new Predef.NoSuchElementException(e.getMessage)
+ case e =>
+ throw new Predef.NoSuchElementException(e.getMessage)
}
}
- def updateVectorStorageEntryFor(name: String, index: Int, elem: AnyRef) = throw new UnsupportedOperationException("The updateVectorStorageEntryFor method is not yet implemented for MongoDB")
-
def insertVectorStorageEntriesFor(name: String, elements: List[AnyRef]) = {
val q = new BasicDBObject
q.put(KEY, name)
@@ -187,44 +194,63 @@ object MongoStorage extends MapStorage with VectorStorage with RefStorage with L
// add to the current list
elements.map(serializer.out(_)).foreach(currentList.add(_))
- coll.insert(new BasicDBObject().append(KEY, name).append(VALUE, currentList))
+
+ coll.insert(
+ new BasicDBObject()
+ .append(KEY, name)
+ .append(VALUE, currentList)
+ )
}
- def insertVectorStorageEntryFor(name: String, element: AnyRef) = insertVectorStorageEntriesFor(name, List(element))
+ def insertVectorStorageEntryFor(name: String, element: AnyRef) = {
+ insertVectorStorageEntriesFor(name, List(element))
+ }
def getVectorStorageEntryFor(name: String, index: Int): AnyRef = {
try {
- val o = nullSafeFindOne(name) match {
+ val o =
+ nullSafeFindOne(name) match {
case None =>
throw new Predef.NoSuchElementException(name + " not present")
+
case Some(dbo) =>
dbo.get(VALUE).asInstanceOf[JList[AnyRef]]
}
- serializer.in[AnyRef](o.get(index).asInstanceOf[Array[Byte]])
+ serializer.in[AnyRef](
+ o.get(index).asInstanceOf[Array[Byte]])
} catch {
- case e => throw new Predef.NoSuchElementException(e.getMessage)
+ case e =>
+ throw new Predef.NoSuchElementException(e.getMessage)
}
}
- def getVectorStorageRangeFor(name: String, start: Option[Int], finish: Option[Int], count: Int): RandomAccessSeq[AnyRef] = {
+ def getVectorStorageRangeFor(name: String,
+ start: Option[Int], finish: Option[Int], count: Int): List[AnyRef] = {
try {
- val o = nullSafeFindOne(name) match {
+ val o =
+ nullSafeFindOne(name) match {
case None =>
throw new Predef.NoSuchElementException(name + " not present")
+
case Some(dbo) =>
dbo.get(VALUE).asInstanceOf[JList[AnyRef]]
}
// pick the subrange and make a Scala list
- val l = List(o.subList(start.get, start.get + count).toArray: _*)
- val buffer = new ArrayBuffer[AnyRef]
- for (elem <- l.map(e => serializer.in[AnyRef](e.asInstanceOf[Array[Byte]]))) buffer.append(elem)
- buffer
+ val l =
+ List(o.subList(start.get, start.get + count).toArray: _*)
+
+ for(e <- l)
+ yield serializer.in[AnyRef](e.asInstanceOf[Array[Byte]])
} catch {
- case e => throw new Predef.NoSuchElementException(e.getMessage)
+ case e =>
+ throw new Predef.NoSuchElementException(e.getMessage)
}
}
+ // FIXME implement updateVectorStorageEntryFor
+ def updateVectorStorageEntryFor(name: String, index: Int, elem: AnyRef) = throw new UnsupportedOperationException
+
def getVectorStorageSizeFor(name: String): Int = {
nullSafeFindOne(name) match {
case None => 0
@@ -248,7 +274,10 @@ object MongoStorage extends MapStorage with VectorStorage with RefStorage with L
coll.remove(q)
}
}
- coll.insert(new BasicDBObject().append(KEY, name).append(VALUE, serializer.out(element)))
+ coll.insert(
+ new BasicDBObject()
+ .append(KEY, name)
+ .append(VALUE, serializer.out(element)))
}
def getRefStorageFor(name: String): Option[AnyRef] = {
@@ -258,4 +287,4 @@ object MongoStorage extends MapStorage with VectorStorage with RefStorage with L
Some(serializer.in[AnyRef](dbo.get(VALUE).asInstanceOf[Array[Byte]]))
}
}
-}
+}
\ No newline at end of file
diff --git a/akka-persistence/src/main/scala/PersistentState.scala b/akka-persistence/src/main/scala/PersistentState.scala
index 61fa249cff..56096df810 100644
--- a/akka-persistence/src/main/scala/PersistentState.scala
+++ b/akka-persistence/src/main/scala/PersistentState.scala
@@ -70,7 +70,7 @@ trait PersistentMap extends scala.collection.mutable.Map[AnyRef, AnyRef] with Tr
// to be concretized in subclasses
val storage: MapStorage
- def commit = {
+ private[akka] def commit = {
storage.removeMapStorageFor(uuid, removedEntries.toList)
storage.insertMapStorageEntriesFor(uuid, newAndUpdatedEntries.toList)
if (shouldClearOnCommit.isDefined & shouldClearOnCommit.get.get) storage.removeMapStorageFor(uuid)
@@ -162,7 +162,7 @@ trait PersistentVector extends RandomAccessSeq[AnyRef] with Transactional {
val storage: VectorStorage
- def commit = {
+ private[akka] def commit = {
// FIXME: should use batch function once the bug is resolved
for (element <- newElems) storage.insertVectorStorageEntryFor(uuid, element)
for (entry <- updatedElems) storage.updateVectorStorageEntryFor(uuid, entry._1, entry._2)
@@ -183,8 +183,11 @@ trait PersistentVector extends RandomAccessSeq[AnyRef] with Transactional {
override def slice(start: Int, count: Int): RandomAccessSeq[AnyRef] = slice(Some(start), None, count)
- def slice(start: Option[Int], finish: Option[Int], count: Int): RandomAccessSeq[AnyRef] =
- storage.getVectorStorageRangeFor(uuid, start, finish, count)
+ def slice(start: Option[Int], finish: Option[Int], count: Int): RandomAccessSeq[AnyRef] = {
+ val buffer = new scala.collection.mutable.ArrayBuffer[AnyRef]
+ storage.getVectorStorageRangeFor(uuid, start, finish, count).foreach(buffer.append(_))
+ buffer
+ }
/**
* Removes the tail element of this vector.
@@ -236,7 +239,7 @@ trait PersistentRef extends Transactional {
val storage: RefStorage
- def commit = if (ref.isDefined) {
+ private[akka] def commit = if (ref.isDefined) {
storage.insertRefStorageFor(uuid, ref.get)
ref.swap(null)
}
diff --git a/akka-persistence/src/main/scala/Storage.scala b/akka-persistence/src/main/scala/Storage.scala
index 26765c0b6d..52dc45afa7 100644
--- a/akka-persistence/src/main/scala/Storage.scala
+++ b/akka-persistence/src/main/scala/Storage.scala
@@ -25,7 +25,7 @@ trait VectorStorage extends Storage {
def insertVectorStorageEntriesFor(name: String, elements: List[AnyRef])
def updateVectorStorageEntryFor(name: String, index: Int, elem: AnyRef)
def getVectorStorageEntryFor(name: String, index: Int): AnyRef
- def getVectorStorageRangeFor(name: String, start: Option[Int], finish: Option[Int], count: Int): RandomAccessSeq[AnyRef]
+ def getVectorStorageRangeFor(name: String, start: Option[Int], finish: Option[Int], count: Int): List[AnyRef]
def getVectorStorageSizeFor(name: String): Int
}
diff --git a/akka-persistence/src/test/scala/MongoPersistentActorSpec.scala b/akka-persistence/src/test/scala/MongoPersistentActorSpec.scala
index dc060224f6..de422919f9 100644
--- a/akka-persistence/src/test/scala/MongoPersistentActorSpec.scala
+++ b/akka-persistence/src/test/scala/MongoPersistentActorSpec.scala
@@ -26,6 +26,150 @@ case class MultiDebit(accountNo: String, amounts: List[BigInt], failer: Actor)
case class Credit(accountNo: String, amount: BigInt)
case object LogSize
+class BankAccountActor extends Actor {
+ makeTransactionRequired
+
+ private var accountState: PersistentMap = _
+ private var txnLog: PersistentVector = _
+ override def initializeTransactionalState = {
+ accountState = PersistentState.newMap(MongoStorageConfig())
+ txnLog = PersistentState.newVector(MongoStorageConfig())
+ }
+
+ def receive: PartialFunction[Any, Unit] = {
+ // check balance
+ case Balance(accountNo) =>
+ txnLog.add("Balance:" + accountNo)
+ reply(accountState.get(accountNo).get)
+
+ // debit amount: can fail
+ case Debit(accountNo, amount, failer) =>
+ txnLog.add("Debit:" + accountNo + " " + amount)
+ val m: BigInt =
+ accountState.get(accountNo) match {
+ case None => 0
+ case Some(v) => v.asInstanceOf[BigInt]
+ }
+ accountState.put(accountNo, (m - amount))
+ if (amount > m)
+ failer !! "Failure"
+ reply(m - amount)
+
+ // many debits: can fail
+ // demonstrates true rollback even if multiple puts have been done
+ case MultiDebit(accountNo, amounts, failer) =>
+ txnLog.add("MultiDebit:" + accountNo + " " + amounts.map(_.intValue).foldLeft(0)(_ + _))
+ val m: BigInt =
+ accountState.get(accountNo) match {
+ case None => 0
+ case Some(v) => BigInt(v.asInstanceOf[String])
+ }
+ var bal: BigInt = 0
+ amounts.foreach {amount =>
+ bal = bal + amount
+ accountState.put(accountNo, (m - bal))
+ }
+ if (bal > m) failer !! "Failure"
+ reply(m - bal)
+
+ // credit amount
+ case Credit(accountNo, amount) =>
+ txnLog.add("Credit:" + accountNo + " " + amount)
+ val m: BigInt =
+ accountState.get(accountNo) match {
+ case None => 0
+ case Some(v) => v.asInstanceOf[BigInt]
+ }
+ accountState.put(accountNo, (m + amount))
+ reply(m + amount)
+
+ case LogSize =>
+ reply(txnLog.length.asInstanceOf[AnyRef])
+ }
+}
+
+class MongoPersistentActorSpec extends TestCase {
+ @Test
+ def testSuccessfulDebit = {
+ val bactor = new BankAccountActor
+ bactor.start
+ val failer = new PersistentFailerActor
+ failer.start
+ bactor !! Credit("a-123", 5000)
+ bactor !! Debit("a-123", 3000, failer)
+ val b = (bactor !! Balance("a-123"))
+ assertTrue(b.isDefined)
+ assertEquals(BigInt(2000), b.get)
+
+ bactor !! Credit("a-123", 7000)
+ val b1 = (bactor !! Balance("a-123"))
+ assertTrue(b1.isDefined)
+ assertEquals(BigInt(9000), b1.get)
+
+ bactor !! Debit("a-123", 8000, failer)
+ val b2 = (bactor !! Balance("a-123"))
+ assertTrue(b2.isDefined)
+ assertEquals(BigInt(1000), b2.get)
+ assertEquals(7, (bactor !! LogSize).get)
+ }
+
+ @Test
+ def testUnsuccessfulDebit = {
+ val bactor = new BankAccountActor
+ bactor.start
+ bactor !! Credit("a-123", 5000)
+
+ val b = (bactor !! Balance("a-123"))
+ assertTrue(b.isDefined)
+ assertEquals(BigInt(5000), b.get)
+
+ val failer = new PersistentFailerActor
+ failer.start
+ try {
+ bactor !! Debit("a-123", 7000, failer)
+ fail("should throw exception")
+ } catch { case e: RuntimeException => {}}
+
+ val b1 = (bactor !! Balance("a-123"))
+ assertTrue(b1.isDefined)
+ assertEquals(BigInt(5000), b1.get)
+
+ // should not count the failed one
+ assertEquals(3, (bactor !! LogSize).get)
+ }
+
+ @Test
+ def testUnsuccessfulMultiDebit = {
+ val bactor = new BankAccountActor
+ bactor.start
+ bactor !! Credit("a-123", 5000)
+ val b = (bactor !! Balance("a-123"))
+ assertTrue(b.isDefined)
+ assertEquals(BigInt(5000), b.get)
+
+ val failer = new PersistentFailerActor
+ failer.start
+ try {
+ bactor !! MultiDebit("a-123", List(500, 2000, 1000, 3000), failer)
+ fail("should throw exception")
+ } catch { case e: RuntimeException => {}}
+
+ val b1 = (bactor !! Balance("a-123"))
+ assertTrue(b1.isDefined)
+ assertEquals(BigInt(5000), b1.get)
+
+ // should not count the failed one
+ assertEquals(3, (bactor !! LogSize).get)
+ }
+}
+
+/*
+case class Balance(accountNo: String)
+case class Debit(accountNo: String, amount: BigInt, failer: Actor)
+case class MultiDebit(accountNo: String, amounts: List[BigInt], failer: Actor)
+case class Credit(accountNo: String, amount: BigInt)
+case object LogSize
+
class BankAccountActor extends Actor {
makeTransactionRequired
@@ -49,6 +193,7 @@ class BankAccountActor extends Actor {
accountState.get(accountNo) match {
case None => 0
case Some(v) => {
+ println("======= " + v)
val JsNumber(n) = v.asInstanceOf[JsValue]
BigInt(n.toString)
}
@@ -168,3 +313,4 @@ class MongoPersistentActorSpec extends TestCase {
assertEquals(3, (bactor !! LogSize).get)
}
}
+*/
\ No newline at end of file
diff --git a/akka.iml b/akka.iml
index 4a4e9fc451..f748697a86 100644
--- a/akka.iml
+++ b/akka.iml
@@ -1,5 +1,10 @@