Merge pull request #17526 from akka/wip-13909-cluster-client-watch-patriknw

=con #13909 Don't use remote watch in ClusterClient
This commit is contained in:
Patrik Nordwall 2015-06-02 16:57:45 +02:00
commit 10f039f70d
3 changed files with 134 additions and 37 deletions

View file

@ -81,6 +81,17 @@ akka.cluster.client {
# Interval at which the client will ask the ClusterReceptionist for
# new contact points to be used for next reconnect.
refresh-contacts-interval = 60s
# How often failure detection heartbeat messages should be sent
heartbeat-interval = 2s
# Number of potentially lost/delayed heartbeats that will be
# accepted before considering it to be an anomaly.
# The ClusterClient is using the akka.remote.DeadlineFailureDetector, which
# will trigger if there are no heartbeats within the duration
# heartbeat-interval + acceptable-heartbeat-pause, i.e. 15 seconds with
# the default settings.
acceptable-heartbeat-pause = 13s
}
# //#cluster-client-mailbox-config

View file

@ -4,10 +4,8 @@
package akka.cluster.client
import java.net.URLEncoder
import scala.collection.immutable
import scala.concurrent.duration._
import akka.actor.Actor
import akka.actor.ActorIdentity
import akka.actor.ActorLogging
@ -37,6 +35,8 @@ import akka.japi.Util.immutableSeq
import akka.routing.ConsistentHash
import akka.routing.MurmurHash
import com.typesafe.config.Config
import akka.actor.DeadLetterSuppression
import akka.remote.DeadlineFailureDetector
object ClusterClientSettings {
/**
@ -55,7 +55,9 @@ object ClusterClientSettings {
new ClusterClientSettings(
initialContacts,
establishingGetContactsInterval = config.getDuration("establishing-get-contacts-interval", MILLISECONDS).millis,
refreshContactsInterval = config.getDuration("refresh-contacts-interval", MILLISECONDS).millis)
refreshContactsInterval = config.getDuration("refresh-contacts-interval", MILLISECONDS).millis,
heartbeatInterval = config.getDuration("heartbeat-interval", MILLISECONDS).millis,
acceptableHeartbeatPause = config.getDuration("acceptable-heartbeat-pause", MILLISECONDS).millis)
}
/**
@ -79,11 +81,19 @@ object ClusterClientSettings {
* to establish contact with one of ClusterReceptionist on the servers (cluster nodes)
* @param refreshContactsInterval Interval at which the client will ask the
* `ClusterReceptionist` for new contact points to be used for next reconnect.
* @param heartbeatInterval How often failure detection heartbeat messages for detection
* of failed connections should be sent.
* @param acceptableHeartbeatPause Number of potentially lost/delayed heartbeats that will
* be accepted before considering it to be an anomaly. The ClusterClient is using the
* [[akka.remote.DeadlineFailureDetector]], which will trigger if there are no heartbeats
* within the duration `heartbeatInterval + acceptableHeartbeatPause`.
*/
final class ClusterClientSettings(
val initialContacts: Set[ActorPath],
val establishingGetContactsInterval: FiniteDuration,
val refreshContactsInterval: FiniteDuration) extends NoSerializationVerificationNeeded {
val refreshContactsInterval: FiniteDuration,
val heartbeatInterval: FiniteDuration,
val acceptableHeartbeatPause: FiniteDuration) extends NoSerializationVerificationNeeded {
def withInitialContacts(initialContacts: Set[ActorPath]): ClusterClientSettings = {
require(initialContacts.nonEmpty, "initialContacts must be defined")
@ -96,11 +106,17 @@ final class ClusterClientSettings(
def withRefreshContactsInterval(refreshContactsInterval: FiniteDuration): ClusterClientSettings =
copy(refreshContactsInterval = refreshContactsInterval)
def withHeartbeat(heartbeatInterval: FiniteDuration, acceptableHeartbeatPause: FiniteDuration): ClusterClientSettings =
copy(heartbeatInterval = heartbeatInterval, acceptableHeartbeatPause = acceptableHeartbeatPause)
private def copy(
initialContacts: Set[ActorPath] = initialContacts,
establishingGetContactsInterval: FiniteDuration = establishingGetContactsInterval,
refreshContactsInterval: FiniteDuration = refreshContactsInterval): ClusterClientSettings =
new ClusterClientSettings(initialContacts, establishingGetContactsInterval, refreshContactsInterval)
refreshContactsInterval: FiniteDuration = refreshContactsInterval,
heartbeatInterval: FiniteDuration = heartbeatInterval,
acceptableHeartbeatPause: FiniteDuration = acceptableHeartbeatPause): ClusterClientSettings =
new ClusterClientSettings(initialContacts, establishingGetContactsInterval, refreshContactsInterval,
heartbeatInterval, acceptableHeartbeatPause)
}
@ -129,6 +145,7 @@ object ClusterClient {
*/
private[akka] object Internal {
case object RefreshContactsTick
case object HeartbeatTick
}
}
@ -174,12 +191,16 @@ class ClusterClient(settings: ClusterClientSettings) extends Actor with Stash wi
require(initialContacts.nonEmpty, "initialContacts must be defined")
val failureDetector = new DeadlineFailureDetector(acceptableHeartbeatPause, heartbeatInterval)
val initialContactsSel: immutable.IndexedSeq[ActorSelection] =
initialContacts.map(context.actorSelection).toVector
var contacts = initialContactsSel
sendGetContacts()
import context.dispatcher
val heartbeatTask = context.system.scheduler.schedule(
heartbeatInterval, heartbeatInterval, self, HeartbeatTick)
var refreshContactsTask: Option[Cancellable] = None
scheduleRefreshContactsTick(establishingGetContactsInterval)
self ! RefreshContactsTick
@ -192,6 +213,7 @@ class ClusterClient(settings: ClusterClientSettings) extends Actor with Stash wi
override def postStop(): Unit = {
super.postStop()
heartbeatTask.cancel()
refreshContactsTask foreach { _.cancel() }
}
@ -204,15 +226,16 @@ class ClusterClient(settings: ClusterClientSettings) extends Actor with Stash wi
contacts foreach { _ ! Identify(None) }
}
case ActorIdentity(_, Some(receptionist))
context watch receptionist
log.info("Connected to [{}]", receptionist.path)
context.watch(receptionist)
scheduleRefreshContactsTick(refreshContactsInterval)
unstashAll()
context.become(active(receptionist))
failureDetector.heartbeat()
case ActorIdentity(_, None) // ok, use another instead
case RefreshContactsTick sendGetContacts()
case msg stash()
case HeartbeatTick
failureDetector.heartbeat()
case RefreshContactsTick sendGetContacts()
case msg stash()
}
def active(receptionist: ActorRef): Actor.Receive = {
@ -222,17 +245,23 @@ class ClusterClient(settings: ClusterClientSettings) extends Actor with Stash wi
receptionist forward DistributedPubSubMediator.SendToAll(path, msg)
case Publish(topic, msg)
receptionist forward DistributedPubSubMediator.Publish(topic, msg)
case HeartbeatTick
if (!failureDetector.isAvailable) {
log.info("Lost contact with [{}], restablishing connection", receptionist)
sendGetContacts()
scheduleRefreshContactsTick(establishingGetContactsInterval)
context.become(establishing)
failureDetector.heartbeat()
} else
receptionist ! Heartbeat
case HeartbeatRsp
failureDetector.heartbeat()
case RefreshContactsTick
receptionist ! GetContacts
case Contacts(contactPoints)
// refresh of contacts
if (contactPoints.nonEmpty)
contacts = contactPoints
case Terminated(`receptionist`)
log.info("Lost contact with [{}], restablishing connection", receptionist)
sendGetContacts()
scheduleRefreshContactsTick(establishingGetContactsInterval)
context.become(establishing)
case _: ActorIdentity // ok, from previous establish, already handled
}
@ -405,11 +434,15 @@ object ClusterReceptionist {
*/
private[akka] object Internal {
@SerialVersionUID(1L)
case object GetContacts
case object GetContacts extends DeadLetterSuppression
@SerialVersionUID(1L)
final case class Contacts(contactPoints: immutable.IndexedSeq[ActorSelection])
@SerialVersionUID(1L)
case object Ping
case object Heartbeat extends DeadLetterSuppression
@SerialVersionUID(1L)
case object HeartbeatRsp extends DeadLetterSuppression
@SerialVersionUID(1L)
case object Ping extends DeadLetterSuppression
/**
* Replies are tunneled via this actor, child of the receptionist, to avoid
@ -417,12 +450,10 @@ object ClusterReceptionist {
*/
class ClientResponseTunnel(client: ActorRef, timeout: FiniteDuration) extends Actor {
context.setReceiveTimeout(timeout)
context.watch(client)
def receive = {
case Ping // keep alive from client
case ReceiveTimeout context stop self
case Terminated(`client`) context stop self
case msg client forward msg
case Ping // keep alive from client
case ReceiveTimeout context stop self
case msg client forward msg
}
}
}
@ -508,6 +539,9 @@ class ClusterReceptionist(pubSubMediator: ActorRef, settings: ClusterReceptionis
tunnel ! Ping // keep alive
pubSubMediator.tell(msg, tunnel)
case Heartbeat
sender() ! HeartbeatRsp
case GetContacts
// Consistent hashing is used to ensure that the reply to GetContacts
// is the same from all nodes (most of the time) and it also

View file

@ -18,6 +18,7 @@ import akka.remote.testkit.STMultiNodeSpec
import akka.testkit._
import akka.actor.Address
import akka.cluster.pubsub._
import akka.remote.transport.ThrottlerTransportAdapter.Direction
object ClusterClientSpec extends MultiNodeConfig {
val client = role("client")
@ -31,13 +32,17 @@ object ClusterClientSpec extends MultiNodeConfig {
akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
akka.remote.log-remote-lifecycle-events = off
akka.cluster.auto-down-unreachable-after = 0s
akka.cluster.client.heartbeat-interval = 1s
akka.cluster.client.acceptable-heartbeat-pause = 3s
"""))
testTransport(on = true)
class TestService(testActor: ActorRef) extends Actor {
def receive = {
case msg
testActor forward msg
sender() ! "ack"
sender() ! msg + "-ack"
}
}
@ -77,11 +82,13 @@ class ClusterClientSpec extends MultiNodeSpec(ClusterClientSpec) with STMultiNod
}
}
def roleName(addr: Address): Option[RoleName] = roles.find(node(_).address == addr)
var remainingServerRoleNames = Set(first, second, third, fourth)
def initialContacts = Set(
node(second) / "user" / "receptionist",
node(third) / "user" / "receptionist")
def roleName(addr: Address): Option[RoleName] = remainingServerRoleNames.find(node(_).address == addr)
def initialContacts = (remainingServerRoleNames - first - fourth).map { r
node(r) / "user" / "receptionist"
}
"A ClusterClient" must {
@ -104,9 +111,10 @@ class ClusterClientSpec extends MultiNodeSpec(ClusterClientSpec) with STMultiNod
"communicate to actor on any node in cluster" in within(10 seconds) {
runOn(client) {
val c = system.actorOf(ClusterClient.props(
ClusterClientSettings(system).withInitialContacts(initialContacts)))
ClusterClientSettings(system).withInitialContacts(initialContacts)), "client1")
c ! ClusterClient.Send("/user/testService", "hello", localAffinity = true)
expectMsg("ack")
expectMsg("hello-ack")
system.stop(c)
}
runOn(fourth) {
expectMsg("hello")
@ -140,7 +148,7 @@ class ClusterClientSpec extends MultiNodeSpec(ClusterClientSpec) with STMultiNod
//#client
runOn(client) {
val c = system.actorOf(ClusterClient.props(
ClusterClientSettings(system).withInitialContacts(initialContacts)))
ClusterClientSettings(system).withInitialContacts(initialContacts)), "client")
c ! ClusterClient.Send("/user/serviceA", "hello", localAffinity = true)
c ! ClusterClient.SendToAll("/user/serviceB", "hi")
}
@ -151,7 +159,7 @@ class ClusterClientSpec extends MultiNodeSpec(ClusterClientSpec) with STMultiNod
receiveN(3).toSet should ===(Set("hello", "hi"))
}
{ //not used, only demo
lazy val docOnly = { //not used, only demo
//#initialContacts
val initialContacts = Set(
system.actorSelection("akka.tcp://OtherSys@host1:2552/user/receptionist"),
@ -164,7 +172,7 @@ class ClusterClientSpec extends MultiNodeSpec(ClusterClientSpec) with STMultiNod
enterBarrier("after-3")
}
"re-establish connection to receptionist when connection is lost" in within(30 seconds) {
"re-establish connection to another receptionist when server is shutdown" in within(30 seconds) {
runOn(first, second, third, fourth) {
val service2 = system.actorOf(Props(classOf[TestService], testActor), "service2")
ClusterClientReceptionist(system).registerService(service2)
@ -174,20 +182,24 @@ class ClusterClientSpec extends MultiNodeSpec(ClusterClientSpec) with STMultiNod
runOn(client) {
val c = system.actorOf(ClusterClient.props(
ClusterClientSettings(system).withInitialContacts(initialContacts)))
ClusterClientSettings(system).withInitialContacts(initialContacts)), "client2")
c ! ClusterClient.Send("/user/service2", "bonjour", localAffinity = true)
expectMsg("ack")
expectMsg("bonjour-ack")
val lastSenderAddress = lastSender.path.address
val receptionistRoleName = roleName(lastSenderAddress) match {
case Some(r) r
case None fail("unexpected missing roleName: " + lastSender.path.address)
}
testConductor.exit(receptionistRoleName, 0).await
awaitAssert {
c ! ClusterClient.Send("/user/service2", "hi again", localAffinity = true)
expectMsg(1 second, "ack")
remainingServerRoleNames -= receptionistRoleName
within(remaining - 3.seconds) {
awaitAssert {
c ! ClusterClient.Send("/user/service2", "hi again", localAffinity = true)
expectMsg(1 second, "hi again-ack")
}
}
system.stop(c)
}
enterBarrier("verifed-3")
receiveWhile(2 seconds) {
@ -197,5 +209,45 @@ class ClusterClientSpec extends MultiNodeSpec(ClusterClientSpec) with STMultiNod
enterBarrier("after-4")
}
"re-establish connection to receptionist after partition" in within(30 seconds) {
runOn(client) {
val c = system.actorOf(ClusterClient.props(
ClusterClientSettings(system).withInitialContacts(initialContacts)), "client3")
c ! ClusterClient.Send("/user/service2", "bonjour2", localAffinity = true)
expectMsg("bonjour2-ack")
val lastSenderAddress = lastSender.path.address
val receptionistRoleName = roleName(lastSenderAddress) match {
case Some(r) r
case None fail("unexpected missing roleName: " + lastSender.path.address)
}
// shutdown all but the one that the client is connected to
remainingServerRoleNames.foreach { r
if (r != receptionistRoleName)
testConductor.exit(r, 0).await
}
remainingServerRoleNames = Set(receptionistRoleName)
// network partition between client and server
testConductor.blackhole(client, receptionistRoleName, Direction.Both).await
c ! ClusterClient.Send("/user/service2", "ping", localAffinity = true)
// if we would use remote watch the failure detector would trigger and
// connection quarantined
expectNoMsg(5 seconds)
testConductor.passThrough(client, receptionistRoleName, Direction.Both).await
val expectedAddress = node(receptionistRoleName).address
awaitAssert {
c ! ClusterClient.Send("/user/service2", "bonjour3", localAffinity = true)
expectMsg(1 second, "bonjour3-ack")
val lastSenderAddress = lastSender.path.address
lastSenderAddress should be(expectedAddress)
}
system.stop(c)
}
enterBarrier("after-5")
}
}
}