diff --git a/akka-contrib/docs/cluster-sharding.rst b/akka-contrib/docs/cluster-sharding.rst
index 6971201426..9f1473769d 100644
--- a/akka-contrib/docs/cluster-sharding.rst
+++ b/akka-contrib/docs/cluster-sharding.rst
@@ -8,12 +8,12 @@
 be able to interact with them using their logical identifier, but without having
 their physical location in the cluster, which might also change over time.
 
 It could for example be actors representing Aggregate Roots in Domain-Driven Design terminology.
-Here we call these actors "entries". These actors typically have persistent (durable) state, 
+Here we call these actors "entries". These actors typically have persistent (durable) state,
 but this feature is not limited to actors with persistent state.
 
 Cluster sharding is typically used when you have many stateful actors that together consume
 more resources (e.g. memory) than fit on one machine. If you only have a few stateful actors
-it might be easier to run them on a :ref:`cluster-singleton` node. 
+it might be easier to run them on a :ref:`cluster-singleton` node.
 
 In this context sharding means that actors with an identifier, so called entries,
 can be automatically distributed across multiple nodes in the cluster. Each entry
@@ -46,21 +46,21 @@
 identifier and the shard identifier from incoming messages.
 
 .. includecode:: @contribSrc@/src/test/java/akka/contrib/pattern/ClusterShardingTest.java#counter-extractor
 
-This example illustrates two different ways to define the entry identifier in the messages: 
+This example illustrates two different ways to define the entry identifier in the messages:
 
 * The ``Get`` message includes the identifier itself.
 * The ``EntryEnvelope`` holds the identifier, and the actual message that is
-  sent to the entry actor is wrapped in the envelope. 
+  sent to the entry actor is wrapped in the envelope.
 
 Note how these two message types are handled in the ``entryId`` and ``entryMessage``
 methods shown above.
 
 A shard is a group of entries that will be managed together. The grouping is defined by the
 ``shardResolver`` function shown above. Creating a good sharding algorithm is an interesting
 challenge in itself. Try to produce a uniform distribution, i.e. the same number of entries in each shard.
-As a rule of thumb, the number of shards should be a factor ten greater than the planned maximum number 
+As a rule of thumb, the number of shards should be a factor ten greater than the planned maximum number
 of cluster nodes.
 
-Messages to the entries are always sent via the local ``ShardRegion``. The ``ShardRegion`` actor for a 
+Messages to the entries are always sent via the local ``ShardRegion``. The ``ShardRegion`` actor for a
 named entry type can be retrieved with ``ClusterSharding.shardRegion``. The ``ShardRegion`` will look up
 the location of the shard for the entry if it does not already know its location. It will
 delegate the message to the right node and it will create the entry actor on demand, i.e. when the
@@ -92,21 +92,21 @@
 identifier and the shard identifier from incoming messages.
 
 .. includecode:: @contribSrc@/src/multi-jvm/scala/akka/contrib/pattern/ClusterShardingSpec.scala#counter-extractor
 
-This example illustrates two different ways to define the entry identifier in the messages: 
+This example illustrates two different ways to define the entry identifier in the messages:
 
 * The ``Get`` message includes the identifier itself.
 * The ``EntryEnvelope`` holds the identifier, and the actual message that is
-  sent to the entry actor is wrapped in the envelope. 
+  sent to the entry actor is wrapped in the envelope.
 
 Note how these two message types are handled in the ``idExtractor`` function shown above.
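+
+The extractor functions for this example can be sketched as follows (the modulo 12
+is an assumption that matches the twelve shards asserted by the specs further down
+in this patch)::
+
+    val idExtractor: ShardRegion.IdExtractor = {
+      case EntryEnvelope(id, payload) ⇒ (id.toString, payload)
+      case msg @ Get(id)              ⇒ (id.toString, msg)
+    }
+
+    val shardResolver: ShardRegion.ShardResolver = {
+      case EntryEnvelope(id, _) ⇒ (id % 12).toString
+      case Get(id)              ⇒ (id % 12).toString
+    }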
 
 A shard is a group of entries that will be managed together. The grouping is defined by the
 ``shardResolver`` function shown above. Creating a good sharding algorithm is an interesting
 challenge in itself. Try to produce a uniform distribution, i.e. the same number of entries in each shard.
-As a rule of thumb, the number of shards should be a factor ten greater than the planned maximum number
-of cluster nodes.
+As a rule of thumb, the number of shards should be a factor ten greater than the planned maximum number
+of cluster nodes.
 
-Messages to the entries are always sent via the local ``ShardRegion``. The ``ShardRegion`` actor for a 
+Messages to the entries are always sent via the local ``ShardRegion``. The ``ShardRegion`` actor for a
 named entry type can be retrieved with ``ClusterSharding.shardRegion``. The ``ShardRegion`` will look up
 the location of the shard for the entry if it does not already know its location. It will
 delegate the message to the right node and it will create the entry actor on demand, i.e. when the
@@ -123,14 +123,16 @@ How it works
 The ``ShardRegion`` actor is started on each node in the cluster, or group of nodes
 tagged with a specific role. The ``ShardRegion`` is created with two application specific
 functions to extract the entry identifier and the shard identifier from incoming messages.
-A shard is a group of entries that will be managed together. For the first message in a 
+A shard is a group of entries that will be managed together. For the first message in a
 specific shard the ``ShardRegion`` requests the location of the shard from a central coordinator,
-the ``ShardCoordinator``. 
+the ``ShardCoordinator``.
+
+The ``ShardCoordinator`` decides which ``ShardRegion`` shall own the ``Shard`` and informs
+that ``ShardRegion``. The region will confirm this request and create the ``Shard`` supervisor
+as a child actor. The individual ``Entries`` will then be created when needed by the ``Shard``
+actor. Incoming messages thus travel via the ``ShardRegion`` and the ``Shard`` to the target
+``Entry``.
 
-The ``ShardCoordinator`` decides which ``ShardRegion`` that
-owns the shard. The ``ShardRegion`` receives the decided home of the shard
-and if that is the ``ShardRegion`` instance itself it will create a local child
-actor representing the entry and direct all messages for that entry to it.
 If the shard home is another ``ShardRegion`` instance messages will be forwarded
 to that ``ShardRegion`` instance instead. While resolving the location of a
 shard incoming messages for that shard are buffered and later delivered when the
@@ -139,20 +141,20 @@
 to the target destination immediately without involving the ``ShardCoordinator``.
 
 Scenario 1:
 
-#. Incoming message M1 to ``ShardRegion`` instance R1. 
-#. M1 is mapped to shard S1. R1 doesn't know about S1, so it asks the coordinator C for the location of S1. 
+#. Incoming message M1 to ``ShardRegion`` instance R1.
+#. M1 is mapped to shard S1. R1 doesn't know about S1, so it asks the coordinator C for the location of S1.
 #. C answers that the home of S1 is R1.
 #. R1 creates child actor for the entry E1 and sends buffered messages for S1 to E1 child
 #. All incoming messages for S1 which arrive at R1 can be handled by R1 without C. It creates entry children as needed, and forwards messages to them.
 
 Scenario 2:
 
-#. Incoming message M2 to R1. 
-#. M2 is mapped to S2. R1 doesn't know about S2, so it asks C for the location of S2. 
+#. Incoming message M2 to R1.
+#. M2 is mapped to S2. R1 doesn't know about S2, so it asks C for the location of S2.
 #. C answers that the home of S2 is R2.
 #. R1 sends buffered messages for S2 to R2
 #. All incoming messages for S2 which arrive at R1 can be handled by R1 without C. It forwards messages to R2.
-#. R2 receives message for S2, ask C, which answers that the home of S2 is R2, and we are in Scenario 1 (but for R2). 
+#. R2 receives message for S2, asks C, which answers that the home of S2 is R2, and we are in Scenario 1 (but for R2).
 
 To make sure that at most one instance of a specific entry actor is running somewhere
 in the cluster it is important that all nodes have the same view of where the shards
@@ -226,12 +228,30 @@
 reduce memory consumption. This is done by the application specific implementation of
 the entry actors for example by defining receive timeout (``context.setReceiveTimeout``).
 If a message is already enqueued to the entry when it stops itself the enqueued message
 in the mailbox will be dropped. To support graceful passivation without losing such
-messages the entry actor can send ``ShardRegion.Passivate`` to its parent ``ShardRegion``.
+messages the entry actor can send ``ShardRegion.Passivate`` to its parent ``Shard``.
 The specified wrapped message in ``Passivate`` will be sent back to the entry, which is
-then supposed to stop itself. Incoming messages will be buffered by the ``ShardRegion``
+then supposed to stop itself. Incoming messages will be buffered by the ``Shard``
 between reception of ``Passivate`` and termination of the entry. Such buffered messages
 are thereafter delivered to a new incarnation of the entry.
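+
+As a sketch (the idle timeout and the stop message are application specific
+assumptions), an entry can ask to be passivated like this::
+
+    context.setReceiveTimeout(120.seconds)
+
+    def receive = {
+      case ReceiveTimeout ⇒ context.parent ! ShardRegion.Passivate(stopMessage = PoisonPill)
+      // ... handle application messages ...
+    }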
+
+Remembering Entries
+-------------------
+
+The list of entries in each ``Shard`` can be made persistent (durable) by setting
+the ``rememberEntries`` flag to true when calling ``ClusterSharding.start``. When configured
+to remember entries, whenever a ``Shard`` is rebalanced onto another node or recovers after a
+crash it will recreate all the entries which were previously running in that ``Shard``. To
+permanently stop entries, a ``Passivate`` message must be sent to the parent ``Shard``, otherwise the
+entry will be automatically restarted after the entry restart backoff specified in the configuration.
+
+When ``rememberEntries`` is set to false, a ``Shard`` will not automatically restart any entries
+after a rebalance or recovering from a crash. Entries will only be started once the first message
+for that entry has been received in the ``Shard``. Entries will not be restarted if they stop without
+using a ``Passivate``.
+
+Note that the state of the entries themselves will not be restored unless they have been made persistent,
+e.g. with ``akka-persistence``.
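+
+For example (a sketch reusing the ``Counter`` entry and the extractors from the
+examples above)::
+
+    val counterRegion: ActorRef = ClusterSharding(system).start(
+      typeName = "Counter",
+      entryProps = Some(Props[Counter]),
+      rememberEntries = true,
+      idExtractor = idExtractor,
+      shardResolver = shardResolver)
+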
Any messages sent to an affected entry + # will be buffered until the state change is processed + shard-failure-backoff = 10 s + # If the shard is remembering entries and an entry stops itself without + # using passivate. The entry will be restarted after this duration or when + # the next message for it is received, which ever occurs first. + entry-restart-backoff = 10 s # Rebalance check is performed periodically with this interval. rebalance-interval = 10 s # How often the coordinator saves persistent snapshots, which are diff --git a/akka-contrib/src/main/scala/akka/contrib/pattern/ClusterSharding.scala b/akka-contrib/src/main/scala/akka/contrib/pattern/ClusterSharding.scala index 5e8a89734c..b4f1a7100f 100644 --- a/akka-contrib/src/main/scala/akka/contrib/pattern/ClusterSharding.scala +++ b/akka-contrib/src/main/scala/akka/contrib/pattern/ClusterSharding.scala @@ -5,6 +5,9 @@ package akka.contrib.pattern import java.net.URLEncoder import java.util.concurrent.ConcurrentHashMap +import akka.contrib.pattern.Shard.{ ShardCommand, StateChange } +import akka.contrib.pattern.ShardCoordinator.Internal.SnapshotTick + import scala.collection.immutable import scala.concurrent.Await import scala.concurrent.duration._ @@ -32,11 +35,8 @@ import akka.cluster.ClusterEvent.MemberUp import akka.cluster.Member import akka.cluster.MemberStatus import akka.pattern.ask -import akka.persistence.PersistentActor +import akka.persistence._ import akka.cluster.ClusterEvent.ClusterDomainEvent -import akka.persistence.SnapshotOffer -import akka.persistence.SaveSnapshotSuccess -import akka.persistence.SaveSnapshotFailure /** * This extension provides sharding functionality of actors in a cluster. @@ -184,6 +184,9 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension { val RetryInterval: FiniteDuration = config.getDuration("retry-interval", MILLISECONDS).millis val BufferSize: Int = config.getInt("buffer-size") val HandOffTimeout: FiniteDuration = config.getDuration("handoff-timeout", MILLISECONDS).millis + val ShardStartTimeout: FiniteDuration = config.getDuration("shard-start-timeout", MILLISECONDS).millis + val ShardFailureBackoff = config.getDuration("shard-failure-backoff", MILLISECONDS).millis + val EntryRestartBackoff = config.getDuration("entry-restart-backoff", MILLISECONDS).millis val RebalanceInterval: FiniteDuration = config.getDuration("rebalance-interval", MILLISECONDS).millis val SnapshotInterval: FiniteDuration = config.getDuration("snapshot-interval", MILLISECONDS).millis val LeastShardAllocationRebalanceThreshold: Int = @@ -208,6 +211,8 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension { * if not defined (None) the `ShardRegion` on this node will run in proxy only mode, i.e. * it will delegate messages to other `ShardRegion` actors on other nodes, but not host any * entry actors itself + * @param rememberEntries true if entry actors shall created be automatically restarted upon `Shard` + * restart. i.e. if the `Shard` is started on a different `ShardRegion` due to rebalance or crash. * @param idExtractor partial function to extract the entry id and the message to send to the * entry from the incoming message, if the partial function does not match the message will * be `unhandled`, i.e. 
@@ -220,12 +225,13 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension {
   def start(
     typeName: String,
     entryProps: Option[Props],
+    rememberEntries: Boolean,
     idExtractor: ShardRegion.IdExtractor,
     shardResolver: ShardRegion.ShardResolver,
     allocationStrategy: ShardAllocationStrategy): ActorRef = {
 
     implicit val timeout = system.settings.CreationTimeout
-    val startMsg = Start(typeName, entryProps, idExtractor, shardResolver, allocationStrategy)
+    val startMsg = Start(typeName, entryProps, rememberEntries, idExtractor, shardResolver, allocationStrategy)
     val Started(shardRegion) = Await.result(guardian ? startMsg, timeout.duration)
     regions.put(typeName, shardRegion)
     shardRegion
@@ -247,6 +253,8 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension {
    * if not defined (None) the `ShardRegion` on this node will run in proxy only mode, i.e.
    * it will delegate messages to other `ShardRegion` actors on other nodes, but not host any
    * entry actors itself
+   * @param rememberEntries true if entry actors shall be restarted automatically upon `Shard`
+   *   restart, i.e. if the `Shard` is started on a different `ShardRegion` due to rebalance or crash.
    * @param idExtractor partial function to extract the entry id and the message to send to the
    *   entry from the incoming message, if the partial function does not match the message will
    *   be `unhandled`, i.e. posted as `Unhandled` messages on the event stream
@@ -257,10 +265,11 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension {
   def start(
     typeName: String,
     entryProps: Option[Props],
+    rememberEntries: Boolean,
     idExtractor: ShardRegion.IdExtractor,
     shardResolver: ShardRegion.ShardResolver): ActorRef = {
 
-    start(typeName, entryProps, idExtractor, shardResolver,
+    start(typeName, entryProps, rememberEntries, idExtractor, shardResolver,
       new LeastShardAllocationStrategy(LeastShardAllocationRebalanceThreshold, LeastShardAllocationMaxSimultaneousRebalance))
   }
@@ -277,6 +286,8 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension {
    * if not defined (null) the `ShardRegion` on this node will run in proxy only mode, i.e.
    * it will delegate messages to other `ShardRegion` actors on other nodes, but not host any
    * entry actors itself
+   * @param rememberEntries true if entry actors shall be restarted automatically upon `Shard`
+   *   restart, i.e. if the `Shard` is started on a different `ShardRegion` due to rebalance or crash.
    * @param messageExtractor functions to extract the entry id, shard id, and the message to send to the
    *   entry from the incoming message
    * @param allocationStrategy possibility to use a custom shard allocation and
@@ -286,10 +297,11 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension {
   def start(
     typeName: String,
     entryProps: Props,
+    rememberEntries: Boolean,
     messageExtractor: ShardRegion.MessageExtractor,
     allocationStrategy: ShardAllocationStrategy): ActorRef = {
 
-    start(typeName, entryProps = Option(entryProps),
+    start(typeName, entryProps = Option(entryProps), rememberEntries,
       idExtractor = {
         case msg if messageExtractor.entryId(msg) ne null ⇒
           (messageExtractor.entryId(msg), messageExtractor.entryMessage(msg))
@@ -314,6 +326,8 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension {
    * if not defined (null) the `ShardRegion` on this node will run in proxy only mode, i.e.
    * it will delegate messages to other `ShardRegion` actors on other nodes, but not host any
    * entry actors itself
+   * @param rememberEntries true if entry actors shall be restarted automatically upon `Shard`
+   *   restart, i.e. if the `Shard` is started on a different `ShardRegion` due to rebalance or crash.
    * @param messageExtractor functions to extract the entry id, shard id, and the message to send to the
    *   entry from the incoming message
    * @return the actor ref of the [[ShardRegion]] that is to be responsible for the shard
@@ -321,9 +335,10 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension {
   def start(
     typeName: String,
     entryProps: Props,
+    rememberEntries: Boolean,
     messageExtractor: ShardRegion.MessageExtractor): ActorRef = {
 
-    start(typeName, entryProps, messageExtractor,
+    start(typeName, entryProps, rememberEntries, messageExtractor,
       new LeastShardAllocationStrategy(LeastShardAllocationRebalanceThreshold, LeastShardAllocationMaxSimultaneousRebalance))
   }
@@ -344,7 +359,7 @@
  */
private[akka] object ClusterShardingGuardian {
   import ShardCoordinator.ShardAllocationStrategy
-  final case class Start(typeName: String, entryProps: Option[Props], idExtractor: ShardRegion.IdExtractor,
+  final case class Start(typeName: String, entryProps: Option[Props], rememberEntries: Boolean, idExtractor: ShardRegion.IdExtractor,
                          shardResolver: ShardRegion.ShardResolver, allocationStrategy: ShardAllocationStrategy)
     extends NoSerializationVerificationNeeded
   final case class Started(shardRegion: ActorRef) extends NoSerializationVerificationNeeded
@@ -362,13 +377,13 @@ private[akka] class ClusterShardingGuardian extends Actor {
   import sharding.Settings._
 
   def receive = {
-    case Start(typeName, entryProps, idExtractor, shardResolver, allocationStrategy) ⇒
+    case Start(typeName, entryProps, rememberEntries, idExtractor, shardResolver, allocationStrategy) ⇒
       val encName = URLEncoder.encode(typeName, "utf-8")
       val coordinatorSingletonManagerName = encName + "Coordinator"
       val coordinatorPath = (self.path / coordinatorSingletonManagerName / "singleton" / "coordinator").toStringWithoutAddress
       val shardRegion = context.child(encName).getOrElse {
         if (HasNecessaryClusterRole && context.child(coordinatorSingletonManagerName).isEmpty) {
-          val coordinatorProps = ShardCoordinator.props(handOffTimeout = HandOffTimeout,
+          val coordinatorProps = ShardCoordinator.props(handOffTimeout = HandOffTimeout, shardStartTimeout = ShardStartTimeout,
             rebalanceInterval = RebalanceInterval, snapshotInterval = SnapshotInterval, allocationStrategy)
           val singletonProps = ShardCoordinatorSupervisor.props(CoordinatorFailureBackoff, coordinatorProps)
           context.actorOf(ClusterSingletonManager.props(
@@ -380,11 +395,16 @@
         }
         context.actorOf(ShardRegion.props(
+          typeName = typeName,
           entryProps = if (HasNecessaryClusterRole) entryProps else None,
           role = Role,
           coordinatorPath = coordinatorPath,
           retryInterval = RetryInterval,
+          snapshotInterval = SnapshotInterval,
+          shardFailureBackoff = ShardFailureBackoff,
+          entryRestartBackoff = EntryRestartBackoff,
           bufferSize = BufferSize,
+          rememberEntries = rememberEntries,
           idExtractor = idExtractor,
           shardResolver = shardResolver),
           name = encName)
@@ -404,40 +424,55 @@ object ShardRegion {
   * Scala API: Factory method for the [[akka.actor.Props]] of the [[ShardRegion]] actor.
*/ def props( + typeName: String, entryProps: Option[Props], role: Option[String], coordinatorPath: String, retryInterval: FiniteDuration, + shardFailureBackoff: FiniteDuration, + entryRestartBackoff: FiniteDuration, + snapshotInterval: FiniteDuration, bufferSize: Int, + rememberEntries: Boolean, idExtractor: ShardRegion.IdExtractor, shardResolver: ShardRegion.ShardResolver): Props = - Props(classOf[ShardRegion], entryProps, role, coordinatorPath, retryInterval, bufferSize, idExtractor, shardResolver) + Props(new ShardRegion(typeName, entryProps, role, coordinatorPath, retryInterval, shardFailureBackoff, entryRestartBackoff, snapshotInterval, bufferSize, rememberEntries, idExtractor, shardResolver)) /** * Scala API: Factory method for the [[akka.actor.Props]] of the [[ShardRegion]] actor. */ def props( + typeName: String, entryProps: Props, role: Option[String], coordinatorPath: String, retryInterval: FiniteDuration, + shardFailureBackoff: FiniteDuration, + entryRestartBackoff: FiniteDuration, + snapshotInterval: FiniteDuration, bufferSize: Int, + rememberEntries: Boolean, idExtractor: ShardRegion.IdExtractor, shardResolver: ShardRegion.ShardResolver): Props = - props(Some(entryProps), role, coordinatorPath, retryInterval, bufferSize, idExtractor, shardResolver) + props(typeName, Some(entryProps), role, coordinatorPath, retryInterval, shardFailureBackoff, entryRestartBackoff, snapshotInterval, bufferSize, rememberEntries, idExtractor, shardResolver) /** * Java API: Factory method for the [[akka.actor.Props]] of the [[ShardRegion]] actor. */ def props( + typeName: String, entryProps: Props, role: String, coordinatorPath: String, retryInterval: FiniteDuration, + shardFailureBackoff: FiniteDuration, + entryRestartBackoff: FiniteDuration, + snapshotInterval: FiniteDuration, bufferSize: Int, + rememberEntries: Boolean, messageExtractor: ShardRegion.MessageExtractor): Props = { - props(Option(entryProps), roleOption(role), coordinatorPath, retryInterval, bufferSize, + props(typeName, Option(entryProps), roleOption(role), coordinatorPath, retryInterval, shardFailureBackoff, entryRestartBackoff, snapshotInterval, bufferSize, rememberEntries, idExtractor = { case msg if messageExtractor.entryId(msg) ne null ⇒ (messageExtractor.entryId(msg), messageExtractor.entryMessage(msg)) @@ -450,30 +485,31 @@ object ShardRegion { * when using it in proxy only mode. */ def proxyProps( + typeName: String, role: Option[String], coordinatorPath: String, retryInterval: FiniteDuration, bufferSize: Int, idExtractor: ShardRegion.IdExtractor, shardResolver: ShardRegion.ShardResolver): Props = - Props(classOf[ShardRegion], None, role, coordinatorPath, retryInterval, bufferSize, idExtractor, shardResolver) + Props(new ShardRegion(typeName, None, role, coordinatorPath, retryInterval, Duration.Zero, Duration.Zero, Duration.Zero, bufferSize, false, idExtractor, shardResolver)) /** * Java API: : Factory method for the [[akka.actor.Props]] of the [[ShardRegion]] actor * when using it in proxy only mode. 
*/ def proxyProps( + typeName: String, role: String, coordinatorPath: String, retryInterval: FiniteDuration, bufferSize: Int, messageExtractor: ShardRegion.MessageExtractor): Props = { - proxyProps(roleOption(role), coordinatorPath, retryInterval, bufferSize, - idExtractor = { - case msg if messageExtractor.entryId(msg) ne null ⇒ - (messageExtractor.entryId(msg), messageExtractor.entryMessage(msg)) - }, + proxyProps(typeName, roleOption(role), coordinatorPath, retryInterval, bufferSize, idExtractor = { + case msg if messageExtractor.entryId(msg) ne null ⇒ + (messageExtractor.entryId(msg), messageExtractor.entryMessage(msg)) + }, shardResolver = msg ⇒ messageExtractor.shardId(msg)) } @@ -581,6 +617,9 @@ object ShardRegion { } } } + + private[akka] def handOffStopperProps(shard: String, replyTo: ActorRef, entries: Set[ActorRef]): Props = + Props(new HandOffStopper(shard, replyTo, entries)) } /** @@ -591,11 +630,16 @@ object ShardRegion { * @see [[ClusterSharding$ ClusterSharding extension]] */ class ShardRegion( + typeName: String, entryProps: Option[Props], role: Option[String], coordinatorPath: String, retryInterval: FiniteDuration, + shardFailureBackoff: FiniteDuration, + entryRestartBackoff: FiniteDuration, + snapshotInterval: FiniteDuration, bufferSize: Int, + rememberEntries: Boolean, idExtractor: ShardRegion.IdExtractor, shardResolver: ShardRegion.ShardResolver) extends Actor with ActorLogging { @@ -610,15 +654,12 @@ class ShardRegion( var regions = Map.empty[ActorRef, Set[ShardId]] var regionByShard = Map.empty[ShardId, ActorRef] - var entries = Map.empty[ActorRef, ShardId] - var entriesByShard = Map.empty[ShardId, Set[ActorRef]] var shardBuffers = Map.empty[ShardId, Vector[(Msg, ActorRef)]] - var passivatingBuffers = Map.empty[ActorRef, Vector[(Msg, ActorRef)]] + var shards = Map.empty[ShardId, ActorRef] + var shardsByRef = Map.empty[ActorRef, ShardId] + var handingOff = Set.empty[ActorRef] - def totalBufferSize = { - shardBuffers.map { case (_, buf) ⇒ buf.size }.sum + - passivatingBuffers.map { case (_, buf) ⇒ buf.size }.sum - } + def totalBufferSize = shardBuffers.map { case (_, buf) ⇒ buf.size }.sum import context.dispatcher val retryTask = context.system.scheduler.schedule(retryInterval, retryInterval, self, Retry) @@ -683,6 +724,16 @@ class ShardRegion( } def receiveCoordinatorMessage(msg: CoordinatorMessage): Unit = msg match { + case HostShard(shard) ⇒ + log.debug("Host Shard [{}] ", shard) + regionByShard = regionByShard.updated(shard, self) + regions = regions.updated(self, regions.getOrElse(self, Set.empty) + shard) + + //Start the shard, if already started this does nothing + getShard(shard) + + sender() ! ShardStarted(shard) + case ShardHome(shard, ref) ⇒ log.debug("Shard [{}] located at [{}]", shard, ref) regionByShard.get(shard) match { @@ -693,8 +744,10 @@ class ShardRegion( } regionByShard = regionByShard.updated(shard, ref) regions = regions.updated(ref, regions.getOrElse(ref, Set.empty) + shard) + if (ref != self) context.watch(ref) + shardBuffers.get(shard) match { case Some(buf) ⇒ buf.foreach { @@ -709,6 +762,9 @@ class ShardRegion( coordinator = Some(coord) requestShardBufferHomes() + case SnapshotTick ⇒ + context.children.foreach(_ ! SnapshotTick) + case BeginHandOff(shard) ⇒ log.debug("BeginHandOff shard [{}]", shard) if (regionByShard.contains(shard)) { @@ -720,7 +776,7 @@ class ShardRegion( } sender() ! 
BeginHandOffAck(shard)
 
-      case HandOff(shard) ⇒
+      case msg @ HandOff(shard) ⇒
         log.debug("HandOff shard [{}]", shard)
 
         // must drop requests that came in between the BeginHandOff and now,
@@ -729,9 +785,10 @@
         if (shardBuffers.contains(shard))
           shardBuffers -= shard
 
-        if (entriesByShard.contains(shard))
-          context.actorOf(Props(classOf[HandOffStopper], shard, sender(), entriesByShard(shard)))
-        else
+        if (shards.contains(shard)) {
+          handingOff = handingOff + shards(shard)
+          shards(shard) forward msg
+        } else
           sender() ! ShardStopped(shard)
 
       case _ ⇒ unhandled(msg)
   }
 
   def receiveCommand(cmd: ShardRegionCommand): Unit = cmd match {
-    case Passivate(stopMessage) ⇒
-      passivate(sender(), stopMessage)
-
     case Retry ⇒
       if (coordinator.isEmpty)
         register()
@@ -760,22 +814,17 @@
       regions -= ref
       if (log.isDebugEnabled)
         log.debug("Region [{}] with shards [{}] terminated", ref, shards.mkString(", "))
-    } else if (entries.contains(ref)) {
-      val shard = entries(ref)
-      val newShardEntities = entriesByShard(shard) - ref
-      if (newShardEntities.isEmpty)
-        entriesByShard -= shard
-      else
-        entriesByShard = entriesByShard.updated(shard, newShardEntities)
-      entries -= ref
-      if (passivatingBuffers.contains(ref)) {
-        log.debug("Passivating completed {}, buffered [{}]", ref, passivatingBuffers(ref).size)
-        // deliver messages that were received between Passivate and Terminated,
-        // will create new entry instance and deliver the messages to it
-        passivatingBuffers(ref) foreach {
-          case (msg, snd) ⇒ deliverMessage(msg, snd)
-        }
-        passivatingBuffers -= ref
+    } else if (shardsByRef.contains(ref)) {
+      val shardId: ShardId = shardsByRef(ref)
+
+      //Are we meant to be handing off, or is this an unknown stop?
+      if (handingOff.contains(ref)) {
+        shardsByRef = shardsByRef - ref
+        shards = shards - shardId
+        handingOff = handingOff - ref
+        log.debug("Shard [{}] handoff complete", shardId)
+      } else {
+        throw new IllegalStateException(s"Shard [$shardId] terminated while not being handed off.")
       }
     }
   }
@@ -800,35 +849,7 @@
     val shard = shardResolver(msg)
     regionByShard.get(shard) match {
       case Some(ref) if ref == self ⇒
-        val (id, m) = idExtractor(msg)
-        if (id == null || id == "") {
-          log.warning("Id must not be empty, dropping message [{}]", msg.getClass.getName)
-          context.system.deadLetters ! msg
-        } else {
-          val name = URLEncoder.encode(id, "utf-8")
-          val entry = context.child(name).getOrElse {
-            if (entryProps.isEmpty)
-              throw new IllegalStateException("Shard must not be allocated to a proxy only ShardRegion")
-            log.debug("Starting entry [{}] in shard [{}]", id, shard)
-            val a = context.watch(context.actorOf(entryProps.get, name))
-            entries = entries.updated(a, shard)
-            entriesByShard = entriesByShard.updated(shard, entriesByShard.getOrElse(shard, Set.empty) + a)
-            a
-          }
-          passivatingBuffers.get(entry) match {
-            case None ⇒
-              log.debug("Message [{}] for shard [{}] sent to entry", m.getClass.getName, shard)
-              entry.tell(m, snd)
-            case Some(buf) ⇒
-              if (totalBufferSize >= bufferSize) {
-                log.debug("Buffer is full, dropping message for passivated entry in shard [{}]", shard)
-                context.system.deadLetters ! 
msg
-              } else {
-                log.debug("Message for shard [{}] buffered due to entry being passivated", shard)
-                passivatingBuffers = passivatingBuffers.updated(entry, buf :+ ((msg, snd)))
-              }
-          }
-        }
+        getShard(shard).tell(msg, snd)
       case Some(ref) ⇒
         log.debug("Forwarding request for shard [{}] to [{}]", shard, ref)
         ref.tell(msg, snd)
@@ -850,15 +871,332 @@
     }
   }
 
-  def passivate(entry: ActorRef, stopMessage: Any): Unit = {
-    val entry = sender()
-    if (entries.contains(entry) && !passivatingBuffers.contains(entry)) {
-      log.debug("Passivating started {}", entry)
-      passivatingBuffers = passivatingBuffers.updated(entry, Vector.empty)
-      entry ! stopMessage
+  def getShard(id: ShardId): ActorRef = shards.getOrElse(
+    id,
+    entryProps match {
+      case Some(props) ⇒
+        log.debug("Starting shard [{}] in region", id)
+
+        val name = URLEncoder.encode(id, "utf-8")
+        val shard = context.watch(context.actorOf(
+          Shard.props(
+            typeName,
+            id,
+            props,
+            shardFailureBackoff,
+            entryRestartBackoff,
+            snapshotInterval,
+            bufferSize,
+            rememberEntries,
+            idExtractor,
+            shardResolver),
+          name))
+        shards = shards.updated(id, shard)
+        shardsByRef = shardsByRef.updated(shard, id)
+        shard
+      case None ⇒
+        throw new IllegalStateException("Shard must not be allocated to a proxy only ShardRegion")
+    })
+}
+
+/**
+ * INTERNAL API
+ * @see [[ClusterSharding$ ClusterSharding extension]]
+ */
+private[akka] object Shard {
+  import ShardRegion.EntryId
+
+  object State {
+    val Empty = State()
+  }
+
+  /**
+   * A Shard command
+   */
+  sealed trait ShardCommand
+
+  /**
+   * When a `StateChange` fails to write to the journal, we will retry it after a back-off.
+   */
+  final case class RetryPersistence(payload: StateChange) extends ShardCommand
+
+  /**
+   * The Snapshot tick for the shards
+   */
+  private case object SnapshotTick extends ShardCommand
+
+  /**
+   * When remembering entries and an entry stops itself without issuing a `Passivate`, we
+   * restart it after a back-off using this message.
+   */
+  final case class RestartEntry(entry: EntryId) extends ShardCommand
+
+  /**
+   * Represents a state change for the Shard
+   */
+  sealed trait StateChange { val entryId: EntryId }
+
+  /**
+   * `State` change for starting an entry in this `Shard`
+   */
+  @SerialVersionUID(1L) final case class EntryStarted(entryId: EntryId) extends StateChange
+
+  /**
+   * `State` change for an entry which has terminated.
+   */
+  @SerialVersionUID(1L) final case class EntryStopped(entryId: EntryId) extends StateChange
+
+  /**
+   * Persistent state of the Shard.
+   */
+  @SerialVersionUID(1L) final case class State private (
+    entries: Set[EntryId] = Set.empty)
+
+  /**
+   * Factory method for the [[akka.actor.Props]] of the [[Shard]] actor.
+   */
+  def props(typeName: String,
+            shardId: ShardRegion.ShardId,
+            entryProps: Props,
+            shardFailureBackoff: FiniteDuration,
+            entryRestartBackoff: FiniteDuration,
+            snapshotInterval: FiniteDuration,
+            bufferSize: Int,
+            rememberEntries: Boolean,
+            idExtractor: ShardRegion.IdExtractor,
+            shardResolver: ShardRegion.ShardResolver): Props =
+    Props(new Shard(typeName, shardId, entryProps, shardFailureBackoff, entryRestartBackoff, snapshotInterval, bufferSize, rememberEntries, idExtractor, shardResolver))
+}
+
+/**
+ * INTERNAL API
+ *
+ * This actor creates entry child actors on demand for the entries it is told to be
+ * responsible for.
+ * + * @see [[ClusterSharding$ ClusterSharding extension]] + */ +private[akka] class Shard( + typeName: String, + shardId: ShardRegion.ShardId, + entryProps: Props, + shardFailureBackoff: FiniteDuration, + entryRestartBackoff: FiniteDuration, + snapshotInterval: FiniteDuration, + bufferSize: Int, + rememberEntries: Boolean, + idExtractor: ShardRegion.IdExtractor, + shardResolver: ShardRegion.ShardResolver) extends PersistentActor with ActorLogging { + + import ShardRegion.{ handOffStopperProps, EntryId, Msg, Passivate } + import ShardCoordinator.Internal.{ HandOff, ShardStopped } + import Shard.{ State, RetryPersistence, RestartEntry, EntryStopped, EntryStarted, SnapshotTick } + import akka.contrib.pattern.ShardCoordinator.Internal.CoordinatorMessage + import akka.contrib.pattern.ShardRegion.ShardRegionCommand + import akka.persistence.RecoveryCompleted + + import context.dispatcher + val snapshotTask = context.system.scheduler.schedule(snapshotInterval, snapshotInterval, self, SnapshotTick) + + override def persistenceId = s"/sharding/${typeName}Shard/${shardId}" + + var state = State.Empty + var idByRef = Map.empty[ActorRef, EntryId] + var refById = Map.empty[EntryId, ActorRef] + var passivating = Set.empty[ActorRef] + var messageBuffers = Map.empty[EntryId, Vector[(Msg, ActorRef)]] + + var handOffStopper: Option[ActorRef] = None + + def totalBufferSize = messageBuffers.map { case (_, buf) ⇒ buf.size }.sum + + def processChange[A](event: A)(handler: A ⇒ Unit): Unit = + if (rememberEntries) persist(event)(handler) + else handler(event) + + override def receiveRecover: Receive = { + case EntryStarted(id) if rememberEntries ⇒ state = state.copy(state.entries + id) + case EntryStopped(id) if rememberEntries ⇒ state = state.copy(state.entries - id) + case SnapshotOffer(_, snapshot: State) ⇒ state = snapshot + case RecoveryCompleted ⇒ state.entries foreach getEntry + } + + override def receiveCommand: Receive = { + case Terminated(ref) ⇒ receiveTerminated(ref) + case msg: CoordinatorMessage ⇒ receiveCoordinatorMessage(msg) + case msg: ShardCommand ⇒ receiveShardCommand(msg) + case msg: ShardRegionCommand ⇒ receiveShardRegionCommand(msg) + case PersistenceFailure(payload: StateChange, _, _) ⇒ persistenceFailure(payload) + case msg if idExtractor.isDefinedAt(msg) ⇒ deliverMessage(msg, sender()) + } + + def receiveShardCommand(msg: ShardCommand): Unit = msg match { + case SnapshotTick ⇒ saveSnapshot(state) + case RetryPersistence(payload) ⇒ retryPersistence(payload) + case RestartEntry(id) ⇒ getEntry(id) + } + + def receiveShardRegionCommand(msg: ShardRegionCommand): Unit = msg match { + case Passivate(stopMessage) ⇒ passivate(sender(), stopMessage) + case _ ⇒ unhandled(msg) + } + + def receiveCoordinatorMessage(msg: CoordinatorMessage): Unit = msg match { + case HandOff(`shardId`) ⇒ handOff(sender()) + case HandOff(shard) ⇒ log.warning("Shard [{}] can not hand off for another Shard [{}]", shardId, shard) + case _ ⇒ unhandled(msg) + } + + def persistenceFailure(payload: StateChange): Unit = { + log.debug("Persistence of [{}] failed, will backoff and retry", payload) + if (!messageBuffers.isDefinedAt(payload.entryId)) { + messageBuffers = messageBuffers.updated(payload.entryId, Vector.empty) + } + + context.system.scheduler.scheduleOnce(shardFailureBackoff, self, RetryPersistence(payload)) + } + + def retryPersistence(payload: StateChange): Unit = { + log.debug("Retrying Persistence of [{}]", payload) + persist(payload) { _ ⇒ + payload match { + case msg: EntryStarted ⇒ sendMsgBuffer(msg) + case 
msg: EntryStopped ⇒ passivateCompleted(msg) + } } } + def handOff(replyTo: ActorRef): Unit = handOffStopper match { + case Some(_) ⇒ log.warning("HandOff shard [{}] received during existing handOff", shardId) + case None ⇒ + log.debug("HandOff shard [{}]", shardId) + + if (state.entries.nonEmpty) { + handOffStopper = Some(context.watch(context.actorOf(handOffStopperProps(shardId, replyTo, idByRef.keySet)))) + + //During hand off we only care about watching for termination of the hand off stopper + context become { + case Terminated(ref) ⇒ receiveTerminated(ref) + } + } else { + replyTo ! ShardStopped(shardId) + context stop self + } + } + + def receiveTerminated(ref: ActorRef): Unit = { + if (handOffStopper.exists(_ == ref)) { + context stop self + } else if (idByRef.contains(ref) && handOffStopper.isEmpty) { + val id = idByRef(ref) + if (messageBuffers.getOrElse(id, Vector.empty).nonEmpty) { + //Note; because we're not persisting the EntryStopped, we don't need + // to persist the EntryStarted either. + log.debug("Starting entry [{}] again, there are buffered messages for it", id) + sendMsgBuffer(EntryStarted(id)) + } else { + if (rememberEntries && !passivating.contains(ref)) { + log.debug("Entry [{}] stopped without passivating, will restart after backoff", id) + context.system.scheduler.scheduleOnce(entryRestartBackoff, self, RestartEntry(id)) + } else processChange(EntryStopped(id))(passivateCompleted) + } + + passivating = passivating - ref + } + } + + def passivate(entry: ActorRef, stopMessage: Any): Unit = { + idByRef.get(entry) match { + case Some(id) if !messageBuffers.contains(id) ⇒ + log.debug("Passivating started on entry {}", id) + + passivating = passivating + entry + messageBuffers = messageBuffers.updated(id, Vector.empty) + entry ! stopMessage + + case _ ⇒ //ignored + } + } + + // EntryStopped persistence handler + def passivateCompleted(event: EntryStopped): Unit = { + log.debug("Entry stopped [{}]", event.entryId) + + val ref = refById(event.entryId) + idByRef -= ref + refById -= event.entryId + + state = state.copy(state.entries - event.entryId) + messageBuffers = messageBuffers - event.entryId + } + + // EntryStarted persistence handler + def sendMsgBuffer(event: EntryStarted): Unit = { + //Get the buffered messages and remove the buffer + val messages = messageBuffers.getOrElse(event.entryId, Vector.empty) + messageBuffers = messageBuffers - event.entryId + + if (messages.nonEmpty) { + log.debug("Sending message buffer for entry [{}] ([{}] messages)", event.entryId, messages.size) + getEntry(event.entryId) + + //Now there is no deliveryBuffer we can try to redeliver + // and as the child exists, the message will be directly forwarded + messages foreach { + case (msg, snd) ⇒ deliverMessage(msg, snd) + } + } + } + + def deliverMessage(msg: Any, snd: ActorRef): Unit = { + val (id, payload) = idExtractor(msg) + if (id == null || id == "") { + log.warning("Id must not be empty, dropping message [{}]", msg.getClass.getName) + context.system.deadLetters ! msg + } else { + messageBuffers.get(id) match { + case None ⇒ deliverTo(id, msg, payload, snd) + + case Some(buf) if totalBufferSize >= bufferSize ⇒ + log.debug("Buffer is full, dropping message for entry [{}]", id) + context.system.deadLetters ! 
msg + + case Some(buf) ⇒ + log.debug("Message for entry [{}] buffered", id) + messageBuffers = messageBuffers.updated(id, buf :+ ((msg, snd))) + } + } + } + + def deliverTo(id: EntryId, msg: Any, payload: Msg, snd: ActorRef): Unit = { + val name = URLEncoder.encode(id, "utf-8") + context.child(name) match { + case Some(actor) ⇒ + actor.tell(payload, snd) + + case None if rememberEntries ⇒ + //Note; we only do this if remembering, otherwise the buffer is an overhead + messageBuffers = messageBuffers.updated(id, Vector((msg, snd))) + persist(EntryStarted(id))(sendMsgBuffer) + + case None ⇒ + getEntry(id).tell(payload, snd) + } + } + + def getEntry(id: EntryId): ActorRef = { + val name = URLEncoder.encode(id, "utf-8") + context.child(name).getOrElse { + log.debug("Starting entry [{}] in shard [{}]", id, shardId) + + val a = context.watch(context.actorOf(entryProps, name)) + idByRef = idByRef.updated(a, id) + refById = refById.updated(id, a) + state = state.copy(state.entries + id) + a + } + } } /** @@ -869,7 +1207,7 @@ object ShardCoordinatorSupervisor { * Factory method for the [[akka.actor.Props]] of the [[ShardCoordinator]] actor. */ def props(failureBackoff: FiniteDuration, coordinatorProps: Props): Props = - Props(classOf[ShardCoordinatorSupervisor], failureBackoff, coordinatorProps) + Props(new ShardCoordinatorSupervisor(failureBackoff, coordinatorProps)) /** * INTERNAL API @@ -905,9 +1243,9 @@ object ShardCoordinator { /** * Factory method for the [[akka.actor.Props]] of the [[ShardCoordinator]] actor. */ - def props(handOffTimeout: FiniteDuration, rebalanceInterval: FiniteDuration, snapshotInterval: FiniteDuration, + def props(handOffTimeout: FiniteDuration, shardStartTimeout: FiniteDuration, rebalanceInterval: FiniteDuration, snapshotInterval: FiniteDuration, allocationStrategy: ShardAllocationStrategy): Props = - Props(classOf[ShardCoordinator], handOffTimeout, rebalanceInterval, snapshotInterval, allocationStrategy) + Props(new ShardCoordinator(handOffTimeout, shardStartTimeout, rebalanceInterval, snapshotInterval, allocationStrategy)) /** * Interface of the pluggable shard allocation and rebalancing logic used by the [[ShardCoordinator]]. @@ -1039,6 +1377,10 @@ object ShardCoordinator { * Acknowledgement from `ShardCoordinator` that [[Register]] or [[RegisterProxy]] was sucessful. */ @SerialVersionUID(1L) final case class RegisterAck(coordinator: ActorRef) extends CoordinatorMessage + /** + * Periodic message to trigger persistent snapshot + */ + @SerialVersionUID(1L) final case object SnapshotTick extends CoordinatorMessage /** * `ShardRegion` requests the location of a shard by sending this message * to the `ShardCoordinator`. @@ -1048,6 +1390,14 @@ object ShardCoordinator { * `ShardCoordinator` replies with this message for [[GetShardHome]] requests. */ @SerialVersionUID(1L) final case class ShardHome(shard: ShardId, ref: ActorRef) extends CoordinatorMessage + /** + * `ShardCoodinator` informs a `ShardRegion` that it is hosting this shard + */ + @SerialVersionUID(1L) final case class HostShard(shard: ShardId) extends CoordinatorMessage + /** + * `ShardRegion` replies with this message for [[HostShard]] requests which lead to it hosting the shard + */ + @SerialVersionUID(1l) final case class ShardStarted(shard: ShardId) extends CoordinatorMessage /** * `ShardCoordinator` initiates rebalancing process by sending this message * to all registered `ShardRegion` actors (including proxy only). 
They are @@ -1090,11 +1440,12 @@ object ShardCoordinator { * Persistent state of the event sourced ShardCoordinator. */ @SerialVersionUID(1L) final case class State private ( - // region for each shard - val shards: Map[ShardId, ActorRef] = Map.empty, + // region for each shard + shards: Map[ShardId, ActorRef] = Map.empty, // shards for each region - val regions: Map[ActorRef, Vector[ShardId]] = Map.empty, - val regionProxies: Set[ActorRef] = Set.empty) { + regions: Map[ActorRef, Vector[ShardId]] = Map.empty, + regionProxies: Set[ActorRef] = Set.empty, + unallocatedShards: Set[ShardId] = Set.empty) { def updated(event: DomainEvent): State = event match { case ShardRegionRegistered(region) ⇒ @@ -1107,7 +1458,8 @@ object ShardCoordinator { require(regions.contains(region), s"Terminated region $region not registered: $this") copy( regions = regions - region, - shards = shards -- regions(region)) + shards = shards -- regions(region), + unallocatedShards = unallocatedShards ++ regions(region)) case ShardRegionProxyTerminated(proxy) ⇒ require(regionProxies.contains(proxy), s"Terminated region proxy $proxy not registered: $this") copy(regionProxies = regionProxies - proxy) @@ -1116,14 +1468,16 @@ object ShardCoordinator { require(!shards.contains(shard), s"Shard [$shard] already allocated: $this") copy( shards = shards.updated(shard, region), - regions = regions.updated(region, regions(region) :+ shard)) + regions = regions.updated(region, regions(region) :+ shard), + unallocatedShards = unallocatedShards - shard) case ShardHomeDeallocated(shard) ⇒ require(shards.contains(shard), s"Shard [$shard] not allocated: $this") val region = shards(shard) require(regions.contains(region), s"Region $region for shard [$shard] not registered: $this") copy( shards = shards - shard, - regions = regions.updated(region, regions(region).filterNot(_ == shard))) + regions = regions.updated(region, regions(region).filterNot(_ == shard)), + unallocatedShards = unallocatedShards + shard) } } @@ -1133,16 +1487,14 @@ object ShardCoordinator { * Periodic message to trigger rebalance */ private case object RebalanceTick - /** - * Periodic message to trigger persistent snapshot - */ - private case object SnapshotTick /** * End of rebalance process performed by [[RebalanceWorker]] */ private final case class RebalanceDone(shard: ShardId, ok: Boolean) - - private case object AfterRecover + /** + * Check if we've received a shard start request + */ + private final case class ResendShardHost(shard: ShardId, region: ActorRef) /** * INTERNAL API. Rebalancing process is performed by this actor. 
@@ -1182,6 +1534,10 @@ object ShardCoordinator { } } + private[akka] def rebalanceWorkerProps(shard: String, from: ActorRef, handOffTimeout: FiniteDuration, + regions: Set[ActorRef]): Props = + Props(new RebalanceWorker(shard, from, handOffTimeout, regions)) + } /** @@ -1189,25 +1545,24 @@ object ShardCoordinator { * * @see [[ClusterSharding$ ClusterSharding extension]] */ -class ShardCoordinator(handOffTimeout: FiniteDuration, rebalanceInterval: FiniteDuration, +class ShardCoordinator(handOffTimeout: FiniteDuration, shardStartTimeout: FiniteDuration, rebalanceInterval: FiniteDuration, snapshotInterval: FiniteDuration, allocationStrategy: ShardCoordinator.ShardAllocationStrategy) extends PersistentActor with ActorLogging { import ShardCoordinator._ import ShardCoordinator.Internal._ import ShardRegion.ShardId + import akka.actor.Cancellable override def persistenceId = self.path.toStringWithoutAddress var persistentState = State.empty var rebalanceInProgress = Set.empty[ShardId] + var unAckedHostShards = Map.empty[ShardId, Cancellable] import context.dispatcher val rebalanceTask = context.system.scheduler.schedule(rebalanceInterval, rebalanceInterval, self, RebalanceTick) val snapshotTask = context.system.scheduler.schedule(snapshotInterval, snapshotInterval, self, SnapshotTick) - // this will be stashed and received when the recovery is completed - self ! AfterRecover - override def postStop(): Unit = { super.postStop() rebalanceTask.cancel() @@ -1240,7 +1595,18 @@ class ShardCoordinator(handOffTimeout: FiniteDuration, rebalanceInterval: Finite case SnapshotOffer(_, state: State) ⇒ log.debug("receiveRecover SnapshotOffer {}", state) - persistentState = state + //Old versions of the state object may not have unallocatedShard set, + // thus it will be null. + if (state.unallocatedShards == null) + persistentState = state.copy(unallocatedShards = Set.empty) + else + persistentState = state + + case RecoveryCompleted ⇒ + persistentState.regionProxies.foreach(context.watch) + persistentState.regions.foreach { case (a, _) ⇒ context.watch(a) } + persistentState.shards.foreach { case (a, r) ⇒ sendHostShardMsg(a, r) } + allocateShardHomes() } override def receiveCommand: Receive = { @@ -1250,9 +1616,14 @@ class ShardCoordinator(handOffTimeout: FiniteDuration, rebalanceInterval: Finite sender() ! RegisterAck(self) else persist(ShardRegionRegistered(region)) { evt ⇒ + val firstRegion = persistentState.regions.isEmpty + persistentState = persistentState.updated(evt) context.watch(region) sender() ! RegisterAck(self) + + if (firstRegion) + allocateShardHomes() } case RegisterProxy(proxy) ⇒ @@ -1269,8 +1640,13 @@ class ShardCoordinator(handOffTimeout: FiniteDuration, rebalanceInterval: Finite case Terminated(ref) ⇒ if (persistentState.regions.contains(ref)) { log.debug("ShardRegion terminated: [{}]", ref) + + require(persistentState.regions.contains(ref), s"Terminated region $ref not registered") + persistentState.regions(ref).foreach { s ⇒ self ! 
GetShardHome(s) }
+
         persist(ShardRegionTerminated(ref)) { evt ⇒
           persistentState = persistentState.updated(evt)
+          allocateShardHomes()
         }
       } else if (persistentState.regionProxies.contains(ref)) {
         log.debug("ShardRegion proxy terminated: [{}]", ref)
@@ -1291,19 +1667,35 @@
           persist(ShardHomeAllocated(shard, region)) { evt ⇒
             persistentState = persistentState.updated(evt)
             log.debug("Shard [{}] allocated at [{}]", evt.shard, evt.region)
+
+            sendHostShardMsg(evt.shard, evt.region)
             sender() ! ShardHome(evt.shard, evt.region)
           }
         }
       }
 
+    case ShardStarted(shard) ⇒
+      unAckedHostShards.get(shard) match {
+        case Some(cancel) ⇒
+          cancel.cancel()
+          unAckedHostShards = unAckedHostShards - shard
+        case _ ⇒
+      }
+
+    case ResendShardHost(shard, region) ⇒
+      persistentState.shards.get(shard) match {
+        case Some(`region`) ⇒ sendHostShardMsg(shard, region)
+        case _              ⇒ //Reallocated to another region
+      }
+
     case RebalanceTick ⇒
       if (persistentState.regions.nonEmpty)
         allocationStrategy.rebalance(persistentState.regions, rebalanceInProgress).foreach { shard ⇒
           rebalanceInProgress += shard
           val rebalanceFromRegion = persistentState.shards(shard)
           log.debug("Rebalance shard [{}] from [{}]", shard, rebalanceFromRegion)
-          context.actorOf(Props(classOf[RebalanceWorker], shard, rebalanceFromRegion, handOffTimeout,
+          context.actorOf(rebalanceWorkerProps(shard, rebalanceFromRegion, handOffTimeout,
             persistentState.regions.keySet ++ persistentState.regionProxies))
         }
@@ -1315,6 +1707,7 @@
       persist(ShardHomeDeallocated(shard)) { evt ⇒
         persistentState = persistentState.updated(evt)
         log.debug("Shard [{}] deallocated", evt.shard)
+        allocateShardHomes()
       }
 
    case SnapshotTick ⇒
@@ -1327,11 +1720,19 @@
    case SaveSnapshotFailure(_, reason) ⇒
      log.warning("Persistent snapshot failure: {}", reason.getMessage)
 
-    case AfterRecover ⇒
-      persistentState.regionProxies.foreach(context.watch)
-      persistentState.regions.foreach { case (a, _) ⇒ context.watch(a) }
+    case ShardHome(_, _) ⇒
+      //On rebalance, we send ourselves a GetShardHome message to reallocate a
+      // shard. This receive handles the "response" from that message, i.e. ignores it.
  }
 
+  def sendHostShardMsg(shard: ShardId, region: ActorRef): Unit = {
+    region ! HostShard(shard)
+    val cancel = context.system.scheduler.scheduleOnce(shardStartTimeout, self, ResendShardHost(shard, region))
+    unAckedHostShards = unAckedHostShards.updated(shard, cancel)
+  }
+
+  def allocateShardHomes(): Unit = persistentState.unallocatedShards.foreach { self ! 
GetShardHome(_) } + } diff --git a/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterShardingFailureSpec.scala b/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterShardingFailureSpec.scala index ab6e95e567..ae94e5f26c 100644 --- a/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterShardingFailureSpec.scala +++ b/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterShardingFailureSpec.scala @@ -4,13 +4,12 @@ package akka.contrib.pattern import java.io.File +import akka.contrib.pattern.ShardRegion.Passivate + import scala.concurrent.duration._ import org.apache.commons.io.FileUtils import com.typesafe.config.ConfigFactory -import akka.actor.Actor -import akka.actor.ActorIdentity -import akka.actor.Identify -import akka.actor.Props +import akka.actor._ import akka.cluster.Cluster import akka.cluster.ClusterEvent._ import akka.persistence.Persistence @@ -44,6 +43,7 @@ object ClusterShardingFailureSpec extends MultiNodeConfig { } akka.persistence.snapshot-store.local.dir = "target/snapshots-ClusterShardingFailureSpec" akka.contrib.cluster.sharding.coordinator-failure-backoff = 3s + akka.contrib.cluster.sharding.shard-failure-backoff = 3s """)) testTransport(on = true) @@ -111,6 +111,7 @@ class ClusterShardingFailureSpec extends MultiNodeSpec(ClusterShardingFailureSpe ClusterSharding(system).start( typeName = "Entity", entryProps = Some(Props[Entity]), + rememberEntries = true, idExtractor = idExtractor, shardResolver = shardResolver) } @@ -143,10 +144,13 @@ class ClusterShardingFailureSpec extends MultiNodeSpec(ClusterShardingFailureSpe runOn(first) { region ! Add("10", 1) region ! Add("20", 2) + region ! Add("21", 3) region ! Get("10") expectMsg(Value("10", 1)) region ! Get("20") expectMsg(Value("20", 2)) + region ! Get("21") + expectMsg(Value("21", 3)) } enterBarrier("after-2") @@ -160,9 +164,29 @@ class ClusterShardingFailureSpec extends MultiNodeSpec(ClusterShardingFailureSpe enterBarrier("journal-blackholed") runOn(first) { + region ! Get("21") + expectMsg(Value("21", 3)) + val entry21 = lastSender + val shard2 = system.actorSelection(entry21.path.parent) + + //Test the ShardCoordinator allocating shards during a journal failure region ! Add("30", 3) + + //Test the Shard starting entries and persisting during a journal failure + region ! Add("11", 1) + + //Test the Shard passivate works during a journal failure + shard2.tell(Passivate(PoisonPill), entry21) + region ! Add("21", 1) + + region ! Get("21") + expectMsg(Value("21", 1)) + region ! Get("30") expectMsg(Value("30", 3)) + + region ! Get("11") + expectMsg(Value("11", 1)) } runOn(controller) { @@ -175,8 +199,11 @@ class ClusterShardingFailureSpec extends MultiNodeSpec(ClusterShardingFailureSpe region ! Add("10", 1) region ! Add("20", 2) region ! Add("30", 3) + region ! Add("11", 4) region ! Get("10") expectMsg(Value("10", 2)) + region ! Get("11") + expectMsg(Value("11", 5)) region ! Get("20") expectMsg(Value("20", 4)) region ! 
Get("30") diff --git a/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterShardingSpec.scala b/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterShardingSpec.scala index cf2403403d..f4955aaaab 100644 --- a/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterShardingSpec.scala +++ b/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterShardingSpec.scala @@ -3,6 +3,9 @@ */ package akka.contrib.pattern +import akka.contrib.pattern.ShardCoordinator.Internal.{ ShardStopped, HandOff } +import akka.contrib.pattern.ShardRegion.Passivate + import language.postfixOps import scala.concurrent.duration._ import com.typesafe.config.ConfigFactory @@ -51,12 +54,14 @@ object ClusterShardingSpec extends MultiNodeConfig { role = backend retry-interval = 1 s handoff-timeout = 10 s + shard-start-timeout = 5s + entry-restart-backoff = 1s rebalance-interval = 2 s least-shard-allocation-strategy { rebalance-threshold = 2 max-simultaneous-rebalance = 1 } - } + } """)) nodeConfig(sixth) { @@ -77,9 +82,9 @@ object ClusterShardingSpec extends MultiNodeConfig { context.setReceiveTimeout(120.seconds) - // self.path.parent.name is the type name (utf-8 URL-encoded) + // self.path.parent.parent.name is the type name (utf-8 URL-encoded) // self.path.name is the entry identifier (utf-8 URL-encoded) - override def persistenceId: String = self.path.parent.name + "-" + self.path.name + override def persistenceId: String = self.path.parent.parent.name + "-" + self.path.name var count = 0 //#counter-actor @@ -162,7 +167,7 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult def createCoordinator(): Unit = { val allocationStrategy = new ShardCoordinator.LeastShardAllocationStrategy(rebalanceThreshold = 2, maxSimultaneousRebalance = 1) - val coordinatorProps = ShardCoordinator.props(handOffTimeout = 10.second, rebalanceInterval = 2.seconds, + val coordinatorProps = ShardCoordinator.props(handOffTimeout = 10.seconds, shardStartTimeout = 10.seconds, rebalanceInterval = 2.seconds, snapshotInterval = 3600.seconds, allocationStrategy) system.actorOf(ClusterSingletonManager.props( singletonProps = ShardCoordinatorSupervisor.props(failureBackoff = 5.seconds, coordinatorProps), @@ -173,15 +178,27 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult } lazy val region = system.actorOf(ShardRegion.props( + typeName = "counter", entryProps = Props[Counter], role = None, coordinatorPath = "/user/counterCoordinator/singleton/coordinator", retryInterval = 1.second, + shardFailureBackoff = 1.second, + entryRestartBackoff = 1.second, + snapshotInterval = 1.hour, bufferSize = 1000, + rememberEntries = false, idExtractor = idExtractor, shardResolver = shardResolver), name = "counterRegion") + lazy val persistentRegion = ClusterSharding(system).start( + typeName = "PersistentCounter", + entryProps = Some(Props[Counter]), + rememberEntries = true, + idExtractor = idExtractor, + shardResolver = shardResolver) + "Cluster sharding" must { "setup shared journal" in { @@ -239,22 +256,22 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult region ! EntryEnvelope(2, Increment) region ! Get(2) expectMsg(3) - lastSender.path should be(node(second) / "user" / "counterRegion" / "2") + lastSender.path should be(node(second) / "user" / "counterRegion" / "2" / "2") region ! Get(11) expectMsg(1) // local on first - lastSender.path should be(region.path / "11") + lastSender.path should be(region.path / "11" / "11") region ! 
       region ! Get(12)
       expectMsg(1)
-      lastSender.path should be(node(second) / "user" / "counterRegion" / "12")
+      lastSender.path should be(node(second) / "user" / "counterRegion" / "0" / "12")
     }
     enterBarrier("first-update")
 
     runOn(second) {
       region ! Get(2)
       expectMsg(3)
-      lastSender.path should be(region.path / "2")
+      lastSender.path should be(region.path / "2" / "2")
     }
 
     enterBarrier("after-3")
@@ -291,7 +308,7 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult
         within(1.second) {
           region.tell(Get(2), probe1.ref)
           probe1.expectMsg(4)
-          probe1.lastSender.path should be(region.path / "2")
+          probe1.lastSender.path should be(region.path / "2" / "2")
         }
       }
       val probe2 = TestProbe()
@@ -299,7 +316,7 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult
         within(1.second) {
           region.tell(Get(12), probe2.ref)
           probe2.expectMsg(1)
-          probe2.lastSender.path should be(region.path / "12")
+          probe2.lastSender.path should be(region.path / "0" / "12")
         }
       }
     }
@@ -331,25 +348,25 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult
       region ! EntryEnvelope(3, Increment)
       region ! Get(3)
       expectMsg(11)
-      lastSender.path should be(node(third) / "user" / "counterRegion" / "3")
+      lastSender.path should be(node(third) / "user" / "counterRegion" / "3" / "3")
 
       region ! EntryEnvelope(4, Increment)
       region ! Get(4)
       expectMsg(21)
-      lastSender.path should be(node(fourth) / "user" / "counterRegion" / "4")
+      lastSender.path should be(node(fourth) / "user" / "counterRegion" / "4" / "4")
     }
     enterBarrier("first-update")
 
     runOn(third) {
       region ! Get(3)
       expectMsg(11)
-      lastSender.path should be(region.path / "3")
+      lastSender.path should be(region.path / "3" / "3")
     }
 
     runOn(fourth) {
       region ! Get(4)
       expectMsg(21)
-      lastSender.path should be(region.path / "4")
+      lastSender.path should be(region.path / "4" / "4")
     }
 
     enterBarrier("after-6")
@@ -369,7 +386,7 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult
       within(1.second) {
         region.tell(Get(3), probe3.ref)
         probe3.expectMsg(11)
-        probe3.lastSender.path should be(node(third) / "user" / "counterRegion" / "3")
+        probe3.lastSender.path should be(node(third) / "user" / "counterRegion" / "3" / "3")
       }
     }
     val probe4 = TestProbe()
@@ -377,7 +394,7 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult
       within(1.second) {
         region.tell(Get(4), probe4.ref)
         probe4.expectMsg(21)
-        probe4.lastSender.path should be(node(fourth) / "user" / "counterRegion" / "4")
+        probe4.lastSender.path should be(node(fourth) / "user" / "counterRegion" / "4" / "4")
       }
     }
@@ -407,7 +424,7 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult
       }
     }
 
-    // add more shards, which should later trigger rebalance to new node sixth
+    // add more shards, which should later trigger rebalance to new node sixth
     for (n ← 5 to 10)
       region ! EntryEnvelope(n, Increment)
@@ -428,7 +445,7 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult
         for (n ← 1 to 10) {
           region.tell(Get(n), probe.ref)
           probe.expectMsgType[Int]
-          if (probe.lastSender.path == region.path / n.toString)
+          if (probe.lastSender.path == region.path / (n % 12).toString / n.toString)
             count += 1
         }
         count should be(2)
@@ -444,6 +461,7 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult
     "support proxy only mode" in within(10.seconds) {
       runOn(sixth) {
         val proxy = system.actorOf(ShardRegion.proxyProps(
+          typeName = "counter",
           role = None,
           coordinatorPath = "/user/counterCoordinator/singleton/coordinator",
           retryInterval = 1.second,
@@ -466,12 +484,14 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult
       val counterRegion: ActorRef = ClusterSharding(system).start(
         typeName = "Counter",
         entryProps = Some(Props[Counter]),
+        rememberEntries = false,
         idExtractor = idExtractor,
         shardResolver = shardResolver)
-      //#counter-start
+      //#counter-start
       ClusterSharding(system).start(
         typeName = "AnotherCounter",
         entryProps = Some(Props[Counter]),
+        rememberEntries = false,
         idExtractor = idExtractor,
         shardResolver = shardResolver)
     }
@@ -512,6 +532,7 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult
       val counterRegionViaStart: ActorRef = ClusterSharding(system).start(
         typeName = "ApiTest",
         entryProps = Some(Props[Counter]),
+        rememberEntries = false,
         idExtractor = idExtractor,
         shardResolver = shardResolver)
 
@@ -522,5 +543,263 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult
       enterBarrier("after-10")
     }
+
+  "Persistent Cluster Shards" must {
+    "recover entries upon restart" in within(50.seconds) {
+      runOn(third, fourth, fifth) {
+        ClusterSharding(system).start(
+          typeName = "PersistentCounterEntries",
+          entryProps = Some(Props[Counter]),
+          rememberEntries = true,
+          idExtractor = idExtractor,
+          shardResolver = shardResolver)
+
+        ClusterSharding(system).start(
+          typeName = "AnotherPersistentCounter",
+          entryProps = Some(Props[Counter]),
+          rememberEntries = true,
+          idExtractor = idExtractor,
+          shardResolver = shardResolver)
+      }
+      enterBarrier("persistent-started")
+
+      runOn(third) {
+        val counterRegion: ActorRef = ClusterSharding(system).shardRegion("PersistentCounterEntries")
+
+        //Create and increment counter 1
+        counterRegion ! EntryEnvelope(1, Increment)
+        counterRegion ! Get(1)
+        expectMsg(1)
+
+        //Shut down the shard and confirm it's dead
+        val shard = system.actorSelection(lastSender.path.parent)
+        val region = system.actorSelection(lastSender.path.parent.parent)
+
+        //Stop the shard cleanly
+        region ! HandOff("1")
+        expectMsg(10 seconds, "ShardStopped not received", ShardStopped("1"))
+
+        awaitAssert({
+          shard ! Identify(1)
+          expectMsg(1 second, "Shard was still around", ActorIdentity(1, None))
+        }, 5 seconds, 500 millis)
+
+        //Get the path to where the shard now resides
+        counterRegion ! Get(13)
+        expectMsg(0)
+
+        //Check that counter 1 is now alive again, even though we have
+        // not sent a message to it via the ShardRegion
+        val counter1 = system.actorSelection(lastSender.path.parent / "1")
+        counter1 ! Identify(2)
+        receiveOne(1 second) match {
+          case ActorIdentity(2, location) ⇒
+            location should not be (None)
+        }
+
+        counter1 ! Get(1)
+        expectMsg(1)
+      }
+
+      enterBarrier("after-shard-restart")
+
+      runOn(fourth) {
+        //Check that a second region does not share the same persistent shards
+        val anotherRegion: ActorRef = ClusterSharding(system).shardRegion("AnotherPersistentCounter")
+
+        //Create a separate counter 13
+        anotherRegion ! EntryEnvelope(13, Increment)
+        anotherRegion ! Get(13)
+        expectMsg(1)
+
+        //Check that no counter "1" exists in this shard
+        val secondCounter1 = system.actorSelection(lastSender.path.parent / "1")
+        secondCounter1 ! Identify(3)
+        receiveOne(1 second) match {
+          case ActorIdentity(3, location) ⇒
+            location should be(None)
+        }
+      }
+      enterBarrier("after-11")
+    }
+
+    "permanently stop entries which passivate" in within(50.seconds) {
+      runOn(third, fourth, fifth) {
+        persistentRegion
+      }
+      enterBarrier("cluster-started-12")
+
+      runOn(third) {
+        //Create and increment counter 1
+        persistentRegion ! EntryEnvelope(1, Increment)
+        persistentRegion ! Get(1)
+        expectMsg(1)
+
+        val counter1 = lastSender
+        val shard = system.actorSelection(counter1.path.parent)
+        val region = system.actorSelection(counter1.path.parent.parent)
+
+        //Create and increment counter 13
+        persistentRegion ! EntryEnvelope(13, Increment)
+        persistentRegion ! Get(13)
+        expectMsg(1)
+
+        val counter13 = lastSender
+
+        counter1.path.parent should be(counter13.path.parent)
+
+        //Send the shard the passivate message from the counter
+        shard.tell(Passivate(Stop), counter1)
+        awaitAssert({
+          //Check that counter 1 is dead
+          counter1 ! Identify(1)
+          expectMsg(1 second, "Entry 1 was still around", ActorIdentity(1, None))
+        }, 5 seconds, 500 millis)
+
+        //Stop the shard cleanly
+        region ! HandOff("1")
+        expectMsg(10 seconds, "ShardStopped not received", ShardStopped("1"))
+        awaitAssert({
+          shard ! Identify(2)
+          expectMsg(1 second, "Shard was still around", ActorIdentity(2, None))
+        }, 5 seconds, 500 millis)
+      }
+
+      enterBarrier("shard-shutdown-12")
+
+      runOn(fourth) {
+        //Force the shard back up
+        persistentRegion ! Get(25)
+        expectMsg(0)
+
+        val shard = lastSender.path.parent
+
+        //Check that counter 1 is still dead
+        system.actorSelection(shard / "1") ! Identify(3)
+        receiveOne(1 second) should be(ActorIdentity(3, None))
+
+        //Check that counter 13 is alive again
+        system.actorSelection(shard / "13") ! Identify(4)
+        receiveOne(1 second) match {
+          case ActorIdentity(4, location) ⇒
+            location should not be (None)
+        }
+      }
+
+      enterBarrier("after-12")
+    }
+
+    "restart entries which stop without passivating" in within(50.seconds) {
+      runOn(third, fourth) {
+        persistentRegion
+      }
+      enterBarrier("cluster-started-12")
+
+      runOn(third) {
+        //Create and increment counter 1
+        persistentRegion ! EntryEnvelope(1, Increment)
+        persistentRegion ! Get(1)
+        expectMsg(2)
+
+        val counter1 = system.actorSelection(lastSender.path)
+
+        counter1 ! Stop
+
+        awaitAssert({
+          counter1 ! Identify(1)
+          receiveOne(1 second) match {
+            case ActorIdentity(1, location) ⇒
+              location should not be (None)
+          }
+        }, 5.seconds, 500.millis)
+
+      }
+
+      enterBarrier("after-13")
+    }
+
+    "be migrated to new regions upon region failure" in within(50.seconds) {
+      lazy val migrationRegion: ActorRef = ClusterSharding(system).start(
+        typeName = "AutoMigrateRegionTest",
+        entryProps = Some(Props[Counter]),
+        rememberEntries = true,
+        idExtractor = idExtractor,
+        shardResolver = shardResolver)
+
+      //Start only one region, and force an entry onto that region
+      runOn(third) {
+        migrationRegion ! EntryEnvelope(1, Increment)
+      }
+      enterBarrier("shard1-region3")
+
+      //Start another region and test that it talks to node 3
+      runOn(fourth) {
+        migrationRegion ! EntryEnvelope(1, Increment)
+
+        migrationRegion ! Get(1)
+        expectMsg(2)
+        lastSender.path should be(node(third) / "user" / "sharding" / "AutoMigrateRegionTest" / "1" / "1")
+
+        //Kill region 3
+        system.actorSelection(lastSender.path.parent.parent) ! PoisonPill
+      }
+      enterBarrier("region4-up")
+
+      //Wait for migration to happen
+      Thread sleep 2500
+
+      //Test that the shard, and thus the counter, was moved onto node 4 and started.
+      runOn(fourth) {
+        val counter1 = system.actorSelection(system / "sharding" / "AutoMigrateRegionTest" / "1" / "1")
+        counter1 ! Identify(1)
+        receiveOne(1 second) match {
+          case ActorIdentity(1, location) ⇒
+            location should not be (None)
+        }
+
+        counter1 ! Get(1)
+        expectMsg(2)
+      }
+
+      enterBarrier("after-14")
+    }
+
+    "ensure rebalance restarts shards" in within(50.seconds) {
+      runOn(fourth) {
+        for (i ← 2 to 12) {
+          persistentRegion ! EntryEnvelope(i, Increment)
+        }
+
+        for (i ← 2 to 12) {
+          persistentRegion ! Get(i)
+          expectMsg(1)
+        }
+      }
+      enterBarrier("entries-started")
+
+      runOn(fifth) {
+        persistentRegion
+      }
+      enterBarrier("fifth-joined-shard")
+
+      runOn(fifth) {
+        var count = 0
+
+        for (n ← 2 to 12) {
+          val entry = system.actorSelection(system / "sharding" / "PersistentCounter" / (n % 12).toString / n.toString)
+          entry ! Identify(n)
+          receiveOne(1 second) match {
+            case ActorIdentity(id, Some(_)) if id == n ⇒ count = count + 1
+            case ActorIdentity(id, None)               ⇒ //Not started on the fifth node
+          }
+        }
+
+        assert(count >= 3, s"Not enough entries migrated, only ${count}")
+      }
+
+      enterBarrier("after-15")
+    }
+  }
 }
diff --git a/akka-contrib/src/test/java/akka/contrib/pattern/ClusterShardingTest.java b/akka-contrib/src/test/java/akka/contrib/pattern/ClusterShardingTest.java
index e42c11cec4..0a24529871 100644
--- a/akka-contrib/src/test/java/akka/contrib/pattern/ClusterShardingTest.java
+++ b/akka-contrib/src/test/java/akka/contrib/pattern/ClusterShardingTest.java
@@ -63,7 +63,7 @@ public class ClusterShardingTest {
   //#counter-extractor
 
   //#counter-start
-    ActorRef startedCounterRegion = ClusterSharding.get(system).start("Counter", Props.create(Counter.class),
+    ActorRef startedCounterRegion = ClusterSharding.get(system).start("Counter", Props.create(Counter.class), false,
       messageExtractor);
   //#counter-start
@@ -111,12 +111,12 @@ public class ClusterShardingTest {
     }
 
     int count = 0;
-
-    // getSelf().path().parent().name() is the type name (utf-8 URL-encoded)
+
+    // getSelf().path().parent().parent().name() is the type name (utf-8 URL-encoded)
     // getSelf().path().name() is the entry identifier (utf-8 URL-encoded)
     @Override
     public String persistenceId() {
-      return getSelf().path().parent().name() + "-" + getSelf().path().name();
+      return getSelf().path().parent().parent().name() + "-" + getSelf().path().name();
     }
 
     @Override
diff --git a/akka-docs/rst/project/migration-guide-2.3.x-2.4.x.rst b/akka-docs/rst/project/migration-guide-2.3.x-2.4.x.rst
index 81d158b56b..47ac809453 100644
--- a/akka-docs/rst/project/migration-guide-2.3.x-2.4.x.rst
+++ b/akka-docs/rst/project/migration-guide-2.3.x-2.4.x.rst
@@ -70,6 +70,14 @@ can be retrieved by calling ``channel.socket``. This allows for accessing new NI
 A new class ``DatagramChannelCreator`` which extends ``SocketOption`` has been added.
 ``DatagramChannelCreator`` can be used for custom ``DatagramChannel`` creation logic.
 This allows for opening IPv6 multicast datagram channels.
+Cluster Sharding Entry Path Change
+==================================
+
+In ``2.3.x`` entries were direct children of the local ``ShardRegion``, so the examples derived the
+``persistenceId`` of an entry from ``self.path.parent.name``, which was the entry type name.
+
+In ``2.4.x`` entries are children of a ``Shard``, which in turn is a child of the local ``ShardRegion``.
+Each entry must therefore use ``self.path.parent.parent.name`` to include the entry type name in its
+``persistenceId``.
+
 Removed Deprecated Features
 ===========================
@@ -91,7 +99,7 @@ The following, previously deprecated, features have been removed:
   in the way that was introduced in Akka 2.3.
 
 * Timeout constructor without unit
-
+
 * JavaLoggingEventHandler, replaced by JavaLogger
 
 * UntypedActorFactory
@@ -103,7 +111,7 @@ Slf4j logging filter
 
 If you use ``Slf4jLogger`` you should add the following configuration::
 
-  akka.logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
+  akka.logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
 
 It will filter the log events using the backend configuration (e.g. logback.xml)
 before they are published to the event bus.
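
As a minimal sketch of the ``persistenceId`` change described in the migration note above, assembled
from the ``Counter`` changes in this patch; the empty recover/command handlers are assumptions for
illustration only and are not part of the patch::

    import akka.actor.Actor
    import akka.persistence.PersistentActor

    // Illustrative entry actor; only the persistenceId derivation matters here.
    class Counter extends PersistentActor {
      // 2.3.x: the entry was a direct child of the ShardRegion, so the
      // type name was one level up:
      //   override def persistenceId: String = self.path.parent.name + "-" + self.path.name
      //
      // 2.4.x: a Shard now sits between the ShardRegion and the entry, so the
      // type name is two levels up:
      override def persistenceId: String = self.path.parent.parent.name + "-" + self.path.name

      // Recovery and command handling are elided for this sketch.
      override def receiveRecover: Receive = Actor.emptyBehavior
      override def receiveCommand: Receive = Actor.emptyBehavior
    }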