Merge pull request #17697 from akka/wip-15110-singleton-proxy-buffer-patriknw

!clt #15110 Use buffer instead of stash in singleton proxy and cluster client
Patrik Nordwall 2015-06-18 11:20:16 +02:00
commit bc4d480d7e
7 changed files with 155 additions and 47 deletions

.gitignore

@ -27,6 +27,7 @@
.scalastyle
.settings
.cache*
.tmpBin
.tags
.tags_sorted_by_file
.target


@ -92,18 +92,16 @@ akka.cluster.client {
# heartbeat-interval + acceptable-heartbeat-pause, i.e. 15 seconds with
# the default settings.
acceptable-heartbeat-pause = 13s
# If connection to the receptionist is not established the client will buffer
# this number of messages and deliver them when the connection is established.
# When the buffer is full old messages will be dropped when new messages are sent
# via the client. Use 0 to disable buffering, i.e. messages will be dropped
# immediately if the location of the receptionist is unavailable.
# Maximum allowed buffer size is 10000.
buffer-size = 1000
}
# //#cluster-client-mailbox-config
akka.cluster.client {
mailbox {
mailbox-type = "akka.dispatch.UnboundedDequeBasedMailbox"
stash-capacity = 1000
}
}
# //#cluster-client-mailbox-config
akka.cluster.singleton {
# The actor name of the child singleton actor.
singleton-name = "singleton"
@ -143,5 +141,14 @@ akka.cluster.singleton-proxy {
role = ""
# Interval at which the proxy will try to resolve the singleton instance.
singleton-identification-interval = 1s
# If the location of the singleton is unknown the proxy will buffer this
# number of messages and deliver them when the singleton is identified.
# When the buffer is full old messages will be dropped when new messages are
# sent via the proxy.
# Use 0 to disable buffering, i.e. messages will be dropped immediately if
# the location of the singleton is unknown.
# Maximum allowed buffer size is 10000.
buffer-size = 1000
}


@ -24,7 +24,6 @@ import akka.actor.Identify
import akka.actor.NoSerializationVerificationNeeded
import akka.actor.Props
import akka.actor.ReceiveTimeout
import akka.actor.Stash
import akka.actor.Terminated
import akka.cluster.Cluster
import akka.cluster.ClusterEvent._
@ -57,7 +56,8 @@ object ClusterClientSettings {
establishingGetContactsInterval = config.getDuration("establishing-get-contacts-interval", MILLISECONDS).millis,
refreshContactsInterval = config.getDuration("refresh-contacts-interval", MILLISECONDS).millis,
heartbeatInterval = config.getDuration("heartbeat-interval", MILLISECONDS).millis,
acceptableHeartbeatPause = config.getDuration("acceptable-heartbeat-pause", MILLISECONDS).millis)
acceptableHeartbeatPause = config.getDuration("acceptable-heartbeat-pause", MILLISECONDS).millis,
bufferSize = config.getInt("buffer-size"))
}
/**
@ -87,13 +87,21 @@ object ClusterClientSettings {
* be accepted before considering it to be an anomaly. The ClusterClient is using the
* [[akka.remote.DeadlineFailureDetector]], which will trigger if there are no heartbeats
* within the duration `heartbeatInterval + acceptableHeartbeatPause`.
* @param bufferSize If connection to the receptionist is not established the client
* will buffer this number of messages and deliver them when the connection is established.
* When the buffer is full old messages will be dropped when new messages are sent via the
* client. Use 0 to disable buffering, i.e. messages will be dropped immediately if the
* location of the receptionist is unavailable.
*/
final class ClusterClientSettings(
val initialContacts: Set[ActorPath],
val establishingGetContactsInterval: FiniteDuration,
val refreshContactsInterval: FiniteDuration,
val heartbeatInterval: FiniteDuration,
val acceptableHeartbeatPause: FiniteDuration) extends NoSerializationVerificationNeeded {
val acceptableHeartbeatPause: FiniteDuration,
val bufferSize: Int) extends NoSerializationVerificationNeeded {
require(bufferSize >= 0 && bufferSize <= 10000, "bufferSize must be >= 0 and <= 10000")
def withInitialContacts(initialContacts: Set[ActorPath]): ClusterClientSettings = {
require(initialContacts.nonEmpty, "initialContacts must be defined")
@ -109,15 +117,18 @@ final class ClusterClientSettings(
def withHeartbeat(heartbeatInterval: FiniteDuration, acceptableHeartbeatPause: FiniteDuration): ClusterClientSettings =
copy(heartbeatInterval = heartbeatInterval, acceptableHeartbeatPause = acceptableHeartbeatPause)
def withBufferSize(bufferSize: Int): ClusterClientSettings =
copy(bufferSize = bufferSize)
private def copy(
initialContacts: Set[ActorPath] = initialContacts,
establishingGetContactsInterval: FiniteDuration = establishingGetContactsInterval,
refreshContactsInterval: FiniteDuration = refreshContactsInterval,
heartbeatInterval: FiniteDuration = heartbeatInterval,
acceptableHeartbeatPause: FiniteDuration = acceptableHeartbeatPause): ClusterClientSettings =
acceptableHeartbeatPause: FiniteDuration = acceptableHeartbeatPause,
bufferSize: Int = bufferSize): ClusterClientSettings =
new ClusterClientSettings(initialContacts, establishingGetContactsInterval, refreshContactsInterval,
heartbeatInterval, acceptableHeartbeatPause)
heartbeatInterval, acceptableHeartbeatPause, bufferSize)
}
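
A usage sketch (not part of the diff): the new buffer size can be loaded from configuration and then amended through the fluent API above. The `ActorSystem` name is a placeholder and the `akka.cluster.client` package name is assumed; only methods shown in this file are used.

    import akka.actor.ActorSystem
    import akka.cluster.client.ClusterClientSettings

    val system = ActorSystem("ClusterSystem") // placeholder system name

    // Defaults come from the akka.cluster.client section of reference.conf shown above;
    // withBufferSize overrides the configured buffer-size (0 would disable buffering).
    val clientSettings =
      ClusterClientSettings(system.settings.config.getConfig("akka.cluster.client"))
        .withBufferSize(2000)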
object ClusterClient {
@ -126,7 +137,7 @@ object ClusterClient {
* Scala API: Factory method for `ClusterClient` [[akka.actor.Props]].
*/
def props(settings: ClusterClientSettings): Props =
Props(new ClusterClient(settings)).withDeploy(Deploy.local).withMailbox("akka.cluster.client.mailbox")
Props(new ClusterClient(settings)).withDeploy(Deploy.local)
@SerialVersionUID(1L)
final case class Send(path: String, msg: Any, localAffinity: Boolean) {
@ -181,8 +192,17 @@ object ClusterClient {
*
* Use the factory method [[ClusterClient#props]]) to create the
* [[akka.actor.Props]] for the actor.
*
* If the receptionist is not currently available, the client will buffer the messages
* and then deliver them when the connection to the receptionist has been established.
* The size of the buffer is configurable and it can be disabled by using a buffer size
* of 0. When the buffer is full old messages will be dropped when new messages are sent
* via the client.
*
* Note that this is a best effort implementation: messages can always be lost due to the distributed
* nature of the actors involved.
*/
class ClusterClient(settings: ClusterClientSettings) extends Actor with Stash with ActorLogging {
final class ClusterClient(settings: ClusterClientSettings) extends Actor with ActorLogging {
import ClusterClient._
import ClusterClient.Internal._
@ -205,6 +225,8 @@ class ClusterClient(settings: ClusterClientSettings) extends Actor with Stash wi
scheduleRefreshContactsTick(establishingGetContactsInterval)
self ! RefreshContactsTick
val buffer = new java.util.LinkedList[(Any, ActorRef)]
def scheduleRefreshContactsTick(interval: FiniteDuration): Unit = {
refreshContactsTask foreach { _.cancel() }
refreshContactsTask = Some(context.system.scheduler.schedule(
@ -228,14 +250,19 @@ class ClusterClient(settings: ClusterClientSettings) extends Actor with Stash wi
case ActorIdentity(_, Some(receptionist)) ⇒
log.info("Connected to [{}]", receptionist.path)
scheduleRefreshContactsTick(refreshContactsInterval)
unstashAll()
sendBuffered(receptionist)
context.become(active(receptionist))
failureDetector.heartbeat()
case ActorIdentity(_, None) ⇒ // ok, use another instead
case HeartbeatTick ⇒
failureDetector.heartbeat()
case RefreshContactsTick ⇒ sendGetContacts()
case msg ⇒ stash()
case Send(path, msg, localAffinity) ⇒
buffer(DistributedPubSubMediator.Send(path, msg, localAffinity))
case SendToAll(path, msg) ⇒
buffer(DistributedPubSubMediator.SendToAll(path, msg))
case Publish(topic, msg) ⇒
buffer(DistributedPubSubMediator.Publish(topic, msg))
}
def active(receptionist: ActorRef): Actor.Receive = {
@ -270,6 +297,26 @@ class ClusterClient(settings: ClusterClientSettings) extends Actor with Stash wi
else if (contacts.size == 1) (initialContactsSel ++ contacts) foreach { _ ! GetContacts }
else contacts foreach { _ ! GetContacts }
}
def buffer(msg: Any): Unit =
if (settings.bufferSize == 0)
log.debug("Receptionist not available and buffering is disabled, dropping message [{}]", msg.getClass.getName)
else if (buffer.size == settings.bufferSize) {
val (m, _) = buffer.removeFirst()
log.debug("Receptionist not available, buffer is full, dropping first message [{}]", m.getClass.getName)
buffer.addLast((msg, sender()))
} else {
log.debug("Receptionist not available, buffering message type [{}]", msg.getClass.getName)
buffer.addLast((msg, sender()))
}
def sendBuffered(receptionist: ActorRef): Unit = {
log.debug("Sending buffered messages to receptionist")
while (!buffer.isEmpty) {
val (msg, snd) = buffer.removeFirst()
receptionist.tell(msg, snd)
}
}
}
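
A rough end-to-end sketch of using the buffering client (not part of the diff). The system name, contact address and destination paths are placeholders, and `ClusterClient.SendToAll` is assumed to be a companion message type as the receive block above suggests.

    import akka.actor.{ ActorPath, ActorSystem }
    import akka.cluster.client.{ ClusterClient, ClusterClientSettings }

    val system = ActorSystem("ClientSys") // placeholder

    val settings =
      ClusterClientSettings(system.settings.config.getConfig("akka.cluster.client"))
        .withInitialContacts(Set(
          ActorPath.fromString("akka.tcp://OtherSys@host1:2552/system/receptionist"))) // placeholder contact

    val client = system.actorOf(ClusterClient.props(settings), "client")

    // Messages sent before the receptionist connection is established are kept in the
    // internal buffer (up to buffer-size) and flushed once the connection is up.
    client ! ClusterClient.Send("/user/serviceA", "hello", localAffinity = true)
    client ! ClusterClient.SendToAll("/user/serviceB", "hi")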
object ClusterClientReceptionist extends ExtensionId[ClusterClientReceptionist] with ExtensionIdProvider {
@ -286,7 +333,7 @@ object ClusterClientReceptionist extends ExtensionId[ClusterClientReceptionist]
* with settings defined in config section `akka.cluster.client.receptionist`.
* The [[akka.cluster.pubsub.DistributedPubSubMediator]] is started by the [[akka.cluster.pubsub.DistributedPubSub]] extension.
*/
class ClusterClientReceptionist(system: ExtendedActorSystem) extends Extension {
final class ClusterClientReceptionist(system: ExtendedActorSystem) extends Extension {
private val config = system.settings.config.getConfig("akka.cluster.client.receptionist")
private val role: Option[String] = config.getString("role") match {
@ -479,8 +526,9 @@ object ClusterReceptionist {
* The `sender` of the response messages, as seen by the client, is preserved
* as the original sender, so the client can choose to send subsequent messages
* directly to the actor in the cluster.
*
*/
class ClusterReceptionist(pubSubMediator: ActorRef, settings: ClusterReceptionistSettings)
final class ClusterReceptionist(pubSubMediator: ActorRef, settings: ClusterReceptionistSettings)
extends Actor with ActorLogging {
import DistributedPubSubMediator.{ Send, SendToAll, Publish }


@ -34,7 +34,8 @@ object ClusterSingletonProxySettings {
def apply(config: Config): ClusterSingletonProxySettings =
new ClusterSingletonProxySettings(
role = roleOption(config.getString("role")),
singletonIdentificationInterval = config.getDuration("singleton-identification-interval", MILLISECONDS).millis)
singletonIdentificationInterval = config.getDuration("singleton-identification-interval", MILLISECONDS).millis,
bufferSize = config.getInt("buffer-size"))
/**
* Java API: Create settings from the default configuration
@ -59,10 +60,17 @@ object ClusterSingletonProxySettings {
/**
* @param role The role of the cluster nodes where the singleton can be deployed. If None, then any node will do.
* @param singletonIdentificationInterval Interval at which the proxy will try to resolve the singleton instance.
* @param bufferSize If the location of the singleton is unknown the proxy will buffer this number of messages
* and deliver them when the singleton is identified. When the buffer is full old messages will be dropped
* when new messages are sent via the proxy. Use 0 to disable buffering, i.e. messages will be dropped
* immediately if the location of the singleton is unknown.
*/
final class ClusterSingletonProxySettings(
val role: Option[String],
val singletonIdentificationInterval: FiniteDuration) extends NoSerializationVerificationNeeded {
val singletonIdentificationInterval: FiniteDuration,
val bufferSize: Int) extends NoSerializationVerificationNeeded {
require(bufferSize >= 0 && bufferSize <= 10000, "bufferSize must be >= 0 and <= 10000")
def withRole(role: String): ClusterSingletonProxySettings = copy(role = ClusterSingletonProxySettings.roleOption(role))
@ -71,9 +79,13 @@ final class ClusterSingletonProxySettings(
def withSingletonIdentificationInterval(singletonIdentificationInterval: FiniteDuration): ClusterSingletonProxySettings =
copy(singletonIdentificationInterval = singletonIdentificationInterval)
def withBufferSize(bufferSize: Int): ClusterSingletonProxySettings =
copy(bufferSize = bufferSize)
private def copy(role: Option[String] = role,
singletonIdentificationInterval: FiniteDuration = singletonIdentificationInterval): ClusterSingletonProxySettings =
new ClusterSingletonProxySettings(role, singletonIdentificationInterval)
singletonIdentificationInterval: FiniteDuration = singletonIdentificationInterval,
bufferSize: Int = bufferSize): ClusterSingletonProxySettings =
new ClusterSingletonProxySettings(role, singletonIdentificationInterval, bufferSize)
}
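
A sketch of amending the new proxy settings through the methods defined above (not part of the diff). The `ActorSystem`, role name and values are placeholders, and the `akka.cluster.singleton` package name is assumed.

    import scala.concurrent.duration._
    import akka.actor.ActorSystem
    import akka.cluster.singleton.ClusterSingletonProxySettings

    val system = ActorSystem("ClusterSystem") // placeholder

    // Defaults come from the akka.cluster.singleton-proxy section of reference.conf shown above.
    val proxySettings =
      ClusterSingletonProxySettings(system.settings.config.getConfig("akka.cluster.singleton-proxy"))
        .withRole("worker")                             // placeholder role
        .withSingletonIdentificationInterval(2.seconds)
        .withBufferSize(5000)                           // must be 0..10000; 0 disables buffering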
object ClusterSingletonProxy {
@ -96,11 +108,11 @@ object ClusterSingletonProxy {
*
* The proxy can be started on every node where the singleton needs to be reached and used as if it were the singleton
* itself. It will then act as a router to the currently running singleton instance. If the singleton is not currently
* available, e.g., during hand off or startup, the proxy will stash the messages sent to the singleton and then unstash
* them when the singleton is finally available. The proxy mixes in the [[akka.actor.Stash]] trait, so it can be
* configured accordingly.
* available, e.g., during hand off or startup, the proxy will buffer the messages sent to the singleton and then deliver
* them when the singleton is finally available. The size of the buffer is configurable and it can be disabled by using
* a buffer size of 0. When the buffer is full old messages will be dropped when new messages are sent via the proxy.
*
* The proxy works by keeping track of the oldest cluster member. When a new oldest member is identified, e.g., because
* The proxy works by keeping track of the oldest cluster member. When a new oldest member is identified, e.g. because
* the older one left the cluster, or at startup, the proxy will try to identify the singleton on the oldest member by
* periodically sending an [[akka.actor.Identify]] message until the singleton responds with its
* [[akka.actor.ActorIdentity]].
@ -108,7 +120,7 @@ object ClusterSingletonProxy {
* Note that this is a best effort implementation: messages can always be lost due to the distributed nature of the
* actors involved.
*/
class ClusterSingletonProxy(singletonPathString: String, settings: ClusterSingletonProxySettings) extends Actor with Stash with ActorLogging {
final class ClusterSingletonProxy(singletonPathString: String, settings: ClusterSingletonProxySettings) extends Actor with ActorLogging {
import settings._
val singletonPath = singletonPathString.split("/")
var identifyCounter = 0
@ -124,6 +136,8 @@ class ClusterSingletonProxy(singletonPathString: String, settings: ClusterSingle
}
var membersByAge: immutable.SortedSet[Member] = immutable.SortedSet.empty(ageOrdering)
var buffer = new java.util.LinkedList[(Any, ActorRef)]
// subscribe to MemberEvent, re-subscribe when restart
override def preStart(): Unit = {
cancelTimer()
@ -206,11 +220,12 @@ class ClusterSingletonProxy(singletonPathString: String, settings: ClusterSingle
// singleton identification logic
case ActorIdentity(identifyId, Some(s)) ⇒
// if the new singleton is defined, unstash all messages
// if the new singleton is defined, deliver all buffered messages
log.info("Singleton identified: {}", s.path)
singleton = Some(s)
context.watch(s)
cancelTimer()
unstashAll()
sendBuffered()
case _: ActorIdentity ⇒ // do nothing
case ClusterSingletonProxy.TryToIdentifySingleton if identifyTimer.isDefined ⇒
membersByAge.headOption.foreach {
@ -219,16 +234,41 @@ class ClusterSingletonProxy(singletonPathString: String, settings: ClusterSingle
log.debug("Trying to identify singleton at {}", singletonAddress)
context.actorSelection(singletonAddress) ! Identify(identifyId)
}
case Terminated(ref) ⇒
if (singleton.exists(_ == ref)) {
// buffering mode: identification of the new singleton will start when the old node is removed
singleton = None
}
// forwarding/stashing logic
case msg: Any ⇒
singleton match {
case Some(s) ⇒
log.debug("Forwarding message to current singleton instance {}", msg)
log.debug("Forwarding message type [{}] to current singleton instance", msg.getClass.getName)
s forward msg
case None ⇒
log.debug("No singleton available, stashing message {}", msg)
stash()
buffer(msg)
}
}
def buffer(msg: Any): Unit =
if (settings.bufferSize == 0)
log.debug("Singleton not available and buffering is disabled, dropping message [{}]", msg.getClass.getName)
else if (buffer.size == settings.bufferSize) {
val (m, _) = buffer.removeFirst()
log.debug("Singleton not available, buffer is full, dropping first message [{}]", m.getClass.getName)
buffer.addLast((msg, sender()))
} else {
log.debug("Singleton not available, buffering message type [{}]", msg.getClass.getName)
buffer.addLast((msg, sender()))
}
def sendBuffered(): Unit = {
log.debug("Sending buffered messages to current singleton instance")
val target = singleton.get
while (!buffer.isEmpty) {
val (msg, snd) = buffer.removeFirst()
target.tell(msg, snd)
}
}
}
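
A sketch of starting and using the proxy itself (not part of the diff). The `ClusterSingletonProxy.props` factory and its `singletonManagerPath` parameter are not shown in this diff and are assumed here; the manager path and message are placeholders.

    import akka.actor.ActorSystem
    import akka.cluster.singleton.{ ClusterSingletonProxy, ClusterSingletonProxySettings }

    val system = ActorSystem("ClusterSystem") // placeholder

    // Assumed props factory: (path to the ClusterSingletonManager, settings).
    val proxy = system.actorOf(
      ClusterSingletonProxy.props(
        singletonManagerPath = "/user/consumerManager", // placeholder manager path
        settings = ClusterSingletonProxySettings(
          system.settings.config.getConfig("akka.cluster.singleton-proxy"))),
      name = "consumerProxy")

    // While the singleton is unavailable (startup, hand-over) messages are kept in the
    // buffer shown above (up to buffer-size) and delivered once the singleton is identified.
    proxy ! "job-1" // placeholder message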


@ -251,6 +251,10 @@ Parameters to the ``Props`` factory methods have been moved to settings object `
and ``ClusterSingletonProxySettings``. These can be created from system configuration properties and also
amended with API as needed.
The buffer size of the ``ClusterSingletonProxy`` can be defined in the ``ClusterSingletonProxySettings``
instead of via the ``stash-capacity`` setting of the mailbox. Buffering can be disabled by using a
buffer size of 0.
DistributedPubSub construction
==============================
@ -268,6 +272,10 @@ The parameters of the ``Props`` factory methods in the ``ClusterClient`` compani
has been moved to settings object ``ClusterClientSettings``. This can be created from
system configuration properties and also amended with API as needed.
The buffer size of the ``ClusterClient`` can be defined in the ``ClusterClientSettings``
instead of via the ``stash-capacity`` setting of the mailbox. Buffering can be disabled by using a
buffer size of 0.
Normally, the ``ClusterReceptionist`` actor is started by the ``ClusterReceptionistExtension``.
This extension has been renamed to ``ClusterClientReceptionist``. It is also possible to start
it as an ordinary actor if you need multiple instances of it with different settings.


@ -54,13 +54,13 @@ directly to the actor in the cluster.
While establishing a connection to a receptionist the ``ClusterClient`` will buffer
messages and send them when the connection is established. If the buffer is full
the ``ClusterClient`` will throw ``akka.actor.StashOverflowException``, which can be
handled by the supervision strategy of the parent actor. The size of the buffer
can be configured by the following ``stash-capacity`` setting of the mailbox that is
used by the ``ClusterClient`` actor.
.. includecode:: ../../../akka-cluster-tools/src/main/resources/reference.conf#cluster-client-mailbox-config
the ``ClusterClient`` will drop old messages when new messages are sent via the client.
The size of the buffer is configurable and it can be disabled by using a buffer size of 0.
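
For example, buffering could be turned off entirely through the settings; a minimal sketch, assuming an existing ``system: ActorSystem`` and the ``akka.cluster.client`` package name:

    import akka.cluster.client.ClusterClientSettings

    // buffer-size 0: messages sent while no receptionist connection exists are dropped
    // immediately instead of being buffered.
    val noBufferSettings =
      ClusterClientSettings(system.settings.config.getConfig("akka.cluster.client"))
        .withBufferSize(0)

The resulting settings are then passed to ``ClusterClient.props`` as usual.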
It's worth noting that messages can always be lost because of the distributed nature
of these actors. As always, additional logic should be implemented in the destination
(acknowledgement) and in the client (retry) actors to ensure at-least-once message delivery.
An Example
----------


@ -44,10 +44,14 @@ the oldest node in the cluster and resolve the singleton's ``ActorRef`` by expli
singleton's ``actorSelection`` the ``akka.actor.Identify`` message and waiting for it to reply.
This is performed periodically if the singleton doesn't reply within a certain (configurable) time.
Given the implementation, there might be periods of time during which the ``ActorRef`` is unavailable,
e.g., when a node leaves the cluster. In these cases, the proxy will stash away all messages until it
is able to identify the singleton. It's worth noting that messages can always be lost because of the
distributed nature of these actors. As always, additional logic should be implemented in the singleton
(acknowledgement) and in the client (retry) actors to ensure at-least-once message delivery.
e.g., when a node leaves the cluster. In these cases, the proxy will buffer the messages sent to the
singleton and then deliver them when the singleton is finally available. If the buffer is full
the ``ClusterSingletonProxy`` will drop old messages when new messages are sent via the proxy.
The size of the buffer is configurable and it can be disabled by using a buffer size of 0.
It's worth noting that messages can always be lost because of the distributed nature of these actors.
As always, additional logic should be implemented in the singleton (acknowledgement) and in the
client (retry) actors to ensure at-least-once message delivery.
Potential problems to be aware of
---------------------------------