Added the concept of PortPrefix in the clustering tests to avoid clashes in binding of server port when running the tests in parallel.

Signed-off-by: Jonas Bonér <jonas@jonasboner.com>
This commit is contained in:
Jonas Bonér 2012-05-04 15:03:23 +02:00
parent 1639028a78
commit 5adaa1a271
10 changed files with 107 additions and 91 deletions

View file

@ -15,6 +15,7 @@ import com.typesafe.config._
import java.net.InetSocketAddress
class ClientDowningSpec extends ClusterSpec("akka.cluster.auto-down = off") with ImplicitSender {
val portPrefix = 1
var node1: Cluster = _
var node2: Cluster = _
@ -34,8 +35,8 @@ class ClientDowningSpec extends ClusterSpec("akka.cluster.auto-down = off") with
.parseString("""
akka {
actor.provider = "akka.remote.RemoteActorRefProvider"
remote.netty.port = 5550
}""")
remote.netty.port = %d550
}""".format(portPrefix))
.withFallback(system.settings.config))
.asInstanceOf[ActorSystemImpl]
val remote1 = system1.provider.asInstanceOf[RemoteActorRefProvider]
@ -48,9 +49,9 @@ class ClientDowningSpec extends ClusterSpec("akka.cluster.auto-down = off") with
.parseString("""
akka {
actor.provider = "akka.remote.RemoteActorRefProvider"
remote.netty.port = 5551
cluster.node-to-join = "akka://system1@localhost:5550"
}""")
remote.netty.port = %d551
cluster.node-to-join = "akka://system1@localhost:%d550"
}""".format(portPrefix, portPrefix))
.withFallback(system.settings.config))
.asInstanceOf[ActorSystemImpl]
val remote2 = system2.provider.asInstanceOf[RemoteActorRefProvider]
@ -63,9 +64,9 @@ class ClientDowningSpec extends ClusterSpec("akka.cluster.auto-down = off") with
.parseString("""
akka {
actor.provider = "akka.remote.RemoteActorRefProvider"
remote.netty.port = 5552
cluster.node-to-join = "akka://system1@localhost:5550"
}""")
remote.netty.port = %d552
cluster.node-to-join = "akka://system1@localhost:%d550"
}""".format(portPrefix, portPrefix))
.withFallback(system.settings.config))
.asInstanceOf[ActorSystemImpl]
val remote3 = system3.provider.asInstanceOf[RemoteActorRefProvider]
@ -78,9 +79,9 @@ class ClientDowningSpec extends ClusterSpec("akka.cluster.auto-down = off") with
.parseString("""
akka {
actor.provider = "akka.remote.RemoteActorRefProvider"
remote.netty.port = 5553
cluster.node-to-join = "akka://system1@localhost:5550"
}""")
remote.netty.port = %d553
cluster.node-to-join = "akka://system1@localhost:%d550"
}""".format(portPrefix, portPrefix))
.withFallback(system.settings.config))
.asInstanceOf[ActorSystemImpl]
val remote4 = system4.provider.asInstanceOf[RemoteActorRefProvider]

View file

