From d4cd0ff3acdcf28ff410afb623413e711dbd2c7f Mon Sep 17 00:00:00 2001 From: ticktock Date: Mon, 15 Nov 2010 16:06:04 -0500 Subject: [PATCH] retry only failed/not executed Ops if the starabe backend is not transactional --- .../src/main/scala/Storage.scala | 77 +++++++++++++------ 1 file changed, 54 insertions(+), 23 deletions(-) diff --git a/akka-persistence/akka-persistence-common/src/main/scala/Storage.scala b/akka-persistence/akka-persistence-common/src/main/scala/Storage.scala index e44851ccc4..3cdb1dd0f0 100644 --- a/akka-persistence/akka-persistence-common/src/main/scala/Storage.scala +++ b/akka-persistence/akka-persistence-common/src/main/scala/Storage.scala @@ -48,6 +48,8 @@ class StorageException(message: String) extends RuntimeException(message) trait Storage { type ElementType + def transactional:Boolean=false + def newMap: PersistentMap[ElementType, ElementType] def newVector: PersistentVector[ElementType] @@ -87,7 +89,11 @@ trait Storage { private[akka] object PersistentMap { // operations on the Map - sealed trait Op + sealed trait Op{ + private var unexec = true + def wasExecuted() = {unexec = false} + def notExecuted() = unexec + } case object PUT extends Op case object REM extends Op case object UPD extends Op @@ -177,15 +183,19 @@ trait PersistentMap[K, V] extends scala.collection.mutable.Map[K, V] val storage: MapStorageBackend[K, V] def commit = { - appendOnlyTxLog.foreach { - case LogEntry(k, v, o) => o match { - case PUT => storage.insertMapStorageEntryFor(uuid, k.get, v.get) - case UPD => storage.insertMapStorageEntryFor(uuid, k.get, v.get) - case REM => storage.removeMapStorageFor(uuid, k.get) - case CLR => storage.removeMapStorageFor(uuid) + appendOnlyTxLog.foreach{ + case LogEntry(k, v, o) => { + if (storage.transactional || o.notExecuted) { + o match { + case PUT => storage.insertMapStorageEntryFor(uuid, k.get, v.get) + case UPD => storage.insertMapStorageEntryFor(uuid, k.get, v.get) + case REM => storage.removeMapStorageFor(uuid, k.get) + case CLR => storage.removeMapStorageFor(uuid) + } + o.wasExecuted + } } } - appendOnlyTxLog.clear clearDistinctKeys } @@ -439,7 +449,11 @@ trait PersistentMapBinary extends PersistentMap[Array[Byte], Array[Byte]] { private[akka] object PersistentVector { // operations on the Vector - sealed trait Op + sealed trait Op{ + private var unexec = true + def wasExecuted() = {unexec = false} + def notExecuted() = unexec + } case object ADD extends Op case object UPD extends Op case object POP extends Op @@ -468,10 +482,13 @@ trait PersistentVector[T] extends IndexedSeq[T] with Transactional with Committa def commit = { for (entry <- appendOnlyTxLog) { - (entry: @unchecked) match { - case LogEntry(_, Some(v), ADD) => storage.insertVectorStorageEntryFor(uuid, v) - case LogEntry(Some(i), Some(v), UPD) => storage.updateVectorStorageEntryFor(uuid, i, v) - case LogEntry(_, _, POP) => storage.removeVectorStorageEntryFor(uuid) + if (storage.transactional || entry.op.notExecuted) { + (entry: @unchecked) match { + case LogEntry(_, Some(v), ADD) => storage.insertVectorStorageEntryFor(uuid, v) + case LogEntry(Some(i), Some(v), UPD) => storage.updateVectorStorageEntryFor(uuid, i, v) + case LogEntry(_, _, POP) => storage.removeVectorStorageEntryFor(uuid) + } + entry.op.wasExecuted } } appendOnlyTxLog.clear @@ -595,7 +612,11 @@ trait PersistentRef[T] extends Transactional with Committable with Abortable { private[akka] object PersistentQueue { //Operations for PersistentQueue - sealed trait QueueOp + sealed trait QueueOp{ + private var unexec = true + def wasExecuted() = {unexec = false} + def notExecuted() = unexec + } case object ENQ extends QueueOp case object DEQ extends QueueOp } @@ -639,11 +660,14 @@ trait PersistentQueue[A] extends scala.collection.mutable.Queue[A] // to be concretized in subclasses val storage: QueueStorageBackend[A] - def commit = synchronized { + def commit = synchronized{ for (entry <- appendOnlyTxLog) { - (entry: @unchecked) match { - case LogEntry(Some(v), ENQ) => storage.enqueue(uuid, v) - case LogEntry(_, DEQ) => storage.dequeue(uuid) + if (storage.transactional || entry.op.notExecuted) { + (entry: @unchecked) match { + case LogEntry(Some(v), ENQ) => storage.enqueue(uuid, v) + case LogEntry(_, DEQ) => storage.dequeue(uuid) + } + entry.op.wasExecuted } } appendOnlyTxLog.clear @@ -718,7 +742,11 @@ trait PersistentQueue[A] extends scala.collection.mutable.Queue[A] private[akka] object PersistentSortedSet { // operations on the SortedSet - sealed trait Op + sealed trait Op{ + private var unexec = true + def wasExecuted() = {unexec = false} + def notExecuted() = unexec + } case object ADD extends Op case object REM extends Op } @@ -772,9 +800,12 @@ trait PersistentSortedSet[A] extends Transactional with Committable with Abortab def commit = { for (entry <- appendOnlyTxLog) { - (entry: @unchecked) match { - case LogEntry(e, Some(s), ADD) => storage.zadd(uuid, String.valueOf(s), e) - case LogEntry(e, _, REM) => storage.zrem(uuid, e) + if (storage.transactional || entry.op.notExecuted) { + (entry: @unchecked) match { + case LogEntry(e, Some(s), ADD) => storage.zadd(uuid, String.valueOf(s), e) + case LogEntry(e, _, REM) => storage.zrem(uuid, e) + } + entry.op.wasExecuted } } appendOnlyTxLog.clear @@ -827,7 +858,7 @@ trait PersistentSortedSet[A] extends Transactional with Committable with Abortab val es = replay // a multimap with key as A and value as Set of scores - val m = new collection.mutable.HashMap[A, collection.mutable.Set[Float]] + val m = new collection.mutable.HashMap[A, collection.mutable.Set[Float]] with collection.mutable.MultiMap[A, Float] for(e <- es) m.addBinding(e._1, e._2)