Remove work-around for sending to broken connections, see #2909

* The previous work-around was introduced because Netty blocks when sending
  to broken connections. This is supposed to be solved by the new
  non-blocking remoting.
* Removed the ClusterCoreSender and ClusterHeartbeatSenderConnection actors in cluster
* Added tests to verify that broken connections don't disturb live connections
Patrik Nordwall, 2013-01-25 15:03:52 +01:00
commit 9dc124dacd (parent 89b31c995c)
6 changed files with 72 additions and 154 deletions
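The shape of the change, in one place: instead of funneling every remote send through a dedicated sender actor (the work-around), the cluster daemons now look up the remote peer and tell it directly. A minimal, self-contained sketch of that pattern — the class name, message, and payload here are hypothetical; only the clusterCore lookup is taken verbatim from the diff below:

import akka.actor.{ Actor, ActorRef, Address, RootActorPath }

// Hypothetical actor illustrating the direct-send pattern from this commit.
class DirectSendExample extends Actor {
  // Verbatim from ClusterCoreDaemon below: look up the remote cluster core.
  private def clusterCore(address: Address): ActorRef =
    context.actorFor(RootActorPath(address) / "system" / "cluster" / "core")

  def receive = {
    // Before: coreSender ! SendClusterMessage(to = address, msg = payload)
    // After: a plain tell; the new remoting does not block on broken connections.
    case address: Address ⇒ clusterCore(address) ! "payload" // hypothetical message
  }
}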

View file

@@ -167,13 +167,6 @@ akka {
       ticks-per-wheel = 512
     }
 
-    # Netty blocks when sending to broken connections, and this circuit breaker
-    # is used to reduce connect attempts to broken connections.
-    send-circuit-breaker {
-      max-failures = 3
-      call-timeout = 2 s
-      reset-timeout = 30 s
-    }
   }
 
   # Default configuration for routers

View file

@@ -99,8 +99,6 @@ private[cluster] object InternalClusterAction {
 
   case object PublishStatsTick extends Tick
 
-  case class SendClusterMessage(to: Address, msg: ClusterMessage)
-
   case class SendGossipTo(address: Address)
 
   case object GetClusterCoreRef
@@ -194,8 +192,12 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Actor
 
   var stats = ClusterStats()
 
-  val coreSender = context.actorOf(Props[ClusterCoreSender].
-    withDispatcher(UseDispatcher), name = "coreSender")
+  /**
+   * Looks up and returns the remote cluster command connection for the specific address.
+   */
+  private def clusterCore(address: Address): ActorRef =
+    context.actorFor(RootActorPath(address) / "system" / "cluster" / "core")
+
   val heartbeatSender = context.actorOf(Props[ClusterHeartbeatSender].
     withDispatcher(UseDispatcher), name = "heartbeatSender")
@@ -300,7 +302,7 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Actor
       if (address == selfAddress)
         joining(address)
       else
-        coreSender ! SendClusterMessage(address, ClusterUserAction.Join(selfAddress))
+        clusterCore(address) ! ClusterUserAction.Join(selfAddress)
     }
   }
@@ -497,7 +499,7 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Actor
 
     val rate = mergeRate(stats.mergeConflictCount)
     if (rate <= MaxGossipMergeRate)
-      coreSender ! SendClusterMessage(to = localGossip.leader.get, msg = GossipMergeConflict(GossipEnvelope(selfAddress, localGossip), envelope))
+      localGossip.leader foreach { clusterCore(_) ! GossipMergeConflict(GossipEnvelope(selfAddress, localGossip), envelope) }
     else
       log.debug("Skipping gossip merge conflict due to rate [{}] / s ", rate)
@@ -712,18 +714,14 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Actor
     removedMembers foreach { member ⇒
       val address = member.address
       log.info("Cluster Node [{}] - Leader is moving node [{}] from EXITING to REMOVED - and removing node from node ring", selfAddress, address)
-      coreSender ! SendClusterMessage(
-        to = address,
-        msg = ClusterLeaderAction.Remove(address))
+      clusterCore(address) ! ClusterLeaderAction.Remove(address)
     }
 
     // tell all exiting members to exit
     exitingMembers foreach { member ⇒
       val address = member.address
       log.info("Cluster Node [{}] - Leader is moving node [{}] from LEAVING to EXITING", selfAddress, address)
-      coreSender ! SendClusterMessage(
-        to = address,
-        msg = ClusterLeaderAction.Exit(address)) // FIXME should use ? to await completion of handoff?
+      clusterCore(address) ! ClusterLeaderAction.Exit(address) // FIXME should use ? to await completion of handoff?
     }
 
     // log the auto-downing of the unreachable nodes
@@ -804,8 +802,7 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Actor
     gossipTo(address, GossipEnvelope(selfAddress, latestGossip, conversation = false))
 
   def gossipTo(address: Address, gossipMsg: GossipEnvelope): Unit =
-    if (address != selfAddress)
-      coreSender ! SendClusterMessage(address, gossipMsg)
+    if (address != selfAddress) clusterCore(address) ! gossipMsg
 
   def publish(newGossip: Gossip): Unit = {
     publisher ! PublishChanges(newGossip)
@@ -873,27 +870,6 @@ private[cluster] final class JoinSeedNodeProcess(seedNodes: immutable.IndexedSeq
   }
 }
 
-/**
- * INTERNAL API.
- */
-private[cluster] final class ClusterCoreSender extends Actor with ActorLogging {
-  import InternalClusterAction._
-
-  val selfAddress = Cluster(context.system).selfAddress
-
-  /**
-   * Looks up and returns the remote cluster command connection for the specific address.
-   */
-  private def clusterCoreConnectionFor(address: Address): ActorRef =
-    context.actorFor(RootActorPath(address) / "system" / "cluster" / "core")
-
-  def receive = {
-    case SendClusterMessage(to, msg) ⇒
-      log.debug("Cluster Node [{}] - Trying to send [{}] to [{}]", selfAddress, msg.getClass.getSimpleName, to)
-      clusterCoreConnectionFor(to) ! msg
-  }
-}
-
 /**
  * INTERNAL API
 *

View file

@@ -7,9 +7,7 @@ import language.postfixOps
 
 import scala.collection.immutable
 import scala.concurrent.duration._
-import java.net.URLEncoder
-import akka.actor.{ ActorLogging, ActorRef, Address, Actor, RootActorPath, PoisonPill, Props }
-import akka.pattern.{ CircuitBreaker, CircuitBreakerOpenException }
+import akka.actor.{ ActorLogging, ActorRef, Address, Actor, RootActorPath, Props }
 import akka.cluster.ClusterEvent._
 import akka.routing.MurmurHash
@@ -81,15 +79,9 @@ private[cluster] object ClusterHeartbeatSender {
  *
  * This actor is responsible for sending the heartbeat messages to
  * a few other nodes that will monitor this node.
- *
- * Netty blocks when sending to broken connections. This actor
- * isolates sending to different nodes by using child actors for each target
- * address and thereby reduces the risk of irregular heartbeats to healthy
- * nodes due to broken connections to other nodes.
  */
 private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogging {
   import ClusterHeartbeatSender._
-  import ClusterHeartbeatSenderConnection._
   import ClusterHeartbeatReceiver._
   import InternalClusterAction.HeartbeatTick
@@ -121,13 +113,13 @@ private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogging {
 
   /**
    * Looks up and returns the remote cluster heartbeat connection for the specific address.
    */
-  def clusterHeartbeatConnectionFor(address: Address): ActorRef =
+  def heartbeatReceiver(address: Address): ActorRef =
     context.actorFor(RootActorPath(address) / "system" / "cluster" / "heartbeatReceiver")
 
   /**
    * Looks up and returns the remote cluster heartbeat sender for the specific address.
    */
-  def heartbeatSenderFor(address: Address): ActorRef = context.actorFor(self.path.toStringWithAddress(address))
+  def heartbeatSender(address: Address): ActorRef = context.actorFor(self.path.toStringWithAddress(address))
 
   def receive = {
     case HeartbeatTick ⇒ heartbeat()
@@ -154,7 +146,7 @@ private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogging {
   def sendHeartbeatRequest(address: Address): Unit =
     if (!cluster.failureDetector.isMonitoring(address) && state.ring.mySenders.contains(address)) {
-      heartbeatSenderFor(address) ! selfHeartbeatRequest
+      heartbeatSender(address) ! selfHeartbeatRequest
       // schedule the expected heartbeat for later, which will give the
       // sender a chance to start heartbeating, and also trigger some resends of
       // the heartbeat request
@@ -170,28 +162,19 @@ private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogging {
   def heartbeat(): Unit = {
     state = state.removeOverdueHeartbeatRequest()
 
-    def connection(to: Address): ActorRef = {
-      // URL encoded target address as child actor name
-      val connectionName = URLEncoder.encode(to.toString, "UTF-8")
-      context.actorFor(connectionName) match {
-        case notFound if notFound.isTerminated ⇒
-          context.actorOf(Props(new ClusterHeartbeatSenderConnection(clusterHeartbeatConnectionFor(to))), connectionName)
-        case child ⇒ child
-      }
-    }
-
-    val deadline = Deadline.now + HeartbeatInterval
-    state.active foreach { to ⇒ connection(to) ! SendHeartbeat(selfHeartbeat, to, deadline) }
+    state.active foreach { to ⇒
+      log.debug("Cluster Node [{}] - Heartbeat to [{}]", cluster.selfAddress, to)
+      heartbeatReceiver(to) ! selfHeartbeat
+    }
 
     // When sending heartbeats to a node is stopped, a few `EndHeartbeat` messages are
     // sent to notify it that no more heartbeats will be sent.
     for ((to, count) ← state.ending) {
-      val c = connection(to)
-      c ! SendEndHeartbeat(selfEndHeartbeat, to)
-      if (count == NumberOfEndHeartbeats) {
-        state = state.removeEnding(to)
-        c ! PoisonPill
-      } else
+      log.debug("Cluster Node [{}] - EndHeartbeat to [{}]", cluster.selfAddress, to)
+      heartbeatReceiver(to) ! selfEndHeartbeat
+      if (count == NumberOfEndHeartbeats)
+        state = state.removeEnding(to)
+      else
        state = state.increaseEndingCount(to)
    }
@@ -308,68 +291,6 @@ private[cluster] case class ClusterHeartbeatSenderState private (
   }
 }
 
-/**
- * INTERNAL API
- */
-private[cluster] object ClusterHeartbeatSenderConnection {
-  import ClusterHeartbeatReceiver._
-
-  /**
-   * Command to [[akka.cluster.ClusterHeartbeatSenderConnection]], which will send
-   * [[akka.cluster.ClusterHeartbeatReceiver.Heartbeat]] to the other node.
-   * Local only, no need to serialize.
-   */
-  case class SendHeartbeat(heartbeatMsg: Heartbeat, to: Address, deadline: Deadline)
-
-  /**
-   * Command to [[akka.cluster.ClusterHeartbeatSenderConnection]], which will send
-   * [[akka.cluster.ClusterHeartbeatReceiver.EndHeartbeat]] to the other node.
-   * Local only, no need to serialize.
-   */
-  case class SendEndHeartbeat(endHeartbeatMsg: EndHeartbeat, to: Address)
-}
-
-/**
- * Responsible for sending [[akka.cluster.ClusterHeartbeatReceiver.Heartbeat]]
- * and [[akka.cluster.ClusterHeartbeatReceiver.EndHeartbeat]] to one specific address.
- *
- * This actor exists only because Netty blocks when sending to broken connections,
- * and this actor uses a configurable circuit breaker to reduce connect attempts to broken
- * connections.
- *
- * @see akka.cluster.ClusterHeartbeatSender
- */
-private[cluster] final class ClusterHeartbeatSenderConnection(toRef: ActorRef)
-  extends Actor with ActorLogging {
-
-  import ClusterHeartbeatSenderConnection._
-
-  val breaker = {
-    val cbSettings = Cluster(context.system).settings.SendCircuitBreakerSettings
-    CircuitBreaker(context.system.scheduler,
-      cbSettings.maxFailures, cbSettings.callTimeout, cbSettings.resetTimeout).
-      onHalfOpen(log.debug("CircuitBreaker Half-Open for: [{}]", toRef)).
-      onOpen(log.info("CircuitBreaker Open for [{}]", toRef)).
-      onClose(log.debug("CircuitBreaker Closed for [{}]", toRef))
-  }
-
-  def receive = {
-    case SendHeartbeat(heartbeatMsg, _, deadline) ⇒
-      if (!deadline.isOverdue) {
-        log.debug("Cluster Node [{}] - Heartbeat to [{}]", heartbeatMsg.from, toRef)
-        // Netty blocks when sending to broken connections, the CircuitBreaker will
-        // measure elapsed time and open if too many long calls
-        try breaker.withSyncCircuitBreaker {
-          toRef ! heartbeatMsg
-        } catch { case e: CircuitBreakerOpenException ⇒ /* skip sending heartbeat to broken connection */ }
-      }
-      if (deadline.isOverdue) log.info("Sending heartbeat to [{}] took longer than expected", toRef)
-    case SendEndHeartbeat(endHeartbeatMsg, _) ⇒
-      log.debug("Cluster Node [{}] - EndHeartbeat to [{}]", endHeartbeatMsg.from, toRef)
-      toRef ! endHeartbeatMsg
-  }
-}
-
 /**
  * INTERNAL API
 *
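For reference, the removed work-around in isolation: each connection actor wrapped its sends in akka.pattern.CircuitBreaker, so a blocking Netty send to a broken connection would eventually trip the breaker instead of stalling heartbeats. A standalone sketch of that pattern under the removed defaults (max-failures = 3, call-timeout = 2 s, reset-timeout = 30 s); the actor system and target here are hypothetical stand-ins, while the breaker construction mirrors the deleted code above:

import scala.concurrent.duration._
import akka.actor.ActorSystem
import akka.pattern.{ CircuitBreaker, CircuitBreakerOpenException }

object SendBreakerSketch extends App {
  val system = ActorSystem("sketch") // hypothetical system for illustration
  val target = system.deadLetters    // stand-in for a remote connection ref

  // Same construction as the removed ClusterHeartbeatSenderConnection,
  // with the removed send-circuit-breaker defaults inlined.
  val breaker = CircuitBreaker(system.scheduler,
    maxFailures = 3, callTimeout = 2.seconds, resetTimeout = 30.seconds)

  // With the old remoting, a tell to a broken connection could block; the
  // breaker measured elapsed call time and opened after repeated long calls,
  // after which sends were skipped rather than blocking the caller.
  try breaker.withSyncCircuitBreaker { target ! "heartbeat" }
  catch { case _: CircuitBreakerOpenException ⇒ /* skip send to broken connection */ }

  system.shutdown()
}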

View file

@@ -72,10 +72,6 @@ class ClusterSettings(val config: Config, val systemName: String) {
   final val MaxGossipMergeRate: Double = getDouble("akka.cluster.max-gossip-merge-rate")
   final val SchedulerTickDuration: FiniteDuration = Duration(getMilliseconds("akka.cluster.scheduler.tick-duration"), MILLISECONDS)
   final val SchedulerTicksPerWheel: Int = getInt("akka.cluster.scheduler.ticks-per-wheel")
-  final val SendCircuitBreakerSettings: CircuitBreakerSettings = CircuitBreakerSettings(
-    maxFailures = getInt("akka.cluster.send-circuit-breaker.max-failures"),
-    callTimeout = Duration(getMilliseconds("akka.cluster.send-circuit-breaker.call-timeout"), MILLISECONDS),
-    resetTimeout = Duration(getMilliseconds("akka.cluster.send-circuit-breaker.reset-timeout"), MILLISECONDS))
   final val MetricsEnabled: Boolean = getBoolean("akka.cluster.metrics.enabled")
   final val MetricsCollectorClass: String = getString("akka.cluster.metrics.collector-class")
   final val MetricsInterval: FiniteDuration = {
@@ -87,4 +83,3 @@ class ClusterSettings(val config: Config, val systemName: String) {
   } requiring (_ > Duration.Zero, "metrics.moving-average-half-life must be > 0")
 }
 
-case class CircuitBreakerSettings(maxFailures: Int, callTimeout: FiniteDuration, resetTimeout: FiniteDuration)

View file

@@ -45,10 +45,6 @@ class ClusterConfigSpec extends AkkaSpec {
       MaxGossipMergeRate must be(5.0 plusOrMinus 0.0001)
       SchedulerTickDuration must be(33 millis)
       SchedulerTicksPerWheel must be(512)
-      SendCircuitBreakerSettings must be(CircuitBreakerSettings(
-        maxFailures = 3,
-        callTimeout = 2 seconds,
-        resetTimeout = 30 seconds))
       MetricsEnabled must be(true)
       MetricsCollectorClass must be(classOf[SigarMetricsCollector].getName)
       MetricsInterval must be(3 seconds)

View file

@@ -13,11 +13,11 @@ import scala.concurrent.duration._
 import akka.remote.transport.AssociationRegistry
 
 object RemotingSpec {
-  class Echo extends Actor {
+  class Echo1 extends Actor {
     var target: ActorRef = context.system.deadLetters
 
     def receive = {
-      case (p: Props, n: String) ⇒ sender ! context.actorOf(Props[Echo], n)
+      case (p: Props, n: String) ⇒ sender ! context.actorOf(Props[Echo1], n)
       case ex: Exception         ⇒ throw ex
       case s: String             ⇒ sender ! context.actorFor(s)
       case x                     ⇒ target = sender; sender ! x
@@ -33,6 +33,12 @@ object RemotingSpec {
     }
   }
 
+  class Echo2 extends Actor {
+    def receive = {
+      case "ping" ⇒ sender ! (("pong", sender))
+    }
+  }
+
   val cfg: Config = ConfigFactory parseString ("""
     common-ssl-settings {
       key-store = "%s"
@@ -115,11 +121,7 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D
     sys.asInstanceOf[ExtendedActorSystem].provider.asInstanceOf[RemoteActorRefProvider].deployer.deploy(d)
   }
 
-  val remote = other.actorOf(Props(new Actor {
-    def receive = {
-      case "ping" ⇒ sender ! (("pong", sender))
-    }
-  }), "echo")
+  val remote = other.actorOf(Props[Echo2], "echo")
 
   val here = system.actorFor("akka.test://remote-sys@localhost:12346/user/echo")
@@ -154,8 +156,43 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D
       }(other)
     }
 
+    "not be exhausted by sending to broken connections" in {
+      val moreSystems = Vector.fill(10)(ActorSystem(other.name, other.settings.config))
+      moreSystems foreach (_.actorOf(Props[Echo2], name = "echo"))
+      val moreRefs = moreSystems map (sys ⇒ system.actorFor(RootActorPath(addr(sys, "tcp")) / "user" / "echo"))
+      val aliveEcho = system.actorFor(RootActorPath(addr(other, "tcp")) / "user" / "echo")
+      val n = 100
+
+      // first everything is up and running
+      1 to n foreach { x ⇒
+        aliveEcho ! "ping"
+        moreRefs(x % moreSystems.size) ! "ping"
+      }
+
+      within(5.seconds) {
+        receiveN(n * 2) foreach { reply ⇒ reply must be(("pong", testActor)) }
+      }
+
+      // then we shut down all but one system to simulate broken connections
+      moreSystems foreach { sys ⇒
+        sys.shutdown()
+        sys.awaitTermination(5.seconds.dilated)
+        sys.isTerminated must be(true)
+      }
+
+      1 to n foreach { x ⇒
+        aliveEcho ! "ping"
+        moreRefs(x % moreSystems.size) ! "ping"
+      }
+
+      // ping messages to aliveEcho should go through even though we use many different broken connections
+      within(5.seconds) {
+        receiveN(n) foreach { reply ⇒ reply must be(("pong", testActor)) }
+      }
+    }
+
     "create and supervise children on remote node" in {
-      val r = system.actorOf(Props[Echo], "blub")
+      val r = system.actorOf(Props[Echo1], "blub")
       r.path.toString must be === "akka.test://remote-sys@localhost:12346/remote/akka.test/RemotingSpec@localhost:12345/user/blub"
       r ! 42
       expectMsg(42)
@@ -176,9 +213,9 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D
         case s: String ⇒ sender ! context.actorFor(s)
       }
     }), "looker")
-    l ! (Props[Echo], "child")
+    l ! (Props[Echo1], "child")
     val r = expectMsgType[ActorRef]
-    r ! (Props[Echo], "grandchild")
+    r ! (Props[Echo1], "grandchild")
     val remref = expectMsgType[ActorRef]
     remref.asInstanceOf[ActorRefScope].isLocal must be(true)
     val myref = system.actorFor(system / "looker" / "child" / "grandchild")
@@ -199,7 +236,7 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D
     }
 
     "be able to use multiple transports and use the appropriate one (TCP)" in {
-      val r = system.actorOf(Props[Echo], "gonk")
+      val r = system.actorOf(Props[Echo1], "gonk")
       r.path.toString must be ===
         s"akka.tcp://remote-sys@localhost:${port(other, "tcp")}/remote/akka.tcp/RemotingSpec@localhost:${port(system, "tcp")}/user/gonk"
       r ! 42
@@ -215,7 +252,7 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D
     }
 
    "be able to use multiple transports and use the appropriate one (UDP)" in {
-      val r = system.actorOf(Props[Echo], "zagzag")
+      val r = system.actorOf(Props[Echo1], "zagzag")
       r.path.toString must be ===
         s"akka.udp://remote-sys@localhost:${port(other, "udp")}/remote/akka.udp/RemotingSpec@localhost:${port(system, "udp")}/user/zagzag"
       r ! 42
@@ -231,7 +268,7 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D
     }
 
    "be able to use multiple transports and use the appropriate one (SSL)" in {
-      val r = system.actorOf(Props[Echo], "roghtaar")
+      val r = system.actorOf(Props[Echo1], "roghtaar")
      r.path.toString must be ===
        s"akka.ssl.tcp://remote-sys@localhost:${port(other, "ssl.tcp")}/remote/akka.ssl.tcp/RemotingSpec@localhost:${port(system, "ssl.tcp")}/user/roghtaar"
      r ! 42