@ -9,14 +9,14 @@ import akka.util.duration._
import akka.util.Duration
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class ClusterConfigSpec extends ClusterSpec {
class ClusterConfigSpec extends AkkaSpec {
"Clustering" must {
"be able to parse generic cluster config elements" in {
val settings = new ClusterSettings(system.settings.config, system.name)
import settings._
FailureDetectorThreshold must be(3)
FailureDetectorThreshold must be(8)
FailureDetectorMaxSampleSize must be(1000)
NodeToJoin must be(None)
PeriodicTasksInitialDelay must be(1 seconds)

View file

@ -39,6 +39,14 @@ object ClusterSpec {
}
abstract class ClusterSpec(_system: ActorSystem) extends AkkaSpec(_system) {
case class PortPrefix(port: Int) {
def withPortPrefix: Int = (portPrefix.toString + port.toString).toInt
}
implicit def intToPortPrefix(port: Int) = PortPrefix(port)
def portPrefix: Int
def this(config: Config) = this(ActorSystem(AkkaSpec.getCallerName, config.withFallback(ClusterSpec.testConf)))
def this(s: String) = this(ConfigFactory.parseString(s))

View file

@ -14,6 +14,7 @@ import com.typesafe.config._
import java.net.InetSocketAddress
class GossipingAccrualFailureDetectorSpec extends ClusterSpec with ImplicitSender {
val portPrefix = 2
var node1: Cluster = _
var node2: Cluster = _
@ -31,8 +32,8 @@ class GossipingAccrualFailureDetectorSpec extends ClusterSpec with ImplicitSende
.parseString("""
akka {
actor.provider = "akka.remote.RemoteActorRefProvider"
remote.netty.port=5550
}""")
remote.netty.port=%d550
}""".format(portPrefix))
.withFallback(system.settings.config))
.asInstanceOf[ActorSystemImpl]
val remote1 = system1.provider.asInstanceOf[RemoteActorRefProvider]
@ -45,9 +46,9 @@ class GossipingAccrualFailureDetectorSpec extends ClusterSpec with ImplicitSende
.parseString("""
akka {
actor.provider = "akka.remote.RemoteActorRefProvider"
remote.netty.port=5551
cluster.node-to-join = "akka://system1@localhost:5550"
}""")
remote.netty.port=%d551
cluster.node-to-join = "akka://system1@localhost:%d550"
}""".format(portPrefix, portPrefix))
.withFallback(system.settings.config))
.asInstanceOf[ActorSystemImpl]
val remote2 = system2.provider.asInstanceOf[RemoteActorRefProvider]
@ -60,9 +61,9 @@ class GossipingAccrualFailureDetectorSpec extends ClusterSpec with ImplicitSende
.parseString("""
akka {
actor.provider = "akka.remote.RemoteActorRefProvider"
remote.netty.port=5552
cluster.node-to-join = "akka://system1@localhost:5550"
}""")
remote.netty.port=%d552
cluster.node-to-join = "akka://system1@localhost:%d550"
}""".format(portPrefix, portPrefix))
.withFallback(system.settings.config))
.asInstanceOf[ActorSystemImpl]
val remote3 = system3.provider.asInstanceOf[RemoteActorRefProvider]

View file

@ -15,6 +15,7 @@ import com.typesafe.config._
import java.net.InetSocketAddress
class JoinTwoClustersSpec extends ClusterSpec("akka.cluster.failure-detector.threshold = 5") with ImplicitSender {
val portPrefix = 3
var node1: Cluster = _
var node2: Cluster = _
@ -38,8 +39,8 @@ class JoinTwoClustersSpec extends ClusterSpec("akka.cluster.failure-detector.thr
.parseString("""
akka {
actor.provider = "akka.remote.RemoteActorRefProvider"
remote.netty.port = 5551
}""")
remote.netty.port = %d551
}""".format(portPrefix))
.withFallback(system.settings.config))
.asInstanceOf[ActorSystemImpl]
node1 = Cluster(system1)
@ -49,9 +50,9 @@ class JoinTwoClustersSpec extends ClusterSpec("akka.cluster.failure-detector.thr
.parseString("""
akka {
actor.provider = "akka.remote.RemoteActorRefProvider"
remote.netty.port = 5552
cluster.node-to-join = "akka://system1@localhost:5551"
}""")
remote.netty.port = %d552
cluster.node-to-join = "akka://system1@localhost:%d551"
}""".format(portPrefix, portPrefix))
.withFallback(system.settings.config))
.asInstanceOf[ActorSystemImpl]
node2 = Cluster(system2)
@ -61,8 +62,8 @@ class JoinTwoClustersSpec extends ClusterSpec("akka.cluster.failure-detector.thr
.parseString("""
akka {
actor.provider = "akka.remote.RemoteActorRefProvider"
remote.netty.port = 5553
}""")
remote.netty.port = %d553
}""".format(portPrefix))
.withFallback(system.settings.config))
.asInstanceOf[ActorSystemImpl]
node3 = Cluster(system3)
@ -72,9 +73,9 @@ class JoinTwoClustersSpec extends ClusterSpec("akka.cluster.failure-detector.thr
.parseString("""
akka {
actor.provider = "akka.remote.RemoteActorRefProvider"
remote.netty.port = 5554
cluster.node-to-join = "akka://system3@localhost:5553"
}""")
remote.netty.port = %d554
cluster.node-to-join = "akka://system3@localhost:%d553"
}""".format(portPrefix, portPrefix))
.withFallback(system.settings.config))
.asInstanceOf[ActorSystemImpl]
node4 = Cluster(system4)
@ -84,8 +85,8 @@ class JoinTwoClustersSpec extends ClusterSpec("akka.cluster.failure-detector.thr
.parseString("""
akka {
actor.provider = "akka.remote.RemoteActorRefProvider"
remote.netty.port = 5555
}""")
remote.netty.port = %d555
}""".format(portPrefix))
.withFallback(system.settings.config))
.asInstanceOf[ActorSystemImpl]
node5 = Cluster(system5)
@ -95,9 +96,9 @@ class JoinTwoClustersSpec extends ClusterSpec("akka.cluster.failure-detector.thr
.parseString("""
akka {
actor.provider = "akka.remote.RemoteActorRefProvider"
remote.netty.port = 5556
cluster.node-to-join = "akka://system5@localhost:5555"
}""")
remote.netty.port = %d556
cluster.node-to-join = "akka://system5@localhost:%d555"
}""".format(portPrefix, portPrefix))
.withFallback(system.settings.config))
.asInstanceOf[ActorSystemImpl]
node6 = Cluster(system6)

View file

@ -15,6 +15,7 @@ import com.typesafe.config._
import java.net.InetSocketAddress
class LeaderDowningSpec extends ClusterSpec with ImplicitSender {
val portPrefix = 4
var node1: Cluster = _
var node2: Cluster = _
@ -34,8 +35,8 @@ class LeaderDowningSpec extends ClusterSpec with ImplicitSender {
.parseString("""
akka {
actor.provider = "akka.remote.RemoteActorRefProvider"
remote.netty.port = 5550
}""")
remote.netty.port = %d550
}""".format(portPrefix))
.withFallback(system.settings.config))
.asInstanceOf[ActorSystemImpl]
val remote1 = system1.provider.asInstanceOf[RemoteActorRefProvider]
@ -48,9 +49,9 @@ class LeaderDowningSpec extends ClusterSpec with ImplicitSender {
.parseString("""
akka {
actor.provider = "akka.remote.RemoteActorRefProvider"
remote.netty.port = 5551
cluster.node-to-join = "akka://system1@localhost:5550"
}""")
remote.netty.port = %d551
cluster.node-to-join = "akka://system1@localhost:%d550"
}""".format(portPrefix, portPrefix))
.withFallback(system.settings.config))
.asInstanceOf[ActorSystemImpl]
val remote2 = system2.provider.asInstanceOf[RemoteActorRefProvider]
@ -63,9 +64,9 @@ class LeaderDowningSpec extends ClusterSpec with ImplicitSender {
.parseString("""
akka {
actor.provider = "akka.remote.RemoteActorRefProvider"
remote.netty.port = 5552
cluster.node-to-join = "akka://system1@localhost:5550"
}""")
remote.netty.port = %d552
cluster.node-to-join = "akka://system1@localhost:%d550"
}""".format(portPrefix, portPrefix))
.withFallback(system.settings.config))
.asInstanceOf[ActorSystemImpl]
val remote3 = system3.provider.asInstanceOf[RemoteActorRefProvider]
@ -78,9 +79,9 @@ class LeaderDowningSpec extends ClusterSpec with ImplicitSender {
.parseString("""
akka {
actor.provider = "akka.remote.RemoteActorRefProvider"
remote.netty.port = 5553
cluster.node-to-join = "akka://system1@localhost:5550"
}""")
remote.netty.port = %d553
cluster.node-to-join = "akka://system1@localhost:%d550"
}""".format(portPrefix, portPrefix))
.withFallback(system.settings.config))
.asInstanceOf[ActorSystemImpl]
val remote4 = system4.provider.asInstanceOf[RemoteActorRefProvider]

View file

@ -15,6 +15,7 @@ import com.typesafe.config._
import java.net.InetSocketAddress
class LeaderElectionSpec extends ClusterSpec with ImplicitSender {
val portPrefix = 5
var node1: Cluster = _
var node2: Cluster = _
@ -32,8 +33,8 @@ class LeaderElectionSpec extends ClusterSpec with ImplicitSender {
.parseString("""
akka {
actor.provider = "akka.remote.RemoteActorRefProvider"
remote.netty.port = 5550
}""")
remote.netty.port = %d550
}""".format(portPrefix))
.withFallback(system.settings.config))
.asInstanceOf[ActorSystemImpl]
node1 = Cluster(system1)
@ -44,9 +45,9 @@ class LeaderElectionSpec extends ClusterSpec with ImplicitSender {
.parseString("""
akka {
actor.provider = "akka.remote.RemoteActorRefProvider"
remote.netty.port = 5551
cluster.node-to-join = "akka://system1@localhost:5550"
}""")
remote.netty.port = %d551
cluster.node-to-join = "akka://system1@localhost:%d550"
}""".format(portPrefix, portPrefix))
.withFallback(system.settings.config))
.asInstanceOf[ActorSystemImpl]
node2 = Cluster(system2)
@ -57,9 +58,9 @@ class LeaderElectionSpec extends ClusterSpec with ImplicitSender {
.parseString("""
akka {
actor.provider = "akka.remote.RemoteActorRefProvider"
remote.netty.port = 5552
cluster.node-to-join = "akka://system1@localhost:5550"
}""")
remote.netty.port = %d552
cluster.node-to-join = "akka://system1@localhost:%d550"
}""".format(portPrefix, portPrefix))
.withFallback(system.settings.config))
.asInstanceOf[ActorSystemImpl]
node3 = Cluster(system3)

View file

@ -17,6 +17,7 @@ import scala.collection.immutable.SortedSet
import com.typesafe.config._
class MembershipChangeListenerSpec extends ClusterSpec with ImplicitSender {
val portPrefix = 6
var node0: Cluster = _
var node1: Cluster = _
@ -33,8 +34,8 @@ class MembershipChangeListenerSpec extends ClusterSpec with ImplicitSender {
.parseString("""
akka {
actor.provider = "akka.remote.RemoteActorRefProvider"
remote.netty.port=5550
}""")
remote.netty.port = %d550
}""".format(portPrefix))
.withFallback(system.settings.config))
.asInstanceOf[ActorSystemImpl]
val remote0 = system0.provider.asInstanceOf[RemoteActorRefProvider]
@ -44,9 +45,9 @@ class MembershipChangeListenerSpec extends ClusterSpec with ImplicitSender {
.parseString("""
akka {
actor.provider = "akka.remote.RemoteActorRefProvider"
remote.netty.port=5551
cluster.node-to-join = "akka://system0@localhost:5550"
}""")
remote.netty.port=%d551
cluster.node-to-join = "akka://system0@localhost:%d550"
}""".format(portPrefix, portPrefix))
.withFallback(system.settings.config))
.asInstanceOf[ActorSystemImpl]
val remote1 = system1.provider.asInstanceOf[RemoteActorRefProvider]
@ -81,9 +82,9 @@ class MembershipChangeListenerSpec extends ClusterSpec with ImplicitSender {
.parseString("""
akka {
actor.provider = "akka.remote.RemoteActorRefProvider"
remote.netty.port=5552
cluster.node-to-join = "akka://system0@localhost:5550"
}""")
remote.netty.port=%d552
cluster.node-to-join = "akka://system0@localhost:%d550"
}""".format(portPrefix, portPrefix))
.withFallback(system.settings.config))
.asInstanceOf[ActorSystemImpl]
val remote2 = system2.provider.asInstanceOf[RemoteActorRefProvider]

View file

@ -14,6 +14,7 @@ import akka.util.duration._
import com.typesafe.config._
class NodeMembershipSpec extends ClusterSpec with ImplicitSender {
val portPrefix = 7
var node0: Cluster = _
var node1: Cluster = _
@ -32,8 +33,8 @@ class NodeMembershipSpec extends ClusterSpec with ImplicitSender {
.parseString("""
akka {
actor.provider = "akka.remote.RemoteActorRefProvider"
remote.netty.port=5550
}""")
remote.netty.port = %d550
}""".format(portPrefix))
.withFallback(system.settings.config))
.asInstanceOf[ActorSystemImpl]
val remote0 = system0.provider.asInstanceOf[RemoteActorRefProvider]
@ -44,9 +45,9 @@ class NodeMembershipSpec extends ClusterSpec with ImplicitSender {
.parseString("""
akka {
actor.provider = "akka.remote.RemoteActorRefProvider"
remote.netty.port=5551
cluster.node-to-join = "akka://system0@localhost:5550"
}""")
remote.netty.port = %d551
cluster.node-to-join = "akka://system0@localhost:%d550"
}""".format(portPrefix, portPrefix))
.withFallback(system.settings.config))
.asInstanceOf[ActorSystemImpl]
val remote1 = system1.provider.asInstanceOf[RemoteActorRefProvider]
@ -57,16 +58,16 @@ class NodeMembershipSpec extends ClusterSpec with ImplicitSender {
val members0 = node0.latestGossip.members.toArray
members0.size must be(2)
members0(0).address.port.get must be(5550)
members0(0).address.port.get must be(550.withPortPrefix)
members0(0).status must be(MemberStatus.Up)
members0(1).address.port.get must be(5551)
members0(1).address.port.get must be(551.withPortPrefix)
members0(1).status must be(MemberStatus.Up)
val members1 = node1.latestGossip.members.toArray
members1.size must be(2)
members1(0).address.port.get must be(5550)
members1(0).address.port.get must be(550.withPortPrefix)
members1(0).status must be(MemberStatus.Up)
members1(1).address.port.get must be(5551)
members1(1).address.port.get must be(551.withPortPrefix)
members1(1).status must be(MemberStatus.Up)
}
@ -77,9 +78,9 @@ class NodeMembershipSpec extends ClusterSpec with ImplicitSender {
.parseString("""
akka {
actor.provider = "akka.remote.RemoteActorRefProvider"
remote.netty.port=5552
cluster.node-to-join = "akka://system0@localhost:5550"
}""")
remote.netty.port = %d552
cluster.node-to-join = "akka://system0@localhost:%d550"
}""".format(portPrefix, portPrefix))
.withFallback(system.settings.config))
.asInstanceOf[ActorSystemImpl]
val remote2 = system2.provider.asInstanceOf[RemoteActorRefProvider]
@ -90,29 +91,29 @@ class NodeMembershipSpec extends ClusterSpec with ImplicitSender {
val members0 = node0.latestGossip.members.toArray
val version = node0.latestGossip.version
members0.size must be(3)
members0(0).address.port.get must be(5550)
members0(0).address.port.get must be(550.withPortPrefix)
members0(0).status must be(MemberStatus.Up)
members0(1).address.port.get must be(5551)
members0(1).address.port.get must be(551.withPortPrefix)
members0(1).status must be(MemberStatus.Up)
members0(2).address.port.get must be(5552)
members0(2).address.port.get must be(552.withPortPrefix)
members0(2).status must be(MemberStatus.Up)
val members1 = node1.latestGossip.members.toArray
members1.size must be(3)
members1(0).address.port.get must be(5550)
members1(0).address.port.get must be(550.withPortPrefix)
members1(0).status must be(MemberStatus.Up)
members1(1).address.port.get must be(5551)
members1(1).address.port.get must be(551.withPortPrefix)
members1(1).status must be(MemberStatus.Up)
members1(2).address.port.get must be(5552)
members1(2).address.port.get must be(552.withPortPrefix)
members1(2).status must be(MemberStatus.Up)
val members2 = node2.latestGossip.members.toArray
members2.size must be(3)
members2(0).address.port.get must be(5550)
members2(0).address.port.get must be(550.withPortPrefix)
members2(0).status must be(MemberStatus.Up)
members2(1).address.port.get must be(5551)
members2(1).address.port.get must be(551.withPortPrefix)
members2(1).status must be(MemberStatus.Up)
members2(2).address.port.get must be(5552)
members2(2).address.port.get must be(552.withPortPrefix)
members2(2).status must be(MemberStatus.Up)
}
}

