diff --git a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ClusterShardingQuery.scala b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ClusterShardingQuery.scala new file mode 100644 index 0000000000..dad3f2a3cf --- /dev/null +++ b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ClusterShardingQuery.scala @@ -0,0 +1,31 @@ +/* + * Copyright (C) 2018 Lightbend Inc. + */ + +package akka.cluster.sharding.typed + +import akka.actor.typed.ActorRef +import akka.cluster.sharding.ShardRegion.CurrentShardRegionState +import akka.cluster.sharding.typed.scaladsl.EntityTypeKey + +/** + * Protocol for querying sharding state e.g. A ShardRegion's state + */ +sealed trait ClusterShardingQuery + +/** + * Query the ShardRegion state for the given entity type key. This will get the state of the + * local ShardRegion's state. + */ +final case class GetShardRegionState(entityTypeKey: EntityTypeKey[_], replyTo: ActorRef[CurrentShardRegionState]) extends ClusterShardingQuery { + /** + * Java API + * + * Query the ShardRegion state for the given entity type key. This will get the state of the + * local ShardRegion's state. + */ + def this(entityTypeKey: javadsl.EntityTypeKey[_], replyTo: ActorRef[CurrentShardRegionState]) = this(entityTypeKey.asScala, replyTo) +} + +// TODO - GetClusterShardingStats + diff --git a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ClusterShardingImpl.scala b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ClusterShardingImpl.scala index 2904b4ad3d..e0c88db533 100644 --- a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ClusterShardingImpl.scala +++ b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ClusterShardingImpl.scala @@ -123,7 +123,7 @@ import akka.util.Timeout require(system.isInstanceOf[ActorSystemAdapter[_]], "only adapted untyped actor systems can be used for cluster features") private val cluster = Cluster(system) - private val untypedSystem = system.toUntyped + private val untypedSystem: ExtendedActorSystem = system.toUntyped.asInstanceOf[ExtendedActorSystem] private val untypedSharding = akka.cluster.sharding.ClusterSharding(untypedSystem) private val log: LoggingAdapter = Logging(untypedSystem, classOf[scaladsl.ClusterSharding]) @@ -214,7 +214,6 @@ import akka.util.Timeout val behv = behavior(new EntityContext(entityId, shardCommandDelegator)) PropsAdapter(poisonPillInterceptor(behv), entityProps) } - untypedSharding.internalStart( typeKey.name, untypedEntityPropsFactory, @@ -264,6 +263,12 @@ import akka.util.Timeout new LeastShardAllocationStrategy(threshold, maxSimultaneousRebalance) } + override lazy val shardState: ActorRef[ClusterShardingQuery] = { + import akka.actor.typed.scaladsl.adapter._ + val behavior = ShardingState.behavior(untypedSharding) + untypedSystem.systemActorOf(PropsAdapter(behavior), "typedShardState") + } + } /** diff --git a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ShardingState.scala b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ShardingState.scala new file mode 100644 index 0000000000..f0f725d58f --- /dev/null +++ b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ShardingState.scala @@ -0,0 +1,30 @@ +/* + * Copyright (C) 2018 Lightbend Inc. + */ + +package akka.cluster.sharding.typed.internal +import akka.actor.typed.Behavior +import akka.actor.typed.scaladsl.Behaviors +import akka.cluster.sharding.{ ClusterSharding, ShardRegion } +import akka.cluster.sharding.ShardRegion.CurrentShardRegionState +import akka.actor.typed.scaladsl.adapter._ +import akka.annotation.InternalApi +import akka.cluster.sharding.typed.{ ClusterShardingQuery, GetShardRegionState } + +/** + * INTERNAL API + */ +@InternalApi +object ShardingState { + + def behavior(untypedSharding: ClusterSharding): Behavior[ClusterShardingQuery] = Behaviors.receiveMessage { + case GetShardRegionState(key, replyTo) ⇒ + if (untypedSharding.getShardTypeNames.contains(key.name)) { + untypedSharding.shardRegion(key.name).tell(ShardRegion.GetShardRegionState, replyTo.toUntyped) + } else { + replyTo ! CurrentShardRegionState(Set.empty) + } + Behavior.same + } + +} diff --git a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/javadsl/ClusterSharding.scala b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/javadsl/ClusterSharding.scala index 4925dd034e..2e35360a83 100644 --- a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/javadsl/ClusterSharding.scala +++ b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/javadsl/ClusterSharding.scala @@ -188,6 +188,11 @@ abstract class ClusterSharding { */ def entityRefFor[M](typeKey: EntityTypeKey[M], entityId: String): EntityRef[M] + /** + * Actor for querying Cluster Sharding state + */ + def shardState: ActorRef[ClusterShardingQuery] + /** * The default is currently [[akka.cluster.sharding.ShardCoordinator.LeastShardAllocationStrategy]] with the * given `settings`. This could be changed in the future. diff --git a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/scaladsl/ClusterSharding.scala b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/scaladsl/ClusterSharding.scala index ba75e8ccac..a25301f21d 100644 --- a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/scaladsl/ClusterSharding.scala +++ b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/scaladsl/ClusterSharding.scala @@ -6,10 +6,10 @@ package akka.cluster.sharding.typed package scaladsl import scala.concurrent.Future - -import akka.util.Timeout +import scala.collection.{ immutable ⇒ im } import scala.reflect.ClassTag +import akka.util.Timeout import akka.actor.typed.ActorRef import akka.actor.typed.ActorSystem import akka.actor.typed.Behavior @@ -24,7 +24,7 @@ import akka.annotation.InternalApi import akka.cluster.sharding.ShardCoordinator.ShardAllocationStrategy import akka.cluster.sharding.typed.internal.ClusterShardingImpl import akka.cluster.sharding.typed.internal.EntityTypeKeyImpl -import akka.cluster.sharding.ShardRegion.{ StartEntity ⇒ UntypedStartEntity } +import akka.cluster.sharding.ShardRegion.{ CurrentShardRegionState, StartEntity ⇒ UntypedStartEntity } import akka.persistence.typed.PersistenceId object ClusterSharding extends ExtensionId[ClusterSharding] { @@ -189,6 +189,11 @@ trait ClusterSharding extends Extension { javadslSelf: javadsl.ClusterSharding */ def entityRefFor[M](typeKey: EntityTypeKey[M], entityId: String): EntityRef[M] + /** + * Actor for querying Cluster Sharding state + */ + def shardState: ActorRef[ClusterShardingQuery] + /** * The default is currently [[akka.cluster.sharding.ShardCoordinator.LeastShardAllocationStrategy]] with the * given `settings`. This could be changed in the future. @@ -199,6 +204,7 @@ trait ClusterSharding extends Extension { javadslSelf: javadsl.ClusterSharding * INTERNAL API */ @InternalApi private[akka] def asJava: javadsl.ClusterSharding = javadslSelf + } object Entity { diff --git a/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/ClusterShardingSpec.scala b/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/ClusterShardingSpec.scala index fe6553f4c3..1769f319b8 100644 --- a/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/ClusterShardingSpec.scala +++ b/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/ClusterShardingSpec.scala @@ -62,7 +62,7 @@ object ClusterShardingSpec { "akka.cluster.sharding.typed.scaladsl.ClusterShardingSpec$$IdTestProtocol" = test } } - """.stripMargin) + """) sealed trait TestProtocol extends java.io.Serializable final case class ReplyPlz(toMe: ActorRef[String]) extends TestProtocol @@ -131,23 +131,10 @@ object ClusterShardingSpec { final case class TheReply(s: String) -} + val typeKey = EntityTypeKey[TestProtocol]("envelope-shard") + val typeKey2 = EntityTypeKey[IdTestProtocol]("no-envelope-shard") -class ClusterShardingSpec extends ScalaTestWithActorTestKit(ClusterShardingSpec.config) with WordSpecLike { - import ClusterShardingSpec._ - - val sharding = ClusterSharding(system) - - val system2 = ActorSystem(Behaviors.ignore[Any], name = system.name, config = system.settings.config) - val sharding2 = ClusterSharding(system2) - - override def afterAll(): Unit = { - ActorTestKit.shutdown(system2, 5.seconds) - super.afterAll() - } - - private val typeKey = EntityTypeKey[TestProtocol]("envelope-shard") - private def behavior(shard: ActorRef[ClusterSharding.ShardCommand], stopProbe: Option[ActorRef[String]] = None) = + def behavior(shard: ActorRef[ClusterSharding.ShardCommand], stopProbe: Option[ActorRef[String]] = None) = Behaviors.receive[TestProtocol] { case (ctx, PassivatePlz()) ⇒ shard ! ClusterSharding.Passivate(ctx.self) @@ -171,8 +158,7 @@ class ClusterShardingSpec extends ScalaTestWithActorTestKit(ClusterShardingSpec. Behaviors.same } - private val typeKey2 = EntityTypeKey[IdTestProtocol]("no-envelope-shard") - private def behaviorWithId(shard: ActorRef[ClusterSharding.ShardCommand]) = Behaviors.receive[IdTestProtocol] { + def behaviorWithId(shard: ActorRef[ClusterSharding.ShardCommand]) = Behaviors.receive[IdTestProtocol] { case (_, IdStopPlz()) ⇒ Behaviors.stopped @@ -186,6 +172,27 @@ class ClusterShardingSpec extends ScalaTestWithActorTestKit(ClusterShardingSpec. Behaviors.same } + val idTestProtocolMessageExtractor = ShardingMessageExtractor.noEnvelope[IdTestProtocol](10, IdStopPlz()) { + case IdReplyPlz(id, _) ⇒ id + case IdWhoAreYou(id, _) ⇒ id + case other ⇒ throw new IllegalArgumentException(s"Unexpected message $other") + } +} + +class ClusterShardingSpec extends ScalaTestWithActorTestKit(ClusterShardingSpec.config) with WordSpecLike { + + import ClusterShardingSpec._ + + val sharding = ClusterSharding(system) + + val system2 = ActorSystem(Behaviors.ignore[Any], name = system.name, config = system.settings.config) + val sharding2 = ClusterSharding(system2) + + override def afterAll(): Unit = { + ActorTestKit.shutdown(system2, 5.seconds) + super.afterAll() + } + private val shardingRef1: ActorRef[ShardingEnvelope[TestProtocol]] = sharding.init(Entity( typeKey, ctx ⇒ behavior(ctx.shard)) @@ -210,12 +217,7 @@ class ClusterShardingSpec extends ScalaTestWithActorTestKit(ClusterShardingSpec. private val shardingRef4 = sharding2.init(Entity( typeKey2, ctx ⇒ behaviorWithId(ctx.shard)) - .withMessageExtractor( - ShardingMessageExtractor.noEnvelope[IdTestProtocol](10, IdStopPlz()) { - case IdReplyPlz(id, _) ⇒ id - case IdWhoAreYou(id, _) ⇒ id - case other ⇒ throw new IllegalArgumentException(s"Unexpected message $other") - }) + .withMessageExtractor(idTestProtocolMessageExtractor) .withStopMessage(IdStopPlz()) ) diff --git a/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/ClusterShardingStateSpec.scala b/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/ClusterShardingStateSpec.scala new file mode 100644 index 0000000000..83a13f748f --- /dev/null +++ b/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/ClusterShardingStateSpec.scala @@ -0,0 +1,62 @@ +/* + * Copyright (C) 2018 Lightbend Inc. + */ + +package akka.cluster.sharding.typed.scaladsl + +import akka.actor.testkit.typed.scaladsl.{ ScalaTestWithActorTestKit, TestProbe } +import akka.actor.typed.ActorRef +import akka.cluster.sharding.ShardRegion.{ CurrentShardRegionState, ShardState } +import akka.cluster.sharding.typed.scaladsl.ClusterShardingSpec._ +import akka.cluster.sharding.typed.{ GetShardRegionState, ShardingMessageExtractor } +import akka.cluster.typed.{ Cluster, Join } +import org.scalatest.WordSpecLike + +class ClusterShardingStateSpec extends ScalaTestWithActorTestKit(ClusterShardingSpec.config) with WordSpecLike { + + val sharding = ClusterSharding(system) + + val shardExtractor = ShardingMessageExtractor.noEnvelope[IdTestProtocol](10, IdStopPlz()) { + case IdReplyPlz(id, _) ⇒ id + case IdWhoAreYou(id, _) ⇒ id + case other ⇒ throw new IllegalArgumentException(s"Unexpected message $other") + } + + val cluster = Cluster(system) + + val typeKey: EntityTypeKey[IdTestProtocol] = ClusterShardingSpec.typeKey2 + + "Cluster Sharding" must { + "allow querying of the shard region state" in { + val probe = TestProbe[CurrentShardRegionState]() + cluster.manager ! Join(cluster.selfMember.address) + + // Before the region is started + sharding.shardState ! GetShardRegionState(typeKey, probe.ref) + probe.expectMessage(CurrentShardRegionState(Set())) + + val shardingRef: ActorRef[IdTestProtocol] = sharding.init( + Entity( + typeKey, + ctx ⇒ ClusterShardingSpec.behaviorWithId(ctx.shard)) + .withStopMessage(IdStopPlz()) + .withMessageExtractor(idTestProtocolMessageExtractor) + ) + + sharding.shardState ! GetShardRegionState(typeKey, probe.ref) + probe.expectMessage(CurrentShardRegionState(Set())) + + // Create a shard + val replyProbe = TestProbe[String]() + shardingRef ! IdReplyPlz("id1", replyProbe.ref) + replyProbe.expectMessage("Hello!") + + //#get-region-state + ClusterSharding(system).shardState ! GetShardRegionState(typeKey, probe.ref) + val state = probe.expectMessageType[CurrentShardRegionState] + //#get-region-state + state.shards should be(Set(ShardState(shardExtractor.shardId("id1"), Set("id1")))) + } + } + +} diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala index bac89f11c1..d7bd8b190c 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala @@ -642,7 +642,7 @@ private[akka] class ShardRegion( case (shardId, state) ⇒ ShardRegion.ShardState(shardId, state.entityIds) }.toSet) }.recover { - case x: AskTimeoutException ⇒ CurrentShardRegionState(Set.empty) + case _: AskTimeoutException ⇒ CurrentShardRegionState(Set.empty) }.pipeTo(ref) } diff --git a/akka-cluster-typed/src/main/scala/akka/cluster/ddata/typed/scaladsl/Replicator.scala b/akka-cluster-typed/src/main/scala/akka/cluster/ddata/typed/scaladsl/Replicator.scala index 441d407042..04cd6e59d8 100644 --- a/akka-cluster-typed/src/main/scala/akka/cluster/ddata/typed/scaladsl/Replicator.scala +++ b/akka-cluster-typed/src/main/scala/akka/cluster/ddata/typed/scaladsl/Replicator.scala @@ -50,7 +50,7 @@ object Replicator { * Convenience for `ask`. */ def apply[A <: ReplicatedData](key: Key[A], consistency: ReadConsistency): ActorRef[GetResponse[A]] ⇒ Get[A] = - (replyTo ⇒ Get(key, consistency, replyTo, None)) + replyTo ⇒ Get(key, consistency, replyTo, None) } /** diff --git a/akka-cluster-typed/src/test/java/jdocs/akka/cluster/typed/SingletonCompileOnlyTest.java b/akka-cluster-typed/src/test/java/jdocs/akka/cluster/typed/SingletonCompileOnlyTest.java index 8e2a91490e..109fe59b4b 100644 --- a/akka-cluster-typed/src/test/java/jdocs/akka/cluster/typed/SingletonCompileOnlyTest.java +++ b/akka-cluster-typed/src/test/java/jdocs/akka/cluster/typed/SingletonCompileOnlyTest.java @@ -8,11 +8,8 @@ import akka.actor.typed.*; import akka.actor.typed.javadsl.Behaviors; //#import -import akka.cluster.typed.ClusterSingleton; -import akka.cluster.typed.ClusterSingletonSettings; - +import akka.cluster.typed.*; import java.time.Duration; - //#import public class SingletonCompileOnlyTest {