diff --git a/akka-actor/src/main/scala/akka/event/Logging.scala b/akka-actor/src/main/scala/akka/event/Logging.scala index b91509ac9f..0777d9aef1 100644 --- a/akka-actor/src/main/scala/akka/event/Logging.scala +++ b/akka-actor/src/main/scala/akka/event/Logging.scala @@ -875,3 +875,16 @@ class BusLogging(val bus: LoggingBus, val logSource: String, val logClass: Class protected def notifyInfo(message: String): Unit = bus.publish(Info(logSource, logClass, message)) protected def notifyDebug(message: String): Unit = bus.publish(Debug(logSource, logClass, message)) } + +private[akka] object NoLogging extends LoggingAdapter { + def isErrorEnabled = false + def isWarningEnabled = false + def isInfoEnabled = false + def isDebugEnabled = false + + protected def notifyError(message: String): Unit = () + protected def notifyError(cause: Throwable, message: String): Unit = () + protected def notifyWarning(message: String): Unit = () + protected def notifyInfo(message: String): Unit = () + protected def notifyDebug(message: String): Unit = () +} diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index a86bc0148c..c495e470ce 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -118,6 +118,15 @@ object Member { case _ ⇒ None } + def pickHighestPriority(a: Set[Member], b: Set[Member]): Set[Member] = { + // group all members by Address => Seq[Member] + val groupedByAddress = (a.toSeq ++ b.toSeq).groupBy(_.address) + // pick highest MemberStatus + (Set.empty[Member] /: groupedByAddress) { + case (acc, (_, members)) ⇒ acc + members.reduceLeft(highestPriorityOf) + } + } + /** * Picks the Member with the highest "priority" MemberStatus. */ @@ -130,8 +139,8 @@ object Member { case (_, Exiting) ⇒ m2 case (Leaving, _) ⇒ m1 case (_, Leaving) ⇒ m2 - case (Up, Joining) ⇒ m1 - case (Joining, Up) ⇒ m2 + case (Up, Joining) ⇒ m2 + case (Joining, Up) ⇒ m1 case (Joining, Joining) ⇒ m1 case (Up, Up) ⇒ m1 } @@ -268,21 +277,12 @@ case class Gossip( // 2. merge meta-data val mergedMeta = this.meta ++ that.meta - def pickHighestPriority(a: Seq[Member], b: Seq[Member]): Set[Member] = { - // group all members by Address => Seq[Member] - val groupedByAddress = (a ++ b).groupBy(_.address) - // pick highest MemberStatus - (Set.empty[Member] /: groupedByAddress) { - case (acc, (_, members)) ⇒ acc + members.reduceLeft(Member.highestPriorityOf) - } - } - // 3. merge unreachable by selecting the single Member with highest MemberStatus out of the Member groups - val mergedUnreachable = pickHighestPriority(this.overview.unreachable.toSeq, that.overview.unreachable.toSeq) + val mergedUnreachable = Member.pickHighestPriority(this.overview.unreachable, that.overview.unreachable) // 4. merge members by selecting the single Member with highest MemberStatus out of the Member groups, // and exclude unreachable - val mergedMembers = Gossip.emptyMembers ++ pickHighestPriority(this.members.toSeq, that.members.toSeq). + val mergedMembers = Gossip.emptyMembers ++ Member.pickHighestPriority(this.members, that.members). filterNot(mergedUnreachable.contains) // 5. fresh seen table @@ -522,24 +522,28 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) } // start periodic gossip to random nodes in cluster - private val gossipTask = FixedRateTask(clusterScheduler, PeriodicTasksInitialDelay, GossipInterval) { - gossip() - } + private val gossipTask = + FixedRateTask(clusterScheduler, PeriodicTasksInitialDelay.max(GossipInterval), GossipInterval) { + gossip() + } // start periodic heartbeat to all nodes in cluster - private val heartbeatTask = FixedRateTask(clusterScheduler, PeriodicTasksInitialDelay, HeartbeatInterval) { - heartbeat() - } + private val heartbeatTask = + FixedRateTask(clusterScheduler, PeriodicTasksInitialDelay.max(HeartbeatInterval), HeartbeatInterval) { + heartbeat() + } // start periodic cluster failure detector reaping (moving nodes condemned by the failure detector to unreachable list) - private val failureDetectorReaperTask = FixedRateTask(clusterScheduler, PeriodicTasksInitialDelay, UnreachableNodesReaperInterval) { - reapUnreachableMembers() - } + private val failureDetectorReaperTask = + FixedRateTask(clusterScheduler, PeriodicTasksInitialDelay.max(UnreachableNodesReaperInterval), UnreachableNodesReaperInterval) { + reapUnreachableMembers() + } // start periodic leader action management (only applies for the current leader) - private val leaderActionsTask = FixedRateTask(clusterScheduler, PeriodicTasksInitialDelay, LeaderActionsInterval) { - leaderActions() - } + private val leaderActionsTask = + FixedRateTask(clusterScheduler, PeriodicTasksInitialDelay.max(LeaderActionsInterval), LeaderActionsInterval) { + leaderActions() + } createMBean() diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/ConvergenceSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/ConvergenceSpec.scala index bdc0a1ae8b..52206f1b8c 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/ConvergenceSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/ConvergenceSpec.scala @@ -39,7 +39,7 @@ abstract class ConvergenceSpec "A cluster of 3 members" must { - "reach initial convergence" taggedAs LongRunningTest ignore { + "reach initial convergence" taggedAs LongRunningTest in { awaitClusterUp(first, second, third) runOn(fourth) { @@ -49,7 +49,7 @@ abstract class ConvergenceSpec testConductor.enter("after-1") } - "not reach convergence while any nodes are unreachable" taggedAs LongRunningTest ignore { + "not reach convergence while any nodes are unreachable" taggedAs LongRunningTest in { val thirdAddress = node(third).address testConductor.enter("before-shutdown") @@ -81,7 +81,7 @@ abstract class ConvergenceSpec testConductor.enter("after-2") } - "not move a new joining node to Up while there is no convergence" taggedAs LongRunningTest ignore { + "not move a new joining node to Up while there is no convergence" taggedAs LongRunningTest in { runOn(fourth) { // try to join cluster.join(node(first).address) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinTwoClustersSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinTwoClustersSpec.scala index f4ea161b2a..4b64bb6e58 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinTwoClustersSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinTwoClustersSpec.scala @@ -17,7 +17,7 @@ object JoinTwoClustersMultiJvmSpec extends MultiNodeConfig { val c1 = role("c1") val c2 = role("c2") - commonConfig(debugConfig(on = true).withFallback(MultiNodeClusterSpec.clusterConfig)) + commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig)) } class JoinTwoClustersMultiJvmNode1 extends JoinTwoClustersSpec with FailureDetectorPuppetStrategy diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerExitingSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerExitingSpec.scala index d9b2c7b876..88cee08191 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerExitingSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerExitingSpec.scala @@ -21,7 +21,7 @@ object MembershipChangeListenerExitingMultiJvmSpec extends MultiNodeConfig { .withFallback(ConfigFactory.parseString(""" akka.cluster { leader-actions-interval = 5 s # increase the leader action task interval - unreachable-nodes-reaper-interval = 30 s # turn "off" reaping to unreachable node set + unreachable-nodes-reaper-interval = 300 s # turn "off" reaping to unreachable node set } """) .withFallback(MultiNodeClusterSpec.clusterConfig))) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerLeavingSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerLeavingSpec.scala index eda29ea0f0..0640e58175 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerLeavingSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerLeavingSpec.scala @@ -19,7 +19,7 @@ object MembershipChangeListenerLeavingMultiJvmSpec extends MultiNodeConfig { debugConfig(on = false) .withFallback(ConfigFactory.parseString(""" akka.cluster.leader-actions-interval = 5 s - akka.cluster.unreachable-nodes-reaper-interval = 30 s + akka.cluster.unreachable-nodes-reaper-interval = 300 s # turn "off" """)) .withFallback(MultiNodeClusterSpec.clusterConfig)) } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala index 7b08afc4a9..7d1457234f 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala @@ -11,6 +11,8 @@ import akka.remote.testkit.MultiNodeSpec import akka.testkit._ import akka.util.duration._ import akka.util.Duration +import org.scalatest.Suite +import org.scalatest.TestFailedException import java.util.concurrent.ConcurrentHashMap import akka.actor.ActorPath import akka.actor.RootActorPath @@ -32,7 +34,7 @@ object MultiNodeClusterSpec { """) } -trait MultiNodeClusterSpec extends FailureDetectorStrategy { self: MultiNodeSpec ⇒ +trait MultiNodeClusterSpec extends FailureDetectorStrategy with Suite { self: MultiNodeSpec ⇒ override def initialParticipants = roles.size @@ -59,6 +61,24 @@ trait MultiNodeClusterSpec extends FailureDetectorStrategy { self: MultiNodeSpec */ implicit def role2Address(role: RoleName): Address = address(role) + // Cluster tests are written so that if previous step (test method) failed + // it will most likely not be possible to run next step. This ensures + // fail fast of steps after the first failure. + private var failed = false + override protected def withFixture(test: NoArgTest): Unit = try { + if (failed) { + val e = new TestFailedException("Previous step failed", 0) + // short stack trace + e.setStackTrace(e.getStackTrace.take(1)) + throw e + } + super.withFixture(test) + } catch { + case t ⇒ + failed = true + throw t + } + /** * The cluster node instance. Needs to be lazily created. */ @@ -176,8 +196,8 @@ trait MultiNodeClusterSpec extends FailureDetectorStrategy { self: MultiNodeSpec def compare(x: RoleName, y: RoleName) = addressOrdering.compare(address(x), address(y)) } - def roleName(address: Address): Option[RoleName] = { - testConductor.getNodes.await.find(node(_).address == address) + def roleName(addr: Address): Option[RoleName] = { + roles.find(address(_) == addr) } } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingSpec.scala index 6378a74040..fc62c17c1d 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingSpec.scala @@ -20,7 +20,7 @@ object NodeLeavingAndExitingMultiJvmSpec extends MultiNodeConfig { .withFallback(ConfigFactory.parseString(""" akka.cluster { leader-actions-interval = 5 s # increase the leader action task frequency to make sure we get a chance to test the LEAVING state - unreachable-nodes-reaper-interval = 30 s + unreachable-nodes-reaper-interval = 300 s # turn "off" } """) .withFallback(MultiNodeClusterSpec.clusterConfig))) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/SunnyWeatherSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/SunnyWeatherSpec.scala index b8486841c6..6f3ddfc866 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/SunnyWeatherSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/SunnyWeatherSpec.scala @@ -21,18 +21,17 @@ object SunnyWeatherMultiJvmSpec extends MultiNodeConfig { commonConfig(ConfigFactory.parseString(""" akka.cluster { - gossip-interval = 400 ms nr-of-deputy-nodes = 0 } akka.loglevel = INFO """)) } -class SunnyWeatherMultiJvmNode1 extends SunnyWeatherSpec with FailureDetectorPuppetStrategy -class SunnyWeatherMultiJvmNode2 extends SunnyWeatherSpec with FailureDetectorPuppetStrategy -class SunnyWeatherMultiJvmNode3 extends SunnyWeatherSpec with FailureDetectorPuppetStrategy -class SunnyWeatherMultiJvmNode4 extends SunnyWeatherSpec with FailureDetectorPuppetStrategy -class SunnyWeatherMultiJvmNode5 extends SunnyWeatherSpec with FailureDetectorPuppetStrategy +class SunnyWeatherMultiJvmNode1 extends SunnyWeatherSpec with AccrualFailureDetectorStrategy +class SunnyWeatherMultiJvmNode2 extends SunnyWeatherSpec with AccrualFailureDetectorStrategy +class SunnyWeatherMultiJvmNode3 extends SunnyWeatherSpec with AccrualFailureDetectorStrategy +class SunnyWeatherMultiJvmNode4 extends SunnyWeatherSpec with AccrualFailureDetectorStrategy +class SunnyWeatherMultiJvmNode5 extends SunnyWeatherSpec with AccrualFailureDetectorStrategy abstract class SunnyWeatherSpec extends MultiNodeSpec(SunnyWeatherMultiJvmSpec) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/TransitionSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/TransitionSpec.scala new file mode 100644 index 0000000000..16cc4b385f --- /dev/null +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/TransitionSpec.scala @@ -0,0 +1,432 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ + +package akka.cluster + +import com.typesafe.config.ConfigFactory +import akka.remote.testkit.MultiNodeConfig +import akka.remote.testkit.MultiNodeSpec +import akka.testkit._ +import akka.actor.Address +import akka.remote.testconductor.RoleName +import MemberStatus._ + +object TransitionMultiJvmSpec extends MultiNodeConfig { + val first = role("first") + val second = role("second") + val third = role("third") + val fourth = role("fourth") + val fifth = role("fifth") + + commonConfig(debugConfig(on = false). + withFallback(ConfigFactory.parseString( + "akka.cluster.periodic-tasks-initial-delay = 300 s # turn off all periodic tasks")). + withFallback(MultiNodeClusterSpec.clusterConfig)) +} + +class TransitionMultiJvmNode1 extends TransitionSpec with FailureDetectorPuppetStrategy +class TransitionMultiJvmNode2 extends TransitionSpec with FailureDetectorPuppetStrategy +class TransitionMultiJvmNode3 extends TransitionSpec with FailureDetectorPuppetStrategy +class TransitionMultiJvmNode4 extends TransitionSpec with FailureDetectorPuppetStrategy +class TransitionMultiJvmNode5 extends TransitionSpec with FailureDetectorPuppetStrategy + +abstract class TransitionSpec + extends MultiNodeSpec(TransitionMultiJvmSpec) + with MultiNodeClusterSpec { + + import TransitionMultiJvmSpec._ + + // sorted in the order used by the cluster + def leader(roles: RoleName*) = roles.sorted.head + def nonLeader(roles: RoleName*) = roles.toSeq.sorted.tail + + def memberStatus(address: Address): MemberStatus = { + val statusOption = (cluster.latestGossip.members ++ cluster.latestGossip.overview.unreachable).collectFirst { + case m if m.address == address ⇒ m.status + } + statusOption must not be (None) + statusOption.get + } + + def memberAddresses: Set[Address] = cluster.latestGossip.members.map(_.address) + + def members: Set[RoleName] = memberAddresses.flatMap(roleName(_)) + + def seenLatestGossip: Set[RoleName] = { + val gossip = cluster.latestGossip + gossip.overview.seen.collect { + case (address, v) if v == gossip.version ⇒ roleName(address) + }.flatten.toSet + } + + def awaitSeen(addresses: Address*): Unit = awaitCond { + seenLatestGossip.map(node(_).address) == addresses.toSet + } + + def awaitMembers(addresses: Address*): Unit = awaitCond { + memberAddresses == addresses.toSet + } + + def awaitMemberStatus(address: Address, status: MemberStatus): Unit = awaitCond { + memberStatus(address) == Up + } + + // DSL sugar for `role1 gossipTo role2` + implicit def roleExtras(role: RoleName): RoleWrapper = new RoleWrapper(role) + var gossipBarrierCounter = 0 + class RoleWrapper(fromRole: RoleName) { + def gossipTo(toRole: RoleName): Unit = { + gossipBarrierCounter += 1 + runOn(toRole) { + val g = cluster.latestGossip + testConductor.enter("before-gossip-" + gossipBarrierCounter) + awaitCond(cluster.latestGossip != g) // received gossip + testConductor.enter("after-gossip-" + gossipBarrierCounter) + } + runOn(fromRole) { + testConductor.enter("before-gossip-" + gossipBarrierCounter) + cluster.gossipTo(node(toRole).address) // send gossip + testConductor.enter("after-gossip-" + gossipBarrierCounter) + } + runOn(roles.filterNot(r ⇒ r == fromRole || r == toRole): _*) { + testConductor.enter("before-gossip-" + gossipBarrierCounter) + testConductor.enter("after-gossip-" + gossipBarrierCounter) + } + } + } + + "A Cluster" must { + + "start nodes as singleton clusters" taggedAs LongRunningTest in { + + startClusterNode() + cluster.isSingletonCluster must be(true) + cluster.status must be(Joining) + cluster.convergence.isDefined must be(true) + cluster.leaderActions() + cluster.status must be(Up) + + testConductor.enter("after-1") + } + + "perform correct transitions when second joining first" taggedAs LongRunningTest in { + + runOn(second) { + cluster.join(first) + } + runOn(first) { + awaitMembers(first, second) + memberStatus(first) must be(Up) + memberStatus(second) must be(Joining) + cluster.convergence.isDefined must be(false) + } + testConductor.enter("second-joined") + + first gossipTo second + runOn(second) { + members must be(Set(first, second)) + memberStatus(first) must be(Up) + memberStatus(second) must be(Joining) + // we got a conflicting version in second, and therefore not convergence in second + seenLatestGossip must be(Set(second)) + cluster.convergence.isDefined must be(false) + } + + second gossipTo first + runOn(first) { + seenLatestGossip must be(Set(first, second)) + } + + first gossipTo second + runOn(second) { + seenLatestGossip must be(Set(first, second)) + } + + runOn(first, second) { + memberStatus(first) must be(Up) + memberStatus(second) must be(Joining) + cluster.convergence.isDefined must be(true) + } + testConductor.enter("convergence-joining-2") + + runOn(leader(first, second)) { + cluster.leaderActions() + memberStatus(first) must be(Up) + memberStatus(second) must be(Up) + } + testConductor.enter("leader-actions-2") + + leader(first, second) gossipTo nonLeader(first, second).head + runOn(nonLeader(first, second).head) { + memberStatus(first) must be(Up) + memberStatus(second) must be(Up) + seenLatestGossip must be(Set(first, second)) + cluster.convergence.isDefined must be(true) + } + + nonLeader(first, second).head gossipTo leader(first, second) + runOn(first, second) { + memberStatus(first) must be(Up) + memberStatus(second) must be(Up) + seenLatestGossip must be(Set(first, second)) + cluster.convergence.isDefined must be(true) + } + + testConductor.enter("after-2") + } + + "perform correct transitions when third joins second" taggedAs LongRunningTest in { + + runOn(third) { + cluster.join(second) + } + runOn(second) { + awaitMembers(first, second, third) + cluster.convergence.isDefined must be(false) + memberStatus(third) must be(Joining) + seenLatestGossip must be(Set(second)) + } + testConductor.enter("third-joined-second") + + second gossipTo first + runOn(first) { + members must be(Set(first, second, third)) + cluster.convergence.isDefined must be(false) + memberStatus(third) must be(Joining) + } + + first gossipTo third + runOn(third) { + members must be(Set(first, second, third)) + cluster.convergence.isDefined must be(false) + memberStatus(third) must be(Joining) + // conflicting version + seenLatestGossip must be(Set(third)) + } + + third gossipTo first + third gossipTo second + runOn(first, second) { + seenLatestGossip must be(Set(myself, third)) + } + + first gossipTo second + runOn(second) { + seenLatestGossip must be(Set(first, second, third)) + cluster.convergence.isDefined must be(true) + } + + runOn(first, third) { + cluster.convergence.isDefined must be(false) + } + + second gossipTo first + second gossipTo third + runOn(first, second, third) { + seenLatestGossip must be(Set(first, second, third)) + memberStatus(first) must be(Up) + memberStatus(second) must be(Up) + memberStatus(third) must be(Joining) + cluster.convergence.isDefined must be(true) + } + + testConductor.enter("convergence-joining-3") + + runOn(leader(first, second, third)) { + cluster.leaderActions() + memberStatus(first) must be(Up) + memberStatus(second) must be(Up) + memberStatus(third) must be(Up) + } + testConductor.enter("leader-actions-3") + + // leader gossipTo first non-leader + leader(first, second, third) gossipTo nonLeader(first, second, third).head + runOn(nonLeader(first, second, third).head) { + memberStatus(third) must be(Up) + seenLatestGossip must be(Set(leader(first, second, third), myself)) + cluster.convergence.isDefined must be(false) + } + + // first non-leader gossipTo the other non-leader + nonLeader(first, second, third).head gossipTo nonLeader(first, second, third).tail.head + runOn(nonLeader(first, second, third).head) { + cluster.gossipTo(node(nonLeader(first, second, third).tail.head).address) + } + runOn(nonLeader(first, second, third).tail.head) { + memberStatus(third) must be(Up) + seenLatestGossip must be(Set(first, second, third)) + cluster.convergence.isDefined must be(true) + } + + // and back again + nonLeader(first, second, third).tail.head gossipTo nonLeader(first, second, third).head + runOn(nonLeader(first, second, third).head) { + memberStatus(third) must be(Up) + seenLatestGossip must be(Set(first, second, third)) + cluster.convergence.isDefined must be(true) + } + + // first non-leader gossipTo the leader + nonLeader(first, second, third).head gossipTo leader(first, second, third) + runOn(first, second, third) { + memberStatus(first) must be(Up) + memberStatus(second) must be(Up) + memberStatus(third) must be(Up) + seenLatestGossip must be(Set(first, second, third)) + cluster.convergence.isDefined must be(true) + } + + testConductor.enter("after-3") + } + + "startup a second separated cluster consisting of nodes fourth and fifth" taggedAs LongRunningTest in { + runOn(fourth) { + cluster.join(fifth) + awaitMembers(fourth, fifth) + cluster.gossipTo(fifth) + awaitSeen(fourth, fifth) + cluster.convergence.isDefined must be(true) + } + runOn(fifth) { + awaitMembers(fourth, fifth) + cluster.gossipTo(fourth) + awaitSeen(fourth, fifth) + cluster.gossipTo(fourth) + cluster.convergence.isDefined must be(true) + } + testConductor.enter("fourth-joined-fifth") + + testConductor.enter("after-4") + } + + "perform correct transitions when second cluster (node fourth) joins first cluster (node third)" taggedAs LongRunningTest in { + + runOn(fourth) { + cluster.join(third) + } + runOn(third) { + awaitMembers(first, second, third, fourth) + seenLatestGossip must be(Set(third)) + } + testConductor.enter("fourth-joined-third") + + third gossipTo second + runOn(second) { + seenLatestGossip must be(Set(second, third)) + } + + second gossipTo fourth + runOn(fourth) { + members must be(roles.toSet) + // merge conflict + seenLatestGossip must be(Set(fourth)) + } + + fourth gossipTo first + fourth gossipTo second + fourth gossipTo third + fourth gossipTo fifth + runOn(first, second, third, fifth) { + members must be(roles.toSet) + seenLatestGossip must be(Set(fourth, myself)) + } + + first gossipTo fifth + runOn(fifth) { + seenLatestGossip must be(Set(first, fourth, fifth)) + } + + fifth gossipTo third + runOn(third) { + seenLatestGossip must be(Set(first, third, fourth, fifth)) + } + + third gossipTo second + runOn(second) { + seenLatestGossip must be(roles.toSet) + cluster.convergence.isDefined must be(true) + } + + second gossipTo first + second gossipTo third + second gossipTo fourth + third gossipTo fifth + + seenLatestGossip must be(roles.toSet) + memberStatus(first) must be(Up) + memberStatus(second) must be(Up) + memberStatus(third) must be(Up) + memberStatus(fourth) must be(Joining) + memberStatus(fifth) must be(Up) + cluster.convergence.isDefined must be(true) + + testConductor.enter("convergence-joining-3") + + runOn(leader(roles: _*)) { + cluster.leaderActions() + memberStatus(fourth) must be(Up) + seenLatestGossip must be(Set(myself)) + cluster.convergence.isDefined must be(false) + } + // spread the word + for (x :: y :: Nil ← (roles.sorted ++ roles.sorted.dropRight(1)).toList.sliding(2)) { + x gossipTo y + } + + testConductor.enter("spread-5") + + seenLatestGossip must be(roles.toSet) + memberStatus(first) must be(Up) + memberStatus(second) must be(Up) + memberStatus(third) must be(Up) + memberStatus(fourth) must be(Up) + memberStatus(fifth) must be(Up) + cluster.convergence.isDefined must be(true) + + testConductor.enter("after-5") + } + + "perform correct transitions when second becomes unavailble" taggedAs LongRunningTest in { + runOn(fifth) { + markNodeAsUnavailable(second) + cluster.reapUnreachableMembers() + cluster.latestGossip.overview.unreachable must contain(Member(second, Up)) + seenLatestGossip must be(Set(fifth)) + } + + // spread the word + val gossipRound = List(fifth, fourth, third, first, third, fourth, fifth) + for (x :: y :: Nil ← gossipRound.sliding(2)) { + x gossipTo y + } + + runOn((roles.filterNot(_ == second)): _*) { + cluster.latestGossip.overview.unreachable must contain(Member(second, Up)) + cluster.convergence.isDefined must be(false) + } + + runOn(third) { + cluster.down(second) + awaitMemberStatus(second, Down) + } + + // spread the word + val gossipRound2 = List(third, fourth, fifth, first, third, fourth, fifth) + for (x :: y :: Nil ← gossipRound2.sliding(2)) { + x gossipTo y + } + + runOn((roles.filterNot(_ == second)): _*) { + cluster.latestGossip.overview.unreachable must contain(Member(second, Down)) + memberStatus(second) must be(Down) + seenLatestGossip must be(Set(first, third, fourth, fifth)) + cluster.convergence.isDefined must be(true) + } + + testConductor.enter("after-6") + } + + } +} diff --git a/akka-cluster/src/test/scala/akka/cluster/GossipSpec.scala b/akka-cluster/src/test/scala/akka/cluster/GossipSpec.scala index 449ebf7bff..8020010655 100644 --- a/akka-cluster/src/test/scala/akka/cluster/GossipSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/GossipSpec.scala @@ -33,12 +33,12 @@ class GossipSpec extends WordSpec with MustMatchers { val g2 = Gossip(members = SortedSet(a2, c2, e2)) val merged1 = g1 merge g2 - merged1.members must be(SortedSet(a1, c1, e2)) - merged1.members.toSeq.map(_.status) must be(Seq(Up, Leaving, Up)) + merged1.members must be(SortedSet(a2, c1, e1)) + merged1.members.toSeq.map(_.status) must be(Seq(Joining, Leaving, Joining)) val merged2 = g2 merge g1 - merged2.members must be(SortedSet(a1, c1, e2)) - merged2.members.toSeq.map(_.status) must be(Seq(Up, Leaving, Up)) + merged2.members must be(SortedSet(a2, c1, e1)) + merged2.members.toSeq.map(_.status) must be(Seq(Joining, Leaving, Joining)) } @@ -48,12 +48,12 @@ class GossipSpec extends WordSpec with MustMatchers { val g2 = Gossip(members = Gossip.emptyMembers, overview = GossipOverview(unreachable = Set(a2, b2, c2, d2))) val merged1 = g1 merge g2 - merged1.overview.unreachable must be(Set(a1, b2, c1, d2)) - merged1.overview.unreachable.toSeq.sorted.map(_.status) must be(Seq(Up, Removed, Leaving, Removed)) + merged1.overview.unreachable must be(Set(a2, b2, c1, d2)) + merged1.overview.unreachable.toSeq.sorted.map(_.status) must be(Seq(Joining, Removed, Leaving, Removed)) val merged2 = g2 merge g1 - merged2.overview.unreachable must be(Set(a1, b2, c1, d2)) - merged2.overview.unreachable.toSeq.sorted.map(_.status) must be(Seq(Up, Removed, Leaving, Removed)) + merged2.overview.unreachable must be(Set(a2, b2, c1, d2)) + merged2.overview.unreachable.toSeq.sorted.map(_.status) must be(Seq(Joining, Removed, Leaving, Removed)) } @@ -62,14 +62,14 @@ class GossipSpec extends WordSpec with MustMatchers { val g2 = Gossip(members = SortedSet(a2, c2), overview = GossipOverview(unreachable = Set(b2, d2))) val merged1 = g1 merge g2 - merged1.members must be(SortedSet(a1)) - merged1.members.toSeq.map(_.status) must be(Seq(Up)) + merged1.members must be(SortedSet(a2)) + merged1.members.toSeq.map(_.status) must be(Seq(Joining)) merged1.overview.unreachable must be(Set(b2, c1, d2)) merged1.overview.unreachable.toSeq.sorted.map(_.status) must be(Seq(Removed, Leaving, Removed)) val merged2 = g2 merge g1 - merged2.members must be(SortedSet(a1)) - merged2.members.toSeq.map(_.status) must be(Seq(Up)) + merged2.members must be(SortedSet(a2)) + merged2.members.toSeq.map(_.status) must be(Seq(Joining)) merged2.overview.unreachable must be(Set(b2, c1, d2)) merged2.overview.unreachable.toSeq.sorted.map(_.status) must be(Seq(Removed, Leaving, Removed)) diff --git a/akka-remote/src/main/scala/akka/remote/netty/NettySSLSupport.scala b/akka-remote/src/main/scala/akka/remote/netty/NettySSLSupport.scala index 4c68069278..7e006373c2 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/NettySSLSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/NettySSLSupport.scala @@ -16,22 +16,22 @@ import akka.security.provider.AkkaProvider * Used for adding SSL support to Netty pipeline * Internal use only */ -private[netty] object NettySSLSupport { +private[akka] object NettySSLSupport { /** * Construct a SSLHandler which can be inserted into a Netty server/client pipeline */ def apply(settings: NettySettings, log: LoggingAdapter, isClient: Boolean): SslHandler = if (isClient) initialiseClientSSL(settings, log) else initialiseServerSSL(settings, log) - private def initialiseCustomSecureRandom(settings: NettySettings, log: LoggingAdapter): SecureRandom = { + def initialiseCustomSecureRandom(rngName: Option[String], sourceOfRandomness: Option[String], log: LoggingAdapter): SecureRandom = { /** * According to this bug report: http://bugs.sun.com/view_bug.do?bug_id=6202721 * Using /dev/./urandom is only necessary when using SHA1PRNG on Linux * Use 'new SecureRandom()' instead of 'SecureRandom.getInstance("SHA1PRNG")' to avoid having problems */ - settings.SSLRandomSource foreach { path ⇒ System.setProperty("java.security.egd", path) } + sourceOfRandomness foreach { path ⇒ System.setProperty("java.security.egd", path) } - val rng = settings.SSLRandomNumberGenerator match { + val rng = rngName match { case Some(r @ ("AES128CounterRNGFast" | "AES128CounterRNGSecure" | "AES256CounterRNGSecure")) ⇒ log.debug("SSL random number generator set to: {}", r) val akka = new AkkaProvider @@ -86,7 +86,7 @@ private[netty] object NettySSLSupport { trustStore.load(new FileInputStream(trustStorePath), trustStorePassword.toCharArray) //FIXME does the FileInputStream need to be closed? trustManagerFactory.init(trustStore) val trustManagers: Array[TrustManager] = trustManagerFactory.getTrustManagers - Option(SSLContext.getInstance(protocol)) map { ctx ⇒ ctx.init(null, trustManagers, initialiseCustomSecureRandom(settings, log)); ctx } + Option(SSLContext.getInstance(protocol)) map { ctx ⇒ ctx.init(null, trustManagers, initialiseCustomSecureRandom(settings.SSLRandomNumberGenerator, settings.SSLRandomSource, log)); ctx } } catch { case e: FileNotFoundException ⇒ throw new RemoteTransportException("Client SSL connection could not be established because trust store could not be loaded", e) case e: IOException ⇒ throw new RemoteTransportException("Client SSL connection could not be established because: " + e.getMessage, e) @@ -123,7 +123,7 @@ private[netty] object NettySSLSupport { val keyStore = KeyStore.getInstance(KeyStore.getDefaultType) keyStore.load(new FileInputStream(keyStorePath), keyStorePassword.toCharArray) //FIXME does the FileInputStream need to be closed? factory.init(keyStore, keyStorePassword.toCharArray) - Option(SSLContext.getInstance(protocol)) map { ctx ⇒ ctx.init(factory.getKeyManagers, null, initialiseCustomSecureRandom(settings, log)); ctx } + Option(SSLContext.getInstance(protocol)) map { ctx ⇒ ctx.init(factory.getKeyManagers, null, initialiseCustomSecureRandom(settings.SSLRandomNumberGenerator, settings.SSLRandomSource, log)); ctx } } catch { case e: FileNotFoundException ⇒ throw new RemoteTransportException("Server SSL connection could not be established because key store could not be loaded", e) case e: IOException ⇒ throw new RemoteTransportException("Server SSL connection could not be established because: " + e.getMessage, e) diff --git a/akka-remote/src/test/scala/akka/remote/Ticket1978CommunicationSpec.scala b/akka-remote/src/test/scala/akka/remote/Ticket1978CommunicationSpec.scala index ff41e369ff..4ac3c7ffe0 100644 --- a/akka-remote/src/test/scala/akka/remote/Ticket1978CommunicationSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/Ticket1978CommunicationSpec.scala @@ -9,7 +9,9 @@ import com.typesafe.config._ import akka.dispatch.{ Await, Future } import akka.pattern.ask import java.io.File -import java.security.{ PrivilegedAction, AccessController } +import java.security.{ SecureRandom, PrivilegedAction, AccessController } +import netty.NettySSLSupport +import akka.event.{ NoLogging, LoggingAdapter } object Configuration { // set this in your JAVA_OPTS to see all ssl debug info: "-Djavax.net.debug=ssl,keymanager" @@ -38,38 +40,42 @@ object Configuration { } """ - def getConfig(rng: String): String = { - conf.format(trustStore, keyStore, rng) - } + def getCipherConfig(cipher: String): (String, Boolean, Config) = if (try { + NettySSLSupport.initialiseCustomSecureRandom(Some(cipher), None, NoLogging) ne null + } catch { + case iae: IllegalArgumentException if iae.getMessage == "Cannot support %s with currently installed providers".format(cipher) ⇒ false + case nsae: java.security.NoSuchAlgorithmException ⇒ false + }) (cipher, true, ConfigFactory.parseString(conf.format(trustStore, keyStore, cipher))) else (cipher, false, AkkaSpec.testConf) } -@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) -class Ticket1978SHA1PRNG extends Ticket1978CommunicationSpec(Configuration.getConfig("SHA1PRNG")) +import Configuration.getCipherConfig @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) -class Ticket1978AES128CounterRNGFast extends Ticket1978CommunicationSpec(Configuration.getConfig("AES128CounterRNGFast")) +class Ticket1978SHA1PRNGSpec extends Ticket1978CommunicationSpec(getCipherConfig("SHA1PRNG")) + +@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) +class Ticket1978AES128CounterRNGFastSpec extends Ticket1978CommunicationSpec(getCipherConfig("AES128CounterRNGFast")) /** * Both of the Secure variants require access to the Internet to access random.org. */ @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) -class Ticket1978AES128CounterRNGSecure extends Ticket1978CommunicationSpec(Configuration.getConfig("AES128CounterRNGSecure")) +class Ticket1978AES128CounterRNGSecureSpec extends Ticket1978CommunicationSpec(getCipherConfig("AES128CounterRNGSecure")) /** * Both of the Secure variants require access to the Internet to access random.org. */ @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) -class Ticket1978AES256CounterRNGSecure extends Ticket1978CommunicationSpec(Configuration.getConfig("AES256CounterRNGSecure")) +class Ticket1978AES256CounterRNGSecureSpec extends Ticket1978CommunicationSpec(getCipherConfig("AES256CounterRNGSecure")) @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) -class Ticket1978CommunicationSpec(val configuration: String) - extends AkkaSpec(configuration) with ImplicitSender with DefaultTimeout { +class Ticket1978NonExistingRNGSecureSpec extends Ticket1978CommunicationSpec(("NonExistingRNG", false, AkkaSpec.testConf)) + +@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) +abstract class Ticket1978CommunicationSpec(val cipherEnabledconfig: (String, Boolean, Config)) extends AkkaSpec(cipherEnabledconfig._3) with ImplicitSender with DefaultTimeout { import RemoteCommunicationSpec._ - // default SecureRandom RNG - def this() = this(Configuration.getConfig("")) - val conf = ConfigFactory.parseString("akka.remote.netty.port=12346").withFallback(system.settings.config) val other = ActorSystem("remote-sys", conf) @@ -86,74 +92,79 @@ class Ticket1978CommunicationSpec(val configuration: String) } "SSL Remoting" must { - - "support remote look-ups" in { - here ! "ping" - expectMsgPF() { - case ("pong", s: AnyRef) if s eq testActor ⇒ true - } - } - - "send error message for wrong address" in { - EventFilter.error(start = "dropping", occurrences = 1).intercept { - system.actorFor("akka://remotesys@localhost:12346/user/echo") ! "ping" - }(other) - } - - "support ask" in { - Await.result(here ? "ping", timeout.duration) match { - case ("pong", s: akka.pattern.PromiseActorRef) ⇒ // good - case m ⇒ fail(m + " was not (pong, AskActorRef)") - } - } - - "send dead letters on remote if actor does not exist" in { - EventFilter.warning(pattern = "dead.*buh", occurrences = 1).intercept { - system.actorFor("akka://remote-sys@localhost:12346/does/not/exist") ! "buh" - }(other) - } - - "create and supervise children on remote node" in { - val r = system.actorOf(Props[Echo], "blub") - r.path.toString must be === "akka://remote-sys@localhost:12346/remote/Ticket1978CommunicationSpec@localhost:12345/user/blub" - r ! 42 - expectMsg(42) - EventFilter[Exception]("crash", occurrences = 1).intercept { - r ! new Exception("crash") - }(other) - expectMsg("preRestart") - r ! 42 - expectMsg(42) - system.stop(r) - expectMsg("postStop") - } - - "look-up actors across node boundaries" in { - val l = system.actorOf(Props(new Actor { - def receive = { - case (p: Props, n: String) ⇒ sender ! context.actorOf(p, n) - case s: String ⇒ sender ! context.actorFor(s) + if (cipherEnabledconfig._2) { + "support remote look-ups" in { + here ! "ping" + expectMsgPF() { + case ("pong", s: AnyRef) if s eq testActor ⇒ true } - }), "looker") - l ! (Props[Echo], "child") - val r = expectMsgType[ActorRef] - r ! (Props[Echo], "grandchild") - val remref = expectMsgType[ActorRef] - remref.isInstanceOf[LocalActorRef] must be(true) - val myref = system.actorFor(system / "looker" / "child" / "grandchild") - myref.isInstanceOf[RemoteActorRef] must be(true) - myref ! 43 - expectMsg(43) - lastSender must be theSameInstanceAs remref - r.asInstanceOf[RemoteActorRef].getParent must be(l) - system.actorFor("/user/looker/child") must be theSameInstanceAs r - Await.result(l ? "child/..", timeout.duration).asInstanceOf[AnyRef] must be theSameInstanceAs l - Await.result(system.actorFor(system / "looker" / "child") ? "..", timeout.duration).asInstanceOf[AnyRef] must be theSameInstanceAs l - } + } - "not fail ask across node boundaries" in { - val f = for (_ ← 1 to 1000) yield here ? "ping" mapTo manifest[(String, ActorRef)] - Await.result(Future.sequence(f), remaining).map(_._1).toSet must be(Set("pong")) + "send error message for wrong address" in { + EventFilter.error(start = "dropping", occurrences = 1).intercept { + system.actorFor("akka://remotesys@localhost:12346/user/echo") ! "ping" + }(other) + } + + "support ask" in { + Await.result(here ? "ping", timeout.duration) match { + case ("pong", s: akka.pattern.PromiseActorRef) ⇒ // good + case m ⇒ fail(m + " was not (pong, AskActorRef)") + } + } + + "send dead letters on remote if actor does not exist" in { + EventFilter.warning(pattern = "dead.*buh", occurrences = 1).intercept { + system.actorFor("akka://remote-sys@localhost:12346/does/not/exist") ! "buh" + }(other) + } + + "create and supervise children on remote node" in { + val r = system.actorOf(Props[Echo], "blub") + r.path.toString must be === "akka://remote-sys@localhost:12346/remote/Ticket1978CommunicationSpec@localhost:12345/user/blub" + r ! 42 + expectMsg(42) + EventFilter[Exception]("crash", occurrences = 1).intercept { + r ! new Exception("crash") + }(other) + expectMsg("preRestart") + r ! 42 + expectMsg(42) + system.stop(r) + expectMsg("postStop") + } + + "look-up actors across node boundaries" in { + val l = system.actorOf(Props(new Actor { + def receive = { + case (p: Props, n: String) ⇒ sender ! context.actorOf(p, n) + case s: String ⇒ sender ! context.actorFor(s) + } + }), "looker") + l ! (Props[Echo], "child") + val r = expectMsgType[ActorRef] + r ! (Props[Echo], "grandchild") + val remref = expectMsgType[ActorRef] + remref.isInstanceOf[LocalActorRef] must be(true) + val myref = system.actorFor(system / "looker" / "child" / "grandchild") + myref.isInstanceOf[RemoteActorRef] must be(true) + myref ! 43 + expectMsg(43) + lastSender must be theSameInstanceAs remref + r.asInstanceOf[RemoteActorRef].getParent must be(l) + system.actorFor("/user/looker/child") must be theSameInstanceAs r + Await.result(l ? "child/..", timeout.duration).asInstanceOf[AnyRef] must be theSameInstanceAs l + Await.result(system.actorFor(system / "looker" / "child") ? "..", timeout.duration).asInstanceOf[AnyRef] must be theSameInstanceAs l + } + + "not fail ask across node boundaries" in { + val f = for (_ ← 1 to 1000) yield here ? "ping" mapTo manifest[(String, ActorRef)] + Await.result(Future.sequence(f), remaining).map(_._1).toSet must be(Set("pong")) + } + } else { + "not be run when the cipher is not supported by the platform this test is currently being executed on" ignore { + + } } }