Wrong message ordering in ClusterDomainEventPublisherSpec, see #2648
* Problem: testActor subscriber sometimes didn't receive expected published message * Reason: the testActor subscribe/unsubscribe in beforeEach/afterEach via different instances of the publisher, therefore the ordering of those messages were not guaranteed, and unsubscribe happend after subscribe * Solution: subscribe directly to the eventStream once, and use separate TestProbe to verify the subscribe/unsubscribe
This commit is contained in:
parent
c1a96493b2
commit
ddb03c910d
1 changed files with 21 additions and 6 deletions
|
|
@ -16,6 +16,7 @@ import akka.cluster.ClusterEvent._
|
||||||
import akka.testkit.AkkaSpec
|
import akka.testkit.AkkaSpec
|
||||||
import akka.testkit.ImplicitSender
|
import akka.testkit.ImplicitSender
|
||||||
import akka.actor.ActorRef
|
import akka.actor.ActorRef
|
||||||
|
import akka.testkit.TestProbe
|
||||||
|
|
||||||
object ClusterDomainEventPublisherSpec {
|
object ClusterDomainEventPublisherSpec {
|
||||||
val config = """
|
val config = """
|
||||||
|
|
@ -47,14 +48,15 @@ class ClusterDomainEventPublisherSpec extends AkkaSpec(ClusterDomainEventPublish
|
||||||
val g4 = Gossip(members = SortedSet(d1, a1, b1, c2)).seen(a1.address)
|
val g4 = Gossip(members = SortedSet(d1, a1, b1, c2)).seen(a1.address)
|
||||||
val g5 = Gossip(members = SortedSet(d1, a1, b1, c2)).seen(a1.address).seen(b1.address).seen(c2.address).seen(d1.address)
|
val g5 = Gossip(members = SortedSet(d1, a1, b1, c2)).seen(a1.address).seen(b1.address).seen(c2.address).seen(d1.address)
|
||||||
|
|
||||||
|
override def atStartup(): Unit = {
|
||||||
|
system.eventStream.subscribe(testActor, classOf[ClusterDomainEvent])
|
||||||
|
}
|
||||||
|
|
||||||
override def beforeEach(): Unit = {
|
override def beforeEach(): Unit = {
|
||||||
publisher = system.actorOf(Props[ClusterDomainEventPublisher])
|
publisher = system.actorOf(Props[ClusterDomainEventPublisher])
|
||||||
publisher ! Subscribe(testActor, classOf[ClusterDomainEvent])
|
|
||||||
expectMsgType[CurrentClusterState]
|
|
||||||
}
|
}
|
||||||
|
|
||||||
override def afterEach(): Unit = {
|
override def afterEach(): Unit = {
|
||||||
publisher ! Unsubscribe(testActor, None)
|
|
||||||
system.stop(publisher)
|
system.stop(publisher)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -116,10 +118,23 @@ class ClusterDomainEventPublisherSpec extends AkkaSpec(ClusterDomainEventPublish
|
||||||
expectMsgType[SeenChanged]
|
expectMsgType[SeenChanged]
|
||||||
}
|
}
|
||||||
|
|
||||||
|
"send CurrentClusterState when subscribe" in {
|
||||||
|
val subscriber = TestProbe()
|
||||||
|
publisher ! Subscribe(subscriber.ref, classOf[ClusterDomainEvent])
|
||||||
|
subscriber.expectMsgType[CurrentClusterState]
|
||||||
|
// but only to the new subscriber
|
||||||
|
expectNoMsg(1 second)
|
||||||
|
}
|
||||||
|
|
||||||
"support unsubscribe" in {
|
"support unsubscribe" in {
|
||||||
publisher ! Unsubscribe(testActor, Some(classOf[ClusterDomainEvent]))
|
val subscriber = TestProbe()
|
||||||
publisher ! PublishChanges(g1, g2)
|
publisher ! Subscribe(subscriber.ref, classOf[ClusterDomainEvent])
|
||||||
expectNoMsg
|
subscriber.expectMsgType[CurrentClusterState]
|
||||||
|
publisher ! Unsubscribe(subscriber.ref, Some(classOf[ClusterDomainEvent]))
|
||||||
|
publisher ! PublishChanges(Gossip(members = SortedSet(a1)), Gossip(members = SortedSet(a1, b1)))
|
||||||
|
subscriber.expectNoMsg(1 second)
|
||||||
|
// but testActor is still subscriber
|
||||||
|
expectMsg(MemberUp(b1))
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue