Typed sharding: Allow queying of local shard region (#25409)

* Typed sharding: Allow quering of local shard region

Partially addresses #24466 still need to query
all shard regions

* Update to new testkit
This commit is contained in:
Christopher Batey 2018-12-04 15:53:07 +00:00 committed by GitHub
parent 895cd70b86
commit b38b407c73
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 174 additions and 36 deletions

View file

@ -0,0 +1,31 @@
/*
* Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.cluster.sharding.typed
import akka.actor.typed.ActorRef
import akka.cluster.sharding.ShardRegion.CurrentShardRegionState
import akka.cluster.sharding.typed.scaladsl.EntityTypeKey
/**
* Protocol for querying sharding state e.g. A ShardRegion's state
*/
sealed trait ClusterShardingQuery
/**
* Query the ShardRegion state for the given entity type key. This will get the state of the
* local ShardRegion's state.
*/
final case class GetShardRegionState(entityTypeKey: EntityTypeKey[_], replyTo: ActorRef[CurrentShardRegionState]) extends ClusterShardingQuery {
/**
* Java API
*
* Query the ShardRegion state for the given entity type key. This will get the state of the
* local ShardRegion's state.
*/
def this(entityTypeKey: javadsl.EntityTypeKey[_], replyTo: ActorRef[CurrentShardRegionState]) = this(entityTypeKey.asScala, replyTo)
}
// TODO - GetClusterShardingStats

View file

@ -123,7 +123,7 @@ import akka.util.Timeout
require(system.isInstanceOf[ActorSystemAdapter[_]], "only adapted untyped actor systems can be used for cluster features") require(system.isInstanceOf[ActorSystemAdapter[_]], "only adapted untyped actor systems can be used for cluster features")
private val cluster = Cluster(system) private val cluster = Cluster(system)
private val untypedSystem = system.toUntyped private val untypedSystem: ExtendedActorSystem = system.toUntyped.asInstanceOf[ExtendedActorSystem]
private val untypedSharding = akka.cluster.sharding.ClusterSharding(untypedSystem) private val untypedSharding = akka.cluster.sharding.ClusterSharding(untypedSystem)
private val log: LoggingAdapter = Logging(untypedSystem, classOf[scaladsl.ClusterSharding]) private val log: LoggingAdapter = Logging(untypedSystem, classOf[scaladsl.ClusterSharding])
@ -214,7 +214,6 @@ import akka.util.Timeout
val behv = behavior(new EntityContext(entityId, shardCommandDelegator)) val behv = behavior(new EntityContext(entityId, shardCommandDelegator))
PropsAdapter(poisonPillInterceptor(behv), entityProps) PropsAdapter(poisonPillInterceptor(behv), entityProps)
} }
untypedSharding.internalStart( untypedSharding.internalStart(
typeKey.name, typeKey.name,
untypedEntityPropsFactory, untypedEntityPropsFactory,
@ -264,6 +263,12 @@ import akka.util.Timeout
new LeastShardAllocationStrategy(threshold, maxSimultaneousRebalance) new LeastShardAllocationStrategy(threshold, maxSimultaneousRebalance)
} }
override lazy val shardState: ActorRef[ClusterShardingQuery] = {
import akka.actor.typed.scaladsl.adapter._
val behavior = ShardingState.behavior(untypedSharding)
untypedSystem.systemActorOf(PropsAdapter(behavior), "typedShardState")
}
} }
/** /**

View file

@ -0,0 +1,30 @@
/*
* Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.cluster.sharding.typed.internal
import akka.actor.typed.Behavior
import akka.actor.typed.scaladsl.Behaviors
import akka.cluster.sharding.{ ClusterSharding, ShardRegion }
import akka.cluster.sharding.ShardRegion.CurrentShardRegionState
import akka.actor.typed.scaladsl.adapter._
import akka.annotation.InternalApi
import akka.cluster.sharding.typed.{ ClusterShardingQuery, GetShardRegionState }
/**
* INTERNAL API
*/
@InternalApi
object ShardingState {
def behavior(untypedSharding: ClusterSharding): Behavior[ClusterShardingQuery] = Behaviors.receiveMessage {
case GetShardRegionState(key, replyTo)
if (untypedSharding.getShardTypeNames.contains(key.name)) {
untypedSharding.shardRegion(key.name).tell(ShardRegion.GetShardRegionState, replyTo.toUntyped)
} else {
replyTo ! CurrentShardRegionState(Set.empty)
}
Behavior.same
}
}

View file

@ -188,6 +188,11 @@ abstract class ClusterSharding {
*/ */
def entityRefFor[M](typeKey: EntityTypeKey[M], entityId: String): EntityRef[M] def entityRefFor[M](typeKey: EntityTypeKey[M], entityId: String): EntityRef[M]
/**
* Actor for querying Cluster Sharding state
*/
def shardState: ActorRef[ClusterShardingQuery]
/** /**
* The default is currently [[akka.cluster.sharding.ShardCoordinator.LeastShardAllocationStrategy]] with the * The default is currently [[akka.cluster.sharding.ShardCoordinator.LeastShardAllocationStrategy]] with the
* given `settings`. This could be changed in the future. * given `settings`. This could be changed in the future.

View file

@ -6,10 +6,10 @@ package akka.cluster.sharding.typed
package scaladsl package scaladsl
import scala.concurrent.Future import scala.concurrent.Future
import scala.collection.{ immutable im }
import akka.util.Timeout
import scala.reflect.ClassTag import scala.reflect.ClassTag
import akka.util.Timeout
import akka.actor.typed.ActorRef import akka.actor.typed.ActorRef
import akka.actor.typed.ActorSystem import akka.actor.typed.ActorSystem
import akka.actor.typed.Behavior import akka.actor.typed.Behavior
@ -24,7 +24,7 @@ import akka.annotation.InternalApi
import akka.cluster.sharding.ShardCoordinator.ShardAllocationStrategy import akka.cluster.sharding.ShardCoordinator.ShardAllocationStrategy
import akka.cluster.sharding.typed.internal.ClusterShardingImpl import akka.cluster.sharding.typed.internal.ClusterShardingImpl
import akka.cluster.sharding.typed.internal.EntityTypeKeyImpl import akka.cluster.sharding.typed.internal.EntityTypeKeyImpl
import akka.cluster.sharding.ShardRegion.{ StartEntity UntypedStartEntity } import akka.cluster.sharding.ShardRegion.{ CurrentShardRegionState, StartEntity UntypedStartEntity }
import akka.persistence.typed.PersistenceId import akka.persistence.typed.PersistenceId
object ClusterSharding extends ExtensionId[ClusterSharding] { object ClusterSharding extends ExtensionId[ClusterSharding] {
@ -189,6 +189,11 @@ trait ClusterSharding extends Extension { javadslSelf: javadsl.ClusterSharding
*/ */
def entityRefFor[M](typeKey: EntityTypeKey[M], entityId: String): EntityRef[M] def entityRefFor[M](typeKey: EntityTypeKey[M], entityId: String): EntityRef[M]
/**
* Actor for querying Cluster Sharding state
*/
def shardState: ActorRef[ClusterShardingQuery]
/** /**
* The default is currently [[akka.cluster.sharding.ShardCoordinator.LeastShardAllocationStrategy]] with the * The default is currently [[akka.cluster.sharding.ShardCoordinator.LeastShardAllocationStrategy]] with the
* given `settings`. This could be changed in the future. * given `settings`. This could be changed in the future.
@ -199,6 +204,7 @@ trait ClusterSharding extends Extension { javadslSelf: javadsl.ClusterSharding
* INTERNAL API * INTERNAL API
*/ */
@InternalApi private[akka] def asJava: javadsl.ClusterSharding = javadslSelf @InternalApi private[akka] def asJava: javadsl.ClusterSharding = javadslSelf
} }
object Entity { object Entity {

View file

@ -62,7 +62,7 @@ object ClusterShardingSpec {
"akka.cluster.sharding.typed.scaladsl.ClusterShardingSpec$$IdTestProtocol" = test "akka.cluster.sharding.typed.scaladsl.ClusterShardingSpec$$IdTestProtocol" = test
} }
} }
""".stripMargin) """)
sealed trait TestProtocol extends java.io.Serializable sealed trait TestProtocol extends java.io.Serializable
final case class ReplyPlz(toMe: ActorRef[String]) extends TestProtocol final case class ReplyPlz(toMe: ActorRef[String]) extends TestProtocol
@ -131,23 +131,10 @@ object ClusterShardingSpec {
final case class TheReply(s: String) final case class TheReply(s: String)
} val typeKey = EntityTypeKey[TestProtocol]("envelope-shard")
val typeKey2 = EntityTypeKey[IdTestProtocol]("no-envelope-shard")
class ClusterShardingSpec extends ScalaTestWithActorTestKit(ClusterShardingSpec.config) with WordSpecLike { def behavior(shard: ActorRef[ClusterSharding.ShardCommand], stopProbe: Option[ActorRef[String]] = None) =
import ClusterShardingSpec._
val sharding = ClusterSharding(system)
val system2 = ActorSystem(Behaviors.ignore[Any], name = system.name, config = system.settings.config)
val sharding2 = ClusterSharding(system2)
override def afterAll(): Unit = {
ActorTestKit.shutdown(system2, 5.seconds)
super.afterAll()
}
private val typeKey = EntityTypeKey[TestProtocol]("envelope-shard")
private def behavior(shard: ActorRef[ClusterSharding.ShardCommand], stopProbe: Option[ActorRef[String]] = None) =
Behaviors.receive[TestProtocol] { Behaviors.receive[TestProtocol] {
case (ctx, PassivatePlz()) case (ctx, PassivatePlz())
shard ! ClusterSharding.Passivate(ctx.self) shard ! ClusterSharding.Passivate(ctx.self)
@ -171,8 +158,7 @@ class ClusterShardingSpec extends ScalaTestWithActorTestKit(ClusterShardingSpec.
Behaviors.same Behaviors.same
} }
private val typeKey2 = EntityTypeKey[IdTestProtocol]("no-envelope-shard") def behaviorWithId(shard: ActorRef[ClusterSharding.ShardCommand]) = Behaviors.receive[IdTestProtocol] {
private def behaviorWithId(shard: ActorRef[ClusterSharding.ShardCommand]) = Behaviors.receive[IdTestProtocol] {
case (_, IdStopPlz()) case (_, IdStopPlz())
Behaviors.stopped Behaviors.stopped
@ -186,6 +172,27 @@ class ClusterShardingSpec extends ScalaTestWithActorTestKit(ClusterShardingSpec.
Behaviors.same Behaviors.same
} }
val idTestProtocolMessageExtractor = ShardingMessageExtractor.noEnvelope[IdTestProtocol](10, IdStopPlz()) {
case IdReplyPlz(id, _) id
case IdWhoAreYou(id, _) id
case other throw new IllegalArgumentException(s"Unexpected message $other")
}
}
class ClusterShardingSpec extends ScalaTestWithActorTestKit(ClusterShardingSpec.config) with WordSpecLike {
import ClusterShardingSpec._
val sharding = ClusterSharding(system)
val system2 = ActorSystem(Behaviors.ignore[Any], name = system.name, config = system.settings.config)
val sharding2 = ClusterSharding(system2)
override def afterAll(): Unit = {
ActorTestKit.shutdown(system2, 5.seconds)
super.afterAll()
}
private val shardingRef1: ActorRef[ShardingEnvelope[TestProtocol]] = sharding.init(Entity( private val shardingRef1: ActorRef[ShardingEnvelope[TestProtocol]] = sharding.init(Entity(
typeKey, typeKey,
ctx behavior(ctx.shard)) ctx behavior(ctx.shard))
@ -210,12 +217,7 @@ class ClusterShardingSpec extends ScalaTestWithActorTestKit(ClusterShardingSpec.
private val shardingRef4 = sharding2.init(Entity( private val shardingRef4 = sharding2.init(Entity(
typeKey2, typeKey2,
ctx behaviorWithId(ctx.shard)) ctx behaviorWithId(ctx.shard))
.withMessageExtractor( .withMessageExtractor(idTestProtocolMessageExtractor)
ShardingMessageExtractor.noEnvelope[IdTestProtocol](10, IdStopPlz()) {
case IdReplyPlz(id, _) id
case IdWhoAreYou(id, _) id
case other throw new IllegalArgumentException(s"Unexpected message $other")
})
.withStopMessage(IdStopPlz()) .withStopMessage(IdStopPlz())
) )

View file

@ -0,0 +1,62 @@
/*
* Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.cluster.sharding.typed.scaladsl
import akka.actor.testkit.typed.scaladsl.{ ScalaTestWithActorTestKit, TestProbe }
import akka.actor.typed.ActorRef
import akka.cluster.sharding.ShardRegion.{ CurrentShardRegionState, ShardState }
import akka.cluster.sharding.typed.scaladsl.ClusterShardingSpec._
import akka.cluster.sharding.typed.{ GetShardRegionState, ShardingMessageExtractor }
import akka.cluster.typed.{ Cluster, Join }
import org.scalatest.WordSpecLike
class ClusterShardingStateSpec extends ScalaTestWithActorTestKit(ClusterShardingSpec.config) with WordSpecLike {
val sharding = ClusterSharding(system)
val shardExtractor = ShardingMessageExtractor.noEnvelope[IdTestProtocol](10, IdStopPlz()) {
case IdReplyPlz(id, _) id
case IdWhoAreYou(id, _) id
case other throw new IllegalArgumentException(s"Unexpected message $other")
}
val cluster = Cluster(system)
val typeKey: EntityTypeKey[IdTestProtocol] = ClusterShardingSpec.typeKey2
"Cluster Sharding" must {
"allow querying of the shard region state" in {
val probe = TestProbe[CurrentShardRegionState]()
cluster.manager ! Join(cluster.selfMember.address)
// Before the region is started
sharding.shardState ! GetShardRegionState(typeKey, probe.ref)
probe.expectMessage(CurrentShardRegionState(Set()))
val shardingRef: ActorRef[IdTestProtocol] = sharding.init(
Entity(
typeKey,
ctx ClusterShardingSpec.behaviorWithId(ctx.shard))
.withStopMessage(IdStopPlz())
.withMessageExtractor(idTestProtocolMessageExtractor)
)
sharding.shardState ! GetShardRegionState(typeKey, probe.ref)
probe.expectMessage(CurrentShardRegionState(Set()))
// Create a shard
val replyProbe = TestProbe[String]()
shardingRef ! IdReplyPlz("id1", replyProbe.ref)
replyProbe.expectMessage("Hello!")
//#get-region-state
ClusterSharding(system).shardState ! GetShardRegionState(typeKey, probe.ref)
val state = probe.expectMessageType[CurrentShardRegionState]
//#get-region-state
state.shards should be(Set(ShardState(shardExtractor.shardId("id1"), Set("id1"))))
}
}
}

View file

@ -642,7 +642,7 @@ private[akka] class ShardRegion(
case (shardId, state) ShardRegion.ShardState(shardId, state.entityIds) case (shardId, state) ShardRegion.ShardState(shardId, state.entityIds)
}.toSet) }.toSet)
}.recover { }.recover {
case x: AskTimeoutException CurrentShardRegionState(Set.empty) case _: AskTimeoutException CurrentShardRegionState(Set.empty)
}.pipeTo(ref) }.pipeTo(ref)
} }

View file

@ -50,7 +50,7 @@ object Replicator {
* Convenience for `ask`. * Convenience for `ask`.
*/ */
def apply[A <: ReplicatedData](key: Key[A], consistency: ReadConsistency): ActorRef[GetResponse[A]] Get[A] = def apply[A <: ReplicatedData](key: Key[A], consistency: ReadConsistency): ActorRef[GetResponse[A]] Get[A] =
(replyTo Get(key, consistency, replyTo, None)) replyTo Get(key, consistency, replyTo, None)
} }
/** /**

View file

@ -8,11 +8,8 @@ import akka.actor.typed.*;
import akka.actor.typed.javadsl.Behaviors; import akka.actor.typed.javadsl.Behaviors;
//#import //#import
import akka.cluster.typed.ClusterSingleton; import akka.cluster.typed.*;
import akka.cluster.typed.ClusterSingletonSettings;
import java.time.Duration; import java.time.Duration;
//#import //#import
public class SingletonCompileOnlyTest { public class SingletonCompileOnlyTest {