diff --git a/.gitignore b/.gitignore index 233c9eb850..2d8c08866b 100755 --- a/.gitignore +++ b/.gitignore @@ -27,6 +27,7 @@ .scalastyle .settings .cache* +.tmpBin .tags .tags_sorted_by_file .target diff --git a/akka-cluster-tools/src/main/resources/reference.conf b/akka-cluster-tools/src/main/resources/reference.conf index f9cf58b127..432940115a 100644 --- a/akka-cluster-tools/src/main/resources/reference.conf +++ b/akka-cluster-tools/src/main/resources/reference.conf @@ -143,5 +143,14 @@ akka.cluster.singleton-proxy { role = "" # Interval at which the proxy will try to resolve the singleton instance. - singleton-identification-interval = 1s + singleton-identification-interval = 1s + + # If the location of the singleton is unknown the proxy will buffer this + # number of messages and deliver them when the singleton is identified. + # When the buffer is full old messages will be dropped when new messages are + # sent via the proxy. + # Use 0 to disable buffering, i.e. messages will be dropped immediately if + # the location of the singleton is unknown. + # Maximum allowed buffer size is 10000. + buffer-size = 1000 } diff --git a/akka-cluster-tools/src/main/scala/akka/cluster/singleton/ClusterSingletonProxy.scala b/akka-cluster-tools/src/main/scala/akka/cluster/singleton/ClusterSingletonProxy.scala index 0a5564ea77..15fea1f68c 100644 --- a/akka-cluster-tools/src/main/scala/akka/cluster/singleton/ClusterSingletonProxy.scala +++ b/akka-cluster-tools/src/main/scala/akka/cluster/singleton/ClusterSingletonProxy.scala @@ -34,7 +34,8 @@ object ClusterSingletonProxySettings { def apply(config: Config): ClusterSingletonProxySettings = new ClusterSingletonProxySettings( role = roleOption(config.getString("role")), - singletonIdentificationInterval = config.getDuration("singleton-identification-interval", MILLISECONDS).millis) + singletonIdentificationInterval = config.getDuration("singleton-identification-interval", MILLISECONDS).millis, + bufferSize = config.getInt("buffer-size")) /** * Java API: Create settings from the default configuration @@ -59,10 +60,17 @@ object ClusterSingletonProxySettings { /** * @param role The role of the cluster nodes where the singleton can be deployed. If None, then any node will do. * @param singletonIdentificationInterval Interval at which the proxy will try to resolve the singleton instance. + * @param bufferSize If the location of the singleton is unknown the proxy will buffer this number of messages + * and deliver them when the singleton is identified. When the buffer is full old messages will be dropped + * when new messages are sent viea the proxy. Use 0 to disable buffering, i.e. messages will be dropped + * immediately if the location of the singleton is unknown. */ final class ClusterSingletonProxySettings( val role: Option[String], - val singletonIdentificationInterval: FiniteDuration) extends NoSerializationVerificationNeeded { + val singletonIdentificationInterval: FiniteDuration, + val bufferSize: Int) extends NoSerializationVerificationNeeded { + + require(bufferSize >= 0 && bufferSize <= 10000, "bufferSize must be >= 0 and <= 10000") def withRole(role: String): ClusterSingletonProxySettings = copy(role = ClusterSingletonProxySettings.roleOption(role)) @@ -71,9 +79,13 @@ final class ClusterSingletonProxySettings( def withSingletonIdentificationInterval(singletonIdentificationInterval: FiniteDuration): ClusterSingletonProxySettings = copy(singletonIdentificationInterval = singletonIdentificationInterval) + def withBufferSize(bufferSize: Int): ClusterSingletonProxySettings = + copy(bufferSize = bufferSize) + private def copy(role: Option[String] = role, - singletonIdentificationInterval: FiniteDuration = singletonIdentificationInterval): ClusterSingletonProxySettings = - new ClusterSingletonProxySettings(role, singletonIdentificationInterval) + singletonIdentificationInterval: FiniteDuration = singletonIdentificationInterval, + bufferSize: Int = bufferSize): ClusterSingletonProxySettings = + new ClusterSingletonProxySettings(role, singletonIdentificationInterval, bufferSize) } object ClusterSingletonProxy { @@ -96,11 +108,11 @@ object ClusterSingletonProxy { * * The proxy can be started on every node where the singleton needs to be reached and used as if it were the singleton * itself. It will then act as a router to the currently running singleton instance. If the singleton is not currently - * available, e.g., during hand off or startup, the proxy will stash the messages sent to the singleton and then unstash - * them when the singleton is finally available. The proxy mixes in the [[akka.actor.Stash]] trait, so it can be - * configured accordingly. + * available, e.g., during hand off or startup, the proxy will buffer the messages sent to the singleton and then deliver + * them when the singleton is finally available. The size of the buffer is configurable and it can be disabled by using + * a buffer size of 0. When the buffer is full old messages will be dropped when new messages are sent via the proxy. * - * The proxy works by keeping track of the oldest cluster member. When a new oldest member is identified, e.g., because + * The proxy works by keeping track of the oldest cluster member. When a new oldest member is identified, e.g. because * the older one left the cluster, or at startup, the proxy will try to identify the singleton on the oldest member by * periodically sending an [[akka.actor.Identify]] message until the singleton responds with its * [[akka.actor.ActorIdentity]]. @@ -108,7 +120,7 @@ object ClusterSingletonProxy { * Note that this is a best effort implementation: messages can always be lost due to the distributed nature of the * actors involved. */ -class ClusterSingletonProxy(singletonPathString: String, settings: ClusterSingletonProxySettings) extends Actor with Stash with ActorLogging { +final class ClusterSingletonProxy(singletonPathString: String, settings: ClusterSingletonProxySettings) extends Actor with ActorLogging { import settings._ val singletonPath = singletonPathString.split("/") var identifyCounter = 0 @@ -124,6 +136,8 @@ class ClusterSingletonProxy(singletonPathString: String, settings: ClusterSingle } var membersByAge: immutable.SortedSet[Member] = immutable.SortedSet.empty(ageOrdering) + var buffer = new java.util.LinkedList[(Any, ActorRef)] + // subscribe to MemberEvent, re-subscribe when restart override def preStart(): Unit = { cancelTimer() @@ -206,11 +220,12 @@ class ClusterSingletonProxy(singletonPathString: String, settings: ClusterSingle // singleton identification logic case ActorIdentity(identifyId, Some(s)) ⇒ - // if the new singleton is defined, unstash all messages + // if the new singleton is defined, deliver all buffered messages log.info("Singleton identified: {}", s.path) singleton = Some(s) + context.watch(s) cancelTimer() - unstashAll() + sendBuffered() case _: ActorIdentity ⇒ // do nothing case ClusterSingletonProxy.TryToIdentifySingleton if identifyTimer.isDefined ⇒ membersByAge.headOption.foreach { @@ -219,16 +234,41 @@ class ClusterSingletonProxy(singletonPathString: String, settings: ClusterSingle log.debug("Trying to identify singleton at {}", singletonAddress) context.actorSelection(singletonAddress) ! Identify(identifyId) } + case Terminated(ref) ⇒ + if (singleton.exists(_ == ref)) { + // buffering mode, identification of new will start when old node is removed + singleton = None + } // forwarding/stashing logic case msg: Any ⇒ singleton match { case Some(s) ⇒ - log.debug("Forwarding message to current singleton instance {}", msg) + log.debug("Forwarding message type [{}] to current singleton instance", msg.getClass.getName) s forward msg case None ⇒ - log.debug("No singleton available, stashing message {}", msg) - stash() + buffer(msg) } } + + def buffer(msg: Any): Unit = + if (settings.bufferSize == 0) + log.debug("Singleton not available and buffering is disabled, dropping message [{}]", msg.getClass.getName) + else if (buffer.size == settings.bufferSize) { + val (m, _) = buffer.removeFirst() + log.debug("Singleton not available, buffer is full, dropping first message [{}]", m.getClass.getName) + buffer.addLast((msg, sender())) + } else { + log.debug("Singleton not available, buffering message type [{}]", msg.getClass.getName) + buffer.addLast((msg, sender())) + } + + def sendBuffered(): Unit = { + log.debug("Sending buffered messages to current singleton instance") + val target = singleton.get + while (!buffer.isEmpty) { + val (msg, snd) = buffer.removeFirst() + target.tell(msg, snd) + } + } } diff --git a/akka-docs/rst/project/migration-guide-2.3.x-2.4.x.rst b/akka-docs/rst/project/migration-guide-2.3.x-2.4.x.rst index 1d150d9703..76da7c85b1 100644 --- a/akka-docs/rst/project/migration-guide-2.3.x-2.4.x.rst +++ b/akka-docs/rst/project/migration-guide-2.3.x-2.4.x.rst @@ -251,6 +251,10 @@ Parameters to the ``Props`` factory methods have been moved to settings object ` and ``ClusterSingletonProxySettings``. These can be created from system configuration properties and also amended with API as needed. +The buffer size of the ``ClusterSingletonProxy`` can be defined in the ``ClusterSingletonProxySettings`` +instead of defining ``stash-capacity`` of the mailbox. Buffering can be disabled by using a +buffer size of 0. + DistributedPubSub construction ============================== diff --git a/akka-docs/rst/scala/cluster-singleton.rst b/akka-docs/rst/scala/cluster-singleton.rst index ab027d55b5..ea4ba7a356 100644 --- a/akka-docs/rst/scala/cluster-singleton.rst +++ b/akka-docs/rst/scala/cluster-singleton.rst @@ -44,10 +44,14 @@ the oldest node in the cluster and resolve the singleton's ``ActorRef`` by expli singleton's ``actorSelection`` the ``akka.actor.Identify`` message and waiting for it to reply. This is performed periodically if the singleton doesn't reply within a certain (configurable) time. Given the implementation, there might be periods of time during which the ``ActorRef`` is unavailable, -e.g., when a node leaves the cluster. In these cases, the proxy will stash away all messages until it -is able to identify the singleton. It's worth noting that messages can always be lost because of the -distributed nature of these actors. As always, additional logic should be implemented in the singleton -(acknowledgement) and in the client (retry) actors to ensure at-least-once message delivery. +e.g., when a node leaves the cluster. In these cases, the proxy will buffer the messages sent to the +singleton and then deliver them when the singleton is finally available. If the buffer is full +the ``ClusterSingletonProxy`` will drop old messages when new messages are sent via the proxy. +The size of the buffer is configurable and it can be disabled by using a buffer size of 0. + +It's worth noting that messages can always be lost because of the distributed nature of these actors. +As always, additional logic should be implemented in the singleton (acknowledgement) and in the +client (retry) actors to ensure at-least-once message delivery. Potential problems to be aware of ---------------------------------