View file

@ -14,6 +14,7 @@ import akka.util.duration._
import com.typesafe.config._
class NodeStartupSpec extends ClusterSpec with ImplicitSender {
val portPrefix = 8
var node0: Cluster = _
var node1: Cluster = _
@ -26,8 +27,8 @@ class NodeStartupSpec extends ClusterSpec with ImplicitSender {
.parseString("""
akka {
actor.provider = "akka.remote.RemoteActorRefProvider"
remote.netty.port=5550
} """)
remote.netty.port=%d550
}""".format(portPrefix))
.withFallback(system.settings.config))
.asInstanceOf[ActorSystemImpl]
val remote0 = system0.provider.asInstanceOf[RemoteActorRefProvider]
@ -40,7 +41,7 @@ class NodeStartupSpec extends ClusterSpec with ImplicitSender {
"be in 'Joining' phase when started up" taggedAs LongRunningTest in {
val members = node0.latestGossip.members
val joiningMember = members find (_.address.port.get == 5550)
val joiningMember = members find (_.address.port.get == 550.withPortPrefix)
joiningMember must be('defined)
joiningMember.get.status must be(MemberStatus.Joining)
}
@ -52,9 +53,9 @@ class NodeStartupSpec extends ClusterSpec with ImplicitSender {
.parseString("""
akka {
actor.provider = "akka.remote.RemoteActorRefProvider"
remote.netty.port=5551
cluster.node-to-join = "akka://system0@localhost:5550"
}""")
remote.netty.port=%d551
cluster.node-to-join = "akka://system0@localhost:%d550"
}""".format(portPrefix, portPrefix))
.withFallback(system.settings.config))
.asInstanceOf[ActorSystemImpl]
val remote1 = system1.provider.asInstanceOf[RemoteActorRefProvider]
@ -62,7 +63,7 @@ class NodeStartupSpec extends ClusterSpec with ImplicitSender {
Thread.sleep(10.seconds.dilated.toMillis) // give enough time for node1 to JOIN node0 and leader to move him to UP
val members = node0.latestGossip.members
val joiningMember = members find (_.address.port.get == 5551)
val joiningMember = members find (_.address.port.get == 551.withPortPrefix)
joiningMember must be('defined)
joiningMember.get.status must be(MemberStatus.Up)
}