External shard allocation strategy (#28211)

This commit is contained in:
Christopher Batey 2020-01-21 16:08:51 +00:00 committed by GitHub
parent cbde44fc48
commit cbf9f71c32
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
18 changed files with 965 additions and 5 deletions

View file

@ -0,0 +1,27 @@
/*
* Copyright (C) 2019-2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.cluster.sharding.typed
import akka.cluster.sharding.typed.internal.Murmur2
abstract class Murmur2NoEnvelopeMessageExtractor[M](val numberOfShards: Int) extends ShardingMessageExtractor[M, M] {
override def shardId(entityId: String): String = Murmur2.shardId(entityId, numberOfShards)
override def unwrapMessage(message: M): M = message
}
/**
* The murmur2 message extractor uses the same algorithm as the default kafka partitoiner
* allowing kafka partitions to be mapped to shards.
* This can be used with the [[akka.cluster.sharding.external.ExternalShardAllocationStrategy]] to have messages
* processed locally.
*
* Extend [[Murmur2NoEnvelopeMessageExtractor]] to not use a message envelope extractor.
*/
final class Murmur2MessageExtractor[M](val numberOfShards: Int)
extends ShardingMessageExtractor[ShardingEnvelope[M], M] {
override def entityId(envelope: ShardingEnvelope[M]): String = envelope.entityId
override def shardId(entityId: String): String = Murmur2.shardId(entityId, numberOfShards)
override def unwrapMessage(envelope: ShardingEnvelope[M]): M = envelope.message
}

View file

@ -0,0 +1,77 @@
/*
* toPositive and murmur based on Java version in https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
* to match up with Kafka partitioning. Licensed under:
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package akka.cluster.sharding.typed.internal
import java.nio.charset.StandardCharsets
import akka.annotation.InternalApi
/**
* INTERNAL API
*/
@InternalApi
private[sharding] object Murmur2 {
def toPositive(number: Int): Int = number & 0x7fffffff
def murmur2(data: Array[Byte]) = {
val length = data.length
val seed = 0x9747b28c
// 'm' and 'r' are mixing constants generated offline.
// They're not really 'magic', they just happen to work well.
val m = 0x5bd1e995
val r = 24
// Initialize the hash to a random value
var h = seed ^ length
val length4 = length / 4
for (i <- 0 until length4) {
val i4 = i * 4
var k = (data(i4 + 0) & 0xff) + ((data(i4 + 1) & 0xff) << 8) + ((data(i4 + 2) & 0xff) << 16) + ((data(i4 + 3) & 0xff) << 24)
k *= m
k ^= k >>> r
k *= m
h *= m
h ^= k
}
// Handle the last few bytes of the input array
length % 4 match {
case 3 =>
h ^= (data((length & ~3) + 2) & 0xff) << 16
h ^= (data((length & ~3) + 1) & 0xff) << 8
h ^= data(length & ~3) & 0xff
h *= m
case 2 =>
h ^= (data((length & ~3) + 1) & 0xff) << 8
h ^= data(length & ~3) & 0xff
h *= m
case 1 =>
h ^= data(length & ~3) & 0xff
h *= m
}
h ^= h >>> 13
h *= m
h ^= h >>> 15
h
}
// To match https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java#L59
def shardId(entityId: String, nrShards: Int): String =
(toPositive(murmur2(entityId.getBytes(StandardCharsets.UTF_8))) % nrShards).toString
}

View file

@ -467,7 +467,7 @@ object EntityTypeKey {
*
* Please note that an implicit [[akka.util.Timeout]] must be available to use this pattern.
*
* Note: it is preferrable to use the non-symbolic ask method as it easier allows for wildcards for
* Note: it is preferable to use the non-symbolic ask method as it easier allows for wildcards for
* the `replyTo: ActorRef`.
*
* @tparam Res The response protocol, what the other actor sends back

View file

@ -0,0 +1,44 @@
/*
* Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
*/
package jdocs.akka.cluster.sharding.typed;
import akka.Done;
import akka.actor.Address;
import akka.actor.typed.ActorRef;
import akka.actor.typed.ActorSystem;
import akka.cluster.sharding.external.ExternalShardAllocation;
import akka.cluster.sharding.external.javadsl.ExternalShardAllocationClient;
import akka.cluster.sharding.typed.ShardingEnvelope;
import akka.cluster.sharding.typed.javadsl.ClusterSharding;
import akka.cluster.sharding.typed.javadsl.Entity;
import akka.cluster.sharding.typed.javadsl.EntityTypeKey;
import java.util.concurrent.CompletionStage;
import static jdocs.akka.cluster.sharding.typed.ShardingCompileOnlyTest.Counter;
public class ExternalShardAllocationCompileOnlyTest {
void example() {
ActorSystem<?> system = null;
ClusterSharding sharding = ClusterSharding.get(system);
// #entity
EntityTypeKey<Counter.Command> typeKey = EntityTypeKey.create(Counter.Command.class, "Counter");
ActorRef<ShardingEnvelope<Counter.Command>> shardRegion =
sharding.init(Entity.of(typeKey, ctx -> Counter.create(ctx.getEntityId())));
// #entity
// #client
ExternalShardAllocationClient client =
ExternalShardAllocation.get(system).getClient(typeKey.name());
CompletionStage<Done> done =
client.setShardLocation("shard-id-1", new Address("akka", "system", "127.0.0.1", 2552));
// #client
}
}

View file

@ -0,0 +1,42 @@
/*
* Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
*/
package docs.akka.cluster.sharding.typed
import akka.Done
import akka.actor.Address
import akka.actor.typed.ActorRef
import akka.actor.typed.ActorSystem
import akka.cluster.sharding.external.ExternalShardAllocation
import akka.cluster.sharding.external.ExternalShardAllocationStrategy
import akka.cluster.sharding.external.scaladsl.ExternalShardAllocationClient
import akka.cluster.sharding.typed.ShardingEnvelope
import akka.cluster.sharding.typed.scaladsl.EntityTypeKey
import akka.cluster.sharding.typed.scaladsl.ClusterSharding
import akka.cluster.sharding.typed.scaladsl.Entity
import docs.akka.cluster.sharding.typed.ShardingCompileOnlySpec.Basics.Counter
import scala.concurrent.Future
class ExternalShardAllocationCompileOnlySpec {
val system: ActorSystem[_] = ???
val sharding = ClusterSharding(system)
// #entity
val TypeKey = EntityTypeKey[Counter.Command]("Counter")
val entity = Entity(TypeKey)(createBehavior = entityContext => Counter(entityContext.entityId))
.withAllocationStrategy(new ExternalShardAllocationStrategy(system, TypeKey.name))
// #entity
val shardRegion: ActorRef[ShardingEnvelope[Counter.Command]] =
sharding.init(entity)
// #client
val client: ExternalShardAllocationClient = ExternalShardAllocation(system).clientFor(TypeKey.name)
val done: Future[Done] = client.updateShardLocation("shard-id-1", Address("akka", "system", "127.0.0.1", 2552))
// #client
}

View file

@ -110,6 +110,11 @@ akka.cluster.sharding {
max-simultaneous-rebalance = 3
}
external-shard-allocation-strategy {
# How long to wait for the client to persist an allocation to ddata or get a all shard locations
client-timeout = 5s
}
# Timeout of waiting the initial distributed state for the shard coordinator (an initial state will be queried again if the timeout happened)
# and for a shard to get its state when remembered entities is enabled
# The read from ddata is a ReadMajority, for small clusters (< majority-min-cap) every node needs to respond

View file

@ -104,6 +104,22 @@ object ShardCoordinator {
rebalanceInProgress: Set[ShardId]): Future[Set[ShardId]]
}
/**
* Shard allocation strategy where start is called by the shard coordinator before any calls to
* rebalance or allocate shard. This can be used if there is any expensive initialization to be done
* that you do not want to to in the constructor as it will happen on every node rather than just
* the node that hosts the ShardCoordinator
*/
trait StartableAllocationStrategy extends ShardAllocationStrategy {
/**
* Called before any calls to allocate/rebalance.
* Do not block. If asynchronous actions are required they can be started here and
* delay the Futures returned by allocate/rebalance.
*/
def start(): Unit
}
/**
* Java API: Java implementations of custom shard allocation and rebalancing logic used by the [[ShardCoordinator]]
* should extend this abstract class and implement the two methods.
@ -534,6 +550,14 @@ abstract class ShardCoordinator(
protected def typeName: String
override def preStart(): Unit = {
allocationStrategy match {
case strategy: StartableAllocationStrategy =>
strategy.start()
case _ =>
}
}
override def postStop(): Unit = {
super.postStop()
rebalanceTask.cancel()
@ -600,7 +624,9 @@ abstract class ShardCoordinator(
AllocateShardResult(shard, Some(region), getShardHomeSender)
}
.recover {
case _ => AllocateShardResult(shard, None, getShardHomeSender)
case t =>
log.error(t, "Shard [{}] allocation failed.", shard)
AllocateShardResult(shard, None, getShardHomeSender)
}
.pipeTo(self)
}

View file

@ -0,0 +1,7 @@
/*
* Copyright (C) 2019-2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.cluster.sharding.external
final class ClientTimeoutException(reason: String) extends RuntimeException(reason)

View file

@ -0,0 +1,55 @@
/*
* Copyright (C) 2019-2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.cluster.sharding.external
import java.util.concurrent.ConcurrentHashMap
import akka.actor.ExtendedActorSystem
import akka.actor.Extension
import akka.actor.ExtensionId
import akka.actor.ExtensionIdProvider
import akka.cluster.sharding.external.internal.ExternalShardAllocationClientImpl
import java.util.function.{ Function => JFunction }
import akka.actor.ClassicActorSystemProvider
import akka.annotation.ApiMayChange
/**
* API May Change
*/
@ApiMayChange
final class ExternalShardAllocation(system: ExtendedActorSystem) extends Extension {
private val clients = new ConcurrentHashMap[String, ExternalShardAllocationClientImpl]
private val factory = new JFunction[String, ExternalShardAllocationClientImpl] {
override def apply(typeName: String): ExternalShardAllocationClientImpl =
new ExternalShardAllocationClientImpl(system, typeName)
}
/**
* Scala API
*/
def clientFor(typeName: String): scaladsl.ExternalShardAllocationClient = client(typeName)
/**
* Java API
*/
def getClient(typeName: String): javadsl.ExternalShardAllocationClient = client(typeName)
private def client(typeName: String): ExternalShardAllocationClientImpl = {
clients.computeIfAbsent(typeName, factory)
}
}
object ExternalShardAllocation extends ExtensionId[ExternalShardAllocation] with ExtensionIdProvider {
override def createExtension(system: ExtendedActorSystem): ExternalShardAllocation =
new ExternalShardAllocation(system)
override def lookup(): ExternalShardAllocation.type = ExternalShardAllocation
override def get(system: ClassicActorSystemProvider): ExternalShardAllocation = super.get(system)
}

View file

@ -0,0 +1,219 @@
/*
* Copyright (C) 2019-2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.cluster.sharding.external
import akka.actor.Actor
import akka.actor.ActorLogging
import akka.actor.ActorRef
import akka.actor.ActorRefScope
import akka.actor.Address
import akka.actor.AddressFromURIString
import akka.actor.ClassicActorSystemProvider
import akka.actor.ExtendedActorSystem
import akka.actor.NoSerializationVerificationNeeded
import akka.actor.Props
import akka.actor.Stash
import akka.cluster.Cluster
import akka.cluster.ddata.DistributedData
import akka.cluster.ddata.LWWMapKey
import akka.cluster.ddata.Replicator.Changed
import akka.cluster.ddata.Replicator.Subscribe
import akka.cluster.sharding.ShardCoordinator
import akka.cluster.sharding.ShardRegion.ShardId
import akka.event.Logging
import akka.pattern.AskTimeoutException
import akka.util.Timeout
import scala.collection.immutable
import scala.concurrent.Future
import scala.concurrent.duration._
object ExternalShardAllocationStrategy {
type ShardRegion = ActorRef
// local only messages
private[akka] final case class GetShardLocation(shard: ShardId)
private[akka] final case object GetShardLocations
private[akka] final case class GetShardLocationsResponse(desiredAllocations: Map[ShardId, Address])
private[akka] final case class GetShardLocationResponse(address: Option[Address])
// only returned locally, serialized as a string
final case class ShardLocation(address: Address) extends NoSerializationVerificationNeeded
private object DDataStateActor {
def props(typeName: String) = Props(new DDataStateActor(typeName))
}
// uses a string primitive types are optimized in ddata to not serialize every entity
// separately
private[akka] def ddataKey(typeName: String): LWWMapKey[ShardId, String] = {
LWWMapKey[ShardId, String](s"external-sharding-$typeName")
}
private class DDataStateActor(typeName: String) extends Actor with ActorLogging with Stash {
private val replicator = DistributedData(context.system).replicator
private val Key = ddataKey(typeName)
override def preStart(): Unit = {
log.debug("Starting ddata state actor for [{}]", typeName)
replicator ! Subscribe(Key, self)
}
var currentLocations: Map[ShardId, String] = Map.empty
override def receive: Receive = {
case c @ Changed(key: LWWMapKey[ShardId, String] @unchecked) =>
val newLocations = c.get(key).entries
currentLocations ++= newLocations
log.debug("Received updated shard locations [{}] all locations are now [{}]", newLocations, currentLocations)
case GetShardLocation(shard) =>
log.debug("GetShardLocation [{}]", shard)
val shardLocation = currentLocations.get(shard).map(asStr => AddressFromURIString(asStr))
sender() ! GetShardLocationResponse(shardLocation)
case GetShardLocations =>
log.debug("GetShardLocations")
sender() ! GetShardLocationsResponse(currentLocations.transform((_, asStr) => AddressFromURIString(asStr)))
}
}
}
class ExternalShardAllocationStrategy(systemProvider: ClassicActorSystemProvider, typeName: String)(
// local only ask
implicit val timeout: Timeout = Timeout(5.seconds))
extends ShardCoordinator.StartableAllocationStrategy {
private val system = systemProvider.classicSystem
import ExternalShardAllocationStrategy._
import akka.pattern.ask
import system.dispatcher
private val log = Logging(system, classOf[ExternalShardAllocationStrategy])
private var shardState: ActorRef = _
private[akka] def createShardStateActor(): ActorRef = {
system
.asInstanceOf[ExtendedActorSystem]
.systemActorOf(DDataStateActor.props(typeName), s"external-allocation-state-$typeName")
}
private val cluster = Cluster(system)
override def start(): Unit = {
shardState = createShardStateActor()
}
override def allocateShard(
requester: ShardRegion,
shardId: ShardId,
currentShardAllocations: Map[ShardRegion, immutable.IndexedSeq[ShardId]]): Future[ShardRegion] = {
log.debug("allocateShard [{}] [{}] [{}]", shardId, requester, currentShardAllocations)
// current shard allocations include all current shard regions
(shardState ? GetShardLocation(shardId))
.mapTo[GetShardLocationResponse]
.map {
case GetShardLocationResponse(None) =>
log.debug("No specific location for shard [{}]. Allocating to requester [{}]", shardId, requester)
requester
case GetShardLocationResponse(Some(address)) =>
// if it is the local address, convert it so it is found in the shards
if (address == cluster.selfAddress) {
currentShardAllocations.keys.find(_.path.address.hasLocalScope) match {
case None =>
log.debug("unable to find local shard in currentShardAllocation. Using requester")
requester
case Some(localShardRegion) =>
log.debug("allocating to local shard")
localShardRegion
}
} else {
currentShardAllocations.keys.find(_.path.address == address) match {
case None =>
log.debug(
"External shard location [{}] for shard [{}] not found in members [{}]",
address,
shardId,
currentShardAllocations.keys.mkString(","))
requester
case Some(location) =>
log.debug("Allocating shard to location [{}]", location)
location
}
}
}
.recover {
case _: AskTimeoutException =>
log.warning(
"allocate timed out waiting for shard allocation state [{}]. Allocating to requester [{}]",
shardId,
requester)
requester
}
}
override def rebalance(
currentShardAllocations: Map[ActorRef, immutable.IndexedSeq[ShardId]],
rebalanceInProgress: Set[ShardId]): Future[Set[ShardId]] = {
log.debug("rebalance [{}] [{}]", currentShardAllocations, rebalanceInProgress)
val currentAllocationByAddress: Map[Address, immutable.IndexedSeq[ShardId]] = currentShardAllocations.map {
case (ref: ActorRefScope, value) if ref.isLocal =>
(cluster.selfAddress, value) // so it can be compared to a address with host and port
case (key, value) => (key.path.address, value)
}
val currentlyAllocatedShards: Set[ShardId] = currentShardAllocations.foldLeft(Set.empty[ShardId]) {
case (acc, next) => acc ++ next._2.toSet
}
log.debug("Current allocations by address: [{}]", currentAllocationByAddress)
val shardsThatNeedRebalanced: Future[Set[ShardId]] = for {
desiredMappings <- (shardState ? GetShardLocations).mapTo[GetShardLocationsResponse]
} yield {
log.debug("desired allocations: [{}]", desiredMappings.desiredAllocations)
desiredMappings.desiredAllocations.filter {
case (shardId, expectedLocation) if currentlyAllocatedShards.contains(shardId) =>
currentAllocationByAddress.get(expectedLocation) match {
case None =>
log.debug(
"Shard [{}] desired location [{}] is not part of the cluster, not rebalancing",
shardId,
expectedLocation)
false // not a current allocation so don't rebalance yet
case Some(shards) =>
val inCorrectLocation = shards.contains(shardId)
!inCorrectLocation
}
case (shardId, _) =>
log.debug("Shard [{}] not currently allocated so not rebalancing to desired location", shardId)
false
}
}.keys.toSet
shardsThatNeedRebalanced
.map { done =>
if (done.nonEmpty) {
log.debug("Shards not currently in their desired location [{}]", done)
}
done
}
.recover {
case _: AskTimeoutException =>
log.warning("rebalance timed out waiting for shard allocation state. Keeping existing allocations")
Set.empty
}
}
}

View file

@ -0,0 +1,17 @@
/*
* Copyright (C) 2019-2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.cluster.sharding.external
import akka.cluster.sharding.ShardRegion.ShardId
import akka.cluster.sharding.external.ExternalShardAllocationStrategy.ShardLocation
import akka.util.ccompat.JavaConverters._
final class ShardLocations(val locations: Map[ShardId, ShardLocation]) {
/**
* Java API
*/
def getShardLocations(): java.util.Map[ShardId, ShardLocation] = locations.asJava
}

