From 11972b449725188aca21a8935e04afb53efd61a5 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Mon, 21 Oct 2013 12:00:38 +0200 Subject: [PATCH] +con #3677 Limit delta in DistributedPubSubMediator --- .../src/main/resources/reference.conf | 5 ++ .../akka/contrib/pattern/ClusterClient.scala | 7 ++- .../pattern/DistributedPubSubMediator.scala | 30 ++++++++---- .../DistributedPubSubMediatorSpec.scala | 46 +++++++++++++++++++ 4 files changed, 79 insertions(+), 9 deletions(-) diff --git a/akka-contrib/src/main/resources/reference.conf b/akka-contrib/src/main/resources/reference.conf index 4c9aa46d83..6a34eeaeeb 100644 --- a/akka-contrib/src/main/resources/reference.conf +++ b/akka-contrib/src/main/resources/reference.conf @@ -24,6 +24,11 @@ akka.contrib.cluster.pub-sub { # Removed entries are pruned after this duration removed-time-to-live = 120s + + # Maximum number of elements to transfer in one message when synchronizing the registries. + # Next chunk will be transferred in next round of gossip. + max-delta-elements = 500 + } # //#pub-sub-ext-config diff --git a/akka-contrib/src/main/scala/akka/contrib/pattern/ClusterClient.scala b/akka-contrib/src/main/scala/akka/contrib/pattern/ClusterClient.scala index 0850b4da87..f4c023aca5 100644 --- a/akka-contrib/src/main/scala/akka/contrib/pattern/ClusterClient.scala +++ b/akka-contrib/src/main/scala/akka/contrib/pattern/ClusterClient.scala @@ -63,7 +63,12 @@ object ClusterClient { } @SerialVersionUID(1L) - case class Send(path: String, msg: Any, localAffinity: Boolean) + case class Send(path: String, msg: Any, localAffinity: Boolean) { + /** + * Convenience constructor with `localAffinity` false + */ + def this(path: String, msg: Any) = this(path, msg, localAffinity = false) + } @SerialVersionUID(1L) case class SendToAll(path: String, msg: Any) @SerialVersionUID(1L) diff --git a/akka-contrib/src/main/scala/akka/contrib/pattern/DistributedPubSubMediator.scala b/akka-contrib/src/main/scala/akka/contrib/pattern/DistributedPubSubMediator.scala index c45863cd9a..d30360dec2 100644 --- a/akka-contrib/src/main/scala/akka/contrib/pattern/DistributedPubSubMediator.scala +++ b/akka-contrib/src/main/scala/akka/contrib/pattern/DistributedPubSubMediator.scala @@ -42,8 +42,9 @@ object DistributedPubSubMediator { role: Option[String], routingLogic: RoutingLogic = RandomRoutingLogic(), gossipInterval: FiniteDuration = 1.second, - removedTimeToLive: FiniteDuration = 2.minutes): Props = - Props(classOf[DistributedPubSubMediator], role, routingLogic, gossipInterval, removedTimeToLive) + removedTimeToLive: FiniteDuration = 2.minutes, + maxDeltaElements: Int = 500): Props = + Props(classOf[DistributedPubSubMediator], role, routingLogic, gossipInterval, removedTimeToLive, maxDeltaElements) /** * Java API: Factory method for `DistributedPubSubMediator` [[akka.actor.Props]]. @@ -52,8 +53,9 @@ object DistributedPubSubMediator { role: String, routingLogic: RoutingLogic, gossipInterval: FiniteDuration, - removedTimeToLive: FiniteDuration): Props = - props(Internal.roleOption(role), routingLogic, gossipInterval, removedTimeToLive) + removedTimeToLive: FiniteDuration, + maxDeltaElements: Int): Props = + props(Internal.roleOption(role), routingLogic, gossipInterval, removedTimeToLive, maxDeltaElements) /** * Java API: Factory method for `DistributedPubSubMediator` [[akka.actor.Props]] @@ -213,7 +215,8 @@ class DistributedPubSubMediator( role: Option[String], routingLogic: RoutingLogic, gossipInterval: FiniteDuration, - removedTimeToLive: FiniteDuration) + removedTimeToLive: FiniteDuration, + maxDeltaElements: Int) extends Actor with ActorLogging { import DistributedPubSubMediator._ @@ -404,13 +407,22 @@ class DistributedPubSubMediator( def collectDelta(otherVersions: Map[Address, Long]): immutable.Iterable[Bucket] = { // missing entries are represented by version 0 val filledOtherVersions = myVersions.map { case (k, _) ⇒ k -> 0L } ++ otherVersions + var count = 0 filledOtherVersions.collect { - case (owner, v) if registry(owner).version > v ⇒ + case (owner, v) if registry(owner).version > v && count < maxDeltaElements ⇒ val bucket = registry(owner) val deltaContent = bucket.content.filter { case (_, value) ⇒ value.version > v } - bucket.copy(content = deltaContent) + count += deltaContent.size + if (count <= maxDeltaElements) + bucket.copy(content = deltaContent) + else { + // exceeded the maxDeltaElements, pick the elements with lowest versions + val sortedContent = deltaContent.toVector.sortBy(_._2.version) + val chunk = sortedContent.take(maxDeltaElements - (count - sortedContent.size)) + bucket.copy(content = chunk.toMap, version = chunk.last._2.version) + } } } @@ -482,11 +494,13 @@ class DistributedPubSubExtension(system: ExtendedActorSystem) extends Extension case "round-robin" ⇒ RoundRobinRoutingLogic() case "consistent-hashing" ⇒ ConsistentHashingRoutingLogic(system) case "broadcast" ⇒ BroadcastRoutingLogic() + case other ⇒ throw new IllegalArgumentException(s"Unknown 'routing-logic': [$other]") } val gossipInterval = Duration(config.getMilliseconds("gossip-interval"), MILLISECONDS) val removedTimeToLive = Duration(config.getMilliseconds("removed-time-to-live"), MILLISECONDS) + val maxDeltaElements = config.getInt("max-delta-elements") val name = config.getString("name") - system.actorOf(DistributedPubSubMediator.props(role, routingLogic, gossipInterval, removedTimeToLive), + system.actorOf(DistributedPubSubMediator.props(role, routingLogic, gossipInterval, removedTimeToLive, maxDeltaElements), name) } } diff --git a/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/DistributedPubSubMediatorSpec.scala b/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/DistributedPubSubMediatorSpec.scala index ea2d07b5ea..8072e27cab 100644 --- a/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/DistributedPubSubMediatorSpec.scala +++ b/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/DistributedPubSubMediatorSpec.scala @@ -18,6 +18,8 @@ import akka.remote.testkit.MultiNodeSpec import akka.remote.testkit.STMultiNodeSpec import akka.testkit._ import akka.actor.ActorLogging +import akka.contrib.pattern.DistributedPubSubMediator.Internal.Status +import akka.contrib.pattern.DistributedPubSubMediator.Internal.Delta object DistributedPubSubMediatorSpec extends MultiNodeConfig { val first = role("first") @@ -29,6 +31,8 @@ object DistributedPubSubMediatorSpec extends MultiNodeConfig { akka.actor.provider = "akka.cluster.ClusterActorRefProvider" akka.remote.log-remote-lifecycle-events = off akka.cluster.auto-down-unreachable-after = 0s + akka.contrib.cluster.pub-sub.max-delta-elements = 500 + #akka.remote.log-frame-size-exceeding = 1024b """)) object TestChatUser { @@ -341,5 +345,47 @@ class DistributedPubSubMediatorSpec extends MultiNodeSpec(DistributedPubSubMedia enterBarrier("after-11") } + + "transfer delta correctly" in { + val firstAddress = node(first).address + val secondAddress = node(second).address + val thirdAddress = node(third).address + + runOn(first) { + mediator ! Status(versions = Map.empty) + val deltaBuckets = expectMsgType[Delta].buckets + deltaBuckets.size must be (3) + deltaBuckets.find(_.owner == firstAddress).get.content.size must be(7) + deltaBuckets.find(_.owner == secondAddress).get.content.size must be(6) + deltaBuckets.find(_.owner == thirdAddress).get.content.size must be(2) + } + enterBarrier("verified-initial-delta") + + // this test is configured with max-delta-elements = 500 + val many = 1010 + runOn(first) { + for (i <- 0 until many) + mediator ! Put(createChatUser("u" + (1000 + i))) + + mediator ! Status(versions = Map.empty) + val deltaBuckets1 = expectMsgType[Delta].buckets + deltaBuckets1.map(_.content.size).sum must be (500) + + mediator ! Status(versions = deltaBuckets1.map(b => b.owner -> b.version).toMap) + val deltaBuckets2 = expectMsgType[Delta].buckets + deltaBuckets1.map(_.content.size).sum must be (500) + + mediator ! Status(versions = deltaBuckets2.map(b => b.owner -> b.version).toMap) + val deltaBuckets3 = expectMsgType[Delta].buckets + + deltaBuckets3.map(_.content.size).sum must be (7 + 6 + 2 + many - 500 - 500) + } + + + enterBarrier("verified-delta-with-many") + awaitCount(13 + many) + + enterBarrier("after-12") + } } }