/** * Copyright (C) 2009-2010 Scalable Solutions AB */ package se.scalablesolutions.akka.persistence.common import se.scalablesolutions.akka.stm._ import se.scalablesolutions.akka.stm.TransactionManagement.transaction import se.scalablesolutions.akka.util.Logging // FIXME move to 'stm' package + add message with more info class NoTransactionInScopeException extends RuntimeException class StorageException(message: String) extends RuntimeException(message) /** * Example Scala usage. *

* New map with generated id. *

 * val myMap = CassandraStorage.newMap
 * 
* * New map with user-defined id. *
 * val myMap = MongoStorage.newMap(id)
 * 
* * Get map by user-defined id. *
 * val myMap = CassandraStorage.getMap(id)
 * 
* * Example Java usage: *
 * PersistentMap myMap = MongoStorage.newMap();
 * 
* Or: *
 * MongoPersistentMap myMap = MongoStorage.getMap(id);
 * 
* * @author Jonas Bonér * @author Debasish Ghosh */ trait Storage { type ElementType def newMap: PersistentMap[ElementType, ElementType] def newVector: PersistentVector[ElementType] def newRef: PersistentRef[ElementType] def newQueue: PersistentQueue[ElementType] = // only implemented for redis throw new UnsupportedOperationException def getMap(id: String): PersistentMap[ElementType, ElementType] def getVector(id: String): PersistentVector[ElementType] def getRef(id: String): PersistentRef[ElementType] def getQueue(id: String): PersistentQueue[ElementType] = // only implemented for redis throw new UnsupportedOperationException def newMap(id: String): PersistentMap[ElementType, ElementType] def newVector(id: String): PersistentVector[ElementType] def newRef(id: String): PersistentRef[ElementType] def newQueue(id: String): PersistentQueue[ElementType] = // only implemented for redis throw new UnsupportedOperationException } /** * Implementation of PersistentMap for every concrete * storage will have the same workflow. This abstracts the workflow. * * Subclasses just need to provide the actual concrete instance for the * abstract val storage. * * @author Jonas Bonér */ trait PersistentMap[K, V] extends scala.collection.mutable.Map[K, V] with Transactional with Committable with Logging { protected val newAndUpdatedEntries = TransactionalState.newMap[K, V] protected val removedEntries = TransactionalState.newVector[K] protected val shouldClearOnCommit = TransactionalRef[Boolean]() // to be concretized in subclasses val storage: MapStorageBackend[K, V] def commit = { removedEntries.toList.foreach(key => storage.removeMapStorageFor(uuid, key)) storage.insertMapStorageEntriesFor(uuid, newAndUpdatedEntries.toList) if (shouldClearOnCommit.isDefined && shouldClearOnCommit.get.get) storage.removeMapStorageFor(uuid) newAndUpdatedEntries.clear removedEntries.clear } def -=(key: K) = remove(key) def +=(key: K, value: V) = put(key, value) override def put(key: K, value: V): Option[V] = { register newAndUpdatedEntries.put(key, value) } override def update(key: K, value: V) = { register newAndUpdatedEntries.update(key, value) } def remove(key: K) = { register removedEntries.add(key) } def slice(start: Option[K], count: Int): List[Tuple2[K, V]] = slice(start, None, count) def slice(start: Option[K], finish: Option[K], count: Int): List[Tuple2[K, V]] = try { storage.getMapStorageRangeFor(uuid, start, finish, count) } catch { case e: Exception => Nil } override def clear = { register shouldClearOnCommit.swap(true) } override def contains(key: K): Boolean = try { newAndUpdatedEntries.contains(key) || storage.getMapStorageEntryFor(uuid, key).isDefined } catch { case e: Exception => false } override def size: Int = try { storage.getMapStorageSizeFor(uuid) } catch { case e: Exception => 0 } override def get(key: K): Option[V] = { if (newAndUpdatedEntries.contains(key)) { newAndUpdatedEntries.get(key) } else try { storage.getMapStorageEntryFor(uuid, key) } catch { case e: Exception => None } } override def elements: Iterator[Tuple2[K, V]] = { new Iterator[Tuple2[K, V]] { private val originalList: List[Tuple2[K, V]] = try { storage.getMapStorageFor(uuid) } catch { case e: Throwable => Nil } private var elements = newAndUpdatedEntries.toList union originalList.reverse override def next: Tuple2[K, V]= synchronized { val element = elements.head elements = elements.tail element } override def hasNext: Boolean = synchronized { !elements.isEmpty } } } private def register = { if (transaction.get.isEmpty) throw new NoTransactionInScopeException transaction.get.get.register(uuid, this) } } /** * Implements a template for a concrete persistent transactional vector based storage. * * @author Jonas Bonér */ trait PersistentVector[T] extends RandomAccessSeq[T] with Transactional with Committable { protected val newElems = TransactionalState.newVector[T] protected val updatedElems = TransactionalState.newMap[Int, T] protected val removedElems = TransactionalState.newVector[T] protected val shouldClearOnCommit = TransactionalRef[Boolean]() val storage: VectorStorageBackend[T] def commit = { for (element <- newElems) storage.insertVectorStorageEntryFor(uuid, element) for (entry <- updatedElems) storage.updateVectorStorageEntryFor(uuid, entry._1, entry._2) newElems.clear updatedElems.clear } def +(elem: T) = add(elem) def add(elem: T) = { register newElems + elem } def apply(index: Int): T = get(index) def get(index: Int): T = { if (newElems.size > index) newElems(index) else storage.getVectorStorageEntryFor(uuid, index) } override def slice(start: Int, count: Int): RandomAccessSeq[T] = slice(Some(start), None, count) def slice(start: Option[Int], finish: Option[Int], count: Int): RandomAccessSeq[T] = { val buffer = new scala.collection.mutable.ArrayBuffer[T] storage.getVectorStorageRangeFor(uuid, start, finish, count).foreach(buffer.append(_)) buffer } /** * Removes the tail element of this vector. */ def pop: T = { register throw new UnsupportedOperationException("PersistentVector::pop is not implemented") } def update(index: Int, newElem: T) = { register storage.updateVectorStorageEntryFor(uuid, index, newElem) } override def first: T = get(0) override def last: T = { if (newElems.length != 0) newElems.last else { val len = length if (len == 0) throw new NoSuchElementException("Vector is empty") get(len - 1) } } def length: Int = storage.getVectorStorageSizeFor(uuid) + newElems.length private def register = { if (transaction.get.isEmpty) throw new NoTransactionInScopeException transaction.get.get.register(uuid, this) } } /** * Implements a persistent reference with abstract storage. * * @author Jonas Bonér */ trait PersistentRef[T] extends Transactional with Committable { protected val ref = new TransactionalRef[T] val storage: RefStorageBackend[T] def commit = if (ref.isDefined) { storage.insertRefStorageFor(uuid, ref.get.get) ref.swap(null.asInstanceOf[T]) } def swap(elem: T) = { register ref.swap(elem) } def get: Option[T] = if (ref.isDefined) ref.get else storage.getRefStorageFor(uuid) def isDefined: Boolean = ref.isDefined || storage.getRefStorageFor(uuid).isDefined def getOrElse(default: => T): T = { val current = get if (current.isDefined) current.get else default } private def register = { if (transaction.get.isEmpty) throw new NoTransactionInScopeException transaction.get.get.register(uuid, this) } } /** * Implementation of PersistentQueue for every concrete * storage will have the same workflow. This abstracts the workflow. *

* Enqueue is simpler, we just have to record the operation in a local * transactional store for playback during commit. This store * enqueueNDequeuedEntries stores the entire history of enqueue * and dequeue that will be played at commit on the underlying store. *

* The main challenge with dequeue is that we need to return the element * that has been dequeued. Hence in addition to the above store, we need to * have another local queue that actually does the enqueue dequeue operations * that take place only during this transaction. This gives us the * element that will be dequeued next from the set of elements enqueued * during this transaction. *

* The third item that we need is an index to the underlying storage element * that may also have to be dequeued as part of the current transaction. This * is modeled using a ref to an Int that points to elements in the underlyinng store. *

* Subclasses just need to provide the actual concrete instance for the * abstract val storage. * * @author Debasish Ghosh */ trait PersistentQueue[A] extends scala.collection.mutable.Queue[A] with Transactional with Committable with Logging { abstract case class QueueOp case object ENQ extends QueueOp case object DEQ extends QueueOp import scala.collection.immutable.Queue // current trail that will be played on commit to the underlying store protected val enqueuedNDequeuedEntries = TransactionalState.newVector[(Option[A], QueueOp)] protected val shouldClearOnCommit = TransactionalRef[Boolean]() // local queue that will record all enqueues and dequeues in the current txn protected val localQ = TransactionalRef[Queue[A]]() // keeps a pointer to the underlying storage for the enxt candidate to be dequeued protected val pickMeForDQ = TransactionalRef[Int]() localQ.swap(Queue.Empty) pickMeForDQ.swap(0) // to be concretized in subclasses val storage: QueueStorageBackend[A] def commit = { enqueuedNDequeuedEntries.toList.foreach { e => e._2 match { case ENQ => storage.enqueue(uuid, e._1.get) case DEQ => storage.dequeue(uuid) } } if (shouldClearOnCommit.isDefined && shouldClearOnCommit.get.get) { storage.remove(uuid) } enqueuedNDequeuedEntries.clear localQ.swap(Queue.Empty) pickMeForDQ.swap(0) } override def enqueue(elems: A*) { register elems.foreach(e => { enqueuedNDequeuedEntries.add((Some(e), ENQ)) localQ.get.get.enqueue(e) }) } override def dequeue: A = { register // record for later playback enqueuedNDequeuedEntries.add((None, DEQ)) val i = pickMeForDQ.get.get if (i < storage.size(uuid)) { // still we can DQ from storage pickMeForDQ.swap(i + 1) storage.peek(uuid, i, 1)(0) } else { // check we have transient candidates in localQ for DQ if (localQ.get.get.isEmpty == false) { val (a, q) = localQ.get.get.dequeue localQ.swap(q) a } else throw new NoSuchElementException("trying to dequeue from empty queue") } } override def clear = { register shouldClearOnCommit.swap(true) localQ.swap(Queue.Empty) pickMeForDQ.swap(0) } override def size: Int = try { storage.size(uuid) + localQ.get.get.length } catch { case e: Exception => 0 } override def isEmpty: Boolean = size == 0 override def +=(elem: A): Unit = enqueue(elem) override def ++=(elems: Iterator[A]): Unit = enqueue(elems.toList: _*) override def ++=(elems: Iterable[A]): Unit = this ++= elems.elements override def dequeueFirst(p: A => Boolean): Option[A] = throw new UnsupportedOperationException("dequeueFirst not supported") override def dequeueAll(p: A => Boolean): Seq[A] = throw new UnsupportedOperationException("dequeueAll not supported") private def register = { if (transaction.get.isEmpty) throw new NoTransactionInScopeException transaction.get.get.register(uuid, this) } }