From e5cd47279d24af1685feb7c16787da6cd87e7bd9 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Mon, 25 Aug 2014 10:26:28 +0200 Subject: [PATCH] =con Harden ClusterShardingSpec some more * Replace sleep with awaitAssert * Use separate probes for awaitAssert checks to avoid spill-over to the testActor * Some additional cleanup * Deliver buffered messages when HostShard is received Test failures showed that initial messages could be re-ordered otherwise --- .../main/scala/akka/dispatch/Mailbox.scala | 2 +- .../contrib/pattern/ClusterSharding.scala | 19 +++--- .../contrib/pattern/ClusterShardingSpec.scala | 60 ++++++++----------- 3 files changed, 37 insertions(+), 44 deletions(-) diff --git a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala index ff4e0fffff..d81a01012a 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala @@ -372,7 +372,7 @@ class NodeMessageQueue extends AbstractNodeQueue[Envelope] with MessageQueue wit //Discards overflowing messages into DeadLetters class BoundedNodeMessageQueue(capacity: Int) extends AbstractBoundedNodeQueue[Envelope](capacity) -with MessageQueue with BoundedMessageQueueSemantics with MultipleConsumerSemantics { + with MessageQueue with BoundedMessageQueueSemantics with MultipleConsumerSemantics { final def pushTimeOut: Duration = Duration.Undefined final def enqueue(receiver: ActorRef, handle: Envelope): Unit = diff --git a/akka-contrib/src/main/scala/akka/contrib/pattern/ClusterSharding.scala b/akka-contrib/src/main/scala/akka/contrib/pattern/ClusterSharding.scala index 938dfdb647..04a9590487 100644 --- a/akka-contrib/src/main/scala/akka/contrib/pattern/ClusterSharding.scala +++ b/akka-contrib/src/main/scala/akka/contrib/pattern/ClusterSharding.scala @@ -731,6 +731,7 @@ class ShardRegion( //Start the shard, if already started this does nothing getShard(shard) + deliverBufferedMessages(shard) sender() ! ShardStarted(shard) @@ -748,14 +749,7 @@ class ShardRegion( if (ref != self) context.watch(ref) - shardBuffers.get(shard) match { - case Some(buf) ⇒ - buf.foreach { - case (msg, snd) ⇒ deliverMessage(msg, snd) - } - shardBuffers -= shard - case None ⇒ - } + deliverBufferedMessages(shard) case RegisterAck(coord) ⇒ context.watch(coord) @@ -845,6 +839,15 @@ class ShardRegion( } } + def deliverBufferedMessages(shard: String): Unit = { + shardBuffers.get(shard) match { + case Some(buf) ⇒ + buf.foreach { case (msg, snd) ⇒ deliverMessage(msg, snd) } + shardBuffers -= shard + case None ⇒ + } + } + def deliverMessage(msg: Any, snd: ActorRef): Unit = { val shard = shardResolver(msg) regionByShard.get(shard) match { diff --git a/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterShardingSpec.scala b/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterShardingSpec.scala index 83052c6782..6d73e1037e 100644 --- a/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterShardingSpec.scala +++ b/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterShardingSpec.scala @@ -445,7 +445,7 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult if (probe.lastSender.path == region.path / (n % 12).toString / n.toString) count += 1 } - count should be(2) + count should be >= (2) } } } @@ -563,9 +563,10 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult region ! HandOff("1") expectMsg(10 seconds, "ShardStopped not received", ShardStopped("1")) + val probe = TestProbe() awaitAssert({ - shard ! Identify(1) - expectMsg(1 second, "Shard was still around", ActorIdentity(1, None)) + shard.tell(Identify(1), probe.ref) + probe.expectMsg(1 second, "Shard was still around", ActorIdentity(1, None)) }, 5 seconds, 500 millis) //Get the path to where the shard now resides @@ -576,10 +577,7 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult // not sent a message to it via the ShardRegion val counter1 = system.actorSelection(lastSender.path.parent / "1") counter1 ! Identify(2) - receiveOne(1 second) match { - case ActorIdentity(2, location) ⇒ - location should not be (None) - } + expectMsgType[ActorIdentity](3 seconds).ref should not be (None) counter1 ! Get(1) expectMsg(1) @@ -598,10 +596,7 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult //Check that no counter "1" exists in this shard val secondCounter1 = system.actorSelection(lastSender.path.parent / "1") secondCounter1 ! Identify(3) - receiveOne(1 second) match { - case ActorIdentity(3, location) ⇒ - location should be(None) - } + expectMsg(3 seconds, ActorIdentity(3, None)) } enterBarrier("after-11") @@ -639,18 +634,21 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult //Watch for the terminated message expectTerminated(counter1, 5 seconds) + val probe1 = TestProbe() awaitAssert({ //Check counter 1 is dead - counter1 ! Identify(1) - expectMsg(1 second, "Entry 1 was still around", ActorIdentity(1, None)) + counter1.tell(Identify(1), probe1.ref) + probe1.expectMsg(1 second, "Entry 1 was still around", ActorIdentity(1, None)) }, 5 second, 500 millis) //Stop the shard cleanly region ! HandOff("1") expectMsg(10 seconds, "ShardStopped not received", ShardStopped("1")) + + val probe2 = TestProbe() awaitAssert({ - shard ! Identify(2) - expectMsg(1 second, "Shard was still around", ActorIdentity(2, None)) + shard.tell(Identify(2), probe2.ref) + probe2.expectMsg(1 second, "Shard was still around", ActorIdentity(2, None)) }, 5 seconds, 500 millis) } @@ -665,14 +663,11 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult //Check counter 1 is still dead system.actorSelection(shard / "1") ! Identify(3) - receiveOne(1 second) should be(ActorIdentity(3, None)) + expectMsg(ActorIdentity(3, None)) //Check counter 13 is alive again 8 system.actorSelection(shard / "13") ! Identify(4) - receiveOne(1 second) match { - case ActorIdentity(4, location) ⇒ - location should not be (None) - } + expectMsgType[ActorIdentity](3 seconds).ref should not be (None) } enterBarrier("after-12") @@ -694,14 +689,11 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult counter1 ! Stop + val probe = TestProbe() awaitAssert({ - counter1 ! Identify(1) - receiveOne(1 second) match { - case ActorIdentity(1, location) ⇒ - location should not be (None) - } + counter1.tell(Identify(1), probe.ref) + probe.expectMsgType[ActorIdentity](1 second).ref should not be (None) }, 5.seconds, 500.millis) - } enterBarrier("after-13") @@ -728,17 +720,15 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult } enterBarrier("region4-up") - //Wait for migration to happen - Thread sleep 2500 - + // Wait for migration to happen //Test the shard, thus counter was moved onto node 4 and started. runOn(fourth) { val counter1 = system.actorSelection(system / "AutoMigrateRegionTestRegion" / "1" / "1") - counter1 ! Identify(1) - receiveOne(1 second) match { - case ActorIdentity(1, location) ⇒ - location should not be (None) - } + val probe = TestProbe() + awaitAssert({ + counter1.tell(Identify(1), probe.ref) + probe.expectMsgType[ActorIdentity](1 second).ref should not be (None) + }, 5.seconds, 500 millis) counter1 ! Get(1) expectMsg(2) @@ -771,7 +761,7 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult for (n ← 2 to 12) { val entry = system.actorSelection(system / "PersistentCounterRegion" / (n % 12).toString / n.toString) entry ! Identify(n) - receiveOne(1 second) match { + receiveOne(3 seconds) match { case ActorIdentity(id, Some(_)) if id == n ⇒ count = count + 1 case ActorIdentity(id, None) ⇒ //Not on the fifth shard }