Merge pull request #1995 from akka/wip-3865-remove-race-pubsub-patriknw
=con #3865 Fix race in pub-sub when nodes are removed
This commit is contained in:
commit
094b2a50d9
2 changed files with 27 additions and 5 deletions
|
|
@ -337,11 +337,15 @@ class DistributedPubSubMediator(
|
|||
case Delta(buckets) ⇒
|
||||
// reply from Status message in the gossip chat
|
||||
// the Delta contains potential updates (newer versions) from the other node
|
||||
if (nodes.contains(sender().path.address)) {
|
||||
// only accept deltas/buckets from known nodes, otherwise there is a risk of
|
||||
// adding back entries when nodes are removed
|
||||
if (nodes(sender().path.address)) {
|
||||
buckets foreach { b ⇒
|
||||
val myBucket = registry(b.owner)
|
||||
if (b.version > myBucket.version) {
|
||||
registry += (b.owner -> myBucket.copy(version = b.version, content = myBucket.content ++ b.content))
|
||||
if (nodes(b.owner)) {
|
||||
val myBucket = registry(b.owner)
|
||||
if (b.version > myBucket.version) {
|
||||
registry += (b.owner -> myBucket.copy(version = b.version, content = myBucket.content ++ b.content))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -32,7 +32,6 @@ object DistributedPubSubMediatorSpec extends MultiNodeConfig {
|
|||
akka.remote.log-remote-lifecycle-events = off
|
||||
akka.cluster.auto-down-unreachable-after = 0s
|
||||
akka.contrib.cluster.pub-sub.max-delta-elements = 500
|
||||
#akka.remote.log-frame-size-exceeding = 1024b
|
||||
"""))
|
||||
|
||||
object TestChatUser {
|
||||
|
|
@ -388,5 +387,24 @@ class DistributedPubSubMediatorSpec extends MultiNodeSpec(DistributedPubSubMedia
|
|||
|
||||
enterBarrier("after-12")
|
||||
}
|
||||
|
||||
"remove entries when node is removed" in within(30 seconds) {
|
||||
mediator ! Count
|
||||
val countBefore = expectMsgType[Int]
|
||||
|
||||
runOn(first) {
|
||||
testConductor.exit(third, 0).await
|
||||
}
|
||||
|
||||
enterBarrier("third-shutdown")
|
||||
|
||||
// third had 2 entries u5 and u11, and those should be removed everywhere
|
||||
runOn(first, second) {
|
||||
awaitCount(countBefore - 2)
|
||||
}
|
||||
|
||||
enterBarrier("after-13")
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue