diff --git a/akka-typed-tests/src/test/scala/akka/typed/cluster/sharding/ClusterShardingSpec.scala b/akka-typed-tests/src/test/scala/akka/typed/cluster/sharding/ClusterShardingSpec.scala index f075c0272d..187ed4c339 100644 --- a/akka-typed-tests/src/test/scala/akka/typed/cluster/sharding/ClusterShardingSpec.scala +++ b/akka-typed-tests/src/test/scala/akka/typed/cluster/sharding/ClusterShardingSpec.scala @@ -62,6 +62,7 @@ class ClusterShardingSpec extends TypedSpec(ClusterShardingSpec.config) with Sca implicit val untypedSystem = system.toUntyped private val untypedCluster = akka.cluster.Cluster(untypedSystem) + val typeKey = EntityTypeKey[TestProtocol]("envelope-shard") val behavior = Actor.immutable[TestProtocol] { case (_, StopPlz()) ⇒ Actor.stopped @@ -74,6 +75,8 @@ class ClusterShardingSpec extends TypedSpec(ClusterShardingSpec.config) with Sca toMe ! "Hello!" Actor.same } + + val typeKey2 = EntityTypeKey[IdTestProtocol]("no-envelope-shard") val behaviorWithId = Actor.immutable[IdTestProtocol] { case (_, IdStopPlz(_)) ⇒ Actor.stopped @@ -95,7 +98,7 @@ class ClusterShardingSpec extends TypedSpec(ClusterShardingSpec.config) with Sca val ref = sharding.spawn( behavior, Props.empty, - "envelope-shard", + typeKey, ClusterShardingSettings(system), 10, StopPlz()) @@ -110,7 +113,7 @@ class ClusterShardingSpec extends TypedSpec(ClusterShardingSpec.config) with Sca val ref = sharding.spawn( behaviorWithId, Props.empty, - "no-envelope-shard", + typeKey2, ClusterShardingSettings(system), ShardingMessageExtractor.noEnvelope[IdTestProtocol](10, _.id), IdStopPlz("THE_ID_HERE")) @@ -140,8 +143,7 @@ class ClusterShardingSpec extends TypedSpec(ClusterShardingSpec.config) with Sca untypedCluster.join(untypedCluster.selfAddress) def `11 EntityRef - tell`(): Unit = { - val charlieRef: EntityRef[TestProtocol] = - sharding.entityRefFor[TestProtocol]("envelope-shard", "charlie") + val charlieRef = sharding.entityRefFor(typeKey, "charlie") val p = TestProbe[String]() @@ -154,11 +156,9 @@ class ClusterShardingSpec extends TypedSpec(ClusterShardingSpec.config) with Sca charlieRef ! StopPlz() } - def `11 EntityRef - ask`(): Unit = { - val bobRef: EntityRef[TestProtocol] = - sharding.entityRefFor[TestProtocol]("envelope-shard", "bob") - val charlieRef: EntityRef[TestProtocol] = - sharding.entityRefFor[TestProtocol]("envelope-shard", "charlie") + def `12 EntityRef - ask`(): Unit = { + val bobRef = sharding.entityRefFor(typeKey, "bob") + val charlieRef = sharding.entityRefFor(typeKey, "charlie") val p = TestProbe[String]() diff --git a/akka-typed/src/main/scala/akka/typed/cluster/sharding/ClusterSharding.scala b/akka-typed/src/main/scala/akka/typed/cluster/sharding/ClusterSharding.scala index 7ec0a531b1..18693b64c9 100644 --- a/akka-typed/src/main/scala/akka/typed/cluster/sharding/ClusterSharding.scala +++ b/akka-typed/src/main/scala/akka/typed/cluster/sharding/ClusterSharding.scala @@ -12,6 +12,7 @@ import akka.typed.scaladsl.adapter.PropsAdapter import akka.typed.{ ActorRef, ActorSystem, Behavior, Extension, ExtensionId, Props } import scala.language.implicitConversions +import scala.reflect.ClassTag /** * Default envelope type that may be used with Cluster Sharding. @@ -63,8 +64,7 @@ object ShardingMessageExtractor { */ def noEnvelope[A]( maxNumberOfShards: Int, - extractEntityId: A ⇒ String - ): ShardingMessageExtractor[A, A] = + extractEntityId: A ⇒ String): ShardingMessageExtractor[A, A] = new HashCodeNoEnvelopeMessageExtractor[A](maxNumberOfShards) { // TODO catch MatchError here and return null for those to yield an "unhandled" when partial functions are used? def entityId(message: A) = extractEntityId(message) @@ -143,6 +143,28 @@ abstract class HashCodeNoEnvelopeMessageExtractor[A](maxNumberOfShards: Int) ext override def toString = s"HashCodeNoEnvelopeMessageExtractor($maxNumberOfShards)" } +/** + * The key of an entity type, the `name` must be unique. + */ +abstract class EntityTypeKey[T] { + def name: String +} + +object EntityTypeKey { + /** + * Scala API: Creates an `EntityTypeKey`. The `name` must be unique. + */ + def apply[T](name: String)(implicit tTag: ClassTag[T]): EntityTypeKey[T] = + AdaptedClusterShardingImpl.EntityTypeKeyImpl(name, implicitly[ClassTag[T]].runtimeClass.getName) + + /** + * Java API: Creates an `EntityTypeKey`. The `name` must be unique. + */ + def create[T](messageClass: Class[T], name: String): EntityTypeKey[T] = + AdaptedClusterShardingImpl.EntityTypeKeyImpl(name, messageClass.getName) + +} + object ClusterSharding extends ExtensionId[ClusterSharding] { override def createExtension(system: ActorSystem[_]): ClusterSharding = @@ -152,6 +174,15 @@ object ClusterSharding extends ExtensionId[ClusterSharding] { def get(system: ActorSystem[_]): ClusterSharding = apply(system) } +/** + * INTERNAL API + */ +@InternalApi private[akka] object AdaptedClusterShardingImpl { + final case class EntityTypeKeyImpl[T](name: String, messageClassName: String) extends EntityTypeKey[T] { + override def toString: String = s"EntityTypeKey[$messageClassName]($name)" + } +} + /** INTERNAL API */ @InternalApi final class AdaptedClusterShardingImpl(system: ActorSystem[_]) extends ClusterSharding { @@ -165,27 +196,27 @@ final class AdaptedClusterShardingImpl(system: ActorSystem[_]) extends ClusterSh override def spawn[A]( behavior: Behavior[A], entityProps: Props, - typeName: String, + typeKey: EntityTypeKey[A], settings: ClusterShardingSettings, maxNumberOfShards: Int, handOffStopMessage: A): ActorRef[ShardingEnvelope[A]] = { val extractor = new HashCodeMessageExtractor[A](maxNumberOfShards) - spawn(behavior, entityProps, typeName, settings, extractor, defaultShardAllocationStrategy(settings), handOffStopMessage) + spawn(behavior, entityProps, typeKey, settings, extractor, defaultShardAllocationStrategy(settings), handOffStopMessage) } override def spawn[E, A]( behavior: Behavior[A], entityProps: Props, - typeName: String, + typeKey: EntityTypeKey[A], settings: ClusterShardingSettings, messageExtractor: ShardingMessageExtractor[E, A], handOffStopMessage: A): ActorRef[E] = - spawn(behavior, entityProps, typeName, settings, messageExtractor, defaultShardAllocationStrategy(settings), handOffStopMessage) + spawn(behavior, entityProps, typeKey, settings, messageExtractor, defaultShardAllocationStrategy(settings), handOffStopMessage) override def spawn[E, A]( behavior: Behavior[A], entityProps: Props, - typeName: String, + typeKey: EntityTypeKey[A], settings: ClusterShardingSettings, extractor: ShardingMessageExtractor[E, A], allocationStrategy: ShardAllocationStrategy, @@ -197,42 +228,37 @@ final class AdaptedClusterShardingImpl(system: ActorSystem[_]) extends ClusterSh if (settings.shouldHostShard(cluster)) { system.log.info("Starting Shard Region [{}]...") untypedSharding.start( - typeName, + typeKey.name, PropsAdapter(behavior, entityProps), untypedSettings, extractor, extractor, defaultShardAllocationStrategy(settings), - handOffStopMessage - ) + handOffStopMessage) } else { system.log.info("Starting Shard Region Proxy [{}] (no actors will be hosted on this node)...") untypedSharding.startProxy( - typeName, + typeKey.name, settings.role, dataCenter = None, // TODO what about the multi-dc value here? extractShardId = extractor, - extractEntityId = extractor - ) + extractEntityId = extractor) } ActorRefAdapter(ref) } - override def entityRefFor[A](typeName: String, entityId: String): EntityRef[A] = { - new AdaptedEntityRefImpl[A](untypedSharding.shardRegion(typeName), entityId) + override def entityRefFor[A](typeKey: EntityTypeKey[A], entityId: String): EntityRef[A] = { + new AdaptedEntityRefImpl[A](untypedSharding.shardRegion(typeKey.name), entityId) } - override def getEntityRefFor[A](msgClass: Class[A], typeName: String, entityId: String): EntityRef[A] = - entityRefFor[A](typeName, entityId) - override def defaultShardAllocationStrategy(settings: ClusterShardingSettings): ShardAllocationStrategy = { val threshold = settings.tuningParameters.leastShardAllocationRebalanceThreshold val maxSimultaneousRebalance = settings.tuningParameters.leastShardAllocationMaxSimultaneousRebalance new LeastShardAllocationStrategy(threshold, maxSimultaneousRebalance) } - // --- extractor conversions --- + // --- extractor conversions --- @InternalApi private implicit def convertExtractEntityId[E, A](extractor: ShardingMessageExtractor[E, A]): UntypedShardRegion.ExtractEntityId = { // TODO what if msg was null @@ -259,16 +285,16 @@ sealed trait ClusterSharding extends Extension { * [[akka.cluster.sharding.ShardCoordinator.LeastShardAllocationStrategy]] will be used for shard allocation strategy. * * @param behavior The behavior for entities - * @param typeName A name that uniquely identifies the type of entity in this cluster + * @param typeKey A key that uniquely identifies the type of entity in this cluster * @param handOffStopMessage Message sent to an entity to tell it to stop * @tparam A The type of command the entity accepts */ - // TODO: FYI, I think it would be very good to have rule that "behavior, otherstuff" + // TODO: FYI, I think it would be very good to have rule that "behavior, otherstuff" // TODO: or "behavior, props, otherstuff" be the consistent style we want to promote in parameter ordering, WDYT? def spawn[A]( behavior: Behavior[A], props: Props, - typeName: String, + typeKey: EntityTypeKey[A], settings: ClusterShardingSettings, maxNumberOfShards: Int, handOffStopMessage: A): ActorRef[ShardingEnvelope[A]] @@ -277,7 +303,7 @@ sealed trait ClusterSharding extends Extension { * Spawn a shard region or a proxy depending on if the settings require role and if this node has such a role. * * @param behavior The behavior for entities - * @param typeName A name that uniquely identifies the type of entity in this cluster + * @param typeKey A key that uniquely identifies the type of entity in this cluster * @param entityProps Props to apply when starting an entity * @param allocationStrategy Allocation strategy which decides on which nodes to allocate new shards * @param handOffStopMessage Message sent to an entity to tell it to stop @@ -287,18 +313,17 @@ sealed trait ClusterSharding extends Extension { def spawn[E, A]( behavior: Behavior[A], entityProps: Props, - typeName: String, + typeKey: EntityTypeKey[A], settings: ClusterShardingSettings, messageExtractor: ShardingMessageExtractor[E, A], allocationStrategy: ShardAllocationStrategy, - handOffStopMessage: A - ): ActorRef[E] + handOffStopMessage: A): ActorRef[E] /** * Spawn a shard region or a proxy depending on if the settings require role and if this node has such a role. * * @param behavior The behavior for entities - * @param typeName A name that uniquely identifies the type of entity in this cluster + * @param typeKey A key that uniquely identifies the type of entity in this cluster * @param entityProps Props to apply when starting an entity * @param handOffStopMessage Message sent to an entity to tell it to stop * @tparam E A possible envelope around the message the entity accepts @@ -307,11 +332,10 @@ sealed trait ClusterSharding extends Extension { def spawn[E, A]( behavior: Behavior[A], entityProps: Props, - typeName: String, + typeKey: EntityTypeKey[A], settings: ClusterShardingSettings, messageExtractor: ShardingMessageExtractor[E, A], - handOffStopMessage: A - ): ActorRef[E] + handOffStopMessage: A): ActorRef[E] /** * Create an `ActorRef`-like reference to a specific sharded entity. @@ -324,20 +348,7 @@ sealed trait ClusterSharding extends Extension { * * For in-depth documentation of its semantics, see [[EntityRef]]. */ - def entityRefFor[A](typeName: String, entityId: String): EntityRef[A] - - /** - * Java API: Create an `ActorRef`-like reference to a specific sharded entity. - * Messages sent to it will be wrapped in a [[ShardingEnvelope]] and passed to the local shard region or proxy. - * - * Messages sent through this [[EntityRef]] will be wrapped in a [[ShardingEnvelope]] including the - * here provided `entityId`. - * - * FIXME a more typed version of this API will be explored in https://github.com/akka/akka/issues/23690 - * - * For in-depth documentation of its semantics, see [[EntityRef]]. - */ - def getEntityRefFor[A](msgClass: Class[A], typeName: String, entityId: String): EntityRef[A] + def entityRefFor[A](typeKey: EntityTypeKey[A], entityId: String): EntityRef[A] /** The default ShardAllocationStrategy currently is [[LeastShardAllocationStrategy]] however could be changed in the future. */ def defaultShardAllocationStrategy(settings: ClusterShardingSettings): ShardAllocationStrategy diff --git a/akka-typed/src/main/scala/akka/typed/internal/receptionist/ReceptionistImpl.scala b/akka-typed/src/main/scala/akka/typed/internal/receptionist/ReceptionistImpl.scala index 4adbd3aae5..4db3331abf 100644 --- a/akka-typed/src/main/scala/akka/typed/internal/receptionist/ReceptionistImpl.scala +++ b/akka-typed/src/main/scala/akka/typed/internal/receptionist/ReceptionistImpl.scala @@ -30,8 +30,8 @@ private[typed] trait ReceptionistBehaviorProvider { @InternalApi private[typed] object ReceptionistImpl extends ReceptionistBehaviorProvider { // FIXME: make sure to provide serializer - case class DefaultServiceKey[T](id: String)(implicit tTag: ClassTag[T]) extends ServiceKey[T] { - override def toString: String = s"ServiceKey[$tTag]($id)" + final case class DefaultServiceKey[T](id: String, typeName: String) extends ServiceKey[T] { + override def toString: String = s"ServiceKey[$typeName]($id)" } /** diff --git a/akka-typed/src/main/scala/akka/typed/receptionist/Receptionist.scala b/akka-typed/src/main/scala/akka/typed/receptionist/Receptionist.scala index 14bdc25c93..b90d3c57a2 100644 --- a/akka-typed/src/main/scala/akka/typed/receptionist/Receptionist.scala +++ b/akka-typed/src/main/scala/akka/typed/receptionist/Receptionist.scala @@ -40,8 +40,7 @@ class Receptionist(system: ActorSystem[_]) extends Extension { // FIXME: where should that timeout be configured? Shouldn't there be a better `Extension` // implementation that does this dance for us? - 10.seconds - )) + 10.seconds)) } } @@ -84,10 +83,17 @@ object Receptionist extends ExtensionId[Receptionist] { object ServiceKey { /** - * Creates a service key. The given ID should uniquely define a service with a given protocol. + * Scala API: Creates a service key. The given ID should uniquely define a service with a given protocol. */ - // FIXME: not sure if the ClassTag pulls its weight. It's only used in toString currently. - def apply[T](id: String)(implicit tTag: ClassTag[T]): ServiceKey[T] = ReceptionistImpl.DefaultServiceKey(id) + def apply[T](id: String)(implicit tTag: ClassTag[T]): ServiceKey[T] = + ReceptionistImpl.DefaultServiceKey(id, implicitly[ClassTag[T]].runtimeClass.getName) + + /** + * Java API: Creates a service key. The given ID should uniquely define a service with a given protocol. + */ + def create[T](clazz: Class[T], id: String): ServiceKey[T] = + ReceptionistImpl.DefaultServiceKey(id, clazz.getName) + } /** Internal superclass for external and internal commands */ diff --git a/build.sbt b/build.sbt index f984f9e855..8786a25226 100644 --- a/build.sbt +++ b/build.sbt @@ -160,6 +160,7 @@ lazy val streamTestsTck = akkaModule("akka-stream-tests-tck") lazy val typed = akkaModule("akka-typed") .dependsOn( testkit % "compile->compile;test->test", + persistence % "provided->compile", cluster % "provided->compile", clusterTools % "provided->compile", clusterSharding % "provided->compile", @@ -169,11 +170,11 @@ lazy val typedTests = akkaModule("akka-typed-tests") .dependsOn(typed, typedTestkit % "compile->compile;test->provided;test->test") // the provided dependencies .dependsOn( + persistence % "compile->compile;test->test", cluster % "test->test", clusterTools, clusterSharding, - distributedData, - persistence % "compile->compile;test->test") + distributedData) lazy val typedTestkit = akkaModule("akka-typed-testkit") .dependsOn(typed, testkit % "compile->compile;test->test")