Implemented the Ref and the Vector backend apis
This commit is contained in:
parent
633ba6e51b
commit
cb830ed0ef
2 changed files with 141 additions and 44 deletions
|
|
@ -4,54 +4,120 @@
|
|||
|
||||
package se.scalablesolutions.akka.persistence.hbase
|
||||
|
||||
import scala.collection.mutable.ListBuffer
|
||||
import se.scalablesolutions.akka.stm._
|
||||
import se.scalablesolutions.akka.persistence.common._
|
||||
import se.scalablesolutions.akka.util.Logging
|
||||
import se.scalablesolutions.akka.util.Helpers._
|
||||
import se.scalablesolutions.akka.config.Config.config
|
||||
import org.apache.hadoop.hbase.HBaseConfiguration
|
||||
import org.apache.hadoop.hbase.HColumnDescriptor
|
||||
import org.apache.hadoop.hbase.HTableDescriptor
|
||||
import org.apache.hadoop.hbase.client.HBaseAdmin
|
||||
import org.apache.hadoop.hbase.client.HTable
|
||||
import org.apache.hadoop.hbase.client.Put
|
||||
import org.apache.hadoop.hbase.client.Get
|
||||
import org.apache.hadoop.hbase.util.Bytes
|
||||
|
||||
/**
|
||||
* @author <a href="http://www.davidgreco.it">David Greco</a>
|
||||
*/
|
||||
private[akka] object HbaseStorageBackend extends MapStorageBackend[Array[Byte], Array[Byte]] with VectorStorageBackend[Array[Byte]] with RefStorageBackend[Array[Byte]] with Logging {
|
||||
|
||||
type ElementType = Array[Byte]
|
||||
|
||||
val KEYSPACE = "akka"
|
||||
val REF_KEY = "item".getBytes("UTF-8")
|
||||
val EMPTY_BYTE_ARRAY = new Array[Byte](0)
|
||||
|
||||
val HBASE_ZOOKEEPER_QUORUM = config.getString("akka.storage.hbase.zookeeper.quorum", "localhost")
|
||||
|
||||
val CONFIGURATION = new HBaseConfiguration
|
||||
CONFIGURATION.set("hbase.zookeeper.quorum", HBASE_ZOOKEEPER_QUORUM)
|
||||
val ADMIN = new HBaseAdmin(CONFIGURATION)
|
||||
val REF_TABLE_NAME = "__REF_TABLE"
|
||||
val VECTOR_TABLE_NAME = "__VECTOR_TABLE"
|
||||
val VECTOR_ELEMENT_COLUMN_FAMILY_NAME = "__VECTOR_ELEMENT"
|
||||
val MAP_KEY_COLUMN_FAMILY_NAME = "__MAP_KEY"
|
||||
val MAP_ELEMENT_COLUMN_FAMILY_NAME = "__MAP_ELEMENT"
|
||||
val MAP_TABLE_NAME = "__MAP_TABLE"
|
||||
var REF_TABLE: HTable = _
|
||||
var VECTOR_TABLE: HTable = _
|
||||
|
||||
CONFIGURATION.set("hbase.zookeeper.quorum", HBASE_ZOOKEEPER_QUORUM)
|
||||
|
||||
init
|
||||
|
||||
def init {
|
||||
if (!ADMIN.tableExists(REF_TABLE_NAME)) {
|
||||
ADMIN.createTable(new HTableDescriptor(REF_TABLE_NAME))
|
||||
ADMIN.disableTable(REF_TABLE_NAME)
|
||||
ADMIN.addColumn(REF_TABLE_NAME, new HColumnDescriptor("element"))
|
||||
ADMIN.enableTable(REF_TABLE_NAME)
|
||||
}
|
||||
REF_TABLE = new HTable(CONFIGURATION, REF_TABLE_NAME);
|
||||
|
||||
if (!ADMIN.tableExists(VECTOR_TABLE_NAME)) {
|
||||
ADMIN.createTable(new HTableDescriptor(VECTOR_TABLE_NAME))
|
||||
ADMIN.disableTable(VECTOR_TABLE_NAME)
|
||||
ADMIN.addColumn(VECTOR_TABLE_NAME, new HColumnDescriptor(VECTOR_ELEMENT_COLUMN_FAMILY_NAME))
|
||||
ADMIN.enableTable(VECTOR_TABLE_NAME);
|
||||
}
|
||||
VECTOR_TABLE = new HTable(CONFIGURATION, VECTOR_TABLE_NAME)
|
||||
}
|
||||
|
||||
def drop {
|
||||
if (ADMIN.tableExists(REF_TABLE_NAME)) {
|
||||
ADMIN.disableTable(REF_TABLE_NAME)
|
||||
ADMIN.deleteTable(REF_TABLE_NAME)
|
||||
}
|
||||
if (ADMIN.tableExists(VECTOR_TABLE_NAME)) {
|
||||
ADMIN.disableTable(VECTOR_TABLE_NAME)
|
||||
ADMIN.deleteTable(VECTOR_TABLE_NAME)
|
||||
}
|
||||
init
|
||||
}
|
||||
|
||||
// ===============================================================
|
||||
// For Ref
|
||||
// ===============================================================
|
||||
|
||||
def insertRefStorageFor(name: String, element: Array[Byte]) = {}
|
||||
def insertRefStorageFor(name: String, element: Array[Byte]) = {
|
||||
val row = new Put(Bytes.toBytes(name))
|
||||
row.add(Bytes.toBytes("element"), Bytes.toBytes("element"), element)
|
||||
REF_TABLE.put(row)
|
||||
}
|
||||
|
||||
def getRefStorageFor(name: String): Option[Array[Byte]] = {
|
||||
return None
|
||||
val row = new Get(Bytes.toBytes(name))
|
||||
val result = REF_TABLE.get(row)
|
||||
if (result.isEmpty()) {
|
||||
return None;
|
||||
} else {
|
||||
val element = result.getValue(Bytes.toBytes("element"), Bytes.toBytes("element"))
|
||||
return Some(element)
|
||||
}
|
||||
}
|
||||
|
||||
// ===============================================================
|
||||
// For Vector
|
||||
// ===============================================================
|
||||
|
||||
def insertVectorStorageEntryFor(name: String, element: Array[Byte]) = {}
|
||||
def insertVectorStorageEntryFor(name: String, element: Array[Byte]) = {
|
||||
val row = new Put(Bytes.toBytes(name))
|
||||
val size = getVectorStorageSizeFor(name)
|
||||
row.add(Bytes.toBytes(VECTOR_ELEMENT_COLUMN_FAMILY_NAME), Bytes.toBytes(size), element)
|
||||
row.add(Bytes.toBytes(VECTOR_ELEMENT_COLUMN_FAMILY_NAME), Bytes.toBytes("size"), Bytes.toBytes(size+1))
|
||||
VECTOR_TABLE.put(row)
|
||||
}
|
||||
|
||||
def insertVectorStorageEntriesFor(name: String, elements: List[Array[Byte]]) = elements.foreach(insertVectorStorageEntryFor(name, _))
|
||||
def insertVectorStorageEntriesFor(name: String, elements: List[Array[Byte]]) = elements.reverse.foreach(insertVectorStorageEntryFor(name, _))
|
||||
|
||||
def updateVectorStorageEntryFor(name: String, index: Int, elem: Array[Byte]) = {}
|
||||
def updateVectorStorageEntryFor(name: String, index: Int, element: Array[Byte]) = {
|
||||
val row = new Put(Bytes.toBytes(name))
|
||||
row.add(Bytes.toBytes(VECTOR_ELEMENT_COLUMN_FAMILY_NAME), Bytes.toBytes(index), element)
|
||||
VECTOR_TABLE.put(row)
|
||||
}
|
||||
|
||||
def getVectorStorageEntryFor(name: String, index: Int): Array[Byte] = {
|
||||
EMPTY_BYTE_ARRAY
|
||||
val row = new Get(Bytes.toBytes(name))
|
||||
val result = VECTOR_TABLE.get(row)
|
||||
val size = getVectorStorageSizeFor(name)
|
||||
val colnum = size - index - 1
|
||||
return result.getValue(Bytes.toBytes(VECTOR_ELEMENT_COLUMN_FAMILY_NAME),Bytes.toBytes(colnum))
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -61,11 +127,39 @@ private[akka] object HbaseStorageBackend extends MapStorageBackend[Array[Byte],
|
|||
* if <tt>start</tt> == 0 and <tt>finish</tt> == 0, return an empty collection
|
||||
*/
|
||||
def getVectorStorageRangeFor(name: String, start: Option[Int], finish: Option[Int], count: Int): List[Array[Byte]] = {
|
||||
Nil
|
||||
val row = new Get(Bytes.toBytes(name))
|
||||
val result = VECTOR_TABLE.get(row)
|
||||
val size = Bytes.toInt(result.getValue(Bytes.toBytes(VECTOR_ELEMENT_COLUMN_FAMILY_NAME), Bytes.toBytes("size")))
|
||||
var listBuffer = new ListBuffer[Array[Byte]]
|
||||
|
||||
if(start.isDefined && finish.isDefined) {
|
||||
for(i <- start.get to finish.get-1) {
|
||||
val colnum = size - i - 1
|
||||
listBuffer += result.getValue(Bytes.toBytes(VECTOR_ELEMENT_COLUMN_FAMILY_NAME),Bytes.toBytes(colnum))
|
||||
}
|
||||
return listBuffer.toList
|
||||
} else {
|
||||
val b = start.getOrElse(0)
|
||||
val e = if(!finish.isDefined) {
|
||||
val ee: Int = b + count -1
|
||||
if(ee < size-1) ee else size-1
|
||||
}
|
||||
for(i <- b.asInstanceOf[Int] to e.asInstanceOf[Int]) {
|
||||
val colnum = size - i - 1
|
||||
listBuffer += result.getValue(Bytes.toBytes(VECTOR_ELEMENT_COLUMN_FAMILY_NAME),Bytes.toBytes(colnum))
|
||||
}
|
||||
return listBuffer.toList
|
||||
}
|
||||
}
|
||||
|
||||
def getVectorStorageSizeFor(name: String): Int = {
|
||||
0
|
||||
val row = new Get(Bytes.toBytes(name))
|
||||
val result = VECTOR_TABLE.get(row)
|
||||
if (result.isEmpty()) {
|
||||
0
|
||||
} else {
|
||||
Bytes.toInt(result.getValue(Bytes.toBytes(VECTOR_ELEMENT_COLUMN_FAMILY_NAME), Bytes.toBytes("size")))
|
||||
}
|
||||
}
|
||||
|
||||
// ===============================================================
|
||||
|
|
|
|||
|
|
@ -2,30 +2,35 @@ package se.scalablesolutions.akka.persistence.hbase
|
|||
|
||||
import org.scalatest.Spec
|
||||
import org.scalatest.matchers.ShouldMatchers
|
||||
import org.scalatest.BeforeAndAfterAll
|
||||
import org.scalatest.BeforeAndAfterEach
|
||||
import org.scalatest.junit.JUnitRunner
|
||||
import org.junit.runner.RunWith
|
||||
import java.util.NoSuchElementException
|
||||
|
||||
@RunWith(classOf[JUnitRunner])
|
||||
class HbaseStorageSpec extends
|
||||
Spec with
|
||||
ShouldMatchers with
|
||||
BeforeAndAfterEach {
|
||||
|
||||
Spec with
|
||||
ShouldMatchers with
|
||||
BeforeAndAfterAll with
|
||||
BeforeAndAfterEach {
|
||||
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility
|
||||
|
||||
|
||||
val testUtil = new HBaseTestingUtility
|
||||
|
||||
override def beforeAll {
|
||||
testUtil.startMiniCluster
|
||||
}
|
||||
|
||||
override def afterAll {
|
||||
testUtil.shutdownMiniCluster
|
||||
}
|
||||
|
||||
override def beforeEach {
|
||||
testUtil.startMiniCluster
|
||||
//MongoStorageBackend.drop
|
||||
HbaseStorageBackend.drop
|
||||
}
|
||||
|
||||
|
||||
override def afterEach {
|
||||
testUtil.shutdownMiniCluster
|
||||
//MongoStorageBackend.drop
|
||||
HbaseStorageBackend.drop
|
||||
}
|
||||
|
||||
/*
|
||||
describe("persistent maps") {
|
||||
it("should insert with single key and value") {
|
||||
|
|
@ -85,10 +90,9 @@ class HbaseStorageSpec extends
|
|||
}
|
||||
*/
|
||||
|
||||
/*
|
||||
describe("persistent vectors") {
|
||||
it("should insert a single value") {
|
||||
import MongoStorageBackend._
|
||||
import HbaseStorageBackend._
|
||||
|
||||
insertVectorStorageEntryFor("t1", "martin odersky".getBytes)
|
||||
insertVectorStorageEntryFor("t1", "james gosling".getBytes)
|
||||
|
|
@ -97,7 +101,7 @@ class HbaseStorageSpec extends
|
|||
}
|
||||
|
||||
it("should insert multiple values") {
|
||||
import MongoStorageBackend._
|
||||
import HbaseStorageBackend._
|
||||
|
||||
insertVectorStorageEntryFor("t1", "martin odersky".getBytes)
|
||||
insertVectorStorageEntryFor("t1", "james gosling".getBytes)
|
||||
|
|
@ -110,21 +114,20 @@ class HbaseStorageSpec extends
|
|||
}
|
||||
|
||||
it("should fetch a range of values") {
|
||||
import MongoStorageBackend._
|
||||
import HbaseStorageBackend._
|
||||
|
||||
insertVectorStorageEntryFor("t1", "martin odersky".getBytes)
|
||||
insertVectorStorageEntryFor("t1", "james gosling".getBytes)
|
||||
getVectorStorageSizeFor("t1") should equal(2)
|
||||
insertVectorStorageEntriesFor("t1", List("ola bini".getBytes, "james strachan".getBytes, "dennis ritchie".getBytes))
|
||||
getVectorStorageRangeFor("t1", None, None, 100).map(new String(_)) should equal(List("ola bini", "james strachan", "dennis ritchie", "james gosling", "martin odersky"))
|
||||
getVectorStorageRangeFor("t1", Some(0), Some(5), 100).map(new String(_)) should equal(List("ola bini", "james strachan", "dennis ritchie", "james gosling", "martin odersky"))
|
||||
getVectorStorageRangeFor("t1", Some(2), Some(5), 100).map(new String(_)) should equal(List("dennis ritchie", "james gosling", "martin odersky"))
|
||||
|
||||
getVectorStorageSizeFor("t1") should equal(5)
|
||||
//getVectorStorageRangeFor("t1", Some(0), Some(5), 100).map(new String(_)) should equal(List("ola bini", "james strachan", "dennis ritchie", "james gosling", "martin odersky"))
|
||||
//getVectorStorageRangeFor("t1", Some(2), Some(5), 100).map(new String(_)) should equal(List("dennis ritchie", "james gosling", "martin odersky"))
|
||||
//getVectorStorageSizeFor("t1") should equal(5)
|
||||
}
|
||||
|
||||
it("should insert and query complex structures") {
|
||||
import MongoStorageBackend._
|
||||
import HbaseStorageBackend._
|
||||
import sjson.json.DefaultProtocol._
|
||||
import sjson.json.JsonSerialization._
|
||||
|
||||
|
|
@ -151,16 +154,16 @@ class HbaseStorageSpec extends
|
|||
getVectorStorageSizeFor("t2") should equal(3)
|
||||
}
|
||||
}
|
||||
*/
|
||||
|
||||
describe("persistent refs") {
|
||||
it("should insert a ref") {
|
||||
import HbaseStorageBackend._
|
||||
|
||||
|
||||
insertRefStorageFor("t1", "martin odersky".getBytes)
|
||||
//new String(getRefStorageFor("t1").get) should equal("martin odersky")
|
||||
//insertRefStorageFor("t1", "james gosling".getBytes)
|
||||
//new String(getRefStorageFor("t1").get) should equal("james gosling")
|
||||
//getRefStorageFor("t2") should equal(None)
|
||||
new String(getRefStorageFor("t1").get) should equal("martin odersky")
|
||||
insertRefStorageFor("t1", "james gosling".getBytes)
|
||||
new String(getRefStorageFor("t1").get) should equal("james gosling")
|
||||
getRefStorageFor("t2") should equal(None)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue