From fb2ba7ec0ccf79c5d451d8ccaee924891f18d114 Mon Sep 17 00:00:00 2001 From: David Greco Date: Wed, 15 Sep 2010 16:06:02 +0200 Subject: [PATCH] working on the hbase integration --- .../src/main/scala/HbaseStorage.scala | 32 ++++---- .../src/main/scala/HbaseStorageBackend.scala | 78 +++++++++---------- .../src/test/scala/SimpleHbaseTest.scala | 16 ++++ config/akka-reference.conf | 4 + 4 files changed, 73 insertions(+), 57 deletions(-) diff --git a/akka-persistence/akka-persistence-hbase/src/main/scala/HbaseStorage.scala b/akka-persistence/akka-persistence-hbase/src/main/scala/HbaseStorage.scala index dcb07cfe9e..1c3abdff4e 100644 --- a/akka-persistence/akka-persistence-hbase/src/main/scala/HbaseStorage.scala +++ b/akka-persistence/akka-persistence-hbase/src/main/scala/HbaseStorage.scala @@ -8,20 +8,20 @@ import se.scalablesolutions.akka.util.UUID import se.scalablesolutions.akka.stm._ import se.scalablesolutions.akka.persistence.common._ -object HbaseStorage /*extends Storage*/ { +object HbaseStorage extends Storage { type ElementType = Array[Byte] - //def newMap: PersistentMap[ElementType, ElementType] = newMap(UUID.newUuid.toString) - //def newVector: PersistentVector[ElementType] = newVector(UUID.newUuid.toString) - //def newRef: PersistentRef[ElementType] = newRef(UUID.newUuid.toString) + def newMap: PersistentMap[ElementType, ElementType] = newMap(UUID.newUuid.toString) + def newVector: PersistentVector[ElementType] = newVector(UUID.newUuid.toString) + def newRef: PersistentRef[ElementType] = newRef(UUID.newUuid.toString) - //def getMap(id: String): PersistentMap[ElementType, ElementType] = newMap(id) - //def getVector(id: String): PersistentVector[ElementType] = newVector(id) - //def getRef(id: String): PersistentRef[ElementType] = newRef(id) + def getMap(id: String): PersistentMap[ElementType, ElementType] = newMap(id) + def getVector(id: String): PersistentVector[ElementType] = newVector(id) + def getRef(id: String): PersistentRef[ElementType] = newRef(id) - //def newMap(id: String): PersistentMap[ElementType, ElementType] = new HbasePersistentMap(id) - //def newVector(id: String): PersistentVector[ElementType] = new HbasePersistentVector(id) - //def newRef(id: String): PersistentRef[ElementType] = new HbasePersistentRef(id) + def newMap(id: String): PersistentMap[ElementType, ElementType] = new HbasePersistentMap(id) + def newVector(id: String): PersistentVector[ElementType] = new HbasePersistentVector(id) + def newRef(id: String): PersistentRef[ElementType] = new HbasePersistentRef(id) } /** @@ -29,9 +29,9 @@ object HbaseStorage /*extends Storage*/ { * * @author David Greco */ -class HbasePersistentMap(id: String) /*extends PersistentMapBinary*/ { +class HbasePersistentMap(id: String) extends PersistentMapBinary { val uuid = id - //val storage = HbaseStorageBackend + val storage = HbaseStorageBackend } /** @@ -39,12 +39,12 @@ class HbasePersistentMap(id: String) /*extends PersistentMapBinary*/ { * * @author David Greco */ -class HbasePersistentVector(id: String) /*extends PersistentVector[Array[Byte]]*/ { +class HbasePersistentVector(id: String) extends PersistentVector[Array[Byte]] { val uuid = id - //val storage = HbaseStorageBackend + val storage = HbaseStorageBackend } -class HbasePersistentRef(id: String) /*extends PersistentRef[Array[Byte]]*/ { +class HbasePersistentRef(id: String) extends PersistentRef[Array[Byte]] { val uuid = id - //val storage = HbaseStorageBackend + val storage = HbaseStorageBackend } diff --git a/akka-persistence/akka-persistence-hbase/src/main/scala/HbaseStorageBackend.scala b/akka-persistence/akka-persistence-hbase/src/main/scala/HbaseStorageBackend.scala index 11187c3888..dafaafb0c4 100644 --- a/akka-persistence/akka-persistence-hbase/src/main/scala/HbaseStorageBackend.scala +++ b/akka-persistence/akka-persistence-hbase/src/main/scala/HbaseStorageBackend.scala @@ -13,17 +13,13 @@ import se.scalablesolutions.akka.config.Config.config /** * @author David Greco */ -private[akka] object HbaseStorageBackend /* extends - MapStorageBackend[Array[Byte], Array[Byte]] with - VectorStorageBackend[Array[Byte]] with - RefStorageBackend[Array[Byte]] with - Logging */ { +private[akka] object HbaseStorageBackend extends MapStorageBackend[Array[Byte], Array[Byte]] with VectorStorageBackend[Array[Byte]] with RefStorageBackend[Array[Byte]] with Logging { type ElementType = Array[Byte] - val KEYSPACE = "akka" - val REF_KEY = "item".getBytes("UTF-8") - val EMPTY_BYTE_ARRAY = new Array[Byte](0) + val KEYSPACE = "akka" + val REF_KEY = "item".getBytes("UTF-8") + val EMPTY_BYTE_ARRAY = new Array[Byte](0) val HBASE_ZOOKEEPER_QUORUM = config.getString("akka.storage.hbase.zookeeper.quorum", "127.0.0.1") @@ -31,65 +27,65 @@ private[akka] object HbaseStorageBackend /* extends // For Ref // =============================================================== - def insertRefStorageFor(name: String, element: Array[Byte]) = { - } + def insertRefStorageFor(name: String, element: Array[Byte]) = {} - //def getRefStorageFor(name: String): Option[Array[Byte]] = { - //} + def getRefStorageFor(name: String): Option[Array[Byte]] = { + return None + } // =============================================================== // For Vector // =============================================================== - def insertVectorStorageEntryFor(name: String, element: Array[Byte]) = { + def insertVectorStorageEntryFor(name: String, element: Array[Byte]) = {} + + def insertVectorStorageEntriesFor(name: String, elements: List[Array[Byte]]) = elements.foreach(insertVectorStorageEntryFor(name, _)) + + def updateVectorStorageEntryFor(name: String, index: Int, elem: Array[Byte]) = {} + + def getVectorStorageEntryFor(name: String, index: Int): Array[Byte] = { + EMPTY_BYTE_ARRAY } - def insertVectorStorageEntriesFor(name: String, elements: List[Array[Byte]]) = - elements.foreach(insertVectorStorageEntryFor(name, _)) - - def updateVectorStorageEntryFor(name: String, index: Int, elem: Array[Byte]) = { - } - - //def getVectorStorageEntryFor(name: String, index: Int): Array[Byte] = { - //} - /** * if start and finish both are defined, ignore count and * report the range [start, finish) * if start is not defined, assume start = 0 * if start == 0 and finish == 0, return an empty collection */ -// def getVectorStorageRangeFor(name: String, start: Option[Int], finish: Option[Int], count: Int): -// } + def getVectorStorageRangeFor(name: String, start: Option[Int], finish: Option[Int], count: Int): List[Array[Byte]] = { + Nil + } -// def getVectorStorageSizeFor(name: String): Int = { -// } + def getVectorStorageSizeFor(name: String): Int = { + 0 + } // =============================================================== // For Map // =============================================================== - def insertMapStorageEntryFor(name: String, key: Array[Byte], element: Array[Byte]) = { + def insertMapStorageEntryFor(name: String, key: Array[Byte], element: Array[Byte]) = {} + + def insertMapStorageEntriesFor(name: String, entries: List[Tuple2[Array[Byte], Array[Byte]]]) = {} + + def getMapStorageEntryFor(name: String, key: Array[Byte]): Option[Array[Byte]] = { + None } - def insertMapStorageEntriesFor(name: String, entries: List[Tuple2[Array[Byte], Array[Byte]]]) = { + def getMapStorageFor(name: String): List[Tuple2[Array[Byte], Array[Byte]]] = { + Nil } -// def getMapStorageEntryFor(name: String, key: Array[Byte]): Option[Array[Byte]] = { -// } - -// def getMapStorageFor(name: String): List[Tuple2[Array[Byte], Array[Byte]]] = { -// } - -// def getMapStorageSizeFor(name: String): Int = { -// } + def getMapStorageSizeFor(name: String): Int = { + 0 + } def removeMapStorageFor(name: String): Unit = removeMapStorageFor(name, null) - def removeMapStorageFor(name: String, key: Array[Byte]): Unit = { - } + def removeMapStorageFor(name: String, key: Array[Byte]): Unit = {} -// def getMapStorageRangeFor(name: String, start: Option[Array[Byte]], finish: Option[Array[Byte]], count: Int): -// List[Tuple2[Array[Byte], Array[Byte]]] = { -// } + def getMapStorageRangeFor(name: String, start: Option[Array[Byte]], finish: Option[Array[Byte]], count: Int): List[Tuple2[Array[Byte], Array[Byte]]] = { + Nil + } } diff --git a/akka-persistence/akka-persistence-hbase/src/test/scala/SimpleHbaseTest.scala b/akka-persistence/akka-persistence-hbase/src/test/scala/SimpleHbaseTest.scala index 3dc1517930..40c00cc30e 100644 --- a/akka-persistence/akka-persistence-hbase/src/test/scala/SimpleHbaseTest.scala +++ b/akka-persistence/akka-persistence-hbase/src/test/scala/SimpleHbaseTest.scala @@ -42,6 +42,22 @@ class PersistenceSpec extends Spec with BeforeAndAfterAll with ShouldMatchers { table should not equal(null) } + it("should use the quorum read from the akka configuration") { + import se.scalablesolutions.akka.config.Config.config + import org.apache.hadoop.hbase.HBaseConfiguration + import org.apache.hadoop.hbase.client.HBaseAdmin + import org.apache.hadoop.hbase.client.HTable + + val HBASE_ZOOKEEPER_QUORUM = config.getString("akka.storage.hbase.zookeeper.quorum", "0") + HBASE_ZOOKEEPER_QUORUM should not equal("0") + HBASE_ZOOKEEPER_QUORUM should equal("localhost") + + val configuration = new HBaseConfiguration + configuration.set("hbase.zookeeper.quorum", HBASE_ZOOKEEPER_QUORUM) + val admin = new HBaseAdmin(configuration) + admin.tableExists("ATable") should equal(true) + } + } } diff --git a/config/akka-reference.conf b/config/akka-reference.conf index 10a9c84118..09e1f39ed3 100644 --- a/config/akka-reference.conf +++ b/config/akka-reference.conf @@ -164,5 +164,9 @@ akka { hostname = "127.0.0.1" # IP address or hostname of the Redis instance port = 6379 # Port to Redis } + + hbase { + zookeeper.quorum = "localhost" + } } }