diff --git a/akka-contrib/src/main/scala/akka/contrib/pattern/ClusterSharding.scala b/akka-contrib/src/main/scala/akka/contrib/pattern/ClusterSharding.scala index b4f1a7100f..938dfdb647 100644 --- a/akka-contrib/src/main/scala/akka/contrib/pattern/ClusterSharding.scala +++ b/akka-contrib/src/main/scala/akka/contrib/pattern/ClusterSharding.scala @@ -659,7 +659,7 @@ class ShardRegion( var shardsByRef = Map.empty[ActorRef, ShardId] var handingOff = Set.empty[ActorRef] - def totalBufferSize = shardBuffers.map { case (_, buf) ⇒ buf.size }.sum + def totalBufferSize = shardBuffers.foldLeft(0) { (sum, entry) ⇒ sum + entry._2.size } import context.dispatcher val retryTask = context.system.scheduler.schedule(retryInterval, retryInterval, self, Retry) @@ -786,7 +786,7 @@ class ShardRegion( shardBuffers -= shard if (shards.contains(shard)) { - handingOff = handingOff + shards(shard) + handingOff += shards(shard) shards(shard) forward msg } else sender() ! ShardStopped(shard) @@ -1009,7 +1009,7 @@ private[akka] class Shard( var handOffStopper: Option[ActorRef] = None - def totalBufferSize = messageBuffers.map { case (_, buf) ⇒ buf.size }.sum + def totalBufferSize = messageBuffers.foldLeft(0) { (sum, entry) ⇒ sum + entry._2.size } def processChange[A](event: A)(handler: A ⇒ Unit): Unit = if (rememberEntries) persist(event)(handler) diff --git a/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterShardingSpec.scala b/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterShardingSpec.scala index f4955aaaab..83052c6782 100644 --- a/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterShardingSpec.scala +++ b/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterShardingSpec.scala @@ -9,10 +9,7 @@ import akka.contrib.pattern.ShardRegion.Passivate import language.postfixOps import scala.concurrent.duration._ import com.typesafe.config.ConfigFactory -import akka.actor.ActorIdentity -import akka.actor.Identify -import akka.actor.PoisonPill -import akka.actor.Props +import akka.actor._ import akka.cluster.Cluster import akka.persistence.PersistentActor import akka.persistence.Persistence @@ -26,8 +23,6 @@ import akka.testkit._ import akka.testkit.TestEvent.Mute import java.io.File import org.apache.commons.io.FileUtils -import akka.actor.ReceiveTimeout -import akka.actor.ActorRef object ClusterShardingSpec extends MultiNodeConfig { val controller = role("controller") @@ -169,35 +164,37 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult val allocationStrategy = new ShardCoordinator.LeastShardAllocationStrategy(rebalanceThreshold = 2, maxSimultaneousRebalance = 1) val coordinatorProps = ShardCoordinator.props(handOffTimeout = 10.seconds, shardStartTimeout = 10.seconds, rebalanceInterval = 2.seconds, snapshotInterval = 3600.seconds, allocationStrategy) - system.actorOf(ClusterSingletonManager.props( - singletonProps = ShardCoordinatorSupervisor.props(failureBackoff = 5.seconds, coordinatorProps), - singletonName = "singleton", - terminationMessage = PoisonPill, - role = None), - name = "counterCoordinator") + + for (coordinatorName ← List("counter", "PersistentCounterEntries", "AnotherPersistentCounter", "PersistentCounter", "AutoMigrateRegionTest")) + system.actorOf(ClusterSingletonManager.props( + singletonProps = ShardCoordinatorSupervisor.props(failureBackoff = 5.seconds, coordinatorProps), + singletonName = "singleton", + terminationMessage = PoisonPill, + role = None), + name = coordinatorName + "Coordinator") } - lazy val region = system.actorOf(ShardRegion.props( - typeName = "counter", + def createRegion(typeName: String, rememberEntries: Boolean): ActorRef = system.actorOf(ShardRegion.props( + typeName = typeName, entryProps = Props[Counter], role = None, - coordinatorPath = "/user/counterCoordinator/singleton/coordinator", + coordinatorPath = "/user/" + typeName + "Coordinator/singleton/coordinator", retryInterval = 1.second, shardFailureBackoff = 1.second, entryRestartBackoff = 1.second, snapshotInterval = 1.hour, bufferSize = 1000, - rememberEntries = false, + rememberEntries = rememberEntries, idExtractor = idExtractor, shardResolver = shardResolver), - name = "counterRegion") + name = typeName + "Region") - lazy val persistentRegion = ClusterSharding(system).start( - typeName = "PersistentCounter", - entryProps = Some(Props[Counter]), - rememberEntries = true, - idExtractor = idExtractor, - shardResolver = shardResolver) + lazy val region = createRegion("counter", rememberEntries = false) + + lazy val persistentEntriesRegion = createRegion("PersistentCounterEntries", rememberEntries = true) + lazy val anotherPersistentRegion = createRegion("AnotherPersistentCounter", rememberEntries = true) + lazy val persistentRegion = createRegion("PersistentCounter", rememberEntries = true) + lazy val autoMigrateRegion = createRegion("AutoMigrateRegionTest", rememberEntries = true) "Cluster sharding" must { @@ -547,28 +544,15 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult "Persistent Cluster Shards" must { "recover entries upon restart" in within(50.seconds) { runOn(third, fourth, fifth) { - ClusterSharding(system).start( - typeName = "PersistentCounterEntries", - entryProps = Some(Props[Counter]), - rememberEntries = true, - idExtractor = idExtractor, - shardResolver = shardResolver) - - ClusterSharding(system).start( - typeName = "AnotherPersistentCounter", - entryProps = Some(Props[Counter]), - rememberEntries = true, - idExtractor = idExtractor, - shardResolver = shardResolver) + persistentEntriesRegion + anotherPersistentRegion } enterBarrier("persistent-started") runOn(third) { - val counterRegion: ActorRef = ClusterSharding(system).shardRegion("PersistentCounterEntries") - //Create an increment counter 1 - counterRegion ! EntryEnvelope(1, Increment) - counterRegion ! Get(1) + persistentEntriesRegion ! EntryEnvelope(1, Increment) + persistentEntriesRegion ! Get(1) expectMsg(1) //Shut down the shard and confirm it's dead @@ -585,7 +569,7 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult }, 5 seconds, 500 millis) //Get the path to where the shard now resides - counterRegion ! Get(13) + persistentEntriesRegion ! Get(13) expectMsg(0) //Check that counter 1 is now alive again, even though we have @@ -605,11 +589,10 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult runOn(fourth) { //Check a second region does not share the same persistent shards - val anotherRegion: ActorRef = ClusterSharding(system).shardRegion("AnotherPersistentCounter") //Create a separate 13 counter - anotherRegion ! EntryEnvelope(13, Increment) - anotherRegion ! Get(13) + anotherPersistentRegion ! EntryEnvelope(13, Increment) + anotherPersistentRegion ! Get(13) expectMsg(1) //Check that no counter "1" exists in this shard @@ -624,7 +607,7 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult enterBarrier("after-11") } - "permanently stop entries which passivate" in within(50.seconds) { + "permanently stop entries which passivate" in within(15.seconds) { runOn(third, fourth, fifth) { persistentRegion } @@ -650,7 +633,12 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult counter1.path.parent should be(counter13.path.parent) //Send the shard the passivate message from the counter + watch(counter1) shard.tell(Passivate(Stop), counter1) + + //Watch for the terminated message + expectTerminated(counter1, 5 seconds) + awaitAssert({ //Check counter 1 is dead counter1 ! Identify(1) @@ -719,27 +707,21 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult enterBarrier("after-13") } - "be migrated to new regions upon region failure" in within(50.seconds) { - lazy val migrationRegion: ActorRef = ClusterSharding(system).start( - typeName = "AutoMigrateRegionTest", - entryProps = Some(Props[Counter]), - rememberEntries = true, - idExtractor = idExtractor, - shardResolver = shardResolver) + "be migrated to new regions upon region failure" in within(15.seconds) { //Start only one region, and force an entry onto that region runOn(third) { - migrationRegion ! EntryEnvelope(1, Increment) + autoMigrateRegion ! EntryEnvelope(1, Increment) } enterBarrier("shard1-region3") //Start another region and test it talks to node 3 runOn(fourth) { - migrationRegion ! EntryEnvelope(1, Increment) + autoMigrateRegion ! EntryEnvelope(1, Increment) - migrationRegion ! Get(1) + autoMigrateRegion ! Get(1) expectMsg(2) - lastSender.path should be(node(third) / "user" / "sharding" / "AutoMigrateRegionTest" / "1" / "1") + lastSender.path should be(node(third) / "user" / "AutoMigrateRegionTestRegion" / "1" / "1") //Kill region 3 system.actorSelection(lastSender.path.parent.parent) ! PoisonPill @@ -751,7 +733,7 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult //Test the shard, thus counter was moved onto node 4 and started. runOn(fourth) { - val counter1 = system.actorSelection(system / "sharding" / "AutoMigrateRegionTest" / "1" / "1") + val counter1 = system.actorSelection(system / "AutoMigrateRegionTestRegion" / "1" / "1") counter1 ! Identify(1) receiveOne(1 second) match { case ActorIdentity(1, location) ⇒ @@ -787,7 +769,7 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult var count = 0 for (n ← 2 to 12) { - var entry = system.actorSelection(system / "sharding" / "PersistentCounter" / (n % 12).toString / n.toString) + val entry = system.actorSelection(system / "PersistentCounterRegion" / (n % 12).toString / n.toString) entry ! Identify(n) receiveOne(1 second) match { case ActorIdentity(id, Some(_)) if id == n ⇒ count = count + 1