View file

@ -0,0 +1,91 @@
/*
* Copyright (C) 2019-2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.cluster.sharding.external.internal
import java.util.concurrent.CompletionStage
import akka.Done
import akka.actor.ActorRef
import akka.actor.ActorSystem
import akka.actor.Address
import akka.actor.AddressFromURIString
import akka.annotation.InternalApi
import akka.cluster.ddata.DistributedData
import akka.cluster.ddata.LWWMap
import akka.cluster.ddata.Replicator.Get
import akka.cluster.ddata.Replicator.GetFailure
import akka.cluster.ddata.Replicator.GetSuccess
import akka.cluster.ddata.Replicator.ReadMajority
import akka.cluster.ddata.Replicator.Update
import akka.cluster.ddata.Replicator.UpdateSuccess
import akka.cluster.ddata.Replicator.UpdateTimeout
import akka.cluster.ddata.Replicator.WriteLocal
import akka.cluster.ddata.SelfUniqueAddress
import akka.cluster.sharding.ShardRegion.ShardId
import akka.cluster.sharding.external.ClientTimeoutException
import akka.cluster.sharding.external.ExternalShardAllocationStrategy
import akka.cluster.sharding.external.ExternalShardAllocationStrategy.ShardLocation
import akka.cluster.sharding.external.ShardLocations
import akka.event.Logging
import akka.util.Timeout
import akka.util.PrettyDuration._
import akka.pattern.ask
import scala.concurrent.Future
import scala.compat.java8.FutureConverters._
import akka.util.JavaDurationConverters._
/**
* INTERNAL API
*/
@InternalApi
final private[external] class ExternalShardAllocationClientImpl(system: ActorSystem, typeName: String)
extends akka.cluster.sharding.external.scaladsl.ExternalShardAllocationClient
with akka.cluster.sharding.external.javadsl.ExternalShardAllocationClient {
private val log = Logging(system, classOf[ExternalShardAllocationClientImpl])
private val replicator: ActorRef = DistributedData(system).replicator
private val self: SelfUniqueAddress = DistributedData(system).selfUniqueAddress
private val timeout =
system.settings.config
.getDuration("akka.cluster.sharding.external-shard-allocation-strategy.client-timeout")
.asScala
private implicit val askTimeout = Timeout(timeout * 2)
private implicit val ec = system.dispatchers.internalDispatcher
private val Key = ExternalShardAllocationStrategy.ddataKey(typeName)
override def updateShardLocation(shard: ShardId, location: Address): Future[Done] = {
log.debug("updateShardLocation {} {} key {}", shard, location, Key)
(replicator ? Update(Key, LWWMap.empty[ShardId, String], WriteLocal, None) { existing =>
existing.put(self, shard, location.toString)
}).flatMap {
case UpdateSuccess(_, _) => Future.successful(Done)
case UpdateTimeout =>
Future.failed(new ClientTimeoutException(s"Unable to update shard location after ${timeout.duration.pretty}"))
}
}
override def setShardLocation(shard: ShardId, location: Address): CompletionStage[Done] =
updateShardLocation(shard, location).toJava
override def shardLocations(): Future[ShardLocations] = {
(replicator ? Get(Key, ReadMajority(timeout)))
.flatMap {
case success @ GetSuccess(`Key`, _) =>
Future.successful(
success.get(Key).entries.transform((_, asStr) => ShardLocation(AddressFromURIString(asStr))))
case GetFailure(_, _) =>
Future.failed((new ClientTimeoutException(s"Unable to get shard locations after ${timeout.duration.pretty}")))
}
.map { locations =>
new ShardLocations(locations)
}
}
override def getShardLocations(): CompletionStage[ShardLocations] = shardLocations().toJava
}

View file

@ -0,0 +1,39 @@
/*
* Copyright (C) 2019-2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.cluster.sharding.external.javadsl
import java.util.concurrent.CompletionStage
import akka.Done
import akka.actor.Address
import akka.annotation.ApiMayChange
import akka.cluster.sharding.ShardRegion.ShardId
import akka.cluster.sharding.external.ShardLocations
/**
* API May Change
*
* Not for user extension
*/
@ApiMayChange
trait ExternalShardAllocationClient {
/**
* Update the given shard's location. The [[Address]] should
* match one of the nodes in the cluster. If the node has not joined
* the cluster yet it will be moved to that node after the first cluster
* sharding rebalance.
*
* @param shard The shard identifier
* @param location Location (akka node) to allocate the shard to
* @return Confirmation that the update has been propagated to a majority of cluster nodes
*/
def setShardLocation(shard: ShardId, location: Address): CompletionStage[Done]
/**
* Get all the current shard locations that have been set via setShardLocation
*/
def getShardLocations(): CompletionStage[ShardLocations]
}

View file

@ -0,0 +1,39 @@
/*
* Copyright (C) 2019-2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.cluster.sharding.external.scaladsl
import akka.Done
import akka.actor.Address
import akka.annotation.ApiMayChange
import akka.cluster.sharding.ShardRegion.ShardId
import akka.cluster.sharding.external.ShardLocations
import scala.concurrent.Future
/**
* API May Change
*
* Not for user extension
*/
@ApiMayChange
trait ExternalShardAllocationClient {
/**
* Update the given shard's location. The [[Address]] should
* match one of the nodes in the cluster. If the node has not joined
* the cluster yet it will be moved to that node after the first cluster
* sharding rebalance.
*
* @param shard The shard identifier
* @param location Location (akka node) to allocate the shard to
* @return Confirmation that the update has been propagated to a majority of cluster nodes
*/
def updateShardLocation(shard: ShardId, location: Address): Future[Done]
/**
* Get all the current shard locations that have been set via updateShardLocation
*/
def shardLocations(): Future[ShardLocations]
}

