EntityTypeKey in typed Cluster Sharding, #23690

This commit is contained in:
Patrik Nordwall 2017-09-22 15:41:20 +02:00
parent 386289ee70
commit 0a5181c1ce
5 changed files with 80 additions and 62 deletions

View file

@ -62,6 +62,7 @@ class ClusterShardingSpec extends TypedSpec(ClusterShardingSpec.config) with Sca
implicit val untypedSystem = system.toUntyped
private val untypedCluster = akka.cluster.Cluster(untypedSystem)
val typeKey = EntityTypeKey[TestProtocol]("envelope-shard")
val behavior = Actor.immutable[TestProtocol] {
case (_, StopPlz())
Actor.stopped
@ -74,6 +75,8 @@ class ClusterShardingSpec extends TypedSpec(ClusterShardingSpec.config) with Sca
toMe ! "Hello!"
Actor.same
}
val typeKey2 = EntityTypeKey[IdTestProtocol]("no-envelope-shard")
val behaviorWithId = Actor.immutable[IdTestProtocol] {
case (_, IdStopPlz(_))
Actor.stopped
@ -95,7 +98,7 @@ class ClusterShardingSpec extends TypedSpec(ClusterShardingSpec.config) with Sca
val ref = sharding.spawn(
behavior,
Props.empty,
"envelope-shard",
typeKey,
ClusterShardingSettings(system),
10,
StopPlz())
@ -110,7 +113,7 @@ class ClusterShardingSpec extends TypedSpec(ClusterShardingSpec.config) with Sca
val ref = sharding.spawn(
behaviorWithId,
Props.empty,
"no-envelope-shard",
typeKey2,
ClusterShardingSettings(system),
ShardingMessageExtractor.noEnvelope[IdTestProtocol](10, _.id),
IdStopPlz("THE_ID_HERE"))
@ -140,8 +143,7 @@ class ClusterShardingSpec extends TypedSpec(ClusterShardingSpec.config) with Sca
untypedCluster.join(untypedCluster.selfAddress)
def `11 EntityRef - tell`(): Unit = {
val charlieRef: EntityRef[TestProtocol] =
sharding.entityRefFor[TestProtocol]("envelope-shard", "charlie")
val charlieRef = sharding.entityRefFor(typeKey, "charlie")
val p = TestProbe[String]()
@ -154,11 +156,9 @@ class ClusterShardingSpec extends TypedSpec(ClusterShardingSpec.config) with Sca
charlieRef ! StopPlz()
}
def `11 EntityRef - ask`(): Unit = {
val bobRef: EntityRef[TestProtocol] =
sharding.entityRefFor[TestProtocol]("envelope-shard", "bob")
val charlieRef: EntityRef[TestProtocol] =
sharding.entityRefFor[TestProtocol]("envelope-shard", "charlie")
def `12 EntityRef - ask`(): Unit = {
val bobRef = sharding.entityRefFor(typeKey, "bob")
val charlieRef = sharding.entityRefFor(typeKey, "charlie")
val p = TestProbe[String]()

View file

@ -12,6 +12,7 @@ import akka.typed.scaladsl.adapter.PropsAdapter
import akka.typed.{ ActorRef, ActorSystem, Behavior, Extension, ExtensionId, Props }
import scala.language.implicitConversions
import scala.reflect.ClassTag
/**
* Default envelope type that may be used with Cluster Sharding.
@ -63,8 +64,7 @@ object ShardingMessageExtractor {
*/
def noEnvelope[A](
maxNumberOfShards: Int,
extractEntityId: A String
): ShardingMessageExtractor[A, A] =
extractEntityId: A String): ShardingMessageExtractor[A, A] =
new HashCodeNoEnvelopeMessageExtractor[A](maxNumberOfShards) {
// TODO catch MatchError here and return null for those to yield an "unhandled" when partial functions are used?
def entityId(message: A) = extractEntityId(message)
@ -143,6 +143,28 @@ abstract class HashCodeNoEnvelopeMessageExtractor[A](maxNumberOfShards: Int) ext
override def toString = s"HashCodeNoEnvelopeMessageExtractor($maxNumberOfShards)"
}
/**
* The key of an entity type, the `name` must be unique.
*/
abstract class EntityTypeKey[T] {
def name: String
}
object EntityTypeKey {
/**
* Scala API: Creates an `EntityTypeKey`. The `name` must be unique.
*/
def apply[T](name: String)(implicit tTag: ClassTag[T]): EntityTypeKey[T] =
AdaptedClusterShardingImpl.EntityTypeKeyImpl(name, implicitly[ClassTag[T]].runtimeClass.getName)
/**
* Java API: Creates an `EntityTypeKey`. The `name` must be unique.
*/
def create[T](messageClass: Class[T], name: String): EntityTypeKey[T] =
AdaptedClusterShardingImpl.EntityTypeKeyImpl(name, messageClass.getName)
}
object ClusterSharding extends ExtensionId[ClusterSharding] {
override def createExtension(system: ActorSystem[_]): ClusterSharding =
@ -152,6 +174,15 @@ object ClusterSharding extends ExtensionId[ClusterSharding] {
def get(system: ActorSystem[_]): ClusterSharding = apply(system)
}
/**
* INTERNAL API
*/
@InternalApi private[akka] object AdaptedClusterShardingImpl {
final case class EntityTypeKeyImpl[T](name: String, messageClassName: String) extends EntityTypeKey[T] {
override def toString: String = s"EntityTypeKey[$messageClassName]($name)"
}
}
/** INTERNAL API */
@InternalApi
final class AdaptedClusterShardingImpl(system: ActorSystem[_]) extends ClusterSharding {
@ -165,27 +196,27 @@ final class AdaptedClusterShardingImpl(system: ActorSystem[_]) extends ClusterSh
override def spawn[A](
behavior: Behavior[A],
entityProps: Props,
typeName: String,
typeKey: EntityTypeKey[A],
settings: ClusterShardingSettings,
maxNumberOfShards: Int,
handOffStopMessage: A): ActorRef[ShardingEnvelope[A]] = {
val extractor = new HashCodeMessageExtractor[A](maxNumberOfShards)
spawn(behavior, entityProps, typeName, settings, extractor, defaultShardAllocationStrategy(settings), handOffStopMessage)
spawn(behavior, entityProps, typeKey, settings, extractor, defaultShardAllocationStrategy(settings), handOffStopMessage)
}
override def spawn[E, A](
behavior: Behavior[A],
entityProps: Props,
typeName: String,
typeKey: EntityTypeKey[A],
settings: ClusterShardingSettings,
messageExtractor: ShardingMessageExtractor[E, A],
handOffStopMessage: A): ActorRef[E] =
spawn(behavior, entityProps, typeName, settings, messageExtractor, defaultShardAllocationStrategy(settings), handOffStopMessage)
spawn(behavior, entityProps, typeKey, settings, messageExtractor, defaultShardAllocationStrategy(settings), handOffStopMessage)
override def spawn[E, A](
behavior: Behavior[A],
entityProps: Props,
typeName: String,
typeKey: EntityTypeKey[A],
settings: ClusterShardingSettings,
extractor: ShardingMessageExtractor[E, A],
allocationStrategy: ShardAllocationStrategy,
@ -197,35 +228,30 @@ final class AdaptedClusterShardingImpl(system: ActorSystem[_]) extends ClusterSh
if (settings.shouldHostShard(cluster)) {
system.log.info("Starting Shard Region [{}]...")
untypedSharding.start(
typeName,
typeKey.name,
PropsAdapter(behavior, entityProps),
untypedSettings,
extractor, extractor,
defaultShardAllocationStrategy(settings),
handOffStopMessage
)
handOffStopMessage)
} else {
system.log.info("Starting Shard Region Proxy [{}] (no actors will be hosted on this node)...")
untypedSharding.startProxy(
typeName,
typeKey.name,
settings.role,
dataCenter = None, // TODO what about the multi-dc value here?
extractShardId = extractor,
extractEntityId = extractor
)
extractEntityId = extractor)
}
ActorRefAdapter(ref)
}
override def entityRefFor[A](typeName: String, entityId: String): EntityRef[A] = {
new AdaptedEntityRefImpl[A](untypedSharding.shardRegion(typeName), entityId)
override def entityRefFor[A](typeKey: EntityTypeKey[A], entityId: String): EntityRef[A] = {
new AdaptedEntityRefImpl[A](untypedSharding.shardRegion(typeKey.name), entityId)
}
override def getEntityRefFor[A](msgClass: Class[A], typeName: String, entityId: String): EntityRef[A] =
entityRefFor[A](typeName, entityId)
override def defaultShardAllocationStrategy(settings: ClusterShardingSettings): ShardAllocationStrategy = {
val threshold = settings.tuningParameters.leastShardAllocationRebalanceThreshold
val maxSimultaneousRebalance = settings.tuningParameters.leastShardAllocationMaxSimultaneousRebalance
@ -259,7 +285,7 @@ sealed trait ClusterSharding extends Extension {
* [[akka.cluster.sharding.ShardCoordinator.LeastShardAllocationStrategy]] will be used for shard allocation strategy.
*
* @param behavior The behavior for entities
* @param typeName A name that uniquely identifies the type of entity in this cluster
* @param typeKey A key that uniquely identifies the type of entity in this cluster
* @param handOffStopMessage Message sent to an entity to tell it to stop
* @tparam A The type of command the entity accepts
*/
@ -268,7 +294,7 @@ sealed trait ClusterSharding extends Extension {
def spawn[A](
behavior: Behavior[A],
props: Props,
typeName: String,
typeKey: EntityTypeKey[A],
settings: ClusterShardingSettings,
maxNumberOfShards: Int,
handOffStopMessage: A): ActorRef[ShardingEnvelope[A]]
@ -277,7 +303,7 @@ sealed trait ClusterSharding extends Extension {
* Spawn a shard region or a proxy depending on if the settings require role and if this node has such a role.
*
* @param behavior The behavior for entities
* @param typeName A name that uniquely identifies the type of entity in this cluster
* @param typeKey A key that uniquely identifies the type of entity in this cluster
* @param entityProps Props to apply when starting an entity
* @param allocationStrategy Allocation strategy which decides on which nodes to allocate new shards
* @param handOffStopMessage Message sent to an entity to tell it to stop
@ -287,18 +313,17 @@ sealed trait ClusterSharding extends Extension {
def spawn[E, A](
behavior: Behavior[A],
entityProps: Props,
typeName: String,
typeKey: EntityTypeKey[A],
settings: ClusterShardingSettings,
messageExtractor: ShardingMessageExtractor[E, A],
allocationStrategy: ShardAllocationStrategy,
handOffStopMessage: A
): ActorRef[E]
handOffStopMessage: A): ActorRef[E]
/**
* Spawn a shard region or a proxy depending on if the settings require role and if this node has such a role.
*
* @param behavior The behavior for entities
* @param typeName A name that uniquely identifies the type of entity in this cluster
* @param typeKey A key that uniquely identifies the type of entity in this cluster
* @param entityProps Props to apply when starting an entity
* @param handOffStopMessage Message sent to an entity to tell it to stop
* @tparam E A possible envelope around the message the entity accepts
@ -307,11 +332,10 @@ sealed trait ClusterSharding extends Extension {
def spawn[E, A](
behavior: Behavior[A],
entityProps: Props,
typeName: String,
typeKey: EntityTypeKey[A],
settings: ClusterShardingSettings,
messageExtractor: ShardingMessageExtractor[E, A],
handOffStopMessage: A
): ActorRef[E]
handOffStopMessage: A): ActorRef[E]
/**
* Create an `ActorRef`-like reference to a specific sharded entity.
@ -324,20 +348,7 @@ sealed trait ClusterSharding extends Extension {
*
* For in-depth documentation of its semantics, see [[EntityRef]].
*/
def entityRefFor[A](typeName: String, entityId: String): EntityRef[A]
/**
* Java API: Create an `ActorRef`-like reference to a specific sharded entity.
* Messages sent to it will be wrapped in a [[ShardingEnvelope]] and passed to the local shard region or proxy.
*
* Messages sent through this [[EntityRef]] will be wrapped in a [[ShardingEnvelope]] including the
* here provided `entityId`.
*
* FIXME a more typed version of this API will be explored in https://github.com/akka/akka/issues/23690
*
* For in-depth documentation of its semantics, see [[EntityRef]].
*/
def getEntityRefFor[A](msgClass: Class[A], typeName: String, entityId: String): EntityRef[A]
def entityRefFor[A](typeKey: EntityTypeKey[A], entityId: String): EntityRef[A]
/** The default ShardAllocationStrategy currently is [[LeastShardAllocationStrategy]] however could be changed in the future. */
def defaultShardAllocationStrategy(settings: ClusterShardingSettings): ShardAllocationStrategy

View file

@ -30,8 +30,8 @@ private[typed] trait ReceptionistBehaviorProvider {
@InternalApi
private[typed] object ReceptionistImpl extends ReceptionistBehaviorProvider {
// FIXME: make sure to provide serializer
case class DefaultServiceKey[T](id: String)(implicit tTag: ClassTag[T]) extends ServiceKey[T] {
override def toString: String = s"ServiceKey[$tTag]($id)"
final case class DefaultServiceKey[T](id: String, typeName: String) extends ServiceKey[T] {
override def toString: String = s"ServiceKey[$typeName]($id)"
}
/**

View file

@ -40,8 +40,7 @@ class Receptionist(system: ActorSystem[_]) extends Extension {
// FIXME: where should that timeout be configured? Shouldn't there be a better `Extension`
// implementation that does this dance for us?
10.seconds
))
10.seconds))
}
}
@ -84,10 +83,17 @@ object Receptionist extends ExtensionId[Receptionist] {
object ServiceKey {
/**
* Creates a service key. The given ID should uniquely define a service with a given protocol.
* Scala API: Creates a service key. The given ID should uniquely define a service with a given protocol.
*/
// FIXME: not sure if the ClassTag pulls its weight. It's only used in toString currently.
def apply[T](id: String)(implicit tTag: ClassTag[T]): ServiceKey[T] = ReceptionistImpl.DefaultServiceKey(id)
def apply[T](id: String)(implicit tTag: ClassTag[T]): ServiceKey[T] =
ReceptionistImpl.DefaultServiceKey(id, implicitly[ClassTag[T]].runtimeClass.getName)
/**
* Java API: Creates a service key. The given ID should uniquely define a service with a given protocol.
*/
def create[T](clazz: Class[T], id: String): ServiceKey[T] =
ReceptionistImpl.DefaultServiceKey(id, clazz.getName)
}
/** Internal superclass for external and internal commands */

View file

@ -160,6 +160,7 @@ lazy val streamTestsTck = akkaModule("akka-stream-tests-tck")
lazy val typed = akkaModule("akka-typed")
.dependsOn(
testkit % "compile->compile;test->test",
persistence % "provided->compile",
cluster % "provided->compile",
clusterTools % "provided->compile",
clusterSharding % "provided->compile",
@ -169,11 +170,11 @@ lazy val typedTests = akkaModule("akka-typed-tests")
.dependsOn(typed, typedTestkit % "compile->compile;test->provided;test->test")
// the provided dependencies
.dependsOn(
persistence % "compile->compile;test->test",
cluster % "test->test",
clusterTools,
clusterSharding,
distributedData,
persistence % "compile->compile;test->test")
distributedData)
lazy val typedTestkit = akkaModule("akka-typed-testkit")
.dependsOn(typed, testkit % "compile->compile;test->test")