From 4b33cf98dfee621d28f6f9a1d1372ff5926d728f Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Fri, 7 Feb 2014 09:23:23 +0100 Subject: [PATCH] =con #3865 Fix race in pub-sub when nodes are removed * The race can happen if the MemberRemoved event is received, followed by a Delta update from a node that has not yet received MemberRemoved. That causes the bucket for the removed node to be added back to the registry. --- .../pattern/DistributedPubSubMediator.scala | 12 +++++++---- .../DistributedPubSubMediatorSpec.scala | 20 ++++++++++++++++++- 2 files changed, 27 insertions(+), 5 deletions(-) diff --git a/akka-contrib/src/main/scala/akka/contrib/pattern/DistributedPubSubMediator.scala b/akka-contrib/src/main/scala/akka/contrib/pattern/DistributedPubSubMediator.scala index 3df58d1c4c..6a710c6293 100644 --- a/akka-contrib/src/main/scala/akka/contrib/pattern/DistributedPubSubMediator.scala +++ b/akka-contrib/src/main/scala/akka/contrib/pattern/DistributedPubSubMediator.scala @@ -337,11 +337,15 @@ class DistributedPubSubMediator( case Delta(buckets) ⇒ // reply from Status message in the gossip chat // the Delta contains potential updates (newer versions) from the other node - if (nodes.contains(sender().path.address)) { + // only accept deltas/buckets from known nodes, otherwise there is a risk of + // adding back entries when nodes are removed + if (nodes(sender().path.address)) { buckets foreach { b ⇒ - val myBucket = registry(b.owner) - if (b.version > myBucket.version) { - registry += (b.owner -> myBucket.copy(version = b.version, content = myBucket.content ++ b.content)) + if (nodes(b.owner)) { + val myBucket = registry(b.owner) + if (b.version > myBucket.version) { + registry += (b.owner -> myBucket.copy(version = b.version, content = myBucket.content ++ b.content)) + } } } } diff --git a/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/DistributedPubSubMediatorSpec.scala 
b/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/DistributedPubSubMediatorSpec.scala index a88fa98527..91e6eeda35 100644 --- a/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/DistributedPubSubMediatorSpec.scala +++ b/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/DistributedPubSubMediatorSpec.scala @@ -32,7 +32,6 @@ object DistributedPubSubMediatorSpec extends MultiNodeConfig { akka.remote.log-remote-lifecycle-events = off akka.cluster.auto-down-unreachable-after = 0s akka.contrib.cluster.pub-sub.max-delta-elements = 500 - #akka.remote.log-frame-size-exceeding = 1024b """)) object TestChatUser { @@ -388,5 +387,24 @@ class DistributedPubSubMediatorSpec extends MultiNodeSpec(DistributedPubSubMedia enterBarrier("after-12") } + + "remove entries when node is removed" in within(30 seconds) { + mediator ! Count + val countBefore = expectMsgType[Int] + + runOn(first) { + testConductor.exit(third, 0).await + } + + enterBarrier("third-shutdown") + + // third had 2 entries u5 and u11, and those should be removed everywhere + runOn(first, second) { + awaitCount(countBefore - 2) + } + + enterBarrier("after-13") + } } + }