working on the hbase integration
This commit is contained in:
parent
7475b85fc5
commit
fb2ba7ec0c
4 changed files with 73 additions and 57 deletions
|
|
@ -8,20 +8,20 @@ import se.scalablesolutions.akka.util.UUID
|
|||
import se.scalablesolutions.akka.stm._
|
||||
import se.scalablesolutions.akka.persistence.common._
|
||||
|
||||
object HbaseStorage /*extends Storage*/ {
|
||||
object HbaseStorage extends Storage {
|
||||
type ElementType = Array[Byte]
|
||||
|
||||
//def newMap: PersistentMap[ElementType, ElementType] = newMap(UUID.newUuid.toString)
|
||||
//def newVector: PersistentVector[ElementType] = newVector(UUID.newUuid.toString)
|
||||
//def newRef: PersistentRef[ElementType] = newRef(UUID.newUuid.toString)
|
||||
def newMap: PersistentMap[ElementType, ElementType] = newMap(UUID.newUuid.toString)
|
||||
def newVector: PersistentVector[ElementType] = newVector(UUID.newUuid.toString)
|
||||
def newRef: PersistentRef[ElementType] = newRef(UUID.newUuid.toString)
|
||||
|
||||
//def getMap(id: String): PersistentMap[ElementType, ElementType] = newMap(id)
|
||||
//def getVector(id: String): PersistentVector[ElementType] = newVector(id)
|
||||
//def getRef(id: String): PersistentRef[ElementType] = newRef(id)
|
||||
def getMap(id: String): PersistentMap[ElementType, ElementType] = newMap(id)
|
||||
def getVector(id: String): PersistentVector[ElementType] = newVector(id)
|
||||
def getRef(id: String): PersistentRef[ElementType] = newRef(id)
|
||||
|
||||
//def newMap(id: String): PersistentMap[ElementType, ElementType] = new HbasePersistentMap(id)
|
||||
//def newVector(id: String): PersistentVector[ElementType] = new HbasePersistentVector(id)
|
||||
//def newRef(id: String): PersistentRef[ElementType] = new HbasePersistentRef(id)
|
||||
def newMap(id: String): PersistentMap[ElementType, ElementType] = new HbasePersistentMap(id)
|
||||
def newVector(id: String): PersistentVector[ElementType] = new HbasePersistentVector(id)
|
||||
def newRef(id: String): PersistentRef[ElementType] = new HbasePersistentRef(id)
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -29,9 +29,9 @@ object HbaseStorage /*extends Storage*/ {
|
|||
*
|
||||
* @author <a href="http://www.davidgreco.it">David Greco</a>
|
||||
*/
|
||||
class HbasePersistentMap(id: String) /*extends PersistentMapBinary*/ {
|
||||
class HbasePersistentMap(id: String) extends PersistentMapBinary {
|
||||
val uuid = id
|
||||
//val storage = HbaseStorageBackend
|
||||
val storage = HbaseStorageBackend
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -39,12 +39,12 @@ class HbasePersistentMap(id: String) /*extends PersistentMapBinary*/ {
|
|||
*
|
||||
* @author <a href="http://www.davidgreco.it">David Greco</a>
|
||||
*/
|
||||
class HbasePersistentVector(id: String) /*extends PersistentVector[Array[Byte]]*/ {
|
||||
class HbasePersistentVector(id: String) extends PersistentVector[Array[Byte]] {
|
||||
val uuid = id
|
||||
//val storage = HbaseStorageBackend
|
||||
val storage = HbaseStorageBackend
|
||||
}
|
||||
|
||||
class HbasePersistentRef(id: String) /*extends PersistentRef[Array[Byte]]*/ {
|
||||
class HbasePersistentRef(id: String) extends PersistentRef[Array[Byte]] {
|
||||
val uuid = id
|
||||
//val storage = HbaseStorageBackend
|
||||
val storage = HbaseStorageBackend
|
||||
}
|
||||
|
|
|
|||
|
|
@ -13,17 +13,13 @@ import se.scalablesolutions.akka.config.Config.config
|
|||
/**
|
||||
* @author <a href="http://www.davidgreco.it">David Greco</a>
|
||||
*/
|
||||
private[akka] object HbaseStorageBackend /* extends
|
||||
MapStorageBackend[Array[Byte], Array[Byte]] with
|
||||
VectorStorageBackend[Array[Byte]] with
|
||||
RefStorageBackend[Array[Byte]] with
|
||||
Logging */ {
|
||||
private[akka] object HbaseStorageBackend extends MapStorageBackend[Array[Byte], Array[Byte]] with VectorStorageBackend[Array[Byte]] with RefStorageBackend[Array[Byte]] with Logging {
|
||||
|
||||
type ElementType = Array[Byte]
|
||||
|
||||
val KEYSPACE = "akka"
|
||||
val REF_KEY = "item".getBytes("UTF-8")
|
||||
val EMPTY_BYTE_ARRAY = new Array[Byte](0)
|
||||
val KEYSPACE = "akka"
|
||||
val REF_KEY = "item".getBytes("UTF-8")
|
||||
val EMPTY_BYTE_ARRAY = new Array[Byte](0)
|
||||
|
||||
val HBASE_ZOOKEEPER_QUORUM = config.getString("akka.storage.hbase.zookeeper.quorum", "127.0.0.1")
|
||||
|
||||
|
|
@ -31,65 +27,65 @@ private[akka] object HbaseStorageBackend /* extends
|
|||
// For Ref
|
||||
// ===============================================================
|
||||
|
||||
def insertRefStorageFor(name: String, element: Array[Byte]) = {
|
||||
}
|
||||
def insertRefStorageFor(name: String, element: Array[Byte]) = {}
|
||||
|
||||
//def getRefStorageFor(name: String): Option[Array[Byte]] = {
|
||||
//}
|
||||
def getRefStorageFor(name: String): Option[Array[Byte]] = {
|
||||
return None
|
||||
}
|
||||
|
||||
// ===============================================================
|
||||
// For Vector
|
||||
// ===============================================================
|
||||
|
||||
def insertVectorStorageEntryFor(name: String, element: Array[Byte]) = {
|
||||
def insertVectorStorageEntryFor(name: String, element: Array[Byte]) = {}
|
||||
|
||||
def insertVectorStorageEntriesFor(name: String, elements: List[Array[Byte]]) = elements.foreach(insertVectorStorageEntryFor(name, _))
|
||||
|
||||
def updateVectorStorageEntryFor(name: String, index: Int, elem: Array[Byte]) = {}
|
||||
|
||||
def getVectorStorageEntryFor(name: String, index: Int): Array[Byte] = {
|
||||
EMPTY_BYTE_ARRAY
|
||||
}
|
||||
|
||||
def insertVectorStorageEntriesFor(name: String, elements: List[Array[Byte]]) =
|
||||
elements.foreach(insertVectorStorageEntryFor(name, _))
|
||||
|
||||
def updateVectorStorageEntryFor(name: String, index: Int, elem: Array[Byte]) = {
|
||||
}
|
||||
|
||||
//def getVectorStorageEntryFor(name: String, index: Int): Array[Byte] = {
|
||||
//}
|
||||
|
||||
/**
|
||||
* if <tt>start</tt> and <tt>finish</tt> both are defined, ignore <tt>count</tt> and
|
||||
* report the range [start, finish)
|
||||
* if <tt>start</tt> is not defined, assume <tt>start</tt> = 0
|
||||
* if <tt>start</tt> == 0 and <tt>finish</tt> == 0, return an empty collection
|
||||
*/
|
||||
// def getVectorStorageRangeFor(name: String, start: Option[Int], finish: Option[Int], count: Int):
|
||||
// }
|
||||
def getVectorStorageRangeFor(name: String, start: Option[Int], finish: Option[Int], count: Int): List[Array[Byte]] = {
|
||||
Nil
|
||||
}
|
||||
|
||||
// def getVectorStorageSizeFor(name: String): Int = {
|
||||
// }
|
||||
def getVectorStorageSizeFor(name: String): Int = {
|
||||
0
|
||||
}
|
||||
|
||||
// ===============================================================
|
||||
// For Map
|
||||
// ===============================================================
|
||||
|
||||
def insertMapStorageEntryFor(name: String, key: Array[Byte], element: Array[Byte]) = {
|
||||
def insertMapStorageEntryFor(name: String, key: Array[Byte], element: Array[Byte]) = {}
|
||||
|
||||
def insertMapStorageEntriesFor(name: String, entries: List[Tuple2[Array[Byte], Array[Byte]]]) = {}
|
||||
|
||||
def getMapStorageEntryFor(name: String, key: Array[Byte]): Option[Array[Byte]] = {
|
||||
None
|
||||
}
|
||||
|
||||
def insertMapStorageEntriesFor(name: String, entries: List[Tuple2[Array[Byte], Array[Byte]]]) = {
|
||||
def getMapStorageFor(name: String): List[Tuple2[Array[Byte], Array[Byte]]] = {
|
||||
Nil
|
||||
}
|
||||
|
||||
// def getMapStorageEntryFor(name: String, key: Array[Byte]): Option[Array[Byte]] = {
|
||||
// }
|
||||
|
||||
// def getMapStorageFor(name: String): List[Tuple2[Array[Byte], Array[Byte]]] = {
|
||||
// }
|
||||
|
||||
// def getMapStorageSizeFor(name: String): Int = {
|
||||
// }
|
||||
def getMapStorageSizeFor(name: String): Int = {
|
||||
0
|
||||
}
|
||||
|
||||
def removeMapStorageFor(name: String): Unit = removeMapStorageFor(name, null)
|
||||
|
||||
def removeMapStorageFor(name: String, key: Array[Byte]): Unit = {
|
||||
}
|
||||
def removeMapStorageFor(name: String, key: Array[Byte]): Unit = {}
|
||||
|
||||
// def getMapStorageRangeFor(name: String, start: Option[Array[Byte]], finish: Option[Array[Byte]], count: Int):
|
||||
// List[Tuple2[Array[Byte], Array[Byte]]] = {
|
||||
// }
|
||||
def getMapStorageRangeFor(name: String, start: Option[Array[Byte]], finish: Option[Array[Byte]], count: Int): List[Tuple2[Array[Byte], Array[Byte]]] = {
|
||||
Nil
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -42,6 +42,22 @@ class PersistenceSpec extends Spec with BeforeAndAfterAll with ShouldMatchers {
|
|||
table should not equal(null)
|
||||
}
|
||||
|
||||
it("should use the quorum read from the akka configuration") {
|
||||
import se.scalablesolutions.akka.config.Config.config
|
||||
import org.apache.hadoop.hbase.HBaseConfiguration
|
||||
import org.apache.hadoop.hbase.client.HBaseAdmin
|
||||
import org.apache.hadoop.hbase.client.HTable
|
||||
|
||||
val HBASE_ZOOKEEPER_QUORUM = config.getString("akka.storage.hbase.zookeeper.quorum", "0")
|
||||
HBASE_ZOOKEEPER_QUORUM should not equal("0")
|
||||
HBASE_ZOOKEEPER_QUORUM should equal("localhost")
|
||||
|
||||
val configuration = new HBaseConfiguration
|
||||
configuration.set("hbase.zookeeper.quorum", HBASE_ZOOKEEPER_QUORUM)
|
||||
val admin = new HBaseAdmin(configuration)
|
||||
admin.tableExists("ATable") should equal(true)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -164,5 +164,9 @@ akka {
|
|||
hostname = "127.0.0.1" # IP address or hostname of the Redis instance
|
||||
port = 6379 # Port to Redis
|
||||
}
|
||||
|
||||
hbase {
|
||||
zookeeper.quorum = "localhost"
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue