!clt #15110 Use buffer instead of stash in cluster client

* drop first in ClusterClient
Patrik Nordwall 2015-06-10 19:58:45 +02:00
parent e2608e7cc2
commit a0e0c394fe
4 changed files with 78 additions and 28 deletions

View file

@@ -92,17 +92,15 @@ akka.cluster.client {
# heartbeat-interval + acceptable-heartbeat-pause, i.e. 15 seconds with
# the default settings.
acceptable-heartbeat-pause = 13s
}
# //#cluster-client-mailbox-config
akka.cluster.client {
mailbox {
mailbox-type = "akka.dispatch.UnboundedDequeBasedMailbox"
stash-capacity = 1000
}
# If connection to the receptionist is not established the client will buffer
# this number of messages and deliver them when the connection is established.
# When the buffer is full, old messages will be dropped when new messages are sent
# via the client. Use 0 to disable buffering, i.e. messages will be dropped
# immediately if the location of the receptionist is unknown.
# Maximum allowed buffer size is 10000.
buffer-size = 1000
}
# //#cluster-client-mailbox-config
akka.cluster.singleton {
# The actor name of the child singleton actor.
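To make the new setting concrete, here is a minimal sketch (not part of this diff) of overriding akka.cluster.client.buffer-size when creating the client's ActorSystem; building the config inline with ConfigFactory.parseString and falling back to the default configuration is just one common way to do it:

import com.typesafe.config.ConfigFactory
import akka.actor.ActorSystem

// Hypothetical override of the new akka.cluster.client.buffer-size setting.
// buffer-size = 0 disables buffering; 10000 is the maximum allowed value.
val clientConfig = ConfigFactory.parseString(
  """
  akka.cluster.client {
    buffer-size = 5000
  }
  """).withFallback(ConfigFactory.load())

val system = ActorSystem("ClientSystem", clientConfig)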

View file

@@ -24,7 +24,6 @@ import akka.actor.Identify
import akka.actor.NoSerializationVerificationNeeded
import akka.actor.Props
import akka.actor.ReceiveTimeout
import akka.actor.Stash
import akka.actor.Terminated
import akka.cluster.Cluster
import akka.cluster.ClusterEvent._
@@ -57,7 +56,8 @@ object ClusterClientSettings {
establishingGetContactsInterval = config.getDuration("establishing-get-contacts-interval", MILLISECONDS).millis,
refreshContactsInterval = config.getDuration("refresh-contacts-interval", MILLISECONDS).millis,
heartbeatInterval = config.getDuration("heartbeat-interval", MILLISECONDS).millis,
acceptableHeartbeatPause = config.getDuration("acceptable-heartbeat-pause", MILLISECONDS).millis)
acceptableHeartbeatPause = config.getDuration("acceptable-heartbeat-pause", MILLISECONDS).millis,
bufferSize = config.getInt("buffer-size"))
}
/**
@@ -87,13 +87,21 @@ object ClusterClientSettings {
* be accepted before considering it to be an anomaly. The ClusterClient is using the
* [[akka.remote.DeadlineFailureDetector]], which will trigger if there are no heartbeats
* within the duration `heartbeatInterval + acceptableHeartbeatPause`.
* @param bufferSize If connection to the receptionist is not established the client
* will buffer this number of messages and deliver them when the connection is established.
* When the buffer is full, old messages will be dropped when new messages are sent via the
* client. Use 0 to disable buffering, i.e. messages will be dropped immediately if the
* location of the receptionist is unavailable.
*/
final class ClusterClientSettings(
val initialContacts: Set[ActorPath],
val establishingGetContactsInterval: FiniteDuration,
val refreshContactsInterval: FiniteDuration,
val heartbeatInterval: FiniteDuration,
val acceptableHeartbeatPause: FiniteDuration) extends NoSerializationVerificationNeeded {
val acceptableHeartbeatPause: FiniteDuration,
val bufferSize: Int) extends NoSerializationVerificationNeeded {
require(bufferSize >= 0 && bufferSize <= 10000, "bufferSize must be >= 0 and <= 10000")
def withInitialContacts(initialContacts: Set[ActorPath]): ClusterClientSettings = {
require(initialContacts.nonEmpty, "initialContacts must be defined")
@@ -109,15 +117,18 @@ final class ClusterClientSettings(
def withHeartbeat(heartbeatInterval: FiniteDuration, acceptableHeartbeatPause: FiniteDuration): ClusterClientSettings =
copy(heartbeatInterval = heartbeatInterval, acceptableHeartbeatPause = acceptableHeartbeatPause)
def withBufferSize(bufferSize: Int): ClusterClientSettings =
copy(bufferSize = bufferSize)
private def copy(
initialContacts: Set[ActorPath] = initialContacts,
establishingGetContactsInterval: FiniteDuration = establishingGetContactsInterval,
refreshContactsInterval: FiniteDuration = refreshContactsInterval,
heartbeatInterval: FiniteDuration = heartbeatInterval,
acceptableHeartbeatPause: FiniteDuration = acceptableHeartbeatPause): ClusterClientSettings =
acceptableHeartbeatPause: FiniteDuration = acceptableHeartbeatPause,
bufferSize: Int = bufferSize): ClusterClientSettings =
new ClusterClientSettings(initialContacts, establishingGetContactsInterval, refreshContactsInterval,
heartbeatInterval, acceptableHeartbeatPause)
heartbeatInterval, acceptableHeartbeatPause, bufferSize)
}
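A minimal sketch of amending the settings programmatically with the new withBufferSize together with the existing withInitialContacts; the ClusterClientSettings(system) factory and the contact-point paths below are assumptions for illustration, not part of this diff:

import akka.actor.{ ActorPath, ActorSystem }
import akka.cluster.client.ClusterClientSettings

val system = ActorSystem("ClientSystem")

// Assumed contact points; replace with the real receptionist paths of the cluster.
val initialContacts: Set[ActorPath] = Set(
  ActorPath.fromString("akka.tcp://OtherSys@host1:2552/system/receptionist"),
  ActorPath.fromString("akka.tcp://OtherSys@host2:2552/system/receptionist"))

// The buffer-size from configuration can be overridden in code; 0 disables buffering.
val settings = ClusterClientSettings(system)
  .withInitialContacts(initialContacts)
  .withBufferSize(5000)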
object ClusterClient {
@@ -126,7 +137,7 @@ object ClusterClient {
* Scala API: Factory method for `ClusterClient` [[akka.actor.Props]].
*/
def props(settings: ClusterClientSettings): Props =
Props(new ClusterClient(settings)).withDeploy(Deploy.local).withMailbox("akka.cluster.client.mailbox")
Props(new ClusterClient(settings)).withDeploy(Deploy.local)
@SerialVersionUID(1L)
final case class Send(path: String, msg: Any, localAffinity: Boolean) {
@@ -181,8 +192,17 @@ object ClusterClient {
*
* Use the factory method [[ClusterClient#props]]) to create the
* [[akka.actor.Props]] for the actor.
*
* If the receptionist is not currently available, the client will buffer the messages
* and then deliver them when the connection to the receptionist has been established.
* The size of the buffer is configurable and buffering can be disabled by using a buffer
* size of 0. When the buffer is full, old messages will be dropped when new messages are
* sent via the client.
*
* Note that this is a best effort implementation: messages can always be lost due to the distributed
* nature of the actors involved.
*/
class ClusterClient(settings: ClusterClientSettings) extends Actor with Stash with ActorLogging {
final class ClusterClient(settings: ClusterClientSettings) extends Actor with ActorLogging {
import ClusterClient._
import ClusterClient.Internal._
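A hedged usage sketch of the buffering behaviour described above, assuming the system and settings values from the previous sketch and a service registered as /user/serviceA on the receptionist side; messages sent before the connection is established end up in the internal buffer:

import akka.cluster.client.ClusterClient

// Client actor: note that props no longer installs the dedicated stash mailbox.
val client = system.actorOf(ClusterClient.props(settings), "client")

// Send/SendToAll/Publish issued before the receptionist connection is up are
// buffered (oldest dropped when the buffer is full) and delivered on connect.
client ! ClusterClient.Send("/user/serviceA", "hello", localAffinity = true)
client ! ClusterClient.SendToAll("/user/serviceB", "hi all")
client ! ClusterClient.Publish("topic", "news")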
@@ -205,6 +225,8 @@ class ClusterClient(settings: ClusterClientSettings) extends Actor with Stash wi
scheduleRefreshContactsTick(establishingGetContactsInterval)
self ! RefreshContactsTick
val buffer = new java.util.LinkedList[(Any, ActorRef)]
def scheduleRefreshContactsTick(interval: FiniteDuration): Unit = {
refreshContactsTask foreach { _.cancel() }
refreshContactsTask = Some(context.system.scheduler.schedule(
@@ -228,14 +250,19 @@ class ClusterClient(settings: ClusterClientSettings) extends Actor with Stash wi
case ActorIdentity(_, Some(receptionist)) ⇒
log.info("Connected to [{}]", receptionist.path)
scheduleRefreshContactsTick(refreshContactsInterval)
unstashAll()
sendBuffered(receptionist)
context.become(active(receptionist))
failureDetector.heartbeat()
case ActorIdentity(_, None) ⇒ // ok, use another instead
case HeartbeatTick ⇒
failureDetector.heartbeat()
case RefreshContactsTick ⇒ sendGetContacts()
case msg ⇒ stash()
case Send(path, msg, localAffinity) ⇒
buffer(DistributedPubSubMediator.Send(path, msg, localAffinity))
case SendToAll(path, msg) ⇒
buffer(DistributedPubSubMediator.SendToAll(path, msg))
case Publish(topic, msg) ⇒
buffer(DistributedPubSubMediator.Publish(topic, msg))
}
def active(receptionist: ActorRef): Actor.Receive = {
@@ -270,6 +297,26 @@ class ClusterClient(settings: ClusterClientSettings) extends Actor with Stash wi
else if (contacts.size == 1) (initialContactsSel ++ contacts) foreach { _ ! GetContacts }
else contacts foreach { _ ! GetContacts }
}
def buffer(msg: Any): Unit =
if (settings.bufferSize == 0)
log.debug("Receptionist not available and buffering is disabled, dropping message [{}]", msg.getClass.getName)
else if (buffer.size == settings.bufferSize) {
val (m, _) = buffer.removeFirst()
log.debug("Receptionist not available, buffer is full, dropping first message [{}]", m.getClass.getName)
buffer.addLast((msg, sender()))
} else {
log.debug("Receptionist not available, buffering message type [{}]", msg.getClass.getName)
buffer.addLast((msg, sender()))
}
def sendBuffered(receptionist: ActorRef): Unit = {
log.debug("Sending buffered messages to receptionist")
while (!buffer.isEmpty) {
val (msg, snd) = buffer.removeFirst()
receptionist.tell(msg, snd)
}
}
}
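The drop-oldest policy above replaces the earlier StashOverflowException failure mode. A self-contained sketch of the same idea (illustrative names only, not Akka API) to make the semantics concrete:

import java.util.ArrayDeque

// Bounded buffer that drops the oldest entry when full, mirroring the
// drop-first behaviour of the buffer method above (capacity 0 means "drop everything").
final class DropOldestBuffer[A](capacity: Int) {
  private val queue = new ArrayDeque[A]()

  def offer(elem: A): Unit =
    if (capacity == 0) () // buffering disabled, drop immediately
    else {
      if (queue.size == capacity) queue.removeFirst() // drop oldest
      queue.addLast(elem)
    }

  def drain(f: A ⇒ Unit): Unit =
    while (!queue.isEmpty) f(queue.removeFirst())
}

// Usage: capacity 2, the third element evicts the first.
val buf = new DropOldestBuffer[String](2)
buf.offer("a"); buf.offer("b"); buf.offer("c")
buf.drain(println) // prints "b" then "c"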
object ClusterClientReceptionist extends ExtensionId[ClusterClientReceptionist] with ExtensionIdProvider {
@@ -286,7 +333,7 @@ object ClusterClientReceptionist extends ExtensionId[ClusterClientReceptionist]
* with settings defined in config section `akka.cluster.client.receptionist`.
* The [[akka.cluster.pubsub.DistributedPubSubMediator]] is started by the [[akka.cluster.pubsub.DistributedPubSub]] extension.
*/
class ClusterClientReceptionist(system: ExtendedActorSystem) extends Extension {
final class ClusterClientReceptionist(system: ExtendedActorSystem) extends Extension {
private val config = system.settings.config.getConfig("akka.cluster.client.receptionist")
private val role: Option[String] = config.getString("role") match {
@@ -479,8 +526,9 @@ object ClusterReceptionist {
* The `sender` of the response messages, as seen by the client, is preserved
* as the original sender, so the client can choose to send subsequent messages
* directly to the actor in the cluster.
*
*/
class ClusterReceptionist(pubSubMediator: ActorRef, settings: ClusterReceptionistSettings)
final class ClusterReceptionist(pubSubMediator: ActorRef, settings: ClusterReceptionistSettings)
extends Actor with ActorLogging {
import DistributedPubSubMediator.{ Send, SendToAll, Publish }

View file

@@ -272,6 +272,10 @@ The parameters of the ``Props`` factory methods in the ``ClusterClient`` compani
have been moved to the settings object ``ClusterClientSettings``. This can be created from
system configuration properties and also amended with the API as needed.
The buffer size of the ``ClusterClient`` can be defined in the ``ClusterClientSettings``
instead of defining the ``stash-capacity`` of the mailbox. Buffering can be disabled by using a
buffer size of 0, as sketched below.
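A hedged before/after sketch of the migration, assuming a system and initialContacts value from the surrounding application and the ClusterClientSettings(system) factory:

// Before: the bounded stash of the dedicated client mailbox, e.g.
//   akka.cluster.client.mailbox.stash-capacity = 1000

// After: the limit lives in ClusterClientSettings (0 disables buffering).
import akka.cluster.client.{ ClusterClient, ClusterClientSettings }

val settings = ClusterClientSettings(system)
  .withInitialContacts(initialContacts)
  .withBufferSize(1000)

val client = system.actorOf(ClusterClient.props(settings), "client")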
Normally, the ``ClusterReceptionist`` actor is started by the ``ClusterReceptionistExtension``.
This extension has been renamed to ``ClusterClientReceptionist``. It is also possible to start
it as an ordinary actor if you need multiple instances of it with different settings.

View file

@@ -54,12 +54,12 @@ directly to the actor in the cluster.
While establishing a connection to a receptionist the ``ClusterClient`` will buffer
messages and send them when the connection is established. If the buffer is full
the ``ClusterClient`` will throw ``akka.actor.StashOverflowException``, which can be
handled by the supervision strategy of the parent actor. The size of the buffer
can be configured by the following ``stash-capacity`` setting of the mailbox that is
used by the ``ClusterClient`` actor.
the ``ClusterClient`` will drop old messages when new messages are sent via the client.
The size of the buffer is configurable and it can be disabled by using a buffer size of 0.
.. includecode:: ../../../akka-cluster-tools/src/main/resources/reference.conf#cluster-client-mailbox-config
It's worth noting that messages can always be lost because of the distributed nature
of these actors. As always, additional logic should be implemented in the destination
(acknowledgement) and in the client (retry) actors to ensure at-least-once message delivery.
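As a purely illustrative sketch of that advice (Deliver, Ack and RetryingSender are hypothetical names, not Akka API): a client-side actor that resends a message via the ClusterClient until the destination replies with an acknowledgement:

import scala.concurrent.duration._
import akka.actor.{ Actor, ActorRef, Cancellable }
import akka.cluster.client.ClusterClient

case class Deliver(path: String, msg: Any)
case object Ack

// Resends a single message via the ClusterClient until the destination replies with Ack.
class RetryingSender(clusterClient: ActorRef) extends Actor {
  import context.dispatcher

  def receive: Receive = idle

  def idle: Receive = {
    case Deliver(path, msg) ⇒
      val send = ClusterClient.Send(path, msg, localAffinity = false)
      clusterClient ! send
      val retry: Cancellable =
        context.system.scheduler.schedule(3.seconds, 3.seconds, clusterClient, send)
      context.become(waitingForAck(retry))
  }

  // Handles one in-flight delivery at a time; the destination must reply with Ack.
  def waitingForAck(retry: Cancellable): Receive = {
    case Ack ⇒
      retry.cancel()
      context.become(idle)
  }
}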
An Example
----------