Cluster client and receptionist, see #1165

Patrik Nordwall 2013-04-14 22:30:09 +02:00
parent 1d1a6383df
commit 089b25b3d4
7 changed files with 661 additions and 0 deletions

@ -0,0 +1,72 @@
.. _cluster-client:
Cluster Client
==============
An actor system that is not part of the cluster can communicate with actors
somewhere in the cluster via this ``ClusterClient``. The client can of course be part of
another cluster. It only needs to know the location of one (or more) nodes to use as initial
contact points. It will establish a connection to a ``ClusterReceptionist`` somewhere in
the cluster. It will monitor the connection to the receptionist and establish a new
connection if the link goes down. When looking for a new receptionist it uses fresh
contact points retrieved from the previous establishment, or periodically refreshed contacts,
i.e. not necessarily the initial contact points.
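
The choice of which contact points to ask can be sketched without actors. The following is a minimal
illustration that appears to mirror the ``sendGetContacts`` logic of ``ClusterClient`` in this commit;
the names ``ContactSelection`` and ``targets`` are hypothetical, and the type parameter ``A`` stands in
for a contact handle such as an ``ActorSelection``.

```scala
object ContactSelection {
  /**
   * Decide which contact points should receive the next GetContacts request.
   * `initial` are the configured contact points, `current` the most recently
   * received ones. Hypothetical helper, for illustration only.
   */
  def targets[A](initial: Set[A], current: Vector[A]): Set[A] =
    if (current.isEmpty) initial // nothing known: fall back to the initial contacts
    else if (current.size == 1) initial ++ current // one known contact only: also ask the initial ones
    else current.toSet // several fresh contacts: ask only those
}
```

With two or more refreshed contacts the initial contact points are no longer consulted, which is why
the client does not depend on the initial nodes staying alive.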
The receptionist is supposed to be started on all nodes, or all nodes with a specified role,
in the cluster. The receptionist can be started with the ``ClusterReceptionistExtension``
or as an ordinary actor.
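
As a rough sketch of the role rule, assuming (as the ``role`` setting in ``reference.conf`` states) that an
undefined or empty role means "all members": the object ``ReceptionistRole`` and the method ``startsOn``
are hypothetical names used only for this illustration.

```scala
object ReceptionistRole {
  /** An undefined (null) or empty role setting means "no role restriction". */
  def roleOption(role: String): Option[String] = role match {
    case null | "" => None
    case r         => Some(r)
  }

  /**
   * True if a receptionist would be started on a member with `memberRoles`,
   * given the configured role. Hypothetical helper, for illustration only.
   */
  def startsOn(configuredRole: String, memberRoles: Set[String]): Boolean =
    roleOption(configuredRole).forall(memberRoles.contains)
}
```

``Option.forall`` returns ``true`` for ``None``, so an empty role setting selects every member.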
You can send messages via the ``ClusterClient`` to any actor in the cluster that is registered
in the ``DistributedPubSubMediator`` used by the ``ClusterReceptionist``.
Messages are wrapped in ``DistributedPubSubMediator.Send``, ``DistributedPubSubMediator.SendToAll``
or ``DistributedPubSubMediator.Publish`` with the semantics described in
:ref:`distributed-pub-sub`.
Response messages from the destination actor are tunneled via the receptionist
to avoid inbound connections from other cluster nodes to the client, i.e.
the ``sender``, as seen by the destination actor, is not the client itself.
The ``sender`` of the response messages, as seen by the client, is preserved
as the original sender, so the client can choose to send subsequent messages
directly to the actor in the cluster.
An Example
----------
On the cluster nodes you start the receptionist and register the actors that
should be available for the client.
.. includecode:: @contribSrc@/src/multi-jvm/scala/akka/contrib/pattern/ClusterClientSpec.scala#server
On the client you create the ``ClusterClient`` actor and use it as a gateway for sending
messages to the actors identified by their path (without address information) somewhere
in the cluster.
.. includecode:: @contribSrc@/src/multi-jvm/scala/akka/contrib/pattern/ClusterClientSpec.scala#client
The ``initialContacts`` parameter is a ``Set[ActorSelection]``, which can be created like this:
.. includecode:: @contribSrc@/src/multi-jvm/scala/akka/contrib/pattern/ClusterClientSpec.scala#initialContacts
You will probably define the address information of the initial contact points in configuration or a system property.
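
One possible way to do that is sketched below. It is only an illustration under stated assumptions: the
property name ``clusterclient.contacts``, the object ``InitialContactPaths`` and its methods are
hypothetical, and the ``akka.tcp`` protocol and ``/user/receptionist`` path are taken from the example above.

```scala
object InitialContactPaths {
  // Hypothetical property name, not defined by Akka.
  val PropertyName = "clusterclient.contacts"

  /** Turn "host1:2552, host2:2552" into receptionist actor paths. */
  def fromHostPorts(
    hostPorts: String,
    systemName: String,
    receptionistName: String = "receptionist"): Set[String] =
    hostPorts.split(',')
      .map(_.trim)
      .filter(_.nonEmpty) // ignore empty entries such as a trailing comma
      .map(hostPort => s"akka.tcp://$systemName@$hostPort/user/$receptionistName")
      .toSet

  /** Read the comma-separated host:port list from the system property, if set. */
  def fromSystemProperty(systemName: String): Set[String] =
    sys.props.get(PropertyName)
      .map(value => fromHostPorts(value, systemName))
      .getOrElse(Set.empty[String])
}
```

Each resulting path can then be passed to ``system.actorSelection(path)`` to build the
``Set[ActorSelection]`` expected by ``ClusterClient``.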
ClusterReceptionistExtension
----------------------------
In the example above the receptionist is started and accessed with the ``akka.contrib.pattern.ClusterReceptionistExtension``.
That is convenient and perfectly fine in most cases, but it can be good to know that it is possible to
start the ``akka.contrib.pattern.ClusterReceptionist`` actor as an ordinary actor, and that you can have several
different receptionists at the same time, serving different types of clients.
The ``ClusterReceptionistExtension`` can be configured with the following properties:
.. includecode:: @contribSrc@/src/main/resources/reference.conf#receptionist-ext-config
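
The ``number-of-contacts`` setting bounds how many contact points a receptionist returns to a client. The
sketch below is a simplified, actor-free approximation of the selection done in ``ClusterReceptionist``:
the nodes are treated as a sorted ring, and the reply holds the nodes that follow a starting node, wrapping
around at the end. The names ``ContactSlice`` and ``slice`` are hypothetical, and the starting node is
assumed to be a member of the ring (in the receptionist it is chosen by consistent hashing of the client path).

```scala
object ContactSlice {
  /**
   * Pick up to `numberOfContacts` nodes following `start` on the sorted ring.
   * If the ring is not larger than `numberOfContacts`, all nodes are returned.
   * Hypothetical helper, for illustration only.
   */
  def slice[A](ring: Vector[A], start: A, numberOfContacts: Int): Vector[A] =
    if (numberOfContacts >= ring.size) ring
    else {
      val i = ring.indexOf(start)
      // walk forward from the node after `start`, wrapping around the ring
      Vector.tabulate(numberOfContacts)(k => ring((i + 1 + k) % ring.size))
    }
}
```

Because the starting node depends only on the client, different receptionists tend to give the same
client the same reply, while different clients are spread over the ring.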
Note that the ``ClusterReceptionistExtension`` uses the ``DistributedPubSubExtension``, which is described
in :ref:`distributed-pub-sub`.
It is recommended to load the extension when the actor system is started by defining it in
the ``akka.extensions`` configuration property::
akka.extensions = ["akka.contrib.pattern.ClusterReceptionistExtension"]

@ -36,6 +36,7 @@ The Current List of Modules
peek-mailbox
cluster-singleton
distributed-pub-sub
cluster-client
Suggested Way of Using these Contributions
------------------------------------------

@ -22,3 +22,23 @@ akka.contrib.cluster.pub-sub {
removed-time-to-live = 120s
}
# //#pub-sub-ext-config
# //#receptionist-ext-config
# Settings for the ClusterReceptionistExtension
akka.contrib.cluster.receptionist {
# Actor name of the ClusterReceptionist actor, /user/receptionist
name = receptionist
# Start the receptionist on members tagged with this role.
# All members are used if undefined or empty.
role = ""
# The receptionist will send this number of contact points to the client
number-of-contacts = 3
# The actor that tunnels response messages to the client will be stopped
# after this time of inactivity.
response-tunnel-receive-timeout = 30s
}
# //#receptionist-ext-config

@ -0,0 +1,359 @@
/**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.contrib.pattern
import java.net.URLEncoder
import scala.collection.immutable
import scala.concurrent.duration._
import akka.actor.Actor
import akka.actor.ActorLogging
import akka.actor.ActorRef
import akka.actor.ActorSelection
import akka.actor.ActorSystem
import akka.actor.Address
import akka.actor.ExtendedActorSystem
import akka.actor.Extension
import akka.actor.ExtensionId
import akka.actor.ExtensionIdProvider
import akka.actor.Props
import akka.actor.ReceiveTimeout
import akka.actor.Terminated
import akka.cluster.Cluster
import akka.cluster.ClusterEvent._
import akka.cluster.Member
import akka.cluster.MemberStatus
import akka.routing.ConsistentHash
import akka.routing.MurmurHash
/**
* INTERNAL API
*/
private object ClusterClient {
case object PingTick
case object RefreshContactsTick
}
/**
* This actor is intended to be used on an external node that is not a member
* of the cluster. It acts like a gateway for sending messages to actors
* somewhere in the cluster. From the initial contact points it will establish
* a connection to a [[ClusterReceptionist]] somewhere in the cluster. It will
* monitor the connection to the receptionist and establish a new connection if
* the link goes down. When looking for a new receptionist it uses fresh contact
* points retrieved from previous establishment, or periodically refreshed
* contacts, i.e. not necessarily the initial contact points.
*
* You can send messages via the `ClusterClient` to any actor in the cluster
* that is registered in the [[DistributedPubSubMediator]] used by the [[ClusterReceptionist]].
* Messages are wrapped in [[DistributedPubSubMediator.Send]], [[DistributedPubSubMediator.SendToAll]]
* or [[DistributedPubSubMediator.Publish]] with the semantics described in
* [[DistributedPubSubMediator]].
*/
class ClusterClient(
initialContacts: Set[ActorSelection],
pingInterval: FiniteDuration = 3.second,
refreshContactsInterval: FiniteDuration = 1.minute)
extends Actor with ActorLogging {
import ClusterClient._
import ClusterReceptionist._
import DistributedPubSubMediator.{ Send, SendToAll, Publish }
var contacts: immutable.IndexedSeq[ActorSelection] = initialContacts.toVector
sendGetContacts()
import context.dispatcher
val refreshContactsTask = context.system.scheduler.schedule(
refreshContactsInterval, refreshContactsInterval, self, RefreshContactsTick)
val pingTask = context.system.scheduler.schedule(pingInterval, pingInterval, self, PingTick)
override def postStop(): Unit = {
super.postStop()
refreshContactsTask.cancel()
pingTask.cancel()
}
def receive = establishing
def establishing: Actor.Receive = {
case Contacts(contactPoints) ⇒
if (contactPoints.nonEmpty) {
contacts = contactPoints
contacts foreach { _ ! Identify(None) }
}
case ActorIdentity(_, Some(receptionist)) ⇒
context watch receptionist
log.info("Connected to [{}]", receptionist.path)
context.become(active(receptionist))
case ActorIdentity(_, None) ⇒ // ok, use another instead
case PingTick ⇒ sendGetContacts()
case Pong ⇒
case RefreshContactsTick ⇒
case msg ⇒ context.system.deadLetters forward msg
}
def active(receptionist: ActorRef): Actor.Receive = {
def becomeEstablishing(): Unit = {
log.info("Lost contact with [{}], reestablishing connection", receptionist)
sendGetContacts()
context.become(establishing)
}
var pongTimestamp = System.nanoTime
{
case msg @ (_: Send | _: SendToAll | _: Publish) ⇒
receptionist forward msg
case PingTick ⇒
if (System.nanoTime - pongTimestamp > 3 * pingInterval.toNanos)
becomeEstablishing()
else
receptionist ! Ping
case Pong ⇒
pongTimestamp = System.nanoTime
case RefreshContactsTick ⇒
receptionist ! GetContacts
case Terminated(`receptionist`) ⇒
becomeEstablishing()
case Contacts(contactPoints) ⇒
// refresh of contacts
if (contactPoints.nonEmpty)
contacts = contactPoints
case _: ActorIdentity ⇒ // ok, from previous establish, already handled
}
}
def sendGetContacts(): Unit = {
if (contacts.isEmpty) initialContacts foreach { _ ! GetContacts }
else if (contacts.size == 1) (initialContacts ++ contacts) foreach { _ ! GetContacts }
else contacts foreach { _ ! GetContacts }
}
}
/**
* Extension that starts [[ClusterReceptionist]] and accompanying [[DistributedPubSubMediator]]
* with settings defined in config section `akka.contrib.cluster.receptionist`.
* The [[DistributedPubSubMediator]] is started by the [[DistributedPubSubExtension]].
*/
object ClusterReceptionistExtension extends ExtensionId[ClusterReceptionistExtension] with ExtensionIdProvider {
override def get(system: ActorSystem): ClusterReceptionistExtension = super.get(system)
override def lookup = ClusterReceptionistExtension
override def createExtension(system: ExtendedActorSystem): ClusterReceptionistExtension =
new ClusterReceptionistExtension(system)
}
class ClusterReceptionistExtension(system: ExtendedActorSystem) extends Extension {
private val config = system.settings.config.getConfig("akka.contrib.cluster.receptionist")
private val role: Option[String] = config.getString("role") match {
case "" ⇒ None
case r ⇒ Some(r)
}
/**
* Returns true if this member is not tagged with the role configured for the
* receptionist.
*/
def isTerminated: Boolean = Cluster(system).isTerminated || !role.forall(Cluster(system).selfRoles.contains)
/**
* Register the actors that should be reachable for the clients in this [[DistributedPubSubMediator]].
*/
def pubSubMediator: ActorRef = DistributedPubSubExtension(system).mediator
/**
* The [[ClusterReceptionist]] actor
*/
val receptionist: ActorRef = {
if (isTerminated)
system.deadLetters
else {
val numberOfContacts: Int = config.getInt("number-of-contacts")
val responseTunnelReceiveTimeout =
Duration(config.getMilliseconds("response-tunnel-receive-timeout"), MILLISECONDS)
val name = config.getString("name")
// important to use val mediator here to activate it outside of ClusterReceptionist constructor
val mediator = pubSubMediator
system.actorOf(Props(new ClusterReceptionist(mediator, role, numberOfContacts,
responseTunnelReceiveTimeout)), name)
}
}
}
/**
* INTERNAL API
*/
private[pattern] object ClusterReceptionist {
@SerialVersionUID(1L)
case object GetContacts
@SerialVersionUID(1L)
case class Contacts(contactPoints: immutable.IndexedSeq[ActorSelection])
@SerialVersionUID(1L)
case object Ping
@SerialVersionUID(1L)
case object Pong
// FIXME change to akka.actor.Identify when that is in master
@SerialVersionUID(1L)
case class Identify(messageId: Any)
@SerialVersionUID(1L)
case class ActorIdentity(correlationId: Any, ref: Option[ActorRef])
def roleOption(role: String): Option[String] = role match {
case null | "" ⇒ None
case _ ⇒ Some(role)
}
/**
* Replies are tunneled via this actor, child of the receptionist, to avoid
* inbound connections from other cluster nodes to the client.
*/
class ClientResponseTunnel(client: ActorRef, timeout: FiniteDuration) extends Actor {
context.setReceiveTimeout(timeout)
def receive = {
case Ping ⇒ // keep alive from client
case ReceiveTimeout ⇒ context stop self
case msg ⇒ client forward msg
}
}
}
/**
* [[ClusterClient]] connects to this actor to retrieve contact points. The `ClusterReceptionist` is
* supposed to be started on all nodes, or all nodes with a specified role, in the cluster.
* The receptionist can be started with the [[ClusterReceptionistExtension]] or as an
* ordinary actor.
*
* The receptionist forwards messages from the client to the associated [[DistributedPubSubMediator]],
* i.e. the client can send messages to any actor in the cluster that is registered in the
* `DistributedPubSubMediator`. Messages from the client are wrapped in
* [[DistributedPubSubMediator.Send]], [[DistributedPubSubMediator.SendToAll]]
* or [[DistributedPubSubMediator.Publish]] with the semantics described in
* [[DistributedPubSubMediator]].
*
* Response messages from the destination actor are tunneled via the receptionist
* to avoid inbound connections from other cluster nodes to the client, i.e.
* the `sender`, as seen by the destination actor, is not the client itself.
* The `sender` of the response messages, as seen by the client, is preserved
* as the original sender, so the client can choose to send subsequent messages
* directly to the actor in the cluster.
*/
class ClusterReceptionist(
pubSubMediator: ActorRef,
role: Option[String],
numberOfContacts: Int = 3,
responseTunnelReceiveTimeout: FiniteDuration = 30.seconds)
extends Actor with ActorLogging {
import DistributedPubSubMediator.{ Send, SendToAll, Publish }
/**
* Java API constructor with default values.
*/
def this(pubSubMediator: ActorRef, role: String) =
this(pubSubMediator, ClusterReceptionist.roleOption(role))
import ClusterReceptionist._
val cluster = Cluster(context.system)
import cluster.selfAddress
require(role.forall(cluster.selfRoles.contains),
s"This cluster member [${selfAddress}] doesn't have the role [$role]")
var nodes: immutable.SortedSet[Address] = {
def hashFor(node: Address): Int = node match {
// cluster node identifier is the host and port of the address; protocol and system is assumed to be the same
case Address(_, _, Some(host), Some(port)) ⇒ MurmurHash.stringHash(s"${host}:${port}")
case _ ⇒
throw new IllegalStateException(s"Unexpected address without host/port: [$node]")
}
implicit val ringOrdering: Ordering[Address] = Ordering.fromLessThan[Address] { (a, b) ⇒
val ha = hashFor(a)
val hb = hashFor(b)
ha < hb || (ha == hb && Member.addressOrdering.compare(a, b) < 0)
}
immutable.SortedSet()
}
val virtualNodesFactor = 10
var consistentHash: ConsistentHash[Address] = ConsistentHash(nodes, virtualNodesFactor)
override def preStart(): Unit = {
super.preStart()
require(!cluster.isTerminated, "Cluster node must not be terminated")
cluster.subscribe(self, classOf[MemberEvent])
}
override def postStop(): Unit = {
super.postStop()
cluster unsubscribe self
}
def matchingRole(m: Member): Boolean = role.forall(m.hasRole)
def responseTunnel(client: ActorRef): ActorRef = {
val encName = URLEncoder.encode(client.path.toSerializationFormat, "utf-8")
context.child(encName) match {
case Some(tunnel) ⇒ tunnel
case None ⇒
context.actorOf(Props(new ClientResponseTunnel(client, responseTunnelReceiveTimeout)), encName)
}
}
def receive = {
case msg @ (_: Send | _: SendToAll | _: Publish) ⇒
pubSubMediator.tell(msg, responseTunnel(sender))
case Ping ⇒
responseTunnel(sender) ! Ping // keep alive
sender ! Pong
// FIXME remove when akka.actor.Identify is in master
case Identify(messageId) ⇒
sender ! ActorIdentity(messageId, Some(self))
case GetContacts ⇒
// Consistent hashing is used to ensure that the reply to GetContacts
// is the same from all nodes (most of the time) and it also
// load balances the client connections among the nodes in the cluster.
if (numberOfContacts >= nodes.size) {
sender ! Contacts(nodes.map(a ⇒ context.actorSelection(self.path.toStringWithAddress(a)))(collection.breakOut))
} else {
// using toStringWithAddress in case the client is local, normally it is not, and
// toStringWithAddress will use the remote address of the client
val a = consistentHash.nodeFor(sender.path.toStringWithAddress(cluster.selfAddress))
val slice = {
val first = nodes.from(a).tail.take(numberOfContacts)
if (first.size == numberOfContacts) first
else first ++ nodes.take(numberOfContacts - first.size)
}
sender ! Contacts(slice.map(a ⇒ context.actorSelection(self.path.toStringWithAddress(a)))(collection.breakOut))
}
case state: CurrentClusterState ⇒
nodes = nodes.empty ++ state.members.collect { case m if m.status != MemberStatus.Joining && matchingRole(m) ⇒ m.address }
consistentHash = ConsistentHash(nodes, virtualNodesFactor)
case MemberUp(m) ⇒
if (matchingRole(m)) {
nodes += m.address
consistentHash = ConsistentHash(nodes, virtualNodesFactor)
}
case MemberRemoved(m) ⇒
if (m.address == selfAddress)
context stop self
else if (matchingRole(m)) {
nodes -= m.address
consistentHash = ConsistentHash(nodes, virtualNodesFactor)
}
case _: MemberEvent ⇒ // not of interest
}
}

@ -0,0 +1,199 @@
/**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.contrib.pattern
import language.postfixOps
import scala.concurrent.duration._
import com.typesafe.config.ConfigFactory
import akka.actor.Actor
import akka.actor.ActorRef
import akka.actor.Props
import akka.cluster.Cluster
import akka.cluster.ClusterEvent._
import akka.remote.testconductor.RoleName
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.remote.testkit.STMultiNodeSpec
import akka.testkit._
import akka.actor.Address
object ClusterClientSpec extends MultiNodeConfig {
val client = role("client")
val first = role("first")
val second = role("second")
val third = role("third")
val fourth = role("fourth")
commonConfig(ConfigFactory.parseString("""
akka.loglevel = INFO
akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
akka.remote.log-remote-lifecycle-events = off
akka.cluster.auto-join = off
akka.cluster.auto-down = on
"""))
class TestService(testActor: ActorRef) extends Actor {
def receive = {
case msg ⇒
testActor forward msg
sender ! "ack"
}
}
class Service extends Actor {
def receive = {
case msg ⇒ sender ! msg
}
}
}
class ClusterClientMultiJvmNode1 extends ClusterClientSpec
class ClusterClientMultiJvmNode2 extends ClusterClientSpec
class ClusterClientMultiJvmNode3 extends ClusterClientSpec
class ClusterClientMultiJvmNode4 extends ClusterClientSpec
class ClusterClientMultiJvmNode5 extends ClusterClientSpec
class ClusterClientSpec extends MultiNodeSpec(ClusterClientSpec) with STMultiNodeSpec with ImplicitSender {
import ClusterClientSpec._
import DistributedPubSubMediator._
override def initialParticipants = roles.size
def join(from: RoleName, to: RoleName): Unit = {
runOn(from) {
Cluster(system) join node(to).address
createReceptionist()
}
enterBarrier(from.name + "-joined")
}
def createReceptionist(): ActorRef = ClusterReceptionistExtension(system).receptionist
def mediator: ActorRef = ClusterReceptionistExtension(system).pubSubMediator
def receptionist: ActorRef = ClusterReceptionistExtension(system).receptionist
def awaitCount(expected: Int): Unit = {
awaitAssert {
mediator ! Count
expectMsgType[Int] must be(expected)
}
}
def roleName(addr: Address): Option[RoleName] = roles.find(node(_).address == addr)
def initialContacts = Set(
system.actorSelection(node(second) / "user" / "receptionist"),
system.actorSelection(node(third) / "user" / "receptionist"))
"A ClusterClient" must {
"startup cluster" in within(30 seconds) {
join(first, first)
join(second, first)
join(third, first)
join(fourth, first)
runOn(fourth) {
val service = system.actorOf(Props(new TestService(testActor)), "testService")
mediator ! Put(service)
}
runOn(first, second, third, fourth) {
awaitCount(1)
}
enterBarrier("after-1")
}
"communicate to actor on any node in cluster" in within(10 seconds) {
runOn(client) {
val c = system.actorOf(Props(new ClusterClient(initialContacts)))
awaitAssert {
c ! Send("/user/testService", "hello", localAffinity = true)
expectMsg(1 second, "ack")
}
}
runOn(fourth) {
expectMsg("hello")
}
enterBarrier("after-2")
}
"demonstrate usage" in within(15 seconds) {
def host1 = first
def host2 = second
def host3 = third
//#server
runOn(host1) {
val mediator = ClusterReceptionistExtension(system).pubSubMediator
val serviceA = system.actorOf(Props[Service], "serviceA")
mediator ! DistributedPubSubMediator.Put(serviceA)
}
runOn(host2, host3) {
val mediator = ClusterReceptionistExtension(system).pubSubMediator
val serviceB = system.actorOf(Props[Service], "serviceB")
mediator ! DistributedPubSubMediator.Put(serviceB)
}
//#server
//#client
runOn(client) {
val c = system.actorOf(Props(new ClusterClient(initialContacts)))
c ! DistributedPubSubMediator.Send("/user/serviceA", "hello", localAffinity = true)
c ! DistributedPubSubMediator.SendToAll("/user/serviceB", "hi")
}
//#client
{ //not used, only demo
//#initialContacts
val initialContacts = Set(
system.actorSelection("akka.tcp://OtherSys@host1:2552/user/receptionist"),
system.actorSelection("akka.tcp://OtherSys@host2:2552/user/receptionist"))
//#initialContacts
}
// strange, barriers fail without this sleep
Thread.sleep(1000)
enterBarrier("after-3")
}
"re-establish connection to receptionist when connection is lost" in within(30 seconds) {
runOn(first, second, third, fourth) {
val service2 = system.actorOf(Props(new TestService(testActor)), "service2")
mediator ! Put(service2)
awaitCount(8)
}
enterBarrier("service2-replicated")
runOn(client) {
val c = system.actorOf(Props(new ClusterClient(initialContacts)))
awaitAssert {
c ! Send("/user/service2", "bonjour", localAffinity = true)
expectMsg(1 second, "ack")
}
val lastSenderAddress = lastSender.path.address
val receptionistRoleName = roleName(lastSenderAddress) match {
case Some(r) ⇒ r
case None ⇒ fail("unexpected missing roleName: " + lastSender.path.address)
}
testConductor.shutdown(receptionistRoleName, 0).await
awaitAssert {
c ! Send("/user/service2", "hi again", localAffinity = true)
expectMsg(1 second, "ack")
}
}
enterBarrier("verified-3")
receiveWhile(2 seconds) {
case "hi again" ⇒
case other ⇒ fail("unexpected message: " + other)
}
enterBarrier("after-4")
}
}
}

@ -274,6 +274,11 @@ Distributed Publish Subscribe Pattern
See :ref:`distributed-pub-sub` in the contrib module.
Cluster Client
^^^^^^^^^^^^^^
See :ref:`cluster-client` in the contrib module.
Failure Detector
^^^^^^^^^^^^^^^^

@ -262,6 +262,11 @@ Distributed Publish Subscribe Pattern
See :ref:`distributed-pub-sub` in the contrib module.
Cluster Client
^^^^^^^^^^^^^^
See :ref:`cluster-client` in the contrib module.
Failure Detector
^^^^^^^^^^^^^^^^