Added test for Cluster ChangeListener: NodeConnected, more to come. Also fixed bug in Cluster
This commit is contained in:
parent
241831c5ee
commit
b93755080d
7 changed files with 87 additions and 4 deletions
|
|
@ -435,10 +435,10 @@ class DefaultClusterNode private[akka] (
|
||||||
/**
|
/**
|
||||||
* Registers a cluster change listener.
|
* Registers a cluster change listener.
|
||||||
*/
|
*/
|
||||||
def register(listener: ChangeListener): ClusterNode = if (isConnected.isOff) {
|
def register(listener: ChangeListener): ClusterNode = {
|
||||||
changeListeners.add(listener)
|
changeListeners.add(listener)
|
||||||
this
|
this
|
||||||
} else throw new IllegalStateException("Can not register 'ChangeListener' after the cluster node has been started")
|
}
|
||||||
|
|
||||||
private[cluster] def publish(change: ChangeNotification) {
|
private[cluster] def publish(change: ChangeNotification) {
|
||||||
changeListeners.iterator.foreach(_.notify(change, this))
|
changeListeners.iterator.foreach(_.notify(change, this))
|
||||||
|
|
|
||||||
|
|
@ -17,8 +17,6 @@ import java.util.concurrent.{ CyclicBarrier, TimeUnit }
|
||||||
|
|
||||||
import scala.collection.JavaConversions._
|
import scala.collection.JavaConversions._
|
||||||
|
|
||||||
// FIXME: Test sending all funs
|
|
||||||
|
|
||||||
class MyJavaSerializableActor extends Actor with Serializable {
|
class MyJavaSerializableActor extends Actor with Serializable {
|
||||||
var count = 0
|
var count = 0
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1 @@
|
||||||
|
akka.event-handler-level = "DEBUG"
|
||||||
|
|
@ -0,0 +1 @@
|
||||||
|
-Dakka.cluster.nodename=node1 -Dakka.cluster.port=9991
|
||||||
|
|
@ -0,0 +1 @@
|
||||||
|
akka.event-handler-level = "DEBUG"
|
||||||
|
|
@ -0,0 +1 @@
|
||||||
|
-Dakka.cluster.nodename=node2 -Dakka.cluster.port=9992
|
||||||
|
|
@ -0,0 +1,81 @@
|
||||||
|
/**
|
||||||
|
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
|
||||||
|
*/
|
||||||
|
|
||||||
|
package akka.cluster.changelisteners.nodeconnected
|
||||||
|
|
||||||
|
import org.scalatest.WordSpec
|
||||||
|
import org.scalatest.matchers.MustMatchers
|
||||||
|
import org.scalatest.BeforeAndAfterAll
|
||||||
|
|
||||||
|
import akka.cluster._
|
||||||
|
import ChangeListener._
|
||||||
|
|
||||||
|
import java.util.concurrent._
|
||||||
|
|
||||||
|
object NodeConnectedChangeListenerMultiJvmSpec {
|
||||||
|
var NrOfNodes = 2
|
||||||
|
}
|
||||||
|
|
||||||
|
class NodeConnectedChangeListenerMultiJvmNode1 extends WordSpec with MustMatchers with BeforeAndAfterAll {
|
||||||
|
import NodeConnectedChangeListenerMultiJvmSpec._
|
||||||
|
|
||||||
|
"A NodeConnected change listener" must {
|
||||||
|
|
||||||
|
"be invoked when a new node joins the cluster" in {
|
||||||
|
System.getProperty("akka.cluster.nodename", "") must be("node1")
|
||||||
|
System.getProperty("akka.cluster.port", "") must be("9991")
|
||||||
|
|
||||||
|
val latch = new CountDownLatch(1)
|
||||||
|
Cluster.node.register(new ChangeListener {
|
||||||
|
override def nodeConnected(node: String, client: ClusterNode) {
|
||||||
|
latch.countDown
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
Cluster.node.shutdown()
|
||||||
|
|
||||||
|
Cluster.barrier("start-node1", NrOfNodes) {
|
||||||
|
Cluster.node.start()
|
||||||
|
}
|
||||||
|
|
||||||
|
Cluster.barrier("start-node2", NrOfNodes) {
|
||||||
|
latch.await(5, TimeUnit.SECONDS) must be === true
|
||||||
|
}
|
||||||
|
|
||||||
|
Cluster.barrier("shutdown", NrOfNodes) {
|
||||||
|
// Cluster.node.shutdown()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
override def beforeAll() = {
|
||||||
|
Cluster.startLocalCluster()
|
||||||
|
}
|
||||||
|
|
||||||
|
override def afterAll() = {
|
||||||
|
Cluster.shutdownLocalCluster()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
class NodeConnectedChangeListenerMultiJvmNode2 extends WordSpec with MustMatchers {
|
||||||
|
import NodeConnectedChangeListenerMultiJvmSpec._
|
||||||
|
|
||||||
|
"A NodeConnected change listener" must {
|
||||||
|
|
||||||
|
"be invoked when a new node joins the cluster" in {
|
||||||
|
System.getProperty("akka.cluster.nodename", "") must be("node2")
|
||||||
|
System.getProperty("akka.cluster.port", "") must be("9992")
|
||||||
|
|
||||||
|
Cluster.barrier("start-node1", NrOfNodes) {}
|
||||||
|
|
||||||
|
Cluster.barrier("start-node2", NrOfNodes) {
|
||||||
|
Cluster.node.start()
|
||||||
|
}
|
||||||
|
|
||||||
|
Cluster.barrier("shutdown", NrOfNodes) {
|
||||||
|
// Cluster.node.shutdown()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
Loading…
Add table
Add a link
Reference in a new issue