Split up the TransactionLogSpec into two tests: AsynchronousTransactionLogSpec and SynchronousTransactionLogSpec, also did various minor edits and comments.

Signed-off-by: Jonas Bonér <jonas@jonasboner.com>
This commit is contained in:
Jonas Bonér 2011-08-15 10:42:09 +02:00
parent 5b2b463ce0
commit 48ee9deb3a
5 changed files with 214 additions and 168 deletions

View file

@ -403,13 +403,6 @@ class DefaultClusterNode private[akka] (
def membershipNodes: Array[String] = locallyCachedMembershipNodes.toList.toArray.asInstanceOf[Array[String]]
private[akka] val nodeConnections: ConcurrentMap[String, Tuple2[InetSocketAddress, ActorRef]] = {
val conns = new ConcurrentHashMap[String, Tuple2[InetSocketAddress, ActorRef]]
if (includeRefNodeInReplicaSet)
conns.put(nodeAddress.nodeName, (remoteServerAddress, remoteDaemon)) // add the remote connection to 'this' node as well, but as a 'local' actor
conns
}
// zookeeper listeners
private val stateListener = new StateListener(this)
private val membershipListener = new MembershipChildListener(this)
@ -420,6 +413,17 @@ class DefaultClusterNode private[akka] (
// Address -> ClusterActorRef
private val clusterActorRefs = new Index[InetSocketAddress, ClusterActorRef]
// ============================================================================================================
// ========== WARNING: THESE FIELDS AND EVERYTHING USING THEM IN THE CONSTRUCTOR NEEDS TO BE LAZY =============
// ============================================================================================================
lazy private[akka] val nodeConnections: ConcurrentMap[String, Tuple2[InetSocketAddress, ActorRef]] = {
val conns = new ConcurrentHashMap[String, Tuple2[InetSocketAddress, ActorRef]]
if (includeRefNodeInReplicaSet)
conns.put(nodeAddress.nodeName, (remoteServerAddress, remoteDaemon)) // add the remote connection to 'this' node as well, but as a 'local' actor
conns
}
// ZooKeeper client
lazy private[cluster] val zkClient = new AkkaZkClient(zkServerAddresses, sessionTimeout, connectionTimeout, serializer)
@ -441,6 +445,8 @@ class DefaultClusterNode private[akka] (
LEADER_ELECTION_PATH, null,
leaderElectionCallback)
// ============================================================================================================
if (enableJMX) createMBean
// =======================================
@ -476,6 +482,7 @@ class DefaultClusterNode private[akka] (
}
def shutdown() {
def shutdownNode() {
ignore[ZkNoNodeException](zkClient.deleteRecursive(membershipNodePath))

View file

@ -407,7 +407,7 @@ class ActiveRemoteClient private[akka] (
//Please note that this method does _not_ remove the ARC from the NettyRemoteClientModule's map of clients
def shutdown() = runSwitch switchOff {
EventHandler.info(this, "Shutting down [%s]".format(name))
EventHandler.info(this, "Shutting down remote client [%s]".format(name))
notifyListeners(RemoteClientShutdown(module, remoteAddress))
timer.stop()
@ -655,6 +655,7 @@ class NettyRemoteServer(serverModule: NettyRemoteServerModule, val host: String,
serverModule.notifyListeners(RemoteServerStarted(serverModule))
def shutdown() {
EventHandler.info(this, "Shutting down remote server [%s]".format(name))
try {
val shutdownSignal = {
val b = RemoteControlProtocol.newBuilder.setCommandType(CommandType.SHUTDOWN)

View file

@ -78,7 +78,7 @@ class ClusterActorRefCleanupMultiJvmNode1 extends MasterClusterTestNode {
case e: RoutingException
}
//since the call to the node failed, the node must have been removed from the list.
//since the call to the node failed, the node must have been removed from the list.
clusteredRef.connectionsSize must be(1)
//send a message to this node,

View file

@ -8,170 +8,14 @@ import org.scalatest.WordSpec
import org.scalatest.matchers.MustMatchers
import org.scalatest.BeforeAndAfterAll
import akka.actor._
import com.eaio.uuid.UUID
class TransactionLogSpec extends WordSpec with MustMatchers with BeforeAndAfterAll {
class AsynchronousTransactionLogSpec extends WordSpec with MustMatchers with BeforeAndAfterAll {
private var bookKeeper: BookKeeper = _
private var localBookKeeper: LocalBookKeeper = _
"A synchronous used Transaction Log" should {
"be able to be deleted - synchronous" in {
val uuid = (new UUID).toString
val txlog = TransactionLog.newLogFor(uuid, false, null)
val entry = "hello".getBytes("UTF-8")
txlog.recordEntry(entry)
txlog.delete()
txlog.close()
val zkClient = TransactionLog.zkClient
assert(zkClient.readData(txlog.snapshotPath, true) == null)
assert(zkClient.readData(txlog.txLogPath, true) == null)
}
"fail to be opened if non existing - synchronous" in {
val uuid = (new UUID).toString
intercept[ReplicationException](TransactionLog.logFor(uuid, false, null))
}
"be able to be checked for existence - synchronous" in {
val uuid = (new UUID).toString
TransactionLog.exists(uuid) must be(false)
TransactionLog.newLogFor(uuid, false, null)
TransactionLog.exists(uuid) must be(true)
}
"be able to record entries - synchronous" in {
val uuid = (new UUID).toString
val txlog = TransactionLog.newLogFor(uuid, false, null)
val entry = "hello".getBytes("UTF-8")
txlog.recordEntry(entry)
}
"be able to overweite an existing txlog if one already exists - synchronous" in {
val uuid = (new UUID).toString
val txlog1 = TransactionLog.newLogFor(uuid, false, null)
val entry = "hello".getBytes("UTF-8")
txlog1.recordEntry(entry)
txlog1.recordEntry(entry)
txlog1.close
val txLog2 = TransactionLog.newLogFor(uuid, false, null)
txLog2.latestSnapshotId.isDefined must be(false)
txLog2.latestEntryId must be(-1)
}
"be able to record and delete entries - synchronous" in {
val uuid = (new UUID).toString
val txlog1 = TransactionLog.newLogFor(uuid, false, null)
val entry = "hello".getBytes("UTF-8")
txlog1.recordEntry(entry)
txlog1.recordEntry(entry)
txlog1.delete
txlog1.close
intercept[ReplicationException](TransactionLog.logFor(uuid, false, null))
}
"be able to record entries and read entries with 'entriesInRange' - synchronous" in {
val uuid = (new UUID).toString
val txlog1 = TransactionLog.newLogFor(uuid, false, null)
val entry = "hello".getBytes("UTF-8")
txlog1.recordEntry(entry)
txlog1.recordEntry(entry)
txlog1.close
val txlog2 = TransactionLog.logFor(uuid, false, null)
val entries = txlog2.entriesInRange(0, 1).map(bytes new String(bytes, "UTF-8"))
entries.size must equal(2)
entries(0) must equal("hello")
entries(1) must equal("hello")
txlog2.close
}
"be able to record entries and read entries with 'entries' - synchronous" in {
val uuid = (new UUID).toString
val txlog1 = TransactionLog.newLogFor(uuid, false, null)
val entry = "hello".getBytes("UTF-8")
txlog1.recordEntry(entry)
txlog1.recordEntry(entry)
txlog1.recordEntry(entry)
txlog1.recordEntry(entry)
// txlog1.close // should work without txlog.close
val txlog2 = TransactionLog.logFor(uuid, false, null)
val entries = txlog2.entries.map(bytes new String(bytes, "UTF-8"))
entries.size must equal(4)
entries(0) must equal("hello")
entries(1) must equal("hello")
entries(2) must equal("hello")
entries(3) must equal("hello")
txlog2.close
}
"be able to record a snapshot - synchronous" in {
val uuid = (new UUID).toString
val txlog1 = TransactionLog.newLogFor(uuid, false, null)
val snapshot = "snapshot".getBytes("UTF-8")
txlog1.recordSnapshot(snapshot)
txlog1.close
}
"be able to record and read a snapshot and following entries - synchronous" in {
val uuid = (new UUID).toString
val txlog1 = TransactionLog.newLogFor(uuid, false, null)
val snapshot = "snapshot".getBytes("UTF-8")
txlog1.recordSnapshot(snapshot)
val entry = "hello".getBytes("UTF-8")
txlog1.recordEntry(entry)
txlog1.recordEntry(entry)
txlog1.recordEntry(entry)
txlog1.recordEntry(entry)
txlog1.close
val txlog2 = TransactionLog.logFor(uuid, false, null)
val (snapshotAsBytes, entriesAsBytes) = txlog2.latestSnapshotAndSubsequentEntries
new String(snapshotAsBytes.getOrElse(fail("No snapshot")), "UTF-8") must equal("snapshot")
val entries = entriesAsBytes.map(bytes new String(bytes, "UTF-8"))
entries.size must equal(4)
entries(0) must equal("hello")
entries(1) must equal("hello")
entries(2) must equal("hello")
entries(3) must equal("hello")
txlog2.close
}
"be able to record entries then a snapshot then more entries - and then read from the snapshot and the following entries - synchronous" in {
val uuid = (new UUID).toString
val txlog1 = TransactionLog.newLogFor(uuid, false, null)
val entry = "hello".getBytes("UTF-8")
txlog1.recordEntry(entry)
txlog1.recordEntry(entry)
txlog1.recordEntry(entry)
val snapshot = "snapshot".getBytes("UTF-8")
txlog1.recordSnapshot(snapshot)
txlog1.recordEntry(entry)
txlog1.recordEntry(entry)
txlog1.close
val txlog2 = TransactionLog.logFor(uuid, false, null)
val (snapshotAsBytes, entriesAsBytes) = txlog2.latestSnapshotAndSubsequentEntries
new String(snapshotAsBytes.getOrElse(fail("No snapshot")), "UTF-8") must equal("snapshot")
val entries = entriesAsBytes.map(bytes new String(bytes, "UTF-8"))
entries.size must equal(2)
entries(0) must equal("hello")
entries(1) must equal("hello")
txlog2.close
}
}
"An asynchronous Transaction Log" should {
"be able to record entries - asynchronous" in {
val uuid = (new UUID).toString
@ -373,7 +217,11 @@ class TransactionLogSpec extends WordSpec with MustMatchers with BeforeAndAfterA
}
override def afterAll() = {
Cluster.node.shutdown()
Cluster.shutdownLocalCluster()
TransactionLog.shutdown()
LocalBookKeeperEnsemble.shutdown()
Actor.registry.local.shutdownAll()
Scheduler.shutdown()
}
}

View file

@ -0,0 +1,190 @@
/**
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.cluster
import org.apache.bookkeeper.client.BookKeeper
import org.scalatest.WordSpec
import org.scalatest.matchers.MustMatchers
import org.scalatest.BeforeAndAfterAll
import akka.actor._
import com.eaio.uuid.UUID
class SynchronousTransactionLogSpec extends WordSpec with MustMatchers with BeforeAndAfterAll {
private var bookKeeper: BookKeeper = _
private var localBookKeeper: LocalBookKeeper = _
"A synchronous used Transaction Log" should {
"be able to be deleted - synchronous" in {
val uuid = (new UUID).toString
val txlog = TransactionLog.newLogFor(uuid, false, null)
val entry = "hello".getBytes("UTF-8")
txlog.recordEntry(entry)
txlog.delete()
txlog.close()
val zkClient = TransactionLog.zkClient
assert(zkClient.readData(txlog.snapshotPath, true) == null)
assert(zkClient.readData(txlog.txLogPath, true) == null)
}
"fail to be opened if non existing - synchronous" in {
val uuid = (new UUID).toString
intercept[ReplicationException](TransactionLog.logFor(uuid, false, null))
}
"be able to be checked for existence - synchronous" in {
val uuid = (new UUID).toString
TransactionLog.exists(uuid) must be(false)
TransactionLog.newLogFor(uuid, false, null)
TransactionLog.exists(uuid) must be(true)
}
"be able to record entries - synchronous" in {
val uuid = (new UUID).toString
val txlog = TransactionLog.newLogFor(uuid, false, null)
val entry = "hello".getBytes("UTF-8")
txlog.recordEntry(entry)
}
"be able to overweite an existing txlog if one already exists - synchronous" in {
val uuid = (new UUID).toString
val txlog1 = TransactionLog.newLogFor(uuid, false, null)
val entry = "hello".getBytes("UTF-8")
txlog1.recordEntry(entry)
txlog1.recordEntry(entry)
txlog1.close
val txLog2 = TransactionLog.newLogFor(uuid, false, null)
txLog2.latestSnapshotId.isDefined must be(false)
txLog2.latestEntryId must be(-1)
}
"be able to record and delete entries - synchronous" in {
val uuid = (new UUID).toString
val txlog1 = TransactionLog.newLogFor(uuid, false, null)
val entry = "hello".getBytes("UTF-8")
txlog1.recordEntry(entry)
txlog1.recordEntry(entry)
txlog1.delete
txlog1.close
// intercept[ReplicationException](TransactionLog.logFor(uuid, false, null))
}
"be able to record entries and read entries with 'entriesInRange' - synchronous" in {
val uuid = (new UUID).toString
val txlog1 = TransactionLog.newLogFor(uuid, false, null)
val entry = "hello".getBytes("UTF-8")
txlog1.recordEntry(entry)
txlog1.recordEntry(entry)
txlog1.close
val txlog2 = TransactionLog.logFor(uuid, false, null)
val entries = txlog2.entriesInRange(0, 1).map(bytes new String(bytes, "UTF-8"))
entries.size must equal(2)
entries(0) must equal("hello")
entries(1) must equal("hello")
txlog2.close
}
"be able to record entries and read entries with 'entries' - synchronous" in {
val uuid = (new UUID).toString
val txlog1 = TransactionLog.newLogFor(uuid, false, null)
val entry = "hello".getBytes("UTF-8")
txlog1.recordEntry(entry)
txlog1.recordEntry(entry)
txlog1.recordEntry(entry)
txlog1.recordEntry(entry)
txlog1.close // should work without txlog.close
val txlog2 = TransactionLog.logFor(uuid, false, null)
val entries = txlog2.entries.map(bytes new String(bytes, "UTF-8"))
entries.size must equal(4)
entries(0) must equal("hello")
entries(1) must equal("hello")
entries(2) must equal("hello")
entries(3) must equal("hello")
txlog2.close
}
"be able to record a snapshot - synchronous" in {
val uuid = (new UUID).toString
val txlog1 = TransactionLog.newLogFor(uuid, false, null)
val snapshot = "snapshot".getBytes("UTF-8")
txlog1.recordSnapshot(snapshot)
txlog1.close
}
"be able to record and read a snapshot and following entries - synchronous" in {
val uuid = (new UUID).toString
val txlog1 = TransactionLog.newLogFor(uuid, false, null)
val snapshot = "snapshot".getBytes("UTF-8")
txlog1.recordSnapshot(snapshot)
val entry = "hello".getBytes("UTF-8")
txlog1.recordEntry(entry)
txlog1.recordEntry(entry)
txlog1.recordEntry(entry)
txlog1.recordEntry(entry)
txlog1.close
val txlog2 = TransactionLog.logFor(uuid, false, null)
val (snapshotAsBytes, entriesAsBytes) = txlog2.latestSnapshotAndSubsequentEntries
new String(snapshotAsBytes.getOrElse(fail("No snapshot")), "UTF-8") must equal("snapshot")
val entries = entriesAsBytes.map(bytes new String(bytes, "UTF-8"))
entries.size must equal(4)
entries(0) must equal("hello")
entries(1) must equal("hello")
entries(2) must equal("hello")
entries(3) must equal("hello")
txlog2.close
}
"be able to record entries then a snapshot then more entries - and then read from the snapshot and the following entries - synchronous" in {
val uuid = (new UUID).toString
val txlog1 = TransactionLog.newLogFor(uuid, false, null)
val entry = "hello".getBytes("UTF-8")
txlog1.recordEntry(entry)
txlog1.recordEntry(entry)
txlog1.recordEntry(entry)
val snapshot = "snapshot".getBytes("UTF-8")
txlog1.recordSnapshot(snapshot)
txlog1.recordEntry(entry)
txlog1.recordEntry(entry)
txlog1.close
val txlog2 = TransactionLog.logFor(uuid, false, null)
val (snapshotAsBytes, entriesAsBytes) = txlog2.latestSnapshotAndSubsequentEntries
new String(snapshotAsBytes.getOrElse(fail("No snapshot")), "UTF-8") must equal("snapshot")
val entries = entriesAsBytes.map(bytes new String(bytes, "UTF-8"))
entries.size must equal(2)
entries(0) must equal("hello")
entries(1) must equal("hello")
txlog2.close
}
}
override def beforeAll() = {
Cluster.startLocalCluster()
LocalBookKeeperEnsemble.start()
}
override def afterAll() = {
Cluster.node.shutdown()
Cluster.shutdownLocalCluster()
TransactionLog.shutdown()
LocalBookKeeperEnsemble.shutdown()
Actor.registry.local.shutdownAll()
Scheduler.shutdown()
}
}