diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index e3bcaca53c..e22fe6d34e 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -306,7 +306,7 @@ private[akka] class LocalActorRef private[akka] ( _mailboxType: MailboxType, _supervisor: InternalActorRef, override val path: ActorPath) - extends ActorRefWithCell with LocalRef { + extends ActorRefWithCell with LocalRef { /* * Safe publication of this class’s fields is guaranteed by mailbox.setActor() @@ -599,10 +599,10 @@ private[akka] class DeadLetterActorRef(_provider: ActorRefProvider, * INTERNAL API */ private[akka] class VirtualPathContainer( - override val provider: ActorRefProvider, - override val path: ActorPath, - override val getParent: InternalActorRef, - val log: LoggingAdapter) extends MinimalActorRef { + override val provider: ActorRefProvider, + override val path: ActorPath, + override val getParent: InternalActorRef, + val log: LoggingAdapter) extends MinimalActorRef { private val children = new ConcurrentHashMap[String, InternalActorRef] diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterSharding.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterSharding.scala index d7165743a5..f5e679d691 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterSharding.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterSharding.scala @@ -39,6 +39,10 @@ import akka.persistence._ import akka.cluster.ClusterEvent.ClusterDomainEvent import akka.cluster.singleton.ClusterSingletonManager import akka.cluster.singleton.ClusterSingletonManagerSettings +import scala.concurrent.Future +import akka.dispatch.ExecutionContexts +import akka.pattern.pipe +import scala.util.Success /** * This extension provides sharding functionality of actors in a cluster. @@ -1288,11 +1292,11 @@ object ShardCoordinator { * @param shardId the id of the shard to allocate * @param currentShardAllocations all actor refs to `ShardRegion` and their current allocated shards, * in the order they were allocated - * @return the actor ref of the [[ShardRegion]] that is to be responsible for the shard, must be one of + * @return a `Future` of the actor ref of the [[ShardRegion]] that is to be responsible for the shard, must be one of * the references included in the `currentShardAllocations` parameter */ def allocateShard(requester: ActorRef, shardId: ShardId, - currentShardAllocations: Map[ActorRef, immutable.IndexedSeq[ShardId]]): ActorRef + currentShardAllocations: Map[ActorRef, immutable.IndexedSeq[ShardId]]): Future[ActorRef] /** * Invoked periodically to decide which shards to rebalance to another location. @@ -1300,10 +1304,10 @@ object ShardCoordinator { * in the order they were allocated * @param rebalanceInProgress set of shards that are currently being rebalanced, i.e. 
* you should not include these in the returned set - * @return the shards to be migrated, may be empty to skip rebalance in this round + * @return a `Future` of the shards to be migrated, may be empty to skip rebalance in this round */ def rebalance(currentShardAllocations: Map[ActorRef, immutable.IndexedSeq[ShardId]], - rebalanceInProgress: Set[ShardId]): Set[ShardId] + rebalanceInProgress: Set[ShardId]): Future[Set[ShardId]] } /** @@ -1312,15 +1316,17 @@ object ShardCoordinator { */ abstract class AbstractShardAllocationStrategy extends ShardAllocationStrategy { override final def allocateShard(requester: ActorRef, shardId: ShardId, - currentShardAllocations: Map[ActorRef, immutable.IndexedSeq[ShardId]]): ActorRef = { + currentShardAllocations: Map[ActorRef, immutable.IndexedSeq[ShardId]]): Future[ActorRef] = { + import scala.collection.JavaConverters._ allocateShard(requester, shardId, currentShardAllocations.asJava) } override final def rebalance(currentShardAllocations: Map[ActorRef, immutable.IndexedSeq[ShardId]], - rebalanceInProgress: Set[ShardId]): Set[ShardId] = { + rebalanceInProgress: Set[ShardId]): Future[Set[ShardId]] = { import scala.collection.JavaConverters._ - rebalance(currentShardAllocations.asJava, rebalanceInProgress.asJava).asScala.toSet + implicit val ec = ExecutionContexts.sameThreadExecutionContext + rebalance(currentShardAllocations.asJava, rebalanceInProgress.asJava).map(_.asScala.toSet) } /** @@ -1330,11 +1336,11 @@ object ShardCoordinator { * @param shardId the id of the shard to allocate * @param currentShardAllocations all actor refs to `ShardRegion` and their current allocated shards, * in the order they were allocated - * @return the actor ref of the [[ShardRegion]] that is to be responsible for the shard, must be one of + * @return a `Future` of the actor ref of the [[ShardRegion]] that is to be responsible for the shard, must be one of * the references included in the `currentShardAllocations` parameter */ def allocateShard(requester: ActorRef, shardId: String, - currentShardAllocations: java.util.Map[ActorRef, immutable.IndexedSeq[String]]): ActorRef + currentShardAllocations: java.util.Map[ActorRef, immutable.IndexedSeq[String]]): Future[ActorRef] /** * Invoked periodically to decide which shards to rebalance to another location. @@ -1342,12 +1348,14 @@ object ShardCoordinator { * in the order they were allocated * @param rebalanceInProgress set of shards that are currently being rebalanced, i.e. * you should not include these in the returned set - * @return the shards to be migrated, may be empty to skip rebalance in this round + * @return a `Future` of the shards to be migrated, may be empty to skip rebalance in this round */ def rebalance(currentShardAllocations: java.util.Map[ActorRef, immutable.IndexedSeq[String]], - rebalanceInProgress: java.util.Set[String]): java.util.Set[String] + rebalanceInProgress: java.util.Set[String]): Future[java.util.Set[String]] } + private val emptyRebalanceResult = Future.successful(Set.empty[ShardId]) + /** * The default implementation of [[ShardCoordinator.LeastShardAllocationStrategy]] * allocates new shards to the `ShardRegion` with least number of previously allocated shards. 
@@ -1361,23 +1369,23 @@ object ShardCoordinator { extends ShardAllocationStrategy with Serializable { override def allocateShard(requester: ActorRef, shardId: ShardId, - currentShardAllocations: Map[ActorRef, immutable.IndexedSeq[ShardId]]): ActorRef = { + currentShardAllocations: Map[ActorRef, immutable.IndexedSeq[ShardId]]): Future[ActorRef] = { val (regionWithLeastShards, _) = currentShardAllocations.minBy { case (_, v) ⇒ v.size } - regionWithLeastShards + Future.successful(regionWithLeastShards) } override def rebalance(currentShardAllocations: Map[ActorRef, immutable.IndexedSeq[ShardId]], - rebalanceInProgress: Set[ShardId]): Set[ShardId] = { + rebalanceInProgress: Set[ShardId]): Future[Set[ShardId]] = { if (rebalanceInProgress.size < maxSimultaneousRebalance) { val (regionWithLeastShards, leastShards) = currentShardAllocations.minBy { case (_, v) ⇒ v.size } val mostShards = currentShardAllocations.collect { case (_, v) ⇒ v.filterNot(s ⇒ rebalanceInProgress(s)) }.maxBy(_.size) if (mostShards.size - leastShards.size >= rebalanceThreshold) - Set(mostShards.head) + Future.successful(Set(mostShards.head)) else - Set.empty - } else Set.empty + emptyRebalanceResult + } else emptyRebalanceResult } } @@ -1451,6 +1459,17 @@ object ShardCoordinator { */ @SerialVersionUID(1L) final case class ShardStopped(shard: ShardId) extends CoordinatorCommand + /** + * Result of `allocateShard` is piped to self with this message. + */ + @SerialVersionUID(1L) final case class AllocateShardResult( + shard: ShardId, shardRegion: Option[ActorRef], getShardHomeSender: ActorRef) extends CoordinatorCommand + + /** + * Result of `rebalance` is piped to self with this message. + */ + @SerialVersionUID(1L) final case class RebalanceResult(shards: Set[ShardId]) extends CoordinatorCommand + // DomainEvents for the persistent state of the event sourced ShardCoordinator sealed trait DomainEvent @SerialVersionUID(1L) final case class ShardRegionRegistered(region: ActorRef) extends DomainEvent @@ -1693,20 +1712,29 @@ class ShardCoordinator(handOffTimeout: FiniteDuration, shardStartTimeout: Finite case Some(ref) ⇒ sender() ! ShardHome(shard, ref) case None ⇒ if (persistentState.regions.nonEmpty) { - val region = allocationStrategy.allocateShard(sender(), shard, persistentState.regions) - require(persistentState.regions.contains(region), - s"Allocated region $region for shard [$shard] must be one of the registered regions: $persistentState") - persist(ShardHomeAllocated(shard, region)) { evt ⇒ - persistentState = persistentState.updated(evt) - log.debug("Shard [{}] allocated at [{}]", evt.shard, evt.region) - - sendHostShardMsg(evt.shard, evt.region) - sender() ! ShardHome(evt.shard, evt.region) + val getShardHomeSender = sender() + val regionFuture = allocationStrategy.allocateShard(getShardHomeSender, shard, persistentState.regions) + regionFuture.value match { + case Some(Success(region)) ⇒ + continueGetShardHome(shard, region, getShardHomeSender) + case _ ⇒ + // continue when future is completed + regionFuture.map { region ⇒ + AllocateShardResult(shard, Some(region), getShardHomeSender) + }.recover { + case _ ⇒ AllocateShardResult(shard, None, getShardHomeSender) + }.pipeTo(self) } } } } + case AllocateShardResult(shard, None, getShardHomeSender) ⇒ + log.debug("Shard [{}] allocation failed. 
It will be retried.", shard) + + case AllocateShardResult(shard, Some(region), getShardHomeSender) ⇒ + continueGetShardHome(shard, region, getShardHomeSender) + case ShardStarted(shard) ⇒ unAckedHostShards.get(shard) match { case Some(cancel) ⇒ @@ -1722,14 +1750,22 @@ class ShardCoordinator(handOffTimeout: FiniteDuration, shardStartTimeout: Finite } case RebalanceTick ⇒ - if (persistentState.regions.nonEmpty) - allocationStrategy.rebalance(persistentState.regions, rebalanceInProgress).foreach { shard ⇒ - rebalanceInProgress += shard - val rebalanceFromRegion = persistentState.shards(shard) - log.debug("Rebalance shard [{}] from [{}]", shard, rebalanceFromRegion) - context.actorOf(rebalanceWorkerProps(shard, rebalanceFromRegion, handOffTimeout, - persistentState.regions.keySet ++ persistentState.regionProxies)) + if (persistentState.regions.nonEmpty) { + val shardsFuture = allocationStrategy.rebalance(persistentState.regions, rebalanceInProgress) + shardsFuture.value match { + case Some(Success(shards)) ⇒ + continueRebalance(shards) + case _ ⇒ + // continue when future is completed + shardsFuture.map { shards ⇒ RebalanceResult(shards) + }.recover { + case _ ⇒ RebalanceResult(Set.empty) + }.pipeTo(self) } + } + + case RebalanceResult(shards) ⇒ + continueRebalance(shards) case RebalanceDone(shard, ok) ⇒ rebalanceInProgress -= shard @@ -1777,4 +1813,39 @@ class ShardCoordinator(handOffTimeout: FiniteDuration, shardStartTimeout: Finite def allocateShardHomes(): Unit = persistentState.unallocatedShards.foreach { self ! GetShardHome(_) } + def continueGetShardHome(shard: ShardId, region: ActorRef, getShardHomeSender: ActorRef): Unit = + if (!rebalanceInProgress.contains(shard)) { + persistentState.shards.get(shard) match { + case Some(ref) ⇒ getShardHomeSender ! ShardHome(shard, ref) + case None ⇒ + if (persistentState.regions.contains(region)) { + persist(ShardHomeAllocated(shard, region)) { evt ⇒ + persistentState = persistentState.updated(evt) + log.debug("Shard [{}] allocated at [{}]", evt.shard, evt.region) + + sendHostShardMsg(evt.shard, evt.region) + getShardHomeSender ! ShardHome(evt.shard, evt.region) + } + } else + log.debug("Allocated region {} for shard [{}] is not (any longer) one of the registered regions: {}", + region, shard, persistentState) + } + } + + def continueRebalance(shards: Set[ShardId]): Unit = + shards.foreach { shard ⇒ + if (!rebalanceInProgress(shard)) { + persistentState.shards.get(shard) match { + case Some(rebalanceFromRegion) ⇒ + rebalanceInProgress += shard + log.debug("Rebalance shard [{}] from [{}]", shard, rebalanceFromRegion) + context.actorOf(rebalanceWorkerProps(shard, rebalanceFromRegion, handOffTimeout, + persistentState.regions.keySet ++ persistentState.regionProxies)) + case None ⇒ + log.debug("Rebalance of non-existing shard [{}] is ignored", shard) + } + + } + } + } diff --git a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingCustomShardAllocationSpec.scala b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingCustomShardAllocationSpec.scala new file mode 100644 index 0000000000..aab37288d9 --- /dev/null +++ b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingCustomShardAllocationSpec.scala @@ -0,0 +1,234 @@ +/** + * Copyright (C) 2009-2014 Typesafe Inc. 
+ */ +package akka.cluster.sharding + +import scala.collection.immutable +import java.io.File +import akka.cluster.sharding.ShardRegion.Passivate +import scala.concurrent.duration._ +import org.apache.commons.io.FileUtils +import com.typesafe.config.ConfigFactory +import akka.actor._ +import akka.cluster.Cluster +import akka.cluster.ClusterEvent._ +import akka.persistence.Persistence +import akka.persistence.journal.leveldb.SharedLeveldbJournal +import akka.persistence.journal.leveldb.SharedLeveldbStore +import akka.remote.testconductor.RoleName +import akka.remote.testkit.MultiNodeConfig +import akka.remote.testkit.MultiNodeSpec +import akka.remote.testkit.STMultiNodeSpec +import akka.remote.transport.ThrottlerTransportAdapter.Direction +import akka.testkit._ +import akka.cluster.sharding.ShardCoordinator.ShardAllocationStrategy +import scala.concurrent.Future +import akka.util.Timeout +import akka.pattern.ask + +object ClusterShardingCustomShardAllocationSpec extends MultiNodeConfig { + val first = role("first") + val second = role("second") + + commonConfig(ConfigFactory.parseString(""" + akka.loglevel = INFO + akka.actor.provider = "akka.cluster.ClusterActorRefProvider" + akka.remote.log-remote-lifecycle-events = off + akka.persistence.journal.plugin = "akka.persistence.journal.leveldb-shared" + akka.persistence.journal.leveldb-shared { + timeout = 5s + store { + native = off + dir = "target/journal-ClusterShardingCustomShardAllocationSpec" + } + } + akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local" + akka.persistence.snapshot-store.local.dir = "target/snapshots-ClusterShardingCustomShardAllocationSpec" + """)) + + class Entity extends Actor { + def receive = { + case id: Int ⇒ sender() ! id + } + } + + val idExtractor: ShardRegion.IdExtractor = { + case id: Int ⇒ (id.toString, id) + } + + val shardResolver: ShardRegion.ShardResolver = msg ⇒ msg match { + case id: Int ⇒ id.toString + } + + case object AllocateReq + case class UseRegion(region: ActorRef) + case object UseRegionAck + case object RebalanceReq + case class RebalanceShards(shards: Set[String]) + case object RebalanceShardsAck + + class Allocator extends Actor { + var useRegion: Option[ActorRef] = None + var rebalance = Set.empty[String] + def receive = { + case UseRegion(region) ⇒ + useRegion = Some(region) + sender() ! UseRegionAck + case AllocateReq ⇒ + useRegion.foreach { sender() ! _ } + case RebalanceShards(shards) ⇒ + rebalance = shards + sender() ! RebalanceShardsAck + case RebalanceReq ⇒ + sender() ! rebalance + rebalance = Set.empty + } + } + + case class TestAllocationStrategy(ref: ActorRef) extends ShardAllocationStrategy { + implicit val timeout = Timeout(3.seconds) + override def allocateShard(requester: ActorRef, shardId: ShardRegion.ShardId, currentShardAllocations: Map[ActorRef, immutable.IndexedSeq[ShardRegion.ShardId]]): Future[ActorRef] = { + (ref ? AllocateReq).mapTo[ActorRef] + } + + override def rebalance(currentShardAllocations: Map[ActorRef, immutable.IndexedSeq[ShardRegion.ShardId]], rebalanceInProgress: Set[ShardRegion.ShardId]): Future[Set[ShardRegion.ShardId]] = { + (ref ? 
RebalanceReq).mapTo[Set[String]] + } + } + +} + +class ClusterShardingCustomShardAllocationMultiJvmNode1 extends ClusterShardingCustomShardAllocationSpec +class ClusterShardingCustomShardAllocationMultiJvmNode2 extends ClusterShardingCustomShardAllocationSpec + +class ClusterShardingCustomShardAllocationSpec extends MultiNodeSpec(ClusterShardingCustomShardAllocationSpec) with STMultiNodeSpec with ImplicitSender { + import ClusterShardingCustomShardAllocationSpec._ + + override def initialParticipants = roles.size + + val storageLocations = List( + "akka.persistence.journal.leveldb.dir", + "akka.persistence.journal.leveldb-shared.store.dir", + "akka.persistence.snapshot-store.local.dir").map(s ⇒ new File(system.settings.config.getString(s))) + + override protected def atStartup() { + runOn(first) { + storageLocations.foreach(dir ⇒ if (dir.exists) FileUtils.deleteDirectory(dir)) + } + } + + override protected def afterTermination() { + runOn(first) { + storageLocations.foreach(dir ⇒ if (dir.exists) FileUtils.deleteDirectory(dir)) + } + } + + def join(from: RoleName, to: RoleName): Unit = { + runOn(from) { + Cluster(system) join node(to).address + startSharding() + } + enterBarrier(from.name + "-joined") + } + + def startSharding(): Unit = { + ClusterSharding(system).start( + typeName = "Entity", + entryProps = Some(Props[Entity]), + roleOverride = None, + rememberEntries = false, + idExtractor = idExtractor, + shardResolver = shardResolver, + allocationStrategy = TestAllocationStrategy(allocator)) + } + + lazy val region = ClusterSharding(system).shardRegion("Entity") + + lazy val allocator = system.actorOf(Props[Allocator], "allocator") + + "Cluster sharding with custom allocation strategy" must { + + "setup shared journal" in { + // start the Persistence extension + Persistence(system) + runOn(first) { + system.actorOf(Props[SharedLeveldbStore], "store") + } + enterBarrier("peristence-started") + + runOn(first, second) { + system.actorSelection(node(first) / "user" / "store") ! Identify(None) + val sharedStore = expectMsgType[ActorIdentity].ref.get + SharedLeveldbJournal.setStore(sharedStore, system) + } + + enterBarrier("after-1") + } + + "use specified region" in within(10.seconds) { + join(first, first) + + runOn(first) { + allocator ! UseRegion(region) + expectMsg(UseRegionAck) + region ! 1 + expectMsg(1) + lastSender.path should be(region.path / "1" / "1") + } + enterBarrier("first-started") + + join(second, first) + + region ! 2 + expectMsg(2) + runOn(first) { + lastSender.path should be(region.path / "2" / "2") + } + runOn(second) { + lastSender.path should be(node(first) / "user" / "sharding" / "Entity" / "2" / "2") + } + enterBarrier("second-started") + + runOn(first) { + system.actorSelection(node(second) / "user" / "sharding" / "Entity") ! Identify(None) + val secondRegion = expectMsgType[ActorIdentity].ref.get + allocator ! UseRegion(secondRegion) + expectMsg(UseRegionAck) + } + enterBarrier("second-active") + + region ! 3 + expectMsg(3) + runOn(second) { + lastSender.path should be(region.path / "3" / "3") + } + runOn(first) { + lastSender.path should be(node(second) / "user" / "sharding" / "Entity" / "3" / "3") + } + + enterBarrier("after-2") + } + + "rebalance specified shards" in within(15.seconds) { + runOn(first) { + allocator ! 
RebalanceShards(Set("2")) + expectMsg(RebalanceShardsAck) + + awaitAssert { + val p = TestProbe() + region.tell(2, p.ref) + p.expectMsg(2.second, 2) + p.lastSender.path should be(node(second) / "user" / "sharding" / "Entity" / "2" / "2") + } + + region ! 1 + expectMsg(1) + lastSender.path should be(region.path / "1" / "1") + } + + enterBarrier("after-2") + } + + } +} + diff --git a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/LeastShardAllocationStrategySpec.scala b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/LeastShardAllocationStrategySpec.scala index 983be7af4d..3ca0d1f85b 100644 --- a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/LeastShardAllocationStrategySpec.scala +++ b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/LeastShardAllocationStrategySpec.scala @@ -3,8 +3,10 @@ */ package akka.cluster.sharding -import akka.testkit.AkkaSpec +import scala.concurrent.Await +import scala.concurrent.duration._ import akka.actor.Props +import akka.testkit.AkkaSpec class LeastShardAllocationStrategySpec extends AkkaSpec { import ShardCoordinator._ @@ -18,7 +20,7 @@ class LeastShardAllocationStrategySpec extends AkkaSpec { "LeastShardAllocationStrategy" must { "allocate to region with least number of shards" in { val allocations = Map(regionA -> Vector("shard1"), regionB -> Vector("shard2"), regionC -> Vector.empty) - allocationStrategy.allocateShard(regionA, "shard3", allocations) should ===(regionC) + Await.result(allocationStrategy.allocateShard(regionA, "shard3", allocations), 3.seconds) should ===(regionC) } "rebalance from region with most number of shards" in { @@ -26,22 +28,22 @@ class LeastShardAllocationStrategySpec extends AkkaSpec { regionC -> Vector.empty) // so far regionB has 2 shards and regionC has 0 shards, but the diff is less than rebalanceThreshold - allocationStrategy.rebalance(allocations, Set.empty) should be(Set.empty) + Await.result(allocationStrategy.rebalance(allocations, Set.empty), 3.seconds) should ===(Set.empty[String]) val allocations2 = allocations.updated(regionB, Vector("shard2", "shard3", "shard4")) - allocationStrategy.rebalance(allocations2, Set.empty) should ===(Set("shard2")) - allocationStrategy.rebalance(allocations2, Set("shard4")) should be(Set.empty) + Await.result(allocationStrategy.rebalance(allocations2, Set.empty), 3.seconds) should ===(Set("shard2")) + Await.result(allocationStrategy.rebalance(allocations2, Set("shard4")), 3.seconds) should ===(Set.empty[String]) val allocations3 = allocations2.updated(regionA, Vector("shard1", "shard5", "shard6")) - allocationStrategy.rebalance(allocations3, Set("shard1")) should ===(Set("shard2")) + Await.result(allocationStrategy.rebalance(allocations3, Set("shard1")), 3.seconds) should ===(Set("shard2")) } "must limit number of simultanious rebalance" in { val allocations = Map(regionA -> Vector("shard1"), regionB -> Vector("shard2", "shard3", "shard4", "shard5", "shard6"), regionC -> Vector.empty) - allocationStrategy.rebalance(allocations, Set("shard2")) should ===(Set("shard3")) - allocationStrategy.rebalance(allocations, Set("shard2", "shard3")) should be(Set.empty) + Await.result(allocationStrategy.rebalance(allocations, Set("shard2")), 3.seconds) should ===(Set("shard3")) + Await.result(allocationStrategy.rebalance(allocations, Set("shard2", "shard3")), 3.seconds) should ===(Set.empty[String]) } } } diff --git a/akka-docs/rst/java/cluster-usage.rst b/akka-docs/rst/java/cluster-usage.rst index 791ca11324..7d09e5b2f4 100644 --- 
a/akka-docs/rst/java/cluster-usage.rst +++ b/akka-docs/rst/java/cluster-usage.rst @@ -293,9 +293,8 @@ terminate the actor system. .. includecode:: ../../../akka-samples/akka-sample-cluster-java/src/main/java/sample/cluster/factorial/FactorialFrontendMain.java#registerOnRemoved -.. note:: -Register a OnMemberRemoved callback on a cluster that have been shutdown ,the callback will be invoked immediately on - the caller thread,otherwise it will be invoked later when the current member status changed to 'Removed'.You may +.. note:: If you register an OnMemberRemoved callback on a cluster that has already been shut down, the callback will be invoked immediately on + the caller thread; otherwise it will be invoked later when the current member status changes to 'Removed'. You may want to install some cleanup handling after the cluster was started up,but the cluster might already be shutting down when you installing, and depending on the race is not healthy. diff --git a/akka-docs/rst/project/migration-guide-2.3.x-2.4.x.rst b/akka-docs/rst/project/migration-guide-2.3.x-2.4.x.rst index 28f7d9610f..dc870a0b6c 100644 --- a/akka-docs/rst/project/migration-guide-2.3.x-2.4.x.rst +++ b/akka-docs/rst/project/migration-guide-2.3.x-2.4.x.rst @@ -31,7 +31,7 @@ depend on 3.8.0.Final that break with 3.10.3.Final should be able to manually do to 3.8.0.Final and Akka will still work with that version. Advanced Notice: TypedActors will go away -======================================== +========================================= While technically not yet deprecated, the current ``akka.actor.TypedActor`` support will be superseded by the Akka Typed project that is currently being developed in open preview mode. If you are using TypedActors @@ -192,7 +192,7 @@ since it is still enabled in akka-cluster by default (for compatibility with pas Router configuration entries have also changed for the module, they use prefix ``cluster-metrics-``: ``cluster-metrics-adaptive-pool`` and ``cluster-metrics-adaptive-group`` Metrics extension classes and objects are located in the new package ``akka.cluster.metrics``. -Please see :ref:`Scala `, :ref:`Java ` for more information. +Please see :ref:`Scala `, :ref:`Java ` for more information. Microkernel is Deprecated ========================= @@ -254,3 +254,13 @@ it as an ordinary actor if you need multiple instances of it with different sett The parameters of the ``Props`` factory methods in the ``ClusterReceptionist`` companion has been moved to settings object ``ClusterReceptionistSettings``. This can be created from system configuration properties and also amended with API as needed. + +Asynchronous ShardAllocationStrategy +==================================== + +The methods of the ``ShardAllocationStrategy`` and ``AbstractShardAllocationStrategy`` in Cluster Sharding +have changed their return type to a ``Future`` to support asynchronous decisions. For example, you can ask an +external actor how to allocate shards or rebalance shards. + +For the synchronous case, you can return the result via ``scala.concurrent.Future.successful`` in Scala or +``akka.dispatch.Futures.successful`` in Java. diff --git a/akka-docs/rst/scala/cluster-usage.rst b/akka-docs/rst/scala/cluster-usage.rst index 7319840b6e..759dd2719b 100644 --- a/akka-docs/rst/scala/cluster-usage.rst +++ b/akka-docs/rst/scala/cluster-usage.rst @@ -287,9 +287,8 @@ terminate the actor system. .. 
includecode:: ../../../akka-samples/akka-sample-cluster-scala/src/main/scala/sample/cluster/factorial/FactorialFrontend.scala#registerOnRemoved -.. note:: - Register a OnMemberRemoved callback on a cluster that have been shutdown ,the callback will be invoked immediately on - the caller thread,otherwise it will be invoked later when the current member status changed to 'Removed'.You may +.. note:: If you register an OnMemberRemoved callback on a cluster that has already been shut down, the callback will be invoked immediately on + the caller thread; otherwise it will be invoked later when the current member status changes to 'Removed'. You may want to install some cleanup handling after the cluster was started up,but the cluster might already be shutting down when you installing, and depending on the race is not healthy.
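
As an illustration of the ``Future``-based ``ShardAllocationStrategy`` introduced by this patch, the sketch below shows one way a custom strategy could be written: the synchronous case simply wraps its decision in ``Future.successful``, while the asynchronous case delegates the decision to another actor via ``ask``. The class names and the ``decider`` actor protocol are illustrative assumptions, not part of the Akka sources::

    import scala.collection.immutable
    import scala.concurrent.Future
    import scala.concurrent.duration._
    import akka.actor.ActorRef
    import akka.cluster.sharding.ShardCoordinator.ShardAllocationStrategy
    import akka.cluster.sharding.ShardRegion.ShardId
    import akka.pattern.ask
    import akka.util.Timeout

    // Synchronous decision wrapped in an already completed Future.
    class LeastShardsStrategy extends ShardAllocationStrategy {
      override def allocateShard(requester: ActorRef, shardId: ShardId,
                                 currentShardAllocations: Map[ActorRef, immutable.IndexedSeq[ShardId]]): Future[ActorRef] = {
        val (regionWithLeastShards, _) = currentShardAllocations.minBy { case (_, shards) ⇒ shards.size }
        Future.successful(regionWithLeastShards)
      }

      override def rebalance(currentShardAllocations: Map[ActorRef, immutable.IndexedSeq[ShardId]],
                             rebalanceInProgress: Set[ShardId]): Future[Set[ShardId]] =
        Future.successful(Set.empty) // this sketch never rebalances
    }

    // Asynchronous decision delegated to a (hypothetical) decider actor via ask.
    class AskingAllocationStrategy(decider: ActorRef) extends ShardAllocationStrategy {
      implicit val timeout = Timeout(3.seconds)

      override def allocateShard(requester: ActorRef, shardId: ShardId,
                                 currentShardAllocations: Map[ActorRef, immutable.IndexedSeq[ShardId]]): Future[ActorRef] =
        (decider ? shardId).mapTo[ActorRef] // the decider is assumed to reply with a ShardRegion ref

      override def rebalance(currentShardAllocations: Map[ActorRef, immutable.IndexedSeq[ShardId]],
                             rebalanceInProgress: Set[ShardId]): Future[Set[ShardId]] =
        (decider ? "rebalance").mapTo[Set[ShardId]] // the decider is assumed to reply with the shard ids to migrate
    }

Either strategy can then be passed as the ``allocationStrategy`` argument of ``ClusterSharding(system).start``, as done with ``TestAllocationStrategy`` in the multi-jvm spec above; the coordinator handles both already completed and later-completing futures via the pipe-to-self pattern in this patch.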