=con #13909 Don't use remote watch in ClusterClient

* because it will result in quarantine if failure
  detection triggers and that kind of coupling is
  exactly what is not desired for a ClusterClient
* replace by simple heartbeat failure detection,
  DeadlineFailureDetector

* DeadLetterSuppression
This commit is contained in:
Patrik Nordwall 2015-02-06 08:43:28 +01:00
parent b99b8090f0
commit 7686fa523e
3 changed files with 134 additions and 37 deletions

View file

@ -81,6 +81,17 @@ akka.cluster.client {
# Interval at which the client will ask the ClusterReceptionist for # Interval at which the client will ask the ClusterReceptionist for
# new contact points to be used for next reconnect. # new contact points to be used for next reconnect.
refresh-contacts-interval = 60s refresh-contacts-interval = 60s
# How often failure detection heartbeat messages should be sent
heartbeat-interval = 2s
# Number of potentially lost/delayed heartbeats that will be
# accepted before considering it to be an anomaly.
# The ClusterClient is using the akka.remote.DeadlineFailureDetector, which
# will trigger if there are no heartbeats within the duration
# heartbeat-interval + acceptable-heartbeat-pause, i.e. 15 seconds with
# the default settings.
acceptable-heartbeat-pause = 13s
} }
# //#cluster-client-mailbox-config # //#cluster-client-mailbox-config

View file

