From a0e0c394fe22ad0b17c5960e5e81452cf476cb91 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Wed, 10 Jun 2015 19:58:45 +0200 Subject: [PATCH] !clt #15110 Use buffer instead of stash in cluster client * drop first in ClusterClient --- .../src/main/resources/reference.conf | 18 +++-- .../akka/cluster/client/ClusterClient.scala | 72 +++++++++++++++---- .../project/migration-guide-2.3.x-2.4.x.rst | 4 ++ akka-docs/rst/scala/cluster-client.rst | 12 ++-- 4 files changed, 78 insertions(+), 28 deletions(-) diff --git a/akka-cluster-tools/src/main/resources/reference.conf b/akka-cluster-tools/src/main/resources/reference.conf index 432940115a..5116677c08 100644 --- a/akka-cluster-tools/src/main/resources/reference.conf +++ b/akka-cluster-tools/src/main/resources/reference.conf @@ -92,18 +92,16 @@ akka.cluster.client { # heartbeat-interval + acceptable-heartbeat-pause, i.e. 15 seconds with # the default settings. acceptable-heartbeat-pause = 13s + + # If connection to the receptionist is not established the client will buffer + # this number of messages and deliver them the connection is established. + # When the buffer is full old messages will be dropped when new messages are sent + # via the client. Use 0 to disable buffering, i.e. messages will be dropped + # immediately if the location of the singleton is unknown. + # Maximum allowed buffer size is 10000. + buffer-size = 1000 } -# //#cluster-client-mailbox-config -akka.cluster.client { - mailbox { - mailbox-type = "akka.dispatch.UnboundedDequeBasedMailbox" - stash-capacity = 1000 - } -} -# //#cluster-client-mailbox-config - - akka.cluster.singleton { # The actor name of the child singleton actor. singleton-name = "singleton" diff --git a/akka-cluster-tools/src/main/scala/akka/cluster/client/ClusterClient.scala b/akka-cluster-tools/src/main/scala/akka/cluster/client/ClusterClient.scala index d0b4b1fd20..0f8bb673d3 100644 --- a/akka-cluster-tools/src/main/scala/akka/cluster/client/ClusterClient.scala +++ b/akka-cluster-tools/src/main/scala/akka/cluster/client/ClusterClient.scala @@ -24,7 +24,6 @@ import akka.actor.Identify import akka.actor.NoSerializationVerificationNeeded import akka.actor.Props import akka.actor.ReceiveTimeout -import akka.actor.Stash import akka.actor.Terminated import akka.cluster.Cluster import akka.cluster.ClusterEvent._ @@ -57,7 +56,8 @@ object ClusterClientSettings { establishingGetContactsInterval = config.getDuration("establishing-get-contacts-interval", MILLISECONDS).millis, refreshContactsInterval = config.getDuration("refresh-contacts-interval", MILLISECONDS).millis, heartbeatInterval = config.getDuration("heartbeat-interval", MILLISECONDS).millis, - acceptableHeartbeatPause = config.getDuration("acceptable-heartbeat-pause", MILLISECONDS).millis) + acceptableHeartbeatPause = config.getDuration("acceptable-heartbeat-pause", MILLISECONDS).millis, + bufferSize = config.getInt("buffer-size")) } /** @@ -87,13 +87,21 @@ object ClusterClientSettings { * be accepted before considering it to be an anomaly. The ClusterClient is using the * [[akka.remote.DeadlineFailureDetector]], which will trigger if there are no heartbeats * within the duration `heartbeatInterval + acceptableHeartbeatPause`. + * @param bufferSize If connection to the receptionist is not established the client + * will buffer this number of messages and deliver them the connection is established. + * When the buffer is full old messages will be dropped when new messages are sent via the + * client. Use 0 to disable buffering, i.e. messages will be dropped immediately if the + * location of the receptionist is unavailable. */ final class ClusterClientSettings( val initialContacts: Set[ActorPath], val establishingGetContactsInterval: FiniteDuration, val refreshContactsInterval: FiniteDuration, val heartbeatInterval: FiniteDuration, - val acceptableHeartbeatPause: FiniteDuration) extends NoSerializationVerificationNeeded { + val acceptableHeartbeatPause: FiniteDuration, + val bufferSize: Int) extends NoSerializationVerificationNeeded { + + require(bufferSize >= 0 && bufferSize <= 10000, "bufferSize must be >= 0 and <= 10000") def withInitialContacts(initialContacts: Set[ActorPath]): ClusterClientSettings = { require(initialContacts.nonEmpty, "initialContacts must be defined") @@ -109,15 +117,18 @@ final class ClusterClientSettings( def withHeartbeat(heartbeatInterval: FiniteDuration, acceptableHeartbeatPause: FiniteDuration): ClusterClientSettings = copy(heartbeatInterval = heartbeatInterval, acceptableHeartbeatPause = acceptableHeartbeatPause) + def withBufferSize(bufferSize: Int): ClusterClientSettings = + copy(bufferSize = bufferSize) + private def copy( initialContacts: Set[ActorPath] = initialContacts, establishingGetContactsInterval: FiniteDuration = establishingGetContactsInterval, refreshContactsInterval: FiniteDuration = refreshContactsInterval, heartbeatInterval: FiniteDuration = heartbeatInterval, - acceptableHeartbeatPause: FiniteDuration = acceptableHeartbeatPause): ClusterClientSettings = + acceptableHeartbeatPause: FiniteDuration = acceptableHeartbeatPause, + bufferSize: Int = bufferSize): ClusterClientSettings = new ClusterClientSettings(initialContacts, establishingGetContactsInterval, refreshContactsInterval, - heartbeatInterval, acceptableHeartbeatPause) - + heartbeatInterval, acceptableHeartbeatPause, bufferSize) } object ClusterClient { @@ -126,7 +137,7 @@ object ClusterClient { * Scala API: Factory method for `ClusterClient` [[akka.actor.Props]]. */ def props(settings: ClusterClientSettings): Props = - Props(new ClusterClient(settings)).withDeploy(Deploy.local).withMailbox("akka.cluster.client.mailbox") + Props(new ClusterClient(settings)).withDeploy(Deploy.local) @SerialVersionUID(1L) final case class Send(path: String, msg: Any, localAffinity: Boolean) { @@ -181,8 +192,17 @@ object ClusterClient { * * Use the factory method [[ClusterClient#props]]) to create the * [[akka.actor.Props]] for the actor. + * + * If the receptionist is not currently available, the client will buffer the messages + * and then deliver them when the connection to the receptionist has been established. + * The size of the buffer is configurable and it can be disabled by using a buffer size + * of 0. When the buffer is full old messages will be dropped when new messages are sent + * via the client. + * + * Note that this is a best effort implementation: messages can always be lost due to the distributed + * nature of the actors involved. */ -class ClusterClient(settings: ClusterClientSettings) extends Actor with Stash with ActorLogging { +final class ClusterClient(settings: ClusterClientSettings) extends Actor with ActorLogging { import ClusterClient._ import ClusterClient.Internal._ @@ -205,6 +225,8 @@ class ClusterClient(settings: ClusterClientSettings) extends Actor with Stash wi scheduleRefreshContactsTick(establishingGetContactsInterval) self ! RefreshContactsTick + val buffer = new java.util.LinkedList[(Any, ActorRef)] + def scheduleRefreshContactsTick(interval: FiniteDuration): Unit = { refreshContactsTask foreach { _.cancel() } refreshContactsTask = Some(context.system.scheduler.schedule( @@ -228,14 +250,19 @@ class ClusterClient(settings: ClusterClientSettings) extends Actor with Stash wi case ActorIdentity(_, Some(receptionist)) ⇒ log.info("Connected to [{}]", receptionist.path) scheduleRefreshContactsTick(refreshContactsInterval) - unstashAll() + sendBuffered(receptionist) context.become(active(receptionist)) failureDetector.heartbeat() case ActorIdentity(_, None) ⇒ // ok, use another instead case HeartbeatTick ⇒ failureDetector.heartbeat() case RefreshContactsTick ⇒ sendGetContacts() - case msg ⇒ stash() + case Send(path, msg, localAffinity) ⇒ + buffer(DistributedPubSubMediator.Send(path, msg, localAffinity)) + case SendToAll(path, msg) ⇒ + buffer(DistributedPubSubMediator.SendToAll(path, msg)) + case Publish(topic, msg) ⇒ + buffer(DistributedPubSubMediator.Publish(topic, msg)) } def active(receptionist: ActorRef): Actor.Receive = { @@ -270,6 +297,26 @@ class ClusterClient(settings: ClusterClientSettings) extends Actor with Stash wi else if (contacts.size == 1) (initialContactsSel ++ contacts) foreach { _ ! GetContacts } else contacts foreach { _ ! GetContacts } } + + def buffer(msg: Any): Unit = + if (settings.bufferSize == 0) + log.debug("Receptionist not available and buffering is disabled, dropping message [{}]", msg.getClass.getName) + else if (buffer.size == settings.bufferSize) { + val (m, _) = buffer.removeFirst() + log.debug("Receptionist not available, buffer is full, dropping first message [{}]", m.getClass.getName) + buffer.addLast((msg, sender())) + } else { + log.debug("Receptionist not available, buffering message type [{}]", msg.getClass.getName) + buffer.addLast((msg, sender())) + } + + def sendBuffered(receptionist: ActorRef): Unit = { + log.debug("Sending buffered messages to receptionist") + while (!buffer.isEmpty) { + val (msg, snd) = buffer.removeFirst() + receptionist.tell(msg, snd) + } + } } object ClusterClientReceptionist extends ExtensionId[ClusterClientReceptionist] with ExtensionIdProvider { @@ -286,7 +333,7 @@ object ClusterClientReceptionist extends ExtensionId[ClusterClientReceptionist] * with settings defined in config section `akka.cluster.client.receptionist`. * The [[akka.cluster.pubsub.DistributedPubSubMediator]] is started by the [[akka.cluster.pubsub.DistributedPubSub]] extension. */ -class ClusterClientReceptionist(system: ExtendedActorSystem) extends Extension { +final class ClusterClientReceptionist(system: ExtendedActorSystem) extends Extension { private val config = system.settings.config.getConfig("akka.cluster.client.receptionist") private val role: Option[String] = config.getString("role") match { @@ -479,8 +526,9 @@ object ClusterReceptionist { * The `sender` of the response messages, as seen by the client, is preserved * as the original sender, so the client can choose to send subsequent messages * directly to the actor in the cluster. + * */ -class ClusterReceptionist(pubSubMediator: ActorRef, settings: ClusterReceptionistSettings) +final class ClusterReceptionist(pubSubMediator: ActorRef, settings: ClusterReceptionistSettings) extends Actor with ActorLogging { import DistributedPubSubMediator.{ Send, SendToAll, Publish } diff --git a/akka-docs/rst/project/migration-guide-2.3.x-2.4.x.rst b/akka-docs/rst/project/migration-guide-2.3.x-2.4.x.rst index 76da7c85b1..04d6d1f3e2 100644 --- a/akka-docs/rst/project/migration-guide-2.3.x-2.4.x.rst +++ b/akka-docs/rst/project/migration-guide-2.3.x-2.4.x.rst @@ -272,6 +272,10 @@ The parameters of the ``Props`` factory methods in the ``ClusterClient`` compani has been moved to settings object ``ClusterClientSettings``. This can be created from system configuration properties and also amended with API as needed. +The buffer size of the ``ClusterClient`` can be defined in the ``ClusterClientSettings`` +instead of defining ``stash-capacity`` of the mailbox. Buffering can be disabled by using a +buffer size of 0. + Normally, the ``ClusterReceptionist`` actor is started by the ``ClusterReceptionistExtension``. This extension has been renamed to ``ClusterClientReceptionist``. It is also possible to start it as an ordinary actor if you need multiple instances of it with different settings. diff --git a/akka-docs/rst/scala/cluster-client.rst b/akka-docs/rst/scala/cluster-client.rst index 89a870d252..e52ad9c5f2 100644 --- a/akka-docs/rst/scala/cluster-client.rst +++ b/akka-docs/rst/scala/cluster-client.rst @@ -54,13 +54,13 @@ directly to the actor in the cluster. While establishing a connection to a receptionist the ``ClusterClient`` will buffer messages and send them when the connection is established. If the buffer is full -the ``ClusterClient`` will throw ``akka.actor.StashOverflowException``, which can be -handled in by the supervision strategy of the parent actor. The size of the buffer -can be configured by the following ``stash-capacity`` setting of the mailbox that is -used by the ``ClusterClient`` actor. - -.. includecode:: ../../../akka-cluster-tools/src/main/resources/reference.conf#cluster-client-mailbox-config +the ``ClusterClient`` will drop old messages when new messages are sent via the client. +The size of the buffer is configurable and it can be disabled by using a buffer size of 0. +It's worth noting that messages can always be lost because of the distributed nature +of these actors. As always, additional logic should be implemented in the destination +(acknowledgement) and in the client (retry) actors to ensure at-least-once message delivery. + An Example ----------