diff --git a/akka-cluster-tools/src/main/scala/akka/cluster/client/ClusterClient.scala b/akka-cluster-tools/src/main/scala/akka/cluster/client/ClusterClient.scala index 4069785236..8b058172ba 100644 --- a/akka-cluster-tools/src/main/scala/akka/cluster/client/ClusterClient.scala +++ b/akka-cluster-tools/src/main/scala/akka/cluster/client/ClusterClient.scala @@ -384,6 +384,7 @@ final class ClusterClient(settings: ClusterClientSettings) extends Actor with Ac context.become(active(receptionist) orElse contactPointMessages) connectTimerCancelable.foreach(_.cancel()) failureDetector.heartbeat() + self ! HeartbeatTick // will register us as active client of the selected receptionist case ActorIdentity(_, None) ⇒ // ok, use another instead case HeartbeatTick ⇒ failureDetector.heartbeat() @@ -397,6 +398,7 @@ final class ClusterClient(settings: ClusterClientSettings) extends Actor with Ac case ReconnectTimeout ⇒ log.warning("Receptionist reconnect not successful within {} stopping cluster client", settings.reconnectTimeout) context.stop(self) + case ReceptionistShutdown ⇒ // ok, haven't chosen a receptionist yet } } @@ -409,11 +411,8 @@ final class ClusterClient(settings: ClusterClientSettings) extends Actor with Ac receptionist forward DistributedPubSubMediator.Publish(topic, msg) case HeartbeatTick ⇒ if (!failureDetector.isAvailable) { - log.info("Lost contact with [{}], restablishing connection", receptionist) - sendGetContacts() - scheduleRefreshContactsTick(establishingGetContactsInterval) - context.become(establishing orElse contactPointMessages) - failureDetector.heartbeat() + log.info("Lost contact with [{}], reestablishing connection", receptionist) + reestablish() } else receptionist ! Heartbeat case HeartbeatRsp ⇒ @@ -428,6 +427,11 @@ final class ClusterClient(settings: ClusterClientSettings) extends Actor with Ac } publishContactPoints() case _: ActorIdentity ⇒ // ok, from previous establish, already handled + case ReceptionistShutdown ⇒ + if (receptionist == sender()) { + log.info("Receptionist [{}] is shutting down, reestablishing connection", receptionist) + reestablish() + } } def contactPointMessages: Actor.Receive = { @@ -485,6 +489,13 @@ final class ClusterClient(settings: ClusterClientSettings) extends Actor with Ac } contactPathsPublished = contactPaths } + + def reestablish(): Unit = { + sendGetContacts() + scheduleRefreshContactsTick(establishingGetContactsInterval) + context.become(establishing orElse contactPointMessages) + failureDetector.heartbeat() + } } object ClusterClientReceptionist extends ExtensionId[ClusterClientReceptionist] with ExtensionIdProvider { @@ -801,6 +812,8 @@ object ClusterReceptionist { @SerialVersionUID(1L) case object HeartbeatRsp extends ClusterClientMessage with DeadLetterSuppression @SerialVersionUID(1L) + case object ReceptionistShutdown extends ClusterClientMessage with DeadLetterSuppression + @SerialVersionUID(1L) case object Ping extends DeadLetterSuppression case object CheckDeadlines @@ -907,6 +920,7 @@ final class ClusterReceptionist(pubSubMediator: ActorRef, settings: ClusterRecep super.postStop() cluster unsubscribe self checkDeadlinesTask.cancel() + clientInteractions.keySet.foreach(_ ! ReceptionistShutdown) } def matchingRole(m: Member): Boolean = role.forall(m.hasRole) @@ -953,7 +967,6 @@ final class ClusterReceptionist(pubSubMediator: ActorRef, settings: ClusterRecep if (log.isDebugEnabled) log.debug("Client [{}] gets contactPoints [{}]", sender().path, contacts.contactPoints.mkString(",")) sender() ! contacts - updateClientInteractions(sender()) } case state: CurrentClusterState ⇒ diff --git a/akka-cluster-tools/src/main/scala/akka/cluster/client/protobuf/ClusterClientMessageSerializer.scala b/akka-cluster-tools/src/main/scala/akka/cluster/client/protobuf/ClusterClientMessageSerializer.scala index f97d3771b2..867406fa91 100644 --- a/akka-cluster-tools/src/main/scala/akka/cluster/client/protobuf/ClusterClientMessageSerializer.scala +++ b/akka-cluster-tools/src/main/scala/akka/cluster/client/protobuf/ClusterClientMessageSerializer.scala @@ -25,6 +25,7 @@ private[akka] class ClusterClientMessageSerializer(val system: ExtendedActorSyst private val GetContactsManifest = "B" private val HeartbeatManifest = "C" private val HeartbeatRspManifest = "D" + private val ReceptionistShutdownManifest = "E" private val emptyByteArray = Array.empty[Byte] @@ -32,22 +33,25 @@ private[akka] class ClusterClientMessageSerializer(val system: ExtendedActorSyst ContactsManifest → contactsFromBinary, GetContactsManifest → { _ ⇒ GetContacts }, HeartbeatManifest → { _ ⇒ Heartbeat }, - HeartbeatRspManifest → { _ ⇒ HeartbeatRsp }) + HeartbeatRspManifest → { _ ⇒ HeartbeatRsp }, + ReceptionistShutdownManifest → { _ ⇒ ReceptionistShutdown }) override def manifest(obj: AnyRef): String = obj match { - case _: Contacts ⇒ ContactsManifest - case GetContacts ⇒ GetContactsManifest - case Heartbeat ⇒ HeartbeatManifest - case HeartbeatRsp ⇒ HeartbeatRspManifest + case _: Contacts ⇒ ContactsManifest + case GetContacts ⇒ GetContactsManifest + case Heartbeat ⇒ HeartbeatManifest + case HeartbeatRsp ⇒ HeartbeatRspManifest + case ReceptionistShutdown ⇒ ReceptionistShutdownManifest case _ ⇒ throw new IllegalArgumentException(s"Can't serialize object of type ${obj.getClass} in [${getClass.getName}]") } override def toBinary(obj: AnyRef): Array[Byte] = obj match { - case m: Contacts ⇒ contactsToProto(m).toByteArray - case GetContacts ⇒ emptyByteArray - case Heartbeat ⇒ emptyByteArray - case HeartbeatRsp ⇒ emptyByteArray + case m: Contacts ⇒ contactsToProto(m).toByteArray + case GetContacts ⇒ emptyByteArray + case Heartbeat ⇒ emptyByteArray + case HeartbeatRsp ⇒ emptyByteArray + case ReceptionistShutdown ⇒ emptyByteArray case _ ⇒ throw new IllegalArgumentException(s"Can't serialize object of type ${obj.getClass} in [${getClass.getName}]") } diff --git a/akka-cluster-tools/src/multi-jvm/scala/akka/cluster/client/ClusterClientHandoverSpec.scala b/akka-cluster-tools/src/multi-jvm/scala/akka/cluster/client/ClusterClientHandoverSpec.scala new file mode 100644 index 0000000000..ec4d861493 --- /dev/null +++ b/akka-cluster-tools/src/multi-jvm/scala/akka/cluster/client/ClusterClientHandoverSpec.scala @@ -0,0 +1,127 @@ +/** + * Copyright (C) 2009-2017 Lightbend Inc. + */ +package akka.cluster.client + +import akka.actor.ActorRef +import akka.cluster.{ Cluster, ClusterReadView, MemberStatus } +import akka.remote.testconductor.RoleName +import akka.remote.testkit.{ MultiNodeConfig, MultiNodeSpec, STMultiNodeSpec } +import akka.testkit.{ ImplicitSender, TestActors } +import com.typesafe.config.ConfigFactory + +import scala.concurrent.duration._ + +object ClusterClientHandoverSpec extends MultiNodeConfig { + val client = role("client") + val first = role("first") + val second = role("second") + commonConfig(ConfigFactory.parseString(""" + akka.loglevel = INFO + akka.actor.provider = "cluster" + akka.remote.log-remote-lifecycle-events = off + akka.cluster.client { + heartbeat-interval = 1d + acceptable-heartbeat-pause = 1d + reconnect-timeout = 3s + refresh-contacts-interval = 1d + + } + akka.test.filter-leeway = 10s + """)) +} + +class ClusterClientHandoverMultiJvmNode1 extends ClusterClientHandoverSpec + +class ClusterClientHandoverMultiJvmNode2 extends ClusterClientHandoverSpec + +class ClusterClientHandoverMultiJvmNode3 extends ClusterClientHandoverSpec + +class ClusterClientHandoverSpec extends MultiNodeSpec(ClusterClientHandoverSpec) with STMultiNodeSpec with ImplicitSender { + + import ClusterClientHandoverSpec._ + + override def initialParticipants: Int = 3 + + def join(from: RoleName, to: RoleName): Unit = { + runOn(from) { + Cluster(system) join node(to).address + ClusterClientReceptionist(system) + } + enterBarrier(from.name + "-joined") + } + + def clusterView: ClusterReadView = Cluster(system).readView + + def awaitUp(expected: Int): Unit = { + awaitAssert { + awaitAssert(clusterView.members.size should ===(expected)) + awaitAssert(clusterView.members.map(_.status) should ===(Set(MemberStatus.Up))) + } + } + + def initialContacts = Set(first, second).map { r ⇒ + node(r) / "system" / "receptionist" + } + + "A Cluster Client" should { + + "startup cluster with a single node" in within(30.seconds) { + join(first, first) + runOn(first) { + val service = system.actorOf(TestActors.echoActorProps, "testService") + ClusterClientReceptionist(system).registerService(service) + awaitUp(1) + } + + enterBarrier("cluster-started") + } + + var clusterClient: ActorRef = null + + "establish connection to first node" in { + runOn(client) { + clusterClient = system.actorOf(ClusterClient.props( + ClusterClientSettings(system).withInitialContacts(initialContacts)), "client1") + clusterClient ! ClusterClient.Send("/user/testService", "hello", localAffinity = true) + expectMsgType[String](3.seconds) should be("hello") + } + + enterBarrier("established") + } + + "bring the second node into the cluster" in { + join(second, first) + runOn(second) { + val service = system.actorOf(TestActors.echoActorProps, "testService") + ClusterClientReceptionist(system).registerService(service) + awaitUp(2) + } + + enterBarrier("second-up") + } + + "remove first node from the cluster" in { + runOn(first) { + Cluster(system) leave node(first).address + } + + runOn(second) { + awaitUp(1) + } + + enterBarrier("handover-done") + } + + "re-establish on receptionist shutdown" in { + runOn(client) { + clusterClient ! ClusterClient.Send("/user/testService", "hello", localAffinity = true) + expectMsgType[String](3.seconds) should be("hello") + } + + enterBarrier("handover-successful") + } + + } + +} diff --git a/akka-cluster-tools/src/test/scala/akka/cluster/client/protobuf/ClusterClientMessageSerializerSpec.scala b/akka-cluster-tools/src/test/scala/akka/cluster/client/protobuf/ClusterClientMessageSerializerSpec.scala index 33e02fc176..96a4f95daa 100644 --- a/akka-cluster-tools/src/test/scala/akka/cluster/client/protobuf/ClusterClientMessageSerializerSpec.scala +++ b/akka-cluster-tools/src/test/scala/akka/cluster/client/protobuf/ClusterClientMessageSerializerSpec.scala @@ -28,6 +28,7 @@ class ClusterClientMessageSerializerSpec extends AkkaSpec { checkSerialization(GetContacts) checkSerialization(Heartbeat) checkSerialization(HeartbeatRsp) + checkSerialization(ReceptionistShutdown) } } } diff --git a/akka-docs/src/main/paradox/cluster-client.md b/akka-docs/src/main/paradox/cluster-client.md index f9b15bbd94..fffe4c3825 100644 --- a/akka-docs/src/main/paradox/cluster-client.md +++ b/akka-docs/src/main/paradox/cluster-client.md @@ -153,7 +153,7 @@ Scala Java : @@snip [ClusterClientTest.java]($akka$/akka-cluster-tools/src/test/java/akka/cluster/client/ClusterClientTest.java) { #clientEventsListener } -Similarly we can have an actor that behaves in a similar fashion for learning what cluster clients contact a `ClusterClientReceptionist`: +Similarly we can have an actor that behaves in a similar fashion for learning what cluster clients are connected to a `ClusterClientReceptionist`: Scala : @@snip [ClusterClientSpec.scala]($akka$/akka-cluster-tools/src/multi-jvm/scala/akka/cluster/client/ClusterClientSpec.scala) { #receptionistEventsListener }