entityId => Behavior in ClusterSharding API, #24470

* spawn with String => Behavior since the entityId is often needed
* some type inference is lost, and completely breaks down with overloads
This commit is contained in:
Patrik Nordwall 2018-02-01 12:54:29 +01:00
parent 0cc9785d73
commit 2cd1187e7b
10 changed files with 182 additions and 61 deletions

View file

@ -249,26 +249,45 @@ final class AdaptedClusterShardingImpl(system: ActorSystem[_]) extends ClusterSh
private val log: LoggingAdapter = Logging(untypedSystem, classOf[ClusterSharding])
override def spawn[A](
behavior: Behavior[A],
behavior: String Behavior[A],
entityProps: Props,
typeKey: EntityTypeKey[A],
settings: ClusterShardingSettings,
maxNumberOfShards: Int,
handOffStopMessage: A): ActorRef[ShardingEnvelope[A]] = {
val extractor = new HashCodeMessageExtractor[A](maxNumberOfShards, handOffStopMessage)
spawn(behavior, entityProps, typeKey, settings, extractor, defaultShardAllocationStrategy(settings))
spawn2(behavior, entityProps, typeKey, settings, extractor, defaultShardAllocationStrategy(settings))
}
override def spawn[E, A](
behavior: Behavior[A],
override def spawnJavadsl[A](
behavior: EntityIdToBehavior[A],
entityProps: Props,
typeKey: EntityTypeKey[A],
settings: ClusterShardingSettings,
maxNumberOfShards: Int,
handOffStopMessage: A): ActorRef[ShardingEnvelope[A]] = {
val extractor = new HashCodeMessageExtractor[A](maxNumberOfShards, handOffStopMessage)
spawnJavadsl(behavior, entityProps, typeKey, settings, extractor, defaultShardAllocationStrategy(settings))
}
override def spawn3[E, A](
behavior: String Behavior[A],
entityProps: Props,
typeKey: EntityTypeKey[A],
settings: ClusterShardingSettings,
messageExtractor: ShardingMessageExtractor[E, A]): ActorRef[E] =
spawn(behavior, entityProps, typeKey, settings, messageExtractor, defaultShardAllocationStrategy(settings))
spawn2(behavior, entityProps, typeKey, settings, messageExtractor, defaultShardAllocationStrategy(settings))
override def spawn[E, A](
behavior: Behavior[A],
override def spawnJavadsl[E, A](
behavior: EntityIdToBehavior[A],
entityProps: Props,
typeKey: EntityTypeKey[A],
settings: ClusterShardingSettings,
messageExtractor: ShardingMessageExtractor[E, A]): ActorRef[E] =
spawnJavadsl(behavior, entityProps, typeKey, settings, messageExtractor, defaultShardAllocationStrategy(settings))
override def spawn2[E, A](
behavior: String Behavior[A],
entityProps: Props,
typeKey: EntityTypeKey[A],
settings: ClusterShardingSettings,
@ -294,14 +313,16 @@ final class AdaptedClusterShardingImpl(system: ActorSystem[_]) extends ClusterSh
if (settings.shouldHostShard(cluster)) {
log.info("Starting Shard Region [{}]...", typeKey.name)
val untypedProps = behavior match {
val untypedEntityPropsFactory: String akka.actor.Props = { entityId
behavior(entityId) match {
case u: UntypedBehavior[_] u.untypedProps // PersistentBehavior
case _ PropsAdapter(behavior, entityProps)
case b PropsAdapter(b, entityProps)
}
}
untypedSharding.start(
untypedSharding.internalStart(
typeKey.name,
untypedProps,
untypedEntityPropsFactory,
untypedSettings,
extractEntityId,
extractShardId,
@ -321,6 +342,16 @@ final class AdaptedClusterShardingImpl(system: ActorSystem[_]) extends ClusterSh
ActorRefAdapter(ref)
}
override def spawnJavadsl[E, A](
behavior: EntityIdToBehavior[A],
entityProps: Props,
typeKey: EntityTypeKey[A],
settings: ClusterShardingSettings,
extractor: ShardingMessageExtractor[E, A],
allocationStrategy: ShardAllocationStrategy): ActorRef[E] = {
spawn2(entityId behavior.apply(entityId), entityProps, typeKey, settings, extractor, allocationStrategy)
}
override def entityRefFor[A](typeKey: EntityTypeKey[A], entityId: String): EntityRef[A] = {
new AdaptedEntityRefImpl[A](untypedSharding.shardRegion(typeKey.name), entityId)
}
@ -333,24 +364,30 @@ final class AdaptedClusterShardingImpl(system: ActorSystem[_]) extends ClusterSh
}
@FunctionalInterface
trait EntityIdToBehavior[A] {
def apply(entityId: String): Behavior[A]
}
@DoNotInherit
sealed abstract class ClusterSharding extends Extension {
/**
* Spawn a shard region or a proxy depending on if the settings require role and if this node has such a role.
* Scala API: Spawn a shard region or a proxy depending on if the settings require role and if this node has
* such a role.
*
* Messages are sent to the entities by wrapping the messages in a [[ShardingEnvelope]] with the entityId of the
* recipient actor.
* A [[HashCodeMessageExtractor]] will be used for extracting entityId and shardId
* [[akka.cluster.sharding.ShardCoordinator.LeastShardAllocationStrategy]] will be used for shard allocation strategy.
*
* @param behavior The behavior for entities
* @param behavior Create the behavior for an entity given a entityId
* @param typeKey A key that uniquely identifies the type of entity in this cluster
* @param handOffStopMessage Message sent to an entity to tell it to stop, e.g. when rebalanced.
* @tparam A The type of command the entity accepts
*/
def spawn[A](
behavior: Behavior[A],
behavior: String Behavior[A],
props: Props,
typeKey: EntityTypeKey[A],
settings: ClusterShardingSettings,
@ -358,9 +395,32 @@ sealed abstract class ClusterSharding extends Extension {
handOffStopMessage: A): ActorRef[ShardingEnvelope[A]]
/**
* Spawn a shard region or a proxy depending on if the settings require role and if this node has such a role.
* Java API: Spawn a shard region or a proxy depending on if the settings require role and if this node has
* such a role.
*
* @param behavior The behavior for entities
* Messages are sent to the entities by wrapping the messages in a [[ShardingEnvelope]] with the entityId of the
* recipient actor.
* A [[HashCodeMessageExtractor]] will be used for extracting entityId and shardId
* [[akka.cluster.sharding.ShardCoordinator.LeastShardAllocationStrategy]] will be used for shard allocation strategy.
*
* @param behavior Create the behavior for an entity given a entityId
* @param typeKey A key that uniquely identifies the type of entity in this cluster
* @param handOffStopMessage Message sent to an entity to tell it to stop, e.g. when rebalanced.
* @tparam A The type of command the entity accepts
*/
def spawnJavadsl[A]( // FIXME javadsl package
behavior: EntityIdToBehavior[A],
props: Props,
typeKey: EntityTypeKey[A],
settings: ClusterShardingSettings,
maxNumberOfShards: Int,
handOffStopMessage: A): ActorRef[ShardingEnvelope[A]]
/**
* Scala API: Spawn a shard region or a proxy depending on if the settings require role and if this node
* has such a role.
*
* @param behavior Create the behavior for an entity given a entityId
* @param typeKey A key that uniquely identifies the type of entity in this cluster
* @param entityProps Props to apply when starting an entity
* @param messageExtractor Extract entityId, shardId, and unwrap messages.
@ -368,8 +428,8 @@ sealed abstract class ClusterSharding extends Extension {
* @tparam E A possible envelope around the message the entity accepts
* @tparam A The type of command the entity accepts
*/
def spawn[E, A](
behavior: Behavior[A],
def spawn2[E, A](
behavior: String Behavior[A],
entityProps: Props,
typeKey: EntityTypeKey[A],
settings: ClusterShardingSettings,
@ -377,17 +437,56 @@ sealed abstract class ClusterSharding extends Extension {
allocationStrategy: ShardAllocationStrategy): ActorRef[E]
/**
* Spawn a shard region or a proxy depending on if the settings require role and if this node has such a role.
* Java API: Spawn a shard region or a proxy depending on if the settings require role and if this node
* has such a role.
*
* @param behavior The behavior for entities
* @param behavior Create the behavior for an entity given a entityId
* @param typeKey A key that uniquely identifies the type of entity in this cluster
* @param entityProps Props to apply when starting an entity
* @param messageExtractor Extract entityId, shardId, and unwrap messages.
* @param allocationStrategy Allocation strategy which decides on which nodes to allocate new shards
* @tparam E A possible envelope around the message the entity accepts
* @tparam A The type of command the entity accepts
*/
def spawnJavadsl[E, A]( // FIXME javadsl package
behavior: EntityIdToBehavior[A],
entityProps: Props,
typeKey: EntityTypeKey[A],
settings: ClusterShardingSettings,
messageExtractor: ShardingMessageExtractor[E, A],
allocationStrategy: ShardAllocationStrategy): ActorRef[E]
/**
* Scala API: Spawn a shard region or a proxy depending on if the settings require role and if this node
* has such a role.
*
* @param behavior Create the behavior for an entity given a entityId
* @param typeKey A key that uniquely identifies the type of entity in this cluster
* @param entityProps Props to apply when starting an entity
* @param messageExtractor Extract entityId, shardId, and unwrap messages.
* @tparam E A possible envelope around the message the entity accepts
* @tparam A The type of command the entity accepts
*/
def spawn[E, A](
behavior: Behavior[A],
def spawn3[E, A](
behavior: String Behavior[A],
entityProps: Props,
typeKey: EntityTypeKey[A],
settings: ClusterShardingSettings,
messageExtractor: ShardingMessageExtractor[E, A]): ActorRef[E]
/**
* Java API: Spawn a shard region or a proxy depending on if the settings require role and if this node
* has such a role.
*
* @param behavior Create the behavior for an entity given a entityId
* @param typeKey A key that uniquely identifies the type of entity in this cluster
* @param entityProps Props to apply when starting an entity
* @param messageExtractor Extract entityId, shardId, and unwrap messages.
* @tparam E A possible envelope around the message the entity accepts
* @tparam A The type of command the entity accepts
*/
def spawnJavadsl[E, A]( // FIXME javadsl package
behavior: EntityIdToBehavior[A],
entityProps: Props,
typeKey: EntityTypeKey[A],
settings: ClusterShardingSettings,

View file

@ -24,10 +24,10 @@ public class ShardingCompileOnlyTest {
}
}
public static Behavior<CounterCommand> counter(Integer value) {
public static Behavior<CounterCommand> counter(String entityId, Integer value) {
return Behaviors.immutable(CounterCommand.class)
.onMessage(Increment.class, (ctx, msg) -> {
return counter(value + 1);
return counter(entityId,value + 1);
})
.onMessage(GetValue.class, (ctx, msg) -> {
msg.replyTo.tell(value);
@ -49,8 +49,8 @@ public class ShardingCompileOnlyTest {
//#spawn
EntityTypeKey<CounterCommand> typeKey = EntityTypeKey.create(CounterCommand.class, "Counter");
ActorRef<ShardingEnvelope<CounterCommand>> shardRegion = sharding.spawn(
counter(0),
ActorRef<ShardingEnvelope<CounterCommand>> shardRegion = sharding.spawnJavadsl(
entityId -> counter(entityId,0),
Props.empty(),
typeKey,
ClusterShardingSettings.create(system),
@ -69,7 +69,7 @@ public class ShardingCompileOnlyTest {
ClusterSingleton singleton = ClusterSingleton.get(system);
// Start if needed and provide a proxy to a named singleton
ActorRef<CounterCommand> proxy = singleton.spawn(
counter(0),
counter("TheCounter", 0),
"GlobalCounter",
Props.empty(),
ClusterSingletonSettings.create(system),

View file

@ -4,6 +4,8 @@
package akka.cluster.sharding.typed
import akka.actor.typed.{ ActorRef, Behavior, Props, TypedAkkaSpecWithShutdown }
import akka.cluster.typed.Cluster
import akka.cluster.typed.Join
import akka.persistence.typed.scaladsl.PersistentBehaviors
import akka.testkit.typed.TestKit
import akka.testkit.typed.scaladsl.TestProbe
@ -58,20 +60,15 @@ object ClusterShardingPersistenceSpec {
class ClusterShardingPersistenceSpec extends TestKit("ClusterShardingPersistenceSPec", ClusterShardingPersistenceSpec.config)
with TypedAkkaSpecWithShutdown {
import ClusterShardingPersistenceSpec._
import akka.actor.typed.scaladsl.adapter._
implicit val s = system
val sharding = ClusterSharding(system)
implicit val untypedSystem = system.toUntyped
private val untypedCluster = akka.cluster.Cluster(untypedSystem)
"Typed cluster sharding with persistent actor" must {
untypedCluster.join(untypedCluster.selfAddress)
Cluster(system).manager ! Join(Cluster(system).selfMember.address)
"start persistent actor" in {
ClusterSharding(system).spawn(persistentActor, Props.empty, typeKey,
ClusterSharding(system).spawn[Command](_ persistentActor, Props.empty, typeKey,
ClusterShardingSettings(system), maxNumberOfShards = 100, handOffStopMessage = StopPlz)
val p = TestProbe[String]()

View file

@ -163,24 +163,24 @@ class ClusterShardingSpec extends TestKit("ClusterShardingSpec", ClusterSharding
Behaviors.same
}
val shardingRef1 = sharding.spawn(
behavior,
val shardingRef1: ActorRef[ShardingEnvelope[TestProtocol]] = sharding.spawn[TestProtocol](
_ behavior,
Props.empty,
typeKey,
ClusterShardingSettings(system),
10,
StopPlz())
val shardingRef2 = sharding2.spawn(
behavior,
val shardingRef2 = sharding2.spawn[TestProtocol](
_ behavior,
Props.empty,
typeKey,
ClusterShardingSettings(system2),
10,
StopPlz())
val shardingRef3 = sharding.spawn(
behaviorWithId,
val shardingRef3: ActorRef[IdTestProtocol] = sharding.spawn3[IdTestProtocol, IdTestProtocol](
_ behaviorWithId,
Props.empty,
typeKey2,
ClusterShardingSettings(system),
@ -190,8 +190,8 @@ class ClusterShardingSpec extends TestKit("ClusterShardingSpec", ClusterSharding
case other throw new IllegalArgumentException(s"Unexpected message $other")
})
val shardingRef4 = sharding2.spawn(
behaviorWithId,
val shardingRef4 = sharding2.spawn3[IdTestProtocol, IdTestProtocol](
_ behaviorWithId,
Props.empty,
typeKey2,
ClusterShardingSettings(system2),

View file

@ -21,9 +21,9 @@ object ShardingCompileOnlySpec {
final case class GetValue(replyTo: ActorRef[Int]) extends CounterCommand
case object GoodByeCounter extends CounterCommand
def counter(value: Int): Behavior[CounterCommand] = Behaviors.immutable[CounterCommand] {
def counter(entityId: String, value: Int): Behavior[CounterCommand] = Behaviors.immutable[CounterCommand] {
case (ctx, Increment)
counter(value + 1)
counter(entityId, value + 1)
case (ctx, GetValue(replyTo))
replyTo ! value
Behaviors.same
@ -34,7 +34,7 @@ object ShardingCompileOnlySpec {
val TypeKey = EntityTypeKey[CounterCommand]("Counter")
// if a extractor is defined then the type would be ActorRef[BasicCommand]
val shardRegion: ActorRef[ShardingEnvelope[CounterCommand]] = sharding.spawn[CounterCommand](
behavior = counter(0),
behavior = entityId counter(entityId, 0),
props = Props.empty,
typeKey = TypeKey,
settings = ClusterShardingSettings(system),
@ -54,7 +54,7 @@ object ShardingCompileOnlySpec {
//#persistence
val ShardingTypeName = EntityTypeKey[BlogCommand]("BlogPost")
ClusterSharding(system).spawn[BlogCommand](
behavior = InDepthPersistentBehaviorSpec.behavior,
behavior = _ InDepthPersistentBehaviorSpec.behavior,
props = Props.empty,
typeKey = ShardingTypeName,
settings = ClusterShardingSettings(system),
@ -68,7 +68,7 @@ object ShardingCompileOnlySpec {
val singletonManager = ClusterSingleton(system)
// Start if needed and provide a proxy to a named singleton
val proxy: ActorRef[CounterCommand] = singletonManager.spawn(
behavior = counter(0),
behavior = counter("TheCounter", 0),
"GlobalCounter",
Props.empty,
ClusterSingletonSettings(system),

View file

@ -0,0 +1,8 @@
# #24470 Sharding entityId => Behavior factory
ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.cluster.sharding.Shard.props")
ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.cluster.sharding.DDataShard.this")
ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.cluster.sharding.PersistentShard.this")
ProblemFilters.exclude[Problem]("akka.cluster.sharding.ClusterShardingGuardian*")
ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.cluster.sharding.ShardRegion.props")
ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.cluster.sharding.Shard.this")

View file

@ -8,6 +8,7 @@ import java.util.Optional
import java.util.concurrent.ConcurrentHashMap
import scala.concurrent.Await
import akka.actor.Actor
import akka.actor.ActorRef
import akka.actor.ActorSystem
@ -28,16 +29,17 @@ import akka.pattern.ask
import akka.dispatch.Dispatchers
import akka.cluster.ddata.ReplicatorSettings
import akka.cluster.ddata.Replicator
import scala.util.control.NonFatal
import akka.actor.Status
import akka.cluster.ClusterSettings
import akka.cluster.ClusterSettings.DataCenter
import akka.stream.{ Inlet, Outlet }
import scala.collection.immutable
import scala.collection.JavaConverters._
import akka.annotation.InternalApi
/**
* This extension provides sharding functionality of actors in a cluster.
* The typical use case is when you have many stateful actors that together consume
@ -219,6 +221,21 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension {
allocationStrategy: ShardAllocationStrategy,
handOffStopMessage: Any): ActorRef = {
internalStart(typeName, _ entityProps, settings, extractEntityId, extractShardId, allocationStrategy, handOffStopMessage)
}
/**
* INTERNAL API
*/
@InternalApi private[akka] def internalStart(
typeName: String,
entityProps: String Props,
settings: ClusterShardingSettings,
extractEntityId: ShardRegion.ExtractEntityId,
extractShardId: ShardRegion.ExtractShardId,
allocationStrategy: ShardAllocationStrategy,
handOffStopMessage: Any): ActorRef = {
requireClusterRole(settings.role)
implicit val timeout = system.settings.CreationTimeout
val startMsg = Start(typeName, entityProps, settings,
@ -290,9 +307,9 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension {
allocationStrategy: ShardAllocationStrategy,
handOffStopMessage: Any): ActorRef = {
start(
internalStart(
typeName,
entityProps,
_ entityProps,
settings,
extractEntityId = {
case msg if messageExtractor.entityId(msg) ne null
@ -517,7 +534,7 @@ private[akka] object ClusterShardingGuardian {
import ShardCoordinator.ShardAllocationStrategy
final case class Start(
typeName: String,
entityProps: Props,
entityProps: String Props,
settings: ClusterShardingSettings,
extractEntityId: ShardRegion.ExtractEntityId,
extractShardId: ShardRegion.ExtractShardId,

View file

@ -107,7 +107,7 @@ private[akka] object Shard {
def props(
typeName: String,
shardId: ShardRegion.ShardId,
entityProps: Props,
entityProps: String Props,
settings: ClusterShardingSettings,
extractEntityId: ShardRegion.ExtractEntityId,
extractShardId: ShardRegion.ExtractShardId,
@ -137,7 +137,7 @@ private[akka] object Shard {
private[akka] class Shard(
typeName: String,
shardId: ShardRegion.ShardId,
entityProps: Props,
entityProps: String Props,
settings: ClusterShardingSettings,
extractEntityId: ShardRegion.ExtractEntityId,
extractShardId: ShardRegion.ExtractShardId,
@ -335,7 +335,7 @@ private[akka] class Shard(
context.child(name).getOrElse {
log.debug("Starting entity [{}] in shard [{}]", id, shardId)
val a = context.watch(context.actorOf(entityProps, name))
val a = context.watch(context.actorOf(entityProps(id), name))
idByRef = idByRef.updated(a, id)
refById = refById.updated(id, a)
state = state.copy(state.entities + id)
@ -478,7 +478,7 @@ private[akka] trait RememberingShard { selfType: Shard ⇒
private[akka] class PersistentShard(
typeName: String,
shardId: ShardRegion.ShardId,
entityProps: Props,
entityProps: String Props,
override val settings: ClusterShardingSettings,
extractEntityId: ShardRegion.ExtractEntityId,
extractShardId: ShardRegion.ExtractShardId,
@ -572,7 +572,7 @@ private[akka] class PersistentShard(
private[akka] class DDataShard(
typeName: String,
shardId: ShardRegion.ShardId,
entityProps: Props,
entityProps: String Props,
override val settings: ClusterShardingSettings,
extractEntityId: ShardRegion.ExtractEntityId,
extractShardId: ShardRegion.ExtractShardId,

View file

@ -33,7 +33,7 @@ object ShardRegion {
*/
private[akka] def props(
typeName: String,
entityProps: Props,
entityProps: String Props,
settings: ClusterShardingSettings,
coordinatorPath: String,
extractEntityId: ShardRegion.ExtractEntityId,
@ -366,7 +366,7 @@ object ShardRegion {
*/
private[akka] class ShardRegion(
typeName: String,
entityProps: Option[Props],
entityProps: Option[String Props],
dataCenter: Option[DataCenter],
settings: ClusterShardingSettings,
coordinatorPath: String,

View file

@ -329,7 +329,7 @@ abstract class ClusterShardingSpec(config: ClusterShardingSpecConfig) extends Mu
system.actorOf(
ShardRegion.props(
typeName = typeName,
entityProps = qualifiedCounterProps(typeName),
entityProps = _ qualifiedCounterProps(typeName),
settings = settings,
coordinatorPath = "/user/" + typeName + "Coordinator/singleton/coordinator",
extractEntityId = extractEntityId,