diff --git a/akka-persistence/akka-persistence-hbase/src/main/scala/HbaseStorage.scala b/akka-persistence/akka-persistence-hbase/src/main/scala/HbaseStorage.scala
index dcb07cfe9e..1c3abdff4e 100644
--- a/akka-persistence/akka-persistence-hbase/src/main/scala/HbaseStorage.scala
+++ b/akka-persistence/akka-persistence-hbase/src/main/scala/HbaseStorage.scala
@@ -8,20 +8,20 @@ import se.scalablesolutions.akka.util.UUID
import se.scalablesolutions.akka.stm._
import se.scalablesolutions.akka.persistence.common._
-object HbaseStorage /*extends Storage*/ {
+object HbaseStorage extends Storage {
type ElementType = Array[Byte]
- //def newMap: PersistentMap[ElementType, ElementType] = newMap(UUID.newUuid.toString)
- //def newVector: PersistentVector[ElementType] = newVector(UUID.newUuid.toString)
- //def newRef: PersistentRef[ElementType] = newRef(UUID.newUuid.toString)
+ def newMap: PersistentMap[ElementType, ElementType] = newMap(UUID.newUuid.toString)
+ def newVector: PersistentVector[ElementType] = newVector(UUID.newUuid.toString)
+ def newRef: PersistentRef[ElementType] = newRef(UUID.newUuid.toString)
- //def getMap(id: String): PersistentMap[ElementType, ElementType] = newMap(id)
- //def getVector(id: String): PersistentVector[ElementType] = newVector(id)
- //def getRef(id: String): PersistentRef[ElementType] = newRef(id)
+ def getMap(id: String): PersistentMap[ElementType, ElementType] = newMap(id)
+ def getVector(id: String): PersistentVector[ElementType] = newVector(id)
+ def getRef(id: String): PersistentRef[ElementType] = newRef(id)
- //def newMap(id: String): PersistentMap[ElementType, ElementType] = new HbasePersistentMap(id)
- //def newVector(id: String): PersistentVector[ElementType] = new HbasePersistentVector(id)
- //def newRef(id: String): PersistentRef[ElementType] = new HbasePersistentRef(id)
+ def newMap(id: String): PersistentMap[ElementType, ElementType] = new HbasePersistentMap(id)
+ def newVector(id: String): PersistentVector[ElementType] = new HbasePersistentVector(id)
+ def newRef(id: String): PersistentRef[ElementType] = new HbasePersistentRef(id)
}
/**
@@ -29,9 +29,9 @@ object HbaseStorage /*extends Storage*/ {
*
* @author David Greco
*/
-class HbasePersistentMap(id: String) /*extends PersistentMapBinary*/ {
+class HbasePersistentMap(id: String) extends PersistentMapBinary {
val uuid = id
- //val storage = HbaseStorageBackend
+ val storage = HbaseStorageBackend
}
/**
@@ -39,12 +39,12 @@ class HbasePersistentMap(id: String) /*extends PersistentMapBinary*/ {
*
* @author David Greco
*/
-class HbasePersistentVector(id: String) /*extends PersistentVector[Array[Byte]]*/ {
+class HbasePersistentVector(id: String) extends PersistentVector[Array[Byte]] {
val uuid = id
- //val storage = HbaseStorageBackend
+ val storage = HbaseStorageBackend
}
-class HbasePersistentRef(id: String) /*extends PersistentRef[Array[Byte]]*/ {
+class HbasePersistentRef(id: String) extends PersistentRef[Array[Byte]] {
val uuid = id
- //val storage = HbaseStorageBackend
+ val storage = HbaseStorageBackend
}
diff --git a/akka-persistence/akka-persistence-hbase/src/main/scala/HbaseStorageBackend.scala b/akka-persistence/akka-persistence-hbase/src/main/scala/HbaseStorageBackend.scala
index 11187c3888..dafaafb0c4 100644
--- a/akka-persistence/akka-persistence-hbase/src/main/scala/HbaseStorageBackend.scala
+++ b/akka-persistence/akka-persistence-hbase/src/main/scala/HbaseStorageBackend.scala
@@ -13,17 +13,13 @@ import se.scalablesolutions.akka.config.Config.config
/**
* @author David Greco
*/
-private[akka] object HbaseStorageBackend /* extends
- MapStorageBackend[Array[Byte], Array[Byte]] with
- VectorStorageBackend[Array[Byte]] with
- RefStorageBackend[Array[Byte]] with
- Logging */ {
+private[akka] object HbaseStorageBackend extends MapStorageBackend[Array[Byte], Array[Byte]] with VectorStorageBackend[Array[Byte]] with RefStorageBackend[Array[Byte]] with Logging {
type ElementType = Array[Byte]
- val KEYSPACE = "akka"
- val REF_KEY = "item".getBytes("UTF-8")
- val EMPTY_BYTE_ARRAY = new Array[Byte](0)
+ val KEYSPACE = "akka"
+ val REF_KEY = "item".getBytes("UTF-8")
+ val EMPTY_BYTE_ARRAY = new Array[Byte](0)
val HBASE_ZOOKEEPER_QUORUM = config.getString("akka.storage.hbase.zookeeper.quorum", "127.0.0.1")
@@ -31,65 +27,65 @@ private[akka] object HbaseStorageBackend /* extends
// For Ref
// ===============================================================
- def insertRefStorageFor(name: String, element: Array[Byte]) = {
- }
+ def insertRefStorageFor(name: String, element: Array[Byte]) = {}
- //def getRefStorageFor(name: String): Option[Array[Byte]] = {
- //}
+ def getRefStorageFor(name: String): Option[Array[Byte]] = {
+ return None
+ }
// ===============================================================
// For Vector
// ===============================================================
- def insertVectorStorageEntryFor(name: String, element: Array[Byte]) = {
+ def insertVectorStorageEntryFor(name: String, element: Array[Byte]) = {}
+
+ def insertVectorStorageEntriesFor(name: String, elements: List[Array[Byte]]) = elements.foreach(insertVectorStorageEntryFor(name, _))
+
+ def updateVectorStorageEntryFor(name: String, index: Int, elem: Array[Byte]) = {}
+
+ def getVectorStorageEntryFor(name: String, index: Int): Array[Byte] = {
+ EMPTY_BYTE_ARRAY
}
- def insertVectorStorageEntriesFor(name: String, elements: List[Array[Byte]]) =
- elements.foreach(insertVectorStorageEntryFor(name, _))
-
- def updateVectorStorageEntryFor(name: String, index: Int, elem: Array[Byte]) = {
- }
-
- //def getVectorStorageEntryFor(name: String, index: Int): Array[Byte] = {
- //}
-
/**
* if start and finish both are defined, ignore count and
* report the range [start, finish)
* if start is not defined, assume start = 0
* if start == 0 and finish == 0, return an empty collection
*/
-// def getVectorStorageRangeFor(name: String, start: Option[Int], finish: Option[Int], count: Int):
-// }
+ def getVectorStorageRangeFor(name: String, start: Option[Int], finish: Option[Int], count: Int): List[Array[Byte]] = {
+ Nil
+ }
-// def getVectorStorageSizeFor(name: String): Int = {
-// }
+ def getVectorStorageSizeFor(name: String): Int = {
+ 0
+ }
// ===============================================================
// For Map
// ===============================================================
- def insertMapStorageEntryFor(name: String, key: Array[Byte], element: Array[Byte]) = {
+ def insertMapStorageEntryFor(name: String, key: Array[Byte], element: Array[Byte]) = {}
+
+ def insertMapStorageEntriesFor(name: String, entries: List[Tuple2[Array[Byte], Array[Byte]]]) = {}
+
+ def getMapStorageEntryFor(name: String, key: Array[Byte]): Option[Array[Byte]] = {
+ None
}
- def insertMapStorageEntriesFor(name: String, entries: List[Tuple2[Array[Byte], Array[Byte]]]) = {
+ def getMapStorageFor(name: String): List[Tuple2[Array[Byte], Array[Byte]]] = {
+ Nil
}
-// def getMapStorageEntryFor(name: String, key: Array[Byte]): Option[Array[Byte]] = {
-// }
-
-// def getMapStorageFor(name: String): List[Tuple2[Array[Byte], Array[Byte]]] = {
-// }
-
-// def getMapStorageSizeFor(name: String): Int = {
-// }
+ def getMapStorageSizeFor(name: String): Int = {
+ 0
+ }
def removeMapStorageFor(name: String): Unit = removeMapStorageFor(name, null)
- def removeMapStorageFor(name: String, key: Array[Byte]): Unit = {
- }
+ def removeMapStorageFor(name: String, key: Array[Byte]): Unit = {}
-// def getMapStorageRangeFor(name: String, start: Option[Array[Byte]], finish: Option[Array[Byte]], count: Int):
-// List[Tuple2[Array[Byte], Array[Byte]]] = {
-// }
+ def getMapStorageRangeFor(name: String, start: Option[Array[Byte]], finish: Option[Array[Byte]], count: Int): List[Tuple2[Array[Byte], Array[Byte]]] = {
+ Nil
+ }
}
diff --git a/akka-persistence/akka-persistence-hbase/src/test/scala/SimpleHbaseTest.scala b/akka-persistence/akka-persistence-hbase/src/test/scala/SimpleHbaseTest.scala
index 3dc1517930..40c00cc30e 100644
--- a/akka-persistence/akka-persistence-hbase/src/test/scala/SimpleHbaseTest.scala
+++ b/akka-persistence/akka-persistence-hbase/src/test/scala/SimpleHbaseTest.scala
@@ -42,6 +42,22 @@ class PersistenceSpec extends Spec with BeforeAndAfterAll with ShouldMatchers {
table should not equal(null)
}
+ it("should use the quorum read from the akka configuration") {
+ import se.scalablesolutions.akka.config.Config.config
+ import org.apache.hadoop.hbase.HBaseConfiguration
+ import org.apache.hadoop.hbase.client.HBaseAdmin
+ import org.apache.hadoop.hbase.client.HTable
+
+ val HBASE_ZOOKEEPER_QUORUM = config.getString("akka.storage.hbase.zookeeper.quorum", "0")
+ HBASE_ZOOKEEPER_QUORUM should not equal("0")
+ HBASE_ZOOKEEPER_QUORUM should equal("localhost")
+
+ val configuration = new HBaseConfiguration
+ configuration.set("hbase.zookeeper.quorum", HBASE_ZOOKEEPER_QUORUM)
+ val admin = new HBaseAdmin(configuration)
+ admin.tableExists("ATable") should equal(true)
+ }
+
}
}
diff --git a/config/akka-reference.conf b/config/akka-reference.conf
index 10a9c84118..09e1f39ed3 100644
--- a/config/akka-reference.conf
+++ b/config/akka-reference.conf
@@ -164,5 +164,9 @@ akka {
hostname = "127.0.0.1" # IP address or hostname of the Redis instance
port = 6379 # Port to Redis
}
+
+ hbase {
+ zookeeper.quorum = "localhost"
+ }
}
}