From 7686fa523ea181f13e12b333d087d8c1ab80e5c6 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Fri, 6 Feb 2015 08:43:28 +0100 Subject: [PATCH] =con #13909 Don't use remote watch in ClusterClient * because it will result in quarantine if failure detection triggers and that kind of coupling is exactly what is not desired for a ClusterClient * replace by simple heartbeat failure detection, DeadlineFailureDetector * DeadLetterSuppression --- .../src/main/resources/reference.conf | 11 +++ .../akka/cluster/client/ClusterClient.scala | 78 +++++++++++++----- .../cluster/client/ClusterClientSpec.scala | 82 +++++++++++++++---- 3 files changed, 134 insertions(+), 37 deletions(-) diff --git a/akka-cluster-tools/src/main/resources/reference.conf b/akka-cluster-tools/src/main/resources/reference.conf index 47bc3b3a10..f9cf58b127 100644 --- a/akka-cluster-tools/src/main/resources/reference.conf +++ b/akka-cluster-tools/src/main/resources/reference.conf @@ -81,6 +81,17 @@ akka.cluster.client { # Interval at which the client will ask the ClusterReceptionist for # new contact points to be used for next reconnect. refresh-contacts-interval = 60s + + # How often failure detection heartbeat messages should be sent + heartbeat-interval = 2s + + # Number of potentially lost/delayed heartbeats that will be + # accepted before considering it to be an anomaly. + # The ClusterClient is using the akka.remote.DeadlineFailureDetector, which + # will trigger if there are no heartbeats within the duration + # heartbeat-interval + acceptable-heartbeat-pause, i.e. 15 seconds with + # the default settings. 
+ acceptable-heartbeat-pause = 13s } # //#cluster-client-mailbox-config diff --git a/akka-cluster-tools/src/main/scala/akka/cluster/client/ClusterClient.scala b/akka-cluster-tools/src/main/scala/akka/cluster/client/ClusterClient.scala index ad45c8b81f..d0b4b1fd20 100644 --- a/akka-cluster-tools/src/main/scala/akka/cluster/client/ClusterClient.scala +++ b/akka-cluster-tools/src/main/scala/akka/cluster/client/ClusterClient.scala @@ -4,10 +4,8 @@ package akka.cluster.client import java.net.URLEncoder - import scala.collection.immutable import scala.concurrent.duration._ - import akka.actor.Actor import akka.actor.ActorIdentity import akka.actor.ActorLogging @@ -37,6 +35,8 @@ import akka.japi.Util.immutableSeq import akka.routing.ConsistentHash import akka.routing.MurmurHash import com.typesafe.config.Config +import akka.actor.DeadLetterSuppression +import akka.remote.DeadlineFailureDetector object ClusterClientSettings { /** @@ -55,7 +55,9 @@ object ClusterClientSettings { new ClusterClientSettings( initialContacts, establishingGetContactsInterval = config.getDuration("establishing-get-contacts-interval", MILLISECONDS).millis, - refreshContactsInterval = config.getDuration("refresh-contacts-interval", MILLISECONDS).millis) + refreshContactsInterval = config.getDuration("refresh-contacts-interval", MILLISECONDS).millis, + heartbeatInterval = config.getDuration("heartbeat-interval", MILLISECONDS).millis, + acceptableHeartbeatPause = config.getDuration("acceptable-heartbeat-pause", MILLISECONDS).millis) } /** @@ -79,11 +81,19 @@ object ClusterClientSettings { * to establish contact with one of ClusterReceptionist on the servers (cluster nodes) * @param refreshContactsInterval Interval at which the client will ask the * `ClusterReceptionist` for new contact points to be used for next reconnect. + * @param heartbeatInterval How often failure detection heartbeat messages for detection + * of failed connections should be sent. 
+ * @param acceptableHeartbeatPause Number of potentially lost/delayed heartbeats that will + * be accepted before considering it to be an anomaly. The ClusterClient is using the + * [[akka.remote.DeadlineFailureDetector]], which will trigger if there are no heartbeats + * within the duration `heartbeatInterval + acceptableHeartbeatPause`. */ final class ClusterClientSettings( val initialContacts: Set[ActorPath], val establishingGetContactsInterval: FiniteDuration, - val refreshContactsInterval: FiniteDuration) extends NoSerializationVerificationNeeded { + val refreshContactsInterval: FiniteDuration, + val heartbeatInterval: FiniteDuration, + val acceptableHeartbeatPause: FiniteDuration) extends NoSerializationVerificationNeeded { def withInitialContacts(initialContacts: Set[ActorPath]): ClusterClientSettings = { require(initialContacts.nonEmpty, "initialContacts must be defined") @@ -96,11 +106,17 @@ final class ClusterClientSettings( def withRefreshContactsInterval(refreshContactsInterval: FiniteDuration): ClusterClientSettings = copy(refreshContactsInterval = refreshContactsInterval) + def withHeartbeat(heartbeatInterval: FiniteDuration, acceptableHeartbeatPause: FiniteDuration): ClusterClientSettings = + copy(heartbeatInterval = heartbeatInterval, acceptableHeartbeatPause = acceptableHeartbeatPause) + private def copy( initialContacts: Set[ActorPath] = initialContacts, establishingGetContactsInterval: FiniteDuration = establishingGetContactsInterval, - refreshContactsInterval: FiniteDuration = refreshContactsInterval): ClusterClientSettings = - new ClusterClientSettings(initialContacts, establishingGetContactsInterval, refreshContactsInterval) + refreshContactsInterval: FiniteDuration = refreshContactsInterval, + heartbeatInterval: FiniteDuration = heartbeatInterval, + acceptableHeartbeatPause: FiniteDuration = acceptableHeartbeatPause): ClusterClientSettings = + new ClusterClientSettings(initialContacts, establishingGetContactsInterval, 
refreshContactsInterval, + heartbeatInterval, acceptableHeartbeatPause) } @@ -129,6 +145,7 @@ object ClusterClient { */ private[akka] object Internal { case object RefreshContactsTick + case object HeartbeatTick } } @@ -174,12 +191,16 @@ class ClusterClient(settings: ClusterClientSettings) extends Actor with Stash wi require(initialContacts.nonEmpty, "initialContacts must be defined") + val failureDetector = new DeadlineFailureDetector(acceptableHeartbeatPause, heartbeatInterval) + val initialContactsSel: immutable.IndexedSeq[ActorSelection] = initialContacts.map(context.actorSelection).toVector var contacts = initialContactsSel sendGetContacts() import context.dispatcher + val heartbeatTask = context.system.scheduler.schedule( + heartbeatInterval, heartbeatInterval, self, HeartbeatTick) var refreshContactsTask: Option[Cancellable] = None scheduleRefreshContactsTick(establishingGetContactsInterval) self ! RefreshContactsTick @@ -192,6 +213,7 @@ class ClusterClient(settings: ClusterClientSettings) extends Actor with Stash wi override def postStop(): Unit = { super.postStop() + heartbeatTask.cancel() refreshContactsTask foreach { _.cancel() } } @@ -204,15 +226,16 @@ class ClusterClient(settings: ClusterClientSettings) extends Actor with Stash wi contacts foreach { _ ! 
Identify(None) } } case ActorIdentity(_, Some(receptionist)) ⇒ - context watch receptionist log.info("Connected to [{}]", receptionist.path) - context.watch(receptionist) scheduleRefreshContactsTick(refreshContactsInterval) unstashAll() context.become(active(receptionist)) + failureDetector.heartbeat() case ActorIdentity(_, None) ⇒ // ok, use another instead - case RefreshContactsTick ⇒ sendGetContacts() - case msg ⇒ stash() + case HeartbeatTick ⇒ + failureDetector.heartbeat() + case RefreshContactsTick ⇒ sendGetContacts() + case msg ⇒ stash() } def active(receptionist: ActorRef): Actor.Receive = { @@ -222,17 +245,23 @@ class ClusterClient(settings: ClusterClientSettings) extends Actor with Stash wi receptionist forward DistributedPubSubMediator.SendToAll(path, msg) case Publish(topic, msg) ⇒ receptionist forward DistributedPubSubMediator.Publish(topic, msg) + case HeartbeatTick ⇒ + if (!failureDetector.isAvailable) { + log.info("Lost contact with [{}], restablishing connection", receptionist) + sendGetContacts() + scheduleRefreshContactsTick(establishingGetContactsInterval) + context.become(establishing) + failureDetector.heartbeat() + } else + receptionist ! Heartbeat + case HeartbeatRsp ⇒ + failureDetector.heartbeat() case RefreshContactsTick ⇒ receptionist ! 
GetContacts case Contacts(contactPoints) ⇒ // refresh of contacts if (contactPoints.nonEmpty) contacts = contactPoints - case Terminated(`receptionist`) ⇒ - log.info("Lost contact with [{}], restablishing connection", receptionist) - sendGetContacts() - scheduleRefreshContactsTick(establishingGetContactsInterval) - context.become(establishing) case _: ActorIdentity ⇒ // ok, from previous establish, already handled } @@ -405,11 +434,15 @@ object ClusterReceptionist { */ private[akka] object Internal { @SerialVersionUID(1L) - case object GetContacts + case object GetContacts extends DeadLetterSuppression @SerialVersionUID(1L) final case class Contacts(contactPoints: immutable.IndexedSeq[ActorSelection]) @SerialVersionUID(1L) - case object Ping + case object Heartbeat extends DeadLetterSuppression + @SerialVersionUID(1L) + case object HeartbeatRsp extends DeadLetterSuppression + @SerialVersionUID(1L) + case object Ping extends DeadLetterSuppression /** * Replies are tunneled via this actor, child of the receptionist, to avoid @@ -417,12 +450,10 @@ object ClusterReceptionist { */ class ClientResponseTunnel(client: ActorRef, timeout: FiniteDuration) extends Actor { context.setReceiveTimeout(timeout) - context.watch(client) def receive = { - case Ping ⇒ // keep alive from client - case ReceiveTimeout ⇒ context stop self - case Terminated(`client`) ⇒ context stop self - case msg ⇒ client forward msg + case Ping ⇒ // keep alive from client + case ReceiveTimeout ⇒ context stop self + case msg ⇒ client forward msg } } } @@ -508,6 +539,9 @@ class ClusterReceptionist(pubSubMediator: ActorRef, settings: ClusterReceptionis tunnel ! Ping // keep alive pubSubMediator.tell(msg, tunnel) + case Heartbeat ⇒ + sender() ! 
HeartbeatRsp + case GetContacts ⇒ // Consistent hashing is used to ensure that the reply to GetContacts // is the same from all nodes (most of the time) and it also diff --git a/akka-cluster-tools/src/multi-jvm/scala/akka/cluster/client/ClusterClientSpec.scala b/akka-cluster-tools/src/multi-jvm/scala/akka/cluster/client/ClusterClientSpec.scala index 2299898089..74d362d8be 100644 --- a/akka-cluster-tools/src/multi-jvm/scala/akka/cluster/client/ClusterClientSpec.scala +++ b/akka-cluster-tools/src/multi-jvm/scala/akka/cluster/client/ClusterClientSpec.scala @@ -18,6 +18,7 @@ import akka.remote.testkit.STMultiNodeSpec import akka.testkit._ import akka.actor.Address import akka.cluster.pubsub._ +import akka.remote.transport.ThrottlerTransportAdapter.Direction object ClusterClientSpec extends MultiNodeConfig { val client = role("client") @@ -31,13 +32,17 @@ object ClusterClientSpec extends MultiNodeConfig { akka.actor.provider = "akka.cluster.ClusterActorRefProvider" akka.remote.log-remote-lifecycle-events = off akka.cluster.auto-down-unreachable-after = 0s + akka.cluster.client.heartbeat-interval = 1s + akka.cluster.client.acceptable-heartbeat-pause = 3s """)) + testTransport(on = true) + class TestService(testActor: ActorRef) extends Actor { def receive = { case msg ⇒ testActor forward msg - sender() ! "ack" + sender() ! 
msg + "-ack" } } @@ -77,11 +82,13 @@ class ClusterClientSpec extends MultiNodeSpec(ClusterClientSpec) with STMultiNod } } - def roleName(addr: Address): Option[RoleName] = roles.find(node(_).address == addr) + var remainingServerRoleNames = Set(first, second, third, fourth) - def initialContacts = Set( - node(second) / "user" / "receptionist", - node(third) / "user" / "receptionist") + def roleName(addr: Address): Option[RoleName] = remainingServerRoleNames.find(node(_).address == addr) + + def initialContacts = (remainingServerRoleNames - first - fourth).map { r ⇒ + node(r) / "user" / "receptionist" + } "A ClusterClient" must { @@ -104,9 +111,10 @@ class ClusterClientSpec extends MultiNodeSpec(ClusterClientSpec) with STMultiNod "communicate to actor on any node in cluster" in within(10 seconds) { runOn(client) { val c = system.actorOf(ClusterClient.props( - ClusterClientSettings(system).withInitialContacts(initialContacts))) + ClusterClientSettings(system).withInitialContacts(initialContacts)), "client1") c ! ClusterClient.Send("/user/testService", "hello", localAffinity = true) - expectMsg("ack") + expectMsg("hello-ack") + system.stop(c) } runOn(fourth) { expectMsg("hello") @@ -140,7 +148,7 @@ class ClusterClientSpec extends MultiNodeSpec(ClusterClientSpec) with STMultiNod //#client runOn(client) { val c = system.actorOf(ClusterClient.props( - ClusterClientSettings(system).withInitialContacts(initialContacts))) + ClusterClientSettings(system).withInitialContacts(initialContacts)), "client") c ! ClusterClient.Send("/user/serviceA", "hello", localAffinity = true) c ! 
ClusterClient.SendToAll("/user/serviceB", "hi") } @@ -151,7 +159,7 @@ class ClusterClientSpec extends MultiNodeSpec(ClusterClientSpec) with STMultiNod receiveN(3).toSet should ===(Set("hello", "hi")) } - { //not used, only demo + lazy val docOnly = { //not used, only demo //#initialContacts val initialContacts = Set( system.actorSelection("akka.tcp://OtherSys@host1:2552/user/receptionist"), @@ -164,7 +172,7 @@ class ClusterClientSpec extends MultiNodeSpec(ClusterClientSpec) with STMultiNod enterBarrier("after-3") } - "re-establish connection to receptionist when connection is lost" in within(30 seconds) { + "re-establish connection to another receptionist when server is shutdown" in within(30 seconds) { runOn(first, second, third, fourth) { val service2 = system.actorOf(Props(classOf[TestService], testActor), "service2") ClusterClientReceptionist(system).registerService(service2) @@ -174,20 +182,24 @@ class ClusterClientSpec extends MultiNodeSpec(ClusterClientSpec) with STMultiNod runOn(client) { val c = system.actorOf(ClusterClient.props( - ClusterClientSettings(system).withInitialContacts(initialContacts))) + ClusterClientSettings(system).withInitialContacts(initialContacts)), "client2") c ! ClusterClient.Send("/user/service2", "bonjour", localAffinity = true) - expectMsg("ack") + expectMsg("bonjour-ack") val lastSenderAddress = lastSender.path.address val receptionistRoleName = roleName(lastSenderAddress) match { case Some(r) ⇒ r case None ⇒ fail("unexpected missing roleName: " + lastSender.path.address) } testConductor.exit(receptionistRoleName, 0).await - awaitAssert { - c ! ClusterClient.Send("/user/service2", "hi again", localAffinity = true) - expectMsg(1 second, "ack") + remainingServerRoleNames -= receptionistRoleName + within(remaining - 3.seconds) { + awaitAssert { + c ! 
ClusterClient.Send("/user/service2", "hi again", localAffinity = true) + expectMsg(1 second, "hi again-ack") + } } + system.stop(c) } enterBarrier("verifed-3") receiveWhile(2 seconds) { @@ -197,5 +209,45 @@ class ClusterClientSpec extends MultiNodeSpec(ClusterClientSpec) with STMultiNod enterBarrier("after-4") } + "re-establish connection to receptionist after partition" in within(30 seconds) { + runOn(client) { + val c = system.actorOf(ClusterClient.props( + ClusterClientSettings(system).withInitialContacts(initialContacts)), "client3") + + c ! ClusterClient.Send("/user/service2", "bonjour2", localAffinity = true) + expectMsg("bonjour2-ack") + val lastSenderAddress = lastSender.path.address + val receptionistRoleName = roleName(lastSenderAddress) match { + case Some(r) ⇒ r + case None ⇒ fail("unexpected missing roleName: " + lastSender.path.address) + } + // shutdown all but the one that the client is connected to + remainingServerRoleNames.foreach { r ⇒ + if (r != receptionistRoleName) + testConductor.exit(r, 0).await + } + remainingServerRoleNames = Set(receptionistRoleName) + // network partition between client and server + testConductor.blackhole(client, receptionistRoleName, Direction.Both).await + c ! ClusterClient.Send("/user/service2", "ping", localAffinity = true) + // if we used remote watch, the failure detector would trigger and + // the connection would be quarantined + expectNoMsg(5 seconds) + + testConductor.passThrough(client, receptionistRoleName, Direction.Both).await + + val expectedAddress = node(receptionistRoleName).address + awaitAssert { + c ! ClusterClient.Send("/user/service2", "bonjour3", localAffinity = true) + expectMsg(1 second, "bonjour3-ack") + val lastSenderAddress = lastSender.path.address + lastSenderAddress should be(expectedAddress) + } + system.stop(c) + } + + enterBarrier("after-5") + } + } }