From ceb0678de2bdd981752bd1441b6e14f725c00f0f Mon Sep 17 00:00:00 2001 From: Christopher Hunt Date: Wed, 4 May 2016 04:50:16 -0700 Subject: [PATCH] Observe the cluster client and its receptionist Allows the cluster client and its receptionist to be observable in terms of contact points becoming available and client heartbeats. Furthermore a query API for requesting the current state has been provided. --- .../src/main/resources/reference.conf | 14 + .../akka/cluster/client/ClusterClient.scala | 348 +++++++++++++++++- .../cluster/client/ClusterClientSpec.scala | 152 +++++++- .../cluster/client/ClusterClientTest.java | 77 +++- akka-docs/rst/java/cluster-client.rst | 23 ++ akka-docs/rst/scala/cluster-client.rst | 23 ++ project/MiMa.scala | 7 +- 7 files changed, 610 insertions(+), 34 deletions(-) diff --git a/akka-cluster-tools/src/main/resources/reference.conf b/akka-cluster-tools/src/main/resources/reference.conf index b5f5617cfb..524f179f5e 100644 --- a/akka-cluster-tools/src/main/resources/reference.conf +++ b/akka-cluster-tools/src/main/resources/reference.conf @@ -72,6 +72,20 @@ akka.cluster.client.receptionist { # If specified you need to define the settings of the actual dispatcher. use-dispatcher = "" + # How often failure detection heartbeat messages should be received for + # each ClusterClient + heartbeat-interval = 2s + + # Number of potentially lost/delayed heartbeats that will be + # accepted before considering it to be an anomaly. + # The ClusterReceptionist is using the akka.remote.DeadlineFailureDetector, which + # will trigger if there are no heartbeats within the duration + # heartbeat-interval + acceptable-heartbeat-pause, i.e. 15 seconds with + # the default settings. + acceptable-heartbeat-pause = 13s + + # Failure detection checking interval for checking all ClusterClients + failure-detection-interval = 2s } # //#receptionist-ext-config diff --git a/akka-cluster-tools/src/main/scala/akka/cluster/client/ClusterClient.scala b/akka-cluster-tools/src/main/scala/akka/cluster/client/ClusterClient.scala index fa393b7e59..54062f2846 100644 --- a/akka-cluster-tools/src/main/scala/akka/cluster/client/ClusterClient.scala +++ b/akka-cluster-tools/src/main/scala/akka/cluster/client/ClusterClient.scala @@ -4,6 +4,7 @@ package akka.cluster.client import java.net.URLEncoder + import scala.collection.immutable import scala.concurrent.duration._ import akka.actor.Actor @@ -11,10 +12,10 @@ import akka.actor.ActorIdentity import akka.actor.ActorLogging import akka.actor.ActorPath import akka.actor.ActorRef -import akka.actor.ActorSelection import akka.actor.ActorSystem import akka.actor.Address import akka.actor.Cancellable +import akka.actor.DeadLetterSuppression import akka.actor.Deploy import akka.actor.ExtendedActorSystem import akka.actor.Extension @@ -24,6 +25,7 @@ import akka.actor.Identify import akka.actor.NoSerializationVerificationNeeded import akka.actor.Props import akka.actor.ReceiveTimeout +import akka.actor.Terminated import akka.cluster.Cluster import akka.cluster.ClusterEvent._ import akka.cluster.Member @@ -33,10 +35,11 @@ import akka.japi.Util.immutableSeq import akka.routing.ConsistentHash import akka.routing.MurmurHash import com.typesafe.config.Config -import akka.actor.DeadLetterSuppression import akka.remote.DeadlineFailureDetector import akka.dispatch.Dispatchers +import scala.collection.immutable.{ HashMap, HashSet } + object ClusterClientSettings { /** * Create settings from the default configuration @@ -171,6 +174,80 @@ final class ClusterClientSettings( heartbeatInterval, acceptableHeartbeatPause, bufferSize, reconnectTimeout) } +/** + * Declares a super type for all events emitted by the `ClusterClient` + * in relation to contact points being added or removed. + */ +sealed trait ContactPointChange { + val contactPoint: ActorPath +} + +/** + * Emitted to a subscriber when contact points have been + * received by the ClusterClient and a new one has been added. + */ +final case class ContactPointAdded(override val contactPoint: ActorPath) extends ContactPointChange + +/** + * Emitted to a subscriber when contact points have been + * received by the ClusterClient and a new one has been added. + */ +final case class ContactPointRemoved(override val contactPoint: ActorPath) extends ContactPointChange + +sealed abstract class SubscribeContactPoints +/** + * Subscribe to a cluster client's contact point changes where + * it is guaranteed that a sender receives the initial state + * of contact points prior to any events in relation to them + * changing. + * The sender will automatically become unsubscribed when it + * terminates. + */ +case object SubscribeContactPoints extends SubscribeContactPoints { + /** + * Java API: get the singleton instance + */ + def getInstance = this +} + +sealed abstract class UnsubscribeContactPoints +/** + * Explicitly unsubscribe from contact point change events. + */ +case object UnsubscribeContactPoints extends UnsubscribeContactPoints { + /** + * Java API: get the singleton instance + */ + def getInstance = this +} + +sealed abstract class GetContactPoints +/** + * Get the contact points known to this client. A ``ContactPoints`` message + * will be replied. + */ +case object GetContactPoints extends GetContactPoints { + /** + * Java API: get the singleton instance + */ + def getInstance = this +} + +/** + * The reply to ``GetContactPoints``. + * + * @param contactPoints The presently known list of contact points. + */ +final case class ContactPoints(contactPoints: Set[ActorPath]) { + import scala.collection.JavaConverters._ + + /** + * Java API + */ + def getContactPoints: java.util.Set[ActorPath] = + contactPoints.asJava +} + object ClusterClient { /** @@ -254,11 +331,17 @@ final class ClusterClient(settings: ClusterClientSettings) extends Actor with Ac val failureDetector = new DeadlineFailureDetector(acceptableHeartbeatPause, heartbeatInterval) - val initialContactsSel: immutable.IndexedSeq[ActorSelection] = - initialContacts.map(context.actorSelection).toVector + var contactPaths: HashSet[ActorPath] = + initialContacts.to[HashSet] + val initialContactsSel = + contactPaths.map(context.actorSelection) var contacts = initialContactsSel sendGetContacts() + var contactPathsPublished = HashSet.empty[ActorPath] + + var subscribers = Vector.empty[ActorRef] + import context.dispatcher val heartbeatTask = context.system.scheduler.schedule( heartbeatInterval, heartbeatInterval, self, HeartbeatTick) @@ -280,7 +363,7 @@ final class ClusterClient(settings: ClusterClientSettings) extends Actor with Ac refreshContactsTask foreach { _.cancel() } } - def receive = establishing + def receive = establishing orElse contactPointMessages def establishing: Actor.Receive = { val connectTimerCancelable = settings.reconnectTimeout.map { timeout ⇒ @@ -290,14 +373,16 @@ final class ClusterClient(settings: ClusterClientSettings) extends Actor with Ac { case Contacts(contactPoints) ⇒ if (contactPoints.nonEmpty) { - contacts = contactPoints.map(context.actorSelection) - contacts foreach { _ ! Identify(None) } + contactPaths = contactPoints.map(ActorPath.fromString).to[HashSet] + contacts = contactPaths.map(context.actorSelection) + contacts foreach { _ ! Identify(Array.emptyByteArray) } } + publishContactPoints() case ActorIdentity(_, Some(receptionist)) ⇒ log.info("Connected to [{}]", receptionist.path) scheduleRefreshContactsTick(refreshContactsInterval) sendBuffered(receptionist) - context.become(active(receptionist)) + context.become(active(receptionist) orElse contactPointMessages) connectTimerCancelable.foreach(_.cancel()) failureDetector.heartbeat() case ActorIdentity(_, None) ⇒ // ok, use another instead @@ -328,7 +413,7 @@ final class ClusterClient(settings: ClusterClientSettings) extends Actor with Ac log.info("Lost contact with [{}], restablishing connection", receptionist) sendGetContacts() scheduleRefreshContactsTick(establishingGetContactsInterval) - context.become(establishing) + context.become(establishing orElse contactPointMessages) failureDetector.heartbeat() } else receptionist ! Heartbeat @@ -338,15 +423,33 @@ final class ClusterClient(settings: ClusterClientSettings) extends Actor with Ac receptionist ! GetContacts case Contacts(contactPoints) ⇒ // refresh of contacts - if (contactPoints.nonEmpty) - contacts = contactPoints.map(context.actorSelection) + if (contactPoints.nonEmpty) { + contactPaths = contactPoints.map(ActorPath.fromString).to[HashSet] + contacts = contactPaths.map(context.actorSelection) + } + publishContactPoints() case _: ActorIdentity ⇒ // ok, from previous establish, already handled } + def contactPointMessages: Actor.Receive = { + case SubscribeContactPoints ⇒ + val subscriber = sender() + subscriber ! ContactPoints(contactPaths) + subscribers :+= subscriber + context.watch(subscriber) + case UnsubscribeContactPoints ⇒ + val subscriber = sender() + subscribers = subscribers.filterNot(_ == subscriber) + case Terminated(subscriber) ⇒ + self.tell(UnsubscribeContactPoints, subscriber) + case GetContactPoints ⇒ + sender() ! ContactPoints(contactPaths) + } + def sendGetContacts(): Unit = { val sendTo = if (contacts.isEmpty) initialContactsSel - else if (contacts.size == 1) (initialContactsSel union contacts) + else if (contacts.size == 1) initialContactsSel union contacts else contacts if (log.isDebugEnabled) log.debug(s"""Sending GetContacts to [${sendTo.mkString(",")}]""") @@ -372,12 +475,24 @@ final class ClusterClient(settings: ClusterClientSettings) extends Actor with Ac receptionist.tell(msg, snd) } } + + def publishContactPoints(): Unit = { + for (cp ← contactPaths if !contactPathsPublished.contains(cp)) { + val contactPointAdded = ContactPointAdded(cp) + subscribers.foreach(_ ! contactPointAdded) + } + for (cp ← contactPathsPublished if !contactPaths.contains(cp)) { + val contactPointRemoved = ContactPointRemoved(cp) + subscribers.foreach(_ ! contactPointRemoved) + } + contactPathsPublished = contactPaths + } } object ClusterClientReceptionist extends ExtensionId[ClusterClientReceptionist] with ExtensionIdProvider { override def get(system: ActorSystem): ClusterClientReceptionist = super.get(system) - override def lookup = ClusterClientReceptionist + override def lookup() = ClusterClientReceptionist override def createExtension(system: ExtendedActorSystem): ClusterClientReceptionist = new ClusterClientReceptionist(system) @@ -456,6 +571,13 @@ final class ClusterClientReceptionist(system: ExtendedActorSystem) extends Exten .withDispatcher(dispatcher), name) } } + + /** + * Returns the underlying receptionist actor, particularly so that its + * events can be observed via subscribe/unsubscribe. + */ + def underlying: ActorRef = + receptionist } object ClusterReceptionistSettings { @@ -474,7 +596,10 @@ object ClusterReceptionistSettings { new ClusterReceptionistSettings( role = roleOption(config.getString("role")), numberOfContacts = config.getInt("number-of-contacts"), - responseTunnelReceiveTimeout = config.getDuration("response-tunnel-receive-timeout", MILLISECONDS).millis) + responseTunnelReceiveTimeout = config.getDuration("response-tunnel-receive-timeout", MILLISECONDS).millis, + heartbeatInterval = config.getDuration("heartbeat-interval", MILLISECONDS).millis, + acceptableHeartbeatPause = config.getDuration("acceptable-heartbeat-pause", MILLISECONDS).millis, + failureDetectionInterval = config.getDuration("failure-detection-interval", MILLISECONDS).millis) /** * Java API: Create settings from the default configuration @@ -518,12 +643,62 @@ final class ClusterReceptionistSettings( def withResponseTunnelReceiveTimeout(responseTunnelReceiveTimeout: FiniteDuration): ClusterReceptionistSettings = copy(responseTunnelReceiveTimeout = responseTunnelReceiveTimeout) + def withHeartbeat( + heartbeatInterval: FiniteDuration, + acceptableHeartbeatPause: FiniteDuration, + failureDetectionInterval: FiniteDuration): ClusterReceptionistSettings = + copy( + heartbeatInterval = heartbeatInterval, + acceptableHeartbeatPause = acceptableHeartbeatPause, + failureDetectionInterval = failureDetectionInterval) + + // BEGIN BINARY COMPATIBILITY + // The following is required in order to maintain binary + // compatibility with 2.4. Post 2.4, the following 3 properties should + // be moved to the class's constructor, and the following section of code + // should be removed entirely. + // TODO: ADDRESS FOR v.2.5 + + def heartbeatInterval: FiniteDuration = + _heartbeatInterval + def acceptableHeartbeatPause: FiniteDuration = + _acceptableHeartbeatPause + def failureDetectionInterval: FiniteDuration = + _failureDetectionInterval + + private var _heartbeatInterval: FiniteDuration = 2.seconds + private var _acceptableHeartbeatPause: FiniteDuration = 13.seconds + private var _failureDetectionInterval: FiniteDuration = 2.second + + def this( + role: Option[String], + numberOfContacts: Int, + responseTunnelReceiveTimeout: FiniteDuration, + heartbeatInterval: FiniteDuration, + acceptableHeartbeatPause: FiniteDuration, + failureDetectionInterval: FiniteDuration) = { + this(role, numberOfContacts, responseTunnelReceiveTimeout) + this._heartbeatInterval = heartbeatInterval + this._acceptableHeartbeatPause = acceptableHeartbeatPause + this._failureDetectionInterval = failureDetectionInterval + } + + // END BINARY COMPATIBILITY + private def copy( role: Option[String] = role, numberOfContacts: Int = numberOfContacts, - responseTunnelReceiveTimeout: FiniteDuration = responseTunnelReceiveTimeout): ClusterReceptionistSettings = - new ClusterReceptionistSettings(role, numberOfContacts, responseTunnelReceiveTimeout) - + responseTunnelReceiveTimeout: FiniteDuration = responseTunnelReceiveTimeout, + heartbeatInterval: FiniteDuration = heartbeatInterval, + acceptableHeartbeatPause: FiniteDuration = acceptableHeartbeatPause, + failureDetectionInterval: FiniteDuration = failureDetectionInterval): ClusterReceptionistSettings = + new ClusterReceptionistSettings( + role, + numberOfContacts, + responseTunnelReceiveTimeout, + heartbeatInterval, + acceptableHeartbeatPause, + failureDetectionInterval) } /** @@ -531,6 +706,80 @@ final class ClusterReceptionistSettings( */ sealed trait ClusterClientMessage extends Serializable +/** + * Declares a super type for all events emitted by the `ClusterReceptionist`. + * in relation to cluster clients being interacted with. + */ +sealed trait ClusterClientInteraction { + val clusterClient: ActorRef +} + +/** + * Emitted to the Akka event stream when a cluster client has interacted with + * a receptionist. + */ +final case class ClusterClientUp(override val clusterClient: ActorRef) extends ClusterClientInteraction + +/** + * Emitted to the Akka event stream when a cluster client was previously connected + * but then not seen for some time. + */ +final case class ClusterClientUnreachable(override val clusterClient: ActorRef) extends ClusterClientInteraction + +sealed abstract class SubscribeClusterClients +/** + * Subscribe to a cluster receptionist's client interactions where + * it is guaranteed that a sender receives the initial state + * of contact points prior to any events in relation to them + * changing. + * The sender will automatically become unsubscribed when it + * terminates. + */ +case object SubscribeClusterClients extends SubscribeClusterClients { + /** + * Java API: get the singleton instance + */ + def getInstance = this +} + +sealed abstract class UnsubscribeClusterClients +/** + * Explicitly unsubscribe from client interaction events. + */ +case object UnsubscribeClusterClients extends UnsubscribeClusterClients { + /** + * Java API: get the singleton instance + */ + def getInstance = this +} + +sealed abstract class GetClusterClients +/** + * Get the cluster clients known to this receptionist. A ``ClusterClients`` message + * will be replied. + */ +case object GetClusterClients extends GetClusterClients { + /** + * Java API: get the singleton instance + */ + def getInstance = this +} + +/** + * The reply to ``GetClusterClients``. + * + * @param clusterClients The presently known list of cluster clients. + */ +final case class ClusterClients(clusterClients: Set[ActorRef]) { + import scala.collection.JavaConverters._ + + /** + * Java API + */ + def getClusterClients: java.util.Set[ActorRef] = + clusterClients.asJava +} + object ClusterReceptionist { /** @@ -555,6 +804,7 @@ object ClusterReceptionist { case object HeartbeatRsp extends ClusterClientMessage with DeadLetterSuppression @SerialVersionUID(1L) case object Ping extends DeadLetterSuppression + case object CheckDeadlines /** * Replies are tunneled via this actor, child of the receptionist, to avoid @@ -609,12 +859,12 @@ final class ClusterReceptionist(pubSubMediator: ActorRef, settings: ClusterRecep import cluster.selfAddress require(role.forall(cluster.selfRoles.contains), - s"This cluster member [${selfAddress}] doesn't have the role [$role]") + s"This cluster member [$selfAddress] doesn't have the role [$role]") var nodes: immutable.SortedSet[Address] = { def hashFor(node: Address): Int = node match { // cluster node identifier is the host and port of the address; protocol and system is assumed to be the same - case Address(_, _, Some(host), Some(port)) ⇒ MurmurHash.stringHash(s"${host}:${port}") + case Address(_, _, Some(host), Some(port)) ⇒ MurmurHash.stringHash(s"$host:$port") case _ ⇒ throw new IllegalStateException(s"Unexpected address without host/port: [$node]") } @@ -628,6 +878,17 @@ final class ClusterReceptionist(pubSubMediator: ActorRef, settings: ClusterRecep val virtualNodesFactor = 10 var consistentHash: ConsistentHash[Address] = ConsistentHash(nodes, virtualNodesFactor) + var clientInteractions = HashMap.empty[ActorRef, DeadlineFailureDetector] + var clientsPublished = HashSet.empty[ActorRef] + + var subscribers = Vector.empty[ActorRef] + + val checkDeadlinesTask = context.system.scheduler.schedule( + failureDetectionInterval, + failureDetectionInterval, + self, + CheckDeadlines)(context.dispatcher) + override def preStart(): Unit = { super.preStart() require(!cluster.isTerminated, "Cluster node must not be terminated") @@ -637,6 +898,7 @@ final class ClusterReceptionist(pubSubMediator: ActorRef, settings: ClusterRecep override def postStop(): Unit = { super.postStop() cluster unsubscribe self + checkDeadlinesTask.cancel() } def matchingRole(m: Member): Boolean = role.forall(m.hasRole) @@ -659,6 +921,7 @@ final class ClusterReceptionist(pubSubMediator: ActorRef, settings: ClusterRecep case Heartbeat ⇒ if (verboseHeartbeat) log.debug("Heartbeat from client [{}]", sender().path) sender() ! HeartbeatRsp + updateClientInteractions(sender()) case GetContacts ⇒ // Consistent hashing is used to ensure that the reply to GetContacts @@ -682,6 +945,7 @@ final class ClusterReceptionist(pubSubMediator: ActorRef, settings: ClusterRecep if (log.isDebugEnabled) log.debug("Client [{}] gets contactPoints [{}]", sender().path, contacts.contactPoints.mkString(",")) sender() ! contacts + updateClientInteractions(sender()) } case state: CurrentClusterState ⇒ @@ -703,7 +967,53 @@ final class ClusterReceptionist(pubSubMediator: ActorRef, settings: ClusterRecep } case _: MemberEvent ⇒ // not of interest + + case SubscribeClusterClients ⇒ + val subscriber = sender() + subscriber ! ClusterClients(clientInteractions.keySet.to[HashSet]) + subscribers :+= subscriber + context.watch(subscriber) + + case UnsubscribeClusterClients ⇒ + val subscriber = sender() + subscribers = subscribers.filterNot(_ == subscriber) + + case Terminated(subscriber) ⇒ + self.tell(UnsubscribeClusterClients, subscriber) + + case GetClusterClients ⇒ + sender() ! ClusterClients(clientInteractions.keySet.to[HashSet]) + + case CheckDeadlines ⇒ + clientInteractions = clientInteractions.filter { + case (_, failureDetector) ⇒ + failureDetector.isAvailable + } + publishClientsUnreachable() } + def updateClientInteractions(client: ActorRef): Unit = + clientInteractions.get(client) match { + case Some(failureDetector) ⇒ + failureDetector.heartbeat() + case None ⇒ + val failureDetector = new DeadlineFailureDetector(acceptableHeartbeatPause, heartbeatInterval) + failureDetector.heartbeat() + clientInteractions = clientInteractions + (client -> failureDetector) + log.debug("Received new contact from [{}]", client.path) + val clusterClientUp = ClusterClientUp(client) + subscribers.foreach(_ ! clusterClientUp) + clientsPublished = clientInteractions.keySet.to[HashSet] + } + + def publishClientsUnreachable(): Unit = { + val publishableClients = clientInteractions.keySet.to[HashSet] + for (c ← clientsPublished if !publishableClients.contains(c)) { + log.debug("Lost contact with [{}]", c.path) + val clusterClientUnreachable = ClusterClientUnreachable(c) + subscribers.foreach(_ ! clusterClientUnreachable) + } + clientsPublished = publishableClients + } } diff --git a/akka-cluster-tools/src/multi-jvm/scala/akka/cluster/client/ClusterClientSpec.scala b/akka-cluster-tools/src/multi-jvm/scala/akka/cluster/client/ClusterClientSpec.scala index 7823867438..89c7554ead 100644 --- a/akka-cluster-tools/src/multi-jvm/scala/akka/cluster/client/ClusterClientSpec.scala +++ b/akka-cluster-tools/src/multi-jvm/scala/akka/cluster/client/ClusterClientSpec.scala @@ -6,22 +6,19 @@ package akka.cluster.client import language.postfixOps import scala.concurrent.duration._ import com.typesafe.config.ConfigFactory -import akka.actor.Actor -import akka.actor.ActorPath -import akka.actor.ActorRef -import akka.actor.ActorSystem -import akka.actor.ExtendedActorSystem -import akka.actor.Props +import akka.actor.{ Actor, ActorPath, ActorRef, ActorSystem, Address, ExtendedActorSystem, NoSerializationVerificationNeeded, Props } import akka.cluster.Cluster -import akka.cluster.ClusterEvent._ +import akka.cluster.client.ClusterClientSpec.TestClientListener.LatestContactPoints +import akka.cluster.client.ClusterClientSpec.TestReceptionistListener.LatestClusterClients import akka.remote.testconductor.RoleName import akka.remote.testkit.MultiNodeConfig import akka.remote.testkit.MultiNodeSpec import akka.remote.testkit.STMultiNodeSpec import akka.testkit._ -import akka.actor.Address import akka.cluster.pubsub._ import akka.remote.transport.ThrottlerTransportAdapter.Direction +import akka.util.Timeout + import scala.concurrent.Await object ClusterClientSpec extends MultiNodeConfig { @@ -38,8 +35,12 @@ object ClusterClientSpec extends MultiNodeConfig { akka.cluster.auto-down-unreachable-after = 0s akka.cluster.client.heartbeat-interval = 1s akka.cluster.client.acceptable-heartbeat-pause = 3s + akka.cluster.client.refresh-contacts-interval = 1s # number-of-contacts must be >= 4 because we shutdown all but one in the end akka.cluster.client.receptionist.number-of-contacts = 4 + akka.cluster.client.receptionist.heartbeat-interval = 10s + akka.cluster.client.receptionist.acceptable-heartbeat-pause = 10s + akka.cluster.client.receptionist.failure-detection-interval = 1s akka.test.filter-leeway = 10s """)) @@ -63,6 +64,83 @@ object ClusterClientSpec extends MultiNodeConfig { } } + //#clientEventsListener + class ClientListener(targetClient: ActorRef) extends Actor { + override def preStart(): Unit = + targetClient ! SubscribeContactPoints + + def receive: Receive = + receiveWithContactPoints(Set.empty) + + def receiveWithContactPoints(contactPoints: Set[ActorPath]): Receive = { + case ContactPoints(cps) ⇒ + context.become(receiveWithContactPoints(cps)) + // Now do something with the up-to-date "cps" + case ContactPointAdded(cp) ⇒ + context.become(receiveWithContactPoints(contactPoints + cp)) + // Now do something with an up-to-date "contactPoints + cp" + case ContactPointRemoved(cp) ⇒ + context.become(receiveWithContactPoints(contactPoints - cp)) + // Now do something with an up-to-date "contactPoints - cp" + } + } + //#clientEventsListener + + object TestClientListener { + case object GetLatestContactPoints + case class LatestContactPoints(contactPoints: Set[ActorPath]) extends NoSerializationVerificationNeeded + } + + class TestClientListener(targetClient: ActorRef) extends ClientListener(targetClient) { + + import TestClientListener._ + + override def receiveWithContactPoints(contactPoints: Set[ActorPath]): Receive = { + case GetLatestContactPoints ⇒ + sender() ! LatestContactPoints(contactPoints) + case msg: Any ⇒ + super.receiveWithContactPoints(contactPoints)(msg) + } + } + + //#receptionistEventsListener + class ReceptionistListener(targetReceptionist: ActorRef) extends Actor { + override def preStart(): Unit = + targetReceptionist ! SubscribeClusterClients + + def receive: Receive = + receiveWithClusterClients(Set.empty) + + def receiveWithClusterClients(clusterClients: Set[ActorRef]): Receive = { + case ClusterClients(cs) ⇒ + context.become(receiveWithClusterClients(cs)) + // Now do something with the up-to-date "c" + case ClusterClientUp(c) ⇒ + context.become(receiveWithClusterClients(clusterClients + c)) + // Now do something with an up-to-date "clusterClients + c" + case ClusterClientUnreachable(c) ⇒ + context.become(receiveWithClusterClients(clusterClients - c)) + // Now do something with an up-to-date "clusterClients - c" + } + } + //#receptionistEventsListener + + object TestReceptionistListener { + case object GetLatestClusterClients + case class LatestClusterClients(clusterClients: Set[ActorRef]) extends NoSerializationVerificationNeeded + } + + class TestReceptionistListener(targetReceptionist: ActorRef) extends ReceptionistListener(targetReceptionist) { + + import TestReceptionistListener._ + + override def receiveWithClusterClients(clusterClients: Set[ActorRef]): Receive = { + case GetLatestClusterClients ⇒ + sender() ! LatestClusterClients(clusterClients) + case msg: Any ⇒ + super.receiveWithClusterClients(clusterClients)(msg) + } + } } class ClusterClientMultiJvmNode1 extends ClusterClientSpec @@ -185,6 +263,50 @@ class ClusterClientSpec extends MultiNodeSpec(ClusterClientSpec) with STMultiNod enterBarrier("after-3") } + "report events" in within(15 seconds) { + runOn(client) { + implicit val timeout = Timeout(1.second.dilated) + val c = Await.result(system.actorSelection("/user/client").resolveOne(), timeout.duration) + val l = system.actorOf(Props(classOf[TestClientListener], c), "reporter-client-listener") + + val expectedContacts = Set(first, second, third, fourth).map(node(_) / "system" / "receptionist") + within(10.seconds) { + awaitAssert { + val probe = TestProbe() + l.tell(TestClientListener.GetLatestContactPoints, probe.ref) + probe.expectMsgType[LatestContactPoints].contactPoints should ===(expectedContacts) + } + } + } + + enterBarrier("reporter-client-listener-tested") + + runOn(first, second, third) { + // Only run this test on a node that knows about our client. It could be that no node knows + // but there isn't a means of expressing that at least one of the nodes needs to pass the test. + implicit val timeout = Timeout(2.seconds.dilated) + val r = ClusterClientReceptionist(system).underlying + r ! GetClusterClients + val cps = expectMsgType[ClusterClients] + if (cps.clusterClients.exists(_.path.name == "client")) { + log.info("Testing that the receptionist has just one client") + val l = system.actorOf(Props(classOf[TestReceptionistListener], r), "reporter-receptionist-listener") + + val c = Await.result(system.actorSelection(node(client) / "user" / "client").resolveOne(), timeout.duration) + val expectedClients = Set(c) + within(10.seconds) { + awaitAssert { + val probe = TestProbe() + l.tell(TestReceptionistListener.GetLatestClusterClients, probe.ref) + probe.expectMsgType[LatestClusterClients].clusterClients should ===(expectedClients) + } + } + } + } + + enterBarrier("after-6") + } + "re-establish connection to another receptionist when server is shutdown" in within(30 seconds) { runOn(first, second, third, fourth) { val service2 = system.actorOf(Props(classOf[TestService], testActor), "service2") @@ -219,6 +341,20 @@ class ClusterClientSpec extends MultiNodeSpec(ClusterClientSpec) with STMultiNod case "hi again" ⇒ case other ⇒ fail("unexpected message: " + other) } + enterBarrier("verifed-4") + runOn(client) { + // Locate the test listener from a previous test and see that it agrees + // with what the client is telling it about what receptionists are alive + val l = system.actorSelection("/user/reporter-client-listener") + val expectedContacts = remainingServerRoleNames.map(node(_) / "system" / "receptionist") + within(10.seconds) { + awaitAssert { + val probe = TestProbe() + l.tell(TestClientListener.GetLatestContactPoints, probe.ref) + probe.expectMsgType[LatestContactPoints].contactPoints should ===(expectedContacts) + } + } + } enterBarrier("after-4") } diff --git a/akka-cluster-tools/src/test/java/akka/cluster/client/ClusterClientTest.java b/akka-cluster-tools/src/test/java/akka/cluster/client/ClusterClientTest.java index 306593c979..71117b1629 100644 --- a/akka-cluster-tools/src/test/java/akka/cluster/client/ClusterClientTest.java +++ b/akka-cluster-tools/src/test/java/akka/cluster/client/ClusterClientTest.java @@ -4,6 +4,7 @@ package akka.cluster.client; +import akka.actor.*; import com.typesafe.config.ConfigFactory; import java.util.Arrays; import java.util.HashSet; @@ -11,12 +12,6 @@ import java.util.Set; import org.junit.ClassRule; import org.junit.Test; -import akka.actor.ActorPath; -import akka.actor.ActorPaths; -import akka.actor.ActorRef; -import akka.actor.ActorSystem; -import akka.actor.Props; -import akka.actor.UntypedActor; import akka.testkit.AkkaJUnitActorSystemResource; import org.scalatest.junit.JUnitSuite; @@ -57,10 +52,80 @@ public class ClusterClientTest extends JUnitSuite { c.tell(new ClusterClient.Send("/user/serviceA", "hello", true), ActorRef.noSender()); c.tell(new ClusterClient.SendToAll("/user/serviceB", "hi"), ActorRef.noSender()); //#client + + system.actorOf(Props.create(ClientListener.class, c)); + system.actorOf(Props.create(ReceptionistListener.class, ClusterClientReceptionist.get(system).underlying())); } static public class Service extends UntypedActor { public void onReceive(Object msg) { } } + + //#clientEventsListener + static public class ClientListener extends UntypedActor { + private final ActorRef targetClient; + private final Set contactPoints = new HashSet<>(); + + public ClientListener(ActorRef targetClient) { + this.targetClient = targetClient; + } + + @Override + public void preStart() { + targetClient.tell(SubscribeContactPoints.getInstance(), sender()); + } + + @Override + public void onReceive(Object message) { + if (message instanceof ContactPoints) { + ContactPoints msg = (ContactPoints)message; + contactPoints.addAll(msg.getContactPoints()); + // Now do something with an up-to-date "contactPoints" + } else if (message instanceof ContactPointAdded) { + ContactPointAdded msg = (ContactPointAdded) message; + contactPoints.add(msg.contactPoint()); + // Now do something with an up-to-date "contactPoints" + } else if (message instanceof ContactPointRemoved) { + ContactPointRemoved msg = (ContactPointRemoved)message; + contactPoints.remove(msg.contactPoint()); + // Now do something with an up-to-date "contactPoints" + } + } + } + //#clientEventsListener + + //#receptionistEventsListener + static public class ReceptionistListener extends UntypedActor { + private final ActorRef targetReceptionist; + private final Set clusterClients = new HashSet<>(); + + public ReceptionistListener(ActorRef targetReceptionist) { + this.targetReceptionist = targetReceptionist; + } + + @Override + public void preStart() { + targetReceptionist.tell(SubscribeClusterClients.getInstance(), sender()); + } + + @Override + public void onReceive(Object message) { + if (message instanceof ClusterClients) { + ClusterClients msg = (ClusterClients) message; + clusterClients.addAll(msg.getClusterClients()); + // Now do something with an up-to-date "clusterClients" + } else if (message instanceof ClusterClientUp) { + ClusterClientUp msg = (ClusterClientUp) message; + clusterClients.add(msg.clusterClient()); + // Now do something with an up-to-date "clusterClients" + } else if (message instanceof ClusterClientUnreachable) { + ClusterClientUnreachable msg = (ClusterClientUnreachable) message; + clusterClients.remove(msg.clusterClient()); + // Now do something with an up-to-date "clusterClients" + } + } + } + //#receptionistEventsListener + } diff --git a/akka-docs/rst/java/cluster-client.rst b/akka-docs/rst/java/cluster-client.rst index 3676991a9f..29aa4413a6 100644 --- a/akka-docs/rst/java/cluster-client.rst +++ b/akka-docs/rst/java/cluster-client.rst @@ -33,6 +33,16 @@ The ``ClusterClientReceptionist`` provides methods for registration of actors th should be reachable from the client. Messages are wrapped in ``ClusterClient.Send``, ``ClusterClient.SendToAll`` or ``ClusterClient.Publish``. +Both the ``ClusterClient`` and the ``ClusterClientReceptionist`` emit events that can be subscribed to. +The ``ClusterClient`` sends out notifications in relation to having received a list of contact points +from the ``ClusterClientReceptionist``. One use of this list might be for the client to record its +contact points. A client that is restarted could then use this information to supersede any previously +configured contact points. + +The ``ClusterClientReceptionist`` sends out notifications in relation to having received a contact +from a ``ClusterClient``. This notification enables the server containing the receptionist to become aware of +what clients are connected. + **1. ClusterClient.Send** The message will be delivered to one recipient with a matching path, if any such @@ -112,6 +122,19 @@ It is recommended to load the extension when the actor system is started by defi akka.extensions = ["akka.cluster.client.ClusterClientReceptionist"] +Events +------ +As mentioned earlier, both the ``ClusterClient`` and ``ClusterClientReceptionist`` emit events that can be subscribed to. +The following code snippet declares an actor that will receive notifications on contact points (addresses to the available +receptionists), as they become available. The code illustrates subscribing to the events and receiving the ``ClusterClient`` +initial state. + +.. includecode:: ../../../akka-cluster-tools/src/test/java/akka/cluster/client/ClusterClientTest.java#clientEventsListener + +Similarly we can have an actor that behaves in a similar fashion for learning what cluster clients contact a ``ClusterClientReceptionist``: + +.. includecode:: ../../../akka-cluster-tools/src/test/java/akka/cluster/client/ClusterClientTest.java#receptionistEventsListener + Dependencies ------------ diff --git a/akka-docs/rst/scala/cluster-client.rst b/akka-docs/rst/scala/cluster-client.rst index 9060df2e0f..1289f325b4 100644 --- a/akka-docs/rst/scala/cluster-client.rst +++ b/akka-docs/rst/scala/cluster-client.rst @@ -33,6 +33,16 @@ The ``ClusterClientReceptionist`` provides methods for registration of actors th should be reachable from the client. Messages are wrapped in ``ClusterClient.Send``, ``ClusterClient.SendToAll`` or ``ClusterClient.Publish``. +Both the ``ClusterClient`` and the ``ClusterClientReceptionist`` emit events that can be subscribed to. +The ``ClusterClient`` sends out notifications in relation to having received a list of contact points +from the ``ClusterClientReceptionist``. One use of this list might be for the client to record its +contact points. A client that is restarted could then use this information to supersede any previously +configured contact points. + +The ``ClusterClientReceptionist`` sends out notifications in relation to having received contact +from a ``ClusterClient``. This notification enables the server containing the receptionist to become aware of +what clients are connected. + **1. ClusterClient.Send** The message will be delivered to one recipient with a matching path, if any such @@ -112,6 +122,19 @@ It is recommended to load the extension when the actor system is started by defi akka.extensions = ["akka.cluster.client.ClusterClientReceptionist"] +Events +------ +As mentioned earlier, both the ``ClusterClient`` and ``ClusterClientReceptionist`` emit events that can be subscribed to. +The following code snippet declares an actor that will receive notifications on contact points (addresses to the available +receptionists), as they become available. The code illustrates subscribing to the events and receiving the ``ClusterClient`` +initial state. + +.. includecode:: ../../../akka-cluster-tools/src/multi-jvm/scala/akka/cluster/client/ClusterClientSpec.scala#clientEventsListener + +Similarly we can have an actor that behaves in a similar fashion for learning what cluster clients contact a ``ClusterClientReceptionist``: + +.. includecode:: ../../../akka-cluster-tools/src/multi-jvm/scala/akka/cluster/client/ClusterClientSpec.scala#receptionistEventsListener + Dependencies ------------ diff --git a/project/MiMa.scala b/project/MiMa.scala index 5da02c171f..9f23d27f08 100644 --- a/project/MiMa.scala +++ b/project/MiMa.scala @@ -797,7 +797,12 @@ object MiMa extends AutoPlugin { ProblemFilters.exclude[MissingTypesProblem]("akka.stream.extra.Timed$StopTimed"), ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.extra.Timed#StopTimed.onPush"), ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.extra.Timed#StopTimed.onUpstreamFinish"), - ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.extra.Timed#StopTimed.onUpstreamFailure") + ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.extra.Timed#StopTimed.onUpstreamFailure"), + + // #20462 - now uses a Set instead of a Seq within the private API of the cluster client + ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.cluster.client.ClusterClient.contacts_="), + ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.cluster.client.ClusterClient.contacts"), + ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.cluster.client.ClusterClient.initialContactsSel") ) ) }