Aligned the hbase test to the new mechanism for optionally running integration tests

This commit is contained in:
David Greco 2010-09-24 09:17:29 +02:00
parent b76c76651f
commit 83b2450393
3 changed files with 586 additions and 0 deletions

View file

@ -0,0 +1,177 @@
package se.scalablesolutions.akka.persistence.hbase
import org.scalatest.Spec
import org.scalatest.matchers.ShouldMatchers
import org.scalatest.BeforeAndAfterAll
import org.scalatest.BeforeAndAfterEach
class HbaseStorageSpecTestIntegration extends
Spec with
ShouldMatchers with
BeforeAndAfterAll with
BeforeAndAfterEach {
import org.apache.hadoop.hbase.HBaseTestingUtility
val testUtil = new HBaseTestingUtility
override def beforeAll {
testUtil.startMiniCluster
}
override def afterAll {
testUtil.shutdownMiniCluster
}
override def beforeEach {
HbaseStorageBackend.drop
}
override def afterEach {
HbaseStorageBackend.drop
}
describe("persistent maps") {
it("should insert with single key and value") {
import HbaseStorageBackend._
insertMapStorageEntryFor("t1", "odersky".getBytes, "scala".getBytes)
insertMapStorageEntryFor("t1", "gosling".getBytes, "java".getBytes)
insertMapStorageEntryFor("t1", "stroustrup".getBytes, "c++".getBytes)
getMapStorageSizeFor("t1") should equal(3)
new String(getMapStorageEntryFor("t1", "odersky".getBytes).get) should equal("scala")
new String(getMapStorageEntryFor("t1", "gosling".getBytes).get) should equal("java")
new String(getMapStorageEntryFor("t1", "stroustrup".getBytes).get) should equal("c++")
getMapStorageEntryFor("t1", "torvalds".getBytes) should equal(None)
}
it("should insert with multiple keys and values") {
import HbaseStorageBackend._
val l = List(("stroustrup", "c++"), ("odersky", "scala"), ("gosling", "java"))
insertMapStorageEntriesFor("t1", l.map { case (k, v) => (k.getBytes, v.getBytes) })
getMapStorageSizeFor("t1") should equal(3)
new String(getMapStorageEntryFor("t1", "stroustrup".getBytes).get) should equal("c++")
new String(getMapStorageEntryFor("t1", "gosling".getBytes).get) should equal("java")
new String(getMapStorageEntryFor("t1", "odersky".getBytes).get) should equal("scala")
getMapStorageEntryFor("t1", "torvalds".getBytes) should equal(None)
getMapStorageEntryFor("t2", "torvalds".getBytes) should equal(None)
getMapStorageFor("t1").map { case (k, v) => (new String(k), new String(v)) } should equal (l)
removeMapStorageFor("t1", "gosling".getBytes)
getMapStorageSizeFor("t1") should equal(2)
removeMapStorageFor("t1")
getMapStorageSizeFor("t1") should equal(0)
}
it("should do proper range queries") {
import HbaseStorageBackend._
val l = List(
("bjarne stroustrup", "c++"),
("martin odersky", "scala"),
("james gosling", "java"),
("yukihiro matsumoto", "ruby"),
("slava pestov", "factor"),
("rich hickey", "clojure"),
("ola bini", "ioke"),
("dennis ritchie", "c"),
("larry wall", "perl"),
("guido van rossum", "python"),
("james strachan", "groovy"))
val rl = List(
("james gosling", "java"),
("james strachan", "groovy"),
("larry wall", "perl"),
("martin odersky", "scala"),
("ola bini", "ioke"), ("rich hickey", "clojure"),
("slava pestov", "factor"))
insertMapStorageEntriesFor("t1", l.map { case (k, v) => (k.getBytes, v.getBytes) })
getMapStorageSizeFor("t1") should equal(l.size)
getMapStorageRangeFor("t1", None, None, 100).map { case (k, v) => (new String(k), new String(v)) } should equal(l.sortWith(_._1 < _._1))
getMapStorageRangeFor("t1", Option("james gosling".getBytes), Option("slava pestov".getBytes), 100).map { case (k, v) => (new String(k), new String(v)) } should equal(rl.sortWith(_._1 < _._1))
getMapStorageRangeFor("t1", None, None, 5).map { case (k, v) => (new String(k), new String(v)) }.size should equal(5)
}
}
describe("persistent vectors") {
it("should insert a single value") {
import HbaseStorageBackend._
insertVectorStorageEntryFor("t1", "martin odersky".getBytes)
insertVectorStorageEntryFor("t1", "james gosling".getBytes)
new String(getVectorStorageEntryFor("t1", 0)) should equal("james gosling")
new String(getVectorStorageEntryFor("t1", 1)) should equal("martin odersky")
}
it("should insert multiple values") {
import HbaseStorageBackend._
insertVectorStorageEntryFor("t1", "martin odersky".getBytes)
insertVectorStorageEntryFor("t1", "james gosling".getBytes)
insertVectorStorageEntriesFor("t1", List("ola bini".getBytes, "james strachan".getBytes, "dennis ritchie".getBytes))
new String(getVectorStorageEntryFor("t1", 0)) should equal("ola bini")
new String(getVectorStorageEntryFor("t1", 1)) should equal("james strachan")
new String(getVectorStorageEntryFor("t1", 2)) should equal("dennis ritchie")
new String(getVectorStorageEntryFor("t1", 3)) should equal("james gosling")
new String(getVectorStorageEntryFor("t1", 4)) should equal("martin odersky")
}
it("should fetch a range of values") {
import HbaseStorageBackend._
insertVectorStorageEntryFor("t1", "martin odersky".getBytes)
insertVectorStorageEntryFor("t1", "james gosling".getBytes)
getVectorStorageSizeFor("t1") should equal(2)
insertVectorStorageEntriesFor("t1", List("ola bini".getBytes, "james strachan".getBytes, "dennis ritchie".getBytes))
getVectorStorageRangeFor("t1", None, None, 100).map(new String(_)) should equal(List("ola bini", "james strachan", "dennis ritchie", "james gosling", "martin odersky"))
getVectorStorageRangeFor("t1", Some(0), Some(5), 100).map(new String(_)) should equal(List("ola bini", "james strachan", "dennis ritchie", "james gosling", "martin odersky"))
getVectorStorageRangeFor("t1", Some(2), Some(5), 100).map(new String(_)) should equal(List("dennis ritchie", "james gosling", "martin odersky"))
getVectorStorageRangeFor("t1", Some(0), Some(0), 100).size should equal(0)
getVectorStorageSizeFor("t1") should equal(5)
}
it("should insert and query complex structures") {
import HbaseStorageBackend._
import sjson.json.DefaultProtocol._
import sjson.json.JsonSerialization._
// a list[AnyRef] should be added successfully
val l = List("ola bini".getBytes, tobinary(List(100, 200, 300)), tobinary(List(1, 2, 3)))
// for id = t1
insertVectorStorageEntriesFor("t1", l)
new String(getVectorStorageEntryFor("t1", 0)) should equal("ola bini")
frombinary[List[Int]](getVectorStorageEntryFor("t1", 1)) should equal(List(100, 200, 300))
frombinary[List[Int]](getVectorStorageEntryFor("t1", 2)) should equal(List(1, 2, 3))
getVectorStorageSizeFor("t1") should equal(3)
// some more for id = t1
val m = List(tobinary(Map(1 -> "dg", 2 -> "mc", 3 -> "nd")), tobinary(List("martin odersky", "james gosling")))
insertVectorStorageEntriesFor("t1", m)
// size should add up
getVectorStorageSizeFor("t1") should equal(5)
// now for a diff id
insertVectorStorageEntriesFor("t2", l)
getVectorStorageSizeFor("t2") should equal(3)
}
}
describe("persistent refs") {
it("should insert a ref") {
import HbaseStorageBackend._
insertRefStorageFor("t1", "martin odersky".getBytes)
new String(getRefStorageFor("t1").get) should equal("martin odersky")
insertRefStorageFor("t1", "james gosling".getBytes)
new String(getRefStorageFor("t1").get) should equal("james gosling")
getRefStorageFor("t2") should equal(None)
}
}
}

