From e81a1d36780a8859d7610a122f3fa32fde8f7ab2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Bone=CC=81r?= Date: Fri, 17 Jun 2011 10:25:33 +0200 Subject: [PATCH] Added transaction log spec --- .../akka/cluster/TransactionLogSpec.scala | 281 ++++++++++++++++++ 1 file changed, 281 insertions(+) create mode 100644 akka-cluster/src/test/scala/akka/cluster/TransactionLogSpec.scala diff --git a/akka-cluster/src/test/scala/akka/cluster/TransactionLogSpec.scala b/akka-cluster/src/test/scala/akka/cluster/TransactionLogSpec.scala new file mode 100644 index 0000000000..e0b7452af4 --- /dev/null +++ b/akka-cluster/src/test/scala/akka/cluster/TransactionLogSpec.scala @@ -0,0 +1,281 @@ +/** + * Copyright (C) 2009-2011 Scalable Solutions AB + */ +package akka.cluster + +import org.apache.bookkeeper.client.{ BookKeeper, BKException } +import BKException._ +import org.apache.zookeeper.server.ZooKeeperServer + +import org.scalatest.WordSpec +import org.scalatest.matchers.MustMatchers +import org.scalatest.{ BeforeAndAfterAll, BeforeAndAfterEach, Spec } + +import akka.serialization._ +import akka.actor._ +import ActorSerialization._ +import Actor._ + +import java.util.concurrent.{ CyclicBarrier, TimeUnit } +import java.io.File +import java.nio.ByteBuffer + +import com.eaio.uuid.UUID + +import scala.collection.JavaConversions._ + +class TransactionLogSpec extends WordSpec with MustMatchers with BeforeAndAfterAll { + private var bookKeeper: BookKeeper = _ + private var localBookKeeper: LocalBookKeeper = _ + + // synchronous API + "A Transaction Log" should { + "be able to record entries - synchronous" in { + val uuid = (new UUID).toString + val txlog = TransactionLog.newLogFor(uuid, false, null, JavaSerializer) + val entry = "hello".getBytes("UTF-8") + txlog.recordEntry(entry) + } + + "be able to record and delete entries - synchronous" in { + val uuid = (new UUID).toString + val txlog1 = TransactionLog.newLogFor(uuid, false, null, JavaSerializer) + val entry = "hello".getBytes("UTF-8") + txlog1.recordEntry(entry) + txlog1.recordEntry(entry) + txlog1.delete + txlog1.close + intercept[BKNoSuchLedgerExistsException](TransactionLog.logFor(uuid, false, null, JavaSerializer)) + } + + "be able to record entries and read entries with 'entriesInRange' - synchronous" in { + val uuid = (new UUID).toString + val txlog1 = TransactionLog.newLogFor(uuid, false, null, JavaSerializer) + val entry = "hello".getBytes("UTF-8") + txlog1.recordEntry(entry) + txlog1.recordEntry(entry) + txlog1.close + + val txlog2 = TransactionLog.logFor(uuid, false, null, JavaSerializer) + val entries = txlog2.entriesInRange(0, 1).map(bytes ⇒ new String(bytes, "UTF-8")) + entries.size must equal(2) + entries(0) must equal("hello") + entries(1) must equal("hello") + txlog2.close + } + + "be able to record entries and read entries with 'entries' - synchronous" in { + val uuid = (new UUID).toString + val txlog1 = TransactionLog.newLogFor(uuid, false, null, JavaSerializer) + val entry = "hello".getBytes("UTF-8") + txlog1.recordEntry(entry) + txlog1.recordEntry(entry) + txlog1.recordEntry(entry) + txlog1.recordEntry(entry) + txlog1.close + + val txlog2 = TransactionLog.logFor(uuid, false, null, JavaSerializer) + val entries = txlog2.entries.map(bytes ⇒ new String(bytes, "UTF-8")) + entries.size must equal(4) + entries(0) must equal("hello") + entries(1) must equal("hello") + entries(2) must equal("hello") + entries(3) must equal("hello") + txlog2.close + } + + "be able to record a snapshot - synchronous" in { + val uuid = (new UUID).toString + val txlog1 = TransactionLog.newLogFor(uuid, false, null, JavaSerializer) + val snapshot = "snapshot".getBytes("UTF-8") + txlog1.recordSnapshot(snapshot) + txlog1.close + } + + "be able to record and read a snapshot and following entries - synchronous" in { + val uuid = (new UUID).toString + val txlog1 = TransactionLog.newLogFor(uuid, false, null, JavaSerializer) + val snapshot = "snapshot".getBytes("UTF-8") + txlog1.recordSnapshot(snapshot) + + val entry = "hello".getBytes("UTF-8") + txlog1.recordEntry(entry) + txlog1.recordEntry(entry) + txlog1.recordEntry(entry) + txlog1.recordEntry(entry) + txlog1.close + + val txlog2 = TransactionLog.logFor(uuid, false, null, JavaSerializer) + val (snapshotAsBytes, entriesAsBytes) = txlog2.toByteArraysLatestSnapshot + new String(snapshotAsBytes, "UTF-8") must equal("snapshot") + + val entries = entriesAsBytes.map(bytes ⇒ new String(bytes, "UTF-8")) + entries.size must equal(4) + entries(0) must equal("hello") + entries(1) must equal("hello") + entries(2) must equal("hello") + entries(3) must equal("hello") + txlog2.close + } + + "be able to record entries then a snapshot then more entries - and then read from the snapshot and the following entries - synchronous" in { + val uuid = (new UUID).toString + val txlog1 = TransactionLog.newLogFor(uuid, false, null, JavaSerializer) + + val entry = "hello".getBytes("UTF-8") + txlog1.recordEntry(entry) + txlog1.recordEntry(entry) + txlog1.recordEntry(entry) + + val snapshot = "snapshot".getBytes("UTF-8") + txlog1.recordSnapshot(snapshot) + + txlog1.recordEntry(entry) + txlog1.recordEntry(entry) + txlog1.close + + val txlog2 = TransactionLog.logFor(uuid, false, null, JavaSerializer) + val (snapshotAsBytes, entriesAsBytes) = txlog2.toByteArraysLatestSnapshot + new String(snapshotAsBytes, "UTF-8") must equal("snapshot") + + val entries = entriesAsBytes.map(bytes ⇒ new String(bytes, "UTF-8")) + entries.size must equal(2) + entries(0) must equal("hello") + entries(1) must equal("hello") + txlog2.close + } + } + + "A Transaction Log" should { + "be able to record entries - asynchronous" in { + val uuid = (new UUID).toString + val txlog = TransactionLog.newLogFor(uuid, true, null, JavaSerializer) + val entry = "hello".getBytes("UTF-8") + txlog.recordEntry(entry) + Thread.sleep(100) + txlog.close + } + + "be able to record and delete entries - asynchronous" in { + val uuid = (new UUID).toString + val txlog1 = TransactionLog.newLogFor(uuid, true, null, JavaSerializer) + val entry = "hello".getBytes("UTF-8") + txlog1.recordEntry(entry) + txlog1.recordEntry(entry) + txlog1.delete + Thread.sleep(100) + intercept[BKNoSuchLedgerExistsException](TransactionLog.logFor(uuid, true, null, JavaSerializer)) + } + "be able to record entries and read entries with 'entriesInRange' - asynchronous" in { + val uuid = (new UUID).toString + val txlog1 = TransactionLog.newLogFor(uuid, true, null, JavaSerializer) + val entry = "hello".getBytes("UTF-8") + txlog1.recordEntry(entry) + txlog1.recordEntry(entry) + Thread.sleep(100) + txlog1.close + + val txlog2 = TransactionLog.logFor(uuid, true, null, JavaSerializer) + val entries = txlog2.entriesInRange(0, 1).map(bytes ⇒ new String(bytes, "UTF-8")) + entries.size must equal(2) + entries(0) must equal("hello") + entries(1) must equal("hello") + Thread.sleep(100) + txlog2.close + } + + "be able to record entries and read entries with 'entries' - asynchronous" in { + val uuid = (new UUID).toString + val txlog1 = TransactionLog.newLogFor(uuid, true, null, JavaSerializer) + val entry = "hello".getBytes("UTF-8") + txlog1.recordEntry(entry) + txlog1.recordEntry(entry) + txlog1.recordEntry(entry) + txlog1.recordEntry(entry) + Thread.sleep(100) + txlog1.close + + val txlog2 = TransactionLog.logFor(uuid, true, null, JavaSerializer) + val entries = txlog2.entries.map(bytes ⇒ new String(bytes, "UTF-8")) + entries.size must equal(4) + entries(0) must equal("hello") + entries(1) must equal("hello") + entries(2) must equal("hello") + entries(3) must equal("hello") + Thread.sleep(100) + txlog2.close + } + + "be able to record a snapshot - asynchronous" in { + val uuid = (new UUID).toString + val txlog1 = TransactionLog.newLogFor(uuid, true, null, JavaSerializer) + val snapshot = "snapshot".getBytes("UTF-8") + txlog1.recordSnapshot(snapshot) + Thread.sleep(100) + txlog1.close + } + + "be able to record and read a snapshot and following entries - asynchronous" in { + val uuid = (new UUID).toString + val txlog1 = TransactionLog.newLogFor(uuid, true, null, JavaSerializer) + val snapshot = "snapshot".getBytes("UTF-8") + txlog1.recordSnapshot(snapshot) + + val entry = "hello".getBytes("UTF-8") + txlog1.recordEntry(entry) + txlog1.recordEntry(entry) + txlog1.recordEntry(entry) + txlog1.recordEntry(entry) + Thread.sleep(100) + txlog1.close + + val txlog2 = TransactionLog.logFor(uuid, true, null, JavaSerializer) + val (snapshotAsBytes, entriesAsBytes) = txlog2.toByteArraysLatestSnapshot + new String(snapshotAsBytes, "UTF-8") must equal("snapshot") + + val entries = entriesAsBytes.map(bytes ⇒ new String(bytes, "UTF-8")) + entries.size must equal(4) + entries(0) must equal("hello") + entries(1) must equal("hello") + entries(2) must equal("hello") + entries(3) must equal("hello") + Thread.sleep(100) + txlog2.close + } + + "be able to record entries then a snapshot then more entries - and then read from the snapshot and the following entries - asynchronous" in { + val uuid = (new UUID).toString + val txlog1 = TransactionLog.newLogFor(uuid, true, null, JavaSerializer) + + val entry = "hello".getBytes("UTF-8") + txlog1.recordEntry(entry) + txlog1.recordEntry(entry) + txlog1.recordEntry(entry) + val snapshot = "snapshot".getBytes("UTF-8") + txlog1.recordSnapshot(snapshot) + txlog1.recordEntry(entry) + txlog1.recordEntry(entry) + Thread.sleep(100) + txlog1.close + + val txlog2 = TransactionLog.logFor(uuid, true, null, JavaSerializer) + val (snapshotAsBytes, entriesAsBytes) = txlog2.toByteArraysLatestSnapshot + new String(snapshotAsBytes, "UTF-8") must equal("snapshot") + val entries = entriesAsBytes.map(bytes ⇒ new String(bytes, "UTF-8")) + entries.size must equal(2) + entries(0) must equal("hello") + entries(1) must equal("hello") + Thread.sleep(100) + txlog2.close + } + } + + override def beforeAll() = { + LocalBookKeeperEnsemble.start() + } + + override def afterAll() = { + TransactionLog.shutdown() + LocalBookKeeperEnsemble.shutdown() + } +}