From bd4106e7ea35ba91a87e49b1769ff415b78d2d39 Mon Sep 17 00:00:00 2001 From: David Greco Date: Fri, 17 Sep 2010 14:10:32 +0200 Subject: [PATCH] Starting to work on the hbase storage backend for maps --- .../src/main/scala/HbaseStorageBackend.scala | 22 ++++++++++++++++--- .../src/test/scala/HbaseStorageSpec.scala | 6 ++--- 2 files changed, 22 insertions(+), 6 deletions(-) diff --git a/akka-persistence/akka-persistence-hbase/src/main/scala/HbaseStorageBackend.scala b/akka-persistence/akka-persistence-hbase/src/main/scala/HbaseStorageBackend.scala index ce8934f393..e477cfe7e3 100644 --- a/akka-persistence/akka-persistence-hbase/src/main/scala/HbaseStorageBackend.scala +++ b/akka-persistence/akka-persistence-hbase/src/main/scala/HbaseStorageBackend.scala @@ -17,6 +17,7 @@ import org.apache.hadoop.hbase.client.HBaseAdmin import org.apache.hadoop.hbase.client.HTable import org.apache.hadoop.hbase.client.Put import org.apache.hadoop.hbase.client.Get +import org.apache.hadoop.hbase.client.Delete import org.apache.hadoop.hbase.util.Bytes /** @@ -196,7 +197,15 @@ private[akka] object HbaseStorageBackend extends MapStorageBackend[Array[Byte], } def getMapStorageFor(name: String): List[Tuple2[Array[Byte], Array[Byte]]] = { - Nil + val row = new Get(Bytes.toBytes(name)) + val result = MAP_TABLE.get(row) + val raw = result.getFamilyMap(Bytes.toBytes(MAP_ELEMENT_COLUMN_FAMILY_NAME)).entrySet.toArray + val listBuffer = new ListBuffer[Tuple2[Array[Byte], Array[Byte]]] + + for(i <- Range(raw.size-1, -1, -1)) { + listBuffer += Tuple2(raw.apply(i).asInstanceOf[java.util.Map.Entry[Array[Byte], Array[Byte]]].getKey, raw.apply(i).asInstanceOf[java.util.Map.Entry[Array[Byte],Array[Byte]]].getValue) + } + listBuffer.toList } def getMapStorageSizeFor(name: String): Int = { @@ -209,9 +218,16 @@ private[akka] object HbaseStorageBackend extends MapStorageBackend[Array[Byte], } } - def removeMapStorageFor(name: String): Unit = removeMapStorageFor(name, null) + def removeMapStorageFor(name: String): Unit = { + val row = new Delete(Bytes.toBytes(name)) + MAP_TABLE.delete(row) + } - def removeMapStorageFor(name: String, key: Array[Byte]): Unit = {} + def removeMapStorageFor(name: String, key: Array[Byte]): Unit = { + val row = new Delete(Bytes.toBytes(name)) + row.deleteColumns(Bytes.toBytes(MAP_ELEMENT_COLUMN_FAMILY_NAME), key) + MAP_TABLE.delete(row) + } def getMapStorageRangeFor(name: String, start: Option[Array[Byte]], finish: Option[Array[Byte]], count: Int): List[Tuple2[Array[Byte], Array[Byte]]] = { Nil diff --git a/akka-persistence/akka-persistence-hbase/src/test/scala/HbaseStorageSpec.scala b/akka-persistence/akka-persistence-hbase/src/test/scala/HbaseStorageSpec.scala index 52aeabd49b..a52a10cb34 100644 --- a/akka-persistence/akka-persistence-hbase/src/test/scala/HbaseStorageSpec.scala +++ b/akka-persistence/akka-persistence-hbase/src/test/scala/HbaseStorageSpec.scala @@ -44,9 +44,9 @@ BeforeAndAfterEach { new String(getMapStorageEntryFor("t1", "stroustrup".getBytes).get) should equal("c++") getMapStorageEntryFor("t1", "torvalds".getBytes) should equal(None) } -/* + it("should insert with multiple keys and values") { - import MongoStorageBackend._ + import HbaseStorageBackend._ val l = List(("stroustrup", "c++"), ("odersky", "scala"), ("gosling", "java")) insertMapStorageEntriesFor("t1", l.map { case (k, v) => (k.getBytes, v.getBytes) }) @@ -66,7 +66,7 @@ BeforeAndAfterEach { removeMapStorageFor("t1") getMapStorageSizeFor("t1") should equal(0) } - +/* it("should do proper range queries") { import MongoStorageBackend._ val l = List(