+con #3677 Limit delta in DistributedPubSubMediator

This commit is contained in:
Patrik Nordwall 2013-10-21 12:00:38 +02:00
parent ff83edea0b
commit 11972b4497
4 changed files with 79 additions and 9 deletions

View file

@ -24,6 +24,11 @@ akka.contrib.cluster.pub-sub {
# Removed entries are pruned after this duration # Removed entries are pruned after this duration
removed-time-to-live = 120s removed-time-to-live = 120s
# Maximum number of elements to transfer in one message when synchronizing the registries.
# Next chunk will be transferred in next round of gossip.
max-delta-elements = 500
} }
# //#pub-sub-ext-config # //#pub-sub-ext-config

View file

@ -63,7 +63,12 @@ object ClusterClient {
} }
@SerialVersionUID(1L) @SerialVersionUID(1L)
case class Send(path: String, msg: Any, localAffinity: Boolean) case class Send(path: String, msg: Any, localAffinity: Boolean) {
/**
* Convenience constructor with `localAffinity` false
*/
def this(path: String, msg: Any) = this(path, msg, localAffinity = false)
}
@SerialVersionUID(1L) @SerialVersionUID(1L)
case class SendToAll(path: String, msg: Any) case class SendToAll(path: String, msg: Any)
@SerialVersionUID(1L) @SerialVersionUID(1L)

View file