@ -4,10 +4,8 @@
package akka.cluster.client package akka.cluster.client
import java.net.URLEncoder import java.net.URLEncoder
import scala.collection.immutable import scala.collection.immutable
import scala.concurrent.duration._ import scala.concurrent.duration._
import akka.actor.Actor import akka.actor.Actor
import akka.actor.ActorIdentity import akka.actor.ActorIdentity
import akka.actor.ActorLogging import akka.actor.ActorLogging
@ -37,6 +35,8 @@ import akka.japi.Util.immutableSeq
import akka.routing.ConsistentHash import akka.routing.ConsistentHash
import akka.routing.MurmurHash import akka.routing.MurmurHash
import com.typesafe.config.Config import com.typesafe.config.Config
import akka.actor.DeadLetterSuppression
import akka.remote.DeadlineFailureDetector
object ClusterClientSettings { object ClusterClientSettings {
/** /**
@ -55,7 +55,9 @@ object ClusterClientSettings {
new ClusterClientSettings( new ClusterClientSettings(
initialContacts, initialContacts,
establishingGetContactsInterval = config.getDuration("establishing-get-contacts-interval", MILLISECONDS).millis, establishingGetContactsInterval = config.getDuration("establishing-get-contacts-interval", MILLISECONDS).millis,
refreshContactsInterval = config.getDuration("refresh-contacts-interval", MILLISECONDS).millis) refreshContactsInterval = config.getDuration("refresh-contacts-interval", MILLISECONDS).millis,
heartbeatInterval = config.getDuration("heartbeat-interval", MILLISECONDS).millis,
acceptableHeartbeatPause = config.getDuration("acceptable-heartbeat-pause", MILLISECONDS).millis)
} }
/** /**
@ -79,11 +81,19 @@ object ClusterClientSettings {
* to establish contact with one of ClusterReceptionist on the servers (cluster nodes) * to establish contact with one of ClusterReceptionist on the servers (cluster nodes)
* @param refreshContactsInterval Interval at which the client will ask the * @param refreshContactsInterval Interval at which the client will ask the
* `ClusterReceptionist` for new contact points to be used for next reconnect. * `ClusterReceptionist` for new contact points to be used for next reconnect.
* @param heartbeatInterval How often failure detection heartbeat messages for detection
* of failed connections should be sent.
* @param acceptableHeartbeatPause Number of potentially lost/delayed heartbeats that will
* be accepted before considering it to be an anomaly. The ClusterClient is using the
* [[akka.remote.DeadlineFailureDetector]], which will trigger if there are no heartbeats
* within the duration `heartbeatInterval + acceptableHeartbeatPause`.
*/ */
final class ClusterClientSettings( final class ClusterClientSettings(
val initialContacts: Set[ActorPath], val initialContacts: Set[ActorPath],
val establishingGetContactsInterval: FiniteDuration, val establishingGetContactsInterval: FiniteDuration,
val refreshContactsInterval: FiniteDuration) extends NoSerializationVerificationNeeded { val refreshContactsInterval: FiniteDuration,
val heartbeatInterval: FiniteDuration,
val acceptableHeartbeatPause: FiniteDuration) extends NoSerializationVerificationNeeded {
def withInitialContacts(initialContacts: Set[ActorPath]): ClusterClientSettings = { def withInitialContacts(initialContacts: Set[ActorPath]): ClusterClientSettings = {
require(initialContacts.nonEmpty, "initialContacts must be defined") require(initialContacts.nonEmpty, "initialContacts must be defined")
@ -96,11 +106,17 @@ final class ClusterClientSettings(
def withRefreshContactsInterval(refreshContactsInterval: FiniteDuration): ClusterClientSettings = def withRefreshContactsInterval(refreshContactsInterval: FiniteDuration): ClusterClientSettings =
copy(refreshContactsInterval = refreshContactsInterval) copy(refreshContactsInterval = refreshContactsInterval)
def withHeartbeat(heartbeatInterval: FiniteDuration, acceptableHeartbeatPause: FiniteDuration): ClusterClientSettings =
copy(heartbeatInterval = heartbeatInterval, acceptableHeartbeatPause = acceptableHeartbeatPause)
private def copy( private def copy(
initialContacts: Set[ActorPath] = initialContacts, initialContacts: Set[ActorPath] = initialContacts,
establishingGetContactsInterval: FiniteDuration = establishingGetContactsInterval, establishingGetContactsInterval: FiniteDuration = establishingGetContactsInterval,
refreshContactsInterval: FiniteDuration = refreshContactsInterval): ClusterClientSettings = refreshContactsInterval: FiniteDuration = refreshContactsInterval,
new ClusterClientSettings(initialContacts, establishingGetContactsInterval, refreshContactsInterval) heartbeatInterval: FiniteDuration = heartbeatInterval,
acceptableHeartbeatPause: FiniteDuration = acceptableHeartbeatPause): ClusterClientSettings =
new ClusterClientSettings(initialContacts, establishingGetContactsInterval, refreshContactsInterval,
heartbeatInterval, acceptableHeartbeatPause)
} }
@ -129,6 +145,7 @@ object ClusterClient {
*/ */
private[akka] object Internal { private[akka] object Internal {
case object RefreshContactsTick case object RefreshContactsTick
case object HeartbeatTick
} }
} }
@ -174,12 +191,16 @@ class ClusterClient(settings: ClusterClientSettings) extends Actor with Stash wi
require(initialContacts.nonEmpty, "initialContacts must be defined") require(initialContacts.nonEmpty, "initialContacts must be defined")
val failureDetector = new DeadlineFailureDetector(acceptableHeartbeatPause, heartbeatInterval)
val initialContactsSel: immutable.IndexedSeq[ActorSelection] = val initialContactsSel: immutable.IndexedSeq[ActorSelection] =
initialContacts.map(context.actorSelection).toVector initialContacts.map(context.actorSelection).toVector
var contacts = initialContactsSel var contacts = initialContactsSel
sendGetContacts() sendGetContacts()
import context.dispatcher import context.dispatcher
val heartbeatTask = context.system.scheduler.schedule(
heartbeatInterval, heartbeatInterval, self, HeartbeatTick)
var refreshContactsTask: Option[Cancellable] = None var refreshContactsTask: Option[Cancellable] = None
scheduleRefreshContactsTick(establishingGetContactsInterval) scheduleRefreshContactsTick(establishingGetContactsInterval)
self ! RefreshContactsTick self ! RefreshContactsTick
@ -192,6 +213,7 @@ class ClusterClient(settings: ClusterClientSettings) extends Actor with Stash wi
override def postStop(): Unit = { override def postStop(): Unit = {
super.postStop() super.postStop()
heartbeatTask.cancel()
refreshContactsTask foreach { _.cancel() } refreshContactsTask foreach { _.cancel() }
} }
@ -204,13 +226,14 @@ class ClusterClient(settings: ClusterClientSettings) extends Actor with Stash wi
contacts foreach { _ ! Identify(None) } contacts foreach { _ ! Identify(None) }
} }
case ActorIdentity(_, Some(receptionist)) case ActorIdentity(_, Some(receptionist))
context watch receptionist
log.info("Connected to [{}]", receptionist.path) log.info("Connected to [{}]", receptionist.path)
context.watch(receptionist)
scheduleRefreshContactsTick(refreshContactsInterval) scheduleRefreshContactsTick(refreshContactsInterval)
unstashAll() unstashAll()
context.become(active(receptionist)) context.become(active(receptionist))
failureDetector.heartbeat()
case ActorIdentity(_, None) // ok, use another instead case ActorIdentity(_, None) // ok, use another instead
case HeartbeatTick
failureDetector.heartbeat()
case RefreshContactsTick sendGetContacts() case RefreshContactsTick sendGetContacts()
case msg stash() case msg stash()
} }
@ -222,17 +245,23 @@ class ClusterClient(settings: ClusterClientSettings) extends Actor with Stash wi
receptionist forward DistributedPubSubMediator.SendToAll(path, msg) receptionist forward DistributedPubSubMediator.SendToAll(path, msg)
case Publish(topic, msg) case Publish(topic, msg)
receptionist forward DistributedPubSubMediator.Publish(topic, msg) receptionist forward DistributedPubSubMediator.Publish(topic, msg)
case HeartbeatTick
if (!failureDetector.isAvailable) {
log.info("Lost contact with [{}], restablishing connection", receptionist)
sendGetContacts()
scheduleRefreshContactsTick(establishingGetContactsInterval)
context.become(establishing)
failureDetector.heartbeat()
} else
receptionist ! Heartbeat
case HeartbeatRsp
failureDetector.heartbeat()
case RefreshContactsTick case RefreshContactsTick
receptionist ! GetContacts receptionist ! GetContacts
case Contacts(contactPoints) case Contacts(contactPoints)
// refresh of contacts // refresh of contacts
if (contactPoints.nonEmpty) if (contactPoints.nonEmpty)
contacts = contactPoints contacts = contactPoints
case Terminated(`receptionist`)
log.info("Lost contact with [{}], restablishing connection", receptionist)
sendGetContacts()
scheduleRefreshContactsTick(establishingGetContactsInterval)
context.become(establishing)
case _: ActorIdentity // ok, from previous establish, already handled case _: ActorIdentity // ok, from previous establish, already handled
} }
@ -405,11 +434,15 @@ object ClusterReceptionist {
*/ */
private[akka] object Internal { private[akka] object Internal {
@SerialVersionUID(1L) @SerialVersionUID(1L)
case object GetContacts case object GetContacts extends DeadLetterSuppression
@SerialVersionUID(1L) @SerialVersionUID(1L)
final case class Contacts(contactPoints: immutable.IndexedSeq[ActorSelection]) final case class Contacts(contactPoints: immutable.IndexedSeq[ActorSelection])
@SerialVersionUID(1L) @SerialVersionUID(1L)
case object Ping case object Heartbeat extends DeadLetterSuppression
@SerialVersionUID(1L)
case object HeartbeatRsp extends DeadLetterSuppression
@SerialVersionUID(1L)
case object Ping extends DeadLetterSuppression
/** /**
* Replies are tunneled via this actor, child of the receptionist, to avoid * Replies are tunneled via this actor, child of the receptionist, to avoid
@ -417,11 +450,9 @@ object ClusterReceptionist {
*/ */
class ClientResponseTunnel(client: ActorRef, timeout: FiniteDuration) extends Actor { class ClientResponseTunnel(client: ActorRef, timeout: FiniteDuration) extends Actor {
context.setReceiveTimeout(timeout) context.setReceiveTimeout(timeout)
context.watch(client)
def receive = { def receive = {
case Ping // keep alive from client case Ping // keep alive from client
case ReceiveTimeout context stop self case ReceiveTimeout context stop self
case Terminated(`client`) context stop self
case msg client forward msg case msg client forward msg
} }
} }
@ -508,6 +539,9 @@ class ClusterReceptionist(pubSubMediator: ActorRef, settings: ClusterReceptionis
tunnel ! Ping // keep alive tunnel ! Ping // keep alive
pubSubMediator.tell(msg, tunnel) pubSubMediator.tell(msg, tunnel)
case Heartbeat
sender() ! HeartbeatRsp
case GetContacts case GetContacts
// Consistent hashing is used to ensure that the reply to GetContacts // Consistent hashing is used to ensure that the reply to GetContacts
// is the same from all nodes (most of the time) and it also // is the same from all nodes (most of the time) and it also

View file

@ -18,6 +18,7 @@ import akka.remote.testkit.STMultiNodeSpec
import akka.testkit._ import akka.testkit._
import akka.actor.Address import akka.actor.Address
import akka.cluster.pubsub._ import akka.cluster.pubsub._
import akka.remote.transport.ThrottlerTransportAdapter.Direction
object ClusterClientSpec extends MultiNodeConfig { object ClusterClientSpec extends MultiNodeConfig {
val client = role("client") val client = role("client")
@ -31,13 +32,17 @@ object ClusterClientSpec extends MultiNodeConfig {
akka.actor.provider = "akka.cluster.ClusterActorRefProvider" akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
akka.remote.log-remote-lifecycle-events = off akka.remote.log-remote-lifecycle-events = off
akka.cluster.auto-down-unreachable-after = 0s akka.cluster.auto-down-unreachable-after = 0s
akka.cluster.client.heartbeat-interval = 1s
akka.cluster.client.acceptable-heartbeat-pause = 3s
""")) """))
testTransport(on = true)
class TestService(testActor: ActorRef) extends Actor { class TestService(testActor: ActorRef) extends Actor {
def receive = { def receive = {
case msg case msg
testActor forward msg testActor forward msg
sender() ! "ack" sender() ! msg + "-ack"
} }
} }
@ -77,11 +82,13 @@ class ClusterClientSpec extends MultiNodeSpec(ClusterClientSpec) with STMultiNod
} }
} }
def roleName(addr: Address): Option[RoleName] = roles.find(node(_).address == addr) var remainingServerRoleNames = Set(first, second, third, fourth)
def initialContacts = Set( def roleName(addr: Address): Option[RoleName] = remainingServerRoleNames.find(node(_).address == addr)
node(second) / "user" / "receptionist",
node(third) / "user" / "receptionist") def initialContacts = (remainingServerRoleNames - first - fourth).map { r
node(r) / "user" / "receptionist"
}
"A ClusterClient" must { "A ClusterClient" must {
@ -104,9 +111,10 @@ class ClusterClientSpec extends MultiNodeSpec(ClusterClientSpec) with STMultiNod
"communicate to actor on any node in cluster" in within(10 seconds) { "communicate to actor on any node in cluster" in within(10 seconds) {
runOn(client) { runOn(client) {
val c = system.actorOf(ClusterClient.props( val c = system.actorOf(ClusterClient.props(
ClusterClientSettings(system).withInitialContacts(initialContacts))) ClusterClientSettings(system).withInitialContacts(initialContacts)), "client1")
c ! ClusterClient.Send("/user/testService", "hello", localAffinity = true) c ! ClusterClient.Send("/user/testService", "hello", localAffinity = true)
expectMsg("ack") expectMsg("hello-ack")
system.stop(c)
} }
runOn(fourth) { runOn(fourth) {
expectMsg("hello") expectMsg("hello")
@ -140,7 +148,7 @@ class ClusterClientSpec extends MultiNodeSpec(ClusterClientSpec) with STMultiNod
//#client //#client
runOn(client) { runOn(client) {
val c = system.actorOf(ClusterClient.props( val c = system.actorOf(ClusterClient.props(
ClusterClientSettings(system).withInitialContacts(initialContacts))) ClusterClientSettings(system).withInitialContacts(initialContacts)), "client")
c ! ClusterClient.Send("/user/serviceA", "hello", localAffinity = true) c ! ClusterClient.Send("/user/serviceA", "hello", localAffinity = true)
c ! ClusterClient.SendToAll("/user/serviceB", "hi") c ! ClusterClient.SendToAll("/user/serviceB", "hi")
} }
@ -151,7 +159,7 @@ class ClusterClientSpec extends MultiNodeSpec(ClusterClientSpec) with STMultiNod
receiveN(3).toSet should ===(Set("hello", "hi")) receiveN(3).toSet should ===(Set("hello", "hi"))
} }
{ //not used, only demo lazy val docOnly = { //not used, only demo
//#initialContacts //#initialContacts
val initialContacts = Set( val initialContacts = Set(
system.actorSelection("akka.tcp://OtherSys@host1:2552/user/receptionist"), system.actorSelection("akka.tcp://OtherSys@host1:2552/user/receptionist"),
@ -164,7 +172,7 @@ class ClusterClientSpec extends MultiNodeSpec(ClusterClientSpec) with STMultiNod
enterBarrier("after-3") enterBarrier("after-3")
} }
"re-establish connection to receptionist when connection is lost" in within(30 seconds) { "re-establish connection to another receptionist when server is shutdown" in within(30 seconds) {
runOn(first, second, third, fourth) { runOn(first, second, third, fourth) {
val service2 = system.actorOf(Props(classOf[TestService], testActor), "service2") val service2 = system.actorOf(Props(classOf[TestService], testActor), "service2")
ClusterClientReceptionist(system).registerService(service2) ClusterClientReceptionist(system).registerService(service2)
@ -174,21 +182,25 @@ class ClusterClientSpec extends MultiNodeSpec(ClusterClientSpec) with STMultiNod
runOn(client) { runOn(client) {
val c = system.actorOf(ClusterClient.props( val c = system.actorOf(ClusterClient.props(
ClusterClientSettings(system).withInitialContacts(initialContacts))) ClusterClientSettings(system).withInitialContacts(initialContacts)), "client2")
c ! ClusterClient.Send("/user/service2", "bonjour", localAffinity = true) c ! ClusterClient.Send("/user/service2", "bonjour", localAffinity = true)
expectMsg("ack") expectMsg("bonjour-ack")
val lastSenderAddress = lastSender.path.address val lastSenderAddress = lastSender.path.address
val receptionistRoleName = roleName(lastSenderAddress) match { val receptionistRoleName = roleName(lastSenderAddress) match {
case Some(r) r case Some(r) r
case None fail("unexpected missing roleName: " + lastSender.path.address) case None fail("unexpected missing roleName: " + lastSender.path.address)
} }
testConductor.exit(receptionistRoleName, 0).await testConductor.exit(receptionistRoleName, 0).await
remainingServerRoleNames -= receptionistRoleName
within(remaining - 3.seconds) {
awaitAssert { awaitAssert {
c ! ClusterClient.Send("/user/service2", "hi again", localAffinity = true) c ! ClusterClient.Send("/user/service2", "hi again", localAffinity = true)
expectMsg(1 second, "ack") expectMsg(1 second, "hi again-ack")
} }
} }
system.stop(c)
}
enterBarrier("verifed-3") enterBarrier("verifed-3")
receiveWhile(2 seconds) { receiveWhile(2 seconds) {
case "hi again" case "hi again"
@ -197,5 +209,45 @@ class ClusterClientSpec extends MultiNodeSpec(ClusterClientSpec) with STMultiNod
enterBarrier("after-4") enterBarrier("after-4")
} }
"re-establish connection to receptionist after partition" in within(30 seconds) {
runOn(client) {
val c = system.actorOf(ClusterClient.props(
ClusterClientSettings(system).withInitialContacts(initialContacts)), "client3")
c ! ClusterClient.Send("/user/service2", "bonjour2", localAffinity = true)
expectMsg("bonjour2-ack")
val lastSenderAddress = lastSender.path.address
val receptionistRoleName = roleName(lastSenderAddress) match {
case Some(r) r
case None fail("unexpected missing roleName: " + lastSender.path.address)
}
// shutdown all but the one that the client is connected to
remainingServerRoleNames.foreach { r
if (r != receptionistRoleName)
testConductor.exit(r, 0).await
}
remainingServerRoleNames = Set(receptionistRoleName)
// network partition between client and server
testConductor.blackhole(client, receptionistRoleName, Direction.Both).await
c ! ClusterClient.Send("/user/service2", "ping", localAffinity = true)
// if we would use remote watch the failure detector would trigger and
// connection quarantined
expectNoMsg(5 seconds)
testConductor.passThrough(client, receptionistRoleName, Direction.Both).await
val expectedAddress = node(receptionistRoleName).address
awaitAssert {
c ! ClusterClient.Send("/user/service2", "bonjour3", localAffinity = true)
expectMsg(1 second, "bonjour3-ack")
val lastSenderAddress = lastSender.path.address
lastSenderAddress should be(expectedAddress)
}
system.stop(c)
}
enterBarrier("after-5")
}
} }
} }