From 19c7017482933172ab54a8fad754952dcecf614d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johan=20Andre=CC=81n?= Date: Thu, 7 Jan 2016 19:54:55 +0100 Subject: [PATCH] =clu #18577 Option to stop cluster client after no receptionist contact timeout --- .../src/main/resources/reference.conf | 9 ++ .../akka/cluster/client/ClusterClient.scala | 87 ++++++++++---- .../client/ClusterClientStopSpec.scala | 111 ++++++++++++++++++ akka-docs/rst/java/cluster-client.rst | 22 ++++ akka-docs/rst/scala/cluster-client.rst | 22 ++++ .../leveldb/EventsByPersistenceIdSpec.scala | 2 +- 6 files changed, 227 insertions(+), 26 deletions(-) create mode 100644 akka-cluster-tools/src/multi-jvm/scala/akka/cluster/client/ClusterClientStopSpec.scala diff --git a/akka-cluster-tools/src/main/resources/reference.conf b/akka-cluster-tools/src/main/resources/reference.conf index 780d8451ab..f504547ae4 100644 --- a/akka-cluster-tools/src/main/resources/reference.conf +++ b/akka-cluster-tools/src/main/resources/reference.conf @@ -112,6 +112,15 @@ akka.cluster.client { # immediately if the location of the singleton is unknown. # Maximum allowed buffer size is 10000. buffer-size = 1000 + + # If connection to the receiptionist is lost and the client has not been + # able to acquire a new connection for this long the client will stop itself. + # This duration makes it possible to watch the cluster client and react on a more permanent + # loss of connection with the cluster, for example by accessing some kind of + # service registry for an updated set of initial contacts to start a new cluster client with. + # If this is not wanted it can be set to "off" to disable the timeout and retry + # forever. + reconnect-timeout = off } # //#cluster-client-config diff --git a/akka-cluster-tools/src/main/scala/akka/cluster/client/ClusterClient.scala b/akka-cluster-tools/src/main/scala/akka/cluster/client/ClusterClient.scala index c184f78c78..805e08e07f 100644 --- a/akka-cluster-tools/src/main/scala/akka/cluster/client/ClusterClient.scala +++ b/akka-cluster-tools/src/main/scala/akka/cluster/client/ClusterClient.scala @@ -58,7 +58,11 @@ object ClusterClientSettings { refreshContactsInterval = config.getDuration("refresh-contacts-interval", MILLISECONDS).millis, heartbeatInterval = config.getDuration("heartbeat-interval", MILLISECONDS).millis, acceptableHeartbeatPause = config.getDuration("acceptable-heartbeat-pause", MILLISECONDS).millis, - bufferSize = config.getInt("buffer-size")) + bufferSize = config.getInt("buffer-size"), + reconnectTimeout = config.getString("reconnect-timeout") match { + case "off" ⇒ None + case _ ⇒ Some(config.getDuration("reconnect-timeout", MILLISECONDS).millis) + }) } /** @@ -96,6 +100,10 @@ object ClusterClientSettings { * When the buffer is full old messages will be dropped when new messages are sent via the * client. Use 0 to disable buffering, i.e. messages will be dropped immediately if the * location of the receptionist is unavailable. + * @param reconnectTimeout If the connection to the receptionist is lost and cannot + * be re-established within this duration the cluster client will be stopped. This makes it possible + * to watch it from another actor and possibly acquire a new list of initialContacts from some + * external service registry */ final class ClusterClientSettings( val initialContacts: Set[ActorPath], @@ -103,10 +111,24 @@ final class ClusterClientSettings( val refreshContactsInterval: FiniteDuration, val heartbeatInterval: FiniteDuration, val acceptableHeartbeatPause: FiniteDuration, - val bufferSize: Int) extends NoSerializationVerificationNeeded { + val bufferSize: Int, + val reconnectTimeout: Option[FiniteDuration]) extends NoSerializationVerificationNeeded { require(bufferSize >= 0 && bufferSize <= 10000, "bufferSize must be >= 0 and <= 10000") + /** + * For binary/source compatibility + */ + def this( + initialContacts: Set[ActorPath], + establishingGetContactsInterval: FiniteDuration, + refreshContactsInterval: FiniteDuration, + heartbeatInterval: FiniteDuration, + acceptableHeartbeatPause: FiniteDuration, + bufferSize: Int) = + this(initialContacts, establishingGetContactsInterval, refreshContactsInterval, heartbeatInterval, + acceptableHeartbeatPause, bufferSize, None) + /** * Scala API */ @@ -135,15 +157,19 @@ final class ClusterClientSettings( def withBufferSize(bufferSize: Int): ClusterClientSettings = copy(bufferSize = bufferSize) + def withReconnectTimeout(reconnectTimeout: Option[FiniteDuration]): ClusterClientSettings = + copy(reconnectTimeout = reconnectTimeout) + private def copy( initialContacts: Set[ActorPath] = initialContacts, establishingGetContactsInterval: FiniteDuration = establishingGetContactsInterval, refreshContactsInterval: FiniteDuration = refreshContactsInterval, heartbeatInterval: FiniteDuration = heartbeatInterval, acceptableHeartbeatPause: FiniteDuration = acceptableHeartbeatPause, - bufferSize: Int = bufferSize): ClusterClientSettings = + bufferSize: Int = bufferSize, + reconnectTimeout: Option[FiniteDuration] = reconnectTimeout): ClusterClientSettings = new ClusterClientSettings(initialContacts, establishingGetContactsInterval, refreshContactsInterval, - heartbeatInterval, acceptableHeartbeatPause, bufferSize) + heartbeatInterval, acceptableHeartbeatPause, bufferSize, reconnectTimeout) } object ClusterClient { @@ -172,6 +198,7 @@ object ClusterClient { private[akka] object Internal { case object RefreshContactsTick case object HeartbeatTick + case object ReconnectTimeout } } @@ -257,27 +284,37 @@ final class ClusterClient(settings: ClusterClientSettings) extends Actor with Ac def receive = establishing def establishing: Actor.Receive = { - case Contacts(contactPoints) ⇒ - if (contactPoints.nonEmpty) { - contacts = contactPoints.map(context.actorSelection) - contacts foreach { _ ! Identify(None) } - } - case ActorIdentity(_, Some(receptionist)) ⇒ - log.info("Connected to [{}]", receptionist.path) - scheduleRefreshContactsTick(refreshContactsInterval) - sendBuffered(receptionist) - context.become(active(receptionist)) - failureDetector.heartbeat() - case ActorIdentity(_, None) ⇒ // ok, use another instead - case HeartbeatTick ⇒ - failureDetector.heartbeat() - case RefreshContactsTick ⇒ sendGetContacts() - case Send(path, msg, localAffinity) ⇒ - buffer(DistributedPubSubMediator.Send(path, msg, localAffinity)) - case SendToAll(path, msg) ⇒ - buffer(DistributedPubSubMediator.SendToAll(path, msg)) - case Publish(topic, msg) ⇒ - buffer(DistributedPubSubMediator.Publish(topic, msg)) + val connectTimerCancelable = settings.reconnectTimeout.map { timeout ⇒ + context.system.scheduler.scheduleOnce(timeout, self, ReconnectTimeout) + } + + { + case Contacts(contactPoints) ⇒ + if (contactPoints.nonEmpty) { + contacts = contactPoints.map(context.actorSelection) + contacts foreach { _ ! Identify(None) } + } + case ActorIdentity(_, Some(receptionist)) ⇒ + log.info("Connected to [{}]", receptionist.path) + scheduleRefreshContactsTick(refreshContactsInterval) + sendBuffered(receptionist) + context.become(active(receptionist)) + connectTimerCancelable.foreach(_.cancel()) + failureDetector.heartbeat() + case ActorIdentity(_, None) ⇒ // ok, use another instead + case HeartbeatTick ⇒ + failureDetector.heartbeat() + case RefreshContactsTick ⇒ sendGetContacts() + case Send(path, msg, localAffinity) ⇒ + buffer(DistributedPubSubMediator.Send(path, msg, localAffinity)) + case SendToAll(path, msg) ⇒ + buffer(DistributedPubSubMediator.SendToAll(path, msg)) + case Publish(topic, msg) ⇒ + buffer(DistributedPubSubMediator.Publish(topic, msg)) + case ReconnectTimeout ⇒ + log.warning("Receptionist reconnect not successful within {} stopping cluster client", settings.reconnectTimeout) + context.stop(self) + } } def active(receptionist: ActorRef): Actor.Receive = { diff --git a/akka-cluster-tools/src/multi-jvm/scala/akka/cluster/client/ClusterClientStopSpec.scala b/akka-cluster-tools/src/multi-jvm/scala/akka/cluster/client/ClusterClientStopSpec.scala new file mode 100644 index 0000000000..378270ca69 --- /dev/null +++ b/akka-cluster-tools/src/multi-jvm/scala/akka/cluster/client/ClusterClientStopSpec.scala @@ -0,0 +1,111 @@ +/** + * Copyright (C) 2009-2016 Typesafe Inc. + */ +package akka.cluster.client + +import akka.actor.{ Actor, Props, Terminated } +import akka.cluster.Cluster +import akka.cluster.pubsub.{ DistributedPubSub, DistributedPubSubMediator } +import akka.remote.testconductor.RoleName +import akka.remote.testkit.{ STMultiNodeSpec, MultiNodeSpec, MultiNodeConfig } +import akka.testkit.{ EventFilter, ImplicitSender } +import com.typesafe.config.ConfigFactory +import scala.concurrent.Await +import scala.concurrent.duration._ + +object ClusterClientStopSpec extends MultiNodeConfig { + val client = role("client") + val first = role("first") + val second = role("second") + commonConfig(ConfigFactory.parseString(""" + akka.loglevel = INFO + akka.actor.provider = "akka.cluster.ClusterActorRefProvider" + akka.remote.log-remote-lifecycle-events = off + akka.cluster.client { + heartbeat-interval = 1s + acceptable-heartbeat-pause = 1s + reconnect-timeout = 3s + receptionist.number-of-contacts = 1 + + } + akka.test.filter-leeway = 10s + """)) + + class Service extends Actor { + def receive = { + case msg ⇒ sender() ! msg + } + } +} + +class ClusterClientStopMultiJvmNode1 extends ClusterClientStopSpec +class ClusterClientStopMultiJvmNode2 extends ClusterClientStopSpec +class ClusterClientStopMultiJvmNode3 extends ClusterClientStopSpec + +class ClusterClientStopSpec extends MultiNodeSpec(ClusterClientStopSpec) with STMultiNodeSpec with ImplicitSender { + + import ClusterClientStopSpec._ + + override def initialParticipants: Int = 3 + + def join(from: RoleName, to: RoleName): Unit = { + runOn(from) { + Cluster(system) join node(to).address + ClusterClientReceptionist(system) + } + enterBarrier(from.name + "-joined") + } + + def awaitCount(expected: Int): Unit = { + awaitAssert { + DistributedPubSub(system).mediator ! DistributedPubSubMediator.Count + expectMsgType[Int] should ===(expected) + } + } + + def initialContacts = Set(first, second).map { r ⇒ + node(r) / "system" / "receptionist" + } + + "A Cluster Client" should { + + "startup cluster" in within(30.seconds) { + join(first, first) + join(second, first) + runOn(first) { + val service = system.actorOf(Props(classOf[Service]), "testService") + ClusterClientReceptionist(system).registerService(service) + } + runOn(first, second) { + awaitCount(1) + } + + enterBarrier("cluster-started") + } + + "stop if re-establish fails for too long time" in within(20.seconds) { + runOn(client) { + val c = system.actorOf(ClusterClient.props( + ClusterClientSettings(system).withInitialContacts(initialContacts)), "client1") + c ! ClusterClient.Send("/user/testService", "hello", localAffinity = true) + expectMsgType[String](3.seconds) should be("hello") + enterBarrier("was-in-contact") + + watch(c) + + expectTerminated(c, 10.seconds) + EventFilter.warning(start = "Receptionist reconnect not successful within", occurrences = 1) + + } + + runOn(first, second) { + enterBarrier("was-in-contact") + Await.ready(system.terminate(), 10.seconds) + + } + + } + + } + +} \ No newline at end of file diff --git a/akka-docs/rst/java/cluster-client.rst b/akka-docs/rst/java/cluster-client.rst index add40c0c73..219b1f229a 100644 --- a/akka-docs/rst/java/cluster-client.rst +++ b/akka-docs/rst/java/cluster-client.rst @@ -146,3 +146,25 @@ a parameter to the ``ClusterClient.props`` factory method, i.e. each client can with different settings if needed. .. includecode:: ../../../akka-cluster-tools/src/main/resources/reference.conf#cluster-client-config + +Failure handling +---------------- +When the cluster client is started it must be provided with a list of initial contacts which are cluster +nodes where receptionists are running. It will then repeatedly (with an interval configurable +by ``establishing-get-contacts-interval``) try to contact those until it gets in contact with one of them. +While running, the list of contacts are continuously updated with data from the receptionists (again, with an +interval configurable with ``refresh-contacts-interval``), so that if there are more receptionists in the cluster +than the initial contacts provided to the client the client will learn about them. + +While the client is running it will detect failures in its connection to the receptionist by heartbeats +if more than a configurable amount of heartbeats are missed the client will try to reconnect to its known +set of contacts to find a receptionist it can access. + +When the cluster cannot be reached at all +----------------------------------------- +It is possible to make the cluster client stop entirely if it cannot find a receptionist it can talk to +within a configurable interval. This is configured with the ``reconnect-timeout``, which defaults to ``off``. +This can be useful when initial contacts are provided from some kind of service registry, cluster node addresses +are entirely dynamic and the entire cluster might shut down or crash, be restarted on new addresses. Since the +client will be stopped in that case a monitoring actor can watch it and upon ``Terminate`` a new set of initial +contacts can be fetched and a new cluster client started. \ No newline at end of file diff --git a/akka-docs/rst/scala/cluster-client.rst b/akka-docs/rst/scala/cluster-client.rst index ecec0cd354..f058acb6fb 100644 --- a/akka-docs/rst/scala/cluster-client.rst +++ b/akka-docs/rst/scala/cluster-client.rst @@ -146,3 +146,25 @@ a parameter to the ``ClusterClient.props`` factory method, i.e. each client can with different settings if needed. .. includecode:: ../../../akka-cluster-tools/src/main/resources/reference.conf#cluster-client-config + +Failure handling +---------------- +When the cluster client is started it must be provided with a list of initial contacts which are cluster +nodes where receptionists are running. It will then repeatedly (with an interval configurable +by ``establishing-get-contacts-interval``) try to contact those until it gets in contact with one of them. +While running, the list of contacts are continuously updated with data from the receptionists (again, with an +interval configurable with ``refresh-contacts-interval``), so that if there are more receptionists in the cluster +than the initial contacts provided to the client the client will learn about them. + +While the client is running it will detect failures in its connection to the receptionist by heartbeats +if more than a configurable amount of heartbeats are missed the client will try to reconnect to its known +set of contacts to find a receptionist it can access. + +When the cluster cannot be reached at all +----------------------------------------- +It is possible to make the cluster client stop entirely if it cannot find a receptionist it can talk to +within a configurable interval. This is configured with the ``reconnect-timeout``, which defaults to ``off``. +This can be useful when initial contacts are provided from some kind of service registry, cluster node addresses +are entirely dynamic and the entire cluster might shut down or crash, be restarted on new addresses. Since the +client will be stopped in that case a monitoring actor can watch it and upon ``Terminate`` a new set of initial +contacts can be fetched and a new cluster client started. \ No newline at end of file diff --git a/akka-persistence-query/src/test/scala/akka/persistence/query/journal/leveldb/EventsByPersistenceIdSpec.scala b/akka-persistence-query/src/test/scala/akka/persistence/query/journal/leveldb/EventsByPersistenceIdSpec.scala index f2a45ba64a..580f9c25ce 100644 --- a/akka-persistence-query/src/test/scala/akka/persistence/query/journal/leveldb/EventsByPersistenceIdSpec.scala +++ b/akka-persistence-query/src/test/scala/akka/persistence/query/journal/leveldb/EventsByPersistenceIdSpec.scala @@ -120,7 +120,7 @@ class EventsByPersistenceIdSpec extends AkkaSpec(EventsByPersistenceIdSpec.confi expectMsg(s"${2L}-deleted") val src = queries.currentEventsByPersistenceId("h", 0L, Long.MaxValue) - src.map(_.event).runWith(TestSink.probe[Any]).request(1).expectNext("h-3") expectComplete() + src.map(_.event).runWith(TestSink.probe[Any]).request(1).expectNext("h-3").expectComplete() } "return empty stream for empty journal" in {