diff --git a/akka-cluster-sharding/src/main/mima-filters/2.5.7.backwards.excludes b/akka-cluster-sharding/src/main/mima-filters/2.5.7.backwards.excludes deleted file mode 100644 index 554c5a9582..0000000000 --- a/akka-cluster-sharding/src/main/mima-filters/2.5.7.backwards.excludes +++ /dev/null @@ -1,6 +0,0 @@ -# 24058 - Add ClusterSharding.start overloads -ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.cluster.sharding.ShardRegion.props") -ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.cluster.sharding.Shard.props") -ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.cluster.sharding.DDataShard.this") -ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.cluster.sharding.PersistentShard.this") -ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.cluster.sharding.Shard.this") diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterSharding.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterSharding.scala index 21cc91c791..9baa4e34e7 100755 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterSharding.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterSharding.scala @@ -6,7 +6,6 @@ package akka.cluster.sharding import java.net.URLEncoder import java.util.Optional import java.util.concurrent.ConcurrentHashMap -import java.util.function.{ Function ⇒ JFunction } import scala.concurrent.Await import akka.actor.Actor @@ -21,6 +20,7 @@ import akka.actor.NoSerializationVerificationNeeded import akka.actor.PoisonPill import akka.actor.Props import akka.cluster.Cluster +import akka.cluster.ddata.DistributedData import akka.cluster.singleton.ClusterSingletonManager import akka.pattern.BackoffSupervisor import akka.util.ByteString @@ -33,6 +33,7 @@ import scala.util.control.NonFatal import akka.actor.Status import akka.cluster.ClusterSettings import akka.cluster.ClusterSettings.DataCenter +import akka.stream.{ Inlet, Outlet } import scala.collection.immutable import scala.collection.JavaConverters._ @@ -220,7 +221,7 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension { requireClusterRole(settings.role) implicit val timeout = system.settings.CreationTimeout - val startMsg = Start(typeName, _ ⇒ entityProps, settings, + val startMsg = Start(typeName, entityProps, settings, extractEntityId, extractShardId, allocationStrategy, handOffStopMessage) val Started(shardRegion) = Await.result(guardian ? startMsg, timeout.duration) regions.put(typeName, shardRegion) @@ -259,7 +260,7 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension { settings.tuningParameters.leastShardAllocationRebalanceThreshold, settings.tuningParameters.leastShardAllocationMaxSimultaneousRebalance) - start(typeName, _ ⇒ entityProps, settings, extractEntityId, extractShardId, allocationStrategy, PoisonPill) + start(typeName, entityProps, settings, extractEntityId, extractShardId, allocationStrategy, PoisonPill) } /** @@ -291,7 +292,7 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension { start( typeName, - _ ⇒ entityProps, + entityProps, settings, extractEntityId = { case msg if messageExtractor.entityId(msg) ne null ⇒ @@ -334,154 +335,6 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension { start(typeName, entityProps, settings, messageExtractor, allocationStrategy, PoisonPill) } - /** - * Scala API: Register a named entity type by defining a factory for the [[akka.actor.Props]] of - * the entity actor and functions to extract entity and shard identifier from messages. The - * [[ShardRegion]] actor for this type can later be retrieved with the [[#shardRegion]] method. - * - * Some settings can be configured as described in the `akka.cluster.sharding` section - * of the `reference.conf`. - * - * @param typeName the name of the entity type - * @param entityPropsFactory function that, given an entity id, returns the `Props` of the entity actors - * that will be created by the `ShardRegion` - * @param settings configuration settings, see [[ClusterShardingSettings]] - * @param extractEntityId partial function to extract the entity id and the message to send to the - * entity from the incoming message, if the partial function does not match the message will - * be `unhandled`, i.e. posted as `Unhandled` messages on the event stream - * @param extractShardId function to determine the shard id for an incoming message, only messages - * that passed the `extractEntityId` will be used - * @param allocationStrategy possibility to use a custom shard allocation and - * rebalancing logic - * @param handOffStopMessage the message that will be sent to entities when they are to be stopped - * for a rebalance or graceful shutdown of a `ShardRegion`, e.g. `PoisonPill`. - * @return the actor ref of the [[ShardRegion]] that is to be responsible for the shard - */ - def start( - typeName: String, - entityPropsFactory: String ⇒ Props, - settings: ClusterShardingSettings, - extractEntityId: ShardRegion.ExtractEntityId, - extractShardId: ShardRegion.ExtractShardId, - allocationStrategy: ShardAllocationStrategy, - handOffStopMessage: Any): ActorRef = { - - requireClusterRole(settings.role) - implicit val timeout = system.settings.CreationTimeout - val startMsg = Start(typeName, entityPropsFactory, settings, - extractEntityId, extractShardId, allocationStrategy, handOffStopMessage) - val Started(shardRegion) = Await.result(guardian ? startMsg, timeout.duration) - regions.put(typeName, shardRegion) - shardRegion - } - - /** - * Scala API: Register a named entity type by defining a factory for the [[akka.actor.Props]] of - * the entity actor and functions to extract entity and shard identifier from messages. The - * [[ShardRegion]] actor for this type can later be retrieved with the [[#shardRegion]] method. - * - * The default shard allocation strategy [[ShardCoordinator.LeastShardAllocationStrategy]] - * is used. [[akka.actor.PoisonPill]] is used as `handOffStopMessage`. - * - * Some settings can be configured as described in the `akka.cluster.sharding` section - * of the `reference.conf`. - * - * @param typeName the name of the entity type - * @param entityPropsFactory function that, given an entity id, returns the `Props` of the entity actors - * that will be created by the `ShardRegion` - * @param settings configuration settings, see [[ClusterShardingSettings]] - * @param extractEntityId partial function to extract the entity id and the message to send to the - * entity from the incoming message, if the partial function does not match the message will - * be `unhandled`, i.e. posted as `Unhandled` messages on the event stream - * @param extractShardId function to determine the shard id for an incoming message, only messages - * that passed the `extractEntityId` will be used - * @return the actor ref of the [[ShardRegion]] that is to be responsible for the shard - */ - def start( - typeName: String, - entityPropsFactory: String ⇒ Props, - settings: ClusterShardingSettings, - extractEntityId: ShardRegion.ExtractEntityId, - extractShardId: ShardRegion.ExtractShardId): ActorRef = { - - val allocationStrategy = new LeastShardAllocationStrategy( - settings.tuningParameters.leastShardAllocationRebalanceThreshold, - settings.tuningParameters.leastShardAllocationMaxSimultaneousRebalance) - - start(typeName, entityPropsFactory, settings, extractEntityId, extractShardId, allocationStrategy, PoisonPill) - } - - /** - * Java/Scala API: Register a named entity type by defining a factory for the [[akka.actor.Props]] of - * the entity actor and functions to extract entity and shard identifier from messages. The - * [[ShardRegion]] actor for this type can later be retrieved with the [[#shardRegion]] method. - * - * Some settings can be configured as described in the `akka.cluster.sharding` section - * of the `reference.conf`. - * - * @param typeName the name of the entity type - * @param entityPropsFactory function that, given an entity id, returns the `Props` of the entity actors - * that will be created by the `ShardRegion` - * @param settings configuration settings, see [[ClusterShardingSettings]] - * @param messageExtractor functions to extract the entity id, shard id, and the message to send to the - * entity from the incoming message, see [[ShardRegion.MessageExtractor]] - * @param allocationStrategy possibility to use a custom shard allocation and - * rebalancing logic - * @param handOffStopMessage the message that will be sent to entities when they are to be stopped - * for a rebalance or graceful shutdown of a `ShardRegion`, e.g. `PoisonPill`. - * @return the actor ref of the [[ShardRegion]] that is to be responsible for the shard - */ - def start( - typeName: String, - entityPropsFactory: JFunction[String, Props], - settings: ClusterShardingSettings, - messageExtractor: ShardRegion.MessageExtractor, - allocationStrategy: ShardAllocationStrategy, - handOffStopMessage: Any): ActorRef = { - - start( - typeName, entityPropsFactory.apply _, settings, - extractEntityId = { - case msg if messageExtractor.entityId(msg) ne null ⇒ - (messageExtractor.entityId(msg), messageExtractor.entityMessage(msg)) - }: ShardRegion.ExtractEntityId, - extractShardId = msg ⇒ messageExtractor.shardId(msg), - allocationStrategy = allocationStrategy, - handOffStopMessage = handOffStopMessage) - } - - /** - * Java/Scala API: Register a named entity type by defining a factory for the [[akka.actor.Props]] of - * the entity actor and functions to extract entity and shard identifier from messages. The - * [[ShardRegion]] actor for this type can later be retrieved with the [[#shardRegion]] method. - * - * The default shard allocation strategy [[ShardCoordinator.LeastShardAllocationStrategy]] - * is used. [[akka.actor.PoisonPill]] is used as `handOffStopMessage`. - * - * Some settings can be configured as described in the `akka.cluster.sharding` section - * of the `reference.conf`. - * - * @param typeName the name of the entity type - * @param entityPropsFactory function that, given an entity id, returns the `Props` of the entity actors - * that will be created by the `ShardRegion` - * @param settings configuration settings, see [[ClusterShardingSettings]] - * @param messageExtractor functions to extract the entity id, shard id, and the message to send to the - * entity from the incoming message - * @return the actor ref of the [[ShardRegion]] that is to be responsible for the shard - */ - def start( - typeName: String, - entityPropsFactory: JFunction[String, Props], - settings: ClusterShardingSettings, - messageExtractor: ShardRegion.MessageExtractor): ActorRef = { - - val allocationStrategy = new LeastShardAllocationStrategy( - settings.tuningParameters.leastShardAllocationRebalanceThreshold, - settings.tuningParameters.leastShardAllocationMaxSimultaneousRebalance) - - start(typeName, entityPropsFactory, settings, messageExtractor, allocationStrategy, PoisonPill) - } - /** * Scala API: Register a named entity type `ShardRegion` on this node that will run in proxy only mode, * i.e. it will delegate messages to other `ShardRegion` actors on other nodes, but not host any @@ -664,7 +517,7 @@ private[akka] object ClusterShardingGuardian { import ShardCoordinator.ShardAllocationStrategy final case class Start( typeName: String, - entityPropsFactory: String ⇒ Props, + entityProps: Props, settings: ClusterShardingSettings, extractEntityId: ShardRegion.ExtractEntityId, extractShardId: ShardRegion.ExtractShardId, @@ -729,7 +582,7 @@ private[akka] class ClusterShardingGuardian extends Actor { def receive = { case Start(typeName, - entityPropsFactory, + entityProps, settings, extractEntityId, extractShardId, @@ -769,7 +622,7 @@ private[akka] class ClusterShardingGuardian extends Actor { context.actorOf( ShardRegion.props( typeName = typeName, - entityPropsFactory = entityPropsFactory, + entityProps = entityProps, settings = settings, coordinatorPath = cPath, extractEntityId = extractEntityId, diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala index e773d17336..61eb8eb2a1 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala @@ -107,7 +107,7 @@ private[akka] object Shard { def props( typeName: String, shardId: ShardRegion.ShardId, - entityPropsFactory: String ⇒ Props, + entityProps: Props, settings: ClusterShardingSettings, extractEntityId: ShardRegion.ExtractEntityId, extractShardId: ShardRegion.ExtractShardId, @@ -115,13 +115,13 @@ private[akka] object Shard { replicator: ActorRef, majorityMinCap: Int): Props = { if (settings.rememberEntities && settings.stateStoreMode == ClusterShardingSettings.StateStoreModeDData) { - Props(new DDataShard(typeName, shardId, entityPropsFactory, settings, extractEntityId, extractShardId, + Props(new DDataShard(typeName, shardId, entityProps, settings, extractEntityId, extractShardId, handOffStopMessage, replicator, majorityMinCap)).withDeploy(Deploy.local) } else if (settings.rememberEntities && settings.stateStoreMode == ClusterShardingSettings.StateStoreModePersistence) - Props(new PersistentShard(typeName, shardId, entityPropsFactory, settings, extractEntityId, extractShardId, handOffStopMessage)) + Props(new PersistentShard(typeName, shardId, entityProps, settings, extractEntityId, extractShardId, handOffStopMessage)) .withDeploy(Deploy.local) else - Props(new Shard(typeName, shardId, entityPropsFactory, settings, extractEntityId, extractShardId, handOffStopMessage)) + Props(new Shard(typeName, shardId, entityProps, settings, extractEntityId, extractShardId, handOffStopMessage)) .withDeploy(Deploy.local) } } @@ -137,7 +137,7 @@ private[akka] object Shard { private[akka] class Shard( typeName: String, shardId: ShardRegion.ShardId, - entityPropsFactory: String ⇒ Props, + entityProps: Props, settings: ClusterShardingSettings, extractEntityId: ShardRegion.ExtractEntityId, extractShardId: ShardRegion.ExtractShardId, @@ -332,7 +332,6 @@ private[akka] class Shard( context.child(name).getOrElse { log.debug("Starting entity [{}] in shard [{}]", id, shardId) - val entityProps = entityPropsFactory(id) val a = context.watch(context.actorOf(entityProps, name)) idByRef = idByRef.updated(a, id) refById = refById.updated(id, a) @@ -476,12 +475,12 @@ private[akka] trait RememberingShard { selfType: Shard ⇒ private[akka] class PersistentShard( typeName: String, shardId: ShardRegion.ShardId, - entityPropsFactory: String ⇒ Props, + entityProps: Props, override val settings: ClusterShardingSettings, extractEntityId: ShardRegion.ExtractEntityId, extractShardId: ShardRegion.ExtractShardId, handOffStopMessage: Any) extends Shard( - typeName, shardId, entityPropsFactory, settings, extractEntityId, extractShardId, handOffStopMessage) + typeName, shardId, entityProps, settings, extractEntityId, extractShardId, handOffStopMessage) with RememberingShard with PersistentActor with ActorLogging { import ShardRegion.{ EntityId, Msg } @@ -570,14 +569,14 @@ private[akka] class PersistentShard( private[akka] class DDataShard( typeName: String, shardId: ShardRegion.ShardId, - entityPropsFactory: String ⇒ Props, + entityProps: Props, override val settings: ClusterShardingSettings, extractEntityId: ShardRegion.ExtractEntityId, extractShardId: ShardRegion.ExtractShardId, handOffStopMessage: Any, replicator: ActorRef, majorityMinCap: Int) extends Shard( - typeName, shardId, entityPropsFactory, settings, extractEntityId, extractShardId, handOffStopMessage) + typeName, shardId, entityProps, settings, extractEntityId, extractShardId, handOffStopMessage) with RememberingShard with Stash with ActorLogging { import ShardRegion.{ EntityId, Msg } diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala index 6ed29a2958..4390955882 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala @@ -33,7 +33,7 @@ object ShardRegion { */ private[akka] def props( typeName: String, - entityPropsFactory: String ⇒ Props, + entityProps: Props, settings: ClusterShardingSettings, coordinatorPath: String, extractEntityId: ShardRegion.ExtractEntityId, @@ -41,7 +41,7 @@ object ShardRegion { handOffStopMessage: Any, replicator: ActorRef, majorityMinCap: Int): Props = - Props(new ShardRegion(typeName, Some(entityPropsFactory), dataCenter = None, settings, coordinatorPath, extractEntityId, + Props(new ShardRegion(typeName, Some(entityProps), dataCenter = None, settings, coordinatorPath, extractEntityId, extractShardId, handOffStopMessage, replicator, majorityMinCap)).withDeploy(Deploy.local) /** @@ -366,7 +366,7 @@ object ShardRegion { */ private[akka] class ShardRegion( typeName: String, - entityPropsFactory: Option[String ⇒ Props], + entityProps: Option[Props], dataCenter: Option[DataCenter], settings: ClusterShardingSettings, coordinatorPath: String, @@ -682,7 +682,7 @@ private[akka] class ShardRegion( } def registrationMessage: Any = - if (entityPropsFactory.isDefined) Register(self) else RegisterProxy(self) + if (entityProps.isDefined) Register(self) else RegisterProxy(self) def requestShardBufferHomes(): Unit = { shardBuffers.foreach { @@ -799,8 +799,8 @@ private[akka] class ShardRegion( None else { shards.get(id).orElse( - entityPropsFactory match { - case Some(factory) if !shardsByRef.values.exists(_ == id) ⇒ + entityProps match { + case Some(props) if !shardsByRef.values.exists(_ == id) ⇒ log.debug("Starting shard [{}] in region", id) val name = URLEncoder.encode(id, "utf-8") @@ -808,7 +808,7 @@ private[akka] class ShardRegion( Shard.props( typeName, id, - factory, + props, settings, extractEntityId, extractShardId, diff --git a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingSpec.scala b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingSpec.scala index 22c12d98d8..60d3c1a12d 100644 --- a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingSpec.scala +++ b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingSpec.scala @@ -3,10 +3,11 @@ */ package akka.cluster.sharding -import akka.cluster.ddata.{ Replicator, ReplicatorSettings } -import akka.cluster.sharding.ShardCoordinator.Internal.{ HandOff, ShardStopped } -import akka.cluster.sharding.ShardRegion.{ CurrentRegions, GetCurrentRegions, Passivate } - +import akka.cluster.ddata.{ ReplicatorSettings, Replicator } +import akka.cluster.sharding.ShardCoordinator.Internal.{ ShardStopped, HandOff } +import akka.cluster.sharding.ShardRegion.Passivate +import akka.cluster.sharding.ShardRegion.GetCurrentRegions +import akka.cluster.sharding.ShardRegion.CurrentRegions import language.postfixOps import scala.concurrent.duration._ import com.typesafe.config.ConfigFactory @@ -23,7 +24,6 @@ import akka.remote.testkit.STMultiNodeSpec import akka.testkit._ import akka.testkit.TestEvent.Mute import java.io.File - import org.apache.commons.io.FileUtils import akka.cluster.singleton.ClusterSingletonManager import akka.cluster.singleton.ClusterSingletonManagerSettings @@ -39,17 +39,13 @@ object ClusterShardingSpec { case object Stop final case class CounterChanged(delta: Int) - object Counter { - val ShardingTypeName: String = "Counter" - def props(id: String): Props = Props(new Counter(id)) - } - - class Counter(id: String) extends PersistentActor { + class Counter extends PersistentActor { import ShardRegion.Passivate context.setReceiveTimeout(120.seconds) - override def persistenceId: String = s"${Counter.ShardingTypeName}-$id" + // self.path.name is the entity identifier (utf-8 URL-encoded) + override def persistenceId: String = "Counter-" + self.path.name var count = 0 //#counter-actor @@ -91,29 +87,18 @@ object ClusterShardingSpec { case ShardRegion.StartEntity(id) ⇒ (id.toLong % numberOfShards).toString } - object QualifiedCounter { - def props(typeName: String, id: String): Props = Props(new QualifiedCounter(typeName, id)) + def qualifiedCounterProps(typeName: String): Props = + Props(new QualifiedCounter(typeName)) + + class QualifiedCounter(typeName: String) extends Counter { + override def persistenceId: String = typeName + "-" + self.path.name } - class QualifiedCounter(typeName: String, id: String) extends Counter(id) { - override def persistenceId: String = s"$typeName-$id" - } - - object AnotherCounter { - val ShardingTypeName: String = "AnotherCounter" - def props(id: String): Props = Props(new AnotherCounter(id)) - } - - class AnotherCounter(id: String) extends QualifiedCounter(AnotherCounter.ShardingTypeName, id) + class AnotherCounter extends QualifiedCounter("AnotherCounter") //#supervisor - object CounterSupervisor { - val ShardingTypeName: String = "CounterSupervisor" - def props(id: String): Props = Props(new CounterSupervisor(id)) - } - - class CounterSupervisor(entityId: String) extends Actor { - val counter = context.actorOf(Counter.props(entityId), "theCounter") + class CounterSupervisor extends Actor { + val counter = context.actorOf(Props[Counter], "theCounter") override val supervisorStrategy = OneForOneStrategy() { case _: IllegalArgumentException ⇒ SupervisorStrategy.Resume @@ -128,9 +113,6 @@ object ClusterShardingSpec { } //#supervisor - // must use different unique name for some tests than the one used in API tests - val TestCounterShardingTypeName = s"Test${Counter.ShardingTypeName}" - } abstract class ClusterShardingSpecConfig( @@ -316,7 +298,7 @@ abstract class ClusterShardingSpec(config: ClusterShardingSpecConfig) extends Mu ShardCoordinator.props(typeName, settings, allocationStrategy, replicator, majorityMinCap) } - List(TestCounterShardingTypeName, "rebalancingCounter", "RememberCounterEntities", "AnotherRememberCounter", + List("counter", "rebalancingCounter", "RememberCounterEntities", "AnotherRememberCounter", "RememberCounter", "RebalancingRememberCounter", "AutoMigrateRememberRegionTest").foreach { typeName ⇒ val rebalanceEnabled = typeName.toLowerCase.startsWith("rebalancing") val rememberEnabled = typeName.toLowerCase.contains("remember") @@ -347,7 +329,7 @@ abstract class ClusterShardingSpec(config: ClusterShardingSpecConfig) extends Mu system.actorOf( ShardRegion.props( typeName = typeName, - entityPropsFactory = entityId ⇒ QualifiedCounter.props(typeName, entityId), + entityProps = qualifiedCounterProps(typeName), settings = settings, coordinatorPath = "/user/" + typeName + "Coordinator/singleton/coordinator", extractEntityId = extractEntityId, @@ -358,7 +340,7 @@ abstract class ClusterShardingSpec(config: ClusterShardingSpecConfig) extends Mu name = typeName + "Region") } - lazy val region = createRegion(TestCounterShardingTypeName, rememberEntities = false) + lazy val region = createRegion("counter", rememberEntities = false) lazy val rebalancingRegion = createRegion("rebalancingCounter", rememberEntities = false) lazy val persistentEntitiesRegion = createRegion("RememberCounterEntities", rememberEntities = true) @@ -430,7 +412,7 @@ abstract class ClusterShardingSpec(config: ClusterShardingSpecConfig) extends Mu region ! EntityEnvelope(2, Increment) region ! Get(2) expectMsg(3) - lastSender.path should ===(node(second) / "user" / s"${TestCounterShardingTypeName}Region" / "2" / "2") + lastSender.path should ===(node(second) / "user" / "counterRegion" / "2" / "2") region ! Get(11) expectMsg(1) @@ -438,7 +420,7 @@ abstract class ClusterShardingSpec(config: ClusterShardingSpecConfig) extends Mu lastSender.path should ===(region.path / "11" / "11") region ! Get(12) expectMsg(1) - lastSender.path should ===(node(second) / "user" / s"${TestCounterShardingTypeName}Region" / "0" / "12") + lastSender.path should ===(node(second) / "user" / "counterRegion" / "0" / "12") } enterBarrier("first-update") @@ -477,10 +459,10 @@ abstract class ClusterShardingSpec(config: ClusterShardingSpecConfig) extends Mu val settings = ClusterShardingSettings(cfg) val proxy = system.actorOf( ShardRegion.proxyProps( - typeName = TestCounterShardingTypeName, + typeName = "counter", dataCenter = None, settings, - coordinatorPath = s"/user/${TestCounterShardingTypeName}Coordinator/singleton/coordinator", + coordinatorPath = "/user/counterCoordinator/singleton/coordinator", extractEntityId = extractEntityId, extractShardId = extractShardId, system.deadLetters, @@ -555,12 +537,12 @@ abstract class ClusterShardingSpec(config: ClusterShardingSpecConfig) extends Mu region ! EntityEnvelope(3, Increment) region ! Get(3) expectMsg(11) - lastSender.path should ===(node(third) / "user" / s"${TestCounterShardingTypeName}Region" / "3" / "3") + lastSender.path should ===(node(third) / "user" / "counterRegion" / "3" / "3") region ! EntityEnvelope(4, Increment) region ! Get(4) expectMsg(21) - lastSender.path should ===(node(fourth) / "user" / s"${TestCounterShardingTypeName}Region" / "4" / "4") + lastSender.path should ===(node(fourth) / "user" / "counterRegion" / "4" / "4") } enterBarrier("first-update") @@ -593,7 +575,7 @@ abstract class ClusterShardingSpec(config: ClusterShardingSpecConfig) extends Mu within(1.second) { region.tell(Get(3), probe3.ref) probe3.expectMsg(11) - probe3.lastSender.path should ===(node(third) / "user" / s"${TestCounterShardingTypeName}Region" / "3" / "3") + probe3.lastSender.path should ===(node(third) / "user" / "counterRegion" / "3" / "3") } } val probe4 = TestProbe() @@ -601,7 +583,7 @@ abstract class ClusterShardingSpec(config: ClusterShardingSpecConfig) extends Mu within(1.second) { region.tell(Get(4), probe4.ref) probe4.expectMsg(21) - probe4.lastSender.path should ===(node(fourth) / "user" / s"${TestCounterShardingTypeName}Region" / "4" / "4") + probe4.lastSender.path should ===(node(fourth) / "user" / "counterRegion" / "4" / "4") } } @@ -646,23 +628,23 @@ abstract class ClusterShardingSpec(config: ClusterShardingSpecConfig) extends Mu runOn(third, fourth, fifth, sixth) { //#counter-start val counterRegion: ActorRef = ClusterSharding(system).start( - typeName = Counter.ShardingTypeName, - entityPropsFactory = entityId ⇒ Counter.props(entityId), + typeName = "Counter", + entityProps = Props[Counter], settings = ClusterShardingSettings(system), extractEntityId = extractEntityId, extractShardId = extractShardId) //#counter-start ClusterSharding(system).start( - typeName = AnotherCounter.ShardingTypeName, - entityPropsFactory = entityId ⇒ AnotherCounter.props(entityId), + typeName = "AnotherCounter", + entityProps = Props[AnotherCounter], settings = ClusterShardingSettings(system), extractEntityId = extractEntityId, extractShardId = extractShardId) //#counter-supervisor-start ClusterSharding(system).start( - typeName = CounterSupervisor.ShardingTypeName, - entityPropsFactory = entityId ⇒ CounterSupervisor.props(entityId), + typeName = "SupervisedCounter", + entityProps = Props[CounterSupervisor], settings = ClusterShardingSettings(system), extractEntityId = extractEntityId, extractShardId = extractShardId) @@ -671,18 +653,17 @@ abstract class ClusterShardingSpec(config: ClusterShardingSpecConfig) extends Mu enterBarrier("extension-started") runOn(fifth) { //#counter-usage - val counterRegion: ActorRef = ClusterSharding(system).shardRegion(Counter.ShardingTypeName) - val entityId = 999 - counterRegion ! Get(entityId) + val counterRegion: ActorRef = ClusterSharding(system).shardRegion("Counter") + counterRegion ! Get(123) expectMsg(0) - counterRegion ! EntityEnvelope(entityId, Increment) - counterRegion ! Get(entityId) + counterRegion ! EntityEnvelope(123, Increment) + counterRegion ! Get(123) expectMsg(1) //#counter-usage - ClusterSharding(system).shardRegion(AnotherCounter.ShardingTypeName) ! EntityEnvelope(entityId, Decrement) - ClusterSharding(system).shardRegion(AnotherCounter.ShardingTypeName) ! Get(entityId) + ClusterSharding(system).shardRegion("AnotherCounter") ! EntityEnvelope(123, Decrement) + ClusterSharding(system).shardRegion("AnotherCounter") ! Get(123) expectMsg(-1) } @@ -691,8 +672,8 @@ abstract class ClusterShardingSpec(config: ClusterShardingSpecConfig) extends Mu // sixth is a frontend node, i.e. proxy only runOn(sixth) { for (n ← 1000 to 1010) { - ClusterSharding(system).shardRegion(Counter.ShardingTypeName) ! EntityEnvelope(n, Increment) - ClusterSharding(system).shardRegion(Counter.ShardingTypeName) ! Get(n) + ClusterSharding(system).shardRegion("Counter") ! EntityEnvelope(n, Increment) + ClusterSharding(system).shardRegion("Counter") ! Get(n) expectMsg(1) lastSender.path.address should not be (Cluster(system).selfAddress) } @@ -705,7 +686,7 @@ abstract class ClusterShardingSpec(config: ClusterShardingSpecConfig) extends Mu runOn(first) { val counterRegionViaStart: ActorRef = ClusterSharding(system).start( typeName = "ApiTest", - entityPropsFactory = Counter.props, + entityProps = Props[Counter], settings = ClusterShardingSettings(system), extractEntityId = extractEntityId, extractShardId = extractShardId) @@ -722,7 +703,7 @@ abstract class ClusterShardingSpec(config: ClusterShardingSpecConfig) extends Mu runOn(sixth) { // #proxy-dc val counterProxyDcB: ActorRef = ClusterSharding(system).startProxy( - typeName = Counter.ShardingTypeName, + typeName = "Counter", role = None, dataCenter = Some("B"), extractEntityId = extractEntityId, diff --git a/akka-docs/src/main/paradox/cluster-sharding.md b/akka-docs/src/main/paradox/cluster-sharding.md index fdf5cb73a0..354b2ae2be 100644 --- a/akka-docs/src/main/paradox/cluster-sharding.md +++ b/akka-docs/src/main/paradox/cluster-sharding.md @@ -45,8 +45,8 @@ The above actor uses event sourcing and the support provided in @scala[`Persiste It does not have to be a persistent actor, but in case of failure or migration of entities between nodes it must be able to recover its state if it is valuable. -Note how the `persistenceId` is defined - it must be unique to the entity, so using the entity identifier is advised. -You may define it in other ways, but it must be unique. +Note how the `persistenceId` is defined. The name of the actor is the entity identifier (utf-8 URL-encoded). +You may define it another way, but it must be unique. When using the sharding extension you are first, typically at system startup on each node in the cluster, supposed to register the supported entity types with the `ClusterSharding.start` @@ -58,10 +58,6 @@ Scala Java : @@snip [ClusterShardingTest.java]($code$/java/jdocs/sharding/ClusterShardingTest.java) { #counter-start } -In some cases, the actor may need to know the `entityId` associated with it. This can be achieved using the `entityPropsFactory` -parameter to `ClusterSharding.start`. The entity ID will be passed to the factory as a parameter, which can then be used in -the creation of the actor (like the above example). - The @scala[`extractEntityId` and `extractShardId` are two] @java[`messageExtractor` defines] application specific @scala[functions] @java[methods] to extract the entity identifier and the shard identifier from incoming messages. diff --git a/akka-docs/src/test/java/jdocs/sharding/ClusterShardingTest.java b/akka-docs/src/test/java/jdocs/sharding/ClusterShardingTest.java index 7feef07c56..21c4d819c8 100644 --- a/akka-docs/src/test/java/jdocs/sharding/ClusterShardingTest.java +++ b/akka-docs/src/test/java/jdocs/sharding/ClusterShardingTest.java @@ -7,8 +7,6 @@ package jdocs.sharding; import static java.util.concurrent.TimeUnit.SECONDS; import java.util.Optional; -import java.util.function.Function; - import scala.concurrent.duration.Duration; import akka.actor.AbstractActor; @@ -19,6 +17,7 @@ import akka.actor.OneForOneStrategy; import akka.actor.PoisonPill; import akka.actor.Props; import akka.actor.SupervisorStrategy; +import akka.actor.Terminated; import akka.actor.ReceiveTimeout; //#counter-extractor import akka.cluster.sharding.ShardRegion; @@ -32,6 +31,7 @@ import akka.cluster.sharding.ClusterShardingSettings; //#counter-start import akka.persistence.AbstractPersistentActor; +import akka.cluster.Cluster; import akka.japi.pf.DeciderBuilder; // Doc code, compile only @@ -85,12 +85,12 @@ public class ClusterShardingTest { //#counter-start Option roleOption = Option.none(); ClusterShardingSettings settings = ClusterShardingSettings.create(system); - ActorRef startedCounterRegion = ClusterSharding.get(system).start(Counter.ShardingTypeName, - entityId -> Counter.props(entityId), settings, messageExtractor); + ActorRef startedCounterRegion = ClusterSharding.get(system).start("Counter", + Props.create(Counter.class), settings, messageExtractor); //#counter-start //#counter-usage - ActorRef counterRegion = ClusterSharding.get(system).shardRegion(Counter.ShardingTypeName); + ActorRef counterRegion = ClusterSharding.get(system).shardRegion("Counter"); counterRegion.tell(new Counter.Get(123), getSelf()); counterRegion.tell(new Counter.EntityEnvelope(123, @@ -99,14 +99,14 @@ public class ClusterShardingTest { //#counter-usage //#counter-supervisor-start - ClusterSharding.get(system).start(CounterSupervisor.ShardingTypeName, - entityId -> CounterSupervisor.props(entityId), settings, messageExtractor); + ClusterSharding.get(system).start("SupervisedCounter", + Props.create(CounterSupervisor.class), settings, messageExtractor); //#counter-supervisor-start //#proxy-dc ActorRef counterProxyDcB = ClusterSharding.get(system).startProxy( - Counter.ShardingTypeName, + "Counter", Optional.empty(), Optional.of("B"), // data center name messageExtractor); @@ -189,22 +189,12 @@ public class ClusterShardingTest { } } - public static final String ShardingTypeName = "Counter"; - - public static Props props(String id) { - return Props.create(() -> new Counter(id)); - } - - final String entityId; int count = 0; - public Counter(String entityId) { - this.entityId = entityId; - } - + // getSelf().path().name() is the entity identifier (utf-8 URL-encoded) @Override public String persistenceId() { - return ShardingTypeName + "-" + entityId; + return "Counter-" + getSelf().path().name(); } @Override @@ -257,17 +247,9 @@ public class ClusterShardingTest { static//#supervisor public class CounterSupervisor extends AbstractActor { - public static final String ShardingTypeName = "CounterSupervisor"; - public static Props props(String entityId) { - return Props.create(() -> new CounterSupervisor(entityId)); - } - - private final ActorRef counter; - - public CounterSupervisor(String entityId) { - counter = getContext().actorOf(Counter.props(entityId), "theCounter"); - } + private final ActorRef counter = getContext().actorOf( + Props.create(Counter.class), "theCounter"); private static final SupervisorStrategy strategy = new OneForOneStrategy(DeciderBuilder.