From e5979bc31c6f4b998a5f7c60d9b1686353ff653f Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Mon, 2 Jul 2012 12:30:49 +0200 Subject: [PATCH] Gossip merge in large cluster, #2290 * Trying to simultaneously resolving conflicts at several nodes creates new conflicts. Therefore the leader resolves conflicts to limit divergence. To avoid overload there is also a configurable rate limit of how many conflicts that are handled by second. * Netty blocks when sending to broken connections. ClusterHeartbeatSender actor isolates sending to different nodes by using child workers for each target address and thereby reduce the risk of irregular heartbeats to healty nodes due to broken connections to other nodes. --- .../src/main/resources/reference.conf | 12 + .../src/main/scala/akka/cluster/Cluster.scala | 328 ++++++++++++++---- .../scala/akka/cluster/ClusterSettings.scala | 7 + .../scala/akka/cluster/LargeClusterSpec.scala | 25 +- .../akka/cluster/ClusterConfigSpec.scala | 5 + 5 files changed, 304 insertions(+), 73 deletions(-) diff --git a/akka-cluster/src/main/resources/reference.conf b/akka-cluster/src/main/resources/reference.conf index 613f6320d8..14536ae8b4 100644 --- a/akka-cluster/src/main/resources/reference.conf +++ b/akka-cluster/src/main/resources/reference.conf @@ -61,6 +61,10 @@ akka { # Probability value is between 0.0 and 1.0. 0.0 means never, 1.0 means always. gossip-different-view-probability = 0.8 + # Limit number of merge conflicts per second that are handled. If the limit is + # exceeded the conflicting gossip messages are dropped and will reappear later. + max-gossip-merge-rate = 5.0 + failure-detector { # defines the failure detector threshold @@ -97,5 +101,13 @@ akka { tick-duration = 33ms ticks-per-wheel = 512 } + + # Netty blocks when sending to broken connections, and this circuit breaker + # is used to reduce connect attempts to broken connections. + send-circuit-breaker { + max-failures = 3 + call-timeout = 2 s + reset-timeout = 30 s + } } } diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index b1bf73eddb..0b856edd17 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -29,6 +29,7 @@ import scala.annotation.tailrec import scala.collection.immutable.{ Map, SortedSet } import scala.collection.GenTraversableOnce import java.util.concurrent.atomic.AtomicLong +import java.security.MessageDigest /** * Interface for membership change listener. @@ -200,7 +201,13 @@ object Member { /** * Envelope adding a sender address to the gossip. */ -case class GossipEnvelope(from: Address, gossip: Gossip) extends ClusterMessage +case class GossipEnvelope(from: Address, gossip: Gossip, conversation: Boolean = true) extends ClusterMessage + +/** + * When conflicting versions of received and local [[akka.cluster.Gossip]] is detected + * it's forwarded to the leader for conflict resolution. + */ +case class GossipMergeConflict(a: GossipEnvelope, b: GossipEnvelope) extends ClusterMessage /** * Defines the current status of a cluster member node @@ -354,6 +361,11 @@ case class Gossip( Gossip(GossipOverview(mergedSeen, mergedUnreachable), mergedMembers, mergedMeta, mergedVClock) } + def isLeader(address: Address): Boolean = + members.nonEmpty && (address == members.head.address) + + def leader: Option[Address] = members.headOption.map(_.address) + override def toString = "Gossip(" + "overview = " + overview + @@ -368,6 +380,15 @@ case class Gossip( */ case class Heartbeat(from: Address) extends ClusterMessage +/** + * INTERNAL API. + * + * Command to [akka.cluster.ClusterHeartbeatSender]], which will send [[akka.cluster.Heartbeat]] + * to the other node. + * Local only, no need to serialize. + */ +private[cluster] case class SendHeartbeat(heartbeatMsg: Heartbeat, to: Address, deadline: Deadline) + /** * INTERNAL API. * @@ -423,7 +444,8 @@ private[cluster] final class ClusterCommandDaemon(cluster: Cluster) extends Acto private[cluster] final class ClusterGossipDaemon(cluster: Cluster) extends Actor with ActorLogging { def receive = { - case GossipEnvelope(from, gossip) ⇒ cluster.receiveGossip(from, gossip) + case msg: GossipEnvelope ⇒ cluster.receiveGossip(msg) + case msg: GossipMergeConflict ⇒ cluster.receiveGossipMerge(msg) } override def unhandled(unknown: Any) = log.error("[{}] can not respond to messages - received [{}]", @@ -447,6 +469,85 @@ private[cluster] final class ClusterHeartbeatDaemon(cluster: Cluster) extends Ac self.path, unknown) } +/* + * This actor is responsible for sending the heartbeat messages to + * other nodes. Netty blocks when sending to broken connections. This actor + * isolates sending to different nodes by using child workers for each target + * address and thereby reduce the risk of irregular heartbeats to healty + * nodes due to broken connections to other nodes. + */ +private[cluster] final class ClusterHeartbeatSender(cluster: Cluster) extends Actor with ActorLogging { + + /** + * Looks up and returns the remote cluster heartbeat connection for the specific address. + */ + def clusterHeartbeatConnectionFor(address: Address): ActorRef = + context.system.actorFor(RootActorPath(address) / "system" / "cluster" / "heartbeat") + + val digester = MessageDigest.getInstance("MD5") + + /** + * Child name is MD5 hash of the address + */ + def hash(name: String): String = { + digester update name.getBytes("UTF-8") + digester.digest.map { h ⇒ "%02x".format(0xFF & h) }.mkString + } + + def receive = { + case msg @ SendHeartbeat(from, to, deadline) ⇒ + val workerName = hash(to.toString) + val worker = context.actorFor(workerName) match { + case notFound if notFound.isTerminated ⇒ + context.actorOf(Props(new ClusterHeartbeatSenderWorker( + cluster.settings.SendCircuitBreakerSettings, clusterHeartbeatConnectionFor(to))), workerName) + case child ⇒ child + } + worker ! msg + } + +} + +/** + * Responsible for sending [[akka.cluster.Heartbeat]] to one specific address. + * + * Netty blocks when sending to broken connections, and this actor uses + * a configurable circuit breaker to reduce connect attempts to broken + * connections. + * + * @see ClusterHeartbeatSender + */ +private[cluster] final class ClusterHeartbeatSenderWorker( + cbSettings: CircuitBreakerSettings, toRef: ActorRef) + extends Actor with ActorLogging { + + val breaker = CircuitBreaker(context.system.scheduler, + cbSettings.maxFailures, cbSettings.callTimeout, cbSettings.resetTimeout). + onHalfOpen(log.debug("CircuitBreaker Half-Open for: [{}]", toRef)). + onOpen(log.debug("CircuitBreaker Open for [{}]", toRef)). + onClose(log.debug("CircuitBreaker Closed for [{}]", toRef)) + + context.setReceiveTimeout(30 seconds) + + def receive = { + case SendHeartbeat(heartbeatMsg, _, deadline) ⇒ + if (!deadline.isOverdue) { + // the CircuitBreaker will measure elapsed time and open if too many long calls + try breaker.withSyncCircuitBreaker { + log.debug("Cluster Node [{}] - Heartbeat to [{}]", heartbeatMsg.from, toRef) + toRef ! heartbeatMsg + if (deadline.isOverdue) log.debug("Sending heartbeat to [{}] took longer than expected", toRef) + } catch { case e: CircuitBreakerOpenException ⇒ /* skip sending heartbeat to broken connection */ } + + // make sure it will cleanup when not used any more + context.setReceiveTimeout(30 seconds) + } + + case ReceiveTimeout ⇒ context.stop(self) // cleanup when not used + + } +} + /** * INTERNAL API. * @@ -455,14 +556,16 @@ private[cluster] final class ClusterHeartbeatDaemon(cluster: Cluster) extends Ac private[cluster] final class ClusterDaemonSupervisor(cluster: Cluster) extends Actor with ActorLogging { val configuredDispatcher = cluster.settings.UseDispatcher - private val commands = context.actorOf(Props(new ClusterCommandDaemon(cluster)). + val commands = context.actorOf(Props(new ClusterCommandDaemon(cluster)). withDispatcher(configuredDispatcher), name = "commands") - private val gossip = context.actorOf(Props(new ClusterGossipDaemon(cluster)). + val gossip = context.actorOf(Props(new ClusterGossipDaemon(cluster)). withDispatcher(configuredDispatcher). withRouter(RoundRobinRouter(cluster.settings.NrOfGossipDaemons)), name = "gossip") - private val heartbeat = context.actorOf(Props(new ClusterHeartbeatDaemon(cluster)). + val heartbeat = context.actorOf(Props(new ClusterHeartbeatDaemon(cluster)). withDispatcher(configuredDispatcher), name = "heartbeat") + val heartbeatSender = context.actorOf(Props(new ClusterHeartbeatSender(cluster)). + withDispatcher(configuredDispatcher), name = "heartbeatSender") def receive = Actor.emptyBehavior @@ -699,15 +802,15 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) /** * Is this node the leader? */ - def isLeader: Boolean = { - val members = latestGossip.members - members.nonEmpty && (selfAddress == members.head.address) - } + def isLeader: Boolean = latestGossip.isLeader(selfAddress) /** * Get the address of the current leader. */ - def leader: Address = latestGossip.members.head.address + def leader: Address = latestGossip.leader match { + case Some(x) ⇒ x + case None ⇒ throw new IllegalStateException("There is no leader in this cluster") + } /** * Is this node a singleton cluster? @@ -862,7 +965,7 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) if (!state.compareAndSet(localState, newState)) joining(node) // recur if we failed update else { - log.info("Cluster Node [{}] - Node [{}] is JOINING", selfAddress, node) + log.debug("Cluster Node [{}] - Node [{}] is JOINING", selfAddress, node) // treat join as initial heartbeat, so that it becomes unavailable if nothing more happens if (node != selfAddress) { failureDetector heartbeat node @@ -986,74 +1089,164 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) // Can be removed when gossip has been optimized private val _receivedGossipCount = new AtomicLong + /** * INTERNAL API. */ private[cluster] def receivedGossipCount: Long = _receivedGossipCount.get + /** + * INTERNAL API. + */ + private[cluster] def mergeCount: Long = _mergeCount.get + + // Can be removed when gossip has been optimized + private val _mergeCount = new AtomicLong + + /** + * INTERNAL API. + */ + private[cluster] def mergeDetectedCount: Long = _mergeDetectedCount.get + + // Can be removed when gossip has been optimized + private val _mergeDetectedCount = new AtomicLong + + private val _mergeConflictCount = new AtomicLong + private def mergeRate(count: Long): Double = (count * 1000.0) / GossipInterval.toMillis + + /** + * INTERNAL API. + * + * When conflicting versions of received and local [[akka.cluster.Gossip]] is detected + * it's forwarded to the leader for conflict resolution. Trying to simultaneously + * resolving conflicts at several nodes creates new conflicts. Therefore the leader resolves + * conflicts to limit divergence. To avoid overload there is also a configurable rate + * limit of how many conflicts that are handled by second. If the limit is + * exceeded the conflicting gossip messages are dropped and will reappear later. + */ + private[cluster] def receiveGossipMerge(merge: GossipMergeConflict): Unit = { + val count = _mergeConflictCount.incrementAndGet + val rate = mergeRate(count) + if (rate <= MaxGossipMergeRate) { + receiveGossip(merge.a.copy(conversation = false)) + receiveGossip(merge.b.copy(conversation = false)) + + // use one-way gossip from leader to reduce load of leader + def sendBack(to: Address): Unit = { + if (to != selfAddress && !latestGossip.overview.unreachable.exists(_.address == to)) + oneWayGossipTo(to) + } + + sendBack(merge.a.from) + sendBack(merge.b.from) + + } else { + log.debug("Dropping gossip merge conflict due to rate [{}] / s ", rate) + } + } + /** * INTERNAL API. * * Receive new gossip. */ @tailrec - final private[cluster] def receiveGossip(from: Address, remoteGossip: Gossip): Unit = { + final private[cluster] def receiveGossip(envelope: GossipEnvelope): Unit = { + val from = envelope.from + val remoteGossip = envelope.gossip val localState = state.get val localGossip = localState.latestGossip - if (!localGossip.overview.isNonDownUnreachable(from)) { + if (remoteGossip.overview.unreachable.exists(_.address == selfAddress)) { + // FIXME how should we handle this situation? + log.debug("Received gossip with self as unreachable, from [{}]", from) - val winningGossip = - if (remoteGossip.version <> localGossip.version) { - // concurrent - val mergedGossip = remoteGossip merge localGossip - val versionedMergedGossip = mergedGossip :+ vclockNode + } else if (!localGossip.overview.isNonDownUnreachable(from)) { - versionedMergedGossip + // leader handles merge conflicts, or when they have different views of how is leader + val handleMerge = localGossip.leader == Some(selfAddress) || localGossip.leader != remoteGossip.leader + val conflict = remoteGossip.version <> localGossip.version - } else if (remoteGossip.version < localGossip.version) { - // local gossip is newer - localGossip + if (conflict && !handleMerge) { + // delegate merge resolution to leader to reduce number of simultaneous resolves, + // which will result in new conflicts + log.debug("Merge conflict [{}] detected [{}] <> [{}]", _mergeDetectedCount.incrementAndGet, selfAddress, from) + + val count = _mergeConflictCount.incrementAndGet + val rate = mergeRate(count) + if (rate <= MaxGossipMergeRate) { + val leaderConnection = clusterGossipConnectionFor(localGossip.leader.get) + leaderConnection ! GossipMergeConflict(GossipEnvelope(selfAddress, localGossip), envelope) } else { - // remote gossip is newer - remoteGossip + log.debug("Skipping gossip merge conflict due to rate [{}] / s ", rate) } - val newJoinInProgress = - if (localState.joinInProgress.isEmpty) localState.joinInProgress - else localState.joinInProgress -- - winningGossip.members.map(_.address) -- - winningGossip.overview.unreachable.map(_.address) + } else { - val newState = localState copy ( - latestGossip = winningGossip seen selfAddress, - joinInProgress = newJoinInProgress) + val winningGossip = - // for all new joining nodes we optimistically remove them from the failure detector, since if we wait until - // we have won the CAS, then the node might be picked up by the reapUnreachableMembers task and moved to - // unreachable before we can remove the node from the failure detector - (newState.latestGossip.members -- localState.latestGossip.members).filter(_.status == Joining).foreach { - case node ⇒ failureDetector.remove(node.address) - } + if (conflict) { + // conflicting versions, merge, and new version + val mergedGossip = remoteGossip merge localGossip + mergedGossip :+ vclockNode - // if we won the race then update else try again - if (!state.compareAndSet(localState, newState)) receiveGossip(from, remoteGossip) // recur if we fail the update - else { - log.debug("Cluster Node [{}] - Receiving gossip from [{}]", selfAddress, from) + } else if (remoteGossip.version < localGossip.version) { + // local gossip is newer + localGossip - if ((winningGossip ne localGossip) && (winningGossip ne remoteGossip)) - log.debug( - """Couldn't establish a causal relationship between "remote" gossip and "local" gossip - Remote[{}] - Local[{}] - merged them into [{}]""", - remoteGossip, localGossip, winningGossip) + } else if (!remoteGossip.members.exists(_.address == selfAddress)) { + // FIXME This is a very strange. It can happen when many nodes join at the same time. + // It's not detected as an ordinary version conflict <> + // If we don't handle this situation there will be IllegalArgumentException when marking this as seen + // merge, and new version + val mergedGossip = remoteGossip merge (localGossip :+ Member(selfAddress, Joining)) + mergedGossip :+ vclockNode - _receivedGossipCount.incrementAndGet() - notifyMembershipChangeListeners(localState, newState) + } else { + // remote gossip is newer + remoteGossip - if ((winningGossip ne remoteGossip) || (newState.latestGossip ne remoteGossip)) { - // send back gossip to sender when sender had different view, i.e. merge, or sender had - // older or sender had newer - gossipTo(from) + } + + val newJoinInProgress = + if (localState.joinInProgress.isEmpty) localState.joinInProgress + else localState.joinInProgress -- + winningGossip.members.map(_.address) -- + winningGossip.overview.unreachable.map(_.address) + + val newState = localState copy ( + latestGossip = winningGossip seen selfAddress, + joinInProgress = newJoinInProgress) + + // for all new joining nodes we optimistically remove them from the failure detector, since if we wait until + // we have won the CAS, then the node might be picked up by the reapUnreachableMembers task and moved to + // unreachable before we can remove the node from the failure detector + (newState.latestGossip.members -- localState.latestGossip.members).filter(_.status == Joining).foreach { + case node ⇒ failureDetector.remove(node.address) + } + + // if we won the race then update else try again + if (!state.compareAndSet(localState, newState)) receiveGossip(envelope) // recur if we fail the update + else { + log.debug("Cluster Node [{}] - Receiving gossip from [{}]", selfAddress, from) + + if (conflict) { + _mergeCount.incrementAndGet + log.debug( + """Couldn't establish a causal relationship between "remote" gossip and "local" gossip - Remote[{}] - Local[{}] - merged them into [{}]""", + remoteGossip, localGossip, winningGossip) + } + + _receivedGossipCount.incrementAndGet() + notifyMembershipChangeListeners(localState, newState) + + if (envelope.conversation && + (conflict || (winningGossip ne remoteGossip) || (newState.latestGossip ne remoteGossip))) { + // send back gossip to sender when sender had different view, i.e. merge, or sender had + // older or sender had newer + gossipTo(from) + } } } } @@ -1074,10 +1267,19 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) * * Gossips latest gossip to an address. */ - private[cluster] def gossipTo(address: Address): Unit = { + private[cluster] def gossipTo(address: Address): Unit = + gossipTo(address, GossipEnvelope(selfAddress, latestGossip, conversation = true)) + + /** + * INTERNAL API. + */ + private[cluster] def oneWayGossipTo(address: Address): Unit = + gossipTo(address, GossipEnvelope(selfAddress, latestGossip, conversation = false)) + + private def gossipTo(address: Address, gossipMsg: GossipEnvelope): Unit = if (address != selfAddress) { val connection = clusterGossipConnectionFor(address) log.debug("Cluster Node [{}] - Gossiping to [{}]", selfAddress, connection) - connection ! GossipEnvelope(selfAddress, latestGossip) + connection ! gossipMsg } /** @@ -1112,6 +1314,7 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) */ private[cluster] def gossip(): Unit = { val localState = state.get + _mergeConflictCount.set(0) log.debug("Cluster Node [{}] - Initiating new round of gossip", selfAddress) @@ -1161,11 +1364,9 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) val beatTo = localState.latestGossip.members.toSeq.map(_.address) ++ localState.joinInProgress.keys - for (address ← beatTo; if address != selfAddress) { - val connection = clusterHeartbeatConnectionFor(address) - log.debug("Cluster Node [{}] - Heartbeat to [{}]", selfAddress, connection) - connection ! selfHeartbeat - } + val deadline = Deadline.now + HeartbeatInterval + for (address ← beatTo; if address != selfAddress) + clusterHeartbeatSender ! SendHeartbeat(selfHeartbeat, address, deadline) } /** @@ -1187,7 +1388,7 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) val newlyDetectedUnreachableMembers = localMembers filterNot { member ⇒ failureDetector.isAvailable(member.address) } - if (newlyDetectedUnreachableMembers.nonEmpty) { // we have newly detected members marked as unavailable + if (newlyDetectedUnreachableMembers.nonEmpty) { val newMembers = localMembers -- newlyDetectedUnreachableMembers val newUnreachableMembers = localUnreachableMembers ++ newlyDetectedUnreachableMembers @@ -1204,7 +1405,7 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) // if we won the race then update else try again if (!state.compareAndSet(localState, newState)) reapUnreachableMembers() // recur else { - log.info("Cluster Node [{}] - Marking node(s) as UNREACHABLE [{}]", selfAddress, newlyDetectedUnreachableMembers.mkString(", ")) + log.error("Cluster Node [{}] - Marking node(s) as UNREACHABLE [{}]", selfAddress, newlyDetectedUnreachableMembers.mkString(", ")) notifyMembershipChangeListeners(localState, newState) } @@ -1484,10 +1685,7 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) */ private def clusterGossipConnectionFor(address: Address): ActorRef = system.actorFor(RootActorPath(address) / "system" / "cluster" / "gossip") - /** - * Looks up and returns the remote cluster heartbeat connection for the specific address. - */ - private def clusterHeartbeatConnectionFor(address: Address): ActorRef = system.actorFor(RootActorPath(address) / "system" / "cluster" / "heartbeat") + private def clusterHeartbeatSender: ActorRef = system.actorFor(clusterDaemons.path / "heartbeatSender") /** * Gets the addresses of a all the 'deputy' nodes - excluding this node if part of the group. diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala index 2a63f32e83..e9d95de446 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala @@ -42,6 +42,13 @@ class ClusterSettings(val config: Config, val systemName: String) { case id ⇒ id } final val GossipDifferentViewProbability: Double = getDouble("akka.cluster.gossip-different-view-probability") + final val MaxGossipMergeRate: Double = getDouble("akka.cluster.max-gossip-merge-rate") final val SchedulerTickDuration: Duration = Duration(getMilliseconds("akka.cluster.scheduler.tick-duration"), MILLISECONDS) final val SchedulerTicksPerWheel: Int = getInt("akka.cluster.scheduler.ticks-per-wheel") + final val SendCircuitBreakerSettings: CircuitBreakerSettings = CircuitBreakerSettings( + maxFailures = getInt("akka.cluster.send-circuit-breaker.max-failures"), + callTimeout = Duration(getMilliseconds("akka.cluster.send-circuit-breaker.call-timeout"), MILLISECONDS), + resetTimeout = Duration(getMilliseconds("akka.cluster.send-circuit-breaker.reset-timeout"), MILLISECONDS)) } + +case class CircuitBreakerSettings(maxFailures: Int, callTimeout: Duration, resetTimeout: Duration) \ No newline at end of file diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/LargeClusterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/LargeClusterSpec.scala index e3dc7719c1..014983426f 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/LargeClusterSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/LargeClusterSpec.scala @@ -36,12 +36,21 @@ object LargeClusterMultiJvmSpec extends MultiNodeConfig { akka.cluster { gossip-interval = 500 ms auto-join = off - failure-detector.threshold = 4 + nr-of-gossip-daemons = 2 + failure-detector.acceptable-heartbeat-pause = 10s } akka.loglevel = INFO - akka.actor.default-dispatcher.fork-join-executor.parallelism-max = 2 + akka.actor.default-dispatcher.fork-join-executor { + # when using nodes-per-datacenter=10 we need some extra + # threads to keep up with netty connect blocking + parallelism-min = 13 + parallelism-max = 13 + } akka.scheduler.tick-duration = 33 ms - akka.remote.netty.execution-pool-size = 0 + akka.remote.netty.execution-pool-size = 4 + #akka.remote.netty.reconnection-time-window = 1s + akka.remote.netty.backoff-timeout = 500ms + akka.remote.netty.connection-timeout = 500ms # don't use testconductor transport in this test, especially not # when using use-dispatcher-for-io @@ -244,12 +253,11 @@ abstract class LargeClusterSpec } } - // FIXME sometimes this fails, FD marks nodes from other than second-datacenter as unavailable - "detect failure and auto-down crashed nodes in second-datacenter" taggedAs LongRunningTest ignore { + "detect failure and auto-down crashed nodes in second-datacenter" taggedAs LongRunningTest in { val unreachableNodes = nodesPerDatacenter val liveNodes = nodesPerDatacenter * 4 - within(20.seconds + expectedMaxDuration(liveNodes)) { + within(30.seconds + (3.seconds * liveNodes)) { val startGossipCounts = Map.empty[Cluster, Long] ++ systems.map(sys ⇒ (Cluster(sys) -> Cluster(sys).receivedGossipCount)) def gossipCount(c: Cluster): Long = c.receivedGossipCount - startGossipCounts(c) @@ -278,10 +286,11 @@ abstract class LargeClusterSpec runOn(firstDatacenter, thirdDatacenter, fourthDatacenter, fifthDatacenter) { Await.ready(latch, remaining) awaitCond(systems.forall(Cluster(_).convergence.isDefined)) + val mergeCount = systems.map(sys ⇒ Cluster(sys).mergeCount).sum val counts = systems.map(sys ⇒ gossipCount(Cluster(sys))) val formattedStats = "mean=%s min=%s max=%s".format(counts.sum / nodesPerDatacenter, counts.min, counts.max) - log.info("Convergence of [{}] nodes reached after failure, it took [{}], received [{}] gossip messages per node", - liveNodes, tookMillis, formattedStats) + log.info("Convergence of [{}] nodes reached after failure, it took [{}], received [{}] gossip messages per node, merged [{}] times", + liveNodes, tookMillis, formattedStats, mergeCount) } enterBarrier("after-6") diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala index d146e22982..88f04b9d7d 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala @@ -35,8 +35,13 @@ class ClusterConfigSpec extends AkkaSpec { AutoDown must be(true) UseDispatcher must be(Dispatchers.DefaultDispatcherId) GossipDifferentViewProbability must be(0.8 plusOrMinus 0.0001) + MaxGossipMergeRate must be(5.0 plusOrMinus 0.0001) SchedulerTickDuration must be(33 millis) SchedulerTicksPerWheel must be(512) + SendCircuitBreakerSettings must be(CircuitBreakerSettings( + maxFailures = 3, + callTimeout = 2 seconds, + resetTimeout = 30 seconds)) } } }