Merge pull request #1792 from akka/wip-3677-limit-delta-patriknw
+con #3677 Limit delta in DistributedPubSubMediator
This commit is contained in:
commit
6c82e3f7b2
4 changed files with 79 additions and 9 deletions
|
|
@ -24,6 +24,11 @@ akka.contrib.cluster.pub-sub {
|
|||
|
||||
# Removed entries are pruned after this duration
|
||||
removed-time-to-live = 120s
|
||||
|
||||
# Maximum number of elements to transfer in one message when synchronizing the registries.
|
||||
# Next chunk will be transferred in next round of gossip.
|
||||
max-delta-elements = 500
|
||||
|
||||
}
|
||||
# //#pub-sub-ext-config
|
||||
|
||||
|
|
|
|||
|
|
@ -63,7 +63,12 @@ object ClusterClient {
|
|||
}
|
||||
|
||||
@SerialVersionUID(1L)
|
||||
case class Send(path: String, msg: Any, localAffinity: Boolean)
|
||||
case class Send(path: String, msg: Any, localAffinity: Boolean) {
|
||||
/**
|
||||
* Convenience constructor with `localAffinity` false
|
||||
*/
|
||||
def this(path: String, msg: Any) = this(path, msg, localAffinity = false)
|
||||
}
|
||||
@SerialVersionUID(1L)
|
||||
case class SendToAll(path: String, msg: Any)
|
||||
@SerialVersionUID(1L)
|
||||
|
|
|
|||
|
|
@ -42,8 +42,9 @@ object DistributedPubSubMediator {
|
|||
role: Option[String],
|
||||
routingLogic: RoutingLogic = RandomRoutingLogic(),
|
||||
gossipInterval: FiniteDuration = 1.second,
|
||||
removedTimeToLive: FiniteDuration = 2.minutes): Props =
|
||||
Props(classOf[DistributedPubSubMediator], role, routingLogic, gossipInterval, removedTimeToLive)
|
||||
removedTimeToLive: FiniteDuration = 2.minutes,
|
||||
maxDeltaElements: Int = 500): Props =
|
||||
Props(classOf[DistributedPubSubMediator], role, routingLogic, gossipInterval, removedTimeToLive, maxDeltaElements)
|
||||
|
||||
/**
|
||||
* Java API: Factory method for `DistributedPubSubMediator` [[akka.actor.Props]].
|
||||
|
|
@ -52,8 +53,9 @@ object DistributedPubSubMediator {
|
|||
role: String,
|
||||
routingLogic: RoutingLogic,
|
||||
gossipInterval: FiniteDuration,
|
||||
removedTimeToLive: FiniteDuration): Props =
|
||||
props(Internal.roleOption(role), routingLogic, gossipInterval, removedTimeToLive)
|
||||
removedTimeToLive: FiniteDuration,
|
||||
maxDeltaElements: Int): Props =
|
||||
props(Internal.roleOption(role), routingLogic, gossipInterval, removedTimeToLive, maxDeltaElements)
|
||||
|
||||
/**
|
||||
* Java API: Factory method for `DistributedPubSubMediator` [[akka.actor.Props]]
|
||||
|
|
@ -213,7 +215,8 @@ class DistributedPubSubMediator(
|
|||
role: Option[String],
|
||||
routingLogic: RoutingLogic,
|
||||
gossipInterval: FiniteDuration,
|
||||
removedTimeToLive: FiniteDuration)
|
||||
removedTimeToLive: FiniteDuration,
|
||||
maxDeltaElements: Int)
|
||||
extends Actor with ActorLogging {
|
||||
|
||||
import DistributedPubSubMediator._
|
||||
|
|
@ -404,13 +407,22 @@ class DistributedPubSubMediator(
|
|||
def collectDelta(otherVersions: Map[Address, Long]): immutable.Iterable[Bucket] = {
|
||||
// missing entries are represented by version 0
|
||||
val filledOtherVersions = myVersions.map { case (k, _) ⇒ k -> 0L } ++ otherVersions
|
||||
var count = 0
|
||||
filledOtherVersions.collect {
|
||||
case (owner, v) if registry(owner).version > v ⇒
|
||||
case (owner, v) if registry(owner).version > v && count < maxDeltaElements ⇒
|
||||
val bucket = registry(owner)
|
||||
val deltaContent = bucket.content.filter {
|
||||
case (_, value) ⇒ value.version > v
|
||||
}
|
||||
bucket.copy(content = deltaContent)
|
||||
count += deltaContent.size
|
||||
if (count <= maxDeltaElements)
|
||||
bucket.copy(content = deltaContent)
|
||||
else {
|
||||
// exceeded the maxDeltaElements, pick the elements with lowest versions
|
||||
val sortedContent = deltaContent.toVector.sortBy(_._2.version)
|
||||
val chunk = sortedContent.take(maxDeltaElements - (count - sortedContent.size))
|
||||
bucket.copy(content = chunk.toMap, version = chunk.last._2.version)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -482,11 +494,13 @@ class DistributedPubSubExtension(system: ExtendedActorSystem) extends Extension
|
|||
case "round-robin" ⇒ RoundRobinRoutingLogic()
|
||||
case "consistent-hashing" ⇒ ConsistentHashingRoutingLogic(system)
|
||||
case "broadcast" ⇒ BroadcastRoutingLogic()
|
||||
case other ⇒ throw new IllegalArgumentException(s"Unknown 'routing-logic': [$other]")
|
||||
}
|
||||
val gossipInterval = Duration(config.getMilliseconds("gossip-interval"), MILLISECONDS)
|
||||
val removedTimeToLive = Duration(config.getMilliseconds("removed-time-to-live"), MILLISECONDS)
|
||||
val maxDeltaElements = config.getInt("max-delta-elements")
|
||||
val name = config.getString("name")
|
||||
system.actorOf(DistributedPubSubMediator.props(role, routingLogic, gossipInterval, removedTimeToLive),
|
||||
system.actorOf(DistributedPubSubMediator.props(role, routingLogic, gossipInterval, removedTimeToLive, maxDeltaElements),
|
||||
name)
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -18,6 +18,8 @@ import akka.remote.testkit.MultiNodeSpec
|
|||
import akka.remote.testkit.STMultiNodeSpec
|
||||
import akka.testkit._
|
||||
import akka.actor.ActorLogging
|
||||
import akka.contrib.pattern.DistributedPubSubMediator.Internal.Status
|
||||
import akka.contrib.pattern.DistributedPubSubMediator.Internal.Delta
|
||||
|
||||
object DistributedPubSubMediatorSpec extends MultiNodeConfig {
|
||||
val first = role("first")
|
||||
|
|
@ -29,6 +31,8 @@ object DistributedPubSubMediatorSpec extends MultiNodeConfig {
|
|||
akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
|
||||
akka.remote.log-remote-lifecycle-events = off
|
||||
akka.cluster.auto-down-unreachable-after = 0s
|
||||
akka.contrib.cluster.pub-sub.max-delta-elements = 500
|
||||
#akka.remote.log-frame-size-exceeding = 1024b
|
||||
"""))
|
||||
|
||||
object TestChatUser {
|
||||
|
|
@ -341,5 +345,47 @@ class DistributedPubSubMediatorSpec extends MultiNodeSpec(DistributedPubSubMedia
|
|||
|
||||
enterBarrier("after-11")
|
||||
}
|
||||
|
||||
"transfer delta correctly" in {
|
||||
val firstAddress = node(first).address
|
||||
val secondAddress = node(second).address
|
||||
val thirdAddress = node(third).address
|
||||
|
||||
runOn(first) {
|
||||
mediator ! Status(versions = Map.empty)
|
||||
val deltaBuckets = expectMsgType[Delta].buckets
|
||||
deltaBuckets.size must be (3)
|
||||
deltaBuckets.find(_.owner == firstAddress).get.content.size must be(7)
|
||||
deltaBuckets.find(_.owner == secondAddress).get.content.size must be(6)
|
||||
deltaBuckets.find(_.owner == thirdAddress).get.content.size must be(2)
|
||||
}
|
||||
enterBarrier("verified-initial-delta")
|
||||
|
||||
// this test is configured with max-delta-elements = 500
|
||||
val many = 1010
|
||||
runOn(first) {
|
||||
for (i <- 0 until many)
|
||||
mediator ! Put(createChatUser("u" + (1000 + i)))
|
||||
|
||||
mediator ! Status(versions = Map.empty)
|
||||
val deltaBuckets1 = expectMsgType[Delta].buckets
|
||||
deltaBuckets1.map(_.content.size).sum must be (500)
|
||||
|
||||
mediator ! Status(versions = deltaBuckets1.map(b => b.owner -> b.version).toMap)
|
||||
val deltaBuckets2 = expectMsgType[Delta].buckets
|
||||
deltaBuckets1.map(_.content.size).sum must be (500)
|
||||
|
||||
mediator ! Status(versions = deltaBuckets2.map(b => b.owner -> b.version).toMap)
|
||||
val deltaBuckets3 = expectMsgType[Delta].buckets
|
||||
|
||||
deltaBuckets3.map(_.content.size).sum must be (7 + 6 + 2 + many - 500 - 500)
|
||||
}
|
||||
|
||||
|
||||
enterBarrier("verified-delta-with-many")
|
||||
awaitCount(13 + many)
|
||||
|
||||
enterBarrier("after-12")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue