diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterSharding.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterSharding.scala index 688c228ab0..3cf2a1deb1 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterSharding.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterSharding.scala @@ -5,13 +5,11 @@ package akka.cluster.sharding import java.net.URLEncoder import java.util.concurrent.ConcurrentHashMap - import scala.collection.immutable import scala.concurrent.Await import scala.concurrent.Future import scala.concurrent.duration._ import scala.util.Success - import akka.actor.Actor import akka.actor.ActorLogging import akka.actor.ActorRef @@ -47,6 +45,7 @@ import akka.persistence._ import akka.pattern.ask import akka.pattern.pipe import akka.util.ByteString +import akka.actor.Address /** * This extension provides sharding functionality of actors in a cluster. @@ -612,6 +611,29 @@ object ShardRegion { */ def gracefulShutdownInstance = GracefulShutdown + /* + * Send this message to the `ShardRegion` actor to request for [[CurrentRegions]], + * which contains the addresses of all registered regions. + * Intended for testing purpose to see when cluster sharding is "ready". + */ + @SerialVersionUID(1L) final case object GetCurrentRegions extends ShardRegionCommand + + def getCurrentRegionsInstance = GetCurrentRegions + + /** + * Reply to [[GetCurrentRegions]] + */ + @SerialVersionUID(1L) final case class CurrentRegions(regions: Set[Address]) { + /** + * Java API + */ + def getRegions: java.util.Set[Address] = { + import scala.collection.JavaConverters._ + regions.asJava + } + + } + private case object Retry extends ShardRegionCommand private def roleOption(role: String): Option[String] = @@ -828,6 +850,12 @@ class ShardRegion( gracefulShutdownInProgress = true sendGracefulShutdownToCoordinator() + case GetCurrentRegions ⇒ + coordinator match { + case Some(c) ⇒ c.forward(GetCurrentRegions) + case None ⇒ sender() ! CurrentRegions(Set.empty) + } + case _ ⇒ unhandled(cmd) } @@ -1846,6 +1874,13 @@ class ShardCoordinator(settings: ClusterShardingSettings, allocationStrategy: Sh // it will soon be stopped when singleton is stopped context.become(shuttingDown) + case ShardRegion.GetCurrentRegions ⇒ + val reply = ShardRegion.CurrentRegions(persistentState.regions.keySet.map { ref ⇒ + if (ref.path.address.host.isEmpty) Cluster(context.system).selfAddress + else ref.path.address + }) + sender() ! reply + case _: CurrentClusterState ⇒ } diff --git a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingSpec.scala b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingSpec.scala index 97814ca3d0..a4184fa566 100644 --- a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingSpec.scala +++ b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingSpec.scala @@ -5,6 +5,8 @@ package akka.cluster.sharding import akka.cluster.sharding.ShardCoordinator.Internal.{ ShardStopped, HandOff } import akka.cluster.sharding.ShardRegion.Passivate +import akka.cluster.sharding.ShardRegion.GetCurrentRegions +import akka.cluster.sharding.ShardRegion.CurrentRegions import language.postfixOps import scala.concurrent.duration._ import com.typesafe.config.ConfigFactory @@ -264,6 +266,9 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult region ! EntryEnvelope(1, Decrement) region ! Get(1) expectMsg(2) + + region ! GetCurrentRegions + expectMsg(CurrentRegions(Set(Cluster(system).selfAddress))) } enterBarrier("after-2") @@ -308,6 +313,9 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult region ! Get(2) expectMsg(3) lastSender.path should ===(region.path / "2" / "2") + + region ! GetCurrentRegions + expectMsg(CurrentRegions(Set(Cluster(system).selfAddress, node(first).address))) } enterBarrier("after-3")