View file

@ -0,0 +1,186 @@
/*
* Copyright (C) 2019-2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.cluster.sharding
import akka.actor.Actor
import akka.actor.ActorLogging
import akka.actor.Address
import akka.actor.PoisonPill
import akka.actor.Props
import akka.cluster.Cluster
import akka.cluster.MultiNodeClusterSpec
import akka.cluster.sharding.ExternalShardAllocationSpec.GiveMeYourHome.Get
import akka.cluster.sharding.ExternalShardAllocationSpec.GiveMeYourHome.Home
import akka.cluster.sharding.external.ExternalShardAllocation
import akka.cluster.sharding.external.ExternalShardAllocationStrategy
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.serialization.jackson.CborSerializable
import akka.testkit.ImplicitSender
import akka.testkit.TestProbe
import com.typesafe.config.ConfigFactory
import org.scalatest.concurrent.ScalaFutures
import scala.concurrent.duration._
object ExternalShardAllocationSpecConfig extends MultiNodeConfig {
commonConfig(ConfigFactory.parseString("""
akka.loglevel = INFO
akka.actor.provider = "cluster"
akka.cluster.sharding {
distributed-data.durable.lmdb {
dir = target/ExternalShardAllocationSpec/sharding-ddata
map-size = 10 MiB
}
retry-interval = 2000ms
waiting-for-state-timeout = 2000ms
rebalance-interval = 1s
}
""").withFallback(MultiNodeClusterSpec.clusterConfig))
val first = role("first")
val second = role("second")
val third = role("third")
val forth = role("forth")
}
class ExternalShardAllocationSpecMultiJvmNode1 extends ExternalShardAllocationSpec
class ExternalShardAllocationSpecMultiJvmNode2 extends ExternalShardAllocationSpec
class ExternalShardAllocationSpecMultiJvmNode3 extends ExternalShardAllocationSpec
class ExternalShardAllocationSpecMultiJvmNode4 extends ExternalShardAllocationSpec
object ExternalShardAllocationSpec {
object GiveMeYourHome {
case class Get(id: String) extends CborSerializable
case class Home(address: Address) extends CborSerializable
val extractEntityId: ShardRegion.ExtractEntityId = {
case g @ Get(id) => (id, g)
}
// shard == id to make testing easier
val extractShardId: ShardRegion.ExtractShardId = {
case Get(id) => id
}
}
class GiveMeYourHome extends Actor with ActorLogging {
val selfAddress = Cluster(context.system).selfAddress
log.info("Started on {}", selfAddress)
override def receive: Receive = {
case Get(_) =>
sender() ! Home(selfAddress)
}
}
}
abstract class ExternalShardAllocationSpec
extends MultiNodeSpec(ExternalShardAllocationSpecConfig)
with MultiNodeClusterSpec
with ImplicitSender
with ScalaFutures {
import ExternalShardAllocationSpecConfig._
import ExternalShardAllocationSpec._
import ExternalShardAllocationSpec.GiveMeYourHome._
override implicit val patienceConfig: PatienceConfig = PatienceConfig(5.second)
val typeName = "home"
val initiallyOnForth = "on-forth"
"External shard allocation" must {
"form cluster" in {
awaitClusterUp(first, second, third)
enterBarrier("cluster-started")
}
lazy val shardRegion = {
ClusterSharding(system).start(
typeName = typeName,
entityProps = Props[GiveMeYourHome],
settings = ClusterShardingSettings(system),
extractEntityId = extractEntityId,
extractShardId = extractShardId,
new ExternalShardAllocationStrategy(system, typeName),
PoisonPill)
}
"start cluster sharding" in {
shardRegion
enterBarrier("shard-region-started")
}
"default to allocating a shard to the local shard region" in {
runOn(first, second, third) {
shardRegion ! Get(myself.name)
val actorLocation = expectMsgType[Home](20.seconds).address
actorLocation shouldEqual Cluster(system).selfAddress
}
enterBarrier("local-message-sent")
}
"move shard via distributed data" in {
val shardToSpecifyLocation = "cats"
runOn(first) {
ExternalShardAllocation(system)
.clientFor(typeName)
.updateShardLocation(shardToSpecifyLocation, Cluster(system).selfAddress)
.futureValue
}
enterBarrier("shard-location-updated")
runOn(second, third) {
val probe = TestProbe()
awaitAssert({
shardRegion.tell(Get(shardToSpecifyLocation), probe.ref)
probe.expectMsg(Home(address(first)))
}, 10.seconds)
}
enterBarrier("shard-allocated-to-specific-node")
}
"allocate to a node that does not exist yet" in {
val onForthShardId = "on-forth"
val forthAddress = address(forth)
runOn(second) {
system.log.info("Allocating {} on {}", onForthShardId, forthAddress)
ExternalShardAllocation(system).clientFor(typeName).updateShardLocation(onForthShardId, forthAddress)
}
enterBarrier("allocated-to-new-node")
runOn(forth) {
joinWithin(first)
}
enterBarrier("forth-node-joined")
runOn(first, second, third) {
awaitAssert({
shardRegion ! Get(initiallyOnForth)
expectMsg(Home(address(forth)))
}, 10.seconds)
}
enterBarrier("shard-allocated-to-forth")
}
"move allocation" in {
runOn(third) {
system.log.info("Moving shard from forth to first: {}", address(first))
ExternalShardAllocation(system).clientFor(typeName).updateShardLocation(initiallyOnForth, address(first))
}
enterBarrier("shard-moved-from-forth-to-first")
runOn(first, second, third, forth) {
awaitAssert({
shardRegion ! Get(initiallyOnForth)
expectMsg(Home(address(first)))
}, 10.seconds)
}
enterBarrier("finished")
}
}
}

View file

@ -0,0 +1,55 @@
/*
* Copyright (C) 2019-2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.cluster.sharding.external
import akka.cluster.sharding.external.ExternalShardAllocationStrategy.GetShardLocation
import akka.cluster.sharding.external.ExternalShardAllocationStrategy.GetShardLocationResponse
import akka.cluster.sharding.external.ExternalShardAllocationStrategy.GetShardLocations
import akka.testkit.AkkaSpec
import akka.testkit.TestProbe
import akka.util.Timeout
import scala.concurrent.duration._
class ExternalShardAllocationStrategySpec extends AkkaSpec("""
akka.actor.provider = cluster
akka.loglevel = INFO
""") {
val requester = TestProbe()
"ExternalShardAllocation allocate" must {
"default to requester if query times out" in {
val (strat, _) = createStrategy()
strat.allocateShard(requester.ref, "shard-1", Map.empty).futureValue shouldEqual requester.ref
}
"default to requester if no allocation" in {
val (strat, probe) = createStrategy()
val allocation = strat.allocateShard(requester.ref, "shard-1", Map.empty)
probe.expectMsg(GetShardLocation("shard-1"))
probe.reply(GetShardLocationResponse(None))
allocation.futureValue shouldEqual requester.ref
}
}
"ExternalShardAllocation rebalance" must {
"default to no rebalance if query times out" in {
val (strat, probe) = createStrategy()
val rebalance = strat.rebalance(Map.empty, Set.empty)
probe.expectMsg(GetShardLocations)
rebalance.futureValue shouldEqual Set.empty
}
}
def createStrategy(): (ExternalShardAllocationStrategy, TestProbe) = {
val probe = TestProbe()
val strategy = new ExternalShardAllocationStrategy(system, "type")(Timeout(250.millis)) {
override private[akka] def createShardStateActor() = probe.ref
}
strategy.start()
(strategy, probe)
}
}

View file

@ -1544,11 +1544,12 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
case None => NotFound(key, req)
}
replyTo ! reply
} else
} else {
context.actorOf(
ReadAggregator
.props(key, consistency, req, selfUniqueAddress, nodes, unreachable, localValue, replyTo)
.withDispatcher(context.props.dispatcher))
}
}
def isLocalGet(readConsistency: ReadConsistency): Boolean =

View file

@ -162,10 +162,40 @@ The `number-of-shards` configuration value must be the same for all nodes in the
configuration check when joining. Changing the value requires stopping all nodes in the cluster.
The shards are allocated to the nodes in the cluster. The decision of where to allocate a shard is done
by a shard allocation strategy. The default implementation `akka.cluster.sharding.ShardCoordinator.LeastShardAllocationStrategy`
by a shard allocation strategy. The default implementation @apidoc[ShardCoordinator.LeastShardAllocationStrategy]
allocates new shards to the `ShardRegion` (node) with least number of previously allocated shards.
This strategy can be replaced by an application specific implementation.
### External shard allocation
An alternative allocation strategy is the @apidoc[ExternalShardAllocationStrategy] which allows
explicit control over where shards are allocated via the @apidoc[ExternalShardAllocation] extension.
This can be used, for example, to match up Kafka Partition consumption with shard locations.
To use it set it as the allocation strategy on your `Entity`:
Scala
: @@snip [ExternalShardAllocationCompileOnlySpec](/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/ExternalShardAllocationCompileOnlySpec.scala) { #entity }
Java
: @@snip [ExternalShardAllocationCompileOnlyTest](/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/ExternalShardAllocationCompileOnlyTest.java) { #entity }
For any shardId that has not been allocated it will be allocated to the requesting node. To make explicit allocations:
Scala
: @@snip [ExternalShardAllocationCompileOnlySpec](/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/ExternalShardAllocationCompileOnlySpec.scala) { #client }
Java
: @@snip [ExternalShardAllocationCompileOnlyTest](/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/ExternalShardAllocationCompileOnlyTest.java) { #client }
Any new or moved shard allocations will be moved on the next rebalance.
The communication from the client to the shard allocation strategy is via @ref[Distributed Data](./distributed-data.md).
It uses a single `LWWMap` that can support 10s of thousands of shards. Later versions could use multiple keys to
support a greater number of shards.
### Custom shard allocation
An optional custom shard allocation strategy can be passed into the optional parameter when initializing an entity type
or explicitly using the `withAllocationStrategy` function.
See the API documentation of @scala[`akka.cluster.sharding.ShardAllocationStrategy`]@java[`akka.cluster.sharding.AbstractShardAllocationStrategy`] for details of how to implement a custom `ShardAllocationStrategy`.