diff --git a/akka-contrib/src/main/scala/akka/contrib/pattern/DistributedPubSubMediator.scala b/akka-contrib/src/main/scala/akka/contrib/pattern/DistributedPubSubMediator.scala index 3df58d1c4c..6a710c6293 100644 --- a/akka-contrib/src/main/scala/akka/contrib/pattern/DistributedPubSubMediator.scala +++ b/akka-contrib/src/main/scala/akka/contrib/pattern/DistributedPubSubMediator.scala @@ -337,11 +337,15 @@ class DistributedPubSubMediator( case Delta(buckets) ⇒ // reply from Status message in the gossip chat // the Delta contains potential updates (newer versions) from the other node - if (nodes.contains(sender().path.address)) { + // only accept deltas/buckets from known nodes, otherwise there is a risk of + // adding back entries when nodes are removed + if (nodes(sender().path.address)) { buckets foreach { b ⇒ - val myBucket = registry(b.owner) - if (b.version > myBucket.version) { - registry += (b.owner -> myBucket.copy(version = b.version, content = myBucket.content ++ b.content)) + if (nodes(b.owner)) { + val myBucket = registry(b.owner) + if (b.version > myBucket.version) { + registry += (b.owner -> myBucket.copy(version = b.version, content = myBucket.content ++ b.content)) + } } } } diff --git a/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/DistributedPubSubMediatorSpec.scala b/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/DistributedPubSubMediatorSpec.scala index a88fa98527..91e6eeda35 100644 --- a/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/DistributedPubSubMediatorSpec.scala +++ b/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/DistributedPubSubMediatorSpec.scala @@ -32,7 +32,6 @@ object DistributedPubSubMediatorSpec extends MultiNodeConfig { akka.remote.log-remote-lifecycle-events = off akka.cluster.auto-down-unreachable-after = 0s akka.contrib.cluster.pub-sub.max-delta-elements = 500 - #akka.remote.log-frame-size-exceeding = 1024b """)) object TestChatUser { @@ -388,5 +387,24 @@ class DistributedPubSubMediatorSpec extends MultiNodeSpec(DistributedPubSubMedia enterBarrier("after-12") } + + "remove entries when node is removed" in within(30 seconds) { + mediator ! Count + val countBefore = expectMsgType[Int] + + runOn(first) { + testConductor.exit(third, 0).await + } + + enterBarrier("third-shutdown") + + // third had 2 entries u5 and u11, and those should be removed everywhere + runOn(first, second) { + awaitCount(countBefore - 2) + } + + enterBarrier("after-13") + } } + }