diff --git a/LICENSE b/LICENSE index ec8ad0882e..c17d7ab321 100755 --- a/LICENSE +++ b/LICENSE @@ -1,15 +1,20 @@ This software is licensed under the Apache 2 license, quoted below. -Copyright 2009 Scalable Solutions AB +Copyright 2009 Scalable Solutions AB [http://scalablesolutions.se] Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 + [http://www.apache.org/licenses/LICENSE-2.0] Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. + +--------------- + +Licenses for dependency projects can be found here: +[http://wiki.github.com/jboner/akka/licenses] \ No newline at end of file diff --git a/akka-actors/pom.xml b/akka-actors/pom.xml index cb76e1fbb5..3b8564235a 100644 --- a/akka-actors/pom.xml +++ b/akka-actors/pom.xml @@ -62,6 +62,11 @@ javautils 2.7.4-0.1 + + org.multiverse + multiverse + 0.3 + diff --git a/akka-actors/src/main/scala/serialization/Serializer.scala b/akka-actors/src/main/scala/serialization/Serializer.scala index 2d2917e0b5..10b781fbf9 100644 --- a/akka-actors/src/main/scala/serialization/Serializer.scala +++ b/akka-actors/src/main/scala/serialization/Serializer.scala @@ -9,7 +9,7 @@ import java.io.{ObjectOutputStream, ByteArrayOutputStream, ObjectInputStream, By import reflect.{BeanProperty, Manifest} import sbinary.DefaultProtocol import org.codehaus.jackson.map.ObjectMapper -import sjson.json.{Serializer=>SJSONSerializer} +import sjson.json.{Serializer =>SJSONSerializer} /** * @author Jonas Bonér diff --git a/akka-actors/src/main/scala/stm/ChangeSet.scala b/akka-actors/src/main/scala/stm/ChangeSet.scala deleted file mode 100644 index 440df8c18d..0000000000 --- a/akka-actors/src/main/scala/stm/ChangeSet.scala +++ /dev/null @@ -1,29 +0,0 @@ -/** - * Copyright (C) 2009 Scalable Solutions. - */ - -package se.scalablesolutions.akka.stm - -import state.{Transactional, TransactionalMap} -import util.Helpers.ReadWriteLock -import scala.collection.immutable.HashSet - -@serializable -class ChangeSet { - private val lock = new ReadWriteLock - - private var transactionalItems: Set[Transactional] = new HashSet - private[akka] def +(item: Transactional) = lock.withWriteLock { - transactionalItems += item - } - private[akka] def items: List[Transactional] = lock.withReadLock { - transactionalItems.toList.asInstanceOf[List[Transactional]] - } - - private[akka] def clear = lock.withWriteLock { - transactionalItems = new HashSet - } - - // FIXME: add hashCode and equals - VERY IMPORTANT -} - diff --git a/akka-actors/src/main/scala/stm/HashTrie.scala b/akka-actors/src/main/scala/stm/HashTrie.scala index 02ad19ffe9..44d83d127b 100755 --- a/akka-actors/src/main/scala/stm/HashTrie.scala +++ b/akka-actors/src/main/scala/stm/HashTrie.scala @@ -34,6 +34,8 @@ package se.scalablesolutions.akka.collection +trait PersistentDataStructure + /** * A clean-room port of Rich Hickey's persistent hash trie implementation from * Clojure (http://clojure.org). Originally presented as a mutable structure in @@ -43,7 +45,7 @@ package se.scalablesolutions.akka.collection * @author Rich Hickey */ @serializable -final class HashTrie[K, +V] private (root: Node[K, V]) extends Map[K, V] { +final class HashTrie[K, +V] private (root: Node[K, V]) extends Map[K, V] with PersistentDataStructure { lazy val size = root.size def this() = this(new EmptyNode[K]) diff --git a/akka-actors/src/main/scala/stm/Transaction.scala b/akka-actors/src/main/scala/stm/Transaction.scala index f7aba599d8..8e5174da81 100644 --- a/akka-actors/src/main/scala/stm/Transaction.scala +++ b/akka-actors/src/main/scala/stm/Transaction.scala @@ -12,6 +12,7 @@ import org.multiverse.api.{Transaction => MultiverseTransaction} import org.multiverse.stms.alpha.AlphaStm import org.multiverse.utils.GlobalStmInstance import org.multiverse.utils.TransactionThreadLocal._ +import org.multiverse.templates.{OrElseTemplate, AtomicTemplate} import java.util.concurrent.atomic.AtomicInteger @@ -23,7 +24,34 @@ object Multiverse { GlobalStmInstance.set(STM) setThreadLocalTransaction(null) } - + +/** + * Example of Or-Else transaction management. + *
+ * import se.scalablesolutions.akka.stm.{Transaction => Tx}
+ * Tx.Or {
+ *   .. // try to do something
+ * } Tx.Else {
+ *   .. // if transaction clashes try do do something else to minimize contention
+ * }
+ * 
+ * + * @author Jonas Bonér + */ +object Transaction { + + // -- Monad -------------------------- + + + // -- OrElse -------------------------- + def Or[A](orBody: => A) = elseBody(orBody) + def elseBody[A](orBody: => A) = new { + def Else(elseBody: => A) = new OrElseTemplate[A] { + def run(t: MultiverseTransaction) = orBody + def orelserun(t: MultiverseTransaction) = elseBody + }.execute + } +} /** * @author Jonas Bonér */ @@ -31,9 +59,7 @@ object Multiverse { private[this] var _id = 0L def id = _id @volatile private[this] var status: TransactionStatus = TransactionStatus.New - private[kernel] var transaction: MultiverseTransaction = _ - - private[this] val transactionalItems = new ChangeSet + private[akka] var transaction: MultiverseTransaction = _ private[this] var participants: List[String] = Nil private[this] var precommitted: List[String] = Nil @@ -46,12 +72,11 @@ object Multiverse { def register(transactional: Transactional) = synchronized { ensureIsActiveOrNew - transactionalItems + transactional } def begin(participant: String) = synchronized { ensureIsActiveOrNew - transaction = Multiverse.STM.startUpdateTransaction + transaction = Multiverse.STM.startUpdateTransaction("akka") _id = transaction.getReadVersion log.debug("Creating a new transaction with id [%s]", _id) @@ -80,7 +105,6 @@ object Multiverse { } else false if (haveAllPreCommitted && transaction != null) { transaction.commit - transactionalItems.items.foreach(_.commit) status = TransactionStatus.Completed reset true @@ -118,7 +142,6 @@ object Multiverse { def isAborted = status == TransactionStatus.Aborted private def reset = { - transactionalItems.clear participants = Nil precommitted = Nil } @@ -133,7 +156,7 @@ object Multiverse { throw new IllegalStateException("Expected ACTIVE or NEW transaction - current status [" + status + "]: " + toString) // For reinitialize transaction after sending it over the wire - private[kernel] def reinit = synchronized { + private[akka] def reinit = synchronized { import net.lag.logging.{Logger, Level} if (log == null) { log = Logger.get(this.getClass.getName) @@ -155,31 +178,6 @@ object Multiverse { } - - - - - - - - - - - - - - - - - - - - - - - - - @serializable sealed abstract class TransactionStatus object TransactionStatus { case object New extends TransactionStatus diff --git a/akka-actors/src/main/scala/stm/TransactionManagement.scala b/akka-actors/src/main/scala/stm/TransactionManagement.scala index 37ebeb4e29..fc7d1b0334 100644 --- a/akka-actors/src/main/scala/stm/TransactionManagement.scala +++ b/akka-actors/src/main/scala/stm/TransactionManagement.scala @@ -50,7 +50,6 @@ trait TransactionManagement extends Logging { activeTx = tx threadBoundTx.set(tx) setThreadLocalTransaction(tx.get.transaction) - println("------ Start: " + tx.get.transaction) tx } diff --git a/akka-actors/src/main/scala/stm/TransactionalState.scala b/akka-actors/src/main/scala/stm/TransactionalState.scala index c671a72ea5..b562e73532 100644 --- a/akka-actors/src/main/scala/stm/TransactionalState.scala +++ b/akka-actors/src/main/scala/stm/TransactionalState.scala @@ -4,6 +4,7 @@ package se.scalablesolutions.akka.state +import org.multiverse.datastructures.refs.manual.Ref import stm.TransactionManagement import akka.collection._ @@ -17,6 +18,14 @@ import scala.collection.mutable.{ArrayBuffer, HashMap} * Example Scala usage: *
  * val myMap = TransactionalState.newMap
+ * val myVector = TransactionalState.newVector
+ * val myRef = TransactionalState.newRef
+ * 
+ * Or: + *
+ * val myMap = TransactionalMap()
+ * val myVector = TransactionalVector()
+ * val myRef = TransactionalRef()
  * 
*/ object TransactionalState extends TransactionalState @@ -31,9 +40,9 @@ object TransactionalState extends TransactionalState * */ class TransactionalState { - def newMap[K, V]: TransactionalMap[K, V] = new InMemoryTransactionalMap[K, V] - def newVector[T]: TransactionalVector[T] = new InMemoryTransactionalVector[T] - def newRef[T]: TransactionalRef[T] = new TransactionalRef[T] + def newMap[K, V] = new TransactionalMap[K, V] + def newVector[T] = new TransactionalVector[T] + def newRef[T] = new TransactionalRef[T] } /** @@ -43,203 +52,175 @@ class TransactionalState { trait Transactional { // FIXME: won't work across the cluster val uuid = Uuid.newUuid.toString - - private[akka] def begin - private[akka] def commit - private[akka] def rollback - - protected def verifyTransaction = { - val cflowTx = TransactionManagement.threadBoundTx.get - if (!cflowTx.isDefined) { - throw new IllegalStateException("Can't access transactional reference outside the scope of a transaction [" + this + "]") - } else { - cflowTx.get.register(this) - } - } } /** - * Base trait for all state implementations (persistent or in-memory). - * - * FIXME: Create Java versions using pcollections - * - * @author Jonas Bonér - */ -trait TransactionalMap[K, V] extends Transactional with scala.collection.mutable.Map[K, V] { - override def hashCode: Int = System.identityHashCode(this); - override def equals(other: Any): Boolean = false - def remove(key: K) -} - -/** - * Not thread-safe, but should only be using from within an Actor, e.g. one single thread at a time. - * - * @author Jonas Bonér - */ -class InMemoryTransactionalMap[K, V] extends TransactionalMap[K, V] { - protected[akka] var state = new HashTrie[K, V] - protected[akka] var snapshot = state - - // ---- For Transactional ---- - override def begin = snapshot = state - override def commit = snapshot = state - override def rollback = state = snapshot - - // ---- Overriding scala.collection.mutable.Map behavior ---- - override def contains(key: K): Boolean = { - verifyTransaction - state.contains(key) - } - - override def clear = { - verifyTransaction - state = new HashTrie[K, V] - } - - override def size: Int = { - verifyTransaction - state.size - } - - // ---- For scala.collection.mutable.Map ---- - override def remove(key: K) = { - verifyTransaction - state = state - key - } - - override def elements: Iterator[(K, V)] = { -// verifyTransaction - state.elements - } - - override def get(key: K): Option[V] = { - verifyTransaction - state.get(key) - } - - override def put(key: K, value: V): Option[V] = { - verifyTransaction - val oldValue = state.get(key) - state = state.update(key, value) - oldValue - } - - override def -=(key: K) = { - verifyTransaction - remove(key) - } - - override def update(key: K, value: V) = { - verifyTransaction - put(key, value) - } -} - -/** - * Base for all transactional vector implementations. - * - * @author Jonas Bonér - */ -abstract class TransactionalVector[T] extends Transactional with RandomAccessSeq[T] { - override def hashCode: Int = System.identityHashCode(this); - override def equals(other: Any): Boolean = false - - def add(elem: T) - - def get(index: Int): T - - def getRange(start: Int, count: Int): List[T] -} - -/** - * Implements an in-memory transactional vector. - * - * Not thread-safe, but should only be using from within an Actor, e.g. one single thread at a time. - * - * @author Jonas Bonér - */ -class InMemoryTransactionalVector[T] extends TransactionalVector[T] { - private[akka] var state: Vector[T] = EmptyVector - private[akka] var snapshot = state - - def add(elem: T) = { - verifyTransaction - state = state + elem - } - - def get(index: Int): T = { - verifyTransaction - state(index) - } - - def getRange(start: Int, count: Int): List[T] = { - verifyTransaction - state.slice(start, count).toList.asInstanceOf[List[T]] - } - - // ---- For Transactional ---- - override def begin = snapshot = state - - override def commit = snapshot = state - - override def rollback = state = snapshot - - // ---- For Seq ---- - def length: Int = { - verifyTransaction - state.length - } - - def apply(index: Int): T = { - verifyTransaction - state(index) - } - - override def elements: Iterator[T] = { - //verifyTransaction - state.elements - } - - override def toList: List[T] = { - verifyTransaction - state.toList - } -} - -/** - * Implements a transactional reference. - * - * Not thread-safe, but should only be using from within an Actor, e.g. one single thread at a time. + * Implements a transactional managed reference. * * @author Jonas Bonér */ class TransactionalRef[T] extends Transactional { - private[akka] var ref: Option[T] = None - private[akka] var snapshot: Option[T] = None - - override def begin = if (ref.isDefined) snapshot = Some(ref.get) - - override def commit = if (ref.isDefined) snapshot = Some(ref.get) - - override def rollback = if (snapshot.isDefined) ref = Some(snapshot.get) - + protected[this] var ref: Option[Ref[T]] = None + + def set(elem: T) = swap(elem) + def swap(elem: T) = { - verifyTransaction - ref = Some(elem) + synchronized { if (ref.isEmpty) ref = Some(new Ref[T]) } + ref.get.set(elem) + } + + def get: Option[T] = + if (isEmpty) None + else Some(ref.get.get) + + def getOrWait: T = { + synchronized { if (ref.isEmpty) ref = Some(new Ref[T]) } + ref.get.getOrAwait + } + + def getOrElse(default: => T): T = + if (isEmpty) default + else ref.get.get + + def isDefined: Boolean = ref.isDefined && !ref.get.isNull + + def isEmpty: Boolean = !isDefined +} + +object TransactionalRef { + def apply[T](elem: T) = { + if (elem == null) throw new IllegalArgumentException("Can't define TransactionalRef with a null initial value, needs to be a PersistentDataStructure") + val ref = new TransactionalRef[T] + ref.swap(elem) + ref + } +} + +/** + * Implements an in-memory transactional Map based on Clojure's PersistentMap. + * + * Not thread-safe, but should only be using from within an Actor, e.g. one single thread at a time. + * + * @author Jonas Bonér + */ +class TransactionalMap[K, V] extends Transactional with scala.collection.mutable.Map[K, V] { + protected[this] val ref = TransactionalRef[HashTrie[K, V]](new HashTrie[K, V]) + + def -=(key: K) = remove(key) + + def +=(key: K, value: V) = put(key, value) + + def remove(key: K) = ref.swap(ref.get.get - key) + + def apply(key: K): Option[V] = get(key) + + def get(key: K): Option[V] = ref.get.get.get(key) + + override def put(key: K, value: V): Option[V] = { + val map = ref.get.get + val oldValue = map.get(key) + ref.swap(map.update(key, value)) + oldValue + } + + def update(key: K, value: V) = { + val map = ref.get.get + val oldValue = map.get(key) + ref.swap(map.update(key, value)) + } + + def elements: Iterator[(K, V)] = ref.get.get.elements + + override def contains(key: K): Boolean = ref.get.get.contains(key) + + override def clear = ref.swap(new HashTrie[K, V]) + + def size: Int = ref.get.get.size + + override def hashCode: Int = System.identityHashCode(this); + + override def equals(other: Any): Boolean = + other.isInstanceOf[TransactionalMap[_, _]] && + other.hashCode == hashCode +} + +object TransactionalMap { + def apply[T]() = new TransactionalMap +} + +/** + * Implements an in-memory transactional Vector based on Clojure's PersistentVector. + * + * Not thread-safe, but should only be using from within an Actor, e.g. one single thread at a time. + * + * @author Jonas Bonér + */ +class TransactionalVector[T] extends Transactional with RandomAccessSeq[T] { + private[this] val ref = TransactionalRef[Vector[T]](EmptyVector) + + def clear = ref.swap(EmptyVector) + + def +(elem: T) = add(elem) + + def add(elem: T) = ref.swap(ref.get.get + elem) + + def get(index: Int): T = ref.get.get.apply(index) + + /** + * Removes the tail element of this vector. + */ + def pop = ref.swap(ref.get.get.pop) + + def update(index: Int, elem: T) = ref.swap(ref.get.get.update(index, elem)) + + def length: Int = ref.get.get.length + + def apply(index: Int): T = ref.get.get.apply(index) + + override def hashCode: Int = System.identityHashCode(this); + + override def equals(other: Any): Boolean = + other.isInstanceOf[TransactionalVector[_]] && + other.hashCode == hashCode +} + +object TransactionalVector { + def apply[T]() = new TransactionalVector +} + + +/* +class TransactionalRef[T] private(elem: T) extends Transactional { + private[this] val ref = new Ref[T](elem) + + def swap(elem: T) = { + println("----- setting ref: " + ref) + println("----- setting in thread: " + Thread.currentThread) + ref.set(elem) } def get: Option[T] = { - verifyTransaction - ref + if (ref.isNull) None + else Some(ref.get) } - + + def getOrWait: T = ref.getOrAwait + def getOrElse(default: => T): T = { - verifyTransaction - ref.getOrElse(default) + if (ref.isNull) default + else ref.get } - def isDefined: Boolean = { - verifyTransaction - ref.isDefined + def isDefined: Boolean = !ref.isNull + + def isEmpty: Boolean = ref.isNull +} + +object TransactionalRef { + def apply[T](elem: T) = { + if (elem == null) throw new IllegalArgumentException("Can't define TransactionalRef with a null initial value") + new TransactionalRef[T](elem) } } +*/ + diff --git a/akka-actors/src/main/scala/stm/Vector.scala b/akka-actors/src/main/scala/stm/Vector.scala index 9ead78e3b5..d743c82ddb 100644 --- a/akka-actors/src/main/scala/stm/Vector.scala +++ b/akka-actors/src/main/scala/stm/Vector.scala @@ -43,7 +43,8 @@ import Vector._ * @author Rich Hickey */ @serializable -class Vector[+T] private (val length: Int, shift: Int, root: Array[AnyRef], tail: Array[AnyRef]) extends RandomAccessSeq[T] { outer => +class Vector[+T] private (val length: Int, shift: Int, root: Array[AnyRef], tail: Array[AnyRef]) + extends RandomAccessSeq[T] with PersistentDataStructure { outer => private val tailOff = length - tail.length /* diff --git a/akka-actors/src/test/scala/InMemoryActorSpec.scala b/akka-actors/src/test/scala/InMemoryActorSpec.scala index 85e3e32599..d152522b6d 100644 --- a/akka-actors/src/test/scala/InMemoryActorSpec.scala +++ b/akka-actors/src/test/scala/InMemoryActorSpec.scala @@ -26,6 +26,7 @@ case class FailureOneWay(key: String, value: String, failer: Actor) class InMemStatefulActor extends Actor { timeout = 100000 makeTransactionRequired + //dispatcher = se.scalablesolutions.akka.reactor.Dispatchers.newThreadBasedDispatcher(this) private val mapState = TransactionalState.newMap[String, String] private val vectorState = TransactionalState.newVector[String] private val refState = TransactionalState.newRef[String] @@ -86,7 +87,7 @@ class InMemFailerActor extends Actor { } class InMemoryActorSpec extends TestCase { -/* + @Test def testOneWayMapShouldNotRollbackStateForStatefulServerInCaseOfSuccess = { val stateful = new InMemStatefulActor @@ -213,7 +214,7 @@ class InMemoryActorSpec extends TestCase { Thread.sleep(100) assertEquals("init", (stateful !! GetRefState).get) // check that state is == init state } - */ + @Test def testRefShouldRollbackStateForStatefulServerInCaseOfFailure = { val stateful = new InMemStatefulActor diff --git a/akka-kernel/pom.xml b/akka-kernel/pom.xml index f20026d7a7..66242c456d 100755 --- a/akka-kernel/pom.xml +++ b/akka-kernel/pom.xml @@ -95,15 +95,14 @@ 2.7.4-0.1
-<<<<<<< HEAD:kernel/pom.xml org.multiverse multiverse 0.3 -======= + + com.rabbitmq rabbitmq-client 0.9.1 ->>>>>>> master:akka-kernel/pom.xml diff --git a/akka-persistence/src/main/scala/CassandraSession.scala b/akka-persistence/src/main/scala/CassandraSession.scala index fc2121a33c..aeea811382 100644 --- a/akka-persistence/src/main/scala/CassandraSession.scala +++ b/akka-persistence/src/main/scala/CassandraSession.scala @@ -9,11 +9,14 @@ import java.io.{Flushable, Closeable} import util.Logging import util.Helpers._ import serialization.Serializer -import akka.Config.config +import Config.config import org.apache.cassandra.db.ColumnFamily import org.apache.cassandra.service._ +import org.apache.commons.pool._ +import org.apache.commons.pool.impl._ + import org.apache.thrift.transport._ import org.apache.thrift.protocol._ @@ -238,3 +241,89 @@ object Protocol { object SimpleJSON extends Protocol(new TSimpleJSONProtocol.Factory) object JSON extends Protocol(new TJSONProtocol.Factory) } + +trait Pool[T] extends java.io.Closeable { + def borrowObject: T + def returnObject(t: T): Unit + def invalidateObject(t: T): Unit + def addObject: Unit + def getNumIdle: Int + def getNumActive: Int + def clear: Unit + def setFactory(factory: PoolItemFactory[T]): Unit +} + +trait PoolFactory[T] { + def createPool: Pool[T] +} + +trait PoolItemFactory[T] { + def makeObject: T + def destroyObject(t: T): Unit + def validateObject(t: T): Boolean + def activateObject(t: T): Unit + def passivateObject(t: T): Unit +} + +trait PoolBridge[T, OP <: ObjectPool] extends Pool[T] { + val impl: OP + override def borrowObject: T = impl.borrowObject.asInstanceOf[T] + override def returnObject(t: T) = impl.returnObject(t) + override def invalidateObject(t: T) = impl.invalidateObject(t) + override def addObject = impl.addObject + override def getNumIdle: Int = impl.getNumIdle + override def getNumActive: Int = impl.getNumActive + override def clear: Unit = impl.clear + override def close: Unit = impl.close + override def setFactory(factory: PoolItemFactory[T]) = impl.setFactory(toPoolableObjectFactory(factory)) + + def toPoolableObjectFactory[T](pif: PoolItemFactory[T]) = new PoolableObjectFactory { + def makeObject: Object = pif.makeObject.asInstanceOf[Object] + def destroyObject(o: Object): Unit = pif.destroyObject(o.asInstanceOf[T]) + def validateObject(o: Object): Boolean = pif.validateObject(o.asInstanceOf[T]) + def activateObject(o: Object): Unit = pif.activateObject(o.asInstanceOf[T]) + def passivateObject(o: Object): Unit = pif.passivateObject(o.asInstanceOf[T]) + } +} + +object StackPool { + def apply[T](factory: PoolItemFactory[T]) = new PoolBridge[T,StackObjectPool] { + val impl = new StackObjectPool(toPoolableObjectFactory(factory)) + } + + def apply[T](factory: PoolItemFactory[T], maxIdle: Int) = new PoolBridge[T,StackObjectPool] { + val impl = new StackObjectPool(toPoolableObjectFactory(factory),maxIdle) + } + + def apply[T](factory: PoolItemFactory[T], maxIdle: Int, initIdleCapacity: Int) = new PoolBridge[T,StackObjectPool] { + val impl = new StackObjectPool(toPoolableObjectFactory(factory),maxIdle,initIdleCapacity) + } +} + +object SoftRefPool { + def apply[T](factory: PoolItemFactory[T]) = new PoolBridge[T,SoftReferenceObjectPool] { + val impl = new SoftReferenceObjectPool(toPoolableObjectFactory(factory)) + } + + def apply[T](factory: PoolItemFactory[T], initSize: Int) = new PoolBridge[T,SoftReferenceObjectPool] { + val impl = new SoftReferenceObjectPool(toPoolableObjectFactory(factory),initSize) + } +} + +trait TransportFactory[T <: TTransport] extends PoolItemFactory[T] { + def createTransport: T + def makeObject: T = createTransport + def destroyObject(transport: T): Unit = transport.close + def validateObject(transport: T) = transport.isOpen + def activateObject(transport: T): Unit = if( !transport.isOpen ) transport.open else () + def passivateObject(transport: T): Unit = transport.flush +} + +case class SocketProvider(val host: String, val port: Int) extends TransportFactory[TSocket] { + def createTransport = { + val t = new TSocket(host, port) + t.open + t + } +} + diff --git a/akka-persistence/src/main/scala/CassandraStorage.scala b/akka-persistence/src/main/scala/CassandraStorage.scala index 02f5f5dbec..3537b2f2eb 100644 --- a/akka-persistence/src/main/scala/CassandraStorage.scala +++ b/akka-persistence/src/main/scala/CassandraStorage.scala @@ -17,6 +17,8 @@ import org.apache.cassandra.service._ import org.apache.thrift.transport._ import org.apache.thrift.protocol._ +import scala.collection.mutable.ArrayBuffer + /** * @author Jonas Bonér */ @@ -62,11 +64,12 @@ object CassandraStorage extends MapStorage StackPool(SocketProvider(CASSANDRA_SERVER_HOSTNAME, CASSANDRA_SERVER_PORT)), protocol, CONSISTENCY_LEVEL) + // =============================================================== // For Ref // =============================================================== - override def insertRefStorageFor(name: String, element: AnyRef) = { + def insertRefStorageFor(name: String, element: AnyRef) = { sessions.withSession { _ ++| (name, new ColumnPath(REF_COLUMN_PARENT.getColumn_family, null, REF_KEY), @@ -76,7 +79,7 @@ object CassandraStorage extends MapStorage } } - override def getRefStorageFor(name: String): Option[AnyRef] = { + def getRefStorageFor(name: String): Option[AnyRef] = { try { val column: Option[Column] = sessions.withSession { _ | (name, new ColumnPath(REF_COLUMN_PARENT.getColumn_family, null, REF_KEY)) @@ -94,7 +97,7 @@ object CassandraStorage extends MapStorage // For Vector // =============================================================== - override def insertVectorStorageEntryFor(name: String, element: AnyRef) = { + def insertVectorStorageEntryFor(name: String, element: AnyRef) = { sessions.withSession { _ ++| (name, new ColumnPath(VECTOR_COLUMN_PARENT.getColumn_family, null, intToBytes(getVectorStorageSizeFor(name))), @@ -104,10 +107,20 @@ object CassandraStorage extends MapStorage } } - override def insertVectorStorageEntriesFor(name: String, elements: List[AnyRef]) = { + def insertVectorStorageEntriesFor(name: String, elements: List[AnyRef]) = { } - override def getVectorStorageEntryFor(name: String, index: Int): AnyRef = { + def updateVectorStorageEntryFor(name: String, index: Int, elem: AnyRef) = { + sessions.withSession { + _ ++| (name, + new ColumnPath(VECTOR_COLUMN_PARENT.getColumn_family, null, intToBytes(index)), + serializer.out(elem), + System.currentTimeMillis, + CONSISTENCY_LEVEL) + } + } + + def getVectorStorageEntryFor(name: String, index: Int): AnyRef = { val column: Option[Column] = sessions.withSession { _ | (name, new ColumnPath(VECTOR_COLUMN_PARENT.getColumn_family, null, intToBytes(index))) } @@ -115,7 +128,7 @@ object CassandraStorage extends MapStorage else throw new NoSuchElementException("No element for vector [" + name + "] and index [" + index + "]") } - override def getVectorStorageRangeFor(name: String, start: Option[Int], finish: Option[Int], count: Int): List[AnyRef] = { + def getVectorStorageRangeFor(name: String, start: Option[Int], finish: Option[Int], count: Int): RandomAccessSeq[AnyRef] = { val startBytes = if (start.isDefined) intToBytes(start.get) else null val finishBytes = if (finish.isDefined) intToBytes(finish.get) else null val columns: List[Column] = sessions.withSession { @@ -126,10 +139,12 @@ object CassandraStorage extends MapStorage count, CONSISTENCY_LEVEL) } - columns.map(column => serializer.in(column.value, None)) + val buffer = new ArrayBuffer[AnyRef] + for (elem <- columns.map(column => serializer.in(column.value, None))) buffer.append(elem) + buffer } - override def getVectorStorageSizeFor(name: String): Int = { + def getVectorStorageSizeFor(name: String): Int = { sessions.withSession { _ |# (name, VECTOR_COLUMN_PARENT) } @@ -139,7 +154,7 @@ object CassandraStorage extends MapStorage // For Map // =============================================================== - override def insertMapStorageEntryFor(name: String, key: AnyRef, element: AnyRef) = { + def insertMapStorageEntryFor(name: String, key: AnyRef, element: AnyRef) = { sessions.withSession { _ ++| (name, new ColumnPath(MAP_COLUMN_PARENT.getColumn_family, null, serializer.out(key)), @@ -149,7 +164,7 @@ object CassandraStorage extends MapStorage } } - override def insertMapStorageEntriesFor(name: String, entries: List[Tuple2[AnyRef, AnyRef]]) = { + def insertMapStorageEntriesFor(name: String, entries: List[Tuple2[AnyRef, AnyRef]]) = { val cf2columns: java.util.Map[String, java.util.List[Column]] = new java.util.HashMap for (entry <- entries) { val columns: java.util.List[Column] = new java.util.ArrayList @@ -161,7 +176,7 @@ object CassandraStorage extends MapStorage } } - override def getMapStorageEntryFor(name: String, key: AnyRef): Option[AnyRef] = { + def getMapStorageEntryFor(name: String, key: AnyRef): Option[AnyRef] = { try { val column: Option[Column] = sessions.withSession { _ | (name, new ColumnPath(MAP_COLUMN_PARENT.getColumn_family, null, serializer.out(key))) @@ -175,7 +190,7 @@ object CassandraStorage extends MapStorage } } - override def getMapStorageFor(name: String): List[Tuple2[AnyRef, AnyRef]] = { + def getMapStorageFor(name: String): List[Tuple2[AnyRef, AnyRef]] = { throw new UnsupportedOperationException /* val columns = server.get_columns_since(name, MAP_COLUMN_FAMILY, -1) @@ -187,15 +202,15 @@ object CassandraStorage extends MapStorage */ } - override def getMapStorageSizeFor(name: String): Int = { + def getMapStorageSizeFor(name: String): Int = { sessions.withSession { _ |# (name, MAP_COLUMN_PARENT) } } - override def removeMapStorageFor(name: String): Unit = removeMapStorageFor(name, null) + def removeMapStorageFor(name: String): Unit = removeMapStorageFor(name, null) - override def removeMapStorageFor(name: String, key: AnyRef): Unit = { + def removeMapStorageFor(name: String, key: AnyRef): Unit = { val keyBytes = if (key == null) null else serializer.out(key) sessions.withSession { _ -- (name, @@ -205,7 +220,7 @@ object CassandraStorage extends MapStorage } } - override def getMapStorageRangeFor(name: String, start: Option[AnyRef], finish: Option[AnyRef], count: Int): + def getMapStorageRangeFor(name: String, start: Option[AnyRef], finish: Option[AnyRef], count: Int): List[Tuple2[AnyRef, AnyRef]] = { val startBytes = if (start.isDefined) serializer.out(start.get) else null val finishBytes = if (finish.isDefined) serializer.out(finish.get) else null diff --git a/akka-persistence/src/main/scala/MongoStorage.scala b/akka-persistence/src/main/scala/MongoStorage.scala index bea43fbf23..a8c921a44b 100644 --- a/akka-persistence/src/main/scala/MongoStorage.scala +++ b/akka-persistence/src/main/scala/MongoStorage.scala @@ -1,287 +1,261 @@ -package se.scalablesolutions.akka.state - -import akka.util.Logging -import serialization.{Serializer} -import akka.Config.config -import sjson.json.Serializer._ - -import com.mongodb._ - -import java.util.{Map=>JMap, List=>JList, ArrayList=>JArrayList} - -/** - * A module for supporting MongoDB based persistence. - *

- * The module offers functionality for: - *

  • Persistent Maps
  • - *
  • Persistent Vectors
  • - *
  • Persistent Refs
  • - *

    - * @author Debasish Ghosh - */ -object MongoStorage extends MapStorage with VectorStorage with RefStorage with Logging { - - // enrich with null safe findOne - class RichDBCollection(value: DBCollection) { - def findOneNS(o: DBObject): Option[DBObject] = { - value.findOne(o) match { - case null => None - case x => Some(x) - } - } - } - - implicit def enrichDBCollection(c: DBCollection) = new RichDBCollection(c) - - val KEY = "key" - val VALUE = "value" - val COLLECTION = "akka_coll" - - val MONGODB_SERVER_HOSTNAME = config.getString("akka.storage.mongodb.hostname", "127.0.0.1") - val MONGODB_SERVER_DBNAME = config.getString("akka.storage.mongodb.dbname", "testdb") - val MONGODB_SERVER_PORT = config.getInt("akka.storage.mongodb.port", 27017) - - val db = new Mongo(MONGODB_SERVER_HOSTNAME, MONGODB_SERVER_PORT, MONGODB_SERVER_DBNAME) - val coll = db.getCollection(COLLECTION) - - // FIXME: make this pluggable - private[this] val serializer = SJSON - - override def insertMapStorageEntryFor(name: String, - key: AnyRef, value: AnyRef) { - insertMapStorageEntriesFor(name, List((key, value))) - } - - override def insertMapStorageEntriesFor(name: String, - entries: List[Tuple2[AnyRef, AnyRef]]) { - import java.util.{Map, HashMap} - - val m: Map[AnyRef, AnyRef] = new HashMap - for ((k, v) <- entries) { - m.put(k, serializer.out(v)) - } - - nullSafeFindOne(name) match { - case None => - coll.insert(new BasicDBObject().append(KEY, name).append(VALUE, m)) - case Some(dbo) => { - // collate the maps - val o = dbo.get(VALUE).asInstanceOf[Map[AnyRef, AnyRef]] - o.putAll(m) - - // remove existing reference - removeMapStorageFor(name) - // and insert - coll.insert(new BasicDBObject().append(KEY, name).append(VALUE, o)) - } - } - } - - override def removeMapStorageFor(name: String) = { - val q = new BasicDBObject - q.put(KEY, name) - coll.remove(q) - } - - override def removeMapStorageFor(name: String, key: AnyRef) = { - nullSafeFindOne(name) match { - case None => - case Some(dbo) => { - val orig = dbo.get(VALUE).asInstanceOf[DBObject].toMap - orig.remove(key.asInstanceOf[String]) - - // remove existing reference - removeMapStorageFor(name) - // and insert - coll.insert(new BasicDBObject().append(KEY, name).append(VALUE, orig)) - } - } - } - - override def getMapStorageEntryFor(name: String, - key: AnyRef): Option[AnyRef] = { - getValueForKey(name, key.asInstanceOf[String]) - } - - override def getMapStorageSizeFor(name: String): Int = { - nullSafeFindOne(name) match { - case None => 0 - case Some(dbo) => - dbo.get(VALUE).asInstanceOf[JMap[String, AnyRef]].keySet.size - } - } - - override def getMapStorageFor(name: String): List[Tuple2[AnyRef, AnyRef]] = { - val m = - nullSafeFindOne(name) match { - case None => - throw new Predef.NoSuchElementException(name + " not present") - case Some(dbo) => - dbo.get(VALUE).asInstanceOf[JMap[String, AnyRef]] - } - val n = - List(m.keySet.toArray: _*).asInstanceOf[List[String]] - val vals = - for(s <- n) - yield (s, serializer.in[AnyRef](m.get(s).asInstanceOf[Array[Byte]])) - vals.asInstanceOf[List[Tuple2[String, AnyRef]]] - } - - override def getMapStorageRangeFor(name: String, start: Option[AnyRef], - finish: Option[AnyRef], - count: Int): List[Tuple2[AnyRef, AnyRef]] = { - val m = - nullSafeFindOne(name) match { - case None => - throw new Predef.NoSuchElementException(name + " not present") - case Some(dbo) => - dbo.get(VALUE).asInstanceOf[JMap[String, AnyRef]] - } - - /** - * count is the max number of results to return. Start with - * start or 0 (if start is not defined) and go until - * you hit finish or count. - */ - val s = if (start.isDefined) start.get.asInstanceOf[Int] else 0 - val cnt = - if (finish.isDefined) { - val f = finish.get.asInstanceOf[Int] - if (f >= s) Math.min(count, (f - s)) else count - } - else count - - val n = - List(m.keySet.toArray: _*).asInstanceOf[List[String]].sort((e1, e2) => (e1 compareTo e2) < 0).slice(s, s + cnt) - val vals = - for(s <- n) - yield (s, serializer.in[AnyRef](m.get(s).asInstanceOf[Array[Byte]])) - vals.asInstanceOf[List[Tuple2[String, AnyRef]]] - } - - private def getValueForKey(name: String, key: String): Option[AnyRef] = { - try { - nullSafeFindOne(name) match { - case None => None - case Some(dbo) => - Some(serializer.in[AnyRef]( - dbo.get(VALUE) - .asInstanceOf[JMap[String, AnyRef]] - .get(key).asInstanceOf[Array[Byte]])) - } - } catch { - case e => - throw new Predef.NoSuchElementException(e.getMessage) - } - } - - override def insertVectorStorageEntriesFor(name: String, elements: List[AnyRef]) = { - val q = new BasicDBObject - q.put(KEY, name) - - val currentList = - coll.findOneNS(q) match { - case None => - new JArrayList[AnyRef] - case Some(dbo) => - dbo.get(VALUE).asInstanceOf[JArrayList[AnyRef]] - } - if (!currentList.isEmpty) { - // record exists - // remove before adding - coll.remove(q) - } - - // add to the current list - elements.map(serializer.out(_)).foreach(currentList.add(_)) - - coll.insert( - new BasicDBObject() - .append(KEY, name) - .append(VALUE, currentList) - ) - } - - override def insertVectorStorageEntryFor(name: String, element: AnyRef) = { - insertVectorStorageEntriesFor(name, List(element)) - } - - override def getVectorStorageEntryFor(name: String, index: Int): AnyRef = { - try { - val o = - nullSafeFindOne(name) match { - case None => - throw new Predef.NoSuchElementException(name + " not present") - - case Some(dbo) => - dbo.get(VALUE).asInstanceOf[JList[AnyRef]] - } - serializer.in[AnyRef]( - o.get(index).asInstanceOf[Array[Byte]]) - } catch { - case e => - throw new Predef.NoSuchElementException(e.getMessage) - } - } - - override def getVectorStorageRangeFor(name: String, - start: Option[Int], finish: Option[Int], count: Int): List[AnyRef] = { - try { - val o = - nullSafeFindOne(name) match { - case None => - throw new Predef.NoSuchElementException(name + " not present") - - case Some(dbo) => - dbo.get(VALUE).asInstanceOf[JList[AnyRef]] - } - - // pick the subrange and make a Scala list - val l = - List(o.subList(start.get, start.get + count).toArray: _*) - - for(e <- l) - yield serializer.in[AnyRef](e.asInstanceOf[Array[Byte]]) - } catch { - case e => - throw new Predef.NoSuchElementException(e.getMessage) - } - } - - override def getVectorStorageSizeFor(name: String): Int = { - nullSafeFindOne(name) match { - case None => 0 - case Some(dbo) => - dbo.get(VALUE).asInstanceOf[JList[AnyRef]].size - } - } - - private def nullSafeFindOne(name: String): Option[DBObject] = { - val o = new BasicDBObject - o.put(KEY, name) - coll.findOneNS(o) - } - - override def insertRefStorageFor(name: String, element: AnyRef) = { - nullSafeFindOne(name) match { - case None => - case Some(dbo) => { - val q = new BasicDBObject - q.put(KEY, name) - coll.remove(q) - } - } - coll.insert( - new BasicDBObject() - .append(KEY, name) - .append(VALUE, serializer.out(element))) - } - - override def getRefStorageFor(name: String): Option[AnyRef] = { - nullSafeFindOne(name) match { - case None => None - case Some(dbo) => - Some(serializer.in[AnyRef](dbo.get(VALUE).asInstanceOf[Array[Byte]])) - } - } -} +/** + * Copyright (C) 2009 Scalable Solutions. + */ + +package se.scalablesolutions.akka.state + +import akka.util.Logging +import serialization.{Serializer} +import Config.config +import sjson.json.Serializer._ + +import com.mongodb._ + +import scala.collection.mutable.ArrayBuffer + +import java.util.{Map=>JMap, List=>JList, ArrayList=>JArrayList} + +/** + * A module for supporting MongoDB based persistence. + *

    + * The module offers functionality for: + *

  • Persistent Maps
  • + *
  • Persistent Vectors
  • + *
  • Persistent Refs
  • + *

    + * @author Debasish Ghosh + */ +object MongoStorage extends MapStorage with VectorStorage with RefStorage with Logging { + + // enrich with null safe findOne + class RichDBCollection(value: DBCollection) { + def findOneNS(o: DBObject): Option[DBObject] = { + value.findOne(o) match { + case null => None + case x => Some(x) + } + } + } + + implicit def enrichDBCollection(c: DBCollection) = new RichDBCollection(c) + + val KEY = "key" + val VALUE = "value" + val COLLECTION = "akka_coll" + + val MONGODB_SERVER_HOSTNAME = config.getString("akka.storage.mongodb.hostname", "127.0.0.1") + val MONGODB_SERVER_DBNAME = config.getString("akka.storage.mongodb.dbname", "testdb") + val MONGODB_SERVER_PORT = config.getInt("akka.storage.mongodb.port", 27017) + + val db = new Mongo(MONGODB_SERVER_HOSTNAME, MONGODB_SERVER_PORT, MONGODB_SERVER_DBNAME) + val coll = db.getCollection(COLLECTION) + + // FIXME: make this pluggable + private[this] val serializer = SJSON + + def insertMapStorageEntryFor(name: String, + key: AnyRef, value: AnyRef) { + insertMapStorageEntriesFor(name, List((key, value))) + } + + def insertMapStorageEntriesFor(name: String, + entries: List[Tuple2[AnyRef, AnyRef]]) { + import java.util.{Map, HashMap} + + val m: Map[AnyRef, AnyRef] = new HashMap + for ((k, v) <- entries) m.put(k, serializer.out(v)) + + nullSafeFindOne(name) match { + case None => + coll.insert(new BasicDBObject().append(KEY, name).append(VALUE, m)) + case Some(dbo) => { + // collate the maps + val o = dbo.get(VALUE).asInstanceOf[Map[AnyRef, AnyRef]] + o.putAll(m) + + // remove existing reference + removeMapStorageFor(name) + // and insert + coll.insert(new BasicDBObject().append(KEY, name).append(VALUE, o)) + } + } + } + + def removeMapStorageFor(name: String) = { + val q = new BasicDBObject + q.put(KEY, name) + coll.remove(q) + } + + def removeMapStorageFor(name: String, key: AnyRef) = { + nullSafeFindOne(name) match { + case None => + case Some(dbo) => { + val orig = dbo.get(VALUE).asInstanceOf[DBObject].toMap + orig.remove(key.asInstanceOf[String]) + + // remove existing reference + removeMapStorageFor(name) + // and insert + coll.insert(new BasicDBObject().append(KEY, name).append(VALUE, orig)) + } + } + } + + def getMapStorageEntryFor(name: String, key: AnyRef): Option[AnyRef] = { + getValueForKey(name, key.asInstanceOf[String]) + } + + def getMapStorageSizeFor(name: String): Int = { + nullSafeFindOne(name) match { + case None => 0 + case Some(dbo) => + dbo.get(VALUE).asInstanceOf[JMap[String, AnyRef]].keySet.size + } + } + + def getMapStorageFor(name: String): List[Tuple2[AnyRef, AnyRef]] = { + val m = nullSafeFindOne(name) match { + case None => + throw new Predef.NoSuchElementException(name + " not present") + case Some(dbo) => + dbo.get(VALUE).asInstanceOf[JMap[String, AnyRef]] + } + val n = List(m.keySet.toArray: _*).asInstanceOf[List[String]] + val vals = for(s <- n) yield (s, serializer.in[AnyRef](m.get(s).asInstanceOf[Array[Byte]])) + vals.asInstanceOf[List[Tuple2[String, AnyRef]]] + } + + def getMapStorageRangeFor(name: String, start: Option[AnyRef], + finish: Option[AnyRef], + count: Int): List[Tuple2[AnyRef, AnyRef]] = { + val m = nullSafeFindOne(name) match { + case None => + throw new Predef.NoSuchElementException(name + " not present") + case Some(dbo) => + dbo.get(VALUE).asInstanceOf[JMap[String, AnyRef]] + } + + /** + * count is the max number of results to return. Start with + * start or 0 (if start is not defined) and go until + * you hit finish or count. + */ + val s = if (start.isDefined) start.get.asInstanceOf[Int] else 0 + val cnt = + if (finish.isDefined) { + val f = finish.get.asInstanceOf[Int] + if (f >= s) Math.min(count, (f - s)) else count + } + else count + + val n = List(m.keySet.toArray: _*).asInstanceOf[List[String]].sort((e1, e2) => (e1 compareTo e2) < 0).slice(s, s + cnt) + val vals = for(s <- n) yield (s, serializer.in[AnyRef](m.get(s).asInstanceOf[Array[Byte]])) + vals.asInstanceOf[List[Tuple2[String, AnyRef]]] + } + + private def getValueForKey(name: String, key: String): Option[AnyRef] = { + try { + nullSafeFindOne(name) match { + case None => None + case Some(dbo) => + Some(serializer.in[AnyRef](dbo.get(VALUE).asInstanceOf[JMap[String, AnyRef]].get(key).asInstanceOf[Array[Byte]])) + } + } catch { + case e => throw new Predef.NoSuchElementException(e.getMessage) + } + } + + def updateVectorStorageEntryFor(name: String, index: Int, elem: AnyRef) = throw new UnsupportedOperationException("The updateVectorStorageEntryFor method is not yet implemented for MongoDB") + + def insertVectorStorageEntriesFor(name: String, elements: List[AnyRef]) = { + val q = new BasicDBObject + q.put(KEY, name) + + val currentList = + coll.findOneNS(q) match { + case None => + new JArrayList[AnyRef] + case Some(dbo) => + dbo.get(VALUE).asInstanceOf[JArrayList[AnyRef]] + } + if (!currentList.isEmpty) { + // record exists + // remove before adding + coll.remove(q) + } + + // add to the current list + elements.map(serializer.out(_)).foreach(currentList.add(_)) + coll.insert(new BasicDBObject().append(KEY, name).append(VALUE, currentList)) + } + + def insertVectorStorageEntryFor(name: String, element: AnyRef) = insertVectorStorageEntriesFor(name, List(element)) + + def getVectorStorageEntryFor(name: String, index: Int): AnyRef = { + try { + val o = nullSafeFindOne(name) match { + case None => + throw new Predef.NoSuchElementException(name + " not present") + case Some(dbo) => + dbo.get(VALUE).asInstanceOf[JList[AnyRef]] + } + serializer.in[AnyRef](o.get(index).asInstanceOf[Array[Byte]]) + } catch { + case e => throw new Predef.NoSuchElementException(e.getMessage) + } + } + + def getVectorStorageRangeFor(name: String, start: Option[Int], finish: Option[Int], count: Int): RandomAccessSeq[AnyRef] = { + try { + val o = nullSafeFindOne(name) match { + case None => + throw new Predef.NoSuchElementException(name + " not present") + case Some(dbo) => + dbo.get(VALUE).asInstanceOf[JList[AnyRef]] + } + + // pick the subrange and make a Scala list + val l = List(o.subList(start.get, start.get + count).toArray: _*) + val buffer = new ArrayBuffer[AnyRef] + for (elem <- l.map(e => serializer.in[AnyRef](e.asInstanceOf[Array[Byte]]))) buffer.append(elem) + buffer + } catch { + case e => throw new Predef.NoSuchElementException(e.getMessage) + } + } + + def getVectorStorageSizeFor(name: String): Int = { + nullSafeFindOne(name) match { + case None => 0 + case Some(dbo) => + dbo.get(VALUE).asInstanceOf[JList[AnyRef]].size + } + } + + private def nullSafeFindOne(name: String): Option[DBObject] = { + val o = new BasicDBObject + o.put(KEY, name) + coll.findOneNS(o) + } + + def insertRefStorageFor(name: String, element: AnyRef) = { + nullSafeFindOne(name) match { + case None => + case Some(dbo) => { + val q = new BasicDBObject + q.put(KEY, name) + coll.remove(q) + } + } + coll.insert(new BasicDBObject().append(KEY, name).append(VALUE, serializer.out(element))) + } + + def getRefStorageFor(name: String): Option[AnyRef] = { + nullSafeFindOne(name) match { + case None => None + case Some(dbo) => + Some(serializer.in[AnyRef](dbo.get(VALUE).asInstanceOf[Array[Byte]])) + } + } +} diff --git a/akka-persistence/src/main/scala/PersistentState.scala b/akka-persistence/src/main/scala/PersistentState.scala index 8617f05c7e..23f8464508 100644 --- a/akka-persistence/src/main/scala/PersistentState.scala +++ b/akka-persistence/src/main/scala/PersistentState.scala @@ -38,23 +38,23 @@ object PersistentState extends PersistentState * */ class PersistentState { - def newMap(config: PersistentStorageConfig): TransactionalMap[AnyRef, AnyRef] = config match { - case CassandraStorageConfig() => new CassandraPersistentTransactionalMap - case MongoStorageConfig() => new MongoPersistentTransactionalMap + def newMap(config: PersistentStorageConfig): PersistentMap[AnyRef, AnyRef] = config match { + case CassandraStorageConfig() => new CassandraPersistentMap + case MongoStorageConfig() => new MongoPersistentMap case TerracottaStorageConfig() => throw new UnsupportedOperationException case TokyoCabinetStorageConfig() => throw new UnsupportedOperationException } - def newVector(config: PersistentStorageConfig): TransactionalVector[AnyRef] = config match { - case CassandraStorageConfig() => new CassandraPersistentTransactionalVector - case MongoStorageConfig() => new MongoPersistentTransactionalVector + def newVector(config: PersistentStorageConfig): PersistentVector[AnyRef] = config match { + case CassandraStorageConfig() => new CassandraPersistentVector + case MongoStorageConfig() => new MongoPersistentVector case TerracottaStorageConfig() => throw new UnsupportedOperationException case TokyoCabinetStorageConfig() => throw new UnsupportedOperationException } - def newRef(config: PersistentStorageConfig): TransactionalRef[AnyRef] = config match { - case CassandraStorageConfig() => new CassandraPersistentTransactionalRef - case MongoStorageConfig() => new MongoPersistentTransactionalRef + def newRef(config: PersistentStorageConfig): PersistentRef[AnyRef] = config match { + case CassandraStorageConfig() => new CassandraPersistentRef + case MongoStorageConfig() => new MongoPersistentRef case TerracottaStorageConfig() => throw new UnsupportedOperationException case TokyoCabinetStorageConfig() => throw new UnsupportedOperationException } @@ -68,32 +68,10 @@ class PersistentState { * * @author Jonas Bonér */ -abstract class PersistentTransactionalMap[K, V] extends TransactionalMap[K, V] { - - // FIXME: need to handle remove in another changeSet - protected[akka] val changeSet = new HashMap[K, V] - - def getRange(start: Option[AnyRef], count: Int) - - // ---- For Transactional ---- - override def begin = {} - - override def rollback = changeSet.clear - - // ---- For scala.collection.mutable.Map ---- - override def put(key: K, value: V): Option[V] = { - verifyTransaction - changeSet += key -> value - None // always return None to speed up writes (else need to go to DB to get - } - - override def -=(key: K) = remove(key) - - override def update(key: K, value: V) = put(key, value) -} +trait PersistentMap[K, V] extends Transactional with scala.collection.mutable.Map[K, V] /** - * Implementation of PersistentTransactionalMap for every concrete + * Implementation of PersistentMap for every concrete * storage will have the same workflow. This abstracts the workflow. * * Subclasses just need to provide the actual concrete instance for the @@ -101,86 +79,65 @@ abstract class PersistentTransactionalMap[K, V] extends TransactionalMap[K, V] { * * @author Jonas Bonér */ -abstract class TemplatePersistentTransactionalMap extends PersistentTransactionalMap[AnyRef, AnyRef] { +trait TemplatePersistentMap extends PersistentMap[AnyRef, AnyRef] { + protected val newAndUpdatedEntries = TransactionalState.newMap[AnyRef, AnyRef] + protected val removedEntries = TransactionalState.newMap[AnyRef, AnyRef] + protected val shouldClearOnCommit = TransactionalRef[Boolean](false) // to be concretized in subclasses val storage: MapStorage - override def remove(key: AnyRef) = { - verifyTransaction - if (changeSet.contains(key)) changeSet -= key - else storage.removeMapStorageFor(uuid, key) + def commit = { + storage.removeMapStorageFor(uuid, removedEntries.toList) + storage.insertMapStorageEntriesFor(uuid, newAndUpdatedEntries.toList) + if (shouldClearOnCommit.isDefined & shouldClearOnCommit.get.get) storage.removeMapStorageFor(uuid) + newAndUpdatedEntries.clear + removedEntries.clear } - override def getRange(start: Option[AnyRef], count: Int) = - getRange(start, None, count) + def -=(key: AnyRef) = remove(key) - def getRange(start: Option[AnyRef], finish: Option[AnyRef], count: Int) = { - verifyTransaction - try { + def +=(key: AnyRef, value: AnyRef) = put(key, value) + + override def put(key: AnyRef, value: AnyRef): Option[AnyRef] = newAndUpdatedEntries.put(key, value) + + override def update(key: AnyRef, value: AnyRef) = newAndUpdatedEntries.update(key, value) + + def remove(key: AnyRef) = removedEntries.remove(key) + + def slice(start: Option[AnyRef], count: Int): List[Tuple2[AnyRef, AnyRef]] = slice(start, None, count) + + def slice(start: Option[AnyRef], finish: Option[AnyRef], count: Int): List[Tuple2[AnyRef, AnyRef]] = try { storage.getMapStorageRangeFor(uuid, start, finish, count) - } catch { - case e: Exception => Nil - } - } + } catch { case e: Exception => Nil } - // ---- For Transactional ---- - override def commit = { - storage.insertMapStorageEntriesFor(uuid, changeSet.toList) - changeSet.clear - } + override def clear = shouldClearOnCommit.swap(true) - // ---- Overriding scala.collection.mutable.Map behavior ---- - override def clear = { - verifyTransaction - try { - storage.removeMapStorageFor(uuid) - } catch { - case e: Exception => {} - } - } - - override def contains(key: AnyRef): Boolean = { - try { - verifyTransaction + override def contains(key: AnyRef): Boolean = try { + newAndUpdatedEntries.contains(key) || storage.getMapStorageEntryFor(uuid, key).isDefined - } catch { - case e: Exception => false - } - } + } catch { case e: Exception => false } - override def size: Int = { - verifyTransaction - try { - storage.getMapStorageSizeFor(uuid) - } catch { - case e: Exception => 0 - } - } + override def size: Int = try { + storage.getMapStorageSizeFor(uuid) + newAndUpdatedEntries.size + } catch { case e: Exception => 0 } - // ---- For scala.collection.mutable.Map ---- override def get(key: AnyRef): Option[AnyRef] = { - verifyTransaction - // if (changeSet.contains(key)) changeSet.get(key) - // else { - val result = try { + if (newAndUpdatedEntries.contains(key)) newAndUpdatedEntries.get(key) + else try { storage.getMapStorageEntryFor(uuid, key) - } catch { - case e: Exception => None - } - result - //} + } catch { case e: Exception => None } } override def elements: Iterator[Tuple2[AnyRef, AnyRef]] = { - //verifyTransaction new Iterator[Tuple2[AnyRef, AnyRef]] { private val originalList: List[Tuple2[AnyRef, AnyRef]] = try { storage.getMapStorageFor(uuid) } catch { case e: Throwable => Nil } - private var elements = originalList.reverse + // FIXME how to deal with updated entries, these should be replaced in the originalList not just added + private var elements = newAndUpdatedEntries.toList ::: originalList.reverse override def next: Tuple2[AnyRef, AnyRef]= synchronized { val element = elements.head elements = elements.tail @@ -197,7 +154,7 @@ abstract class TemplatePersistentTransactionalMap extends PersistentTransactiona * * @author Debasish Ghosh */ -class CassandraPersistentTransactionalMap extends TemplatePersistentTransactionalMap { +class CassandraPersistentMap extends TemplatePersistentMap { val storage = CassandraStorage } @@ -206,7 +163,7 @@ class CassandraPersistentTransactionalMap extends TemplatePersistentTransactiona * * @author Debasish Ghosh */ -class MongoPersistentTransactionalMap extends TemplatePersistentTransactionalMap { +class MongoPersistentMap extends TemplatePersistentMap { val storage = MongoStorage } @@ -218,69 +175,62 @@ class MongoPersistentTransactionalMap extends TemplatePersistentTransactionalMap * * @author Jonas Bonér */ -abstract class PersistentTransactionalVector[T] extends TransactionalVector[T] { - - // FIXME: need to handle remove in another changeSet - protected[akka] val changeSet = new ArrayBuffer[T] - - // ---- For Transactional ---- - override def begin = {} - - override def rollback = changeSet.clear - - // ---- For TransactionalVector ---- - override def add(value: T) = { - verifyTransaction - changeSet += value - } -} +trait PersistentVector[T] extends Transactional with RandomAccessSeq[T] /** * Implements a template for a concrete persistent transactional vector based storage. * * @author Debasish Ghosh */ -abstract class TemplatePersistentTransactionalVector extends PersistentTransactionalVector[AnyRef] { +trait TemplatePersistentVector extends PersistentVector[AnyRef] { + protected val newElems = TransactionalState.newVector[AnyRef] + protected val updatedElems = TransactionalState.newMap[Int, AnyRef] + protected val removedElems = TransactionalState.newVector[AnyRef] + protected val shouldClearOnCommit = TransactionalRef[Boolean](false) val storage: VectorStorage - // ---- For TransactionalVector ---- - override def get(index: Int): AnyRef = { - verifyTransaction - if (changeSet.size > index) changeSet(index) + def commit = { + // FIXME: should use batch function once the bug is resolved + for (element <- newElems) storage.insertVectorStorageEntryFor(uuid, element) + for (entry <- updatedElems) storage.updateVectorStorageEntryFor(uuid, entry._1, entry._2) + newElems.clear + updatedElems.clear + } + + def apply(index: Int): AnyRef = get(index) + + def get(index: Int): AnyRef = { + if (newElems.size > index) newElems(index) else storage.getVectorStorageEntryFor(uuid, index) } - override def getRange(start: Int, count: Int): List[AnyRef] = - getRange(Some(start), None, count) + override def slice(start: Int, count: Int): RandomAccessSeq[AnyRef] = slice(Some(start), None, count) - def getRange(start: Option[Int], finish: Option[Int], count: Int): List[AnyRef] = { - verifyTransaction + def slice(start: Option[Int], finish: Option[Int], count: Int): RandomAccessSeq[AnyRef] = storage.getVectorStorageRangeFor(uuid, start, finish, count) - } - override def length: Int = { - verifyTransaction - storage.getVectorStorageSizeFor(uuid) - } + /** + * Removes the tail element of this vector. + */ + // FIXME + def pop: AnyRef = throw new UnsupportedOperationException("need to implement persistent vector pop") - override def apply(index: Int): AnyRef = get(index) + // FIXME + def update(index: Int, newElem: AnyRef) = storage.updateVectorStorageEntryFor(uuid, index, newElem) override def first: AnyRef = get(0) override def last: AnyRef = { - verifyTransaction - val l = length - if (l == 0) throw new NoSuchElementException("Vector is empty") - get(length - 1) + if (newElems.length != 0) newElems.last + else { + val len = length + if (len == 0) throw new NoSuchElementException("Vector is empty") + get(len - 1) + } } - // ---- For Transactional ---- - override def commit = { - // FIXME: should use batch function once the bug is resolved - for (element <- changeSet) storage.insertVectorStorageEntryFor(uuid, element) - changeSet.clear - } + def length: Int = storage.getVectorStorageSizeFor(uuid) + newElems.length } /** @@ -288,7 +238,7 @@ abstract class TemplatePersistentTransactionalVector extends PersistentTransacti * * @author Debaissh Ghosh */ -class CassandraPersistentTransactionalVector extends TemplatePersistentTransactionalVector { +class CassandraPersistentVector extends TemplatePersistentVector { val storage = CassandraStorage } @@ -297,38 +247,37 @@ class CassandraPersistentTransactionalVector extends TemplatePersistentTransacti * * @author Debaissh Ghosh */ -class MongoPersistentTransactionalVector extends TemplatePersistentTransactionalVector { +class MongoPersistentVector extends TemplatePersistentVector { val storage = MongoStorage } -abstract class TemplatePersistentTransactionalRef extends TransactionalRef[AnyRef] { +trait PersistentRef[T] extends Transactional + +trait TemplatePersistentRef extends PersistentRef[AnyRef] { + protected val ref = new TransactionalRef[AnyRef] + val storage: RefStorage - override def commit = if (ref.isDefined) { + def commit = if (ref.isDefined) { storage.insertRefStorageFor(uuid, ref.get) - ref = None + ref.swap(null) } - override def rollback = ref = None + def get: Option[AnyRef] = if (ref.isDefined) ref.get else storage.getRefStorageFor(uuid) - override def get: Option[AnyRef] = { - verifyTransaction - storage.getRefStorageFor(uuid) - } + def isDefined: Boolean = ref.isDefined || storage.getRefStorageFor(uuid).isDefined - override def isDefined: Boolean = get.isDefined - - override def getOrElse(default: => AnyRef): AnyRef = { - val ref = get - if (ref.isDefined) ref + def getOrElse(default: => AnyRef): AnyRef = { + val current = get + if (current.isDefined) current else default } } -class CassandraPersistentTransactionalRef extends TemplatePersistentTransactionalRef { +class CassandraPersistentRef extends TemplatePersistentRef { val storage = CassandraStorage } -class MongoPersistentTransactionalRef extends TemplatePersistentTransactionalRef { +class MongoPersistentRef extends TemplatePersistentRef { val storage = MongoStorage } diff --git a/akka-persistence/src/main/scala/Pool.scala b/akka-persistence/src/main/scala/Pool.scala deleted file mode 100644 index 697366b7d6..0000000000 --- a/akka-persistence/src/main/scala/Pool.scala +++ /dev/null @@ -1,96 +0,0 @@ -/** - * Copyright (C) 2009 Scalable Solutions. - */ - -package se.scalablesolutions.akka.state - -import org.apache.commons.pool._ -import org.apache.commons.pool.impl._ - -import org.apache.thrift.transport._ - -trait Pool[T] extends java.io.Closeable { - def borrowObject: T - def returnObject(t: T): Unit - def invalidateObject(t: T): Unit - def addObject: Unit - def getNumIdle: Int - def getNumActive: Int - def clear: Unit - def setFactory(factory: PoolItemFactory[T]): Unit -} - -trait PoolFactory[T] { - def createPool: Pool[T] -} - -trait PoolItemFactory[T] { - def makeObject: T - def destroyObject(t: T): Unit - def validateObject(t: T): Boolean - def activateObject(t: T): Unit - def passivateObject(t: T): Unit -} - -trait PoolBridge[T, OP <: ObjectPool] extends Pool[T] { - val impl: OP - override def borrowObject: T = impl.borrowObject.asInstanceOf[T] - override def returnObject(t: T) = impl.returnObject(t) - override def invalidateObject(t: T) = impl.invalidateObject(t) - override def addObject = impl.addObject - override def getNumIdle: Int = impl.getNumIdle - override def getNumActive: Int = impl.getNumActive - override def clear: Unit = impl.clear - override def close: Unit = impl.close - override def setFactory(factory: PoolItemFactory[T]) = impl.setFactory(toPoolableObjectFactory(factory)) - - def toPoolableObjectFactory[T](pif: PoolItemFactory[T]) = new PoolableObjectFactory { - def makeObject: Object = pif.makeObject.asInstanceOf[Object] - def destroyObject(o: Object): Unit = pif.destroyObject(o.asInstanceOf[T]) - def validateObject(o: Object): Boolean = pif.validateObject(o.asInstanceOf[T]) - def activateObject(o: Object): Unit = pif.activateObject(o.asInstanceOf[T]) - def passivateObject(o: Object): Unit = pif.passivateObject(o.asInstanceOf[T]) - } -} - -object StackPool { - def apply[T](factory: PoolItemFactory[T]) = new PoolBridge[T,StackObjectPool] { - val impl = new StackObjectPool(toPoolableObjectFactory(factory)) - } - - def apply[T](factory: PoolItemFactory[T], maxIdle: Int) = new PoolBridge[T,StackObjectPool] { - val impl = new StackObjectPool(toPoolableObjectFactory(factory),maxIdle) - } - - def apply[T](factory: PoolItemFactory[T], maxIdle: Int, initIdleCapacity: Int) = new PoolBridge[T,StackObjectPool] { - val impl = new StackObjectPool(toPoolableObjectFactory(factory),maxIdle,initIdleCapacity) - } -} - -object SoftRefPool { - def apply[T](factory: PoolItemFactory[T]) = new PoolBridge[T,SoftReferenceObjectPool] { - val impl = new SoftReferenceObjectPool(toPoolableObjectFactory(factory)) - } - - def apply[T](factory: PoolItemFactory[T], initSize: Int) = new PoolBridge[T,SoftReferenceObjectPool] { - val impl = new SoftReferenceObjectPool(toPoolableObjectFactory(factory),initSize) - } -} - -trait TransportFactory[T <: TTransport] extends PoolItemFactory[T] { - def createTransport: T - def makeObject: T = createTransport - def destroyObject(transport: T): Unit = transport.close - def validateObject(transport: T) = transport.isOpen - def activateObject(transport: T): Unit = if( !transport.isOpen ) transport.open else () - def passivateObject(transport: T): Unit = transport.flush -} - -case class SocketProvider(val host: String, val port: Int) extends TransportFactory[TSocket] { - def createTransport = { - val t = new TSocket(host, port) - t.open - t - } -} - diff --git a/akka-persistence/src/main/scala/Storage.scala b/akka-persistence/src/main/scala/Storage.scala index a041a932e0..26765c0b6d 100644 --- a/akka-persistence/src/main/scala/Storage.scala +++ b/akka-persistence/src/main/scala/Storage.scala @@ -1,3 +1,7 @@ +/** + * Copyright (C) 2009 Scalable Solutions. + */ + package se.scalablesolutions.akka.state // abstracts persistence storage @@ -15,12 +19,13 @@ trait MapStorage extends Storage { def getMapStorageRangeFor(name: String, start: Option[AnyRef], finish: Option[AnyRef], count: Int): List[Tuple2[AnyRef, AnyRef]] } -// for vectors +// for Vectors trait VectorStorage extends Storage { def insertVectorStorageEntryFor(name: String, element: AnyRef) def insertVectorStorageEntriesFor(name: String, elements: List[AnyRef]) + def updateVectorStorageEntryFor(name: String, index: Int, elem: AnyRef) def getVectorStorageEntryFor(name: String, index: Int): AnyRef - def getVectorStorageRangeFor(name: String, start: Option[Int], finish: Option[Int], count: Int): List[AnyRef] + def getVectorStorageRangeFor(name: String, start: Option[Int], finish: Option[Int], count: Int): RandomAccessSeq[AnyRef] def getVectorStorageSizeFor(name: String): Int } diff --git a/akka.iml b/akka.iml old mode 100755 new mode 100644 index 477bfd39d2..4a4e9fc451 --- a/akka.iml +++ b/akka.iml @@ -1,14 +1,8 @@ - - - - - - diff --git a/akka.ipr b/akka.ipr old mode 100755 new mode 100644 index 226c8db6c4..4b6cd3fc6a --- a/akka.ipr +++ b/akka.ipr @@ -7,37 +7,7 @@ - + @@ -63,45 +33,6 @@ - - - - - - - - - - - - - @@ -122,242 +53,6 @@ @@ -403,17 +98,14 @@ - - - @@ -543,32 +235,25 @@ - - - - - - - - - - - - - - - - + + + + + + + + + + + + - - - + - - - - - - - - - - - - - - - - - - localhost - 5050 - - - - -