Revert source incompatible sharding changes (#24126)

* Revert "fix entityPropsFactory id param, #21809"
This reverts commit cd7eae28f6.
* Revert "Merge pull request #24058 from talpr/talpr-24053-add-entity-id-to-sharding-props"
This reverts commit 8417e70460, reversing
changes made to 22e85f869d.
This commit is contained in:
Johan Andrén 2017-12-07 17:49:29 +01:00 committed by GitHub
parent cd7eae28f6
commit 582f6a4836
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 81 additions and 276 deletions

View file

@ -1,6 +0,0 @@
# 24058 - Add ClusterSharding.start overloads
ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.cluster.sharding.ShardRegion.props")
ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.cluster.sharding.Shard.props")
ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.cluster.sharding.DDataShard.this")
ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.cluster.sharding.PersistentShard.this")
ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.cluster.sharding.Shard.this")

View file

@ -6,7 +6,6 @@ package akka.cluster.sharding
import java.net.URLEncoder import java.net.URLEncoder
import java.util.Optional import java.util.Optional
import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.ConcurrentHashMap
import java.util.function.{ Function JFunction }
import scala.concurrent.Await import scala.concurrent.Await
import akka.actor.Actor import akka.actor.Actor
@ -21,6 +20,7 @@ import akka.actor.NoSerializationVerificationNeeded
import akka.actor.PoisonPill import akka.actor.PoisonPill
import akka.actor.Props import akka.actor.Props
import akka.cluster.Cluster import akka.cluster.Cluster
import akka.cluster.ddata.DistributedData
import akka.cluster.singleton.ClusterSingletonManager import akka.cluster.singleton.ClusterSingletonManager
import akka.pattern.BackoffSupervisor import akka.pattern.BackoffSupervisor
import akka.util.ByteString import akka.util.ByteString
@ -33,6 +33,7 @@ import scala.util.control.NonFatal
import akka.actor.Status import akka.actor.Status
import akka.cluster.ClusterSettings import akka.cluster.ClusterSettings
import akka.cluster.ClusterSettings.DataCenter import akka.cluster.ClusterSettings.DataCenter
import akka.stream.{ Inlet, Outlet }
import scala.collection.immutable import scala.collection.immutable
import scala.collection.JavaConverters._ import scala.collection.JavaConverters._
@ -220,7 +221,7 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension {
requireClusterRole(settings.role) requireClusterRole(settings.role)
implicit val timeout = system.settings.CreationTimeout implicit val timeout = system.settings.CreationTimeout
val startMsg = Start(typeName, _ entityProps, settings, val startMsg = Start(typeName, entityProps, settings,
extractEntityId, extractShardId, allocationStrategy, handOffStopMessage) extractEntityId, extractShardId, allocationStrategy, handOffStopMessage)
val Started(shardRegion) = Await.result(guardian ? startMsg, timeout.duration) val Started(shardRegion) = Await.result(guardian ? startMsg, timeout.duration)
regions.put(typeName, shardRegion) regions.put(typeName, shardRegion)
@ -259,7 +260,7 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension {
settings.tuningParameters.leastShardAllocationRebalanceThreshold, settings.tuningParameters.leastShardAllocationRebalanceThreshold,
settings.tuningParameters.leastShardAllocationMaxSimultaneousRebalance) settings.tuningParameters.leastShardAllocationMaxSimultaneousRebalance)
start(typeName, _ entityProps, settings, extractEntityId, extractShardId, allocationStrategy, PoisonPill) start(typeName, entityProps, settings, extractEntityId, extractShardId, allocationStrategy, PoisonPill)
} }
/** /**
@ -291,7 +292,7 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension {
start( start(
typeName, typeName,
_ entityProps, entityProps,
settings, settings,
extractEntityId = { extractEntityId = {
case msg if messageExtractor.entityId(msg) ne null case msg if messageExtractor.entityId(msg) ne null
@ -334,154 +335,6 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension {
start(typeName, entityProps, settings, messageExtractor, allocationStrategy, PoisonPill) start(typeName, entityProps, settings, messageExtractor, allocationStrategy, PoisonPill)
} }
/**
* Scala API: Register a named entity type by defining a factory for the [[akka.actor.Props]] of
* the entity actor and functions to extract entity and shard identifier from messages. The
* [[ShardRegion]] actor for this type can later be retrieved with the [[#shardRegion]] method.
*
* Some settings can be configured as described in the `akka.cluster.sharding` section
* of the `reference.conf`.
*
* @param typeName the name of the entity type
* @param entityPropsFactory function that, given an entity id, returns the `Props` of the entity actors
* that will be created by the `ShardRegion`
* @param settings configuration settings, see [[ClusterShardingSettings]]
* @param extractEntityId partial function to extract the entity id and the message to send to the
* entity from the incoming message, if the partial function does not match the message will
* be `unhandled`, i.e. posted as `Unhandled` messages on the event stream
* @param extractShardId function to determine the shard id for an incoming message, only messages
* that passed the `extractEntityId` will be used
* @param allocationStrategy possibility to use a custom shard allocation and
* rebalancing logic
* @param handOffStopMessage the message that will be sent to entities when they are to be stopped
* for a rebalance or graceful shutdown of a `ShardRegion`, e.g. `PoisonPill`.
* @return the actor ref of the [[ShardRegion]] that is to be responsible for the shard
*/
def start(
typeName: String,
entityPropsFactory: String Props,
settings: ClusterShardingSettings,
extractEntityId: ShardRegion.ExtractEntityId,
extractShardId: ShardRegion.ExtractShardId,
allocationStrategy: ShardAllocationStrategy,
handOffStopMessage: Any): ActorRef = {
requireClusterRole(settings.role)
implicit val timeout = system.settings.CreationTimeout
val startMsg = Start(typeName, entityPropsFactory, settings,
extractEntityId, extractShardId, allocationStrategy, handOffStopMessage)
val Started(shardRegion) = Await.result(guardian ? startMsg, timeout.duration)
regions.put(typeName, shardRegion)
shardRegion
}
/**
* Scala API: Register a named entity type by defining a factory for the [[akka.actor.Props]] of
* the entity actor and functions to extract entity and shard identifier from messages. The
* [[ShardRegion]] actor for this type can later be retrieved with the [[#shardRegion]] method.
*
* The default shard allocation strategy [[ShardCoordinator.LeastShardAllocationStrategy]]
* is used. [[akka.actor.PoisonPill]] is used as `handOffStopMessage`.
*
* Some settings can be configured as described in the `akka.cluster.sharding` section
* of the `reference.conf`.
*
* @param typeName the name of the entity type
* @param entityPropsFactory function that, given an entity id, returns the `Props` of the entity actors
* that will be created by the `ShardRegion`
* @param settings configuration settings, see [[ClusterShardingSettings]]
* @param extractEntityId partial function to extract the entity id and the message to send to the
* entity from the incoming message, if the partial function does not match the message will
* be `unhandled`, i.e. posted as `Unhandled` messages on the event stream
* @param extractShardId function to determine the shard id for an incoming message, only messages
* that passed the `extractEntityId` will be used
* @return the actor ref of the [[ShardRegion]] that is to be responsible for the shard
*/
def start(
typeName: String,
entityPropsFactory: String Props,
settings: ClusterShardingSettings,
extractEntityId: ShardRegion.ExtractEntityId,
extractShardId: ShardRegion.ExtractShardId): ActorRef = {
val allocationStrategy = new LeastShardAllocationStrategy(
settings.tuningParameters.leastShardAllocationRebalanceThreshold,
settings.tuningParameters.leastShardAllocationMaxSimultaneousRebalance)
start(typeName, entityPropsFactory, settings, extractEntityId, extractShardId, allocationStrategy, PoisonPill)
}
/**
* Java/Scala API: Register a named entity type by defining a factory for the [[akka.actor.Props]] of
* the entity actor and functions to extract entity and shard identifier from messages. The
* [[ShardRegion]] actor for this type can later be retrieved with the [[#shardRegion]] method.
*
* Some settings can be configured as described in the `akka.cluster.sharding` section
* of the `reference.conf`.
*
* @param typeName the name of the entity type
* @param entityPropsFactory function that, given an entity id, returns the `Props` of the entity actors
* that will be created by the `ShardRegion`
* @param settings configuration settings, see [[ClusterShardingSettings]]
* @param messageExtractor functions to extract the entity id, shard id, and the message to send to the
* entity from the incoming message, see [[ShardRegion.MessageExtractor]]
* @param allocationStrategy possibility to use a custom shard allocation and
* rebalancing logic
* @param handOffStopMessage the message that will be sent to entities when they are to be stopped
* for a rebalance or graceful shutdown of a `ShardRegion`, e.g. `PoisonPill`.
* @return the actor ref of the [[ShardRegion]] that is to be responsible for the shard
*/
def start(
typeName: String,
entityPropsFactory: JFunction[String, Props],
settings: ClusterShardingSettings,
messageExtractor: ShardRegion.MessageExtractor,
allocationStrategy: ShardAllocationStrategy,
handOffStopMessage: Any): ActorRef = {
start(
typeName, entityPropsFactory.apply _, settings,
extractEntityId = {
case msg if messageExtractor.entityId(msg) ne null
(messageExtractor.entityId(msg), messageExtractor.entityMessage(msg))
}: ShardRegion.ExtractEntityId,
extractShardId = msg messageExtractor.shardId(msg),
allocationStrategy = allocationStrategy,
handOffStopMessage = handOffStopMessage)
}
/**
* Java/Scala API: Register a named entity type by defining a factory for the [[akka.actor.Props]] of
* the entity actor and functions to extract entity and shard identifier from messages. The
* [[ShardRegion]] actor for this type can later be retrieved with the [[#shardRegion]] method.
*
* The default shard allocation strategy [[ShardCoordinator.LeastShardAllocationStrategy]]
* is used. [[akka.actor.PoisonPill]] is used as `handOffStopMessage`.
*
* Some settings can be configured as described in the `akka.cluster.sharding` section
* of the `reference.conf`.
*
* @param typeName the name of the entity type
* @param entityPropsFactory function that, given an entity id, returns the `Props` of the entity actors
* that will be created by the `ShardRegion`
* @param settings configuration settings, see [[ClusterShardingSettings]]
* @param messageExtractor functions to extract the entity id, shard id, and the message to send to the
* entity from the incoming message
* @return the actor ref of the [[ShardRegion]] that is to be responsible for the shard
*/
def start(
typeName: String,
entityPropsFactory: JFunction[String, Props],
settings: ClusterShardingSettings,
messageExtractor: ShardRegion.MessageExtractor): ActorRef = {
val allocationStrategy = new LeastShardAllocationStrategy(
settings.tuningParameters.leastShardAllocationRebalanceThreshold,
settings.tuningParameters.leastShardAllocationMaxSimultaneousRebalance)
start(typeName, entityPropsFactory, settings, messageExtractor, allocationStrategy, PoisonPill)
}
/** /**
* Scala API: Register a named entity type `ShardRegion` on this node that will run in proxy only mode, * Scala API: Register a named entity type `ShardRegion` on this node that will run in proxy only mode,
* i.e. it will delegate messages to other `ShardRegion` actors on other nodes, but not host any * i.e. it will delegate messages to other `ShardRegion` actors on other nodes, but not host any
@ -664,7 +517,7 @@ private[akka] object ClusterShardingGuardian {
import ShardCoordinator.ShardAllocationStrategy import ShardCoordinator.ShardAllocationStrategy
final case class Start( final case class Start(
typeName: String, typeName: String,
entityPropsFactory: String Props, entityProps: Props,
settings: ClusterShardingSettings, settings: ClusterShardingSettings,
extractEntityId: ShardRegion.ExtractEntityId, extractEntityId: ShardRegion.ExtractEntityId,
extractShardId: ShardRegion.ExtractShardId, extractShardId: ShardRegion.ExtractShardId,
@ -729,7 +582,7 @@ private[akka] class ClusterShardingGuardian extends Actor {
def receive = { def receive = {
case Start(typeName, case Start(typeName,
entityPropsFactory, entityProps,
settings, settings,
extractEntityId, extractEntityId,
extractShardId, extractShardId,
@ -769,7 +622,7 @@ private[akka] class ClusterShardingGuardian extends Actor {
context.actorOf( context.actorOf(
ShardRegion.props( ShardRegion.props(
typeName = typeName, typeName = typeName,
entityPropsFactory = entityPropsFactory, entityProps = entityProps,
settings = settings, settings = settings,
coordinatorPath = cPath, coordinatorPath = cPath,
extractEntityId = extractEntityId, extractEntityId = extractEntityId,

View file

@ -107,7 +107,7 @@ private[akka] object Shard {
def props( def props(
typeName: String, typeName: String,
shardId: ShardRegion.ShardId, shardId: ShardRegion.ShardId,
entityPropsFactory: String Props, entityProps: Props,
settings: ClusterShardingSettings, settings: ClusterShardingSettings,
extractEntityId: ShardRegion.ExtractEntityId, extractEntityId: ShardRegion.ExtractEntityId,
extractShardId: ShardRegion.ExtractShardId, extractShardId: ShardRegion.ExtractShardId,
@ -115,13 +115,13 @@ private[akka] object Shard {
replicator: ActorRef, replicator: ActorRef,
majorityMinCap: Int): Props = { majorityMinCap: Int): Props = {
if (settings.rememberEntities && settings.stateStoreMode == ClusterShardingSettings.StateStoreModeDData) { if (settings.rememberEntities && settings.stateStoreMode == ClusterShardingSettings.StateStoreModeDData) {
Props(new DDataShard(typeName, shardId, entityPropsFactory, settings, extractEntityId, extractShardId, Props(new DDataShard(typeName, shardId, entityProps, settings, extractEntityId, extractShardId,
handOffStopMessage, replicator, majorityMinCap)).withDeploy(Deploy.local) handOffStopMessage, replicator, majorityMinCap)).withDeploy(Deploy.local)
} else if (settings.rememberEntities && settings.stateStoreMode == ClusterShardingSettings.StateStoreModePersistence) } else if (settings.rememberEntities && settings.stateStoreMode == ClusterShardingSettings.StateStoreModePersistence)
Props(new PersistentShard(typeName, shardId, entityPropsFactory, settings, extractEntityId, extractShardId, handOffStopMessage)) Props(new PersistentShard(typeName, shardId, entityProps, settings, extractEntityId, extractShardId, handOffStopMessage))
.withDeploy(Deploy.local) .withDeploy(Deploy.local)
else else
Props(new Shard(typeName, shardId, entityPropsFactory, settings, extractEntityId, extractShardId, handOffStopMessage)) Props(new Shard(typeName, shardId, entityProps, settings, extractEntityId, extractShardId, handOffStopMessage))
.withDeploy(Deploy.local) .withDeploy(Deploy.local)
} }
} }
@ -137,7 +137,7 @@ private[akka] object Shard {
private[akka] class Shard( private[akka] class Shard(
typeName: String, typeName: String,
shardId: ShardRegion.ShardId, shardId: ShardRegion.ShardId,
entityPropsFactory: String Props, entityProps: Props,
settings: ClusterShardingSettings, settings: ClusterShardingSettings,
extractEntityId: ShardRegion.ExtractEntityId, extractEntityId: ShardRegion.ExtractEntityId,
extractShardId: ShardRegion.ExtractShardId, extractShardId: ShardRegion.ExtractShardId,
@ -332,7 +332,6 @@ private[akka] class Shard(
context.child(name).getOrElse { context.child(name).getOrElse {
log.debug("Starting entity [{}] in shard [{}]", id, shardId) log.debug("Starting entity [{}] in shard [{}]", id, shardId)
val entityProps = entityPropsFactory(id)
val a = context.watch(context.actorOf(entityProps, name)) val a = context.watch(context.actorOf(entityProps, name))
idByRef = idByRef.updated(a, id) idByRef = idByRef.updated(a, id)
refById = refById.updated(id, a) refById = refById.updated(id, a)
@ -476,12 +475,12 @@ private[akka] trait RememberingShard { selfType: Shard ⇒
private[akka] class PersistentShard( private[akka] class PersistentShard(
typeName: String, typeName: String,
shardId: ShardRegion.ShardId, shardId: ShardRegion.ShardId,
entityPropsFactory: String Props, entityProps: Props,
override val settings: ClusterShardingSettings, override val settings: ClusterShardingSettings,
extractEntityId: ShardRegion.ExtractEntityId, extractEntityId: ShardRegion.ExtractEntityId,
extractShardId: ShardRegion.ExtractShardId, extractShardId: ShardRegion.ExtractShardId,
handOffStopMessage: Any) extends Shard( handOffStopMessage: Any) extends Shard(
typeName, shardId, entityPropsFactory, settings, extractEntityId, extractShardId, handOffStopMessage) typeName, shardId, entityProps, settings, extractEntityId, extractShardId, handOffStopMessage)
with RememberingShard with PersistentActor with ActorLogging { with RememberingShard with PersistentActor with ActorLogging {
import ShardRegion.{ EntityId, Msg } import ShardRegion.{ EntityId, Msg }
@ -570,14 +569,14 @@ private[akka] class PersistentShard(
private[akka] class DDataShard( private[akka] class DDataShard(
typeName: String, typeName: String,
shardId: ShardRegion.ShardId, shardId: ShardRegion.ShardId,
entityPropsFactory: String Props, entityProps: Props,
override val settings: ClusterShardingSettings, override val settings: ClusterShardingSettings,
extractEntityId: ShardRegion.ExtractEntityId, extractEntityId: ShardRegion.ExtractEntityId,
extractShardId: ShardRegion.ExtractShardId, extractShardId: ShardRegion.ExtractShardId,
handOffStopMessage: Any, handOffStopMessage: Any,
replicator: ActorRef, replicator: ActorRef,
majorityMinCap: Int) extends Shard( majorityMinCap: Int) extends Shard(
typeName, shardId, entityPropsFactory, settings, extractEntityId, extractShardId, handOffStopMessage) typeName, shardId, entityProps, settings, extractEntityId, extractShardId, handOffStopMessage)
with RememberingShard with Stash with ActorLogging { with RememberingShard with Stash with ActorLogging {
import ShardRegion.{ EntityId, Msg } import ShardRegion.{ EntityId, Msg }

View file

@ -33,7 +33,7 @@ object ShardRegion {
*/ */
private[akka] def props( private[akka] def props(
typeName: String, typeName: String,
entityPropsFactory: String Props, entityProps: Props,
settings: ClusterShardingSettings, settings: ClusterShardingSettings,
coordinatorPath: String, coordinatorPath: String,
extractEntityId: ShardRegion.ExtractEntityId, extractEntityId: ShardRegion.ExtractEntityId,
@ -41,7 +41,7 @@ object ShardRegion {
handOffStopMessage: Any, handOffStopMessage: Any,
replicator: ActorRef, replicator: ActorRef,
majorityMinCap: Int): Props = majorityMinCap: Int): Props =
Props(new ShardRegion(typeName, Some(entityPropsFactory), dataCenter = None, settings, coordinatorPath, extractEntityId, Props(new ShardRegion(typeName, Some(entityProps), dataCenter = None, settings, coordinatorPath, extractEntityId,
extractShardId, handOffStopMessage, replicator, majorityMinCap)).withDeploy(Deploy.local) extractShardId, handOffStopMessage, replicator, majorityMinCap)).withDeploy(Deploy.local)
/** /**
@ -366,7 +366,7 @@ object ShardRegion {
*/ */
private[akka] class ShardRegion( private[akka] class ShardRegion(
typeName: String, typeName: String,
entityPropsFactory: Option[String Props], entityProps: Option[Props],
dataCenter: Option[DataCenter], dataCenter: Option[DataCenter],
settings: ClusterShardingSettings, settings: ClusterShardingSettings,
coordinatorPath: String, coordinatorPath: String,
@ -682,7 +682,7 @@ private[akka] class ShardRegion(
} }
def registrationMessage: Any = def registrationMessage: Any =
if (entityPropsFactory.isDefined) Register(self) else RegisterProxy(self) if (entityProps.isDefined) Register(self) else RegisterProxy(self)
def requestShardBufferHomes(): Unit = { def requestShardBufferHomes(): Unit = {
shardBuffers.foreach { shardBuffers.foreach {
@ -799,8 +799,8 @@ private[akka] class ShardRegion(
None None
else { else {
shards.get(id).orElse( shards.get(id).orElse(
entityPropsFactory match { entityProps match {
case Some(factory) if !shardsByRef.values.exists(_ == id) case Some(props) if !shardsByRef.values.exists(_ == id)
log.debug("Starting shard [{}] in region", id) log.debug("Starting shard [{}] in region", id)
val name = URLEncoder.encode(id, "utf-8") val name = URLEncoder.encode(id, "utf-8")
@ -808,7 +808,7 @@ private[akka] class ShardRegion(
Shard.props( Shard.props(
typeName, typeName,
id, id,
factory, props,
settings, settings,
extractEntityId, extractEntityId,
extractShardId, extractShardId,

View file

@ -3,10 +3,11 @@
*/ */
package akka.cluster.sharding package akka.cluster.sharding
import akka.cluster.ddata.{ Replicator, ReplicatorSettings } import akka.cluster.ddata.{ ReplicatorSettings, Replicator }
import akka.cluster.sharding.ShardCoordinator.Internal.{ HandOff, ShardStopped } import akka.cluster.sharding.ShardCoordinator.Internal.{ ShardStopped, HandOff }
import akka.cluster.sharding.ShardRegion.{ CurrentRegions, GetCurrentRegions, Passivate } import akka.cluster.sharding.ShardRegion.Passivate
import akka.cluster.sharding.ShardRegion.GetCurrentRegions
import akka.cluster.sharding.ShardRegion.CurrentRegions
import language.postfixOps import language.postfixOps
import scala.concurrent.duration._ import scala.concurrent.duration._
import com.typesafe.config.ConfigFactory import com.typesafe.config.ConfigFactory
@ -23,7 +24,6 @@ import akka.remote.testkit.STMultiNodeSpec
import akka.testkit._ import akka.testkit._
import akka.testkit.TestEvent.Mute import akka.testkit.TestEvent.Mute
import java.io.File import java.io.File
import org.apache.commons.io.FileUtils import org.apache.commons.io.FileUtils
import akka.cluster.singleton.ClusterSingletonManager import akka.cluster.singleton.ClusterSingletonManager
import akka.cluster.singleton.ClusterSingletonManagerSettings import akka.cluster.singleton.ClusterSingletonManagerSettings
@ -39,17 +39,13 @@ object ClusterShardingSpec {
case object Stop case object Stop
final case class CounterChanged(delta: Int) final case class CounterChanged(delta: Int)
object Counter { class Counter extends PersistentActor {
val ShardingTypeName: String = "Counter"
def props(id: String): Props = Props(new Counter(id))
}
class Counter(id: String) extends PersistentActor {
import ShardRegion.Passivate import ShardRegion.Passivate
context.setReceiveTimeout(120.seconds) context.setReceiveTimeout(120.seconds)
override def persistenceId: String = s"${Counter.ShardingTypeName}-$id" // self.path.name is the entity identifier (utf-8 URL-encoded)
override def persistenceId: String = "Counter-" + self.path.name
var count = 0 var count = 0
//#counter-actor //#counter-actor
@ -91,29 +87,18 @@ object ClusterShardingSpec {
case ShardRegion.StartEntity(id) (id.toLong % numberOfShards).toString case ShardRegion.StartEntity(id) (id.toLong % numberOfShards).toString
} }
object QualifiedCounter { def qualifiedCounterProps(typeName: String): Props =
def props(typeName: String, id: String): Props = Props(new QualifiedCounter(typeName, id)) Props(new QualifiedCounter(typeName))
class QualifiedCounter(typeName: String) extends Counter {
override def persistenceId: String = typeName + "-" + self.path.name
} }
class QualifiedCounter(typeName: String, id: String) extends Counter(id) { class AnotherCounter extends QualifiedCounter("AnotherCounter")
override def persistenceId: String = s"$typeName-$id"
}
object AnotherCounter {
val ShardingTypeName: String = "AnotherCounter"
def props(id: String): Props = Props(new AnotherCounter(id))
}
class AnotherCounter(id: String) extends QualifiedCounter(AnotherCounter.ShardingTypeName, id)
//#supervisor //#supervisor
object CounterSupervisor { class CounterSupervisor extends Actor {
val ShardingTypeName: String = "CounterSupervisor" val counter = context.actorOf(Props[Counter], "theCounter")
def props(id: String): Props = Props(new CounterSupervisor(id))
}
class CounterSupervisor(entityId: String) extends Actor {
val counter = context.actorOf(Counter.props(entityId), "theCounter")
override val supervisorStrategy = OneForOneStrategy() { override val supervisorStrategy = OneForOneStrategy() {
case _: IllegalArgumentException SupervisorStrategy.Resume case _: IllegalArgumentException SupervisorStrategy.Resume
@ -128,9 +113,6 @@ object ClusterShardingSpec {
} }
//#supervisor //#supervisor
// must use different unique name for some tests than the one used in API tests
val TestCounterShardingTypeName = s"Test${Counter.ShardingTypeName}"
} }
abstract class ClusterShardingSpecConfig( abstract class ClusterShardingSpecConfig(
@ -316,7 +298,7 @@ abstract class ClusterShardingSpec(config: ClusterShardingSpecConfig) extends Mu
ShardCoordinator.props(typeName, settings, allocationStrategy, replicator, majorityMinCap) ShardCoordinator.props(typeName, settings, allocationStrategy, replicator, majorityMinCap)
} }
List(TestCounterShardingTypeName, "rebalancingCounter", "RememberCounterEntities", "AnotherRememberCounter", List("counter", "rebalancingCounter", "RememberCounterEntities", "AnotherRememberCounter",
"RememberCounter", "RebalancingRememberCounter", "AutoMigrateRememberRegionTest").foreach { typeName "RememberCounter", "RebalancingRememberCounter", "AutoMigrateRememberRegionTest").foreach { typeName
val rebalanceEnabled = typeName.toLowerCase.startsWith("rebalancing") val rebalanceEnabled = typeName.toLowerCase.startsWith("rebalancing")
val rememberEnabled = typeName.toLowerCase.contains("remember") val rememberEnabled = typeName.toLowerCase.contains("remember")
@ -347,7 +329,7 @@ abstract class ClusterShardingSpec(config: ClusterShardingSpecConfig) extends Mu
system.actorOf( system.actorOf(
ShardRegion.props( ShardRegion.props(
typeName = typeName, typeName = typeName,
entityPropsFactory = entityId QualifiedCounter.props(typeName, entityId), entityProps = qualifiedCounterProps(typeName),
settings = settings, settings = settings,
coordinatorPath = "/user/" + typeName + "Coordinator/singleton/coordinator", coordinatorPath = "/user/" + typeName + "Coordinator/singleton/coordinator",
extractEntityId = extractEntityId, extractEntityId = extractEntityId,
@ -358,7 +340,7 @@ abstract class ClusterShardingSpec(config: ClusterShardingSpecConfig) extends Mu
name = typeName + "Region") name = typeName + "Region")
} }
lazy val region = createRegion(TestCounterShardingTypeName, rememberEntities = false) lazy val region = createRegion("counter", rememberEntities = false)
lazy val rebalancingRegion = createRegion("rebalancingCounter", rememberEntities = false) lazy val rebalancingRegion = createRegion("rebalancingCounter", rememberEntities = false)
lazy val persistentEntitiesRegion = createRegion("RememberCounterEntities", rememberEntities = true) lazy val persistentEntitiesRegion = createRegion("RememberCounterEntities", rememberEntities = true)
@ -430,7 +412,7 @@ abstract class ClusterShardingSpec(config: ClusterShardingSpecConfig) extends Mu
region ! EntityEnvelope(2, Increment) region ! EntityEnvelope(2, Increment)
region ! Get(2) region ! Get(2)
expectMsg(3) expectMsg(3)
lastSender.path should ===(node(second) / "user" / s"${TestCounterShardingTypeName}Region" / "2" / "2") lastSender.path should ===(node(second) / "user" / "counterRegion" / "2" / "2")
region ! Get(11) region ! Get(11)
expectMsg(1) expectMsg(1)
@ -438,7 +420,7 @@ abstract class ClusterShardingSpec(config: ClusterShardingSpecConfig) extends Mu
lastSender.path should ===(region.path / "11" / "11") lastSender.path should ===(region.path / "11" / "11")
region ! Get(12) region ! Get(12)
expectMsg(1) expectMsg(1)
lastSender.path should ===(node(second) / "user" / s"${TestCounterShardingTypeName}Region" / "0" / "12") lastSender.path should ===(node(second) / "user" / "counterRegion" / "0" / "12")
} }
enterBarrier("first-update") enterBarrier("first-update")
@ -477,10 +459,10 @@ abstract class ClusterShardingSpec(config: ClusterShardingSpecConfig) extends Mu
val settings = ClusterShardingSettings(cfg) val settings = ClusterShardingSettings(cfg)
val proxy = system.actorOf( val proxy = system.actorOf(
ShardRegion.proxyProps( ShardRegion.proxyProps(
typeName = TestCounterShardingTypeName, typeName = "counter",
dataCenter = None, dataCenter = None,
settings, settings,
coordinatorPath = s"/user/${TestCounterShardingTypeName}Coordinator/singleton/coordinator", coordinatorPath = "/user/counterCoordinator/singleton/coordinator",
extractEntityId = extractEntityId, extractEntityId = extractEntityId,
extractShardId = extractShardId, extractShardId = extractShardId,
system.deadLetters, system.deadLetters,
@ -555,12 +537,12 @@ abstract class ClusterShardingSpec(config: ClusterShardingSpecConfig) extends Mu
region ! EntityEnvelope(3, Increment) region ! EntityEnvelope(3, Increment)
region ! Get(3) region ! Get(3)
expectMsg(11) expectMsg(11)
lastSender.path should ===(node(third) / "user" / s"${TestCounterShardingTypeName}Region" / "3" / "3") lastSender.path should ===(node(third) / "user" / "counterRegion" / "3" / "3")
region ! EntityEnvelope(4, Increment) region ! EntityEnvelope(4, Increment)
region ! Get(4) region ! Get(4)
expectMsg(21) expectMsg(21)
lastSender.path should ===(node(fourth) / "user" / s"${TestCounterShardingTypeName}Region" / "4" / "4") lastSender.path should ===(node(fourth) / "user" / "counterRegion" / "4" / "4")
} }
enterBarrier("first-update") enterBarrier("first-update")
@ -593,7 +575,7 @@ abstract class ClusterShardingSpec(config: ClusterShardingSpecConfig) extends Mu
within(1.second) { within(1.second) {
region.tell(Get(3), probe3.ref) region.tell(Get(3), probe3.ref)
probe3.expectMsg(11) probe3.expectMsg(11)
probe3.lastSender.path should ===(node(third) / "user" / s"${TestCounterShardingTypeName}Region" / "3" / "3") probe3.lastSender.path should ===(node(third) / "user" / "counterRegion" / "3" / "3")
} }
} }
val probe4 = TestProbe() val probe4 = TestProbe()
@ -601,7 +583,7 @@ abstract class ClusterShardingSpec(config: ClusterShardingSpecConfig) extends Mu
within(1.second) { within(1.second) {
region.tell(Get(4), probe4.ref) region.tell(Get(4), probe4.ref)
probe4.expectMsg(21) probe4.expectMsg(21)
probe4.lastSender.path should ===(node(fourth) / "user" / s"${TestCounterShardingTypeName}Region" / "4" / "4") probe4.lastSender.path should ===(node(fourth) / "user" / "counterRegion" / "4" / "4")
} }
} }
@ -646,23 +628,23 @@ abstract class ClusterShardingSpec(config: ClusterShardingSpecConfig) extends Mu
runOn(third, fourth, fifth, sixth) { runOn(third, fourth, fifth, sixth) {
//#counter-start //#counter-start
val counterRegion: ActorRef = ClusterSharding(system).start( val counterRegion: ActorRef = ClusterSharding(system).start(
typeName = Counter.ShardingTypeName, typeName = "Counter",
entityPropsFactory = entityId Counter.props(entityId), entityProps = Props[Counter],
settings = ClusterShardingSettings(system), settings = ClusterShardingSettings(system),
extractEntityId = extractEntityId, extractEntityId = extractEntityId,
extractShardId = extractShardId) extractShardId = extractShardId)
//#counter-start //#counter-start
ClusterSharding(system).start( ClusterSharding(system).start(
typeName = AnotherCounter.ShardingTypeName, typeName = "AnotherCounter",
entityPropsFactory = entityId AnotherCounter.props(entityId), entityProps = Props[AnotherCounter],
settings = ClusterShardingSettings(system), settings = ClusterShardingSettings(system),
extractEntityId = extractEntityId, extractEntityId = extractEntityId,
extractShardId = extractShardId) extractShardId = extractShardId)
//#counter-supervisor-start //#counter-supervisor-start
ClusterSharding(system).start( ClusterSharding(system).start(
typeName = CounterSupervisor.ShardingTypeName, typeName = "SupervisedCounter",
entityPropsFactory = entityId CounterSupervisor.props(entityId), entityProps = Props[CounterSupervisor],
settings = ClusterShardingSettings(system), settings = ClusterShardingSettings(system),
extractEntityId = extractEntityId, extractEntityId = extractEntityId,
extractShardId = extractShardId) extractShardId = extractShardId)
@ -671,18 +653,17 @@ abstract class ClusterShardingSpec(config: ClusterShardingSpecConfig) extends Mu
enterBarrier("extension-started") enterBarrier("extension-started")
runOn(fifth) { runOn(fifth) {
//#counter-usage //#counter-usage
val counterRegion: ActorRef = ClusterSharding(system).shardRegion(Counter.ShardingTypeName) val counterRegion: ActorRef = ClusterSharding(system).shardRegion("Counter")
val entityId = 999 counterRegion ! Get(123)
counterRegion ! Get(entityId)
expectMsg(0) expectMsg(0)
counterRegion ! EntityEnvelope(entityId, Increment) counterRegion ! EntityEnvelope(123, Increment)
counterRegion ! Get(entityId) counterRegion ! Get(123)
expectMsg(1) expectMsg(1)
//#counter-usage //#counter-usage
ClusterSharding(system).shardRegion(AnotherCounter.ShardingTypeName) ! EntityEnvelope(entityId, Decrement) ClusterSharding(system).shardRegion("AnotherCounter") ! EntityEnvelope(123, Decrement)
ClusterSharding(system).shardRegion(AnotherCounter.ShardingTypeName) ! Get(entityId) ClusterSharding(system).shardRegion("AnotherCounter") ! Get(123)
expectMsg(-1) expectMsg(-1)
} }
@ -691,8 +672,8 @@ abstract class ClusterShardingSpec(config: ClusterShardingSpecConfig) extends Mu
// sixth is a frontend node, i.e. proxy only // sixth is a frontend node, i.e. proxy only
runOn(sixth) { runOn(sixth) {
for (n 1000 to 1010) { for (n 1000 to 1010) {
ClusterSharding(system).shardRegion(Counter.ShardingTypeName) ! EntityEnvelope(n, Increment) ClusterSharding(system).shardRegion("Counter") ! EntityEnvelope(n, Increment)
ClusterSharding(system).shardRegion(Counter.ShardingTypeName) ! Get(n) ClusterSharding(system).shardRegion("Counter") ! Get(n)
expectMsg(1) expectMsg(1)
lastSender.path.address should not be (Cluster(system).selfAddress) lastSender.path.address should not be (Cluster(system).selfAddress)
} }
@ -705,7 +686,7 @@ abstract class ClusterShardingSpec(config: ClusterShardingSpecConfig) extends Mu
runOn(first) { runOn(first) {
val counterRegionViaStart: ActorRef = ClusterSharding(system).start( val counterRegionViaStart: ActorRef = ClusterSharding(system).start(
typeName = "ApiTest", typeName = "ApiTest",
entityPropsFactory = Counter.props, entityProps = Props[Counter],
settings = ClusterShardingSettings(system), settings = ClusterShardingSettings(system),
extractEntityId = extractEntityId, extractEntityId = extractEntityId,
extractShardId = extractShardId) extractShardId = extractShardId)
@ -722,7 +703,7 @@ abstract class ClusterShardingSpec(config: ClusterShardingSpecConfig) extends Mu
runOn(sixth) { runOn(sixth) {
// #proxy-dc // #proxy-dc
val counterProxyDcB: ActorRef = ClusterSharding(system).startProxy( val counterProxyDcB: ActorRef = ClusterSharding(system).startProxy(
typeName = Counter.ShardingTypeName, typeName = "Counter",
role = None, role = None,
dataCenter = Some("B"), dataCenter = Some("B"),
extractEntityId = extractEntityId, extractEntityId = extractEntityId,

View file

@ -45,8 +45,8 @@ The above actor uses event sourcing and the support provided in @scala[`Persiste
It does not have to be a persistent actor, but in case of failure or migration of entities between nodes it must be able to recover It does not have to be a persistent actor, but in case of failure or migration of entities between nodes it must be able to recover
its state if it is valuable. its state if it is valuable.
Note how the `persistenceId` is defined - it must be unique to the entity, so using the entity identifier is advised. Note how the `persistenceId` is defined. The name of the actor is the entity identifier (utf-8 URL-encoded).
You may define it in other ways, but it must be unique. You may define it another way, but it must be unique.
When using the sharding extension you are first, typically at system startup on each node When using the sharding extension you are first, typically at system startup on each node
in the cluster, supposed to register the supported entity types with the `ClusterSharding.start` in the cluster, supposed to register the supported entity types with the `ClusterSharding.start`
@ -58,10 +58,6 @@ Scala
Java Java
: @@snip [ClusterShardingTest.java]($code$/java/jdocs/sharding/ClusterShardingTest.java) { #counter-start } : @@snip [ClusterShardingTest.java]($code$/java/jdocs/sharding/ClusterShardingTest.java) { #counter-start }
In some cases, the actor may need to know the `entityId` associated with it. This can be achieved using the `entityPropsFactory`
parameter to `ClusterSharding.start`. The entity ID will be passed to the factory as a parameter, which can then be used in
the creation of the actor (like the above example).
The @scala[`extractEntityId` and `extractShardId` are two] @java[`messageExtractor` defines] application specific @scala[functions] @java[methods] to extract the entity The @scala[`extractEntityId` and `extractShardId` are two] @java[`messageExtractor` defines] application specific @scala[functions] @java[methods] to extract the entity
identifier and the shard identifier from incoming messages. identifier and the shard identifier from incoming messages.

View file

@ -7,8 +7,6 @@ package jdocs.sharding;
import static java.util.concurrent.TimeUnit.SECONDS; import static java.util.concurrent.TimeUnit.SECONDS;
import java.util.Optional; import java.util.Optional;
import java.util.function.Function;
import scala.concurrent.duration.Duration; import scala.concurrent.duration.Duration;
import akka.actor.AbstractActor; import akka.actor.AbstractActor;
@ -19,6 +17,7 @@ import akka.actor.OneForOneStrategy;
import akka.actor.PoisonPill; import akka.actor.PoisonPill;
import akka.actor.Props; import akka.actor.Props;
import akka.actor.SupervisorStrategy; import akka.actor.SupervisorStrategy;
import akka.actor.Terminated;
import akka.actor.ReceiveTimeout; import akka.actor.ReceiveTimeout;
//#counter-extractor //#counter-extractor
import akka.cluster.sharding.ShardRegion; import akka.cluster.sharding.ShardRegion;
@ -32,6 +31,7 @@ import akka.cluster.sharding.ClusterShardingSettings;
//#counter-start //#counter-start
import akka.persistence.AbstractPersistentActor; import akka.persistence.AbstractPersistentActor;
import akka.cluster.Cluster;
import akka.japi.pf.DeciderBuilder; import akka.japi.pf.DeciderBuilder;
// Doc code, compile only // Doc code, compile only
@ -85,12 +85,12 @@ public class ClusterShardingTest {
//#counter-start //#counter-start
Option<String> roleOption = Option.none(); Option<String> roleOption = Option.none();
ClusterShardingSettings settings = ClusterShardingSettings.create(system); ClusterShardingSettings settings = ClusterShardingSettings.create(system);
ActorRef startedCounterRegion = ClusterSharding.get(system).start(Counter.ShardingTypeName, ActorRef startedCounterRegion = ClusterSharding.get(system).start("Counter",
entityId -> Counter.props(entityId), settings, messageExtractor); Props.create(Counter.class), settings, messageExtractor);
//#counter-start //#counter-start
//#counter-usage //#counter-usage
ActorRef counterRegion = ClusterSharding.get(system).shardRegion(Counter.ShardingTypeName); ActorRef counterRegion = ClusterSharding.get(system).shardRegion("Counter");
counterRegion.tell(new Counter.Get(123), getSelf()); counterRegion.tell(new Counter.Get(123), getSelf());
counterRegion.tell(new Counter.EntityEnvelope(123, counterRegion.tell(new Counter.EntityEnvelope(123,
@ -99,14 +99,14 @@ public class ClusterShardingTest {
//#counter-usage //#counter-usage
//#counter-supervisor-start //#counter-supervisor-start
ClusterSharding.get(system).start(CounterSupervisor.ShardingTypeName, ClusterSharding.get(system).start("SupervisedCounter",
entityId -> CounterSupervisor.props(entityId), settings, messageExtractor); Props.create(CounterSupervisor.class), settings, messageExtractor);
//#counter-supervisor-start //#counter-supervisor-start
//#proxy-dc //#proxy-dc
ActorRef counterProxyDcB = ActorRef counterProxyDcB =
ClusterSharding.get(system).startProxy( ClusterSharding.get(system).startProxy(
Counter.ShardingTypeName, "Counter",
Optional.empty(), Optional.empty(),
Optional.of("B"), // data center name Optional.of("B"), // data center name
messageExtractor); messageExtractor);
@ -189,22 +189,12 @@ public class ClusterShardingTest {
} }
} }
public static final String ShardingTypeName = "Counter";
public static Props props(String id) {
return Props.create(() -> new Counter(id));
}
final String entityId;
int count = 0; int count = 0;
public Counter(String entityId) { // getSelf().path().name() is the entity identifier (utf-8 URL-encoded)
this.entityId = entityId;
}
@Override @Override
public String persistenceId() { public String persistenceId() {
return ShardingTypeName + "-" + entityId; return "Counter-" + getSelf().path().name();
} }
@Override @Override
@ -257,17 +247,9 @@ public class ClusterShardingTest {
static//#supervisor static//#supervisor
public class CounterSupervisor extends AbstractActor { public class CounterSupervisor extends AbstractActor {
public static final String ShardingTypeName = "CounterSupervisor";
public static Props props(String entityId) { private final ActorRef counter = getContext().actorOf(
return Props.create(() -> new CounterSupervisor(entityId)); Props.create(Counter.class), "theCounter");
}
private final ActorRef counter;
public CounterSupervisor(String entityId) {
counter = getContext().actorOf(Counter.props(entityId), "theCounter");
}
private static final SupervisorStrategy strategy = private static final SupervisorStrategy strategy =
new OneForOneStrategy(DeciderBuilder. new OneForOneStrategy(DeciderBuilder.