diff --git a/akka-persistence/akka-persistence-voldemort/src/main/scala/VoldemortSession.scala b/akka-persistence/akka-persistence-voldemort/src/main/scala/VoldemortSession.scala deleted file mode 100644 index c0eca74832..0000000000 --- a/akka-persistence/akka-persistence-voldemort/src/main/scala/VoldemortSession.scala +++ /dev/null @@ -1,24 +0,0 @@ -/** - * Copyright (C) 2009-2010 Scalable Solutions AB - */ - -package se.scalablesolutions.akka.persistence.voldemort - -import se.scalablesolutions.akka.util.UUID -import se.scalablesolutions.akka.stm._ -import se.scalablesolutions.akka.persistence.common._ -import voldemort.client.StoreClient - - -class VoldemortSession { - - val voldemort: StoreClient - - def getOptionalBytes(name: String): Option[Array[Byte]] = { - - } - - def put(name:) - - -} \ No newline at end of file diff --git a/akka-persistence/akka-persistence-voldemort/src/main/scala/VoldemortStorage.scala b/akka-persistence/akka-persistence-voldemort/src/main/scala/VoldemortStorage.scala index a590de349b..b906460ca6 100644 --- a/akka-persistence/akka-persistence-voldemort/src/main/scala/VoldemortStorage.scala +++ b/akka-persistence/akka-persistence-voldemort/src/main/scala/VoldemortStorage.scala @@ -34,10 +34,10 @@ class VoldemortPersistentMap(id: String) extends PersistentMapBinary { class VoldemortPersistentVector(id: String) extends PersistentVector[Array[Byte]] { val uuid = id - val storage = VoldemortStoragebackend + val storage = VoldemortStorageBackend } class VoldemortPersistentRef(id: String) extends PersistentRef[Array[Byte]] { val uuid = id - val storage = VoldemortStoragebackend + val storage = VoldemortStorageBackend } diff --git a/akka-persistence/akka-persistence-voldemort/src/main/scala/VoldemortStorageBackend.scala b/akka-persistence/akka-persistence-voldemort/src/main/scala/VoldemortStorageBackend.scala index 5732fbac8d..6849aa09b2 100644 --- a/akka-persistence/akka-persistence-voldemort/src/main/scala/VoldemortStorageBackend.scala +++ b/akka-persistence/akka-persistence-voldemort/src/main/scala/VoldemortStorageBackend.scala @@ -13,6 +13,11 @@ import se.scalablesolutions.akka.config.Config.config import voldemort.client._ import collection.mutable.{Set, HashSet, ArrayBuffer} import java.lang.String +import voldemort.utils.ByteUtils +import collection.immutable.{SortedSet, TreeSet} +import voldemort.versioning.Versioned +import java.util.Map +import collection.JavaConversions private[akka] object VoldemortStorageBackend extends @@ -23,25 +28,30 @@ MapStorageBackend[Array[Byte], Array[Byte]] with /** * Concat the owner+key+lenght of owner so owned data will be colocated - * Store the length of owner as last byte to work aroune the rarest case + * Store the length of owner as last byte to work around the rare case * where ownerbytes1 + keybytes1 == ownerbytes2 + keybytes2 but ownerbytes1 != ownerbytes2 */ private def mapKey(owner: String, key: Array[Byte]): Array[Byte] = { val ownerBytes: Array[Byte] = owner.getBytes("UTF-8") val ownerLenghtByte = ownerBytes.length.byteValue - val mapKey = new Array[Byte](ownerBytes.length + key.length + 1) - System.arraycopy(ownerBytes, 0, mapKey, 0, ownerBytes.length) - System.arraycopy(key, 0, mapKey, ownerBytes.length, key.length) - mapKey.update(mapKey.length - 1) = ownerLenghtByte + val theMapKey = new Array[Byte](ownerBytes.length + key.length + 1) + System.arraycopy(ownerBytes, 0, theMapKey, 0, ownerBytes.length) + System.arraycopy(key, 0, theMapKey, ownerBytes.length, key.length) + theMapKey.update(theMapKey.length - 1, ownerLenghtByte) + theMapKey } - var refClient: StoreClient - var mapKeyClient: StoreClient - var mapValueClient: StoreClient + var refClient: StoreClient[String, Array[Byte]] = null + var mapKeyClient: StoreClient[String, SortedSet[Array[Byte]]] = null + var mapValueClient: StoreClient[Array[Byte], Array[Byte]] = null + + implicit val byteOrder = new Ordering[Array[Byte]] { + override def compare(x: Array[Byte], y: Array[Byte]) = ByteUtils.compare(x, y) + } def getRefStorageFor(name: String): Option[Array[Byte]] = { - val result: Array[Byte] = refClient.get(RefKey(name).key) + val result: Array[Byte] = refClient.getValue(name) result match { case null => None case _ => Some(result) @@ -49,29 +59,41 @@ MapStorageBackend[Array[Byte], Array[Byte]] with } def insertRefStorageFor(name: String, element: Array[Byte]) = { - refClient.put(RefKey(name).key, element) + refClient.put(name, element) } def getMapStorageRangeFor(name: String, start: Option[Array[Byte]], finish: Option[Array[Byte]], count: Int): List[(Array[Byte], Array[Byte])] = { - + val allkeys: SortedSet[Array[Byte]] = mapKeyClient.getValue(name, new TreeSet[Array[Byte]]) + val range = allkeys.rangeImpl(start, finish).take(count) + getKeyValues(range) } def getMapStorageFor(name: String): List[(Array[Byte], Array[Byte])] = { - val keys: Set[Array[Byte]] = mapKeyClient.getValue(name, new HashSet[Array[Byte]](0)) - val entries: ArrayBuffer[(Array[Byte], Array[Byte])] = new ArrayBuffer - keys.foreach { - entries += (_, mapValueClient.getValue(mapKey(name, _))) - } - entries.toList + val keys = mapKeyClient.getValue(name, new TreeSet[Array[Byte]]()) + getKeyValues(keys) + } + + private def getKeyValues(keys: SortedSet[Array[Byte]]): List[(Array[Byte], Array[Byte])] = { + val all: Map[Array[Byte], Versioned[Array[Byte]]] = mapValueClient.getAll(JavaConversions.asIterable(keys)) + JavaConversions.asMap(all).foldLeft(new ArrayBuffer[(Array[Byte], Array[Byte])]) { + (buf, keyVal) => { + keyVal match { + case (key, versioned) => { + buf += key -> versioned.getValue + } + } + buf + } + }.toList } def getMapStorageSizeFor(name: String): Int = { - val keys: Set[Array[Byte]] = mapKeyClient.getValue(name, new HashSet[Array[Byte]](0)) + val keys = mapKeyClient.getValue(name, new TreeSet[Array[Byte]]()) keys.size } def getMapStorageEntryFor(name: String, key: Array[Byte]): Option[Array[Byte]] = { - val result: Array[Byte] = mapValueClient.get(mapKey(name, key)) + val result: Array[Byte] = mapValueClient.getValue(mapKey(name, key)) result match { case null => None case _ => Some(result) @@ -79,7 +101,7 @@ MapStorageBackend[Array[Byte], Array[Byte]] with } def removeMapStorageFor(name: String, key: Array[Byte]) = { - val keys: Set[Array[Byte]] = mapKeyClient.getValue(name, new HashSet[Array[Byte]](0)) + var keys = mapKeyClient.getValue(name, new TreeSet[Array[Byte]]()) keys -= key mapKeyClient.put(name, keys) mapValueClient.delete(mapKey(name, key)) @@ -87,33 +109,35 @@ MapStorageBackend[Array[Byte], Array[Byte]] with def removeMapStorageFor(name: String) = { - val keys: Set[Array[Byte]] = mapKeyClient.getValue(name, new HashSet[Array[Byte]](0)) + val keys = mapKeyClient.getValue(name, new TreeSet[Array[Byte]]()) keys.foreach { - mapValueClient.delete(mapKey(name, _)) + key => + mapValueClient.delete(mapKey(name, key)) } mapKeyClient.delete(name) } def insertMapStorageEntryFor(name: String, key: Array[Byte], value: Array[Byte]) = { - mapValueClient.put(mapKey(name, key)) - val keys: Set[Array[Byte]] = mapKeyClient.getValue(name, new HashSet[Array[Byte]](0)) + mapValueClient.put(mapKey(name, key), value) + var keys = mapKeyClient.getValue(name, new TreeSet[Array[Byte]]()) keys += key mapKeyClient.put(name, keys) } def insertMapStorageEntriesFor(name: String, entries: List[(Array[Byte], Array[Byte])]) = { - val newKeys = new HashSet[Array[Byte]] - entries.foreach { - (key, value) => mapValueClient.put(mapKey(name, key), value) - newKeys += key + val newKeys = entries.map { + case (key, value) => { + mapValueClient.put(mapKey(name, key), value) + key + } } - val keys: Set[Array[Byte]] = mapKeyClient.getValue(name, new HashSet[Array[Byte]](0)) - keys += key + var keys = mapKeyClient.getValue(name, new TreeSet[Array[Byte]]()) + keys ++= newKeys mapKeyClient.put(name, keys) } - def getVectorStorageSizeFor(name: String): Int = null + def getVectorStorageSizeFor(name: String): Int = 0 def getVectorStorageRangeFor(name: String, start: Option[Int], finish: Option[Int], count: Int): List[Array[Byte]] = null