@ -42,8 +42,9 @@ object DistributedPubSubMediator {
role: Option[String], role: Option[String],
routingLogic: RoutingLogic = RandomRoutingLogic(), routingLogic: RoutingLogic = RandomRoutingLogic(),
gossipInterval: FiniteDuration = 1.second, gossipInterval: FiniteDuration = 1.second,
removedTimeToLive: FiniteDuration = 2.minutes): Props = removedTimeToLive: FiniteDuration = 2.minutes,
Props(classOf[DistributedPubSubMediator], role, routingLogic, gossipInterval, removedTimeToLive) maxDeltaElements: Int = 500): Props =
Props(classOf[DistributedPubSubMediator], role, routingLogic, gossipInterval, removedTimeToLive, maxDeltaElements)
/** /**
* Java API: Factory method for `DistributedPubSubMediator` [[akka.actor.Props]]. * Java API: Factory method for `DistributedPubSubMediator` [[akka.actor.Props]].
@ -52,8 +53,9 @@ object DistributedPubSubMediator {
role: String, role: String,
routingLogic: RoutingLogic, routingLogic: RoutingLogic,
gossipInterval: FiniteDuration, gossipInterval: FiniteDuration,
removedTimeToLive: FiniteDuration): Props = removedTimeToLive: FiniteDuration,
props(Internal.roleOption(role), routingLogic, gossipInterval, removedTimeToLive) maxDeltaElements: Int): Props =
props(Internal.roleOption(role), routingLogic, gossipInterval, removedTimeToLive, maxDeltaElements)
/** /**
* Java API: Factory method for `DistributedPubSubMediator` [[akka.actor.Props]] * Java API: Factory method for `DistributedPubSubMediator` [[akka.actor.Props]]
@ -213,7 +215,8 @@ class DistributedPubSubMediator(
role: Option[String], role: Option[String],
routingLogic: RoutingLogic, routingLogic: RoutingLogic,
gossipInterval: FiniteDuration, gossipInterval: FiniteDuration,
removedTimeToLive: FiniteDuration) removedTimeToLive: FiniteDuration,
maxDeltaElements: Int)
extends Actor with ActorLogging { extends Actor with ActorLogging {
import DistributedPubSubMediator._ import DistributedPubSubMediator._
@ -404,13 +407,22 @@ class DistributedPubSubMediator(
def collectDelta(otherVersions: Map[Address, Long]): immutable.Iterable[Bucket] = { def collectDelta(otherVersions: Map[Address, Long]): immutable.Iterable[Bucket] = {
// missing entries are represented by version 0 // missing entries are represented by version 0
val filledOtherVersions = myVersions.map { case (k, _) k -> 0L } ++ otherVersions val filledOtherVersions = myVersions.map { case (k, _) k -> 0L } ++ otherVersions
var count = 0
filledOtherVersions.collect { filledOtherVersions.collect {
case (owner, v) if registry(owner).version > v case (owner, v) if registry(owner).version > v && count < maxDeltaElements
val bucket = registry(owner) val bucket = registry(owner)
val deltaContent = bucket.content.filter { val deltaContent = bucket.content.filter {
case (_, value) value.version > v case (_, value) value.version > v
} }
bucket.copy(content = deltaContent) count += deltaContent.size
if (count <= maxDeltaElements)
bucket.copy(content = deltaContent)
else {
// exceeded the maxDeltaElements, pick the elements with lowest versions
val sortedContent = deltaContent.toVector.sortBy(_._2.version)
val chunk = sortedContent.take(maxDeltaElements - (count - sortedContent.size))
bucket.copy(content = chunk.toMap, version = chunk.last._2.version)
}
} }
} }
@ -482,11 +494,13 @@ class DistributedPubSubExtension(system: ExtendedActorSystem) extends Extension
case "round-robin" RoundRobinRoutingLogic() case "round-robin" RoundRobinRoutingLogic()
case "consistent-hashing" ConsistentHashingRoutingLogic(system) case "consistent-hashing" ConsistentHashingRoutingLogic(system)
case "broadcast" BroadcastRoutingLogic() case "broadcast" BroadcastRoutingLogic()
case other throw new IllegalArgumentException(s"Unknown 'routing-logic': [$other]")
} }
val gossipInterval = Duration(config.getMilliseconds("gossip-interval"), MILLISECONDS) val gossipInterval = Duration(config.getMilliseconds("gossip-interval"), MILLISECONDS)
val removedTimeToLive = Duration(config.getMilliseconds("removed-time-to-live"), MILLISECONDS) val removedTimeToLive = Duration(config.getMilliseconds("removed-time-to-live"), MILLISECONDS)
val maxDeltaElements = config.getInt("max-delta-elements")
val name = config.getString("name") val name = config.getString("name")
system.actorOf(DistributedPubSubMediator.props(role, routingLogic, gossipInterval, removedTimeToLive), system.actorOf(DistributedPubSubMediator.props(role, routingLogic, gossipInterval, removedTimeToLive, maxDeltaElements),
name) name)
} }
} }

View file

@ -18,6 +18,8 @@ import akka.remote.testkit.MultiNodeSpec
import akka.remote.testkit.STMultiNodeSpec import akka.remote.testkit.STMultiNodeSpec
import akka.testkit._ import akka.testkit._
import akka.actor.ActorLogging import akka.actor.ActorLogging
import akka.contrib.pattern.DistributedPubSubMediator.Internal.Status
import akka.contrib.pattern.DistributedPubSubMediator.Internal.Delta
object DistributedPubSubMediatorSpec extends MultiNodeConfig { object DistributedPubSubMediatorSpec extends MultiNodeConfig {
val first = role("first") val first = role("first")
@ -29,6 +31,8 @@ object DistributedPubSubMediatorSpec extends MultiNodeConfig {
akka.actor.provider = "akka.cluster.ClusterActorRefProvider" akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
akka.remote.log-remote-lifecycle-events = off akka.remote.log-remote-lifecycle-events = off
akka.cluster.auto-down-unreachable-after = 0s akka.cluster.auto-down-unreachable-after = 0s
akka.contrib.cluster.pub-sub.max-delta-elements = 500
#akka.remote.log-frame-size-exceeding = 1024b
""")) """))
object TestChatUser { object TestChatUser {
@ -341,5 +345,47 @@ class DistributedPubSubMediatorSpec extends MultiNodeSpec(DistributedPubSubMedia
enterBarrier("after-11") enterBarrier("after-11")
} }
"transfer delta correctly" in {
val firstAddress = node(first).address
val secondAddress = node(second).address
val thirdAddress = node(third).address
runOn(first) {
mediator ! Status(versions = Map.empty)
val deltaBuckets = expectMsgType[Delta].buckets
deltaBuckets.size must be (3)
deltaBuckets.find(_.owner == firstAddress).get.content.size must be(7)
deltaBuckets.find(_.owner == secondAddress).get.content.size must be(6)
deltaBuckets.find(_.owner == thirdAddress).get.content.size must be(2)
}
enterBarrier("verified-initial-delta")
// this test is configured with max-delta-elements = 500
val many = 1010
runOn(first) {
for (i <- 0 until many)
mediator ! Put(createChatUser("u" + (1000 + i)))
mediator ! Status(versions = Map.empty)
val deltaBuckets1 = expectMsgType[Delta].buckets
deltaBuckets1.map(_.content.size).sum must be (500)
mediator ! Status(versions = deltaBuckets1.map(b => b.owner -> b.version).toMap)
val deltaBuckets2 = expectMsgType[Delta].buckets
deltaBuckets1.map(_.content.size).sum must be (500)
mediator ! Status(versions = deltaBuckets2.map(b => b.owner -> b.version).toMap)
val deltaBuckets3 = expectMsgType[Delta].buckets
deltaBuckets3.map(_.content.size).sum must be (7 + 6 + 2 + many - 500 - 500)
}
enterBarrier("verified-delta-with-many")
awaitCount(13 + many)
enterBarrier("after-12")
}
} }
} }