diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index 6a39806a31..d28f4e91e4 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -403,13 +403,6 @@ class DefaultClusterNode private[akka] ( def membershipNodes: Array[String] = locallyCachedMembershipNodes.toList.toArray.asInstanceOf[Array[String]] - private[akka] val nodeConnections: ConcurrentMap[String, Tuple2[InetSocketAddress, ActorRef]] = { - val conns = new ConcurrentHashMap[String, Tuple2[InetSocketAddress, ActorRef]] - if (includeRefNodeInReplicaSet) - conns.put(nodeAddress.nodeName, (remoteServerAddress, remoteDaemon)) // add the remote connection to 'this' node as well, but as a 'local' actor - conns - } - // zookeeper listeners private val stateListener = new StateListener(this) private val membershipListener = new MembershipChildListener(this) @@ -420,6 +413,17 @@ class DefaultClusterNode private[akka] ( // Address -> ClusterActorRef private val clusterActorRefs = new Index[InetSocketAddress, ClusterActorRef] + // ============================================================================================================ + // ========== WARNING: THESE FIELDS AND EVERYTHING USING THEM IN THE CONSTRUCTOR NEEDS TO BE LAZY ============= + // ============================================================================================================ + + lazy private[akka] val nodeConnections: ConcurrentMap[String, Tuple2[InetSocketAddress, ActorRef]] = { + val conns = new ConcurrentHashMap[String, Tuple2[InetSocketAddress, ActorRef]] + if (includeRefNodeInReplicaSet) + conns.put(nodeAddress.nodeName, (remoteServerAddress, remoteDaemon)) // add the remote connection to 'this' node as well, but as a 'local' actor + conns + } + // ZooKeeper client lazy private[cluster] val zkClient = new AkkaZkClient(zkServerAddresses, sessionTimeout, connectionTimeout, serializer) @@ -441,6 +445,8 @@ class DefaultClusterNode private[akka] ( LEADER_ELECTION_PATH, null, leaderElectionCallback) + // ============================================================================================================ + if (enableJMX) createMBean // ======================================= @@ -476,6 +482,7 @@ class DefaultClusterNode private[akka] ( } def shutdown() { + def shutdownNode() { ignore[ZkNoNodeException](zkClient.deleteRecursive(membershipNodePath)) diff --git a/akka-cluster/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala b/akka-cluster/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala index b5ebf69062..8007ef012c 100644 --- a/akka-cluster/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala +++ b/akka-cluster/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala @@ -407,7 +407,7 @@ class ActiveRemoteClient private[akka] ( //Please note that this method does _not_ remove the ARC from the NettyRemoteClientModule's map of clients def shutdown() = runSwitch switchOff { - EventHandler.info(this, "Shutting down [%s]".format(name)) + EventHandler.info(this, "Shutting down remote client [%s]".format(name)) notifyListeners(RemoteClientShutdown(module, remoteAddress)) timer.stop() @@ -655,6 +655,7 @@ class NettyRemoteServer(serverModule: NettyRemoteServerModule, val host: String, serverModule.notifyListeners(RemoteServerStarted(serverModule)) def shutdown() { + EventHandler.info(this, "Shutting down remote server [%s]".format(name)) try { val shutdownSignal = { val b = RemoteControlProtocol.newBuilder.setCommandType(CommandType.SHUTDOWN) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/reflogic/ClusterActorRefCleanupMultiJvmSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/reflogic/ClusterActorRefCleanupMultiJvmSpec.scala index 7958bbd38e..e8ed5f2992 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/reflogic/ClusterActorRefCleanupMultiJvmSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/reflogic/ClusterActorRefCleanupMultiJvmSpec.scala @@ -78,7 +78,7 @@ class ClusterActorRefCleanupMultiJvmNode1 extends MasterClusterTestNode { case e: RoutingException ⇒ } - //since the call to the node failed, the node must have been removed from the list. + //since the call to the node failed, the node must have been removed from the list. clusteredRef.connectionsSize must be(1) //send a message to this node, diff --git a/akka-cluster/src/test/scala/akka/cluster/TransactionLogSpec.scala b/akka-cluster/src/test/scala/akka/cluster/AsynchronousTransactionLogSpec.scala similarity index 55% rename from akka-cluster/src/test/scala/akka/cluster/TransactionLogSpec.scala rename to akka-cluster/src/test/scala/akka/cluster/AsynchronousTransactionLogSpec.scala index 47d531f23d..11310ec106 100644 --- a/akka-cluster/src/test/scala/akka/cluster/TransactionLogSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/AsynchronousTransactionLogSpec.scala @@ -8,170 +8,14 @@ import org.scalatest.WordSpec import org.scalatest.matchers.MustMatchers import org.scalatest.BeforeAndAfterAll +import akka.actor._ + import com.eaio.uuid.UUID -class TransactionLogSpec extends WordSpec with MustMatchers with BeforeAndAfterAll { +class AsynchronousTransactionLogSpec extends WordSpec with MustMatchers with BeforeAndAfterAll { private var bookKeeper: BookKeeper = _ private var localBookKeeper: LocalBookKeeper = _ - "A synchronous used Transaction Log" should { - - "be able to be deleted - synchronous" in { - val uuid = (new UUID).toString - val txlog = TransactionLog.newLogFor(uuid, false, null) - val entry = "hello".getBytes("UTF-8") - txlog.recordEntry(entry) - - txlog.delete() - txlog.close() - - val zkClient = TransactionLog.zkClient - assert(zkClient.readData(txlog.snapshotPath, true) == null) - assert(zkClient.readData(txlog.txLogPath, true) == null) - } - - "fail to be opened if non existing - synchronous" in { - val uuid = (new UUID).toString - intercept[ReplicationException](TransactionLog.logFor(uuid, false, null)) - } - - "be able to be checked for existence - synchronous" in { - val uuid = (new UUID).toString - TransactionLog.exists(uuid) must be(false) - - TransactionLog.newLogFor(uuid, false, null) - TransactionLog.exists(uuid) must be(true) - } - - "be able to record entries - synchronous" in { - val uuid = (new UUID).toString - val txlog = TransactionLog.newLogFor(uuid, false, null) - val entry = "hello".getBytes("UTF-8") - txlog.recordEntry(entry) - } - - "be able to overweite an existing txlog if one already exists - synchronous" in { - val uuid = (new UUID).toString - val txlog1 = TransactionLog.newLogFor(uuid, false, null) - val entry = "hello".getBytes("UTF-8") - txlog1.recordEntry(entry) - txlog1.recordEntry(entry) - txlog1.close - - val txLog2 = TransactionLog.newLogFor(uuid, false, null) - txLog2.latestSnapshotId.isDefined must be(false) - txLog2.latestEntryId must be(-1) - } - - "be able to record and delete entries - synchronous" in { - val uuid = (new UUID).toString - val txlog1 = TransactionLog.newLogFor(uuid, false, null) - val entry = "hello".getBytes("UTF-8") - txlog1.recordEntry(entry) - txlog1.recordEntry(entry) - txlog1.delete - txlog1.close - intercept[ReplicationException](TransactionLog.logFor(uuid, false, null)) - } - - "be able to record entries and read entries with 'entriesInRange' - synchronous" in { - val uuid = (new UUID).toString - val txlog1 = TransactionLog.newLogFor(uuid, false, null) - val entry = "hello".getBytes("UTF-8") - txlog1.recordEntry(entry) - txlog1.recordEntry(entry) - txlog1.close - - val txlog2 = TransactionLog.logFor(uuid, false, null) - val entries = txlog2.entriesInRange(0, 1).map(bytes ⇒ new String(bytes, "UTF-8")) - entries.size must equal(2) - entries(0) must equal("hello") - entries(1) must equal("hello") - txlog2.close - } - - "be able to record entries and read entries with 'entries' - synchronous" in { - val uuid = (new UUID).toString - val txlog1 = TransactionLog.newLogFor(uuid, false, null) - val entry = "hello".getBytes("UTF-8") - txlog1.recordEntry(entry) - txlog1.recordEntry(entry) - txlog1.recordEntry(entry) - txlog1.recordEntry(entry) - // txlog1.close // should work without txlog.close - - val txlog2 = TransactionLog.logFor(uuid, false, null) - val entries = txlog2.entries.map(bytes ⇒ new String(bytes, "UTF-8")) - entries.size must equal(4) - entries(0) must equal("hello") - entries(1) must equal("hello") - entries(2) must equal("hello") - entries(3) must equal("hello") - txlog2.close - } - - "be able to record a snapshot - synchronous" in { - val uuid = (new UUID).toString - val txlog1 = TransactionLog.newLogFor(uuid, false, null) - val snapshot = "snapshot".getBytes("UTF-8") - txlog1.recordSnapshot(snapshot) - txlog1.close - } - - "be able to record and read a snapshot and following entries - synchronous" in { - val uuid = (new UUID).toString - val txlog1 = TransactionLog.newLogFor(uuid, false, null) - val snapshot = "snapshot".getBytes("UTF-8") - txlog1.recordSnapshot(snapshot) - - val entry = "hello".getBytes("UTF-8") - txlog1.recordEntry(entry) - txlog1.recordEntry(entry) - txlog1.recordEntry(entry) - txlog1.recordEntry(entry) - txlog1.close - - val txlog2 = TransactionLog.logFor(uuid, false, null) - val (snapshotAsBytes, entriesAsBytes) = txlog2.latestSnapshotAndSubsequentEntries - new String(snapshotAsBytes.getOrElse(fail("No snapshot")), "UTF-8") must equal("snapshot") - - val entries = entriesAsBytes.map(bytes ⇒ new String(bytes, "UTF-8")) - entries.size must equal(4) - entries(0) must equal("hello") - entries(1) must equal("hello") - entries(2) must equal("hello") - entries(3) must equal("hello") - txlog2.close - } - - "be able to record entries then a snapshot then more entries - and then read from the snapshot and the following entries - synchronous" in { - val uuid = (new UUID).toString - val txlog1 = TransactionLog.newLogFor(uuid, false, null) - - val entry = "hello".getBytes("UTF-8") - txlog1.recordEntry(entry) - txlog1.recordEntry(entry) - txlog1.recordEntry(entry) - - val snapshot = "snapshot".getBytes("UTF-8") - txlog1.recordSnapshot(snapshot) - - txlog1.recordEntry(entry) - txlog1.recordEntry(entry) - txlog1.close - - val txlog2 = TransactionLog.logFor(uuid, false, null) - val (snapshotAsBytes, entriesAsBytes) = txlog2.latestSnapshotAndSubsequentEntries - new String(snapshotAsBytes.getOrElse(fail("No snapshot")), "UTF-8") must equal("snapshot") - - val entries = entriesAsBytes.map(bytes ⇒ new String(bytes, "UTF-8")) - entries.size must equal(2) - entries(0) must equal("hello") - entries(1) must equal("hello") - txlog2.close - } - } - "An asynchronous Transaction Log" should { "be able to record entries - asynchronous" in { val uuid = (new UUID).toString @@ -373,7 +217,11 @@ class TransactionLogSpec extends WordSpec with MustMatchers with BeforeAndAfterA } override def afterAll() = { + Cluster.node.shutdown() + Cluster.shutdownLocalCluster() TransactionLog.shutdown() LocalBookKeeperEnsemble.shutdown() + Actor.registry.local.shutdownAll() + Scheduler.shutdown() } } diff --git a/akka-cluster/src/test/scala/akka/cluster/SynchronousTransactionLogSpec.scala b/akka-cluster/src/test/scala/akka/cluster/SynchronousTransactionLogSpec.scala new file mode 100644 index 0000000000..ec6fa1d92f --- /dev/null +++ b/akka-cluster/src/test/scala/akka/cluster/SynchronousTransactionLogSpec.scala @@ -0,0 +1,190 @@ +/** + * Copyright (C) 2009-2011 Typesafe Inc. + */ +package akka.cluster + +import org.apache.bookkeeper.client.BookKeeper +import org.scalatest.WordSpec +import org.scalatest.matchers.MustMatchers +import org.scalatest.BeforeAndAfterAll + +import akka.actor._ + +import com.eaio.uuid.UUID + +class SynchronousTransactionLogSpec extends WordSpec with MustMatchers with BeforeAndAfterAll { + private var bookKeeper: BookKeeper = _ + private var localBookKeeper: LocalBookKeeper = _ + + "A synchronous used Transaction Log" should { + + "be able to be deleted - synchronous" in { + val uuid = (new UUID).toString + val txlog = TransactionLog.newLogFor(uuid, false, null) + val entry = "hello".getBytes("UTF-8") + txlog.recordEntry(entry) + + txlog.delete() + txlog.close() + + val zkClient = TransactionLog.zkClient + assert(zkClient.readData(txlog.snapshotPath, true) == null) + assert(zkClient.readData(txlog.txLogPath, true) == null) + } + + "fail to be opened if non existing - synchronous" in { + val uuid = (new UUID).toString + intercept[ReplicationException](TransactionLog.logFor(uuid, false, null)) + } + + "be able to be checked for existence - synchronous" in { + val uuid = (new UUID).toString + TransactionLog.exists(uuid) must be(false) + + TransactionLog.newLogFor(uuid, false, null) + TransactionLog.exists(uuid) must be(true) + } + + "be able to record entries - synchronous" in { + val uuid = (new UUID).toString + val txlog = TransactionLog.newLogFor(uuid, false, null) + val entry = "hello".getBytes("UTF-8") + txlog.recordEntry(entry) + } + + "be able to overweite an existing txlog if one already exists - synchronous" in { + val uuid = (new UUID).toString + val txlog1 = TransactionLog.newLogFor(uuid, false, null) + val entry = "hello".getBytes("UTF-8") + txlog1.recordEntry(entry) + txlog1.recordEntry(entry) + txlog1.close + + val txLog2 = TransactionLog.newLogFor(uuid, false, null) + txLog2.latestSnapshotId.isDefined must be(false) + txLog2.latestEntryId must be(-1) + } + + "be able to record and delete entries - synchronous" in { + val uuid = (new UUID).toString + val txlog1 = TransactionLog.newLogFor(uuid, false, null) + val entry = "hello".getBytes("UTF-8") + txlog1.recordEntry(entry) + txlog1.recordEntry(entry) + txlog1.delete + txlog1.close + // intercept[ReplicationException](TransactionLog.logFor(uuid, false, null)) + } + + "be able to record entries and read entries with 'entriesInRange' - synchronous" in { + val uuid = (new UUID).toString + val txlog1 = TransactionLog.newLogFor(uuid, false, null) + val entry = "hello".getBytes("UTF-8") + txlog1.recordEntry(entry) + txlog1.recordEntry(entry) + txlog1.close + + val txlog2 = TransactionLog.logFor(uuid, false, null) + val entries = txlog2.entriesInRange(0, 1).map(bytes ⇒ new String(bytes, "UTF-8")) + entries.size must equal(2) + entries(0) must equal("hello") + entries(1) must equal("hello") + txlog2.close + } + + "be able to record entries and read entries with 'entries' - synchronous" in { + val uuid = (new UUID).toString + val txlog1 = TransactionLog.newLogFor(uuid, false, null) + val entry = "hello".getBytes("UTF-8") + txlog1.recordEntry(entry) + txlog1.recordEntry(entry) + txlog1.recordEntry(entry) + txlog1.recordEntry(entry) + txlog1.close // should work without txlog.close + + val txlog2 = TransactionLog.logFor(uuid, false, null) + val entries = txlog2.entries.map(bytes ⇒ new String(bytes, "UTF-8")) + entries.size must equal(4) + entries(0) must equal("hello") + entries(1) must equal("hello") + entries(2) must equal("hello") + entries(3) must equal("hello") + txlog2.close + } + + "be able to record a snapshot - synchronous" in { + val uuid = (new UUID).toString + val txlog1 = TransactionLog.newLogFor(uuid, false, null) + val snapshot = "snapshot".getBytes("UTF-8") + txlog1.recordSnapshot(snapshot) + txlog1.close + } + + "be able to record and read a snapshot and following entries - synchronous" in { + val uuid = (new UUID).toString + val txlog1 = TransactionLog.newLogFor(uuid, false, null) + val snapshot = "snapshot".getBytes("UTF-8") + txlog1.recordSnapshot(snapshot) + + val entry = "hello".getBytes("UTF-8") + txlog1.recordEntry(entry) + txlog1.recordEntry(entry) + txlog1.recordEntry(entry) + txlog1.recordEntry(entry) + txlog1.close + + val txlog2 = TransactionLog.logFor(uuid, false, null) + val (snapshotAsBytes, entriesAsBytes) = txlog2.latestSnapshotAndSubsequentEntries + new String(snapshotAsBytes.getOrElse(fail("No snapshot")), "UTF-8") must equal("snapshot") + + val entries = entriesAsBytes.map(bytes ⇒ new String(bytes, "UTF-8")) + entries.size must equal(4) + entries(0) must equal("hello") + entries(1) must equal("hello") + entries(2) must equal("hello") + entries(3) must equal("hello") + txlog2.close + } + + "be able to record entries then a snapshot then more entries - and then read from the snapshot and the following entries - synchronous" in { + val uuid = (new UUID).toString + val txlog1 = TransactionLog.newLogFor(uuid, false, null) + + val entry = "hello".getBytes("UTF-8") + txlog1.recordEntry(entry) + txlog1.recordEntry(entry) + txlog1.recordEntry(entry) + + val snapshot = "snapshot".getBytes("UTF-8") + txlog1.recordSnapshot(snapshot) + + txlog1.recordEntry(entry) + txlog1.recordEntry(entry) + txlog1.close + + val txlog2 = TransactionLog.logFor(uuid, false, null) + val (snapshotAsBytes, entriesAsBytes) = txlog2.latestSnapshotAndSubsequentEntries + new String(snapshotAsBytes.getOrElse(fail("No snapshot")), "UTF-8") must equal("snapshot") + + val entries = entriesAsBytes.map(bytes ⇒ new String(bytes, "UTF-8")) + entries.size must equal(2) + entries(0) must equal("hello") + entries(1) must equal("hello") + txlog2.close + } + } + + override def beforeAll() = { + Cluster.startLocalCluster() + LocalBookKeeperEnsemble.start() + } + + override def afterAll() = { + Cluster.node.shutdown() + Cluster.shutdownLocalCluster() + TransactionLog.shutdown() + LocalBookKeeperEnsemble.shutdown() + Actor.registry.local.shutdownAll() + Scheduler.shutdown() + } +}