!con #15331 Support async allocateShard and rebalance
This commit is contained in:
parent
a04a223966
commit
bcb36c36d9
7 changed files with 369 additions and 54 deletions
|
|
@ -306,7 +306,7 @@ private[akka] class LocalActorRef private[akka] (
|
||||||
_mailboxType: MailboxType,
|
_mailboxType: MailboxType,
|
||||||
_supervisor: InternalActorRef,
|
_supervisor: InternalActorRef,
|
||||||
override val path: ActorPath)
|
override val path: ActorPath)
|
||||||
extends ActorRefWithCell with LocalRef {
|
extends ActorRefWithCell with LocalRef {
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Safe publication of this class’s fields is guaranteed by mailbox.setActor()
|
* Safe publication of this class’s fields is guaranteed by mailbox.setActor()
|
||||||
|
|
@ -599,10 +599,10 @@ private[akka] class DeadLetterActorRef(_provider: ActorRefProvider,
|
||||||
* INTERNAL API
|
* INTERNAL API
|
||||||
*/
|
*/
|
||||||
private[akka] class VirtualPathContainer(
|
private[akka] class VirtualPathContainer(
|
||||||
override val provider: ActorRefProvider,
|
override val provider: ActorRefProvider,
|
||||||
override val path: ActorPath,
|
override val path: ActorPath,
|
||||||
override val getParent: InternalActorRef,
|
override val getParent: InternalActorRef,
|
||||||
val log: LoggingAdapter) extends MinimalActorRef {
|
val log: LoggingAdapter) extends MinimalActorRef {
|
||||||
|
|
||||||
private val children = new ConcurrentHashMap[String, InternalActorRef]
|
private val children = new ConcurrentHashMap[String, InternalActorRef]
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -39,6 +39,10 @@ import akka.persistence._
|
||||||
import akka.cluster.ClusterEvent.ClusterDomainEvent
|
import akka.cluster.ClusterEvent.ClusterDomainEvent
|
||||||
import akka.cluster.singleton.ClusterSingletonManager
|
import akka.cluster.singleton.ClusterSingletonManager
|
||||||
import akka.cluster.singleton.ClusterSingletonManagerSettings
|
import akka.cluster.singleton.ClusterSingletonManagerSettings
|
||||||
|
import scala.concurrent.Future
|
||||||
|
import akka.dispatch.ExecutionContexts
|
||||||
|
import akka.pattern.pipe
|
||||||
|
import scala.util.Success
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This extension provides sharding functionality of actors in a cluster.
|
* This extension provides sharding functionality of actors in a cluster.
|
||||||
|
|
@ -1288,11 +1292,11 @@ object ShardCoordinator {
|
||||||
* @param shardId the id of the shard to allocate
|
* @param shardId the id of the shard to allocate
|
||||||
* @param currentShardAllocations all actor refs to `ShardRegion` and their current allocated shards,
|
* @param currentShardAllocations all actor refs to `ShardRegion` and their current allocated shards,
|
||||||
* in the order they were allocated
|
* in the order they were allocated
|
||||||
* @return the actor ref of the [[ShardRegion]] that is to be responsible for the shard, must be one of
|
* @return a `Future` of the actor ref of the [[ShardRegion]] that is to be responsible for the shard, must be one of
|
||||||
* the references included in the `currentShardAllocations` parameter
|
* the references included in the `currentShardAllocations` parameter
|
||||||
*/
|
*/
|
||||||
def allocateShard(requester: ActorRef, shardId: ShardId,
|
def allocateShard(requester: ActorRef, shardId: ShardId,
|
||||||
currentShardAllocations: Map[ActorRef, immutable.IndexedSeq[ShardId]]): ActorRef
|
currentShardAllocations: Map[ActorRef, immutable.IndexedSeq[ShardId]]): Future[ActorRef]
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Invoked periodically to decide which shards to rebalance to another location.
|
* Invoked periodically to decide which shards to rebalance to another location.
|
||||||
|
|
@ -1300,10 +1304,10 @@ object ShardCoordinator {
|
||||||
* in the order they were allocated
|
* in the order they were allocated
|
||||||
* @param rebalanceInProgress set of shards that are currently being rebalanced, i.e.
|
* @param rebalanceInProgress set of shards that are currently being rebalanced, i.e.
|
||||||
* you should not include these in the returned set
|
* you should not include these in the returned set
|
||||||
* @return the shards to be migrated, may be empty to skip rebalance in this round
|
* @return a `Future` of the shards to be migrated, may be empty to skip rebalance in this round
|
||||||
*/
|
*/
|
||||||
def rebalance(currentShardAllocations: Map[ActorRef, immutable.IndexedSeq[ShardId]],
|
def rebalance(currentShardAllocations: Map[ActorRef, immutable.IndexedSeq[ShardId]],
|
||||||
rebalanceInProgress: Set[ShardId]): Set[ShardId]
|
rebalanceInProgress: Set[ShardId]): Future[Set[ShardId]]
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -1312,15 +1316,17 @@ object ShardCoordinator {
|
||||||
*/
|
*/
|
||||||
abstract class AbstractShardAllocationStrategy extends ShardAllocationStrategy {
|
abstract class AbstractShardAllocationStrategy extends ShardAllocationStrategy {
|
||||||
override final def allocateShard(requester: ActorRef, shardId: ShardId,
|
override final def allocateShard(requester: ActorRef, shardId: ShardId,
|
||||||
currentShardAllocations: Map[ActorRef, immutable.IndexedSeq[ShardId]]): ActorRef = {
|
currentShardAllocations: Map[ActorRef, immutable.IndexedSeq[ShardId]]): Future[ActorRef] = {
|
||||||
|
|
||||||
import scala.collection.JavaConverters._
|
import scala.collection.JavaConverters._
|
||||||
allocateShard(requester, shardId, currentShardAllocations.asJava)
|
allocateShard(requester, shardId, currentShardAllocations.asJava)
|
||||||
}
|
}
|
||||||
|
|
||||||
override final def rebalance(currentShardAllocations: Map[ActorRef, immutable.IndexedSeq[ShardId]],
|
override final def rebalance(currentShardAllocations: Map[ActorRef, immutable.IndexedSeq[ShardId]],
|
||||||
rebalanceInProgress: Set[ShardId]): Set[ShardId] = {
|
rebalanceInProgress: Set[ShardId]): Future[Set[ShardId]] = {
|
||||||
import scala.collection.JavaConverters._
|
import scala.collection.JavaConverters._
|
||||||
rebalance(currentShardAllocations.asJava, rebalanceInProgress.asJava).asScala.toSet
|
implicit val ec = ExecutionContexts.sameThreadExecutionContext
|
||||||
|
rebalance(currentShardAllocations.asJava, rebalanceInProgress.asJava).map(_.asScala.toSet)
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -1330,11 +1336,11 @@ object ShardCoordinator {
|
||||||
* @param shardId the id of the shard to allocate
|
* @param shardId the id of the shard to allocate
|
||||||
* @param currentShardAllocations all actor refs to `ShardRegion` and their current allocated shards,
|
* @param currentShardAllocations all actor refs to `ShardRegion` and their current allocated shards,
|
||||||
* in the order they were allocated
|
* in the order they were allocated
|
||||||
* @return the actor ref of the [[ShardRegion]] that is to be responsible for the shard, must be one of
|
* @return a `Future` of the actor ref of the [[ShardRegion]] that is to be responsible for the shard, must be one of
|
||||||
* the references included in the `currentShardAllocations` parameter
|
* the references included in the `currentShardAllocations` parameter
|
||||||
*/
|
*/
|
||||||
def allocateShard(requester: ActorRef, shardId: String,
|
def allocateShard(requester: ActorRef, shardId: String,
|
||||||
currentShardAllocations: java.util.Map[ActorRef, immutable.IndexedSeq[String]]): ActorRef
|
currentShardAllocations: java.util.Map[ActorRef, immutable.IndexedSeq[String]]): Future[ActorRef]
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Invoked periodically to decide which shards to rebalance to another location.
|
* Invoked periodically to decide which shards to rebalance to another location.
|
||||||
|
|
@ -1342,12 +1348,14 @@ object ShardCoordinator {
|
||||||
* in the order they were allocated
|
* in the order they were allocated
|
||||||
* @param rebalanceInProgress set of shards that are currently being rebalanced, i.e.
|
* @param rebalanceInProgress set of shards that are currently being rebalanced, i.e.
|
||||||
* you should not include these in the returned set
|
* you should not include these in the returned set
|
||||||
* @return the shards to be migrated, may be empty to skip rebalance in this round
|
* @return a `Future` of the shards to be migrated, may be empty to skip rebalance in this round
|
||||||
*/
|
*/
|
||||||
def rebalance(currentShardAllocations: java.util.Map[ActorRef, immutable.IndexedSeq[String]],
|
def rebalance(currentShardAllocations: java.util.Map[ActorRef, immutable.IndexedSeq[String]],
|
||||||
rebalanceInProgress: java.util.Set[String]): java.util.Set[String]
|
rebalanceInProgress: java.util.Set[String]): Future[java.util.Set[String]]
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private val emptyRebalanceResult = Future.successful(Set.empty[ShardId])
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The default implementation of [[ShardCoordinator.LeastShardAllocationStrategy]]
|
* The default implementation of [[ShardCoordinator.LeastShardAllocationStrategy]]
|
||||||
* allocates new shards to the `ShardRegion` with least number of previously allocated shards.
|
* allocates new shards to the `ShardRegion` with least number of previously allocated shards.
|
||||||
|
|
@ -1361,23 +1369,23 @@ object ShardCoordinator {
|
||||||
extends ShardAllocationStrategy with Serializable {
|
extends ShardAllocationStrategy with Serializable {
|
||||||
|
|
||||||
override def allocateShard(requester: ActorRef, shardId: ShardId,
|
override def allocateShard(requester: ActorRef, shardId: ShardId,
|
||||||
currentShardAllocations: Map[ActorRef, immutable.IndexedSeq[ShardId]]): ActorRef = {
|
currentShardAllocations: Map[ActorRef, immutable.IndexedSeq[ShardId]]): Future[ActorRef] = {
|
||||||
val (regionWithLeastShards, _) = currentShardAllocations.minBy { case (_, v) ⇒ v.size }
|
val (regionWithLeastShards, _) = currentShardAllocations.minBy { case (_, v) ⇒ v.size }
|
||||||
regionWithLeastShards
|
Future.successful(regionWithLeastShards)
|
||||||
}
|
}
|
||||||
|
|
||||||
override def rebalance(currentShardAllocations: Map[ActorRef, immutable.IndexedSeq[ShardId]],
|
override def rebalance(currentShardAllocations: Map[ActorRef, immutable.IndexedSeq[ShardId]],
|
||||||
rebalanceInProgress: Set[ShardId]): Set[ShardId] = {
|
rebalanceInProgress: Set[ShardId]): Future[Set[ShardId]] = {
|
||||||
if (rebalanceInProgress.size < maxSimultaneousRebalance) {
|
if (rebalanceInProgress.size < maxSimultaneousRebalance) {
|
||||||
val (regionWithLeastShards, leastShards) = currentShardAllocations.minBy { case (_, v) ⇒ v.size }
|
val (regionWithLeastShards, leastShards) = currentShardAllocations.minBy { case (_, v) ⇒ v.size }
|
||||||
val mostShards = currentShardAllocations.collect {
|
val mostShards = currentShardAllocations.collect {
|
||||||
case (_, v) ⇒ v.filterNot(s ⇒ rebalanceInProgress(s))
|
case (_, v) ⇒ v.filterNot(s ⇒ rebalanceInProgress(s))
|
||||||
}.maxBy(_.size)
|
}.maxBy(_.size)
|
||||||
if (mostShards.size - leastShards.size >= rebalanceThreshold)
|
if (mostShards.size - leastShards.size >= rebalanceThreshold)
|
||||||
Set(mostShards.head)
|
Future.successful(Set(mostShards.head))
|
||||||
else
|
else
|
||||||
Set.empty
|
emptyRebalanceResult
|
||||||
} else Set.empty
|
} else emptyRebalanceResult
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -1451,6 +1459,17 @@ object ShardCoordinator {
|
||||||
*/
|
*/
|
||||||
@SerialVersionUID(1L) final case class ShardStopped(shard: ShardId) extends CoordinatorCommand
|
@SerialVersionUID(1L) final case class ShardStopped(shard: ShardId) extends CoordinatorCommand
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Result of `allocateShard` is piped to self with this message.
|
||||||
|
*/
|
||||||
|
@SerialVersionUID(1L) final case class AllocateShardResult(
|
||||||
|
shard: ShardId, shardRegion: Option[ActorRef], getShardHomeSender: ActorRef) extends CoordinatorCommand
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Result of `rebalance` is piped to self with this message.
|
||||||
|
*/
|
||||||
|
@SerialVersionUID(1L) final case class RebalanceResult(shards: Set[ShardId]) extends CoordinatorCommand
|
||||||
|
|
||||||
// DomainEvents for the persistent state of the event sourced ShardCoordinator
|
// DomainEvents for the persistent state of the event sourced ShardCoordinator
|
||||||
sealed trait DomainEvent
|
sealed trait DomainEvent
|
||||||
@SerialVersionUID(1L) final case class ShardRegionRegistered(region: ActorRef) extends DomainEvent
|
@SerialVersionUID(1L) final case class ShardRegionRegistered(region: ActorRef) extends DomainEvent
|
||||||
|
|
@ -1693,20 +1712,29 @@ class ShardCoordinator(handOffTimeout: FiniteDuration, shardStartTimeout: Finite
|
||||||
case Some(ref) ⇒ sender() ! ShardHome(shard, ref)
|
case Some(ref) ⇒ sender() ! ShardHome(shard, ref)
|
||||||
case None ⇒
|
case None ⇒
|
||||||
if (persistentState.regions.nonEmpty) {
|
if (persistentState.regions.nonEmpty) {
|
||||||
val region = allocationStrategy.allocateShard(sender(), shard, persistentState.regions)
|
val getShardHomeSender = sender()
|
||||||
require(persistentState.regions.contains(region),
|
val regionFuture = allocationStrategy.allocateShard(getShardHomeSender, shard, persistentState.regions)
|
||||||
s"Allocated region $region for shard [$shard] must be one of the registered regions: $persistentState")
|
regionFuture.value match {
|
||||||
persist(ShardHomeAllocated(shard, region)) { evt ⇒
|
case Some(Success(region)) ⇒
|
||||||
persistentState = persistentState.updated(evt)
|
continueGetShardHome(shard, region, getShardHomeSender)
|
||||||
log.debug("Shard [{}] allocated at [{}]", evt.shard, evt.region)
|
case _ ⇒
|
||||||
|
// continue when future is completed
|
||||||
sendHostShardMsg(evt.shard, evt.region)
|
regionFuture.map { region ⇒
|
||||||
sender() ! ShardHome(evt.shard, evt.region)
|
AllocateShardResult(shard, Some(region), getShardHomeSender)
|
||||||
|
}.recover {
|
||||||
|
case _ ⇒ AllocateShardResult(shard, None, getShardHomeSender)
|
||||||
|
}.pipeTo(self)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
case AllocateShardResult(shard, None, getShardHomeSender) ⇒
|
||||||
|
log.debug("Shard [{}] allocation failed. It will be retried.", shard)
|
||||||
|
|
||||||
|
case AllocateShardResult(shard, Some(region), getShardHomeSender) ⇒
|
||||||
|
continueGetShardHome(shard, region, getShardHomeSender)
|
||||||
|
|
||||||
case ShardStarted(shard) ⇒
|
case ShardStarted(shard) ⇒
|
||||||
unAckedHostShards.get(shard) match {
|
unAckedHostShards.get(shard) match {
|
||||||
case Some(cancel) ⇒
|
case Some(cancel) ⇒
|
||||||
|
|
@ -1722,14 +1750,22 @@ class ShardCoordinator(handOffTimeout: FiniteDuration, shardStartTimeout: Finite
|
||||||
}
|
}
|
||||||
|
|
||||||
case RebalanceTick ⇒
|
case RebalanceTick ⇒
|
||||||
if (persistentState.regions.nonEmpty)
|
if (persistentState.regions.nonEmpty) {
|
||||||
allocationStrategy.rebalance(persistentState.regions, rebalanceInProgress).foreach { shard ⇒
|
val shardsFuture = allocationStrategy.rebalance(persistentState.regions, rebalanceInProgress)
|
||||||
rebalanceInProgress += shard
|
shardsFuture.value match {
|
||||||
val rebalanceFromRegion = persistentState.shards(shard)
|
case Some(Success(shards)) ⇒
|
||||||
log.debug("Rebalance shard [{}] from [{}]", shard, rebalanceFromRegion)
|
continueRebalance(shards)
|
||||||
context.actorOf(rebalanceWorkerProps(shard, rebalanceFromRegion, handOffTimeout,
|
case _ ⇒
|
||||||
persistentState.regions.keySet ++ persistentState.regionProxies))
|
// continue when future is completed
|
||||||
|
shardsFuture.map { shards ⇒ RebalanceResult(shards)
|
||||||
|
}.recover {
|
||||||
|
case _ ⇒ RebalanceResult(Set.empty)
|
||||||
|
}.pipeTo(self)
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
case RebalanceResult(shards) ⇒
|
||||||
|
continueRebalance(shards)
|
||||||
|
|
||||||
case RebalanceDone(shard, ok) ⇒
|
case RebalanceDone(shard, ok) ⇒
|
||||||
rebalanceInProgress -= shard
|
rebalanceInProgress -= shard
|
||||||
|
|
@ -1777,4 +1813,39 @@ class ShardCoordinator(handOffTimeout: FiniteDuration, shardStartTimeout: Finite
|
||||||
|
|
||||||
def allocateShardHomes(): Unit = persistentState.unallocatedShards.foreach { self ! GetShardHome(_) }
|
def allocateShardHomes(): Unit = persistentState.unallocatedShards.foreach { self ! GetShardHome(_) }
|
||||||
|
|
||||||
|
def continueGetShardHome(shard: ShardId, region: ActorRef, getShardHomeSender: ActorRef): Unit =
|
||||||
|
if (!rebalanceInProgress.contains(shard)) {
|
||||||
|
persistentState.shards.get(shard) match {
|
||||||
|
case Some(ref) ⇒ getShardHomeSender ! ShardHome(shard, ref)
|
||||||
|
case None ⇒
|
||||||
|
if (persistentState.regions.contains(region)) {
|
||||||
|
persist(ShardHomeAllocated(shard, region)) { evt ⇒
|
||||||
|
persistentState = persistentState.updated(evt)
|
||||||
|
log.debug("Shard [{}] allocated at [{}]", evt.shard, evt.region)
|
||||||
|
|
||||||
|
sendHostShardMsg(evt.shard, evt.region)
|
||||||
|
getShardHomeSender ! ShardHome(evt.shard, evt.region)
|
||||||
|
}
|
||||||
|
} else
|
||||||
|
log.debug("Allocated region {} for shard [{}] is not (any longer) one of the registered regions: {}",
|
||||||
|
region, shard, persistentState)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
def continueRebalance(shards: Set[ShardId]): Unit =
|
||||||
|
shards.foreach { shard ⇒
|
||||||
|
if (!rebalanceInProgress(shard)) {
|
||||||
|
persistentState.shards.get(shard) match {
|
||||||
|
case Some(rebalanceFromRegion) ⇒
|
||||||
|
rebalanceInProgress += shard
|
||||||
|
log.debug("Rebalance shard [{}] from [{}]", shard, rebalanceFromRegion)
|
||||||
|
context.actorOf(rebalanceWorkerProps(shard, rebalanceFromRegion, handOffTimeout,
|
||||||
|
persistentState.regions.keySet ++ persistentState.regionProxies))
|
||||||
|
case None ⇒
|
||||||
|
log.debug("Rebalance of non-existing shard [{}] is ignored", shard)
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,234 @@
|
||||||
|
/**
|
||||||
|
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
|
||||||
|
*/
|
||||||
|
package akka.cluster.sharding
|
||||||
|
|
||||||
|
import scala.collection.immutable
|
||||||
|
import java.io.File
|
||||||
|
import akka.cluster.sharding.ShardRegion.Passivate
|
||||||
|
import scala.concurrent.duration._
|
||||||
|
import org.apache.commons.io.FileUtils
|
||||||
|
import com.typesafe.config.ConfigFactory
|
||||||
|
import akka.actor._
|
||||||
|
import akka.cluster.Cluster
|
||||||
|
import akka.cluster.ClusterEvent._
|
||||||
|
import akka.persistence.Persistence
|
||||||
|
import akka.persistence.journal.leveldb.SharedLeveldbJournal
|
||||||
|
import akka.persistence.journal.leveldb.SharedLeveldbStore
|
||||||
|
import akka.remote.testconductor.RoleName
|
||||||
|
import akka.remote.testkit.MultiNodeConfig
|
||||||
|
import akka.remote.testkit.MultiNodeSpec
|
||||||
|
import akka.remote.testkit.STMultiNodeSpec
|
||||||
|
import akka.remote.transport.ThrottlerTransportAdapter.Direction
|
||||||
|
import akka.testkit._
|
||||||
|
import akka.cluster.sharding.ShardCoordinator.ShardAllocationStrategy
|
||||||
|
import scala.concurrent.Future
|
||||||
|
import akka.util.Timeout
|
||||||
|
import akka.pattern.ask
|
||||||
|
|
||||||
|
object ClusterShardingCustomShardAllocationSpec extends MultiNodeConfig {
|
||||||
|
val first = role("first")
|
||||||
|
val second = role("second")
|
||||||
|
|
||||||
|
commonConfig(ConfigFactory.parseString("""
|
||||||
|
akka.loglevel = INFO
|
||||||
|
akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
|
||||||
|
akka.remote.log-remote-lifecycle-events = off
|
||||||
|
akka.persistence.journal.plugin = "akka.persistence.journal.leveldb-shared"
|
||||||
|
akka.persistence.journal.leveldb-shared {
|
||||||
|
timeout = 5s
|
||||||
|
store {
|
||||||
|
native = off
|
||||||
|
dir = "target/journal-ClusterShardingCustomShardAllocationSpec"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local"
|
||||||
|
akka.persistence.snapshot-store.local.dir = "target/snapshots-ClusterShardingCustomShardAllocationSpec"
|
||||||
|
"""))
|
||||||
|
|
||||||
|
class Entity extends Actor {
|
||||||
|
def receive = {
|
||||||
|
case id: Int ⇒ sender() ! id
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
val idExtractor: ShardRegion.IdExtractor = {
|
||||||
|
case id: Int ⇒ (id.toString, id)
|
||||||
|
}
|
||||||
|
|
||||||
|
val shardResolver: ShardRegion.ShardResolver = msg ⇒ msg match {
|
||||||
|
case id: Int ⇒ id.toString
|
||||||
|
}
|
||||||
|
|
||||||
|
case object AllocateReq
|
||||||
|
case class UseRegion(region: ActorRef)
|
||||||
|
case object UseRegionAck
|
||||||
|
case object RebalanceReq
|
||||||
|
case class RebalanceShards(shards: Set[String])
|
||||||
|
case object RebalanceShardsAck
|
||||||
|
|
||||||
|
class Allocator extends Actor {
|
||||||
|
var useRegion: Option[ActorRef] = None
|
||||||
|
var rebalance = Set.empty[String]
|
||||||
|
def receive = {
|
||||||
|
case UseRegion(region) ⇒
|
||||||
|
useRegion = Some(region)
|
||||||
|
sender() ! UseRegionAck
|
||||||
|
case AllocateReq ⇒
|
||||||
|
useRegion.foreach { sender() ! _ }
|
||||||
|
case RebalanceShards(shards) ⇒
|
||||||
|
rebalance = shards
|
||||||
|
sender() ! RebalanceShardsAck
|
||||||
|
case RebalanceReq ⇒
|
||||||
|
sender() ! rebalance
|
||||||
|
rebalance = Set.empty
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
case class TestAllocationStrategy(ref: ActorRef) extends ShardAllocationStrategy {
|
||||||
|
implicit val timeout = Timeout(3.seconds)
|
||||||
|
override def allocateShard(requester: ActorRef, shardId: ShardRegion.ShardId, currentShardAllocations: Map[ActorRef, immutable.IndexedSeq[ShardRegion.ShardId]]): Future[ActorRef] = {
|
||||||
|
(ref ? AllocateReq).mapTo[ActorRef]
|
||||||
|
}
|
||||||
|
|
||||||
|
override def rebalance(currentShardAllocations: Map[ActorRef, immutable.IndexedSeq[ShardRegion.ShardId]], rebalanceInProgress: Set[ShardRegion.ShardId]): Future[Set[ShardRegion.ShardId]] = {
|
||||||
|
(ref ? RebalanceReq).mapTo[Set[String]]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
class ClusterShardingCustomShardAllocationMultiJvmNode1 extends ClusterShardingCustomShardAllocationSpec
|
||||||
|
class ClusterShardingCustomShardAllocationMultiJvmNode2 extends ClusterShardingCustomShardAllocationSpec
|
||||||
|
|
||||||
|
class ClusterShardingCustomShardAllocationSpec extends MultiNodeSpec(ClusterShardingCustomShardAllocationSpec) with STMultiNodeSpec with ImplicitSender {
|
||||||
|
import ClusterShardingCustomShardAllocationSpec._
|
||||||
|
|
||||||
|
override def initialParticipants = roles.size
|
||||||
|
|
||||||
|
val storageLocations = List(
|
||||||
|
"akka.persistence.journal.leveldb.dir",
|
||||||
|
"akka.persistence.journal.leveldb-shared.store.dir",
|
||||||
|
"akka.persistence.snapshot-store.local.dir").map(s ⇒ new File(system.settings.config.getString(s)))
|
||||||
|
|
||||||
|
override protected def atStartup() {
|
||||||
|
runOn(first) {
|
||||||
|
storageLocations.foreach(dir ⇒ if (dir.exists) FileUtils.deleteDirectory(dir))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
override protected def afterTermination() {
|
||||||
|
runOn(first) {
|
||||||
|
storageLocations.foreach(dir ⇒ if (dir.exists) FileUtils.deleteDirectory(dir))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
def join(from: RoleName, to: RoleName): Unit = {
|
||||||
|
runOn(from) {
|
||||||
|
Cluster(system) join node(to).address
|
||||||
|
startSharding()
|
||||||
|
}
|
||||||
|
enterBarrier(from.name + "-joined")
|
||||||
|
}
|
||||||
|
|
||||||
|
def startSharding(): Unit = {
|
||||||
|
ClusterSharding(system).start(
|
||||||
|
typeName = "Entity",
|
||||||
|
entryProps = Some(Props[Entity]),
|
||||||
|
roleOverride = None,
|
||||||
|
rememberEntries = false,
|
||||||
|
idExtractor = idExtractor,
|
||||||
|
shardResolver = shardResolver,
|
||||||
|
allocationStrategy = TestAllocationStrategy(allocator))
|
||||||
|
}
|
||||||
|
|
||||||
|
lazy val region = ClusterSharding(system).shardRegion("Entity")
|
||||||
|
|
||||||
|
lazy val allocator = system.actorOf(Props[Allocator], "allocator")
|
||||||
|
|
||||||
|
"Cluster sharding with custom allocation strategy" must {
|
||||||
|
|
||||||
|
"setup shared journal" in {
|
||||||
|
// start the Persistence extension
|
||||||
|
Persistence(system)
|
||||||
|
runOn(first) {
|
||||||
|
system.actorOf(Props[SharedLeveldbStore], "store")
|
||||||
|
}
|
||||||
|
enterBarrier("peristence-started")
|
||||||
|
|
||||||
|
runOn(first, second) {
|
||||||
|
system.actorSelection(node(first) / "user" / "store") ! Identify(None)
|
||||||
|
val sharedStore = expectMsgType[ActorIdentity].ref.get
|
||||||
|
SharedLeveldbJournal.setStore(sharedStore, system)
|
||||||
|
}
|
||||||
|
|
||||||
|
enterBarrier("after-1")
|
||||||
|
}
|
||||||
|
|
||||||
|
"use specified region" in within(10.seconds) {
|
||||||
|
join(first, first)
|
||||||
|
|
||||||
|
runOn(first) {
|
||||||
|
allocator ! UseRegion(region)
|
||||||
|
expectMsg(UseRegionAck)
|
||||||
|
region ! 1
|
||||||
|
expectMsg(1)
|
||||||
|
lastSender.path should be(region.path / "1" / "1")
|
||||||
|
}
|
||||||
|
enterBarrier("first-started")
|
||||||
|
|
||||||
|
join(second, first)
|
||||||
|
|
||||||
|
region ! 2
|
||||||
|
expectMsg(2)
|
||||||
|
runOn(first) {
|
||||||
|
lastSender.path should be(region.path / "2" / "2")
|
||||||
|
}
|
||||||
|
runOn(second) {
|
||||||
|
lastSender.path should be(node(first) / "user" / "sharding" / "Entity" / "2" / "2")
|
||||||
|
}
|
||||||
|
enterBarrier("second-started")
|
||||||
|
|
||||||
|
runOn(first) {
|
||||||
|
system.actorSelection(node(second) / "user" / "sharding" / "Entity") ! Identify(None)
|
||||||
|
val secondRegion = expectMsgType[ActorIdentity].ref.get
|
||||||
|
allocator ! UseRegion(secondRegion)
|
||||||
|
expectMsg(UseRegionAck)
|
||||||
|
}
|
||||||
|
enterBarrier("second-active")
|
||||||
|
|
||||||
|
region ! 3
|
||||||
|
expectMsg(3)
|
||||||
|
runOn(second) {
|
||||||
|
lastSender.path should be(region.path / "3" / "3")
|
||||||
|
}
|
||||||
|
runOn(first) {
|
||||||
|
lastSender.path should be(node(second) / "user" / "sharding" / "Entity" / "3" / "3")
|
||||||
|
}
|
||||||
|
|
||||||
|
enterBarrier("after-2")
|
||||||
|
}
|
||||||
|
|
||||||
|
"rebalance specified shards" in within(15.seconds) {
|
||||||
|
runOn(first) {
|
||||||
|
allocator ! RebalanceShards(Set("2"))
|
||||||
|
expectMsg(RebalanceShardsAck)
|
||||||
|
|
||||||
|
awaitAssert {
|
||||||
|
val p = TestProbe()
|
||||||
|
region.tell(2, p.ref)
|
||||||
|
p.expectMsg(2.second, 2)
|
||||||
|
p.lastSender.path should be(node(second) / "user" / "sharding" / "Entity" / "2" / "2")
|
||||||
|
}
|
||||||
|
|
||||||
|
region ! 1
|
||||||
|
expectMsg(1)
|
||||||
|
lastSender.path should be(region.path / "1" / "1")
|
||||||
|
}
|
||||||
|
|
||||||
|
enterBarrier("after-2")
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
@ -3,8 +3,10 @@
|
||||||
*/
|
*/
|
||||||
package akka.cluster.sharding
|
package akka.cluster.sharding
|
||||||
|
|
||||||
import akka.testkit.AkkaSpec
|
import scala.concurrent.Await
|
||||||
|
import scala.concurrent.duration._
|
||||||
import akka.actor.Props
|
import akka.actor.Props
|
||||||
|
import akka.testkit.AkkaSpec
|
||||||
|
|
||||||
class LeastShardAllocationStrategySpec extends AkkaSpec {
|
class LeastShardAllocationStrategySpec extends AkkaSpec {
|
||||||
import ShardCoordinator._
|
import ShardCoordinator._
|
||||||
|
|
@ -18,7 +20,7 @@ class LeastShardAllocationStrategySpec extends AkkaSpec {
|
||||||
"LeastShardAllocationStrategy" must {
|
"LeastShardAllocationStrategy" must {
|
||||||
"allocate to region with least number of shards" in {
|
"allocate to region with least number of shards" in {
|
||||||
val allocations = Map(regionA -> Vector("shard1"), regionB -> Vector("shard2"), regionC -> Vector.empty)
|
val allocations = Map(regionA -> Vector("shard1"), regionB -> Vector("shard2"), regionC -> Vector.empty)
|
||||||
allocationStrategy.allocateShard(regionA, "shard3", allocations) should ===(regionC)
|
Await.result(allocationStrategy.allocateShard(regionA, "shard3", allocations), 3.seconds) should ===(regionC)
|
||||||
}
|
}
|
||||||
|
|
||||||
"rebalance from region with most number of shards" in {
|
"rebalance from region with most number of shards" in {
|
||||||
|
|
@ -26,22 +28,22 @@ class LeastShardAllocationStrategySpec extends AkkaSpec {
|
||||||
regionC -> Vector.empty)
|
regionC -> Vector.empty)
|
||||||
|
|
||||||
// so far regionB has 2 shards and regionC has 0 shards, but the diff is less than rebalanceThreshold
|
// so far regionB has 2 shards and regionC has 0 shards, but the diff is less than rebalanceThreshold
|
||||||
allocationStrategy.rebalance(allocations, Set.empty) should be(Set.empty)
|
Await.result(allocationStrategy.rebalance(allocations, Set.empty), 3.seconds) should ===(Set.empty[String])
|
||||||
|
|
||||||
val allocations2 = allocations.updated(regionB, Vector("shard2", "shard3", "shard4"))
|
val allocations2 = allocations.updated(regionB, Vector("shard2", "shard3", "shard4"))
|
||||||
allocationStrategy.rebalance(allocations2, Set.empty) should ===(Set("shard2"))
|
Await.result(allocationStrategy.rebalance(allocations2, Set.empty), 3.seconds) should ===(Set("shard2"))
|
||||||
allocationStrategy.rebalance(allocations2, Set("shard4")) should be(Set.empty)
|
Await.result(allocationStrategy.rebalance(allocations2, Set("shard4")), 3.seconds) should ===(Set.empty[String])
|
||||||
|
|
||||||
val allocations3 = allocations2.updated(regionA, Vector("shard1", "shard5", "shard6"))
|
val allocations3 = allocations2.updated(regionA, Vector("shard1", "shard5", "shard6"))
|
||||||
allocationStrategy.rebalance(allocations3, Set("shard1")) should ===(Set("shard2"))
|
Await.result(allocationStrategy.rebalance(allocations3, Set("shard1")), 3.seconds) should ===(Set("shard2"))
|
||||||
}
|
}
|
||||||
|
|
||||||
"must limit number of simultanious rebalance" in {
|
"must limit number of simultanious rebalance" in {
|
||||||
val allocations = Map(regionA -> Vector("shard1"),
|
val allocations = Map(regionA -> Vector("shard1"),
|
||||||
regionB -> Vector("shard2", "shard3", "shard4", "shard5", "shard6"), regionC -> Vector.empty)
|
regionB -> Vector("shard2", "shard3", "shard4", "shard5", "shard6"), regionC -> Vector.empty)
|
||||||
|
|
||||||
allocationStrategy.rebalance(allocations, Set("shard2")) should ===(Set("shard3"))
|
Await.result(allocationStrategy.rebalance(allocations, Set("shard2")), 3.seconds) should ===(Set("shard3"))
|
||||||
allocationStrategy.rebalance(allocations, Set("shard2", "shard3")) should be(Set.empty)
|
Await.result(allocationStrategy.rebalance(allocations, Set("shard2", "shard3")), 3.seconds) should ===(Set.empty[String])
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -293,9 +293,8 @@ terminate the actor system.
|
||||||
|
|
||||||
.. includecode:: ../../../akka-samples/akka-sample-cluster-java/src/main/java/sample/cluster/factorial/FactorialFrontendMain.java#registerOnRemoved
|
.. includecode:: ../../../akka-samples/akka-sample-cluster-java/src/main/java/sample/cluster/factorial/FactorialFrontendMain.java#registerOnRemoved
|
||||||
|
|
||||||
.. note::
|
.. note:: Register a OnMemberRemoved callback on a cluster that have been shutdown, the callback will be invoked immediately on
|
||||||
Register a OnMemberRemoved callback on a cluster that have been shutdown ,the callback will be invoked immediately on
|
the caller thread, otherwise it will be invoked later when the current member status changed to 'Removed'. You may
|
||||||
the caller thread,otherwise it will be invoked later when the current member status changed to 'Removed'.You may
|
|
||||||
want to install some cleanup handling after the cluster was started up,but the cluster might already be shutting
|
want to install some cleanup handling after the cluster was started up,but the cluster might already be shutting
|
||||||
down when you installing, and depending on the race is not healthy.
|
down when you installing, and depending on the race is not healthy.
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -31,7 +31,7 @@ depend on 3.8.0.Final that break with 3.10.3.Final should be able to manually do
|
||||||
to 3.8.0.Final and Akka will still work with that version.
|
to 3.8.0.Final and Akka will still work with that version.
|
||||||
|
|
||||||
Advanced Notice: TypedActors will go away
|
Advanced Notice: TypedActors will go away
|
||||||
========================================
|
=========================================
|
||||||
|
|
||||||
While technically not yet deprecated, the current ``akka.actor.TypedActor`` support will be superseded by
|
While technically not yet deprecated, the current ``akka.actor.TypedActor`` support will be superseded by
|
||||||
the Akka Typed project that is currently being developed in open preview mode. If you are using TypedActors
|
the Akka Typed project that is currently being developed in open preview mode. If you are using TypedActors
|
||||||
|
|
@ -192,7 +192,7 @@ since it is still enabled in akka-cluster by default (for compatibility with pas
|
||||||
Router configuration entries have also changed for the module, they use prefix ``cluster-metrics-``:
|
Router configuration entries have also changed for the module, they use prefix ``cluster-metrics-``:
|
||||||
``cluster-metrics-adaptive-pool`` and ``cluster-metrics-adaptive-group``
|
``cluster-metrics-adaptive-pool`` and ``cluster-metrics-adaptive-group``
|
||||||
Metrics extension classes and objects are located in the new package ``akka.cluster.metrics``.
|
Metrics extension classes and objects are located in the new package ``akka.cluster.metrics``.
|
||||||
Please see :ref:`Scala <cluster-metrics-scala>`, :ref:`Java <cluster-metrics-java>` for more information.
|
Please see :ref:`Scala <cluster_metrics_scala>`, :ref:`Java <cluster_metrics_java>` for more information.
|
||||||
|
|
||||||
Microkernel is Deprecated
|
Microkernel is Deprecated
|
||||||
=========================
|
=========================
|
||||||
|
|
@ -254,3 +254,13 @@ it as an ordinary actor if you need multiple instances of it with different sett
|
||||||
The parameters of the ``Props`` factory methods in the ``ClusterReceptionist`` companion
|
The parameters of the ``Props`` factory methods in the ``ClusterReceptionist`` companion
|
||||||
has been moved to settings object ``ClusterReceptionistSettings``. This can be created from
|
has been moved to settings object ``ClusterReceptionistSettings``. This can be created from
|
||||||
system configuration properties and also amended with API as needed.
|
system configuration properties and also amended with API as needed.
|
||||||
|
|
||||||
|
Asynchronous ShardAllocationStrategy
|
||||||
|
====================================
|
||||||
|
|
||||||
|
The methods of the ``ShardAllocationStrategy`` and ``AbstractShardAllocationStrategy`` in Cluster Sharding
|
||||||
|
have changed return type to a ``Future`` to support asynchronous decision. For example you can ask an
|
||||||
|
actor external actor of how to allocate shards or rebalance shards.
|
||||||
|
|
||||||
|
For the synchronous case you can return the result via ``scala.concurrent.Future.successful`` in Scala or
|
||||||
|
``akka.dispatch.Futures.successful`` in Java.
|
||||||
|
|
|
||||||
|
|
@ -287,9 +287,8 @@ terminate the actor system.
|
||||||
|
|
||||||
.. includecode:: ../../../akka-samples/akka-sample-cluster-scala/src/main/scala/sample/cluster/factorial/FactorialFrontend.scala#registerOnRemoved
|
.. includecode:: ../../../akka-samples/akka-sample-cluster-scala/src/main/scala/sample/cluster/factorial/FactorialFrontend.scala#registerOnRemoved
|
||||||
|
|
||||||
.. note::
|
.. note:: Register a OnMemberRemoved callback on a cluster that have been shutdown, the callback will be invoked immediately on
|
||||||
Register a OnMemberRemoved callback on a cluster that have been shutdown ,the callback will be invoked immediately on
|
the caller thread, otherwise it will be invoked later when the current member status changed to 'Removed'. You may
|
||||||
the caller thread,otherwise it will be invoked later when the current member status changed to 'Removed'.You may
|
|
||||||
want to install some cleanup handling after the cluster was started up,but the cluster might already be shutting
|
want to install some cleanup handling after the cluster was started up,but the cluster might already be shutting
|
||||||
down when you installing, and depending on the race is not healthy.
|
down when you installing, and depending on the race is not healthy.
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue