!cto #17454 Introduce ClusterClientSettings and ClusterReceptionistSettings

* rename ClusterReceptionistExtension to ClusterClientReceptionist
This commit is contained in:
Patrik Nordwall 2015-05-08 08:49:08 +02:00
parent b8594d475e
commit c57a8da744
6 changed files with 230 additions and 113 deletions

View file

@ -609,10 +609,8 @@ object ShardRegion {
private case object Retry extends ShardRegionCommand
private def roleOption(role: String): Option[String] = role match {
case null | "" None
case _ Some(role)
}
private def roleOption(role: String): Option[String] =
if (role == "") None else Option(role)
/**
* INTERNAL API. Sends `PoisonPill` to the entries and when all of them have terminated

View file

@ -47,7 +47,7 @@ akka.actor {
# //#receptionist-ext-config
# Settings for the ClusterReceptionistExtension
# Settings for the ClusterClientReceptionist extension
akka.cluster.client.receptionist {
# Actor name of the ClusterReceptionist actor, /user/receptionist
name = receptionist
@ -65,6 +65,24 @@ akka.cluster.client.receptionist {
}
# //#receptionist-ext-config
# Settings for the ClusterClient
akka.cluster.client {
# Actor paths of the ClusterReceptionist actors on the servers (cluster nodes)
# that the client will try to contact initially. It is mandatory to specify
# at least one initial contact.
# Comma separated full actor paths defined by a string on the form of
# "akka.tcp://system@hostname:port/user/receptionist"
initial-contacts = []
# Interval at which the client retries to establish contact with one of
# ClusterReceptionist on the servers (cluster nodes)
establishing-get-contacts-interval = 3s
# Interval at which the client will ask the ClusterReceptionist for
# new contact points to be used for next reconnect.
refresh-contacts-interval = 60s
}
# //#cluster-client-mailbox-config
akka.cluster.client {
mailbox {

View file

@ -4,64 +4,113 @@
package akka.cluster.client
import java.net.URLEncoder
import scala.collection.immutable
import scala.concurrent.duration._
import akka.actor.Actor
import akka.actor.ActorIdentity
import akka.actor.ActorLogging
import akka.actor.ActorPath
import akka.actor.ActorRef
import akka.actor.ActorSelection
import akka.actor.ActorSystem
import akka.actor.Address
import akka.actor.Cancellable
import akka.actor.Deploy
import akka.actor.ExtendedActorSystem
import akka.actor.Extension
import akka.actor.ExtensionId
import akka.actor.ExtensionIdProvider
import akka.actor.Identify
import akka.actor.NoSerializationVerificationNeeded
import akka.actor.Props
import akka.actor.ReceiveTimeout
import akka.actor.Stash
import akka.actor.Terminated
import akka.cluster.Cluster
import akka.cluster.ClusterEvent._
import akka.cluster.Member
import akka.cluster.MemberStatus
import akka.cluster.pubsub._
import akka.japi.Util.immutableSeq
import akka.routing.ConsistentHash
import akka.routing.MurmurHash
import akka.actor.Stash
import akka.actor.Cancellable
import akka.cluster.pubsub._
import com.typesafe.config.Config
object ClusterClientSettings {
/**
* Create settings from the default configuration
* `akka.cluster.client`.
*/
def apply(system: ActorSystem): ClusterClientSettings =
apply(system.settings.config.getConfig("akka.cluster.client"))
/**
* Create settings from a configuration with the same layout as
* the default configuration `akka.cluster.client`.
*/
def apply(config: Config): ClusterClientSettings = {
val initialContacts = immutableSeq(config.getStringList("initial-contacts")).map(ActorPath.fromString).toSet
new ClusterClientSettings(
initialContacts,
establishingGetContactsInterval = config.getDuration("establishing-get-contacts-interval", MILLISECONDS).millis,
refreshContactsInterval = config.getDuration("refresh-contacts-interval", MILLISECONDS).millis)
}
/**
* Java API: Create settings from the default configuration
* `akka.cluster.client`.
*/
def create(system: ActorSystem): ClusterClientSettings = apply(system)
/**
* Java API: Create settings from a configuration with the same layout as
* the default configuration `akka.cluster.client`.
*/
def create(config: Config): ClusterClientSettings = apply(config)
}
/**
* @param initialContacts Actor paths of the `ClusterReceptionist` actors on
* the servers (cluster nodes) that the client will try to contact initially.
* @param establishingGetContactsInterval Interval at which the client retries
* to establish contact with one of ClusterReceptionist on the servers (cluster nodes)
* @param refreshContactsInterval Interval at which the client will ask the
* `ClusterReceptionist` for new contact points to be used for next reconnect.
*/
final class ClusterClientSettings(
val initialContacts: Set[ActorPath],
val establishingGetContactsInterval: FiniteDuration,
val refreshContactsInterval: FiniteDuration) extends NoSerializationVerificationNeeded {
def withInitialContacts(initialContacts: Set[ActorPath]): ClusterClientSettings = {
require(initialContacts.nonEmpty, "initialContacts must be defined")
copy(initialContacts = initialContacts)
}
def withEstablishingGetContactsInterval(establishingGetContactsInterval: FiniteDuration): ClusterClientSettings =
copy(establishingGetContactsInterval = establishingGetContactsInterval)
def withRefreshContactsInterval(refreshContactsInterval: FiniteDuration): ClusterClientSettings =
copy(refreshContactsInterval = refreshContactsInterval)
private def copy(
initialContacts: Set[ActorPath] = initialContacts,
establishingGetContactsInterval: FiniteDuration = establishingGetContactsInterval,
refreshContactsInterval: FiniteDuration = refreshContactsInterval): ClusterClientSettings =
new ClusterClientSettings(initialContacts, establishingGetContactsInterval, refreshContactsInterval)
}
object ClusterClient {
/**
* Scala API: Factory method for `ClusterClient` [[akka.actor.Props]].
*/
def props(
initialContacts: Set[ActorSelection],
establishingGetContactsInterval: FiniteDuration = 3.second,
refreshContactsInterval: FiniteDuration = 1.minute): Props =
Props(classOf[ClusterClient], initialContacts, establishingGetContactsInterval, refreshContactsInterval).
withMailbox("akka.cluster.client.mailbox")
/**
* Java API: Factory method for `ClusterClient` [[akka.actor.Props]].
*/
def props(
initialContacts: java.util.Set[ActorSelection],
establishingGetContactsInterval: FiniteDuration,
refreshContactsInterval: FiniteDuration): Props = {
import scala.collection.JavaConverters._
props(initialContacts.asScala.toSet, establishingGetContactsInterval, refreshContactsInterval)
}
/**
* Java API: Factory method for `ClusterClient` [[akka.actor.Props]] with
* default values.
*/
def defaultProps(initialContacts: java.util.Set[ActorSelection]): Props = {
import scala.collection.JavaConverters._
props(initialContacts.asScala.toSet)
}
def props(settings: ClusterClientSettings): Props =
Props(new ClusterClient(settings)).withDeploy(Deploy.local).withMailbox("akka.cluster.client.mailbox")
@SerialVersionUID(1L)
final case class Send(path: String, msg: Any, localAffinity: Boolean) {
@ -116,17 +165,18 @@ object ClusterClient {
* Use the factory method [[ClusterClient#props]]) to create the
* [[akka.actor.Props]] for the actor.
*/
class ClusterClient(
initialContacts: Set[ActorSelection],
establishingGetContactsInterval: FiniteDuration,
refreshContactsInterval: FiniteDuration)
extends Actor with Stash with ActorLogging {
class ClusterClient(settings: ClusterClientSettings) extends Actor with Stash with ActorLogging {
import ClusterClient._
import ClusterClient.Internal._
import ClusterReceptionist.Internal._
import settings._
var contacts: immutable.IndexedSeq[ActorSelection] = initialContacts.toVector
require(initialContacts.nonEmpty, "initialContacts must be defined")
val initialContactsSel: immutable.IndexedSeq[ActorSelection] =
initialContacts.map(context.actorSelection).toVector
var contacts = initialContactsSel
sendGetContacts()
import context.dispatcher
@ -187,27 +237,27 @@ class ClusterClient(
}
def sendGetContacts(): Unit = {
if (contacts.isEmpty) initialContacts foreach { _ ! GetContacts }
else if (contacts.size == 1) (initialContacts ++ contacts) foreach { _ ! GetContacts }
if (contacts.isEmpty) initialContactsSel foreach { _ ! GetContacts }
else if (contacts.size == 1) (initialContactsSel ++ contacts) foreach { _ ! GetContacts }
else contacts foreach { _ ! GetContacts }
}
}
object ClusterClientReceptionist extends ExtensionId[ClusterClientReceptionist] with ExtensionIdProvider {
override def get(system: ActorSystem): ClusterClientReceptionist = super.get(system)
override def lookup = ClusterClientReceptionist
override def createExtension(system: ExtendedActorSystem): ClusterClientReceptionist =
new ClusterClientReceptionist(system)
}
/**
* Extension that starts [[ClusterReceptionist]] and accompanying [[akka.cluster.pubsub.DistributedPubSubMediator]]
* with settings defined in config section `akka.cluster.client.receptionist`.
* The [[akka.cluster.pubsub.DistributedPubSubMediator]] is started by the [[akka.cluster.pubsub.DistributedPubSubExtension]] extension.
* The [[akka.cluster.pubsub.DistributedPubSubMediator]] is started by the [[akka.cluster.pubsub.DistributedPubSub]] extension.
*/
object ClusterReceptionistExtension extends ExtensionId[ClusterReceptionistExtension] with ExtensionIdProvider {
override def get(system: ActorSystem): ClusterReceptionistExtension = super.get(system)
override def lookup = ClusterReceptionistExtension
override def createExtension(system: ExtendedActorSystem): ClusterReceptionistExtension =
new ClusterReceptionistExtension(system)
}
class ClusterReceptionistExtension(system: ExtendedActorSystem) extends Extension {
class ClusterClientReceptionist(system: ExtendedActorSystem) extends Extension {
private val config = system.settings.config.getConfig("akka.cluster.client.receptionist")
private val role: Option[String] = config.getString("role") match {
@ -264,18 +314,82 @@ class ClusterReceptionistExtension(system: ExtendedActorSystem) extends Extensio
if (isTerminated)
system.deadLetters
else {
val numberOfContacts: Int = config.getInt("number-of-contacts")
val responseTunnelReceiveTimeout =
config.getDuration("response-tunnel-receive-timeout", MILLISECONDS).millis
val name = config.getString("name")
// important to use val mediator here to activate it outside of ClusterReceptionist constructor
val mediator = pubSubMediator
system.actorOf(ClusterReceptionist.props(mediator, role, numberOfContacts,
responseTunnelReceiveTimeout), name)
system.actorOf(ClusterReceptionist.props(mediator, ClusterReceptionistSettings(config)), name)
}
}
}
object ClusterReceptionistSettings {
/**
* Create settings from the default configuration
* `akka.cluster.client.receptionist`.
*/
def apply(system: ActorSystem): ClusterReceptionistSettings =
apply(system.settings.config.getConfig("akka.cluster.client.receptionist"))
/**
* Create settings from a configuration with the same layout as
* the default configuration `akka.cluster.client.receptionist`.
*/
def apply(config: Config): ClusterReceptionistSettings =
new ClusterReceptionistSettings(
role = roleOption(config.getString("role")),
numberOfContacts = config.getInt("number-of-contacts"),
responseTunnelReceiveTimeout = config.getDuration("response-tunnel-receive-timeout", MILLISECONDS).millis)
/**
* Java API: Create settings from the default configuration
* `akka.cluster.client.receptionist`.
*/
def create(system: ActorSystem): ClusterReceptionistSettings = apply(system)
/**
* Java API: Create settings from a configuration with the same layout as
* the default configuration `akka.cluster.client.receptionist`.
*/
def create(config: Config): ClusterReceptionistSettings = apply(config)
/**
* INTERNAL API
*/
private[akka] def roleOption(role: String): Option[String] =
if (role == "") None else Option(role)
}
/**
* @param role Start the receptionist on members tagged with this role.
* All members are used if undefined.
* @param numberOfContacts The receptionist will send this number of contact points to the client
* @param responseTunnelReceiveTimeout The actor that tunnel response messages to the
* client will be stopped after this time of inactivity.
*/
final class ClusterReceptionistSettings(
val role: Option[String],
val numberOfContacts: Int,
val responseTunnelReceiveTimeout: FiniteDuration) extends NoSerializationVerificationNeeded {
def withRole(role: String): ClusterReceptionistSettings = copy(role = ClusterReceptionistSettings.roleOption(role))
def withRole(role: Option[String]): ClusterReceptionistSettings = copy(role = role)
def withNumberOfContacts(numberOfContacts: Int): ClusterReceptionistSettings =
copy(numberOfContacts = numberOfContacts)
def withResponseTunnelReceiveTimeout(responseTunnelReceiveTimeout: FiniteDuration): ClusterReceptionistSettings =
copy(responseTunnelReceiveTimeout = responseTunnelReceiveTimeout)
private def copy(
role: Option[String] = role,
numberOfContacts: Int = numberOfContacts,
responseTunnelReceiveTimeout: FiniteDuration = responseTunnelReceiveTimeout): ClusterReceptionistSettings =
new ClusterReceptionistSettings(role, numberOfContacts, responseTunnelReceiveTimeout)
}
object ClusterReceptionist {
/**
@ -283,29 +397,8 @@ object ClusterReceptionist {
*/
def props(
pubSubMediator: ActorRef,
role: Option[String],
numberOfContacts: Int = 3,
responseTunnelReceiveTimeout: FiniteDuration = 30.seconds): Props =
Props(classOf[ClusterReceptionist], pubSubMediator, role, numberOfContacts, responseTunnelReceiveTimeout)
/**
* Java API: Factory method for `ClusterReceptionist` [[akka.actor.Props]].
*/
def props(
pubSubMediator: ActorRef,
role: String,
numberOfContacts: Int,
responseTunnelReceiveTimeout: FiniteDuration): Props =
props(pubSubMediator, Internal.roleOption(role), numberOfContacts, responseTunnelReceiveTimeout)
/**
* Java API: Factory method for `ClusterReceptionist` [[akka.actor.Props]]
* with default values.
*/
def props(
pubSubMediator: ActorRef,
role: String): Props =
props(pubSubMediator, Internal.roleOption(role))
settings: ClusterReceptionistSettings): Props =
Props(new ClusterReceptionist(pubSubMediator, settings)).withDeploy(Deploy.local)
/**
* INTERNAL API
@ -318,11 +411,6 @@ object ClusterReceptionist {
@SerialVersionUID(1L)
case object Ping
def roleOption(role: String): Option[String] = role match {
case null | "" None
case _ Some(role)
}
/**
* Replies are tunneled via this actor, child of the receptionist, to avoid
* inbound connections from other cluster nodes to the client.
@ -344,7 +432,7 @@ object ClusterReceptionist {
/**
* [[ClusterClient]] connects to this actor to retrieve. The `ClusterReceptionist` is
* supposed to be started on all nodes, or all nodes with specified role, in the cluster.
* The receptionist can be started with the [[ClusterReceptionistExtension]] or as an
* The receptionist can be started with the [[ClusterClientReceptionist]] or as an
* ordinary actor (use the factory method [[ClusterReceptionist#props]]).
*
* The receptionist forwards messages from the client to the associated [[akka.cluster.pubsub.DistributedPubSubMediator]],
@ -361,16 +449,13 @@ object ClusterReceptionist {
* as the original sender, so the client can choose to send subsequent messages
* directly to the actor in the cluster.
*/
class ClusterReceptionist(
pubSubMediator: ActorRef,
role: Option[String],
numberOfContacts: Int,
responseTunnelReceiveTimeout: FiniteDuration)
class ClusterReceptionist(pubSubMediator: ActorRef, settings: ClusterReceptionistSettings)
extends Actor with ActorLogging {
import DistributedPubSubMediator.{ Send, SendToAll, Publish }
import ClusterReceptionist.Internal._
import settings._
val cluster = Cluster(context.system)
import cluster.selfAddress

View file

@ -68,7 +68,7 @@ class ClusterClientSpec extends MultiNodeSpec(ClusterClientSpec) with STMultiNod
enterBarrier(from.name + "-joined")
}
def createReceptionist(): Unit = ClusterReceptionistExtension(system)
def createReceptionist(): Unit = ClusterClientReceptionist(system)
def awaitCount(expected: Int): Unit = {
awaitAssert {
@ -80,8 +80,8 @@ class ClusterClientSpec extends MultiNodeSpec(ClusterClientSpec) with STMultiNod
def roleName(addr: Address): Option[RoleName] = roles.find(node(_).address == addr)
def initialContacts = Set(
system.actorSelection(node(second) / "user" / "receptionist"),
system.actorSelection(node(third) / "user" / "receptionist"))
node(second) / "user" / "receptionist",
node(third) / "user" / "receptionist")
"A ClusterClient" must {
@ -92,7 +92,7 @@ class ClusterClientSpec extends MultiNodeSpec(ClusterClientSpec) with STMultiNod
join(fourth, first)
runOn(fourth) {
val service = system.actorOf(Props(classOf[TestService], testActor), "testService")
ClusterReceptionistExtension(system).registerService(service)
ClusterClientReceptionist(system).registerService(service)
}
runOn(first, second, third, fourth) {
awaitCount(1)
@ -103,7 +103,8 @@ class ClusterClientSpec extends MultiNodeSpec(ClusterClientSpec) with STMultiNod
"communicate to actor on any node in cluster" in within(10 seconds) {
runOn(client) {
val c = system.actorOf(ClusterClient.props(initialContacts))
val c = system.actorOf(ClusterClient.props(
ClusterClientSettings(system).withInitialContacts(initialContacts)))
c ! ClusterClient.Send("/user/testService", "hello", localAffinity = true)
expectMsg("ack")
}
@ -122,12 +123,12 @@ class ClusterClientSpec extends MultiNodeSpec(ClusterClientSpec) with STMultiNod
//#server
runOn(host1) {
val serviceA = system.actorOf(Props[Service], "serviceA")
ClusterReceptionistExtension(system).registerService(serviceA)
ClusterClientReceptionist(system).registerService(serviceA)
}
runOn(host2, host3) {
val serviceB = system.actorOf(Props[Service], "serviceB")
ClusterReceptionistExtension(system).registerService(serviceB)
ClusterClientReceptionist(system).registerService(serviceB)
}
//#server
@ -138,7 +139,8 @@ class ClusterClientSpec extends MultiNodeSpec(ClusterClientSpec) with STMultiNod
//#client
runOn(client) {
val c = system.actorOf(ClusterClient.props(initialContacts))
val c = system.actorOf(ClusterClient.props(
ClusterClientSettings(system).withInitialContacts(initialContacts)))
c ! ClusterClient.Send("/user/serviceA", "hello", localAffinity = true)
c ! ClusterClient.SendToAll("/user/serviceB", "hi")
}
@ -165,13 +167,14 @@ class ClusterClientSpec extends MultiNodeSpec(ClusterClientSpec) with STMultiNod
"re-establish connection to receptionist when connection is lost" in within(30 seconds) {
runOn(first, second, third, fourth) {
val service2 = system.actorOf(Props(classOf[TestService], testActor), "service2")
ClusterReceptionistExtension(system).registerService(service2)
ClusterClientReceptionist(system).registerService(service2)
awaitCount(8)
}
enterBarrier("service2-replicated")
runOn(client) {
val c = system.actorOf(ClusterClient.props(initialContacts))
val c = system.actorOf(ClusterClient.props(
ClusterClientSettings(system).withInitialContacts(initialContacts)))
c ! ClusterClient.Send("/user/service2", "bonjour", localAffinity = true)
expectMsg("ack")

View file

@ -228,10 +228,23 @@ amended with API as needed.
DistributedPubSub construction
==============================
Normally, the ``DistributedPubSubMediator`` is started by the ``DistributedPubSubExtension``.
Normally, the ``DistributedPubSubMediator`` actor is started by the ``DistributedPubSubExtension``.
This extension has been renamed to ``DistributedPubSub``. It is also possible to start
it as an ordinary actor if you need multiple instances of it with different settings.
The parameters of the ``Props`` factory methods in the ``DistributedPubSubMediator`` companion
has been moved to settings object ``DistributedPubSubSettings``. This can be created from
system configuration properties and also amended with API as needed.
ClusterClient construction
==========================
The parameters of the ``Props`` factory methods in the ``ClusterClient`` companion
has been moved to settings object ``ClusterClientSettings``. This can be created from
system configuration properties and also amended with API as needed.
Normally, the ``ClusterReceptionist`` actor is started by the ``ClusterReceptionistExtension``.
This extension has been renamed to ``ClusterClientReceptionist``. It is also possible to start
it as an ordinary actor if you need multiple instances of it with different settings.
The parameters of the ``Props`` factory methods in the ``ClusterReceptionist`` companion
has been moved to settings object ``ClusterReceptionistSettings``. This can be created from
system configuration properties and also amended with API as needed.

View file

@ -18,12 +18,12 @@ the cluster client.
The receptionist is supposed to be started on all nodes, or all nodes with specified role,
in the cluster. The receptionist can be started with the ``ClusterReceptionistExtension``
in the cluster. The receptionist can be started with the ``ClusterClientReceptionist`` extension
or as an ordinary actor.
You can send messages via the ``ClusterClient`` to any actor in the cluster that is registered
in the ``DistributedPubSubMediator`` used by the ``ClusterReceptionist``.
The ``ClusterReceptionistExtension`` provides methods for registration of actors that
The ``ClusterClientReceptionist`` provides methods for registration of actors that
should be reachable from the client. Messages are wrapped in ``ClusterClient.Send``,
``ClusterClient.SendToAll`` or ``ClusterClient.Publish``.
@ -67,7 +67,7 @@ An Example
On the cluster nodes first start the receptionist. Note, it is recommended to load the extension
when the actor system is started by defining it in the ``akka.extensions`` configuration property::
akka.extensions = ["akka.cluster.client.ClusterReceptionistExtension"]
akka.extensions = ["akka.cluster.client.ClusterClientReceptionist"]
Next, register the actors that should be available for the client.
@ -89,25 +89,25 @@ A more comprehensive sample is available in the `Typesafe Activator <http://www.
tutorial named `Distributed workers with Akka and Scala! <http://www.typesafe.com/activator/template/akka-distributed-workers>`_
and `Distributed workers with Akka and Java! <http://www.typesafe.com/activator/template/akka-distributed-workers-java>`_.
ClusterReceptionistExtension
ClusterClientReceptionist
----------------------------
In the example above the receptionist is started and accessed with the ``akka.cluster.client.ClusterReceptionistExtension``.
In the example above the receptionist is started and accessed with the ``akka.cluster.client.ClusterClientReceptionist``.
That is convenient and perfectly fine in most cases, but it can be good to know that it is possible to
start the ``akka.cluster.client.ClusterReceptionist`` actor as an ordinary actor and you can have several
different receptionists at the same time, serving different types of clients.
The ``ClusterReceptionistExtension`` can be configured with the following properties:
The ``ClusterClientReceptionist`` can be configured with the following properties:
.. includecode:: ../../../akka-cluster-tools/src/main/resources/reference.conf#receptionist-ext-config
Note that the ``ClusterReceptionistExtension`` uses the ``DistributedPubSub`` extension, which is described
Note that the ``ClusterClientReceptionist`` uses the ``DistributedPubSub`` extension, which is described
in :ref:`distributed-pub-sub`.
It is recommended to load the extension when the actor system is started by defining it in the
``akka.extensions`` configuration property::
akka.extensions = ["akka.cluster.client.ClusterReceptionistExtension"]
akka.extensions = ["akka.cluster.client.ClusterClientReceptionist"]
Dependencies
------------