From 9dc124dacd9f282b494582cdb6b07fe0812bfc77 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Fri, 25 Jan 2013 15:03:52 +0100 Subject: [PATCH] Remove work-around for sending to broken connections, see #2909 * Previous work-around was introduced because Netty blocks when sending to broken connections. This is supposed to be solved by the non-blocking new remoting. * Removed HeartbeatSender and CoreSender in cluster * Added tests to verify that broken connections don't disturb live connection --- .../src/main/resources/reference.conf | 7 -- .../scala/akka/cluster/ClusterDaemon.scala | 46 ++------ .../scala/akka/cluster/ClusterHeartbeat.scala | 101 ++---------------- .../scala/akka/cluster/ClusterSettings.scala | 5 - .../akka/cluster/ClusterConfigSpec.scala | 4 - .../test/scala/akka/remote/RemotingSpec.scala | 63 ++++++++--- 6 files changed, 72 insertions(+), 154 deletions(-) diff --git a/akka-cluster/src/main/resources/reference.conf b/akka-cluster/src/main/resources/reference.conf index 4532c0ff8a..6a1f7ac3c3 100644 --- a/akka-cluster/src/main/resources/reference.conf +++ b/akka-cluster/src/main/resources/reference.conf @@ -167,13 +167,6 @@ akka { ticks-per-wheel = 512 } - # Netty blocks when sending to broken connections, and this circuit breaker - # is used to reduce connect attempts to broken connections. - send-circuit-breaker { - max-failures = 3 - call-timeout = 2 s - reset-timeout = 30 s - } } # Default configuration for routers diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala index b2ecc0fbdb..106de783ae 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala @@ -99,8 +99,6 @@ private[cluster] object InternalClusterAction { case object PublishStatsTick extends Tick - case class SendClusterMessage(to: Address, msg: ClusterMessage) - case class SendGossipTo(address: Address) case object GetClusterCoreRef @@ -194,8 +192,12 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto var stats = ClusterStats() - val coreSender = context.actorOf(Props[ClusterCoreSender]. - withDispatcher(UseDispatcher), name = "coreSender") + /** + * Looks up and returns the remote cluster command connection for the specific address. + */ + private def clusterCore(address: Address): ActorRef = + context.actorFor(RootActorPath(address) / "system" / "cluster" / "core") + val heartbeatSender = context.actorOf(Props[ClusterHeartbeatSender]. withDispatcher(UseDispatcher), name = "heartbeatSender") @@ -300,7 +302,7 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto if (address == selfAddress) joining(address) else - coreSender ! SendClusterMessage(address, ClusterUserAction.Join(selfAddress)) + clusterCore(address) ! ClusterUserAction.Join(selfAddress) } } @@ -497,7 +499,7 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto val rate = mergeRate(stats.mergeConflictCount) if (rate <= MaxGossipMergeRate) - coreSender ! SendClusterMessage(to = localGossip.leader.get, msg = GossipMergeConflict(GossipEnvelope(selfAddress, localGossip), envelope)) + localGossip.leader foreach { clusterCore(_) ! GossipMergeConflict(GossipEnvelope(selfAddress, localGossip), envelope) } else log.debug("Skipping gossip merge conflict due to rate [{}] / s ", rate) @@ -712,18 +714,14 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto removedMembers foreach { member ⇒ val address = member.address log.info("Cluster Node [{}] - Leader is moving node [{}] from EXITING to REMOVED - and removing node from node ring", selfAddress, address) - coreSender ! SendClusterMessage( - to = address, - msg = ClusterLeaderAction.Remove(address)) + clusterCore(address) ! ClusterLeaderAction.Remove(address) } // tell all exiting members to exit exitingMembers foreach { member ⇒ val address = member.address log.info("Cluster Node [{}] - Leader is moving node [{}] from LEAVING to EXITING", selfAddress, address) - coreSender ! SendClusterMessage( - to = address, - msg = ClusterLeaderAction.Exit(address)) // FIXME should use ? to await completion of handoff? + clusterCore(address) ! ClusterLeaderAction.Exit(address) // FIXME should use ? to await completion of handoff? } // log the auto-downing of the unreachable nodes @@ -804,8 +802,7 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto gossipTo(address, GossipEnvelope(selfAddress, latestGossip, conversation = false)) def gossipTo(address: Address, gossipMsg: GossipEnvelope): Unit = - if (address != selfAddress) - coreSender ! SendClusterMessage(address, gossipMsg) + if (address != selfAddress) clusterCore(address) ! gossipMsg def publish(newGossip: Gossip): Unit = { publisher ! PublishChanges(newGossip) @@ -873,27 +870,6 @@ private[cluster] final class JoinSeedNodeProcess(seedNodes: immutable.IndexedSeq } } -/** - * INTERNAL API. - */ -private[cluster] final class ClusterCoreSender extends Actor with ActorLogging { - import InternalClusterAction._ - - val selfAddress = Cluster(context.system).selfAddress - - /** - * Looks up and returns the remote cluster command connection for the specific address. - */ - private def clusterCoreConnectionFor(address: Address): ActorRef = - context.actorFor(RootActorPath(address) / "system" / "cluster" / "core") - - def receive = { - case SendClusterMessage(to, msg) ⇒ - log.debug("Cluster Node [{}] - Trying to send [{}] to [{}]", selfAddress, msg.getClass.getSimpleName, to) - clusterCoreConnectionFor(to) ! msg - } -} - /** * INTERNAL API * diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala index f46b611c44..309b0d039a 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala @@ -7,9 +7,7 @@ import language.postfixOps import scala.collection.immutable import scala.concurrent.duration._ -import java.net.URLEncoder -import akka.actor.{ ActorLogging, ActorRef, Address, Actor, RootActorPath, PoisonPill, Props } -import akka.pattern.{ CircuitBreaker, CircuitBreakerOpenException } +import akka.actor.{ ActorLogging, ActorRef, Address, Actor, RootActorPath, Props } import akka.cluster.ClusterEvent._ import akka.routing.MurmurHash @@ -81,15 +79,9 @@ private[cluster] object ClusterHeartbeatSender { * * This actor is responsible for sending the heartbeat messages to * a few other nodes that will monitor this node. - * - * Netty blocks when sending to broken connections. This actor - * isolates sending to different nodes by using child actors for each target - * address and thereby reduce the risk of irregular heartbeats to healty - * nodes due to broken connections to other nodes. */ private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogging { import ClusterHeartbeatSender._ - import ClusterHeartbeatSenderConnection._ import ClusterHeartbeatReceiver._ import InternalClusterAction.HeartbeatTick @@ -121,13 +113,13 @@ private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogg /** * Looks up and returns the remote cluster heartbeat connection for the specific address. */ - def clusterHeartbeatConnectionFor(address: Address): ActorRef = + def heartbeatReceiver(address: Address): ActorRef = context.actorFor(RootActorPath(address) / "system" / "cluster" / "heartbeatReceiver") /** * Looks up and returns the remote cluster heartbeat sender for the specific address. */ - def heartbeatSenderFor(address: Address): ActorRef = context.actorFor(self.path.toStringWithAddress(address)) + def heartbeatSender(address: Address): ActorRef = context.actorFor(self.path.toStringWithAddress(address)) def receive = { case HeartbeatTick ⇒ heartbeat() @@ -154,7 +146,7 @@ private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogg def sendHeartbeatRequest(address: Address): Unit = if (!cluster.failureDetector.isMonitoring(address) && state.ring.mySenders.contains(address)) { - heartbeatSenderFor(address) ! selfHeartbeatRequest + heartbeatSender(address) ! selfHeartbeatRequest // schedule the expected heartbeat for later, which will give the // sender a chance to start heartbeating, and also trigger some resends of // the heartbeat request @@ -170,28 +162,19 @@ private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogg def heartbeat(): Unit = { state = state.removeOverdueHeartbeatRequest() - def connection(to: Address): ActorRef = { - // URL encoded target address as child actor name - val connectionName = URLEncoder.encode(to.toString, "UTF-8") - context.actorFor(connectionName) match { - case notFound if notFound.isTerminated ⇒ - context.actorOf(Props(new ClusterHeartbeatSenderConnection(clusterHeartbeatConnectionFor(to))), connectionName) - case child ⇒ child - } + state.active foreach { to ⇒ + log.debug("Cluster Node [{}] - Heartbeat to [{}]", cluster.selfAddress, to) + heartbeatReceiver(to) ! selfHeartbeat } - val deadline = Deadline.now + HeartbeatInterval - state.active foreach { to ⇒ connection(to) ! SendHeartbeat(selfHeartbeat, to, deadline) } - // When sending heartbeats to a node is stopped a few `EndHeartbeat` messages is // sent to notify it that no more heartbeats will be sent. for ((to, count) ← state.ending) { - val c = connection(to) - c ! SendEndHeartbeat(selfEndHeartbeat, to) - if (count == NumberOfEndHeartbeats) { + log.debug("Cluster Node [{}] - EndHeartbeat to [{}]", cluster.selfAddress, to) + heartbeatReceiver(to) ! selfEndHeartbeat + if (count == NumberOfEndHeartbeats) state = state.removeEnding(to) - c ! PoisonPill - } else + else state = state.increaseEndingCount(to) } @@ -308,68 +291,6 @@ private[cluster] case class ClusterHeartbeatSenderState private ( } -/** - * INTERNAL API - */ -private[cluster] object ClusterHeartbeatSenderConnection { - import ClusterHeartbeatReceiver._ - - /** - * Command to [akka.cluster.ClusterHeartbeatSenderConnection]], which will send - * [[akka.cluster.ClusterHeartbeatReceiver.Heartbeat]] to the other node. - * Local only, no need to serialize. - */ - case class SendHeartbeat(heartbeatMsg: Heartbeat, to: Address, deadline: Deadline) - - /** - * Command to [akka.cluster.ClusterHeartbeatSenderConnection]], which will send - * [[akka.cluster.ClusterHeartbeatReceiver.EndHeartbeat]] to the other node. - * Local only, no need to serialize. - */ - case class SendEndHeartbeat(endHeartbeatMsg: EndHeartbeat, to: Address) -} - -/** - * Responsible for sending [[akka.cluster.ClusterHeartbeatReceiver.Heartbeat]] - * and [[akka.cluster.ClusterHeartbeatReceiver.EndHeartbeat]] to one specific address. - * - * This actor exists only because Netty blocks when sending to broken connections, - * and this actor uses a configurable circuit breaker to reduce connect attempts to broken - * connections. - * - * @see akka.cluster.ClusterHeartbeatSender - */ -private[cluster] final class ClusterHeartbeatSenderConnection(toRef: ActorRef) - extends Actor with ActorLogging { - - import ClusterHeartbeatSenderConnection._ - - val breaker = { - val cbSettings = Cluster(context.system).settings.SendCircuitBreakerSettings - CircuitBreaker(context.system.scheduler, - cbSettings.maxFailures, cbSettings.callTimeout, cbSettings.resetTimeout). - onHalfOpen(log.debug("CircuitBreaker Half-Open for: [{}]", toRef)). - onOpen(log.info("CircuitBreaker Open for [{}]", toRef)). - onClose(log.debug("CircuitBreaker Closed for [{}]", toRef)) - } - - def receive = { - case SendHeartbeat(heartbeatMsg, _, deadline) ⇒ - if (!deadline.isOverdue) { - log.debug("Cluster Node [{}] - Heartbeat to [{}]", heartbeatMsg.from, toRef) - // Netty blocks when sending to broken connections, the CircuitBreaker will - // measure elapsed time and open if too many long calls - try breaker.withSyncCircuitBreaker { - toRef ! heartbeatMsg - } catch { case e: CircuitBreakerOpenException ⇒ /* skip sending heartbeat to broken connection */ } - } - if (deadline.isOverdue) log.info("Sending heartbeat to [{}] took longer than expected", toRef) - case SendEndHeartbeat(endHeartbeatMsg, _) ⇒ - log.debug("Cluster Node [{}] - EndHeartbeat to [{}]", endHeartbeatMsg.from, toRef) - toRef ! endHeartbeatMsg - } -} - /** * INTERNAL API * diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala index eced8bae97..2020c67993 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala @@ -72,10 +72,6 @@ class ClusterSettings(val config: Config, val systemName: String) { final val MaxGossipMergeRate: Double = getDouble("akka.cluster.max-gossip-merge-rate") final val SchedulerTickDuration: FiniteDuration = Duration(getMilliseconds("akka.cluster.scheduler.tick-duration"), MILLISECONDS) final val SchedulerTicksPerWheel: Int = getInt("akka.cluster.scheduler.ticks-per-wheel") - final val SendCircuitBreakerSettings: CircuitBreakerSettings = CircuitBreakerSettings( - maxFailures = getInt("akka.cluster.send-circuit-breaker.max-failures"), - callTimeout = Duration(getMilliseconds("akka.cluster.send-circuit-breaker.call-timeout"), MILLISECONDS), - resetTimeout = Duration(getMilliseconds("akka.cluster.send-circuit-breaker.reset-timeout"), MILLISECONDS)) final val MetricsEnabled: Boolean = getBoolean("akka.cluster.metrics.enabled") final val MetricsCollectorClass: String = getString("akka.cluster.metrics.collector-class") final val MetricsInterval: FiniteDuration = { @@ -87,4 +83,3 @@ class ClusterSettings(val config: Config, val systemName: String) { } requiring (_ > Duration.Zero, "metrics.moving-average-half-life must be > 0") } -case class CircuitBreakerSettings(maxFailures: Int, callTimeout: FiniteDuration, resetTimeout: FiniteDuration) diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala index 60c83399be..4479520833 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala @@ -45,10 +45,6 @@ class ClusterConfigSpec extends AkkaSpec { MaxGossipMergeRate must be(5.0 plusOrMinus 0.0001) SchedulerTickDuration must be(33 millis) SchedulerTicksPerWheel must be(512) - SendCircuitBreakerSettings must be(CircuitBreakerSettings( - maxFailures = 3, - callTimeout = 2 seconds, - resetTimeout = 30 seconds)) MetricsEnabled must be(true) MetricsCollectorClass must be(classOf[SigarMetricsCollector].getName) MetricsInterval must be(3 seconds) diff --git a/akka-remote/src/test/scala/akka/remote/RemotingSpec.scala b/akka-remote/src/test/scala/akka/remote/RemotingSpec.scala index 71f70129fa..72f3b83ce4 100644 --- a/akka-remote/src/test/scala/akka/remote/RemotingSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/RemotingSpec.scala @@ -13,11 +13,11 @@ import scala.concurrent.duration._ import akka.remote.transport.AssociationRegistry object RemotingSpec { - class Echo extends Actor { + class Echo1 extends Actor { var target: ActorRef = context.system.deadLetters def receive = { - case (p: Props, n: String) ⇒ sender ! context.actorOf(Props[Echo], n) + case (p: Props, n: String) ⇒ sender ! context.actorOf(Props[Echo1], n) case ex: Exception ⇒ throw ex case s: String ⇒ sender ! context.actorFor(s) case x ⇒ target = sender; sender ! x @@ -33,6 +33,12 @@ object RemotingSpec { } } + class Echo2 extends Actor { + def receive = { + case "ping" ⇒ sender ! (("pong", sender)) + } + } + val cfg: Config = ConfigFactory parseString (""" common-ssl-settings { key-store = "%s" @@ -115,11 +121,7 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D sys.asInstanceOf[ExtendedActorSystem].provider.asInstanceOf[RemoteActorRefProvider].deployer.deploy(d) } - val remote = other.actorOf(Props(new Actor { - def receive = { - case "ping" ⇒ sender ! (("pong", sender)) - } - }), "echo") + val remote = other.actorOf(Props[Echo2], "echo") val here = system.actorFor("akka.test://remote-sys@localhost:12346/user/echo") @@ -154,8 +156,43 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D }(other) } + "not be exhausted by sending to broken connections" in { + val moreSystems = Vector.fill(10)(ActorSystem(other.name, other.settings.config)) + moreSystems foreach (_.actorOf(Props[Echo2], name = "echo")) + val moreRefs = moreSystems map (sys ⇒ system.actorFor(RootActorPath(addr(sys, "tcp")) / "user" / "echo")) + val aliveEcho = system.actorFor(RootActorPath(addr(other, "tcp")) / "user" / "echo") + val n = 100 + + // first everything is up and running + 1 to n foreach { x ⇒ + aliveEcho ! "ping" + moreRefs(x % moreSystems.size) ! "ping" + } + + within(5.seconds) { + receiveN(n * 2) foreach { reply ⇒ reply must be(("pong", testActor)) } + } + + // then we shutdown all but one system to simulate broken connections + moreSystems foreach { sys ⇒ + sys.shutdown() + sys.awaitTermination(5.seconds.dilated) + sys.isTerminated must be(true) + } + + 1 to n foreach { x ⇒ + aliveEcho ! "ping" + moreRefs(x % moreSystems.size) ! "ping" + } + + // ping messages to aliveEcho should go through even though we use many different broken connections + within(5.seconds) { + receiveN(n) foreach { reply ⇒ reply must be(("pong", testActor)) } + } + } + "create and supervise children on remote node" in { - val r = system.actorOf(Props[Echo], "blub") + val r = system.actorOf(Props[Echo1], "blub") r.path.toString must be === "akka.test://remote-sys@localhost:12346/remote/akka.test/RemotingSpec@localhost:12345/user/blub" r ! 42 expectMsg(42) @@ -176,9 +213,9 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D case s: String ⇒ sender ! context.actorFor(s) } }), "looker") - l ! (Props[Echo], "child") + l ! (Props[Echo1], "child") val r = expectMsgType[ActorRef] - r ! (Props[Echo], "grandchild") + r ! (Props[Echo1], "grandchild") val remref = expectMsgType[ActorRef] remref.asInstanceOf[ActorRefScope].isLocal must be(true) val myref = system.actorFor(system / "looker" / "child" / "grandchild") @@ -199,7 +236,7 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D } "be able to use multiple transports and use the appropriate one (TCP)" in { - val r = system.actorOf(Props[Echo], "gonk") + val r = system.actorOf(Props[Echo1], "gonk") r.path.toString must be === s"akka.tcp://remote-sys@localhost:${port(other, "tcp")}/remote/akka.tcp/RemotingSpec@localhost:${port(system, "tcp")}/user/gonk" r ! 42 @@ -215,7 +252,7 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D } "be able to use multiple transports and use the appropriate one (UDP)" in { - val r = system.actorOf(Props[Echo], "zagzag") + val r = system.actorOf(Props[Echo1], "zagzag") r.path.toString must be === s"akka.udp://remote-sys@localhost:${port(other, "udp")}/remote/akka.udp/RemotingSpec@localhost:${port(system, "udp")}/user/zagzag" r ! 42 @@ -231,7 +268,7 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D } "be able to use multiple transports and use the appropriate one (SSL)" in { - val r = system.actorOf(Props[Echo], "roghtaar") + val r = system.actorOf(Props[Echo1], "roghtaar") r.path.toString must be === s"akka.ssl.tcp://remote-sys@localhost:${port(other, "ssl.tcp")}/remote/akka.ssl.tcp/RemotingSpec@localhost:${port(system, "ssl.tcp")}/user/roghtaar" r ! 42