From eb81dcf0df9719e73c6cafc7b2dee3fcd0fc39cb Mon Sep 17 00:00:00 2001 From: David Greco Date: Fri, 17 Sep 2010 17:44:24 +0200 Subject: [PATCH] Now all the tests used to pass with Mongo and Cassandra are passing --- .../src/main/scala/HbaseStorageBackend.scala | 16 +- .../test/scala/HbasePersistentActorSpec.scala | 174 ++++++++++++++++++ .../src/test/scala/HbaseStorageSpec.scala | 6 +- 3 files changed, 192 insertions(+), 4 deletions(-) create mode 100644 akka-persistence/akka-persistence-hbase/src/test/scala/HbasePersistentActorSpec.scala diff --git a/akka-persistence/akka-persistence-hbase/src/main/scala/HbaseStorageBackend.scala b/akka-persistence/akka-persistence-hbase/src/main/scala/HbaseStorageBackend.scala index e477cfe7e3..e61ff24537 100644 --- a/akka-persistence/akka-persistence-hbase/src/main/scala/HbaseStorageBackend.scala +++ b/akka-persistence/akka-persistence-hbase/src/main/scala/HbaseStorageBackend.scala @@ -229,7 +229,21 @@ private[akka] object HbaseStorageBackend extends MapStorageBackend[Array[Byte], MAP_TABLE.delete(row) } + //TODO implement the start and the finish semantic def getMapStorageRangeFor(name: String, start: Option[Array[Byte]], finish: Option[Array[Byte]], count: Int): List[Tuple2[Array[Byte], Array[Byte]]] = { - Nil + val row = new Get(Bytes.toBytes(name)) + val result = MAP_TABLE.get(row) + val iterator = result.getFamilyMap(Bytes.toBytes(MAP_ELEMENT_COLUMN_FAMILY_NAME)).entrySet.iterator + val listBuffer = new ListBuffer[Tuple2[Array[Byte], Array[Byte]]] + val size = result.size + + val cnt = if(count > size) size else count + var i: Int = 0 + while(iterator.hasNext && i < cnt) { + val raw = iterator.next + listBuffer += Tuple2(raw.asInstanceOf[java.util.Map.Entry[Array[Byte], Array[Byte]]].getKey, raw.asInstanceOf[java.util.Map.Entry[Array[Byte],Array[Byte]]].getValue) + i = i+1 + } + listBuffer.toList } } diff --git a/akka-persistence/akka-persistence-hbase/src/test/scala/HbasePersistentActorSpec.scala b/akka-persistence/akka-persistence-hbase/src/test/scala/HbasePersistentActorSpec.scala new file mode 100644 index 0000000000..468cd800ce --- /dev/null +++ b/akka-persistence/akka-persistence-hbase/src/test/scala/HbasePersistentActorSpec.scala @@ -0,0 +1,174 @@ +package se.scalablesolutions.akka.persistence.hbase + +import se.scalablesolutions.akka.actor.{ Actor, ActorRef, Transactor } +import Actor._ + +import org.junit.Test +import org.junit.Assert._ +import org.junit.BeforeClass +import org.junit.Before +import org.junit.AfterClass +import org.junit.After + +import org.scalatest.junit.JUnitSuite +import org.scalatest.BeforeAndAfterAll +import org.apache.hadoop.hbase.HBaseTestingUtility + +case class GetMapState(key: String) +case object GetVectorState +case object GetVectorSize +case object GetRefState + +case class SetMapState(key: String, value: String) +case class SetVectorState(key: String) +case class SetRefState(key: String) +case class Success(key: String, value: String) +case class Failure(key: String, value: String, failer: ActorRef) + +case class SetMapStateOneWay(key: String, value: String) +case class SetVectorStateOneWay(key: String) +case class SetRefStateOneWay(key: String) +case class SuccessOneWay(key: String, value: String) +case class FailureOneWay(key: String, value: String, failer: ActorRef) + +class HbasePersistentActor extends Transactor { + self.timeout = 100000 + + private lazy val mapState = HbaseStorage.newMap + private lazy val vectorState = HbaseStorage.newVector + private lazy val refState = HbaseStorage.newRef + + def receive = { + case GetMapState(key) => + self.reply(mapState.get(key.getBytes("UTF-8")).get) + case GetVectorSize => + self.reply(vectorState.length.asInstanceOf[AnyRef]) + case GetRefState => + self.reply(refState.get.get) + case SetMapState(key, msg) => + mapState.put(key.getBytes("UTF-8"), msg.getBytes("UTF-8")) + self.reply(msg) + case SetVectorState(msg) => + vectorState.add(msg.getBytes("UTF-8")) + self.reply(msg) + case SetRefState(msg) => + refState.swap(msg.getBytes("UTF-8")) + self.reply(msg) + case Success(key, msg) => + mapState.put(key.getBytes("UTF-8"), msg.getBytes("UTF-8")) + vectorState.add(msg.getBytes("UTF-8")) + refState.swap(msg.getBytes("UTF-8")) + self.reply(msg) + case Failure(key, msg, failer) => + mapState.put(key.getBytes("UTF-8"), msg.getBytes("UTF-8")) + vectorState.add(msg.getBytes("UTF-8")) + refState.swap(msg.getBytes("UTF-8")) + failer !! "Failure" + self.reply(msg) + } +} + +@serializable +class PersistentFailerActor extends Transactor { + def receive = { + case "Failure" => + throw new RuntimeException("Expected exception; to test fault-tolerance") + } +} + +class HbasePersistentActorSpec extends JUnitSuite with BeforeAndAfterAll { + + val testUtil = new HBaseTestingUtility + + override def beforeAll { + testUtil.startMiniCluster + } + + override def afterAll { + testUtil.shutdownMiniCluster + } + + @Before + def beforeEach { + HbaseStorageBackend.drop + } + + @After + def afterEach { + HbaseStorageBackend.drop + } + + @Test + def testMapShouldNotRollbackStateForStatefulServerInCaseOfSuccess = { + val stateful = actorOf[HbasePersistentActor] + stateful.start + stateful !! SetMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "init") // set init state + stateful !! Success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") // transactionrequired + val result = (stateful !! GetMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess")).as[Array[Byte]].get + assertEquals("new state", new String(result, 0, result.length, "UTF-8")) + } + + @Test + def testMapShouldRollbackStateForStatefulServerInCaseOfFailure = { + val stateful = actorOf[HbasePersistentActor] + stateful.start + stateful !! SetMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure", "init") // set init state + val failer = actorOf[PersistentFailerActor] + failer.start + try { + stateful !! Failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer) // call failing transactionrequired method + fail("should have thrown an exception") + } catch { case e: RuntimeException => {} } + val result = (stateful !! GetMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure")).as[Array[Byte]].get + assertEquals("init", new String(result, 0, result.length, "UTF-8")) // check that state is == init state + } + + @Test + def testVectorShouldNotRollbackStateForStatefulServerInCaseOfSuccess = { + val stateful = actorOf[HbasePersistentActor] + stateful.start + stateful !! SetVectorState("init") // set init state + stateful !! Success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") // transactionrequired + assertEquals(2, (stateful !! GetVectorSize).get.asInstanceOf[java.lang.Integer].intValue) + } + + @Test + def testVectorShouldRollbackStateForStatefulServerInCaseOfFailure = { + val stateful = actorOf[HbasePersistentActor] + stateful.start + stateful !! SetVectorState("init") // set init state + val failer = actorOf[PersistentFailerActor] + failer.start + try { + stateful !! Failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer) // call failing transactionrequired method + fail("should have thrown an exception") + } catch { case e: RuntimeException => {} } + assertEquals(1, (stateful !! GetVectorSize).get.asInstanceOf[java.lang.Integer].intValue) + } + + @Test + def testRefShouldNotRollbackStateForStatefulServerInCaseOfSuccess = { + val stateful = actorOf[HbasePersistentActor] + stateful.start + stateful !! SetRefState("init") // set init state + stateful !! Success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") // transactionrequired + val result = (stateful !! GetRefState).as[Array[Byte]].get + assertEquals("new state", new String(result, 0, result.length, "UTF-8")) + } + + @Test + def testRefShouldRollbackStateForStatefulServerInCaseOfFailure = { + val stateful = actorOf[HbasePersistentActor] + stateful.start + stateful !! SetRefState("init") // set init state + val failer = actorOf[PersistentFailerActor] + failer.start + try { + stateful !! Failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer) // call failing transactionrequired method + fail("should have thrown an exception") + } catch { case e: RuntimeException => {} } + val result = (stateful !! GetRefState).as[Array[Byte]].get + assertEquals("init", new String(result, 0, result.length, "UTF-8")) // check that state is == init state + } + +} diff --git a/akka-persistence/akka-persistence-hbase/src/test/scala/HbaseStorageSpec.scala b/akka-persistence/akka-persistence-hbase/src/test/scala/HbaseStorageSpec.scala index a52a10cb34..c7613e1b5f 100644 --- a/akka-persistence/akka-persistence-hbase/src/test/scala/HbaseStorageSpec.scala +++ b/akka-persistence/akka-persistence-hbase/src/test/scala/HbaseStorageSpec.scala @@ -66,9 +66,9 @@ BeforeAndAfterEach { removeMapStorageFor("t1") getMapStorageSizeFor("t1") should equal(0) } -/* + it("should do proper range queries") { - import MongoStorageBackend._ + import HbaseStorageBackend._ val l = List( ("bjarne stroustrup", "c++"), ("martin odersky", "scala"), @@ -86,7 +86,7 @@ BeforeAndAfterEach { getMapStorageRangeFor("t1", None, None, 100).map { case (k, v) => (new String(k), new String(v)) } should equal(l.sortWith(_._1 < _._1)) getMapStorageRangeFor("t1", None, None, 5).map { case (k, v) => (new String(k), new String(v)) }.size should equal(5) } - */ + } describe("persistent vectors") {