From e2608e7cc21dc2ce8f5a05ff579b115e6674155d Mon Sep 17 00:00:00 2001
From: Patrik Nordwall
Date: Wed, 10 Jun 2015 19:21:45 +0200
Subject: [PATCH 1/2] !clt #15110 Use buffer instead of stash in singleton
 proxy

* drop first in singleton proxy
---
 .gitignore                                    |  1 +
 .../src/main/resources/reference.conf         | 11 ++-
 .../singleton/ClusterSingletonProxy.scala     | 68 +++++++++++++++----
 .../project/migration-guide-2.3.x-2.4.x.rst   |  4 ++
 akka-docs/rst/scala/cluster-singleton.rst     | 12 ++--
 5 files changed, 77 insertions(+), 19 deletions(-)

diff --git a/.gitignore b/.gitignore
index 233c9eb850..2d8c08866b 100755
--- a/.gitignore
+++ b/.gitignore
@@ -27,6 +27,7 @@
 .scalastyle
 .settings
 .cache*
+.tmpBin
 .tags
 .tags_sorted_by_file
 .target
diff --git a/akka-cluster-tools/src/main/resources/reference.conf b/akka-cluster-tools/src/main/resources/reference.conf
index f9cf58b127..432940115a 100644
--- a/akka-cluster-tools/src/main/resources/reference.conf
+++ b/akka-cluster-tools/src/main/resources/reference.conf
@@ -143,5 +143,14 @@ akka.cluster.singleton-proxy {
   role = ""
 
   # Interval at which the proxy will try to resolve the singleton instance.
-  singleton-identification-interval = 1s
+  singleton-identification-interval = 1s
+
+  # If the location of the singleton is unknown the proxy will buffer this
+  # number of messages and deliver them when the singleton is identified.
+  # When the buffer is full, old messages will be dropped when new messages
+  # are sent via the proxy.
+  # Use 0 to disable buffering, i.e. messages will be dropped immediately if
+  # the location of the singleton is unknown.
+  # Maximum allowed buffer size is 10000.
+  buffer-size = 1000
 }
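The new setting can be overridden in a user's application.conf; a minimal
sketch, with an illustrative value::

    akka.cluster.singleton-proxy {
      # keep up to 2000 messages while the singleton location is unknown
      # (0 disables buffering, maximum is 10000)
      buffer-size = 2000
    }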
diff --git a/akka-cluster-tools/src/main/scala/akka/cluster/singleton/ClusterSingletonProxy.scala b/akka-cluster-tools/src/main/scala/akka/cluster/singleton/ClusterSingletonProxy.scala
index 0a5564ea77..15fea1f68c 100644
--- a/akka-cluster-tools/src/main/scala/akka/cluster/singleton/ClusterSingletonProxy.scala
+++ b/akka-cluster-tools/src/main/scala/akka/cluster/singleton/ClusterSingletonProxy.scala
@@ -34,7 +34,8 @@ object ClusterSingletonProxySettings {
   def apply(config: Config): ClusterSingletonProxySettings =
     new ClusterSingletonProxySettings(
       role = roleOption(config.getString("role")),
-      singletonIdentificationInterval = config.getDuration("singleton-identification-interval", MILLISECONDS).millis)
+      singletonIdentificationInterval = config.getDuration("singleton-identification-interval", MILLISECONDS).millis,
+      bufferSize = config.getInt("buffer-size"))
 
   /**
    * Java API: Create settings from the default configuration
@@ -59,10 +60,17 @@ object ClusterSingletonProxySettings {
 /**
  * @param role The role of the cluster nodes where the singleton can be deployed. If None, then any node will do.
  * @param singletonIdentificationInterval Interval at which the proxy will try to resolve the singleton instance.
+ * @param bufferSize If the location of the singleton is unknown the proxy will buffer this number of messages
+ * and deliver them when the singleton is identified. When the buffer is full, old messages will be dropped
+ * when new messages are sent via the proxy. Use 0 to disable buffering, i.e. messages will be dropped
+ * immediately if the location of the singleton is unknown.
 */
 final class ClusterSingletonProxySettings(
   val role: Option[String],
-  val singletonIdentificationInterval: FiniteDuration) extends NoSerializationVerificationNeeded {
+  val singletonIdentificationInterval: FiniteDuration,
+  val bufferSize: Int) extends NoSerializationVerificationNeeded {
+
+  require(bufferSize >= 0 && bufferSize <= 10000, "bufferSize must be >= 0 and <= 10000")
 
   def withRole(role: String): ClusterSingletonProxySettings = copy(role = ClusterSingletonProxySettings.roleOption(role))
 
@@ -71,9 +79,13 @@ final class ClusterSingletonProxySettings(
   def withSingletonIdentificationInterval(singletonIdentificationInterval: FiniteDuration): ClusterSingletonProxySettings =
     copy(singletonIdentificationInterval = singletonIdentificationInterval)
 
+  def withBufferSize(bufferSize: Int): ClusterSingletonProxySettings =
+    copy(bufferSize = bufferSize)
+
   private def copy(role: Option[String] = role,
-                   singletonIdentificationInterval: FiniteDuration = singletonIdentificationInterval): ClusterSingletonProxySettings =
-    new ClusterSingletonProxySettings(role, singletonIdentificationInterval)
+                   singletonIdentificationInterval: FiniteDuration = singletonIdentificationInterval,
+                   bufferSize: Int = bufferSize): ClusterSingletonProxySettings =
+    new ClusterSingletonProxySettings(role, singletonIdentificationInterval, bufferSize)
 }
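The same value can be set programmatically with the new ``withBufferSize``
method; a minimal sketch, assuming the usual ``ClusterSingletonProxySettings(system)``
factory::

    import akka.cluster.singleton.ClusterSingletonProxySettings

    val proxySettings = ClusterSingletonProxySettings(system)
      .withBufferSize(2000) // must be between 0 and 10000, enforced by the require above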
 
 object ClusterSingletonProxy {
@@ -96,11 +108,11 @@ object ClusterSingletonProxy {
  *
  * The proxy can be started on every node where the singleton needs to be reached and used as if it were the singleton
  * itself. It will then act as a router to the currently running singleton instance. If the singleton is not currently
- * available, e.g., during hand off or startup, the proxy will stash the messages sent to the singleton and then unstash
- * them when the singleton is finally available. The proxy mixes in the [[akka.actor.Stash]] trait, so it can be
- * configured accordingly.
+ * available, e.g., during hand off or startup, the proxy will buffer the messages sent to the singleton and then deliver
+ * them when the singleton is finally available. The size of the buffer is configurable, and buffering can be disabled
+ * by using a buffer size of 0. When the buffer is full, old messages will be dropped when new messages are sent via the proxy.
 *
- * The proxy works by keeping track of the oldest cluster member. When a new oldest member is identified, e.g., because
+ * The proxy works by keeping track of the oldest cluster member. When a new oldest member is identified, e.g. because
  * the older one left the cluster, or at startup, the proxy will try to identify the singleton on the oldest member by
  * periodically sending an [[akka.actor.Identify]] message until the singleton responds with its
  * [[akka.actor.ActorIdentity]].
@@ -108,7 +120,7 @@ object ClusterSingletonProxy {
  * Note that this is a best effort implementation: messages can always be lost due to the distributed nature of the
  * actors involved.
 */
-class ClusterSingletonProxy(singletonPathString: String, settings: ClusterSingletonProxySettings) extends Actor with Stash with ActorLogging {
+final class ClusterSingletonProxy(singletonPathString: String, settings: ClusterSingletonProxySettings) extends Actor with ActorLogging {
   import settings._
   val singletonPath = singletonPathString.split("/")
   var identifyCounter = 0
@@ -124,6 +136,8 @@ class ClusterSingletonProxy(singletonPathString: String, settings: ClusterSingle
   }
   var membersByAge: immutable.SortedSet[Member] = immutable.SortedSet.empty(ageOrdering)
 
+  var buffer = new java.util.LinkedList[(Any, ActorRef)]
+
   // subscribe to MemberEvent, re-subscribe when restart
   override def preStart(): Unit = {
     cancelTimer()
@@ -206,11 +220,12 @@ class ClusterSingletonProxy(singletonPathString: String, settings: ClusterSingle
     // singleton identification logic
     case ActorIdentity(identifyId, Some(s)) ⇒
-      // if the new singleton is defined, unstash all messages
+      // if the new singleton is defined, deliver all buffered messages
       log.info("Singleton identified: {}", s.path)
       singleton = Some(s)
+      context.watch(s)
       cancelTimer()
-      unstashAll()
+      sendBuffered()
     case _: ActorIdentity ⇒ // do nothing
     case ClusterSingletonProxy.TryToIdentifySingleton if identifyTimer.isDefined ⇒
       membersByAge.headOption.foreach { oldest ⇒
         val singletonAddress = RootActorPath(oldest.address) / singletonPath
         log.debug("Trying to identify singleton at {}", singletonAddress)
         context.actorSelection(singletonAddress) ! Identify(identifyId)
       }
+    case Terminated(ref) ⇒
+      if (singleton.exists(_ == ref)) {
+        // buffering mode; identification of the new singleton starts when the old node is removed
+        singleton = None
+      }
 
-    // forwarding/stashing logic
+    // forwarding/buffering logic
     case msg: Any ⇒ singleton match {
       case Some(s) ⇒
-        log.debug("Forwarding message to current singleton instance {}", msg)
+        log.debug("Forwarding message type [{}] to current singleton instance", msg.getClass.getName)
         s forward msg
       case None ⇒
-        log.debug("No singleton available, stashing message {}", msg)
-        stash()
+        buffer(msg)
     }
   }
+
+  def buffer(msg: Any): Unit =
+    if (settings.bufferSize == 0)
+      log.debug("Singleton not available and buffering is disabled, dropping message [{}]", msg.getClass.getName)
+    else if (buffer.size == settings.bufferSize) {
+      val (m, _) = buffer.removeFirst()
+      log.debug("Singleton not available, buffer is full, dropping first message [{}]", m.getClass.getName)
+      buffer.addLast((msg, sender()))
+    } else {
+      log.debug("Singleton not available, buffering message type [{}]", msg.getClass.getName)
+      buffer.addLast((msg, sender()))
+    }
+
+  def sendBuffered(): Unit = {
+    log.debug("Sending buffered messages to current singleton instance")
+    val target = singleton.get
+    while (!buffer.isEmpty) {
+      val (msg, snd) = buffer.removeFirst()
+      target.tell(msg, snd)
+    }
+  }
 }
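For context, a proxy configured this way is used in place of the singleton
itself; a minimal sketch, assuming a ``props`` factory that mirrors the
constructor above and reusing ``proxySettings`` from the previous sketch
(the singleton path and actor name are illustrative)::

    val proxy = system.actorOf(
      ClusterSingletonProxy.props("/user/consumerManager/consumer", proxySettings),
      name = "consumerProxy")

    // buffered with drop-first semantics until the singleton is identified
    proxy ! "hello"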
diff --git a/akka-docs/rst/project/migration-guide-2.3.x-2.4.x.rst b/akka-docs/rst/project/migration-guide-2.3.x-2.4.x.rst
index 1d150d9703..76da7c85b1 100644
--- a/akka-docs/rst/project/migration-guide-2.3.x-2.4.x.rst
+++ b/akka-docs/rst/project/migration-guide-2.3.x-2.4.x.rst
@@ -251,6 +251,10 @@ Parameters to the ``Props`` factory methods have been moved to settings object `
 and ``ClusterSingletonProxySettings``. These can be created from system configuration
 properties and also amended with API as needed.
 
+The buffer size of the ``ClusterSingletonProxy`` can be defined in the ``ClusterSingletonProxySettings``
+instead of defining the ``stash-capacity`` of the mailbox. Buffering can be disabled by using a
+buffer size of 0.
+
 DistributedPubSub construction
 ==============================
 
diff --git a/akka-docs/rst/scala/cluster-singleton.rst b/akka-docs/rst/scala/cluster-singleton.rst
index ab027d55b5..ea4ba7a356 100644
--- a/akka-docs/rst/scala/cluster-singleton.rst
+++ b/akka-docs/rst/scala/cluster-singleton.rst
@@ -44,10 +44,14 @@ the oldest node in the cluster and resolve the singleton's ``ActorRef`` by expli
 singleton's ``actorSelection`` the ``akka.actor.Identify`` message and waiting for it to reply.
 This is performed periodically if the singleton doesn't reply within a certain (configurable) time.
 Given the implementation, there might be periods of time during which the ``ActorRef`` is unavailable,
-e.g., when a node leaves the cluster. In these cases, the proxy will stash away all messages until it
-is able to identify the singleton. It's worth noting that messages can always be lost because of the
-distributed nature of these actors. As always, additional logic should be implemented in the singleton
-(acknowledgement) and in the client (retry) actors to ensure at-least-once message delivery.
+e.g., when a node leaves the cluster. In these cases, the proxy will buffer the messages sent to the
+singleton and then deliver them when the singleton is finally available. If the buffer is full,
+the ``ClusterSingletonProxy`` will drop old messages when new messages are sent via the proxy.
+The size of the buffer is configurable, and buffering can be disabled by using a buffer size of 0.
+
+It's worth noting that messages can always be lost because of the distributed nature of these actors.
+As always, additional logic should be implemented in the singleton (acknowledgement) and in the
+client (retry) actors to ensure at-least-once message delivery.
 
 Potential problems to be aware of
 ---------------------------------

From a0e0c394fe22ad0b17c5960e5e81452cf476cb91 Mon Sep 17 00:00:00 2001
From: Patrik Nordwall
Date: Wed, 10 Jun 2015 19:58:45 +0200
Subject: [PATCH 2/2] !clt #15110 Use buffer instead of stash in cluster client

* drop first in ClusterClient
---
 .../src/main/resources/reference.conf       | 18 +++--
 .../akka/cluster/client/ClusterClient.scala | 72 +++++++++++++++----
 .../project/migration-guide-2.3.x-2.4.x.rst |  4 ++
 akka-docs/rst/scala/cluster-client.rst      | 12 ++--
 4 files changed, 78 insertions(+), 28 deletions(-)

diff --git a/akka-cluster-tools/src/main/resources/reference.conf b/akka-cluster-tools/src/main/resources/reference.conf
index 432940115a..5116677c08 100644
--- a/akka-cluster-tools/src/main/resources/reference.conf
+++ b/akka-cluster-tools/src/main/resources/reference.conf
@@ -92,18 +92,16 @@ akka.cluster.client {
   # heartbeat-interval + acceptable-heartbeat-pause, i.e. 15 seconds with
   # the default settings.
   acceptable-heartbeat-pause = 13s
+
+  # If a connection to the receptionist is not established the client will
+  # buffer this number of messages and deliver them when the connection is
+  # established. When the buffer is full, old messages will be dropped when
+  # new messages are sent via the client.
+  # Use 0 to disable buffering, i.e. messages will be dropped immediately if
+  # the connection to the receptionist is not established.
+  # Maximum allowed buffer size is 10000.
+  buffer-size = 1000
 }
 
-# //#cluster-client-mailbox-config
-akka.cluster.client {
-  mailbox {
-    mailbox-type = "akka.dispatch.UnboundedDequeBasedMailbox"
-    stash-capacity = 1000
-  }
-}
-# //#cluster-client-mailbox-config
-
-
 akka.cluster.singleton {
 
   # The actor name of the child singleton actor.
   singleton-name = "singleton"
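As with the singleton proxy, the client-side buffer can be tuned in a user's
application.conf; a minimal sketch, with an illustrative value::

    akka.cluster.client {
      # keep up to 500 messages while no connection to a receptionist exists
      # (0 disables buffering, maximum is 10000)
      buffer-size = 500
    }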
diff --git a/akka-cluster-tools/src/main/scala/akka/cluster/client/ClusterClient.scala b/akka-cluster-tools/src/main/scala/akka/cluster/client/ClusterClient.scala
index d0b4b1fd20..0f8bb673d3 100644
--- a/akka-cluster-tools/src/main/scala/akka/cluster/client/ClusterClient.scala
+++ b/akka-cluster-tools/src/main/scala/akka/cluster/client/ClusterClient.scala
@@ -24,7 +24,6 @@ import akka.actor.Identify
 import akka.actor.NoSerializationVerificationNeeded
 import akka.actor.Props
 import akka.actor.ReceiveTimeout
-import akka.actor.Stash
 import akka.actor.Terminated
 import akka.cluster.Cluster
 import akka.cluster.ClusterEvent._
@@ -57,7 +56,8 @@ object ClusterClientSettings {
       establishingGetContactsInterval = config.getDuration("establishing-get-contacts-interval", MILLISECONDS).millis,
       refreshContactsInterval = config.getDuration("refresh-contacts-interval", MILLISECONDS).millis,
       heartbeatInterval = config.getDuration("heartbeat-interval", MILLISECONDS).millis,
-      acceptableHeartbeatPause = config.getDuration("acceptable-heartbeat-pause", MILLISECONDS).millis)
+      acceptableHeartbeatPause = config.getDuration("acceptable-heartbeat-pause", MILLISECONDS).millis,
+      bufferSize = config.getInt("buffer-size"))
 }
 
 /**
@@ -87,13 +87,21 @@ object ClusterClientSettings {
  * be accepted before considering it to be an anomaly. The ClusterClient is using the
  * [[akka.remote.DeadlineFailureDetector]], which will trigger if there are no heartbeats
  * within the duration `heartbeatInterval + acceptableHeartbeatPause`.
+ * @param bufferSize If a connection to the receptionist is not established the client
+ * will buffer this number of messages and deliver them when the connection is established.
+ * When the buffer is full, old messages will be dropped when new messages are sent via the
+ * client. Use 0 to disable buffering, i.e. messages will be dropped immediately if the
+ * location of the receptionist is unavailable.
 */
 final class ClusterClientSettings(
   val initialContacts: Set[ActorPath],
   val establishingGetContactsInterval: FiniteDuration,
   val refreshContactsInterval: FiniteDuration,
   val heartbeatInterval: FiniteDuration,
-  val acceptableHeartbeatPause: FiniteDuration) extends NoSerializationVerificationNeeded {
+  val acceptableHeartbeatPause: FiniteDuration,
+  val bufferSize: Int) extends NoSerializationVerificationNeeded {
+
+  require(bufferSize >= 0 && bufferSize <= 10000, "bufferSize must be >= 0 and <= 10000")
 
   def withInitialContacts(initialContacts: Set[ActorPath]): ClusterClientSettings = {
     require(initialContacts.nonEmpty, "initialContacts must be defined")
@@ -109,15 +117,18 @@ final class ClusterClientSettings(
   def withHeartbeat(heartbeatInterval: FiniteDuration, acceptableHeartbeatPause: FiniteDuration): ClusterClientSettings =
     copy(heartbeatInterval = heartbeatInterval, acceptableHeartbeatPause = acceptableHeartbeatPause)
 
+  def withBufferSize(bufferSize: Int): ClusterClientSettings =
+    copy(bufferSize = bufferSize)
+
   private def copy(
     initialContacts: Set[ActorPath] = initialContacts,
     establishingGetContactsInterval: FiniteDuration = establishingGetContactsInterval,
     refreshContactsInterval: FiniteDuration = refreshContactsInterval,
     heartbeatInterval: FiniteDuration = heartbeatInterval,
-    acceptableHeartbeatPause: FiniteDuration = acceptableHeartbeatPause): ClusterClientSettings =
+    acceptableHeartbeatPause: FiniteDuration = acceptableHeartbeatPause,
+    bufferSize: Int = bufferSize): ClusterClientSettings =
     new ClusterClientSettings(initialContacts, establishingGetContactsInterval, refreshContactsInterval,
-      heartbeatInterval, acceptableHeartbeatPause)
-
+      heartbeatInterval, acceptableHeartbeatPause, bufferSize)
 }
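Programmatic configuration uses the new ``withBufferSize`` method; a minimal
sketch, assuming the usual ``ClusterClientSettings(system)`` factory (the
contact address is illustrative)::

    import akka.actor.ActorPath
    import akka.cluster.client.ClusterClientSettings

    val clientSettings = ClusterClientSettings(system)
      .withInitialContacts(Set(ActorPath.fromString(
        "akka.tcp://OtherSys@host1:2552/system/receptionist")))
      .withBufferSize(500) // 0 to 10000; 0 drops messages while disconnected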
 
 object ClusterClient {
@@ -126,7 +137,7 @@ object ClusterClient {
   /**
    * Scala API: Factory method for `ClusterClient` [[akka.actor.Props]].
    */
   def props(settings: ClusterClientSettings): Props =
-    Props(new ClusterClient(settings)).withDeploy(Deploy.local).withMailbox("akka.cluster.client.mailbox")
+    Props(new ClusterClient(settings)).withDeploy(Deploy.local)
 
   @SerialVersionUID(1L)
   final case class Send(path: String, msg: Any, localAffinity: Boolean) {
@@ -181,8 +192,17 @@ object ClusterClient {
  *
  * Use the factory method [[ClusterClient#props]]) to create the
  * [[akka.actor.Props]] for the actor.
+ *
+ * If the receptionist is not currently available, the client will buffer the messages
+ * and then deliver them when the connection to the receptionist has been established.
+ * The size of the buffer is configurable, and buffering can be disabled by using a buffer
+ * size of 0. When the buffer is full, old messages will be dropped when new messages are
+ * sent via the client.
+ *
+ * Note that this is a best effort implementation: messages can always be lost due to the distributed
+ * nature of the actors involved.
 */
-class ClusterClient(settings: ClusterClientSettings) extends Actor with Stash with ActorLogging {
+final class ClusterClient(settings: ClusterClientSettings) extends Actor with ActorLogging {
 
   import ClusterClient._
   import ClusterClient.Internal._
@@ -205,6 +225,8 @@ class ClusterClient(settings: ClusterClientSettings) extends Actor with Stash wi
   scheduleRefreshContactsTick(establishingGetContactsInterval)
   self ! RefreshContactsTick
 
+  val buffer = new java.util.LinkedList[(Any, ActorRef)]
+
   def scheduleRefreshContactsTick(interval: FiniteDuration): Unit = {
     refreshContactsTask foreach { _.cancel() }
     refreshContactsTask = Some(context.system.scheduler.schedule(
@@ -228,14 +250,19 @@ class ClusterClient(settings: ClusterClientSettings) extends Actor with Stash wi
     case ActorIdentity(_, Some(receptionist)) ⇒
       log.info("Connected to [{}]", receptionist.path)
       scheduleRefreshContactsTick(refreshContactsInterval)
-      unstashAll()
+      sendBuffered(receptionist)
       context.become(active(receptionist))
       failureDetector.heartbeat()
     case ActorIdentity(_, None) ⇒ // ok, use another instead
     case HeartbeatTick ⇒ failureDetector.heartbeat()
     case RefreshContactsTick ⇒ sendGetContacts()
-    case msg ⇒ stash()
+    case Send(path, msg, localAffinity) ⇒
+      buffer(DistributedPubSubMediator.Send(path, msg, localAffinity))
+    case SendToAll(path, msg) ⇒
+      buffer(DistributedPubSubMediator.SendToAll(path, msg))
+    case Publish(topic, msg) ⇒
+      buffer(DistributedPubSubMediator.Publish(topic, msg))
   }
 
   def active(receptionist: ActorRef): Actor.Receive = {
@@ -270,6 +297,26 @@
     else if (contacts.size == 1) (initialContactsSel ++ contacts) foreach { _ ! GetContacts }
     else contacts foreach { _ ! GetContacts }
   }
+
+  def buffer(msg: Any): Unit =
+    if (settings.bufferSize == 0)
+      log.debug("Receptionist not available and buffering is disabled, dropping message [{}]", msg.getClass.getName)
+    else if (buffer.size == settings.bufferSize) {
+      val (m, _) = buffer.removeFirst()
+      log.debug("Receptionist not available, buffer is full, dropping first message [{}]", m.getClass.getName)
+      buffer.addLast((msg, sender()))
+    } else {
+      log.debug("Receptionist not available, buffering message type [{}]", msg.getClass.getName)
+      buffer.addLast((msg, sender()))
+    }
+
+  def sendBuffered(receptionist: ActorRef): Unit = {
+    log.debug("Sending buffered messages to receptionist")
+    while (!buffer.isEmpty) {
+      val (msg, snd) = buffer.removeFirst()
+      receptionist.tell(msg, snd)
+    }
+  }
 }
 
 object ClusterClientReceptionist extends ExtensionId[ClusterClientReceptionist] with ExtensionIdProvider {
@@ -286,7 +333,7 @@
  * with settings defined in config section `akka.cluster.client.receptionist`.
  * The [[akka.cluster.pubsub.DistributedPubSubMediator]] is started by the [[akka.cluster.pubsub.DistributedPubSub]] extension.
 */
-class ClusterClientReceptionist(system: ExtendedActorSystem) extends Extension {
+final class ClusterClientReceptionist(system: ExtendedActorSystem) extends Extension {
 
   private val config = system.settings.config.getConfig("akka.cluster.client.receptionist")
   private val role: Option[String] = config.getString("role") match {
@@ -479,8 +526,9 @@ object ClusterReceptionist {
  * The `sender` of the response messages, as seen by the client, is preserved
  * as the original sender, so the client can choose to send subsequent messages
  * directly to the actor in the cluster.
+ *
 */
-class ClusterReceptionist(pubSubMediator: ActorRef, settings: ClusterReceptionistSettings)
+final class ClusterReceptionist(pubSubMediator: ActorRef, settings: ClusterReceptionistSettings)
   extends Actor with ActorLogging {
 
   import DistributedPubSubMediator.{ Send, SendToAll, Publish }
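A client built from these settings is then used with the ``Send``, ``SendToAll``
and ``Publish`` envelopes defined above; a minimal sketch, reusing ``clientSettings``
from the previous sketch (path and payload are illustrative)::

    val client = system.actorOf(ClusterClient.props(clientSettings), "client")

    // buffered with drop-first semantics while no receptionist connection exists
    client ! ClusterClient.Send("/user/serviceA", "hello", localAffinity = true)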
diff --git a/akka-docs/rst/project/migration-guide-2.3.x-2.4.x.rst b/akka-docs/rst/project/migration-guide-2.3.x-2.4.x.rst
index 76da7c85b1..04d6d1f3e2 100644
--- a/akka-docs/rst/project/migration-guide-2.3.x-2.4.x.rst
+++ b/akka-docs/rst/project/migration-guide-2.3.x-2.4.x.rst
@@ -272,6 +272,10 @@ The parameters of the ``Props`` factory methods in the ``ClusterClient`` compani
 has been moved to settings object ``ClusterClientSettings``. This can be created from
 system configuration properties and also amended with API as needed.
 
+The buffer size of the ``ClusterClient`` can be defined in the ``ClusterClientSettings``
+instead of defining the ``stash-capacity`` of the mailbox. Buffering can be disabled by using a
+buffer size of 0.
+
 Normally, the ``ClusterReceptionist`` actor is started by the ``ClusterReceptionistExtension``.
 This extension has been renamed to ``ClusterClientReceptionist``. It is also possible to
 start it as an ordinary actor if you need multiple instances of it with different settings.
 
diff --git a/akka-docs/rst/scala/cluster-client.rst b/akka-docs/rst/scala/cluster-client.rst
index 89a870d252..e52ad9c5f2 100644
--- a/akka-docs/rst/scala/cluster-client.rst
+++ b/akka-docs/rst/scala/cluster-client.rst
@@ -54,13 +54,13 @@ directly to the actor in the cluster.
 
 While establishing a connection to a receptionist the ``ClusterClient`` will buffer
 messages and send them when the connection is established. If the buffer is full
-the ``ClusterClient`` will throw ``akka.actor.StashOverflowException``, which can be
-handled in by the supervision strategy of the parent actor. The size of the buffer
-can be configured by the following ``stash-capacity`` setting of the mailbox that is
-used by the ``ClusterClient`` actor.
-
-.. includecode:: ../../../akka-cluster-tools/src/main/resources/reference.conf#cluster-client-mailbox-config
+the ``ClusterClient`` will drop old messages when new messages are sent via the client.
+The size of the buffer is configurable, and buffering can be disabled by using a buffer size of 0.
+
+It's worth noting that messages can always be lost because of the distributed nature
+of these actors. As always, additional logic should be implemented in the destination
+(acknowledgement) and in the client (retry) actors to ensure at-least-once message delivery.
+
 An Example
 ----------
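Both documentation pages above recommend acknowledgement in the destination and
retry in the sender to get at-least-once delivery on top of these best-effort
buffers; a minimal client-side sketch (all names are illustrative)::

    import scala.concurrent.duration._
    import akka.actor.{ Actor, ActorRef }

    class RetryingSender(target: ActorRef, msg: Any) extends Actor {
      import context.dispatcher
      // resend until the destination acknowledges, then stop
      val retryTask = context.system.scheduler.schedule(Duration.Zero, 3.seconds, target, msg)

      def receive = {
        case "ack" ⇒ // the destination is assumed to reply with an ack
          retryTask.cancel()
          context.stop(self)
      }
    }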