View file

@ -0,0 +1,347 @@
package se.scalablesolutions.akka.persistence.hbase
import org.scalatest.Spec
import org.scalatest.matchers.ShouldMatchers
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
import org.scalatest.junit.JUnitRunner
import org.junit.runner.RunWith
import se.scalablesolutions.akka.actor.{Actor, ActorRef}
import se.scalablesolutions.akka.config.OneForOneStrategy
import Actor._
import se.scalablesolutions.akka.stm.global._
import se.scalablesolutions.akka.config.ScalaConfig._
import se.scalablesolutions.akka.util.Logging
import HbaseStorageBackend._
case class GET(k: String)
case class SET(k: String, v: String)
case class REM(k: String)
case class CONTAINS(k: String)
case object MAP_SIZE
case class MSET(kvs: List[(String, String)])
case class REMOVE_AFTER_PUT(kvsToAdd: List[(String, String)], ksToRem: List[String])
case class CLEAR_AFTER_PUT(kvsToAdd: List[(String, String)])
case class PUT_WITH_SLICE(kvsToAdd: List[(String, String)], start: String, cnt: Int)
case class PUT_REM_WITH_SLICE(kvsToAdd: List[(String, String)], ksToRem: List[String], start: String, cnt: Int)
case class VADD(v: String)
case class VUPD(i: Int, v: String)
case class VUPD_AND_ABORT(i: Int, v: String)
case class VGET(i: Int)
case object VSIZE
case class VGET_AFTER_VADD(vsToAdd: List[String], isToFetch: List[Int])
case class VADD_WITH_SLICE(vsToAdd: List[String], start: Int, cnt: Int)
object Storage {
class HbaseSampleMapStorage extends Actor {
self.lifeCycle = Some(LifeCycle(Permanent))
val FOO_MAP = "akka.sample.map"
private var fooMap = atomic { HbaseStorage.getMap(FOO_MAP) }
def receive = {
case SET(k, v) =>
atomic {
fooMap += (k.getBytes, v.getBytes)
}
self.reply((k, v))
case GET(k) =>
val v = atomic {
fooMap.get(k.getBytes).map(new String(_)).getOrElse(k + " Not found")
}
self.reply(v)
case REM(k) =>
val v = atomic {
fooMap -= k.getBytes
}
self.reply(k)
case CONTAINS(k) =>
val v = atomic {
fooMap contains k.getBytes
}
self.reply(v)
case MAP_SIZE =>
val v = atomic {
fooMap.size
}
self.reply(v)
case MSET(kvs) => atomic {
kvs.foreach {kv => fooMap += (kv._1.getBytes, kv._2.getBytes) }
}
self.reply(kvs.size)
case REMOVE_AFTER_PUT(kvs2add, ks2rem) => atomic {
kvs2add.foreach {kv =>
fooMap += (kv._1.getBytes, kv._2.getBytes)
}
ks2rem.foreach {k =>
fooMap -= k.getBytes
}}
self.reply(fooMap.size)
case CLEAR_AFTER_PUT(kvs2add) => atomic {
kvs2add.foreach {kv =>
fooMap += (kv._1.getBytes, kv._2.getBytes)
}
fooMap.clear
}
self.reply(true)
case PUT_WITH_SLICE(kvs2add, from, cnt) =>
val v = atomic {
kvs2add.foreach {kv =>
fooMap += (kv._1.getBytes, kv._2.getBytes)
}
fooMap.slice(Some(from.getBytes), cnt)
}
self.reply(v: List[(Array[Byte], Array[Byte])])
case PUT_REM_WITH_SLICE(kvs2add, ks2rem, from, cnt) =>
val v = atomic {
kvs2add.foreach {kv =>
fooMap += (kv._1.getBytes, kv._2.getBytes)
}
ks2rem.foreach {k =>
fooMap -= k.getBytes
}
fooMap.slice(Some(from.getBytes), cnt)
}
self.reply(v: List[(Array[Byte], Array[Byte])])
}
}
class HbaseSampleVectorStorage extends Actor {
self.lifeCycle = Some(LifeCycle(Permanent))
val FOO_VECTOR = "akka.sample.vector"
private var fooVector = atomic { HbaseStorage.getVector(FOO_VECTOR) }
def receive = {
case VADD(v) =>
val size =
atomic {
fooVector + v.getBytes
fooVector length
}
self.reply(size)
case VGET(index) =>
val ind =
atomic {
fooVector get index
}
self.reply(ind)
case VGET_AFTER_VADD(vs, is) =>
val els =
atomic {
vs.foreach(fooVector + _.getBytes)
(is.foldRight(List[Array[Byte]]())(fooVector.get(_) :: _)).map(new String(_))
}
self.reply(els)
case VUPD_AND_ABORT(index, value) =>
val l =
atomic {
fooVector.update(index, value.getBytes)
// force fail
fooVector get 100
}
self.reply(index)
case VADD_WITH_SLICE(vs, s, c) =>
val l =
atomic {
vs.foreach(fooVector + _.getBytes)
fooVector.slice(Some(s), None, c)
}
self.reply(l.map(new String(_)))
}
}
}
import Storage._
@RunWith(classOf[JUnitRunner])
class HbaseTicket343SpecTestIntegration extends Spec with ShouldMatchers with BeforeAndAfterAll with BeforeAndAfterEach {
import org.apache.hadoop.hbase.HBaseTestingUtility
val testUtil = new HBaseTestingUtility
override def beforeAll {
testUtil.startMiniCluster
}
override def afterAll {
testUtil.shutdownMiniCluster
}
override def beforeEach {
HbaseStorageBackend.drop
}
override def afterEach {
HbaseStorageBackend.drop
}
describe("Ticket 343 Issue #1") {
it("remove after put should work within the same transaction") {
val proc = actorOf[HbaseSampleMapStorage]
proc.start
(proc !! SET("debasish", "anshinsoft")).getOrElse("Set failed") should equal(("debasish", "anshinsoft"))
(proc !! GET("debasish")).getOrElse("Get failed") should equal("anshinsoft")
(proc !! MAP_SIZE).getOrElse("Size failed") should equal(1)
(proc !! MSET(List(("dg", "1"), ("mc", "2"), ("nd", "3")))).getOrElse("Mset failed") should equal(3)
(proc !! GET("dg")).getOrElse("Get failed") should equal("1")
(proc !! GET("mc")).getOrElse("Get failed") should equal("2")
(proc !! GET("nd")).getOrElse("Get failed") should equal("3")
(proc !! MAP_SIZE).getOrElse("Size failed") should equal(4)
val add = List(("a", "1"), ("b", "2"), ("c", "3"))
val rem = List("a", "debasish")
(proc !! REMOVE_AFTER_PUT(add, rem)).getOrElse("REMOVE_AFTER_PUT failed") should equal(5)
(proc !! GET("debasish")).getOrElse("debasish not found") should equal("debasish Not found")
(proc !! GET("a")).getOrElse("a not found") should equal("a Not found")
(proc !! GET("b")).getOrElse("b not found") should equal("2")
(proc !! CONTAINS("b")).getOrElse("b not found") should equal(true)
(proc !! CONTAINS("debasish")).getOrElse("debasish not found") should equal(false)
(proc !! MAP_SIZE).getOrElse("Size failed") should equal(5)
proc.stop
}
}
describe("Ticket 343 Issue #2") {
it("clear after put should work within the same transaction") {
val proc = actorOf[HbaseSampleMapStorage]
proc.start
(proc !! SET("debasish", "anshinsoft")).getOrElse("Set failed") should equal(("debasish", "anshinsoft"))
(proc !! GET("debasish")).getOrElse("Get failed") should equal("anshinsoft")
(proc !! MAP_SIZE).getOrElse("Size failed") should equal(1)
val add = List(("a", "1"), ("b", "2"), ("c", "3"))
(proc !! CLEAR_AFTER_PUT(add)).getOrElse("CLEAR_AFTER_PUT failed") should equal(true)
(proc !! MAP_SIZE).getOrElse("Size failed") should equal(0)
proc.stop
}
}
describe("Ticket 343 Issue #3") {
it("map size should change after the transaction") {
val proc = actorOf[HbaseSampleMapStorage]
proc.start
(proc !! SET("debasish", "anshinsoft")).getOrElse("Set failed") should equal(("debasish", "anshinsoft"))
(proc !! GET("debasish")).getOrElse("Get failed") should equal("anshinsoft")
(proc !! MAP_SIZE).getOrElse("Size failed") should equal(1)
(proc !! MSET(List(("dg", "1"), ("mc", "2"), ("nd", "3")))).getOrElse("Mset failed") should equal(3)
(proc !! MAP_SIZE).getOrElse("Size failed") should equal(4)
(proc !! GET("dg")).getOrElse("Get failed") should equal("1")
(proc !! GET("mc")).getOrElse("Get failed") should equal("2")
(proc !! GET("nd")).getOrElse("Get failed") should equal("3")
proc.stop
}
}
describe("slice test") {
it("should pass") {
val proc = actorOf[HbaseSampleMapStorage]
proc.start
(proc !! SET("debasish", "anshinsoft")).getOrElse("Set failed") should equal(("debasish", "anshinsoft"))
(proc !! GET("debasish")).getOrElse("Get failed") should equal("anshinsoft")
// (proc !! MAP_SIZE).getOrElse("Size failed") should equal(1)
(proc !! MSET(List(("dg", "1"), ("mc", "2"), ("nd", "3")))).getOrElse("Mset failed") should equal(3)
(proc !! MAP_SIZE).getOrElse("Size failed") should equal(4)
(proc !! PUT_WITH_SLICE(List(("ec", "1"), ("tb", "2"), ("mc", "10")), "dg", 3)).get.asInstanceOf[List[(Array[Byte], Array[Byte])]].map { case (k, v) => (new String(k), new String(v)) } should equal(List(("dg", "1"), ("ec", "1"), ("mc", "10")))
(proc !! PUT_REM_WITH_SLICE(List(("fc", "1"), ("gb", "2"), ("xy", "10")), List("tb", "fc"), "dg", 5)).get.asInstanceOf[List[(Array[Byte], Array[Byte])]].map { case (k, v) => (new String(k), new String(v)) } should equal(List(("dg", "1"), ("ec", "1"), ("gb", "2"), ("mc", "10"), ("nd", "3")))
proc.stop
}
}
describe("Ticket 343 Issue #4") {
it("vector get should not ignore elements that were in vector before transaction") {
val proc = actorOf[HbaseSampleVectorStorage]
proc.start
// add 4 elements in separate transactions
(proc !! VADD("debasish")).getOrElse("VADD failed") should equal(1)
(proc !! VADD("maulindu")).getOrElse("VADD failed") should equal(2)
(proc !! VADD("ramanendu")).getOrElse("VADD failed") should equal(3)
(proc !! VADD("nilanjan")).getOrElse("VADD failed") should equal(4)
new String((proc !! VGET(0)).get.asInstanceOf[Array[Byte]] ) should equal("nilanjan")
new String((proc !! VGET(1)).get.asInstanceOf[Array[Byte]] ) should equal("ramanendu")
new String((proc !! VGET(2)).get.asInstanceOf[Array[Byte]] ) should equal("maulindu")
new String((proc !! VGET(3)).get.asInstanceOf[Array[Byte]] ) should equal("debasish")
// now add 3 more and do gets in the same transaction
(proc !! VGET_AFTER_VADD(List("a", "b", "c"), List(0, 2, 4))).get.asInstanceOf[List[String]] should equal(List("c", "a", "ramanendu"))
proc.stop
}
}
describe("Ticket 343 Issue #6") {
it("vector update should not ignore transaction") {
val proc = actorOf[HbaseSampleVectorStorage]
proc.start
// add 4 elements in separate transactions
(proc !! VADD("debasish")).getOrElse("VADD failed") should equal(1)
(proc !! VADD("maulindu")).getOrElse("VADD failed") should equal(2)
(proc !! VADD("ramanendu")).getOrElse("VADD failed") should equal(3)
(proc !! VADD("nilanjan")).getOrElse("VADD failed") should equal(4)
evaluating {
(proc !! VUPD_AND_ABORT(0, "virat")).getOrElse("VUPD_AND_ABORT failed")
} should produce [Exception]
// update aborts and hence values will remain unchanged
new String((proc !! VGET(0)).get.asInstanceOf[Array[Byte]] ) should equal("nilanjan")
proc.stop
}
}
describe("Ticket 343 Issue #5") {
it("vector slice() should not ignore elements added in current transaction") {
val proc = actorOf[HbaseSampleVectorStorage]
proc.start
// add 4 elements in separate transactions
(proc !! VADD("debasish")).getOrElse("VADD failed") should equal(1)
(proc !! VADD("maulindu")).getOrElse("VADD failed") should equal(2)
(proc !! VADD("ramanendu")).getOrElse("VADD failed") should equal(3)
(proc !! VADD("nilanjan")).getOrElse("VADD failed") should equal(4)
// slice with no new elements added in current transaction
(proc !! VADD_WITH_SLICE(List(), 2, 2)).getOrElse("VADD_WITH_SLICE failed") should equal(Vector("maulindu", "debasish"))
// slice with new elements added in current transaction
(proc !! VADD_WITH_SLICE(List("a", "b", "c", "d"), 2, 2)).getOrElse("VADD_WITH_SLICE failed") should equal(Vector("b", "a"))
proc.stop
}
}
}

View file

@ -0,0 +1,62 @@
package se.scalablesolutions.akka.persistence.hbase
import org.scalatest.Spec
import org.scalatest.matchers.ShouldMatchers
import org.scalatest.BeforeAndAfterAll
import org.scalatest.junit.JUnitRunner
import org.junit.runner.RunWith
import org.junit.Test
import org.apache.hadoop.hbase.HBaseTestingUtility
@RunWith(classOf[JUnitRunner])
class SimpleHbaseSpecTestIntegration extends Spec with BeforeAndAfterAll with ShouldMatchers {
import org.apache.hadoop.hbase.HBaseTestingUtility
val testUtil = new HBaseTestingUtility
override def beforeAll {
testUtil.startMiniCluster
}
override def afterAll {
testUtil.shutdownMiniCluster
}
describe("simple hbase persistence test") {
it("should create a table") {
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.HTableDescriptor
import org.apache.hadoop.hbase.HColumnDescriptor
import org.apache.hadoop.hbase.client.HBaseAdmin
import org.apache.hadoop.hbase.client.HTable
val descriptor = new HTableDescriptor(Bytes.toBytes("ATable"))
descriptor.addFamily(new HColumnDescriptor(Bytes.toBytes("Family1")))
descriptor.addFamily(new HColumnDescriptor(Bytes.toBytes("Family2")))
val admin = new HBaseAdmin(testUtil.getConfiguration)
admin.createTable(descriptor)
val table = new HTable(testUtil.getConfiguration, Bytes.toBytes("ATable"))
table should not equal (null)
}
it("should use the quorum read from the akka configuration and access the table") {
import se.scalablesolutions.akka.config.Config.config
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.HBaseAdmin
import org.apache.hadoop.hbase.client.HTable
val HBASE_ZOOKEEPER_QUORUM = config.getString("akka.storage.hbase.zookeeper-quorum", "0")
HBASE_ZOOKEEPER_QUORUM should not equal ("0")
HBASE_ZOOKEEPER_QUORUM should equal("localhost")
val configuration = new HBaseConfiguration
configuration.set("hbase.zookeeper.quorum", HBASE_ZOOKEEPER_QUORUM)
val admin = new HBaseAdmin(configuration)
admin.tableExists("ATable") should equal(true)
}
}
}