retry only failed/not executed Ops if the starabe backend is not transactional
This commit is contained in:
parent
5494fec9a9
commit
d4cd0ff3ac
1 changed files with 54 additions and 23 deletions
|
|
@ -48,6 +48,8 @@ class StorageException(message: String) extends RuntimeException(message)
|
|||
trait Storage {
|
||||
type ElementType
|
||||
|
||||
def transactional:Boolean=false
|
||||
|
||||
def newMap: PersistentMap[ElementType, ElementType]
|
||||
|
||||
def newVector: PersistentVector[ElementType]
|
||||
|
|
@ -87,7 +89,11 @@ trait Storage {
|
|||
|
||||
private[akka] object PersistentMap {
|
||||
// operations on the Map
|
||||
sealed trait Op
|
||||
sealed trait Op{
|
||||
private var unexec = true
|
||||
def wasExecuted() = {unexec = false}
|
||||
def notExecuted() = unexec
|
||||
}
|
||||
case object PUT extends Op
|
||||
case object REM extends Op
|
||||
case object UPD extends Op
|
||||
|
|
@ -177,15 +183,19 @@ trait PersistentMap[K, V] extends scala.collection.mutable.Map[K, V]
|
|||
val storage: MapStorageBackend[K, V]
|
||||
|
||||
def commit = {
|
||||
appendOnlyTxLog.foreach {
|
||||
case LogEntry(k, v, o) => o match {
|
||||
case PUT => storage.insertMapStorageEntryFor(uuid, k.get, v.get)
|
||||
case UPD => storage.insertMapStorageEntryFor(uuid, k.get, v.get)
|
||||
case REM => storage.removeMapStorageFor(uuid, k.get)
|
||||
case CLR => storage.removeMapStorageFor(uuid)
|
||||
appendOnlyTxLog.foreach{
|
||||
case LogEntry(k, v, o) => {
|
||||
if (storage.transactional || o.notExecuted) {
|
||||
o match {
|
||||
case PUT => storage.insertMapStorageEntryFor(uuid, k.get, v.get)
|
||||
case UPD => storage.insertMapStorageEntryFor(uuid, k.get, v.get)
|
||||
case REM => storage.removeMapStorageFor(uuid, k.get)
|
||||
case CLR => storage.removeMapStorageFor(uuid)
|
||||
}
|
||||
o.wasExecuted
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
appendOnlyTxLog.clear
|
||||
clearDistinctKeys
|
||||
}
|
||||
|
|
@ -439,7 +449,11 @@ trait PersistentMapBinary extends PersistentMap[Array[Byte], Array[Byte]] {
|
|||
|
||||
private[akka] object PersistentVector {
|
||||
// operations on the Vector
|
||||
sealed trait Op
|
||||
sealed trait Op{
|
||||
private var unexec = true
|
||||
def wasExecuted() = {unexec = false}
|
||||
def notExecuted() = unexec
|
||||
}
|
||||
case object ADD extends Op
|
||||
case object UPD extends Op
|
||||
case object POP extends Op
|
||||
|
|
@ -468,10 +482,13 @@ trait PersistentVector[T] extends IndexedSeq[T] with Transactional with Committa
|
|||
|
||||
def commit = {
|
||||
for (entry <- appendOnlyTxLog) {
|
||||
(entry: @unchecked) match {
|
||||
case LogEntry(_, Some(v), ADD) => storage.insertVectorStorageEntryFor(uuid, v)
|
||||
case LogEntry(Some(i), Some(v), UPD) => storage.updateVectorStorageEntryFor(uuid, i, v)
|
||||
case LogEntry(_, _, POP) => storage.removeVectorStorageEntryFor(uuid)
|
||||
if (storage.transactional || entry.op.notExecuted) {
|
||||
(entry: @unchecked) match {
|
||||
case LogEntry(_, Some(v), ADD) => storage.insertVectorStorageEntryFor(uuid, v)
|
||||
case LogEntry(Some(i), Some(v), UPD) => storage.updateVectorStorageEntryFor(uuid, i, v)
|
||||
case LogEntry(_, _, POP) => storage.removeVectorStorageEntryFor(uuid)
|
||||
}
|
||||
entry.op.wasExecuted
|
||||
}
|
||||
}
|
||||
appendOnlyTxLog.clear
|
||||
|
|
@ -595,7 +612,11 @@ trait PersistentRef[T] extends Transactional with Committable with Abortable {
|
|||
|
||||
private[akka] object PersistentQueue {
|
||||
//Operations for PersistentQueue
|
||||
sealed trait QueueOp
|
||||
sealed trait QueueOp{
|
||||
private var unexec = true
|
||||
def wasExecuted() = {unexec = false}
|
||||
def notExecuted() = unexec
|
||||
}
|
||||
case object ENQ extends QueueOp
|
||||
case object DEQ extends QueueOp
|
||||
}
|
||||
|
|
@ -639,11 +660,14 @@ trait PersistentQueue[A] extends scala.collection.mutable.Queue[A]
|
|||
// to be concretized in subclasses
|
||||
val storage: QueueStorageBackend[A]
|
||||
|
||||
def commit = synchronized {
|
||||
def commit = synchronized{
|
||||
for (entry <- appendOnlyTxLog) {
|
||||
(entry: @unchecked) match {
|
||||
case LogEntry(Some(v), ENQ) => storage.enqueue(uuid, v)
|
||||
case LogEntry(_, DEQ) => storage.dequeue(uuid)
|
||||
if (storage.transactional || entry.op.notExecuted) {
|
||||
(entry: @unchecked) match {
|
||||
case LogEntry(Some(v), ENQ) => storage.enqueue(uuid, v)
|
||||
case LogEntry(_, DEQ) => storage.dequeue(uuid)
|
||||
}
|
||||
entry.op.wasExecuted
|
||||
}
|
||||
}
|
||||
appendOnlyTxLog.clear
|
||||
|
|
@ -718,7 +742,11 @@ trait PersistentQueue[A] extends scala.collection.mutable.Queue[A]
|
|||
|
||||
private[akka] object PersistentSortedSet {
|
||||
// operations on the SortedSet
|
||||
sealed trait Op
|
||||
sealed trait Op{
|
||||
private var unexec = true
|
||||
def wasExecuted() = {unexec = false}
|
||||
def notExecuted() = unexec
|
||||
}
|
||||
case object ADD extends Op
|
||||
case object REM extends Op
|
||||
}
|
||||
|
|
@ -772,9 +800,12 @@ trait PersistentSortedSet[A] extends Transactional with Committable with Abortab
|
|||
|
||||
def commit = {
|
||||
for (entry <- appendOnlyTxLog) {
|
||||
(entry: @unchecked) match {
|
||||
case LogEntry(e, Some(s), ADD) => storage.zadd(uuid, String.valueOf(s), e)
|
||||
case LogEntry(e, _, REM) => storage.zrem(uuid, e)
|
||||
if (storage.transactional || entry.op.notExecuted) {
|
||||
(entry: @unchecked) match {
|
||||
case LogEntry(e, Some(s), ADD) => storage.zadd(uuid, String.valueOf(s), e)
|
||||
case LogEntry(e, _, REM) => storage.zrem(uuid, e)
|
||||
}
|
||||
entry.op.wasExecuted
|
||||
}
|
||||
}
|
||||
appendOnlyTxLog.clear
|
||||
|
|
@ -827,7 +858,7 @@ trait PersistentSortedSet[A] extends Transactional with Committable with Abortab
|
|||
val es = replay
|
||||
|
||||
// a multimap with key as A and value as Set of scores
|
||||
val m = new collection.mutable.HashMap[A, collection.mutable.Set[Float]]
|
||||
val m = new collection.mutable.HashMap[A, collection.mutable.Set[Float]]
|
||||
with collection.mutable.MultiMap[A, Float]
|
||||
for(e <- es) m.addBinding(e._1, e._2)
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue