diff --git a/akka-persistence/akka-persistence-cassandra/src/main/scala/CassandraStorageBackend.scala b/akka-persistence/akka-persistence-cassandra/src/main/scala/CassandraStorageBackend.scala index 03cc0048f3..a4eb481215 100644 --- a/akka-persistence/akka-persistence-cassandra/src/main/scala/CassandraStorageBackend.scala +++ b/akka-persistence/akka-persistence-cassandra/src/main/scala/CassandraStorageBackend.scala @@ -62,8 +62,6 @@ private[akka] object CassandraStorageBackend extends CommonStorageBackend { CONSISTENCY_LEVEL) - - class CassandraAccess(parent: ColumnParent) extends CommonStorageBackendAccess { def path(key: Array[Byte]): ColumnPath = { @@ -77,7 +75,8 @@ private[akka] object CassandraStorageBackend extends CommonStorageBackend { } } } - def getAll(owner: String, keys: Iterable[Array[Byte]]): Map[Array[Byte], Array[Byte]] = { + + override def getAll(owner: String, keys: Iterable[Array[Byte]]): Map[Array[Byte], Array[Byte]] = { sessions.withSession{ session => { var predicate = new SlicePredicate().setColumn_names(JavaConversions.asList(keys.toList)) @@ -91,7 +90,8 @@ private[akka] object CassandraStorageBackend extends CommonStorageBackend { } } - def getValue(owner: String, key: Array[Byte], default: Array[Byte]) = { + + def get(owner: String, key: Array[Byte], default: Array[Byte]) = { sessions.withSession{ session => { try @@ -115,6 +115,7 @@ private[akka] object CassandraStorageBackend extends CommonStorageBackend { } } + def drop() = { sessions.withSession{ session => { diff --git a/akka-persistence/akka-persistence-common/src/main/scala/CommonStorageBackend.scala b/akka-persistence/akka-persistence-common/src/main/scala/CommonStorageBackend.scala index 2dd9147d74..420d82b474 100644 --- a/akka-persistence/akka-persistence-common/src/main/scala/CommonStorageBackend.scala +++ b/akka-persistence/akka-persistence-common/src/main/scala/CommonStorageBackend.scala @@ -11,6 +11,7 @@ import collection.Map import java.util.{Map => JMap} import akka.persistence.common.PersistentMapBinary.COrdering._ import collection.immutable._ +import collection.mutable.ArrayBuffer private[akka] trait CommonStorageBackendAccess { @@ -19,27 +20,74 @@ private[akka] trait CommonStorageBackendAccess { /*abstract*/ - def getValue(owner: String, key: Array[Byte], default: Array[Byte]): Array[Byte] + def get(owner: String, key: Array[Byte], default: Array[Byte]): Array[Byte] + + def getAll(owner: String, keys: Iterable[Array[Byte]]): Map[Array[Byte], Array[Byte]] = { + keys.foldLeft(new HashMap[Array[Byte], Array[Byte]]) { + (map, key) => { + Option(get(owner, key)) match { + case Some(value) => map + (key -> value) + case None => map + } + } + } + } def put(owner: String, key: Array[Byte], value: Array[Byte]): Unit + def putAll(owner: String, keyValues: Iterable[(Array[Byte], Array[Byte])]): Unit = { + keyValues.foreach{ + kv => kv match { + case (key, value) => put(owner, key, value) + } + } + } + def delete(owner: String, key: Array[Byte]): Unit - def getAll(owner: String, keys: Iterable[Array[Byte]]): Map[Array[Byte], Array[Byte]] + def deleteAll(owner: String, keys: Iterable[Array[Byte]]): Unit = { + keys.foreach(delete(owner, _)) + } def drop(): Unit /*concrete*/ - def decodeKey(owner: String, key: Array[Byte]) = key + def decodeMapKey(owner: String, key: Array[Byte]) = key - def delete(owner: String, index: Int): Unit = delete(owner, IntSerializer.toBytes(index)) + def decodeIndexedKey(owner: String, key: Array[Byte]) = key - def getValue(owner: String, index: Int): Array[Byte] = getValue(owner, IntSerializer.toBytes(index)) + def deleteIndexed(owner: String, index: Int): Unit = delete(owner, IntSerializer.toBytes(index)) - def getValue(owner: String, key: Array[Byte]): Array[Byte] = getValue(owner, key, null) + def getIndexed(owner: String, index: Int): Array[Byte] = get(owner, IntSerializer.toBytes(index)) - def put(owner: String, index: Int, value: Array[Byte]): Unit = put(owner, IntSerializer.toBytes(index), value) + def get(owner: String, key: Array[Byte]): Array[Byte] = get(owner, key, null) + + def putIndexed(owner: String, index: Int, value: Array[Byte]): Unit = put(owner, IntSerializer.toBytes(index), value) + + def putAllIndexed(owner: String, values: Iterable[(Int, Array[Byte])]): Unit = { + putAll(owner, values.map{ + iv => { + iv match { + case (i, value) => (IntSerializer.toBytes(i) -> value) + } + } + }) + } + + def getAllIndexed(owner: String, keys: Iterable[Int]): Map[Int, Array[Byte]] = { + val byteKeys = keys.map(IntSerializer.toBytes(_)) + getAll(owner, byteKeys).map{ + kv => kv match { + case (key, value) => (IntSerializer.fromBytes(key) -> value) + } + } + } + + def deleteAllIndexed(owner: String, keys: Iterable[Int]): Unit = { + val byteKeys = keys.map(IntSerializer.toBytes(_)) + deleteAll(owner, byteKeys) + } } private[akka] trait KVStorageBackendAccess extends CommonStorageBackendAccess { @@ -57,7 +105,7 @@ private[akka] trait KVStorageBackendAccess extends CommonStorageBackendAccess { def delete(key: Array[Byte]): Unit - override def decodeKey(owner: String, key: Array[Byte]): Array[Byte] = { + override def decodeMapKey(owner: String, key: Array[Byte]): Array[Byte] = { val mapKeyLength = key.length - IntSerializer.bytesPerInt - owner.getBytes("UTF-8").length val mapkey = new Array[Byte](mapKeyLength) System.arraycopy(key, key.length - mapKeyLength, mapkey, 0, mapKeyLength) @@ -69,21 +117,21 @@ private[akka] trait KVStorageBackendAccess extends CommonStorageBackendAccess { put(getKey(owner, key), value) } - override def put(owner: String, index: Int, value: Array[Byte]): Unit = { + override def putIndexed(owner: String, index: Int, value: Array[Byte]): Unit = { put(getIndexedKey(owner, index), value) } - override def getValue(owner: String, key: Array[Byte]): Array[Byte] = { + override def get(owner: String, key: Array[Byte]): Array[Byte] = { getValue(getKey(owner, key)) } - override def getValue(owner: String, index: Int): Array[Byte] = { + override def getIndexed(owner: String, index: Int): Array[Byte] = { getValue(getIndexedKey(owner, index)) } - override def getValue(owner: String, key: Array[Byte], default: Array[Byte]): Array[Byte] = { + override def get(owner: String, key: Array[Byte], default: Array[Byte]): Array[Byte] = { getValue(getKey(owner, key), default) } @@ -94,7 +142,7 @@ private[akka] trait KVStorageBackendAccess extends CommonStorageBackendAccess { }) } - override def delete(owner: String, index: Int): Unit = { + override def deleteIndexed(owner: String, index: Int): Unit = { delete(getIndexedKey(owner, index)) } @@ -118,6 +166,13 @@ private[akka] object CommonStorageBackend { val mapKeysIndex: Array[Byte] = new Array[Byte](1).padTo(1, mapKeySetKeyHeader) val mapKeysWrapperPad: Array[Byte] = new Array[Byte](1).padTo(1, mapKeyHeader) + /** + * Wrap map key prepends mapKeysWrapperPad (1-byte) to map keys so that we can + * use a seperate 1 byte key to store the map keyset. + * + * This basically creates the map key used in underlying storage + */ + def wrapMapKey(key: Array[Byte]): Array[Byte] = { val wrapped = new Array[Byte](key.length + mapKeysWrapperPad.length) System.arraycopy(mapKeysWrapperPad, 0, wrapped, 0, mapKeysWrapperPad.length) @@ -125,6 +180,11 @@ private[akka] object CommonStorageBackend { wrapped } + /** + * unwrapMapKey removes the mapKeysWrapperPad, this translates the map key used + * in underlying storage back to a key that is understandable by the frontend + */ + def unwrapMapKey(key: Array[Byte]): Array[Byte] = { val unwrapped = new Array[Byte](key.length - mapKeysWrapperPad.length) System.arraycopy(key, mapKeysWrapperPad.length, unwrapped, 0, unwrapped.length) @@ -260,7 +320,7 @@ private[akka] trait CommonStorageBackend extends MapStorageBackend[Array[Byte], def getRefStorageFor(name: String): Option[Array[Byte]] = { - val result: Array[Byte] = refAccess.getValue(name, refItem) + val result: Array[Byte] = refAccess.get(name, refItem) Option(result) } @@ -293,7 +353,7 @@ private[akka] trait CommonStorageBackend extends MapStorageBackend[Array[Byte], entry match { case (namePlusKey: Array[Byte], value: Array[Byte]) => { //need to fix here - returned += mapAccess.decodeKey(name, unwrapMapKey(namePlusKey)) -> getMapValueFromStored(value) + returned += mapAccess.decodeMapKey(name, unwrapMapKey(namePlusKey)) -> getMapValueFromStored(value) } } } @@ -307,7 +367,7 @@ private[akka] trait CommonStorageBackend extends MapStorageBackend[Array[Byte], } def getMapStorageEntryFor(name: String, key: Array[Byte]): Option[Array[Byte]] = { - val result: Array[Byte] = mapAccess.getValue(name, wrapMapKey(key)) + val result: Array[Byte] = mapAccess.get(name, wrapMapKey(key)) result match { case null => None case _ => Some(getMapValueFromStored(result)) @@ -341,11 +401,15 @@ private[akka] trait CommonStorageBackend extends MapStorageBackend[Array[Byte], } def insertMapStorageEntriesFor(name: String, entries: List[(Array[Byte], Array[Byte])]) = { - val newKeys = entries.map{ + val toInsert = entries.map{ + kv => kv match { + case (key, value) => (wrapMapKey(key) -> getStoredMapValue(value)) + } + } + mapAccess.putAll(name, toInsert) + val newKeys = toInsert.map{ case (key, value) => { - val wrapped = wrapMapKey(key) - mapAccess.put(name, wrapped, getStoredMapValue(value)) - wrapped + key } } var keys = getMapKeys(name) @@ -358,10 +422,9 @@ private[akka] trait CommonStorageBackend extends MapStorageBackend[Array[Byte], } def getMapKeys(name: String): SortedSet[Array[Byte]] = { - SortedSetSerializer.fromBytes(mapAccess.getValue(name, mapKeysIndex, Array.empty[Byte])) + SortedSetSerializer.fromBytes(mapAccess.get(name, mapKeysIndex, Array.empty[Byte])) } - def getVectorStorageSizeFor(name: String): Int = { getVectorMetadata(name).size } @@ -381,19 +444,16 @@ private[akka] trait CommonStorageBackend extends MapStorageBackend[Array[Byte], cnt = mdata.size - st } - val ret = mdata.getRangeIndexes(st, count).toList map { - index: Int => { - log.debug("getting:" + index) - vectorAccess.getValue(name, index) - } - } - ret + val indexes = mdata.getRangeIndexes(st, count) + val result = vectorAccess.getAllIndexed(name, indexes) + indexes.map(result.get(_).get).toList + } def getVectorStorageEntryFor(name: String, index: Int): Array[Byte] = { val mdata = getVectorMetadata(name) if (mdata.size > 0 && index < mdata.size) { - vectorAccess.getValue(name, mdata.getRangeIndexes(index, 1)(0)) + vectorAccess.getIndexed(name, mdata.getRangeIndexes(index, 1)(0)) } else { throw new StorageException("In Vector:" + name + " No such Index:" + index) } @@ -403,8 +463,8 @@ private[akka] trait CommonStorageBackend extends MapStorageBackend[Array[Byte], val mdata = getVectorMetadata(name) if (mdata.size > 0 && index < mdata.size) { elem match { - case null => vectorAccess.delete(name, mdata.getRangeIndexes(index, 1)(0)) - case _ => vectorAccess.put(name, mdata.getRangeIndexes(index, 1)(0), elem) + case null => vectorAccess.deleteIndexed(name, mdata.getRangeIndexes(index, 1)(0)) + case _ => vectorAccess.putIndexed(name, mdata.getRangeIndexes(index, 1)(0), elem) } } else { throw new StorageException("In Vector:" + name + " No such Index:" + index) @@ -412,18 +472,35 @@ private[akka] trait CommonStorageBackend extends MapStorageBackend[Array[Byte], } def insertVectorStorageEntriesFor(name: String, elements: List[Array[Byte]]) = { + var mdata = getVectorMetadata(name) + var deletes: List[Int] = Nil + var puts: List[(Int, Array[Byte])] = Nil elements.foreach{ - insertVectorStorageEntryFor(name, _) + element => { + if (mdata.canInsert) { + element match { + case null => deletes = mdata.head :: deletes + case _ => puts = (mdata.head -> element) :: puts + } + mdata = mdata.copy(head = mdata.nextInsert) + } else { + throw new IllegalStateException("The vector dosent have enough capacity to insert these entries") + } + } } + vectorAccess.deleteAllIndexed(name, deletes) + vectorAccess.putAllIndexed(name, puts) + vectorAccess.put(name, vectorHeadIndex, IntSerializer.toBytes(mdata.head)) + } def insertVectorStorageEntryFor(name: String, element: Array[Byte]) = { val mdata = getVectorMetadata(name) if (mdata.canInsert) { element match { - case null => vectorAccess.delete(name, mdata.head) - case _ => vectorAccess.put(name, mdata.head, element) + case null => vectorAccess.deleteIndexed(name, mdata.head) + case _ => vectorAccess.putIndexed(name, mdata.head, element) } vectorAccess.put(name, vectorHeadIndex, IntSerializer.toBytes(mdata.nextInsert)) } else { @@ -439,7 +516,7 @@ private[akka] trait CommonStorageBackend extends MapStorageBackend[Array[Byte], vectorAccess.put(name, vectorTailIndex, IntSerializer.toBytes(mdata.nextRemove)) try { - vectorAccess.delete(name, mdata.tail) + vectorAccess.deleteIndexed(name, mdata.tail) } catch { case e: Exception => log.warn("Exception while trying to clean up a popped element from the vector, this is acceptable") } @@ -450,9 +527,12 @@ private[akka] trait CommonStorageBackend extends MapStorageBackend[Array[Byte], } def getVectorMetadata(name: String): VectorMetadata = { - val head = vectorAccess.getValue(name, vectorHeadIndex, zero) - val tail = vectorAccess.getValue(name, vectorTailIndex, zero) - VectorMetadata(IntSerializer.fromBytes(head), IntSerializer.fromBytes(tail)) + val result = vectorAccess.getAll(name, List(vectorHeadIndex, vectorTailIndex)) + val head = result.getOrElse(vectorHeadIndex, zero) + val tail = result.getOrElse(vectorTailIndex, zero) + val mdata = VectorMetadata(IntSerializer.fromBytes(head), IntSerializer.fromBytes(tail)) + log.debug(mdata.toString) + mdata } def getOrDefaultToZero(map: Map[Array[Byte], Array[Byte]], key: Array[Byte]): Int = { @@ -467,7 +547,7 @@ private[akka] trait CommonStorageBackend extends MapStorageBackend[Array[Byte], val mdata = getQueueMetadata(name) mdata.getActiveIndexes foreach { index => - queueAccess.delete(name, index) + queueAccess.deleteIndexed(name, index) } queueAccess.delete(name, queueHeadIndex) queueAccess.delete(name, queueTailIndex) @@ -476,13 +556,9 @@ private[akka] trait CommonStorageBackend extends MapStorageBackend[Array[Byte], def peek(name: String, start: Int, count: Int): List[Array[Byte]] = { val mdata = getQueueMetadata(name) - val ret = mdata.getPeekIndexes(start, count).toList map { - index: Int => { - log.debug("peeking:" + index) - queueAccess.getValue(name, index) - } - } - ret + val indexes = mdata.getPeekIndexes(start, count) + val result = queueAccess.getAllIndexed(name, indexes) + indexes.map(result.get(_).get).toList } def size(name: String): Int = { @@ -494,13 +570,13 @@ private[akka] trait CommonStorageBackend extends MapStorageBackend[Array[Byte], if (mdata.canDequeue) { try { - val dequeued = queueAccess.getValue(name, mdata.head) + val dequeued = queueAccess.getIndexed(name, mdata.head) queueAccess.put(name, queueHeadIndex, IntSerializer.toBytes(mdata.nextDequeue)) Some(dequeued) } finally { try { - queueAccess.delete(name, mdata.head) + queueAccess.deleteIndexed(name, mdata.head) } catch { //a failure to delete is ok, just leaves a K-V in Voldemort that will be overwritten if the queue ever wraps around case e: Exception => log.warn(e, "caught an exception while deleting a dequeued element, however this will not cause any inconsistency in the queue") @@ -515,8 +591,8 @@ private[akka] trait CommonStorageBackend extends MapStorageBackend[Array[Byte], val mdata = getQueueMetadata(name) if (mdata.canEnqueue) { item match { - case null => queueAccess.delete(name, mdata.tail) - case _ => queueAccess.put(name, mdata.tail, item) + case null => queueAccess.deleteIndexed(name, mdata.tail) + case _ => queueAccess.putIndexed(name, mdata.tail, item) } queueAccess.put(name, queueTailIndex, IntSerializer.toBytes(mdata.nextEnqueue)) Some(mdata.size + 1) @@ -526,8 +602,9 @@ private[akka] trait CommonStorageBackend extends MapStorageBackend[Array[Byte], } def getQueueMetadata(name: String): QueueMetadata = { - val head = queueAccess.getValue(name, vectorHeadIndex, zero) - val tail = queueAccess.getValue(name, vectorTailIndex, zero) + val result = queueAccess.getAll(name, List(vectorHeadIndex, vectorTailIndex)) + val head = result.get(vectorHeadIndex).getOrElse(zero) + val tail = result.get(vectorTailIndex).getOrElse(zero) QueueMetadata(IntSerializer.fromBytes(head), IntSerializer.fromBytes(tail)) } diff --git a/akka-persistence/akka-persistence-common/src/main/scala/Storage.scala b/akka-persistence/akka-persistence-common/src/main/scala/Storage.scala index 7d74496936..eb423f1599 100644 --- a/akka-persistence/akka-persistence-common/src/main/scala/Storage.scala +++ b/akka-persistence/akka-persistence-common/src/main/scala/Storage.scala @@ -305,10 +305,25 @@ object PersistentMapBinary { ArrayOrdering.compare(o1.toArray, o2.toArray) } //backend + implicit object ArrayOrdering extends Ordering[Array[Byte]] { - def compare(o1: Array[Byte], o2: Array[Byte]) = - new String(o1) compare new String(o2) + def compare(o1: Array[Byte], o2: Array[Byte]): Int = { + if (o1.size == o2.size) { + for (i <- 0 until o1.size) { + var a = o1(i) + var b = o2(i) + if (a != b) { + return (a - b) / (Math.abs(a - b)) + } + } + 0 + } else { + (o1.length - o2.length) / (Math.max(1, Math.abs(o1.length - o2.length))) + } + } + } + } } diff --git a/akka-persistence/akka-persistence-memcached/src/main/scala/MemcachedStorageBackend.scala b/akka-persistence/akka-persistence-memcached/src/main/scala/MemcachedStorageBackend.scala index b84b99adbd..4bf907ec69 100644 --- a/akka-persistence/akka-persistence-memcached/src/main/scala/MemcachedStorageBackend.scala +++ b/akka-persistence/akka-persistence-memcached/src/main/scala/MemcachedStorageBackend.scala @@ -48,10 +48,10 @@ private[akka] object MemcachedStorageBackend extends CommonStorageBackend { base64.encodeToString(key) } - override def decodeKey(owner: String, key: Array[Byte]) = { + override def decodeMapKey(owner: String, key: Array[Byte]) = { val newkey = new Array[Byte](key.length - typeBytes.length) System.arraycopy(key, 0, newkey, 0, newkey.length) - super.decodeKey(owner, newkey) + super.decodeMapKey(owner, newkey) } def drop() = client.flush() diff --git a/akka-persistence/akka-persistence-simpledb/src/test/scala/SimpledbTestIntegration.scala b/akka-persistence/akka-persistence-simpledb/src/test/scala/SimpledbTestIntegration.scala index 5e0452b8be..7dce059757 100644 --- a/akka-persistence/akka-persistence-simpledb/src/test/scala/SimpledbTestIntegration.scala +++ b/akka-persistence/akka-persistence-simpledb/src/test/scala/SimpledbTestIntegration.scala @@ -22,7 +22,7 @@ class SimpledbTestIntegration extends Spec with ShouldMatchers with BeforeAndAft val value = new Array[Byte](valsize) mapAccess.put(name, key, value) - val result = mapAccess.getValue(name, key, Array.empty[Byte]) + val result = mapAccess.get(name, key, Array.empty[Byte]) result.size should be(value.size) result should be(value) }