Merge branch 'master' into wip-2109-port-cluster-test-jboner

This commit is contained in:
Jonas Bonér 2012-05-25 17:31:03 +02:00
commit 5e6f856743
19 changed files with 467 additions and 397 deletions

View file

@ -648,7 +648,7 @@ object Logging {
import java.util.Date import java.util.Date
private val dateFormat = new SimpleDateFormat("MM/dd/yyyy HH:mm:ss.SSS") private val dateFormat = new SimpleDateFormat("MM/dd/yyyy HH:mm:ss.SSS")
private val errorFormat = "[ERROR] [%s] [%s] [%s] %s\n%s".intern private val errorFormat = "[ERROR] [%s] [%s] [%s] %s%s".intern
private val errorFormatWithoutCause = "[ERROR] [%s] [%s] [%s] %s".intern private val errorFormatWithoutCause = "[ERROR] [%s] [%s] [%s] %s".intern
private val warningFormat = "[WARN] [%s] [%s] [%s] %s".intern private val warningFormat = "[WARN] [%s] [%s] [%s] %s".intern
private val infoFormat = "[INFO] [%s] [%s] [%s] %s".intern private val infoFormat = "[INFO] [%s] [%s] [%s] %s".intern
@ -728,10 +728,12 @@ object Logging {
* Returns the StackTrace for the given Throwable as a String * Returns the StackTrace for the given Throwable as a String
*/ */
def stackTraceFor(e: Throwable): String = e match { def stackTraceFor(e: Throwable): String = e match {
case null | Error.NoCause | _: NoStackTrace "" case null | Error.NoCause ""
case _: NoStackTrace " (" + e.getClass.getName + ")"
case other case other
val sw = new java.io.StringWriter val sw = new java.io.StringWriter
val pw = new java.io.PrintWriter(sw) val pw = new java.io.PrintWriter(sw)
pw.append('\n')
other.printStackTrace(pw) other.printStackTrace(pw)
sw.toString sw.toString
} }

View file

@ -100,7 +100,18 @@ class Member(val address: Address, val status: MemberStatus) extends ClusterMess
object Member { object Member {
import MemberStatus._ import MemberStatus._
implicit val ordering = Ordering.fromLessThan[Member](_.address.toString < _.address.toString) /**
* Sort Address by host and port
*/
implicit val addressOrdering: Ordering[Address] = Ordering.fromLessThan[Address] { (a, b)
if (a.host != b.host) a.host.getOrElse("").compareTo(b.host.getOrElse("")) < 0
else if (a.port != b.port) a.port.getOrElse(0) < b.port.getOrElse(0)
else false
}
implicit val ordering: Ordering[Member] = new Ordering[Member] {
def compare(x: Member, y: Member) = addressOrdering.compare(x.address, y.address)
}
def apply(address: Address, status: MemberStatus): Member = new Member(address, status) def apply(address: Address, status: MemberStatus): Member = new Member(address, status)

View file

@ -1,101 +1,90 @@
/** ///**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com> // * Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/ // */
//
package akka.cluster //package akka.cluster
//
import org.scalatest.BeforeAndAfter //import org.scalatest.BeforeAndAfter
import com.typesafe.config.ConfigFactory //import com.typesafe.config.ConfigFactory
import akka.remote.testkit.MultiNodeConfig //import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec //import akka.remote.testkit.MultiNodeSpec
import akka.testkit.ImplicitSender //import akka.testkit._
//
object JoinTwoClustersMultiJvmSpec extends MultiNodeConfig { //object JoinTwoClustersMultiJvmSpec extends MultiNodeConfig {
val a1 = role("a1") // val a1 = role("a1")
val a2 = role("a2") // val a2 = role("a2")
val b1 = role("b1") // val b1 = role("b1")
val b2 = role("b2") // val b2 = role("b2")
val c1 = role("c1") // val c1 = role("c1")
val c2 = role("c2") // val c2 = role("c2")
//
commonConfig(debugConfig(on = false).withFallback(ConfigFactory.parseString(""" // commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig))
akka.cluster { //
gossip-frequency = 200 ms //}
leader-actions-frequency = 200 ms //
periodic-tasks-initial-delay = 300 ms //class JoinTwoClustersMultiJvmNode1 extends JoinTwoClustersSpec
} //class JoinTwoClustersMultiJvmNode2 extends JoinTwoClustersSpec
"""))) //class JoinTwoClustersMultiJvmNode3 extends JoinTwoClustersSpec
//class JoinTwoClustersMultiJvmNode4 extends JoinTwoClustersSpec
} //class JoinTwoClustersMultiJvmNode5 extends JoinTwoClustersSpec
//class JoinTwoClustersMultiJvmNode6 extends JoinTwoClustersSpec
class JoinTwoClustersMultiJvmNode1 extends JoinTwoClustersSpec //
class JoinTwoClustersMultiJvmNode2 extends JoinTwoClustersSpec //abstract class JoinTwoClustersSpec extends MultiNodeSpec(JoinTwoClustersMultiJvmSpec) with MultiNodeClusterSpec with ImplicitSender with BeforeAndAfter {
class JoinTwoClustersMultiJvmNode3 extends JoinTwoClustersSpec // import JoinTwoClustersMultiJvmSpec._
class JoinTwoClustersMultiJvmNode4 extends JoinTwoClustersSpec //
class JoinTwoClustersMultiJvmNode5 extends JoinTwoClustersSpec // override def initialParticipants = 6
class JoinTwoClustersMultiJvmNode6 extends JoinTwoClustersSpec //
// after {
abstract class JoinTwoClustersSpec extends MultiNodeSpec(JoinTwoClustersMultiJvmSpec) with ImplicitSender with BeforeAndAfter { // testConductor.enter("after")
import JoinTwoClustersMultiJvmSpec._ // }
//
override def initialParticipants = 6 // val a1Address = node(a1).address
// val b1Address = node(b1).address
def cluster: Cluster = Cluster(system) // val c1Address = node(c1).address
//
after { // "Three different clusters (A, B and C)" must {
testConductor.enter("after") //
} // "be able to 'elect' a single leader after joining (A -> B)" taggedAs LongRunningTest in {
//
val a1Address = node(a1).address // runOn(a1, a2) {
val b1Address = node(b1).address // cluster.join(a1Address)
val c1Address = node(c1).address // }
// runOn(b1, b2) {
def awaitUpConvergence(numberOfMembers: Int): Unit = { // cluster.join(b1Address)
awaitCond(cluster.latestGossip.members.size == numberOfMembers) // }
awaitCond(cluster.latestGossip.members.forall(_.status == MemberStatus.Up)) // runOn(c1, c2) {
awaitCond(cluster.convergence.isDefined) // cluster.join(c1Address)
} // }
//
"Three different clusters (A, B and C)" must { // awaitUpConvergence(numberOfMembers = 2)
//
"be able to 'elect' a single leader after joining (A -> B)" in { // assertLeader(a1, a2)
// assertLeader(b1, b2)
runOn(a1, a2) { // assertLeader(c1, c2)
cluster.join(a1Address) //
} // runOn(b2) {
runOn(b1, b2) { // cluster.join(a1Address)
cluster.join(b1Address) // }
} //
runOn(c1, c2) { // runOn(a1, a2, b1, b2) {
cluster.join(c1Address) // awaitUpConvergence(numberOfMembers = 4)
} // }
//
awaitUpConvergence(numberOfMembers = 2) // assertLeader(a1, a2, b1, b2)
// assertLeader(c1, c2)
cluster.isLeader must be(ifNode(a1, b1, c1)(true)(false)) //
// }
runOn(b2) { //
cluster.join(a1Address) // "be able to 'elect' a single leader after joining (C -> A + B)" taggedAs LongRunningTest in {
} //
// runOn(b2) {
runOn(a1, a2, b1, b2) { // cluster.join(c1Address)
awaitUpConvergence(numberOfMembers = 4) // }
} //
// awaitUpConvergence(numberOfMembers = 6)
cluster.isLeader must be(ifNode(a1, c1)(true)(false)) //
// assertLeader(a1, a2, b1, b2, c1, c2)
} // }
// }
"be able to 'elect' a single leader after joining (C -> A + B)" in { //
//}
runOn(b2) {
cluster.join(c1Address)
}
awaitUpConvergence(numberOfMembers = 6)
cluster.isLeader must be(ifNode(a1)(true)(false))
}
}
}

View file

@ -1,85 +1,77 @@
/** ///**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com> // * Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/ // */
package akka.cluster //package akka.cluster
//
import scala.collection.immutable.SortedSet //import scala.collection.immutable.SortedSet
import org.scalatest.BeforeAndAfter //import org.scalatest.BeforeAndAfter
import com.typesafe.config.ConfigFactory //import com.typesafe.config.ConfigFactory
import akka.remote.testkit.MultiNodeConfig //import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec //import akka.remote.testkit.MultiNodeSpec
import akka.testkit.ImplicitSender //import akka.testkit._
import akka.testkit.TestLatch //
//object MembershipChangeListenerMultiJvmSpec extends MultiNodeConfig {
object MembershipChangeListenerMultiJvmSpec extends MultiNodeConfig { // val first = role("first")
val first = role("first") // val second = role("second")
val second = role("second") // val third = role("third")
val third = role("third") //
// commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig))
commonConfig(debugConfig(on = false).withFallback(ConfigFactory.parseString(""" //
akka.cluster { //}
gossip-frequency = 200 ms //
leader-actions-frequency = 200 ms //class MembershipChangeListenerMultiJvmNode1 extends MembershipChangeListenerSpec
periodic-tasks-initial-delay = 300 ms //class MembershipChangeListenerMultiJvmNode2 extends MembershipChangeListenerSpec
} //class MembershipChangeListenerMultiJvmNode3 extends MembershipChangeListenerSpec
"""))) //
//abstract class MembershipChangeListenerSpec extends MultiNodeSpec(MembershipChangeListenerMultiJvmSpec)
} // with MultiNodeClusterSpec with ImplicitSender with BeforeAndAfter {
// import MembershipChangeListenerMultiJvmSpec._
class MembershipChangeListenerMultiJvmNode1 extends MembershipChangeListenerSpec //
class MembershipChangeListenerMultiJvmNode2 extends MembershipChangeListenerSpec // override def initialParticipants = 3
class MembershipChangeListenerMultiJvmNode3 extends MembershipChangeListenerSpec //
// after {
abstract class MembershipChangeListenerSpec extends MultiNodeSpec(MembershipChangeListenerMultiJvmSpec) with ImplicitSender with BeforeAndAfter { // testConductor.enter("after")
import MembershipChangeListenerMultiJvmSpec._ // }
//
override def initialParticipants = 3 // "A set of connected cluster systems" must {
//
def cluster: Cluster = Cluster(system) // val firstAddress = node(first).address
// val secondAddress = node(second).address
after { //
testConductor.enter("after") // "(when two systems) after cluster convergence updates the membership table then all MembershipChangeListeners should be triggered" taggedAs LongRunningTest in {
} //
// runOn(first, second) {
"A set of connected cluster systems" must { // cluster.join(firstAddress)
// val latch = TestLatch()
val firstAddress = node(first).address // cluster.registerListener(new MembershipChangeListener {
val secondAddress = node(second).address // def notify(members: SortedSet[Member]) {
// if (members.size == 2 && members.forall(_.status == MemberStatus.Up))
"(when two systems) after cluster convergence updates the membership table then all MembershipChangeListeners should be triggered" in { // latch.countDown()
// }
runOn(first, second) { // })
cluster.join(firstAddress) // latch.await
val latch = TestLatch() // cluster.convergence.isDefined must be(true)
cluster.registerListener(new MembershipChangeListener { // }
def notify(members: SortedSet[Member]) { //
if (members.size == 2 && members.forall(_.status == MemberStatus.Up)) // }
latch.countDown() //
} // "(when three systems) after cluster convergence updates the membership table then all MembershipChangeListeners should be triggered" taggedAs LongRunningTest in {
}) //
latch.await // runOn(third) {
cluster.convergence.isDefined must be(true) // cluster.join(firstAddress)
} // }
//
} // val latch = TestLatch()
// cluster.registerListener(new MembershipChangeListener {
"(when three systems) after cluster convergence updates the membership table then all MembershipChangeListeners should be triggered" in { // def notify(members: SortedSet[Member]) {
// if (members.size == 3 && members.forall(_.status == MemberStatus.Up))
runOn(third) { // latch.countDown()
cluster.join(firstAddress) // }
} // })
// latch.await
val latch = TestLatch() // cluster.convergence.isDefined must be(true)
cluster.registerListener(new MembershipChangeListener { //
def notify(members: SortedSet[Member]) { // }
if (members.size == 3 && members.forall(_.status == MemberStatus.Up)) // }
latch.countDown() //
} //}
})
latch.await
cluster.convergence.isDefined must be(true)
}
}
}

View file

@ -0,0 +1,63 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.cluster
import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory
import akka.actor.Address
import akka.remote.testconductor.RoleName
import akka.remote.testkit.MultiNodeSpec
import akka.util.duration._
object MultiNodeClusterSpec {
def clusterConfig: Config = ConfigFactory.parseString("""
akka.cluster {
gossip-frequency = 200 ms
leader-actions-frequency = 200 ms
periodic-tasks-initial-delay = 300 ms
}
akka.test {
single-expect-default = 5 s
}
""")
}
trait MultiNodeClusterSpec { self: MultiNodeSpec
def cluster: Cluster = Cluster(system)
/**
* Assert that the member addresses match the expected addresses in the
* sort order used by the cluster.
*/
def assertMembers(gotMembers: Iterable[Member], expectedAddresses: Address*): Unit = {
import Member.addressOrdering
val members = gotMembers.toIndexedSeq
members.size must be(expectedAddresses.length)
expectedAddresses.sorted.zipWithIndex.foreach { case (a, i) members(i).address must be(a) }
}
/**
* Assert that the cluster has elected the correct leader
* out of all nodes in the cluster. First
* member in the cluster ring is expected leader.
*/
def assertLeader(nodesInCluster: RoleName*): Unit = if (nodesInCluster.contains(mySelf)) {
nodesInCluster.length must not be (0)
import Member.addressOrdering
val expectedLeader = nodesInCluster.map(role (role, node(role).address)).sortBy(_._2).head._1
cluster.isLeader must be(ifNode(expectedLeader)(true)(false))
}
/**
* Wait until the expected number of members has status Up
* and convergence has been reached.
*/
def awaitUpConvergence(numberOfMembers: Int): Unit = {
awaitCond(cluster.latestGossip.members.size == numberOfMembers)
awaitCond(cluster.latestGossip.members.forall(_.status == MemberStatus.Up))
awaitCond(cluster.convergence.isDefined, 10 seconds)
}
}

View file

@ -1,88 +1,70 @@
/** ///**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com> // * Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/ // */
package akka.cluster //package akka.cluster
//
import com.typesafe.config.ConfigFactory //import com.typesafe.config.ConfigFactory
import org.scalatest.BeforeAndAfter //import org.scalatest.BeforeAndAfter
import akka.remote.testkit.MultiNodeConfig //import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec //import akka.remote.testkit.MultiNodeSpec
import akka.testkit._ //import akka.testkit._
//
object NodeMembershipMultiJvmSpec extends MultiNodeConfig { //object NodeMembershipMultiJvmSpec extends MultiNodeConfig {
val first = role("first") // val first = role("first")
val second = role("second") // val second = role("second")
val third = role("third") // val third = role("third")
//
commonConfig(debugConfig(on = false).withFallback(ConfigFactory.parseString(""" // commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig))
akka.cluster { //
gossip-frequency = 200 ms //}
leader-actions-frequency = 200 ms //
periodic-tasks-initial-delay = 300 ms //class NodeMembershipMultiJvmNode1 extends NodeMembershipSpec
} //class NodeMembershipMultiJvmNode2 extends NodeMembershipSpec
"""))) //class NodeMembershipMultiJvmNode3 extends NodeMembershipSpec
//
} //abstract class NodeMembershipSpec extends MultiNodeSpec(NodeMembershipMultiJvmSpec) with MultiNodeClusterSpec with ImplicitSender with BeforeAndAfter {
// import NodeMembershipMultiJvmSpec._
class NodeMembershipMultiJvmNode1 extends NodeMembershipSpec //
class NodeMembershipMultiJvmNode2 extends NodeMembershipSpec // override def initialParticipants = 3
class NodeMembershipMultiJvmNode3 extends NodeMembershipSpec //
// after {
abstract class NodeMembershipSpec extends MultiNodeSpec(NodeMembershipMultiJvmSpec) with ImplicitSender with BeforeAndAfter { // testConductor.enter("after")
import NodeMembershipMultiJvmSpec._ // }
//
override def initialParticipants = 3 // val firstAddress = node(first).address
// val secondAddress = node(second).address
def cluster: Cluster = Cluster(system) // val thirdAddress = node(third).address
//
after { // "A set of connected cluster systems" must {
testConductor.enter("after") //
} // "(when two systems) start gossiping to each other so that both systems gets the same gossip info" taggedAs LongRunningTest in {
//
val firstAddress = node(first).address // runOn(first, second) {
val secondAddress = node(second).address // cluster.join(firstAddress)
val thirdAddress = node(third).address // awaitCond(cluster.latestGossip.members.size == 2)
// assertMembers(cluster.latestGossip.members, firstAddress, secondAddress)
"A set of connected cluster systems" must { // awaitCond {
// cluster.latestGossip.members.forall(_.status == MemberStatus.Up)
"(when two systems) start gossiping to each other so that both systems gets the same gossip info" in { // }
// awaitCond(cluster.convergence.isDefined)
runOn(first, second) { // }
cluster.join(firstAddress) //
awaitCond(cluster.latestGossip.members.size == 2) // }
val members = cluster.latestGossip.members.toIndexedSeq //
members.size must be(2) // "(when three systems) start gossiping to each other so that both systems gets the same gossip info" taggedAs LongRunningTest in {
val sortedAddresses = IndexedSeq(firstAddress, secondAddress).sortBy(_.toString) //
members(0).address must be(sortedAddresses(0)) // runOn(third) {
members(1).address must be(sortedAddresses(1)) // cluster.join(firstAddress)
awaitCond { // }
cluster.latestGossip.members.forall(_.status == MemberStatus.Up) //
} // awaitCond(cluster.latestGossip.members.size == 3)
awaitCond(cluster.convergence.isDefined) // assertMembers(cluster.latestGossip.members, firstAddress, secondAddress, thirdAddress)
} // awaitCond {
// cluster.latestGossip.members.forall(_.status == MemberStatus.Up)
} // }
// awaitCond(cluster.convergence.isDefined)
"(when three systems) start gossiping to each other so that both systems gets the same gossip info" in { //
// }
runOn(third) { // }
cluster.join(firstAddress) //
} //}
// runOn all
awaitCond(cluster.latestGossip.members.size == 3)
val members = cluster.latestGossip.members.toIndexedSeq
members.size must be(3)
val sortedAddresses = IndexedSeq(firstAddress, secondAddress, thirdAddress).sortBy(_.toString)
members(0).address must be(sortedAddresses(0))
members(1).address must be(sortedAddresses(1))
members(2).address must be(sortedAddresses(2))
awaitCond {
cluster.latestGossip.members.forall(_.status == MemberStatus.Up)
}
awaitCond(cluster.convergence.isDefined)
}
}
}

View file

@ -1,82 +1,74 @@
/** ///**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com> // * Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/ // */
package akka.cluster //package akka.cluster
//
import com.typesafe.config.ConfigFactory //import com.typesafe.config.ConfigFactory
import org.scalatest.BeforeAndAfter //import org.scalatest.BeforeAndAfter
import akka.remote.testkit.MultiNodeConfig //import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec //import akka.remote.testkit.MultiNodeSpec
import akka.testkit._ //import akka.testkit._
//
object NodeStartupMultiJvmSpec extends MultiNodeConfig { //object NodeStartupMultiJvmSpec extends MultiNodeConfig {
val first = role("first") // val first = role("first")
val second = role("second") // val second = role("second")
//
commonConfig(debugConfig(on = false).withFallback(ConfigFactory.parseString(""" // commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig))
akka.cluster { //
gossip-frequency = 200 ms //}
leader-actions-frequency = 200 ms //
periodic-tasks-initial-delay = 300 ms //class NodeStartupMultiJvmNode1 extends NodeStartupSpec
} //class NodeStartupMultiJvmNode2 extends NodeStartupSpec
"""))) //
//abstract class NodeStartupSpec extends MultiNodeSpec(NodeStartupMultiJvmSpec) with MultiNodeClusterSpec with ImplicitSender with BeforeAndAfter {
} // import NodeStartupMultiJvmSpec._
//
class NodeStartupMultiJvmNode1 extends NodeStartupSpec // override def initialParticipants = 2
class NodeStartupMultiJvmNode2 extends NodeStartupSpec //
// after {
abstract class NodeStartupSpec extends MultiNodeSpec(NodeStartupMultiJvmSpec) with ImplicitSender with BeforeAndAfter { // testConductor.enter("after")
import NodeStartupMultiJvmSpec._ // }
//
override def initialParticipants = 2 // val firstAddress = node(first).address
// val secondAddress = node(second).address
def cluster: Cluster = Cluster(system) //
// "A first cluster node with a 'node-to-join' config set to empty string (singleton cluster)" must {
after { //
testConductor.enter("after") // "be a singleton cluster when started up" taggedAs LongRunningTest in {
} // runOn(first) {
// awaitCond(cluster.isSingletonCluster)
val firstAddress = node(first).address // // FIXME #2117 singletonCluster should reach convergence
val secondAddress = node(second).address // //awaitCond(cluster.convergence.isDefined)
// }
"A first cluster node with a 'node-to-join' config set to empty string (singleton cluster)" must { // }
//
"be a singleton cluster when started up" in { // "be in 'Joining' phase when started up" taggedAs LongRunningTest in {
runOn(first) { // runOn(first) {
awaitCond(cluster.isSingletonCluster) // val members = cluster.latestGossip.members
// FIXME #2117 singletonCluster should reach convergence // members.size must be(1)
//awaitCond(cluster.convergence.isDefined) //
} // val joiningMember = members find (_.address == firstAddress)
} // joiningMember must not be (None)
// joiningMember.get.status must be(MemberStatus.Joining)
"be in 'Joining' phase when started up" in { // }
runOn(first) { // }
val members = cluster.latestGossip.members // }
members.size must be(1) //
// "A second cluster node" must {
val joiningMember = members find (_.address == firstAddress) // "join the other node cluster when sending a Join command" taggedAs LongRunningTest in {
joiningMember must not be (None) //
joiningMember.get.status must be(MemberStatus.Joining) // runOn(second) {
} // cluster.join(firstAddress)
} // }
} //
// awaitCond {
"A second cluster node" must { // cluster.latestGossip.members.exists { member
"join the other node cluster when sending a Join command" in { // member.address == secondAddress && member.status == MemberStatus.Up
// }
runOn(second) { // }
cluster.join(firstAddress) // cluster.latestGossip.members.size must be(2)
} // awaitCond(cluster.convergence.isDefined)
// }
awaitCond { // }
cluster.latestGossip.members.exists { member //
member.address == secondAddress && member.status == MemberStatus.Up //}
}
}
cluster.latestGossip.members.size must be(2)
awaitCond(cluster.convergence.isDefined)
}
}
}

View file

@ -0,0 +1,31 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.cluster
import org.scalatest.WordSpec
import org.scalatest.matchers.MustMatchers
import akka.actor.Address
import scala.util.Random
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class MemberSpec extends WordSpec with MustMatchers {
"Member" must {
"be sorted by address correctly" in {
import Member.ordering
// sorting should be done on host and port, only
val m1 = Member(Address("akka", "sys1", "host1", 9000), MemberStatus.Up)
val m2 = Member(Address("akka", "sys1", "host1", 10000), MemberStatus.Up)
val m3 = Member(Address("cluster", "sys2", "host2", 8000), MemberStatus.Up)
val m4 = Member(Address("cluster", "sys2", "host2", 9000), MemberStatus.Up)
val m5 = Member(Address("cluster", "sys1", "host2", 10000), MemberStatus.Up)
val expected = IndexedSeq(m1, m2, m3, m4, m5)
val shuffled = Random.shuffle(expected)
shuffled.sorted must be(expected)
}
}
}

View file

@ -181,7 +181,7 @@ which might contain actor references.
External Akka Serializers External Akka Serializers
========================= =========================
`Akka-protostuff by Roman Levenstein<https://github.com/romix/akka-protostuff-serialization>`_ `Akka-protostuff by Roman Levenstein <https://github.com/romix/akka-protostuff-serialization>`_
`Akka-quickser by Roman Levenstein<https://github.com/romix/akka-quickser-serialization>`_ `Akka-quickser by Roman Levenstein <https://github.com/romix/akka-quickser-serialization>`_

View file

@ -163,7 +163,7 @@ Typed Actor Hierarchies
Since you can obtain a contextual Typed Actor Extension by passing in an ``ActorContext`` Since you can obtain a contextual Typed Actor Extension by passing in an ``ActorContext``
you can create child Typed Actors by invoking ``typedActorOf(..)`` on that. you can create child Typed Actors by invoking ``typedActorOf(..)`` on that.
.. includecode:: code/akka/docs/actor/TypedActorDocTestBase.java .. includecode:: code/docs/actor/TypedActorDocTestBase.java
:include: typed-actor-hierarchy :include: typed-actor-hierarchy
You can also create a child Typed Actor in regular Akka Actors by giving the ``UntypedActorContext`` You can also create a child Typed Actor in regular Akka Actors by giving the ``UntypedActorContext``

View file

@ -1,7 +1,7 @@
package docs.testkit package docs.testkit
import org.specs2._ import org.specs2.Specification
import org.specs2.specification.Scope import org.specs2.specification.{ Step, Scope }
import akka.actor.{ Props, ActorSystem, Actor } import akka.actor.{ Props, ActorSystem, Actor }
import akka.testkit.{ TestKit, ImplicitSender } import akka.testkit.{ TestKit, ImplicitSender }
@ -13,10 +13,12 @@ class Specs2DemoAcceptance extends Specification {
p ^ p ^
"A TestKit should" ^ "A TestKit should" ^
"work properly with Specs2 acceptance tests" ! e1 ^ "work properly with Specs2 acceptance tests" ! e1 ^
"correctly convert durations" ! e2 "correctly convert durations" ! e2 ^
Step(system.shutdown()) ^ end // do not forget to shutdown!
val system = ActorSystem() val system = ActorSystem()
// an alternative to mixing in NoTimeConversions
implicit def d2d(d: org.specs2.time.Duration): akka.util.FiniteDuration = implicit def d2d(d: org.specs2.time.Duration): akka.util.FiniteDuration =
akka.util.Duration(d.inMilliseconds, "millis") akka.util.Duration(d.inMilliseconds, "millis")

View file

@ -2,20 +2,19 @@ package docs.testkit
import org.specs2.mutable.Specification import org.specs2.mutable.Specification
import org.specs2.specification.Scope import org.specs2.specification.Scope
import org.specs2.time.NoTimeConversions
import akka.actor.{ Props, ActorSystem, Actor } import akka.actor.{ Props, ActorSystem, Actor }
import akka.testkit.{ TestKit, ImplicitSender } import akka.testkit.{ TestKit, ImplicitSender }
import akka.util.duration._
class Specs2DemoUnitSpec extends Specification { class Specs2DemoUnitSpec extends Specification with NoTimeConversions {
val system = ActorSystem() val system = ActorSystem()
implicit def d2d(d: org.specs2.time.Duration): akka.util.FiniteDuration =
akka.util.Duration(d.inMilliseconds, "millis")
/* /*
* this is needed if different test cases would clash when run concurrently, * this is needed if different test cases would clash when run concurrently,
* e.g. when creating specifically named top-level actors * e.g. when creating specifically named top-level actors; leave out otherwise
*/ */
sequential sequential
@ -31,4 +30,6 @@ class Specs2DemoUnitSpec extends Specification {
} }
} }
} }
step(system.shutdown) // do not forget to shutdown!
} }

View file

@ -188,7 +188,7 @@ which might contain actor references.
External Akka Serializers External Akka Serializers
========================= =========================
`Akka-protostuff by Roman Levenstein<https://github.com/romix/akka-protostuff-serialization>`_ `Akka-protostuff by Roman Levenstein <https://github.com/romix/akka-protostuff-serialization>`_
`Akka-quickser by Roman Levenstein<https://github.com/romix/akka-quickser-serialization>`_ `Akka-quickser by Roman Levenstein <https://github.com/romix/akka-quickser-serialization>`_

View file

@ -684,11 +684,15 @@ Some `Specs2 <http://specs2.org>`_ users have contributed examples of how to wor
with :class:`org.specs2.specification.Scope`. with :class:`org.specs2.specification.Scope`.
* The Specification traits provide a :class:`Duration` DSL which uses partly * The Specification traits provide a :class:`Duration` DSL which uses partly
the same method names as :class:`akka.util.Duration`, resulting in ambiguous the same method names as :class:`akka.util.Duration`, resulting in ambiguous
implicits if ``akka.util.duration._`` is imported. The work-around is to use implicits if ``akka.util.duration._`` is imported. There are two work-arounds:
the Specification variants and supply an implicit conversion to the Akka
Duration. This conversion is not supplied with the Akka distribution because * either use the Specification variant of Duration and supply an implicit
that would mean that our JAR files would dependon Specs2, which is not conversion to the Akka Duration. This conversion is not supplied with the
justified by this little feature. Akka distribution because that would mean that our JAR files would dependon
Specs2, which is not justified by this little feature.
* or mix :class:`org.specs2.time.NoTimeConversions` into the Specification.
* Specifications are by default executed concurrently, which requires some care * Specifications are by default executed concurrently, which requires some care
when writing the tests or alternatively the ``sequential`` keyword. when writing the tests or alternatively the ``sequential`` keyword.

View file

@ -163,7 +163,7 @@ Typed Actor Hierarchies
Since you can obtain a contextual Typed Actor Extension by passing in an ``ActorContext`` Since you can obtain a contextual Typed Actor Extension by passing in an ``ActorContext``
you can create child Typed Actors by invoking ``typedActorOf(..)`` on that: you can create child Typed Actors by invoking ``typedActorOf(..)`` on that:
.. includecode:: code/akka/docs/actor/TypedActorDocSpec.scala .. includecode:: code/docs/actor/TypedActorDocSpec.scala
:include: typed-actor-hierarchy :include: typed-actor-hierarchy
You can also create a child Typed Actor in regular Akka Actors by giving the ``ActorContext`` You can also create a child Typed Actor in regular Akka Actors by giving the ``ActorContext``

View file

@ -468,11 +468,16 @@ private[akka] object BarrierCoordinator {
override def toString = productPrefix + productIterator.mkString("(", ", ", ")") override def toString = productPrefix + productIterator.mkString("(", ", ", ")")
} }
case class BarrierTimeout(data: Data) extends RuntimeException(data.barrier) with NoStackTrace with Printer case class BarrierTimeout(data: Data)
case class DuplicateNode(data: Data, node: Controller.NodeInfo) extends RuntimeException with NoStackTrace with Printer extends RuntimeException("timeout while waiting for barrier '" + data.barrier + "'") with NoStackTrace with Printer
case class WrongBarrier(barrier: String, client: ActorRef, data: Data) extends RuntimeException(barrier) with NoStackTrace with Printer case class DuplicateNode(data: Data, node: Controller.NodeInfo)
extends RuntimeException(node.toString) with NoStackTrace with Printer
case class WrongBarrier(barrier: String, client: ActorRef, data: Data)
extends RuntimeException(data.clients.find(_.fsm == client).map(_.name.toString).getOrElse(client.toString) +
" tried to enter '" + barrier + "' while we were waiting for '" + data.barrier + "'") with NoStackTrace with Printer
case class BarrierEmpty(data: Data, msg: String) extends RuntimeException(msg) with NoStackTrace with Printer case class BarrierEmpty(data: Data, msg: String) extends RuntimeException(msg) with NoStackTrace with Printer
case class ClientLost(data: Data, client: RoleName) extends RuntimeException with NoStackTrace with Printer case class ClientLost(data: Data, client: RoleName)
extends RuntimeException("unannounced disconnect of " + client) with NoStackTrace with Printer
} }
/** /**
@ -506,7 +511,7 @@ private[akka] class BarrierCoordinator extends Actor with LoggingFSM[BarrierCoor
if (clients.find(_.name == n.name).isDefined) throw new DuplicateNode(d, n) if (clients.find(_.name == n.name).isDefined) throw new DuplicateNode(d, n)
stay using d.copy(clients = clients + n) stay using d.copy(clients = clients + n)
case Event(ClientDisconnected(name), d @ Data(clients, _, arrived)) case Event(ClientDisconnected(name), d @ Data(clients, _, arrived))
if (clients.isEmpty) throw BarrierEmpty(d, "no client to disconnect") if (clients.isEmpty) throw BarrierEmpty(d, "cannot disconnect " + name + ": no client to disconnect")
(clients find (_.name == name)) match { (clients find (_.name == name)) match {
case None stay case None stay
case Some(c) throw ClientLost(d.copy(clients = clients - c, arrived = arrived filterNot (_ == c.fsm)), name) case Some(c) throw ClientLost(d.copy(clients = clients - c, arrived = arrived filterNot (_ == c.fsm)), name)
@ -524,7 +529,7 @@ private[akka] class BarrierCoordinator extends Actor with LoggingFSM[BarrierCoor
else else
goto(Waiting) using d.copy(barrier = name, arrived = sender :: Nil) goto(Waiting) using d.copy(barrier = name, arrived = sender :: Nil)
case Event(RemoveClient(name), d @ Data(clients, _, _)) case Event(RemoveClient(name), d @ Data(clients, _, _))
if (clients.isEmpty) throw BarrierEmpty(d, "no client to remove") if (clients.isEmpty) throw BarrierEmpty(d, "cannot remove " + name + ": no client to remove")
stay using d.copy(clients = clients filterNot (_.name == name)) stay using d.copy(clients = clients filterNot (_.name == name))
} }

View file

@ -54,7 +54,7 @@ class BarrierSpec extends AkkaSpec(BarrierSpec.config) with ImplicitSender with
EventFilter[BarrierEmpty](occurrences = 1) intercept { EventFilter[BarrierEmpty](occurrences = 1) intercept {
b ! RemoveClient(A) b ! RemoveClient(A)
} }
expectMsg(Failed(b, BarrierEmpty(Data(Set(), "", Nil), "no client to remove"))) expectMsg(Failed(b, BarrierEmpty(Data(Set(), "", Nil), "cannot remove RoleName(a): no client to remove")))
} }
"register clients and disconnect them" in { "register clients and disconnect them" in {
@ -68,7 +68,7 @@ class BarrierSpec extends AkkaSpec(BarrierSpec.config) with ImplicitSender with
EventFilter[BarrierEmpty](occurrences = 1) intercept { EventFilter[BarrierEmpty](occurrences = 1) intercept {
b ! ClientDisconnected(A) b ! ClientDisconnected(A)
} }
expectMsg(Failed(b, BarrierEmpty(Data(Set(), "", Nil), "no client to disconnect"))) expectMsg(Failed(b, BarrierEmpty(Data(Set(), "", Nil), "cannot disconnect RoleName(a): no client to disconnect")))
} }
"fail entering barrier when nobody registered" in { "fail entering barrier when nobody registered" in {
@ -187,7 +187,7 @@ class BarrierSpec extends AkkaSpec(BarrierSpec.config) with ImplicitSender with
EventFilter[BarrierEmpty](occurrences = 1) intercept { EventFilter[BarrierEmpty](occurrences = 1) intercept {
barrier ! RemoveClient(A) barrier ! RemoveClient(A)
} }
expectMsg(Failed(barrier, BarrierEmpty(Data(Set(), "", Nil), "no client to remove"))) expectMsg(Failed(barrier, BarrierEmpty(Data(Set(), "", Nil), "cannot remove RoleName(a): no client to remove")))
barrier ! NodeInfo(A, AddressFromURIString("akka://sys"), a.ref) barrier ! NodeInfo(A, AddressFromURIString("akka://sys"), a.ref)
a.send(barrier, EnterBarrier("right")) a.send(barrier, EnterBarrier("right"))
a.expectMsg(ToClient(BarrierResult("right", false))) a.expectMsg(ToClient(BarrierResult("right", false)))

View file

@ -131,17 +131,15 @@ class Hakker(name: String, left: ActorRef, right: ActorRef) extends Actor {
object DiningHakkers { object DiningHakkers {
val system = ActorSystem() val system = ActorSystem()
def main(args: Array[String]): Unit = { def main(args: Array[String]): Unit = run
run
}
def run { def run {
//Create 5 chopsticks //Create 5 chopsticks
val chopsticks = for (i 1 to 5) yield system.actorOf(Props[Chopstick], "Chopstick " + i) val chopsticks = for (i 1 to 5) yield system.actorOf(Props[Chopstick], "Chopstick" + i)
//Create 5 awesome hakkers and assign them their left and right chopstick //Create 5 awesome hakkers and assign them their left and right chopstick
val hakkers = for { val hakkers = for {
(name, i) List("Ghosh", "Bonér", "Klang", "Krasser", "Manie").zipWithIndex (name, i) List("Ghosh", "Boner", "Klang", "Krasser", "Manie").zipWithIndex
} yield system.actorOf(Props(new Hakker(name, chopsticks(i), chopsticks((i + 1) % 5)))) } yield system.actorOf(Props(new Hakker(name, chopsticks(i), chopsticks((i + 1) % 5))))
//Signal all hakkers that they should start thinking, and watch the show //Signal all hakkers that they should start thinking, and watch the show

View file

@ -169,16 +169,14 @@ object DiningHakkersOnFsm {
val system = ActorSystem() val system = ActorSystem()
def main(args: Array[String]): Unit = { def main(args: Array[String]): Unit = run
run
}
def run = { def run = {
// Create 5 chopsticks // Create 5 chopsticks
val chopsticks = for (i 1 to 5) yield system.actorOf(Props[Chopstick], "Chopstick " + i) val chopsticks = for (i 1 to 5) yield system.actorOf(Props[Chopstick], "Chopstick" + i)
// Create 5 awesome fsm hakkers and assign them their left and right chopstick // Create 5 awesome fsm hakkers and assign them their left and right chopstick
val hakkers = for { val hakkers = for {
(name, i) List("Ghosh", "Bonér", "Klang", "Krasser", "Manie").zipWithIndex (name, i) List("Ghosh", "Boner", "Klang", "Krasser", "Manie").zipWithIndex
} yield system.actorOf(Props(new FSMHakker(name, chopsticks(i), chopsticks((i + 1) % 5)))) } yield system.actorOf(Props(new FSMHakker(name, chopsticks(i), chopsticks((i + 1) % 5))))
hakkers.foreach(_ ! Think) hakkers.foreach(_ ! Think)