cluster client handover when receptionist's node leaves the cluster (#24167)

* Always add sender of GetContacts to client interactions

* Handover clients when receptionist leaves the cluster

* Revision based on code review

* Cluster receptionist only tracks connected clients
This commit is contained in:
fredfp 2018-01-09 17:41:33 +08:00 committed by Patrik Nordwall
parent 804dc4b6ba
commit 0bd408977b
5 changed files with 161 additions and 16 deletions

View file

@ -384,6 +384,7 @@ final class ClusterClient(settings: ClusterClientSettings) extends Actor with Ac
context.become(active(receptionist) orElse contactPointMessages) context.become(active(receptionist) orElse contactPointMessages)
connectTimerCancelable.foreach(_.cancel()) connectTimerCancelable.foreach(_.cancel())
failureDetector.heartbeat() failureDetector.heartbeat()
self ! HeartbeatTick // will register us as active client of the selected receptionist
case ActorIdentity(_, None) // ok, use another instead case ActorIdentity(_, None) // ok, use another instead
case HeartbeatTick case HeartbeatTick
failureDetector.heartbeat() failureDetector.heartbeat()
@ -397,6 +398,7 @@ final class ClusterClient(settings: ClusterClientSettings) extends Actor with Ac
case ReconnectTimeout case ReconnectTimeout
log.warning("Receptionist reconnect not successful within {} stopping cluster client", settings.reconnectTimeout) log.warning("Receptionist reconnect not successful within {} stopping cluster client", settings.reconnectTimeout)
context.stop(self) context.stop(self)
case ReceptionistShutdown // ok, haven't chosen a receptionist yet
} }
} }
@ -409,11 +411,8 @@ final class ClusterClient(settings: ClusterClientSettings) extends Actor with Ac
receptionist forward DistributedPubSubMediator.Publish(topic, msg) receptionist forward DistributedPubSubMediator.Publish(topic, msg)
case HeartbeatTick case HeartbeatTick
if (!failureDetector.isAvailable) { if (!failureDetector.isAvailable) {
log.info("Lost contact with [{}], restablishing connection", receptionist) log.info("Lost contact with [{}], reestablishing connection", receptionist)
sendGetContacts() reestablish()
scheduleRefreshContactsTick(establishingGetContactsInterval)
context.become(establishing orElse contactPointMessages)
failureDetector.heartbeat()
} else } else
receptionist ! Heartbeat receptionist ! Heartbeat
case HeartbeatRsp case HeartbeatRsp
@ -428,6 +427,11 @@ final class ClusterClient(settings: ClusterClientSettings) extends Actor with Ac
} }
publishContactPoints() publishContactPoints()
case _: ActorIdentity // ok, from previous establish, already handled case _: ActorIdentity // ok, from previous establish, already handled
case ReceptionistShutdown
if (receptionist == sender()) {
log.info("Receptionist [{}] is shutting down, reestablishing connection", receptionist)
reestablish()
}
} }
def contactPointMessages: Actor.Receive = { def contactPointMessages: Actor.Receive = {
@ -485,6 +489,13 @@ final class ClusterClient(settings: ClusterClientSettings) extends Actor with Ac
} }
contactPathsPublished = contactPaths contactPathsPublished = contactPaths
} }
def reestablish(): Unit = {
sendGetContacts()
scheduleRefreshContactsTick(establishingGetContactsInterval)
context.become(establishing orElse contactPointMessages)
failureDetector.heartbeat()
}
} }
object ClusterClientReceptionist extends ExtensionId[ClusterClientReceptionist] with ExtensionIdProvider { object ClusterClientReceptionist extends ExtensionId[ClusterClientReceptionist] with ExtensionIdProvider {
@ -801,6 +812,8 @@ object ClusterReceptionist {
@SerialVersionUID(1L) @SerialVersionUID(1L)
case object HeartbeatRsp extends ClusterClientMessage with DeadLetterSuppression case object HeartbeatRsp extends ClusterClientMessage with DeadLetterSuppression
@SerialVersionUID(1L) @SerialVersionUID(1L)
case object ReceptionistShutdown extends ClusterClientMessage with DeadLetterSuppression
@SerialVersionUID(1L)
case object Ping extends DeadLetterSuppression case object Ping extends DeadLetterSuppression
case object CheckDeadlines case object CheckDeadlines
@ -907,6 +920,7 @@ final class ClusterReceptionist(pubSubMediator: ActorRef, settings: ClusterRecep
super.postStop() super.postStop()
cluster unsubscribe self cluster unsubscribe self
checkDeadlinesTask.cancel() checkDeadlinesTask.cancel()
clientInteractions.keySet.foreach(_ ! ReceptionistShutdown)
} }
def matchingRole(m: Member): Boolean = role.forall(m.hasRole) def matchingRole(m: Member): Boolean = role.forall(m.hasRole)
@ -953,7 +967,6 @@ final class ClusterReceptionist(pubSubMediator: ActorRef, settings: ClusterRecep
if (log.isDebugEnabled) if (log.isDebugEnabled)
log.debug("Client [{}] gets contactPoints [{}]", sender().path, contacts.contactPoints.mkString(",")) log.debug("Client [{}] gets contactPoints [{}]", sender().path, contacts.contactPoints.mkString(","))
sender() ! contacts sender() ! contacts
updateClientInteractions(sender())
} }
case state: CurrentClusterState case state: CurrentClusterState

View file

@ -25,6 +25,7 @@ private[akka] class ClusterClientMessageSerializer(val system: ExtendedActorSyst
private val GetContactsManifest = "B" private val GetContactsManifest = "B"
private val HeartbeatManifest = "C" private val HeartbeatManifest = "C"
private val HeartbeatRspManifest = "D" private val HeartbeatRspManifest = "D"
private val ReceptionistShutdownManifest = "E"
private val emptyByteArray = Array.empty[Byte] private val emptyByteArray = Array.empty[Byte]
@ -32,22 +33,25 @@ private[akka] class ClusterClientMessageSerializer(val system: ExtendedActorSyst
ContactsManifest contactsFromBinary, ContactsManifest contactsFromBinary,
GetContactsManifest { _ GetContacts }, GetContactsManifest { _ GetContacts },
HeartbeatManifest { _ Heartbeat }, HeartbeatManifest { _ Heartbeat },
HeartbeatRspManifest { _ HeartbeatRsp }) HeartbeatRspManifest { _ HeartbeatRsp },
ReceptionistShutdownManifest { _ ReceptionistShutdown })
override def manifest(obj: AnyRef): String = obj match { override def manifest(obj: AnyRef): String = obj match {
case _: Contacts ContactsManifest case _: Contacts ContactsManifest
case GetContacts GetContactsManifest case GetContacts GetContactsManifest
case Heartbeat HeartbeatManifest case Heartbeat HeartbeatManifest
case HeartbeatRsp HeartbeatRspManifest case HeartbeatRsp HeartbeatRspManifest
case ReceptionistShutdown ReceptionistShutdownManifest
case _ case _
throw new IllegalArgumentException(s"Can't serialize object of type ${obj.getClass} in [${getClass.getName}]") throw new IllegalArgumentException(s"Can't serialize object of type ${obj.getClass} in [${getClass.getName}]")
} }
override def toBinary(obj: AnyRef): Array[Byte] = obj match { override def toBinary(obj: AnyRef): Array[Byte] = obj match {
case m: Contacts contactsToProto(m).toByteArray case m: Contacts contactsToProto(m).toByteArray
case GetContacts emptyByteArray case GetContacts emptyByteArray
case Heartbeat emptyByteArray case Heartbeat emptyByteArray
case HeartbeatRsp emptyByteArray case HeartbeatRsp emptyByteArray
case ReceptionistShutdown emptyByteArray
case _ case _
throw new IllegalArgumentException(s"Can't serialize object of type ${obj.getClass} in [${getClass.getName}]") throw new IllegalArgumentException(s"Can't serialize object of type ${obj.getClass} in [${getClass.getName}]")
} }

View file

@ -0,0 +1,127 @@
/**
* Copyright (C) 2009-2017 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.cluster.client
import akka.actor.ActorRef
import akka.cluster.{ Cluster, ClusterReadView, MemberStatus }
import akka.remote.testconductor.RoleName
import akka.remote.testkit.{ MultiNodeConfig, MultiNodeSpec, STMultiNodeSpec }
import akka.testkit.{ ImplicitSender, TestActors }
import com.typesafe.config.ConfigFactory
import scala.concurrent.duration._
object ClusterClientHandoverSpec extends MultiNodeConfig {
val client = role("client")
val first = role("first")
val second = role("second")
commonConfig(ConfigFactory.parseString("""
akka.loglevel = INFO
akka.actor.provider = "cluster"
akka.remote.log-remote-lifecycle-events = off
akka.cluster.client {
heartbeat-interval = 1d
acceptable-heartbeat-pause = 1d
reconnect-timeout = 3s
refresh-contacts-interval = 1d
}
akka.test.filter-leeway = 10s
"""))
}
class ClusterClientHandoverMultiJvmNode1 extends ClusterClientHandoverSpec
class ClusterClientHandoverMultiJvmNode2 extends ClusterClientHandoverSpec
class ClusterClientHandoverMultiJvmNode3 extends ClusterClientHandoverSpec
class ClusterClientHandoverSpec extends MultiNodeSpec(ClusterClientHandoverSpec) with STMultiNodeSpec with ImplicitSender {
import ClusterClientHandoverSpec._
override def initialParticipants: Int = 3
def join(from: RoleName, to: RoleName): Unit = {
runOn(from) {
Cluster(system) join node(to).address
ClusterClientReceptionist(system)
}
enterBarrier(from.name + "-joined")
}
def clusterView: ClusterReadView = Cluster(system).readView
def awaitUp(expected: Int): Unit = {
awaitAssert {
awaitAssert(clusterView.members.size should ===(expected))
awaitAssert(clusterView.members.map(_.status) should ===(Set(MemberStatus.Up)))
}
}
def initialContacts = Set(first, second).map { r
node(r) / "system" / "receptionist"
}
"A Cluster Client" should {
"startup cluster with a single node" in within(30.seconds) {
join(first, first)
runOn(first) {
val service = system.actorOf(TestActors.echoActorProps, "testService")
ClusterClientReceptionist(system).registerService(service)
awaitUp(1)
}
enterBarrier("cluster-started")
}
var clusterClient: ActorRef = null
"establish connection to first node" in {
runOn(client) {
clusterClient = system.actorOf(ClusterClient.props(
ClusterClientSettings(system).withInitialContacts(initialContacts)), "client1")
clusterClient ! ClusterClient.Send("/user/testService", "hello", localAffinity = true)
expectMsgType[String](3.seconds) should be("hello")
}
enterBarrier("established")
}
"bring the second node into the cluster" in {
join(second, first)
runOn(second) {
val service = system.actorOf(TestActors.echoActorProps, "testService")
ClusterClientReceptionist(system).registerService(service)
awaitUp(2)
}
enterBarrier("second-up")
}
"remove first node from the cluster" in {
runOn(first) {
Cluster(system) leave node(first).address
}
runOn(second) {
awaitUp(1)
}
enterBarrier("handover-done")
}
"re-establish on receptionist shutdown" in {
runOn(client) {
clusterClient ! ClusterClient.Send("/user/testService", "hello", localAffinity = true)
expectMsgType[String](3.seconds) should be("hello")
}
enterBarrier("handover-successful")
}
}
}

View file

@ -28,6 +28,7 @@ class ClusterClientMessageSerializerSpec extends AkkaSpec {
checkSerialization(GetContacts) checkSerialization(GetContacts)
checkSerialization(Heartbeat) checkSerialization(Heartbeat)
checkSerialization(HeartbeatRsp) checkSerialization(HeartbeatRsp)
checkSerialization(ReceptionistShutdown)
} }
} }
} }

View file

@ -153,7 +153,7 @@ Scala
Java Java
: @@snip [ClusterClientTest.java]($akka$/akka-cluster-tools/src/test/java/akka/cluster/client/ClusterClientTest.java) { #clientEventsListener } : @@snip [ClusterClientTest.java]($akka$/akka-cluster-tools/src/test/java/akka/cluster/client/ClusterClientTest.java) { #clientEventsListener }
Similarly we can have an actor that behaves in a similar fashion for learning what cluster clients contact a `ClusterClientReceptionist`: Similarly we can have an actor that behaves in a similar fashion for learning what cluster clients are connected to a `ClusterClientReceptionist`:
Scala Scala
: @@snip [ClusterClientSpec.scala]($akka$/akka-cluster-tools/src/multi-jvm/scala/akka/cluster/client/ClusterClientSpec.scala) { #receptionistEventsListener } : @@snip [ClusterClientSpec.scala]($akka$/akka-cluster-tools/src/multi-jvm/scala/akka/cluster/client/ClusterClientSpec.scala) { #receptionistEventsListener }