diff --git a/akka-cluster/src/main/resources/reference.conf b/akka-cluster/src/main/resources/reference.conf
index 613f6320d8..14536ae8b4 100644
--- a/akka-cluster/src/main/resources/reference.conf
+++ b/akka-cluster/src/main/resources/reference.conf
@@ -61,6 +61,10 @@ akka {
     # Probability value is between 0.0 and 1.0. 0.0 means never, 1.0 means always.
     gossip-different-view-probability = 0.8
 
+    # Limit number of merge conflicts per second that are handled. If the limit is
+    # exceeded the conflicting gossip messages are dropped and will reappear later.
+    max-gossip-merge-rate = 5.0
+
     failure-detector {
 
       # defines the failure detector threshold
@@ -97,5 +101,13 @@ akka {
      tick-duration = 33ms
      ticks-per-wheel = 512
    }
+
+    # Netty blocks when sending to broken connections, and this circuit breaker
+    # is used to reduce connect attempts to broken connections.
+    send-circuit-breaker {
+      max-failures = 3
+      call-timeout = 2 s
+      reset-timeout = 30 s
+    }
   }
 }
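As a quick reference, a minimal sketch of reading the two new settings with Typesafe Config, in the same getDouble/getMilliseconds style that ClusterSettings uses further down in this patch (the ConfigFactory.load() call and the value names are illustrative, not part of the change):

    import com.typesafe.config.ConfigFactory
    import java.util.concurrent.TimeUnit.MILLISECONDS
    import scala.concurrent.duration.Duration

    val config = ConfigFactory.load()   // picks up the reference.conf defaults above
    val maxGossipMergeRate = config.getDouble("akka.cluster.max-gossip-merge-rate")
    val cbCallTimeout = Duration(config.getMilliseconds("akka.cluster.send-circuit-breaker.call-timeout"), MILLISECONDS)
    val cbResetTimeout = Duration(config.getMilliseconds("akka.cluster.send-circuit-breaker.reset-timeout"), MILLISECONDS)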
diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala
index b1bf73eddb..0b856edd17 100644
--- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala
@@ -29,6 +29,7 @@ import scala.annotation.tailrec
 import scala.collection.immutable.{ Map, SortedSet }
 import scala.collection.GenTraversableOnce
 import java.util.concurrent.atomic.AtomicLong
+import java.security.MessageDigest
 
 /**
  * Interface for membership change listener.
@@ -200,7 +201,13 @@ object Member {
 /**
  * Envelope adding a sender address to the gossip.
  */
-case class GossipEnvelope(from: Address, gossip: Gossip) extends ClusterMessage
+case class GossipEnvelope(from: Address, gossip: Gossip, conversation: Boolean = true) extends ClusterMessage
+
+/**
+ * When conflicting versions of received and local [[akka.cluster.Gossip]] are detected
+ * they are forwarded to the leader for conflict resolution.
+ */
+case class GossipMergeConflict(a: GossipEnvelope, b: GossipEnvelope) extends ClusterMessage
 
 /**
  * Defines the current status of a cluster member node
@@ -354,6 +361,11 @@ case class Gossip(
     Gossip(GossipOverview(mergedSeen, mergedUnreachable), mergedMembers, mergedMeta, mergedVClock)
   }
 
+  def isLeader(address: Address): Boolean =
+    members.nonEmpty && (address == members.head.address)
+
+  def leader: Option[Address] = members.headOption.map(_.address)
+
   override def toString =
     "Gossip(" +
       "overview = " + overview +
@@ -368,6 +380,15 @@ case class Gossip(
  */
 case class Heartbeat(from: Address) extends ClusterMessage
 
+/**
+ * INTERNAL API.
+ *
+ * Command to [[akka.cluster.ClusterHeartbeatSender]], which will send [[akka.cluster.Heartbeat]]
+ * to the other node.
+ * Local only, no need to serialize.
+ */
+private[cluster] case class SendHeartbeat(heartbeatMsg: Heartbeat, to: Address, deadline: Deadline)
+
 /**
  * INTERNAL API.
  *
@@ -423,7 +444,8 @@ private[cluster] final class ClusterCommandDaemon(cluster: Cluster) extends Acto
 private[cluster] final class ClusterGossipDaemon(cluster: Cluster) extends Actor with ActorLogging {
 
   def receive = {
-    case GossipEnvelope(from, gossip) ⇒ cluster.receiveGossip(from, gossip)
+    case msg: GossipEnvelope      ⇒ cluster.receiveGossip(msg)
+    case msg: GossipMergeConflict ⇒ cluster.receiveGossipMerge(msg)
   }
 
   override def unhandled(unknown: Any) = log.error("[{}] can not respond to messages - received [{}]",
@@ -447,6 +469,85 @@ private[cluster] final class ClusterHeartbeatDaemon(cluster: Cluster) extends Ac
     self.path, unknown)
 }
 
+/*
+ * This actor is responsible for sending the heartbeat messages to
+ * other nodes. Netty blocks when sending to broken connections. This actor
+ * isolates sending to different nodes by using child workers for each target
+ * address and thereby reduces the risk of irregular heartbeats to healthy
+ * nodes due to broken connections to other nodes.
+ */
+private[cluster] final class ClusterHeartbeatSender(cluster: Cluster) extends Actor with ActorLogging {
+
+  /**
+   * Looks up and returns the remote cluster heartbeat connection for the specific address.
+   */
+  def clusterHeartbeatConnectionFor(address: Address): ActorRef =
+    context.system.actorFor(RootActorPath(address) / "system" / "cluster" / "heartbeat")
+
+  val digester = MessageDigest.getInstance("MD5")
+
+  /**
+   * Child name is the MD5 hash of the address.
+   */
+  def hash(name: String): String = {
+    digester update name.getBytes("UTF-8")
+    digester.digest.map { h ⇒ "%02x".format(0xFF & h) }.mkString
+  }
+
+  def receive = {
+    case msg @ SendHeartbeat(from, to, deadline) ⇒
+      val workerName = hash(to.toString)
+      val worker = context.actorFor(workerName) match {
+        case notFound if notFound.isTerminated ⇒
+          context.actorOf(Props(new ClusterHeartbeatSenderWorker(
+            cluster.settings.SendCircuitBreakerSettings, clusterHeartbeatConnectionFor(to))), workerName)
+        case child ⇒ child
+      }
+      worker ! msg
+  }
+
+}
+
+/**
+ * Responsible for sending [[akka.cluster.Heartbeat]] to one specific address.
+ *
+ * Netty blocks when sending to broken connections, and this actor uses
+ * a configurable circuit breaker to reduce connect attempts to broken
+ * connections.
+ *
+ * @see ClusterHeartbeatSender
+ */
+private[cluster] final class ClusterHeartbeatSenderWorker(
+  cbSettings: CircuitBreakerSettings, toRef: ActorRef)
+  extends Actor with ActorLogging {
+
+  val breaker = CircuitBreaker(context.system.scheduler,
+    cbSettings.maxFailures, cbSettings.callTimeout, cbSettings.resetTimeout).
+    onHalfOpen(log.debug("CircuitBreaker Half-Open for: [{}]", toRef)).
+    onOpen(log.debug("CircuitBreaker Open for [{}]", toRef)).
+    onClose(log.debug("CircuitBreaker Closed for [{}]", toRef))
+
+  context.setReceiveTimeout(30 seconds)
+
+  def receive = {
+    case SendHeartbeat(heartbeatMsg, _, deadline) ⇒
+      if (!deadline.isOverdue) {
+        // the CircuitBreaker will measure elapsed time and open if too many long calls
+        try breaker.withSyncCircuitBreaker {
+          log.debug("Cluster Node [{}] - Heartbeat to [{}]", heartbeatMsg.from, toRef)
+          toRef ! heartbeatMsg
+          if (deadline.isOverdue) log.debug("Sending heartbeat to [{}] took longer than expected", toRef)
+        } catch { case e: CircuitBreakerOpenException ⇒ /* skip sending heartbeat to broken connection */ }
+
+        // make sure it will clean up when not used any more
+        context.setReceiveTimeout(30 seconds)
+      }
+
+    case ReceiveTimeout ⇒ context.stop(self) // clean up when not used
+
+  }
+}
+
 /**
  * INTERNAL API.
  *
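As an aside, the per-target worker name used above is simply the MD5 of the target address rendered as lowercase hex. A self-contained sketch of that transformation (the sample address is invented, and a fresh MessageDigest is created instead of reusing the actor's field):

    import java.security.MessageDigest

    // same bytes -> MD5 -> hex mapping as ClusterHeartbeatSender.hash
    def md5Hex(name: String): String =
      MessageDigest.getInstance("MD5").digest(name.getBytes("UTF-8")).map(h ⇒ "%02x".format(0xFF & h)).mkString

    val workerName = md5Hex("akka://MySystem@host2:2552")   // hypothetical target address
    // the worker then lives under /system/cluster/heartbeatSender/<workerName>, while the
    // remote end is resolved at <address>/system/cluster/heartbeat as in clusterHeartbeatConnectionFor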
@@ -455,14 +556,16 @@ private[cluster] final class ClusterHeartbeatDaemon(cluster: Cluster) extends Ac
 private[cluster] final class ClusterDaemonSupervisor(cluster: Cluster) extends Actor with ActorLogging {
 
   val configuredDispatcher = cluster.settings.UseDispatcher
 
-  private val commands = context.actorOf(Props(new ClusterCommandDaemon(cluster)).
+  val commands = context.actorOf(Props(new ClusterCommandDaemon(cluster)).
     withDispatcher(configuredDispatcher), name = "commands")
-  private val gossip = context.actorOf(Props(new ClusterGossipDaemon(cluster)).
+  val gossip = context.actorOf(Props(new ClusterGossipDaemon(cluster)).
     withDispatcher(configuredDispatcher).
     withRouter(RoundRobinRouter(cluster.settings.NrOfGossipDaemons)), name = "gossip")
-  private val heartbeat = context.actorOf(Props(new ClusterHeartbeatDaemon(cluster)).
+  val heartbeat = context.actorOf(Props(new ClusterHeartbeatDaemon(cluster)).
     withDispatcher(configuredDispatcher), name = "heartbeat")
+  val heartbeatSender = context.actorOf(Props(new ClusterHeartbeatSender(cluster)).
+    withDispatcher(configuredDispatcher), name = "heartbeatSender")
 
   def receive = Actor.emptyBehavior
 
@@ -699,15 +802,15 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
   /**
    * Is this node the leader?
    */
-  def isLeader: Boolean = {
-    val members = latestGossip.members
-    members.nonEmpty && (selfAddress == members.head.address)
-  }
+  def isLeader: Boolean = latestGossip.isLeader(selfAddress)
 
   /**
    * Get the address of the current leader.
    */
-  def leader: Address = latestGossip.members.head.address
+  def leader: Address = latestGossip.leader match {
+    case Some(x) ⇒ x
+    case None    ⇒ throw new IllegalStateException("There is no leader in this cluster")
+  }
 
   /**
    * Is this node a singleton cluster?
@@ -862,7 +965,7 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
       if (!state.compareAndSet(localState, newState)) joining(node) // recur if we failed update
       else {
-        log.info("Cluster Node [{}] - Node [{}] is JOINING", selfAddress, node)
+        log.debug("Cluster Node [{}] - Node [{}] is JOINING", selfAddress, node)
 
         // treat join as initial heartbeat, so that it becomes unavailable if nothing more happens
         if (node != selfAddress) {
           failureDetector heartbeat node
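The leader referred to above is simply the first member of the sorted member ring, which is what the new Gossip.isLeader/leader helpers encode. A rough illustration, assuming the implicit address-based Member ordering that Gossip's SortedSet relies on (addresses and statuses here are invented):

    import akka.actor.Address
    import akka.cluster.{ Member, MemberStatus }
    import scala.collection.immutable.SortedSet

    val m1 = Member(Address("akka", "MySystem", "host-a", 2551), MemberStatus.Up)
    val m2 = Member(Address("akka", "MySystem", "host-b", 2551), MemberStatus.Up)
    val members = SortedSet(m2, m1)   // ordered by address, so m1 comes first (ordering assumed in scope)

    val leader: Option[Address] = members.headOption.map(_.address)   // Some(host-a address)
    val isLeader = leader == Some(m1.address)                         // true on the host-a node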
@@ -986,74 +1089,164 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
   // Can be removed when gossip has been optimized
   private val _receivedGossipCount = new AtomicLong
+
   /**
    * INTERNAL API.
    */
   private[cluster] def receivedGossipCount: Long = _receivedGossipCount.get
 
+  /**
+   * INTERNAL API.
+   */
+  private[cluster] def mergeCount: Long = _mergeCount.get
+
+  // Can be removed when gossip has been optimized
+  private val _mergeCount = new AtomicLong
+
+  /**
+   * INTERNAL API.
+   */
+  private[cluster] def mergeDetectedCount: Long = _mergeDetectedCount.get
+
+  // Can be removed when gossip has been optimized
+  private val _mergeDetectedCount = new AtomicLong
+
+  private val _mergeConflictCount = new AtomicLong
+
+  private def mergeRate(count: Long): Double = (count * 1000.0) / GossipInterval.toMillis
+
+  /**
+   * INTERNAL API.
+   *
+   * When conflicting versions of received and local [[akka.cluster.Gossip]] are detected
+   * they are forwarded to the leader for conflict resolution. Trying to resolve conflicts
+   * simultaneously at several nodes creates new conflicts. Therefore the leader resolves
+   * conflicts to limit divergence. To avoid overload there is also a configurable rate
+   * limit of how many conflicts are handled per second. If the limit is
+   * exceeded the conflicting gossip messages are dropped and will reappear later.
+   */
+  private[cluster] def receiveGossipMerge(merge: GossipMergeConflict): Unit = {
+    val count = _mergeConflictCount.incrementAndGet
+    val rate = mergeRate(count)
+    if (rate <= MaxGossipMergeRate) {
+      receiveGossip(merge.a.copy(conversation = false))
+      receiveGossip(merge.b.copy(conversation = false))
+
+      // use one-way gossip from the leader to reduce the load on the leader
+      def sendBack(to: Address): Unit = {
+        if (to != selfAddress && !latestGossip.overview.unreachable.exists(_.address == to))
+          oneWayGossipTo(to)
+      }
+
+      sendBack(merge.a.from)
+      sendBack(merge.b.from)
+
+    } else {
+      log.debug("Dropping gossip merge conflict due to rate [{}] / s ", rate)
+    }
+  }
+
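To make the rate limit concrete: _mergeConflictCount is reset at the start of each gossip round (see the gossip() change further down), so mergeRate turns a per-round count into conflicts per second. A worked sketch, assuming the default gossip-interval of 1 s and the new max-gossip-merge-rate = 5.0:

    import scala.concurrent.duration._

    val gossipInterval = 1.second    // assumed default; LargeClusterSpec below overrides it to 500 ms
    val maxGossipMergeRate = 5.0

    def mergeRate(count: Long): Double = (count * 1000.0) / gossipInterval.toMillis

    mergeRate(5)   // 5.0 -> still handled (<= limit)
    mergeRate(6)   // 6.0 -> dropped; the conflicting gossip simply shows up again in a later round
    // with gossip-interval = 500 ms the same counts give 10.0 and 12.0, so only the first 2 conflicts per round pass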
   /**
    * INTERNAL API.
    *
    * Receive new gossip.
    */
   @tailrec
-  final private[cluster] def receiveGossip(from: Address, remoteGossip: Gossip): Unit = {
+  final private[cluster] def receiveGossip(envelope: GossipEnvelope): Unit = {
+    val from = envelope.from
+    val remoteGossip = envelope.gossip
     val localState = state.get
     val localGossip = localState.latestGossip
 
-    if (!localGossip.overview.isNonDownUnreachable(from)) {
+    if (remoteGossip.overview.unreachable.exists(_.address == selfAddress)) {
+      // FIXME how should we handle this situation?
+      log.debug("Received gossip with self as unreachable, from [{}]", from)
 
-      val winningGossip =
-        if (remoteGossip.version <> localGossip.version) {
-          // concurrent
-          val mergedGossip = remoteGossip merge localGossip
-          val versionedMergedGossip = mergedGossip :+ vclockNode
+    } else if (!localGossip.overview.isNonDownUnreachable(from)) {
 
-          versionedMergedGossip
+      // the leader handles merge conflicts, or both sides do when they have different views of who the leader is
+      val handleMerge = localGossip.leader == Some(selfAddress) || localGossip.leader != remoteGossip.leader
+      val conflict = remoteGossip.version <> localGossip.version
 
-        } else if (remoteGossip.version < localGossip.version) {
-          // local gossip is newer
-          localGossip
+      if (conflict && !handleMerge) {
+        // delegate merge resolution to the leader to reduce the number of simultaneous resolves,
+        // which would otherwise result in new conflicts
+        log.debug("Merge conflict [{}] detected [{}] <> [{}]", _mergeDetectedCount.incrementAndGet, selfAddress, from)
+
+        val count = _mergeConflictCount.incrementAndGet
+        val rate = mergeRate(count)
+        if (rate <= MaxGossipMergeRate) {
+          val leaderConnection = clusterGossipConnectionFor(localGossip.leader.get)
+          leaderConnection ! GossipMergeConflict(GossipEnvelope(selfAddress, localGossip), envelope)
         } else {
-          // remote gossip is newer
-          remoteGossip
+          log.debug("Skipping gossip merge conflict due to rate [{}] / s ", rate)
         }
 
-      val newJoinInProgress =
-        if (localState.joinInProgress.isEmpty) localState.joinInProgress
-        else localState.joinInProgress --
-          winningGossip.members.map(_.address) --
-          winningGossip.overview.unreachable.map(_.address)
+      } else {
 
-      val newState = localState copy (
-        latestGossip = winningGossip seen selfAddress,
-        joinInProgress = newJoinInProgress)
+        val winningGossip =
 
-      // for all new joining nodes we optimistically remove them from the failure detector, since if we wait until
-      // we have won the CAS, then the node might be picked up by the reapUnreachableMembers task and moved to
-      // unreachable before we can remove the node from the failure detector
-      (newState.latestGossip.members -- localState.latestGossip.members).filter(_.status == Joining).foreach {
-        case node ⇒ failureDetector.remove(node.address)
-      }
+          if (conflict) {
+            // conflicting versions, merge, and new version
+            val mergedGossip = remoteGossip merge localGossip
+            mergedGossip :+ vclockNode
 
-      // if we won the race then update else try again
-      if (!state.compareAndSet(localState, newState)) receiveGossip(from, remoteGossip) // recur if we fail the update
-      else {
-        log.debug("Cluster Node [{}] - Receiving gossip from [{}]", selfAddress, from)
+          } else if (remoteGossip.version < localGossip.version) {
+            // local gossip is newer
+            localGossip
 
-        if ((winningGossip ne localGossip) && (winningGossip ne remoteGossip))
-          log.debug(
-            """Couldn't establish a causal relationship between "remote" gossip and "local" gossip - Remote[{}] - Local[{}] - merged them into [{}]""",
-            remoteGossip, localGossip, winningGossip)
+          } else if (!remoteGossip.members.exists(_.address == selfAddress)) {
+            // FIXME This is very strange. It can happen when many nodes join at the same time.
+            // It's not detected as an ordinary version conflict <>
+            // If we don't handle this situation there will be an IllegalArgumentException when marking this as seen
+            // merge, and new version
+            val mergedGossip = remoteGossip merge (localGossip :+ Member(selfAddress, Joining))
+            mergedGossip :+ vclockNode
 
-        _receivedGossipCount.incrementAndGet()
-        notifyMembershipChangeListeners(localState, newState)
+          } else {
+            // remote gossip is newer
+            remoteGossip
 
-        if ((winningGossip ne remoteGossip) || (newState.latestGossip ne remoteGossip)) {
-          // send back gossip to sender when sender had different view, i.e. merge, or sender had
-          // older or sender had newer
-          gossipTo(from)
+          }
+
+        val newJoinInProgress =
+          if (localState.joinInProgress.isEmpty) localState.joinInProgress
+          else localState.joinInProgress --
+            winningGossip.members.map(_.address) --
+            winningGossip.overview.unreachable.map(_.address)
+
+        val newState = localState copy (
+          latestGossip = winningGossip seen selfAddress,
+          joinInProgress = newJoinInProgress)
+
+        // for all new joining nodes we optimistically remove them from the failure detector, since if we wait until
+        // we have won the CAS, then the node might be picked up by the reapUnreachableMembers task and moved to
+        // unreachable before we can remove the node from the failure detector
+        (newState.latestGossip.members -- localState.latestGossip.members).filter(_.status == Joining).foreach {
+          case node ⇒ failureDetector.remove(node.address)
+        }
+
+        // if we won the race then update else try again
+        if (!state.compareAndSet(localState, newState)) receiveGossip(envelope) // recur if we fail the update
+        else {
+          log.debug("Cluster Node [{}] - Receiving gossip from [{}]", selfAddress, from)
+
+          if (conflict) {
+            _mergeCount.incrementAndGet
+            log.debug(
+              """Couldn't establish a causal relationship between "remote" gossip and "local" gossip - Remote[{}] - Local[{}] - merged them into [{}]""",
+              remoteGossip, localGossip, winningGossip)
+          }
+
+          _receivedGossipCount.incrementAndGet()
+          notifyMembershipChangeListeners(localState, newState)
+
+          if (envelope.conversation &&
+            (conflict || (winningGossip ne remoteGossip) || (newState.latestGossip ne remoteGossip))) {
+            // send back gossip to sender when sender had different view, i.e. merge, or sender had
+            // older or sender had newer
+            gossipTo(from)
+          }
         }
       }
     }
@@ -1074,10 +1267,19 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
    *
    * Gossips latest gossip to an address.
    */
-  private[cluster] def gossipTo(address: Address): Unit = {
+  private[cluster] def gossipTo(address: Address): Unit =
+    gossipTo(address, GossipEnvelope(selfAddress, latestGossip, conversation = true))
+
+  /**
+   * INTERNAL API.
+   */
+  private[cluster] def oneWayGossipTo(address: Address): Unit =
+    gossipTo(address, GossipEnvelope(selfAddress, latestGossip, conversation = false))
+
+  private def gossipTo(address: Address, gossipMsg: GossipEnvelope): Unit =
     if (address != selfAddress) {
       val connection = clusterGossipConnectionFor(address)
       log.debug("Cluster Node [{}] - Gossiping to [{}]", selfAddress, connection)
-      connection ! GossipEnvelope(selfAddress, latestGossip)
+      connection ! gossipMsg
     }
 
   /**
@@ -1112,6 +1314,7 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
    */
   private[cluster] def gossip(): Unit = {
     val localState = state.get
+    _mergeConflictCount.set(0)
 
     log.debug("Cluster Node [{}] - Initiating new round of gossip", selfAddress)
 
@@ -1161,11 +1364,9 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
 
     val beatTo = localState.latestGossip.members.toSeq.map(_.address) ++ localState.joinInProgress.keys
 
-    for (address ← beatTo; if address != selfAddress) {
-      val connection = clusterHeartbeatConnectionFor(address)
-      log.debug("Cluster Node [{}] - Heartbeat to [{}]", selfAddress, connection)
-      connection ! selfHeartbeat
-    }
+    val deadline = Deadline.now + HeartbeatInterval
+    for (address ← beatTo; if address != selfAddress)
+      clusterHeartbeatSender ! SendHeartbeat(selfHeartbeat, address, deadline)
   }
 
   /**
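A short sketch of the Deadline attached to each SendHeartbeat above: one deadline is created per heartbeat round, and the worker checks it before (and after) the potentially blocking send, dropping heartbeats that are already stale (the interval value is assumed; the real code takes HeartbeatInterval from ClusterSettings):

    import scala.concurrent.duration._

    val heartbeatInterval = 1.second                 // assumed value
    val deadline = Deadline.now + heartbeatInterval

    val worthSending = !deadline.isOverdue           // stale heartbeats are skipped; the next round creates a fresh deadline
    // after a blocking send the same check only triggers a debug log ("took longer than expected")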
@@ -1187,7 +1388,7 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
 
     val newlyDetectedUnreachableMembers = localMembers filterNot { member ⇒ failureDetector.isAvailable(member.address) }
 
-    if (newlyDetectedUnreachableMembers.nonEmpty) { // we have newly detected members marked as unavailable
+    if (newlyDetectedUnreachableMembers.nonEmpty) {
 
       val newMembers = localMembers -- newlyDetectedUnreachableMembers
       val newUnreachableMembers = localUnreachableMembers ++ newlyDetectedUnreachableMembers
@@ -1204,7 +1405,7 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
       // if we won the race then update else try again
       if (!state.compareAndSet(localState, newState)) reapUnreachableMembers() // recur
       else {
-        log.info("Cluster Node [{}] - Marking node(s) as UNREACHABLE [{}]", selfAddress, newlyDetectedUnreachableMembers.mkString(", "))
+        log.error("Cluster Node [{}] - Marking node(s) as UNREACHABLE [{}]", selfAddress, newlyDetectedUnreachableMembers.mkString(", "))
 
         notifyMembershipChangeListeners(localState, newState)
       }
@@ -1484,10 +1685,7 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
    */
   private def clusterGossipConnectionFor(address: Address): ActorRef = system.actorFor(RootActorPath(address) / "system" / "cluster" / "gossip")
 
-  /**
-   * Looks up and returns the remote cluster heartbeat connection for the specific address.
-   */
-  private def clusterHeartbeatConnectionFor(address: Address): ActorRef = system.actorFor(RootActorPath(address) / "system" / "cluster" / "heartbeat")
+  private def clusterHeartbeatSender: ActorRef = system.actorFor(clusterDaemons.path / "heartbeatSender")
 
   /**
    * Gets the addresses of all the 'deputy' nodes - excluding this node if part of the group.
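Before the settings file below, a minimal sketch of the circuit-breaker behaviour the heartbeat worker relies on, mirroring its use of akka.pattern.CircuitBreaker with the defaults added to reference.conf (GuardedSender and its message handling are hypothetical stand-ins, not part of the patch):

    import akka.actor.{ Actor, ActorRef }
    import akka.pattern.{ CircuitBreaker, CircuitBreakerOpenException }
    import scala.concurrent.duration._

    class GuardedSender(target: ActorRef) extends Actor {
      // 3 failures/timeouts open the breaker; while open, calls fail fast with
      // CircuitBreakerOpenException; after 30 s a half-open trial call is allowed
      val breaker = CircuitBreaker(context.system.scheduler, 3, 2.seconds, 30.seconds).
        onOpen(println("open")).onHalfOpen(println("half-open")).onClose(println("closed"))

      def receive = {
        case msg ⇒
          try breaker.withSyncCircuitBreaker(target ! msg)
          catch { case e: CircuitBreakerOpenException ⇒ () // drop: the connection is considered broken
          }
      }
    }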
diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala
index 2a63f32e83..e9d95de446 100644
--- a/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala
@@ -42,6 +42,13 @@ class ClusterSettings(val config: Config, val systemName: String) {
     case id ⇒ id
   }
   final val GossipDifferentViewProbability: Double = getDouble("akka.cluster.gossip-different-view-probability")
+  final val MaxGossipMergeRate: Double = getDouble("akka.cluster.max-gossip-merge-rate")
   final val SchedulerTickDuration: Duration = Duration(getMilliseconds("akka.cluster.scheduler.tick-duration"), MILLISECONDS)
   final val SchedulerTicksPerWheel: Int = getInt("akka.cluster.scheduler.ticks-per-wheel")
+  final val SendCircuitBreakerSettings: CircuitBreakerSettings = CircuitBreakerSettings(
+    maxFailures = getInt("akka.cluster.send-circuit-breaker.max-failures"),
+    callTimeout = Duration(getMilliseconds("akka.cluster.send-circuit-breaker.call-timeout"), MILLISECONDS),
+    resetTimeout = Duration(getMilliseconds("akka.cluster.send-circuit-breaker.reset-timeout"), MILLISECONDS))
 }
+
+case class CircuitBreakerSettings(maxFailures: Int, callTimeout: Duration, resetTimeout: Duration)
\ No newline at end of file
diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/LargeClusterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/LargeClusterSpec.scala
index e3dc7719c1..014983426f 100644
--- a/akka-cluster/src/multi-jvm/scala/akka/cluster/LargeClusterSpec.scala
+++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/LargeClusterSpec.scala
@@ -36,12 +36,21 @@ object LargeClusterMultiJvmSpec extends MultiNodeConfig {
     akka.cluster {
       gossip-interval = 500 ms
       auto-join = off
-      failure-detector.threshold = 4
+      nr-of-gossip-daemons = 2
+      failure-detector.acceptable-heartbeat-pause = 10s
     }
     akka.loglevel = INFO
-    akka.actor.default-dispatcher.fork-join-executor.parallelism-max = 2
+    akka.actor.default-dispatcher.fork-join-executor {
+      # when using nodes-per-datacenter=10 we need some extra
+      # threads to keep up with netty connect blocking
+      parallelism-min = 13
+      parallelism-max = 13
+    }
     akka.scheduler.tick-duration = 33 ms
-    akka.remote.netty.execution-pool-size = 0
+    akka.remote.netty.execution-pool-size = 4
+    #akka.remote.netty.reconnection-time-window = 1s
+    akka.remote.netty.backoff-timeout = 500ms
+    akka.remote.netty.connection-timeout = 500ms
 
     # don't use testconductor transport in this test, especially not
     # when using use-dispatcher-for-io
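For completeness, the same tuning expressed as a standalone config block, the way a multi-jvm spec typically feeds it to ConfigFactory (keys copied from the hunk above, the surrounding test wiring is omitted):

    import com.typesafe.config.ConfigFactory

    val tuning = ConfigFactory.parseString("""
      akka.cluster.nr-of-gossip-daemons = 2
      akka.cluster.failure-detector.acceptable-heartbeat-pause = 10s
      akka.actor.default-dispatcher.fork-join-executor {
        parallelism-min = 13
        parallelism-max = 13
      }
      akka.remote.netty.execution-pool-size = 4
      akka.remote.netty.backoff-timeout = 500ms
      akka.remote.netty.connection-timeout = 500ms
      """)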
@@ -244,12 +253,11 @@ abstract class LargeClusterSpec
       }
     }
 
-    // FIXME sometimes this fails, FD marks nodes from other than second-datacenter as unavailable
-    "detect failure and auto-down crashed nodes in second-datacenter" taggedAs LongRunningTest ignore {
+    "detect failure and auto-down crashed nodes in second-datacenter" taggedAs LongRunningTest in {
       val unreachableNodes = nodesPerDatacenter
       val liveNodes = nodesPerDatacenter * 4
 
-      within(20.seconds + expectedMaxDuration(liveNodes)) {
+      within(30.seconds + (3.seconds * liveNodes)) {
 
         val startGossipCounts = Map.empty[Cluster, Long] ++ systems.map(sys ⇒ (Cluster(sys) -> Cluster(sys).receivedGossipCount))
 
         def gossipCount(c: Cluster): Long = c.receivedGossipCount - startGossipCounts(c)
@@ -278,10 +286,11 @@ abstract class LargeClusterSpec
         runOn(firstDatacenter, thirdDatacenter, fourthDatacenter, fifthDatacenter) {
           Await.ready(latch, remaining)
           awaitCond(systems.forall(Cluster(_).convergence.isDefined))
+          val mergeCount = systems.map(sys ⇒ Cluster(sys).mergeCount).sum
           val counts = systems.map(sys ⇒ gossipCount(Cluster(sys)))
           val formattedStats = "mean=%s min=%s max=%s".format(counts.sum / nodesPerDatacenter, counts.min, counts.max)
-          log.info("Convergence of [{}] nodes reached after failure, it took [{}], received [{}] gossip messages per node",
-            liveNodes, tookMillis, formattedStats)
+          log.info("Convergence of [{}] nodes reached after failure, it took [{}], received [{}] gossip messages per node, merged [{}] times",
+            liveNodes, tookMillis, formattedStats, mergeCount)
         }
 
         enterBarrier("after-6")
diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala
index d146e22982..88f04b9d7d 100644
--- a/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala
+++ b/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala
@@ -35,8 +35,13 @@ class ClusterConfigSpec extends AkkaSpec {
       AutoDown must be(true)
       UseDispatcher must be(Dispatchers.DefaultDispatcherId)
       GossipDifferentViewProbability must be(0.8 plusOrMinus 0.0001)
+      MaxGossipMergeRate must be(5.0 plusOrMinus 0.0001)
       SchedulerTickDuration must be(33 millis)
       SchedulerTicksPerWheel must be(512)
+      SendCircuitBreakerSettings must be(CircuitBreakerSettings(
+        maxFailures = 3,
+        callTimeout = 2 seconds,
+        resetTimeout = 30 seconds))
     }
   }
 }