From ddb03c910db4dc80ff35cf385acd1111de172f33 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Thu, 25 Oct 2012 08:13:41 +0200 Subject: [PATCH] Wrong message ordering in ClusterDomainEventPublisherSpec, see #2648 * Problem: testActor subscriber sometimes didn't receive expected published message * Reason: the testActor subscribe/unsubscribe in beforeEach/afterEach via different instances of the publisher, therefore the ordering of those messages were not guaranteed, and unsubscribe happend after subscribe * Solution: subscribe directly to the eventStream once, and use separate TestProbe to verify the subscribe/unsubscribe --- .../ClusterDomainEventPublisherSpec.scala | 27 ++++++++++++++----- 1 file changed, 21 insertions(+), 6 deletions(-) diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventPublisherSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventPublisherSpec.scala index 0907a3abb8..188c91505c 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventPublisherSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventPublisherSpec.scala @@ -16,6 +16,7 @@ import akka.cluster.ClusterEvent._ import akka.testkit.AkkaSpec import akka.testkit.ImplicitSender import akka.actor.ActorRef +import akka.testkit.TestProbe object ClusterDomainEventPublisherSpec { val config = """ @@ -47,14 +48,15 @@ class ClusterDomainEventPublisherSpec extends AkkaSpec(ClusterDomainEventPublish val g4 = Gossip(members = SortedSet(d1, a1, b1, c2)).seen(a1.address) val g5 = Gossip(members = SortedSet(d1, a1, b1, c2)).seen(a1.address).seen(b1.address).seen(c2.address).seen(d1.address) + override def atStartup(): Unit = { + system.eventStream.subscribe(testActor, classOf[ClusterDomainEvent]) + } + override def beforeEach(): Unit = { publisher = system.actorOf(Props[ClusterDomainEventPublisher]) - publisher ! Subscribe(testActor, classOf[ClusterDomainEvent]) - expectMsgType[CurrentClusterState] } override def afterEach(): Unit = { - publisher ! Unsubscribe(testActor, None) system.stop(publisher) } @@ -116,10 +118,23 @@ class ClusterDomainEventPublisherSpec extends AkkaSpec(ClusterDomainEventPublish expectMsgType[SeenChanged] } + "send CurrentClusterState when subscribe" in { + val subscriber = TestProbe() + publisher ! Subscribe(subscriber.ref, classOf[ClusterDomainEvent]) + subscriber.expectMsgType[CurrentClusterState] + // but only to the new subscriber + expectNoMsg(1 second) + } + "support unsubscribe" in { - publisher ! Unsubscribe(testActor, Some(classOf[ClusterDomainEvent])) - publisher ! PublishChanges(g1, g2) - expectNoMsg + val subscriber = TestProbe() + publisher ! Subscribe(subscriber.ref, classOf[ClusterDomainEvent]) + subscriber.expectMsgType[CurrentClusterState] + publisher ! Unsubscribe(subscriber.ref, Some(classOf[ClusterDomainEvent])) + publisher ! PublishChanges(Gossip(members = SortedSet(a1)), Gossip(members = SortedSet(a1, b1))) + subscriber.expectNoMsg(1 second) + // but testActor is still subscriber + expectMsg(MemberUp(b1)) } }