=clu #18577 Option to stop cluster client after no receptionist contact timeout
parent 673174a64a
commit 19c7017482

6 changed files with 227 additions and 26 deletions
@@ -112,6 +112,15 @@ akka.cluster.client {
   # immediately if the location of the singleton is unknown.
   # Maximum allowed buffer size is 10000.
   buffer-size = 1000
+
+  # If the connection to the receptionist is lost and the client has not been
+  # able to acquire a new connection for this long the client will stop itself.
+  # This duration makes it possible to watch the cluster client and react on a more permanent
+  # loss of connection with the cluster, for example by accessing some kind of
+  # service registry for an updated set of initial contacts to start a new cluster client with.
+  # If this is not wanted it can be set to "off" to disable the timeout and retry
+  # forever.
+  reconnect-timeout = off
 }
 # //#cluster-client-config
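With the new reference.conf entry an application opts in by overriding the default "off" value. A minimal sketch, assuming ordinary Typesafe Config loading (the system name is a placeholder, not part of this commit):

import akka.actor.ActorSystem
import com.typesafe.config.ConfigFactory

// override the new setting; "ClientSys" and the fallback to the
// regular application.conf are assumptions for this sketch
val config = ConfigFactory.parseString("akka.cluster.client.reconnect-timeout = 30s")
  .withFallback(ConfigFactory.load())
val system = ActorSystem("ClientSys", config)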
@@ -58,7 +58,11 @@ object ClusterClientSettings {
     refreshContactsInterval = config.getDuration("refresh-contacts-interval", MILLISECONDS).millis,
     heartbeatInterval = config.getDuration("heartbeat-interval", MILLISECONDS).millis,
     acceptableHeartbeatPause = config.getDuration("acceptable-heartbeat-pause", MILLISECONDS).millis,
-    bufferSize = config.getInt("buffer-size"))
+    bufferSize = config.getInt("buffer-size"),
+    reconnectTimeout = config.getString("reconnect-timeout") match {
+      case "off" ⇒ None
+      case _    ⇒ Some(config.getDuration("reconnect-timeout", MILLISECONDS).millis)
+    })
  }

  /**
@@ -96,6 +100,10 @@ object ClusterClientSettings {
  * When the buffer is full old messages will be dropped when new messages are sent via the
  * client. Use 0 to disable buffering, i.e. messages will be dropped immediately if the
  * location of the receptionist is unavailable.
+ * @param reconnectTimeout If the connection to the receptionist is lost and cannot
+ *   be re-established within this duration the cluster client will be stopped. This makes it possible
+ *   to watch it from another actor and possibly acquire a new list of initialContacts from some
+ *   external service registry.
  */
 final class ClusterClientSettings(
   val initialContacts: Set[ActorPath],
@@ -103,10 +111,24 @@ final class ClusterClientSettings(
   val refreshContactsInterval: FiniteDuration,
   val heartbeatInterval: FiniteDuration,
   val acceptableHeartbeatPause: FiniteDuration,
-  val bufferSize: Int) extends NoSerializationVerificationNeeded {
+  val bufferSize: Int,
+  val reconnectTimeout: Option[FiniteDuration]) extends NoSerializationVerificationNeeded {

   require(bufferSize >= 0 && bufferSize <= 10000, "bufferSize must be >= 0 and <= 10000")

+  /**
+   * For binary/source compatibility
+   */
+  def this(
+    initialContacts: Set[ActorPath],
+    establishingGetContactsInterval: FiniteDuration,
+    refreshContactsInterval: FiniteDuration,
+    heartbeatInterval: FiniteDuration,
+    acceptableHeartbeatPause: FiniteDuration,
+    bufferSize: Int) =
+    this(initialContacts, establishingGetContactsInterval, refreshContactsInterval, heartbeatInterval,
+      acceptableHeartbeatPause, bufferSize, None)
+
   /**
    * Scala API
    */
@@ -135,15 +157,19 @@ final class ClusterClientSettings(
   def withBufferSize(bufferSize: Int): ClusterClientSettings =
     copy(bufferSize = bufferSize)

+  def withReconnectTimeout(reconnectTimeout: Option[FiniteDuration]): ClusterClientSettings =
+    copy(reconnectTimeout = reconnectTimeout)
+
   private def copy(
     initialContacts: Set[ActorPath] = initialContacts,
     establishingGetContactsInterval: FiniteDuration = establishingGetContactsInterval,
     refreshContactsInterval: FiniteDuration = refreshContactsInterval,
     heartbeatInterval: FiniteDuration = heartbeatInterval,
     acceptableHeartbeatPause: FiniteDuration = acceptableHeartbeatPause,
-    bufferSize: Int = bufferSize): ClusterClientSettings =
+    bufferSize: Int = bufferSize,
+    reconnectTimeout: Option[FiniteDuration] = reconnectTimeout): ClusterClientSettings =
     new ClusterClientSettings(initialContacts, establishingGetContactsInterval, refreshContactsInterval,
-      heartbeatInterval, acceptableHeartbeatPause, bufferSize)
+      heartbeatInterval, acceptableHeartbeatPause, bufferSize, reconnectTimeout)
 }

 object ClusterClient {
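The new withReconnectTimeout setter composes with the existing builder-style setters. A usage sketch, assuming a client-side actor system (the system name and contact addresses are placeholders, not from this commit):

import scala.concurrent.duration._
import akka.actor.{ ActorPath, ActorSystem }
import akka.cluster.client.{ ClusterClient, ClusterClientSettings }

val system = ActorSystem("ClientSys") // assumption: a client-side system

// host1/host2 are placeholder addresses; real deployments would obtain
// these from configuration or a service registry
val initialContacts: Set[ActorPath] = Set(
  ActorPath.fromString("akka.tcp://OtherSys@host1:2552/system/receptionist"),
  ActorPath.fromString("akka.tcp://OtherSys@host2:2552/system/receptionist"))

val settings = ClusterClientSettings(system)
  .withInitialContacts(initialContacts)
  .withReconnectTimeout(Some(30.seconds)) // stop after 30s without receptionist contact

val clusterClient = system.actorOf(ClusterClient.props(settings), "clusterClient")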
@@ -172,6 +198,7 @@ object ClusterClient {
   private[akka] object Internal {
     case object RefreshContactsTick
     case object HeartbeatTick
+    case object ReconnectTimeout
   }
 }

@@ -257,27 +284,37 @@ final class ClusterClient(settings: ClusterClientSettings) extends Actor with Ac
   def receive = establishing

   def establishing: Actor.Receive = {
-    case Contacts(contactPoints) ⇒
-      if (contactPoints.nonEmpty) {
-        contacts = contactPoints.map(context.actorSelection)
-        contacts foreach { _ ! Identify(None) }
-      }
-    case ActorIdentity(_, Some(receptionist)) ⇒
-      log.info("Connected to [{}]", receptionist.path)
-      scheduleRefreshContactsTick(refreshContactsInterval)
-      sendBuffered(receptionist)
-      context.become(active(receptionist))
-      failureDetector.heartbeat()
-    case ActorIdentity(_, None) ⇒ // ok, use another instead
-    case HeartbeatTick ⇒
-      failureDetector.heartbeat()
-    case RefreshContactsTick ⇒ sendGetContacts()
-    case Send(path, msg, localAffinity) ⇒
-      buffer(DistributedPubSubMediator.Send(path, msg, localAffinity))
-    case SendToAll(path, msg) ⇒
-      buffer(DistributedPubSubMediator.SendToAll(path, msg))
-    case Publish(topic, msg) ⇒
-      buffer(DistributedPubSubMediator.Publish(topic, msg))
+    val connectTimerCancelable = settings.reconnectTimeout.map { timeout ⇒
+      context.system.scheduler.scheduleOnce(timeout, self, ReconnectTimeout)
+    }
+
+    {
+      case Contacts(contactPoints) ⇒
+        if (contactPoints.nonEmpty) {
+          contacts = contactPoints.map(context.actorSelection)
+          contacts foreach { _ ! Identify(None) }
+        }
+      case ActorIdentity(_, Some(receptionist)) ⇒
+        log.info("Connected to [{}]", receptionist.path)
+        scheduleRefreshContactsTick(refreshContactsInterval)
+        sendBuffered(receptionist)
+        context.become(active(receptionist))
+        connectTimerCancelable.foreach(_.cancel())
+        failureDetector.heartbeat()
+      case ActorIdentity(_, None) ⇒ // ok, use another instead
+      case HeartbeatTick ⇒
+        failureDetector.heartbeat()
+      case RefreshContactsTick ⇒ sendGetContacts()
+      case Send(path, msg, localAffinity) ⇒
+        buffer(DistributedPubSubMediator.Send(path, msg, localAffinity))
+      case SendToAll(path, msg) ⇒
+        buffer(DistributedPubSubMediator.SendToAll(path, msg))
+      case Publish(topic, msg) ⇒
+        buffer(DistributedPubSubMediator.Publish(topic, msg))
+      case ReconnectTimeout ⇒
+        log.warning("Receptionist reconnect not successful within {}, stopping cluster client", settings.reconnectTimeout)
+        context.stop(self)
+    }
   }

   def active(receptionist: ActorRef): Actor.Receive = {
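The reworked establishing above relies on the method body being evaluated anew each time the client (re-)enters the establishing state: the outer block arms a one-shot ReconnectTimeout timer, and the partial function it returns cancels that timer on a successful connect. Reduced to its essentials, the idiom looks like this (a generic sketch, not code from this commit):

import akka.actor.{ Actor, Cancellable }
import scala.concurrent.duration._

class ConnectingActor extends Actor {
  import context.dispatcher // scheduleOnce needs an implicit ExecutionContext

  case object GiveUp

  def receive = connecting

  def connecting: Actor.Receive = {
    // this runs once each time the connecting state is entered
    val giveUpTimer: Cancellable =
      context.system.scheduler.scheduleOnce(5.seconds, self, GiveUp)

    {
      case "connected" ⇒
        giveUpTimer.cancel() // success: disarm the one-shot timer
        context.become(connected)
      case GiveUp ⇒
        context.stop(self) // timed out: a watching actor can now react
    }
  }

  def connected: Actor.Receive = {
    case msg ⇒ // handle traffic
  }
}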
@@ -0,0 +1,111 @@
+/**
+ * Copyright (C) 2009-2016 Typesafe Inc. <http://www.typesafe.com>
+ */
+package akka.cluster.client
+
+import akka.actor.{ Actor, Props, Terminated }
+import akka.cluster.Cluster
+import akka.cluster.pubsub.{ DistributedPubSub, DistributedPubSubMediator }
+import akka.remote.testconductor.RoleName
+import akka.remote.testkit.{ STMultiNodeSpec, MultiNodeSpec, MultiNodeConfig }
+import akka.testkit.{ EventFilter, ImplicitSender }
+import com.typesafe.config.ConfigFactory
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
+object ClusterClientStopSpec extends MultiNodeConfig {
+  val client = role("client")
+  val first = role("first")
+  val second = role("second")
+
+  commonConfig(ConfigFactory.parseString("""
+    akka.loglevel = INFO
+    akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
+    akka.remote.log-remote-lifecycle-events = off
+    akka.cluster.client {
+      heartbeat-interval = 1s
+      acceptable-heartbeat-pause = 1s
+      reconnect-timeout = 3s
+      receptionist.number-of-contacts = 1
+    }
+    akka.test.filter-leeway = 10s
+  """))
+
+  class Service extends Actor {
+    def receive = {
+      case msg ⇒ sender() ! msg
+    }
+  }
+}
+
+class ClusterClientStopMultiJvmNode1 extends ClusterClientStopSpec
+class ClusterClientStopMultiJvmNode2 extends ClusterClientStopSpec
+class ClusterClientStopMultiJvmNode3 extends ClusterClientStopSpec
+
+class ClusterClientStopSpec extends MultiNodeSpec(ClusterClientStopSpec) with STMultiNodeSpec with ImplicitSender {
+
+  import ClusterClientStopSpec._
+
+  override def initialParticipants: Int = 3
+
+  def join(from: RoleName, to: RoleName): Unit = {
+    runOn(from) {
+      Cluster(system) join node(to).address
+      ClusterClientReceptionist(system)
+    }
+    enterBarrier(from.name + "-joined")
+  }
+
+  def awaitCount(expected: Int): Unit = {
+    awaitAssert {
+      DistributedPubSub(system).mediator ! DistributedPubSubMediator.Count
+      expectMsgType[Int] should ===(expected)
+    }
+  }
+
+  def initialContacts = Set(first, second).map { r ⇒
+    node(r) / "system" / "receptionist"
+  }
+
+  "A Cluster Client" should {
+
+    "startup cluster" in within(30.seconds) {
+      join(first, first)
+      join(second, first)
+      runOn(first) {
+        val service = system.actorOf(Props(classOf[Service]), "testService")
+        ClusterClientReceptionist(system).registerService(service)
+      }
+      runOn(first, second) {
+        awaitCount(1)
+      }
+      enterBarrier("cluster-started")
+    }
+
+    "stop if re-establishing fails for too long" in within(20.seconds) {
+      runOn(client) {
+        val c = system.actorOf(ClusterClient.props(
+          ClusterClientSettings(system).withInitialContacts(initialContacts)), "client1")
+        c ! ClusterClient.Send("/user/testService", "hello", localAffinity = true)
+        expectMsgType[String](3.seconds) should be("hello")
+        enterBarrier("was-in-contact")
+
+        watch(c)
+        EventFilter.warning(start = "Receptionist reconnect not successful within", occurrences = 1) intercept {
+          expectTerminated(c, 10.seconds)
+        }
+      }
+
+      runOn(first, second) {
+        enterBarrier("was-in-contact")
+        Await.ready(system.terminate(), 10.seconds)
+      }
+    }
+  }
+}
@@ -146,3 +146,25 @@ a parameter to the ``ClusterClient.props`` factory method, i.e. each client can
 with different settings if needed.

 .. includecode:: ../../../akka-cluster-tools/src/main/resources/reference.conf#cluster-client-config
+
+Failure handling
+----------------
+When the cluster client is started it must be provided with a list of initial contacts, which are cluster
+nodes where receptionists are running. It will then repeatedly (with an interval configurable
+by ``establishing-get-contacts-interval``) try to contact those until it gets in contact with one of them.
+While running, the list of contacts is continuously updated with data from the receptionists (again, with an
+interval configurable with ``refresh-contacts-interval``), so that if there are more receptionists in the cluster
+than the initial contacts provided to the client the client will learn about them.
+
+While the client is running it will detect failures in its connection to the receptionist using heartbeats;
+if more than a configurable number of heartbeats are missed the client will try to reconnect to its known
+set of contacts to find a receptionist it can access.
+
+When the cluster cannot be reached at all
+-----------------------------------------
+It is possible to make the cluster client stop entirely if it cannot find a receptionist it can talk to
+within a configurable interval. This is configured with the ``reconnect-timeout`` setting, which defaults to ``off``.
+This can be useful when initial contacts are provided from some kind of service registry, cluster node
+addresses are entirely dynamic, and the entire cluster might shut down or crash and be restarted on new
+addresses. Since the client will be stopped in that case, a monitoring actor can watch it and upon
+``Terminated`` a new set of initial contacts can be fetched and a new cluster client started.
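A sketch of what such a monitoring actor could look like; fetchInitialContacts() is a hypothetical stand-in for the application's service-registry lookup, and none of these names come from this commit:

import akka.actor.{ Actor, ActorPath, ActorRef, Terminated }
import akka.cluster.client.{ ClusterClient, ClusterClientSettings }

class ClusterClientSupervisor extends Actor {

  // hypothetical stand-in for the application's service-registry lookup
  def fetchInitialContacts(): Set[ActorPath] = ???

  def startClient(): ActorRef =
    context.watch(context.actorOf(ClusterClient.props(
      ClusterClientSettings(context.system).withInitialContacts(fetchInitialContacts()))))

  var client: ActorRef = startClient()

  def receive = {
    case Terminated(ref) if ref == client ⇒
      // the reconnect-timeout was hit: fetch fresh contacts and start a new client
      client = startClient()
    case msg ⇒
      client forward msg
  }
}

Sending via the supervisor means users of the client are unaffected when the client is stopped and replaced.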
@@ -120,7 +120,7 @@ class EventsByPersistenceIdSpec extends AkkaSpec(EventsByPersistenceIdSpec.confi
       expectMsg(s"${2L}-deleted")

       val src = queries.currentEventsByPersistenceId("h", 0L, Long.MaxValue)
-      src.map(_.event).runWith(TestSink.probe[Any]).request(1).expectNext("h-3") expectComplete()
+      src.map(_.event).runWith(TestSink.probe[Any]).request(1).expectNext("h-3").expectComplete()
     }

     "return empty stream for empty journal" in {