Get the (non-multi-jvm) cluster tests working again

- avoid initialising the cluster in TransactionLog
- allow TransactionLog to be started and shutdown multiple times
- correct the startup and shutdown in the transaction log tests
This commit is contained in:
Peter Vlugter 2011-09-08 17:19:27 +02:00
parent d8390a61f6
commit 0430e284a6
5 changed files with 58 additions and 41 deletions

View file

@ -84,7 +84,7 @@ class TransactionLog private (
def recordEntry(entry: Array[Byte]) {
if (isOpen.isOn) {
val entryBytes =
if (Cluster.shouldCompressData) LZF.compress(entry)
if (shouldCompressData) LZF.compress(entry)
else entry
try {
@ -118,7 +118,7 @@ class TransactionLog private (
def recordSnapshot(snapshot: Array[Byte]) {
if (isOpen.isOn) {
val snapshotBytes =
if (Cluster.shouldCompressData) LZF.compress(snapshot)
if (shouldCompressData) LZF.compress(snapshot)
else snapshot
try {
@ -311,7 +311,7 @@ class TransactionLog private (
while (enumeration.hasMoreElements) {
val bytes = enumeration.nextElement.getEntry
val entry =
if (Cluster.shouldCompressData) LZF.uncompress(bytes)
if (shouldCompressData) LZF.uncompress(bytes)
else bytes
entries = entries :+ entry
}
@ -356,6 +356,10 @@ class TransactionLog private (
*/
object TransactionLog {
val zooKeeperServers = config.getString("akka.cluster.zookeeper-server-addresses", "localhost:2181")
val sessionTimeout = Duration(config.getInt("akka.cluster.session-timeout", 60), TIME_UNIT).toMillis.toInt
val connectionTimeout = Duration(config.getInt("akka.cluster.connection-timeout", 60), TIME_UNIT).toMillis.toInt
val digestType = config.getString("akka.cluster.replication.digest-type", "CRC32") match {
case "CRC32" BookKeeper.DigestType.CRC32
case "MAC" BookKeeper.DigestType.MAC
@ -367,40 +371,17 @@ object TransactionLog {
val quorumSize = config.getInt("akka.cluster.replication.quorum-size", 2)
val snapshotFrequency = config.getInt("akka.cluster.replication.snapshot-frequency", 1000)
val timeout = Duration(config.getInt("akka.cluster.replication.timeout", 30), TIME_UNIT).toMillis
val shouldCompressData = config.getBool("akka.cluster.use-compression", false)
private[akka] val transactionLogNode = "/transaction-log-ids"
private val isConnected = new Switch(false)
private[akka] lazy val (bookieClient, zkClient) = {
val bk = new BookKeeper(Cluster.zooKeeperServers)
@volatile
private[akka] var bookieClient: BookKeeper = _
val zk = new AkkaZkClient(
Cluster.zooKeeperServers,
Cluster.sessionTimeout,
Cluster.connectionTimeout,
Cluster.defaultZooKeeperSerializer)
try {
zk.create(transactionLogNode, null, CreateMode.PERSISTENT)
} catch {
case e: ZkNodeExistsException {} // do nothing
case e: Throwable handleError(e)
}
EventHandler.info(this,
("Transaction log service started with" +
"\n\tdigest type [%s]" +
"\n\tensemble size [%s]" +
"\n\tquorum size [%s]" +
"\n\tlogging time out [%s]").format(
digestType,
ensembleSize,
quorumSize,
timeout))
isConnected.switchOn
(bk, zk)
}
@volatile
private[akka] var zkClient: AkkaZkClient = _
private[akka] def apply(
ledger: LedgerHandle,
@ -409,6 +390,34 @@ object TransactionLog {
replicationScheme: ReplicationScheme) =
new TransactionLog(ledger, id, isAsync, replicationScheme)
/**
* Starts up the transaction log.
*/
def start(): Unit = {
isConnected switchOn {
bookieClient = new BookKeeper(zooKeeperServers)
zkClient = new AkkaZkClient(zooKeeperServers, sessionTimeout, connectionTimeout)
try {
zkClient.create(transactionLogNode, null, CreateMode.PERSISTENT)
} catch {
case e: ZkNodeExistsException {} // do nothing
case e: Throwable handleError(e)
}
EventHandler.info(this,
("Transaction log service started with" +
"\n\tdigest type [%s]" +
"\n\tensemble size [%s]" +
"\n\tquorum size [%s]" +
"\n\tlogging time out [%s]").format(
digestType,
ensembleSize,
quorumSize,
timeout))
}
}
/**
* Shuts down the transaction log.
*/
@ -575,10 +584,12 @@ object LocalBookKeeperEnsemble {
*/
def start() {
isRunning switchOn {
EventHandler.info(this, "Starting up LocalBookKeeperEnsemble...")
localBookKeeper = new LocalBookKeeper(TransactionLog.ensembleSize)
localBookKeeper.runZookeeper(port)
localBookKeeper.initializeZookeper()
localBookKeeper.runBookies()
EventHandler.info(this, "LocalBookKeeperEnsemble started up successfully")
}
}

View file

@ -9,6 +9,8 @@ import org.scalatest.matchers.MustMatchers
import org.scalatest.BeforeAndAfterAll
import akka.actor._
import akka.event.EventHandler
import akka.testkit.{ EventFilter, TestEvent }
import com.eaio.uuid.UUID
@ -49,8 +51,10 @@ class AsynchronousTransactionLogSpec extends WordSpec with MustMatchers with Bef
}
"fail to be opened if non existing - asynchronous" in {
EventHandler.notify(TestEvent.Mute(EventFilter[ReplicationException]))
val uuid = (new UUID).toString
intercept[ReplicationException](TransactionLog.logFor(uuid, true, null))
EventHandler.notify(TestEvent.UnMuteAll)
}
"be able to overweite an existing txlog if one already exists - asynchronous" in {
@ -67,6 +71,7 @@ class AsynchronousTransactionLogSpec extends WordSpec with MustMatchers with Bef
}
"be able to record and delete entries - asynchronous" in {
EventHandler.notify(TestEvent.Mute(EventFilter[ReplicationException]))
val uuid = (new UUID).toString
val txlog1 = TransactionLog.newLogFor(uuid, true, null)
Thread.sleep(200)
@ -78,6 +83,7 @@ class AsynchronousTransactionLogSpec extends WordSpec with MustMatchers with Bef
txlog1.delete
Thread.sleep(200)
intercept[ReplicationException](TransactionLog.logFor(uuid, true, null))
EventHandler.notify(TestEvent.UnMuteAll)
}
"be able to record entries and read entries with 'entriesInRange' - asynchronous" in {
@ -214,14 +220,11 @@ class AsynchronousTransactionLogSpec extends WordSpec with MustMatchers with Bef
override def beforeAll() = {
LocalBookKeeperEnsemble.start()
TransactionLog.start()
}
override def afterAll() = {
Cluster.node.shutdown()
LocalCluster.shutdownLocalCluster()
TransactionLog.shutdown()
LocalBookKeeperEnsemble.shutdown()
Actor.registry.local.shutdownAll()
Scheduler.shutdown()
}
}

View file

@ -9,6 +9,8 @@ import org.scalatest.matchers.MustMatchers
import org.scalatest.BeforeAndAfterAll
import akka.actor._
import akka.event.EventHandler
import akka.testkit.{ EventFilter, TestEvent }
import com.eaio.uuid.UUID
@ -33,8 +35,10 @@ class SynchronousTransactionLogSpec extends WordSpec with MustMatchers with Befo
}
"fail to be opened if non existing - synchronous" in {
EventHandler.notify(TestEvent.Mute(EventFilter[ReplicationException]))
val uuid = (new UUID).toString
intercept[ReplicationException](TransactionLog.logFor(uuid, false, null))
EventHandler.notify(TestEvent.UnMuteAll)
}
"be able to be checked for existence - synchronous" in {
@ -175,16 +179,12 @@ class SynchronousTransactionLogSpec extends WordSpec with MustMatchers with Befo
}
override def beforeAll() = {
LocalCluster.startLocalCluster()
LocalBookKeeperEnsemble.start()
TransactionLog.start()
}
override def afterAll() = {
Cluster.node.shutdown()
LocalCluster.shutdownLocalCluster()
TransactionLog.shutdown()
LocalBookKeeperEnsemble.shutdown()
Actor.registry.local.shutdownAll()
Scheduler.shutdown()
}
}

View file

@ -232,10 +232,13 @@ class TestActorRefSpec extends WordSpec with MustMatchers with BeforeAndAfterEac
EventHandler.notify(TestEvent.Mute(filter))
val log = TestActorRef[Logger]
EventHandler.addListener(log)
val eventHandlerLevel = EventHandler.level
EventHandler.level = EventHandler.WarningLevel
boss link ref
val la = log.underlyingActor
la.count must be(1)
la.msg must (include("supervisor") and include("CallingThreadDispatcher"))
EventHandler.level = eventHandlerLevel
EventHandler.removeListener(log)
EventHandler.notify(TestEvent.UnMute(filter))
}

View file

@ -6,5 +6,5 @@ include "akka-reference.conf"
akka {
event-handlers = ["akka.testkit.TestEventListener"]
event-handler-level = "INFO"
event-handler-level = "ERROR"
}